Commit 3105884e authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #1069 #1077

merge into main line for 1.0.7

git-svn-id: file:///svn/mysql/tokudb-engine/src@5567 c7de825b-a66e-492c-adef-691d508d4ae1
parent 8f1f24b0
...@@ -139,7 +139,6 @@ static char *tokudb_log_dir; ...@@ -139,7 +139,6 @@ static char *tokudb_log_dir;
//static long tokudb_lock_scan_time = 0; //static long tokudb_lock_scan_time = 0;
//static ulong tokudb_region_size = 0; //static ulong tokudb_region_size = 0;
//static ulong tokudb_cache_parts = 1; //static ulong tokudb_cache_parts = 1;
static ulong tokudb_trans_retry = 1;
static ulong tokudb_max_lock; static ulong tokudb_max_lock;
static ulong tokudb_debug; static ulong tokudb_debug;
#ifdef TOKUDB_VERSION #ifdef TOKUDB_VERSION
...@@ -209,10 +208,8 @@ static int tokudb_init_func(void *p) { ...@@ -209,10 +208,8 @@ static int tokudb_init_func(void *p) {
tokudb_hton->savepoint_rollback = tokudb_rollback_to_savepoint; tokudb_hton->savepoint_rollback = tokudb_rollback_to_savepoint;
tokudb_hton->savepoint_release = tokudb_release_savepoint; tokudb_hton->savepoint_release = tokudb_release_savepoint;
#endif #endif
if (tokudb_init_flags & DB_INIT_TXN) {
tokudb_hton->commit = tokudb_commit; tokudb_hton->commit = tokudb_commit;
tokudb_hton->rollback = tokudb_rollback; tokudb_hton->rollback = tokudb_rollback;
}
tokudb_hton->panic = tokudb_end; tokudb_hton->panic = tokudb_end;
tokudb_hton->flush_logs = tokudb_flush_logs; tokudb_hton->flush_logs = tokudb_flush_logs;
tokudb_hton->show_status = tokudb_show_status; tokudb_hton->show_status = tokudb_show_status;
...@@ -1032,7 +1029,6 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i ...@@ -1032,7 +1029,6 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
char newname[strlen(name) + 32]; char newname[strlen(name) + 32];
DBT cmp_byte_stream; DBT cmp_byte_stream;
if (tokudb_init_flags & DB_INIT_TXN)
open_flags += DB_AUTO_COMMIT; open_flags += DB_AUTO_COMMIT;
if ((error = db_create(ptr, db_env, 0))) { if ((error = db_create(ptr, db_env, 0))) {
...@@ -1098,7 +1094,6 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) { ...@@ -1098,7 +1094,6 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
transaction = NULL; transaction = NULL;
cursor = NULL; cursor = NULL;
if (tokudb_init_flags & DB_INIT_TXN)
open_flags += DB_AUTO_COMMIT; open_flags += DB_AUTO_COMMIT;
/* Open primary key */ /* Open primary key */
...@@ -1784,7 +1779,7 @@ DBT *ha_tokudb::pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ ...@@ -1784,7 +1779,7 @@ DBT *ha_tokudb::pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_
int ha_tokudb::read_last() { int ha_tokudb::read_last() {
TOKUDB_DBUG_ENTER("ha_tokudb::read_last"); TOKUDB_DBUG_ENTER("ha_tokudb::read_last");
int do_commit = 0; int do_commit = 0;
if (transaction == NULL && (tokudb_init_flags & DB_INIT_TXN)) { if (transaction == NULL) {
int r = db_env->txn_begin(db_env, 0, &transaction, 0); int r = db_env->txn_begin(db_env, 0, &transaction, 0);
assert(r == 0); assert(r == 0);
do_commit = 1; do_commit = 1;
...@@ -1980,7 +1975,13 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -1980,7 +1975,13 @@ int ha_tokudb::write_row(uchar * record) {
TOKUDB_DBUG_ENTER("ha_tokudb::write_row"); TOKUDB_DBUG_ENTER("ha_tokudb::write_row");
DBT row, prim_key, key; DBT row, prim_key, key;
int error; int error;
THD *thd = NULL;
u_int32_t put_flags;
//
// some crap that needs to be done because MySQL does not properly abstract
// this work away from us, namely filling in auto increment and setting auto timestamp
//
statistic_increment(table->in_use->status_var.ha_write_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_write_count, &LOCK_status);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) { if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) {
table->timestamp_field->set_time(); table->timestamp_field->set_time();
...@@ -1989,75 +1990,67 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -1989,75 +1990,67 @@ int ha_tokudb::write_row(uchar * record) {
update_auto_increment(); update_auto_increment();
} }
if ((error = pack_row(&row, (const uchar *) record))){ if ((error = pack_row(&row, (const uchar *) record))){
TOKUDB_DBUG_RETURN(error); goto cleanup;
} }
if (hidden_primary_key) { if (hidden_primary_key) {
get_auto_primary_key(current_ident); get_auto_primary_key(current_ident);
} }
u_int32_t put_flags = share->key_type[primary_key]; //
THD *thd = ha_thd(); // first the primary key (because it must be unique, has highest chance of failure)
//
put_flags = share->key_type[primary_key];
thd = ha_thd();
if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) { if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
put_flags = DB_YESOVERWRITE; put_flags = DB_YESOVERWRITE;
} }
if (table_share->keys + test(hidden_primary_key) == 1) {
error = share->file->put(share->file, transaction, create_dbt_key_from_table(&prim_key, primary_key, key_buff, record), &row, put_flags); error = share->file->put(share->file, transaction, create_dbt_key_from_table(&prim_key, primary_key, key_buff, record), &row, put_flags);
if (error) {
last_dup_key = primary_key; last_dup_key = primary_key;
} else { goto cleanup;
DB_TXN *sub_trans = transaction; }
/* QQQ Don't use sub transactions in temporary tables */ //
for (uint retry = 0; retry < tokudb_trans_retry; retry++) { // now insertion for rest of indexes
key_map changed_keys(0); //
if (!(error = share->file->put(share->file, sub_trans, create_dbt_key_from_table(&prim_key, primary_key, key_buff, record), &row, put_flags))) {
changed_keys.set_bit(primary_key);
for (uint keynr = 0; keynr < table_share->keys; keynr++) { for (uint keynr = 0; keynr < table_share->keys; keynr++) {
if (keynr == primary_key) if (keynr == primary_key) {
continue; continue;
}
put_flags = share->key_type[keynr]; put_flags = share->key_type[keynr];
if (put_flags == DB_NOOVERWRITE && thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) if (put_flags == DB_NOOVERWRITE && thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
put_flags = DB_YESOVERWRITE; put_flags = DB_YESOVERWRITE;
if ((error = share->key_file[keynr]->put(share->key_file[keynr], sub_trans, create_dbt_key_from_table(&key, keynr, key_buff2, record), &prim_key, put_flags))) {
last_dup_key = keynr;
break;
}
changed_keys.set_bit(keynr);
}
}
else {
last_dup_key = primary_key;
} }
error = share->key_file[keynr]->put(
share->key_file[keynr],
transaction,
create_dbt_key_from_table(&key, keynr, key_buff2, record),
&prim_key,
put_flags
);
//
// We break if we hit an error, unless it is a dup key error
// and MySQL told us to ignore duplicate key errors
//
if (error) { if (error) {
/* Remove inserted row */ if (using_ignore && error == DB_KEYEXIST) {
DBUG_PRINT("error", ("Got error %d", error)); error = 0;
if (using_ignore) { continue;
int new_error = 0;
if (!changed_keys.is_clear_all()) {
new_error = 0;
for (uint keynr = 0; keynr < table_share->keys + test(hidden_primary_key); keynr++) {
if (changed_keys.is_set(keynr)) {
if ((new_error = remove_key(sub_trans, keynr, record, &prim_key)))
break;
}
}
}
if (new_error) {
error = new_error; // This shouldn't happen
break;
} }
else {
last_dup_key = keynr;
goto cleanup;
} }
} }
if (error != DB_LOCK_DEADLOCK && error != DB_LOCK_NOTGRANTED)
break;
} }
if (!error) {
added_rows++;
} }
cleanup:
if (error == DB_KEYEXIST) { if (error == DB_KEYEXIST) {
error = HA_ERR_FOUND_DUPP_KEY; error = HA_ERR_FOUND_DUPP_KEY;
} }
else if (!error) {
added_rows++;
}
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -2117,43 +2110,6 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons ...@@ -2117,43 +2110,6 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
/*
Restore changed keys, when a non-fatal error aborts the insert/update
of one row.
Clobbers keybuff2
*/
int ha_tokudb::restore_keys(DB_TXN * trans, key_map * changed_keys, uint primary_key, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * new_key) {
TOKUDB_DBUG_ENTER("restore_keys");
int error;
DBT tmp_key;
uint keynr;
/* Restore the old primary key, and the old row, but don't ignore
duplicate key failure */
if ((error = update_primary_key(trans, TRUE, new_row, new_key, old_row, old_key, FALSE)))
goto err;
/* Remove the new key, and put back the old key
changed_keys is a map of all non-primary keys that need to be
rolled back. The last key set in changed_keys is the one that
triggered the duplicate key error (it wasn't inserted), so for
that one just put back the old value. */
if (!changed_keys->is_clear_all()) {
for (keynr = 0; keynr < table_share->keys + test(hidden_primary_key); keynr++) {
if (changed_keys->is_set(keynr)) {
if (changed_keys->is_prefix(1) && (error = remove_key(trans, keynr, new_row, new_key)))
break;
if ((error = share->key_file[keynr]->put(share->key_file[keynr], trans, create_dbt_key_from_table(&tmp_key, keynr, key_buff2, old_row), old_key, share->key_type[keynr])))
break;
}
}
}
err:
DBUG_ASSERT(error != DB_KEYEXIST);
TOKUDB_DBUG_RETURN(error);
}
// //
// Updates a row in the table, called when handling an UPDATE query // Updates a row in the table, called when handling an UPDATE query
// Parameters: // Parameters:
...@@ -2167,13 +2123,13 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2167,13 +2123,13 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
TOKUDB_DBUG_ENTER("update_row"); TOKUDB_DBUG_ENTER("update_row");
DBT prim_key, key, old_prim_key; DBT prim_key, key, old_prim_key;
int error; int error;
DB_TXN *sub_trans;
bool primary_key_changed; bool primary_key_changed;
LINT_INIT(error); LINT_INIT(error);
statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) {
table->timestamp_field->set_time(); table->timestamp_field->set_time();
}
if (hidden_primary_key) { if (hidden_primary_key) {
primary_key_changed = 0; primary_key_changed = 0;
...@@ -2181,55 +2137,59 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2181,55 +2137,59 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
prim_key.data = (void *) current_ident; prim_key.data = (void *) current_ident;
prim_key.size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH; prim_key.size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
old_prim_key = prim_key; old_prim_key = prim_key;
} else { }
else {
create_dbt_key_from_table(&prim_key, primary_key, key_buff, new_row); create_dbt_key_from_table(&prim_key, primary_key, key_buff, new_row);
if ((primary_key_changed = key_cmp(primary_key, old_row, new_row))) {
if ((primary_key_changed = key_cmp(primary_key, old_row, new_row)))
create_dbt_key_from_table(&old_prim_key, primary_key, primary_key_buff, old_row); create_dbt_key_from_table(&old_prim_key, primary_key, primary_key_buff, old_row);
else }
else {
old_prim_key = prim_key; old_prim_key = prim_key;
} }
}
sub_trans = transaction;
for (uint retry = 0; retry < tokudb_trans_retry; retry++) {
key_map changed_keys(0);
/* Start by updating the primary key */ /* Start by updating the primary key */
if (!(error = update_primary_key(sub_trans, primary_key_changed, old_row, &old_prim_key, new_row, &prim_key, using_ignore))) { error = update_primary_key(transaction, primary_key_changed, old_row, &old_prim_key, new_row, &prim_key, using_ignore);
if (error) {
last_dup_key = primary_key;
goto cleanup;
}
// Update all other keys // Update all other keys
for (uint keynr = 0; keynr < table_share->keys; keynr++) { for (uint keynr = 0; keynr < table_share->keys; keynr++) {
if (keynr == primary_key) if (keynr == primary_key) {
continue; continue;
if (key_cmp(keynr, old_row, new_row) || primary_key_changed) {
if ((error = remove_key(sub_trans, keynr, old_row, &old_prim_key))) {
TOKUDB_DBUG_RETURN(error); // Fatal error
}
changed_keys.set_bit(keynr);
if ((error = share->key_file[keynr]->put(share->key_file[keynr], sub_trans, create_dbt_key_from_table(&key, keynr, key_buff2, new_row), &prim_key, share->key_type[keynr]))) {
last_dup_key = keynr;
break;
}
}
} }
if (key_cmp(keynr, old_row, new_row) || primary_key_changed) {
if ((error = remove_key(transaction, keynr, old_row, &old_prim_key))) {
goto cleanup;
} }
error = share->key_file[keynr]->put(
share->key_file[keynr],
transaction,
create_dbt_key_from_table(&key, keynr, key_buff2, new_row),
&prim_key,
share->key_type[keynr]
);
//
// We break if we hit an error, unless it is a dup key error
// and MySQL told us to ignore duplicate key errors
//
if (error) { if (error) {
/* Remove inserted row */ if (using_ignore && error == DB_KEYEXIST) {
DBUG_PRINT("error", ("Got error %d", error)); error = 0;
if (using_ignore) { continue;
int new_error = 0; }
if (!changed_keys.is_clear_all()) else {
new_error = restore_keys(transaction, &changed_keys, primary_key, old_row, &old_prim_key, new_row, &prim_key); last_dup_key = keynr;
if (new_error) { goto cleanup;
/* This shouldn't happen */
error = new_error;
break;
} }
} }
} }
if (error != DB_LOCK_DEADLOCK && error != DB_LOCK_NOTGRANTED)
break;
} }
if (error == DB_KEYEXIST) cleanup:
if (error == DB_KEYEXIST) {
error = HA_ERR_FOUND_DUPP_KEY; error = HA_ERR_FOUND_DUPP_KEY;
}
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -2324,22 +2284,17 @@ int ha_tokudb::delete_row(const uchar * record) { ...@@ -2324,22 +2284,17 @@ int ha_tokudb::delete_row(const uchar * record) {
statistic_increment(table->in_use->status_var.ha_delete_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_delete_count, &LOCK_status);
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record); create_dbt_key_from_table(&prim_key, primary_key, key_buff, record);
if (hidden_primary_key) if (hidden_primary_key) {
keys.set_bit(primary_key); keys.set_bit(primary_key);
}
/* Subtransactions may be used in order to retry the delete in /* Subtransactions may be used in order to retry the delete in
case we get a DB_LOCK_DEADLOCK error. */ case we get a DB_LOCK_DEADLOCK error. */
DB_TXN *sub_trans = transaction; DB_TXN *sub_trans = transaction;
for (uint retry = 0; retry < tokudb_trans_retry; retry++) {
error = remove_keys(sub_trans, record, &prim_key, &keys); error = remove_keys(sub_trans, record, &prim_key, &keys);
if (error) { if (error) {
DBUG_PRINT("error", ("Got error %d", error)); DBUG_PRINT("error", ("Got error %d", error));
break; // No retry - return error
}
if (error != DB_LOCK_DEADLOCK && error != DB_LOCK_NOTGRANTED)
break;
} }
if (!error) { else {
deleted_rows++; deleted_rows++;
} }
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
...@@ -3389,10 +3344,6 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { ...@@ -3389,10 +3344,6 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
// QQQ this is here to allow experiments without transactions // QQQ this is here to allow experiments without transactions
int error = 0; int error = 0;
tokudb_trx_data *trx = NULL; tokudb_trx_data *trx = NULL;
if ((tokudb_init_flags & DB_INIT_TXN) == 0) {
error = 0;
goto cleanup;
}
trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
if (!trx) { if (!trx) {
trx = (tokudb_trx_data *) trx = (tokudb_trx_data *)
...@@ -3523,8 +3474,6 @@ cleanup: ...@@ -3523,8 +3474,6 @@ cleanup:
int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) { int ha_tokudb::start_stmt(THD * thd, thr_lock_type lock_type) {
TOKUDB_DBUG_ENTER("ha_tokudb::start_stmt"); TOKUDB_DBUG_ENTER("ha_tokudb::start_stmt");
if (!(tokudb_init_flags & DB_INIT_TXN))
TOKUDB_DBUG_RETURN(0);
int error = 0; int error = 0;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
DBUG_ASSERT(trx); DBUG_ASSERT(trx);
......
...@@ -166,7 +166,6 @@ private: ...@@ -166,7 +166,6 @@ private:
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length); DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length);
int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key); int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key);
int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys); int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys);
int restore_keys(DB_TXN * trans, key_map * changed_keys, uint primary_key, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * new_key);
int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row); int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row);
int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key, bool local_using_ignore); int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key, bool local_using_ignore);
int handle_cursor_error(int error, int err_to_return, uint keynr); int handle_cursor_error(int error, int err_to_return, uint keynr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment