Commit 83611517 authored by Sujatha Sivakumar's avatar Sujatha Sivakumar

Bug#20685029: SLAVE IO THREAD SHOULD STOP WHEN DISK IS

FULL
Bug#21753696: MAKE SHOW SLAVE STATUS NON BLOCKING IF IO
THREAD WAITS FOR DISK SPACE

Problem:
========
Currently SHOW SLAVE STATUS blocks if IO thread waits for
disk space. This makes automation tools verifying
server health block on taking relevant action. Finally this
will create SHOW SLAVE STATUS piles.

Analysis:
=========
SHOW SLAVE STATUS hangs on mi->data_lock if relay log write
is waiting for free disk space while holding mi->data_lock.
mi->data_lock is needed to protect the format description
event (mi->format_description_event) which is accessed by
the clients running FLUSH LOGS and slave IO thread. Note
relay log writes don't need to be protected by
mi->data_lock, LOCK_log is used to protect relay log between
IO and SQL thread (see MYSQL_BIN_LOG::append_event). The
code takes mi->data_lock to protect
mi->format_description_event during relay log rotate which
might get triggered right after relay log write.

Fix:
====
Release the data_lock just for the duration of writing into
relay log.

Made change to ensure the following lock order is maintained
to avoid deadlocks.

data_lock, LOCK_log

data_lock is held during relay log rotations to protect
the description event.
parent bb32ac1d
# ==== Purpose ====
#
# Grep a file for a pattern, produce a single string out of the
# matching lines, and assert that the string matches a given regular
# expression.
#
# ==== Usage ====
#
# --let $assert_text= TEXT
# --let $assert_file= FILE
# --let $assert_select= REGEX
# [--let $assert_match= REGEX | --let $assert_count= NUMBER]
# [--let $assert_only_after= REGEX]
# --source include/assert_grep.inc
#
# Parameters:
#
# $assert_text
# Text that describes what is being checked. This text is written to
# the query log so it should not contain non-deterministic elements.
#
# $assert_file
# File to search.
#
# $assert_select
# All lines matching this text will be checked.
#
# $assert_match
# The script will find all lines that match $assert_select,
# concatenate them to a long string, and assert that it matches
# $assert_match.
#
# $assert_count
# Instead of asserting that the selected lines match
# $assert_match, assert that there were exactly $assert_count
# matching lines.
#
# $assert_only_after
# Reset all the lines matched and the counter when finding this pattern.
# It is useful for searching things in the mysqld.err log file just
# after the last server restart for example (discarding the log content
# of previous server executions).
if (!$assert_text)
{
--die !!!ERROR IN TEST: you must set $assert_text
}
if (!$assert_file)
{
--die !!!ERROR IN TEST: you must set $assert_file
}
if (!$assert_select)
{
--die !!!ERROR IN TEST: you must set $assert_select
}
if ($assert_match == '')
{
if ($assert_count == '')
{
--die !!!ERROR IN TEST: you must set either $assert_match or $assert_count
}
}
if ($assert_match != '')
{
if ($assert_count != '')
{
--echo assert_text='$assert_text' assert_count='$assert_count'
--die !!!ERROR IN TEST: you must set only one of $assert_match or $assert_count
}
}
--let $include_filename= assert_grep.inc [$assert_text]
--source include/begin_include_file.inc
--let _AG_ASSERT_TEXT= $assert_text
--let _AG_ASSERT_FILE= $assert_file
--let _AG_ASSERT_SELECT= $assert_select
--let _AG_ASSERT_MATCH= $assert_match
--let _AG_ASSERT_COUNT= $assert_count
--let _AG_OUT= `SELECT CONCAT('$MYSQLTEST_VARDIR/tmp/_ag_', UUID())`
--let _AG_ASSERT_ONLY_AFTER= $assert_only_after
--perl
use strict;
use warnings;
my $file= $ENV{'_AG_ASSERT_FILE'};
my $assert_select= $ENV{'_AG_ASSERT_SELECT'};
my $assert_match= $ENV{'_AG_ASSERT_MATCH'};
my $assert_count= $ENV{'_AG_ASSERT_COUNT'};
my $assert_only_after= $ENV{'_AG_ASSERT_ONLY_AFTER'};
my $out= $ENV{'_AG_OUT'};
my $result= '';
my $count= 0;
open(FILE, "$file") or die("Error $? opening $file: $!\n");
while (<FILE>) {
my $line = $_;
if ($assert_only_after && $line =~ /$assert_only_after/) {
$result = "";
$count = 0;
}
if ($line =~ /$assert_select/) {
if ($assert_count ne '') {
$count++;
}
else {
$result .= $line;
}
}
}
close(FILE) or die("Error $? closing $file: $!");
open OUT, "> $out" or die("Error $? opening $out: $!");
if ($assert_count ne '' && ($count != $assert_count)) {
print OUT ($count) or die("Error $? writing $out: $!");
}
elsif ($assert_count eq '' && $result !~ /$assert_match/) {
print OUT ($result) or die("Error $? writing $out: $!");
}
else {
print OUT ("assert_grep.inc ok");
}
close OUT or die("Error $? closing $out: $!");
EOF
--let $_ag_outcome= `SELECT LOAD_FILE('$_AG_OUT')`
if ($_ag_outcome != 'assert_grep.inc ok')
{
--source include/show_rpl_debug_info.inc
--echo include/assert_grep.inc failed!
--echo assert_text: '$assert_text'
--echo assert_file: '$assert_file'
--echo assert_select: '$assert_select'
--echo assert_match: '$assert_match'
--echo assert_count: '$assert_count'
--echo assert_only_after: '$assert_only_after'
if ($assert_match != '')
{
--echo matching lines: '$_ag_outcome'
}
if ($assert_count != '')
{
--echo number of matching lines: $_ag_outcome
}
--die assert_grep.inc failed.
}
--let $include_filename= include/assert_grep.inc [$assert_text]
--source include/end_include_file.inc
......@@ -43,6 +43,7 @@
#
# [--let $rpl_server_count= 7]
# --let $rpl_topology= 1->2->3->1->4, 2->5, 6->7
# [--let $rpl_extra_connections_per_server= 1]
# [--let $rpl_check_server_ids= 1]
# [--let $rpl_skip_change_master= 1]
# [--let $rpl_skip_start_slave= 1]
......@@ -65,6 +66,12 @@
# want to specify the empty topology (no server replicates at
# all), you have to set $rpl_topology=none.
#
# $rpl_extra_connections_per_server
# By default, this script creates connections server_N and
# server_N_1. If you can set this variable to a number, the
# script creates:
# server_N, server_N_1, ..., server_N_$rpl_extra_connections_per_server
#
# $rpl_check_server_ids
# If $rpl_check_server_ids is set, this script checks that the
# @@server_id of all servers are different. This is normally
......@@ -139,8 +146,17 @@ if (!$SERVER_MYPORT_4)
# Check that $rpl_server_count is set
if (!$rpl_server_count)
{
--let $_compute_rpl_server_count= `SELECT REPLACE('$rpl_topology', '->', ',')`
--let $rpl_server_count= `SELECT GREATEST($_compute_rpl_server_count)`
--let $rpl_server_count= `SELECT REPLACE('$rpl_topology', '->', ',')`
if (`SELECT LOCATE(',', '$rpl_server_count')`)
{
--let $rpl_server_count= `SELECT GREATEST($rpl_server_count)`
}
}
--let $_rpl_extra_connections_per_server= $rpl_extra_connections_per_server
if ($_rpl_extra_connections_per_server == '')
{
--let $_rpl_extra_connections_per_server= 1
}
......@@ -159,15 +175,20 @@ if (!$rpl_debug)
# Create two connections to each server; reset master/slave, select
# database, set autoinc variables.
--let $_rpl_server= $rpl_server_count
--let $_rpl_one= _1
--let $underscore= _
while ($_rpl_server)
{
# Connect.
--let $rpl_server_number= $_rpl_server
--let $rpl_connection_name= server_$_rpl_server
--source include/rpl_connect.inc
--let $rpl_connection_name= server_$_rpl_server$_rpl_one
--source include/rpl_connect.inc
--let $_rpl_connection_number= 1
while ($_rpl_connection_number <= $_rpl_extra_connections_per_server)
{
--let $rpl_connection_name= server_$_rpl_server$underscore$_rpl_connection_number
--source include/rpl_connect.inc
--inc $_rpl_connection_number
}
# Configure server.
--let $rpl_connection_name= server_$_rpl_server
......
......@@ -12,6 +12,7 @@
# ==== Usage ====
#
# --let $rpl_server_number= N
# [--let $rpl_extra_connections_per_server= 1]
# [--let $rpl_debug= 1]
# --source include/rpl_reconnect.inc
#
......@@ -21,7 +22,7 @@
# master server, 2 the slave server, 3 the 3rd server, and so on.
# Cf. include/rpl_init.inc
#
# $rpl_debug
# $rpl_extra_connections_per_server, $rpl_debug
# See include/rpl_init.inc
--let $include_filename= rpl_reconnect.inc
......@@ -32,6 +33,11 @@ if (!$rpl_server_number)
--die ERROR IN TEST: you must set $rpl_server_number before you source rpl_connect.inc
}
if ($_rpl_extra_connections_per_server == '')
{
--let $_rpl_extra_connections_per_server= 1
}
if ($rpl_debug)
{
......@@ -72,10 +78,14 @@ if (!$_rpl_server_number)
--source include/rpl_connection.inc
--enable_reconnect
--let $_rpl_one= _1
--let $rpl_connection_name= server_$rpl_server_number$_rpl_one
--source include/rpl_connection.inc
--enable_reconnect
--let $_rpl_connection_number= 1
while ($_rpl_connection_number <= $_rpl_extra_connections_per_server)
{
--let $rpl_connection_name= server_$rpl_server_number$underscore$_rpl_connection_number
--source include/rpl_connection.inc
--enable_reconnect
--inc $_rpl_connection_number
}
if ($rpl_debug)
{
......@@ -122,10 +132,15 @@ if (!$_rpl_server_number)
--source include/wait_until_connected_again.inc
--disable_reconnect
--let $rpl_connection_name= server_$rpl_server_number$_rpl_one
--source include/rpl_connection.inc
--source include/wait_until_connected_again.inc
--disable_reconnect
--let $_rpl_connection_number= 1
while ($_rpl_connection_number <= $_rpl_extra_connections_per_server)
{
--let $rpl_connection_name= server_$rpl_server_number$underscore$_rpl_connection_number
--source include/rpl_connection.inc
--source include/wait_until_connected_again.inc
--disable_reconnect
--inc $_rpl_connection_number
}
--let $include_filename= rpl_reconnect.inc
......
# ==== Purpose ====
#
# Issues START SLAVE SQL_THREAD on the current connection. Then waits
# until the SQL thread has started, or until a timeout is reached.
#
# Please use this instead of 'START SLAVE SQL_THREAD', to reduce the
# risk of races in test cases.
#
#
# ==== Usage ====
#
# [--let $slave_timeout= NUMBER]
# [--let $rpl_debug= 1]
# --source include/start_slave_sql.inc
#
# Parameters:
# $slave_timeout
# See include/wait_for_slave_param.inc
#
# $rpl_debug
# See include/rpl_init.inc
--let $include_filename= start_slave_sql.inc
--source include/begin_include_file.inc
if (!$rpl_debug)
{
--disable_query_log
}
START SLAVE SQL_THREAD;
--source include/wait_for_slave_sql_to_start.inc
--let $include_filename= start_slave_sql.inc
--source include/end_include_file.inc
include/master-slave.inc
[connection master]
CREATE TABLE t1(a INT);
INSERT INTO t1 VALUES(1);
CALL mtr.add_suppression("Disk is full writing");
CALL mtr.add_suppression("Retry in 60 secs");
include/stop_slave_sql.inc
SET @@GLOBAL.DEBUG= 'd,simulate_io_thd_wait_for_disk_space';
INSERT INTO t1 VALUES(2);
include/wait_for_slave_param.inc [Slave_IO_State]
SET @@GLOBAL.DEBUG= '$debug_saved';
include/assert_grep.inc [Found the disk full error message on the slave]
include/start_slave_sql.inc
DROP TABLE t1;
include/rpl_end.inc
# ==== Purpose ====
#
# Check that the execution of SHOW SLAVE STATUS command is not blocked when IO
# thread is blocked waiting for disk space.
#
# ==== Implementation ====
#
# Simulate a scenario where IO thread is waiting for disk space while writing
# into the relay log. Execute SHOW SLAVE STATUS command after IO thread is
# blocked waiting for space. The command should not be blocked.
#
# ==== References ====
#
# Bug#21753696: MAKE SHOW SLAVE STATUS NON BLOCKING IF IO THREAD WAITS FOR
# DISK SPACE
# Bug#20685029: SLAVE IO THREAD SHOULD STOP WHEN DISK IS FULL
#
###############################################################################
--source include/have_debug.inc
--source include/master-slave.inc
# Generate events to be replicated to the slave
CREATE TABLE t1(a INT);
INSERT INTO t1 VALUES(1);
--sync_slave_with_master
# Those errors will only happen in the slave
CALL mtr.add_suppression("Disk is full writing");
CALL mtr.add_suppression("Retry in 60 secs");
# Stop the SQL thread to avoid writing on disk
--source include/stop_slave_sql.inc
# Set the debug option that will simulate disk full
--let $debug_saved= `SELECT @@GLOBAL.DEBUG`
SET @@GLOBAL.DEBUG= 'd,simulate_io_thd_wait_for_disk_space';
# Generate events to be replicated to the slave
--connection master
INSERT INTO t1 VALUES(2);
--connection slave
# Wait until IO thread is queuing events from master
# Notice that this is performed by querying SHOW SLAVE STATUS
--let $slave_param= Slave_IO_State
--let $slave_param_value= Queueing master event to the relay log
--source include/wait_for_slave_param.inc
# Get the relay log file name, also using SHOW SLAVE STATUS
--let $relay_log_file= query_get_value(SHOW SLAVE STATUS, Relay_Log_File, 1)
# Restore the debug options to "simulate" freed space on disk
SET @@GLOBAL.DEBUG= '$debug_saved';
# There should be a message in the error log of the slave stating
# that it was waiting for space to write on the relay log.
--let $assert_file=$MYSQLTEST_VARDIR/log/mysqld.2.err
# Grep only after the message that the I/O thread has started
--let $assert_only_after= Slave I/O .* connected to master .*replication started in log .* at position
--let $assert_count= 1
--let $assert_select=Disk is full writing .*$relay_log_file.*
--let $assert_text= Found the disk full error message on the slave
--source include/assert_grep.inc
# Start the SQL thread to let the slave to sync and finish gracefully
--source include/start_slave_sql.inc
# Cleanup
--connection master
DROP TABLE t1;
--source include/rpl_end.inc
/* Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.
/* Copyright (c) 2000, 2016, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -109,6 +109,7 @@ void init_glob_errs()
*/
void wait_for_free_space(const char *filename, int errors)
{
size_t time_to_sleep= MY_WAIT_FOR_USER_TO_FIX_PANIC;
if (!(errors % MY_WAIT_GIVE_USER_A_MESSAGE))
{
my_printf_warning(EE(EE_DISK_FULL),
......@@ -119,10 +120,14 @@ void wait_for_free_space(const char *filename, int errors)
}
DBUG_EXECUTE_IF("simulate_no_free_space_error",
{
(void) sleep(1);
return;
time_to_sleep= 1;
});
(void) sleep(MY_WAIT_FOR_USER_TO_FIX_PANIC);
DBUG_EXECUTE_IF("simulate_io_thd_wait_for_disk_space",
{
time_to_sleep= 1;
});
(void) sleep(time_to_sleep);
}
const char **get_global_errmsgs()
......
/* Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.
/* Copyright (c) 2000, 2016, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -24,6 +24,7 @@ size_t my_write(File Filedes, const uchar *Buffer, size_t Count, myf MyFlags)
{
size_t writtenbytes, written;
uint errors;
size_t ToWriteCount;
DBUG_ENTER("my_write");
DBUG_PRINT("my",("fd: %d Buffer: %p Count: %lu MyFlags: %d",
Filedes, Buffer, (ulong) Count, MyFlags));
......@@ -37,11 +38,14 @@ size_t my_write(File Filedes, const uchar *Buffer, size_t Count, myf MyFlags)
{ DBUG_SET("+d,simulate_file_write_error");});
for (;;)
{
ToWriteCount= Count;
DBUG_EXECUTE_IF("simulate_io_thd_wait_for_disk_space", { ToWriteCount= 1; });
#ifdef _WIN32
writtenbytes= my_win_write(Filedes, Buffer, Count);
writtenbytes= my_win_write(Filedes, Buffer, ToWriteCount);
#else
writtenbytes= write(Filedes, Buffer, Count);
writtenbytes= write(Filedes, Buffer, ToWriteCount);
#endif
DBUG_EXECUTE_IF("simulate_io_thd_wait_for_disk_space", { errno= ENOSPC; });
DBUG_EXECUTE_IF("simulate_file_write_error",
{
errno= ENOSPC;
......
/* Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.
/* Copyright (c) 2000, 2016, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -37,6 +37,7 @@
#include "log_event.h" // Query_log_event
#include "rpl_filter.h"
#include "rpl_rli.h"
#include "rpl_mi.h"
#include "sql_audit.h"
#include "sql_show.h"
......@@ -4377,13 +4378,22 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock)
}
bool MYSQL_BIN_LOG::append(Log_event* ev)
#ifdef HAVE_REPLICATION
bool MYSQL_BIN_LOG::append(Log_event* ev, Master_info *mi)
{
bool error = 0;
mysql_mutex_assert_owner(&mi->data_lock);
mysql_mutex_lock(&LOCK_log);
DBUG_ENTER("MYSQL_BIN_LOG::append");
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
/*
Release data_lock by holding LOCK_log, while writing into the relay log.
If slave IO thread waits here for free space, we don't want
SHOW SLAVE STATUS to hang on mi->data_lock. Note LOCK_log mutex is
sufficient to block SQL thread when IO thread is updating relay log here.
*/
mysql_mutex_unlock(&mi->data_lock);
/*
Log_event::write() is smart enough to use my_b_write() or
my_b_append() depending on the kind of cache we have.
......@@ -4398,24 +4408,50 @@ bool MYSQL_BIN_LOG::append(Log_event* ev)
if (flush_and_sync(0))
goto err;
if ((uint) my_b_append_tell(&log_file) > max_size)
{
/*
If rotation is required we must acquire data_lock to protect
description_event from clients executing FLUSH LOGS in parallel.
In order do that we must release the existing LOCK_log so that we
get it once again in proper locking order to avoid dead locks.
i.e data_lock , LOCK_log.
*/
mysql_mutex_unlock(&LOCK_log);
mysql_mutex_lock(&mi->data_lock);
mysql_mutex_lock(&LOCK_log);
error= new_file_without_locking();
/*
After rotation release data_lock, we need the LOCK_log till we signal
the updation.
*/
mysql_mutex_unlock(&mi->data_lock);
}
err:
mysql_mutex_unlock(&LOCK_log);
signal_update(); // Safe as we don't call close
mysql_mutex_unlock(&LOCK_log);
mysql_mutex_lock(&mi->data_lock);
DBUG_RETURN(error);
}
bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...)
bool MYSQL_BIN_LOG::appendv(Master_info* mi, const char* buf, uint len,...)
{
bool error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::appendv");
va_list(args);
va_start(args,len);
mysql_mutex_assert_owner(&mi->data_lock);
mysql_mutex_lock(&LOCK_log);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
mysql_mutex_assert_owner(&LOCK_log);
/*
Release data_lock by holding LOCK_log, while writing into the relay log.
If slave IO thread waits here for free space, we don't want
SHOW SLAVE STATUS to hang on mi->data_lock. Note LOCK_log mutex is
sufficient to block SQL thread when IO thread is updating relay log here.
*/
mysql_mutex_unlock(&mi->data_lock);
do
{
if (my_b_append(&log_file,(uchar*) buf,len))
......@@ -4428,13 +4464,34 @@ bool MYSQL_BIN_LOG::appendv(const char* buf, uint len,...)
DBUG_PRINT("info",("max_size: %lu",max_size));
if (flush_and_sync(0))
goto err;
if ((uint) my_b_append_tell(&log_file) > max_size)
if ((uint) my_b_append_tell(&log_file) >
DBUG_EVALUATE_IF("rotate_slave_debug_group", 500, max_size))
{
/*
If rotation is required we must acquire data_lock to protect
description_event from clients executing FLUSH LOGS in parallel.
In order do that we must release the existing LOCK_log so that we
get it once again in proper locking order to avoid dead locks.
i.e data_lock , LOCK_log.
*/
mysql_mutex_unlock(&LOCK_log);
mysql_mutex_lock(&mi->data_lock);
mysql_mutex_lock(&LOCK_log);
error= new_file_without_locking();
/*
After rotation release data_lock, we need the LOCK_log till we signal
the updation.
*/
mysql_mutex_unlock(&mi->data_lock);
}
err:
if (!error)
signal_update();
mysql_mutex_unlock(&LOCK_log);
mysql_mutex_lock(&mi->data_lock);
DBUG_RETURN(error);
}
#endif
bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
{
......
/* Copyright (c) 2005, 2012, Oracle and/or its affiliates. All rights reserved.
/* Copyright (c) 2005, 2016, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -20,6 +20,7 @@
#include "handler.h" /* my_xid */
class Relay_log_info;
class Master_info;
class Format_description_log_event;
......@@ -454,8 +455,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
v stands for vector
invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0)
*/
bool appendv(const char* buf,uint len,...);
bool append(Log_event* ev);
bool appendv(Master_info* mi, const char* buf,uint len,...);
bool append(Log_event* ev, Master_info* mi);
void make_log_name(char* buf, const char* log_ident);
bool is_active(const char* log_file_name);
......
/* Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.
/* Copyright (c) 2000, 2016, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -1660,7 +1660,7 @@ Waiting for the slave SQL thread to free enough relay log space");
#endif
if (rli->sql_force_rotate_relay)
{
rotate_relay_log(rli->mi);
rotate_relay_log(rli->mi, true/*need_data_lock=true*/);
rli->sql_force_rotate_relay= false;
}
......@@ -1705,7 +1705,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
if (likely((bool)ev))
{
ev->server_id= 0; // don't be ignored by slave SQL thread
if (unlikely(rli->relay_log.append(ev)))
if (unlikely(rli->relay_log.append(ev, mi)))
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"failed to write a Rotate event"
......@@ -3605,7 +3605,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
break;
Execute_load_log_event xev(thd,0,0);
xev.log_pos = cev->log_pos;
if (unlikely(mi->rli.relay_log.append(&xev)))
if (unlikely(mi->rli.relay_log.append(&xev, mi)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
......@@ -3619,7 +3619,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
{
cev->block = net->read_pos;
cev->block_len = num_bytes;
if (unlikely(mi->rli.relay_log.append(cev)))
if (unlikely(mi->rli.relay_log.append(cev, mi)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
......@@ -3634,7 +3634,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
aev.block = net->read_pos;
aev.block_len = num_bytes;
aev.log_pos = cev->log_pos;
if (unlikely(mi->rli.relay_log.append(&aev)))
if (unlikely(mi->rli.relay_log.append(&aev, mi)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
......@@ -3713,7 +3713,7 @@ static int process_io_rotate(Master_info *mi, Rotate_log_event *rev)
Rotate the relay log makes binlog format detection easier (at next slave
start or mysqlbinlog)
*/
DBUG_RETURN(rotate_relay_log(mi) /* will take the right mutexes */);
DBUG_RETURN(rotate_relay_log(mi, false/*need_data_lock=false*/));
}
/*
......@@ -3819,7 +3819,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
Log_event::Log_event(const char* buf...) in log_event.cc).
*/
ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
if (unlikely(rli->relay_log.append(ev)))
if (unlikely(rli->relay_log.append(ev, mi)))
{
delete ev;
mysql_mutex_unlock(&mi->data_lock);
......@@ -3875,7 +3875,7 @@ static int queue_binlog_ver_3_event(Master_info *mi, const char *buf,
inc_pos= event_len;
break;
}
if (unlikely(rli->relay_log.append(ev)))
if (unlikely(rli->relay_log.append(ev, mi)))
{
delete ev;
mysql_mutex_unlock(&mi->data_lock);
......@@ -4083,7 +4083,6 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
direct master (an unsupported, useless setup!).
*/
mysql_mutex_lock(log_lock);
s_id= uint4korr(buf + SERVER_ID_OFFSET);
if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
/*
......@@ -4116,6 +4115,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
IGNORE_SERVER_IDS it increments mi->master_log_pos
as well as rli->group_relay_log_pos.
*/
mysql_mutex_lock(log_lock);
if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
(buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
......@@ -4127,13 +4127,14 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
rli->ign_master_log_pos_end= mi->master_log_pos;
}
rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
mysql_mutex_unlock(log_lock);
DBUG_PRINT("info", ("master_log_pos: %lu, event originating from %u server, ignored",
(ulong) mi->master_log_pos, uint4korr(buf + SERVER_ID_OFFSET)));
}
else
{
/* write the event to the relay log */
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
if (likely(!(rli->relay_log.appendv(mi, buf,event_len,0))))
{
mi->master_log_pos+= inc_pos;
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
......@@ -4143,9 +4144,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
}
mysql_mutex_lock(log_lock);
rli->ign_master_log_name_end[0]= 0; // last event is not ignored
mysql_mutex_unlock(log_lock);
}
mysql_mutex_unlock(log_lock);
skip_relay_logging:
......@@ -5005,11 +5007,21 @@ event(errno: %d cur_log->error: %d)",
locks; here we don't, so this function is mainly taking locks).
Returns nothing as we cannot catch any error (MYSQL_BIN_LOG::new_file()
is void).
@param mi Master_info for the IO thread.
@param need_data_lock If true, mi->data_lock will be acquired otherwise,
mi->data_lock must be held by the caller.
*/
int rotate_relay_log(Master_info* mi)
int rotate_relay_log(Master_info* mi, bool need_data_lock)
{
DBUG_ENTER("rotate_relay_log");
if (need_data_lock)
mysql_mutex_lock(&mi->data_lock);
else
{
mysql_mutex_assert_owner(&mi->data_lock);
}
Relay_log_info* rli= &mi->rli;
int error= 0;
......@@ -5044,6 +5056,8 @@ int rotate_relay_log(Master_info* mi)
*/
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
end:
if (need_data_lock)
mysql_mutex_unlock(&mi->data_lock);
DBUG_RETURN(error);
}
......
/* Copyright (c) 2000, 2010, Oracle and/or its affiliates. All rights reserved.
/* Copyright (c) 2000, 2016, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -205,7 +205,7 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
const char** errmsg);
void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli);
int rotate_relay_log(Master_info* mi);
int rotate_relay_log(Master_info* mi, bool need_data_lock);
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli);
pthread_handler_t handle_slave_io(void *arg);
......
/* Copyright (c) 2010, 2015, Oracle and/or its affiliates. All rights reserved.
/* Copyright (c) 2010, 2016, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
......@@ -157,7 +157,7 @@ bool reload_acl_and_cache(THD *thd, unsigned long options,
{
#ifdef HAVE_REPLICATION
mysql_mutex_lock(&LOCK_active_mi);
if (rotate_relay_log(active_mi))
if (rotate_relay_log(active_mi, true/*need_data_lock=true*/))
*write_to_binlog= -1;
mysql_mutex_unlock(&LOCK_active_mi);
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment