Commit 9f4f19d9 authored by unknown's avatar unknown

Replication: do not modify the in-memory log positions until the COMMIT

is executed, even if the transaction spans two or more relay logs (bug #53).
New variable relay_log_purge =0|1
New test to verify bug #53


sql/log.cc:
  Now we purge a relay log only when we are sure we won't need it,
  i.e. we have executed the final query (if autocommit=1) or the COMMIT.
sql/log_event.cc:
  Better tracking of the relay log name and position
  last executed, even when we are in a transaction that spans
  2 or more relay logs.
sql/mysql_priv.h:
  new option relay_log_purge (users can now decide for themselves
  whether their relay logs are purged automatically;
  we no longer make unsafe guesses as before)
sql/mysqld.cc:
  new option --innodb (replaces --skip-innodb).
  Useful for the test suite : we have skip-innodb in mysql-test-run,
  but we can ('-opt.info' file) choose to start the server with
  InnoDB for this test only.
  New option --bdb
sql/repl_failsafe.cc:
  Better tracking of the relay log name and position
  last executed, even when we are in a transaction that spans
  2 or more relay logs.
sql/set_var.cc:
  new variable relay_log_purge
sql/slave.cc:
  Better tracking of the relay log name and position
  last executed, even when we are in a transaction that spans
  2 or more relay logs.
  Now we purge a relay log only when we are sure we won't need it,
  i.e. we have executed the final query (if autocommit=1) or the COMMIT
sql/slave.h:
  Better tracking of the relay log name and position
  last executed, even when we are in a transaction that spans
  2 or more relay logs.
sql/sql_class.h:
  prototypes change
sql/sql_parse.cc:
  removed thd argument (was not used in the function's body)
sql/sql_repl.cc:
  Better tracking of the relay log name and position
  last executed, even when we are in a transaction that spans
  2 or more relay logs.
  Turn relay_log_purge silently off when someone does CHANGE
  MASTER TO RELAY_LOG_*
parent 4ac98ec5
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
stop slave;
create table t1 (a int) type=innodb;
reset slave;
start slave;
stop slave;
start slave;
select master_pos_wait('master-bin.001',3000,120)=-1;
master_pos_wait('master-bin.001',3000,120)=-1
0
select * from t1 where a=8000;
a
8000
-O max_binlog_size=16384
--innodb
--log-warnings
# Regression test for a transaction that spans several relay logs.
#
# When the relay log gets rotated while the I/O thread
# is reading a transaction, the transaction spans two or more
# relay logs. If STOP SLAVE occurs while the SQL thread is
# executing a part of the transaction in a relay log other than the first,
# we test whether START SLAVE resumes at the beginning of the
# transaction (i.e., steps back to the first relay log).
# The slave is started with max_binlog_size=16384 bytes,
# to force many rotations (approximately 30 rotations).
# If the master or slave does not support InnoDB, this test will pass.
source include/master-slave.inc;
# Keep the slave stopped while the master writes the big transaction.
connection slave;
stop slave;
connection master;
create table t1 (a int) type=innodb;
# $1 is the loop counter: the loop inserts the values 8000 down to 1.
let $1=8000;
# Do not echo the transaction's statements into the test output.
disable_query_log;
begin;
while ($1)
{
# eval means expand $ expressions
eval insert into t1 values( $1 );
dec $1;
}
commit;
# This generates a master binlog of about 500kB,
# which corresponds to about 30 relay logs on the slave.
enable_query_log;
save_master_pos;
connection slave;
reset slave;
start slave;
# We wait 1 sec for the SQL thread to be somewhere in
# the middle of the transaction, hopefully not in
# the first relay log, and hopefully before the COMMIT.
# Usually it stops when the SQL thread is around the 15th relay log.
# We cannot use MASTER_POS_WAIT() because the master's position
# increases only when the slave executes the COMMIT.
system sleep 1;
stop slave;
# We suppose the SQL thread stopped before the COMMIT.
# If so, the transaction was rolled back
# and the table is now empty.
# Now restart.
start slave;
# And see if the table contains '8000',
# which proves that the transaction restarted at
# the right place.
# We must wait for the transaction to commit before
# reading; MASTER_POS_WAIT() will do it for sure
# (the only statement with position>=3000 is COMMIT).
# Older versions of MySQL would hang forever in MASTER_POS_WAIT
# because COMMIT was said to be at position 0 in the master's log (bug).
# Detect this with a timeout of 120 seconds: the comparison with -1
# yields 1 if MASTER_POS_WAIT() timed out, and the expected value is 0.
select master_pos_wait('master-bin.001',3000,120)=-1;
select * from t1 where a=8000;
# Note that merely having fewer than about 30 binlogs on the slave
# (the slave is started with --log-slave-updates) already
# proves that the transaction was not properly resumed.
This diff is collapsed.
...@@ -310,11 +310,36 @@ int Log_event::exec_event(struct st_relay_log_info* rli) ...@@ -310,11 +310,36 @@ int Log_event::exec_event(struct st_relay_log_info* rli)
*/ */
if (rli) if (rli)
{ {
if (rli->inside_transaction) /*
rli->inc_pending(get_event_len()); If in a transaction, and if the slave supports transactions,
just inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
(not OPTION_NOT_AUTOCOMMIT) as transactions are logged
with BEGIN/COMMIT, not with SET AUTOCOMMIT= .
CAUTION: opt_using_transactions means
innodb || bdb ; suppose the master supports InnoDB and BDB,
but the slave supports only BDB, problems
will arise:
- suppose an InnoDB table is created on the master,
- then it will be MyISAM on the slave
- but as opt_using_transactions is true, the slave will believe he is
transactional with the MyISAM table. And problems will come when one
does START SLAVE; STOP SLAVE; START SLAVE; (the slave will resume at BEGIN
whereas there has not been any rollback). This is the problem of
using opt_using_transactions instead of a finer
"does the slave support _the_transactional_handler_used_on_the_master_".
More generally, we'll have problems when a query mixes a transactional
handler and MyISAM and STOP SLAVE is issued in the middle of the
"transaction". START SLAVE will resume at BEGIN while the MyISAM table has
already been updated.
*/
if ((thd->options & OPTION_BEGIN) && opt_using_transactions)
rli->inc_event_relay_log_pos(get_event_len());
else else
{ {
rli->inc_pos(get_event_len(),log_pos); rli->inc_group_relay_log_pos(get_event_len(),log_pos);
flush_relay_log_info(rli); flush_relay_log_info(rli);
} }
} }
...@@ -878,9 +903,13 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -878,9 +903,13 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
thd->db = rewrite_db((char*)db); thd->db = rewrite_db((char*)db);
/* /*
InnoDB internally stores the master log position it has processed so far; InnoDB internally stores the master log position it has executed so far,
position to store is really pos + pending + event_len i.e. the position just after the COMMIT event.
since we must store the pos of the END of the current log event When InnoDB will want to store, the positions in rli won't have
been updated yet, so group_master_log_* will point to old BEGIN
and event_master_log* will point to the beginning of current COMMIT.
So the position to store is event_master_log_pos + event_len
since we must store the pos of the END of the current log event (COMMIT).
*/ */
rli->event_len= get_event_len(); rli->event_len= get_event_len();
...@@ -909,18 +938,6 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -909,18 +938,6 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
DBUG_PRINT("query",("%s",thd->query)); DBUG_PRINT("query",("%s",thd->query));
mysql_parse(thd, thd->query, q_len); mysql_parse(thd, thd->query, q_len);
/*
Set a flag if we are inside an transaction so that we can restart
the transaction from the start if we are killed
This will only be done if we are supporting transactional tables
in the slave.
*/
if (!strcmp(thd->query,"BEGIN"))
rli->inside_transaction= opt_using_transactions;
else if (!strcmp(thd->query,"COMMIT"))
rli->inside_transaction=0;
DBUG_PRINT("info",("expected_error: %d last_errno: %d", DBUG_PRINT("info",("expected_error: %d last_errno: %d",
expected_error, thd->net.last_errno)); expected_error, thd->net.last_errno));
if ((expected_error != (actual_error= thd->net.last_errno)) && if ((expected_error != (actual_error= thd->net.last_errno)) &&
...@@ -1776,14 +1793,15 @@ int Rotate_log_event::write_data(IO_CACHE* file) ...@@ -1776,14 +1793,15 @@ int Rotate_log_event::write_data(IO_CACHE* file)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Rotate_log_event::exec_event(struct st_relay_log_info* rli) int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
{ {
char* log_name = rli->master_log_name; char* log_name = rli->group_master_log_name;
DBUG_ENTER("Rotate_log_event::exec_event"); DBUG_ENTER("Rotate_log_event::exec_event");
pthread_mutex_lock(&rli->data_lock); pthread_mutex_lock(&rli->data_lock);
memcpy(log_name, new_log_ident, ident_len+1); memcpy(log_name, new_log_ident, ident_len+1);
rli->master_log_pos = pos; rli->group_master_log_pos = pos;
rli->relay_log_pos += get_event_len(); rli->event_relay_log_pos += get_event_len();
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) rli->master_log_pos)); rli->group_relay_log_pos = rli->event_relay_log_pos;
DBUG_PRINT("info", ("group_master_log_pos: %d", (ulong) rli->group_master_log_pos));
pthread_mutex_unlock(&rli->data_lock); pthread_mutex_unlock(&rli->data_lock);
pthread_cond_broadcast(&rli->data_cond); pthread_cond_broadcast(&rli->data_cond);
flush_relay_log_info(rli); flush_relay_log_info(rli);
...@@ -1905,7 +1923,7 @@ int Intvar_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -1905,7 +1923,7 @@ int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
thd->next_insert_id = val; thd->next_insert_id = val;
break; break;
} }
rli->inc_pending(get_event_len()); rli->inc_event_relay_log_pos(get_event_len());
return 0; return 0;
} }
#endif #endif
...@@ -1967,7 +1985,7 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -1967,7 +1985,7 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli)
{ {
thd->rand.seed1= (ulong) seed1; thd->rand.seed1= (ulong) seed1;
thd->rand.seed2= (ulong) seed2; thd->rand.seed2= (ulong) seed2;
rli->inc_pending(get_event_len()); rli->inc_event_relay_log_pos(get_event_len());
return 0; return 0;
} }
#endif // !MYSQL_CLIENT #endif // !MYSQL_CLIENT
...@@ -2199,7 +2217,7 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli) ...@@ -2199,7 +2217,7 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli)
e.update_hash(val, val_len, type, charset, Item::COER_NOCOLL); e.update_hash(val, val_len, type, charset, Item::COER_NOCOLL);
free_root(&thd->mem_root,0); free_root(&thd->mem_root,0);
rli->inc_pending(get_event_len()); rli->inc_event_relay_log_pos(get_event_len());
return 0; return 0;
} }
#endif // !MYSQL_CLIENT #endif // !MYSQL_CLIENT
...@@ -2241,7 +2259,7 @@ Slave_log_event::Slave_log_event(THD* thd_arg, ...@@ -2241,7 +2259,7 @@ Slave_log_event::Slave_log_event(THD* thd_arg,
pthread_mutex_lock(&mi->data_lock); pthread_mutex_lock(&mi->data_lock);
pthread_mutex_lock(&rli->data_lock); pthread_mutex_lock(&rli->data_lock);
master_host_len = strlen(mi->host); master_host_len = strlen(mi->host);
master_log_len = strlen(rli->master_log_name); master_log_len = strlen(rli->group_master_log_name);
// on OOM, just do not initialize the structure and print the error // on OOM, just do not initialize the structure and print the error
if ((mem_pool = (char*)my_malloc(get_data_size() + 1, if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
MYF(MY_WME)))) MYF(MY_WME))))
...@@ -2249,9 +2267,9 @@ Slave_log_event::Slave_log_event(THD* thd_arg, ...@@ -2249,9 +2267,9 @@ Slave_log_event::Slave_log_event(THD* thd_arg,
master_host = mem_pool + SL_MASTER_HOST_OFFSET ; master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
memcpy(master_host, mi->host, master_host_len + 1); memcpy(master_host, mi->host, master_host_len + 1);
master_log = master_host + master_host_len + 1; master_log = master_host + master_host_len + 1;
memcpy(master_log, rli->master_log_name, master_log_len + 1); memcpy(master_log, rli->group_master_log_name, master_log_len + 1);
master_port = mi->port; master_port = mi->port;
master_pos = rli->master_log_pos; master_pos = rli->group_master_log_pos;
DBUG_PRINT("info", ("master_log: %s pos: %d", master_log, DBUG_PRINT("info", ("master_log: %s pos: %d", master_log,
(ulong) master_pos)); (ulong) master_pos));
} }
...@@ -2381,19 +2399,19 @@ void Stop_log_event::print(FILE* file, bool short_form, char* last_db) ...@@ -2381,19 +2399,19 @@ void Stop_log_event::print(FILE* file, bool short_form, char* last_db)
int Stop_log_event::exec_event(struct st_relay_log_info* rli) int Stop_log_event::exec_event(struct st_relay_log_info* rli)
{ {
// do not clean up immediately after rotate event // do not clean up immediately after rotate event
if (rli->master_log_pos > BIN_LOG_HEADER_SIZE) if (rli->group_master_log_pos > BIN_LOG_HEADER_SIZE)
{ {
close_temporary_tables(thd); close_temporary_tables(thd);
cleanup_load_tmpdir(); cleanup_load_tmpdir();
} }
/* /*
We do not want to update master_log pos because we get a rotate event We do not want to update master_log pos because we get a rotate event
before stop, so by now master_log_name is set to the next log. before stop, so by now group_master_log_name is set to the next log.
If we updated it, we will have incorrect master coordinates and this If we updated it, we will have incorrect master coordinates and this
could give false triggers in MASTER_POS_WAIT() that we have reached could give false triggers in MASTER_POS_WAIT() that we have reached
the target position when in fact we have not. the target position when in fact we have not.
*/ */
rli->inc_pos(get_event_len(), 0); rli->inc_group_relay_log_pos(get_event_len(), 0);
flush_relay_log_info(rli); flush_relay_log_info(rli);
return 0; return 0;
} }
......
...@@ -721,7 +721,7 @@ extern ulong max_binlog_size, rpl_recovery_rank, thread_cache_size; ...@@ -721,7 +721,7 @@ extern ulong max_binlog_size, rpl_recovery_rank, thread_cache_size;
extern ulong com_stat[(uint) SQLCOM_END], com_other, back_log; extern ulong com_stat[(uint) SQLCOM_END], com_other, back_log;
extern ulong specialflag, current_pid; extern ulong specialflag, current_pid;
extern ulong expire_logs_days; extern ulong expire_logs_days;
extern my_bool relay_log_purge;
extern uint test_flags,select_errors,ha_open_options; extern uint test_flags,select_errors,ha_open_options;
extern uint protocol_version,dropping_tables; extern uint protocol_version,dropping_tables;
extern uint delay_key_write_options; extern uint delay_key_write_options;
......
...@@ -314,7 +314,7 @@ my_bool opt_safe_user_create = 0, opt_no_mix_types = 0; ...@@ -314,7 +314,7 @@ my_bool opt_safe_user_create = 0, opt_no_mix_types = 0;
my_bool lower_case_table_names, opt_old_rpl_compat; my_bool lower_case_table_names, opt_old_rpl_compat;
my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0; my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
my_bool opt_log_slave_updates= 0, opt_old_passwords=0, use_old_passwords=0; my_bool opt_log_slave_updates= 0, opt_old_passwords=0, use_old_passwords=0;
my_bool opt_console= 0; my_bool opt_console= 0, opt_bdb, opt_innodb;
volatile bool mqh_used = 0; volatile bool mqh_used = 0;
FILE *bootstrap_file=0; FILE *bootstrap_file=0;
...@@ -442,6 +442,7 @@ const char **errmesg; /* Error messages */ ...@@ -442,6 +442,7 @@ const char **errmesg; /* Error messages */
const char *myisam_recover_options_str="OFF"; const char *myisam_recover_options_str="OFF";
const char *sql_mode_str="OFF"; const char *sql_mode_str="OFF";
ulong rpl_recovery_rank=0; ulong rpl_recovery_rank=0;
my_bool relay_log_purge=1;
my_string mysql_unix_port=NULL, opt_mysql_tmpdir=NULL; my_string mysql_unix_port=NULL, opt_mysql_tmpdir=NULL;
MY_TMPDIR mysql_tmpdir_list; MY_TMPDIR mysql_tmpdir_list;
...@@ -2208,7 +2209,7 @@ static int init_server_components() ...@@ -2208,7 +2209,7 @@ static int init_server_components()
{ {
long purge_time= time(0) - expire_logs_days*24*60*60; long purge_time= time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0) if (purge_time >= 0)
mysql_bin_log.purge_logs_before_date(current_thd, purge_time); mysql_bin_log.purge_logs_before_date(purge_time);
} }
#endif #endif
} }
...@@ -3400,7 +3401,7 @@ enum options ...@@ -3400,7 +3401,7 @@ enum options
OPT_DELAY_KEY_WRITE, OPT_CHARSETS_DIR, OPT_DELAY_KEY_WRITE, OPT_CHARSETS_DIR,
OPT_BDB_HOME, OPT_BDB_LOG, OPT_BDB_HOME, OPT_BDB_LOG,
OPT_BDB_TMP, OPT_BDB_NOSYNC, OPT_BDB_TMP, OPT_BDB_NOSYNC,
OPT_BDB_LOCK, OPT_BDB_SKIP, OPT_BDB_LOCK, OPT_BDB,
OPT_BDB_NO_RECOVER, OPT_BDB_SHARED, OPT_BDB_NO_RECOVER, OPT_BDB_SHARED,
OPT_MASTER_HOST, OPT_MASTER_USER, OPT_MASTER_HOST, OPT_MASTER_USER,
OPT_MASTER_PASSWORD, OPT_MASTER_PORT, OPT_MASTER_PASSWORD, OPT_MASTER_PORT,
...@@ -3430,7 +3431,7 @@ enum options ...@@ -3430,7 +3431,7 @@ enum options
OPT_INNODB_FLUSH_METHOD, OPT_INNODB_FLUSH_METHOD,
OPT_INNODB_FAST_SHUTDOWN, OPT_INNODB_FAST_SHUTDOWN,
OPT_SAFE_SHOW_DB, OPT_SAFE_SHOW_DB,
OPT_INNODB_SKIP, OPT_SKIP_SAFEMALLOC, OPT_INNODB, OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_TEMP_POOL, OPT_TX_ISOLATION,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS, OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL, OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL,
...@@ -3468,7 +3469,7 @@ enum options ...@@ -3468,7 +3469,7 @@ enum options
OPT_OPEN_FILES_LIMIT, OPT_OPEN_FILES_LIMIT,
OPT_QUERY_CACHE_LIMIT, OPT_QUERY_CACHE_MIN_RES_UNIT, OPT_QUERY_CACHE_SIZE, OPT_QUERY_CACHE_LIMIT, OPT_QUERY_CACHE_MIN_RES_UNIT, OPT_QUERY_CACHE_SIZE,
OPT_QUERY_CACHE_TYPE, OPT_RECORD_BUFFER, OPT_QUERY_CACHE_TYPE, OPT_RECORD_BUFFER,
OPT_RECORD_RND_BUFFER, OPT_RELAY_LOG_SPACE_LIMIT, OPT_RECORD_RND_BUFFER, OPT_RELAY_LOG_SPACE_LIMIT, OPT_RELAY_LOG_PURGE,
OPT_SLAVE_NET_TIMEOUT, OPT_SLAVE_COMPRESSED_PROTOCOL, OPT_SLOW_LAUNCH_TIME, OPT_SLAVE_NET_TIMEOUT, OPT_SLAVE_COMPRESSED_PROTOCOL, OPT_SLOW_LAUNCH_TIME,
OPT_SORT_BUFFER, OPT_TABLE_CACHE, OPT_SORT_BUFFER, OPT_TABLE_CACHE,
OPT_THREAD_CONCURRENCY, OPT_THREAD_CACHE_SIZE, OPT_THREAD_CONCURRENCY, OPT_THREAD_CACHE_SIZE,
...@@ -3529,8 +3530,10 @@ struct my_option my_long_options[] = ...@@ -3529,8 +3530,10 @@ struct my_option my_long_options[] =
(gptr*) &berkeley_tmpdir, (gptr*) &berkeley_tmpdir, 0, GET_STR, (gptr*) &berkeley_tmpdir, (gptr*) &berkeley_tmpdir, 0, GET_STR,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#endif /* HAVE_BERKELEY_DB */ #endif /* HAVE_BERKELEY_DB */
{"skip-bdb", OPT_BDB_SKIP, "Don't use berkeley db (will save memory)", {"bdb", OPT_BDB, "Enable Berkeley DB (if this version of MySQL supports it). \
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, Disable with --skip-bdb (will save memory)",
(gptr*) &opt_bdb, (gptr*) &opt_bdb, 0, GET_BOOL, NO_ARG, 1, 0, 0,
0, 0, 0},
{"big-tables", OPT_BIG_TABLES, {"big-tables", OPT_BIG_TABLES,
"Allow big result sets by saving all temporary sets on file (Solves most 'table full' errors)", "Allow big result sets by saving all temporary sets on file (Solves most 'table full' errors)",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
...@@ -3885,8 +3888,10 @@ struct my_option my_long_options[] = ...@@ -3885,8 +3888,10 @@ struct my_option my_long_options[] =
"Start without grant tables. This gives all users FULL ACCESS to all tables!", "Start without grant tables. This gives all users FULL ACCESS to all tables!",
(gptr*) &opt_noacl, (gptr*) &opt_noacl, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, (gptr*) &opt_noacl, (gptr*) &opt_noacl, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0,
0}, 0},
{"skip-innodb", OPT_INNODB_SKIP, "Don't use Innodb (will save memory)", {"innodb", OPT_INNODB, "Enable InnoDB (if this version of MySQL supports it). \
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, Disable with --skip-innodb (will save memory)",
(gptr*) &opt_innodb, (gptr*) &opt_innodb, 0, GET_BOOL, NO_ARG, 1, 0, 0,
0, 0, 0},
{"skip-locking", OPT_SKIP_LOCK, {"skip-locking", OPT_SKIP_LOCK,
"Deprecated option, use --skip-external-locking instead", "Deprecated option, use --skip-external-locking instead",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
...@@ -4272,6 +4277,11 @@ struct my_option my_long_options[] = ...@@ -4272,6 +4277,11 @@ struct my_option my_long_options[] =
(gptr*) &max_system_variables.read_buff_size,0, GET_ULONG, REQUIRED_ARG, (gptr*) &max_system_variables.read_buff_size,0, GET_ULONG, REQUIRED_ARG,
128*1024L, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE, 0}, 128*1024L, IO_SIZE*2+MALLOC_OVERHEAD, ~0L, MALLOC_OVERHEAD, IO_SIZE, 0},
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
{"relay_log_purge", OPT_RELAY_LOG_PURGE,
"0 = do not purge relay logs. 1 = purge them as soon as they are no more needed.",
(gptr*) &relay_log_purge,
(gptr*) &relay_log_purge, 0, GET_BOOL, NO_ARG,
1, 0, 1, 0, 1, 0},
{"relay_log_space_limit", OPT_RELAY_LOG_SPACE_LIMIT, {"relay_log_space_limit", OPT_RELAY_LOG_SPACE_LIMIT,
"Max space to use for all relay logs", "Max space to use for all relay logs",
(gptr*) &relay_log_space_limit, (gptr*) &relay_log_space_limit,
...@@ -5016,16 +5026,32 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -5016,16 +5026,32 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
berkeley_shared_data=1; berkeley_shared_data=1;
break; break;
#endif /* HAVE_BERKELEY_DB */ #endif /* HAVE_BERKELEY_DB */
case OPT_BDB_SKIP: case OPT_BDB:
#ifdef HAVE_BERKELEY_DB #ifdef HAVE_BERKELEY_DB
berkeley_skip=1; if (opt_bdb)
have_berkeley_db=SHOW_OPTION_DISABLED; {
berkeley_skip=0;
have_berkeley_db=SHOW_OPTION_YES;
}
else
{
berkeley_skip=1;
have_berkeley_db=SHOW_OPTION_DISABLED;
}
#endif #endif
break; break;
case OPT_INNODB_SKIP: case OPT_INNODB:
#ifdef HAVE_INNOBASE_DB #ifdef HAVE_INNOBASE_DB
innodb_skip=1; if (opt_innodb)
have_innodb=SHOW_OPTION_DISABLED; {
innodb_skip=0;
have_innodb=SHOW_OPTION_YES;
}
else
{
innodb_skip=1;
have_innodb=SHOW_OPTION_DISABLED;
}
#endif #endif
break; break;
case OPT_INNODB_DATA_FILE_PATH: case OPT_INNODB_DATA_FILE_PATH:
......
...@@ -873,7 +873,6 @@ int load_master_data(THD* thd) ...@@ -873,7 +873,6 @@ int load_master_data(THD* thd)
// don't hit the magic number // don't hit the magic number
if (active_mi->master_log_pos < BIN_LOG_HEADER_SIZE) if (active_mi->master_log_pos < BIN_LOG_HEADER_SIZE)
active_mi->master_log_pos = BIN_LOG_HEADER_SIZE; active_mi->master_log_pos = BIN_LOG_HEADER_SIZE;
active_mi->rli.pending = 0;
flush_master_info(active_mi); flush_master_info(active_mi);
} }
mc_mysql_free_result(master_status_res); mc_mysql_free_result(master_status_res);
...@@ -897,9 +896,13 @@ int load_master_data(THD* thd) ...@@ -897,9 +896,13 @@ int load_master_data(THD* thd)
return 1; return 1;
} }
pthread_mutex_lock(&active_mi->rli.data_lock); pthread_mutex_lock(&active_mi->rli.data_lock);
active_mi->rli.master_log_pos = active_mi->master_log_pos; active_mi->rli.group_master_log_pos = active_mi->master_log_pos;
strmake(active_mi->rli.master_log_name,active_mi->master_log_name, strmake(active_mi->rli.group_master_log_name,active_mi->master_log_name,
sizeof(active_mi->rli.master_log_name)-1); sizeof(active_mi->rli.group_master_log_name)-1);
/*
No need to update rli.event* coordinates, they will be when the slave
threads start ; only rli.group* coordinates are necessary here.
*/
flush_relay_log_info(&active_mi->rli); flush_relay_log_info(&active_mi->rli);
pthread_cond_broadcast(&active_mi->rli.data_cond); pthread_cond_broadcast(&active_mi->rli.data_cond);
pthread_mutex_unlock(&active_mi->rli.data_lock); pthread_mutex_unlock(&active_mi->rli.data_lock);
......
...@@ -201,6 +201,10 @@ sys_var_thd_ulong sys_read_buff_size("read_buffer_size", ...@@ -201,6 +201,10 @@ sys_var_thd_ulong sys_read_buff_size("read_buffer_size",
&SV::read_buff_size); &SV::read_buff_size);
sys_var_thd_ulong sys_read_rnd_buff_size("read_rnd_buffer_size", sys_var_thd_ulong sys_read_rnd_buff_size("read_rnd_buffer_size",
&SV::read_rnd_buff_size); &SV::read_rnd_buff_size);
#ifdef HAVE_REPLICATION
sys_var_bool_ptr sys_relay_log_purge("relay_log_purge",
&relay_log_purge);
#endif
sys_var_long_ptr sys_rpl_recovery_rank("rpl_recovery_rank", sys_var_long_ptr sys_rpl_recovery_rank("rpl_recovery_rank",
&rpl_recovery_rank); &rpl_recovery_rank);
sys_var_long_ptr sys_query_cache_size("query_cache_size", sys_var_long_ptr sys_query_cache_size("query_cache_size",
...@@ -407,6 +411,9 @@ sys_var *sys_variables[]= ...@@ -407,6 +411,9 @@ sys_var *sys_variables[]=
&sys_rand_seed2, &sys_rand_seed2,
&sys_read_buff_size, &sys_read_buff_size,
&sys_read_rnd_buff_size, &sys_read_rnd_buff_size,
#ifdef HAVE_REPLICATION
&sys_relay_log_purge,
#endif
&sys_rpl_recovery_rank, &sys_rpl_recovery_rank,
&sys_safe_updates, &sys_safe_updates,
&sys_select_limit, &sys_select_limit,
...@@ -563,6 +570,9 @@ struct show_var_st init_vars[]= { ...@@ -563,6 +570,9 @@ struct show_var_st init_vars[]= {
{sys_pseudo_thread_id.name, (char*) &sys_pseudo_thread_id, SHOW_SYS}, {sys_pseudo_thread_id.name, (char*) &sys_pseudo_thread_id, SHOW_SYS},
{sys_read_buff_size.name, (char*) &sys_read_buff_size, SHOW_SYS}, {sys_read_buff_size.name, (char*) &sys_read_buff_size, SHOW_SYS},
{sys_read_rnd_buff_size.name,(char*) &sys_read_rnd_buff_size, SHOW_SYS}, {sys_read_rnd_buff_size.name,(char*) &sys_read_rnd_buff_size, SHOW_SYS},
#ifdef HAVE_REPLICATION
{sys_relay_log_purge.name, (char*) &sys_relay_log_purge, SHOW_SYS},
#endif
{sys_rpl_recovery_rank.name,(char*) &sys_rpl_recovery_rank, SHOW_SYS}, {sys_rpl_recovery_rank.name,(char*) &sys_rpl_recovery_rank, SHOW_SYS},
#ifdef HAVE_QUERY_CACHE #ifdef HAVE_QUERY_CACHE
{sys_query_cache_limit.name,(char*) &sys_query_cache_limit, SHOW_SYS}, {sys_query_cache_limit.name,(char*) &sys_query_cache_limit, SHOW_SYS},
......
This diff is collapsed.
...@@ -92,12 +92,6 @@ typedef struct st_relay_log_info ...@@ -92,12 +92,6 @@ typedef struct st_relay_log_info
cur_log_fd - file descriptor of the current read relay log cur_log_fd - file descriptor of the current read relay log
*/ */
File info_fd,cur_log_fd; File info_fd,cur_log_fd;
/* name of current read relay log */
char relay_log_name[FN_REFLEN];
/* master log name corresponding to current read position */
char master_log_name[FN_REFLEN];
/* original log position of last processed event */
volatile my_off_t master_log_pos;
/* /*
Protected with internal locks. Protected with internal locks.
...@@ -142,20 +136,36 @@ typedef struct st_relay_log_info ...@@ -142,20 +136,36 @@ typedef struct st_relay_log_info
uint32 cur_log_old_open_count; uint32 cur_log_old_open_count;
/* /*
relay_log_pos - Current offset in the relay log. Let's call a group (of events) :
pending - In some cases we do not increment offset immediately - a transaction
after processing an event, because the following event or
needs to be processed atomically together with this one - an autocommiting query + its associated events (INSERT_ID,
such as: TIMESTAMP...)
We need these rli coordinates :
Intvar_event - sets auto_increment value - relay log name and position of the beginning of the group we currently are
Rand_event - sets the random seed executing. Needed to know where we have to restart when replication has
stopped in the middle of a group (which has been rolled back by the slave).
However, once both events have been processed, we need to - relay log name and position just after the event we have just
increment by the cumulative offset. 'pending' stores the executed. This event is part of the current group.
extra offset to be added to the position. Formerly we only had the immediately above coordinates, plus a 'pending'
variable, but this mishandled the case of a transaction starting in one
relay log and finishing (committing) in another relay log, a case which can
happen when, for example, the relay log gets rotated because of
max_binlog_size.
*/
char group_relay_log_name[FN_REFLEN];
ulonglong group_relay_log_pos;
char event_relay_log_name[FN_REFLEN];
ulonglong event_relay_log_pos;
/*
Original log name and position of the group we're currently executing
(whose coordinates are group_relay_log_name/pos in the relay log)
in the master's binlog. These concern the *group*, because in the master's
binlog the log_pos that comes with each event is the position of the
beginning of the group.
*/ */
ulonglong relay_log_pos, pending; char group_master_log_name[FN_REFLEN];
volatile my_off_t group_master_log_pos;
/* /*
Handling of the relay_log_space_limit optional constraint. Handling of the relay_log_space_limit optional constraint.
...@@ -193,38 +203,39 @@ typedef struct st_relay_log_info ...@@ -193,38 +203,39 @@ typedef struct st_relay_log_info
/* if not set, the value of other members of the structure are undefined */ /* if not set, the value of other members of the structure are undefined */
bool inited; bool inited;
volatile bool abort_slave, slave_running; volatile bool abort_slave, slave_running;
bool skip_log_purge;
bool inside_transaction;
st_relay_log_info(); st_relay_log_info();
~st_relay_log_info(); ~st_relay_log_info();
inline void inc_pending(ulonglong val)
inline void inc_event_relay_log_pos(ulonglong val)
{ {
pending += val; event_relay_log_pos+= val;
} }
/* TODO: this probably needs to be fixed */
inline void inc_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0) void inc_group_relay_log_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0)
{ {
if (!skip_lock) if (!skip_lock)
pthread_mutex_lock(&data_lock); pthread_mutex_lock(&data_lock);
relay_log_pos += val+pending; inc_event_relay_log_pos(val);
pending = 0; group_relay_log_pos= event_relay_log_pos;
if (log_pos) strmake(group_relay_log_name,event_relay_log_name,
master_log_pos = log_pos+ val; sizeof(group_relay_log_name)-1);
/*
If the slave does not support transactions and replicates a transaction,
users should not trust group_master_log_pos (which they can display with
SHOW SLAVE STATUS or read from relay-log.info), because to compute
group_master_log_pos the slave relies on log_pos stored in the master's
binlog, but if we are in a master's transaction these positions are always
the BEGIN's one (except for the COMMIT), so group_master_log_pos does
not advance as it should on the non-transactional slave (it advances by
big leaps, whereas it should advance by small leaps).
*/
if (log_pos) // 3.23 binlogs don't have log_posx
group_master_log_pos= log_pos+ val;
pthread_cond_broadcast(&data_cond); pthread_cond_broadcast(&data_cond);
if (!skip_lock) if (!skip_lock)
pthread_mutex_unlock(&data_lock); pthread_mutex_unlock(&data_lock);
} }
/*
thread safe read of position - not needed if we are in the slave thread,
but required otherwise as var is a longlong
*/
inline void read_pos(ulonglong& var)
{
pthread_mutex_lock(&data_lock);
var = relay_log_pos;
pthread_mutex_unlock(&data_lock);
}
int wait_for_pos(THD* thd, String* log_name, longlong log_pos, int wait_for_pos(THD* thd, String* log_name, longlong log_pos,
longlong timeout); longlong timeout);
...@@ -334,7 +345,7 @@ typedef struct st_table_rule_ent ...@@ -334,7 +345,7 @@ typedef struct st_table_rule_ent
#define TABLE_RULE_ARR_SIZE 16 #define TABLE_RULE_ARR_SIZE 16
#define MAX_SLAVE_ERRMSG 1024 #define MAX_SLAVE_ERRMSG 1024
#define RPL_LOG_NAME (rli->master_log_name[0] ? rli->master_log_name :\ #define RPL_LOG_NAME (rli->group_master_log_name[0] ? rli->group_master_log_name :\
"FIRST") "FIRST")
#define IO_RPL_LOG_NAME (mi->master_log_name[0] ? mi->master_log_name :\ #define IO_RPL_LOG_NAME (mi->master_log_name[0] ? mi->master_log_name :\
"FIRST") "FIRST")
......
...@@ -145,10 +145,12 @@ class MYSQL_LOG { ...@@ -145,10 +145,12 @@ class MYSQL_LOG {
int generate_new_name(char *new_name,const char *old_name); int generate_new_name(char *new_name,const char *old_name);
void make_log_name(char* buf, const char* log_ident); void make_log_name(char* buf, const char* log_ident);
bool is_active(const char* log_file_name); bool is_active(const char* log_file_name);
int update_log_index(LOG_INFO* linfo); int update_log_index(LOG_INFO* linfo, bool need_update_threads);
int purge_logs(THD* thd, const char* to_log); int purge_logs(const char *to_log, bool included,
int purge_logs_before_date(THD* thd, time_t purge_time); bool need_mutex, bool need_update_threads,
int purge_first_log(struct st_relay_log_info* rli); ulonglong *decrease_log_space);
int purge_logs_before_date(time_t purge_time);
int purge_first_log(struct st_relay_log_info* rli, bool included);
bool reset_logs(THD* thd); bool reset_logs(THD* thd);
// if we are exiting, we also want to close the index file // if we are exiting, we also want to close the index file
void close(bool exiting = 0); void close(bool exiting = 0);
......
...@@ -3980,7 +3980,7 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables) ...@@ -3980,7 +3980,7 @@ bool reload_acl_and_cache(THD *thd, ulong options, TABLE_LIST *tables)
{ {
long purge_time= time(0) - expire_logs_days*24*60*60; long purge_time= time(0) - expire_logs_days*24*60*60;
if (purge_time >= 0) if (purge_time >= 0)
mysql_bin_log.purge_logs_before_date(thd, purge_time); mysql_bin_log.purge_logs_before_date(purge_time);
} }
#endif #endif
mysql_slow_log.new_file(1); mysql_slow_log.new_file(1);
......
...@@ -292,7 +292,7 @@ int purge_master_logs(THD* thd, const char* to_log) ...@@ -292,7 +292,7 @@ int purge_master_logs(THD* thd, const char* to_log)
char search_file_name[FN_REFLEN]; char search_file_name[FN_REFLEN];
mysql_bin_log.make_log_name(search_file_name, to_log); mysql_bin_log.make_log_name(search_file_name, to_log);
int res = mysql_bin_log.purge_logs(thd, search_file_name); int res = mysql_bin_log.purge_logs(search_file_name, 0, 1, 1, NULL);
return purge_error_message(thd, res); return purge_error_message(thd, res);
} }
...@@ -300,7 +300,7 @@ int purge_master_logs(THD* thd, const char* to_log) ...@@ -300,7 +300,7 @@ int purge_master_logs(THD* thd, const char* to_log)
int purge_master_logs_before_date(THD* thd, time_t purge_time) int purge_master_logs_before_date(THD* thd, time_t purge_time)
{ {
int res = mysql_bin_log.purge_logs_before_date(thd, purge_time); int res = mysql_bin_log.purge_logs_before_date(purge_time);
return purge_error_message(thd ,res); return purge_error_message(thd ,res);
} }
...@@ -776,24 +776,25 @@ int reset_slave(THD *thd, MASTER_INFO* mi) ...@@ -776,24 +776,25 @@ int reset_slave(THD *thd, MASTER_INFO* mi)
error=1; error=1;
goto err; goto err;
} }
//delete relay logs, clear relay log coordinates // delete relay logs, clear relay log coordinates
if ((error= purge_relay_logs(&mi->rli, thd, if ((error= purge_relay_logs(&mi->rli, thd,
1 /* just reset */, 1 /* just reset */,
&errmsg))) &errmsg)))
goto err; goto err;
//Clear master's log coordinates (only for good display of SHOW SLAVE STATUS) // Clear master's log coordinates (only for good display of SHOW SLAVE STATUS)
mi->master_log_name[0]= 0; mi->master_log_name[0]= 0;
mi->master_log_pos= BIN_LOG_HEADER_SIZE; mi->master_log_pos= BIN_LOG_HEADER_SIZE;
//close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0 // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
end_master_info(mi); end_master_info(mi);
//and delete these two files // and delete these two files
fn_format(fname, master_info_file, mysql_data_home, "", 4+32); fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME))) if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
{ {
error=1; error=1;
goto err; goto err;
} }
// delete relay_log_info_file
fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32); fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME))) if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
{ {
...@@ -874,7 +875,6 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -874,7 +875,6 @@ int change_master(THD* thd, MASTER_INFO* mi)
// if we change host or port, we must reset the position // if we change host or port, we must reset the position
mi->master_log_name[0] = 0; mi->master_log_name[0] = 0;
mi->master_log_pos= BIN_LOG_HEADER_SIZE; mi->master_log_pos= BIN_LOG_HEADER_SIZE;
mi->rli.pending = 0;
} }
if (lex_mi->log_file_name) if (lex_mi->log_file_name)
...@@ -883,7 +883,6 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -883,7 +883,6 @@ int change_master(THD* thd, MASTER_INFO* mi)
if (lex_mi->pos) if (lex_mi->pos)
{ {
mi->master_log_pos= lex_mi->pos; mi->master_log_pos= lex_mi->pos;
mi->rli.pending = 0;
} }
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos)); DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
...@@ -901,20 +900,22 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -901,20 +900,22 @@ int change_master(THD* thd, MASTER_INFO* mi)
if (lex_mi->relay_log_name) if (lex_mi->relay_log_name)
{ {
need_relay_log_purge= 0; need_relay_log_purge= 0;
strmake(mi->rli.relay_log_name,lex_mi->relay_log_name, strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
sizeof(mi->rli.relay_log_name)-1); sizeof(mi->rli.group_relay_log_name)-1);
strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
sizeof(mi->rli.event_relay_log_name)-1);
} }
if (lex_mi->relay_log_pos) if (lex_mi->relay_log_pos)
{ {
need_relay_log_purge= 0; need_relay_log_purge= 0;
mi->rli.relay_log_pos=lex_mi->relay_log_pos; mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
} }
flush_master_info(mi); flush_master_info(mi);
if (need_relay_log_purge) if (need_relay_log_purge)
{ {
mi->rli.skip_log_purge= 0; relay_log_purge= 1;
thd->proc_info="purging old relay logs"; thd->proc_info="purging old relay logs";
if (purge_relay_logs(&mi->rli, thd, if (purge_relay_logs(&mi->rli, thd,
0 /* not only reset, but also reinit */, 0 /* not only reset, but also reinit */,
...@@ -928,11 +929,11 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -928,11 +929,11 @@ int change_master(THD* thd, MASTER_INFO* mi)
else else
{ {
const char* msg; const char* msg;
mi->rli.skip_log_purge= 1; relay_log_purge= 0;
/* Relay log is already initialized */ /* Relay log is already initialized */
if (init_relay_log_pos(&mi->rli, if (init_relay_log_pos(&mi->rli,
mi->rli.relay_log_name, mi->rli.group_relay_log_name,
mi->rli.relay_log_pos, mi->rli.group_relay_log_pos,
0 /*no data lock*/, 0 /*no data lock*/,
&msg)) &msg))
{ {
...@@ -941,12 +942,12 @@ int change_master(THD* thd, MASTER_INFO* mi) ...@@ -941,12 +942,12 @@ int change_master(THD* thd, MASTER_INFO* mi)
DBUG_RETURN(1); DBUG_RETURN(1);
} }
} }
mi->rli.master_log_pos = mi->master_log_pos; mi->rli.group_master_log_pos = mi->master_log_pos;
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos)); DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
strmake(mi->rli.master_log_name,mi->master_log_name, strmake(mi->rli.group_master_log_name,mi->master_log_name,
sizeof(mi->rli.master_log_name)-1); sizeof(mi->rli.group_master_log_name)-1);
if (!mi->rli.master_log_name[0]) // uninitialized case if (!mi->rli.group_master_log_name[0]) // uninitialized case
mi->rli.master_log_pos=0; mi->rli.group_master_log_pos=0;
pthread_mutex_lock(&mi->rli.data_lock); pthread_mutex_lock(&mi->rli.data_lock);
mi->rli.abort_pos_wait++; mi->rli.abort_pos_wait++;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment