/* -*- mode: C; c-basic-offset: 4 -*- */ #define MYSQL_SERVER 1 #include "mysql_priv.h" extern "C" { #include "stdint.h" #if defined(_WIN32) #include "misc.h" #endif #define __STDC_FORMAT_MACROS #include <inttypes.h> #include "toku_os.h" } /* We define DTRACE after mysql_priv.h in case it disabled dtrace in the main server */ #ifdef HAVE_DTRACE #define _DTRACE_VERSION 1 #else #endif #include <mysql/plugin.h> #include "hatoku_hton.h" #include "hatoku_defines.h" #include "ha_tokudb.h" #undef PACKAGE #undef VERSION #undef HAVE_DTRACE #undef _DTRACE_VERSION #define TOKU_METADB_NAME ".\\tokudb_meta.tokudb" static inline void *thd_data_get(THD *thd, int slot) { return thd->ha_data[slot].ha_ptr; } static inline void thd_data_set(THD *thd, int slot, void *data) { thd->ha_data[slot].ha_ptr = data; } static uchar *tokudb_get_key(TOKUDB_SHARE * share, size_t * length, my_bool not_used __attribute__ ((unused))) { *length = share->table_name_length; return (uchar *) share->table_name; } static handler *tokudb_create_handler(handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root); static MYSQL_THDVAR_BOOL(commit_sync, PLUGIN_VAR_THDLOCAL, "sync on txn commit", /* check */ NULL, /* update */ NULL, /* default*/ TRUE); static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer); static void tokudb_cleanup_log_files(void); static int tokudb_end(handlerton * hton, ha_panic_function type); static bool tokudb_flush_logs(handlerton * hton); static bool tokudb_show_status(handlerton * hton, THD * thd, stat_print_fn * print, enum ha_stat_type); static int tokudb_close_connection(handlerton * hton, THD * thd); static int tokudb_commit(handlerton * hton, THD * thd, bool all); static int tokudb_rollback(handlerton * hton, THD * thd, bool all); static uint tokudb_alter_table_flags(uint flags); #if 0 static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint); static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint); static int tokudb_release_savepoint(handlerton * hton, THD * thd, void *savepoint); #endif static bool tokudb_show_logs(THD * thd, stat_print_fn * stat_print); handlerton *tokudb_hton; const char *ha_tokudb_ext = ".tokudb"; char *tokudb_data_dir; ulong tokudb_debug; DB_ENV *db_env; DB* metadata_db; HASH tokudb_open_tables; pthread_mutex_t tokudb_mutex; pthread_mutex_t tokudb_meta_mutex; //my_bool tokudb_shared_data = FALSE; static u_int32_t tokudb_init_flags = DB_CREATE | DB_THREAD | DB_PRIVATE | DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | 0 | // disabled for 1.0.2 DB_INIT_LOG | DB_RECOVER; static u_int32_t tokudb_env_flags = 0; // static u_int32_t tokudb_lock_type = DB_LOCK_DEFAULT; // static ulong tokudb_log_buffer_size = 0; // static ulong tokudb_log_file_size = 0; static ulonglong tokudb_cache_size = 0; static char *tokudb_home; // static char *tokudb_tmpdir; static char *tokudb_log_dir; // static long tokudb_lock_scan_time = 0; // static ulong tokudb_region_size = 0; // static ulong tokudb_cache_parts = 1; static ulong tokudb_max_lock; static const char tokudb_hton_name[] = "TokuDB"; static const int tokudb_hton_name_length = sizeof(tokudb_hton_name) - 1; static u_int32_t tokudb_checkpointing_period; u_int32_t tokudb_write_status_frequency; u_int32_t tokudb_read_status_frequency; my_bool tokudb_prelock_empty; #ifdef TOKUDB_VERSION char *tokudb_version = TOKUDB_VERSION; #else char *tokudb_version; #endif struct st_mysql_storage_engine storage_engine_structure = { MYSQL_HANDLERTON_INTERFACE_VERSION }; extern "C" { #include "ydb.h" } static int tokudb_init_func(void *p) { TOKUDB_DBUG_ENTER("tokudb_init_func"); int r; #if defined(_WIN32) r = toku_ydb_init(); if (r) { goto error; } #endif db_env = NULL; metadata_db = NULL; tokudb_hton = (handlerton *) p; VOID(pthread_mutex_init(&tokudb_mutex, MY_MUTEX_INIT_FAST)); VOID(pthread_mutex_init(&tokudb_meta_mutex, MY_MUTEX_INIT_FAST)); (void) hash_init(&tokudb_open_tables, system_charset_info, 32, 0, 0, (hash_get_key) tokudb_get_key, 0, 0); tokudb_hton->state = SHOW_OPTION_YES; // tokudb_hton->flags= HTON_CAN_RECREATE; // QQQ this came from skeleton tokudb_hton->flags = HTON_CLOSE_CURSORS_AT_COMMIT | HTON_FLUSH_AFTER_RENAME; #ifdef DB_TYPE_TOKUDB tokudb_hton->db_type = DB_TYPE_TOKUDB; #else tokudb_hton->db_type = DB_TYPE_UNKNOWN; #endif tokudb_hton->create = tokudb_create_handler; tokudb_hton->close_connection = tokudb_close_connection; #if 0 tokudb_hton->savepoint_offset = sizeof(DB_TXN *); tokudb_hton->savepoint_set = tokudb_savepoint; tokudb_hton->savepoint_rollback = tokudb_rollback_to_savepoint; tokudb_hton->savepoint_release = tokudb_release_savepoint; #endif tokudb_hton->commit = tokudb_commit; tokudb_hton->rollback = tokudb_rollback; tokudb_hton->panic = tokudb_end; tokudb_hton->flush_logs = tokudb_flush_logs; tokudb_hton->show_status = tokudb_show_status; tokudb_hton->alter_table_flags = tokudb_alter_table_flags; #if 0 if (!tokudb_tmpdir) tokudb_tmpdir = mysql_tmpdir; DBUG_PRINT("info", ("tokudb_tmpdir: %s", tokudb_tmpdir)); #endif if (!tokudb_home) tokudb_home = mysql_real_data_home; DBUG_PRINT("info", ("tokudb_home: %s", tokudb_home)); #if 0 if (!tokudb_log_buffer_size) { // QQQ tokudb_log_buffer_size = max(table_cache_size * 512, 32 * 1024); DBUG_PRINT("info", ("computing tokudb_log_buffer_size %ld\n", tokudb_log_buffer_size)); } tokudb_log_file_size = tokudb_log_buffer_size * 4; tokudb_log_file_size = MY_ALIGN(tokudb_log_file_size, 1024 * 1024L); tokudb_log_file_size = max(tokudb_log_file_size, 10 * 1024 * 1024L); DBUG_PRINT("info", ("computing tokudb_log_file_size: %ld\n", tokudb_log_file_size)); #endif if ((r = db_env_create(&db_env, 0))) { DBUG_PRINT("info", ("db_env_create %d\n", r)); goto error; } DBUG_PRINT("info", ("tokudb_env_flags: 0x%x\n", tokudb_env_flags)); r = db_env->set_flags(db_env, tokudb_env_flags, 1); if (r) { // QQQ if (tokudb_debug & TOKUDB_DEBUG_INIT) TOKUDB_TRACE("%s:WARNING: flags=%x r=%d\n", __FUNCTION__, tokudb_env_flags, r); // goto error; } // config error handling db_env->set_errcall(db_env, tokudb_print_error); db_env->set_errpfx(db_env, "TokuDB"); // // set default comparison functions // r = db_env->set_default_bt_compare(db_env, tokudb_cmp_dbt_key); if (r) { DBUG_PRINT("info", ("set_default_bt_compare%d\n", r)); goto error; } r = db_env->set_default_dup_compare(db_env, tokudb_cmp_dbt_data); if (r) { DBUG_PRINT("info", ("set_default_dup_compare%d\n", r)); goto error; } // config directories #if 0 DBUG_PRINT("info", ("tokudb_tmpdir: %s\n", tokudb_tmpdir)); db_env->set_tmp_dir(db_env, tokudb_tmpdir); #endif { char *data_dir = tokudb_data_dir; if (data_dir == 0) data_dir = mysql_data_home; DBUG_PRINT("info", ("tokudb_data_dir: %s\n", data_dir)); db_env->set_data_dir(db_env, data_dir); } if (tokudb_log_dir) { DBUG_PRINT("info", ("tokudb_log_dir: %s\n", tokudb_log_dir)); db_env->set_lg_dir(db_env, tokudb_log_dir); } // config the cache table size to min(1/2 of physical memory, 1/8 of the process address space) if (tokudb_cache_size == 0) { uint64_t physmem, maxdata; physmem = toku_os_get_phys_memory_size(); tokudb_cache_size = physmem / 2; r = toku_os_get_max_process_data_size(&maxdata); if (r == 0) { if (tokudb_cache_size > maxdata / 8) tokudb_cache_size = maxdata / 8; } } if (tokudb_cache_size) { DBUG_PRINT("info", ("tokudb_cache_size: %lld\n", tokudb_cache_size)); r = db_env->set_cachesize(db_env, (u_int32_t)(tokudb_cache_size >> 30), (u_int32_t)(tokudb_cache_size % (1024L * 1024L * 1024L)), 1); if (r) { DBUG_PRINT("info", ("set_cachesize %d\n", r)); goto error; } } u_int32_t gbytes, bytes; int parts; r = db_env->get_cachesize(db_env, &gbytes, &bytes, &parts); if (r == 0) if (tokudb_debug & TOKUDB_DEBUG_INIT) TOKUDB_TRACE("%s:tokudb_cache_size=%lld\n", __FUNCTION__, ((unsigned long long) gbytes << 30) + bytes); #if 0 // QQQ config the logs DBUG_PRINT("info", ("tokudb_log_file_size: %ld\n", tokudb_log_file_size)); db_env->set_lg_max(db_env, tokudb_log_file_size); DBUG_PRINT("info", ("tokudb_log_buffer_size: %ld\n", tokudb_log_buffer_size)); db_env->set_lg_bsize(db_env, tokudb_log_buffer_size); // DBUG_PRINT("info",("tokudb_region_size: %ld\n", tokudb_region_size)); // db_env->set_lg_regionmax(db_env, tokudb_region_size); #endif // config the locks #if 0 // QQQ no lock types yet DBUG_PRINT("info", ("tokudb_lock_type: 0x%lx\n", tokudb_lock_type)); db_env->set_lk_detect(db_env, tokudb_lock_type); #endif if (tokudb_max_lock) { DBUG_PRINT("info",("tokudb_max_lock: %ld\n", tokudb_max_lock)); r = db_env->set_lk_max_locks(db_env, tokudb_max_lock); if (r) { DBUG_PRINT("info", ("tokudb_set_max_locks %d\n", r)); goto error; } } if (tokudb_debug & TOKUDB_DEBUG_INIT) TOKUDB_TRACE("%s:env open:flags=%x\n", __FUNCTION__, tokudb_init_flags); r = db_env->open(db_env, tokudb_home, tokudb_init_flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH); if (tokudb_debug & TOKUDB_DEBUG_INIT) TOKUDB_TRACE("%s:env opened:return=%d\n", __FUNCTION__, r); if (r) { DBUG_PRINT("info", ("env->open %d\n", r)); goto error; } r = db_env->checkpointing_set_period(db_env, tokudb_checkpointing_period); assert(!r); r = db_create(&metadata_db, db_env, 0); if (r) { DBUG_PRINT("info", ("failed to create metadata db %d\n", r)); goto error; } metadata_db->set_bt_compare(metadata_db, tokudb_cmp_dbt_key); r= metadata_db->open(metadata_db, 0, TOKU_METADB_NAME, NULL, DB_BTREE, DB_THREAD|DB_AUTO_COMMIT, 0); if (r) { sql_print_error("No metadata table exists, so creating it"); r= metadata_db->open(metadata_db, NULL, TOKU_METADB_NAME, NULL, DB_BTREE, DB_THREAD | DB_CREATE, my_umask); if (r) { goto error; } metadata_db->close(metadata_db,0); r = db_create(&metadata_db, db_env, 0); if (r) { DBUG_PRINT("info", ("failed to create metadata db %d\n", r)); goto error; } metadata_db->set_bt_compare(metadata_db, tokudb_cmp_dbt_key); r= metadata_db->open(metadata_db, 0, TOKU_METADB_NAME, NULL, DB_BTREE, DB_THREAD|DB_AUTO_COMMIT, 0); if (r) { goto error; } } DBUG_RETURN(FALSE); error: if (metadata_db) { metadata_db->close(metadata_db, 0); } if (db_env) { db_env->close(db_env, 0); db_env = 0; } DBUG_RETURN(TRUE); } static int tokudb_done_func(void *p) { TOKUDB_DBUG_ENTER("tokudb_done_func"); int error = 0; if (tokudb_open_tables.records) error = 1; hash_free(&tokudb_open_tables); pthread_mutex_destroy(&tokudb_mutex); pthread_mutex_destroy(&tokudb_meta_mutex); #if defined(_WIN32) toku_ydb_destroy(); #endif TOKUDB_DBUG_RETURN(0); } static handler *tokudb_create_handler(handlerton * hton, TABLE_SHARE * table, MEM_ROOT * mem_root) { return new(mem_root) ha_tokudb(hton, table); } int tokudb_end(handlerton * hton, ha_panic_function type) { TOKUDB_DBUG_ENTER("tokudb_end"); int error = 0; if (metadata_db) { metadata_db->close(metadata_db, 0); } if (db_env) { if (tokudb_init_flags & DB_INIT_LOG) tokudb_cleanup_log_files(); error = db_env->close(db_env, 0); // Error is logged db_env = NULL; } TOKUDB_DBUG_RETURN(error); } static int tokudb_close_connection(handlerton * hton, THD * thd) { int error = 0; tokudb_trx_data* trx = NULL; trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); if (trx && trx->checkpoint_lock_taken) { error = db_env->checkpointing_resume(db_env); } my_free(trx, MYF(0)); return error; } bool tokudb_flush_logs(handlerton * hton) { TOKUDB_DBUG_ENTER("tokudb_flush_logs"); int error; bool result = 0; u_int32_t curr_tokudb_checkpointing_period = 0; // // get the current checkpointing period // error = db_env->checkpointing_get_period( db_env, &curr_tokudb_checkpointing_period ); if (error) { my_error(ER_ERROR_DURING_CHECKPOINT, MYF(0), error); result = 1; goto exit; } // // if the current period is not the same as the variable, that means // the user has changed the period and now we need to update it // if (tokudb_checkpointing_period != curr_tokudb_checkpointing_period) { error = db_env->checkpointing_set_period( db_env, tokudb_checkpointing_period ); if (error) { my_error(ER_ERROR_DURING_CHECKPOINT, MYF(0), error); result = 1; goto exit; } } // // take the checkpoint // error = db_env->txn_checkpoint(db_env, 0, 0, 0); if (error) { my_error(ER_ERROR_DURING_CHECKPOINT, MYF(0), error); result = 1; goto exit; } result = 0; exit: TOKUDB_DBUG_RETURN(result); } static int tokudb_commit(handlerton * hton, THD * thd, bool all) { TOKUDB_DBUG_ENTER("tokudb_commit"); DBUG_PRINT("trans", ("ending transaction %s", all ? "all" : "stmt")); u_int32_t syncflag = THDVAR(thd, commit_sync) ? 0 : DB_TXN_NOSYNC; tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot); DB_TXN **txn = all ? &trx->all : &trx->stmt; int error = 0; if (*txn) { if (tokudb_debug & TOKUDB_DEBUG_TXN) TOKUDB_TRACE("commit:%d:%p\n", all, *txn); error = (*txn)->commit(*txn, syncflag); if (*txn == trx->sp_level) trx->sp_level = 0; *txn = 0; } else if (tokudb_debug & TOKUDB_DEBUG_TXN) { TOKUDB_TRACE("commit0\n"); } if (all) { trx->iso_level = hatoku_iso_not_set; } reset_stmt_progress(&trx->stmt_progress); TOKUDB_DBUG_RETURN(error); } static int tokudb_rollback(handlerton * hton, THD * thd, bool all) { TOKUDB_DBUG_ENTER("tokudb_rollback"); DBUG_PRINT("trans", ("aborting transaction %s", all ? "all" : "stmt")); tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot); DB_TXN **txn = all ? &trx->all : &trx->stmt; int error = 0; if (*txn) { if (tokudb_debug & TOKUDB_DEBUG_TXN) TOKUDB_TRACE("rollback:%p\n", *txn); error = (*txn)->abort(*txn); if (*txn == trx->sp_level) trx->sp_level = 0; *txn = 0; } else { if (tokudb_debug & TOKUDB_DEBUG_TXN) { TOKUDB_TRACE("abort0\n"); } } if (all) { trx->iso_level = hatoku_iso_not_set; } reset_stmt_progress(&trx->stmt_progress); TOKUDB_DBUG_RETURN(error); } #if 0 static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint) { TOKUDB_DBUG_ENTER("tokudb_savepoint"); int error; DB_TXN **save_txn = (DB_TXN **) savepoint; tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot); if (!(error = db_env->txn_begin(db_env, trx->sp_level, save_txn, 0))) { trx->sp_level = *save_txn; } TOKUDB_DBUG_RETURN(error); } static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint) { TOKUDB_DBUG_ENTER("tokudb_rollback_to_savepoint"); int error; DB_TXN *parent, **save_txn = (DB_TXN **) savepoint; tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot); parent = (*save_txn)->parent; if (!(error = (*save_txn)->abort(*save_txn))) { trx->sp_level = parent; error = tokudb_savepoint(hton, thd, savepoint); } TOKUDB_DBUG_RETURN(error); } static int tokudb_release_savepoint(handlerton * hton, THD * thd, void *savepoint) { TOKUDB_DBUG_ENTER("tokudb_release_savepoint"); int error; DB_TXN *parent, **save_txn = (DB_TXN **) savepoint; tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot); parent = (*save_txn)->parent; if (!(error = (*save_txn)->commit(*save_txn, 0))) { trx->sp_level = parent; *save_txn = 0; } TOKUDB_DBUG_RETURN(error); } #endif static int smart_dbt_do_nothing (DBT const *key, DBT const *row, void *context) { return 0; } static bool tokudb_show_data_size(THD * thd, stat_print_fn * stat_print, bool exact) { TOKUDB_DBUG_ENTER("tokudb_show_data_size"); int error; u_int64_t num_bytes_in_db = 0; DB* curr_db = NULL; DB_TXN* txn = NULL; DBC* tmp_cursor = NULL; DBC* tmp_table_cursor = NULL; DBT curr_key; DBT curr_val; char data_amount_msg[50] = {0}; memset(&curr_key, 0, sizeof curr_key); memset(&curr_val, 0, sizeof curr_val); pthread_mutex_lock(&tokudb_meta_mutex); error = db_env->txn_begin(db_env, 0, &txn, DB_READ_UNCOMMITTED); if (error) { goto cleanup; } error = metadata_db->cursor(metadata_db, txn, &tmp_cursor, 0); if (error) { goto cleanup; } while (error == 0) { // // here, and in other places, check if process has been killed // if so, get out of function so user is not stalled // if (thd->killed) { break; } // // do not need this to be super fast, so use old simple API // error = tmp_cursor->c_get( tmp_cursor, &curr_key, &curr_val, DB_NEXT ); if (!error) { char* name = (char *)curr_key.data; char* newname = NULL; char name_buff[FN_REFLEN]; char* fn_ret = NULL; u_int64_t curr_num_bytes = 0; DB_BTREE_STAT64 dict_stats; newname = (char *)my_malloc( get_max_dict_name_path_length(name), MYF(MY_WME|MY_ZEROFILL) ); if (newname == NULL) { error = ENOMEM; goto cleanup; } make_name(newname, name, "main"); fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH); error = db_create(&curr_db, db_env, 0); if (error) { goto cleanup; } error = curr_db->open(curr_db, 0, name_buff, NULL, DB_BTREE, DB_THREAD, 0); if (error == ENOENT) { error = 0; continue; } if (error) { goto cleanup; } if (exact) { // // flatten if exact is required // uint curr_num_items = 0; error = curr_db->cursor(curr_db, txn, &tmp_table_cursor, 0); if (error) { tmp_table_cursor = NULL; goto cleanup; } while (error != DB_NOTFOUND) { error = tmp_table_cursor->c_getf_next(tmp_table_cursor, 0, smart_dbt_do_nothing, NULL); if (error && error != DB_NOTFOUND) { goto cleanup; } curr_num_items++; // // allow early exit if command has been killed // if ( (curr_num_items % 1000) == 0 && thd->killed) { goto cleanup; } } tmp_table_cursor->c_close(tmp_table_cursor); tmp_table_cursor = NULL; } error = curr_db->stat64( curr_db, txn, &dict_stats ); if (error) { goto cleanup; } curr_num_bytes = dict_stats.bt_dsize; if (*(uchar *)curr_val.data) { // // in this case, we have a hidden primary key, do not // want to report space taken up by the hidden primary key to the user // u_int64_t hpk_space = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH*dict_stats.bt_ndata; curr_num_bytes = (hpk_space > curr_num_bytes) ? 0 : curr_num_bytes - hpk_space; } else { // // one infinity byte per key needs to be subtracted // u_int64_t inf_byte_space = dict_stats.bt_ndata; curr_num_bytes = (inf_byte_space > curr_num_bytes) ? 0 : curr_num_bytes - inf_byte_space; } num_bytes_in_db += curr_num_bytes; curr_db->close(curr_db, 0); curr_db = NULL; my_free(newname,MYF(MY_ALLOW_ZERO_PTR)); } } sprintf(data_amount_msg, "Number of bytes in database: %" PRIu64, num_bytes_in_db); stat_print( thd, tokudb_hton_name, tokudb_hton_name_length, "Data in tables", strlen("Data in tables"), data_amount_msg, strlen(data_amount_msg) ); error = 0; cleanup: if (curr_db) { curr_db->close(curr_db, 0); } if (tmp_cursor) { tmp_cursor->c_close(tmp_cursor); } if (tmp_table_cursor) { tmp_table_cursor->c_close(tmp_table_cursor); } if (txn) { txn->commit(txn, 0); } if (error) { sql_print_error("got an error %d in show_data_size\n", error); } pthread_mutex_unlock(&tokudb_meta_mutex); if (error) { my_errno = error; } TOKUDB_DBUG_RETURN(error); } static bool tokudb_show_logs(THD * thd, stat_print_fn * stat_print) { TOKUDB_DBUG_ENTER("tokudb_show_logs"); char **all_logs, **free_logs, **a, **f; int error = 1; MEM_ROOT **root_ptr = my_pthread_getspecific_ptr(MEM_ROOT **, THR_MALLOC); MEM_ROOT show_logs_root, *old_mem_root = *root_ptr; init_sql_alloc(&show_logs_root, BDB_LOG_ALLOC_BLOCK_SIZE, BDB_LOG_ALLOC_BLOCK_SIZE); *root_ptr = &show_logs_root; all_logs = free_logs = 0; error = db_env->log_archive(db_env, &all_logs, 0); if (error) { DBUG_PRINT("error", ("log_archive failed (error %d)", error)); db_env->err(db_env, error, "log_archive"); if (error == DB_NOTFOUND) error = 0; // No log files goto err; } /* Error is 0 here */ if (all_logs) { for (a = all_logs, f = free_logs; *a; ++a) { if (f && *f && strcmp(*a, *f) == 0) { f++; if ((error = stat_print(thd, tokudb_hton_name, tokudb_hton_name_length, *a, strlen(*a), STRING_WITH_LEN(SHOW_LOG_STATUS_FREE)))) break; } else { if ((error = stat_print(thd, tokudb_hton_name, tokudb_hton_name_length, *a, strlen(*a), STRING_WITH_LEN(SHOW_LOG_STATUS_INUSE)))) break; } } } err: if (all_logs) free(all_logs); if (free_logs) free(free_logs); free_root(&show_logs_root, MYF(0)); *root_ptr = old_mem_root; if (error) { my_errno = error; } TOKUDB_DBUG_RETURN(error); } #define STATPRINT(legend, val) stat_print(thd, \ tokudb_hton_name, \ tokudb_hton_name_length, \ legend, \ strlen(legend), \ val, \ strlen(val)) static bool tokudb_show_engine_status(THD * thd, stat_print_fn * stat_print) { TOKUDB_DBUG_ENTER("tokudb_show_engine_status"); int error; char buf[1024] = {'\0'}; ENGINE_STATUS engstat; error = db_env->get_engine_status(db_env, &engstat); if (error == 0) { STATPRINT("time now", engstat.now); const char * lockstat = (engstat.ydb_lock_ctr & 0x01) ? "Locked" : "Unlocked"; u_int32_t lockctr = engstat.ydb_lock_ctr >> 1; // lsb indicates if locked sprintf(buf, "%" PRIu32, lockctr); STATPRINT("ydb lock", lockstat); STATPRINT("ydb lock counter", buf); lockstat = (engstat.logger_lock_ctr & 0x01) ? "Locked" : "Unlocked"; lockctr = engstat.logger_lock_ctr >> 1; // lsb indicates if locked sprintf(buf, "%" PRIu32, lockctr); STATPRINT("logger lock", lockstat); STATPRINT("logger lock counter", buf); lockstat = (engstat.cachetable_lock_ctr & 0x01) ? "Locked" : "Unlocked"; lockctr = engstat.cachetable_lock_ctr >> 1; // lsb indicates if locked sprintf(buf, "%" PRIu32, lockctr); STATPRINT("cachetable lock", lockstat); STATPRINT("cachetable lock counter", buf); sprintf(buf, "%" PRIu64, engstat.cachetable_hit); STATPRINT("cachetable hit", buf); sprintf(buf, "%" PRIu64, engstat.cachetable_miss); STATPRINT("cachetable miss", buf); sprintf(buf, "%" PRIu64, engstat.cachetable_wait_reading); STATPRINT("cachetable wait reading", buf); sprintf(buf, "%" PRIu64, engstat.cachetable_wait_writing); STATPRINT("cachetable wait writing", buf); sprintf(buf, "%" PRIu64, engstat.puts); STATPRINT("cachetable puts (new node)", buf); sprintf(buf, "%" PRIu64, engstat.prefetches); STATPRINT("cachetable prefetches", buf); sprintf(buf, "%" PRIu64, engstat.maybe_get_and_pins); STATPRINT("cachetable maybe_get_and_pins", buf); sprintf(buf, "%" PRIu64, engstat.maybe_get_and_pin_hits); STATPRINT("cachetable maybe_get_and_pin_hits", buf); sprintf(buf, "%" PRIu64, engstat.cachetable_size_current); STATPRINT("cachetable size_current", buf); sprintf(buf, "%" PRIu64, engstat.cachetable_size_limit); STATPRINT("cachetable size_limit", buf); sprintf(buf, "%" PRIu64, engstat.cachetable_size_writing); STATPRINT("cachetable size_writing", buf); sprintf(buf, "%" PRIu32, engstat.checkpoint_period); STATPRINT("checkpoint period", buf); sprintf(buf, "%" PRIu32, engstat.checkpoint_footprint); STATPRINT("checkpoint status code (0 = idle)", buf); STATPRINT("last complete checkpoint began ", engstat.checkpoint_time_begin_complete); STATPRINT("last complete checkpoint ended ", engstat.checkpoint_time_end); STATPRINT("last checkpoint began ", engstat.checkpoint_time_begin); sprintf(buf, "%" PRIu32, engstat.range_locks_max); STATPRINT("max range locks", buf); sprintf(buf, "%" PRIu32, engstat.range_locks_max_per_db); STATPRINT("max range locks per db", buf); sprintf(buf, "%" PRIu32, engstat.range_locks_curr); STATPRINT("range locks in use", buf); sprintf(buf, "%" PRIu64, engstat.inserts); STATPRINT("dictionary inserts", buf); sprintf(buf, "%" PRIu64, engstat.deletes); STATPRINT("dictionary deletes", buf); sprintf(buf, "%" PRIu64, engstat.point_queries); STATPRINT("dictionary point queries", buf); sprintf(buf, "%" PRIu64, engstat.sequential_queries); STATPRINT("dictionary sequential queries", buf); sprintf(buf, "%" PRIu64, engstat.commits); STATPRINT("txn commits", buf); sprintf(buf, "%" PRIu64, engstat.aborts); STATPRINT("txn aborts", buf); } if (error) { my_errno = error; } TOKUDB_DBUG_RETURN(error); } int tokudb_checkpoint_lock(THD * thd, stat_print_fn * stat_print) { int error; tokudb_trx_data* trx = NULL; trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); if (!trx) { error = create_tokudb_trx_data_instance(&trx); if (error) { goto cleanup; } thd_data_set(thd, tokudb_hton->slot, trx); } if (trx->checkpoint_lock_taken) { STATPRINT("checkpoint lock", "Lock already taken"); error = 0; goto cleanup; } error = db_env->checkpointing_postpone(db_env); if (error) { goto cleanup; } trx->checkpoint_lock_taken = true; STATPRINT("checkpoint lock", "Lock successfully taken"); error = 0; cleanup: if (error) { my_errno = error; } return error; } int tokudb_checkpoint_unlock(THD * thd, stat_print_fn * stat_print) { int error; tokudb_trx_data* trx = NULL; trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); if (!trx) { error = 0; STATPRINT("checkpoint unlock", "Lock never taken"); goto cleanup; } if (!trx->checkpoint_lock_taken) { error = 0; STATPRINT("checkpoint unlock", "Lock never taken"); goto cleanup; } // // at this point, we know the checkpoint lock has been taken // error = db_env->checkpointing_resume(db_env); if (error) {goto cleanup;} trx->checkpoint_lock_taken = false; STATPRINT("checkpoint unlock", "Successfully unlocked"); cleanup: if (error) { my_errno = error; } return error; } bool tokudb_show_status(handlerton * hton, THD * thd, stat_print_fn * stat_print, enum ha_stat_type stat_type) { switch (stat_type) { case HA_ENGINE_DATA_AMOUNT: return tokudb_show_data_size(thd, stat_print, false); break; case HA_ENGINE_DATA_EXACT_AMOUNT: return tokudb_show_data_size(thd, stat_print, true); break; case HA_ENGINE_LOGS: return tokudb_show_logs(thd, stat_print); break; case HA_ENGINE_STATUS: return tokudb_show_engine_status(thd, stat_print); break; case HA_ENGINE_CHECKPOINT_LOCK: return tokudb_checkpoint_lock(thd, stat_print); break; case HA_ENGINE_CHECKPOINT_UNLOCK: return tokudb_checkpoint_unlock(thd, stat_print); break; default: break; } return FALSE; } static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer) { sql_print_error("%s: %s", db_errpfx, buffer); } void tokudb_cleanup_log_files(void) { TOKUDB_DBUG_ENTER("tokudb_cleanup_log_files"); char **names; int error; if ((error = db_env->txn_checkpoint(db_env, 0, 0, 0))) my_error(ER_ERROR_DURING_CHECKPOINT, MYF(0), error); if ((error = db_env->log_archive(db_env, &names, 0)) != 0) { DBUG_PRINT("error", ("log_archive failed (error %d)", error)); db_env->err(db_env, error, "log_archive"); DBUG_VOID_RETURN; } if (names) { char **np; for (np = names; *np; ++np) { #if 1 if (tokudb_debug) TOKUDB_TRACE("%s:cleanup:%s\n", __FUNCTION__, *np); #else my_delete(*np, MYF(MY_WME)); #endif } free(names); } DBUG_VOID_RETURN; } // // *******NOTE***** // If the flags HA_ONLINE_DROP_INDEX and HA_ONLINE_DROP_UNIQUE_INDEX // are ever added, prepare_drop_index and final_drop_index will need to be modified // so that the actual deletion of DB's is done in final_drop_index and not prepare_drop_index // static uint tokudb_alter_table_flags(uint flags) { return (HA_ONLINE_ADD_INDEX_NO_WRITES| HA_ONLINE_DROP_INDEX_NO_WRITES | HA_ONLINE_ADD_UNIQUE_INDEX_NO_WRITES| HA_ONLINE_DROP_UNIQUE_INDEX_NO_WRITES); } // options flags // PLUGIN_VAR_THDLOCAL Variable is per-connection // PLUGIN_VAR_READONLY Server variable is read only // PLUGIN_VAR_NOSYSVAR Not a server variable // PLUGIN_VAR_NOCMDOPT Not a command line option // PLUGIN_VAR_NOCMDARG No argument for cmd line // PLUGIN_VAR_RQCMDARG Argument required for cmd line // PLUGIN_VAR_OPCMDARG Argument optional for cmd line // PLUGIN_VAR_MEMALLOC String needs memory allocated // system variables static MYSQL_SYSVAR_ULONGLONG(cache_size, tokudb_cache_size, PLUGIN_VAR_READONLY, "TokuDB cache table size", NULL, NULL, 0, 0, ~0LL, 0); static MYSQL_SYSVAR_ULONG(max_lock, tokudb_max_lock, PLUGIN_VAR_READONLY, "TokuDB Max Locks", NULL, NULL, 8 * 1024, 0, ~0L, 0); static MYSQL_SYSVAR_ULONG(debug, tokudb_debug, PLUGIN_VAR_READONLY, "TokuDB Debug", NULL, NULL, 0, 0, ~0L, 0); static MYSQL_SYSVAR_STR(log_dir, tokudb_log_dir, PLUGIN_VAR_READONLY, "TokuDB Log Directory", NULL, NULL, NULL); static MYSQL_SYSVAR_STR(data_dir, tokudb_data_dir, PLUGIN_VAR_READONLY, "TokuDB Data Directory", NULL, NULL, NULL); static MYSQL_SYSVAR_STR(version, tokudb_version, PLUGIN_VAR_READONLY, "TokuDB Version", NULL, NULL, NULL); static MYSQL_SYSVAR_UINT(init_flags, tokudb_init_flags, PLUGIN_VAR_READONLY, "Sets TokuDB DB_ENV->open flags", NULL, NULL, tokudb_init_flags, 0, ~0, 0); static MYSQL_SYSVAR_UINT(checkpointing_period, tokudb_checkpointing_period, 0, "TokuDB Checkpointing period", NULL, NULL, 60, 0, ~0L, 0); static MYSQL_SYSVAR_BOOL(prelock_empty, tokudb_prelock_empty, 0, "Tokudb Prelock Empty Table", NULL, NULL, TRUE); static MYSQL_SYSVAR_UINT(write_status_frequency, tokudb_write_status_frequency, 0, "TokuDB frequency that show processlist updates status of writes", NULL, NULL, 1000, 0, ~0L, 0); static MYSQL_SYSVAR_UINT(read_status_frequency, tokudb_read_status_frequency, 0, "TokuDB frequency that show processlist updates status of reads", NULL, NULL, 10000, 0, ~0L, 0); #if 0 static MYSQL_SYSVAR_ULONG(cache_parts, tokudb_cache_parts, PLUGIN_VAR_READONLY, "Sets TokuDB set_cache_parts", NULL, NULL, 0, 0, ~0L, 0); // this is really a u_int32_t // ? use MYSQL_SYSVAR_SET static MYSQL_SYSVAR_UINT(env_flags, tokudb_env_flags, PLUGIN_VAR_READONLY, "Sets TokuDB env_flags", NULL, NULL, DB_LOG_AUTOREMOVE, 0, ~0, 0); static MYSQL_SYSVAR_STR(home, tokudb_home, PLUGIN_VAR_READONLY, "Sets TokuDB env->open home", NULL, NULL, NULL); // this is really a u_int32_t //? use MYSQL_SYSVAR_SET // this looks to be unused static MYSQL_SYSVAR_LONG(lock_scan_time, tokudb_lock_scan_time, PLUGIN_VAR_READONLY, "Tokudb Lock Scan Time (UNUSED)", NULL, NULL, 0, 0, ~0L, 0); // this is really a u_int32_t //? use MYSQL_SYSVAR_ENUM static MYSQL_SYSVAR_UINT(lock_type, tokudb_lock_type, PLUGIN_VAR_READONLY, "Sets set_lk_detect", NULL, NULL, DB_LOCK_DEFAULT, 0, ~0, 0); static MYSQL_SYSVAR_ULONG(log_buffer_size, tokudb_log_buffer_size, PLUGIN_VAR_READONLY, "Tokudb Log Buffer Size", NULL, NULL, 0, 0, ~0L, 0); static MYSQL_SYSVAR_ULONG(region_size, tokudb_region_size, PLUGIN_VAR_READONLY, "Tokudb Region Size", NULL, NULL, 128 * 1024, 0, ~0L, 0); static MYSQL_SYSVAR_BOOL(shared_data, tokudb_shared_data, PLUGIN_VAR_READONLY, "Tokudb Shared Data", NULL, NULL, FALSE); static MYSQL_SYSVAR_STR(tmpdir, tokudb_tmpdir, PLUGIN_VAR_READONLY, "Tokudb Tmp Dir", NULL, NULL, NULL); #endif static struct st_mysql_sys_var *tokudb_system_variables[] = { MYSQL_SYSVAR(cache_size), MYSQL_SYSVAR(max_lock), MYSQL_SYSVAR(data_dir), MYSQL_SYSVAR(log_dir), MYSQL_SYSVAR(debug), MYSQL_SYSVAR(commit_sync), MYSQL_SYSVAR(version), MYSQL_SYSVAR(init_flags), MYSQL_SYSVAR(checkpointing_period), MYSQL_SYSVAR(prelock_empty), MYSQL_SYSVAR(write_status_frequency), MYSQL_SYSVAR(read_status_frequency), #if 0 MYSQL_SYSVAR(cache_parts), MYSQL_SYSVAR(env_flags), MYSQL_SYSVAR(home), MYSQL_SYSVAR(lock_scan_time), MYSQL_SYSVAR(lock_type), MYSQL_SYSVAR(log_buffer_size), MYSQL_SYSVAR(region_size), MYSQL_SYSVAR(shared_data), MYSQL_SYSVAR(tmpdir), #endif NULL }; mysql_declare_plugin(tokudb) { MYSQL_STORAGE_ENGINE_PLUGIN, &storage_engine_structure, "TokuDB", "Tokutek Inc", "Tokutek TokuDB Storage Engine", PLUGIN_LICENSE_GPL, tokudb_init_func, /* plugin init */ tokudb_done_func, /* plugin deinit */ 0x0210, /* 2.1.0 */ NULL, /* status variables */ tokudb_system_variables, /* system variables */ NULL /* config options */ } mysql_declare_plugin_end;