Commit 7cbe15cd authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

add commit_sync thread variable. closes #525

git-svn-id: file:///svn/mysql/tokudb-engine/src@2957 c7de825b-a66e-492c-adef-691d508d4ae1
parent 197e8ffe
...@@ -86,20 +86,10 @@ static DB_ENV *db_env; ...@@ -86,20 +86,10 @@ static DB_ENV *db_env;
static const char tokudb_hton_name[] = "TokuDB"; static const char tokudb_hton_name[] = "TokuDB";
static const int tokudb_hton_name_length = sizeof(tokudb_hton_name) - 1; static const int tokudb_hton_name_length = sizeof(tokudb_hton_name) - 1;
#if 0 // QQQ do we need this // thread variables
const char *tokudb_lock_names[] = { "DEFAULT", "OLDEST", "RANDOM", "YOUNGEST", "EXPIRE", "MAXLOCKS",
"MAXWRITE", "MINLOCKS", "MINWRITE", 0
};
u_int32_t tokudb_lock_types[] = { DB_LOCK_DEFAULT, DB_LOCK_OLDEST, DB_LOCK_RANDOM, DB_LOCK_YOUNGEST,
DB_LOCK_EXPIRE, DB_LOCK_MAXLOCKS, DB_LOCK_MAXWRITE, DB_LOCK_MINLOCKS,
DB_LOCK_MINWRITE
};
TYPELIB tokudb_lock_typelib = { array_elements(tokudb_lock_names) - 1, "", static MYSQL_THDVAR_BOOL(commit_sync, PLUGIN_VAR_THDLOCAL, "sync on txn commit",
tokudb_lock_names, NULL /* check */ NULL, /* update */ NULL, /* default*/ TRUE);
};
#endif
static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer); static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer);
static void tokudb_cleanup_log_files(void); static void tokudb_cleanup_log_files(void);
...@@ -419,10 +409,6 @@ int tokudb_end(handlerton * hton, ha_panic_function type) { ...@@ -419,10 +409,6 @@ int tokudb_end(handlerton * hton, ha_panic_function type) {
tokudb_cleanup_log_files(); tokudb_cleanup_log_files();
error = db_env->close(db_env, 0); // Error is logged error = db_env->close(db_env, 0); // Error is logged
db_env = 0; db_env = 0;
#if 0 // avoid a double mutex destroy during shutdown
hash_free(&tokudb_open_tables);
pthread_mutex_destroy(&tokudb_mutex);
#endif
} }
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -450,11 +436,12 @@ bool tokudb_flush_logs(handlerton * hton) { ...@@ -450,11 +436,12 @@ bool tokudb_flush_logs(handlerton * hton) {
static int tokudb_commit(handlerton * hton, THD * thd, bool all) { static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
DBUG_ENTER("tokudb_commit"); DBUG_ENTER("tokudb_commit");
DBUG_PRINT("trans", ("ending transaction %s", all ? "all" : "stmt")); DBUG_PRINT("trans", ("ending transaction %s", all ? "all" : "stmt"));
u_int32_t syncflag = THDVAR(thd, commit_sync) ? 0 : DB_TXN_NOSYNC;
tokudb_trx_data *trx = (tokudb_trx_data *) thd->ha_data[hton->slot]; tokudb_trx_data *trx = (tokudb_trx_data *) thd->ha_data[hton->slot];
DB_TXN **txn = all ? &trx->all : &trx->stmt; DB_TXN **txn = all ? &trx->all : &trx->stmt;
int error = 0; int error = 0;
if (*txn) { if (*txn) {
error = (*txn)->commit(*txn, 0); error = (*txn)->commit(*txn, syncflag);
if (*txn == trx->sp_level) if (*txn == trx->sp_level)
trx->sp_level = 0; trx->sp_level = 0;
*txn = 0; *txn = 0;
...@@ -2112,7 +2099,6 @@ static int create_sub_table(const char *table_name, const char *sub_name, DBTYPE ...@@ -2112,7 +2099,6 @@ static int create_sub_table(const char *table_name, const char *sub_name, DBTYPE
DBUG_RETURN(error); DBUG_RETURN(error);
} }
static int mkdirpath(char *name, mode_t mode) { static int mkdirpath(char *name, mode_t mode) {
int r = mkdir(name, mode); int r = mkdir(name, mode);
if (r == -1 && errno == ENOENT) { if (r == -1 && errno == ENOENT) {
...@@ -2129,6 +2115,38 @@ static int mkdirpath(char *name, mode_t mode) { ...@@ -2129,6 +2115,38 @@ static int mkdirpath(char *name, mode_t mode) {
return r; return r;
} }
#include <dirent.h>
static int rmall(const char *dname) {
int error = 0;
DIR *d = opendir(dname);
if (d) {
struct dirent *dirent;
while ((dirent = readdir(d)) != 0) {
if (0 == strcmp(dirent->d_name, ".") || 0 == strcmp(dirent->d_name, ".."))
continue;
char fname[strlen(dname) + 1 + strlen(dirent->d_name) + 1];
sprintf(fname, "%s/%s", dname, dirent->d_name);
if (dirent->d_type == DT_DIR) {
error = rmall(fname);
} else
error = unlink(fname);
if (error != 0) {
error = errno;
break;
}
}
closedir(d);
if (error == 0) {
error = rmdir(dname);
if (error != 0)
error = errno;
}
} else
error = errno;
return error;
}
int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_info) { int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_info) {
DBUG_ENTER("ha_tokudb::create"); DBUG_ENTER("ha_tokudb::create");
char name_buff[FN_REFLEN]; char name_buff[FN_REFLEN];
...@@ -2141,7 +2159,9 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in ...@@ -2141,7 +2159,9 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
if (error != 0) { if (error != 0) {
DBUG_RETURN(errno); DBUG_RETURN(errno);
} }
// QQQ rmall if the create fails
make_name(newname, name, "main"); make_name(newname, name, "main");
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME); fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
...@@ -2184,37 +2204,6 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in ...@@ -2184,37 +2204,6 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
DBUG_RETURN(error); DBUG_RETURN(error);
} }
#include <dirent.h>
static int rmall(const char *dname) {
int error = 0;
DIR *d = opendir(dname);
if (d) {
struct dirent *dirent;
while ((dirent = readdir(d)) != 0) {
if (0 == strcmp(dirent->d_name, ".") || 0 == strcmp(dirent->d_name, ".."))
continue;
char fname[strlen(dname) + 1 + strlen(dirent->d_name) + 1];
sprintf(fname, "%s/%s", dname, dirent->d_name);
if (dirent->d_type == DT_DIR) {
error = rmall(fname);
} else
error = unlink(fname);
if (error != 0) {
error = errno;
break;
}
}
closedir(d);
if (error == 0) {
error = rmdir(dname);
if (error != 0)
error = errno;
}
} else
error = errno;
return error;
}
int ha_tokudb::delete_table(const char *name) { int ha_tokudb::delete_table(const char *name) {
DBUG_ENTER("ha_tokudb::delete_table"); DBUG_ENTER("ha_tokudb::delete_table");
...@@ -2338,64 +2327,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range * start_key, key_range ...@@ -2338,64 +2327,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range * start_key, key_range
#endif #endif
} }
void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulonglong nb_desired_values, ulonglong * first_value, ulonglong * nb_reserved_values) { void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulonglong nb_desired_values, ulonglong * first_value, ulonglong * nb_reserved_values) {
#if 0
/* Ideally in case of real error (not "empty table") nr should be ~ULL(0) */
ulonglong nr = 1; // Default if error or new key
int error;
(void) ha_tokudb::extra(HA_EXTRA_KEYREAD);
/* Set 'active_index' */
ha_tokudb::index_init(table_share->next_number_index, 0);
if (!table_share->next_number_key_offset) { // Autoincrement at key-start
error = ha_tokudb::index_last(table->record[1]);
/* has taken read lock on page of max key so reserves to infinite */
*nb_reserved_values = ULONGLONG_MAX;
} else {
/*
MySQL needs to call us for next row: assume we are inserting ("a",null)
here, we return 3, and next this statement will want to insert ("b",null):
there is no reason why ("b",3+1) would be the good row to insert: maybe it
already exists, maybe 3+1 is too large...
*/
*nb_reserved_values = 1;
DBT row, old_key;
bzero((void *) &row, sizeof(row));
KEY *key_info = &table->key_info[active_index];
/* Reading next available number for a sub key */
ha_tokudb::create_key(&last_key, active_index, key_buff, table->record[0], table_share->next_number_key_offset);
/* Store for compare */
memcpy(old_key.data = key_buff2, key_buff, (old_key.size = last_key.size));
error = 1;
{
/* Modify the compare so that we will find the next key */
assert(0);
key_info->handler.bdb_return_if_eq = 1;
/* QQQ We lock the next key as the new key will probl. be on the same page */
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE | DB_RMW);
key_info->handler.bdb_return_if_eq = 0;
if (!error || error == DB_NOTFOUND) {
/*
Now search go one step back and then we should have found the
biggest key with the given prefix
*/
error = 1;
if (!cursor->c_get(cursor, &last_key, &row, DB_PREV | DB_RMW) && !tokudb_cmp_packed_key(key_file[active_index], &old_key, &last_key)) {
error = 0; // Found value
unpack_key(table->record[1], &last_key, active_index);
}
}
}
}
if (!error)
nr = (ulonglong) table->next_number_field->val_int_offset(table->s->rec_buff_length) + 1;
ha_tokudb::index_end();
(void) ha_tokudb::extra(HA_EXTRA_NO_KEYREAD);
*first_value = nr;
#else
ulonglong nr; ulonglong nr;
pthread_mutex_lock(&share->mutex); pthread_mutex_lock(&share->mutex);
...@@ -2407,13 +2339,11 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl ...@@ -2407,13 +2339,11 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl
if (tokudb_debug) printf("%s:%d:init auto increment:%lld\n", __FILE__, __LINE__, share->last_auto_increment); if (tokudb_debug) printf("%s:%d:init auto increment:%lld\n", __FILE__, __LINE__, share->last_auto_increment);
} }
} }
share->last_auto_increment += increment; nr = share->last_auto_increment + increment;
nr = share->last_auto_increment; share->last_auto_increment = nr + nb_desired_values - 1;
share->last_auto_increment += nb_desired_values;
pthread_mutex_unlock(&share->mutex); pthread_mutex_unlock(&share->mutex);
*first_value = nr; *first_value = nr;
*nb_reserved_values = ULONGLONG_MAX; *nb_reserved_values = ULONGLONG_MAX;
#endif
} }
void ha_tokudb::print_error(int error, myf errflag) { void ha_tokudb::print_error(int error, myf errflag) {
...@@ -2479,6 +2409,7 @@ int ha_tokudb::check(THD * thd, HA_CHECK_OPT * check_opt) { ...@@ -2479,6 +2409,7 @@ int ha_tokudb::check(THD * thd, HA_CHECK_OPT * check_opt) {
} }
#endif #endif
struct st_mysql_storage_engine storage_engine_structure = { MYSQL_HANDLERTON_INTERFACE_VERSION }; struct st_mysql_storage_engine storage_engine_structure = { MYSQL_HANDLERTON_INTERFACE_VERSION };
// options flags // options flags
...@@ -2491,6 +2422,9 @@ struct st_mysql_storage_engine storage_engine_structure = { MYSQL_HANDLERTON_INT ...@@ -2491,6 +2422,9 @@ struct st_mysql_storage_engine storage_engine_structure = { MYSQL_HANDLERTON_INT
// PLUGIN_VAR_OPCMDARG Argument optional for cmd line // PLUGIN_VAR_OPCMDARG Argument optional for cmd line
// PLUGIN_VAR_MEMALLOC String needs memory allocated // PLUGIN_VAR_MEMALLOC String needs memory allocated
// system variables
static MYSQL_SYSVAR_ULONGLONG(cache_size, tokudb_cache_size, PLUGIN_VAR_READONLY, "TokuDB cache table size", NULL, NULL, 0, 0, ~0LL, 0); static MYSQL_SYSVAR_ULONGLONG(cache_size, tokudb_cache_size, PLUGIN_VAR_READONLY, "TokuDB cache table size", NULL, NULL, 0, 0, ~0LL, 0);
static MYSQL_SYSVAR_ULONG(max_lock, tokudb_max_lock, PLUGIN_VAR_READONLY, "TokuDB Max Locks", NULL, NULL, 8 * 1024, 0, ~0L, 0); static MYSQL_SYSVAR_ULONG(max_lock, tokudb_max_lock, PLUGIN_VAR_READONLY, "TokuDB Max Locks", NULL, NULL, 8 * 1024, 0, ~0L, 0);
...@@ -2537,6 +2471,7 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = { ...@@ -2537,6 +2471,7 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = {
MYSQL_SYSVAR(data_dir), MYSQL_SYSVAR(data_dir),
MYSQL_SYSVAR(log_dir), MYSQL_SYSVAR(log_dir),
MYSQL_SYSVAR(debug), MYSQL_SYSVAR(debug),
MYSQL_SYSVAR(commit_sync),
#if 0 #if 0
MYSQL_SYSVAR(cache_parts), MYSQL_SYSVAR(cache_parts),
MYSQL_SYSVAR(env_flags), MYSQL_SYSVAR(env_flags),
...@@ -2557,7 +2492,7 @@ mysql_declare_plugin(tokudb) { ...@@ -2557,7 +2492,7 @@ mysql_declare_plugin(tokudb) {
&storage_engine_structure, &storage_engine_structure,
"TokuDB", "TokuDB",
"Tokutek Inc", "Tokutek Inc",
"Fractal trees with transactions, row level locks", "Fractal trees, transactions, row level locks",
PLUGIN_LICENSE_PROPRIETARY, /* QQQ license? */ PLUGIN_LICENSE_PROPRIETARY, /* QQQ license? */
tokudb_init_func, /* plugin init */ tokudb_init_func, /* plugin init */
tokudb_done_func, /* plugin deinit */ tokudb_done_func, /* plugin deinit */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment