Commit cfbd57df authored by Denis Protivensky's avatar Denis Protivensky Committed by Julius Goryavsky

MDEV-33064: Sync trx->wsrep state from THD on trx start

InnoDB transactions may be reused after committed:
- when taken from the transaction pool
- during a DDL operation execution

In this case wsrep flag on trx object is cleared, which may cause wrong
execution logic afterwards (wsrep-related hooks are not run).

Make trx->wsrep flag initialize from THD object only once on InnoDB transaction
start and don't change it throughout the transaction's lifetime.
The flag is reset at commit time as before.

Unconditionally set wsrep=OFF for THD objects that represent InnoDB background
threads.

Make Wsrep_schema::store_view() operate in its own transaction.

Fix streaming replication transactions' fragments rollback to not switch
THD->wsrep value during transaction's execution
(use THD->wsrep_ignore_table as a workaround).
Signed-off-by: default avatarJulius Goryavsky <julius.goryavsky@mariadb.com>
parent d4723914
connection node_2;
connection node_1;
connect con1,127.0.0.1,root,,test,$NODE_MYPORT_1;
CREATE TABLE t1(c1 INT PRIMARY KEY) ENGINE=InnoDB;
CREATE TABLE t1_fk(c1 INT PRIMARY KEY, c2 INT, INDEX (c2), FOREIGN KEY (c2) REFERENCES t1(c1)) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
connection con1;
SET SESSION wsrep_retry_autocommit = 0;
SET DEBUG_SYNC = 'ib_after_row_insert SIGNAL may_alter WAIT_FOR bf_abort';
INSERT INTO t1_fk VALUES (1, 1);
connection node_1;
SET DEBUG_SYNC = 'now WAIT_FOR may_alter';
SET DEBUG_SYNC = 'lock_wait_end WAIT_FOR alter_continue';
ALTER TABLE t1 ADD COLUMN c2 INT, ALGORITHM=INPLACE;
connection con1;
ERROR 40001: Deadlock found when trying to get lock; try restarting transaction
SET DEBUG_SYNC = 'now SIGNAL alter_continue';
connection node_1;
connection node_2;
INSERT INTO t1 (c1, c2) VALUES (2, 2);
connection node_1;
SET DEBUG_SYNC = 'RESET';
DROP TABLE t1_fk, t1;
disconnect con1;
disconnect node_2;
disconnect node_1;
#
# MDEV-33064: ALTER INPLACE running TOI should abort a conflicting DML operation
#
# DDL operations may commit InnoDB transactions more than once during the execution.
# In this case wsrep flag on trx object is cleared, which may cause wrong logic of
# such operations afterwards (wsrep-related hooks are not run).
# One of the consequences was that DDL operation couldn't abort a DML operation
# holding conflicting locks.
#
# The fix: re-enable wsrep flag on trx restart if it's a part of a DDL operation.
#
--source include/galera_cluster.inc
--source include/have_debug_sync.inc
--source include/have_debug.inc
--connect con1,127.0.0.1,root,,test,$NODE_MYPORT_1
CREATE TABLE t1(c1 INT PRIMARY KEY) ENGINE=InnoDB;
CREATE TABLE t1_fk(c1 INT PRIMARY KEY, c2 INT, INDEX (c2), FOREIGN KEY (c2) REFERENCES t1(c1)) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
--connection con1
SET SESSION wsrep_retry_autocommit = 0;
SET DEBUG_SYNC = 'ib_after_row_insert SIGNAL may_alter WAIT_FOR bf_abort';
# INSERT also grabs FK-referenced table lock.
--send
INSERT INTO t1_fk VALUES (1, 1);
--connection node_1
SET DEBUG_SYNC = 'now WAIT_FOR may_alter';
SET DEBUG_SYNC = 'lock_wait_end WAIT_FOR alter_continue';
# ALTER BF-aborts INSERT.
--send
ALTER TABLE t1 ADD COLUMN c2 INT, ALGORITHM=INPLACE;
--connection con1
# INSERT gets BF-aborted.
--error ER_LOCK_DEADLOCK
--reap
SET DEBUG_SYNC = 'now SIGNAL alter_continue';
--connection node_1
# ALTER succeeds.
--reap
--connection node_2
# Sanity check that ALTER has been replicated.
INSERT INTO t1 (c1, c2) VALUES (2, 2);
# Cleanup.
--connection node_1
SET DEBUG_SYNC = 'RESET';
DROP TABLE t1_fk, t1;
--disconnect con1
--source include/galera_end.inc
...@@ -7885,7 +7885,9 @@ int handler::ha_delete_row(const uchar *buf) ...@@ -7885,7 +7885,9 @@ int handler::ha_delete_row(const uchar *buf)
} }
#ifdef WITH_WSREP #ifdef WITH_WSREP
THD *thd= ha_thd(); THD *thd= ha_thd();
if (WSREP_NNULL(thd)) /* For streaming replication, when removing fragments, don't call
wsrep_after_row() as that would initiate new streaming transaction */
if (WSREP_NNULL(thd) && !thd->wsrep_ignore_table)
{ {
/* for streaming replication, the following wsrep_after_row() /* for streaming replication, the following wsrep_after_row()
may replicate a fragment, so we have to declare potential PA may replicate a fragment, so we have to declare potential PA
......
...@@ -5068,6 +5068,9 @@ MYSQL_THD create_background_thd() ...@@ -5068,6 +5068,9 @@ MYSQL_THD create_background_thd()
thd->real_id= 0; thd->real_id= 0;
thd->thread_id= 0; thd->thread_id= 0;
thd->query_id= 0; thd->query_id= 0;
#ifdef WITH_WSREP
thd->variables.wsrep_on= FALSE;
#endif /* WITH_WSREP */
return thd; return thd;
} }
...@@ -6411,7 +6414,8 @@ int THD::decide_logging_format(TABLE_LIST *tables) ...@@ -6411,7 +6414,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
wsrep_is_active(this) && wsrep_is_active(this) &&
variables.wsrep_trx_fragment_size > 0) variables.wsrep_trx_fragment_size > 0)
{ {
if (!is_current_stmt_binlog_format_row()) if (!is_current_stmt_binlog_disabled() &&
!is_current_stmt_binlog_format_row())
{ {
my_message(ER_NOT_SUPPORTED_YET, my_message(ER_NOT_SUPPORTED_YET,
"Streaming replication not supported with " "Streaming replication not supported with "
......
...@@ -155,6 +155,24 @@ class wsrep_off ...@@ -155,6 +155,24 @@ class wsrep_off
my_bool m_wsrep_on; my_bool m_wsrep_on;
}; };
class wsrep_ignore_table
{
public:
wsrep_ignore_table(THD* thd)
: m_thd(thd)
, m_wsrep_ignore_table(thd->wsrep_ignore_table)
{
thd->wsrep_ignore_table= true;
}
~wsrep_ignore_table()
{
m_thd->wsrep_ignore_table= m_wsrep_ignore_table;
}
private:
THD* m_thd;
my_bool m_wsrep_ignore_table;
};
class thd_server_status class thd_server_status
{ {
public: public:
...@@ -739,6 +757,12 @@ int Wsrep_schema::store_view(THD* thd, const Wsrep_view& view) ...@@ -739,6 +757,12 @@ int Wsrep_schema::store_view(THD* thd, const Wsrep_view& view)
Wsrep_schema_impl::binlog_off binlog_off(thd); Wsrep_schema_impl::binlog_off binlog_off(thd);
Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd); Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
if (trans_begin(thd, MYSQL_START_TRANS_OPT_READ_WRITE))
{
WSREP_ERROR("Failed to start transaction for store view");
goto out_not_started;
}
/* /*
Clean up cluster table and members table. Clean up cluster table and members table.
*/ */
...@@ -832,7 +856,22 @@ int Wsrep_schema::store_view(THD* thd, const Wsrep_view& view) ...@@ -832,7 +856,22 @@ int Wsrep_schema::store_view(THD* thd, const Wsrep_view& view)
#endif /* WSREP_SCHEMA_MEMBERS_HISTORY */ #endif /* WSREP_SCHEMA_MEMBERS_HISTORY */
ret= 0; ret= 0;
out: out:
if (ret)
{
trans_rollback_stmt(thd);
if (!trans_rollback(thd))
{
close_thread_tables(thd);
}
}
else if (trans_commit(thd))
{
ret= 1;
WSREP_ERROR("Failed to commit transaction for store view");
}
thd->release_transactional_locks();
out_not_started:
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
...@@ -1184,7 +1223,7 @@ int Wsrep_schema::remove_fragments(THD* thd, ...@@ -1184,7 +1223,7 @@ int Wsrep_schema::remove_fragments(THD* thd,
int ret= 0; int ret= 0;
WSREP_DEBUG("Removing %zu fragments", fragments.size()); WSREP_DEBUG("Removing %zu fragments", fragments.size());
Wsrep_schema_impl::wsrep_off wsrep_off(thd); Wsrep_schema_impl::wsrep_ignore_table wsrep_ignore_table(thd);
Wsrep_schema_impl::binlog_off binlog_off(thd); Wsrep_schema_impl::binlog_off binlog_off(thd);
Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd); Wsrep_schema_impl::sql_safe_updates sql_safe_updates(thd);
......
...@@ -239,29 +239,9 @@ void Wsrep_server_service::log_view( ...@@ -239,29 +239,9 @@ void Wsrep_server_service::log_view(
view.state_id().seqno().get() >= prev_view.state_id().seqno().get()); view.state_id().seqno().get() >= prev_view.state_id().seqno().get());
} }
if (trans_begin(applier->m_thd, MYSQL_START_TRANS_OPT_READ_WRITE)) if (wsrep_schema->store_view(applier->m_thd, view))
{ {
WSREP_WARN("Failed to start transaction for store view"); WSREP_WARN("Failed to store view");
}
else
{
if (wsrep_schema->store_view(applier->m_thd, view))
{
WSREP_WARN("Failed to store view");
trans_rollback_stmt(applier->m_thd);
if (!trans_rollback(applier->m_thd))
{
close_thread_tables(applier->m_thd);
}
}
else
{
if (trans_commit(applier->m_thd))
{
WSREP_WARN("Failed to commit transaction for store view");
}
}
applier->m_thd->release_transactional_locks();
} }
/* /*
......
...@@ -2892,10 +2892,6 @@ innobase_trx_init( ...@@ -2892,10 +2892,6 @@ innobase_trx_init(
thd, OPTION_RELAXED_UNIQUE_CHECKS); thd, OPTION_RELAXED_UNIQUE_CHECKS);
trx->snapshot_isolation = THDVAR(thd, snapshot_isolation) & 1; trx->snapshot_isolation = THDVAR(thd, snapshot_isolation) & 1;
#ifdef WITH_WSREP
trx->wsrep = wsrep_on(thd);
#endif
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -4414,9 +4410,6 @@ innobase_commit_low( ...@@ -4414,9 +4410,6 @@ innobase_commit_low(
trx_commit_for_mysql(trx); trx_commit_for_mysql(trx);
} else { } else {
trx->will_lock = false; trx->will_lock = false;
#ifdef WITH_WSREP
trx->wsrep = false;
#endif /* WITH_WSREP */
} }
#ifdef WITH_WSREP #ifdef WITH_WSREP
...@@ -8718,7 +8711,10 @@ ha_innobase::update_row( ...@@ -8718,7 +8711,10 @@ ha_innobase::update_row(
} }
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (error == DB_SUCCESS && trx->is_wsrep() if (error == DB_SUCCESS &&
/* For sequences, InnoDB transaction may not have been started yet.
Check THD-level wsrep state in that case. */
(trx->is_wsrep() || (!trx_is_started(trx) && wsrep_on(m_user_thd)))
&& wsrep_thd_is_local(m_user_thd) && wsrep_thd_is_local(m_user_thd)
&& !wsrep_thd_ignore_table(m_user_thd)) { && !wsrep_thd_ignore_table(m_user_thd)) {
DBUG_PRINT("wsrep", ("update row key")); DBUG_PRINT("wsrep", ("update row key"));
......
...@@ -919,6 +919,7 @@ trx_start_low( ...@@ -919,6 +919,7 @@ trx_start_low(
#ifdef WITH_WSREP #ifdef WITH_WSREP
trx->xid.null(); trx->xid.null();
trx->wsrep = wsrep_on(trx->mysql_thd);
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
ut_a(ib_vector_is_empty(trx->autoinc_locks)); ut_a(ib_vector_is_empty(trx->autoinc_locks));
...@@ -1492,6 +1493,8 @@ TRANSACTIONAL_INLINE inline void trx_t::commit_in_memory(const mtr_t *mtr) ...@@ -1492,6 +1493,8 @@ TRANSACTIONAL_INLINE inline void trx_t::commit_in_memory(const mtr_t *mtr)
trx_finalize_for_fts(this, undo_no != 0); trx_finalize_for_fts(this, undo_no != 0);
#ifdef WITH_WSREP #ifdef WITH_WSREP
ut_ad(is_wsrep() == wsrep_on(mysql_thd));
/* Serialization history has been written and the transaction is /* Serialization history has been written and the transaction is
committed in memory, which makes this commit ordered. Release commit committed in memory, which makes this commit ordered. Release commit
order critical section. */ order critical section. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment