Commit 041f26d3 authored by unknown's avatar unknown

MDEV-26. Intermediate commit.

Implement binlog_gtid_pos() function. This will be used so that
the slave can obtain the gtid position automatically from first
connect with old-style position - then MASTER_GTID_POS=AUTO will
work the next time. Can also be used by mysqldump --master-data
to give the current gtid position directly.
parent 132c2660
......@@ -447,6 +447,19 @@ protected:
};
class Create_func_binlog_gtid_pos : public Create_func_arg2
{
public:
virtual Item *create_2_arg(THD *thd, Item *arg1, Item *arg2);
static Create_func_binlog_gtid_pos s_singleton;
protected:
Create_func_binlog_gtid_pos() {}
virtual ~Create_func_binlog_gtid_pos() {}
};
class Create_func_bit_count : public Create_func_arg1
{
public:
......@@ -3100,6 +3113,16 @@ Create_func_bin::create_1_arg(THD *thd, Item *arg1)
}
Create_func_binlog_gtid_pos Create_func_binlog_gtid_pos::s_singleton;
Item*
Create_func_binlog_gtid_pos::create_2_arg(THD *thd, Item *arg1, Item *arg2)
{
thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
return new (thd->mem_root) Item_func_binlog_gtid_pos(arg1, arg2);
}
Create_func_bit_count Create_func_bit_count::s_singleton;
Item*
......@@ -5323,6 +5346,7 @@ static Native_func_registry func_array[] =
{ { C_STRING_WITH_LEN("ATAN2") }, BUILDER(Create_func_atan)},
{ { C_STRING_WITH_LEN("BENCHMARK") }, BUILDER(Create_func_benchmark)},
{ { C_STRING_WITH_LEN("BIN") }, BUILDER(Create_func_bin)},
{ { C_STRING_WITH_LEN("BINLOG_GTID_POS") }, BUILDER(Create_func_binlog_gtid_pos)},
{ { C_STRING_WITH_LEN("BIT_COUNT") }, BUILDER(Create_func_bit_count)},
{ { C_STRING_WITH_LEN("BIT_LENGTH") }, BUILDER(Create_func_bit_length)},
{ { C_STRING_WITH_LEN("BUFFER") }, GEOM_BUILDER(Create_func_buffer)},
......
......@@ -59,6 +59,7 @@ C_MODE_START
#include "../mysys/my_static.h" // For soundex_map
C_MODE_END
#include "sql_show.h" // append_identifier
#include <sql_repl.h>
/**
@todo Remove this. It is not safe to use a shared String object.
......@@ -2725,6 +2726,40 @@ err:
}
void Item_func_binlog_gtid_pos::fix_length_and_dec()
{
collation.set(system_charset_info);
max_length= MAX_BLOB_WIDTH;
maybe_null= 1;
}
String *Item_func_binlog_gtid_pos::val_str(String *str)
{
DBUG_ASSERT(fixed == 1);
String name_str, *name;
longlong pos;
if (args[0]->null_value || args[1]->null_value)
goto err;
name= args[0]->val_str(&name_str);
pos= args[1]->val_int();
if (pos < 0 || pos > UINT_MAX32)
goto err;
if (gtid_state_from_binlog_pos(name->c_ptr_safe(), (uint32)pos, str))
goto err;
null_value= 0;
return str;
err:
null_value= 1;
return NULL;
}
void Item_func_rpad::fix_length_and_dec()
{
// Handle character set for args[0] and args[2].
......
......@@ -616,6 +616,17 @@ public:
};
class Item_func_binlog_gtid_pos :public Item_str_func
{
String tmp_value;
public:
Item_func_binlog_gtid_pos(Item *arg1,Item *arg2) :Item_str_func(arg1,arg2) {}
String *val_str(String *);
void fix_length_and_dec();
const char *func_name() const { return "binlog_gtid_pos"; }
};
class Item_func_rpad :public Item_str_func
{
String tmp_value, rpad_str;
......
......@@ -6701,6 +6701,19 @@ slave_connection_state::load(char *slave_request, size_t len)
}
int
slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count)
{
uint32 i;
my_hash_reset(&hash);
for (i= 0; i < count; ++i)
if (update(&gtid_list[i]))
return 1;
return 0;
}
rpl_gtid *
slave_connection_state::find(uint32 domain_id)
{
......@@ -6708,6 +6721,30 @@ slave_connection_state::find(uint32 domain_id)
}
int
slave_connection_state::update(const rpl_gtid *in_gtid)
{
rpl_gtid *new_gtid;
uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
if (rec)
{
memcpy(rec, in_gtid, sizeof(*in_gtid));
return 0;
}
if (!(new_gtid= (rpl_gtid *)my_malloc(sizeof(*new_gtid), MYF(MY_WME))))
return 1;
memcpy(new_gtid, in_gtid, sizeof(*new_gtid));
if (my_hash_insert(&hash, (uchar *)new_gtid))
{
my_free(new_gtid);
return 1;
}
return 0;
}
void
slave_connection_state::remove(const rpl_gtid *in_gtid)
{
......@@ -6725,6 +6762,30 @@ slave_connection_state::remove(const rpl_gtid *in_gtid)
}
int
slave_connection_state::to_string(String *out_str)
{
uint32 i;
out_str->length(0);
for (i= 0; i < hash.records; ++i)
{
const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i);
if (i && out_str->append(","))
return 1;
if (gtid->domain_id &&
(out_str->append_ulonglong(gtid->domain_id) ||
out_str->append("-")))
return 1;
if (out_str->append_ulonglong(gtid->server_id) ||
out_str->append("-") ||
out_str->append_ulonglong(gtid->seq_no))
return 1;
}
return 0;
}
#endif /* MYSQL_SERVER */
......@@ -7155,6 +7216,46 @@ Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
#endif /* MYSQL_SERVER */
/*
Used to record gtid_list event while sending binlog to slave, without having to
fully contruct the event object.
*/
bool
Gtid_list_log_event::peek(const char *event_start, uint32 event_len,
rpl_gtid **out_gtid_list, uint32 *out_list_len)
{
const char *p;
uint32 count_field, count;
rpl_gtid *gtid_list;
if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN)
return true;
p= event_start + LOG_EVENT_HEADER_LEN;
count_field= uint4korr(p);
p+= 4;
count= count_field & ((1<<28)-1);
if (event_len < LOG_EVENT_HEADER_LEN + GTID_LIST_HEADER_LEN +
16 * count)
return true;
if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(rpl_gtid)*count, MYF(MY_WME))))
return true;
*out_gtid_list= gtid_list;
*out_list_len= count;
while (count--)
{
gtid_list->domain_id= uint4korr(p);
p+= 4;
gtid_list->server_id= uint4korr(p);
p+= 4;
gtid_list->seq_no= uint8korr(p);
p+= 8;
++gtid_list;
}
return false;
}
/**************************************************************************
Intvar_log_event methods
**************************************************************************/
......
......@@ -3073,9 +3073,12 @@ struct slave_connection_state
~slave_connection_state();
int load(char *slave_request, size_t len);
int load(const rpl_gtid *gtid_list, uint32 count);
rpl_gtid *find(uint32 domain_id);
int update(const rpl_gtid *in_gtid);
void remove(const rpl_gtid *gtid);
ulong count() const { return hash.records; }
int to_string(String *out_str);
};
......@@ -3261,6 +3264,8 @@ public:
#ifdef MYSQL_SERVER
bool write(IO_CACHE *file);
#endif
static bool peek(const char *event_start, uint32 event_len,
rpl_gtid **out_gtid_list, uint32 *out_list_len);
};
......
......@@ -656,8 +656,12 @@ get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
opt_master_verify_checksum)) ||
ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
{
if (ev)
delete ev;
return "Could not read format description log event while looking for "
"GTID position in binlog";
}
fdle= static_cast<Format_description_log_event *>(ev);
......@@ -795,6 +799,177 @@ end:
}
/*
Given an old-style binlog position with file name and file offset, find the
corresponding gtid position. If the offset is not at an event boundary, give
an error.
Return NULL on ok, error message string on error.
ToDo: Improve the performance of this by using binlog index files.
*/
static const char *
gtid_state_from_pos(const char *name, uint32 offset,
slave_connection_state *gtid_state)
{
IO_CACHE cache;
File file;
const char *errormsg= NULL;
bool found_gtid_list_event= false;
bool found_format_description_event= false;
bool valid_pos= false;
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int err;
String packet;
if (gtid_state->load((const rpl_gtid *)NULL, 0))
{
errormsg= "Internal error (out of memory?) initializing slave state "
"while scanning binlog to find start position";
return errormsg;
}
if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1)
return errormsg;
/*
First we need to find the initial GTID_LIST_EVENT. We need this even
if the offset is at the very start of the binlog file.
But if we do not find any GTID_LIST_EVENT, then this is an old binlog
with no GTID information, so we return empty GTID state.
*/
for (;;)
{
Log_event_type typ;
uint32 cur_pos;
cur_pos= (uint32)my_b_tell(&cache);
if (cur_pos == offset)
valid_pos= true;
if (found_format_description_event && found_gtid_list_event &&
cur_pos >= offset)
break;
packet.length(0);
err= Log_event::read_log_event(&cache, &packet, NULL,
current_checksum_alg);
if (err)
{
errormsg= "Could not read binlog while searching for slave start "
"position on master";
goto end;
}
/*
The cast to uchar is needed to avoid a signed char being converted to a
negative number.
*/
typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET];
if (typ == FORMAT_DESCRIPTION_EVENT)
{
if (found_format_description_event)
{
errormsg= "Duplicate format description log event found while "
"searching for old-style position in binlog";
goto end;
}
current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length());
found_format_description_event= true;
}
else if (typ != FORMAT_DESCRIPTION_EVENT && !found_format_description_event)
{
errormsg= "Did not find format description log event while searching "
"for old-style position in binlog";
goto end;
}
else if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
typ == BINLOG_CHECKPOINT_EVENT)
continue; /* Continue looking */
else if (typ == GTID_LIST_EVENT)
{
rpl_gtid *gtid_list;
bool status;
uint32 list_len;
if (found_gtid_list_event)
{
errormsg= "Found duplicate Gtid_list_log_event while scanning binlog "
"to find slave start position";
goto end;
}
status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
&gtid_list, &list_len);
if (status)
{
errormsg= "Error reading Gtid_list_log_event while searching "
"for old-style position in binlog";
goto end;
}
err= gtid_state->load(gtid_list, list_len);
my_free(gtid_list);
if (err)
{
errormsg= "Internal error (out of memory?) initialising slave state "
"while scanning binlog to find start position";
goto end;
}
found_gtid_list_event= true;
}
else if (!found_gtid_list_event)
{
/* We did not find any Gtid_list_log_event, must be old binlog. */
goto end;
}
else if (typ == GTID_EVENT)
{
rpl_gtid gtid;
uchar flags2;
if (Gtid_log_event::peek(packet.ptr(), packet.length(), &gtid.domain_id,
&gtid.server_id, &gtid.seq_no, &flags2))
{
errormsg= "Corrupt gtid_log_event found while scanning binlog to find "
"initial slave position";
goto end;
}
if (gtid_state->update(&gtid))
{
errormsg= "Internal error (out of memory?) updating slave state while "
"scanning binlog to find start position";
goto end;
}
}
}
if (!valid_pos)
{
errormsg= "Slave requested incorrect position in master binlog. "
"Requested position %u in file '%s', but this position does not "
"correspond to the location of any binlog event.";
}
end:
end_io_cache(&cache);
mysql_file_close(file, MYF(MY_WME));
return errormsg;
}
int
gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str)
{
slave_connection_state gtid_state;
if (pos < 4)
pos= 4;
if (gtid_state_from_pos(name, pos, &gtid_state) ||
gtid_state.to_string(out_str))
return 1;
return 0;
}
enum enum_gtid_skip_type {
GTID_SKIP_NOT, GTID_SKIP_STANDALONE, GTID_SKIP_TRANSACTION
};
......@@ -945,8 +1120,8 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
binlog positions.
*/
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
return "Failed to replace binlog checkpoint event with dummy: "
"too small event.";
return "Failed to replace binlog checkpoint or gtid list event with "
"dummy: too small event.";
}
}
......@@ -1010,7 +1185,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
char str_buf[256];
String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
bool using_gtid_state;
slave_connection_state gtid_state;
slave_connection_state gtid_state, return_gtid_state;
enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
......
......@@ -69,6 +69,7 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
void rpl_init_gtid_slave_state();
void rpl_deinit_gtid_slave_state();
int rpl_load_gtid_slave_state(THD *thd);
int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
#endif /* HAVE_REPLICATION */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment