Commit 6bd24dea authored by Sergey Vojtovich

MDEV-7728 - Improve xid cache scalability by using lock-free hash

XID cache is now based on lock-free hash.
Also fixed lf_hash_destroy() to call alloc destructor.

Note that the previous implementation had a race condition when a thread accessed
an XA transaction owned by a different thread. The new implementation does not fix it either.
parent 18e9c314
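Background for the hunks below: the new cache is an LF_HASH that each thread accesses through its own LF_PINS object, kept in THD::xid_hash_pins and lazily allocated by THD::fix_xid_hash_pins() (see the sql_class.cc hunk further down). The following is a minimal sketch of that pin-based access pattern, not part of the commit; the names cache and lookup_example, the key arguments, and the early pin release are assumptions made for the example.

#include <my_global.h>
#include <lf.h>                           /* LF_HASH, LF_PINS */

static LF_HASH cache;                     /* assume lf_hash_init() was called at startup */

static void lookup_example(const uchar *key, uint key_length)
{
  /* Pins are per-thread and reusable; the server caches them in the THD. */
  LF_PINS *pins= lf_hash_get_pins(&cache);
  if (!pins)
    return;                               /* out of memory */

  void *element= lf_hash_search(&cache, pins, key, key_length);
  if (element)
  {
    /* The element is pinned: concurrent deleters cannot free it yet. */
    /* ... read the element here ... */
    lf_hash_search_unpin(pins);           /* release the pin when done */
  }
  lf_hash_put_pins(pins);                 /* the real code keeps pins until THD destruction */
}

lf_hash_insert() and lf_hash_delete() take the same pins argument, which is why the helpers added by this commit pass thd->xid_hash_pins everywhere.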
......@@ -117,7 +117,12 @@ uint lf_alloc_pool_count(LF_ALLOCATOR *allocator);
#define lf_alloc_free(PINS, PTR) lf_pinbox_free((PINS), (PTR))
#define lf_alloc_get_pins(A) lf_pinbox_get_pins(&(A)->pinbox)
#define lf_alloc_put_pins(PINS) lf_pinbox_put_pins(PINS)
-#define lf_alloc_direct_free(ALLOC, ADDR) my_free((ADDR))
+#define lf_alloc_direct_free(ALLOC, ADDR)            \
+  do {                                               \
+    if ((ALLOC)->destructor)                         \
+      (ALLOC)->destructor((uchar*) ADDR);            \
+    my_free(ADDR);                                   \
+  } while(0)
void *lf_alloc_new(LF_PINS *pins);
......
......@@ -43,8 +43,8 @@ COMMIT;
XA RECOVER;
formatID gtrid_length bqual_length data
1 3 0 789
1 3 0 456
1 3 0 123
1 3 0 456
XA ROLLBACK '123';
XA ROLLBACK '456';
XA COMMIT '789';
......
......@@ -120,10 +120,6 @@ where name like "wait/synch/mutex/sql/LOCK_audit_mask";
count(name)
1
select count(name) from mutex_instances
where name like "wait/synch/mutex/sql/LOCK_xid_cache";
count(name)
1
select count(name) from mutex_instances
where name like "wait/synch/mutex/sql/LOCK_plugin";
count(name)
1
......
......@@ -5,14 +5,14 @@ SELECT * FROM performance_schema.setup_instruments
WHERE name IN (
'wait/synch/mutex/sql/LOCK_user_conn',
'wait/synch/mutex/sql/LOCK_uuid_generator',
'wait/synch/mutex/sql/LOCK_xid_cache',
'wait/synch/mutex/sql/LOCK_plugin',
'stage/sql/creating table')
AND enabled = 'yes' AND timed = 'no'
ORDER BY name;
NAME ENABLED TIMED
stage/sql/creating table YES NO
wait/synch/mutex/sql/LOCK_plugin YES NO
wait/synch/mutex/sql/LOCK_user_conn YES NO
wait/synch/mutex/sql/LOCK_xid_cache YES NO
SELECT * FROM performance_schema.setup_instruments
WHERE name = 'wait/synch/mutex/sql/LOCK_thread_count'
AND enabled = 'no' AND timed = 'no';
......
......@@ -117,9 +117,6 @@ select count(name) from mutex_instances
select count(name) from mutex_instances
where name like "wait/synch/mutex/sql/LOCK_audit_mask";
select count(name) from mutex_instances
where name like "wait/synch/mutex/sql/LOCK_xid_cache";
select count(name) from mutex_instances
where name like "wait/synch/mutex/sql/LOCK_plugin";
......
......@@ -12,7 +12,7 @@
--loose-performance-schema-instrument='wait/synch/mutex/sql/LOCK_thread_count=OFF'
--loose-performance-schema-instrument=' wait/synch/mutex/sql/LOCK_user_conn = COUNTED'
--loose-performance-schema-instrument='wait%/synch/mutex/sql/LOCK_uu%_genera%/= COUNTED'
--loose-performance-schema-instrument='%%wait/synch/mutex/sql/LOCK_xid_cache=COUNTED'
--loose-performance-schema-instrument='%%wait/synch/mutex/sql/LOCK_plugin=COUNTED'
--loose-performance-schema-instrument='%=FOO'
--loose-performance-schema-instrument='%=%'
--loose-performance-schema-instrument='%'
......
......@@ -15,7 +15,7 @@ SELECT * FROM performance_schema.setup_instruments
WHERE name IN (
'wait/synch/mutex/sql/LOCK_user_conn',
'wait/synch/mutex/sql/LOCK_uuid_generator',
'wait/synch/mutex/sql/LOCK_xid_cache',
'wait/synch/mutex/sql/LOCK_plugin',
'stage/sql/creating table')
AND enabled = 'yes' AND timed = 'no'
ORDER BY name;
......
......@@ -1932,12 +1932,28 @@ int ha_recover(HASH *commit_list)
so mysql_xa_recover does not filter XID's to ensure uniqueness.
It can be easily fixed later, if necessary.
*/
+static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol)
+{
+  if (xs->xa_state == XA_PREPARED)
+  {
+    protocol->prepare_for_resend();
+    protocol->store_longlong((longlong) xs->xid.formatID, FALSE);
+    protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE);
+    protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE);
+    protocol->store(xs->xid.data, xs->xid.gtrid_length + xs->xid.bqual_length,
+                    &my_charset_bin);
+    if (protocol->write())
+      return TRUE;
+  }
+  return FALSE;
+}
bool mysql_xa_recover(THD *thd)
{
  List<Item> field_list;
  Protocol *protocol= thd->protocol;
-  int i=0;
-  XID_STATE *xs;
DBUG_ENTER("mysql_xa_recover");
field_list.push_back(new Item_int("formatID", 0, MY_INT32_NUM_DECIMAL_DIGITS));
......@@ -1949,26 +1965,9 @@ bool mysql_xa_recover(THD *thd)
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
-  mysql_mutex_lock(&LOCK_xid_cache);
-  while ((xs= (XID_STATE*) my_hash_element(&xid_cache, i++)))
-  {
-    if (xs->xa_state==XA_PREPARED)
-    {
-      protocol->prepare_for_resend();
-      protocol->store_longlong((longlong)xs->xid.formatID, FALSE);
-      protocol->store_longlong((longlong)xs->xid.gtrid_length, FALSE);
-      protocol->store_longlong((longlong)xs->xid.bqual_length, FALSE);
-      protocol->store(xs->xid.data, xs->xid.gtrid_length+xs->xid.bqual_length,
-                      &my_charset_bin);
-      if (protocol->write())
-      {
-        mysql_mutex_unlock(&LOCK_xid_cache);
+  if (xid_cache_iterate(thd, (my_hash_walk_action) xa_recover_callback,
+                        protocol))
    DBUG_RETURN(1);
-      }
-    }
-  }
-  mysql_mutex_unlock(&LOCK_xid_cache);
my_eof(thd);
DBUG_RETURN(0);
}
......
......@@ -612,11 +612,11 @@ struct xid_t {
return sizeof(formatID)+sizeof(gtrid_length)+sizeof(bqual_length)+
gtrid_length+bqual_length;
}
-  uchar *key()
+  uchar *key() const
  {
    return (uchar *)&gtrid_length;
  }
-  uint key_length()
+  uint key_length() const
{
return sizeof(gtrid_length)+sizeof(bqual_length)+gtrid_length+bqual_length;
}
......
......@@ -4889,11 +4889,7 @@ static int init_server_components()
my_charset_error_reporter= charset_error_reporter;
#endif
-  if (xid_cache_init())
-  {
-    sql_print_error("Out of memory");
-    unireg_abort(1);
-  }
+  xid_cache_init();
/*
initialize delegates for extension observers, errors have already
......
......@@ -914,7 +914,8 @@ THD::THD(bool is_wsrep_applier)
wait_for_commit_ptr(0),
main_da(0, false, false),
m_stmt_da(&main_da),
-   tdc_hash_pins(0)
+   tdc_hash_pins(0),
+   xid_hash_pins(0)
#ifdef WITH_WSREP
,
wsrep_applier(is_wsrep_applier),
......@@ -1593,7 +1594,7 @@ void THD::cleanup(void)
transaction.xid_state.xa_state= XA_NOTR;
trans_rollback(this);
-  xid_cache_delete(&transaction.xid_state);
+  xid_cache_delete(this, &transaction.xid_state);
DBUG_ASSERT(open_tables == NULL);
/*
......@@ -1704,6 +1705,8 @@ THD::~THD()
main_da.free_memory();
if (tdc_hash_pins)
lf_hash_put_pins(tdc_hash_pins);
if (xid_hash_pins)
lf_hash_put_pins(xid_hash_pins);
/* Ensure everything is freed */
if (status_var.local_memory_used != 0)
{
......@@ -5106,120 +5109,232 @@ void mark_transaction_to_rollback(THD *thd, bool all)
/***************************************************************************
Handling of XA id cacheing
***************************************************************************/
+class XID_cache_element
+{
+  /*
+    bits 1..31 are reference counter
+    bit 32 is UNINITIALIZED flag
-mysql_mutex_t LOCK_xid_cache;
-HASH xid_cache;
+    Newly allocated and deleted elements have UNINITIALIZED flag set.
-extern "C" uchar *xid_get_hash_key(const uchar *, size_t *, my_bool);
-extern "C" void xid_free_hash(void *);
+    On lock() m_state is atomically incremented. It also creates load-ACQUIRE
+    memory barrier to make sure m_state is actually updated before further
+    memory accesses. Attempting to lock UNINITIALIZED element returns failure
+    and further accesses to element memory are forbidden.
-uchar *xid_get_hash_key(const uchar *ptr, size_t *length,
-                        my_bool not_used __attribute__((unused)))
-{
-  *length=((XID_STATE*)ptr)->xid.key_length();
-  return ((XID_STATE*)ptr)->xid.key();
-}
+    On unlock() m_state is decremented. It also creates store-RELEASE memory
+    barrier to make sure m_state is actually updated after preceding memory
+    accesses.
-void xid_free_hash(void *ptr)
-{
-  if (!((XID_STATE*)ptr)->in_thd)
-    my_free(ptr);
-}
+    UNINITIALIZED flag is cleared upon successful insert.
-#ifdef HAVE_PSI_INTERFACE
-static PSI_mutex_key key_LOCK_xid_cache;
+    UNINITIALIZED flag is set before delete in a spin loop, after last reference
+    is released.
-static PSI_mutex_info all_xid_mutexes[]=
-{
-  { &key_LOCK_xid_cache, "LOCK_xid_cache", PSI_FLAG_GLOBAL}
+    Currently m_state is only used to prevent elements from being deleted while
+    XA RECOVER iterates xid cache.
+  */
+  uint32 m_state;
+  static const uint32 UNINITIALIZED= 1 << 31;
+public:
+  XID_STATE *m_xid_state;
+  bool lock()
+  {
+    if (my_atomic_add32_explicit(&m_state, 1,
+                                 MY_MEMORY_ORDER_ACQUIRE) & UNINITIALIZED)
+    {
+      unlock();
+      return false;
+    }
+    return true;
+  }
+  void unlock()
+  {
+    my_atomic_add32_explicit(&m_state, -1, MY_MEMORY_ORDER_RELEASE);
+  }
+  void mark_uninitialized()
+  {
+    uint old= 0;
+    while (!my_atomic_cas32_weak_explicit(&m_state, &old, UNINITIALIZED,
+                                          MY_MEMORY_ORDER_RELAXED,
+                                          MY_MEMORY_ORDER_RELAXED))
+    {
+      old= 0;
+      (void) LF_BACKOFF;
+    }
+  }
+  void mark_initialized()
+  {
+    DBUG_ASSERT(m_state & UNINITIALIZED);
+    my_atomic_add32_explicit(&m_state, -UNINITIALIZED, MY_MEMORY_ORDER_RELAXED);
+  }
+  static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)),
+                                  XID_cache_element *element,
+                                  XID_STATE *xid_state)
+  {
+    element->m_xid_state= xid_state;
+    xid_state->xid_cache_element= element;
+  }
+  static void lf_alloc_constructor(uchar *ptr)
+  {
+    XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
+    element->m_state= UNINITIALIZED;
+  }
+  static void lf_alloc_destructor(uchar *ptr)
+  {
+    XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
+    if (element->m_state != UNINITIALIZED)
+    {
+      DBUG_ASSERT(!element->m_xid_state->in_thd);
+      my_free(element->m_xid_state);
+    }
+  }
+  static uchar *key(const XID_cache_element *element, size_t *length,
+                    my_bool not_used __attribute__((unused)))
+  {
+    *length= element->m_xid_state->xid.key_length();
+    return element->m_xid_state->xid.key();
+  }
};
-static void init_xid_psi_keys(void)
-{
-  const char* category= "sql";
-  int count;
-  if (PSI_server == NULL)
-    return;
+static LF_HASH xid_cache;
+static bool xid_cache_inited;
-  count= array_elements(all_xid_mutexes);
-  PSI_server->register_mutex(category, all_xid_mutexes, count);
-}
-#endif /* HAVE_PSI_INTERFACE */
-bool xid_cache_init()
+bool THD::fix_xid_hash_pins()
{
-#ifdef HAVE_PSI_INTERFACE
-  init_xid_psi_keys();
-#endif
+  if (!xid_hash_pins)
+    xid_hash_pins= lf_hash_get_pins(&xid_cache);
+  return !xid_hash_pins;
+}
-  mysql_mutex_init(key_LOCK_xid_cache, &LOCK_xid_cache, MY_MUTEX_INIT_FAST);
-  return my_hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
-                      xid_get_hash_key, xid_free_hash, 0) != 0;
+void xid_cache_init()
+{
+  xid_cache_inited= true;
+  lf_hash_init(&xid_cache, sizeof(XID_cache_element), LF_HASH_UNIQUE, 0, 0,
+               (my_hash_get_key) XID_cache_element::key, &my_charset_bin);
+  xid_cache.alloc.constructor= XID_cache_element::lf_alloc_constructor;
+  xid_cache.alloc.destructor= XID_cache_element::lf_alloc_destructor;
+  xid_cache.initializer=
+    (lf_hash_initializer) XID_cache_element::lf_hash_initializer;
}
void xid_cache_free()
{
-  if (my_hash_inited(&xid_cache))
+  if (xid_cache_inited)
  {
-    my_hash_free(&xid_cache);
-    mysql_mutex_destroy(&LOCK_xid_cache);
+    lf_hash_destroy(&xid_cache);
+    xid_cache_inited= false;
  }
}
-XID_STATE *xid_cache_search(XID *xid)
+XID_STATE *xid_cache_search(THD *thd, XID *xid)
{
-  mysql_mutex_lock(&LOCK_xid_cache);
-  XID_STATE *res=(XID_STATE *)my_hash_search(&xid_cache, xid->key(),
-                                             xid->key_length());
-  mysql_mutex_unlock(&LOCK_xid_cache);
-  return res;
+  DBUG_ASSERT(thd->xid_hash_pins);
+  XID_cache_element *element=
+    (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
+                                        xid->key(), xid->key_length());
+  if (element)
+  {
+    lf_hash_search_unpin(thd->xid_hash_pins);
+    return element->m_xid_state;
+  }
+  return 0;
}
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
{
  XID_STATE *xs;
-  my_bool res;
-  mysql_mutex_lock(&LOCK_xid_cache);
-  if (my_hash_search(&xid_cache, xid->key(), xid->key_length()))
-    res=0;
-  else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
-    res=1;
-  else
+  LF_PINS *pins;
+  int res= 1;
+  if (!(pins= lf_hash_get_pins(&xid_cache)))
+    return true;
+  if ((xs= (XID_STATE*) my_malloc(sizeof(*xs), MYF(MY_WME))))
  {
    xs->xa_state=xa_state;
    xs->xid.set(xid);
    xs->in_thd=0;
    xs->rm_error=0;
-    res=my_hash_insert(&xid_cache, (uchar*)xs);
+    if ((res= lf_hash_insert(&xid_cache, pins, xs)))
+      my_free(xs);
+    else
+      xs->xid_cache_element->mark_initialized();
+    if (res == 1)
+      res= 0;
  }
-  mysql_mutex_unlock(&LOCK_xid_cache);
+  lf_hash_put_pins(pins);
  return res;
}
-bool xid_cache_insert(XID_STATE *xid_state)
+bool xid_cache_insert(THD *thd, XID_STATE *xid_state)
{
-  mysql_mutex_lock(&LOCK_xid_cache);
-  if (my_hash_search(&xid_cache, xid_state->xid.key(),
-                     xid_state->xid.key_length()))
+  if (thd->fix_xid_hash_pins())
+    return true;
+  int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, xid_state);
+  switch (res)
  {
-    mysql_mutex_unlock(&LOCK_xid_cache);
+  case 0:
+    xid_state->xid_cache_element->mark_initialized();
+    break;
+  case 1:
    my_error(ER_XAER_DUPID, MYF(0));
-    return true;
+  default:
+    xid_state->xid_cache_element= 0;
  }
-  bool res= my_hash_insert(&xid_cache, (uchar*)xid_state);
-  mysql_mutex_unlock(&LOCK_xid_cache);
  return res;
}
-void xid_cache_delete(XID_STATE *xid_state)
+void xid_cache_delete(THD *thd, XID_STATE *xid_state)
{
+  if (xid_state->xid_cache_element)
+  {
+    DBUG_ASSERT(thd->xid_hash_pins);
+    xid_state->xid_cache_element->mark_uninitialized();
+    lf_hash_delete(&xid_cache, thd->xid_hash_pins,
+                   xid_state->xid.key(), xid_state->xid.key_length());
+    xid_state->xid_cache_element= 0;
+    if (!xid_state->in_thd)
+      my_free(xid_state);
+  }
+}
+struct xid_cache_iterate_arg
+{
+  my_hash_walk_action action;
+  void *argument;
+};
+static my_bool xid_cache_iterate_callback(XID_cache_element *element,
+                                          xid_cache_iterate_arg *arg)
+{
+  my_bool res= FALSE;
+  if (element->lock())
+  {
+    res= arg->action(element->m_xid_state, arg->argument);
+    element->unlock();
+  }
+  return res;
+}
+int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg)
+{
-  mysql_mutex_lock(&LOCK_xid_cache);
-  my_hash_delete(&xid_cache, (uchar *)xid_state);
-  mysql_mutex_unlock(&LOCK_xid_cache);
+  xid_cache_iterate_arg argument= { action, arg };
+  return thd->fix_xid_hash_pins() ? -1 :
+         lf_hash_iterate(&xid_cache, thd->xid_hash_pins,
+                         (my_hash_walk_action) xid_cache_iterate_callback,
+                         &argument);
}
......
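For readers unfamiliar with the m_state protocol documented in the XID_cache_element comment above, here is a simplified, self-contained model of it using standard C++ atomics. This is an illustration only, not the commit's code (which uses MariaDB's my_atomic_* wrappers and LF_BACKOFF, not std::atomic), and the type name element_state is hypothetical: bits 1..31 count concurrent readers, the top bit marks an element that is not yet inserted or is about to be deleted.

#include <atomic>
#include <cstdint>

struct element_state                      // hypothetical stand-in for XID_cache_element::m_state
{
  static constexpr uint32_t UNINITIALIZED= 1u << 31;
  std::atomic<uint32_t> state{UNINITIALIZED};

  bool lock()                             // reader side, e.g. the XA RECOVER iterator
  {
    // fetch_add returns the previous value; acquire keeps later reads after it
    if (state.fetch_add(1, std::memory_order_acquire) & UNINITIALIZED)
    {
      unlock();                           // element not usable: undo the increment and fail
      return false;
    }
    return true;
  }
  void unlock()
  {
    state.fetch_sub(1, std::memory_order_release);
  }
  void mark_initialized()                 // called once after a successful insert
  {
    state.fetch_sub(UNINITIALIZED, std::memory_order_relaxed);
  }
  void mark_uninitialized()               // called before delete: wait out the readers
  {
    uint32_t expected= 0;
    while (!state.compare_exchange_weak(expected, UNINITIALIZED,
                                        std::memory_order_relaxed))
      expected= 0;                        // spin until the reference count drops to zero
  }
};

Compared with this sketch, the real mark_uninitialized() also calls LF_BACKOFF inside the spin loop, and lf_alloc_destructor() uses the flag to decide whether the element still owns a heap-allocated XID_STATE that must be freed.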
......@@ -1119,6 +1119,7 @@ struct st_savepoint {
enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY};
extern const char *xa_state_names[];
class XID_cache_element;
typedef struct st_xid_state {
/* For now, this is only used to catch duplicated external xids */
......@@ -1127,16 +1128,16 @@ typedef struct st_xid_state {
bool in_thd;
/* Error reported by the Resource Manager (RM) to the Transaction Manager. */
uint rm_error;
XID_cache_element *xid_cache_element;
} XID_STATE;
-extern mysql_mutex_t LOCK_xid_cache;
-extern HASH xid_cache;
-bool xid_cache_init(void);
+void xid_cache_init(void);
void xid_cache_free(void);
-XID_STATE *xid_cache_search(XID *xid);
+XID_STATE *xid_cache_search(THD *thd, XID *xid);
bool xid_cache_insert(XID *xid, enum xa_states xa_state);
-bool xid_cache_insert(XID_STATE *xid_state);
-void xid_cache_delete(XID_STATE *xid_state);
+bool xid_cache_insert(THD *thd, XID_STATE *xid_state);
+void xid_cache_delete(THD *thd, XID_STATE *xid_state);
+int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *argument);
/**
@class Security_context
......@@ -3800,6 +3801,8 @@ public:
}
LF_PINS *tdc_hash_pins;
LF_PINS *xid_hash_pins;
bool fix_xid_hash_pins();
inline ulong wsrep_binlog_format() const
{
......
......@@ -738,7 +738,7 @@ bool trans_xa_start(THD *thd)
thd->transaction.xid_state.xa_state= XA_ACTIVE;
thd->transaction.xid_state.rm_error= 0;
thd->transaction.xid_state.xid.set(thd->lex->xid);
-if (xid_cache_insert(&thd->transaction.xid_state))
+if (xid_cache_insert(thd, &thd->transaction.xid_state))
{
thd->transaction.xid_state.xa_state= XA_NOTR;
thd->transaction.xid_state.xid.null();
......@@ -801,7 +801,7 @@ bool trans_xa_prepare(THD *thd)
my_error(ER_XAER_NOTA, MYF(0));
else if (ha_prepare(thd))
{
-xid_cache_delete(&thd->transaction.xid_state);
+xid_cache_delete(thd, &thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
my_error(ER_XA_RBROLLBACK, MYF(0));
}
......@@ -830,6 +830,11 @@ bool trans_xa_commit(THD *thd)
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
+if (thd->fix_xid_hash_pins())
+{
+my_error(ER_OUT_OF_RESOURCES, MYF(0));
+DBUG_RETURN(TRUE);
+}
/*
xid_state.in_thd is always true beside of xa recovery procedure.
Note, that there is no race condition here between xid_cache_search
......@@ -840,7 +845,7 @@ bool trans_xa_commit(THD *thd)
xa_cache_insert(XID, xa_states), which is called before starting
client connections, and thus is always single-threaded.
*/
-XID_STATE *xs= xid_cache_search(thd->lex->xid);
+XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
res= !xs || xs->in_thd;
if (res)
my_error(ER_XAER_NOTA, MYF(0));
......@@ -848,7 +853,7 @@ bool trans_xa_commit(THD *thd)
{
res= xa_trans_rolled_back(xs);
ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
-xid_cache_delete(xs);
+xid_cache_delete(thd, xs);
}
DBUG_RETURN(res);
}
......@@ -911,7 +916,7 @@ bool trans_xa_commit(THD *thd)
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
-xid_cache_delete(&thd->transaction.xid_state);
+xid_cache_delete(thd, &thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
DBUG_RETURN(res);
......@@ -935,14 +940,20 @@ bool trans_xa_rollback(THD *thd)
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
-XID_STATE *xs= xid_cache_search(thd->lex->xid);
+if (thd->fix_xid_hash_pins())
+{
+my_error(ER_OUT_OF_RESOURCES, MYF(0));
+DBUG_RETURN(TRUE);
+}
+XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
if (!xs || xs->in_thd)
my_error(ER_XAER_NOTA, MYF(0));
else
{
xa_trans_rolled_back(xs);
ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
-xid_cache_delete(xs);
+xid_cache_delete(thd, xs);
}
DBUG_RETURN(thd->get_stmt_da()->is_error());
}
......@@ -961,7 +972,7 @@ bool trans_xa_rollback(THD *thd)
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
-xid_cache_delete(&thd->transaction.xid_state);
+xid_cache_delete(thd, &thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
DBUG_RETURN(res);
......