Commit a8ff4f24 authored by Teemu Ollakka's avatar Teemu Ollakka Committed by Jan Lindström

MDEV-18631 Fix streaming replication with wsrep_gtid_mode=ON

With wsrep_gtid_mode=ON, the appropriate commit hooks were not
called in all cases for applied streaming transactions.

As a fix, removed all special handling of commit order critical
section from Wsrep_high_priority_service and Wsrep_storage_service.
Now commit order critical section is always entered in ha_commit_trans().

Check for wsrep_run_commit_hook is now done in handler.cc, log.cc.
This makes it explicit that the transaction is an active wsrep
transaction which must go through commit hooks.
parent fa52846b
...@@ -1207,8 +1207,9 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg) ...@@ -1207,8 +1207,9 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht_arg)
static int prepare_or_error(handlerton *ht, THD *thd, bool all) static int prepare_or_error(handlerton *ht, THD *thd, bool all)
{ {
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION && const bool run_wsrep_hooks= wsrep_run_commit_hook(thd, all);
if (run_wsrep_hooks && ht->flags & HTON_WSREP_REPLICATION &&
wsrep_before_prepare(thd, all)) wsrep_before_prepare(thd, all))
{ {
return(1); return(1);
...@@ -1222,7 +1223,7 @@ static int prepare_or_error(handlerton *ht, THD *thd, bool all) ...@@ -1222,7 +1223,7 @@ static int prepare_or_error(handlerton *ht, THD *thd, bool all)
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
} }
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (WSREP(thd) && ht->flags & HTON_WSREP_REPLICATION && if (run_wsrep_hooks && !err && ht->flags & HTON_WSREP_REPLICATION &&
wsrep_after_prepare(thd, all)) wsrep_after_prepare(thd, all))
{ {
err= 1; err= 1;
...@@ -1369,6 +1370,9 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1369,6 +1370,9 @@ int ha_commit_trans(THD *thd, bool all)
Ha_trx_info *ha_info= trans->ha_list; Ha_trx_info *ha_info= trans->ha_list;
bool need_prepare_ordered, need_commit_ordered; bool need_prepare_ordered, need_commit_ordered;
my_xid xid; my_xid xid;
#ifdef WITH_WSREP
const bool run_wsrep_hooks= wsrep_run_commit_hook(thd, all);
#endif /* WITH_WSREP */
DBUG_ENTER("ha_commit_trans"); DBUG_ENTER("ha_commit_trans");
DBUG_PRINT("info",("thd: %p option_bits: %lu all: %d", DBUG_PRINT("info",("thd: %p option_bits: %lu all: %d",
thd, (ulong) thd->variables.option_bits, all)); thd, (ulong) thd->variables.option_bits, all));
...@@ -1424,7 +1428,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1424,7 +1428,7 @@ int ha_commit_trans(THD *thd, bool all)
if (is_real_trans) if (is_real_trans)
thd->transaction.cleanup(); thd->transaction.cleanup();
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (WSREP(thd) && all && !error) if (wsrep_is_active(thd) && is_real_trans && !error)
{ {
wsrep_commit_empty(thd, all); wsrep_commit_empty(thd, all);
} }
...@@ -1519,11 +1523,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1519,11 +1523,7 @@ int ha_commit_trans(THD *thd, bool all)
This commit will not go through log_and_order() where wsrep commit This commit will not go through log_and_order() where wsrep commit
ordering is normally done. Commit ordering must be done here. ordering is normally done. Commit ordering must be done here.
*/ */
bool run_wsrep_commit= (WSREP(thd) && if (run_wsrep_hooks)
rw_ha_count &&
wsrep_thd_is_local(thd) &&
wsrep_has_changes(thd, all));
if (run_wsrep_commit)
error= wsrep_before_commit(thd, all); error= wsrep_before_commit(thd, all);
if (error) if (error)
{ {
...@@ -1533,8 +1533,8 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1533,8 +1533,8 @@ int ha_commit_trans(THD *thd, bool all)
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
error= ha_commit_one_phase(thd, all); error= ha_commit_one_phase(thd, all);
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (run_wsrep_commit) if (run_wsrep_hooks)
error= wsrep_after_commit(thd, all); error= error || wsrep_after_commit(thd, all);
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
goto done; goto done;
} }
...@@ -1567,7 +1567,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1567,7 +1567,7 @@ int ha_commit_trans(THD *thd, bool all)
DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (!error && WSREP_ON) if (run_wsrep_hooks && !error)
{ {
wsrep::seqno const s= wsrep_xid_seqno(thd->wsrep_xid); wsrep::seqno const s= wsrep_xid_seqno(thd->wsrep_xid);
if (!s.is_undefined()) if (!s.is_undefined())
...@@ -1584,7 +1584,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1584,7 +1584,7 @@ int ha_commit_trans(THD *thd, bool all)
goto done; goto done;
} }
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (wsrep_before_commit(thd, all)) if (run_wsrep_hooks && (error = wsrep_before_commit(thd, all)))
goto wsrep_err; goto wsrep_err;
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
DEBUG_SYNC(thd, "ha_commit_trans_before_log_and_order"); DEBUG_SYNC(thd, "ha_commit_trans_before_log_and_order");
...@@ -1600,10 +1600,10 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1600,10 +1600,10 @@ int ha_commit_trans(THD *thd, bool all)
error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0; error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0;
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (error || wsrep_after_commit(thd, all)) if (run_wsrep_hooks && (error || (error = wsrep_after_commit(thd, all))))
{ {
mysql_mutex_lock(&thd->LOCK_thd_data); mysql_mutex_lock(&thd->LOCK_thd_data);
if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort) if (wsrep_must_abort(thd))
{ {
mysql_mutex_unlock(&thd->LOCK_thd_data); mysql_mutex_unlock(&thd->LOCK_thd_data);
(void)tc_log->unlog(cookie, xid); (void)tc_log->unlog(cookie, xid);
...@@ -1636,7 +1636,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1636,7 +1636,7 @@ int ha_commit_trans(THD *thd, bool all)
#ifdef WITH_WSREP #ifdef WITH_WSREP
wsrep_err: wsrep_err:
mysql_mutex_lock(&thd->LOCK_thd_data); mysql_mutex_lock(&thd->LOCK_thd_data);
if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort) if (run_wsrep_hooks && wsrep_must_abort(thd))
{ {
WSREP_DEBUG("BF abort has happened after prepare & certify"); WSREP_DEBUG("BF abort has happened after prepare & certify");
mysql_mutex_unlock(&thd->LOCK_thd_data); mysql_mutex_unlock(&thd->LOCK_thd_data);
...@@ -1672,7 +1672,8 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1672,7 +1672,8 @@ int ha_commit_trans(THD *thd, bool all)
thd->mdl_context.release_lock(mdl_request.ticket); thd->mdl_context.release_lock(mdl_request.ticket);
} }
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (WSREP(thd) && all && !error && (rw_ha_count == 0)) if (wsrep_is_active(thd) && is_real_trans && !error && (rw_ha_count == 0) &&
wsrep_not_committed(thd))
{ {
wsrep_commit_empty(thd, all); wsrep_commit_empty(thd, all);
} }
......
...@@ -7686,7 +7686,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) ...@@ -7686,7 +7686,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{ {
int is_leader= queue_for_group_commit(entry); int is_leader= queue_for_group_commit(entry);
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (is_leader >= 0 && if (wsrep_run_commit_hook(entry->thd, true) && is_leader >= 0 &&
wsrep_ordered_commit(entry->thd, entry->all, wsrep_apply_error())) wsrep_ordered_commit(entry->thd, entry->all, wsrep_apply_error()))
return true; return true;
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
......
...@@ -195,7 +195,8 @@ int Wsrep_high_priority_service::start_transaction( ...@@ -195,7 +195,8 @@ int Wsrep_high_priority_service::start_transaction(
const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta)
{ {
DBUG_ENTER(" Wsrep_high_priority_service::start_transaction"); DBUG_ENTER(" Wsrep_high_priority_service::start_transaction");
DBUG_RETURN(m_thd->wsrep_cs().start_transaction(ws_handle, ws_meta)); DBUG_RETURN(m_thd->wsrep_cs().start_transaction(ws_handle, ws_meta) ||
trans_begin(m_thd));
} }
const wsrep::transaction& Wsrep_high_priority_service::transaction() const const wsrep::transaction& Wsrep_high_priority_service::transaction() const
...@@ -204,14 +205,23 @@ const wsrep::transaction& Wsrep_high_priority_service::transaction() const ...@@ -204,14 +205,23 @@ const wsrep::transaction& Wsrep_high_priority_service::transaction() const
DBUG_RETURN(m_thd->wsrep_trx()); DBUG_RETURN(m_thd->wsrep_trx());
} }
void Wsrep_high_priority_service::adopt_transaction(const wsrep::transaction& transaction) int Wsrep_high_priority_service::adopt_transaction(
const wsrep::transaction& transaction)
{ {
DBUG_ENTER(" Wsrep_high_priority_service::adopt_transaction"); DBUG_ENTER(" Wsrep_high_priority_service::adopt_transaction");
/* Adopt transaction first to set up transaction meta data for
trans begin. If trans_begin() fails for some reason, roll back
the wsrep transaction before return. */
m_thd->wsrep_cs().adopt_transaction(transaction); m_thd->wsrep_cs().adopt_transaction(transaction);
DBUG_VOID_RETURN; int ret= trans_begin(m_thd);
if (ret)
{
m_thd->wsrep_cs().before_rollback();
m_thd->wsrep_cs().after_rollback();
}
DBUG_RETURN(ret);
} }
int Wsrep_high_priority_service::append_fragment_and_commit( int Wsrep_high_priority_service::append_fragment_and_commit(
const wsrep::ws_handle& ws_handle, const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
...@@ -254,23 +264,8 @@ int Wsrep_high_priority_service::append_fragment_and_commit( ...@@ -254,23 +264,8 @@ int Wsrep_high_priority_service::append_fragment_and_commit(
ws_meta, true); ws_meta, true);
} }
if (!ret) ret= ret || trans_commit(m_thd);
{
DBUG_ASSERT(wsrep_thd_trx_seqno(m_thd) > 0);
if (!do_binlog_commit)
{
ret= wsrep_before_commit(m_thd, true);
}
ret= ret || trans_commit(m_thd);
if (!do_binlog_commit)
{
if (opt_log_slave_updates)
{
ret= ret || wsrep_ordered_commit(m_thd, true, wsrep_apply_error());
}
ret= ret || wsrep_after_commit(m_thd, true);
}
}
m_thd->wsrep_cs().after_applying(); m_thd->wsrep_cs().after_applying();
m_thd->mdl_context.release_transactional_locks(); m_thd->mdl_context.release_transactional_locks();
...@@ -298,47 +293,39 @@ int Wsrep_high_priority_service::commit(const wsrep::ws_handle& ws_handle, ...@@ -298,47 +293,39 @@ int Wsrep_high_priority_service::commit(const wsrep::ws_handle& ws_handle,
thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, true); thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, true);
thd_proc_info(thd, "committing"); thd_proc_info(thd, "committing");
int ret= 0;
const bool is_ordered= !ws_meta.seqno().is_undefined(); const bool is_ordered= !ws_meta.seqno().is_undefined();
/* If opt_log_slave_updates is not on, applier does not write int ret= trans_commit(thd);
anything to binlog cache and neither wsrep_before_commit()
nor wsrep_after_commit() we be reached from binlog code
path for applier. Therefore run wsrep_before_commit()
and wsrep_after_commit() here. wsrep_ordered_commit()
will be called from wsrep_ordered_commit_if_no_binlog(). */
if (!opt_log_slave_updates && !opt_bin_log && is_ordered)
{
if (m_thd->transaction.all.no_2pc == false)
{
ret= wsrep_before_prepare(thd, true);
ret= ret || wsrep_after_prepare(thd, true);
}
ret= ret || wsrep_before_commit(thd, true);
}
ret= ret || trans_commit(thd);
if (ret == 0) if (ret == 0)
{ {
m_rgi->cleanup_context(thd, 0); m_rgi->cleanup_context(thd, 0);
} }
if (ret == 0 && !opt_log_slave_updates && !opt_bin_log && is_ordered)
{
ret= wsrep_after_commit(thd, true);
}
m_thd->mdl_context.release_transactional_locks(); m_thd->mdl_context.release_transactional_locks();
thd_proc_info(thd, "wsrep applier committed"); thd_proc_info(thd, "wsrep applier committed");
if (!is_ordered) if (!is_ordered)
{ {
/* Wsrep commit was not ordered so it does not go through commit time
hooks and remains active. Roll it back to make cleanup happen
in after_applying() call. */
m_thd->wsrep_cs().before_rollback(); m_thd->wsrep_cs().before_rollback();
m_thd->wsrep_cs().after_rollback(); m_thd->wsrep_cs().after_rollback();
} }
else if (m_thd->wsrep_trx().state() == wsrep::transaction::s_executing)
{
/*
Wsrep commit was ordered but it did not go through commit time
hooks and remains active. Cycle through commit hooks to release
commit order and to make cleanup happen in after_applying() call.
This is a workaround for CTAS with empty result set.
*/
WSREP_DEBUG("Commit not finished for applier %llu", thd->thread_id);
ret= ret || m_thd->wsrep_cs().before_commit() ||
m_thd->wsrep_cs().ordered_commit() ||
m_thd->wsrep_cs().after_commit();
}
thd->lex->sql_command= SQLCOM_END;
must_exit_= check_exit_status(); must_exit_= check_exit_status();
DBUG_RETURN(ret); DBUG_RETURN(ret);
...@@ -380,6 +367,8 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, ...@@ -380,6 +367,8 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
trans_commit(thd); trans_commit(thd);
thd->close_temporary_tables(); thd->close_temporary_tables();
thd->lex->sql_command= SQLCOM_END;
wsrep_set_SE_checkpoint(client_state.toi_meta().gtid()); wsrep_set_SE_checkpoint(client_state.toi_meta().gtid());
must_exit_= check_exit_status(); must_exit_= check_exit_status();
......
...@@ -36,7 +36,7 @@ class Wsrep_high_priority_service : ...@@ -36,7 +36,7 @@ class Wsrep_high_priority_service :
int start_transaction(const wsrep::ws_handle&, int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&); const wsrep::ws_meta&);
const wsrep::transaction& transaction() const; const wsrep::transaction& transaction() const;
void adopt_transaction(const wsrep::transaction&); int adopt_transaction(const wsrep::transaction&);
int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&) = 0; int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&) = 0;
int append_fragment_and_commit(const wsrep::ws_handle&, int append_fragment_and_commit(const wsrep::ws_handle&,
const wsrep::ws_meta&, const wsrep::ws_meta&,
......
...@@ -147,39 +147,14 @@ int Wsrep_storage_service::commit(const wsrep::ws_handle& ws_handle, ...@@ -147,39 +147,14 @@ int Wsrep_storage_service::commit(const wsrep::ws_handle& ws_handle,
WSREP_DEBUG("Storage service commit: %llu, %lld", WSREP_DEBUG("Storage service commit: %llu, %lld",
ws_meta.transaction_id().get(), ws_meta.seqno().get()); ws_meta.transaction_id().get(), ws_meta.seqno().get());
int ret= 0; int ret= 0;
const bool do_binlog_commit= (opt_log_slave_updates && wsrep_gtid_mode);
const bool is_ordered= !ws_meta.seqno().is_undefined(); const bool is_ordered= !ws_meta.seqno().is_undefined();
/*
Write skip event into binlog if gtid_mode is on. This is to
maintain gtid continuity.
*/
if (do_binlog_commit && is_ordered)
{
ret= wsrep_write_skip_event(m_thd);
}
if (!ret && is_ordered) if (is_ordered)
{ {
ret= m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ret= m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, true);
ws_meta, true);
} }
if (!ret) ret= ret || trans_commit(m_thd);
{
if (!do_binlog_commit && is_ordered)
{
ret= wsrep_before_commit(m_thd, true);
}
ret= ret || trans_commit(m_thd);
if (!do_binlog_commit && is_ordered)
{
if (opt_log_slave_updates)
{
ret= ret || wsrep_ordered_commit(m_thd, true, wsrep_apply_error());
}
ret= ret || wsrep_after_commit(m_thd, true);
}
}
if (!is_ordered) if (!is_ordered)
{ {
......
/* Copyright 2016 Codership Oy <http://www.codership.com> /* Copyright 2016-2019 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
...@@ -32,7 +32,39 @@ class THD; ...@@ -32,7 +32,39 @@ class THD;
static inline bool wsrep_is_active(THD* thd) static inline bool wsrep_is_active(THD* thd)
{ {
return (thd->wsrep_cs().state() != wsrep::client_state::s_none && return (thd->wsrep_cs().state() != wsrep::client_state::s_none &&
thd->wsrep_cs().transaction().active()); thd->wsrep_cs().transaction().active());
}
/*
Return true if transaction is ordered.
*/
static inline bool wsrep_is_ordered(THD* thd)
{
return thd->wsrep_trx().ordered();
}
/*
Return true if transaction has been BF aborted but has not been
rolled back yet.
It is required that the caller holds thd->LOCK_thd_data.
*/
static inline bool wsrep_must_abort(THD* thd)
{
mysql_mutex_assert_owner(&thd->LOCK_thd_data);
return (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort);
}
/*
Return true if transaction has not been committed.
Note that we don't require thd->LOCK_thd_data here. Calling this method
makes sense only from codepaths which are past ordered_commit state
and the wsrep transaction is immune to BF aborts at that point.
*/
static inline bool wsrep_not_committed(THD* thd)
{
return (thd->wsrep_trx().state() != wsrep::transaction::s_committed);
} }
/* /*
...@@ -47,7 +79,7 @@ static inline bool wsrep_is_real(THD* thd, bool all) ...@@ -47,7 +79,7 @@ static inline bool wsrep_is_real(THD* thd, bool all)
/* /*
Check if a transaction has generated changes. Check if a transaction has generated changes.
*/ */
static inline bool wsrep_has_changes(THD* thd, my_bool all) static inline bool wsrep_has_changes(THD* thd)
{ {
return (thd->wsrep_trx().is_empty() == false); return (thd->wsrep_trx().is_empty() == false);
} }
...@@ -137,11 +169,42 @@ static inline int wsrep_after_row(THD* thd, bool) ...@@ -137,11 +169,42 @@ static inline int wsrep_after_row(THD* thd, bool)
/* /*
Helper method to determine whether commit time hooks Helper method to determine whether commit time hooks
should be run for the transaction. should be run for the transaction.
Commit hooks must be run in the following cases:
- The transaction is local and has generated write set and is committing.
- The transaction has been BF aborted
- Is running in high priority mode and is ordered. This can be replayer,
applier or storage access.
*/ */
static inline bool wsrep_run_commit_hook(THD* thd, bool all) static inline bool wsrep_run_commit_hook(THD* thd, bool all)
{ {
return (wsrep_is_real(thd, all) && wsrep_is_active(thd) && DBUG_ENTER("wsrep_run_commit_hook");
(wsrep_thd_is_applying(thd) || wsrep_has_changes(thd, all))); DBUG_PRINT("wsrep", ("Is_active: %d is_real %d has_changes %d is_applying %d "
"is_ordered: %d",
wsrep_is_active(thd), wsrep_is_real(thd, all),
wsrep_has_changes(thd), wsrep_thd_is_applying(thd),
wsrep_is_ordered(thd)));
/* Is MST commit or autocommit? */
bool ret= wsrep_is_active(thd) && wsrep_is_real(thd, all);
if (ret && !(wsrep_has_changes(thd) || /* Has generated write set */
/* Is high priority (replay, applier, storage) and the
transaction is scheduled for commit ordering */
(wsrep_thd_is_applying(thd) && wsrep_is_ordered(thd))))
{
mysql_mutex_lock(&thd->LOCK_thd_data);
DBUG_PRINT("wsrep", ("state: %s",
wsrep::to_c_string(thd->wsrep_trx().state())));
/* Transaction is local but has no changes, the commit hooks will
be skipped and the wsrep transaction is terminated in
wsrep_commit_empty() */
if (thd->wsrep_trx().state() == wsrep::transaction::s_executing)
{
ret= false;
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
}
DBUG_PRINT("wsrep", ("return: %d", ret));
DBUG_RETURN(ret);
} }
/* /*
...@@ -154,14 +217,12 @@ static inline int wsrep_before_prepare(THD* thd, bool all) ...@@ -154,14 +217,12 @@ static inline int wsrep_before_prepare(THD* thd, bool all)
DBUG_ENTER("wsrep_before_prepare"); DBUG_ENTER("wsrep_before_prepare");
WSREP_DEBUG("wsrep_before_prepare: %d", wsrep_is_real(thd, all)); WSREP_DEBUG("wsrep_before_prepare: %d", wsrep_is_real(thd, all));
int ret= 0; int ret= 0;
if (wsrep_run_commit_hook(thd, all)) DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
if ((ret= thd->wsrep_cs().before_prepare()) == 0)
{ {
if ((ret= thd->wsrep_cs().before_prepare()) == 0) DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
{ wsrep_xid_init(&thd->wsrep_xid,
DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
wsrep_xid_init(&thd->wsrep_xid,
thd->wsrep_trx().ws_meta().gtid()); thd->wsrep_trx().ws_meta().gtid());
}
} }
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
...@@ -175,8 +236,8 @@ static inline int wsrep_after_prepare(THD* thd, bool all) ...@@ -175,8 +236,8 @@ static inline int wsrep_after_prepare(THD* thd, bool all)
{ {
DBUG_ENTER("wsrep_after_prepare"); DBUG_ENTER("wsrep_after_prepare");
WSREP_DEBUG("wsrep_after_prepare: %d", wsrep_is_real(thd, all)); WSREP_DEBUG("wsrep_after_prepare: %d", wsrep_is_real(thd, all));
int ret= (wsrep_run_commit_hook(thd, all) ? DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
thd->wsrep_cs().after_prepare() : 0); int ret= thd->wsrep_cs().after_prepare();
DBUG_ASSERT(ret == 0 || thd->wsrep_cs().current_error() || DBUG_ASSERT(ret == 0 || thd->wsrep_cs().current_error() ||
thd->wsrep_cs().transaction().state() == wsrep::transaction::s_must_replay); thd->wsrep_cs().transaction().state() == wsrep::transaction::s_must_replay);
DBUG_RETURN(ret); DBUG_RETURN(ret);
...@@ -198,14 +259,12 @@ static inline int wsrep_before_commit(THD* thd, bool all) ...@@ -198,14 +259,12 @@ static inline int wsrep_before_commit(THD* thd, bool all)
wsrep_is_real(thd, all), wsrep_is_real(thd, all),
(long long)wsrep_thd_trx_seqno(thd)); (long long)wsrep_thd_trx_seqno(thd));
int ret= 0; int ret= 0;
if (wsrep_run_commit_hook(thd, all)) DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
if ((ret= thd->wsrep_cs().before_commit()) == 0)
{ {
if ((ret= thd->wsrep_cs().before_commit()) == 0) DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
{ wsrep_xid_init(&thd->wsrep_xid,
DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); thd->wsrep_trx().ws_meta().gtid());
wsrep_xid_init(&thd->wsrep_xid,
thd->wsrep_trx().ws_meta().gtid());
}
} }
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
...@@ -228,8 +287,8 @@ static inline int wsrep_ordered_commit(THD* thd, ...@@ -228,8 +287,8 @@ static inline int wsrep_ordered_commit(THD* thd,
{ {
DBUG_ENTER("wsrep_ordered_commit"); DBUG_ENTER("wsrep_ordered_commit");
WSREP_DEBUG("wsrep_ordered_commit: %d", wsrep_is_real(thd, all)); WSREP_DEBUG("wsrep_ordered_commit: %d", wsrep_is_real(thd, all));
DBUG_RETURN(wsrep_run_commit_hook(thd, all) ? DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
thd->wsrep_cs().ordered_commit() : 0); DBUG_RETURN(thd->wsrep_cs().ordered_commit());
} }
/* /*
...@@ -244,14 +303,12 @@ static inline int wsrep_after_commit(THD* thd, bool all) ...@@ -244,14 +303,12 @@ static inline int wsrep_after_commit(THD* thd, bool all)
wsrep_is_real(thd, all), wsrep_is_real(thd, all),
wsrep_is_active(thd), wsrep_is_active(thd),
(long long)wsrep_thd_trx_seqno(thd), (long long)wsrep_thd_trx_seqno(thd),
wsrep_has_changes(thd, all)); wsrep_has_changes(thd));
if (wsrep_run_commit_hook(thd, all)) DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
{ DBUG_RETURN((thd->wsrep_trx().state() == wsrep::transaction::s_committing
DBUG_RETURN((wsrep_ordered_commit_if_no_binlog(thd, all) || ? thd->wsrep_cs().ordered_commit() : 0) ||
(thd->wsrep_xid.null(), (thd->wsrep_xid.null(),
thd->wsrep_cs().after_commit()))); thd->wsrep_cs().after_commit()));
}
DBUG_RETURN(0);
} }
/* /*
...@@ -415,11 +472,25 @@ static inline void wsrep_commit_empty(THD* thd, bool all) ...@@ -415,11 +472,25 @@ static inline void wsrep_commit_empty(THD* thd, bool all)
thd->wsrep_trx().active() && thd->wsrep_trx().active() &&
thd->wsrep_trx().state() != wsrep::transaction::s_committed) thd->wsrep_trx().state() != wsrep::transaction::s_committed)
{ {
/* @todo CTAS with STATEMENT binlog format and empty result set
seems to be committing empty. Figure out why and try to fix
elsewhere. */
DBUG_ASSERT(!wsrep_has_changes(thd) ||
(thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
!thd->is_current_stmt_binlog_format_row()));
bool have_error= wsrep_current_error(thd); bool have_error= wsrep_current_error(thd);
int ret= wsrep_before_rollback(thd, all) || int ret= wsrep_before_rollback(thd, all) ||
wsrep_after_rollback(thd, all) || wsrep_after_rollback(thd, all) ||
wsrep_after_statement(thd); wsrep_after_statement(thd);
DBUG_ASSERT(have_error || !wsrep_current_error(thd)); /* The committing transaction was empty but it held some locks and
got BF aborted. As there were no certified changes in the
data, we ignore the deadlock error and rely on error reporting
by storage engine/server. */
if (!ret && !have_error && wsrep_current_error(thd))
{
DBUG_ASSERT(wsrep_current_error(thd) == wsrep::e_deadlock_error);
thd->wsrep_cs().reset_error();
}
if (ret) if (ret)
{ {
WSREP_DEBUG("wsrep_commit_empty failed: %d", wsrep_current_error(thd)); WSREP_DEBUG("wsrep_commit_empty failed: %d", wsrep_current_error(thd));
......
Subproject commit 9bec7d940cd27c9ea14e1665c4a4fd0ee5890ea7 Subproject commit badf53a28d2273741c449df0aa5fa6db67561466
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment