Commit 210855ce authored by Sergey Vojtovich's avatar Sergey Vojtovich

Move XID_STATE::xid to XID_cache_element

Part of MDEV-7974 - backport fix for mysql bug#12161 (XA and binlog)
parent b7fd7ce2
......@@ -2618,7 +2618,6 @@ class THD: public THD_count, /* this must be first */
st_transactions()
{
bzero((char*)this, sizeof(*this));
xid_state.xid.null();
implicit_xid.null();
init_sql_alloc(&mem_root, "THD::transactions",
ALLOC_ROOT_MIN_BLOCK_SIZE, 0,
......
......@@ -26,6 +26,18 @@
***************************************************************************/
enum xa_states { XA_ACTIVE= 0, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY };
struct XID_cache_insert_element
{
enum xa_states xa_state;
XID *xid;
XID_cache_element *xid_cache_element;
XID_cache_insert_element(enum xa_states xa_state_arg, XID *xid_arg):
xa_state(xa_state_arg), xid(xid_arg) {}
};
class XID_cache_element
{
/*
......@@ -62,10 +74,10 @@ class XID_cache_element
public:
static const int32 ACQUIRED= 1 << 30;
static const int32 RECOVERED= 1 << 29;
XID_STATE *m_xid_state;
/* Error reported by the Resource Manager (RM) to the Transaction Manager. */
uint rm_error;
enum xa_states xa_state;
XID xid;
bool is_set(int32_t flag)
{ return m_state.load(std::memory_order_relaxed) & flag; }
void set(int32_t flag)
......@@ -110,12 +122,13 @@ class XID_cache_element
}
static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)),
XID_cache_element *element,
XID_STATE *xid_state)
XID_cache_insert_element *new_element)
{
DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED));
element->rm_error= 0;
element->m_xid_state= xid_state;
xid_state->xid_cache_element= element;
element->xa_state= new_element->xa_state;
element->xid.set(new_element->xid);
new_element->xid_cache_element= element;
}
static void lf_alloc_constructor(uchar *ptr)
{
......@@ -126,14 +139,12 @@ class XID_cache_element
{
XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
DBUG_ASSERT(!element->is_set(ACQUIRED));
if (element->is_set(RECOVERED))
my_free(element->m_xid_state);
}
static uchar *key(const XID_cache_element *element, size_t *length,
my_bool not_used __attribute__((unused)))
{
*length= element->m_xid_state->xid.key_length();
return element->m_xid_state->xid.key();
*length= element->xid.key_length();
return element->xid.key();
}
};
......@@ -191,7 +202,7 @@ bool XID_STATE::check_has_uncommitted_xa() const
XID *XID_STATE::get_xid() const
{
DBUG_ASSERT(is_explicit_XA());
return const_cast<XID*>(&xid);
return &xid_cache_element->xid;
}
......@@ -221,45 +232,38 @@ void xid_cache_free()
Find recovered XA transaction by XID.
*/
static XID_STATE *xid_cache_search(THD *thd, XID *xid)
static XID_cache_element *xid_cache_search(THD *thd, XID *xid)
{
XID_STATE *xs= 0;
DBUG_ASSERT(thd->xid_hash_pins);
XID_cache_element *element=
(XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
xid->key(), xid->key_length());
if (element)
{
if (element->acquire_recovered())
xs= element->m_xid_state;
if (!element->acquire_recovered())
element= 0;
lf_hash_search_unpin(thd->xid_hash_pins);
DEBUG_SYNC(thd, "xa_after_search");
}
return xs;
return element;
}
bool xid_cache_insert(XID *xid)
{
XID_STATE *xs;
XID_cache_insert_element new_element(XA_PREPARED, xid);
LF_PINS *pins;
int res= 1;
if (!(pins= lf_hash_get_pins(&xid_cache)))
return true;
if ((xs= (XID_STATE*) my_malloc(sizeof(*xs), MYF(MY_WME))))
{
xs->xid.set(xid);
if ((res= lf_hash_insert(&xid_cache, pins, xs)))
my_free(xs);
else
int res= lf_hash_insert(&xid_cache, pins, &new_element);
switch (res)
{
xs->xid_cache_element->xa_state= XA_PREPARED;
xs->xid_cache_element->set(XID_cache_element::RECOVERED);
}
if (res == 1)
case 0:
new_element.xid_cache_element->set(XID_cache_element::RECOVERED);
break;
case 1:
res= 0;
}
lf_hash_put_pins(pins);
......@@ -267,44 +271,42 @@ bool xid_cache_insert(XID *xid)
}
bool xid_cache_insert(THD *thd, XID_STATE *xid_state)
bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID *xid)
{
XID_cache_insert_element new_element(XA_ACTIVE, xid);
if (thd->fix_xid_hash_pins())
return true;
int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, xid_state);
int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, &new_element);
switch (res)
{
case 0:
xid_state->xid_cache_element->xa_state= XA_ACTIVE;
xid_state->xid_cache_element= new_element.xid_cache_element;
xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED);
break;
case 1:
my_error(ER_XAER_DUPID, MYF(0));
/* fall through */
default:
xid_state->xid_cache_element= 0;
}
return res;
}
void xid_cache_delete(THD *thd, XID_STATE *xid_state)
static void xid_cache_delete(THD *thd, XID_cache_element *&element)
{
if (xid_state->xid_cache_element)
{
bool recovered= xid_state->xid_cache_element->is_set(XID_cache_element::RECOVERED);
DBUG_ASSERT(thd->xid_hash_pins);
xid_state->xid_cache_element->mark_uninitialized();
element->mark_uninitialized();
lf_hash_delete(&xid_cache, thd->xid_hash_pins,
xid_state->xid.key(), xid_state->xid.key_length());
if (recovered)
my_free(xid_state);
else
element->xid.key(), element->xid.key_length());
}
void xid_cache_delete(THD *thd, XID_STATE *xid_state)
{
if (xid_state->xid_cache_element)
{
xid_cache_delete(thd, xid_state->xid_cache_element);
xid_state->xid_cache_element= 0;
xid_state->xid.null();
}
}
}
......@@ -321,7 +323,7 @@ static my_bool xid_cache_iterate_callback(XID_cache_element *element,
my_bool res= FALSE;
if (element->lock())
{
res= arg->action(element->m_xid_state, arg->argument);
res= arg->action(element, arg->argument);
element->unlock();
}
return res;
......@@ -348,12 +350,11 @@ static int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg)
@return TRUE if transaction was rolled back or if the transaction
state is XA_ROLLBACK_ONLY. FALSE otherwise.
*/
static bool xa_trans_rolled_back(XID_STATE *xid_state)
static bool xa_trans_rolled_back(XID_cache_element *element)
{
DBUG_ASSERT(xid_state->is_explicit_XA());
if (xid_state->xid_cache_element->rm_error)
if (element->rm_error)
{
switch (xid_state->xid_cache_element->rm_error) {
switch (element->rm_error) {
case ER_LOCK_WAIT_TIMEOUT:
my_error(ER_XA_RBTIMEOUT, MYF(0));
break;
......@@ -363,10 +364,10 @@ static bool xa_trans_rolled_back(XID_STATE *xid_state)
default:
my_error(ER_XA_RBROLLBACK, MYF(0));
}
xid_state->xid_cache_element->xa_state= XA_ROLLBACK_ONLY;
element->xa_state= XA_ROLLBACK_ONLY;
}
return xid_state->xid_cache_element->xa_state == XA_ROLLBACK_ONLY;
return element->xa_state == XA_ROLLBACK_ONLY;
}
......@@ -404,7 +405,8 @@ bool trans_xa_start(THD *thd)
thd->transaction.xid_state.xid_cache_element->xa_state == XA_IDLE &&
thd->lex->xa_opt == XA_RESUME)
{
bool not_equal= !thd->transaction.xid_state.xid.eq(thd->lex->xid);
bool not_equal=
!thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid);
if (not_equal)
my_error(ER_XAER_NOTA, MYF(0));
else
......@@ -421,11 +423,8 @@ bool trans_xa_start(THD *thd)
my_error(ER_XAER_OUTSIDE, MYF(0));
else if (!trans_begin(thd))
{
DBUG_ASSERT(thd->transaction.xid_state.xid.is_null());
thd->transaction.xid_state.xid.set(thd->lex->xid);
if (xid_cache_insert(thd, &thd->transaction.xid_state))
if (xid_cache_insert(thd, &thd->transaction.xid_state, thd->lex->xid))
{
thd->transaction.xid_state.xid.null();
trans_rollback(thd);
DBUG_RETURN(true);
}
......@@ -455,9 +454,9 @@ bool trans_xa_end(THD *thd)
else if (!thd->transaction.xid_state.is_explicit_XA() ||
thd->transaction.xid_state.xid_cache_element->xa_state != XA_ACTIVE)
thd->transaction.xid_state.er_xaer_rmfail();
else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
else if (!thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid))
my_error(ER_XAER_NOTA, MYF(0));
else if (!xa_trans_rolled_back(&thd->transaction.xid_state))
else if (!xa_trans_rolled_back(thd->transaction.xid_state.xid_cache_element))
thd->transaction.xid_state.xid_cache_element->xa_state= XA_IDLE;
DBUG_RETURN(thd->is_error() ||
......@@ -481,7 +480,7 @@ bool trans_xa_prepare(THD *thd)
if (!thd->transaction.xid_state.is_explicit_XA() ||
thd->transaction.xid_state.xid_cache_element->xa_state != XA_IDLE)
thd->transaction.xid_state.er_xaer_rmfail();
else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
else if (!thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid))
my_error(ER_XAER_NOTA, MYF(0));
else if (ha_prepare(thd))
{
......@@ -510,7 +509,8 @@ bool trans_xa_commit(THD *thd)
bool res= TRUE;
DBUG_ENTER("trans_xa_commit");
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
if (!thd->transaction.xid_state.is_explicit_XA() ||
!thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid))
{
if (thd->fix_xid_hash_pins())
{
......@@ -518,20 +518,18 @@ bool trans_xa_commit(THD *thd)
DBUG_RETURN(TRUE);
}
XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
res= !xs;
if (res)
my_error(ER_XAER_NOTA, MYF(0));
else
if (auto xs= xid_cache_search(thd, thd->lex->xid))
{
res= xa_trans_rolled_back(xs);
ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
xid_cache_delete(thd, xs);
}
else
my_error(ER_XAER_NOTA, MYF(0));
DBUG_RETURN(res);
}
if (xa_trans_rolled_back(&thd->transaction.xid_state))
if (xa_trans_rolled_back(thd->transaction.xid_state.xid_cache_element))
{
xa_trans_force_rollback(thd);
res= thd->is_error();
......@@ -606,7 +604,8 @@ bool trans_xa_rollback(THD *thd)
bool res= TRUE;
DBUG_ENTER("trans_xa_rollback");
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
if (!thd->transaction.xid_state.is_explicit_XA() ||
!thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid))
{
if (thd->fix_xid_hash_pins())
{
......@@ -614,15 +613,14 @@ bool trans_xa_rollback(THD *thd)
DBUG_RETURN(TRUE);
}
XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
if (!xs)
my_error(ER_XAER_NOTA, MYF(0));
else
if (auto xs= xid_cache_search(thd, thd->lex->xid))
{
xa_trans_rolled_back(xs);
ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
xid_cache_delete(thd, xs);
}
else
my_error(ER_XAER_NOTA, MYF(0));
DBUG_RETURN(thd->get_stmt_da()->is_error());
}
......@@ -755,10 +753,10 @@ static uint get_sql_xid(XID *xid, char *buf)
It can be easily fixed later, if necessary.
*/
static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol,
static my_bool xa_recover_callback(XID_cache_element *xs, Protocol *protocol,
char *data, uint data_len, CHARSET_INFO *data_cs)
{
if (xs->xid_cache_element->xa_state == XA_PREPARED)
if (xs->xa_state == XA_PREPARED)
{
protocol->prepare_for_resend();
protocol->store_longlong((longlong) xs->xid.formatID, FALSE);
......@@ -772,14 +770,16 @@ static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol,
}
static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol)
static my_bool xa_recover_callback_short(XID_cache_element *xs,
Protocol *protocol)
{
return xa_recover_callback(xs, protocol, xs->xid.data,
xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin);
}
static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol)
static my_bool xa_recover_callback_verbose(XID_cache_element *xs,
Protocol *protocol)
{
char buf[SQL_XIDSIZE];
uint len= get_sql_xid(&xs->xid, buf);
......
......@@ -20,8 +20,6 @@
class XID_cache_element;
struct XID_STATE {
/* For now, this is only used to catch duplicated external xids */
XID xid; // transaction identifier
XID_cache_element *xid_cache_element;
bool check_has_uncommitted_xa() const;
......@@ -34,7 +32,7 @@ struct XID_STATE {
void xid_cache_init(void);
void xid_cache_free(void);
bool xid_cache_insert(XID *xid);
bool xid_cache_insert(THD *thd, XID_STATE *xid_state);
bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID *xid);
void xid_cache_delete(THD *thd, XID_STATE *xid_state);
bool trans_xa_start(THD *thd);
......
......@@ -1708,7 +1708,8 @@ int spider_check_and_set_time_zone(
}
static int spider_xa_lock(
XID_STATE *xid_state
XID_STATE *xid_state,
XID *xid
) {
THD *thd = current_thd;
int error_num;
......@@ -1726,7 +1727,7 @@ static int spider_xa_lock(
#endif
old_proc_info = thd_proc_info(thd, "Locking xid by Spider");
#ifdef SPIDER_XID_USES_xid_cache_iterate
if (xid_cache_insert(thd, xid_state))
if (xid_cache_insert(thd, xid_state, xid))
{
error_num = (spider_stmt_da_sql_errno(thd) == ER_XAER_DUPID ?
ER_SPIDER_XA_LOCKED_NUM : HA_ERR_OUT_OF_MEM);
......@@ -1948,11 +1949,10 @@ int spider_internal_start_trx(
thd->server_id));
#endif
trx->internal_xid_state.xid.set(&trx->xid);
#ifdef SPIDER_XID_STATE_HAS_in_thd
trx->internal_xid_state.in_thd = 1;
#endif
if ((error_num = spider_xa_lock(&trx->internal_xid_state)))
if ((error_num = spider_xa_lock(&trx->internal_xid_state, &trx->xid)))
{
if (error_num == ER_SPIDER_XA_LOCKED_NUM)
my_message(error_num, ER_SPIDER_XA_LOCKED_STR, MYF(0));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment