Commit 0dab9f40 authored by unknown's avatar unknown

LOAD DATA INFILE is now replicated properly, except for cleanup on

Stop event and bugs the test suite could not catch
Did some big restructuring of binlog event classes - most important
change is that now each event class has exec_event method and one does
not need to modify slave core code to add a new event. Slave code is
now much smaller and easier to read



include/my_sys.h:
  pre_code and arg in IO_CACHE
mysql-test/r/rpl_log.result:
  updated result for LOAD DATA INFILE fix
mysys/mf_iocache.c:
  pre_close routine and arg pointer for callback magic
sql/log.cc:
  changed MYSQL_LOG so that write() method is for generic
  Log_event - removed redundant code
sql/log_event.cc:
  added classes for file events
  added exec_event() method to all classes
  restructured/cleaned up event classes
sql/log_event.h:
  added classes for file events
  added exec_event() method to all classes
  restructured/cleaned up event classes
sql/mf_iocache.cc:
  pre_close/arg
sql/mysqld.cc:
  added slave-load-tmpdir and old-rpl-compat options
sql/slave.cc:
  changed exec_event() to use Log_event::exec_event()
  some routines are now needed in log_event.cc and cannot be static/inline
  general cleanup
sql/slave.h:
  some routines are now extern because they are called from log_event.cc
sql/sql_class.cc:
  added slave_net
sql/sql_class.h:
  added slave_net to THD
  MYSQL_LOG::write now handles generic Log_event
sql/sql_load.cc:
  changes for new handling of LOAD DATA INFILE replication
sql/sql_repl.cc:
  added log_loaded_block() callback for IO_CACHE
sql/sql_repl.h:
  added structure to pass args to IO_CACHE callback from mysql_load
parent 07ed42de
...@@ -281,6 +281,8 @@ typedef struct st_io_cache /* Used when cacheing files */ ...@@ -281,6 +281,8 @@ typedef struct st_io_cache /* Used when cacheing files */
/* callbacks when the actual read I/O happens */ /* callbacks when the actual read I/O happens */
IO_CACHE_CALLBACK pre_read; IO_CACHE_CALLBACK pre_read;
IO_CACHE_CALLBACK post_read; IO_CACHE_CALLBACK post_read;
IO_CACHE_CALLBACK pre_close;
void* arg; /* for use by pre/post_read */
char *file_name; /* if used with 'open_cached_file' */ char *file_name; /* if used with 'open_cached_file' */
char *dir,*prefix; char *dir,*prefix;
File file; File file;
......
...@@ -5,8 +5,9 @@ master-bin.001 172 Intvar 1 3 INSERT_ID=1 ...@@ -5,8 +5,9 @@ master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
master-bin.001 263 Query 1 5 use test; drop table t1 master-bin.001 263 Query 1 5 use test; drop table t1
master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null) master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null)
master-bin.001 386 Load 1 7 use test; LOAD DATA INFILE '../../std_data/words.dat' INTO TABLE t1 FIELDS TERMINATED BY '\\t' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' (word) master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=11;block_len=81
master-bin.001 468 Query 1 8 use test; drop table t1 master-bin.001 554 Exec_load 1 8 ;file_id=11
master-bin.001 577 Query 1 9 use test; drop table t1
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
...@@ -21,10 +22,11 @@ master-bin.001 172 Intvar 1 3 INSERT_ID=1 ...@@ -21,10 +22,11 @@ master-bin.001 172 Intvar 1 3 INSERT_ID=1
master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL)
master-bin.001 263 Query 1 5 use test; drop table t1 master-bin.001 263 Query 1 5 use test; drop table t1
master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null) master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null)
master-bin.001 386 Load 1 7 use test; LOAD DATA INFILE '../../std_data/words.dat' INTO TABLE t1 FIELDS TERMINATED BY '\\t' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' (word) master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=11;block_len=81
master-bin.001 468 Query 1 8 use test; drop table t1 master-bin.001 554 Exec_load 1 8 ;file_id=11
master-bin.001 516 Rotate 1 9 master-bin.002;pos=4 master-bin.001 577 Query 1 9 use test; drop table t1
master-bin.001 557 Stop 1 10 master-bin.001 625 Rotate 1 10 master-bin.002;pos=4
master-bin.001 666 Stop 1 11
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2
master-bin.002 79 Query 1 2 use test; create table t1 (n int) master-bin.002 79 Query 1 2 use test; create table t1 (n int)
...@@ -38,18 +40,20 @@ slave-bin.001 ...@@ -38,18 +40,20 @@ slave-bin.001
slave-bin.002 slave-bin.002
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
slave-bin.001 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 slave-bin.001 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2
slave-bin.001 79 Slave 2 2 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4 slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4
slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key) slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key)
slave-bin.001 225 Intvar 1 3 INSERT_ID=1 slave-bin.001 225 Intvar 1 3 INSERT_ID=1
slave-bin.001 253 Query 1 4 use test; insert into t1 values (NULL) slave-bin.001 253 Query 1 4 use test; insert into t1 values (NULL)
slave-bin.001 316 Query 1 5 use test; drop table t1 slave-bin.001 316 Query 1 5 use test; drop table t1
slave-bin.001 364 Query 1 6 use test; create table t1 (word char(20) not null) slave-bin.001 364 Query 1 6 use test; create table t1 (word char(20) not null)
slave-bin.001 439 Query 1 8 use test; drop table t1 slave-bin.001 439 Create_file 1 7 db=test;table=t1;file_id=11;block_len=81
slave-bin.001 487 Rotate 2 3 slave-bin.002;pos=4; forced by master slave-bin.001 647 Exec_load 1 8 ;file_id=11
slave-bin.001 527 Stop 2 4 slave-bin.001 670 Query 1 9 use test; drop table t1
slave-bin.001 718 Rotate 1 4 slave-bin.002;pos=4; forced by master
slave-bin.001 758 Stop 2 5
Log_name Pos Event_type Server_id Log_seq Info Log_name Pos Event_type Server_id Log_seq Info
slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2
slave-bin.002 79 Slave 2 2 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4 slave-bin.002 79 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4
slave-bin.002 132 Query 1 2 use test; create table t1 (n int) slave-bin.002 132 Query 1 2 use test; create table t1 (n int)
slave-bin.002 190 Query 1 3 use test; insert into t1 values (1) slave-bin.002 190 Query 1 3 use test; insert into t1 values (1)
slave-bin.002 250 Query 1 4 use test; drop table t1 slave-bin.002 250 Query 1 4 use test; drop table t1
......
...@@ -56,7 +56,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, ...@@ -56,7 +56,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset)); DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset));
info->file=file; info->file=file;
info->pre_read = info->post_read = 0; info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
if (!cachesize) if (!cachesize)
if (! (cachesize= my_default_record_cache_size)) if (! (cachesize= my_default_record_cache_size))
DBUG_RETURN(1); /* No cache requested */ DBUG_RETURN(1); /* No cache requested */
...@@ -608,7 +609,10 @@ int flush_io_cache(IO_CACHE *info) ...@@ -608,7 +609,10 @@ int flush_io_cache(IO_CACHE *info)
int end_io_cache(IO_CACHE *info) int end_io_cache(IO_CACHE *info)
{ {
int error=0; int error=0;
IO_CACHE_CALLBACK pre_close;
DBUG_ENTER("end_io_cache"); DBUG_ENTER("end_io_cache");
if((pre_close=info->pre_close))
(*pre_close)(info);
if (info->buffer) if (info->buffer)
{ {
if (info->file != -1) /* File doesn't exist */ if (info->file != -1) /* File doesn't exist */
...@@ -618,3 +622,4 @@ int end_io_cache(IO_CACHE *info) ...@@ -618,3 +622,4 @@ int end_io_cache(IO_CACHE *info)
} }
DBUG_RETURN(error); DBUG_RETURN(error);
} /* end_io_cache */ } /* end_io_cache */
...@@ -535,8 +535,8 @@ void MYSQL_LOG::new_file(bool inside_mutex) ...@@ -535,8 +535,8 @@ void MYSQL_LOG::new_file(bool inside_mutex)
We log the whole file name for log file as the user may decide We log the whole file name for log file as the user may decide
to change base names at some point. to change base names at some point.
*/ */
Rotate_log_event r(new_name+dirname_length(new_name));
THD* thd = current_thd; THD* thd = current_thd;
Rotate_log_event r(thd,new_name+dirname_length(new_name));
r.set_log_seq(0, this); r.set_log_seq(0, this);
// this log rotation could have been initiated by a master of // this log rotation could have been initiated by a master of
// the slave running with log-bin // the slave running with log-bin
...@@ -638,24 +638,8 @@ bool MYSQL_LOG::write(THD *thd,enum enum_server_command command, ...@@ -638,24 +638,8 @@ bool MYSQL_LOG::write(THD *thd,enum enum_server_command command,
return 0; return 0;
} }
/* Write to binary log in a format to be used for replication */
bool MYSQL_LOG::write(Slave_log_event* event_info) bool MYSQL_LOG::write(Log_event* event_info)
{
bool error;
if (!inited) // Can't use mutex if not init
return 0;
VOID(pthread_mutex_lock(&LOCK_log));
if(!event_info->log_seq)
event_info->set_log_seq(current_thd, this);
error = event_info->write(&log_file);
flush_io_cache(&log_file);
VOID(pthread_mutex_unlock(&LOCK_log));
return error;
}
bool MYSQL_LOG::write(Query_log_event* event_info)
{ {
/* In most cases this is only called if 'is_open()' is true */ /* In most cases this is only called if 'is_open()' is true */
bool error=0; bool error=0;
...@@ -667,40 +651,42 @@ bool MYSQL_LOG::write(Query_log_event* event_info) ...@@ -667,40 +651,42 @@ bool MYSQL_LOG::write(Query_log_event* event_info)
if (is_open()) if (is_open())
{ {
THD *thd=event_info->thd; THD *thd=event_info->thd;
const char* db = event_info->get_db();
#ifdef USING_TRANSACTIONS #ifdef USING_TRANSACTIONS
IO_CACHE *file = (event_info->cache_stmt ? &thd->transaction.trans_log : IO_CACHE *file = ((event_info->cache_stmt && thd) ?
&thd->transaction.trans_log :
&log_file); &log_file);
#else #else
IO_CACHE *file = &log_file; IO_CACHE *file = &log_file;
#endif #endif
if ((!(thd->options & OPTION_BIN_LOG) && if ((thd && !(thd->options & OPTION_BIN_LOG) &&
(thd->master_access & PROCESS_ACL)) || (thd->master_access & PROCESS_ACL)) ||
!db_ok(event_info->db, binlog_do_db, binlog_ignore_db)) (db && !db_ok(db, binlog_do_db, binlog_ignore_db)))
{ {
VOID(pthread_mutex_unlock(&LOCK_log)); VOID(pthread_mutex_unlock(&LOCK_log));
return 0; return 0;
} }
error=1; error=1;
if (thd->last_insert_id_used) if (thd && thd->last_insert_id_used)
{ {
Intvar_log_event e((uchar)LAST_INSERT_ID_EVENT, thd->last_insert_id); Intvar_log_event e(thd,(uchar)LAST_INSERT_ID_EVENT,thd->last_insert_id);
e.set_log_seq(thd, this); e.set_log_seq(thd, this);
if (thd->server_id) if (thd->server_id)
e.server_id = thd->server_id; e.server_id = thd->server_id;
if (e.write(file)) if (e.write(file))
goto err; goto err;
} }
if (thd->insert_id_used) if (thd && thd->insert_id_used)
{ {
Intvar_log_event e((uchar)INSERT_ID_EVENT, thd->last_insert_id); Intvar_log_event e(thd,(uchar)INSERT_ID_EVENT,thd->last_insert_id);
e.set_log_seq(thd, this); e.set_log_seq(thd, this);
if (thd->server_id) if (thd->server_id)
e.server_id = thd->server_id; e.server_id = thd->server_id;
if (e.write(file)) if (e.write(file))
goto err; goto err;
} }
if (thd->convert_set) if (thd && thd->convert_set)
{ {
char buf[1024] = "SET CHARACTER SET "; char buf[1024] = "SET CHARACTER SET ";
char* p = strend(buf); char* p = strend(buf);
...@@ -795,42 +781,6 @@ bool MYSQL_LOG::write(IO_CACHE *cache) ...@@ -795,42 +781,6 @@ bool MYSQL_LOG::write(IO_CACHE *cache)
} }
bool MYSQL_LOG::write(Load_log_event* event_info)
{
bool error=0;
bool should_rotate = 0;
if (inited)
{
VOID(pthread_mutex_lock(&LOCK_log));
if (is_open())
{
THD *thd=event_info->thd;
if ((thd->options & OPTION_BIN_LOG) ||
!(thd->master_access & PROCESS_ACL))
{
event_info->set_log_seq(thd, this);
if (event_info->write(&log_file) || flush_io_cache(&log_file))
{
if (!write_error)
sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
error=write_error=1;
}
should_rotate = (my_b_tell(&log_file) >= max_binlog_size);
VOID(pthread_cond_broadcast(&COND_binlog_update));
}
}
if(should_rotate)
new_file(1); // inside mutex
VOID(pthread_mutex_unlock(&LOCK_log));
}
return error;
}
/* Write update log in a format suitable for incremental backup */ /* Write update log in a format suitable for incremental backup */
bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length, bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
......
...@@ -63,6 +63,18 @@ static void pretty_print_char(String* packet, int c) ...@@ -63,6 +63,18 @@ static void pretty_print_char(String* packet, int c)
packet->append('\''); packet->append('\'');
} }
static inline char* slave_load_file_stem(char*buf, uint file_id,
int event_server_id)
{
fn_format(buf,"SQL_LOAD-",slave_load_tmpdir,"",4+32);
buf = strend(buf);
buf = int10_to_str(::server_id, buf, 10);
*buf++ = '-';
buf = int10_to_str(event_server_id, buf, 10);
*buf++ = '-';
return int10_to_str(file_id, buf, 10);
}
#endif #endif
const char* Log_event::get_type_str() const char* Log_event::get_type_str()
...@@ -76,11 +88,59 @@ const char* Log_event::get_type_str() ...@@ -76,11 +88,59 @@ const char* Log_event::get_type_str()
case INTVAR_EVENT: return "Intvar"; case INTVAR_EVENT: return "Intvar";
case LOAD_EVENT: return "Load"; case LOAD_EVENT: return "Load";
case SLAVE_EVENT: return "Slave"; case SLAVE_EVENT: return "Slave";
case CREATE_FILE_EVENT: return "Create_file";
case APPEND_BLOCK_EVENT: return "Append_block";
case DELETE_FILE_EVENT: return "Delete_file";
case EXEC_LOAD_EVENT: return "Exec_load";
default: /* impossible */ return "Unknown"; default: /* impossible */ return "Unknown";
} }
} }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Log_event::Log_event(THD* thd_arg, uint16 flags_arg):
exec_time(0),
flags(flags_arg),cached_event_len(0),
temp_buf(0),thd(thd_arg)
{
if (thd)
{
server_id = thd->server_id;
log_seq = thd->log_seq;
when = thd->start_time;
}
else
{
server_id = ::server_id;
log_seq = 0;
when = time(NULL);
}
}
#endif
Log_event::Log_event(const char* buf):cached_event_len(0),temp_buf(0)
{
when = uint4korr(buf);
server_id = uint4korr(buf + SERVER_ID_OFFSET);
log_seq = uint4korr(buf + LOG_SEQ_OFFSET);
flags = uint2korr(buf + FLAGS_OFFSET);
#ifndef MYSQL_CLIENT
thd = 0;
#endif
}
#ifndef MYSQL_CLIENT
int Log_event::exec_event(struct st_master_info* mi)
{
if (mi)
{
thd->log_seq = 0;
mi->inc_pos(get_event_len(), log_seq);
flush_master_info(mi);
}
return 0;
}
void Log_event::pack_info(String* packet) void Log_event::pack_info(String* packet)
{ {
...@@ -131,7 +191,7 @@ void Load_log_event::pack_info(String* packet) ...@@ -131,7 +191,7 @@ void Load_log_event::pack_info(String* packet)
} }
tmp.append("LOAD DATA INFILE '"); tmp.append("LOAD DATA INFILE '");
tmp.append(fname); tmp.append(fname, fname_len);
tmp.append("' ", 2); tmp.append("' ", 2);
if(sql_ex.opt_flags && REPLACE_FLAG ) if(sql_ex.opt_flags && REPLACE_FLAG )
tmp.append(" REPLACE "); tmp.append(" REPLACE ");
...@@ -385,12 +445,15 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock) ...@@ -385,12 +445,15 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock)
error = "read error"; error = "read error";
goto err; goto err;
} }
res = read_log_event(buf, data_len); if((res = read_log_event(buf, data_len)))
res->register_temp_buf(buf);
err: err:
if (log_lock) pthread_mutex_unlock(log_lock); if (log_lock) pthread_mutex_unlock(log_lock);
if(error) if(error)
{
sql_print_error(error); sql_print_error(error);
my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
}
return res; return res;
} }
...@@ -400,61 +463,54 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len) ...@@ -400,61 +463,54 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len)
(uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET)) (uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET))
return NULL; // general sanity check - will fail on a partial read return NULL; // general sanity check - will fail on a partial read
Log_event* ev = NULL;
switch(buf[EVENT_TYPE_OFFSET]) switch(buf[EVENT_TYPE_OFFSET])
{ {
case QUERY_EVENT: case QUERY_EVENT:
{ ev = new Query_log_event(buf, event_len);
Query_log_event* q = new Query_log_event(buf, event_len); break;
if (!q->query)
{
delete q;
return NULL;
}
return q;
}
case LOAD_EVENT: case LOAD_EVENT:
{ ev = new Load_log_event(buf, event_len);
Load_log_event* l = new Load_log_event(buf, event_len); break;
if (!l->table_name)
{
delete l;
return NULL;
}
return l;
}
case ROTATE_EVENT: case ROTATE_EVENT:
{ ev = new Rotate_log_event(buf, event_len);
Rotate_log_event* r = new Rotate_log_event(buf, event_len); break;
if (!r->new_log_ident)
{
delete r;
return NULL;
}
return r;
}
case SLAVE_EVENT: case SLAVE_EVENT:
{ ev = new Slave_log_event(buf, event_len);
Slave_log_event* s = new Slave_log_event(buf, event_len); break;
if (!s->master_host) case CREATE_FILE_EVENT:
{ ev = new Create_file_log_event(buf, event_len);
delete s; break;
return NULL; case APPEND_BLOCK_EVENT:
} ev = new Append_block_log_event(buf, event_len);
break;
return s; case DELETE_FILE_EVENT:
} ev = new Delete_file_log_event(buf, event_len);
case START_EVENT: return new Start_log_event(buf); break;
case STOP_EVENT: return new Stop_log_event(buf); case EXEC_LOAD_EVENT:
case INTVAR_EVENT: return new Intvar_log_event(buf); ev = new Execute_load_log_event(buf, event_len);
break;
case START_EVENT:
ev = new Start_log_event(buf);
break;
case STOP_EVENT:
ev = new Stop_log_event(buf);
break;
case INTVAR_EVENT:
ev = new Intvar_log_event(buf);
break;
default: default:
break; break;
} }
return NULL; // default value if (!ev) return 0;
if (!ev->is_valid())
{
delete ev;
return 0;
}
ev->cached_event_len = event_len;
return ev;
} }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
...@@ -568,6 +624,23 @@ int Rotate_log_event::write_data(IO_CACHE* file) ...@@ -568,6 +624,23 @@ int Rotate_log_event::write_data(IO_CACHE* file)
my_b_write(file, (byte*)new_log_ident, (uint) ident_len); my_b_write(file, (byte*)new_log_ident, (uint) ident_len);
} }
#ifndef MYSQL_CLIENT
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
bool using_trans):
Log_event(thd_arg), data_buf(0), query(query_arg), db(thd_arg->db),
q_len(thd_arg->query_length),
error_code(thd_arg->killed ? ER_SERVER_SHUTDOWN: thd_arg->net.last_errno),
thread_id(thd_arg->thread_id),
cache_stmt(using_trans &&
(thd_arg->options & (OPTION_NOT_AUTO_COMMIT | OPTION_BEGIN)))
{
time_t end_time;
time(&end_time);
exec_time = (ulong) (end_time - thd->start_time);
db_len = (db) ? (uint32) strlen(db) : 0;
}
#endif
Query_log_event::Query_log_event(const char* buf, int event_len): Query_log_event::Query_log_event(const char* buf, int event_len):
Log_event(buf),data_buf(0), query(NULL), db(NULL) Log_event(buf),data_buf(0), query(NULL), db(NULL)
{ {
...@@ -690,7 +763,7 @@ void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) ...@@ -690,7 +763,7 @@ void Intvar_log_event::print(FILE* file, bool short_form, char* last_db)
} }
#endif #endif
int Load_log_event::write_data(IO_CACHE* file) int Load_log_event::write_data_header(IO_CACHE* file)
{ {
char buf[LOAD_HEADER_LEN]; char buf[LOAD_HEADER_LEN];
int4store(buf + L_THREAD_ID_OFFSET, thread_id); int4store(buf + L_THREAD_ID_OFFSET, thread_id);
...@@ -699,45 +772,110 @@ int Load_log_event::write_data(IO_CACHE* file) ...@@ -699,45 +772,110 @@ int Load_log_event::write_data(IO_CACHE* file)
buf[L_TBL_LEN_OFFSET] = (char)table_name_len; buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
buf[L_DB_LEN_OFFSET] = (char)db_len; buf[L_DB_LEN_OFFSET] = (char)db_len;
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
memcpy(buf + L_SQL_EX_OFFSET, &sql_ex, sizeof(sql_ex));
if(my_b_write(file, (byte*)buf, sizeof(buf)) || return my_b_write(file, (byte*)buf, LOAD_HEADER_LEN);
my_b_write(file, (byte*)&sql_ex, sizeof(sql_ex))) }
return 1;
int Load_log_event::write_data_body(IO_CACHE* file)
{
if (num_fields && fields && field_lens) if (num_fields && fields && field_lens)
{ {
if(my_b_write(file, (byte*)field_lens, num_fields) || if(my_b_write(file, (byte*)field_lens, num_fields) ||
my_b_write(file, (byte*)fields, field_block_len)) my_b_write(file, (byte*)fields, field_block_len))
return 1; return 1;
} }
if(my_b_write(file, (byte*)table_name, table_name_len + 1) || return my_b_write(file, (byte*)table_name, table_name_len + 1) ||
my_b_write(file, (byte*)db, db_len + 1) || my_b_write(file, (byte*)db, db_len + 1) ||
my_b_write(file, (byte*)fname, fname_len)) my_b_write(file, (byte*)fname, fname_len);
return 1;
return 0;
} }
#ifndef MYSQL_CLIENT
Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
const char* db_arg, const char* table_name_arg,
List<Item>& fields_arg, enum enum_duplicates handle_dup):
Log_event(thd),thread_id(thd->thread_id),
num_fields(0),fields(0),field_lens(0),field_block_len(0),
table_name(table_name_arg),
db(db_arg),
fname(ex->file_name),fname_null_term(1)
{
time_t end_time;
time(&end_time);
exec_time = (ulong) (end_time - thd->start_time);
db_len = (db) ? (uint32) strlen(db) : 0;
table_name_len = (table_name) ? (uint32) strlen(table_name) : 0;
fname_len = (fname) ? (uint) strlen(fname) : 0;
sql_ex.field_term = (*ex->field_term)[0];
sql_ex.enclosed = (*ex->enclosed)[0];
sql_ex.line_term = (*ex->line_term)[0];
sql_ex.line_start = (*ex->line_start)[0];
sql_ex.escaped = (*ex->escaped)[0];
sql_ex.opt_flags = 0;
if(ex->dumpfile)
sql_ex.opt_flags |= DUMPFILE_FLAG;
if(ex->opt_enclosed)
sql_ex.opt_flags |= OPT_ENCLOSED_FLAG;
sql_ex.empty_flags = 0;
switch(handle_dup)
{
case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break;
case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break;
case DUP_ERROR: break;
}
if(!ex->field_term->length())
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
if(!ex->enclosed->length())
sql_ex.empty_flags |= ENCLOSED_EMPTY;
if(!ex->line_term->length())
sql_ex.empty_flags |= LINE_TERM_EMPTY;
if(!ex->line_start->length())
sql_ex.empty_flags |= LINE_START_EMPTY;
if(!ex->escaped->length())
sql_ex.empty_flags |= ESCAPED_EMPTY;
skip_lines = ex->skip_lines;
List_iterator<Item> li(fields_arg);
field_lens_buf.length(0);
fields_buf.length(0);
Item* item;
while((item = li++))
{
num_fields++;
uchar len = (uchar) strlen(item->name);
field_block_len += len + 1;
fields_buf.append(item->name, len + 1);
field_lens_buf.append((char*)&len, 1);
}
field_lens = (const uchar*)field_lens_buf.ptr();
fields = fields_buf.ptr();
}
#endif
Load_log_event::Load_log_event(const char* buf, int event_len): Load_log_event::Load_log_event(const char* buf, int event_len):
Log_event(buf),data_buf(0),num_fields(0),fields(0), Log_event(buf),num_fields(0),fields(0),
field_lens(0),field_block_len(0), field_lens(0),field_block_len(0),
table_name(0),db(0),fname(0) table_name(0),db(0),fname(0),fname_null_term(0)
{ {
uint data_len; if (!event_len) // derived class, will call copy_log_event() itself
if((uint)event_len < (LOAD_EVENT_OVERHEAD + LOG_EVENT_HEADER_LEN))
return;
memcpy(&sql_ex, buf + LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN,
sizeof(sql_ex));
data_len = event_len - LOAD_HEADER_LEN - LOG_EVENT_HEADER_LEN -
sizeof(sql_ex);
if(!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME))))
return; return;
memcpy(data_buf, buf +LOG_EVENT_HEADER_LEN + LOAD_HEADER_LEN copy_log_event(buf, event_len);
+ sizeof(sql_ex), data_len);
copy_log_event(buf, data_len);
} }
void Load_log_event::copy_log_event(const char *buf, ulong data_len) int Load_log_event::copy_log_event(const char *buf, ulong event_len)
{ {
uint data_len;
int body_offset = get_data_body_offset();
if((int)event_len < body_offset)
return 1;
memcpy(&sql_ex, buf + L_SQL_EX_OFFSET + LOG_EVENT_HEADER_LEN,
sizeof(sql_ex));
data_len = event_len - body_offset;
thread_id = uint4korr(buf + L_THREAD_ID_OFFSET + LOG_EVENT_HEADER_LEN); thread_id = uint4korr(buf + L_THREAD_ID_OFFSET + LOG_EVENT_HEADER_LEN);
exec_time = uint4korr(buf + L_EXEC_TIME_OFFSET + LOG_EVENT_HEADER_LEN); exec_time = uint4korr(buf + L_EXEC_TIME_OFFSET + LOG_EVENT_HEADER_LEN);
skip_lines = uint4korr(buf + L_SKIP_LINES_OFFSET + LOG_EVENT_HEADER_LEN); skip_lines = uint4korr(buf + L_SKIP_LINES_OFFSET + LOG_EVENT_HEADER_LEN);
...@@ -746,22 +884,21 @@ void Load_log_event::copy_log_event(const char *buf, ulong data_len) ...@@ -746,22 +884,21 @@ void Load_log_event::copy_log_event(const char *buf, ulong data_len)
num_fields = uint4korr(buf + L_NUM_FIELDS_OFFSET + LOG_EVENT_HEADER_LEN); num_fields = uint4korr(buf + L_NUM_FIELDS_OFFSET + LOG_EVENT_HEADER_LEN);
if (num_fields > data_len) // simple sanity check against corruption if (num_fields > data_len) // simple sanity check against corruption
return; return 1;
field_lens = (uchar*)buf + body_offset;
field_lens = (uchar*) data_buf;
uint i; uint i;
for (i = 0; i < num_fields; i++) for (i = 0; i < num_fields; i++)
{ {
field_block_len += (uint)field_lens[i] + 1; field_block_len += (uint)field_lens[i] + 1;
} }
fields = (char*)field_lens + num_fields; fields = (char*)field_lens + num_fields;
*((char*)data_buf+data_len) = 0;
table_name = fields + field_block_len; table_name = fields + field_block_len;
db = table_name + table_name_len + 1; db = table_name + table_name_len + 1;
fname = db + db_len + 1; fname = db + db_len + 1;
fname_len = data_len - 2 - db_len - table_name_len - num_fields - fname_len = (get_type_code() == LOAD_EVENT) ?
field_block_len; data_len - 2 - db_len - table_name_len - num_fields - field_block_len :
strlen(fname);
return 0;
} }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
...@@ -786,7 +923,7 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) ...@@ -786,7 +923,7 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
if(db && db[0] && !same_db) if(db && db[0] && !same_db)
fprintf(file, "use %s;\n", db); fprintf(file, "use %s;\n", db);
fprintf(file, "LOAD DATA INFILE '%s' ", fname); fprintf(file, "LOAD DATA INFILE '%-*s' ", fname_len, fname);
if(sql_ex.opt_flags && REPLACE_FLAG ) if(sql_ex.opt_flags && REPLACE_FLAG )
fprintf(file," REPLACE "); fprintf(file," REPLACE ");
...@@ -870,9 +1007,8 @@ void Load_log_event::set_fields(List<Item> &fields) ...@@ -870,9 +1007,8 @@ void Load_log_event::set_fields(List<Item> &fields)
} }
Slave_log_event::Slave_log_event(THD* thd_arg,MASTER_INFO* mi): Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi):
Log_event(thd_arg->start_time, 0, 1, thd_arg->server_id), Log_event(thd_arg),mem_pool(0),master_host(0)
mem_pool(0),master_host(0)
{ {
if(!mi->inited) if(!mi->inited)
return; return;
...@@ -947,7 +1083,6 @@ void Slave_log_event::init_from_mem_pool(int data_size) ...@@ -947,7 +1083,6 @@ void Slave_log_event::init_from_mem_pool(int data_size)
master_host = 0; master_host = 0;
return; return;
} }
master_log_len = strlen(master_log); master_log_len = strlen(master_log);
} }
...@@ -965,43 +1100,59 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len): ...@@ -965,43 +1100,59 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len):
} }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Create_file_log_event::Create_file_log_event(THD* thd, TABLE_LIST * table, Create_file_log_event::Create_file_log_event(THD* thd_arg, sql_exchange* ex,
char* block_arg, const char* db_arg, const char* table_name_arg,
uint block_len_arg) : List<Item>& fields_arg, enum enum_duplicates handle_dup,
Log_event(thd->start_time), db(table->db),tbl_name(table->real_name), char* block_arg, uint block_len_arg):
db_len(strlen(table->db)),tbl_name_len(strlen(table->real_name)), Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup),
block(block_arg),block_len(block_len_arg), fake_base(0),block(block_arg),block_len(block_len_arg),
file_id(thd->file_id = thd->query_id) file_id(thd_arg->file_id = thd_arg->query_id)
{ {
set_log_seq(thd, &mysql_bin_log);
} }
#endif #endif
int Create_file_log_event::write_data(IO_CACHE* file) int Create_file_log_event::write_data_body(IO_CACHE* file)
{
int res;
if ((res = Load_log_event::write_data_body(file)) || fake_base)
return res;
return my_b_write(file, "", 1) || my_b_write(file, block, block_len);
}
int Create_file_log_event::write_data_header(IO_CACHE* file)
{ {
int res;
if ((res = Load_log_event::write_data_header(file)) || fake_base)
return res;
char buf[CREATE_FILE_HEADER_LEN]; char buf[CREATE_FILE_HEADER_LEN];
buf[CF_DB_LEN_OFFSET] = (uchar)db_len;
buf[CF_TBL_LEN_OFFSET] = (uchar)tbl_name_len;
int4store(buf + CF_FILE_ID_OFFSET, file_id); int4store(buf + CF_FILE_ID_OFFSET, file_id);
return my_b_write(file, buf, CREATE_FILE_HEADER_LEN) || return my_b_write(file, buf, CREATE_FILE_HEADER_LEN);
my_b_write(file, db, db_len) || }
my_b_write(file, tbl_name, tbl_name_len) ||
my_b_write(file, block, block_len); int Create_file_log_event::write_base(IO_CACHE* file)
{
int res;
fake_base = 1; // pretend we are Load event
res = write(file);
fake_base = 0;
return res;
} }
Create_file_log_event::Create_file_log_event(const char* buf, int len): Create_file_log_event::Create_file_log_event(const char* buf, int len):
Log_event(buf),db(0) Load_log_event(buf,0),fake_base(0),block(0)
{ {
db_len = (uint)buf[LOG_EVENT_HEADER_LEN + CF_DB_LEN_OFFSET]; int block_offset;
tbl_name_len = (uint)buf[CF_TBL_LEN_OFFSET + LOG_EVENT_HEADER_LEN]; if (copy_log_event(buf,len))
if ((uint)len < db_len + tbl_name_len + CREATE_FILE_EVENT_OVERHEAD)
return; return;
fname_null_term = 1;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + CF_FILE_ID_OFFSET); file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
db = (char*)buf + CREATE_FILE_EVENT_OVERHEAD; + LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
tbl_name = db + db_len; block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
block = tbl_name + tbl_name_len; CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname
block_len = len - (db_len + tbl_name_len + CREATE_FILE_EVENT_OVERHEAD); if(len < block_offset)
return;
block = (char*)buf + block_offset;
block_len = len - block_offset;
} }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void Create_file_log_event::print(FILE* file, bool short_form = 0, void Create_file_log_event::print(FILE* file, bool short_form = 0,
...@@ -1009,11 +1160,8 @@ void Create_file_log_event::print(FILE* file, bool short_form = 0, ...@@ -1009,11 +1160,8 @@ void Create_file_log_event::print(FILE* file, bool short_form = 0,
{ {
if (short_form) if (short_form)
return; return;
print_header(file); Load_log_event::print(file, 1, last_db);
fputc('\n', file); fprintf(file, " file_id=%d, block_len=%d\n", file_id, block_len);
fprintf(file, "Create_file: db='%-*s' table='%-*s' file_id=%d,\
block_len=%d\n", db_len, db, tbl_name_len, tbl_name, file_id,
block_len);
} }
#endif #endif
...@@ -1027,13 +1175,601 @@ void Create_file_log_event::pack_info(String* packet) ...@@ -1027,13 +1175,601 @@ void Create_file_log_event::pack_info(String* packet)
tmp.append("db="); tmp.append("db=");
tmp.append(db, db_len); tmp.append(db, db_len);
tmp.append(";table="); tmp.append(";table=");
tmp.append(tbl_name, tbl_name_len); tmp.append(table_name, table_name_len);
tmp.append(";file_id="); tmp.append(";file_id=");
tmp.append(llstr(file_id,buf)); tmp.append(llstr(file_id,buf));
tmp.append(";block_len="); tmp.append(";block_len=");
tmp.append(llstr(block_len,buf)); tmp.append(llstr(block_len,buf));
net_store_data(packet, (char*)tmp.ptr(), tmp.length());
}
#endif
#ifndef MYSQL_CLIENT
Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg,
uint block_len_arg):
Log_event(thd_arg), block(block_arg),block_len(block_len_arg),
file_id(thd_arg->file_id)
{
}
#endif
Append_block_log_event::Append_block_log_event(const char* buf, int len):
Log_event(buf),block(0)
{
if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD;
block_len = len - APPEND_BLOCK_EVENT_OVERHEAD;
}
int Append_block_log_event::write_data(IO_CACHE* file)
{
char buf[APPEND_BLOCK_HEADER_LEN];
int4store(buf + AB_FILE_ID_OFFSET, file_id);
return my_b_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
my_b_write(file, block, block_len);
}
#ifdef MYSQL_CLIENT
void Append_block_log_event::print(FILE* file, bool short_form = 0,
char* last_db = 0)
{
if (short_form)
return;
print_header(file);
fputc('\n', file);
fprintf(file, "#Append_block: file_id=%d, block_len=%d\n",
file_id, block_len);
}
#endif
#ifndef MYSQL_CLIENT
void Append_block_log_event::pack_info(String* packet)
{
char buf1[256];
String tmp(buf1, sizeof(buf1));
tmp.length(0);
char buf[22];
tmp.append(";file_id=");
tmp.append(llstr(file_id,buf));
tmp.append(";block_len=");
tmp.append(llstr(block_len,buf));
net_store_data(packet, (char*)tmp.ptr(), tmp.length());
}
#endif
#ifndef MYSQL_CLIENT
Delete_file_log_event::Delete_file_log_event(THD* thd_arg):
Log_event(thd_arg),file_id(thd_arg->file_id)
{
}
#endif
Delete_file_log_event::Delete_file_log_event(const char* buf, int len):
Log_event(buf),file_id(0)
{
if((uint)len < DELETE_FILE_EVENT_OVERHEAD)
return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
}
int Delete_file_log_event::write_data(IO_CACHE* file)
{
char buf[DELETE_FILE_HEADER_LEN];
int4store(buf + DF_FILE_ID_OFFSET, file_id);
return my_b_write(file, buf, DELETE_FILE_HEADER_LEN);
}
#ifdef MYSQL_CLIENT
void Delete_file_log_event::print(FILE* file, bool short_form = 0,
char* last_db = 0)
{
if (short_form)
return;
print_header(file);
fputc('\n', file);
fprintf(file, "#Delete_file: file_id=%d\n",
file_id);
}
#endif
#ifndef MYSQL_CLIENT
void Delete_file_log_event::pack_info(String* packet)
{
char buf1[256];
String tmp(buf1, sizeof(buf1));
tmp.length(0);
char buf[22];
tmp.append(";file_id=");
tmp.append(llstr(file_id,buf));
net_store_data(packet, (char*)tmp.ptr(), tmp.length());
}
#endif
#ifndef MYSQL_CLIENT
Execute_load_log_event::Execute_load_log_event(THD* thd_arg):
Log_event(thd_arg),file_id(thd_arg->file_id)
{
}
#endif
Execute_load_log_event::Execute_load_log_event(const char* buf,int len):
Log_event(buf),file_id(0)
{
if((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET);
}
int Execute_load_log_event::write_data(IO_CACHE* file)
{
char buf[EXEC_LOAD_HEADER_LEN];
int4store(buf + EL_FILE_ID_OFFSET, file_id);
return my_b_write(file, buf, EXEC_LOAD_HEADER_LEN);
}
#ifdef MYSQL_CLIENT
void Execute_load_log_event::print(FILE* file, bool short_form = 0,
char* last_db = 0)
{
if (short_form)
return;
print_header(file);
fputc('\n', file);
fprintf(file, "#Exec_load: file_id=%d\n",
file_id);
} }
#endif #endif
#ifndef MYSQL_CLIENT
void Execute_load_log_event::pack_info(String* packet)
{
char buf1[256];
String tmp(buf1, sizeof(buf1));
tmp.length(0);
char buf[22];
tmp.append(";file_id=");
tmp.append(llstr(file_id,buf));
net_store_data(packet, (char*)tmp.ptr(), tmp.length());
}
#endif
#ifndef MYSQL_CLIENT
int Query_log_event::exec_event(struct st_master_info* mi)
{
int expected_error,actual_error = 0;
init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = rewrite_db((char*)db);
if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->query = (char*)query;
thd->set_time((time_t)when);
thd->current_tablenr = 0;
VOID(pthread_mutex_lock(&LOCK_thread_count));
thd->query_id = query_id++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
thd->query_error = 0; // clear error
thd->net.last_errno = 0;
thd->net.last_error[0] = 0;
thd->slave_proxy_id = thread_id; // for temp tables
// sanity check to make sure the master did not get a really bad
// error on the query
if (!check_expected_error(thd, (expected_error = error_code)))
{
mysql_parse(thd, thd->query, q_len);
if (expected_error !=
(actual_error = thd->net.last_errno) && expected_error)
{
const char* errmsg = "Slave: did not get the expected error\
running query from master - expected: '%s'(%d), got '%s'(%d)";
sql_print_error(errmsg, ER_SAFE(expected_error),
expected_error,
actual_error ? thd->net.last_error:"no error",
actual_error);
thd->query_error = 1;
}
else if (expected_error == actual_error)
{
thd->query_error = 0;
*last_slave_error = 0;
last_slave_errno = 0;
}
}
else
{
// master could be inconsistent, abort and tell DBA to check/fix it
thd->db = thd->query = 0;
thd->convert_set = 0;
close_thread_tables(thd);
free_root(&thd->mem_root,0);
return 1;
}
}
thd->db = 0; // prevent db from being freed
thd->query = 0; // just to be sure
// assume no convert for next query unless set explictly
thd->convert_set = 0;
close_thread_tables(thd);
if (thd->query_error || thd->fatal_error)
{
slave_print_error(actual_error, "error '%s' on query '%s'",
actual_error ? thd->net.last_error :
"unexpected success or fatal error", query);
free_root(&thd->mem_root,0);
return 1;
}
free_root(&thd->mem_root,0);
return Log_event::exec_event(mi);
}
int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
{
init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = rewrite_db((char*)db);
thd->query = 0;
thd->query_error = 0;
if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->set_time((time_t)when);
thd->current_tablenr = 0;
VOID(pthread_mutex_lock(&LOCK_thread_count));
thd->query_id = query_id++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
TABLE_LIST tables;
bzero((char*) &tables,sizeof(tables));
tables.db = thd->db;
tables.name = tables.real_name = (char*)table_name;
tables.lock_type = TL_WRITE;
// the table will be opened in mysql_load
if(table_rules_on && !tables_ok(thd, &tables))
{
if (net)
skip_load_data_infile(net);
}
else
{
char llbuff[22];
enum enum_duplicates handle_dup = DUP_IGNORE;
char fname_buf[FN_REFLEN+1], *fname_p;
if (fname_null_term)
fname_p = (char*)fname;
else
{
int len = min(FN_REFLEN,fname_len);
memcpy(fname_buf,fname,len);
fname_buf[len] = 0;
fname_p = fname_buf;
}
if(sql_ex.opt_flags && REPLACE_FLAG)
handle_dup = DUP_REPLACE;
sql_exchange ex(fname_p, sql_ex.opt_flags &&
DUMPFILE_FLAG );
String field_term(&sql_ex.field_term, 1),
enclosed(&sql_ex.enclosed, 1),
line_term(&sql_ex.line_term,1),
escaped(&sql_ex.escaped, 1),
line_start(&sql_ex.line_start, 1);
ex.field_term = &field_term;
if(sql_ex.empty_flags & FIELD_TERM_EMPTY)
ex.field_term->length(0);
ex.enclosed = &enclosed;
if(sql_ex.empty_flags & ENCLOSED_EMPTY)
ex.enclosed->length(0);
ex.line_term = &line_term;
if(sql_ex.empty_flags & LINE_TERM_EMPTY)
ex.line_term->length(0);
ex.line_start = &line_start;
if(sql_ex.empty_flags & LINE_START_EMPTY)
ex.line_start->length(0);
ex.escaped = &escaped;
if(sql_ex.empty_flags & ESCAPED_EMPTY)
ex.escaped->length(0);
ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
if(sql_ex.empty_flags & FIELD_TERM_EMPTY)
ex.field_term->length(0);
ex.skip_lines = skip_lines;
List<Item> fields;
set_fields(fields);
thd->slave_proxy_id = thd->thread_id;
if (net)
{
// mysql_load will use thd->net to read the file
thd->net.vio = net->vio;
// make sure the client does not get confused
// about the packet sequence
thd->net.pkt_nr = net->pkt_nr;
}
if(mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0,
TL_WRITE))
thd->query_error = 1;
if(thd->cuted_fields)
sql_print_error("Slave: load data infile at position %s in log \
'%s' produced %d warning(s)", llstr(mi->pos,llbuff), RPL_LOG_NAME,
thd->cuted_fields );
if(net)
net->pkt_nr = thd->net.pkt_nr;
}
}
else
{
// we will just ask the master to send us /dev/null if we do not
// want to load the data
if (net)
skip_load_data_infile(net);
}
thd->net.vio = 0;
thd->db = 0;// prevent db from being freed
close_thread_tables(thd);
if(thd->query_error)
{
int sql_error = thd->net.last_errno;
if(!sql_error)
sql_error = ER_UNKNOWN_ERROR;
slave_print_error(sql_error, "Slave: Error '%s' running load data infile ",
ER(sql_error));
free_root(&thd->mem_root,0);
return 1;
}
free_root(&thd->mem_root,0);
if(thd->fatal_error)
{
sql_print_error("Slave: Fatal error running LOAD DATA INFILE ");
return 1;
}
return Log_event::exec_event(mi);
}
int Start_log_event::exec_event(struct st_master_info* mi)
{
close_temporary_tables(thd);
return Log_event::exec_event(mi);
}
int Stop_log_event::exec_event(struct st_master_info* mi)
{
if(mi->pos > 4) // stop event should be ignored after rotate event
{
close_temporary_tables(thd);
mi->inc_pos(get_event_len(), log_seq);
flush_master_info(mi);
}
thd->log_seq = 0;
return 0;
}
int Rotate_log_event::exec_event(struct st_master_info* mi)
{
bool rotate_binlog = 0, write_slave_event = 0;
char* log_name = mi->log_file_name;
pthread_mutex_lock(&mi->lock);
// rotate local binlog only if the name of remote has changed
if (!*log_name || !(log_name[ident_len] == 0 &&
!memcmp(log_name, new_log_ident, ident_len)))
{
write_slave_event = (!(flags & LOG_EVENT_FORCED_ROTATE_F)
&& mysql_bin_log.is_open());
rotate_binlog = (*log_name && write_slave_event);
memcpy(log_name, new_log_ident,ident_len );
log_name[ident_len] = 0;
}
mi->pos = pos;
mi->last_log_seq = log_seq;
#ifndef DBUG_OFF
if (abort_slave_event_count)
++events_till_abort;
#endif
if (rotate_binlog)
{
mysql_bin_log.new_file();
mi->last_log_seq = 0;
}
pthread_cond_broadcast(&mi->cond);
pthread_mutex_unlock(&mi->lock);
flush_master_info(mi);
if (write_slave_event)
{
Slave_log_event s(thd, mi);
if (s.master_host)
{
s.set_log_seq(0, &mysql_bin_log);
s.server_id = ::server_id;
mysql_bin_log.write(&s);
}
}
thd->log_seq = 0;
return 0;
}
int Intvar_log_event::exec_event(struct st_master_info* mi)
{
switch(type)
{
case LAST_INSERT_ID_EVENT:
thd->last_insert_id_used = 1;
thd->last_insert_id = val;
break;
case INSERT_ID_EVENT:
thd->next_insert_id = val;
break;
}
mi->inc_pending(get_event_len());
return 0;
}
int Slave_log_event::exec_event(struct st_master_info* mi)
{
if(mysql_bin_log.is_open())
mysql_bin_log.write(this);
return Log_event::exec_event(mi);
}
int Create_file_log_event::exec_event(struct st_master_info* mi)
{
char fname_buf[FN_REFLEN+10];
char* p,*p1;
int fd = -1;
IO_CACHE file;
int error = 1;
p = slave_load_file_stem(fname_buf, file_id, server_id);
memcpy(p, ".info", 6);
bzero((char*)&file, sizeof(file));
if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
MYF(MY_WME))) < 0 ||
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
slave_print_error(my_errno, "Could not open file '%s'", fname_buf);
goto err;
}
// a trick to avoid allocating another buffer
memcpy(p, ".data", 6);
fname = fname_buf;
fname_len = (uint)(p-fname) + 5;
if (write_base(&file))
{
memcpy(p, ".info", 6); // to have it right in the error message
slave_print_error(my_errno, "Could not write to file '%s'", fname_buf);
goto err;
}
end_io_cache(&file);
my_close(fd, MYF(0));
// fname_buf now already has .data, not .info, because we did our trick
if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
MYF(MY_WME))) < 0)
{
slave_print_error(my_errno, "Could not open file '%s'", fname_buf);
goto err;
}
if (my_write(fd, block, block_len, MYF(MY_WME+MY_NABP)))
{
slave_print_error(my_errno, "Write to '%s' failed", fname_buf);
goto err;
}
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
error=0;
err:
if (error)
end_io_cache(&file);
if (fd >= 0)
my_close(fd, MYF(0));
return error ? 1 : Log_event::exec_event(mi);
}
int Delete_file_log_event::exec_event(struct st_master_info* mi)
{
char fname[FN_REFLEN+10];
char* p;
p = slave_load_file_stem(fname, file_id, server_id);
memcpy(p, ".data", 6);
(void)my_delete(fname, MYF(MY_WME));
memcpy(p, ".info", 6);
(void)my_delete(fname, MYF(MY_WME));
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
return Log_event::exec_event(mi);
}
int Append_block_log_event::exec_event(struct st_master_info* mi)
{
char fname[FN_REFLEN+10];
char* p;
int fd = -1;
int error = 1;
p = slave_load_file_stem(fname, file_id, server_id);
memcpy(p, ".data", 6);
if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0)
{
slave_print_error(my_errno, "Could not open file '%s'", fname);
goto err;
}
if (my_write(fd, block, block_len, MYF(MY_WME+MY_NABP)))
{
slave_print_error(my_errno, "Write to '%s' failed", fname);
goto err;
}
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
error=0;
err:
if (fd >= 0)
my_close(fd, MYF(0));
return error ? error : Log_event::exec_event(mi);
}
int Execute_load_log_event::exec_event(struct st_master_info* mi)
{
char fname[FN_REFLEN+10];
char* p;
int fd = -1;
int error = 1;
int save_options;
IO_CACHE file;
Load_log_event* lev = 0;
p = slave_load_file_stem(fname, file_id, server_id);
memcpy(p, ".info", 6);
bzero((char*)&file, sizeof(file));
if ((fd = my_open(fname, O_RDONLY|O_BINARY, MYF(MY_WME))) < 0 ||
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
slave_print_error(my_errno, "Could not open file '%s'", fname);
goto err;
}
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,0))
|| lev->get_type_code() != LOAD_EVENT)
{
slave_print_error(0, "File '%s' appears corrupted", fname);
goto err;
}
// we want to disable binary logging in slave thread
// because we need the file events to appear in the same order
// as they do on the master relative to other events, so that we
// can preserve ascending order of log sequence numbers - needed
// to handle failover
save_options = thd->options;
thd->options &= ~OPTION_BIN_LOG;
lev->thd = thd;
if (lev->exec_event(0,0))
{
slave_print_error(my_errno, "Failed executing load from '%s'", fname);
thd->options = save_options;
goto err;
}
thd->options = save_options;
(void)my_delete(fname, MYF(MY_WME));
memcpy(p, ".data", 6);
(void)my_delete(fname, MYF(MY_WME));
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
error = 0;
err:
delete lev;
end_io_cache(&file);
if (fd >= 0)
my_close(fd, MYF(0));
return error ? error : Log_event::exec_event(mi);
}
#endif
......
...@@ -42,6 +42,29 @@ ...@@ -42,6 +42,29 @@
*/ */
#define ST_SERVER_VER_LEN 50 #define ST_SERVER_VER_LEN 50
#define DUMPFILE_FLAG 0x1
#define OPT_ENCLOSED_FLAG 0x2
#define REPLACE_FLAG 0x4
#define IGNORE_FLAG 0x8
#define FIELD_TERM_EMPTY 0x1
#define ENCLOSED_EMPTY 0x2
#define LINE_TERM_EMPTY 0x4
#define LINE_START_EMPTY 0x8
#define ESCAPED_EMPTY 0x10
struct sql_ex_info
{
char field_term;
char enclosed;
char line_term;
char line_start;
char escaped;
char opt_flags; // flags for the options
char empty_flags; // flags to indicate which of the terminating charact
} ;
/* Binary log consists of events. Each event has a fixed length header, /* Binary log consists of events. Each event has a fixed length header,
followed by possibly variable ( depending on the type of event) length followed by possibly variable ( depending on the type of event) length
data body. The data body consists of an optional fixed length segment data body. The data body consists of an optional fixed length segment
...@@ -49,13 +72,17 @@ ...@@ -49,13 +72,17 @@
comments below for the format specifics comments below for the format specifics
*/ */
/* event-specific post-header sizes */ /* event-specific post-header sizes */
#define LOG_EVENT_HEADER_LEN 19 #define LOG_EVENT_HEADER_LEN 19
#define QUERY_HEADER_LEN (4 + 4 + 1 + 2) #define QUERY_HEADER_LEN (4 + 4 + 1 + 2)
#define LOAD_HEADER_LEN (4 + 4 + 4 + 1 +1 + 4) #define LOAD_HEADER_LEN (4 + 4 + 4 + 1 +1 + 4+sizeof(struct sql_ex_info))
#define START_HEADER_LEN (2 + ST_SERVER_VER_LEN + 4) #define START_HEADER_LEN (2 + ST_SERVER_VER_LEN + 4)
#define ROTATE_HEADER_LEN 8 #define ROTATE_HEADER_LEN 8
#define CREATE_FILE_HEADER_LEN 6 #define CREATE_FILE_HEADER_LEN 4
#define APPEND_BLOCK_HEADER_LEN 4
#define EXEC_LOAD_HEADER_LEN 4
#define DELETE_FILE_HEADER_LEN 4
/* event header offsets */ /* event header offsets */
...@@ -98,6 +125,7 @@ ...@@ -98,6 +125,7 @@
#define L_DB_LEN_OFFSET 12 #define L_DB_LEN_OFFSET 12
#define L_TBL_LEN_OFFSET 13 #define L_TBL_LEN_OFFSET 13
#define L_NUM_FIELDS_OFFSET 14 #define L_NUM_FIELDS_OFFSET 14
#define L_SQL_EX_OFFSET 18
#define L_DATA_OFFSET LOAD_HEADER_LEN #define L_DATA_OFFSET LOAD_HEADER_LEN
/* Rotate event post-header */ /* Rotate event post-header */
...@@ -105,15 +133,26 @@ ...@@ -105,15 +133,26 @@
#define R_POS_OFFSET 0 #define R_POS_OFFSET 0
#define R_IDENT_OFFSET 8 #define R_IDENT_OFFSET 8
#define CF_DB_LEN_OFFSET 0 #define CF_FILE_ID_OFFSET 0
#define CF_TBL_LEN_OFFSET 1 #define CF_DATA_OFFSET CREATE_FILE_HEADER_LEN
#define CF_FILE_ID_OFFSET 2
#define AB_FILE_ID_OFFSET 0
#define AB_DATA_OFFSET APPEND_BLOCK_HEADER_LEN
#define EL_FILE_ID_OFFSET 0
#define DF_FILE_ID_OFFSET 0
#define QUERY_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN) #define QUERY_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN)
#define QUERY_DATA_OFFSET (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN) #define QUERY_DATA_OFFSET (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN)
#define ROTATE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+ROTATE_HEADER_LEN) #define ROTATE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+ROTATE_HEADER_LEN)
#define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN+sizeof(sql_ex_info)) #define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN)
#define CREATE_FILE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+CREATE_FILE_HEADER_LEN) #define CREATE_FILE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+\
+LOAD_HEADER_LEN+CREATE_FILE_HEADER_LEN)
#define DELETE_FILE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+DELETE_FILE_HEADER_LEN)
#define EXEC_LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+EXEC_LOAD_HEADER_LEN)
#define APPEND_BLOCK_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+APPEND_BLOCK_HEADER_LEN)
#define BINLOG_MAGIC "\xfe\x62\x69\x6e" #define BINLOG_MAGIC "\xfe\x62\x69\x6e"
...@@ -123,7 +162,7 @@ ...@@ -123,7 +162,7 @@
enum Log_event_type { START_EVENT = 1, QUERY_EVENT =2, enum Log_event_type { START_EVENT = 1, QUERY_EVENT =2,
STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5, STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5,
LOAD_EVENT=6, SLAVE_EVENT=7, CREATE_FILE_EVENT=8, LOAD_EVENT=6, SLAVE_EVENT=7, CREATE_FILE_EVENT=8,
APPEND_TO_FILE_EVENT=9, EXEC_LOAD_EVENT=10, DELETE_FILE_EVENT=11}; APPEND_BLOCK_EVENT=9, EXEC_LOAD_EVENT=10, DELETE_FILE_EVENT=11};
enum Int_event_type { INVALID_INT_EVENT = 0, LAST_INSERT_ID_EVENT = 1, INSERT_ID_EVENT = 2 enum Int_event_type { INVALID_INT_EVENT = 0, LAST_INSERT_ID_EVENT = 1, INSERT_ID_EVENT = 2
}; };
...@@ -145,6 +184,11 @@ class Log_event ...@@ -145,6 +184,11 @@ class Log_event
uint32 server_id; uint32 server_id;
uint32 log_seq; uint32 log_seq;
uint16 flags; uint16 flags;
int cached_event_len;
char* temp_buf;
#ifndef MYSQL_CLIENT
THD* thd;
#endif
static void *operator new(size_t size) static void *operator new(size_t size)
{ {
...@@ -158,30 +202,32 @@ class Log_event ...@@ -158,30 +202,32 @@ class Log_event
int write(IO_CACHE* file); int write(IO_CACHE* file);
int write_header(IO_CACHE* file); int write_header(IO_CACHE* file);
virtual int write_data(IO_CACHE* file __attribute__((unused))) { return 0; } virtual int write_data(IO_CACHE* file)
{ return write_data_header(file) || write_data_body(file); }
virtual int write_data_header(IO_CACHE* file __attribute__((unused)))
{ return 0; }
virtual int write_data_body(IO_CACHE* file __attribute__((unused)))
{ return 0; }
virtual Log_event_type get_type_code() = 0; virtual Log_event_type get_type_code() = 0;
Log_event(time_t when_arg, ulong exec_time_arg = 0, virtual bool is_valid() = 0;
int valid_exec_time = 0, uint32 server_id_arg = 0, Log_event(const char* buf);
uint32 log_seq_arg = 0, uint16 flags_arg = 0): #ifndef MYSQL_CLIENT
when(when_arg), exec_time(exec_time_arg), Log_event(THD* thd_arg, uint16 flags_arg = 0);
log_seq(log_seq_arg),flags(0) #endif
{ virtual ~Log_event() { free_temp_buf();}
server_id = server_id_arg ? server_id_arg : (::server_id); void register_temp_buf(char* buf) { temp_buf = buf; }
if(valid_exec_time) void free_temp_buf()
flags |= LOG_EVENT_TIME_F; {
} if (temp_buf)
{
Log_event(const char* buf) my_free(temp_buf, MYF(0));
{ temp_buf = 0;
when = uint4korr(buf); }
server_id = uint4korr(buf + SERVER_ID_OFFSET); }
log_seq = uint4korr(buf + LOG_SEQ_OFFSET);
flags = uint2korr(buf + FLAGS_OFFSET);
}
virtual ~Log_event() {}
virtual int get_data_size() { return 0;} virtual int get_data_size() { return 0;}
virtual int get_data_body_offset() { return 0; }
int get_event_len() { return cached_event_len ? cached_event_len :
(cached_event_len = LOG_EVENT_HEADER_LEN + get_data_size()); }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
virtual void print(FILE* file, bool short_form = 0, char* last_db = 0) = 0; virtual void print(FILE* file, bool short_form = 0, char* last_db = 0) = 0;
void print_timestamp(FILE* file, time_t *ts = 0); void print_timestamp(FILE* file, time_t *ts = 0);
...@@ -200,6 +246,11 @@ class Log_event ...@@ -200,6 +246,11 @@ class Log_event
virtual void pack_info(String* packet); virtual void pack_info(String* packet);
int net_send(THD* thd, const char* log_name, ulong pos); int net_send(THD* thd, const char* log_name, ulong pos);
static void init_show_field_list(List<Item>* field_list); static void init_show_field_list(List<Item>* field_list);
virtual int exec_event(struct st_master_info* mi);
virtual const char* get_db()
{
return thd ? thd->db : 0;
}
#endif #endif
}; };
...@@ -219,24 +270,13 @@ class Query_log_event: public Log_event ...@@ -219,24 +270,13 @@ class Query_log_event: public Log_event
uint16 error_code; uint16 error_code;
ulong thread_id; ulong thread_id;
#if !defined(MYSQL_CLIENT) #if !defined(MYSQL_CLIENT)
THD* thd;
bool cache_stmt; bool cache_stmt;
Query_log_event(THD* thd_arg, const char* query_arg, bool using_trans=0):
Log_event(thd_arg->start_time,0,1,thd_arg->server_id,thd_arg->log_seq), Query_log_event(THD* thd_arg, const char* query_arg,
data_buf(0), bool using_trans=0);
query(query_arg), db(thd_arg->db), q_len(thd_arg->query_length), const char* get_db() { return db; }
error_code(thd_arg->killed ? ER_SERVER_SHUTDOWN: thd_arg->net.last_errno),
thread_id(thd_arg->thread_id), thd(thd_arg),
cache_stmt(using_trans &&
(thd_arg->options & (OPTION_NOT_AUTO_COMMIT | OPTION_BEGIN)))
{
time_t end_time;
time(&end_time);
exec_time = (ulong) (end_time - thd->start_time);
db_len = (db) ? (uint32) strlen(db) : 0;
}
void pack_info(String* packet); void pack_info(String* packet);
int exec_event(struct st_master_info* mi);
#endif #endif
Query_log_event(const char* buf, int event_len); Query_log_event(const char* buf, int event_len);
...@@ -250,6 +290,7 @@ class Query_log_event: public Log_event ...@@ -250,6 +290,7 @@ class Query_log_event: public Log_event
Log_event_type get_type_code() { return QUERY_EVENT; } Log_event_type get_type_code() { return QUERY_EVENT; }
int write(IO_CACHE* file); int write(IO_CACHE* file);
int write_data(IO_CACHE* file); // returns 0 on success, -1 on error int write_data(IO_CACHE* file); // returns 0 on success, -1 on error
bool is_valid() { return query != 0; }
int get_data_size() int get_data_size()
{ {
return q_len + db_len + 2 + return q_len + db_len + 2 +
...@@ -279,11 +320,13 @@ class Slave_log_event: public Log_event ...@@ -279,11 +320,13 @@ class Slave_log_event: public Log_event
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Slave_log_event(THD* thd_arg, struct st_master_info* mi); Slave_log_event(THD* thd_arg, struct st_master_info* mi);
void pack_info(String* packet); void pack_info(String* packet);
int exec_event(struct st_master_info* mi);
#endif #endif
Slave_log_event(const char* buf, int event_len); Slave_log_event(const char* buf, int event_len);
~Slave_log_event(); ~Slave_log_event();
int get_data_size(); int get_data_size();
bool is_valid() { return master_host != 0; }
Log_event_type get_type_code() { return SLAVE_EVENT; } Log_event_type get_type_code() { return SLAVE_EVENT; }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void print(FILE* file, bool short_form = 0, char* last_db = 0); void print(FILE* file, bool short_form = 0, char* last_db = 0);
...@@ -292,34 +335,10 @@ class Slave_log_event: public Log_event ...@@ -292,34 +335,10 @@ class Slave_log_event: public Log_event
}; };
#define DUMPFILE_FLAG 0x1
#define OPT_ENCLOSED_FLAG 0x2
#define REPLACE_FLAG 0x4
#define IGNORE_FLAG 0x8
#define FIELD_TERM_EMPTY 0x1
#define ENCLOSED_EMPTY 0x2
#define LINE_TERM_EMPTY 0x4
#define LINE_START_EMPTY 0x8
#define ESCAPED_EMPTY 0x10
struct sql_ex_info
{
char field_term;
char enclosed;
char line_term;
char line_start;
char escaped;
char opt_flags; // flags for the options
char empty_flags; // flags to indicate which of the terminating charact
} ;
class Load_log_event: public Log_event class Load_log_event: public Log_event
{ {
protected: protected:
char* data_buf; int copy_log_event(const char *buf, ulong event_len);
void copy_log_event(const char *buf, ulong data_len);
public: public:
ulong thread_id; ulong thread_id;
...@@ -330,96 +349,39 @@ class Load_log_event: public Log_event ...@@ -330,96 +349,39 @@ class Load_log_event: public Log_event
const char* fields; const char* fields;
const uchar* field_lens; const uchar* field_lens;
uint32 field_block_len; uint32 field_block_len;
const char* table_name; const char* table_name;
const char* db; const char* db;
const char* fname; const char* fname;
bool fname_null_term;
uint32 skip_lines; uint32 skip_lines;
sql_ex_info sql_ex; sql_ex_info sql_ex;
#if !defined(MYSQL_CLIENT) #if !defined(MYSQL_CLIENT)
THD* thd;
String field_lens_buf; String field_lens_buf;
String fields_buf; String fields_buf;
Load_log_event(THD* thd, sql_exchange* ex, const char* table_name_arg,
List<Item>& fields_arg, enum enum_duplicates handle_dup ): Load_log_event(THD* thd, sql_exchange* ex, const char* db_arg,
Log_event(thd->start_time),data_buf(0),thread_id(thd->thread_id), const char* table_name_arg,
num_fields(0),fields(0),field_lens(0),field_block_len(0), List<Item>& fields_arg, enum enum_duplicates handle_dup);
table_name(table_name_arg),
db(thd->db),
fname(ex->file_name),
thd(thd)
{
time_t end_time;
time(&end_time);
exec_time = (ulong) (end_time - thd->start_time);
db_len = (db) ? (uint32) strlen(db) : 0;
table_name_len = (table_name) ? (uint32) strlen(table_name) : 0;
fname_len = (fname) ? (uint) strlen(fname) : 0;
sql_ex.field_term = (*ex->field_term)[0];
sql_ex.enclosed = (*ex->enclosed)[0];
sql_ex.line_term = (*ex->line_term)[0];
sql_ex.line_start = (*ex->line_start)[0];
sql_ex.escaped = (*ex->escaped)[0];
sql_ex.opt_flags = 0;
if(ex->dumpfile)
sql_ex.opt_flags |= DUMPFILE_FLAG;
if(ex->opt_enclosed)
sql_ex.opt_flags |= OPT_ENCLOSED_FLAG;
sql_ex.empty_flags = 0;
switch(handle_dup)
{
case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break;
case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break;
case DUP_ERROR: break;
}
if(!ex->field_term->length())
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
if(!ex->enclosed->length())
sql_ex.empty_flags |= ENCLOSED_EMPTY;
if(!ex->line_term->length())
sql_ex.empty_flags |= LINE_TERM_EMPTY;
if(!ex->line_start->length())
sql_ex.empty_flags |= LINE_START_EMPTY;
if(!ex->escaped->length())
sql_ex.empty_flags |= ESCAPED_EMPTY;
skip_lines = ex->skip_lines;
List_iterator<Item> li(fields_arg);
field_lens_buf.length(0);
fields_buf.length(0);
Item* item;
while((item = li++))
{
num_fields++;
uchar len = (uchar) strlen(item->name);
field_block_len += len + 1;
fields_buf.append(item->name, len + 1);
field_lens_buf.append((char*)&len, 1);
}
field_lens = (const uchar*)field_lens_buf.ptr();
fields = fields_buf.ptr();
}
void set_fields(List<Item> &fields_arg); void set_fields(List<Item> &fields_arg);
void pack_info(String* packet); void pack_info(String* packet);
const char* get_db() { return db; }
int exec_event(struct st_master_info* mi)
{
return exec_event(thd->slave_net,mi);
}
int exec_event(NET* net, struct st_master_info* mi);
#endif #endif
Load_log_event(const char* buf, int event_len); Load_log_event(const char* buf, int event_len);
~Load_log_event() ~Load_log_event()
{ {
if (data_buf)
{
my_free((gptr) data_buf, MYF(0));
}
} }
Log_event_type get_type_code() { return LOAD_EVENT; } Log_event_type get_type_code() { return LOAD_EVENT; }
int write_data(IO_CACHE* file); // returns 0 on success, -1 on error int write_data_header(IO_CACHE* file);
int write_data_body(IO_CACHE* file);
bool is_valid() { return table_name != 0; }
int get_data_size() int get_data_size()
{ {
return table_name_len + 2 + db_len + 2 + fname_len return table_name_len + 2 + db_len + 2 + fname_len
...@@ -427,9 +389,10 @@ class Load_log_event: public Log_event ...@@ -427,9 +389,10 @@ class Load_log_event: public Log_event
+ 4 // exec_time + 4 // exec_time
+ 4 // skip_lines + 4 // skip_lines
+ 4 // field block len + 4 // field block len
+ sizeof(sql_ex) + field_block_len + num_fields*sizeof(uchar) ; + sizeof(sql_ex) + field_block_len + num_fields;
; ;
} }
int get_data_body_offset() { return LOAD_EVENT_OVERHEAD; }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void print(FILE* file, bool short_form = 0, char* last_db = 0); void print(FILE* file, bool short_form = 0, char* last_db = 0);
#endif #endif
...@@ -443,23 +406,25 @@ class Start_log_event: public Log_event ...@@ -443,23 +406,25 @@ class Start_log_event: public Log_event
uint32 created; uint32 created;
uint16 binlog_version; uint16 binlog_version;
char server_version[ST_SERVER_VER_LEN]; char server_version[ST_SERVER_VER_LEN];
#ifndef MYSQL_CLIENT
Start_log_event() :Log_event(time(NULL)),binlog_version(BINLOG_VERSION) Start_log_event() :Log_event((THD*)0),binlog_version(BINLOG_VERSION)
{ {
created = (uint32) when; created = (uint32) when;
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
} }
#endif
Start_log_event(const char* buf); Start_log_event(const char* buf);
~Start_log_event() {} ~Start_log_event() {}
Log_event_type get_type_code() { return START_EVENT;} Log_event_type get_type_code() { return START_EVENT;}
int write_data(IO_CACHE* file); int write_data(IO_CACHE* file);
bool is_valid() { return 1; }
int get_data_size() int get_data_size()
{ {
return START_HEADER_LEN; return START_HEADER_LEN;
} }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
void pack_info(String* packet); void pack_info(String* packet);
int exec_event(struct st_master_info* mi);
#endif #endif
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void print(FILE* file, bool short_form = 0, char* last_db = 0); void print(FILE* file, bool short_form = 0, char* last_db = 0);
...@@ -471,17 +436,21 @@ class Intvar_log_event: public Log_event ...@@ -471,17 +436,21 @@ class Intvar_log_event: public Log_event
public: public:
ulonglong val; ulonglong val;
uchar type; uchar type;
Intvar_log_event(uchar type_arg, ulonglong val_arg) #ifndef MYSQL_CLIENT
:Log_event(time(NULL)),val(val_arg),type(type_arg) Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg)
:Log_event(thd_arg),val(val_arg),type(type_arg)
{} {}
#endif
Intvar_log_event(const char* buf); Intvar_log_event(const char* buf);
~Intvar_log_event() {} ~Intvar_log_event() {}
Log_event_type get_type_code() { return INTVAR_EVENT;} Log_event_type get_type_code() { return INTVAR_EVENT;}
const char* get_var_type_name(); const char* get_var_type_name();
int get_data_size() { return sizeof(type) + sizeof(val);} int get_data_size() { return sizeof(type) + sizeof(val);}
int write_data(IO_CACHE* file); int write_data(IO_CACHE* file);
bool is_valid() { return 1; }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
void pack_info(String* packet); void pack_info(String* packet);
int exec_event(struct st_master_info* mi);
#endif #endif
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
...@@ -492,15 +461,21 @@ class Intvar_log_event: public Log_event ...@@ -492,15 +461,21 @@ class Intvar_log_event: public Log_event
class Stop_log_event: public Log_event class Stop_log_event: public Log_event
{ {
public: public:
Stop_log_event() :Log_event(time(NULL)) #ifndef MYSQL_CLIENT
Stop_log_event() :Log_event((THD*)0)
{} {}
#endif
Stop_log_event(const char* buf):Log_event(buf) Stop_log_event(const char* buf):Log_event(buf)
{ {
} }
~Stop_log_event() {} ~Stop_log_event() {}
Log_event_type get_type_code() { return STOP_EVENT;} Log_event_type get_type_code() { return STOP_EVENT;}
bool is_valid() { return 1; }
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void print(FILE* file, bool short_form = 0, char* last_db = 0); void print(FILE* file, bool short_form = 0, char* last_db = 0);
#endif
#ifndef MYSQL_CLIENT
int exec_event(struct st_master_info* mi);
#endif #endif
}; };
...@@ -511,16 +486,16 @@ class Rotate_log_event: public Log_event ...@@ -511,16 +486,16 @@ class Rotate_log_event: public Log_event
uchar ident_len; uchar ident_len;
ulonglong pos; ulonglong pos;
bool alloced; bool alloced;
#ifndef MYSQL_CLIENT
Rotate_log_event(const char* new_log_ident_arg, uint ident_len_arg = 0, Rotate_log_event(THD* thd_arg, const char* new_log_ident_arg,
ulonglong pos_arg = 4) : uint ident_len_arg = 0,ulonglong pos_arg = 4) :
Log_event(time(NULL)), Log_event(thd_arg),
new_log_ident(new_log_ident_arg), new_log_ident(new_log_ident_arg),
ident_len(ident_len_arg ? ident_len_arg : ident_len(ident_len_arg ? ident_len_arg :
(uint) strlen(new_log_ident_arg)), pos(pos_arg), (uint) strlen(new_log_ident_arg)), pos(pos_arg),
alloced(0) alloced(0)
{} {}
#endif
Rotate_log_event(const char* buf, int event_len); Rotate_log_event(const char* buf, int event_len);
~Rotate_log_event() ~Rotate_log_event()
{ {
...@@ -529,40 +504,136 @@ class Rotate_log_event: public Log_event ...@@ -529,40 +504,136 @@ class Rotate_log_event: public Log_event
} }
Log_event_type get_type_code() { return ROTATE_EVENT;} Log_event_type get_type_code() { return ROTATE_EVENT;}
int get_data_size() { return ident_len + ROTATE_HEADER_LEN;} int get_data_size() { return ident_len + ROTATE_HEADER_LEN;}
bool is_valid() { return new_log_ident != 0; }
int write_data(IO_CACHE* file); int write_data(IO_CACHE* file);
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
void print(FILE* file, bool short_form = 0, char* last_db = 0); void print(FILE* file, bool short_form = 0, char* last_db = 0);
#endif #endif
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
void pack_info(String* packet); void pack_info(String* packet);
int exec_event(struct st_master_info* mi);
#endif #endif
}; };
/* the classes below are for the new LOAD DATA INFILE logging */ /* the classes below are for the new LOAD DATA INFILE logging */
class Create_file_log_event: public Log_event class Create_file_log_event: public Load_log_event
{ {
protected:
// pretend we are Load event, so we can write out just
// our Load part - used on the slave when writing event out to
// SQL_LOAD-*.info file
bool fake_base;
public: public:
char* db;
char* tbl_name;
uint db_len;
uint tbl_name_len;
char* block; char* block;
uint block_len; uint block_len;
uint file_id; uint file_id;
#ifndef MYSQL_CLIENT
#ifndef MYSQL_CLIENT Create_file_log_event(THD* thd, sql_exchange* ex, const char* db_arg,
Create_file_log_event(THD* thd, TABLE_LIST * table, char* block_arg, const char* table_name_arg,
uint block_len_arg); List<Item>& fields_arg, enum enum_duplicates handle_dup,
char* block_arg, uint block_len_arg);
#endif #endif
Create_file_log_event(const char* buf, int event_len); Create_file_log_event(const char* buf, int event_len);
~Create_file_log_event() ~Create_file_log_event()
{ {
} }
Log_event_type get_type_code() { return CREATE_FILE_EVENT;} Log_event_type get_type_code() { return fake_base ? LOAD_EVENT :
int get_data_size() { return tbl_name_len + block_len + CREATE_FILE_EVENT;}
CREATE_FILE_HEADER_LEN ;} int get_data_size() { return fake_base ? Load_log_event::get_data_size() :
Load_log_event::get_data_size() +
4 + 1 + block_len;}
int get_data_body_offset() { return fake_base ? LOAD_EVENT_OVERHEAD:
LOAD_EVENT_OVERHEAD + CREATE_FILE_HEADER_LEN; }
bool is_valid() { return block != 0; }
int write_data_header(IO_CACHE* file);
int write_data_body(IO_CACHE* file);
int write_base(IO_CACHE* file); // cut out Create_file extentions and
// write it as Load event - used on the slave
#ifdef MYSQL_CLIENT
void print(FILE* file, bool short_form = 0, char* last_db = 0);
#endif
#ifndef MYSQL_CLIENT
void pack_info(String* packet);
int exec_event(struct st_master_info* mi);
#endif
};
class Append_block_log_event: public Log_event
{
public:
char* block;
uint block_len;
uint file_id;
#ifndef MYSQL_CLIENT
Append_block_log_event(THD* thd, char* block_arg,
uint block_len_arg);
int exec_event(struct st_master_info* mi);
#endif
Append_block_log_event(const char* buf, int event_len);
~Append_block_log_event()
{
}
Log_event_type get_type_code() { return APPEND_BLOCK_EVENT;}
int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;}
bool is_valid() { return block != 0; }
int write_data(IO_CACHE* file);
#ifdef MYSQL_CLIENT
void print(FILE* file, bool short_form = 0, char* last_db = 0);
#endif
#ifndef MYSQL_CLIENT
void pack_info(String* packet);
#endif
};
class Delete_file_log_event: public Log_event
{
public:
uint file_id;
#ifndef MYSQL_CLIENT
Delete_file_log_event(THD* thd);
#endif
Delete_file_log_event(const char* buf, int event_len);
~Delete_file_log_event()
{
}
Log_event_type get_type_code() { return DELETE_FILE_EVENT;}
int get_data_size() { return DELETE_FILE_HEADER_LEN ;}
bool is_valid() { return file_id != 0; }
int write_data(IO_CACHE* file);
#ifdef MYSQL_CLIENT
void print(FILE* file, bool short_form = 0, char* last_db = 0);
#endif
#ifndef MYSQL_CLIENT
void pack_info(String* packet);
int exec_event(struct st_master_info* mi);
#endif
};
class Execute_load_log_event: public Log_event
{
public:
uint file_id;
#ifndef MYSQL_CLIENT
Execute_load_log_event(THD* thd);
#endif
Execute_load_log_event(const char* buf, int event_len);
~Execute_load_log_event()
{
}
Log_event_type get_type_code() { return EXEC_LOAD_EVENT;}
int get_data_size() { return EXEC_LOAD_HEADER_LEN ;}
bool is_valid() { return file_id != 0; }
int write_data(IO_CACHE* file); int write_data(IO_CACHE* file);
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
...@@ -570,6 +641,7 @@ class Create_file_log_event: public Log_event ...@@ -570,6 +641,7 @@ class Create_file_log_event: public Log_event
#endif #endif
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
void pack_info(String* packet); void pack_info(String* packet);
int exec_event(struct st_master_info* mi);
#endif #endif
}; };
......
...@@ -57,7 +57,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, ...@@ -57,7 +57,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
/* There is no file in net_reading */ /* There is no file in net_reading */
info->file= file; info->file= file;
info->pre_read = info->post_read = 0; info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0;
if (!cachesize) if (!cachesize)
if (! (cachesize= my_default_record_cache_size)) if (! (cachesize= my_default_record_cache_size))
DBUG_RETURN(1); /* No cache requested */ DBUG_RETURN(1); /* No cache requested */
...@@ -681,7 +682,10 @@ int flush_io_cache(IO_CACHE *info) ...@@ -681,7 +682,10 @@ int flush_io_cache(IO_CACHE *info)
int end_io_cache(IO_CACHE *info) int end_io_cache(IO_CACHE *info)
{ {
int error=0; int error=0;
IO_CACHE_CALLBACK pre_close;
DBUG_ENTER("end_io_cache"); DBUG_ENTER("end_io_cache");
if((pre_close=info->pre_close))
(*pre_close)(info);
if (info->buffer) if (info->buffer)
{ {
if (info->file != -1) /* File doesn't exist */ if (info->file != -1) /* File doesn't exist */
......
...@@ -223,7 +223,7 @@ static bool opt_log,opt_update_log,opt_bin_log,opt_slow_log,opt_noacl, ...@@ -223,7 +223,7 @@ static bool opt_log,opt_update_log,opt_bin_log,opt_slow_log,opt_noacl,
opt_ansi_mode=0,opt_myisam_log=0, opt_ansi_mode=0,opt_myisam_log=0,
opt_large_files=sizeof(my_off_t) > 4; opt_large_files=sizeof(my_off_t) > 4;
bool opt_sql_bin_update = 0, opt_log_slave_updates = 0, opt_safe_show_db=0, bool opt_sql_bin_update = 0, opt_log_slave_updates = 0, opt_safe_show_db=0,
opt_show_slave_auth_info = 0; opt_show_slave_auth_info = 0, opt_old_rpl_compat = 0;
FILE *bootstrap_file=0; FILE *bootstrap_file=0;
int segfaulted = 0; // ensure we do not enter SIGSEGV handler twice int segfaulted = 0; // ensure we do not enter SIGSEGV handler twice
extern MASTER_INFO glob_mi; extern MASTER_INFO glob_mi;
...@@ -718,6 +718,7 @@ void clean_up(bool print_message) ...@@ -718,6 +718,7 @@ void clean_up(bool print_message)
free_defaults(defaults_argv); free_defaults(defaults_argv);
my_free(charsets_list, MYF(MY_ALLOW_ZERO_PTR)); my_free(charsets_list, MYF(MY_ALLOW_ZERO_PTR));
my_free(mysql_tmpdir,MYF(0)); my_free(mysql_tmpdir,MYF(0));
my_free(slave_load_tmpdir,MYF(0));
x_free(opt_bin_logname); x_free(opt_bin_logname);
bitmap_free(&temp_pool); bitmap_free(&temp_pool);
free_max_user_conn(); free_max_user_conn();
...@@ -2518,7 +2519,8 @@ enum options { ...@@ -2518,7 +2519,8 @@ enum options {
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINK, OPT_REPORT_HOST, OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINK, OPT_REPORT_HOST,
OPT_REPORT_USER, OPT_REPORT_PASSWORD, OPT_REPORT_PORT, OPT_REPORT_USER, OPT_REPORT_PASSWORD, OPT_REPORT_PORT,
OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL, OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL,
OPT_SHOW_SLAVE_AUTH_INFO}; OPT_SHOW_SLAVE_AUTH_INFO, OPT_OLD_RPL_COMPAT,
OPT_SLAVE_LOAD_TMPDIR};
static struct option long_options[] = { static struct option long_options[] = {
{"ansi", no_argument, 0, 'a'}, {"ansi", no_argument, 0, 'a'},
...@@ -2611,6 +2613,7 @@ static struct option long_options[] = { ...@@ -2611,6 +2613,7 @@ static struct option long_options[] = {
OPT_SAFEMALLOC_MEM_LIMIT}, OPT_SAFEMALLOC_MEM_LIMIT},
{"new", no_argument, 0, 'n'}, {"new", no_argument, 0, 'n'},
{"old-protocol", no_argument, 0, 'o'}, {"old-protocol", no_argument, 0, 'o'},
{"old-rpl-compat", no_argument, 0, (int)OPT_OLD_RPL_COMPAT},
#ifdef ONE_THREAD #ifdef ONE_THREAD
{"one-thread", no_argument, 0, (int) OPT_ONE_THREAD}, {"one-thread", no_argument, 0, (int) OPT_ONE_THREAD},
#endif #endif
...@@ -2659,6 +2662,7 @@ static struct option long_options[] = { ...@@ -2659,6 +2662,7 @@ static struct option long_options[] = {
{"skip-stack-trace", no_argument, 0, (int) OPT_SKIP_STACK_TRACE}, {"skip-stack-trace", no_argument, 0, (int) OPT_SKIP_STACK_TRACE},
{"skip-symlink", no_argument, 0, (int) OPT_SKIP_SYMLINK}, {"skip-symlink", no_argument, 0, (int) OPT_SKIP_SYMLINK},
{"skip-thread-priority", no_argument, 0, (int) OPT_SKIP_PRIOR}, {"skip-thread-priority", no_argument, 0, (int) OPT_SKIP_PRIOR},
{"slave-load-tmpdir", required_argument, 0, (int) OPT_SLAVE_LOAD_TMPDIR},
{"sql-bin-update-same", no_argument, 0, (int) OPT_SQL_BIN_UPDATE_SAME}, {"sql-bin-update-same", no_argument, 0, (int) OPT_SQL_BIN_UPDATE_SAME},
#include "sslopt-longopts.h" #include "sslopt-longopts.h"
#ifdef __WIN__ #ifdef __WIN__
...@@ -3311,6 +3315,12 @@ static void get_options(int argc,char **argv) ...@@ -3311,6 +3315,12 @@ static void get_options(int argc,char **argv)
safemalloc_mem_limit = atoi(optarg); safemalloc_mem_limit = atoi(optarg);
#endif #endif
break; break;
case OPT_SLAVE_LOAD_TMPDIR:
slave_load_tmpdir = my_strdup(optarg, MYF(MY_FAE));
break;
case OPT_OLD_RPL_COMPAT:
opt_old_rpl_compat = 1;
break;
case OPT_SHOW_SLAVE_AUTH_INFO: case OPT_SHOW_SLAVE_AUTH_INFO:
opt_show_slave_auth_info = 1; opt_show_slave_auth_info = 1;
break; break;
...@@ -4377,6 +4387,14 @@ static void fix_paths(void) ...@@ -4377,6 +4387,14 @@ static void fix_paths(void)
mysql_tmpdir=(char*) my_realloc(mysql_tmpdir,(uint) strlen(mysql_tmpdir)+1, mysql_tmpdir=(char*) my_realloc(mysql_tmpdir,(uint) strlen(mysql_tmpdir)+1,
MYF(MY_HOLD_ON_ERROR)); MYF(MY_HOLD_ON_ERROR));
} }
if (!slave_load_tmpdir)
{
int copy_len;
slave_load_tmpdir = (char*) my_malloc((copy_len=strlen(mysql_tmpdir) + 1)
, MYF(MY_FAE));
// no need to check return value, if we fail, my_malloc() never returns
memcpy(slave_load_tmpdir, mysql_tmpdir, copy_len);
}
} }
......
...@@ -24,10 +24,8 @@ ...@@ -24,10 +24,8 @@
#include <thr_alarm.h> #include <thr_alarm.h>
#include <my_dir.h> #include <my_dir.h>
#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\
"FIRST")
volatile bool slave_running = 0; volatile bool slave_running = 0;
char* slave_load_tmpdir = 0;
pthread_t slave_real_id; pthread_t slave_real_id;
MASTER_INFO glob_mi; MASTER_INFO glob_mi;
HASH replicate_do_table, replicate_ignore_table; HASH replicate_do_table, replicate_ignore_table;
...@@ -41,16 +39,17 @@ THD* slave_thd = 0; ...@@ -41,16 +39,17 @@ THD* slave_thd = 0;
// when slave thread exits, we need to remember the temporary tables so we // when slave thread exits, we need to remember the temporary tables so we
// can re-use them on slave start // can re-use them on slave start
static int last_slave_errno = 0; int last_slave_errno = 0;
static char last_slave_error[1024] = ""; char last_slave_error[MAX_SLAVE_ERRMSG] = "";
#ifndef DBUG_OFF #ifndef DBUG_OFF
int disconnect_slave_event_count = 0, abort_slave_event_count = 0; int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
static int events_till_disconnect = -1, events_till_abort = -1; static int events_till_disconnect = -1;
int events_till_abort = -1;
static int stuck_count = 0; static int stuck_count = 0;
#endif #endif
inline void skip_load_data_infile(NET* net); void skip_load_data_infile(NET* net);
inline bool slave_killed(THD* thd); inline bool slave_killed(THD* thd);
static int init_slave_thread(THD* thd); static int init_slave_thread(THD* thd);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
...@@ -59,8 +58,7 @@ static int safe_sleep(THD* thd, int sec); ...@@ -59,8 +58,7 @@ static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db, static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name); const char* table_name);
inline char* rewrite_db(char* db); char* rewrite_db(char* db);
static int check_expected_error(THD* thd, int expected_error);
static void free_table_ent(TABLE_RULE_ENT* e) static void free_table_ent(TABLE_RULE_ENT* e)
{ {
...@@ -219,7 +217,16 @@ inline bool slave_killed(THD* thd) ...@@ -219,7 +217,16 @@ inline bool slave_killed(THD* thd)
return abort_slave || abort_loop || thd->killed; return abort_slave || abort_loop || thd->killed;
} }
inline void skip_load_data_infile(NET* net) void slave_print_error(int err_code, const char* msg, ...)
{
va_list args;
va_start(args,msg);
my_vsnprintf(last_slave_error, sizeof(last_slave_error), msg, args);
sql_print_error("Slave: %s, error_code=%d", last_slave_error, err_code);
last_slave_errno = err_code;
}
void skip_load_data_infile(NET* net)
{ {
(void)my_net_write(net, "\xfb/dev/null", 10); (void)my_net_write(net, "\xfb/dev/null", 10);
(void)net_flush(net); (void)net_flush(net);
...@@ -227,7 +234,7 @@ inline void skip_load_data_infile(NET* net) ...@@ -227,7 +234,7 @@ inline void skip_load_data_infile(NET* net)
send_ok(net); // the master expects it send_ok(net); // the master expects it
} }
inline char* rewrite_db(char* db) char* rewrite_db(char* db)
{ {
if(replicate_rewrite_db.is_empty() || !db) return db; if(replicate_rewrite_db.is_empty() || !db) return db;
I_List_iterator<i_string_pair> it(replicate_rewrite_db); I_List_iterator<i_string_pair> it(replicate_rewrite_db);
...@@ -904,7 +911,7 @@ server_errno=%d)", ...@@ -904,7 +911,7 @@ server_errno=%d)",
return len - 1; return len - 1;
} }
static int check_expected_error(THD* thd, int expected_error) int check_expected_error(THD* thd, int expected_error)
{ {
switch(expected_error) switch(expected_error)
{ {
...@@ -935,6 +942,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -935,6 +942,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
if (ev) if (ev)
{ {
int type_code = ev->get_type_code(); int type_code = ev->get_type_code();
int exec_res;
if (ev->server_id == ::server_id || slave_skip_counter) if (ev->server_id == ::server_id || slave_skip_counter)
{ {
if(type_code == LOAD_EVENT) if(type_code == LOAD_EVENT)
...@@ -952,320 +960,12 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) ...@@ -952,320 +960,12 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
thd->set_time(); // time the query thd->set_time(); // time the query
if(!thd->log_seq) if(!thd->log_seq)
thd->log_seq = ev->log_seq; thd->log_seq = ev->log_seq;
if (!ev->when) if (!ev->when)
ev->when = time(NULL); ev->when = time(NULL);
ev->thd = thd;
switch(type_code) { exec_res = ev->exec_event(mi);
case QUERY_EVENT: delete ev;
{ return exec_res;
Query_log_event* qev = (Query_log_event*)ev;
int q_len = qev->q_len;
int expected_error,actual_error = 0;
init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = rewrite_db((char*)qev->db);
if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->query = (char*)qev->query;
thd->set_time((time_t)qev->when);
thd->current_tablenr = 0;
VOID(pthread_mutex_lock(&LOCK_thread_count));
thd->query_id = query_id++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
thd->query_error = 0; // clear error
thd->net.last_errno = 0;
thd->net.last_error[0] = 0;
thd->slave_proxy_id = qev->thread_id; // for temp tables
// sanity check to make sure the master did not get a really bad
// error on the query
if (!check_expected_error(thd, (expected_error = qev->error_code)))
{
mysql_parse(thd, thd->query, q_len);
if (expected_error !=
(actual_error = thd->net.last_errno) && expected_error)
{
const char* errmsg = "Slave: did not get the expected error\
running query from master - expected: '%s'(%d), got '%s'(%d)";
sql_print_error(errmsg, ER_SAFE(expected_error),
expected_error,
actual_error ? thd->net.last_error:"no error",
actual_error);
thd->query_error = 1;
}
else if (expected_error == actual_error)
{
thd->query_error = 0;
*last_slave_error = 0;
last_slave_errno = 0;
}
}
else
{
// master could be inconsistent, abort and tell DBA to check/fix it
thd->db = thd->query = 0;
thd->convert_set = 0;
close_thread_tables(thd);
free_root(&thd->mem_root,0);
delete ev;
return 1;
}
}
thd->db = 0; // prevent db from being freed
thd->query = 0; // just to be sure
// assume no convert for next query unless set explictly
thd->convert_set = 0;
close_thread_tables(thd);
if (thd->query_error || thd->fatal_error)
{
sql_print_error("Slave: error running query '%s' ",
qev->query);
last_slave_errno = actual_error ? actual_error : -1;
my_snprintf(last_slave_error, sizeof(last_slave_error),
"error '%s' on query '%s'",
actual_error ? thd->net.last_error :
"unexpected success or fatal error",
qev->query
);
free_root(&thd->mem_root,0);
delete ev;
return 1;
}
free_root(&thd->mem_root,0);
thd->log_seq = 0;
mi->inc_pos(event_len, ev->log_seq);
delete ev;
flush_master_info(mi);
break;
}
case SLAVE_EVENT:
{
if(mysql_bin_log.is_open())
{
Slave_log_event *sev = (Slave_log_event*)ev;
mysql_bin_log.write(sev);
}
thd->log_seq = 0;
mi->inc_pos(event_len, ev->log_seq);
flush_master_info(mi);
delete ev;
break;
}
case LOAD_EVENT:
{
Load_log_event* lev = (Load_log_event*)ev;
init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = rewrite_db((char*)lev->db);
thd->query = 0;
thd->query_error = 0;
if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->set_time((time_t)lev->when);
thd->current_tablenr = 0;
VOID(pthread_mutex_lock(&LOCK_thread_count));
thd->query_id = query_id++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
TABLE_LIST tables;
bzero((char*) &tables,sizeof(tables));
tables.db = thd->db;
tables.name = tables.real_name = (char*)lev->table_name;
tables.lock_type = TL_WRITE;
// the table will be opened in mysql_load
if(table_rules_on && !tables_ok(thd, &tables))
{
skip_load_data_infile(net);
}
else
{
enum enum_duplicates handle_dup = DUP_IGNORE;
if(lev->sql_ex.opt_flags && REPLACE_FLAG)
handle_dup = DUP_REPLACE;
sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags &&
DUMPFILE_FLAG );
String field_term(&lev->sql_ex.field_term, 1),
enclosed(&lev->sql_ex.enclosed, 1),
line_term(&lev->sql_ex.line_term,1),
escaped(&lev->sql_ex.escaped, 1),
line_start(&lev->sql_ex.line_start, 1);
ex.field_term = &field_term;
if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
ex.field_term->length(0);
ex.enclosed = &enclosed;
if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY)
ex.enclosed->length(0);
ex.line_term = &line_term;
if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY)
ex.line_term->length(0);
ex.line_start = &line_start;
if(lev->sql_ex.empty_flags & LINE_START_EMPTY)
ex.line_start->length(0);
ex.escaped = &escaped;
if(lev->sql_ex.empty_flags & ESCAPED_EMPTY)
ex.escaped->length(0);
ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
ex.field_term->length(0);
ex.skip_lines = lev->skip_lines;
List<Item> fields;
lev->set_fields(fields);
thd->slave_proxy_id = thd->thread_id;
thd->net.vio = net->vio;
// mysql_load will use thd->net to read the file
thd->net.pkt_nr = net->pkt_nr;
// make sure the client does not get confused
// about the packet sequence
if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1,
TL_WRITE))
thd->query_error = 1;
if(thd->cuted_fields)
sql_print_error("Slave: load data infile at position %s in log \
'%s' produced %d warning(s)", llstr(glob_mi.pos,llbuff), RPL_LOG_NAME,
thd->cuted_fields );
net->pkt_nr = thd->net.pkt_nr;
}
}
else
{
// we will just ask the master to send us /dev/null if we do not
// want to load the data :-)
skip_load_data_infile(net);
}
thd->net.vio = 0;
thd->db = 0;// prevent db from being freed
close_thread_tables(thd);
if(thd->query_error)
{
int sql_error = thd->net.last_errno;
if(!sql_error)
sql_error = ER_UNKNOWN_ERROR;
sql_print_error("Slave: Error '%s' running load data infile ",
ER(sql_error));
delete ev;
free_root(&thd->mem_root,0);
return 1;
}
thd->log_seq = 0;
free_root(&thd->mem_root,0);
if(thd->fatal_error)
{
sql_print_error("Slave: Fatal error running query '%s' ",
thd->query);
delete ev;
return 1;
}
mi->inc_pos(event_len, ev->log_seq);
delete ev;
flush_master_info(mi);
break;
}
case START_EVENT:
close_temporary_tables(thd);
mi->inc_pos(event_len, ev->log_seq);
flush_master_info(mi);
delete ev;
thd->log_seq = 0;
break;
case STOP_EVENT:
if(mi->pos > 4) // stop event should be ignored after rotate event
{
close_temporary_tables(thd);
mi->inc_pos(event_len, ev->log_seq);
flush_master_info(mi);
}
delete ev;
thd->log_seq = 0;
break;
case ROTATE_EVENT:
{
Rotate_log_event* rev = (Rotate_log_event*)ev;
int ident_len = rev->ident_len;
bool rotate_binlog = 0, write_slave_event = 0;
char* log_name = mi->log_file_name;
pthread_mutex_lock(&mi->lock);
// rotate local binlog only if the name of remote has changed
if (!*log_name || !(log_name[ident_len] == 0 &&
!memcmp(log_name, rev->new_log_ident, ident_len)))
{
write_slave_event = (!(rev->flags & LOG_EVENT_FORCED_ROTATE_F)
&& mysql_bin_log.is_open());
rotate_binlog = (*log_name && write_slave_event);
memcpy(log_name, rev->new_log_ident,ident_len );
log_name[ident_len] = 0;
}
mi->pos = rev->pos;
mi->last_log_seq = ev->log_seq;
#ifndef DBUG_OFF
if (abort_slave_event_count)
++events_till_abort;
#endif
if (rotate_binlog)
{
mysql_bin_log.new_file();
mi->last_log_seq = 0;
}
pthread_cond_broadcast(&mi->cond);
pthread_mutex_unlock(&mi->lock);
flush_master_info(mi);
if (write_slave_event)
{
Slave_log_event s(thd, mi);
if (s.master_host)
{
s.set_log_seq(0, &mysql_bin_log);
s.server_id = ::server_id;
mysql_bin_log.write(&s);
}
}
delete ev;
thd->log_seq = 0;
break;
}
case INTVAR_EVENT:
{
Intvar_log_event* iev = (Intvar_log_event*)ev;
switch(iev->type)
{
case LAST_INSERT_ID_EVENT:
thd->last_insert_id_used = 1;
thd->last_insert_id = iev->val;
break;
case INSERT_ID_EVENT:
thd->next_insert_id = iev->val;
break;
}
mi->inc_pending(event_len);
delete ev;
// do not reset log_seq
break;
}
}
} }
else else
{ {
...@@ -1275,7 +975,6 @@ This may also be a network problem, or just a bug in the master or slave code.\ ...@@ -1275,7 +975,6 @@ This may also be a network problem, or just a bug in the master or slave code.\
"); ");
return 1; return 1;
} }
return 0;
} }
// slave thread // slave thread
...@@ -1363,6 +1062,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) ...@@ -1363,6 +1062,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
connected: connected:
thd->slave_net = &mysql->net;
// register ourselves with the master // register ourselves with the master
// if fails, this is not fatal - we just print the error message and go // if fails, this is not fatal - we just print the error message and go
// on with life // on with life
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
#define SLAVE_NET_TIMEOUT 3600 #define SLAVE_NET_TIMEOUT 3600
extern ulong slave_net_timeout; extern ulong slave_net_timeout;
extern char* slave_load_tmpdir;
typedef struct st_master_info typedef struct st_master_info
{ {
...@@ -70,6 +70,11 @@ typedef struct st_table_rule_ent ...@@ -70,6 +70,11 @@ typedef struct st_table_rule_ent
#define TABLE_RULE_HASH_SIZE 16 #define TABLE_RULE_HASH_SIZE 16
#define TABLE_RULE_ARR_SIZE 16 #define TABLE_RULE_ARR_SIZE 16
#define MAX_SLAVE_ERRMSG 1024
#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\
"FIRST")
int flush_master_info(MASTER_INFO* mi); int flush_master_info(MASTER_INFO* mi);
int register_slave_on_master(MYSQL* mysql); int register_slave_on_master(MYSQL* mysql);
...@@ -97,6 +102,10 @@ int add_table_rule(HASH* h, const char* table_spec); ...@@ -97,6 +102,10 @@ int add_table_rule(HASH* h, const char* table_spec);
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec); int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec);
void init_table_rule_hash(HASH* h, bool* h_inited); void init_table_rule_hash(HASH* h, bool* h_inited);
void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited); void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited);
char* rewrite_db(char* db);
int check_expected_error(THD* thd, int error_code);
void skip_load_data_infile(NET* net);
void slave_print_error(int err_code, const char* msg, ...);
void end_slave(); // clean up void end_slave(); // clean up
int init_master_info(MASTER_INFO* mi); int init_master_info(MASTER_INFO* mi);
...@@ -109,6 +118,11 @@ extern uint32 slave_skip_counter; ...@@ -109,6 +118,11 @@ extern uint32 slave_skip_counter;
// we want to restart it skipping one or more events in the master log that // we want to restart it skipping one or more events in the master log that
// have caused errors, and have been manually applied by DBA already // have caused errors, and have been manually applied by DBA already
extern int last_slave_errno;
#ifndef DBUG_OFF
extern int events_till_abort;
#endif
extern char last_slave_error[MAX_SLAVE_ERRMSG];
extern pthread_t slave_real_id; extern pthread_t slave_real_id;
extern THD* slave_thd; extern THD* slave_thd;
extern MASTER_INFO glob_mi; extern MASTER_INFO glob_mi;
......
...@@ -121,6 +121,7 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0), ...@@ -121,6 +121,7 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
proc_info="login"; proc_info="login";
where="field list"; where="field list";
server_id = ::server_id; server_id = ::server_id;
slave_net = 0;
server_status=SERVER_STATUS_AUTOCOMMIT; server_status=SERVER_STATUS_AUTOCOMMIT;
update_lock_default= low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE; update_lock_default= low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE;
options=thd_startup_options; options=thd_startup_options;
......
...@@ -86,9 +86,7 @@ class MYSQL_LOG { ...@@ -86,9 +86,7 @@ class MYSQL_LOG {
bool write(THD *thd, enum enum_server_command command,const char *format,...); bool write(THD *thd, enum enum_server_command command,const char *format,...);
bool write(THD *thd, const char *query, uint query_length, bool write(THD *thd, const char *query, uint query_length,
time_t query_start=0); time_t query_start=0);
bool write(Query_log_event* event_info); // binary log write bool write(Log_event* event_info); // binary log write
bool write(Load_log_event* event_info);
bool write(Slave_log_event* event_info);
bool write(IO_CACHE *cache); bool write(IO_CACHE *cache);
int generate_new_name(char *new_name,const char *old_name); int generate_new_name(char *new_name,const char *old_name);
void make_log_name(char* buf, const char* log_ident); void make_log_name(char* buf, const char* log_ident);
...@@ -300,6 +298,8 @@ class THD :public ilink { ...@@ -300,6 +298,8 @@ class THD :public ilink {
ulong slave_proxy_id; // in slave thread we need to know in behalf of which ulong slave_proxy_id; // in slave thread we need to know in behalf of which
// thread the query is being run to replicate temp tables properly // thread the query is being run to replicate temp tables properly
NET* slave_net; // network connection from slave to master
THD(); THD();
~THD(); ~THD();
bool store_globals(); bool store_globals();
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "mysql_priv.h" #include "mysql_priv.h"
#include <my_dir.h> #include <my_dir.h>
#include <m_ctype.h> #include <m_ctype.h>
#include "sql_repl.h"
class READ_INFO { class READ_INFO {
File file; File file;
...@@ -32,6 +33,7 @@ class READ_INFO { ...@@ -32,6 +33,7 @@ class READ_INFO {
int field_term_char,line_term_char,enclosed_char,escape_char; int field_term_char,line_term_char,enclosed_char,escape_char;
int *stack,*stack_pos; int *stack,*stack_pos;
bool found_end_of_line,start_of_line,eof; bool found_end_of_line,start_of_line,eof;
bool need_end_io_cache;
IO_CACHE cache; IO_CACHE cache;
NET *io_net; NET *io_net;
...@@ -50,6 +52,18 @@ class READ_INFO { ...@@ -50,6 +52,18 @@ class READ_INFO {
char unescape(char chr); char unescape(char chr);
int terminator(char *ptr,uint length); int terminator(char *ptr,uint length);
bool find_start_of_fields(); bool find_start_of_fields();
// we need to force cache close before destructor is invoked to log
// the last read block
void end_io_cache()
{
::end_io_cache(&cache);
need_end_io_cache = 0;
}
// either this method, or we need to make cache public
// arg must be set from mysql_load() since constructor does not see
// either the table or THD value
void set_io_cache_arg(void* arg) { cache.arg = arg; }
}; };
static int read_fixed_length(THD *thd,COPY_INFO &info,TABLE *table, static int read_fixed_length(THD *thd,COPY_INFO &info,TABLE *table,
...@@ -67,10 +81,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -67,10 +81,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
File file; File file;
TABLE *table; TABLE *table;
int error; int error;
uint save_skip_lines = ex->skip_lines;
String *field_term=ex->field_term,*escaped=ex->escaped, String *field_term=ex->field_term,*escaped=ex->escaped,
*enclosed=ex->enclosed; *enclosed=ex->enclosed;
bool is_fifo=0; bool is_fifo=0;
LOAD_FILE_INFO lf_info;
char * db = table_list->db ? table_list->db : thd->db;
DBUG_ENTER("mysql_load"); DBUG_ENTER("mysql_load");
if (escaped->length() > 1 || enclosed->length() > 1) if (escaped->length() > 1 || enclosed->length() > 1)
...@@ -79,7 +94,6 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -79,7 +94,6 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
MYF(0)); MYF(0));
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
if (!(table = open_ltable(thd,table_list,lock_type))) if (!(table = open_ltable(thd,table_list,lock_type)))
DBUG_RETURN(-1); DBUG_RETURN(-1);
if (!fields.elements) if (!fields.elements)
...@@ -161,8 +175,9 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -161,8 +175,9 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (!my_stat(name,&stat_info,MYF(MY_WME))) if (!my_stat(name,&stat_info,MYF(MY_WME)))
DBUG_RETURN(-1); DBUG_RETURN(-1);
// the file must be: // if we are not in slave thread, the file must be:
if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others if (!thd->slave_thread &&
!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
#ifndef __EMX__ #ifndef __EMX__
(stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
#endif #endif
...@@ -195,13 +210,27 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -195,13 +210,27 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
DBUG_RETURN(-1); // Can't allocate buffers DBUG_RETURN(-1); // Can't allocate buffers
} }
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
{
lf_info.thd = thd;
lf_info.ex = ex;
lf_info.db = db;
lf_info.table_name = table_list->real_name;
lf_info.fields = &fields;
lf_info.handle_dup = handle_duplicates;
lf_info.wrote_create_file = 0;
lf_info.last_pos_in_file = HA_POS_ERROR;
read_info.set_io_cache_arg((void*)&lf_info);
}
restore_record(table,2); restore_record(table,2);
thd->count_cuted_fields=1; /* calc cuted fields */ thd->count_cuted_fields=1; /* calc cuted fields */
thd->cuted_fields=0L; thd->cuted_fields=0L;
if (ex->line_term->length() && field_term->length()) if (ex->line_term->length() && field_term->length())
{ {
while (ex->skip_lines--) // ex->skip_lines needs to be preserved for logging
uint skip_lines = ex->skip_lines;
while (skip_lines--)
{ {
if (read_info.next_line()) if (read_info.next_line())
break; break;
...@@ -240,7 +269,14 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -240,7 +269,14 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
table->copy_blobs=0; table->copy_blobs=0;
thd->count_cuted_fields=0; /* Don`t calc cuted fields */ thd->count_cuted_fields=0; /* Don`t calc cuted fields */
if (error) if (error)
{
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
{
Delete_file_log_event d(thd);
mysql_bin_log.write(&d);
}
DBUG_RETURN(-1); // Error on read DBUG_RETURN(-1); // Error on read
}
sprintf(name,ER(ER_LOAD_INFO),info.records,info.deleted, sprintf(name,ER(ER_LOAD_INFO),info.records,info.deleted,
info.records-info.copied,thd->cuted_fields); info.records-info.copied,thd->cuted_fields);
send_ok(&thd->net,info.copied+info.deleted,0L,name); send_ok(&thd->net,info.copied+info.deleted,0L,name);
...@@ -250,12 +286,20 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, ...@@ -250,12 +286,20 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (!table->file->has_transactions()) if (!table->file->has_transactions())
thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; thd->options|=OPTION_STATUS_NO_TRANS_UPDATE;
if (!read_file_from_client && mysql_bin_log.is_open()) if (mysql_bin_log.is_open())
{ {
ex->skip_lines = save_skip_lines; if (opt_old_rpl_compat && !read_file_from_client)
Load_log_event qinfo(thd, ex, table->table_name, fields, {
Load_log_event qinfo(thd, ex, db, table->table_name, fields,
handle_duplicates); handle_duplicates);
mysql_bin_log.write(&qinfo); mysql_bin_log.write(&qinfo);
}
if (!opt_old_rpl_compat)
{
read_info.end_io_cache(); // make sure last block gets logged
Execute_load_log_event e(thd);
mysql_bin_log.write(&e);
}
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -480,6 +524,13 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term, ...@@ -480,6 +524,13 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term,
my_free((gptr) buffer,MYF(0)); /* purecov: inspected */ my_free((gptr) buffer,MYF(0)); /* purecov: inspected */
error=1; error=1;
} }
else
{
need_end_io_cache = 1;
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
cache.pre_read = cache.pre_close =
(IO_CACHE_CALLBACK)log_loaded_block;
}
} }
} }
...@@ -488,7 +539,8 @@ READ_INFO::~READ_INFO() ...@@ -488,7 +539,8 @@ READ_INFO::~READ_INFO()
{ {
if (!error) if (!error)
{ {
end_io_cache(&cache); if (need_end_io_cache)
::end_io_cache(&cache);
my_free((gptr) buffer,MYF(0)); my_free((gptr) buffer,MYF(0));
error=1; error=1;
} }
...@@ -798,3 +850,4 @@ bool READ_INFO::find_start_of_fields() ...@@ -798,3 +850,4 @@ bool READ_INFO::find_start_of_fields()
} }
return 0; return 0;
} }
...@@ -1578,3 +1578,33 @@ int load_master_data(THD* thd) ...@@ -1578,3 +1578,33 @@ int load_master_data(THD* thd)
return error; return error;
} }
int log_loaded_block(IO_CACHE* file)
{
LOAD_FILE_INFO* lf_info;
uint block_len ;
if (!(block_len = file->rc_end - file->buffer))
return 0;
lf_info = (LOAD_FILE_INFO*)file->arg;
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
lf_info->last_pos_in_file >= file->pos_in_file)
return 0;
lf_info->last_pos_in_file = file->pos_in_file;
if (lf_info->wrote_create_file)
{
Append_block_log_event a(lf_info->thd, file->buffer,block_len);
mysql_bin_log.write(&a);
}
else
{
Create_file_log_event c(lf_info->thd,lf_info->ex,lf_info->db,
lf_info->table_name, *lf_info->fields,
lf_info->handle_dup, file->buffer,
block_len);
mysql_bin_log.write(&c);
lf_info->wrote_create_file = 1;
}
return 0;
}
...@@ -12,7 +12,7 @@ typedef struct st_slave_info ...@@ -12,7 +12,7 @@ typedef struct st_slave_info
uint16 port; uint16 port;
} SLAVE_INFO; } SLAVE_INFO;
extern bool opt_show_slave_auth_info; extern bool opt_show_slave_auth_info, opt_old_rpl_compat;
extern HASH slave_list; extern HASH slave_list;
extern char* master_host; extern char* master_host;
extern my_string opt_bin_logname, master_info_file; extern my_string opt_bin_logname, master_info_file;
...@@ -51,4 +51,19 @@ int show_binlogs(THD* thd); ...@@ -51,4 +51,19 @@ int show_binlogs(THD* thd);
extern int init_master_info(MASTER_INFO* mi); extern int init_master_info(MASTER_INFO* mi);
void kill_zombie_dump_threads(uint32 slave_server_id); void kill_zombie_dump_threads(uint32 slave_server_id);
typedef struct st_load_file_info
{
THD* thd;
sql_exchange* ex;
List <Item> *fields;
enum enum_duplicates handle_dup;
char* db;
char* table_name;
bool wrote_create_file;
my_off_t last_pos_in_file;
} LOAD_FILE_INFO;
int log_loaded_block(IO_CACHE* file);
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment