Commit eb872ceb authored by Teemu Ollakka's avatar Teemu Ollakka Committed by Jan Lindström

Fixed wsrep replaying for stored procedures (#1256)

- Changed replaying to always allocate a separate THD object
  for applying log events. This is to avoid tampering original
  THD state during replay process.
- Return success from sp_instr_stmt::exec_core() if replaying
  succeeds.
- Do not push warnings/errors into diagnostics area if the
  transaction must be replayed. This is to avoid reporting
  transient errors to the client.

Added two tests galera_sp_bf_abort, galera_sp_insert_parallel.
Wsrep-lib position updated.
parent fe62ff6e
connection node_2;
connection node_1;
connection node_1;
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 CHAR(1));
connect node_1a, 127.0.0.1, root, , test, $NODE_MYPORT_1;
connection node_1a;
SET SESSION wsrep_sync_wait = 0;
connection node_1;
CREATE PROCEDURE proc_update_insert()
BEGIN
UPDATE t1 SET f2 = 'b';
INSERT INTO t1 VALUES (4, 'd');
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
connection node_1a;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
connection node_2;
INSERT INTO t1 VALUES (2, 'c');
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
connection node_1;
CALL proc_update_insert;
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
connection node_1;
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 b
2 c
3 b
4 d
wsrep_local_replays
1
DELETE FROM t1;
connection node_1;
CREATE PROCEDURE proc_update_insert_with_exit_handler()
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;
UPDATE t1 SET f2 = 'b';
INSERT INTO t1 VALUES (4, 'd');
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
connection node_1a;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
connection node_2;
INSERT INTO t1 VALUES (2, 'c');
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
connection node_1;
CALL proc_update_insert_with_exit_handler;
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
connection node_1;
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 b
2 c
3 b
4 d
wsrep_local_replays
1
DELETE FROM t1;
connection node_1;
CREATE PROCEDURE proc_update_insert_with_continue_handler()
BEGIN
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END;
UPDATE t1 SET f2 = 'b';
INSERT INTO t1 VALUES (4, 'd');
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
connection node_1a;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
connection node_2;
INSERT INTO t1 VALUES (2, 'c');
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
connection node_1;
CALL proc_update_insert_with_continue_handler;
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
connection node_1;
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 b
2 c
3 b
4 d
wsrep_local_replays
1
DELETE FROM t1;
connection node_1;
CREATE PROCEDURE proc_update_insert_transaction()
BEGIN
START TRANSACTION;
UPDATE t1 SET f2 = 'b';
INSERT INTO t1 VALUES (4, 'd');
COMMIT;
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
connection node_1a;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
connection node_2;
INSERT INTO t1 VALUES (2, 'c');
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
connection node_1;
CALL proc_update_insert_transaction;
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
connection node_1;
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 b
2 c
3 b
4 d
wsrep_local_replays
1
DELETE FROM t1;
connection node_1;
CREATE PROCEDURE proc_update_insert_transaction_with_continue_handler()
BEGIN
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END;
START TRANSACTION;
UPDATE t1 SET f2 = 'b';
INSERT INTO t1 VALUES (4, 'd');
COMMIT;
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
connection node_1a;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
connection node_2;
INSERT INTO t1 VALUES (2, 'c');
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
connection node_1;
CALL proc_update_insert_transaction_with_continue_handler;
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
connection node_1;
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 b
2 c
3 b
4 d
wsrep_local_replays
1
DELETE FROM t1;
connection node_1;
CREATE PROCEDURE proc_update_insert_transaction_with_exit_handler()
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN END;
START TRANSACTION;
UPDATE t1 SET f2 = 'b';
INSERT INTO t1 VALUES (4, 'd');
COMMIT;
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
connection node_1a;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
connection node_2;
INSERT INTO t1 VALUES (2, 'c');
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
connection node_1;
CALL proc_update_insert_transaction_with_exit_handler;
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
connection node_1;
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 b
2 c
3 b
4 d
wsrep_local_replays
1
DELETE FROM t1;
connection node_1;
CREATE PROCEDURE proc_insert_insert_conflict()
BEGIN
INSERT INTO t1 VALUES (2, 'd');
INSERT INTO t1 VALUES (4, 'd');
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
connection node_1a;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
connection node_2;
INSERT INTO t1 VALUES (2, 'c');
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
connection node_1;
CALL proc_insert_insert_conflict;
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
connection node_1;
Got one of the listed errors
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 a
2 c
3 a
wsrep_local_replays
1
DELETE FROM t1;
connection node_1;
CREATE PROCEDURE proc_insert_insert_conflict_with_exit_handler()
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION SELECT "Conflict exit handler";
INSERT INTO t1 VALUES (2, 'd');
INSERT INTO t1 VALUES (4, 'd');
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
connection node_1a;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
connection node_2;
INSERT INTO t1 VALUES (2, 'c');
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
connection node_1;
CALL proc_insert_insert_conflict_with_exit_handler;
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
connection node_1;
Conflict exit handler
Conflict exit handler
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 a
2 c
3 a
wsrep_local_replays
1
DELETE FROM t1;
connection node_1;
CREATE PROCEDURE proc_insert_insert_conflict_with_continue_handler()
BEGIN
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SELECT "Conflict continue handler";
INSERT INTO t1 VALUES (2, 'd');
INSERT INTO t1 VALUES (4, 'd');
END|
INSERT INTO t1 VALUES (1, 'a'), (3, 'a');
SET SESSION wsrep_sync_wait = 0;
connection node_1a;
SET GLOBAL wsrep_provider_options = 'dbug=d,apply_monitor_slave_enter_sync';
connection node_2;
INSERT INTO t1 VALUES (2, 'c');
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'dbug=d,after_replicate_sync';
connection node_1;
CALL proc_insert_insert_conflict_with_continue_handler;
connection node_1a;
SET SESSION wsrep_on = 0;
SET SESSION wsrep_on = 1;
SET GLOBAL wsrep_provider_options = 'dbug=';
SET GLOBAL wsrep_provider_options = 'signal=apply_monitor_slave_enter_sync';
SET GLOBAL wsrep_provider_options = 'signal=after_replicate_sync';
connection node_1;
Conflict continue handler
Conflict continue handler
SET SESSION wsrep_sync_wait = default;
SELECT * FROM t1;
f1 f2
1 a
2 c
3 a
4 d
wsrep_local_replays
1
DELETE FROM t1;
DROP PROCEDURE proc_update_insert;
DROP PROCEDURE proc_update_insert_with_continue_handler;
DROP PROCEDURE proc_update_insert_with_exit_handler;
DROP PROCEDURE proc_update_insert_transaction;
DROP PROCEDURE proc_update_insert_transaction_with_continue_handler;
DROP PROCEDURE proc_update_insert_transaction_with_exit_handler;
DROP PROCEDURE proc_insert_insert_conflict;
DROP PROCEDURE proc_insert_insert_conflict_with_exit_handler;
DROP PROCEDURE proc_insert_insert_conflict_with_continue_handler;
DROP TABLE t1;
connection node_2;
connection node_1;
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 INTEGER) ENGINE=InnoDB;
CREATE PROCEDURE proc_insert()
BEGIN
DECLARE i INT;
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END;
SET i = 0;
WHILE i < 1000 DO
INSERT IGNORE INTO t1 (f1, f2)
VALUES (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15));
SET i = i + 1;
END WHILE;
END|
connection node_1;
SELECT 0;
0
0
SET SESSION wsrep_sync_wait = 0;
CALL proc_insert;
connection node_2;
SELECT 0;
0
0
SET SESSION wsrep_sync_wait = 0;
CALL proc_insert;
connection node_1;
SET SESSION wsrep_sync_wait = default;
connection node_2;
SET SESSION wsrep_sync_wait = default;
connection node_1;
DROP PROCEDURE proc_insert;
DROP TABLE t1;
--source include/galera_cluster.inc
--source include/have_innodb.inc
CREATE TABLE t1 (f1 INTEGER PRIMARY KEY, f2 INTEGER) ENGINE=InnoDB;
DELIMITER |;
CREATE PROCEDURE proc_insert()
BEGIN
DECLARE i INT;
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION BEGIN END;
SET i = 0;
WHILE i < 1000 DO
INSERT IGNORE INTO t1 (f1, f2)
VALUES (FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15)),
(FLOOR(1 + RAND() * 65535), FLOOR(1 + RAND() * 15));
SET i = i + 1;
END WHILE;
END|
DELIMITER ;|
--connection node_1
SELECT 0;
SET SESSION wsrep_sync_wait = 0;
--send CALL proc_insert
--connection node_2
SELECT 0;
SET SESSION wsrep_sync_wait = 0;
--send CALL proc_insert
--connection node_1
--error 0,ER_LOCK_DEADLOCK,ER_QUERY_INTERRUPTED
--disable_warnings
--reap
--enable_warnings
SET SESSION wsrep_sync_wait = default;
--connection node_2
--error 0,ER_LOCK_DEADLOCK,ER_QUERY_INTERRUPTED
--disable_warnings
--reap
--enable_warnings
SET SESSION wsrep_sync_wait = default;
--connection node_1
DROP PROCEDURE proc_insert;
DROP TABLE t1;
...@@ -3605,32 +3605,45 @@ sp_instr_stmt::exec_core(THD *thd, uint *nextp) ...@@ -3605,32 +3605,45 @@ sp_instr_stmt::exec_core(THD *thd, uint *nextp)
3); 3);
int res= mysql_execute_command(thd); int res= mysql_execute_command(thd);
#ifdef WITH_WSREP #ifdef WITH_WSREP
if ((thd->is_fatal_error || thd->killed_errno()) && if (WSREP(thd))
(thd->wsrep_trx().state() == wsrep::transaction::s_executing))
{ {
/* if ((thd->is_fatal_error || thd->killed_errno()) &&
SP was killed, and it is not due to a wsrep conflict. (thd->wsrep_trx().state() == wsrep::transaction::s_executing))
We skip after_statement hook at this point because {
otherwise it clears the error, and cleans up the /*
whole transaction. For now we just return and finish SP was killed, and it is not due to a wsrep conflict.
our handling once we are back to mysql_parse. We skip after_statement hook at this point because
*/ otherwise it clears the error, and cleans up the
WSREP_DEBUG("Skipping after_command hook for killed SP"); whole transaction. For now we just return and finish
} our handling once we are back to mysql_parse.
else */
{ WSREP_DEBUG("Skipping after_command hook for killed SP");
(void) wsrep_after_statement(thd); }
/* else
Final wsrep error status for statement is known only after
wsrep_after_statement() call. If the error is set, override
error in thd diagnostics area and reset wsrep client_state error
so that the error does not get propagated via client-server protocol.
*/
if (wsrep_current_error(thd))
{ {
wsrep_override_error(thd, wsrep_current_error(thd), const bool must_replay= wsrep_must_replay(thd);
wsrep_current_error_status(thd)); (void) wsrep_after_statement(thd);
thd->wsrep_cs().reset_error(); /*
Reset the return code to zero if the transaction was
replayed succesfully.
*/
if (res && must_replay && !wsrep_current_error(thd))
res= 0;
/*
Final wsrep error status for statement is known only after
wsrep_after_statement() call. If the error is set, override
error in thd diagnostics area and reset wsrep client_state error
so that the error does not get propagated via client-server protocol.
*/
if (wsrep_current_error(thd))
{
wsrep_override_error(thd, wsrep_current_error(thd),
wsrep_current_error_status(thd));
thd->wsrep_cs().reset_error();
/* Reset also thd->killed if it has been set during BF abort. */
if (thd->killed == KILL_QUERY)
thd->reset_killed();
}
} }
} }
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
......
...@@ -1020,6 +1020,15 @@ Sql_condition* THD::raise_condition(uint sql_errno, ...@@ -1020,6 +1020,15 @@ Sql_condition* THD::raise_condition(uint sql_errno,
if (!(variables.option_bits & OPTION_SQL_NOTES) && if (!(variables.option_bits & OPTION_SQL_NOTES) &&
(level == Sql_condition::WARN_LEVEL_NOTE)) (level == Sql_condition::WARN_LEVEL_NOTE))
DBUG_RETURN(NULL); DBUG_RETURN(NULL);
#ifdef WITH_WSREP
/*
Suppress warnings/errors if the wsrep THD is going to replay. The
deadlock/interrupted errors may be transitient and should not be
reported to the client.
*/
if (wsrep_must_replay(this))
DBUG_RETURN(NULL);
#endif /* WITH_WSREP */
da->opt_clear_warning_info(query_id); da->opt_clear_warning_info(query_id);
......
...@@ -250,20 +250,38 @@ void Wsrep_client_service::will_replay() ...@@ -250,20 +250,38 @@ void Wsrep_client_service::will_replay()
enum wsrep::provider::status Wsrep_client_service::replay() enum wsrep::provider::status Wsrep_client_service::replay()
{ {
DBUG_ASSERT(m_thd == current_thd); DBUG_ASSERT(m_thd == current_thd);
Wsrep_replayer_service replayer_service(m_thd); DBUG_ENTER("Wsrep_client_service::replay");
wsrep::provider& provider(m_thd->wsrep_cs().provider());
mysql_mutex_lock(&m_thd->LOCK_thd_data); /*
m_thd->killed= NOT_KILLED; Allocate separate THD for replaying to avoid tampering
mysql_mutex_unlock(&m_thd->LOCK_thd_data); original THD state during replication event applying.
enum wsrep::provider::status ret= */
provider.replay(m_thd->wsrep_trx().ws_handle(), &replayer_service); THD *replayer_thd= new THD(true, true);
replayer_service.replay_status(ret); replayer_thd->thread_stack= m_thd->thread_stack;
replayer_thd->real_id= pthread_self();
replayer_thd->prior_thr_create_utime=
replayer_thd->start_utime= microsecond_interval_timer();
replayer_thd->set_command(COM_SLEEP);
replayer_thd->reset_for_next_command(true);
enum wsrep::provider::status ret;
{
Wsrep_replayer_service replayer_service(replayer_thd, m_thd);
wsrep::provider& provider(replayer_thd->wsrep_cs().provider());
ret= provider.replay(replayer_thd->wsrep_trx().ws_handle(),
&replayer_service);
replayer_service.replay_status(ret);
}
delete replayer_thd;
mysql_mutex_lock(&LOCK_wsrep_replaying); mysql_mutex_lock(&LOCK_wsrep_replaying);
--wsrep_replaying; --wsrep_replaying;
mysql_cond_broadcast(&COND_wsrep_replaying); mysql_cond_broadcast(&COND_wsrep_replaying);
mysql_mutex_unlock(&LOCK_wsrep_replaying); mysql_mutex_unlock(&LOCK_wsrep_replaying);
return ret; DBUG_RETURN(ret);
} }
void Wsrep_client_service::wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock) void Wsrep_client_service::wait_for_replayers(wsrep::unique_lock<wsrep::mutex>& lock)
......
...@@ -519,83 +519,88 @@ bool Wsrep_applier_service::check_exit_status() const ...@@ -519,83 +519,88 @@ bool Wsrep_applier_service::check_exit_status() const
Replayer service Replayer service
*****************************************************************************/ *****************************************************************************/
Wsrep_replayer_service::Wsrep_replayer_service(THD* thd) Wsrep_replayer_service::Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd)
: Wsrep_high_priority_service(thd) : Wsrep_high_priority_service(replayer_thd)
, m_orig_thd(orig_thd)
, m_da_shadow() , m_da_shadow()
, m_replay_status() , m_replay_status()
{ {
/* Response must not have been sent to client */ /* Response must not have been sent to client */
DBUG_ASSERT(!thd->get_stmt_da()->is_sent()); DBUG_ASSERT(!orig_thd->get_stmt_da()->is_sent());
/* PS reprepare observer should have been removed already /* PS reprepare observer should have been removed already
open_table() will fail if we have dangling observer here */ open_table() will fail if we have dangling observer here */
DBUG_ASSERT(!thd->m_reprepare_observer); DBUG_ASSERT(!orig_thd->m_reprepare_observer);
/* Replaying should happen always from after_statement() hook /* Replaying should happen always from after_statement() hook
after rollback, which should guarantee that there are no after rollback, which should guarantee that there are no
transactional locks */ transactional locks */
DBUG_ASSERT(!thd->mdl_context.has_transactional_locks()); DBUG_ASSERT(!orig_thd->mdl_context.has_transactional_locks());
/* Make a shadow copy of diagnostics area and reset */ /* Make a shadow copy of diagnostics area and reset */
m_da_shadow.status= thd->get_stmt_da()->status(); m_da_shadow.status= orig_thd->get_stmt_da()->status();
if (m_da_shadow.status == Diagnostics_area::DA_OK) if (m_da_shadow.status == Diagnostics_area::DA_OK)
{ {
m_da_shadow.affected_rows= thd->get_stmt_da()->affected_rows(); m_da_shadow.affected_rows= orig_thd->get_stmt_da()->affected_rows();
m_da_shadow.last_insert_id= thd->get_stmt_da()->last_insert_id(); m_da_shadow.last_insert_id= orig_thd->get_stmt_da()->last_insert_id();
strmake(m_da_shadow.message, thd->get_stmt_da()->message(), strmake(m_da_shadow.message, orig_thd->get_stmt_da()->message(),
sizeof(m_da_shadow.message) - 1); sizeof(m_da_shadow.message) - 1);
} }
thd->get_stmt_da()->reset_diagnostics_area(); orig_thd->get_stmt_da()->reset_diagnostics_area();
/* Release explicit locks */ /* Release explicit locks */
if (thd->locked_tables_mode && thd->lock) if (orig_thd->locked_tables_mode && orig_thd->lock)
{ {
WSREP_WARN("releasing table lock for replaying (%llu)", WSREP_WARN("releasing table lock for replaying (%llu)",
thd->thread_id); orig_thd->thread_id);
thd->locked_tables_list.unlock_locked_tables(thd); orig_thd->locked_tables_list.unlock_locked_tables(orig_thd);
thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); orig_thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
} }
thd_proc_info(orig_thd, "wsrep replaying trx");
/* /*
Replaying will call MYSQL_START_STATEMENT when handling Swith execution context to replayer_thd and prepare it for
BEGIN Query_log_event so end statement must be called before replay execution.
replaying.
*/ */
MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da()); orig_thd->reset_globals();
thd->m_statement_psi= NULL; replayer_thd->store_globals();
thd->m_digest= NULL; wsrep_open(replayer_thd);
thd_proc_info(thd, "wsrep replaying trx"); wsrep_before_command(replayer_thd);
replayer_thd->wsrep_cs().clone_transaction_for_replay(orig_thd->wsrep_trx());
} }
Wsrep_replayer_service::~Wsrep_replayer_service() Wsrep_replayer_service::~Wsrep_replayer_service()
{ {
THD* thd= m_thd; THD* replayer_thd= m_thd;
DBUG_ASSERT(!thd->get_stmt_da()->is_sent()); THD* orig_thd= m_orig_thd;
DBUG_ASSERT(!thd->get_stmt_da()->is_set());
/* Store replay result/state to original thread wsrep client
state and switch execution context back to original. */
orig_thd->wsrep_cs().after_replay(replayer_thd->wsrep_trx());
wsrep_after_apply(replayer_thd);
wsrep_after_command_ignore_result(replayer_thd);
wsrep_close(replayer_thd);
replayer_thd->reset_globals();
orig_thd->store_globals();
DBUG_ASSERT(!orig_thd->get_stmt_da()->is_sent());
DBUG_ASSERT(!orig_thd->get_stmt_da()->is_set());
if (m_replay_status == wsrep::provider::success) if (m_replay_status == wsrep::provider::success)
{ {
DBUG_ASSERT(thd->wsrep_cs().current_error() == wsrep::e_success); DBUG_ASSERT(replayer_thd->wsrep_cs().current_error() == wsrep::e_success);
thd->killed= NOT_KILLED; orig_thd->killed= NOT_KILLED;
if (m_da_shadow.status == Diagnostics_area::DA_OK) my_ok(orig_thd, m_da_shadow.affected_rows, m_da_shadow.last_insert_id);
{
my_ok(thd,
m_da_shadow.affected_rows,
m_da_shadow.last_insert_id,
m_da_shadow.message);
}
else
{
my_ok(thd);
}
} }
else if (m_replay_status == wsrep::provider::error_certification_failed) else if (m_replay_status == wsrep::provider::error_certification_failed)
{ {
wsrep_override_error(thd, ER_LOCK_DEADLOCK); wsrep_override_error(orig_thd, ER_LOCK_DEADLOCK);
} }
else else
{ {
DBUG_ASSERT(0); DBUG_ASSERT(0);
WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s", WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
m_replay_status, m_replay_status,
thd->db.str, WSREP_QUERY(thd)); orig_thd->db.str, WSREP_QUERY(orig_thd));
unireg_abort(1); unireg_abort(1);
} }
} }
......
...@@ -87,7 +87,7 @@ class Wsrep_applier_service : public Wsrep_high_priority_service ...@@ -87,7 +87,7 @@ class Wsrep_applier_service : public Wsrep_high_priority_service
class Wsrep_replayer_service : public Wsrep_high_priority_service class Wsrep_replayer_service : public Wsrep_high_priority_service
{ {
public: public:
Wsrep_replayer_service(THD*); Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd);
~Wsrep_replayer_service(); ~Wsrep_replayer_service();
int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&); int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&);
void after_apply() { } void after_apply() { }
...@@ -99,6 +99,7 @@ class Wsrep_replayer_service : public Wsrep_high_priority_service ...@@ -99,6 +99,7 @@ class Wsrep_replayer_service : public Wsrep_high_priority_service
/* Replayer should never be forced to exit */ /* Replayer should never be forced to exit */
bool check_exit_status() const { return false; } bool check_exit_status() const { return false; }
private: private:
THD* m_orig_thd;
struct da_shadow struct da_shadow
{ {
enum Diagnostics_area::enum_diagnostics_status status; enum Diagnostics_area::enum_diagnostics_status status;
......
...@@ -55,6 +55,13 @@ static inline bool wsrep_must_abort(THD* thd) ...@@ -55,6 +55,13 @@ static inline bool wsrep_must_abort(THD* thd)
return (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort); return (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort);
} }
/*
Return true if the transaction must be replayed.
*/
static inline bool wsrep_must_replay(THD* thd)
{
return (thd->wsrep_trx().state() == wsrep::transaction::s_must_replay);
}
/* /*
Return true if transaction has not been committed. Return true if transaction has not been committed.
......
Subproject commit ae746fb28957140fb996a4aaf994baea58bd5287 Subproject commit e9dafb73734d71ab55078b34748e54f139aec827
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment