slave master.info FILE -> IO_CACHE

send_file() is more stack-friendly - changed large static buffer to alloc_root()
fixed my_b_seek() to work with WRITE_CACHE
test case to make sure the slave starts correctly after being stopped
truncated words file so the replication tests will take less time
parent 93ac0a25
...@@ -276,3 +276,7 @@ mysql-test/var/slave-data/test/choo.MYI ...@@ -276,3 +276,7 @@ mysql-test/var/slave-data/test/choo.MYI
mysql-test/var/tmp/README mysql-test/var/tmp/README
BitKeeper/tmp/bkOF1wtJ BitKeeper/tmp/bkOF1wtJ
scripts/mysqldumpslow scripts/mysqldumpslow
mysql-test/rpl000011.test
mysql-test/var/lib/mysql-bin.007
sql/share/norwegian/errmsg.sys
sql/share/norwegian-ny/errmsg.sys
#! /bin/sh
# This script is a hack for lazy developers who want to get a quick
# start on the result file. The code here is rather dirty, but it works
# If you have a spare moment feel free to improve it - the right way is
# to start mysqld yourself and run mysqltest -r
RESULT_DIR=r/3.23
if [ -z $EDITOR] then;
EDITOR=vi
fi
function die()
{
echo $1
exit 1
}
function usage()
{
echo "Usage: $0 test_name"
exit 1
}
test_name=$1
[ -z $test_name ] && usage
result_file=$result_dir/$test_name.result
[ -f $result_file ] && die "result file $result_file has already been created"
touch $result_file
echo "Running the test case against empty file, will fail, but don't worry"
./mysql-test-run $test_name
reject_file=$result_file.reject
if [ -f $reject_file ] ; then
echo "Below are the contents of the reject file:"
echo "-----start---------------------"
cat $result_file.
echo "-----end-----------------------"
echo "Is this the output you expected from your test case?(y/n)[n]"
read yes_no
if [ x$yes_no = xy ] then;
echo "Press any key to edit it in $EDITOR, or Ctrl-C to abort"
read junk
$EDITOR $reject_file
edited="edited"
fi
echo "Save $edited file as master result? (y/n)[y]"
read yes_no
if [ x$yes_no != xn ]; then
mv $reject_file $result_file
fi
else
echo "Your test failed so bad, it did not even produce a reject file"
echo "You need to fix your bugs in the test case, the code, or both"
exit 1
fi
sum(length(word)) sum(length(word))
363634 71
This source diff could not be displayed because it is too large. You can view the blob instead.
source t/include/master-slave.inc;
connection master;
use test;
drop table if exists foo;
create table foo (n int);
insert into foo values(1);
connection slave;
#give slave some breathing room to get started
sleep 1;
slave stop;
slave start;
connection master;
insert into foo values(2);
connection slave;
#let slave catch up
sleep 1;
@r/3.23/rpl000011.result select * from foo;
...@@ -27,14 +27,27 @@ ...@@ -27,14 +27,27 @@
/* /*
** Fix that next read will be made at certain position ** Fix that next read will be made at certain position
** This only works with READ_CACHE ** For write cache, make next write happen at a certain position
*/ */
void my_b_seek(IO_CACHE *info,my_off_t pos) void my_b_seek(IO_CACHE *info,my_off_t pos)
{ {
info->seek_not_done=1; if(info->type == READ_CACHE)
info->pos_in_file=pos; {
info->rc_pos=info->rc_end=info->buffer; info->rc_pos=info->rc_end=info->buffer;
}
else if(info->type == WRITE_CACHE)
{
byte* try_rc_pos;
try_rc_pos = info->rc_pos + (pos - info->pos_in_file);
if(try_rc_pos >= info->buffer && try_rc_pos <= info->rc_end)
info->rc_pos = try_rc_pos;
else
flush_io_cache(info);
}
info->pos_in_file=pos;
info->seek_not_done=1;
} }
/* /*
...@@ -139,8 +152,6 @@ uint my_b_printf(IO_CACHE *info, const char* fmt, ...) ...@@ -139,8 +152,6 @@ uint my_b_printf(IO_CACHE *info, const char* fmt, ...)
uint my_b_vprintf(IO_CACHE *info, const char* fmt, va_list args) uint my_b_vprintf(IO_CACHE *info, const char* fmt, va_list args)
{ {
reg1 char *to= info->rc_pos;
char *end=info->rc_end;
uint out_length=0; uint out_length=0;
for (; *fmt ; fmt++) for (; *fmt ; fmt++)
...@@ -188,7 +199,8 @@ uint my_b_vprintf(IO_CACHE *info, const char* fmt, va_list args) ...@@ -188,7 +199,8 @@ uint my_b_vprintf(IO_CACHE *info, const char* fmt, va_list args)
if (my_b_write(info, buff, length)) if (my_b_write(info, buff, length))
goto err; goto err;
} }
else if (*fmt == 'l' && fmt[1] == 'd' || fmt[1] == 'u')/* long parameter */ else if ((*fmt == 'l' && fmt[1] == 'd') || fmt[1] == 'u')
/* long parameter */
{ {
register long iarg; register long iarg;
uint length; uint length;
......
...@@ -22,6 +22,9 @@ ...@@ -22,6 +22,9 @@
#include <thr_alarm.h> #include <thr_alarm.h>
#include <my_dir.h> #include <my_dir.h>
#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\
"FIRST")
bool slave_running = 0; bool slave_running = 0;
pthread_t slave_real_id; pthread_t slave_real_id;
MASTER_INFO glob_mi; MASTER_INFO glob_mi;
...@@ -227,16 +230,17 @@ int db_ok(const char* db, I_List<i_string> &do_list, ...@@ -227,16 +230,17 @@ int db_ok(const char* db, I_List<i_string> &do_list,
} }
} }
static void init_strvar_from_file(char* var, int max_size, FILE* f, static void init_strvar_from_file(char* var, int max_size, IO_CACHE* f,
char* default_val) char* default_val)
{ {
if(fgets(var, max_size, f)) if(my_b_gets(f,var, max_size))
{ {
char* last_p = strend(var) - 1; char* last_p = strend(var) - 1;
int c;
if(*last_p == '\n') *last_p = 0; // if we stopped on newline, kill it if(*last_p == '\n') *last_p = 0; // if we stopped on newline, kill it
else else
while( (fgetc(f) != '\n' && !feof(f))); while( ((c=my_b_get(f)) != '\n' && c != my_b_EOF));
// if we truncated a line or stopped on last char, remove all chars // if we truncated a line or stopped on last char, remove all chars
// up to and including newline // up to and including newline
} }
...@@ -244,12 +248,12 @@ static void init_strvar_from_file(char* var, int max_size, FILE* f, ...@@ -244,12 +248,12 @@ static void init_strvar_from_file(char* var, int max_size, FILE* f,
strmake(var, default_val, max_size); strmake(var, default_val, max_size);
} }
static void init_intvar_from_file(int* var, FILE* f, static void init_intvar_from_file(int* var, IO_CACHE* f,
int default_val) int default_val)
{ {
char buf[32]; char buf[32];
if(fgets(buf, sizeof(buf), f)) if(my_b_gets(f, buf, sizeof(buf)))
{ {
*var = atoi(buf); *var = atoi(buf);
} }
...@@ -392,7 +396,7 @@ int fetch_nx_table(THD* thd, MASTER_INFO* mi) ...@@ -392,7 +396,7 @@ int fetch_nx_table(THD* thd, MASTER_INFO* mi)
int init_master_info(MASTER_INFO* mi) int init_master_info(MASTER_INFO* mi)
{ {
FILE* file; int fd;
MY_STAT stat_area; MY_STAT stat_area;
char fname[FN_REFLEN+128]; char fname[FN_REFLEN+128];
fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32); fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);
...@@ -403,19 +407,24 @@ int init_master_info(MASTER_INFO* mi) ...@@ -403,19 +407,24 @@ int init_master_info(MASTER_INFO* mi)
pthread_mutex_lock(&mi->lock); pthread_mutex_lock(&mi->lock);
mi->pending = 0; mi->pending = 0;
fd = mi->fd;
if(!my_stat(fname, &stat_area, MYF(0))) // we do not want any messages if(!my_stat(fname, &stat_area, MYF(0))) // we do not want any messages
// if the file does not exist // if the file does not exist
{ {
file = my_fopen(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME)); // if someone removed the file from underneath our feet, just close
if(!file) // the old descriptor and re-create the old file
if(fd >= 0) my_close(fd, MYF(MY_WME));
if((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0
|| init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
MYF(MY_WME)))
{ {
pthread_mutex_unlock(&mi->lock); pthread_mutex_unlock(&mi->lock);
return 1; return 1;
} }
mi->log_file_name[0] = 0; mi->log_file_name[0] = 0;
mi->pos = 4; // skip magic number mi->pos = 4; // skip magic number
mi->file = file; mi->fd = fd;
if(master_host) if(master_host)
strmake(mi->host, master_host, sizeof(mi->host) - 1); strmake(mi->host, master_host, sizeof(mi->host) - 1);
...@@ -426,22 +435,20 @@ int init_master_info(MASTER_INFO* mi) ...@@ -426,22 +435,20 @@ int init_master_info(MASTER_INFO* mi)
mi->port = master_port; mi->port = master_port;
mi->connect_retry = master_connect_retry; mi->connect_retry = master_connect_retry;
if(flush_master_info(mi))
{
pthread_mutex_unlock(&mi->lock);
return 1;
}
} }
else else
{ {
file = my_fopen(fname, O_RDWR|O_BINARY, MYF(MY_WME)); if(fd >= 0)
if(!file) reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
else if((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0
|| init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
0, MYF(MY_WME)))
{ {
pthread_mutex_unlock(&mi->lock); pthread_mutex_unlock(&mi->lock);
return 1; return 1;
} }
if(!fgets(mi->log_file_name, sizeof(mi->log_file_name), file)) if(!my_b_gets(&mi->file, mi->log_file_name, sizeof(mi->log_file_name)))
{ {
sql_print_error("Error reading log file name from master info file "); sql_print_error("Error reading log file name from master info file ");
pthread_mutex_unlock(&mi->lock); pthread_mutex_unlock(&mi->lock);
...@@ -450,7 +457,7 @@ int init_master_info(MASTER_INFO* mi) ...@@ -450,7 +457,7 @@ int init_master_info(MASTER_INFO* mi)
*(strend(mi->log_file_name) - 1) = 0; // kill \n *(strend(mi->log_file_name) - 1) = 0; // kill \n
char buf[FN_REFLEN]; char buf[FN_REFLEN];
if(!fgets(buf, sizeof(buf), file)) if(!my_b_gets(&mi->file, buf, sizeof(buf)))
{ {
sql_print_error("Error reading log file position from master info file"); sql_print_error("Error reading log file position from master info file");
pthread_mutex_unlock(&mi->lock); pthread_mutex_unlock(&mi->lock);
...@@ -458,19 +465,29 @@ int init_master_info(MASTER_INFO* mi) ...@@ -458,19 +465,29 @@ int init_master_info(MASTER_INFO* mi)
} }
mi->pos = atoi(buf); mi->pos = atoi(buf);
mi->file = file; mi->fd = fd;
init_strvar_from_file(mi->host, sizeof(mi->host), file, master_host); init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
init_strvar_from_file(mi->user, sizeof(mi->user), file, master_user); master_host);
init_strvar_from_file(mi->password, sizeof(mi->password), file, init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
master_user);
init_strvar_from_file(mi->password, sizeof(mi->password), &mi->file,
master_password); master_password);
init_intvar_from_file((int*)&mi->port, file, master_port); init_intvar_from_file((int*)&mi->port, &mi->file, master_port);
init_intvar_from_file((int*)&mi->connect_retry, file, init_intvar_from_file((int*)&mi->connect_retry, &mi->file,
master_connect_retry); master_connect_retry);
} }
mi->inited = 1; mi->inited = 1;
// now change the cache from READ to WRITE - must do this
// before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE, 0L,0,1);
if(flush_master_info(mi))
{
pthread_mutex_unlock(&mi->lock);
return 1;
}
pthread_mutex_unlock(&mi->lock); pthread_mutex_unlock(&mi->lock);
return 0; return 0;
...@@ -521,19 +538,14 @@ int show_master_info(THD* thd) ...@@ -521,19 +538,14 @@ int show_master_info(THD* thd)
int flush_master_info(MASTER_INFO* mi) int flush_master_info(MASTER_INFO* mi)
{ {
FILE* file = mi->file; IO_CACHE* file = &mi->file;
char lbuf[22]; char lbuf[22];
if(my_fseek(file, 0L, MY_SEEK_SET, MYF(MY_WME)) == MY_FILEPOS_ERROR || my_b_seek(file, 0L);
fprintf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n", my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user, mi->password, mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user, mi->password,
mi->port, mi->connect_retry) < 0 || mi->port, mi->connect_retry);
fflush(file)) flush_io_cache(file);
{
sql_print_error("Write error flushing master_info: %d", errno);
return 1;
}
return 0; return 0;
} }
...@@ -694,7 +706,8 @@ server_errno=%d)", ...@@ -694,7 +706,8 @@ server_errno=%d)",
if(len == 1) if(len == 1)
{ {
sql_print_error("Received 0 length packet from server, looks like master shutdown: %s (%d)", sql_print_error("Slave: received 0 length packet from server, apparent\
master shutdown: %s (%d)",
mc_mysql_error(mysql), read_errno); mc_mysql_error(mysql), read_errno);
return packet_error; return packet_error;
} }
...@@ -1006,7 +1019,16 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) ...@@ -1006,7 +1019,16 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
} }
thd->proc_info = "connecting to master"; thd->proc_info = "connecting to master";
#ifndef DBUG_OFF
sql_print_error("Slave thread initialized");
#endif
safe_connect(thd, mysql, &glob_mi); safe_connect(thd, mysql, &glob_mi);
// always report status on startup, even if we are not in debug
sql_print_error("Slave: connected to master '%s@%s:%d',\
replication started in log '%s' at position %ld", glob_mi.user,
glob_mi.host, glob_mi.port,
RPL_LOG_NAME,
glob_mi.pos);
while(!slave_killed(thd)) while(!slave_killed(thd))
{ {
...@@ -1033,7 +1055,8 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) ...@@ -1033,7 +1055,8 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
thd->proc_info = "reconnecting after a failed dump request"; thd->proc_info = "reconnecting after a failed dump request";
sql_print_error("Slave: failed dump request, reconnecting to \ sql_print_error("Slave: failed dump request, reconnecting to \
try again, master_log_pos=%ld", last_failed_pos = glob_mi.pos ); try again, log '%s' at postion %ld", RPL_LOG_NAME,
last_failed_pos = glob_mi.pos );
safe_reconnect(thd, mysql, &glob_mi); safe_reconnect(thd, mysql, &glob_mi);
if(slave_killed(thd)) if(slave_killed(thd))
goto err; goto err;
...@@ -1063,7 +1086,8 @@ try again, master_log_pos=%ld", last_failed_pos = glob_mi.pos ); ...@@ -1063,7 +1086,8 @@ try again, master_log_pos=%ld", last_failed_pos = glob_mi.pos );
goto err; goto err;
thd->proc_info = "reconnecting after a failed read"; thd->proc_info = "reconnecting after a failed read";
sql_print_error("Slave: Failed reading log event, \ sql_print_error("Slave: Failed reading log event, \
reconnecting to retry, master_log_pos=%ld", last_failed_pos = glob_mi.pos); reconnecting to retry, log '%s' position %ld", RPL_LOG_NAME,
last_failed_pos = glob_mi.pos);
safe_reconnect(thd, mysql, &glob_mi); safe_reconnect(thd, mysql, &glob_mi);
if(slave_killed(thd)) if(slave_killed(thd))
goto err; goto err;
...@@ -1074,7 +1098,8 @@ reconnecting to retry, master_log_pos=%ld", last_failed_pos = glob_mi.pos); ...@@ -1074,7 +1098,8 @@ reconnecting to retry, master_log_pos=%ld", last_failed_pos = glob_mi.pos);
if(exec_event(thd, &mysql->net, &glob_mi, event_len)) if(exec_event(thd, &mysql->net, &glob_mi, event_len))
{ {
sql_print_error("Error running query, slave aborted. Fix the problem, and re-start\ sql_print_error("Error running query, slave aborted. Fix the problem, and re-start\
the slave thread with mysqladmin start-slave"); the slave thread with mysqladmin start-slave - log '%s' position %ld",
RPL_LOG_NAME, glob_mi.pos);
goto err; goto err;
// there was an error running the query // there was an error running the query
// abort the slave thread, when the problem is fixed, the user // abort the slave thread, when the problem is fixed, the user
...@@ -1108,6 +1133,10 @@ reconnecting to retry, master_log_pos=%ld", last_failed_pos = glob_mi.pos); ...@@ -1108,6 +1133,10 @@ reconnecting to retry, master_log_pos=%ld", last_failed_pos = glob_mi.pos);
error = 0; error = 0;
err: err:
// print the current replication position
sql_print_error("Slave thread exiting, replication stopped in log '%s' at \
position %ld",
RPL_LOG_NAME, glob_mi.pos);
thd->query = thd->db = 0; // extra safety thd->query = thd->db = 0; // extra safety
if(mysql) if(mysql)
{ {
......
...@@ -5,8 +5,8 @@ typedef struct st_master_info ...@@ -5,8 +5,8 @@ typedef struct st_master_info
{ {
char log_file_name[FN_REFLEN]; char log_file_name[FN_REFLEN];
ulonglong pos,pending; ulonglong pos,pending;
FILE* file; // we keep the file open, so we need to remember the file pointer File fd; // we keep the file open, so we need to remember the file pointer
IO_CACHE file;
// the variables below are needed because we can change masters on the fly // the variables below are needed because we can change masters on the fly
char host[HOSTNAME_LENGTH+1]; char host[HOSTNAME_LENGTH+1];
char user[USERNAME_LENGTH+1]; char user[USERNAME_LENGTH+1];
...@@ -16,7 +16,7 @@ typedef struct st_master_info ...@@ -16,7 +16,7 @@ typedef struct st_master_info
pthread_mutex_t lock; pthread_mutex_t lock;
bool inited; bool inited;
st_master_info():pending(0),inited(0) st_master_info():pending(0),fd(-1),inited(0)
{ {
host[0] = 0; user[0] = 0; password[0] = 0; host[0] = 0; user[0] = 0; password[0] = 0;
pthread_mutex_init(&lock, NULL); pthread_mutex_init(&lock, NULL);
......
...@@ -32,7 +32,7 @@ static int send_file(THD *thd) ...@@ -32,7 +32,7 @@ static int send_file(THD *thd)
NET* net = &thd->net; NET* net = &thd->net;
int fd = -1,bytes, error = 1; int fd = -1,bytes, error = 1;
char fname[FN_REFLEN+1]; char fname[FN_REFLEN+1];
char buf[IO_SIZE*15]; char *buf;
const char *errmsg = 0; const char *errmsg = 0;
int old_timeout; int old_timeout;
DBUG_ENTER("send_file"); DBUG_ENTER("send_file");
...@@ -42,6 +42,13 @@ static int send_file(THD *thd) ...@@ -42,6 +42,13 @@ static int send_file(THD *thd)
old_timeout = thd->net.timeout; old_timeout = thd->net.timeout;
thd->net.timeout = thd->inactive_timeout; thd->net.timeout = thd->inactive_timeout;
// spare the stack
if(!(buf = alloc_root(&thd->mem_root,IO_SIZE)))
{
errmsg = "Out of memory";
goto err;
}
// we need net_flush here because the client will not know it needs to send // we need net_flush here because the client will not know it needs to send
// us the file name until it has processed the load event entry // us the file name until it has processed the load event entry
if (net_flush(net) || my_net_read(net) == packet_error) if (net_flush(net) || my_net_read(net) == packet_error)
...@@ -62,7 +69,7 @@ static int send_file(THD *thd) ...@@ -62,7 +69,7 @@ static int send_file(THD *thd)
goto err; goto err;
} }
while ((bytes = (int) my_read(fd, (byte*) buf, sizeof(buf), while ((bytes = (int) my_read(fd, (byte*) buf, IO_SIZE,
MYF(MY_WME))) > 0) MYF(MY_WME))) > 0)
{ {
if (my_net_write(net, buf, bytes)) if (my_net_write(net, buf, bytes))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment