Commit bdbce30d authored by unknown's avatar unknown

MDEV-26: Global transaction id: Intermediate commit.

Now slave can connect to master, sending start position as slave state
rather than old-style binlog name/position.

This enables to switch to a new master by changing just connection
information, replication slave GTID state ensures that slave starts
at the correct point in the new master.
parent 3e798b03
......@@ -20,6 +20,12 @@
--
set sql_mode='';
-- We want this to be created with the default storage engine.
-- This way, if InnoDB is used we get crash safety, and if MyISAM is used
-- we avoid mixed-engine transactions.
CREATE TABLE IF NOT EXISTS rpl_slave_state (domain_id INT UNSIGNED NOT NULL, sub_id BIGINT UNSIGNED NOT NULL, server_id INT UNSIGNED NOT NULL, seq_no BIGINT UNSIGNED NOT NULL, PRIMARY KEY (domain_id, sub_id)) comment='Replication slave GTID state';
set storage_engine=myisam;
flush tables;
......
......@@ -3270,9 +3270,12 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
there had been an entry (domain_id, server_id, 0).
*/
if (rpl_global_gtid_binlog_state.count())
{
Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state);
if (gl_ev.write(&log_file))
goto err;
}
/* Output a binlog checkpoint event at the start of the binlog file. */
......
......@@ -6279,6 +6279,67 @@ rpl_slave_state::next_subid(uint32 domain_id)
#endif
/*
Prepare the current slave state as a string, suitable for sending to the
master to request to receive binlog events starting from that GTID state.
The state consists of the most recently applied GTID for each domain_id,
ie. the one with the highest sub_id within each domain_id.
*/
int
rpl_slave_state::tostring(String *dest)
{
bool first= true;
uint32 i;
int err= 1;
lock();
for (i= 0; i < hash.records; ++i)
{
uint64 best_sub_id;
rpl_gtid best_gtid;
element *e= (element *)my_hash_element(&hash, i);
list_element *l= e->list;
DBUG_ASSERT(l /* We should never have empty list in element. */);
if (!l)
goto err;
best_gtid.domain_id= e->domain_id;
best_gtid.server_id= l->server_id;
best_gtid.seq_no= l->seq_no;
best_sub_id= l->sub_id;
while ((l= l->next))
{
if (l->sub_id > best_sub_id)
{
best_sub_id= l->sub_id;
best_gtid.server_id= l->server_id;
best_gtid.seq_no= l->seq_no;
}
}
if (first)
first= false;
else
dest->append("-",1);
dest->append_ulonglong(best_gtid.domain_id);
dest->append("-",1);
dest->append_ulonglong(best_gtid.server_id);
dest->append("-",1);
dest->append_ulonglong(best_gtid.seq_no);
}
err= 0;
err:
unlock();
return err;
}
rpl_binlog_state::rpl_binlog_state()
{
my_hash_init(&hash, &my_charset_bin, 32,
......@@ -6410,6 +6471,119 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src)
}
return 0;
}
slave_connection_state::slave_connection_state()
{
my_hash_init(&hash, &my_charset_bin, 32,
offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, my_free,
HASH_UNIQUE);
}
slave_connection_state::~slave_connection_state()
{
my_hash_free(&hash);
}
/*
Create a hash from the slave GTID state that is sent to master when slave
connects to start replication.
The state is sent as <GTID>,<GTID>,...,<GTID>, for example:
0-2-112,1-4-1022
The state gives for each domain_id the GTID to start replication from for
the corresponding replication stream. So domain_id must be unique.
Returns 0 if ok, non-zero if error due to malformed input.
Note that input string is built by slave server, so it will not be incorrect
unless bug/corruption/malicious server. So we just need basic sanity check,
not fancy user-friendly error message.
*/
int
slave_connection_state::load(char *slave_request, size_t len)
{
char *p, *q, *end;
uint64 v;
uint32 domain_id, server_id;
uint64 seq_no;
uchar *rec;
rpl_gtid *gtid;
int err= 0;
my_hash_reset(&hash);
p= slave_request;
end= slave_request + len;
for (;;)
{
q= end;
v= (uint64)my_strtoll10(p, &q, &err);
if (err != 0 || v > (uint32)0xffffffff || *q != '-')
return 1;
domain_id= (uint32)v;
p= q+1;
q= end;
v= (uint64)my_strtoll10(p, &q, &err);
if (err != 0 || v > (uint32)0xffffffff || *q != '-')
return 1;
server_id= (uint32)v;
p= q+1;
q= end;
seq_no= (uint64)my_strtoll10(p, &q, &err);
if (err != 0)
return 1;
if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME))))
return 1;
gtid= (rpl_gtid *)rec;
gtid->domain_id= domain_id;
gtid->server_id= server_id;
gtid->seq_no= seq_no;
if (my_hash_insert(&hash, rec))
{
my_free(rec);
return 1;
}
if (q == end)
break; /* Finished. */
if (*q != ',')
return 1;
p= q+1;
}
return 0;
}
rpl_gtid *
slave_connection_state::find(uint32 domain_id)
{
return (rpl_gtid *) my_hash_search(&hash, (const uchar *)(&domain_id), 0);
}
void
slave_connection_state::remove(const rpl_gtid *in_gtid)
{
bool err;
uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
#ifndef DBUG_OFF
rpl_gtid *slave_gtid= (rpl_gtid *)rec;
DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */);
DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
#endif
err= my_hash_delete(&hash, rec);
DBUG_ASSERT(!err);
}
#endif /* MYSQL_SERVER */
......@@ -6443,6 +6617,28 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
{
}
/*
Used to record GTID while sending binlog to slave, without having to
fully contruct every Gtid_log_event() needlessly.
*/
bool
Gtid_log_event::peek(const char *event_start, size_t event_len,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
uchar *flags2)
{
const char *p;
if (event_len < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
return true;
*server_id= uint4korr(event_start + SERVER_ID_OFFSET);
p= event_start + LOG_EVENT_HEADER_LEN;
*seq_no= uint8korr(p);
p+= 8;
*domain_id= uint4korr(p);
return false;
}
bool
Gtid_log_event::write(IO_CACHE *file)
{
......
......@@ -3008,6 +3008,7 @@ struct rpl_slave_state
int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
bool in_transaction);
uint64 next_subid(uint32 domain_id);
int tostring(String *dest);
void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
......@@ -3044,6 +3045,26 @@ struct rpl_binlog_state
int read_from_iocache(IO_CACHE *src);
};
/*
Represent the GTID state that a slave connection to a master requests
the master to start sending binlog events from.
*/
struct slave_connection_state
{
/* Mapping from domain_id to the GTID requested for that domain. */
HASH hash;
slave_connection_state();
~slave_connection_state();
int load(char *slave_request, size_t len);
rpl_gtid *find(uint32 domain_id);
void remove(const rpl_gtid *gtid);
ulong count() const { return hash.records; }
};
/**
@class Gtid_log_event
......@@ -3134,6 +3155,9 @@ public:
bool write(IO_CACHE *file);
static int make_compatible_event(String *packet, bool *need_dummy_event,
ulong ev_offset, uint8 checksum_alg);
static bool peek(const char *event_start, size_t event_len,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
uchar *flags2);
#endif
};
......
......@@ -1784,6 +1784,42 @@ past_checksum:
after_set_capability:
#endif
/* Request dump start from slave replication GTID state. */
/* ToDo: This needs to be configurable somehow in a useful way ... */
if (rpl_global_gtid_slave_state.count())
{
int rc;
char str_buf[256];
String connect_state(str_buf, sizeof(str_buf), system_charset_info);
connect_state.length(0);
connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"),
system_charset_info);
rpl_global_gtid_slave_state.tostring(&connect_state);
connect_state.append(STRING_WITH_LEN("'"), system_charset_info);
rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length());
if (rc)
{
err_code= mysql_errno(mysql);
if (is_network_error(err_code))
{
mi->report(ERROR_LEVEL, err_code,
"Setting @slave_connect_state failed with error: %s",
mysql_error(mysql));
goto network_err;
}
else
{
/* Fatal error */
errmsg= "The slave I/O thread stops because a fatal error is "
"encountered when it tries to set @slave_connect_state.";
sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql));
goto err;
}
}
}
err:
if (errmsg)
{
......
......@@ -506,6 +506,26 @@ get_mariadb_slave_capability(THD *thd)
}
/*
Get the value of the @slave_connect_state user variable into the supplied
String (this is the GTID connect state requested by the connecting slave).
Returns false if error (ie. slave did not set the variable and does not
want to use GTID to set start position), true if success.
*/
static bool
get_slave_connect_state(THD *thd, String *out_str)
{
bool null_value;
const LEX_STRING name= { C_STRING_WITH_LEN("slave_connect_state") };
user_var_entry *entry=
(user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
name.length);
return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
}
/*
Function prepares and sends repliation heartbeat event.
......@@ -569,6 +589,216 @@ static int send_heartbeat_event(NET* net, String* packet,
}
struct binlog_file_entry
{
binlog_file_entry *next;
char *name;
};
static binlog_file_entry *
get_binlog_list(MEM_ROOT *memroot)
{
IO_CACHE *index_file;
char fname[FN_REFLEN];
size_t length;
binlog_file_entry *current_list= NULL, *e;
DBUG_ENTER("get_binlog_list");
if (!mysql_bin_log.is_open())
{
my_error(ER_NO_BINARY_LOGGING, MYF(0));
DBUG_RETURN(NULL);
}
mysql_bin_log.lock_index();
index_file=mysql_bin_log.get_index_file();
reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
/* The file ends with EOF or empty line */
while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
{
--length; /* Remove the newline */
if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) ||
!(e->name= strmake_root(memroot, fname, length)))
{
mysql_bin_log.unlock_index();
my_error(ER_OUTOFMEMORY, MYF(0), length + 1 + sizeof(*e));
DBUG_RETURN(NULL);
}
e->next= current_list;
current_list= e;
}
mysql_bin_log.unlock_index();
DBUG_RETURN(current_list);
}
/*
Find the Gtid_list_log_event at the start of a binlog.
NULL for ok, non-NULL error message for error.
If ok, then the event is returned in *out_gtid_list. This can be NULL if we
get back to binlogs written by old server version without GTID support. If
so, it means we have reached the point to start from, as no GTID events can
exist in earlier binlogs.
*/
static const char *
get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
{
Format_description_log_event init_fdle(BINLOG_VERSION);
Format_description_log_event *fdle;
Log_event *ev;
const char *errormsg = NULL;
*out_gtid_list= NULL;
if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
opt_master_verify_checksum)) ||
ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
return "Could not read format description log event while looking for "
"GTID position in binlog";
fdle= static_cast<Format_description_log_event *>(ev);
for (;;)
{
Log_event_type typ;
ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum);
if (!ev)
{
errormsg= "Could not read GTID list event while looking for GTID "
"position in binlog";
break;
}
typ= ev->get_type_code();
if (typ == GTID_LIST_EVENT)
break; /* Done, found it */
if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
typ == FORMAT_DESCRIPTION_EVENT)
continue; /* Continue looking */
/* We did not find any Gtid_list_log_event, must be old binlog. */
ev= NULL;
break;
}
delete fdle;
*out_gtid_list= static_cast<Gtid_list_log_event *>(ev);
return errormsg;
}
/*
Check if every GTID requested by the slave is contained in this (or a later)
binlog file. Return true if so, false if not.
*/
static bool
contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
{
uint32 i;
for (i= 0; i < glev->count; ++i)
{
const rpl_gtid *gtid= st->find(glev->list[i].domain_id);
if (gtid != NULL &&
gtid->server_id == glev->list[i].server_id &&
gtid->seq_no <= glev->list[i].seq_no)
{
/*
The slave needs to receive gtid, but it is contained in an earlier
binlog file. So we need to serch back further.
*/
return false;
}
}
return true;
}
/*
Find the name of the binlog file to start reading for a slave that connects
using GTID state.
Returns the file name in out_name, which must be of size at least FN_REFLEN.
Returns NULL on ok, error message on error.
*/
static const char *
gtid_find_binlog_file(slave_connection_state *state, char *out_name)
{
MEM_ROOT memroot;
binlog_file_entry *list;
Gtid_list_log_event *glev;
const char *errormsg= NULL;
IO_CACHE cache;
File file = (File)-1;
char buf[FN_REFLEN];
bzero((char*) &cache, sizeof(cache));
init_alloc_root(&memroot, 10*(FN_REFLEN+sizeof(binlog_file_entry)), 0);
if (!(list= get_binlog_list(&memroot)))
{
errormsg= "Out of memory while looking for GTID position in binlog";
goto end;
}
while (list)
{
if (!list->next)
{
/*
It should be safe to read the currently used binlog, as we will only
read the header part that is already written.
But if that does not work on windows, then we will need to cache the
event somewhere in memory I suppose - that could work too.
*/
}
/*
Read the Gtid_list_log_event at the start of the binlog file to
get the binlog state.
*/
if (normalize_binlog_name(buf, list->name, false))
{
errormsg= "Failed to determine binlog file name while looking for "
"GTID position in binlog";
goto end;
}
if ((file= open_binlog(&cache, buf, &errormsg)) == (File)-1 ||
(errormsg= get_gtid_list_event(&cache, &glev)))
goto end;
if (!glev || contains_all_slave_gtid(state, glev))
{
strmake(out_name, buf, FN_REFLEN);
goto end;
}
list= list->next;
}
/* We reached the end without finding anything. */
errormsg= "Could not find GTID state requested by slave in any binlog "
"files. Probably the slave state is too old and required binlog files "
"have been purged.";
end:
if (file != (File)-1)
{
end_io_cache(&cache);
mysql_file_close(file, MYF(MY_WME));
}
free_root(&memroot, MYF(0));
return errormsg;
}
enum enum_gtid_skip_type {
GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION
};
/*
Helper function for mysql_binlog_send() to write an event down the slave
connection.
......@@ -579,10 +809,65 @@ static const char *
send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Log_event_type event_type, char *log_file_name,
IO_CACHE *log, int mariadb_slave_capability,
ulong ev_offset, uint8 current_checksum_alg)
ulong ev_offset, uint8 current_checksum_alg,
bool using_gtid_state, slave_connection_state *gtid_state,
enum_gtid_skip_type *gtid_skip_group)
{
my_off_t pos;
/* Skip GTID event groups until we reach slave position within a domain_id. */
if (event_type == GTID_EVENT && using_gtid_state && gtid_state->count() > 0)
{
uint32 server_id, domain_id;
uint64 seq_no;
uchar flags2;
rpl_gtid *gtid;
size_t len= packet->length();
if (ev_offset > len ||
Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
&domain_id, &server_id, &seq_no, &flags2))
return "Failed to read Gtid_log_event: corrupt binlog";
gtid= gtid_state->find(domain_id);
if (gtid != NULL)
{
/* Skip this event group if we have not yet reached slave start pos. */
if (server_id != gtid->server_id || seq_no <= gtid->seq_no)
*gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
/*
Delete this entry if we have reached slave start position (so we will
not skip subsequent events and won't have to look them up and check).
*/
if (server_id == gtid->server_id && seq_no >= gtid->seq_no)
gtid_state->remove(gtid);
}
}
/*
Skip event group if we have not yet reached the correct slave GTID position.
Note that slave that understands GTID can also tolerate holes, so there is
no need to supply dummy event.
*/
switch (*gtid_skip_group)
{
case GTID_SKIP_STANDALONE:
if (event_type != INTVAR_EVENT &&
event_type != RAND_EVENT &&
event_type != USER_VAR_EVENT &&
event_type != TABLE_MAP_EVENT &&
event_type != ANNOTATE_ROWS_EVENT)
*gtid_skip_group= GTID_SKIP_NOT;
return NULL;
case GTID_SKIP_TRANSACTION:
if (event_type == XID_EVENT /* ToDo || is_COMMIT_query_event() */)
*gtid_skip_group= GTID_SKIP_NOT;
return NULL;
case GTID_SKIP_NOT:
break;
}
/* Do not send annotate_rows events unless slave requested it. */
if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
{
......@@ -722,6 +1007,11 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
mysql_mutex_t *log_lock;
mysql_cond_t *log_cond;
int mariadb_slave_capability;
char str_buf[256];
String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
bool using_gtid_state;
slave_connection_state gtid_state;
enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
......@@ -748,6 +1038,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
set_timespec_nsec(*heartbeat_ts, 0);
}
mariadb_slave_capability= get_mariadb_slave_capability(thd);
connect_gtid_state.length(0);
using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
if (global_system_variables.log_warnings > 1)
sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
(int)thd->variables.server_id, log_ident, (ulong)pos);
......@@ -781,10 +1075,30 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
name=search_file_name;
if (using_gtid_state)
{
if (gtid_state.load(connect_gtid_state.c_ptr_quick(),
connect_gtid_state.length()))
{
errmsg= "Out of memory or malformed slave request when obtaining start "
"position from GTID state";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name)))
{
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
pos= 4;
}
else
{
if (log_ident[0])
mysql_bin_log.make_log_name(search_file_name, log_ident);
else
name=0; // Find first log
}
linfo.index_file_offset = 0;
......@@ -1036,7 +1350,8 @@ impossible position";
if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
log_file_name, &log,
mariadb_slave_capability, ev_offset,
current_checksum_alg)))
current_checksum_alg, using_gtid_state,
&gtid_state, &gtid_skip_group)))
{
errmsg= tmp_msg;
my_errno= ER_UNKNOWN_ERROR;
......@@ -1197,7 +1512,9 @@ impossible position";
(tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
log_file_name, &log,
mariadb_slave_capability, ev_offset,
current_checksum_alg)))
current_checksum_alg,
using_gtid_state, &gtid_state,
&gtid_skip_group)))
{
errmsg= tmp_msg;
my_errno= ER_UNKNOWN_ERROR;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment