Commit 99c6f2fe authored by unknown's avatar unknown

work to enable reading 3.23 logs - not yet finished

moved fail-safe replication routines from sql_repl.cc to repl_failsafe.cc
write start event only in the first log


client/mysqlbinlog.cc:
  work to enable reading 3.23 logs
libmysql/Makefile.shared:
  added mf_iocache2 to libmysqlclient - needed for mysqlbinlog
mysql-test/mysql-test-run.sh:
  added --start-and-exit
mysql-test/r/rpl000002.result:
  result clean-up
mysql-test/r/rpl000016.result:
  result update
mysql-test/r/rpl_log.result:
  result update
mysql-test/t/rpl000016.test:
  test cleanup
mysys/mf_iocache.c:
  fixed new bug
sql/log.cc:
  write start event only on server start or after reset master
sql/log_event.cc:
  work to enable reading 3.23 log format
sql/log_event.h:
  work to enable reading 3.23 format
sql/repl_failsafe.cc:
  code restructuring
sql/repl_failsafe.h:
  re-organized code
sql/slave.cc:
  check master version
sql/slave.h:
  old_format member
sql/sql_class.h:
  allow user to specify io cache type
  need_start_event member to allow writing start event only in the first log
sql/sql_parse.cc:
  code re-organization
sql/sql_repl.cc:
  code reorganization
sql/sql_repl.h:
  reorganized code
parent e6676493
...@@ -21,6 +21,8 @@ ...@@ -21,6 +21,8 @@
#include <time.h> #include <time.h>
#include "log_event.h" #include "log_event.h"
#define PROBE_HEADER_LEN (4+EVENT_LEN_OFFSET)
#define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_LOCAL_FILES) #define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_LOCAL_FILES)
char server_version[SERVER_VERSION_LENGTH]; char server_version[SERVER_VERSION_LENGTH];
...@@ -288,6 +290,52 @@ static void dump_remote_table(NET* net, const char* db, const char* table) ...@@ -288,6 +290,52 @@ static void dump_remote_table(NET* net, const char* db, const char* table)
} }
} }
static int check_master_version(MYSQL* mysql)
{
MYSQL_RES* res = 0;
MYSQL_ROW row;
const char* version;
int old_format = 0;
if (mysql_query(mysql, "SELECT VERSION()")
|| !(res = mysql_store_result(mysql)))
{
mysql_close(mysql);
die("Error checking master version: %s",
mysql_error(mysql));
}
if (!(row = mysql_fetch_row(res)))
{
mysql_free_result(res);
mysql_close(mysql);
die("Master returned no rows for SELECT VERSION()");
return 1;
}
if (!(version = row[0]))
{
mysql_free_result(res);
mysql_close(mysql);
die("Master reported NULL for the version");
}
switch (*version)
{
case '3':
old_format = 1;
break;
case '4':
old_format = 0;
break;
default:
sql_print_error("Master reported unrecognized MySQL version '%s'",
version);
mysql_free_result(res);
mysql_close(mysql);
return 1;
}
mysql_free_result(res);
return old_format;
}
static void dump_remote_log_entries(const char* logname) static void dump_remote_log_entries(const char* logname)
{ {
...@@ -295,6 +343,9 @@ static void dump_remote_log_entries(const char* logname) ...@@ -295,6 +343,9 @@ static void dump_remote_log_entries(const char* logname)
char last_db[FN_REFLEN+1] = ""; char last_db[FN_REFLEN+1] = "";
uint len; uint len;
NET* net = &mysql->net; NET* net = &mysql->net;
int old_format;
old_format = check_master_version(mysql);
if(!position) position = 4; // protect the innocent from spam if(!position) position = 4; // protect the innocent from spam
if (position < 4) if (position < 4)
{ {
...@@ -307,7 +358,7 @@ static void dump_remote_log_entries(const char* logname) ...@@ -307,7 +358,7 @@ static void dump_remote_log_entries(const char* logname)
len = (uint) strlen(logname); len = (uint) strlen(logname);
int4store(buf + 6, 0); int4store(buf + 6, 0);
memcpy(buf + 10, logname,len); memcpy(buf + 10, logname,len);
if(simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1)) if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
die("Error sending the log dump command"); die("Error sending the log dump command");
for(;;) for(;;)
...@@ -322,7 +373,7 @@ static void dump_remote_log_entries(const char* logname) ...@@ -322,7 +373,7 @@ static void dump_remote_log_entries(const char* logname)
len, net->read_pos[5])); len, net->read_pos[5]));
Log_event * ev = Log_event::read_log_event( Log_event * ev = Log_event::read_log_event(
(const char*) net->read_pos + 1 , (const char*) net->read_pos + 1 ,
len - 1, &error); len - 1, &error, old_format);
if (ev) if (ev)
{ {
ev->print(result_file, short_form, last_db); ev->print(result_file, short_form, last_db);
...@@ -335,12 +386,34 @@ static void dump_remote_log_entries(const char* logname) ...@@ -335,12 +386,34 @@ static void dump_remote_log_entries(const char* logname)
} }
} }
static int check_header (IO_CACHE* file)
{
char buf[PROBE_HEADER_LEN];
int old_format;
my_off_t pos = my_b_tell(file);
my_b_seek(file, (my_off_t)0);
if (my_b_read(file, buf, sizeof(buf)))
die("Failed reading header");
if (buf[EVENT_TYPE_OFFSET+4] == START_EVENT)
{
uint event_len;
event_len = uint4korr(buf + EVENT_LEN_OFFSET + 4);
old_format = (event_len < LOG_EVENT_HEADER_LEN + START_HEADER_LEN);
}
else
old_format = 0;
my_b_seek(file, pos);
return old_format;
}
static void dump_local_log_entries(const char* logname) static void dump_local_log_entries(const char* logname)
{ {
File fd = -1; File fd = -1;
IO_CACHE cache,*file= &cache; IO_CACHE cache,*file= &cache;
ulonglong rec_count = 0; ulonglong rec_count = 0;
char last_db[FN_REFLEN+1] = ""; char last_db[FN_REFLEN+1] = "";
bool old_format = 0;
if (logname && logname[0] != '-') if (logname && logname[0] != '-')
{ {
...@@ -349,12 +422,14 @@ static void dump_local_log_entries(const char* logname) ...@@ -349,12 +422,14 @@ static void dump_local_log_entries(const char* logname)
if (init_io_cache(file, fd, 0, READ_CACHE, (my_off_t) position, 0, if (init_io_cache(file, fd, 0, READ_CACHE, (my_off_t) position, 0,
MYF(MY_WME | MY_NABP))) MYF(MY_WME | MY_NABP)))
exit(1); exit(1);
old_format = check_header(file);
} }
else else
{ {
if (init_io_cache(file, fileno(result_file), 0, READ_CACHE, (my_off_t) 0, if (init_io_cache(file, fileno(result_file), 0, READ_CACHE, (my_off_t) 0,
0, MYF(MY_WME | MY_NABP | MY_DONT_CHECK_FILESIZE))) 0, MYF(MY_WME | MY_NABP | MY_DONT_CHECK_FILESIZE)))
exit(1); exit(1);
old_format = check_header(file);
if (position) if (position)
{ {
/* skip 'position' characters from stdout */ /* skip 'position' characters from stdout */
...@@ -385,7 +460,7 @@ static void dump_local_log_entries(const char* logname) ...@@ -385,7 +460,7 @@ static void dump_local_log_entries(const char* logname)
char llbuff[21]; char llbuff[21];
my_off_t old_off = my_b_tell(file); my_off_t old_off = my_b_tell(file);
Log_event* ev = Log_event::read_log_event(file); Log_event* ev = Log_event::read_log_event(file, old_format);
if (!ev) if (!ev)
{ {
if (file->error) if (file->error)
......
...@@ -55,7 +55,8 @@ mysysobjects1 = my_init.lo my_static.lo my_malloc.lo my_realloc.lo \ ...@@ -55,7 +55,8 @@ mysysobjects1 = my_init.lo my_static.lo my_malloc.lo my_realloc.lo \
mf_loadpath.lo my_pthread.lo my_thr_init.lo \ mf_loadpath.lo my_pthread.lo my_thr_init.lo \
thr_mutex.lo mulalloc.lo string.lo default.lo \ thr_mutex.lo mulalloc.lo string.lo default.lo \
my_compress.lo array.lo my_once.lo list.lo my_net.lo \ my_compress.lo array.lo my_once.lo list.lo my_net.lo \
charset.lo hash.lo mf_iocache.lo my_seek.lo \ charset.lo hash.lo mf_iocache.lo \
mf_iocache2.lo my_seek.lo \
my_pread.lo mf_cache.lo my_vsnprintf.lo md5.lo my_pread.lo mf_cache.lo my_vsnprintf.lo md5.lo
# Not needed in the minimum library # Not needed in the minimum library
......
...@@ -168,6 +168,9 @@ while test $# -gt 0; do ...@@ -168,6 +168,9 @@ while test $# -gt 0; do
USE_MANAGER=1 USE_MANAGER=1
USE_RUNNING_SERVER= USE_RUNNING_SERVER=
;; ;;
--start-and-exit)
START_AND_EXIT=1
;;
--skip-innobase) --skip-innobase)
EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT --skip-innobase" EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT --skip-innobase"
EXTRA_SLAVE_MYSQLD_OPT="$EXTRA_SLAVE_MYSQLD_OPT --skip-innobase" ;; EXTRA_SLAVE_MYSQLD_OPT="$EXTRA_SLAVE_MYSQLD_OPT --skip-innobase" ;;
...@@ -1091,6 +1094,10 @@ then ...@@ -1091,6 +1094,10 @@ then
mysql_loadstd mysql_loadstd
fi fi
if [ "x$START_AND_EXIT" = "x1" ] ; then
echo "Servers started, exiting"
exit
fi
$ECHO "Starting Tests" $ECHO "Starting Tests"
......
...@@ -16,7 +16,7 @@ n ...@@ -16,7 +16,7 @@ n
2002 2002
show slave hosts; show slave hosts;
Server_id Host Port Rpl_recovery_rank Master_id Server_id Host Port Rpl_recovery_rank Master_id
2 127.0.0.1 9307 2 1 2 127.0.0.1 $SLAVE_MYPORT 2 1
drop table t1; drop table t1;
slave stop; slave stop;
drop table if exists t2; drop table if exists t2;
......
...@@ -15,7 +15,7 @@ create table t1 (s text); ...@@ -15,7 +15,7 @@ create table t1 (s text);
insert into t1 values('Could not break slave'),('Tried hard'); insert into t1 values('Could not break slave'),('Tried hard');
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root 9999 60 master-bin.001 234 Yes 0 0 3 127.0.0.1 root $MASTER_MYPORT 60 master-bin.001 234 Yes 0 0 3
select * from t1; select * from t1;
s s
Could not break slave Could not break slave
...@@ -42,7 +42,7 @@ master-bin.003 ...@@ -42,7 +42,7 @@ master-bin.003
insert into t2 values (65); insert into t2 values (65);
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root 9999 60 master-bin.003 202 Yes 0 0 3 127.0.0.1 root $MASTER_MYPORT 60 master-bin.003 127 Yes 0 0 2
select * from t2; select * from t2;
m m
34 34
...@@ -60,12 +60,12 @@ master-bin.005 ...@@ -60,12 +60,12 @@ master-bin.005
master-bin.006 master-bin.006
show master status; show master status;
File Position Binlog_do_db Binlog_ignore_db File Position Binlog_do_db Binlog_ignore_db
master-bin.006 710 master-bin.006 382
slave stop; slave stop;
slave start; slave start;
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root 9999 60 master-bin.006 710 Yes 0 0 11 127.0.0.1 root $MASTER_MYPORT 60 master-bin.006 382 Yes 0 0 6
lock tables t3 read; lock tables t3 read;
select count(*) from t3 where n >= 4; select count(*) from t3 where n >= 4;
count(*) count(*)
......
...@@ -16,7 +16,7 @@ load data infile '../../std_data/words.dat' into table t1; ...@@ -16,7 +16,7 @@ load data infile '../../std_data/words.dat' into table t1;
drop table t1; drop table t1;
show binlog events; show binlog events;
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 master-bin.001 4 Start 1 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1 master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
...@@ -41,7 +41,7 @@ insert into t1 values (1); ...@@ -41,7 +41,7 @@ insert into t1 values (1);
drop table t1; drop table t1;
show binlog events; show binlog events;
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 master-bin.001 4 Start 1 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1 master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
...@@ -54,10 +54,9 @@ master-bin.001 627 Rotate 1 10 master-bin.002;pos=4 ...@@ -54,10 +54,9 @@ master-bin.001 627 Rotate 1 10 master-bin.002;pos=4
master-bin.001 668 Stop 1 11 master-bin.001 668 Stop 1 11
show binlog events in 'master-bin.002'; show binlog events in 'master-bin.002';
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 master-bin.002 4 Query 1 1 use test; create table t1 (n int)
master-bin.002 79 Query 1 2 use test; create table t1 (n int) master-bin.002 62 Query 1 2 use test; insert into t1 values (1)
master-bin.002 137 Query 1 3 use test; insert into t1 values (1) master-bin.002 122 Query 1 3 use test; drop table t1
master-bin.002 197 Query 1 4 use test; drop table t1
show master logs; show master logs;
Log_name Log_name
master-bin.001 master-bin.001
...@@ -69,7 +68,7 @@ slave-bin.001 ...@@ -69,7 +68,7 @@ slave-bin.001
slave-bin.002 slave-bin.002
show binlog events in 'slave-bin.001' from 4; show binlog events in 'slave-bin.001' from 4;
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
slave-bin.001 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 slave-bin.001 4 Start 2 1 Server ver: 4.0.1-alpha-debug-log, Binlog ver: 2
slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4 slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4
slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key) slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
slave-bin.001 225 Intvar 1 3 INSERT_ID=1 slave-bin.001 225 Intvar 1 3 INSERT_ID=1
...@@ -83,14 +82,13 @@ slave-bin.001 689 Rotate 1 4 slave-bin.002;pos=4; forced by master ...@@ -83,14 +82,13 @@ slave-bin.001 689 Rotate 1 4 slave-bin.002;pos=4; forced by master
slave-bin.001 729 Stop 2 5 slave-bin.001 729 Stop 2 5
show binlog events in 'slave-bin.002' from 4; show binlog events in 'slave-bin.002' from 4;
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 slave-bin.002 4 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4
slave-bin.002 79 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4 slave-bin.002 57 Query 1 1 use test; create table t1 (n int)
slave-bin.002 132 Query 1 2 use test; create table t1 (n int) slave-bin.002 115 Query 1 2 use test; insert into t1 values (1)
slave-bin.002 190 Query 1 3 use test; insert into t1 values (1) slave-bin.002 175 Query 1 3 use test; drop table t1
slave-bin.002 250 Query 1 4 use test; drop table t1
show slave status; show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root $MASTER_MYPORT 1 master-bin.002 245 Yes 0 0 4 127.0.0.1 root $MASTER_MYPORT 1 master-bin.002 170 Yes 0 0 3
show new master for slave with master_log_file='master-bin.001' and show new master for slave with master_log_file='master-bin.001' and
master_log_pos=4 and master_log_seq=1 and master_server_id=1; master_log_pos=4 and master_log_seq=1 and master_server_id=1;
Log_name Log_pos Log_name Log_pos
...@@ -106,8 +104,8 @@ slave-bin.001 439 ...@@ -106,8 +104,8 @@ slave-bin.001 439
show new master for slave with master_log_file='master-bin.002' and show new master for slave with master_log_file='master-bin.002' and
master_log_pos=4 and master_log_seq=1 and master_server_id=1; master_log_pos=4 and master_log_seq=1 and master_server_id=1;
Log_name Log_pos Log_name Log_pos
slave-bin.002 132 slave-bin.002 57
show new master for slave with master_log_file='master-bin.002' and show new master for slave with master_log_file='master-bin.002' and
master_log_pos=137 and master_log_seq=3 and master_server_id=1; master_log_pos=137 and master_log_seq=3 and master_server_id=1;
Log_name Log_pos Log_name Log_pos
slave-bin.002 250 slave-bin.002 223
...@@ -23,7 +23,6 @@ insert into t1 values('Could not break slave'),('Tried hard'); ...@@ -23,7 +23,6 @@ insert into t1 values('Could not break slave'),('Tried hard');
save_master_pos; save_master_pos;
connection slave; connection slave;
sync_with_master; sync_with_master;
--replace_result 9306 9999 3334 9999 3335 9999
show slave status; show slave status;
select * from t1; select * from t1;
connection master; connection master;
...@@ -70,7 +69,6 @@ insert into t2 values (65); ...@@ -70,7 +69,6 @@ insert into t2 values (65);
save_master_pos; save_master_pos;
connection slave; connection slave;
sync_with_master; sync_with_master;
--replace_result 9306 9999 3334 9999 3335 9999
show slave status; show slave status;
select * from t2; select * from t2;
connection master; connection master;
...@@ -92,7 +90,6 @@ connection slave; ...@@ -92,7 +90,6 @@ connection slave;
slave stop; slave stop;
slave start; slave start;
sync_with_master; sync_with_master;
--replace_result 9306 9999 3334 9999 3335 9999
show slave status; show slave status;
# because of concurrent insert, the table may not be up to date # because of concurrent insert, the table may not be up to date
# if we do not lock # if we do not lock
......
...@@ -166,7 +166,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, ...@@ -166,7 +166,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info->seek_not_done= test(file >= 0 && type != READ_FIFO && info->seek_not_done= test(file >= 0 && type != READ_FIFO &&
type != READ_NET); type != READ_NET);
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP); info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
info->rc_request_pos=info->rc_pos=info->buffer; info->rc_request_pos=info->rc_pos= info->write_pos = info->buffer;
info->write_pos = info->write_end = 0;
if (type == SEQ_READ_APPEND) if (type == SEQ_READ_APPEND)
{ {
info->append_read_pos = info->write_pos = info->append_buffer; info->append_read_pos = info->write_pos = info->append_buffer;
...@@ -308,6 +309,11 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, ...@@ -308,6 +309,11 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
{ {
info->append_read_pos = info->write_pos = info->append_buffer; info->append_read_pos = info->write_pos = info->append_buffer;
} }
if (!info->write_pos)
info->write_pos = info->buffer;
if (!info->write_end)
info->write_end = info->buffer+info->buffer_length-
(seek_offset & (IO_SIZE-1));
info->type=type; info->type=type;
info->error=0; info->error=0;
init_read_function(info,type); init_read_function(info,type);
......
...@@ -81,7 +81,8 @@ static int find_uniq_filename(char *name) ...@@ -81,7 +81,8 @@ static int find_uniq_filename(char *name)
MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1), MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1),
name(0), log_type(LOG_CLOSED),write_error(0), name(0), log_type(LOG_CLOSED),write_error(0),
inited(0), log_seq(1), file_id(1),no_rotate(0) inited(0), log_seq(1), file_id(1),no_rotate(0),
need_start_event(1)
{ {
/* /*
We don't want to intialize LOCK_Log here as the thread system may We don't want to intialize LOCK_Log here as the thread system may
...@@ -136,9 +137,11 @@ bool MYSQL_LOG::open_index( int options) ...@@ -136,9 +137,11 @@ bool MYSQL_LOG::open_index( int options)
MYF(MY_WME))) < 0); MYF(MY_WME))) < 0);
} }
void MYSQL_LOG::init(enum_log_type log_type_arg) void MYSQL_LOG::init(enum_log_type log_type_arg,
enum cache_type io_cache_type_arg)
{ {
log_type = log_type_arg; log_type = log_type_arg;
io_cache_type = io_cache_type_arg;
if (!inited) if (!inited)
{ {
inited=1; inited=1;
...@@ -184,7 +187,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -184,7 +187,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
if ((file=my_open(log_file_name,O_CREAT | O_APPEND | O_WRONLY | O_BINARY, if ((file=my_open(log_file_name,O_CREAT | O_APPEND | O_WRONLY | O_BINARY,
MYF(MY_WME | ME_WAITTANG))) < 0 || MYF(MY_WME | ME_WAITTANG))) < 0 ||
init_io_cache(&log_file, file, IO_SIZE, WRITE_CACHE, init_io_cache(&log_file, file, IO_SIZE, io_cache_type,
my_tell(file,MYF(MY_WME)), 0, MYF(MY_WME | MY_NABP))) my_tell(file,MYF(MY_WME)), 0, MYF(MY_WME | MY_NABP)))
goto err; goto err;
...@@ -220,6 +223,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -220,6 +223,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
} }
else if (log_type == LOG_BIN) else if (log_type == LOG_BIN)
{ {
bool error;
/* /*
Explanation of the boolean black magic: Explanation of the boolean black magic:
if we are supposed to write magic number try write if we are supposed to write magic number try write
...@@ -232,10 +236,13 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -232,10 +236,13 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
goto err; goto err;
log_seq = 1; log_seq = 1;
if (need_start_event)
{
Start_log_event s; Start_log_event s;
bool error;
s.set_log_seq(0, this); s.set_log_seq(0, this);
s.write(&log_file); s.write(&log_file);
need_start_event=0;
}
flush_io_cache(&log_file); flush_io_cache(&log_file);
pthread_mutex_lock(&LOCK_index); pthread_mutex_lock(&LOCK_index);
error=(my_write(index_file, (byte*) log_file_name, strlen(log_file_name), error=(my_write(index_file, (byte*) log_file_name, strlen(log_file_name),
...@@ -715,7 +722,8 @@ bool MYSQL_LOG::write(Log_event* event_info) ...@@ -715,7 +722,8 @@ bool MYSQL_LOG::write(Log_event* event_info)
file == &log_file && flush_io_cache(file)) file == &log_file && flush_io_cache(file))
goto err; goto err;
error=0; error=0;
should_rotate = (file == &log_file && my_b_tell(file) >= max_binlog_size); should_rotate = (file == &log_file &&
(uint)my_b_tell(file) >= max_binlog_size);
err: err:
if (error) if (error)
{ {
......
...@@ -149,12 +149,21 @@ static void cleanup_load_tmpdir() ...@@ -149,12 +149,21 @@ static void cleanup_load_tmpdir()
#endif #endif
Log_event::Log_event(const char* buf):cached_event_len(0),temp_buf(0) Log_event::Log_event(const char* buf, bool old_format):
cached_event_len(0),temp_buf(0)
{ {
when = uint4korr(buf); when = uint4korr(buf);
server_id = uint4korr(buf + SERVER_ID_OFFSET); server_id = uint4korr(buf + SERVER_ID_OFFSET);
if (old_format)
{
log_seq=0;
flags=0;
}
else
{
log_seq = uint4korr(buf + LOG_SEQ_OFFSET); log_seq = uint4korr(buf + LOG_SEQ_OFFSET);
flags = uint2korr(buf + FLAGS_OFFSET); flags = uint2korr(buf + FLAGS_OFFSET);
}
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
thd = 0; thd = 0;
#endif #endif
...@@ -441,17 +450,24 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, ...@@ -441,17 +450,24 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
#define UNLOCK_MUTEX #define UNLOCK_MUTEX
#endif #endif
#ifndef MYSQL_CLIENT
#define LOCK_MUTEX if(log_lock) pthread_mutex_lock(log_lock);
#else
#define LOCK_MUTEX
#endif
// allocates memory - the caller is responsible for clean-up // allocates memory - the caller is responsible for clean-up
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock) Log_event* Log_event::read_log_event(IO_CACHE* file,
pthread_mutex_t* log_lock,
bool old_format)
#else #else
Log_event* Log_event::read_log_event(IO_CACHE* file) Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
#endif #endif
{ {
char head[LOG_EVENT_HEADER_LEN]; char head[LOG_EVENT_HEADER_LEN];
#ifndef MYSQL_CLIENT LOCK_MUTEX;
if(log_lock) pthread_mutex_lock(log_lock);
#endif
if (my_b_read(file, (byte *) head, sizeof(head))) if (my_b_read(file, (byte *) head, sizeof(head)))
{ {
UNLOCK_MUTEX; UNLOCK_MUTEX;
...@@ -489,7 +505,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file) ...@@ -489,7 +505,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file)
error = "read error"; error = "read error";
goto err; goto err;
} }
if ((res = read_log_event(buf, data_len, &error))) if ((res = read_log_event(buf, data_len, &error, old_format)))
res->register_temp_buf(buf); res->register_temp_buf(buf);
err: err:
UNLOCK_MUTEX; UNLOCK_MUTEX;
...@@ -502,7 +518,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file) ...@@ -502,7 +518,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file)
} }
Log_event* Log_event::read_log_event(const char* buf, int event_len, Log_event* Log_event::read_log_event(const char* buf, int event_len,
const char **error) const char **error, bool old_format)
{ {
if (event_len < EVENT_LEN_OFFSET || if (event_len < EVENT_LEN_OFFSET ||
(uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET)) (uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET))
...@@ -513,14 +529,14 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, ...@@ -513,14 +529,14 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
switch(buf[EVENT_TYPE_OFFSET]) switch(buf[EVENT_TYPE_OFFSET])
{ {
case QUERY_EVENT: case QUERY_EVENT:
ev = new Query_log_event(buf, event_len); ev = new Query_log_event(buf, event_len, old_format);
break; break;
case LOAD_EVENT: case LOAD_EVENT:
case NEW_LOAD_EVENT: case NEW_LOAD_EVENT:
ev = new Load_log_event(buf, event_len); ev = new Load_log_event(buf, event_len, old_format);
break; break;
case ROTATE_EVENT: case ROTATE_EVENT:
ev = new Rotate_log_event(buf, event_len); ev = new Rotate_log_event(buf, event_len, old_format);
break; break;
case SLAVE_EVENT: case SLAVE_EVENT:
ev = new Slave_log_event(buf, event_len); ev = new Slave_log_event(buf, event_len);
...@@ -538,13 +554,13 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, ...@@ -538,13 +554,13 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev = new Execute_load_log_event(buf, event_len); ev = new Execute_load_log_event(buf, event_len);
break; break;
case START_EVENT: case START_EVENT:
ev = new Start_log_event(buf); ev = new Start_log_event(buf, old_format);
break; break;
case STOP_EVENT: case STOP_EVENT:
ev = new Stop_log_event(buf); ev = new Stop_log_event(buf, old_format);
break; break;
case INTVAR_EVENT: case INTVAR_EVENT:
ev = new Intvar_log_event(buf); ev = new Intvar_log_event(buf, old_format);
break; break;
default: default:
break; break;
...@@ -634,7 +650,8 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db) ...@@ -634,7 +650,8 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
#endif /* #ifdef MYSQL_CLIENT */ #endif /* #ifdef MYSQL_CLIENT */
Start_log_event::Start_log_event(const char* buf) :Log_event(buf) Start_log_event::Start_log_event(const char* buf,
bool old_format) :Log_event(buf, old_format)
{ {
binlog_version = uint2korr(buf + LOG_EVENT_HEADER_LEN + binlog_version = uint2korr(buf + LOG_EVENT_HEADER_LEN +
ST_BINLOG_VER_OFFSET); ST_BINLOG_VER_OFFSET);
...@@ -652,8 +669,9 @@ int Start_log_event::write_data(IO_CACHE* file) ...@@ -652,8 +669,9 @@ int Start_log_event::write_data(IO_CACHE* file)
return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0); return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
} }
Rotate_log_event::Rotate_log_event(const char* buf, int event_len): Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
Log_event(buf),new_log_ident(NULL),alloced(0) bool old_format):
Log_event(buf, old_format),new_log_ident(NULL),alloced(0)
{ {
// the caller will ensure that event_len is what we have at // the caller will ensure that event_len is what we have at
// EVENT_LEN_OFFSET // EVENT_LEN_OFFSET
...@@ -695,8 +713,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, ...@@ -695,8 +713,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
} }
#endif #endif
Query_log_event::Query_log_event(const char* buf, int event_len): Query_log_event::Query_log_event(const char* buf, int event_len,
Log_event(buf),data_buf(0), query(NULL), db(NULL) bool old_format):
Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL)
{ {
if ((uint)event_len < QUERY_EVENT_OVERHEAD) if ((uint)event_len < QUERY_EVENT_OVERHEAD)
return; return;
...@@ -766,7 +785,8 @@ int Query_log_event::write_data(IO_CACHE* file) ...@@ -766,7 +785,8 @@ int Query_log_event::write_data(IO_CACHE* file)
my_b_write(file, (byte*) query, q_len)) ? -1 : 0; my_b_write(file, (byte*) query, q_len)) ? -1 : 0;
} }
Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf) Intvar_log_event::Intvar_log_event(const char* buf, bool old_format):
Log_event(buf, old_format)
{ {
buf += LOG_EVENT_HEADER_LEN; buf += LOG_EVENT_HEADER_LEN;
type = buf[I_TYPE_OFFSET]; type = buf[I_TYPE_OFFSET];
...@@ -1003,8 +1023,9 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, ...@@ -1003,8 +1023,9 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
// the caller must do buf[event_len] = 0 before he starts using the // the caller must do buf[event_len] = 0 before he starts using the
// constructed event // constructed event
Load_log_event::Load_log_event(const char* buf, int event_len): Load_log_event::Load_log_event(const char* buf, int event_len,
Log_event(buf),num_fields(0),fields(0), bool old_format):
Log_event(buf, old_format),num_fields(0),fields(0),
field_lens(0),field_block_len(0), field_lens(0),field_block_len(0),
table_name(0),db(0),fname(0) table_name(0),db(0),fname(0)
{ {
...@@ -1237,7 +1258,7 @@ void Slave_log_event::init_from_mem_pool(int data_size) ...@@ -1237,7 +1258,7 @@ void Slave_log_event::init_from_mem_pool(int data_size)
} }
Slave_log_event::Slave_log_event(const char* buf, int event_len): Slave_log_event::Slave_log_event(const char* buf, int event_len):
Log_event(buf),mem_pool(0),master_host(0) Log_event(buf,0),mem_pool(0),master_host(0)
{ {
event_len -= LOG_EVENT_HEADER_LEN; event_len -= LOG_EVENT_HEADER_LEN;
if(event_len < 0) if(event_len < 0)
...@@ -1291,7 +1312,7 @@ int Create_file_log_event::write_base(IO_CACHE* file) ...@@ -1291,7 +1312,7 @@ int Create_file_log_event::write_base(IO_CACHE* file)
} }
Create_file_log_event::Create_file_log_event(const char* buf, int len): Create_file_log_event::Create_file_log_event(const char* buf, int len):
Load_log_event(buf,0),fake_base(0),block(0) Load_log_event(buf,0,0),fake_base(0),block(0)
{ {
int block_offset; int block_offset;
if (copy_log_event(buf,len)) if (copy_log_event(buf,len))
...@@ -1347,7 +1368,7 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg, ...@@ -1347,7 +1368,7 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg,
#endif #endif
Append_block_log_event::Append_block_log_event(const char* buf, int len): Append_block_log_event::Append_block_log_event(const char* buf, int len):
Log_event(buf),block(0) Log_event(buf, 0),block(0)
{ {
if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD) if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
return; return;
...@@ -1399,7 +1420,7 @@ Delete_file_log_event::Delete_file_log_event(THD* thd_arg): ...@@ -1399,7 +1420,7 @@ Delete_file_log_event::Delete_file_log_event(THD* thd_arg):
#endif #endif
Delete_file_log_event::Delete_file_log_event(const char* buf, int len): Delete_file_log_event::Delete_file_log_event(const char* buf, int len):
Log_event(buf),file_id(0) Log_event(buf, 0),file_id(0)
{ {
if((uint)len < DELETE_FILE_EVENT_OVERHEAD) if((uint)len < DELETE_FILE_EVENT_OVERHEAD)
return; return;
...@@ -1446,7 +1467,7 @@ Execute_load_log_event::Execute_load_log_event(THD* thd_arg): ...@@ -1446,7 +1467,7 @@ Execute_load_log_event::Execute_load_log_event(THD* thd_arg):
#endif #endif
Execute_load_log_event::Execute_load_log_event(const char* buf,int len): Execute_load_log_event::Execute_load_log_event(const char* buf,int len):
Log_event(buf),file_id(0) Log_event(buf, 0),file_id(0)
{ {
if((uint)len < EXEC_LOAD_EVENT_OVERHEAD) if((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
return; return;
...@@ -1657,15 +1678,11 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) ...@@ -1657,15 +1678,11 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
int Start_log_event::exec_event(struct st_master_info* mi) int Start_log_event::exec_event(struct st_master_info* mi)
{ {
#ifdef TO_BE_DELETED if (!mi->old_format)
/* {
We can't close temporary files or cleanup the tmpdir here, becasue
someone may have just rotated the logs on the master.
We should only do this cleanup when we know the master restarted.
*/
close_temporary_tables(thd); close_temporary_tables(thd);
cleanup_load_tmpdir(); cleanup_load_tmpdir();
#endif }
return Log_event::exec_event(mi); return Log_event::exec_event(mi);
} }
...@@ -1866,7 +1883,9 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi) ...@@ -1866,7 +1883,9 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
slave_print_error(my_errno, "Could not open file '%s'", fname); slave_print_error(my_errno, "Could not open file '%s'", fname);
goto err; goto err;
} }
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,0)) if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
(pthread_mutex_t*)0,
(bool)0))
|| lev->get_type_code() != NEW_LOAD_EVENT) || lev->get_type_code() != NEW_LOAD_EVENT)
{ {
slave_print_error(0, "File '%s' appears corrupted", fname); slave_print_error(0, "File '%s' appears corrupted", fname);
......
...@@ -242,7 +242,7 @@ class Log_event ...@@ -242,7 +242,7 @@ class Log_event
virtual Log_event_type get_type_code() = 0; virtual Log_event_type get_type_code() = 0;
virtual bool is_valid() = 0; virtual bool is_valid() = 0;
virtual bool get_cache_stmt() { return 0; } virtual bool get_cache_stmt() { return 0; }
Log_event(const char* buf); Log_event(const char* buf, bool old_format);
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Log_event(THD* thd_arg, uint16 flags_arg = 0); Log_event(THD* thd_arg, uint16 flags_arg = 0);
#endif #endif
...@@ -268,12 +268,14 @@ class Log_event ...@@ -268,12 +268,14 @@ class Log_event
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
// if mutex is 0, the read will proceed without mutex // if mutex is 0, the read will proceed without mutex
static Log_event* read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock); static Log_event* read_log_event(IO_CACHE* file,
pthread_mutex_t* log_lock,
bool old_format);
#else // avoid having to link mysqlbinlog against libpthread #else // avoid having to link mysqlbinlog against libpthread
static Log_event* read_log_event(IO_CACHE* file); static Log_event* read_log_event(IO_CACHE* file, bool old_format);
#endif #endif
static Log_event* read_log_event(const char* buf, int event_len, static Log_event* read_log_event(const char* buf, int event_len,
const char **error); const char **error, bool old_format);
const char* get_type_str(); const char* get_type_str();
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
...@@ -317,7 +319,7 @@ class Query_log_event: public Log_event ...@@ -317,7 +319,7 @@ class Query_log_event: public Log_event
bool get_cache_stmt() { return cache_stmt; } bool get_cache_stmt() { return cache_stmt; }
#endif #endif
Query_log_event(const char* buf, int event_len); Query_log_event(const char* buf, int event_len, bool old_format);
~Query_log_event() ~Query_log_event()
{ {
if (data_buf) if (data_buf)
...@@ -411,7 +413,7 @@ class Load_log_event: public Log_event ...@@ -411,7 +413,7 @@ class Load_log_event: public Log_event
int exec_event(NET* net, struct st_master_info* mi); int exec_event(NET* net, struct st_master_info* mi);
#endif #endif
Load_log_event(const char* buf, int event_len); Load_log_event(const char* buf, int event_len, bool old_format);
~Load_log_event() ~Load_log_event()
{ {
} }
...@@ -451,7 +453,7 @@ class Start_log_event: public Log_event ...@@ -451,7 +453,7 @@ class Start_log_event: public Log_event
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
} }
#endif #endif
Start_log_event(const char* buf); Start_log_event(const char* buf, bool old_format);
~Start_log_event() {} ~Start_log_event() {}
Log_event_type get_type_code() { return START_EVENT;} Log_event_type get_type_code() { return START_EVENT;}
int write_data(IO_CACHE* file); int write_data(IO_CACHE* file);
...@@ -479,7 +481,7 @@ class Intvar_log_event: public Log_event ...@@ -479,7 +481,7 @@ class Intvar_log_event: public Log_event
:Log_event(thd_arg),val(val_arg),type(type_arg) :Log_event(thd_arg),val(val_arg),type(type_arg)
{} {}
#endif #endif
Intvar_log_event(const char* buf); Intvar_log_event(const char* buf, bool old_format);
~Intvar_log_event() {} ~Intvar_log_event() {}
Log_event_type get_type_code() { return INTVAR_EVENT;} Log_event_type get_type_code() { return INTVAR_EVENT;}
const char* get_var_type_name(); const char* get_var_type_name();
...@@ -503,7 +505,8 @@ class Stop_log_event: public Log_event ...@@ -503,7 +505,8 @@ class Stop_log_event: public Log_event
Stop_log_event() :Log_event((THD*)0) Stop_log_event() :Log_event((THD*)0)
{} {}
#endif #endif
Stop_log_event(const char* buf):Log_event(buf) Stop_log_event(const char* buf, bool old_format):Log_event(buf,
old_format)
{ {
} }
~Stop_log_event() {} ~Stop_log_event() {}
...@@ -534,7 +537,7 @@ class Rotate_log_event: public Log_event ...@@ -534,7 +537,7 @@ class Rotate_log_event: public Log_event
alloced(0) alloced(0)
{} {}
#endif #endif
Rotate_log_event(const char* buf, int event_len); Rotate_log_event(const char* buf, int event_len, bool old_format);
~Rotate_log_event() ~Rotate_log_event()
{ {
if (alloced) if (alloced)
...@@ -686,7 +689,6 @@ class Execute_load_log_event: public Log_event ...@@ -686,7 +689,6 @@ class Execute_load_log_event: public Log_event
#endif #endif
}; };
#endif #endif
......
This diff is collapsed.
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
#define REPL_FAILSAFE_H #define REPL_FAILSAFE_H
#include "mysql.h" #include "mysql.h"
#include "my_sys.h"
#include "slave.h"
typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE, typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE,
RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER, RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER,
...@@ -19,4 +21,18 @@ pthread_handler_decl(handle_failsafe_rpl,arg); ...@@ -19,4 +21,18 @@ pthread_handler_decl(handle_failsafe_rpl,arg);
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status); void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status);
int find_recovery_captain(THD* thd, MYSQL* mysql); int find_recovery_captain(THD* thd, MYSQL* mysql);
int update_slave_list(MYSQL* mysql); int update_slave_list(MYSQL* mysql);
extern HASH slave_list;
int load_master_data(THD* thd);
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi);
int show_new_master(THD* thd);
int show_slave_hosts(THD* thd);
int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg);
void init_slave_list();
void end_slave_list();
int register_slave(THD* thd, uchar* packet, uint packet_length);
void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
#endif #endif
...@@ -61,6 +61,8 @@ static int safe_sleep(THD* thd, int sec); ...@@ -61,6 +61,8 @@ static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db, static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name); const char* table_name);
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi);
char* rewrite_db(char* db); char* rewrite_db(char* db);
static void free_table_ent(TABLE_RULE_ENT* e) static void free_table_ent(TABLE_RULE_ENT* e)
...@@ -333,6 +335,54 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val) ...@@ -333,6 +335,54 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
return 1; return 1;
} }
static int check_master_version(MYSQL* mysql, MASTER_INFO* mi)
{
MYSQL_RES* res;
MYSQL_ROW row;
const char* version;
const char* errmsg = 0;
if (mc_mysql_query(mysql, "SELECT VERSION()", 0)
|| !(res = mc_mysql_store_result(mysql)))
{
sql_print_error("Error checking master version: %s",
mc_mysql_error(mysql));
return 1;
}
if (!(row = mc_mysql_fetch_row(res)))
{
errmsg = "Master returned no rows for SELECT VERSION()";
goto err;
}
if (!(version = row[0]))
{
errmsg = "Master reported NULL for the version";
goto err;
}
switch (*version)
{
case '3':
mi->old_format = 1;
break;
case '4':
mi->old_format = 0;
break;
default:
errmsg = "Master reported unrecognized MySQL version";
goto err;
}
err:
if (res)
mc_mysql_free_result(res);
if (errmsg)
{
sql_print_error(errmsg);
return 1;
}
return 0;
}
static int create_table_from_dump(THD* thd, NET* net, const char* db, static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name) const char* table_name)
...@@ -580,7 +630,7 @@ int init_master_info(MASTER_INFO* mi) ...@@ -580,7 +630,7 @@ int init_master_info(MASTER_INFO* mi)
mi->inited = 1; mi->inited = 1;
// now change the cache from READ to WRITE - must do this // now change the cache from READ to WRITE - must do this
// before flush_master_info // before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE, 0L,0,1); reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
error=test(flush_master_info(mi)); error=test(flush_master_info(mi));
pthread_mutex_unlock(&mi->lock); pthread_mutex_unlock(&mi->lock);
return error; return error;
...@@ -943,12 +993,14 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -943,12 +993,14 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{ {
const char *error_msg; const char *error_msg;
Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1, Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
event_len, &error_msg); event_len, &error_msg,
mi->old_format);
if (ev) if (ev)
{ {
int type_code = ev->get_type_code(); int type_code = ev->get_type_code();
int exec_res; int exec_res;
if (ev->server_id == ::server_id || slave_skip_counter) if (ev->server_id == ::server_id ||
(slave_skip_counter && ev->get_type_code() != ROTATE_EVENT))
{ {
if(type_code == LOAD_EVENT) if(type_code == LOAD_EVENT)
skip_load_data_infile(net); skip_load_data_infile(net);
...@@ -1070,9 +1122,17 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) ...@@ -1070,9 +1122,17 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
// register ourselves with the master // register ourselves with the master
// if fails, this is not fatal - we just print the error message and go // if fails, this is not fatal - we just print the error message and go
// on with life // on with life
thd->proc_info = "Checking master version";
if (check_master_version(mysql, &glob_mi))
{
goto err;
}
if (!glob_mi.old_format)
{
thd->proc_info = "Registering slave on master"; thd->proc_info = "Registering slave on master";
register_slave_on_master(mysql); if (register_slave_on_master(mysql) || update_slave_list(mysql))
update_slave_list(mysql); goto err;
}
while (!slave_killed(thd)) while (!slave_killed(thd))
{ {
......
...@@ -23,8 +23,10 @@ typedef struct st_master_info ...@@ -23,8 +23,10 @@ typedef struct st_master_info
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_cond_t cond; pthread_cond_t cond;
bool inited; bool inited;
bool old_format; /* master binlog is in 3.23 format */
st_master_info():pending(0),fd(-1),last_log_seq(0),inited(0) st_master_info():pending(0),fd(-1),last_log_seq(0),inited(0),
old_format(0)
{ {
host[0] = 0; user[0] = 0; password[0] = 0; host[0] = 0; user[0] = 0; password[0] = 0;
pthread_mutex_init(&lock, MY_MUTEX_INIT_FAST); pthread_mutex_init(&lock, MY_MUTEX_INIT_FAST);
......
...@@ -72,15 +72,18 @@ class MYSQL_LOG { ...@@ -72,15 +72,18 @@ class MYSQL_LOG {
// we should not try to rotate it or write any rotation events // we should not try to rotate it or write any rotation events
// the user should use FLUSH MASTER instead of FLUSH LOGS for // the user should use FLUSH MASTER instead of FLUSH LOGS for
// purging // purging
enum cache_type io_cache_type;
bool need_start_event;
friend class Log_event; friend class Log_event;
public: public:
MYSQL_LOG(); MYSQL_LOG();
~MYSQL_LOG(); ~MYSQL_LOG();
pthread_mutex_t* get_log_lock() { return &LOCK_log; } pthread_mutex_t* get_log_lock() { return &LOCK_log; }
void set_need_start_event() { need_start_event = 1; }
void set_index_file_name(const char* index_file_name = 0); void set_index_file_name(const char* index_file_name = 0);
void init(enum_log_type log_type_arg); void init(enum_log_type log_type_arg,
enum cache_type io_cache_type_arg = WRITE_CACHE);
void open(const char *log_name,enum_log_type log_type, void open(const char *log_name,enum_log_type log_type,
const char *new_name=0); const char *new_name=0);
void new_file(bool inside_mutex = 0); void new_file(bool inside_mutex = 0);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "mysql_priv.h" #include "mysql_priv.h"
#include "sql_acl.h" #include "sql_acl.h"
#include "sql_repl.h" #include "sql_repl.h"
#include "repl_failsafe.h"
#include <m_ctype.h> #include <m_ctype.h>
#include <thr_alarm.h> #include <thr_alarm.h>
#include <myisam.h> #include <myisam.h>
......
This diff is collapsed.
...@@ -15,7 +15,6 @@ typedef struct st_slave_info ...@@ -15,7 +15,6 @@ typedef struct st_slave_info
} SLAVE_INFO; } SLAVE_INFO;
extern bool opt_show_slave_auth_info, opt_old_rpl_compat; extern bool opt_show_slave_auth_info, opt_old_rpl_compat;
extern HASH slave_list;
extern char* master_host; extern char* master_host;
extern my_string opt_bin_logname, master_info_file; extern my_string opt_bin_logname, master_info_file;
extern uint32 server_id; extern uint32 server_id;
...@@ -27,26 +26,24 @@ extern int max_binlog_dump_events; ...@@ -27,26 +26,24 @@ extern int max_binlog_dump_events;
extern bool opt_sporadic_binlog_dump_fail; extern bool opt_sporadic_binlog_dump_fail;
#endif #endif
#ifdef SIGNAL_WITH_VIO_CLOSE
#define KICK_SLAVE { slave_thd->close_active_vio(); \
thr_alarm_kill(slave_real_id); }
#else
#define KICK_SLAVE thr_alarm_kill(slave_real_id);
#endif
File open_binlog(IO_CACHE *log, const char *log_file_name, File open_binlog(IO_CACHE *log, const char *log_file_name,
const char **errmsg); const char **errmsg);
int start_slave(THD* thd = 0, bool net_report = 1); int start_slave(THD* thd = 0, bool net_report = 1);
int stop_slave(THD* thd = 0, bool net_report = 1); int stop_slave(THD* thd = 0, bool net_report = 1);
int load_master_data(THD* thd);
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi);
int change_master(THD* thd); int change_master(THD* thd);
int show_new_master(THD* thd);
int show_slave_hosts(THD* thd);
int show_binlog_events(THD* thd); int show_binlog_events(THD* thd);
int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg);
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
const char* log_file_name2, ulonglong log_pos2); const char* log_file_name2, ulonglong log_pos2);
void reset_slave(); void reset_slave();
void reset_master(); void reset_master();
void init_slave_list();
void end_slave_list();
int register_slave(THD* thd, uchar* packet, uint packet_length);
void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
int purge_master_logs(THD* thd, const char* to_log); int purge_master_logs(THD* thd, const char* to_log);
bool log_in_use(const char* log_name); bool log_in_use(const char* log_name);
void adjust_linfo_offsets(my_off_t purge_offset); void adjust_linfo_offsets(my_off_t purge_offset);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment