Commit de172721 authored by unknown's avatar unknown

more predicatable slave behaviour with wait_for_slave_stop in mysqltest

fixed a couple of bugs with SEQ_READ_APPEND cache
rpl000016 still has non-deterministic result, but I am going to commit and
push since what I have is now better than what is in the main repository


client/mysqltest.c:
  added wait_for_slave_to_stop
  cleaned up TODO and comments
include/my_sys.h:
  fixed race in flush_io_cache in SEQ_READ_APPEND cache
mysql-test/r/rpl000016.result:
  updated result
mysql-test/t/rpl000016.test:
  use wait_for_slave_to_stop to have deterministic slave behaviour for the test
mysys/mf_iocache.c:
  fixed race in flush_io_cache()
  fixed failure to unlock mutex in my_b_append()
sql/log.cc:
  be compatible with 3.23 master
sql/log_event.cc:
  3.23 master compat
sql/slave.cc:
  3.23 master compat
sql/sql_class.h:
  compat with 3.23 master
parent 1c280293
......@@ -15,7 +15,8 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/* mysqltest test tool
* See man page for more information.
* See the manual for more information
* TODO: document better how mysqltest works
*
* Written by:
* Sasha Pachev <sasha@mysql.com>
......@@ -26,9 +27,6 @@
/**********************************************************************
TODO:
- Print also the queries that returns a result to the log file; This makes
it much easier to find out what's wrong.
- Do comparison line by line, instead of doing a full comparison of
the text file. This will save space as we don't need to keep many
results in memory. It will also make it possible to do simple
......@@ -43,7 +41,7 @@
**********************************************************************/
#define MTEST_VERSION "1.13"
#define MTEST_VERSION "1.14"
#include <my_global.h>
#include <mysql_embed.h>
......@@ -88,6 +86,12 @@
#define CON_RETRY_SLEEP 2
#define MAX_CON_TRIES 5
#ifndef OS2
#define SLAVE_POLL_INTERVAL 300000 /* 0.3 of a sec */
#else
#defile SLAVE_POLL_INTERVAL 0.3
#endif
enum {OPT_MANAGER_USER=256,OPT_MANAGER_HOST,OPT_MANAGER_PASSWD,
OPT_MANAGER_PORT,OPT_MANAGER_WAIT_TIMEOUT};
......@@ -187,6 +191,7 @@ Q_DISABLE_RPL_PARSE, Q_EVAL_RESULT,
Q_ENABLE_QUERY_LOG, Q_DISABLE_QUERY_LOG,
Q_ENABLE_RESULT_LOG, Q_DISABLE_RESULT_LOG,
Q_SERVER_START, Q_SERVER_STOP,Q_REQUIRE_MANAGER,
Q_WAIT_FOR_SLAVE_TO_STOP,
Q_UNKNOWN, /* Unknown command. */
Q_COMMENT, /* Comments, ignored. */
Q_COMMENT_WITH_COMMAND
......@@ -222,7 +227,7 @@ const char *command_names[] = {
"enable_query_log", "disable_query_log",
"enable_result_log", "disable_result_log",
"server_start", "server_stop",
"require_manager",
"require_manager", "wait_for_slave_to_stop",
0
};
......@@ -653,6 +658,45 @@ int open_file(const char* name)
return 0;
}
/* ugly long name, but we are following the convention */
int do_wait_for_slave_to_stop(struct st_query* __attribute__((unused)) q)
{
MYSQL* mysql = &cur_con->mysql;
#ifndef OS2
struct timeval t;
#endif
for (;;)
{
MYSQL_RES* res;
MYSQL_ROW row;
int done;
LINT_INIT(res);
if (mysql_query(mysql,"show status like 'Slave_running'")
|| !(res=mysql_store_result(mysql)))
die("Query failed while probing slave for stop: %s",
mysql_error(mysql));
if (!(row=mysql_fetch_row(res)) || !row[1])
{
mysql_free_result(res);
die("Strange result from query while probing slave for stop");
}
done = !strcmp(row[1],"OFF");
mysql_free_result(res);
if (done)
break;
#ifndef OS2
t.tv_sec=0;
t.tv_usec=SLAVE_POLL_INTERVAL;
select(0,0,0,0,&t); /* sleep */
#else
DosSleep(OS2_SLAVE_POLL_INTERVAL);
#endif
}
return 0;
}
int do_require_manager(struct st_query* __attribute__((unused)) q)
{
if (!manager)
......@@ -2335,6 +2379,7 @@ int main(int argc, char** argv)
case Q_DISABLE_RESULT_LOG: disable_result_log=1; break;
case Q_SOURCE: do_source(q); break;
case Q_SLEEP: do_sleep(q); break;
case Q_WAIT_FOR_SLAVE_TO_STOP: do_wait_for_slave_to_stop(q); break;
case Q_REQUIRE_MANAGER: do_require_manager(q); break;
#ifndef EMBEDDED_LIBRARY
case Q_SERVER_START: do_server_start(q); break;
......
......@@ -643,7 +643,10 @@ extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count);
extern int my_b_append(IO_CACHE *info,const byte *Buffer,uint Count);
extern int my_block_write(IO_CACHE *info, const byte *Buffer,
uint Count, my_off_t pos);
extern int flush_io_cache(IO_CACHE *info);
extern int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock);
#define flush_io_cache(info) _flush_io_cache((info),1)
extern int end_io_cache(IO_CACHE *info);
extern uint my_b_fill(IO_CACHE *info);
extern void my_b_seek(IO_CACHE *info,my_off_t pos);
......
......@@ -33,7 +33,6 @@ master-bin.003
insert into t2 values(1234);
set insert_id=1234;
insert into t2 values(NULL);
slave stop;
set sql_slave_skip_counter=1;
slave start;
purge master logs to 'master-bin.003';
......@@ -66,7 +65,7 @@ slave stop;
slave start;
show slave status;
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
127.0.0.1 root MASTER_PORT 60 master-bin.006 445 mysql-relay-bin.004 1312 master-bin.006 Yes Yes 0 0 445
127.0.0.1 root MASTER_PORT 60 master-bin.006 445 mysql-relay-bin.004 1376 master-bin.006 Yes Yes 0 0 445
lock tables t3 read;
select count(*) from t3 where n >= 4;
count(*)
......
......@@ -51,9 +51,7 @@ insert into t2 values(NULL);
connection slave;
sync_with_master;
#the slave may have already stopped, so we ignore the error
--error 0,1199
!slave stop;
wait_for_slave_to_stop;
#restart slave skipping one event
set sql_slave_skip_counter=1;
......
......@@ -808,13 +808,19 @@ int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
Buffer+=rest_length;
Count-=rest_length;
info->write_pos+=rest_length;
if (flush_io_cache(info))
if (_flush_io_cache(info,0))
{
unlock_append_buffer(info);
return 1;
}
if (Count >= IO_SIZE)
{ /* Fill first intern buffer */
length=Count & (uint) ~(IO_SIZE-1);
if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
{
unlock_append_buffer(info);
return info->error= -1;
}
Count-=length;
Buffer+=length;
}
......@@ -883,14 +889,16 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
/* Flush write cache */
int flush_io_cache(IO_CACHE *info)
int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
{
uint length;
my_bool append_cache;
my_off_t pos_in_file;
DBUG_ENTER("flush_io_cache");
append_cache = (info->type == SEQ_READ_APPEND);
if (!(append_cache = (info->type == SEQ_READ_APPEND)))
need_append_buffer_lock=0;
if (info->type == WRITE_CACHE || append_cache)
{
if (info->file == -1)
......@@ -898,6 +906,8 @@ int flush_io_cache(IO_CACHE *info)
if (real_open_cached_file(info))
DBUG_RETURN((info->error= -1));
}
if (need_append_buffer_lock)
lock_append_buffer(info);
if ((length=(uint) (info->write_pos - info->write_buffer)))
{
pos_in_file=info->pos_in_file;
......@@ -909,6 +919,8 @@ int flush_io_cache(IO_CACHE *info)
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
MY_FILEPOS_ERROR)
{
if (need_append_buffer_lock)
unlock_append_buffer(info);
DBUG_RETURN((info->error= -1));
}
if (!append_cache)
......@@ -932,6 +944,8 @@ int flush_io_cache(IO_CACHE *info)
info->end_of_file+=(info->write_pos-info->append_read_pos);
info->append_read_pos=info->write_pos=info->write_buffer;
if (need_append_buffer_lock)
unlock_append_buffer(info);
DBUG_RETURN(info->error);
}
}
......@@ -942,6 +956,8 @@ int flush_io_cache(IO_CACHE *info)
info->inited=0;
}
#endif
if (need_append_buffer_lock)
unlock_append_buffer(info);
DBUG_RETURN(0);
}
......
......@@ -703,12 +703,37 @@ void MYSQL_LOG::new_file(bool inside_mutex)
}
}
bool MYSQL_LOG::append(Log_event* ev)
{
bool error = 0;
pthread_mutex_lock(&LOCK_log);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
// Log_event::write() is smart enough to use my_b_write() or
// my_b_append() depending on the kind of cache we have
if (ev->write(&log_file))
{
error=1;
goto err;
}
if ((uint)my_b_append_tell(&log_file) > max_binlog_size)
{
new_file(1);
}
signal_update();
err:
pthread_mutex_unlock(&LOCK_log);
return error;
}
bool MYSQL_LOG::appendv(const char* buf, uint len,...)
{
bool error = 0;
va_list(args);
va_start(args,len);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
pthread_mutex_lock(&LOCK_log);
do
{
......
......@@ -26,6 +26,18 @@
#include <assert.h>
inline int my_b_safe_write(IO_CACHE* file, const char* buf,
int len)
{
// Sasha: We are not writing this with the ? operator to avoid hitting
// a possible compiler bug. At least gcc 2.95 cannot deal with
// several layers of ternary operators that evaluated comma(,) operator
// expressions inside - I do have a test case if somebody wants it
if (file->type == SEQ_READ_APPEND)
return my_b_append(file,buf,len);
return my_b_write(file,buf,len);
}
#ifdef MYSQL_CLIENT
static void pretty_print_str(FILE* file, char* str, int len)
{
......@@ -403,7 +415,7 @@ int Log_event::write_header(IO_CACHE* file)
pos += 4;
int2store(pos, flags);
pos += 2;
return (my_b_write(file, (byte*) buf, (uint) (pos - buf)));
return (my_b_safe_write(file, (byte*) buf, (uint) (pos - buf)));
}
#ifndef MYSQL_CLIENT
......@@ -677,7 +689,7 @@ int Start_log_event::write_data(IO_CACHE* file)
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
int4store(buff + ST_CREATED_OFFSET,created);
return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
return (my_b_safe_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
}
Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
......@@ -714,8 +726,8 @@ int Rotate_log_event::write_data(IO_CACHE* file)
{
char buf[ROTATE_HEADER_LEN];
int8store(buf, pos + R_POS_OFFSET);
return my_b_write(file, (byte*)buf, ROTATE_HEADER_LEN) ||
my_b_write(file, (byte*)new_log_ident, (uint) ident_len);
return my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) ||
my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len);
}
#ifndef MYSQL_CLIENT
......@@ -812,9 +824,9 @@ int Query_log_event::write_data(IO_CACHE* file)
buf[Q_DB_LEN_OFFSET] = (char)db_len;
int2store(buf + Q_ERR_CODE_OFFSET, error_code);
return (my_b_write(file, (byte*) buf, QUERY_HEADER_LEN) ||
my_b_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) ||
my_b_write(file, (byte*) query, q_len)) ? -1 : 0;
return (my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) ||
my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) ||
my_b_safe_write(file, (byte*) query, q_len)) ? -1 : 0;
}
Intvar_log_event::Intvar_log_event(const char* buf, bool old_format):
......@@ -840,7 +852,7 @@ int Intvar_log_event::write_data(IO_CACHE* file)
char buf[9];
buf[I_TYPE_OFFSET] = type;
int8store(buf + I_VAL_OFFSET, val);
return my_b_write(file, (byte*) buf, sizeof(buf));
return my_b_safe_write(file, (byte*) buf, sizeof(buf));
}
#ifdef MYSQL_CLIENT
......@@ -878,7 +890,7 @@ int Load_log_event::write_data_header(IO_CACHE* file)
buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
buf[L_DB_LEN_OFFSET] = (char)db_len;
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
return my_b_write(file, (byte*)buf, LOAD_HEADER_LEN);
return my_b_safe_write(file, (byte*)buf, LOAD_HEADER_LEN);
}
int Load_log_event::write_data_body(IO_CACHE* file)
......@@ -886,20 +898,20 @@ int Load_log_event::write_data_body(IO_CACHE* file)
if (sql_ex.write_data(file)) return 1;
if (num_fields && fields && field_lens)
{
if (my_b_write(file, (byte*)field_lens, num_fields) ||
my_b_write(file, (byte*)fields, field_block_len))
if (my_b_safe_write(file, (byte*)field_lens, num_fields) ||
my_b_safe_write(file, (byte*)fields, field_block_len))
return 1;
}
return (my_b_write(file, (byte*)table_name, table_name_len + 1) ||
my_b_write(file, (byte*)db, db_len + 1) ||
my_b_write(file, (byte*)fname, fname_len));
return (my_b_safe_write(file, (byte*)table_name, table_name_len + 1) ||
my_b_safe_write(file, (byte*)db, db_len + 1) ||
my_b_safe_write(file, (byte*)fname, fname_len));
}
static bool write_str(IO_CACHE *file, char *str, byte length)
{
return (my_b_write(file, &length, 1) ||
my_b_write(file, (byte*) str, (int) length));
return (my_b_safe_write(file, &length, 1) ||
my_b_safe_write(file, (byte*) str, (int) length));
}
int sql_ex_info::write_data(IO_CACHE* file)
......@@ -911,7 +923,7 @@ int sql_ex_info::write_data(IO_CACHE* file)
write_str(file, line_term, line_term_len) ||
write_str(file, line_start, line_start_len) ||
write_str(file, escaped, escaped_len) ||
my_b_write(file,(byte*) &opt_flags,1));
my_b_safe_write(file,(byte*) &opt_flags,1));
}
else
{
......@@ -923,7 +935,7 @@ int sql_ex_info::write_data(IO_CACHE* file)
old_ex.escaped= *escaped;
old_ex.opt_flags= opt_flags;
old_ex.empty_flags=empty_flags;
return my_b_write(file, (byte*) &old_ex, sizeof(old_ex));
return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex));
}
}
......@@ -1280,7 +1292,7 @@ int Slave_log_event::write_data(IO_CACHE* file)
int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
// log and host are already there
return my_b_write(file, (byte*)mem_pool, get_data_size());
return my_b_safe_write(file, (byte*)mem_pool, get_data_size());
}
void Slave_log_event::init_from_mem_pool(int data_size)
......@@ -1330,8 +1342,8 @@ int Create_file_log_event::write_data_body(IO_CACHE* file)
int res;
if ((res = Load_log_event::write_data_body(file)) || fake_base)
return res;
return (my_b_write(file, (byte*) "", 1) ||
my_b_write(file, (byte*) block, block_len));
return (my_b_safe_write(file, (byte*) "", 1) ||
my_b_safe_write(file, (byte*) block, block_len));
}
int Create_file_log_event::write_data_header(IO_CACHE* file)
......@@ -1341,7 +1353,7 @@ int Create_file_log_event::write_data_header(IO_CACHE* file)
return res;
byte buf[CREATE_FILE_HEADER_LEN];
int4store(buf + CF_FILE_ID_OFFSET, file_id);
return my_b_write(file, buf, CREATE_FILE_HEADER_LEN);
return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN);
}
int Create_file_log_event::write_base(IO_CACHE* file)
......@@ -1423,8 +1435,8 @@ int Append_block_log_event::write_data(IO_CACHE* file)
{
byte buf[APPEND_BLOCK_HEADER_LEN];
int4store(buf + AB_FILE_ID_OFFSET, file_id);
return (my_b_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
my_b_write(file, (byte*) block, block_len));
return (my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
my_b_safe_write(file, (byte*) block, block_len));
}
#ifdef MYSQL_CLIENT
......@@ -1473,7 +1485,7 @@ int Delete_file_log_event::write_data(IO_CACHE* file)
{
byte buf[DELETE_FILE_HEADER_LEN];
int4store(buf + DF_FILE_ID_OFFSET, file_id);
return my_b_write(file, buf, DELETE_FILE_HEADER_LEN);
return my_b_safe_write(file, buf, DELETE_FILE_HEADER_LEN);
}
#ifdef MYSQL_CLIENT
......@@ -1520,7 +1532,7 @@ int Execute_load_log_event::write_data(IO_CACHE* file)
{
byte buf[EXEC_LOAD_HEADER_LEN];
int4store(buf + EL_FILE_ID_OFFSET, file_id);
return my_b_write(file, buf, EXEC_LOAD_HEADER_LEN);
return my_b_safe_write(file, buf, EXEC_LOAD_HEADER_LEN);
}
#ifdef MYSQL_CLIENT
......
......@@ -54,6 +54,9 @@ static int stuck_count = 0;
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
void skip_load_data_infile(NET* net);
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
static int queue_old_event(MASTER_INFO* mi, const char* buf,
uint event_len);
static inline bool slave_killed(THD* thd,MASTER_INFO* mi);
static inline bool slave_killed(THD* thd,RELAY_LOG_INFO* rli);
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
......@@ -1918,34 +1921,86 @@ the slave SQL thread with \"mysqladmin start-slave\". We stopped at log \
DBUG_RETURN(0); // Can't return anything here
}
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev)
{
if (!rev->is_valid())
return 1;
DBUG_ASSERT(rev->ident_len<sizeof(mi->master_log_name));
memcpy(mi->master_log_name,rev->new_log_ident,
rev->ident_len);
mi->master_log_name[rev->ident_len] = 0;
mi->master_log_pos = rev->pos;
#ifndef DBUG_OFF
/* if we do not do this, we will be getting the first
rotate event forever, so
we need to not disconnect after one
*/
if (disconnect_slave_event_count)
events_till_disconnect++;
#endif
return 0;
}
static int queue_old_event(MASTER_INFO* mi, const char* buf,
uint event_len)
{
const char* errmsg = 0;
bool inc_pos = 1;
Log_event* ev = Log_event::read_log_event(buf,event_len, &errmsg,
1/*old format*/);
if (!ev)
{
sql_print_error("Read invalid event from master: '%s',\
master could be corrupt but a more likely cause of this is a bug",
errmsg);
return 1;
}
ev->log_pos = mi->master_log_pos;
switch (ev->get_type_code())
{
case ROTATE_EVENT:
if (process_io_rotate(mi,(Rotate_log_event*)ev))
{
delete ev;
return 1;
}
inc_pos = 0;
break;
case LOAD_EVENT:
// TODO: actually process it
mi->master_log_pos += event_len;
return 0;
break;
default:
break;
}
if (mi->rli.relay_log.append(ev))
{
delete ev;
return 1;
}
delete ev;
if (inc_pos)
mi->master_log_pos += event_len;
return 0;
}
int queue_event(MASTER_INFO* mi,const char* buf,uint event_len)
{
int error;
bool inc_pos = 1;
if (mi->old_format)
return 1; // TODO: deal with old format
return queue_old_event(mi,buf,event_len);
// TODO: figure out if other events in addition to Rotate
// require special processing
switch (buf[EVENT_TYPE_OFFSET])
{
case ROTATE_EVENT:
{
Rotate_log_event rev(buf,event_len,0);
if (!rev.is_valid())
if (process_io_rotate(mi,&rev))
return 1;
DBUG_ASSERT(rev.ident_len<sizeof(mi->master_log_name));
memcpy(mi->master_log_name,rev.new_log_ident,
rev.ident_len);
mi->master_log_name[rev.ident_len] = 0;
mi->master_log_pos = rev.pos;
inc_pos = 0;
#ifndef DBUG_OFF
/* if we do not do this, we will be getting the first
rotate event forever, so
we need to not disconnect after one
*/
if (disconnect_slave_event_count)
events_till_disconnect++;
#endif
inc_pos=0;
break;
}
default:
......
......@@ -108,6 +108,7 @@ public:
//v stands for vector
//invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0)
bool appendv(const char* buf,uint len,...);
bool append(Log_event* ev);
int generate_new_name(char *new_name,const char *old_name);
void make_log_name(char* buf, const char* log_ident);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment