Commit 04988d87 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-33627 refactor threading in mariadb-import

Use threadpool, instead of one-thread-and-connection-per-table
parent c483c5ca
...@@ -57,7 +57,8 @@ MYSQL_ADD_EXECUTABLE(mariadb-dump mysqldump.cc ../sql-common/my_user.c connectio ...@@ -57,7 +57,8 @@ MYSQL_ADD_EXECUTABLE(mariadb-dump mysqldump.cc ../sql-common/my_user.c connectio
TARGET_LINK_LIBRARIES(mariadb-dump ${CLIENT_LIB}) TARGET_LINK_LIBRARIES(mariadb-dump ${CLIENT_LIB})
MYSQL_ADD_EXECUTABLE(mariadb-import mysqlimport.cc) MYSQL_ADD_EXECUTABLE(mariadb-import mysqlimport.cc)
TARGET_LINK_LIBRARIES(mariadb-import ${CLIENT_LIB}) target_include_directories(mariadb-import PRIVATE ${CMAKE_SOURCE_DIR}/tpool)
target_link_libraries(mariadb-import PRIVATE tpool ${CLIENT_LIB})
MYSQL_ADD_EXECUTABLE(mariadb-upgrade mysql_upgrade.c COMPONENT Server) MYSQL_ADD_EXECUTABLE(mariadb-upgrade mysql_upgrade.c COMPONENT Server)
TARGET_LINK_LIBRARIES(mariadb-upgrade ${CLIENT_LIB}) TARGET_LINK_LIBRARIES(mariadb-upgrade ${CLIENT_LIB})
......
...@@ -36,12 +36,8 @@ ...@@ -36,12 +36,8 @@
#include <welcome_copyright_notice.h> /* ORACLE_WELCOME_COPYRIGHT_NOTICE */ #include <welcome_copyright_notice.h> /* ORACLE_WELCOME_COPYRIGHT_NOTICE */
#include <tpool.h>
/* Global Thread counter */ tpool::thread_pool *thread_pool;
uint counter= 0;
pthread_mutex_t init_mutex;
pthread_mutex_t counter_mutex;
pthread_cond_t count_threshhold;
static void db_error_with_table(MYSQL *mysql, char *table); static void db_error_with_table(MYSQL *mysql, char *table);
static void db_error(MYSQL *mysql); static void db_error(MYSQL *mysql);
...@@ -445,13 +441,10 @@ static MYSQL *db_connect(char *host, char *database, ...@@ -445,13 +441,10 @@ static MYSQL *db_connect(char *host, char *database,
fprintf(stdout, "Connecting to %s\n", host ? host : "localhost"); fprintf(stdout, "Connecting to %s\n", host ? host : "localhost");
if (opt_use_threads && !lock_tables) if (opt_use_threads && !lock_tables)
{ {
pthread_mutex_lock(&init_mutex);
if (!(mysql= mysql_init(NULL))) if (!(mysql= mysql_init(NULL)))
{ {
pthread_mutex_unlock(&init_mutex);
return 0; return 0;
} }
pthread_mutex_unlock(&init_mutex);
} }
else else
if (!(mysql= mysql_init(NULL))) if (!(mysql= mysql_init(NULL)))
...@@ -496,6 +489,8 @@ static MYSQL *db_connect(char *host, char *database, ...@@ -496,6 +489,8 @@ static MYSQL *db_connect(char *host, char *database,
if (ignore_foreign_keys) if (ignore_foreign_keys)
mysql_query(mysql, "set foreign_key_checks= 0;"); mysql_query(mysql, "set foreign_key_checks= 0;");
if (mysql_query(mysql, "/*!40101 set @@character_set_database=binary */;"))
db_error(mysql);
return mysql; return mysql;
} }
...@@ -514,14 +509,10 @@ static void safe_exit(int error, MYSQL *mysql) ...@@ -514,14 +509,10 @@ static void safe_exit(int error, MYSQL *mysql)
if (error && ignore_errors) if (error && ignore_errors)
return; return;
/* in multi-threaded mode protect from concurrent safe_exit's */
if (counter)
pthread_mutex_lock(&counter_mutex);
if (mysql) if (mysql)
mysql_close(mysql); mysql_close(mysql);
if (counter) if (thread_pool)
{ {
/* dirty exit. some threads are running, /* dirty exit. some threads are running,
memory is not freed, openssl not deinitialized */ memory is not freed, openssl not deinitialized */
...@@ -603,49 +594,42 @@ static char *field_escape(char *to,const char *from,uint length) ...@@ -603,49 +594,42 @@ static char *field_escape(char *to,const char *from,uint length)
return to; return to;
} }
int exitcode= 0; std::atomic<int> exitcode;
void set_exitcode(int code)
pthread_handler_t worker_thread(void *arg)
{ {
int error; int expected= 0;
char *raw_table_name= (char *)arg; exitcode.compare_exchange_strong(expected,code);
MYSQL *mysql= 0; }
if (mysql_thread_init()) thread_local MYSQL *thread_local_mysql;
goto error;
if (!(mysql= db_connect(current_host,current_db,current_user,opt_password)))
{
goto error;
}
if (mysql_query(mysql, "/*!40101 set @@character_set_database=binary */;"))
{
db_error(mysql); /* We shall continue here, if --force was given */
goto error;
}
void load_single_table(void *arg)
{
int error;
char *raw_table_name= (char *)arg;
MYSQL *mysql= thread_local_mysql;
/* /*
We are not currently catching the error here. We are not currently catching the error here.
*/ */
if((error= write_to_table(raw_table_name, mysql))) if((error= write_to_table(raw_table_name, mysql)))
if (exitcode == 0) set_exitcode(error);
exitcode= error; }
error:
if (mysql)
db_disconnect(current_host, mysql);
pthread_mutex_lock(&counter_mutex); static void tpool_thread_init(void)
counter--; {
pthread_cond_signal(&count_threshhold); mysql_thread_init();
pthread_mutex_unlock(&counter_mutex); thread_local_mysql= db_connect(current_host,current_db,current_user,opt_password);
}
static void tpool_thread_exit(void)
{
if (thread_local_mysql)
db_disconnect(current_host,thread_local_mysql);
mysql_thread_end(); mysql_thread_end();
pthread_exit(0);
return 0;
} }
#include <vector>
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
int error=0; int error=0;
...@@ -668,102 +652,33 @@ int main(int argc, char **argv) ...@@ -668,102 +652,33 @@ int main(int argc, char **argv)
if (opt_use_threads && !lock_tables) if (opt_use_threads && !lock_tables)
{ {
char **save_argv; thread_pool= tpool::create_thread_pool_generic(opt_use_threads,opt_use_threads);
uint worker_thread_count= 0, table_count= 0, i= 0; thread_pool->set_thread_callbacks(tpool_thread_init,tpool_thread_exit);
pthread_t *worker_threads; /* Thread descriptor */
pthread_attr_t attr; /* Thread attributes */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr,
PTHREAD_CREATE_JOINABLE);
pthread_mutex_init(&init_mutex, NULL);
pthread_mutex_init(&counter_mutex, NULL);
pthread_cond_init(&count_threshhold, NULL);
/* Count the number of tables. This number denotes the total number
of threads spawn.
*/
save_argv= argv;
for (table_count= 0; *argv != NULL; argv++)
table_count++;
argv= save_argv;
if (!(worker_threads= (pthread_t*) my_malloc(PSI_NOT_INSTRUMENTED,
table_count * sizeof(*worker_threads), MYF(0))))
return -2;
for (; *argv != NULL; argv++) /* Loop through tables */
{
pthread_mutex_lock(&counter_mutex);
while (counter == opt_use_threads)
{
struct timespec abstime;
set_timespec(abstime, 3);
pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
}
/* Before exiting the lock we set ourselves up for the next thread */
counter++;
pthread_mutex_unlock(&counter_mutex);
/* now create the thread */
if (pthread_create(&worker_threads[worker_thread_count], &attr,
worker_thread, (void *)*argv) != 0)
{
pthread_mutex_lock(&counter_mutex);
counter--;
pthread_mutex_unlock(&counter_mutex);
fprintf(stderr,"%s: Could not create thread\n", my_progname);
continue;
}
worker_thread_count++;
}
/* std::vector<tpool::task> all_tasks;
We loop until we know that all children have cleaned up. for (int i=0; argv[i]; i++)
*/ all_tasks.push_back(tpool::task(load_single_table, argv[i]));
pthread_mutex_lock(&counter_mutex);
while (counter)
{
struct timespec abstime;
set_timespec(abstime, 3);
pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
}
pthread_mutex_unlock(&counter_mutex);
pthread_mutex_destroy(&init_mutex);
pthread_mutex_destroy(&counter_mutex);
pthread_cond_destroy(&count_threshhold);
pthread_attr_destroy(&attr);
for(i= 0; i < worker_thread_count; i++) for (auto &t: all_tasks)
{ thread_pool->submit_task(&t);
if (pthread_join(worker_threads[i], NULL))
fprintf(stderr,"%s: Could not join worker thread.\n", my_progname);
}
my_free(worker_threads); delete thread_pool;
thread_pool= nullptr;
} }
else else
{ {
MYSQL *mysql= 0; MYSQL *mysql= db_connect(current_host,current_db,current_user,opt_password);
if (!(mysql= db_connect(current_host,current_db,current_user,opt_password))) if (!mysql)
{ {
free_defaults(argv_to_free); free_defaults(argv_to_free);
return(1); /* purecov: dead code */ return(1); /* purecov: dead code */
} }
if (mysql_query(mysql, "/*!40101 set @@character_set_database=binary */;"))
{
db_error(mysql); /* We shall continue here, if --force was given */
return(1);
}
if (lock_tables) if (lock_tables)
lock_table(mysql, argc, argv); lock_table(mysql, argc, argv);
for (; *argv != NULL; argv++) for (; *argv != NULL; argv++)
if ((error= write_to_table(*argv, mysql))) if ((error= write_to_table(*argv, mysql)))
if (exitcode == 0) set_exitcode(error);
exitcode= error;
db_disconnect(current_host, mysql); db_disconnect(current_host, mysql);
} }
safe_exit(0, 0); safe_exit(0, 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment