Commit f9ecaa87 authored by Kristian Nielsen's avatar Kristian Nielsen Committed by Andrei

MDEV-33668: Refactor parallel replication round-robin scheduling to use explicit FIFO

This is a preparatory patch to facilitate the next commit to improve
the scheduling of XA transactions in parallel replication.

When choosing the scheduling bucket for the next event group in
rpl_parallel_entry::choose_thread(), use an explicit FIFO for the
round-robin selection instead of a simple cyclic counter i := (i+1) % N.

This allows to schedule XA COMMIT/ROLLBACK dependencies explicitly without
changing the round-robin scheduling of other event groups.
Reviewed-by: default avatarAndrei Elkin <andrei.elkin@mariadb.com>
Signed-off-by: default avatarKristian Nielsen <knielsen@knielsen-hq.org>
parent 89c907bd
...@@ -2357,33 +2357,38 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, ...@@ -2357,33 +2357,38 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
PSI_stage_info *old_stage, PSI_stage_info *old_stage,
Gtid_log_event *gtid_ev) Gtid_log_event *gtid_ev)
{ {
uint32 idx; sched_bucket *cur_thr;
Relay_log_info *rli= rgi->rli; Relay_log_info *rli= rgi->rli;
rpl_parallel_thread *thr; rpl_parallel_thread *thr;
idx= rpl_thread_idx;
if (gtid_ev) if (gtid_ev)
{ {
/* New event group; cycle the thread scheduling buckets round-robin. */
thread_sched_fifo->push_back(thread_sched_fifo->get());
if (gtid_ev->flags2 & if (gtid_ev->flags2 &
(Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
gtid_ev->xid.key_length()) % rpl_thread_max;
else
{ {
++idx; /*
if (idx >= rpl_thread_max) For XA COMMIT/ROLLBACK, choose the same bucket as the XA PREPARE,
idx= 0; overriding the round-robin scheduling.
*/
uint32 idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
gtid_ev->xid.key_length()) % rpl_thread_max;
rpl_threads[idx].unlink();
thread_sched_fifo->append(rpl_threads + idx);
} }
rpl_thread_idx= idx;
} }
thr= rpl_threads[idx]; cur_thr= thread_sched_fifo->head();
thr= cur_thr->thr;
if (thr) if (thr)
{ {
*did_enter_cond= false; *did_enter_cond= false;
mysql_mutex_lock(&thr->LOCK_rpl_thread); mysql_mutex_lock(&thr->LOCK_rpl_thread);
for (;;) for (;;)
{ {
if (thr->current_owner != &rpl_threads[idx]) if (thr->current_owner != &cur_thr->thr)
{ {
/* /*
The worker thread became idle, and returned to the free list and The worker thread became idle, and returned to the free list and
...@@ -2450,8 +2455,8 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, ...@@ -2450,8 +2455,8 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
} }
} }
if (!thr) if (!thr)
rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx], cur_thr->thr= thr=
this); global_rpl_thread_pool.get_thread(&cur_thr->thr, this);
return thr; return thr;
} }
...@@ -2508,15 +2513,20 @@ rpl_parallel::find(uint32 domain_id) ...@@ -2508,15 +2513,20 @@ rpl_parallel::find(uint32 domain_id)
ulong count= opt_slave_domain_parallel_threads; ulong count= opt_slave_domain_parallel_threads;
if (count == 0 || count > opt_slave_parallel_threads) if (count == 0 || count > opt_slave_parallel_threads)
count= opt_slave_parallel_threads; count= opt_slave_parallel_threads;
rpl_parallel_thread **p; rpl_parallel_entry::sched_bucket *p;
I_List<rpl_parallel_entry::sched_bucket> *fifo;
if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME|MY_ZEROFILL), if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME|MY_ZEROFILL),
&e, sizeof(*e), &e, sizeof(*e),
&p, count*sizeof(*p), &p, count*sizeof(*p),
&fifo, sizeof(*fifo),
NULL)) NULL))
{ {
my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p))); my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
return NULL; return NULL;
} }
e->thread_sched_fifo = new (fifo) I_List<rpl_parallel_entry::sched_bucket>;
for (ulong i= 0; i < count; ++i)
e->thread_sched_fifo->push_back(::new (p+i) rpl_parallel_entry::sched_bucket);
e->rpl_threads= p; e->rpl_threads= p;
e->rpl_thread_max= count; e->rpl_thread_max= count;
e->domain_id= domain_id; e->domain_id= domain_id;
...@@ -2582,10 +2592,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) ...@@ -2582,10 +2592,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
mysql_mutex_unlock(&e->LOCK_parallel_entry); mysql_mutex_unlock(&e->LOCK_parallel_entry);
for (j= 0; j < e->rpl_thread_max; ++j) for (j= 0; j < e->rpl_thread_max; ++j)
{ {
if ((rpt= e->rpl_threads[j])) if ((rpt= e->rpl_threads[j].thr))
{ {
mysql_mutex_lock(&rpt->LOCK_rpl_thread); mysql_mutex_lock(&rpt->LOCK_rpl_thread);
if (rpt->current_owner == &e->rpl_threads[j]) if (rpt->current_owner == &e->rpl_threads[j].thr)
mysql_cond_signal(&rpt->COND_rpl_thread); mysql_cond_signal(&rpt->COND_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
} }
...@@ -2605,10 +2615,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) ...@@ -2605,10 +2615,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
for (j= 0; j < e->rpl_thread_max; ++j) for (j= 0; j < e->rpl_thread_max; ++j)
{ {
if ((rpt= e->rpl_threads[j])) if ((rpt= e->rpl_threads[j].thr))
{ {
mysql_mutex_lock(&rpt->LOCK_rpl_thread); mysql_mutex_lock(&rpt->LOCK_rpl_thread);
while (rpt->current_owner == &e->rpl_threads[j]) while (rpt->current_owner == &e->rpl_threads[j].thr)
mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread); mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
} }
...@@ -2655,7 +2665,7 @@ int ...@@ -2655,7 +2665,7 @@ int
rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
Format_description_log_event *fdev) Format_description_log_event *fdev)
{ {
uint32 idx; sched_bucket *cur_thr;
rpl_parallel_thread *thr; rpl_parallel_thread *thr;
rpl_parallel_thread::queued_event *qev; rpl_parallel_thread::queued_event *qev;
Relay_log_info *rli= rgi->rli; Relay_log_info *rli= rgi->rli;
...@@ -2670,12 +2680,12 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi, ...@@ -2670,12 +2680,12 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
Thus there is no need for the full complexity of choose_thread(). We only Thus there is no need for the full complexity of choose_thread(). We only
need to check if we have a current worker thread, and queue for it if so. need to check if we have a current worker thread, and queue for it if so.
*/ */
idx= rpl_thread_idx; cur_thr= thread_sched_fifo->head();
thr= rpl_threads[idx]; thr= cur_thr->thr;
if (!thr) if (!thr)
return 0; return 0;
mysql_mutex_lock(&thr->LOCK_rpl_thread); mysql_mutex_lock(&thr->LOCK_rpl_thread);
if (thr->current_owner != &rpl_threads[idx]) if (thr->current_owner != &cur_thr->thr)
{ {
/* No active worker thread, so no need to queue the master restart. */ /* No active worker thread, so no need to queue the master restart. */
mysql_mutex_unlock(&thr->LOCK_rpl_thread); mysql_mutex_unlock(&thr->LOCK_rpl_thread);
......
...@@ -326,6 +326,11 @@ struct rpl_parallel_thread_pool { ...@@ -326,6 +326,11 @@ struct rpl_parallel_thread_pool {
struct rpl_parallel_entry { struct rpl_parallel_entry {
struct sched_bucket : public ilink {
sched_bucket() : thr(nullptr) { }
rpl_parallel_thread *thr;
};
mysql_mutex_t LOCK_parallel_entry; mysql_mutex_t LOCK_parallel_entry;
mysql_cond_t COND_parallel_entry; mysql_cond_t COND_parallel_entry;
uint32 domain_id; uint32 domain_id;
...@@ -355,17 +360,19 @@ struct rpl_parallel_entry { ...@@ -355,17 +360,19 @@ struct rpl_parallel_entry {
uint64 stop_sub_id; uint64 stop_sub_id;
/* /*
Cyclic array recording the last rpl_thread_max worker threads that we Array recording the last rpl_thread_max worker threads that we
queued event for. This is used to limit how many workers a single domain queued event for. This is used to limit how many workers a single domain
can occupy (--slave-domain-parallel-threads). can occupy (--slave-domain-parallel-threads).
The array is structured as a FIFO using an I_List thread_sched_fifo.
Note that workers are never explicitly deleted from the array. Instead, Note that workers are never explicitly deleted from the array. Instead,
we need to check (under LOCK_rpl_thread) that the thread still belongs we need to check (under LOCK_rpl_thread) that the thread still belongs
to us before re-using (rpl_thread::current_owner). to us before re-using (rpl_thread::current_owner).
*/ */
rpl_parallel_thread **rpl_threads; sched_bucket *rpl_threads;
I_List<sched_bucket> *thread_sched_fifo;
uint32 rpl_thread_max; uint32 rpl_thread_max;
uint32 rpl_thread_idx;
/* /*
The sub_id of the last transaction to commit within this domain_id. The sub_id of the last transaction to commit within this domain_id.
Must be accessed under LOCK_parallel_entry protection. Must be accessed under LOCK_parallel_entry protection.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment