Commit 603e6b9b authored by sjaakola's avatar sjaakola

MDEV-21723 Async slave thread BF abort and replaying fixes

If async replication slave thread conflicts with cluster replication,
then the async slave transaction should be BF aborted, and depending on the
state of async slave transaction execution, potentially also replayed.
There were problems in such BF abort implementation and the replaying was not
started.
This pull request contains fixes which make sure that if async slave thread is
marked to abort and replay, it will complete carry out the rollback and
release all locks and resources before starting the replaying. After replaying,
async slave transactions is treated as successful, so the slave thread will
continue as usual, handling next replication event.

There is also new mtr test: galera.galera_slave_replay, which stresses both a
certification failure for async slave thread and a successful BF abort
followed by replaying.
parent a9d13248
connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2;
connection node_2a;
connection node_1;
RESET MASTER;
connection node_2a;
START SLAVE;
connection node_1;
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 CHAR(1)) engine=innodb;
INSERT INTO t1 VALUES (1, 'a');
INSERT INTO t1 VALUES (3, 'a');
set binlog_format=STATEMENT;
SET AUTOCOMMIT=ON;
START TRANSACTION;
SELECT * FROM t1 FOR UPDATE;
f1 f2
1 a
3 a
UPDATE t1 SET f2 = 'c' WHERE f1 > 1;
connection node_2a;
SET SESSION wsrep_sync_wait = 0;
connect node_3, 127.0.0.1, root, , test, $NODE_MYPORT_3;
connection node_3;
SET SESSION wsrep_sync_wait = 0;
connection node_2a;
SET GLOBAL wsrep_provider_options = 'dbug=d,commit_monitor_enter_sync';
SET GLOBAL debug_dbug = "d,sync.wsrep_apply_cb";
connection node_3;
INSERT INTO test.t1 VALUES (2, 'b');
connection node_1;
COMMIT;
connection node_2a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL debug_dbug = "";
SET DEBUG_SYNC = "now SIGNAL signal.wsrep_apply_cb";
connection node_2a;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=commit_monitor_enter_sync';
connection node_1;
SELECT COUNT(*) = 1 FROM t1 WHERE f2 = 'a';
COUNT(*) = 1
1
SELECT COUNT(*) = 1 FROM t1 WHERE f2 = 'c';
COUNT(*) = 1
1
SELECT * FROM t1;
f1 f2
1 a
3 c
connection node_2a;
set session wsrep_sync_wait=15;
set session wsrep_sync_wait=0;
wsrep_local_replays
1
SELECT * FROM t1;
f1 f2
1 a
2 b
3 c
SET DEBUG_SYNC = "RESET";
#
# test phase with real abort
#
connection node_1;
set binlog_format=ROW;
insert into t1 values (4, 'd');
SET AUTOCOMMIT=ON;
START TRANSACTION;
UPDATE t1 SET f2 = 'd' WHERE f1 = 3;
connection node_2a;
SET GLOBAL wsrep_provider_options = 'dbug=d,commit_monitor_enter_sync';
SET GLOBAL debug_dbug = "d,sync.wsrep_apply_cb";
connection node_3;
UPDATE test.t1 SET f2 = 'e' WHERE f1 = 3;
connection node_1;
COMMIT;
connection node_2a;
SET GLOBAL debug_dbug = "";
SET DEBUG_SYNC = "now SIGNAL signal.wsrep_apply_cb";
connection node_2a;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=commit_monitor_enter_sync';
SET DEBUG_SYNC = "RESET";
connection node_2a;
set session wsrep_sync_wait=15;
SELECT COUNT(*) = 1 FROM test.t1 WHERE f2 = 'e';
COUNT(*) = 1
1
set session wsrep_sync_wait=0;
STOP SLAVE;
RESET SLAVE;
DROP TABLE t1;
connection node_1;
DROP TABLE t1;
RESET MASTER;
!include ../galera_2nodes_as_slave.cnf
#
# This test tests the operation of transaction replay for async replication slave.
# If a potentially conflicting galera transaction arrives at
# just the right time during the commit and has lock conflict with async replication transaction
# applied by slave SQL thread, then the async replication transaction should either abort
# or rollback and replay (depending on the nature of lock conflict).
#
--source include/have_innodb.inc
--source include/have_debug_sync.inc
--connect node_2a, 127.0.0.1, root, , test, $NODE_MYPORT_2
--connection node_2a
--source include/galera_cluster.inc
#--source suite/galera/include/galera_have_debug_sync.inc
#
# node 1 is native MariaDB server operating as async replication master
#
--connection node_1
RESET MASTER;
--connection node_2a
#
# count the number of wsrep replay's done in the node
#
--let $wsrep_local_replays_old = `SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_local_replays'`
#
# nodes 2 and 3 form a galera cluster, node 2 operates as slave for native MariaDB naster in node 1
#
--disable_query_log
--eval CHANGE MASTER TO MASTER_HOST='127.0.0.1', MASTER_USER='root', MASTER_PORT=$NODE_MYPORT_1;
--enable_query_log
START SLAVE;
--connection node_1
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 CHAR(1)) engine=innodb;
INSERT INTO t1 VALUES (1, 'a');
INSERT INTO t1 VALUES (3, 'a');
#
# use statement format replication to cause a false positive conflict with async replication transaction
# and galera replication. The conflict will be on GAP lock, and slave SQL thread should rollback
# and replay
#
set binlog_format=STATEMENT;
SET AUTOCOMMIT=ON;
START TRANSACTION;
SELECT * FROM t1 FOR UPDATE;
UPDATE t1 SET f2 = 'c' WHERE f1 > 1;
--connection node_2a
# wait for create table and inserts to be replicated from master
SET SESSION wsrep_sync_wait = 0;
--let $wait_condition = SELECT COUNT(*) = 2 FROM test.t1;
--source include/wait_condition.inc
# wait for create table and inserts to be replicated in cluster
--connect node_3, 127.0.0.1, root, , test, $NODE_MYPORT_3
--connection node_3
SET SESSION wsrep_sync_wait = 0;
--let $wait_condition = SELECT COUNT(*) = 2 FROM test.t1;
--source include/wait_condition.inc
--connection node_2a
# Block the future commit of async replication
--let $galera_sync_point = commit_monitor_enter_sync
--source include/galera_set_sync_point.inc
# block also the applier before applying begins
SET GLOBAL debug_dbug = "d,sync.wsrep_apply_cb";
#
# now inject a conflicting insert from node 3, it will replicate with
# earlier seqno (than async transaction) and pause before applying in node 2
#
--connection node_3
INSERT INTO test.t1 VALUES (2, 'b');
#
# send the update from master, this will succeed here, beceuase of async replication.
# async replication will apply this in node 2 and pause before commit phase,
--connection node_1
--error 0
COMMIT;
# Wait until async slave commit is blocked in node_2
--connection node_2a
--source include/galera_wait_sync_point.inc
#
# release the applier
# note: have to clear wsrep_apply_cb sync point first, as async replication will go for replay
# and as this sync point, after BF applier is released to progress
#
SET GLOBAL debug_dbug = "";
SET DEBUG_SYNC = "now SIGNAL signal.wsrep_apply_cb";
# Unblock the async slave commit
--connection node_2a
--source include/galera_clear_sync_point.inc
--source include/galera_signal_sync_point.inc
--connection node_1
SELECT COUNT(*) = 1 FROM t1 WHERE f2 = 'a';
SELECT COUNT(*) = 1 FROM t1 WHERE f2 = 'c';
SELECT * FROM t1;
--connection node_2a
# wsrep_local_replays has increased by 1
set session wsrep_sync_wait=15;
--let $wsrep_local_replays_new = `SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME = 'wsrep_local_replays'`
set session wsrep_sync_wait=0;
--disable_query_log
--eval SELECT $wsrep_local_replays_new - $wsrep_local_replays_old = 1 AS wsrep_local_replays;
--enable_query_log
#
# replaying of async transaction should be effective, and row 3 having 'c' in f2
#
SELECT * FROM t1;
SET DEBUG_SYNC = "RESET";
#********************************************************************************
# test phase 2
#********************************************************************************
--echo #
--echo # test phase with real abort
--echo #
--connection node_1
set binlog_format=ROW;
insert into t1 values (4, 'd');
SET AUTOCOMMIT=ON;
START TRANSACTION;
UPDATE t1 SET f2 = 'd' WHERE f1 = 3;
--connection node_2a
# wait for the last insert to be replicated from master
--let $wait_condition = SELECT COUNT(*) = 4 FROM test.t1;
--source include/wait_condition.inc
# Block the commit
--let $galera_sync_point = commit_monitor_enter_sync
--source include/galera_set_sync_point.inc
# block applier
SET GLOBAL debug_dbug = "d,sync.wsrep_apply_cb";
# Inject a conflicting update from node 3
--connection node_3
UPDATE test.t1 SET f2 = 'e' WHERE f1 = 3;
# send the update from master
--connection node_1
--error 0
COMMIT;
--connection node_2a
# release the applier
SET GLOBAL debug_dbug = "";
SET DEBUG_SYNC = "now SIGNAL signal.wsrep_apply_cb";
# Unblock the async slave commit
--connection node_2a
--source include/galera_clear_sync_point.inc
--source include/galera_signal_sync_point.inc
SET DEBUG_SYNC = "RESET";
--connection node_2a
set session wsrep_sync_wait=15;
SELECT COUNT(*) = 1 FROM test.t1 WHERE f2 = 'e';
set session wsrep_sync_wait=0;
STOP SLAVE;
RESET SLAVE;
DROP TABLE t1;
--connection node_1
DROP TABLE t1;
RESET MASTER;
...@@ -8567,8 +8567,16 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) ...@@ -8567,8 +8567,16 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
res= trans_commit(thd); /* Automatically rolls back on error. */ res= trans_commit(thd); /* Automatically rolls back on error. */
thd->mdl_context.release_transactional_locks(); thd->mdl_context.release_transactional_locks();
#ifdef WITH_WSREP
if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data);
if ((!res || (WSREP(thd) && thd->wsrep_conflict_state == MUST_REPLAY)) && sub_id)
#else
if (!res && sub_id) if (!res && sub_id)
#endif /* WITH_WSREP */
rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, rgi); rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, rgi);
#ifdef WITH_WSREP
if (WSREP(thd)) mysql_mutex_unlock(&thd->LOCK_thd_data);
#endif /* WITH_WSREP */
/* /*
Increment the global status commit count variable Increment the global status commit count variable
......
...@@ -3567,14 +3567,34 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi, ...@@ -3567,14 +3567,34 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi,
exec_res= ev->apply_event(rgi); exec_res= ev->apply_event(rgi);
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (exec_res && thd->wsrep_conflict_state != NO_CONFLICT) if (exec_res)
{ {
switch (thd->wsrep_conflict_state) {
case NO_CONFLICT: break;
case MUST_REPLAY:
WSREP_DEBUG("SQL apply failed for MUST_REPLAY, res %d", exec_res);
mysql_mutex_lock(&thd->LOCK_thd_data);
wsrep_replay_transaction(thd);
switch (thd->wsrep_conflict_state) {
case NO_CONFLICT:
exec_res = 0; /* replaying succeeded, and slave may continue */
break;
case ABORTED: break; /* replaying has failed, trx is rolled back */
default:
WSREP_WARN("unexpected result of slave transaction replaying: %lld, %d",
thd->thread_id, thd->wsrep_conflict_state);
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
break;
default:
WSREP_DEBUG("SQL apply failed, res %d conflict state: %d", WSREP_DEBUG("SQL apply failed, res %d conflict state: %d",
exec_res, thd->wsrep_conflict_state); exec_res, thd->wsrep_conflict_state);
rli->abort_slave= 1; rli->abort_slave= 1;
rli->report(ERROR_LEVEL, ER_UNKNOWN_COM_ERROR, rgi->gtid_info(), rli->report(ERROR_LEVEL, ER_UNKNOWN_COM_ERROR, rgi->gtid_info(),
"Node has dropped from cluster"); "Node has dropped from cluster");
break;
} }
}
#endif #endif
#ifndef DBUG_OFF #ifndef DBUG_OFF
......
...@@ -153,8 +153,9 @@ static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) ...@@ -153,8 +153,9 @@ static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init("wsrep_relay"); if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init("wsrep_relay");
/* thd->system_thread_info.rpl_sql_info isn't initialized. */ /* thd->system_thread_info.rpl_sql_info isn't initialized. */
thd->system_thread_info.rpl_sql_info= if (!thd->slave_thread)
new rpl_sql_thread_info(thd->wsrep_rgi->rli->mi->rpl_filter); thd->system_thread_info.rpl_sql_info=
new rpl_sql_thread_info(thd->wsrep_rgi->rli->mi->rpl_filter);
thd->wsrep_exec_mode= REPL_RECV; thd->wsrep_exec_mode= REPL_RECV;
thd->net.vio= 0; thd->net.vio= 0;
...@@ -181,7 +182,8 @@ static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) ...@@ -181,7 +182,8 @@ static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
thd->user_time = shadow->user_time; thd->user_time = shadow->user_time;
thd->reset_db(shadow->db, shadow->db_length); thd->reset_db(shadow->db, shadow->db_length);
delete thd->system_thread_info.rpl_sql_info; if (!thd->slave_thread)
delete thd->system_thread_info.rpl_sql_info;
delete thd->wsrep_rgi->rli->mi; delete thd->wsrep_rgi->rli->mi;
delete thd->wsrep_rgi->rli; delete thd->wsrep_rgi->rli;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment