Commit bee0941d authored by unknown's avatar unknown

Adding thread support for mysqlimport. You can now specify a number of threads...

Adding thread support for mysqlimport. You can now specify a number of threads to use and it will thread the loading of the database. Anyone who has had to go through the pain of loading the database will immediatly get the reason for this.


client/Makefile.am:
  Adding client_thread_libs for mysqlimport (aka it gets pthreads)
client/client_priv.h:
  New option
client/mysqlimport.c:
  Reworked logic to allow someone to use threads.
mysql-test/r/mysqldump.result:
  New results
mysql-test/t/mysqldump.test:
  Added tests for threads.
parent 033b29ee
...@@ -49,6 +49,7 @@ mysqlbinlog_SOURCES = mysqlbinlog.cc $(top_srcdir)/mysys/mf_tempdir.c \ ...@@ -49,6 +49,7 @@ mysqlbinlog_SOURCES = mysqlbinlog.cc $(top_srcdir)/mysys/mf_tempdir.c \
$(top_srcdir)/mysys/base64.c $(top_srcdir)/mysys/base64.c
mysqlbinlog_LDADD = $(LDADD) $(CXXLDFLAGS) mysqlbinlog_LDADD = $(LDADD) $(CXXLDFLAGS)
mysqlslap_LDADD = $(LDADD) $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS) mysqlslap_LDADD = $(LDADD) $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS)
mysqlimport_LDADD = $(LDADD) $(CXXLDFLAGS) $(CLIENT_THREAD_LIBS)
mysqltestmanager_pwgen_SOURCES = mysqlmanager-pwgen.c mysqltestmanager_pwgen_SOURCES = mysqlmanager-pwgen.c
mysqltestmanagerc_SOURCES= mysqlmanagerc.c $(yassl_dummy_link_fix) mysqltestmanagerc_SOURCES= mysqlmanagerc.c $(yassl_dummy_link_fix)
mysqlcheck_SOURCES= mysqlcheck.c $(yassl_dummy_link_fix) mysqlcheck_SOURCES= mysqlcheck.c $(yassl_dummy_link_fix)
......
...@@ -54,6 +54,7 @@ enum options_client ...@@ -54,6 +54,7 @@ enum options_client
OPT_MYSQL_LOCK_DIRECTORY, OPT_MYSQL_LOCK_DIRECTORY,
OPT_MYSQL_SLAP_SLAVE, OPT_MYSQL_SLAP_SLAVE,
OPT_USE_THREADS, OPT_USE_THREADS,
OPT_IMPORT_USE_THREADS,
OPT_MYSQL_NUMBER_OF_QUERY, OPT_MYSQL_NUMBER_OF_QUERY,
OPT_MYSQL_PRESERVE_SCHEMA, OPT_MYSQL_PRESERVE_SCHEMA,
OPT_IGNORE_TABLE,OPT_INSERT_IGNORE,OPT_SHOW_WARNINGS,OPT_DROP_DATABASE, OPT_IGNORE_TABLE,OPT_INSERT_IGNORE,OPT_SHOW_WARNINGS,OPT_DROP_DATABASE,
......
...@@ -29,6 +29,10 @@ ...@@ -29,6 +29,10 @@
#include "client_priv.h" #include "client_priv.h"
#include "mysql_version.h" #include "mysql_version.h"
#include <my_pthread.h>
/* Global Thread counter */
int counter= 0;
static void db_error_with_table(MYSQL *mysql, char *table); static void db_error_with_table(MYSQL *mysql, char *table);
static void db_error(MYSQL *mysql); static void db_error(MYSQL *mysql);
...@@ -39,6 +43,7 @@ static char *add_load_option(char *ptr,const char *object, ...@@ -39,6 +43,7 @@ static char *add_load_option(char *ptr,const char *object,
static my_bool verbose=0,lock_tables=0,ignore_errors=0,opt_delete=0, static my_bool verbose=0,lock_tables=0,ignore_errors=0,opt_delete=0,
replace=0,silent=0,ignore=0,opt_compress=0, replace=0,silent=0,ignore=0,opt_compress=0,
opt_low_priority= 0, tty_password= 0; opt_low_priority= 0, tty_password= 0;
static my_bool opt_use_threads= 0;
static uint opt_local_file=0; static uint opt_local_file=0;
static MYSQL mysql_connection; static MYSQL mysql_connection;
static char *opt_password=0, *current_user=0, static char *opt_password=0, *current_user=0,
...@@ -138,6 +143,11 @@ static struct my_option my_long_options[] = ...@@ -138,6 +143,11 @@ static struct my_option my_long_options[] =
(gptr*) &opt_mysql_unix_port, (gptr*) &opt_mysql_unix_port, 0, GET_STR, (gptr*) &opt_mysql_unix_port, (gptr*) &opt_mysql_unix_port, 0, GET_STR,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#include <sslopt-longopts.h> #include <sslopt-longopts.h>
{"use-threads", OPT_USE_THREADS,
"Parrelize the loading of files. Requires an arguement for the number \
threads to use for loading of data.",
(gptr*) &opt_use_threads, (gptr*) &opt_use_threads, 0,
GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#ifndef DONT_ALLOW_USER_CHANGE #ifndef DONT_ALLOW_USER_CHANGE
{"user", 'u', "User for login if not current user.", (gptr*) &current_user, {"user", 'u', "User for login if not current user.", (gptr*) &current_user,
(gptr*) &current_user, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, (gptr*) &current_user, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
...@@ -287,7 +297,7 @@ static int write_to_table(char *filename, MYSQL *sock) ...@@ -287,7 +297,7 @@ static int write_to_table(char *filename, MYSQL *sock)
{ {
if (verbose) if (verbose)
fprintf(stdout, "Deleting the old data from table %s\n", tablename); fprintf(stdout, "Deleting the old data from table %s\n", tablename);
sprintf(sql_statement, "DELETE FROM %s", tablename); snprintf(sql_statement, FN_REFLEN*16+256, "DELETE FROM %s", tablename);
if (mysql_query(sock, sql_statement)) if (mysql_query(sock, sql_statement))
{ {
db_error_with_table(sock, tablename); db_error_with_table(sock, tablename);
...@@ -497,12 +507,39 @@ static char *field_escape(char *to,const char *from,uint length) ...@@ -497,12 +507,39 @@ static char *field_escape(char *to,const char *from,uint length)
} }
int
worker_thread(char *raw_table_name)
{
MYSQL *sock= 0;
if (!(sock= db_connect(current_host,current_db,current_user,opt_password)))
{
goto error;
}
if (mysql_query(sock, "set @@character_set_database=binary;"))
{
db_error(sock); /* We shall countinue here, if --force was given */
goto error;
}
/*
We should do something about the error
*/
write_to_table(raw_table_name, sock);
error:
if (sock)
db_disconnect(current_host, sock);
counter--;
return 0;
}
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
int exitcode=0, error=0; int exitcode=0, error=0;
char **argv_to_free; char **argv_to_free;
MYSQL *sock=0;
MY_INIT(argv[0]); MY_INIT(argv[0]);
load_defaults("my",load_default_groups,&argc,&argv); load_defaults("my",load_default_groups,&argc,&argv);
...@@ -513,25 +550,73 @@ int main(int argc, char **argv) ...@@ -513,25 +550,73 @@ int main(int argc, char **argv)
free_defaults(argv_to_free); free_defaults(argv_to_free);
return(1); return(1);
} }
if (!(sock= db_connect(current_host,current_db,current_user,opt_password)))
{
free_defaults(argv_to_free);
return(1); /* purecov: deadcode */
}
if (mysql_query(sock, "set @@character_set_database=binary;")) if (opt_use_threads)
{ {
db_error(sock); /* We shall countinue here, if --force was given */ pthread_t mainthread; /* Thread descriptor */
return(1); pthread_attr_t attr; /* Thread attributes */
for (; *argv != NULL; argv++) // Loop through tables
{
/*
If we hit thread count limit we loop until some threads exit.
We sleep for a second, so that we don't chew up a lot of
CPU in the loop.
*/
sanity_label:
if (counter == opt_use_threads)
{
sleep(1);
goto sanity_label;
}
counter++;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr,
PTHREAD_CREATE_DETACHED);
/* now create the thread */
if (pthread_create(&mainthread, &attr, (void *)worker_thread,
(void *)*argv) != 0)
{
counter--;
fprintf(stderr,"%s: Could not create thread\n",
my_progname);
}
}
/*
We loop until we know that all children have cleaned up.
*/
loop_label:
if (counter)
{
sleep(1);
goto loop_label;
}
} }
else
{
MYSQL *sock= 0;
if (!(sock= db_connect(current_host,current_db,current_user,opt_password)))
{
free_defaults(argv_to_free);
return(1); /* purecov: deadcode */
}
if (lock_tables) if (mysql_query(sock, "set @@character_set_database=binary;"))
lock_table(sock, argc, argv); {
for (; *argv != NULL; argv++) db_error(sock); /* We shall countinue here, if --force was given */
if ((error=write_to_table(*argv, sock))) return(1);
if (exitcode == 0) }
exitcode = error;
db_disconnect(current_host, sock); if (lock_tables)
lock_table(sock, argc, argv);
for (; *argv != NULL; argv++)
if ((error=write_to_table(*argv, sock)))
if (exitcode == 0)
exitcode = error;
db_disconnect(current_host, sock);
}
my_free(opt_password,MYF(MY_ALLOW_ZERO_PTR)); my_free(opt_password,MYF(MY_ALLOW_ZERO_PTR));
#ifdef HAVE_SMEM #ifdef HAVE_SMEM
my_free(shared_memory_base_name,MYF(MY_ALLOW_ZERO_PTR)); my_free(shared_memory_base_name,MYF(MY_ALLOW_ZERO_PTR));
......
...@@ -2650,3 +2650,35 @@ DELIMITER ; ...@@ -2650,3 +2650,35 @@ DELIMITER ;
DROP TRIGGER tr1; DROP TRIGGER tr1;
DROP TABLE t1; DROP TABLE t1;
create table t1 (a text , b text);
create table t2 (a text , b text);
insert t1 values ("Duck, Duck", "goose");
insert t1 values ("Duck, Duck", "pidgeon");
insert t2 values ("We the people", "in order to perform");
insert t2 values ("a more perfect", "union");
select * from t1;
a b
Duck, Duck goose
Duck, Duck pidgeon
select * from t2;
a b
We the people in order to perform
a more perfect union
test.t1: Records: 2 Deleted: 0 Skipped: 0 Warnings: 0
test.t2: Records: 2 Deleted: 0 Skipped: 0 Warnings: 0
test.t1: Records: 2 Deleted: 0 Skipped: 0 Warnings: 0
test.t2: Records: 2 Deleted: 0 Skipped: 0 Warnings: 0
select * from t1;
a b
Duck, Duck goose
Duck, Duck pidgeon
Duck, Duck goose
Duck, Duck pidgeon
select * from t2;
a b
We the people in order to perform
a more perfect union
We the people in order to perform
a more perfect union
drop table t1;
drop table t2;
...@@ -1048,3 +1048,27 @@ SET SQL_MODE = @old_sql_mode; ...@@ -1048,3 +1048,27 @@ SET SQL_MODE = @old_sql_mode;
DROP TRIGGER tr1; DROP TRIGGER tr1;
DROP TABLE t1; DROP TABLE t1;
#
# Added for use-thread option
#
create table t1 (a text , b text);
create table t2 (a text , b text);
insert t1 values ("Duck, Duck", "goose");
insert t1 values ("Duck, Duck", "pidgeon");
insert t2 values ("We the people", "in order to perform");
insert t2 values ("a more perfect", "union");
select * from t1;
select * from t2;
--exec $MYSQL_DUMP --tab=$MYSQLTEST_VARDIR/tmp/ test
--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/t1.sql
--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/t2.sql
# The first load tests the pausing code
--exec $MYSQL_IMPORT --use-threads=1 test $MYSQLTEST_VARDIR/tmp/t1.txt $MYSQLTEST_VARDIR/tmp/t2.txt
# Now we test with multiple threads!
--exec $MYSQL_IMPORT --use-threads=5 test $MYSQLTEST_VARDIR/tmp/t1.txt $MYSQLTEST_VARDIR/tmp/t2.txt
select * from t1;
select * from t2;
drop table t1;
drop table t2;
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment