Commit dafe440f authored by unknown's avatar unknown

Bug#46640: output from mysqlbinlog command in 5.1 breaks replication

      
The BINLOG statement was sharing too much code with the slave SQL thread, introduced with
the patch for Bug#32407. This caused statements to be logged with the wrong server_id, the
id stored inside the events of the BINLOG statement rather than the id of the running 
server.
      
Fix by rearranging code a bit so that only relevant parts of the code are executed by
the BINLOG statement, and the server_id of the server executing the statements will 
not be overrided by the server_id stored in the 'format description BINLOG statement'.

mysql-test/extra/binlog_tests/binlog.test:
  Added test to verify if the server_id stored in the 'format 
  description BINLOG statement' will override the server_id
  of the server executing the statements.
mysql-test/suite/binlog/r/binlog_row_binlog.result:
  Test result for bug#46640
mysql-test/suite/binlog/r/binlog_stm_binlog.result:
  Test result for bug#46640
sql/log_event.cc:
  Moved rows_event_stmt_clean() call from update_pos() to apply_event(). This in any case
  makes more sense, and is needed as update_pos() is no longer called when executing
  BINLOG statements.
  
  Moved setting of rli->relay_log.description_event_for_exec from 
  Format_description_log_event::do_update_pos() to 
  Format_description_log_event::do_apply_event()
sql/log_event_old.cc:
  Moved rows_event_stmt_clean() call from update_pos() to apply_event(). This in any case
  makes more sense, and is needed as update_pos() is no longer called when executing
  BINLOG statements.
sql/slave.cc:
  The skip flag is no longer needed, as the code path for BINLOG statement has been 
  cleaned up.
sql/sql_binlog.cc:
  Don't invoke the update_pos() code path for the BINLOG statement, as it contains code 
  that is redundant and/or harmful (especially setting thd->server_id).
parent 140bb6ee
......@@ -270,3 +270,42 @@ INSERT INTO test.t1 VALUES (1), (2);
CREATE TABLE test.t2 SELECT * FROM test.t1;
USE test;
DROP TABLES t1, t2;
#
# Bug#46640
# This test verifies if the server_id stored in the "format
# description BINLOG statement" will override the server_id
# of the server executing the statements.
#
connect (fresh,localhost,root,,test);
connection fresh;
RESET MASTER;
CREATE TABLE t1 (a INT PRIMARY KEY);
# Format description event, with server_id = 10;
BINLOG '
3u9kSA8KAAAAZgAAAGoAAAABAAQANS4xLjM1LW1hcmlhLWJldGExLWRlYnVnLWxvZwAAAAAAAAAA
AAAAAAAAAAAAAAAAAADe72RIEzgNAAgAEgAEBAQEEgAAUwAEGggAAAAICAgC
';
# What server_id is logged for a statement? Should be our own, not the
# one from the format description event.
INSERT INTO t1 VALUES (1);
# INSERT INTO t1 VALUES (2), with server_id=20. Check that this is logged
# with our own server id, not the 20 from the BINLOG statement.
BINLOG '
3u9kSBMUAAAAKQAAAJEBAAAAABoAAAAAAAAABHRlc3QAAnQxAAEDAAA=
3u9kSBcUAAAAIgAAALMBAAAQABoAAAAAAAEAAf/+AgAAAA==
';
# Show binlog events to check that server ids are correct.
--replace_column 1 # 2 # 5 #
--replace_regex /Server ver: .*, Binlog ver: .*/Server ver: #, Binlog ver: #/ /table_id: [0-9]+/table_id: #/
SHOW BINLOG EVENTS;
DROP TABLE t1;
disconnect fresh;
......@@ -1309,3 +1309,27 @@ INSERT INTO test.t1 VALUES (1), (2);
CREATE TABLE test.t2 SELECT * FROM test.t1;
USE test;
DROP TABLES t1, t2;
RESET MASTER;
CREATE TABLE t1 (a INT PRIMARY KEY);
BINLOG '
3u9kSA8KAAAAZgAAAGoAAAABAAQANS4xLjM1LW1hcmlhLWJldGExLWRlYnVnLWxvZwAAAAAAAAAA
AAAAAAAAAAAAAAAAAADe72RIEzgNAAgAEgAEBAQEEgAAUwAEGggAAAAICAgC
';
INSERT INTO t1 VALUES (1);
BINLOG '
3u9kSBMUAAAAKQAAAJEBAAAAABoAAAAAAAAABHRlc3QAAnQxAAEDAAA=
3u9kSBcUAAAAIgAAALMBAAAQABoAAAAAAAEAAf/+AgAAAA==
';
SHOW BINLOG EVENTS;
Log_name Pos Event_type Server_id End_log_pos Info
# # Format_desc 1 # Server ver: #, Binlog ver: #
# # Query 1 # use `test`; CREATE TABLE t1 (a INT PRIMARY KEY)
# # Query 1 # BEGIN
# # Table_map 1 # table_id: # (test.t1)
# # Write_rows 1 # table_id: # flags: STMT_END_F
# # Query 1 # COMMIT
# # Query 1 # BEGIN
# # Table_map 1 # table_id: # (test.t1)
# # Write_rows 1 # table_id: # flags: STMT_END_F
# # Query 1 # COMMIT
DROP TABLE t1;
......@@ -784,3 +784,24 @@ INSERT INTO test.t1 VALUES (1), (2);
CREATE TABLE test.t2 SELECT * FROM test.t1;
USE test;
DROP TABLES t1, t2;
RESET MASTER;
CREATE TABLE t1 (a INT PRIMARY KEY);
BINLOG '
3u9kSA8KAAAAZgAAAGoAAAABAAQANS4xLjM1LW1hcmlhLWJldGExLWRlYnVnLWxvZwAAAAAAAAAA
AAAAAAAAAAAAAAAAAADe72RIEzgNAAgAEgAEBAQEEgAAUwAEGggAAAAICAgC
';
INSERT INTO t1 VALUES (1);
BINLOG '
3u9kSBMUAAAAKQAAAJEBAAAAABoAAAAAAAAABHRlc3QAAnQxAAEDAAA=
3u9kSBcUAAAAIgAAALMBAAAQABoAAAAAAAEAAf/+AgAAAA==
';
SHOW BINLOG EVENTS;
Log_name Pos Event_type Server_id End_log_pos Info
# # Format_desc 1 # Server ver: #, Binlog ver: #
# # Query 1 # use `test`; CREATE TABLE t1 (a INT PRIMARY KEY)
# # Query 1 # use `test`; INSERT INTO t1 VALUES (1)
# # Query 1 # BEGIN
# # Table_map 1 # table_id: # (test.t1)
# # Write_rows 1 # table_id: # flags: STMT_END_F
# # Query 1 # COMMIT
DROP TABLE t1;
......@@ -3859,6 +3859,7 @@ bool Format_description_log_event::write(IO_CACHE* file)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
{
int ret= 0;
DBUG_ENTER("Format_description_log_event::do_apply_event");
#ifdef USING_TRANSACTIONS
......@@ -3900,17 +3901,21 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
0, then 96, then jump to first really asked event (which is
>96). So this is ok.
*/
DBUG_RETURN(Start_log_event_v3::do_apply_event(rli));
ret= Start_log_event_v3::do_apply_event(rli);
}
DBUG_RETURN(0);
if (!ret)
{
/* Save the information describing this binlog */
delete rli->relay_log.description_event_for_exec;
const_cast<Relay_log_info *>(rli)->relay_log.description_event_for_exec= this;
}
DBUG_RETURN(ret);
}
int Format_description_log_event::do_update_pos(Relay_log_info *rli)
{
/* save the information describing this binlog */
delete rli->relay_log.description_event_for_exec;
rli->relay_log.description_event_for_exec= this;
if (server_id == (uint32) ::server_id)
{
/*
......@@ -7506,6 +7511,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
thd->reset_current_stmt_binlog_row_based();
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
thd->is_slave_error= 1;
DBUG_RETURN(error);
}
/*
This code would ideally be placed in do_update_pos() instead, but
......@@ -7534,6 +7540,14 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
}
if (get_flags(STMT_END_F))
if (error= rows_event_stmt_cleanup(rli, thd))
rli->report(ERROR_LEVEL, error,
"Error in %s event: commit of row events failed, "
"table `%s`.`%s`",
get_type_str(), m_table->s->db.str,
m_table->s->table_name.str);
DBUG_RETURN(error);
}
......@@ -7632,33 +7646,19 @@ Rows_log_event::do_update_pos(Relay_log_info *rli)
if (get_flags(STMT_END_F))
{
if ((error= rows_event_stmt_cleanup(rli, thd)) == 0)
{
/*
Indicate that a statement is finished.
Step the group log position if we are not in a transaction,
otherwise increase the event log position.
*/
rli->stmt_done(log_pos, when);
/*
Clear any errors pushed in thd->net.last_err* if for example "no key
found" (as this is allowed). This is a safety measure; apparently
those errors (e.g. when executing a Delete_rows_log_event of a
non-existing row, like in rpl_row_mystery22.test,
thd->net.last_error = "Can't find record in 't1'" and last_errno=1032)
do not become visible. We still prefer to wipe them out.
*/
thd->clear_error();
}
else
{
rli->report(ERROR_LEVEL, error,
"Error in %s event: commit of row events failed, "
"table `%s`.`%s`",
get_type_str(), m_table->s->db.str,
m_table->s->table_name.str);
}
/*
Indicate that a statement is finished.
Step the group log position if we are not in a transaction,
otherwise increase the event log position.
*/
rli->stmt_done(log_pos, when);
/*
Clear any errors in thd->net.last_err*. It is not known if this is
needed or not. It is believed that any errors that may exist in
thd->net.last_err* are allowed. Examples of errors are "key not
found", which is produced in the test case rpl_row_conflicts.test
*/
thd->clear_error();
}
else
{
......
......@@ -1814,33 +1814,6 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
}
DBUG_RETURN(0);
}
Log_event::enum_skip_reason
Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
{
/*
If the slave skip counter is 1 and this event does not end a
statement, then we should not start executing on the next event.
Otherwise, we defer the decision to the normal skipping logic.
*/
if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
return Log_event::EVENT_SKIP_IGNORE;
else
return Log_event::do_shall_skip(rli);
}
int
Old_rows_log_event::do_update_pos(Relay_log_info *rli)
{
DBUG_ENTER("Old_rows_log_event::do_update_pos");
int error= 0;
DBUG_PRINT("info", ("flags: %s",
get_flags(STMT_END_F) ? "STMT_END_F " : ""));
if (get_flags(STMT_END_F))
{
/*
......@@ -1869,7 +1842,12 @@ Old_rows_log_event::do_update_pos(Relay_log_info *rli)
are involved, commit the transaction and flush the pending event to the
binlog.
*/
error= ha_autocommit_or_rollback(thd, 0);
if (error= ha_autocommit_or_rollback(thd, 0))
rli->report(ERROR_LEVEL, error,
"Error in %s event: commit of row events failed, "
"table `%s`.`%s`",
get_type_str(), m_table->s->db.str,
m_table->s->table_name.str);
/*
Now what if this is not a transactional engine? we still need to
......@@ -1882,33 +1860,51 @@ Old_rows_log_event::do_update_pos(Relay_log_info *rli)
*/
thd->reset_current_stmt_binlog_row_based();
rli->cleanup_context(thd, 0);
if (error == 0)
{
/*
Indicate that a statement is finished.
Step the group log position if we are not in a transaction,
otherwise increase the event log position.
*/
rli->stmt_done(log_pos, when);
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
}
/*
Clear any errors pushed in thd->net.client_last_err* if for
example "no key found" (as this is allowed). This is a safety
measure; apparently those errors (e.g. when executing a
Delete_rows_log_event_old of a non-existing row, like in
rpl_row_mystery22.test, thd->net.last_error = "Can't
find record in 't1'" and last_errno=1032) do not become
visible. We still prefer to wipe them out.
*/
thd->clear_error();
}
else
rli->report(ERROR_LEVEL, error,
"Error in %s event: commit of row events failed, "
"table `%s`.`%s`",
get_type_str(), m_table->s->db.str,
m_table->s->table_name.str);
DBUG_RETURN(error);
}
Log_event::enum_skip_reason
Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
{
/*
If the slave skip counter is 1 and this event does not end a
statement, then we should not start executing on the next event.
Otherwise, we defer the decision to the normal skipping logic.
*/
if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
return Log_event::EVENT_SKIP_IGNORE;
else
return Log_event::do_shall_skip(rli);
}
int
Old_rows_log_event::do_update_pos(Relay_log_info *rli)
{
DBUG_ENTER("Old_rows_log_event::do_update_pos");
int error= 0;
DBUG_PRINT("info", ("flags: %s",
get_flags(STMT_END_F) ? "STMT_END_F " : ""));
if (get_flags(STMT_END_F))
{
/*
Indicate that a statement is finished.
Step the group log position if we are not in a transaction,
otherwise increase the event log position.
*/
rli->stmt_done(log_pos, when);
/*
Clear any errors in thd->net.last_err*. It is not known if this is
needed or not. It is believed that any errors that may exist in
thd->net.last_err* are allowed. Examples of errors are "key not
found", which is produced in the test case rpl_row_conflicts.test
*/
thd->clear_error();
}
else
{
......
......@@ -2082,8 +2082,7 @@ static int has_temporary_error(THD *thd)
@retval 2 No error calling ev->apply_event(), but error calling
ev->update_pos().
*/
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
bool skip)
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
{
int exec_res= 0;
......@@ -2128,37 +2127,33 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
ev->when= my_time(0);
ev->thd = thd; // because up to this point, ev->thd == 0
if (skip)
{
int reason= ev->shall_skip(rli);
if (reason == Log_event::EVENT_SKIP_COUNT)
--rli->slave_skip_counter;
pthread_mutex_unlock(&rli->data_lock);
if (reason == Log_event::EVENT_SKIP_NOT)
exec_res= ev->apply_event(rli);
int reason= ev->shall_skip(rli);
if (reason == Log_event::EVENT_SKIP_COUNT)
--rli->slave_skip_counter;
pthread_mutex_unlock(&rli->data_lock);
if (reason == Log_event::EVENT_SKIP_NOT)
exec_res= ev->apply_event(rli);
#ifndef DBUG_OFF
/*
This only prints information to the debug trace.
/*
This only prints information to the debug trace.
TODO: Print an informational message to the error log?
*/
static const char *const explain[] = {
// EVENT_SKIP_NOT,
"not skipped",
// EVENT_SKIP_IGNORE,
"skipped because event should be ignored",
// EVENT_SKIP_COUNT
"skipped because event skip counter was non-zero"
};
DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
thd->options & OPTION_BEGIN ? 1 : 0,
rli->get_flag(Relay_log_info::IN_STMT)));
DBUG_PRINT("skip_event", ("%s event was %s",
ev->get_type_str(), explain[reason]));
TODO: Print an informational message to the error log?
*/
static const char *const explain[] = {
// EVENT_SKIP_NOT,
"not skipped",
// EVENT_SKIP_IGNORE,
"skipped because event should be ignored",
// EVENT_SKIP_COUNT
"skipped because event skip counter was non-zero"
};
DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
thd->options & OPTION_BEGIN ? 1 : 0,
rli->get_flag(Relay_log_info::IN_STMT)));
DBUG_PRINT("skip_event", ("%s event was %s",
ev->get_type_str(), explain[reason]));
#endif
}
else
exec_res= ev->apply_event(rli);
DBUG_PRINT("info", ("apply_event error = %d", exec_res));
if (exec_res == 0)
......@@ -2278,7 +2273,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
delete ev;
DBUG_RETURN(1);
}
exec_res= apply_event_and_update_pos(ev, thd, rli, TRUE);
exec_res= apply_event_and_update_pos(ev, thd, rli);
/*
Format_description_log_event should not be deleted because it will be
......
......@@ -190,8 +190,7 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli);
void rotate_relay_log(Master_info* mi);
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
bool skip);
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli);
pthread_handler_t handle_slave_io(void *arg);
pthread_handler_t handle_slave_sql(void *arg);
......
......@@ -56,17 +56,20 @@ void mysql_client_binlog_statement(THD* thd)
Format_description_event.
*/
my_bool have_fd_event= TRUE;
if (!thd->rli_fake)
int err;
Relay_log_info *rli;
rli= thd->rli_fake;
if (!rli)
{
thd->rli_fake= new Relay_log_info;
rli= thd->rli_fake= new Relay_log_info;
#ifdef HAVE_purify
thd->rli_fake->is_fake= TRUE;
rli->is_fake= TRUE;
#endif
have_fd_event= FALSE;
}
if (thd->rli_fake && !thd->rli_fake->relay_log.description_event_for_exec)
if (rli && !rli->relay_log.description_event_for_exec)
{
thd->rli_fake->relay_log.description_event_for_exec=
rli->relay_log.description_event_for_exec=
new Format_description_log_event(4);
have_fd_event= FALSE;
}
......@@ -78,16 +81,16 @@ void mysql_client_binlog_statement(THD* thd)
/*
Out of memory check
*/
if (!(thd->rli_fake &&
thd->rli_fake->relay_log.description_event_for_exec &&
if (!(rli &&
rli->relay_log.description_event_for_exec &&
buf))
{
my_error(ER_OUTOFMEMORY, MYF(0), 1); /* needed 1 bytes */
goto end;
}
thd->rli_fake->sql_thd= thd;
thd->rli_fake->no_storage= TRUE;
rli->sql_thd= thd;
rli->no_storage= TRUE;
for (char const *strptr= thd->lex->comment.str ;
strptr < thd->lex->comment.str + thd->lex->comment.length ; )
......@@ -170,8 +173,7 @@ void mysql_client_binlog_statement(THD* thd)
}
ev= Log_event::read_log_event(bufptr, event_len, &error,
thd->rli_fake->relay_log.
description_event_for_exec);
rli->relay_log.description_event_for_exec);
DBUG_PRINT("info",("binlog base64 err=%s", error));
if (!ev)
......@@ -209,18 +211,10 @@ void mysql_client_binlog_statement(THD* thd)
reporting.
*/
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
if (apply_event_and_update_pos(ev, thd, thd->rli_fake, FALSE))
{
delete ev;
/*
TODO: Maybe a better error message since the BINLOG statement
now contains several events.
*/
my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement");
goto end;
}
err= ev->apply_event(rli);
#else
err= 0;
#endif
/*
Format_description_log_event should not be deleted because it
will be used to read info about the relay log's format; it
......@@ -228,8 +222,17 @@ void mysql_client_binlog_statement(THD* thd)
i.e. when this thread terminates.
*/
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
delete ev;
delete ev;
ev= 0;
if (err)
{
/*
TODO: Maybe a better error message since the BINLOG statement
now contains several events.
*/
my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement");
goto end;
}
}
}
......@@ -238,7 +241,7 @@ void mysql_client_binlog_statement(THD* thd)
my_ok(thd);
end:
thd->rli_fake->clear_tables_to_lock();
rli->clear_tables_to_lock();
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
DBUG_VOID_RETURN;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment