Commit 2b5ea13f authored by Mats Kindahl's avatar Mats Kindahl

Bug #34707: Row based replication: slave creates table within wrong database

The failure was caused by executing a CREATE-SELECT statement that creates a
table in another database than the current one. In row-based logging, the
CREATE statement was written to the binary log without the database, hence
creating the table in the wrong database, causing the following inserts to
fail since the table didn't exist in the given database.

Fixed the bug by adding a parameter to store_create_info() that will make
the function print the database name before the table name and used that
in the calls that write the CREATE statement to the binary log. The database
name is only printed if it is different than the currently selected database.

The output of SHOW CREATE TABLE has not changed and is still printed without
the database name.

mysql-test/suite/rpl/t/rpl_row_create_table.test:
  Added test to check that CREATE-SELECT into another database than the
  current one replicates.
sql/sql_insert.cc:
  Adding parameter to calls to store_create_info().
sql/sql_show.cc:
  Adding parameter to calls to store_create_info().
  
  Extending store_create_info() with parameter 'show_database' that will cause
  the database to be written before the table name.
sql/sql_show.h:
  Adding parameter to call to store_create_info() to tell if the database should be shown or not.
sql/sql_table.cc:
  Adding parameter to calls to store_create_info().
parent 7c70643d
......@@ -430,4 +430,25 @@ a
1
2
DROP TABLE t1;
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
CREATE DATABASE mysqltest1;
CREATE TABLE mysqltest1.without_select (f1 BIGINT);
CREATE TABLE mysqltest1.with_select AS SELECT 1 AS f1;
show binlog events from <binlog_start>;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Query # # CREATE DATABASE mysqltest1
master-bin.000001 # Query # # use `test`; CREATE TABLE mysqltest1.without_select (f1 BIGINT)
master-bin.000001 # Query # # use `test`; BEGIN
master-bin.000001 # Query # # use `test`; CREATE TABLE `mysqltest1`.`with_select` (
`f1` int(1) NOT NULL DEFAULT '0'
)
master-bin.000001 # Table_map # # table_id: # (mysqltest1.with_select)
master-bin.000001 # Write_rows # # table_id: # flags: STMT_END_F
master-bin.000001 # Query # # use `test`; COMMIT
DROP DATABASE mysqltest1;
end of the tests
......@@ -259,5 +259,22 @@ connection master;
DROP TABLE t1;
sync_slave_with_master;
#
# BUG#34707: Row based replication: slave creates table within wrong database
#
source include/master-slave-reset.inc;
connection master;
CREATE DATABASE mysqltest1;
CREATE TABLE mysqltest1.without_select (f1 BIGINT);
CREATE TABLE mysqltest1.with_select AS SELECT 1 AS f1;
source include/show_binlog_events.inc;
sync_slave_with_master;
connection master;
DROP DATABASE mysqltest1;
sync_slave_with_master;
--echo end of the tests
......@@ -4338,6 +4338,8 @@ static int write_locked_table_maps(THD *thd)
(long) thd, (long) thd->lock,
(long) thd->locked_tables, (long) thd->extra_lock));
DBUG_PRINT("debug", ("get_binlog_table_maps(): %d", thd->get_binlog_table_maps()));
if (thd->get_binlog_table_maps() == 0)
{
MYSQL_LOCK *locks[3];
......
......@@ -1377,6 +1377,8 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->options, OPTION_BEGIN)));
thd->binlog_flush_pending_rows_event(TRUE);
/*
NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of
only transactional tables. If the transaction contain changes to
......@@ -1395,8 +1397,6 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data,
were, we would have to ensure that we're not ending a statement
inside a stored function.
*/
thd->binlog_flush_pending_rows_event(TRUE);
error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev);
trx_data->reset();
......
......@@ -3558,6 +3558,24 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end)
}
static const char *
show_query_type(THD::enum_binlog_query_type qtype)
{
switch (qtype) {
case THD::ROW_QUERY_TYPE:
return "ROW";
case THD::STMT_QUERY_TYPE:
return "STMT";
case THD::MYSQL_QUERY_TYPE:
return "MYSQL";
}
static char buf[64];
sprintf(buf, "UNKNOWN#%d", qtype);
return buf;
}
/*
Member function that will log query, either row-based or
statement-based depending on the value of the 'current_stmt_binlog_row_based'
......@@ -3586,7 +3604,8 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
THD::killed_state killed_status_arg)
{
DBUG_ENTER("THD::binlog_query");
DBUG_PRINT("enter", ("qtype: %d query: '%s'", qtype, query_arg));
DBUG_PRINT("enter", ("qtype: %s query: '%s'",
show_query_type(qtype), query_arg));
DBUG_ASSERT(query_arg && mysql_bin_log.is_open());
/*
......@@ -3625,6 +3644,9 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
switch (qtype) {
case THD::ROW_QUERY_TYPE:
DBUG_PRINT("debug",
("current_stmt_binlog_row_based: %d",
current_stmt_binlog_row_based));
if (current_stmt_binlog_row_based)
DBUG_RETURN(0);
/* Otherwise, we fall through */
......
......@@ -996,6 +996,21 @@ enum enum_thread_type
SYSTEM_THREAD_EVENT_WORKER= 32
};
inline char const *
show_system_thread(enum_thread_type thread)
{
#define RETURN_NAME_AS_STRING(NAME) case (NAME): return #NAME
switch (thread) {
RETURN_NAME_AS_STRING(NON_SYSTEM_THREAD);
RETURN_NAME_AS_STRING(SYSTEM_THREAD_DELAYED_INSERT);
RETURN_NAME_AS_STRING(SYSTEM_THREAD_SLAVE_IO);
RETURN_NAME_AS_STRING(SYSTEM_THREAD_SLAVE_SQL);
RETURN_NAME_AS_STRING(SYSTEM_THREAD_NDBCLUSTER_BINLOG);
RETURN_NAME_AS_STRING(SYSTEM_THREAD_EVENT_SCHEDULER);
RETURN_NAME_AS_STRING(SYSTEM_THREAD_EVENT_WORKER);
}
#undef RETURN_NAME_AS_STRING
}
/**
This class represents the interface for internal error handlers.
......@@ -2092,6 +2107,10 @@ class THD :public Statement,
Don't reset binlog format for NDB binlog injector thread.
*/
DBUG_PRINT("debug",
("temporary_tables: %d, in_sub_stmt: %d, system_thread: %s",
(int) temporary_tables, in_sub_stmt,
show_system_thread(system_thread)));
if ((temporary_tables == NULL) && (in_sub_stmt == 0) &&
(system_thread != SYSTEM_THREAD_NDBCLUSTER_BINLOG))
{
......
......@@ -3635,7 +3635,8 @@ select_create::binlog_show_create_table(TABLE **tables, uint count)
tmp_table_list.table = *tables;
query.length(0); // Have to zero it since constructor doesn't
result= store_create_info(thd, &tmp_table_list, &query, create_info);
result= store_create_info(thd, &tmp_table_list, &query, create_info,
/* show_database */ TRUE);
DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
if (mysql_bin_log.is_open())
......
......@@ -5488,6 +5488,10 @@ void mysql_reset_thd_for_next_command(THD *thd)
*/
thd->reset_current_stmt_binlog_row_based();
DBUG_PRINT("debug",
("current_stmt_binlog_row_based: %d",
thd->current_stmt_binlog_row_based));
DBUG_VOID_RETURN;
}
......
......@@ -619,7 +619,8 @@ mysqld_show_create(THD *thd, TABLE_LIST *table_list)
if ((table_list->view ?
view_store_create_info(thd, table_list, &buffer) :
store_create_info(thd, table_list, &buffer, NULL)))
store_create_info(thd, table_list, &buffer, NULL,
FALSE /* show_database */)))
DBUG_RETURN(TRUE);
List<Item> field_list;
......@@ -810,7 +811,8 @@ mysqld_dump_create_info(THD *thd, TABLE_LIST *table_list, int fd)
DBUG_PRINT("enter",("table: %s",table_list->table->s->table_name.str));
protocol->prepare_for_resend();
if (store_create_info(thd, table_list, packet, NULL))
if (store_create_info(thd, table_list, packet, NULL,
FALSE /* show_database */))
DBUG_RETURN(-1);
if (fd < 0)
......@@ -1062,7 +1064,7 @@ static bool get_field_default_value(THD *thd, TABLE *table,
*/
int store_create_info(THD *thd, TABLE_LIST *table_list, String *packet,
HA_CREATE_INFO *create_info_arg)
HA_CREATE_INFO *create_info_arg, bool show_database)
{
List<Item> field_list;
char tmp[MAX_FIELD_WIDTH], *for_str, buff[128], def_value_buf[MAX_FIELD_WIDTH];
......@@ -1110,6 +1112,25 @@ int store_create_info(THD *thd, TABLE_LIST *table_list, String *packet,
alias= share->table_name.str;
}
}
/*
Print the database before the table name if told to do that. The
database name is only printed in the event that it is different
from the current database. The main reason for doing this is to
avoid having to update gazillions of tests and result files, but
it also saves a few bytes of the binary log.
*/
if (show_database)
{
const LEX_STRING *const db=
table_list->schema_table ? &INFORMATION_SCHEMA_NAME : &table->s->db;
if (strcmp(db->str, thd->db) != 0)
{
append_identifier(thd, packet, db->str, db->length);
packet->append(STRING_WITH_LEN("."));
}
}
append_identifier(thd, packet, alias, strlen(alias));
packet->append(STRING_WITH_LEN(" (\n"));
/*
......
......@@ -33,7 +33,7 @@ find_files_result find_files(THD *thd, List<LEX_STRING> *files, const char *db,
const char *path, const char *wild, bool dir);
int store_create_info(THD *thd, TABLE_LIST *table_list, String *packet,
HA_CREATE_INFO *create_info_arg);
HA_CREATE_INFO *create_info_arg, bool show_database);
int view_store_create_info(THD *thd, TABLE_LIST *table, String *buff);
int copy_event_to_schema_table(THD *thd, TABLE *sch_table, TABLE *event_table);
......
......@@ -5010,8 +5010,9 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table, TABLE_LIST* src_table,
}
VOID(pthread_mutex_unlock(&LOCK_open));
IF_DBUG(int result=) store_create_info(thd, table, &query,
create_info);
IF_DBUG(int result=)
store_create_info(thd, table, &query,
create_info, FALSE /* show_database */);
DBUG_ASSERT(result == 0); // store_create_info() always return 0
write_bin_log(thd, TRUE, query.ptr(), query.length());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment