Commit 5a7daab5 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #1219

move clustering keys to main line

git-svn-id: file:///svn/mysql/tokudb-engine/src@7591 c7de825b-a66e-492c-adef-691d508d4ae1
parent a99d5df3
......@@ -678,12 +678,6 @@ ulong ha_tokudb::index_flags(uint idx, uint part, bool all_parts) const {
DBUG_RETURN(flags);
}
static int tokudb_cmp_hidden_key(DB * file, const DBT * new_key, const DBT * saved_key) {
ulonglong a = uint5korr((char *) new_key->data);
ulonglong b = uint5korr((char *) saved_key->data);
return a < b ? -1 : (a > b ? 1 : 0);
}
/*
Things that are required for ALL data types:
key_part->field->null_bit
......@@ -804,6 +798,13 @@ cleanup:
return r;
}
static int tokudb_cmp_hidden_key(DB * file, const DBT * new_key, const DBT * saved_key) {
ulonglong a = uint5korr((char *) new_key->data);
ulonglong b = uint5korr((char *) saved_key->data);
return a < b ? -1 : (a > b ? 1 : 0);
}
static int tokudb_compare_two_keys(KEY *key, const DBT * new_key, const DBT * saved_key, bool cmp_prefix) {
uchar new_key_inf_val = *(uchar *) new_key->data;
uchar saved_key_inf_val = *(uchar *) saved_key->data;
......@@ -867,9 +868,122 @@ static int tokudb_compare_two_keys(KEY *key, const DBT * new_key, const DBT * sa
return ret_val;
}
//
// this is super super ugly, copied from compare_two_keys so that it can get done fast
//
static int tokudb_compare_two_clustered_keys(KEY *key, KEY* primary_key, const DBT * new_key, const DBT * saved_key) {
uchar new_key_inf_val = *(uchar *) new_key->data;
uchar saved_key_inf_val = *(uchar *) saved_key->data;
//
// first byte is "infinity" byte
//
uchar *new_key_ptr = (uchar *)(new_key->data) + 1;
uchar *saved_key_ptr = (uchar *)(saved_key->data) + 1;
KEY_PART_INFO *key_part = key->key_part, *end = key_part + key->key_parts;
int ret_val;
//
// do not include the inf val at the beginning
//
uint new_key_length = new_key->size - sizeof(uchar);
uint saved_key_length = saved_key->size - sizeof(uchar);
//DBUG_DUMP("key_in_index", saved_key_ptr, saved_key->size);
for (; key_part != end && (int) new_key_length > 0 && (int) saved_key_length > 0; key_part++) {
int cmp;
uint new_key_field_length;
uint saved_key_field_length;
if (key_part->field->null_bit) {
assert(new_key_ptr < (uchar *) new_key->data + new_key->size);
assert(saved_key_ptr < (uchar *) saved_key->data + saved_key->size);
if (*new_key_ptr != *saved_key_ptr) {
return ((int) *new_key_ptr - (int) *saved_key_ptr); }
saved_key_ptr++;
new_key_length--;
saved_key_length--;
if (!*new_key_ptr++) { continue; }
}
new_key_field_length = key_part->field->packed_col_length(new_key_ptr, key_part->length);
saved_key_field_length = key_part->field->packed_col_length(saved_key_ptr, key_part->length);
assert(new_key_length >= new_key_field_length);
assert(saved_key_length >= saved_key_field_length);
if ((cmp = key_part->field->pack_cmp(new_key_ptr, saved_key_ptr, key_part->length, 0)))
return cmp;
new_key_ptr += new_key_field_length;
new_key_length -= new_key_field_length;
saved_key_ptr += saved_key_field_length;
saved_key_length -= saved_key_field_length;
}
if (new_key_length == 0 && saved_key_length == 0){
ret_val = 0;
}
else if (new_key_length == 0 && saved_key_length > 0) {
ret_val = (new_key_inf_val == COL_POS_INF ) ? 1 : -1;
}
else if (new_key_length > 0 && saved_key_length == 0) {
ret_val = (saved_key_inf_val == COL_POS_INF ) ? -1 : 1;
}
//
// now we compare the primary key
//
else {
if (primary_key == NULL) {
//
// primary key hidden
//
ulonglong a = uint5korr((char *) new_key_ptr);
ulonglong b = uint5korr((char *) saved_key_ptr);
ret_val = a < b ? -1 : (a > b ? 1 : 0);
}
else {
//
// primary key not hidden, I know this is bad, basically copying the code from above
//
key_part = primary_key->key_part;
end = key_part + primary_key->key_parts;
for (; key_part != end && (int) new_key_length > 0 && (int) saved_key_length > 0; key_part++) {
int cmp;
uint new_key_field_length;
uint saved_key_field_length;
if (key_part->field->null_bit) {
assert(new_key_ptr < (uchar *) new_key->data + new_key->size);
assert(saved_key_ptr < (uchar *) saved_key->data + saved_key->size);
if (*new_key_ptr != *saved_key_ptr) {
return ((int) *new_key_ptr - (int) *saved_key_ptr); }
saved_key_ptr++;
new_key_length--;
saved_key_length--;
if (!*new_key_ptr++) { continue; }
}
new_key_field_length = key_part->field->packed_col_length(new_key_ptr, key_part->length);
saved_key_field_length = key_part->field->packed_col_length(saved_key_ptr, key_part->length);
assert(new_key_length >= new_key_field_length);
assert(saved_key_length >= saved_key_field_length);
if ((cmp = key_part->field->pack_cmp(new_key_ptr, saved_key_ptr, key_part->length, 0)))
return cmp;
new_key_ptr += new_key_field_length;
new_key_length -= new_key_field_length;
saved_key_ptr += saved_key_field_length;
saved_key_length -= saved_key_field_length;
}
//
// at this point, we have compared the actual keys and the primary key, we return 0
//
ret_val = 0;
}
}
return ret_val;
}
static int tokudb_cmp_packed_key(DB *file, const DBT *keya, const DBT *keyb) {
assert(file->app_private != 0);
KEY *key = (KEY *) file->app_private;
KEY *primary_key = (KEY *) file->api_internal;
if (key->flags & HA_CLUSTERING) {
return tokudb_compare_two_clustered_keys(key, primary_key, keya, keyb);
}
return tokudb_compare_two_keys(key, keya, keyb, false);
}
......@@ -947,7 +1061,7 @@ typedef struct smart_dbt_ai_info {
static void smart_dbt_ai_callback (DBT const *key, DBT const *row, void *context) {
SMART_DBT_AI_INFO info = (SMART_DBT_AI_INFO)context;
info->ha->unpack_row(info->buf,row,key);
info->ha->unpack_row(info->buf,row,key, true);
//
// copy the key to prim_key
//
......@@ -1207,9 +1321,14 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
}
DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %s\n", key_info->name));
(*ptr)->set_flags(*ptr, DB_DUP + DB_DUPSORT);
//
// clustering keys are not DB_DUP, because their keys are unique (they have the PK embedded)
//
if (!(key_info->flags & HA_CLUSTERING)) {
(*ptr)->set_flags(*ptr, DB_DUP + DB_DUPSORT);
(*ptr)->set_dup_compare(*ptr, hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_primary_key);
}
(*ptr)->api_internal = share->file->app_private;
(*ptr)->set_dup_compare(*ptr, hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_primary_key);
if ((error = (*ptr)->open(*ptr, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
my_errno = error;
......@@ -1269,7 +1388,8 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
}
/* Need some extra memory in case of packed keys */
// the "+ 1" is for the first byte that states +/- infinity
max_key_length = table_share->max_key_length + MAX_REF_PARTS * 3 + sizeof(uchar);
// multiply everything by 2 to account for clustered keys having a key and primary key together
max_key_length = 2*(table_share->max_key_length + MAX_REF_PARTS * 3 + sizeof(uchar));
if (!(alloc_ptr =
my_multi_malloc(MYF(MY_WME),
&key_buff, max_key_length,
......@@ -1342,8 +1462,12 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
TOKUDB_DBUG_RETURN(1);
}
if (!hidden_primary_key)
if (!hidden_primary_key) {
share->file->app_private = (void *) (table_share->key_info + table_share->primary_key);
}
else {
share->file->app_private = NULL;
}
if (tokudb_debug & TOKUDB_DEBUG_SAVE_TRACE) {
bzero((void *) &cmp_byte_stream, sizeof(cmp_byte_stream));
cmp_byte_stream.flags = DB_DBT_MALLOC;
......@@ -1678,7 +1802,7 @@ ulong ha_tokudb::max_row_length(const uchar * buf) {
// [in] record - row in MySQL format
//
int ha_tokudb::pack_row(DBT * row, const uchar * record) {
int ha_tokudb::pack_row(DBT * row, const uchar * record, bool strip_pk) {
uchar *ptr;
int r = ENOSYS;
bzero((void *) row, sizeof(*row));
......@@ -1692,7 +1816,7 @@ int ha_tokudb::pack_row(DBT * row, const uchar * record) {
// fixed length row is first below
//
if (share->fixed_length_row) {
if (hidden_primary_key) {
if (hidden_primary_key || !strip_pk) {
row->data = (void *)record;
row->size = table_share->reclength;
r = 0;
......@@ -1757,7 +1881,7 @@ int ha_tokudb::pack_row(DBT * row, const uchar * record) {
// if the primary key is hidden, primary_key_offsets will be NULL and
// this clause will not execute
//
if (primary_key_offsets) {
if (primary_key_offsets && strip_pk) {
uint curr_skip_offset = primary_key_offsets[curr_skip_index].offset;
if (curr_skip_offset == curr_field_offset) {
//
......@@ -1799,13 +1923,13 @@ cleanup:
// [out] record - row in MySQL format
// [in] row - row stored in DBT to be converted
//
void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key) {
void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key, bool pk_stripped) {
//
// two cases, fixed length row, and variable length row
// fixed length row is first below
//
if (share->fixed_length_row) {
if (hidden_primary_key) {
if (hidden_primary_key || !pk_stripped) {
memcpy(record, (void *) row->data, table_share->reclength);
}
else {
......@@ -1842,7 +1966,7 @@ void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key) {
const uchar *ptr = (const uchar *) row->data;
memcpy(record, ptr, table_share->null_bytes);
ptr += table_share->null_bytes;
if (primary_key_offsets) {
if (primary_key_offsets && pk_stripped) {
//
// unpack_key will fill in parts of record that are part of the primary key
//
......@@ -1855,7 +1979,7 @@ void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key) {
uint curr_skip_index = 0;
for (Field ** field = table->field; *field; field++) {
uint curr_field_offset = field_offset(*field);
if (primary_key_offsets) {
if (primary_key_offsets && pk_stripped) {
uint curr_skip_offset = primary_key_offsets[curr_skip_index].offset;
if (curr_skip_offset == curr_field_offset) {
//
......@@ -1887,18 +2011,11 @@ void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key) {
}
//
// Store the key and the primary key into the row
// Parameters:
// [out] record - key stored in MySQL format
// [in] key - key stored in DBT to be converted
// index -index into key_file that represents the DB
// unpacking a key of
//
void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) {
u_int32_t ha_tokudb::place_key_into_mysql_buff(uchar * record, uchar* data, uint index) {
KEY *key_info = table->key_info + index;
KEY_PART_INFO *key_part = key_info->key_part, *end = key_part + key_info->key_parts;
uchar *pos = (uchar *) key->data + 1;
uchar *pos = data;
for (; key_part != end; key_part++) {
if (key_part->null_bit) {
......@@ -1922,38 +2039,36 @@ void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) {
unpack_length, table->s->db_low_byte_first);
#endif
}
return pos-data;
}
//
// Create a packed key from a row. This key will be written as such
// to the index tree. This will never fail as the key buffer is pre-allocated.
// Store the key and the primary key into the row
// Parameters:
// [out] key - DBT that holds the key
// [in] key_info - holds data about the key, such as it's length and offset into record
// [out] buff - buffer that will hold the data for key (unless
// we have a hidden primary key)
// [in] record - row from which to create the key
// key_length - currently set to MAX_KEY_LENGTH, is it size of buff?
// Returns:
// the parameter key
// [out] record - key stored in MySQL format
// [in] key - key stored in DBT to be converted
// index -index into key_file that represents the DB
// unpacking a key of
//
void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) {
u_int32_t bytes_read;
uchar *pos = (uchar *) key->data + 1;
bytes_read = place_key_into_mysql_buff(record,pos,index);
if((table->key_info[index].flags & HA_CLUSTERING) && !hidden_primary_key) {
//
// also unpack primary key
//
place_key_into_mysql_buff(record,pos+bytes_read,primary_key);
}
}
DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length) {
KEY_PART_INFO *key_part = key_info->key_part;
KEY_PART_INFO *end = key_part + key_info->key_parts;
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
key->data = buff;
//
// first put the "infinity" byte at beginning. States if missing columns are implicitly
// positive infinity or negative infinity. For this, because we are creating key
// from a row, there is no way that columns can be missing, so in practice,
// this will be meaningless. Might as well put in a value
//
*buff++ = COL_NEG_INF;
u_int32_t ha_tokudb::place_key_into_dbt_buff(KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length) {
KEY_PART_INFO *key_part = key_info->key_part;
KEY_PART_INFO *end = key_part + key_info->key_parts;
uchar* curr_buff = buff;
*has_null = false;
for (; key_part != end && key_length > 0; key_part++) {
//
......@@ -1966,7 +2081,7 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff,
uint null_offset = (uint) ((char*) key_part->field->null_ptr
- (char*) table->record[0]);
if (record[null_offset] & key_part->field->null_bit) {
*buff++ = NULL_COL_VAL;
*curr_buff++ = NULL_COL_VAL;
*has_null = true;
//
// fractal tree does not handle this falg at the moment
......@@ -1975,14 +2090,14 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff,
//key->flags |= DB_DBT_DUPOK;
continue;
}
*buff++ = NONNULL_COL_VAL; // Store NOT NULL marker
*curr_buff++ = NONNULL_COL_VAL; // Store NOT NULL marker
}
//
// accessing field_offset(key_part->field) instead off key_part->offset
// because key_part->offset is SET INCORRECTLY in add_index
// filed ticket 862 to look into this
//
buff = key_part->field->pack_key(buff, (uchar *) (record + field_offset(key_part->field)),
curr_buff = key_part->field->pack_key(curr_buff, (uchar *) (record + field_offset(key_part->field)),
#if MYSQL_VERSION_ID < 50123
key_part->length);
#else
......@@ -1990,7 +2105,60 @@ DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff,
#endif
key_length -= key_part->length;
}
key->size = (buff - (uchar *) key->data);
return curr_buff - buff;
}
//
// Create a packed key from a row. This key will be written as such
// to the index tree. This will never fail as the key buffer is pre-allocated.
// Parameters:
// [out] key - DBT that holds the key
// [in] key_info - holds data about the key, such as it's length and offset into record
// [out] buff - buffer that will hold the data for key (unless
// we have a hidden primary key)
// [in] record - row from which to create the key
// key_length - currently set to MAX_KEY_LENGTH, is it size of buff?
// Returns:
// the parameter key
//
DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length) {
u_int32_t size = 0;
uchar* tmp_buff = buff;
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
key->data = buff;
//
// first put the "infinity" byte at beginning. States if missing columns are implicitly
// positive infinity or negative infinity. For this, because we are creating key
// from a row, there is no way that columns can be missing, so in practice,
// this will be meaningless. Might as well put in a value
//
*tmp_buff++ = COL_NEG_INF;
size++;
size += place_key_into_dbt_buff(key_info, tmp_buff, record, has_null, key_length);
if (key_info->flags & HA_CLUSTERING) {
tmp_buff = buff + size;
if (hidden_primary_key) {
memcpy_fixed(tmp_buff, current_ident, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
size += TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
}
else {
bool tmp_bool = false;
size += place_key_into_dbt_buff(
&table->key_info[primary_key],
tmp_buff,
record,
&tmp_bool,
MAX_KEY_LENGTH //this parameter does not matter
);
}
}
key->size = size;
DBUG_DUMP("key", (uchar *) key->data, key->size);
dbug_tmp_restore_column_map(table->write_set, old_map);
return key;
......@@ -2334,7 +2502,7 @@ int ha_tokudb::write_row(uchar * record) {
pthread_mutex_unlock(&share->mutex);
}
if ((error = pack_row(&row, (const uchar *) record))){
if ((error = pack_row(&row, (const uchar *) record, true))){
goto cleanup;
}
......@@ -2369,10 +2537,12 @@ int ha_tokudb::write_row(uchar * record) {
last_dup_key = primary_key;
goto cleanup;
}
//
// now insertion for rest of indexes
//
for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool cluster_row_created = false;
if (keynr == primary_key) {
continue;
}
......@@ -2382,13 +2552,30 @@ int ha_tokudb::write_row(uchar * record) {
if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
put_flags = DB_YESOVERWRITE;
}
error = share->key_file[keynr]->put(
share->key_file[keynr],
txn,
&key,
&prim_key,
put_flags
);
if (table->key_info[keynr].flags & HA_CLUSTERING) {
if (!cluster_row_created) {
if ((error = pack_row(&row, (const uchar *) record, false))){
goto cleanup;
}
cluster_row_created = true;
}
error = share->key_file[keynr]->put(
share->key_file[keynr],
txn,
&key,
&row,
put_flags
);
}
else {
error = share->key_file[keynr]->put(
share->key_file[keynr],
txn,
&key,
&prim_key,
put_flags
);
}
//
// We break if we hit an error, unless it is a dup key error
// and MySQL told us to ignore duplicate key errors
......@@ -2456,7 +2643,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
// Primary key changed or we are updating a key that can have duplicates.
// Delete the old row and add a new one
if (!(error = remove_key(trans, primary_key, old_row, old_key))) {
if (!(error = pack_row(&row, new_row))) {
if (!(error = pack_row(&row, new_row, true))) {
if ((error = share->file->put(share->file, trans, new_key, &row, share->key_type[primary_key]))) {
// Probably a duplicated key; restore old key and row if needed
last_dup_key = primary_key;
......@@ -2466,7 +2653,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
}
else {
// Primary key didn't change; just update the row data
if (!(error = pack_row(&row, new_row))) {
if (!(error = pack_row(&row, new_row, true))) {
error = share->file->put(share->file, trans, new_key, &row, 0);
}
}
......@@ -2484,7 +2671,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
//
int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
TOKUDB_DBUG_ENTER("update_row");
DBT prim_key, key, old_prim_key;
DBT prim_key, key, old_prim_key, row;
int error;
bool primary_key_changed;
bool has_null;
......@@ -2555,6 +2742,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
}
// Update all other keys
for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool cluster_row_created = false;
if (keynr == primary_key) {
continue;
}
......@@ -2568,13 +2756,30 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
put_flags = DB_YESOVERWRITE;
}
error = share->key_file[keynr]->put(
share->key_file[keynr],
txn,
&key,
&prim_key,
put_flags
);
if (table->key_info[keynr].flags & HA_CLUSTERING) {
if (!cluster_row_created) {
if ((error = pack_row(&row, (const uchar *) new_row, false))){
goto cleanup;
}
cluster_row_created = true;
}
error = share->key_file[keynr]->put(
share->key_file[keynr],
txn,
&key,
&row,
put_flags
);
}
else {
error = share->key_file[keynr]->put(
share->key_file[keynr],
txn,
&key,
&prim_key,
put_flags
);
}
//
// We break if we hit an error, unless it is a dup key error
// and MySQL told us to ignore duplicate key errors
......@@ -2635,7 +2840,12 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT
else if (keynr == primary_key) { // Unique key
DBUG_PRINT("Unique key", ("index: %d", keynr));
error = share->key_file[keynr]->del(share->key_file[keynr], trans, prim_key , 0);
}
}
else if (table->key_info[keynr].flags & HA_CLUSTERING) {
DBUG_PRINT("clustering key", ("index: %d", keynr));
create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null);
error = share->key_file[keynr]->del(share->key_file[keynr], trans, &key , 0);
}
else {
create_dbt_key_from_table(&key, keynr, key_buff2, record, &has_null);
error = share->key_file[keynr]->delboth(
......@@ -2818,6 +3028,16 @@ void ha_tokudb::extract_hidden_primary_key(uint keynr, DBT const *row, DBT const
if (keynr == primary_key) {
memcpy_fixed(current_ident, (char *) found_key->data, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
}
//
// if clustering key, hidden primary key is at end of found_key
//
else if (table->key_info[keynr].flags & HA_CLUSTERING) {
memcpy_fixed(
current_ident,
(char *) found_key->data + found_key->size - TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH,
TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH
);
}
else {
memcpy_fixed(current_ident, (char *) row->data, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
}
......@@ -2838,7 +3058,7 @@ void ha_tokudb::read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const
TOKUDB_DBUG_ENTER("ha_tokudb::read_key_only");
table->status = 0;
unpack_key(buf, found_key, keynr);
if (!hidden_primary_key && (keynr != primary_key)) {
if (!hidden_primary_key && (keynr != primary_key) && !(table->key_info[keynr].flags & HA_CLUSTERING)) {
unpack_key(buf, row, primary_key);
}
DBUG_VOID_RETURN;
......@@ -2857,7 +3077,10 @@ void ha_tokudb::read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const
void ha_tokudb::read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_primary_key");
table->status = 0;
if (keynr != primary_key) {
//
// case where we read from secondary table that is not clustered
//
if (keynr != primary_key && !(table->key_info[keynr].flags & HA_CLUSTERING)) {
//
// create a DBT that has the same data as row,
//
......@@ -2866,8 +3089,17 @@ void ha_tokudb::read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT co
last_key.size = row->size;
memcpy(key_buff, row->data, row->size);
}
//
// case we read from clustered key, so unpack the entire row
//
else if (keynr != primary_key && (table->key_info[keynr].flags & HA_CLUSTERING)) {
unpack_row(buf, row, found_key, false);
}
//
// case we read from the primary key, so unpack the entire row
//
else {
unpack_row(buf, row, found_key);
unpack_row(buf, row, found_key, true);
}
if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); }
DBUG_VOID_RETURN;
......@@ -2892,7 +3124,7 @@ int ha_tokudb::read_full_row(uchar * buf) {
table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
}
unpack_row(buf, &current_row, &last_key);
unpack_row(buf, &current_row, &last_key, true);
TOKUDB_DBUG_RETURN(0);
}
......@@ -2929,13 +3161,20 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun
if (key_read && found_key) {
// TOKUDB_DBUG_DUMP("key=", found_key->data, found_key->size);
unpack_key(buf, found_key, keynr);
if (!hidden_primary_key) {
if (!hidden_primary_key && !(table->key_info[keynr].flags & HA_CLUSTERING)) {
// TOKUDB_DBUG_DUMP("row=", row->data, row->size);
unpack_key(buf, row, primary_key);
}
TOKUDB_DBUG_RETURN(0);
}
//
// in this case we have a clustered key, so no need to do pt query
//
if (table->key_info[keynr].flags & HA_CLUSTERING) {
unpack_row(buf, row, found_key, false);
TOKUDB_DBUG_RETURN(0);
}
//
// create a DBT that has the same data as row,
//
DBT key;
......@@ -2951,14 +3190,17 @@ int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *foun
table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
}
unpack_row(buf, &current_row, &key);
unpack_row(buf, &current_row, &key, true);
}
else {
//
// in this case we are dealing with the primary key
//
if (key_read && !hidden_primary_key) {
unpack_key(buf, found_key, keynr);
}
else {
unpack_row(buf, row, found_key);
unpack_row(buf, row, found_key, true);
}
}
if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); }
......@@ -3212,7 +3454,7 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
if (!error && do_read_row) {
error = read_row(buf, active_index, &row, &last_key);
}
else if (!error && !key_read && active_index != primary_key) {
else if (!error && !key_read && active_index != primary_key && !(table->key_info[active_index].flags & HA_CLUSTERING)) {
error = read_full_row(buf);
}
......@@ -3251,7 +3493,7 @@ int ha_tokudb::index_next(uchar * buf) {
// still need to get entire contents of the row if operation done on
// secondary DB and it was NOT a covering index
//
if (!error && !key_read && (active_index != primary_key) ) {
if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
error = read_full_row(buf);
}
cleanup:
......@@ -3286,13 +3528,13 @@ int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
u_int32_t flags = SET_READ_FLAG(0);
error = handle_cursor_error(cursor->c_getf_next_dup(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,active_index);
if (!error && !key_read && active_index != primary_key) {
if (!error && !key_read && active_index != primary_key && !(table->key_info[active_index].flags & HA_CLUSTERING)) {
error = read_full_row(buf);
}
} else {
u_int32_t flags = SET_READ_FLAG(0);
error = handle_cursor_error(cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,active_index);
if (!error && !key_read && active_index != primary_key) {
if (!error && !key_read && active_index != primary_key && !(table->key_info[active_index].flags & HA_CLUSTERING)) {
error = read_full_row(buf);
}
if (!error &&::key_cmp_if_same(table, key, active_index, keylen))
......@@ -3328,7 +3570,7 @@ int ha_tokudb::index_prev(uchar * buf) {
// still need to get entire contents of the row if operation done on
// secondary DB and it was NOT a covering index
//
if (!error && !key_read && (active_index != primary_key) ) {
if (!error && !key_read && (active_index != primary_key) && !(table->key_info[active_index].flags & HA_CLUSTERING) ) {
error = read_full_row(buf);
}
......@@ -4180,10 +4422,11 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
char part[MAX_ALIAS_NAME + 10];
for (uint i = 0; i < form->s->keys; i++) {
if (i != primary_key) {
int flags = (form->s->key_info[i].flags & HA_CLUSTERING) ? 0 : DB_DUP + DB_DUPSORT;
sprintf(part, "key-%s", form->s->key_info[i].name);
make_name(newname, name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = create_sub_table(name_buff, NULL, DB_BTREE, DB_DUP + DB_DUPSORT);
error = create_sub_table(name_buff, NULL, DB_BTREE, flags);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, form->key_info[i].flags, error);
}
......@@ -4329,11 +4572,17 @@ double ha_tokudb::read_time(
double total_scan;
double ret_val;
if (index != primary_key) {
//
// if it is not the primary key, and it is not a clustering key, then return handler::read_time
//
if (index != primary_key && !(table->key_info[index].flags & HA_CLUSTERING)) {
ret_val = handler::read_time(index, ranges, rows);
goto cleanup;
}
//
// for primary key and for clustered keys, return a fraction of scan_time()
//
total_scan = scan_time();
if (stats.records < rows) {
......@@ -4648,12 +4897,13 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
uchar* tmp_key_buff = NULL;
uchar* tmp_prim_key_buff = NULL;
uchar* tmp_record = NULL;
THD* thd = ha_thd();
uchar* tmp_record2 = NULL;
newname = (char *)my_malloc(share->table_name_length + NAME_CHAR_LEN, MYF(MY_WME));
tmp_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
tmp_prim_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
tmp_record = (uchar *)my_malloc(table_arg->s->rec_buff_length,MYF(MY_WME));
tmp_record2 = (uchar *)my_malloc(2*table_arg->s->rec_buff_length,MYF(MY_WME));
//
// number of DB files we have open currently, before add_index is executed
......@@ -4692,10 +4942,11 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
//
char part[MAX_ALIAS_NAME + 10];
for (uint i = 0; i < num_of_keys; i++) {
int flags = (key_info[i].flags & HA_CLUSTERING) ? 0 : DB_DUP + DB_DUPSORT;
sprintf(part, "key-%s", key_info[i].name);
make_name(newname, share->table_name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = create_sub_table(name_buff, NULL, DB_BTREE, DB_DUP + DB_DUPSORT);
error = create_sub_table(name_buff, NULL, DB_BTREE, flags);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, key_info[i].flags, error);
}
......@@ -4779,16 +5030,26 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
for (uint i = 0; i < num_of_keys; i++) {
DBT secondary_key;
bool cluster_row_created = false;
DBT secondary_key, row;
bool has_null = false;
create_dbt_key_from_key(&secondary_key,&key_info[i], tmp_key_buff, tmp_record, &has_null);
uint curr_index = i + curr_num_DBs;
u_int32_t put_flags = share->key_type[curr_index];
if (put_flags == DB_NOOVERWRITE && (has_null || thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))) {
put_flags = DB_YESOVERWRITE;
put_flags = DB_YESOVERWRITE;
if (key_info[i].flags & HA_CLUSTERING) {
if (!cluster_row_created) {
if ((error = pack_row(&row, (const uchar *) tmp_record, false))){
goto cleanup;
}
cluster_row_created = true;
}
error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &row, put_flags);
}
else {
error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &current_primary_key, put_flags);
}
error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &current_primary_key, put_flags);
if (error) {
//
// in the case of any error anywhere, we can just nuke all the files created, so we dont need
......
......@@ -186,8 +186,10 @@ private:
uchar current_ident[TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH];
ulong max_row_length(const uchar * buf);
int pack_row(DBT * row, const uchar * record);
int pack_row(DBT * row, const uchar * record, bool strip_pk);
u_int32_t place_key_into_mysql_buff(uchar * record, uchar* data, uint index);
void unpack_key(uchar * record, DBT const *key, uint index);
u_int32_t place_key_into_dbt_buff(KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length);
DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, bool* has_null, int key_length = MAX_KEY_LENGTH);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length, uchar inf_byte);
......@@ -327,6 +329,9 @@ public:
bool primary_key_is_clustered() {
return true;
}
bool supports_clustered_keys() {
return true;
}
int cmp_ref(const uchar * ref1, const uchar * ref2);
bool check_if_incompatible_data(HA_CREATE_INFO * info, uint table_changes);
......@@ -341,7 +346,7 @@ public:
void read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
void read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
int read_row(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
void unpack_row(uchar * record, DBT const *row, DBT const *key);
void unpack_row(uchar * record, DBT const *row, DBT const *key, bool pk_stripped);
int heavi_ret_val;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment