Commit 784310f1 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

addresses #915

merge auto increment fix into main branch

git-svn-id: file:///svn/mysql/tokudb-engine/src@5725 c7de825b-a66e-492c-adef-691d508d4ae1
parent a906383c
...@@ -1029,6 +1029,85 @@ static void smart_dbt_callback_rowread_heavi(DBT const *key, DBT const *row, vo ...@@ -1029,6 +1029,85 @@ static void smart_dbt_callback_rowread_heavi(DBT const *key, DBT const *row, vo
// //
#define SET_READ_FLAG(flg) ((range_lock_grabbed || current_thd->options & OPTION_TABLE_LOCK) ? ((flg) | DB_PRELOCKED) : (flg)) #define SET_READ_FLAG(flg) ((range_lock_grabbed || current_thd->options & OPTION_TABLE_LOCK) ? ((flg) | DB_PRELOCKED) : (flg))
//
// This method retrieves the value of the auto increment column of a record in MySQL format
// This was basically taken from MyISAM
// Parameters:
// type - the type of the auto increment column (e.g. int, float, double...)
// offset - offset into the record where the auto increment column is stored
// [in] record - MySQL row whose auto increment value we want to extract
// Returns:
// The value of the auto increment column in record
//
ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar *record)
{
ulonglong value= 0; /* Store unsigned values here */
longlong s_value= 0; /* Store signed values here */
const uchar *key= (uchar*) record + offset;
switch (type) {
case HA_KEYTYPE_INT8:
s_value= (longlong) *(char*)key;
break;
case HA_KEYTYPE_BINARY:
value=(ulonglong) *(uchar*) key;
break;
case HA_KEYTYPE_SHORT_INT:
s_value= (longlong) sint2korr(key);
break;
case HA_KEYTYPE_USHORT_INT:
value=(ulonglong) uint2korr(key);
break;
case HA_KEYTYPE_LONG_INT:
s_value= (longlong) sint4korr(key);
break;
case HA_KEYTYPE_ULONG_INT:
value=(ulonglong) uint4korr(key);
break;
case HA_KEYTYPE_INT24:
s_value= (longlong) sint3korr(key);
break;
case HA_KEYTYPE_UINT24:
value=(ulonglong) uint3korr(key);
break;
case HA_KEYTYPE_FLOAT: /* This shouldn't be used */
{
float f_1;
float4get(f_1,key);
/* Ignore negative values */
value = (f_1 < (float) 0.0) ? 0 : (ulonglong) f_1;
break;
}
case HA_KEYTYPE_DOUBLE: /* This shouldn't be used */
{
double f_1;
float8get(f_1,key);
/* Ignore negative values */
value = (f_1 < 0.0) ? 0 : (ulonglong) f_1;
break;
}
case HA_KEYTYPE_LONGLONG:
s_value= sint8korr(key);
break;
case HA_KEYTYPE_ULONGLONG:
value= uint8korr(key);
break;
default:
DBUG_ASSERT(0);
value=0; /* Error */
break;
}
/*
The following code works becasue if s_value < 0 then value is 0
and if s_value == 0 then value will contain either s_value or the
correct value.
*/
return (s_value > 0) ? (ulonglong) s_value : value;
}
// //
// Open a secondary table, the key will be a secondary index, the data will be a primary key // Open a secondary table, the key will be a secondary index, the data will be a primary key
// //
...@@ -1262,6 +1341,13 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) { ...@@ -1262,6 +1341,13 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
__close(1); __close(1);
TOKUDB_DBUG_RETURN(1); TOKUDB_DBUG_RETURN(1);
} }
//
// initialize auto increment data
//
share->has_auto_inc = has_auto_increment_flag(&share->ai_field_index);
if (share->has_auto_inc) {
init_auto_increment();
}
} }
ref_length = share->ref_length; // If second open ref_length = share->ref_length; // If second open
pthread_mutex_unlock(&share->mutex); pthread_mutex_unlock(&share->mutex);
...@@ -1347,7 +1433,98 @@ cleanup: ...@@ -1347,7 +1433,98 @@ cleanup:
return error; return error;
} }
//
// states if table has an auto increment column, if so, sets index where auto inc column is to index
// Parameters:
// [out] index - if auto inc exists, then this param is set to where it exists in table, if not, then unchanged
// Returns:
// true if auto inc column exists, false otherwise
//
bool ha_tokudb::has_auto_increment_flag(uint* index) {
//
// check to see if we have auto increment field
//
bool ai_found = false;
uint i = 0;
for (Field ** field = table->field; *field; field++,i++) {
if ((*field)->flags & AUTO_INCREMENT_FLAG) {
ai_found = true;
*index = i;
break;
}
}
return ai_found;
}
//
// helper function to write a piece of metadata in to status.tokudb
//
int ha_tokudb::write_metadata(DB* db, HA_METADATA_KEY curr_key_data, void* data, ulonglong size ){
int error;
DBT key;
DBT value;
DB_TXN* txn = NULL;
//
// transaction to be used for putting metadata into status.tokudb
//
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
goto cleanup;
}
bzero(&key, sizeof(key));
bzero(&value, sizeof(value));
key.data = &curr_key_data;
key.size = sizeof(curr_key_data);
value.data = data;
value.size = sizeof(size);
error = db->put(db, txn, &key, &value, 0);
if (error) {
goto cleanup;
}
error = 0;
cleanup:
if (txn) {
if (!error) {
txn->commit(txn, DB_TXN_NOSYNC);
}
else {
txn->abort(txn);
}
}
return error;
}
//
// Updates status.tokudb with a new max value used for the auto increment column
// Parameters:
// [in] db - this will always be status.tokudb
// val - value to store
// Returns:
// 0 on success, error otherwise
//
//
int ha_tokudb::update_max_auto_inc(DB* db, ulonglong val){
return write_metadata(db,hatoku_max_ai,&val,sizeof(val));
}
//
// Writes the initial auto increment value, as specified by create table
// so if a user does "create table t1 (a int auto_increment, primary key (a)) auto_increment=100",
// then the value 100 will be stored here in val
// Parameters:
// [in] db - this will always be status.tokudb
// val - value to store
// Returns:
// 0 on success, error otherwise
//
//
int ha_tokudb::write_auto_inc_create(DB* db, ulonglong val){
return write_metadata(db,hatoku_ai_create_value,&val,sizeof(val));
}
// //
...@@ -1823,16 +2000,6 @@ void ha_tokudb::init_hidden_prim_key_info() { ...@@ -1823,16 +2000,6 @@ void ha_tokudb::init_hidden_prim_key_info() {
(void) extra(HA_EXTRA_NO_KEYREAD); (void) extra(HA_EXTRA_NO_KEYREAD);
if (error == 0) { if (error == 0) {
share->auto_ident = uint5korr(current_ident); share->auto_ident = uint5korr(current_ident);
// mysql may not initialize the next_number_field here
// so we do this in the get_auto_increment method
// index_last uses record[1]
assert(table->next_number_field == 0);
if (table->next_number_field) {
share->last_auto_increment = table->next_number_field->val_int_offset(table->s->rec_buff_length);
if (tokudb_debug & TOKUDB_DEBUG_AUTO_INCREMENT)
TOKUDB_TRACE("init auto increment:%lld\n", share->last_auto_increment);
}
} }
share->status |= STATUS_PRIMARY_KEY_INIT; share->status |= STATUS_PRIMARY_KEY_INIT;
...@@ -1973,6 +2140,18 @@ int ha_tokudb::cmp_ref(const uchar * ref1, const uchar * ref2) { ...@@ -1973,6 +2140,18 @@ int ha_tokudb::cmp_ref(const uchar * ref1, const uchar * ref2) {
} }
bool ha_tokudb::check_if_incompatible_data(HA_CREATE_INFO * info, uint table_changes) { bool ha_tokudb::check_if_incompatible_data(HA_CREATE_INFO * info, uint table_changes) {
//
// This is a horrendous hack for now, as copied by InnoDB.
// This states that if the auto increment create field has changed,
// via a "alter table foo auto_increment=new_val", that this
// change is incompatible, and to rebuild the entire table
// This will need to be fixed
//
if ((info->used_fields & HA_CREATE_USED_AUTO) &&
info->auto_increment_value != 0) {
return COMPATIBLE_DATA_NO;
}
if (table_changes < IS_EQUAL_YES) if (table_changes < IS_EQUAL_YES)
return COMPATIBLE_DATA_NO; return COMPATIBLE_DATA_NO;
return COMPATIBLE_DATA_YES; return COMPATIBLE_DATA_YES;
...@@ -2007,6 +2186,29 @@ int ha_tokudb::write_row(uchar * record) { ...@@ -2007,6 +2186,29 @@ int ha_tokudb::write_row(uchar * record) {
if (table->next_number_field && record == table->record[0]) { if (table->next_number_field && record == table->record[0]) {
update_auto_increment(); update_auto_increment();
} }
//
// check to see if some value for the auto increment column that is bigger
// than anything else til now is being used. If so, update the metadata to reflect it
// the goal here is we never want to have a dup key error due to a bad increment
// of the auto inc field.
//
if (share->has_auto_inc && record == table->record[0]) {
pthread_mutex_lock(&share->mutex);
ulonglong curr_auto_inc = retrieve_auto_increment(
table->field[share->ai_field_index]->key_type(),
field_offset(table->field[share->ai_field_index]),
record
);
if (curr_auto_inc > share->last_auto_increment) {
error = update_max_auto_inc(share->status_block, curr_auto_inc);
if (!error) {
share->last_auto_increment = curr_auto_inc;
}
}
pthread_mutex_unlock(&share->mutex);
}
if ((error = pack_row(&row, (const uchar *) record))){ if ((error = pack_row(&row, (const uchar *) record))){
goto cleanup; goto cleanup;
} }
...@@ -2171,6 +2373,30 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2171,6 +2373,30 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
table->timestamp_field->set_time(); table->timestamp_field->set_time();
} }
//
// check to see if some value for the auto increment column that is bigger
// than anything else til now is being used. If so, update the metadata to reflect it
// the goal here is we never want to have a dup key error due to a bad increment
// of the auto inc field.
//
if (share->has_auto_inc && new_row == table->record[0]) {
pthread_mutex_lock(&share->mutex);
ulonglong curr_auto_inc = retrieve_auto_increment(
table->field[share->ai_field_index]->key_type(),
field_offset(table->field[share->ai_field_index]),
new_row
);
if (curr_auto_inc > share->last_auto_increment) {
error = update_max_auto_inc(share->status_block, curr_auto_inc);
if (!error) {
share->last_auto_increment = curr_auto_inc;
}
}
pthread_mutex_unlock(&share->mutex);
}
if (hidden_primary_key) { if (hidden_primary_key) {
primary_key_changed = 0; primary_key_changed = 0;
bzero((void *) &prim_key, sizeof(prim_key)); bzero((void *) &prim_key, sizeof(prim_key));
...@@ -2234,6 +2460,9 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) { ...@@ -2234,6 +2460,9 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
} }
} }
} }
cleanup: cleanup:
if (error == DB_KEYEXIST) { if (error == DB_KEYEXIST) {
error = HA_ERR_FOUND_DUPP_KEY; error = HA_ERR_FOUND_DUPP_KEY;
...@@ -3749,34 +3978,6 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in ...@@ -3749,34 +3978,6 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
} }
} }
//
// check if auto increment is properly defined
// tokudb only supports auto increment on the first field in the primary key
// or the first field in the row
//
int pk_found = 0;
int ai_found = 0;
for (i = 0; i < form->s->keys; i++) {
KEY *key = &form->s->key_info[i];
int is_primary = (strcmp(key->name, "PRIMARY") == 0);
if (is_primary) pk_found = 1;
uint p;
for (p = 0; p < key->key_parts; p++) {
KEY_PART_INFO *key_part = &key->key_part[p];
Field *field = key_part->field;
if (field->flags & AUTO_INCREMENT_FLAG) {
ai_found = 1;
if (is_primary && p > 0)
TOKUDB_DBUG_RETURN(HA_ERR_UNSUPPORTED);
}
}
}
if (!pk_found && ai_found) {
Field *field = form->s->field[0];
if (!(field->flags & AUTO_INCREMENT_FLAG))
TOKUDB_DBUG_RETURN(HA_ERR_UNSUPPORTED);
}
// a table is a directory of dictionaries // a table is a directory of dictionaries
make_name(dirname, name, 0); make_name(dirname, name, 0);
...@@ -3853,6 +4054,10 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in ...@@ -3853,6 +4054,10 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
value.size = sizeof(capabilities); value.size = sizeof(capabilities);
error = status_block->put(status_block, txn, &key, &value, 0); error = status_block->put(status_block, txn, &key, &value, 0);
if (error) { goto quit_status; } if (error) { goto quit_status; }
error = write_auto_inc_create(status_block, create_info->auto_increment_value);
if (error) { goto quit_status; }
quit_status: quit_status:
if (!error) { txn->commit(txn, 0); } if (!error) { txn->commit(txn, 0); }
else { txn->abort(txn); } else { txn->abort(txn); }
...@@ -4089,29 +4294,97 @@ cleanup: ...@@ -4089,29 +4294,97 @@ cleanup:
TOKUDB_DBUG_RETURN(ret_val); TOKUDB_DBUG_RETURN(ret_val);
} }
//
// initializes the auto increment data needed
//
void ha_tokudb::init_auto_increment() {
DBT key;
DBT value;
int error;
HA_METADATA_KEY key_val = hatoku_max_ai;
bzero(&key, sizeof(key));
bzero(&value, sizeof(value));
key.data = &key_val;
key.size = sizeof(key_val);
value.flags = DB_DBT_MALLOC;
DB_TXN* txn = NULL;
error = db_env->txn_begin(db_env, 0, &txn, 0);
if (error) {
share->last_auto_increment = 0;
}
else {
//
// First retrieve hatoku_max_ai, which is max value used by auto increment
// column so far, the max value could have been auto generated (e.g. insert (NULL))
// or it could have been manually inserted by user (e.g. insert (345))
//
error = share->status_block->get(
share->status_block,
txn,
&key,
&value,
0
);
if (error == 0 && value.size == sizeof(share->last_auto_increment)) {
share->last_auto_increment = *(uint *)value.data;
}
else {
share->last_auto_increment = 0;
}
//
// Now retrieve the initial auto increment value, as specified by create table
// so if a user does "create table t1 (a int auto_increment, primary key (a)) auto_increment=100",
// then the value 100 should be stored here
//
key_val = hatoku_ai_create_value;
error = share->status_block->get(
share->status_block,
txn,
&key,
&value,
0
);
if (error == 0 && value.size == sizeof(share->auto_inc_create_value)) {
share->auto_inc_create_value = *(uint *)value.data;
}
else {
share->auto_inc_create_value = 0;
}
txn->commit(txn,DB_TXN_NOSYNC);
}
if (tokudb_debug & TOKUDB_DEBUG_AUTO_INCREMENT) {
TOKUDB_TRACE("init auto increment:%lld\n", share->last_auto_increment);
}
}
void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulonglong nb_desired_values, ulonglong * first_value, ulonglong * nb_reserved_values) { void ha_tokudb::get_auto_increment(ulonglong offset, ulonglong increment, ulonglong nb_desired_values, ulonglong * first_value, ulonglong * nb_reserved_values) {
TOKUDB_DBUG_ENTER("ha_tokudb::get_auto_increment"); TOKUDB_DBUG_ENTER("ha_tokudb::get_auto_increment");
ulonglong nr; ulonglong nr;
pthread_mutex_lock(&share->mutex); pthread_mutex_lock(&share->mutex);
if (!(share->status & STATUS_AUTO_INCREMENT_INIT)) {
share->status |= STATUS_AUTO_INCREMENT_INIT; if (share->auto_inc_create_value > share->last_auto_increment) {
int error = read_last(); nr = share->auto_inc_create_value;
if (error == 0) { share->last_auto_increment = share->auto_inc_create_value;
share->last_auto_increment = table->next_number_field->val_int_offset(table->s->rec_buff_length);
if (tokudb_debug & TOKUDB_DEBUG_AUTO_INCREMENT)
TOKUDB_TRACE("init auto increment:%lld\n", share->last_auto_increment);
}
} }
nr = share->last_auto_increment + increment; else {
share->last_auto_increment = nr + nb_desired_values - 1; nr = share->last_auto_increment + increment;
pthread_mutex_unlock(&share->mutex); }
update_max_auto_inc(share->status_block, nr + (nb_desired_values - 1)*increment);
share->last_auto_increment = nr + (nb_desired_values - 1)*increment;
if (tokudb_debug & TOKUDB_DEBUG_AUTO_INCREMENT) if (tokudb_debug & TOKUDB_DEBUG_AUTO_INCREMENT) {
TOKUDB_TRACE("get_auto_increment(%lld,%lld,%lld):got:%lld:%lld\n", TOKUDB_TRACE("get_auto_increment(%lld,%lld,%lld):got:%lld:%lld\n",
offset, increment, nb_desired_values, nr, nb_desired_values); offset, increment, nb_desired_values, nr, nb_desired_values);
}
*first_value = nr; *first_value = nr;
*nb_reserved_values = nb_desired_values; *nb_reserved_values = nb_desired_values;
pthread_mutex_unlock(&share->mutex);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
......
...@@ -11,7 +11,7 @@ typedef struct st_tokudb_share { ...@@ -11,7 +11,7 @@ typedef struct st_tokudb_share {
THR_LOCK lock; THR_LOCK lock;
ulonglong auto_ident; ulonglong auto_ident;
ulonglong last_auto_increment; ulonglong last_auto_increment, auto_inc_create_value;
ha_rows rows; ha_rows rows;
DB *status_block; DB *status_block;
// //
...@@ -27,8 +27,15 @@ typedef struct st_tokudb_share { ...@@ -27,8 +27,15 @@ typedef struct st_tokudb_share {
u_int32_t key_type[MAX_KEY +1]; u_int32_t key_type[MAX_KEY +1];
uint status, version, capabilities; uint status, version, capabilities;
uint ref_length; uint ref_length;
bool fixed_length_primary_key, fixed_length_row; bool fixed_length_primary_key, fixed_length_row;
//
// whether table has an auto increment column
//
bool has_auto_inc;
//
// index of auto increment column in table->field, if auto_inc exists
//
uint ai_field_index;
} TOKUDB_SHARE; } TOKUDB_SHARE;
#define HA_TOKU_VERSION 1 #define HA_TOKU_VERSION 1
...@@ -45,7 +52,9 @@ typedef struct st_tokudb_share { ...@@ -45,7 +52,9 @@ typedef struct st_tokudb_share {
// //
typedef enum { typedef enum {
hatoku_version = 0, hatoku_version = 0,
hatoku_capabilities hatoku_capabilities,
hatoku_max_ai, //maximum auto increment value found so far
hatoku_ai_create_value
} HA_METADATA_KEY ; } HA_METADATA_KEY ;
typedef struct st_prim_key_part_info { typedef struct st_prim_key_part_info {
...@@ -174,6 +183,12 @@ private: ...@@ -174,6 +183,12 @@ private:
int open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type); int open_secondary_table(DB** ptr, KEY* key_info, const char* name, int mode, u_int32_t* key_type);
int acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt); int acquire_table_lock (DB_TXN* trans, TABLE_LOCK_TYPE lt);
int estimate_num_rows(DB* db, u_int64_t* num_rows); int estimate_num_rows(DB* db, u_int64_t* num_rows);
bool has_auto_increment_flag(uint* index);
int write_metadata(DB* db, HA_METADATA_KEY curr_key_data, void* data, ulonglong size );
int update_max_auto_inc(DB* db, ulonglong val);
int write_auto_inc_create(DB* db, ulonglong val);
void init_auto_increment();
public: public:
ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg); ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment