Commit 7ed673f3 authored by Sergey Vojtovich's avatar Sergey Vojtovich

MDEV-7793 - Race condition between XA COMMIT/ROLLBACK and disconnect

XA COMMIT/ROLLBACK of XA transaction owned by different thread may access
freed memory if that thread disconnects at the same time.

Also concurrent XA COMMIT/ROLLBACK of recovered XA transaction were not
serialized properly.
parent b9756850
#
# MDEV-7793 - Race condition between XA COMMIT/ROLLBACK and disconnect
#
# Note that this test is meaningful only with valgrind.
XA START 'xatest';
XA END 'xatest';
XA PREPARE 'xatest';
SET debug_sync='xa_after_search SIGNAL parked WAIT_FOR go';
XA COMMIT 'xatest';
SET debug_sync='now WAIT_FOR parked';
# Waiting for thread to get deleted
SET debug_sync='now SIGNAL go';
ERROR XAE04: XAER_NOTA: Unknown XID
SET debug_sync='RESET';
XA START 'xatest';
XA END 'xatest';
XA PREPARE 'xatest';
SET debug_sync='xa_after_search SIGNAL parked WAIT_FOR go';
XA ROLLBACK 'xatest';
SET debug_sync='now WAIT_FOR parked';
# Waiting for thread to get deleted
SET debug_sync='now SIGNAL go';
ERROR XAE04: XAER_NOTA: Unknown XID
SET debug_sync='RESET';
--source include/have_debug_sync.inc
--echo #
--echo # MDEV-7793 - Race condition between XA COMMIT/ROLLBACK and disconnect
--echo #
--echo # Note that this test is meaningful only with valgrind.
let $op= XA COMMIT 'xatest';
let $i= 2;
while ($i)
{
connect con1, localhost, root;
connect con2, localhost, root;
connection con1;
XA START 'xatest';
XA END 'xatest';
XA PREPARE 'xatest';
connection con2;
SET debug_sync='xa_after_search SIGNAL parked WAIT_FOR go';
send_eval $op;
connection default;
SET debug_sync='now WAIT_FOR parked';
disconnect con1;
disable_query_log;
echo # Waiting for thread to get deleted;
while (`SELECT VARIABLE_VALUE!=2 FROM INFORMATION_SCHEMA.SESSION_STATUS WHERE VARIABLE_NAME='max_used_connections'`)
{
real_sleep 0.1;
FLUSH STATUS;
}
enable_query_log;
SET debug_sync='now SIGNAL go';
connection con2;
--error ER_XAER_NOTA
reap;
disconnect con2;
connection default;
SET debug_sync='RESET';
let $op= XA ROLLBACK 'xatest';
dec $i;
}
......@@ -1540,7 +1540,6 @@ void THD::init_for_queries()
variables.trans_alloc_block_size,
variables.trans_prealloc_size);
transaction.xid_state.xid.null();
transaction.xid_state.in_thd=1;
}
......@@ -5186,83 +5185,101 @@ void mark_transaction_to_rollback(THD *thd, bool all)
class XID_cache_element
{
/*
bits 1..30 are reference counter
bit 31 is UNINITIALIZED flag
m_state is used to prevent elements from being deleted while XA RECOVER
iterates xid cache and to prevent recovered elments from being acquired by
multiple threads.
bits 1..29 are reference counter
bit 30 is RECOVERED flag
bit 31 is ACQUIRED flag (thread owns this xid)
bit 32 is unused
Newly allocated and deleted elements have UNINITIALIZED flag set.
Newly allocated and deleted elements have m_state set to 0.
On lock() m_state is atomically incremented. It also creates load-ACQUIRE
memory barrier to make sure m_state is actually updated before furhter
memory accesses. Attempting to lock UNINITIALIED element returns failure
and further accesses to element memory are forbidden.
memory accesses. Attempting to lock an element that has neither ACQUIRED
nor RECOVERED flag set returns failure and further accesses to element
memory are forbidden.
On unlock() m_state is decremented. It also creates store-RELEASE memory
barrier to make sure m_state is actually updated after preceding memory
accesses.
UNINITIALIZED flag is cleared upon successful insert.
ACQUIRED flag is set when thread registers it's xid or when thread acquires
recovered xid.
UNINITIALIZED flag is set before delete in a spin loop, after last reference
is released.
RECOVERED flag is set for elements found during crash recovery.
Currently m_state is only used to prevent elements from being deleted while
XA RECOVER iterates xid cache.
ACQUIRED and RECOVERED flags are cleared before element is deleted from
hash in a spin loop, after last reference is released.
*/
int32 m_state;
static const int32 UNINITIALIZED= 1 << 30;
public:
static const int32 ACQUIRED= 1 << 30;
static const int32 RECOVERED= 1 << 29;
XID_STATE *m_xid_state;
bool lock()
bool is_set(int32 flag)
{ return my_atomic_load32_explicit(&m_state, MY_MEMORY_ORDER_RELAXED) & flag; }
void set(int32 flag)
{
if (my_atomic_add32_explicit(&m_state, 1,
MY_MEMORY_ORDER_ACQUIRE) & UNINITIALIZED)
{
unlock();
return false;
}
return true;
DBUG_ASSERT(!is_set(ACQUIRED | RECOVERED));
my_atomic_add32_explicit(&m_state, flag, MY_MEMORY_ORDER_RELAXED);
}
void unlock()
bool lock()
{
my_atomic_add32_explicit(&m_state, -1, MY_MEMORY_ORDER_RELEASE);
int32 old= my_atomic_add32_explicit(&m_state, 1, MY_MEMORY_ORDER_ACQUIRE);
if (old & (ACQUIRED | RECOVERED))
return true;
unlock();
return false;
}
void unlock()
{ my_atomic_add32_explicit(&m_state, -1, MY_MEMORY_ORDER_RELEASE); }
void mark_uninitialized()
{
int32 old= 0;
while (!my_atomic_cas32_weak_explicit(&m_state, &old, UNINITIALIZED,
int32 old= ACQUIRED;
while (!my_atomic_cas32_weak_explicit(&m_state, &old, 0,
MY_MEMORY_ORDER_RELAXED,
MY_MEMORY_ORDER_RELAXED))
{
old= 0;
old&= ACQUIRED | RECOVERED;
(void) LF_BACKOFF;
}
}
void mark_initialized()
bool acquire_recovered()
{
DBUG_ASSERT(m_state & UNINITIALIZED);
my_atomic_add32_explicit(&m_state, -UNINITIALIZED, MY_MEMORY_ORDER_RELAXED);
int32 old= RECOVERED;
while (!my_atomic_cas32_weak_explicit(&m_state, &old, ACQUIRED | RECOVERED,
MY_MEMORY_ORDER_RELAXED,
MY_MEMORY_ORDER_RELAXED))
{
if (!(old & RECOVERED) || (old & ACQUIRED))
return false;
old= RECOVERED;
(void) LF_BACKOFF;
}
return true;
}
static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)),
XID_cache_element *element,
XID_STATE *xid_state)
{
DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED));
element->m_xid_state= xid_state;
xid_state->xid_cache_element= element;
}
static void lf_alloc_constructor(uchar *ptr)
{
XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
element->m_state= UNINITIALIZED;
element->m_state= 0;
}
static void lf_alloc_destructor(uchar *ptr)
{
XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
if (element->m_state != UNINITIALIZED)
{
DBUG_ASSERT(!element->m_xid_state->in_thd);
DBUG_ASSERT(!element->is_set(ACQUIRED));
if (element->is_set(RECOVERED))
my_free(element->m_xid_state);
}
}
static uchar *key(const XID_cache_element *element, size_t *length,
my_bool not_used __attribute__((unused)))
......@@ -5307,18 +5324,25 @@ void xid_cache_free()
}
/**
Find recovered XA transaction by XID.
*/
XID_STATE *xid_cache_search(THD *thd, XID *xid)
{
XID_STATE *xs= 0;
DBUG_ASSERT(thd->xid_hash_pins);
XID_cache_element *element=
(XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
xid->key(), xid->key_length());
if (element)
{
if (element->acquire_recovered())
xs= element->m_xid_state;
lf_hash_search_unpin(thd->xid_hash_pins);
return element->m_xid_state;
DEBUG_SYNC(thd, "xa_after_search");
}
return 0;
return xs;
}
......@@ -5335,13 +5359,12 @@ bool xid_cache_insert(XID *xid, enum xa_states xa_state)
{
xs->xa_state=xa_state;
xs->xid.set(xid);
xs->in_thd=0;
xs->rm_error=0;
if ((res= lf_hash_insert(&xid_cache, pins, xs)))
my_free(xs);
else
xs->xid_cache_element->mark_initialized();
xs->xid_cache_element->set(XID_cache_element::RECOVERED);
if (res == 1)
res= 0;
}
......@@ -5359,7 +5382,7 @@ bool xid_cache_insert(THD *thd, XID_STATE *xid_state)
switch (res)
{
case 0:
xid_state->xid_cache_element->mark_initialized();
xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED);
break;
case 1:
my_error(ER_XAER_DUPID, MYF(0));
......@@ -5374,12 +5397,13 @@ void xid_cache_delete(THD *thd, XID_STATE *xid_state)
{
if (xid_state->xid_cache_element)
{
bool recovered= xid_state->xid_cache_element->is_set(XID_cache_element::RECOVERED);
DBUG_ASSERT(thd->xid_hash_pins);
xid_state->xid_cache_element->mark_uninitialized();
lf_hash_delete(&xid_cache, thd->xid_hash_pins,
xid_state->xid.key(), xid_state->xid.key_length());
xid_state->xid_cache_element= 0;
if (!xid_state->in_thd)
if (recovered)
my_free(xid_state);
}
}
......
......@@ -1137,7 +1137,6 @@ typedef struct st_xid_state {
/* For now, this is only used to catch duplicated external xids */
XID xid; // transaction identifier
enum xa_states xa_state; // used by external XA only
bool in_thd;
/* Error reported by the Resource Manager (RM) to the Transaction Manager. */
uint rm_error;
XID_cache_element *xid_cache_element;
......
......@@ -837,18 +837,9 @@ bool trans_xa_commit(THD *thd)
my_error(ER_OUT_OF_RESOURCES, MYF(0));
DBUG_RETURN(TRUE);
}
/*
xid_state.in_thd is always true beside of xa recovery procedure.
Note, that there is no race condition here between xid_cache_search
and xid_cache_delete, since we always delete our own XID
(thd->lex->xid == thd->transaction.xid_state.xid).
The only case when thd->lex->xid != thd->transaction.xid_state.xid
and xid_state->in_thd == 0 is in the function
xa_cache_insert(XID, xa_states), which is called before starting
client connections, and thus is always single-threaded.
*/
XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
res= !xs || xs->in_thd;
res= !xs;
if (res)
my_error(ER_XAER_NOTA, MYF(0));
else
......@@ -949,7 +940,7 @@ bool trans_xa_rollback(THD *thd)
}
XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
if (!xs || xs->in_thd)
if (!xs)
my_error(ER_XAER_NOTA, MYF(0));
else
{
......
......@@ -1872,7 +1872,6 @@ int spider_internal_start_trx(
trx->internal_xid_state.xa_state = XA_ACTIVE;
trx->internal_xid_state.xid.set(&trx->xid);
trx->internal_xid_state.in_thd = 1;
if ((error_num = spider_xa_lock(&trx->internal_xid_state)))
{
if (error_num == ER_SPIDER_XA_LOCKED_NUM)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment