Commit 20cc1cb9 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #1047 #1056

use smart DBT's for add_index and ha_tokudb cursor operations

git-svn-id: file:///svn/mysql/tokudb-engine/src@5433 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0ac75bd4
......@@ -687,7 +687,7 @@ static void make_name(char *newname, const char *tablename, const char *dictname
}
#define CHECK_VALID_CURSOR() \
#define HANDLE_INVALID_CURSOR() \
if (cursor == NULL) { \
error = last_cursor_error; \
goto cleanup; \
......@@ -946,14 +946,80 @@ int primary_key_part_compare (const void* left, const void* right) {
return left_part->offset - right_part->offset;
}
//
// struct that will be used as a context for smart DBT callbacks
// contains parameters needed to complete the smart DBT cursor call
//
typedef struct smart_dbt_info {
ha_tokudb* ha; //instance to ha_tokudb needed for reading the row
uchar* buf; // output buffer where row will be written
uint keynr; // index into share->key_file that represents DB we are currently operating on
} *SMART_DBT_INFO;
//
// struct that will be used as a context for smart DBT callbacks
// ONLY for the function add_index
//
typedef struct smart_dbt_ai_info {
ha_tokudb* ha; //instance to ha_tokudb needed for reading the row
DBT* prim_key; // DBT to store the primary key
uchar* buf; // buffer to unpack the row
} *SMART_DBT_AI_INFO;
static void smart_dbt_ai_callback (DBT const *key, DBT const *row, void *context) {
SMART_DBT_AI_INFO info = (SMART_DBT_AI_INFO)context;
info->ha->unpack_row(info->buf,row,key);
//
// copy the key to prim_key
//
info->prim_key->size = key->size;
memcpy(info->prim_key->data, key->data, key->size);
}
//
// Smart DBT callback function in case where we have a covering index
//
static void smart_dbt_callback_keyread(DBT const *key, DBT const *row, void *context) {
SMART_DBT_INFO info = (SMART_DBT_INFO)context;
info->ha->extract_hidden_primary_key(info->keynr, row, key);
info->ha->read_key_only(info->buf,info->keynr,row,key);
}
//
// Smart DBT callback function in case where we do NOT have a covering index
//
static void smart_dbt_callback_rowread(DBT const *key, DBT const *row, void *context) {
SMART_DBT_INFO info = (SMART_DBT_INFO)context;
info->ha->extract_hidden_primary_key(info->keynr, row, key);
info->ha->read_primary_key(info->buf,info->keynr,row,key);
}
//
// Smart DBT callback function in c_getf_heavi, in case where we have a covering index,
//
static void smart_dbt_callback_keyread_heavi(DBT const *key, DBT const *row, void *context, int r_h) {
smart_dbt_callback_keyread(key,row,context);
}
//
// Smart DBT callback function in c_getf_heavi, in case where we do NOT have a covering index
//
static void smart_dbt_callback_rowread_heavi(DBT const *key, DBT const *row, void *context, int r_h) {
smart_dbt_callback_rowread(key,row,context);
}
//
// macro for Smart DBT callback function,
// so we do not need to put this long line of code in multiple places
//
#define SMART_DBT_CALLBACK ( this->key_read ? smart_dbt_callback_keyread : smart_dbt_callback_rowread )
//
// macro that modifies read flag for cursor operations depending on whether
// we have preacquired lock or not
//
#define SET_READ_FLAG(flg) ((range_lock_grabbed || current_thd->options & OPTION_TABLE_LOCK) ? (flg | DB_PRELOCKED) : flg)
#define SET_READ_FLAG(flg) ((range_lock_grabbed || current_thd->options & OPTION_TABLE_LOCK) ? ((flg) | DB_PRELOCKED) : (flg))
//
// Open a secondary table, the key will be a secondary index, the data will be a primary key
......@@ -1458,7 +1524,7 @@ cleanup:
// [out] record - row in MySQL format
// [in] row - row stored in DBT to be converted
//
void ha_tokudb::unpack_row(uchar * record, DBT* row, DBT* key) {
void ha_tokudb::unpack_row(uchar * record, DBT const *row, DBT const *key) {
//
// two cases, fixed length row, and variable length row
// fixed length row is first below
......@@ -1548,7 +1614,7 @@ void ha_tokudb::unpack_row(uchar * record, DBT* row, DBT* key) {
// index -index into key_file that represents the DB
// unpacking a key of
//
void ha_tokudb::unpack_key(uchar * record, DBT * key, uint index) {
void ha_tokudb::unpack_key(uchar * record, DBT const *key, uint index) {
KEY *key_info = table->key_info + index;
KEY_PART_INFO *key_part = key_info->key_part, *end = key_part + key_info->key_parts;
uchar *pos = (uchar *) key->data;
......@@ -2331,7 +2397,7 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
range_lock_grabbed = false;
DBUG_ASSERT(keynr <= table->s->keys);
DBUG_ASSERT(share->key_file[keynr]);
if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, table->reginfo.lock_type > TL_WRITE_ALLOW_READ ? 0 : 0))) {
if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, 0))) {
last_cursor_error = error;
cursor = NULL; // Safety
}
......@@ -2356,40 +2422,32 @@ int ha_tokudb::index_end() {
TOKUDB_DBUG_RETURN(error);
}
//
// The funtion read_row checks whether the row was obtained from the primary table or
// from an index table. If it was obtained from an index table, it further dereferences on
// the main table. In the end, the read_row function will manage to return the actual row
// of interest in the buf parameter.
//
// Parameters:
// error - result of preceding DB call
// [out] buf - buffer for the row, in MySQL format
// keynr - index into key_file that represents DB we are currently operating on.
// [in] row - the row that has been read from the preceding DB call
// [in] found_key - key used to retrieve the row
// read_next - if true, DB_NOTFOUND and DB_KEYEMPTY map to HA_ERR_END_OF_FILE,
// else HA_ERR_KEY_NOT_FOUND, this is a bad parameter to have and this funcitonality
// should not be here
//
int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * found_key, bool read_next) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_row");
//
// Disreputable error translation: this makes us all puke
//
int ha_tokudb::handle_cursor_error(int error, int err_to_return, uint keynr) {
TOKUDB_DBUG_ENTER("ha_tokudb::handle_cursor_error");
if (error) {
last_cursor_error = error;
table->status = STATUS_NOT_FOUND;
cursor->c_close(cursor);
cursor = NULL;
if (error == DB_NOTFOUND || error == DB_KEYEMPTY) {
error = read_next ? HA_ERR_END_OF_FILE : HA_ERR_KEY_NOT_FOUND;
if ((share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, table->reginfo.lock_type > TL_WRITE_ALLOW_READ ? 0 : 0))) {
error = err_to_return;
if ((share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, 0))) {
cursor = NULL; // Safety
}
}
TOKUDB_DBUG_RETURN(error);
}
TOKUDB_DBUG_RETURN(error);
}
//
// Helper function for read_row and smart_dbt_callback_xxx functions
// When using a hidden primary key, upon reading a row,
// we set the current_ident field to whatever the primary key we retrieved
// was
//
void ha_tokudb::extract_hidden_primary_key(uint keynr, DBT const *row, DBT const *found_key) {
//
// extract hidden primary key to current_ident
//
......@@ -2401,6 +2459,100 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou
memcpy_fixed(current_ident, (char *) row->data, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
}
}
}
//
// Reads the contents of row and found_key, DBT's retrieved from the DB associated to keynr, into buf
// This function assumes that we are using a covering index, as a result, if keynr is the primary key,
// we do not read row into buf
// Parameters:
// [out] buf - buffer for the row, in MySQL format
// keynr - index into key_file that represents DB we are currently operating on.
// [in] row - the row that has been read from the preceding DB call
// [in] found_key - key used to retrieve the row
//
void ha_tokudb::read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const *found_key) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_key_only");
table->status = 0;
unpack_key(buf, found_key, keynr);
if (!hidden_primary_key && (keynr != primary_key)) {
unpack_key(buf, row, primary_key);
}
DBUG_VOID_RETURN;
}
//
// Helper function used to try to retrieve the entire row
// If keynr is associated with the main table, reads contents of found_key and row into buf, otherwise,
// makes copy of primary key and saves it to last_key. This can later be used to retrieve the entire row
// Parameters:
// [out] buf - buffer for the row, in MySQL format
// keynr - index into key_file that represents DB we are currently operating on.
// [in] row - the row that has been read from the preceding DB call
// [in] found_key - key used to retrieve the row
//
void ha_tokudb::read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_primary_key");
table->status = 0;
if (keynr != primary_key) {
//
// create a DBT that has the same data as row,
//
bzero((void *) &last_key, sizeof(last_key));
last_key.data = key_buff;
last_key.size = row->size;
memcpy(key_buff, row->data, row->size);
}
else {
unpack_row(buf, row, found_key);
}
if (found_key) { DBUG_DUMP("read row key", (uchar *) found_key->data, found_key->size); }
DBUG_VOID_RETURN;
}
//
// This function reads an entire row into buf. This function also assumes that
// the key needed to retrieve the row is stored in the member variable last_key
// Parameters:
// [out] buf - buffer for the row, in MySQL format
// Returns:
// 0 on success, error otherwise
//
int ha_tokudb::read_full_row(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_full_row");
int error;
//
// Read the data into current_row, assumes key is stored in this->last_key
//
current_row.flags = DB_DBT_REALLOC;
if ((error = share->file->get(share->file, transaction, &last_key, &current_row, 0))) {
table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
}
unpack_row(buf, &current_row, &last_key);
TOKUDB_DBUG_RETURN(0);
}
//
// The funtion read_row checks whether the row was obtained from the primary table or
// from an index table. If it was obtained from an index table, it further dereferences on
// the main table. In the end, the read_row function will manage to return the actual row
// of interest in the buf parameter.
//
// Parameters:
// [out] buf - buffer for the row, in MySQL format
// keynr - index into key_file that represents DB we are currently operating on.
// [in] row - the row that has been read from the preceding DB call
// [in] found_key - key used to retrieve the row
//
int ha_tokudb::read_row(uchar * buf, uint keynr, DBT const *row, DBT const *found_key) {
TOKUDB_DBUG_ENTER("ha_tokudb::read_row");
int error;
extract_hidden_primary_key(keynr, row, found_key);
table->status = 0;
//
// if the index shows that the table we read the row from was indexed on the primary key,
......@@ -2436,8 +2588,6 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou
table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
}
// TOKUDB_DBUG_DUMP("key=", key.data, key.size);
// TOKUDB_DBUG_DUMP("row=", row->data, row->size);
unpack_row(buf, &current_row, &key);
}
else {
......@@ -2471,118 +2621,24 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou
//
int ha_tokudb::index_read_idx(uchar * buf, uint keynr, const uchar * key, uint key_len, enum ha_rkey_function find_flag) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_read_idx");
int error;
table->in_use->status_var.ha_read_key_count++;
current_row.flags = DB_DBT_REALLOC;
active_index = MAX_KEY;
TOKUDB_DBUG_RETURN(read_row(share->key_file[keynr]->get(share->key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len), &current_row, 0), buf, keynr, &current_row, &last_key, 0));
}
//TODO: QQQ Function to tell if a key+keylen is the entire key (loop through the schema), see comparison function for ideas.
/*
if (full_key) {
switch (find_flag) {
case (HA_READ_PREFIX): //Synonym for HA_READ_KEY_EXACT
case (HA_READ_KEY_EXACT):
Just c_get DB_SET, return.
case (HA_READ_AFTER_KEY):
c_get DB_SET_RANGE. If EQUAL to query, then do DB_NEXT (or is it DB_NEXT_NODUP?)
case (HA_READ_KEY_OR_NEXT):
c_get DB_SET_RANGE
case (HA_READ_BEFORE_KEY):
c_get DB_SET_RANGE, then do DB_PREV (or is it DB_PREV_NODUP)?
case (HA_READ_KEY_OR_PREV):
c_get DB_SET_RANGE. If NOT EQUAL to query, then do DB_PREV (or is it DB_PREV_NODUP?)
case (HA_READ_PREFIX_LAST_OR_PREV):
c_get DB_SET_RANGE. If NOT EQUAL to query, then do DB_PREV (or is it DB_PREV_NODUP?)
if if WAS equal to the query, then
if (NO_DUP db, just return it) else:
do DB_NEXT_NODUP
if found, do DB_PREV and return
else do DB_LAST and return
case (HA_READ_PREFIX_LAST):
c_get DB_SET. if !found, return NOT FOUND
if (NO_DUP db, just return it) else:
do c_get DB_NEXT_NODUP
if found, do DB_PREV and return.
else do DB_LAST and return.
default: Crash a lot.
}
else {
// Not full key
switch (find_flag) {
case (HA_READ_PREFIX): //Synonym for HA_READ_KEY_EXACT
case (HA_READ_KEY_EXACT):
c_get DB_SET_RANGE, then check a prefix
case (HA_READ_AFTER_KEY):
c_get DB_SET_RANGE, then:
while (found && query is prefix of 'dbtfound') do:
c_get DB_NEXT_NODUP (Definitely NEXT_NODUP since we care about key only).
case (HA_READ_KEY_OR_NEXT):
c_get SET_RANGE
case (HA_READ_BEFORE_KEY):
c_get DB_SET_RANGE, then do DB_PREV (or is it DB_PREV_NODUP)?
case (HA_READ_KEY_OR_PREV):
c_get DB_SET_RANGE. If query not a prefix of found, then DB_PREV (or is it DB_PREV_NODUP?)
case (HA_READ_PREFIX_LAST_OR_PREV):
c_get DB_SET_RANGE, then:
if (found && query is prefix of whatever found) do:
c_get DB_NEXT till not prefix (and return the one that was)
if (found originally but was not prefix of whatever found) do:
c_get DB_PREV
case (HA_READ_PREFIX_LAST):
c_get DB_SET_RANGE. if !found, or query not prefix of what found, return NOT FOUND
whlie query is prefix of whatfound, do c_get DB_NEXT till not.. then return the last one that was.
default: Crash a lot.
error = share->key_file[keynr]->get(share->key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len), &current_row, 0);
if (error == DB_NOTFOUND || error == DB_KEYEMPTY) {
error = HA_ERR_KEY_NOT_FOUND;
goto cleanup;
}
if (!error) {
error = read_row(buf, keynr, &current_row, &last_key);
}
}
Note that sometimes if not found, will need things like DB_FIRST or DB_LAST
TODO: QQQ maybe need to pass true/1 as last parameter of read_row (this would make it
return END_OF_FILE instead of just NOT_FOUND
*/
//
// This struct is used to copy results from the DB search into local memory.
//
typedef struct dbt_copy_info {
DBT *key;
DBT *val;
int error;
} *DBT_COPY_INFO;
//
// Verify which are supposed to be prefix and not prefix.
//
// Copies the contents of key and val, the returned values from the search in
// the DB, into a DBT_COPY_INFO.
// Parameters:
// [in] key - returned key object from the search in the DB
// [in] val - returned value object from the search in the DB
// [out] extra_f - context information that was passed into the search
// in the appropriate cursor call
// r_h - value returned by heaviside function
//
//
//
//
static void dbt_copy_heavi(DBT const *key, DBT const *val, void *extra_f, int r_h) {
DBT_COPY_INFO info = (DBT_COPY_INFO)extra_f;
int r;
info->key->size = key->size;
info->key->data = malloc(key->size);
if (!info->key->data) { r = errno; goto cleanup; }
info->key->flags = DB_DBT_REALLOC;
if (key->size) memcpy(info->key->data, key->data, key->size);
info->val->size = val->size;
info->val->data = malloc(val->size);
if (!info->val->data) { r = errno; goto cleanup; }
info->val->flags = DB_DBT_REALLOC;
if (val->size) memcpy(info->val->data, val->data, val->size);
r = 0;
cleanup:
info->error = r;
TOKUDB_DBUG_RETURN(error);
}
//
// context information for the heaviside functions.
// Context information includes data necessary
......@@ -2703,17 +2759,20 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
// TOKUDB_DBUG_DUMP("key=", key, key_len);
DBT row;
int error;
struct smart_dbt_info info;
struct heavi_info heavi_info;
bool do_read_row = true;
CHECK_VALID_CURSOR();
HANDLE_INVALID_CURSOR();
table->in_use->status_var.ha_read_key_count++;
bzero((void *) &row, sizeof(row));
pack_key(&last_key, active_index, key_buff, key, key_len);
struct dbt_copy_info copy_info; //Needed as part of the smart dbt.
struct heavi_info heavi_info; //Needed for the heaviside function.
copy_info.key = &last_key;
copy_info.val = &row;
info.ha = this;
info.buf = buf;
info.keynr = active_index;
heavi_info.db = share->key_file[active_index];
heavi_info.key = &last_key;
switch (find_flag) {
......@@ -2727,14 +2786,22 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
}
break;
case HA_READ_AFTER_KEY: /* Find next rec. after key-record */
error = cursor->c_getf_heavi(cursor, 0, dbt_copy_heavi, &copy_info,
after_key_heavi, &heavi_info, 1);
if (error==0 && copy_info.error!=0) error = copy_info.error;
error = cursor->c_getf_heavi(
cursor, 0,
key_read ? smart_dbt_callback_keyread_heavi : smart_dbt_callback_rowread_heavi, &info,
after_key_heavi, &heavi_info,
1
);
do_read_row = false;
break;
case HA_READ_BEFORE_KEY: /* Find next rec. before key-record */
error = cursor->c_getf_heavi(cursor, 0, dbt_copy_heavi, &copy_info,
before_key_heavi, &heavi_info, -1);
if (error==0 && copy_info.error!=0) error = copy_info.error;
error = cursor->c_getf_heavi(
cursor, 0,
key_read ? smart_dbt_callback_keyread_heavi : smart_dbt_callback_rowread_heavi, &info,
before_key_heavi, &heavi_info,
-1
);
do_read_row = false;
break;
case HA_READ_KEY_OR_NEXT: /* Record or next record */
error = cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE);
......@@ -2751,65 +2818,35 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
error = cursor->c_get(cursor, &last_key, &row, DB_LAST);
break;
case HA_READ_PREFIX_LAST_OR_PREV: /* Last or prev key with the same prefix */
error = cursor->c_getf_heavi(cursor, 0, dbt_copy_heavi, &copy_info,
prefix_last_or_prev_heavi, &heavi_info, -1);
if (error==0 && copy_info.error!=0) error = copy_info.error;
error = cursor->c_getf_heavi(
cursor, 0,
key_read ? smart_dbt_callback_keyread_heavi : smart_dbt_callback_rowread_heavi, &info,
prefix_last_or_prev_heavi, &heavi_info,
-1
);
do_read_row = false;
break;
default:
TOKUDB_TRACE("unsupported:%d\n", find_flag);
error = HA_ERR_UNSUPPORTED;
break;
}
error = read_row(error, buf, active_index, &row, &last_key, 0);
if (error && (tokudb_debug & TOKUDB_DEBUG_ERROR))
TOKUDB_TRACE("error:%d:%d\n", error, find_flag);
cleanup:
//
// Using dbt_copy_heavi (used with c_getf_heavi) will set
// flags==DB_DBT_REALLOC.
// Otherwise, flags will be 0 (which means the DB does memory cleanup).
// We need to clean up our own memory with heaviside functions, since they
// use smart dbts.
//
if (last_key.flags==DB_DBT_REALLOC && last_key.data) {
free(last_key.data);
bzero((void *) &last_key, sizeof(last_key));
error = handle_cursor_error(error,HA_ERR_KEY_NOT_FOUND,active_index);
if (!error && do_read_row) {
error = read_row(buf, active_index, &row, &last_key);
}
if (row.flags==DB_DBT_REALLOC && row.data) {
free(row.data);
bzero((void *) &row, sizeof(row));
else if (!error && !key_read && active_index != primary_key) {
error = read_full_row(buf);
}
if (error && (tokudb_debug & TOKUDB_DEBUG_ERROR)) {
TOKUDB_TRACE("error:%d:%d\n", error, find_flag);
}
cleanup:
TOKUDB_DBUG_RETURN(error);
}
#if 0
/*
Read last key is solved by reading the next key and then reading
the previous key
*/
int ha_tokudb::index_read_last(uchar * buf, const uchar * key, uint key_len) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_read_last");
DBT row;
int error;
KEY *key_info = &table->key_info[active_index];
statistic_increment(table->in_use->status_var.ha_read_key_count, &LOCK_status);
bzero((void *) &row, sizeof(row));
/* read of partial key */
pack_key(&last_key, active_index, key_buff, key, key_len);
/* Store for compare */
memcpy(key_buff2, key_buff, (key_len = last_key.size));
assert(0);
key_info->handler.bdb_return_if_eq = 1;
error = read_row(cursor->c_get(cursor, &last_key, &row, DB_SET_RANGE), buf, active_index, &row, (DBT *) 0, 0);
key_info->handler.bdb_return_if_eq = 0;
bzero((void *) &row, sizeof(row));
if (read_row(cursor->c_get(cursor, &last_key, &row, DB_PREV), buf, active_index, &row, &last_key, 1) || tokudb_key_cmp(table, key_info, key_buff2, key_len))
error = HA_ERR_KEY_NOT_FOUND;
TOKUDB_DBUG_RETURN(error);
}
#endif
//
// Reads the next row from the active index (cursor) into buf, and advances cursor
......@@ -2823,13 +2860,23 @@ int ha_tokudb::index_read_last(uchar * buf, const uchar * key, uint key_len) {
int ha_tokudb::index_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_next");
int error;
DBT row;
u_int32_t flags = SET_READ_FLAG(DB_NEXT);
CHECK_VALID_CURSOR();
struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0);
HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
bzero((void *) &row, sizeof(row));
error = read_row(cursor->c_get(cursor, &last_key, &row, flags), buf, active_index, &row, &last_key, 1);
info.ha = this;
info.buf = buf;
info.keynr = active_index;
error = handle_cursor_error(cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info), HA_ERR_END_OF_FILE,active_index);
//
// still need to get entire contents of the row if operation done on
// secondary DB and it was NOT a covering index
//
if (!error && !key_read && (active_index != primary_key) ) {
error = read_full_row(buf);
}
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......@@ -2847,23 +2894,30 @@ cleanup:
//
int ha_tokudb::index_next_same(uchar * buf, const uchar * key, uint keylen) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_next_same %p", this);
DBT row;
int error;
CHECK_VALID_CURSOR();
struct smart_dbt_info info;
HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
bzero((void *) &row, sizeof(row));
info.ha = this;
info.buf = buf;
info.keynr = active_index;
/* QQQ NEXT_DUP on nodup returns EINVAL for tokudb */
if (keylen == table->key_info[active_index].key_length &&
!(table->key_info[active_index].flags & HA_NOSAME) &&
!(table->key_info[active_index].flags & HA_END_SPACE_KEY)) {
u_int32_t flags = SET_READ_FLAG(DB_NEXT_DUP);
error = cursor->c_get(cursor, &last_key, &row, flags);
error = read_row(error, buf, active_index, &row, &last_key, 1);
u_int32_t flags = SET_READ_FLAG(0);
error = handle_cursor_error(cursor->c_getf_next_dup(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,active_index);
if (!error && !key_read && active_index != primary_key) {
error = read_full_row(buf);
}
} else {
u_int32_t flags = SET_READ_FLAG(DB_NEXT);
error = read_row(cursor->c_get(cursor, &last_key, &row, flags), buf, active_index, &row, &last_key, 1);
u_int32_t flags = SET_READ_FLAG(0);
error = handle_cursor_error(cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,active_index);
if (!error && !key_read && active_index != primary_key) {
error = read_full_row(buf);
}
if (!error &&::key_cmp_if_same(table, key, active_index, keylen))
error = HA_ERR_END_OF_FILE;
}
......@@ -2881,14 +2935,26 @@ cleanup:
// error otherwise
//
int ha_tokudb::index_prev(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_prev");
TOKUDB_DBUG_ENTER("ha_tokudb::index_next");
int error;
u_int32_t flags = SET_READ_FLAG(DB_PREV);
CHECK_VALID_CURSOR();
DBT row;
statistic_increment(table->in_use->status_var.ha_read_prev_count, &LOCK_status);
bzero((void *) &row, sizeof(row));
error = read_row(cursor->c_get(cursor, &last_key, &row, flags), buf, active_index, &row, &last_key, 1);
struct smart_dbt_info info;
u_int32_t flags = SET_READ_FLAG(0);
HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_next_count, &LOCK_status);
info.ha = this;
info.buf = buf;
info.keynr = active_index;
error = handle_cursor_error(cursor->c_getf_prev(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,active_index);
//
// still need to get entire contents of the row if operation done on
// secondary DB and it was NOT a covering index
//
if (!error && !key_read && (active_index != primary_key) ) {
error = read_full_row(buf);
}
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......@@ -2906,10 +2972,15 @@ int ha_tokudb::index_first(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_first");
int error;
DBT row;
CHECK_VALID_CURSOR();
HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_first_count, &LOCK_status);
bzero((void *) &row, sizeof(row));
error = read_row(cursor->c_get(cursor, &last_key, &row, DB_FIRST), buf, active_index, &row, &last_key, 1);
error = handle_cursor_error(cursor->c_get(cursor, &last_key, &row, DB_FIRST),HA_ERR_END_OF_FILE,active_index);
if (!error) {
error = read_row(buf, active_index, &row, &last_key);
}
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......@@ -2927,10 +2998,14 @@ int ha_tokudb::index_last(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::index_last");
int error;
DBT row;
CHECK_VALID_CURSOR();
HANDLE_INVALID_CURSOR();
statistic_increment(table->in_use->status_var.ha_read_last_count, &LOCK_status);
bzero((void *) &row, sizeof(row));
error = read_row(cursor->c_get(cursor, &last_key, &row, DB_LAST), buf, active_index, &row, &last_key, 1);
error = handle_cursor_error(cursor->c_get(cursor, &last_key, &row, DB_LAST),HA_ERR_END_OF_FILE,active_index);
if (!error) {
error = read_row(buf, active_index, &row, &last_key);
}
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......@@ -2977,6 +3052,7 @@ int ha_tokudb::rnd_end() {
TOKUDB_DBUG_RETURN(index_end());
}
//
// Read the next row in a table scan
// Parameters:
......@@ -2989,18 +3065,21 @@ int ha_tokudb::rnd_end() {
int ha_tokudb::rnd_next(uchar * buf) {
TOKUDB_DBUG_ENTER("ha_tokudb::ha_tokudb::rnd_next");
int error;
DBT row;
u_int32_t flags = SET_READ_FLAG(DB_NEXT);
u_int32_t flags = SET_READ_FLAG(0);
struct smart_dbt_info info;
CHECK_VALID_CURSOR()
HANDLE_INVALID_CURSOR();
//
// The reason we do not just call index_next is that index_next
// increments a different variable than we do here
//
statistic_increment(table->in_use->status_var.ha_read_rnd_next_count, &LOCK_status);
bzero((void *) &row, sizeof(row));
DBUG_DUMP("last_key", (uchar *) last_key.data, last_key.size);
error = read_row(cursor->c_get(cursor, &last_key, &row, flags), buf, primary_key, &row, &last_key, 1);
info.ha = this;
info.buf = buf;
info.keynr = primary_key;
error = handle_cursor_error(cursor->c_getf_next(cursor, flags, SMART_DBT_CALLBACK, &info),HA_ERR_END_OF_FILE,primary_key);
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......@@ -3036,10 +3115,21 @@ DBT *ha_tokudb::get_pos(DBT * to, uchar * pos) {
int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
TOKUDB_DBUG_ENTER("ha_tokudb::rnd_pos");
DBT db_pos;
int error;
statistic_increment(table->in_use->status_var.ha_read_rnd_count, &LOCK_status);
active_index = MAX_KEY;
DBT* key = get_pos(&db_pos, pos);
TOKUDB_DBUG_RETURN(read_row(share->file->get(share->file, transaction, key, &current_row, 0), buf, primary_key, &current_row, key, 0));
error = share->file->get(share->file, transaction, key, &current_row, 0);
if (error == DB_NOTFOUND || error == DB_KEYEMPTY) {
error = HA_ERR_KEY_NOT_FOUND;
goto cleanup;
}
if (!error) {
error = read_row(buf, primary_key, &current_row, key);
}
cleanup:
TOKUDB_DBUG_RETURN(error);
}
......@@ -4035,9 +4125,10 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
DBC* tmp_cursor = NULL;
int cursor_ret_val = 0;
DBT current_primary_key;
DBT row;
DB_TXN* txn = NULL;
uchar tmp_key_buff[2*table_arg->s->rec_buff_length];
uchar tmp_prim_key_buff[2*table_arg->s->rec_buff_length];
//
// number of DB files we have open currently, before add_index is executed
//
......@@ -4054,8 +4145,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
//
uchar tmp_record[table_arg->s->rec_buff_length];
bzero((void *) &current_primary_key, sizeof(current_primary_key));
bzero((void *) &row, sizeof(row));
current_primary_key.data = tmp_prim_key_buff;
//
// The files for secondary tables are derived from the name of keys
......@@ -4150,13 +4240,19 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
// for each element in the primary table, insert the proper key value pair in each secondary table
// that is created
//
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &current_primary_key, &row, DB_NEXT | DB_PRELOCKED);
struct smart_dbt_ai_info info;
info.ha = this;
info.prim_key = &current_primary_key;
info.buf = tmp_record;
cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED, smart_dbt_ai_callback, &info);
while (cursor_ret_val != DB_NOTFOUND) {
if (cursor_ret_val) {
error = cursor_ret_val;
goto cleanup;
}
unpack_row(tmp_record, &row, &current_primary_key);
for (uint i = 0; i < num_of_keys; i++) {
DBT secondary_key;
create_dbt_key_from_key(&secondary_key,&key_info[i], tmp_key_buff, tmp_record);
......@@ -4183,7 +4279,11 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
goto cleanup;
}
}
<<<<<<< .mine
cursor_ret_val = tmp_cursor->c_getf_next(tmp_cursor, DB_PRELOCKED, smart_dbt_ai_callback, &info);
=======
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &current_primary_key, &row, DB_NEXT | DB_PRELOCKED);
>>>>>>> .r5430
}
error = txn->commit(txn, 0);
assert(error == 0);
......
......@@ -160,8 +160,7 @@ private:
ulong max_row_length(const uchar * buf);
int pack_row(DBT * row, const uchar * record);
void unpack_row(uchar * record, DBT * row, DBT* key);
void unpack_key(uchar * record, DBT * key, uint index);
void unpack_key(uchar * record, DBT const *key, uint index);
DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length);
......@@ -170,7 +169,7 @@ private:
int restore_keys(DB_TXN * trans, key_map * changed_keys, uint primary_key, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * new_key);
int key_cmp(uint keynr, const uchar * old_row, const uchar * new_row);
int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key, bool local_using_ignore);
int read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * key, bool);
int handle_cursor_error(int error, int err_to_return, uint keynr);
DBT *get_pos(DBT * to, uchar * pos);
int open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type);
......@@ -306,8 +305,14 @@ public:
// delete all rows from the table
// effect: all dictionaries, including the main and indexes, should be empty
int delete_all_rows();
void extract_hidden_primary_key(uint keynr, DBT const *row, DBT const *found_key);
void read_key_only(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
void read_primary_key(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
int read_row(uchar * buf, uint keynr, DBT const *row, DBT const *found_key);
void unpack_row(uchar * record, DBT const *row, DBT const *key);
private:
int read_full_row(uchar * buf);
int __close(int mutex_is_locked);
int read_last();
ulong field_offset(Field *);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment