Commit a69a6dda authored by Monty's avatar Monty

MDEV-4487 Allow replication from MySQL 5.6+ when GTID is enabled on the master

MDEV-8685 MariaDB fails to decode Anonymous_GTID entries
MDEV-5705 Replication testing: 5.6->10.0

- Ignoring GTID events from MySQL 5.6+ (Allows replication from MySQL 5.6+ with GTID enabled)
- Added ignorable events from MySQL 5.6
- mysqlbinlog now writes information about GTID and ignorable events.
- Added more information in error message when replication stops because of wrong information in binary log.
- Fixed wrong test when write_on_release() should flush cache.
parent 7c1e2fe3
......@@ -305,7 +305,7 @@ class Write_on_release_cache
~Write_on_release_cache()
{
copy_event_cache_to_file_and_reinit(m_cache, m_file);
if (m_flags | FLUSH_F)
if (m_flags & FLUSH_F)
fflush(m_file);
}
......@@ -813,6 +813,15 @@ const char* Log_event::get_type_str(Log_event_type type)
case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint";
case GTID_EVENT: return "Gtid";
case GTID_LIST_EVENT: return "Gtid_list";
/* The following is only for mysqlbinlog */
case IGNORABLE_LOG_EVENT: return "Ignorable log event";
case ROWS_QUERY_LOG_EVENT: return "MySQL Rows_query";
case GTID_LOG_EVENT: return "MySQL Gtid";
case ANONYMOUS_GTID_LOG_EVENT: return "MySQL Anonymous_Gtid";
case PREVIOUS_GTIDS_LOG_EVENT: return "MySQL Previous_gtids";
case HEARTBEAT_LOG_EVENT: return "Heartbeat";
default: return "Unknown"; /* impossible */
}
}
......@@ -1416,6 +1425,8 @@ Log_event* Log_event::read_log_event(IO_CACHE* file,
DBUG_ENTER("Log_event::read_log_event");
DBUG_ASSERT(description_event != 0);
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
my_off_t position= my_b_tell(file);
/*
First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
check the event for sanity and to know its length; no need to really parse
......@@ -1427,7 +1438,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file,
LOG_EVENT_MINIMAL_HEADER_LEN);
LOCK_MUTEX;
DBUG_PRINT("info", ("my_b_tell: %lu", (ulong) my_b_tell(file)));
DBUG_PRINT("info", ("my_b_tell: %llu", (ulonglong) position));
if (my_b_read(file, (uchar *) head, header_size))
{
DBUG_PRINT("info", ("Log_event::read_log_event(IO_CACHE*,Format_desc*) \
......@@ -1484,8 +1495,9 @@ failed my_b_read"));
{
DBUG_ASSERT(error != 0);
sql_print_error("Error in Log_event::read_log_event(): "
"'%s', data_len: %lu, event_type: %d",
error,data_len,(uchar)(head[EVENT_TYPE_OFFSET]));
"'%s' at offset: %llu data_len: %lu event_type: %d",
error, position, data_len,
(uchar)(head[EVENT_TYPE_OFFSET]));
my_free(buf);
/*
The SQL slave thread will check if file->error<0 to know
......@@ -1518,10 +1530,12 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version));
DBUG_DUMP("data", (unsigned char*) buf, event_len);
/* Check the integrity */
/*
Check the integrity; This is needed because handle_slave_io() doesn't
check if packet is of proper length.
*/
if (event_len < EVENT_LEN_OFFSET ||
(uchar)buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
event_len != uint4korr(buf+EVENT_LEN_OFFSET))
{
*error="Sanity check failed"; // Needed to free buffer
DBUG_RETURN(NULL); // general sanity check - will fail on a partial read
......@@ -1703,6 +1717,15 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
case DELETE_ROWS_EVENT:
ev = new Delete_rows_log_event(buf, event_len, description_event);
break;
/* MySQL GTID events are ignored */
case GTID_LOG_EVENT:
case ANONYMOUS_GTID_LOG_EVENT:
case PREVIOUS_GTIDS_LOG_EVENT:
ev= new Ignorable_log_event(buf, description_event,
get_type_str((Log_event_type) event_type));
break;
case TABLE_MAP_EVENT:
ev = new Table_map_log_event(buf, event_len, description_event);
break;
......@@ -1720,10 +1743,22 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
ev = new Annotate_rows_log_event(buf, event_len, description_event);
break;
default:
DBUG_PRINT("error",("Unknown event code: %d",
(int) buf[EVENT_TYPE_OFFSET]));
ev= NULL;
break;
/*
Create an object of Ignorable_log_event for unrecognized sub-class.
So that SLAVE SQL THREAD will only update the position and continue.
*/
if (uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F)
{
ev= new Ignorable_log_event(buf, description_event,
get_type_str((Log_event_type) event_type));
}
else
{
DBUG_PRINT("error",("Unknown event code: %d",
(int) buf[EVENT_TYPE_OFFSET]));
ev= NULL;
break;
}
}
}
......@@ -4891,6 +4926,9 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
post_header_len[IGNORABLE_LOG_EVENT-1]= 0;
post_header_len[ROWS_QUERY_LOG_EVENT-1]= 0;
post_header_len[GTID_LOG_EVENT-1]= 0;
post_header_len[ANONYMOUS_GTID_LOG_EVENT-1]= 0;
post_header_len[PREVIOUS_GTIDS_LOG_EVENT-1]= 0;
post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
......@@ -12639,6 +12677,52 @@ Incident_log_event::write_data_body(IO_CACHE *file)
}
Ignorable_log_event::Ignorable_log_event(const char *buf,
const Format_description_log_event
*descr_event,
const char *event_name)
:Log_event(buf, descr_event), number((int) (uchar) buf[EVENT_TYPE_OFFSET]),
description(event_name)
{
DBUG_ENTER("Ignorable_log_event::Ignorable_log_event");
DBUG_VOID_RETURN;
}
Ignorable_log_event::~Ignorable_log_event()
{
}
#ifndef MYSQL_CLIENT
/* Pack info for its unrecognized ignorable event */
void Ignorable_log_event::pack_info(THD *thd, Protocol *protocol)
{
char buf[256];
size_t bytes;
bytes= my_snprintf(buf, sizeof(buf), "# Ignorable event type %d (%s)",
number, description);
protocol->store(buf, bytes, &my_charset_bin);
}
#endif
#ifdef MYSQL_CLIENT
/* Print for its unrecognized ignorable event */
void
Ignorable_log_event::print(FILE *file,
PRINT_EVENT_INFO *print_event_info)
{
if (print_event_info->short_form)
return;
print_header(&print_event_info->head_cache, print_event_info, FALSE);
my_b_printf(&print_event_info->head_cache, "\tIgnorable\n");
my_b_printf(&print_event_info->head_cache,
"# Ignorable event type %d (%s)\n", number, description);
copy_event_cache_to_file_and_reinit(&print_event_info->head_cache,
file);
}
#endif
#ifdef MYSQL_CLIENT
/**
The default values for these variables should be values that are
......@@ -12720,4 +12804,25 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos,
return TRUE;
#endif
}
/**
Check if we should write event to the relay log
This is used to skip events that is only supported by MySQL
Return:
0 ok
1 Don't write event
*/
bool event_that_should_be_ignored(const char *buf)
{
uint event_type= (uchar)buf[EVENT_TYPE_OFFSET];
if (event_type == GTID_LOG_EVENT ||
event_type == ANONYMOUS_GTID_LOG_EVENT ||
event_type == PREVIOUS_GTIDS_LOG_EVENT ||
(uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F))
return 1;
return 0;
}
#endif
......@@ -260,6 +260,7 @@ struct sql_ex_info
#define EXECUTE_LOAD_QUERY_HEADER_LEN (QUERY_HEADER_LEN + EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN)
#define INCIDENT_HEADER_LEN 2
#define HEARTBEAT_HEADER_LEN 0
#define IGNORABLE_HEADER_LEN 0
#define ROWS_HEADER_LEN_V2 10
#define ANNOTATE_ROWS_HEADER_LEN 0
#define BINLOG_CHECKPOINT_HEADER_LEN 4
......@@ -519,6 +520,17 @@ struct sql_ex_info
*/
#define LOG_EVENT_RELAY_LOG_F 0x40
/**
@def LOG_EVENT_IGNORABLE_F
For an event, 'e', carrying a type code, that a slave,
's', does not recognize, 's' will check 'e' for
LOG_EVENT_IGNORABLE_F, and if the flag is set, then 'e'
is ignored. Otherwise, 's' acknowledges that it has
found an unknown event in the relay log.
*/
#define LOG_EVENT_IGNORABLE_F 0x80
/**
@def LOG_EVENT_SKIP_REPLICATION_F
......@@ -697,6 +709,11 @@ enum Log_event_type
UPDATE_ROWS_EVENT = 31,
DELETE_ROWS_EVENT = 32,
/* MySQL 5.6 GTID events, ignored by MariaDB */
GTID_LOG_EVENT= 33,
ANONYMOUS_GTID_LOG_EVENT= 34,
PREVIOUS_GTIDS_LOG_EVENT= 35,
/*
Add new events here - right above this comment!
Existing events (except ENUM_END_EVENT) should never change their numbers
......@@ -4740,6 +4757,60 @@ class Incident_log_event : public Log_event {
LEX_STRING m_message;
};
/**
@class Ignorable_log_event
Base class for ignorable log events. Events deriving from
this class can be safely ignored by slaves that cannot
recognize them. Newer slaves, will be able to read and
handle them. This has been designed to be an open-ended
architecture, so adding new derived events shall not harm
the old slaves that support ignorable log event mechanism
(they will just ignore unrecognized ignorable events).
@note The only thing that makes an event ignorable is that it has
the LOG_EVENT_IGNORABLE_F flag set. It is not strictly necessary
that ignorable event types derive from Ignorable_log_event; they may
just as well derive from Log_event and pass LOG_EVENT_IGNORABLE_F as
argument to the Log_event constructor.
**/
class Ignorable_log_event : public Log_event {
public:
int number;
const char *description;
#ifndef MYSQL_CLIENT
Ignorable_log_event(THD *thd_arg)
:Log_event(thd_arg, LOG_EVENT_IGNORABLE_F, FALSE),
number(0), description("internal")
{
DBUG_ENTER("Ignorable_log_event::Ignorable_log_event");
DBUG_VOID_RETURN;
}
#endif
Ignorable_log_event(const char *buf,
const Format_description_log_event *descr_event,
const char *event_name);
virtual ~Ignorable_log_event();
#ifndef MYSQL_CLIENT
void pack_info(THD *, Protocol*);
#endif
#ifdef MYSQL_CLIENT
virtual void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
#endif
virtual Log_event_type get_type_code() { return IGNORABLE_LOG_EVENT; }
virtual bool is_valid() const { return 1; }
virtual int get_data_size() { return IGNORABLE_HEADER_LEN; }
};
static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache,
FILE *file)
{
......@@ -4797,6 +4868,7 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos,
ulonglong *relay_log_pos);
bool event_checksum_test(uchar *buf, ulong event_len, uint8 alg);
bool event_that_should_be_ignored(const char *buf);
uint8 get_checksum_alg(const char* buf, ulong len);
extern TYPELIB binlog_checksum_typelib;
......
......@@ -5615,6 +5615,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
error_msg.append(llbuf, strlen(llbuf));
goto err;
}
/*
Heartbeat events doesn't count in the binlog size, so we don't have to
increment mi->master_log_pos
*/
goto skip_relay_logging;
}
break;
......@@ -5844,6 +5849,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
else
if ((s_id == global_system_variables.server_id &&
!mi->rli.replicate_same_server_id) ||
event_that_should_be_ignored(buf) ||
/*
the following conjunction deals with IGNORE_SERVER_IDS, if set
If the master is on the ignore list, execution of
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment