Commit 91d570d0 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:2356], make lock wait on reads user controlled

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@17986 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3a57fdf9
...@@ -1115,6 +1115,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t ...@@ -1115,6 +1115,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
num_blob_bytes = 0; num_blob_bytes = 0;
delay_updating_ai_metadata = false; delay_updating_ai_metadata = false;
ai_metadata_update_required = false; ai_metadata_update_required = false;
read_lock_wait_time = 4000;
bzero(mult_key_buff, sizeof(mult_key_buff)); bzero(mult_key_buff, sizeof(mult_key_buff));
bzero(mult_rec_buff, sizeof(mult_rec_buff)); bzero(mult_rec_buff, sizeof(mult_rec_buff));
bzero(mult_key_dbt, sizeof(mult_key_dbt)); bzero(mult_key_dbt, sizeof(mult_key_dbt));
...@@ -3630,7 +3631,7 @@ void ha_tokudb::column_bitmaps_signal() { ...@@ -3630,7 +3631,7 @@ void ha_tokudb::column_bitmaps_signal() {
int ha_tokudb::prepare_index_scan() { int ha_tokudb::prepare_index_scan() {
int error; int error;
DB* db = share->key_file[active_index]; DB* db = share->key_file[active_index];
lockretry { lockretryN(read_lock_wait_time){
error = db->pre_acquire_read_lock( error = db->pre_acquire_read_lock(
db, db,
transaction, transaction,
...@@ -3661,7 +3662,7 @@ int ha_tokudb::prepare_index_key_scan( const uchar * key, uint key_len ) { ...@@ -3661,7 +3662,7 @@ int ha_tokudb::prepare_index_key_scan( const uchar * key, uint key_len ) {
pack_key(&start_key, active_index, key_buff, key, key_len, COL_NEG_INF); pack_key(&start_key, active_index, key_buff, key, key_len, COL_NEG_INF);
pack_key(&end_key, active_index, key_buff2, key, key_len, COL_POS_INF); pack_key(&end_key, active_index, key_buff2, key, key_len, COL_POS_INF);
lockretry { lockretryN(read_lock_wait_time){
error = share->key_file[active_index]->pre_acquire_read_lock( error = share->key_file[active_index]->pre_acquire_read_lock(
share->key_file[active_index], share->key_file[active_index],
transaction, transaction,
...@@ -3709,6 +3710,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) { ...@@ -3709,6 +3710,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
int error; int error;
THD* thd = ha_thd(); THD* thd = ha_thd();
DBUG_PRINT("enter", ("table: '%s' key: %d", table_share->table_name.str, keynr)); DBUG_PRINT("enter", ("table: '%s' key: %d", table_share->table_name.str, keynr));
read_lock_wait_time = get_read_lock_wait_time(ha_thd());
/* /*
Under some very rare conditions (like full joins) we may already have Under some very rare conditions (like full joins) we may already have
...@@ -3912,7 +3914,7 @@ int ha_tokudb::read_full_row(uchar * buf) { ...@@ -3912,7 +3914,7 @@ int ha_tokudb::read_full_row(uchar * buf) {
// //
// assumes key is stored in this->last_key // assumes key is stored in this->last_key
// //
lockretry { lockretryN(read_lock_wait_time){
error = share->file->getf_set( error = share->file->getf_set(
share->file, share->file,
transaction, transaction,
...@@ -3964,7 +3966,7 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) { ...@@ -3964,7 +3966,7 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
pack_key(&curr_key, active_index, key_buff2, key, keylen, COL_ZERO); pack_key(&curr_key, active_index, key_buff2, key, keylen, COL_ZERO);
flags = SET_READ_FLAG(0); flags = SET_READ_FLAG(0);
lockretry { lockretryN(read_lock_wait_time){
error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info);
lockretry_wait; lockretry_wait;
} }
...@@ -4039,22 +4041,34 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -4039,22 +4041,34 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
case HA_READ_KEY_EXACT: /* Find first record else error */ case HA_READ_KEY_EXACT: /* Find first record else error */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF);
ir_info.orig_key = &lookup_key; ir_info.orig_key = &lookup_key;
lockretryN(read_lock_wait_time){
error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK, &ir_info); error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK, &ir_info);
lockretry_wait;
}
if (ir_info.cmp) { if (ir_info.cmp) {
error = DB_NOTFOUND; error = DB_NOTFOUND;
} }
break; break;
case HA_READ_AFTER_KEY: /* Find next rec. after key-record */ case HA_READ_AFTER_KEY: /* Find next rec. after key-record */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_POS_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_POS_INF);
lockretryN(read_lock_wait_time){
error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_CALLBACK, &info);
lockretry_wait;
}
break; break;
case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */ case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF);
lockretryN(read_lock_wait_time){
error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_CALLBACK, &info);
lockretry_wait;
}
break; break;
case HA_READ_KEY_OR_NEXT: /* Record or next record */ case HA_READ_KEY_OR_NEXT: /* Record or next record */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF);
lockretryN(read_lock_wait_time){
error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_CALLBACK, &info);
lockretry_wait;
}
break; break;
// //
// This case does not seem to ever be used, it is ok for it to be slow // This case does not seem to ever be used, it is ok for it to be slow
...@@ -4062,7 +4076,10 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -4062,7 +4076,10 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
case HA_READ_KEY_OR_PREV: /* Record or previous */ case HA_READ_KEY_OR_PREV: /* Record or previous */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_NEG_INF);
ir_info.orig_key = &lookup_key; ir_info.orig_key = &lookup_key;
lockretryN(read_lock_wait_time){
error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK, &ir_info); error = cursor->c_getf_set_range(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK, &ir_info);
lockretry_wait;
}
if (error == DB_NOTFOUND) { if (error == DB_NOTFOUND) {
error = cursor->c_getf_last(cursor, flags, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_last(cursor, flags, SMART_DBT_CALLBACK, &info);
} }
...@@ -4072,12 +4089,18 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -4072,12 +4089,18 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
break; break;
case HA_READ_PREFIX_LAST_OR_PREV: /* Last or prev key with the same prefix */ case HA_READ_PREFIX_LAST_OR_PREV: /* Last or prev key with the same prefix */
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_POS_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_POS_INF);
lockretryN(read_lock_wait_time){
error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_CALLBACK, &info);
lockretry_wait;
}
break; break;
case HA_READ_PREFIX_LAST: case HA_READ_PREFIX_LAST:
pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_POS_INF); pack_key(&lookup_key, active_index, key_buff3, key, key_len, COL_POS_INF);
ir_info.orig_key = &lookup_key; ir_info.orig_key = &lookup_key;
lockretryN(read_lock_wait_time){
error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK, &ir_info); error = cursor->c_getf_set_range_reverse(cursor, flags, &lookup_key, SMART_DBT_IR_CALLBACK, &ir_info);
lockretry_wait;
}
if (ir_info.cmp) { if (ir_info.cmp) {
error = DB_NOTFOUND; error = DB_NOTFOUND;
} }
...@@ -4128,7 +4151,7 @@ int ha_tokudb::index_next(uchar * buf) { ...@@ -4128,7 +4151,7 @@ int ha_tokudb::index_next(uchar * buf) {
info.buf = buf; info.buf = buf;
info.keynr = active_index; info.keynr = active_index;
lockretry { lockretryN(read_lock_wait_time){
error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info);
lockretry_wait; lockretry_wait;
} }
...@@ -4176,7 +4199,7 @@ int ha_tokudb::index_prev(uchar * buf) { ...@@ -4176,7 +4199,7 @@ int ha_tokudb::index_prev(uchar * buf) {
info.buf = buf; info.buf = buf;
info.keynr = active_index; info.keynr = active_index;
lockretry { lockretryN(read_lock_wait_time){
error = cursor->c_getf_prev(cursor, flags, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_prev(cursor, flags, SMART_DBT_CALLBACK, &info);
lockretry_wait; lockretry_wait;
} }
...@@ -4220,7 +4243,7 @@ int ha_tokudb::index_first(uchar * buf) { ...@@ -4220,7 +4243,7 @@ int ha_tokudb::index_first(uchar * buf) {
info.buf = buf; info.buf = buf;
info.keynr = active_index; info.keynr = active_index;
lockretry { lockretryN(read_lock_wait_time){
error = cursor->c_getf_first(cursor, flags, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_first(cursor, flags, SMART_DBT_CALLBACK, &info);
lockretry_wait; lockretry_wait;
} }
...@@ -4264,7 +4287,7 @@ int ha_tokudb::index_last(uchar * buf) { ...@@ -4264,7 +4287,7 @@ int ha_tokudb::index_last(uchar * buf) {
info.buf = buf; info.buf = buf;
info.keynr = active_index; info.keynr = active_index;
lockretry { lockretryN(read_lock_wait_time){
error = cursor->c_getf_last(cursor, flags, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_last(cursor, flags, SMART_DBT_CALLBACK, &info);
lockretry_wait; lockretry_wait;
} }
...@@ -4296,10 +4319,11 @@ cleanup: ...@@ -4296,10 +4319,11 @@ cleanup:
int ha_tokudb::rnd_init(bool scan) { int ha_tokudb::rnd_init(bool scan) {
TOKUDB_DBUG_ENTER("ha_tokudb::rnd_init"); TOKUDB_DBUG_ENTER("ha_tokudb::rnd_init");
int error; int error;
read_lock_wait_time = get_read_lock_wait_time(ha_thd());
range_lock_grabbed = false; range_lock_grabbed = false;
if (scan) { if (scan) {
DB* db = share->key_file[primary_key]; DB* db = share->key_file[primary_key];
lockretry { lockretryN(read_lock_wait_time){
error = db->pre_acquire_read_lock(db, transaction, db->dbt_neg_infty(), NULL, db->dbt_pos_infty(), NULL); error = db->pre_acquire_read_lock(db, transaction, db->dbt_neg_infty(), NULL, db->dbt_pos_infty(), NULL);
lockretry_wait; lockretry_wait;
} }
...@@ -4359,7 +4383,7 @@ int ha_tokudb::rnd_next(uchar * buf) { ...@@ -4359,7 +4383,7 @@ int ha_tokudb::rnd_next(uchar * buf) {
info.buf = buf; info.buf = buf;
info.keynr = primary_key; info.keynr = primary_key;
lockretry { lockretryN(read_lock_wait_time){
error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info); error = cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info);
lockretry_wait; lockretry_wait;
} }
...@@ -4438,6 +4462,7 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) { ...@@ -4438,6 +4462,7 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
struct smart_dbt_info info; struct smart_dbt_info info;
bool old_unpack_entire_row = unpack_entire_row; bool old_unpack_entire_row = unpack_entire_row;
DBT* key = get_pos(&db_pos, pos); DBT* key = get_pos(&db_pos, pos);
read_lock_wait_time = get_read_lock_wait_time(ha_thd());
unpack_entire_row = true; unpack_entire_row = true;
statistic_increment(table->in_use->status_var.ha_read_rnd_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_rnd_count, &LOCK_status);
...@@ -4447,7 +4472,7 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) { ...@@ -4447,7 +4472,7 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
info.buf = buf; info.buf = buf;
info.keynr = primary_key; info.keynr = primary_key;
lockretry { lockretryN(read_lock_wait_time) {
error = share->file->getf_set(share->file, transaction, 0, key, smart_dbt_callback_rowread_ptquery, &info); error = share->file->getf_set(share->file, transaction, 0, key, smart_dbt_callback_rowread_ptquery, &info);
lockretry_wait; lockretry_wait;
} }
...@@ -4508,7 +4533,7 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k ...@@ -4508,7 +4533,7 @@ int ha_tokudb::prelock_range( const key_range *start_key, const key_range *end_k
end_dbt_data = share->key_file[active_index]->dbt_pos_infty(); end_dbt_data = share->key_file[active_index]->dbt_pos_infty();
} }
lockretry { lockretryN(read_lock_wait_time){
error = share->key_file[active_index]->pre_acquire_read_lock( error = share->key_file[active_index]->pre_acquire_read_lock(
share->key_file[active_index], share->key_file[active_index],
transaction, transaction,
...@@ -5946,6 +5971,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -5946,6 +5971,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
// //
char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound. char status_msg[MAX_ALIAS_NAME + 200]; //buffer of 200 should be a good upper bound.
ulonglong num_processed = 0; //variable that stores number of elements inserted thus far ulonglong num_processed = 0; //variable that stores number of elements inserted thus far
read_lock_wait_time = get_read_lock_wait_time(ha_thd());
thd_proc_info(thd, "Adding indexes"); thd_proc_info(thd, "Adding indexes");
tmp_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME)); tmp_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
...@@ -6032,7 +6058,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -6032,7 +6058,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
// first a global read lock on the main DB, because // first a global read lock on the main DB, because
// we intend to scan the entire thing // we intend to scan the entire thing
// //
lockretry { lockretryN(read_lock_wait_time){
error = share->file->pre_acquire_read_lock( error = share->file->pre_acquire_read_lock(
share->file, share->file,
txn, txn,
......
...@@ -244,6 +244,8 @@ private: ...@@ -244,6 +244,8 @@ private:
// //
char write_status_msg[200]; //buffer of 200 should be a good upper bound. char write_status_msg[200]; //buffer of 200 should be a good upper bound.
ulonglong read_lock_wait_time;
bool fix_rec_buff_for_blob(ulong length); bool fix_rec_buff_for_blob(ulong length);
void fix_mult_rec_buff(); void fix_mult_rec_buff();
uchar current_ident[TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH]; uchar current_ident[TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH];
......
...@@ -62,6 +62,18 @@ static MYSQL_THDVAR_ULONGLONG(write_lock_wait, ...@@ -62,6 +62,18 @@ static MYSQL_THDVAR_ULONGLONG(write_lock_wait,
1 // blocksize 1 // blocksize
); );
static MYSQL_THDVAR_ULONGLONG(read_lock_wait,
0,
"time waiting for read lock",
NULL,
NULL,
4000, // default
0, // min?
ULONGLONG_MAX, // max
1 // blocksize
);
static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer); static void tokudb_print_error(const DB_ENV * db_env, const char *db_errpfx, const char *buffer);
static void tokudb_cleanup_log_files(void); static void tokudb_cleanup_log_files(void);
static int tokudb_end(handlerton * hton, ha_panic_function type); static int tokudb_end(handlerton * hton, ha_panic_function type);
...@@ -446,6 +458,12 @@ ulonglong get_write_lock_wait_time (THD* thd) { ...@@ -446,6 +458,12 @@ ulonglong get_write_lock_wait_time (THD* thd) {
return (ret_val == 0) ? ULONGLONG_MAX : ret_val; return (ret_val == 0) ? ULONGLONG_MAX : ret_val;
} }
ulonglong get_read_lock_wait_time (THD* thd) {
ulonglong ret_val = THDVAR(thd, read_lock_wait);
return (ret_val == 0) ? ULONGLONG_MAX : ret_val;
}
static int tokudb_commit(handlerton * hton, THD * thd, bool all) { static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
TOKUDB_DBUG_ENTER("tokudb_commit"); TOKUDB_DBUG_ENTER("tokudb_commit");
DBUG_PRINT("trans", ("ending transaction %s", all ? "all" : "stmt")); DBUG_PRINT("trans", ("ending transaction %s", all ? "all" : "stmt"));
...@@ -1032,6 +1050,7 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = { ...@@ -1032,6 +1050,7 @@ static struct st_mysql_sys_var *tokudb_system_variables[] = {
MYSQL_SYSVAR(debug), MYSQL_SYSVAR(debug),
MYSQL_SYSVAR(commit_sync), MYSQL_SYSVAR(commit_sync),
MYSQL_SYSVAR(write_lock_wait), MYSQL_SYSVAR(write_lock_wait),
MYSQL_SYSVAR(read_lock_wait),
MYSQL_SYSVAR(version), MYSQL_SYSVAR(version),
MYSQL_SYSVAR(init_flags), MYSQL_SYSVAR(init_flags),
MYSQL_SYSVAR(checkpointing_period), MYSQL_SYSVAR(checkpointing_period),
......
...@@ -12,6 +12,7 @@ extern DB *metadata_db; ...@@ -12,6 +12,7 @@ extern DB *metadata_db;
// thread variables // thread variables
ulonglong get_write_lock_wait_time (THD* thd); ulonglong get_write_lock_wait_time (THD* thd);
ulonglong get_read_lock_wait_time (THD* thd);
extern HASH tokudb_open_tables; extern HASH tokudb_open_tables;
extern pthread_mutex_t tokudb_mutex; extern pthread_mutex_t tokudb_mutex;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment