Commit 2df17557 authored by Rich Prohaska's avatar Rich Prohaska

#133 dont overlock key ranges for unique secondary keys

parent fceffa57
set default_storage_engine=tokudb;
drop table if exists t;
create table t (id int, unique key(id));
begin;
insert into t values (1);
begin;
insert into t values (2);
commit;
commit;
select * from t;
id
1
2
drop table if exists t;
create table t (id int not null, unique key(id));
begin;
insert into t values (1);
begin;
insert into t values (2);
commit;
commit;
select * from t;
id
1
2
drop table if exists t;
set default_storage_engine=tokudb;
drop table if exists t;
create table t (id int, unique key(id));
insert into t values (10),(100);
begin;
insert into t values (5);
begin;
insert into t values (6);
commit;
commit;
select * from t;
id
5
6
10
100
drop table if exists t;
create table t (id int not null, unique key(id));
insert into t values (10),(100);
begin;
insert into t values (5);
begin;
insert into t values (6);
commit;
commit;
select * from t;
id
5
6
10
100
drop table if exists t;
set default_storage_engine=tokudb;
drop table if exists t;
create table t (id int, unique key(id));
insert into t values (10),(100);
begin;
insert into t values (50);
begin;
insert into t values (60);
commit;
commit;
select * from t;
id
10
50
60
100
drop table if exists t;
create table t (id int not null, unique key(id));
insert into t values (10),(100);
begin;
insert into t values (50);
begin;
insert into t values (60);
commit;
commit;
select * from t;
id
10
50
60
100
drop table if exists t;
set default_storage_engine=tokudb;
drop table if exists t;
create table t (id int, unique key(id));
insert into t values (10),(100);
begin;
insert into t values (500);
begin;
insert into t values (600);
commit;
commit;
select * from t;
id
10
100
500
600
drop table if exists t;
create table t (id int not null, unique key(id));
insert into t values (10),(100);
begin;
insert into t values (500);
begin;
insert into t values (600);
commit;
commit;
select * from t;
id
10
100
500
600
drop table if exists t;
# test case for overlocking unique secondary keys
source include/have_tokudb.inc;
set default_storage_engine=tokudb;
disable_warnings;
drop table if exists t;
enable_warnings;
create table t (id int, unique key(id));
connect(c1,localhost,root,,);
begin;
insert into t values (1);
connect(c2,localhost,root,,);
begin;
insert into t values (2);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
create table t (id int not null, unique key(id));
connect(c1,localhost,root,,);
begin;
insert into t values (1);
connect(c2,localhost,root,,);
begin;
insert into t values (2);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
# test case for overlocking unique secondary keys
source include/have_tokudb.inc;
set default_storage_engine=tokudb;
disable_warnings;
drop table if exists t;
enable_warnings;
create table t (id int, unique key(id));
insert into t values (10),(100);
connect(c1,localhost,root,,);
begin;
insert into t values (5);
connect(c2,localhost,root,,);
begin;
insert into t values (6);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
create table t (id int not null, unique key(id));
insert into t values (10),(100);
connect(c1,localhost,root,,);
begin;
insert into t values (5);
connect(c2,localhost,root,,);
begin;
insert into t values (6);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
# test case for overlocking unique secondary keys
source include/have_tokudb.inc;
set default_storage_engine=tokudb;
disable_warnings;
drop table if exists t;
enable_warnings;
create table t (id int, unique key(id));
insert into t values (10),(100);
connect(c1,localhost,root,,);
begin;
insert into t values (50);
connect(c2,localhost,root,,);
begin;
insert into t values (60);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
create table t (id int not null, unique key(id));
insert into t values (10),(100);
connect(c1,localhost,root,,);
begin;
insert into t values (50);
connect(c2,localhost,root,,);
begin;
insert into t values (60);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
# test case for overlocking unique secondary keys
source include/have_tokudb.inc;
set default_storage_engine=tokudb;
disable_warnings;
drop table if exists t;
enable_warnings;
create table t (id int, unique key(id));
insert into t values (10),(100);
connect(c1,localhost,root,,);
begin;
insert into t values (500);
connect(c2,localhost,root,,);
begin;
insert into t values (600);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
create table t (id int not null, unique key(id));
insert into t values (10),(100);
connect(c1,localhost,root,,);
begin;
insert into t values (500);
connect(c2,localhost,root,,);
begin;
insert into t values (600);
connection c1;
commit;
connection c2;
commit;
connection default;
select * from t;
disconnect c1;
disconnect c2;
drop table if exists t;
......@@ -2724,7 +2724,8 @@ DBT* ha_tokudb::create_dbt_key_from_key(
const uchar * record,
bool* has_null,
bool dont_pack_pk,
int key_length
int key_length,
uint8_t inf_byte
)
{
uint32_t size = 0;
......@@ -2739,7 +2740,7 @@ DBT* ha_tokudb::create_dbt_key_from_key(
// from a row, there is no way that columns can be missing, so in practice,
// this will be meaningless. Might as well put in a value
//
*tmp_buff++ = COL_ZERO;
*tmp_buff++ = inf_byte;
size++;
size += place_key_into_dbt_buff(
key_info,
......@@ -2805,7 +2806,7 @@ DBT *ha_tokudb::create_dbt_key_from_table(
*has_null = false;
DBUG_RETURN(key);
}
DBUG_RETURN(create_dbt_key_from_key(key, &table->key_info[keynr],buff,record, has_null, (keynr == primary_key), key_length));
DBUG_RETURN(create_dbt_key_from_key(key, &table->key_info[keynr],buff,record, has_null, (keynr == primary_key), key_length, COL_ZERO));
}
DBT* ha_tokudb::create_dbt_key_for_lookup(
......@@ -2818,13 +2819,12 @@ DBT* ha_tokudb::create_dbt_key_for_lookup(
)
{
TOKUDB_HANDLER_DBUG_ENTER("");
DBT* ret = create_dbt_key_from_key(key, key_info, buff, record, has_null, true, key_length);
// override the infinity byte, needed in case the pk is a string
// to make sure that the cursor that uses this key properly positions
// it at the right location. If the table stores "D", but we look up for "d",
// and the infinity byte is 0, then we will skip the "D", because
// in bytes, "d" > "D".
buff[0] = COL_NEG_INF;
DBT* ret = create_dbt_key_from_key(key, key_info, buff, record, has_null, true, key_length, COL_NEG_INF);
DBUG_RETURN(ret);
}
......@@ -3563,62 +3563,56 @@ int ha_tokudb::is_index_unique(bool* is_unique, DB_TXN* txn, DB* db, KEY* key_in
}
int ha_tokudb::is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn) {
DBT key;
int error = 0;
bool has_null;
DBC* tmp_cursor = NULL;
struct index_read_info ir_info;
struct smart_dbt_info info;
memset((void *)&key, 0, sizeof(key));
info.ha = this;
info.buf = NULL;
info.keynr = dict_index;
ir_info.smart_dbt_info = info;
create_dbt_key_for_lookup(
&key,
key_info,
key_buff3,
record,
&has_null
);
ir_info.orig_key = &key;
DBT key; memset((void *)&key, 0, sizeof(key));
create_dbt_key_from_key(&key, key_info, key_buff2, record, &has_null, true, MAX_KEY_LENGTH, COL_NEG_INF);
if (has_null) {
error = 0;
*is_unique = true;
goto cleanup;
}
error = share->key_file[dict_index]->cursor(
share->key_file[dict_index],
txn,
&tmp_cursor,
DB_SERIALIZABLE
);
if (error) { goto cleanup; }
error = share->key_file[dict_index]->cursor(share->key_file[dict_index], txn, &tmp_cursor, DB_SERIALIZABLE | DB_RMW);
if (error) {
goto cleanup;
} else {
// prelock (key,-inf),(key,+inf) so that the subsequent key lookup does not overlock
uint flags = 0;
DBT key_right; memset(&key_right, 0, sizeof key_right);
create_dbt_key_from_key(&key_right, key_info, key_buff3, record, &has_null, true, MAX_KEY_LENGTH, COL_POS_INF);
error = tmp_cursor->c_set_bounds(tmp_cursor, &key, &key_right, true, DB_NOTFOUND);
if (error == 0) {
flags = DB_PRELOCKED | DB_PRELOCKED_WRITE;
}
error = tmp_cursor->c_getf_set_range(
tmp_cursor,
0,
&key,
smart_dbt_callback_lookup,
&ir_info
);
if (error == DB_NOTFOUND) {
*is_unique = true;
error = 0;
goto cleanup;
}
else if (error) {
goto cleanup;
}
if (ir_info.cmp) {
*is_unique = true;
}
else {
*is_unique = false;
// lookup key and check unique prefix
struct smart_dbt_info info;
info.ha = this;
info.buf = NULL;
info.keynr = dict_index;
struct index_read_info ir_info;
ir_info.orig_key = &key;
ir_info.smart_dbt_info = info;
error = tmp_cursor->c_getf_set_range(tmp_cursor, flags, &key, smart_dbt_callback_lookup, &ir_info);
if (error == DB_NOTFOUND) {
*is_unique = true;
error = 0;
goto cleanup;
}
else if (error) {
goto cleanup;
}
if (ir_info.cmp) {
*is_unique = true;
}
else {
*is_unique = false;
}
}
error = 0;
......
......@@ -419,7 +419,7 @@ class ha_tokudb : public handler {
uint32_t place_key_into_mysql_buff(KEY* key_info, uchar * record, uchar* data);
void unpack_key(uchar * record, DBT const *key, uint index);
uint32_t place_key_into_dbt_buff(KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length);
DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, bool dont_pack_pk, int key_length = MAX_KEY_LENGTH);
DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, bool dont_pack_pk, int key_length, uint8_t inf_byte);
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT* create_dbt_key_for_lookup(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length, int8_t inf_byte);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment