Commit df7ecf64 authored by Sujatha Sivakumar's avatar Sujatha Sivakumar

Bug#23251517: SEMISYNC REPLICATION HANGING

Revert following bug fix:

Bug#20685029: SLAVE IO THREAD SHOULD STOP WHEN DISK IS
FULL
Bug#21753696: MAKE SHOW SLAVE STATUS NON BLOCKING IF IO
THREAD WAITS FOR DISK SPACE

This fix results in a deadlock between slave IO thread
and SQL thread.
parent 818b3a91
# ==== Purpose ====
#
# Grep a file for a pattern, produce a single string out of the
# matching lines, and assert that the string matches a given regular
# expression.
#
# ==== Usage ====
#
# --let $assert_text= TEXT
# --let $assert_file= FILE
# --let $assert_select= REGEX
# [--let $assert_match= REGEX | --let $assert_count= NUMBER]
# [--let $assert_only_after= REGEX]
# --source include/assert_grep.inc
#
# Parameters:
#
# $assert_text
# Text that describes what is being checked. This text is written to
# the query log so it should not contain non-deterministic elements.
#
# $assert_file
# File to search.
#
# $assert_select
# All lines matching this text will be checked.
#
# $assert_match
# The script will find all lines that match $assert_select,
# concatenate them to a long string, and assert that it matches
# $assert_match.
#
# $assert_count
# Instead of asserting that the selected lines match
# $assert_match, assert that there were exactly $assert_count
# matching lines.
#
# $assert_only_after
# Reset all the lines matched and the counter when finding this pattern.
# It is useful for searching things in the mysqld.err log file just
# after the last server restart for example (discarding the log content
# of previous server executions).
if (!$assert_text)
{
--die !!!ERROR IN TEST: you must set $assert_text
}
if (!$assert_file)
{
--die !!!ERROR IN TEST: you must set $assert_file
}
if (!$assert_select)
{
--die !!!ERROR IN TEST: you must set $assert_select
}
if ($assert_match == '')
{
if ($assert_count == '')
{
--die !!!ERROR IN TEST: you must set either $assert_match or $assert_count
}
}
if ($assert_match != '')
{
if ($assert_count != '')
{
--echo assert_text='$assert_text' assert_count='$assert_count'
--die !!!ERROR IN TEST: you must set only one of $assert_match or $assert_count
}
}
--let $include_filename= assert_grep.inc [$assert_text]
--source include/begin_include_file.inc
--let _AG_ASSERT_TEXT= $assert_text
--let _AG_ASSERT_FILE= $assert_file
--let _AG_ASSERT_SELECT= $assert_select
--let _AG_ASSERT_MATCH= $assert_match
--let _AG_ASSERT_COUNT= $assert_count
--let _AG_OUT= `SELECT CONCAT('$MYSQLTEST_VARDIR/tmp/_ag_', UUID())`
--let _AG_ASSERT_ONLY_AFTER= $assert_only_after
--perl
use strict;
use warnings;
my $file= $ENV{'_AG_ASSERT_FILE'};
my $assert_select= $ENV{'_AG_ASSERT_SELECT'};
my $assert_match= $ENV{'_AG_ASSERT_MATCH'};
my $assert_count= $ENV{'_AG_ASSERT_COUNT'};
my $assert_only_after= $ENV{'_AG_ASSERT_ONLY_AFTER'};
my $out= $ENV{'_AG_OUT'};
my $result= '';
my $count= 0;
open(FILE, "$file") or die("Error $? opening $file: $!\n");
while (<FILE>) {
my $line = $_;
if ($assert_only_after && $line =~ /$assert_only_after/) {
$result = "";
$count = 0;
}
if ($line =~ /$assert_select/) {
if ($assert_count ne '') {
$count++;
}
else {
$result .= $line;
}
}
}
close(FILE) or die("Error $? closing $file: $!");
open OUT, "> $out" or die("Error $? opening $out: $!");
if ($assert_count ne '' && ($count != $assert_count)) {
print OUT ($count) or die("Error $? writing $out: $!");
}
elsif ($assert_count eq '' && $result !~ /$assert_match/) {
print OUT ($result) or die("Error $? writing $out: $!");
}
else {
print OUT ("assert_grep.inc ok");
}
close OUT or die("Error $? closing $out: $!");
EOF
--let $_ag_outcome= `SELECT LOAD_FILE('$_AG_OUT')`
if ($_ag_outcome != 'assert_grep.inc ok')
{
--source include/show_rpl_debug_info.inc
--echo include/assert_grep.inc failed!
--echo assert_text: '$assert_text'
--echo assert_file: '$assert_file'
--echo assert_select: '$assert_select'
--echo assert_match: '$assert_match'
--echo assert_count: '$assert_count'
--echo assert_only_after: '$assert_only_after'
if ($assert_match != '')
{
--echo matching lines: '$_ag_outcome'
}
if ($assert_count != '')
{
--echo number of matching lines: $_ag_outcome
}
--die assert_grep.inc failed.
}
--let $include_filename= include/assert_grep.inc [$assert_text]
--source include/end_include_file.inc
......@@ -43,7 +43,6 @@
#
# [--let $rpl_server_count= 7]
# --let $rpl_topology= 1->2->3->1->4, 2->5, 6->7
# [--let $rpl_extra_connections_per_server= 1]
# [--let $rpl_check_server_ids= 1]
# [--let $rpl_skip_change_master= 1]
# [--let $rpl_skip_start_slave= 1]
......@@ -66,12 +65,6 @@
# want to specify the empty topology (no server replicates at
# all), you have to set $rpl_topology=none.
#
# $rpl_extra_connections_per_server
# By default, this script creates connections server_N and
# server_N_1. If you can set this variable to a number, the
# script creates:
# server_N, server_N_1, ..., server_N_$rpl_extra_connections_per_server
#
# $rpl_check_server_ids
# If $rpl_check_server_ids is set, this script checks that the
# @@server_id of all servers are different. This is normally
......@@ -146,17 +139,8 @@ if (!$SERVER_MYPORT_4)
# Check that $rpl_server_count is set
if (!$rpl_server_count)
{
--let $rpl_server_count= `SELECT REPLACE('$rpl_topology', '->', ',')`
if (`SELECT LOCATE(',', '$rpl_server_count')`)
{
--let $rpl_server_count= `SELECT GREATEST($rpl_server_count)`
}
}
--let $_rpl_extra_connections_per_server= $rpl_extra_connections_per_server
if ($_rpl_extra_connections_per_server == '')
{
--let $_rpl_extra_connections_per_server= 1
--let $_compute_rpl_server_count= `SELECT REPLACE('$rpl_topology', '->', ',')`
--let $rpl_server_count= `SELECT GREATEST($_compute_rpl_server_count)`
}
......@@ -175,20 +159,15 @@ if (!$rpl_debug)
# Create two connections to each server; reset master/slave, select
# database, set autoinc variables.
--let $_rpl_server= $rpl_server_count
--let $underscore= _
--let $_rpl_one= _1
while ($_rpl_server)
{
# Connect.
--let $rpl_server_number= $_rpl_server
--let $rpl_connection_name= server_$_rpl_server
--source include/rpl_connect.inc
--let $_rpl_connection_number= 1
while ($_rpl_connection_number <= $_rpl_extra_connections_per_server)
{
--let $rpl_connection_name= server_$_rpl_server$underscore$_rpl_connection_number
--let $rpl_connection_name= server_$_rpl_server$_rpl_one
--source include/rpl_connect.inc
--inc $_rpl_connection_number
}
# Configure server.
--let $rpl_connection_name= server_$_rpl_server
......
......@@ -12,7 +12,6 @@
# ==== Usage ====
#
# --let $rpl_server_number= N
# [--let $rpl_extra_connections_per_server= 1]
# [--let $rpl_debug= 1]
# --source include/rpl_reconnect.inc
#
......@@ -22,7 +21,7 @@
# master server, 2 the slave server, 3 the 3rd server, and so on.
# Cf. include/rpl_init.inc
#
# $rpl_extra_connections_per_server, $rpl_debug
# $rpl_debug
# See include/rpl_init.inc
--let $include_filename= rpl_reconnect.inc
......@@ -33,11 +32,6 @@ if (!$rpl_server_number)
--die ERROR IN TEST: you must set $rpl_server_number before you source rpl_connect.inc
}
if ($_rpl_extra_connections_per_server == '')
{
--let $_rpl_extra_connections_per_server= 1
}
if ($rpl_debug)
{
......@@ -78,14 +72,10 @@ if (!$_rpl_server_number)
--source include/rpl_connection.inc
--enable_reconnect
--let $_rpl_connection_number= 1
while ($_rpl_connection_number <= $_rpl_extra_connections_per_server)
{
--let $rpl_connection_name= server_$rpl_server_number$underscore$_rpl_connection_number
--source include/rpl_connection.inc
--enable_reconnect
--inc $_rpl_connection_number
}
--let $_rpl_one= _1
--let $rpl_connection_name= server_$rpl_server_number$_rpl_one
--source include/rpl_connection.inc
--enable_reconnect
if ($rpl_debug)
{
......@@ -132,15 +122,10 @@ if (!$_rpl_server_number)
--source include/wait_until_connected_again.inc
--disable_reconnect
--let $_rpl_connection_number= 1
while ($_rpl_connection_number <= $_rpl_extra_connections_per_server)
{
--let $rpl_connection_name= server_$rpl_server_number$underscore$_rpl_connection_number
--source include/rpl_connection.inc
--source include/wait_until_connected_again.inc
--disable_reconnect
--inc $_rpl_connection_number
}
--let $rpl_connection_name= server_$rpl_server_number$_rpl_one
--source include/rpl_connection.inc
--source include/wait_until_connected_again.inc
--disable_reconnect
--let $include_filename= rpl_reconnect.inc
......
# ==== Purpose ====
#
# Issues START SLAVE SQL_THREAD on the current connection. Then waits
# until the SQL thread has started, or until a timeout is reached.
#
# Please use this instead of 'START SLAVE SQL_THREAD', to reduce the
# risk of races in test cases.
#
#
# ==== Usage ====
#
# [--let $slave_timeout= NUMBER]
# [--let $rpl_debug= 1]
# --source include/start_slave_sql.inc
#
# Parameters:
# $slave_timeout
# See include/wait_for_slave_param.inc
#
# $rpl_debug
# See include/rpl_init.inc
--let $include_filename= start_slave_sql.inc
--source include/begin_include_file.inc
if (!$rpl_debug)
{
--disable_query_log
}
START SLAVE SQL_THREAD;
--source include/wait_for_slave_sql_to_start.inc
--let $include_filename= start_slave_sql.inc
--source include/end_include_file.inc
include/master-slave.inc
[connection master]
CREATE TABLE t1(a INT);
INSERT INTO t1 VALUES(1);
CALL mtr.add_suppression("Disk is full writing");
CALL mtr.add_suppression("Retry in 60 secs");
include/stop_slave_sql.inc
SET @@GLOBAL.DEBUG= 'd,simulate_io_thd_wait_for_disk_space';
INSERT INTO t1 VALUES(2);
SET DEBUG_SYNC='now WAIT_FOR parked';
SET @@GLOBAL.DEBUG= '$debug_saved';
include/assert_grep.inc [Found the disk full error message on the slave]
include/start_slave_sql.inc
DROP TABLE t1;
include/rpl_end.inc
# ==== Purpose ====
#
# Check that the execution of SHOW SLAVE STATUS command is not blocked when IO
# thread is blocked waiting for disk space.
#
# ==== Implementation ====
#
# Simulate a scenario where IO thread is waiting for disk space while writing
# into the relay log. Execute SHOW SLAVE STATUS command after IO thread is
# blocked waiting for space. The command should not be blocked.
#
# ==== References ====
#
# Bug#21753696: MAKE SHOW SLAVE STATUS NON BLOCKING IF IO THREAD WAITS FOR
# DISK SPACE
# Bug#20685029: SLAVE IO THREAD SHOULD STOP WHEN DISK IS FULL
#
###############################################################################
--source include/have_debug.inc
# Inorder to grep a specific error pattern in error log a fresh error log
# needs to be generated.
--source include/force_restart.inc
--source include/master-slave.inc
# Generate events to be replicated to the slave
CREATE TABLE t1(a INT);
INSERT INTO t1 VALUES(1);
--sync_slave_with_master
# Those errors will only happen in the slave
CALL mtr.add_suppression("Disk is full writing");
CALL mtr.add_suppression("Retry in 60 secs");
# Stop the SQL thread to avoid writing on disk
--source include/stop_slave_sql.inc
# Set the debug option that will simulate disk full
--let $debug_saved= `SELECT @@GLOBAL.DEBUG`
SET @@GLOBAL.DEBUG= 'd,simulate_io_thd_wait_for_disk_space';
# Generate events to be replicated to the slave
--connection master
INSERT INTO t1 VALUES(2);
--connection slave1
SET DEBUG_SYNC='now WAIT_FOR parked';
# Get the relay log file name using SHOW SLAVE STATUS
--let $relay_log_file= query_get_value(SHOW SLAVE STATUS, Relay_Log_File, 1)
--connection slave
# Restore the debug options to "simulate" freed space on disk
SET @@GLOBAL.DEBUG= '$debug_saved';
# There should be a message in the error log of the slave stating
# that it was waiting for space to write on the relay log.
--let $assert_file=$MYSQLTEST_VARDIR/log/mysqld.2.err
# Grep only after the message that the I/O thread has started
--let $assert_only_after= Slave I/O .* connected to master .*replication started in log .* at position
--let $assert_count= 1
--let $assert_select=Disk is full writing .*$relay_log_file.*
--let $assert_text= Found the disk full error message on the slave
--source include/assert_grep.inc
# Start the SQL thread to let the slave to sync and finish gracefully
--source include/start_slave_sql.inc
# Cleanup
--connection master
DROP TABLE t1;
--source include/rpl_end.inc
......@@ -15,7 +15,7 @@
#include "mysys_priv.h"
#include "mysys_err.h"
#include "m_string.h"
#ifndef SHARED_LIBRARY
const char *globerrs[GLOBERRS]=
......@@ -109,7 +109,6 @@ void init_glob_errs()
*/
void wait_for_free_space(const char *filename, int errors)
{
size_t time_to_sleep= MY_WAIT_FOR_USER_TO_FIX_PANIC;
if (!(errors % MY_WAIT_GIVE_USER_A_MESSAGE))
{
my_printf_warning(EE(EE_DISK_FULL),
......@@ -120,15 +119,10 @@ void wait_for_free_space(const char *filename, int errors)
}
DBUG_EXECUTE_IF("simulate_no_free_space_error",
{
time_to_sleep= 1;
});
DBUG_EXECUTE_IF("simulate_io_thd_wait_for_disk_space",
{
time_to_sleep= 1;
(void) sleep(1);
return;
});
(void) sleep(time_to_sleep);
DEBUG_SYNC_C("disk_full_reached");
(void) sleep(MY_WAIT_FOR_USER_TO_FIX_PANIC);
}
const char **get_global_errmsgs()
......
......@@ -24,7 +24,6 @@ size_t my_write(File Filedes, const uchar *Buffer, size_t Count, myf MyFlags)
{
size_t writtenbytes, written;
uint errors;
size_t ToWriteCount;
DBUG_ENTER("my_write");
DBUG_PRINT("my",("fd: %d Buffer: %p Count: %lu MyFlags: %d",
Filedes, Buffer, (ulong) Count, MyFlags));
......@@ -38,14 +37,11 @@ size_t my_write(File Filedes, const uchar *Buffer, size_t Count, myf MyFlags)
{ DBUG_SET("+d,simulate_file_write_error");});
for (;;)
{
ToWriteCount= Count;
DBUG_EXECUTE_IF("simulate_io_thd_wait_for_disk_space", { ToWriteCount= 1; });
#ifdef _WIN32
writtenbytes= my_win_write(Filedes, Buffer, ToWriteCount);
writtenbytes= my_win_write(Filedes, Buffer, Count);
#else
writtenbytes= write(Filedes, Buffer, ToWriteCount);
writtenbytes= write(Filedes, Buffer, Count);
#endif
DBUG_EXECUTE_IF("simulate_io_thd_wait_for_disk_space", { errno= ENOSPC; });
DBUG_EXECUTE_IF("simulate_file_write_error",
{
errno= ENOSPC;
......
......@@ -37,7 +37,6 @@
#include "log_event.h" // Query_log_event
#include "rpl_filter.h"
#include "rpl_rli.h"
#include "rpl_mi.h"
#include "sql_audit.h"
#include "sql_show.h"
......@@ -4378,22 +4377,13 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
}
#ifdef HAVE_REPLICATION
bool MYSQL_BIN_LOG::append(Log_event* ev, Master_info *mi)
bool MYSQL_BIN_LOG::append(Log_event* ev)
{
bool error = 0;
mysql_mutex_assert_owner(&mi->data_lock);
mysql_mutex_lock(&LOCK_log);
DBUG_ENTER("MYSQL_BIN_LOG::append");
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
/*
Release data_lock by holding LOCK_log, while writing into the relay log.
If slave IO thread waits here for free space, we don't want
SHOW SLAVE STATUS to hang on mi->data_lock. Note LOCK_log mutex is
sufficient to block SQL thread when IO thread is updating relay log here.
*/
mysql_mutex_unlock(&mi->data_lock);
/*
Log_event::write() is smart enough to use my_b_write() or
my_b_append() depending on the kind of cache we have.
......@@ -4408,58 +4398,24 @@ bool MYSQL_BIN_LOG::append(Log_event* ev, Master_info *mi)
if (flush_and_sync(0))
goto err;
if ((uint) my_b_append_tell(&log_file) > max_size)
{
/*
If rotation is required we must acquire data_lock to protect
description_event from clients executing FLUSH LOGS in parallel.
In order do that we must release the existing LOCK_log so that we
get it once again in proper locking order to avoid dead locks.
i.e data_lock , LOCK_log.
*/
mysql_mutex_unlock(&LOCK_log);
mysql_mutex_lock(&mi->data_lock);
mysql_mutex_lock(&LOCK_log);
error= new_file_without_locking();
/*
After rotation release data_lock, we need the LOCK_log till we signal
the updation.
*/
mysql_mutex_unlock(&mi->data_lock);
}
err:
signal_update(); // Safe as we don't call close
mysql_mutex_unlock(&LOCK_log);
mysql_mutex_lock(&mi->data_lock);
signal_update(); // Safe as we don't call close
DBUG_RETURN(error);
}
bool MYSQL_BIN_LOG::appendv(Master_info* mi, const char* buf, uint len,...)
bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...)
{
bool error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::appendv");
va_list(args);
va_start(args,len);
mysql_mutex_assert_owner(&mi->data_lock);
mysql_mutex_lock(&LOCK_log);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
/*
Release data_lock by holding LOCK_log, while writing into the relay log.
If slave IO thread waits here for free space, we don't want
SHOW SLAVE STATUS to hang on mi->data_lock. Note LOCK_log mutex is
sufficient to block SQL thread when IO thread is updating relay log here.
*/
mysql_mutex_unlock(&mi->data_lock);
DBUG_EXECUTE_IF("simulate_io_thd_wait_for_disk_space",
{
const char act[]= "disk_full_reached SIGNAL parked";
DBUG_ASSERT(opt_debug_sync_timeout > 0);
DBUG_ASSERT(!debug_sync_set_action(current_thd,
STRING_WITH_LEN(act)));
};);
mysql_mutex_assert_owner(&LOCK_log);
do
{
if (my_b_append(&log_file,(uchar*) buf,len))
......@@ -4472,34 +4428,13 @@ bool MYSQL_BIN_LOG::appendv(Master_info* mi, const char* buf, uint len,...)
DBUG_PRINT("info",("max_size: %lu",max_size));
if (flush_and_sync(0))
goto err;
if ((uint) my_b_append_tell(&log_file) >
DBUG_EVALUATE_IF("rotate_slave_debug_group", 500, max_size))
{
/*
If rotation is required we must acquire data_lock to protect
description_event from clients executing FLUSH LOGS in parallel.
In order do that we must release the existing LOCK_log so that we
get it once again in proper locking order to avoid dead locks.
i.e data_lock , LOCK_log.
*/
mysql_mutex_unlock(&LOCK_log);
mysql_mutex_lock(&mi->data_lock);
mysql_mutex_lock(&LOCK_log);
if ((uint) my_b_append_tell(&log_file) > max_size)
error= new_file_without_locking();
/*
After rotation release data_lock, we need the LOCK_log till we signal
the updation.
*/
mysql_mutex_unlock(&mi->data_lock);
}
err:
if (!error)
signal_update();
mysql_mutex_unlock(&LOCK_log);
mysql_mutex_lock(&mi->data_lock);
DBUG_RETURN(error);
}
#endif
bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
{
......
......@@ -20,7 +20,6 @@
#include "handler.h" /* my_xid */
class Relay_log_info;
class Master_info;
class Format_description_log_event;
......@@ -455,8 +454,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
v stands for vector
invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0)
*/
bool appendv(Master_info* mi, const char* buf,uint len,...);
bool append(Log_event* ev, Master_info* mi);
bool appendv(const char* buf,uint len,...);
bool append(Log_event* ev);
void make_log_name(char* buf, const char* log_ident);
bool is_active(const char* log_file_name);
......
......@@ -1660,7 +1660,7 @@ Waiting for the slave SQL thread to free enough relay log space");
#endif
if (rli->sql_force_rotate_relay)
{
rotate_relay_log(rli->mi, true/*need_data_lock=true*/);
rotate_relay_log(rli->mi);
rli->sql_force_rotate_relay= false;
}
......@@ -1705,7 +1705,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
if (likely((bool)ev))
{
ev->server_id= 0; // don't be ignored by slave SQL thread
if (unlikely(rli->relay_log.append(ev, mi)))
if (unlikely(rli->relay_log.append(ev)))
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"failed to write a Rotate event"
......@@ -3605,7 +3605,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
break;
Execute_load_log_event xev(thd,0,0);
xev.log_pos = cev->log_pos;
if (unlikely(mi->rli.relay_log.append(&xev, mi)))
if (unlikely(mi->rli.relay_log.append(&xev)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
......@@ -3619,7 +3619,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
{
cev->block = net->read_pos;
cev->block_len = num_bytes;
if (unlikely(mi->rli.relay_log.append(cev, mi)))
if (unlikely(mi->rli.relay_log.append(cev)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
......@@ -3634,7 +3634,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
aev.block = net->read_pos;
aev.block_len = num_bytes;
aev.log_pos = cev->log_pos;
if (unlikely(mi->rli.relay_log.append(&aev, mi)))
if (unlikely(mi->rli.relay_log.append(&aev)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
......@@ -3713,7 +3713,7 @@ static int process_io_rotate(Master_info *mi, Rotate_log_event *rev)
Rotate the relay log makes binlog format detection easier (at next slave
start or mysqlbinlog)
*/
DBUG_RETURN(rotate_relay_log(mi, false/*need_data_lock=false*/));
DBUG_RETURN(rotate_relay_log(mi) /* will take the right mutexes */);
}
/*
......@@ -3819,7 +3819,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
Log_event::Log_event(const char* buf...) in log_event.cc).
*/
ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
if (unlikely(rli->relay_log.append(ev, mi)))
if (unlikely(rli->relay_log.append(ev)))
{
delete ev;
mysql_mutex_unlock(&mi->data_lock);
......@@ -3875,7 +3875,7 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
inc_pos= event_len;
break;
}
if (unlikely(rli->relay_log.append(ev, mi)))
if (unlikely(rli->relay_log.append(ev)))
{
delete ev;
mysql_mutex_unlock(&mi->data_lock);
......@@ -4083,6 +4083,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
direct master (an unsupported, useless setup!).
*/
mysql_mutex_lock(log_lock);
s_id= uint4korr(buf + SERVER_ID_OFFSET);
if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
/*
......@@ -4115,7 +4116,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
IGNORE_SERVER_IDS it increments mi->master_log_pos
as well as rli->group_relay_log_pos.
*/
mysql_mutex_lock(log_lock);
if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
(buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
......@@ -4127,14 +4127,13 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rli->ign_master_log_pos_end= mi->master_log_pos;
}
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
mysql_mutex_unlock(log_lock);
DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored",
(ulong) mi->master_log_pos, uint4korr(buf + SERVER_ID_OFFSET)));
}
else
{
/* write the event to the relay log */
if (likely(!(rli->relay_log.appendv(mi, buf,event_len,0))))
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
{
mi->master_log_pos+= inc_pos;
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
......@@ -4144,10 +4143,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
}
mysql_mutex_lock(log_lock);
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
mysql_mutex_unlock(log_lock);
}
mysql_mutex_unlock(log_lock);
skip_relay_logging:
......@@ -5007,21 +5005,11 @@ event(errno: %d cur_log->error: %d)",
locks; here we don't, so this function is mainly taking locks).
Returns nothing as we cannot catch any error (MYSQL_BIN_LOG::new_file()
is void).
@param mi Master_info for the IO thread.
@param need_data_lock If true, mi->data_lock will be acquired otherwise,
mi->data_lock must be held by the caller.
*/
int rotate_relay_log(Master_info* mi, bool need_data_lock)
int rotate_relay_log(Master_info* mi)
{
DBUG_ENTER("rotate_relay_log");
if (need_data_lock)
mysql_mutex_lock(&mi->data_lock);
else
{
mysql_mutex_assert_owner(&mi->data_lock);
}
Relay_log_info* rli= &mi->rli;
int error= 0;
......@@ -5056,8 +5044,6 @@ int rotate_relay_log(Master_info* mi, bool need_data_lock)
*/
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
end:
if (need_data_lock)
mysql_mutex_unlock(&mi->data_lock);
DBUG_RETURN(error);
}
......
......@@ -205,7 +205,7 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
const char** errmsg);
void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli);
int rotate_relay_log(Master_info* mi, bool need_data_lock);
int rotate_relay_log(Master_info* mi);
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli);
pthread_handler_t handle_slave_io(void *arg);
......
......@@ -157,7 +157,7 @@ bool reload_acl_and_cache(THD *thd, unsigned long options,
{
#ifdef HAVE_REPLICATION
mysql_mutex_lock(&LOCK_active_mi);
if (rotate_relay_log(active_mi, true/*need_data_lock=true*/))
if (rotate_relay_log(active_mi))
*write_to_binlog= -1;
mysql_mutex_unlock(&LOCK_active_mi);
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment