Commit 324c3b81 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

refs #4743 rollback alter table add key with partitions

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@46436 c7de825b-a66e-492c-adef-691d508d4ae1
parent b568fddb
...@@ -7765,7 +7765,7 @@ volatile int ha_tokudb_drop_indexes_wait = 0; // debug ...@@ -7765,7 +7765,7 @@ volatile int ha_tokudb_drop_indexes_wait = 0; // debug
// Internal function called by ha_tokudb::prepare_drop_index and ha_tokudb::alter_table_phase2 // Internal function called by ha_tokudb::prepare_drop_index and ha_tokudb::alter_table_phase2
// With a transaction, drops dictionaries associated with indexes in key_num // With a transaction, drops dictionaries associated with indexes in key_num
// //
int ha_tokudb::drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, DB_TXN* txn) { int ha_tokudb::drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, KEY *key_info, DB_TXN* txn) {
TOKUDB_DBUG_ENTER("ha_tokudb::drop_indexes"); TOKUDB_DBUG_ENTER("ha_tokudb::drop_indexes");
assert(txn); assert(txn);
...@@ -7787,10 +7787,10 @@ int ha_tokudb::drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, D ...@@ -7787,10 +7787,10 @@ int ha_tokudb::drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, D
assert(r==0); assert(r==0);
share->key_file[curr_index] = NULL; share->key_file[curr_index] = NULL;
error = remove_key_name_from_status(share->status_block, table_arg->key_info[curr_index].name, txn); error = remove_key_name_from_status(share->status_block, key_info[curr_index].name, txn);
if (error) { goto cleanup; } if (error) { goto cleanup; }
error = delete_or_rename_dictionary(share->table_name, NULL, table_arg->key_info[curr_index].name, true, txn, true); error = delete_or_rename_dictionary(share->table_name, NULL, key_info[curr_index].name, true, txn, true);
if (error) { goto cleanup; } if (error) { goto cleanup; }
} }
......
...@@ -593,7 +593,7 @@ public: ...@@ -593,7 +593,7 @@ public:
bool* modified_DB bool* modified_DB
); );
void restore_add_index(TABLE* table_arg, uint num_of_keys, bool incremented_numDBs, bool modified_DBs); void restore_add_index(TABLE* table_arg, uint num_of_keys, bool incremented_numDBs, bool modified_DBs);
int drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, DB_TXN* txn); int drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys, KEY *key_info, DB_TXN* txn);
void restore_drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys); void restore_drop_indexes(TABLE *table_arg, uint *key_num, uint num_of_keys);
public: public:
......
...@@ -66,7 +66,7 @@ ha_tokudb::prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys) ...@@ -66,7 +66,7 @@ ha_tokudb::prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys)
error = db_env->txn_begin(db_env, 0, &txn, 0); error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; } if (error) { goto cleanup; }
error = drop_indexes(table_arg, key_num, num_of_keys, txn); error = drop_indexes(table_arg, key_num, num_of_keys, table_arg->key_info, txn);
if (error) { goto cleanup; } if (error) { goto cleanup; }
cleanup: cleanup:
...@@ -457,7 +457,7 @@ ha_tokudb::alter_table_phase2( ...@@ -457,7 +457,7 @@ ha_tokudb::alter_table_phase2(
// drop indexes // drop indexes
if (dropping_indexes) { if (dropping_indexes) {
error = drop_indexes(table, alter_info->index_drop_buffer, alter_info->index_drop_count, txn); error = drop_indexes(table, alter_info->index_drop_buffer, alter_info->index_drop_count, table->key_info, txn);
if (error) { goto cleanup; } if (error) { goto cleanup; }
} }
......
...@@ -111,7 +111,7 @@ ha_tokudb::prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys) ...@@ -111,7 +111,7 @@ ha_tokudb::prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys)
DB_TXN *txn = transaction; DB_TXN *txn = transaction;
assert(txn); assert(txn);
int error = drop_indexes(table_arg, key_num, num_of_keys, txn); int error = drop_indexes(table_arg, key_num, num_of_keys, table_arg->key_info, txn);
DBUG_EXECUTE_IF("prepare_drop_index_fail", { DBUG_EXECUTE_IF("prepare_drop_index_fail", {
error = 1; error = 1;
}); });
......
...@@ -92,11 +92,13 @@ is_disjoint_add_drop(Alter_inplace_info *ha_alter_info) { ...@@ -92,11 +92,13 @@ is_disjoint_add_drop(Alter_inplace_info *ha_alter_info) {
class tokudb_alter_ctx : public inplace_alter_handler_ctx { class tokudb_alter_ctx : public inplace_alter_handler_ctx {
public: public:
tokudb_alter_ctx() { tokudb_alter_ctx() {
alter_txn = NULL;
add_index_changed = false; add_index_changed = false;
drop_index_changed = false; drop_index_changed = false;
compression_changed = false; compression_changed = false;
} }
public: public:
DB_TXN *alter_txn;
bool add_index_changed; bool add_index_changed;
bool incremented_num_DBs, modified_DBs; bool incremented_num_DBs, modified_DBs;
bool drop_index_changed; bool drop_index_changed;
...@@ -123,9 +125,6 @@ ha_tokudb::check_if_supported_inplace_alter(TABLE *altered_table, Alter_inplace_ ...@@ -123,9 +125,6 @@ ha_tokudb::check_if_supported_inplace_alter(TABLE *altered_table, Alter_inplace_
enum_alter_inplace_result result = HA_ALTER_INPLACE_NOT_SUPPORTED; // default is NOT inplace enum_alter_inplace_result result = HA_ALTER_INPLACE_NOT_SUPPORTED; // default is NOT inplace
HA_CREATE_INFO *create_info = ha_alter_info->create_info; HA_CREATE_INFO *create_info = ha_alter_info->create_info;
ha_alter_info->handler_ctx = new tokudb_alter_ctx;
assert(ha_alter_info->handler_ctx);
ulong handler_flags = fix_handler_flags(ha_alter_info, table, altered_table); ulong handler_flags = fix_handler_flags(ha_alter_info, table, altered_table);
// always allow rename table + any other operation, so turn off the handler flag // always allow rename table + any other operation, so turn off the handler flag
...@@ -221,12 +220,19 @@ ha_tokudb::check_if_supported_inplace_alter(TABLE *altered_table, Alter_inplace_ ...@@ -221,12 +220,19 @@ ha_tokudb::check_if_supported_inplace_alter(TABLE *altered_table, Alter_inplace_
DBUG_RETURN(result); DBUG_RETURN(result);
} }
// prepare not yet called by the mysql 5.5 patch
bool bool
ha_tokudb::prepare_inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alter_info) { ha_tokudb::prepare_inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alter_info) {
TOKUDB_DBUG_ENTER("prepare_inplace_alter_table"); TOKUDB_DBUG_ENTER("prepare_inplace_alter_table");
bool result = false; // success bool result = false; // success
assert(ha_alter_info->handler_ctx == NULL);
tokudb_alter_ctx *ctx = new tokudb_alter_ctx;
assert(ctx);
ha_alter_info->handler_ctx = ctx;
int r = db_env->txn_begin(db_env, 0, &ctx->alter_txn, 0);
if (r != 0)
result = true; // fail
DBUG_RETURN(result); DBUG_RETURN(result);
} }
...@@ -235,6 +241,7 @@ ha_tokudb::inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alte ...@@ -235,6 +241,7 @@ ha_tokudb::inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alte
TOKUDB_DBUG_ENTER("inplace_alter_table"); TOKUDB_DBUG_ENTER("inplace_alter_table");
int error = 0; int error = 0;
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
HA_CREATE_INFO *create_info = ha_alter_info->create_info; HA_CREATE_INFO *create_info = ha_alter_info->create_info;
ulong handler_flags = fix_handler_flags(ha_alter_info, table, altered_table); ulong handler_flags = fix_handler_flags(ha_alter_info, table, altered_table);
...@@ -249,24 +256,24 @@ ha_tokudb::inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alte ...@@ -249,24 +256,24 @@ ha_tokudb::inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alte
error = alter_table_add_or_drop_column(altered_table, ha_alter_info); error = alter_table_add_or_drop_column(altered_table, ha_alter_info);
} }
if (error == 0 && (handler_flags & Alter_inplace_info::CHANGE_CREATE_OPTION) && (create_info->used_fields & HA_CREATE_USED_AUTO)) { if (error == 0 && (handler_flags & Alter_inplace_info::CHANGE_CREATE_OPTION) && (create_info->used_fields & HA_CREATE_USED_AUTO)) {
error = write_auto_inc_create(share->status_block, create_info->auto_increment_value, transaction); error = write_auto_inc_create(share->status_block, create_info->auto_increment_value, ctx->alter_txn);
} }
if (error == 0 && (handler_flags & Alter_inplace_info::CHANGE_CREATE_OPTION) && (create_info->used_fields & HA_CREATE_USED_ROW_FORMAT)) { if (error == 0 && (handler_flags & Alter_inplace_info::CHANGE_CREATE_OPTION) && (create_info->used_fields & HA_CREATE_USED_ROW_FORMAT)) {
enum toku_compression_method method = row_type_to_compression_method(create_info->row_type);
// Get the current type // Get the current type
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx); tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
DB *db = share->key_file[0]; DB *db = share->key_file[0];
error = db->get_compression_method(db, &ctx->orig_compression_method); error = db->get_compression_method(db, &ctx->orig_compression_method);
assert(error == 0); assert(error == 0);
ctx->compression_changed = true;
// Set the new type. // Set the new type.
enum toku_compression_method method = row_type_to_compression_method(create_info->row_type);
uint32_t curr_num_DBs = table->s->keys + test(hidden_primary_key); uint32_t curr_num_DBs = table->s->keys + test(hidden_primary_key);
for (uint32_t i = 0; i < curr_num_DBs; i++) { for (uint32_t i = 0; i < curr_num_DBs; i++) {
db = share->key_file[i]; db = share->key_file[i];
error = db->change_compression_method(db, method); error = db->change_compression_method(db, method);
if (error) if (error)
break; break;
ctx->compression_changed = true;
} }
} }
...@@ -293,7 +300,7 @@ ha_tokudb::alter_table_add_index(TABLE *altered_table, Alter_inplace_info *ha_al ...@@ -293,7 +300,7 @@ ha_tokudb::alter_table_add_index(TABLE *altered_table, Alter_inplace_info *ha_al
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx); tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
ctx->add_index_changed = true; ctx->add_index_changed = true;
int error = tokudb_add_index(table, key_info, ha_alter_info->index_add_count, transaction, &ctx->incremented_num_DBs, &ctx->modified_DBs); int error = tokudb_add_index(table, key_info, ha_alter_info->index_add_count, ctx->alter_txn, &ctx->incremented_num_DBs, &ctx->modified_DBs);
if (error == HA_ERR_FOUND_DUPP_KEY) { if (error == HA_ERR_FOUND_DUPP_KEY) {
// hack for now, in case of duplicate key error, // hack for now, in case of duplicate key error,
// because at the moment we cannot display the right key // because at the moment we cannot display the right key
...@@ -307,24 +314,54 @@ ha_tokudb::alter_table_add_index(TABLE *altered_table, Alter_inplace_info *ha_al ...@@ -307,24 +314,54 @@ ha_tokudb::alter_table_add_index(TABLE *altered_table, Alter_inplace_info *ha_al
return error; return error;
} }
static bool find_index_of_key(const char *key_name, TABLE *table, uint *index_offset_ptr) {
for (uint i = 0; i < table->s->keys; i++) {
if (strcmp(key_name, table->key_info[i].name) == 0) {
*index_offset_ptr = i;
return true;
}
}
return false;
}
static bool find_index_of_key(const char *key_name, KEY *key_info, uint key_count, uint *index_offset_ptr) {
for (uint i = 0; i < key_count; i++) {
if (strcmp(key_name, key_info[i].name) == 0) {
*index_offset_ptr = i;
return true;
}
}
return false;
}
int int
ha_tokudb::alter_table_drop_index(TABLE *altered_table, Alter_inplace_info *ha_alter_info) { ha_tokudb::alter_table_drop_index(TABLE *altered_table, Alter_inplace_info *ha_alter_info) {
// translate KEY pointers to indexes into the key_info array KEY *key_info = table->key_info;
// translate key names to indexes into the key_info array
uint index_drop_offsets[ha_alter_info->index_drop_count]; uint index_drop_offsets[ha_alter_info->index_drop_count];
for (uint i = 0; i < ha_alter_info->index_drop_count; i++) for (uint i = 0; i < ha_alter_info->index_drop_count; i++) {
index_drop_offsets[i] = ha_alter_info->index_drop_buffer[i] - table->key_info; bool found;
found = find_index_of_key(ha_alter_info->index_drop_buffer[i]->name, table, &index_drop_offsets[i]);
if (!found) {
// undo of add key in partition engine
found = find_index_of_key(ha_alter_info->index_drop_buffer[i]->name, ha_alter_info->key_info_buffer, ha_alter_info->key_count, &index_drop_offsets[i]);
assert(found);
key_info = ha_alter_info->key_info_buffer;
}
}
// drop indexes // drop indexes
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx); tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
ctx->drop_index_changed = true; ctx->drop_index_changed = true;
int error = drop_indexes(table, index_drop_offsets, ha_alter_info->index_drop_count, transaction); int error = drop_indexes(table, index_drop_offsets, ha_alter_info->index_drop_count, key_info, ctx->alter_txn);
return error; return error;
} }
int int
ha_tokudb::alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_info *ha_alter_info) { ha_tokudb::alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_info *ha_alter_info) {
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
int error; int error;
uchar *column_extra = NULL; uchar *column_extra = NULL;
uchar *row_desc_buff = NULL; uchar *row_desc_buff = NULL;
...@@ -413,7 +450,7 @@ ha_tokudb::alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_in ...@@ -413,7 +450,7 @@ ha_tokudb::alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_in
} }
error = share->key_file[i]->change_descriptor( error = share->key_file[i]->change_descriptor(
share->key_file[i], share->key_file[i],
transaction, ctx->alter_txn,
&row_descriptor, &row_descriptor,
0 0
); );
...@@ -437,7 +474,7 @@ ha_tokudb::alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_in ...@@ -437,7 +474,7 @@ ha_tokudb::alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_in
DBUG_ASSERT(num_column_extra <= max_column_extra_size); DBUG_ASSERT(num_column_extra <= max_column_extra_size);
error = share->key_file[i]->update_broadcast( error = share->key_file[i]->update_broadcast(
share->key_file[i], share->key_file[i],
transaction, ctx->alter_txn,
&column_dbt, &column_dbt,
DB_IS_RESETTING_OP DB_IS_RESETTING_OP
); );
...@@ -457,54 +494,47 @@ bool ...@@ -457,54 +494,47 @@ bool
ha_tokudb::commit_inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alter_info, bool commit) { ha_tokudb::commit_inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alter_info, bool commit) {
TOKUDB_DBUG_ENTER("commit_inplace_alter_table"); TOKUDB_DBUG_ENTER("commit_inplace_alter_table");
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
bool result = false; // success bool result = false; // success
if (commit) { if (commit) {
if (altered_table->part_info == NULL) { if (altered_table->part_info == NULL) {
// read frmdata for the altered table // read frm data for the altered table
uchar *frm_data; size_t frm_len; uchar *frm_data; size_t frm_len;
int error = readfrm(altered_table->s->path.str, &frm_data, &frm_len); int error = readfrm(altered_table->s->path.str, &frm_data, &frm_len);
if (error) { if (!error) {
result = true; // transactionally write frm data to status
} else { error = write_to_status(share->status_block, hatoku_frm_data, (void *)frm_data, (uint)frm_len, ctx->alter_txn);
// transactionally write frmdata to status
assert(transaction);
error = write_to_status(share->status_block, hatoku_frm_data, (void *)frm_data, (uint)frm_len, transaction);
if (error) {
result = true;
}
my_free(frm_data); my_free(frm_data);
} }
if (error) if (error) {
commit = false;
result = true;
print_error(error, MYF(0)); print_error(error, MYF(0));
} }
} }
}
if (!commit || result == true) { if (commit) {
// commit the alter transaction NOW
// abort the transaction NOW so that any alters are rolled back. this allows the following restores to work. commit_txn(ctx->alter_txn, 0);
THD *thd = ha_thd(); ctx->alter_txn = NULL;
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); } else {
assert(trx && transaction == trx->stmt && transaction == trx->sub_sp_level); // abort the alter transaction NOW so that any alters are rolled back. this allows the following restores to work.
abort_txn(transaction); abort_txn(ctx->alter_txn);
transaction = NULL; ctx->alter_txn = NULL;
trx->stmt = NULL;
trx->sub_sp_level = NULL;
trx->should_abort = false;
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
if (ctx->add_index_changed) { if (ctx->add_index_changed) {
restore_add_index(table, ha_alter_info->index_add_count, ctx->incremented_num_DBs, ctx->modified_DBs); restore_add_index(table, ha_alter_info->index_add_count, ctx->incremented_num_DBs, ctx->modified_DBs);
} }
if (ctx->drop_index_changed) { if (ctx->drop_index_changed) {
// translate KEY pointers to indexes into the key_info array // translate key names to indexes into the key_info array
uint index_drop_offsets[ha_alter_info->index_drop_count]; uint index_drop_offsets[ha_alter_info->index_drop_count];
for (uint i = 0; i < ha_alter_info->index_drop_count; i++) for (uint i = 0; i < ha_alter_info->index_drop_count; i++) {
index_drop_offsets[i] = ha_alter_info->index_drop_buffer[i] - table->key_info; bool found = find_index_of_key(ha_alter_info->index_drop_buffer[i]->name, table, &index_drop_offsets[i]);
assert(found);
}
restore_drop_indexes(table, index_drop_offsets, ha_alter_info->index_drop_count); restore_drop_indexes(table, index_drop_offsets, ha_alter_info->index_drop_count);
} }
if (ctx->compression_changed) { if (ctx->compression_changed) {
......
...@@ -210,8 +210,9 @@ static inline void make_name(char *newname, const char *tablename, const char *d ...@@ -210,8 +210,9 @@ static inline void make_name(char *newname, const char *tablename, const char *d
} }
static inline void commit_txn(DB_TXN* txn, uint32_t flags) { static inline void commit_txn(DB_TXN* txn, uint32_t flags) {
int r; if (tokudb_debug & TOKUDB_DEBUG_TXN)
r = txn->commit(txn, flags); TOKUDB_TRACE("commit_txn %p\n", txn);
int r = txn->commit(txn, flags);
if (r != 0) { if (r != 0) {
sql_print_error("tried committing transaction %p and got error code %d", txn, r); sql_print_error("tried committing transaction %p and got error code %d", txn, r);
} }
...@@ -219,8 +220,9 @@ static inline void commit_txn(DB_TXN* txn, uint32_t flags) { ...@@ -219,8 +220,9 @@ static inline void commit_txn(DB_TXN* txn, uint32_t flags) {
} }
static inline void abort_txn(DB_TXN* txn) { static inline void abort_txn(DB_TXN* txn) {
int r; if (tokudb_debug & TOKUDB_DEBUG_TXN)
r = txn->abort(txn); TOKUDB_TRACE("abort_txn %p\n", txn);
int r = txn->abort(txn);
if (r != 0) { if (r != 0) {
sql_print_error("tried aborting transaction %p and got error code %d", txn, r); sql_print_error("tried aborting transaction %p and got error code %d", txn, r);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment