Commit 96bd9e6b authored by Andrei's avatar Andrei

MDEV-31949 parallel slave xa Round-Robin distribution

XA-Prepare group of events

  XA START xid
  ...
  XA END xid
  XA PREPARE xid

and its XA-"complete" terminator

  XA COMMIT or
  XA ROLLBACK

are made distributed Round-Robin across slave parallel workers.
The former hash-based policy was proven to attribute to execution
latency through creating a big - many times larger than the size
of the worker pool - queue of binlog-ordered transactions
to commit.

Acronyms and notations used below:

  XAP := XA-Prepare event or the whole prepared XA group of events
  XAC := XA-"complete", which is a solitary group of events
  |W| := the size of the slave worker pool
  Subscripts like `_k' denote order in a corresponding sequence
     (e.g binlog file).

KEY CHANGES:

The parallel slave
------------------
driver thread now maintains a list XAP:s currently
in processing. It's purpose is to avoid "wild" parallel execution of XA:s
with duplicate xids (unlikely, but that's the user's right).
The list is arranged as a sliding window with the size of 2*|W| to account
a possibility of XAP_k -> XAP_k+2|W|-1 the largest (in the group-of-events
count sense) dependency.
Say k=1, and |W| the # of Workers is 4. As transactions are distributed
Round-Robin, it's possible to have T^*_1 -> T^*_8 as the largest
dependency ('*' marks the dependents) in runtime.
It can be seen from worker queues, like in the picture below.
Let Q_i worker queues  develop downward:

  Q1 ...  Q4
  1^* 2 3 4
  5   6 7 8^*

Worker # 1 has assigned with T_1 and T_5.
Worker #4 can take on its T_8 when T_1 is yet at the
beginning of its processing, so even before XA START of that XAP.

XA related
----------
XID_cache_element is extended with two pointers to resolve
two types of dependencies: the duplicate xid XAP_k -> XAP_k+i
and the ordinary completion on the prepare XAP_k -> XAC_k+j.
The former is handled by a wait-for-xid protocol conducted by
xid_cache_delete() and xid_cache_insert_maybe_wait().
The later is done analogously by xid_cache_search_maybe_wait() and
slave_applier_reset_xa_trans().

XA-"complete" are allowed to go forward before its XAP parent
has released the xid (all recovery concerns are covered in MDEV-21496,
MDEV-21777).
Yet XAC is going to wait for it at a critical
point of execution which is at "complete" the work in Engine.

CAVEAT: storage/innobase/trx/trx0undo.cc changes are due to possibly
        fixed MDEV-32144,
	TODO: to be verified.

Thanks to Brandon Nesterenko at mariadb.com for initial review and
a lot of creative efforts to advance with this work!
parent ee5cadd5
# ==== Usage ====
#
# [--let $binlog_file= [<FILENAME> | LAST]]
# [--let $binlog_start= <POSITION> ]
# [--let $filter_cid= [0 | 1]
if ($binlog_start)
{
--let $_binlog_start=$binlog_start
......@@ -14,4 +20,9 @@ if ($binlog_file)
--replace_result "$_from_binlog_start" "from <binlog_start>" $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
--replace_column 2 # 5 #
--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ /file_id=[0-9]+/file_id=#/ /GTID [0-9]+-[0-9]+-[0-9]+/GTID #-#-#/
if ($filter_cid)
{
--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ /file_id=[0-9]+/file_id=#/ /GTID [0-9]+-[0-9]+-[0-9]+/GTID #-#-#/ / cid=[0-9]+//
}
--eval show binlog events $_in_binlog_file from $_binlog_start
CREATE TABLE ta (c INT KEY) engine=Aria;
XA START 'xid_a';
INSERT INTO ta VALUES (1);
XA END 'xid_a';
XA PREPARE 'xid_a';
Warnings:
Warning 1030 Got error 131 "Command not supported by the engine" from storage engine Aria
LOAD INDEX INTO CACHE c KEY(PRIMARY);
Table Op Msg_type Msg_text
test.c preload_keys Error XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
test.c preload_keys Error XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
test.c preload_keys error Corrupt
Warnings:
Warning 1196 Some non-transactional changed tables couldn't be rolled back
XA ROLLBACK 'xid_a';
CREATE TABLE ti (c INT KEY) engine=Innodb;
XA START 'xid_i';
INSERT INTO ti VALUES (1);
XA END 'xid_i';
XA PREPARE 'xid_i';
LOAD INDEX INTO CACHE c KEY(PRIMARY);
Table Op Msg_type Msg_text
test.c preload_keys Error XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
test.c preload_keys Error XAER_RMFAIL: The command cannot be executed when global transaction is in the PREPARED state
test.c preload_keys error Corrupt
XA COMMIT 'xid_i';
SELECT * FROM ti;
c
include/show_binlog_events.inc
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # use `test`; CREATE TABLE ta (c INT KEY) engine=Aria
master-bin.000001 # Gtid # # BEGIN GTID #-#-#
master-bin.000001 # Annotate_rows # # INSERT INTO ta VALUES (1)
master-bin.000001 # Table_map # # table_id: # (test.ta)
master-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
master-bin.000001 # Query # # COMMIT
master-bin.000001 # Gtid # # XA START X'7869645f61',X'',1 GTID #-#-#
master-bin.000001 # Query # # XA END X'7869645f61',X'',1
master-bin.000001 # XA_prepare # # XA PREPARE X'7869645f61',X'',1
master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # XA ROLLBACK X'7869645f61',X'',1
master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # use `test`; CREATE TABLE ti (c INT KEY) engine=Innodb
master-bin.000001 # Gtid # # XA START X'7869645f69',X'',1 GTID #-#-#
master-bin.000001 # Annotate_rows # # INSERT INTO ti VALUES (1)
master-bin.000001 # Table_map # # table_id: # (test.ti)
master-bin.000001 # Write_rows_v1 # # table_id: # flags: STMT_END_F
master-bin.000001 # Query # # XA END X'7869645f69',X'',1
master-bin.000001 # XA_prepare # # XA PREPARE X'7869645f69',X'',1
master-bin.000001 # Gtid # # GTID #-#-#
master-bin.000001 # Query # # XA ROLLBACK X'7869645f69',X'',1
drop table ta,ti;
--source include/have_binlog_format_row.inc
--source include/have_innodb.inc
CREATE TABLE ta (c INT KEY) engine=Aria;
XA START 'xid_a';
INSERT INTO ta VALUES (1);
XA END 'xid_a';
XA PREPARE 'xid_a';
#--error ER_XAER_RMFAIL
LOAD INDEX INTO CACHE c KEY(PRIMARY);
XA ROLLBACK 'xid_a';
CREATE TABLE ti (c INT KEY) engine=Innodb;
XA START 'xid_i';
INSERT INTO ti VALUES (1);
XA END 'xid_i';
XA PREPARE 'xid_i';
# --error ER_XAER_RMFAIL
LOAD INDEX INTO CACHE c KEY(PRIMARY);
XA COMMIT 'xid_i';
SELECT * FROM ti;
#
--source include/show_binlog_events.inc
drop table ta,ti;
This diff is collapsed.
This diff is collapsed.
......@@ -1165,5 +1165,56 @@ connection server_1;
set @@binlog_format = @sav_binlog_format;
set @@global.binlog_format = @sav_binlog_format;
connection server_1;
create table t_not_in_binlog (a int) engine=innodb;
flush logs;
include/save_master_gtid.inc
connect con1,localhost,root,,;
call mtr.add_suppression("XAER_NOTA: Unknown XID");
SET sql_log_bin=0;
XA START 'a';
insert into t_not_in_binlog set a=1;
XA END 'a';
XA PREPARE 'a';
disconnect con1;
connection server_1;
xa recover;
formatID gtrid_length bqual_length data
1 1 0 a
XA ROLLBACK 'a';
drop table t_not_in_binlog;
include/save_master_gtid.inc
connection server_2;
XAER_NOTA: Unknown XID
include/wait_for_slave_sql_error.inc [errno=1397]
connect con2,127.0.0.1,root,,test,$SERVER_MYPORT_2,;
SET sql_log_bin=0;
XA START 'a';
insert into t_not_in_binlog set a=1;
XA END 'a';
XA PREPARE 'a';
disconnect con2;
connection server_2;
xa recover;
formatID gtrid_length bqual_length data
1 1 0 a
include/start_slave.inc
include/sync_with_master_gtid.inc
connection server_3;
XAER_NOTA: Unknown XID
include/wait_for_slave_sql_error.inc [errno=1397]
connect con3,127.0.0.1,root,,test,$SERVER_MYPORT_3,;
SET sql_log_bin=0;
XA START 'a';
insert into t_not_in_binlog set a=1;
XA END 'a';
XA PREPARE 'a';
disconnect con3;
connection server_3;
xa recover;
formatID gtrid_length bqual_length data
1 1 0 a
include/start_slave.inc
include/sync_with_master_gtid.inc
connection server_1;
include/rpl_end.inc
# End of rpl_xa_empty_transaction.test
......@@ -42,7 +42,7 @@ connection master;
drop table t1;
connection slave;
# TODO: Remove after fixing MDEV-21777
set @@global.gtid_slave_pos= "0-1-100";
set @@global.gtid_slave_pos= "0-1-101";
set @@global.slave_parallel_threads= @save_par_thds;
set @@global.gtid_strict_mode= @save_strict_mode;
set @@global.innodb_lock_wait_timeout= @save_innodb_lock_wait_timeout;
......
#
# This test ensures that two-phase XA transactions have their first and
# second phases parallelized for both XA COMMIT and XA ROLLBACK. It ensures the
# following behaviors:
#
# Test Case 1: Ensure that a 2-phase XA transaction has its XA PREPARE and
# XA COMMIT/ROLLBACK run concurrently. That is, the XA COMMIT/ROLLBACK will
# wait at group commit until the XA PREPARE binlogs, and then it will wait
# again until the XA PREPARE finishes preparing in all engines. At this point,
# the XA COMMIT/ROLLBACK will run to completion.
#
# Test Case 2: If two XA transactions have different XIDs, if XA COMMIT ends
# a transaction, ensure both phases of both transactions can all execute
# concurrently.
#
# Test Case 3: Two current 2-phase XA transactions with matching XIDs should
# run one after the other, while each transaction still allows both phases of
# its own transaction to run concurrently.
#
# Test Case 4: Error Case. If an XAP errors while its XAC/R is waiting on it,
# both the XAP and XAC/R should rollback successfully. Note this tests both:
# a) XAC/R is waiting in group commit (first phase times out in DMLs)
# b) XAC/R is waiting in group commit, with another XAP with a duplicate XID
# waiting on it.
#
# Test Case 5: If an XAP is skipped by the replica (e.g. by incorrectly
# setting gtid_slave_pos), and only its XAC/XAR is tried to execute, the
# replica should report ER_XAER_NOTA.
#
#
# References:
# MDEV-31949: slow parallel replication of user xa
#
--source include/have_debug.inc
--source include/have_innodb.inc
--source include/have_binlog_format_mixed.inc
--source include/master-slave.inc
--let $xa_complete_sym= COMMIT
--source include/rpl_xa_concurrent_2pc.inc
--let $xa_complete_sym= ROLLBACK
--source include/rpl_xa_concurrent_2pc.inc
--echo #
--echo # Test Case 5: If an XAP is skipped by the replica (e.g. by incorrectly
--echo # setting gtid_slave_pos), and only its XAC/XAR is tried to execute, the
--echo # replica should report ER_XAER_NOTA.
--connection master
create table t1 (a int) engine=innodb;
--source include/save_master_gtid.inc
--connection slave
--source include/sync_with_master_gtid.inc
call mtr.add_suppression("XAER_NOTA: Unknown XID");
--let $i=2
while ($i)
{
--source include/stop_slave.inc
--replace_regex /[0-9]*-[0-9]*-[0-9]*/<value>/
change master to master_use_gtid = slave_pos;
--connection master
--let $complete=rollback
if ($i == 1)
{
--let $complete=commit
}
xa start '1'; insert into t1 set a=1; xa end '1'; xa prepare '1';
--eval xa $complete '1'
insert into t1 set a=2;
--source include/save_master_gtid.inc
--connection slave
# reposition the slave to skip one transaction from master
set @save_gtid_slave_pos= @@global.gtid_slave_pos;
SELECT CONCAT(domain_id,"-",server_id,"-", seq_no + 1)
into @gtid_skip
FROM mysql.gtid_slave_pos
WHERE seq_no = (SELECT DISTINCT max(seq_no) FROM mysql.gtid_slave_pos) limit 1;
set @@global.gtid_slave_pos = @gtid_skip;
start slave;
let $slave_sql_errno= 1397; # ER_XAER_NOTA
source include/wait_for_slave_sql_error.inc;
--eval select count(*) = $i % 2 as 'must be true' from t1;
--source include/stop_slave.inc
--disable_warnings
set @@global.gtid_slave_pos = @save_gtid_slave_pos;
--enable_warnings
--replace_regex /[0-9]*-[0-9]*-[0-9]*/<value>/
show warnings;
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
--dec $i
}
# MDEV-31949 cleanup
--connection master
drop table t1;
--sync_slave_with_master
--source include/rpl_end.inc
--echo # End of rpl_xa_concurrent_2pc.test
......@@ -167,6 +167,95 @@ set @@global.binlog_format = row;
set @@binlog_format = @sav_binlog_format;
set @@global.binlog_format = @sav_binlog_format;
# MDEV-32257 dangling XA-rollback in binlog from emtpy XA
# create a case of XA ROLLBACK gets to binlog while its XAP was not and
# try replicate it.
# Expected result is both slaves error out.
--connection server_1
--let $binlog_start = query_get_value(SHOW MASTER STATUS, Position, 1)
--let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1)
create table t_not_in_binlog (a int) engine=innodb;
flush logs;
--source include/save_master_gtid.inc
--let $binlog_file=query_get_value(SHOW MASTER STATUS, File, 1)
# External connection XID access after disconnect is subject to race.
# "(" open parenthesis to remember # of connection before ...
--source include/count_sessions.inc
--connect(con1,localhost,root,,)
call mtr.add_suppression("XAER_NOTA: Unknown XID");
SET sql_log_bin=0;
XA START 'a';
insert into t_not_in_binlog set a=1;
XA END 'a';
XA PREPARE 'a';
--disconnect con1
--connection server_1
# .. ")" close parenthesis, to wait until con1 fully releases access to xid.
--source include/wait_until_count_sessions.inc
xa recover;
#
# replicate orphan XAR to server 2,3 and expect the error first
# after that compensate it.
--error 0
XA ROLLBACK 'a';
# cleanup at once
drop table t_not_in_binlog;
--source include/save_master_gtid.inc
--connection server_2
--echo XAER_NOTA: Unknown XID
--let $slave_sql_errno= 1397
--source include/wait_for_slave_sql_error.inc
# "("
--source include/count_sessions.inc
--connect (con2,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
SET sql_log_bin=0;
XA START 'a';
insert into t_not_in_binlog set a=1;
XA END 'a';
XA PREPARE 'a';
--disconnect con2
--connection server_2
# ")"
--source include/wait_until_count_sessions.inc
xa recover;
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
--connection server_3
--echo XAER_NOTA: Unknown XID
--let $slave_sql_errno= 1397
--source include/wait_for_slave_sql_error.inc
# "("
--source include/count_sessions.inc
--connect (con3,127.0.0.1,root,,test,$SERVER_MYPORT_3,)
SET sql_log_bin=0;
XA START 'a';
insert into t_not_in_binlog set a=1;
XA END 'a';
XA PREPARE 'a';
--disconnect con3
--connection server_3
# ")"
--source include/wait_until_count_sessions.inc
xa recover;
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
#
# Cleanup
--connection server_1
......
......@@ -56,8 +56,8 @@ xa start '1';
update t1 set b=b+10 where a=1;
xa end '1';
xa prepare '1';
--let $new_gtid= `SELECT @@global.gtid_binlog_pos`
xa commit '1';
--let $new_gtid= `SELECT @@global.gtid_binlog_pos`
--source include/save_master_gtid.inc
--connection slave1
......
......@@ -2180,13 +2180,13 @@ int ha_rollback_trans(THD *thd, bool all)
rollback without signalling following transactions. And in release
builds, we explicitly do the signalling before rolling back.
*/
DBUG_ASSERT(
!(thd->rgi_slave &&
!thd->rgi_slave->worker_error &&
thd->rgi_slave->did_mark_start_commit) ||
(thd->transaction->xid_state.is_explicit_XA() ||
(thd->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_PREPARED_XA)));
DBUG_ASSERT(!(thd->rgi_slave &&
!thd->rgi_slave->worker_error &&
thd->rgi_slave->did_mark_start_commit) ||
(thd->transaction->xid_state.is_explicit_XA() ||
(thd->rgi_slave->gtid_ev_flags2 &
(Gtid_log_event::FL_PREPARED_XA |
Gtid_log_event::FL_COMPLETED_XA))));
if (thd->rgi_slave &&
!thd->rgi_slave->worker_error &&
thd->rgi_slave->did_mark_start_commit)
......@@ -2343,6 +2343,15 @@ int ha_commit_or_rollback_by_xid(XID *xid, bool commit)
else
binlog_rollback_by_xid(binlog_hton, xid);
#ifdef ENABLED_DEBUG_SYNC
DBUG_EXECUTE_IF(
"stop_after_binlog_cor_by_xid",
DBUG_ASSERT(!debug_sync_set_action(
current_thd,
STRING_WITH_LEN(
"now SIGNAL xa_cor_binlogged WAIT_FOR continue_xa_cor"))););
#endif
plugin_foreach(NULL, commit ? xacommit_handlerton : xarollback_handlerton,
MYSQL_STORAGE_ENGINE_PLUGIN, &xaop);
......
This diff is collapsed.
......@@ -3314,16 +3314,22 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
XID_STATE &xid_state= thd->transaction->xid_state;
if (is_transactional)
{
if (xid_state.is_explicit_XA() &&
(thd->lex->sql_command == SQLCOM_XA_PREPARE ||
xid_state.get_state_code() == XA_PREPARED))
bool is_async_xac= false;
if ((xid_state.is_explicit_XA() &&
(thd->lex->sql_command == SQLCOM_XA_PREPARE ||
xid_state.get_state_code() == XA_PREPARED)) ||
(is_async_xac= (thd->rgi_slave && thd->rgi_slave->is_async_xac)))
{
DBUG_ASSERT(!(thd->lex->sql_command == SQLCOM_XA_COMMIT &&
thd->lex->xa_opt == XA_ONE_PHASE));
DBUG_ASSERT(!is_async_xac ||
thd->lex->sql_command == SQLCOM_XA_ROLLBACK ||
thd->lex->sql_command == SQLCOM_XA_COMMIT);
flags2|= thd->lex->sql_command == SQLCOM_XA_PREPARE ?
FL_PREPARED_XA : FL_COMPLETED_XA;
xid.set(xid_state.get_xid());
xid.set(is_async_xac? thd->lex->xid :
thd->transaction->xid_state.get_xid());
}
/* count non-zero extra recoverable engines; total = extra + 1 */
if (has_xid)
......@@ -4172,9 +4178,6 @@ int XA_prepare_log_event::do_commit()
thd->lex->xid= &xid;
if (!one_phase)
{
if ((res= thd->wait_for_prior_commit()))
return res;
thd->lex->sql_command= SQLCOM_XA_PREPARE;
res= trans_xa_prepare(thd);
}
......
......@@ -1055,7 +1055,7 @@ PSI_cond_key key_BINLOG_COND_xid_list,
key_BINLOG_COND_queue_busy;
PSI_cond_key key_RELAYLOG_COND_relay_log_updated,
key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready,
key_COND_wait_commit;
key_COND_wait_commit, key_COND_wait_commit_dep;
PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
......@@ -1083,6 +1083,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_RELAYLOG_COND_queue_busy, "MYSQL_RELAY_LOG::COND_queue_busy", 0},
{ &key_COND_wakeup_ready, "THD::COND_wakeup_ready", 0},
{ &key_COND_wait_commit, "wait_for_commit::COND_wait_commit", 0},
{ &key_COND_wait_commit, "wait_for_commit::COND_wait_commit_dep", 0},
{ &key_COND_cache_status_changed, "Query_cache::COND_cache_status_changed", 0},
{ &key_COND_manager, "COND_manager", PSI_FLAG_GLOBAL},
{ &key_COND_server_started, "COND_server_started", PSI_FLAG_GLOBAL},
......@@ -9224,6 +9225,7 @@ PSI_stage_info stage_waiting_for_deadlock_kill= { 0, "Waiting for parallel repli
PSI_stage_info stage_starting= { 0, "starting", 0};
PSI_stage_info stage_waiting_for_flush= { 0, "Waiting for non trans tables to be flushed", 0};
PSI_stage_info stage_waiting_for_ddl= { 0, "Waiting for DDLs", 0};
PSI_stage_info stage_waiting_for_prior_xa_transaction= { 0, "Waiting for prior xa transaction", 0};
PSI_memory_key key_memory_DATE_TIME_FORMAT;
PSI_memory_key key_memory_DDL_LOG_MEMORY_ENTRY;
......
......@@ -376,7 +376,7 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_COND_start_thread;
extern PSI_cond_key key_RELAYLOG_COND_relay_log_updated,
key_RELAYLOG_COND_bin_log_updated, key_COND_wakeup_ready,
key_COND_wait_commit;
key_COND_wait_commit, key_COND_wait_commit_dep;
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue,
......@@ -679,7 +679,7 @@ extern PSI_stage_info stage_slave_background_process_request;
extern PSI_stage_info stage_slave_background_wait_request;
extern PSI_stage_info stage_waiting_for_deadlock_kill;
extern PSI_stage_info stage_starting;
extern PSI_stage_info stage_waiting_for_prior_xa_transaction;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
/**
Statement instrumentation keys (sql).
......
......@@ -9,6 +9,8 @@
#ifdef WITH_WSREP
#include "wsrep_trans_observer.h"
#endif
#include <algorithm>
using std::max;
/*
Code for optional parallel execution of replicated events on the slave.
......@@ -760,7 +762,8 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
return;
err_code= thd->get_stmt_da()->sql_errno();
if ((rgi->speculation == rpl_group_info::SPECULATE_OPTIMISTIC &&
err_code != ER_PRIOR_COMMIT_FAILED) ||
(err_code != ER_PRIOR_COMMIT_FAILED &&
err_code != ER_XAER_NOTA)) ||
((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
rgi->killed_for_retry))
{
......@@ -2364,16 +2367,9 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
idx= rpl_thread_idx;
if (gtid_ev)
{
if (gtid_ev->flags2 &
(Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
gtid_ev->xid.key_length()) % rpl_thread_max;
else
{
++idx;
if (idx >= rpl_thread_max)
idx= 0;
}
++idx;
if (idx >= rpl_thread_max)
idx= 0;
rpl_thread_idx= idx;
}
thr= rpl_threads[idx];
......@@ -2467,6 +2463,7 @@ free_rpl_parallel_entry(void *element)
}
mysql_cond_destroy(&e->COND_parallel_entry);
mysql_mutex_destroy(&e->LOCK_parallel_entry);
e->concurrent_xaps_window.~Dynamic_array();
my_free(e);
}
......@@ -2521,6 +2518,19 @@ rpl_parallel::find(uint32 domain_id)
e->domain_id= domain_id;
e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
e->pause_sub_id= (uint64)ULONGLONG_MAX;
e->concurrent_xaps_window.init((PSI_memory_key) PSI_INSTRUMENT_ME,
max((decltype(e->rpl_thread_max)) 2,
2*e->rpl_thread_max));
e->cxap_lhs= e->cxap_rhs= 0;
/*
0 initialize each element
*/
for (size_t i= 0; i < e->concurrent_xaps_window.max_size(); i++)
{
e->concurrent_xaps_window.at(i)= {0, 0};
}
mysql_mutex_init(key_LOCK_parallel_entry, &e->LOCK_parallel_entry,
MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_parallel_entry, &e->COND_parallel_entry, NULL);
......@@ -2798,6 +2808,90 @@ abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
mysql_cond_signal(&cur_thread->COND_rpl_thread);
}
/**
Check the concurrency status of @c xid with ones in progress.
Any new @c xid of XA-prepare (@c is_xap is true then) is appended to
a sliding window designed as circular buffer. Through search in the window
the return result is computed.
@param e parallel entry pointer
@param xid a pointer to the xid of either XA-prepare of XA-"complete"
@param is_xap
true when xid belongs to XA-prepare
@return true when there exists a duplicate xid hash value,
false otherwise.
*/
static bool
handle_xa_prepera_duplicate_xid(rpl_parallel_entry *e, XID *xid, bool is_xap)
{
DBUG_ASSERT(e->current_group_info ||
(e->count_queued_event_groups == 0 &&
e->cxap_lhs == e->cxap_rhs && e->cxap_lhs == 0));
DBUG_ASSERT(xid);
DBUG_ASSERT(!xid->is_null());
DBUG_ASSERT(xid->key());
DBUG_ASSERT(xid->key_length());
uint64 curr_event_count= e->count_queued_event_groups;
uint32 i;
bool rc= false;
/*
We've seen XAP's before, so move the LHS up to a relevant spot.
LHS = RHS indicates the empty buffer (which implies RHS is exclusive "edge"
of the window.
Otherwise RHS always points to a free cell of which one at least must
exist at this point.
While transaction disribution is Round-robin, potential conflicts with
the current input xid can come only from
the preceeding 2*|W| - 1 xids, the 2*|W|th in the past is safe.
*/
for (i= e->cxap_lhs; i != e->cxap_rhs;
i= (i+1) % (e->concurrent_xaps_window.max_size()))
{
uint64 old_event_count= e->concurrent_xaps_window.at(i).second;
uint64 queued_event_diff= curr_event_count - old_event_count;
if (queued_event_diff >= e->rpl_thread_max)
{
/*
Squeeze the window from the left
as this XAP can't run in parallel with us.
*/
e->cxap_lhs= (i+1) % (e->concurrent_xaps_window.max_size());
}
else
{
// new LHS is determined
DBUG_ASSERT(e->cxap_lhs != e->cxap_rhs);
break;
}
}
std::size_t xid_hash= std::hash<XID>{}(*xid);
for (; i != e->cxap_rhs; i= (i+1) % (e->concurrent_xaps_window.max_size()))
{
std::size_t old_xid_hash= e->concurrent_xaps_window.at(i).first;
if (old_xid_hash == xid_hash)
{
rc= true;
break;
}
}
// Add the XAP to the sliding window
if (is_xap)
{
e->concurrent_xaps_window.at(e->cxap_rhs).first= xid_hash;
e->concurrent_xaps_window.at(e->cxap_rhs).second= curr_event_count;
e->cxap_rhs= (e->cxap_rhs + 1) % (e->concurrent_xaps_window.max_size());
if (e->cxap_rhs == e->cxap_lhs)
{
// the entire array is full therefore the lhs has become stale
e->cxap_lhs= (e->cxap_lhs + 1) % (e->concurrent_xaps_window.max_size());
}
}
return rc;
}
/*
do_event() is executed by the sql_driver_thd thread.
......@@ -3046,6 +3140,18 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
new_gco= true;
force_switch_flag= 0;
gco= e->current_gco;
/*
Take care of duplicate xids in XA-prepare, XA-"complete" should not
race its XA-prepare parent either. When the current transaction's xid
was seen and its transaction may still be in process this event group
gets flagged to wait for prior commits at the start of execution.
*/
if ((gtid_flags & (Gtid_log_event::FL_PREPARED_XA |
Gtid_log_event::FL_COMPLETED_XA)) &&
handle_xa_prepera_duplicate_xid(e, &gtid_ev->xid,
gtid_flags &
Gtid_log_event::FL_PREPARED_XA))
gtid_flags &= ~Gtid_log_event::FL_ALLOW_PARALLEL;
if (likely(gco))
{
uint8 flags= gco->flags;
......
......@@ -324,6 +324,14 @@ struct rpl_parallel_thread_pool {
void release_thread(rpl_parallel_thread *rpt);
};
template <>
struct std::hash<XID>
{
std::size_t operator()(const XID& xid) const
{
return my_hash_sort(&my_charset_bin, xid.key(), xid.key_length());
}
};
struct rpl_parallel_entry {
mysql_mutex_t LOCK_parallel_entry;
......@@ -419,6 +427,14 @@ struct rpl_parallel_entry {
/* The group_commit_orderer object for the events currently being queued. */
group_commit_orderer *current_gco;
/*
Circular buffer of size slave_parallel_threads to hold XIDs of XA-prepare
group of events which may be processed concurrently.
See how handle_xa_prepera_duplicate_xid operates on it.
*/
Dynamic_array<std::pair<std::size_t, uint32>> concurrent_xaps_window;
uint32 cxap_lhs, cxap_rhs;
rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
PSI_stage_info *old_stage,
Gtid_log_event *gtid_ev);
......
......@@ -2155,12 +2155,14 @@ rpl_group_info::reinit(Relay_log_info *rli)
gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
speculation= SPECULATE_NO;
commit_orderer.reinit();
is_async_xac= false;
}
rpl_group_info::rpl_group_info(Relay_log_info *rli)
: thd(0), wait_commit_sub_id(0),
wait_commit_group_info(0), parallel_entry(0),
deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
is_async_xac(false)
{
reinit(rli);
bzero(&current_gtid, sizeof(current_gtid));
......@@ -2291,7 +2293,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
if (thd->transaction->xid_state.is_explicit_XA() &&
thd->transaction->xid_state.get_state_code() != XA_PREPARED)
xa_trans_force_rollback(thd);
is_async_xac= false;
thd->release_transactional_locks();
if (thd == rli->sql_driver_thd)
......
......@@ -835,6 +835,14 @@ struct rpl_group_info
};
uchar killed_for_retry;
/*
When true indicates that the user xa transaction is going to
complete (with COMMIT or ROLLBACK) by the worker thread,
*while* another worker is still preparing it. Once the latter is done
the xid will be acquired and the flag gets reset.
*/
bool is_async_xac;
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
void reinit(Relay_log_info *rli);
......
......@@ -137,7 +137,7 @@ template <class Elem> class Dynamic_array
*/
Elem& at(size_t idx)
{
DBUG_ASSERT(idx < array.elements);
DBUG_ASSERT(idx < max_size());
return *(((Elem*)array.buffer) + idx);
}
/// Const variant of at(), which cannot change data
......@@ -172,6 +172,8 @@ template <class Elem> class Dynamic_array
size_t size() const { return array.elements; }
size_t max_size() const { return array.max_element; }
const Elem *end() const
{
return back() + 1;
......
......@@ -7960,6 +7960,7 @@ wait_for_commit::wait_for_commit()
{
mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
mysql_cond_init(key_COND_wait_commit_dep, &COND_wait_commit_dep, 0);
reinit();
}
......@@ -7989,6 +7990,7 @@ wait_for_commit::~wait_for_commit()
mysql_mutex_destroy(&LOCK_wait_commit);
mysql_cond_destroy(&COND_wait_commit);
mysql_cond_destroy(&COND_wait_commit_dep);
}
......
......@@ -2367,6 +2367,15 @@ struct wait_for_commit
event group is fully done.
*/
bool wakeup_blocked;
/*
The condition variable servers as a part of facilities to handle various
commit time additional dependency between groups of replication events, e.g
XA-Prepare -> XA-Commit, or XA-Prepare -> XA-Prepare all with the same xid.
*/
mysql_cond_t COND_wait_commit_dep;
#ifndef DBUG_OFF
bool debug_done;
#endif
void register_wait_for_prior_commit(wait_for_commit *waitee);
int wait_for_prior_commit(THD *thd, bool allow_kill=true)
......
This diff is collapsed.
......@@ -639,8 +639,7 @@ static void trx_undo_write_xid(buf_block_t *block, uint16_t offset,
static_cast<uint32_t>(xid.bqual_length));
const ulint xid_length= static_cast<ulint>(xid.gtrid_length
+ xid.bqual_length);
mtr->memcpy(*block, &block->page.frame[offset + TRX_UNDO_XA_XID],
xid.data, xid_length);
mtr->memcpy<mtr_t::MAYBE_NOP>(*block, &block->page.frame[offset + TRX_UNDO_XA_XID], xid.data, xid_length);
if (UNIV_LIKELY(xid_length < XIDDATASIZE))
mtr->memset(block, offset + TRX_UNDO_XA_XID + xid_length,
XIDDATASIZE - xid_length, 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment