Commit ea986950 authored by Brandon Nesterenko's avatar Brandon Nesterenko Committed by Brandon Nesterenko

MDEV-33921: Replication breaks when filtering two-phase XA transactions

There are two problems.

First, replication fails when XA transactions are used where the
slave has replicate_do_db set and the client has touched a different
database when running DML such as inserts. This is because XA
commands are not treated as keywords, and are thereby not exempt
from the replication filter. The effect of this is that during an XA
transaction, if its logged “use db” from the master is filtered out
by the replication filter, then XA END will be ignored, yet its
corresponding XA PREPARE will be executed in an invalid state,
thereby breaking replication.

Second, if the slave replicates an XA transaction which results in
an empty transaction, the XA START through XA PREPARE first phase of
the transaction won’t be binlogged, yet the XA COMMIT will be
binlogged. This will break replication in chain configurations.

The first problem is fixed by treating XA commands in
Query_log_event as keywords, thus allowing them to bypass the
replication filter. Note that Query_log_event::is_trans_keyword() is
changed to accept a new parameter to define its mode, to either
check for XA commands or regular transaction commands, but not both.
In addition, mysqlbinlog is adapted to use this mode so its
--database filter does not remove XA commands from its output.

The second problem fixed by overwriting the XA state in the XID
cache to be XA_ROLLBACK_ONLY, so at commit time, the server knows to
rollback the transaction and skip its binlogging. If the xid cache
is cleared before an XA transaction receives its completion command
(e.g. on server shutdown), then before reporting ER_XAER_NOTA when
the completion command is executed, the filter is first checked if
the database is ignored, and if so, the error is ignored.

Reviewed By:
============
Kristian Nielsen <knielsen@knielsen-hq.org>
Andrei Elkin <andrei.elkin@mariadb.com>
parent 9fdc0e54
...@@ -1071,7 +1071,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, ...@@ -1071,7 +1071,7 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
case QUERY_COMPRESSED_EVENT: case QUERY_COMPRESSED_EVENT:
{ {
Query_log_event *qe= (Query_log_event*)ev; Query_log_event *qe= (Query_log_event*)ev;
if (!qe->is_trans_keyword()) if (!qe->is_trans_keyword(print_event_info->is_xa_trans()))
{ {
if (shall_skip_database(qe->db)) if (shall_skip_database(qe->db))
goto end; goto end;
......
...@@ -1164,6 +1164,84 @@ include/sync_with_master_gtid.inc ...@@ -1164,6 +1164,84 @@ include/sync_with_master_gtid.inc
connection server_1; connection server_1;
set @@binlog_format = @sav_binlog_format; set @@binlog_format = @sav_binlog_format;
set @@global.binlog_format = @sav_binlog_format; set @@global.binlog_format = @sav_binlog_format;
#
# MDEV-33921.1: If a slave's replication of an XA transaction results in
# an empty transaction, e.g. due to replication filters, the slave
# should not binlog any part of the XA transaction.
connection server_1;
create database db1;
create database db2;
create table db1.t1 (a int) engine=innodb;
include/save_master_gtid.inc
connection server_3;
include/sync_with_master_gtid.inc
include/stop_slave.inc
connection server_2;
include/stop_slave.inc
SET @@GLOBAL.replicate_ignore_db= "";
SET @@GLOBAL.replicate_do_db= "db2";
include/start_slave.inc
connection server_1;
use db1;
XA START "x1";
insert into db1.t1 values (1);
XA END "x1";
XA PREPARE "x1";
XA COMMIT "x1";
include/save_master_gtid.inc
connection server_2;
include/sync_with_master_gtid.inc
connection server_2;
include/save_master_gtid.inc
connection server_3;
include/start_slave.inc
include/sync_with_master_gtid.inc
#
# 33921.2: If the slave shuts down after "preparing" a filtered-to-empty
# XA transaction (and not completing it), then when the respective
# XA completion (COMMIT in this test) command is replicated, the slave
# should not throw ER_XAER_NOTA. Note that internally, the error is
# thrown, but it is ignored because the target db is filtered.
connection server_3;
include/stop_slave.inc
connection server_1;
use db1;
XA START "x2";
insert into db1.t1 values (2);
XA END "x2";
XA PREPARE "x2";
include/save_master_gtid.inc
connection server_2;
include/sync_with_master_gtid.inc
# Connection named slave is needed for reconnection
connect slave,localhost,root,,;
connect slave1,localhost,root,,;
include/rpl_restart_server.inc [server_number=2]
connection server_2;
include/stop_slave.inc
SET @@GLOBAL.replicate_do_db= "db2";
include/start_slave.inc
connection server_1;
XA COMMIT "x2";
connection server_2;
include/sync_with_master_gtid.inc
include/save_master_gtid.inc
connection server_3;
include/start_slave.inc
include/sync_with_master_gtid.inc
#
# 33921.3: Ensure XA commands are not considered by mysqlbinlog's
# --database filter
connection server_1;
# MYSQL_BINLOG datadir/binlog_file --start-position=pre_xa_pos --database=db2 --result-file=assert_file
include/assert_grep.inc [Mysqlbinlog should output all XA commands from the filtered transaction]
connection server_2;
include/stop_slave.inc
SET @@GLOBAL.replicate_do_db="";
include/start_slave.inc
connection server_1;
drop database db1;
drop database db2;
connection server_1; connection server_1;
include/rpl_end.inc include/rpl_end.inc
# End of rpl_xa_empty_transaction.test # End of rpl_xa_empty_transaction.test
...@@ -32,6 +32,10 @@ ...@@ -32,6 +32,10 @@
# MDEV-25616: Binlog event for XA COMMIT is generated without matching # MDEV-25616: Binlog event for XA COMMIT is generated without matching
# XA START, replication aborts # XA START, replication aborts
# #
# MDEV-33921: Replication fails when XA transactions are used where the slave
# has replicate_do_db set and the client has touched a different
# database when running DML such as inserts.
#
--source include/have_log_bin.inc --source include/have_log_bin.inc
--let $rpl_server_count= 3 --let $rpl_server_count= 3
...@@ -167,6 +171,129 @@ set @@global.binlog_format = row; ...@@ -167,6 +171,129 @@ set @@global.binlog_format = row;
set @@binlog_format = @sav_binlog_format; set @@binlog_format = @sav_binlog_format;
set @@global.binlog_format = @sav_binlog_format; set @@global.binlog_format = @sav_binlog_format;
--echo #
--echo # MDEV-33921.1: If a slave's replication of an XA transaction results in
--echo # an empty transaction, e.g. due to replication filters, the slave
--echo # should not binlog any part of the XA transaction.
#
# Note that the MDEV-33921 report is actually about that XA END is filtered
# out (not executed), and then its corresponding XA PREPARE errors because the
# XA state of the transaction is incorrect. This test case inherently tests
# both bugs.
--connection server_1
create database db1;
create database db2;
create table db1.t1 (a int) engine=innodb;
--source include/save_master_gtid.inc
--connection server_3
--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc
--connection server_2
--source include/stop_slave.inc
SET @@GLOBAL.replicate_ignore_db= "";
SET @@GLOBAL.replicate_do_db= "db2";
--source include/start_slave.inc
--connection server_1
--let $pre_xa_gtid= `SELECT @@global.gtid_binlog_pos`
use db1;
XA START "x1";
insert into db1.t1 values (1);
XA END "x1";
XA PREPARE "x1";
XA COMMIT "x1";
--source include/save_master_gtid.inc
--connection server_2
--source include/sync_with_master_gtid.inc
--let $slave_binlogged_gtid= `SELECT @@global.gtid_binlog_pos`
if (`SELECT strcmp("$slave_binlogged_gtid","$pre_xa_gtid")`)
{
--die Slave binlogged an empty XA transaction yet should not have
}
--connection server_2
--source include/save_master_gtid.inc
--connection server_3
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
--echo #
--echo # 33921.2: If the slave shuts down after "preparing" a filtered-to-empty
--echo # XA transaction (and not completing it), then when the respective
--echo # XA completion (COMMIT in this test) command is replicated, the slave
--echo # should not throw ER_XAER_NOTA. Note that internally, the error is
--echo # thrown, but it is ignored because the target db is filtered.
--connection server_3
--source include/stop_slave.inc
--connection server_1
--let $pre_xa_gtid= `SELECT @@global.gtid_binlog_pos`
# Used by mysqlbinlog in part 3
--let $pre_xa_pos = query_get_value(SHOW MASTER STATUS, Position, 1)
use db1;
XA START "x2";
insert into db1.t1 values (2);
XA END "x2";
XA PREPARE "x2";
--source include/save_master_gtid.inc
--connection server_2
--source include/sync_with_master_gtid.inc
--let $rpl_server_number= 2
--echo # Connection named slave is needed for reconnection
--connect(slave,localhost,root,,)
--connect(slave1,localhost,root,,)
--source include/rpl_restart_server.inc
--connection server_2
--source include/stop_slave.inc
SET @@GLOBAL.replicate_do_db= "db2";
--source include/start_slave.inc
--connection server_1
XA COMMIT "x2";
--connection server_2
--source include/sync_with_master_gtid.inc
--source include/save_master_gtid.inc
--connection server_3
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
--echo #
--echo # 33921.3: Ensure XA commands are not considered by mysqlbinlog's
--echo # --database filter
--connection server_1
--let $datadir= `select @@datadir`
--let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1)
--let assert_file= $MYSQLTEST_VARDIR/tmp/binlog_decoded.out
--echo # MYSQL_BINLOG datadir/binlog_file --start-position=pre_xa_pos --database=db2 --result-file=assert_file
--exec $MYSQL_BINLOG $datadir/$binlog_file --start-position=$pre_xa_pos --database=db2 --result-file=$assert_file
--let assert_text= Mysqlbinlog should output all XA commands from the filtered transaction
--let assert_count= 4
--let assert_select= XA START|XA END|XA PREPARE|XA COMMIT
--source include/assert_grep.inc
--connection server_2
--source include/stop_slave.inc
SET @@GLOBAL.replicate_do_db="";
--source include/start_slave.inc
--connection server_1
drop database db1;
drop database db2;
# #
# Cleanup # Cleanup
--connection server_1 --connection server_1
......
...@@ -1444,6 +1444,22 @@ int ha_prepare(THD *thd) ...@@ -1444,6 +1444,22 @@ int ha_prepare(THD *thd)
error=1; error=1;
} }
} }
else if (thd->rgi_slave)
{
/*
Slave threads will always process XA COMMITs in the binlog handler (see
MDEV-25616 and MDEV-30423), so if this is a slave thread preparing a
transaction which proved empty during replication (e.g. because of
replication filters) then mark it as XA_ROLLBACK_ONLY so the follow up
XA COMMIT will know to roll it back, rather than try to commit and binlog
a standalone XA COMMIT (without its preceding XA START - XA PREPARE).
If the xid_cache is cleared before the completion event comes, before
issuing ER_XAER_NOTA, first check if the event targets an ignored
database, and ignore the error if so.
*/
thd->transaction->xid_state.set_rollback_only();
}
DBUG_RETURN(error); DBUG_RETURN(error);
} }
......
...@@ -845,6 +845,7 @@ typedef struct st_print_event_info ...@@ -845,6 +845,7 @@ typedef struct st_print_event_info
uint lc_time_names_number; uint lc_time_names_number;
uint charset_database_number; uint charset_database_number;
uint verbose; uint verbose;
uchar gtid_ev_flags2;
uint32 flags2; uint32 flags2;
uint32 server_id; uint32 server_id;
uint32 domain_id; uint32 domain_id;
...@@ -916,6 +917,8 @@ typedef struct st_print_event_info ...@@ -916,6 +917,8 @@ typedef struct st_print_event_info
copy_event_cache_to_file_and_reinit(&body_cache, file); copy_event_cache_to_file_and_reinit(&body_cache, file);
fflush(file); fflush(file);
} }
my_bool is_xa_trans();
} PRINT_EVENT_INFO; } PRINT_EVENT_INFO;
#endif // MYSQL_CLIENT #endif // MYSQL_CLIENT
...@@ -2173,7 +2176,7 @@ class Query_log_event: public Log_event ...@@ -2173,7 +2176,7 @@ class Query_log_event: public Log_event
If true, the event always be applied by slave SQL thread or be printed by If true, the event always be applied by slave SQL thread or be printed by
mysqlbinlog mysqlbinlog
*/ */
bool is_trans_keyword() bool is_trans_keyword(bool is_xa)
{ {
/* /*
Before the patch for bug#50407, The 'SAVEPOINT and ROLLBACK TO' Before the patch for bug#50407, The 'SAVEPOINT and ROLLBACK TO'
...@@ -2186,10 +2189,11 @@ class Query_log_event: public Log_event ...@@ -2186,10 +2189,11 @@ class Query_log_event: public Log_event
but we don't handle these cases and after the patch, both quiries are but we don't handle these cases and after the patch, both quiries are
binlogged in upper case with no comments. binlogged in upper case with no comments.
*/ */
return !strncmp(query, "BEGIN", q_len) || return is_xa ? !strncasecmp(query, C_STRING_WITH_LEN("XA "))
!strncmp(query, "COMMIT", q_len) || : (!strncmp(query, "BEGIN", q_len) ||
!strncasecmp(query, "SAVEPOINT", 9) || !strncmp(query, "COMMIT", q_len) ||
!strncasecmp(query, "ROLLBACK", 8); !strncasecmp(query, "SAVEPOINT", 9) ||
!strncasecmp(query, "ROLLBACK", 8));
} }
virtual bool is_begin() { return !strcmp(query, "BEGIN"); } virtual bool is_begin() { return !strcmp(query, "BEGIN"); }
virtual bool is_commit() { return !strcmp(query, "COMMIT"); } virtual bool is_commit() { return !strcmp(query, "COMMIT"); }
......
...@@ -1835,7 +1835,7 @@ bool Query_log_event::print_query_header(IO_CACHE* file, ...@@ -1835,7 +1835,7 @@ bool Query_log_event::print_query_header(IO_CACHE* file,
if ((flags & LOG_EVENT_SUPPRESS_USE_F)) if ((flags & LOG_EVENT_SUPPRESS_USE_F))
{ {
if (!is_trans_keyword()) if (!is_trans_keyword(print_event_info->is_xa_trans()))
print_event_info->db[0]= '\0'; print_event_info->db[0]= '\0';
} }
else if (db) else if (db)
...@@ -3762,6 +3762,7 @@ st_print_event_info::st_print_event_info() ...@@ -3762,6 +3762,7 @@ st_print_event_info::st_print_event_info()
bzero(time_zone_str, sizeof(time_zone_str)); bzero(time_zone_str, sizeof(time_zone_str));
delimiter[0]= ';'; delimiter[0]= ';';
delimiter[1]= 0; delimiter[1]= 0;
gtid_ev_flags2= 0;
flags2_inited= 0; flags2_inited= 0;
flags2= 0; flags2= 0;
sql_mode_inited= 0; sql_mode_inited= 0;
...@@ -3795,6 +3796,11 @@ st_print_event_info::st_print_event_info() ...@@ -3795,6 +3796,11 @@ st_print_event_info::st_print_event_info()
#endif #endif
} }
my_bool st_print_event_info::is_xa_trans()
{
return (gtid_ev_flags2 &
(Gtid_log_event::FL_PREPARED_XA | Gtid_log_event::FL_COMPLETED_XA));
}
bool copy_event_cache_to_string_and_reinit(IO_CACHE *cache, LEX_STRING *to) bool copy_event_cache_to_string_and_reinit(IO_CACHE *cache, LEX_STRING *to)
{ {
...@@ -3906,6 +3912,8 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) ...@@ -3906,6 +3912,8 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
goto err; goto err;
} }
print_event_info->gtid_ev_flags2= flags2;
return cache.flush_data(); return cache.flush_data();
err: err:
return 1; return 1;
......
...@@ -1437,7 +1437,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, ...@@ -1437,7 +1437,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
is created we create tables with thd->variables.wsrep_on=false is created we create tables with thd->variables.wsrep_on=false
to avoid replicating wsrep_schema tables to other nodes. to avoid replicating wsrep_schema tables to other nodes.
*/ */
if (WSREP_ON && !is_trans_keyword()) if (WSREP_ON && !is_trans_keyword(false))
{ {
thd->wsrep_PA_safe= false; thd->wsrep_PA_safe= false;
} }
...@@ -1715,7 +1715,11 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, ...@@ -1715,7 +1715,11 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
::do_apply_event(), then the companion SET also have so ::do_apply_event(), then the companion SET also have so
we don't need to reset_one_shot_variables(). we don't need to reset_one_shot_variables().
*/ */
if (is_trans_keyword() || rpl_filter->db_ok(thd->db.str)) if (rpl_filter->is_db_empty() ||
is_trans_keyword(
(rgi->gtid_ev_flags2 & (Gtid_log_event::FL_PREPARED_XA |
Gtid_log_event::FL_COMPLETED_XA))) ||
rpl_filter->db_ok(thd->db.str))
{ {
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (!wsrep_thd_is_applying(thd)) if (!wsrep_thd_is_applying(thd))
...@@ -2040,6 +2044,16 @@ START SLAVE; . Query: '%s'", expected_error, thd->query()); ...@@ -2040,6 +2044,16 @@ START SLAVE; . Query: '%s'", expected_error, thd->query());
actual_error == ER_CONNECTION_KILLED) actual_error == ER_CONNECTION_KILLED)
thd->reset_killed(); thd->reset_killed();
} }
else if (actual_error == ER_XAER_NOTA && !rpl_filter->db_ok(get_db()))
{
/*
If there is an XA query whos XID cannot be found, if the replication
filter is active and filters the target database, assume that the XID
cache has been cleared (e.g. by server restart) since it was prepared,
so we can just ignore this event.
*/
thd->clear_error(1);
}
/* /*
Other cases: mostly we expected no error and get one. Other cases: mostly we expected no error and get one.
*/ */
......
...@@ -268,6 +268,13 @@ Rpl_filter::is_on() ...@@ -268,6 +268,13 @@ Rpl_filter::is_on()
} }
bool
Rpl_filter::is_db_empty()
{
return do_db.is_empty() && ignore_db.is_empty();
}
/** /**
Parse and add the given comma-separated sequence of filter rules. Parse and add the given comma-separated sequence of filter rules.
......
...@@ -56,6 +56,7 @@ class Rpl_filter ...@@ -56,6 +56,7 @@ class Rpl_filter
bool db_ok_with_wild_table(const char *db); bool db_ok_with_wild_table(const char *db);
bool is_on(); bool is_on();
bool is_db_empty();
/* Setters - add filtering rules */ /* Setters - add filtering rules */
......
...@@ -180,6 +180,13 @@ void XID_STATE::set_error(uint error) ...@@ -180,6 +180,13 @@ void XID_STATE::set_error(uint error)
xid_cache_element->rm_error= error; xid_cache_element->rm_error= error;
} }
void XID_STATE::set_rollback_only()
{
xid_cache_element->xa_state= XA_ROLLBACK_ONLY;
if (current_thd)
MYSQL_SET_TRANSACTION_XA_STATE(current_thd->m_transaction_psi,
XA_ROLLBACK_ONLY);
}
void XID_STATE::er_xaer_rmfail() const void XID_STATE::er_xaer_rmfail() const
{ {
...@@ -547,8 +554,21 @@ bool trans_xa_prepare(THD *thd) ...@@ -547,8 +554,21 @@ bool trans_xa_prepare(THD *thd)
} }
else else
{ {
thd->transaction->xid_state.xid_cache_element->xa_state= XA_PREPARED; if (thd->transaction->xid_state.xid_cache_element->xa_state !=
MYSQL_SET_TRANSACTION_XA_STATE(thd->m_transaction_psi, XA_PREPARED); XA_ROLLBACK_ONLY)
{
thd->transaction->xid_state.xid_cache_element->xa_state= XA_PREPARED;
MYSQL_SET_TRANSACTION_XA_STATE(thd->m_transaction_psi, XA_PREPARED);
}
else
{
/*
In the non-err case, XA_ROLLBACK_ONLY should only be set by a slave
thread which prepared an empty transaction, to prevent binlogging a
standalone XA COMMIT.
*/
DBUG_ASSERT(thd->rgi_slave && !(thd->transaction->all.ha_list));
}
res= thd->variables.pseudo_slave_mode || thd->slave_thread ? res= thd->variables.pseudo_slave_mode || thd->slave_thread ?
slave_applier_reset_xa_trans(thd) : 0; slave_applier_reset_xa_trans(thd) : 0;
} }
......
...@@ -34,6 +34,7 @@ struct XID_STATE { ...@@ -34,6 +34,7 @@ struct XID_STATE {
bool check_has_uncommitted_xa() const; bool check_has_uncommitted_xa() const;
bool is_explicit_XA() const { return xid_cache_element != 0; } bool is_explicit_XA() const { return xid_cache_element != 0; }
void set_error(uint error); void set_error(uint error);
void set_rollback_only();
void er_xaer_rmfail() const; void er_xaer_rmfail() const;
XID *get_xid() const; XID *get_xid() const;
enum xa_states get_state_code() const; enum xa_states get_state_code() const;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment