Commit de910f49 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #648

Updated ha_tokudb::index_read and comparison functions based on code review.
Also put in some (very informal) comments about how index_read is supposed to work.

git-svn-id: file:///svn/mysql/tokudb-engine/src@3275 c7de825b-a66e-492c-adef-691d508d4ae1
parent e11df507
...@@ -684,33 +684,34 @@ static int tokudb_cmp_packed_key(DB * file, const DBT * new_key, const DBT * sav ...@@ -684,33 +684,34 @@ static int tokudb_cmp_packed_key(DB * file, const DBT * new_key, const DBT * sav
//DBUG_DUMP("key_in_index", saved_key_ptr, saved_key->size); //DBUG_DUMP("key_in_index", saved_key_ptr, saved_key->size);
for (; key_part != end && (int) key_length > 0 && (int) saved_key_length > 0; key_part++) { for (; key_part != end && (int) key_length > 0 && (int) saved_key_length > 0; key_part++) {
int cmp; int cmp;
uint length; uint new_key_field_length;
uint saved_key_field_length;
if (key_part->null_bit) { if (key_part->null_bit) {
assert(new_key_ptr < (uchar *) new_key->data + new_key->size); assert(new_key_ptr < (uchar *) new_key->data + new_key->size);
assert(saved_key_ptr < (uchar *) saved_key->data + saved_key->size); assert(saved_key_ptr < (uchar *) saved_key->data + saved_key->size);
if (*new_key_ptr != *saved_key_ptr++) if (*new_key_ptr != *saved_key_ptr) {
return ((int) *new_key_ptr - (int) saved_key_ptr[-1]); return ((int) *new_key_ptr - (int) *saved_key_ptr); }
saved_key_ptr++;
key_length--; key_length--;
saved_key_length--; saved_key_length--;
if (!*new_key_ptr++) if (!*new_key_ptr++) { continue; }
continue;
} }
new_key_field_length = key_part->field->packed_col_length(new_key_ptr, key_part->length);
saved_key_field_length = key_part->field->packed_col_length(saved_key_ptr, key_part->length);
assert( key_length >= new_key_field_length);
assert(saved_key_length >= saved_key_field_length);
if ((cmp = key_part->field->pack_cmp(new_key_ptr, saved_key_ptr, key_part->length, 0))) if ((cmp = key_part->field->pack_cmp(new_key_ptr, saved_key_ptr, key_part->length, 0)))
return cmp; return cmp;
length = key_part->field->packed_col_length(new_key_ptr, key_part->length); new_key_ptr += new_key_field_length;
new_key_ptr += length; key_length -= new_key_field_length;
key_length -= length; saved_key_ptr += saved_key_field_length;
length = key_part->field->packed_col_length(saved_key_ptr, key_part->length); saved_key_length -= saved_key_field_length;
saved_key_ptr += length; }
saved_key_length -= length; return key_length - saved_key_length;
}
if (key_length < saved_key_length)
return -1;
if (key_length > saved_key_length)
return 1;
return 0;
} }
//TODO: QQQ Only do one direction for prefix.
//TODO: QQQ Combine this and previous function.
static int tokudb_prefix_cmp_packed_key(DB * file, const DBT * new_key, const DBT * saved_key) { static int tokudb_prefix_cmp_packed_key(DB * file, const DBT * new_key, const DBT * saved_key) {
assert(file->app_private != 0); assert(file->app_private != 0);
KEY *key = (KEY *) file->app_private; KEY *key = (KEY *) file->app_private;
...@@ -723,25 +724,28 @@ static int tokudb_prefix_cmp_packed_key(DB * file, const DBT * new_key, const DB ...@@ -723,25 +724,28 @@ static int tokudb_prefix_cmp_packed_key(DB * file, const DBT * new_key, const DB
//DBUG_DUMP("key_in_index", saved_key_ptr, saved_key->size); //DBUG_DUMP("key_in_index", saved_key_ptr, saved_key->size);
for (; key_part != end && (int) key_length > 0 && (int) saved_key_length > 0; key_part++) { for (; key_part != end && (int) key_length > 0 && (int) saved_key_length > 0; key_part++) {
int cmp; int cmp;
uint length; uint new_key_field_length;
uint saved_key_field_length;
if (key_part->null_bit) { if (key_part->null_bit) {
assert(new_key_ptr < (uchar *) new_key->data + new_key->size); assert(new_key_ptr < (uchar *) new_key->data + new_key->size);
assert(saved_key_ptr < (uchar *) saved_key->data + saved_key->size); assert(saved_key_ptr < (uchar *) saved_key->data + saved_key->size);
if (*new_key_ptr != *saved_key_ptr++) if (*new_key_ptr != *saved_key_ptr) {
return ((int) *new_key_ptr - (int) saved_key_ptr[-1]); return ((int) *new_key_ptr - (int) *saved_key_ptr); }
saved_key_ptr++;
key_length--; key_length--;
saved_key_length--; saved_key_length--;
if (!*new_key_ptr++) if (!*new_key_ptr++) { continue; }
continue;
} }
new_key_field_length = key_part->field->packed_col_length(new_key_ptr, key_part->length);
saved_key_field_length = key_part->field->packed_col_length(saved_key_ptr, key_part->length);
assert( key_length >= new_key_field_length);
assert(saved_key_length >= saved_key_field_length);
if ((cmp = key_part->field->pack_cmp(new_key_ptr, saved_key_ptr, key_part->length, 0))) if ((cmp = key_part->field->pack_cmp(new_key_ptr, saved_key_ptr, key_part->length, 0)))
return cmp; return cmp;
length = key_part->field->packed_col_length(new_key_ptr, key_part->length); new_key_ptr += new_key_field_length;
new_key_ptr += length; key_length -= new_key_field_length;
key_length -= length; saved_key_ptr += saved_key_field_length;
length = key_part->field->packed_col_length(saved_key_ptr, key_part->length); saved_key_length -= saved_key_field_length;
saved_key_ptr += length;
saved_key_length -= length;
} }
return 0; return 0;
} }
...@@ -1746,6 +1750,8 @@ int ha_tokudb::index_read_idx(uchar * buf, uint keynr, const uchar * key, uint k ...@@ -1746,6 +1750,8 @@ int ha_tokudb::index_read_idx(uchar * buf, uint keynr, const uchar * key, uint k
TOKUDB_DBUG_RETURN(read_row(key_file[keynr]->get(key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len), &current_row, 0), buf, keynr, &current_row, &last_key, 0)); TOKUDB_DBUG_RETURN(read_row(key_file[keynr]->get(key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len), &current_row, 0), buf, keynr, &current_row, &last_key, 0));
} }
//TODO: QQQ Function to tell if a key+keylen is the entire key (loop through the schema), see comparison function for ideas.
int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_rkey_function find_flag) { int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_rkey_function find_flag) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_read"); TOKUDB_DBUG_ENTER("ha_tokudb::index_read");
DBT row; DBT row;
...@@ -1754,6 +1760,67 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -1754,6 +1760,67 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
table->in_use->status_var.ha_read_key_count++; table->in_use->status_var.ha_read_key_count++;
bzero((void *) &row, sizeof(row)); bzero((void *) &row, sizeof(row));
pack_key(&last_key, active_index, key_buff, key, key_len); pack_key(&last_key, active_index, key_buff, key, key_len);
/*
if (full_key) {
switch (find_flag) {
case (HA_READ_PREFIX): //Synonym for HA_READ_KEY_EXACT
case (HA_READ_KEY_EXACT):
Just c_get DB_SET, return.
case (HA_READ_AFTER_KEY):
c_get DB_SET_RANGE. If EQUAL to query, then do DB_NEXT (or is it DB_NEXT_NODUP?)
case (HA_READ_KEY_OR_NEXT):
c_get DB_SET_RANGE
case (HA_READ_BEFORE_KEY):
c_get DB_SET_RANGE, then do DB_PREV (or is it DB_PREV_NODUP)?
case (HA_READ_KEY_OR_PREV):
c_get DB_SET_RANGE. If NOT EQUAL to query, then do DB_PREV (or is it DB_PREV_NODUP?)
case (HA_READ_PREFIX_LAST_OR_PREV):
c_get DB_SET_RANGE. If NOT EQUAL to query, then do DB_PREV (or is it DB_PREV_NODUP?)
if if WAS equal to the query, then
if (NO_DUP db, just return it) else:
do DB_NEXT_NODUP
if found, do DB_PREV and return
else do DB_LAST and return
case (HA_READ_PREFIX_LAST):
c_get DB_SET. if !found, return NOT FOUND
if (NO_DUP db, just return it) else:
do c_get DB_NEXT_NODUP
if found, do DB_PREV and return.
else do DB_LAST and return.
default: Crash a lot.
}
else {
// Not full key
switch (find_flag) {
case (HA_READ_PREFIX): //Synonym for HA_READ_KEY_EXACT
case (HA_READ_KEY_EXACT):
c_get DB_SET_RANGE, then check a prefix
case (HA_READ_AFTER_KEY):
c_get DB_SET_RANGE, then:
while (found && query is prefix of 'dbtfound') do:
c_get DB_NEXT_NODUP (Definitely NEXT_NODUP since we care about key only).
case (HA_READ_KEY_OR_NEXT):
c_get SET_RANGE
case (HA_READ_BEFORE_KEY):
c_get DB_SET_RANGE, then do DB_PREV (or is it DB_PREV_NODUP)?
case (HA_READ_KEY_OR_PREV):
c_get DB_SET_RANGE. If query not a prefix of found, then DB_PREV (or is it DB_PREV_NODUP?)
case (HA_READ_PREFIX_LAST_OR_PREV):
c_get DB_SET_RANGE, then:
if (found && query is prefix of whatever found) do:
c_get DB_NEXT till not prefix (and return the one that was)
if (found originally but was not prefix of whatever found) do:
c_get DB_PREV
case (HA_READ_PREFIX_LAST):
c_get DB_SET_RANGE. if !found, or query not prefix of what found, return NOT FOUND
whlie query is prefix of whatfound, do c_get DB_NEXT till not.. then return the last one that was.
default: Crash a lot.
}
}
Note that sometimes if not found, will need things like DB_FIRST or DB_LAST
TODO: QQQ maybe need to pass true/1 as last parameter of read_row (this would make it
return END_OF_FILE instead of just NOT_FOUND
*/
switch (find_flag) { switch (find_flag) {
case HA_READ_KEY_EXACT: case HA_READ_KEY_EXACT:
...@@ -1764,34 +1831,30 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -1764,34 +1831,30 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key)) if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key))
error = DB_NOTFOUND; error = DB_NOTFOUND;
} }
error = read_row(error, buf, active_index, &row, 0, 0);
break; break;
case HA_READ_AFTER_KEY: case HA_READ_AFTER_KEY:
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE); error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
if (error == 0) { if (error == 0) {
DBT orig_key; DBT orig_key;
pack_key(&orig_key, active_index, key_buff2, key, key_len);
for (;;) { for (;;) {
pack_key(&orig_key, active_index, key_buff2, key, key_len);
if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key) != 0) if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key) != 0)
break; break;
error = cursor->c_get(cursor, &last_key, &row, DB_NEXT); error = cursor->c_get(cursor, &last_key, &row, DB_NEXT_NODUP);
if (error != 0) if (error != 0)
break; break;
} }
} }
error = read_row(error, buf, active_index, &row, 0, 0);
break; break;
case HA_READ_BEFORE_KEY: case HA_READ_BEFORE_KEY:
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE); error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
if (error == 0) if (error == 0)
error = cursor->c_get(cursor, &last_key, &row, DB_PREV); error = cursor->c_get(cursor, &last_key, &row, DB_PREV);
else else if (error == DB_NOTFOUND)
error = cursor->c_get(cursor, &last_key, &row, DB_LAST); error = cursor->c_get(cursor, &last_key, &row, DB_LAST);
error = read_row(error, buf, active_index, &row, 0, 0);
break; break;
case HA_READ_KEY_OR_NEXT: case HA_READ_KEY_OR_NEXT:
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE); error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
error = read_row(error, buf, active_index, &row, 0, 0);
break; break;
case HA_READ_KEY_OR_PREV: case HA_READ_KEY_OR_PREV:
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE); error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
...@@ -1800,33 +1863,36 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_ ...@@ -1800,33 +1863,36 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
pack_key(&orig_key, active_index, key_buff2, key, key_len); pack_key(&orig_key, active_index, key_buff2, key, key_len);
if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key) != 0) if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key) != 0)
error = cursor->c_get(cursor, &last_key, &row, DB_PREV); error = cursor->c_get(cursor, &last_key, &row, DB_PREV);
} else }
else if (error == DB_NOTFOUND)
error = cursor->c_get(cursor, &last_key, &row, DB_LAST); error = cursor->c_get(cursor, &last_key, &row, DB_LAST);
error = read_row(error, buf, active_index, &row, 0, 0);
break; break;
case HA_READ_PREFIX_LAST_OR_PREV: case HA_READ_PREFIX_LAST_OR_PREV:
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE); error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
if (error == 0) { if (error == 0) {
DBT orig_key; DBT orig_key;
pack_key(&orig_key, active_index, key_buff2, key, key_len);
for (;;) { for (;;) {
pack_key(&orig_key, active_index, key_buff2, key, key_len);
if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key) != 0) if (tokudb_prefix_cmp_packed_key(share->key_file[active_index], &orig_key, &last_key) != 0)
break; break;
error = cursor->c_get(cursor, &last_key, &row, DB_NEXT); error = cursor->c_get(cursor, &last_key, &row, DB_NEXT_NODUP);
if (error != 0) if (error != 0)
break; break;
} }
if (error == 0) if (error == 0)
error = cursor->c_get(cursor, &last_key, &row, DB_PREV); error = cursor->c_get(cursor, &last_key, &row, DB_PREV);
else if (error == DB_NOTFOUND)
error = cursor->c_get(cursor, &last_key, &row, DB_LAST);
} }
error = read_row(error, buf, active_index, &row, 0, 0); else if (error == DB_NOTFOUND)
error = cursor->c_get(cursor, &last_key, &row, DB_LAST);
break; break;
default: default:
printf("%d:%s:%d:unsupported:%d\n", my_tid(), __FILE__, __LINE__, find_flag); printf("%d:%s:%d:unsupported:%d\n", my_tid(), __FILE__, __LINE__, find_flag);
error = HA_ERR_UNSUPPORTED; error = HA_ERR_UNSUPPORTED;
break; break;
} }
error = read_row(error, buf, active_index, &row, 0, find_flag != HA_READ_KEY_EXACT);
if (error && (tokudb_debug & TOKUDB_DEBUG_ERROR)) if (error && (tokudb_debug & TOKUDB_DEBUG_ERROR))
printf("%d:%s:%d:error:%d:%d\n", my_tid(), __FILE__, __LINE__, error, find_flag); printf("%d:%s:%d:error:%d:%d\n", my_tid(), __FILE__, __LINE__, error, find_flag);
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment