Commit 7e52afcd authored by unknown's avatar unknown

sql/log.cc

    Added magic number to binlog
sql/log_event.cc
    distinquish bogus data from truncated logs
sql/log_event.h
    added magic number
    added LOG_READ_TRUNC error
sql/mysqlbinlog.cc
    fixed to handle magic number
    added O_BINARY to my_fopen
sql/mysqld.cc
    added code for replicate-rewrite-db
sql/slave.cc
    replicate-rewrite-db
    O_BINARY
    handle magic
sql/sql_class.h
    added i_string_pair class
sql/sql_repl.cc
    added magic
    better error messages
support-files/magic
    added magic for binlog

Added test case for replication of queries with error


sql/log.cc:
  Added magic number to binlog
sql/log_event.cc:
  distinquish bogus data from truncated logs
sql/log_event.h:
  added magic number
  added LOG_READ_TRUNC error
sql/mysqlbinlog.cc:
  fixed to handle magic number
  added O_BINARY to my_fopen
sql/mysqld.cc:
  added code for replicate-rewrite-db
sql/slave.cc:
  replicate-rewrite-db
  O_BINARY
  handle magic
sql/sql_class.h:
  added i_string_pair class
sql/sql_repl.cc:
  added magic
  better error messages
support-files/magic:
  added magic for binlog
parent 295c3d1f
source ../include/master-slave.inc;
connection master;
drop table if exists x;
create table x(n int primary key);
!insert into x values (1),(2),(2);
insert into x values (3);
connection slave;
sleep 3;
@x.master select * from x;
...@@ -149,6 +149,10 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -149,6 +149,10 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
fn_format(index_file_name, name, mysql_data_home, ".index", 6); fn_format(index_file_name, name, mysql_data_home, ".index", 6);
db[0]=0; db[0]=0;
MY_STAT tmp_stat;
bool do_magic = ((log_type == LOG_BIN) && !my_stat(log_file_name,
&tmp_stat, MYF(0)));
file=my_fopen(log_file_name,O_APPEND | O_WRONLY | O_BINARY, file=my_fopen(log_file_name,O_APPEND | O_WRONLY | O_BINARY,
MYF(MY_WME | ME_WAITTANG)); MYF(MY_WME | ME_WAITTANG));
if (!file) if (!file)
...@@ -187,10 +191,18 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -187,10 +191,18 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
} }
else if (log_type == LOG_BIN) else if (log_type == LOG_BIN)
{ {
Start_log_event s;
if(!index_file && // Explanation of the boolean black magic:
//
// if we are supposed to write magic number try write
// clean up if failed
// then if index_file has not been previously opened, try to open it
// clean up if failed
if((do_magic && my_fwrite(file, (byte*)BINLOG_MAGIC, 4,
MYF(MY_NABP|MY_WME)) ||
(!index_file &&
!(index_file = my_fopen(index_file_name,O_APPEND | O_BINARY | O_RDWR, !(index_file = my_fopen(index_file_name,O_APPEND | O_BINARY | O_RDWR,
MYF(MY_WME)))) MYF(MY_WME))))))
{ {
my_fclose(file,MYF(MY_WME)); my_fclose(file,MYF(MY_WME));
my_free(name,MYF(0)); my_free(name,MYF(0));
...@@ -199,6 +211,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -199,6 +211,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
log_type=LOG_CLOSED; log_type=LOG_CLOSED;
return; return;
} }
Start_log_event s;
s.write(file); s.write(file);
pthread_mutex_lock(&LOCK_index); pthread_mutex_lock(&LOCK_index);
my_fseek(index_file, 0L, MY_SEEK_END, MYF(MY_WME)); my_fseek(index_file, 0L, MY_SEEK_END, MYF(MY_WME));
......
...@@ -100,7 +100,7 @@ int Log_event::read_log_event(FILE* file, String* packet, ...@@ -100,7 +100,7 @@ int Log_event::read_log_event(FILE* file, String* packet,
{ {
if(log_lock) if(log_lock)
pthread_mutex_unlock(log_lock); pthread_mutex_unlock(log_lock);
return feof(file) ? LOG_READ_BOGUS: LOG_READ_IO; return feof(file) ? LOG_READ_TRUNC: LOG_READ_IO;
} }
if(log_lock) pthread_mutex_unlock(log_lock); if(log_lock) pthread_mutex_unlock(log_lock);
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#define LOG_READ_BOGUS -2 #define LOG_READ_BOGUS -2
#define LOG_READ_IO -3 #define LOG_READ_IO -3
#define LOG_READ_MEM -5 #define LOG_READ_MEM -5
#define LOG_READ_TRUNC -6
#define LOG_EVENT_OFFSET 4 #define LOG_EVENT_OFFSET 4
#define BINLOG_VERSION 1 #define BINLOG_VERSION 1
...@@ -42,6 +43,7 @@ ...@@ -42,6 +43,7 @@
#define ROTATE_EVENT_OVERHEAD LOG_EVENT_HEADER_LEN #define ROTATE_EVENT_OVERHEAD LOG_EVENT_HEADER_LEN
#define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN+sizeof(sql_ex_info)) #define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN+sizeof(sql_ex_info))
#define BINLOG_MAGIC "\xfe\x62\x69\x6e"
enum Log_event_type { START_EVENT = 1, QUERY_EVENT =2, enum Log_event_type { START_EVENT = 1, QUERY_EVENT =2,
STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5, STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5,
......
...@@ -267,6 +267,15 @@ static void dump_remote_log_entries(const char* logname) ...@@ -267,6 +267,15 @@ static void dump_remote_log_entries(const char* logname)
char buf[128]; char buf[128];
uint len; uint len;
NET* net = &mysql->net; NET* net = &mysql->net;
if(!position) position = 4; // protect the innocent from spam
if(position < 4)
{
position = 4;
// warn the guity
fprintf(stderr,
"Warning: with the position so small you would hit the magic number\n\
Unfortunately, no sweepstakes today, adjusted position to 4\n");
}
int4store(buf, position); int4store(buf, position);
int2store(buf + 4, binlog_flags); int2store(buf + 4, binlog_flags);
len = (uint) strlen(logname); len = (uint) strlen(logname);
...@@ -305,7 +314,7 @@ static void dump_local_log_entries(const char* logname) ...@@ -305,7 +314,7 @@ static void dump_local_log_entries(const char* logname)
int rec_count = 0; int rec_count = 0;
if(logname && logname[0] != '-') if(logname && logname[0] != '-')
file = my_fopen(logname, O_RDONLY, MYF(MY_WME)); file = my_fopen(logname, O_RDONLY|O_BINARY, MYF(MY_WME));
else else
file = stdin; file = stdin;
...@@ -314,6 +323,15 @@ static void dump_local_log_entries(const char* logname) ...@@ -314,6 +323,15 @@ static void dump_local_log_entries(const char* logname)
if(my_fseek(file, position, MY_SEEK_SET, MYF(MY_WME))) if(my_fseek(file, position, MY_SEEK_SET, MYF(MY_WME)))
die("failed on my_fseek()"); die("failed on my_fseek()");
if(!position)
{
char magic[4];
if(my_fread(file, magic, sizeof(magic), MYF(MY_NABP|MY_WME)))
die("I/O error reading binlog magic number");
if(memcmp(magic, BINLOG_MAGIC, 4))
die("Bad magic number");
}
while(1) while(1)
{ {
......
...@@ -177,6 +177,7 @@ static VioSSLAcceptorFd* ssl_acceptor_fd = 0; ...@@ -177,6 +177,7 @@ static VioSSLAcceptorFd* ssl_acceptor_fd = 0;
extern bool slave_running; extern bool slave_running;
I_List <i_string_pair> replicate_rewrite_db;
I_List<i_string> replicate_do_db, replicate_ignore_db; I_List<i_string> replicate_do_db, replicate_ignore_db;
// allow the user to tell us which db to replicate and which to ignore // allow the user to tell us which db to replicate and which to ignore
I_List<i_string> binlog_do_db, binlog_ignore_db; I_List<i_string> binlog_do_db, binlog_ignore_db;
...@@ -2854,6 +2855,38 @@ static void get_options(int argc,char **argv) ...@@ -2854,6 +2855,38 @@ static void get_options(int argc,char **argv)
replicate_do_db.push_back(db); replicate_do_db.push_back(db);
break; break;
} }
case (int)OPT_REPLICATE_REWRITE_DB:
{
char* key = optarg,*p, *val;
p = strstr(optarg, "->");
if(!p)
{
fprintf(stderr,
"bad syntax in replicate-rewrite-db - missing ->\n");
exit(1);
}
val = p--;
while(isspace(*p) && p > optarg) *p-- = 0;
if(p == optarg)
{
fprintf(stderr,
"bad syntax in replicate-rewrite-db - empty FROM db\n");
exit(1);
}
*val = 0;
val += 2;
while(*val && isspace(*val)) *val++;
if(!*val)
{
fprintf(stderr,
"bad syntax in replicate-rewrite-db - empty TO db\n");
exit(1);
}
i_string_pair* db_pair = new i_string_pair(key, val);
replicate_rewrite_db.push_back(db_pair);
break;
}
case (int)OPT_BINLOG_IGNORE_DB: case (int)OPT_BINLOG_IGNORE_DB:
{ {
......
...@@ -30,6 +30,7 @@ extern my_string master_user, master_password, master_host, ...@@ -30,6 +30,7 @@ extern my_string master_user, master_password, master_host,
master_info_file; master_info_file;
extern I_List<i_string> replicate_do_db, replicate_ignore_db; extern I_List<i_string> replicate_do_db, replicate_ignore_db;
extern I_List<i_string_pair> replicate_rewrite_db;
extern I_List<THD> threads; extern I_List<THD> threads;
bool slave_running = 0; bool slave_running = 0;
pthread_t slave_real_id; pthread_t slave_real_id;
...@@ -48,6 +49,7 @@ static int safe_sleep(THD* thd, int sec); ...@@ -48,6 +49,7 @@ static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, char* db, char* table); static int request_table_dump(MYSQL* mysql, char* db, char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db, static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name); const char* table_name);
static inline char* rewrite_db(char* db);
static inline bool slave_killed(THD* thd) static inline bool slave_killed(THD* thd)
{ {
...@@ -62,6 +64,20 @@ static inline void skip_load_data_infile(NET* net) ...@@ -62,6 +64,20 @@ static inline void skip_load_data_infile(NET* net)
send_ok(net); // the master expects it send_ok(net); // the master expects it
} }
static inline char* rewrite_db(char* db)
{
if(replicate_rewrite_db.is_empty() || !db) return db;
I_List_iterator<i_string_pair> it(replicate_rewrite_db);
i_string_pair* tmp;
while((tmp=it++))
{
if(!strcmp(tmp->key, db))
return tmp->val;
}
return db;
}
int db_ok(const char* db, I_List<i_string> &do_list, int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list ) I_List<i_string> &ignore_list )
...@@ -278,11 +294,11 @@ int init_master_info(MASTER_INFO* mi) ...@@ -278,11 +294,11 @@ int init_master_info(MASTER_INFO* mi)
if(!my_stat(fname, &stat_area, MYF(0))) // we do not want any messages if(!my_stat(fname, &stat_area, MYF(0))) // we do not want any messages
// if the file does not exist // if the file does not exist
{ {
file = my_fopen(fname, O_CREAT|O_RDWR, MYF(MY_WME)); file = my_fopen(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME));
if(!file) if(!file)
return 1; return 1;
mi->log_file_name[0] = 0; mi->log_file_name[0] = 0;
mi->pos = 0; mi->pos = 4; // skip magic number
mi->file = file; mi->file = file;
if(master_host) if(master_host)
...@@ -299,7 +315,7 @@ int init_master_info(MASTER_INFO* mi) ...@@ -299,7 +315,7 @@ int init_master_info(MASTER_INFO* mi)
} }
else else
{ {
file = my_fopen(fname, O_RDWR, MYF(MY_WME)); file = my_fopen(fname, O_RDWR|O_BINARY, MYF(MY_WME));
if(!file) if(!file)
return 1; return 1;
...@@ -589,7 +605,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -589,7 +605,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
Query_log_event* qev = (Query_log_event*)ev; Query_log_event* qev = (Query_log_event*)ev;
int q_len = qev->q_len; int q_len = qev->q_len;
init_sql_alloc(&thd->mem_root, 8192,0); init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = (char*)qev->db; thd->db = rewrite_db((char*)qev->db);
if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{ {
thd->query = (char*)qev->query; thd->query = (char*)qev->query;
...@@ -645,7 +661,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -645,7 +661,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
{ {
Load_log_event* lev = (Load_log_event*)ev; Load_log_event* lev = (Load_log_event*)ev;
init_sql_alloc(&thd->mem_root, 8192,0); init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = (char*)lev->db; thd->db = rewrite_db((char*)lev->db);
thd->query = 0; thd->query = 0;
thd->query_error = 0; thd->query_error = 0;
...@@ -766,7 +782,8 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -766,7 +782,8 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
int ident_len = rev->ident_len; int ident_len = rev->ident_len;
memcpy(mi->log_file_name, rev->new_log_ident,ident_len ); memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
mi->log_file_name[ident_len] = 0; mi->log_file_name[ident_len] = 0;
mi->pos = 0; mi->pos = 4; // skip magic number
flush_master_info(mi);
break; break;
} }
......
...@@ -218,6 +218,17 @@ class i_string: public ilink ...@@ -218,6 +218,17 @@ class i_string: public ilink
i_string(char* s) : ptr(s) {} i_string(char* s) : ptr(s) {}
}; };
//needed for linked list of two strings for replicate-rewrite-db
class i_string_pair: public ilink
{
public:
char* key;
char* val;
i_string_pair():key(0),val(0) { }
i_string_pair(char* key, char* val) : key(key),val(val) {}
};
/**************************************************************************** /****************************************************************************
** every connection is handle by a thread with a THD ** every connection is handle by a thread with a THD
****************************************************************************/ ****************************************************************************/
......
...@@ -98,6 +98,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -98,6 +98,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
LOG_INFO linfo; LOG_INFO linfo;
char *log_file_name = linfo.log_file_name; char *log_file_name = linfo.log_file_name;
char search_file_name[FN_REFLEN]; char search_file_name[FN_REFLEN];
char magic[4];
FILE* log = NULL; FILE* log = NULL;
String* packet = &thd->packet; String* packet = &thd->packet;
int error; int error;
...@@ -129,7 +130,25 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -129,7 +130,25 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
errmsg = "Could not open log file"; errmsg = "Could not open log file";
goto err; goto err;
} }
if(my_fread(log, magic, sizeof(magic), MYF(MY_NABP|MY_WME)))
{
errmsg = "I/O error reading binlog magic number";
goto err;
}
if(memcmp(magic, BINLOG_MAGIC, 4))
{
errmsg = "Binlog has bad magic number, fire your magician";
goto err;
}
if(pos < 4)
{
errmsg = "Contratulations! You have hit the magic number and can win \
sweepstakes if you report the bug";
goto err;
}
if(my_fseek(log, pos, MY_SEEK_SET, MYF(MY_WME)) == MY_FILEPOS_ERROR ) if(my_fseek(log, pos, MY_SEEK_SET, MYF(MY_WME)) == MY_FILEPOS_ERROR )
{ {
errmsg = "Error on fseek()"; errmsg = "Error on fseek()";
...@@ -168,7 +187,21 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -168,7 +187,21 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
} }
if(error != LOG_READ_EOF) if(error != LOG_READ_EOF)
{ {
errmsg = "error reading log event"; switch(error)
{
case LOG_READ_BOGUS:
errmsg = "bogus data in log event";
break;
case LOG_READ_IO:
errmsg = "I/O error reading log event";
break;
case LOG_READ_MEM:
errmsg = "memory allocation failed reading log event";
break;
case LOG_READ_TRUNC:
errmsg = "binlog truncated in the middle of event";
break;
}
goto err; goto err;
} }
...@@ -261,7 +294,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -261,7 +294,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
} }
else else
{ {
bool loop_breaker = 0; // need this to break out of the for loop from switch bool loop_breaker = 0;
// need this to break out of the for loop from switch
thd->proc_info = "switching to next log"; thd->proc_info = "switching to next log";
switch(mysql_bin_log.find_next_log(&linfo)) switch(mysql_bin_log.find_next_log(&linfo))
{ {
...@@ -281,14 +315,31 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) ...@@ -281,14 +315,31 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
(void) my_fclose(log, MYF(MY_WME)); (void) my_fclose(log, MYF(MY_WME));
log = my_fopen(log_file_name, O_RDONLY|O_BINARY, MYF(MY_WME)); log = my_fopen(log_file_name, O_RDONLY|O_BINARY, MYF(MY_WME));
if(!log) if(!log)
goto err; {
errmsg = "Could not open next log";
goto err;
}
//check the magic
if(my_fread(log, magic, sizeof(magic), MYF(MY_NABP|MY_WME)))
{
errmsg = "I/O error reading binlog magic number";
goto err;
}
if(memcmp(magic, BINLOG_MAGIC, 4))
{
errmsg = "Binlog has bad magic number, fire your magician";
goto err;
}
// fake Rotate_log event just in case it did not make it to the log // fake Rotate_log event just in case it did not make it to the log
// otherwise the slave make get confused about the offset // otherwise the slave make get confused about the offset
{ {
char header[LOG_EVENT_HEADER_LEN]; char header[LOG_EVENT_HEADER_LEN];
memset(header, 0, 4); // when does not matter memset(header, 0, 4); // when does not matter
header[EVENT_TYPE_OFFSET] = ROTATE_EVENT; header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
char* p = strrchr(log_file_name, FN_LIBCHAR); // find the last slash char* p = strrchr(log_file_name, FN_LIBCHAR);
// find the last slash
if(p) if(p)
p++; p++;
else else
......
...@@ -12,3 +12,4 @@ ...@@ -12,3 +12,4 @@
>3 byte x Version %d >3 byte x Version %d
0 belong&0xffffff00 0xfefe0600 MySQL ISAM compressed data file 0 belong&0xffffff00 0xfefe0600 MySQL ISAM compressed data file
>3 byte x Version %d >3 byte x Version %d
0 string \376bin MySQL replication log
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment