Commit 2ff9aee8 authored by unknown's avatar unknown

changed processing of LOAD DATA in mysqlbinlog


client/Makefile.am:
  added ../mysys/mf_tempdir.c to mysqlbinlog_SOURCES
client/mysqlbinlog.cc:
  changed processing of LOAD DATA
mysql-test/r/rpl_loaddata.result:
  added LINE STARTING BY '>'
mysql-test/std_data/rpl_loaddata2.dat:
  added LINE STARTING BY '>'
mysql-test/t/rpl_loaddata.test:
  added LINE STARTING BY '>'
sql/log_event.cc:
  fixed some bugs in processing of LOAD DATA
parent 429c488f
...@@ -35,7 +35,7 @@ mysqldump_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) ...@@ -35,7 +35,7 @@ mysqldump_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES)
mysqlimport_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) mysqlimport_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES)
mysqltest_SOURCES= mysqltest.c mysqltest_SOURCES= mysqltest.c
mysqltest_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) mysqltest_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES)
mysqlbinlog_SOURCES = mysqlbinlog.cc mysqlbinlog_SOURCES = mysqlbinlog.cc ../mysys/mf_tempdir.c
mysqlbinlog_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) mysqlbinlog_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES)
mysqlmanagerc_SOURCES = mysqlmanagerc.c mysqlmanagerc_SOURCES = mysqlmanagerc.c
mysqlmanagerc_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) mysqlmanagerc_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES)
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <time.h> #include <time.h>
#include <assert.h> #include <assert.h>
#include "log_event.h" #include "log_event.h"
#include "include/my_sys.h"
#define BIN_LOG_HEADER_SIZE 4 #define BIN_LOG_HEADER_SIZE 4
#define PROBE_HEADER_LEN (EVENT_LEN_OFFSET+4) #define PROBE_HEADER_LEN (EVENT_LEN_OFFSET+4)
...@@ -57,7 +58,6 @@ static short binlog_flags = 0; ...@@ -57,7 +58,6 @@ static short binlog_flags = 0;
static MYSQL* mysql = NULL; static MYSQL* mysql = NULL;
static const char* table = 0; static const char* table = 0;
static bool use_local_load= 0;
static const char* dirname_for_local_load= 0; static const char* dirname_for_local_load= 0;
static void dump_local_log_entries(const char* logname); static void dump_local_log_entries(const char* logname);
...@@ -106,7 +106,7 @@ class Load_log_processor ...@@ -106,7 +106,7 @@ class Load_log_processor
gptr data, uint size) gptr data, uint size)
{ {
File file; File file;
if ((file= my_open(fname,flags,MYF(MY_WME)) < 0) || if (((file= my_open(fname,flags,MYF(MY_WME))) < 0) ||
my_write(file,(byte*) data,size,MYF(MY_WME|MY_NABP)) || my_write(file,(byte*) data,size,MYF(MY_WME|MY_NABP)) ||
my_close(file,MYF(MY_WME))) my_close(file,MYF(MY_WME)))
exit(1); exit(1);
...@@ -149,7 +149,9 @@ class Load_log_processor ...@@ -149,7 +149,9 @@ class Load_log_processor
} }
void init_by_cur_dir() void init_by_cur_dir()
{ {
target_dir_name_len= 0; if (my_getwd(target_dir_name,sizeof(target_dir_name),MYF(MY_WME)))
exit(1);
target_dir_name_len= strlen(target_dir_name);
} }
void destroy() void destroy()
{ {
...@@ -176,7 +178,7 @@ class Load_log_processor ...@@ -176,7 +178,7 @@ class Load_log_processor
void process(Create_file_log_event *ce) void process(Create_file_log_event *ce)
{ {
const char *fname= create_file(ce); const char *fname= create_file(ce);
append_to_file(fname,O_CREAT|O_BINARY,ce->block,ce->block_len); append_to_file(fname,O_CREAT|O_EXCL|O_BINARY|O_WRONLY,ce->block,ce->block_len);
} }
void process(Append_block_log_event *ae) void process(Append_block_log_event *ae)
{ {
...@@ -184,7 +186,7 @@ class Load_log_processor ...@@ -184,7 +186,7 @@ class Load_log_processor
die("Skiped CreateFile event for file_id: %u",ae->file_id); die("Skiped CreateFile event for file_id: %u",ae->file_id);
Create_file_log_event* ce= Create_file_log_event* ce=
*((Create_file_log_event**)file_names.buffer + ae->file_id); *((Create_file_log_event**)file_names.buffer + ae->file_id);
append_to_file(ce->fname,O_APPEND|O_BINARY,ae->block,ae->block_len); append_to_file(ce->fname,O_APPEND|O_BINARY|O_WRONLY,ae->block,ae->block_len);
} }
}; };
...@@ -309,7 +311,7 @@ extern "C" my_bool ...@@ -309,7 +311,7 @@ extern "C" my_bool
get_one_option(int optid, const struct my_option *opt __attribute__((unused)), get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
char *argument) char *argument)
{ {
switch(optid) { switch (optid) {
#ifndef DBUG_OFF #ifndef DBUG_OFF
case '#': case '#':
DBUG_PUSH(argument ? argument : default_dbug_option); DBUG_PUSH(argument ? argument : default_dbug_option);
...@@ -338,9 +340,6 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -338,9 +340,6 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
case 'V': case 'V':
print_version(); print_version();
exit(0); exit(0);
case 'l':
use_local_load= 1;
break;
case '?': case '?':
usage(); usage();
exit(0); exit(0);
...@@ -552,8 +551,6 @@ static void dump_local_log_entries(const char* logname) ...@@ -552,8 +551,6 @@ static void dump_local_log_entries(const char* logname)
MYF(MY_WME | MY_NABP))) MYF(MY_WME | MY_NABP)))
exit(1); exit(1);
old_format = check_header(file); old_format = check_header(file);
if (use_local_load && !dirname_for_local_load)
load_processor.init_by_file_name(logname);
} }
else else
{ {
...@@ -575,8 +572,6 @@ static void dump_local_log_entries(const char* logname) ...@@ -575,8 +572,6 @@ static void dump_local_log_entries(const char* logname)
} }
file->pos_in_file=position; file->pos_in_file=position;
file->seek_not_done=0; file->seek_not_done=0;
if (use_local_load && !dirname_for_local_load)
load_processor.init_by_cur_dir();
} }
if (!position) if (!position)
...@@ -632,37 +627,31 @@ Could not read entry at offset %s : Error in log format or read error", ...@@ -632,37 +627,31 @@ Could not read entry at offset %s : Error in log format or read error",
if (!short_form) if (!short_form)
fprintf(result_file, "# at %s\n",llstr(old_off,llbuff)); fprintf(result_file, "# at %s\n",llstr(old_off,llbuff));
if (!use_local_load) switch (ev->get_type_code()) {
case CREATE_FILE_EVENT:
{
Create_file_log_event* ce= (Create_file_log_event*)ev;
ce->print(result_file, short_form, last_db,true);
load_processor.process(ce);
ev= 0;
break;
}
case APPEND_BLOCK_EVENT:
ev->print(result_file, short_form, last_db); ev->print(result_file, short_form, last_db);
else load_processor.process((Append_block_log_event*)ev);
break;
case EXEC_LOAD_EVENT:
{ {
switch(ev->get_type_code()) ev->print(result_file, short_form, last_db);
{ Execute_load_log_event *exv= (Execute_load_log_event*)ev;
case CREATE_FILE_EVENT: Create_file_log_event *ce= load_processor.grab_event(exv->file_id);
{ ce->print(result_file, short_form, last_db,true);
Create_file_log_event* ce= (Create_file_log_event*)ev; my_free((char*)ce->fname,MYF(MY_WME));
ce->print(result_file, short_form, last_db,true); delete ce;
load_processor.process(ce); break;
ev= 0; }
break; default:
} ev->print(result_file, short_form, last_db);
case APPEND_BLOCK_EVENT:
ev->print(result_file, short_form, last_db);
load_processor.process((Append_block_log_event*)ev);
break;
case EXEC_LOAD_EVENT:
{
ev->print(result_file, short_form, last_db);
Execute_load_log_event *exv= (Execute_load_log_event*)ev;
Create_file_log_event *ce= load_processor.grab_event(exv->file_id);
ce->print(result_file, short_form, last_db,true);
my_free((char*)ce->fname,MYF(MY_WME));
delete ce;
break;
}
default:
ev->print(result_file, short_form, last_db);
}
} }
} }
rec_count++; rec_count++;
...@@ -688,8 +677,20 @@ int main(int argc, char** argv) ...@@ -688,8 +677,20 @@ int main(int argc, char** argv)
if (use_remote) if (use_remote)
mysql = safe_connect(); mysql = safe_connect();
MY_TMPDIR tmpdir;
tmpdir.list= 0;
if (!dirname_for_local_load)
{
if (init_tmpdir(&tmpdir, 0))
exit(1);
dirname_for_local_load= my_tmpdir(&tmpdir);
}
if (dirname_for_local_load) if (dirname_for_local_load)
load_processor.init_by_dir_name(dirname_for_local_load); load_processor.init_by_dir_name(dirname_for_local_load);
else
load_processor.init_by_cur_dir();
if (table) if (table)
{ {
...@@ -707,6 +708,8 @@ int main(int argc, char** argv) ...@@ -707,6 +708,8 @@ int main(int argc, char** argv)
while (--argc >= 0) while (--argc >= 0)
dump_log_entries(*(argv++)); dump_log_entries(*(argv++));
} }
if (tmpdir.list)
free_tmpdir(&tmpdir);
if (result_file != stdout) if (result_file != stdout)
my_fclose(result_file, MYF(0)); my_fclose(result_file, MYF(0));
if (use_remote) if (use_remote)
......
...@@ -7,7 +7,7 @@ start slave; ...@@ -7,7 +7,7 @@ start slave;
create table t1(a int not null auto_increment, b int, primary key(a) ); create table t1(a int not null auto_increment, b int, primary key(a) );
load data infile '../../std_data/rpl_loaddata.dat' into table t1; load data infile '../../std_data/rpl_loaddata.dat' into table t1;
create temporary table t2 (day date,id int(9),category enum('a','b','c'),name varchar(60)); create temporary table t2 (day date,id int(9),category enum('a','b','c'),name varchar(60));
load data infile '../../std_data/rpl_loaddata2.dat' into table t2 fields terminated by ',' optionally enclosed by '%' escaped by '@' lines terminated by '\n##\n' ignore 1 lines; load data infile '../../std_data/rpl_loaddata2.dat' into table t2 fields terminated by ',' optionally enclosed by '%' escaped by '@' lines terminated by '\n##\n' starting by '>' ignore 1 lines;
create table t3 (day date,id int(9),category enum('a','b','c'),name varchar(60)); create table t3 (day date,id int(9),category enum('a','b','c'),name varchar(60));
insert into t3 select * from t2; insert into t3 select * from t2;
select * from t1; select * from t1;
......
2003-01-21,6328,%a%,%aaaaa% >2003-01-21,6328,%a%,%aaaaa%
## ##
2003-02-22,2461,b,%a a a @@ @% @b ' " a% >2003-02-22,2461,b,%a a a @@ @% @b ' " a%
## ##
2003-03-22,2161,%c%,%asdf% >2003-03-22,2161,%c%,%asdf%
## ##
2003-04-22,2416,%a%,%bbbbb% >2003-04-22,2416,%a%,%bbbbb%
## ##
...@@ -10,8 +10,7 @@ create table t1(a int not null auto_increment, b int, primary key(a) ); ...@@ -10,8 +10,7 @@ create table t1(a int not null auto_increment, b int, primary key(a) );
load data infile '../../std_data/rpl_loaddata.dat' into table t1; load data infile '../../std_data/rpl_loaddata.dat' into table t1;
create temporary table t2 (day date,id int(9),category enum('a','b','c'),name varchar(60)); create temporary table t2 (day date,id int(9),category enum('a','b','c'),name varchar(60));
#load data infile '../../std_data/rpl_loaddata2.dat' into table t2 fields terminated by ',' optionaly enclosed by '%' escaped by '@' lines terminated by '\n%%\n' ignore 1 lines; load data infile '../../std_data/rpl_loaddata2.dat' into table t2 fields terminated by ',' optionally enclosed by '%' escaped by '@' lines terminated by '\n##\n' starting by '>' ignore 1 lines;
load data infile '../../std_data/rpl_loaddata2.dat' into table t2 fields terminated by ',' optionally enclosed by '%' escaped by '@' lines terminated by '\n##\n' ignore 1 lines;
create table t3 (day date,id int(9),category enum('a','b','c'),name varchar(60)); create table t3 (day date,id int(9),category enum('a','b','c'),name varchar(60));
insert into t3 select * from t2; insert into t3 select * from t2;
......
...@@ -1182,15 +1182,19 @@ void Load_log_event::pack_info(Protocol *protocol) ...@@ -1182,15 +1182,19 @@ void Load_log_event::pack_info(Protocol *protocol)
pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len); pos= pretty_print_str(pos, sql_ex.escaped, sql_ex.escaped_len);
} }
bool line_lexem_added= false;
if (sql_ex.line_term_len) if (sql_ex.line_term_len)
{ {
pos= strmov(pos, " LINES TERMINATED BY "); pos= strmov(pos, " LINES TERMINATED BY ");
pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len); pos= pretty_print_str(pos, sql_ex.line_term, sql_ex.line_term_len);
line_lexem_added= true;
} }
if (sql_ex.line_start_len) if (sql_ex.line_start_len)
{ {
pos= strmov(pos, " LINES STARTING BY "); if (!line_lexem_added)
pos= strmov(pos," LINES");
pos= strmov(pos, " STARTING BY ");
pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len); pos= pretty_print_str(pos, sql_ex.line_start, sql_ex.line_start_len);
} }
...@@ -1438,10 +1442,10 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) ...@@ -1438,10 +1442,10 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
if (db && db[0] && !same_db) if (db && db[0] && !same_db)
fprintf(file, "use %s;\n", db); fprintf(file, "use %s;\n", db);
fprintf(file, "LOAD "); fprintf(file, "LOAD DATA ");
if (check_fname_outside_temp_buf()) if (check_fname_outside_temp_buf())
fprintf(file, "LOCAL "); fprintf(file, "LOCAL ");
fprintf(file, "DATA INFILE '%-*s' ", fname_len, fname); fprintf(file, "INFILE '%-*s' ", fname_len, fname);
if (sql_ex.opt_flags & REPLACE_FLAG) if (sql_ex.opt_flags & REPLACE_FLAG)
fprintf(file," REPLACE "); fprintf(file," REPLACE ");
...@@ -1469,15 +1473,19 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) ...@@ -1469,15 +1473,19 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len); pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len);
} }
bool line_lexem_added= false;
if (sql_ex.line_term) if (sql_ex.line_term)
{ {
fprintf(file," LINES TERMINATED BY "); fprintf(file," LINES TERMINATED BY ");
pretty_print_str(file, sql_ex.line_term, sql_ex.line_term_len); pretty_print_str(file, sql_ex.line_term, sql_ex.line_term_len);
line_lexem_added= true;
} }
if (sql_ex.line_start) if (sql_ex.line_start)
{ {
fprintf(file," LINES STARTING BY "); if (!line_lexem_added)
fprintf(file," LINES");
fprintf(file," STARTING BY ");
pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len); pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment