Commit 7345d371 authored by Julius Goryavsky's avatar Julius Goryavsky

MDEV-24853: Duplicate key generated during cluster configuration change

Incorrect processing of an auto-incrementing field in the
WSREP-related code during applying transactions results in
a duplicate key being created. This is due to the fact that
at the beginning of the write_row() and update_row() functions,
the values of the auto-increment parameters are used, which
are read from the parameters of the current thread, but further
along the code other values are used, which are read from global
variables (when applying a transaction). This can happen when
the cluster configuration has changed while applying a transaction
(for example in the high_priority_service mode for Galera 4).
Further during IST processing duplicating key is detected, and
processing of the DB_DUPLICATE_KEY return code (inside innodb,
in the write_row() handler) results in a call to the
wsrep_thd_self_abort() function.
parent 545cba13
......@@ -107,7 +107,6 @@ extern struct wsrep_service_st {
bool (*wsrep_thd_ignore_table_func)(THD *thd);
long long (*wsrep_thd_trx_seqno_func)(THD *thd);
struct wsrep_ws_handle * (*wsrep_thd_ws_handle_func)(THD *thd);
void (*wsrep_thd_auto_increment_variables_func)(THD *thd, unsigned long long *offset, unsigned long long *increment);
void (*wsrep_set_load_multi_commit_func)(THD *thd, bool split);
bool (*wsrep_is_load_multi_commit_func)(THD *thd);
int (*wsrep_trx_is_aborting_func)(MYSQL_THD thd);
......@@ -155,7 +154,6 @@ extern struct wsrep_service_st {
#define wsrep_thd_ignore_table(T) wsrep_service->wsrep_thd_ignore_table_func(T)
#define wsrep_thd_trx_seqno(T) wsrep_service->wsrep_thd_trx_seqno_func(T)
#define wsrep_thd_ws_handle(T) wsrep_service->wsrep_thd_ws_handle_func(T)
#define wsrep_thd_auto_increment_variables(T,O,I) wsrep_service->wsrep_thd_auto_increment_variables_func(T,O,I)
#define wsrep_set_load_multi_commit(T,S) wsrep_service->wsrep_set_load_multi_commit_func(T,S)
#define wsrep_is_load_multi_commit(T) wsrep_service->wsrep_is_load_multi_commit_func(T)
#define wsrep_trx_is_aborting(T) wsrep_service->wsrep_trx_is_aborting_func(T)
......@@ -212,7 +210,6 @@ my_bool wsrep_thd_is_BF(MYSQL_THD thd, my_bool sync);
my_bool wsrep_thd_is_wsrep(MYSQL_THD thd);
struct wsrep *get_wsrep();
struct wsrep_ws_handle *wsrep_thd_ws_handle(THD *thd);
void wsrep_thd_auto_increment_variables(THD *thd, unsigned long long *offset, unsigned long long *increment);
void wsrep_set_load_multi_commit(THD *thd, bool split);
bool wsrep_is_load_multi_commit(THD *thd);
void wsrep_aborting_thd_enqueue(THD *thd);
......
......@@ -789,7 +789,7 @@ t2 CREATE TABLE `t2` (
`n` int(10) unsigned NOT NULL,
`o` enum('FALSE','TRUE') DEFAULT NULL,
PRIMARY KEY (`m`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=latin1
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=latin1
INSERT INTO t1 (b,c) SELECT n,o FROM t2 ;
SHOW CREATE TABLE t1;
Table Create Table
......
......@@ -177,7 +177,6 @@ static struct wsrep_service_st wsrep_handler = {
wsrep_thd_ignore_table,
wsrep_thd_trx_seqno,
wsrep_thd_ws_handle,
wsrep_thd_auto_increment_variables,
wsrep_set_load_multi_commit,
wsrep_is_load_multi_commit,
wsrep_trx_is_aborting,
......
......@@ -125,14 +125,6 @@ longlong wsrep_thd_trx_seqno(THD *)
struct wsrep_ws_handle* wsrep_thd_ws_handle(THD *)
{ return 0; }
void wsrep_thd_auto_increment_variables(THD *thd,
unsigned long long *offset,
unsigned long long *increment)
{
*offset= thd->variables.auto_increment_offset;
*increment= thd->variables.auto_increment_increment;
}
void wsrep_set_load_multi_commit(THD *thd, bool split)
{ }
......
......@@ -851,27 +851,6 @@ bool wsrep_thd_has_explicit_locks(THD *thd)
return thd->mdl_context.has_explicit_locks();
}
/*
Get auto increment variables for THD. Use global settings for
applier threads.
*/
void wsrep_thd_auto_increment_variables(THD* thd,
unsigned long long* offset,
unsigned long long* increment)
{
if (thd->wsrep_exec_mode == REPL_RECV &&
thd->wsrep_conflict_state != REPLAYING)
{
*offset= global_system_variables.auto_increment_offset;
*increment= global_system_variables.auto_increment_increment;
}
else
{
*offset= thd->variables.auto_increment_offset;
*increment= thd->variables.auto_increment_increment;
}
}
my_bool wsrep_thd_is_applier(MYSQL_THD thd)
{
my_bool is_applier= false;
......
......@@ -2556,6 +2556,72 @@ innobase_raw_format(
return(ut_str_sql_format(buf_tmp, buf_tmp_used, buf, buf_size));
}
/*
The helper function nlz(x) calculates the number of leading zeros
in the binary representation of the number "x", either using a
built-in compiler function or a substitute trick based on the use
of the multiplication operation and a table indexed by the prefix
of the multiplication result:
*/
#ifdef __GNUC__
#define nlz(x) __builtin_clzll(x)
#elif defined(_MSC_VER) && !defined(_M_CEE_PURE) && \
(defined(_M_IX86) || defined(_M_X64) || defined(_M_ARM64))
#ifndef __INTRIN_H_
#pragma warning(push, 4)
#pragma warning(disable: 4255 4668)
#include <intrin.h>
#pragma warning(pop)
#endif
__forceinline unsigned int nlz (ulonglong x)
{
#if defined(_M_IX86) || defined(_M_X64)
unsigned long n;
#ifdef _M_X64
_BitScanReverse64(&n, x);
return (unsigned int) n ^ 63;
#else
unsigned long y = (unsigned long) (x >> 32);
unsigned int m = 31;
if (y == 0)
{
y = (unsigned long) x;
m = 63;
}
_BitScanReverse(&n, y);
return (unsigned int) n ^ m;
#endif
#elif defined(_M_ARM64)
return _CountLeadingZeros(x);
#endif
}
#else
inline unsigned int nlz (ulonglong x)
{
static unsigned char table [48] = {
32, 6, 5, 0, 4, 12, 0, 20,
15, 3, 11, 0, 0, 18, 25, 31,
8, 14, 2, 0, 10, 0, 0, 0,
0, 0, 0, 21, 0, 0, 19, 26,
7, 0, 13, 0, 16, 1, 22, 27,
9, 0, 17, 23, 28, 24, 29, 30
};
unsigned int y= (unsigned int) (x >> 32);
unsigned int n= 0;
if (y == 0) {
y= (unsigned int) x;
n= 32;
}
y = y | (y >> 1); // Propagate leftmost 1-bit to the right.
y = y | (y >> 2);
y = y | (y >> 4);
y = y | (y >> 8);
y = y & ~(y >> 16);
y = y * 0x3EF5D037;
return n + table[y >> 26];
}
#endif
/*********************************************************************//**
Compute the next autoinc value.
......@@ -2584,85 +2650,93 @@ innobase_next_autoinc(
ulonglong max_value) /*!< in: max value for type */
{
ulonglong next_value;
ulonglong block = need * step;
ulonglong block;
/* Should never be 0. */
ut_a(need > 0);
ut_a(block > 0);
ut_a(step > 0);
ut_a(max_value > 0);
/*
Allow auto_increment to go over max_value up to max ulonglong.
This allows us to detect that all values are exhausted.
If we don't do this, we will return max_value several times
and get duplicate key errors instead of auto increment value
out of range.
*/
max_value= (~(ulonglong) 0);
/*
We need to calculate the "block" value equal to the product
"step * need". However, when calculating this product, an integer
overflow can occur, so we cannot simply use the usual multiplication
operation. The snippet below calculates the product of two numbers
and detects an unsigned integer overflow:
*/
unsigned int m= nlz(need);
unsigned int n= nlz(step);
if (m + n <= 8 * sizeof(ulonglong) - 2) {
// The bit width of the original values is too large,
// therefore we are guaranteed to get an overflow.
goto overflow;
}
block = need * (step >> 1);
if ((longlong) block < 0) {
goto overflow;
}
block += block;
if (step & 1) {
block += need;
if (block < need) {
goto overflow;
}
}
/* Check for overflow. Current can be > max_value if the value
is in reality a negative value. Also, the visual studio compiler
converts large double values (which hypothetically can then be
passed here as the values of the "current" parameter) automatically
into unsigned long long datatype maximum value: */
if (current > max_value) {
goto overflow;
}
/* According to MySQL documentation, if the offset is greater than
the step then the offset is ignored. */
if (offset > block) {
if (offset > step) {
offset = 0;
}
/* Check for overflow. Current can be > max_value if the value is
in reality a negative value.The visual studio compilers converts
large double values automatically into unsigned long long datatype
maximum value */
if (block >= max_value
|| offset > max_value
|| current >= max_value
|| max_value - offset <= offset) {
next_value = max_value;
/*
Let's round the current value to within a step-size block:
*/
if (current > offset) {
next_value = current - offset;
} else {
ut_a(max_value > current);
ulonglong free = max_value - current;
if (free < offset || free - offset <= block) {
next_value = max_value;
} else {
next_value = 0;
}
next_value = offset - current;
}
next_value -= next_value % step;
if (next_value == 0) {
ulonglong next;
if (current > offset) {
next = (current - offset) / step;
} else {
next = (offset - current) / step;
}
ut_a(max_value > next);
next_value = next * step;
/* Check for multiplication overflow. */
ut_a(next_value >= next);
ut_a(max_value > next_value);
/* Check for overflow */
if (max_value - next_value >= block) {
next_value += block;
if (max_value - next_value >= offset) {
next_value += offset;
} else {
next_value = max_value;
}
} else {
next_value = max_value;
}
/*
Add an offset to the next value and check that the addition
does not cause an integer overflow:
*/
next_value += offset;
if (next_value < offset) {
goto overflow;
}
ut_a(next_value != 0);
ut_a(next_value <= max_value);
/*
Add a block to the next value and check that the addition
does not cause an integer overflow:
*/
next_value += block;
if (next_value < block) {
goto overflow;
}
return(next_value);
overflow:
/*
Allow auto_increment to go over max_value up to max ulonglong.
This allows us to detect that all values are exhausted.
If we don't do this, we will return max_value several times
and get duplicate key errors instead of auto increment value
out of range:
*/
return(~(ulonglong) 0);
}
/********************************************************************//**
......@@ -8169,7 +8243,6 @@ ha_innobase::write_row(
/* Handling of errors related to auto-increment. */
if (auto_inc_used) {
ulonglong auto_inc;
ulonglong col_max_value;
/* Note the number of rows processed for this statement, used
by get_auto_increment() to determine the number of AUTO-INC
......@@ -8179,11 +8252,6 @@ ha_innobase::write_row(
--trx->n_autoinc_rows;
}
/* We need the upper limit of the col type to check for
whether we update the table autoinc counter or not. */
col_max_value =
table->next_number_field->get_max_int_value();
/* Get the value that MySQL attempted to store in the table.*/
auto_inc = table->next_number_field->val_uint();
......@@ -8250,38 +8318,25 @@ ha_innobase::write_row(
if (auto_inc >= m_prebuilt->autoinc_last_value) {
set_max_autoinc:
/* We need the upper limit of the col type to check for
whether we update the table autoinc counter or not. */
ulonglong col_max_value =
table->next_number_field->get_max_int_value();
/* This should filter out the negative
values set explicitly by the user. */
if (auto_inc <= col_max_value) {
ut_ad(m_prebuilt->autoinc_increment > 0);
ulonglong offset;
ulonglong increment;
dberr_t err;
#ifdef WITH_WSREP
/* Applier threads which are processing
ROW events and don't go through server
level autoinc processing, therefore
m_prebuilt autoinc values don't get
properly assigned. Fetch values from
server side. */
if (trx->is_wsrep() &&
wsrep_thd_exec_mode(m_user_thd) == REPL_RECV)
{
wsrep_thd_auto_increment_variables(
m_user_thd, &offset, &increment);
}
else
{
#endif /* WITH_WSREP */
ut_a(m_prebuilt->autoinc_increment > 0);
offset = m_prebuilt->autoinc_offset;
increment = m_prebuilt->autoinc_increment;
#ifdef WITH_WSREP
}
#endif /* WITH_WSREP */
offset = m_prebuilt->autoinc_offset;
increment = m_prebuilt->autoinc_increment;
auto_inc = innobase_next_autoinc(
auto_inc,
1, increment, offset,
auto_inc, 1, increment, offset,
col_max_value);
err = innobase_set_max_autoinc(
......@@ -8949,46 +9004,37 @@ ha_innobase::update_row(
/* A value for an AUTO_INCREMENT column
was specified in the UPDATE statement. */
ulonglong offset;
ulonglong increment;
#ifdef WITH_WSREP
/* Applier threads which are processing
ROW events and don't go through server
level autoinc processing, therefore
m_prebuilt autoinc values don't get
properly assigned. Fetch values from
server side. */
if (trx->is_wsrep() &&
wsrep_thd_exec_mode(m_user_thd) == REPL_RECV)
{
wsrep_thd_auto_increment_variables(
m_user_thd, &offset, &increment);
}
else
{
#endif /* WITH_WSREP */
offset = m_prebuilt->autoinc_offset;
increment = m_prebuilt->autoinc_increment;
#ifdef WITH_WSREP
}
#endif /* WITH_WSREP */
autoinc = innobase_next_autoinc(
autoinc, 1, increment, offset,
table->found_next_number_field->get_max_int_value());
error = innobase_set_max_autoinc(autoinc);
if (m_prebuilt->table->persistent_autoinc) {
/* Update the PAGE_ROOT_AUTO_INC. Yes, we do
this even if dict_table_t::autoinc already was
greater than autoinc, because we cannot know
if any INSERT actually used (and wrote to
PAGE_ROOT_AUTO_INC) a value bigger than our
autoinc. */
btr_write_autoinc(dict_table_get_first_index(
m_prebuilt->table),
autoinc);
/* We need the upper limit of the col type to check for
whether we update the table autoinc counter or not. */
ulonglong col_max_value =
table->found_next_number_field->get_max_int_value();
/* This should filter out the negative
values set explicitly by the user. */
if (autoinc <= col_max_value) {
ulonglong offset;
ulonglong increment;
offset = m_prebuilt->autoinc_offset;
increment = m_prebuilt->autoinc_increment;
autoinc = innobase_next_autoinc(
autoinc, 1, increment, offset,
col_max_value);
error = innobase_set_max_autoinc(autoinc);
if (m_prebuilt->table->persistent_autoinc) {
/* Update the PAGE_ROOT_AUTO_INC. Yes, we do
this even if dict_table_t::autoinc already was
greater than autoinc, because we cannot know
if any INSERT actually used (and wrote to
PAGE_ROOT_AUTO_INC) a value bigger than our
autoinc. */
btr_write_autoinc(dict_table_get_first_index(
m_prebuilt->table),
autoinc);
}
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment