Commit 11fc24ef authored by mats@romeo.(none)'s avatar mats@romeo.(none)

BUG#25688 (RBR: circular replication may cause STMT_END_F flags to be

skipped):

By moving statement end actions from Rows_log_event::do_apply_event() to
Rows_log_event::do_update_pos() they will always be executed, even if
Rows_log_event::do_apply_event() is skipped because the event originated
at the same server. This because Rows_log_event::do_update_pos() is always
executed (unless Rows_log_event::do_apply_event() failed with an error,
in which case the slave stops with an error anyway). 

Adding test case.

Fixing logic to detect if inside a group. If a rotate event occured
when an initial prefix of events for a statement, but for which the
table did contain a key, last_event_start_time is set to zero, causing
rotate to end the group but without unlocking any tables. This left a
lock hanging around, which subsequently triggered an assertion when a
second attempt was made to lock the same sequence of tables.

In order to solve the above problem, a new flag was added to the relay
log info structure that is used to indicate that the replication thread
is currently executing a statement. Using this flag, the replication
thread is in a group if it is either in a statement or inside a trans-
action.

The patch also eliminates some gratuitous header file inclusions that
were not needed (and caused compile errors) and replaced them with
forward definitions.
parent 220c4531
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
CREATE TABLE t1 (a int key, b int) ENGINE=NDB;
SHOW TABLES;
Tables_in_test
t1
RESET MASTER;
INSERT INTO t1 VALUES (1,2);
INSERT INTO t1 VALUES (2,3);
STOP SLAVE;
CHANGE MASTER TO MASTER_HOST="127.0.0.1",MASTER_PORT=SLAVE_PORT,MASTER_USER="root";
RESET MASTER;
START SLAVE;
SHOW SLAVE STATUS;
Slave_IO_State #
Master_Host 127.0.0.1
Master_User root
Master_Port 9308
Connect_Retry 60
Master_Log_File slave-bin.000001
Read_Master_Log_Pos 468
Relay_Log_File #
Relay_Log_Pos #
Relay_Master_Log_File slave-bin.000001
Slave_IO_Running Yes
Slave_SQL_Running Yes
Replicate_Do_DB
Replicate_Ignore_DB
Replicate_Do_Table
Replicate_Ignore_Table
Replicate_Wild_Do_Table
Replicate_Wild_Ignore_Table
Last_Errno 0
Last_Error
Skip_Counter 0
Exec_Master_Log_Pos 468
Relay_Log_Space #
Until_Condition None
Until_Log_File
Until_Log_Pos 0
Master_SSL_Allowed No
Master_SSL_CA_File
Master_SSL_CA_Path
Master_SSL_Cert
Master_SSL_Cipher
Master_SSL_Key
Seconds_Behind_Master #
SELECT * FROM t1 ORDER BY a;
a b
1 2
2 3
STOP SLAVE;
START SLAVE;
SELECT * FROM t1 ORDER BY a;
a b
1 2
2 3
--source include/have_ndb.inc
--source include/have_binlog_format_row.inc
--source include/master-slave.inc
connection master;
CREATE TABLE t1 (a int key, b int) ENGINE=NDB;
sync_slave_with_master;
SHOW TABLES;
# Lose the events from the slave binary log: there is no
# need to re-create the table on the master.
connection slave;
RESET MASTER;
# Insert some values on the slave and master
connection master;
INSERT INTO t1 VALUES (1,2);
# Switch to slave once event is applied and insert a row
sync_slave_with_master;
connection slave;
INSERT INTO t1 VALUES (2,3);
# ... it is now very probable that we have a mixed event in the binary
# log. If we don't, the test should still pass, but will not test the
# mixed event situation.
# The statement is disabled since it cannot reliably show the same
# info all the time. Use it for debug purposes.
#SHOW BINLOG EVENTS;
# Replicate back to the master to test this mixed event on the master
STOP SLAVE;
connection master;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval CHANGE MASTER TO MASTER_HOST="127.0.0.1",MASTER_PORT=$SLAVE_MYPORT,MASTER_USER="root";
RESET MASTER;
START SLAVE;
connection slave;
save_master_pos;
connection master;
sync_with_master;
# The statement is disabled since it cannot reliably show the same
# info all the time. Use it for debug purposes.
#SHOW BINLOG EVENTS;
# Check that there is no error in replication
--replace_result $MASTER_MYPORT MASTER_PORT
--replace_column 1 # 8 # 9 # 23 # 33 #
query_vertical SHOW SLAVE STATUS;
# Check that we have the data on the master
SELECT * FROM t1 ORDER BY a;
# We should now have another mixed event, likely with "slave" server
# id last, and with the STMT_END_F flag set.
# The statement is disabled since it cannot reliably show the same
# info all the time. Use it for debug purposes.
#SHOW BINLOG EVENTS;
# now lets see that this data is applied correctly on the slave
STOP SLAVE;
save_master_pos;
connection slave;
START SLAVE;
# check that we have the data on the slave
sync_with_master;
SELECT * FROM t1 ORDER BY a;
......@@ -22,6 +22,7 @@
#include "mysql_priv.h"
#include "slave.h" // for wait_for_master_pos
#include "rpl_mi.h"
#include <m_ctype.h>
#include <hash.h>
#include <time.h>
......
......@@ -20,6 +20,7 @@
#include "mysql_priv.h"
#include "sql_repl.h"
#include "rpl_filter.h"
#include "rpl_rli.h"
#include <my_dir.h>
#include <stdarg.h>
......
This diff is collapsed.
......@@ -841,6 +841,7 @@ class Log_event
}
protected:
/**
Primitive to apply an event to the database.
......@@ -2242,6 +2243,7 @@ class Rows_log_event : public Log_event
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
virtual int do_update_pos(RELAY_LOG_INFO *rli);
/*
Primitive to prepare for a sequence of row executions.
......
......@@ -17,6 +17,7 @@
#include <m_ctype.h>
#include <my_dir.h>
#include "slave.h"
#include "rpl_mi.h"
#include "sql_repl.h"
#include "rpl_filter.h"
#include "repl_failsafe.h"
......
......@@ -19,6 +19,7 @@
#include "repl_failsafe.h"
#include "sql_repl.h"
#include "slave.h"
#include "rpl_mi.h"
#include "rpl_filter.h"
#include "log_event.h"
#include <mysql.h>
......
......@@ -18,6 +18,9 @@
#ifdef HAVE_REPLICATION
#include "rpl_rli.h"
/*****************************************************************************
Replication IO Thread
......
......@@ -15,6 +15,7 @@
#include "mysql_priv.h"
#include "rpl_mi.h"
#include "rpl_rli.h"
#include <my_dir.h> // For MY_STAT
#include "sql_repl.h" // For check_binlog_magic
......@@ -37,7 +38,7 @@ st_relay_log_info::st_relay_log_info()
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0),
tables_to_lock(0), tables_to_lock_count(0),
last_event_start_time(0)
last_event_start_time(0), m_flags(0)
{
DBUG_ENTER("st_relay_log_info::st_relay_log_info");
......@@ -1086,6 +1087,52 @@ bool st_relay_log_info::cached_charset_compare(char *charset) const
}
void st_relay_log_info::stmt_done(my_off_t const event_master_log_pos,
time_t event_creation_time)
{
clear_flag(IN_STMT);
/*
If in a transaction, and if the slave supports transactions, just
inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
(not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
BEGIN/COMMIT, not with SET AUTOCOMMIT= .
CAUTION: opt_using_transactions means innodb || bdb ; suppose the
master supports InnoDB and BDB, but the slave supports only BDB,
problems will arise: - suppose an InnoDB table is created on the
master, - then it will be MyISAM on the slave - but as
opt_using_transactions is true, the slave will believe he is
transactional with the MyISAM table. And problems will come when
one does START SLAVE; STOP SLAVE; START SLAVE; (the slave will
resume at BEGIN whereas there has not been any rollback). This is
the problem of using opt_using_transactions instead of a finer
"does the slave support _transactional handler used on the
master_".
More generally, we'll have problems when a query mixes a
transactional handler and MyISAM and STOP SLAVE is issued in the
middle of the "transaction". START SLAVE will resume at BEGIN
while the MyISAM table has already been updated.
*/
if ((sql_thd->options & OPTION_BEGIN) && opt_using_transactions)
inc_event_relay_log_pos();
else
{
inc_group_relay_log_pos(event_master_log_pos);
flush_relay_log_info(this);
/*
Note that Rotate_log_event::do_apply_event() does not call this
function, so there is no chance that a fake rotate event resets
last_master_timestamp. Note that we update without mutex
(probably ok - except in some very rare cases, only consequence
is that value may take some time to display in
Seconds_Behind_Master - not critical).
*/
last_master_timestamp= event_creation_time;
}
}
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
void st_relay_log_info::cleanup_context(THD *thd, bool error)
{
......@@ -1112,6 +1159,7 @@ void st_relay_log_info::cleanup_context(THD *thd, bool error)
m_table_map.clear_tables();
close_thread_tables(thd);
clear_tables_to_lock();
clear_flag(IN_STMT);
last_event_start_time= 0;
DBUG_VOID_RETURN;
}
......
......@@ -51,6 +51,17 @@ struct RPL_TABLE_LIST;
typedef struct st_relay_log_info
{
/**
Flags for the state of the replication.
*/
enum enum_state_flag {
/** The replication thread is inside a statement */
IN_STMT,
/** Flag counter. Should always be last */
STATE_FLAGS_COUNT
};
/*
If flag set, then rli does not store its state in any info file.
This is the case only when we execute BINLOG SQL commands inside
......@@ -314,6 +325,66 @@ typedef struct st_relay_log_info
transaction).
*/
time_t last_event_start_time;
/**
Helper function to do after statement completion.
This function is called from an event to complete the group by
either stepping the group position, if the "statement" is not
inside a transaction; or increase the event position, if the
"statement" is inside a transaction.
@param event_log_pos
Master log position of the event. The position is recorded in the
relay log info and used to produce information for <code>SHOW
SLAVE STATUS</code>.
@param event_creation_time
Timestamp for the creation of the event on the master side. The
time stamp is recorded in the relay log info and used to compute
the <code>Seconds_behind_master</code> field.
*/
void stmt_done(my_off_t event_log_pos,
time_t event_creation_time);
/**
Set the value of a replication state flag.
@param flag Flag to set
*/
void set_flag(enum_state_flag flag)
{
m_flags |= (1UL << flag);
}
/**
Clear the value of a replication state flag.
@param flag Flag to clear
*/
void clear_flag(enum_state_flag flag)
{
m_flags &= ~(1UL << flag);
}
/**
Is the replication inside a group?
Replication is inside a group if either:
- The OPTION_BEGIN flag is set, meaning we're inside a transaction
- The RLI_IN_STMT flag is set, meaning we're inside a statement
@retval true Replication thread is currently inside a group
@retval false Replication thread is currently not inside a group
*/
bool is_in_group() const {
return (sql_thd->options & OPTION_BEGIN) ||
(m_flags & (1UL << IN_STMT));
}
private:
uint32 m_flags;
} RELAY_LOG_INFO;
......
......@@ -51,6 +51,7 @@
#include "mysql_priv.h"
#include <mysql.h>
#include "slave.h"
#include "rpl_mi.h"
#include <my_getopt.h>
#include <thr_alarm.h>
#include <myisam.h>
......
......@@ -17,8 +17,9 @@
#include <mysql.h>
#include <myisam.h>
#include "rpl_rli.h"
#include "slave.h"
#include "rpl_mi.h"
#include "rpl_rli.h"
#include "sql_repl.h"
#include "rpl_filter.h"
#include "repl_failsafe.h"
......@@ -1731,11 +1732,12 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
/*
*/
DBUG_PRINT("info",("type_code=%d (%s), server_id=%d",
type_code, ev->get_type_str(), ev->server_id));
DBUG_PRINT("info", ("thd->options={ %s%s}",
DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)",
ev->get_type_str(), type_code, ev->server_id));
DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu",
FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->options, OPTION_BEGIN)));
FLAGSTR(thd->options, OPTION_BEGIN),
rli->last_event_start_time));
......@@ -1777,21 +1779,21 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
if (reason == Log_event::EVENT_SKIP_NOT)
exec_res= ev->apply_event(rli);
#ifndef DBUG_OFF
else
{
/*
This only prints information to the debug trace.
/*
This only prints information to the debug trace.
TODO: Print an informational message to the error log?
*/
static const char *const explain[] = {
"event was not skipped", // EVENT_SKIP_NOT,
"event originated from this server", // EVENT_SKIP_IGNORE,
"event skip counter was non-zero" // EVENT_SKIP_COUNT
};
DBUG_PRINT("info", ("%s was skipped because %s",
ev->get_type_str(), explain[reason]));
}
TODO: Print an informational message to the error log?
*/
static const char *const explain[] = {
// EVENT_SKIP_NOT,
"not skipped",
// EVENT_SKIP_IGNORE,
"skipped because event originated from this server",
// EVENT_SKIP_COUNT
"skipped because event skip counter was non-zero"
};
DBUG_PRINT("skip_event", ("%s event was %s",
ev->get_type_str(), explain[reason]));
#endif
DBUG_PRINT("info", ("apply_event error = %d", exec_res));
......
......@@ -22,13 +22,18 @@
#include "my_list.h"
#include "rpl_filter.h"
#include "rpl_tblmap.h"
#include "rpl_rli.h"
#include "rpl_mi.h"
#define SLAVE_NET_TIMEOUT 3600
#define MAX_SLAVE_ERROR 2000
// Forward declarations
struct st_relay_log_info;
typedef st_relay_log_info RELAY_LOG_INFO;
class MASTER_INFO;
/*****************************************************************************
MySQL Replication
......
......@@ -14,6 +14,7 @@
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "mysql_priv.h"
#include "rpl_rli.h"
#include "base64.h"
/*
......
......@@ -26,6 +26,7 @@
#endif
#include "mysql_priv.h"
#include "rpl_rli.h"
#include <my_bitmap.h>
#include "log_event.h"
#include <m_ctype.h>
......
......@@ -21,9 +21,11 @@
#endif
#include "log.h"
#include "rpl_rli.h"
#include "rpl_tblmap.h"
struct st_relay_log_info;
typedef st_relay_log_info RELAY_LOG_INFO;
class Query_log_event;
class Load_log_event;
class Slave_log_event;
......
......@@ -60,6 +60,7 @@
#include "sql_select.h"
#include "sql_show.h"
#include "slave.h"
#include "rpl_mi.h"
#ifndef EMBEDDED_LIBRARY
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
......
......@@ -16,6 +16,7 @@
#include "mysql_priv.h"
#ifdef HAVE_REPLICATION
#include "rpl_mi.h"
#include "sql_repl.h"
#include "log_event.h"
#include "rpl_filter.h"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment