Commit 53a3013a authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:1979], merge to main line

git-svn-id: file:///svn/mysql/tokudb-engine/src@15772 c7de825b-a66e-492c-adef-691d508d4ae1
parent 867c2c3b
...@@ -500,7 +500,6 @@ ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar *record ...@@ -500,7 +500,6 @@ ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar *record
} }
inline uint get_null_offset(TABLE* table, Field* field) { inline uint get_null_offset(TABLE* table, Field* field) {
return (uint) ((uchar*) field->null_ptr - (uchar*) table->record[0]); return (uint) ((uchar*) field->null_ptr - (uchar*) table->record[0]);
} }
...@@ -813,18 +812,12 @@ const uchar* unpack_toku_field_blob( ...@@ -813,18 +812,12 @@ const uchar* unpack_toku_field_blob(
static int add_table_to_metadata(const char *name, TABLE* table) { static int add_table_to_metadata(const char *name, TABLE* table, DB_TXN* txn) {
int error = 0; int error = 0;
DBT key; DBT key;
DBT val; DBT val;
DB_TXN* txn = NULL;
uchar hidden_primary_key = (table->s->primary_key >= MAX_KEY); uchar hidden_primary_key = (table->s->primary_key >= MAX_KEY);
assert(txn);
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
}
bzero((void *)&key, sizeof(key)); bzero((void *)&key, sizeof(key));
bzero((void *)&val, sizeof(val)); bzero((void *)&val, sizeof(val));
...@@ -839,25 +832,14 @@ static int add_table_to_metadata(const char *name, TABLE* table) { ...@@ -839,25 +832,14 @@ static int add_table_to_metadata(const char *name, TABLE* table) {
&val, &val,
DB_YESOVERWRITE DB_YESOVERWRITE
); );
cleanup:
if (txn) {
int r = !error ? txn->commit(txn,0) : txn->abort(txn);
assert(!r);
}
return error; return error;
} }
static int drop_table_from_metadata(const char *name) { static int drop_table_from_metadata(const char *name, DB_TXN* txn) {
int error = 0; int error = 0;
DBT key; DBT key;
DBT data; DBT data;
DB_TXN* txn = NULL; assert(txn);
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
}
bzero((void *)&key, sizeof(key)); bzero((void *)&key, sizeof(key));
bzero((void *)&data, sizeof(data)); bzero((void *)&data, sizeof(data));
key.data = (void *)name; key.data = (void *)name;
...@@ -868,25 +850,15 @@ static int drop_table_from_metadata(const char *name) { ...@@ -868,25 +850,15 @@ static int drop_table_from_metadata(const char *name) {
&key , &key ,
DB_DELETE_ANY DB_DELETE_ANY
); );
cleanup:
if (txn) {
int r = !error ? txn->commit(txn,0) : txn->abort(txn);
assert(!r);
}
return error; return error;
} }
static int rename_table_in_metadata(const char *from, const char *to) { static int rename_table_in_metadata(const char *from, const char *to, DB_TXN* txn) {
int error = 0; int error = 0;
DBT from_key; DBT from_key;
DBT to_key; DBT to_key;
DBT val; DBT val;
DB_TXN* txn = NULL; assert(txn);
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
}
bzero((void *)&from_key, sizeof(from_key)); bzero((void *)&from_key, sizeof(from_key));
bzero((void *)&to_key, sizeof(to_key)); bzero((void *)&to_key, sizeof(to_key));
...@@ -933,10 +905,6 @@ static int rename_table_in_metadata(const char *from, const char *to) { ...@@ -933,10 +905,6 @@ static int rename_table_in_metadata(const char *from, const char *to) {
error = 0; error = 0;
cleanup: cleanup:
if (txn) {
int r = !error ? txn->commit(txn,0) : txn->abort(txn);
assert(!r);
}
my_free(val.data, MYF(MY_ALLOW_ZERO_PTR)); my_free(val.data, MYF(MY_ALLOW_ZERO_PTR));
return error; return error;
...@@ -976,7 +944,7 @@ static int check_table_in_metadata(const char *name, bool* table_found) { ...@@ -976,7 +944,7 @@ static int check_table_in_metadata(const char *name, bool* table_found) {
cleanup: cleanup:
if (txn) { if (txn) {
error = txn->commit(txn,0); commit_txn(txn, 0);
} }
pthread_mutex_unlock(&tokudb_meta_mutex); pthread_mutex_unlock(&tokudb_meta_mutex);
return error; return error;
...@@ -1051,18 +1019,103 @@ bool ha_tokudb::has_auto_increment_flag(uint* index) { ...@@ -1051,18 +1019,103 @@ bool ha_tokudb::has_auto_increment_flag(uint* index) {
return ai_found; return ai_found;
} }
int ha_tokudb::open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn) {
int error;
char* newname = NULL;
uint open_mode = DB_THREAD;
newname = (char *)my_malloc(
get_max_dict_name_path_length(name),
MYF(MY_WME)
);
if (newname == NULL) {
error = ENOMEM;
goto cleanup;
}
make_name(newname, name, "status");
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("open:%s\n", newname);
}
error = db_create(ptr, db_env, 0);
if (error) { goto cleanup; }
(*ptr)->set_bt_compare((*ptr), tokudb_cmp_dbt_key);
error = (*ptr)->open((*ptr), txn, newname, NULL, DB_BTREE, open_mode, 0);
if (error) {
goto cleanup;
}
cleanup:
if (error) {
if (*ptr) {
(*ptr)->close(*ptr, 0);
*ptr = NULL;
}
}
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
int ha_tokudb::open_main_dictionary(const char* name, int mode, DB_TXN* txn) {
int error;
char* newname = NULL;
uint open_flags = (mode == O_RDONLY ? DB_RDONLY : 0) | DB_THREAD;
open_flags += DB_AUTO_COMMIT;
assert(share->file == NULL);
assert(share->key_file[primary_key] == NULL);
newname = (char *)my_malloc(
get_max_dict_name_path_length(name),
MYF(MY_WME|MY_ZEROFILL)
);
if (newname == NULL) {
error = ENOMEM;
goto exit;
}
make_name(newname, name, "main");
error = db_create(&share->file, db_env, 0);
if (error) {
goto exit;
}
share->key_file[primary_key] = share->file;
share->key_type[primary_key] = hidden_primary_key ? DB_YESOVERWRITE : DB_NOOVERWRITE;
//
// set comparison function for main.tokudb
//
share->file->set_bt_compare(share->file, tokudb_cmp_dbt_key);
error = share->file->open(share->file, txn, newname, NULL, DB_BTREE, open_flags, 0);
if (error) {
goto exit;
}
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("open:%s:file=%p\n", newname, share->file);
}
exit:
if (error) {
if (share->file) {
share->file->close(
share->file,
0
);
share->file = NULL;
share->key_file[primary_key] = NULL;
}
}
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
// //
// Open a secondary table, the key will be a secondary index, the data will be a primary key // Open a secondary table, the key will be a secondary index, the data will be a primary key
// //
int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type) { int ha_tokudb::open_secondary_dictionary(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type, DB_TXN* txn) {
int error = ENOSYS; int error = ENOSYS;
char dict_name[MAX_DICT_NAME_LEN]; char dict_name[MAX_DICT_NAME_LEN];
char name_buff[FN_REFLEN];
uint open_flags = (mode == O_RDONLY ? DB_RDONLY : 0) | DB_THREAD; uint open_flags = (mode == O_RDONLY ? DB_RDONLY : 0) | DB_THREAD;
char* newname = NULL; char* newname = NULL;
char* fn_ret = NULL;
uint newname_len = 0; uint newname_len = 0;
sprintf(dict_name, "key-%s", key_info->name); sprintf(dict_name, "key-%s", key_info->name);
...@@ -1074,11 +1127,6 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i ...@@ -1074,11 +1127,6 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
goto cleanup; goto cleanup;
} }
make_name(newname, name, dict_name); make_name(newname, name, dict_name);
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
open_flags += DB_AUTO_COMMIT; open_flags += DB_AUTO_COMMIT;
...@@ -1090,8 +1138,8 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i ...@@ -1090,8 +1138,8 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
// TODO: make sure that with clustering keys, DB_YESOVERWRITE IS ALWAYS SET // TODO: make sure that with clustering keys, DB_YESOVERWRITE IS ALWAYS SET
// //
*key_type = key_info->flags & HA_NOSAME ? DB_NOOVERWRITE : DB_YESOVERWRITE; *key_type = key_info->flags & HA_NOSAME ? DB_NOOVERWRITE : DB_YESOVERWRITE;
(*ptr)->set_bt_compare(*ptr, tokudb_cmp_dbt_key); (*ptr)->set_bt_compare(*ptr, tokudb_cmp_dbt_key);
DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %s\n", key_info->name)); DBUG_PRINT("info", ("Setting DB_DUP+DB_DUPSORT for key %s\n", key_info->name));
// //
// clustering keys are not DB_DUP, because their keys are unique (they have the PK embedded) // clustering keys are not DB_DUP, because their keys are unique (they have the PK embedded)
...@@ -1101,7 +1149,7 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i ...@@ -1101,7 +1149,7 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
(*ptr)->set_dup_compare(*ptr, tokudb_cmp_dbt_data); (*ptr)->set_dup_compare(*ptr, tokudb_cmp_dbt_data);
} }
if ((error = (*ptr)->open(*ptr, 0, name_buff, NULL, DB_BTREE, open_flags, 0))) { if ((error = (*ptr)->open(*ptr, txn, newname, NULL, DB_BTREE, open_flags, 0))) {
my_errno = error; my_errno = error;
goto cleanup; goto cleanup;
} }
...@@ -1109,6 +1157,12 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i ...@@ -1109,6 +1157,12 @@ int ha_tokudb::open_secondary_table(DB** ptr, KEY* key_info, const char* name, i
TOKUDB_TRACE("open:%s:file=%p\n", newname, *ptr); TOKUDB_TRACE("open:%s:file=%p\n", newname, *ptr);
} }
cleanup: cleanup:
if (error) {
if (*ptr) {
(*ptr)->close(*ptr, 0);
*ptr = NULL;
}
}
my_free(newname, MYF(MY_ALLOW_ZERO_PTR)); my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
return error; return error;
} }
...@@ -1175,15 +1229,10 @@ int ha_tokudb::initialize_share( ...@@ -1175,15 +1229,10 @@ int ha_tokudb::initialize_share(
) )
{ {
int error = 0; int error = 0;
char* newname = NULL;
char name_buff[FN_REFLEN];
char* fn_ret = NULL;
u_int64_t num_rows = 0; u_int64_t num_rows = 0;
u_int32_t curr_blob_field_index = 0; u_int32_t curr_blob_field_index = 0;
u_int32_t max_var_bytes = 0; u_int32_t max_var_bytes = 0;
bool table_exists; bool table_exists;
uint open_flags = (mode == O_RDONLY ? DB_RDONLY : 0) | DB_THREAD;
open_flags += DB_AUTO_COMMIT;
DBUG_PRINT("info", ("share->use_count %u", share->use_count)); DBUG_PRINT("info", ("share->use_count %u", share->use_count));
table_exists = true; table_exists = true;
...@@ -1198,22 +1247,6 @@ int ha_tokudb::initialize_share( ...@@ -1198,22 +1247,6 @@ int ha_tokudb::initialize_share(
goto exit; goto exit;
} }
newname = (char *)my_malloc(
get_max_dict_name_path_length(name),
MYF(MY_WME|MY_ZEROFILL)
);
if (newname == NULL) {
error = ENOMEM;
goto exit;
}
make_name(newname, name, "main");
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto exit;
}
// //
// fill in the field lengths. 0 means it is a variable sized field length // fill in the field lengths. 0 means it is a variable sized field length
// fill in length_bytes, 0 means it is fixed or blob // fill in length_bytes, 0 means it is fixed or blob
...@@ -1306,37 +1339,19 @@ int ha_tokudb::initialize_share( ...@@ -1306,37 +1339,19 @@ int ha_tokudb::initialize_share(
} }
error = db_create(&share->file, db_env, 0); error = open_main_dictionary(name, mode, NULL);
if (error) { if (error) { goto exit; }
goto exit;
}
share->key_file[primary_key] = share->file;
share->key_type[primary_key] = hidden_primary_key ? DB_YESOVERWRITE : DB_NOOVERWRITE;
//
// set comparison function for main.tokudb
//
share->file->set_bt_compare(share->file, tokudb_cmp_dbt_key);
error = share->file->open(share->file, 0, name_buff, NULL, DB_BTREE, open_flags, 0);
if (error) {
goto exit;
}
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("open:%s:file=%p\n", newname, share->file);
}
/* Open other keys; These are part of the share structure */ /* Open other keys; These are part of the share structure */
for (uint i = 0; i < table_share->keys; i++) { for (uint i = 0; i < table_share->keys; i++) {
if (i != primary_key) { if (i != primary_key) {
error = open_secondary_table( error = open_secondary_dictionary(
&share->key_file[i], &share->key_file[i],
&table_share->key_info[i], &table_share->key_info[i],
name, name,
mode, mode,
&share->key_type[i] &share->key_type[i],
NULL
); );
if (error) { if (error) {
goto exit; goto exit;
...@@ -1394,7 +1409,6 @@ int ha_tokudb::initialize_share( ...@@ -1394,7 +1409,6 @@ int ha_tokudb::initialize_share(
error = 0; error = 0;
exit: exit:
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
return error; return error;
} }
...@@ -1574,31 +1588,87 @@ cleanup: ...@@ -1574,31 +1588,87 @@ cleanup:
crsr = NULL; crsr = NULL;
} }
if (do_commit) { if (do_commit) {
transaction->commit(transaction, 0); commit_txn(transaction, 0);
transaction = NULL; transaction = NULL;
} }
return error; return error;
} }
int ha_tokudb::write_to_status(DB* db, HA_METADATA_KEY curr_key_data, void* data, uint size ){ int ha_tokudb::write_to_status(DB* db, HA_METADATA_KEY curr_key_data, void* data, uint size, DB_TXN* txn ){
return write_metadata(db, &curr_key_data, sizeof(curr_key_data), data, size); return write_metadata(db, &curr_key_data, sizeof(curr_key_data), data, size, txn);
}
int ha_tokudb::remove_metadata(DB* db, void* key_data, uint key_size, DB_TXN* transaction){
int error;
DBT key;
DB_TXN* txn = NULL;
bool do_commit = false;
//
// transaction to be used for putting metadata into status.tokudb
//
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
}
//
// transaction to be used for putting metadata into status.tokudb
//
if (transaction == NULL) {
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
}
do_commit = true;
}
else {
txn = transaction;
}
bzero(&key, sizeof(key));
key.data = key_data;
key.size = key_size;
error = db->del(db, txn, &key, DB_DELETE_ANY);
if (error) {
goto cleanup;
}
error = 0;
cleanup:
if (do_commit && txn) {
if (!error) {
commit_txn(txn, DB_TXN_NOSYNC);
}
else {
abort_txn(txn);
}
}
return error;
} }
// //
// helper function to write a piece of metadata in to status.tokudb // helper function to write a piece of metadata in to status.tokudb
// //
int ha_tokudb::write_metadata(DB* db, void* key_data, uint key_size, void* val_data, uint val_size ){ int ha_tokudb::write_metadata(DB* db, void* key_data, uint key_size, void* val_data, uint val_size, DB_TXN* transaction ){
int error; int error;
DBT key; DBT key;
DBT value; DBT value;
DB_TXN* txn = NULL; DB_TXN* txn = NULL;
bool do_commit = false;
// //
// transaction to be used for putting metadata into status.tokudb // transaction to be used for putting metadata into status.tokudb
// //
error = db_env->txn_begin(db_env, 0, &txn, 0); if (transaction == NULL) {
if (error) { error = db_env->txn_begin(db_env, 0, &txn, 0);
goto cleanup; if (error) {
goto cleanup;
}
do_commit = true;
}
else {
txn = transaction;
} }
bzero(&key, sizeof(key)); bzero(&key, sizeof(key));
...@@ -1614,12 +1684,12 @@ int ha_tokudb::write_metadata(DB* db, void* key_data, uint key_size, void* val_d ...@@ -1614,12 +1684,12 @@ int ha_tokudb::write_metadata(DB* db, void* key_data, uint key_size, void* val_d
error = 0; error = 0;
cleanup: cleanup:
if (txn) { if (do_commit && txn) {
if (!error) { if (!error) {
txn->commit(txn, DB_TXN_NOSYNC); commit_txn(txn, DB_TXN_NOSYNC);
} }
else { else {
txn->abort(txn); abort_txn(txn);
} }
} }
return error; return error;
...@@ -1637,7 +1707,7 @@ cleanup: ...@@ -1637,7 +1707,7 @@ cleanup:
// //
// //
int ha_tokudb::update_max_auto_inc(DB* db, ulonglong val){ int ha_tokudb::update_max_auto_inc(DB* db, ulonglong val){
return write_to_status(db,hatoku_max_ai,&val,sizeof(val)); return write_to_status(db,hatoku_max_ai,&val,sizeof(val), NULL);
} }
// //
...@@ -1651,8 +1721,8 @@ int ha_tokudb::update_max_auto_inc(DB* db, ulonglong val){ ...@@ -1651,8 +1721,8 @@ int ha_tokudb::update_max_auto_inc(DB* db, ulonglong val){
// 0 on success, error otherwise // 0 on success, error otherwise
// //
// //
int ha_tokudb::write_auto_inc_create(DB* db, ulonglong val){ int ha_tokudb::write_auto_inc_create(DB* db, ulonglong val, DB_TXN* txn){
return write_to_status(db,hatoku_ai_create_value,&val,sizeof(val)); return write_to_status(db,hatoku_ai_create_value,&val,sizeof(val), txn);
} }
...@@ -2360,8 +2430,7 @@ int ha_tokudb::read_last(uint keynr) { ...@@ -2360,8 +2430,7 @@ int ha_tokudb::read_last(uint keynr) {
error = index_last(table->record[1]); error = index_last(table->record[1]);
index_end(); index_end();
if (do_commit) { if (do_commit) {
int r = transaction->commit(transaction, 0); commit_txn(transaction, 0);
assert(r == 0);
transaction = NULL; transaction = NULL;
} }
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
...@@ -2398,36 +2467,15 @@ int ha_tokudb::get_status() { ...@@ -2398,36 +2467,15 @@ int ha_tokudb::get_status() {
DBT key, value; DBT key, value;
HA_METADATA_KEY curr_key; HA_METADATA_KEY curr_key;
int error; int error;
char* newname = NULL;
char* fn_ret = NULL;
// //
// open status.tokudb // open status.tokudb
// //
if (!share->status_block) { if (!share->status_block) {
char name_buff[FN_REFLEN]; error = open_status_dictionary(
newname = (char *)my_malloc( &share->status_block,
get_max_dict_name_path_length(share->table_name), share->table_name,
MYF(MY_WME) NULL
); );
if (newname == NULL) {
error = ENOMEM;
goto cleanup;
}
make_name(newname, share->table_name, "status");
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
uint open_mode = (((table->db_stat & HA_READ_ONLY) ? DB_RDONLY : 0)
| DB_THREAD);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("open:%s\n", newname);
}
error = db_create(&share->status_block, db_env, 0);
if (error) { goto cleanup; }
error = share->status_block->open(share->status_block, NULL, name_buff, NULL, DB_BTREE, open_mode, 0);
if (error) { if (error) {
goto cleanup; goto cleanup;
} }
...@@ -2444,64 +2492,56 @@ int ha_tokudb::get_status() { ...@@ -2444,64 +2492,56 @@ int ha_tokudb::get_status() {
error = db_env->txn_begin(db_env, 0, &txn, 0); error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; } if (error) { goto cleanup; }
if (share->status_block) { assert(share->status_block);
int error; //
// // get version
// get version //
// value.ulen = sizeof(share->version);
value.ulen = sizeof(share->version); value.data = &share->version;
value.data = &share->version; curr_key = hatoku_version;
curr_key = hatoku_version; error = share->status_block->get(
error = share->status_block->get( share->status_block,
share->status_block, txn,
txn, &key,
&key, &value,
&value, 0
0 );
); if (error == DB_NOTFOUND) {
if (error == DB_NOTFOUND) { share->version = 0;
share->version = 0; }
} else if (error || value.size != sizeof(share->version)) {
else if (error || value.size != sizeof(share->version)) { if (error == 0) {
if (error == 0) { error = HA_ERR_INTERNAL_ERROR;
error = HA_ERR_INTERNAL_ERROR;
}
goto cleanup;
}
//
// get capabilities
//
curr_key = hatoku_capabilities;
value.ulen = sizeof(share->capabilities);
value.data = &share->capabilities;
error = share->status_block->get(
share->status_block,
txn,
&key,
&value,
0
);
if (error == DB_NOTFOUND) {
share->capabilities= 0;
} }
else if (error || value.size != sizeof(share->version)) { goto cleanup;
if (error == 0) { }
error = HA_ERR_INTERNAL_ERROR; //
} // get capabilities
goto cleanup; //
curr_key = hatoku_capabilities;
value.ulen = sizeof(share->capabilities);
value.data = &share->capabilities;
error = share->status_block->get(
share->status_block,
txn,
&key,
&value,
0
);
if (error == DB_NOTFOUND) {
share->capabilities= 0;
}
else if (error || value.size != sizeof(share->version)) {
if (error == 0) {
error = HA_ERR_INTERNAL_ERROR;
} }
goto cleanup;
} }
error = 0; error = 0;
cleanup: cleanup:
if (txn) { if (txn) {
txn->commit(txn,0); commit_txn(txn,0);
}
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
if (error) {
if (share->status_block) {
share->status_block->close(share->status_block, 0);
share->status_block = NULL;
}
} }
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -2598,8 +2638,7 @@ cleanup: ...@@ -2598,8 +2638,7 @@ cleanup:
tmp_cursor = NULL; tmp_cursor = NULL;
} }
if (txn) { if (txn) {
int r = txn->commit(txn, 0); commit_txn(txn, 0);
assert(r == 0);
txn = NULL; txn = NULL;
} }
return ret_val; return ret_val;
...@@ -2666,13 +2705,6 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -2666,13 +2705,6 @@ int ha_tokudb::write_row(uchar * record) {
is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) || is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) ||
(thd_sql_command(thd) == SQLCOM_REPLACE_SELECT); (thd_sql_command(thd) == SQLCOM_REPLACE_SELECT);
//
// this can only fail if we have not opened the environment
// yet. I want to assert that rather than check for the error
//
error = db_env->checkpointing_begin_atomic_operation(db_env);
assert(!error);
// //
// some crap that needs to be done because MySQL does not properly abstract // some crap that needs to be done because MySQL does not properly abstract
// this work away from us, namely filling in auto increment and setting auto timestamp // this work away from us, namely filling in auto increment and setting auto timestamp
...@@ -2835,16 +2867,12 @@ cleanup: ...@@ -2835,16 +2867,12 @@ cleanup:
// nothing we can do about it anyway and it is not what // nothing we can do about it anyway and it is not what
// we want to return. // we want to return.
if (error) { if (error) {
sub_trans->abort(sub_trans); abort_txn(sub_trans);
} }
else { else {
error = sub_trans->commit(sub_trans, DB_TXN_NOSYNC); commit_txn(sub_trans, DB_TXN_NOSYNC);
} }
} }
{
int r = db_env->checkpointing_end_atomic_operation(db_env);
assert(r==0);
}
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -2930,13 +2958,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2930,13 +2958,6 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
DB_TXN* txn = NULL; DB_TXN* txn = NULL;
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);; tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
//
// this can only fail if we have not opened the environment
// yet. I want to assert that rather than check for the error
//
error = db_env->checkpointing_begin_atomic_operation(db_env);
assert(!error);
LINT_INIT(error); LINT_INIT(error);
statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_update_count, &LOCK_status);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) { if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) {
...@@ -3078,16 +3099,12 @@ cleanup: ...@@ -3078,16 +3099,12 @@ cleanup:
// nothing we can do about it anyway and it is not what // nothing we can do about it anyway and it is not what
// we want to return. // we want to return.
if (error) { if (error) {
sub_trans->abort(sub_trans); abort_txn(sub_trans);
} }
else { else {
error = sub_trans->commit(sub_trans, DB_TXN_NOSYNC); commit_txn(sub_trans, DB_TXN_NOSYNC);
} }
} }
{
int r = db_env->checkpointing_end_atomic_operation(db_env);
assert(r==0);
}
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -3180,13 +3197,6 @@ int ha_tokudb::delete_row(const uchar * record) { ...@@ -3180,13 +3197,6 @@ int ha_tokudb::delete_row(const uchar * record) {
THD* thd = ha_thd(); THD* thd = ha_thd();
tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);; tokudb_trx_data* trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);;
//
// this can only fail if we have not opened the environment
// yet. I want to assert that rather than check for the error
//
error = db_env->checkpointing_begin_atomic_operation(db_env);
assert(!error);
statistic_increment(table->in_use->status_var.ha_delete_count, &LOCK_status); statistic_increment(table->in_use->status_var.ha_delete_count, &LOCK_status);
create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null); create_dbt_key_from_table(&prim_key, primary_key, key_buff, record, &has_null);
...@@ -3205,10 +3215,6 @@ int ha_tokudb::delete_row(const uchar * record) { ...@@ -3205,10 +3215,6 @@ int ha_tokudb::delete_row(const uchar * record) {
trx->stmt_progress.deleted++; trx->stmt_progress.deleted++;
track_progress(thd); track_progress(thd);
} }
{
int r = db_env->checkpointing_end_atomic_operation(db_env);
assert(r==0);
}
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -4303,7 +4309,7 @@ int ha_tokudb::info(uint flag) { ...@@ -4303,7 +4309,7 @@ int ha_tokudb::info(uint flag) {
error = 0; error = 0;
cleanup: cleanup:
if (txn != NULL) { if (txn != NULL) {
txn->commit(txn, DB_TXN_NOSYNC); commit_txn(txn, DB_TXN_NOSYNC);
txn = NULL; txn = NULL;
} }
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
...@@ -4521,7 +4527,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) { ...@@ -4521,7 +4527,7 @@ int ha_tokudb::external_lock(THD * thd, int lock_type) {
We must in this case commit the work to keep the row locks We must in this case commit the work to keep the row locks
*/ */
DBUG_PRINT("trans", ("commiting non-updating transaction")); DBUG_PRINT("trans", ("commiting non-updating transaction"));
error = trx->stmt->commit(trx->stmt, 0); commit_txn(trx->stmt, 0);
reset_stmt_progress(&trx->stmt_progress); reset_stmt_progress(&trx->stmt_progress);
if (tokudb_debug & TOKUDB_DEBUG_TXN) if (tokudb_debug & TOKUDB_DEBUG_TXN)
TOKUDB_TRACE("commit:%p:%d\n", trx->stmt, error); TOKUDB_TRACE("commit:%p:%d\n", trx->stmt, error);
...@@ -4640,7 +4646,7 @@ int toku_dbt_up(DB*, ...@@ -4640,7 +4646,7 @@ int toku_dbt_up(DB*,
} }
static int create_sub_table(const char *table_name, int flags , DBT* row_descriptor) { static int create_sub_table(const char *table_name, int flags , DBT* row_descriptor, DB_TXN* txn) {
TOKUDB_DBUG_ENTER("create_sub_table"); TOKUDB_DBUG_ENTER("create_sub_table");
int error; int error;
DB *file = NULL; DB *file = NULL;
...@@ -4662,157 +4668,20 @@ static int create_sub_table(const char *table_name, int flags , DBT* row_descrip ...@@ -4662,157 +4668,20 @@ static int create_sub_table(const char *table_name, int flags , DBT* row_descrip
goto exit; goto exit;
} }
error = file->open(file, NULL, table_name, NULL, DB_BTREE, DB_THREAD | DB_CREATE, my_umask); error = file->open(file, txn, table_name, NULL, DB_BTREE, DB_THREAD | DB_CREATE, my_umask);
if (error) { if (error) {
DBUG_PRINT("error", ("Got error: %d when opening table '%s'", error, table_name)); DBUG_PRINT("error", ("Got error: %d when opening table '%s'", error, table_name));
goto exit; goto exit;
} }
file->close(file, 0);
error = 0; error = 0;
exit: exit:
if (error) { if (file) {
if (file != NULL) { file->close(file, 0);
(void) file->remove(file, table_name, NULL, 0);
}
} }
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
static int mkdirpath(char *name, mode_t mode) {
char* parent = NULL;
int r = toku_os_mkdir(name, mode);
if (r == -1 && errno == ENOENT) {
parent = (char *)my_malloc(strlen(name)+1,MYF(MY_WME));
if (parent == NULL) {
r = ENOMEM;
goto cleanup;
}
strcpy(parent, name);
char *cp = strrchr(parent, '/');
if (cp) {
*cp = 0;
r = toku_os_mkdir(parent, 0755);
if (r == 0)
r = toku_os_mkdir(name, mode);
}
}
cleanup:
my_free(parent, MYF(MY_ALLOW_ZERO_PTR));
return r;
}
extern "C" {
#include <dirent.h>
}
static int rmall(const char *dname) {
int error = 0;
char* fname = NULL;
struct dirent *dirent = NULL;;
DIR *d = opendir(dname);
if (d == NULL) {
error = errno;
goto cleanup;
}
//
// we do two loops, first loop just removes all the .tokudb files
// second loop removes extraneous files
//
while ((dirent = readdir(d)) != 0) {
if (0 == strcmp(dirent->d_name, ".") || 0 == strcmp(dirent->d_name, ".."))
continue;
fname = (char *)my_malloc(strlen(dname) + 1 + strlen(dirent->d_name) + 1, MYF(MY_WME));
sprintf(fname, "%s/%s", dname, dirent->d_name);
if (dirent->d_type == DT_DIR) {
error = rmall(fname);
if (error) { goto cleanup; }
}
else {
//
// if clause checks if the file is a .tokudb file
//
if (strlen(fname) >= strlen (ha_tokudb_ext) &&
strcmp(fname + (strlen(fname) - strlen(ha_tokudb_ext)), ha_tokudb_ext) == 0)
{
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("removing:%s\n", fname);
}
//
// if this fails under low memory conditions, gracefully exit and return error
// user will be notified that something went wrong, and he will
// have to deal with it
//
DB* db = NULL;
error = db_create(&db, db_env, 0);
if (error) { goto cleanup; }
//
// it is ok to do db->remove on any .tokudb file, because any such
// file was created with db->open
//
error = db->remove(db, fname, NULL, 0);
if (error) { goto cleanup; }
}
else {
continue;
}
my_free(fname, MYF(MY_ALLOW_ZERO_PTR));
fname = NULL;
}
}
closedir(d);
d = NULL;
fname = NULL;
d = opendir(dname);
if (d == NULL) {
error = errno;
goto cleanup;
}
//
// second loop to remove extraneous files
//
while ((dirent = readdir(d)) != 0) {
if (0 == strcmp(dirent->d_name, ".") || 0 == strcmp(dirent->d_name, ".."))
continue;
fname = (char *)my_malloc(strlen(dname) + 1 + strlen(dirent->d_name) + 1, MYF(MY_WME));
sprintf(fname, "%s/%s", dname, dirent->d_name);
if (dirent->d_type == DT_DIR) {
error = rmall(fname);
if (error) { goto cleanup; }
}
else {
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("removing:%s\n", fname);
}
//
// Now we are removing files that are not .tokudb, we just delete it
//
error = unlink(fname);
if (error != 0) {
error = errno;
break;
}
my_free(fname, MYF(MY_ALLOW_ZERO_PTR));
fname = NULL;
}
}
closedir(d);
d = NULL;
error = rmdir(dname);
if (error != 0) {
error = errno;
goto cleanup;
}
cleanup:
return error;
}
void ha_tokudb::update_create_info(HA_CREATE_INFO* create_info) { void ha_tokudb::update_create_info(HA_CREATE_INFO* create_info) {
if (share->has_auto_inc) { if (share->has_auto_inc) {
info(HA_STATUS_AUTO); info(HA_STATUS_AUTO);
...@@ -4820,45 +4689,67 @@ void ha_tokudb::update_create_info(HA_CREATE_INFO* create_info) { ...@@ -4820,45 +4689,67 @@ void ha_tokudb::update_create_info(HA_CREATE_INFO* create_info) {
} }
} }
// //
// Creates a new table // removes key name from status.tokudb.
// Parameters: // needed for when we are dropping indexes, so that
// [in] name - table name // during drop table, we do not attempt to remove already dropped
// [in] form - info on table, columns and indexes // indexes because we did not keep status.tokudb in sync with list of indexes.
// [in] create_info - more info on table, CURRENTLY UNUSED
// Returns:
// 0 on success
// error otherwise
// //
int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_info) { int ha_tokudb::remove_key_name_from_status(DB* status_block, char* key_name, DB_TXN* txn) {
TOKUDB_DBUG_ENTER("ha_tokudb::create");
char name_buff[FN_REFLEN];
int error; int error;
DB *status_block = NULL; uchar status_key_info[FN_REFLEN + sizeof(HA_METADATA_KEY)];
bool dir_path_made = false; HA_METADATA_KEY md_key = hatoku_key_name;
char* dirname = NULL; memcpy(status_key_info, &md_key, sizeof(HA_METADATA_KEY));
char* newname = NULL; //
DBT row_descriptor; // put index name in status.tokudb
uchar* row_desc_buff = NULL; //
KEY* prim_key = NULL; memcpy(
char* fn_ret = NULL; status_key_info + sizeof(HA_METADATA_KEY),
uint version; key_name,
uint capabilities; strlen(key_name) + 1
);
pthread_mutex_lock(&tokudb_meta_mutex); error = remove_metadata(
bzero(&row_descriptor, sizeof(row_descriptor)); status_block,
row_desc_buff = (uchar *)my_malloc(2*(form->s->fields * 6)+10 ,MYF(MY_WME)); status_key_info,
if (row_desc_buff == NULL){ error = ENOMEM; goto cleanup;} sizeof(HA_METADATA_KEY) + strlen(key_name) + 1,
txn
);
return error;
}
dirname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME)); //
if (dirname == NULL){ error = ENOMEM; goto cleanup;} // writes the key name in status.tokudb, so that we may later delete or rename
newname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME)); // the dictionary associated with key_name
if (newname == NULL){ error = ENOMEM; goto cleanup;} //
primary_key = form->s->primary_key; int ha_tokudb::write_key_name_to_status(DB* status_block, char* key_name, DB_TXN* txn) {
hidden_primary_key = (primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0; int error;
uchar status_key_info[FN_REFLEN + sizeof(HA_METADATA_KEY)];
HA_METADATA_KEY md_key = hatoku_key_name;
memcpy(status_key_info, &md_key, sizeof(HA_METADATA_KEY));
//
// put index name in status.tokudb
//
memcpy(
status_key_info + sizeof(HA_METADATA_KEY),
key_name,
strlen(key_name) + 1
);
error = write_metadata(
status_block,
status_key_info,
sizeof(HA_METADATA_KEY) + strlen(key_name) + 1,
NULL,
0,
txn
);
return error;
}
uint i; //
// some tracing moved out of ha_tokudb::create, because ::create was getting cluttered
//
void ha_tokudb::trace_create_table_info(const char *name, TABLE * form) {
uint i;
// //
// tracing information about what type of table we are creating // tracing information about what type of table we are creating
// //
...@@ -4879,32 +4770,84 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in ...@@ -4879,32 +4770,84 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
} }
} }
} }
}
//
// creates dictionary for secondary index, with key description key_info, all using txn
//
int ha_tokudb::create_secondary_dictionary(const char* name, TABLE* form, KEY* key_info, DB_TXN* txn) {
int error;
DBT row_descriptor;
uchar* row_desc_buff = NULL;
char* newname = NULL;
KEY* prim_key = NULL;
char dict_name[MAX_DICT_NAME_LEN];
uint hpk= (form->s->primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0;
int flags = (key_info->flags & HA_CLUSTERING) ? 0 : DB_DUP + DB_DUPSORT;
// a table is a directory of dictionaries bzero(&row_descriptor, sizeof(row_descriptor));
make_name(dirname, name, 0); row_desc_buff = (uchar *)my_malloc(2*(form->s->fields * 6)+10 ,MYF(MY_WME));
error = mkdirpath(dirname, 0777); if (row_desc_buff == NULL){ error = ENOMEM; goto cleanup;}
if (error != 0) {
error = errno; newname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
goto cleanup; if (newname == NULL){ error = ENOMEM; goto cleanup;}
}
dir_path_made = true; sprintf(dict_name, "key-%s", key_info->name);
make_name(newname, name, dict_name);
prim_key = (hpk) ? NULL : &form->s->key_info[primary_key];
//
// setup the row descriptor
//
row_descriptor.data = row_desc_buff;
row_descriptor.size = create_toku_key_descriptor(
row_desc_buff,
false,
key_info->flags & HA_CLUSTERING,
key_info,
hpk,
prim_key
);
error = create_sub_table(newname, flags, &row_descriptor, txn);
cleanup:
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
my_free(row_desc_buff, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
//
// create and close the main dictionarr with name of "name" using table form, all within
// transaction txn.
//
int ha_tokudb::create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn) {
int error;
DBT row_descriptor;
uchar* row_desc_buff = NULL;
char* newname = NULL;
KEY* prim_key = NULL;
uint hpk= (form->s->primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0;
bzero(&row_descriptor, sizeof(row_descriptor));
row_desc_buff = (uchar *)my_malloc(2*(form->s->fields * 6)+10 ,MYF(MY_WME));
if (row_desc_buff == NULL){ error = ENOMEM; goto cleanup;}
newname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
if (newname == NULL){ error = ENOMEM; goto cleanup;}
make_name(newname, name, "main"); make_name(newname, name, "main");
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) { prim_key = (hpk) ? NULL : &form->s->key_info[primary_key];
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
// //
// setup the row descriptor // setup the row descriptor
// //
prim_key = (hidden_primary_key) ? NULL : &form->s->key_info[primary_key];
row_descriptor.data = row_desc_buff; row_descriptor.data = row_desc_buff;
row_descriptor.size = create_toku_key_descriptor( row_descriptor.size = create_toku_key_descriptor(
row_desc_buff, row_desc_buff,
hidden_primary_key, hpk,
false, false,
prim_key, prim_key,
false, false,
...@@ -4912,96 +4855,106 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in ...@@ -4912,96 +4855,106 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
); );
/* Create the main table that will hold the real rows */ /* Create the main table that will hold the real rows */
error = create_sub_table(name_buff, 0, &row_descriptor); error = create_sub_table(newname, 0, &row_descriptor, txn);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) { cleanup:
TOKUDB_TRACE("create:%s:error=%d\n", newname, error); my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
} my_free(row_desc_buff, MYF(MY_ALLOW_ZERO_PTR));
if (error) { return error;
goto cleanup; }
}
//
// Creates a new table
// Parameters:
// [in] name - table name
// [in] form - info on table, columns and indexes
// [in] create_info - more info on table, CURRENTLY UNUSED
// Returns:
// 0 on success
// error otherwise
//
int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_info) {
TOKUDB_DBUG_ENTER("ha_tokudb::create");
int error;
DB *status_block = NULL;
KEY* prim_key = NULL;
uint version;
uint capabilities;
DB_TXN* txn = NULL;
char* newname = NULL;
pthread_mutex_lock(&tokudb_meta_mutex);
/* Create the keys */ newname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
char dict_name[MAX_DICT_NAME_LEN]; if (newname == NULL){ error = ENOMEM; goto cleanup;}
for (uint i = 0; i < form->s->keys; i++) {
if (i != primary_key) {
int flags = (form->s->key_info[i].flags & HA_CLUSTERING) ? 0 : DB_DUP + DB_DUPSORT;
sprintf(dict_name, "key-%s", form->s->key_info[i].name);
make_name(newname, name, dict_name);
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
//
// setup the row descriptor
//
row_descriptor.size = create_toku_key_descriptor(
row_desc_buff,
false,
form->key_info[i].flags & HA_CLUSTERING,
&form->key_info[i],
hidden_primary_key,
prim_key
);
error = create_sub_table(name_buff, flags, &row_descriptor);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, form->key_info[i].flags, error);
}
if (error) {
goto cleanup;
}
}
}
error = db_create(&status_block, db_env, 0); error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; } if (error) { goto cleanup; }
primary_key = form->s->primary_key;
hidden_primary_key = (primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0;
/* do some tracing */
trace_create_table_info(name,form);
/* Create status.tokudb and save relevant metadata */ /* Create status.tokudb and save relevant metadata */
make_name(newname, name, "status"); make_name(newname, name, "status");
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
error = status_block->open(status_block, NULL, name_buff, NULL, DB_BTREE, DB_CREATE, 0); error = db_create(&status_block, db_env, 0);
if (error) { goto cleanup; }
status_block->set_bt_compare(status_block, tokudb_cmp_dbt_key);
error = status_block->open(status_block, txn, newname, NULL, DB_BTREE, DB_CREATE, 0);
if (error) { goto cleanup; } if (error) { goto cleanup; }
version = HA_TOKU_VERSION; version = HA_TOKU_VERSION;
capabilities = HA_TOKU_CAP; capabilities = HA_TOKU_CAP;
error = write_to_status(status_block, hatoku_version,&version,sizeof(version)); error = write_to_status(status_block, hatoku_version,&version,sizeof(version), txn);
if (error) { goto cleanup; } if (error) { goto cleanup; }
error = write_to_status(status_block, hatoku_capabilities,&capabilities,sizeof(capabilities)); error = write_to_status(status_block, hatoku_capabilities,&capabilities,sizeof(capabilities), txn);
if (error) { goto cleanup; } if (error) { goto cleanup; }
error = write_auto_inc_create(status_block, create_info->auto_increment_value); error = write_auto_inc_create(status_block, create_info->auto_increment_value, txn);
if (error) { goto cleanup; } if (error) { goto cleanup; }
error = create_main_dictionary(name, form, txn);
error = add_table_to_metadata(name, form);
if (error) { if (error) {
goto cleanup; goto cleanup;
} }
for (uint i = 0; i < form->s->keys; i++) {
if (i != primary_key) {
error = create_secondary_dictionary(name, form, &form->key_info[i], txn);
if (error) {
goto cleanup;
}
error = write_key_name_to_status(status_block, form->s->key_info[i].name, txn);
if (error) { goto cleanup; }
}
}
error = add_table_to_metadata(name, form, txn);
if (error) { goto cleanup; }
error = 0; error = 0;
cleanup: cleanup:
if (status_block != NULL) { if (status_block != NULL) {
status_block->close(status_block, 0); status_block->close(status_block, 0);
} }
if (error && dir_path_made) { if (txn) {
rmall(dirname); if (error) {
} abort_txn(txn);
if (error) { }
drop_table_from_metadata(name); else {
commit_txn(txn,0);
}
} }
my_free(newname, MYF(MY_ALLOW_ZERO_PTR)); my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
my_free(dirname, MYF(MY_ALLOW_ZERO_PTR));
my_free(row_desc_buff, MYF(MY_ALLOW_ZERO_PTR));
pthread_mutex_unlock(&tokudb_meta_mutex); pthread_mutex_unlock(&tokudb_meta_mutex);
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -5020,49 +4973,181 @@ int ha_tokudb::discard_or_import_tablespace(my_bool discard) { ...@@ -5020,49 +4973,181 @@ int ha_tokudb::discard_or_import_tablespace(my_bool discard) {
// //
// Drops table // deletes from_name or renames from_name to to_name, all using transaction txn.
// Parameters: // is_delete specifies which we are doing
// [in] name - name of table to be deleted // is_key specifies if it is a secondary index (and hence a "key-" needs to be prepended) or
// Returns: // if it is not a secondary index
// 0 on success
// error otherwise
// //
int ha_tokudb::delete_table(const char *name) { int ha_tokudb::delete_or_rename_dictionary( const char* from_name, const char* to_name, char* secondary_name, bool is_key, DB_TXN* txn, bool is_delete) {
TOKUDB_DBUG_ENTER("ha_tokudb::delete_table"); int error;
char dict_name[MAX_DICT_NAME_LEN];
char* new_from_name = NULL;
char* new_to_name = NULL;
assert(txn);
new_from_name = (char *)my_malloc(
get_max_dict_name_path_length(from_name),
MYF(MY_WME)
);
if (new_from_name == NULL) {
error = ENOMEM;
goto cleanup;
}
if (!is_delete) {
assert(to_name);
new_to_name = (char *)my_malloc(
get_max_dict_name_path_length(to_name),
MYF(MY_WME)
);
if (new_to_name == NULL) {
error = ENOMEM;
goto cleanup;
}
}
if (is_key) {
sprintf(dict_name, "key-%s", secondary_name);
make_name(new_from_name, from_name, dict_name);
}
else {
make_name(new_from_name, from_name, secondary_name);
}
if (!is_delete) {
if (is_key) {
sprintf(dict_name, "key-%s", secondary_name);
make_name(new_to_name, to_name, dict_name);
}
else {
make_name(new_to_name, to_name, secondary_name);
}
}
if (is_delete) {
error = db_env->dbremove(db_env, txn, new_from_name, NULL, 0);
}
else {
error = db_env->dbrename(db_env, txn, new_from_name, NULL, new_to_name, 0);
}
if (error) { goto cleanup; }
cleanup:
my_free(new_from_name, MYF(MY_ALLOW_ZERO_PTR));
my_free(new_to_name, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
//
// deletes or renames a table. if is_delete is true, then we delete, and to_name can be NULL
// if is_delete is false, then to_name must be non-NULL, as we are renaming the table.
//
int ha_tokudb::delete_or_rename_table (const char* from_name, const char* to_name, bool is_delete) {
int error; int error;
char* newname = NULL; DB* status_db;
DB* db_to_delete = NULL;
DBC* status_cursor = NULL;
DB_TXN* txn = NULL;
DBT curr_key;
DBT curr_val;
bzero(&curr_key, sizeof(curr_key));
bzero(&curr_val, sizeof(curr_val));
pthread_mutex_lock(&tokudb_meta_mutex); pthread_mutex_lock(&tokudb_meta_mutex);
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
// //
// this can only fail if we have not opened the environment // modify metadata db
// yet. I want to assert that rather than check for the error
// //
error = db_env->checkpointing_postpone(db_env); if (is_delete) {
assert(!error); error = drop_table_from_metadata(from_name, txn);
// remove all of the dictionaries in the table directory
error = drop_table_from_metadata(name);
if (error) {
goto cleanup;
} }
newname = (char *)my_malloc(get_max_dict_name_path_length(name), MYF(MY_WME|MY_ZEROFILL)); else {
if (newname == NULL) { error = rename_table_in_metadata(from_name, to_name, txn);
error = ENOMEM;
goto cleanup;
} }
make_name(newname, name, 0); if (error) { goto cleanup; }
error = rmall(newname);
//
// open status db,
// create cursor,
// for each name read out of there, create a db and delete or rename it
//
error = open_status_dictionary(&status_db, from_name, txn);
if (error) { goto cleanup; }
error = status_db->cursor(status_db, txn, &status_cursor, 0);
if (error) { goto cleanup; }
while (error != DB_NOTFOUND) {
error = status_cursor->c_get(
status_cursor,
&curr_key,
&curr_val,
DB_NEXT
);
if (error && error != DB_NOTFOUND) { goto cleanup; }
if (error == DB_NOTFOUND) { break; }
HA_METADATA_KEY mk = *(HA_METADATA_KEY *)curr_key.data;
if (mk != hatoku_key_name) {
continue;
}
error = delete_or_rename_dictionary(from_name, to_name, (char *)((char *)curr_key.data + sizeof(HA_METADATA_KEY)), true, txn, is_delete);
if (error) { goto cleanup; }
}
//
// delete or rename main.tokudb
//
error = delete_or_rename_dictionary(from_name, to_name, "main", false, txn, is_delete);
if (error) { goto cleanup; }
error = status_cursor->c_close(status_cursor);
if (error) { goto cleanup; }
status_cursor = NULL;
error = status_db->close(status_db, 0);
if (error) { goto cleanup; }
status_db = NULL;
//
// delete or rename status.tokudb
//
error = delete_or_rename_dictionary(from_name, to_name, "status", false, txn, is_delete);
if (error) { goto cleanup; }
my_errno = error; my_errno = error;
cleanup: cleanup:
{ if (status_cursor) {
int r; status_cursor->c_close(status_cursor);
r = db_env->checkpointing_resume(db_env); }
assert(r==0); if (status_db) {
status_db->close(status_db, 0);
}
if (txn) {
if (error) {
abort_txn(txn);
}
else {
commit_txn(txn, 0);
}
} }
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
pthread_mutex_unlock(&tokudb_meta_mutex); pthread_mutex_unlock(&tokudb_meta_mutex);
TOKUDB_DBUG_RETURN(error); return error;
}
//
// Drops table
// Parameters:
// [in] name - name of table to be deleted
// Returns:
// 0 on success
// error otherwise
//
int ha_tokudb::delete_table(const char *name) {
TOKUDB_DBUG_ENTER("ha_tokudb::delete_table");
TOKUDB_DBUG_RETURN(delete_or_rename_table(name, NULL, true));
} }
...@@ -5078,50 +5163,7 @@ cleanup: ...@@ -5078,50 +5163,7 @@ cleanup:
int ha_tokudb::rename_table(const char *from, const char *to) { int ha_tokudb::rename_table(const char *from, const char *to) {
TOKUDB_DBUG_ENTER("%s %s %s", __FUNCTION__, from, to); TOKUDB_DBUG_ENTER("%s %s %s", __FUNCTION__, from, to);
int error; int error;
char* newfrom = NULL; error = delete_or_rename_table(from, to, false);
char* newto = NULL;
pthread_mutex_lock(&tokudb_meta_mutex);
//
// this can only fail if we have not opened the environment
// yet. I want to assert that rather than check for the error
//
error = db_env->checkpointing_postpone(db_env);
assert(!error);
int n = get_name_length(from) + NAME_CHAR_LEN;
newfrom = (char *)my_malloc(n,MYF(MY_WME));
if (newfrom == NULL){
error = ENOMEM;
goto cleanup;
}
make_name(newfrom, from, 0);
n = get_name_length(to) + NAME_CHAR_LEN;
newto = (char *)my_malloc(n,MYF(MY_WME));
if (newto == NULL){
error = ENOMEM;
goto cleanup;
}
make_name(newto, to, 0);
error = rename(newfrom, newto);
if (error != 0) {
error = my_errno = errno;
goto cleanup;
}
error = rename_table_in_metadata(from, to);
cleanup:
{
int r;
r = db_env->checkpointing_resume(db_env);
assert(r==0);
}
my_free(newfrom, MYF(MY_ALLOW_ZERO_PTR));
my_free(newto, MYF(MY_ALLOW_ZERO_PTR));
pthread_mutex_unlock(&tokudb_meta_mutex);
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -5377,7 +5419,7 @@ void ha_tokudb::init_auto_increment() { ...@@ -5377,7 +5419,7 @@ void ha_tokudb::init_auto_increment() {
share->auto_inc_create_value = 0; share->auto_inc_create_value = 0;
} }
txn->commit(txn,DB_TXN_NOSYNC); commit_txn(txn, 0);
} }
if (tokudb_debug & TOKUDB_DEBUG_AUTO_INCREMENT) { if (tokudb_debug & TOKUDB_DEBUG_AUTO_INCREMENT) {
TOKUDB_TRACE("init auto increment:%lld\n", share->last_auto_increment); TOKUDB_TRACE("init auto increment:%lld\n", share->last_auto_increment);
...@@ -5433,28 +5475,16 @@ bool ha_tokudb::is_auto_inc_singleton(){ ...@@ -5433,28 +5475,16 @@ bool ha_tokudb::is_auto_inc_singleton(){
// //
int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
TOKUDB_DBUG_ENTER("ha_tokudb::add_index"); TOKUDB_DBUG_ENTER("ha_tokudb::add_index");
char name_buff[FN_REFLEN];
int error; int error;
uint curr_index = 0; uint curr_index = 0;
DBC* tmp_cursor = NULL; DBC* tmp_cursor = NULL;
int cursor_ret_val = 0; int cursor_ret_val = 0;
DBT current_primary_key; DBT current_primary_key;
DB_TXN* txn = NULL; DB_TXN* txn = NULL;
char* newname = NULL;
uint newname_len = 0;
uchar* tmp_key_buff = NULL; uchar* tmp_key_buff = NULL;
uchar* tmp_prim_key_buff = NULL; uchar* tmp_prim_key_buff = NULL;
uchar* tmp_record = NULL; uchar* tmp_record = NULL;
THD* thd = ha_thd(); THD* thd = ha_thd();
uchar* row_desc_buff = NULL;
DBT row_descriptor;
char* fn_ret = NULL;
bzero(&row_descriptor, sizeof(row_descriptor));
//
// these variables are for error handling
//
uint num_files_created = 0;
uint num_DB_opened = 0;
// //
// number of DB files we have open currently, before add_index is executed // number of DB files we have open currently, before add_index is executed
// //
...@@ -5467,21 +5497,18 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -5467,21 +5497,18 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
ulonglong num_processed = 0; //variable that stores number of elements inserted thus far ulonglong num_processed = 0; //variable that stores number of elements inserted thus far
thd_proc_info(thd, "Adding indexes"); thd_proc_info(thd, "Adding indexes");
newname_len = get_max_dict_name_path_length(share->table_name);
newname = (char *)my_malloc(newname_len, MYF(MY_WME));
tmp_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME)); tmp_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
tmp_prim_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME)); tmp_prim_key_buff = (uchar *)my_malloc(2*table_arg->s->rec_buff_length, MYF(MY_WME));
tmp_record = table->record[0]; tmp_record = table->record[0];
row_desc_buff = (uchar *)my_malloc(2*(table_share->fields * 6)+10 ,MYF(MY_WME)); if (tmp_key_buff == NULL ||
if (newname == NULL || tmp_prim_key_buff == NULL ) {
tmp_key_buff == NULL ||
tmp_prim_key_buff == NULL ||
tmp_record == NULL ||
row_desc_buff == NULL) {
error = ENOMEM; error = ENOMEM;
goto cleanup; goto cleanup;
} }
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
// //
...@@ -5509,38 +5536,9 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -5509,38 +5536,9 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
// //
// first create all the DB's files // first create all the DB's files
// //
row_descriptor.data = row_desc_buff;
char dict_name[MAX_DICT_NAME_LEN];
for (uint i = 0; i < num_of_keys; i++) { for (uint i = 0; i < num_of_keys; i++) {
int flags = (key_info[i].flags & HA_CLUSTERING) ? 0 : DB_DUP + DB_DUPSORT; error = create_secondary_dictionary(share->table_name, table_arg, &key_info[i], txn);
sprintf(dict_name, "key-%s", key_info[i].name);
make_name(newname, share->table_name, dict_name);
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
//
// setup the row descriptor
//
row_descriptor.size = create_toku_key_descriptor(
row_desc_buff,
false,
key_info[i].flags & HA_CLUSTERING,
&key_info[i],
hidden_primary_key,
hidden_primary_key ? NULL : &table_share->key_info[primary_key]
);
error = create_sub_table(name_buff, flags, &row_descriptor);
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("create:%s:flags=%ld:error=%d\n", newname, key_info[i].flags, error);
}
if (error) { goto cleanup; } if (error) { goto cleanup; }
num_files_created++;
}
for (uint i = 0; i < table_share->keys + test(hidden_primary_key); i++) {
} }
...@@ -5573,25 +5571,18 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -5573,25 +5571,18 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
} }
error = open_secondary_table( error = open_secondary_dictionary(
&share->key_file[curr_index], &share->key_file[curr_index],
&key_info[i], &key_info[i],
share->table_name, share->table_name,
0, 2, // TODO: This is a hack. Need to learn what should really be here. Need to ask Yoni
&share->key_type[curr_index] &share->key_type[curr_index],
txn
); );
if (error) { goto cleanup; } if (error) { goto cleanup; }
num_DB_opened++;
} }
//
// scan primary table, create each secondary key, add to each DB
//
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
// //
// grab some locks to make this go faster // grab some locks to make this go faster
// first a global read lock on the main DB, because // first a global read lock on the main DB, because
...@@ -5620,6 +5611,9 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -5620,6 +5611,9 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
if (error) { goto cleanup; } if (error) { goto cleanup; }
} }
//
// scan primary table, create each secondary key, add to each DB
//
if ((error = share->file->cursor(share->file, txn, &tmp_cursor, 0))) { if ((error = share->file->cursor(share->file, txn, &tmp_cursor, 0))) {
tmp_cursor = NULL; // Safety tmp_cursor = NULL; // Safety
goto cleanup; goto cleanup;
...@@ -5727,48 +5721,42 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) { ...@@ -5727,48 +5721,42 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
tmp_cursor = NULL; tmp_cursor = NULL;
} }
error = txn->commit(txn, 0);
txn = NULL; //
assert(error == 0); // now write stuff to status.tokudb
//
pthread_mutex_lock(&share->mutex);
for (uint i = 0; i < num_of_keys; i++) {
write_key_name_to_status(share->status_block, key_info[i].name, txn);
}
pthread_mutex_unlock(&share->mutex);
error = 0; error = 0;
cleanup: cleanup:
if (error) { if (tmp_cursor) {
if (tmp_cursor) { tmp_cursor->c_close(tmp_cursor);
tmp_cursor->c_close(tmp_cursor); tmp_cursor = NULL;
tmp_cursor = NULL; }
} if (txn) {
if (txn) { if (error) {
// curr_index = curr_num_DBs;
// in the case of any error anywhere, we can just nuke all the files created, so we dont need for (uint i = 0; i < num_of_keys; i++, curr_index++) {
// to be tricky and try to roll back changes. That is why we commit the transaction, if (share->key_file[curr_index]) {
// which should be fast. The DB is going to go away anyway, so no pt in trying to keep share->key_file[curr_index]->close(
// it in a good state. share->key_file[curr_index],
// 0
txn->commit(txn, 0); );
} share->key_file[curr_index] = NULL;
// }
// We need to delete all the files that may have been created
// The DB's must be closed and removed
//
for (uint i = curr_num_DBs; i < curr_num_DBs + num_DB_opened; i++) {
share->key_file[i]->close(share->key_file[i], 0);
share->key_file[i] = NULL;
}
for (uint i = 0; i < num_files_created; i++) {
DB* tmp;
sprintf(dict_name, "key-%s", key_info[i].name);
make_name(newname, share->table_name, dict_name);
fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (!(db_create(&tmp, db_env, 0))) {
tmp->remove(tmp, name_buff, NULL, 0);
} }
abort_txn(txn);
}
else {
commit_txn(txn,0);
} }
} }
my_free(newname,MYF(MY_ALLOW_ZERO_PTR));
my_free(tmp_key_buff,MYF(MY_ALLOW_ZERO_PTR)); my_free(tmp_key_buff,MYF(MY_ALLOW_ZERO_PTR));
my_free(tmp_prim_key_buff,MYF(MY_ALLOW_ZERO_PTR)); my_free(tmp_prim_key_buff,MYF(MY_ALLOW_ZERO_PTR));
my_free(row_desc_buff,MYF(MY_ALLOW_ZERO_PTR));
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -5792,58 +5780,31 @@ cleanup: ...@@ -5792,58 +5780,31 @@ cleanup:
int ha_tokudb::prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys) { int ha_tokudb::prepare_drop_index(TABLE *table_arg, uint *key_num, uint num_of_keys) {
TOKUDB_DBUG_ENTER("ha_tokudb::prepare_drop_index"); TOKUDB_DBUG_ENTER("ha_tokudb::prepare_drop_index");
int error; int error;
char name_buff[FN_REFLEN]; DB_TXN* txn = NULL;
char* newname = NULL;
char* fn_ret = NULL;
char dict_name[MAX_DICT_NAME_LEN];
DB** dbs_to_remove = NULL;
newname = (char *)my_malloc( error = db_env->txn_begin(db_env, 0, &txn, 0);
get_max_dict_name_path_length(share->table_name), if (error) { goto cleanup; }
MYF(MY_WME|MY_ZEROFILL)
);
if (newname == NULL) {
error = ENOMEM;
goto cleanup;
}
//
// we allocate an array of DB's here to get ready for removal
// We do this so that all potential memory allocation errors that may occur
// will do so BEFORE we go about dropping any indexes. This way, we
// can fail gracefully without losing integrity of data in such cases. If on
// on the other hand, we started removing DB's, and in the middle,
// one failed, it is not immedietely obvious how one would rollback
//
dbs_to_remove = (DB **)my_malloc(sizeof(*dbs_to_remove)*num_of_keys, MYF(MY_ZEROFILL));
if (dbs_to_remove == NULL) {
error = ENOMEM;
goto cleanup;
}
for (uint i = 0; i < num_of_keys; i++) {
error = db_create(&dbs_to_remove[i], db_env, 0);
if (error) {
goto cleanup;
}
}
for (uint i = 0; i < num_of_keys; i++) { for (uint i = 0; i < num_of_keys; i++) {
uint curr_index = key_num[i]; uint curr_index = key_num[i];
share->key_file[curr_index]->close(share->key_file[curr_index],0); share->key_file[curr_index]->close(share->key_file[curr_index],0);
share->key_file[curr_index] = NULL; share->key_file[curr_index] = NULL;
sprintf(dict_name, "key-%s", table_arg->key_info[curr_index].name);
make_name(newname, share->table_name, dict_name);
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
if (fn_ret == NULL) {
error = HA_ERR_INTERNAL_ERROR;
goto cleanup;
}
dbs_to_remove[i]->remove(dbs_to_remove[i], name_buff, NULL, 0); error = remove_key_name_from_status(share->status_block, table_arg->key_info[curr_index].name, txn);
if (error) { goto cleanup; }
error = delete_or_rename_dictionary(share->table_name, NULL, table_arg->key_info[curr_index].name, true, txn, true);
if (error) { goto cleanup; }
} }
cleanup: cleanup:
my_free(dbs_to_remove, MYF(MY_ALLOW_ZERO_PTR)); if (txn) {
my_free(newname, MYF(MY_ALLOW_ZERO_PTR)); if (error) {
abort_txn(txn);
}
else {
commit_txn(txn,0);
}
}
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
...@@ -5979,11 +5940,66 @@ cleanup: ...@@ -5979,11 +5940,66 @@ cleanup:
tmp_cursor = NULL; tmp_cursor = NULL;
} }
if (do_commit) { if (do_commit) {
error = txn->commit(txn, 0); commit_txn(txn, 0);
} }
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
//
// truncate's dictionary associated with keynr index using transaction txn
// does so by deleting and then recreating the dictionary in the context
// of a transaction
//
int ha_tokudb::truncate_dictionary( uint keynr, DB_TXN* txn ) {
int error;
bool is_pk = (keynr == primary_key);
error = share->key_file[keynr]->close(share->key_file[keynr], 0);
if (error) { goto cleanup; }
share->key_file[keynr] = NULL;
if (is_pk) { share->file = NULL; }
if (is_pk) {
error = delete_or_rename_dictionary(
share->table_name,
NULL,
"main",
false, //is key
txn,
true // is a delete
);
if (error) { goto cleanup; }
}
else {
error = delete_or_rename_dictionary(
share->table_name,
NULL,
table_share->key_info[keynr].name,
true, //is key
txn,
true // is a delete
);
if (error) { goto cleanup; }
}
if (is_pk) {
error = create_main_dictionary(share->table_name, table, txn);
}
else {
error = create_secondary_dictionary(
share->table_name,
table,
&table_share->key_info[keynr],
txn
);
}
if (error) { goto cleanup; }
cleanup:
return error;
}
// delete all rows from a table // delete all rows from a table
// //
// effects: delete all of the rows in the main dictionary and all of the // effects: delete all of the rows in the main dictionary and all of the
...@@ -5997,41 +6013,64 @@ int ha_tokudb::delete_all_rows() { ...@@ -5997,41 +6013,64 @@ int ha_tokudb::delete_all_rows() {
TOKUDB_DBUG_ENTER("delete_all_rows"); TOKUDB_DBUG_ENTER("delete_all_rows");
int error = 0; int error = 0;
uint curr_num_DBs = 0; uint curr_num_DBs = 0;
DB_TXN* txn = NULL;
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) { goto cleanup; }
if (thd_sql_command(ha_thd()) != SQLCOM_TRUNCATE) { if (thd_sql_command(ha_thd()) != SQLCOM_TRUNCATE) {
error = HA_ERR_WRONG_COMMAND; error = HA_ERR_WRONG_COMMAND;
goto cleanup; goto cleanup;
} }
//
// prelock so we know right away if there are any potential
// deadlocks
//
error = acquire_table_lock(transaction, lock_write);
if (error) {
goto cleanup;
}
// truncate all dictionaries
curr_num_DBs = table->s->keys + test(hidden_primary_key); curr_num_DBs = table->s->keys + test(hidden_primary_key);
for (uint i = 0; i < curr_num_DBs; i++) { for (uint i = 0; i < curr_num_DBs; i++) {
DB *db = share->key_file[i]; error = truncate_dictionary(i, txn);
u_int32_t row_count = 0; if (error) { goto cleanup; }
error = db->truncate(db, transaction, &row_count, DB_TRUNCATE_WITHCURSORS);
if (error) {
break;
}
// do something with the row_count?
if (tokudb_debug) {
TOKUDB_TRACE("row_count=%u\n", row_count);
}
} }
// zap the row count // zap the row count
if (error == 0) { if (error == 0) {
share->rows = 0; share->rows = 0;
} }
cleanup: cleanup:
if (txn) {
if (error) {
abort_txn(txn);
}
else {
commit_txn(txn,0);
}
}
//
// regardless of errors, need to reopen the DB's
//
for (uint i = 0; i < curr_num_DBs; i++) {
int r = 0;
if (share->key_file[i] == NULL) {
if (i != primary_key) {
r = open_secondary_dictionary(
&share->key_file[i],
&table_share->key_info[i],
share->table_name,
2, // TODO: This is a hack. Need to learn what should really be here. Need to ask Yoni
&share->key_type[i],
NULL
);
assert(!r);
}
else {
r = open_main_dictionary(
share->table_name,
2, // TODO: This is a hack. Need to learn what should really be here. Need to ask Yoni
NULL
);
assert(!r);
}
}
}
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(error);
} }
......
...@@ -97,6 +97,7 @@ typedef ulonglong HA_METADATA_KEY; ...@@ -97,6 +97,7 @@ typedef ulonglong HA_METADATA_KEY;
#define hatoku_capabilities 1 #define hatoku_capabilities 1
#define hatoku_max_ai 2 //maximum auto increment value found so far #define hatoku_max_ai 2 //maximum auto increment value found so far
#define hatoku_ai_create_value 3 #define hatoku_ai_create_value 3
#define hatoku_key_name 4
typedef struct st_filter_key_part_info { typedef struct st_filter_key_part_info {
uint offset; uint offset;
...@@ -258,14 +259,19 @@ private: ...@@ -258,14 +259,19 @@ private:
int handle_cursor_error(int error, int err_to_return, uint keynr); int handle_cursor_error(int error, int err_to_return, uint keynr);
DBT *get_pos(DBT * to, uchar * pos); DBT *get_pos(DBT * to, uchar * pos);
int open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type); int open_main_dictionary(const char* name, int mode, DB_TXN* txn);
int open_secondary_dictionary(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type, DB_TXN* txn);
int open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn);
int acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt); int acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt);
int estimate_num_rows(DB* db, u_int64_t* num_rows); int estimate_num_rows(DB* db, u_int64_t* num_rows);
bool has_auto_increment_flag(uint* index); bool has_auto_increment_flag(uint* index);
int write_to_status(DB* db, HA_METADATA_KEY curr_key_data, void* data, uint size ); int write_to_status(DB* db, HA_METADATA_KEY curr_key_data, void* data, uint size, DB_TXN* txn );
int write_metadata(DB* db, void* key, uint key_size, void* data, uint data_size ); int write_metadata(DB* db, void* key, uint key_size, void* data, uint data_size, DB_TXN* txn );
int remove_metadata(DB* db, void* key_data, uint key_size, DB_TXN* transaction);
int update_max_auto_inc(DB* db, ulonglong val); int update_max_auto_inc(DB* db, ulonglong val);
int write_auto_inc_create(DB* db, ulonglong val); int remove_key_name_from_status(DB* status_block, char* key_name, DB_TXN* txn);
int write_key_name_to_status(DB* status_block, char* key_name, DB_TXN* txn);
int write_auto_inc_create(DB* db, ulonglong val, DB_TXN* txn);
void init_auto_increment(); void init_auto_increment();
int initialize_share( int initialize_share(
const char* name, const char* name,
...@@ -276,7 +282,12 @@ private: ...@@ -276,7 +282,12 @@ private:
int prelock_range ( const key_range *start_key, const key_range *end_key); int prelock_range ( const key_range *start_key, const key_range *end_key);
int create_txn(THD* thd, tokudb_trx_data* trx); int create_txn(THD* thd, tokudb_trx_data* trx);
bool may_table_be_empty(); bool may_table_be_empty();
int delete_or_rename_table (const char* from_name, const char* to_name, bool is_delete);
int delete_or_rename_dictionary( const char* from_name, const char* to_name, char* index_name, bool is_key, DB_TXN* txn, bool is_delete);
int truncate_dictionary( uint keynr, DB_TXN* txn );
int create_secondary_dictionary(const char* name, TABLE* form, KEY* key_info, DB_TXN* txn);
int create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn);
void trace_create_table_info(const char *name, TABLE * form);
public: public:
......
...@@ -114,11 +114,6 @@ static void reset_stmt_progress (tokudb_stmt_progress* val) { ...@@ -114,11 +114,6 @@ static void reset_stmt_progress (tokudb_stmt_progress* val) {
static int get_name_length(const char *name) { static int get_name_length(const char *name) {
int n = 0; int n = 0;
const char *newname = name; const char *newname = name;
if (tokudb_data_dir) {
n += strlen(tokudb_data_dir) + 1;
if (strncmp("./", name, 2) == 0)
newname = name + 2;
}
n += strlen(newname); n += strlen(newname);
n += strlen(ha_tokudb_ext); n += strlen(ha_tokudb_ext);
return n; return n;
...@@ -130,25 +125,37 @@ static int get_name_length(const char *name) { ...@@ -130,25 +125,37 @@ static int get_name_length(const char *name) {
static int get_max_dict_name_path_length(const char *tablename) { static int get_max_dict_name_path_length(const char *tablename) {
int n = 0; int n = 0;
n += get_name_length(tablename); n += get_name_length(tablename);
n += 1; //for the '/' n += 1; //for the '-'
n += MAX_DICT_NAME_LEN; n += MAX_DICT_NAME_LEN;
n += strlen(ha_tokudb_ext);
return n; return n;
} }
static void make_name(char *newname, const char *tablename, const char *dictname) { static void make_name(char *newname, const char *tablename, const char *dictname) {
const char *newtablename = tablename; const char *newtablename = tablename;
char *nn = newname; char *nn = newname;
if (tokudb_data_dir) { assert(tablename);
nn += sprintf(nn, "%s/", tokudb_data_dir); assert(dictname);
if (strncmp("./", tablename, 2) == 0) nn += sprintf(nn, "%s", newtablename);
newtablename = tablename + 2; nn += sprintf(nn, "-%s", dictname);
}
static inline void commit_txn(DB_TXN* txn, u_int32_t flags) {
int r;
r = txn->commit(txn, flags);
if (r != 0) {
sql_print_error("tried committing transaction 0x%x and got error code %d", txn, r);
} }
nn += sprintf(nn, "%s%s", newtablename, ha_tokudb_ext); assert(r == 0);
if (dictname)
nn += sprintf(nn, "/%s%s", dictname, ha_tokudb_ext);
} }
static inline void abort_txn(DB_TXN* txn) {
int r;
r = txn->abort(txn);
if (r != 0) {
sql_print_error("tried aborting transaction 0x%x and got error code %d", txn, r);
}
assert(r == 0);
}
#endif #endif
...@@ -31,7 +31,7 @@ extern "C" { ...@@ -31,7 +31,7 @@ extern "C" {
#undef HAVE_DTRACE #undef HAVE_DTRACE
#undef _DTRACE_VERSION #undef _DTRACE_VERSION
#define TOKU_METADB_NAME ".\\tokudb_meta.tokudb" #define TOKU_METADB_NAME "tokudb_meta"
static inline void *thd_data_get(THD *thd, int slot) { static inline void *thd_data_get(THD *thd, int slot) {
return thd->ha_data[slot].ha_ptr; return thd->ha_data[slot].ha_ptr;
...@@ -300,12 +300,17 @@ static int tokudb_init_func(void *p) { ...@@ -300,12 +300,17 @@ static int tokudb_init_func(void *p) {
r= metadata_db->open(metadata_db, 0, TOKU_METADB_NAME, NULL, DB_BTREE, DB_THREAD|DB_AUTO_COMMIT, 0); r= metadata_db->open(metadata_db, 0, TOKU_METADB_NAME, NULL, DB_BTREE, DB_THREAD|DB_AUTO_COMMIT, 0);
if (r) { if (r) {
sql_print_error("No metadata table exists, so creating it"); if (r != ENOENT) {
sql_print_error("Got error %d when trying to open metadata_db", r);
goto error;
}
sql_print_warning("No metadata table exists, so creating it");
r= metadata_db->open(metadata_db, NULL, TOKU_METADB_NAME, NULL, DB_BTREE, DB_THREAD | DB_CREATE, my_umask); r= metadata_db->open(metadata_db, NULL, TOKU_METADB_NAME, NULL, DB_BTREE, DB_THREAD | DB_CREATE, my_umask);
if (r) { if (r) {
goto error; goto error;
} }
metadata_db->close(metadata_db,0); r = metadata_db->close(metadata_db,0);
assert(r == 0);
r = db_create(&metadata_db, db_env, 0); r = db_create(&metadata_db, db_env, 0);
if (r) { if (r) {
DBUG_PRINT("info", ("failed to create metadata db %d\n", r)); DBUG_PRINT("info", ("failed to create metadata db %d\n", r));
...@@ -435,13 +440,14 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) { ...@@ -435,13 +440,14 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
u_int32_t syncflag = THDVAR(thd, commit_sync) ? 0 : DB_TXN_NOSYNC; u_int32_t syncflag = THDVAR(thd, commit_sync) ? 0 : DB_TXN_NOSYNC;
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot); tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
DB_TXN **txn = all ? &trx->all : &trx->stmt; DB_TXN **txn = all ? &trx->all : &trx->stmt;
int error = 0;
if (*txn) { if (*txn) {
if (tokudb_debug & TOKUDB_DEBUG_TXN) if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("commit:%d:%p\n", all, *txn); TOKUDB_TRACE("commit:%d:%p\n", all, *txn);
error = (*txn)->commit(*txn, syncflag); }
if (*txn == trx->sp_level) commit_txn(*txn, syncflag);
if (*txn == trx->sp_level) {
trx->sp_level = 0; trx->sp_level = 0;
}
*txn = 0; *txn = 0;
} }
else if (tokudb_debug & TOKUDB_DEBUG_TXN) { else if (tokudb_debug & TOKUDB_DEBUG_TXN) {
...@@ -451,7 +457,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) { ...@@ -451,7 +457,7 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
trx->iso_level = hatoku_iso_not_set; trx->iso_level = hatoku_iso_not_set;
} }
reset_stmt_progress(&trx->stmt_progress); reset_stmt_progress(&trx->stmt_progress);
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(0);
} }
static int tokudb_rollback(handlerton * hton, THD * thd, bool all) { static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
...@@ -459,14 +465,15 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) { ...@@ -459,14 +465,15 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
DBUG_PRINT("trans", ("aborting transaction %s", all ? "all" : "stmt")); DBUG_PRINT("trans", ("aborting transaction %s", all ? "all" : "stmt"));
tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot); tokudb_trx_data *trx = (tokudb_trx_data *) thd_data_get(thd, hton->slot);
DB_TXN **txn = all ? &trx->all : &trx->stmt; DB_TXN **txn = all ? &trx->all : &trx->stmt;
int error = 0;
if (*txn) { if (*txn) {
if (tokudb_debug & TOKUDB_DEBUG_TXN) if (tokudb_debug & TOKUDB_DEBUG_TXN) {
TOKUDB_TRACE("rollback:%p\n", *txn); TOKUDB_TRACE("rollback:%p\n", *txn);
error = (*txn)->abort(*txn); }
if (*txn == trx->sp_level) abort_txn(*txn);
trx->sp_level = 0; if (*txn == trx->sp_level) {
*txn = 0; trx->sp_level = 0;
}
*txn = 0;
} }
else { else {
if (tokudb_debug & TOKUDB_DEBUG_TXN) { if (tokudb_debug & TOKUDB_DEBUG_TXN) {
...@@ -477,7 +484,7 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) { ...@@ -477,7 +484,7 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
trx->iso_level = hatoku_iso_not_set; trx->iso_level = hatoku_iso_not_set;
} }
reset_stmt_progress(&trx->stmt_progress); reset_stmt_progress(&trx->stmt_progress);
TOKUDB_DBUG_RETURN(error); TOKUDB_DBUG_RETURN(0);
} }
#if 0 #if 0
...@@ -570,8 +577,6 @@ static bool tokudb_show_data_size(THD * thd, stat_print_fn * stat_print, bool ex ...@@ -570,8 +577,6 @@ static bool tokudb_show_data_size(THD * thd, stat_print_fn * stat_print, bool ex
if (!error) { if (!error) {
char* name = (char *)curr_key.data; char* name = (char *)curr_key.data;
char* newname = NULL; char* newname = NULL;
char name_buff[FN_REFLEN];
char* fn_ret = NULL;
u_int64_t curr_num_bytes = 0; u_int64_t curr_num_bytes = 0;
DB_BTREE_STAT64 dict_stats; DB_BTREE_STAT64 dict_stats;
...@@ -585,12 +590,11 @@ static bool tokudb_show_data_size(THD * thd, stat_print_fn * stat_print, bool ex ...@@ -585,12 +590,11 @@ static bool tokudb_show_data_size(THD * thd, stat_print_fn * stat_print, bool ex
} }
make_name(newname, name, "main"); make_name(newname, name, "main");
fn_ret = fn_format(name_buff, newname, "", 0, MY_UNPACK_FILENAME|MY_SAFE_PATH);
error = db_create(&curr_db, db_env, 0); error = db_create(&curr_db, db_env, 0);
if (error) { goto cleanup; } if (error) { goto cleanup; }
error = curr_db->open(curr_db, 0, name_buff, NULL, DB_BTREE, DB_THREAD, 0); error = curr_db->open(curr_db, 0, newname, NULL, DB_BTREE, DB_THREAD, 0);
if (error == ENOENT) { error = 0; continue; } if (error == ENOENT) { error = 0; continue; }
if (error) { goto cleanup; } if (error) { goto cleanup; }
...@@ -678,7 +682,7 @@ cleanup: ...@@ -678,7 +682,7 @@ cleanup:
tmp_table_cursor->c_close(tmp_table_cursor); tmp_table_cursor->c_close(tmp_table_cursor);
} }
if (txn) { if (txn) {
txn->commit(txn, 0); commit_txn(txn, 0);
} }
if (error) { if (error) {
sql_print_error("got an error %d in show_data_size\n", error); sql_print_error("got an error %d in show_data_size\n", error);
...@@ -763,8 +767,8 @@ static bool tokudb_show_engine_status(THD * thd, stat_print_fn * stat_print) { ...@@ -763,8 +767,8 @@ static bool tokudb_show_engine_status(THD * thd, stat_print_fn * stat_print) {
STATPRINT("logger lock", lockstat); STATPRINT("logger lock", lockstat);
STATPRINT("logger lock counter", buf); STATPRINT("logger lock counter", buf);
lockstat = (engstat.cachetable_lock_ctr & 0x01) ? "Locked" : "Unlocked"; //lockstat = (engstat.cachetable_lock_ctr & 0x01) ? "Locked" : "Unlocked";
lockctr = engstat.cachetable_lock_ctr >> 1; // lsb indicates if locked //lockctr = engstat.cachetable_lock_ctr >> 1; // lsb indicates if locked
sprintf(buf, "%" PRIu32, lockctr); sprintf(buf, "%" PRIu32, lockctr);
STATPRINT("cachetable lock", lockstat); STATPRINT("cachetable lock", lockstat);
STATPRINT("cachetable lock counter", buf); STATPRINT("cachetable lock counter", buf);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment