Commit e10e4c0c authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #1788

make updates faster
by not doing extraneous delete

git-svn-id: file:///svn/mysql/tokudb-engine/src@12434 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2882af63
...@@ -862,6 +862,9 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i ...@@ -862,6 +862,9 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
my_errno = error; my_errno = error;
goto cleanup; goto cleanup;
} }
//
// TODO: make sure that with clustering keys, DB_YESOVERWRITE IS ALWAYS SET
//
*key_type = key_info->flags & HA_NOSAME ? DB_NOOVERWRITE : DB_YESOVERWRITE; *key_type = key_info->flags & HA_NOSAME ? DB_NOOVERWRITE : DB_YESOVERWRITE;
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_dbt_key); (*ptr)->set_bt_compare(*ptr, tokudb_cmp_dbt_key);
...@@ -2507,21 +2510,30 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons ...@@ -2507,21 +2510,30 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
if (primary_key_changed) { if (primary_key_changed) {
// Primary key changed or we are updating a key that can have duplicates. // Primary key changed or we are updating a key that can have duplicates.
// Delete the old row and add a new one // Delete the old row and add a new one
if (!(error = remove_key(trans, primary_key, old_row, old_key))) { error = remove_key(trans, primary_key, old_row, old_key);
if (!(error = pack_row(&row, new_row, primary_key))) { if (error) { goto cleanup; }
if ((error = share->file->put(share->file, trans, new_key, &row, share->key_type[primary_key]))) {
// Probably a duplicated key; restore old key and row if needed error = pack_row(&row, new_row, primary_key);
if (error) { goto cleanup; }
error = share->file->put(share->file, trans, new_key, &row, share->key_type[primary_key]);
if (error) {
last_dup_key = primary_key; last_dup_key = primary_key;
} goto cleanup;
}
} }
} }
else { else {
// Primary key didn't change; just update the row data // Primary key didn't change; just update the row data
if (!(error = pack_row(&row, new_row, primary_key))) { error = pack_row(&row, new_row, primary_key);
error = share->file->put(share->file, trans, new_key, &row, 0); if (error) { goto cleanup; }
}
//
// we can use DB_YESOVERWRITE because we know we are overwriting the old row
//
error = share->file->put(share->file, trans, new_key, &row, DB_YESOVERWRITE);
if (error) { goto cleanup; }
} }
cleanup:
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -2614,18 +2626,27 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2614,18 +2626,27 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
} }
// Update all other keys // Update all other keys
for (uint keynr = 0; keynr < table_share->keys; keynr++) { for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool secondary_key_changed = key_cmp(keynr, old_row, new_row);
if (keynr == primary_key) { if (keynr == primary_key) {
continue; continue;
} }
if (table->key_info[keynr].flags & HA_CLUSTERING || if (table->key_info[keynr].flags & HA_CLUSTERING ||
key_cmp(keynr, old_row, new_row) || secondary_key_changed ||
primary_key_changed primary_key_changed
) )
{ {
u_int32_t put_flags; u_int32_t put_flags;
if ((error = remove_key(txn, keynr, old_row, &old_prim_key))) { //
// only remove the old value if the key has changed
// if the key has not changed (in case of clustering keys,
// then we overwrite the old value)
//
if (secondary_key_changed || primary_key_changed) {
error = remove_key(txn, keynr, old_row, &old_prim_key);
if (error) {
goto cleanup; goto cleanup;
} }
}
create_dbt_key_from_table(&key, keynr, key_buff2, new_row, &has_null), create_dbt_key_from_table(&key, keynr, key_buff2, new_row, &has_null),
put_flags = share->key_type[keynr]; put_flags = share->key_type[keynr];
if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) { if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
...@@ -2635,6 +2656,11 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2635,6 +2656,11 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
if ((error = pack_row(&row, (const uchar *) new_row, keynr))){ if ((error = pack_row(&row, (const uchar *) new_row, keynr))){
goto cleanup; goto cleanup;
} }
//
// make sure that for clustering keys, we are using DB_YESOVERWRITE,
// therefore making this put an overwrite if the key has not changed
//
assert(put_flags & DB_YESOVERWRITE);
error = share->key_file[keynr]->put( error = share->key_file[keynr]->put(
share->key_file[keynr], share->key_file[keynr],
txn, txn,
...@@ -2754,17 +2780,15 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT ...@@ -2754,17 +2780,15 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT
// 0 on success // 0 on success
// error otherwise // error otherwise
// //
int ha_tokudb::remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys) { int ha_tokudb::remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key) {
int result = 0; int result = 0;
for (uint keynr = 0; keynr < table_share->keys + test(hidden_primary_key); keynr++) { for (uint keynr = 0; keynr < table_share->keys + test(hidden_primary_key); keynr++) {
if (keys->is_set(keynr)) {
int new_error = remove_key(trans, keynr, record, prim_key); int new_error = remove_key(trans, keynr, record, prim_key);
if (new_error) { if (new_error) {
result = new_error; // Return last error result = new_error; // Return last error
break; // Let rollback correct things break; // Let rollback correct things
} }
} }
}
return result; return result;
} }
...@@ -2800,7 +2824,7 @@ int ha_tokudb::delete_row(const uchar * record) { ...@@ -2800,7 +2824,7 @@ int ha_tokudb::delete_row(const uchar * record) {
/* Subtransactions may be used in order to retry the delete in /* Subtransactions may be used in order to retry the delete in
case we get a DB_LOCK_DEADLOCK error. */ case we get a DB_LOCK_DEADLOCK error. */
DB_TXN *sub_trans = transaction; DB_TXN *sub_trans = transaction;
error = remove_keys(sub_trans, record, &prim_key, &keys); error = remove_keys(sub_trans, record, &prim_key);
if (error) { if (error) {
DBUG_PRINT("error", ("Got error %d", error)); DBUG_PRINT("error", ("Got error %d", error));
} }
......
...@@ -245,7 +245,7 @@ private: ...@@ -245,7 +245,7 @@ private:
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH); DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length, uchar inf_byte); DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length, uchar inf_byte);
int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key); int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key);
int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys); int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key);
int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row); int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row);
int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key); int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key);
int handle_cursor_error(int error, int err_to_return, uint keynr); int handle_cursor_error(int error, int err_to_return, uint keynr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment