Commit 45f58be0 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #1058

get ha_tokudb to properly support keys with NULL values in unique indexes
still need to write an automated test

git-svn-id: file:///svn/mysql/tokudb-engine/src@5589 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3105884e
...@@ -1050,14 +1050,15 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i ...@@ -1050,14 +1050,15 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key); (*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key);
my_free(cmp_byte_stream.data, MYF(0)); my_free(cmp_byte_stream.data, MYF(0));
} }
else else {
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key); (*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key);
if (!(key_info->flags & HA_NOSAME)) {
DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %s\n", key_info->name));
(*ptr)->set_flags(*ptr, DB_DUP + DB_DUPSORT);
(*ptr)->api_internal = share->file->app_private;
(*ptr)->set_dup_compare(*ptr, hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_primary_key);
} }
DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %s\n", key_info->name));
(*ptr)->set_flags(*ptr, DB_DUP + DB_DUPSORT);
(*ptr)->api_internal = share->file->app_private;
(*ptr)->set_dup_compare(*ptr, hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_primary_key);
if ((error = (*ptr)->open(*ptr, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) { if ((error = (*ptr)->open(*ptr, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
my_errno = error; my_errno = error;
goto cleanup; goto cleanup;
...@@ -1653,12 +1654,13 @@ void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) { ...@@ -1653,12 +1654,13 @@ void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) {
// the parameter key // the parameter key
// //
DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, int key_length) { DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length) {
KEY_PART_INFO *key_part = key_info->key_part; KEY_PART_INFO *key_part = key_info->key_part;
KEY_PART_INFO *end = key_part + key_info->key_parts; KEY_PART_INFO *end = key_part + key_info->key_parts;
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set); my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
key->data = buff; key->data = buff;
*has_null = false;
for (; key_part != end && key_length > 0; key_part++) { for (; key_part != end && key_length > 0; key_part++) {
// //
// accessing key_part->field->null_bit instead off key_part->null_bit // accessing key_part->field->null_bit instead off key_part->null_bit
...@@ -1669,6 +1671,7 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, ...@@ -1669,6 +1671,7 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff,
/* Store 0 if the key part is a NULL part */ /* Store 0 if the key part is a NULL part */
if (record[key_part->null_offset] & key_part->field->null_bit) { if (record[key_part->null_offset] & key_part->field->null_bit) {
*buff++ = 0; *buff++ = 0;
*has_null = true;
// //
// fractal tree does not handle this falg at the moment // fractal tree does not handle this falg at the moment
// so commenting out for now // so commenting out for now
...@@ -1711,15 +1714,16 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, ...@@ -1711,15 +1714,16 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff,
// Returns: // Returns:
// the parameter key // the parameter key
// //
DBT *ha_tokudb::create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length) { DBT *ha_tokudb::create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length) {
TOKUDB_DBUG_ENTER("ha_tokudb::create_dbt_key_from_table"); TOKUDB_DBUG_ENTER("ha_tokudb::create_dbt_key_from_table");
bzero((void *) key, sizeof(*key)); bzero((void *) key, sizeof(*key));
if (hidden_primary_key && keynr == primary_key) { if (hidden_primary_key && keynr == primary_key) {
key->data = current_ident; key->data = current_ident;
key->size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH; key->size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
*has_null = false;
DBUG_RETURN(key); DBUG_RETURN(key);
} }
DBUG_RETURN(create_dbt_key_from_key(key, &table->key_info[keynr],buff,record,key_length)); DBUG_RETURN(create_dbt_key_from_key(key, &table->key_info[keynr],buff,record, has_null, key_length));
} }
...@@ -1977,6 +1981,7 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -1977,6 +1981,7 @@ int ha_tokudb::write_row(uchar * record) {
int error; int error;
THD *thd = NULL; THD *thd = NULL;
u_int32_t put_flags; u_int32_t put_flags;
bool has_null;
// //
// some crap that needs to be done because MySQL does not properly abstract // some crap that needs to be done because MySQL does not properly abstract
...@@ -2005,7 +2010,13 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -2005,7 +2010,13 @@ int ha_tokudb::write_row(uchar * record) {
if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) { if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
put_flags = DB_YESOVERWRITE; put_flags = DB_YESOVERWRITE;
} }
error = share->file->put(share->file, transaction, create_dbt_key_from_table(&prim_key, primary_key, key_buff, record), &row, put_flags); error = share->file->put(
share->file,
transaction,
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null),
&row,
put_flags
);
if (error) { if (error) {
last_dup_key = primary_key; last_dup_key = primary_key;
goto cleanup; goto cleanup;
...@@ -2018,13 +2029,15 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -2018,13 +2029,15 @@ int ha_tokudb::write_row(uchar * record) {
continue; continue;
} }
put_flags = share->key_type[keynr]; put_flags = share->key_type[keynr];
if (put_flags == DB_NOOVERWRITE && thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) { create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null);
if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
put_flags = DB_YESOVERWRITE; put_flags = DB_YESOVERWRITE;
} }
error = share->key_file[keynr]->put( error = share->key_file[keynr]->put(
share->key_file[keynr], share->key_file[keynr],
transaction, transaction,
create_dbt_key_from_table(&key, keynr, key_buff2, record), &key,
&prim_key, &prim_key,
put_flags put_flags
); );
...@@ -2124,6 +2137,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2124,6 +2137,8 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
DBT prim_key, key, old_prim_key; DBT prim_key, key, old_prim_key;
int error; int error;
bool primary_key_changed; bool primary_key_changed;
bool has_null;
THD* thd = ha_thd();
LINT_INIT(error); LINT_INIT(error);
statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status);
...@@ -2139,9 +2154,9 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2139,9 +2154,9 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
old_prim_key = prim_key; old_prim_key = prim_key;
} }
else { else {
create_dbt_key_from_table(&prim_key, primary_key, key_buff, new_row); create_dbt_key_from_table(&prim_key, primary_key, key_buff, new_row, &has_null);
if ((primary_key_changed = key_cmp(primary_key, old_row, new_row))) { if ((primary_key_changed = key_cmp(primary_key, old_row, new_row))) {
create_dbt_key_from_table(&old_prim_key, primary_key, primary_key_buff, old_row); create_dbt_key_from_table(&old_prim_key, primary_key, primary_key_buff, old_row, &has_null);
} }
else { else {
old_prim_key = prim_key; old_prim_key = prim_key;
...@@ -2160,15 +2175,20 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2160,15 +2175,20 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
continue; continue;
} }
if (key_cmp(keynr, old_row, new_row) || primary_key_changed) { if (key_cmp(keynr, old_row, new_row) || primary_key_changed) {
u_int32_t put_flags;
if ((error = remove_key(transaction, keynr, old_row, &old_prim_key))) { if ((error = remove_key(transaction, keynr, old_row, &old_prim_key))) {
goto cleanup; goto cleanup;
} }
put_flags = share->key_type[keynr];
if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
put_flags = DB_YESOVERWRITE;
}
error = share->key_file[keynr]->put( error = share->key_file[keynr]->put(
share->key_file[keynr], share->key_file[keynr],
transaction, transaction,
create_dbt_key_from_table(&key, keynr, key_buff2, new_row), create_dbt_key_from_table(&key, keynr, key_buff2, new_row, &has_null),
&prim_key, &prim_key,
share->key_type[keynr] put_flags
); );
// //
// We break if we hit an error, unless it is a dup key error // We break if we hit an error, unless it is a dup key error
...@@ -2211,16 +2231,17 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT ...@@ -2211,16 +2231,17 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT
TOKUDB_DBUG_ENTER("ha_tokudb::remove_key"); TOKUDB_DBUG_ENTER("ha_tokudb::remove_key");
int error; int error;
DBT key; DBT key;
bool has_null;
DBUG_PRINT("enter", ("index: %d", keynr)); DBUG_PRINT("enter", ("index: %d", keynr));
DBUG_PRINT("primary", ("index: %d", primary_key)); DBUG_PRINT("primary", ("index: %d", primary_key));
DBUG_DUMP("prim_key", (uchar *) prim_key->data, prim_key->size); DBUG_DUMP("prim_key", (uchar *) prim_key->data, prim_key->size);
if (keynr == active_index && cursor) if (keynr == active_index && cursor)
error = cursor->c_del(cursor, 0); error = cursor->c_del(cursor, 0);
else if (keynr == primary_key || ((table->key_info[keynr].flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME)) { // Unique key else if (keynr == primary_key) { // Unique key
DBUG_PRINT("Unique key", ("index: %d", keynr)); DBUG_PRINT("Unique key", ("index: %d", keynr));
DBUG_ASSERT(keynr == primary_key || prim_key->data != key_buff2); DBUG_ASSERT(keynr == primary_key || prim_key->data != key_buff2);
error = share->key_file[keynr]->del(share->key_file[keynr], trans, keynr == primary_key ? prim_key : create_dbt_key_from_table(&key, keynr, key_buff2, record), 0); error = share->key_file[keynr]->del(share->key_file[keynr], trans, keynr == primary_key ? prim_key : create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null), 0);
} else { } else {
/* QQQ use toku_db_delboth(key_file[keynr], key, val, trans); /* QQQ use toku_db_delboth(key_file[keynr], key, val, trans);
To delete the not duplicated key, we need to open an cursor on the To delete the not duplicated key, we need to open an cursor on the
...@@ -2230,7 +2251,7 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT ...@@ -2230,7 +2251,7 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT
DBUG_ASSERT(keynr != primary_key && prim_key->data != key_buff2); DBUG_ASSERT(keynr != primary_key && prim_key->data != key_buff2);
DBC *tmp_cursor; DBC *tmp_cursor;
if (!(error = share->key_file[keynr]->cursor(share->key_file[keynr], trans, &tmp_cursor, 0))) { if (!(error = share->key_file[keynr]->cursor(share->key_file[keynr], trans, &tmp_cursor, 0))) {
if (!(error = tmp_cursor->c_get(tmp_cursor, create_dbt_key_from_table(&key, keynr, key_buff2, record), prim_key, DB_GET_BOTH))) { if (!(error = tmp_cursor->c_get(tmp_cursor, create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null), prim_key, DB_GET_BOTH))) {
DBUG_DUMP("cget key", (uchar *) key.data, key.size); DBUG_DUMP("cget key", (uchar *) key.data, key.size);
error = tmp_cursor->c_del(tmp_cursor, 0); error = tmp_cursor->c_del(tmp_cursor, 0);
} }
...@@ -2281,9 +2302,10 @@ int ha_tokudb::delete_row(const uchar * record) { ...@@ -2281,9 +2302,10 @@ int ha_tokudb::delete_row(const uchar * record) {
int error = ENOSYS; int error = ENOSYS;
DBT prim_key; DBT prim_key;
key_map keys = table_share->keys_in_use; key_map keys = table_share->keys_in_use;
bool has_null;
statistic_increment(table->in_use->status_var.ha_delete_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_delete_count, &LOCK_status);
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record); create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null);
if (hidden_primary_key) { if (hidden_primary_key) {
keys.set_bit(primary_key); keys.set_bit(primary_key);
} }
...@@ -3205,8 +3227,10 @@ void ha_tokudb::position(const uchar * record) { ...@@ -3205,8 +3227,10 @@ void ha_tokudb::position(const uchar * record) {
if (hidden_primary_key) { if (hidden_primary_key) {
DBUG_ASSERT(ref_length == TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH); DBUG_ASSERT(ref_length == TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
memcpy_fixed(ref, (char *) current_ident, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH); memcpy_fixed(ref, (char *) current_ident, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
} else { }
create_dbt_key_from_table(&key, primary_key, ref, record); else {
bool has_null;
create_dbt_key_from_table(&key, primary_key, ref, record, &has_null);
if (key.size < ref_length) if (key.size < ref_length)
bzero(ref + key.size, ref_length - key.size); bzero(ref + key.size, ref_length - key.size);
} }
...@@ -3743,7 +3767,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in ...@@ -3743,7 +3767,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
sprintf(part, "key-%s", form->s->key_info[i].name); sprintf(part, "key-%s", form->s->key_info[i].name);
make_name(newname, name, part); make_name(newname, name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME); fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = create_sub_table(name_buff, NULL, DB_BTREE, (form->key_info[i].flags & HA_NOSAME) ? 0 : DB_DUP + DB_DUPSORT); error = create_sub_table(name_buff, NULL, DB_BTREE, DB_DUP + DB_DUPSORT);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) if (tokudb_debug & TOKUDB_DEBUG_OPEN)
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, form->key_info[i].flags, error); TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, form->key_info[i].flags, error);
if (error) { if (error) {
...@@ -4076,6 +4100,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -4076,6 +4100,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
DB_TXN* txn = NULL; DB_TXN* txn = NULL;
uchar tmp_key_buff[2*table_arg->s->rec_buff_length]; uchar tmp_key_buff[2*table_arg->s->rec_buff_length];
uchar tmp_prim_key_buff[2*table_arg->s->rec_buff_length]; uchar tmp_prim_key_buff[2*table_arg->s->rec_buff_length];
THD* thd = ha_thd();
// //
// number of DB files we have open currently, before add_index is executed // number of DB files we have open currently, before add_index is executed
...@@ -4118,7 +4143,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -4118,7 +4143,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
sprintf(part, "key-%s", key_info[i].name); sprintf(part, "key-%s", key_info[i].name);
make_name(newname, share->table_name, part); make_name(newname, share->table_name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME); fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = create_sub_table(name_buff, NULL, DB_BTREE, (key_info[i].flags & HA_NOSAME) ? 0 : DB_DUP + DB_DUPSORT); error = create_sub_table(name_buff, NULL, DB_BTREE, DB_DUP + DB_DUPSORT);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) { if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, key_info[i].flags, error); TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, key_info[i].flags, error);
} }
...@@ -4203,9 +4228,13 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -4203,9 +4228,13 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
for (uint i = 0; i < num_of_keys; i++) { for (uint i = 0; i < num_of_keys; i++) {
DBT secondary_key; DBT secondary_key;
create_dbt_key_from_key(&secondary_key,&key_info[i], tmp_key_buff, tmp_record); bool has_null = false;
create_dbt_key_from_key(&secondary_key,&key_info[i], tmp_key_buff, tmp_record, &has_null);
uint curr_index = i + curr_num_DBs; uint curr_index = i + curr_num_DBs;
u_int32_t put_flags = share->key_type[curr_index]; u_int32_t put_flags = share->key_type[curr_index];
if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
put_flags = DB_YESOVERWRITE;
}
error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &current_primary_key, put_flags); error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &current_primary_key, put_flags);
if (error) { if (error) {
......
...@@ -161,8 +161,8 @@ private: ...@@ -161,8 +161,8 @@ private:
ulong max_row_length(const uchar * buf); ulong max_row_length(const uchar * buf);
int pack_row(DBT * row, const uchar * record); int pack_row(DBT * row, const uchar * record);
void unpack_key(uchar * record, DBT const *key, uint index); void unpack_key(uchar * record, DBT const *key, uint index);
DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH); DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH); DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length); DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length);
int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key); int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key);
int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys); int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment