Commit 6253f0dc authored by Kristian Nielsen

MDEV-33668: More efficient XA dependency tracking in SQL driver thread

Avoid a linear scan over all recently queued XIDs in the SQL driver thread,
which can be expensive in XA-heavy workloads with a large number of
parallel replication worker threads.

Instead keep a hash in the rpl_parallel_entry of where recently queued XIDs
were scheduled. This allows direct lookup of any potential scheduling
dependency.
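
For illustration, a minimal sketch of the lookup idea (assumptions: a
std::unordered_map and a serialized XID string stand in for the server's
HASH/my_hash_* API and xid_t key; the names xid_entry and probe are
hypothetical):

```cpp
// Sketch only: std::unordered_map stands in for the server's HASH, and a
// serialized XID string stands in for xid_t::key()/key_length().
#include <cstdint>
#include <string>
#include <unordered_map>

struct sched_bucket;                     // scheduling slot for one worker

struct xid_entry {
  uint64_t generation;                   // generation when last scheduled
  uint64_t next_sched_sub_id;            // sub_id queued after the XID, or 0
  sched_bucket *bucket;                  // bucket the XID was last sent to
};

// One hash per replication domain, keyed by the XID bytes.
using xid_hash= std::unordered_map<std::string, xid_entry>;

// O(1) dependency probe, replacing the old linear scan over all
// recently queued XIDs.
inline sched_bucket *probe(xid_hash &h, const std::string &xid_key)
{
  auto it= h.find(xid_key);
  return it == h.end() ? nullptr : it->second.bucket;
}
```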

Keep a list in each scheduling bucket of recently queued XIDs, and purge the
list (based on generations) when queueing the next XA transaction.
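
A sketch of the generation-based purge, under the same caveat (std::list
stands in for the server's intrusive I_List; purge_old_xids is a
hypothetical name):

```cpp
// Sketch of the generation-based purge. Entries whose generation is at
// least 3 behind the current one can no longer imply a dependency, so
// they are dropped from the front of the bucket's (oldest-first) list.
#include <cstdint>
#include <list>

struct xid_entry { uint64_t generation; };

struct bucket {
  std::list<xid_entry> xids;             // oldest entries at the front
};

inline void purge_old_xids(bucket &b, uint64_t cur_gen)
{
  while (!b.xids.empty() && b.xids.front().generation + 3 <= cur_gen)
    b.xids.pop_front();                  // bounded work per enqueue
}
```

Since each bucket's list is appended in generation order, the purge only
ever inspects the head, so the work per queued XA event stays bounded.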

Also implement a more fine-grained dependency check based on sub_id
comparison. This can sometimes avoid a scheduling dependency that would
otherwise look necessary based solely on the generation check.
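
The sub_id rule can be sketched as follows (hypothetical names; the real
check lives in check_xa_xid_dependency in the diff below):

```cpp
// If some event group E1 was queued after the XID on its old bucket
// (recorded as next_sched_sub_id), and the candidate bucket has already
// queued an E2 with a higher sub_id, then E2 commits after E1, and E1
// commits only after the XID finished. Scheduling the XID on the
// candidate bucket is then safe despite the generation check.
#include <cstdint>

struct xid_entry {
  uint64_t next_sched_sub_id;            // 0 while nothing was queued after
};

struct bucket {
  uint64_t last_sched_sub_id;            // highest sub_id queued here
};

inline bool needs_same_bucket(const xid_entry &x, const bucket &candidate)
{
  return !(x.next_sched_sub_id != 0 &&
           x.next_sched_sub_id < candidate.last_sched_sub_id);
}
```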
Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
parent 50472b2b
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -2326,13 +2326,52 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
 }
 
+static uchar *
+xa_xid_hash_key_func(const uchar *element, size_t *length,
+                     my_bool not_used __attribute__((unused)))
+{
+  const rpl_parallel_entry::xid_active_generation *x=
+    (const rpl_parallel_entry::xid_active_generation *)element;
+  *length= x->xid.key_length();
+  return x->xid.key();
+}
+
+
+static void
+free_xa_xid_entry(void *element)
+{
+  const rpl_parallel_entry::xid_active_generation *x=
+    (const rpl_parallel_entry::xid_active_generation *)element;
+  delete x;
+}
+
+
+/*
+  Record currently scheduled gtid_sub_id. Used for dependency tracking for
+  external XA XIDs.
+*/
+void
+rpl_parallel_entry::sched_bucket::update_sub_id_for_xa_xid(uint64 sub_id)
+{
+  last_sched_sub_id= sub_id;
+  /*
+    For the last scheduled XID, if any, record the next scheduled sub_id. This
+    can be used for XID dependency tracking. If we already waited for that
+    sub_id to commit, then we can be sure that the processing of the XID is
+    completely done.
+  */
+  if (last_xid)
+  {
+    last_xid->next_sched_sub_id= sub_id;
+    last_xid= nullptr;
+  }
+}
+
+
 rpl_parallel_entry::sched_bucket *
-rpl_parallel_entry::check_xa_xid_dependency(xid_t *xid)
+rpl_parallel_entry::check_xa_xid_dependency(xid_t *xid, sched_bucket *bucket)
 {
   uint64 cur_gen= current_generation;
-  my_off_t i= 0;
-  while (i < maybe_active_xid.elements)
-  {
   /*
     Purge no longer active XID from the list:
@@ -2347,28 +2386,79 @@ rpl_parallel_entry::check_xa_xid_dependency(xid_t *xid)
     Therefore, XID can be safely scheduled to a different worker in
     generation (N+3) when last prior use was in generation N (or earlier).
   */
-    xid_active_generation *a=
-      dynamic_element(&maybe_active_xid, i, xid_active_generation *);
-    if (a->generation + 3 <= cur_gen)
-    {
-      *a= *((xid_active_generation *)pop_dynamic(&maybe_active_xid));
-      continue;
-    }
-    if (xid->eq(&a->xid))
-    {
-      /* Update the last used generation and return the match. */
-      a->generation= cur_gen;
-      return a->thr;
-    }
-    ++i;
-  }
-
-  /* try to keep allocated memory in the range of [2,10] * initial_chunk_size */
-  if (maybe_active_xid.elements <= 2 * active_xid_init_alloc() &&
-      maybe_active_xid.max_element > 10 * active_xid_init_alloc())
-    freeze_size(&maybe_active_xid);
-
-  /* No matching XID conflicts. */
-  return nullptr;
+  while (!bucket->xids.is_empty() &&
+         bucket->xids.head()->generation + 3 <= cur_gen)
+  {
+    /*
+      Mustn't try to assign the next_sched_sub_id after deleting the object!
+      This will not be possible, because deletion happens here only after 3
+      generations have passed, and then a sub_id will have been scheduled and
+      the last_xid pointer cleared.
+    */
+    DBUG_ASSERT(bucket->last_xid != bucket->xids.head());
+    my_hash_delete(&xa_xid_hash, (uchar *)bucket->xids.get());
+  }
+
+  xid_active_generation *x= (xid_active_generation *)
+    my_hash_search(&xa_xid_hash, xid->key(), xid->key_length());
+  if (!x)
+    return nullptr;
+
+  /*
+    Purge no longer active XIDs that no longer require a scheduling dependency.
+    (We also purged the passed-in bucket above; if we had not, then we would
+    only purge on successful lookup in the hash, and old XIDs could easily
+    accumulate without a reasonable bound. This way we are sure to purge old
+    generations before inserting a new XID in a bucket list.)
+  */
+  bool dropped= false;
+  sched_bucket *b= x->thr;
+  while (!b->xids.is_empty() &&
+         b->xids.head()->generation + 3 <= cur_gen)
+  {
+    xid_active_generation *y= b->xids.get();
+    dropped= (y == x) || dropped;
+    DBUG_ASSERT(b->last_xid != y);
+    my_hash_delete(&xa_xid_hash, (uchar *)y);
+  }
+  if (dropped)
+    return nullptr;
+
+  /*
+    The XID was last scheduled to bucket `b`. See if we can safely schedule it
+    into `bucket` anyway.
+
+    Suppose `b` scheduled some event group E1 after the XID, and suppose
+    further that `bucket` scheduled some other event group E2 with a higher
+    sub_id than E1. Then E2 can only commit _after_ E1, and E1 can only commit
+    after the XID processing in `b` was complete. Thus, we can safely schedule
+    the same XID on `bucket` after E2 in this case.
+  */
+  if (x->next_sched_sub_id != 0 &&
+      x->next_sched_sub_id < bucket->last_sched_sub_id)
+  {
+    /*
+      Mustn't try to assign the next_sched_sub_id after deleting the object!
+      This is not possible, because next_sched_sub_id is only assigned once,
+      then the last_xid pointer is cleared.
+    */
+    DBUG_ASSERT(b->last_xid != x);
+    my_hash_delete(&xa_xid_hash, (uchar *)x);
+    return nullptr;
+  }
+
+  /*
+    We have to schedule this event group on the same bucket as last event group
+    with the same XID. Put the XID at the end of the list.
+  */
+  x->unlink();
+  x->generation= cur_gen;
+  x->next_sched_sub_id= 0;
+  b->xids.push_back(x);
+  b->last_xid= x;
+  return b;
 }
@@ -2399,7 +2489,7 @@ rpl_parallel_entry::check_xa_xid_dependency(xid_t *xid)
   A worker for XA transaction is determined through xid hashing which
   ensure for a XA-complete to be scheduled to the same-xid XA-prepare worker.
 */
-rpl_parallel_thread *
+rpl_parallel_entry::sched_bucket *
 rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
                                   PSI_stage_info *old_stage,
                                   Gtid_log_event *gtid_ev)
@@ -2415,16 +2505,18 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
   sched_bucket *fifo_head= thread_sched_fifo->head();
   /*
-    When the next scheduling bucket is marked with the current generation
-    number, it means that _all_ buckets were now scheduled in this generation,
-    and we should move to the next one. */
+    All buckets are moved to the back of the FIFO after being scheduled.
+    Therefore, once we see one from the current generation at the front of
+    the FIFO again, we know all have been scheduled, and a new generation
+    can be started.
+  */
   if (fifo_head->last_gen == current_generation)
     ++current_generation;
   if (gtid_ev->flags2 &
       (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
   {
-    if ((cur_thr= check_xa_xid_dependency(&gtid_ev->xid)))
+    if ((cur_thr= check_xa_xid_dependency(&gtid_ev->xid, fifo_head)))
     {
       /*
         A previously scheduled event group with the same XID might still be
@@ -2438,12 +2530,20 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
     {
       /* Record this XID now active. */
       xid_active_generation *a=
-        (xid_active_generation *)alloc_dynamic(&maybe_active_xid);
+        new xid_active_generation(&gtid_ev->xid, fifo_head, current_generation);
       if (!a)
-        return NULL;
-      a->thr= cur_thr= fifo_head;
-      a->generation= current_generation;
-      a->xid.set(&gtid_ev->xid);
+      {
+        my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*a)));
+        return NULL;
+      }
+      if (my_hash_insert(&xa_xid_hash, (uchar *)a))
+      {
+        delete a;
+        return NULL;
+      }
+      cur_thr= fifo_head;
+      cur_thr->xids.push_back(a);
+      cur_thr->last_xid= a;
     }
   }
   else
@@ -2532,7 +2632,7 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
   cur_thr->thr= thr=
     global_rpl_thread_pool.get_thread(&cur_thr->thr, this);
-  return thr;
+  return (thr ? cur_thr : NULL);
 }
 
 static void
@@ -2545,7 +2645,7 @@ free_rpl_parallel_entry(void *element)
     dealloc_gco(e->current_gco);
     e->current_gco= prev_gco;
   }
-  delete_dynamic(&e->maybe_active_xid);
+  my_hash_free(&e->xa_xid_hash);
   mysql_cond_destroy(&e->COND_parallel_entry);
   mysql_mutex_destroy(&e->LOCK_parallel_entry);
   my_free(e);
@@ -2615,9 +2715,8 @@ rpl_parallel::find(uint32 domain_id)
     e->rpl_threads= p;
     e->rpl_thread_max= count;
     e->current_generation = 0;
-    init_dynamic_array2(PSI_INSTRUMENT_ME, &e->maybe_active_xid,
-                        sizeof(rpl_parallel_entry::xid_active_generation),
-                        0, e->active_xid_init_alloc(), 0, MYF(0));
+    my_hash_init(PSI_INSTRUMENT_ME, &e->xa_xid_hash, &my_charset_bin, 32,
+                 0, 0, xa_xid_hash_key_func, free_xa_xid_entry, HASH_UNIQUE);
     e->domain_id= domain_id;
     e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
     e->pause_sub_id= (uint64)ULONGLONG_MAX;
@@ -3081,14 +3180,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
       commit). But do not exceed a limit of --slave-domain-parallel-threads;
       instead re-use a thread that we queued for previously.
     */
-    cur_thread=
+    rpl_parallel_entry::sched_bucket *cur_sched=
       e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, gtid_ev);
-    if (!cur_thread)
+    if (!cur_sched)
     {
       /* This means we were killed. The error is already signalled. */
       delete ev;
       return 1;
     }
+    cur_thread= cur_sched->thr;
 
     if (!(qev= cur_thread->get_qev(ev, event_size, rli)))
     {
@@ -3115,6 +3215,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
       delete ev;
       return 1;
     }
+    cur_sched->update_sub_id_for_xa_xid(rgi->gtid_sub_id);
 
     /*
       We queue the event group in a new worker thread, to run in parallel
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -326,16 +326,6 @@ struct rpl_parallel_thread_pool {
 
 struct rpl_parallel_entry {
-  /*
-    A small struct to put worker threads references into a FIFO (using an
-    I_List) for round-robin scheduling.
-  */
-  struct sched_bucket : public ilink {
-    sched_bucket() : last_gen(0), thr(nullptr) { }
-    /* Generation this bucket was last scheduled in. */
-    uint64 last_gen;
-    rpl_parallel_thread *thr;
-  };
   /*
     A struct to keep track of into which "generation" an XA XID was last
     scheduled. A "generation" means that we know that every worker thread
@@ -343,11 +333,40 @@ struct rpl_parallel_entry {
     that two generations have passed, we can safely reuse the XID in a
     different worker.
   */
-  struct xid_active_generation {
+  struct sched_bucket;
+  struct xid_active_generation : public ilink {
+    xid_active_generation(xid_t *xid_, sched_bucket *thr_, uint64 gen) :
+      generation(gen), next_sched_sub_id(0), thr(thr_) { xid.set(xid_); }
     uint64 generation;
+    /*
+      The sub_id of the event group scheduled _after_ this XID on the same
+      scheduling bucket, if any; else 0.
+    */
+    uint64 next_sched_sub_id;
     sched_bucket *thr;
     xid_t xid;
   };
+
+  /*
+    A struct to put worker threads references into a FIFO (using an
+    I_List) for round-robin scheduling.
+  */
+  struct sched_bucket : public ilink {
+    sched_bucket()
+      : last_gen(0), last_sched_sub_id(0), thr(nullptr), last_xid(nullptr)
+    { }
+    void update_sub_id_for_xa_xid(uint64 sub_id);
+    /* Generation this bucket was last scheduled in. */
+    uint64 last_gen;
+    /* Last sub_id scheduled on this bucket, for XA XID dependency tracking. */
+    uint64_t last_sched_sub_id;
+    /* List of XIDs that were recently scheduled in this bucket. */
+    I_List<xid_active_generation> xids;
+    /* The worker thread last assigned to this scheduling bucket. */
+    rpl_parallel_thread *thr;
+    xid_active_generation *last_xid;
+  };
 
   mysql_mutex_t LOCK_parallel_entry;
   mysql_cond_t COND_parallel_entry;
@@ -392,10 +411,10 @@ struct rpl_parallel_entry {
   I_List<sched_bucket> *thread_sched_fifo;
   uint32 rpl_thread_max;
   /*
-    Keep track of all XA XIDs that may still be active in a worker thread.
-    The elements are of type xid_active_generation.
+    Hash mapping external XA XID to the corresponding xid_active_generation
+    object, for dependency tracking.
   */
-  DYNAMIC_ARRAY maybe_active_xid;
+  HASH xa_xid_hash;
   /*
     Keeping track of the current scheduling generation.
@@ -457,18 +476,12 @@ struct rpl_parallel_entry {
   /* The group_commit_orderer object for the events currently being queued. */
   group_commit_orderer *current_gco;
-  sched_bucket *check_xa_xid_dependency(xid_t *xid);
-  rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
-                                      PSI_stage_info *old_stage,
-                                      Gtid_log_event *gtid_ev);
+  sched_bucket *check_xa_xid_dependency(xid_t *xid, sched_bucket *bucket);
+  sched_bucket *choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
+                              PSI_stage_info *old_stage,
+                              Gtid_log_event *gtid_ev);
   int queue_master_restart(rpl_group_info *rgi,
                            Format_description_log_event *fdev);
-  /*
-    the initial size of maybe_ array corresponds to the case of
-    each worker receives perhaps unlikely XA-PREPARE and XA-COMMIT within
-    the same generation.
-  */
-  inline uint active_xid_init_alloc() { return 3 * 2 * rpl_thread_max; }
 };
 
 struct rpl_parallel {
   HASH domain_hash;