Commit a5418c55 authored by unknown's avatar unknown

MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction

When a transaction fails in parallel replication, it should signal the error
to any following transactions doing wait_for_prior_commit() on it. But the
code for this was incorrect, and would not correctly remember a prior error
when sending the signal. This caused corruption when slave stopped due to an
error.

Fix by remembering the error code when we first get an error, and passing the
saved error code to wakeup_subsequent_commits().

Thanks to nanyi607rao who reported this bug on
maria-developers@lists.launchpad.net and analysed the root cause.
parent e59dec03
...@@ -694,6 +694,7 @@ STOP SLAVE IO_THREAD; ...@@ -694,6 +694,7 @@ STOP SLAVE IO_THREAD;
SET GLOBAL debug_dbug=@old_dbug; SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_max_queued= @old_max_queued; SET GLOBAL slave_parallel_max_queued= @old_max_queued;
INSERT INTO t3 VALUES (82,0); INSERT INTO t3 VALUES (82,0);
SET binlog_format=@old_format;
SET debug_sync='RESET'; SET debug_sync='RESET';
include/start_slave.inc include/start_slave.inc
SELECT * FROM t3 WHERE a >= 80 ORDER BY a; SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
...@@ -726,6 +727,40 @@ SELECT * FROM t3 WHERE a >= 100 ORDER BY a; ...@@ -726,6 +727,40 @@ SELECT * FROM t3 WHERE a >= 100 ORDER BY a;
a b a b
106 # 106 #
107 # 107 #
*** MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction ***
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
INSERT INTO t3 VALUES (110, 1);
SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
a b
110 1
SET sql_log_bin=0;
INSERT INTO t3 VALUES (111, 666);
SET sql_log_bin=1;
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
INSERT INTO t3 VALUES (111, 2);
SET debug_sync='now WAIT_FOR master_queued1';
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
INSERT INTO t3 VALUES (112, 3);
SET debug_sync='now WAIT_FOR master_queued2';
SET debug_sync='now SIGNAL master_cont1';
SET debug_sync='RESET';
include/wait_for_slave_sql_error.inc [errno=1062]
include/wait_for_slave_sql_to_stop.inc
SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
a b
110 1
111 666
SET sql_log_bin=0;
DELETE FROM t3 WHERE a=111 AND b=666;
SET sql_log_bin=1;
START SLAVE SQL_THREAD;
SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
a b
110 1
111 2
112 3
include/stop_slave.inc include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc include/start_slave.inc
......
...@@ -1053,6 +1053,7 @@ SET GLOBAL slave_parallel_max_queued= @old_max_queued; ...@@ -1053,6 +1053,7 @@ SET GLOBAL slave_parallel_max_queued= @old_max_queued;
--connection server_1 --connection server_1
INSERT INTO t3 VALUES (82,0); INSERT INTO t3 VALUES (82,0);
SET binlog_format=@old_format;
--save_master_pos --save_master_pos
--connection server_2 --connection server_2
...@@ -1113,6 +1114,64 @@ INSERT INTO t3 VALUES (107, rand()); ...@@ -1113,6 +1114,64 @@ INSERT INTO t3 VALUES (107, rand());
SELECT * FROM t3 WHERE a >= 100 ORDER BY a; SELECT * FROM t3 WHERE a >= 100 ORDER BY a;
--echo *** MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction ***
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
--source include/start_slave.inc
--connection server_1
INSERT INTO t3 VALUES (110, 1);
--save_master_pos
--connection server_2
--sync_with_master
SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
# Inject a duplicate key error.
SET sql_log_bin=0;
INSERT INTO t3 VALUES (111, 666);
SET sql_log_bin=1;
--connection server_1
# Create a group commit with two inserts, the first one conflicts with a row on the slave
--connect (con1,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
send INSERT INTO t3 VALUES (111, 2);
--connection server_1
SET debug_sync='now WAIT_FOR master_queued1';
--connect (con2,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
send INSERT INTO t3 VALUES (112, 3);
--connection server_1
SET debug_sync='now WAIT_FOR master_queued2';
SET debug_sync='now SIGNAL master_cont1';
--connection con1
REAP;
--connection con2
REAP;
SET debug_sync='RESET';
--save_master_pos
--connection server_2
--let $slave_sql_errno= 1062
--source include/wait_for_slave_sql_error.inc
--source include/wait_for_slave_sql_to_stop.inc
# We should not see the row (112,3) here, it should be rolled back due to
# error signal from the prior transaction.
SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
SET sql_log_bin=0;
DELETE FROM t3 WHERE a=111 AND b=666;
SET sql_log_bin=1;
START SLAVE SQL_THREAD;
--sync_with_master
SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
--connection server_2 --connection server_2
--source include/stop_slave.inc --source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
struct rpl_parallel_thread_pool global_rpl_thread_pool; struct rpl_parallel_thread_pool global_rpl_thread_pool;
static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
int err);
static int static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev, rpt_handle_event(rpl_parallel_thread::queued_event *qev,
...@@ -94,10 +96,11 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) ...@@ -94,10 +96,11 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
static void static void
finish_event_group(THD *thd, int err, uint64 sub_id, finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
rpl_parallel_entry *entry, rpl_group_info *rgi) rpl_group_info *rgi)
{ {
wait_for_commit *wfc= &rgi->commit_orderer; wait_for_commit *wfc= &rgi->commit_orderer;
int err;
/* /*
Remove any left-over registration to wait for a prior commit to Remove any left-over registration to wait for a prior commit to
...@@ -120,10 +123,10 @@ finish_event_group(THD *thd, int err, uint64 sub_id, ...@@ -120,10 +123,10 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
waiting for us will in any case receive the error back from their waiting for us will in any case receive the error back from their
wait_for_prior_commit() call. wait_for_prior_commit() call.
*/ */
if (err) if (rgi->worker_error)
wfc->unregister_wait_for_prior_commit(); wfc->unregister_wait_for_prior_commit();
else else if ((err= wfc->wait_for_prior_commit(thd)))
err= wfc->wait_for_prior_commit(thd); signal_error_to_sql_driver_thread(thd, rgi, err);
thd->wait_for_commit_ptr= NULL; thd->wait_for_commit_ptr= NULL;
/* /*
...@@ -150,7 +153,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id, ...@@ -150,7 +153,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
not yet started should just skip their group, preparing for stop of the not yet started should just skip their group, preparing for stop of the
SQL driver thread. SQL driver thread.
*/ */
if (unlikely(rgi->is_error) && if (unlikely(rgi->worker_error) &&
entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX) entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
entry->stop_on_error_sub_id= sub_id; entry->stop_on_error_sub_id= sub_id;
/* /*
...@@ -163,14 +166,14 @@ finish_event_group(THD *thd, int err, uint64 sub_id, ...@@ -163,14 +166,14 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
thd->clear_error(); thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area(); thd->get_stmt_da()->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(err); wfc->wakeup_subsequent_commits(rgi->worker_error);
} }
static void static void
signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi) signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
{ {
rgi->is_error= true; rgi->worker_error= err;
rgi->cleanup_context(thd, true); rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true; rgi->rli->abort_slave= true;
rgi->rli->stop_for_until= false; rgi->rli->stop_for_until= false;
...@@ -294,7 +297,6 @@ handle_rpl_parallel_thread(void *arg) ...@@ -294,7 +297,6 @@ handle_rpl_parallel_thread(void *arg)
continue; continue;
} }
err= 0;
group_rgi= rgi; group_rgi= rgi;
gco= rgi->gco; gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */ /* Handle a new event group, which will be initiated by a GTID event. */
...@@ -346,12 +348,12 @@ handle_rpl_parallel_thread(void *arg) ...@@ -346,12 +348,12 @@ handle_rpl_parallel_thread(void *arg)
did_enter_cond= true; did_enter_cond= true;
do do
{ {
if (thd->check_killed() && !rgi->is_error) if (thd->check_killed() && !rgi->worker_error)
{ {
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
thd->send_kill_message(); thd->send_kill_message();
slave_output_error_info(rgi->rli, thd); slave_output_error_info(rgi->rli, thd);
signal_error_to_sql_driver_thread(thd, rgi); signal_error_to_sql_driver_thread(thd, rgi, 1);
/* /*
Even though we were killed, we need to continue waiting for the Even though we were killed, we need to continue waiting for the
prior event groups to signal that we can continue. Otherwise we prior event groups to signal that we can continue. Otherwise we
...@@ -417,7 +419,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -417,7 +419,7 @@ handle_rpl_parallel_thread(void *arg)
*/ */
rgi->cleanup_context(thd, true); rgi->cleanup_context(thd, true);
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit(); thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
thd->wait_for_commit_ptr->wakeup_subsequent_commits(err); thd->wait_for_commit_ptr->wakeup_subsequent_commits(rgi->worker_error);
} }
thd->wait_for_commit_ptr= &rgi->commit_orderer; thd->wait_for_commit_ptr= &rgi->commit_orderer;
...@@ -430,7 +432,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -430,7 +432,7 @@ handle_rpl_parallel_thread(void *arg)
{ {
/* Error. */ /* Error. */
slave_output_error_info(rgi->rli, thd); slave_output_error_info(rgi->rli, thd);
signal_error_to_sql_driver_thread(thd, rgi); signal_error_to_sql_driver_thread(thd, rgi, 1);
} }
else if (!res) else if (!res)
{ {
...@@ -460,7 +462,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -460,7 +462,7 @@ handle_rpl_parallel_thread(void *arg)
processing between the event groups as a simple way to ensure that processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly. everything is stopped and cleaned up correctly.
*/ */
if (!rgi->is_error && !skip_event_group) if (!rgi->worker_error && !skip_event_group)
err= rpt_handle_event(events, rpt); err= rpt_handle_event(events, rpt);
else else
err= thd->wait_for_prior_commit(); err= thd->wait_for_prior_commit();
...@@ -474,15 +476,15 @@ handle_rpl_parallel_thread(void *arg) ...@@ -474,15 +476,15 @@ handle_rpl_parallel_thread(void *arg)
events->next= qevs_to_free; events->next= qevs_to_free;
qevs_to_free= events; qevs_to_free= events;
if (err) if (unlikely(err) && !rgi->worker_error)
{ {
slave_output_error_info(rgi->rli, thd); slave_output_error_info(rgi->rli, thd);
signal_error_to_sql_driver_thread(thd, rgi); signal_error_to_sql_driver_thread(thd, rgi, err);
} }
if (end_of_group) if (end_of_group)
{ {
in_event_group= false; in_event_group= false;
finish_event_group(thd, err, event_gtid_sub_id, entry, rgi); finish_event_group(thd, event_gtid_sub_id, entry, rgi);
rgi->next= rgis_to_free; rgi->next= rgis_to_free;
rgis_to_free= rgi; rgis_to_free= rgi;
group_rgi= rgi= NULL; group_rgi= rgi= NULL;
...@@ -541,9 +543,9 @@ handle_rpl_parallel_thread(void *arg) ...@@ -541,9 +543,9 @@ handle_rpl_parallel_thread(void *arg)
*/ */
mysql_mutex_unlock(&rpt->LOCK_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
thd->wait_for_prior_commit(); thd->wait_for_prior_commit();
finish_event_group(thd, 1, group_rgi->gtid_sub_id, signal_error_to_sql_driver_thread(thd, group_rgi, 1);
finish_event_group(thd, group_rgi->gtid_sub_id,
group_rgi->parallel_entry, group_rgi); group_rgi->parallel_entry, group_rgi);
signal_error_to_sql_driver_thread(thd, group_rgi);
in_event_group= false; in_event_group= false;
mysql_mutex_lock(&rpt->LOCK_rpl_thread); mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_rgi(group_rgi); rpt->free_rgi(group_rgi);
......
...@@ -1489,7 +1489,7 @@ rpl_group_info::reinit(Relay_log_info *rli) ...@@ -1489,7 +1489,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
tables_to_lock_count= 0; tables_to_lock_count= 0;
trans_retries= 0; trans_retries= 0;
last_event_start_time= 0; last_event_start_time= 0;
is_error= false; worker_error= 0;
row_stmt_start_timestamp= 0; row_stmt_start_timestamp= 0;
long_find_row_note_printed= false; long_find_row_note_printed= false;
did_mark_start_commit= false; did_mark_start_commit= false;
......
...@@ -569,7 +569,7 @@ struct rpl_group_info ...@@ -569,7 +569,7 @@ struct rpl_group_info
*/ */
char future_event_master_log_name[FN_REFLEN]; char future_event_master_log_name[FN_REFLEN];
bool is_parallel_exec; bool is_parallel_exec;
bool is_error; int worker_error;
/* /*
Set true when we signalled that we reach the commit phase. Used to avoid Set true when we signalled that we reach the commit phase. Used to avoid
counting one event group twice. counting one event group twice.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment