Commit a4cd8862 authored by He Zhenxing's avatar He Zhenxing

BUG#38734 rpl_server_id2 sync_with_master failed

Rotate event is automatically generated and written when rotating binary
log or relay log. Rotate events for relay logs are usually ignored by slave
SQL thread becuase they have the same server id as that of the slave.
However, if --replicate-same-server-id is enabled, rotate event
for relay log would be treated as if it's a rotate event from master, and
would be executed by slave to update the rli->group_master_log_name and
rli->group_master_log_pos to a wrong value and cause the MASTER_POS_WAIT
function to fail and return NULL.

This patch fixed this problem by setting a flag bit (LOG_EVENT_RELAY_LOG_F)
in the event to tell the SQL thread to ignore these Rotate events generated
for relay logs.

This patch also added another binlog event flag bit (LOG_EVENT_ARTIFICIAL_F)
to distinquish faked events, the method used before this was by checking if
log_pos was zero.


sql/log.h:
  Add a member to MYSQL_BIN_LOG to distinguish binary log from relay log.
sql/log_event.cc:
  Change artificial_event member to LOG_EVENT_ARTIFICIAL_F flag
  
  If LOG_EVENT_RELAY_LOG_F is set in the event flags for a rotate event, ignore it when updating position
  
  Refactored the code in Rotate_log_event::do_update_pos
sql/log_event.h:
  Add LOG_EVENT_RELAY_LOG_F flag to Log_event flags
  Add RELAY_LOG flag to Rotate_log_event flags
sql/sql_repl.cc:
  Set LOG_EVENT_ARTIFICIAL_F for fake rotate events
parent 78789715
...@@ -2346,6 +2346,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name, ...@@ -2346,6 +2346,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
MYSQL_BIN_LOG::MYSQL_BIN_LOG() MYSQL_BIN_LOG::MYSQL_BIN_LOG()
:bytes_written(0), prepared_xids(0), file_id(1), open_count(1), :bytes_written(0), prepared_xids(0), file_id(1), open_count(1),
need_start_event(TRUE), m_table_map_version(0), need_start_event(TRUE), m_table_map_version(0),
is_relay_log(0),
description_event_for_exec(0), description_event_for_queue(0) description_event_for_exec(0), description_event_for_queue(0)
{ {
/* /*
...@@ -2543,7 +2544,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, ...@@ -2543,7 +2544,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
*/ */
description_event_for_queue->created= 0; description_event_for_queue->created= 0;
/* Don't set log_pos in event header */ /* Don't set log_pos in event header */
description_event_for_queue->artificial_event=1; description_event_for_queue->set_artificial_event();
if (description_event_for_queue->write(&log_file)) if (description_event_for_queue->write(&log_file))
goto err; goto err;
...@@ -3466,7 +3467,7 @@ void MYSQL_BIN_LOG::new_file_impl(bool need_lock) ...@@ -3466,7 +3467,7 @@ void MYSQL_BIN_LOG::new_file_impl(bool need_lock)
to change base names at some point. to change base names at some point.
*/ */
Rotate_log_event r(new_name+dirname_length(new_name), Rotate_log_event r(new_name+dirname_length(new_name),
0, LOG_EVENT_OFFSET, 0); 0, LOG_EVENT_OFFSET, is_relay_log ? Rotate_log_event::RELAY_LOG : 0);
r.write(&log_file); r.write(&log_file);
bytes_written += r.data_written; bytes_written += r.data_written;
} }
......
...@@ -274,6 +274,10 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -274,6 +274,10 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
public: public:
MYSQL_LOG::generate_name; MYSQL_LOG::generate_name;
MYSQL_LOG::is_open; MYSQL_LOG::is_open;
/* This is relay log */
bool is_relay_log;
/* /*
These describe the log's format. This is used only for relay logs. These describe the log's format. This is used only for relay logs.
_for_exec is used by the SQL thread, _for_queue by the I/O thread. It's _for_exec is used by the SQL thread, _for_queue by the I/O thread. It's
......
...@@ -810,9 +810,8 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) ...@@ -810,9 +810,8 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
if (is_artificial_event()) if (is_artificial_event())
{ {
/* /*
We should not do any cleanup on slave when reading this. We Artificial events are automatically generated and do not exist
mark this by setting log_pos to 0. Start_log_event_v3() will in master's binary log, so log_pos should be set to 0.
detect this on reading and set artificial_event=1 for the event.
*/ */
log_pos= 0; log_pos= 0;
} }
...@@ -2625,7 +2624,7 @@ Query_log_event::do_shall_skip(Relay_log_info *rli) ...@@ -2625,7 +2624,7 @@ Query_log_event::do_shall_skip(Relay_log_info *rli)
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Start_log_event_v3::Start_log_event_v3() Start_log_event_v3::Start_log_event_v3()
:Log_event(), created(0), binlog_version(BINLOG_VERSION), :Log_event(), created(0), binlog_version(BINLOG_VERSION),
artificial_event(0), dont_set_created(0) dont_set_created(0)
{ {
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
} }
...@@ -2673,7 +2672,7 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) ...@@ -2673,7 +2672,7 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
my_b_printf(&cache, "# Warning: this binlog was not closed properly. " my_b_printf(&cache, "# Warning: this binlog was not closed properly. "
"Most probably mysqld crashed writing it.\n"); "Most probably mysqld crashed writing it.\n");
} }
if (!artificial_event && created) if (!is_artificial_event() && created)
{ {
#ifdef WHEN_WE_HAVE_THE_RESET_CONNECTION_SQL_COMMAND #ifdef WHEN_WE_HAVE_THE_RESET_CONNECTION_SQL_COMMAND
/* /*
...@@ -2715,8 +2714,6 @@ Start_log_event_v3::Start_log_event_v3(const char* buf, ...@@ -2715,8 +2714,6 @@ Start_log_event_v3::Start_log_event_v3(const char* buf,
// prevent overrun if log is corrupted on disk // prevent overrun if log is corrupted on disk
server_version[ST_SERVER_VER_LEN-1]= 0; server_version[ST_SERVER_VER_LEN-1]= 0;
created= uint4korr(buf+ST_CREATED_OFFSET); created= uint4korr(buf+ST_CREATED_OFFSET);
/* We use log_pos to mark if this was an artificial event or not */
artificial_event= (log_pos == 0);
dont_set_created= 1; dont_set_created= 1;
} }
...@@ -3117,7 +3114,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli) ...@@ -3117,7 +3114,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
original place when it comes to us; we'll know this by checking original place when it comes to us; we'll know this by checking
log_pos ("artificial" events have log_pos == 0). log_pos ("artificial" events have log_pos == 0).
*/ */
if (!artificial_event && created && thd->transaction.all.ha_list) if (!is_artificial_event() && created && thd->transaction.all.ha_list)
{ {
/* This is not an error (XA is safe), just an information */ /* This is not an error (XA is safe), just an information */
rli->report(INFORMATION_LEVEL, 0, rli->report(INFORMATION_LEVEL, 0,
...@@ -4054,6 +4051,8 @@ Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg, ...@@ -4054,6 +4051,8 @@ Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
#endif #endif
if (flags & DUP_NAME) if (flags & DUP_NAME)
new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME)); new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME));
if (flags & RELAY_LOG)
set_relay_log_event();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
#endif #endif
...@@ -4125,8 +4124,6 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli) ...@@ -4125,8 +4124,6 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
DBUG_PRINT("info", ("new_log_ident: %s", this->new_log_ident)); DBUG_PRINT("info", ("new_log_ident: %s", this->new_log_ident));
DBUG_PRINT("info", ("pos: %s", llstr(this->pos, buf))); DBUG_PRINT("info", ("pos: %s", llstr(this->pos, buf)));
pthread_mutex_lock(&rli->data_lock);
rli->event_relay_log_pos= my_b_tell(rli->cur_log);
/* /*
If we are in a transaction or in a group: the only normal case is If we are in a transaction or in a group: the only normal case is
when the I/O thread was copying a big transaction, then it was when the I/O thread was copying a big transaction, then it was
...@@ -4144,23 +4141,24 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli) ...@@ -4144,23 +4141,24 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
relay log, which shall not change the group positions. relay log, which shall not change the group positions.
*/ */
if ((server_id != ::server_id || rli->replicate_same_server_id) && if ((server_id != ::server_id || rli->replicate_same_server_id) &&
!is_relay_log_event() &&
!rli->is_in_group()) !rli->is_in_group())
{ {
pthread_mutex_lock(&rli->data_lock);
DBUG_PRINT("info", ("old group_master_log_name: '%s' " DBUG_PRINT("info", ("old group_master_log_name: '%s' "
"old group_master_log_pos: %lu", "old group_master_log_pos: %lu",
rli->group_master_log_name, rli->group_master_log_name,
(ulong) rli->group_master_log_pos)); (ulong) rli->group_master_log_pos));
memcpy(rli->group_master_log_name, new_log_ident, ident_len+1); memcpy(rli->group_master_log_name, new_log_ident, ident_len+1);
rli->notify_group_master_log_name_update(); rli->notify_group_master_log_name_update();
rli->group_master_log_pos= pos; rli->inc_group_relay_log_pos(pos, TRUE /* skip_lock */);
strmake(rli->group_relay_log_name, rli->event_relay_log_name,
sizeof(rli->group_relay_log_name) - 1);
rli->notify_group_relay_log_name_update();
rli->group_relay_log_pos= rli->event_relay_log_pos;
DBUG_PRINT("info", ("new group_master_log_name: '%s' " DBUG_PRINT("info", ("new group_master_log_name: '%s' "
"new group_master_log_pos: %lu", "new group_master_log_pos: %lu",
rli->group_master_log_name, rli->group_master_log_name,
(ulong) rli->group_master_log_pos)); (ulong) rli->group_master_log_pos));
pthread_mutex_unlock(&rli->data_lock);
flush_relay_log_info(rli);
/* /*
Reset thd->options and sql_mode etc, because this could be the signal of Reset thd->options and sql_mode etc, because this could be the signal of
a master's downgrade from 5.0 to 4.0. a master's downgrade from 5.0 to 4.0.
...@@ -4174,9 +4172,9 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli) ...@@ -4174,9 +4172,9 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
thd->variables.auto_increment_increment= thd->variables.auto_increment_increment=
thd->variables.auto_increment_offset= 1; thd->variables.auto_increment_offset= 1;
} }
pthread_mutex_unlock(&rli->data_lock); else
pthread_cond_broadcast(&rli->data_cond); rli->inc_event_relay_log_pos();
flush_relay_log_info(rli);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
......
...@@ -449,6 +449,25 @@ struct sql_ex_info ...@@ -449,6 +449,25 @@ struct sql_ex_info
*/ */
#define LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F 0x10 #define LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F 0x10
/**
@def LOG_EVENT_ARTIFICIAL_F
Artificial events are created arbitarily and not written to binary
log
These events should not update the master log position when slave
SQL thread executes them.
*/
#define LOG_EVENT_ARTIFICIAL_F 0x20
/**
@def LOG_EVENT_RELAY_LOG_F
Events with this flag set are created by slave IO thread and written
to relay log
*/
#define LOG_EVENT_RELAY_LOG_F 0x40
/** /**
@def OPTIONS_WRITTEN_TO_BIN_LOG @def OPTIONS_WRITTEN_TO_BIN_LOG
...@@ -966,7 +985,10 @@ class Log_event ...@@ -966,7 +985,10 @@ class Log_event
#endif #endif
virtual Log_event_type get_type_code() = 0; virtual Log_event_type get_type_code() = 0;
virtual bool is_valid() const = 0; virtual bool is_valid() const = 0;
virtual bool is_artificial_event() { return 0; } void set_artificial_event() { flags |= LOG_EVENT_ARTIFICIAL_F; }
void set_relay_log_event() { flags |= LOG_EVENT_RELAY_LOG_F; }
bool is_artificial_event() const { return flags & LOG_EVENT_ARTIFICIAL_F; }
bool is_relay_log_event() const { return flags & LOG_EVENT_RELAY_LOG_F; }
inline bool get_cache_stmt() const { return cache_stmt; } inline bool get_cache_stmt() const { return cache_stmt; }
Log_event(const char* buf, const Format_description_log_event Log_event(const char* buf, const Format_description_log_event
*description_event); *description_event);
...@@ -2064,12 +2086,6 @@ class Start_log_event_v3: public Log_event ...@@ -2064,12 +2086,6 @@ class Start_log_event_v3: public Log_event
time_t created; time_t created;
uint16 binlog_version; uint16 binlog_version;
char server_version[ST_SERVER_VER_LEN]; char server_version[ST_SERVER_VER_LEN];
/*
artifical_event is 1 in the case where this is a generated event that
should not case any cleanup actions. We handle this in the log by
setting log_event == 0 (for now).
*/
bool artificial_event;
/* /*
We set this to 1 if we don't want to have the created time in the log, We set this to 1 if we don't want to have the created time in the log,
which is the case when we rollover to a new log. which is the case when we rollover to a new log.
...@@ -2098,7 +2114,6 @@ class Start_log_event_v3: public Log_event ...@@ -2098,7 +2114,6 @@ class Start_log_event_v3: public Log_event
{ {
return START_V3_HEADER_LEN; //no variable-sized part return START_V3_HEADER_LEN; //no variable-sized part
} }
virtual bool is_artificial_event() { return artificial_event; }
protected: protected:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
...@@ -2518,7 +2533,8 @@ class Rotate_log_event: public Log_event ...@@ -2518,7 +2533,8 @@ class Rotate_log_event: public Log_event
{ {
public: public:
enum { enum {
DUP_NAME= 2 // if constructor should dup the string argument DUP_NAME= 2, // if constructor should dup the string argument
RELAY_LOG=4 // rotate event for relay log
}; };
const char* new_log_ident; const char* new_log_ident;
ulonglong pos; ulonglong pos;
......
...@@ -155,6 +155,7 @@ int init_relay_log_info(Relay_log_info* rli, ...@@ -155,6 +155,7 @@ int init_relay_log_info(Relay_log_info* rli,
sql_print_error("Failed in open_log() called from init_relay_log_info()"); sql_print_error("Failed in open_log() called from init_relay_log_info()");
DBUG_RETURN(1); DBUG_RETURN(1);
} }
rli->relay_log.is_relay_log= TRUE;
} }
/* if file does not exist */ /* if file does not exist */
......
...@@ -63,7 +63,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, ...@@ -63,7 +63,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN; ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
int4store(header + SERVER_ID_OFFSET, server_id); int4store(header + SERVER_ID_OFFSET, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len); int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, 0); int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
// TODO: check what problems this may cause and fix them // TODO: check what problems this may cause and fix them
int4store(header + LOG_POS_OFFSET, 0); int4store(header + LOG_POS_OFFSET, 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment