Commit a0e90826 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses 922

when locking table, grab table locks

git-svn-id: file:///svn/mysql/tokudb-engine/src@4764 c7de825b-a66e-492c-adef-691d508d4ae1
parent 332f0871
...@@ -942,6 +942,12 @@ int primary_key_part_compare (const void* left, const void* right) { ...@@ -942,6 +942,12 @@ int primary_key_part_compare (const void* left, const void* right) {
return left_part->offset - right_part->offset; return left_part->offset - right_part->offset;
} }
//
// macro that modifies read flag for cursor operations depending on whether
// we have preacquired lock or not
//
#define SET_READ_FLAG(flg) ((range_lock_grabbed || current_thd->options & OPTION_TABLE_LOCK) ? (flg | DB_PRELOCKED) : flg)
// //
// Open a secondary table, the key will be a secondary index, the data will be a primary key // Open a secondary table, the key will be a secondary index, the data will be a primary key
// //
...@@ -2567,7 +2573,7 @@ int ha_tokudb::index_next(uchar * buf) { ...@@ -2567,7 +2573,7 @@ int ha_tokudb::index_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_next"); TOKUDB_DBUG_ENTER("ha_tokudb::index_next");
int error; int error;
DBT row; DBT row;
u_int32_t flags = range_lock_grabbed ? (DB_NEXT | DB_PRELOCKED) : DB_NEXT; u_int32_t flags = SET_READ_FLAG(DB_NEXT);
CHECK_VALID_CURSOR(); CHECK_VALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
...@@ -2601,11 +2607,11 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) { ...@@ -2601,11 +2607,11 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
!(table->key_info[active_index].flags & HA_NOSAME) && !(table->key_info[active_index].flags & HA_NOSAME) &&
!(table->key_info[active_index].flags & HA_END_SPACE_KEY)) { !(table->key_info[active_index].flags & HA_END_SPACE_KEY)) {
u_int32_t flags = range_lock_grabbed ? (DB_NEXT_DUP | DB_PRELOCKED) : DB_NEXT_DUP; u_int32_t flags = SET_READ_FLAG(DB_NEXT_DUP);
error = cursor->c_get(cursor, &last_key, &row, flags); error = cursor->c_get(cursor, &last_key, &row, flags);
error = read_row(error, buf, active_index, &row, &last_key, 1); error = read_row(error, buf, active_index, &row, &last_key, 1);
} else { } else {
u_int32_t flags = range_lock_grabbed ? (DB_NEXT | DB_PRELOCKED) : DB_NEXT; u_int32_t flags = SET_READ_FLAG(DB_NEXT);
error = read_row(cursor->c_get(cursor, &last_key, &row, flags), buf, active_index, &row, &last_key, 1); error = read_row(cursor->c_get(cursor, &last_key, &row, flags), buf, active_index, &row, &last_key, 1);
if (!error &&::key_cmp_if_same(table, key, active_index, keylen)) if (!error &&::key_cmp_if_same(table, key, active_index, keylen))
error = HA_ERR_END_OF_FILE; error = HA_ERR_END_OF_FILE;
...@@ -2626,7 +2632,7 @@ cleanup: ...@@ -2626,7 +2632,7 @@ cleanup:
int ha_tokudb::index_prev(uchar * buf) { int ha_tokudb::index_prev(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_prev"); TOKUDB_DBUG_ENTER("ha_tokudb::index_prev");
int error; int error;
u_int32_t flags = range_lock_grabbed ? (DB_PREV | DB_PRELOCKED) : DB_PREV; u_int32_t flags = SET_READ_FLAG(DB_PREV);
CHECK_VALID_CURSOR(); CHECK_VALID_CURSOR();
DBT row; DBT row;
statistic_increment(table->in_use->status_var.ha_read_prev_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_read_prev_count, &LOCK_status);
...@@ -2733,7 +2739,7 @@ int ha_tokudb::rnd_next(uchar * buf) { ...@@ -2733,7 +2739,7 @@ int ha_tokudb::rnd_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next"); TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next");
int error; int error;
DBT row; DBT row;
u_int32_t flags = range_lock_grabbed ? (DB_NEXT | DB_PRELOCKED) : DB_NEXT; u_int32_t flags = SET_READ_FLAG(DB_NEXT);
CHECK_VALID_CURSOR() CHECK_VALID_CURSOR()
// //
...@@ -2977,6 +2983,51 @@ int ha_tokudb::reset(void) { ...@@ -2977,6 +2983,51 @@ int ha_tokudb::reset(void) {
TOKUDB_DBUG_RETURN(0); TOKUDB_DBUG_RETURN(0);
} }
//
// helper function that iterates through all DB's
// and grabs a lock (either read or write, but not both)
// Parameters:
// [in] trans - transaction to be used to pre acquire the lock
// lt - type of lock to get, either lock_read or lock_write
// Returns:
// 0 on success
// error otherwise
//
int ha_tokudb::acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt) {
int error = ENOSYS;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
if (lt == lock_read) {
for (uint i = 0; i < curr_num_DBs; i++) {
DB* db = share->key_file[i];
error = db->pre_acquire_read_lock(
db,
trans,
db->dbt_neg_infty(), db->dbt_neg_infty(),
db->dbt_pos_infty(), db->dbt_pos_infty()
);
if (error) { goto cleanup; }
}
}
else if (lt == lock_write) {
for (uint i = 0; i < curr_num_DBs; i++) {
DB* db = share->key_file[i];
error = db->pre_acquire_table_lock(db, trans);
if (error) { goto cleanup; }
}
}
else {
error = ENOSYS;
goto cleanup;
}
error = 0;
cleanup:
return error;
}
/* /*
As MySQL will execute an external lock for every new table it uses As MySQL will execute an external lock for every new table it uses
we can use this to start the transactions. we can use this to start the transactions.
...@@ -3033,7 +3084,22 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { ...@@ -3033,7 +3084,22 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
trx->sp_level = trx->all; trx->sp_level = trx->all;
trans_register_ha(thd, TRUE, tokudb_hton); trans_register_ha(thd, TRUE, tokudb_hton);
if (thd->in_lock_tables) { if (thd->in_lock_tables) {
error = 0; // Don't create stmt trans //
// grab table locks
// For the command "Lock tables foo read, bar read"
// This statement is grabbing the locks for the table
// foo. The locks for bar will be grabbed when
// trx->tokudb_lock_count has been initialized
//
assert(lock.type == TL_WRITE || lock.type == TL_READ_NO_INSERT);
if (lock.type == TL_READ_NO_INSERT) {
error = acquire_table_lock(trx->all,lock_read);
}
else if (lock.type == TL_WRITE) {
error = acquire_table_lock(trx->all,lock_write);
}
// Don't create stmt trans
if (error) {trx->tokudb_lock_count--;}
goto cleanup; goto cleanup;
} }
} }
...@@ -3053,6 +3119,26 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { ...@@ -3053,6 +3119,26 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
} }
trans_register_ha(thd, FALSE, tokudb_hton); trans_register_ha(thd, FALSE, tokudb_hton);
} }
else {
if (thd->in_lock_tables) {
assert(trx->all != NULL);
assert(lock.type == TL_WRITE || lock.type == TL_READ_NO_INSERT);
//
// For the command "Lock tables foo read, bar read"
// This statement is grabbing the locks for the table
// bar. The locks for foo will be grabbed when
// trx->tokudb_lock_count is 0 and we are initializing
// trx->all above
//
if (lock.type == TL_READ_NO_INSERT) {
error = acquire_table_lock(trx->all,lock_read);
}
else if (lock.type == TL_WRITE) {
error = acquire_table_lock(trx->all,lock_write);
}
if (error) {trx->tokudb_lock_count--; goto cleanup;}
}
}
transaction = trx->stmt; transaction = trx->stmt;
} }
else { else {
......
...@@ -37,6 +37,11 @@ typedef struct st_prim_key_part_info { ...@@ -37,6 +37,11 @@ typedef struct st_prim_key_part_info {
uint part_index; uint part_index;
} PRIM_KEY_PART_INFO; } PRIM_KEY_PART_INFO;
typedef enum {
lock_read = 0,
lock_write
} TABLE_LOCK_TYPE;
class ha_tokudb : public handler { class ha_tokudb : public handler {
private: private:
THR_LOCK_DATA lock; ///< MySQL lock THR_LOCK_DATA lock; ///< MySQL lock
...@@ -153,6 +158,7 @@ private: ...@@ -153,6 +158,7 @@ private:
DBT *get_pos(DBT * to, uchar * pos); DBT *get_pos(DBT * to, uchar * pos);
int open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type); int open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type);
int acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt);
public: public:
ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg); ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment