Commit 514b8261 authored by Libing Song's avatar Libing Song

Bug#17402313 DUMP THREAD SENDS SOME EVENTS MORE THAN ONCE

Dump thread may encounter an error when reading events from the active binlog
file. However the errors may be temporary, so dump thread will try to read
the event again. But dump thread seeked to an wrong position, it caused some
events was sent twice.

To fix the bug, prev_pos is defined out the while loop and is set the correct
position after reading every event correctly.

This patch also make binlog_can_be_corrupted more accurate, only the binlogs
not closed normally are marked binlog_can_be_corrupted.

Finally, two warnings are added when dump threads encounter the temporary
errors.
parent 044a7d7c
...@@ -1027,6 +1027,17 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, ...@@ -1027,6 +1027,17 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
if (log_file_name_arg) if (log_file_name_arg)
*is_binlog_active= mysql_bin_log.is_active(log_file_name_arg); *is_binlog_active= mysql_bin_log.is_active(log_file_name_arg);
DBUG_EXECUTE_IF("dump_fake_io_error",
{
if (log_lock)
{
pthread_mutex_unlock(log_lock);
DBUG_SET("-d,dump_fake_io_error");
DBUG_RETURN(LOG_READ_IO);
}
});
if (my_b_read(file, (uchar*) buf, sizeof(buf))) if (my_b_read(file, (uchar*) buf, sizeof(buf)))
{ {
/* /*
......
...@@ -360,6 +360,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ...@@ -360,6 +360,9 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
int left_events = max_binlog_dump_events; int left_events = max_binlog_dump_events;
#endif #endif
int old_max_allowed_packet= thd->variables.max_allowed_packet; int old_max_allowed_packet= thd->variables.max_allowed_packet;
bool is_active_binlog= false;
my_off_t prev_pos= pos;
DBUG_ENTER("mysql_binlog_send"); DBUG_ENTER("mysql_binlog_send");
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
...@@ -483,7 +486,8 @@ impossible position"; ...@@ -483,7 +486,8 @@ impossible position";
Try to find a Format_description_log_event at the beginning of Try to find a Format_description_log_event at the beginning of
the binlog the binlog
*/ */
if (!(error = Log_event::read_log_event(&log, packet, log_lock))) if (!(error = Log_event::read_log_event(&log, packet, log_lock,
log_file_name, &is_active_binlog)))
{ {
/* /*
The packet has offsets equal to the normal offsets in a binlog The packet has offsets equal to the normal offsets in a binlog
...@@ -494,8 +498,14 @@ impossible position"; ...@@ -494,8 +498,14 @@ impossible position";
(*packet)[EVENT_TYPE_OFFSET+1])); (*packet)[EVENT_TYPE_OFFSET+1]));
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{ {
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & /*
LOG_EVENT_BINLOG_IN_USE_F); If a binlog is not active, but LOG_EVENT_BINLOG_IN_USE_F
flag is 1. That means it is not closed in normal way
(E.g server crash) and may include corrupted events.
*/
binlog_can_be_corrupted= (!is_active_binlog) &&
test((*packet)[FLAGS_OFFSET+1] & LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/* /*
mark that this event with "log_pos=0", so the slave mark that this event with "log_pos=0", so the slave
...@@ -547,12 +557,11 @@ impossible position"; ...@@ -547,12 +557,11 @@ impossible position";
while (!net->error && net->vio != 0 && !thd->killed) while (!net->error && net->vio != 0 && !thd->killed)
{ {
my_off_t prev_pos= pos;
bool is_active_binlog= false;
while (!(error= Log_event::read_log_event(&log, packet, log_lock, while (!(error= Log_event::read_log_event(&log, packet, log_lock,
log_file_name, log_file_name,
&is_active_binlog))) &is_active_binlog)))
{ {
DBUG_ASSERT(prev_pos < my_b_tell(&log));
prev_pos= my_b_tell(&log); prev_pos= my_b_tell(&log);
#ifndef DBUG_OFF #ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--) if (max_binlog_dump_events && !left_events--)
...@@ -580,8 +589,9 @@ impossible position"; ...@@ -580,8 +589,9 @@ impossible position";
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{ {
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & binlog_can_be_corrupted= (!is_active_binlog) &&
LOG_EVENT_BINLOG_IN_USE_F); test((*packet)[FLAGS_OFFSET+1] & LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
} }
else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
...@@ -620,9 +630,31 @@ impossible position"; ...@@ -620,9 +630,31 @@ impossible position";
here we were reading binlog that was not closed properly (as a result here we were reading binlog that was not closed properly (as a result
of a crash ?). treat any corruption as EOF of a crash ?). treat any corruption as EOF
*/ */
if (binlog_can_be_corrupted && if ((binlog_can_be_corrupted || is_active_binlog) &&
error != LOG_READ_MEM && error != LOG_READ_EOF) error != LOG_READ_MEM && error != LOG_READ_EOF)
{ {
test_for_non_eof_log_read_errors(error, &errmsg);
if (is_active_binlog)
{
sql_print_warning("Failed to read an event from active binlog(%s,%lu). "
"error: %s. Dump thread will try to read it again.",
log_file_name, (ulong)prev_pos, errmsg);
}
else
{
sql_print_warning("Failed to read an event from inactive binlog"
"(%s, %lu). error: %s. Dump thread found the binlog "
"was not rotated correctly. It will jump to next "
"binlog directly.",
log_file_name, (ulong) prev_pos, errmsg);
}
errmsg= NULL;
/*
If binlog is active, it will try to read the event again. Otherwise,
skip the corrupted events and switch to next binlog.
*/
my_b_seek(&log, prev_pos); my_b_seek(&log, prev_pos);
error=LOG_READ_EOF; error=LOG_READ_EOF;
} }
...@@ -695,6 +727,8 @@ impossible position"; ...@@ -695,6 +727,8 @@ impossible position";
/* we read successfully, so we'll need to send it to the slave */ /* we read successfully, so we'll need to send it to the slave */
pthread_mutex_unlock(log_lock); pthread_mutex_unlock(log_lock);
read_packet = 1; read_packet = 1;
DBUG_ASSERT(prev_pos < my_b_tell(&log));
prev_pos= my_b_tell(&log);
break; break;
case LOG_READ_EOF: case LOG_READ_EOF:
...@@ -757,6 +791,7 @@ impossible position"; ...@@ -757,6 +791,7 @@ impossible position";
thd_proc_info(thd, "Finished reading one binlog; switching to next binlog"); thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
switch (mysql_bin_log.find_next_log(&linfo, 1)) { switch (mysql_bin_log.find_next_log(&linfo, 1)) {
case 0: case 0:
prev_pos= BIN_LOG_HEADER_SIZE;
break; break;
case LOG_INFO_EOF: case LOG_INFO_EOF:
if (mysql_bin_log.is_active(log_file_name)) if (mysql_bin_log.is_active(log_file_name))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment