Commit a23f2a3a authored by unknown's avatar unknown

cleanup

removal of duplicate code in mf_iocache.cc 
work on failsafe replication
work on SEQ_READ_APPEND io cache


include/my_sys.h:
  updates for SEQ_READ_APPEND
libmysql/Makefile.am:
  fix for mysys/mf_iocache.c
libmysql/libmysql.c:
  updates for new format of SHOW SLAVE HOSTS
mysql-test/r/rpl000001.result:
  test replication of LOAD DATA LOCAL INFILE
mysql-test/r/rpl000002.result:
  updated test result
mysql-test/t/rpl000001.test:
  test LOAD DATA LOCAL INFILE
mysys/mf_iocache.c:
  cleanup to remove duplicate functionality
  some work on SEQ_READ_APPEND
sql/mf_iocache.cc:
  cleanup to remove duplicate functionality
sql/repl_failsafe.cc:
  more work on failsafe replication
sql/repl_failsafe.h:
  more work on failsafe replication
sql/slave.cc:
  cleanup
  more work on failsafe replication
sql/sql_load.cc:
  fixed bug on replicating empty file loads
  got LOAD DATA LOCAL INFILE to work again, and to be replicated
sql/sql_repl.cc:
  cleanup
  more work on failsafe replication
sql/sql_repl.h:
  more work on failsafe replication
parent b303703b
......@@ -243,7 +243,10 @@ typedef struct st_typelib { /* Different types saved here */
const char **type_names;
} TYPELIB;
enum cache_type {READ_CACHE,WRITE_CACHE,READ_FIFO,READ_NET,WRITE_NET};
enum cache_type {READ_CACHE,WRITE_CACHE,
SEQ_READ_APPEND /* sequential read or append */,
READ_FIFO,
READ_NET,WRITE_NET};
enum flush_type { FLUSH_KEEP, FLUSH_RELEASE, FLUSH_IGNORE_CHANGED,
FLUSH_FORCE_WRITE};
......@@ -294,6 +297,16 @@ typedef struct st_io_cache /* Used when cacheing files */
{
my_off_t pos_in_file,end_of_file;
byte *rc_pos,*rc_end,*buffer,*rc_request_pos;
my_bool alloced_buffer; /* currented READ_NET is the only one
that will use a buffer allocated somewhere
else
*/
byte *append_buffer, *append_pos, *append_end;
/* for append buffer used in READ_APPEND cache */
#ifdef THREAD
pthread_mutex_t append_buffer_lock;
/* need mutex copying from append buffer to read buffer */
#endif
int (*read_function)(struct st_io_cache *,byte *,uint);
/* callbacks when the actual read I/O happens */
IO_CACHE_CALLBACK pre_read;
......@@ -546,6 +559,7 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
my_off_t seek_offset,pbool use_async_io,
pbool clear_cache);
extern int _my_b_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_seq_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_get(IO_CACHE *info);
extern int _my_b_async_read(IO_CACHE *info,byte *Buffer,uint Count);
......
......@@ -18,7 +18,7 @@
# This file is public domain and comes with NO WARRANTY of any kind
target = libmysqlclient.la
target_defs = -DUNDEF_THREADS_HACK -DDONT_USE_RAID
target_defs = -DUNDEF_THREADS_HACK -DDONT_USE_RAID -DMYSQL_CLIENT
LIBS = @CLIENT_LIBS@
INCLUDES = -I$(srcdir)/../include -I../include \
-I$(srcdir)/.. -I$(top_srcdir) -I.. $(openssl_includes)
......
......@@ -1147,6 +1147,8 @@ static inline int get_slaves_from_master(MYSQL* mysql)
MYSQL_ROW row;
int error = 1;
int has_auth_info;
int port_ind;
if (!mysql->net.vio && !mysql_real_connect(mysql,0,0,0,0,0,0,0))
{
expand_error(mysql, CR_PROBE_MASTER_CONNECT);
......@@ -1162,8 +1164,14 @@ static inline int get_slaves_from_master(MYSQL* mysql)
switch (mysql_num_fields(res))
{
case 3: has_auth_info = 0; break;
case 5: has_auth_info = 1; break;
case 5:
has_auth_info = 0;
port_ind=2;
break;
case 7:
has_auth_info = 1;
port_ind=4;
break;
default:
goto err;
}
......@@ -1175,8 +1183,8 @@ static inline int get_slaves_from_master(MYSQL* mysql)
if (has_auth_info)
{
tmp_user = row[3];
tmp_pass = row[4];
tmp_user = row[2];
tmp_pass = row[3];
}
else
{
......@@ -1184,7 +1192,7 @@ static inline int get_slaves_from_master(MYSQL* mysql)
tmp_pass = mysql->passwd;
}
if (!(slave = spawn_init(mysql, row[1], atoi(row[2]),
if (!(slave = spawn_init(mysql, row[1], atoi(row[port_ind]),
tmp_user, tmp_pass)))
goto err;
......
......@@ -7,6 +7,29 @@ use test;
drop table if exists t1,t3;
create table t1 (word char(20) not null);
load data infile '../../std_data/words.dat' into table t1;
load data local infile '/home/sasha/bk/mysql-4.0/mysql-test/std_data/words.dat' into table t1;
select * from t1;
word
Aarhus
Aaron
Ababa
aback
abaft
abandon
abandoned
abandoning
abandonment
abandons
Aarhus
Aaron
Ababa
aback
abaft
abandon
abandoned
abandoning
abandonment
abandons
set password = password('foo');
set password = password('');
create table t3(n int);
......@@ -18,7 +41,7 @@ n
2
select sum(length(word)) from t1;
sum(length(word))
71
141
drop table t1,t3;
reset master;
reset slave;
......
......@@ -15,8 +15,8 @@ n
2001
2002
show slave hosts;
Server_id Host Port
2 127.0.0.1 $SLAVE_MYPORT
Server_id Host Port Rpl_recovery_rank Master_id
2 127.0.0.1 9307 2 1
drop table t1;
slave stop;
drop table if exists t2;
......
......@@ -4,6 +4,8 @@ use test;
drop table if exists t1,t3;
create table t1 (word char(20) not null);
load data infile '../../std_data/words.dat' into table t1;
eval load data local infile '$MYSQL_TEST_DIR/std_data/words.dat' into table t1;
select * from t1;
set password = password('foo');
set password = password('');
create table t3(n int);
......
......@@ -36,10 +36,34 @@
#include <m_string.h>
#ifdef HAVE_AIOWAIT
#include "mysys_err.h"
#include <errno.h>
static void my_aiowait(my_aio_result *result);
#endif
#include <assert.h>
#include <errno.h>
static void init_read_function(IO_CACHE* info, enum cache_type type);
static void init_read_function(IO_CACHE* info, enum cache_type type)
{
switch (type)
{
#ifndef MYSQL_CLIENT
case READ_NET:
/* must be initialized by the caller. The problem is that
_my_b_net_read has to be defined in sql directory because of
the dependency on THD, and therefore cannot be visible to
programs that link against mysys but know nothing about THD, such
as myisamchk
*/
break;
#endif
case SEQ_READ_APPEND:
info->read_function = _my_b_seq_read;
break;
default:
info->read_function = _my_b_read;
}
}
/*
** if cachesize == 0 then use default cachesize (from s-file)
......@@ -55,14 +79,15 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
DBUG_ENTER("init_io_cache");
DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset));
info->file=file;
/* There is no file in net_reading */
info->file= file;
info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
if (!cachesize)
if (! (cachesize= my_default_record_cache_size))
DBUG_RETURN(1); /* No cache requested */
min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
if (type == READ_CACHE)
if (type == READ_CACHE || type == SEQ_READ_APPEND)
{ /* Assume file isn't growing */
if (cache_myflags & MY_DONT_CHECK_FILESIZE)
{
......@@ -77,36 +102,65 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
if (end_of_file < seek_offset)
end_of_file=seek_offset;
VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0)));
if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
/* Trim cache size if the file is very small.
However, we should not do this with SEQ_READ_APPEND cache
*/
if (type != SEQ_READ_APPEND &&
(my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
{
cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1;
use_async_io=0; /* No nead to use async */
use_async_io=0; /* No need to use async */
}
}
}
info->alloced_buffer = 0;
if ((int) type < (int) READ_NET)
{
uint buffer_block;
for (;;)
{
cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
buffer_block = cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
(ulong) ~(min_cache-1));
if (type == SEQ_READ_APPEND)
buffer_block *= 2;
if (cachesize < min_cache)
cachesize = min_cache;
if ((info->buffer=
(byte*) my_malloc(cachesize,
(byte*) my_malloc(buffer_block,
MYF((cache_myflags & ~ MY_WME) |
(cachesize == min_cache ? MY_WME : 0)))) != 0)
{
if (type == SEQ_READ_APPEND)
info->append_buffer = info->buffer + cachesize;
info->alloced_buffer=1;
break; /* Enough memory found */
}
if (cachesize == min_cache)
DBUG_RETURN(2); /* Can't alloc cache */
cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */
}
info->pos_in_file=seek_offset;
}
else
info->buffer=0;
DBUG_PRINT("info",("init_io_cache: cachesize = %u",cachesize));
info->pos_in_file= seek_offset;
info->read_length=info->buffer_length=cachesize;
info->seek_not_done= test(file >= 0); /* Seek not done */
info->seek_not_done= test(file >= 0 && type != READ_FIFO &&
type != READ_NET);
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
info->rc_request_pos=info->rc_pos=info->buffer;
if (type == SEQ_READ_APPEND)
{
info->append_pos = info->append_buffer;
info->append_end = info->append_buffer + info->buffer_length;
#ifdef THREAD
pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST);
#endif
}
if (type == READ_CACHE)
if (type == READ_CACHE || type == SEQ_READ_APPEND ||
type == READ_NET || type == READ_FIFO)
{
info->rc_end=info->buffer; /* Nothing in cache */
}
......@@ -114,10 +168,12 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
{
info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
}
info->end_of_file=MY_FILEPOS_ERROR; /* May be changed by user */
/* end_of_file may be changed by user later */
info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0
: ~(my_off_t) 0);
info->type=type;
info->error=0;
info->read_function=_my_b_read;
init_read_function(info,type);
#ifdef HAVE_AIOWAIT
if (use_async_io && ! my_disable_async_io)
{
......@@ -169,6 +225,8 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
DBUG_ENTER("reinit_io_cache");
info->seek_not_done= test(info->file >= 0); /* Seek not done */
/* If the whole file is in memory, avoid flushing to disk */
if (! clear_cache &&
seek_offset >= info->pos_in_file &&
seek_offset <= info->pos_in_file +
......@@ -179,8 +237,12 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
info->rc_end=info->rc_pos;
info->end_of_file=my_b_tell(info);
}
else if (info->type == READ_CACHE && type == WRITE_CACHE)
else if (type == WRITE_CACHE)
{
if (info->type == READ_CACHE)
info->rc_end=info->buffer+info->buffer_length;
info->end_of_file = ~(my_off_t) 0;
}
info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
#ifdef HAVE_AIOWAIT
my_aiowait(&info->aio_result); /* Wait for outstanding req */
......@@ -188,13 +250,22 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
}
else
{
/*
If we change from WRITE_CACHE to READ_CACHE, assume that everything
after the current positions should be ignored
*/
if (info->type == WRITE_CACHE && type == READ_CACHE)
info->end_of_file=my_b_tell(info);
if (flush_io_cache(info))
/* No need to flush cache if we want to reuse it */
if ((type != WRITE_CACHE || !clear_cache) && flush_io_cache(info))
DBUG_RETURN(1);
if (info->pos_in_file != seek_offset)
{
info->pos_in_file=seek_offset;
info->seek_not_done=1;
}
info->rc_request_pos=info->rc_pos=info->buffer;
if (type == READ_CACHE)
if (type == READ_CACHE || type == READ_NET || type == READ_FIFO)
{
info->rc_end=info->buffer; /* Nothing in cache */
}
......@@ -202,13 +273,16 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
{
info->rc_end=info->buffer+info->buffer_length-
(seek_offset & (IO_SIZE-1));
info->end_of_file=MY_FILEPOS_ERROR; /* May be changed by user */
info->end_of_file= ((type == READ_NET || type == READ_FIFO) ? 0 :
~(my_off_t) 0);
}
}
info->type=type;
info->error=0;
info->read_function=_my_b_read;
init_read_function(info,type);
#ifdef HAVE_AIOWAIT
if (type != READ_NET)
{
if (use_async_io && ! my_disable_async_io &&
((ulong) info->buffer_length <
(ulong) (info->end_of_file - seek_offset)))
......@@ -216,6 +290,7 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
info->read_length=info->buffer_length/2;
info->read_function=_my_b_async_read;
}
}
info->inited=0;
#endif
DBUG_RETURN(0);
......@@ -223,18 +298,27 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
/* Read buffered. Returns 1 if can't read requested characters */
/* Returns 0 if record read */
/*
Read buffered. Returns 1 if can't read requested characters
This function is only called from the my_b_read() macro
when there isn't enough characters in the buffer to
satisfy the request.
Returns 0 we succeeded in reading all data
*/
int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
uint length,diff_length,left_length;
my_off_t max_length, pos_in_file;
memcpy(Buffer,info->rc_pos,
(size_t) (left_length=(uint) (info->rc_end-info->rc_pos)));
if ((left_length=(uint) (info->rc_end-info->rc_pos)))
{
dbug_assert(Count >= left_length); /* User is not using my_b_read() */
memcpy(Buffer,info->rc_pos, (size_t) (left_length));
Buffer+=left_length;
Count-=left_length;
}
/* pos_in_file always point on where info->buffer was read */
pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
if (info->seek_not_done)
{ /* File touched, do seek */
......@@ -264,10 +348,10 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
left_length+=length;
diff_length=0;
}
max_length=info->end_of_file - pos_in_file;
if (max_length > info->read_length-diff_length)
max_length=info->read_length-diff_length;
if (info->type != READ_FIFO &&
(info->end_of_file - pos_in_file) < max_length)
max_length = info->end_of_file - pos_in_file;
if (!max_length)
{
if (Count)
......@@ -293,6 +377,78 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
return 0;
}
int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
uint length,diff_length,left_length;
my_off_t max_length, pos_in_file;
if ((left_length=(uint) (info->rc_end-info->rc_pos)))
{
dbug_assert(Count >= left_length); /* User is not using my_b_read() */
memcpy(Buffer,info->rc_pos, (size_t) (left_length));
Buffer+=left_length;
Count-=left_length;
}
/* pos_in_file always point on where info->buffer was read */
pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
/* no need to seek since the read is guaranteed to be sequential */
diff_length=(uint) (pos_in_file & (IO_SIZE-1));
#ifdef THREAD
pthread_mutex_lock(&info->append_buffer_lock);
#endif
#ifdef THREAD
pthread_mutex_unlock(&info->append_buffer_lock);
#endif
if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length)))
{ /* Fill first intern buffer */
uint read_length;
if (info->end_of_file == pos_in_file)
{ /* End of file */
info->error=(int) left_length;
return 1;
}
length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
!= (uint) length)
{
info->error= read_length == (uint) -1 ? -1 :
(int) (read_length+left_length);
return 1;
}
Count-=length;
Buffer+=length;
pos_in_file+=length;
left_length+=length;
diff_length=0;
}
max_length=info->read_length-diff_length;
if (info->type != READ_FIFO &&
(info->end_of_file - pos_in_file) < max_length)
max_length = info->end_of_file - pos_in_file;
if (!max_length)
{
if (Count)
{
info->error= left_length; /* We only got this many char */
return 1;
}
length=0; /* Didn't read any chars */
}
else if ((length=my_read(info->file,info->buffer,(uint) max_length,
info->myflags)) < Count ||
length == (uint) -1)
{
if (length != (uint) -1)
memcpy(Buffer,info->buffer,(size_t) length);
info->error= length == (uint) -1 ? -1 : (int) (length+left_length);
return 1;
}
info->rc_pos=info->buffer+Count;
info->rc_end=info->buffer+length;
info->pos_in_file=pos_in_file;
memcpy(Buffer,info->buffer,(size_t) Count);
return 0;
}
#ifdef HAVE_AIOWAIT
......@@ -490,6 +646,11 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
Buffer+=rest_length;
Count-=rest_length;
info->rc_pos+=rest_length;
if (info->pos_in_file+info->buffer_length > info->end_of_file)
{
my_errno=errno=EFBIG;
return info->error = -1;
}
if (flush_io_cache(info))
return 1;
if (Count >= IO_SIZE)
......@@ -596,7 +757,7 @@ int flush_io_cache(IO_CACHE *info)
}
}
#ifdef HAVE_AIOWAIT
else
else if (info->type != READ_NET)
{
my_aiowait(&info->aio_result); /* Wait for outstanding req */
info->inited=0;
......@@ -613,7 +774,7 @@ int end_io_cache(IO_CACHE *info)
DBUG_ENTER("end_io_cache");
if((pre_close=info->pre_close))
(*pre_close)(info);
if (info->buffer)
if (info->alloced_buffer)
{
if (info->file != -1) /* File doesn't exist */
error=flush_io_cache(info);
......
......@@ -41,295 +41,6 @@ static void my_aiowait(my_aio_result *result);
extern "C" {
/*
** if cachesize == 0 then use default cachesize (from s-file)
** if file == -1 then real_open_cached_file() will be called.
** returns 0 if ok
*/
int init_io_cache(IO_CACHE *info, File file, uint cachesize,
enum cache_type type, my_off_t seek_offset,
pbool use_async_io, myf cache_myflags)
{
uint min_cache;
DBUG_ENTER("init_io_cache");
DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset));
/* There is no file in net_reading */
info->file= file;
info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
if (!cachesize)
if (! (cachesize= my_default_record_cache_size))
DBUG_RETURN(1); /* No cache requested */
min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
if (type == READ_CACHE)
{ /* Assume file isn't growing */
if (cache_myflags & MY_DONT_CHECK_FILESIZE)
{
cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
}
else
{
my_off_t file_pos,end_of_file;
if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR))
DBUG_RETURN(1);
end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
if (end_of_file < seek_offset)
end_of_file=seek_offset;
VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0)));
if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
{
cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1;
use_async_io=0; /* No need to use async */
}
}
}
if ((int) type < (int) READ_NET)
{
for (;;)
{
cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
(ulong) ~(min_cache-1));
if (cachesize < min_cache)
cachesize = min_cache;
if ((info->buffer=
(byte*) my_malloc(cachesize,
MYF((cache_myflags & ~ MY_WME) |
(cachesize == min_cache ? MY_WME : 0)))) != 0)
break; /* Enough memory found */
if (cachesize == min_cache)
DBUG_RETURN(2); /* Can't alloc cache */
cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */
}
}
else
info->buffer=0;
DBUG_PRINT("info",("init_io_cache: cachesize = %u",cachesize));
info->pos_in_file= seek_offset;
info->read_length=info->buffer_length=cachesize;
info->seek_not_done= test(file >= 0 && type != READ_FIFO &&
type != READ_NET);
info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
info->rc_request_pos=info->rc_pos=info->buffer;
if (type == READ_CACHE || type == READ_NET || type == READ_FIFO)
{
info->rc_end=info->buffer; /* Nothing in cache */
}
else /* type == WRITE_CACHE */
{
info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
}
/* end_of_file may be changed by user later */
info->end_of_file= ((type == READ_NET || type == READ_FIFO ) ? 0
: ~(my_off_t) 0);
info->type=type;
info->error=0;
info->read_function=(type == READ_NET) ? _my_b_net_read : _my_b_read; /* net | file */
#ifdef HAVE_AIOWAIT
if (use_async_io && ! my_disable_async_io)
{
DBUG_PRINT("info",("Using async io"));
info->read_length/=2;
info->read_function=_my_b_async_read;
}
info->inited=info->aio_result.pending=0;
#endif
DBUG_RETURN(0);
} /* init_io_cache */
/* Wait until current request is ready */
#ifdef HAVE_AIOWAIT
static void my_aiowait(my_aio_result *result)
{
if (result->pending)
{
struct aio_result_t *tmp;
for (;;)
{
if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
{
if (errno == EINTR)
continue;
DBUG_PRINT("error",("No aio request, error: %d",errno));
result->pending=0; /* Assume everythings is ok */
break;
}
((my_aio_result*) tmp)->pending=0;
if ((my_aio_result*) tmp == result)
break;
}
}
return;
}
#endif
/* Use this to reset cache to start or other type */
/* Some simple optimizing is done when reinit in current buffer */
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
my_off_t seek_offset,
pbool use_async_io __attribute__((unused)),
pbool clear_cache)
{
DBUG_ENTER("reinit_io_cache");
info->seek_not_done= test(info->file >= 0); /* Seek not done */
/* If the whole file is in memory, avoid flushing to disk */
if (! clear_cache &&
seek_offset >= info->pos_in_file &&
seek_offset <= info->pos_in_file +
(uint) (info->rc_end - info->rc_request_pos))
{ /* use current buffer */
if (info->type == WRITE_CACHE && type == READ_CACHE)
{
info->rc_end=info->rc_pos;
info->end_of_file=my_b_tell(info);
}
else if (type == WRITE_CACHE)
{
if (info->type == READ_CACHE)
info->rc_end=info->buffer+info->buffer_length;
info->end_of_file = ~(my_off_t) 0;
}
info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
#ifdef HAVE_AIOWAIT
my_aiowait(&info->aio_result); /* Wait for outstanding req */
#endif
}
else
{
/*
If we change from WRITE_CACHE to READ_CACHE, assume that everything
after the current positions should be ignored
*/
if (info->type == WRITE_CACHE && type == READ_CACHE)
info->end_of_file=my_b_tell(info);
/* No need to flush cache if we want to reuse it */
if ((type != WRITE_CACHE || !clear_cache) && flush_io_cache(info))
DBUG_RETURN(1);
if (info->pos_in_file != seek_offset)
{
info->pos_in_file=seek_offset;
info->seek_not_done=1;
}
info->rc_request_pos=info->rc_pos=info->buffer;
if (type == READ_CACHE || type == READ_NET || type == READ_FIFO)
{
info->rc_end=info->buffer; /* Nothing in cache */
}
else
{
info->rc_end=info->buffer+info->buffer_length-
(seek_offset & (IO_SIZE-1));
info->end_of_file= ((type == READ_NET || type == READ_FIFO) ? 0 :
~(my_off_t) 0);
}
}
info->type=type;
info->error=0;
info->read_function=(type == READ_NET) ? _my_b_net_read : _my_b_read;
#ifdef HAVE_AIOWAIT
if (type != READ_NET)
{
if (use_async_io && ! my_disable_async_io &&
((ulong) info->buffer_length <
(ulong) (info->end_of_file - seek_offset)))
{
info->read_length=info->buffer_length/2;
info->read_function=_my_b_async_read;
}
}
info->inited=0;
#endif
DBUG_RETURN(0);
} /* init_io_cache */
/*
Read buffered. Returns 1 if can't read requested characters
This function is only called from the my_b_read() macro
when there isn't enough characters in the buffer to
satisfy the request.
Returns 0 we succeeded in reading all data
*/
int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
uint length,diff_length,left_length;
my_off_t max_length, pos_in_file;
if ((left_length=(uint) (info->rc_end-info->rc_pos)))
{
dbug_assert(Count >= left_length); /* User is not using my_b_read() */
memcpy(Buffer,info->rc_pos, (size_t) (left_length));
Buffer+=left_length;
Count-=left_length;
}
/* pos_in_file always point on where info->buffer was read */
pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
if (info->seek_not_done)
{ /* File touched, do seek */
VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
info->seek_not_done=0;
}
diff_length=(uint) (pos_in_file & (IO_SIZE-1));
if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length)))
{ /* Fill first intern buffer */
uint read_length;
if (info->end_of_file == pos_in_file)
{ /* End of file */
info->error=(int) left_length;
return 1;
}
length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
!= (uint) length)
{
info->error= read_length == (uint) -1 ? -1 :
(int) (read_length+left_length);
return 1;
}
Count-=length;
Buffer+=length;
pos_in_file+=length;
left_length+=length;
diff_length=0;
}
max_length=info->read_length-diff_length;
if (info->type != READ_FIFO &&
(info->end_of_file - pos_in_file) < max_length)
max_length = info->end_of_file - pos_in_file;
if (!max_length)
{
if (Count)
{
info->error= left_length; /* We only got this many char */
return 1;
}
length=0; /* Didn't read any chars */
}
else if ((length=my_read(info->file,info->buffer,(uint) max_length,
info->myflags)) < Count ||
length == (uint) -1)
{
if (length != (uint) -1)
memcpy(Buffer,info->buffer,(size_t) length);
info->error= length == (uint) -1 ? -1 : (int) (length+left_length);
return 1;
}
info->rc_pos=info->buffer+Count;
info->rc_end=info->buffer+length;
info->pos_in_file=pos_in_file;
memcpy(Buffer,info->buffer,(size_t) Count);
return 0;
}
/*
** Read buffered from the net.
** Returns 1 if can't read requested characters
......@@ -359,341 +70,8 @@ int _my_b_net_read(register IO_CACHE *info, byte *Buffer,
info->rc_end = (info->rc_pos = (byte*) net->read_pos) + read_length;
Buffer[0] = info->rc_pos[0]; /* length is always 1 */
info->rc_pos++;
info->buffer = info->rc_pos;
return 0;
}
#ifdef HAVE_AIOWAIT
int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
{
uint length,read_length,diff_length,left_length,use_length,org_Count;
my_off_t max_length;
my_off_t next_pos_in_file;
byte *read_buffer;
memcpy(Buffer,info->rc_pos,
(size_t) (left_length=(uint) (info->rc_end-info->rc_pos)));
Buffer+=left_length;
org_Count=Count;
Count-=left_length;
if (info->inited)
{ /* wait for read block */
info->inited=0; /* No more block to read */
my_aiowait(&info->aio_result); /* Wait for outstanding req */
if (info->aio_result.result.aio_errno)
{
if (info->myflags & MY_WME)
my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
my_filename(info->file),
info->aio_result.result.aio_errno);
my_errno=info->aio_result.result.aio_errno;
info->error= -1;
return(1);
}
if (! (read_length = (uint) info->aio_result.result.aio_return) ||
read_length == (uint) -1)
{
my_errno=0; /* For testing */
info->error= (read_length == (uint) -1 ? -1 :
(int) (read_length+left_length));
return(1);
}
info->pos_in_file+=(uint) (info->rc_end - info->rc_request_pos);
if (info->rc_request_pos != info->buffer)
info->rc_request_pos=info->buffer;
else
info->rc_request_pos=info->buffer+info->read_length;
info->rc_pos=info->rc_request_pos;
next_pos_in_file=info->aio_read_pos+read_length;
/* Check if pos_in_file is changed
(_ni_read_cache may have skipped some bytes) */
if (info->aio_read_pos < info->pos_in_file)
{ /* Fix if skipped bytes */
if (info->aio_read_pos + read_length < info->pos_in_file)
{
read_length=0; /* Skipp block */
next_pos_in_file=info->pos_in_file;
}
else
{
my_off_t offset= (info->pos_in_file - info->aio_read_pos);
info->pos_in_file=info->aio_read_pos; /* Whe are here */
info->rc_pos=info->rc_request_pos+offset;
read_length-=offset; /* Bytes left from rc_pos */
}
}
#ifndef DBUG_OFF
if (info->aio_read_pos > info->pos_in_file)
{
my_errno=EINVAL;
return(info->read_length= -1);
}
#endif
/* Copy found bytes to buffer */
length=min(Count,read_length);
memcpy(Buffer,info->rc_pos,(size_t) length);
Buffer+=length;
Count-=length;
left_length+=length;
info->rc_end=info->rc_pos+read_length;
info->rc_pos+=length;
}
else
next_pos_in_file=(info->pos_in_file+ (uint)
(info->rc_end - info->rc_request_pos));
/* If reading large blocks, or first read or read with skipp */
if (Count)
{
if (next_pos_in_file == info->end_of_file)
{
info->error=(int) (read_length+left_length);
return 1;
}
VOID(my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0)));
read_length=IO_SIZE*2- (uint) (next_pos_in_file & (IO_SIZE-1));
if (Count < read_length)
{ /* Small block, read to cache */
if ((read_length=my_read(info->file,info->rc_request_pos,
read_length, info->myflags)) == (uint) -1)
return info->error= -1;
use_length=min(Count,read_length);
memcpy(Buffer,info->rc_request_pos,(size_t) use_length);
info->rc_pos=info->rc_request_pos+Count;
info->rc_end=info->rc_request_pos+read_length;
info->pos_in_file=next_pos_in_file; /* Start of block in cache */
next_pos_in_file+=read_length;
if (Count != use_length)
{ /* Didn't find hole block */
if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
my_filename(info->file),my_errno);
info->error=(int) (read_length+left_length);
return 1;
}
}
else
{ /* Big block, don't cache it */
if ((read_length=my_read(info->file,Buffer,(uint) Count,info->myflags))
!= Count)
{
info->error= read_length == (uint) -1 ? -1 : read_length+left_length;
return 1;
}
info->rc_pos=info->rc_end=info->rc_request_pos;
info->pos_in_file=(next_pos_in_file+=Count);
}
}
/* Read next block with asyncronic io */
max_length=info->end_of_file - next_pos_in_file;
diff_length=(next_pos_in_file & (IO_SIZE-1));
if (max_length > (my_off_t) info->read_length - diff_length)
max_length= (my_off_t) info->read_length - diff_length;
if (info->rc_request_pos != info->buffer)
read_buffer=info->buffer;
else
read_buffer=info->buffer+info->read_length;
info->aio_read_pos=next_pos_in_file;
if (max_length)
{
info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */
DBUG_PRINT("aioread",("filepos: %ld length: %ld",
(ulong) next_pos_in_file,(ulong) max_length));
if (aioread(info->file,read_buffer,(int) max_length,
(my_off_t) next_pos_in_file,MY_SEEK_SET,
&info->aio_result.result))
{ /* Skipp async io */
my_errno=errno;
DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
errno, info->aio_result.result.aio_errno));
if (info->rc_request_pos != info->buffer)
{
bmove(info->buffer,info->rc_request_pos,
(uint) (info->rc_end - info->rc_pos));
info->rc_request_pos=info->buffer;
info->rc_pos-=info->read_length;
info->rc_end-=info->read_length;
}
info->read_length=info->buffer_length; /* Use hole buffer */
info->read_function=_my_b_read; /* Use normal IO_READ next */
}
else
info->inited=info->aio_result.pending=1;
}
return 0; /* Block read, async in use */
} /* _my_b_async_read */
#endif
/* Read one byte when buffer is empty */
int _my_b_get(IO_CACHE *info)
{
byte buff;
IO_CACHE_CALLBACK pre_read,post_read;
if ((pre_read = info->pre_read))
(*pre_read)(info);
if ((*(info)->read_function)(info,&buff,1))
return my_b_EOF;
if ((post_read = info->post_read))
(*post_read)(info);
return (int) (uchar) buff;
}
/* Returns != 0 if error on write */
int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
{
uint rest_length,length;
rest_length=(uint) (info->rc_end - info->rc_pos);
memcpy(info->rc_pos,Buffer,(size_t) rest_length);
Buffer+=rest_length;
Count-=rest_length;
info->rc_pos+=rest_length;
if (info->pos_in_file+info->buffer_length > info->end_of_file)
{
my_errno=errno=EFBIG;
return info->error = -1;
}
if (flush_io_cache(info))
return 1;
if (Count >= IO_SIZE)
{ /* Fill first intern buffer */
length=Count & (uint) ~(IO_SIZE-1);
if (info->seek_not_done)
{ /* File touched, do seek */
VOID(my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)));
info->seek_not_done=0;
}
if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
return info->error= -1;
Count-=length;
Buffer+=length;
info->pos_in_file+=length;
}
memcpy(info->rc_pos,Buffer,(size_t) Count);
info->rc_pos+=Count;
return 0;
}
/*
Write a block to disk where part of the data may be inside the record
buffer. As all write calls to the data goes through the cache,
we will never get a seek over the end of the buffer
*/
int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
my_off_t pos)
{
uint length;
int error=0;
if (pos < info->pos_in_file)
{
/* Of no overlap, write everything without buffering */
if (pos + Count <= info->pos_in_file)
return my_pwrite(info->file, Buffer, Count, pos,
info->myflags | MY_NABP);
/* Write the part of the block that is before buffer */
length= (uint) (info->pos_in_file - pos);
if (my_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
info->error=error=-1;
Buffer+=length;
pos+= length;
Count-= length;
}
/* Check if we want to write inside the used part of the buffer.*/
length= (uint) (info->rc_end - info->buffer);
if (pos < info->pos_in_file + length)
{
uint offset= (uint) (pos - info->pos_in_file);
length-=offset;
if (length > Count)
length=Count;
memcpy(info->buffer+offset, Buffer, length);
Buffer+=length;
Count-= length;
/* Fix length of buffer if the new data was larger */
if (info->buffer+length > info->rc_pos)
info->rc_pos=info->buffer+length;
if (!Count)
return (error);
}
/* Write at the end of the current buffer; This is the normal case */
if (_my_b_write(info, Buffer, Count))
error= -1;
return error;
}
/* Flush write cache */
int flush_io_cache(IO_CACHE *info)
{
uint length;
DBUG_ENTER("flush_io_cache");
if (info->type == WRITE_CACHE)
{
if (info->file == -1)
{
if (real_open_cached_file(info))
DBUG_RETURN((info->error= -1));
}
if (info->rc_pos != info->buffer)
{
length=(uint) (info->rc_pos - info->buffer);
if (info->seek_not_done)
{ /* File touched, do seek */
if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)) ==
MY_FILEPOS_ERROR)
DBUG_RETURN((info->error= -1));
info->seek_not_done=0;
}
info->rc_pos=info->buffer;
info->pos_in_file+=length;
info->rc_end=(info->buffer+info->buffer_length-
(info->pos_in_file & (IO_SIZE-1)));
if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP))
DBUG_RETURN((info->error= -1));
DBUG_RETURN(0);
}
}
#ifdef HAVE_AIOWAIT
else if (info->type != READ_NET)
{
my_aiowait(&info->aio_result); /* Wait for outstanding req */
info->inited=0;
}
#endif
DBUG_RETURN(0);
}
int end_io_cache(IO_CACHE *info)
{
int error=0;
IO_CACHE_CALLBACK pre_close;
DBUG_ENTER("end_io_cache");
if((pre_close=info->pre_close))
(*pre_close)(info);
if (info->buffer)
{
if (info->file != -1) /* File doesn't exist */
error=flush_io_cache(info);
my_free((gptr) info->buffer,MYF(MY_WME));
info->buffer=info->rc_pos=(byte*) 0;
}
DBUG_RETURN(error);
} /* end_io_cache */
} /* extern "C" */
......@@ -18,6 +18,10 @@
#include "mysql_priv.h"
#include "repl_failsafe.h"
#include "sql_repl.h"
#include "slave.h"
#include "mini_client.h"
#include <mysql.h>
RPL_STATUS rpl_status=RPL_NULL;
pthread_mutex_t LOCK_rpl_status;
......@@ -33,11 +37,184 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE",
TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"",
rpl_status_type};
static int init_failsafe_rpl_thread(THD* thd)
{
DBUG_ENTER("init_failsafe_rpl_thread");
thd->system_thread = thd->bootstrap = 1;
thd->client_capabilities = 0;
my_net_init(&thd->net, 0);
thd->net.timeout = slave_net_timeout;
thd->max_packet_length=thd->net.max_packet;
thd->master_access= ~0;
thd->priv_user = 0;
thd->system_thread = 1;
pthread_mutex_lock(&LOCK_thread_count);
thd->thread_id = thread_id++;
pthread_mutex_unlock(&LOCK_thread_count);
if (init_thr_lock() ||
my_pthread_setspecific_ptr(THR_THD, thd) ||
my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
my_pthread_setspecific_ptr(THR_NET, &thd->net))
{
close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
end_thread(thd,0);
DBUG_RETURN(-1);
}
thd->mysys_var=my_thread_var;
thd->dbug_thread_id=my_thread_id();
#if !defined(__WIN__) && !defined(OS2)
sigset_t set;
VOID(sigemptyset(&set)); // Get mask in use
VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif
thd->mem_root.free=thd->mem_root.used=0;
if (thd->max_join_size == (ulong) ~0L)
thd->options |= OPTION_BIG_SELECTS;
thd->proc_info="Thread initialized";
thd->version=refresh_version;
thd->set_time();
DBUG_RETURN(0);
}
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
{
pthread_mutex_lock(&LOCK_rpl_status);
if (rpl_status == from_status || rpl_status == RPL_ANY)
rpl_status = to_status;
pthread_cond_signal(&COND_rpl_status);
pthread_mutex_unlock(&LOCK_rpl_status);
}
int update_slave_list(MYSQL* mysql)
{
MYSQL_RES* res=0;
MYSQL_ROW row;
const char* error=0;
bool have_auth_info;
int port_ind;
if (mc_mysql_query(mysql,"SHOW SLAVE HOSTS",0) ||
!(res = mc_mysql_store_result(mysql)))
{
error = "Query error";
goto err;
}
switch (mc_mysql_num_fields(res))
{
case 5:
have_auth_info = 0;
port_ind=2;
break;
case 7:
have_auth_info = 1;
port_ind=4;
break;
default:
error = "Invalid number of fields in SHOW SLAVE HOSTS";
goto err;
}
pthread_mutex_lock(&LOCK_slave_list);
while ((row = mc_mysql_fetch_row(res)))
{
uint32 server_id;
SLAVE_INFO* si, *old_si;
server_id = atoi(row[0]);
if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
(byte*)&server_id,4)))
si = old_si;
else
{
if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
{
error = "Out of memory";
pthread_mutex_unlock(&LOCK_slave_list);
goto err;
}
si->server_id = server_id;
}
strnmov(si->host, row[1], sizeof(si->host));
si->port = atoi(row[port_ind]);
si->rpl_recovery_rank = atoi(row[port_ind+1]);
si->master_id = atoi(row[port_ind+2]);
if (have_auth_info)
{
strnmov(si->user, row[2], sizeof(si->user));
strnmov(si->password, row[3], sizeof(si->password));
}
}
pthread_mutex_unlock(&LOCK_slave_list);
err:
if (res)
mc_mysql_free_result(res);
if (error)
{
sql_print_error("Error updating slave list:",error);
return 1;
}
return 0;
}
int find_recovery_captain(THD* thd, MYSQL* mysql)
{
return 0;
}
pthread_handler_decl(handle_failsafe_rpl,arg)
{
DBUG_ENTER("handle_failsafe_rpl");
THD *thd = new THD;
thd->thread_stack = (char*)&thd;
MYSQL* recovery_captain = 0;
pthread_detach_this_thread();
if (init_failsafe_rpl_thread(thd) || !(recovery_captain=mc_mysql_init(0)))
{
sql_print_error("Could not initialize failsafe replication thread");
goto err;
}
pthread_mutex_lock(&LOCK_rpl_status);
while (!thd->killed && !abort_loop)
{
bool break_req_chain = 0;
const char* msg = thd->enter_cond(&COND_rpl_status,
&LOCK_rpl_status, "Waiting for request");
pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status);
thd->proc_info="Processling request";
while (!break_req_chain)
{
switch (rpl_status)
{
case RPL_LOST_SOLDIER:
if (find_recovery_captain(thd, recovery_captain))
rpl_status=RPL_TROOP_SOLDIER;
else
rpl_status=RPL_RECOVERY_CAPTAIN;
break_req_chain=1; /* for now until other states are implemented */
break;
default:
break_req_chain=1;
break;
}
}
thd->exit_cond(msg);
}
pthread_mutex_unlock(&LOCK_rpl_status);
err:
if (recovery_captain)
mc_mysql_close(recovery_captain);
delete thd;
my_thread_end();
pthread_exit(0);
DBUG_RETURN(0);
}
#ifndef REPL_FAILSAFE_H
#define REPL_FAILSAFE_H
#include "mysql.h"
typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE,
RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER,
RPL_RECOVERY_CAPTAIN,RPL_NULL /* inactive */,
......@@ -10,7 +12,11 @@ extern RPL_STATUS rpl_status;
extern pthread_mutex_t LOCK_rpl_status;
extern pthread_cond_t COND_rpl_status;
extern TYPELIB rpl_role_typelib, rpl_status_typelib;
extern uint rpl_recovery_rank;
extern const char* rpl_role_type[], *rpl_status_type[];
pthread_handler_decl(handle_failsafe_rpl,arg);
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status);
int find_recovery_captain(THD* thd, MYSQL* mysql);
int update_slave_list(MYSQL* mysql);
#endif
......@@ -55,6 +55,8 @@ inline bool slave_killed(THD* thd);
static int init_slave_thread(THD* thd);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect);
static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
......@@ -615,6 +617,10 @@ int register_slave_on_master(MYSQL* mysql)
int2store(buf, (uint16)report_port);
packet.append(buf, 2);
int4store(buf, rpl_recovery_rank);
packet.append(buf, 4);
int4store(buf, 0); /* tell the master will fill in master_id */
packet.append(buf, 4);
if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
packet.length(), 0))
......@@ -868,7 +874,7 @@ command");
}
static uint read_event(MYSQL* mysql, MASTER_INFO *mi)
static ulong read_event(MYSQL* mysql, MASTER_INFO *mi)
{
ulong len = packet_error;
// for convinience lets think we start by
......@@ -1017,7 +1023,6 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ !
thd->set_time();
DBUG_ENTER("handle_slave");
pthread_detach_this_thread();
......@@ -1067,6 +1072,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
// on with life
thd->proc_info = "Registering slave on master";
register_slave_on_master(mysql);
update_slave_list(mysql);
while (!slave_killed(thd))
{
......@@ -1117,7 +1123,7 @@ try again, log '%s' at postion %s", RPL_LOG_NAME,
while(!slave_killed(thd))
{
thd->proc_info = "Reading master update";
uint event_len = read_event(mysql, &glob_mi);
ulong event_len = read_event(mysql, &glob_mi);
if(slave_killed(thd))
{
sql_print_error("Slave thread killed while reading event");
......@@ -1244,30 +1250,7 @@ position %s",
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
int slave_was_killed;
#ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count;
#endif
while(!(slave_was_killed = slave_killed(thd)) &&
!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0))
{
sql_print_error("Slave thread: error connecting to master: %s (%d),\
retry in %d sec", mc_mysql_error(mysql), errno, mi->connect_retry);
safe_sleep(thd, mi->connect_retry);
}
if(!slave_was_killed)
{
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
mi->user, mi->host, mi->port);
#ifdef SIGNAL_WITH_VIO_CLOSE
thd->set_active_vio(mysql->net.vio);
#endif
}
return slave_was_killed;
return connect_to_master(thd, mysql, mi, 0);
}
/*
......@@ -1275,7 +1258,8 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
master_retry_count times
*/
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect)
{
int slave_was_killed;
int last_errno= -2; // impossible error
......@@ -1290,12 +1274,15 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
#ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count;
#endif
while (!(slave_was_killed = slave_killed(thd)) && mc_mysql_reconnect(mysql))
while (!(slave_was_killed = slave_killed(thd)) &&
(reconnect ? mc_mysql_reconnect(mysql) :
!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0)))
{
/* Don't repeat last error */
if (mc_mysql_errno(mysql) != last_errno)
{
sql_print_error("Slave thread: error re-connecting to master: \
sql_print_error("Slave thread: error connecting to master: \
%s, last_errno=%d, retry in %d sec",
mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql),
mi->connect_retry);
......@@ -1309,6 +1296,7 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
if (master_retry_count && err_count++ == master_retry_count)
{
slave_was_killed=1;
if (reconnect)
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
break;
}
......@@ -1316,11 +1304,18 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
if (!slave_was_killed)
{
sql_print_error("Slave: reconnected to master '%s@%s:%d',\
if (reconnect)
sql_print_error("Slave: connected to master '%s@%s:%d',\
replication resumed in log '%s' at position %s", glob_mi.user,
glob_mi.host, glob_mi.port,
RPL_LOG_NAME,
llstr(glob_mi.pos,llbuff));
else
{
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
mi->user, mi->host, mi->port);
}
#ifdef SIGNAL_WITH_VIO_CLOSE
thd->set_active_vio(mysql->net.vio);
#endif
......@@ -1329,6 +1324,17 @@ replication resumed in log '%s' at position %s", glob_mi.user,
return slave_was_killed;
}
/*
Try to connect until successful or slave killed or we have retried
master_retry_count times
*/
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
return connect_to_master(thd, mysql, mi, 1);
}
#ifdef __GNUC__
template class I_List_iterator<i_string>;
template class I_List_iterator<i_string_pair>;
......
......@@ -277,10 +277,13 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (using_transactions)
ha_autocommit_or_rollback(thd,error);
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
{
if (lf_info.wrote_create_file)
{
Delete_file_log_event d(thd);
mysql_bin_log.write(&d);
}
}
DBUG_RETURN(-1); // Error on read
}
sprintf(name,ER(ER_LOAD_INFO),info.records,info.deleted,
......@@ -303,10 +306,13 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (!opt_old_rpl_compat)
{
read_info.end_io_cache(); // make sure last block gets logged
if (lf_info.wrote_create_file)
{
Execute_load_log_event e(thd);
mysql_bin_log.write(&e);
}
}
}
if (using_transactions)
error=ha_autocommit_or_rollback(thd,error);
DBUG_RETURN(error);
......@@ -534,6 +540,14 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term,
}
else
{
/* init_io_cache() will not initialize read_function member
if the cache is READ_NET. The reason is explained in
mysys/mf_iocache.c. So we work around the problem with a
manual assignment
*/
if (get_it_from_net)
cache.read_function = _my_b_net_read;
need_end_io_cache = 1;
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
cache.pre_read = cache.pre_close =
......
......@@ -140,6 +140,11 @@ int register_slave(THD* thd, uchar* packet, uint packet_length)
get_object(p,si->user);
get_object(p,si->password);
si->port = uint2korr(p);
p += 2;
si->rpl_recovery_rank = uint4korr(p);
p += 4;
if (!(si->master_id = uint4korr(p)))
si->master_id = server_id;
si->thd = thd;
pthread_mutex_lock(&LOCK_slave_list);
......@@ -534,6 +539,7 @@ impossible position";
DBUG_PRINT("wait",("waiting for data on binary log"));
if (!thd->killed)
pthread_cond_wait(&COND_binlog_update, log_lock);
DBUG_PRINT("wait",("binary log received update"));
break;
default:
......@@ -1253,6 +1259,8 @@ int show_slave_hosts(THD* thd)
field_list.push_back(new Item_empty_string("Password",20));
}
field_list.push_back(new Item_empty_string("Port",20));
field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20));
field_list.push_back(new Item_empty_string("Master_id", 20));
if (send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
......@@ -1271,6 +1279,8 @@ int show_slave_hosts(THD* thd)
net_store_data(packet, si->password);
}
net_store_data(packet, (uint32) si->port);
net_store_data(packet, si->rpl_recovery_rank);
net_store_data(packet, si->master_id);
if (my_net_write(net, (char*)packet->ptr(), packet->length()))
{
pthread_mutex_unlock(&LOCK_slave_list);
......@@ -1616,7 +1626,8 @@ int log_loaded_block(IO_CACHE* file)
{
LOAD_FILE_INFO* lf_info;
uint block_len ;
if (!(block_len = file->rc_end - file->buffer))
char* buffer = (char*)file->buffer;
if (!(block_len = file->rc_end - buffer))
return 0;
lf_info = (LOAD_FILE_INFO*)file->arg;
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
......@@ -1625,14 +1636,14 @@ int log_loaded_block(IO_CACHE* file)
lf_info->last_pos_in_file = file->pos_in_file;
if (lf_info->wrote_create_file)
{
Append_block_log_event a(lf_info->thd, (char*) file->buffer, block_len);
Append_block_log_event a(lf_info->thd, buffer, block_len);
mysql_bin_log.write(&a);
}
else
{
Create_file_log_event c(lf_info->thd,lf_info->ex,lf_info->db,
lf_info->table_name, *lf_info->fields,
lf_info->handle_dup, (char*) file->buffer,
lf_info->handle_dup, buffer,
block_len);
mysql_bin_log.write(&c);
lf_info->wrote_create_file = 1;
......
......@@ -6,6 +6,7 @@
typedef struct st_slave_info
{
uint32 server_id;
uint32 rpl_recovery_rank, master_id;
char host[HOSTNAME_LENGTH+1];
char user[USERNAME_LENGTH+1];
char password[HASH_PASSWORD_LENGTH+1];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment