Commit 40c0b2c6 authored by unknown

New variable rli->ignore_log_space_limit to resolve
a deadlock between I/O and SQL threads in replication
when relay_log_space_limit is too small. This fixes bug #79.


sql/log.cc:
  New variable rli->ignore_log_space_limit to resolve
  a deadlock between I/O and SQL threads in replication
  when relay_log_space is too small.
sql/slave.cc:
  New variable rli->ignore_log_space_limit to resolve
  a deadlock between I/O and SQL threads in replication
  when relay_log_space is too small.
sql/slave.h:
  New variable rli->ignore_log_space_limit to resolve
  a deadlock between I/O and SQL threads in replication
  when relay_log_space is too small.
sql/sql_repl.cc:
  New variable rli->ignore_log_space_limit to resolve
  a deadlock between I/O and SQL threads in replication
  when relay_log_space is too small.
parent 21034796
slave stop;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
slave start;
stop slave;
create table t1 (a int);
reset slave;
start slave;
select master_pos_wait('master-bin.001',5000,45)=-1;
master_pos_wait('master-bin.001',5000,45)=-1
0
-O relay_log_space_limit=1024
\ No newline at end of file
# Regression test for the replication deadlock between the slave I/O
# thread and the slave SQL thread when relay_log_space_limit is too small.
# The slave is started with relay_log_space_limit=1024 bytes
# to force the deadlock.
source include/master-slave.inc;
# Stop the slave so that it does not replicate anything while the master
# fills its binlog.
connection slave;
stop slave;
connection master;
create table t1 (a int);
# $1 is the loop counter: 200 single-row inserts.
let $1=200;
# Keep the 200 inserts out of the recorded test output.
disable_query_log;
while ($1)
{
# "eval" expands $ expressions before the statement is executed.
eval insert into t1 values( $1 );
dec $1;
}
# The loop above generates a master binlog of about 10kB.
enable_query_log;
save_master_pos;
connection slave;
reset slave;
start slave;
# The I/O thread stops filling the relay log once it reaches 1kB, and the
# SQL thread cannot purge this relay log, because purging is done only
# when the SQL thread switches to another relay log, which does not
# exist here.
# So we should have a deadlock.
# If it is not resolved automatically, we will detect it with
# master_pos_wait, which waits for a position farther than 1kB and
# times out after 45 seconds (the comparison with -1 then yields 1;
# the expected result is 0).
# In that case the slave will probably also not cooperate at shutdown
# (as 2 threads are blocked).
select master_pos_wait('master-bin.001',5000,45)=-1;
...@@ -645,6 +645,8 @@ int MYSQL_LOG::purge_first_log(struct st_relay_log_info* rli) ...@@ -645,6 +645,8 @@ int MYSQL_LOG::purge_first_log(struct st_relay_log_info* rli)
*/ */
pthread_mutex_lock(&rli->log_space_lock); pthread_mutex_lock(&rli->log_space_lock);
rli->log_space_total -= rli->relay_log_pos; rli->log_space_total -= rli->relay_log_pos;
//tell the I/O thread to take the relay_log_space_limit into account
rli->ignore_log_space_limit= 0;
pthread_mutex_unlock(&rli->log_space_lock); pthread_mutex_unlock(&rli->log_space_lock);
pthread_cond_broadcast(&rli->log_space_cond); pthread_cond_broadcast(&rli->log_space_cond);
......
...@@ -238,7 +238,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, ...@@ -238,7 +238,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
if (log) // If not first log if (log) // If not first log
{ {
if (strcmp(log, rli->linfo.log_file_name)) if (strcmp(log, rli->linfo.log_file_name))
rli->skip_log_purge=1; // Different name; Don't purge rli->skip_log_purge= 1; // Different name; Don't purge
if (rli->relay_log.find_log_pos(&rli->linfo, log, 1)) if (rli->relay_log.find_log_pos(&rli->linfo, log, 1))
{ {
*errmsg="Could not find target log during relay log initialization"; *errmsg="Could not find target log during relay log initialization";
...@@ -273,6 +273,12 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, ...@@ -273,6 +273,12 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
my_b_seek(rli->cur_log,(off_t)pos); my_b_seek(rli->cur_log,(off_t)pos);
err: err:
/*
If we don't purge, we can't honour relay_log_space_limit ;
silently discard it
*/
if (rli->skip_log_purge)
rli->log_space_limit= 0;
pthread_cond_broadcast(&rli->data_cond); pthread_cond_broadcast(&rli->data_cond);
if (need_data_lock) if (need_data_lock)
pthread_mutex_unlock(&rli->data_lock); pthread_mutex_unlock(&rli->data_lock);
...@@ -1312,7 +1318,8 @@ static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli) ...@@ -1312,7 +1318,8 @@ static bool wait_for_relay_log_space(RELAY_LOG_INFO* rli)
save_proc_info = thd->proc_info; save_proc_info = thd->proc_info;
thd->proc_info = "Waiting for relay log space to free"; thd->proc_info = "Waiting for relay log space to free";
while (rli->log_space_limit < rli->log_space_total && while (rli->log_space_limit < rli->log_space_total &&
!(slave_killed=io_slave_killed(thd,mi))) !(slave_killed=io_slave_killed(thd,mi)) &&
!rli->ignore_log_space_limit)
{ {
pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock); pthread_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
} }
...@@ -1588,7 +1595,7 @@ bool flush_master_info(MASTER_INFO* mi) ...@@ -1588,7 +1595,7 @@ bool flush_master_info(MASTER_INFO* mi)
st_relay_log_info::st_relay_log_info() st_relay_log_info::st_relay_log_info()
:info_fd(-1), cur_log_fd(-1), master_log_pos(0), save_temporary_tables(0), :info_fd(-1), cur_log_fd(-1), master_log_pos(0), save_temporary_tables(0),
cur_log_old_open_count(0), log_space_total(0), cur_log_old_open_count(0), log_space_total(0), ignore_log_space_limit(0),
slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0), slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0),
sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0), sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0),
slave_running(0), skip_log_purge(0), slave_running(0), skip_log_purge(0),
...@@ -2296,7 +2303,8 @@ reconnect done to recover from failed read"); ...@@ -2296,7 +2303,8 @@ reconnect done to recover from failed read");
} }
flush_master_info(mi); flush_master_info(mi);
if (mi->rli.log_space_limit && mi->rli.log_space_limit < if (mi->rli.log_space_limit && mi->rli.log_space_limit <
mi->rli.log_space_total) mi->rli.log_space_total &&
!mi->rli.ignore_log_space_limit)
if (wait_for_relay_log_space(&mi->rli)) if (wait_for_relay_log_space(&mi->rli))
{ {
sql_print_error("Slave I/O thread aborted while waiting for relay \ sql_print_error("Slave I/O thread aborted while waiting for relay \
...@@ -2408,6 +2416,10 @@ extern "C" pthread_handler_decl(handle_slave_sql,arg) ...@@ -2408,6 +2416,10 @@ extern "C" pthread_handler_decl(handle_slave_sql,arg)
pthread_cond_broadcast(&rli->start_cond); pthread_cond_broadcast(&rli->start_cond);
// This should always be set to 0 when the slave thread is started // This should always be set to 0 when the slave thread is started
rli->pending = 0; rli->pending = 0;
//tell the I/O thread to take relay_log_space_limit into account from now on
rli->ignore_log_space_limit= 0;
if (init_relay_log_pos(rli, if (init_relay_log_pos(rli,
rli->relay_log_name, rli->relay_log_name,
rli->relay_log_pos, rli->relay_log_pos,
...@@ -3086,11 +3098,41 @@ Log_event* next_event(RELAY_LOG_INFO* rli) ...@@ -3086,11 +3098,41 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
update. If we do not, show slave status will block update. If we do not, show slave status will block
*/ */
pthread_mutex_unlock(&rli->data_lock); pthread_mutex_unlock(&rli->data_lock);
/* Note that wait_for_update unlocks lock_log ! */
rli->relay_log.wait_for_update(rli->sql_thd); /*
Possible deadlock :
// re-acquire data lock since we released it earlier - the I/O thread has reached log_space_limit
pthread_mutex_lock(&rli->data_lock); - the SQL thread has read all relay logs, but cannot purge for some
reason:
* it has already purged all logs except the current one
* there are other logs than the current one but they're involved in
a transaction that finishes in the current one (or is not finished)
Solution :
Wake up the possibly waiting I/O thread, and set a boolean asking
the I/O thread to temporarily ignore the log_space_limit
constraint, because we do not want the I/O thread to block because of
space (it's ok if it blocks for any other reason, e.g. because the
master does not send anything). Then the I/O thread stops waiting
and reads more events.
The SQL thread decides when the I/O thread should take log_space_limit
into account again: ignore_log_space_limit is reset to 0
in purge_first_log (when the SQL thread purges the just-read relay
log), and also when the SQL thread starts. In principle we should also
reset ignore_log_space_limit to 0 when the user does RESET SLAVE, but
this is not needed: RESET SLAVE requires that the slave
be stopped, and when the SQL thread is later restarted
ignore_log_space_limit will be reset to 0.
*/
pthread_mutex_lock(&rli->log_space_lock);
// prevent the I/O thread from blocking next times
rli->ignore_log_space_limit= 1;
// If the I/O thread is blocked, unblock it
pthread_cond_broadcast(&rli->log_space_cond);
pthread_mutex_unlock(&rli->log_space_lock);
// Note that wait_for_update unlocks lock_log !
rli->relay_log.wait_for_update(rli->sql_thd);
// re-acquire data lock since we released it earlier
pthread_mutex_lock(&rli->data_lock);
continue; continue;
} }
/* /*
......
...@@ -137,7 +137,14 @@ typedef struct st_relay_log_info ...@@ -137,7 +137,14 @@ typedef struct st_relay_log_info
offset. pending stored the extra offset to be added to the position. offset. pending stored the extra offset to be added to the position.
*/ */
ulonglong relay_log_pos, pending; ulonglong relay_log_pos, pending;
/*
Handling of the relay_log_space_limit optional constraint.
ignore_log_space_limit is used to resolve a deadlock between the I/O and
SQL threads; it makes the I/O thread temporarily ignore the constraint.
*/
ulonglong log_space_limit,log_space_total; ulonglong log_space_limit,log_space_total;
bool ignore_log_space_limit;
/* /*
InnoDB internally stores the master log position it has processed InnoDB internally stores the master log position it has processed
......
...@@ -858,22 +858,21 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -858,22 +858,21 @@ int change_master(THD* thd, MASTER_INFO* mi)
if (lex_mi->relay_log_name) if (lex_mi->relay_log_name)
{ {
need_relay_log_purge = 0; need_relay_log_purge= 0;
mi->rli.skip_log_purge=1;
strmake(mi->rli.relay_log_name,lex_mi->relay_log_name, strmake(mi->rli.relay_log_name,lex_mi->relay_log_name,
sizeof(mi->rli.relay_log_name)-1); sizeof(mi->rli.relay_log_name)-1);
} }
if (lex_mi->relay_log_pos) if (lex_mi->relay_log_pos)
{ {
need_relay_log_purge=0; need_relay_log_purge= 0;
mi->rli.relay_log_pos=lex_mi->relay_log_pos; mi->rli.relay_log_pos=lex_mi->relay_log_pos;
} }
flush_master_info(mi); flush_master_info(mi);
if (need_relay_log_purge) if (need_relay_log_purge)
{ {
mi->rli.skip_log_purge=0; mi->rli.skip_log_purge= 0;
thd->proc_info="purging old relay logs"; thd->proc_info="purging old relay logs";
if (purge_relay_logs(&mi->rli, thd, if (purge_relay_logs(&mi->rli, thd,
0 /* not only reset, but also reinit */, 0 /* not only reset, but also reinit */,
...@@ -887,6 +886,7 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -887,6 +886,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
else else
{ {
const char* msg; const char* msg;
mi->rli.skip_log_purge= 1;
/* Relay log is already initialized */ /* Relay log is already initialized */
if (init_relay_log_pos(&mi->rli, if (init_relay_log_pos(&mi->rli,
mi->rli.relay_log_name, mi->rli.relay_log_name,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment