Commit 4cb1e0ee authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-6321: close_temporary_tables() in format description event not serialised correctly

When a master server starts up, it logs a special format_description event at
the start of a new binlog to mark that is has restarted. This is used by a
slave to drop all temporary tables - this is needed in case the master crashed
and did not have a chance to send explicit DROP TEMPORARY TABLE statements to
the slave.

In parallel replication, we need to be careful when dropping the temporary
tables - we need to be sure that no prior events are still executing that
might be using the temporary tables to be dropped, _and_ that no following
events have started executing that might have created new temporary tables
that should not be dropped.

This was not handled correctly, which could cause errors about access to not
existing temporary tables or even crashes. This patch implements that such
format_description events cause serialisation of event execution; all prior
events are executed to completion first, then the format_description event is
executed, dropping temporary tables, then following events are queued for
execution.

Master restarts should be sufficiently infrequent that the resulting loss of
parallelism should be of minimal impact.
parent bd2117d1
include/rpl_init.inc [topology=1->2]
*** MDEV-6321: close_temporary_tables() in format description event not serialised correctly ***
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=5;
CHANGE MASTER TO master_use_gtid= current_pos;
include/start_slave.inc
CREATE TABLE t1 (a INT PRIMARY KEY, b VARCHAR(100) CHARACTER SET utf8);
include/stop_slave.inc
SET gtid_domain_id= 1;
INSERT INTO t1 VALUES (1, 0);
CREATE TEMPORARY TABLE t2 (a int);
SET gtid_domain_id= 2;
CREATE TEMPORARY TABLE t3 (a INT PRIMARY KEY);
CREATE TEMPORARY TABLE t4 (a int);
INSERT INTO t3 VALUES (100);
INSERT INTO t4 SELECT a+1 FROM t3;
INSERT INTO t2 VALUES (2), (4), (6), (8), (10), (12), (14), (16), (18), (20);
INSERT INTO t2 VALUES (3), (6), (9), (12), (15), (18);
INSERT INTO t2 VALUES (4), (8), (12), (16), (20);
INSERT INTO t3 SELECT a+2 FROM t4;
INSERT INTO t4 SELECT a+4 FROM t3;
INSERT INTO t2 VALUES (5), (10), (15), (20);
INSERT INTO t2 VALUES (6), (12), (18);
INSERT INTO t2 VALUES (7), (14);
INSERT INTO t2 VALUES (8), (16);
INSERT INTO t2 VALUES (9), (18);
INSERT INTO t2 VALUES (10), (20);
INSERT INTO t3 SELECT a+8 FROM t4;
INSERT INTO t4 SELECT a+16 FROM t3;
INSERT INTO t2 VALUES (11);
INSERT INTO t2 VALUES (12);
INSERT INTO t2 VALUES (13);
INSERT INTO t3 SELECT a+32 FROM t4;
INSERT INTO t2 VALUES (14);
INSERT INTO t2 VALUES (15);
INSERT INTO t2 VALUES (16);
INSERT INTO t4 SELECT a+64 FROM t3;
INSERT INTO t2 VALUES (17);
INSERT INTO t2 VALUES (18);
INSERT INTO t2 VALUES (19);
INSERT INTO t3 SELECT a+128 FROM t4;
INSERT INTO t2 VALUES (20);
INSERT INTO t1 SELECT a, a MOD 7 FROM t3;
INSERT INTO t1 SELECT a, a MOD 7 FROM t4;
INSERT INTO t1 SELECT a, COUNT(*) FROM t2 GROUP BY a;
FLUSH TABLES;
SET SESSION debug_dbug="+d,crash_dispatch_command_before";
SELECT 1;
Got one of the listed errors
INSERT INTO t1 VALUES (0, 1);
include/start_slave.inc
SELECT * FROM t1 WHERE a <= 20 ORDER BY a;
a b
0 1
1 0
2 1
3 1
4 2
5 1
6 3
7 1
8 3
9 2
10 3
11 1
12 5
13 1
14 3
15 3
16 4
17 1
18 5
19 1
20 5
SELECT COUNT(*) FROM t1 WHERE a BETWEEN 100+0 AND 100+256;
COUNT(*)
55
SHOW STATUS LIKE 'Slave_open_temp_tables';
Variable_name Value
Slave_open_temp_tables 0
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
DROP TABLE t1;
include/rpl_end.inc
--source include/have_binlog_format_statement.inc
--let $rpl_topology=1->2
--source include/rpl_init.inc
--echo *** MDEV-6321: close_temporary_tables() in format description event not serialised correctly ***
--connection server_2
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=5;
CHANGE MASTER TO master_use_gtid= current_pos;
--source include/start_slave.inc
--connection server_1
CREATE TABLE t1 (a INT PRIMARY KEY, b VARCHAR(100) CHARACTER SET utf8);
--save_master_pos
--connection server_2
--sync_with_master
--source include/stop_slave.inc
--connection server_1
SET gtid_domain_id= 1;
INSERT INTO t1 VALUES (1, 0);
CREATE TEMPORARY TABLE t2 (a int);
--connection default
SET gtid_domain_id= 2;
CREATE TEMPORARY TABLE t3 (a INT PRIMARY KEY);
CREATE TEMPORARY TABLE t4 (a int);
INSERT INTO t3 VALUES (100);
INSERT INTO t4 SELECT a+1 FROM t3;
--connection server_1
INSERT INTO t2 VALUES (2), (4), (6), (8), (10), (12), (14), (16), (18), (20);
INSERT INTO t2 VALUES (3), (6), (9), (12), (15), (18);
INSERT INTO t2 VALUES (4), (8), (12), (16), (20);
--connection default
INSERT INTO t3 SELECT a+2 FROM t4;
INSERT INTO t4 SELECT a+4 FROM t3;
--connection server_1
INSERT INTO t2 VALUES (5), (10), (15), (20);
INSERT INTO t2 VALUES (6), (12), (18);
INSERT INTO t2 VALUES (7), (14);
INSERT INTO t2 VALUES (8), (16);
INSERT INTO t2 VALUES (9), (18);
INSERT INTO t2 VALUES (10), (20);
--connection default
INSERT INTO t3 SELECT a+8 FROM t4;
INSERT INTO t4 SELECT a+16 FROM t3;
--connection server_1
INSERT INTO t2 VALUES (11);
INSERT INTO t2 VALUES (12);
INSERT INTO t2 VALUES (13);
--connection default
INSERT INTO t3 SELECT a+32 FROM t4;
--connection server_1
INSERT INTO t2 VALUES (14);
INSERT INTO t2 VALUES (15);
INSERT INTO t2 VALUES (16);
--connection default
INSERT INTO t4 SELECT a+64 FROM t3;
--connection server_1
INSERT INTO t2 VALUES (17);
INSERT INTO t2 VALUES (18);
INSERT INTO t2 VALUES (19);
--connection default
INSERT INTO t3 SELECT a+128 FROM t4;
--connection server_1
INSERT INTO t2 VALUES (20);
--connection default
INSERT INTO t1 SELECT a, a MOD 7 FROM t3;
INSERT INTO t1 SELECT a, a MOD 7 FROM t4;
--connection server_1
INSERT INTO t1 SELECT a, COUNT(*) FROM t2 GROUP BY a;
# Crash the master server, so that temporary tables are implicitly dropped.
--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
wait
EOF
FLUSH TABLES;
SET SESSION debug_dbug="+d,crash_dispatch_command_before";
--error 2006,2013
SELECT 1;
--source include/wait_until_disconnected.inc
--connection default
--source include/wait_until_disconnected.inc
--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
restart
EOF
--connection default
--enable_reconnect
--source include/wait_until_connected_again.inc
--connection server_1
--enable_reconnect
--source include/wait_until_connected_again.inc
INSERT INTO t1 VALUES (0, 1);
--save_master_pos
--connection server_2
# Start the slave replicating the events.
# The bug was that the format description event written after the crash could
# be fetched ahead of the execution of the temporary table events and executed
# out-of-band. This would cause drop of all temporary tables and thus failure
# for execution of remaining events.
--source include/start_slave.inc
--sync_with_master
SELECT * FROM t1 WHERE a <= 20 ORDER BY a;
SELECT COUNT(*) FROM t1 WHERE a BETWEEN 100+0 AND 100+256;
SHOW STATUS LIKE 'Slave_open_temp_tables';
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc
--connection server_1
DROP TABLE t1;
--source include/rpl_end.inc
...@@ -1617,6 +1617,36 @@ rpl_parallel::workers_idle() ...@@ -1617,6 +1617,36 @@ rpl_parallel::workers_idle()
} }
void
rpl_parallel::wait_for_workers_idle(THD *thd)
{
uint32 i, max_i;
max_i= domain_hash.records;
for (i= 0; i < max_i; ++i)
{
bool active;
wait_for_commit my_orderer;
struct rpl_parallel_entry *e;
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
mysql_mutex_lock(&e->LOCK_parallel_entry);
if ((active= (e->current_sub_id > e->last_committed_sub_id)))
{
wait_for_commit *waitee= &e->current_group_info->commit_orderer;
my_orderer.register_wait_for_prior_commit(waitee);
thd->wait_for_commit_ptr= &my_orderer;
}
mysql_mutex_unlock(&e->LOCK_parallel_entry);
if (active)
{
my_orderer.wait_for_prior_commit(thd);
thd->wait_for_commit_ptr= NULL;
}
}
}
/* /*
This is used when we get an error during processing in do_event(); This is used when we get an error during processing in do_event();
We will not queue any event to the thread, but we still need to wake it up We will not queue any event to the thread, but we still need to wake it up
...@@ -1684,6 +1714,24 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -1684,6 +1714,24 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
/* ToDo: what to do with this lock?!? */ /* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock); mysql_mutex_unlock(&rli->data_lock);
if (typ == FORMAT_DESCRIPTION_EVENT)
{
Format_description_log_event *fdev=
static_cast<Format_description_log_event *>(ev);
if (fdev->created)
{
/*
This format description event marks a new binlog after a master server
restart. We are going to close all temporary tables to clean up any
possible left-overs after a prior master crash.
Thus we need to wait for all prior events to execute to completion,
in case they need access to any of the temporary tables.
*/
wait_for_workers_idle(rli->sql_driver_thd);
}
}
/* /*
Stop queueing additional event groups once the SQL thread is requested to Stop queueing additional event groups once the SQL thread is requested to
stop. stop.
......
...@@ -239,6 +239,7 @@ struct rpl_parallel { ...@@ -239,6 +239,7 @@ struct rpl_parallel {
void wait_for_done(THD *thd, Relay_log_info *rli); void wait_for_done(THD *thd, Relay_log_info *rli);
void stop_during_until(); void stop_during_until();
bool workers_idle(); bool workers_idle();
void wait_for_workers_idle(THD *thd);
int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size); int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size);
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment