Commit b03ec0d7 authored by guilhem@mysql.com's avatar guilhem@mysql.com

Replication: new code to not modify in-memory log positions until the COMMIT

is executed, even if the transaction spans on >=2 relay logs (bug #53).
New variable relay_log_purge =0|1
New test to verify bug #53
parent d1609043
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
stop slave;
create table t1 (a int) type=innodb;
reset slave;
start slave;
stop slave;
start slave;
select master_pos_wait('master-bin.001',3000,120)=-1;
master_pos_wait('master-bin.001',3000,120)=-1
0
select * from t1 where a=8000;
a
8000
-O max_binlog_size=16384
--innodb
--log-warnings
# When the relay log gets rotated while the I/O thread
# is reading a transaction, the transaction spans on two or more
# relay logs. If STOP SLAVE occurs while the SQL thread is
# executing a part of the transaction in the non-first relay logs,
# we test if START SLAVE will resume in the beginning of the
# transaction (i.e., step back to the first relay log)
# The slave is started with max_binlog_size=16384 bytes,
# to force many rotations (approximately 30 rotations)
# If the master or slave does not support InnoDB, this test will pass
source include/master-slave.inc;
connection slave;
stop slave;
connection master;
create table t1 (a int) type=innodb;
let $1=8000;
disable_query_log;
begin;
while ($1)
{
# eval means expand $ expressions
eval insert into t1 values( $1 );
dec $1;
}
commit;
# This will generate a 500kB master's binlog,
# which corresponds to 30 slave's relay logs.
enable_query_log;
save_master_pos;
connection slave;
reset slave;
start slave;
# We wait 1 sec for the SQL thread to be somewhere in
# the middle of the transaction, hopefully not in
# the first relay log, and hopefully before the COMMIT.
# Usually it stops when the SQL thread is around the 15th relay log.
# We cannot use MASTER_POS_WAIT() as master's position
# increases only when the slave executes the COMMIT.
system sleep 1;
stop slave;
# We suppose the SQL thread stopped before COMMIT.
# If so the transaction was rolled back
# and the table is now empty.
# Now restart
start slave;
# And see if the table contains '8000'
# which proves that the transaction restarted at
# the right place.
# We must wait for the transaction to commit before
# reading, MASTER_POS_WAIT() will do it for sure
# (the only statement with position>=3000 is COMMIT).
# Older versions of MySQL would hang forever in MASTER_POS_WAIT
# because COMMIT was said to be position 0 in the master's log (bug).
# Detect this with timeout.
select master_pos_wait('master-bin.001',3000,120)=-1;
select * from t1 where a=8000;
# Note that the simple fact to have less than around 30 slave's binlogs
# (the slave is started with --log-slave-updates) is already
# a proof that the transaction was not properly resumed.
...@@ -592,24 +592,32 @@ err: ...@@ -592,24 +592,32 @@ err:
/* /*
Delete the current log file, remove it from index file and start on next Delete relay log files prior to rli->group_relay_log_name
(i.e. all logs which are not involved in a non-finished group
(transaction)), remove them from the index file and start on next relay log.
SYNOPSIS SYNOPSIS
purge_first_log() purge_first_log()
rli Relay log information rli Relay log information
included If false, all relay logs that are strictly before
rli->group_relay_log_name are deleted ; if true, the latter is
deleted too (i.e. all relay logs
read by the SQL slave thread are deleted).
NOTE NOTE
- This is only called from the slave-execute thread when it has read - This is only called from the slave-execute thread when it has read
all commands from a log and want to switch to a new log. all commands from a relay log and want to switch to a new relay log.
- When this happens, we should never be in an active transaction as - When this happens, we can be in an active transaction as
a transaction is always written as a single block to the binary log. a transaction can span over two relay logs
(although it is always written as a single block to the master's binary
log, hence cannot span over two master's binary logs).
IMPLEMENTATION IMPLEMENTATION
- Protects index file with LOCK_index - Protects index file with LOCK_index
- Delete first log file, - Delete relevant relay log files
- Copy all file names after this one to the front of the index file - Copy all file names after these ones to the front of the index file
- If the OS has truncate, truncate the file, else fill it with \n' - If the OS has truncate, truncate the file, else fill it with \n'
- Read the first file name from the index file and store in rli->linfo - Read the next file name from the index file and store in rli->linfo
RETURN VALUES RETURN VALUES
0 ok 0 ok
...@@ -620,66 +628,68 @@ err: ...@@ -620,66 +628,68 @@ err:
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
int MYSQL_LOG::purge_first_log(struct st_relay_log_info* rli) int MYSQL_LOG::purge_first_log(struct st_relay_log_info* rli, bool included)
{ {
int error; int error;
DBUG_ENTER("purge_first_log"); DBUG_ENTER("purge_first_log");
/*
Test pre-conditions.
Assume that we have previously read the first log and
stored it in rli->relay_log_name
*/
DBUG_ASSERT(is_open()); DBUG_ASSERT(is_open());
DBUG_ASSERT(rli->slave_running == 1); DBUG_ASSERT(rli->slave_running == 1);
DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->relay_log_name)); DBUG_ASSERT(!strcmp(rli->linfo.log_file_name,rli->event_relay_log_name));
DBUG_ASSERT(rli->linfo.index_file_offset ==
strlen(rli->relay_log_name) + 1);
/* We have already processed the relay log, so it's safe to delete it */
my_delete(rli->relay_log_name, MYF(0));
pthread_mutex_lock(&LOCK_index); pthread_mutex_lock(&LOCK_index);
if (copy_up_file_and_fill(&index_file, rli->linfo.index_file_offset)) pthread_mutex_lock(&rli->log_space_lock);
{ rli->relay_log.purge_logs(rli->group_relay_log_name, included,
error= LOG_INFO_IO; 0, 0, &rli->log_space_total);
goto err; // Tell the I/O thread to take the relay_log_space_limit into account
} rli->ignore_log_space_limit= 0;
pthread_mutex_unlock(&rli->log_space_lock);
/* /*
Update the space counter used by all relay logs
Ok to broadcast after the critical region as there is no risk of Ok to broadcast after the critical region as there is no risk of
the mutex being destroyed by this thread later - this helps save the mutex being destroyed by this thread later - this helps save
context switches context switches
*/ */
pthread_mutex_lock(&rli->log_space_lock);
rli->log_space_total -= rli->relay_log_pos;
//tell the I/O thread to take the relay_log_space_limit into account
rli->ignore_log_space_limit= 0;
pthread_mutex_unlock(&rli->log_space_lock);
pthread_cond_broadcast(&rli->log_space_cond); pthread_cond_broadcast(&rli->log_space_cond);
/* /*
Read the next log file name from the index file and pass it back to Read the next log file name from the index file and pass it back to
the caller the caller
If included is true, we want the first relay log;
otherwise we want the one after event_relay_log_name.
*/ */
if ((error=find_log_pos(&rli->linfo, NullS, 0 /*no mutex*/))) if ((included && (error=find_log_pos(&rli->linfo, NullS, 0))) ||
(!included &&
((error=find_log_pos(&rli->linfo, rli->event_relay_log_name, 0)) ||
(error=find_next_log(&rli->linfo, 0)))))
{ {
char buff[22]; char buff[22];
sql_print_error("next log error: %d offset: %s log: %s", sql_print_error("next log error: %d offset: %s log: %s included: %d",
error, error,
llstr(rli->linfo.index_file_offset,buff), llstr(rli->linfo.index_file_offset,buff),
rli->linfo.log_file_name); rli->group_relay_log_name,
included);
goto err; goto err;
} }
/*
Reset rli's coordinates to the current log.
*/
rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
sizeof(rli->event_relay_log_name)-1);
/* /*
Reset position to current log. This involves setting both of the If we removed the rli->group_relay_log_name file,
position variables: we must update the rli->group* coordinates, otherwise do not touch it as the
group's execution is not finished (e.g. COMMIT not executed)
*/ */
rli->relay_log_pos = BIN_LOG_HEADER_SIZE; if (included)
rli->pending = 0; {
strmake(rli->relay_log_name,rli->linfo.log_file_name, rli->group_relay_log_pos = BIN_LOG_HEADER_SIZE;
sizeof(rli->relay_log_name)-1); strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
sizeof(rli->group_relay_log_name)-1);
}
/* Store where we are in the new file for the execution thread */ /* Store where we are in the new file for the execution thread */
flush_relay_log_info(rli); flush_relay_log_info(rli);
...@@ -693,12 +703,13 @@ err: ...@@ -693,12 +703,13 @@ err:
Update log index_file Update log index_file
*/ */
int MYSQL_LOG::update_log_index(LOG_INFO* log_info) int MYSQL_LOG::update_log_index(LOG_INFO* log_info, bool need_update_threads)
{ {
if (copy_up_file_and_fill(&index_file, log_info->index_file_start_offset)) if (copy_up_file_and_fill(&index_file, log_info->index_file_start_offset))
return LOG_INFO_IO; return LOG_INFO_IO;
// now update offsets in index file for running threads // now update offsets in index file for running threads
if (need_update_threads)
adjust_linfo_offsets(log_info->index_file_start_offset); adjust_linfo_offsets(log_info->index_file_start_offset);
return 0; return 0;
} }
...@@ -708,9 +719,13 @@ int MYSQL_LOG::update_log_index(LOG_INFO* log_info) ...@@ -708,9 +719,13 @@ int MYSQL_LOG::update_log_index(LOG_INFO* log_info)
SYNOPSIS SYNOPSIS
purge_logs() purge_logs()
thd Thread pointer to_log Delete all log file name before this file.
to_log Delete all log file name before this file. This file is not included If true, to_log is deleted too.
deleted need_mutex
need_update_threads If we want to update the log coordinates of
all threads. False for relay logs, true otherwise.
freed_log_space If not null, decrement this variable of
the amount of log space freed
NOTES NOTES
If any of the logs before the deleted one is in use, If any of the logs before the deleted one is in use,
...@@ -722,31 +737,59 @@ int MYSQL_LOG::update_log_index(LOG_INFO* log_info) ...@@ -722,31 +737,59 @@ int MYSQL_LOG::update_log_index(LOG_INFO* log_info)
LOG_INFO_EOF to_log not found LOG_INFO_EOF to_log not found
*/ */
int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) int MYSQL_LOG::purge_logs(const char *to_log,
bool included,
bool need_mutex,
bool need_update_threads,
ulonglong *decrease_log_space)
{ {
int error; int error;
bool exit_loop= 0;
LOG_INFO log_info; LOG_INFO log_info;
DBUG_ENTER("purge_logs"); DBUG_ENTER("purge_logs");
DBUG_PRINT("info",("to_log= %s",to_log));
if (no_rotate) if (no_rotate)
DBUG_RETURN(LOG_INFO_PURGE_NO_ROTATE); DBUG_RETURN(LOG_INFO_PURGE_NO_ROTATE);
if (need_mutex)
pthread_mutex_lock(&LOCK_index); pthread_mutex_lock(&LOCK_index);
if ((error=find_log_pos(&log_info, to_log, 0 /*no mutex*/))) if ((error=find_log_pos(&log_info, to_log, 0 /*no mutex*/)))
goto err; goto err;
/* /*
File name exists in index file; Delete until we find this file File name exists in index file; delete until we find this file
or a file that is used. or a file that is used.
*/ */
if ((error=find_log_pos(&log_info, NullS, 0 /*no mutex*/))) if ((error=find_log_pos(&log_info, NullS, 0 /*no mutex*/)))
goto err; goto err;
while (strcmp(to_log,log_info.log_file_name) && while ((strcmp(to_log,log_info.log_file_name) || (exit_loop=included)) &&
!log_in_use(log_info.log_file_name)) !log_in_use(log_info.log_file_name))
{ {
/* It's not fatal even if we can't delete a log file */ ulong tmp;
my_delete(log_info.log_file_name, MYF(0)); if (decrease_log_space) //stat the file we want to delete
if (find_next_log(&log_info, 0)) {
MY_STAT s;
if (my_stat(log_info.log_file_name,&s,MYF(0)))
tmp= s.st_size;
else
{
/*
If we could not stat, we can't know the amount
of space that deletion will free. In most cases,
deletion won't work either, so it's not a problem.
*/
tmp= 0;
}
}
/*
It's not fatal if we can't delete a log file ;
if we could delete it, take its size into account
*/
DBUG_PRINT("info",("purging %s",log_info.log_file_name));
if (!my_delete(log_info.log_file_name, MYF(0)) && decrease_log_space)
*decrease_log_space-= tmp;
if (find_next_log(&log_info, 0) || exit_loop)
break; break;
} }
...@@ -754,9 +797,10 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log) ...@@ -754,9 +797,10 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log)
If we get killed -9 here, the sysadmin would have to edit If we get killed -9 here, the sysadmin would have to edit
the log index file after restart - otherwise, this should be safe the log index file after restart - otherwise, this should be safe
*/ */
error= update_log_index(&log_info); error= update_log_index(&log_info, need_update_threads);
err: err:
if (need_mutex)
pthread_mutex_unlock(&LOCK_index); pthread_mutex_unlock(&LOCK_index);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -779,7 +823,7 @@ err: ...@@ -779,7 +823,7 @@ err:
LOG_INFO_PURGE_NO_ROTATE Binary file that can't be rotated LOG_INFO_PURGE_NO_ROTATE Binary file that can't be rotated
*/ */
int MYSQL_LOG::purge_logs_before_date(THD* thd, time_t purge_time) int MYSQL_LOG::purge_logs_before_date(time_t purge_time)
{ {
int error; int error;
LOG_INFO log_info; LOG_INFO log_info;
...@@ -816,7 +860,7 @@ int MYSQL_LOG::purge_logs_before_date(THD* thd, time_t purge_time) ...@@ -816,7 +860,7 @@ int MYSQL_LOG::purge_logs_before_date(THD* thd, time_t purge_time)
If we get killed -9 here, the sysadmin would have to edit If we get killed -9 here, the sysadmin would have to edit
the log index file after restart - otherwise, this should be safe the log index file after restart - otherwise, this should be safe
*/ */
error= update_log_index(&log_info); error= update_log_index(&log_info, 1);
err: err:
pthread_mutex_unlock(&LOCK_index); pthread_mutex_unlock(&LOCK_index);
...@@ -1269,7 +1313,7 @@ err: ...@@ -1269,7 +1313,7 @@ err:
{ {
long purge_time= time(0) - expire_logs_days*24*60*60; long purge_time= time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0) if (purge_time >= 0)
error= purge_logs_before_date(current_thd, purge_time); error= purge_logs_before_date(purge_time);
} }
#endif #endif
...@@ -1534,7 +1578,6 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length, ...@@ -1534,7 +1578,6 @@ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
If you don't do it this way, you will get a deadlock in THD::awake() If you don't do it this way, you will get a deadlock in THD::awake()
*/ */
void MYSQL_LOG:: wait_for_update(THD* thd) void MYSQL_LOG:: wait_for_update(THD* thd)
{ {
safe_mutex_assert_owner(&LOCK_log); safe_mutex_assert_owner(&LOCK_log);
......
...@@ -310,11 +310,36 @@ int Log_event::exec_event(struct st_relay_log_info* rli) ...@@ -310,11 +310,36 @@ int Log_event::exec_event(struct st_relay_log_info* rli)
*/ */
if (rli) if (rli)
{ {
if (rli->inside_transaction) /*
rli->inc_pending(get_event_len()); If in a transaction, and if the slave supports transactions,
just inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
(not OPTION_NOT_AUTOCOMMIT) as transactions are logged
with BEGIN/COMMIT, not with SET AUTOCOMMIT= .
CAUTION: opt_using_transactions means
innodb || bdb ; suppose the master supports InnoDB and BDB,
but the slave supports only BDB, problems
will arise:
- suppose an InnoDB table is created on the master,
- then it will be MyISAM on the slave
- but as opt_using_transactions is true, the slave will believe he is
transactional with the MyISAM table. And problems will come when one
does START SLAVE; STOP SLAVE; START SLAVE; (the slave will resume at BEGIN
whereas there has not been any rollback). This is the problem of
using opt_using_transactions instead of a finer
"does the slave support _the_transactional_handler_used_on_the_master_".
More generally, we'll have problems when a query mixes a transactional
handler and MyISAM and STOP SLAVE is issued in the middle of the
"transaction". START SLAVE will resume at BEGIN while the MyISAM table has
already been updated.
*/
if ((thd->options & OPTION_BEGIN) && opt_using_transactions)
rli->inc_event_relay_log_pos(get_event_len());
else else
{ {
rli->inc_pos(get_event_len(),log_pos); rli->inc_group_relay_log_pos(get_event_len(),log_pos);
flush_relay_log_info(rli); flush_relay_log_info(rli);
} }
} }
...@@ -878,9 +903,13 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -878,9 +903,13 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
thd->db = rewrite_db((char*)db); thd->db = rewrite_db((char*)db);
/* /*
InnoDB internally stores the master log position it has processed so far; InnoDB internally stores the master log position it has executed so far,
position to store is really pos + pending + event_len i.e. the position just after the COMMIT event.
since we must store the pos of the END of the current log event When InnoDB will want to store, the positions in rli won't have
been updated yet, so group_master_log_* will point to old BEGIN
and event_master_log* will point to the beginning of current COMMIT.
So the position to store is event_master_log_pos + event_len
since we must store the pos of the END of the current log event (COMMIT).
*/ */
rli->event_len= get_event_len(); rli->event_len= get_event_len();
...@@ -909,18 +938,6 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -909,18 +938,6 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
DBUG_PRINT("query",("%s",thd->query)); DBUG_PRINT("query",("%s",thd->query));
mysql_parse(thd, thd->query, q_len); mysql_parse(thd, thd->query, q_len);
/*
Set a flag if we are inside an transaction so that we can restart
the transaction from the start if we are killed
This will only be done if we are supporting transactional tables
in the slave.
*/
if (!strcmp(thd->query,"BEGIN"))
rli->inside_transaction= opt_using_transactions;
else if (!strcmp(thd->query,"COMMIT"))
rli->inside_transaction=0;
DBUG_PRINT("info",("expected_error: %d last_errno: %d", DBUG_PRINT("info",("expected_error: %d last_errno: %d",
expected_error, thd->net.last_errno)); expected_error, thd->net.last_errno));
if ((expected_error != (actual_error= thd->net.last_errno)) && if ((expected_error != (actual_error= thd->net.last_errno)) &&
...@@ -1776,14 +1793,15 @@ int Rotate_log_event::write_data(IO_CACHE* file) ...@@ -1776,14 +1793,15 @@ int Rotate_log_event::write_data(IO_CACHE* file)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Rotate_log_event::exec_event(struct st_relay_log_info* rli) int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
{ {
char* log_name = rli->master_log_name; char* log_name = rli->group_master_log_name;
DBUG_ENTER("Rotate_log_event::exec_event"); DBUG_ENTER("Rotate_log_event::exec_event");
pthread_mutex_lock(&rli->data_lock); pthread_mutex_lock(&rli->data_lock);
memcpy(log_name, new_log_ident, ident_len+1); memcpy(log_name, new_log_ident, ident_len+1);
rli->master_log_pos = pos; rli->group_master_log_pos = pos;
rli->relay_log_pos += get_event_len(); rli->event_relay_log_pos += get_event_len();
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) rli->master_log_pos)); rli->group_relay_log_pos = rli->event_relay_log_pos;
DBUG_PRINT("info", ("group_master_log_pos: %d", (ulong) rli->group_master_log_pos));
pthread_mutex_unlock(&rli->data_lock); pthread_mutex_unlock(&rli->data_lock);
pthread_cond_broadcast(&rli->data_cond); pthread_cond_broadcast(&rli->data_cond);
flush_relay_log_info(rli); flush_relay_log_info(rli);
...@@ -1905,7 +1923,7 @@ int Intvar_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -1905,7 +1923,7 @@ int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
thd->next_insert_id = val; thd->next_insert_id = val;
break; break;
} }
rli->inc_pending(get_event_len()); rli->inc_event_relay_log_pos(get_event_len());
return 0; return 0;
} }
#endif #endif
...@@ -1967,7 +1985,7 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -1967,7 +1985,7 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli)
{ {
thd->rand.seed1= (ulong) seed1; thd->rand.seed1= (ulong) seed1;
thd->rand.seed2= (ulong) seed2; thd->rand.seed2= (ulong) seed2;
rli->inc_pending(get_event_len()); rli->inc_event_relay_log_pos(get_event_len());
return 0; return 0;
} }
#endif // !MYSQL_CLIENT #endif // !MYSQL_CLIENT
...@@ -2199,7 +2217,7 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -2199,7 +2217,7 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli)
e.update_hash(val, val_len, type, charset, Item::COER_NOCOLL); e.update_hash(val, val_len, type, charset, Item::COER_NOCOLL);
free_root(&thd->mem_root,0); free_root(&thd->mem_root,0);
rli->inc_pending(get_event_len()); rli->inc_event_relay_log_pos(get_event_len());
return 0; return 0;
} }
#endif // !MYSQL_CLIENT #endif // !MYSQL_CLIENT
...@@ -2241,7 +2259,7 @@ Slave_log_event::Slave_log_event(THD* thd_arg, ...@@ -2241,7 +2259,7 @@ Slave_log_event::Slave_log_event(THD* thd_arg,
pthread_mutex_lock(&mi->data_lock); pthread_mutex_lock(&mi->data_lock);
pthread_mutex_lock(&rli->data_lock); pthread_mutex_lock(&rli->data_lock);
master_host_len = strlen(mi->host); master_host_len = strlen(mi->host);
master_log_len = strlen(rli->master_log_name); master_log_len = strlen(rli->group_master_log_name);
// on OOM, just do not initialize the structure and print the error // on OOM, just do not initialize the structure and print the error
if ((mem_pool = (char*)my_malloc(get_data_size() + 1, if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
MYF(MY_WME)))) MYF(MY_WME))))
...@@ -2249,9 +2267,9 @@ Slave_log_event::Slave_log_event(THD* thd_arg, ...@@ -2249,9 +2267,9 @@ Slave_log_event::Slave_log_event(THD* thd_arg,
master_host = mem_pool + SL_MASTER_HOST_OFFSET ; master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
memcpy(master_host, mi->host, master_host_len + 1); memcpy(master_host, mi->host, master_host_len + 1);
master_log = master_host + master_host_len + 1; master_log = master_host + master_host_len + 1;
memcpy(master_log, rli->master_log_name, master_log_len + 1); memcpy(master_log, rli->group_master_log_name, master_log_len + 1);
master_port = mi->port; master_port = mi->port;
master_pos = rli->master_log_pos; master_pos = rli->group_master_log_pos;
DBUG_PRINT("info", ("master_log: %s pos: %d", master_log, DBUG_PRINT("info", ("master_log: %s pos: %d", master_log,
(ulong) master_pos)); (ulong) master_pos));
} }
...@@ -2381,19 +2399,19 @@ void Stop_log_event::print(FILE* file, bool short_form, char* last_db) ...@@ -2381,19 +2399,19 @@ void Stop_log_event::print(FILE* file, bool short_form, char* last_db)
int Stop_log_event::exec_event(struct st_relay_log_info* rli) int Stop_log_event::exec_event(struct st_relay_log_info* rli)
{ {
// do not clean up immediately after rotate event // do not clean up immediately after rotate event
if (rli->master_log_pos > BIN_LOG_HEADER_SIZE) if (rli->group_master_log_pos > BIN_LOG_HEADER_SIZE)
{ {
close_temporary_tables(thd); close_temporary_tables(thd);
cleanup_load_tmpdir(); cleanup_load_tmpdir();
} }
/* /*
We do not want to update master_log pos because we get a rotate event We do not want to update master_log pos because we get a rotate event
before stop, so by now master_log_name is set to the next log. before stop, so by now group_master_log_name is set to the next log.
If we updated it, we will have incorrect master coordinates and this If we updated it, we will have incorrect master coordinates and this
could give false triggers in MASTER_POS_WAIT() that we have reached could give false triggers in MASTER_POS_WAIT() that we have reached
the target position when in fact we have not. the target position when in fact we have not.
*/ */
rli->inc_pos(get_event_len(), 0); rli->inc_group_relay_log_pos(get_event_len(), 0);
flush_relay_log_info(rli); flush_relay_log_info(rli);
return 0; return 0;
} }
......
...@@ -721,7 +721,7 @@ extern ulong max_binlog_size, rpl_recovery_rank, thread_cache_size; ...@@ -721,7 +721,7 @@ extern ulong max_binlog_size, rpl_recovery_rank, thread_cache_size;
extern ulong com_stat[(uint) SQLCOM_END], com_other, back_log; extern ulong com_stat[(uint) SQLCOM_END], com_other, back_log;
extern ulong specialflag, current_pid; extern ulong specialflag, current_pid;
extern ulong expire_logs_days; extern ulong expire_logs_days;
extern my_bool relay_log_purge;
extern uint test_flags,select_errors,ha_open_options; extern uint test_flags,select_errors,ha_open_options;
extern uint protocol_version,dropping_tables; extern uint protocol_version,dropping_tables;
extern uint delay_key_write_options; extern uint delay_key_write_options;
......
...@@ -314,7 +314,7 @@ my_bool opt_safe_user_create = 0, opt_no_mix_types = 0; ...@@ -314,7 +314,7 @@ my_bool opt_safe_user_create = 0, opt_no_mix_types = 0;
my_bool lower_case_table_names, opt_old_rpl_compat; my_bool lower_case_table_names, opt_old_rpl_compat;
my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0; my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
my_bool opt_log_slave_updates= 0, opt_old_passwords=0, use_old_passwords=0; my_bool opt_log_slave_updates= 0, opt_old_passwords=0, use_old_passwords=0;
my_bool opt_console= 0; my_bool opt_console= 0, opt_bdb, opt_innodb;
volatile bool mqh_used = 0; volatile bool mqh_used = 0;
FILE *bootstrap_file=0; FILE *bootstrap_file=0;
...@@ -442,6 +442,7 @@ const char **errmesg; /* Error messages */ ...@@ -442,6 +442,7 @@ const char **errmesg; /* Error messages */
const char *myisam_recover_options_str="OFF"; const char *myisam_recover_options_str="OFF";
const char *sql_mode_str="OFF"; const char *sql_mode_str="OFF";
ulong rpl_recovery_rank=0; ulong rpl_recovery_rank=0;
my_bool relay_log_purge=1;
my_string mysql_unix_port=NULL, opt_mysql_tmpdir=NULL; my_string mysql_unix_port=NULL, opt_mysql_tmpdir=NULL;
MY_TMPDIR mysql_tmpdir_list; MY_TMPDIR mysql_tmpdir_list;
...@@ -2208,7 +2209,7 @@ static int init_server_components() ...@@ -2208,7 +2209,7 @@ static int init_server_components()
{ {
long purge_time= time(0) - expire_logs_days*24*60*60; long purge_time= time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0) if (purge_time >= 0)
mysql_bin_log.purge_logs_before_date(current_thd, purge_time); mysql_bin_log.purge_logs_before_date(purge_time);
} }
#endif #endif
} }
...@@ -3400,7 +3401,7 @@ enum options ...@@ -3400,7 +3401,7 @@ enum options
OPT_DELAY_KEY_WRITE, OPT_CHARSETS_DIR, OPT_DELAY_KEY_WRITE, OPT_CHARSETS_DIR,
OPT_BDB_HOME, OPT_BDB_LOG, OPT_BDB_HOME, OPT_BDB_LOG,
OPT_BDB_TMP, OPT_BDB_NOSYNC, OPT_BDB_TMP, OPT_BDB_NOSYNC,
OPT_BDB_LOCK, OPT_BDB_SKIP, OPT_BDB_LOCK, OPT_BDB,
OPT_BDB_NO_RECOVER, OPT_BDB_SHARED, OPT_BDB_NO_RECOVER, OPT_BDB_SHARED,
OPT_MASTER_HOST, OPT_MASTER_USER, OPT_MASTER_HOST, OPT_MASTER_USER,
OPT_MASTER_PASSWORD, OPT_MASTER_PORT, OPT_MASTER_PASSWORD, OPT_MASTER_PORT,
...@@ -3430,7 +3431,7 @@ enum options ...@@ -3430,7 +3431,7 @@ enum options
OPT_INNODB_FLUSH_METHOD, OPT_INNODB_FLUSH_METHOD,
OPT_INNODB_FAST_SHUTDOWN, OPT_INNODB_FAST_SHUTDOWN,
OPT_SAFE_SHOW_DB, OPT_SAFE_SHOW_DB,
OPT_INNODB_SKIP, OPT_SKIP_SAFEMALLOC, OPT_INNODB, OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_TEMP_POOL, OPT_TX_ISOLATION,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS, OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL, OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL,
...@@ -3468,7 +3469,7 @@ enum options ...@@ -3468,7 +3469,7 @@ enum options
OPT_OPEN_FILES_LIMIT, OPT_OPEN_FILES_LIMIT,
OPT_QUERY_CACHE_LIMIT, OPT_QUERY_CACHE_MIN_RES_UNIT, OPT_QUERY_CACHE_SIZE, OPT_QUERY_CACHE_LIMIT, OPT_QUERY_CACHE_MIN_RES_UNIT, OPT_QUERY_CACHE_SIZE,
OPT_QUERY_CACHE_TYPE, OPT_RECORD_BUFFER, OPT_QUERY_CACHE_TYPE, OPT_RECORD_BUFFER,
OPT_RECORD_RND_BUFFER, OPT_RELAY_LOG_SPACE_LIMIT, OPT_RECORD_RND_BUFFER, OPT_RELAY_LOG_SPACE_LIMIT, OPT_RELAY_LOG_PURGE,
OPT_SLAVE_NET_TIMEOUT, OPT_SLAVE_COMPRESSED_PROTOCOL, OPT_SLOW_LAUNCH_TIME, OPT_SLAVE_NET_TIMEOUT, OPT_SLAVE_COMPRESSED_PROTOCOL, OPT_SLOW_LAUNCH_TIME,
OPT_SORT_BUFFER, OPT_TABLE_CACHE, OPT_SORT_BUFFER, OPT_TABLE_CACHE,
OPT_THREAD_CONCURRENCY, OPT_THREAD_CACHE_SIZE, OPT_THREAD_CONCURRENCY, OPT_THREAD_CACHE_SIZE,
...@@ -3529,8 +3530,10 @@ struct my_option my_long_options[] = ...@@ -3529,8 +3530,10 @@ struct my_option my_long_options[] =
(gptr*) &berkeley_tmpdir, (gptr*) &berkeley_tmpdir, 0, GET_STR, (gptr*) &berkeley_tmpdir, (gptr*) &berkeley_tmpdir, 0, GET_STR,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#endif /* HAVE_BERKELEY_DB */ #endif /* HAVE_BERKELEY_DB */
{"skip-bdb", OPT_BDB_SKIP, "Don't use berkeley db (will save memory)", {"bdb", OPT_BDB, "Enable Berkeley DB (if this version of MySQL supports it). \
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, Disable with --skip-bdb (will save memory)",
(gptr*) &opt_bdb, (gptr*) &opt_bdb, 0, GET_BOOL, NO_ARG, 1, 0, 0,
0, 0, 0},
{"big-tables", OPT_BIG_TABLES, {"big-tables", OPT_BIG_TABLES,
"Allow big result sets by saving all temporary sets on file (Solves most 'table full' errors)", "Allow big result sets by saving all temporary sets on file (Solves most 'table full' errors)",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
...@@ -3885,8 +3888,10 @@ struct my_option my_long_options[] = ...@@ -3885,8 +3888,10 @@ struct my_option my_long_options[] =
"Start without grant tables. This gives all users FULL ACCESS to all tables!", "Start without grant tables. This gives all users FULL ACCESS to all tables!",
(gptr*) &opt_noacl, (gptr*) &opt_noacl, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, (gptr*) &opt_noacl, (gptr*) &opt_noacl, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0,
0}, 0},
{"skip-innodb", OPT_INNODB_SKIP, "Don't use Innodb (will save memory)", {"innodb", OPT_INNODB, "Enable InnoDB (if this version of MySQL supports it). \
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, Disable with --skip-innodb (will save memory)",
(gptr*) &opt_innodb, (gptr*) &opt_innodb, 0, GET_BOOL, NO_ARG, 1, 0, 0,
0, 0, 0},
{"skip-locking", OPT_SKIP_LOCK, {"skip-locking", OPT_SKIP_LOCK,
"Deprecated option, use --skip-external-locking instead", "Deprecated option, use --skip-external-locking instead",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
...@@ -4272,6 +4277,11 @@ struct my_option my_long_options[] = ...@@ -4272,6 +4277,11 @@ struct my_option my_long_options[] =
(gptr*) &max_system_variables.read_buff_size,0, GET_ULONG, REQUIRED_ARG, (gptr*) &max_system_variables.read_buff_size,0, GET_ULONG, REQUIRED_ARG,
128*1024L, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE, 0}, 128*1024L, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE, 0},
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
{"relay_log_purge", OPT_RELAY_LOG_PURGE,
"0 = do not purge relay logs. 1 = purge them as soon as they are no more needed.",
(gptr*) &relay_log_purge,
(gptr*) &relay_log_purge, 0, GET_BOOL, NO_ARG,
1, 0, 1, 0, 1, 0},
{"relay_log_space_limit", OPT_RELAY_LOG_SPACE_LIMIT, {"relay_log_space_limit", OPT_RELAY_LOG_SPACE_LIMIT,
"Max space to use for all relay logs", "Max space to use for all relay logs",
(gptr*) &relay_log_space_limit, (gptr*) &relay_log_space_limit,
...@@ -5016,16 +5026,32 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -5016,16 +5026,32 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
berkeley_shared_data=1; berkeley_shared_data=1;
break; break;
#endif /* HAVE_BERKELEY_DB */ #endif /* HAVE_BERKELEY_DB */
case OPT_BDB_SKIP: case OPT_BDB:
#ifdef HAVE_BERKELEY_DB #ifdef HAVE_BERKELEY_DB
if (opt_bdb)
{
berkeley_skip=0;
have_berkeley_db=SHOW_OPTION_YES;
}
else
{
berkeley_skip=1; berkeley_skip=1;
have_berkeley_db=SHOW_OPTION_DISABLED; have_berkeley_db=SHOW_OPTION_DISABLED;
}
#endif #endif
break; break;
case OPT_INNODB_SKIP: case OPT_INNODB:
#ifdef HAVE_INNOBASE_DB #ifdef HAVE_INNOBASE_DB
if (opt_innodb)
{
innodb_skip=0;
have_innodb=SHOW_OPTION_YES;
}
else
{
innodb_skip=1; innodb_skip=1;
have_innodb=SHOW_OPTION_DISABLED; have_innodb=SHOW_OPTION_DISABLED;
}
#endif #endif
break; break;
case OPT_INNODB_DATA_FILE_PATH: case OPT_INNODB_DATA_FILE_PATH:
......
...@@ -873,7 +873,6 @@ int load_master_data(THD* thd) ...@@ -873,7 +873,6 @@ int load_master_data(THD* thd)
// don't hit the magic number // don't hit the magic number
if (active_mi->master_log_pos < BIN_LOG_HEADER_SIZE) if (active_mi->master_log_pos < BIN_LOG_HEADER_SIZE)
active_mi->master_log_pos = BIN_LOG_HEADER_SIZE; active_mi->master_log_pos = BIN_LOG_HEADER_SIZE;
active_mi->rli.pending = 0;
flush_master_info(active_mi); flush_master_info(active_mi);
} }
mc_mysql_free_result(master_status_res); mc_mysql_free_result(master_status_res);
...@@ -897,9 +896,13 @@ int load_master_data(THD* thd) ...@@ -897,9 +896,13 @@ int load_master_data(THD* thd)
return 1; return 1;
} }
pthread_mutex_lock(&active_mi->rli.data_lock); pthread_mutex_lock(&active_mi->rli.data_lock);
active_mi->rli.master_log_pos = active_mi->master_log_pos; active_mi->rli.group_master_log_pos = active_mi->master_log_pos;
strmake(active_mi->rli.master_log_name,active_mi->master_log_name, strmake(active_mi->rli.group_master_log_name,active_mi->master_log_name,
sizeof(active_mi->rli.master_log_name)-1); sizeof(active_mi->rli.group_master_log_name)-1);
/*
No need to update rli.event* coordinates, they will be when the slave
threads start ; only rli.group* coordinates are necessary here.
*/
flush_relay_log_info(&active_mi->rli); flush_relay_log_info(&active_mi->rli);
pthread_cond_broadcast(&active_mi->rli.data_cond); pthread_cond_broadcast(&active_mi->rli.data_cond);
pthread_mutex_unlock(&active_mi->rli.data_lock); pthread_mutex_unlock(&active_mi->rli.data_lock);
......
...@@ -201,6 +201,10 @@ sys_var_thd_ulong sys_read_buff_size("read_buffer_size", ...@@ -201,6 +201,10 @@ sys_var_thd_ulong sys_read_buff_size("read_buffer_size",
&SV::read_buff_size); &SV::read_buff_size);
sys_var_thd_ulong sys_read_rnd_buff_size("read_rnd_buffer_size", sys_var_thd_ulong sys_read_rnd_buff_size("read_rnd_buffer_size",
&SV::read_rnd_buff_size); &SV::read_rnd_buff_size);
#ifdef HAVE_REPLICATION
sys_var_bool_ptr sys_relay_log_purge("relay_log_purge",
&relay_log_purge);
#endif
sys_var_long_ptr sys_rpl_recovery_rank("rpl_recovery_rank", sys_var_long_ptr sys_rpl_recovery_rank("rpl_recovery_rank",
&rpl_recovery_rank); &rpl_recovery_rank);
sys_var_long_ptr sys_query_cache_size("query_cache_size", sys_var_long_ptr sys_query_cache_size("query_cache_size",
...@@ -407,6 +411,9 @@ sys_var *sys_variables[]= ...@@ -407,6 +411,9 @@ sys_var *sys_variables[]=
&sys_rand_seed2, &sys_rand_seed2,
&sys_read_buff_size, &sys_read_buff_size,
&sys_read_rnd_buff_size, &sys_read_rnd_buff_size,
#ifdef HAVE_REPLICATION
&sys_relay_log_purge,
#endif
&sys_rpl_recovery_rank, &sys_rpl_recovery_rank,
&sys_safe_updates, &sys_safe_updates,
&sys_select_limit, &sys_select_limit,
...@@ -563,6 +570,9 @@ struct show_var_st init_vars[]= { ...@@ -563,6 +570,9 @@ struct show_var_st init_vars[]= {
{sys_pseudo_thread_id.name, (char*) &sys_pseudo_thread_id, SHOW_SYS}, {sys_pseudo_thread_id.name, (char*) &sys_pseudo_thread_id, SHOW_SYS},
{sys_read_buff_size.name, (char*) &sys_read_buff_size, SHOW_SYS}, {sys_read_buff_size.name, (char*) &sys_read_buff_size, SHOW_SYS},
{sys_read_rnd_buff_size.name,(char*) &sys_read_rnd_buff_size, SHOW_SYS}, {sys_read_rnd_buff_size.name,(char*) &sys_read_rnd_buff_size, SHOW_SYS},
#ifdef HAVE_REPLICATION
{sys_relay_log_purge.name, (char*) &sys_relay_log_purge, SHOW_SYS},
#endif
{sys_rpl_recovery_rank.name,(char*) &sys_rpl_recovery_rank, SHOW_SYS}, {sys_rpl_recovery_rank.name,(char*) &sys_rpl_recovery_rank, SHOW_SYS},
#ifdef HAVE_QUERY_CACHE #ifdef HAVE_QUERY_CACHE
{sys_query_cache_limit.name,(char*) &sys_query_cache_limit, SHOW_SYS}, {sys_query_cache_limit.name,(char*) &sys_query_cache_limit, SHOW_SYS},
......
...@@ -217,11 +217,7 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len, ...@@ -217,11 +217,7 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
- If not, open the 'log' binary file. - If not, open the 'log' binary file.
TODO TODO
- check proper initialization of master_log_name/master_log_pos - check proper initialization of group_master_log_name/group_master_log_pos
- We may always want to delete all logs before 'log'.
Currently if we are not calling this with 'log' as NULL or the first
log we will never delete relay logs.
If we want this we should not set skip_log_purge to 1.
RETURN VALUES RETURN VALUES
0 ok 0 ok
...@@ -248,7 +244,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, ...@@ -248,7 +244,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
rli->cur_log_fd = -1; rli->cur_log_fd = -1;
} }
rli->relay_log_pos = pos; rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
/* /*
Test to see if the previous run was with the skip of purging Test to see if the previous run was with the skip of purging
...@@ -260,18 +256,15 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, ...@@ -260,18 +256,15 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
goto err; goto err;
} }
if (log) // If not first log if (log && rli->relay_log.find_log_pos(&rli->linfo, log, 1))
{
if (strcmp(log, rli->linfo.log_file_name))
rli->skip_log_purge= 1; // Different name; Don't purge
if (rli->relay_log.find_log_pos(&rli->linfo, log, 1))
{ {
*errmsg="Could not find target log during relay log initialization"; *errmsg="Could not find target log during relay log initialization";
goto err; goto err;
} }
} strmake(rli->group_relay_log_name,rli->linfo.log_file_name,
strmake(rli->relay_log_name,rli->linfo.log_file_name, sizeof(rli->group_relay_log_name)-1);
sizeof(rli->relay_log_name)-1); strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
sizeof(rli->event_relay_log_name)-1);
if (rli->relay_log.is_active(rli->linfo.log_file_name)) if (rli->relay_log.is_active(rli->linfo.log_file_name))
{ {
/* /*
...@@ -302,7 +295,7 @@ err: ...@@ -302,7 +295,7 @@ err:
If we don't purge, we can't honour relay_log_space_limit ; If we don't purge, we can't honour relay_log_space_limit ;
silently discard it silently discard it
*/ */
if (rli->skip_log_purge) if (!relay_log_purge)
rli->log_space_limit= 0; rli->log_space_limit= 0;
pthread_cond_broadcast(&rli->data_cond); pthread_cond_broadcast(&rli->data_cond);
if (need_data_lock) if (need_data_lock)
...@@ -383,9 +376,8 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, ...@@ -383,9 +376,8 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
to display fine in any case. to display fine in any case.
*/ */
rli->master_log_name[0]= 0; rli->group_master_log_name[0]= 0;
rli->master_log_pos= 0; rli->group_master_log_pos= 0;
rli->pending= 0;
if (!rli->inited) if (!rli->inited)
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -402,14 +394,16 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, ...@@ -402,14 +394,16 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
goto err; goto err;
} }
/* Save name of used relay log file */ /* Save name of used relay log file */
strmake(rli->relay_log_name, rli->relay_log.get_log_fname(), strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
sizeof(rli->relay_log_name)-1); sizeof(rli->group_relay_log_name)-1);
strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
sizeof(rli->event_relay_log_name)-1);
// Just first log with magic number and nothing else // Just first log with magic number and nothing else
rli->log_space_total= BIN_LOG_HEADER_SIZE; rli->log_space_total= BIN_LOG_HEADER_SIZE;
rli->relay_log_pos= BIN_LOG_HEADER_SIZE; rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
rli->relay_log.reset_bytes_written(); rli->relay_log.reset_bytes_written();
if (!just_reset) if (!just_reset)
error= init_relay_log_pos(rli, rli->relay_log_name, rli->relay_log_pos, error= init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos,
0 /* do not need data lock */, errmsg); 0 /* do not need data lock */, errmsg);
err: err:
...@@ -1238,13 +1232,11 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) ...@@ -1238,13 +1232,11 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
fn_format(fname, info_fname, mysql_data_home, "", 4+32); fn_format(fname, info_fname, mysql_data_home, "", 4+32);
pthread_mutex_lock(&rli->data_lock); pthread_mutex_lock(&rli->data_lock);
info_fd = rli->info_fd; info_fd = rli->info_fd;
rli->pending = 0;
rli->cur_log_fd = -1; rli->cur_log_fd = -1;
rli->slave_skip_counter=0; rli->slave_skip_counter=0;
rli->abort_pos_wait=0; rli->abort_pos_wait=0;
rli->skip_log_purge=0; rli->log_space_limit= relay_log_space_limit;
rli->log_space_limit = relay_log_space_limit; rli->log_space_total= 0;
rli->log_space_total = 0;
// TODO: make this work with multi-master // TODO: make this work with multi-master
if (!opt_relay_logname) if (!opt_relay_logname)
...@@ -1285,8 +1277,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) ...@@ -1285,8 +1277,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */, if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
&msg)) &msg))
goto err; goto err;
rli->master_log_name[0]= 0; rli->group_master_log_name[0]= 0;
rli->master_log_pos= 0; rli->group_master_log_pos= 0;
rli->info_fd= info_fd; rli->info_fd= info_fd;
} }
else // file exists else // file exists
...@@ -1307,31 +1299,33 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) ...@@ -1307,31 +1299,33 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
rli->info_fd = info_fd; rli->info_fd = info_fd;
int relay_log_pos, master_log_pos; int relay_log_pos, master_log_pos;
if (init_strvar_from_file(rli->relay_log_name, if (init_strvar_from_file(rli->group_relay_log_name,
sizeof(rli->relay_log_name), &rli->info_file, sizeof(rli->group_relay_log_name), &rli->info_file,
"") || "") ||
init_intvar_from_file(&relay_log_pos, init_intvar_from_file(&relay_log_pos,
&rli->info_file, BIN_LOG_HEADER_SIZE) || &rli->info_file, BIN_LOG_HEADER_SIZE) ||
init_strvar_from_file(rli->master_log_name, init_strvar_from_file(rli->group_master_log_name,
sizeof(rli->master_log_name), &rli->info_file, sizeof(rli->group_master_log_name), &rli->info_file,
"") || "") ||
init_intvar_from_file(&master_log_pos, &rli->info_file, 0)) init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
{ {
msg="Error reading slave log configuration"; msg="Error reading slave log configuration";
goto err; goto err;
} }
rli->relay_log_pos= relay_log_pos; strmake(rli->event_relay_log_name,rli->group_relay_log_name,
rli->master_log_pos= master_log_pos; sizeof(rli->event_relay_log_name)-1);
rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
rli->group_master_log_pos= master_log_pos;
if (init_relay_log_pos(rli, if (init_relay_log_pos(rli,
rli->relay_log_name, rli->group_relay_log_name,
rli->relay_log_pos, rli->group_relay_log_pos,
0 /* no data lock*/, 0 /* no data lock*/,
&msg)) &msg))
goto err; goto err;
} }
DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE); DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
/* /*
Now change the cache from READ to WRITE - must do this Now change the cache from READ to WRITE - must do this
before flush_relay_log_info before flush_relay_log_info
...@@ -1407,7 +1401,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli) ...@@ -1407,7 +1401,7 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli)
{ {
LOG_INFO linfo; LOG_INFO linfo;
DBUG_ENTER("count_relay_log_space"); DBUG_ENTER("count_relay_log_space");
rli->log_space_total = 0; rli->log_space_total= 0;
if (rli->relay_log.find_log_pos(&linfo, NullS, 1)) if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
{ {
sql_print_error("Could not find first log while counting relay log space"); sql_print_error("Could not find first log while counting relay log space");
...@@ -1631,10 +1625,10 @@ int show_master_info(THD* thd, MASTER_INFO* mi) ...@@ -1631,10 +1625,10 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
protocol->store((uint32) mi->connect_retry); protocol->store((uint32) mi->connect_retry);
protocol->store(mi->master_log_name, &my_charset_bin); protocol->store(mi->master_log_name, &my_charset_bin);
protocol->store((ulonglong) mi->master_log_pos); protocol->store((ulonglong) mi->master_log_pos);
protocol->store(mi->rli.relay_log_name + protocol->store(mi->rli.group_relay_log_name +
dirname_length(mi->rli.relay_log_name), &my_charset_bin); dirname_length(mi->rli.group_relay_log_name), &my_charset_bin);
protocol->store((ulonglong) mi->rli.relay_log_pos); protocol->store((ulonglong) mi->rli.group_relay_log_pos);
protocol->store(mi->rli.master_log_name, &my_charset_bin); protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
protocol->store(mi->slave_running ? "Yes":"No", &my_charset_bin); protocol->store(mi->slave_running ? "Yes":"No", &my_charset_bin);
protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin); protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
protocol->store(&replicate_do_db); protocol->store(&replicate_do_db);
...@@ -1642,7 +1636,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi) ...@@ -1642,7 +1636,7 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
protocol->store((uint32) mi->rli.last_slave_errno); protocol->store((uint32) mi->rli.last_slave_errno);
protocol->store(mi->rli.last_slave_error, &my_charset_bin); protocol->store(mi->rli.last_slave_error, &my_charset_bin);
protocol->store((uint32) mi->rli.slave_skip_counter); protocol->store((uint32) mi->rli.slave_skip_counter);
protocol->store((ulonglong) mi->rli.master_log_pos); protocol->store((ulonglong) mi->rli.group_master_log_pos);
protocol->store((ulonglong) mi->rli.log_space_total); protocol->store((ulonglong) mi->rli.log_space_total);
pthread_mutex_unlock(&mi->rli.data_lock); pthread_mutex_unlock(&mi->rli.data_lock);
pthread_mutex_unlock(&mi->data_lock); pthread_mutex_unlock(&mi->data_lock);
...@@ -1673,17 +1667,15 @@ bool flush_master_info(MASTER_INFO* mi) ...@@ -1673,17 +1667,15 @@ bool flush_master_info(MASTER_INFO* mi)
st_relay_log_info::st_relay_log_info() st_relay_log_info::st_relay_log_info()
:info_fd(-1), cur_log_fd(-1), master_log_pos(0), save_temporary_tables(0), :info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
cur_log_old_open_count(0), log_space_total(0), ignore_log_space_limit(0), cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0),
slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0), ignore_log_space_limit(0), slave_skip_counter(0), abort_pos_wait(0),
sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0), slave_run_id(0), sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0),
slave_running(0), skip_log_purge(0), slave_running(0)
inside_transaction(0) /* the default is autocommit=1 */ {
{ group_relay_log_name[0]= event_relay_log_name[0]= group_master_log_name[0]= 0;
relay_log_name[0] = master_log_name[0] = 0;
last_slave_error[0]=0; last_slave_error[0]=0;
bzero(&info_file,sizeof(info_file)); bzero(&info_file,sizeof(info_file));
bzero(&cache_buf, sizeof(cache_buf)); bzero(&cache_buf, sizeof(cache_buf));
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
...@@ -1745,8 +1737,8 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, ...@@ -1745,8 +1737,8 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
set_timespec(abstime,timeout); set_timespec(abstime,timeout);
DBUG_ENTER("wait_for_pos"); DBUG_ENTER("wait_for_pos");
DBUG_PRINT("enter",("master_log_name: '%s' pos: %lu timeout: %ld", DBUG_PRINT("enter",("group_master_log_name: '%s' pos: %lu timeout: %ld",
master_log_name, (ulong) master_log_pos, group_master_log_name, (ulong) group_master_log_pos,
(long) timeout)); (long) timeout));
pthread_mutex_lock(&data_lock); pthread_mutex_lock(&data_lock);
...@@ -1796,10 +1788,10 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, ...@@ -1796,10 +1788,10 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
{ {
bool pos_reached; bool pos_reached;
int cmp_result= 0; int cmp_result= 0;
DBUG_ASSERT(*master_log_name || master_log_pos == 0); DBUG_ASSERT(*group_master_log_name || group_master_log_pos == 0);
if (*master_log_name) if (*group_master_log_name)
{ {
char *basename= master_log_name + dirname_length(master_log_name); char *basename= group_master_log_name + dirname_length(group_master_log_name);
/* /*
First compare the parts before the extension. First compare the parts before the extension.
Find the dot in the master's log basename, Find the dot in the master's log basename,
...@@ -1814,13 +1806,13 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, ...@@ -1814,13 +1806,13 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
} }
// Now compare extensions. // Now compare extensions.
char *q_end; char *q_end;
ulong master_log_name_extension= strtoul(q, &q_end, 10); ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
if (master_log_name_extension < log_name_extension) if (group_master_log_name_extension < log_name_extension)
cmp_result = -1 ; cmp_result = -1 ;
else else
cmp_result= (master_log_name_extension > log_name_extension) ? 1 : 0 ; cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
} }
pos_reached = ((!cmp_result && master_log_pos >= (ulonglong)log_pos) || pos_reached = ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
cmp_result > 0); cmp_result > 0);
if (pos_reached || thd->killed) if (pos_reached || thd->killed)
break; break;
...@@ -2127,7 +2119,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) ...@@ -2127,7 +2119,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
(rli->slave_skip_counter && type_code != ROTATE_EVENT)) (rli->slave_skip_counter && type_code != ROTATE_EVENT))
{ {
/* TODO: I/O thread should not even log events with the same server id */ /* TODO: I/O thread should not even log events with the same server id */
rli->inc_pos(ev->get_event_len(), rli->inc_group_relay_log_pos(ev->get_event_len(),
type_code != STOP_EVENT ? ev->log_pos : LL(0), type_code != STOP_EVENT ? ev->log_pos : LL(0),
1/* skip lock*/); 1/* skip lock*/);
flush_relay_log_info(rli); flush_relay_log_info(rli);
...@@ -2497,15 +2489,13 @@ slave_begin: ...@@ -2497,15 +2489,13 @@ slave_begin:
rli->abort_slave = 0; rli->abort_slave = 0;
pthread_mutex_unlock(&rli->run_lock); pthread_mutex_unlock(&rli->run_lock);
pthread_cond_broadcast(&rli->start_cond); pthread_cond_broadcast(&rli->start_cond);
// This should always be set to 0 when the slave thread is started
rli->pending = 0;
//tell the I/O thread to take relay_log_space_limit into account from now on //tell the I/O thread to take relay_log_space_limit into account from now on
rli->ignore_log_space_limit= 0; rli->ignore_log_space_limit= 0;
if (init_relay_log_pos(rli, if (init_relay_log_pos(rli,
rli->relay_log_name, rli->group_relay_log_name,
rli->relay_log_pos, rli->group_relay_log_pos,
1 /*need data lock*/, &errmsg)) 1 /*need data lock*/, &errmsg))
{ {
sql_print_error("Error initializing relay log position: %s", sql_print_error("Error initializing relay log position: %s",
...@@ -2513,18 +2503,18 @@ slave_begin: ...@@ -2513,18 +2503,18 @@ slave_begin:
goto err; goto err;
} }
THD_CHECK_SENTRY(thd); THD_CHECK_SENTRY(thd);
DBUG_ASSERT(rli->relay_log_pos >= BIN_LOG_HEADER_SIZE); DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
DBUG_ASSERT(rli->sql_thd == thd); DBUG_ASSERT(rli->sql_thd == thd);
DBUG_PRINT("master_info",("log_file_name: %s position: %s", DBUG_PRINT("master_info",("log_file_name: %s position: %s",
rli->master_log_name, rli->group_master_log_name,
llstr(rli->master_log_pos,llbuff))); llstr(rli->group_master_log_pos,llbuff)));
if (global_system_variables.log_warnings) if (global_system_variables.log_warnings)
sql_print_error("Slave SQL thread initialized, starting replication in \ sql_print_error("Slave SQL thread initialized, starting replication in \
log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
llstr(rli->master_log_pos,llbuff),rli->relay_log_name, llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
llstr(rli->relay_log_pos,llbuff1)); llstr(rli->group_relay_log_pos,llbuff1));
/* Read queries from the IO/THREAD until this thread is killed */ /* Read queries from the IO/THREAD until this thread is killed */
...@@ -2541,7 +2531,7 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, ...@@ -2541,7 +2531,7 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
Error running query, slave SQL thread aborted. Fix the problem, and restart \ Error running query, slave SQL thread aborted. Fix the problem, and restart \
the slave SQL thread with \"SLAVE START\". We stopped at log \ the slave SQL thread with \"SLAVE START\". We stopped at log \
'%s' position %s", '%s' position %s",
RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff)); RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
goto err; goto err;
} }
} }
...@@ -2549,7 +2539,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ ...@@ -2549,7 +2539,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
/* Thread stopped. Print the current replication position to the log */ /* Thread stopped. Print the current replication position to the log */
sql_print_error("Slave SQL thread exiting, replication stopped in log \ sql_print_error("Slave SQL thread exiting, replication stopped in log \
'%s' at position %s", '%s' at position %s",
RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff)); RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
err: err:
VOID(pthread_mutex_lock(&LOCK_thread_count)); VOID(pthread_mutex_lock(&LOCK_thread_count));
...@@ -2699,7 +2689,7 @@ err: ...@@ -2699,7 +2689,7 @@ err:
rev The rotate log event read from the binary log rev The rotate log event read from the binary log
DESCRIPTION DESCRIPTION
Updates the master info and relay data with the place in the next binary Updates the master info with the place in the next binary
log where we should start reading. log where we should start reading.
NOTES NOTES
...@@ -3073,18 +3063,14 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli) ...@@ -3073,18 +3063,14 @@ bool flush_relay_log_info(RELAY_LOG_INFO* rli)
IO_CACHE *file = &rli->info_file; IO_CACHE *file = &rli->info_file;
char buff[FN_REFLEN*2+22*2+4], *pos; char buff[FN_REFLEN*2+22*2+4], *pos;
/* sql_thd is not set when calling from init_slave() */
if ((rli->sql_thd && rli->sql_thd->options & OPTION_BEGIN))
return 0; // Wait for COMMIT
my_b_seek(file, 0L); my_b_seek(file, 0L);
pos=strmov(buff, rli->relay_log_name); pos=strmov(buff, rli->group_relay_log_name);
*pos++='\n'; *pos++='\n';
pos=longlong2str(rli->relay_log_pos, pos, 10); pos=longlong2str(rli->group_relay_log_pos, pos, 10);
*pos++='\n'; *pos++='\n';
pos=strmov(pos, rli->master_log_name); pos=strmov(pos, rli->group_master_log_name);
*pos++='\n'; *pos++='\n';
pos=longlong2str(rli->master_log_pos, pos, 10); pos=longlong2str(rli->group_master_log_pos, pos, 10);
*pos='\n'; *pos='\n';
if (my_b_write(file, (byte*) buff, (ulong) (pos-buff)+1)) if (my_b_write(file, (byte*) buff, (ulong) (pos-buff)+1))
error=1; error=1;
...@@ -3107,7 +3093,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg) ...@@ -3107,7 +3093,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
DBUG_ENTER("reopen_relay_log"); DBUG_ENTER("reopen_relay_log");
IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf; IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name, if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
errmsg)) <0) errmsg)) <0)
DBUG_RETURN(0); DBUG_RETURN(0);
/* /*
...@@ -3115,7 +3101,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg) ...@@ -3115,7 +3101,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
relay_log_pos Current log pos relay_log_pos Current log pos
pending Number of bytes already processed from the event pending Number of bytes already processed from the event
*/ */
my_b_seek(cur_log,rli->relay_log_pos + rli->pending); my_b_seek(cur_log,rli->event_relay_log_pos);
DBUG_RETURN(cur_log); DBUG_RETURN(cur_log);
} }
...@@ -3173,7 +3159,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) ...@@ -3173,7 +3159,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
} }
} }
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending); DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
/* /*
Relay log is always in new format - if the master is 3.23, the Relay log is always in new format - if the master is 3.23, the
I/O thread will convert the format for us I/O thread will convert the format for us
...@@ -3240,8 +3226,8 @@ Log_event* next_event(RELAY_LOG_INFO* rli) ...@@ -3240,8 +3226,8 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
// prevent the I/O thread from blocking next times // prevent the I/O thread from blocking next times
rli->ignore_log_space_limit= 1; rli->ignore_log_space_limit= 1;
// If the I/O thread is blocked, unblock it // If the I/O thread is blocked, unblock it
pthread_cond_broadcast(&rli->log_space_cond);
pthread_mutex_unlock(&rli->log_space_lock); pthread_mutex_unlock(&rli->log_space_lock);
pthread_cond_broadcast(&rli->log_space_cond);
// Note that wait_for_update unlocks lock_log ! // Note that wait_for_update unlocks lock_log !
rli->relay_log.wait_for_update(rli->sql_thd); rli->relay_log.wait_for_update(rli->sql_thd);
// re-acquire data lock since we released it earlier // re-acquire data lock since we released it earlier
...@@ -3258,16 +3244,25 @@ Log_event* next_event(RELAY_LOG_INFO* rli) ...@@ -3258,16 +3244,25 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
my_close(rli->cur_log_fd, MYF(MY_WME)); my_close(rli->cur_log_fd, MYF(MY_WME));
rli->cur_log_fd = -1; rli->cur_log_fd = -1;
if (relay_log_purge)
{
/* /*
TODO: make skip_log_purge a start-up option. At this point this purge_first_log will properly set up relay log coordinates in rli.
is not critical priority If the group's coordinates are equal to the event's coordinates
(i.e. the relay log was not rotated in the middle of a group),
we can purge this relay log too.
We do ulonglong and string comparisons, this may be slow but
- purging the last relay log is nice (it can save 1GB of disk), so we
like to detect the case where we can do it, and given this,
- I see no better detection method
- purge_first_log is not called that often
*/ */
if (!rli->skip_log_purge) if (rli->relay_log.purge_first_log
{ (rli,
// purge_first_log will properly set up relay log coordinates in rli rli->group_relay_log_pos == rli->event_relay_log_pos
if (rli->relay_log.purge_first_log(rli)) && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
{ {
errmsg = "Error purging processed log"; errmsg = "Error purging processed logs";
goto err; goto err;
} }
} }
...@@ -3285,10 +3280,9 @@ Log_event* next_event(RELAY_LOG_INFO* rli) ...@@ -3285,10 +3280,9 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
errmsg = "error switching to the next log"; errmsg = "error switching to the next log";
goto err; goto err;
} }
rli->relay_log_pos = BIN_LOG_HEADER_SIZE; rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
rli->pending=0; strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
strmake(rli->relay_log_name,rli->linfo.log_file_name, sizeof(rli->event_relay_log_name)-1);
sizeof(rli->relay_log_name)-1);
flush_relay_log_info(rli); flush_relay_log_info(rli);
} }
...@@ -3336,7 +3330,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli) ...@@ -3336,7 +3330,7 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
event(errno: %d cur_log->error: %d)", event(errno: %d cur_log->error: %d)",
my_errno,cur_log->error); my_errno,cur_log->error);
// set read position to the beginning of the event // set read position to the beginning of the event
my_b_seek(cur_log,rli->relay_log_pos+rli->pending); my_b_seek(cur_log,rli->event_relay_log_pos);
/* otherwise, we have had a partial read */ /* otherwise, we have had a partial read */
errmsg = "Aborting slave SQL thread because of partial event read"; errmsg = "Aborting slave SQL thread because of partial event read";
break; // To end of function break; // To end of function
......
...@@ -92,12 +92,6 @@ typedef struct st_relay_log_info ...@@ -92,12 +92,6 @@ typedef struct st_relay_log_info
cur_log_fd - file descriptor of the current read relay log cur_log_fd - file descriptor of the current read relay log
*/ */
File info_fd,cur_log_fd; File info_fd,cur_log_fd;
/* name of current read relay log */
char relay_log_name[FN_REFLEN];
/* master log name corresponding to current read position */
char master_log_name[FN_REFLEN];
/* original log position of last processed event */
volatile my_off_t master_log_pos;
/* /*
Protected with internal locks. Protected with internal locks.
...@@ -142,20 +136,36 @@ typedef struct st_relay_log_info ...@@ -142,20 +136,36 @@ typedef struct st_relay_log_info
uint32 cur_log_old_open_count; uint32 cur_log_old_open_count;
/* /*
relay_log_pos - Current offset in the relay log. Let's call a group (of events) :
pending - In some cases we do not increment offset immediately - a transaction
after processing an event, because the following event or
needs to be processed atomically together with this one - an autocommiting query + its associated events (INSERT_ID,
such as: TIMESTAMP...)
We need these rli coordinates :
Intvar_event - sets auto_increment value - relay log name and position of the beginning of the group we currently are
Rand_event - sets the random seed executing. Needed to know where we have to restart when replication has
stopped in the middle of a group (which has been rolled back by the slave).
However, once both events have been processed, we need to - relay log name and position just after the event we have just
increment by the cumulative offset. 'pending' stores the executed. This event is part of the current group.
extra offset to be added to the position. Formerly we only had the immediately above coordinates, plus a 'pending'
variable, but this dealt wrong with the case of a transaction starting on a
relay log and finishing (commiting) on another relay log. Case which can
happen when, for example, the relay log gets rotated because of
max_binlog_size.
*/
char group_relay_log_name[FN_REFLEN];
ulonglong group_relay_log_pos;
char event_relay_log_name[FN_REFLEN];
ulonglong event_relay_log_pos;
/*
Original log name and position of the group we're currently executing
(whose coordinates are group_relay_log_name/pos in the relay log)
in the master's binlog. These concern the *group*, because in the master's
binlog the log_pos that comes with each event is the position of the
beginning of the group.
*/ */
ulonglong relay_log_pos, pending; char group_master_log_name[FN_REFLEN];
volatile my_off_t group_master_log_pos;
/* /*
Handling of the relay_log_space_limit optional constraint. Handling of the relay_log_space_limit optional constraint.
...@@ -193,36 +203,37 @@ typedef struct st_relay_log_info ...@@ -193,36 +203,37 @@ typedef struct st_relay_log_info
/* if not set, the value of other members of the structure are undefined */ /* if not set, the value of other members of the structure are undefined */
bool inited; bool inited;
volatile bool abort_slave, slave_running; volatile bool abort_slave, slave_running;
bool skip_log_purge;
bool inside_transaction;
st_relay_log_info(); st_relay_log_info();
~st_relay_log_info(); ~st_relay_log_info();
inline void inc_pending(ulonglong val)
inline void inc_event_relay_log_pos(ulonglong val)
{ {
pending += val; event_relay_log_pos+= val;
} }
/* TODO: this probably needs to be fixed */
inline void inc_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0) void inc_group_relay_log_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0)
{ {
if (!skip_lock) if (!skip_lock)
pthread_mutex_lock(&data_lock); pthread_mutex_lock(&data_lock);
relay_log_pos += val+pending; inc_event_relay_log_pos(val);
pending = 0; group_relay_log_pos= event_relay_log_pos;
if (log_pos) strmake(group_relay_log_name,event_relay_log_name,
master_log_pos = log_pos+ val; sizeof(group_relay_log_name)-1);
pthread_cond_broadcast(&data_cond);
if (!skip_lock)
pthread_mutex_unlock(&data_lock);
}
/* /*
thread safe read of position - not needed if we are in the slave thread, If the slave does not support transactions and replicates a transaction,
but required otherwise as var is a longlong users should not trust group_master_log_pos (which they can display with
SHOW SLAVE STATUS or read from relay-log.info), because to compute
group_master_log_pos the slave relies on log_pos stored in the master's
binlog, but if we are in a master's transaction these positions are always
the BEGIN's one (excepted for the COMMIT), so group_master_log_pos does
not advance as it should on the non-transactional slave (it advances by
big leaps, whereas it should advance by small leaps).
*/ */
inline void read_pos(ulonglong& var) if (log_pos) // 3.23 binlogs don't have log_posx
{ group_master_log_pos= log_pos+ val;
pthread_mutex_lock(&data_lock); pthread_cond_broadcast(&data_cond);
var = relay_log_pos; if (!skip_lock)
pthread_mutex_unlock(&data_lock); pthread_mutex_unlock(&data_lock);
} }
...@@ -334,7 +345,7 @@ typedef struct st_table_rule_ent ...@@ -334,7 +345,7 @@ typedef struct st_table_rule_ent
#define TABLE_RULE_ARR_SIZE 16 #define TABLE_RULE_ARR_SIZE 16
#define MAX_SLAVE_ERRMSG 1024 #define MAX_SLAVE_ERRMSG 1024
#define RPL_LOG_NAME (rli->master_log_name[0] ? rli->master_log_name :\ #define RPL_LOG_NAME (rli->group_master_log_name[0] ? rli->group_master_log_name :\
"FIRST") "FIRST")
#define IO_RPL_LOG_NAME (mi->master_log_name[0] ? mi->master_log_name :\ #define IO_RPL_LOG_NAME (mi->master_log_name[0] ? mi->master_log_name :\
"FIRST") "FIRST")
......
...@@ -145,10 +145,12 @@ public: ...@@ -145,10 +145,12 @@ public:
int generate_new_name(char *new_name,const char *old_name); int generate_new_name(char *new_name,const char *old_name);
void make_log_name(char* buf, const char* log_ident); void make_log_name(char* buf, const char* log_ident);
bool is_active(const char* log_file_name); bool is_active(const char* log_file_name);
int update_log_index(LOG_INFO* linfo); int update_log_index(LOG_INFO* linfo, bool need_update_threads);
int purge_logs(THD* thd, const char* to_log); int purge_logs(const char *to_log, bool included,
int purge_logs_before_date(THD* thd, time_t purge_time); bool need_mutex, bool need_update_threads,
int purge_first_log(struct st_relay_log_info* rli); ulonglong *decrease_log_space);
int purge_logs_before_date(time_t purge_time);
int purge_first_log(struct st_relay_log_info* rli, bool included);
bool reset_logs(THD* thd); bool reset_logs(THD* thd);
// if we are exiting, we also want to close the index file // if we are exiting, we also want to close the index file
void close(bool exiting = 0); void close(bool exiting = 0);
......
...@@ -3980,7 +3980,7 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables) ...@@ -3980,7 +3980,7 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables)
{ {
long purge_time= time(0) - expire_logs_days*24*60*60; long purge_time= time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0) if (purge_time >= 0)
mysql_bin_log.purge_logs_before_date(thd, purge_time); mysql_bin_log.purge_logs_before_date(purge_time);
} }
#endif #endif
mysql_slow_log.new_file(1); mysql_slow_log.new_file(1);
......
...@@ -292,7 +292,7 @@ int purge_master_logs(THD* thd, const char* to_log) ...@@ -292,7 +292,7 @@ int purge_master_logs(THD* thd, const char* to_log)
char search_file_name[FN_REFLEN]; char search_file_name[FN_REFLEN];
mysql_bin_log.make_log_name(search_file_name, to_log); mysql_bin_log.make_log_name(search_file_name, to_log);
int res = mysql_bin_log.purge_logs(thd, search_file_name); int res = mysql_bin_log.purge_logs(search_file_name, 0, 1, 1, NULL);
return purge_error_message(thd, res); return purge_error_message(thd, res);
} }
...@@ -300,7 +300,7 @@ int purge_master_logs(THD* thd, const char* to_log) ...@@ -300,7 +300,7 @@ int purge_master_logs(THD* thd, const char* to_log)
int purge_master_logs_before_date(THD* thd, time_t purge_time) int purge_master_logs_before_date(THD* thd, time_t purge_time)
{ {
int res = mysql_bin_log.purge_logs_before_date(thd, purge_time); int res = mysql_bin_log.purge_logs_before_date(purge_time);
return purge_error_message(thd ,res); return purge_error_message(thd ,res);
} }
...@@ -776,24 +776,25 @@ int reset_slave(THD *thd, MASTER_INFO* mi) ...@@ -776,24 +776,25 @@ int reset_slave(THD *thd, MASTER_INFO* mi)
error=1; error=1;
goto err; goto err;
} }
//delete relay logs, clear relay log coordinates // delete relay logs, clear relay log coordinates
if ((error= purge_relay_logs(&mi->rli, thd, if ((error= purge_relay_logs(&mi->rli, thd,
1 /* just reset */, 1 /* just reset */,
&errmsg))) &errmsg)))
goto err; goto err;
//Clear master's log coordinates (only for good display of SHOW SLAVE STATUS) // Clear master's log coordinates (only for good display of SHOW SLAVE STATUS)
mi->master_log_name[0]= 0; mi->master_log_name[0]= 0;
mi->master_log_pos= BIN_LOG_HEADER_SIZE; mi->master_log_pos= BIN_LOG_HEADER_SIZE;
//close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0 // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
end_master_info(mi); end_master_info(mi);
//and delete these two files // and delete these two files
fn_format(fname, master_info_file, mysql_data_home, "", 4+32); fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME))) if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
{ {
error=1; error=1;
goto err; goto err;
} }
// delete relay_log_info_file
fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32); fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME))) if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
{ {
...@@ -874,7 +875,6 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -874,7 +875,6 @@ int change_master(THD* thd, MASTER_INFO* mi)
// if we change host or port, we must reset the postion // if we change host or port, we must reset the postion
mi->master_log_name[0] = 0; mi->master_log_name[0] = 0;
mi->master_log_pos= BIN_LOG_HEADER_SIZE; mi->master_log_pos= BIN_LOG_HEADER_SIZE;
mi->rli.pending = 0;
} }
if (lex_mi->log_file_name) if (lex_mi->log_file_name)
...@@ -883,7 +883,6 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -883,7 +883,6 @@ int change_master(THD* thd, MASTER_INFO* mi)
if (lex_mi->pos) if (lex_mi->pos)
{ {
mi->master_log_pos= lex_mi->pos; mi->master_log_pos= lex_mi->pos;
mi->rli.pending = 0;
} }
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos)); DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
...@@ -901,20 +900,22 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -901,20 +900,22 @@ int change_master(THD* thd, MASTER_INFO* mi)
if (lex_mi->relay_log_name) if (lex_mi->relay_log_name)
{ {
need_relay_log_purge= 0; need_relay_log_purge= 0;
strmake(mi->rli.relay_log_name,lex_mi->relay_log_name, strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
sizeof(mi->rli.relay_log_name)-1); sizeof(mi->rli.group_relay_log_name)-1);
strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
sizeof(mi->rli.event_relay_log_name)-1);
} }
if (lex_mi->relay_log_pos) if (lex_mi->relay_log_pos)
{ {
need_relay_log_purge= 0; need_relay_log_purge= 0;
mi->rli.relay_log_pos=lex_mi->relay_log_pos; mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
} }
flush_master_info(mi); flush_master_info(mi);
if (need_relay_log_purge) if (need_relay_log_purge)
{ {
mi->rli.skip_log_purge= 0; relay_log_purge= 1;
thd->proc_info="purging old relay logs"; thd->proc_info="purging old relay logs";
if (purge_relay_logs(&mi->rli, thd, if (purge_relay_logs(&mi->rli, thd,
0 /* not only reset, but also reinit */, 0 /* not only reset, but also reinit */,
...@@ -928,11 +929,11 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -928,11 +929,11 @@ int change_master(THD* thd, MASTER_INFO* mi)
else else
{ {
const char* msg; const char* msg;
mi->rli.skip_log_purge= 1; relay_log_purge= 0;
/* Relay log is already initialized */ /* Relay log is already initialized */
if (init_relay_log_pos(&mi->rli, if (init_relay_log_pos(&mi->rli,
mi->rli.relay_log_name, mi->rli.group_relay_log_name,
mi->rli.relay_log_pos, mi->rli.group_relay_log_pos,
0 /*no data lock*/, 0 /*no data lock*/,
&msg)) &msg))
{ {
...@@ -941,12 +942,12 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -941,12 +942,12 @@ int change_master(THD* thd, MASTER_INFO* mi)
DBUG_RETURN(1); DBUG_RETURN(1);
} }
} }
mi->rli.master_log_pos = mi->master_log_pos; mi->rli.group_master_log_pos = mi->master_log_pos;
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos)); DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
strmake(mi->rli.master_log_name,mi->master_log_name, strmake(mi->rli.group_master_log_name,mi->master_log_name,
sizeof(mi->rli.master_log_name)-1); sizeof(mi->rli.group_master_log_name)-1);
if (!mi->rli.master_log_name[0]) // uninitialized case if (!mi->rli.group_master_log_name[0]) // uninitialized case
mi->rli.master_log_pos=0; mi->rli.group_master_log_pos=0;
pthread_mutex_lock(&mi->rli.data_lock); pthread_mutex_lock(&mi->rli.data_lock);
mi->rli.abort_pos_wait++; mi->rli.abort_pos_wait++;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment