Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
f89418b4
Commit
f89418b4
authored
Nov 10, 2001
by
sasha@mysql.sashanet.com
Browse files
Options
Browse Files
Download
Plain Diff
Merge work:/home/bk/mysql-4.0
into mysql.sashanet.com:/home/sasha/src/bk/mysql-4.0
parents
2f51cf20
beaf95b0
Changes
19
Hide whitespace changes
Inline
Side-by-side
Showing
19 changed files
with
919 additions
and
722 deletions
+919
-722
client/mysqlbinlog.cc
client/mysqlbinlog.cc
+78
-3
libmysql/Makefile.shared
libmysql/Makefile.shared
+2
-1
mysql-test/mysql-test-run.sh
mysql-test/mysql-test-run.sh
+7
-0
mysql-test/r/rpl000002.result
mysql-test/r/rpl000002.result
+1
-1
mysql-test/r/rpl000016.result
mysql-test/r/rpl000016.result
+4
-4
mysql-test/r/rpl_log.result
mysql-test/r/rpl_log.result
+13
-15
mysql-test/t/rpl000016.test
mysql-test/t/rpl000016.test
+0
-3
mysys/mf_iocache.c
mysys/mf_iocache.c
+7
-1
sql/log.cc
sql/log.cc
+16
-8
sql/log_event.cc
sql/log_event.cc
+58
-39
sql/log_event.h
sql/log_event.h
+13
-11
sql/repl_failsafe.cc
sql/repl_failsafe.cc
+611
-0
sql/repl_failsafe.h
sql/repl_failsafe.h
+16
-0
sql/slave.cc
sql/slave.cc
+66
-6
sql/slave.h
sql/slave.h
+3
-1
sql/sql_class.h
sql/sql_class.h
+5
-2
sql/sql_parse.cc
sql/sql_parse.cc
+1
-0
sql/sql_repl.cc
sql/sql_repl.cc
+11
-617
sql/sql_repl.h
sql/sql_repl.h
+7
-10
No files found.
client/mysqlbinlog.cc
View file @
f89418b4
...
...
@@ -21,6 +21,8 @@
#include <time.h>
#include "log_event.h"
#define PROBE_HEADER_LEN (4+EVENT_LEN_OFFSET)
#define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_LOCAL_FILES)
char
server_version
[
SERVER_VERSION_LENGTH
];
...
...
@@ -288,6 +290,52 @@ static void dump_remote_table(NET* net, const char* db, const char* table)
}
}
static
int
check_master_version
(
MYSQL
*
mysql
)
{
MYSQL_RES
*
res
=
0
;
MYSQL_ROW
row
;
const
char
*
version
;
int
old_format
=
0
;
if
(
mysql_query
(
mysql
,
"SELECT VERSION()"
)
||
!
(
res
=
mysql_store_result
(
mysql
)))
{
mysql_close
(
mysql
);
die
(
"Error checking master version: %s"
,
mysql_error
(
mysql
));
}
if
(
!
(
row
=
mysql_fetch_row
(
res
)))
{
mysql_free_result
(
res
);
mysql_close
(
mysql
);
die
(
"Master returned no rows for SELECT VERSION()"
);
return
1
;
}
if
(
!
(
version
=
row
[
0
]))
{
mysql_free_result
(
res
);
mysql_close
(
mysql
);
die
(
"Master reported NULL for the version"
);
}
switch
(
*
version
)
{
case
'3'
:
old_format
=
1
;
break
;
case
'4'
:
old_format
=
0
;
break
;
default:
sql_print_error
(
"Master reported unrecognized MySQL version '%s'"
,
version
);
mysql_free_result
(
res
);
mysql_close
(
mysql
);
return
1
;
}
mysql_free_result
(
res
);
return
old_format
;
}
static
void
dump_remote_log_entries
(
const
char
*
logname
)
{
...
...
@@ -295,6 +343,9 @@ static void dump_remote_log_entries(const char* logname)
char
last_db
[
FN_REFLEN
+
1
]
=
""
;
uint
len
;
NET
*
net
=
&
mysql
->
net
;
int
old_format
;
old_format
=
check_master_version
(
mysql
);
if
(
!
position
)
position
=
4
;
// protect the innocent from spam
if
(
position
<
4
)
{
...
...
@@ -307,7 +358,7 @@ static void dump_remote_log_entries(const char* logname)
len
=
(
uint
)
strlen
(
logname
);
int4store
(
buf
+
6
,
0
);
memcpy
(
buf
+
10
,
logname
,
len
);
if
(
simple_command
(
mysql
,
COM_BINLOG_DUMP
,
buf
,
len
+
10
,
1
))
if
(
simple_command
(
mysql
,
COM_BINLOG_DUMP
,
buf
,
len
+
10
,
1
))
die
(
"Error sending the log dump command"
);
for
(;;)
...
...
@@ -322,7 +373,7 @@ static void dump_remote_log_entries(const char* logname)
len
,
net
->
read_pos
[
5
]));
Log_event
*
ev
=
Log_event
::
read_log_event
(
(
const
char
*
)
net
->
read_pos
+
1
,
len
-
1
,
&
error
);
len
-
1
,
&
error
,
old_format
);
if
(
ev
)
{
ev
->
print
(
result_file
,
short_form
,
last_db
);
...
...
@@ -335,12 +386,34 @@ static void dump_remote_log_entries(const char* logname)
}
}
static
int
check_header
(
IO_CACHE
*
file
)
{
char
buf
[
PROBE_HEADER_LEN
];
int
old_format
;
my_off_t
pos
=
my_b_tell
(
file
);
my_b_seek
(
file
,
(
my_off_t
)
0
);
if
(
my_b_read
(
file
,
buf
,
sizeof
(
buf
)))
die
(
"Failed reading header"
);
if
(
buf
[
EVENT_TYPE_OFFSET
+
4
]
==
START_EVENT
)
{
uint
event_len
;
event_len
=
uint4korr
(
buf
+
EVENT_LEN_OFFSET
+
4
);
old_format
=
(
event_len
<
LOG_EVENT_HEADER_LEN
+
START_HEADER_LEN
);
}
else
old_format
=
0
;
my_b_seek
(
file
,
pos
);
return
old_format
;
}
static
void
dump_local_log_entries
(
const
char
*
logname
)
{
File
fd
=
-
1
;
IO_CACHE
cache
,
*
file
=
&
cache
;
ulonglong
rec_count
=
0
;
char
last_db
[
FN_REFLEN
+
1
]
=
""
;
bool
old_format
=
0
;
if
(
logname
&&
logname
[
0
]
!=
'-'
)
{
...
...
@@ -349,12 +422,14 @@ static void dump_local_log_entries(const char* logname)
if
(
init_io_cache
(
file
,
fd
,
0
,
READ_CACHE
,
(
my_off_t
)
position
,
0
,
MYF
(
MY_WME
|
MY_NABP
)))
exit
(
1
);
old_format
=
check_header
(
file
);
}
else
{
if
(
init_io_cache
(
file
,
fileno
(
result_file
),
0
,
READ_CACHE
,
(
my_off_t
)
0
,
0
,
MYF
(
MY_WME
|
MY_NABP
|
MY_DONT_CHECK_FILESIZE
)))
exit
(
1
);
old_format
=
check_header
(
file
);
if
(
position
)
{
/* skip 'position' characters from stdout */
...
...
@@ -385,7 +460,7 @@ static void dump_local_log_entries(const char* logname)
char
llbuff
[
21
];
my_off_t
old_off
=
my_b_tell
(
file
);
Log_event
*
ev
=
Log_event
::
read_log_event
(
file
);
Log_event
*
ev
=
Log_event
::
read_log_event
(
file
,
old_format
);
if
(
!
ev
)
{
if
(
file
->
error
)
...
...
libmysql/Makefile.shared
View file @
f89418b4
...
...
@@ -55,7 +55,8 @@ mysysobjects1 = my_init.lo my_static.lo my_malloc.lo my_realloc.lo \
mf_loadpath.lo my_pthread.lo my_thr_init.lo
\
thr_mutex.lo mulalloc.lo string.lo default.lo
\
my_compress.lo array.lo my_once.lo list.lo my_net.lo
\
charset.lo hash.lo mf_iocache.lo my_seek.lo
\
charset.lo hash.lo mf_iocache.lo
\
mf_iocache2.lo my_seek.lo
\
my_pread.lo mf_cache.lo my_vsnprintf.lo md5.lo
# Not needed in the minimum library
...
...
mysql-test/mysql-test-run.sh
View file @
f89418b4
...
...
@@ -168,6 +168,9 @@ while test $# -gt 0; do
USE_MANAGER
=
1
USE_RUNNING_SERVER
=
;;
--start-and-exit
)
START_AND_EXIT
=
1
;;
--skip-innobase
)
EXTRA_MASTER_MYSQLD_OPT
=
"
$EXTRA_MASTER_MYSQLD_OPT
--skip-innobase"
EXTRA_SLAVE_MYSQLD_OPT
=
"
$EXTRA_SLAVE_MYSQLD_OPT
--skip-innobase"
;;
...
...
@@ -1091,6 +1094,10 @@ then
mysql_loadstd
fi
if
[
"x
$START_AND_EXIT
"
=
"x1"
]
;
then
echo
"Servers started, exiting"
exit
fi
$ECHO
"Starting Tests"
...
...
mysql-test/r/rpl000002.result
View file @
f89418b4
...
...
@@ -16,7 +16,7 @@ n
2002
show slave hosts;
Server_id Host Port Rpl_recovery_rank Master_id
2 127.0.0.1
9307
2 1
2 127.0.0.1
$SLAVE_MYPORT
2 1
drop table t1;
slave stop;
drop table if exists t2;
...
...
mysql-test/r/rpl000016.result
View file @
f89418b4
...
...
@@ -15,7 +15,7 @@ create table t1 (s text);
insert into t1 values('Could not break slave'),('Tried hard');
show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root
9999
60 master-bin.001 234 Yes 0 0 3
127.0.0.1 root
$MASTER_MYPORT
60 master-bin.001 234 Yes 0 0 3
select * from t1;
s
Could not break slave
...
...
@@ -42,7 +42,7 @@ master-bin.003
insert into t2 values (65);
show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root
9999 60 master-bin.003 202 Yes 0 0 3
127.0.0.1 root
$MASTER_MYPORT 60 master-bin.003 127 Yes 0 0 2
select * from t2;
m
34
...
...
@@ -60,12 +60,12 @@ master-bin.005
master-bin.006
show master status;
File Position Binlog_do_db Binlog_ignore_db
master-bin.006
710
master-bin.006
382
slave stop;
slave start;
show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root
9999 60 master-bin.006 710 Yes 0 0 11
127.0.0.1 root
$MASTER_MYPORT 60 master-bin.006 382 Yes 0 0 6
lock tables t3 read;
select count(*) from t3 where n >= 4;
count(*)
...
...
mysql-test/r/rpl_log.result
View file @
f89418b4
...
...
@@ -16,7 +16,7 @@ load data infile '../../std_data/words.dat' into table t1;
drop table t1;
show binlog events;
Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 4 Start 1 1 Server ver:
$VERSION
, Binlog ver: 2
master-bin.001 4 Start 1 1 Server ver:
4.0.1-alpha-debug-log
, Binlog ver: 2
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
...
...
@@ -41,7 +41,7 @@ insert into t1 values (1);
drop table t1;
show binlog events;
Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 4 Start 1 1 Server ver:
$VERSION
, Binlog ver: 2
master-bin.001 4 Start 1 1 Server ver:
4.0.1-alpha-debug-log
, Binlog ver: 2
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
...
...
@@ -54,10 +54,9 @@ master-bin.001 627 Rotate 1 10 master-bin.002;pos=4
master-bin.001 668 Stop 1 11
show binlog events in 'master-bin.002';
Log_name Pos Event_type Server_id Log_seq Info
master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2
master-bin.002 79 Query 1 2 use test; create table t1 (n int)
master-bin.002 137 Query 1 3 use test; insert into t1 values (1)
master-bin.002 197 Query 1 4 use test; drop table t1
master-bin.002 4 Query 1 1 use test; create table t1 (n int)
master-bin.002 62 Query 1 2 use test; insert into t1 values (1)
master-bin.002 122 Query 1 3 use test; drop table t1
show master logs;
Log_name
master-bin.001
...
...
@@ -69,7 +68,7 @@ slave-bin.001
slave-bin.002
show binlog events in 'slave-bin.001' from 4;
Log_name Pos Event_type Server_id Log_seq Info
slave-bin.001 4 Start 2 1 Server ver:
$VERSION
, Binlog ver: 2
slave-bin.001 4 Start 2 1 Server ver:
4.0.1-alpha-debug-log
, Binlog ver: 2
slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4
slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
slave-bin.001 225 Intvar 1 3 INSERT_ID=1
...
...
@@ -83,14 +82,13 @@ slave-bin.001 689 Rotate 1 4 slave-bin.002;pos=4; forced by master
slave-bin.001 729 Stop 2 5
show binlog events in 'slave-bin.002' from 4;
Log_name Pos Event_type Server_id Log_seq Info
slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2
slave-bin.002 79 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4
slave-bin.002 132 Query 1 2 use test; create table t1 (n int)
slave-bin.002 190 Query 1 3 use test; insert into t1 values (1)
slave-bin.002 250 Query 1 4 use test; drop table t1
slave-bin.002 4 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4
slave-bin.002 57 Query 1 1 use test; create table t1 (n int)
slave-bin.002 115 Query 1 2 use test; insert into t1 values (1)
slave-bin.002 175 Query 1 3 use test; drop table t1
show slave status;
Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Last_log_seq
127.0.0.1 root $MASTER_MYPORT 1 master-bin.002
245 Yes 0 0 4
127.0.0.1 root $MASTER_MYPORT 1 master-bin.002
170 Yes 0 0 3
show new master for slave with master_log_file='master-bin.001' and
master_log_pos=4 and master_log_seq=1 and master_server_id=1;
Log_name Log_pos
...
...
@@ -106,8 +104,8 @@ slave-bin.001 439
show new master for slave with master_log_file='master-bin.002' and
master_log_pos=4 and master_log_seq=1 and master_server_id=1;
Log_name Log_pos
slave-bin.002
132
slave-bin.002
57
show new master for slave with master_log_file='master-bin.002' and
master_log_pos=137 and master_log_seq=3 and master_server_id=1;
Log_name Log_pos
slave-bin.002 2
50
slave-bin.002 2
23
mysql-test/t/rpl000016.test
View file @
f89418b4
...
...
@@ -23,7 +23,6 @@ insert into t1 values('Could not break slave'),('Tried hard');
save_master_pos
;
connection
slave
;
sync_with_master
;
--
replace_result
9306
9999
3334
9999
3335
9999
show
slave
status
;
select
*
from
t1
;
connection
master
;
...
...
@@ -70,7 +69,6 @@ insert into t2 values (65);
save_master_pos
;
connection
slave
;
sync_with_master
;
--
replace_result
9306
9999
3334
9999
3335
9999
show
slave
status
;
select
*
from
t2
;
connection
master
;
...
...
@@ -92,7 +90,6 @@ connection slave;
slave
stop
;
slave
start
;
sync_with_master
;
--
replace_result
9306
9999
3334
9999
3335
9999
show
slave
status
;
# because of concurrent insert, the table may not be up to date
# if we do not lock
...
...
mysys/mf_iocache.c
View file @
f89418b4
...
...
@@ -166,7 +166,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info
->
seek_not_done
=
test
(
file
>=
0
&&
type
!=
READ_FIFO
&&
type
!=
READ_NET
);
info
->
myflags
=
cache_myflags
&
~
(
MY_NABP
|
MY_FNABP
);
info
->
rc_request_pos
=
info
->
rc_pos
=
info
->
buffer
;
info
->
rc_request_pos
=
info
->
rc_pos
=
info
->
write_pos
=
info
->
buffer
;
info
->
write_pos
=
info
->
write_end
=
0
;
if
(
type
==
SEQ_READ_APPEND
)
{
info
->
append_read_pos
=
info
->
write_pos
=
info
->
append_buffer
;
...
...
@@ -308,6 +309,11 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
{
info
->
append_read_pos
=
info
->
write_pos
=
info
->
append_buffer
;
}
if
(
!
info
->
write_pos
)
info
->
write_pos
=
info
->
buffer
;
if
(
!
info
->
write_end
)
info
->
write_end
=
info
->
buffer
+
info
->
buffer_length
-
(
seek_offset
&
(
IO_SIZE
-
1
));
info
->
type
=
type
;
info
->
error
=
0
;
init_read_function
(
info
,
type
);
...
...
sql/log.cc
View file @
f89418b4
...
...
@@ -81,7 +81,8 @@ static int find_uniq_filename(char *name)
MYSQL_LOG
::
MYSQL_LOG
()
:
last_time
(
0
),
query_start
(
0
),
index_file
(
-
1
),
name
(
0
),
log_type
(
LOG_CLOSED
),
write_error
(
0
),
inited
(
0
),
log_seq
(
1
),
file_id
(
1
),
no_rotate
(
0
)
inited
(
0
),
log_seq
(
1
),
file_id
(
1
),
no_rotate
(
0
),
need_start_event
(
1
)
{
/*
We don't want to intialize LOCK_Log here as the thread system may
...
...
@@ -136,9 +137,11 @@ bool MYSQL_LOG::open_index( int options)
MYF
(
MY_WME
)))
<
0
);
}
void
MYSQL_LOG
::
init
(
enum_log_type
log_type_arg
)
void
MYSQL_LOG
::
init
(
enum_log_type
log_type_arg
,
enum
cache_type
io_cache_type_arg
)
{
log_type
=
log_type_arg
;
io_cache_type
=
io_cache_type_arg
;
if
(
!
inited
)
{
inited
=
1
;
...
...
@@ -184,7 +187,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
if
((
file
=
my_open
(
log_file_name
,
O_CREAT
|
O_APPEND
|
O_WRONLY
|
O_BINARY
,
MYF
(
MY_WME
|
ME_WAITTANG
)))
<
0
||
init_io_cache
(
&
log_file
,
file
,
IO_SIZE
,
WRITE_CACHE
,
init_io_cache
(
&
log_file
,
file
,
IO_SIZE
,
io_cache_type
,
my_tell
(
file
,
MYF
(
MY_WME
)),
0
,
MYF
(
MY_WME
|
MY_NABP
)))
goto
err
;
...
...
@@ -220,6 +223,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
}
else
if
(
log_type
==
LOG_BIN
)
{
bool
error
;
/*
Explanation of the boolean black magic:
if we are supposed to write magic number try write
...
...
@@ -232,10 +236,13 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
goto
err
;
log_seq
=
1
;
Start_log_event
s
;
bool
error
;
s
.
set_log_seq
(
0
,
this
);
s
.
write
(
&
log_file
);
if
(
need_start_event
)
{
Start_log_event
s
;
s
.
set_log_seq
(
0
,
this
);
s
.
write
(
&
log_file
);
need_start_event
=
0
;
}
flush_io_cache
(
&
log_file
);
pthread_mutex_lock
(
&
LOCK_index
);
error
=
(
my_write
(
index_file
,
(
byte
*
)
log_file_name
,
strlen
(
log_file_name
),
...
...
@@ -715,7 +722,8 @@ bool MYSQL_LOG::write(Log_event* event_info)
file
==
&
log_file
&&
flush_io_cache
(
file
))
goto
err
;
error
=
0
;
should_rotate
=
(
file
==
&
log_file
&&
my_b_tell
(
file
)
>=
max_binlog_size
);
should_rotate
=
(
file
==
&
log_file
&&
(
uint
)
my_b_tell
(
file
)
>=
max_binlog_size
);
err:
if
(
error
)
{
...
...
sql/log_event.cc
View file @
f89418b4
...
...
@@ -149,12 +149,21 @@ static void cleanup_load_tmpdir()
#endif
Log_event
::
Log_event
(
const
char
*
buf
)
:
cached_event_len
(
0
),
temp_buf
(
0
)
Log_event
::
Log_event
(
const
char
*
buf
,
bool
old_format
)
:
cached_event_len
(
0
),
temp_buf
(
0
)
{
when
=
uint4korr
(
buf
);
server_id
=
uint4korr
(
buf
+
SERVER_ID_OFFSET
);
log_seq
=
uint4korr
(
buf
+
LOG_SEQ_OFFSET
);
flags
=
uint2korr
(
buf
+
FLAGS_OFFSET
);
if
(
old_format
)
{
log_seq
=
0
;
flags
=
0
;
}
else
{
log_seq
=
uint4korr
(
buf
+
LOG_SEQ_OFFSET
);
flags
=
uint2korr
(
buf
+
FLAGS_OFFSET
);
}
#ifndef MYSQL_CLIENT
thd
=
0
;
#endif
...
...
@@ -441,17 +450,24 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
#define UNLOCK_MUTEX
#endif
#ifndef MYSQL_CLIENT
#define LOCK_MUTEX if(log_lock) pthread_mutex_lock(log_lock);
#else
#define LOCK_MUTEX
#endif
// allocates memory - the caller is responsible for clean-up
#ifndef MYSQL_CLIENT
Log_event
*
Log_event
::
read_log_event
(
IO_CACHE
*
file
,
pthread_mutex_t
*
log_lock
)
Log_event
*
Log_event
::
read_log_event
(
IO_CACHE
*
file
,
pthread_mutex_t
*
log_lock
,
bool
old_format
)
#else
Log_event
*
Log_event
::
read_log_event
(
IO_CACHE
*
file
)
Log_event
*
Log_event
::
read_log_event
(
IO_CACHE
*
file
,
bool
old_format
)
#endif
{
char
head
[
LOG_EVENT_HEADER_LEN
];
#ifndef MYSQL_CLIENT
if
(
log_lock
)
pthread_mutex_lock
(
log_lock
);
#endif
LOCK_MUTEX
;
if
(
my_b_read
(
file
,
(
byte
*
)
head
,
sizeof
(
head
)))
{
UNLOCK_MUTEX
;
...
...
@@ -489,7 +505,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file)
error
=
"read error"
;
goto
err
;
}
if
((
res
=
read_log_event
(
buf
,
data_len
,
&
error
)))
if
((
res
=
read_log_event
(
buf
,
data_len
,
&
error
,
old_format
)))
res
->
register_temp_buf
(
buf
);
err:
UNLOCK_MUTEX
;
...
...
@@ -502,7 +518,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file)
}
Log_event
*
Log_event
::
read_log_event
(
const
char
*
buf
,
int
event_len
,
const
char
**
error
)
const
char
**
error
,
bool
old_format
)
{
if
(
event_len
<
EVENT_LEN_OFFSET
||
(
uint
)
event_len
!=
uint4korr
(
buf
+
EVENT_LEN_OFFSET
))
...
...
@@ -513,14 +529,14 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
switch
(
buf
[
EVENT_TYPE_OFFSET
])
{
case
QUERY_EVENT
:
ev
=
new
Query_log_event
(
buf
,
event_len
);
ev
=
new
Query_log_event
(
buf
,
event_len
,
old_format
);
break
;
case
LOAD_EVENT
:
case
NEW_LOAD_EVENT
:
ev
=
new
Load_log_event
(
buf
,
event_len
);
ev
=
new
Load_log_event
(
buf
,
event_len
,
old_format
);
break
;
case
ROTATE_EVENT
:
ev
=
new
Rotate_log_event
(
buf
,
event_len
);
ev
=
new
Rotate_log_event
(
buf
,
event_len
,
old_format
);
break
;
case
SLAVE_EVENT
:
ev
=
new
Slave_log_event
(
buf
,
event_len
);
...
...
@@ -538,13 +554,13 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev
=
new
Execute_load_log_event
(
buf
,
event_len
);
break
;
case
START_EVENT
:
ev
=
new
Start_log_event
(
buf
);
ev
=
new
Start_log_event
(
buf
,
old_format
);
break
;
case
STOP_EVENT
:
ev
=
new
Stop_log_event
(
buf
);
ev
=
new
Stop_log_event
(
buf
,
old_format
);
break
;
case
INTVAR_EVENT
:
ev
=
new
Intvar_log_event
(
buf
);
ev
=
new
Intvar_log_event
(
buf
,
old_format
);
break
;
default:
break
;
...
...
@@ -634,7 +650,8 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
#endif
/* #ifdef MYSQL_CLIENT */
Start_log_event
::
Start_log_event
(
const
char
*
buf
)
:
Log_event
(
buf
)
Start_log_event
::
Start_log_event
(
const
char
*
buf
,
bool
old_format
)
:
Log_event
(
buf
,
old_format
)
{
binlog_version
=
uint2korr
(
buf
+
LOG_EVENT_HEADER_LEN
+
ST_BINLOG_VER_OFFSET
);
...
...
@@ -652,8 +669,9 @@ int Start_log_event::write_data(IO_CACHE* file)
return
(
my_b_write
(
file
,
(
byte
*
)
buff
,
sizeof
(
buff
))
?
-
1
:
0
);
}
Rotate_log_event
::
Rotate_log_event
(
const
char
*
buf
,
int
event_len
)
:
Log_event
(
buf
),
new_log_ident
(
NULL
),
alloced
(
0
)
Rotate_log_event
::
Rotate_log_event
(
const
char
*
buf
,
int
event_len
,
bool
old_format
)
:
Log_event
(
buf
,
old_format
),
new_log_ident
(
NULL
),
alloced
(
0
)
{
// the caller will ensure that event_len is what we have at
// EVENT_LEN_OFFSET
...
...
@@ -695,8 +713,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
}
#endif
Query_log_event
::
Query_log_event
(
const
char
*
buf
,
int
event_len
)
:
Log_event
(
buf
),
data_buf
(
0
),
query
(
NULL
),
db
(
NULL
)
Query_log_event
::
Query_log_event
(
const
char
*
buf
,
int
event_len
,
bool
old_format
)
:
Log_event
(
buf
,
old_format
),
data_buf
(
0
),
query
(
NULL
),
db
(
NULL
)
{
if
((
uint
)
event_len
<
QUERY_EVENT_OVERHEAD
)
return
;
...
...
@@ -766,7 +785,8 @@ int Query_log_event::write_data(IO_CACHE* file)
my_b_write
(
file
,
(
byte
*
)
query
,
q_len
))
?
-
1
:
0
;
}
Intvar_log_event
::
Intvar_log_event
(
const
char
*
buf
)
:
Log_event
(
buf
)
Intvar_log_event
::
Intvar_log_event
(
const
char
*
buf
,
bool
old_format
)
:
Log_event
(
buf
,
old_format
)
{
buf
+=
LOG_EVENT_HEADER_LEN
;
type
=
buf
[
I_TYPE_OFFSET
];
...
...
@@ -1003,8 +1023,9 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
// the caller must do buf[event_len] = 0 before he starts using the
// constructed event
Load_log_event
::
Load_log_event
(
const
char
*
buf
,
int
event_len
)
:
Log_event
(
buf
),
num_fields
(
0
),
fields
(
0
),
Load_log_event
::
Load_log_event
(
const
char
*
buf
,
int
event_len
,
bool
old_format
)
:
Log_event
(
buf
,
old_format
),
num_fields
(
0
),
fields
(
0
),
field_lens
(
0
),
field_block_len
(
0
),
table_name
(
0
),
db
(
0
),
fname
(
0
)
{
...
...
@@ -1237,7 +1258,7 @@ void Slave_log_event::init_from_mem_pool(int data_size)
}
Slave_log_event
::
Slave_log_event
(
const
char
*
buf
,
int
event_len
)
:
Log_event
(
buf
),
mem_pool
(
0
),
master_host
(
0
)
Log_event
(
buf
,
0
),
mem_pool
(
0
),
master_host
(
0
)
{
event_len
-=
LOG_EVENT_HEADER_LEN
;
if
(
event_len
<
0
)
...
...
@@ -1291,7 +1312,7 @@ int Create_file_log_event::write_base(IO_CACHE* file)
}
Create_file_log_event
::
Create_file_log_event
(
const
char
*
buf
,
int
len
)
:
Load_log_event
(
buf
,
0
),
fake_base
(
0
),
block
(
0
)
Load_log_event
(
buf
,
0
,
0
),
fake_base
(
0
),
block
(
0
)
{
int
block_offset
;
if
(
copy_log_event
(
buf
,
len
))
...
...
@@ -1347,7 +1368,7 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg,
#endif
Append_block_log_event
::
Append_block_log_event
(
const
char
*
buf
,
int
len
)
:
Log_event
(
buf
),
block
(
0
)
Log_event
(
buf
,
0
),
block
(
0
)
{
if
((
uint
)
len
<
APPEND_BLOCK_EVENT_OVERHEAD
)
return
;
...
...
@@ -1399,7 +1420,7 @@ Delete_file_log_event::Delete_file_log_event(THD* thd_arg):
#endif
Delete_file_log_event
::
Delete_file_log_event
(
const
char
*
buf
,
int
len
)
:
Log_event
(
buf
),
file_id
(
0
)
Log_event
(
buf
,
0
),
file_id
(
0
)
{
if
((
uint
)
len
<
DELETE_FILE_EVENT_OVERHEAD
)
return
;
...
...
@@ -1446,7 +1467,7 @@ Execute_load_log_event::Execute_load_log_event(THD* thd_arg):
#endif
Execute_load_log_event
::
Execute_load_log_event
(
const
char
*
buf
,
int
len
)
:
Log_event
(
buf
),
file_id
(
0
)
Log_event
(
buf
,
0
),
file_id
(
0
)
{
if
((
uint
)
len
<
EXEC_LOAD_EVENT_OVERHEAD
)
return
;
...
...
@@ -1657,15 +1678,11 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
int
Start_log_event
::
exec_event
(
struct
st_master_info
*
mi
)
{
#ifdef TO_BE_DELETED
/*
We can't close temporary files or cleanup the tmpdir here, becasue
someone may have just rotated the logs on the master.
We should only do this cleanup when we know the master restarted.
*/
close_temporary_tables
(
thd
);
cleanup_load_tmpdir
();
#endif
if
(
!
mi
->
old_format
)
{
close_temporary_tables
(
thd
);
cleanup_load_tmpdir
();
}
return
Log_event
::
exec_event
(
mi
);
}
...
...
@@ -1866,7 +1883,9 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
slave_print_error
(
my_errno
,
"Could not open file '%s'"
,
fname
);
goto
err
;
}
if
(
!
(
lev
=
(
Load_log_event
*
)
Log_event
::
read_log_event
(
&
file
,
0
))
if
(
!
(
lev
=
(
Load_log_event
*
)
Log_event
::
read_log_event
(
&
file
,
(
pthread_mutex_t
*
)
0
,
(
bool
)
0
))
||
lev
->
get_type_code
()
!=
NEW_LOAD_EVENT
)
{
slave_print_error
(
0
,
"File '%s' appears corrupted"
,
fname
);
...
...
sql/log_event.h
View file @
f89418b4
...
...
@@ -242,7 +242,7 @@ class Log_event
virtual
Log_event_type
get_type_code
()
=
0
;
virtual
bool
is_valid
()
=
0
;
virtual
bool
get_cache_stmt
()
{
return
0
;
}
Log_event
(
const
char
*
buf
);
Log_event
(
const
char
*
buf
,
bool
old_format
);
#ifndef MYSQL_CLIENT
Log_event
(
THD
*
thd_arg
,
uint16
flags_arg
=
0
);
#endif
...
...
@@ -268,12 +268,14 @@ class Log_event
#ifndef MYSQL_CLIENT
// if mutex is 0, the read will proceed without mutex
static
Log_event
*
read_log_event
(
IO_CACHE
*
file
,
pthread_mutex_t
*
log_lock
);
static
Log_event
*
read_log_event
(
IO_CACHE
*
file
,
pthread_mutex_t
*
log_lock
,
bool
old_format
);
#else // avoid having to link mysqlbinlog against libpthread
static
Log_event
*
read_log_event
(
IO_CACHE
*
file
);
static
Log_event
*
read_log_event
(
IO_CACHE
*
file
,
bool
old_format
);
#endif
static
Log_event
*
read_log_event
(
const
char
*
buf
,
int
event_len
,
const
char
**
error
);
const
char
**
error
,
bool
old_format
);
const
char
*
get_type_str
();
#ifndef MYSQL_CLIENT
...
...
@@ -317,7 +319,7 @@ class Query_log_event: public Log_event
bool
get_cache_stmt
()
{
return
cache_stmt
;
}
#endif
Query_log_event
(
const
char
*
buf
,
int
event_len
);
Query_log_event
(
const
char
*
buf
,
int
event_len
,
bool
old_format
);
~
Query_log_event
()
{
if
(
data_buf
)
...
...
@@ -411,7 +413,7 @@ class Load_log_event: public Log_event
int
exec_event
(
NET
*
net
,
struct
st_master_info
*
mi
);
#endif
Load_log_event
(
const
char
*
buf
,
int
event_len
);
Load_log_event
(
const
char
*
buf
,
int
event_len
,
bool
old_format
);
~
Load_log_event
()
{
}
...
...
@@ -451,7 +453,7 @@ class Start_log_event: public Log_event
memcpy
(
server_version
,
::
server_version
,
ST_SERVER_VER_LEN
);
}
#endif
Start_log_event
(
const
char
*
buf
);
Start_log_event
(
const
char
*
buf
,
bool
old_format
);
~
Start_log_event
()
{}
Log_event_type
get_type_code
()
{
return
START_EVENT
;}
int
write_data
(
IO_CACHE
*
file
);
...
...
@@ -479,7 +481,7 @@ class Intvar_log_event: public Log_event
:
Log_event
(
thd_arg
),
val
(
val_arg
),
type
(
type_arg
)
{}
#endif
Intvar_log_event
(
const
char
*
buf
);
Intvar_log_event
(
const
char
*
buf
,
bool
old_format
);
~
Intvar_log_event
()
{}
Log_event_type
get_type_code
()
{
return
INTVAR_EVENT
;}
const
char
*
get_var_type_name
();
...
...
@@ -503,7 +505,8 @@ class Stop_log_event: public Log_event
Stop_log_event
()
:
Log_event
((
THD
*
)
0
)
{}
#endif
Stop_log_event
(
const
char
*
buf
)
:
Log_event
(
buf
)
Stop_log_event
(
const
char
*
buf
,
bool
old_format
)
:
Log_event
(
buf
,
old_format
)
{
}
~
Stop_log_event
()
{}
...
...
@@ -534,7 +537,7 @@ class Rotate_log_event: public Log_event
alloced
(
0
)
{}
#endif
Rotate_log_event
(
const
char
*
buf
,
int
event_len
);
Rotate_log_event
(
const
char
*
buf
,
int
event_len
,
bool
old_format
);
~
Rotate_log_event
()
{
if
(
alloced
)
...
...
@@ -686,7 +689,6 @@ class Execute_load_log_event: public Log_event
#endif
};
#endif
...
...
sql/repl_failsafe.cc
View file @
f89418b4
...
...
@@ -20,12 +20,21 @@
#include "repl_failsafe.h"
#include "sql_repl.h"
#include "slave.h"
#include "sql_acl.h"
#include "mini_client.h"
#include "log_event.h"
#include <mysql.h>
#include <thr_alarm.h>
#define SLAVE_LIST_CHUNK 128
#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
RPL_STATUS
rpl_status
=
RPL_NULL
;
pthread_mutex_t
LOCK_rpl_status
;
pthread_cond_t
COND_rpl_status
;
HASH
slave_list
;
extern
const
char
*
any_db
;
const
char
*
rpl_role_type
[]
=
{
"MASTER"
,
"SLAVE"
,
NullS
};
TYPELIB
rpl_role_typelib
=
{
array_elements
(
rpl_role_type
)
-
1
,
""
,
...
...
@@ -37,6 +46,10 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE",
TYPELIB
rpl_status_typelib
=
{
array_elements
(
rpl_status_type
)
-
1
,
""
,
rpl_status_type
};
static
Slave_log_event
*
find_slave_event
(
IO_CACHE
*
log
,
const
char
*
log_file_name
,
char
*
errmsg
);
static
int
init_failsafe_rpl_thread
(
THD
*
thd
)
{
DBUG_ENTER
(
"init_failsafe_rpl_thread"
);
...
...
@@ -89,6 +102,333 @@ void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
pthread_mutex_unlock
(
&
LOCK_rpl_status
);
}
#define get_object(p, obj) \
{\
uint len = (uint)*p++; \
if (p + len > p_end || len >= sizeof(obj)) \
goto err; \
strmake(obj,(char*) p,len); \
p+= len; \
}\
static
inline
int
cmp_master_pos
(
Slave_log_event
*
sev
,
LEX_MASTER_INFO
*
mi
)
{
return
cmp_master_pos
(
sev
->
master_log
,
sev
->
master_pos
,
mi
->
log_file_name
,
mi
->
pos
);
}
void
unregister_slave
(
THD
*
thd
,
bool
only_mine
,
bool
need_mutex
)
{
if
(
need_mutex
)
pthread_mutex_lock
(
&
LOCK_slave_list
);
if
(
thd
->
server_id
)
{
SLAVE_INFO
*
old_si
;
if
((
old_si
=
(
SLAVE_INFO
*
)
hash_search
(
&
slave_list
,
(
byte
*
)
&
thd
->
server_id
,
4
))
&&
(
!
only_mine
||
old_si
->
thd
==
thd
))
hash_delete
(
&
slave_list
,
(
byte
*
)
old_si
);
}
if
(
need_mutex
)
pthread_mutex_unlock
(
&
LOCK_slave_list
);
}
int
register_slave
(
THD
*
thd
,
uchar
*
packet
,
uint
packet_length
)
{
SLAVE_INFO
*
si
;
int
res
=
1
;
uchar
*
p
=
packet
,
*
p_end
=
packet
+
packet_length
;
if
(
check_access
(
thd
,
FILE_ACL
,
any_db
))
return
1
;
if
(
!
(
si
=
(
SLAVE_INFO
*
)
my_malloc
(
sizeof
(
SLAVE_INFO
),
MYF
(
MY_WME
))))
goto
err
;
thd
->
server_id
=
si
->
server_id
=
uint4korr
(
p
);
p
+=
4
;
get_object
(
p
,
si
->
host
);
get_object
(
p
,
si
->
user
);
get_object
(
p
,
si
->
password
);
si
->
port
=
uint2korr
(
p
);
p
+=
2
;
si
->
rpl_recovery_rank
=
uint4korr
(
p
);
p
+=
4
;
if
(
!
(
si
->
master_id
=
uint4korr
(
p
)))
si
->
master_id
=
server_id
;
si
->
thd
=
thd
;
pthread_mutex_lock
(
&
LOCK_slave_list
);
unregister_slave
(
thd
,
0
,
0
);
res
=
hash_insert
(
&
slave_list
,
(
byte
*
)
si
);
pthread_mutex_unlock
(
&
LOCK_slave_list
);
return
res
;
err:
if
(
si
)
my_free
((
gptr
)
si
,
MYF
(
MY_WME
));
return
res
;
}
static
uint32
*
slave_list_key
(
SLAVE_INFO
*
si
,
uint
*
len
,
my_bool
not_used
__attribute__
((
unused
)))
{
*
len
=
4
;
return
&
si
->
server_id
;
}
static
void
slave_info_free
(
void
*
s
)
{
my_free
((
gptr
)
s
,
MYF
(
MY_WME
));
}
void
init_slave_list
()
{
hash_init
(
&
slave_list
,
SLAVE_LIST_CHUNK
,
0
,
0
,
(
hash_get_key
)
slave_list_key
,
slave_info_free
,
0
);
pthread_mutex_init
(
&
LOCK_slave_list
,
MY_MUTEX_INIT_FAST
);
}
void
end_slave_list
()
{
pthread_mutex_lock
(
&
LOCK_slave_list
);
hash_free
(
&
slave_list
);
pthread_mutex_unlock
(
&
LOCK_slave_list
);
pthread_mutex_destroy
(
&
LOCK_slave_list
);
}
static
int
find_target_pos
(
LEX_MASTER_INFO
*
mi
,
IO_CACHE
*
log
,
char
*
errmsg
)
{
uint32
log_seq
=
mi
->
last_log_seq
;
uint32
target_server_id
=
mi
->
server_id
;
for
(;;)
{
Log_event
*
ev
;
if
(
!
(
ev
=
Log_event
::
read_log_event
(
log
,
(
pthread_mutex_t
*
)
0
,
0
)))
{
if
(
log
->
error
>
0
)
strmov
(
errmsg
,
"Binary log truncated in the middle of event"
);
else
if
(
log
->
error
<
0
)
strmov
(
errmsg
,
"I/O error reading binary log"
);
else
strmov
(
errmsg
,
"Could not find target event in the binary log"
);
return
1
;
}
if
(
ev
->
log_seq
==
log_seq
&&
ev
->
server_id
==
target_server_id
)
{
delete
ev
;
mi
->
pos
=
my_b_tell
(
log
);
return
0
;
}
delete
ev
;
}
}
int
translate_master
(
THD
*
thd
,
LEX_MASTER_INFO
*
mi
,
char
*
errmsg
)
{
LOG_INFO
linfo
;
char
search_file_name
[
FN_REFLEN
],
last_log_name
[
FN_REFLEN
];
IO_CACHE
log
;
File
file
=
-
1
,
last_file
=
-
1
;
pthread_mutex_t
*
log_lock
;
const
char
*
errmsg_p
;
Slave_log_event
*
sev
=
0
;
my_off_t
last_pos
=
0
;
int
error
=
1
;
int
cmp_res
;
LINT_INIT
(
cmp_res
);
if
(
!
mysql_bin_log
.
is_open
())
{
strmov
(
errmsg
,
"Binary log is not open"
);
return
1
;
}
if
(
!
server_id_supplied
)
{
strmov
(
errmsg
,
"Misconfigured master - server id was not set"
);
return
1
;
}
linfo
.
index_file_offset
=
0
;
search_file_name
[
0
]
=
0
;
if
(
mysql_bin_log
.
find_first_log
(
&
linfo
,
search_file_name
))
{
strmov
(
errmsg
,
"Could not find first log"
);
return
1
;
}
thd
->
current_linfo
=
&
linfo
;
bzero
((
char
*
)
&
log
,
sizeof
(
log
));
log_lock
=
mysql_bin_log
.
get_log_lock
();
pthread_mutex_lock
(
log_lock
);
for
(;;)
{
if
((
file
=
open_binlog
(
&
log
,
linfo
.
log_file_name
,
&
errmsg_p
))
<
0
)
{
strmov
(
errmsg
,
errmsg_p
);
goto
err
;
}
if
(
!
(
sev
=
find_slave_event
(
&
log
,
linfo
.
log_file_name
,
errmsg
)))
goto
err
;
cmp_res
=
cmp_master_pos
(
sev
,
mi
);
delete
sev
;
if
(
!
cmp_res
)
{
/* Copy basename */
fn_format
(
mi
->
log_file_name
,
linfo
.
log_file_name
,
""
,
""
,
1
);
mi
->
pos
=
my_b_tell
(
&
log
);
goto
mi_inited
;
}
else
if
(
cmp_res
>
0
)
{
if
(
!
last_pos
)
{
strmov
(
errmsg
,
"Slave event in first log points past the target position"
);
goto
err
;
}
end_io_cache
(
&
log
);
(
void
)
my_close
(
file
,
MYF
(
MY_WME
));
if
(
init_io_cache
(
&
log
,
(
file
=
last_file
),
IO_SIZE
,
READ_CACHE
,
0
,
0
,
MYF
(
MY_WME
)))
{
errmsg
[
0
]
=
0
;
goto
err
;
}
break
;
}
strmov
(
last_log_name
,
linfo
.
log_file_name
);
last_pos
=
my_b_tell
(
&
log
);
switch
(
mysql_bin_log
.
find_next_log
(
&
linfo
))
{
case
LOG_INFO_EOF
:
if
(
last_file
>=
0
)
(
void
)
my_close
(
last_file
,
MYF
(
MY_WME
));
last_file
=
-
1
;
goto
found_log
;
case
0
:
break
;
default:
strmov
(
errmsg
,
"Error reading log index"
);
goto
err
;
}
end_io_cache
(
&
log
);
if
(
last_file
>=
0
)
(
void
)
my_close
(
last_file
,
MYF
(
MY_WME
));
last_file
=
file
;
}
found_log:
my_b_seek
(
&
log
,
last_pos
);
if
(
find_target_pos
(
mi
,
&
log
,
errmsg
))
goto
err
;
fn_format
(
mi
->
log_file_name
,
last_log_name
,
""
,
""
,
1
);
/* Copy basename */
mi_inited:
error
=
0
;
err:
pthread_mutex_unlock
(
log_lock
);
end_io_cache
(
&
log
);
pthread_mutex_lock
(
&
LOCK_thread_count
);
thd
->
current_linfo
=
0
;
pthread_mutex_unlock
(
&
LOCK_thread_count
);
if
(
file
>=
0
)
(
void
)
my_close
(
file
,
MYF
(
MY_WME
));
if
(
last_file
>=
0
&&
last_file
!=
file
)
(
void
)
my_close
(
last_file
,
MYF
(
MY_WME
));
return
error
;
}
// caller must delete result when done
static
Slave_log_event
*
find_slave_event
(
IO_CACHE
*
log
,
const
char
*
log_file_name
,
char
*
errmsg
)
{
Log_event
*
ev
;
int
i
;
bool
slave_event_found
=
0
;
LINT_INIT
(
ev
);
for
(
i
=
0
;
i
<
2
;
i
++
)
{
if
(
!
(
ev
=
Log_event
::
read_log_event
(
log
,
(
pthread_mutex_t
*
)
0
,
0
)))
{
my_snprintf
(
errmsg
,
SLAVE_ERRMSG_SIZE
,
"Error reading event in log '%s'"
,
(
char
*
)
log_file_name
);
return
0
;
}
if
(
ev
->
get_type_code
()
==
SLAVE_EVENT
)
{
slave_event_found
=
1
;
break
;
}
delete
ev
;
}
if
(
!
slave_event_found
)
{
my_snprintf
(
errmsg
,
SLAVE_ERRMSG_SIZE
,
"Could not find slave event in log '%s'"
,
(
char
*
)
log_file_name
);
delete
ev
;
return
0
;
}
return
(
Slave_log_event
*
)
ev
;
}
int
show_new_master
(
THD
*
thd
)
{
DBUG_ENTER
(
"show_new_master"
);
List
<
Item
>
field_list
;
char
errmsg
[
SLAVE_ERRMSG_SIZE
];
LEX_MASTER_INFO
*
lex_mi
=
&
thd
->
lex
.
mi
;
errmsg
[
0
]
=
0
;
// Safety
if
(
translate_master
(
thd
,
lex_mi
,
errmsg
))
{
if
(
errmsg
[
0
])
net_printf
(
&
thd
->
net
,
ER_ERROR_WHEN_EXECUTING_COMMAND
,
"SHOW NEW MASTER"
,
errmsg
);
else
send_error
(
&
thd
->
net
,
0
);
DBUG_RETURN
(
1
);
}
else
{
String
*
packet
=
&
thd
->
packet
;
field_list
.
push_back
(
new
Item_empty_string
(
"Log_name"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Log_pos"
,
20
));
if
(
send_fields
(
thd
,
field_list
,
1
))
DBUG_RETURN
(
-
1
);
packet
->
length
(
0
);
net_store_data
(
packet
,
lex_mi
->
log_file_name
);
net_store_data
(
packet
,
(
longlong
)
lex_mi
->
pos
);
if
(
my_net_write
(
&
thd
->
net
,
packet
->
ptr
(),
packet
->
length
()))
DBUG_RETURN
(
-
1
);
send_eof
(
&
thd
->
net
);
DBUG_RETURN
(
0
);
}
}
int
update_slave_list
(
MYSQL
*
mysql
)
{
MYSQL_RES
*
res
=
0
;
...
...
@@ -216,6 +556,277 @@ pthread_handler_decl(handle_failsafe_rpl,arg)
DBUG_RETURN
(
0
);
}
int
show_slave_hosts
(
THD
*
thd
)
{
List
<
Item
>
field_list
;
NET
*
net
=
&
thd
->
net
;
String
*
packet
=
&
thd
->
packet
;
DBUG_ENTER
(
"show_slave_hosts"
);
field_list
.
push_back
(
new
Item_empty_string
(
"Server_id"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Host"
,
20
));
if
(
opt_show_slave_auth_info
)
{
field_list
.
push_back
(
new
Item_empty_string
(
"User"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Password"
,
20
));
}
field_list
.
push_back
(
new
Item_empty_string
(
"Port"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Rpl_recovery_rank"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Master_id"
,
20
));
if
(
send_fields
(
thd
,
field_list
,
1
))
DBUG_RETURN
(
-
1
);
pthread_mutex_lock
(
&
LOCK_slave_list
);
for
(
uint
i
=
0
;
i
<
slave_list
.
records
;
++
i
)
{
SLAVE_INFO
*
si
=
(
SLAVE_INFO
*
)
hash_element
(
&
slave_list
,
i
);
packet
->
length
(
0
);
net_store_data
(
packet
,
si
->
server_id
);
net_store_data
(
packet
,
si
->
host
);
if
(
opt_show_slave_auth_info
)
{
net_store_data
(
packet
,
si
->
user
);
net_store_data
(
packet
,
si
->
password
);
}
net_store_data
(
packet
,
(
uint32
)
si
->
port
);
net_store_data
(
packet
,
si
->
rpl_recovery_rank
);
net_store_data
(
packet
,
si
->
master_id
);
if
(
my_net_write
(
net
,
(
char
*
)
packet
->
ptr
(),
packet
->
length
()))
{
pthread_mutex_unlock
(
&
LOCK_slave_list
);
DBUG_RETURN
(
-
1
);
}
}
pthread_mutex_unlock
(
&
LOCK_slave_list
);
send_eof
(
net
);
DBUG_RETURN
(
0
);
}
int
connect_to_master
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
)
{
if
(
!
mc_mysql_connect
(
mysql
,
mi
->
host
,
mi
->
user
,
mi
->
password
,
0
,
mi
->
port
,
0
,
0
))
{
sql_print_error
(
"Connection to master failed: %s"
,
mc_mysql_error
(
mysql
));
return
1
;
}
return
0
;
}
static
inline
void
cleanup_mysql_results
(
MYSQL_RES
*
db_res
,
MYSQL_RES
**
cur
,
MYSQL_RES
**
start
)
{
for
(
;
cur
>=
start
;
--
cur
)
{
if
(
*
cur
)
mc_mysql_free_result
(
*
cur
);
}
mc_mysql_free_result
(
db_res
);
}
static
inline
int
fetch_db_tables
(
THD
*
thd
,
MYSQL
*
mysql
,
const
char
*
db
,
MYSQL_RES
*
table_res
)
{
MYSQL_ROW
row
;
for
(
row
=
mc_mysql_fetch_row
(
table_res
);
row
;
row
=
mc_mysql_fetch_row
(
table_res
))
{
TABLE_LIST
table
;
const
char
*
table_name
=
row
[
0
];
int
error
;
if
(
table_rules_on
)
{
table
.
next
=
0
;
table
.
db
=
(
char
*
)
db
;
table
.
real_name
=
(
char
*
)
table_name
;
table
.
updating
=
1
;
if
(
!
tables_ok
(
thd
,
&
table
))
continue
;
}
if
((
error
=
fetch_nx_table
(
thd
,
db
,
table_name
,
&
glob_mi
,
mysql
)))
return
error
;
}
return
0
;
}
int
load_master_data
(
THD
*
thd
)
{
MYSQL
mysql
;
MYSQL_RES
*
master_status_res
=
0
;
bool
slave_was_running
=
0
;
int
error
=
0
;
mc_mysql_init
(
&
mysql
);
// we do not want anyone messing with the slave at all for the entire
// duration of the data load;
pthread_mutex_lock
(
&
LOCK_slave
);
// first, kill the slave
if
((
slave_was_running
=
slave_running
))
{
abort_slave
=
1
;
KICK_SLAVE
;
thd
->
proc_info
=
"waiting for slave to die"
;
while
(
slave_running
)
pthread_cond_wait
(
&
COND_slave_stopped
,
&
LOCK_slave
);
// wait until done
}
if
(
connect_to_master
(
thd
,
&
mysql
,
&
glob_mi
))
{
net_printf
(
&
thd
->
net
,
error
=
ER_CONNECT_TO_MASTER
,
mc_mysql_error
(
&
mysql
));
goto
err
;
}
// now that we are connected, get all database and tables in each
{
MYSQL_RES
*
db_res
,
**
table_res
,
**
table_res_end
,
**
cur_table_res
;
uint
num_dbs
;
if
(
mc_mysql_query
(
&
mysql
,
"show databases"
,
0
)
||
!
(
db_res
=
mc_mysql_store_result
(
&
mysql
)))
{
net_printf
(
&
thd
->
net
,
error
=
ER_QUERY_ON_MASTER
,
mc_mysql_error
(
&
mysql
));
goto
err
;
}
if
(
!
(
num_dbs
=
(
uint
)
mc_mysql_num_rows
(
db_res
)))
goto
err
;
// in theory, the master could have no databases at all
// and run with skip-grant
if
(
!
(
table_res
=
(
MYSQL_RES
**
)
thd
->
alloc
(
num_dbs
*
sizeof
(
MYSQL_RES
*
))))
{
net_printf
(
&
thd
->
net
,
error
=
ER_OUTOFMEMORY
);
goto
err
;
}
// this is a temporary solution until we have online backup
// capabilities - to be replaced once online backup is working
// we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
// can to minimize the lock time
if
(
mc_mysql_query
(
&
mysql
,
"FLUSH TABLES WITH READ LOCK"
,
0
)
||
mc_mysql_query
(
&
mysql
,
"SHOW MASTER STATUS"
,
0
)
||
!
(
master_status_res
=
mc_mysql_store_result
(
&
mysql
)))
{
net_printf
(
&
thd
->
net
,
error
=
ER_QUERY_ON_MASTER
,
mc_mysql_error
(
&
mysql
));
goto
err
;
}
// go through every table in every database, and if the replication
// rules allow replicating it, get it
table_res_end
=
table_res
+
num_dbs
;
for
(
cur_table_res
=
table_res
;
cur_table_res
<
table_res_end
;
cur_table_res
++
)
{
// since we know how many rows we have, this can never be NULL
MYSQL_ROW
row
=
mc_mysql_fetch_row
(
db_res
);
char
*
db
=
row
[
0
];
/*
Do not replicate databases excluded by rules
also skip mysql database - in most cases the user will
mess up and not exclude mysql database with the rules when
he actually means to - in this case, he is up for a surprise if
his priv tables get dropped and downloaded from master
TO DO - add special option, not enabled
by default, to allow inclusion of mysql database into load
data from master
*/
if
(
!
db_ok
(
db
,
replicate_do_db
,
replicate_ignore_db
)
||
!
strcmp
(
db
,
"mysql"
))
{
*
cur_table_res
=
0
;
continue
;
}
if
(
mysql_rm_db
(
thd
,
db
,
1
,
1
)
||
mysql_create_db
(
thd
,
db
,
0
,
1
))
{
send_error
(
&
thd
->
net
,
0
,
0
);
cleanup_mysql_results
(
db_res
,
cur_table_res
-
1
,
table_res
);
goto
err
;
}
if
(
mc_mysql_select_db
(
&
mysql
,
db
)
||
mc_mysql_query
(
&
mysql
,
"show tables"
,
0
)
||
!
(
*
cur_table_res
=
mc_mysql_store_result
(
&
mysql
)))
{
net_printf
(
&
thd
->
net
,
error
=
ER_QUERY_ON_MASTER
,
mc_mysql_error
(
&
mysql
));
cleanup_mysql_results
(
db_res
,
cur_table_res
-
1
,
table_res
);
goto
err
;
}
if
((
error
=
fetch_db_tables
(
thd
,
&
mysql
,
db
,
*
cur_table_res
)))
{
// we do not report the error - fetch_db_tables handles it
cleanup_mysql_results
(
db_res
,
cur_table_res
,
table_res
);
goto
err
;
}
}
cleanup_mysql_results
(
db_res
,
cur_table_res
-
1
,
table_res
);
// adjust position in the master
if
(
master_status_res
)
{
MYSQL_ROW
row
=
mc_mysql_fetch_row
(
master_status_res
);
/*
We need this check because the master may not be running with
log-bin, but it will still allow us to do all the steps
of LOAD DATA FROM MASTER - no reason to forbid it, really,
although it does not make much sense for the user to do it
*/
if
(
row
[
0
]
&&
row
[
1
])
{
strmake
(
glob_mi
.
log_file_name
,
row
[
0
],
sizeof
(
glob_mi
.
log_file_name
));
glob_mi
.
pos
=
atoi
(
row
[
1
]);
// atoi() is ok, since offset is <= 1GB
if
(
glob_mi
.
pos
<
4
)
glob_mi
.
pos
=
4
;
// don't hit the magic number
glob_mi
.
pending
=
0
;
flush_master_info
(
&
glob_mi
);
}
mc_mysql_free_result
(
master_status_res
);
}
if
(
mc_mysql_query
(
&
mysql
,
"UNLOCK TABLES"
,
0
))
{
net_printf
(
&
thd
->
net
,
error
=
ER_QUERY_ON_MASTER
,
mc_mysql_error
(
&
mysql
));
goto
err
;
}
}
err:
pthread_mutex_unlock
(
&
LOCK_slave
);
if
(
slave_was_running
)
start_slave
(
0
,
0
);
mc_mysql_close
(
&
mysql
);
// safe to call since we always do mc_mysql_init()
if
(
!
error
)
send_ok
(
&
thd
->
net
);
return
error
;
}
sql/repl_failsafe.h
View file @
f89418b4
...
...
@@ -2,6 +2,8 @@
#define REPL_FAILSAFE_H
#include "mysql.h"
#include "my_sys.h"
#include "slave.h"
typedef
enum
{
RPL_AUTH_MASTER
=
0
,
RPL_ACTIVE_SLAVE
,
RPL_IDLE_SLAVE
,
RPL_LOST_SOLDIER
,
RPL_TROOP_SOLDIER
,
...
...
@@ -19,4 +21,18 @@ pthread_handler_decl(handle_failsafe_rpl,arg);
void
change_rpl_status
(
RPL_STATUS
from_status
,
RPL_STATUS
to_status
);
int
find_recovery_captain
(
THD
*
thd
,
MYSQL
*
mysql
);
int
update_slave_list
(
MYSQL
*
mysql
);
extern
HASH
slave_list
;
int
load_master_data
(
THD
*
thd
);
int
connect_to_master
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
);
int
show_new_master
(
THD
*
thd
);
int
show_slave_hosts
(
THD
*
thd
);
int
translate_master
(
THD
*
thd
,
LEX_MASTER_INFO
*
mi
,
char
*
errmsg
);
void
init_slave_list
();
void
end_slave_list
();
int
register_slave
(
THD
*
thd
,
uchar
*
packet
,
uint
packet_length
);
void
unregister_slave
(
THD
*
thd
,
bool
only_mine
,
bool
need_mutex
);
#endif
sql/slave.cc
View file @
f89418b4
...
...
@@ -61,6 +61,8 @@ static int safe_sleep(THD* thd, int sec);
static
int
request_table_dump
(
MYSQL
*
mysql
,
const
char
*
db
,
const
char
*
table
);
static
int
create_table_from_dump
(
THD
*
thd
,
NET
*
net
,
const
char
*
db
,
const
char
*
table_name
);
static
int
check_master_version
(
MYSQL
*
mysql
,
MASTER_INFO
*
mi
);
char
*
rewrite_db
(
char
*
db
);
static
void
free_table_ent
(
TABLE_RULE_ENT
*
e
)
...
...
@@ -333,6 +335,54 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
return
1
;
}
static
int
check_master_version
(
MYSQL
*
mysql
,
MASTER_INFO
*
mi
)
{
MYSQL_RES
*
res
;
MYSQL_ROW
row
;
const
char
*
version
;
const
char
*
errmsg
=
0
;
if
(
mc_mysql_query
(
mysql
,
"SELECT VERSION()"
,
0
)
||
!
(
res
=
mc_mysql_store_result
(
mysql
)))
{
sql_print_error
(
"Error checking master version: %s"
,
mc_mysql_error
(
mysql
));
return
1
;
}
if
(
!
(
row
=
mc_mysql_fetch_row
(
res
)))
{
errmsg
=
"Master returned no rows for SELECT VERSION()"
;
goto
err
;
}
if
(
!
(
version
=
row
[
0
]))
{
errmsg
=
"Master reported NULL for the version"
;
goto
err
;
}
switch
(
*
version
)
{
case
'3'
:
mi
->
old_format
=
1
;
break
;
case
'4'
:
mi
->
old_format
=
0
;
break
;
default:
errmsg
=
"Master reported unrecognized MySQL version"
;
goto
err
;
}
err:
if
(
res
)
mc_mysql_free_result
(
res
);
if
(
errmsg
)
{
sql_print_error
(
errmsg
);
return
1
;
}
return
0
;
}
static
int
create_table_from_dump
(
THD
*
thd
,
NET
*
net
,
const
char
*
db
,
const
char
*
table_name
)
...
...
@@ -580,7 +630,7 @@ int init_master_info(MASTER_INFO* mi)
mi
->
inited
=
1
;
// now change the cache from READ to WRITE - must do this
// before flush_master_info
reinit_io_cache
(
&
mi
->
file
,
WRITE_CACHE
,
0L
,
0
,
1
);
reinit_io_cache
(
&
mi
->
file
,
WRITE_CACHE
,
0L
,
0
,
1
);
error
=
test
(
flush_master_info
(
mi
));
pthread_mutex_unlock
(
&
mi
->
lock
);
return
error
;
...
...
@@ -943,12 +993,14 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{
const
char
*
error_msg
;
Log_event
*
ev
=
Log_event
::
read_log_event
((
const
char
*
)
net
->
read_pos
+
1
,
event_len
,
&
error_msg
);
event_len
,
&
error_msg
,
mi
->
old_format
);
if
(
ev
)
{
int
type_code
=
ev
->
get_type_code
();
int
exec_res
;
if
(
ev
->
server_id
==
::
server_id
||
slave_skip_counter
)
if
(
ev
->
server_id
==
::
server_id
||
(
slave_skip_counter
&&
ev
->
get_type_code
()
!=
ROTATE_EVENT
))
{
if
(
type_code
==
LOAD_EVENT
)
skip_load_data_infile
(
net
);
...
...
@@ -1070,9 +1122,17 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
// register ourselves with the master
// if fails, this is not fatal - we just print the error message and go
// on with life
thd
->
proc_info
=
"Registering slave on master"
;
register_slave_on_master
(
mysql
);
update_slave_list
(
mysql
);
thd
->
proc_info
=
"Checking master version"
;
if
(
check_master_version
(
mysql
,
&
glob_mi
))
{
goto
err
;
}
if
(
!
glob_mi
.
old_format
)
{
thd
->
proc_info
=
"Registering slave on master"
;
if
(
register_slave_on_master
(
mysql
)
||
update_slave_list
(
mysql
))
goto
err
;
}
while
(
!
slave_killed
(
thd
))
{
...
...
sql/slave.h
View file @
f89418b4
...
...
@@ -23,8 +23,10 @@ typedef struct st_master_info
pthread_mutex_t
lock
;
pthread_cond_t
cond
;
bool
inited
;
bool
old_format
;
/* master binlog is in 3.23 format */
st_master_info
()
:
pending
(
0
),
fd
(
-
1
),
last_log_seq
(
0
),
inited
(
0
)
st_master_info
()
:
pending
(
0
),
fd
(
-
1
),
last_log_seq
(
0
),
inited
(
0
),
old_format
(
0
)
{
host
[
0
]
=
0
;
user
[
0
]
=
0
;
password
[
0
]
=
0
;
pthread_mutex_init
(
&
lock
,
MY_MUTEX_INIT_FAST
);
...
...
sql/sql_class.h
View file @
f89418b4
...
...
@@ -72,15 +72,18 @@ class MYSQL_LOG {
// we should not try to rotate it or write any rotation events
// the user should use FLUSH MASTER instead of FLUSH LOGS for
// purging
enum
cache_type
io_cache_type
;
bool
need_start_event
;
friend
class
Log_event
;
public:
MYSQL_LOG
();
~
MYSQL_LOG
();
pthread_mutex_t
*
get_log_lock
()
{
return
&
LOCK_log
;
}
void
set_need_start_event
()
{
need_start_event
=
1
;
}
void
set_index_file_name
(
const
char
*
index_file_name
=
0
);
void
init
(
enum_log_type
log_type_arg
);
void
init
(
enum_log_type
log_type_arg
,
enum
cache_type
io_cache_type_arg
=
WRITE_CACHE
);
void
open
(
const
char
*
log_name
,
enum_log_type
log_type
,
const
char
*
new_name
=
0
);
void
new_file
(
bool
inside_mutex
=
0
);
...
...
sql/sql_parse.cc
View file @
f89418b4
...
...
@@ -22,6 +22,7 @@
#include "mysql_priv.h"
#include "sql_acl.h"
#include "sql_repl.h"
#include "repl_failsafe.h"
#include <m_ctype.h>
#include <thr_alarm.h>
#include <myisam.h>
...
...
sql/sql_repl.cc
View file @
f89418b4
...
...
@@ -24,58 +24,15 @@
#include <thr_alarm.h>
#include <my_dir.h>
#define SLAVE_LIST_CHUNK 128
#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
extern
const
char
*
any_db
;
extern
pthread_handler_decl
(
handle_slave
,
arg
);
HASH
slave_list
;
#ifndef DBUG_OFF
int
max_binlog_dump_events
=
0
;
// unlimited
bool
opt_sporadic_binlog_dump_fail
=
0
;
static
int
binlog_dump_count
=
0
;
#endif
#ifdef SIGNAL_WITH_VIO_CLOSE
#define KICK_SLAVE { slave_thd->close_active_vio(); \
thr_alarm_kill(slave_real_id); }
#else
#define KICK_SLAVE thr_alarm_kill(slave_real_id);
#endif
static
Slave_log_event
*
find_slave_event
(
IO_CACHE
*
log
,
const
char
*
log_file_name
,
char
*
errmsg
);
static
uint32
*
slave_list_key
(
SLAVE_INFO
*
si
,
uint
*
len
,
my_bool
not_used
__attribute__
((
unused
)))
{
*
len
=
4
;
return
&
si
->
server_id
;
}
static
void
slave_info_free
(
void
*
s
)
{
my_free
((
gptr
)
s
,
MYF
(
MY_WME
));
}
void
init_slave_list
()
{
hash_init
(
&
slave_list
,
SLAVE_LIST_CHUNK
,
0
,
0
,
(
hash_get_key
)
slave_list_key
,
slave_info_free
,
0
);
pthread_mutex_init
(
&
LOCK_slave_list
,
MY_MUTEX_INIT_FAST
);
}
void
end_slave_list
()
{
pthread_mutex_lock
(
&
LOCK_slave_list
);
hash_free
(
&
slave_list
);
pthread_mutex_unlock
(
&
LOCK_slave_list
);
pthread_mutex_destroy
(
&
LOCK_slave_list
);
}
static
int
fake_rotate_event
(
NET
*
net
,
String
*
packet
,
char
*
log_file_name
,
const
char
**
errmsg
)
{
...
...
@@ -104,69 +61,6 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
return
0
;
}
#define get_object(p, obj) \
{\
uint len = (uint)*p++; \
if (p + len > p_end || len >= sizeof(obj)) \
goto err; \
strmake(obj,(char*) p,len); \
p+= len; \
}\
void
unregister_slave
(
THD
*
thd
,
bool
only_mine
,
bool
need_mutex
)
{
if
(
need_mutex
)
pthread_mutex_lock
(
&
LOCK_slave_list
);
if
(
thd
->
server_id
)
{
SLAVE_INFO
*
old_si
;
if
((
old_si
=
(
SLAVE_INFO
*
)
hash_search
(
&
slave_list
,
(
byte
*
)
&
thd
->
server_id
,
4
))
&&
(
!
only_mine
||
old_si
->
thd
==
thd
))
hash_delete
(
&
slave_list
,
(
byte
*
)
old_si
);
}
if
(
need_mutex
)
pthread_mutex_unlock
(
&
LOCK_slave_list
);
}
int
register_slave
(
THD
*
thd
,
uchar
*
packet
,
uint
packet_length
)
{
SLAVE_INFO
*
si
;
int
res
=
1
;
uchar
*
p
=
packet
,
*
p_end
=
packet
+
packet_length
;
if
(
check_access
(
thd
,
FILE_ACL
,
any_db
))
return
1
;
if
(
!
(
si
=
(
SLAVE_INFO
*
)
my_malloc
(
sizeof
(
SLAVE_INFO
),
MYF
(
MY_WME
))))
goto
err
;
thd
->
server_id
=
si
->
server_id
=
uint4korr
(
p
);
p
+=
4
;
get_object
(
p
,
si
->
host
);
get_object
(
p
,
si
->
user
);
get_object
(
p
,
si
->
password
);
si
->
port
=
uint2korr
(
p
);
p
+=
2
;
si
->
rpl_recovery_rank
=
uint4korr
(
p
);
p
+=
4
;
if
(
!
(
si
->
master_id
=
uint4korr
(
p
)))
si
->
master_id
=
server_id
;
si
->
thd
=
thd
;
pthread_mutex_lock
(
&
LOCK_slave_list
);
unregister_slave
(
thd
,
0
,
0
);
res
=
hash_insert
(
&
slave_list
,
(
byte
*
)
si
);
pthread_mutex_unlock
(
&
LOCK_slave_list
);
return
res
;
err:
if
(
si
)
my_free
((
gptr
)
si
,
MYF
(
MY_WME
));
return
res
;
}
static
int
send_file
(
THD
*
thd
)
{
NET
*
net
=
&
thd
->
net
;
...
...
@@ -252,6 +146,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
if
(
my_b_read
(
log
,
(
byte
*
)
magic
,
sizeof
(
magic
)))
{
*
errmsg
=
"I/O error reading the header from the binary log"
;
sql_print_error
(
"%s, errno=%d, io cache code=%d"
,
*
errmsg
,
my_errno
,
log
->
error
);
goto
err
;
}
if
(
memcmp
(
magic
,
BINLOG_MAGIC
,
sizeof
(
magic
)))
...
...
@@ -887,8 +783,13 @@ void reset_master()
}
LOG_INFO
linfo
;
pthread_mutex_t
*
log_lock
=
mysql_bin_log
.
get_log_lock
();
pthread_mutex_lock
(
log_lock
);
if
(
mysql_bin_log
.
find_first_log
(
&
linfo
,
""
))
{
pthread_mutex_unlock
(
log_lock
);
return
;
}
for
(;;)
{
...
...
@@ -898,7 +799,9 @@ void reset_master()
}
mysql_bin_log
.
close
(
1
);
// exiting close
my_delete
(
mysql_bin_log
.
get_index_fname
(),
MYF
(
MY_WME
));
mysql_bin_log
.
set_need_start_event
();
mysql_bin_log
.
open
(
opt_bin_logname
,
LOG_BIN
);
pthread_mutex_unlock
(
log_lock
);
}
...
...
@@ -915,242 +818,6 @@ int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
return
-
1
;
}
static
inline
int
cmp_master_pos
(
Slave_log_event
*
sev
,
LEX_MASTER_INFO
*
mi
)
{
return
cmp_master_pos
(
sev
->
master_log
,
sev
->
master_pos
,
mi
->
log_file_name
,
mi
->
pos
);
}
static
int
find_target_pos
(
LEX_MASTER_INFO
*
mi
,
IO_CACHE
*
log
,
char
*
errmsg
)
{
uint32
log_seq
=
mi
->
last_log_seq
;
uint32
target_server_id
=
mi
->
server_id
;
for
(;;)
{
Log_event
*
ev
;
if
(
!
(
ev
=
Log_event
::
read_log_event
(
log
,
0
)))
{
if
(
log
->
error
>
0
)
strmov
(
errmsg
,
"Binary log truncated in the middle of event"
);
else
if
(
log
->
error
<
0
)
strmov
(
errmsg
,
"I/O error reading binary log"
);
else
strmov
(
errmsg
,
"Could not find target event in the binary log"
);
return
1
;
}
if
(
ev
->
log_seq
==
log_seq
&&
ev
->
server_id
==
target_server_id
)
{
delete
ev
;
mi
->
pos
=
my_b_tell
(
log
);
return
0
;
}
delete
ev
;
}
}
int
translate_master
(
THD
*
thd
,
LEX_MASTER_INFO
*
mi
,
char
*
errmsg
)
{
LOG_INFO
linfo
;
char
search_file_name
[
FN_REFLEN
],
last_log_name
[
FN_REFLEN
];
IO_CACHE
log
;
File
file
=
-
1
,
last_file
=
-
1
;
pthread_mutex_t
*
log_lock
;
const
char
*
errmsg_p
;
Slave_log_event
*
sev
=
0
;
my_off_t
last_pos
=
0
;
int
error
=
1
;
int
cmp_res
;
LINT_INIT
(
cmp_res
);
if
(
!
mysql_bin_log
.
is_open
())
{
strmov
(
errmsg
,
"Binary log is not open"
);
return
1
;
}
if
(
!
server_id_supplied
)
{
strmov
(
errmsg
,
"Misconfigured master - server id was not set"
);
return
1
;
}
linfo
.
index_file_offset
=
0
;
search_file_name
[
0
]
=
0
;
if
(
mysql_bin_log
.
find_first_log
(
&
linfo
,
search_file_name
))
{
strmov
(
errmsg
,
"Could not find first log"
);
return
1
;
}
thd
->
current_linfo
=
&
linfo
;
bzero
((
char
*
)
&
log
,
sizeof
(
log
));
log_lock
=
mysql_bin_log
.
get_log_lock
();
pthread_mutex_lock
(
log_lock
);
for
(;;)
{
if
((
file
=
open_binlog
(
&
log
,
linfo
.
log_file_name
,
&
errmsg_p
))
<
0
)
{
strmov
(
errmsg
,
errmsg_p
);
goto
err
;
}
if
(
!
(
sev
=
find_slave_event
(
&
log
,
linfo
.
log_file_name
,
errmsg
)))
goto
err
;
cmp_res
=
cmp_master_pos
(
sev
,
mi
);
delete
sev
;
if
(
!
cmp_res
)
{
/* Copy basename */
fn_format
(
mi
->
log_file_name
,
linfo
.
log_file_name
,
""
,
""
,
1
);
mi
->
pos
=
my_b_tell
(
&
log
);
goto
mi_inited
;
}
else
if
(
cmp_res
>
0
)
{
if
(
!
last_pos
)
{
strmov
(
errmsg
,
"Slave event in first log points past the target position"
);
goto
err
;
}
end_io_cache
(
&
log
);
(
void
)
my_close
(
file
,
MYF
(
MY_WME
));
if
(
init_io_cache
(
&
log
,
(
file
=
last_file
),
IO_SIZE
,
READ_CACHE
,
0
,
0
,
MYF
(
MY_WME
)))
{
errmsg
[
0
]
=
0
;
goto
err
;
}
break
;
}
strmov
(
last_log_name
,
linfo
.
log_file_name
);
last_pos
=
my_b_tell
(
&
log
);
switch
(
mysql_bin_log
.
find_next_log
(
&
linfo
))
{
case
LOG_INFO_EOF
:
if
(
last_file
>=
0
)
(
void
)
my_close
(
last_file
,
MYF
(
MY_WME
));
last_file
=
-
1
;
goto
found_log
;
case
0
:
break
;
default:
strmov
(
errmsg
,
"Error reading log index"
);
goto
err
;
}
end_io_cache
(
&
log
);
if
(
last_file
>=
0
)
(
void
)
my_close
(
last_file
,
MYF
(
MY_WME
));
last_file
=
file
;
}
found_log:
my_b_seek
(
&
log
,
last_pos
);
if
(
find_target_pos
(
mi
,
&
log
,
errmsg
))
goto
err
;
fn_format
(
mi
->
log_file_name
,
last_log_name
,
""
,
""
,
1
);
/* Copy basename */
mi_inited:
error
=
0
;
err:
pthread_mutex_unlock
(
log_lock
);
end_io_cache
(
&
log
);
pthread_mutex_lock
(
&
LOCK_thread_count
);
thd
->
current_linfo
=
0
;
pthread_mutex_unlock
(
&
LOCK_thread_count
);
if
(
file
>=
0
)
(
void
)
my_close
(
file
,
MYF
(
MY_WME
));
if
(
last_file
>=
0
&&
last_file
!=
file
)
(
void
)
my_close
(
last_file
,
MYF
(
MY_WME
));
return
error
;
}
// caller must delete result when done
static
Slave_log_event
*
find_slave_event
(
IO_CACHE
*
log
,
const
char
*
log_file_name
,
char
*
errmsg
)
{
Log_event
*
ev
;
if
(
!
(
ev
=
Log_event
::
read_log_event
(
log
,
0
)))
{
my_snprintf
(
errmsg
,
SLAVE_ERRMSG_SIZE
,
"Error reading start event in log '%s'"
,
(
char
*
)
log_file_name
);
return
0
;
}
delete
ev
;
if
(
!
(
ev
=
Log_event
::
read_log_event
(
log
,
0
)))
{
my_snprintf
(
errmsg
,
SLAVE_ERRMSG_SIZE
,
"Error reading slave event in log '%s'"
,
(
char
*
)
log_file_name
);
return
0
;
}
if
(
ev
->
get_type_code
()
!=
SLAVE_EVENT
)
{
my_snprintf
(
errmsg
,
SLAVE_ERRMSG_SIZE
,
"Second event in log '%s' is not slave event"
,
(
char
*
)
log_file_name
);
delete
ev
;
return
0
;
}
return
(
Slave_log_event
*
)
ev
;
}
int
show_new_master
(
THD
*
thd
)
{
DBUG_ENTER
(
"show_new_master"
);
List
<
Item
>
field_list
;
char
errmsg
[
SLAVE_ERRMSG_SIZE
];
LEX_MASTER_INFO
*
lex_mi
=
&
thd
->
lex
.
mi
;
errmsg
[
0
]
=
0
;
// Safety
if
(
translate_master
(
thd
,
lex_mi
,
errmsg
))
{
if
(
errmsg
[
0
])
net_printf
(
&
thd
->
net
,
ER_ERROR_WHEN_EXECUTING_COMMAND
,
"SHOW NEW MASTER"
,
errmsg
);
else
send_error
(
&
thd
->
net
,
0
);
DBUG_RETURN
(
1
);
}
else
{
String
*
packet
=
&
thd
->
packet
;
field_list
.
push_back
(
new
Item_empty_string
(
"Log_name"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Log_pos"
,
20
));
if
(
send_fields
(
thd
,
field_list
,
1
))
DBUG_RETURN
(
-
1
);
packet
->
length
(
0
);
net_store_data
(
packet
,
lex_mi
->
log_file_name
);
net_store_data
(
packet
,
(
longlong
)
lex_mi
->
pos
);
if
(
my_net_write
(
&
thd
->
net
,
packet
->
ptr
(),
packet
->
length
()))
DBUG_RETURN
(
-
1
);
send_eof
(
&
thd
->
net
);
DBUG_RETURN
(
0
);
}
}
int
show_binlog_events
(
THD
*
thd
)
{
DBUG_ENTER
(
"show_binlog_events"
);
...
...
@@ -1202,7 +869,8 @@ int show_binlog_events(THD* thd)
pthread_mutex_lock
(
mysql_bin_log
.
get_log_lock
());
my_b_seek
(
&
log
,
pos
);
for
(
event_count
=
0
;
(
ev
=
Log_event
::
read_log_event
(
&
log
,
0
));
)
for
(
event_count
=
0
;
(
ev
=
Log_event
::
read_log_event
(
&
log
,(
pthread_mutex_t
*
)
0
,
0
));
)
{
if
(
event_count
>=
limit_start
&&
ev
->
net_send
(
thd
,
linfo
.
log_file_name
,
pos
))
...
...
@@ -1247,56 +915,6 @@ int show_binlog_events(THD* thd)
DBUG_RETURN
(
0
);
}
int
show_slave_hosts
(
THD
*
thd
)
{
List
<
Item
>
field_list
;
NET
*
net
=
&
thd
->
net
;
String
*
packet
=
&
thd
->
packet
;
DBUG_ENTER
(
"show_slave_hosts"
);
field_list
.
push_back
(
new
Item_empty_string
(
"Server_id"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Host"
,
20
));
if
(
opt_show_slave_auth_info
)
{
field_list
.
push_back
(
new
Item_empty_string
(
"User"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Password"
,
20
));
}
field_list
.
push_back
(
new
Item_empty_string
(
"Port"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Rpl_recovery_rank"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Master_id"
,
20
));
if
(
send_fields
(
thd
,
field_list
,
1
))
DBUG_RETURN
(
-
1
);
pthread_mutex_lock
(
&
LOCK_slave_list
);
for
(
uint
i
=
0
;
i
<
slave_list
.
records
;
++
i
)
{
SLAVE_INFO
*
si
=
(
SLAVE_INFO
*
)
hash_element
(
&
slave_list
,
i
);
packet
->
length
(
0
);
net_store_data
(
packet
,
si
->
server_id
);
net_store_data
(
packet
,
si
->
host
);
if
(
opt_show_slave_auth_info
)
{
net_store_data
(
packet
,
si
->
user
);
net_store_data
(
packet
,
si
->
password
);
}
net_store_data
(
packet
,
(
uint32
)
si
->
port
);
net_store_data
(
packet
,
si
->
rpl_recovery_rank
);
net_store_data
(
packet
,
si
->
master_id
);
if
(
my_net_write
(
net
,
(
char
*
)
packet
->
ptr
(),
packet
->
length
()))
{
pthread_mutex_unlock
(
&
LOCK_slave_list
);
DBUG_RETURN
(
-
1
);
}
}
pthread_mutex_unlock
(
&
LOCK_slave_list
);
send_eof
(
net
);
DBUG_RETURN
(
0
);
}
int
show_binlog_info
(
THD
*
thd
)
{
DBUG_ENTER
(
"show_binlog_info"
);
...
...
@@ -1402,230 +1020,6 @@ int show_binlogs(THD* thd)
return
1
;
}
int
connect_to_master
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
)
{
if
(
!
mc_mysql_connect
(
mysql
,
mi
->
host
,
mi
->
user
,
mi
->
password
,
0
,
mi
->
port
,
0
,
0
))
{
sql_print_error
(
"Connection to master failed: %s"
,
mc_mysql_error
(
mysql
));
return
1
;
}
return
0
;
}
static
inline
void
cleanup_mysql_results
(
MYSQL_RES
*
db_res
,
MYSQL_RES
**
cur
,
MYSQL_RES
**
start
)
{
for
(
;
cur
>=
start
;
--
cur
)
{
if
(
*
cur
)
mc_mysql_free_result
(
*
cur
);
}
mc_mysql_free_result
(
db_res
);
}
static
inline
int
fetch_db_tables
(
THD
*
thd
,
MYSQL
*
mysql
,
const
char
*
db
,
MYSQL_RES
*
table_res
)
{
MYSQL_ROW
row
;
for
(
row
=
mc_mysql_fetch_row
(
table_res
);
row
;
row
=
mc_mysql_fetch_row
(
table_res
))
{
TABLE_LIST
table
;
const
char
*
table_name
=
row
[
0
];
int
error
;
if
(
table_rules_on
)
{
table
.
next
=
0
;
table
.
db
=
(
char
*
)
db
;
table
.
real_name
=
(
char
*
)
table_name
;
table
.
updating
=
1
;
if
(
!
tables_ok
(
thd
,
&
table
))
continue
;
}
if
((
error
=
fetch_nx_table
(
thd
,
db
,
table_name
,
&
glob_mi
,
mysql
)))
return
error
;
}
return
0
;
}
int
load_master_data
(
THD
*
thd
)
{
MYSQL
mysql
;
MYSQL_RES
*
master_status_res
=
0
;
bool
slave_was_running
=
0
;
int
error
=
0
;
mc_mysql_init
(
&
mysql
);
// we do not want anyone messing with the slave at all for the entire
// duration of the data load;
pthread_mutex_lock
(
&
LOCK_slave
);
// first, kill the slave
if
((
slave_was_running
=
slave_running
))
{
abort_slave
=
1
;
KICK_SLAVE
;
thd
->
proc_info
=
"waiting for slave to die"
;
while
(
slave_running
)
pthread_cond_wait
(
&
COND_slave_stopped
,
&
LOCK_slave
);
// wait until done
}
if
(
connect_to_master
(
thd
,
&
mysql
,
&
glob_mi
))
{
net_printf
(
&
thd
->
net
,
error
=
ER_CONNECT_TO_MASTER
,
mc_mysql_error
(
&
mysql
));
goto
err
;
}
// now that we are connected, get all database and tables in each
{
MYSQL_RES
*
db_res
,
**
table_res
,
**
table_res_end
,
**
cur_table_res
;
uint
num_dbs
;
if
(
mc_mysql_query
(
&
mysql
,
"show databases"
,
0
)
||
!
(
db_res
=
mc_mysql_store_result
(
&
mysql
)))
{
net_printf
(
&
thd
->
net
,
error
=
ER_QUERY_ON_MASTER
,
mc_mysql_error
(
&
mysql
));
goto
err
;
}
if
(
!
(
num_dbs
=
(
uint
)
mc_mysql_num_rows
(
db_res
)))
goto
err
;
// in theory, the master could have no databases at all
// and run with skip-grant
if
(
!
(
table_res
=
(
MYSQL_RES
**
)
thd
->
alloc
(
num_dbs
*
sizeof
(
MYSQL_RES
*
))))
{
net_printf
(
&
thd
->
net
,
error
=
ER_OUTOFMEMORY
);
goto
err
;
}
// this is a temporary solution until we have online backup
// capabilities - to be replaced once online backup is working
// we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
// can to minimize the lock time
if
(
mc_mysql_query
(
&
mysql
,
"FLUSH TABLES WITH READ LOCK"
,
0
)
||
mc_mysql_query
(
&
mysql
,
"SHOW MASTER STATUS"
,
0
)
||
!
(
master_status_res
=
mc_mysql_store_result
(
&
mysql
)))
{
net_printf
(
&
thd
->
net
,
error
=
ER_QUERY_ON_MASTER
,
mc_mysql_error
(
&
mysql
));
goto
err
;
}
// go through every table in every database, and if the replication
// rules allow replicating it, get it
table_res_end
=
table_res
+
num_dbs
;
for
(
cur_table_res
=
table_res
;
cur_table_res
<
table_res_end
;
cur_table_res
++
)
{
// since we know how many rows we have, this can never be NULL
MYSQL_ROW
row
=
mc_mysql_fetch_row
(
db_res
);
char
*
db
=
row
[
0
];
/*
Do not replicate databases excluded by rules
also skip mysql database - in most cases the user will
mess up and not exclude mysql database with the rules when
he actually means to - in this case, he is up for a surprise if
his priv tables get dropped and downloaded from master
TO DO - add special option, not enabled
by default, to allow inclusion of mysql database into load
data from master
*/
if
(
!
db_ok
(
db
,
replicate_do_db
,
replicate_ignore_db
)
||
!
strcmp
(
db
,
"mysql"
))
{
*
cur_table_res
=
0
;
continue
;
}
if
(
mysql_rm_db
(
thd
,
db
,
1
,
1
)
||
mysql_create_db
(
thd
,
db
,
0
,
1
))
{
send_error
(
&
thd
->
net
,
0
,
0
);
cleanup_mysql_results
(
db_res
,
cur_table_res
-
1
,
table_res
);
goto
err
;
}
if
(
mc_mysql_select_db
(
&
mysql
,
db
)
||
mc_mysql_query
(
&
mysql
,
"show tables"
,
0
)
||
!
(
*
cur_table_res
=
mc_mysql_store_result
(
&
mysql
)))
{
net_printf
(
&
thd
->
net
,
error
=
ER_QUERY_ON_MASTER
,
mc_mysql_error
(
&
mysql
));
cleanup_mysql_results
(
db_res
,
cur_table_res
-
1
,
table_res
);
goto
err
;
}
if
((
error
=
fetch_db_tables
(
thd
,
&
mysql
,
db
,
*
cur_table_res
)))
{
// we do not report the error - fetch_db_tables handles it
cleanup_mysql_results
(
db_res
,
cur_table_res
,
table_res
);
goto
err
;
}
}
cleanup_mysql_results
(
db_res
,
cur_table_res
-
1
,
table_res
);
// adjust position in the master
if
(
master_status_res
)
{
MYSQL_ROW
row
=
mc_mysql_fetch_row
(
master_status_res
);
/*
We need this check because the master may not be running with
log-bin, but it will still allow us to do all the steps
of LOAD DATA FROM MASTER - no reason to forbid it, really,
although it does not make much sense for the user to do it
*/
if
(
row
[
0
]
&&
row
[
1
])
{
strmake
(
glob_mi
.
log_file_name
,
row
[
0
],
sizeof
(
glob_mi
.
log_file_name
));
glob_mi
.
pos
=
atoi
(
row
[
1
]);
// atoi() is ok, since offset is <= 1GB
if
(
glob_mi
.
pos
<
4
)
glob_mi
.
pos
=
4
;
// don't hit the magic number
glob_mi
.
pending
=
0
;
flush_master_info
(
&
glob_mi
);
}
mc_mysql_free_result
(
master_status_res
);
}
if
(
mc_mysql_query
(
&
mysql
,
"UNLOCK TABLES"
,
0
))
{
net_printf
(
&
thd
->
net
,
error
=
ER_QUERY_ON_MASTER
,
mc_mysql_error
(
&
mysql
));
goto
err
;
}
}
err:
pthread_mutex_unlock
(
&
LOCK_slave
);
if
(
slave_was_running
)
start_slave
(
0
,
0
);
mc_mysql_close
(
&
mysql
);
// safe to call since we always do mc_mysql_init()
if
(
!
error
)
send_ok
(
&
thd
->
net
);
return
error
;
}
int
log_loaded_block
(
IO_CACHE
*
file
)
{
LOAD_FILE_INFO
*
lf_info
;
...
...
sql/sql_repl.h
View file @
f89418b4
...
...
@@ -15,7 +15,6 @@ typedef struct st_slave_info
}
SLAVE_INFO
;
extern
bool
opt_show_slave_auth_info
,
opt_old_rpl_compat
;
extern
HASH
slave_list
;
extern
char
*
master_host
;
extern
my_string
opt_bin_logname
,
master_info_file
;
extern
uint32
server_id
;
...
...
@@ -27,26 +26,24 @@ extern int max_binlog_dump_events;
extern
bool
opt_sporadic_binlog_dump_fail
;
#endif
#ifdef SIGNAL_WITH_VIO_CLOSE
#define KICK_SLAVE { slave_thd->close_active_vio(); \
thr_alarm_kill(slave_real_id); }
#else
#define KICK_SLAVE thr_alarm_kill(slave_real_id);
#endif
File
open_binlog
(
IO_CACHE
*
log
,
const
char
*
log_file_name
,
const
char
**
errmsg
);
int
start_slave
(
THD
*
thd
=
0
,
bool
net_report
=
1
);
int
stop_slave
(
THD
*
thd
=
0
,
bool
net_report
=
1
);
int
load_master_data
(
THD
*
thd
);
int
connect_to_master
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
);
int
change_master
(
THD
*
thd
);
int
show_new_master
(
THD
*
thd
);
int
show_slave_hosts
(
THD
*
thd
);
int
show_binlog_events
(
THD
*
thd
);
int
translate_master
(
THD
*
thd
,
LEX_MASTER_INFO
*
mi
,
char
*
errmsg
);
int
cmp_master_pos
(
const
char
*
log_file_name1
,
ulonglong
log_pos1
,
const
char
*
log_file_name2
,
ulonglong
log_pos2
);
void
reset_slave
();
void
reset_master
();
void
init_slave_list
();
void
end_slave_list
();
int
register_slave
(
THD
*
thd
,
uchar
*
packet
,
uint
packet_length
);
void
unregister_slave
(
THD
*
thd
,
bool
only_mine
,
bool
need_mutex
);
int
purge_master_logs
(
THD
*
thd
,
const
char
*
to_log
);
bool
log_in_use
(
const
char
*
log_name
);
void
adjust_linfo_offsets
(
my_off_t
purge_offset
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment