Commit 8fc78e08 authored by unknown's avatar unknown

cleanup

removal of duplicate code in mf_iocache.cc 
work on failsafe replication
work on SEQ_READ_APPEND io cache


include/my_sys.h:
  updates for SEQ_READ_APPEND
libmysql/Makefile.am:
  fix for mysys/mf_iocache.c
libmysql/libmysql.c:
  updates for new format of SHOW SLAVE HOSTS
mysql-test/r/rpl000001.result:
  test replication of LOAD DATA LOCAL INFILE
mysql-test/r/rpl000002.result:
  updated test result
mysql-test/t/rpl000001.test:
  test LOAD DATA LOCAL INFILE
mysys/mf_iocache.c:
  cleanup to remove duplicate functionality
  some work on SEQ_READ_APPEND
sql/mf_iocache.cc:
  cleanup to remove duplicate functionality
sql/repl_failsafe.cc:
  more work on failsafe replication
sql/repl_failsafe.h:
  more work on failsafe replication
sql/slave.cc:
  cleanup
  more work on failsafe replication
sql/sql_load.cc:
  fixed bug on replicating empty file loads
  got LOAD DATA LOCAL INFILE to work again, and to be replicated
sql/sql_repl.cc:
  cleanup
  more work on failsafe replication
sql/sql_repl.h:
  more work on failsafe replication
parent 74f49f9f
......@@ -243,7 +243,10 @@ typedef struct st_typelib { /* Different types saved here */
const char **type_names;
} TYPELIB;
enum cache_type {READ_CACHE,WRITE_CACHE,READ_FIFO,READ_NET,WRITE_NET};
enum cache_type {READ_CACHE,WRITE_CACHE,
SEQ_READ_APPEND /* sequential read or append */,
READ_FIFO,
READ_NET,WRITE_NET};
enum flush_type { FLUSH_KEEP, FLUSH_RELEASE, FLUSH_IGNORE_CHANGED,
FLUSH_FORCE_WRITE};
......@@ -294,6 +297,16 @@ typedef struct st_io_cache /* Used when cacheing files */
{
my_off_t pos_in_file,end_of_file;
byte *rc_pos,*rc_end,*buffer,*rc_request_pos;
my_bool alloced_buffer; /* currented READ_NET is the only one
that will use a buffer allocated somewhere
else
*/
byte *append_buffer, *append_pos, *append_end;
/* for append buffer used in READ_APPEND cache */
#ifdef THREAD
pthread_mutex_t append_buffer_lock;
/* need mutex copying from append buffer to read buffer */
#endif
int (*read_function)(struct st_io_cache *,byte *,uint);
/* callbacks when the actual read I/O happens */
IO_CACHE_CALLBACK pre_read;
......@@ -546,6 +559,7 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
my_off_t seek_offset,pbool use_async_io,
pbool clear_cache);
extern int _my_b_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_seq_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count);
extern int _my_b_get(IO_CACHE *info);
extern int _my_b_async_read(IO_CACHE *info,byte *Buffer,uint Count);
......
......@@ -18,7 +18,7 @@
# This file is public domain and comes with NO WARRANTY of any kind
target = libmysqlclient.la
target_defs = -DUNDEF_THREADS_HACK -DDONT_USE_RAID
target_defs = -DUNDEF_THREADS_HACK -DDONT_USE_RAID -DMYSQL_CLIENT
LIBS = @CLIENT_LIBS@
INCLUDES = -I$(srcdir)/../include -I../include \
-I$(srcdir)/.. -I$(top_srcdir) -I.. $(openssl_includes)
......
......@@ -1147,6 +1147,8 @@ static inline int get_slaves_from_master(MYSQL* mysql)
MYSQL_ROW row;
int error = 1;
int has_auth_info;
int port_ind;
if (!mysql->net.vio && !mysql_real_connect(mysql,0,0,0,0,0,0,0))
{
expand_error(mysql, CR_PROBE_MASTER_CONNECT);
......@@ -1162,8 +1164,14 @@ static inline int get_slaves_from_master(MYSQL* mysql)
switch (mysql_num_fields(res))
{
case 3: has_auth_info = 0; break;
case 5: has_auth_info = 1; break;
case 5:
has_auth_info = 0;
port_ind=2;
break;
case 7:
has_auth_info = 1;
port_ind=4;
break;
default:
goto err;
}
......@@ -1175,8 +1183,8 @@ static inline int get_slaves_from_master(MYSQL* mysql)
if (has_auth_info)
{
tmp_user = row[3];
tmp_pass = row[4];
tmp_user = row[2];
tmp_pass = row[3];
}
else
{
......@@ -1184,7 +1192,7 @@ static inline int get_slaves_from_master(MYSQL* mysql)
tmp_pass = mysql->passwd;
}
if (!(slave = spawn_init(mysql, row[1], atoi(row[2]),
if (!(slave = spawn_init(mysql, row[1], atoi(row[port_ind]),
tmp_user, tmp_pass)))
goto err;
......
......@@ -7,6 +7,29 @@ use test;
drop table if exists t1,t3;
create table t1 (word char(20) not null);
load data infile '../../std_data/words.dat' into table t1;
load data local infile '/home/sasha/bk/mysql-4.0/mysql-test/std_data/words.dat' into table t1;
select * from t1;
word
Aarhus
Aaron
Ababa
aback
abaft
abandon
abandoned
abandoning
abandonment
abandons
Aarhus
Aaron
Ababa
aback
abaft
abandon
abandoned
abandoning
abandonment
abandons
set password = password('foo');
set password = password('');
create table t3(n int);
......@@ -18,7 +41,7 @@ n
2
select sum(length(word)) from t1;
sum(length(word))
71
141
drop table t1,t3;
reset master;
reset slave;
......
......@@ -15,8 +15,8 @@ n
2001
2002
show slave hosts;
Server_id Host Port
2 127.0.0.1 $SLAVE_MYPORT
Server_id Host Port Rpl_recovery_rank Master_id
2 127.0.0.1 9307 2 1
drop table t1;
slave stop;
drop table if exists t2;
......
......@@ -4,6 +4,8 @@ use test;
drop table if exists t1,t3;
create table t1 (word char(20) not null);
load data infile '../../std_data/words.dat' into table t1;
eval load data local infile '$MYSQL_TEST_DIR/std_data/words.dat' into table t1;
select * from t1;
set password = password('foo');
set password = password('');
create table t3(n int);
......
This diff is collapsed.
This diff is collapsed.
......@@ -18,6 +18,10 @@
#include "mysql_priv.h"
#include "repl_failsafe.h"
#include "sql_repl.h"
#include "slave.h"
#include "mini_client.h"
#include <mysql.h>
RPL_STATUS rpl_status=RPL_NULL;
pthread_mutex_t LOCK_rpl_status;
......@@ -33,11 +37,184 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE",
TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"",
rpl_status_type};
static int init_failsafe_rpl_thread(THD* thd)
{
DBUG_ENTER("init_failsafe_rpl_thread");
thd->system_thread = thd->bootstrap = 1;
thd->client_capabilities = 0;
my_net_init(&thd->net, 0);
thd->net.timeout = slave_net_timeout;
thd->max_packet_length=thd->net.max_packet;
thd->master_access= ~0;
thd->priv_user = 0;
thd->system_thread = 1;
pthread_mutex_lock(&LOCK_thread_count);
thd->thread_id = thread_id++;
pthread_mutex_unlock(&LOCK_thread_count);
if (init_thr_lock() ||
my_pthread_setspecific_ptr(THR_THD, thd) ||
my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
my_pthread_setspecific_ptr(THR_NET, &thd->net))
{
close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
end_thread(thd,0);
DBUG_RETURN(-1);
}
thd->mysys_var=my_thread_var;
thd->dbug_thread_id=my_thread_id();
#if !defined(__WIN__) && !defined(OS2)
sigset_t set;
VOID(sigemptyset(&set)); // Get mask in use
VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif
thd->mem_root.free=thd->mem_root.used=0;
if (thd->max_join_size == (ulong) ~0L)
thd->options |= OPTION_BIG_SELECTS;
thd->proc_info="Thread initialized";
thd->version=refresh_version;
thd->set_time();
DBUG_RETURN(0);
}
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
{
pthread_mutex_lock(&LOCK_rpl_status);
if (rpl_status == from_status || rpl_status == RPL_ANY)
rpl_status = to_status;
pthread_cond_signal(&COND_rpl_status);
pthread_mutex_unlock(&LOCK_rpl_status);
}
int update_slave_list(MYSQL* mysql)
{
MYSQL_RES* res=0;
MYSQL_ROW row;
const char* error=0;
bool have_auth_info;
int port_ind;
if (mc_mysql_query(mysql,"SHOW SLAVE HOSTS",0) ||
!(res = mc_mysql_store_result(mysql)))
{
error = "Query error";
goto err;
}
switch (mc_mysql_num_fields(res))
{
case 5:
have_auth_info = 0;
port_ind=2;
break;
case 7:
have_auth_info = 1;
port_ind=4;
break;
default:
error = "Invalid number of fields in SHOW SLAVE HOSTS";
goto err;
}
pthread_mutex_lock(&LOCK_slave_list);
while ((row = mc_mysql_fetch_row(res)))
{
uint32 server_id;
SLAVE_INFO* si, *old_si;
server_id = atoi(row[0]);
if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
(byte*)&server_id,4)))
si = old_si;
else
{
if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
{
error = "Out of memory";
pthread_mutex_unlock(&LOCK_slave_list);
goto err;
}
si->server_id = server_id;
}
strnmov(si->host, row[1], sizeof(si->host));
si->port = atoi(row[port_ind]);
si->rpl_recovery_rank = atoi(row[port_ind+1]);
si->master_id = atoi(row[port_ind+2]);
if (have_auth_info)
{
strnmov(si->user, row[2], sizeof(si->user));
strnmov(si->password, row[3], sizeof(si->password));
}
}
pthread_mutex_unlock(&LOCK_slave_list);
err:
if (res)
mc_mysql_free_result(res);
if (error)
{
sql_print_error("Error updating slave list:",error);
return 1;
}
return 0;
}
int find_recovery_captain(THD* thd, MYSQL* mysql)
{
return 0;
}
pthread_handler_decl(handle_failsafe_rpl,arg)
{
DBUG_ENTER("handle_failsafe_rpl");
THD *thd = new THD;
thd->thread_stack = (char*)&thd;
MYSQL* recovery_captain = 0;
pthread_detach_this_thread();
if (init_failsafe_rpl_thread(thd) || !(recovery_captain=mc_mysql_init(0)))
{
sql_print_error("Could not initialize failsafe replication thread");
goto err;
}
pthread_mutex_lock(&LOCK_rpl_status);
while (!thd->killed && !abort_loop)
{
bool break_req_chain = 0;
const char* msg = thd->enter_cond(&COND_rpl_status,
&LOCK_rpl_status, "Waiting for request");
pthread_cond_wait(&COND_rpl_status, &LOCK_rpl_status);
thd->proc_info="Processling request";
while (!break_req_chain)
{
switch (rpl_status)
{
case RPL_LOST_SOLDIER:
if (find_recovery_captain(thd, recovery_captain))
rpl_status=RPL_TROOP_SOLDIER;
else
rpl_status=RPL_RECOVERY_CAPTAIN;
break_req_chain=1; /* for now until other states are implemented */
break;
default:
break_req_chain=1;
break;
}
}
thd->exit_cond(msg);
}
pthread_mutex_unlock(&LOCK_rpl_status);
err:
if (recovery_captain)
mc_mysql_close(recovery_captain);
delete thd;
my_thread_end();
pthread_exit(0);
DBUG_RETURN(0);
}
#ifndef REPL_FAILSAFE_H
#define REPL_FAILSAFE_H
#include "mysql.h"
typedef enum {RPL_AUTH_MASTER=0,RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE,
RPL_LOST_SOLDIER,RPL_TROOP_SOLDIER,
RPL_RECOVERY_CAPTAIN,RPL_NULL /* inactive */,
......@@ -10,7 +12,11 @@ extern RPL_STATUS rpl_status;
extern pthread_mutex_t LOCK_rpl_status;
extern pthread_cond_t COND_rpl_status;
extern TYPELIB rpl_role_typelib, rpl_status_typelib;
extern uint rpl_recovery_rank;
extern const char* rpl_role_type[], *rpl_status_type[];
pthread_handler_decl(handle_failsafe_rpl,arg);
void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status);
int find_recovery_captain(THD* thd, MYSQL* mysql);
int update_slave_list(MYSQL* mysql);
#endif
......@@ -55,6 +55,8 @@ inline bool slave_killed(THD* thd);
static int init_slave_thread(THD* thd);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect);
static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
......@@ -615,6 +617,10 @@ int register_slave_on_master(MYSQL* mysql)
int2store(buf, (uint16)report_port);
packet.append(buf, 2);
int4store(buf, rpl_recovery_rank);
packet.append(buf, 4);
int4store(buf, 0); /* tell the master will fill in master_id */
packet.append(buf, 4);
if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
packet.length(), 0))
......@@ -868,7 +874,7 @@ command");
}
static uint read_event(MYSQL* mysql, MASTER_INFO *mi)
static ulong read_event(MYSQL* mysql, MASTER_INFO *mi)
{
ulong len = packet_error;
// for convinience lets think we start by
......@@ -1017,7 +1023,6 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ !
thd->set_time();
DBUG_ENTER("handle_slave");
pthread_detach_this_thread();
......@@ -1067,6 +1072,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
// on with life
thd->proc_info = "Registering slave on master";
register_slave_on_master(mysql);
update_slave_list(mysql);
while (!slave_killed(thd))
{
......@@ -1117,7 +1123,7 @@ try again, log '%s' at postion %s", RPL_LOG_NAME,
while(!slave_killed(thd))
{
thd->proc_info = "Reading master update";
uint event_len = read_event(mysql, &glob_mi);
ulong event_len = read_event(mysql, &glob_mi);
if(slave_killed(thd))
{
sql_print_error("Slave thread killed while reading event");
......@@ -1244,30 +1250,7 @@ position %s",
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
int slave_was_killed;
#ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count;
#endif
while(!(slave_was_killed = slave_killed(thd)) &&
!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0))
{
sql_print_error("Slave thread: error connecting to master: %s (%d),\
retry in %d sec", mc_mysql_error(mysql), errno, mi->connect_retry);
safe_sleep(thd, mi->connect_retry);
}
if(!slave_was_killed)
{
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
mi->user, mi->host, mi->port);
#ifdef SIGNAL_WITH_VIO_CLOSE
thd->set_active_vio(mysql->net.vio);
#endif
}
return slave_was_killed;
return connect_to_master(thd, mysql, mi, 0);
}
/*
......@@ -1275,7 +1258,8 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
master_retry_count times
*/
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect)
{
int slave_was_killed;
int last_errno= -2; // impossible error
......@@ -1290,12 +1274,15 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
#ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count;
#endif
while (!(slave_was_killed = slave_killed(thd)) && mc_mysql_reconnect(mysql))
while (!(slave_was_killed = slave_killed(thd)) &&
(reconnect ? mc_mysql_reconnect(mysql) :
!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0)))
{
/* Don't repeat last error */
if (mc_mysql_errno(mysql) != last_errno)
{
sql_print_error("Slave thread: error re-connecting to master: \
sql_print_error("Slave thread: error connecting to master: \
%s, last_errno=%d, retry in %d sec",
mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql),
mi->connect_retry);
......@@ -1309,18 +1296,26 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
if (master_retry_count && err_count++ == master_retry_count)
{
slave_was_killed=1;
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
if (reconnect)
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
break;
}
}
if (!slave_was_killed)
{
sql_print_error("Slave: reconnected to master '%s@%s:%d',\
if (reconnect)
sql_print_error("Slave: connected to master '%s@%s:%d',\
replication resumed in log '%s' at position %s", glob_mi.user,
glob_mi.host, glob_mi.port,
RPL_LOG_NAME,
llstr(glob_mi.pos,llbuff));
else
{
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
mi->user, mi->host, mi->port);
}
#ifdef SIGNAL_WITH_VIO_CLOSE
thd->set_active_vio(mysql->net.vio);
#endif
......@@ -1329,6 +1324,17 @@ replication resumed in log '%s' at position %s", glob_mi.user,
return slave_was_killed;
}
/*
Try to connect until successful or slave killed or we have retried
master_retry_count times
*/
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
return connect_to_master(thd, mysql, mi, 1);
}
#ifdef __GNUC__
template class I_List_iterator<i_string>;
template class I_List_iterator<i_string_pair>;
......
......@@ -278,8 +278,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
ha_autocommit_or_rollback(thd,error);
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
{
Delete_file_log_event d(thd);
mysql_bin_log.write(&d);
if (lf_info.wrote_create_file)
{
Delete_file_log_event d(thd);
mysql_bin_log.write(&d);
}
}
DBUG_RETURN(-1); // Error on read
}
......@@ -303,8 +306,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (!opt_old_rpl_compat)
{
read_info.end_io_cache(); // make sure last block gets logged
Execute_load_log_event e(thd);
mysql_bin_log.write(&e);
if (lf_info.wrote_create_file)
{
Execute_load_log_event e(thd);
mysql_bin_log.write(&e);
}
}
}
if (using_transactions)
......@@ -534,6 +540,14 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term,
}
else
{
/* init_io_cache() will not initialize read_function member
if the cache is READ_NET. The reason is explained in
mysys/mf_iocache.c. So we work around the problem with a
manual assignment
*/
if (get_it_from_net)
cache.read_function = _my_b_net_read;
need_end_io_cache = 1;
if (!opt_old_rpl_compat && mysql_bin_log.is_open())
cache.pre_read = cache.pre_close =
......
......@@ -140,6 +140,11 @@ int register_slave(THD* thd, uchar* packet, uint packet_length)
get_object(p,si->user);
get_object(p,si->password);
si->port = uint2korr(p);
p += 2;
si->rpl_recovery_rank = uint4korr(p);
p += 4;
if (!(si->master_id = uint4korr(p)))
si->master_id = server_id;
si->thd = thd;
pthread_mutex_lock(&LOCK_slave_list);
......@@ -534,6 +539,7 @@ impossible position";
DBUG_PRINT("wait",("waiting for data on binary log"));
if (!thd->killed)
pthread_cond_wait(&COND_binlog_update, log_lock);
DBUG_PRINT("wait",("binary log received update"));
break;
default:
......@@ -1253,6 +1259,8 @@ int show_slave_hosts(THD* thd)
field_list.push_back(new Item_empty_string("Password",20));
}
field_list.push_back(new Item_empty_string("Port",20));
field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20));
field_list.push_back(new Item_empty_string("Master_id", 20));
if (send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
......@@ -1271,6 +1279,8 @@ int show_slave_hosts(THD* thd)
net_store_data(packet, si->password);
}
net_store_data(packet, (uint32) si->port);
net_store_data(packet, si->rpl_recovery_rank);
net_store_data(packet, si->master_id);
if (my_net_write(net, (char*)packet->ptr(), packet->length()))
{
pthread_mutex_unlock(&LOCK_slave_list);
......@@ -1616,7 +1626,8 @@ int log_loaded_block(IO_CACHE* file)
{
LOAD_FILE_INFO* lf_info;
uint block_len ;
if (!(block_len = file->rc_end - file->buffer))
char* buffer = (char*)file->buffer;
if (!(block_len = file->rc_end - buffer))
return 0;
lf_info = (LOAD_FILE_INFO*)file->arg;
if (lf_info->last_pos_in_file != HA_POS_ERROR &&
......@@ -1625,14 +1636,14 @@ int log_loaded_block(IO_CACHE* file)
lf_info->last_pos_in_file = file->pos_in_file;
if (lf_info->wrote_create_file)
{
Append_block_log_event a(lf_info->thd, (char*) file->buffer, block_len);
Append_block_log_event a(lf_info->thd, buffer, block_len);
mysql_bin_log.write(&a);
}
else
{
Create_file_log_event c(lf_info->thd,lf_info->ex,lf_info->db,
lf_info->table_name, *lf_info->fields,
lf_info->handle_dup, (char*) file->buffer,
lf_info->handle_dup, buffer,
block_len);
mysql_bin_log.write(&c);
lf_info->wrote_create_file = 1;
......
......@@ -6,6 +6,7 @@
typedef struct st_slave_info
{
uint32 server_id;
uint32 rpl_recovery_rank, master_id;
char host[HOSTNAME_LENGTH+1];
char user[USERNAME_LENGTH+1];
char password[HASH_PASSWORD_LENGTH+1];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment