Commit cfded943 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:3015], make handlerton changes for update_multiple usage

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@25474 c7de825b-a66e-492c-adef-691d508d4ae1
parent df6db990
...@@ -322,11 +322,6 @@ typedef struct index_read_info { ...@@ -322,11 +322,6 @@ typedef struct index_read_info {
DBT* orig_key; DBT* orig_key;
} *INDEX_READ_INFO; } *INDEX_READ_INFO;
typedef struct row_buffers {
uchar** key_buff;
uchar** rec_buff;
} *ROW_BUFFERS;
int poll_fun(void *extra, float progress) { int poll_fun(void *extra, float progress) {
LOADER_CONTEXT context = (LOADER_CONTEXT)extra; LOADER_CONTEXT context = (LOADER_CONTEXT)extra;
...@@ -1167,6 +1162,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t ...@@ -1167,6 +1162,7 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
HA_FILE_BASED | HA_AUTO_PART_KEY | HA_TABLE_SCAN_ON_INDEX; HA_FILE_BASED | HA_AUTO_PART_KEY | HA_TABLE_SCAN_ON_INDEX;
alloc_ptr = NULL; alloc_ptr = NULL;
rec_buff = NULL; rec_buff = NULL;
rec_update_buff = NULL;
transaction = NULL; transaction = NULL;
cursor = NULL; cursor = NULL;
fixed_cols_for_query = NULL; fixed_cols_for_query = NULL;
...@@ -1711,18 +1707,27 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) { ...@@ -1711,18 +1707,27 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
alloced_rec_buff_length = table_share->rec_buff_length + table_share->fields; alloced_rec_buff_length = table_share->rec_buff_length + table_share->fields;
rec_buff = (uchar *) my_malloc(alloced_rec_buff_length, MYF(MY_WME)); rec_buff = (uchar *) my_malloc(alloced_rec_buff_length, MYF(MY_WME));
if (rec_buff == NULL) { if (rec_buff == NULL) {
ret_val = 1; ret_val = 1;
goto exit; goto exit;
} }
for (u_int32_t i = 0; i < curr_num_DBs; i++) { alloced_update_rec_buff_length = alloced_rec_buff_length;
rec_update_buff = (uchar *) my_malloc(alloced_update_rec_buff_length, MYF(MY_WME));
if (rec_update_buff == NULL) {
ret_val = 1;
goto exit;
}
for (u_int32_t i = 0; i < sizeof(mult_key_buff)/sizeof(mult_key_buff[0]); i++) {
mult_key_buff[i] = (uchar *)my_malloc(max_key_length, MYF(MY_WME)); mult_key_buff[i] = (uchar *)my_malloc(max_key_length, MYF(MY_WME));
assert(mult_key_buff[i] != NULL); assert(mult_key_buff[i] != NULL);
mult_key_dbt[i].ulen = max_key_length; mult_key_dbt[i].ulen = max_key_length;
mult_key_dbt[i].flags = DB_DBT_USERMEM; mult_key_dbt[i].flags = DB_DBT_USERMEM;
mult_key_dbt[i].data = mult_key_buff[i]; mult_key_dbt[i].data = mult_key_buff[i];
}
for (u_int32_t i = 0; i < curr_num_DBs; i++) {
if (table_share->key_info[i].flags & HA_CLUSTERING) { if (table_share->key_info[i].flags & HA_CLUSTERING) {
mult_rec_buff[i] = (uchar *) my_malloc(alloced_rec_buff_length, MYF(MY_WME)); mult_rec_buff[i] = (uchar *) my_malloc(alloced_rec_buff_length, MYF(MY_WME));
assert(mult_rec_buff[i]); assert(mult_rec_buff[i]);
...@@ -1774,10 +1779,14 @@ exit: ...@@ -1774,10 +1779,14 @@ exit:
alloc_ptr = NULL; alloc_ptr = NULL;
my_free(rec_buff, MYF(MY_ALLOW_ZERO_PTR)); my_free(rec_buff, MYF(MY_ALLOW_ZERO_PTR));
rec_buff = NULL; rec_buff = NULL;
my_free(rec_update_buff, MYF(MY_ALLOW_ZERO_PTR));
rec_update_buff = NULL;
for (u_int32_t i = 0; i < curr_num_DBs; i++) { for (u_int32_t i = 0; i < curr_num_DBs; i++) {
my_free(mult_key_buff[i], MYF(MY_ALLOW_ZERO_PTR));
my_free(mult_rec_buff[i], MYF(MY_ALLOW_ZERO_PTR)); my_free(mult_rec_buff[i], MYF(MY_ALLOW_ZERO_PTR));
} }
for (u_int32_t i = 0; i < sizeof(mult_key_buff)/sizeof(mult_key_buff[0]); i++) {
my_free(mult_key_buff[i], MYF(MY_ALLOW_ZERO_PTR));
}
if (error) { if (error) {
my_errno = error; my_errno = error;
...@@ -1998,13 +2007,17 @@ int ha_tokudb::__close(int mutex_is_locked) { ...@@ -1998,13 +2007,17 @@ int ha_tokudb::__close(int mutex_is_locked) {
if (tokudb_debug & TOKUDB_DEBUG_OPEN) if (tokudb_debug & TOKUDB_DEBUG_OPEN)
TOKUDB_TRACE("close:%p\n", this); TOKUDB_TRACE("close:%p\n", this);
my_free(rec_buff, MYF(MY_ALLOW_ZERO_PTR)); my_free(rec_buff, MYF(MY_ALLOW_ZERO_PTR));
my_free(rec_update_buff, MYF(MY_ALLOW_ZERO_PTR));
my_free(blob_buff, MYF(MY_ALLOW_ZERO_PTR)); my_free(blob_buff, MYF(MY_ALLOW_ZERO_PTR));
my_free(alloc_ptr, MYF(MY_ALLOW_ZERO_PTR)); my_free(alloc_ptr, MYF(MY_ALLOW_ZERO_PTR));
for (u_int32_t i = 0; i < (table_share->keys + test(hidden_primary_key)); i++) { for (u_int32_t i = 0; i < sizeof(mult_key_buff)/sizeof(mult_key_buff[0]); i++) {
my_free(mult_key_buff[i], MYF(MY_ALLOW_ZERO_PTR)); my_free(mult_key_buff[i], MYF(MY_ALLOW_ZERO_PTR));
}
for (u_int32_t i = 0; i < (table_share->keys + test(hidden_primary_key)); i++) {
my_free(mult_rec_buff[i], MYF(MY_ALLOW_ZERO_PTR)); my_free(mult_rec_buff[i], MYF(MY_ALLOW_ZERO_PTR));
} }
rec_buff = NULL; rec_buff = NULL;
rec_update_buff = NULL;
alloc_ptr = NULL; alloc_ptr = NULL;
ha_tokudb::reset(); ha_tokudb::reset();
TOKUDB_DBUG_RETURN(free_share(share, mutex_is_locked)); TOKUDB_DBUG_RETURN(free_share(share, mutex_is_locked));
...@@ -2027,6 +2040,24 @@ bool ha_tokudb::fix_rec_buff_for_blob(ulong length) { ...@@ -2027,6 +2040,24 @@ bool ha_tokudb::fix_rec_buff_for_blob(ulong length) {
return 0; return 0;
} }
//
// Reallocate the update-path record buffer (rec_update_buff) if needed.
// If the buffer already exists and is large enough, does nothing.
// Counterpart of fix_rec_buff_for_blob, but for the buffer that holds the
// packed "old" row during update_row (both old and new rows must be packed
// at the same time, so they cannot share rec_buff).
// Parameters:
//      length - size of buffer required for rec_update_buff
// Returns:
//      0 on success, 1 if allocation failed (out of memory)
//
bool ha_tokudb::fix_rec_update_buff_for_blob(ulong length) {
    if (!rec_update_buff || (length > alloced_update_rec_buff_length)) {
        uchar *newptr;
        // MY_ALLOW_ZERO_PTR makes my_realloc act like my_malloc when
        // rec_update_buff is still NULL (first use after open()).
        if (!(newptr = (uchar *) my_realloc((void *) rec_update_buff, length, MYF(MY_ALLOW_ZERO_PTR))))
            return 1;
        rec_update_buff= newptr;
        alloced_update_rec_buff_length = length;
    }
    return 0;
}
void ha_tokudb::fix_mult_rec_buff() { void ha_tokudb::fix_mult_rec_buff() {
if (alloced_rec_buff_length > alloced_mult_rec_buff_length) { if (alloced_rec_buff_length > alloced_mult_rec_buff_length) {
for (uint i = 0; i < table_share->keys; i++) { for (uint i = 0; i < table_share->keys; i++) {
...@@ -2073,10 +2104,11 @@ ulong ha_tokudb::max_row_length(const uchar * buf) { ...@@ -2073,10 +2104,11 @@ ulong ha_tokudb::max_row_length(const uchar * buf) {
// [in] record - row in MySQL format // [in] record - row in MySQL format
// //
int ha_tokudb::pack_row( int ha_tokudb::pack_row_in_buff(
DBT * row, DBT * row,
const uchar* record, const uchar* record,
uint index uint index,
uchar* row_buff
) )
{ {
uchar* fixed_field_ptr = NULL; uchar* fixed_field_ptr = NULL;
...@@ -2088,17 +2120,10 @@ int ha_tokudb::pack_row( ...@@ -2088,17 +2120,10 @@ int ha_tokudb::pack_row(
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set); my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
if (table_share->blob_fields) {
if (fix_rec_buff_for_blob(max_row_length(record))) {
r = HA_ERR_OUT_OF_MEM;
goto cleanup;
}
}
/* Copy null bits */ /* Copy null bits */
memcpy(rec_buff, record, table_share->null_bytes); memcpy(row_buff, record, table_share->null_bytes);
fixed_field_ptr = rec_buff + table_share->null_bytes; fixed_field_ptr = row_buff + table_share->null_bytes;
var_field_offset_ptr = fixed_field_ptr + share->kc_info.mcp_info[index].var_len_offset; var_field_offset_ptr = fixed_field_ptr + share->kc_info.mcp_info[index].var_len_offset;
start_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[index].len_of_offsets; start_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[index].len_of_offsets;
var_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[index].len_of_offsets; var_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[index].len_of_offsets;
...@@ -2141,8 +2166,8 @@ int ha_tokudb::pack_row( ...@@ -2141,8 +2166,8 @@ int ha_tokudb::pack_row(
); );
} }
row->data = rec_buff; row->data = row_buff;
row->size = (size_t) (var_field_data_ptr - rec_buff); row->size = (size_t) (var_field_data_ptr - row_buff);
r = 0; r = 0;
cleanup: cleanup:
...@@ -2151,6 +2176,25 @@ cleanup: ...@@ -2151,6 +2176,25 @@ cleanup:
} }
//
// Pack a row in MySQL format into the handler's standard record buffer.
// Thin wrapper: delegates all work to pack_row_in_buff, targeting rec_buff.
// Parameters:
//      [out] row    - DBT describing the packed row on success
//      [in]  record - row in MySQL format
//      [in]  index  - index whose multi-column-pack layout to use
// Returns: 0 on success, nonzero on error (see pack_row_in_buff)
//
int ha_tokudb::pack_row(DBT * row, const uchar* record, uint index) {
    return pack_row_in_buff(row, record, index, rec_buff);
}
//
// Pack the pre-update ("old") row in MySQL format into the dedicated
// update buffer. Thin wrapper: delegates to pack_row_in_buff, targeting
// rec_update_buff so the packed old row does not clobber the packed new
// row held in rec_buff during update_row.
// Parameters:
//      [out] row    - DBT describing the packed row on success
//      [in]  record - row in MySQL format
//      [in]  index  - index whose multi-column-pack layout to use
// Returns: 0 on success, nonzero on error (see pack_row_in_buff)
//
int ha_tokudb::pack_old_row_for_update(DBT * row, const uchar* record, uint index) {
    return pack_row_in_buff(row, record, index, rec_update_buff);
}
int ha_tokudb::unpack_blobs( int ha_tokudb::unpack_blobs(
uchar* record, uchar* record,
const uchar* from_tokudb_blob, const uchar* from_tokudb_blob,
...@@ -3472,7 +3516,6 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN ...@@ -3472,7 +3516,6 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN
int error = 0; int error = 0;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key); uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
ulonglong wait_lock_time = get_write_lock_wait_time(thd); ulonglong wait_lock_time = get_write_lock_wait_time(thd);
u_int32_t mult_put_flags[MAX_KEY + 1] = {DB_YESOVERWRITE};
set_main_dict_put_flags(thd, &mult_put_flags[primary_key]); set_main_dict_put_flags(thd, &mult_put_flags[primary_key]);
if (mult_put_flags[primary_key] == DB_NOOVERWRITE_NO_ERROR) { if (mult_put_flags[primary_key] == DB_NOOVERWRITE_NO_ERROR) {
...@@ -3486,7 +3529,7 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN ...@@ -3486,7 +3529,7 @@ int ha_tokudb::insert_rows_to_dictionaries_mult(DBT* pk_key, DBT* pk_val, DB_TXN
lockretryN(wait_lock_time){ lockretryN(wait_lock_time){
error = db_env->put_multiple( error = db_env->put_multiple(
db_env, db_env,
NULL, share->key_file[primary_key],
txn, txn,
pk_key, pk_key,
pk_val, pk_val,
...@@ -3681,7 +3724,7 @@ int ha_tokudb::key_cmp(uint keynr, const uchar * old_row, const uchar * new_row) ...@@ -3681,7 +3724,7 @@ int ha_tokudb::key_cmp(uint keynr, const uchar * old_row, const uchar * new_row)
// //
int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
TOKUDB_DBUG_ENTER("update_row"); TOKUDB_DBUG_ENTER("update_row");
DBT prim_key, old_prim_key, prim_row; DBT prim_key, old_prim_key, prim_row, old_prim_row;
int error; int error;
bool primary_key_changed; bool primary_key_changed;
bool has_null; bool has_null;
...@@ -3689,19 +3732,14 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -3689,19 +3732,14 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
DB_TXN* sub_trans = NULL; DB_TXN* sub_trans = NULL;
DB_TXN* txn = NULL; DB_TXN* txn = NULL;
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot); tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
u_int32_t mult_put_flags[MAX_KEY + 1] = {DB_YESOVERWRITE}; uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
DB* dbs[MAX_KEY + 1];
DBT key_dbts[MAX_KEY + 1];
DBT rec_dbts[MAX_KEY + 1];
u_int32_t curr_db_index;
ulonglong wait_lock_time = get_write_lock_wait_time(thd); ulonglong wait_lock_time = get_write_lock_wait_time(thd);
LINT_INIT(error); LINT_INIT(error);
bzero((void *) &prim_key, sizeof(prim_key)); bzero((void *) &prim_key, sizeof(prim_key));
bzero((void *) &old_prim_key, sizeof(old_prim_key)); bzero((void *) &old_prim_key, sizeof(old_prim_key));
bzero((void *) &prim_row, sizeof(prim_row)); bzero((void *) &prim_row, sizeof(prim_row));
bzero((void *) &key_dbts, sizeof(key_dbts)); bzero((void *) &old_prim_row, sizeof(old_prim_row));
bzero((void *) &rec_dbts, sizeof(rec_dbts));
statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status);
...@@ -3757,82 +3795,65 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -3757,82 +3795,65 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
} }
} }
if (primary_key_changed) { //
// Primary key changed or we are updating a key that can have duplicates. // do uniqueness checks
// Delete the old row and add a new one //
error = remove_key(txn, primary_key, old_row, &old_prim_key); if (share->has_unique_keys && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
if (error) { goto cleanup; }
}
error = pack_row(&prim_row, new_row, primary_key);
if (error) { goto cleanup; }
dbs[0] = share->key_file[primary_key];
key_dbts[0] = prim_key;
rec_dbts[0] = prim_row;
mult_put_flags[0] = primary_key_changed ? DB_NOOVERWRITE : DB_YESOVERWRITE;
curr_db_index = 1;
// Update all other keys
for (uint keynr = 0; keynr < table_share->keys; keynr++) { for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool secondary_key_changed = key_cmp(keynr, old_row, new_row); bool is_unique_key = table->key_info[keynr].flags & HA_NOSAME;
if (keynr == primary_key) { if (keynr == primary_key) {
continue; continue;
} }
if (table->key_info[keynr].flags & HA_CLUSTERING || if (is_unique_key) {
secondary_key_changed || bool key_changed = key_cmp(keynr, old_row, new_row);
primary_key_changed if (key_changed) {
) bool is_unique;
{
bool is_unique_key = table->key_info[keynr].flags & HA_NOSAME;
//
// only remove the old value if the key has changed
// if the key has not changed (in case of clustering keys,
// then we overwrite the old value)
//
if (secondary_key_changed || primary_key_changed) {
error = remove_key(txn, keynr, old_row, &old_prim_key);
if (error) {
goto cleanup;
}
}
//
// if unique key, check uniqueness constraint
// but, we do not need to check it if the key has a null
// and we do not need to check it if unique_checks is off
//
if (is_unique_key && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
bool is_unique = false;
error = is_val_unique(&is_unique, new_row, &table->key_info[keynr], keynr, txn); error = is_val_unique(&is_unique, new_row, &table->key_info[keynr], keynr, txn);
if (error) { goto cleanup; } if (error) goto cleanup;
if (!is_unique) { if (!is_unique) {
error = DB_KEYEXIST; error = DB_KEYEXIST;
last_dup_key = keynr; last_dup_key = keynr;
goto cleanup; goto cleanup;
} }
} }
}
}
}
dbs[curr_db_index] = share->key_file[keynr]; if (table_share->blob_fields) {
key_dbts[curr_db_index] = mult_key_dbt[keynr]; if (fix_rec_buff_for_blob(max_row_length(new_row))) {
rec_dbts[curr_db_index] = mult_rec_dbt[keynr]; error = HA_ERR_OUT_OF_MEM;
curr_db_index++; goto cleanup;
}
if (fix_rec_update_buff_for_blob(max_row_length(old_row))) {
error = HA_ERR_OUT_OF_MEM;
goto cleanup;
} }
} }
error = pack_row(&prim_row, new_row, primary_key);
if (error) { goto cleanup; }
error = pack_old_row_for_update(&old_prim_row, old_row, primary_key);
if (error) { goto cleanup; }
set_main_dict_put_flags(thd, &mult_put_flags[primary_key]);
lockretryN(wait_lock_time){ lockretryN(wait_lock_time){
error = db_env->put_multiple( error = db_env->update_multiple(
db_env, db_env,
NULL, share->key_file[primary_key],
txn, txn,
&old_prim_key,
&old_prim_row,
&prim_key, &prim_key,
&prim_row, &prim_row,
curr_db_index, curr_num_DBs,
dbs, share->key_file,
key_dbts,
rec_dbts,
mult_put_flags, mult_put_flags,
2*curr_num_DBs,
mult_key_dbt,
curr_num_DBs,
mult_rec_dbt,
NULL NULL
); );
lockretry_wait; lockretry_wait;
...@@ -3939,7 +3960,7 @@ int ha_tokudb::delete_row(const uchar * record) { ...@@ -3939,7 +3960,7 @@ int ha_tokudb::delete_row(const uchar * record) {
lockretryN(wait_lock_time){ lockretryN(wait_lock_time){
error = db_env->del_multiple( error = db_env->del_multiple(
db_env, db_env,
NULL, share->key_file[primary_key],
transaction, transaction,
&prim_key, &prim_key,
&row, &row,
......
...@@ -154,6 +154,11 @@ private: ...@@ -154,6 +154,11 @@ private:
// number of bytes allocated in rec_buff // number of bytes allocated in rec_buff
// //
ulong alloced_rec_buff_length; ulong alloced_rec_buff_length;
//
// same as above two, but for updates
//
uchar *rec_update_buff;
ulong alloced_update_rec_buff_length;
u_int32_t max_key_length; u_int32_t max_key_length;
// //
// buffer used to temporarily store a "packed key" // buffer used to temporarily store a "packed key"
...@@ -180,9 +185,9 @@ private: ...@@ -180,9 +185,9 @@ private:
// //
// individual key buffer for each index // individual key buffer for each index
// //
uchar* mult_key_buff[MAX_KEY + 1]; uchar* mult_key_buff[2*(MAX_KEY + 1)];
uchar* mult_rec_buff[MAX_KEY + 1]; uchar* mult_rec_buff[MAX_KEY + 1];
DBT mult_key_dbt[MAX_KEY + 1]; DBT mult_key_dbt[2*(MAX_KEY + 1)];
DBT mult_rec_dbt[MAX_KEY + 1]; DBT mult_rec_dbt[MAX_KEY + 1];
u_int32_t mult_put_flags[MAX_KEY + 1]; u_int32_t mult_put_flags[MAX_KEY + 1];
u_int32_t mult_del_flags[MAX_KEY + 1]; u_int32_t mult_del_flags[MAX_KEY + 1];
...@@ -278,15 +283,27 @@ private: ...@@ -278,15 +283,27 @@ private:
int loader_error; int loader_error;
bool fix_rec_buff_for_blob(ulong length); bool fix_rec_buff_for_blob(ulong length);
bool fix_rec_update_buff_for_blob(ulong length);
void fix_mult_rec_buff(); void fix_mult_rec_buff();
uchar current_ident[TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH]; uchar current_ident[TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH];
ulong max_row_length(const uchar * buf); ulong max_row_length(const uchar * buf);
int pack_row_in_buff(
DBT * row,
const uchar* record,
uint index,
uchar* row_buff
);
int pack_row( int pack_row(
DBT * row, DBT * row,
const uchar* record, const uchar* record,
uint index uint index
); );
int pack_old_row_for_update(
DBT * row,
const uchar* record,
uint index
);
u_int32_t place_key_into_mysql_buff(KEY* key_info, uchar * record, uchar* data); u_int32_t place_key_into_mysql_buff(KEY* key_info, uchar * record, uchar* data);
void unpack_key(uchar * record, DBT const *key, uint index); void unpack_key(uchar * record, DBT const *key, uint index);
u_int32_t place_key_into_dbt_buff(KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length); u_int32_t place_key_into_dbt_buff(KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment