Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
6d88799e
Commit
6d88799e
authored
Oct 29, 2002
by
nick@mysql.com
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Reordered functions--grouped by class now.
Added comment blocks.
parent
cb1d72e4
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
1746 additions
and
1040 deletions
+1746
-1040
.bzrignore
.bzrignore
+2
-0
sql/log_event.cc
sql/log_event.cc
+1647
-1030
sql/log_event.h
sql/log_event.h
+97
-10
No files found.
.bzrignore
View file @
6d88799e
...
@@ -513,3 +513,5 @@ innobase/stamp-h1
...
@@ -513,3 +513,5 @@ innobase/stamp-h1
myisam/rt_test.MYD
myisam/rt_test.MYD
myisam/rt_test.MYI
myisam/rt_test.MYI
stamp-h1
stamp-h1
scripts/fill_func_tables
scripts/fill_func_tables.sql
sql/log_event.cc
View file @
6d88799e
...
@@ -26,6 +26,11 @@
...
@@ -26,6 +26,11 @@
#include <assert.h>
#include <assert.h>
/*****************************************************************************
my_b_safe_write()
****************************************************************************/
inline
int
my_b_safe_write
(
IO_CACHE
*
file
,
const
byte
*
buf
,
inline
int
my_b_safe_write
(
IO_CACHE
*
file
,
const
byte
*
buf
,
int
len
)
int
len
)
{
{
...
@@ -40,6 +45,11 @@ inline int my_b_safe_write(IO_CACHE* file, const byte *buf,
...
@@ -40,6 +45,11 @@ inline int my_b_safe_write(IO_CACHE* file, const byte *buf,
return
my_b_write
(
file
,
buf
,
len
);
return
my_b_write
(
file
,
buf
,
len
);
}
}
/*****************************************************************************
pretty_print_str()
****************************************************************************/
#ifdef MYSQL_CLIENT
#ifdef MYSQL_CLIENT
static
void
pretty_print_str
(
FILE
*
file
,
char
*
str
,
int
len
)
static
void
pretty_print_str
(
FILE
*
file
,
char
*
str
,
int
len
)
{
{
...
@@ -63,16 +73,26 @@ static void pretty_print_str(FILE* file, char* str, int len)
...
@@ -63,16 +73,26 @@ static void pretty_print_str(FILE* file, char* str, int len)
}
}
fputc
(
'\''
,
file
);
fputc
(
'\''
,
file
);
}
}
#endif
#endif
// MYSQL_CLIENT
#ifndef MYSQL_CLIENT
/*****************************************************************************
ignored_error_code()
****************************************************************************/
#ifndef MYSQL_CLIENT
inline
int
ignored_error_code
(
int
err_code
)
inline
int
ignored_error_code
(
int
err_code
)
{
{
return
use_slave_mask
&&
bitmap_is_set
(
&
slave_error_mask
,
err_code
);
return
use_slave_mask
&&
bitmap_is_set
(
&
slave_error_mask
,
err_code
);
}
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
pretty_print_str()
****************************************************************************/
#ifndef MYSQL_CLIENT
static
void
pretty_print_str
(
String
*
packet
,
char
*
str
,
int
len
)
static
void
pretty_print_str
(
String
*
packet
,
char
*
str
,
int
len
)
{
{
char
*
end
=
str
+
len
;
char
*
end
=
str
+
len
;
...
@@ -95,8 +115,14 @@ static void pretty_print_str(String* packet, char* str, int len)
...
@@ -95,8 +115,14 @@ static void pretty_print_str(String* packet, char* str, int len)
}
}
packet
->
append
(
'\''
);
packet
->
append
(
'\''
);
}
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
slave_load_file_stem()
****************************************************************************/
#ifndef MYSQL_CLIENT
static
inline
char
*
slave_load_file_stem
(
char
*
buf
,
uint
file_id
,
static
inline
char
*
slave_load_file_stem
(
char
*
buf
,
uint
file_id
,
int
event_server_id
)
int
event_server_id
)
{
{
...
@@ -108,9 +134,81 @@ static inline char* slave_load_file_stem(char*buf, uint file_id,
...
@@ -108,9 +134,81 @@ static inline char* slave_load_file_stem(char*buf, uint file_id,
*
buf
++
=
'-'
;
*
buf
++
=
'-'
;
return
int10_to_str
(
file_id
,
buf
,
10
);
return
int10_to_str
(
file_id
,
buf
,
10
);
}
}
#endif // !MYSQL_CLIENT
#endif
/*****************************************************************************
cleanup_load_tmpdir()
Delete all temporary files used for SQL_LOAD.
TODO
- When we get a 'server start' event, we should only remove
the files associated with the server id that just started.
Easily fixable by adding server_id as a prefix to the log files.
****************************************************************************/
#ifndef MYSQL_CLIENT
static
void
cleanup_load_tmpdir
()
{
MY_DIR
*
dirp
;
FILEINFO
*
file
;
uint
i
;
if
(
!
(
dirp
=
my_dir
(
slave_load_tmpdir
,
MYF
(
MY_WME
))))
return
;
for
(
i
=
0
;
i
<
(
uint
)
dirp
->
number_off_files
;
i
++
)
{
file
=
dirp
->
dir_entry
+
i
;
if
(
is_prefix
(
file
->
name
,
"SQL_LOAD-"
))
my_delete
(
file
->
name
,
MYF
(
0
));
}
my_dirend
(
dirp
);
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
write_str()
****************************************************************************/
static
bool
write_str
(
IO_CACHE
*
file
,
char
*
str
,
byte
length
)
{
return
(
my_b_safe_write
(
file
,
&
length
,
1
)
||
my_b_safe_write
(
file
,
(
byte
*
)
str
,
(
int
)
length
));
}
/*****************************************************************************
read_str()
****************************************************************************/
static
inline
int
read_str
(
char
*
&
buf
,
char
*
buf_end
,
char
*
&
str
,
uint8
&
len
)
{
if
(
buf
+
(
uint
)
(
uchar
)
*
buf
>=
buf_end
)
return
1
;
len
=
(
uint8
)
*
buf
;
str
=
buf
+
1
;
buf
+=
(
uint
)
len
+
1
;
return
0
;
}
/*****************************************************************************
*****************************************************************************
Log_event methods
*****************************************************************************
****************************************************************************/
/*****************************************************************************
Log_event::get_type_str()
****************************************************************************/
const
char
*
Log_event
::
get_type_str
()
const
char
*
Log_event
::
get_type_str
()
{
{
switch
(
get_type_code
())
{
switch
(
get_type_code
())
{
...
@@ -131,6 +229,11 @@ const char* Log_event::get_type_str()
...
@@ -131,6 +229,11 @@ const char* Log_event::get_type_str()
}
}
}
}
/*****************************************************************************
Log_event::Log_event()
****************************************************************************/
#ifndef MYSQL_CLIENT
#ifndef MYSQL_CLIENT
Log_event
::
Log_event
(
THD
*
thd_arg
,
uint16
flags_arg
)
Log_event
::
Log_event
(
THD
*
thd_arg
,
uint16
flags_arg
)
:
exec_time
(
0
),
flags
(
flags_arg
),
cached_event_len
(
0
),
:
exec_time
(
0
),
flags
(
flags_arg
),
cached_event_len
(
0
),
...
@@ -146,39 +249,16 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg)
...
@@ -146,39 +249,16 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg)
{
{
server_id
=
::
server_id
;
server_id
=
::
server_id
;
when
=
time
(
NULL
);
when
=
time
(
NULL
);
log_pos
=
0
;
log_pos
=
0
;
}
}
}
}
#endif // !MYSQL_CLIENT
/*
/*****************************************************************************
Delete all temporary files used for SQL_LOAD.
TODO
- When we get a 'server start' event, we should only remove
the files associated with the server id that just started.
Easily fixable by adding server_id as a prefix to the log files.
*/
static
void
cleanup_load_tmpdir
()
{
MY_DIR
*
dirp
;
FILEINFO
*
file
;
uint
i
;
if
(
!
(
dirp
=
my_dir
(
slave_load_tmpdir
,
MYF
(
MY_WME
))))
return
;
for
(
i
=
0
;
i
<
(
uint
)
dirp
->
number_off_files
;
i
++
)
{
file
=
dirp
->
dir_entry
+
i
;
if
(
is_prefix
(
file
->
name
,
"SQL_LOAD-"
))
my_delete
(
file
->
name
,
MYF
(
0
));
}
my_dirend
(
dirp
);
}
#endif
Log_event::Log_event()
****************************************************************************/
Log_event
::
Log_event
(
const
char
*
buf
,
bool
old_format
)
Log_event
::
Log_event
(
const
char
*
buf
,
bool
old_format
)
:
cached_event_len
(
0
),
temp_buf
(
0
)
:
cached_event_len
(
0
),
temp_buf
(
0
)
{
{
...
@@ -202,6 +282,11 @@ Log_event::Log_event(const char* buf, bool old_format)
...
@@ -202,6 +282,11 @@ Log_event::Log_event(const char* buf, bool old_format)
#ifndef MYSQL_CLIENT
#ifndef MYSQL_CLIENT
/*****************************************************************************
Log_event::exec_event()
****************************************************************************/
int
Log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
int
Log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
{
{
if
(
rli
)
// QQ When is this not true ?
if
(
rli
)
// QQ When is this not true ?
...
@@ -213,219 +298,73 @@ int Log_event::exec_event(struct st_relay_log_info* rli)
...
@@ -213,219 +298,73 @@ int Log_event::exec_event(struct st_relay_log_info* rli)
return
0
;
return
0
;
}
}
/*****************************************************************************
Log_event::pack_info()
****************************************************************************/
void
Log_event
::
pack_info
(
String
*
packet
)
void
Log_event
::
pack_info
(
String
*
packet
)
{
{
net_store_data
(
packet
,
""
,
0
);
net_store_data
(
packet
,
""
,
0
);
}
}
void
Query_log_event
::
pack_info
(
String
*
packet
)
/*****************************************************************************
{
char
buf
[
256
];
String
tmp
(
buf
,
sizeof
(
buf
),
system_charset_info
);
tmp
.
length
(
0
);
if
(
db
&&
db_len
)
{
tmp
.
append
(
"use `"
,
5
);
tmp
.
append
(
db
,
db_len
);
tmp
.
append
(
"`; "
,
3
);
}
if
(
query
&&
q_len
)
Log_event::init_show_field_list()
tmp
.
append
(
query
,
q_len
);
net_store_data
(
packet
,
(
char
*
)
tmp
.
ptr
(),
tmp
.
length
());
}
void
Start_log_event
::
pack_info
(
String
*
packet
)
****************************************************************************/
void
Log_event
::
init_show_field_list
(
List
<
Item
>*
field_list
)
{
{
char
buf1
[
256
];
field_list
->
push_back
(
new
Item_empty_string
(
"Log_name"
,
20
));
String
tmp
(
buf1
,
sizeof
(
buf1
),
system_charset_info
);
field_list
->
push_back
(
new
Item_empty_string
(
"Pos"
,
20
));
tmp
.
length
(
0
);
field_list
->
push_back
(
new
Item_empty_string
(
"Event_type"
,
20
));
char
buf
[
22
];
field_list
->
push_back
(
new
Item_empty_string
(
"Server_id"
,
20
));
field_list
->
push_back
(
new
Item_empty_string
(
"Orig_log_pos"
,
20
));
tmp
.
append
(
"Server ver: "
);
field_list
->
push_back
(
new
Item_empty_string
(
"Info"
,
20
));
tmp
.
append
(
server_version
);
tmp
.
append
(
", Binlog ver: "
);
tmp
.
append
(
llstr
(
binlog_version
,
buf
));
net_store_data
(
packet
,
tmp
.
ptr
(),
tmp
.
length
());
}
}
void
Load_log_event
::
pack_info
(
String
*
packet
)
/*****************************************************************************
{
char
buf
[
256
];
String
tmp
(
buf
,
sizeof
(
buf
),
system_charset_info
);
tmp
.
length
(
0
);
if
(
db
&&
db_len
)
{
tmp
.
append
(
"use "
);
tmp
.
append
(
db
,
db_len
);
tmp
.
append
(
"; "
,
2
);
}
tmp
.
append
(
"LOAD DATA INFILE '"
);
tmp
.
append
(
fname
,
fname_len
);
tmp
.
append
(
"' "
,
2
);
if
(
sql_ex
.
opt_flags
&&
REPLACE_FLAG
)
tmp
.
append
(
" REPLACE "
);
else
if
(
sql_ex
.
opt_flags
&&
IGNORE_FLAG
)
tmp
.
append
(
" IGNORE "
);
tmp
.
append
(
"INTO TABLE "
);
Log_event::net_send()
tmp
.
append
(
table_name
);
if
(
sql_ex
.
field_term_len
)
{
tmp
.
append
(
" FIELDS TERMINATED BY "
);
pretty_print_str
(
&
tmp
,
sql_ex
.
field_term
,
sql_ex
.
field_term_len
);
}
if
(
sql_ex
.
enclosed_len
)
Only called by SHOW BINLOG EVENTS
{
if
(
sql_ex
.
opt_flags
&&
OPT_ENCLOSED_FLAG
)
tmp
.
append
(
" OPTIONALLY "
);
tmp
.
append
(
" ENCLOSED BY "
);
pretty_print_str
(
&
tmp
,
sql_ex
.
enclosed
,
sql_ex
.
enclosed_len
);
}
if
(
sql_ex
.
escaped_len
)
****************************************************************************/
{
int
Log_event
::
net_send
(
THD
*
thd
,
const
char
*
log_name
,
my_off_t
pos
)
tmp
.
append
(
" ESCAPED BY "
);
{
pretty_print_str
(
&
tmp
,
sql_ex
.
escaped
,
sql_ex
.
escaped_len
);
String
*
packet
=
&
thd
->
packet
;
}
const
char
*
p
=
strrchr
(
log_name
,
FN_LIBCHAR
);
const
char
*
event_type
;
if
(
p
)
log_name
=
p
+
1
;
if
(
sql_ex
.
line_term_len
)
packet
->
length
(
0
);
{
net_store_data
(
packet
,
log_name
,
strlen
(
log_name
));
tmp
.
append
(
" LINES TERMINATED BY "
);
net_store_data
(
packet
,
(
longlong
)
pos
);
pretty_print_str
(
&
tmp
,
sql_ex
.
line_term
,
sql_ex
.
line_term_len
);
event_type
=
get_type_str
();
}
net_store_data
(
packet
,
event_type
,
strlen
(
event_type
));
net_store_data
(
packet
,
server_id
);
net_store_data
(
packet
,
(
longlong
)
log_pos
);
pack_info
(
packet
);
return
my_net_write
(
&
thd
->
net
,
(
char
*
)
packet
->
ptr
(),
packet
->
length
());
}
#endif // !MYSQL_CLIENT
if
(
sql_ex
.
line_start_len
)
/*****************************************************************************
{
tmp
.
append
(
" LINES STARTING BY "
);
pretty_print_str
(
&
tmp
,
sql_ex
.
line_start
,
sql_ex
.
line_start_len
);
}
if
((
int
)
skip_lines
>
0
)
Log_event::write()
tmp
.
append
(
" IGNORE %ld LINES "
,
(
long
)
skip_lines
);
if
(
num_fields
)
****************************************************************************/
{
int
Log_event
::
write
(
IO_CACHE
*
file
)
uint
i
;
{
const
char
*
field
=
fields
;
return
(
write_header
(
file
)
||
write_data
(
file
))
?
-
1
:
0
;
tmp
.
append
(
" ("
);
}
for
(
i
=
0
;
i
<
num_fields
;
i
++
)
{
if
(
i
)
tmp
.
append
(
" ,"
);
tmp
.
append
(
field
);
field
+=
field_lens
[
i
]
+
1
;
/*****************************************************************************
}
tmp
.
append
(
')'
);
}
net_store_data
(
packet
,
tmp
.
ptr
(),
tmp
.
length
());
}
void
Rotate_log_event
::
pack_info
(
String
*
packet
)
{
char
buf1
[
256
],
buf
[
22
];
String
tmp
(
buf1
,
sizeof
(
buf1
),
system_charset_info
);
tmp
.
length
(
0
);
tmp
.
append
(
new_log_ident
,
ident_len
);
tmp
.
append
(
";pos="
);
tmp
.
append
(
llstr
(
pos
,
buf
));
if
(
flags
&
LOG_EVENT_FORCED_ROTATE_F
)
tmp
.
append
(
"; forced by master"
);
net_store_data
(
packet
,
tmp
.
ptr
(),
tmp
.
length
());
}
void
Intvar_log_event
::
pack_info
(
String
*
packet
)
{
char
buf1
[
256
],
buf
[
22
];
String
tmp
(
buf1
,
sizeof
(
buf1
),
system_charset_info
);
tmp
.
length
(
0
);
tmp
.
append
(
get_var_type_name
());
tmp
.
append
(
'='
);
tmp
.
append
(
llstr
(
val
,
buf
));
net_store_data
(
packet
,
tmp
.
ptr
(),
tmp
.
length
());
}
void
Rand_log_event
::
pack_info
(
String
*
packet
)
{
char
buf1
[
256
],
buf
[
22
];
String
tmp
(
buf1
,
sizeof
(
buf1
),
system_charset_info
);
tmp
.
length
(
0
);
tmp
.
append
(
"randseed1="
);
tmp
.
append
(
llstr
(
seed1
,
buf
));
tmp
.
append
(
",randseed2="
);
tmp
.
append
(
llstr
(
seed2
,
buf
));
net_store_data
(
packet
,
tmp
.
ptr
(),
tmp
.
length
());
}
void
Slave_log_event
::
pack_info
(
String
*
packet
)
{
char
buf1
[
256
],
buf
[
22
],
*
end
;
String
tmp
(
buf1
,
sizeof
(
buf1
),
system_charset_info
);
tmp
.
length
(
0
);
tmp
.
append
(
"host="
);
tmp
.
append
(
master_host
);
tmp
.
append
(
",port="
);
end
=
int10_to_str
((
long
)
master_port
,
buf
,
10
);
tmp
.
append
(
buf
,
(
uint32
)
(
end
-
buf
));
tmp
.
append
(
",log="
);
tmp
.
append
(
master_log
);
tmp
.
append
(
",pos="
);
tmp
.
append
(
llstr
(
master_pos
,
buf
));
net_store_data
(
packet
,
tmp
.
ptr
(),
tmp
.
length
());
}
void
Log_event
::
init_show_field_list
(
List
<
Item
>*
field_list
)
{
field_list
->
push_back
(
new
Item_empty_string
(
"Log_name"
,
20
));
field_list
->
push_back
(
new
Item_empty_string
(
"Pos"
,
20
));
field_list
->
push_back
(
new
Item_empty_string
(
"Event_type"
,
20
));
field_list
->
push_back
(
new
Item_empty_string
(
"Server_id"
,
20
));
field_list
->
push_back
(
new
Item_empty_string
(
"Orig_log_pos"
,
20
));
field_list
->
push_back
(
new
Item_empty_string
(
"Info"
,
20
));
}
/*
* only called by SHOW BINLOG EVENTS
*/
int
Log_event
::
net_send
(
THD
*
thd
,
const
char
*
log_name
,
my_off_t
pos
)
{
String
*
packet
=
&
thd
->
packet
;
const
char
*
p
=
strrchr
(
log_name
,
FN_LIBCHAR
);
const
char
*
event_type
;
if
(
p
)
log_name
=
p
+
1
;
packet
->
length
(
0
);
net_store_data
(
packet
,
log_name
,
strlen
(
log_name
));
net_store_data
(
packet
,
(
longlong
)
pos
);
event_type
=
get_type_str
();
net_store_data
(
packet
,
event_type
,
strlen
(
event_type
));
net_store_data
(
packet
,
server_id
);
net_store_data
(
packet
,
(
longlong
)
log_pos
);
pack_info
(
packet
);
return
my_net_write
(
&
thd
->
net
,
(
char
*
)
packet
->
ptr
(),
packet
->
length
());
}
#endif
/* MYSQL_CLIENT */
int
Query_log_event
::
write
(
IO_CACHE
*
file
)
{
return
query
?
Log_event
::
write
(
file
)
:
-
1
;
}
int
Log_event
::
write
(
IO_CACHE
*
file
)
{
return
(
write_header
(
file
)
||
write_data
(
file
))
?
-
1
:
0
;
}
Log_event::write_header()
****************************************************************************/
int
Log_event
::
write_header
(
IO_CACHE
*
file
)
int
Log_event
::
write_header
(
IO_CACHE
*
file
)
{
{
char
buf
[
LOG_EVENT_HEADER_LEN
];
char
buf
[
LOG_EVENT_HEADER_LEN
];
...
@@ -445,8 +384,13 @@ int Log_event::write_header(IO_CACHE* file)
...
@@ -445,8 +384,13 @@ int Log_event::write_header(IO_CACHE* file)
return
(
my_b_safe_write
(
file
,
(
byte
*
)
buf
,
(
uint
)
(
pos
-
buf
)));
return
(
my_b_safe_write
(
file
,
(
byte
*
)
buf
,
(
uint
)
(
pos
-
buf
)));
}
}
#ifndef MYSQL_CLIENT
/*****************************************************************************
Log_event::read_log_event()
****************************************************************************/
#ifndef MYSQL_CLIENT
int
Log_event
::
read_log_event
(
IO_CACHE
*
file
,
String
*
packet
,
int
Log_event
::
read_log_event
(
IO_CACHE
*
file
,
String
*
packet
,
pthread_mutex_t
*
log_lock
)
pthread_mutex_t
*
log_lock
)
{
{
...
@@ -501,8 +445,7 @@ end:
...
@@ -501,8 +445,7 @@ end:
pthread_mutex_unlock
(
log_lock
);
pthread_mutex_unlock
(
log_lock
);
DBUG_RETURN
(
result
);
DBUG_RETURN
(
result
);
}
}
#endif // !MYSQL_CLIENT
#endif // MYSQL_CLIENT
#ifndef MYSQL_CLIENT
#ifndef MYSQL_CLIENT
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
...
@@ -513,7 +456,13 @@ end:
...
@@ -513,7 +456,13 @@ end:
#define LOCK_MUTEX
#define LOCK_MUTEX
#endif
#endif
// allocates memory - the caller is responsible for clean-up
/*****************************************************************************
Log_event::read_log_event()
Allocates memory--the caller is responsible for clean-up
****************************************************************************/
#ifndef MYSQL_CLIENT
#ifndef MYSQL_CLIENT
Log_event
*
Log_event
::
read_log_event
(
IO_CACHE
*
file
,
Log_event
*
Log_event
::
read_log_event
(
IO_CACHE
*
file
,
pthread_mutex_t
*
log_lock
,
pthread_mutex_t
*
log_lock
,
...
@@ -576,7 +525,11 @@ data_len=%d,event_type=%d",error,data_len,head[EVENT_TYPE_OFFSET]);
...
@@ -576,7 +525,11 @@ data_len=%d,event_type=%d",error,data_len,head[EVENT_TYPE_OFFSET]);
return
res
;
return
res
;
}
}
/*****************************************************************************
Log_event::read_log_event()
****************************************************************************/
Log_event
*
Log_event
::
read_log_event
(
const
char
*
buf
,
int
event_len
,
Log_event
*
Log_event
::
read_log_event
(
const
char
*
buf
,
int
event_len
,
const
char
**
error
,
bool
old_format
)
const
char
**
error
,
bool
old_format
)
{
{
...
@@ -642,8 +595,13 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
...
@@ -642,8 +595,13 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
return
ev
;
return
ev
;
}
}
#ifdef MYSQL_CLIENT
#ifdef MYSQL_CLIENT
/*****************************************************************************
Log_event::print_header()
****************************************************************************/
void
Log_event
::
print_header
(
FILE
*
file
)
void
Log_event
::
print_header
(
FILE
*
file
)
{
{
char
llbuff
[
22
];
char
llbuff
[
22
];
...
@@ -653,6 +611,11 @@ void Log_event::print_header(FILE* file)
...
@@ -653,6 +611,11 @@ void Log_event::print_header(FILE* file)
llstr
(
log_pos
,
llbuff
));
llstr
(
log_pos
,
llbuff
));
}
}
/*****************************************************************************
Log_event::print_timestamp()
****************************************************************************/
void
Log_event
::
print_timestamp
(
FILE
*
file
,
time_t
*
ts
)
void
Log_event
::
print_timestamp
(
FILE
*
file
,
time_t
*
ts
)
{
{
struct
tm
*
res
;
struct
tm
*
res
;
...
@@ -674,131 +637,114 @@ void Log_event::print_timestamp(FILE* file, time_t* ts)
...
@@ -674,131 +637,114 @@ void Log_event::print_timestamp(FILE* file, time_t* ts)
res
->
tm_sec
);
res
->
tm_sec
);
}
}
#endif // MYSQL_CLIENT
void
Start_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
/*****************************************************************************
{
if
(
short_form
)
return
;
print_header
(
file
);
Log_event::set_log_pos()
fprintf
(
file
,
"
\t
Start: binlog v %d, server v %s created "
,
binlog_version
,
server_version
);
print_timestamp
(
file
,
(
time_t
*
)
&
created
);
fputc
(
'\n'
,
file
);
fflush
(
file
);
}
void
Stop_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
****************************************************************************/
#ifndef MYSQL_CLIENT
void
Log_event
::
set_log_pos
(
MYSQL_LOG
*
log
)
{
{
if
(
short_form
)
if
(
!
log_pos
)
return
;
log_pos
=
my_b_tell
(
&
log
->
log_file
);
print_header
(
file
);
fprintf
(
file
,
"
\t
Stop
\n
"
);
fflush
(
file
);
}
}
#endif // !MYSQL_CLIENT
void
Rotate_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
{
char
buf
[
22
];
if
(
short_form
)
return
;
print_header
(
file
);
fprintf
(
file
,
"
\t
Rotate to "
);
if
(
new_log_ident
)
my_fwrite
(
file
,
(
byte
*
)
new_log_ident
,
(
uint
)
ident_len
,
MYF
(
MY_NABP
|
MY_WME
));
fprintf
(
file
,
" pos: %s"
,
llstr
(
pos
,
buf
));
if
(
flags
&
LOG_EVENT_FORCED_ROTATE_F
)
fprintf
(
file
,
" forced by master"
);
fputc
(
'\n'
,
file
);
fflush
(
file
);
}
#endif
/* #ifdef MYSQL_CLIENT */
/*****************************************************************************
*****************************************************************************
Query_log_event methods
Start_log_event
::
Start_log_event
(
const
char
*
buf
,
*****************************************************************************
bool
old_format
)
****************************************************************************/
:
Log_event
(
buf
,
old_format
)
{
buf
+=
(
old_format
)
?
OLD_HEADER_LEN
:
LOG_EVENT_HEADER_LEN
;
binlog_version
=
uint2korr
(
buf
+
ST_BINLOG_VER_OFFSET
);
memcpy
(
server_version
,
buf
+
ST_SERVER_VER_OFFSET
,
ST_SERVER_VER_LEN
);
created
=
uint4korr
(
buf
+
ST_CREATED_OFFSET
);
}
int
Start_log_event
::
write_data
(
IO_CACHE
*
file
)
#ifndef MYSQL_CLIENT
{
/*****************************************************************************
char
buff
[
START_HEADER_LEN
];
int2store
(
buff
+
ST_BINLOG_VER_OFFSET
,
binlog_version
);
memcpy
(
buff
+
ST_SERVER_VER_OFFSET
,
server_version
,
ST_SERVER_VER_LEN
);
int4store
(
buff
+
ST_CREATED_OFFSET
,
created
);
return
(
my_b_safe_write
(
file
,
(
byte
*
)
buff
,
sizeof
(
buff
))
?
-
1
:
0
);
}
Query_log_event::pack_info()
Rotate_log_event
::
Rotate_log_event
(
const
char
*
buf
,
int
event_len
,
****************************************************************************/
bool
old_format
)
void
Query_log_event
::
pack_info
(
String
*
packet
)
:
Log_event
(
buf
,
old_format
),
new_log_ident
(
NULL
),
alloced
(
0
)
{
{
// The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
char
buf
[
256
];
int
header_size
=
(
old_format
)
?
OLD_HEADER_LEN
:
LOG_EVENT_HEADER_LEN
;
String
tmp
(
buf
,
sizeof
(
buf
),
system_charset_info
);
uint
ident_offset
;
tmp
.
length
(
0
);
if
(
event_len
<
header_size
)
if
(
db
&&
db_len
)
return
;
buf
+=
header_size
;
if
(
old_format
)
{
ident_len
=
(
uint
)(
event_len
-
OLD_HEADER_LEN
);
pos
=
4
;
ident_offset
=
0
;
}
else
{
{
ident_len
=
(
uint
)(
event_len
-
ROTATE_EVENT_OVERHEAD
);
tmp
.
append
(
"use `"
,
5
);
pos
=
uint8korr
(
buf
+
R_POS_OFFSET
);
tmp
.
append
(
db
,
db_len
);
ident_offset
=
ROTATE_HEADER_LEN
;
tmp
.
append
(
"`; "
,
3
)
;
}
}
set_if_smaller
(
ident_len
,
FN_REFLEN
-
1
);
if
(
!
(
new_log_ident
=
my_strdup_with_length
((
byte
*
)
buf
+
if
(
query
&&
q_len
)
ident_offset
,
tmp
.
append
(
query
,
q_len
);
(
uint
)
ident_len
,
net_store_data
(
packet
,
(
char
*
)
tmp
.
ptr
(),
tmp
.
length
());
MYF
(
MY_WME
))))
return
;
alloced
=
1
;
}
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
int
Rotate_log_event
::
write_data
(
IO_CACHE
*
file
)
Query_log_event::write()
****************************************************************************/
int
Query_log_event
::
write
(
IO_CACHE
*
file
)
{
{
char
buf
[
ROTATE_HEADER_LEN
];
return
query
?
Log_event
::
write
(
file
)
:
-
1
;
int8store
(
buf
,
pos
+
R_POS_OFFSET
);
return
(
my_b_safe_write
(
file
,
(
byte
*
)
buf
,
ROTATE_HEADER_LEN
)
||
my_b_safe_write
(
file
,
(
byte
*
)
new_log_ident
,
(
uint
)
ident_len
));
}
}
/*****************************************************************************
#ifndef MYSQL_CLIENT
Query_log_event::write_data()
Query_log_event
::
Query_log_event
(
THD
*
thd_arg
,
const
char
*
query_arg
,
ulong
query_length
,
bool
using_trans
)
:
Log_event
(
thd_arg
),
data_buf
(
0
),
query
(
query_arg
),
db
(
thd_arg
->
db
),
q_len
((
uint32
)
query_length
),
error_code
(
thd_arg
->
killed
?
ER_SERVER_SHUTDOWN
:
thd_arg
->
net
.
last_errno
),
thread_id
(
thd_arg
->
thread_id
),
cache_stmt
(
using_trans
&&
(
thd_arg
->
options
&
(
OPTION_NOT_AUTOCOMMIT
|
OPTION_BEGIN
)))
{
time_t
end_time
;
time
(
&
end_time
);
exec_time
=
(
ulong
)
(
end_time
-
thd
->
start_time
);
db_len
=
(
db
)
?
(
uint32
)
strlen
(
db
)
:
0
;
}
#endif
Query_log_event
::
Query_log_event
(
const
char
*
buf
,
int
event_len
,
****************************************************************************/
int
Query_log_event
::
write_data
(
IO_CACHE
*
file
)
{
if
(
!
query
)
return
-
1
;
char
buf
[
QUERY_HEADER_LEN
];
int4store
(
buf
+
Q_THREAD_ID_OFFSET
,
thread_id
);
int4store
(
buf
+
Q_EXEC_TIME_OFFSET
,
exec_time
);
buf
[
Q_DB_LEN_OFFSET
]
=
(
char
)
db_len
;
int2store
(
buf
+
Q_ERR_CODE_OFFSET
,
error_code
);
return
(
my_b_safe_write
(
file
,
(
byte
*
)
buf
,
QUERY_HEADER_LEN
)
||
my_b_safe_write
(
file
,
(
db
)
?
(
byte
*
)
db
:
(
byte
*
)
""
,
db_len
+
1
)
||
my_b_safe_write
(
file
,
(
byte
*
)
query
,
q_len
))
?
-
1
:
0
;
}
/*****************************************************************************
Query_log_event::Query_log_event()
****************************************************************************/
#ifndef MYSQL_CLIENT
Query_log_event
::
Query_log_event
(
THD
*
thd_arg
,
const
char
*
query_arg
,
ulong
query_length
,
bool
using_trans
)
:
Log_event
(
thd_arg
),
data_buf
(
0
),
query
(
query_arg
),
db
(
thd_arg
->
db
),
q_len
((
uint32
)
query_length
),
error_code
(
thd_arg
->
killed
?
ER_SERVER_SHUTDOWN
:
thd_arg
->
net
.
last_errno
),
thread_id
(
thd_arg
->
thread_id
),
cache_stmt
(
using_trans
&&
(
thd_arg
->
options
&
(
OPTION_NOT_AUTOCOMMIT
|
OPTION_BEGIN
)))
{
time_t
end_time
;
time
(
&
end_time
);
exec_time
=
(
ulong
)
(
end_time
-
thd
->
start_time
);
db_len
=
(
db
)
?
(
uint32
)
strlen
(
db
)
:
0
;
}
#endif // MYSQL_CLIENT
/*****************************************************************************
Query_log_event::Query_log_event()
****************************************************************************/
Query_log_event
::
Query_log_event
(
const
char
*
buf
,
int
event_len
,
bool
old_format
)
bool
old_format
)
:
Log_event
(
buf
,
old_format
),
data_buf
(
0
),
query
(
NULL
),
db
(
NULL
)
:
Log_event
(
buf
,
old_format
),
data_buf
(
0
),
query
(
NULL
),
db
(
NULL
)
{
{
...
@@ -833,9 +779,12 @@ Query_log_event::Query_log_event(const char* buf, int event_len,
...
@@ -833,9 +779,12 @@ Query_log_event::Query_log_event(const char* buf, int event_len,
*
((
char
*
)
query
+
q_len
)
=
0
;
*
((
char
*
)
query
+
q_len
)
=
0
;
}
}
/*****************************************************************************
Query_log_event::print()
****************************************************************************/
#ifdef MYSQL_CLIENT
#ifdef MYSQL_CLIENT
void
Query_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
void
Query_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
{
{
char
buff
[
40
],
*
end
;
// Enough for SET TIMESTAMP
char
buff
[
40
],
*
end
;
// Enough for SET TIMESTAMP
...
@@ -863,113 +812,311 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db)
...
@@ -863,113 +812,311 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db)
my_fwrite
(
file
,
(
byte
*
)
query
,
q_len
,
MYF
(
MY_NABP
|
MY_WME
));
my_fwrite
(
file
,
(
byte
*
)
query
,
q_len
,
MYF
(
MY_NABP
|
MY_WME
));
fprintf
(
file
,
";
\n
"
);
fprintf
(
file
,
";
\n
"
);
}
}
#endif
#endif // MYSQL_CLIENT
/*****************************************************************************
Query_log_event::exec_event()
int
Query_log_event
::
write_data
(
IO_CACHE
*
file
)
****************************************************************************/
#ifndef MYSQL_CLIENT
int
Query_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
{
{
if
(
!
query
)
int
expected_error
,
actual_error
=
0
;
return
-
1
;
init_sql_alloc
(
&
thd
->
mem_root
,
8192
,
0
);
thd
->
db
=
rewrite_db
((
char
*
)
db
);
char
buf
[
QUERY_HEADER_LEN
];
/*
int4store
(
buf
+
Q_THREAD_ID_OFFSET
,
thread_id
);
InnoDB internally stores the master log position it has processed so far;
int4store
(
buf
+
Q_EXEC_TIME_OFFSET
,
exec_time
);
position to store is really pos + pending + event_len
buf
[
Q_DB_LEN_OFFSET
]
=
(
char
)
db_len
;
since we must store the pos of the END of the current log event
int2store
(
buf
+
Q_ERR_CODE_OFFSET
,
error_code
);
*/
rli
->
event_len
=
get_event_len
();
return
(
my_b_safe_write
(
file
,
(
byte
*
)
buf
,
QUERY_HEADER_LEN
)
||
if
(
db_ok
(
thd
->
db
,
replicate_do_db
,
replicate_ignore_db
))
my_b_safe_write
(
file
,
(
db
)
?
(
byte
*
)
db
:
(
byte
*
)
""
,
db_len
+
1
)
||
{
my_b_safe_write
(
file
,
(
byte
*
)
query
,
q_len
))
?
-
1
:
0
;
thd
->
query
=
(
char
*
)
query
;
}
thd
->
set_time
((
time_t
)
when
);
thd
->
current_tablenr
=
0
;
VOID
(
pthread_mutex_lock
(
&
LOCK_thread_count
));
thd
->
query_id
=
query_id
++
;
VOID
(
pthread_mutex_unlock
(
&
LOCK_thread_count
));
thd
->
query_error
=
0
;
// clear error
thd
->
net
.
last_errno
=
0
;
thd
->
net
.
last_error
[
0
]
=
0
;
thd
->
slave_proxy_id
=
thread_id
;
// for temp tables
Intvar_log_event
::
Intvar_log_event
(
const
char
*
buf
,
bool
old_format
)
/*
:
Log_event
(
buf
,
old_format
)
Sanity check to make sure the master did not get a really bad
{
error on the query.
buf
+=
(
old_format
)
?
OLD_HEADER_LEN
:
LOG_EVENT_HEADER_LEN
;
*/
type
=
buf
[
I_TYPE_OFFSET
];
if
(
ignored_error_code
((
expected_error
=
error_code
))
||
val
=
uint8korr
(
buf
+
I_VAL_OFFSET
);
!
check_expected_error
(
thd
,
rli
,
expected_error
))
}
{
mysql_log
.
write
(
thd
,
COM_QUERY
,
"%s"
,
thd
->
query
);
DBUG_PRINT
(
"query"
,(
"%s"
,
thd
->
query
));
mysql_parse
(
thd
,
thd
->
query
,
q_len
);
DBUG_PRINT
(
"info"
,(
"expected_error: %d last_errno: %d"
,
expected_error
,
thd
->
net
.
last_errno
));
if
((
expected_error
!=
(
actual_error
=
thd
->
net
.
last_errno
))
&&
expected_error
&&
!
ignored_error_code
(
actual_error
)
&&
!
ignored_error_code
(
expected_error
))
{
const
char
*
errmsg
=
"Slave: did not get the expected error\
running query from master - expected: '%s' (%d), got '%s' (%d)"
;
sql_print_error
(
errmsg
,
ER_SAFE
(
expected_error
),
expected_error
,
actual_error
?
thd
->
net
.
last_error
:
"no error"
,
actual_error
);
thd
->
query_error
=
1
;
}
else
if
(
expected_error
==
actual_error
||
ignored_error_code
(
actual_error
))
{
DBUG_PRINT
(
"info"
,(
"error ignored"
));
thd
->
query_error
=
0
;
*
rli
->
last_slave_error
=
0
;
rli
->
last_slave_errno
=
0
;
}
}
else
{
// master could be inconsistent, abort and tell DBA to check/fix it
thd
->
db
=
thd
->
query
=
0
;
thd
->
variables
.
convert_set
=
0
;
close_thread_tables
(
thd
);
free_root
(
&
thd
->
mem_root
,
0
);
return
1
;
}
}
thd
->
db
=
0
;
// prevent db from being freed
thd
->
query
=
0
;
// just to be sure
// assume no convert for next query unless set explictly
thd
->
variables
.
convert_set
=
0
;
close_thread_tables
(
thd
);
const
char
*
Intvar_log_event
::
get_var_type_name
()
if
(
thd
->
query_error
||
thd
->
fatal_error
)
{
{
switch
(
type
)
{
slave_print_error
(
rli
,
actual_error
,
"error '%s' on query '%s'"
,
case
LAST_INSERT_ID_EVENT
:
return
"LAST_INSERT_ID"
;
actual_error
?
thd
->
net
.
last_error
:
case
INSERT_ID_EVENT
:
return
"INSERT_ID"
;
"unexpected success or fatal error"
,
query
);
default:
/* impossible */
return
"UNKNOWN"
;
free_root
(
&
thd
->
mem_root
,
0
);
return
1
;
}
}
free_root
(
&
thd
->
mem_root
,
0
);
return
Log_event
::
exec_event
(
rli
);
}
}
#endif // !MYSQL_CLIENT
int
Intvar_log_event
::
write_data
(
IO_CACHE
*
file
)
/*****************************************************************************
*****************************************************************************
Start_log_event methods
*****************************************************************************
****************************************************************************/
/*****************************************************************************
Start_log_event::pack_info()
****************************************************************************/
#ifndef MYSQL_CLIENT
void
Start_log_event
::
pack_info
(
String
*
packet
)
{
{
char
buf
[
9
];
char
buf1
[
256
];
buf
[
I_TYPE_OFFSET
]
=
type
;
String
tmp
(
buf1
,
sizeof
(
buf1
),
system_charset_info
);
int8store
(
buf
+
I_VAL_OFFSET
,
val
);
tmp
.
length
(
0
);
return
my_b_safe_write
(
file
,
(
byte
*
)
buf
,
sizeof
(
buf
));
char
buf
[
22
];
tmp
.
append
(
"Server ver: "
);
tmp
.
append
(
server_version
);
tmp
.
append
(
", Binlog ver: "
);
tmp
.
append
(
llstr
(
binlog_version
,
buf
));
net_store_data
(
packet
,
tmp
.
ptr
(),
tmp
.
length
());
}
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
Start_log_event::print()
****************************************************************************/
#ifdef MYSQL_CLIENT
#ifdef MYSQL_CLIENT
void
Intvar
_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
void
Start
_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
{
{
char
llbuff
[
22
];
if
(
short_form
)
const
char
*
msg
;
return
;
LINT_INIT
(
msg
);
if
(
!
short_form
)
{
print_header
(
file
);
print_header
(
file
);
fprintf
(
file
,
"
\t
Intvar
\n
"
);
fprintf
(
file
,
"
\t
Start: binlog v %d, server v %s created "
,
binlog_version
,
}
server_version
);
print_timestamp
(
file
,
(
time_t
*
)
&
created
);
fprintf
(
file
,
"SET "
);
fputc
(
'\n'
,
file
);
switch
(
type
)
{
case
LAST_INSERT_ID_EVENT
:
msg
=
"LAST_INSERT_ID"
;
break
;
case
INSERT_ID_EVENT
:
msg
=
"INSERT_ID"
;
break
;
}
fprintf
(
file
,
"%s=%s;
\n
"
,
msg
,
llstr
(
val
,
llbuff
));
fflush
(
file
);
fflush
(
file
);
}
}
#endif
#endif
// MYSQL_CLIENT
/*****************************************************************************
/*****************************************************************************
*
* Rand log event
Start_log_event::Start_log_event()
*
****************************************************************************/
****************************************************************************/
Rand_log_event
::
Rand_log_event
(
const
char
*
buf
,
bool
old_format
)
Start_log_event
::
Start_log_event
(
const
char
*
buf
,
bool
old_format
)
:
Log_event
(
buf
,
old_format
)
:
Log_event
(
buf
,
old_format
)
{
{
buf
+=
(
old_format
)
?
OLD_HEADER_LEN
:
LOG_EVENT_HEADER_LEN
;
buf
+=
(
old_format
)
?
OLD_HEADER_LEN
:
LOG_EVENT_HEADER_LEN
;
seed1
=
uint8korr
(
buf
+
RAND_SEED1_OFFSET
);
binlog_version
=
uint2korr
(
buf
+
ST_BINLOG_VER_OFFSET
);
seed2
=
uint8korr
(
buf
+
RAND_SEED2_OFFSET
);
memcpy
(
server_version
,
buf
+
ST_SERVER_VER_OFFSET
,
ST_SERVER_VER_LEN
);
created
=
uint4korr
(
buf
+
ST_CREATED_OFFSET
);
}
}
int
Rand_log_event
::
write_data
(
IO_CACHE
*
file
)
/*****************************************************************************
{
char
buf
[
16
];
int8store
(
buf
+
RAND_SEED1_OFFSET
,
seed1
);
int8store
(
buf
+
RAND_SEED2_OFFSET
,
seed2
);
return
my_b_safe_write
(
file
,
(
byte
*
)
buf
,
sizeof
(
buf
));
}
#ifdef MYSQL_CLIENT
Start_log_event::write_data()
void
Rand_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
****************************************************************************/
int
Start_log_event
::
write_data
(
IO_CACHE
*
file
)
{
{
char
llbuff
[
22
];
char
buff
[
START_HEADER_LEN
];
if
(
!
short_form
)
int2store
(
buff
+
ST_BINLOG_VER_OFFSET
,
binlog_version
);
{
memcpy
(
buff
+
ST_SERVER_VER_OFFSET
,
server_version
,
ST_SERVER_VER_LEN
);
print_header
(
file
);
int4store
(
buff
+
ST_CREATED_OFFSET
,
created
);
fprintf
(
file
,
"
\t
Rand
\n
"
);
return
(
my_b_safe_write
(
file
,
(
byte
*
)
buff
,
sizeof
(
buff
))
?
-
1
:
0
);
}
fprintf
(
file
,
"SET RAND SEED1=%s;
\n
"
,
llstr
(
seed1
,
llbuff
));
fprintf
(
file
,
"SET RAND SEED2=%s;
\n
"
,
llstr
(
seed2
,
llbuff
));
fflush
(
file
);
}
}
#endif
/*****************************************************************************
Start_log_event::exec_event()
The master started
IMPLEMENTATION
- To handle the case where the master died without a stop event,
we clean up all temporary tables + locks that we got.
TODO
- Remove all active user locks
- If we have an active transaction at this point, the master died
in the middle while writing the transaction to the binary log.
In this case we should stop the slave.
****************************************************************************/
#ifndef MYSQL_CLIENT
int
Start_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
{
/* All temporary tables was deleted on the master */
close_temporary_tables
(
thd
);
/*
If we have old format, load_tmpdir is cleaned up by the I/O thread
*/
if
(
!
rli
->
mi
->
old_format
)
cleanup_load_tmpdir
();
return
Log_event
::
exec_event
(
rli
);
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
*****************************************************************************
Load_log_event methods
*****************************************************************************
****************************************************************************/
/*****************************************************************************
Load_log_event::pack_info()
****************************************************************************/
#ifndef MYSQL_CLIENT
void
Load_log_event
::
pack_info
(
String
*
packet
)
{
char
buf
[
256
];
String
tmp
(
buf
,
sizeof
(
buf
),
system_charset_info
);
tmp
.
length
(
0
);
if
(
db
&&
db_len
)
{
tmp
.
append
(
"use "
);
tmp
.
append
(
db
,
db_len
);
tmp
.
append
(
"; "
,
2
);
}
tmp
.
append
(
"LOAD DATA INFILE '"
);
tmp
.
append
(
fname
,
fname_len
);
tmp
.
append
(
"' "
,
2
);
if
(
sql_ex
.
opt_flags
&&
REPLACE_FLAG
)
tmp
.
append
(
" REPLACE "
);
else
if
(
sql_ex
.
opt_flags
&&
IGNORE_FLAG
)
tmp
.
append
(
" IGNORE "
);
tmp
.
append
(
"INTO TABLE "
);
tmp
.
append
(
table_name
);
if
(
sql_ex
.
field_term_len
)
{
tmp
.
append
(
" FIELDS TERMINATED BY "
);
pretty_print_str
(
&
tmp
,
sql_ex
.
field_term
,
sql_ex
.
field_term_len
);
}
if
(
sql_ex
.
enclosed_len
)
{
if
(
sql_ex
.
opt_flags
&&
OPT_ENCLOSED_FLAG
)
tmp
.
append
(
" OPTIONALLY "
);
tmp
.
append
(
" ENCLOSED BY "
);
pretty_print_str
(
&
tmp
,
sql_ex
.
enclosed
,
sql_ex
.
enclosed_len
);
}
if
(
sql_ex
.
escaped_len
)
{
tmp
.
append
(
" ESCAPED BY "
);
pretty_print_str
(
&
tmp
,
sql_ex
.
escaped
,
sql_ex
.
escaped_len
);
}
if
(
sql_ex
.
line_term_len
)
{
tmp
.
append
(
" LINES TERMINATED BY "
);
pretty_print_str
(
&
tmp
,
sql_ex
.
line_term
,
sql_ex
.
line_term_len
);
}
if
(
sql_ex
.
line_start_len
)
{
tmp
.
append
(
" LINES STARTING BY "
);
pretty_print_str
(
&
tmp
,
sql_ex
.
line_start
,
sql_ex
.
line_start_len
);
}
if
((
int
)
skip_lines
>
0
)
tmp
.
append
(
" IGNORE %ld LINES "
,
(
long
)
skip_lines
);
if
(
num_fields
)
{
uint
i
;
const
char
*
field
=
fields
;
tmp
.
append
(
" ("
);
for
(
i
=
0
;
i
<
num_fields
;
i
++
)
{
if
(
i
)
tmp
.
append
(
" ,"
);
tmp
.
append
(
field
);
field
+=
field_lens
[
i
]
+
1
;
}
tmp
.
append
(
')'
);
}
net_store_data
(
packet
,
tmp
.
ptr
(),
tmp
.
length
());
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
Load_log_event::write_data_header()
****************************************************************************/
int
Load_log_event
::
write_data_header
(
IO_CACHE
*
file
)
int
Load_log_event
::
write_data_header
(
IO_CACHE
*
file
)
{
{
char
buf
[
LOAD_HEADER_LEN
];
char
buf
[
LOAD_HEADER_LEN
];
...
@@ -982,6 +1129,11 @@ int Load_log_event::write_data_header(IO_CACHE* file)
...
@@ -982,6 +1129,11 @@ int Load_log_event::write_data_header(IO_CACHE* file)
return
my_b_safe_write
(
file
,
(
byte
*
)
buf
,
LOAD_HEADER_LEN
);
return
my_b_safe_write
(
file
,
(
byte
*
)
buf
,
LOAD_HEADER_LEN
);
}
}
/*****************************************************************************
Load_log_event::write_data_body()
****************************************************************************/
int
Load_log_event
::
write_data_body
(
IO_CACHE
*
file
)
int
Load_log_event
::
write_data_body
(
IO_CACHE
*
file
)
{
{
if
(
sql_ex
.
write_data
(
file
))
if
(
sql_ex
.
write_data
(
file
))
...
@@ -997,99 +1149,11 @@ int Load_log_event::write_data_body(IO_CACHE* file)
...
@@ -997,99 +1149,11 @@ int Load_log_event::write_data_body(IO_CACHE* file)
my_b_safe_write
(
file
,
(
byte
*
)
fname
,
fname_len
));
my_b_safe_write
(
file
,
(
byte
*
)
fname
,
fname_len
));
}
}
/*****************************************************************************
Load_log_event::Load_log_event()
static
bool
write_str
(
IO_CACHE
*
file
,
char
*
str
,
byte
length
)
****************************************************************************/
{
return
(
my_b_safe_write
(
file
,
&
length
,
1
)
||
my_b_safe_write
(
file
,
(
byte
*
)
str
,
(
int
)
length
));
}
int
sql_ex_info
::
write_data
(
IO_CACHE
*
file
)
{
if
(
new_format
())
{
return
(
write_str
(
file
,
field_term
,
field_term_len
)
||
write_str
(
file
,
enclosed
,
enclosed_len
)
||
write_str
(
file
,
line_term
,
line_term_len
)
||
write_str
(
file
,
line_start
,
line_start_len
)
||
write_str
(
file
,
escaped
,
escaped_len
)
||
my_b_safe_write
(
file
,(
byte
*
)
&
opt_flags
,
1
));
}
else
{
old_sql_ex
old_ex
;
old_ex
.
field_term
=
*
field_term
;
old_ex
.
enclosed
=
*
enclosed
;
old_ex
.
line_term
=
*
line_term
;
old_ex
.
line_start
=
*
line_start
;
old_ex
.
escaped
=
*
escaped
;
old_ex
.
opt_flags
=
opt_flags
;
old_ex
.
empty_flags
=
empty_flags
;
return
my_b_safe_write
(
file
,
(
byte
*
)
&
old_ex
,
sizeof
(
old_ex
));
}
}
static
inline
int
read_str
(
char
*
&
buf
,
char
*
buf_end
,
char
*
&
str
,
uint8
&
len
)
{
if
(
buf
+
(
uint
)
(
uchar
)
*
buf
>=
buf_end
)
return
1
;
len
=
(
uint8
)
*
buf
;
str
=
buf
+
1
;
buf
+=
(
uint
)
len
+
1
;
return
0
;
}
char
*
sql_ex_info
::
init
(
char
*
buf
,
char
*
buf_end
,
bool
use_new_format
)
{
cached_new_format
=
use_new_format
;
if
(
use_new_format
)
{
empty_flags
=
0
;
/*
The code below assumes that buf will not disappear from
under our feet during the lifetime of the event. This assumption
holds true in the slave thread if the log is in new format, but is not
the case when we have old format because we will be reusing net buffer
to read the actual file before we write out the Create_file event.
*/
if
(
read_str
(
buf
,
buf_end
,
field_term
,
field_term_len
)
||
read_str
(
buf
,
buf_end
,
enclosed
,
enclosed_len
)
||
read_str
(
buf
,
buf_end
,
line_term
,
line_term_len
)
||
read_str
(
buf
,
buf_end
,
line_start
,
line_start_len
)
||
read_str
(
buf
,
buf_end
,
escaped
,
escaped_len
))
return
0
;
opt_flags
=
*
buf
++
;
}
else
{
field_term_len
=
enclosed_len
=
line_term_len
=
line_start_len
=
escaped_len
=
1
;
field_term
=
buf
++
;
// Use first byte in string
enclosed
=
buf
++
;
line_term
=
buf
++
;
line_start
=
buf
++
;
escaped
=
buf
++
;
opt_flags
=
*
buf
++
;
empty_flags
=
*
buf
++
;
if
(
empty_flags
&
FIELD_TERM_EMPTY
)
field_term_len
=
0
;
if
(
empty_flags
&
ENCLOSED_EMPTY
)
enclosed_len
=
0
;
if
(
empty_flags
&
LINE_TERM_EMPTY
)
line_term_len
=
0
;
if
(
empty_flags
&
LINE_START_EMPTY
)
line_start_len
=
0
;
if
(
empty_flags
&
ESCAPED_EMPTY
)
escaped_len
=
0
;
}
return
buf
;
}
#ifndef MYSQL_CLIENT
#ifndef MYSQL_CLIENT
Load_log_event
::
Load_log_event
(
THD
*
thd
,
sql_exchange
*
ex
,
Load_log_event
::
Load_log_event
(
THD
*
thd
,
sql_exchange
*
ex
,
const
char
*
db_arg
,
const
char
*
table_name_arg
,
const
char
*
db_arg
,
const
char
*
table_name_arg
,
...
@@ -1162,14 +1226,16 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
...
@@ -1162,14 +1226,16 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
field_lens
=
(
const
uchar
*
)
field_lens_buf
.
ptr
();
field_lens
=
(
const
uchar
*
)
field_lens_buf
.
ptr
();
fields
=
fields_buf
.
ptr
();
fields
=
fields_buf
.
ptr
();
}
}
#endif // !MYSQL_CLIENT
#endif
/*****************************************************************************
Load_log_event::Load_log_event()
/*
The caller must do buf[event_len] = 0 before he starts using the
The caller must do buf[event_len] = 0 before he starts using the
constructed event.
constructed event.
*/
****************************************************************************/
Load_log_event
::
Load_log_event
(
const
char
*
buf
,
int
event_len
,
Load_log_event
::
Load_log_event
(
const
char
*
buf
,
int
event_len
,
bool
old_format
)
:
bool
old_format
)
:
Log_event
(
buf
,
old_format
),
num_fields
(
0
),
fields
(
0
),
Log_event
(
buf
,
old_format
),
num_fields
(
0
),
fields
(
0
),
...
@@ -1181,6 +1247,11 @@ Load_log_event::Load_log_event(const char* buf, int event_len,
...
@@ -1181,6 +1247,11 @@ Load_log_event::Load_log_event(const char* buf, int event_len,
copy_log_event
(
buf
,
event_len
,
old_format
);
copy_log_event
(
buf
,
event_len
,
old_format
);
}
}
/*****************************************************************************
Load_log_event::copy_log_event()
****************************************************************************/
int
Load_log_event
::
copy_log_event
(
const
char
*
buf
,
ulong
event_len
,
int
Load_log_event
::
copy_log_event
(
const
char
*
buf
,
ulong
event_len
,
bool
old_format
)
bool
old_format
)
{
{
...
@@ -1225,8 +1296,12 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
...
@@ -1225,8 +1296,12 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
return
0
;
return
0
;
}
}
#ifdef MYSQL_CLIENT
/*****************************************************************************
Load_log_event::print()
****************************************************************************/
#ifdef MYSQL_CLIENT
void
Load_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
void
Load_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
{
{
if
(
!
short_form
)
if
(
!
short_form
)
...
@@ -1307,18 +1382,14 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
...
@@ -1307,18 +1382,14 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
fprintf
(
file
,
";
\n
"
);
fprintf
(
file
,
";
\n
"
);
}
}
#endif
/* #ifdef MYSQL_CLIENT */
#endif
/* #ifdef MYSQL_CLIENT */
#ifndef MYSQL_CLIENT
/*****************************************************************************
void
Log_event
::
set_log_pos
(
MYSQL_LOG
*
log
)
{
if
(
!
log_pos
)
log_pos
=
my_b_tell
(
&
log
->
log_file
);
}
Load_log_event::set_fields()
****************************************************************************/
#ifndef MYSQL_CLIENT
void
Load_log_event
::
set_fields
(
List
<
Item
>
&
fields
)
void
Load_log_event
::
set_fields
(
List
<
Item
>
&
fields
)
{
{
uint
i
;
uint
i
;
...
@@ -1329,589 +1400,690 @@ void Load_log_event::set_fields(List<Item> &fields)
...
@@ -1329,589 +1400,690 @@ void Load_log_event::set_fields(List<Item> &fields)
field
+=
field_lens
[
i
]
+
1
;
field
+=
field_lens
[
i
]
+
1
;
}
}
}
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
Slave_log_event
::
Slave_log_event
(
THD
*
thd_arg
,
Load_log_event::exec_event()
struct
st_relay_log_info
*
rli
)
:
Log_event
(
thd_arg
),
mem_pool
(
0
),
master_host
(
0
)
****************************************************************************/
#ifndef MYSQL_CLIENT
int
Load_log_event
::
exec_event
(
NET
*
net
,
struct
st_relay_log_info
*
rli
)
{
{
DBUG_ENTER
(
"Slave_log_event"
);
init_sql_alloc
(
&
thd
->
mem_root
,
8192
,
0
);
if
(
!
rli
->
inited
)
// QQ When can this happen ?
thd
->
db
=
rewrite_db
((
char
*
)
db
);
DBUG_VOID_RETURN
;
thd
->
query
=
0
;
thd
->
query_error
=
0
;
MASTER_INFO
*
mi
=
rli
->
mi
;
if
(
db_ok
(
thd
->
db
,
replicate_do_db
,
replicate_ignore_db
))
// TODO: re-write this better without holding both locks at the same time
pthread_mutex_lock
(
&
mi
->
data_lock
);
pthread_mutex_lock
(
&
rli
->
data_lock
);
master_host_len
=
strlen
(
mi
->
host
);
master_log_len
=
strlen
(
rli
->
master_log_name
);
// on OOM, just do not initialize the structure and print the error
if
((
mem_pool
=
(
char
*
)
my_malloc
(
get_data_size
()
+
1
,
MYF
(
MY_WME
))))
{
{
master_host
=
mem_pool
+
SL_MASTER_HOST_OFFSET
;
thd
->
set_time
((
time_t
)
when
);
memcpy
(
master_host
,
mi
->
host
,
master_host_len
+
1
);
thd
->
current_tablenr
=
0
;
master_log
=
master_host
+
master_host_len
+
1
;
VOID
(
pthread_mutex_lock
(
&
LOCK_thread_count
));
memcpy
(
master_log
,
rli
->
master_log_name
,
master_log_len
+
1
);
thd
->
query_id
=
query_id
++
;
master_port
=
mi
->
port
;
VOID
(
pthread_mutex_unlock
(
&
LOCK_thread_count
));
master_pos
=
rli
->
master_log_pos
;
DBUG_PRINT
(
"info"
,
(
"master_log: %s pos: %d"
,
master_log
,
TABLE_LIST
tables
;
(
ulong
)
master_pos
));
bzero
((
char
*
)
&
tables
,
sizeof
(
tables
));
tables
.
db
=
thd
->
db
;
tables
.
alias
=
tables
.
real_name
=
(
char
*
)
table_name
;
tables
.
lock_type
=
TL_WRITE
;
// the table will be opened in mysql_load
if
(
table_rules_on
&&
!
tables_ok
(
thd
,
&
tables
))
{
// TODO: this is a bug - this needs to be moved to the I/O thread
if
(
net
)
skip_load_data_infile
(
net
);
}
}
else
else
sql_print_error
(
"Out of memory while recording slave event"
);
{
pthread_mutex_unlock
(
&
rli
->
data_lock
);
char
llbuff
[
22
];
pthread_mutex_unlock
(
&
mi
->
data_lock
);
enum
enum_duplicates
handle_dup
=
DUP_IGNORE
;
DBUG_VOID_RETURN
;
if
(
sql_ex
.
opt_flags
&&
REPLACE_FLAG
)
handle_dup
=
DUP_REPLACE
;
sql_exchange
ex
((
char
*
)
fname
,
sql_ex
.
opt_flags
&&
DUMPFILE_FLAG
);
String
field_term
(
sql_ex
.
field_term
,
sql_ex
.
field_term_len
,
system_charset_info
);
String
enclosed
(
sql_ex
.
enclosed
,
sql_ex
.
enclosed_len
,
system_charset_info
);
String
line_term
(
sql_ex
.
line_term
,
sql_ex
.
line_term_len
,
system_charset_info
);
String
line_start
(
sql_ex
.
line_start
,
sql_ex
.
line_start_len
,
system_charset_info
);
String
escaped
(
sql_ex
.
escaped
,
sql_ex
.
escaped_len
,
system_charset_info
);
ex
.
opt_enclosed
=
(
sql_ex
.
opt_flags
&
OPT_ENCLOSED_FLAG
);
if
(
sql_ex
.
empty_flags
&
FIELD_TERM_EMPTY
)
ex
.
field_term
->
length
(
0
);
ex
.
skip_lines
=
skip_lines
;
List
<
Item
>
fields
;
set_fields
(
fields
);
thd
->
slave_proxy_id
=
thd
->
thread_id
;
if
(
net
)
{
// mysql_load will use thd->net to read the file
thd
->
net
.
vio
=
net
->
vio
;
/*
Make sure the client does not get confused about the packet sequence
*/
thd
->
net
.
pkt_nr
=
net
->
pkt_nr
;
}
if
(
mysql_load
(
thd
,
&
ex
,
&
tables
,
fields
,
handle_dup
,
net
!=
0
,
TL_WRITE
))
thd
->
query_error
=
1
;
if
(
thd
->
cuted_fields
)
sql_print_error
(
"Slave: load data infile at position %s in log \
'%s' produced %d warning(s)"
,
llstr
(
rli
->
master_log_pos
,
llbuff
),
RPL_LOG_NAME
,
thd
->
cuted_fields
);
if
(
net
)
net
->
pkt_nr
=
thd
->
net
.
pkt_nr
;
}
}
else
{
/*
We will just ask the master to send us /dev/null if we do not
want to load the data.
TODO: this a bug - needs to be done in I/O thread
*/
if
(
net
)
skip_load_data_infile
(
net
);
}
thd
->
net
.
vio
=
0
;
thd
->
db
=
0
;
// prevent db from being freed
close_thread_tables
(
thd
);
if
(
thd
->
query_error
)
{
int
sql_error
=
thd
->
net
.
last_errno
;
if
(
!
sql_error
)
sql_error
=
ER_UNKNOWN_ERROR
;
slave_print_error
(
rli
,
sql_error
,
"Slave: Error '%s' running load data infile "
,
ER_SAFE
(
sql_error
));
free_root
(
&
thd
->
mem_root
,
0
);
return
1
;
}
free_root
(
&
thd
->
mem_root
,
0
);
if
(
thd
->
fatal_error
)
{
sql_print_error
(
"Slave: Fatal error running LOAD DATA INFILE "
);
return
1
;
}
return
Log_event
::
exec_event
(
rli
);
}
}
#endif // !MYSQL_CLIENT
#endif
/* ! MYSQL_CLIENT */
/*****************************************************************************
*****************************************************************************
Rotate_log_event methods
Slave_log_event
::~
Slave_log_event
()
*****************************************************************************
****************************************************************************/
/*****************************************************************************
Rotate_log_event::pack_info()
****************************************************************************/
#ifndef MYSQL_CLIENT
void
Rotate_log_event
::
pack_info
(
String
*
packet
)
{
{
my_free
(
mem_pool
,
MYF
(
MY_ALLOW_ZERO_PTR
));
char
buf1
[
256
],
buf
[
22
];
String
tmp
(
buf1
,
sizeof
(
buf1
),
system_charset_info
);
tmp
.
length
(
0
);
tmp
.
append
(
new_log_ident
,
ident_len
);
tmp
.
append
(
";pos="
);
tmp
.
append
(
llstr
(
pos
,
buf
));
if
(
flags
&
LOG_EVENT_FORCED_ROTATE_F
)
tmp
.
append
(
"; forced by master"
);
net_store_data
(
packet
,
tmp
.
ptr
(),
tmp
.
length
());
}
}
#endif // !MYSQL_CLIENT
#ifdef MYSQL_CLIENT
/*****************************************************************************
void
Slave_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
Rotate_log_event::print()
****************************************************************************/
#ifdef MYSQL_CLIENT
void
Rotate_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
{
{
char
llbuf
f
[
22
];
char
bu
f
[
22
];
if
(
short_form
)
if
(
short_form
)
return
;
return
;
print_header
(
file
);
print_header
(
file
);
fprintf
(
file
,
"
\t
Rotate to "
);
if
(
new_log_ident
)
my_fwrite
(
file
,
(
byte
*
)
new_log_ident
,
(
uint
)
ident_len
,
MYF
(
MY_NABP
|
MY_WME
));
fprintf
(
file
,
" pos: %s"
,
llstr
(
pos
,
buf
));
if
(
flags
&
LOG_EVENT_FORCED_ROTATE_F
)
fprintf
(
file
,
" forced by master"
);
fputc
(
'\n'
,
file
);
fputc
(
'\n'
,
file
);
fprintf
(
file
,
"Slave: master_host: '%s' master_port: %d \
fflush
(
file
);
master_log: '%s' master_pos: %s
\n
"
,
master_host
,
master_port
,
master_log
,
llstr
(
master_pos
,
llbuff
));
}
#endif
/* MYSQL_CLIENT */
int
Slave_log_event
::
get_data_size
()
{
return
master_host_len
+
master_log_len
+
1
+
SL_MASTER_HOST_OFFSET
;
}
}
#endif // MYSQL_CLIENT
int
Slave_log_event
::
write_data
(
IO_CACHE
*
file
)
/*****************************************************************************
{
int8store
(
mem_pool
+
SL_MASTER_POS_OFFSET
,
master_pos
);
int2store
(
mem_pool
+
SL_MASTER_PORT_OFFSET
,
master_port
);
// log and host are already there
return
my_b_safe_write
(
file
,
(
byte
*
)
mem_pool
,
get_data_size
());
}
Rotate_log_event::Rotate_log_event()
void
Slave_log_event
::
init_from_mem_pool
(
int
data_size
)
****************************************************************************/
Rotate_log_event
::
Rotate_log_event
(
const
char
*
buf
,
int
event_len
,
bool
old_format
)
:
Log_event
(
buf
,
old_format
),
new_log_ident
(
NULL
),
alloced
(
0
)
{
{
master_pos
=
uint8korr
(
mem_pool
+
SL_MASTER_POS_OFFSET
);
// The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
master_port
=
uint2korr
(
mem_pool
+
SL_MASTER_PORT_OFFSET
);
int
header_size
=
(
old_format
)
?
OLD_HEADER_LEN
:
LOG_EVENT_HEADER_LEN
;
master_host
=
mem_pool
+
SL_MASTER_HOST_OFFSET
;
uint
ident_offset
;
master_host_len
=
strlen
(
master_host
);
if
(
event_len
<
header_size
)
// safety
master_log
=
master_host
+
master_host_len
+
1
;
if
(
master_log
>
mem_pool
+
data_size
)
{
master_host
=
0
;
return
;
return
;
buf
+=
header_size
;
if
(
old_format
)
{
ident_len
=
(
uint
)(
event_len
-
OLD_HEADER_LEN
);
pos
=
4
;
ident_offset
=
0
;
}
}
master_log_len
=
strlen
(
master_log
);
else
}
{
ident_len
=
(
uint
)(
event_len
-
ROTATE_EVENT_OVERHEAD
);
Slave_log_event
::
Slave_log_event
(
const
char
*
buf
,
int
event_len
)
pos
=
uint8korr
(
buf
+
R_POS_OFFSET
);
:
Log_event
(
buf
,
0
),
mem_pool
(
0
),
master_host
(
0
)
ident_offset
=
ROTATE_HEADER_LEN
;
{
}
event_len
-=
LOG_EVENT_HEADER_LEN
;
set_if_smaller
(
ident_len
,
FN_REFLEN
-
1
);
if
(
event_len
<
0
)
if
(
!
(
new_log_ident
=
my_strdup_with_length
((
byte
*
)
buf
+
return
;
ident_offset
,
if
(
!
(
mem_pool
=
(
char
*
)
my_malloc
(
event_len
+
1
,
MYF
(
MY_WME
))))
(
uint
)
ident_len
,
MYF
(
MY_WME
))))
return
;
return
;
memcpy
(
mem_pool
,
buf
+
LOG_EVENT_HEADER_LEN
,
event_len
);
alloced
=
1
;
mem_pool
[
event_len
]
=
0
;
init_from_mem_pool
(
event_len
);
}
}
#ifndef MYSQL_CLIENT
/*****************************************************************************
Create_file_log_event
::
Create_file_log_event
(
THD
*
thd_arg
,
sql_exchange
*
ex
,
const
char
*
db_arg
,
const
char
*
table_name_arg
,
List
<
Item
>&
fields_arg
,
enum
enum_duplicates
handle_dup
,
char
*
block_arg
,
uint
block_len_arg
)
:
Load_log_event
(
thd_arg
,
ex
,
db_arg
,
table_name_arg
,
fields_arg
,
handle_dup
),
fake_base
(
0
),
block
(
block_arg
),
block_len
(
block_len_arg
),
file_id
(
thd_arg
->
file_id
=
mysql_bin_log
.
next_file_id
())
{
sql_ex
.
force_new_format
();
}
#endif
int
Create_file_log_event
::
write_data_body
(
IO_CACHE
*
file
)
Rotate_log_event::write_data()
****************************************************************************/
int
Rotate_log_event
::
write_data
(
IO_CACHE
*
file
)
{
{
int
res
;
char
buf
[
ROTATE_HEADER_LEN
];
if
((
res
=
Load_log_event
::
write_data_body
(
file
))
||
fake_base
)
int8store
(
buf
,
pos
+
R_POS_OFFSET
);
return
res
;
return
(
my_b_safe_write
(
file
,
(
byte
*
)
buf
,
ROTATE_HEADER_LEN
)
||
return
(
my_b_safe_write
(
file
,
(
byte
*
)
""
,
1
)
||
my_b_safe_write
(
file
,
(
byte
*
)
new_log_ident
,
(
uint
)
ident_len
));
my_b_safe_write
(
file
,
(
byte
*
)
block
,
block_len
));
}
}
int
Create_file_log_event
::
write_data_header
(
IO_CACHE
*
file
)
/*****************************************************************************
{
int
res
;
if
((
res
=
Load_log_event
::
write_data_header
(
file
))
||
fake_base
)
return
res
;
byte
buf
[
CREATE_FILE_HEADER_LEN
];
int4store
(
buf
+
CF_FILE_ID_OFFSET
,
file_id
);
return
my_b_safe_write
(
file
,
buf
,
CREATE_FILE_HEADER_LEN
);
}
int
Create_file_log_event
::
write_base
(
IO_CACHE
*
file
)
Rotate_log_event::exec_event()
{
int
res
;
fake_base
=
1
;
// pretend we are Load event
res
=
write
(
file
);
fake_base
=
0
;
return
res
;
}
Create_file_log_event
::
Create_file_log_event
(
const
char
*
buf
,
int
len
,
Got a rotate log even from the master
bool
old_format
)
:
Load_log_event
(
buf
,
0
,
old_format
),
fake_base
(
0
),
block
(
0
),
inited_from_old
(
0
)
{
int
block_offset
;
if
(
copy_log_event
(
buf
,
len
,
old_format
))
return
;
if
(
!
old_format
)
{
file_id
=
uint4korr
(
buf
+
LOG_EVENT_HEADER_LEN
+
+
LOAD_HEADER_LEN
+
CF_FILE_ID_OFFSET
);
// + 1 for \0 terminating fname
block_offset
=
(
LOG_EVENT_HEADER_LEN
+
Load_log_event
::
get_data_size
()
+
CREATE_FILE_HEADER_LEN
+
1
);
if
(
len
<
block_offset
)
return
;
block
=
(
char
*
)
buf
+
block_offset
;
block_len
=
len
-
block_offset
;
}
else
{
sql_ex
.
force_new_format
();
inited_from_old
=
1
;
}
}
IMPLEMENTATION
This is mainly used so that we can later figure out the logname and
position for the master.
#ifdef MYSQL_CLIENT
We can't rotate the slave as this will cause infinitive rotations
void
Create_file_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
in a A -> B -> A setup.
char
*
last_db
)
{
if
(
short_form
)
return
;
Load_log_event
::
print
(
file
,
1
,
last_db
);
fprintf
(
file
,
" file_id: %d block_len: %d
\n
"
,
file_id
,
block_len
);
}
#endif
#ifndef MYSQL_CLIENT
RETURN VALUES
void
Create_file_log_event
::
pack_info
(
String
*
packet
)
0 ok
{
char
buf1
[
256
],
buf
[
22
],
*
end
;
String
tmp
(
buf1
,
sizeof
(
buf1
),
system_charset_info
);
tmp
.
length
(
0
);
tmp
.
append
(
"db="
);
tmp
.
append
(
db
,
db_len
);
tmp
.
append
(
";table="
);
tmp
.
append
(
table_name
,
table_name_len
);
tmp
.
append
(
";file_id="
);
end
=
int10_to_str
((
long
)
file_id
,
buf
,
10
);
tmp
.
append
(
buf
,
(
uint32
)
(
end
-
buf
));
tmp
.
append
(
";block_len="
);
end
=
int10_to_str
((
long
)
block_len
,
buf
,
10
);
tmp
.
append
(
buf
,
(
uint32
)
(
end
-
buf
));
net_store_data
(
packet
,
(
char
*
)
tmp
.
ptr
(),
tmp
.
length
());
}
#endif
****************************************************************************/
#ifndef MYSQL_CLIENT
#ifndef MYSQL_CLIENT
Append_block_log_event
::
Append_block_log_event
(
THD
*
thd_arg
,
char
*
block_arg
,
int
Rotate_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
uint
block_len_arg
)
:
Log_event
(
thd_arg
),
block
(
block_arg
),
block_len
(
block_len_arg
),
file_id
(
thd_arg
->
file_id
)
{
{
char
*
log_name
=
rli
->
master_log_name
;
DBUG_ENTER
(
"Rotate_log_event::exec_event"
);
pthread_mutex_lock
(
&
rli
->
data_lock
);
memcpy
(
log_name
,
new_log_ident
,
ident_len
+
1
);
rli
->
master_log_pos
=
pos
;
rli
->
relay_log_pos
+=
get_event_len
();
DBUG_PRINT
(
"info"
,
(
"master_log_pos: %d"
,
(
ulong
)
rli
->
master_log_pos
));
pthread_mutex_unlock
(
&
rli
->
data_lock
);
pthread_cond_broadcast
(
&
rli
->
data_cond
);
flush_relay_log_info
(
rli
);
DBUG_RETURN
(
0
);
}
}
#endif
#endif
// !MYSQL_CLIENT
Append_block_log_event
::
Append_block_log_event
(
const
char
*
buf
,
int
len
)
/*****************************************************************************
:
Log_event
(
buf
,
0
),
block
(
0
)
*****************************************************************************
{
if
((
uint
)
len
<
APPEND_BLOCK_EVENT_OVERHEAD
)
return
;
file_id
=
uint4korr
(
buf
+
LOG_EVENT_HEADER_LEN
+
AB_FILE_ID_OFFSET
);
block
=
(
char
*
)
buf
+
APPEND_BLOCK_EVENT_OVERHEAD
;
block_len
=
len
-
APPEND_BLOCK_EVENT_OVERHEAD
;
}
int
Append_block_log_event
::
write_data
(
IO_CACHE
*
file
)
Intvar_log_event methods
{
byte
buf
[
APPEND_BLOCK_HEADER_LEN
];
int4store
(
buf
+
AB_FILE_ID_OFFSET
,
file_id
);
return
(
my_b_safe_write
(
file
,
buf
,
APPEND_BLOCK_HEADER_LEN
)
||
my_b_safe_write
(
file
,
(
byte
*
)
block
,
block_len
));
}
#ifdef MYSQL_CLIENT
*****************************************************************************
void
Append_block_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
****************************************************************************/
char
*
last_db
)
{
/*****************************************************************************
if
(
short_form
)
return
;
Intvar_log_event::pack_info()
print_header
(
file
);
fputc
(
'\n'
,
file
);
fprintf
(
file
,
"#Append_block: file_id: %d block_len: %d
\n
"
,
file_id
,
block_len
);
}
#endif
****************************************************************************/
#ifndef MYSQL_CLIENT
#ifndef MYSQL_CLIENT
void
Append_block
_log_event
::
pack_info
(
String
*
packet
)
void
Intvar
_log_event
::
pack_info
(
String
*
packet
)
{
{
char
buf
[
256
];
char
buf1
[
256
],
buf
[
22
];
uint
length
;
String
tmp
(
buf1
,
sizeof
(
buf1
),
system_charset_info
);
length
=
(
uint
)
my_sprintf
(
buf
,
tmp
.
length
(
0
);
(
buf
,
";file_id=%u;block_len=%u"
,
file_id
,
tmp
.
append
(
get_var_type_name
());
block_len
));
tmp
.
append
(
'='
);
net_store_data
(
packet
,
buf
,
(
int32
)
length
);
tmp
.
append
(
llstr
(
val
,
buf
));
net_store_data
(
packet
,
tmp
.
ptr
(),
tmp
.
length
());
}
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
Delete_file_log_event
::
Delete_file_log_event
(
THD
*
thd_arg
)
Intvar_log_event::Intvar_log_event()
:
Log_event
(
thd_arg
),
file_id
(
thd_arg
->
file_id
)
****************************************************************************/
Intvar_log_event
::
Intvar_log_event
(
const
char
*
buf
,
bool
old_format
)
:
Log_event
(
buf
,
old_format
)
{
{
buf
+=
(
old_format
)
?
OLD_HEADER_LEN
:
LOG_EVENT_HEADER_LEN
;
type
=
buf
[
I_TYPE_OFFSET
];
val
=
uint8korr
(
buf
+
I_VAL_OFFSET
);
}
}
#endif
/*****************************************************************************
Intvar_log_event::get_var_type_name()
Delete_file_log_event
::
Delete_file_log_event
(
const
char
*
buf
,
int
len
)
****************************************************************************/
:
Log_event
(
buf
,
0
),
file_id
(
0
)
const
char
*
Intvar_log_event
::
get_var_type_name
(
)
{
{
if
((
uint
)
len
<
DELETE_FILE_EVENT_OVERHEAD
)
switch
(
type
)
{
return
;
case
LAST_INSERT_ID_EVENT
:
return
"LAST_INSERT_ID"
;
file_id
=
uint4korr
(
buf
+
LOG_EVENT_HEADER_LEN
+
AB_FILE_ID_OFFSET
);
case
INSERT_ID_EVENT
:
return
"INSERT_ID"
;
default:
/* impossible */
return
"UNKNOWN"
;
}
}
}
/*****************************************************************************
Intvar_log_event::write_data()
int
Delete_file_log_event
::
write_data
(
IO_CACHE
*
file
)
****************************************************************************/
int
Intvar_log_event
::
write_data
(
IO_CACHE
*
file
)
{
{
byte
buf
[
DELETE_FILE_HEADER_LEN
];
char
buf
[
9
];
int4store
(
buf
+
DF_FILE_ID_OFFSET
,
file_id
);
buf
[
I_TYPE_OFFSET
]
=
type
;
return
my_b_safe_write
(
file
,
buf
,
DELETE_FILE_HEADER_LEN
);
int8store
(
buf
+
I_VAL_OFFSET
,
val
);
return
my_b_safe_write
(
file
,
(
byte
*
)
buf
,
sizeof
(
buf
));
}
}
/*****************************************************************************
Intvar_log_event::print()
****************************************************************************/
#ifdef MYSQL_CLIENT
#ifdef MYSQL_CLIENT
void
Delete_file_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
void
Intvar_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
char
*
last_db
)
{
{
if
(
short_form
)
char
llbuff
[
22
];
return
;
const
char
*
msg
;
LINT_INIT
(
msg
);
if
(
!
short_form
)
{
print_header
(
file
);
print_header
(
file
);
fputc
(
'\n'
,
file
);
fprintf
(
file
,
"
\t
Intvar
\n
"
);
fprintf
(
file
,
"#Delete_file: file_id=%u
\n
"
,
file_id
);
}
}
#endif
#ifndef MYSQL_CLIENT
fprintf
(
file
,
"SET "
);
void
Delete_file_log_event
::
pack_info
(
String
*
packet
)
switch
(
type
)
{
{
case
LAST_INSERT_ID_EVENT
:
char
buf
[
64
];
msg
=
"LAST_INSERT_ID"
;
uint
length
;
break
;
length
=
(
uint
)
my_sprintf
(
buf
,
(
buf
,
";file_id=%u"
,
(
uint
)
file_id
));
case
INSERT_ID_EVENT
:
net_store_data
(
packet
,
buf
,
(
int32
)
length
);
msg
=
"INSERT_ID"
;
break
;
}
fprintf
(
file
,
"%s=%s;
\n
"
,
msg
,
llstr
(
val
,
llbuff
));
fflush
(
file
);
}
}
#endif
#endif // MYSQL_CLIENT
/*****************************************************************************
Intvar_log_event::exec_event()
****************************************************************************/
#ifndef MYSQL_CLIENT
#ifndef MYSQL_CLIENT
Execute_load_log_event
::
Execute_load_log_event
(
THD
*
thd_arg
)
int
Intvar_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
:
Log_event
(
thd_arg
),
file_id
(
thd_arg
->
file_id
)
{
{
switch
(
type
)
{
case
LAST_INSERT_ID_EVENT
:
thd
->
last_insert_id_used
=
1
;
thd
->
last_insert_id
=
val
;
break
;
case
INSERT_ID_EVENT
:
thd
->
next_insert_id
=
val
;
break
;
}
rli
->
inc_pending
(
get_event_len
());
return
0
;
}
}
#endif
#endif
// !MYSQL_CLIENT
Execute_load_log_event
::
Execute_load_log_event
(
const
char
*
buf
,
int
len
)
/*****************************************************************************
:
Log_event
(
buf
,
0
),
file_id
(
0
)
*****************************************************************************
{
if
((
uint
)
len
<
EXEC_LOAD_EVENT_OVERHEAD
)
return
;
file_id
=
uint4korr
(
buf
+
LOG_EVENT_HEADER_LEN
+
EL_FILE_ID_OFFSET
);
}
Rand_log_event methods
int
Execute_load_log_event
::
write_data
(
IO_CACHE
*
file
)
*****************************************************************************
{
****************************************************************************/
byte
buf
[
EXEC_LOAD_HEADER_LEN
];
int4store
(
buf
+
EL_FILE_ID_OFFSET
,
file_id
);
return
my_b_safe_write
(
file
,
buf
,
EXEC_LOAD_HEADER_LEN
);
}
#ifdef MYSQL_CLIENT
/*****************************************************************************
void
Execute_load_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
Rand_log_event::pack_info()
****************************************************************************/
#ifndef MYSQL_CLIENT
void
Rand_log_event
::
pack_info
(
String
*
packet
)
{
{
if
(
short_form
)
char
buf1
[
256
],
buf
[
22
];
return
;
String
tmp
(
buf1
,
sizeof
(
buf1
),
system_charset_info
);
print_header
(
file
);
tmp
.
length
(
0
);
fputc
(
'\n'
,
file
);
tmp
.
append
(
"randseed1="
);
fprintf
(
file
,
"#Exec_load: file_id=%d
\n
"
,
tmp
.
append
(
llstr
(
seed1
,
buf
));
file_id
);
tmp
.
append
(
",randseed2="
);
tmp
.
append
(
llstr
(
seed2
,
buf
));
net_store_data
(
packet
,
tmp
.
ptr
(),
tmp
.
length
());
}
}
#endif
#endif // !MYSQL_CLIENT
#ifndef MYSQL_CLIENT
void
Execute_load_log_event
::
pack_info
(
String
*
packet
)
/*****************************************************************************
Rand_log_event::Rand_log_event()
****************************************************************************/
Rand_log_event
::
Rand_log_event
(
const
char
*
buf
,
bool
old_format
)
:
Log_event
(
buf
,
old_format
)
{
{
char
buf
[
64
];
buf
+=
(
old_format
)
?
OLD_HEADER_LEN
:
LOG_EVENT_HEADER_LEN
;
uint
length
;
seed1
=
uint8korr
(
buf
+
RAND_SEED1_OFFSET
);
length
=
(
uint
)
my_sprintf
(
buf
,
(
buf
,
";file_id=%u"
,
(
uint
)
file_id
));
seed2
=
uint8korr
(
buf
+
RAND_SEED2_OFFSET
);
net_store_data
(
packet
,
buf
,
(
int32
)
length
);
}
}
#endif
#ifndef MYSQL_CLIENT
/*****************************************************************************
int
Query_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
Rand_log_event::write_data()
****************************************************************************/
int
Rand_log_event
::
write_data
(
IO_CACHE
*
file
)
{
{
int
expected_error
,
actual_error
=
0
;
char
buf
[
16
];
init_sql_alloc
(
&
thd
->
mem_root
,
8192
,
0
);
int8store
(
buf
+
RAND_SEED1_OFFSET
,
seed1
);
thd
->
db
=
rewrite_db
((
char
*
)
db
);
int8store
(
buf
+
RAND_SEED2_OFFSET
,
seed2
);
return
my_b_safe_write
(
file
,
(
byte
*
)
buf
,
sizeof
(
buf
));
}
/*
/*****************************************************************************
InnoDB internally stores the master log position it has processed so far;
position to store is really pos + pending + event_len
since we must store the pos of the END of the current log event
*/
rli
->
event_len
=
get_event_len
();
if
(
db_ok
(
thd
->
db
,
replicate_do_db
,
replicate_ignore_db
))
Rand_log_event::print()
{
thd
->
query
=
(
char
*
)
query
;
thd
->
set_time
((
time_t
)
when
);
thd
->
current_tablenr
=
0
;
VOID
(
pthread_mutex_lock
(
&
LOCK_thread_count
));
thd
->
query_id
=
query_id
++
;
VOID
(
pthread_mutex_unlock
(
&
LOCK_thread_count
));
thd
->
query_error
=
0
;
// clear error
thd
->
net
.
last_errno
=
0
;
thd
->
net
.
last_error
[
0
]
=
0
;
thd
->
slave_proxy_id
=
thread_id
;
// for temp tables
/*
****************************************************************************/
Sanity check to make sure the master did not get a really bad
#ifdef MYSQL_CLIENT
error on the query.
void
Rand_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
*/
{
if
(
ignored_error_code
((
expected_error
=
error_code
))
||
char
llbuff
[
22
];
!
check_expected_error
(
thd
,
rli
,
expected_error
))
if
(
!
short_form
)
{
mysql_log
.
write
(
thd
,
COM_QUERY
,
"%s"
,
thd
->
query
);
DBUG_PRINT
(
"query"
,(
"%s"
,
thd
->
query
));
mysql_parse
(
thd
,
thd
->
query
,
q_len
);
DBUG_PRINT
(
"info"
,(
"expected_error: %d last_errno: %d"
,
expected_error
,
thd
->
net
.
last_errno
));
if
((
expected_error
!=
(
actual_error
=
thd
->
net
.
last_errno
))
&&
expected_error
&&
!
ignored_error_code
(
actual_error
)
&&
!
ignored_error_code
(
expected_error
))
{
const
char
*
errmsg
=
"Slave: did not get the expected error\
running query from master - expected: '%s' (%d), got '%s' (%d)"
;
sql_print_error
(
errmsg
,
ER_SAFE
(
expected_error
),
expected_error
,
actual_error
?
thd
->
net
.
last_error
:
"no error"
,
actual_error
);
thd
->
query_error
=
1
;
}
else
if
(
expected_error
==
actual_error
||
ignored_error_code
(
actual_error
))
{
DBUG_PRINT
(
"info"
,(
"error ignored"
));
thd
->
query_error
=
0
;
*
rli
->
last_slave_error
=
0
;
rli
->
last_slave_errno
=
0
;
}
}
else
{
{
// master could be inconsistent, abort and tell DBA to check/fix it
print_header
(
file
);
thd
->
db
=
thd
->
query
=
0
;
fprintf
(
file
,
"
\t
Rand
\n
"
);
thd
->
variables
.
convert_set
=
0
;
close_thread_tables
(
thd
);
free_root
(
&
thd
->
mem_root
,
0
);
return
1
;
}
}
}
thd
->
db
=
0
;
// prevent db from being freed
fprintf
(
file
,
"SET RAND SEED1=%s;
\n
"
,
llstr
(
seed1
,
llbuff
));
thd
->
query
=
0
;
// just to be sure
fprintf
(
file
,
"SET RAND SEED2=%s;
\n
"
,
llstr
(
seed2
,
llbuff
));
// assume no convert for next query unless set explictly
fflush
(
file
);
thd
->
variables
.
convert_set
=
0
;
}
close_thread_tables
(
thd
);
#endif // MYSQL_CLIENT
if
(
thd
->
query_error
||
thd
->
fatal_error
)
/*****************************************************************************
{
slave_print_error
(
rli
,
actual_error
,
"error '%s' on query '%s'"
,
Rand_log_event::exec_event()
actual_error
?
thd
->
net
.
last_error
:
"unexpected success or fatal error"
,
query
);
****************************************************************************/
free_root
(
&
thd
->
mem_root
,
0
);
#ifndef MYSQL_CLIENT
return
1
;
int
Rand_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
}
{
free_root
(
&
thd
->
mem_root
,
0
);
thd
->
rand
.
seed1
=
seed1
;
return
Log_event
::
exec_event
(
rli
);
thd
->
rand
.
seed2
=
seed2
;
rli
->
inc_pending
(
get_event_len
());
return
0
;
}
}
#endif // !MYSQL_CLIENT
int
Load_log_event
::
exec_event
(
NET
*
net
,
struct
st_relay_log_info
*
rli
)
/*****************************************************************************
*****************************************************************************
Slave_log_event methods
*****************************************************************************
****************************************************************************/
/*****************************************************************************
Slave_log_event::pack_info()
****************************************************************************/
#ifndef MYSQL_CLIENT
void
Slave_log_event
::
pack_info
(
String
*
packet
)
{
{
init_sql_alloc
(
&
thd
->
mem_root
,
8192
,
0
);
char
buf1
[
256
],
buf
[
22
],
*
end
;
thd
->
db
=
rewrite_db
((
char
*
)
db
);
String
tmp
(
buf1
,
sizeof
(
buf1
),
system_charset_info
);
thd
->
query
=
0
;
tmp
.
length
(
0
);
thd
->
query_error
=
0
;
tmp
.
append
(
"host="
);
tmp
.
append
(
master_host
);
tmp
.
append
(
",port="
);
end
=
int10_to_str
((
long
)
master_port
,
buf
,
10
);
tmp
.
append
(
buf
,
(
uint32
)
(
end
-
buf
));
tmp
.
append
(
",log="
);
tmp
.
append
(
master_log
);
tmp
.
append
(
",pos="
);
tmp
.
append
(
llstr
(
master_pos
,
buf
));
net_store_data
(
packet
,
tmp
.
ptr
(),
tmp
.
length
());
}
#endif // !MYSQL_CLIENT
if
(
db_ok
(
thd
->
db
,
replicate_do_db
,
replicate_ignore_db
))
/*****************************************************************************
{
thd
->
set_time
((
time_t
)
when
);
thd
->
current_tablenr
=
0
;
VOID
(
pthread_mutex_lock
(
&
LOCK_thread_count
));
thd
->
query_id
=
query_id
++
;
VOID
(
pthread_mutex_unlock
(
&
LOCK_thread_count
));
TABLE_LIST
tables
;
Slave_log_event::Slave_log_event()
bzero
((
char
*
)
&
tables
,
sizeof
(
tables
));
tables
.
db
=
thd
->
db
;
****************************************************************************/
tables
.
alias
=
tables
.
real_name
=
(
char
*
)
table_name
;
#ifndef MYSQL_CLIENT
tables
.
lock_type
=
TL_WRITE
;
Slave_log_event
::
Slave_log_event
(
THD
*
thd_arg
,
// the table will be opened in mysql_load
struct
st_relay_log_info
*
rli
)
:
if
(
table_rules_on
&&
!
tables_ok
(
thd
,
&
tables
))
Log_event
(
thd_arg
),
mem_pool
(
0
),
master_host
(
0
)
{
DBUG_ENTER
(
"Slave_log_event"
);
if
(
!
rli
->
inited
)
// QQ When can this happen ?
DBUG_VOID_RETURN
;
MASTER_INFO
*
mi
=
rli
->
mi
;
// TODO: re-write this better without holding both locks at the same time
pthread_mutex_lock
(
&
mi
->
data_lock
);
pthread_mutex_lock
(
&
rli
->
data_lock
);
master_host_len
=
strlen
(
mi
->
host
);
master_log_len
=
strlen
(
rli
->
master_log_name
);
// on OOM, just do not initialize the structure and print the error
if
((
mem_pool
=
(
char
*
)
my_malloc
(
get_data_size
()
+
1
,
MYF
(
MY_WME
))))
{
{
// TODO: this is a bug - this needs to be moved to the I/O thread
master_host
=
mem_pool
+
SL_MASTER_HOST_OFFSET
;
if
(
net
)
memcpy
(
master_host
,
mi
->
host
,
master_host_len
+
1
);
skip_load_data_infile
(
net
);
master_log
=
master_host
+
master_host_len
+
1
;
memcpy
(
master_log
,
rli
->
master_log_name
,
master_log_len
+
1
);
master_port
=
mi
->
port
;
master_pos
=
rli
->
master_log_pos
;
DBUG_PRINT
(
"info"
,
(
"master_log: %s pos: %d"
,
master_log
,
(
ulong
)
master_pos
));
}
}
else
else
{
sql_print_error
(
"Out of memory while recording slave event"
);
pthread_mutex_unlock
(
&
rli
->
data_lock
);
pthread_mutex_unlock
(
&
mi
->
data_lock
);
DBUG_VOID_RETURN
;
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
Slave_log_event dtor
****************************************************************************/
Slave_log_event
::~
Slave_log_event
()
{
my_free
(
mem_pool
,
MYF
(
MY_ALLOW_ZERO_PTR
));
}
/*****************************************************************************
Slave_log_event::print()
****************************************************************************/
#ifdef MYSQL_CLIENT
void
Slave_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
{
char
llbuff
[
22
];
char
llbuff
[
22
];
enum
enum_duplicates
handle_dup
=
DUP_IGNORE
;
if
(
short_form
)
if
(
sql_ex
.
opt_flags
&&
REPLACE_FLAG
)
return
;
handle_dup
=
DUP_REPLACE
;
print_header
(
file
);
sql_exchange
ex
((
char
*
)
fname
,
sql_ex
.
opt_flags
&&
fputc
(
'\n'
,
file
);
DUMPFILE_FLAG
);
fprintf
(
file
,
"Slave: master_host: '%s' master_port: %d \
String
field_term
(
sql_ex
.
field_term
,
sql_ex
.
field_term_len
,
master_log: '%s' master_pos: %s
\n
"
,
system_charset_info
);
master_host
,
master_port
,
master_log
,
llstr
(
master_pos
,
llbuff
));
String
enclosed
(
sql_ex
.
enclosed
,
sql_ex
.
enclosed_len
,
}
system_charset_info
);
#endif // MYSQL_CLIENT
String
line_term
(
sql_ex
.
line_term
,
sql_ex
.
line_term_len
,
system_charset_info
);
String
line_start
(
sql_ex
.
line_start
,
sql_ex
.
line_start_len
,
system_charset_info
);
String
escaped
(
sql_ex
.
escaped
,
sql_ex
.
escaped_len
,
system_charset_info
);
ex
.
opt_enclosed
=
(
sql_ex
.
opt_flags
&
OPT_ENCLOSED_FLAG
);
/*****************************************************************************
if
(
sql_ex
.
empty_flags
&
FIELD_TERM_EMPTY
)
ex
.
field_term
->
length
(
0
);
ex
.
skip_lines
=
skip_lines
;
Slave_log_event::get_data_size()
List
<
Item
>
fields
;
set_fields
(
fields
);
thd
->
slave_proxy_id
=
thd
->
thread_id
;
if
(
net
)
{
// mysql_load will use thd->net to read the file
thd
->
net
.
vio
=
net
->
vio
;
/*
Make sure the client does not get confused about the packet sequence
*/
thd
->
net
.
pkt_nr
=
net
->
pkt_nr
;
}
if
(
mysql_load
(
thd
,
&
ex
,
&
tables
,
fields
,
handle_dup
,
net
!=
0
,
TL_WRITE
))
thd
->
query_error
=
1
;
if
(
thd
->
cuted_fields
)
sql_print_error
(
"Slave: load data infile at position %s in log \
'%s' produced %d warning(s)"
,
llstr
(
rli
->
master_log_pos
,
llbuff
),
RPL_LOG_NAME
,
thd
->
cuted_fields
);
if
(
net
)
net
->
pkt_nr
=
thd
->
net
.
pkt_nr
;
}
}
else
{
/*
We will just ask the master to send us /dev/null if we do not
want to load the data.
TODO: this a bug - needs to be done in I/O thread
*/
if
(
net
)
skip_load_data_infile
(
net
);
}
thd
->
net
.
vio
=
0
;
****************************************************************************/
thd
->
db
=
0
;
// prevent db from being freed
int
Slave_log_event
::
get_data_size
()
close_thread_tables
(
thd
);
{
if
(
thd
->
query_error
)
return
master_host_len
+
master_log_len
+
1
+
SL_MASTER_HOST_OFFSET
;
{
}
int
sql_error
=
thd
->
net
.
last_errno
;
if
(
!
sql_error
)
sql_error
=
ER_UNKNOWN_ERROR
;
slave_print_error
(
rli
,
sql_error
,
/*****************************************************************************
"Slave: Error '%s' running load data infile "
,
ER_SAFE
(
sql_error
));
free_root
(
&
thd
->
mem_root
,
0
);
return
1
;
}
free_root
(
&
thd
->
mem_root
,
0
);
if
(
thd
->
fatal_error
)
Slave_log_event::write_data()
****************************************************************************/
int
Slave_log_event
::
write_data
(
IO_CACHE
*
file
)
{
int8store
(
mem_pool
+
SL_MASTER_POS_OFFSET
,
master_pos
);
int2store
(
mem_pool
+
SL_MASTER_PORT_OFFSET
,
master_port
);
// log and host are already there
return
my_b_safe_write
(
file
,
(
byte
*
)
mem_pool
,
get_data_size
());
}
/*****************************************************************************
Slave_log_event::init_from_mem_pool()
****************************************************************************/
void
Slave_log_event
::
init_from_mem_pool
(
int
data_size
)
{
master_pos
=
uint8korr
(
mem_pool
+
SL_MASTER_POS_OFFSET
);
master_port
=
uint2korr
(
mem_pool
+
SL_MASTER_PORT_OFFSET
);
master_host
=
mem_pool
+
SL_MASTER_HOST_OFFSET
;
master_host_len
=
strlen
(
master_host
);
// safety
master_log
=
master_host
+
master_host_len
+
1
;
if
(
master_log
>
mem_pool
+
data_size
)
{
{
sql_print_error
(
"Slave: Fatal error running LOAD DATA INFILE "
)
;
master_host
=
0
;
return
1
;
return
;
}
}
master_log_len
=
strlen
(
master_log
);
}
/*****************************************************************************
Slave_log_event::Slave_log_event()
****************************************************************************/
Slave_log_event
::
Slave_log_event
(
const
char
*
buf
,
int
event_len
)
:
Log_event
(
buf
,
0
),
mem_pool
(
0
),
master_host
(
0
)
{
event_len
-=
LOG_EVENT_HEADER_LEN
;
if
(
event_len
<
0
)
return
;
if
(
!
(
mem_pool
=
(
char
*
)
my_malloc
(
event_len
+
1
,
MYF
(
MY_WME
))))
return
;
memcpy
(
mem_pool
,
buf
+
LOG_EVENT_HEADER_LEN
,
event_len
);
mem_pool
[
event_len
]
=
0
;
init_from_mem_pool
(
event_len
);
}
/*****************************************************************************
Slave_log_event::exec_event()
****************************************************************************/
#ifndef MYSQL_CLIENT
int
Slave_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
{
if
(
mysql_bin_log
.
is_open
())
mysql_bin_log
.
write
(
this
);
return
Log_event
::
exec_event
(
rli
);
return
Log_event
::
exec_event
(
rli
);
}
}
#endif // !MYSQL_CLIENT
/*
/*
****************************************************************************
The master started
*****************************************************************************
IMPLEMENTATION
Stop_log_event methods
- To handle the case where the master died without a stop event,
we clean up all temporary tables + locks that we got.
TODO
*****************************************************************************
- Remove all active user locks
****************************************************************************/
- If we have an active transaction at this point, the master died
in the middle while writing the transaction to the binary log.
In this case we should stop the slave.
*/
int
Start_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
/*****************************************************************************
Stop_log_event::print()
****************************************************************************/
#ifdef MYSQL_CLIENT
void
Stop_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
{
{
/* All temporary tables was deleted on the master */
if
(
short_form
)
close_temporary_tables
(
thd
);
return
;
/*
If we have old format, load_tmpdir is cleaned up by the I/O thread
print_header
(
file
);
*/
fprintf
(
file
,
"
\t
Stop
\n
"
);
if
(
!
rli
->
mi
->
old_format
)
fflush
(
file
);
cleanup_load_tmpdir
();
return
Log_event
::
exec_event
(
rli
);
}
}
#endif // MYSQL_CLIENT
/*****************************************************************************
Stop_log_event::exec_event()
/*
The master stopped. Clean up all temporary tables + locks that the
The master stopped. Clean up all temporary tables + locks that the
master may have set.
master may have set.
TODO
TODO
- Remove all active user locks
- Remove all active user locks
*/
****************************************************************************/
#ifndef MYSQL_CLIENT
int
Stop_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
int
Stop_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
{
{
// do not clean up immediately after rotate event
// do not clean up immediately after rotate event
...
@@ -1931,70 +2103,156 @@ int Stop_log_event::exec_event(struct st_relay_log_info* rli)
...
@@ -1931,70 +2103,156 @@ int Stop_log_event::exec_event(struct st_relay_log_info* rli)
flush_relay_log_info
(
rli
);
flush_relay_log_info
(
rli
);
return
0
;
return
0
;
}
}
#endif // !MYSQL_CLIENT
/*
/*
****************************************************************************
Got a rotate log even from the master
*****************************************************************************
IMPLEMENTATION
Create_file_log_event methods
This is mainly used so that we can later figure out the logname and
position for the master.
We can't rotate the slave as this will cause infinitive rotations
*****************************************************************************
in a A -> B -> A setup.
****************************************************************************/
RETURN VALUES
/*****************************************************************************
0 ok
*/
Create_file_log_event ctor
int
Rotate_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
****************************************************************************/
#ifndef MYSQL_CLIENT
Create_file_log_event
::
Create_file_log_event
(
THD
*
thd_arg
,
sql_exchange
*
ex
,
const
char
*
db_arg
,
const
char
*
table_name_arg
,
List
<
Item
>&
fields_arg
,
enum
enum_duplicates
handle_dup
,
char
*
block_arg
,
uint
block_len_arg
)
:
Load_log_event
(
thd_arg
,
ex
,
db_arg
,
table_name_arg
,
fields_arg
,
handle_dup
),
fake_base
(
0
),
block
(
block_arg
),
block_len
(
block_len_arg
),
file_id
(
thd_arg
->
file_id
=
mysql_bin_log
.
next_file_id
())
{
{
char
*
log_name
=
rli
->
master_log_name
;
sql_ex
.
force_new_format
();
DBUG_ENTER
(
"Rotate_log_event::exec_event"
);
}
#endif // !MYSQL_CLIENT
pthread_mutex_lock
(
&
rli
->
data_lock
);
/*****************************************************************************
memcpy
(
log_name
,
new_log_ident
,
ident_len
+
1
);
rli
->
master_log_pos
=
pos
;
Create_file_log_event::write_data_body()
rli
->
relay_log_pos
+=
get_event_len
();
DBUG_PRINT
(
"info"
,
(
"master_log_pos: %d"
,
(
ulong
)
rli
->
master_log_pos
));
****************************************************************************/
pthread_mutex_unlock
(
&
rli
->
data_lock
);
int
Create_file_log_event
::
write_data_body
(
IO_CACHE
*
file
)
pthread_cond_broadcast
(
&
rli
->
data_cond
);
{
flush_relay_log_info
(
rli
);
int
res
;
DBUG_RETURN
(
0
);
if
((
res
=
Load_log_event
::
write_data_body
(
file
))
||
fake_base
)
return
res
;
return
(
my_b_safe_write
(
file
,
(
byte
*
)
""
,
1
)
||
my_b_safe_write
(
file
,
(
byte
*
)
block
,
block_len
));
}
}
/*****************************************************************************
Create_file_log_event::write_data_header()
int
Intvar_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
****************************************************************************/
int
Create_file_log_event
::
write_data_header
(
IO_CACHE
*
file
)
{
{
switch
(
type
)
{
int
res
;
case
LAST_INSERT_ID_EVENT
:
if
((
res
=
Load_log_event
::
write_data_header
(
file
))
||
fake_base
)
thd
->
last_insert_id_used
=
1
;
return
res
;
thd
->
last_insert_id
=
val
;
byte
buf
[
CREATE_FILE_HEADER_LEN
];
break
;
int4store
(
buf
+
CF_FILE_ID_OFFSET
,
file_id
);
case
INSERT_ID_EVENT
:
return
my_b_safe_write
(
file
,
buf
,
CREATE_FILE_HEADER_LEN
);
thd
->
next_insert_id
=
val
;
}
break
;
/*****************************************************************************
Create_file_log_event::write_base()
****************************************************************************/
int
Create_file_log_event
::
write_base
(
IO_CACHE
*
file
)
{
int
res
;
fake_base
=
1
;
// pretend we are Load event
res
=
write
(
file
);
fake_base
=
0
;
return
res
;
}
/*****************************************************************************
Create_file_log_event ctor
****************************************************************************/
Create_file_log_event
::
Create_file_log_event
(
const
char
*
buf
,
int
len
,
bool
old_format
)
:
Load_log_event
(
buf
,
0
,
old_format
),
fake_base
(
0
),
block
(
0
),
inited_from_old
(
0
)
{
int
block_offset
;
if
(
copy_log_event
(
buf
,
len
,
old_format
))
return
;
if
(
!
old_format
)
{
file_id
=
uint4korr
(
buf
+
LOG_EVENT_HEADER_LEN
+
+
LOAD_HEADER_LEN
+
CF_FILE_ID_OFFSET
);
// + 1 for \0 terminating fname
block_offset
=
(
LOG_EVENT_HEADER_LEN
+
Load_log_event
::
get_data_size
()
+
CREATE_FILE_HEADER_LEN
+
1
);
if
(
len
<
block_offset
)
return
;
block
=
(
char
*
)
buf
+
block_offset
;
block_len
=
len
-
block_offset
;
}
else
{
sql_ex
.
force_new_format
();
inited_from_old
=
1
;
}
}
rli
->
inc_pending
(
get_event_len
());
return
0
;
}
}
int
Rand_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
/*****************************************************************************
Create_file_log_event::print()
****************************************************************************/
#ifdef MYSQL_CLIENT
void
Create_file_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
{
{
thd
->
rand
.
seed1
=
seed1
;
if
(
short_form
)
thd
->
rand
.
seed2
=
seed2
;
return
;
rli
->
inc_pending
(
get_event_len
()
);
Load_log_event
::
print
(
file
,
1
,
last_db
);
return
0
;
fprintf
(
file
,
" file_id: %d block_len: %d
\n
"
,
file_id
,
block_len
)
;
}
}
#endif // MYSQL_CLIENT
int
Slave_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
/*****************************************************************************
Create_file_log_event::pack_info()
****************************************************************************/
#ifndef MYSQL_CLIENT
void
Create_file_log_event
::
pack_info
(
String
*
packet
)
{
{
if
(
mysql_bin_log
.
is_open
())
char
buf1
[
256
],
buf
[
22
],
*
end
;
mysql_bin_log
.
write
(
this
);
String
tmp
(
buf1
,
sizeof
(
buf1
),
system_charset_info
);
return
Log_event
::
exec_event
(
rli
);
tmp
.
length
(
0
);
tmp
.
append
(
"db="
);
tmp
.
append
(
db
,
db_len
);
tmp
.
append
(
";table="
);
tmp
.
append
(
table_name
,
table_name_len
);
tmp
.
append
(
";file_id="
);
end
=
int10_to_str
((
long
)
file_id
,
buf
,
10
);
tmp
.
append
(
buf
,
(
uint32
)
(
end
-
buf
));
tmp
.
append
(
";block_len="
);
end
=
int10_to_str
((
long
)
block_len
,
buf
,
10
);
tmp
.
append
(
buf
,
(
uint32
)
(
end
-
buf
));
net_store_data
(
packet
,
(
char
*
)
tmp
.
ptr
(),
tmp
.
length
());
}
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
Create_file_log_event::exec_event()
****************************************************************************/
#ifndef MYSQL_CLIENT
int
Create_file_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
int
Create_file_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
{
{
char
fname_buf
[
FN_REFLEN
+
10
];
char
fname_buf
[
FN_REFLEN
+
10
];
...
@@ -2051,20 +2309,100 @@ err:
...
@@ -2051,20 +2309,100 @@ err:
my_close
(
fd
,
MYF
(
0
));
my_close
(
fd
,
MYF
(
0
));
return
error
?
1
:
Log_event
::
exec_event
(
rli
);
return
error
?
1
:
Log_event
::
exec_event
(
rli
);
}
}
#endif // !MYSQL_CLIENT
int
Delete_file_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
/*****************************************************************************
*****************************************************************************
Append_block_log_event methods
*****************************************************************************
****************************************************************************/
/*****************************************************************************
Append_block_log_event ctor
****************************************************************************/
#ifndef MYSQL_CLIENT
Append_block_log_event
::
Append_block_log_event
(
THD
*
thd_arg
,
char
*
block_arg
,
uint
block_len_arg
)
:
Log_event
(
thd_arg
),
block
(
block_arg
),
block_len
(
block_len_arg
),
file_id
(
thd_arg
->
file_id
)
{
{
char
fname
[
FN_REFLEN
+
10
];
char
*
p
=
slave_load_file_stem
(
fname
,
file_id
,
server_id
);
memcpy
(
p
,
".data"
,
6
);
(
void
)
my_delete
(
fname
,
MYF
(
MY_WME
));
memcpy
(
p
,
".info"
,
6
);
(
void
)
my_delete
(
fname
,
MYF
(
MY_WME
));
if
(
mysql_bin_log
.
is_open
())
mysql_bin_log
.
write
(
this
);
return
Log_event
::
exec_event
(
rli
);
}
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
Append_block_log_event ctor
****************************************************************************/
Append_block_log_event
::
Append_block_log_event
(
const
char
*
buf
,
int
len
)
:
Log_event
(
buf
,
0
),
block
(
0
)
{
if
((
uint
)
len
<
APPEND_BLOCK_EVENT_OVERHEAD
)
return
;
file_id
=
uint4korr
(
buf
+
LOG_EVENT_HEADER_LEN
+
AB_FILE_ID_OFFSET
);
block
=
(
char
*
)
buf
+
APPEND_BLOCK_EVENT_OVERHEAD
;
block_len
=
len
-
APPEND_BLOCK_EVENT_OVERHEAD
;
}
/*****************************************************************************
Append_block_log_event::write_data()
****************************************************************************/
int
Append_block_log_event
::
write_data
(
IO_CACHE
*
file
)
{
byte
buf
[
APPEND_BLOCK_HEADER_LEN
];
int4store
(
buf
+
AB_FILE_ID_OFFSET
,
file_id
);
return
(
my_b_safe_write
(
file
,
buf
,
APPEND_BLOCK_HEADER_LEN
)
||
my_b_safe_write
(
file
,
(
byte
*
)
block
,
block_len
));
}
/*****************************************************************************
Append_block_log_event::print()
****************************************************************************/
#ifdef MYSQL_CLIENT
void
Append_block_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
{
if
(
short_form
)
return
;
print_header
(
file
);
fputc
(
'\n'
,
file
);
fprintf
(
file
,
"#Append_block: file_id: %d block_len: %d
\n
"
,
file_id
,
block_len
);
}
#endif // MYSQL_CLIENT
/*****************************************************************************
Append_block_log_event::pack_info()
****************************************************************************/
#ifndef MYSQL_CLIENT
void
Append_block_log_event
::
pack_info
(
String
*
packet
)
{
char
buf
[
256
];
uint
length
;
length
=
(
uint
)
my_sprintf
(
buf
,
(
buf
,
";file_id=%u;block_len=%u"
,
file_id
,
block_len
));
net_store_data
(
packet
,
buf
,
(
int32
)
length
);
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
Append_block_log_event::exec_event()
****************************************************************************/
#ifndef MYSQL_CLIENT
int
Append_block_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
int
Append_block_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
{
{
char
fname
[
FN_REFLEN
+
10
];
char
fname
[
FN_REFLEN
+
10
];
...
@@ -2092,7 +2430,191 @@ err:
...
@@ -2092,7 +2430,191 @@ err:
my_close
(
fd
,
MYF
(
0
));
my_close
(
fd
,
MYF
(
0
));
return
error
?
error
:
Log_event
::
exec_event
(
rli
);
return
error
?
error
:
Log_event
::
exec_event
(
rli
);
}
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
*****************************************************************************
Delete_file_log_event methods
*****************************************************************************
****************************************************************************/
/*****************************************************************************
Delete_file_log_event ctor
****************************************************************************/
#ifndef MYSQL_CLIENT
Delete_file_log_event
::
Delete_file_log_event
(
THD
*
thd_arg
)
:
Log_event
(
thd_arg
),
file_id
(
thd_arg
->
file_id
)
{
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
Delete_file_log_event ctor
****************************************************************************/
Delete_file_log_event
::
Delete_file_log_event
(
const
char
*
buf
,
int
len
)
:
Log_event
(
buf
,
0
),
file_id
(
0
)
{
if
((
uint
)
len
<
DELETE_FILE_EVENT_OVERHEAD
)
return
;
file_id
=
uint4korr
(
buf
+
LOG_EVENT_HEADER_LEN
+
AB_FILE_ID_OFFSET
);
}
/*****************************************************************************
Delete_file_log_event::write_data()
****************************************************************************/
int
Delete_file_log_event
::
write_data
(
IO_CACHE
*
file
)
{
byte
buf
[
DELETE_FILE_HEADER_LEN
];
int4store
(
buf
+
DF_FILE_ID_OFFSET
,
file_id
);
return
my_b_safe_write
(
file
,
buf
,
DELETE_FILE_HEADER_LEN
);
}
/*****************************************************************************
Delete_file_log_event::print()
****************************************************************************/
#ifdef MYSQL_CLIENT
void
Delete_file_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
{
if
(
short_form
)
return
;
print_header
(
file
);
fputc
(
'\n'
,
file
);
fprintf
(
file
,
"#Delete_file: file_id=%u
\n
"
,
file_id
);
}
#endif // MYSQL_CLIENT
/*****************************************************************************
Delete_file_log_event::pack_info()
****************************************************************************/
#ifndef MYSQL_CLIENT
void
Delete_file_log_event
::
pack_info
(
String
*
packet
)
{
char
buf
[
64
];
uint
length
;
length
=
(
uint
)
my_sprintf
(
buf
,
(
buf
,
";file_id=%u"
,
(
uint
)
file_id
));
net_store_data
(
packet
,
buf
,
(
int32
)
length
);
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
Delete_file_log_event::exec_event()
****************************************************************************/
#ifndef MYSQL_CLIENT
int
Delete_file_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
{
char
fname
[
FN_REFLEN
+
10
];
char
*
p
=
slave_load_file_stem
(
fname
,
file_id
,
server_id
);
memcpy
(
p
,
".data"
,
6
);
(
void
)
my_delete
(
fname
,
MYF
(
MY_WME
));
memcpy
(
p
,
".info"
,
6
);
(
void
)
my_delete
(
fname
,
MYF
(
MY_WME
));
if
(
mysql_bin_log
.
is_open
())
mysql_bin_log
.
write
(
this
);
return
Log_event
::
exec_event
(
rli
);
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
*****************************************************************************
Execute_load_log_event methods
*****************************************************************************
****************************************************************************/
/*****************************************************************************
Execute_load_log_event ctor
****************************************************************************/
#ifndef MYSQL_CLIENT
Execute_load_log_event
::
Execute_load_log_event
(
THD
*
thd_arg
)
:
Log_event
(
thd_arg
),
file_id
(
thd_arg
->
file_id
)
{
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
Execute_load_log_event ctor
****************************************************************************/
Execute_load_log_event
::
Execute_load_log_event
(
const
char
*
buf
,
int
len
)
:
Log_event
(
buf
,
0
),
file_id
(
0
)
{
if
((
uint
)
len
<
EXEC_LOAD_EVENT_OVERHEAD
)
return
;
file_id
=
uint4korr
(
buf
+
LOG_EVENT_HEADER_LEN
+
EL_FILE_ID_OFFSET
);
}
/*****************************************************************************
Execute_load_log_event::write_data()
****************************************************************************/
int
Execute_load_log_event
::
write_data
(
IO_CACHE
*
file
)
{
byte
buf
[
EXEC_LOAD_HEADER_LEN
];
int4store
(
buf
+
EL_FILE_ID_OFFSET
,
file_id
);
return
my_b_safe_write
(
file
,
buf
,
EXEC_LOAD_HEADER_LEN
);
}
/*****************************************************************************
Execute_load_log_event::print()
****************************************************************************/
#ifdef MYSQL_CLIENT
void
Execute_load_log_event
::
print
(
FILE
*
file
,
bool
short_form
,
char
*
last_db
)
{
if
(
short_form
)
return
;
print_header
(
file
);
fputc
(
'\n'
,
file
);
fprintf
(
file
,
"#Exec_load: file_id=%d
\n
"
,
file_id
);
}
#endif // MYSQL_CLIENT
/*****************************************************************************
Execute_load_log_event::pack_info()
****************************************************************************/
#ifndef MYSQL_CLIENT
void
Execute_load_log_event
::
pack_info
(
String
*
packet
)
{
char
buf
[
64
];
uint
length
;
length
=
(
uint
)
my_sprintf
(
buf
,
(
buf
,
";file_id=%u"
,
(
uint
)
file_id
));
net_store_data
(
packet
,
buf
,
(
int32
)
length
);
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
Execute_load_log_event::exec_event()
****************************************************************************/
#ifndef MYSQL_CLIENT
int
Execute_load_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
int
Execute_load_log_event
::
exec_event
(
struct
st_relay_log_info
*
rli
)
{
{
char
fname
[
FN_REFLEN
+
10
];
char
fname
[
FN_REFLEN
+
10
];
...
@@ -2151,5 +2673,100 @@ err:
...
@@ -2151,5 +2673,100 @@ err:
}
}
return
error
?
error
:
Log_event
::
exec_event
(
rli
);
return
error
?
error
:
Log_event
::
exec_event
(
rli
);
}
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
*****************************************************************************
sql_ex_info methods
*****************************************************************************
****************************************************************************/
/*****************************************************************************
sql_ex_info::write_data()
****************************************************************************/
int
sql_ex_info
::
write_data
(
IO_CACHE
*
file
)
{
if
(
new_format
())
{
return
(
write_str
(
file
,
field_term
,
field_term_len
)
||
write_str
(
file
,
enclosed
,
enclosed_len
)
||
write_str
(
file
,
line_term
,
line_term_len
)
||
write_str
(
file
,
line_start
,
line_start_len
)
||
write_str
(
file
,
escaped
,
escaped_len
)
||
my_b_safe_write
(
file
,(
byte
*
)
&
opt_flags
,
1
));
}
else
{
old_sql_ex
old_ex
;
old_ex
.
field_term
=
*
field_term
;
old_ex
.
enclosed
=
*
enclosed
;
old_ex
.
line_term
=
*
line_term
;
old_ex
.
line_start
=
*
line_start
;
old_ex
.
escaped
=
*
escaped
;
old_ex
.
opt_flags
=
opt_flags
;
old_ex
.
empty_flags
=
empty_flags
;
return
my_b_safe_write
(
file
,
(
byte
*
)
&
old_ex
,
sizeof
(
old_ex
));
}
}
/*****************************************************************************
sql_ex_info::init()
****************************************************************************/
char
*
sql_ex_info
::
init
(
char
*
buf
,
char
*
buf_end
,
bool
use_new_format
)
{
cached_new_format
=
use_new_format
;
if
(
use_new_format
)
{
empty_flags
=
0
;
/*
The code below assumes that buf will not disappear from
under our feet during the lifetime of the event. This assumption
holds true in the slave thread if the log is in new format, but is not
the case when we have old format because we will be reusing net buffer
to read the actual file before we write out the Create_file event.
*/
if
(
read_str
(
buf
,
buf_end
,
field_term
,
field_term_len
)
||
read_str
(
buf
,
buf_end
,
enclosed
,
enclosed_len
)
||
read_str
(
buf
,
buf_end
,
line_term
,
line_term_len
)
||
read_str
(
buf
,
buf_end
,
line_start
,
line_start_len
)
||
read_str
(
buf
,
buf_end
,
escaped
,
escaped_len
))
return
0
;
opt_flags
=
*
buf
++
;
}
else
{
field_term_len
=
enclosed_len
=
line_term_len
=
line_start_len
=
escaped_len
=
1
;
field_term
=
buf
++
;
// Use first byte in string
enclosed
=
buf
++
;
line_term
=
buf
++
;
line_start
=
buf
++
;
escaped
=
buf
++
;
opt_flags
=
*
buf
++
;
empty_flags
=
*
buf
++
;
if
(
empty_flags
&
FIELD_TERM_EMPTY
)
field_term_len
=
0
;
if
(
empty_flags
&
ENCLOSED_EMPTY
)
enclosed_len
=
0
;
if
(
empty_flags
&
LINE_TERM_EMPTY
)
line_term_len
=
0
;
if
(
empty_flags
&
LINE_START_EMPTY
)
line_start_len
=
0
;
if
(
empty_flags
&
ESCAPED_EMPTY
)
escaped_len
=
0
;
}
return
buf
;
}
#endif
/* !MYSQL_CLIENT */
sql/log_event.h
View file @
6d88799e
...
@@ -54,6 +54,11 @@
...
@@ -54,6 +54,11 @@
#define LINE_START_EMPTY 0x8
#define LINE_START_EMPTY 0x8
#define ESCAPED_EMPTY 0x10
#define ESCAPED_EMPTY 0x10
/*****************************************************************************
old_sql_ex struct
****************************************************************************/
struct
old_sql_ex
struct
old_sql_ex
{
{
char
field_term
;
char
field_term
;
...
@@ -67,6 +72,11 @@ struct old_sql_ex
...
@@ -67,6 +72,11 @@ struct old_sql_ex
#define NUM_LOAD_DELIM_STRS 5
#define NUM_LOAD_DELIM_STRS 5
/*****************************************************************************
sql_ex_info struct
****************************************************************************/
struct
sql_ex_info
struct
sql_ex_info
{
{
char
*
field_term
;
char
*
field_term
;
...
@@ -99,13 +109,19 @@ struct sql_ex_info
...
@@ -99,13 +109,19 @@ struct sql_ex_info
}
}
};
};
/*
/*****************************************************************************
Binary log consists of events. Each event has a fixed length header,
followed by possibly variable ( depending on the type of event) length
MySQL Binary Log
data body. The data body consists of an optional fixed length segment
(post-header), and an optional variable length segment. See #defines and
This log consists of events. Each event has a fixed-length header,
comments below for the format specifics
possibly followed by a variable length data body.
*/
The data body consists of an optional fixed length segment (post-header)
and an optional variable length segment.
See the #defines below for the format specifics.
****************************************************************************/
/* event-specific post-header sizes */
/* event-specific post-header sizes */
#define LOG_EVENT_HEADER_LEN 19
#define LOG_EVENT_HEADER_LEN 19
...
@@ -221,6 +237,13 @@ class THD;
...
@@ -221,6 +237,13 @@ class THD;
struct
st_relay_log_info
;
struct
st_relay_log_info
;
/*****************************************************************************
Log_event class
This is the abstract base class for binary log events.
****************************************************************************/
class
Log_event
class
Log_event
{
{
public:
public:
...
@@ -303,6 +326,13 @@ public:
...
@@ -303,6 +326,13 @@ public:
};
};
/*****************************************************************************
Query Log Event class
Logs SQL queries
****************************************************************************/
class
Query_log_event
:
public
Log_event
class
Query_log_event
:
public
Log_event
{
{
protected:
protected:
...
@@ -355,6 +385,11 @@ public:
...
@@ -355,6 +385,11 @@ public:
};
};
/*****************************************************************************
Slave Log Event class
****************************************************************************/
class
Slave_log_event
:
public
Log_event
class
Slave_log_event
:
public
Log_event
{
{
protected:
protected:
...
@@ -384,6 +419,12 @@ public:
...
@@ -384,6 +419,12 @@ public:
int
write_data
(
IO_CACHE
*
file
);
int
write_data
(
IO_CACHE
*
file
);
};
};
/*****************************************************************************
Load Log Event class
****************************************************************************/
class
Load_log_event
:
public
Log_event
class
Load_log_event
:
public
Log_event
{
{
protected:
protected:
...
@@ -446,6 +487,11 @@ public:
...
@@ -446,6 +487,11 @@ public:
extern
char
server_version
[
SERVER_VERSION_LENGTH
];
extern
char
server_version
[
SERVER_VERSION_LENGTH
];
/*****************************************************************************
Start Log Event class
****************************************************************************/
class
Start_log_event
:
public
Log_event
class
Start_log_event
:
public
Log_event
{
{
public:
public:
...
@@ -477,6 +523,13 @@ public:
...
@@ -477,6 +523,13 @@ public:
};
};
/*****************************************************************************
Intvar Log Event class
Logs special variables such as auto_increment values
****************************************************************************/
class
Intvar_log_event
:
public
Log_event
class
Intvar_log_event
:
public
Log_event
{
{
public:
public:
...
@@ -503,9 +556,11 @@ public:
...
@@ -503,9 +556,11 @@ public:
};
};
/*****************************************************************************
/*****************************************************************************
*
* Rand log event class
Rand Log Event class
*
Logs random seed used by the next RAND()
****************************************************************************/
****************************************************************************/
class
Rand_log_event
:
public
Log_event
class
Rand_log_event
:
public
Log_event
{
{
...
@@ -531,6 +586,12 @@ class Rand_log_event: public Log_event
...
@@ -531,6 +586,12 @@ class Rand_log_event: public Log_event
bool
is_valid
()
{
return
1
;
}
bool
is_valid
()
{
return
1
;
}
};
};
/*****************************************************************************
Stop Log Event class
****************************************************************************/
class
Stop_log_event
:
public
Log_event
class
Stop_log_event
:
public
Log_event
{
{
public:
public:
...
@@ -551,6 +612,13 @@ public:
...
@@ -551,6 +612,13 @@ public:
};
};
/*****************************************************************************
Rotate Log Event class
This will be depricated when we move to using sequence ids.
****************************************************************************/
class
Rotate_log_event
:
public
Log_event
class
Rotate_log_event
:
public
Log_event
{
{
public:
public:
...
@@ -585,6 +653,11 @@ public:
...
@@ -585,6 +653,11 @@ public:
/* the classes below are for the new LOAD DATA INFILE logging */
/* the classes below are for the new LOAD DATA INFILE logging */
/*****************************************************************************
Create File Log Event class
****************************************************************************/
class
Create_file_log_event
:
public
Load_log_event
class
Create_file_log_event
:
public
Load_log_event
{
{
protected:
protected:
...
@@ -641,6 +714,11 @@ public:
...
@@ -641,6 +714,11 @@ public:
};
};
/*****************************************************************************
Append Block Log Event class
****************************************************************************/
class
Append_block_log_event
:
public
Log_event
class
Append_block_log_event
:
public
Log_event
{
{
public:
public:
...
@@ -665,7 +743,11 @@ public:
...
@@ -665,7 +743,11 @@ public:
int
write_data
(
IO_CACHE
*
file
);
int
write_data
(
IO_CACHE
*
file
);
};
};
/*****************************************************************************
Delete File Log Event class
****************************************************************************/
class
Delete_file_log_event
:
public
Log_event
class
Delete_file_log_event
:
public
Log_event
{
{
public:
public:
...
@@ -687,6 +769,11 @@ public:
...
@@ -687,6 +769,11 @@ public:
int
write_data
(
IO_CACHE
*
file
);
int
write_data
(
IO_CACHE
*
file
);
};
};
/*****************************************************************************
Execute Load Log Event class
****************************************************************************/
class
Execute_load_log_event
:
public
Log_event
class
Execute_load_log_event
:
public
Log_event
{
{
public:
public:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment