Commit 69d5695c authored by Rik Prohaska's avatar Rik Prohaska

DB-823 fix lock tables + transaction accounting for subsequent operations

parent 135dcf67
......@@ -6167,6 +6167,12 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
if (error) { goto cleanup; }
thd_set_ha_data(thd, tokudb_hton, trx);
}
if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_HANDLER_TRACE("trx %p %p %p %p %u %u", trx->all, trx->stmt, trx->sp_level, trx->sub_sp_level,
trx->tokudb_lock_count, trx->create_lock_count);
}
if (trx->all == NULL) {
trx->sp_level = NULL;
}
......@@ -6175,22 +6181,16 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
if (lock_type == F_WRLCK) {
use_write_locks = true;
}
if (!trx->tokudb_lock_count++) {
if (trx->stmt) {
if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_HANDLER_TRACE("stmt already set %p %p %p %p", trx->all, trx->stmt, trx->sp_level, trx->sub_sp_level);
}
} else {
assert(trx->stmt == 0);
transaction = NULL; // Safety
error = create_txn(thd, trx);
if (error) {
trx->tokudb_lock_count--; // We didn't get the lock
goto cleanup;
}
if (!trx->stmt) {
transaction = NULL; // Safety
error = create_txn(thd, trx);
if (error) {
goto cleanup;
}
trx->create_lock_count = trx->tokudb_lock_count;
}
transaction = trx->sub_sp_level;
trx->tokudb_lock_count++;
}
else {
tokudb_pthread_mutex_lock(&share->mutex);
......@@ -6205,21 +6205,24 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
added_rows = 0;
deleted_rows = 0;
share->rows_from_locked_table = 0;
if (trx->tokudb_lock_count > 0 && !--trx->tokudb_lock_count) {
if (trx->stmt) {
/*
F_UNLCK is done without a transaction commit / rollback.
This happens if the thread didn't update any rows
We must in this case commit the work to keep the row locks
*/
DBUG_PRINT("trans", ("commiting non-updating transaction"));
reset_stmt_progress(&trx->stmt_progress);
commit_txn(trx->stmt, 0);
trx->stmt = NULL;
trx->sub_sp_level = NULL;
if (trx->tokudb_lock_count > 0) {
if (--trx->tokudb_lock_count <= trx->create_lock_count) {
trx->create_lock_count = 0;
if (trx->stmt) {
/*
F_UNLCK is done without a transaction commit / rollback.
This happens if the thread didn't update any rows
We must in this case commit the work to keep the row locks
*/
DBUG_PRINT("trans", ("commiting non-updating transaction"));
reset_stmt_progress(&trx->stmt_progress);
commit_txn(trx->stmt, 0);
trx->stmt = NULL;
trx->sub_sp_level = NULL;
}
}
transaction = NULL;
}
transaction = NULL;
}
cleanup:
if (tokudb_debug & TOKUDB_DEBUG_LOCK)
......@@ -6234,8 +6237,9 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
*/
int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
TOKUDB_HANDLER_DBUG_ENTER("cmd %d lock %d %s", thd_sql_command(thd), lock_type, share->table_name);
if (0)
if (tokudb_debug & TOKUDB_DEBUG_LOCK) {
TOKUDB_HANDLER_TRACE("q %s", thd->query());
}
int error = 0;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_get_ha_data(thd, tokudb_hton);
......@@ -6245,6 +6249,11 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
thd_set_ha_data(thd, tokudb_hton, trx);
}
if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_HANDLER_TRACE("trx %p %p %p %p %u %u", trx->all, trx->stmt, trx->sp_level, trx->sub_sp_level,
trx->tokudb_lock_count, trx->create_lock_count);
}
/*
note that trx->stmt may have been already initialized as start_stmt()
is called for *each table* not for each storage engine,
......@@ -6255,9 +6264,7 @@ int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
if (error) {
goto cleanup;
}
if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_HANDLER_TRACE("%p %p %p %p %u", trx->all, trx->stmt, trx->sp_level, trx->sub_sp_level, trx->tokudb_lock_count);
}
trx->create_lock_count = trx->tokudb_lock_count;
}
else {
if (tokudb_debug & TOKUDB_DEBUG_TXN) {
......
......@@ -784,13 +784,16 @@ bool ha_tokudb::commit_inplace_alter_table(TABLE *altered_table, Alter_inplace_i
assert(trx->tokudb_lock_count > 0);
// for partitioned tables, we use a single transaction to do all of the partition changes. the tokudb_lock_count
// is a reference count for each of the handlers to the same transaction. obviously, we want to only abort once.
if (!--trx->tokudb_lock_count) {
abort_txn(ctx->alter_txn);
ctx->alter_txn = NULL;
trx->stmt = NULL;
trx->sub_sp_level = NULL;
if (trx->tokudb_lock_count > 0) {
if (--trx->tokudb_lock_count <= trx->create_lock_count) {
trx->create_lock_count = 0;
abort_txn(ctx->alter_txn);
ctx->alter_txn = NULL;
trx->stmt = NULL;
trx->sub_sp_level = NULL;
}
transaction = NULL;
}
transaction = NULL;
if (ctx->add_index_changed) {
restore_add_index(table, ha_alter_info->index_add_count, ctx->incremented_num_DBs, ctx->modified_DBs);
......
......@@ -354,6 +354,7 @@ typedef struct st_tokudb_trx_data {
DB_TXN *sp_level;
DB_TXN *sub_sp_level;
uint tokudb_lock_count;
uint create_lock_count;
tokudb_stmt_progress stmt_progress;
bool checkpoint_lock_taken;
LIST *handlers;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment