Commit 4532dae0 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-32216 option --parallel/-j for mariadb-dump to increase parallelism

At the moment, it only works with --tab, to execute "SELECT INTO OUTFILE"
queries concurrently.

Uses connection_pool for concurrent execution.
parent ec5db640
...@@ -53,7 +53,7 @@ SET_TARGET_PROPERTIES(mariadb-test PROPERTIES ENABLE_EXPORTS TRUE) ...@@ -53,7 +53,7 @@ SET_TARGET_PROPERTIES(mariadb-test PROPERTIES ENABLE_EXPORTS TRUE)
MYSQL_ADD_EXECUTABLE(mariadb-check mysqlcheck.c) MYSQL_ADD_EXECUTABLE(mariadb-check mysqlcheck.c)
TARGET_LINK_LIBRARIES(mariadb-check ${CLIENT_LIB}) TARGET_LINK_LIBRARIES(mariadb-check ${CLIENT_LIB})
MYSQL_ADD_EXECUTABLE(mariadb-dump mysqldump.cc ../sql-common/my_user.c) MYSQL_ADD_EXECUTABLE(mariadb-dump mysqldump.cc ../sql-common/my_user.c connection_pool.cc)
TARGET_LINK_LIBRARIES(mariadb-dump ${CLIENT_LIB}) TARGET_LINK_LIBRARIES(mariadb-dump ${CLIENT_LIB})
MYSQL_ADD_EXECUTABLE(mariadb-import mysqlimport.c) MYSQL_ADD_EXECUTABLE(mariadb-import mysqlimport.c)
......
...@@ -61,7 +61,7 @@ ...@@ -61,7 +61,7 @@
#include "mysqld_error.h" #include "mysqld_error.h"
#include <welcome_copyright_notice.h> /* ORACLE_WELCOME_COPYRIGHT_NOTICE */ #include <welcome_copyright_notice.h> /* ORACLE_WELCOME_COPYRIGHT_NOTICE */
#include "connection_pool.h"
/* Exit codes */ /* Exit codes */
#define EX_USAGE 1 #define EX_USAGE 1
...@@ -194,7 +194,7 @@ FILE *stderror_file=0; ...@@ -194,7 +194,7 @@ FILE *stderror_file=0;
static uint opt_protocol= 0; static uint opt_protocol= 0;
static char *opt_plugin_dir= 0, *opt_default_auth= 0; static char *opt_plugin_dir= 0, *opt_default_auth= 0;
static uint opt_parallel= 0;
/* /*
Dynamic_string wrapper functions. In this file use these Dynamic_string wrapper functions. In this file use these
wrappers, they will terminate the process if there is wrappers, they will terminate the process if there is
...@@ -246,6 +246,8 @@ static HASH ignore_table, ignore_data; ...@@ -246,6 +246,8 @@ static HASH ignore_table, ignore_data;
static HASH ignore_database; static HASH ignore_database;
static async_pool::connection_pool connection_pool;
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
{"all-databases", 'A', {"all-databases", 'A',
...@@ -526,6 +528,8 @@ static struct my_option my_long_options[] = ...@@ -526,6 +528,8 @@ static struct my_option my_long_options[] =
{"password", 'p', {"password", 'p',
"Password to use when connecting to server. If password is not given it's solicited on the tty.", "Password to use when connecting to server. If password is not given it's solicited on the tty.",
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0}, 0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
{"parallel", 'j', "Number of dump table jobs executed in parallel (only with --tab option)",
&opt_parallel, &opt_parallel, 0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#ifdef _WIN32 #ifdef _WIN32
{"pipe", 'W', "Use named pipes to connect to server.", 0, 0, 0, GET_NO_ARG, {"pipe", 'W', "Use named pipes to connect to server.", 0, 0, 0, GET_NO_ARG,
NO_ARG, 0, 0, 0, 0, 0, 0}, NO_ARG, 0, 0, 0, 0, 0, 0},
...@@ -1920,6 +1924,13 @@ static void free_resources() ...@@ -1920,6 +1924,13 @@ static void free_resources()
else else
fflush(md_result_file); fflush(md_result_file);
} }
if (first_error && mysql)
{
connection_pool.for_each_connection(
[](MYSQL *c) { mysql_kill(mysql,c->thread_id);});
}
connection_pool.close();
if (get_table_name_result) if (get_table_name_result)
mysql_free_result(get_table_name_result); mysql_free_result(get_table_name_result);
if (routine_res) if (routine_res)
...@@ -4061,6 +4072,21 @@ static void vers_append_system_time(DYNAMIC_STRING* query_string) ...@@ -4061,6 +4072,21 @@ static void vers_append_system_time(DYNAMIC_STRING* query_string)
} }
} }
/**
Completion handler for async queries in the pool.
Dies in case query produced an error.
@param mysql The connection that executed the query.
@param query The query that was executed.
@param success Whether the query was successful.
*/
static void send_query_completion_func(MYSQL* mysql, const char* query,
bool success, void*)
{
if (!success)
maybe_die(EX_MYSQLERR, "Couldn't execute async query '%s' (%s)", query,
mysql_error(mysql));
}
/* /*
...@@ -4216,6 +4242,10 @@ static void dump_table(const char *table, const char *db, const uchar *hash_key, ...@@ -4216,6 +4242,10 @@ static void dump_table(const char *table, const char *db, const uchar *hash_key,
dynstr_append_checked(&query_string, select_field_names.str); dynstr_append_checked(&query_string, select_field_names.str);
} }
dynstr_append_checked(&query_string, " FROM "); dynstr_append_checked(&query_string, " FROM ");
char quoted_db_buf[NAME_LEN * 2 + 3];
char *qdatabase= quote_name(db, quoted_db_buf, opt_quoted);
dynstr_append_checked(&query_string, qdatabase);
dynstr_append_checked(&query_string, ".");
dynstr_append_checked(&query_string, result_table); dynstr_append_checked(&query_string, result_table);
if (versioned) if (versioned)
...@@ -4239,8 +4269,16 @@ static void dump_table(const char *table, const char *db, const uchar *hash_key, ...@@ -4239,8 +4269,16 @@ static void dump_table(const char *table, const char *db, const uchar *hash_key,
my_free(order_by); my_free(order_by);
order_by= 0; order_by= 0;
} }
if (opt_parallel)
if (mysql_real_query(mysql, query_string.str, (ulong)query_string.length)) {
if (connection_pool.execute_async(query_string.str,send_query_completion_func,nullptr,true))
{
dynstr_free(&query_string);
DB_error(mysql, "when executing send_query 'SELECT INTO OUTFILE'");
DBUG_VOID_RETURN;
}
}
else if (mysql_real_query(mysql, query_string.str, (ulong)query_string.length))
{ {
dynstr_free(&query_string); dynstr_free(&query_string);
DB_error(mysql, "when executing 'SELECT INTO OUTFILE'"); DB_error(mysql, "when executing 'SELECT INTO OUTFILE'");
...@@ -7082,6 +7120,27 @@ static void dynstr_realloc_checked(DYNAMIC_STRING *str, ulong additional_size) ...@@ -7082,6 +7120,27 @@ static void dynstr_realloc_checked(DYNAMIC_STRING *str, ulong additional_size)
die(EX_MYSQLERR, DYNAMIC_STR_ERROR_MSG); die(EX_MYSQLERR, DYNAMIC_STR_ERROR_MSG);
} }
#define MAX_POOL_CONNECTIONS 256
static void init_connection_pool(uint n_connections)
{
MYSQL *conn[MAX_POOL_CONNECTIONS];
if (n_connections > array_elements(conn))
die(EX_USAGE, "Too many connections");
for (uint i= 0; i < n_connections; i++)
{
MYSQL *c= connect_to_db(current_host, current_user, opt_password);
if (!c)
{
for (uint j= 0; j < i; j++)
mysql_close(conn[j]);
die(EX_MYSQLERR, "Error during connection to DB");
}
conn[i]= c;
}
connection_pool.init(conn, n_connections);
}
int main(int argc, char **argv) int main(int argc, char **argv)
{ {
...@@ -7118,15 +7177,24 @@ int main(int argc, char **argv) ...@@ -7118,15 +7177,24 @@ int main(int argc, char **argv)
} }
} }
if (connect_to_db(current_host, current_user, opt_password)) mysql= connect_to_db(current_host, current_user, opt_password);
if (!mysql)
{ {
free_resources(); free_resources();
exit(EX_MYSQLERR); exit(EX_MYSQLERR);
} }
if (!path) if (!path)
{
write_header(md_result_file, *argv); write_header(md_result_file, *argv);
if (opt_parallel)
{
verbose_msg("-- Warning: ignoring --parallel setting, it currently only "
"works together with --tab\n");
opt_parallel= 0;
}
}
else if (opt_parallel)
init_connection_pool(opt_parallel);
/* Check if the server support multi source */ /* Check if the server support multi source */
if (mysql_get_server_version(mysql) >= 100000) if (mysql_get_server_version(mysql) >= 100000)
...@@ -7179,8 +7247,15 @@ int main(int argc, char **argv) ...@@ -7179,8 +7247,15 @@ int main(int argc, char **argv)
goto err; goto err;
} }
if (opt_single_transaction && start_transaction(mysql)) if (opt_single_transaction)
goto err; {
if (start_transaction(mysql))
goto err;
connection_pool.for_each_connection([](MYSQL *c) {
if (start_transaction(c))
maybe_die(EX_MYSQLERR, "Failed to start transaction on connection ID %u", mysql->thread_id);
});
}
/* Add 'STOP SLAVE to beginning of dump */ /* Add 'STOP SLAVE to beginning of dump */
if (opt_slave_apply && add_stop_slave()) if (opt_slave_apply && add_stop_slave())
...@@ -7273,6 +7348,9 @@ int main(int argc, char **argv) ...@@ -7273,6 +7348,9 @@ int main(int argc, char **argv)
if (opt_delete_master_logs && purge_bin_logs_to(mysql, bin_log_name)) if (opt_delete_master_logs && purge_bin_logs_to(mysql, bin_log_name))
goto err; goto err;
/* wait for outstanding asynchronous queries */
connection_pool.wait_all();
/* /*
No reason to explicitly COMMIT the transaction, neither to explicitly No reason to explicitly COMMIT the transaction, neither to explicitly
UNLOCK TABLES: these will be automatically be done by the server when we UNLOCK TABLES: these will be automatically be done by the server when we
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment