Commit dd715af4 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #846

merge back into main branch

git-svn-id: file:///svn/mysql/tokudb-engine/src@4138 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5cd4cb4a
......@@ -171,6 +171,7 @@ static bool tokudb_show_status(handlerton * hton, THD * thd, stat_print_fn * pri
static int tokudb_close_connection(handlerton * hton, THD * thd);
static int tokudb_commit(handlerton * hton, THD * thd, bool all);
static int tokudb_rollback(handlerton * hton, THD * thd, bool all);
static uint tokudb_alter_table_flags(uint flags);
#if 0
static int tokudb_rollback_to_savepoint(handlerton * hton, THD * thd, void *savepoint);
static int tokudb_savepoint(handlerton * hton, THD * thd, void *savepoint);
......@@ -218,7 +219,7 @@ static int tokudb_init_func(void *p) {
tokudb_hton->panic = tokudb_end;
tokudb_hton->flush_logs = tokudb_flush_logs;
tokudb_hton->show_status = tokudb_show_status;
tokudb_hton->alter_table_flags = tokudb_alter_table_flags;
#if 0
if (!tokudb_tmpdir)
tokudb_tmpdir = mysql_tmpdir;
......@@ -366,30 +367,27 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE * table) {
if (!(share = (TOKUDB_SHARE *) hash_search(&tokudb_open_tables, (uchar *) table_name, length))) {
ulong *rec_per_key;
char *tmp_name;
DB **key_file;
u_int32_t *key_type;
uint keys = table->s->keys;
uint num_keys = table->s->keys;
if (!(share = (TOKUDB_SHARE *)
my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
&share, sizeof(*share),
&tmp_name, length + 1,
&rec_per_key, keys * sizeof(ha_rows),
&key_file, (keys + 1) * sizeof(*key_file),
&key_type, (keys + 1) * sizeof(u_int32_t),
&rec_per_key, num_keys * sizeof(ha_rows),
&key_type, (num_keys + 1) * sizeof(u_int32_t),
NullS))) {
pthread_mutex_unlock(&tokudb_mutex);
return NULL;
}
share->use_count = 0;
share->table_name_length = length;
share->table_name = tmp_name;
strmov(share->table_name, table_name);
share->rec_per_key = rec_per_key;
share->key_file = key_file;
share->key_type = key_type;
bzero((void *) share->key_file, sizeof(share->key_file));
if (my_hash_insert(&tokudb_open_tables, (uchar *) share))
goto error;
......@@ -409,7 +407,7 @@ static TOKUDB_SHARE *get_share(const char *table_name, TABLE * table) {
static int free_share(TOKUDB_SHARE * share, TABLE * table, uint hidden_primary_key, bool mutex_is_locked) {
int error, result = 0;
uint keys = table->s->keys + test(hidden_primary_key);
uint num_keys = table->s->keys + test(hidden_primary_key);
pthread_mutex_lock(&tokudb_mutex);
......@@ -422,7 +420,7 @@ static int free_share(TOKUDB_SHARE * share, TABLE * table, uint hidden_primary_k
/* this does share->file->close() implicitly */
update_status(share, table);
for (uint i = 0; i < keys; i++) {
for (uint i = 0; i < num_keys; i++) {
if (tokudb_debug & TOKUDB_DEBUG_OPEN)
TOKUDB_TRACE("dbclose:%p\n", key_file[i]);
if (key_file[i] && (error = key_file[i]->close(key_file[i], 0)))
......@@ -646,6 +644,20 @@ void tokudb_cleanup_log_files(void) {
DBUG_VOID_RETURN;
}
//
// *******NOTE*****
// If the flags HA_ONLINE_DROP_INDEX and HA_ONLINE_DROP_UNIQUE_INDEX
// are ever added, prepare_drop_index and final_drop_index will need to be modified
// so that the actual deletion of DB's is done in final_drop_index and not prepare_drop_index
//
static uint tokudb_alter_table_flags(uint flags)
{
return (HA_ONLINE_ADD_INDEX_NO_WRITES| HA_ONLINE_DROP_INDEX_NO_WRITES |
HA_ONLINE_ADD_UNIQUE_INDEX_NO_WRITES| HA_ONLINE_DROP_UNIQUE_INDEX_NO_WRITES);
}
static int get_name_length(const char *name) {
int n = 0;
const char *newname = name;
......@@ -672,9 +684,10 @@ static void make_name(char *newname, const char *tablename, const char *dictname
nn += sprintf(nn, "/%s%s", dictname, ha_tokudb_ext);
}
ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg)
:
handler(hton, table_arg), alloc_ptr(0), rec_buff(0), file(0),
handler(hton, table_arg), alloc_ptr(0), rec_buff(0),
// flags defined in sql\handler.h
int_table_flags(HA_REC_NOT_IN_SEQ | HA_FAST_KEY_READ | HA_NULL_IN_KEY | HA_CAN_INDEX_BLOBS | HA_PRIMARY_KEY_IN_READ_INDEX |
HA_FILE_BASED | HA_CAN_GEOMETRY | HA_AUTO_PART_KEY | HA_TABLE_SCAN_ON_INDEX),
......@@ -940,6 +953,60 @@ static bool tokudb_key_cmp(TABLE * table, KEY * key_info, const uchar * key, uin
}
#endif
//
// Open a secondary table, the key will be a secondary index, the data will be a primary key
//
int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type) {
int error = ENOSYS;
char part[MAX_ALIAS_NAME + 10];
char name_buff[FN_REFLEN];
uint open_flags = (mode == O_RDONLY ? DB_RDONLY : 0) | DB_THREAD;
char newname[strlen(name) + 32];
DBT cmp_byte_stream;
if (tokudb_init_flags & DB_INIT_TXN)
open_flags += DB_AUTO_COMMIT;
if ((error = db_create(ptr, db_env, 0))) {
my_errno = error;
goto cleanup;
}
sprintf(part, "key-%s", key_info->name);
make_name(newname, name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
*key_type = key_info->flags & HA_NOSAME ? DB_NOOVERWRITE : DB_YESOVERWRITE;
(*ptr)->app_private = (void *) (key_info);
if (tokudb_debug & TOKUDB_DEBUG_SAVE_TRACE) {
bzero((void *) &cmp_byte_stream, sizeof(cmp_byte_stream));
cmp_byte_stream.flags = DB_DBT_MALLOC;
if ((error = tokutrace_db_get_cmp_byte_stream(*ptr, &cmp_byte_stream))) {
my_errno = error;
goto cleanup;
}
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key);
my_free(cmp_byte_stream.data, MYF(0));
}
else
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key);
if (!(key_info->flags & HA_NOSAME)) {
DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %s\n", key_info->name));
(*ptr)->set_flags(*ptr, DB_DUP + DB_DUPSORT);
(*ptr)->api_internal = share->file->app_private;
(*ptr)->set_dup_compare(*ptr, hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_primary_key);
}
if ((error = (*ptr)->open(*ptr, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
my_errno = error;
goto cleanup;
}
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("open:%s:file=%p\n", newname, *ptr);
}
cleanup:
return error;
}
//
// Creates and opens a handle to a table which already exists in a tokudb
// database.
......@@ -994,13 +1061,10 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
TOKUDB_DBUG_RETURN(1);
}
thr_lock_data_init(&share->lock, &lock, NULL);
key_file = share->key_file;
key_type = share->key_type;
bzero((void *) &current_row, sizeof(current_row));
/* Fill in shared structure, if needed */
pthread_mutex_lock(&share->mutex);
file = share->file;
if (tokudb_debug & TOKUDB_DEBUG_OPEN)
TOKUDB_TRACE("tokudbopen:%p:share=%p:file=%p:table=%p:table->s=%p:%d\n",
this, share, share->file, table, table->s, share->use_count);
......@@ -1008,37 +1072,36 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
DBUG_PRINT("info", ("share->use_count %u", share->use_count));
DBT cmp_byte_stream;
if ((error = db_create(&file, db_env, 0))) {
if ((error = db_create(&share->file, db_env, 0))) {
free_share(share, table, hidden_primary_key, 1);
my_free((char *) rec_buff, MYF(0));
my_free(alloc_ptr, MYF(0));
my_errno = error;
TOKUDB_DBUG_RETURN(1);
}
share->file = file;
if (!hidden_primary_key)
file->app_private = (void *) (table_share->key_info + table_share->primary_key);
share->file->app_private = (void *) (table_share->key_info + table_share->primary_key);
if (tokudb_debug & TOKUDB_DEBUG_SAVE_TRACE) {
bzero((void *) &cmp_byte_stream, sizeof(cmp_byte_stream));
cmp_byte_stream.flags = DB_DBT_MALLOC;
if ((error = tokutrace_db_get_cmp_byte_stream(file, &cmp_byte_stream))) {
if ((error = tokutrace_db_get_cmp_byte_stream(share->file, &cmp_byte_stream))) {
free_share(share, table, hidden_primary_key, 1);
my_free((char *) rec_buff, MYF(0));
my_free(alloc_ptr, MYF(0));
my_errno = error;
TOKUDB_DBUG_RETURN(1);
}
file->set_bt_compare(file, (hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_packed_key));
share->file->set_bt_compare(share->file, (hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_packed_key));
my_free(cmp_byte_stream.data, MYF(0));
}
else
file->set_bt_compare(file, (hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_packed_key));
share->file->set_bt_compare(share->file, (hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_packed_key));
char newname[strlen(name) + 32];
make_name(newname, name, "main");
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
if ((error = file->open(file, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
if ((error = share->file->open(share->file, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
free_share(share, table, hidden_primary_key, 1);
my_free((char *) rec_buff, MYF(0));
my_free(alloc_ptr, MYF(0));
......@@ -1046,53 +1109,19 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
TOKUDB_DBUG_RETURN(1);
}
if (tokudb_debug & TOKUDB_DEBUG_OPEN)
TOKUDB_TRACE("open:%s:file=%p\n", newname, file);
TOKUDB_TRACE("open:%s:file=%p\n", newname, share->file);
/* Open other keys; These are part of the share structure */
key_file[primary_key] = file;
key_type[primary_key] = hidden_primary_key ? 0 : DB_NOOVERWRITE;
share->key_file[primary_key] = share->file;
share->key_type[primary_key] = hidden_primary_key ? 0 : DB_NOOVERWRITE;
DB **ptr = key_file;
for (uint i = 0, used_keys = 0; i < table_share->keys; i++, ptr++) {
char part[16];
DB **ptr = share->key_file;
for (uint i = 0; i < table_share->keys; i++, ptr++) {
if (i != primary_key) {
if ((error = db_create(ptr, db_env, 0))) {
__close(1);
my_errno = error;
TOKUDB_DBUG_RETURN(1);
}
sprintf(part, "key%d", ++used_keys);
make_name(newname, name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
key_type[i] = table->key_info[i].flags & HA_NOSAME ? DB_NOOVERWRITE : DB_YESOVERWRITE;
(*ptr)->app_private = (void *) (table_share->key_info + i);
if (tokudb_debug & TOKUDB_DEBUG_SAVE_TRACE) {
bzero((void *) &cmp_byte_stream, sizeof(cmp_byte_stream));
cmp_byte_stream.flags = DB_DBT_MALLOC;
if ((error = tokutrace_db_get_cmp_byte_stream(*ptr, &cmp_byte_stream))) {
__close(1);
my_errno = error;
TOKUDB_DBUG_RETURN(1);
}
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key);
my_free(cmp_byte_stream.data, MYF(0));
}
else
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_packed_key);
if (!(table->key_info[i].flags & HA_NOSAME)) {
DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %u", i));
(*ptr)->set_flags(*ptr, DB_DUP + DB_DUPSORT);
(*ptr)->api_internal = file->app_private;
(*ptr)->set_dup_compare(*ptr, hidden_primary_key ? tokudb_cmp_hidden_key : tokudb_cmp_primary_key);
}
if ((error = (*ptr)->open(*ptr, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) {
if ((error = open_secondary_table(ptr,&table_share->key_info[i],name,mode,&share->key_type[i]))) {
__close(1);
my_errno = error;
TOKUDB_DBUG_RETURN(1);
}
if (tokudb_debug & TOKUDB_DEBUG_OPEN)
TOKUDB_TRACE("open:%s:file=%p\n", newname, *ptr);
}
}
/* Calculate pack_length of primary key */
......@@ -1275,12 +1304,13 @@ void ha_tokudb::unpack_key(uchar * record, DBT * key, uint index) {
}
}
//
// Create a packed key from a row. This key will be written as such
// to the index tree. This will never fail as the key buffer is pre-allocated.
// Parameters:
// [out] key - DBT that holds the key
// keynr - index for which to create the key
// [in] key_info - holds data about the key, such as it's length and offset into record
// [out] buff - buffer that will hold the data for key (unless
// we have a hidden primary key)
// [in] record - row from which to create the key
......@@ -1289,23 +1319,19 @@ void ha_tokudb::unpack_key(uchar * record, DBT * key, uint index) {
// the parameter key
//
DBT *ha_tokudb::create_key(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length) {
TOKUDB_DBUG_ENTER("ha_tokudb::create_key");
bzero((void *) key, sizeof(*key));
if (hidden_primary_key && keynr == primary_key) {
key->data = current_ident;
key->size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
DBUG_RETURN(key);
}
KEY *key_info = table->key_info + keynr;
DBT* ha_tokudb::create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, int key_length) {
KEY_PART_INFO *key_part = key_info->key_part;
KEY_PART_INFO *end = key_part + key_info->key_parts;
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
key->data = buff;
for (; key_part != end && key_length > 0; key_part++) {
if (key_part->null_bit) {
//
// accessing key_part->field->null_bit instead off key_part->null_bit
// because key_part->null_bit is not set in add_index
// filed ticket 862 to look into this
//
if (key_part->field->null_bit) {
/* Store 0 if the key part is a NULL part */
if (record[key_part->null_offset] & key_part->null_bit) {
*buff++ = 0;
......@@ -1318,7 +1344,12 @@ DBT *ha_tokudb::create_key(DBT * key, uint keynr, uchar * buff, const uchar * re
}
*buff++ = 1; // Store NOT NULL marker
}
buff = key_part->field->pack_key(buff, (uchar *) (record + key_part->offset),
//
// accessing field_offset(key_part->field) instead off key_part->offset
// because key_part->offset is SET INCORRECTLY in add_index
// filed ticket 862 to look into this
//
buff = key_part->field->pack_key(buff, (uchar *) (record + field_offset(key_part->field)),
#if MYSQL_VERSION_ID < 50123
key_part->length);
#else
......@@ -1329,7 +1360,32 @@ DBT *ha_tokudb::create_key(DBT * key, uint keynr, uchar * buff, const uchar * re
key->size = (buff - (uchar *) key->data);
DBUG_DUMP("key", (uchar *) key->data, key->size);
dbug_tmp_restore_column_map(table->write_set, old_map);
DBUG_RETURN(key);
return key;
}
//
// Create a packed key from a row. This key will be written as such
// to the index tree. This will never fail as the key buffer is pre-allocated.
// Parameters:
// [out] key - DBT that holds the key
// keynr - index for which to create the key
// [out] buff - buffer that will hold the data for key (unless
// we have a hidden primary key)
// [in] record - row from which to create the key
// key_length - currently set to MAX_KEY_LENGTH, is it size of buff?
// Returns:
// the parameter key
//
DBT *ha_tokudb::create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length) {
TOKUDB_DBUG_ENTER("ha_tokudb::create_dbt_key_from_table");
bzero((void *) key, sizeof(*key));
if (hidden_primary_key && keynr == primary_key) {
key->data = current_ident;
key->size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
DBUG_RETURN(key);
}
DBUG_RETURN(create_dbt_key_from_key(key, &table->key_info[keynr],buff,record,key_length));
}
......@@ -1613,36 +1669,38 @@ int ha_tokudb::write_row(uchar * record) {
get_auto_primary_key(current_ident);
}
u_int32_t put_flags = key_type[primary_key];
u_int32_t put_flags = share->key_type[primary_key];
THD *thd = ha_thd();
if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
put_flags = DB_YESOVERWRITE;
}
if (table_share->keys + test(hidden_primary_key) == 1) {
error = file->put(file, transaction, create_key(&prim_key, primary_key, key_buff, record), &row, put_flags);
error = share->file->put(share->file, transaction, create_dbt_key_from_table(&prim_key, primary_key, key_buff, record), &row, put_flags);
last_dup_key = primary_key;
} else {
DB_TXN *sub_trans = transaction;
/* QQQ Don't use sub transactions in temporary tables */
for (uint retry = 0; retry < tokudb_trans_retry; retry++) {
key_map changed_keys(0);
if (!(error = file->put(file, sub_trans, create_key(&prim_key, primary_key, key_buff, record), &row, put_flags))) {
if (!(error = share->file->put(share->file, sub_trans, create_dbt_key_from_table(&prim_key, primary_key, key_buff, record), &row, put_flags))) {
changed_keys.set_bit(primary_key);
for (uint keynr = 0; keynr < table_share->keys; keynr++) {
if (keynr == primary_key)
continue;
put_flags = key_type[keynr];
put_flags = share->key_type[keynr];
if (put_flags == DB_NOOVERWRITE && thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS))
put_flags = DB_YESOVERWRITE;
if ((error = key_file[keynr]->put(key_file[keynr], sub_trans, create_key(&key, keynr, key_buff2, record), &prim_key, put_flags))) {
if ((error = share->key_file[keynr]->put(share->key_file[keynr], sub_trans, create_dbt_key_from_table(&key, keynr, key_buff2, record), &prim_key, put_flags))) {
last_dup_key = keynr;
break;
}
changed_keys.set_bit(keynr);
}
} else
}
else {
last_dup_key = primary_key;
}
if (error) {
/* Remove inserted row */
DBUG_PRINT("error", ("Got error %d", error));
......@@ -1711,12 +1769,12 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
// Delete the old row and add a new one
if (!(error = remove_key(trans, primary_key, old_row, old_key))) {
if (!(error = pack_row(&row, new_row))) {
if ((error = file->put(file, trans, new_key, &row, key_type[primary_key]))) {
if ((error = share->file->put(share->file, trans, new_key, &row, share->key_type[primary_key]))) {
// Probably a duplicated key; restore old key and row if needed
last_dup_key = primary_key;
if (local_using_ignore) {
int new_error;
if ((new_error = pack_row(&row, old_row)) || (new_error = file->put(file, trans, old_key, &row, key_type[primary_key])))
if ((new_error = pack_row(&row, old_row)) || (new_error = share->file->put(share->file, trans, old_key, &row, share->key_type[primary_key])))
error = new_error; // fatal error
}
}
......@@ -1725,7 +1783,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
} else {
// Primary key didn't change; just update the row data
if (!(error = pack_row(&row, new_row)))
error = file->put(file, trans, new_key, &row, 0);
error = share->file->put(share->file, trans, new_key, &row, 0);
}
TOKUDB_DBUG_RETURN(error);
}
......@@ -1756,7 +1814,7 @@ int ha_tokudb::restore_keys(DB_TXN * trans, key_map * changed_keys, uint primary
if (changed_keys->is_set(keynr)) {
if (changed_keys->is_prefix(1) && (error = remove_key(trans, keynr, new_row, new_key)))
break;
if ((error = key_file[keynr]->put(key_file[keynr], trans, create_key(&tmp_key, keynr, key_buff2, old_row), old_key, key_type[keynr])))
if ((error = share->key_file[keynr]->put(share->key_file[keynr], trans, create_dbt_key_from_table(&tmp_key, keynr, key_buff2, old_row), old_key, share->key_type[keynr])))
break;
}
}
......@@ -1795,10 +1853,10 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
prim_key.size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
old_prim_key = prim_key;
} else {
create_key(&prim_key, primary_key, key_buff, new_row);
create_dbt_key_from_table(&prim_key, primary_key, key_buff, new_row);
if ((primary_key_changed = key_cmp(primary_key, old_row, new_row)))
create_key(&old_prim_key, primary_key, primary_key_buff, old_row);
create_dbt_key_from_table(&old_prim_key, primary_key, primary_key_buff, old_row);
else
old_prim_key = prim_key;
}
......@@ -1817,7 +1875,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
TOKUDB_DBUG_RETURN(error); // Fatal error
}
changed_keys.set_bit(keynr);
if ((error = key_file[keynr]->put(key_file[keynr], sub_trans, create_key(&key, keynr, key_buff2, new_row), &prim_key, key_type[keynr]))) {
if ((error = share->key_file[keynr]->put(share->key_file[keynr], sub_trans, create_dbt_key_from_table(&key, keynr, key_buff2, new_row), &prim_key, share->key_type[keynr]))) {
last_dup_key = keynr;
break;
}
......@@ -1873,7 +1931,7 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT
else if (keynr == primary_key || ((table->key_info[keynr].flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME)) { // Unique key
DBUG_PRINT("Unique key", ("index: %d", keynr));
DBUG_ASSERT(keynr == primary_key || prim_key->data != key_buff2);
error = key_file[keynr]->del(key_file[keynr], trans, keynr == primary_key ? prim_key : create_key(&key, keynr, key_buff2, record), 0);
error = share->key_file[keynr]->del(share->key_file[keynr], trans, keynr == primary_key ? prim_key : create_dbt_key_from_table(&key, keynr, key_buff2, record), 0);
} else {
/* QQQ use toku_db_delboth(key_file[keynr], key, val, trans);
To delete the not duplicated key, we need to open an cursor on the
......@@ -1882,8 +1940,8 @@ int ha_tokudb::remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT
*/
DBUG_ASSERT(keynr != primary_key && prim_key->data != key_buff2);
DBC *tmp_cursor;
if (!(error = key_file[keynr]->cursor(key_file[keynr], trans, &tmp_cursor, 0))) {
if (!(error = tmp_cursor->c_get(tmp_cursor, create_key(&key, keynr, key_buff2, record), prim_key, DB_GET_BOTH))) {
if (!(error = share->key_file[keynr]->cursor(share->key_file[keynr], trans, &tmp_cursor, 0))) {
if (!(error = tmp_cursor->c_get(tmp_cursor, create_dbt_key_from_table(&key, keynr, key_buff2, record), prim_key, DB_GET_BOTH))) {
DBUG_DUMP("cget key", (uchar *) key.data, key.size);
error = tmp_cursor->c_del(tmp_cursor, 0);
}
......@@ -1936,7 +1994,7 @@ int ha_tokudb::delete_row(const uchar * record) {
key_map keys = table_share->keys_in_use;
statistic_increment(table->in_use->status_var.ha_delete_count, &LOCK_status);
create_key(&prim_key, primary_key, key_buff, record);
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record);
if (hidden_primary_key)
keys.set_bit(primary_key);
......@@ -1983,8 +2041,8 @@ int ha_tokudb::index_init(uint keynr, bool sorted) {
}
active_index = keynr;
DBUG_ASSERT(keynr <= table->s->keys);
DBUG_ASSERT(key_file[keynr]);
if ((error = key_file[keynr]->cursor(key_file[keynr], transaction, &cursor, table->reginfo.lock_type > TL_WRITE_ALLOW_READ ? 0 : 0)))
DBUG_ASSERT(share->key_file[keynr]);
if ((error = share->key_file[keynr]->cursor(share->key_file[keynr], transaction, &cursor, table->reginfo.lock_type > TL_WRITE_ALLOW_READ ? 0 : 0)))
cursor = NULL; // Safety
bzero((void *) &last_key, sizeof(last_key));
TOKUDB_DBUG_RETURN(error);
......@@ -2074,7 +2132,7 @@ int ha_tokudb::read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * fou
// Read the data into current_row
//
current_row.flags = DB_DBT_REALLOC;
if ((error = file->get(file, transaction, &key, &current_row, 0))) {
if ((error = share->file->get(share->file, transaction, &key, &current_row, 0))) {
table->status = STATUS_NOT_FOUND;
TOKUDB_DBUG_RETURN(error == DB_NOTFOUND ? HA_ERR_CRASHED : error);
}
......@@ -2109,7 +2167,7 @@ int ha_tokudb::index_read_idx(uchar * buf, uint keynr, const uchar * key, uint k
table->in_use->status_var.ha_read_key_count++;
current_row.flags = DB_DBT_REALLOC;
active_index = MAX_KEY;
TOKUDB_DBUG_RETURN(read_row(key_file[keynr]->get(key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len), &current_row, 0), buf, keynr, &current_row, &last_key, 0));
TOKUDB_DBUG_RETURN(read_row(share->key_file[keynr]->get(share->key_file[keynr], transaction, pack_key(&last_key, keynr, key_buff, key, key_len), &current_row, 0), buf, keynr, &current_row, &last_key, 0));
}
//TODO: QQQ Function to tell if a key+keylen is the entire key (loop through the schema), see comparison function for ideas.
......@@ -2485,7 +2543,7 @@ int ha_tokudb::rnd_pos(uchar * buf, uchar * pos) {
statistic_increment(table->in_use->status_var.ha_read_rnd_count, &LOCK_status);
active_index = MAX_KEY;
DBT* key = get_pos(&db_pos, pos);
TOKUDB_DBUG_RETURN(read_row(file->get(file, transaction, key, &current_row, 0), buf, primary_key, &current_row, key, 0));
TOKUDB_DBUG_RETURN(read_row(share->file->get(share->file, transaction, key, &current_row, 0), buf, primary_key, &current_row, key, 0));
}
/*
......@@ -2516,7 +2574,7 @@ void ha_tokudb::position(const uchar * record) {
DBUG_ASSERT(ref_length == TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
memcpy_fixed(ref, (char *) current_ident, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
} else {
create_key(&key, primary_key, ref, record);
create_dbt_key_from_table(&key, primary_key, ref, record);
if (key.size < ref_length)
bzero(ref + key.size, ref_length - key.size);
}
......@@ -2836,6 +2894,10 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
char newname[get_name_length(name) + 32];
uint i;
//
// tracing information about what type of table we are creating
//
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
for (i = 0; i < form->s->fields; i++) {
Field *field = form->s->field[i];
......@@ -2854,8 +2916,11 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
}
}
//
// check if auto increment is properly defined
// tokudb only supports auto increment on the first field in the primary key
// or the first field in the row
//
int pk_found = 0;
int ai_found = 0;
for (i = 0; i < form->s->keys; i++) {
......@@ -2902,11 +2967,10 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
primary_key = form->s->primary_key;
/* Create the keys */
char part[16];
uint index = 1;
char part[MAX_ALIAS_NAME + 10];
for (uint i = 0; i < form->s->keys; i++) {
if (i != primary_key) {
sprintf(part, "key%d", index++);
sprintf(part, "key-%s", form->s->key_info[i].name);
make_name(newname, name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = create_sub_table(name_buff, NULL, DB_BTREE, (form->key_info[i].flags & HA_NOSAME) ? 0 : DB_DUP + DB_DUPSORT);
......@@ -3055,7 +3119,7 @@ ha_rows ha_tokudb::records_in_range(uint keynr, key_range* start_key, key_range*
TOKUDB_DBUG_ENTER("ha_tokudb::records_in_range");
DBT key;
ha_rows ret_val = HA_TOKUDB_RANGE_COUNT;
DB *kfile = key_file[keynr];
DB *kfile = share->key_file[keynr];
u_int64_t less, equal, greater;
u_int64_t start_rows, end_rows, rows;
int is_exact;
......@@ -3151,6 +3215,220 @@ void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulongl
DBUG_VOID_RETURN;
}
//
// Adds indexes to the table. Takes the array of KEY passed in key_info, and creates
// DB's that will go at the end of share->key_file. THE IMPLICIT ASSUMPTION HERE is
// that the table will be modified and that these added keys will be appended to the end
// of the array table->key_info
// Parameters:
// [in] table_arg - table that is being modified, seems to be identical to this->table
// [in] key_info - array of KEY's to be added
// num_of_keys - number of keys to be added, number of elements in key_info
// Returns:
// 0 on success, error otherwise
//
int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
TOKUDB_DBUG_ENTER("ha_tokudb::add_index");
char name_buff[FN_REFLEN];
int error;
char newname[share->table_name_length + 32];
uint curr_index = 0;
DBC* tmp_cursor = NULL;
int cursor_ret_val = 0;
DBT current_primary_key;
DBT row;
DB_TXN* txn = NULL;
//
// these variables are for error handling
//
uint num_files_created = 0;
uint num_DB_opened = 0;
//
// in unpack_row, MySQL passes a buffer that is this long,
// so this length should be good enough for us as well
//
uchar tmp_record[table_arg->s->rec_buff_length];
bzero((void *) &current_primary_key, sizeof(current_primary_key));
bzero((void *) &row, sizeof(row));
//
// first create all the DB's files
//
char part[MAX_ALIAS_NAME + 10];
for (uint i = 0; i < num_of_keys; i++) {
sprintf(part, "key-%s", key_info[i].name);
make_name(newname, share->table_name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
error = create_sub_table(name_buff, NULL, DB_BTREE, (key_info[i].flags & HA_NOSAME) ? 0 : DB_DUP + DB_DUPSORT);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, key_info[i].flags, error);
}
if (error) { goto cleanup; }
num_files_created++;
}
//
// open all the DB files and set the appropriate variables in share
// they go to the end of share->key_file
//
curr_index = table_arg->s->keys;
for (uint i = 0; i < num_of_keys; i++, curr_index++) {
error = open_secondary_table(
&share->key_file[curr_index],
&key_info[i],
share->table_name,
0,
&share->key_type[curr_index]
);
if (error) { goto cleanup; }
num_DB_opened++;
}
//
// scan primary table, create each secondary key, add to each DB
//
error = db_env->txn_begin(db_env, 0, &txn, 0);
assert(error == 0);
if ((error = share->file->cursor(share->file, txn, &tmp_cursor, 0))) {
tmp_cursor = NULL; // Safety
goto cleanup;
}
//
// for each element in the primary table, insert the proper key value pair in each secondary table
// that is created
//
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &current_primary_key, &row, DB_NEXT);
while (cursor_ret_val != DB_NOTFOUND) {
if (cursor_ret_val) {
error = cursor_ret_val;
goto cleanup;
}
unpack_row(tmp_record, &row);
for (uint i = 0; i < num_of_keys; i++) {
DBT secondary_key;
create_dbt_key_from_key(&secondary_key,&key_info[i], key_buff, tmp_record);
uint curr_index = i + table_arg->s->keys;
u_int32_t put_flags = share->key_type[curr_index];
error = share->key_file[curr_index]->put(share->key_file[curr_index], NULL, &secondary_key, &current_primary_key, put_flags);
if (error) {
//
// in the case of any error anywhere, we can just nuke all the files created, so we dont need
// to be tricky and try to roll back changes. That is why we commit the transaction,
// which should be fast. The DB is going to go away anyway, so no pt in trying to keep
// it in a good state.
//
txn->commit(txn, 0);
goto cleanup;
}
}
cursor_ret_val = tmp_cursor->c_get(tmp_cursor, &current_primary_key, &row, DB_NEXT);
}
error = txn->commit(txn, 0);
assert(error == 0);
tmp_cursor->c_close(tmp_cursor);
error = 0;
cleanup:
if (error) {
//
// We need to delete all the files that may have been created
// The DB's must be closed and removed
//
for (uint i = table_arg->s->keys; i < table_arg->s->keys + num_DB_opened; i++) {
share->key_file[i]->close(share->key_file[i], 0);
share->key_file[i] = NULL;
}
for (uint i = 0; i < num_files_created; i++) {
DB* tmp;
sprintf(part, "key-%s", key_info[i].name);
make_name(newname, share->table_name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
if (!(db_create(&tmp, db_env, 0))) {
tmp->remove(tmp, name_buff, NULL, 0);
}
}
}
TOKUDB_DBUG_RETURN(error);
}
//
// Prepares to drop indexes to the table. For each value, i, in the array key_num,
// table->key_info[i] is a key that is to be dropped.
// ***********NOTE*******************
// Although prepare_drop_index is supposed to just get the DB's ready for removal,
// and not actually do the removal, we are doing it here and not in final_drop_index
// For the flags we expose in alter_table_flags, namely xxx_NO_WRITES, this is allowed
// Changes for "future-proofing" this so that it works when we have the equivalent flags
// that are not NO_WRITES are not worth it at the moments
// Parameters:
// [in] table_arg - table that is being modified, seems to be identical to this->table
// [in] key_num - array of indexes that specify which keys of the array table->key_info
// are to be dropped
// num_of_keys - size of array, key_num
// Returns:
// 0 on success, error otherwise
//
int ha_tokudb::prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys) {
TOKUDB_DBUG_ENTER("ha_tokudb::prepare_drop_index");
int error;
char name_buff[FN_REFLEN];
char newname[share->table_name_length + 32];
char part[MAX_ALIAS_NAME + 10];
DB** dbs_to_remove = NULL;
//
// we allocate an array of DB's here to get ready for removal
// We do this so that all potential memory allocation errors that may occur
// will do so BEFORE we go about dropping any indexes. This way, we
// can fail gracefully without losing integrity of data in such cases. If on
// on the other hand, we started removing DB's, and in the middle,
// one failed, it is not immedietely obvious how one would rollback
//
dbs_to_remove = (DB **)my_malloc(sizeof(*dbs_to_remove)*num_of_keys, MYF(MY_ZEROFILL));
if (dbs_to_remove == NULL) {
error = ENOMEM;
goto cleanup;
}
for (uint i = 0; i < num_of_keys; i++) {
error = db_create(&dbs_to_remove[i], db_env, 0);
if (error) {
goto cleanup;
}
}
for (uint i = 0; i < num_of_keys; i++) {
uint curr_index = key_num[i];
share->key_file[curr_index]->close(share->key_file[curr_index],0);
share->key_file[curr_index] = NULL;
sprintf(part, "key-%s", table_arg->key_info[curr_index].name);
make_name(newname, share->table_name, part);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME);
dbs_to_remove[i]->remove(dbs_to_remove[i], name_buff, NULL, 0);
}
cleanup:
my_free(dbs_to_remove, MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(error);
}
// ***********NOTE*******************
// Although prepare_drop_index is supposed to just get the DB's ready for removal,
// and not actually do the removal, we are doing it here and not in final_drop_index
// For the flags we expose in alter_table_flags, namely xxx_NO_WRITES, this is allowed
// Changes for "future-proofing" this so that it works when we have the equivalent flags
// that are not NO_WRITES are not worth it at the moments, therefore, we can make
// this function just return
int ha_tokudb::final_drop_index(TABLE *table_arg) {
TOKUDB_DBUG_ENTER("ha_tokudb::final_drop_index");
TOKUDB_DBUG_RETURN(0);
}
void ha_tokudb::print_error(int error, myf errflag) {
if (error == DB_LOCK_DEADLOCK)
error = HA_ERR_LOCK_DEADLOCK;
......
......@@ -14,7 +14,16 @@ typedef struct st_tokudb_share {
ulonglong last_auto_increment;
ha_rows rows, org_rows;
ulong *rec_per_key;
DB *status_block, *file, **key_file;
DB *status_block;
//
// DB that is indexed on the primary key
//
DB *file;
//
// array of all DB's that make up table, includes DB that
// is indexed on the primary key
//
DB *key_file[MAX_KEY];
u_int32_t *key_type;
uint status, version;
uint ref_length;
......@@ -71,25 +80,11 @@ private:
//
uchar *primary_key_buff;
//
// DB that is indexed on the primary key
//
DB *file;
//
// array of all DB's that make up table, includes DB that
// is indexed on the primary key
//
DB **key_file;
//
// transaction used by ha_tokudb's cursor
//
DB_TXN *transaction;
//
// array that holds put_flags for each database. So, when doing a
// key_file[i]->put, key_type[i] gets passed in for flags
//
u_int32_t *key_type;
//
// instance of cursor being used for init_xxx and rnd_xxx functions
//
......@@ -123,7 +118,8 @@ private:
int pack_row(DBT * row, const uchar * record);
void unpack_row(uchar * record, DBT * row);
void unpack_key(uchar * record, DBT * key, uint index);
DBT *create_key(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
DBT* create_dbt_key_from_key(DBT * key, KEY* key_info, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
DBT *create_dbt_key_from_table(DBT * key, uint keynr, uchar * buff, const uchar * record, int key_length = MAX_KEY_LENGTH);
DBT *pack_key(DBT * key, uint keynr, uchar * buff, const uchar * key_ptr, uint key_length);
int remove_key(DB_TXN * trans, uint keynr, const uchar * record, DBT * prim_key);
int remove_keys(DB_TXN * trans, const uchar * record, DBT * prim_key, key_map * keys);
......@@ -132,7 +128,9 @@ private:
int update_primary_key(DB_TXN * trans, bool primary_key_changed, const uchar * old_row, DBT * old_key, const uchar * new_row, DBT * prim_key, bool local_using_ignore);
int read_row(int error, uchar * buf, uint keynr, DBT * row, DBT * key, bool);
DBT *get_pos(DBT * to, uchar * pos);
int open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type);
public:
ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg);
~ha_tokudb() {
......@@ -246,6 +244,10 @@ public:
int cmp_ref(const uchar * ref1, const uchar * ref2);
bool check_if_incompatible_data(HA_CREATE_INFO * info, uint table_changes);
int add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys);
int prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys);
int final_drop_index(TABLE *table_arg);
private:
int __close(int mutex_is_locked);
int read_last();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment