Commit 08a18571 authored by sunny's avatar sunny

Fix for bug# 16979, this is a major change in InnoDB auto-inc handling. There

is one test that fails as of this commit. The updated test case should be
part of the snapshot from MySQL shortly.

Fix for bug# 27950 - Init AUTOINC from delete_row().

Fix for bug# 28781 - Use value specified by MySQL, in update_row().

Summary of structural changes:
==============================
InnoDB needs to hold a table level lock for AUTOINC allocations to overcome
the non-determinism inherent in MySQL SBR for INSERT ... SELECT. In this fix 
for simple INSERT statements (including multi-value inserts), we try and avoid
acquiring the special AUTOINC table level lock unless another transaction has
already reserved the AUTOINC table level lock, in which case we fall back
to the old behavior of acquiring the AUTOINC table level lock.

The max AUTOINC value is now read directly using the low level interface
of InnoDB.
parent c5025f7a
......@@ -418,6 +418,18 @@ dict_table_get_col_name(
return(s);
}
/************************************************************************
Acquire the autoinc lock.*/
void
dict_table_autoinc_lock(
/*====================*/
dict_table_t* table)
{
mutex_enter(&table->autoinc_mutex);
}
/************************************************************************
Initializes the autoinc counter. It is not an error to initialize an already
initialized counter. */
......@@ -428,54 +440,8 @@ dict_table_autoinc_initialize(
dict_table_t* table, /* in: table */
ib_longlong value) /* in: next value to assign to a row */
{
mutex_enter(&(table->autoinc_mutex));
table->autoinc_inited = TRUE;
table->autoinc = value;
mutex_exit(&(table->autoinc_mutex));
}
/************************************************************************
Gets the next autoinc value (== autoinc counter value), 0 if not yet
initialized. If initialized, increments the counter by 1. */
ib_longlong
dict_table_autoinc_get(
/*===================*/
/* out: value for a new row, or 0 */
dict_table_t* table) /* in: table */
{
ib_longlong value;
mutex_enter(&(table->autoinc_mutex));
if (!table->autoinc_inited) {
value = 0;
} else {
value = table->autoinc;
table->autoinc = table->autoinc + 1;
}
mutex_exit(&(table->autoinc_mutex));
return(value);
}
/************************************************************************
Decrements the autoinc counter value by 1. */
void
dict_table_autoinc_decrement(
/*=========================*/
dict_table_t* table) /* in: table */
{
mutex_enter(&(table->autoinc_mutex));
table->autoinc = table->autoinc - 1;
mutex_exit(&(table->autoinc_mutex));
}
/************************************************************************
......@@ -490,32 +456,6 @@ dict_table_autoinc_read(
{
ib_longlong value;
mutex_enter(&(table->autoinc_mutex));
if (!table->autoinc_inited) {
value = 0;
} else {
value = table->autoinc;
}
mutex_exit(&(table->autoinc_mutex));
return(value);
}
/************************************************************************
Peeks the autoinc counter value, 0 if not yet initialized. Does not
increment the counter. The read not protected by any mutex! */
ib_longlong
dict_table_autoinc_peek(
/*====================*/
/* out: value of the counter */
dict_table_t* table) /* in: table */
{
ib_longlong value;
if (!table->autoinc_inited) {
value = 0;
......@@ -527,7 +467,7 @@ dict_table_autoinc_peek(
}
/************************************************************************
Updates the autoinc counter if the value supplied is equal or bigger than the
Updates the autoinc counter if the value supplied is greater than the
current value. If not inited, does nothing. */
void
......@@ -537,15 +477,21 @@ dict_table_autoinc_update(
dict_table_t* table, /* in: table */
ib_longlong value) /* in: value which was assigned to a row */
{
mutex_enter(&(table->autoinc_mutex));
if (table->autoinc_inited && value > table->autoinc) {
if (table->autoinc_inited) {
if (value >= table->autoinc) {
table->autoinc = value + 1;
}
table->autoinc = value;
}
}
/************************************************************************
Release the autoinc lock.*/
mutex_exit(&(table->autoinc_mutex));
void
dict_table_autoinc_unlock(
/*======================*/
dict_table_t* table) /* in: release autoinc lock for this table */
{
mutex_exit(&table->autoinc_mutex);
}
/************************************************************************
......
......@@ -90,6 +90,11 @@ dict_mem_table_create(
mutex_create(&table->autoinc_mutex, SYNC_DICT_AUTOINC_MUTEX);
table->autoinc_inited = FALSE;
/* The actual increment value will be set by MySQL, we simply
default to 1 here.*/
table->autoinc_increment = 1;
#ifdef UNIV_DEBUG
table->magic_n = DICT_TABLE_MAGIC_N;
#endif /* UNIV_DEBUG */
......
......@@ -1918,6 +1918,8 @@ retry:
trx_mark_sql_stat_end(trx);
}
trx->n_autoinc_rows = 0; /* Reset the number AUTO-INC rows required */
if (trx->declared_to_be_inside_innodb) {
/* Release our possible ticket in the FIFO */
......@@ -3326,6 +3328,93 @@ skip_field:
}
}
/************************************************************************
This special handling is really to overcome the limitations of MySQL's
binlogging. We need to eliminate the non-determinism that will arise in
INSERT ... SELECT type of statements, since MySQL binlog only stores the
min value of the autoinc interval. Once that is fixed we can get rid of
the special lock handling.*/
ulong
ha_innobase::innobase_autoinc_lock(void)
/*====================================*/
/* out: DB_SUCCESS if all OK else
error code */
{
ulint error = DB_SUCCESS;
if (thd_sql_command(user_thd) == SQLCOM_INSERT) {
dict_table_autoinc_lock(prebuilt->table);
/* We peek at the dict_table_t::auto_inc_lock to check if
another statement has locked it */
if (prebuilt->trx->auto_inc_lock != NULL) {
/* Release the mutex to avoid deadlocks */
dict_table_autoinc_unlock(prebuilt->table);
goto acquire_auto_inc_lock;
}
} else {
acquire_auto_inc_lock:
error = row_lock_table_autoinc_for_mysql(prebuilt);
if (error == DB_SUCCESS) {
dict_table_autoinc_lock(prebuilt->table);
}
}
return(ulong(error));
}
/************************************************************************
Reset the autoinc value in the table.*/
ulong
ha_innobase::innobase_reset_autoinc(
/*================================*/
/* out: DB_SUCCESS if all went well
else error code */
ulonglong autoinc) /* in: value to store */
{
ulint error;
error = innobase_autoinc_lock();
if (error == DB_SUCCESS) {
dict_table_autoinc_initialize(prebuilt->table, autoinc);
dict_table_autoinc_unlock(prebuilt->table);
}
return(ulong(error));
}
/************************************************************************
Store the autoinc value in the table. The autoinc value is only set if
it's greater than the existing autoinc value in the table.*/
ulong
ha_innobase::innobase_set_max_autoinc(
/*==================================*/
/* out: DB_SUCCES if all went well
else error code */
ulonglong auto_inc) /* in: value to store */
{
ulint error;
error = innobase_autoinc_lock();
if (error == DB_SUCCESS) {
dict_table_autoinc_update(prebuilt->table, auto_inc);
dict_table_autoinc_unlock(prebuilt->table);
}
return(ulong(error));
}
/************************************************************************
Stores a row in an InnoDB database, to the table specified in this
handle. */
......@@ -3336,9 +3425,7 @@ ha_innobase::write_row(
/* out: error code */
mysql_byte* record) /* in: a row in MySQL format */
{
int error;
longlong auto_inc;
longlong dummy;
int error = 0;
ibool auto_inc_used= FALSE;
ulint sql_command;
trx_t* trx = thd_to_trx(user_thd);
......@@ -3437,62 +3524,20 @@ no_commit:
num_write_row++;
/* This is the case where the table has an auto-increment column */
if (table->next_number_field && record == table->record[0]) {
/* This is the case where the table has an
auto-increment column */
/* Initialize the auto-inc counter if it has not been
initialized yet */
if (0 == dict_table_autoinc_peek(prebuilt->table)) {
/* This call initializes the counter */
error = innobase_read_and_init_auto_inc(&dummy);
if (error) {
/* Deadlock or lock wait timeout */
goto func_exit;
}
/* We have to set sql_stat_start to TRUE because
the above call probably has called a select, and
has reset that flag; row_insert_for_mysql has to
know to set the IX intention lock on the table,
something it only does at the start of each
statement */
prebuilt->sql_stat_start = TRUE;
}
/* We have to use the transactional lock mechanism on the
auto-inc counter of the table to ensure that replication and
roll-forward of the binlog exactly imitates also the given
auto-inc values. The lock is released at each SQL statement's
end. This lock also prevents a race where two threads would
call ::get_auto_increment() simultaneously. */
error = row_lock_table_autoinc_for_mysql(prebuilt);
if (error != DB_SUCCESS) {
/* Deadlock or lock wait timeout */
error = convert_error_code_to_mysql(error, user_thd);
if ((error = update_auto_increment())) {
goto func_exit;
}
/* We must use the handler code to update the auto-increment
value to be sure that we increment it correctly. */
if ((error= update_auto_increment()))
goto func_exit;
auto_inc_used = 1;
auto_inc_used = TRUE;
}
if (prebuilt->mysql_template == NULL
|| prebuilt->template_type != ROW_MYSQL_WHOLE_ROW) {
|| prebuilt->template_type != ROW_MYSQL_WHOLE_ROW) {
/* Build the template used in converting quickly between
the two database formats */
......@@ -3503,40 +3548,63 @@ no_commit:
error = row_insert_for_mysql((byte*) record, prebuilt);
if (error == DB_SUCCESS && auto_inc_used) {
/* Handle duplicate key errors */
if (auto_inc_used) {
ulonglong auto_inc;
/* Fetch the value that was set in the autoincrement field */
/* Note the number of rows processed for this statement, used
by get_auto_increment() to determine the number of AUTO-INC
values to reserve. This is only useful for a mult-value INSERT
and is a statement level counter.*/
if (trx->n_autoinc_rows > 0) {
--trx->n_autoinc_rows;
}
/* Get the value that MySQL attempted to store in the table.*/
auto_inc = table->next_number_field->val_int();
if (auto_inc != 0) {
/* This call will update the counter according to the
value that was inserted in the table */
switch (error) {
case DB_DUPLICATE_KEY:
dict_table_autoinc_update(prebuilt->table, auto_inc);
}
}
/* A REPLACE command and LOAD DATA INFILE REPLACE
handle a duplicate key error themselves, but we
must update the autoinc counter if we are performing
those statements. */
/* A REPLACE command and LOAD DATA INFILE REPLACE handle a duplicate
key error themselves, and we must update the autoinc counter if we are
performing those statements. */
switch (sql_command) {
case SQLCOM_LOAD:
if ((trx->duplicates
& (TRX_DUP_IGNORE | TRX_DUP_REPLACE))) {
if (error == DB_DUPLICATE_KEY && auto_inc_used
&& (sql_command == SQLCOM_REPLACE
|| sql_command == SQLCOM_REPLACE_SELECT
|| (sql_command == SQLCOM_INSERT
&& ((trx->duplicates
& (TRX_DUP_IGNORE | TRX_DUP_REPLACE))
== TRX_DUP_IGNORE))
|| (sql_command == SQLCOM_LOAD
&& ((trx->duplicates
& (TRX_DUP_IGNORE | TRX_DUP_REPLACE))
== (TRX_DUP_IGNORE | TRX_DUP_REPLACE))))) {
goto set_max_autoinc;
}
break;
auto_inc = table->next_number_field->val_int();
case SQLCOM_REPLACE:
case SQLCOM_INSERT_SELECT:
case SQLCOM_REPLACE_SELECT:
goto set_max_autoinc;
break;
if (auto_inc != 0) {
dict_table_autoinc_update(prebuilt->table, auto_inc);
default:
break;
}
break;
case DB_SUCCESS:
/* If the actual value inserted is greater than
the upper limit of the interval, then we try and
update the table upper limit. Note: last_value
will be 0 if get_auto_increment() was not called.*/
if (auto_inc > prebuilt->last_value) {
set_max_autoinc:
auto_inc += prebuilt->table->autoinc_increment;
innobase_set_max_autoinc(auto_inc);
}
break;
}
}
......@@ -3544,8 +3612,6 @@ no_commit:
error = convert_error_code_to_mysql(error, user_thd);
/* Tell InnoDB server that there might be work for
utility threads: */
func_exit:
innobase_active_small();
......@@ -3750,6 +3816,32 @@ ha_innobase::update_row(
error = row_update_for_mysql((byte*) old_row, prebuilt);
/* We need to do some special AUTOINC handling for the following case:
INSERT INTO t (c1,c2) VALUES(x,y) ON DUPLICATE KEY UPDATE ...
We need to use the AUTOINC counter that was actually used by
MySQL in the UPDATE statement, which can be different from the
value used in the INSERT statement.*/
if (error == DB_SUCCESS
&& table->next_number_field
&& new_row == table->record[0]
&& thd_sql_command(user_thd) == SQLCOM_INSERT
&& (trx->duplicates & (TRX_DUP_IGNORE | TRX_DUP_REPLACE))
== TRX_DUP_IGNORE) {
longlong auto_inc;
auto_inc = table->next_number_field->val_int();
if (auto_inc != 0) {
auto_inc += prebuilt->table->autoinc_increment;
innobase_set_max_autoinc(auto_inc);
}
}
innodb_srv_conc_exit_innodb(trx);
error = convert_error_code_to_mysql(error, user_thd);
......@@ -3778,6 +3870,19 @@ ha_innobase::delete_row(
ut_a(prebuilt->trx == trx);
/* Only if the table has an AUTOINC column */
if (table->found_next_number_field && record == table->record[0]) {
ulonglong dummy = 0;
error = innobase_get_auto_increment(&dummy);
if (error == DB_SUCCESS) {
dict_table_autoinc_unlock(prebuilt->table);
} else {
goto error_exit;
}
}
if (!prebuilt->upd_node) {
row_get_prebuilt_update_vector(prebuilt);
}
......@@ -3792,6 +3897,7 @@ ha_innobase::delete_row(
innodb_srv_conc_exit_innodb(trx);
error_exit:
error = convert_error_code_to_mysql(error, user_thd);
/* Tell the InnoDB server that there might be work for
......@@ -4106,42 +4212,66 @@ ha_innobase::index_read_last(
}
/************************************************************************
Changes the active index of a handle. */
Get the index for a handle. Does not change active index.*/
int
ha_innobase::change_active_index(
/*=============================*/
/* out: 0 or error code */
uint keynr) /* in: use this index; MAX_KEY means always clustered
index, even if it was internally generated by
InnoDB */
dict_index_t*
ha_innobase::innobase_get_index(
/*============================*/
/* out: NULL or index instance. */
uint keynr) /* in: use this index; MAX_KEY means always
clustered index, even if it was internally
generated by InnoDB */
{
KEY* key=0;
DBUG_ENTER("change_active_index");
KEY* key = 0;
dict_index_t* index = 0;
DBUG_ENTER("innobase_get_index");
ha_statistic_increment(&SSV::ha_read_key_count);
ut_ad(user_thd == ha_thd());
ut_a(prebuilt->trx == thd_to_trx(user_thd));
active_index = keynr;
if (keynr != MAX_KEY && table->s->keys > 0) {
key = table->key_info + active_index;
key = table->key_info + keynr;
prebuilt->index = dict_table_get_index_noninline(
index = dict_table_get_index_noninline(
prebuilt->table, key->name);
} else {
prebuilt->index = dict_table_get_first_index_noninline(
prebuilt->table);
index = dict_table_get_first_index_noninline(prebuilt->table);
}
if (!prebuilt->index) {
if (!index) {
sql_print_error(
"Innodb could not find key n:o %u with name %s "
"from dict cache for table %s",
keynr, key ? key->name : "NULL",
prebuilt->table->name);
}
DBUG_RETURN(index);
}
/************************************************************************
Changes the active index of a handle. */
int
ha_innobase::change_active_index(
/*=============================*/
/* out: 0 or error code */
uint keynr) /* in: use this index; MAX_KEY means always clustered
index, even if it was internally generated by
InnoDB */
{
DBUG_ENTER("change_active_index");
ut_ad(user_thd == ha_thd());
ut_a(prebuilt->trx == thd_to_trx(user_thd));
active_index = keynr;
prebuilt->index = innobase_get_index(keynr);
if (!prebuilt->index) {
DBUG_RETURN(1);
}
......@@ -4984,7 +5114,10 @@ ha_innobase::create(
maximum value in the column. */
auto_inc_value = create_info->auto_increment_value;
dict_table_autoinc_lock(innobase_table);
dict_table_autoinc_initialize(innobase_table, auto_inc_value);
dict_table_autoinc_unlock(innobase_table);
}
/* Tell the InnoDB server that there might be work for
......@@ -7046,15 +7179,15 @@ the value of the auto-inc counter. */
int
ha_innobase::innobase_read_and_init_auto_inc(
/*=========================================*/
/* out: 0 or error code: deadlock or lock wait
timeout */
longlong* ret) /* out: auto-inc value */
/* out: 0 or error code:
deadlock or lock wait timeout */
longlong* value) /* out: the autoinc value */
{
longlong auto_inc;
ulint old_select_lock_type;
ibool trx_was_not_started = FALSE;
ibool stmt_start;
int error;
int mysql_error = 0;
dict_table_t* innodb_table = prebuilt->table;
ibool trx_was_not_started = FALSE;
ut_a(prebuilt);
ut_a(prebuilt->table);
......@@ -7075,103 +7208,47 @@ ha_innobase::innobase_read_and_init_auto_inc(
trx_search_latch_release_if_reserved(prebuilt->trx);
auto_inc = dict_table_autoinc_read(prebuilt->table);
if (auto_inc != 0) {
/* Already initialized */
*ret = auto_inc;
error = 0;
goto func_exit_early;
}
error = row_lock_table_autoinc_for_mysql(prebuilt);
dict_table_autoinc_lock(prebuilt->table);
if (error != DB_SUCCESS) {
error = convert_error_code_to_mysql(error, user_thd);
goto func_exit_early;
}
/* Check again if someone has initialized the counter meanwhile */
auto_inc = dict_table_autoinc_read(prebuilt->table);
if (auto_inc != 0) {
*ret = auto_inc;
error = 0;
goto func_exit_early;
/* Was the AUTOINC counter reset during normal processing, if
so then we simply start count from 1. No need to go to the index.*/
if (auto_inc == 0 && innodb_table->autoinc_inited) {
++auto_inc;
dict_table_autoinc_initialize(innodb_table, auto_inc);
}
(void) extra(HA_EXTRA_KEYREAD);
index_init(table->s->next_number_index, 1);
/* Starting from 5.0.9, we use a consistent read to read the auto-inc
column maximum value. This eliminates the spurious deadlocks caused
by the row X-lock that we previously used. Note the following flaw
in our algorithm: if some other user meanwhile UPDATEs the auto-inc
column, our consistent read will not return the largest value. We
accept this flaw, since the deadlocks were a bigger trouble. */
/* Fetch all the columns in the key */
if (auto_inc == 0) {
dict_index_t* index;
ulint error = DB_SUCCESS;
const char* autoinc_col_name;
prebuilt->hint_need_to_fetch_extra_cols = ROW_RETRIEVE_ALL_COLS;
old_select_lock_type = prebuilt->select_lock_type;
prebuilt->select_lock_type = LOCK_NONE;
ut_a(!innodb_table->autoinc_inited);
/* Eliminate an InnoDB error print that happens when we try to SELECT
from a table when no table has been locked in ::external_lock(). */
prebuilt->trx->n_mysql_tables_in_use++;
index = innobase_get_index(table->s->next_number_index);
error = index_last(table->record[1]);
autoinc_col_name = table->found_next_number_field->field_name;
prebuilt->trx->n_mysql_tables_in_use--;
prebuilt->select_lock_type = old_select_lock_type;
error = row_search_max_autoinc(
index, autoinc_col_name, &auto_inc);
if (error) {
if (error == HA_ERR_END_OF_FILE) {
/* The table was empty, initialize to 1 */
auto_inc = 1;
error = 0;
if (error == DB_SUCCESS) {
++auto_inc;
dict_table_autoinc_initialize(innodb_table, auto_inc);
} else {
/* This should not happen in a consistent read */
sql_print_error("Consistent read of auto-inc column "
"returned %lu", (ulong) error);
auto_inc = -1;
fprintf(stderr, " InnoDB error: Couldn't read the "
"max AUTOINC value from index (%s).\n",
index->name);
goto func_exit;
mysql_error = 1;
}
} else {
/* Initialize to max(col) + 1; we use
'found_next_number_field' below because MySQL in SHOW TABLE
STATUS does not seem to set 'next_number_field'. The comment
in table.h says that 'next_number_field' is set when it is
'active'.
Since 5.1 MySQL enforces that we announce fields which we will
read; as we only do a val_*() call, dbug_tmp_use_all_columns()
with read_set is sufficient. */
my_bitmap_map *old_map;
old_map= dbug_tmp_use_all_columns(table, table->read_set);
auto_inc = (longlong) table->found_next_number_field->
val_int_offset(table->s->rec_buff_length) + 1;
dbug_tmp_restore_column_map(table->read_set, old_map);
}
dict_table_autoinc_initialize(prebuilt->table, auto_inc);
func_exit:
(void) extra(HA_EXTRA_NO_KEYREAD);
index_end();
*value = auto_inc;
*ret = auto_inc;
dict_table_autoinc_unlock(prebuilt->table);
func_exit_early:
/* Since MySQL does not seem to call autocommit after SHOW TABLE
STATUS (even if we would register the trx here), we commit our
transaction here if it was started here. This is to eliminate a
......@@ -7186,6 +7263,63 @@ func_exit_early:
prebuilt->sql_stat_start = stmt_start;
return(mysql_error);
}
/*******************************************************************************
Read the next autoinc value, initialize the table if it's not initialized.
On return if there is no error then the tables AUTOINC lock is locked.*/
ulong
ha_innobase::innobase_get_auto_increment(
ulonglong* value) /* out: autoinc value */
{
ulint error;
do {
error = innobase_autoinc_lock();
if (error == DB_SUCCESS) {
ib_longlong autoinc;
/* Determine the first value of the interval */
autoinc = dict_table_autoinc_read(prebuilt->table);
/* We need to initialize the AUTO-INC value, for
that we release all locks.*/
if (autoinc <= 0) {
trx_t* trx;
trx = prebuilt->trx;
dict_table_autoinc_unlock(prebuilt->table);
if (trx->auto_inc_lock) {
/* If we had reserved the AUTO-INC
lock in this SQL statement we release
it before retrying.*/
row_unlock_table_autoinc_for_mysql(trx);
}
/* Just to make sure */
ut_a(!trx->auto_inc_lock);
int mysql_error;
mysql_error = innobase_read_and_init_auto_inc(
&autoinc);
if (!mysql_error) {
/* Should have read the proper value */
ut_a(autoinc > 0);
} else {
error = DB_ERROR;
}
} else {
*value = (ulonglong) autoinc;
}
}
} while (*value == 0 && error == DB_SUCCESS);
return(error);
}
......@@ -7197,37 +7331,87 @@ auto-inc counter in *first_value, and ULONGLONG_MAX in *nb_reserved_values (as
we have a table-level lock). offset, increment, nb_desired_values are ignored.
*first_value is set to -1 if error (deadlock or lock wait timeout) */
void ha_innobase::get_auto_increment(
void
ha_innobase::get_auto_increment(
/*=================================*/
ulonglong offset, /* in */
ulonglong increment, /* in */
ulonglong nb_desired_values, /* in */
ulonglong *first_value, /* out */
ulonglong *nb_reserved_values) /* out */
ulonglong offset, /* in: */
ulonglong increment, /* in: table autoinc increment */
ulonglong nb_desired_values, /* in: number of values reqd */
ulonglong *first_value, /* out: the autoinc value */
ulonglong *nb_reserved_values) /* out: count of reserved values */
{
longlong nr;
int error;
ulint error;
ulonglong autoinc = 0;
/* Prepare prebuilt->trx in the table handle */
update_thd(ha_thd());
error = innobase_read_and_init_auto_inc(&nr);
error = innobase_get_auto_increment(&autoinc);
if (error) {
/* This should never happen in the current (5.0.6) code, since
we call this function only after the counter has been
initialized. */
if (error != DB_SUCCESS) {
/* This should never happen in the code > ver 5.0.6,
since we call this function only after the counter
has been initialized. */
ut_print_timestamp(stderr);
sql_print_error("Error %lu in ::get_auto_increment()",
(ulong) error);
*first_value= (~(ulonglong) 0);
sql_print_error("Error %lu in ::get_auto_increment()", error);
*first_value = (~(ulonglong) 0);
return;
}
*first_value= (ulonglong) nr;
/* table-level autoinc lock reserves up to +inf */
*nb_reserved_values= ULONGLONG_MAX;
/* This is a hack, since nb_desired_values seems to be accurate only
for the first call to get_auto_increment() for multi-row INSERT and
meaningless for other statements e.g, LOAD etc. Subsequent calls to
this method for the same statement results in different values which
don't make sense. Therefore we store the value the first time we are
called and count down from that as rows are written (see write_row()).
We make one exception, if the *first_value is precomputed by MySQL
we use that value. And set the number of reserved values to 1 if
this is the first time we were called for the SQL statement, this
will force MySQL to call us for the next value. If we are in the
middle of a multi-row insert we preserve the existing counter.*/
if (*first_value == 0) {
/* Called for the first time ? */
if (prebuilt->trx->n_autoinc_rows == 0) {
prebuilt->trx->n_autoinc_rows = nb_desired_values;
/* It's possible for nb_desired_values to be 0:
e.g., INSERT INTO T1(C) SELECT C FROM T2; */
if (nb_desired_values == 0) {
++prebuilt->trx->n_autoinc_rows;
}
}
*first_value = autoinc;
} else if (prebuilt->trx->n_autoinc_rows == 0) {
prebuilt->trx->n_autoinc_rows = 1;
}
ut_a(prebuilt->trx->n_autoinc_rows > 0);
*nb_reserved_values = prebuilt->trx->n_autoinc_rows;
/* Compute the last value in the interval */
prebuilt->last_value = *first_value + (*nb_reserved_values * increment);
ut_a(prebuilt->last_value >= *first_value);
/* Update the table autoinc variable */
dict_table_autoinc_update(prebuilt->table, prebuilt->last_value);
/* The increment to be used to increase the AUTOINC value, we use
this in write_row() and update_row() to increase the autoinc counter
for columns that are filled by the user.*/
prebuilt->table->autoinc_increment = increment;
dict_table_autoinc_unlock(prebuilt->table);
}
/* See comment in handler.h */
......@@ -7248,7 +7432,7 @@ ha_innobase::reset_auto_increment(ulonglong value)
DBUG_RETURN(error);
}
dict_table_autoinc_initialize(prebuilt->table, value);
innobase_reset_autoinc(value);
DBUG_RETURN(0);
}
......@@ -7534,6 +7718,7 @@ innobase_xa_prepare(
row_unlock_table_autoinc_for_mysql(trx);
}
/* Store the current undo_no of the transaction so that we
know where to roll back if we have to roll back the next
SQL statement */
......
......@@ -32,7 +32,10 @@ typedef struct st_innobase_share {
} INNOBASE_SHARE;
struct dict_index_struct;
struct row_prebuilt_struct;
typedef struct dict_index_struct dict_index_t;
typedef struct row_prebuilt_struct row_prebuilt_t;
/* The class defining a handle to an Innodb table */
......@@ -70,6 +73,11 @@ class ha_innobase: public handler
int change_active_index(uint keynr);
int general_fetch(uchar* buf, uint direction, uint match_mode);
int innobase_read_and_init_auto_inc(longlong* ret);
ulong innobase_autoinc_lock();
ulong innobase_set_max_autoinc(ulonglong auto_inc);
ulong innobase_reset_autoinc(ulonglong auto_inc);
ulong innobase_get_auto_increment(ulonglong* value);
dict_index_t* innobase_get_index(uint keynr);
/* Init values for the class: */
public:
......
......@@ -171,6 +171,13 @@ dict_col_name_is_reserved(
/* out: TRUE if name is reserved */
const char* name); /* in: column name */
/************************************************************************
Acquire the autoinc lock.*/
void
dict_table_autoinc_lock(
/*====================*/
dict_table_t* table); /* in: table */
/************************************************************************
Initializes the autoinc counter. It is not an error to initialize an already
initialized counter. */
......@@ -180,22 +187,6 @@ dict_table_autoinc_initialize(
dict_table_t* table, /* in: table */
ib_longlong value); /* in: next value to assign to a row */
/************************************************************************
Gets the next autoinc value (== autoinc counter value), 0 if not yet
initialized. If initialized, increments the counter by 1. */
ib_longlong
dict_table_autoinc_get(
/*===================*/
/* out: value for a new row, or 0 */
dict_table_t* table); /* in: table */
/************************************************************************
Decrements the autoinc counter value by 1. */
void
dict_table_autoinc_decrement(
/*=========================*/
dict_table_t* table); /* in: table */
/************************************************************************
Reads the next autoinc value (== autoinc counter value), 0 if not yet
initialized. */
......@@ -205,15 +196,6 @@ dict_table_autoinc_read(
/* out: value for a new row, or 0 */
dict_table_t* table); /* in: table */
/************************************************************************
Peeks the autoinc counter value, 0 if not yet initialized. Does not
increment the counter. The read not protected by any mutex! */
ib_longlong
dict_table_autoinc_peek(
/*====================*/
/* out: value of the counter */
dict_table_t* table); /* in: table */
/************************************************************************
Updates the autoinc counter if the value supplied is equal or bigger than the
current value. If not inited, does nothing. */
......@@ -223,6 +205,13 @@ dict_table_autoinc_update(
dict_table_t* table, /* in: table */
ib_longlong value); /* in: value which was assigned to a row */
/************************************************************************
Release the autoinc lock.*/
void
dict_table_autoinc_unlock(
/*======================*/
dict_table_t* table); /* in: table */
/**************************************************************************
Adds a table object to the dictionary cache. */
......
......@@ -410,6 +410,11 @@ struct dict_table_struct{
SELECT MAX(auto inc column) */
ib_longlong autoinc;/* autoinc counter value to give to the
next inserted row */
ib_longlong autoinc_increment;
/* The increment step of the auto increment
column. Value must be greater than or equal
to 1 */
#ifdef UNIV_DEBUG
ulint magic_n;/* magic number */
# define DICT_TABLE_MAGIC_N 76333786
......
......@@ -670,6 +670,7 @@ struct row_prebuilt_struct {
to this heap */
mem_heap_t* old_vers_heap; /* memory heap where a previous
version is built in consistent read */
ulonglong last_value; /* last value of AUTO-INC interval */
ulint magic_n2; /* this should be the same as
magic_n */
};
......
......@@ -171,7 +171,17 @@ row_search_check_if_query_cache_permitted(
trx_t* trx, /* in: transaction object */
const char* norm_name); /* in: concatenation of database name,
'/' char, table name */
/***********************************************************************
Read the max AUTOINC value from an index. */
ulint
row_search_max_autoinc(
/*===================*/
/* out: DB_SUCCESS if all OK else
error code */
dict_index_t* index, /* in: index to search */
const char* col_name, /* in: autoinc column name */
ib_longlong* value); /* out: AUTOINC value read */
/* A structure for caching column values for prefetched rows */
struct sel_buf_struct{
......
......@@ -681,6 +681,9 @@ struct trx_struct{
trx_undo_arr_t* undo_no_arr; /* array of undo numbers of undo log
records which are currently processed
by a rollback operation */
ulint n_autoinc_rows; /* no. of AUTO-INC rows required for
an SQL statement. This is useful for
multi-row INSERTs */
/*------------------------------*/
char detailed_error[256]; /* detailed error message for last
error, or empty. */
......
......@@ -655,6 +655,8 @@ row_create_prebuilt(
prebuilt->old_vers_heap = NULL;
prebuilt->last_value = 0;
return(prebuilt);
}
......@@ -2894,6 +2896,8 @@ next_rec:
dict_table_change_id_in_cache(table, new_id);
}
/* MySQL calls ha_innobase::reset_auto_increment() which does
the same thing. */
dict_table_autoinc_initialize(table, 0);
dict_update_statistics(table);
......
......@@ -4519,3 +4519,169 @@ row_search_check_if_query_cache_permitted(
return(ret);
}
/***********************************************************************
Read the AUTOINC column from the current row. */
static
ib_longlong
row_search_autoinc_read_column(
/*===========================*/
/* out: value read from the column */
dict_index_t* index, /* in: index to read from */
const rec_t* rec, /* in: current rec */
ulint col_no, /* in: column number */
ibool unsigned_type) /* in: signed or unsigned flag */
{
ulint len;
byte* ptr;
const byte* data;
ib_longlong value;
mem_heap_t* heap = NULL;
byte dest[sizeof(value)];
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
*offsets_ = sizeof offsets_ / sizeof *offsets_;
/* TODO: We have to cast away the const of rec for now. This needs
to be fixed later.*/
offsets = rec_get_offsets(
(rec_t*) rec, index, offsets, ULINT_UNDEFINED, &heap);
/* TODO: We have to cast away the const of rec for now. This needs
to be fixed later.*/
data = rec_get_nth_field((rec_t*)rec, offsets, col_no, &len);
ut_a(len != UNIV_SQL_NULL);
ut_a(len <= sizeof value);
/* Convert integer data from Innobase to a little-endian format,
sign bit restored to normal */
for (ptr = dest + len; ptr != dest; ++data) {
--ptr;
*ptr = *data;
}
if (!unsigned_type) {
dest[len - 1] ^= 128;
}
/* The assumption here is that the AUTOINC value can't be negative.*/
switch (len) {
case 8:
value = *(ib_longlong*) ptr;
break;
case 4:
value = *(ib_uint32_t*) ptr;
break;
case 2:
value = *(uint16 *) ptr;
break;
case 1:
value = *ptr;
break;
default:
ut_error;
}
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
ut_a(value >= 0);
return(value);
}
/***********************************************************************
Get the last row. */
static
const rec_t*
row_search_autoinc_get_rec(
/*=======================*/
/* out: current rec or NULL */
btr_pcur_t* pcur, /* in: the current cursor */
mtr_t* mtr) /* in: mini transaction */
{
do {
const rec_t* rec = btr_pcur_get_rec(pcur);
if (page_rec_is_user_rec(rec)) {
return(rec);
}
} while (btr_pcur_move_to_prev(pcur, mtr));
return(NULL);
}
/***********************************************************************
Read the max AUTOINC value from an index. */
ulint
row_search_max_autoinc(
/*===================*/
/* out: DB_SUCCESS if all OK else
error code, DB_RECORD_NOT_FOUND if
column name can't be found in index */
dict_index_t* index, /* in: index to search */
const char* col_name, /* in: name of autoinc column */
ib_longlong* value) /* out: AUTOINC value read */
{
ulint i;
ulint n_cols;
dict_field_t* dfield = NULL;
ulint error = DB_SUCCESS;
n_cols = dict_index_get_n_ordering_defined_by_user(index);
/* Search the index for the AUTOINC column name */
for (i = 0; i < n_cols; ++i) {
dfield = dict_index_get_nth_field(index, i);
if (strcmp(col_name, dfield->name) == 0) {
break;
}
}
*value = 0;
/* Must find the AUTOINC column name */
if (i < n_cols && dfield) {
mtr_t mtr;
btr_pcur_t pcur;
mtr_start(&mtr);
/* Open at the high/right end (FALSE), and INIT
cursor (TRUE) */
btr_pcur_open_at_index_side(
FALSE, index, BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);
if (page_get_n_recs(btr_pcur_get_page(&pcur)) > 0) {
const rec_t* rec;
rec = row_search_autoinc_get_rec(&pcur, &mtr);
if (rec != NULL) {
ibool unsigned_type = (
dfield->col->prtype & DATA_UNSIGNED);
*value = row_search_autoinc_read_column(
index, rec, i, unsigned_type);
}
}
btr_pcur_close(&pcur);
mtr_commit(&mtr);
} else {
error = DB_RECORD_NOT_FOUND;
}
return(error);
}
......@@ -195,6 +195,8 @@ trx_create(
memset(&trx->xid, 0, sizeof(trx->xid));
trx->xid.formatID = -1;
trx->n_autoinc_rows = 0;
trx_reset_new_rec_lock_info(trx);
return(trx);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment