Commit 12c760ef authored by unknown's avatar unknown

MDEV-4506: Parallel replication.

Improve STOP SLAVE in parallel mode.

Now, the parallel part will queue the current event group to the
end, and then stop queing any more events. Each worker will
complete the current event group, and then just skip any further
queued events.
parent 45c3c715
...@@ -117,7 +117,6 @@ include/start_slave.inc ...@@ -117,7 +117,6 @@ include/start_slave.inc
FLUSH LOGS; FLUSH LOGS;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7); INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
SET binlog_format=@old_format;
BEGIN; BEGIN;
INSERT INTO t3 VALUES (2,102); INSERT INTO t3 VALUES (2,102);
BEGIN; BEGIN;
...@@ -211,6 +210,50 @@ slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16, ...@@ -211,6 +210,50 @@ slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16,
'group_commit_waiting_for_prior SIGNAL slave_queued3', 'group_commit_waiting_for_prior SIGNAL slave_queued3',
'')) ''))
slave-bin.000003 # Xid # # COMMIT /* XID */ slave-bin.000003 # Xid # # COMMIT /* XID */
*** Test STOP SLAVE in parallel mode ***
include/stop_slave.inc
SET binlog_direct_non_transactional_updates=0;
SET sql_log_bin=0;
CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction");
SET sql_log_bin=1;
BEGIN;
INSERT INTO t2 VALUES (20);
INSERT INTO t1 VALUES (20);
INSERT INTO t2 VALUES (21);
INSERT INTO t3 VALUES (20, 20);
COMMIT;
INSERT INTO t3 VALUES(21, 21);
INSERT INTO t3 VALUES(22, 22);
SET binlog_format=@old_format;
BEGIN;
INSERT INTO t2 VALUES (21);
START SLAVE;
STOP SLAVE;
ROLLBACK;
include/wait_for_slave_to_stop.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
a
20
SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
a
20
21
SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
a b
20 20
include/start_slave.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
a
20
SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
a
20
21
SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
a b
20 20
21 21
22 22
include/stop_slave.inc include/stop_slave.inc
SET GLOBAL binlog_format=@old_format; SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=0;
......
...@@ -165,7 +165,6 @@ CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; ...@@ -165,7 +165,6 @@ CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
# Create some sentinel rows so that the rows inserted in parallel fall into # Create some sentinel rows so that the rows inserted in parallel fall into
# separate gaps and do not cause gap lock conflicts. # separate gaps and do not cause gap lock conflicts.
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7); INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
SET binlog_format=@old_format;
--save_master_pos --save_master_pos
--connection server_2 --connection server_2
--sync_with_master --sync_with_master
...@@ -264,6 +263,73 @@ SELECT * FROM t3 ORDER BY a; ...@@ -264,6 +263,73 @@ SELECT * FROM t3 ORDER BY a;
--source include/show_binlog_events.inc --source include/show_binlog_events.inc
--echo *** Test STOP SLAVE in parallel mode ***
--connection server_2
--source include/stop_slave.inc
--connection server_1
# Set up a couple of transactions. The first will be blocked halfway
# through on a lock, and while it is blocked we initiate STOP SLAVE.
# We then test that the halfway-initiated transaction is allowed to
# complete, but no subsequent ones.
# We have to use statement-based mode and set
# binlog_direct_non_transactional_updates=0; otherwise the binlog will
# be split into two event groups, one for the MyISAM part and one for the
# InnoDB part.
SET binlog_direct_non_transactional_updates=0;
SET sql_log_bin=0;
CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction");
SET sql_log_bin=1;
BEGIN;
INSERT INTO t2 VALUES (20);
--disable_warnings
INSERT INTO t1 VALUES (20);
--disable_warnings
INSERT INTO t2 VALUES (21);
INSERT INTO t3 VALUES (20, 20);
COMMIT;
INSERT INTO t3 VALUES(21, 21);
INSERT INTO t3 VALUES(22, 22);
SET binlog_format=@old_format;
--save_master_pos
# Start a connection that will block the replicated transaction halfway.
--connection con_temp1
BEGIN;
INSERT INTO t2 VALUES (21);
--connection server_2
START SLAVE;
# Wait for the MyISAM change to be visible, after which replication will wait
# for con_temp1 to roll back.
--let $wait_condition= SELECT COUNT(*) = 1 FROM t1 WHERE a=20
--source include/wait_condition.inc
--connection con_temp2
# Initiate slave stop. It will have to wait for the current event group
# to complete.
send STOP SLAVE;
--connection con_temp1
ROLLBACK;
--connection con_temp2
reap;
--connection server_2
--source include/wait_for_slave_to_stop.inc
# We should see the first transaction applied, but not the two others.
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
--source include/start_slave.inc
--sync_with_master
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
--connection server_2 --connection server_2
--source include/stop_slave.inc --source include/stop_slave.inc
SET GLOBAL binlog_format=@old_format; SET GLOBAL binlog_format=@old_format;
......
...@@ -77,6 +77,28 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, ...@@ -77,6 +77,28 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
} }
static bool
sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group)
{
if (!rgi->rli->abort_slave && !abort_loop)
return false;
/*
Do not abort in the middle of an event group that cannot be rolled back.
*/
if ((thd->transaction.all.modified_non_trans_table ||
(thd->variables.option_bits & OPTION_KEEP_LOG))
&& in_event_group)
return false;
/* ToDo: should we add some timeout like in sql_slave_killed?
if (rgi->last_event_start_time == 0)
rgi->last_event_start_time= my_time(0);
*/
return true;
}
pthread_handler_t pthread_handler_t
handle_rpl_parallel_thread(void *arg) handle_rpl_parallel_thread(void *arg)
{ {
...@@ -131,7 +153,6 @@ handle_rpl_parallel_thread(void *arg) ...@@ -131,7 +153,6 @@ handle_rpl_parallel_thread(void *arg)
"Waiting for work from SQL thread"); "Waiting for work from SQL thread");
while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed) while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed)
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
/* Mark that this thread is now executing */
rpt->event_queue= rpt->last_in_queue= NULL; rpt->event_queue= rpt->last_in_queue= NULL;
thd->exit_cond(old_msg); thd->exit_cond(old_msg);
...@@ -159,7 +180,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -159,7 +180,7 @@ handle_rpl_parallel_thread(void *arg)
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 & (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
Gtid_log_event::FL_STANDALONE)); Gtid_log_event::FL_STANDALONE));
/* Save this, as it gets cleared once event group commits. */ /* Save this, as it gets cleared when the event group commits. */
event_gtid_sub_id= rgi->gtid_sub_id; event_gtid_sub_id= rgi->gtid_sub_id;
rgi->thd= thd; rgi->thd= thd;
...@@ -197,7 +218,16 @@ handle_rpl_parallel_thread(void *arg) ...@@ -197,7 +218,16 @@ handle_rpl_parallel_thread(void *arg)
thd->wait_for_commit_ptr= &rgi->commit_orderer; thd->wait_for_commit_ptr= &rgi->commit_orderer;
} }
/*
If the SQL thread is stopping, we just skip execution of all the
following event groups. We still do all the normal waiting and wakeup
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
if (!sql_worker_killed(thd, rgi, in_event_group))
rpt_handle_event(events, rpt); rpt_handle_event(events, rpt);
else
thd->wait_for_prior_commit();
end_of_group= end_of_group=
in_event_group && in_event_group &&
...@@ -207,7 +237,6 @@ handle_rpl_parallel_thread(void *arg) ...@@ -207,7 +237,6 @@ handle_rpl_parallel_thread(void *arg)
(!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) || (!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) ||
!strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query)))); !strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query))));
/* ToDo: must use rgi here, not rli, for thread safety. */
delete_or_keep_event_post_apply(rgi, event_type, events->ev); delete_or_keep_event_post_apply(rgi, event_type, events->ev);
my_free(events); my_free(events);
...@@ -516,7 +545,7 @@ free_rpl_parallel_entry(void *element) ...@@ -516,7 +545,7 @@ free_rpl_parallel_entry(void *element)
rpl_parallel::rpl_parallel() : rpl_parallel::rpl_parallel() :
current(NULL) current(NULL), sql_thread_stopping(false)
{ {
my_hash_init(&domain_hash, &my_charset_bin, 32, my_hash_init(&domain_hash, &my_charset_bin, 32,
offsetof(rpl_parallel_entry, domain_id), sizeof(uint32), offsetof(rpl_parallel_entry, domain_id), sizeof(uint32),
...@@ -529,6 +558,7 @@ rpl_parallel::reset() ...@@ -529,6 +558,7 @@ rpl_parallel::reset()
{ {
my_hash_reset(&domain_hash); my_hash_reset(&domain_hash);
current= NULL; current= NULL;
sql_thread_stopping= false;
} }
...@@ -591,10 +621,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) ...@@ -591,10 +621,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
rpl_group_info *rgi= NULL; rpl_group_info *rgi= NULL;
Relay_log_info *rli= serial_rgi->rli; Relay_log_info *rli= serial_rgi->rli;
enum Log_event_type typ; enum Log_event_type typ;
bool is_group_event;
/* ToDo: what to do with this lock?!? */ /* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock); mysql_mutex_unlock(&rli->data_lock);
/*
Stop queueing additional event groups once the SQL thread is requested to
stop.
*/
if (((typ= ev->get_type_code()) == GTID_EVENT ||
!(is_group_event= Log_event::is_group_event(typ))) &&
rli->abort_slave)
sql_thread_stopping= true;
if (sql_thread_stopping)
return false;
if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev), if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev),
MYF(0)))) MYF(0))))
{ {
...@@ -604,7 +646,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) ...@@ -604,7 +646,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
qev->ev= ev; qev->ev= ev;
qev->next= NULL; qev->next= NULL;
if ((typ= ev->get_type_code()) == GTID_EVENT) if (typ == GTID_EVENT)
{ {
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
...@@ -714,7 +756,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) ...@@ -714,7 +756,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
e->current_sub_id= rgi->gtid_sub_id; e->current_sub_id= rgi->gtid_sub_id;
current= rgi->parallel_entry= e; current= rgi->parallel_entry= e;
} }
else if (!Log_event::is_group_event(typ) || !current) else if (!is_group_event || !current)
{ {
/* /*
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
......
...@@ -76,6 +76,7 @@ struct rpl_parallel_entry { ...@@ -76,6 +76,7 @@ struct rpl_parallel_entry {
struct rpl_parallel { struct rpl_parallel {
HASH domain_hash; HASH domain_hash;
rpl_parallel_entry *current; rpl_parallel_entry *current;
bool sql_thread_stopping;
rpl_parallel(); rpl_parallel();
~rpl_parallel(); ~rpl_parallel();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment