Commit 9d9ddad7 authored by unknown's avatar unknown

MDEV-26: Global transaction ID.

Fix things so that a master can switch with MASTER_GTID_POS=AUTO to a slave
that was previously running with log_slave_updates=0, by looking into the
slave replication state on the master when the slave requests something not
present in the binlog.

Be a bit more strict about what position the slave can ask for, to avoid some
easy-to-hit misconfiguration errors.

Start over with seq_no counter when RESET MASTER.
parent 379819d8
......@@ -29,5 +29,49 @@ include/start_slave.inc
SELECT * FROM t1;
a
1
*** Test requesting an explicit GTID position that conflicts with newer GTIDs of our own in the binlog. ***
include/stop_slave.inc
RESET MASTER;
INSERT INTO t1 VALUES (2);
INSERT INTO t1 VALUES (4);
SET sql_log_bin = 0;
INSERT INTO t1 VALUES (2);
SET sql_log_bin = 1;
INSERT INTO t1 VALUES (3);
CHANGE MASTER TO master_gtid_pos = "0-1-1";
ERROR HY000: Requested MASTER_GTID_POS 0-1-1 conflicts with the binary log which contains a more recent GTID 0-2-11. To use the requested MASTER_GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog
RESET MASTER;
CHANGE MASTER TO master_gtid_pos = "0-1-1";
START SLAVE;
SELECT * FROM t1 ORDER BY a;
a
1
2
3
4
*** Test slave requesting a GTID that is not present in the master's binlog ***
include/stop_slave.inc
CHANGE MASTER TO master_gtid_pos = "0-1-3";
START SLAVE;
SET sql_log_bin=0;
CALL mtr.add_suppression("Got fatal error .* from master when reading data from binary log: 'Error: connecting slave requested to start from GTID .*, which is not in the master's binlog'");
SET sql_log_bin=1;
include/wait_for_slave_io_error.inc [errno=1236]
Slave_IO_State = ''
Last_IO_Errno = '1236'
Last_IO_Error = 'Got fatal error 1236 from master when reading data from binary log: 'Error: connecting slave requested to start from GTID 0-1-3, which is not in the master's binlog''
Using_Gtid = '1'
include/stop_slave.inc
CHANGE MASTER TO master_gtid_pos = "0-1-2";
START SLAVE;
include/wait_for_slave_to_start.inc
INSERT INTO t1 VALUES (5);
SELECT * FROM t1 ORDER BY a;
a
1
2
3
4
5
DROP TABLE t1;
include/rpl_end.inc
include/rpl_init.inc [topology=1->2]
select @@global.log_slave_updates;
@@global.log_slave_updates
0
CREATE TABLE t1 (a INT PRIMARY KEY, b INT);
INSERT INTO t1 VALUES (1, 1);
INSERT INTO t1 VALUES (2, 1);
select @@global.log_slave_updates;
@@global.log_slave_updates
0
SELECT * FROM t1 ORDER BY a;
a b
1 1
2 1
include/stop_slave.inc
INSERT INTO t1 VALUES (3, 2);
INSERT INTO t1 VALUES (4, 2);
show binlog events from <binlog_start>;
Log_name Pos Event_type Server_id End_log_pos Info
slave-bin.000001 # Binlog_checkpoint # # slave-bin.000001
slave-bin.000001 # Gtid # # BEGIN GTID 0-2-4
slave-bin.000001 # Query # # use `test`; INSERT INTO t1 VALUES (3, 2)
slave-bin.000001 # Query # # COMMIT
slave-bin.000001 # Gtid # # BEGIN GTID 0-2-5
slave-bin.000001 # Query # # use `test`; INSERT INTO t1 VALUES (4, 2)
slave-bin.000001 # Query # # COMMIT
CHANGE MASTER TO master_host = '127.0.0.1', master_port = SLAVE_PORT,
master_user = 'root', master_gtid_pos = AUTO;
START SLAVE;
SELECT * FROM t1 ORDER BY a;
a b
1 1
2 1
3 2
4 2
include/stop_slave.inc
RESET SLAVE;
INSERT INTO t1 VALUES (5, 1);
INSERT INTO t1 VALUES (6, 1);
CHANGE MASTER TO master_host = '127.0.0.1', master_port = MASTER_PORT,
master_gtid_pos = AUTO;
START SLAVE;
SELECT * FROM t1 ORDER BY a;
a b
1 1
2 1
3 2
4 2
5 1
6 1
DROP TABLE t1;
include/rpl_end.inc
......@@ -35,6 +35,36 @@ SELECT * FROM t1 ORDER BY a;
a
1
2
SET sql_log_bin=0;
call mtr.add_suppression('Could not find GTID state requested by slave in any binlog files');
SET sql_log_bin=1;
*** Test that we give error when explict MASTER_GTID_POS=xxx that conflicts with what is in our binary log ***
include/stop_slave.inc
INSERT INTO t1 VALUES(3);
CHANGE MASTER TO master_host = '127.0.0.1', master_port = MASTER_PORT,
MASTER_GTID_POS='0-1-3';
include/start_slave.inc
SELECT * FROM t1 ORDER by a;
a
1
2
3
include/stop_slave.inc
INSERT INTO t1 VALUES (4);
INSERT INTO t1 VALUES (10);
DELETE FROM t1 WHERE a=10;
CHANGE MASTER TO master_host = '127.0.0.1', master_port = MASTER_PORT,
MASTER_GTID_POS='0-1-4';
ERROR HY000: Requested MASTER_GTID_POS 0-1-4 conflicts with the binary log which contains a more recent GTID 0-2-6. To use the requested MASTER_GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog
RESET MASTER;
CHANGE MASTER TO master_host = '127.0.0.1', master_port = MASTER_PORT,
MASTER_GTID_POS='0-1-4';
START SLAVE;
SELECT * FROM t1 ORDER by a;
a
1
2
3
4
DROP TABLE t1;
include/rpl_end.inc
......@@ -50,6 +50,63 @@ ALTER TABLE mysql.rpl_slave_state ADD PRIMARY KEY (domain_id, sub_id);
--connection slave
SELECT * FROM t1;
--echo *** Test requesting an explicit GTID position that conflicts with newer GTIDs of our own in the binlog. ***
--connection slave
--source include/stop_slave.inc
--connection master
RESET MASTER;
# This insert will be GTID 0-1-1
INSERT INTO t1 VALUES (2);
# And this will be GTID 0-1-2
INSERT INTO t1 VALUES (4);
--connection slave
SET sql_log_bin = 0;
INSERT INTO t1 VALUES (2);
SET sql_log_bin = 1;
INSERT INTO t1 VALUES (3);
--error ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG
CHANGE MASTER TO master_gtid_pos = "0-1-1";
RESET MASTER;
CHANGE MASTER TO master_gtid_pos = "0-1-1";
START SLAVE;
--let $wait_condition= SELECT COUNT(*) = 4 FROM t1
--source include/wait_condition.inc
SELECT * FROM t1 ORDER BY a;
--echo *** Test slave requesting a GTID that is not present in the master's binlog ***
--source include/stop_slave.inc
CHANGE MASTER TO master_gtid_pos = "0-1-3";
START SLAVE;
SET sql_log_bin=0;
CALL mtr.add_suppression("Got fatal error .* from master when reading data from binary log: 'Error: connecting slave requested to start from GTID .*, which is not in the master's binlog'");
SET sql_log_bin=1;
--let $slave_io_errno= 1236
--source include/wait_for_slave_io_error.inc
--let $status_items= Slave_IO_State, Last_IO_Errno, Last_IO_Error, Using_Gtid
--source include/show_slave_status.inc
--let $rpl_only_running_threads= 1
--source include/stop_slave.inc
CHANGE MASTER TO master_gtid_pos = "0-1-2";
START SLAVE;
--source include/wait_for_slave_to_start.inc
--connection master
INSERT INTO t1 VALUES (5);
--connection slave
--let $wait_condition= SELECT COUNT(*) = 5 FROM t1
--source include/wait_condition.inc
SELECT * FROM t1 ORDER BY a;
--connection master
DROP TABLE t1;
......
!include ../my.cnf
[mysqld.1]
log-slave-updates=0
loose-innodb
[mysqld.2]
log-slave-updates=0
loose-innodb
--let $rpl_topology=1->2
--source include/rpl_init.inc
--source include/have_binlog_format_statement.inc
--connection server_1
select @@global.log_slave_updates;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT);
INSERT INTO t1 VALUES (1, 1);
INSERT INTO t1 VALUES (2, 1);
--save_master_pos
--connection server_2
select @@global.log_slave_updates;
--sync_with_master
SELECT * FROM t1 ORDER BY a;
--source include/stop_slave.inc
INSERT INTO t1 VALUES (3, 2);
INSERT INTO t1 VALUES (4, 2);
--source include/show_binlog_events.inc
--connection server_1
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $SLAVE_MYPORT,
master_user = 'root', master_gtid_pos = AUTO;
START SLAVE;
--let $wait_condition= SELECT COUNT(*) = 4 FROM t1
--source include/wait_condition.inc
SELECT * FROM t1 ORDER BY a;
--source include/stop_slave.inc
RESET SLAVE;
INSERT INTO t1 VALUES (5, 1);
INSERT INTO t1 VALUES (6, 1);
--connection server_2
--replace_result $MASTER_MYPORT MASTER_PORT
eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT,
master_gtid_pos = AUTO;
START SLAVE;
--let $wait_condition= SELECT COUNT(*) = 6 FROM t1
--source include/wait_condition.inc
SELECT * FROM t1 ORDER BY a;
# Cleanup.
--connection server_1
DROP TABLE t1;
--source include/rpl_end.inc
......@@ -55,7 +55,50 @@ eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT,
--source include/start_slave.inc
--sync_with_master
SELECT * FROM t1 ORDER BY a;
SET sql_log_bin=0;
call mtr.add_suppression('Could not find GTID state requested by slave in any binlog files');
SET sql_log_bin=1;
--echo *** Test that we give error when explict MASTER_GTID_POS=xxx that conflicts with what is in our binary log ***
--source include/stop_slave.inc
--connection server_1
INSERT INTO t1 VALUES(3);
--connection server_2
--replace_result $MASTER_MYPORT MASTER_PORT
eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT,
MASTER_GTID_POS='0-1-3';
--source include/start_slave.inc
--let $wait_condition= SELECT COUNT(*) = 3 FROM t1
--source include/wait_condition.inc
SELECT * FROM t1 ORDER by a;
--source include/stop_slave.inc
--connection server_1
INSERT INTO t1 VALUES (4);
--connection server_2
# Now add some local transactions that conflict with the GTID position
# being set for MASTER_GTID_POS.
INSERT INTO t1 VALUES (10);
DELETE FROM t1 WHERE a=10;
--replace_result $MASTER_MYPORT MASTER_PORT
--error ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG
eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT,
MASTER_GTID_POS='0-1-4';
# Try again after RESET MASTER to remove the conflicting binlog.
RESET MASTER;
--replace_result $MASTER_MYPORT MASTER_PORT
eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT,
MASTER_GTID_POS='0-1-4';
START SLAVE;
--let $wait_condition= SELECT COUNT(*) = 4 FROM t1
--source include/wait_condition.inc
SELECT * FROM t1 ORDER by a;
# Clean up.
--connection server_1
DROP TABLE t1;
......
......@@ -3835,7 +3835,12 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log)
}
if (!is_relay_log)
{
rpl_global_gtid_binlog_state.reset();
mysql_mutex_lock(&LOCK_gtid_counter);
global_gtid_counter= 0;
mysql_mutex_unlock(&LOCK_gtid_counter);
}
/* Start logging with a new file */
close(LOG_CLOSE_INDEX | LOG_CLOSE_TO_BE_OPENED);
......@@ -5345,20 +5350,8 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
/*
If we see a higher sequence number, use that one as the basis of any
later generated sequence numbers.
This way, in simple tree replication topologies with just one master
generating events at any point in time, sequence number will always be
monotonic irrespectively of server_id. Only if events are produced in
parallel on multiple master servers will sequence id be non-monotonic
and server id needed to distinguish.
We will not rely on this in the server code, but it makes things
conceptually easier to understand for the DBA.
*/
mysql_mutex_lock(&LOCK_gtid_counter);
if (global_gtid_counter < seq_no)
global_gtid_counter= seq_no;
mysql_mutex_unlock(&LOCK_gtid_counter);
bump_seq_no_counter_if_needed(seq_no);
}
else
{
......@@ -5495,6 +5488,53 @@ MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
}
bool
MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id,
rpl_gtid *out_gtid)
{
rpl_gtid *gtid;
mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
if ((gtid= rpl_global_gtid_binlog_state.find(domain_id, server_id)))
*out_gtid= *gtid;
mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
return gtid != NULL;
}
bool
MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id,
rpl_gtid *out_gtid)
{
rpl_binlog_state::element *elem;
bool res;
mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
elem= (rpl_binlog_state::element *)
my_hash_search(&rpl_global_gtid_binlog_state.hash,
(const uchar *)&domain_id, 0);
if (elem)
{
res= true;
*out_gtid= *elem->last_gtid;
}
else
res= false;
mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
return res;
}
void
MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint64 seq_no)
{
mysql_mutex_lock(&LOCK_gtid_counter);
if (global_gtid_counter < seq_no)
global_gtid_counter= seq_no;
mysql_mutex_unlock(&LOCK_gtid_counter);
}
/**
Write an event to the binary log. If with_annotate != NULL and
*with_annotate = TRUE write also Annotate_rows before the event
......@@ -8183,7 +8223,8 @@ int TC_LOG_BINLOG::open(const char *opt_name)
else
error= read_state_from_file();
/* Pick the next unused seq_no from the loaded/recovered binlog state. */
global_gtid_counter= rpl_global_gtid_binlog_state.seq_no_from_state();
bump_seq_no_counter_if_needed(
rpl_global_gtid_binlog_state.seq_no_from_state());
delete ev;
end_io_cache(&log);
......@@ -8433,6 +8474,26 @@ binlog_background_thread(void *arg __attribute__((unused)))
mysql_mutex_unlock(&LOCK_thread_count);
thd->store_globals();
/*
Load the slave replication GTID state from the mysql.rpl_slave_state
table.
This is mostly so that we can start our seq_no counter from the highest
seq_no seen by a slave. This way, we have a way to tell if a transaction
logged by ourselves as master is newer or older than a replicated
transaction.
*/
#ifdef HAVE_REPLICATION
if (rpl_load_gtid_slave_state(thd))
sql_print_warning("Failed to load slave replication state from table "
"%s.%s", "mysql", rpl_gtid_slave_state_table_name.str);
#endif
mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread);
binlog_background_thread_started= true;
mysql_cond_signal(&mysql_bin_log.COND_binlog_background_thread_end);
mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread);
for (;;)
{
/*
......@@ -8515,7 +8576,16 @@ start_binlog_background_thread()
binlog_background_thread, NULL))
return 1;
binlog_background_thread_started= true;
/*
Wait for the thread to have started (so we know that the slave replication
state is loaded and we have correct global_gtid_counter).
*/
mysql_mutex_lock(&mysql_bin_log.LOCK_binlog_background_thread);
while (!binlog_background_thread_started)
mysql_cond_wait(&mysql_bin_log.COND_binlog_background_thread_end,
&mysql_bin_log.LOCK_binlog_background_thread);
mysql_mutex_unlock(&mysql_bin_log.LOCK_binlog_background_thread);
return 0;
}
......
......@@ -775,6 +775,10 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
int read_state_from_file();
int write_state_to_file();
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
bool find_in_binlog_state(uint32 domain_id, uint32 server_id,
rpl_gtid *out_gtid);
bool lookup_domain_in_binlog_state(uint32 domain_id, rpl_gtid *out_gtid);
void bump_seq_no_counter_if_needed(uint64 seq_no);
};
class Log_event_handler
......
......@@ -210,6 +210,7 @@ rpl_slave_state::truncate_state_table(THD *thd)
close_thread_tables(thd);
ha_commit_trans(thd, TRUE);
}
thd->mdl_context.release_transactional_locks();
}
return err;
......@@ -360,6 +361,8 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
}
table->file->ha_index_end();
mysql_bin_log.bump_seq_no_counter_if_needed(gtid->seq_no);
end:
if (table_opened)
......@@ -378,6 +381,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
ha_commit_trans(thd, FALSE);
close_thread_tables(thd);
}
if (in_transaction)
thd->mdl_context.release_statement_locks();
else
thd->mdl_context.release_transactional_locks();
}
thd->variables.option_bits= thd_saved_option;
return err;
......@@ -423,6 +430,14 @@ rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first)
The state consists of the most recently applied GTID for each domain_id,
ie. the one with the highest sub_id within each domain_id.
Optinally, extra_gtids is a list of GTIDs from the binlog. This is used when
a server was previously a master and now needs to connect to a new master as
a slave. For each domain_id, if the GTID in the binlog was logged with our
own server_id _and_ has a higher seq_no than what is in the slave state,
then this should be used as the position to start replicating at. This
allows to promote a slave as new master, and connect the old master as a
slave with MASTER_GTID_POS=AUTO.
*/
int
......@@ -438,7 +453,8 @@ rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
my_hash_init(&gtid_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id),
sizeof(uint32), NULL, NULL, HASH_UNIQUE);
for (i= 0; i < num_extra; ++i)
if (my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
if (extra_gtids[i].server_id == global_system_variables.server_id &&
my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
goto err;
lock();
......@@ -507,6 +523,47 @@ rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
}
/*
Lookup a domain_id in the current replication slave state.
Returns false if the domain_id has no entries in the slave state.
Otherwise returns true, and fills in out_gtid with the corresponding
GTID.
*/
bool
rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
{
element *elem;
list_element *list;
uint64 best_sub_id;
lock();
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
if (!elem || !(list= elem->list))
{
unlock();
return false;
}
out_gtid->domain_id= domain_id;
out_gtid->server_id= list->server_id;
out_gtid->seq_no= list->seq_no;
best_sub_id= list->sub_id;
while ((list= list->next))
{
if (best_sub_id > list->sub_id)
continue;
best_sub_id= list->sub_id;
out_gtid->server_id= list->server_id;
out_gtid->seq_no= list->seq_no;
}
unlock();
return true;
}
/*
Parse a GTID at the start of a string, and update the pointer to point
at the first character after the parsed GTID.
......@@ -719,7 +776,7 @@ rpl_binlog_state::update(const struct rpl_gtid *gtid)
}
uint32
uint64
rpl_binlog_state::seq_no_from_state()
{
ulong i, j;
......@@ -804,6 +861,16 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src)
}
rpl_gtid *
rpl_binlog_state::find(uint32 domain_id, uint32 server_id)
{
element *elem;
if (!(elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
return NULL;
return (rpl_gtid *)my_hash_search(&elem->hash, (const uchar *)&server_id, 0);
}
slave_connection_state::slave_connection_state()
{
my_hash_init(&hash, &my_charset_bin, 32,
......
......@@ -89,6 +89,7 @@ struct rpl_slave_state
bool in_transaction);
uint64 next_subid(uint32 domain_id);
int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra);
bool domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid);
int load(THD *thd, char *state_from_master, size_t len, bool reset);
bool is_empty();
......@@ -135,12 +136,13 @@ struct rpl_binlog_state
void reset();
int update(const struct rpl_gtid *gtid);
uint32 seq_no_from_state();
uint64 seq_no_from_state();
int write_to_iocache(IO_CACHE *dest);
int read_from_iocache(IO_CACHE *src);
uint32 count();
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
rpl_gtid *find(uint32 domain_id, uint32 server_id);
};
......
......@@ -1365,4 +1365,136 @@ void Relay_log_info::slave_close_thread_tables(THD *thd)
clear_tables_to_lock();
DBUG_VOID_RETURN;
}
int
rpl_load_gtid_slave_state(THD *thd)
{
TABLE_LIST tlist;
TABLE *table;
bool table_opened= false;
bool table_scanned= false;
struct local_element { uint64 sub_id; rpl_gtid gtid; };
struct local_element *entry;
HASH hash;
int err= 0;
uint32 i;
uint64 highest_seq_no= 0;
DBUG_ENTER("rpl_load_gtid_slave_state");
rpl_global_gtid_slave_state.lock();
bool loaded= rpl_global_gtid_slave_state.loaded;
rpl_global_gtid_slave_state.unlock();
if (loaded)
DBUG_RETURN(0);
my_hash_init(&hash, &my_charset_bin, 32,
offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
mysql_reset_thd_for_next_command(thd, 0);
tlist.init_one_table(STRING_WITH_LEN("mysql"),
rpl_gtid_slave_state_table_name.str,
rpl_gtid_slave_state_table_name.length,
NULL, TL_READ);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end;
table_opened= true;
table= tlist.table;
if ((err= gtid_check_rpl_slave_state_table(table)))
goto end;
bitmap_set_all(table->read_set);
if ((err= table->file->ha_rnd_init_with_error(1)))
goto end;
table_scanned= true;
for (;;)
{
uint32 domain_id, server_id;
uint64 sub_id, seq_no;
uchar *rec;
if ((err= table->file->ha_rnd_next(table->record[0])))
{
if (err == HA_ERR_RECORD_DELETED)
continue;
else if (err == HA_ERR_END_OF_FILE)
break;
else
goto end;
}
domain_id= (ulonglong)table->field[0]->val_int();
sub_id= (ulonglong)table->field[1]->val_int();
server_id= (ulonglong)table->field[2]->val_int();
seq_no= (ulonglong)table->field[3]->val_int();
DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n",
(unsigned)domain_id, (unsigned)server_id,
(ulong)seq_no, (ulong)sub_id));
if (seq_no > highest_seq_no)
highest_seq_no= seq_no;
if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0)))
{
entry= (struct local_element *)rec;
if (entry->sub_id >= sub_id)
continue;
}
else
{
if (!(entry= (struct local_element *)my_malloc(sizeof(*entry),
MYF(MY_WME))))
{
err= 1;
goto end;
}
if ((err= my_hash_insert(&hash, (uchar *)entry)))
{
my_free(entry);
goto end;
}
}
entry->sub_id= sub_id;
entry->gtid.domain_id= domain_id;
entry->gtid.server_id= server_id;
entry->gtid.seq_no= seq_no;
}
rpl_global_gtid_slave_state.lock();
for (i= 0; i < hash.records; ++i)
{
entry= (struct local_element *)my_hash_element(&hash, i);
if ((err= rpl_global_gtid_slave_state.update(entry->gtid.domain_id,
entry->gtid.server_id,
entry->sub_id,
entry->gtid.seq_no)))
{
rpl_global_gtid_slave_state.unlock();
goto end;
}
}
rpl_global_gtid_slave_state.loaded= true;
rpl_global_gtid_slave_state.unlock();
err= 0; /* Clear HA_ERR_END_OF_FILE */
end:
if (table_scanned)
{
table->file->ha_index_or_rnd_end();
ha_commit_trans(thd, FALSE);
ha_commit_trans(thd, TRUE);
}
if (table_opened)
{
close_thread_tables(thd);
thd->mdl_context.release_transactional_locks();
}
my_hash_free(&hash);
mysql_bin_log.bump_seq_no_counter_if_needed(highest_seq_no);
DBUG_RETURN(err);
}
#endif
......@@ -594,5 +594,6 @@ int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
extern struct rpl_slave_state rpl_global_gtid_slave_state;
int rpl_load_gtid_slave_state(THD *thd);
#endif /* RPL_RLI_H */
......@@ -6611,3 +6611,9 @@ ER_DUPLICATE_GTID_DOMAIN
ER_GTID_OPEN_TABLE_FAILED
eng "Failed to open %s.%s"
ger "Öffnen von %s.%s fehlgeschlagen"
ER_GTID_POSITION_NOT_FOUND_IN_BINLOG
eng "Connecting slave requested to start from GTID %u-%u-%llu, which is not in the master's binlog"
ER_CANNOT_LOAD_SLAVE_GTID_STATE
eng "Failed to load replication slave GTID state from table %s.%s"
ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG
eng "Requested MASTER_GTID_POS %u-%u-%llu conflicts with the binary log which contains a more recent GTID %u-%u-%llu. To use the requested MASTER_GTID_POS, the old binlog must be removed with RESET MASTER to avoid out-of-order binlog"
......@@ -698,6 +698,17 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
/*
Check if every GTID requested by the slave is contained in this (or a later)
binlog file. Return true if so, false if not.
We do the check with a single scan of the list of GTIDs, avoiding the need
to build an in-memory hash or stuff like that.
We need to check that slave did not request GTID D-S-N1, when the
Gtid_list_log_event for this binlog file has D-S-N2 with N2 >= N1.
In addition, we need to check that we do not have a GTID D-S-N3 in the
Gtid_list_log_event where D is not present in the requested slave state at
all. Since if D is not in requested slave state, it means that slave needs
to start at the very first GTID in domain D.
*/
static bool
contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
......@@ -707,21 +718,189 @@ contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
for (i= 0; i < glev->count; ++i)
{
const rpl_gtid *gtid= st->find(glev->list[i].domain_id);
if (gtid != NULL &&
gtid->server_id == glev->list[i].server_id &&
if (!gtid)
{
/*
The slave needs to start from the very beginning of this domain, which
is in an earlier binlog file. So we need to search back further.
*/
return false;
}
if (gtid->server_id == glev->list[i].server_id &&
gtid->seq_no <= glev->list[i].seq_no)
{
/*
The slave needs to receive gtid, but it is contained in an earlier
binlog file. So we need to serch back further.
binlog file. So we need to search back further.
*/
return false;
}
}
return true;
}
/*
Check the start GTID state requested by the slave against our binlog state.
Give an error if the slave requests something that we do not have in our
binlog.
T
*/
static int
check_slave_start_position(THD *thd, slave_connection_state *st,
const char **errormsg, rpl_gtid *error_gtid)
{
uint32 i;
bool found;
int err;
rpl_gtid **delete_list= NULL;
uint32 delete_idx= 0;
bool slave_state_loaded= false;
uint32 missing_domains= 0;
rpl_gtid missing_domain_gtid;
for (i= 0; i < st->hash.records; ++i)
{
rpl_gtid *slave_gtid= (rpl_gtid *)my_hash_element(&st->hash, i);
rpl_gtid master_gtid;
rpl_gtid master_replication_gtid;
rpl_gtid start_gtid;
if ((found= mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
slave_gtid->server_id,
&master_gtid)) &&
master_gtid.seq_no >= slave_gtid->seq_no)
continue;
if (!slave_state_loaded)
{
if (rpl_load_gtid_slave_state(thd))
{
*errormsg= "Failed to load replication slave GTID state";
err= ER_CANNOT_LOAD_SLAVE_GTID_STATE;
goto end;
}
slave_state_loaded= true;
}
if (!rpl_global_gtid_slave_state.domain_to_gtid(slave_gtid->domain_id,
&master_replication_gtid) ||
slave_gtid->server_id != master_replication_gtid.server_id ||
slave_gtid->seq_no != master_replication_gtid.seq_no)
{
rpl_gtid domain_gtid;
if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
&domain_gtid))
{
/*
We do not have anything in this domain, neither in the binlog nor
in the slave state. So we are probably one master in a multi-master
setup, and this domain is served by a different master.
This is not an error, however if we are missing _all_ domains
requested by the slave, then we still give error (below, after
the loop).
*/
if (!(missing_domains++))
missing_domain_gtid= domain_gtid;
continue;
}
*errormsg= "Requested slave GTID state not found in binlog";
*error_gtid= *slave_gtid;
err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
goto end;
}
/*
Ok, so connecting slave asked to start at a GTID that we do not have in
our binlog, but it was in fact the last GTID we applied earlier, when we
were acting as a replication slave.
So this means that we were running as a replication slave without
--log-slave-updates, but now we switched to be a master. It is worth it
to handle this special case, as it allows users to run a simple
master -> slave without --log-slave-updates, and then exchange slave and
master, as long as they make sure the slave is caught up before switching.
*/
/*
First check if we logged something ourselves as a master after being a
slave. This will be seen as a GTID with our own server_id and bigger
seq_no than what is in the slave state.
If we did not log anything ourselves, then start the connecting slave
replicating from the current binlog end position, which in this case
corresponds to our replication slave state and hence what the connecting
slave is requesting.
*/
if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
global_system_variables.server_id,
&start_gtid) &&
start_gtid.seq_no > slave_gtid->seq_no)
{
/*
Start replication within this domain at the first GTID that we logged
ourselves after becoming a master.
*/
slave_gtid->server_id= global_system_variables.server_id;
}
else if (mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
&start_gtid))
{
slave_gtid->server_id= start_gtid.server_id;
slave_gtid->seq_no= start_gtid.seq_no;
}
else
{
/*
We do not have _anything_ in our own binlog for this domain. Just
delete the entry in the slave connection state, then it will pick up
anything new that arrives.
We just queue up the deletion and do it later, after the loop, so that
we do not mess up the iteration over the hash.
*/
if (!delete_list)
{
if ((delete_list= (rpl_gtid **)my_malloc(sizeof(*delete_list),
MYF(MY_WME))))
{
*errormsg= "Out of memory while checking slave start position";
err= ER_OUT_OF_RESOURCES;
goto end;
}
}
delete_list[delete_idx++]= slave_gtid;
}
}
if (missing_domains == st->hash.records && missing_domains > 0)
{
*errormsg= "Requested slave GTID state not found in binlog";
*error_gtid= missing_domain_gtid;
err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
goto end;
}
/* Do any delayed deletes from the hash. */
if (delete_list)
{
for (i= 0; i < delete_idx; ++i)
st->remove(delete_list[i]);
}
err= 0;
end:
if (delete_list)
my_free(delete_list);
return err;
}
/*
Find the name of the binlog file to start reading for a slave that connects
using GTID state.
......@@ -1217,6 +1396,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
bool using_gtid_state;
slave_connection_state gtid_state, return_gtid_state;
rpl_gtid error_gtid;
enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
......@@ -1291,6 +1471,12 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
if ((error= check_slave_start_position(thd, &gtid_state, &errmsg,
&error_gtid)))
{
my_errno= error;
goto err;
}
if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
......@@ -1812,6 +1998,22 @@ impossible position";
my_basename(p_coord->file_name), p_coord->pos,
my_basename(log_file_name), my_b_tell(&log));
}
else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG)
{
my_snprintf(error_text, sizeof(error_text),
"Error: connecting slave requested to start from GTID "
"%u-%u-%llu, which is not in the master's binlog",
error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
/* Use this error code so slave will know not to try reconnect. */
my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
}
else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE)
{
my_snprintf(error_text, sizeof(error_text),
"Failed to load replication slave GTID state from table %s.%s",
"mysql", rpl_gtid_slave_state_table_name.str);
my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
}
else
strcpy(error_text, errmsg);
end_io_cache(&log);
......@@ -2296,6 +2498,53 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
ret= TRUE;
goto err;
}
/*
Check our own binlog for any of our own transactions that are newer
than the GTID state the user is requesting. Any such transactions would
result in an out-of-order binlog, which could break anyone replicating
with us as master.
So give an error if this is found, requesting the user to do a
RESET MASTER (to clean up the binlog) if they really want this.
*/
if (mysql_bin_log.is_open())
{
rpl_gtid *binlog_gtid_list= NULL;
uint32 num_binlog_gtids= 0;
uint32 i;
if (mysql_bin_log.get_most_recent_gtid_list(&binlog_gtid_list,
&num_binlog_gtids))
{
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
ret= TRUE;
goto err;
}
for (i= 0; i < num_binlog_gtids; ++i)
{
rpl_gtid *binlog_gtid= &binlog_gtid_list[i];
rpl_gtid *slave_gtid;
if (binlog_gtid->server_id != global_system_variables.server_id)
continue;
if (!(slave_gtid= tmp_slave_state.find(binlog_gtid->domain_id)))
continue;
if (slave_gtid->seq_no < binlog_gtid->seq_no)
{
my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0),
slave_gtid->domain_id, slave_gtid->server_id,
slave_gtid->seq_no, binlog_gtid->domain_id,
binlog_gtid->server_id, binlog_gtid->seq_no);
break;
}
}
my_free(binlog_gtid_list);
if (i != num_binlog_gtids)
{
ret= TRUE;
goto err;
}
}
}
thd_proc_info(thd, "Changing master");
......@@ -3042,127 +3291,4 @@ rpl_deinit_gtid_slave_state()
rpl_global_gtid_slave_state.deinit();
}
int
rpl_load_gtid_slave_state(THD *thd)
{
TABLE_LIST tlist;
TABLE *table;
bool table_opened= false;
bool table_scanned= false;
struct local_element { uint64 sub_id; rpl_gtid gtid; };
struct local_element *entry;
HASH hash;
int err= 0;
uint32 i;
DBUG_ENTER("rpl_load_gtid_slave_state");
my_hash_init(&hash, &my_charset_bin, 32,
offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id),
sizeof(uint32), NULL, my_free, HASH_UNIQUE);
rpl_global_gtid_slave_state.lock();
bool loaded= rpl_global_gtid_slave_state.loaded;
rpl_global_gtid_slave_state.unlock();
if (loaded)
goto end;
mysql_reset_thd_for_next_command(thd, 0);
tlist.init_one_table(STRING_WITH_LEN("mysql"),
rpl_gtid_slave_state_table_name.str,
rpl_gtid_slave_state_table_name.length,
NULL, TL_READ);
if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
goto end;
table_opened= true;
table= tlist.table;
if ((err= gtid_check_rpl_slave_state_table(table)))
goto end;
bitmap_set_all(table->read_set);
if ((err= table->file->ha_rnd_init_with_error(1)))
goto end;
table_scanned= true;
for (;;)
{
uint32 domain_id, server_id;
uint64 sub_id, seq_no;
uchar *rec;
if ((err= table->file->ha_rnd_next(table->record[0])))
{
if (err == HA_ERR_RECORD_DELETED)
continue;
else if (err == HA_ERR_END_OF_FILE)
break;
else
goto end;
}
domain_id= (ulonglong)table->field[0]->val_int();
sub_id= (ulonglong)table->field[1]->val_int();
server_id= (ulonglong)table->field[2]->val_int();
seq_no= (ulonglong)table->field[3]->val_int();
DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n",
(unsigned)domain_id, (unsigned)server_id,
(ulong)seq_no, (ulong)sub_id));
if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0)))
{
entry= (struct local_element *)rec;
if (entry->sub_id >= sub_id)
continue;
}
else
{
if (!(entry= (struct local_element *)my_malloc(sizeof(*entry),
MYF(MY_WME))))
{
err= 1;
goto end;
}
if ((err= my_hash_insert(&hash, (uchar *)entry)))
{
my_free(entry);
goto end;
}
}
entry->sub_id= sub_id;
entry->gtid.domain_id= domain_id;
entry->gtid.server_id= server_id;
entry->gtid.seq_no= seq_no;
}
rpl_global_gtid_slave_state.lock();
for (i= 0; i < hash.records; ++i)
{
entry= (struct local_element *)my_hash_element(&hash, i);
if ((err= rpl_global_gtid_slave_state.update(entry->gtid.domain_id,
entry->gtid.server_id,
entry->sub_id,
entry->gtid.seq_no)))
{
rpl_global_gtid_slave_state.unlock();
goto end;
}
}
rpl_global_gtid_slave_state.loaded= true;
rpl_global_gtid_slave_state.unlock();
err= 0; /* Clear HA_ERR_END_OF_FILE */
end:
if (table_scanned)
{
table->file->ha_index_or_rnd_end();
ha_commit_trans(thd, FALSE);
ha_commit_trans(thd, TRUE);
}
if (table_opened)
close_thread_tables(thd);
my_hash_free(&hash);
DBUG_RETURN(err);
}
#endif /* HAVE_REPLICATION */
......@@ -68,7 +68,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
void rpl_init_gtid_slave_state();
void rpl_deinit_gtid_slave_state();
int rpl_load_gtid_slave_state(THD *thd);
int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
#endif /* HAVE_REPLICATION */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment