updates for LOAD DATA FROM MASTER + some cleanup of replication code

parent 8a3f87ab
......@@ -205,4 +205,6 @@
#define ER_SLAVE_THREAD 1202
#define ER_TOO_MANY_USER_CONNECTIONS 1203
#define ER_SET_CONSTANTS_ONLY 1204
#define ER_ERROR_MESSAGES 205
#define ER_CONNECT_TO_MASTER 1205
#define ER_QUERY_ON_MASTER 1206
#define ER_ERROR_MESSAGES 207
n m
4 15
Database
bar
foo
mysql
test
Database
mysql
test
Database
bar
foo
mysql
test
Tables_in_foo
Tables_in_bar
t1
t2
n s
1 one bar
2 two bar
3 three bar
n s
11 eleven bar
12 twelve bar
13 thirteen bar
n s
1 one bar
2 two bar
3 three bar
4 four bar
......@@ -31,3 +31,56 @@ connection slave;
sync_with_master;
drop database if exists bar;
drop database if exists foo;
#now let's test load data from master
#first create some databases and tables on the master
connection master;
set sql_log_bin = 0;
create database foo;
create database bar;
show databases;
create table foo.t1(n int, s char(20));
create table foo.t2(n int, s text);
insert into foo.t1 values (1, 'one'), (2, 'two'), (3, 'three');
insert into foo.t2 values (11, 'eleven'), (12, 'twelve'), (13, 'thirteen');
create table bar.t1(n int, s char(20));
create table bar.t2(n int, s text);
insert into bar.t1 values (1, 'one bar'), (2, 'two bar'), (3, 'three bar');
insert into bar.t2 values (11, 'eleven bar'), (12, 'twelve bar'),
(13, 'thirteen bar');
set sql_log_bin = 1;
save_master_pos;
connection slave;
sync_with_master;
#this should show that the slave is empty at this point
show databases;
load data from master;
#now let's check if we have the right tables and the right data in them
show databases;
use foo;
show tables;
use bar;
show tables;
select * from bar.t1;
select * from bar.t2;
#now let's see if replication works
connection master;
insert into bar.t1 values (4, 'four bar');
save_master_pos;
connection slave;
sync_with_master;
select * from bar.t1;
#now time for cleanup
connection master;
drop database bar;
drop database foo;
save_master_pos;
connection slave;
sync_with_master;
......@@ -69,9 +69,22 @@ extern "C" { // Because of SCO 3.2V4.2
}
static void mc_free_rows(MYSQL_DATA *cur);
static MYSQL_FIELD *unpack_fields(MYSQL_DATA *data,MEM_ROOT *alloc,uint fields,
my_bool default_value,
my_bool long_flag_protocol);
static void mc_end_server(MYSQL *mysql);
static int mc_sock_connect(File s, const struct sockaddr *name, uint namelen, uint to);
static void mc_free_old_query(MYSQL *mysql);
static int mc_send_file_to_server(MYSQL *mysql, const char *filename);
static my_ulonglong mc_net_field_length_ll(uchar **packet);
static ulong mc_net_field_length(uchar **packet);
static int mc_read_one_row(MYSQL *mysql,uint fields,MYSQL_ROW row,
ulong *lengths);
static MYSQL_DATA *mc_read_rows(MYSQL *mysql,MYSQL_FIELD *mysql_fields,
uint fields);
#define CLIENT_CAPABILITIES (CLIENT_LONG_PASSWORD | CLIENT_LONG_FLAG | CLIENT_LOCAL_FILES)
......@@ -824,3 +837,498 @@ mc_mysql_close(MYSQL *mysql)
}
DBUG_VOID_RETURN;
}
void STDCALL mc_mysql_free_result(MYSQL_RES *result)
{
DBUG_ENTER("mc_mysql_free_result");
DBUG_PRINT("enter",("mysql_res: %lx",result));
if (result)
{
if (result->handle && result->handle->status == MYSQL_STATUS_USE_RESULT)
{
DBUG_PRINT("warning",("Not all rows in set were read; Ignoring rows"));
for (;;)
{
uint pkt_len;
if ((pkt_len=(uint) mc_net_safe_read(result->handle)) == packet_error)
break;
if (pkt_len == 1 && result->handle->net.read_pos[0] == 254)
break; /* End of data */
}
result->handle->status=MYSQL_STATUS_READY;
}
mc_free_rows(result->data);
if (result->fields)
free_root(&result->field_alloc,MYF(0));
if (result->row)
my_free((gptr) result->row,MYF(0));
my_free((gptr) result,MYF(0));
}
DBUG_VOID_RETURN;
}
static void mc_free_rows(MYSQL_DATA *cur)
{
if (cur)
{
free_root(&cur->alloc,MYF(0));
my_free((gptr) cur,MYF(0));
}
}
static MYSQL_FIELD *
mc_unpack_fields(MYSQL_DATA *data,MEM_ROOT *alloc,uint fields,
my_bool default_value, my_bool long_flag_protocol)
{
MYSQL_ROWS *row;
MYSQL_FIELD *field,*result;
DBUG_ENTER("unpack_fields");
field=result=(MYSQL_FIELD*) alloc_root(alloc,sizeof(MYSQL_FIELD)*fields);
if (!result)
DBUG_RETURN(0);
for (row=data->data; row ; row = row->next,field++)
{
field->table= strdup_root(alloc,(char*) row->data[0]);
field->name= strdup_root(alloc,(char*) row->data[1]);
field->length= (uint) uint3korr(row->data[2]);
field->type= (enum enum_field_types) (uchar) row->data[3][0];
if (long_flag_protocol)
{
field->flags= uint2korr(row->data[4]);
field->decimals=(uint) (uchar) row->data[4][2];
}
else
{
field->flags= (uint) (uchar) row->data[4][0];
field->decimals=(uint) (uchar) row->data[4][1];
}
if (INTERNAL_NUM_FIELD(field))
field->flags|= NUM_FLAG;
if (default_value && row->data[5])
field->def=strdup_root(alloc,(char*) row->data[5]);
else
field->def=0;
field->max_length= 0;
}
mc_free_rows(data); /* Free old data */
DBUG_RETURN(result);
}
int STDCALL
mc_mysql_send_query(MYSQL* mysql, const char* query, uint length)
{
return mc_simple_command(mysql, COM_QUERY, query, length, 1);
}
int STDCALL mc_mysql_read_query_result(MYSQL *mysql)
{
uchar *pos;
ulong field_count;
MYSQL_DATA *fields;
uint length;
DBUG_ENTER("mc_mysql_read_query_result");
if ((length = mc_net_safe_read(mysql)) == packet_error)
DBUG_RETURN(-1);
mc_free_old_query(mysql); /* Free old result */
get_info:
pos=(uchar*) mysql->net.read_pos;
if ((field_count= mc_net_field_length(&pos)) == 0)
{
mysql->affected_rows= mc_net_field_length_ll(&pos);
mysql->insert_id= mc_net_field_length_ll(&pos);
if (mysql->server_capabilities & CLIENT_TRANSACTIONS)
{
mysql->server_status=uint2korr(pos); pos+=2;
}
if (pos < mysql->net.read_pos+length && mc_net_field_length(&pos))
mysql->info=(char*) pos;
DBUG_RETURN(0);
}
if (field_count == NULL_LENGTH) /* LOAD DATA LOCAL INFILE */
{
int error=mc_send_file_to_server(mysql,(char*) pos);
if ((length=mc_net_safe_read(mysql)) == packet_error || error)
DBUG_RETURN(-1);
goto get_info; /* Get info packet */
}
if (!(mysql->server_status & SERVER_STATUS_AUTOCOMMIT))
mysql->server_status|= SERVER_STATUS_IN_TRANS;
mysql->extra_info= mc_net_field_length_ll(&pos); /* Maybe number of rec */
if (!(fields=mc_read_rows(mysql,(MYSQL_FIELD*) 0,5)))
DBUG_RETURN(-1);
if (!(mysql->fields=mc_unpack_fields(fields,&mysql->field_alloc,
(uint) field_count,0,
(my_bool) test(mysql->server_capabilities &
CLIENT_LONG_FLAG))))
DBUG_RETURN(-1);
mysql->status=MYSQL_STATUS_GET_RESULT;
mysql->field_count=field_count;
DBUG_RETURN(0);
}
int STDCALL mc_mysql_query(MYSQL *mysql, const char *query, uint length)
{
DBUG_ENTER("mysql_real_query");
DBUG_PRINT("enter",("handle: %lx",mysql));
DBUG_PRINT("query",("Query = \"%s\"",query));
if(!length)
length = strlen(query);
if (mc_simple_command(mysql,COM_QUERY,query,length,1))
DBUG_RETURN(-1);
DBUG_RETURN(mc_mysql_read_query_result(mysql));
}
static int mc_send_file_to_server(MYSQL *mysql, const char *filename)
{
int fd, readcount;
char buf[IO_SIZE*15],*tmp_name;
DBUG_ENTER("send_file_to_server");
fn_format(buf,filename,"","",4); /* Convert to client format */
if (!(tmp_name=my_strdup(buf,MYF(0))))
{
strmov(mysql->net.last_error, ER(mysql->net.last_errno=CR_OUT_OF_MEMORY));
DBUG_RETURN(-1);
}
if ((fd = my_open(tmp_name,O_RDONLY, MYF(0))) < 0)
{
mysql->net.last_errno=EE_FILENOTFOUND;
sprintf(buf,EE(mysql->net.last_errno),tmp_name,errno);
strmake(mysql->net.last_error,buf,sizeof(mysql->net.last_error)-1);
my_net_write(&mysql->net,"",0); net_flush(&mysql->net);
my_free(tmp_name,MYF(0));
DBUG_RETURN(-1);
}
while ((readcount = (int) my_read(fd,buf,sizeof(buf),MYF(0))) > 0)
{
if (my_net_write(&mysql->net,buf,readcount))
{
mysql->net.last_errno=CR_SERVER_LOST;
strmov(mysql->net.last_error,ER(mysql->net.last_errno));
DBUG_PRINT("error",("Lost connection to MySQL server during LOAD DATA of local file"));
(void) my_close(fd,MYF(0));
my_free(tmp_name,MYF(0));
DBUG_RETURN(-1);
}
}
(void) my_close(fd,MYF(0));
/* Send empty packet to mark end of file */
if (my_net_write(&mysql->net,"",0) || net_flush(&mysql->net))
{
mysql->net.last_errno=CR_SERVER_LOST;
sprintf(mysql->net.last_error,ER(mysql->net.last_errno),errno);
my_free(tmp_name,MYF(0));
DBUG_RETURN(-1);
}
if (readcount < 0)
{
mysql->net.last_errno=EE_READ; /* the errmsg for not entire file read */
sprintf(buf,EE(mysql->net.last_errno),tmp_name,errno);
strmake(mysql->net.last_error,buf,sizeof(mysql->net.last_error)-1);
my_free(tmp_name,MYF(0));
DBUG_RETURN(-1);
}
DBUG_RETURN(0);
}
/* Get the length of next field. Change parameter to point at fieldstart */
static ulong mc_net_field_length(uchar **packet)
{
reg1 uchar *pos= *packet;
if (*pos < 251)
{
(*packet)++;
return (ulong) *pos;
}
if (*pos == 251)
{
(*packet)++;
return NULL_LENGTH;
}
if (*pos == 252)
{
(*packet)+=3;
return (ulong) uint2korr(pos+1);
}
if (*pos == 253)
{
(*packet)+=4;
return (ulong) uint3korr(pos+1);
}
(*packet)+=9; /* Must be 254 when here */
return (ulong) uint4korr(pos+1);
}
/* Same as above, but returns ulonglong values */
static my_ulonglong mc_net_field_length_ll(uchar **packet)
{
reg1 uchar *pos= *packet;
if (*pos < 251)
{
(*packet)++;
return (my_ulonglong) *pos;
}
if (*pos == 251)
{
(*packet)++;
return (my_ulonglong) NULL_LENGTH;
}
if (*pos == 252)
{
(*packet)+=3;
return (my_ulonglong) uint2korr(pos+1);
}
if (*pos == 253)
{
(*packet)+=4;
return (my_ulonglong) uint3korr(pos+1);
}
(*packet)+=9; /* Must be 254 when here */
#ifdef NO_CLIENT_LONGLONG
return (my_ulonglong) uint4korr(pos+1);
#else
return (my_ulonglong) uint8korr(pos+1);
#endif
}
/* Read all rows (fields or data) from server */
static MYSQL_DATA *mc_read_rows(MYSQL *mysql,MYSQL_FIELD *mysql_fields,
uint fields)
{
uint field,pkt_len;
ulong len;
uchar *cp;
char *to;
MYSQL_DATA *result;
MYSQL_ROWS **prev_ptr,*cur;
NET *net = &mysql->net;
DBUG_ENTER("mc_read_rows");
if ((pkt_len=(uint) mc_net_safe_read(mysql)) == packet_error)
DBUG_RETURN(0);
if (!(result=(MYSQL_DATA*) my_malloc(sizeof(MYSQL_DATA),
MYF(MY_WME | MY_ZEROFILL))))
{
net->last_errno=CR_OUT_OF_MEMORY;
strmov(net->last_error,ER(net->last_errno));
DBUG_RETURN(0);
}
init_alloc_root(&result->alloc,8192,0); /* Assume rowlength < 8192 */
result->alloc.min_malloc=sizeof(MYSQL_ROWS);
prev_ptr= &result->data;
result->rows=0;
result->fields=fields;
while (*(cp=net->read_pos) != 254 || pkt_len != 1)
{
result->rows++;
if (!(cur= (MYSQL_ROWS*) alloc_root(&result->alloc,
sizeof(MYSQL_ROWS))) ||
!(cur->data= ((MYSQL_ROW)
alloc_root(&result->alloc,
(fields+1)*sizeof(char *)+pkt_len))))
{
mc_free_rows(result);
net->last_errno=CR_OUT_OF_MEMORY;
strmov(net->last_error,ER(net->last_errno));
DBUG_RETURN(0);
}
*prev_ptr=cur;
prev_ptr= &cur->next;
to= (char*) (cur->data+fields+1);
for (field=0 ; field < fields ; field++)
{
if ((len=(ulong) mc_net_field_length(&cp)) == NULL_LENGTH)
{ /* null field */
cur->data[field] = 0;
}
else
{
cur->data[field] = to;
memcpy(to,(char*) cp,len); to[len]=0;
to+=len+1;
cp+=len;
if (mysql_fields)
{
if (mysql_fields[field].max_length < len)
mysql_fields[field].max_length=len;
}
}
}
cur->data[field]=to; /* End of last field */
if ((pkt_len=mc_net_safe_read(mysql)) == packet_error)
{
mc_free_rows(result);
DBUG_RETURN(0);
}
}
*prev_ptr=0; /* last pointer is null */
DBUG_PRINT("exit",("Got %d rows",result->rows));
DBUG_RETURN(result);
}
/*
** Read one row. Uses packet buffer as storage for fields.
** When next packet is read, the previous field values are destroyed
*/
static int mc_read_one_row(MYSQL *mysql,uint fields,MYSQL_ROW row,
ulong *lengths)
{
uint field;
ulong pkt_len,len;
uchar *pos,*prev_pos;
if ((pkt_len=(uint) mc_net_safe_read(mysql)) == packet_error)
return -1;
if (pkt_len == 1 && mysql->net.read_pos[0] == 254)
return 1; /* End of data */
prev_pos= 0; /* allowed to write at packet[-1] */
pos=mysql->net.read_pos;
for (field=0 ; field < fields ; field++)
{
if ((len=(ulong) mc_net_field_length(&pos)) == NULL_LENGTH)
{ /* null field */
row[field] = 0;
*lengths++=0;
}
else
{
row[field] = (char*) pos;
pos+=len;
*lengths++=len;
}
if (prev_pos)
*prev_pos=0; /* Terminate prev field */
prev_pos=pos;
}
row[field]=(char*) prev_pos+1; /* End of last field */
*prev_pos=0; /* Terminate last field */
return 0;
}
my_ulonglong STDCALL mc_mysql_num_rows(MYSQL_RES *res)
{
return res->row_count;
}
unsigned int STDCALL mc_mysql_num_fields(MYSQL_RES *res)
{
return res->field_count;
}
void STDCALL mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row)
{
MYSQL_ROWS *tmp=0;
DBUG_PRINT("info",("mysql_data_seek(%ld)",(long) row));
if (result->data)
for (tmp=result->data->data; row-- && tmp ; tmp = tmp->next) ;
result->current_row=0;
result->data_cursor = tmp;
}
MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res)
{
DBUG_ENTER("mc_mysql_fetch_row");
if (!res->data)
{ /* Unbufferred fetch */
if (!res->eof)
{
if (!(mc_read_one_row(res->handle,res->field_count,res->row,
res->lengths)))
{
res->row_count++;
DBUG_RETURN(res->current_row=res->row);
}
else
{
DBUG_PRINT("info",("end of data"));
res->eof=1;
res->handle->status=MYSQL_STATUS_READY;
}
}
DBUG_RETURN((MYSQL_ROW) NULL);
}
{
MYSQL_ROW tmp;
if (!res->data_cursor)
{
DBUG_PRINT("info",("end of data"));
DBUG_RETURN(res->current_row=(MYSQL_ROW) NULL);
}
tmp = res->data_cursor->data;
res->data_cursor = res->data_cursor->next;
DBUG_RETURN(res->current_row=tmp);
}
}
int STDCALL mc_mysql_select_db(MYSQL *mysql, const char *db)
{
int error;
DBUG_ENTER("mysql_select_db");
DBUG_PRINT("enter",("db: '%s'",db));
if ((error=mc_simple_command(mysql,COM_INIT_DB,db,(uint) strlen(db),0)))
DBUG_RETURN(error);
my_free(mysql->db,MYF(MY_ALLOW_ZERO_PTR));
mysql->db=my_strdup(db,MYF(MY_WME));
DBUG_RETURN(0);
}
MYSQL_RES * STDCALL mc_mysql_store_result(MYSQL *mysql)
{
MYSQL_RES *result;
DBUG_ENTER("mysql_store_result");
if (!mysql->fields)
DBUG_RETURN(0);
if (mysql->status != MYSQL_STATUS_GET_RESULT)
{
strmov(mysql->net.last_error,
ER(mysql->net.last_errno=CR_COMMANDS_OUT_OF_SYNC));
DBUG_RETURN(0);
}
mysql->status=MYSQL_STATUS_READY; /* server is ready */
if (!(result=(MYSQL_RES*) my_malloc(sizeof(MYSQL_RES)+
sizeof(ulong)*mysql->field_count,
MYF(MY_WME | MY_ZEROFILL))))
{
mysql->net.last_errno=CR_OUT_OF_MEMORY;
strmov(mysql->net.last_error, ER(mysql->net.last_errno));
DBUG_RETURN(0);
}
result->eof=1; /* Marker for buffered */
result->lengths=(ulong*) (result+1);
if (!(result->data=mc_read_rows(mysql,mysql->fields,mysql->field_count)))
{
my_free((gptr) result,MYF(0));
DBUG_RETURN(0);
}
mysql->affected_rows= result->row_count= result->data->rows;
result->data_cursor= result->data->data;
result->fields= mysql->fields;
result->field_alloc= mysql->field_alloc;
result->field_count= mysql->field_count;
result->current_field=0;
result->current_row=0; /* Must do a fetch first */
mysql->fields=0; /* fields is now in result */
DBUG_RETURN(result); /* Data fetched */
}
......@@ -42,6 +42,17 @@ char * STDCALL mc_mysql_error(MYSQL *mysql);
int STDCALL mc_mysql_errno(MYSQL *mysql);
my_bool STDCALL mc_mysql_reconnect(MYSQL* mysql);
int STDCALL mc_mysql_send_query(MYSQL* mysql, const char* query, uint length);
int STDCALL mc_mysql_read_query_result(MYSQL *mysql);
int STDCALL mc_mysql_query(MYSQL *mysql, const char *query, uint length);
MYSQL_RES * STDCALL mc_mysql_store_result(MYSQL *mysql);
void STDCALL mc_mysql_free_result(MYSQL_RES *result);
void STDCALL mc_mysql_data_seek(MYSQL_RES *result, my_ulonglong row);
my_ulonglong STDCALL mc_mysql_num_rows(MYSQL_RES *res);
unsigned int STDCALL mc_mysql_num_fields(MYSQL_RES *res);
MYSQL_ROW STDCALL mc_mysql_fetch_row(MYSQL_RES *res);
int STDCALL mc_mysql_select_db(MYSQL *mysql, const char *db);
#endif
......@@ -223,7 +223,7 @@ inline THD *_current_thd(void)
#include "opt_range.h"
void mysql_create_db(THD *thd, char *db, uint create_info);
int mysql_create_db(THD *thd, char *db, uint create_info);
void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags);
int mysql_rm_table(THD *thd,TABLE_LIST *tables, my_bool if_exists);
int quick_rm_table(enum db_type base,const char *db,
......@@ -245,7 +245,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
char* packet, uint packet_length);
bool check_stack_overrun(THD *thd,char *dummy);
bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables);
void mysql_rm_db(THD *thd,char *db,bool if_exists);
int mysql_rm_db(THD *thd,char *db,bool if_exists);
void table_cache_init(void);
void table_cache_free(void);
uint cached_tables(void);
......
......@@ -153,7 +153,7 @@
"You have an error in your SQL syntax",
"Delayed insert thread couldn't get requested lock for table %-.64s",
"Too many delayed threads in use",
"Aborted connection %ld to db: '%-.64s' user: '%-.32s' (%-.64s)",
"Aborted connection %ld to db: '%-.64s' user: '%-.32s' (%-.64s) - see http://www.mysql.com/doc/C/o/Communication_errors.html",
"Got a packet bigger than 'max_allowed_packet'",
"Got a read error from the connection pipe",
"Got an error from fcntl()",
......@@ -185,7 +185,7 @@
"Got error %d during ROLLBACK",
"Got error %d during FLUSH_LOGS",
"Got error %d during CHECKPOINT",
"Aborted connection %ld to db: '%-.64s' user: '%-.32s' host: `%-.64s' (%-.64s)",
"Aborted connection %ld to db: '%-.64s' user: '%-.32s' host: `%-.64s' (%-.64s) - see http://www.mysql.com/doc/C/o/Communication_errors.html",
"The handler for the table does not support binary table dump",
"Binlog closed, cannot RESET MASTER",
"Failed rebuilding the index of dumped table '%-.64s'",
......@@ -206,3 +206,8 @@
"Could not create slave thread, check system resources",
"User %-.64s has already more than 'max_user_connections' active connections",
"You may only use constant expressions with SET",
"Error connecting to master: %-.128s",
"Error running query on master: %-.128s",
......@@ -20,6 +20,7 @@
#include <myisam.h>
#include "mini_client.h"
#include "slave.h"
#include "sql_repl.h"
#include <thr_alarm.h>
#include <my_dir.h>
......@@ -55,7 +56,7 @@ static int init_slave_thread(THD* thd);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, char* db, char* table);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name);
inline char* rewrite_db(char* db);
......@@ -344,7 +345,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
thd->proc_info = "Creating table from master dump";
// save old db in case we are creating in a different database
char* save_db = thd->db;
thd->db = thd->last_nx_db;
thd->db = (char*)db;
mysql_parse(thd, thd->query, packet_len); // run create table
thd->db = save_db; // leave things the way the were before
......@@ -400,31 +401,39 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
return error;
}
int fetch_nx_table(THD* thd, MASTER_INFO* mi)
int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
MASTER_INFO* mi, MYSQL* mysql)
{
MYSQL* mysql = mc_mysql_init(NULL);
int error = 1;
int nx_errno = 0;
if(!mysql)
bool called_connected = (mysql != NULL);
if(!called_connected && !(mysql = mc_mysql_init(NULL)))
{
sql_print_error("fetch_nx_table: Error in mysql_init()");
nx_errno = ER_GET_ERRNO;
goto err;
}
safe_connect(thd, mysql, mi);
if(slave_killed(thd))
goto err;
if(request_table_dump(mysql, thd->last_nx_db, thd->last_nx_table))
if(!called_connected)
{
if(connect_to_master(thd, mysql, mi))
{
sql_print_error("Could not connect to master while fetching table\
'%-64s.%-64s'", db_name, table_name);
nx_errno = ER_CONNECT_TO_MASTER;
goto err;
}
}
if(request_table_dump(mysql, db_name, table_name))
{
nx_errno = ER_GET_ERRNO;
sql_print_error("fetch_nx_table: failed on table dump request ");
goto err;
}
if(create_table_from_dump(thd, &mysql->net, thd->last_nx_db,
thd->last_nx_table))
if(create_table_from_dump(thd, &mysql->net, db_name,
table_name))
{
// create_table_from_dump will have sent the error alread
sql_print_error("fetch_nx_table: failed on create table ");
......@@ -434,7 +443,7 @@ int fetch_nx_table(THD* thd, MASTER_INFO* mi)
error = 0;
err:
if (mysql)
if (mysql && !called_connected)
mc_mysql_close(mysql);
if (nx_errno && thd->net.vio)
send_error(&thd->net, nx_errno, "Error in fetch_nx_table");
......@@ -764,7 +773,7 @@ static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
return 0;
}
static int request_table_dump(MYSQL* mysql, char* db, char* table)
static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
{
char buf[1024];
char * p = buf;
......@@ -901,7 +910,6 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
VOID(pthread_mutex_lock(&LOCK_thread_count));
thd->query_id = query_id++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
thd->last_nx_table = thd->last_nx_db = 0;
thd->query_error = 0; // clear error
thd->net.last_errno = 0;
thd->net.last_error[0] = 0;
......
#ifndef SLAVE_H
#define SLAVE_H
#include "mysql.h"
typedef struct st_master_info
{
char log_file_name[FN_REFLEN];
......@@ -65,11 +67,14 @@ typedef struct st_table_rule_ent
int flush_master_info(MASTER_INFO* mi);
int mysql_table_dump(THD* thd, char* db, char* tbl_name, int fd = -1);
int mysql_table_dump(THD* thd, const char* db,
const char* tbl_name, int fd = -1);
// if fd is -1, dump to NET
int fetch_nx_table(THD* thd, MASTER_INFO* mi);
int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
MASTER_INFO* mi, MYSQL* mysql);
// retrieve non-exitent table from master
// the caller must set thd->last_nx_table and thd->last_nx_db first
int show_master_info(THD* thd);
int show_binlog_info(THD* thd);
......
......@@ -837,25 +837,6 @@ TABLE *open_table(THD *thd,const char *db,const char *table_name,
!(table->table_cache_key=memdup_root(&table->mem_root,(char*) key,
key_length)))
{
MEM_ROOT* glob_alloc;
LINT_INIT(glob_alloc);
if (errno == ENOENT &&
(glob_alloc = my_pthread_getspecific_ptr(MEM_ROOT*,THR_MALLOC)))
// Sasha: needed for replication
// remember the name of the non-existent table
// so we can try to download it from the master
{
int table_name_len = (uint) strlen(table_name);
int db_len = (uint) strlen(db);
thd->last_nx_db = alloc_root(glob_alloc,db_len + table_name_len + 2);
if(thd->last_nx_db)
{
thd->last_nx_table = thd->last_nx_db + db_len + 1;
memcpy(thd->last_nx_table, table_name, table_name_len + 1);
memcpy(thd->last_nx_db, db, db_len + 1);
}
}
table->next=table->prev=table;
free_cache_entry(table);
VOID(pthread_mutex_unlock(&LOCK_open));
......
......@@ -96,7 +96,6 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
current_linfo = 0;
slave_thread = 0;
slave_proxy_id = 0;
last_nx_table = last_nx_db = 0;
cond_count=0;
convert_set=0;
mysys_var=0;
......
......@@ -242,8 +242,6 @@ class THD :public ilink {
enum enum_server_command command;
uint32 server_id;
const char *where;
char* last_nx_table; // last non-existent table, we need this for replication
char* last_nx_db; // database of the last nx table
time_t start_time,time_after_lock,user_time;
time_t connect_time,thr_create_time; // track down slow pthread_create
thr_lock_type update_lock_default;
......
......@@ -30,11 +30,12 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *path,
/* db-name is already validated when we come here */
void mysql_create_db(THD *thd, char *db, uint create_options)
int mysql_create_db(THD *thd, char *db, uint create_options)
{
char path[FN_REFLEN+16];
MY_DIR *dirp;
long result=1;
int error = 0;
DBUG_ENTER("mysql_create_db");
VOID(pthread_mutex_lock(&LOCK_mysql_create_db));
......@@ -47,7 +48,9 @@ void mysql_create_db(THD *thd, char *db, uint create_options)
my_dirend(dirp);
if (!(create_options & HA_LEX_CREATE_IF_NOT_EXISTS))
{
net_printf(&thd->net,ER_DB_CREATE_EXISTS,db);
if(thd)
net_printf(&thd->net,ER_DB_CREATE_EXISTS,db);
error = 1;
goto exit;
}
result = 0;
......@@ -57,34 +60,39 @@ void mysql_create_db(THD *thd, char *db, uint create_options)
strend(path)[-1]=0; // Remove last '/' from path
if (my_mkdir(path,0777,MYF(0)) < 0)
{
net_printf(&thd->net,ER_CANT_CREATE_DB,db,my_errno);
if(thd)
net_printf(&thd->net,ER_CANT_CREATE_DB,db,my_errno);
error = 1;
goto exit;
}
}
if (!thd->query)
{
thd->query = path;
thd->query_length = (uint) (strxmov(path,"create database ", db, NullS)-
path);
}
if(thd)
{
mysql_update_log.write(thd,thd->query, thd->query_length);
if (mysql_bin_log.is_open())
if (!thd->query)
{
Query_log_event qinfo(thd, thd->query);
mysql_bin_log.write(&qinfo);
thd->query = path;
thd->query_length = (uint) (strxmov(path,"create database ", db, NullS)-
path);
}
{
mysql_update_log.write(thd,thd->query, thd->query_length);
if (mysql_bin_log.is_open())
{
Query_log_event qinfo(thd, thd->query);
mysql_bin_log.write(&qinfo);
}
}
if (thd->query == path)
{
thd->query = 0; // just in case
thd->query_length = 0;
}
send_ok(&thd->net, result);
}
if (thd->query == path)
{
thd->query = 0; // just in case
thd->query_length = 0;
}
send_ok(&thd->net, result);
exit:
VOID(pthread_mutex_unlock(&LOCK_mysql_create_db));
DBUG_VOID_RETURN;
DBUG_RETURN(error);
}
const char *del_exts[]=
......@@ -94,10 +102,14 @@ static TYPELIB deletable_extentions=
/* db-name is already validated when we come here */
void mysql_rm_db(THD *thd,char *db,bool if_exists)
/* If thd == 0, do not write any messages
This is useful in replication when we want to remove
a stale database before replacing it with the new one
*/
int mysql_rm_db(THD *thd,char *db,bool if_exists)
{
long deleted=0;
int error = 0;
char path[FN_REFLEN+16];
MY_DIR *dirp;
DBUG_ENTER("mysql_rm_db");
......@@ -110,15 +122,19 @@ void mysql_rm_db(THD *thd,char *db,bool if_exists)
/* See if the directory exists */
if (!(dirp = my_dir(path,MYF(MY_WME | MY_DONT_SORT))))
{
if (!if_exists)
net_printf(&thd->net,ER_DB_DROP_EXISTS,db);
else
send_ok(&thd->net,0);
if(thd)
{
if (!if_exists)
net_printf(&thd->net,ER_DB_DROP_EXISTS,db);
else
send_ok(&thd->net,0);
}
error = !if_exists;
goto exit;
}
remove_db_from_cache(db);
if ((deleted=mysql_rm_known_files(thd, dirp, path,0)) >= 0)
if ((deleted=mysql_rm_known_files(thd, dirp, path,0)) >= 0 && thd)
{
if (!thd->query)
{
......@@ -137,13 +153,14 @@ void mysql_rm_db(THD *thd,char *db,bool if_exists)
thd->query = 0; // just in case
thd->query_length = 0;
}
send_ok(&thd->net,(ulong) deleted);
}
exit:
VOID(pthread_mutex_unlock(&LOCK_open));
VOID(pthread_mutex_unlock(&LOCK_mysql_create_db));
DBUG_VOID_RETURN;
DBUG_RETURN(error);
}
/*
......@@ -151,6 +168,7 @@ void mysql_rm_db(THD *thd,char *db,bool if_exists)
are 2 digits (raid directories).
*/
/* This one also needs to work with thd == 0 for replication */
static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path,
uint level)
{
......@@ -162,7 +180,7 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path,
/* remove all files with known extensions */
for (uint idx=2 ;
idx < (uint) dirp->number_off_files && !thd->killed ;
idx < (uint) dirp->number_off_files && (!thd || !thd->killed) ;
idx++)
{
FILEINFO *file=dirp->dir_entry+idx;
......@@ -196,7 +214,8 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path,
unpack_filename(filePath,filePath);
if (my_delete(filePath,MYF(MY_WME)))
{
net_printf(&thd->net,ER_DB_DROP_DELETE,filePath,my_error);
if(thd)
net_printf(&thd->net,ER_DB_DROP_DELETE,filePath,my_error);
my_dirend(dirp);
DBUG_RETURN(-1);
}
......@@ -205,7 +224,7 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path,
my_dirend(dirp);
if (thd->killed)
if (thd && thd->killed)
{
send_error(&thd->net,ER_SERVER_SHUTDOWN);
DBUG_RETURN(-1);
......@@ -229,7 +248,8 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path,
/* Don't give errors if we can't delete 'RAID' directory */
if (level)
DBUG_RETURN(deleted);
send_error(&thd->net);
if(thd)
send_error(&thd->net);
DBUG_RETURN(-1);
}
path=filePath;
......@@ -242,7 +262,8 @@ static long mysql_rm_known_files(THD *thd, MY_DIR *dirp, const char *org_path,
/* Don't give errors if we can't delete 'RAID' directory */
if (rmdir(path) < 0 && !level)
{
net_printf(&thd->net,ER_DB_DROP_RMDIR, path,errno);
if(thd)
net_printf(&thd->net,ER_DB_DROP_RMDIR, path,errno);
DBUG_RETURN(-1);
}
}
......
......@@ -53,7 +53,7 @@ enum enum_sql_command {
SQLCOM_BEGIN, SQLCOM_LOAD_MASTER_TABLE, SQLCOM_CHANGE_MASTER,
SQLCOM_RENAME_TABLE, SQLCOM_BACKUP_TABLE, SQLCOM_RESTORE_TABLE,
SQLCOM_RESET, SQLCOM_PURGE, SQLCOM_SHOW_BINLOGS,
SQLCOM_SHOW_OPEN_TABLES,
SQLCOM_SHOW_OPEN_TABLES, SQLCOM_LOAD_MASTER_DATA,
SQLCOM_HA_OPEN, SQLCOM_HA_CLOSE, SQLCOM_HA_READ
};
......
......@@ -1203,6 +1203,13 @@ mysql_execute_command(void)
res = show_binlog_info(thd);
break;
}
case SQLCOM_LOAD_MASTER_DATA: // sync with master
if(check_process_priv(thd))
goto error;
res = load_master_data(thd);
break;
case SQLCOM_LOAD_MASTER_TABLE:
if (!tables->db)
......@@ -1226,9 +1233,7 @@ mysql_execute_command(void)
break;
}
thd->last_nx_table = tables->real_name;
thd->last_nx_db = tables->db;
if(fetch_nx_table(thd, &glob_mi))
if(fetch_nx_table(thd, tables->db, tables->real_name, &glob_mi, 0))
// fetch_nx_table is responsible for sending
// the error
{
......
......@@ -21,6 +21,7 @@
#include "sql_repl.h"
#include "sql_acl.h"
#include "log_event.h"
#include "mini_client.h"
#include <thr_alarm.h>
#include <my_dir.h>
......@@ -845,5 +846,220 @@ int show_binlogs(THD* thd)
return 1;
}
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi)
{
if(!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0))
{
sql_print_error("Connection to master failed: %s",
mc_mysql_error(mysql));
return 1;
}
return 0;
}
static inline void cleanup_mysql_results(MYSQL_RES* db_res,
MYSQL_RES** cur, MYSQL_RES** start)
{
for( ; cur >= start; --cur)
if(*cur)
mc_mysql_free_result(*cur);
mc_mysql_free_result(db_res);
}
static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db,
MYSQL_RES* table_res)
{
MYSQL_ROW row;
for( row = mc_mysql_fetch_row(table_res); row;
row = mc_mysql_fetch_row(table_res))
{
TABLE_LIST table;
const char* table_name = row[0];
int error;
if(table_rules_on)
{
table.next = 0;
table.db = (char*)db;
table.real_name = (char*)table_name;
if(!tables_ok(thd, &table))
continue;
}
if((error = fetch_nx_table(thd, db, table_name, &glob_mi, mysql)))
return error;
}
return 0;
}
int load_master_data(THD* thd)
{
MYSQL mysql;
MYSQL_RES* master_status_res = 0;
bool slave_was_running = 0;
int error = 0;
mc_mysql_init(&mysql);
pthread_mutex_lock(&LOCK_slave);
// we do not want anyone messing with the slave at all for the entire
// duration of the data load;
// first, kill the slave
if((slave_was_running = slave_running))
{
abort_slave = 1;
thr_alarm_kill(slave_real_id);
thd->proc_info = "waiting for slave to die";
while(slave_running)
pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
}
if(connect_to_master(thd, &mysql, &glob_mi))
{
net_printf(&thd->net, error = ER_CONNECT_TO_MASTER,
mc_mysql_error(&mysql));
goto err;
}
// now that we are connected, get all database and tables in each
{
MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
uint num_dbs;
MYSQL_ROW row;
if(mc_mysql_query(&mysql, "show databases", 0) ||
!(db_res = mc_mysql_store_result(&mysql)))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
goto err;
}
if(!(num_dbs = mc_mysql_num_rows(db_res)))
goto err;
// in theory, the master could have no databases at all
// and run with skip-grant
if(!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
{
net_printf(&thd->net, error = ER_OUTOFMEMORY);
goto err;
}
// this is a temporary solution until we have online backup
// capabilities - to be replaced once online backup is working
// we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
// can to minimize the lock time
if(mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0)
|| mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) ||
!(master_status_res = mc_mysql_store_result(&mysql)))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
goto err;
}
// go through every table in every database, and if the replication
// rules allow replicating it, get it
table_res_end = table_res + num_dbs;
for(cur_table_res = table_res; cur_table_res < table_res_end;
++cur_table_res)
{
MYSQL_ROW row = mc_mysql_fetch_row(db_res);
// since we know how many rows we have, this can never be NULL
char* db = row[0];
int drop_error = 0;
// do not replicate databases excluded by rules
// also skip mysql database - in most cases the user will
// mess up and not exclude mysql database with the rules when
// he actually means to - in this case, he is up for a surprise if
// his priv tables get dropped and downloaded from master
// TO DO - add special option, not enabled
// by default, to allow inclusion of mysql database into load
// data from master
if(!db_ok(db, replicate_do_db, replicate_ignore_db) ||
!strcmp(db,"mysql"))
{
*cur_table_res = 0;
continue;
}
if((drop_error = mysql_rm_db(0, db, 1)) ||
mysql_create_db(0, db, 0))
{
error = (drop_error) ? ER_DB_DROP_DELETE : ER_CANT_CREATE_DB;
net_printf(&thd->net, error, db, my_error);
cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
goto err;
}
if(mc_mysql_select_db(&mysql, db) ||
mc_mysql_query(&mysql, "show tables", 0) ||
!(*cur_table_res = mc_mysql_store_result(&mysql)))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
goto err;
}
if((error = fetch_db_tables(thd, &mysql, db, *cur_table_res)))
{
// we do not report the error - fetch_db_tables handles it
cleanup_mysql_results(db_res, cur_table_res, table_res);
goto err;
}
}
cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
// adjust position in the master
if(master_status_res)
{
MYSQL_ROW row = mc_mysql_fetch_row(master_status_res);
// we need this check because the master may not be running with
// log-bin, but it will still allow us to do all the steps
// of LOAD DATA FROM MASTER - no reason to forbid it, really,
// although it does not make much sense for the user to do it
if(row[0] && row[1])
{
strmake(glob_mi.log_file_name, row[0], sizeof(glob_mi.log_file_name));
glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB
if(glob_mi.pos < 4)
glob_mi.pos = 4; // don't hit the magic number
glob_mi.pending = 0;
flush_master_info(&glob_mi);
}
mc_mysql_free_result(master_status_res);
}
if(mc_mysql_query(&mysql, "UNLOCK TABLES", 0))
{
net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
mc_mysql_error(&mysql));
goto err;
}
}
err:
pthread_mutex_unlock(&LOCK_slave);
if(slave_was_running)
start_slave(0, 0);
mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init()
if(!error)
send_ok(&thd->net);
return error;
}
......@@ -14,6 +14,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
int start_slave(THD* thd = 0, bool net_report = 1);
int stop_slave(THD* thd = 0, bool net_report = 1);
int load_master_data(THD* thd);
int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi);
int change_master(THD* thd);
void reset_slave();
void reset_master();
......
......@@ -2401,6 +2401,11 @@ load: LOAD DATA_SYM opt_low_priority opt_local INFILE TEXT_STRING
YYABORT;
}
|
LOAD DATA_SYM FROM MASTER_SYM
{
Lex->sql_command = SQLCOM_LOAD_MASTER_DATA;
}
opt_local:
/* empty */ { $$=0;}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment