Commit 1801c673 authored by Zardosht Kasheff, committed by Yoni Fogel

[t:2314], get updates to use put_multiple API

git-svn-id: file:///svn/mysql/tokudb-engine/src@17231 c7de825b-a66e-492c-adef-691d508d4ae1
parent 913285b1
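
This commit removes the ha_tokudb::update_primary_key() helper and reworks ha_tokudb::update_row() so that, for dictionaries that support it (share->version > 2), every write for an updated row (primary, secondary, and clustering keys) is collected into parallel arrays and issued through one DB_ENV->put_multiple() call instead of one DB->put() per index; older dictionaries keep the per-index put path. A minimal sketch of that batching pattern, assuming the same put_multiple() argument order as the call added at the end of update_row() below (the names num_dbs, keys, vals, and flags are illustrative, not part of the patch):

    // Sketch only: one batched write per updated row.
    // Each touched dictionary contributes one (DB*, key DBT, value DBT, flags) entry.
    u_int32_t num_dbs = 0;
    DB*       dbs[MAX_KEY + 1];
    DBT       keys[MAX_KEY + 1];
    DBT       vals[MAX_KEY + 1];
    u_int32_t flags[MAX_KEY + 1];

    dbs[num_dbs]   = share->key_file[primary_key];   // primary dictionary goes in slot 0
    keys[num_dbs]  = prim_key;
    vals[num_dbs]  = prim_row;
    flags[num_dbs] = DB_YESOVERWRITE;
    num_dbs++;
    // ... append one entry per secondary/clustering index that needs updating ...

    // A single transactional call replaces the per-index puts.
    error = db_env->put_multiple(db_env, NULL, txn,
                                 &prim_key, &prim_row,          // source row
                                 num_dbs, dbs, keys, vals, flags,
                                 NULL);
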
@@ -3285,47 +3285,6 @@ int ha_tokudb::key_cmp(uint keynr, const uchar * old_row, const uchar * new_row)
     return 0;
 }
 
-/*
-  Update a row from one value to another.
-  Clobbers key_buff2
-*/
-int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * new_key) {
-    TOKUDB_DBUG_ENTER("update_primary_key");
-    DBT row;
-    int error;
-
-    if (primary_key_changed) {
-        u_int32_t put_flags = hidden_primary_key ? DB_YESOVERWRITE : DB_NOOVERWRITE;
-        // Primary key changed or we are updating a key that can have duplicates.
-        // Delete the old row and add a new one
-        error = remove_key(trans, primary_key, old_row, old_key);
-        if (error) { goto cleanup; }
-        error = pack_row(&row, new_row, primary_key);
-        if (error) { goto cleanup; }
-        error = share->file->put(share->file, trans, new_key, &row, put_flags);
-        if (error) {
-            last_dup_key = primary_key;
-            goto cleanup;
-        }
-    }
-    else {
-        // Primary key didn't change; just update the row data
-        error = pack_row(&row, new_row, primary_key);
-        if (error) { goto cleanup; }
-        //
-        // we can use DB_YESOVERWRITE because we know we are overwriting the old row
-        //
-        error = share->file->put(share->file, trans, new_key, &row, DB_YESOVERWRITE);
-        if (error) { goto cleanup; }
-    }
-
-cleanup:
-    TOKUDB_DBUG_RETURN(error);
-}
-
 //
 // Updates a row in the table, called when handling an UPDATE query
 // Parameters:
@@ -3337,16 +3296,32 @@ cleanup:
 //
 int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
     TOKUDB_DBUG_ENTER("update_row");
-    DBT prim_key, key, old_prim_key, row;
+    DBT prim_key, key, old_prim_key, row, prim_row;
     int error;
     bool primary_key_changed;
     bool has_null;
     THD* thd = ha_thd();
     DB_TXN* sub_trans = NULL;
     DB_TXN* txn = NULL;
-    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
+    tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
+    u_int32_t mult_put_flags[MAX_KEY + 1] = {DB_YESOVERWRITE};
+    DB* dbs[MAX_KEY + 1];
+    DBT key_dbts[MAX_KEY + 1];
+    DBT rec_dbts[MAX_KEY + 1];
+    DBT prim_val;
+    u_int32_t curr_db_index;
+    bool use_put_multiple = share->version > 2;
     LINT_INIT(error);
+    bzero((void *) &row, sizeof(row));
+    bzero((void *) &prim_key, sizeof(prim_key));
+    bzero((void *) &old_prim_key, sizeof(old_prim_key));
+    bzero((void *) &prim_row, sizeof(prim_row));
+    bzero((void *) &key, sizeof(key));
+    bzero((void *) &key_dbts, sizeof(key));
+    bzero((void *) &rec_dbts, sizeof(key));
     statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status);
     if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) {
         table->timestamp_field->set_time();
@@ -3374,6 +3349,13 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
         pthread_mutex_unlock(&share->mutex);
     }
 
+    if (using_ignore) {
+        error = db_env->txn_begin(db_env, transaction, &sub_trans, DB_INHERIT_ISOLATION);
+        if (error) {
+            goto cleanup;
+        }
+    }
+    txn = using_ignore ? sub_trans : transaction;
     if (hidden_primary_key) {
@@ -3393,20 +3375,32 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
         }
     }
 
-    if (using_ignore) {
-        error = db_env->txn_begin(db_env, transaction, &sub_trans, DB_INHERIT_ISOLATION);
-        if (error) {
-            goto cleanup;
-        }
-    }
-    txn = using_ignore ? sub_trans : transaction;
-
-    /* Start by updating the primary key */
-    error = update_primary_key(txn, primary_key_changed, old_row, &old_prim_key, new_row, &prim_key);
-    if (error) {
-        last_dup_key = primary_key;
-        goto cleanup;
-    }
+    if (primary_key_changed) {
+        // Primary key changed or we are updating a key that can have duplicates.
+        // Delete the old row and add a new one
+        error = remove_key(txn, primary_key, old_row, &old_prim_key);
+        if (error) { goto cleanup; }
+    }
+
+    error = pack_row(&prim_row, new_row, primary_key);
+    if (error) { goto cleanup; }
+
+    if (use_put_multiple) {
+        dbs[0] = share->key_file[primary_key];
+        key_dbts[0] = prim_key;
+        rec_dbts[0] = prim_row;
+        mult_put_flags[0] = primary_key_changed ? DB_NOOVERWRITE : DB_YESOVERWRITE;
+    }
+    else {
+        u_int32_t put_flags = primary_key_changed ? DB_NOOVERWRITE : DB_YESOVERWRITE;
+        error = share->file->put(share->file, txn, &prim_key, &prim_row, put_flags);
+        if (error) {
+            last_dup_key = primary_key;
+            goto cleanup;
+        }
+    }
+    curr_db_index = 1;
 
     // Update all other keys
     for (uint keynr = 0; keynr < table_share->keys; keynr++) {
         bool secondary_key_changed = key_cmp(keynr, old_row, new_row);
@@ -3418,7 +3412,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
             primary_key_changed
            )
         {
-            u_int32_t put_flags;
             bool is_unique_key = table->key_info[keynr].flags & HA_NOSAME;
             //
             // only remove the old value if the key has changed
@@ -3431,7 +3424,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
                    goto cleanup;
                }
            }
-            create_dbt_key_from_table(&key, keynr, key_buff2, new_row, &has_null);
            //
            // if unique key, check uniqueness constraint
@@ -3448,39 +3440,64 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
                    goto cleanup;
                }
            }
-            put_flags = DB_YESOVERWRITE;
-            if (table->key_info[keynr].flags & HA_CLUSTERING) {
-                error = pack_row(&row, (const uchar *) new_row, keynr);
-                if (error){ goto cleanup; }
-            }
-            else {
-                bzero((void *) &row, sizeof(row));
-            }
-            //
-            // make sure that for clustering keys, we are using DB_YESOVERWRITE,
-            // therefore making this put an overwrite if the key has not changed
-            //
-            error = share->key_file[keynr]->put(
-                share->key_file[keynr],
-                txn,
-                &key,
-                &row,
-                put_flags
-                );
-            //
-            // We break if we hit an error, unless it is a dup key error
-            // and MySQL told us to ignore duplicate key errors
-            //
-            if (error) {
-                last_dup_key = keynr;
-                goto cleanup;
-            }
+            if (!use_put_multiple) {
+                u_int32_t put_flags;
+                put_flags = DB_YESOVERWRITE;
+                create_dbt_key_from_table(&key, keynr, key_buff2, new_row, &has_null);
+                if (table->key_info[keynr].flags & HA_CLUSTERING) {
+                    error = pack_row(&row, (const uchar *) new_row, keynr);
+                    if (error){ goto cleanup; }
+                }
+                else {
+                    bzero((void *) &row, sizeof(row));
+                }
+                //
+                // make sure that for clustering keys, we are using DB_YESOVERWRITE,
+                // therefore making this put an overwrite if the key has not changed
+                //
+                error = share->key_file[keynr]->put(
+                    share->key_file[keynr],
+                    txn,
+                    &key,
+                    &row,
+                    put_flags
+                    );
+                //
+                // We break if we hit an error, unless it is a dup key error
+                // and MySQL told us to ignore duplicate key errors
+                //
+                if (error) {
+                    last_dup_key = keynr;
+                    goto cleanup;
+                }
+            }
+            else {
+                dbs[curr_db_index] = share->key_file[keynr];
+                key_dbts[curr_db_index] = mult_key_dbt[keynr];
+                rec_dbts[curr_db_index] = mult_rec_dbt[keynr];
+                curr_db_index++;
+            }
         }
     }
 
+    if (use_put_multiple) {
+        error = db_env->put_multiple(
+            db_env,
+            NULL,
+            txn,
+            &prim_key,
+            &prim_row,
+            curr_db_index,
+            dbs,
+            key_dbts,
+            rec_dbts,
+            mult_put_flags,
+            NULL
+            );
+    }
+
     if (!error) {
         trx->stmt_progress.updated++;
         track_progress(thd);
@@ -264,7 +264,6 @@ private:
     int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key);
     int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key);
     int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row);
-    int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key);
    int handle_cursor_error(int error, int err_to_return, uint keynr);
    DBT *get_pos(DBT * to, uchar * pos);