Commit b65e6e3d authored by Rich Prohaska's avatar Rich Prohaska

#141 redo table open and close locking to avoid table opening pileup

parent 7fe66e0c
......@@ -205,14 +205,39 @@ exit:
return error;
}
/** @brief
Simple lock controls. The "share" it creates is a structure we will
pass to each tokudb handler. Do you have to have one of these? Well, you have
pieces that are used for locking, and they are needed to function.
MUST have tokudb_mutex locked on input
static void free_key_and_col_info (KEY_AND_COL_INFO* kc_info) {
for (uint i = 0; i < MAX_KEY+1; i++) {
bitmap_free(&kc_info->key_filters[i]);
}
*/
for (uint i = 0; i < MAX_KEY+1; i++) {
tokudb_my_free(kc_info->cp_info[i]);
kc_info->cp_info[i] = NULL; // 3144
}
tokudb_my_free(kc_info->field_lengths);
tokudb_my_free(kc_info->length_bytes);
tokudb_my_free(kc_info->blob_fields);
}
void TOKUDB_SHARE::init(void) {
use_count = 0;
thr_lock_init(&lock);
tokudb_pthread_mutex_init(&mutex, MY_MUTEX_INIT_FAST);
my_rwlock_init(&num_DBs_lock, 0);
tokudb_pthread_cond_init(&m_openclose_cond, NULL);
m_state = CLOSED;
}
void TOKUDB_SHARE::destroy(void) {
assert(m_state == CLOSED);
thr_lock_delete(&lock);
tokudb_pthread_mutex_destroy(&mutex);
rwlock_destroy(&num_DBs_lock);
tokudb_pthread_cond_destroy(&m_openclose_cond);
}
// MUST have tokudb_mutex locked on input
static TOKUDB_SHARE *get_share(const char *table_name, TABLE_SHARE* table_share) {
TOKUDB_SHARE *share = NULL;
int error = 0;
......@@ -234,7 +259,8 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE_SHARE* table_share)
);
assert(share);
share->use_count = 0;
share->init();
share->table_name_length = length;
share->table_name = tmp_name;
strmov(share->table_name, table_name);
......@@ -244,54 +270,30 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE_SHARE* table_share)
goto exit;
}
memset((void *) share->key_file, 0, sizeof(share->key_file));
error = my_hash_insert(&tokudb_open_tables, (uchar *) share);
if (error) {
free_key_and_col_info(&share->kc_info);
goto exit;
}
thr_lock_init(&share->lock);
pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
my_rwlock_init(&share->num_DBs_lock, 0);
}
exit:
if (error) {
pthread_mutex_destroy(&share->mutex);
share->destroy();
tokudb_my_free((uchar *) share);
share = NULL;
}
return share;
}
static void free_key_and_col_info (KEY_AND_COL_INFO* kc_info) {
for (uint i = 0; i < MAX_KEY+1; i++) {
bitmap_free(&kc_info->key_filters[i]);
}
for (uint i = 0; i < MAX_KEY+1; i++) {
tokudb_my_free(kc_info->cp_info[i]);
kc_info->cp_info[i] = NULL; // 3144
}
tokudb_my_free(kc_info->field_lengths);
tokudb_my_free(kc_info->length_bytes);
tokudb_my_free(kc_info->blob_fields);
}
//
// MUST have tokudb_mutex locked on input
// bool mutex_is_locked specifies if share->mutex is locked
//
static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) {
static int free_share(TOKUDB_SHARE * share) {
int error, result = 0;
if (mutex_is_locked) {
pthread_mutex_unlock(&share->mutex);
}
tokudb_pthread_mutex_lock(&share->mutex);
DBUG_PRINT("info", ("share->use_count %u", share->use_count));
if (!--share->use_count) {
DBUG_PRINT("info", ("share->use_count %u", share->use_count));
share->m_state = TOKUDB_SHARE::CLOSING;
tokudb_pthread_mutex_unlock(&share->mutex);
//
// number of open DB's may not be equal to number of keys we have because add_index
......@@ -316,13 +318,25 @@ static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) {
error = tokudb::close_status(&share->status_block);
assert(error == 0);
my_hash_delete(&tokudb_open_tables, (uchar *) share);
thr_lock_delete(&share->lock);
pthread_mutex_destroy(&share->mutex);
rwlock_destroy(&share->num_DBs_lock);
tokudb_my_free((uchar *) share);
tokudb_pthread_mutex_lock(&tokudb_mutex);
tokudb_pthread_mutex_lock(&share->mutex);
share->m_state = TOKUDB_SHARE::CLOSED;
if (share->use_count > 0) {
tokudb_pthread_cond_broadcast(&share->m_openclose_cond);
tokudb_pthread_mutex_unlock(&share->mutex);
tokudb_pthread_mutex_unlock(&tokudb_mutex);
} else {
my_hash_delete(&tokudb_open_tables, (uchar *) share);
tokudb_pthread_mutex_unlock(&share->mutex);
tokudb_pthread_mutex_unlock(&tokudb_mutex);
share->destroy();
tokudb_my_free((uchar *) share);
}
} else {
tokudb_pthread_mutex_unlock(&share->mutex);
}
return result;
......@@ -588,7 +602,7 @@ smart_dbt_callback_ir_rowread(DBT const *key, DBT const *row, void *context) {
// Returns:
// The value of the auto increment column in record
//
ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar *record)
static ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar *record)
{
const uchar *key; /* Key */
ulonglong unsigned_autoinc = 0; /* Unsigned auto-increment */
......@@ -1710,8 +1724,6 @@ exit:
return error;
}
//
// Creates and opens a handle to a table which already exists in a tokudb
// database.
......@@ -1788,36 +1800,49 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
goto exit;
}
/* Init shared structure */
pthread_mutex_lock(&tokudb_mutex);
// lookup or create share
tokudb_pthread_mutex_lock(&tokudb_mutex);
share = get_share(name, table_share);
assert(share);
thr_lock_data_init(&share->lock, &lock, NULL);
/* Fill in shared structure, if needed */
pthread_mutex_lock(&share->mutex);
if (!share->use_count++) {
ret_val = initialize_share(
name,
mode
);
if (ret_val) {
free_share(share, true);
pthread_mutex_unlock(&tokudb_mutex);
goto exit;
tokudb_pthread_mutex_lock(&share->mutex);
tokudb_pthread_mutex_unlock(&tokudb_mutex);
share->use_count++;
while (share->m_state == TOKUDB_SHARE::OPENING || share->m_state == TOKUDB_SHARE::CLOSING) {
tokudb_pthread_cond_wait(&share->m_openclose_cond, &share->mutex);
}
if (share->m_state == TOKUDB_SHARE::CLOSED) {
share->m_state = TOKUDB_SHARE::OPENING;
tokudb_pthread_mutex_unlock(&share->mutex);
ret_val = initialize_share(name, mode);
tokudb_pthread_mutex_lock(&share->mutex);
if (ret_val == 0) {
share->m_state = TOKUDB_SHARE::OPENED;
} else {
share->m_state = TOKUDB_SHARE::ERROR;
share->m_error = ret_val;
}
tokudb_pthread_cond_broadcast(&share->m_openclose_cond);
}
if (share->m_state == TOKUDB_SHARE::ERROR) {
ret_val = share->m_error;
tokudb_pthread_mutex_unlock(&share->mutex);
free_share(share);
goto exit;
} else {
assert(share->m_state == TOKUDB_SHARE::OPENED);
tokudb_pthread_mutex_unlock(&share->mutex);
}
pthread_mutex_unlock(&share->mutex);
pthread_mutex_unlock(&tokudb_mutex);
ref_length = share->ref_length; // If second open
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
pthread_mutex_lock(&share->mutex);
TOKUDB_TRACE("tokudbopen:%p:share=%p:file=%p:table=%p:table->s=%p:%d\n",
this, share, share->file, table, table->s, share->use_count);
pthread_mutex_unlock(&share->mutex);
}
key_read = false;
......@@ -2128,9 +2153,7 @@ int ha_tokudb::__close() {
rec_update_buff = NULL;
alloc_ptr = NULL;
ha_tokudb::reset();
pthread_mutex_lock(&tokudb_mutex);
int retval = free_share(share, false);
pthread_mutex_unlock(&tokudb_mutex);
int retval = free_share(share);
TOKUDB_DBUG_RETURN(retval);
}
......@@ -2926,7 +2949,7 @@ DBT *ha_tokudb::pack_ext_key(
//
void ha_tokudb::init_hidden_prim_key_info() {
TOKUDB_DBUG_ENTER("ha_tokudb::init_prim_key_info");
pthread_mutex_lock(&share->mutex);
tokudb_pthread_mutex_lock(&share->mutex);
if (!(share->status & STATUS_PRIMARY_KEY_INIT)) {
int error = 0;
THD* thd = ha_thd();
......@@ -2966,7 +2989,7 @@ void ha_tokudb::init_hidden_prim_key_info() {
}
share->status |= STATUS_PRIMARY_KEY_INIT;
}
pthread_mutex_unlock(&share->mutex);
tokudb_pthread_mutex_unlock(&share->mutex);
DBUG_VOID_RETURN;
}
......@@ -3246,9 +3269,9 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
}
}
exit_try_table_lock:
pthread_mutex_lock(&share->mutex);
tokudb_pthread_mutex_lock(&share->mutex);
share->try_table_lock = false;
pthread_mutex_unlock(&share->mutex);
tokudb_pthread_mutex_unlock(&share->mutex);
}
DBUG_VOID_RETURN;
}
......@@ -3265,9 +3288,9 @@ int ha_tokudb::end_bulk_insert(bool abort) {
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
bool using_loader = (loader != NULL);
if (ai_metadata_update_required) {
pthread_mutex_lock(&share->mutex);
tokudb_pthread_mutex_lock(&share->mutex);
error = update_max_auto_inc(share->status_block, share->last_auto_increment);
pthread_mutex_unlock(&share->mutex);
tokudb_pthread_mutex_unlock(&share->mutex);
if (error) { goto cleanup; }
}
delay_updating_ai_metadata = false;
......@@ -3891,7 +3914,7 @@ int ha_tokudb::write_row(uchar * record) {
// of the auto inc field.
//
if (share->has_auto_inc && record == table->record[0]) {
pthread_mutex_lock(&share->mutex);
tokudb_pthread_mutex_lock(&share->mutex);
ulonglong curr_auto_inc = retrieve_auto_increment(
table->field[share->ai_field_index]->key_type(),
field_offset(table->field[share->ai_field_index], table),
......@@ -3906,7 +3929,7 @@ int ha_tokudb::write_row(uchar * record) {
update_max_auto_inc(share->status_block, share->last_auto_increment);
}
}
pthread_mutex_unlock(&share->mutex);
tokudb_pthread_mutex_unlock(&share->mutex);
}
//
......@@ -4084,7 +4107,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
// of the auto inc field.
//
if (share->has_auto_inc && new_row == table->record[0]) {
pthread_mutex_lock(&share->mutex);
tokudb_pthread_mutex_lock(&share->mutex);
ulonglong curr_auto_inc = retrieve_auto_increment(
table->field[share->ai_field_index]->key_type(),
field_offset(table->field[share->ai_field_index], table),
......@@ -4096,7 +4119,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
share->last_auto_increment = curr_auto_inc;
}
}
pthread_mutex_unlock(&share->mutex);
tokudb_pthread_mutex_unlock(&share->mutex);
}
//
......@@ -5377,6 +5400,8 @@ int ha_tokudb::index_prev(uchar * buf) {
TOKUDB_DBUG_RETURN(error);
}
volatile int tokudb_index_first_wait = 0;
//
// Reads the first row from the active index (cursor) into buf, and advances cursor
// Parameters:
......@@ -5398,6 +5423,8 @@ int ha_tokudb::index_first(uchar * buf) {
ha_statistic_increment(&SSV::ha_read_first_count);
while (tokudb_index_first_wait) sleep(1);
info.ha = this;
info.buf = buf;
info.keynr = tokudb_active_index;
......@@ -6125,7 +6152,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
transaction = trx->sub_sp_level;
}
else {
pthread_mutex_lock(&share->mutex);
tokudb_pthread_mutex_lock(&share->mutex);
// hate dealing with comparison of signed vs unsigned, so doing this
if (deleted_rows > added_rows && share->rows < (deleted_rows - added_rows)) {
share->rows = 0;
......@@ -6133,7 +6160,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
else {
share->rows += (added_rows - deleted_rows);
}
pthread_mutex_unlock(&share->mutex);
tokudb_pthread_mutex_unlock(&share->mutex);
added_rows = 0;
deleted_rows = 0;
share->rows_from_locked_table = 0;
......@@ -7443,7 +7470,7 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl
ulonglong nr;
bool over;
pthread_mutex_lock(&share->mutex);
tokudb_pthread_mutex_lock(&share->mutex);
if (share->auto_inc_create_value > share->last_auto_increment) {
nr = share->auto_inc_create_value;
......@@ -7472,7 +7499,7 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl
}
*first_value = nr;
*nb_reserved_values = nb_desired_values;
pthread_mutex_unlock(&share->mutex);
tokudb_pthread_mutex_unlock(&share->mutex);
DBUG_VOID_RETURN;
}
......@@ -7838,18 +7865,18 @@ int ha_tokudb::tokudb_add_index(
// We have an accurate row count, might as well update share->rows
//
if(!creating_hot_index) {
pthread_mutex_lock(&share->mutex);
tokudb_pthread_mutex_lock(&share->mutex);
share->rows = num_processed;
pthread_mutex_unlock(&share->mutex);
tokudb_pthread_mutex_unlock(&share->mutex);
}
//
// now write stuff to status.tokudb
//
pthread_mutex_lock(&share->mutex);
tokudb_pthread_mutex_lock(&share->mutex);
for (uint i = 0; i < num_of_keys; i++) {
write_key_name_to_status(share->status_block, key_info[i].name, txn);
}
pthread_mutex_unlock(&share->mutex);
tokudb_pthread_mutex_unlock(&share->mutex);
error = 0;
cleanup:
......
......@@ -126,6 +126,10 @@ typedef struct hot_optimize_context {
// and auto increment information.
//
class TOKUDB_SHARE {
public:
void init(void);
void destroy(void);
public:
char *table_name;
uint table_name_length, use_count;
......@@ -184,6 +188,10 @@ public:
bool replace_into_fast;
rw_lock_t num_DBs_lock;
uint32_t num_DBs;
pthread_cond_t m_openclose_cond;
enum { CLOSED, OPENING, OPENED, CLOSING, ERROR } m_state;
int m_error;
};
typedef struct st_filter_key_part_info {
......@@ -443,10 +451,7 @@ private:
int write_auto_inc_create(DB* db, ulonglong val, DB_TXN* txn);
void init_auto_increment();
bool can_replace_into_be_fast(TABLE_SHARE* table_share, KEY_AND_COL_INFO* kc_info, uint pk);
int initialize_share(
const char* name,
int mode
);
int initialize_share(const char* name, int mode);
void set_query_columns(uint keynr);
int prelock_range (const key_range *start_key, const key_range *end_key);
......@@ -599,10 +604,10 @@ public:
int get_status(DB_TXN* trans);
void init_hidden_prim_key_info();
inline void get_auto_primary_key(uchar * to) {
pthread_mutex_lock(&share->mutex);
tokudb_pthread_mutex_lock(&share->mutex);
share->auto_ident++;
hpk_num_to_char(to, share->auto_ident);
pthread_mutex_unlock(&share->mutex);
tokudb_pthread_mutex_unlock(&share->mutex);
}
virtual void get_auto_increment(ulonglong offset, ulonglong increment, ulonglong nb_desired_values, ulonglong * first_value, ulonglong * nb_reserved_values);
bool is_optimize_blocking();
......
......@@ -443,4 +443,44 @@ static inline void* tokudb_my_multi_malloc(myf myFlags, ...) {
return start;
}
static inline void tokudb_pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) {
int r = pthread_mutex_init(mutex, attr);
assert(r == 0);
}
static inline void tokudb_pthread_mutex_destroy(pthread_mutex_t *mutex) {
int r = pthread_mutex_destroy(mutex);
assert(r == 0);
}
static inline void tokudb_pthread_mutex_lock(pthread_mutex_t *mutex) {
int r = pthread_mutex_lock(mutex);
assert(r == 0);
}
static inline void tokudb_pthread_mutex_unlock(pthread_mutex_t *mutex) {
int r = pthread_mutex_unlock(mutex);
assert(r == 0);
}
static inline void tokudb_pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr) {
int r = pthread_cond_init(cond, attr);
assert(r == 0);
}
static inline void tokudb_pthread_cond_destroy(pthread_cond_t *cond) {
int r = pthread_cond_destroy(cond);
assert(r == 0);
}
static inline void tokudb_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) {
int r = pthread_cond_wait(cond, mutex);
assert(r == 0);
}
static inline void tokudb_pthread_cond_broadcast(pthread_cond_t *cond) {
int r = pthread_cond_broadcast(cond);
assert(r == 0);
}
#endif
......@@ -311,7 +311,7 @@ static int tokudb_init_func(void *p) {
tokudb_hton = (handlerton *) p;
pthread_mutex_init(&tokudb_mutex, MY_MUTEX_INIT_FAST);
tokudb_pthread_mutex_init(&tokudb_mutex, MY_MUTEX_INIT_FAST);
(void) my_hash_init(&tokudb_open_tables, table_alias_charset, 32, 0, 0, (my_hash_get_key) tokudb_get_key, 0, 0);
tokudb_hton->state = SHOW_OPTION_YES;
......@@ -529,7 +529,7 @@ static int tokudb_done_func(void *p) {
tokudb_my_free(toku_global_status_rows);
toku_global_status_rows = NULL;
my_hash_free(&tokudb_open_tables);
pthread_mutex_destroy(&tokudb_mutex);
tokudb_pthread_mutex_destroy(&tokudb_mutex);
#if defined(_WIN64)
toku_ydb_destroy();
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment