Commit 0e248e62 authored by Jan Lindström's avatar Jan Lindström

Merged revisions 2925--3929 from from lp:~codership/codership-mysql/5.5-23

```---------------------------------------------------------
revno: 3929 [merge]
fixes bug: https://launchpad.net/bugs/1243150
committer: Teemu Ollakka <teemu.ollakka@codership.com>
branch nick: 5.5-23
timestamp: Wed 2013-10-23 20:05:01 +0300
message:     8kB/s - 
  References lp:1243150 - initial wsrep hton cleanups
  
  * Removed wsrep_seqno_changed boolean
  * wsrep_cleanup_transaction() is now called explicitly whenever it is
    clear that transaction has come to an end
  * wsrep_trans_cache_is_empty() now checks from cache_mngr recardless of
    command type
  * Separated call to wsrep->post_commit() to own function, called from
    transaction.cc whenever appropriate
  * wsrep_thd_is_brute_force() now investigates only thd->wsrep_exec_mode
  * More comments and debug time assertions
  * Debug code to check that wsrep position stored in InnoDB is
    monotinically increasing. Enabled with UNIV_DEBUG
```

---------------------------------------------------------
revno: 3928
fixes bug: https://launchpad.net/bugs/1237889
committer: Teemu Ollakka <teemu.ollakka@codership.com>
branch nick: 5.5-23
timestamp: Tue 2013-10-22 22:01:20 +0300
message:
  References lp:1237889 - reverting fix in r3926, it broke crash recovery
------------------------------------------------------------
revno: 3927
fixes bug: https://launchpad.net/bugs/1240040
committer: Teemu Ollakka <teemu.ollakka@codership.com>
branch nick: 5.5-23
timestamp: Tue 2013-10-15 14:46:15 +0300
message:
  References lp:1240040 - added WSREP_MYSQL_DB as a key for DROP VIEW
------------------------------------------------------------
revno: 3926
fixes bug: https://launchpad.net/bugs/1237889
committer: Teemu Ollakka <teemu.ollakka@codership.com>
branch nick: 5.5-23
timestamp: Thu 2013-10-10 14:22:58 +0300
message:
  References lp:1237889 - register wsrep hton only if thd->wsrep_exec_mode == LO
CAL_STATE
------------------------------------------------------------
revno: 3925
fixes bug: https://launchpad.net/bugs/1235635
committer: Alexey Yurchenko <alexey.yurchenko@codership.com>
branch nick: 5.5-23
timestamp: Sat 2013-10-05 18:03:06 +0300
message:
  References lp:1235635 - fixed the warning by initializing c_lock to NULL.
parent eb29ce25
...@@ -1218,11 +1218,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1218,11 +1218,7 @@ int ha_commit_trans(THD *thd, bool all)
{ {
/* Free resources and perform other cleanup even for 'empty' transactions. */ /* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans) if (is_real_trans)
#ifdef WITH_WSREP thd->transaction.cleanup();
thd->transaction.cleanup(thd);
#else
thd->transaction.cleanup();
#endif /* WITH_WSREP */
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1459,11 +1455,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) ...@@ -1459,11 +1455,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
} }
/* Free resources and perform other cleanup even for 'empty' transactions. */ /* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans) if (is_real_trans)
#ifdef WITH_WSREP
thd->transaction.cleanup(thd);
#else
thd->transaction.cleanup(); thd->transaction.cleanup();
#endif /* WITH_WSREP */
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (WSREP(thd)) thd_proc_info(thd, tmp_info); if (WSREP(thd)) thd_proc_info(thd, tmp_info);
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
...@@ -1541,11 +1533,8 @@ int ha_rollback_trans(THD *thd, bool all) ...@@ -1541,11 +1533,8 @@ int ha_rollback_trans(THD *thd, bool all)
} }
/* Always cleanup. Even if nht==0. There may be savepoints. */ /* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans) if (is_real_trans)
#ifdef WITH_WSREP
thd->transaction.cleanup(thd);
#else
thd->transaction.cleanup(); thd->transaction.cleanup();
#endif /* WITH_WSREP */
if (all) if (all)
thd->transaction_rollback_request= FALSE; thd->transaction_rollback_request= FALSE;
......
...@@ -526,20 +526,9 @@ IO_CACHE * get_trans_log(THD * thd) ...@@ -526,20 +526,9 @@ IO_CACHE * get_trans_log(THD * thd)
bool wsrep_trans_cache_is_empty(THD *thd) bool wsrep_trans_cache_is_empty(THD *thd)
{ {
bool res= TRUE; binlog_cache_mngr *const cache_mngr=
if (thd_sql_command((const THD*) thd) != SQLCOM_SELECT)
res= FALSE;
else
{
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
if (cache_mngr) return (!cache_mngr || cache_mngr->trx_cache.empty());
{
res= cache_mngr->trx_cache.empty();
}
}
return res;
} }
void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end) void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
......
...@@ -811,7 +811,7 @@ extern "C" const char *wsrep_thd_exec_mode_str(THD *thd) ...@@ -811,7 +811,7 @@ extern "C" const char *wsrep_thd_exec_mode_str(THD *thd)
(!thd) ? "void" : (!thd) ? "void" :
(thd->wsrep_exec_mode == LOCAL_STATE) ? "local" : (thd->wsrep_exec_mode == LOCAL_STATE) ? "local" :
(thd->wsrep_exec_mode == REPL_RECV) ? "applier" : (thd->wsrep_exec_mode == REPL_RECV) ? "applier" :
(thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" : (thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" :
(thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit" : "void"; (thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit" : "void";
} }
...@@ -875,7 +875,7 @@ extern "C" my_thread_id wsrep_thd_thread_id(THD *thd) ...@@ -875,7 +875,7 @@ extern "C" my_thread_id wsrep_thd_thread_id(THD *thd)
} }
extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd) extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd)
{ {
return (thd) ? thd->wsrep_trx_seqno : -1; return (thd) ? thd->wsrep_trx_seqno : WSREP_SEQNO_UNDEFINED;
} }
extern "C" query_id_t wsrep_thd_query_id(THD *thd) extern "C" query_id_t wsrep_thd_query_id(THD *thd)
{ {
...@@ -1000,6 +1000,7 @@ THD::THD() ...@@ -1000,6 +1000,7 @@ THD::THD()
wsrep_applier(is_applier), wsrep_applier(is_applier),
wsrep_applier_closing(FALSE), wsrep_applier_closing(FALSE),
wsrep_client_thread(0), wsrep_client_thread(0),
wsrep_trx_seqno(WSREP_SEQNO_UNDEFINED),
#endif #endif
m_parser_state(NULL), m_parser_state(NULL),
#if defined(ENABLED_DEBUG_SYNC) #if defined(ENABLED_DEBUG_SYNC)
...@@ -1108,7 +1109,6 @@ THD::THD() ...@@ -1108,7 +1109,6 @@ THD::THD()
//wsrep_retry_autocommit= ::wsrep_retry_autocommit; //wsrep_retry_autocommit= ::wsrep_retry_autocommit;
wsrep_retry_counter = 0; wsrep_retry_counter = 0;
wsrep_PA_safe = true; wsrep_PA_safe = true;
wsrep_seqno_changed = false;
wsrep_retry_query = NULL; wsrep_retry_query = NULL;
wsrep_retry_query_len = 0; wsrep_retry_query_len = 0;
wsrep_retry_command = COM_CONNECT; wsrep_retry_command = COM_CONNECT;
...@@ -1470,12 +1470,10 @@ void THD::init(void) ...@@ -1470,12 +1470,10 @@ void THD::init(void)
wsrep_conflict_state= NO_CONFLICT; wsrep_conflict_state= NO_CONFLICT;
wsrep_query_state= QUERY_IDLE; wsrep_query_state= QUERY_IDLE;
wsrep_last_query_id= 0; wsrep_last_query_id= 0;
wsrep_trx_seqno= 0;
wsrep_converted_lock_session= false; wsrep_converted_lock_session= false;
wsrep_retry_counter= 0; wsrep_retry_counter= 0;
wsrep_rli= NULL; wsrep_rli= NULL;
wsrep_PA_safe= true; wsrep_PA_safe= true;
wsrep_seqno_changed= false;
wsrep_consistency_check = NO_CONSISTENCY_CHECK; wsrep_consistency_check = NO_CONSISTENCY_CHECK;
wsrep_mysql_replicated = 0; wsrep_mysql_replicated = 0;
wsrep_bf_thd = NULL; wsrep_bf_thd = NULL;
...@@ -2117,13 +2115,6 @@ void THD::cleanup_after_query() ...@@ -2117,13 +2115,6 @@ void THD::cleanup_after_query()
/* reset table map for multi-table update */ /* reset table map for multi-table update */
table_map_for_update= 0; table_map_for_update= 0;
m_binlog_invoker= FALSE; m_binlog_invoker= FALSE;
#ifdef WITH_WSREP
if (TOTAL_ORDER == wsrep_exec_mode)
{
wsrep_exec_mode = LOCAL_STATE;
}
//wsrep_trx_seqno = 0;
#endif /* WITH_WSREP */
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
if (rli_slave) if (rli_slave)
......
...@@ -985,9 +985,6 @@ struct st_savepoint { ...@@ -985,9 +985,6 @@ struct st_savepoint {
/** State of metadata locks before this savepoint was set. */ /** State of metadata locks before this savepoint was set. */
MDL_savepoint mdl_savepoint; MDL_savepoint mdl_savepoint;
}; };
#ifdef WITH_WSREP
void wsrep_cleanup_transaction(THD *thd); // THD.transactions.cleanup calls it
#endif
enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY}; enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY};
extern const char *xa_state_names[]; extern const char *xa_state_names[];
...@@ -1865,11 +1862,7 @@ class THD :public Statement, ...@@ -1865,11 +1862,7 @@ class THD :public Statement,
*/ */
CHANGED_TABLE_LIST* changed_tables; CHANGED_TABLE_LIST* changed_tables;
MEM_ROOT mem_root; // Transaction-life memory allocation pool MEM_ROOT mem_root; // Transaction-life memory allocation pool
#ifdef WITH_WSREP
void cleanup(THD *thd)
#else
void cleanup() void cleanup()
#endif
{ {
DBUG_ENTER("thd::cleanup"); DBUG_ENTER("thd::cleanup");
changed_tables= 0; changed_tables= 0;
...@@ -1883,11 +1876,6 @@ class THD :public Statement, ...@@ -1883,11 +1876,6 @@ class THD :public Statement,
if (!xid_state.rm_error) if (!xid_state.rm_error)
xid_state.xid.null(); xid_state.xid.null();
free_root(&mem_root,MYF(MY_KEEP_PREALLOC)); free_root(&mem_root,MYF(MY_KEEP_PREALLOC));
#ifdef WITH_WSREP
// Todo: convert into a plugin method
// wsrep's post-commit. LOCAL_COMMIT designates wsrep's commit was ok
if (WSREP(thd)) wsrep_cleanup_transaction(thd);
#endif /* WITH_WSREP */
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
my_bool is_active() my_bool is_active()
...@@ -2367,7 +2355,6 @@ class THD :public Statement, ...@@ -2367,7 +2355,6 @@ class THD :public Statement,
Relay_log_info* wsrep_rli; Relay_log_info* wsrep_rli;
bool wsrep_converted_lock_session; bool wsrep_converted_lock_session;
wsrep_trx_handle_t wsrep_trx_handle; wsrep_trx_handle_t wsrep_trx_handle;
bool wsrep_seqno_changed;
#ifdef WSREP_PROC_INFO #ifdef WSREP_PROC_INFO
char wsrep_info[128]; /* string for dynamic proc info */ char wsrep_info[128]; /* string for dynamic proc info */
#endif /* WSREP_PROC_INFO */ #endif /* WSREP_PROC_INFO */
......
...@@ -4705,7 +4705,7 @@ case SQLCOM_PREPARE: ...@@ -4705,7 +4705,7 @@ case SQLCOM_PREPARE:
if (check_table_access(thd, DROP_ACL, all_tables, FALSE, UINT_MAX, FALSE)) if (check_table_access(thd, DROP_ACL, all_tables, FALSE, UINT_MAX, FALSE))
goto error; goto error;
/* Conditionally writes to binlog. */ /* Conditionally writes to binlog. */
WSREP_TO_ISOLATION_BEGIN(NULL, NULL, NULL) WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
res= mysql_drop_view(thd, first_table, thd->lex->drop_mode); res= mysql_drop_view(thd, first_table, thd->lex->drop_mode);
break; break;
} }
...@@ -6164,6 +6164,9 @@ void wsrep_replay_transaction(THD *thd) ...@@ -6164,6 +6164,9 @@ void wsrep_replay_transaction(THD *thd)
unireg_abort(1); unireg_abort(1);
break; break;
} }
wsrep_cleanup_transaction(thd);
mysql_mutex_lock(&LOCK_wsrep_replaying); mysql_mutex_lock(&LOCK_wsrep_replaying);
wsrep_replaying--; wsrep_replaying--;
WSREP_DEBUG("replaying decreased: %d, thd: %lu", WSREP_DEBUG("replaying decreased: %d, thd: %lu",
...@@ -8567,21 +8570,24 @@ void wsrep_rollback_process(THD *thd) ...@@ -8567,21 +8570,24 @@ void wsrep_rollback_process(THD *thd)
extern "C" extern "C"
int wsrep_thd_is_brute_force(void *thd_ptr) int wsrep_thd_is_brute_force(void *thd_ptr)
{ {
/*
Brute force:
Appliers and replaying are running in REPL_RECV mode. TOI statements
in TOTAL_ORDER mode. Locally committing transaction that has got
past wsrep->pre_commit() without error is running in LOCAL_COMMIT mode.
Everything else is running in LOCAL_STATE and should not be considered
brute force.
*/
if (thd_ptr) { if (thd_ptr) {
switch (((THD *)thd_ptr)->wsrep_exec_mode) { switch (((THD *)thd_ptr)->wsrep_exec_mode) {
case LOCAL_STATE: case LOCAL_STATE: return 0;
{
if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING)
{
return 1;
}
return 0;
}
case REPL_RECV: return 1; case REPL_RECV: return 1;
case TOTAL_ORDER: return 2; case TOTAL_ORDER: return 2;
case LOCAL_COMMIT: return 3; case LOCAL_COMMIT: return 3;
} }
} }
DBUG_ASSERT(0);
return 0; return 0;
} }
extern "C" extern "C"
......
...@@ -142,6 +142,9 @@ bool trans_begin(THD *thd, uint flags) ...@@ -142,6 +142,9 @@ bool trans_begin(THD *thd, uint flags)
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
thd->server_status&= ~SERVER_STATUS_IN_TRANS; thd->server_status&= ~SERVER_STATUS_IN_TRANS;
res= test(ha_commit_trans(thd, TRUE)); res= test(ha_commit_trans(thd, TRUE));
#ifdef WITH_WSREP
wsrep_post_commit(thd, TRUE);
#endif /* WITH_WSREP */
} }
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
...@@ -194,6 +197,9 @@ bool trans_commit(THD *thd) ...@@ -194,6 +197,9 @@ bool trans_commit(THD *thd)
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
thd->server_status&= ~SERVER_STATUS_IN_TRANS; thd->server_status&= ~SERVER_STATUS_IN_TRANS;
res= ha_commit_trans(thd, TRUE); res= ha_commit_trans(thd, TRUE);
#ifdef WITH_WSREP
wsrep_post_commit(thd, TRUE);
#endif /* WITH_WSREP */
if (res) if (res)
/* /*
if res is non-zero, then ha_commit_trans has rolled back the if res is non-zero, then ha_commit_trans has rolled back the
...@@ -240,6 +246,9 @@ bool trans_commit_implicit(THD *thd) ...@@ -240,6 +246,9 @@ bool trans_commit_implicit(THD *thd)
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
thd->server_status&= ~SERVER_STATUS_IN_TRANS; thd->server_status&= ~SERVER_STATUS_IN_TRANS;
res= test(ha_commit_trans(thd, TRUE)); res= test(ha_commit_trans(thd, TRUE));
#ifdef WITH_WSREP
wsrep_post_commit(thd, TRUE);
#endif /* WITH_WSREP */
} }
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
...@@ -324,7 +333,14 @@ bool trans_commit_stmt(THD *thd) ...@@ -324,7 +333,14 @@ bool trans_commit_stmt(THD *thd)
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
res= ha_commit_trans(thd, FALSE); res= ha_commit_trans(thd, FALSE);
if (! thd->in_active_multi_stmt_transaction()) if (! thd->in_active_multi_stmt_transaction())
#ifdef WITH_WSREP
{
#endif /* WITH_WSREP */
thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation; thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation;
#ifdef WITH_WSREP
wsrep_post_commit(thd, FALSE);
}
#endif /* WITH_WSREP */
} }
if (res) if (res)
...@@ -737,6 +753,9 @@ bool trans_xa_commit(THD *thd) ...@@ -737,6 +753,9 @@ bool trans_xa_commit(THD *thd)
int r= ha_commit_trans(thd, TRUE); int r= ha_commit_trans(thd, TRUE);
if ((res= test(r))) if ((res= test(r)))
my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0)); my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
#ifdef WITH_WSREP
wsrep_post_commit(thd, TRUE);
#endif /* WITH_WSREP */
} }
else if (xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE) else if (xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE)
{ {
......
...@@ -32,34 +32,15 @@ extern "C" int thd_binlog_format(const MYSQL_THD thd); ...@@ -32,34 +32,15 @@ extern "C" int thd_binlog_format(const MYSQL_THD thd);
enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all);
/* /*
a post-commit cleanup on behalf of wsrep. Can't be a part of hton struct. Cleanup after local transaction commit/rollback, replay or TOI.
Is called by THD::transactions.cleanup()
*/ */
void wsrep_cleanup_transaction(THD *thd) void wsrep_cleanup_transaction(THD *thd)
{ {
if (thd->thread_id == 0) return; if (wsrep_emulate_bin_log) thd_binlog_trx_reset(thd);
if (thd->wsrep_exec_mode == LOCAL_COMMIT) thd->wsrep_trx_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
{ thd->wsrep_trx_seqno= WSREP_SEQNO_UNDEFINED;
if (thd->variables.wsrep_on && thd->wsrep_exec_mode= LOCAL_STATE;
thd->wsrep_conflict_state != MUST_REPLAY) return;
{
if (thd->wsrep_seqno_changed)
{
if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle))
{
DBUG_PRINT("wsrep", ("set committed fail"));
WSREP_WARN("set committed fail: %llu %d",
(long long)thd->real_id, thd->stmt_da->status());
}
}
//else
//WSREP_DEBUG("no trx handle for %s", thd->query());
thd_binlog_trx_reset(thd);
thd->wsrep_seqno_changed = false;
}
thd->wsrep_exec_mode= LOCAL_STATE;
}
thd->wsrep_trx_handle.trx_id = WSREP_UNDEFINED_TRX_ID;
} }
/* /*
...@@ -67,25 +48,63 @@ void wsrep_cleanup_transaction(THD *thd) ...@@ -67,25 +48,63 @@ void wsrep_cleanup_transaction(THD *thd)
*/ */
handlerton *wsrep_hton; handlerton *wsrep_hton;
/*
Registers wsrep hton at commit time if transaction has registered htons
for supported engine types.
Hton should not be registered for TOTAL_ORDER operations.
Registration is needed for both LOCAL_MODE and REPL_RECV transactions to run
commit in 2pc so that wsrep position gets properly recorded in storage
engines.
Note that all hton calls should immediately return for threads that are
in REPL_RECV mode as their states are controlled by wsrep appliers or
replaying code. Only threads in LOCAL_MODE should run wsrep callbacks
from hton methods.
*/
void wsrep_register_hton(THD* thd, bool all) void wsrep_register_hton(THD* thd, bool all)
{ {
THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; if (thd->wsrep_exec_mode != TOTAL_ORDER)
for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next())
{ {
if (i->ht()->db_type == DB_TYPE_INNODB) THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next())
{ {
trans_register_ha(thd, all, wsrep_hton); if (i->ht()->db_type == DB_TYPE_INNODB)
/* follow innodb read/write settting */
if (i->is_trx_read_write())
{ {
thd->ha_data[wsrep_hton->slot].ha_info[all].set_trx_read_write(); trans_register_ha(thd, all, wsrep_hton);
/* follow innodb read/write settting */
if (i->is_trx_read_write())
{
thd->ha_data[wsrep_hton->slot].ha_info[all].set_trx_read_write();
}
break;
} }
break;
} }
} }
} }
/*
Calls wsrep->post_commit() for locally executed transactions that have
got seqno from provider (must commit) and don't require replaying.
*/
void wsrep_post_commit(THD* thd, bool all)
{
if (thd->wsrep_exec_mode == LOCAL_COMMIT)
{
DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED);
if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle))
{
DBUG_PRINT("wsrep", ("set committed fail"));
WSREP_WARN("set committed fail: %llu %d",
(long long)thd->real_id, thd->stmt_da->status());
}
wsrep_cleanup_transaction(thd);
}
}
/* /*
wsrep exploits binlog's caches even if binlogging itself is not wsrep exploits binlog's caches even if binlogging itself is not
activated. In such case connection close needs calling activated. In such case connection close needs calling
...@@ -97,7 +116,13 @@ static int ...@@ -97,7 +116,13 @@ static int
wsrep_close_connection(handlerton* hton, THD* thd) wsrep_close_connection(handlerton* hton, THD* thd)
{ {
DBUG_ENTER("wsrep_close_connection"); DBUG_ENTER("wsrep_close_connection");
if (thd_get_ha_data(thd, binlog_hton) != NULL)
if (thd->wsrep_exec_mode == REPL_RECV)
{
DBUG_RETURN(0);
}
if (wsrep_emulate_bin_log && thd_get_ha_data(thd, binlog_hton) != NULL)
binlog_hton->close_connection (binlog_hton, thd); binlog_hton->close_connection (binlog_hton, thd);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -113,10 +138,17 @@ wsrep_close_connection(handlerton* hton, THD* thd) ...@@ -113,10 +138,17 @@ wsrep_close_connection(handlerton* hton, THD* thd)
*/ */
static int wsrep_prepare(handlerton *hton, THD *thd, bool all) static int wsrep_prepare(handlerton *hton, THD *thd, bool all)
{ {
#ifndef DBUG_OFF
//wsrep_seqno_t old = thd->wsrep_trx_seqno;
#endif
DBUG_ENTER("wsrep_prepare"); DBUG_ENTER("wsrep_prepare");
if (thd->wsrep_exec_mode == REPL_RECV)
{
DBUG_RETURN(0);
}
DBUG_ASSERT(thd->ha_data[wsrep_hton->slot].ha_info[all].is_trx_read_write());
DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE);
DBUG_ASSERT(thd->wsrep_trx_seqno == WSREP_SEQNO_UNDEFINED);
if ((all || if ((all ||
!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
(thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd))) (thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd)))
...@@ -138,12 +170,26 @@ static int wsrep_prepare(handlerton *hton, THD *thd, bool all) ...@@ -138,12 +170,26 @@ static int wsrep_prepare(handlerton *hton, THD *thd, bool all)
static int wsrep_savepoint_set(handlerton *hton, THD *thd, void *sv) static int wsrep_savepoint_set(handlerton *hton, THD *thd, void *sv)
{ {
DBUG_ENTER("wsrep_savepoint_set");
if (thd->wsrep_exec_mode == REPL_RECV)
{
DBUG_RETURN(0);
}
if (!wsrep_emulate_bin_log) return 0; if (!wsrep_emulate_bin_log) return 0;
int rcode = binlog_hton->savepoint_set(binlog_hton, thd, sv); int rcode = binlog_hton->savepoint_set(binlog_hton, thd, sv);
return rcode; return rcode;
} }
static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv) static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
{ {
DBUG_ENTER("wsrep_savepoint_rollback");
if (thd->wsrep_exec_mode == REPL_RECV)
{
DBUG_RETURN(0);
}
if (!wsrep_emulate_bin_log) return 0; if (!wsrep_emulate_bin_log) return 0;
int rcode = binlog_hton->savepoint_rollback(binlog_hton, thd, sv); int rcode = binlog_hton->savepoint_rollback(binlog_hton, thd, sv);
return rcode; return rcode;
...@@ -152,6 +198,12 @@ static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv) ...@@ -152,6 +198,12 @@ static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
static int wsrep_rollback(handlerton *hton, THD *thd, bool all) static int wsrep_rollback(handlerton *hton, THD *thd, bool all)
{ {
DBUG_ENTER("wsrep_rollback"); DBUG_ENTER("wsrep_rollback");
if (thd->wsrep_exec_mode == REPL_RECV)
{
DBUG_RETURN(0);
}
mysql_mutex_lock(&thd->LOCK_wsrep_thd); mysql_mutex_lock(&thd->LOCK_wsrep_thd);
if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
(thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY)) (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY))
...@@ -162,25 +214,54 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all) ...@@ -162,25 +214,54 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all)
WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
(long long)thd->real_id, thd->query()); (long long)thd->real_id, thd->query());
} }
wsrep_cleanup_transaction(thd);
} }
int rcode = 0;
if (!wsrep_emulate_bin_log)
{
if (all) thd_binlog_trx_reset(thd);
}
mysql_mutex_unlock(&thd->LOCK_wsrep_thd); mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
DBUG_RETURN(rcode); DBUG_RETURN(0);
} }
int wsrep_commit(handlerton *hton, THD *thd, bool all) int wsrep_commit(handlerton *hton, THD *thd, bool all)
{ {
DBUG_ENTER("wsrep_commit"); DBUG_ENTER("wsrep_commit");
if (thd->wsrep_exec_mode == REPL_RECV)
{
DBUG_RETURN(0);
}
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
(thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY))
{
if (thd->wsrep_exec_mode == LOCAL_COMMIT)
{
DBUG_ASSERT(thd->ha_data[wsrep_hton->slot].ha_info[all].is_trx_read_write());
/*
Call to wsrep->post_commit() (moved to wsrep_post_commit()) must
be done only after commit has done for all involved htons.
*/
DBUG_PRINT("wsrep", ("commit"));
}
else
{
/*
Transaction didn't go through wsrep->pre_commit() so just roll back
possible changes to clean state.
*/
if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle))
{
DBUG_PRINT("wsrep", ("setting rollback fail"));
WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
(long long)thd->real_id, thd->query());
}
wsrep_cleanup_transaction(thd);
}
}
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
extern Rpl_filter* binlog_filter; extern Rpl_filter* binlog_filter;
extern my_bool opt_log_slave_updates; extern my_bool opt_log_slave_updates;
extern void wsrep_write_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len); extern void wsrep_write_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len);
...@@ -305,9 +386,6 @@ wsrep_run_wsrep_commit( ...@@ -305,9 +386,6 @@ wsrep_run_wsrep_commit(
} }
if (data_len == 0) if (data_len == 0)
{ {
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_exec_mode = LOCAL_COMMIT;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
if (thd->stmt_da->is_ok() && if (thd->stmt_da->is_ok() &&
thd->stmt_da->affected_rows() > 0 && thd->stmt_da->affected_rows() > 0 &&
!binlog_filter->is_on()) !binlog_filter->is_on())
...@@ -356,9 +434,11 @@ wsrep_run_wsrep_commit( ...@@ -356,9 +434,11 @@ wsrep_run_wsrep_commit(
WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s", WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s",
thd->thread_id, thd->query()); thd->thread_id, thd->query());
wsrep_write_rbr_buf(thd, rbr_data, data_len); wsrep_write_rbr_buf(thd, rbr_data, data_len);
rcode = WSREP_OK; rcode = WSREP_TRX_FAIL;
break; break;
case WSREP_BF_ABORT: case WSREP_BF_ABORT:
WSREP_DEBUG("thd %lu seqno %lld BF aborted by provider, will replay",
thd->thread_id, (long long)thd->wsrep_trx_seqno);
mysql_mutex_lock(&thd->LOCK_wsrep_thd); mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_conflict_state = MUST_REPLAY; thd->wsrep_conflict_state = MUST_REPLAY;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd); mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
...@@ -369,7 +449,6 @@ wsrep_run_wsrep_commit( ...@@ -369,7 +449,6 @@ wsrep_run_wsrep_commit(
mysql_mutex_unlock(&LOCK_wsrep_replaying); mysql_mutex_unlock(&LOCK_wsrep_replaying);
break; break;
default: default:
thd->wsrep_seqno_changed = true;
break; break;
} }
} else { } else {
...@@ -387,7 +466,24 @@ wsrep_run_wsrep_commit( ...@@ -387,7 +466,24 @@ wsrep_run_wsrep_commit(
mysql_mutex_lock(&thd->LOCK_wsrep_thd); mysql_mutex_lock(&thd->LOCK_wsrep_thd);
switch(rcode) { switch(rcode) {
case 0: case 0:
thd->wsrep_exec_mode = LOCAL_COMMIT; /*
About MUST_ABORT: We assume that even if thd conflict state was set
to MUST_ABORT, underlying transaction was not rolled back or marked
as deadlock victim in QUERY_COMMITTING state. Conflict state is
set to NO_CONFLICT and commit proceeds as usual.
*/
if (thd->wsrep_conflict_state == MUST_ABORT)
thd->wsrep_conflict_state= NO_CONFLICT;
if (thd->wsrep_conflict_state != NO_CONFLICT)
{
WSREP_WARN("thd %lu seqno %lld: conflict state %d after post commit",
thd->thread_id,
(long long)thd->wsrep_trx_seqno,
thd->wsrep_conflict_state);
}
thd->wsrep_exec_mode= LOCAL_COMMIT;
DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED);
/* Override XID iff it was generated by mysql */ /* Override XID iff it was generated by mysql */
if (thd->transaction.xid_state.xid.get_my_xid()) if (thd->transaction.xid_state.xid.get_my_xid())
{ {
...@@ -396,10 +492,10 @@ wsrep_run_wsrep_commit( ...@@ -396,10 +492,10 @@ wsrep_run_wsrep_commit(
thd->wsrep_trx_seqno); thd->wsrep_trx_seqno);
} }
DBUG_PRINT("wsrep", ("replicating commit success")); DBUG_PRINT("wsrep", ("replicating commit success"));
break; break;
case WSREP_TRX_FAIL:
case WSREP_BF_ABORT: case WSREP_BF_ABORT:
DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED);
case WSREP_TRX_FAIL:
WSREP_DEBUG("commit failed for reason: %d", rcode); WSREP_DEBUG("commit failed for reason: %d", rcode);
DBUG_PRINT("wsrep", ("replicating commit fail")); DBUG_PRINT("wsrep", ("replicating commit fail"));
......
...@@ -1144,14 +1144,14 @@ static void wsrep_TOI_end(THD *thd) { ...@@ -1144,14 +1144,14 @@ static void wsrep_TOI_end(THD *thd) {
wsrep_status_t ret; wsrep_status_t ret;
wsrep_to_isolation--; wsrep_to_isolation--;
WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void") thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void");
if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) { if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno); WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno);
} }
else { else {
WSREP_WARN("TO isolation end failed for: %d, sql: %s", WSREP_WARN("TO isolation end failed for: %d, sql: %s",
ret, (thd->query()) ? thd->query() : "void"); ret, (thd->query()) ? thd->query() : "void");
} }
} }
static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
...@@ -1222,14 +1222,20 @@ static void wsrep_RSU_end(THD *thd) ...@@ -1222,14 +1222,20 @@ static void wsrep_RSU_end(THD *thd)
return; return;
} }
thd->variables.wsrep_on = 1; thd->variables.wsrep_on = 1;
return;
} }
int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
const TABLE_LIST* table_list) const TABLE_LIST* table_list)
{ {
/*
No isolation for applier or replaying threads.
*/
if (thd->wsrep_exec_mode == REPL_RECV) return 0;
int ret= 0; int ret= 0;
mysql_mutex_lock(&thd->LOCK_wsrep_thd); mysql_mutex_lock(&thd->LOCK_wsrep_thd);
if (thd->wsrep_conflict_state == MUST_ABORT) if (thd->wsrep_conflict_state == MUST_ABORT)
{ {
WSREP_INFO("thread: %lu, %s has been aborted due to multi-master conflict", WSREP_INFO("thread: %lu, %s has been aborted due to multi-master conflict",
...@@ -1239,6 +1245,9 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, ...@@ -1239,6 +1245,9 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
} }
mysql_mutex_unlock(&thd->LOCK_wsrep_thd); mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE);
DBUG_ASSERT(thd->wsrep_trx_seqno == WSREP_SEQNO_UNDEFINED);
if (wsrep_debug && thd->mdl_context.has_locks()) if (wsrep_debug && thd->mdl_context.has_locks())
{ {
WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lu", WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lu",
...@@ -1265,14 +1274,16 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, ...@@ -1265,14 +1274,16 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
return ret; return ret;
} }
void wsrep_to_isolation_end(THD *thd) { void wsrep_to_isolation_end(THD *thd)
if (thd->wsrep_exec_mode==TOTAL_ORDER) {
if (thd->wsrep_exec_mode == TOTAL_ORDER)
{ {
switch(wsrep_OSU_method_options) switch(wsrep_OSU_method_options)
{ {
case WSREP_OSU_TOI: return wsrep_TOI_end(thd); case WSREP_OSU_TOI: wsrep_TOI_end(thd); break;
case WSREP_OSU_RSU: return wsrep_RSU_end(thd); case WSREP_OSU_RSU: wsrep_RSU_end(thd); break;
} }
wsrep_cleanup_transaction(thd);
} }
} }
......
...@@ -33,7 +33,7 @@ class THD; ...@@ -33,7 +33,7 @@ class THD;
LOCAL_STATE, LOCAL_STATE,
REPL_RECV, REPL_RECV,
TOTAL_ORDER, TOTAL_ORDER,
LOCAL_COMMIT, LOCAL_COMMIT
}; };
enum wsrep_query_state { enum wsrep_query_state {
QUERY_IDLE, QUERY_IDLE,
...@@ -311,7 +311,7 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); ...@@ -311,7 +311,7 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all);
class Ha_trx_info; class Ha_trx_info;
struct THD_TRANS; struct THD_TRANS;
void wsrep_register_hton(THD* thd, bool all); void wsrep_register_hton(THD* thd, bool all);
void wsrep_post_commit(THD* thd, bool all);
void wsrep_replication_process(THD *thd); void wsrep_replication_process(THD *thd);
void wsrep_rollback_process(THD *thd); void wsrep_rollback_process(THD *thd);
void wsrep_brute_force_killer(THD *thd); void wsrep_brute_force_killer(THD *thd);
...@@ -374,7 +374,7 @@ struct TABLE_LIST; ...@@ -374,7 +374,7 @@ struct TABLE_LIST;
int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
const TABLE_LIST* table_list); const TABLE_LIST* table_list);
void wsrep_to_isolation_end(THD *thd); void wsrep_to_isolation_end(THD *thd);
void wsrep_cleanup_transaction(THD *thd);
void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*); void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*);
void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow*); void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow*);
int wsrep_to_buf_helper( int wsrep_to_buf_helper(
......
...@@ -795,6 +795,24 @@ trx_sys_print_mysql_binlog_offset(void) ...@@ -795,6 +795,24 @@ trx_sys_print_mysql_binlog_offset(void)
#ifdef WITH_WSREP #ifdef WITH_WSREP
#ifdef UNIV_DEBUG
static long long trx_sys_cur_xid_seqno = -1;
static unsigned char trx_sys_cur_xid_uuid[16];
long long read_wsrep_xid_seqno(const XID* xid)
{
long long seqno;
memcpy(&seqno, xid->data + 24, sizeof(long long));
return seqno;
}
void read_wsrep_xid_uuid(const XID* xid, unsigned char* buf)
{
memcpy(buf, xid->data + 8, 16);
}
#endif /* UNIV_DEBUG */
void void
trx_sys_update_wsrep_checkpoint( trx_sys_update_wsrep_checkpoint(
const XID* xid, /*!< in: transaction XID */ const XID* xid, /*!< in: transaction XID */
...@@ -802,6 +820,25 @@ trx_sys_update_wsrep_checkpoint( ...@@ -802,6 +820,25 @@ trx_sys_update_wsrep_checkpoint(
{ {
trx_sysf_t* sys_header; trx_sysf_t* sys_header;
#ifdef UNIV_DEBUG
{
/* Check that seqno is monotonically increasing */
unsigned char xid_uuid[16];
long long xid_seqno = read_wsrep_xid_seqno(xid);
read_wsrep_xid_uuid(xid, xid_uuid);
if (!memcmp(xid_uuid, trx_sys_cur_xid_uuid, 8))
{
ut_ad(xid_seqno > trx_sys_cur_xid_seqno);
trx_sys_cur_xid_seqno = xid_seqno;
}
else
{
memcpy(trx_sys_cur_xid_uuid, xid_uuid, 16);
}
trx_sys_cur_xid_seqno = xid_seqno;
}
#endif /* UNIV_DEBUG */
ut_ad(xid && mtr); ut_ad(xid && mtr);
ut_a(xid->formatID == -1 || wsrep_is_wsrep_xid(xid)); ut_a(xid->formatID == -1 || wsrep_is_wsrep_xid(xid));
......
...@@ -966,6 +966,24 @@ trx_sys_print_mysql_binlog_offset(void) ...@@ -966,6 +966,24 @@ trx_sys_print_mysql_binlog_offset(void)
#ifdef WITH_WSREP #ifdef WITH_WSREP
#ifdef UNIV_DEBUG
static long long trx_sys_cur_xid_seqno = -1;
static unsigned char trx_sys_cur_xid_uuid[16];
long long read_wsrep_xid_seqno(const XID* xid)
{
long long seqno;
memcpy(&seqno, xid->data + 24, sizeof(long long));
return seqno;
}
void read_wsrep_xid_uuid(const XID* xid, unsigned char* buf)
{
memcpy(buf, xid->data + 8, 16);
}
#endif /* UNIV_DEBUG */
void void
trx_sys_update_wsrep_checkpoint( trx_sys_update_wsrep_checkpoint(
const XID* xid, /*!< in: transaction XID */ const XID* xid, /*!< in: transaction XID */
...@@ -973,6 +991,25 @@ trx_sys_update_wsrep_checkpoint( ...@@ -973,6 +991,25 @@ trx_sys_update_wsrep_checkpoint(
{ {
trx_sysf_t* sys_header; trx_sysf_t* sys_header;
#ifdef UNIV_DEBUG
{
/* Check that seqno is monotonically increasing */
unsigned char xid_uuid[16];
long long xid_seqno = read_wsrep_xid_seqno(xid);
read_wsrep_xid_uuid(xid, xid_uuid);
if (!memcmp(xid_uuid, trx_sys_cur_xid_uuid, 8))
{
ut_ad(xid_seqno > trx_sys_cur_xid_seqno);
trx_sys_cur_xid_seqno = xid_seqno;
}
else
{
memcpy(trx_sys_cur_xid_uuid, xid_uuid, 16);
}
trx_sys_cur_xid_seqno = xid_seqno;
}
#endif /* UNIV_DEBUG */
ut_ad(xid && mtr); ut_ad(xid && mtr);
ut_a(xid->formatID == -1 || wsrep_is_wsrep_xid(xid)); ut_a(xid->formatID == -1 || wsrep_is_wsrep_xid(xid));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment