Commit 00ba013e authored by Oleksandr Byelkin's avatar Oleksandr Byelkin

Merge branch '11.4' into bb-11.4-release

parents 6f7efd69 2f5174e5
......@@ -53,7 +53,7 @@ SET_TARGET_PROPERTIES(mariadb-test PROPERTIES ENABLE_EXPORTS TRUE)
MYSQL_ADD_EXECUTABLE(mariadb-check mysqlcheck.c)
TARGET_LINK_LIBRARIES(mariadb-check ${CLIENT_LIB})
MYSQL_ADD_EXECUTABLE(mariadb-dump mysqldump.c ../sql-common/my_user.c)
MYSQL_ADD_EXECUTABLE(mariadb-dump mysqldump.cc ../sql-common/my_user.c connection_pool.cc)
TARGET_LINK_LIBRARIES(mariadb-dump ${CLIENT_LIB})
MYSQL_ADD_EXECUTABLE(mariadb-import mysqlimport.c)
......
/*
Copyright (c) 2023, MariaDB.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335
USA
*/
/*
Connection pool, with parallel query execution
Does not use threads, but IO multiplexing via mysql_send_query and
poll/iocp to wait for completions
*/
#include <my_global.h>
#ifdef _WIN32
#include <winsock2.h>
#else
#include <poll.h>
#endif
#include "connection_pool.h"
#include <my_compiler.h>
namespace async_pool
{
static ATTRIBUTE_NORETURN void die(const char *format, ...)
{
va_list args;
va_start(args, format);
vfprintf(stderr, format, args);
va_end(args);
abort();
}
pooled_connection *connection_pool::get_connection()
{
while (free_connections.empty())
wait_for_completions();
auto c= free_connections.front();
free_connections.pop();
return c;
}
#ifdef _WIN32
void connection_pool::add_to_pollset(pooled_connection *c)
{
DWORD err= ERROR_SUCCESS;
static char ch;
WSABUF buf;
buf.len= 0;
buf.buf= &ch;
if (!c->is_pipe)
{
/* Do async io (sockets). */
DWORD flags= 0;
if (WSARecv((SOCKET) c->handle, &buf, 1, 0, &flags, c, NULL))
err= WSAGetLastError();
}
else
{
/* Do async read (named pipe) */
if (!ReadFile(c->handle, buf.buf, buf.len, 0, c))
err= GetLastError();
}
if (err && err != ERROR_IO_PENDING)
die("%s failed: %d\n", c->is_pipe ? "ReadFile" : "WSARecv", err);
}
/*
Wait for completions of queries.Uses IOCP on windows to wait for completions.
(ReadFile/WSARecv with 0 bytes serves as readiness notification)
*/
void connection_pool::wait_for_completions()
{
ULONG n;
OVERLAPPED_ENTRY events[32];
if (!GetQueuedCompletionStatusEx(iocp, events, array_elements(events), &n, INFINITE,
FALSE))
{
die("GetQueuedCompletionStatusEx failed: %d\n", GetLastError());
}
for (ULONG i= 0; i < n; i++)
{
auto c= (pooled_connection *) events[i].lpOverlapped;
if (!c)
die("GetQueuedCompletionStatusEx unexpected return");
DBUG_ASSERT(c->mysql);
DBUG_ASSERT(!events[i].lpCompletionKey);
DBUG_ASSERT(!events[i].dwNumberOfBytesTransferred);
complete_query(c);
}
}
#else /* !_WIN32 */
void connection_pool::add_to_pollset(pooled_connection *c)
{
size_t idx= c - &all_connections[0];
pollfd *pfd= &pollset[idx];
pfd->fd= c->fd;
pfd->events= POLLIN;
pfd->revents= 0;
}
/* something Linux-ish, can be returned for POLLIN event*/
#ifndef POLLRDHUP
#define POLLRDHUP 0
#endif
void connection_pool::wait_for_completions()
{
int n;
while ((n= poll(pollset.data(), pollset.size(), -1)) <= 0)
{
if (errno == EINTR)
continue;
die("poll failed: %d\n", errno);
}
for (uint i= 0; n > 0 && i < pollset.size(); i++)
{
pollfd* pfd= &pollset[i];
if (pfd->revents &
(POLLIN | POLLPRI | POLLHUP | POLLRDHUP| POLLERR | POLLNVAL))
{
pfd->events= 0;
pfd->revents= 0;
pfd->fd= -1;
complete_query(&all_connections[i]);
n--;
}
}
if (n)
die("poll() failed to find free connection: %d\n");
}
#endif
void connection_pool::complete_query(pooled_connection *c)
{
int err= mysql_read_query_result(c->mysql);
if (c->on_completion)
c->on_completion(c->mysql, c->query.c_str(), !err, c->context);
if (c->release_connection)
{
c->in_use= false;
free_connections.push(c);
}
}
connection_pool::~connection_pool()
{
close();
}
void connection_pool::init(MYSQL *con[], size_t n)
{
#ifdef _WIN32
iocp= CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (!iocp)
die("CreateIoCompletionPort failed: %d\n", GetLastError());
#else
pollset.resize(n);
for (auto &pfd : pollset)
pfd.fd= -1;
#endif
for (size_t i= 0; i < n; i++)
all_connections.emplace_back(con[i]);
for (auto &con : all_connections)
{
free_connections.push(&con);
#ifdef _WIN32
if (!CreateIoCompletionPort(con.handle, iocp, 0, 0))
die("CreateIoCompletionPort failed: %d\n", GetLastError());
#endif
}
}
int connection_pool::execute_async(const char *query,
query_completion_handler on_completion,
void *context, bool release_connecton)
{
auto c= get_connection();
c->context= context;
c->on_completion= on_completion;
c->release_connection= release_connecton;
c->query= query;
int ret= mysql_send_query(c->mysql, query, (unsigned long) c->query.size());
if (ret)
{
free_connections.push(c);
return ret;
}
c->in_use= true;
add_to_pollset(c);
return 0;
}
/*
Wait until all queries are completed and all
connections are idle.
*/
void connection_pool::wait_all()
{
while (free_connections.size() != all_connections.size())
wait_for_completions();
}
void connection_pool::for_each_connection(void(*f)(MYSQL *mysql))
{
for (auto &c : all_connections)
f(c.mysql);
}
int connection_pool::close()
{
for (auto &c : all_connections)
mysql_close(c.mysql);
all_connections.clear();
while (!free_connections.empty())
free_connections.pop();
#ifdef _WIN32
if (iocp)
{
CloseHandle(iocp);
iocp= nullptr;
}
#endif
return 0;
}
pooled_connection::pooled_connection(MYSQL *c)
{
mysql= c;
#ifdef _WIN32
OVERLAPPED *ov= static_cast<OVERLAPPED *>(this);
memset(ov, 0, sizeof(OVERLAPPED));
mysql_protocol_type protocol;
if (c->host && !strcmp(c->host, "."))
protocol= MYSQL_PROTOCOL_PIPE;
else
protocol= (mysql_protocol_type) c->options.protocol;
is_pipe= protocol == MYSQL_PROTOCOL_PIPE;
handle= (HANDLE) mysql_get_socket(c);
#else
fd= mysql_get_socket(c);
#endif
}
} // namespace async_pool
/*
Copyright (c) 2023, MariaDB.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335
USA
*/
#pragma once
#include <mysql.h>
#include <vector>
#include <queue>
#include <string>
#ifdef _WIN32
#include <windows.h>
#else
#include <poll.h>
#endif
/*
Implementation of asynchronous mariadb connection pool.
This pool consists of set of MYSQL* connections, created by C API
function. The intention is that all connections have the same state
same server, by the same user etc.
The "asynchronous" means the queries are executed on the server
without waiting for the server reply. The queries are submitted
with mysql_send_query(), and completions are picked by poll/IOCP.
*/
namespace async_pool
{
typedef void (*query_completion_handler)(MYSQL *mysql, const char *query, bool success, void *context);
struct pooled_connection
#ifdef _WIN32
: OVERLAPPED
#endif
{
MYSQL *mysql;
query_completion_handler on_completion=NULL;
void *context=NULL;
std::string query;
bool in_use=false;
bool release_connection=false;
#ifdef _WIN32
bool is_pipe;
HANDLE handle;
#else
int fd;
#endif
pooled_connection(MYSQL *mysql);
};
struct connection_pool
{
private:
std::vector<pooled_connection> all_connections;
std::queue<pooled_connection *> free_connections;
pooled_connection *get_connection();
void wait_for_completions();
void complete_query(pooled_connection *c);
void add_to_pollset(pooled_connection *c);
#ifdef _WIN32
HANDLE iocp=nullptr;
#else
std::vector<pollfd> pollset;
#endif
public:
~connection_pool();
/**
Add connections to the connection pool
@param con - connections
@param n_connections - number of connections
*/
void init(MYSQL *con[], size_t n_connections);
/**
Send query to the connection pool
Executes query on a connection in the pool, using mysql_send_query
@param query - query string
@param on_completion - callback function to be called on completion
@param context - user context that will be passed to the callback function
@param release_connecton - if true, the connection should be released to the
pool after the query is executed. If you execute another
mysql_send_query() on the same connection, set this to false.
Note: the function will block if there are no free connections in the pool.
@return return code of mysql_send_query
*/
int execute_async(const char *query, query_completion_handler on_completion, void *context, bool release_connecton=true);
/** Waits for all outstanding queries to complete.*/
void wait_all();
/** Execute callback for each connection in the pool. */
void for_each_connection(void (*f)(MYSQL *mysql));
/**
Closes all connections in pool and frees all resources.
Does not wait for pending queries to complete
(use wait_all() for that)
*/
int close();
};
} // namespace async_pool
......@@ -61,7 +61,7 @@
#include "mysqld_error.h"
#include <welcome_copyright_notice.h> /* ORACLE_WELCOME_COPYRIGHT_NOTICE */
#include "connection_pool.h"
/* Exit codes */
#define EX_USAGE 1
......@@ -150,7 +150,7 @@ static my_bool insert_pat_inited= 0, debug_info_flag= 0, debug_check_flag= 0,
select_field_names_inited= 0;
static ulong opt_max_allowed_packet, opt_net_buffer_length;
static double opt_max_statement_time= 0.0;
static MYSQL mysql_connection,*mysql=0;
static MYSQL *mysql=0;
static DYNAMIC_STRING insert_pat, select_field_names, select_field_names_for_header;
static char *opt_password=0,*current_user=0,
*current_host=0,*path=0,*fields_terminated=0,
......@@ -194,7 +194,7 @@ FILE *stderror_file=0;
static uint opt_protocol= 0;
static char *opt_plugin_dir= 0, *opt_default_auth= 0;
static uint opt_parallel= 0;
/*
Dynamic_string wrapper functions. In this file use these
wrappers, they will terminate the process if there is
......@@ -246,6 +246,8 @@ static HASH ignore_table, ignore_data;
static HASH ignore_database;
static async_pool::connection_pool connection_pool;
static struct my_option my_long_options[] =
{
{"all-databases", 'A',
......@@ -526,6 +528,8 @@ static struct my_option my_long_options[] =
{"password", 'p',
"Password to use when connecting to server. If password is not given it's solicited on the tty.",
0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
{"parallel", 'j', "Number of dump table jobs executed in parallel (only with --tab option)",
&opt_parallel, &opt_parallel, 0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#ifdef _WIN32
{"pipe", 'W', "Use named pipes to connect to server.", 0, 0, 0, GET_NO_ARG,
NO_ARG, 0, 0, 0, 0, 0, 0},
......@@ -777,7 +781,7 @@ static void write_header(FILE *sql_file, const char *db_name)
"-- ------------------------------------------------------\n"
);
print_comment(sql_file, 0, "-- Server version\t%s\n",
mysql_get_server_info(&mysql_connection));
mysql_get_server_info(mysql));
if (!opt_logging)
fprintf(sql_file,
......@@ -1451,6 +1455,7 @@ static void maybe_die(int error_num, const char* fmt_reason, ...)
static int mysql_query_with_error_report(MYSQL *mysql_con, MYSQL_RES **res,
const char *query)
{
DBUG_ASSERT(mysql_con);
if (mysql_query(mysql_con, query) ||
(res && !((*res)= mysql_store_result(mysql_con))))
{
......@@ -1919,6 +1924,13 @@ static void free_resources()
else
fflush(md_result_file);
}
if (first_error && mysql)
{
connection_pool.for_each_connection(
[](MYSQL *c) { mysql_kill(mysql,c->thread_id);});
}
connection_pool.close();
if (get_table_name_result)
mysql_free_result(get_table_name_result);
if (routine_res)
......@@ -1959,7 +1971,7 @@ static void maybe_exit(int error)
if (ignore_errors)
return;
ignore_errors= 1; /* don't want to recurse, if something fails below */
if (opt_slave_data)
if (opt_slave_data && mysql)
do_start_slave_sql(mysql);
free_resources();
exit(error);
......@@ -1970,49 +1982,49 @@ static void maybe_exit(int error)
db_connect -- connects to the host and selects DB.
*/
static int connect_to_db(char *host, char *user,char *passwd)
static MYSQL* connect_to_db(char *host, char *user,char *passwd)
{
char buff[20+FN_REFLEN];
my_bool reconnect;
DBUG_ENTER("connect_to_db");
verbose_msg("-- Connecting to %s...\n", host ? host : "localhost");
mysql_init(&mysql_connection);
MYSQL* con = mysql_init(NULL);
if (opt_compress)
mysql_options(&mysql_connection,MYSQL_OPT_COMPRESS,NullS);
mysql_options(con,MYSQL_OPT_COMPRESS,NullS);
#ifdef HAVE_OPENSSL
if (opt_use_ssl)
{
mysql_ssl_set(&mysql_connection, opt_ssl_key, opt_ssl_cert, opt_ssl_ca,
mysql_ssl_set(con, opt_ssl_key, opt_ssl_cert, opt_ssl_ca,
opt_ssl_capath, opt_ssl_cipher);
mysql_options(&mysql_connection, MYSQL_OPT_SSL_CRL, opt_ssl_crl);
mysql_options(&mysql_connection, MYSQL_OPT_SSL_CRLPATH, opt_ssl_crlpath);
mysql_options(&mysql_connection, MARIADB_OPT_TLS_VERSION, opt_tls_version);
mysql_options(con, MYSQL_OPT_SSL_CRL, opt_ssl_crl);
mysql_options(con, MYSQL_OPT_SSL_CRLPATH, opt_ssl_crlpath);
mysql_options(con, MARIADB_OPT_TLS_VERSION, opt_tls_version);
}
mysql_options(&mysql_connection,MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
mysql_options(con,MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
(char*)&opt_ssl_verify_server_cert);
#endif
if (opt_protocol)
mysql_options(&mysql_connection,MYSQL_OPT_PROTOCOL,(char*)&opt_protocol);
mysql_options(&mysql_connection, MYSQL_SET_CHARSET_NAME, default_charset);
mysql_options(con,MYSQL_OPT_PROTOCOL,(char*)&opt_protocol);
mysql_options(con, MYSQL_SET_CHARSET_NAME, default_charset);
if (opt_plugin_dir && *opt_plugin_dir)
mysql_options(&mysql_connection, MYSQL_PLUGIN_DIR, opt_plugin_dir);
mysql_options(con, MYSQL_PLUGIN_DIR, opt_plugin_dir);
if (opt_default_auth && *opt_default_auth)
mysql_options(&mysql_connection, MYSQL_DEFAULT_AUTH, opt_default_auth);
mysql_options(con, MYSQL_DEFAULT_AUTH, opt_default_auth);
mysql_options(&mysql_connection, MYSQL_OPT_CONNECT_ATTR_RESET, 0);
mysql_options4(&mysql_connection, MYSQL_OPT_CONNECT_ATTR_ADD,
mysql_options(con, MYSQL_OPT_CONNECT_ATTR_RESET, 0);
mysql_options4(con, MYSQL_OPT_CONNECT_ATTR_ADD,
"program_name", "mysqldump");
mysql= &mysql_connection; /* So we can mysql_close() it properly */
if (!mysql_real_connect(&mysql_connection,host,user,passwd,
if (!mysql_real_connect(con,host,user,passwd,
NULL,opt_mysql_port,opt_mysql_unix_port, 0))
{
DB_error(&mysql_connection, "when trying to connect");
DBUG_RETURN(1);
DB_error(con, "when trying to connect");
goto err;
}
if ((mysql_get_server_version(&mysql_connection) < 40100) ||
if ((mysql_get_server_version(con) < 40100) ||
(opt_compatible_mode & 3))
{
/* Don't dump SET NAMES with a pre-4.1 server (bug#7997). */
......@@ -2026,22 +2038,38 @@ static int connect_to_db(char *host, char *user,char *passwd)
cannot reconnect.
*/
reconnect= 0;
mysql_options(&mysql_connection, MYSQL_OPT_RECONNECT, &reconnect);
mysql_options(con, MYSQL_OPT_RECONNECT, &reconnect);
my_snprintf(buff, sizeof(buff), "/*!40100 SET @@SQL_MODE='%s' */",
compatible_mode_normal_str);
if (mysql_query_with_error_report(mysql, 0, buff))
DBUG_RETURN(1);
if (mysql_query_with_error_report(con, 0, buff))
goto err;
/*
set time_zone to UTC to allow dumping date types between servers with
different time zone settings
*/
if (opt_tz_utc)
{
my_snprintf(buff, sizeof(buff), "/*!40103 SET TIME_ZONE='+00:00' */");
if (mysql_query_with_error_report(mysql, 0, buff))
DBUG_RETURN(1);
if (mysql_query_with_error_report(con, 0,
"/*!40103 SET TIME_ZONE='+00:00' */"))
goto err;
}
DBUG_RETURN(0);
/* Set MAX_STATEMENT_TIME to 0 unless set in client */
my_snprintf(buff, sizeof(buff), "/*!100100 SET @@MAX_STATEMENT_TIME=%f */",
opt_max_statement_time);
if (mysql_query_with_error_report(con, 0, buff))
goto err;
/* Set server side timeout between client commands to server compiled-in default */
if(mysql_query_with_error_report(con,0, "/*!100100 SET WAIT_TIMEOUT=DEFAULT */"))
goto err;
DBUG_RETURN(con);
err:
if (con)
mysql_close(con);
DBUG_RETURN(NULL);
} /* connect_to_db */
......@@ -2063,7 +2091,7 @@ static void unescape(FILE *file,char *pos, size_t length)
if (!(tmp=(char*) my_malloc(PSI_NOT_INSTRUMENTED, length*2+1, MYF(MY_WME))))
die(EX_MYSQLERR, "Couldn't allocate memory");
mysql_real_escape_string(&mysql_connection, tmp, pos, (ulong)length);
mysql_real_escape_string(mysql, tmp, pos, (ulong)length);
fputc('\'', file);
fputs(tmp, file);
fputc('\'', file);
......@@ -4044,6 +4072,21 @@ static void vers_append_system_time(DYNAMIC_STRING* query_string)
}
}
/**
Completion handler for async queries in the pool.
Dies in case query produced an error.
@param mysql The connection that executed the query.
@param query The query that was executed.
@param success Whether the query was successful.
*/
static void send_query_completion_func(MYSQL* mysql, const char* query,
bool success, void*)
{
if (!success)
maybe_die(EX_MYSQLERR, "Couldn't execute async query '%s' (%s)", query,
mysql_error(mysql));
}
/*
......@@ -4199,6 +4242,10 @@ static void dump_table(const char *table, const char *db, const uchar *hash_key,
dynstr_append_checked(&query_string, select_field_names.str);
}
dynstr_append_checked(&query_string, " FROM ");
char quoted_db_buf[NAME_LEN * 2 + 3];
char *qdatabase= quote_name(db, quoted_db_buf, opt_quoted);
dynstr_append_checked(&query_string, qdatabase);
dynstr_append_checked(&query_string, ".");
dynstr_append_checked(&query_string, result_table);
if (versioned)
......@@ -4222,8 +4269,16 @@ static void dump_table(const char *table, const char *db, const uchar *hash_key,
my_free(order_by);
order_by= 0;
}
if (mysql_real_query(mysql, query_string.str, (ulong)query_string.length))
if (opt_parallel)
{
if (connection_pool.execute_async(query_string.str,send_query_completion_func,nullptr,true))
{
dynstr_free(&query_string);
DB_error(mysql, "when executing send_query 'SELECT INTO OUTFILE'");
DBUG_VOID_RETURN;
}
}
else if (mysql_real_query(mysql, query_string.str, (ulong)query_string.length))
{
dynstr_free(&query_string);
DB_error(mysql, "when executing 'SELECT INTO OUTFILE'");
......@@ -4403,7 +4458,7 @@ static void dump_table(const char *table, const char *db, const uchar *hash_key,
{
dynstr_append_checked(&extended_row,"'");
extended_row.length +=
mysql_real_escape_string(&mysql_connection,
mysql_real_escape_string(mysql,
&extended_row.str[extended_row.length],
row[i],length);
extended_row.str[extended_row.length]='\0';
......@@ -6761,7 +6816,7 @@ static char *primary_key_fields(const char *table_name)
{
char *end;
/* result (terminating \0 is already in result_length) */
result= my_malloc(PSI_NOT_INSTRUMENTED, result_length + 10, MYF(MY_WME));
result= (char *)my_malloc(PSI_NOT_INSTRUMENTED, result_length + 10, MYF(MY_WME));
if (!result)
{
fprintf(stderr, "Error: Not enough memory to store ORDER BY clause\n");
......@@ -7065,10 +7120,30 @@ static void dynstr_realloc_checked(DYNAMIC_STRING *str, ulong additional_size)
die(EX_MYSQLERR, DYNAMIC_STR_ERROR_MSG);
}
#define MAX_POOL_CONNECTIONS 256
static void init_connection_pool(uint n_connections)
{
MYSQL *conn[MAX_POOL_CONNECTIONS];
if (n_connections > array_elements(conn))
die(EX_USAGE, "Too many connections");
for (uint i= 0; i < n_connections; i++)
{
MYSQL *c= connect_to_db(current_host, current_user, opt_password);
if (!c)
{
for (uint j= 0; j < i; j++)
mysql_close(conn[j]);
die(EX_MYSQLERR, "Error during connection to DB");
}
conn[i]= c;
}
connection_pool.init(conn, n_connections);
}
int main(int argc, char **argv)
{
char query[48];
char bin_log_name[FN_REFLEN];
int exit_code;
int consistent_binlog_pos= 0;
......@@ -7102,20 +7177,24 @@ int main(int argc, char **argv)
}
}
if (connect_to_db(current_host, current_user, opt_password))
mysql= connect_to_db(current_host, current_user, opt_password);
if (!mysql)
{
free_resources();
exit(EX_MYSQLERR);
}
if (!path)
{
write_header(md_result_file, *argv);
/* Set MAX_STATEMENT_TIME to 0 unless set in client */
my_snprintf(query, sizeof(query), "/*!100100 SET @@MAX_STATEMENT_TIME=%f */", opt_max_statement_time);
mysql_query(mysql, query);
/* Set server side timeout between client commands to server compiled-in default */
mysql_query(mysql, "/*!100100 SET WAIT_TIMEOUT=DEFAULT */");
if (opt_parallel)
{
verbose_msg("-- Warning: ignoring --parallel setting, it currently only "
"works together with --tab\n");
opt_parallel= 0;
}
}
else if (opt_parallel)
init_connection_pool(opt_parallel);
/* Check if the server support multi source */
if (mysql_get_server_version(mysql) >= 100000)
......@@ -7168,8 +7247,15 @@ int main(int argc, char **argv)
goto err;
}
if (opt_single_transaction && start_transaction(mysql))
if (opt_single_transaction)
{
if (start_transaction(mysql))
goto err;
connection_pool.for_each_connection([](MYSQL *c) {
if (start_transaction(c))
maybe_die(EX_MYSQLERR, "Failed to start transaction on connection ID %u", mysql->thread_id);
});
}
/* Add 'STOP SLAVE to beginning of dump */
if (opt_slave_apply && add_stop_slave())
......@@ -7262,6 +7348,9 @@ int main(int argc, char **argv)
if (opt_delete_master_logs && purge_bin_logs_to(mysql, bin_log_name))
goto err;
/* wait for outstanding asynchronous queries */
connection_pool.wait_all();
/*
No reason to explicitly COMMIT the transaction, neither to explicitly
UNLOCK TABLES: these will be automatically be done by the server when we
......
......@@ -148,6 +148,9 @@ static struct my_option my_long_options[] =
{"pipe", 'W', "Use named pipes to connect to server.", 0, 0, 0, GET_NO_ARG,
NO_ARG, 0, 0, 0, 0, 0, 0},
#endif
{"parallel", 'j', "Number of LOAD DATA jobs executed in parallel",
&opt_use_threads, &opt_use_threads, 0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0,
0, 0},
{"plugin_dir", OPT_PLUGIN_DIR, "Directory for client-side plugins.",
&opt_plugin_dir, &opt_plugin_dir, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
......@@ -170,9 +173,7 @@ static struct my_option my_long_options[] =
&opt_mysql_unix_port, &opt_mysql_unix_port, 0, GET_STR,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#include <sslopt-longopts.h>
{"use-threads", OPT_USE_THREADS,
"Load files in parallel. The argument is the number "
"of threads to use for loading data.",
{"use-threads", OPT_USE_THREADS, "Synonym for --parallel option",
&opt_use_threads, &opt_use_threads, 0,
GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
#ifndef DONT_ALLOW_USER_CHANGE
......
set @save_debug_dbug=@@global.debug_dbug;
set global debug_dbug='+d,select_export_kill';
mariadb-dump: Couldn't execute async query 'SELECT /*!40001 SQL_NO_CACHE */ `Host`, `User`, `Priv` INTO OUTFILE 'MYSQLTEST_VARDIR/tmp/global_priv.txt' /*!50138 CHARACTER SET binary */ FROM `mysql`.`global_priv`' (Query execution was interrupted)
set global debug_dbug=@save_debug_dbug;
--source include/have_debug.inc
--source include/not_embedded.inc
# need to dump "mysql" schema
--source include/have_innodb.inc
# inject error on SELECT INTO OUTFILE
set @save_debug_dbug=@@global.debug_dbug;
set global debug_dbug='+d,select_export_kill';
# This one succeeds
--replace_result .\exe '' $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
--error 2
--exec $MYSQL_DUMP --tab=$MYSQLTEST_VARDIR/tmp/ --parallel=2 mysql global_priv 2>&1
set global debug_dbug=@save_debug_dbug;
--loose-enable-named-pipe
# Test mysqldump specific features (pipe connection, with parallel)
CREATE TABLE t1 (i INT);
INSERT INTO t1 VALUES(1);
DROP TABLE t1;
test.t1: Records: 1 Deleted: 0 Skipped: 0 Warnings: 0
SELECT * FROM t1;
i
1
DROP TABLE t1;
--source include/windows.inc
--source include/not_embedded.inc
--echo # Test mysqldump specific features (pipe connection, with parallel)
CREATE TABLE t1 (i INT);
INSERT INTO t1 VALUES(1);
--exec $MYSQL_DUMP --host=. --tab=$MYSQLTEST_VARDIR/tmp/ test --parallel=2
DROP TABLE t1;
--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/t1.sql
--exec $MYSQL_IMPORT test $MYSQLTEST_VARDIR/tmp/t1.txt
SELECT * FROM t1;
DROP TABLE t1;
......@@ -5531,7 +5531,6 @@ proc
one
DROP DATABASE bug25717383;
mariadb-dump: Got error: 2005: "Unknown server host 'unknownhost'" when trying to connect
mariadb-dump: Couldn't execute 'SHOW SLAVE STATUS': Server has gone away (2006)
Usage: mariadb-dump [OPTIONS] database [tables]
OR mariadb-dump [OPTIONS] --databases DB1 [DB2 DB3...]
OR mariadb-dump [OPTIONS] --all-databases
......@@ -6659,3 +6658,12 @@ drop table t1;
# End of 10.4 tests
#
mariadb-dump: --xml can't be used with --tab.
select @@max_connections into @save_max_connections;
set global max_connections=10;
mariadb-dump: Got error: 1040: "Too many connections" when trying to connect
set global max_connections=300;
mariadb-dump: Too many connections
set global max_connections=@save_max_connections;
#
# End of 11.4 tests
#
......@@ -2482,12 +2482,12 @@ INSERT INTO t2 VALUES (3), (4);
SELECT * FROM t1;
SELECT * FROM t2;
--exec $MYSQL_DUMP --default-character-set=utf8mb4 --tab=$MYSQLTEST_VARDIR/tmp/ db_20772273
--exec $MYSQL_DUMP --default-character-set=utf8mb4 --tab=$MYSQLTEST_VARDIR/tmp/ --parallel=2 db_20772273
--exec $MYSQL db_20772273 < $MYSQLTEST_VARDIR/tmp/t1.sql
--exec $MYSQL db_20772273 < $MYSQLTEST_VARDIR/tmp/t2.sql
# Test mysqlimport with multiple threads
--exec $MYSQL_IMPORT --silent --use-threads=2 db_20772273 $MYSQLTEST_VARDIR/tmp/t1.txt $MYSQLTEST_VARDIR/tmp/t2.txt
--exec $MYSQL_IMPORT --silent --parallel=2 db_20772273 $MYSQLTEST_VARDIR/tmp/t1.txt $MYSQLTEST_VARDIR/tmp/t2.txt
SELECT * FROM t1;
SELECT * FROM t2;
......@@ -3023,3 +3023,24 @@ drop table t1;
--replace_result mariadb-dump.exe mariadb-dump
--error 1
--exec $MYSQL_DUMP --xml --tab=$MYSQLTEST_VARDIR/tmp 2>&1
#
# MDEV-32589 parallel-mysqldump - test "too many connections"
#
select @@max_connections into @save_max_connections;
set global max_connections=10;
--replace_result mariadb-dump.exe mariadb-dump
--error 2
--exec $MYSQL_DUMP --tab=$MYSQLTEST_VARDIR/tmp/ --parallel=20 mysql 2>&1
#
# MDEV-32589 test builtin 256 connections limit
#
set global max_connections=300;
--replace_result mariadb-dump.exe mariadb-dump
--error 1
--exec $MYSQL_DUMP --tab=$MYSQLTEST_VARDIR/tmp/ --parallel=280 mysql 2>&1
set global max_connections=@save_max_connections;
--echo #
--echo # End of 11.4 tests
--echo #
......@@ -134,9 +134,6 @@
#endif
#include <my_service_manager.h>
#ifdef __linux__
#include <sys/eventfd.h>
#endif
#include <source_revision.h>
......@@ -1396,6 +1393,8 @@ struct my_rnd_struct sql_rand; ///< used by sql_class.cc:THD::THD()
Dynamic_array<MYSQL_SOCKET> listen_sockets(PSI_INSTRUMENT_MEM, 0);
bool unix_sock_is_online= false;
static int systemd_sock_activation; /* systemd socket activation */
/** wakeup listening(main) thread by writing to this descriptor */
static int termination_event_fd= -1;
......@@ -1658,66 +1657,21 @@ static my_bool warn_threads_active_after_phase_2(THD *thd, void *)
static void break_connect_loop()
{
#ifdef EXTRA_DEBUG
int count=0;
#endif
abort_loop= 1;
#if defined(_WIN32)
mysqld_win_initiate_shutdown();
#else
/* Avoid waiting for ourselves when thread-handling=no-threads. */
if (pthread_equal(pthread_self(), select_thread))
return;
DBUG_PRINT("quit", ("waiting for select thread: %lu",
(ulong)select_thread));
mysql_mutex_lock(&LOCK_start_thread);
while (select_thread_in_use)
{
struct timespec abstime;
int UNINIT_VAR(error);
DBUG_PRINT("info",("Waiting for select thread"));
{
/*
Cannot call shutdown on systemd socket activated descriptors
as their clone in the pid 1 is reused.
*/
int lowest_fd, shutdowns= 0;
lowest_fd= sd_listen_fds(0) + SD_LISTEN_FDS_START;
for(size_t i= 0; i < listen_sockets.size(); i++)
{
int fd= mysql_socket_getfd(listen_sockets.at(i));
if (fd >= lowest_fd)
{
shutdowns++;
mysql_socket_shutdown(listen_sockets.at(i), SHUT_RDWR);
}
}
if (!shutdowns && termination_event_fd >=0)
if (termination_event_fd >= 0)
{
uint64_t u= 1;
if (write(termination_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t))
sql_print_error("Couldn't send event to terminate listen loop");
}
}
set_timespec(abstime, 2);
for (uint tmp=0 ; tmp < 10 && select_thread_in_use; tmp++)
if(write(termination_event_fd, &u, sizeof(uint64_t)) < 0)
{
error= mysql_cond_timedwait(&COND_start_thread, &LOCK_start_thread,
&abstime);
if (error != EINTR)
break;
sql_print_error("Couldn't send event to terminate listen loop");
abort();
}
#ifdef EXTRA_DEBUG
if (error != 0 && error != ETIMEDOUT && !count++)
sql_print_error("Got error %d from mysql_cond_timedwait", error);
#endif
}
if (termination_event_fd >= 0)
close(termination_event_fd);
mysql_mutex_unlock(&LOCK_start_thread);
#endif /* _WIN32 */
}
......@@ -1963,11 +1917,6 @@ extern "C" void unireg_abort(int exit_code)
static void mysqld_exit(int exit_code)
{
DBUG_ENTER("mysqld_exit");
/*
Important note: we wait for the signal thread to end,
but if a kill -15 signal was sent, the signal thread did
spawn the kill_server_thread thread, which is running concurrently.
*/
rpl_deinit_gtid_waiting();
rpl_deinit_gtid_slave_state();
wait_for_signal_thread_to_end();
......@@ -2124,17 +2073,17 @@ static void clean_up(bool print_message)
*/
static void wait_for_signal_thread_to_end()
{
uint i;
#ifndef _WIN32
/*
Wait up to 10 seconds for signal thread to die. We use this mainly to
avoid getting warnings that my_thread_end has not been called
*/
for (i= 0 ; i < 100 && signal_thread_in_use; i++)
for (uint i= 0 ; i < 100 && signal_thread_in_use; i++)
{
if (pthread_kill(signal_thread, MYSQL_KILL_SIGNAL) == ESRCH)
break;
kill(getpid(), MYSQL_KILL_SIGNAL);
my_sleep(100); // Give it time to die
}
#endif
}
#endif /*EMBEDDED_LIBRARY*/
......@@ -2676,9 +2625,6 @@ static void use_systemd_activated_sockets()
listen_sockets.push(sock);
}
systemd_sock_activation= 1;
termination_event_fd= eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
if (termination_event_fd == -1)
sql_print_warning("eventfd failed %d", errno);
free(names);
DBUG_VOID_RETURN;
......@@ -3232,22 +3178,9 @@ static void start_signal_handler(void)
DBUG_VOID_RETURN;
}
#if defined(USE_ONE_SIGNAL_HAND)
pthread_handler_t kill_server_thread(void *arg __attribute__((unused)))
{
my_thread_init(); // Initialize new thread
break_connect_loop();
my_thread_end();
pthread_exit(0);
return 0;
}
#endif
/** This threads handles all signals */
/* ARGSUSED */
pthread_handler_t signal_hand(void *arg __attribute__((unused)))
pthread_handler_t signal_hand(void *)
{
sigset_t set;
int sig;
......@@ -3291,20 +3224,17 @@ pthread_handler_t signal_hand(void *arg __attribute__((unused)))
int error;
int origin;
if (abort_loop)
break;
while ((error= my_sigwait(&set, &sig, &origin)) == EINTR) /* no-op */;
if (cleanup_done)
{
DBUG_PRINT("quit",("signal_handler: calling my_thread_end()"));
my_thread_end();
DBUG_LEAVE; // Must match DBUG_ENTER()
signal_thread_in_use= 0;
pthread_exit(0); // Safety
return 0; // Avoid compiler warnings
}
if (abort_loop)
break;
switch (sig) {
case SIGTERM:
case SIGQUIT:
case SIGKILL:
#ifdef EXTRA_DEBUG
sql_print_information("Got signal %d to shutdown server",sig);
#endif
......@@ -3312,31 +3242,15 @@ pthread_handler_t signal_hand(void *arg __attribute__((unused)))
logger.set_handlers(global_system_variables.sql_log_slow ? LOG_FILE:LOG_NONE,
opt_log ? LOG_FILE:LOG_NONE);
DBUG_PRINT("info",("Got signal: %d abort_loop: %d",sig,abort_loop));
if (!abort_loop)
{
/* Delete the instrumentation for the signal thread */
PSI_CALL_delete_current_thread();
#ifdef USE_ONE_SIGNAL_HAND
pthread_t tmp;
if (unlikely((error= mysql_thread_create(0, /* Not instrumented */
&tmp, &connection_attrib,
kill_server_thread,
(void*) &sig))))
sql_print_error("Can't create thread to kill server (errno= %d)",
error);
#else
my_sigset(sig, SIG_IGN);
break_connect_loop();
#endif
}
DBUG_ASSERT(abort_loop);
break;
case SIGHUP:
#if defined(SI_KERNEL)
if (!abort_loop && origin != SI_KERNEL)
if (origin != SI_KERNEL)
#elif defined(SI_USER)
if (!abort_loop && origin <= SI_USER)
#else
if (!abort_loop)
if (origin <= SI_USER)
#endif
{
int not_used;
......@@ -3363,6 +3277,11 @@ pthread_handler_t signal_hand(void *arg __attribute__((unused)))
break; /* purecov: tested */
}
}
DBUG_PRINT("quit", ("signal_handler: calling my_thread_end()"));
my_thread_end();
DBUG_LEAVE; // Must match DBUG_ENTER()
signal_thread_in_use= 0;
pthread_exit(0); // Safety
return(0); /* purecov: deadcode */
}
......@@ -6285,16 +6204,11 @@ void handle_connections_sockets()
uint error_count=0;
struct sockaddr_storage cAddr;
int retval;
#ifdef HAVE_POLL
// for ip_sock, unix_sock and extra_ip_sock
Dynamic_array<struct pollfd> fds(PSI_INSTRUMENT_MEM);
#else
fd_set readFDs,clientFDs;
#endif
DBUG_ENTER("handle_connections_sockets");
#ifdef HAVE_POLL
for (size_t i= 0; i < listen_sockets.size(); i++)
{
struct pollfd local_fds;
......@@ -6304,27 +6218,25 @@ void handle_connections_sockets()
fds.push(local_fds);
set_non_blocking_if_supported(listen_sockets.at(i));
}
#else
FD_ZERO(&clientFDs);
for (size_t i= 0; i < listen_sockets.size(); i++)
int termination_fds[2];
if (pipe(termination_fds))
{
int fd= mysql_socket_getfd(listen_sockets.at(i));
FD_SET(fd, &clientFDs);
set_non_blocking_if_supported(listen_sockets.at(i));
sql_print_error("pipe() failed %d", errno);
DBUG_VOID_RETURN;
}
#ifdef FD_CLOEXEC
for (int fd : termination_fds)
(void)fcntl(fd, F_SETFD, FD_CLOEXEC);
#endif
if (termination_event_fd >= 0)
{
#ifdef HAVE_POLL
mysql_mutex_lock(&LOCK_start_thread);
termination_event_fd= termination_fds[1];
mysql_mutex_unlock(&LOCK_start_thread);
struct pollfd event_fd;
event_fd.fd= termination_event_fd;
event_fd.fd= termination_fds[0];
event_fd.events= POLLIN;
fds.push(event_fd);
#else
FD_SET(termination_event_fd, &clientFDs);
#endif
/* no need to read this fd, abrt_loop is set before it gets a chance */
}
sd_notify(0, "READY=1\n"
"STATUS=Taking your SQL requests now...\n");
......@@ -6332,12 +6244,7 @@ void handle_connections_sockets()
DBUG_PRINT("general",("Waiting for connections."));
while (!abort_loop)
{
#ifdef HAVE_POLL
retval= poll(fds.get_pos(0), fds.size(), -1);
#else
readFDs=clientFDs;
retval= select(FD_SETSIZE, &readFDs, NULL, NULL, NULL);
#endif
if (retval < 0)
{
......@@ -6359,7 +6266,6 @@ void handle_connections_sockets()
break;
/* Is this a new connection request ? */
#ifdef HAVE_POLL
for (size_t i= 0; i < fds.size(); ++i)
{
if (fds.at(i).revents & POLLIN)
......@@ -6368,16 +6274,6 @@ void handle_connections_sockets()
break;
}
}
#else // HAVE_POLL
for (size_t i=0; i < listen_sockets.size(); i++)
{
if (FD_ISSET(mysql_socket_getfd(listen_sockets.at(i)), &readFDs))
{
sock= listen_sockets.at(i);
break;
}
}
#endif // HAVE_POLL
for (uint retry=0; retry < MAX_ACCEPT_RETRY && !abort_loop; retry++)
{
......@@ -6405,6 +6301,12 @@ void handle_connections_sockets()
}
}
}
mysql_mutex_lock(&LOCK_start_thread);
for(int fd : termination_fds)
close(fd);
termination_event_fd= -1;
mysql_mutex_unlock(&LOCK_start_thread);
sd_notify(0, "STOPPING=1\n"
"STATUS=Shutdown in progress\n");
DBUG_VOID_RETURN;
......
......@@ -3450,6 +3450,9 @@ int select_export::send_data(List<Item> &items)
uint used_length=0,items_left=items.elements;
List_iterator_fast<Item> li(items);
DBUG_EXECUTE_IF("select_export_kill", {
thd->killed= KILL_QUERY;
});
if (my_b_write(&cache,(uchar*) exchange->line_start->ptr(),
exchange->line_start->length()))
goto err;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment