Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
aba008d6
Commit
aba008d6
authored
Oct 23, 2001
by
unknown
Browse files
Options
Browse Files
Download
Plain Diff
Merge work:/home/bk/mysql-4.0
into mysql.sashanet.com:/home/sasha/src/bk/mysql-4.0
parents
4bbbfc01
8fc78e08
Changes
14
Show whitespace changes
Inline
Side-by-side
Showing
14 changed files
with
522 additions
and
721 deletions
+522
-721
include/my_sys.h
include/my_sys.h
+15
-1
libmysql/Makefile.am
libmysql/Makefile.am
+1
-1
libmysql/libmysql.c
libmysql/libmysql.c
+13
-5
mysql-test/r/rpl000001.result
mysql-test/r/rpl000001.result
+24
-1
mysql-test/r/rpl000002.result
mysql-test/r/rpl000002.result
+2
-2
mysql-test/t/rpl000001.test
mysql-test/t/rpl000001.test
+2
-0
mysys/mf_iocache.c
mysys/mf_iocache.c
+210
-49
sql/mf_iocache.cc
sql/mf_iocache.cc
+1
-623
sql/repl_failsafe.cc
sql/repl_failsafe.cc
+177
-0
sql/repl_failsafe.h
sql/repl_failsafe.h
+6
-0
sql/slave.cc
sql/slave.cc
+38
-32
sql/sql_load.cc
sql/sql_load.cc
+18
-4
sql/sql_repl.cc
sql/sql_repl.cc
+14
-3
sql/sql_repl.h
sql/sql_repl.h
+1
-0
No files found.
include/my_sys.h
View file @
aba008d6
...
@@ -243,7 +243,10 @@ typedef struct st_typelib { /* Different types saved here */
...
@@ -243,7 +243,10 @@ typedef struct st_typelib { /* Different types saved here */
const
char
**
type_names
;
const
char
**
type_names
;
}
TYPELIB
;
}
TYPELIB
;
enum
cache_type
{
READ_CACHE
,
WRITE_CACHE
,
READ_FIFO
,
READ_NET
,
WRITE_NET
};
enum
cache_type
{
READ_CACHE
,
WRITE_CACHE
,
SEQ_READ_APPEND
/* sequential read or append */
,
READ_FIFO
,
READ_NET
,
WRITE_NET
};
enum
flush_type
{
FLUSH_KEEP
,
FLUSH_RELEASE
,
FLUSH_IGNORE_CHANGED
,
enum
flush_type
{
FLUSH_KEEP
,
FLUSH_RELEASE
,
FLUSH_IGNORE_CHANGED
,
FLUSH_FORCE_WRITE
};
FLUSH_FORCE_WRITE
};
...
@@ -294,6 +297,16 @@ typedef struct st_io_cache /* Used when cacheing files */
...
@@ -294,6 +297,16 @@ typedef struct st_io_cache /* Used when cacheing files */
{
{
my_off_t
pos_in_file
,
end_of_file
;
my_off_t
pos_in_file
,
end_of_file
;
byte
*
rc_pos
,
*
rc_end
,
*
buffer
,
*
rc_request_pos
;
byte
*
rc_pos
,
*
rc_end
,
*
buffer
,
*
rc_request_pos
;
my_bool
alloced_buffer
;
/* currented READ_NET is the only one
that will use a buffer allocated somewhere
else
*/
byte
*
append_buffer
,
*
append_pos
,
*
append_end
;
/* for append buffer used in READ_APPEND cache */
#ifdef THREAD
pthread_mutex_t
append_buffer_lock
;
/* need mutex copying from append buffer to read buffer */
#endif
int
(
*
read_function
)(
struct
st_io_cache
*
,
byte
*
,
uint
);
int
(
*
read_function
)(
struct
st_io_cache
*
,
byte
*
,
uint
);
/* callbacks when the actual read I/O happens */
/* callbacks when the actual read I/O happens */
IO_CACHE_CALLBACK
pre_read
;
IO_CACHE_CALLBACK
pre_read
;
...
@@ -546,6 +559,7 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
...
@@ -546,6 +559,7 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
my_off_t
seek_offset
,
pbool
use_async_io
,
my_off_t
seek_offset
,
pbool
use_async_io
,
pbool
clear_cache
);
pbool
clear_cache
);
extern
int
_my_b_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
int
_my_b_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
int
_my_b_seq_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
int
_my_b_net_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
int
_my_b_net_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
int
_my_b_get
(
IO_CACHE
*
info
);
extern
int
_my_b_get
(
IO_CACHE
*
info
);
extern
int
_my_b_async_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
extern
int
_my_b_async_read
(
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
);
...
...
libmysql/Makefile.am
View file @
aba008d6
...
@@ -18,7 +18,7 @@
...
@@ -18,7 +18,7 @@
# This file is public domain and comes with NO WARRANTY of any kind
# This file is public domain and comes with NO WARRANTY of any kind
target
=
libmysqlclient.la
target
=
libmysqlclient.la
target_defs
=
-DUNDEF_THREADS_HACK
-DDONT_USE_RAID
target_defs
=
-DUNDEF_THREADS_HACK
-DDONT_USE_RAID
-DMYSQL_CLIENT
LIBS
=
@CLIENT_LIBS@
LIBS
=
@CLIENT_LIBS@
INCLUDES
=
-I
$(srcdir)
/../include
-I
../include
\
INCLUDES
=
-I
$(srcdir)
/../include
-I
../include
\
-I
$(srcdir)
/..
-I
$(top_srcdir)
-I
..
$(openssl_includes)
-I
$(srcdir)
/..
-I
$(top_srcdir)
-I
..
$(openssl_includes)
...
...
libmysql/libmysql.c
View file @
aba008d6
...
@@ -1147,6 +1147,8 @@ static inline int get_slaves_from_master(MYSQL* mysql)
...
@@ -1147,6 +1147,8 @@ static inline int get_slaves_from_master(MYSQL* mysql)
MYSQL_ROW
row
;
MYSQL_ROW
row
;
int
error
=
1
;
int
error
=
1
;
int
has_auth_info
;
int
has_auth_info
;
int
port_ind
;
if
(
!
mysql
->
net
.
vio
&&
!
mysql_real_connect
(
mysql
,
0
,
0
,
0
,
0
,
0
,
0
,
0
))
if
(
!
mysql
->
net
.
vio
&&
!
mysql_real_connect
(
mysql
,
0
,
0
,
0
,
0
,
0
,
0
,
0
))
{
{
expand_error
(
mysql
,
CR_PROBE_MASTER_CONNECT
);
expand_error
(
mysql
,
CR_PROBE_MASTER_CONNECT
);
...
@@ -1162,8 +1164,14 @@ static inline int get_slaves_from_master(MYSQL* mysql)
...
@@ -1162,8 +1164,14 @@ static inline int get_slaves_from_master(MYSQL* mysql)
switch
(
mysql_num_fields
(
res
))
switch
(
mysql_num_fields
(
res
))
{
{
case
3
:
has_auth_info
=
0
;
break
;
case
5
:
case
5
:
has_auth_info
=
1
;
break
;
has_auth_info
=
0
;
port_ind
=
2
;
break
;
case
7
:
has_auth_info
=
1
;
port_ind
=
4
;
break
;
default:
default:
goto
err
;
goto
err
;
}
}
...
@@ -1175,8 +1183,8 @@ static inline int get_slaves_from_master(MYSQL* mysql)
...
@@ -1175,8 +1183,8 @@ static inline int get_slaves_from_master(MYSQL* mysql)
if
(
has_auth_info
)
if
(
has_auth_info
)
{
{
tmp_user
=
row
[
3
];
tmp_user
=
row
[
2
];
tmp_pass
=
row
[
4
];
tmp_pass
=
row
[
3
];
}
}
else
else
{
{
...
@@ -1184,7 +1192,7 @@ static inline int get_slaves_from_master(MYSQL* mysql)
...
@@ -1184,7 +1192,7 @@ static inline int get_slaves_from_master(MYSQL* mysql)
tmp_pass
=
mysql
->
passwd
;
tmp_pass
=
mysql
->
passwd
;
}
}
if
(
!
(
slave
=
spawn_init
(
mysql
,
row
[
1
],
atoi
(
row
[
2
]),
if
(
!
(
slave
=
spawn_init
(
mysql
,
row
[
1
],
atoi
(
row
[
port_ind
]),
tmp_user
,
tmp_pass
)))
tmp_user
,
tmp_pass
)))
goto
err
;
goto
err
;
...
...
mysql-test/r/rpl000001.result
View file @
aba008d6
...
@@ -7,6 +7,29 @@ use test;
...
@@ -7,6 +7,29 @@ use test;
drop table if exists t1,t3;
drop table if exists t1,t3;
create table t1 (word char(20) not null);
create table t1 (word char(20) not null);
load data infile '../../std_data/words.dat' into table t1;
load data infile '../../std_data/words.dat' into table t1;
load data local infile '/home/sasha/bk/mysql-4.0/mysql-test/std_data/words.dat' into table t1;
select * from t1;
word
Aarhus
Aaron
Ababa
aback
abaft
abandon
abandoned
abandoning
abandonment
abandons
Aarhus
Aaron
Ababa
aback
abaft
abandon
abandoned
abandoning
abandonment
abandons
set password = password('foo');
set password = password('foo');
set password = password('');
set password = password('');
create table t3(n int);
create table t3(n int);
...
@@ -18,7 +41,7 @@ n
...
@@ -18,7 +41,7 @@ n
2
2
select sum(length(word)) from t1;
select sum(length(word)) from t1;
sum(length(word))
sum(length(word))
7
1
14
1
drop table t1,t3;
drop table t1,t3;
reset master;
reset master;
reset slave;
reset slave;
...
...
mysql-test/r/rpl000002.result
View file @
aba008d6
...
@@ -15,8 +15,8 @@ n
...
@@ -15,8 +15,8 @@ n
2001
2001
2002
2002
show slave hosts;
show slave hosts;
Server_id Host Port
Server_id Host Port
Rpl_recovery_rank Master_id
2 127.0.0.1
$SLAVE_MYPORT
2 127.0.0.1
9307 2 1
drop table t1;
drop table t1;
slave stop;
slave stop;
drop table if exists t2;
drop table if exists t2;
...
...
mysql-test/t/rpl000001.test
View file @
aba008d6
...
@@ -4,6 +4,8 @@ use test;
...
@@ -4,6 +4,8 @@ use test;
drop
table
if
exists
t1
,
t3
;
drop
table
if
exists
t1
,
t3
;
create
table
t1
(
word
char
(
20
)
not
null
);
create
table
t1
(
word
char
(
20
)
not
null
);
load
data
infile
'../../std_data/words.dat'
into
table
t1
;
load
data
infile
'../../std_data/words.dat'
into
table
t1
;
eval
load
data
local
infile
'$MYSQL_TEST_DIR/std_data/words.dat'
into
table
t1
;
select
*
from
t1
;
set
password
=
password
(
'foo'
);
set
password
=
password
(
'foo'
);
set
password
=
password
(
''
);
set
password
=
password
(
''
);
create
table
t3
(
n
int
);
create
table
t3
(
n
int
);
...
...
mysys/mf_iocache.c
View file @
aba008d6
...
@@ -36,10 +36,34 @@
...
@@ -36,10 +36,34 @@
#include <m_string.h>
#include <m_string.h>
#ifdef HAVE_AIOWAIT
#ifdef HAVE_AIOWAIT
#include "mysys_err.h"
#include "mysys_err.h"
#include <errno.h>
static
void
my_aiowait
(
my_aio_result
*
result
);
static
void
my_aiowait
(
my_aio_result
*
result
);
#endif
#endif
#include <assert.h>
#include <errno.h>
static
void
init_read_function
(
IO_CACHE
*
info
,
enum
cache_type
type
);
static
void
init_read_function
(
IO_CACHE
*
info
,
enum
cache_type
type
)
{
switch
(
type
)
{
#ifndef MYSQL_CLIENT
case
READ_NET
:
/* must be initialized by the caller. The problem is that
_my_b_net_read has to be defined in sql directory because of
the dependency on THD, and therefore cannot be visible to
programs that link against mysys but know nothing about THD, such
as myisamchk
*/
break
;
#endif
case
SEQ_READ_APPEND
:
info
->
read_function
=
_my_b_seq_read
;
break
;
default:
info
->
read_function
=
_my_b_read
;
}
}
/*
/*
** if cachesize == 0 then use default cachesize (from s-file)
** if cachesize == 0 then use default cachesize (from s-file)
...
@@ -55,14 +79,15 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
...
@@ -55,14 +79,15 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
DBUG_ENTER
(
"init_io_cache"
);
DBUG_ENTER
(
"init_io_cache"
);
DBUG_PRINT
(
"enter"
,(
"type: %d pos: %ld"
,(
int
)
type
,
(
ulong
)
seek_offset
));
DBUG_PRINT
(
"enter"
,(
"type: %d pos: %ld"
,(
int
)
type
,
(
ulong
)
seek_offset
));
info
->
file
=
file
;
/* There is no file in net_reading */
info
->
file
=
file
;
info
->
pre_close
=
info
->
pre_read
=
info
->
post_read
=
0
;
info
->
pre_close
=
info
->
pre_read
=
info
->
post_read
=
0
;
info
->
arg
=
0
;
info
->
arg
=
0
;
if
(
!
cachesize
)
if
(
!
cachesize
)
if
(
!
(
cachesize
=
my_default_record_cache_size
))
if
(
!
(
cachesize
=
my_default_record_cache_size
))
DBUG_RETURN
(
1
);
/* No cache requested */
DBUG_RETURN
(
1
);
/* No cache requested */
min_cache
=
use_async_io
?
IO_SIZE
*
4
:
IO_SIZE
*
2
;
min_cache
=
use_async_io
?
IO_SIZE
*
4
:
IO_SIZE
*
2
;
if
(
type
==
READ_CACHE
)
if
(
type
==
READ_CACHE
||
type
==
SEQ_READ_APPEND
)
{
/* Assume file isn't growing */
{
/* Assume file isn't growing */
if
(
cache_myflags
&
MY_DONT_CHECK_FILESIZE
)
if
(
cache_myflags
&
MY_DONT_CHECK_FILESIZE
)
{
{
...
@@ -77,36 +102,65 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
...
@@ -77,36 +102,65 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
if
(
end_of_file
<
seek_offset
)
if
(
end_of_file
<
seek_offset
)
end_of_file
=
seek_offset
;
end_of_file
=
seek_offset
;
VOID
(
my_seek
(
file
,
file_pos
,
MY_SEEK_SET
,
MYF
(
0
)));
VOID
(
my_seek
(
file
,
file_pos
,
MY_SEEK_SET
,
MYF
(
0
)));
if
((
my_off_t
)
cachesize
>
end_of_file
-
seek_offset
+
IO_SIZE
*
2
-
1
)
/* Trim cache size if the file is very small.
However, we should not do this with SEQ_READ_APPEND cache
*/
if
(
type
!=
SEQ_READ_APPEND
&&
(
my_off_t
)
cachesize
>
end_of_file
-
seek_offset
+
IO_SIZE
*
2
-
1
)
{
{
cachesize
=
(
uint
)
(
end_of_file
-
seek_offset
)
+
IO_SIZE
*
2
-
1
;
cachesize
=
(
uint
)
(
end_of_file
-
seek_offset
)
+
IO_SIZE
*
2
-
1
;
use_async_io
=
0
;
/* No ne
a
d to use async */
use_async_io
=
0
;
/* No ne
e
d to use async */
}
}
}
}
}
}
info
->
alloced_buffer
=
0
;
if
((
int
)
type
<
(
int
)
READ_NET
)
{
uint
buffer_block
;
for
(;;)
for
(;;)
{
{
cachesize
=
(
uint
)
((
ulong
)
(
cachesize
+
min_cache
-
1
)
&
buffer_block
=
cachesize
=
(
uint
)
((
ulong
)
(
cachesize
+
min_cache
-
1
)
&
(
ulong
)
~
(
min_cache
-
1
));
(
ulong
)
~
(
min_cache
-
1
));
if
(
type
==
SEQ_READ_APPEND
)
buffer_block
*=
2
;
if
(
cachesize
<
min_cache
)
if
(
cachesize
<
min_cache
)
cachesize
=
min_cache
;
cachesize
=
min_cache
;
if
((
info
->
buffer
=
if
((
info
->
buffer
=
(
byte
*
)
my_malloc
(
cachesize
,
(
byte
*
)
my_malloc
(
buffer_block
,
MYF
((
cache_myflags
&
~
MY_WME
)
|
MYF
((
cache_myflags
&
~
MY_WME
)
|
(
cachesize
==
min_cache
?
MY_WME
:
0
))))
!=
0
)
(
cachesize
==
min_cache
?
MY_WME
:
0
))))
!=
0
)
{
if
(
type
==
SEQ_READ_APPEND
)
info
->
append_buffer
=
info
->
buffer
+
cachesize
;
info
->
alloced_buffer
=
1
;
break
;
/* Enough memory found */
break
;
/* Enough memory found */
}
if
(
cachesize
==
min_cache
)
if
(
cachesize
==
min_cache
)
DBUG_RETURN
(
2
);
/* Can't alloc cache */
DBUG_RETURN
(
2
);
/* Can't alloc cache */
cachesize
=
(
uint
)
((
long
)
cachesize
*
3
/
4
);
/* Try with less memory */
cachesize
=
(
uint
)
((
long
)
cachesize
*
3
/
4
);
/* Try with less memory */
}
}
info
->
pos_in_file
=
seek_offset
;
}
else
info
->
buffer
=
0
;
DBUG_PRINT
(
"info"
,(
"init_io_cache: cachesize = %u"
,
cachesize
));
info
->
pos_in_file
=
seek_offset
;
info
->
read_length
=
info
->
buffer_length
=
cachesize
;
info
->
read_length
=
info
->
buffer_length
=
cachesize
;
info
->
seek_not_done
=
test
(
file
>=
0
);
/* Seek not done */
info
->
seek_not_done
=
test
(
file
>=
0
&&
type
!=
READ_FIFO
&&
type
!=
READ_NET
);
info
->
myflags
=
cache_myflags
&
~
(
MY_NABP
|
MY_FNABP
);
info
->
myflags
=
cache_myflags
&
~
(
MY_NABP
|
MY_FNABP
);
info
->
rc_request_pos
=
info
->
rc_pos
=
info
->
buffer
;
info
->
rc_request_pos
=
info
->
rc_pos
=
info
->
buffer
;
if
(
type
==
SEQ_READ_APPEND
)
{
info
->
append_pos
=
info
->
append_buffer
;
info
->
append_end
=
info
->
append_buffer
+
info
->
buffer_length
;
#ifdef THREAD
pthread_mutex_init
(
&
info
->
append_buffer_lock
,
MY_MUTEX_INIT_FAST
);
#endif
}
if
(
type
==
READ_CACHE
)
if
(
type
==
READ_CACHE
||
type
==
SEQ_READ_APPEND
||
type
==
READ_NET
||
type
==
READ_FIFO
)
{
{
info
->
rc_end
=
info
->
buffer
;
/* Nothing in cache */
info
->
rc_end
=
info
->
buffer
;
/* Nothing in cache */
}
}
...
@@ -114,10 +168,12 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
...
@@ -114,10 +168,12 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
{
{
info
->
rc_end
=
info
->
buffer
+
info
->
buffer_length
-
(
seek_offset
&
(
IO_SIZE
-
1
));
info
->
rc_end
=
info
->
buffer
+
info
->
buffer_length
-
(
seek_offset
&
(
IO_SIZE
-
1
));
}
}
info
->
end_of_file
=
MY_FILEPOS_ERROR
;
/* May be changed by user */
/* end_of_file may be changed by user later */
info
->
end_of_file
=
((
type
==
READ_NET
||
type
==
READ_FIFO
)
?
0
:
~
(
my_off_t
)
0
);
info
->
type
=
type
;
info
->
type
=
type
;
info
->
error
=
0
;
info
->
error
=
0
;
in
fo
->
read_function
=
_my_b_read
;
in
it_read_function
(
info
,
type
)
;
#ifdef HAVE_AIOWAIT
#ifdef HAVE_AIOWAIT
if
(
use_async_io
&&
!
my_disable_async_io
)
if
(
use_async_io
&&
!
my_disable_async_io
)
{
{
...
@@ -169,6 +225,8 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
...
@@ -169,6 +225,8 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
DBUG_ENTER
(
"reinit_io_cache"
);
DBUG_ENTER
(
"reinit_io_cache"
);
info
->
seek_not_done
=
test
(
info
->
file
>=
0
);
/* Seek not done */
info
->
seek_not_done
=
test
(
info
->
file
>=
0
);
/* Seek not done */
/* If the whole file is in memory, avoid flushing to disk */
if
(
!
clear_cache
&&
if
(
!
clear_cache
&&
seek_offset
>=
info
->
pos_in_file
&&
seek_offset
>=
info
->
pos_in_file
&&
seek_offset
<=
info
->
pos_in_file
+
seek_offset
<=
info
->
pos_in_file
+
...
@@ -179,8 +237,12 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
...
@@ -179,8 +237,12 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
info
->
rc_end
=
info
->
rc_pos
;
info
->
rc_end
=
info
->
rc_pos
;
info
->
end_of_file
=
my_b_tell
(
info
);
info
->
end_of_file
=
my_b_tell
(
info
);
}
}
else
if
(
info
->
type
==
READ_CACHE
&&
type
==
WRITE_CACHE
)
else
if
(
type
==
WRITE_CACHE
)
{
if
(
info
->
type
==
READ_CACHE
)
info
->
rc_end
=
info
->
buffer
+
info
->
buffer_length
;
info
->
rc_end
=
info
->
buffer
+
info
->
buffer_length
;
info
->
end_of_file
=
~
(
my_off_t
)
0
;
}
info
->
rc_pos
=
info
->
rc_request_pos
+
(
seek_offset
-
info
->
pos_in_file
);
info
->
rc_pos
=
info
->
rc_request_pos
+
(
seek_offset
-
info
->
pos_in_file
);
#ifdef HAVE_AIOWAIT
#ifdef HAVE_AIOWAIT
my_aiowait
(
&
info
->
aio_result
);
/* Wait for outstanding req */
my_aiowait
(
&
info
->
aio_result
);
/* Wait for outstanding req */
...
@@ -188,13 +250,22 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
...
@@ -188,13 +250,22 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
}
}
else
else
{
{
/*
If we change from WRITE_CACHE to READ_CACHE, assume that everything
after the current positions should be ignored
*/
if
(
info
->
type
==
WRITE_CACHE
&&
type
==
READ_CACHE
)
if
(
info
->
type
==
WRITE_CACHE
&&
type
==
READ_CACHE
)
info
->
end_of_file
=
my_b_tell
(
info
);
info
->
end_of_file
=
my_b_tell
(
info
);
if
(
flush_io_cache
(
info
))
/* No need to flush cache if we want to reuse it */
if
((
type
!=
WRITE_CACHE
||
!
clear_cache
)
&&
flush_io_cache
(
info
))
DBUG_RETURN
(
1
);
DBUG_RETURN
(
1
);
if
(
info
->
pos_in_file
!=
seek_offset
)
{
info
->
pos_in_file
=
seek_offset
;
info
->
pos_in_file
=
seek_offset
;
info
->
seek_not_done
=
1
;
}
info
->
rc_request_pos
=
info
->
rc_pos
=
info
->
buffer
;
info
->
rc_request_pos
=
info
->
rc_pos
=
info
->
buffer
;
if
(
type
==
READ_CACHE
)
if
(
type
==
READ_CACHE
||
type
==
READ_NET
||
type
==
READ_FIFO
)
{
{
info
->
rc_end
=
info
->
buffer
;
/* Nothing in cache */
info
->
rc_end
=
info
->
buffer
;
/* Nothing in cache */
}
}
...
@@ -202,13 +273,16 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
...
@@ -202,13 +273,16 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
{
{
info
->
rc_end
=
info
->
buffer
+
info
->
buffer_length
-
info
->
rc_end
=
info
->
buffer
+
info
->
buffer_length
-
(
seek_offset
&
(
IO_SIZE
-
1
));
(
seek_offset
&
(
IO_SIZE
-
1
));
info
->
end_of_file
=
MY_FILEPOS_ERROR
;
/* May be changed by user */
info
->
end_of_file
=
((
type
==
READ_NET
||
type
==
READ_FIFO
)
?
0
:
~
(
my_off_t
)
0
);
}
}
}
}
info
->
type
=
type
;
info
->
type
=
type
;
info
->
error
=
0
;
info
->
error
=
0
;
in
fo
->
read_function
=
_my_b_read
;
in
it_read_function
(
info
,
type
)
;
#ifdef HAVE_AIOWAIT
#ifdef HAVE_AIOWAIT
if
(
type
!=
READ_NET
)
{
if
(
use_async_io
&&
!
my_disable_async_io
&&
if
(
use_async_io
&&
!
my_disable_async_io
&&
((
ulong
)
info
->
buffer_length
<
((
ulong
)
info
->
buffer_length
<
(
ulong
)
(
info
->
end_of_file
-
seek_offset
)))
(
ulong
)
(
info
->
end_of_file
-
seek_offset
)))
...
@@ -216,6 +290,7 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
...
@@ -216,6 +290,7 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
info
->
read_length
=
info
->
buffer_length
/
2
;
info
->
read_length
=
info
->
buffer_length
/
2
;
info
->
read_function
=
_my_b_async_read
;
info
->
read_function
=
_my_b_async_read
;
}
}
}
info
->
inited
=
0
;
info
->
inited
=
0
;
#endif
#endif
DBUG_RETURN
(
0
);
DBUG_RETURN
(
0
);
...
@@ -223,18 +298,27 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
...
@@ -223,18 +298,27 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
/* Read buffered. Returns 1 if can't read requested characters */
/*
/* Returns 0 if record read */
Read buffered. Returns 1 if can't read requested characters
This function is only called from the my_b_read() macro
when there isn't enough characters in the buffer to
satisfy the request.
Returns 0 we succeeded in reading all data
*/
int
_my_b_read
(
register
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
)
int
_my_b_read
(
register
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
)
{
{
uint
length
,
diff_length
,
left_length
;
uint
length
,
diff_length
,
left_length
;
my_off_t
max_length
,
pos_in_file
;
my_off_t
max_length
,
pos_in_file
;
memcpy
(
Buffer
,
info
->
rc_pos
,
if
((
left_length
=
(
uint
)
(
info
->
rc_end
-
info
->
rc_pos
)))
(
size_t
)
(
left_length
=
(
uint
)
(
info
->
rc_end
-
info
->
rc_pos
)));
{
dbug_assert
(
Count
>=
left_length
);
/* User is not using my_b_read() */
memcpy
(
Buffer
,
info
->
rc_pos
,
(
size_t
)
(
left_length
));
Buffer
+=
left_length
;
Buffer
+=
left_length
;
Count
-=
left_length
;
Count
-=
left_length
;
}
/* pos_in_file always point on where info->buffer was read */
pos_in_file
=
info
->
pos_in_file
+
(
uint
)
(
info
->
rc_end
-
info
->
buffer
);
pos_in_file
=
info
->
pos_in_file
+
(
uint
)
(
info
->
rc_end
-
info
->
buffer
);
if
(
info
->
seek_not_done
)
if
(
info
->
seek_not_done
)
{
/* File touched, do seek */
{
/* File touched, do seek */
...
@@ -264,10 +348,10 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
...
@@ -264,10 +348,10 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
left_length
+=
length
;
left_length
+=
length
;
diff_length
=
0
;
diff_length
=
0
;
}
}
max_length
=
info
->
end_of_file
-
pos_in_file
;
if
(
max_length
>
info
->
read_length
-
diff_length
)
max_length
=
info
->
read_length
-
diff_length
;
max_length
=
info
->
read_length
-
diff_length
;
if
(
info
->
type
!=
READ_FIFO
&&
(
info
->
end_of_file
-
pos_in_file
)
<
max_length
)
max_length
=
info
->
end_of_file
-
pos_in_file
;
if
(
!
max_length
)
if
(
!
max_length
)
{
{
if
(
Count
)
if
(
Count
)
...
@@ -293,6 +377,78 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
...
@@ -293,6 +377,78 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
return
0
;
return
0
;
}
}
int
_my_b_seq_read
(
register
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
)
{
uint
length
,
diff_length
,
left_length
;
my_off_t
max_length
,
pos_in_file
;
if
((
left_length
=
(
uint
)
(
info
->
rc_end
-
info
->
rc_pos
)))
{
dbug_assert
(
Count
>=
left_length
);
/* User is not using my_b_read() */
memcpy
(
Buffer
,
info
->
rc_pos
,
(
size_t
)
(
left_length
));
Buffer
+=
left_length
;
Count
-=
left_length
;
}
/* pos_in_file always point on where info->buffer was read */
pos_in_file
=
info
->
pos_in_file
+
(
uint
)
(
info
->
rc_end
-
info
->
buffer
);
/* no need to seek since the read is guaranteed to be sequential */
diff_length
=
(
uint
)
(
pos_in_file
&
(
IO_SIZE
-
1
));
#ifdef THREAD
pthread_mutex_lock
(
&
info
->
append_buffer_lock
);
#endif
#ifdef THREAD
pthread_mutex_unlock
(
&
info
->
append_buffer_lock
);
#endif
if
(
Count
>=
(
uint
)
(
IO_SIZE
+
(
IO_SIZE
-
diff_length
)))
{
/* Fill first intern buffer */
uint
read_length
;
if
(
info
->
end_of_file
==
pos_in_file
)
{
/* End of file */
info
->
error
=
(
int
)
left_length
;
return
1
;
}
length
=
(
Count
&
(
uint
)
~
(
IO_SIZE
-
1
))
-
diff_length
;
if
((
read_length
=
my_read
(
info
->
file
,
Buffer
,(
uint
)
length
,
info
->
myflags
))
!=
(
uint
)
length
)
{
info
->
error
=
read_length
==
(
uint
)
-
1
?
-
1
:
(
int
)
(
read_length
+
left_length
);
return
1
;
}
Count
-=
length
;
Buffer
+=
length
;
pos_in_file
+=
length
;
left_length
+=
length
;
diff_length
=
0
;
}
max_length
=
info
->
read_length
-
diff_length
;
if
(
info
->
type
!=
READ_FIFO
&&
(
info
->
end_of_file
-
pos_in_file
)
<
max_length
)
max_length
=
info
->
end_of_file
-
pos_in_file
;
if
(
!
max_length
)
{
if
(
Count
)
{
info
->
error
=
left_length
;
/* We only got this many char */
return
1
;
}
length
=
0
;
/* Didn't read any chars */
}
else
if
((
length
=
my_read
(
info
->
file
,
info
->
buffer
,(
uint
)
max_length
,
info
->
myflags
))
<
Count
||
length
==
(
uint
)
-
1
)
{
if
(
length
!=
(
uint
)
-
1
)
memcpy
(
Buffer
,
info
->
buffer
,(
size_t
)
length
);
info
->
error
=
length
==
(
uint
)
-
1
?
-
1
:
(
int
)
(
length
+
left_length
);
return
1
;
}
info
->
rc_pos
=
info
->
buffer
+
Count
;
info
->
rc_end
=
info
->
buffer
+
length
;
info
->
pos_in_file
=
pos_in_file
;
memcpy
(
Buffer
,
info
->
buffer
,(
size_t
)
Count
);
return
0
;
}
#ifdef HAVE_AIOWAIT
#ifdef HAVE_AIOWAIT
...
@@ -490,6 +646,11 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
...
@@ -490,6 +646,11 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
Buffer
+=
rest_length
;
Buffer
+=
rest_length
;
Count
-=
rest_length
;
Count
-=
rest_length
;
info
->
rc_pos
+=
rest_length
;
info
->
rc_pos
+=
rest_length
;
if
(
info
->
pos_in_file
+
info
->
buffer_length
>
info
->
end_of_file
)
{
my_errno
=
errno
=
EFBIG
;
return
info
->
error
=
-
1
;
}
if
(
flush_io_cache
(
info
))
if
(
flush_io_cache
(
info
))
return
1
;
return
1
;
if
(
Count
>=
IO_SIZE
)
if
(
Count
>=
IO_SIZE
)
...
@@ -596,7 +757,7 @@ int flush_io_cache(IO_CACHE *info)
...
@@ -596,7 +757,7 @@ int flush_io_cache(IO_CACHE *info)
}
}
}
}
#ifdef HAVE_AIOWAIT
#ifdef HAVE_AIOWAIT
else
else
if
(
info
->
type
!=
READ_NET
)
{
{
my_aiowait
(
&
info
->
aio_result
);
/* Wait for outstanding req */
my_aiowait
(
&
info
->
aio_result
);
/* Wait for outstanding req */
info
->
inited
=
0
;
info
->
inited
=
0
;
...
@@ -613,7 +774,7 @@ int end_io_cache(IO_CACHE *info)
...
@@ -613,7 +774,7 @@ int end_io_cache(IO_CACHE *info)
DBUG_ENTER
(
"end_io_cache"
);
DBUG_ENTER
(
"end_io_cache"
);
if
((
pre_close
=
info
->
pre_close
))
if
((
pre_close
=
info
->
pre_close
))
(
*
pre_close
)(
info
);
(
*
pre_close
)(
info
);
if
(
info
->
buffer
)
if
(
info
->
alloced_
buffer
)
{
{
if
(
info
->
file
!=
-
1
)
/* File doesn't exist */
if
(
info
->
file
!=
-
1
)
/* File doesn't exist */
error
=
flush_io_cache
(
info
);
error
=
flush_io_cache
(
info
);
...
...
sql/mf_iocache.cc
View file @
aba008d6
...
@@ -41,295 +41,6 @@ static void my_aiowait(my_aio_result *result);
...
@@ -41,295 +41,6 @@ static void my_aiowait(my_aio_result *result);
extern
"C"
{
extern
"C"
{
/*
** if cachesize == 0 then use default cachesize (from s-file)
** if file == -1 then real_open_cached_file() will be called.
** returns 0 if ok
*/
int
init_io_cache
(
IO_CACHE
*
info
,
File
file
,
uint
cachesize
,
enum
cache_type
type
,
my_off_t
seek_offset
,
pbool
use_async_io
,
myf
cache_myflags
)
{
uint
min_cache
;
DBUG_ENTER
(
"init_io_cache"
);
DBUG_PRINT
(
"enter"
,(
"type: %d pos: %ld"
,(
int
)
type
,
(
ulong
)
seek_offset
));
/* There is no file in net_reading */
info
->
file
=
file
;
info
->
pre_close
=
info
->
pre_read
=
info
->
post_read
=
0
;
info
->
arg
=
0
;
if
(
!
cachesize
)
if
(
!
(
cachesize
=
my_default_record_cache_size
))
DBUG_RETURN
(
1
);
/* No cache requested */
min_cache
=
use_async_io
?
IO_SIZE
*
4
:
IO_SIZE
*
2
;
if
(
type
==
READ_CACHE
)
{
/* Assume file isn't growing */
if
(
cache_myflags
&
MY_DONT_CHECK_FILESIZE
)
{
cache_myflags
&=
~
MY_DONT_CHECK_FILESIZE
;
}
else
{
my_off_t
file_pos
,
end_of_file
;
if
((
file_pos
=
my_tell
(
file
,
MYF
(
0
))
==
MY_FILEPOS_ERROR
))
DBUG_RETURN
(
1
);
end_of_file
=
my_seek
(
file
,
0L
,
MY_SEEK_END
,
MYF
(
0
));
if
(
end_of_file
<
seek_offset
)
end_of_file
=
seek_offset
;
VOID
(
my_seek
(
file
,
file_pos
,
MY_SEEK_SET
,
MYF
(
0
)));
if
((
my_off_t
)
cachesize
>
end_of_file
-
seek_offset
+
IO_SIZE
*
2
-
1
)
{
cachesize
=
(
uint
)
(
end_of_file
-
seek_offset
)
+
IO_SIZE
*
2
-
1
;
use_async_io
=
0
;
/* No need to use async */
}
}
}
if
((
int
)
type
<
(
int
)
READ_NET
)
{
for
(;;)
{
cachesize
=
(
uint
)
((
ulong
)
(
cachesize
+
min_cache
-
1
)
&
(
ulong
)
~
(
min_cache
-
1
));
if
(
cachesize
<
min_cache
)
cachesize
=
min_cache
;
if
((
info
->
buffer
=
(
byte
*
)
my_malloc
(
cachesize
,
MYF
((
cache_myflags
&
~
MY_WME
)
|
(
cachesize
==
min_cache
?
MY_WME
:
0
))))
!=
0
)
break
;
/* Enough memory found */
if
(
cachesize
==
min_cache
)
DBUG_RETURN
(
2
);
/* Can't alloc cache */
cachesize
=
(
uint
)
((
long
)
cachesize
*
3
/
4
);
/* Try with less memory */
}
}
else
info
->
buffer
=
0
;
DBUG_PRINT
(
"info"
,(
"init_io_cache: cachesize = %u"
,
cachesize
));
info
->
pos_in_file
=
seek_offset
;
info
->
read_length
=
info
->
buffer_length
=
cachesize
;
info
->
seek_not_done
=
test
(
file
>=
0
&&
type
!=
READ_FIFO
&&
type
!=
READ_NET
);
info
->
myflags
=
cache_myflags
&
~
(
MY_NABP
|
MY_FNABP
);
info
->
rc_request_pos
=
info
->
rc_pos
=
info
->
buffer
;
if
(
type
==
READ_CACHE
||
type
==
READ_NET
||
type
==
READ_FIFO
)
{
info
->
rc_end
=
info
->
buffer
;
/* Nothing in cache */
}
else
/* type == WRITE_CACHE */
{
info
->
rc_end
=
info
->
buffer
+
info
->
buffer_length
-
(
seek_offset
&
(
IO_SIZE
-
1
));
}
/* end_of_file may be changed by user later */
info
->
end_of_file
=
((
type
==
READ_NET
||
type
==
READ_FIFO
)
?
0
:
~
(
my_off_t
)
0
);
info
->
type
=
type
;
info
->
error
=
0
;
info
->
read_function
=
(
type
==
READ_NET
)
?
_my_b_net_read
:
_my_b_read
;
/* net | file */
#ifdef HAVE_AIOWAIT
if
(
use_async_io
&&
!
my_disable_async_io
)
{
DBUG_PRINT
(
"info"
,(
"Using async io"
));
info
->
read_length
/=
2
;
info
->
read_function
=
_my_b_async_read
;
}
info
->
inited
=
info
->
aio_result
.
pending
=
0
;
#endif
DBUG_RETURN
(
0
);
}
/* init_io_cache */
/* Wait until current request is ready */
#ifdef HAVE_AIOWAIT
static
void
my_aiowait
(
my_aio_result
*
result
)
{
if
(
result
->
pending
)
{
struct
aio_result_t
*
tmp
;
for
(;;)
{
if
((
int
)
(
tmp
=
aiowait
((
struct
timeval
*
)
0
))
==
-
1
)
{
if
(
errno
==
EINTR
)
continue
;
DBUG_PRINT
(
"error"
,(
"No aio request, error: %d"
,
errno
));
result
->
pending
=
0
;
/* Assume everythings is ok */
break
;
}
((
my_aio_result
*
)
tmp
)
->
pending
=
0
;
if
((
my_aio_result
*
)
tmp
==
result
)
break
;
}
}
return
;
}
#endif
/* Use this to reset cache to start or other type */
/* Some simple optimizing is done when reinit in current buffer */
my_bool
reinit_io_cache
(
IO_CACHE
*
info
,
enum
cache_type
type
,
my_off_t
seek_offset
,
pbool
use_async_io
__attribute__
((
unused
)),
pbool
clear_cache
)
{
DBUG_ENTER
(
"reinit_io_cache"
);
info
->
seek_not_done
=
test
(
info
->
file
>=
0
);
/* Seek not done */
/* If the whole file is in memory, avoid flushing to disk */
if
(
!
clear_cache
&&
seek_offset
>=
info
->
pos_in_file
&&
seek_offset
<=
info
->
pos_in_file
+
(
uint
)
(
info
->
rc_end
-
info
->
rc_request_pos
))
{
/* use current buffer */
if
(
info
->
type
==
WRITE_CACHE
&&
type
==
READ_CACHE
)
{
info
->
rc_end
=
info
->
rc_pos
;
info
->
end_of_file
=
my_b_tell
(
info
);
}
else
if
(
type
==
WRITE_CACHE
)
{
if
(
info
->
type
==
READ_CACHE
)
info
->
rc_end
=
info
->
buffer
+
info
->
buffer_length
;
info
->
end_of_file
=
~
(
my_off_t
)
0
;
}
info
->
rc_pos
=
info
->
rc_request_pos
+
(
seek_offset
-
info
->
pos_in_file
);
#ifdef HAVE_AIOWAIT
my_aiowait
(
&
info
->
aio_result
);
/* Wait for outstanding req */
#endif
}
else
{
/*
If we change from WRITE_CACHE to READ_CACHE, assume that everything
after the current positions should be ignored
*/
if
(
info
->
type
==
WRITE_CACHE
&&
type
==
READ_CACHE
)
info
->
end_of_file
=
my_b_tell
(
info
);
/* No need to flush cache if we want to reuse it */
if
((
type
!=
WRITE_CACHE
||
!
clear_cache
)
&&
flush_io_cache
(
info
))
DBUG_RETURN
(
1
);
if
(
info
->
pos_in_file
!=
seek_offset
)
{
info
->
pos_in_file
=
seek_offset
;
info
->
seek_not_done
=
1
;
}
info
->
rc_request_pos
=
info
->
rc_pos
=
info
->
buffer
;
if
(
type
==
READ_CACHE
||
type
==
READ_NET
||
type
==
READ_FIFO
)
{
info
->
rc_end
=
info
->
buffer
;
/* Nothing in cache */
}
else
{
info
->
rc_end
=
info
->
buffer
+
info
->
buffer_length
-
(
seek_offset
&
(
IO_SIZE
-
1
));
info
->
end_of_file
=
((
type
==
READ_NET
||
type
==
READ_FIFO
)
?
0
:
~
(
my_off_t
)
0
);
}
}
info
->
type
=
type
;
info
->
error
=
0
;
info
->
read_function
=
(
type
==
READ_NET
)
?
_my_b_net_read
:
_my_b_read
;
#ifdef HAVE_AIOWAIT
if
(
type
!=
READ_NET
)
{
if
(
use_async_io
&&
!
my_disable_async_io
&&
((
ulong
)
info
->
buffer_length
<
(
ulong
)
(
info
->
end_of_file
-
seek_offset
)))
{
info
->
read_length
=
info
->
buffer_length
/
2
;
info
->
read_function
=
_my_b_async_read
;
}
}
info
->
inited
=
0
;
#endif
DBUG_RETURN
(
0
);
}
/* init_io_cache */
/*
Read buffered. Returns 1 if can't read requested characters
This function is only called from the my_b_read() macro
when there isn't enough characters in the buffer to
satisfy the request.
Returns 0 we succeeded in reading all data
*/
int
_my_b_read
(
register
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
)
{
uint
length
,
diff_length
,
left_length
;
my_off_t
max_length
,
pos_in_file
;
if
((
left_length
=
(
uint
)
(
info
->
rc_end
-
info
->
rc_pos
)))
{
dbug_assert
(
Count
>=
left_length
);
/* User is not using my_b_read() */
memcpy
(
Buffer
,
info
->
rc_pos
,
(
size_t
)
(
left_length
));
Buffer
+=
left_length
;
Count
-=
left_length
;
}
/* pos_in_file always point on where info->buffer was read */
pos_in_file
=
info
->
pos_in_file
+
(
uint
)
(
info
->
rc_end
-
info
->
buffer
);
if
(
info
->
seek_not_done
)
{
/* File touched, do seek */
VOID
(
my_seek
(
info
->
file
,
pos_in_file
,
MY_SEEK_SET
,
MYF
(
0
)));
info
->
seek_not_done
=
0
;
}
diff_length
=
(
uint
)
(
pos_in_file
&
(
IO_SIZE
-
1
));
if
(
Count
>=
(
uint
)
(
IO_SIZE
+
(
IO_SIZE
-
diff_length
)))
{
/* Fill first intern buffer */
uint
read_length
;
if
(
info
->
end_of_file
==
pos_in_file
)
{
/* End of file */
info
->
error
=
(
int
)
left_length
;
return
1
;
}
length
=
(
Count
&
(
uint
)
~
(
IO_SIZE
-
1
))
-
diff_length
;
if
((
read_length
=
my_read
(
info
->
file
,
Buffer
,(
uint
)
length
,
info
->
myflags
))
!=
(
uint
)
length
)
{
info
->
error
=
read_length
==
(
uint
)
-
1
?
-
1
:
(
int
)
(
read_length
+
left_length
);
return
1
;
}
Count
-=
length
;
Buffer
+=
length
;
pos_in_file
+=
length
;
left_length
+=
length
;
diff_length
=
0
;
}
max_length
=
info
->
read_length
-
diff_length
;
if
(
info
->
type
!=
READ_FIFO
&&
(
info
->
end_of_file
-
pos_in_file
)
<
max_length
)
max_length
=
info
->
end_of_file
-
pos_in_file
;
if
(
!
max_length
)
{
if
(
Count
)
{
info
->
error
=
left_length
;
/* We only got this many char */
return
1
;
}
length
=
0
;
/* Didn't read any chars */
}
else
if
((
length
=
my_read
(
info
->
file
,
info
->
buffer
,(
uint
)
max_length
,
info
->
myflags
))
<
Count
||
length
==
(
uint
)
-
1
)
{
if
(
length
!=
(
uint
)
-
1
)
memcpy
(
Buffer
,
info
->
buffer
,(
size_t
)
length
);
info
->
error
=
length
==
(
uint
)
-
1
?
-
1
:
(
int
)
(
length
+
left_length
);
return
1
;
}
info
->
rc_pos
=
info
->
buffer
+
Count
;
info
->
rc_end
=
info
->
buffer
+
length
;
info
->
pos_in_file
=
pos_in_file
;
memcpy
(
Buffer
,
info
->
buffer
,(
size_t
)
Count
);
return
0
;
}
/*
/*
** Read buffered from the net.
** Read buffered from the net.
** Returns 1 if can't read requested characters
** Returns 1 if can't read requested characters
...
@@ -359,341 +70,8 @@ int _my_b_net_read(register IO_CACHE *info, byte *Buffer,
...
@@ -359,341 +70,8 @@ int _my_b_net_read(register IO_CACHE *info, byte *Buffer,
info
->
rc_end
=
(
info
->
rc_pos
=
(
byte
*
)
net
->
read_pos
)
+
read_length
;
info
->
rc_end
=
(
info
->
rc_pos
=
(
byte
*
)
net
->
read_pos
)
+
read_length
;
Buffer
[
0
]
=
info
->
rc_pos
[
0
];
/* length is always 1 */
Buffer
[
0
]
=
info
->
rc_pos
[
0
];
/* length is always 1 */
info
->
rc_pos
++
;
info
->
rc_pos
++
;
info
->
buffer
=
info
->
rc_pos
;
return
0
;
return
0
;
}
}
#ifdef HAVE_AIOWAIT
int
_my_b_async_read
(
register
IO_CACHE
*
info
,
byte
*
Buffer
,
uint
Count
)
{
uint
length
,
read_length
,
diff_length
,
left_length
,
use_length
,
org_Count
;
my_off_t
max_length
;
my_off_t
next_pos_in_file
;
byte
*
read_buffer
;
memcpy
(
Buffer
,
info
->
rc_pos
,
(
size_t
)
(
left_length
=
(
uint
)
(
info
->
rc_end
-
info
->
rc_pos
)));
Buffer
+=
left_length
;
org_Count
=
Count
;
Count
-=
left_length
;
if
(
info
->
inited
)
{
/* wait for read block */
info
->
inited
=
0
;
/* No more block to read */
my_aiowait
(
&
info
->
aio_result
);
/* Wait for outstanding req */
if
(
info
->
aio_result
.
result
.
aio_errno
)
{
if
(
info
->
myflags
&
MY_WME
)
my_error
(
EE_READ
,
MYF
(
ME_BELL
+
ME_WAITTANG
),
my_filename
(
info
->
file
),
info
->
aio_result
.
result
.
aio_errno
);
my_errno
=
info
->
aio_result
.
result
.
aio_errno
;
info
->
error
=
-
1
;
return
(
1
);
}
if
(
!
(
read_length
=
(
uint
)
info
->
aio_result
.
result
.
aio_return
)
||
read_length
==
(
uint
)
-
1
)
{
my_errno
=
0
;
/* For testing */
info
->
error
=
(
read_length
==
(
uint
)
-
1
?
-
1
:
(
int
)
(
read_length
+
left_length
));
return
(
1
);
}
info
->
pos_in_file
+=
(
uint
)
(
info
->
rc_end
-
info
->
rc_request_pos
);
if
(
info
->
rc_request_pos
!=
info
->
buffer
)
info
->
rc_request_pos
=
info
->
buffer
;
else
info
->
rc_request_pos
=
info
->
buffer
+
info
->
read_length
;
info
->
rc_pos
=
info
->
rc_request_pos
;
next_pos_in_file
=
info
->
aio_read_pos
+
read_length
;
/* Check if pos_in_file is changed
(_ni_read_cache may have skipped some bytes) */
if
(
info
->
aio_read_pos
<
info
->
pos_in_file
)
{
/* Fix if skipped bytes */
if
(
info
->
aio_read_pos
+
read_length
<
info
->
pos_in_file
)
{
read_length
=
0
;
/* Skipp block */
next_pos_in_file
=
info
->
pos_in_file
;
}
else
{
my_off_t
offset
=
(
info
->
pos_in_file
-
info
->
aio_read_pos
);
info
->
pos_in_file
=
info
->
aio_read_pos
;
/* Whe are here */
info
->
rc_pos
=
info
->
rc_request_pos
+
offset
;
read_length
-=
offset
;
/* Bytes left from rc_pos */
}
}
#ifndef DBUG_OFF
if
(
info
->
aio_read_pos
>
info
->
pos_in_file
)
{
my_errno
=
EINVAL
;
return
(
info
->
read_length
=
-
1
);
}
#endif
/* Copy found bytes to buffer */
length
=
min
(
Count
,
read_length
);
memcpy
(
Buffer
,
info
->
rc_pos
,(
size_t
)
length
);
Buffer
+=
length
;
Count
-=
length
;
left_length
+=
length
;
info
->
rc_end
=
info
->
rc_pos
+
read_length
;
info
->
rc_pos
+=
length
;
}
else
next_pos_in_file
=
(
info
->
pos_in_file
+
(
uint
)
(
info
->
rc_end
-
info
->
rc_request_pos
));
/* If reading large blocks, or first read or read with skipp */
if
(
Count
)
{
if
(
next_pos_in_file
==
info
->
end_of_file
)
{
info
->
error
=
(
int
)
(
read_length
+
left_length
);
return
1
;
}
VOID
(
my_seek
(
info
->
file
,
next_pos_in_file
,
MY_SEEK_SET
,
MYF
(
0
)));
read_length
=
IO_SIZE
*
2
-
(
uint
)
(
next_pos_in_file
&
(
IO_SIZE
-
1
));
if
(
Count
<
read_length
)
{
/* Small block, read to cache */
if
((
read_length
=
my_read
(
info
->
file
,
info
->
rc_request_pos
,
read_length
,
info
->
myflags
))
==
(
uint
)
-
1
)
return
info
->
error
=
-
1
;
use_length
=
min
(
Count
,
read_length
);
memcpy
(
Buffer
,
info
->
rc_request_pos
,(
size_t
)
use_length
);
info
->
rc_pos
=
info
->
rc_request_pos
+
Count
;
info
->
rc_end
=
info
->
rc_request_pos
+
read_length
;
info
->
pos_in_file
=
next_pos_in_file
;
/* Start of block in cache */
next_pos_in_file
+=
read_length
;
if
(
Count
!=
use_length
)
{
/* Didn't find hole block */
if
(
info
->
myflags
&
(
MY_WME
|
MY_FAE
|
MY_FNABP
)
&&
Count
!=
org_Count
)
my_error
(
EE_EOFERR
,
MYF
(
ME_BELL
+
ME_WAITTANG
),
my_filename
(
info
->
file
),
my_errno
);
info
->
error
=
(
int
)
(
read_length
+
left_length
);
return
1
;
}
}
else
{
/* Big block, don't cache it */
if
((
read_length
=
my_read
(
info
->
file
,
Buffer
,(
uint
)
Count
,
info
->
myflags
))
!=
Count
)
{
info
->
error
=
read_length
==
(
uint
)
-
1
?
-
1
:
read_length
+
left_length
;
return
1
;
}
info
->
rc_pos
=
info
->
rc_end
=
info
->
rc_request_pos
;
info
->
pos_in_file
=
(
next_pos_in_file
+=
Count
);
}
}
/* Read next block with asyncronic io */
max_length
=
info
->
end_of_file
-
next_pos_in_file
;
diff_length
=
(
next_pos_in_file
&
(
IO_SIZE
-
1
));
if
(
max_length
>
(
my_off_t
)
info
->
read_length
-
diff_length
)
max_length
=
(
my_off_t
)
info
->
read_length
-
diff_length
;
if
(
info
->
rc_request_pos
!=
info
->
buffer
)
read_buffer
=
info
->
buffer
;
else
read_buffer
=
info
->
buffer
+
info
->
read_length
;
info
->
aio_read_pos
=
next_pos_in_file
;
if
(
max_length
)
{
info
->
aio_result
.
result
.
aio_errno
=
AIO_INPROGRESS
;
/* Marker for test */
DBUG_PRINT
(
"aioread"
,(
"filepos: %ld length: %ld"
,
(
ulong
)
next_pos_in_file
,(
ulong
)
max_length
));
if
(
aioread
(
info
->
file
,
read_buffer
,(
int
)
max_length
,
(
my_off_t
)
next_pos_in_file
,
MY_SEEK_SET
,
&
info
->
aio_result
.
result
))
{
/* Skipp async io */
my_errno
=
errno
;
DBUG_PRINT
(
"error"
,(
"got error: %d, aio_result: %d from aioread, async skipped"
,
errno
,
info
->
aio_result
.
result
.
aio_errno
));
if
(
info
->
rc_request_pos
!=
info
->
buffer
)
{
bmove
(
info
->
buffer
,
info
->
rc_request_pos
,
(
uint
)
(
info
->
rc_end
-
info
->
rc_pos
));
info
->
rc_request_pos
=
info
->
buffer
;
info
->
rc_pos
-=
info
->
read_length
;
info
->
rc_end
-=
info
->
read_length
;
}
info
->
read_length
=
info
->
buffer_length
;
/* Use hole buffer */
info
->
read_function
=
_my_b_read
;
/* Use normal IO_READ next */
}
else
info
->
inited
=
info
->
aio_result
.
pending
=
1
;
}
return
0
;
/* Block read, async in use */
}
/* _my_b_async_read */
#endif
/* Read one byte when buffer is empty */
int
_my_b_get
(
IO_CACHE
*
info
)
{
byte
buff
;
IO_CACHE_CALLBACK
pre_read
,
post_read
;
if
((
pre_read
=
info
->
pre_read
))
(
*
pre_read
)(
info
);
if
((
*
(
info
)
->
read_function
)(
info
,
&
buff
,
1
))
return
my_b_EOF
;
if
((
post_read
=
info
->
post_read
))
(
*
post_read
)(
info
);
return
(
int
)
(
uchar
)
buff
;
}
/* Returns != 0 if error on write */
int
_my_b_write
(
register
IO_CACHE
*
info
,
const
byte
*
Buffer
,
uint
Count
)
{
uint
rest_length
,
length
;
rest_length
=
(
uint
)
(
info
->
rc_end
-
info
->
rc_pos
);
memcpy
(
info
->
rc_pos
,
Buffer
,(
size_t
)
rest_length
);
Buffer
+=
rest_length
;
Count
-=
rest_length
;
info
->
rc_pos
+=
rest_length
;
if
(
info
->
pos_in_file
+
info
->
buffer_length
>
info
->
end_of_file
)
{
my_errno
=
errno
=
EFBIG
;
return
info
->
error
=
-
1
;
}
if
(
flush_io_cache
(
info
))
return
1
;
if
(
Count
>=
IO_SIZE
)
{
/* Fill first intern buffer */
length
=
Count
&
(
uint
)
~
(
IO_SIZE
-
1
);
if
(
info
->
seek_not_done
)
{
/* File touched, do seek */
VOID
(
my_seek
(
info
->
file
,
info
->
pos_in_file
,
MY_SEEK_SET
,
MYF
(
0
)));
info
->
seek_not_done
=
0
;
}
if
(
my_write
(
info
->
file
,
Buffer
,(
uint
)
length
,
info
->
myflags
|
MY_NABP
))
return
info
->
error
=
-
1
;
Count
-=
length
;
Buffer
+=
length
;
info
->
pos_in_file
+=
length
;
}
memcpy
(
info
->
rc_pos
,
Buffer
,(
size_t
)
Count
);
info
->
rc_pos
+=
Count
;
return
0
;
}
/*
Write a block to disk where part of the data may be inside the record
buffer. As all write calls to the data goes through the cache,
we will never get a seek over the end of the buffer
*/
int
my_block_write
(
register
IO_CACHE
*
info
,
const
byte
*
Buffer
,
uint
Count
,
my_off_t
pos
)
{
uint
length
;
int
error
=
0
;
if
(
pos
<
info
->
pos_in_file
)
{
/* Of no overlap, write everything without buffering */
if
(
pos
+
Count
<=
info
->
pos_in_file
)
return
my_pwrite
(
info
->
file
,
Buffer
,
Count
,
pos
,
info
->
myflags
|
MY_NABP
);
/* Write the part of the block that is before buffer */
length
=
(
uint
)
(
info
->
pos_in_file
-
pos
);
if
(
my_pwrite
(
info
->
file
,
Buffer
,
length
,
pos
,
info
->
myflags
|
MY_NABP
))
info
->
error
=
error
=-
1
;
Buffer
+=
length
;
pos
+=
length
;
Count
-=
length
;
}
/* Check if we want to write inside the used part of the buffer.*/
length
=
(
uint
)
(
info
->
rc_end
-
info
->
buffer
);
if
(
pos
<
info
->
pos_in_file
+
length
)
{
uint
offset
=
(
uint
)
(
pos
-
info
->
pos_in_file
);
length
-=
offset
;
if
(
length
>
Count
)
length
=
Count
;
memcpy
(
info
->
buffer
+
offset
,
Buffer
,
length
);
Buffer
+=
length
;
Count
-=
length
;
/* Fix length of buffer if the new data was larger */
if
(
info
->
buffer
+
length
>
info
->
rc_pos
)
info
->
rc_pos
=
info
->
buffer
+
length
;
if
(
!
Count
)
return
(
error
);
}
/* Write at the end of the current buffer; This is the normal case */
if
(
_my_b_write
(
info
,
Buffer
,
Count
))
error
=
-
1
;
return
error
;
}
/* Flush write cache */
int
flush_io_cache
(
IO_CACHE
*
info
)
{
uint
length
;
DBUG_ENTER
(
"flush_io_cache"
);
if
(
info
->
type
==
WRITE_CACHE
)
{
if
(
info
->
file
==
-
1
)
{
if
(
real_open_cached_file
(
info
))
DBUG_RETURN
((
info
->
error
=
-
1
));
}
if
(
info
->
rc_pos
!=
info
->
buffer
)
{
length
=
(
uint
)
(
info
->
rc_pos
-
info
->
buffer
);
if
(
info
->
seek_not_done
)
{
/* File touched, do seek */
if
(
my_seek
(
info
->
file
,
info
->
pos_in_file
,
MY_SEEK_SET
,
MYF
(
0
))
==
MY_FILEPOS_ERROR
)
DBUG_RETURN
((
info
->
error
=
-
1
));
info
->
seek_not_done
=
0
;
}
info
->
rc_pos
=
info
->
buffer
;
info
->
pos_in_file
+=
length
;
info
->
rc_end
=
(
info
->
buffer
+
info
->
buffer_length
-
(
info
->
pos_in_file
&
(
IO_SIZE
-
1
)));
if
(
my_write
(
info
->
file
,
info
->
buffer
,
length
,
info
->
myflags
|
MY_NABP
))
DBUG_RETURN
((
info
->
error
=
-
1
));
DBUG_RETURN
(
0
);
}
}
#ifdef HAVE_AIOWAIT
else
if
(
info
->
type
!=
READ_NET
)
{
my_aiowait
(
&
info
->
aio_result
);
/* Wait for outstanding req */
info
->
inited
=
0
;
}
#endif
DBUG_RETURN
(
0
);
}
int
end_io_cache
(
IO_CACHE
*
info
)
{
int
error
=
0
;
IO_CACHE_CALLBACK
pre_close
;
DBUG_ENTER
(
"end_io_cache"
);
if
((
pre_close
=
info
->
pre_close
))
(
*
pre_close
)(
info
);
if
(
info
->
buffer
)
{
if
(
info
->
file
!=
-
1
)
/* File doesn't exist */
error
=
flush_io_cache
(
info
);
my_free
((
gptr
)
info
->
buffer
,
MYF
(
MY_WME
));
info
->
buffer
=
info
->
rc_pos
=
(
byte
*
)
0
;
}
DBUG_RETURN
(
error
);
}
/* end_io_cache */
}
/* extern "C" */
}
/* extern "C" */
sql/repl_failsafe.cc
View file @
aba008d6
...
@@ -18,6 +18,10 @@
...
@@ -18,6 +18,10 @@
#include "mysql_priv.h"
#include "mysql_priv.h"
#include "repl_failsafe.h"
#include "repl_failsafe.h"
#include "sql_repl.h"
#include "slave.h"
#include "mini_client.h"
#include <mysql.h>
RPL_STATUS
rpl_status
=
RPL_NULL
;
RPL_STATUS
rpl_status
=
RPL_NULL
;
pthread_mutex_t
LOCK_rpl_status
;
pthread_mutex_t
LOCK_rpl_status
;
...
@@ -33,11 +37,184 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE",
...
@@ -33,11 +37,184 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE",
TYPELIB
rpl_status_typelib
=
{
array_elements
(
rpl_status_type
)
-
1
,
""
,
TYPELIB
rpl_status_typelib
=
{
array_elements
(
rpl_status_type
)
-
1
,
""
,
rpl_status_type
};
rpl_status_type
};
static
int
init_failsafe_rpl_thread
(
THD
*
thd
)
{
DBUG_ENTER
(
"init_failsafe_rpl_thread"
);
thd
->
system_thread
=
thd
->
bootstrap
=
1
;
thd
->
client_capabilities
=
0
;
my_net_init
(
&
thd
->
net
,
0
);
thd
->
net
.
timeout
=
slave_net_timeout
;
thd
->
max_packet_length
=
thd
->
net
.
max_packet
;
thd
->
master_access
=
~
0
;
thd
->
priv_user
=
0
;
thd
->
system_thread
=
1
;
pthread_mutex_lock
(
&
LOCK_thread_count
);
thd
->
thread_id
=
thread_id
++
;
pthread_mutex_unlock
(
&
LOCK_thread_count
);
if
(
init_thr_lock
()
||
my_pthread_setspecific_ptr
(
THR_THD
,
thd
)
||
my_pthread_setspecific_ptr
(
THR_MALLOC
,
&
thd
->
mem_root
)
||
my_pthread_setspecific_ptr
(
THR_NET
,
&
thd
->
net
))
{
close_connection
(
&
thd
->
net
,
ER_OUT_OF_RESOURCES
);
// is this needed?
end_thread
(
thd
,
0
);
DBUG_RETURN
(
-
1
);
}
thd
->
mysys_var
=
my_thread_var
;
thd
->
dbug_thread_id
=
my_thread_id
();
#if !defined(__WIN__) && !defined(OS2)
sigset_t
set
;
VOID
(
sigemptyset
(
&
set
));
// Get mask in use
VOID
(
pthread_sigmask
(
SIG_UNBLOCK
,
&
set
,
&
thd
->
block_signals
));
#endif
thd
->
mem_root
.
free
=
thd
->
mem_root
.
used
=
0
;
if
(
thd
->
max_join_size
==
(
ulong
)
~
0L
)
thd
->
options
|=
OPTION_BIG_SELECTS
;
thd
->
proc_info
=
"Thread initialized"
;
thd
->
version
=
refresh_version
;
thd
->
set_time
();
DBUG_RETURN
(
0
);
}
void
change_rpl_status
(
RPL_STATUS
from_status
,
RPL_STATUS
to_status
)
void
change_rpl_status
(
RPL_STATUS
from_status
,
RPL_STATUS
to_status
)
{
{
pthread_mutex_lock
(
&
LOCK_rpl_status
);
pthread_mutex_lock
(
&
LOCK_rpl_status
);
if
(
rpl_status
==
from_status
||
rpl_status
==
RPL_ANY
)
if
(
rpl_status
==
from_status
||
rpl_status
==
RPL_ANY
)
rpl_status
=
to_status
;
rpl_status
=
to_status
;
pthread_cond_signal
(
&
COND_rpl_status
);
pthread_mutex_unlock
(
&
LOCK_rpl_status
);
}
int
update_slave_list
(
MYSQL
*
mysql
)
{
MYSQL_RES
*
res
=
0
;
MYSQL_ROW
row
;
const
char
*
error
=
0
;
bool
have_auth_info
;
int
port_ind
;
if
(
mc_mysql_query
(
mysql
,
"SHOW SLAVE HOSTS"
,
0
)
||
!
(
res
=
mc_mysql_store_result
(
mysql
)))
{
error
=
"Query error"
;
goto
err
;
}
switch
(
mc_mysql_num_fields
(
res
))
{
case
5
:
have_auth_info
=
0
;
port_ind
=
2
;
break
;
case
7
:
have_auth_info
=
1
;
port_ind
=
4
;
break
;
default:
error
=
"Invalid number of fields in SHOW SLAVE HOSTS"
;
goto
err
;
}
pthread_mutex_lock
(
&
LOCK_slave_list
);
while
((
row
=
mc_mysql_fetch_row
(
res
)))
{
uint32
server_id
;
SLAVE_INFO
*
si
,
*
old_si
;
server_id
=
atoi
(
row
[
0
]);
if
((
old_si
=
(
SLAVE_INFO
*
)
hash_search
(
&
slave_list
,
(
byte
*
)
&
server_id
,
4
)))
si
=
old_si
;
else
{
if
(
!
(
si
=
(
SLAVE_INFO
*
)
my_malloc
(
sizeof
(
SLAVE_INFO
),
MYF
(
MY_WME
))))
{
error
=
"Out of memory"
;
pthread_mutex_unlock
(
&
LOCK_slave_list
);
goto
err
;
}
si
->
server_id
=
server_id
;
}
strnmov
(
si
->
host
,
row
[
1
],
sizeof
(
si
->
host
));
si
->
port
=
atoi
(
row
[
port_ind
]);
si
->
rpl_recovery_rank
=
atoi
(
row
[
port_ind
+
1
]);
si
->
master_id
=
atoi
(
row
[
port_ind
+
2
]);
if
(
have_auth_info
)
{
strnmov
(
si
->
user
,
row
[
2
],
sizeof
(
si
->
user
));
strnmov
(
si
->
password
,
row
[
3
],
sizeof
(
si
->
password
));
}
}
pthread_mutex_unlock
(
&
LOCK_slave_list
);
err:
if
(
res
)
mc_mysql_free_result
(
res
);
if
(
error
)
{
sql_print_error
(
"Error updating slave list:"
,
error
);
return
1
;
}
return
0
;
}
int
find_recovery_captain
(
THD
*
thd
,
MYSQL
*
mysql
)
{
return
0
;
}
pthread_handler_decl
(
handle_failsafe_rpl
,
arg
)
{
DBUG_ENTER
(
"handle_failsafe_rpl"
);
THD
*
thd
=
new
THD
;
thd
->
thread_stack
=
(
char
*
)
&
thd
;
MYSQL
*
recovery_captain
=
0
;
pthread_detach_this_thread
();
if
(
init_failsafe_rpl_thread
(
thd
)
||
!
(
recovery_captain
=
mc_mysql_init
(
0
)))
{
sql_print_error
(
"Could not initialize failsafe replication thread"
);
goto
err
;
}
pthread_mutex_lock
(
&
LOCK_rpl_status
);
while
(
!
thd
->
killed
&&
!
abort_loop
)
{
bool
break_req_chain
=
0
;
const
char
*
msg
=
thd
->
enter_cond
(
&
COND_rpl_status
,
&
LOCK_rpl_status
,
"Waiting for request"
);
pthread_cond_wait
(
&
COND_rpl_status
,
&
LOCK_rpl_status
);
thd
->
proc_info
=
"Processling request"
;
while
(
!
break_req_chain
)
{
switch
(
rpl_status
)
{
case
RPL_LOST_SOLDIER
:
if
(
find_recovery_captain
(
thd
,
recovery_captain
))
rpl_status
=
RPL_TROOP_SOLDIER
;
else
rpl_status
=
RPL_RECOVERY_CAPTAIN
;
break_req_chain
=
1
;
/* for now until other states are implemented */
break
;
default:
break_req_chain
=
1
;
break
;
}
}
thd
->
exit_cond
(
msg
);
}
pthread_mutex_unlock
(
&
LOCK_rpl_status
);
pthread_mutex_unlock
(
&
LOCK_rpl_status
);
err:
if
(
recovery_captain
)
mc_mysql_close
(
recovery_captain
);
delete
thd
;
my_thread_end
();
pthread_exit
(
0
);
DBUG_RETURN
(
0
);
}
}
sql/repl_failsafe.h
View file @
aba008d6
#ifndef REPL_FAILSAFE_H
#ifndef REPL_FAILSAFE_H
#define REPL_FAILSAFE_H
#define REPL_FAILSAFE_H
#include "mysql.h"
typedef
enum
{
RPL_AUTH_MASTER
=
0
,
RPL_ACTIVE_SLAVE
,
RPL_IDLE_SLAVE
,
typedef
enum
{
RPL_AUTH_MASTER
=
0
,
RPL_ACTIVE_SLAVE
,
RPL_IDLE_SLAVE
,
RPL_LOST_SOLDIER
,
RPL_TROOP_SOLDIER
,
RPL_LOST_SOLDIER
,
RPL_TROOP_SOLDIER
,
RPL_RECOVERY_CAPTAIN
,
RPL_NULL
/* inactive */
,
RPL_RECOVERY_CAPTAIN
,
RPL_NULL
/* inactive */
,
...
@@ -10,7 +12,11 @@ extern RPL_STATUS rpl_status;
...
@@ -10,7 +12,11 @@ extern RPL_STATUS rpl_status;
extern
pthread_mutex_t
LOCK_rpl_status
;
extern
pthread_mutex_t
LOCK_rpl_status
;
extern
pthread_cond_t
COND_rpl_status
;
extern
pthread_cond_t
COND_rpl_status
;
extern
TYPELIB
rpl_role_typelib
,
rpl_status_typelib
;
extern
TYPELIB
rpl_role_typelib
,
rpl_status_typelib
;
extern
uint
rpl_recovery_rank
;
extern
const
char
*
rpl_role_type
[],
*
rpl_status_type
[];
extern
const
char
*
rpl_role_type
[],
*
rpl_status_type
[];
pthread_handler_decl
(
handle_failsafe_rpl
,
arg
);
void
change_rpl_status
(
RPL_STATUS
from_status
,
RPL_STATUS
to_status
);
void
change_rpl_status
(
RPL_STATUS
from_status
,
RPL_STATUS
to_status
);
int
find_recovery_captain
(
THD
*
thd
,
MYSQL
*
mysql
);
int
update_slave_list
(
MYSQL
*
mysql
);
#endif
#endif
sql/slave.cc
View file @
aba008d6
...
@@ -55,6 +55,8 @@ inline bool slave_killed(THD* thd);
...
@@ -55,6 +55,8 @@ inline bool slave_killed(THD* thd);
static
int
init_slave_thread
(
THD
*
thd
);
static
int
init_slave_thread
(
THD
*
thd
);
static
int
safe_connect
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
);
static
int
safe_connect
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
);
static
int
safe_reconnect
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
);
static
int
safe_reconnect
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
);
static
int
connect_to_master
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
,
bool
reconnect
);
static
int
safe_sleep
(
THD
*
thd
,
int
sec
);
static
int
safe_sleep
(
THD
*
thd
,
int
sec
);
static
int
request_table_dump
(
MYSQL
*
mysql
,
const
char
*
db
,
const
char
*
table
);
static
int
request_table_dump
(
MYSQL
*
mysql
,
const
char
*
db
,
const
char
*
table
);
static
int
create_table_from_dump
(
THD
*
thd
,
NET
*
net
,
const
char
*
db
,
static
int
create_table_from_dump
(
THD
*
thd
,
NET
*
net
,
const
char
*
db
,
...
@@ -615,6 +617,10 @@ int register_slave_on_master(MYSQL* mysql)
...
@@ -615,6 +617,10 @@ int register_slave_on_master(MYSQL* mysql)
int2store
(
buf
,
(
uint16
)
report_port
);
int2store
(
buf
,
(
uint16
)
report_port
);
packet
.
append
(
buf
,
2
);
packet
.
append
(
buf
,
2
);
int4store
(
buf
,
rpl_recovery_rank
);
packet
.
append
(
buf
,
4
);
int4store
(
buf
,
0
);
/* tell the master will fill in master_id */
packet
.
append
(
buf
,
4
);
if
(
mc_simple_command
(
mysql
,
COM_REGISTER_SLAVE
,
(
char
*
)
packet
.
ptr
(),
if
(
mc_simple_command
(
mysql
,
COM_REGISTER_SLAVE
,
(
char
*
)
packet
.
ptr
(),
packet
.
length
(),
0
))
packet
.
length
(),
0
))
...
@@ -868,7 +874,7 @@ command");
...
@@ -868,7 +874,7 @@ command");
}
}
static
u
int
read_event
(
MYSQL
*
mysql
,
MASTER_INFO
*
mi
)
static
u
long
read_event
(
MYSQL
*
mysql
,
MASTER_INFO
*
mi
)
{
{
ulong
len
=
packet_error
;
ulong
len
=
packet_error
;
// for convinience lets think we start by
// for convinience lets think we start by
...
@@ -1017,7 +1023,6 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
...
@@ -1017,7 +1023,6 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init
();
my_thread_init
();
slave_thd
=
thd
=
new
THD
;
// note that contructor of THD uses DBUG_ !
slave_thd
=
thd
=
new
THD
;
// note that contructor of THD uses DBUG_ !
thd
->
set_time
();
DBUG_ENTER
(
"handle_slave"
);
DBUG_ENTER
(
"handle_slave"
);
pthread_detach_this_thread
();
pthread_detach_this_thread
();
...
@@ -1067,6 +1072,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
...
@@ -1067,6 +1072,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
// on with life
// on with life
thd
->
proc_info
=
"Registering slave on master"
;
thd
->
proc_info
=
"Registering slave on master"
;
register_slave_on_master
(
mysql
);
register_slave_on_master
(
mysql
);
update_slave_list
(
mysql
);
while
(
!
slave_killed
(
thd
))
while
(
!
slave_killed
(
thd
))
{
{
...
@@ -1117,7 +1123,7 @@ try again, log '%s' at postion %s", RPL_LOG_NAME,
...
@@ -1117,7 +1123,7 @@ try again, log '%s' at postion %s", RPL_LOG_NAME,
while
(
!
slave_killed
(
thd
))
while
(
!
slave_killed
(
thd
))
{
{
thd
->
proc_info
=
"Reading master update"
;
thd
->
proc_info
=
"Reading master update"
;
u
int
event_len
=
read_event
(
mysql
,
&
glob_mi
);
u
long
event_len
=
read_event
(
mysql
,
&
glob_mi
);
if
(
slave_killed
(
thd
))
if
(
slave_killed
(
thd
))
{
{
sql_print_error
(
"Slave thread killed while reading event"
);
sql_print_error
(
"Slave thread killed while reading event"
);
...
@@ -1244,30 +1250,7 @@ position %s",
...
@@ -1244,30 +1250,7 @@ position %s",
static
int
safe_connect
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
)
static
int
safe_connect
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
)
{
{
int
slave_was_killed
;
return
connect_to_master
(
thd
,
mysql
,
mi
,
0
);
#ifndef DBUG_OFF
events_till_disconnect
=
disconnect_slave_event_count
;
#endif
while
(
!
(
slave_was_killed
=
slave_killed
(
thd
))
&&
!
mc_mysql_connect
(
mysql
,
mi
->
host
,
mi
->
user
,
mi
->
password
,
0
,
mi
->
port
,
0
,
0
))
{
sql_print_error
(
"Slave thread: error connecting to master: %s (%d),\
retry in %d sec"
,
mc_mysql_error
(
mysql
),
errno
,
mi
->
connect_retry
);
safe_sleep
(
thd
,
mi
->
connect_retry
);
}
if
(
!
slave_was_killed
)
{
change_rpl_status
(
RPL_IDLE_SLAVE
,
RPL_ACTIVE_SLAVE
);
mysql_log
.
write
(
thd
,
COM_CONNECT_OUT
,
"%s@%s:%d"
,
mi
->
user
,
mi
->
host
,
mi
->
port
);
#ifdef SIGNAL_WITH_VIO_CLOSE
thd
->
set_active_vio
(
mysql
->
net
.
vio
);
#endif
}
return
slave_was_killed
;
}
}
/*
/*
...
@@ -1275,7 +1258,8 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
...
@@ -1275,7 +1258,8 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
master_retry_count times
master_retry_count times
*/
*/
static
int
safe_reconnect
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
)
static
int
connect_to_master
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
,
bool
reconnect
)
{
{
int
slave_was_killed
;
int
slave_was_killed
;
int
last_errno
=
-
2
;
// impossible error
int
last_errno
=
-
2
;
// impossible error
...
@@ -1290,12 +1274,15 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
...
@@ -1290,12 +1274,15 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
#ifndef DBUG_OFF
#ifndef DBUG_OFF
events_till_disconnect
=
disconnect_slave_event_count
;
events_till_disconnect
=
disconnect_slave_event_count
;
#endif
#endif
while
(
!
(
slave_was_killed
=
slave_killed
(
thd
))
&&
mc_mysql_reconnect
(
mysql
))
while
(
!
(
slave_was_killed
=
slave_killed
(
thd
))
&&
(
reconnect
?
mc_mysql_reconnect
(
mysql
)
:
!
mc_mysql_connect
(
mysql
,
mi
->
host
,
mi
->
user
,
mi
->
password
,
0
,
mi
->
port
,
0
,
0
)))
{
{
/* Don't repeat last error */
/* Don't repeat last error */
if
(
mc_mysql_errno
(
mysql
)
!=
last_errno
)
if
(
mc_mysql_errno
(
mysql
)
!=
last_errno
)
{
{
sql_print_error
(
"Slave thread: error
re-
connecting to master: \
sql_print_error
(
"Slave thread: error connecting to master: \
%s, last_errno=%d, retry in %d sec"
,
%s, last_errno=%d, retry in %d sec"
,
mc_mysql_error
(
mysql
),
last_errno
=
mc_mysql_errno
(
mysql
),
mc_mysql_error
(
mysql
),
last_errno
=
mc_mysql_errno
(
mysql
),
mi
->
connect_retry
);
mi
->
connect_retry
);
...
@@ -1309,6 +1296,7 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
...
@@ -1309,6 +1296,7 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
if
(
master_retry_count
&&
err_count
++
==
master_retry_count
)
if
(
master_retry_count
&&
err_count
++
==
master_retry_count
)
{
{
slave_was_killed
=
1
;
slave_was_killed
=
1
;
if
(
reconnect
)
change_rpl_status
(
RPL_ACTIVE_SLAVE
,
RPL_LOST_SOLDIER
);
change_rpl_status
(
RPL_ACTIVE_SLAVE
,
RPL_LOST_SOLDIER
);
break
;
break
;
}
}
...
@@ -1316,11 +1304,18 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
...
@@ -1316,11 +1304,18 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
if
(
!
slave_was_killed
)
if
(
!
slave_was_killed
)
{
{
sql_print_error
(
"Slave: reconnected to master '%s@%s:%d',\
if
(
reconnect
)
sql_print_error
(
"Slave: connected to master '%s@%s:%d',\
replication resumed in log '%s' at position %s"
,
glob_mi
.
user
,
replication resumed in log '%s' at position %s"
,
glob_mi
.
user
,
glob_mi
.
host
,
glob_mi
.
port
,
glob_mi
.
host
,
glob_mi
.
port
,
RPL_LOG_NAME
,
RPL_LOG_NAME
,
llstr
(
glob_mi
.
pos
,
llbuff
));
llstr
(
glob_mi
.
pos
,
llbuff
));
else
{
change_rpl_status
(
RPL_IDLE_SLAVE
,
RPL_ACTIVE_SLAVE
);
mysql_log
.
write
(
thd
,
COM_CONNECT_OUT
,
"%s@%s:%d"
,
mi
->
user
,
mi
->
host
,
mi
->
port
);
}
#ifdef SIGNAL_WITH_VIO_CLOSE
#ifdef SIGNAL_WITH_VIO_CLOSE
thd
->
set_active_vio
(
mysql
->
net
.
vio
);
thd
->
set_active_vio
(
mysql
->
net
.
vio
);
#endif
#endif
...
@@ -1329,6 +1324,17 @@ replication resumed in log '%s' at position %s", glob_mi.user,
...
@@ -1329,6 +1324,17 @@ replication resumed in log '%s' at position %s", glob_mi.user,
return
slave_was_killed
;
return
slave_was_killed
;
}
}
/*
Try to connect until successful or slave killed or we have retried
master_retry_count times
*/
static
int
safe_reconnect
(
THD
*
thd
,
MYSQL
*
mysql
,
MASTER_INFO
*
mi
)
{
return
connect_to_master
(
thd
,
mysql
,
mi
,
1
);
}
#ifdef __GNUC__
#ifdef __GNUC__
template
class
I_List_iterator
<
i_string
>;
template
class
I_List_iterator
<
i_string
>;
template
class
I_List_iterator
<
i_string_pair
>;
template
class
I_List_iterator
<
i_string_pair
>;
...
...
sql/sql_load.cc
View file @
aba008d6
...
@@ -277,10 +277,13 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
...
@@ -277,10 +277,13 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if
(
using_transactions
)
if
(
using_transactions
)
ha_autocommit_or_rollback
(
thd
,
error
);
ha_autocommit_or_rollback
(
thd
,
error
);
if
(
!
opt_old_rpl_compat
&&
mysql_bin_log
.
is_open
())
if
(
!
opt_old_rpl_compat
&&
mysql_bin_log
.
is_open
())
{
if
(
lf_info
.
wrote_create_file
)
{
{
Delete_file_log_event
d
(
thd
);
Delete_file_log_event
d
(
thd
);
mysql_bin_log
.
write
(
&
d
);
mysql_bin_log
.
write
(
&
d
);
}
}
}
DBUG_RETURN
(
-
1
);
// Error on read
DBUG_RETURN
(
-
1
);
// Error on read
}
}
sprintf
(
name
,
ER
(
ER_LOAD_INFO
),
info
.
records
,
info
.
deleted
,
sprintf
(
name
,
ER
(
ER_LOAD_INFO
),
info
.
records
,
info
.
deleted
,
...
@@ -303,10 +306,13 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
...
@@ -303,10 +306,13 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if
(
!
opt_old_rpl_compat
)
if
(
!
opt_old_rpl_compat
)
{
{
read_info
.
end_io_cache
();
// make sure last block gets logged
read_info
.
end_io_cache
();
// make sure last block gets logged
if
(
lf_info
.
wrote_create_file
)
{
Execute_load_log_event
e
(
thd
);
Execute_load_log_event
e
(
thd
);
mysql_bin_log
.
write
(
&
e
);
mysql_bin_log
.
write
(
&
e
);
}
}
}
}
}
if
(
using_transactions
)
if
(
using_transactions
)
error
=
ha_autocommit_or_rollback
(
thd
,
error
);
error
=
ha_autocommit_or_rollback
(
thd
,
error
);
DBUG_RETURN
(
error
);
DBUG_RETURN
(
error
);
...
@@ -534,6 +540,14 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term,
...
@@ -534,6 +540,14 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term,
}
}
else
else
{
{
/* init_io_cache() will not initialize read_function member
if the cache is READ_NET. The reason is explained in
mysys/mf_iocache.c. So we work around the problem with a
manual assignment
*/
if
(
get_it_from_net
)
cache
.
read_function
=
_my_b_net_read
;
need_end_io_cache
=
1
;
need_end_io_cache
=
1
;
if
(
!
opt_old_rpl_compat
&&
mysql_bin_log
.
is_open
())
if
(
!
opt_old_rpl_compat
&&
mysql_bin_log
.
is_open
())
cache
.
pre_read
=
cache
.
pre_close
=
cache
.
pre_read
=
cache
.
pre_close
=
...
...
sql/sql_repl.cc
View file @
aba008d6
...
@@ -140,6 +140,11 @@ int register_slave(THD* thd, uchar* packet, uint packet_length)
...
@@ -140,6 +140,11 @@ int register_slave(THD* thd, uchar* packet, uint packet_length)
get_object
(
p
,
si
->
user
);
get_object
(
p
,
si
->
user
);
get_object
(
p
,
si
->
password
);
get_object
(
p
,
si
->
password
);
si
->
port
=
uint2korr
(
p
);
si
->
port
=
uint2korr
(
p
);
p
+=
2
;
si
->
rpl_recovery_rank
=
uint4korr
(
p
);
p
+=
4
;
if
(
!
(
si
->
master_id
=
uint4korr
(
p
)))
si
->
master_id
=
server_id
;
si
->
thd
=
thd
;
si
->
thd
=
thd
;
pthread_mutex_lock
(
&
LOCK_slave_list
);
pthread_mutex_lock
(
&
LOCK_slave_list
);
...
@@ -534,6 +539,7 @@ impossible position";
...
@@ -534,6 +539,7 @@ impossible position";
DBUG_PRINT
(
"wait"
,(
"waiting for data on binary log"
));
DBUG_PRINT
(
"wait"
,(
"waiting for data on binary log"
));
if
(
!
thd
->
killed
)
if
(
!
thd
->
killed
)
pthread_cond_wait
(
&
COND_binlog_update
,
log_lock
);
pthread_cond_wait
(
&
COND_binlog_update
,
log_lock
);
DBUG_PRINT
(
"wait"
,(
"binary log received update"
));
break
;
break
;
default:
default:
...
@@ -1253,6 +1259,8 @@ int show_slave_hosts(THD* thd)
...
@@ -1253,6 +1259,8 @@ int show_slave_hosts(THD* thd)
field_list
.
push_back
(
new
Item_empty_string
(
"Password"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Password"
,
20
));
}
}
field_list
.
push_back
(
new
Item_empty_string
(
"Port"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Port"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Rpl_recovery_rank"
,
20
));
field_list
.
push_back
(
new
Item_empty_string
(
"Master_id"
,
20
));
if
(
send_fields
(
thd
,
field_list
,
1
))
if
(
send_fields
(
thd
,
field_list
,
1
))
DBUG_RETURN
(
-
1
);
DBUG_RETURN
(
-
1
);
...
@@ -1271,6 +1279,8 @@ int show_slave_hosts(THD* thd)
...
@@ -1271,6 +1279,8 @@ int show_slave_hosts(THD* thd)
net_store_data
(
packet
,
si
->
password
);
net_store_data
(
packet
,
si
->
password
);
}
}
net_store_data
(
packet
,
(
uint32
)
si
->
port
);
net_store_data
(
packet
,
(
uint32
)
si
->
port
);
net_store_data
(
packet
,
si
->
rpl_recovery_rank
);
net_store_data
(
packet
,
si
->
master_id
);
if
(
my_net_write
(
net
,
(
char
*
)
packet
->
ptr
(),
packet
->
length
()))
if
(
my_net_write
(
net
,
(
char
*
)
packet
->
ptr
(),
packet
->
length
()))
{
{
pthread_mutex_unlock
(
&
LOCK_slave_list
);
pthread_mutex_unlock
(
&
LOCK_slave_list
);
...
@@ -1616,7 +1626,8 @@ int log_loaded_block(IO_CACHE* file)
...
@@ -1616,7 +1626,8 @@ int log_loaded_block(IO_CACHE* file)
{
{
LOAD_FILE_INFO
*
lf_info
;
LOAD_FILE_INFO
*
lf_info
;
uint
block_len
;
uint
block_len
;
if
(
!
(
block_len
=
file
->
rc_end
-
file
->
buffer
))
char
*
buffer
=
(
char
*
)
file
->
buffer
;
if
(
!
(
block_len
=
file
->
rc_end
-
buffer
))
return
0
;
return
0
;
lf_info
=
(
LOAD_FILE_INFO
*
)
file
->
arg
;
lf_info
=
(
LOAD_FILE_INFO
*
)
file
->
arg
;
if
(
lf_info
->
last_pos_in_file
!=
HA_POS_ERROR
&&
if
(
lf_info
->
last_pos_in_file
!=
HA_POS_ERROR
&&
...
@@ -1625,14 +1636,14 @@ int log_loaded_block(IO_CACHE* file)
...
@@ -1625,14 +1636,14 @@ int log_loaded_block(IO_CACHE* file)
lf_info
->
last_pos_in_file
=
file
->
pos_in_file
;
lf_info
->
last_pos_in_file
=
file
->
pos_in_file
;
if
(
lf_info
->
wrote_create_file
)
if
(
lf_info
->
wrote_create_file
)
{
{
Append_block_log_event
a
(
lf_info
->
thd
,
(
char
*
)
file
->
buffer
,
block_len
);
Append_block_log_event
a
(
lf_info
->
thd
,
buffer
,
block_len
);
mysql_bin_log
.
write
(
&
a
);
mysql_bin_log
.
write
(
&
a
);
}
}
else
else
{
{
Create_file_log_event
c
(
lf_info
->
thd
,
lf_info
->
ex
,
lf_info
->
db
,
Create_file_log_event
c
(
lf_info
->
thd
,
lf_info
->
ex
,
lf_info
->
db
,
lf_info
->
table_name
,
*
lf_info
->
fields
,
lf_info
->
table_name
,
*
lf_info
->
fields
,
lf_info
->
handle_dup
,
(
char
*
)
file
->
buffer
,
lf_info
->
handle_dup
,
buffer
,
block_len
);
block_len
);
mysql_bin_log
.
write
(
&
c
);
mysql_bin_log
.
write
(
&
c
);
lf_info
->
wrote_create_file
=
1
;
lf_info
->
wrote_create_file
=
1
;
...
...
sql/sql_repl.h
View file @
aba008d6
...
@@ -6,6 +6,7 @@
...
@@ -6,6 +6,7 @@
typedef
struct
st_slave_info
typedef
struct
st_slave_info
{
{
uint32
server_id
;
uint32
server_id
;
uint32
rpl_recovery_rank
,
master_id
;
char
host
[
HOSTNAME_LENGTH
+
1
];
char
host
[
HOSTNAME_LENGTH
+
1
];
char
user
[
USERNAME_LENGTH
+
1
];
char
user
[
USERNAME_LENGTH
+
1
];
char
password
[
HASH_PASSWORD_LENGTH
+
1
];
char
password
[
HASH_PASSWORD_LENGTH
+
1
];
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment