Commit d90a2b44 authored by Kristian Nielsen's avatar Kristian Nielsen Committed by Andrei

MDEV-33668: More precise dependency tracking of XA XID in parallel replication

Keep track of each recently active XID, recording which worker it was queued
on. If an XID might still be active, choose the same worker to queue event
groups that refer to the same XID to avoid conflicts.

Otherwise, schedule the XID freely in the next round-robin slot.

This way, XA PREPARE can normally be scheduled without restrictions (unless
duplicate XID transactions come close together). This improves scheduling
and parallelism over the old method, where the worker thread to schedule XA
PREPARE on was fixed based on a hash value of the XID.

XA COMMIT will normally be scheduled on the same worker as XA PREPARE, but
can be a different one if the XA PREPARE is far back in the event history.

Testcase and code for trimming dynamic array due to Andrei.
Reviewed-by: default avatarAndrei Elkin <andrei.elkin@mariadb.com>
Signed-off-by: default avatarKristian Nielsen <knielsen@knielsen-hq.org>
parent f9ecaa87
include/master-slave.inc
[connection master]
call mtr.add_suppression("Deadlock found when trying to get lock; try restarting transaction");
call mtr.add_suppression("WSREP: handlerton rollback failed");
connection master;
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
connection slave;
include/stop_slave.inc
SET @old_parallel_threads = @@GLOBAL.slave_parallel_threads;
SET @old_slave_domain_parallel_threads = @@GLOBAL.slave_domain_parallel_threads;
SET @@global.slave_parallel_threads = 5;
SET @@global.slave_domain_parallel_threads = 3;
SET @old_parallel_mode = @@GLOBAL.slave_parallel_mode;
CHANGE MASTER TO master_use_gtid=slave_pos;
connection master;
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1, 0);
include/save_master_gtid.inc
connection slave;
include/start_slave.inc
include/sync_with_master_gtid.inc
include/stop_slave.inc
SET @@global.slave_parallel_mode ='optimistic';
connection master;
include/save_master_gtid.inc
connection slave;
include/start_slave.inc
include/sync_with_master_gtid.inc
include/stop_slave.inc
connection master;
include/save_master_gtid.inc
connection slave;
SET @@global.slave_parallel_mode ='conservative';
include/start_slave.inc
include/sync_with_master_gtid.inc
include/stop_slave.inc
include/save_master_gtid.inc
connection slave;
SET @@global.slave_parallel_mode = 'optimistic';
include/start_slave.inc
include/sync_with_master_gtid.inc
include/diff_tables.inc [master:t1, slave:t1]
connection slave;
include/stop_slave.inc
SET @@global.slave_parallel_mode = @old_parallel_mode;
SET @@global.slave_parallel_threads = @old_parallel_threads;
SET @@global.slave_domain_parallel_threads = @old_slave_domain_parallel_threads;
include/start_slave.inc
connection master;
DROP TABLE t1;
include/save_master_gtid.inc
connection slave;
include/sync_with_master_gtid.inc
connection master;
include/rpl_end.inc
# Similar to rpl_parallel_optimistic_xa to verify XA
# parallel execution with multiple gtid domain.
# References:
# MDEV-33668 Adapt parallel slave's round-robin scheduling to XA events
--source include/have_innodb.inc
--source include/have_perfschema.inc
--source include/master-slave.inc
# Tests' global declarations
--let $trx = _trx_
call mtr.add_suppression("Deadlock found when trying to get lock; try restarting transaction");
call mtr.add_suppression("WSREP: handlerton rollback failed");
--connection master
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
--save_master_pos
# Prepare to restart slave into optimistic parallel mode
--connection slave
--sync_with_master
--source include/stop_slave.inc
SET @old_parallel_threads = @@GLOBAL.slave_parallel_threads;
SET @old_slave_domain_parallel_threads = @@GLOBAL.slave_domain_parallel_threads;
SET @@global.slave_parallel_threads = 5;
SET @@global.slave_domain_parallel_threads = 3;
SET @old_parallel_mode = @@GLOBAL.slave_parallel_mode;
CHANGE MASTER TO master_use_gtid=slave_pos;
--connection master
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1, 0);
--source include/save_master_gtid.inc
--connection slave
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc
--let $mode = 2
# mode = 2 is optimistic
SET @@global.slave_parallel_mode ='optimistic';
while ($mode)
{
--connection master
#
# create XA events alternating gtid domains to run them in parallel on slave.
#
--let $domain_num = 3
--let $trx_num = 777
--let $i = $trx_num
--let $conn = master
--disable_query_log
while($i > 0)
{
--let $domain_id = `SELECT $i % $domain_num`
--eval set @@gtid_domain_id = $domain_id
# 'decision' to commit 0, or rollback 1
--let $decision = `SELECT $i % 2`
--eval XA START '$conn$trx$i'
--eval UPDATE t1 SET b = 1 - 2 * $decision WHERE a = 1
--eval XA END '$conn$trx$i'
--eval XA PREPARE '$conn$trx$i'
--let $term = COMMIT
if ($decision)
{
--let $term = ROLLBACK
}
--eval XA $term '$conn$trx$i'
--dec $i
}
--enable_query_log
--source include/save_master_gtid.inc
--connection slave
if (`select $mode = 1`)
{
SET @@global.slave_parallel_mode ='conservative';
}
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc
--dec $mode
}
# Generations test.
# Create few ranges of XAP groups length of greater than
# 3 * slave_parallel_threads + 1
# terminated upon each range.
--let $iter = 3
--let $generation_len = @@global.slave_parallel_threads
--let $domain_num = 3
--disable_query_log
--connection master
while ($iter)
{
--let $k = `select 3 * 3 * $generation_len`
--let $_k = $k
while ($k)
{
--source include/count_sessions.inc
--connect(con$k,localhost,root,,)
#
# create XA events alternating gtid domains to run them in parallel on slave.
#
--let $domain_id = `SELECT $k % $domain_num`
--eval set @@gtid_domain_id = $domain_id
--eval XA START '$trx$k'
--eval INSERT INTO t1 VALUES ($k + 1, $iter)
--eval XA END '$trx$k'
--eval XA PREPARE '$trx$k'
--disconnect con$k
--connection master
--source include/wait_until_count_sessions.inc
--dec $k
}
--connection master
--let $k = $_k
while ($k)
{
--let $term = COMMIT
--let $decision = `SELECT $k % 2`
if ($decision)
{
--let $term = ROLLBACK
}
--eval XA $term '$trx$k'
}
--dec $iter
}
--enable_query_log
--source include/save_master_gtid.inc
--connection slave
SET @@global.slave_parallel_mode = 'optimistic';
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
#
# Overall consistency check
#
--let $diff_tables= master:t1, slave:t1
--source include/diff_tables.inc
#
# Clean up.
#
--connection slave
--source include/stop_slave.inc
SET @@global.slave_parallel_mode = @old_parallel_mode;
SET @@global.slave_parallel_threads = @old_parallel_threads;
SET @@global.slave_domain_parallel_threads = @old_slave_domain_parallel_threads;
--source include/start_slave.inc
--connection master
DROP TABLE t1;
--source include/save_master_gtid.inc
--connection slave
--source include/sync_with_master_gtid.inc
--connection master
--source include/rpl_end.inc
...@@ -4199,7 +4199,8 @@ int XA_prepare_log_event::do_commit() ...@@ -4199,7 +4199,8 @@ int XA_prepare_log_event::do_commit()
thd->lex->xid= &xid; thd->lex->xid= &xid;
if (!one_phase) if (!one_phase)
{ {
if ((res= thd->wait_for_prior_commit())) if (thd->is_current_stmt_binlog_disabled() &&
(res= thd->wait_for_prior_commit()))
return res; return res;
thd->lex->sql_command= SQLCOM_XA_PREPARE; thd->lex->sql_command= SQLCOM_XA_PREPARE;
......
...@@ -2325,6 +2325,80 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli) ...@@ -2325,6 +2325,80 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
} }
} }
/*
Check when we have done a complete round of scheduling for workers
0, 1, ..., (rpl_thread_max-1), in this order.
This often occurs every rpl_thread_max event group, but XA XID dependency
restrictions can cause insertion of extra out-of-order worker scheduling
in-between the normal round-robin scheduling.
*/
void
rpl_parallel_entry::check_scheduling_generation(sched_bucket *cur)
{
uint32 idx= static_cast<uint32>(cur - rpl_threads);
DBUG_ASSERT(cur >= rpl_threads);
DBUG_ASSERT(cur < rpl_threads + rpl_thread_max);
if (idx == current_generation_idx)
{
++idx;
if (idx >= rpl_thread_max)
{
/* A new generation; all workers have been scheduled at least once. */
idx= 0;
++current_generation;
}
current_generation_idx= idx;
}
}
rpl_parallel_entry::sched_bucket *
rpl_parallel_entry::check_xa_xid_dependency(xid_t *xid)
{
uint64 cur_gen= current_generation;
my_off_t i= 0;
while (i < maybe_active_xid.elements)
{
/*
Purge no longer active XID from the list:
- In generation N, XID might have been scheduled for worker W.
- Events in generation (N+1) might run freely in parallel with W.
- Events in generation (N+2) will have done wait_for_prior_commit for
the event group with XID (or a later one), but the XID might still be
active for a bit longer after wakeup_prior_commit().
- Events in generation (N+3) will have done wait_for_prior_commit() for
an event in W _after_ the XID, so are sure not to see the XID active.
Therefore, XID can be safely scheduled to a different worker in
generation (N+3) when last prior use was in generation N (or earlier).
*/
xid_active_generation *a=
dynamic_element(&maybe_active_xid, i, xid_active_generation *);
if (a->generation + 3 <= cur_gen)
{
*a= *((xid_active_generation *)pop_dynamic(&maybe_active_xid));
continue;
}
if (xid->eq(&a->xid))
{
/* Update the last used generation and return the match. */
a->generation= cur_gen;
return a->thr;
}
++i;
}
/* try to keep allocated memory in the range of [2,10] * initial_chunk_size */
if (maybe_active_xid.elements <= 2 * active_xid_init_alloc() &&
maybe_active_xid.max_element > 10 * active_xid_init_alloc())
freeze_size(&maybe_active_xid);
/* No matching XID conflicts. */
return nullptr;
}
/* /*
Obtain a worker thread that we can queue an event to. Obtain a worker thread that we can queue an event to.
...@@ -2369,17 +2443,36 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, ...@@ -2369,17 +2443,36 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
if (gtid_ev->flags2 & if (gtid_ev->flags2 &
(Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
{ {
/* if ((cur_thr= check_xa_xid_dependency(&gtid_ev->xid)))
For XA COMMIT/ROLLBACK, choose the same bucket as the XA PREPARE, {
overriding the round-robin scheduling. /*
*/ A previously scheduled event group with the same XID might still be
uint32 idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), active in a worker, so schedule this event group in the same worker
gtid_ev->xid.key_length()) % rpl_thread_max; to avoid a conflict.
rpl_threads[idx].unlink(); */
thread_sched_fifo->append(rpl_threads + idx); cur_thr->unlink();
thread_sched_fifo->append(cur_thr);
}
else
{
/* Record this XID now active. */
xid_active_generation *a=
(xid_active_generation *)alloc_dynamic(&maybe_active_xid);
if (!a)
return NULL;
a->thr= cur_thr= thread_sched_fifo->head();
a->generation= current_generation;
a->xid.set(&gtid_ev->xid);
}
} }
else
cur_thr= thread_sched_fifo->head();
check_scheduling_generation(cur_thr);
} }
cur_thr= thread_sched_fifo->head(); else
cur_thr= thread_sched_fifo->head();
thr= cur_thr->thr; thr= cur_thr->thr;
if (thr) if (thr)
...@@ -2471,6 +2564,7 @@ free_rpl_parallel_entry(void *element) ...@@ -2471,6 +2564,7 @@ free_rpl_parallel_entry(void *element)
dealloc_gco(e->current_gco); dealloc_gco(e->current_gco);
e->current_gco= prev_gco; e->current_gco= prev_gco;
} }
delete_dynamic(&e->maybe_active_xid);
mysql_cond_destroy(&e->COND_parallel_entry); mysql_cond_destroy(&e->COND_parallel_entry);
mysql_mutex_destroy(&e->LOCK_parallel_entry); mysql_mutex_destroy(&e->LOCK_parallel_entry);
my_free(e); my_free(e);
...@@ -2524,11 +2618,26 @@ rpl_parallel::find(uint32 domain_id) ...@@ -2524,11 +2618,26 @@ rpl_parallel::find(uint32 domain_id)
my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p))); my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
return NULL; return NULL;
} }
/* Initialize a FIFO of scheduled worker threads. */
e->thread_sched_fifo = new (fifo) I_List<rpl_parallel_entry::sched_bucket>; e->thread_sched_fifo = new (fifo) I_List<rpl_parallel_entry::sched_bucket>;
for (ulong i= 0; i < count; ++i) /*
e->thread_sched_fifo->push_back(::new (p+i) rpl_parallel_entry::sched_bucket); (We cycle the FIFO _before_ allocating next entry in
rpl_parallel_entry::choose_thread(). So initialize the FIFO with the
highest element at the front, just so that the first event group gets
scheduled on entry 0).
*/
e->thread_sched_fifo->
push_back(::new (p+count-1) rpl_parallel_entry::sched_bucket);
for (ulong i= 0; i < count-1; ++i)
e->thread_sched_fifo->
push_back(::new (p+i) rpl_parallel_entry::sched_bucket);
e->rpl_threads= p; e->rpl_threads= p;
e->rpl_thread_max= count; e->rpl_thread_max= count;
e->current_generation = 0;
e->current_generation_idx = 0;
init_dynamic_array2(PSI_INSTRUMENT_ME, &e->maybe_active_xid,
sizeof(rpl_parallel_entry::xid_active_generation),
0, e->active_xid_init_alloc(), 0, MYF(0));
e->domain_id= domain_id; e->domain_id= domain_id;
e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
e->pause_sub_id= (uint64)ULONGLONG_MAX; e->pause_sub_id= (uint64)ULONGLONG_MAX;
......
...@@ -326,10 +326,26 @@ struct rpl_parallel_thread_pool { ...@@ -326,10 +326,26 @@ struct rpl_parallel_thread_pool {
struct rpl_parallel_entry { struct rpl_parallel_entry {
/*
A small struct to put worker threads references into a FIFO (using an
I_List) for round-robin scheduling.
*/
struct sched_bucket : public ilink { struct sched_bucket : public ilink {
sched_bucket() : thr(nullptr) { } sched_bucket() : thr(nullptr) { }
rpl_parallel_thread *thr; rpl_parallel_thread *thr;
}; };
/*
A struct to keep track of into which "generation" an XA XID was last
scheduled. A "generation" means that we know that every worker thread
slot in the rpl_parallel_entry was scheduled at least once. When more
that two generations have passed, we can safely reuse the XID in a
different worker.
*/
struct xid_active_generation {
uint64 generation;
sched_bucket *thr;
xid_t xid;
};
mysql_mutex_t LOCK_parallel_entry; mysql_mutex_t LOCK_parallel_entry;
mysql_cond_t COND_parallel_entry; mysql_cond_t COND_parallel_entry;
...@@ -373,6 +389,23 @@ struct rpl_parallel_entry { ...@@ -373,6 +389,23 @@ struct rpl_parallel_entry {
sched_bucket *rpl_threads; sched_bucket *rpl_threads;
I_List<sched_bucket> *thread_sched_fifo; I_List<sched_bucket> *thread_sched_fifo;
uint32 rpl_thread_max; uint32 rpl_thread_max;
/*
Keep track of all XA XIDs that may still be active in a worker thread.
The elements are of type xid_active_generation.
*/
DYNAMIC_ARRAY maybe_active_xid;
/*
Keeping track of the current scheduling generation.
A new generation means that every worker thread in the rpl_threads array
have been scheduled at least one event group.
When we have scheduled to slot current_generation_idx= 0, 1, ..., N-1 in this
order, we know that (at least) one generation has passed.
*/
uint64 current_generation;
uint32 current_generation_idx;
/* /*
The sub_id of the last transaction to commit within this domain_id. The sub_id of the last transaction to commit within this domain_id.
Must be accessed under LOCK_parallel_entry protection. Must be accessed under LOCK_parallel_entry protection.
...@@ -426,11 +459,19 @@ struct rpl_parallel_entry { ...@@ -426,11 +459,19 @@ struct rpl_parallel_entry {
/* The group_commit_orderer object for the events currently being queued. */ /* The group_commit_orderer object for the events currently being queued. */
group_commit_orderer *current_gco; group_commit_orderer *current_gco;
void check_scheduling_generation(sched_bucket *cur);
sched_bucket *check_xa_xid_dependency(xid_t *xid);
rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond, rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
PSI_stage_info *old_stage, PSI_stage_info *old_stage,
Gtid_log_event *gtid_ev); Gtid_log_event *gtid_ev);
int queue_master_restart(rpl_group_info *rgi, int queue_master_restart(rpl_group_info *rgi,
Format_description_log_event *fdev); Format_description_log_event *fdev);
/*
the initial size of maybe_ array corresponds to the case of
each worker receives perhaps unlikely XA-PREPARE and XA-COMMIT within
the same generation.
*/
inline uint active_xid_init_alloc() { return 3 * 2 * rpl_thread_max; }
}; };
struct rpl_parallel { struct rpl_parallel {
HASH domain_hash; HASH domain_hash;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment