LOAD DATA INFILE is now replicated properly, except for cleanup on

Stop event and bugs the test suite could not catch
Did some big restructuring of binlog event classes - most important
change is that now each event class has exec_event method and one does
not need to modify slave core code to add a new event. Slave code is
now much smaller and easier to read
parent d4d22f21
...@@ -281,6 +281,8 @@ typedef struct st_io_cache /* Used when cacheing files */ ...@@ -281,6 +281,8 @@ typedef struct st_io_cache /* Used when cacheing files */
/* callbacks when the actual read I/O happens */ /* callbacks when the actual read I/O happens */
IO_CACHE_CALLBACK pre_read; IO_CACHE_CALLBACK pre_read;
IO_CACHE_CALLBACK post_read; IO_CACHE_CALLBACK post_read;
IO_CACHE_CALLBACK pre_close;
void* arg; /* for use by pre/post_read */
char *file_name; /* if used with 'open_cached_file' */ char *file_name; /* if used with 'open_cached_file' */
char *dir,*prefix; char *dir,*prefix;
File file; File file;
......
...@@ -5,8 +5,9 @@ master-bin.001 172 Intvar 1 3 INSERT_ID=1 ...@@ -5,8 +5,9 @@ master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
master-bin.001 263 Query 1 5 use test; drop table t1 master-bin.001 263 Query 1 5 use test; drop table t1
master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null) master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null)
master-bin.001 386 Load 1 7 use test; LOAD DATA INFILE '../../std_data/words.dat' INTO TABLE t1 FIELDS TERMINATED BY '\\t' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' (word) master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=11;block_len=81
master-bin.001 468 Query 1 8 use test; drop table t1 master-bin.001 554 Exec_load 1 8 ;file_id=11
master-bin.001 577 Query 1 9 use test; drop table t1
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
...@@ -21,10 +22,11 @@ master-bin.001 172 Intvar 1 3 INSERT_ID=1 ...@@ -21,10 +22,11 @@ master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
master-bin.001 263 Query 1 5 use test; drop table t1 master-bin.001 263 Query 1 5 use test; drop table t1
master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null) master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null)
master-bin.001 386 Load 1 7 use test; LOAD DATA INFILE '../../std_data/words.dat' INTO TABLE t1 FIELDS TERMINATED BY '\\t' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' (word) master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=11;block_len=81
master-bin.001 468 Query 1 8 use test; drop table t1 master-bin.001 554 Exec_load 1 8 ;file_id=11
master-bin.001 516 Rotate 1 9 master-bin.002;pos=4 master-bin.001 577 Query 1 9 use test; drop table t1
master-bin.001 557 Stop 1 10 master-bin.001 625 Rotate 1 10 master-bin.002;pos=4
master-bin.001 666 Stop 1 11
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2
master-bin.002 79 Query 1 2 use test; create table t1 (n int) master-bin.002 79 Query 1 2 use test; create table t1 (n int)
...@@ -38,18 +40,20 @@ slave-bin.001 ...@@ -38,18 +40,20 @@ slave-bin.001
slave-bin.002 slave-bin.002
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
slave-bin.001 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 slave-bin.001 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2
slave-bin.001 79 Slave 2 2 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4 slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4
slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key) slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
slave-bin.001 225 Intvar 1 3 INSERT_ID=1 slave-bin.001 225 Intvar 1 3 INSERT_ID=1
slave-bin.001 253 Query 1 4 use test; insert into t1 values (NULL) slave-bin.001 253 Query 1 4 use test; insert into t1 values (NULL)
slave-bin.001 316 Query 1 5 use test; drop table t1 slave-bin.001 316 Query 1 5 use test; drop table t1
slave-bin.001 364 Query 1 6 use test; create table t1 (word char(20) not null) slave-bin.001 364 Query 1 6 use test; create table t1 (word char(20) not null)
slave-bin.001 439 Query 1 8 use test; drop table t1 slave-bin.001 439 Create_file 1 7 db=test;table=t1;file_id=11;block_len=81
slave-bin.001 487 Rotate 2 3 slave-bin.002;pos=4; forced by master slave-bin.001 647 Exec_load 1 8 ;file_id=11
slave-bin.001 527 Stop 2 4 slave-bin.001 670 Query 1 9 use test; drop table t1
slave-bin.001 718 Rotate 1 4 slave-bin.002;pos=4; forced by master
slave-bin.001 758 Stop 2 5
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2
slave-bin.002 79 Slave 2 2 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4 slave-bin.002 79 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4
slave-bin.002 132 Query 1 2 use test; create table t1 (n int) slave-bin.002 132 Query 1 2 use test; create table t1 (n int)
slave-bin.002 190 Query 1 3 use test; insert into t1 values (1) slave-bin.002 190 Query 1 3 use test; insert into t1 values (1)
slave-bin.002 250 Query 1 4 use test; drop table t1 slave-bin.002 250 Query 1 4 use test; drop table t1
......
...@@ -56,7 +56,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, ...@@ -56,7 +56,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset)); DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset));
info->file=file; info->file=file;
info->pre_read = info->post_read = 0; info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
if (!cachesize) if (!cachesize)
if (! (cachesize= my_default_record_cache_size)) if (! (cachesize= my_default_record_cache_size))
DBUG_RETURN(1); /* No cache requested */ DBUG_RETURN(1); /* No cache requested */
...@@ -608,7 +609,10 @@ int flush_io_cache(IO_CACHE *info) ...@@ -608,7 +609,10 @@ int flush_io_cache(IO_CACHE *info)
int end_io_cache(IO_CACHE *info) int end_io_cache(IO_CACHE *info)
{ {
int error=0; int error=0;
IO_CACHE_CALLBACK pre_close;
DBUG_ENTER("end_io_cache"); DBUG_ENTER("end_io_cache");
if((pre_close=info->pre_close))
(*pre_close)(info);
if (info->buffer) if (info->buffer)
{ {
if (info->file != -1) /* File doesn't exist */ if (info->file != -1) /* File doesn't exist */
...@@ -618,3 +622,4 @@ int end_io_cache(IO_CACHE *info) ...@@ -618,3 +622,4 @@ int end_io_cache(IO_CACHE *info)
} }
DBUG_RETURN(error); DBUG_RETURN(error);
} /* end_io_cache */ } /* end_io_cache */
...@@ -535,8 +535,8 @@ void MYSQL_LOG::new_file(bool inside_mutex) ...@@ -535,8 +535,8 @@ void MYSQL_LOG::new_file(bool inside_mutex)
We log the whole file name for log file as the user may decide We log the whole file name for log file as the user may decide
to change base names at some point. to change base names at some point.
*/ */
Rotate_log_event r(new_name+dirname_length(new_name));
THD* thd = current_thd; THD* thd = current_thd;
Rotate_log_event r(thd,new_name+dirname_length(new_name));
r.set_log_seq(0, this); r.set_log_seq(0, this);
// this log rotation could have been initiated by a master of // this log rotation could have been initiated by a master of
// the slave running with log-bin // the slave running with log-bin
...@@ -638,24 +638,8 @@ bool MYSQL_LOG::write(THD *thd,enum enum_server_command command, ...@@ -638,24 +638,8 @@ bool MYSQL_LOG::write(THD *thd,enum enum_server_command command,
return 0; return 0;
} }
/* Write to binary log in a format to be used for replication */
bool MYSQL_LOG::write(Slave_log_event* event_info) bool MYSQL_LOG::write(Log_event* event_info)
{
bool error;
if (!inited) // Can't use mutex if not init
return 0;
VOID(pthread_mutex_lock(&LOCK_log));
if(!event_info->log_seq)
event_info->set_log_seq(current_thd, this);
error = event_info->write(&log_file);
flush_io_cache(&log_file);
VOID(pthread_mutex_unlock(&LOCK_log));
return error;
}
bool MYSQL_LOG::write(Query_log_event* event_info)
{ {
/* In most cases this is only called if 'is_open()' is true */ /* In most cases this is only called if 'is_open()' is true */
bool error=0; bool error=0;
...@@ -667,40 +651,42 @@ bool MYSQL_LOG::write(Query_log_event* event_info) ...@@ -667,40 +651,42 @@ bool MYSQL_LOG::write(Query_log_event* event_info)
if (is_open()) if (is_open())
{ {
THD *thd=event_info->thd; THD *thd=event_info->thd;
const char* db = event_info->get_db();
#ifdef USING_TRANSACTIONS #ifdef USING_TRANSACTIONS
IO_CACHE *file = (event_info->cache_stmt ? &thd->transaction.trans_log : IO_CACHE *file = ((event_info->cache_stmt && thd) ?
&thd->transaction.trans_log :
&log_file); &log_file);
#else #else
IO_CACHE *file = &log_file; IO_CACHE *file = &log_file;
#endif #endif
if ((!(thd->options & OPTION_BIN_LOG) && if ((thd && !(thd->options & OPTION_BIN_LOG) &&
(thd->master_access & PROCESS_ACL)) || (thd->master_access & PROCESS_ACL)) ||
!db_ok(event_info->db, binlog_do_db, binlog_ignore_db)) (db && !db_ok(db, binlog_do_db, binlog_ignore_db)))
{ {
VOID(pthread_mutex_unlock(&LOCK_log)); VOID(pthread_mutex_unlock(&LOCK_log));
return 0; return 0;
} }
error=1; error=1;
if (thd->last_insert_id_used) if (thd && thd->last_insert_id_used)
{ {
Intvar_log_event e((uchar)LAST_INSERT_ID_EVENT, thd->last_insert_id); Intvar_log_event e(thd,(uchar)LAST_INSERT_ID_EVENT,thd->last_insert_id);
e.set_log_seq(thd, this); e.set_log_seq(thd, this);
if (thd->server_id) if (thd->server_id)
e.server_id = thd->server_id; e.server_id = thd->server_id;
if (e.write(file)) if (e.write(file))
goto err; goto err;
} }
if (thd->insert_id_used) if (thd && thd->insert_id_used)
{ {
Intvar_log_event e((uchar)INSERT_ID_EVENT, thd->last_insert_id); Intvar_log_event e(thd,(uchar)INSERT_ID_EVENT,thd->last_insert_id);
e.set_log_seq(thd, this); e.set_log_seq(thd, this);
if (thd->server_id) if (thd->server_id)
e.server_id = thd->server_id; e.server_id = thd->server_id;
if (e.write(file)) if (e.write(file))
goto err; goto err;
} }
if (thd->convert_set) if (thd && thd->convert_set)
{ {
char buf[1024] = "SET CHARACTER SET "; char buf[1024] = "SET CHARACTER SET ";
char* p = strend(buf); char* p = strend(buf);
...@@ -795,42 +781,6 @@ bool MYSQL_LOG::write(IO_CACHE *cache) ...@@ -795,42 +781,6 @@ bool MYSQL_LOG::write(IO_CACHE *cache)
} }
bool MYSQL_LOG::write(Load_log_event* event_info)
{
bool error=0;
bool should_rotate = 0;
if (inited)
{
VOID(pthread_mutex_lock(&LOCK_log));
if (is_open())
{
THD *thd=event_info->thd;
if ((thd->options & OPTION_BIN_LOG) ||
!(thd->master_access & PROCESS_ACL))
{
event_info->set_log_seq(thd, this);
if (event_info->write(&log_file) || flush_io_cache(&log_file))
{
if (!write_error)
sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
error=write_error=1;
}
should_rotate = (my_b_tell(&log_file) >= max_binlog_size);
VOID(pthread_cond_broadcast(&COND_binlog_update));
}
}
if(should_rotate)
new_file(1); // inside mutex
VOID(pthread_mutex_unlock(&LOCK_log));
}
return error;
}
/* Write update log in a format suitable for incremental backup */ /* Write update log in a format suitable for incremental backup */
bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length, bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
......
This diff is collapsed.
This diff is collapsed.
...@@ -57,7 +57,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, ...@@ -57,7 +57,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
/* There is no file in net_reading */ /* There is no file in net_reading */
info->file= file; info->file= file;
info->pre_read = info->post_read = 0; info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
if (!cachesize) if (!cachesize)
if (! (cachesize= my_default_record_cache_size)) if (! (cachesize= my_default_record_cache_size))
DBUG_RETURN(1); /* No cache requested */ DBUG_RETURN(1); /* No cache requested */
...@@ -681,7 +682,10 @@ int flush_io_cache(IO_CACHE *info) ...@@ -681,7 +682,10 @@ int flush_io_cache(IO_CACHE *info)
int end_io_cache(IO_CACHE *info) int end_io_cache(IO_CACHE *info)
{ {
int error=0; int error=0;
IO_CACHE_CALLBACK pre_close;
DBUG_ENTER("end_io_cache"); DBUG_ENTER("end_io_cache");
if((pre_close=info->pre_close))
(*pre_close)(info);
if (info->buffer) if (info->buffer)
{ {
if (info->file != -1) /* File doesn't exist */ if (info->file != -1) /* File doesn't exist */
......
...@@ -223,7 +223,7 @@ static bool opt_log,opt_update_log,opt_bin_log,opt_slow_log,opt_noacl, ...@@ -223,7 +223,7 @@ static bool opt_log,opt_update_log,opt_bin_log,opt_slow_log,opt_noacl,
opt_ansi_mode=0,opt_myisam_log=0, opt_ansi_mode=0,opt_myisam_log=0,
opt_large_files=sizeof(my_off_t) > 4; opt_large_files=sizeof(my_off_t) > 4;
bool opt_sql_bin_update = 0, opt_log_slave_updates = 0, opt_safe_show_db=0, bool opt_sql_bin_update = 0, opt_log_slave_updates = 0, opt_safe_show_db=0,
opt_show_slave_auth_info = 0; opt_show_slave_auth_info = 0, opt_old_rpl_compat = 0;
FILE *bootstrap_file=0; FILE *bootstrap_file=0;
int segfaulted = 0; // ensure we do not enter SIGSEGV handler twice int segfaulted = 0; // ensure we do not enter SIGSEGV handler twice
extern MASTER_INFO glob_mi; extern MASTER_INFO glob_mi;
...@@ -718,6 +718,7 @@ void clean_up(bool print_message) ...@@ -718,6 +718,7 @@ void clean_up(bool print_message)
free_defaults(defaults_argv); free_defaults(defaults_argv);
my_free(charsets_list, MYF(MY_ALLOW_ZERO_PTR)); my_free(charsets_list, MYF(MY_ALLOW_ZERO_PTR));
my_free(mysql_tmpdir,MYF(0)); my_free(mysql_tmpdir,MYF(0));
my_free(slave_load_tmpdir,MYF(0));
x_free(opt_bin_logname); x_free(opt_bin_logname);
bitmap_free(&temp_pool); bitmap_free(&temp_pool);
free_max_user_conn(); free_max_user_conn();
...@@ -2518,7 +2519,8 @@ enum options { ...@@ -2518,7 +2519,8 @@ enum options {
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINK, OPT_REPORT_HOST, OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINK, OPT_REPORT_HOST,
OPT_REPORT_USER, OPT_REPORT_PASSWORD, OPT_REPORT_PORT, OPT_REPORT_USER, OPT_REPORT_PASSWORD, OPT_REPORT_PORT,
OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL, OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL,
OPT_SHOW_SLAVE_AUTH_INFO}; OPT_SHOW_SLAVE_AUTH_INFO, OPT_OLD_RPL_COMPAT,
OPT_SLAVE_LOAD_TMPDIR};
static struct option long_options[] = { static struct option long_options[] = {
{"ansi", no_argument, 0, 'a'}, {"ansi", no_argument, 0, 'a'},
...@@ -2611,6 +2613,7 @@ static struct option long_options[] = { ...@@ -2611,6 +2613,7 @@ static struct option long_options[] = {
OPT_SAFEMALLOC_MEM_LIMIT}, OPT_SAFEMALLOC_MEM_LIMIT},
{"new", no_argument, 0, 'n'}, {"new", no_argument, 0, 'n'},
{"old-protocol", no_argument, 0, 'o'}, {"old-protocol", no_argument, 0, 'o'},
{"old-rpl-compat", no_argument, 0, (int)OPT_OLD_RPL_COMPAT},
#ifdef ONE_THREAD #ifdef ONE_THREAD
{"one-thread", no_argument, 0, (int) OPT_ONE_THREAD}, {"one-thread", no_argument, 0, (int) OPT_ONE_THREAD},
#endif #endif
...@@ -2659,6 +2662,7 @@ static struct option long_options[] = { ...@@ -2659,6 +2662,7 @@ static struct option long_options[] = {
{"skip-stack-trace", no_argument, 0, (int) OPT_SKIP_STACK_TRACE}, {"skip-stack-trace", no_argument, 0, (int) OPT_SKIP_STACK_TRACE},
{"skip-symlink", no_argument, 0, (int) OPT_SKIP_SYMLINK}, {"skip-symlink", no_argument, 0, (int) OPT_SKIP_SYMLINK},
{"skip-thread-priority", no_argument, 0, (int) OPT_SKIP_PRIOR}, {"skip-thread-priority", no_argument, 0, (int) OPT_SKIP_PRIOR},
{"slave-load-tmpdir", required_argument, 0, (int) OPT_SLAVE_LOAD_TMPDIR},
{"sql-bin-update-same", no_argument, 0, (int) OPT_SQL_BIN_UPDATE_SAME}, {"sql-bin-update-same", no_argument, 0, (int) OPT_SQL_BIN_UPDATE_SAME},
#include "sslopt-longopts.h" #include "sslopt-longopts.h"
#ifdef __WIN__ #ifdef __WIN__
...@@ -3311,6 +3315,12 @@ static void get_options(int argc,char **argv) ...@@ -3311,6 +3315,12 @@ static void get_options(int argc,char **argv)
safemalloc_mem_limit = atoi(optarg); safemalloc_mem_limit = atoi(optarg);
#endif #endif
break; break;
case OPT_SLAVE_LOAD_TMPDIR:
slave_load_tmpdir = my_strdup(optarg, MYF(MY_FAE));
break;
case OPT_OLD_RPL_COMPAT:
opt_old_rpl_compat = 1;
break;
case OPT_SHOW_SLAVE_AUTH_INFO: case OPT_SHOW_SLAVE_AUTH_INFO:
opt_show_slave_auth_info = 1; opt_show_slave_auth_info = 1;
break; break;
...@@ -4377,6 +4387,14 @@ static void fix_paths(void) ...@@ -4377,6 +4387,14 @@ static void fix_paths(void)
mysql_tmpdir=(char*) my_realloc(mysql_tmpdir,(uint) strlen(mysql_tmpdir)+1, mysql_tmpdir=(char*) my_realloc(mysql_tmpdir,(uint) strlen(mysql_tmpdir)+1,
MYF(MY_HOLD_ON_ERROR)); MYF(MY_HOLD_ON_ERROR));
} }
if (!slave_load_tmpdir)
{
int copy_len;
slave_load_tmpdir = (char*) my_malloc((copy_len=strlen(mysql_tmpdir) + 1)
, MYF(MY_FAE));
// no need to check return value, if we fail, my_malloc() never returns
memcpy(slave_load_tmpdir, mysql_tmpdir, copy_len);
}
} }
......
This diff is collapsed.
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
#define SLAVE_NET_TIMEOUT 3600 #define SLAVE_NET_TIMEOUT 3600
extern ulong slave_net_timeout; extern ulong slave_net_timeout;
extern char* slave_load_tmpdir;
typedef struct st_master_info typedef struct st_master_info
{ {
...@@ -70,6 +70,11 @@ typedef struct st_table_rule_ent ...@@ -70,6 +70,11 @@ typedef struct st_table_rule_ent
#define TABLE_RULE_HASH_SIZE 16 #define TABLE_RULE_HASH_SIZE 16
#define TABLE_RULE_ARR_SIZE 16 #define TABLE_RULE_ARR_SIZE 16
#define MAX_SLAVE_ERRMSG 1024
#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\
"FIRST")
int flush_master_info(MASTER_INFO* mi); int flush_master_info(MASTER_INFO* mi);
int register_slave_on_master(MYSQL* mysql); int register_slave_on_master(MYSQL* mysql);
...@@ -97,6 +102,10 @@ int add_table_rule(HASH* h, const char* table_spec); ...@@ -97,6 +102,10 @@ int add_table_rule(HASH* h, const char* table_spec);
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec); int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec);
void init_table_rule_hash(HASH* h, bool* h_inited); void init_table_rule_hash(HASH* h, bool* h_inited);
void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited); void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited);
char* rewrite_db(char* db);
int check_expected_error(THD* thd, int error_code);
void skip_load_data_infile(NET* net);
void slave_print_error(int err_code, const char* msg, ...);
void end_slave(); // clean up void end_slave(); // clean up
int init_master_info(MASTER_INFO* mi); int init_master_info(MASTER_INFO* mi);
...@@ -109,6 +118,11 @@ extern uint32 slave_skip_counter; ...@@ -109,6 +118,11 @@ extern uint32 slave_skip_counter;
// we want to restart it skipping one or more events in the master log that // we want to restart it skipping one or more events in the master log that
// have caused errors, and have been manually applied by DBA already // have caused errors, and have been manually applied by DBA already
extern int last_slave_errno;
#ifndef DBUG_OFF
extern int events_till_abort;
#endif
extern char last_slave_error[MAX_SLAVE_ERRMSG];
extern pthread_t slave_real_id; extern pthread_t slave_real_id;
extern THD* slave_thd; extern THD* slave_thd;
extern MASTER_INFO glob_mi; extern MASTER_INFO glob_mi;
......
...@@ -121,6 +121,7 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0), ...@@ -121,6 +121,7 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
proc_info="login"; proc_info="login";
where="field list"; where="field list";
server_id = ::server_id; server_id = ::server_id;
slave_net = 0;
server_status=SERVER_STATUS_AUTOCOMMIT; server_status=SERVER_STATUS_AUTOCOMMIT;
update_lock_default= low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE; update_lock_default= low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE;
options=thd_startup_options; options=thd_startup_options;
......
...@@ -86,9 +86,7 @@ class MYSQL_LOG { ...@@ -86,9 +86,7 @@ class MYSQL_LOG {
bool write(THD *thd, enum enum_server_command command,const char *format,...); bool write(THD *thd, enum enum_server_command command,const char *format,...);
bool write(THD *thd, const char *query, uint query_length, bool write(THD *thd, const char *query, uint query_length,
time_t query_start=0); time_t query_start=0);
bool write(Query_log_event* event_info); // binary log write bool write(Log_event* event_info); // binary log write
bool write(Load_log_event* event_info);
bool write(Slave_log_event* event_info);
bool write(IO_CACHE *cache); bool write(IO_CACHE *cache);
int generate_new_name(char *new_name,const char *old_name); int generate_new_name(char *new_name,const char *old_name);
void make_log_name(char* buf, const char* log_ident); void make_log_name(char* buf, const char* log_ident);
...@@ -300,6 +298,8 @@ class THD :public ilink { ...@@ -300,6 +298,8 @@ class THD :public ilink {
ulong slave_proxy_id; // in slave thread we need to know in behalf of which ulong slave_proxy_id; // in slave thread we need to know in behalf of which
// thread the query is being run to replicate temp tables properly // thread the query is being run to replicate temp tables properly
NET* slave_net; // network connection from slave to master
THD(); THD();
~THD(); ~THD();
bool store_globals(); bool store_globals();
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "mysql_priv.h" #include "mysql_priv.h"
#include <my_dir.h> #include <my_dir.h>
#include <m_ctype.h> #include <m_ctype.h>
#include "sql_repl.h"
class READ_INFO { class READ_INFO {
File file; File file;
...@@ -32,6 +33,7 @@ class READ_INFO { ...@@ -32,6 +33,7 @@ class READ_INFO {
int field_term_char,line_term_char,enclosed_char,escape_char; int field_term_char,line_term_char,enclosed_char,escape_char;
int *stack,*stack_pos; int *stack,*stack_pos;
bool found_end_of_line,start_of_line,eof; bool found_end_of_line,start_of_line,eof;
bool need_end_io_cache;
IO_CACHE cache; IO_CACHE cache;
NET *io_net; NET *io_net;
...@@ -50,6 +52,18 @@ class READ_INFO { ...@@ -50,6 +52,18 @@ class READ_INFO {
char unescape(char chr); char unescape(char chr);
int terminator(char *ptr,uint length); int terminator(char *ptr,uint length);
bool find_start_of_fields(); bool find_start_of_fields();
// we need to force cache close before destructor is invoked to log
// the last read block
void end_io_cache()
{
::end_io_cache(&cache);
need_end_io_cache = 0;
}
// either this method, or we need to make cache public
// arg must be set from mysql_load() since constructor does not see
// either the table or THD value
void set_io_cache_arg(void* arg) { cache.arg = arg; }
}; };
static int read_fixed_length(THD *thd,COPY_INFO &info,TABLE *table, static int read_fixed_length(THD *thd,COPY_INFO &info,TABLE *table,
...@@ -67,10 +81,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -67,10 +81,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
File file; File file;
TABLE *table; TABLE *table;
int error; int error;
uint save_skip_lines = ex->skip_lines;
String *field_term=ex->field_term,*escaped=ex->escaped, String *field_term=ex->field_term,*escaped=ex->escaped,
*enclosed=ex->enclosed; *enclosed=ex->enclosed;
bool is_fifo=0; bool is_fifo=0;
LOAD_FILE_INFO lf_info;
char * db = table_list->db ? table_list->db : thd->db;
DBUG_ENTER("mysql_load"); DBUG_ENTER("mysql_load");
if (escaped->length() > 1 || enclosed->length() > 1) if (escaped->length() > 1 || enclosed->length() > 1)
...@@ -79,7 +94,6 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -79,7 +94,6 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
MYF(0)); MYF(0));
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (!(table = open_ltable(thd,table_list,lock_type))) if (!(table = open_ltable(thd,table_list,lock_type)))
DBUG_RETURN(-1); DBUG_RETURN(-1);
if (!fields.elements) if (!fields.elements)
...@@ -161,8 +175,9 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -161,8 +175,9 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (!my_stat(name,&stat_info,MYF(MY_WME))) if (!my_stat(name,&stat_info,MYF(MY_WME)))
DBUG_RETURN(-1); DBUG_RETURN(-1);
// the file must be: // if we are not in slave thread, the file must be:
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others if (!thd->slave_thread &&
!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
#ifndef __EMX__ #ifndef __EMX__
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
#endif #endif
...@@ -195,13 +210,27 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -195,13 +210,27 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
DBUG_RETURN(-1); // Can't allocate buffers DBUG_RETURN(-1); // Can't allocate buffers
} }
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
{
lf_info.thd = thd;
lf_info.ex = ex;
lf_info.db = db;
lf_info.table_name = table_list->real_name;
lf_info.fields = &fields;
lf_info.handle_dup = handle_duplicates;
lf_info.wrote_create_file = 0;
lf_info.last_pos_in_file = HA_POS_ERROR;
read_info.set_io_cache_arg((void*)&lf_info);
}
restore_record(table,2); restore_record(table,2);
thd->count_cuted_fields=1; /* calc cuted fields */ thd->count_cuted_fields=1; /* calc cuted fields */
thd->cuted_fields=0L; thd->cuted_fields=0L;
if (ex->line_term->length() && field_term->length()) if (ex->line_term->length() && field_term->length())
{ {
while (ex->skip_lines--) // ex->skip_lines needs to be preserved for logging
uint skip_lines = ex->skip_lines;
while (skip_lines--)
{ {
if (read_info.next_line()) if (read_info.next_line())
break; break;
...@@ -240,7 +269,14 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -240,7 +269,14 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
table->copy_blobs=0; table->copy_blobs=0;
thd->count_cuted_fields=0; /* Don`t calc cuted fields */ thd->count_cuted_fields=0; /* Don`t calc cuted fields */
if (error) if (error)
{
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
{
Delete_file_log_event d(thd);
mysql_bin_log.write(&d);
}
DBUG_RETURN(-1); // Error on read DBUG_RETURN(-1); // Error on read
}
sprintf(name,ER(ER_LOAD_INFO),info.records,info.deleted, sprintf(name,ER(ER_LOAD_INFO),info.records,info.deleted,
info.records-info.copied,thd->cuted_fields); info.records-info.copied,thd->cuted_fields);
send_ok(&thd->net,info.copied+info.deleted,0L,name); send_ok(&thd->net,info.copied+info.deleted,0L,name);
...@@ -250,13 +286,21 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -250,13 +286,21 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (!table->file->has_transactions()) if (!table->file->has_transactions())
thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
if (!read_file_from_client && mysql_bin_log.is_open()) if (mysql_bin_log.is_open())
{
if (opt_old_rpl_compat && !read_file_from_client)
{ {
ex->skip_lines = save_skip_lines; Load_log_event qinfo(thd, ex, db, table->table_name, fields,
Load_log_event qinfo(thd, ex, table->table_name, fields,
handle_duplicates); handle_duplicates);
mysql_bin_log.write(&qinfo); mysql_bin_log.write(&qinfo);
} }
if (!opt_old_rpl_compat)
{
read_info.end_io_cache(); // make sure last block gets logged
Execute_load_log_event e(thd);
mysql_bin_log.write(&e);
}
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -480,6 +524,13 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term, ...@@ -480,6 +524,13 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term,
my_free((gptr) buffer,MYF(0)); /* purecov: inspected */ my_free((gptr) buffer,MYF(0)); /* purecov: inspected */
error=1; error=1;
} }
else
{
need_end_io_cache = 1;
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
cache.pre_read = cache.pre_close =
(IO_CACHE_CALLBACK)log_loaded_block;
}
} }
} }
...@@ -488,7 +539,8 @@ READ_INFO::~READ_INFO() ...@@ -488,7 +539,8 @@ READ_INFO::~READ_INFO()
{ {
if (!error) if (!error)
{ {
end_io_cache(&cache); if (need_end_io_cache)
::end_io_cache(&cache);
my_free((gptr) buffer,MYF(0)); my_free((gptr) buffer,MYF(0));
error=1; error=1;
} }
...@@ -798,3 +850,4 @@ bool READ_INFO::find_start_of_fields() ...@@ -798,3 +850,4 @@ bool READ_INFO::find_start_of_fields()
} }
return 0; return 0;
} }
...@@ -1578,3 +1578,33 @@ int load_master_data(THD* thd) ...@@ -1578,3 +1578,33 @@ int load_master_data(THD* thd)
return error; return error;
} }
int log_loaded_block(IO_CACHE* file)
{
LOAD_FILE_INFO* lf_info;
uint block_len ;
if (!(block_len = file->rc_end - file->buffer))
return 0;
lf_info = (LOAD_FILE_INFO*)file->arg;
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
lf_info->last_pos_in_file >= file->pos_in_file)
return 0;
lf_info->last_pos_in_file = file->pos_in_file;
if (lf_info->wrote_create_file)
{
Append_block_log_event a(lf_info->thd, file->buffer,block_len);
mysql_bin_log.write(&a);
}
else
{
Create_file_log_event c(lf_info->thd,lf_info->ex,lf_info->db,
lf_info->table_name, *lf_info->fields,
lf_info->handle_dup, file->buffer,
block_len);
mysql_bin_log.write(&c);
lf_info->wrote_create_file = 1;
}
return 0;
}
...@@ -12,7 +12,7 @@ typedef struct st_slave_info ...@@ -12,7 +12,7 @@ typedef struct st_slave_info
uint16 port; uint16 port;
} SLAVE_INFO; } SLAVE_INFO;
extern bool opt_show_slave_auth_info; extern bool opt_show_slave_auth_info, opt_old_rpl_compat;
extern HASH slave_list; extern HASH slave_list;
extern char* master_host; extern char* master_host;
extern my_string opt_bin_logname, master_info_file; extern my_string opt_bin_logname, master_info_file;
...@@ -51,4 +51,19 @@ int show_binlogs(THD* thd); ...@@ -51,4 +51,19 @@ int show_binlogs(THD* thd);
extern int init_master_info(MASTER_INFO* mi); extern int init_master_info(MASTER_INFO* mi);
void kill_zombie_dump_threads(uint32 slave_server_id); void kill_zombie_dump_threads(uint32 slave_server_id);
typedef struct st_load_file_info
{
THD* thd;
sql_exchange* ex;
List <Item> *fields;
enum enum_duplicates handle_dup;
char* db;
char* table_name;
bool wrote_create_file;
my_off_t last_pos_in_file;
} LOAD_FILE_INFO;
int log_loaded_block(IO_CACHE* file);
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment