Commit 2de8db62 authored by Kristian Nielsen's avatar Kristian Nielsen

Merge MDEV-7936 into 10.1

parents 8a01a0ac 60d094ae
include/rpl_init.inc [topology=1->2->3]
*** MDEV-7668: Intermediate master groups CREATE with INSERT, causing parallel replication failure ***
SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates;
SET GLOBAL binlog_direct_non_transactional_updates=OFF;
SET SESSION binlog_direct_non_transactional_updates=OFF;
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
include/stop_slave.inc
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=10;
SET @old_commit_count=@@GLOBAL.binlog_commit_wait_count;
SET GLOBAL binlog_commit_wait_count=2;
SET @old_commit_usec=@@GLOBAL.binlog_commit_wait_usec;
SET GLOBAL binlog_commit_wait_usec=2000000;
SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates;
SET GLOBAL binlog_direct_non_transactional_updates=OFF;
SET SESSION binlog_direct_non_transactional_updates=OFF;
CHANGE MASTER TO master_use_gtid=current_pos;
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=current_pos;
BEGIN;
CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=MEMORY;
COMMIT;
INSERT INTO t2 VALUES (1);
INSERT INTO t1 SELECT a, a*10 FROM t2;
DROP TABLE t2;
include/save_master_gtid.inc
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
1 10
include/start_slave.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
a b
1 10
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
SET GLOBAL binlog_commit_wait_count=@old_commit_count;
SET GLOBAL binlog_commit_wait_usec=@old_commit_usec;
SET GLOBAL binlog_direct_non_transactional_updates= @old_updates;
include/start_slave.inc
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
SET GLOBAL binlog_direct_non_transactional_updates= @old_updates;
CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table");
DROP TABLE t1;
include/rpl_end.inc
......@@ -116,6 +116,26 @@ SHOW STATUS LIKE 'Slave_open_temp_tables';
Variable_name Value
Slave_open_temp_tables 0
FLUSH LOGS;
*** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode ***
CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
SET @commit_id= 10000;
INSERT INTO t4 VALUES (30);
INSERT INTO t4 VALUES (31);
SET SESSION debug_dbug= @old_dbug;
INSERT INTO t1 SELECT a, "conservative" FROM t4;
DROP TEMPORARY TABLE t4;
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
a b
30 conservative
31 conservative
include/save_master_pos.inc
include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
a b
30 conservative
31 conservative
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
......
!include ../my.cnf
[mysqld.1]
log-slave-updates
loose-innodb
[mysqld.2]
log-slave-updates
loose-innodb
[mysqld.3]
log-slave-updates
loose-innodb
[ENV]
SERVER_MYPORT_3= @mysqld.3.port
SERVER_MYSOCK_3= @mysqld.3.socket
--source include/have_innodb.inc
--let $rpl_topology=1->2->3
--source include/rpl_init.inc
--echo *** MDEV-7668: Intermediate master groups CREATE with INSERT, causing parallel replication failure ***
--connection server_1
SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates;
SET GLOBAL binlog_direct_non_transactional_updates=OFF;
SET SESSION binlog_direct_non_transactional_updates=OFF;
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
--save_master_pos
--connection server_2
--sync_with_master
--save_master_pos
--source include/stop_slave.inc
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
SET GLOBAL slave_parallel_threads=10;
SET @old_commit_count=@@GLOBAL.binlog_commit_wait_count;
SET GLOBAL binlog_commit_wait_count=2;
SET @old_commit_usec=@@GLOBAL.binlog_commit_wait_usec;
SET GLOBAL binlog_commit_wait_usec=2000000;
SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates;
SET GLOBAL binlog_direct_non_transactional_updates=OFF;
SET SESSION binlog_direct_non_transactional_updates=OFF;
CHANGE MASTER TO master_use_gtid=current_pos;
--connection server_3
--sync_with_master
--save_master_pos
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=current_pos;
--connection server_1
BEGIN;
CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=MEMORY;
COMMIT;
INSERT INTO t2 VALUES (1);
INSERT INTO t1 SELECT a, a*10 FROM t2;
DROP TABLE t2;
--source include/save_master_gtid.inc
--connection server_2
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
--connection server_3
--source include/start_slave.inc
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 ORDER BY a;
# Clean up
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
SET GLOBAL binlog_commit_wait_count=@old_commit_count;
SET GLOBAL binlog_commit_wait_usec=@old_commit_usec;
SET GLOBAL binlog_direct_non_transactional_updates= @old_updates;
--source include/start_slave.inc
--connection server_3
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc
--connection server_1
SET GLOBAL binlog_direct_non_transactional_updates= @old_updates;
CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table");
DROP TABLE t1;
--source include/rpl_end.inc
......@@ -213,6 +213,29 @@ SHOW STATUS LIKE 'Slave_open_temp_tables';
FLUSH LOGS;
--echo *** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode ***
--connection server_1
CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
SET @old_dbug= @@SESSION.debug_dbug;
SET SESSION debug_dbug="+d,binlog_force_commit_id";
SET @commit_id= 10000;
INSERT INTO t4 VALUES (30);
INSERT INTO t4 VALUES (31);
SET SESSION debug_dbug= @old_dbug;
INSERT INTO t1 SELECT a, "conservative" FROM t4;
DROP TEMPORARY TABLE t4;
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
--source include/save_master_pos.inc
--connection server_2
--source include/sync_with_master_gtid.inc
SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
# Clean up.
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
......
......@@ -704,20 +704,23 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd)
DBUG_VOID_RETURN;
}
thd->lock_temporary_tables();
for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
{
if ((table->query_id == thd->query_id) && ! table->open_by_handler)
mark_tmp_table_for_reuse(table);
}
thd->unlock_temporary_tables();
if (thd->rgi_slave)
if (thd->temporary_tables)
{
/*
Temporary tables are shared with other by sql execution threads.
As a safety messure, clear the pointer to the common area.
*/
thd->temporary_tables= 0;
thd->lock_temporary_tables();
for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
{
if ((table->query_id == thd->query_id) && ! table->open_by_handler)
mark_tmp_table_for_reuse(table);
}
thd->unlock_temporary_tables();
if (thd->rgi_slave)
{
/*
Temporary tables are shared with other by sql execution threads.
As a safety messure, clear the pointer to the common area.
*/
thd->temporary_tables= 0;
}
}
DBUG_VOID_RETURN;
}
......@@ -1590,6 +1593,69 @@ TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl)
}
static bool
use_temporary_table(THD *thd, TABLE *table, TABLE **out_table)
{
*out_table= table;
if (!table)
return false;
/*
Temporary tables are not safe for parallel replication. They were
designed to be visible to one thread only, so have no table locking.
Thus there is no protection against two conflicting transactions
committing in parallel and things like that.
So for now, anything that uses temporary tables will be serialised
with anything before it, when using parallel replication.
ToDo: We might be able to introduce a reference count or something
on temp tables, and have slave worker threads wait for it to reach
zero before being allowed to use the temp table. Might not be worth
it though, as statement-based replication using temporary tables is
in any case rather fragile.
*/
if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
thd->wait_for_prior_commit())
return true;
/*
We need to set the THD as it may be different in case of
parallel replication
*/
if (table->in_use != thd)
{
table->in_use= thd;
#ifdef REMOVE_AFTER_MERGE_WITH_10
if (thd->rgi_slave)
{
/*
We may be stealing an opened temporary tables from one slave
thread to another, we need to let the performance schema know that,
for aggregates per thread to work properly.
*/
table->file->unbind_psi();
table->file->rebind_psi();
}
#endif
}
return false;
}
bool
find_and_use_temporary_table(THD *thd, const char *db, const char *table_name,
TABLE **out_table)
{
return use_temporary_table(thd, find_temporary_table(thd, db, table_name),
out_table);
}
bool
find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl, TABLE **out_table)
{
return use_temporary_table(thd, find_temporary_table(thd, tl), out_table);
}
/**
Find a temporary table specified by a key in the THD::temporary_tables list.
......@@ -1610,26 +1676,6 @@ TABLE *find_temporary_table(THD *thd,
if (table->s->table_cache_key.length == table_key_length &&
!memcmp(table->s->table_cache_key.str, table_key, table_key_length))
{
/*
We need to set the THD as it may be different in case of
parallel replication
*/
if (table->in_use != thd)
{
table->in_use= thd;
#ifdef REMOVE_AFTER_MERGE_WITH_10
if (thd->rgi_slave)
{
/*
We may be stealing an opened temporary tables from one slave
thread to another, we need to let the performance schema know that,
for aggregates per thread to work properly.
*/
table->file->unbind_psi();
table->file->rebind_psi();
}
#endif
}
result= table;
break;
}
......@@ -5622,6 +5668,14 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton,
(uint) thd->variables.server_id,
(ulong) thd->variables.pseudo_thread_id));
if (add_to_temporary_tables_list)
{
/* Temporary tables are not safe for parallel replication. */
if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
thd->wait_for_prior_commit())
return NULL;
}
/* Create the cache_key for temporary tables */
key_length= create_tmp_table_def_key(thd, cache_key, db, table_name);
......@@ -5857,7 +5911,9 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl)
DBUG_RETURN(FALSE);
}
if (!(table= find_temporary_table(thd, tl)))
if (find_and_use_temporary_table(thd, tl, &table))
DBUG_RETURN(TRUE);
if (!table)
{
if (tl->open_type == OT_TEMPORARY_ONLY &&
tl->open_strategy == TABLE_LIST::OPEN_NORMAL)
......
......@@ -143,7 +143,11 @@ TABLE_LIST *find_table_in_list(TABLE_LIST *table,
const char *db_name,
const char *table_name);
TABLE *find_temporary_table(THD *thd, const char *db, const char *table_name);
bool find_and_use_temporary_table(THD *thd, const char *db,
const char *table_name, TABLE **out_table);
TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl);
bool find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl,
TABLE **out_table);
TABLE *find_temporary_table(THD *thd, const char *table_key,
uint table_key_length);
void close_thread_tables(THD *thd);
......
......@@ -4656,7 +4656,9 @@ int create_table_impl(THD *thd,
if (create_info->tmp_table())
{
TABLE *tmp_table;
if ((tmp_table= find_temporary_table(thd, db, table_name)))
if (find_and_use_temporary_table(thd, db, table_name, &tmp_table))
goto err;
if (tmp_table)
{
bool table_creation_was_logged= tmp_table->s->table_creation_was_logged;
if (options.or_replace())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment