Commit f74c745a authored by unknown's avatar unknown

MDEV-4488: When master is on the list of ignore_server_ids, GTID position on slave is not updated

The ignored events are not written to the relay log, but instead a fake
Rotate event is generated to handle update of position.

Extend this for Gtid so we similarly generate a fake Gtid_list event
to update the GTID position.

Also fix an unrelated test issue that got triggered by the added test cases.
parent 33c66eb7
include/rpl_init.inc [topology=1->2]
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
SET @old_gtid_strict_mode= @@GLOBAL.gtid_strict_mode;
SET GLOBAL gtid_strict_mode= 1;
include/stop_slave.inc
SET @old_gtid_strict_mode= @@GLOBAL.gtid_strict_mode;
SET GLOBAL gtid_strict_mode=1;
CHANGE MASTER TO master_use_gtid=slave_pos;
include/start_slave.inc
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
**** MDEV-4488: GTID position should be updated for events that are ignored due to server id ***
include/stop_slave.inc
CHANGE MASTER TO ignore_server_ids=(1);
include/start_slave.inc
INSERT INTO t1 VALUES (2);
INSERT INTO t1 VALUES (3);
RESULT
OK
SELECT * FROM t1 ORDER BY a;
a
1
include/stop_slave.inc
CHANGE MASTER TO ignore_server_ids=();
include/start_slave.inc
RESULT
OK
SELECT * FROM t1 ORDER BY a;
a
1
INSERT INTO t1 VALUES (4);
INSERT INTO t1 VALUES (5);
RESULT
OK
a
1
4
5
*** Test the same thing when IO thread exits before SQL thread reaches end of log. ***
include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug= "+d,inject_slave_sql_before_apply_event";
CHANGE MASTER TO ignore_server_ids=(1);
include/start_slave.inc
INSERT INTO t1 VALUES (6);
INSERT INTO t1 VALUES (7);
include/wait_for_slave_param.inc [Read_Master_Log_Pos]
STOP SLAVE IO_THREAD;
SET debug_sync = "now SIGNAL continue";
RESULT
OK
RESULT
OK
include/stop_slave.inc
CHANGE MASTER TO ignore_server_ids=();
SET GLOBAL debug_dbug= @old_dbug;
include/start_slave.inc
INSERT INTO t1 VALUES (8);
INSERT INTO t1 VALUES (9);
SELECT * FROM t1 ORDER BY a;
a
1
4
5
8
9
DROP TABLE t1;
SET GLOBAL gtid_strict_mode= @old_gtid_strict_mode;
SET GLOBAL gtid_strict_mode= @old_gtid_strict_mode;
include/rpl_end.inc
...@@ -7,10 +7,11 @@ INSERT INTO t1 VALUES (1); ...@@ -7,10 +7,11 @@ INSERT INTO t1 VALUES (1);
include/stop_slave.inc include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug; SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,gtid_slave_pos_simulate_failed_delete"; SET GLOBAL debug_dbug="+d,gtid_slave_pos_simulate_failed_delete";
include/start_slave.inc
SET sql_log_bin= 0; SET sql_log_bin= 0;
CALL mtr.add_suppression("Can't find file"); CALL mtr.add_suppression("Can't find file");
ALTER TABLE mysql.gtid_slave_pos ENGINE=MyISAM;
SET sql_log_bin= 1; SET sql_log_bin= 1;
include/start_slave.inc
INSERT INTO t1 VALUES (2); INSERT INTO t1 VALUES (2);
include/wait_for_slave_sql_error.inc [errno=1942] include/wait_for_slave_sql_error.inc [errno=1942]
STOP SLAVE IO_THREAD; STOP SLAVE IO_THREAD;
......
--source include/have_innodb.inc
--source include/have_debug.inc
--source include/have_debug_sync.inc
--let $rpl_topology=1->2
--source include/rpl_init.inc
--connection server_1
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
SET @old_gtid_strict_mode= @@GLOBAL.gtid_strict_mode;
SET GLOBAL gtid_strict_mode= 1;
--connection server_2
--source include/stop_slave.inc
SET @old_gtid_strict_mode= @@GLOBAL.gtid_strict_mode;
SET GLOBAL gtid_strict_mode=1;
CHANGE MASTER TO master_use_gtid=slave_pos;
--source include/start_slave.inc
--connection server_1
CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
--save_master_pos
--connection server_2
--sync_with_master
--echo **** MDEV-4488: GTID position should be updated for events that are ignored due to server id ***
--source include/stop_slave.inc
CHANGE MASTER TO ignore_server_ids=(1);
--source include/start_slave.inc
--connection server_1
# These inserts should be ignored (not applied) on the slave, but the
# gtid_slave_pos should still be updated.
INSERT INTO t1 VALUES (2);
INSERT INTO t1 VALUES (3);
--save_master_pos
--let gtid_pos=`SELECT @@GLOBAL.gtid_binlog_pos`
--connection server_2
--sync_with_master
--let $wait_condition= SELECT @@GLOBAL.gtid_slave_pos = '$gtid_pos'
--source include/wait_condition.inc
--disable_query_log
eval SELECT IF(@@GLOBAL.gtid_slave_pos = '$gtid_pos', 'OK', CONCAT("ERROR: Expected $gtid_pos got ", @@GLOBAL.gtid_slave_pos)) AS RESULT;
--enable_query_log
SELECT * FROM t1 ORDER BY a;
--source include/stop_slave.inc
CHANGE MASTER TO ignore_server_ids=();
--source include/start_slave.inc
--sync_with_master
--disable_query_log
eval SELECT IF(@@GLOBAL.gtid_slave_pos = '$gtid_pos', 'OK', CONCAT("ERROR: Expected $gtid_pos got ", @@GLOBAL.gtid_slave_pos)) AS RESULT;
--enable_query_log
SELECT * FROM t1 ORDER BY a;
--connection server_1
INSERT INTO t1 VALUES (4);
INSERT INTO t1 VALUES (5);
--let gtid_pos=`SELECT @@GLOBAL.gtid_binlog_pos`
--save_master_pos
--connection server_2
--sync_with_master
--disable_query_log
eval SELECT IF(@@GLOBAL.gtid_slave_pos = '$gtid_pos', 'OK', CONCAT("ERROR: Expected $gtid_pos got ", @@GLOBAL.gtid_slave_pos)) AS RESULT;
SELECT * FROM t1 ORDER BY a;
--enable_query_log
--echo *** Test the same thing when IO thread exits before SQL thread reaches end of log. ***
--connection server_2
--source include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug= "+d,inject_slave_sql_before_apply_event";
CHANGE MASTER TO ignore_server_ids=(1);
--source include/start_slave.inc
--connection server_1
INSERT INTO t1 VALUES (6);
INSERT INTO t1 VALUES (7);
--let $master_pos= query_get_value(SHOW MASTER STATUS, Position, 1)
--let gtid_pos=`SELECT @@GLOBAL.gtid_binlog_pos`
--save_master_pos
--connection server_2
# Wait for IO thread to have read all events from master, and for SQL thread to
# sit in the debug_sync point.
--let $slave_param= Read_Master_Log_Pos
--let $slave_param_value= $master_pos
--source include/wait_for_slave_param.inc
# Now stop the IO thread, and let the SQL thread continue. The IO thread
# should write a Gtid_list event that the SQL thread can use to update the
# gtid_slave_pos with the GTIDs of the skipped events.
STOP SLAVE IO_THREAD;
SET debug_sync = "now SIGNAL continue";
--sync_with_master
--let $wait_condition= SELECT @@GLOBAL.gtid_slave_pos = '$gtid_pos'
--source include/wait_condition.inc
--disable_query_log
eval SELECT IF(@@GLOBAL.gtid_slave_pos = '$gtid_pos', 'OK', CONCAT("ERROR: Expected $gtid_pos got ", @@GLOBAL.gtid_slave_pos)) AS RESULT;
--let $slave_pos= query_get_value(SHOW SLAVE STATUS, Exec_Master_Log_Pos, 1)
eval SELECT IF('$slave_pos' = '$master_pos', 'OK', "ERROR: Expected $master_pos got $slave_pos") AS RESULT;
--enable_query_log
--source include/stop_slave.inc
CHANGE MASTER TO ignore_server_ids=();
SET GLOBAL debug_dbug= @old_dbug;
--source include/start_slave.inc
--connection server_1
INSERT INTO t1 VALUES (8);
INSERT INTO t1 VALUES (9);
--save_master_pos
--connection server_2
--sync_with_master
SELECT * FROM t1 ORDER BY a;
# Clean up.
--connection server_1
DROP TABLE t1;
SET GLOBAL gtid_strict_mode= @old_gtid_strict_mode;
--connection server_2
SET GLOBAL gtid_strict_mode= @old_gtid_strict_mode;
--source include/rpl_end.inc
...@@ -18,10 +18,14 @@ INSERT INTO t1 VALUES (1); ...@@ -18,10 +18,14 @@ INSERT INTO t1 VALUES (1);
--source include/stop_slave.inc --source include/stop_slave.inc
SET @old_dbug= @@GLOBAL.debug_dbug; SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,gtid_slave_pos_simulate_failed_delete"; SET GLOBAL debug_dbug="+d,gtid_slave_pos_simulate_failed_delete";
--source include/start_slave.inc
SET sql_log_bin= 0; SET sql_log_bin= 0;
CALL mtr.add_suppression("Can't find file"); CALL mtr.add_suppression("Can't find file");
# Since we inject an error updating mysql.gtid_slave_pos, we will get different
# output depending on whether it is InnoDB or MyISAM (roll back or no roll
# back). So fix it to make sure we are consistent.
ALTER TABLE mysql.gtid_slave_pos ENGINE=MyISAM;
SET sql_log_bin= 1; SET sql_log_bin= 1;
--source include/start_slave.inc
--connection master --connection master
INSERT INTO t1 VALUES (2); INSERT INTO t1 VALUES (2);
......
...@@ -6334,7 +6334,7 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) ...@@ -6334,7 +6334,7 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
const Format_description_log_event *description_event) const Format_description_log_event *description_event)
: Log_event(buf, description_event), count(0), list(0) : Log_event(buf, description_event), count(0), list(0), sub_id_list(0)
{ {
uint32 i; uint32 i;
uint32 val; uint32 val;
...@@ -6363,6 +6363,31 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, ...@@ -6363,6 +6363,31 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
list[i].seq_no= uint8korr(buf); list[i].seq_no= uint8korr(buf);
buf+= 8; buf+= 8;
} }
#ifdef MYSQL_SERVER
if ((gl_flags & FLAG_IGN_GTIDS))
{
uint32 i;
if (!(sub_id_list= (uint64 *)my_malloc(count*sizeof(uint64), MYF(MY_WME))))
{
my_free(list);
list= NULL;
return;
}
for (i= 0; i < count; ++i)
{
if (!(sub_id_list[i]=
rpl_global_gtid_slave_state.next_sub_id(list[i].domain_id)))
{
my_free(list);
my_free(sub_id_list);
list= NULL;
sub_id_list= NULL;
return;
}
}
}
#endif
} }
...@@ -6370,7 +6395,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len, ...@@ -6370,7 +6395,7 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set, Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set,
uint32 gl_flags_) uint32 gl_flags_)
: count(gtid_set->count()), gl_flags(gl_flags_), list(0) : count(gtid_set->count()), gl_flags(gl_flags_), list(0), sub_id_list(0)
{ {
cache_type= EVENT_NO_CACHE; cache_type= EVENT_NO_CACHE;
/* Failure to allocate memory will be caught by is_valid() returning false. */ /* Failure to allocate memory will be caught by is_valid() returning false. */
...@@ -6381,6 +6406,45 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set, ...@@ -6381,6 +6406,45 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set,
} }
Gtid_list_log_event::Gtid_list_log_event(slave_connection_state *gtid_set,
uint32 gl_flags_)
: count(gtid_set->count()), gl_flags(gl_flags_), list(0), sub_id_list(0)
{
cache_type= EVENT_NO_CACHE;
/* Failure to allocate memory will be caught by is_valid() returning false. */
if (count < (1<<28) &&
(list = (rpl_gtid *)my_malloc(count * sizeof(*list) + (count == 0),
MYF(MY_WME))))
{
gtid_set->get_gtid_list(list, count);
if (gl_flags & FLAG_IGN_GTIDS)
{
uint32 i;
if (!(sub_id_list= (uint64 *)my_malloc(count * sizeof(uint64),
MYF(MY_WME))))
{
my_free(list);
list= NULL;
return;
}
for (i= 0; i < count; ++i)
{
if (!(sub_id_list[i]=
rpl_global_gtid_slave_state.next_sub_id(list[i].domain_id)))
{
my_free(list);
my_free(sub_id_list);
list= NULL;
sub_id_list= NULL;
return;
}
}
}
}
}
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
bool bool
Gtid_list_log_event::to_packet(String *packet) Gtid_list_log_event::to_packet(String *packet)
...@@ -6432,7 +6496,20 @@ Gtid_list_log_event::write(IO_CACHE *file) ...@@ -6432,7 +6496,20 @@ Gtid_list_log_event::write(IO_CACHE *file)
int int
Gtid_list_log_event::do_apply_event(Relay_log_info const *rli) Gtid_list_log_event::do_apply_event(Relay_log_info const *rli)
{ {
int ret= Log_event::do_apply_event(rli); int ret;
if (gl_flags & FLAG_IGN_GTIDS)
{
uint32 i;
for (i= 0; i < count; ++i)
{
if ((ret= rpl_global_gtid_slave_state.record_gtid(thd, &list[i],
sub_id_list[i],
false, false)))
return ret;
rpl_global_gtid_slave_state.update_state_hash(sub_id_list[i], &list[i]);
}
}
ret= Log_event::do_apply_event(rli);
if (rli->until_condition == Relay_log_info::UNTIL_GTID && if (rli->until_condition == Relay_log_info::UNTIL_GTID &&
(gl_flags & FLAG_UNTIL_REACHED)) (gl_flags & FLAG_UNTIL_REACHED))
{ {
......
...@@ -3197,12 +3197,15 @@ class Gtid_list_log_event: public Log_event ...@@ -3197,12 +3197,15 @@ class Gtid_list_log_event: public Log_event
uint32 count; uint32 count;
uint32 gl_flags; uint32 gl_flags;
struct rpl_gtid *list; struct rpl_gtid *list;
uint64 *sub_id_list;
static const uint element_size= 4+4+8; static const uint element_size= 4+4+8;
static const uint32 FLAG_UNTIL_REACHED= (1<<28); static const uint32 FLAG_UNTIL_REACHED= (1<<28);
static const uint32 FLAG_IGN_GTIDS= (1<<29);
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
Gtid_list_log_event(rpl_binlog_state *gtid_set, uint32 gl_flags); Gtid_list_log_event(rpl_binlog_state *gtid_set, uint32 gl_flags);
Gtid_list_log_event(slave_connection_state *gtid_set, uint32 gl_flags);
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
void pack_info(THD *thd, Protocol *protocol); void pack_info(THD *thd, Protocol *protocol);
#endif #endif
...@@ -3211,7 +3214,7 @@ class Gtid_list_log_event: public Log_event ...@@ -3211,7 +3214,7 @@ class Gtid_list_log_event: public Log_event
#endif #endif
Gtid_list_log_event(const char *buf, uint event_len, Gtid_list_log_event(const char *buf, uint event_len,
const Format_description_log_event *description_event); const Format_description_log_event *description_event);
~Gtid_list_log_event() { my_free(list); } ~Gtid_list_log_event() { my_free(list); my_free(sub_id_list); }
Log_event_type get_type_code() { return GTID_LIST_EVENT; } Log_event_type get_type_code() { return GTID_LIST_EVENT; }
int get_data_size() { int get_data_size() {
/* /*
......
...@@ -1419,6 +1419,15 @@ slave_connection_state::remove(const rpl_gtid *in_gtid) ...@@ -1419,6 +1419,15 @@ slave_connection_state::remove(const rpl_gtid *in_gtid)
} }
void
slave_connection_state::remove_if_present(const rpl_gtid *in_gtid)
{
uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
if (rec)
my_hash_delete(&hash, rec);
}
int int
slave_connection_state::to_string(String *out_str) slave_connection_state::to_string(String *out_str)
{ {
...@@ -1442,3 +1451,22 @@ slave_connection_state::append_to_string(String *out_str) ...@@ -1442,3 +1451,22 @@ slave_connection_state::append_to_string(String *out_str)
} }
return 0; return 0;
} }
int
slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
{
uint32 i, pos;
pos= 0;
for (i= 0; i < hash.records; ++i)
{
entry *e;
if (pos >= list_size)
return 1;
e= (entry *)my_hash_element(&hash, i);
memcpy(&gtid_list[pos++], &e->gtid, sizeof(e->gtid));
}
return 0;
}
...@@ -195,9 +195,11 @@ struct slave_connection_state ...@@ -195,9 +195,11 @@ struct slave_connection_state
entry *find_entry(uint32 domain_id); entry *find_entry(uint32 domain_id);
int update(const rpl_gtid *in_gtid); int update(const rpl_gtid *in_gtid);
void remove(const rpl_gtid *gtid); void remove(const rpl_gtid *gtid);
void remove_if_present(const rpl_gtid *in_gtid);
ulong count() const { return hash.records; } ulong count() const { return hash.records; }
int to_string(String *out_str); int to_string(String *out_str);
int append_to_string(String *out_str); int append_to_string(String *out_str);
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
}; };
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
......
...@@ -303,6 +303,8 @@ class Relay_log_info : public Slave_reporting_capability ...@@ -303,6 +303,8 @@ class Relay_log_info : public Slave_reporting_capability
*/ */
char ign_master_log_name_end[FN_REFLEN]; char ign_master_log_name_end[FN_REFLEN];
ulonglong ign_master_log_pos_end; ulonglong ign_master_log_pos_end;
/* Similar for ignored GTID events. */
slave_connection_state ign_gtids;
/* /*
Indentifies where the SQL Thread should create temporary files for the Indentifies where the SQL Thread should create temporary files for the
......
...@@ -2222,34 +2222,66 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi) ...@@ -2222,34 +2222,66 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
DBUG_ASSERT(thd == mi->io_thd); DBUG_ASSERT(thd == mi->io_thd);
mysql_mutex_lock(log_lock); mysql_mutex_lock(log_lock);
if (rli->ign_master_log_name_end[0] || rli->ign_gtids.count())
{
Rotate_log_event *rev;
Gtid_list_log_event *glev;
if (rli->ign_master_log_name_end[0]) if (rli->ign_master_log_name_end[0])
{ {
DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); rev= new Rotate_log_event(rli->ign_master_log_name_end,
Rotate_log_event *ev= new Rotate_log_event(rli->ign_master_log_name_end,
0, rli->ign_master_log_pos_end, 0, rli->ign_master_log_pos_end,
Rotate_log_event::DUP_NAME); Rotate_log_event::DUP_NAME);
rli->ign_master_log_name_end[0]= 0; rli->ign_master_log_name_end[0]= 0;
/* can unlock before writing as slave SQL thd will soon see our Rotate */ if (unlikely(!(bool)rev))
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
"Rotate_event (out of memory?),"
" SHOW SLAVE STATUS may be inaccurate");
}
if (rli->ign_gtids.count())
{
glev= new Gtid_list_log_event(&rli->ign_gtids,
Gtid_list_log_event::FLAG_IGN_GTIDS);
rli->ign_gtids.reset();
if (unlikely(!(bool)glev))
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
"Gtid_list_event (out of memory?),"
" gtid_slave_pos may be inaccurate");
}
/* Can unlock before writing as slave SQL thd will soon see our event. */
mysql_mutex_unlock(log_lock); mysql_mutex_unlock(log_lock);
if (likely((bool)ev)) if (rev)
{ {
ev->server_id= 0; // don't be ignored by slave SQL thread DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
if (unlikely(rli->relay_log.append(ev))) rev->server_id= 0; // don't be ignored by slave SQL thread
if (unlikely(rli->relay_log.append(rev)))
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE), ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"failed to write a Rotate event" "failed to write a Rotate event"
" to the relay log, SHOW SLAVE STATUS may be" " to the relay log, SHOW SLAVE STATUS may be"
" inaccurate"); " inaccurate");
delete rev;
}
if (glev)
{
DBUG_PRINT("info",("writing a Gtid_list event to track down ignored events"));
glev->server_id= 0; // don't be ignored by slave SQL thread
glev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos
if (unlikely(rli->relay_log.append(glev)))
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"failed to write a Gtid_list event to the relay log, "
"gtid_slave_pos may be inaccurate");
delete glev;
}
if (likely (rev || glev))
{
rli->relay_log.harvest_bytes_written(&rli->log_space_total); rli->relay_log.harvest_bytes_written(&rli->log_space_total);
if (flush_master_info(mi, TRUE, TRUE)) if (flush_master_info(mi, TRUE, TRUE))
sql_print_error("Failed to flush master info file"); sql_print_error("Failed to flush master info file");
delete ev;
} }
else
mi->report(ERROR_LEVEL, ER_SLAVE_CREATE_EVENT_FAILURE,
ER(ER_SLAVE_CREATE_EVENT_FAILURE),
"Rotate_event (out of memory?),"
" SHOW SLAVE STATUS may be inaccurate");
} }
else else
mysql_mutex_unlock(log_lock); mysql_mutex_unlock(log_lock);
...@@ -3097,6 +3129,12 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli) ...@@ -3097,6 +3129,12 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
rli->slave_skip_counter--; rli->slave_skip_counter--;
} }
mysql_mutex_unlock(&rli->data_lock); mysql_mutex_unlock(&rli->data_lock);
DBUG_EXECUTE_IF("inject_slave_sql_before_apply_event",
{
DBUG_ASSERT(!debug_sync_set_action
(thd, STRING_WITH_LEN("now WAIT_FOR continue")));
DBUG_SET_INITIAL("-d,inject_slave_sql_before_apply_event");
};);
if (reason == Log_event::EVENT_SKIP_NOT) if (reason == Log_event::EVENT_SKIP_NOT)
exec_res= ev->apply_event(rli); exec_res= ev->apply_event(rli);
...@@ -4822,6 +4860,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -4822,6 +4860,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
ulong s_id; ulong s_id;
bool unlock_data_lock= TRUE; bool unlock_data_lock= TRUE;
bool gtid_skip_enqueue= false; bool gtid_skip_enqueue= false;
bool got_gtid_event= false;
rpl_gtid event_gtid;
/* /*
FD_q must have been prepared for the first R_a event FD_q must have been prepared for the first R_a event
...@@ -5140,6 +5180,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -5140,6 +5180,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{ {
uchar dummy_flag; uchar dummy_flag;
if (Gtid_log_event::peek(buf, event_len, checksum_alg,
&event_gtid.domain_id, &event_gtid.server_id,
&event_gtid.seq_no, &dummy_flag))
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
got_gtid_event= true;
if (mi->using_gtid == Master_info::USE_GTID_NO) if (mi->using_gtid == Master_info::USE_GTID_NO)
goto default_action; goto default_action;
if (unlikely(!mi->gtid_event_seen)) if (unlikely(!mi->gtid_event_seen))
...@@ -5147,8 +5195,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -5147,8 +5195,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mi->gtid_event_seen= true; mi->gtid_event_seen= true;
if (mi->gtid_reconnect_event_skip_count) if (mi->gtid_reconnect_event_skip_count)
{ {
rpl_gtid gtid;
/* /*
If we are reconnecting, and we need to skip a partial event group If we are reconnecting, and we need to skip a partial event group
already queued to the relay log before the reconnect, then we check already queued to the relay log before the reconnect, then we check
...@@ -5157,21 +5203,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -5157,21 +5203,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
The only way we should be able to receive a different GTID than what The only way we should be able to receive a different GTID than what
we expect is if the binlog on the master (or more likely the whole we expect is if the binlog on the master (or more likely the whole
master server) was replaced with a different one, one the same IP master server) was replaced with a different one, on the same IP
address, _and_ the new master happens to have domains in a different address, _and_ the new master happens to have domains in a different
order so we get the GTID from a different domain first. Still, it is order so we get the GTID from a different domain first. Still, it is
best to protect against this case. best to protect against this case.
*/ */
if (Gtid_log_event::peek(buf, event_len, checksum_alg, if (event_gtid.domain_id != mi->last_queued_gtid.domain_id ||
&gtid.domain_id, &gtid.server_id, event_gtid.server_id != mi->last_queued_gtid.server_id ||
&gtid.seq_no, &dummy_flag)) event_gtid.seq_no != mi->last_queued_gtid.seq_no)
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
if (gtid.domain_id != mi->last_queued_gtid.domain_id ||
gtid.server_id != mi->last_queued_gtid.server_id ||
gtid.seq_no != mi->last_queued_gtid.seq_no)
{ {
bool first; bool first;
error= ER_SLAVE_UNEXPECTED_MASTER_SWITCH; error= ER_SLAVE_UNEXPECTED_MASTER_SWITCH;
...@@ -5181,7 +5220,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -5181,7 +5220,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
&first); &first);
error_msg.append(STRING_WITH_LEN(", received: ")); error_msg.append(STRING_WITH_LEN(", received: "));
first= true; first= true;
rpl_slave_state_tostring_helper(&error_msg, &gtid, &first); rpl_slave_state_tostring_helper(&error_msg, &event_gtid, &first);
goto err; goto err;
} }
} }
...@@ -5261,6 +5300,16 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -5261,6 +5300,16 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mysql_mutex_lock(log_lock); mysql_mutex_lock(log_lock);
s_id= uint4korr(buf + SERVER_ID_OFFSET); s_id= uint4korr(buf + SERVER_ID_OFFSET);
/*
Write the event to the relay log, unless we reconnected in the middle
of an event group and now need to skip the initial part of the group that
we already wrote before reconnecting.
*/
if (unlikely(gtid_skip_enqueue))
{
mi->master_log_pos+= inc_pos;
}
else
if ((s_id == global_system_variables.server_id && if ((s_id == global_system_variables.server_id &&
!mi->rli.replicate_same_server_id) || !mi->rli.replicate_same_server_id) ||
/* /*
...@@ -5303,6 +5352,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -5303,6 +5352,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
DBUG_ASSERT(rli->ign_master_log_name_end[0]); DBUG_ASSERT(rli->ign_master_log_name_end[0]);
rli->ign_master_log_pos_end= mi->master_log_pos; rli->ign_master_log_pos_end= mi->master_log_pos;
if (got_gtid_event)
rli->ign_gtids.update(&event_gtid);
} }
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored", DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored",
...@@ -5310,16 +5361,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -5310,16 +5361,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
} }
else else
{ {
/* if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
Write the event to the relay log, unless we reconnected in the middle
of an event group and now need to skip the initial part of the group that
we already wrote before reconnecting.
*/
if (unlikely(gtid_skip_enqueue))
{
mi->master_log_pos+= inc_pos;
}
else if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
{ {
mi->master_log_pos+= inc_pos; mi->master_log_pos+= inc_pos;
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
...@@ -5330,6 +5372,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -5330,6 +5372,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
} }
rli->ign_master_log_name_end[0]= 0; // last event is not ignored rli->ign_master_log_name_end[0]= 0; // last event is not ignored
if (got_gtid_event)
rli->ign_gtids.remove_if_present(&event_gtid);
if (save_buf != NULL) if (save_buf != NULL)
buf= save_buf; buf= save_buf;
} }
...@@ -5932,6 +5976,25 @@ static Log_event* next_event(Relay_log_info* rli) ...@@ -5932,6 +5976,25 @@ static Log_event* next_event(Relay_log_info* rli)
DBUG_RETURN(ev); DBUG_RETURN(ev);
} }
if (rli->ign_gtids.count())
{
/* We generate and return a Gtid_list, to update gtid_slave_pos. */
DBUG_PRINT("info",("seeing ignored end gtids"));
ev= new Gtid_list_log_event(&rli->ign_gtids,
Gtid_list_log_event::FLAG_IGN_GTIDS);
rli->ign_gtids.reset();
mysql_mutex_unlock(log_lock);
if (unlikely(!ev))
{
errmsg= "Slave SQL thread failed to create a Gtid_list event "
"(out of memory?), gtid_slave_pos may be inaccurate";
goto err;
}
ev->server_id= 0; // don't be ignored by slave SQL thread
ev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos
DBUG_RETURN(ev);
}
/* /*
We can, and should release data_lock while we are waiting for We can, and should release data_lock while we are waiting for
update. If we do not, show slave status will block update. If we do not, show slave status will block
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment