Commit e91bbca5 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

Initial threadpool implementation for MariaDB 5.5

parent 5e7b949e
......@@ -22,3 +22,10 @@
# The below was used for really old versions of FreeBSD, roughly: before 5.1.9
# ADD_DEFINITIONS(-DHAVE_BROKEN_REALPATH)
# Use atomic builtins
IF(CMAKE_SIZEOF_VOID_P EQUAL 4 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "i386")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -march=i686")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=i686")
SET(CMAKE_REQUIRED_FLAGS "${CMAKE_REQUIRED_FLAGS} -march=i686")
ENDIF()
......@@ -40,7 +40,11 @@ typedef struct st_alarm_info
} ALARM_INFO;
void thr_alarm_info(ALARM_INFO *info);
extern my_bool my_disable_thr_alarm;
#ifdef _WIN32
#define DONT_USE_THR_ALARM
#endif
#if defined(DONT_USE_THR_ALARM)
#define USE_ALARM_THREAD
......@@ -88,7 +92,7 @@ typedef struct st_alarm {
extern uint thr_client_alarm;
extern pthread_t alarm_thread;
extern my_bool my_disable_thr_alarm;
#define thr_alarm_init(A) (*(A))=0
#define thr_alarm_in_use(A) (*(A)!= 0)
......
......@@ -168,6 +168,7 @@ void vio_end(void);
#define vio_should_retry(vio) (vio)->should_retry(vio)
#define vio_was_interrupted(vio) (vio)->was_interrupted(vio)
#define vio_close(vio) ((vio)->vioclose)(vio)
#define vio_shutdown(vio,how) ((vio)->shutdown)(vio,how)
#define vio_peer_addr(vio, buf, prt, buflen) (vio)->peer_addr(vio, buf, prt, buflen)
#define vio_timeout(vio, which, seconds) (vio)->timeout(vio, which, seconds)
#define vio_poll_read(vio, timeout) (vio)->poll_read(vio, timeout)
......@@ -219,6 +220,7 @@ struct st_vio
void (*timeout)(Vio*, unsigned int which, unsigned int timeout);
my_bool (*poll_read)(Vio *vio, uint timeout);
my_bool (*is_connected)(Vio*);
int (*shutdown)(Vio *, int);
my_bool (*has_data) (Vio*);
#ifdef HAVE_OPENSSL
void *ssl_arg;
......@@ -235,6 +237,7 @@ struct st_vio
char *shared_memory_pos;
#endif /* HAVE_SMEM */
#ifdef _WIN32
DWORD thread_id; /* Used to XP only in vio_shutdown */
OVERLAPPED pipe_overlapped;
DWORD read_timeout_ms;
DWORD write_timeout_ms;
......
......@@ -3461,7 +3461,9 @@ sub mysql_install_db {
mtr_add_arg($args, "--loose-skip-ndbcluster");
mtr_add_arg($args, "--loose-skip-aria");
mtr_add_arg($args, "--disable-sync-frm");
mtr_add_arg($args, "--tmpdir=%s", "$opt_vardir/tmp/");
mtr_add_arg($args, "--tmpdir=.");
mtr_add_arg($args, "--max_allowed_packet=8M");
mtr_add_arg($args, "--net_buffer_length=16K");
mtr_add_arg($args, "--core-file");
if ( $opt_debug )
......
......@@ -29,7 +29,7 @@
#
# Setup
#
--source include/not_threadpool.inc
--source include/not_embedded.inc
--source include/not_threadpool.inc
......
......@@ -8,7 +8,7 @@
###############################################################################
# These tests cannot run with the embedded server
-- source include/not_embedded.inc
-- source include/one_thread_per_connection.inc
#-- source include/one_thread_per_connection.inc
# Save the initial number of concurrent sessions
--source include/count_sessions.inc
......
......@@ -597,93 +597,6 @@ static void *alarm_handler(void *arg __attribute__((unused)))
return 0; /* Impossible */
}
#endif /* USE_ALARM_THREAD */
/*****************************************************************************
thr_alarm for win95
*****************************************************************************/
#else /* __WIN__ */
void thr_alarm_kill(my_thread_id thread_id)
{
/* Can't do this yet */
}
sig_handler process_alarm(int sig __attribute__((unused)))
{
/* Can't do this yet */
}
my_bool thr_alarm(thr_alarm_t *alrm, uint sec, ALARM *alarm)
{
(*alrm)= &alarm->alarmed;
if (alarm_aborted)
{
alarm->alarmed.crono=0;
return 1;
}
if (!(alarm->alarmed.crono=SetTimer((HWND) NULL,0, sec*1000,
(TIMERPROC) NULL)))
return 1;
return 0;
}
my_bool thr_got_alarm(thr_alarm_t *alrm_ptr)
{
thr_alarm_t alrm= *alrm_ptr;
MSG msg;
if (alrm->crono)
{
PeekMessage(&msg,NULL,WM_TIMER,WM_TIMER,PM_REMOVE) ;
if (msg.message == WM_TIMER || alarm_aborted)
{
KillTimer(NULL, alrm->crono);
alrm->crono = 0;
}
}
return !alrm->crono || alarm_aborted;
}
void thr_end_alarm(thr_alarm_t *alrm_ptr)
{
thr_alarm_t alrm= *alrm_ptr;
/* alrm may be zero if thr_alarm aborted with an error */
if (alrm && alrm->crono)
{
KillTimer(NULL, alrm->crono);
alrm->crono = 0;
}
}
void end_thr_alarm(my_bool free_structures)
{
DBUG_ENTER("end_thr_alarm");
alarm_aborted=1; /* No more alarms */
DBUG_VOID_RETURN;
}
void init_thr_alarm(uint max_alarm)
{
DBUG_ENTER("init_thr_alarm");
alarm_aborted=0; /* Yes, Gimmie alarms */
DBUG_VOID_RETURN;
}
void thr_alarm_info(ALARM_INFO *info)
{
bzero((char*) info, sizeof(*info));
}
void resize_thr_alarm(uint max_alarms)
{
}
#endif /* __WIN__ */
#endif
/****************************************************************************
......@@ -954,4 +867,5 @@ int main(int argc __attribute__((unused)),char **argv __attribute__((unused)))
}
#endif /* !defined(DONT_USE_ALARM_THREAD) */
#endif /* WIN */
#endif /* MAIN */
......@@ -31,7 +31,7 @@ ${CMAKE_CURRENT_BINARY_DIR}/lex_hash.h
SET_SOURCE_FILES_PROPERTIES(${GEN_SOURCES} PROPERTIES GENERATED 1)
ADD_DEFINITIONS(-DMYSQL_SERVER -DHAVE_EVENT_SCHEDULER)
ADD_DEFINITIONS(-DMYSQL_SERVER -DHAVE_EVENT_SCHEDULER -DHAVE_POOL_OF_THREADS)
IF(SSL_DEFINES)
ADD_DEFINITIONS(${SSL_DEFINES})
ENDIF()
......@@ -82,9 +82,16 @@ SET (SQL_SOURCE
opt_index_cond_pushdown.cc opt_subselect.cc
opt_table_elimination.cc sql_expression_cache.cc
gcalc_slicescan.cc gcalc_tools.cc
threadpool_common.cc
${GEN_SOURCES}
${MYSYS_LIBWRAP_SOURCE})
${MYSYS_LIBWRAP_SOURCE}
)
IF(WIN32)
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc)
ELSE()
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_unix.cc)
ENDIF()
MYSQL_ADD_PLUGIN(partition ha_partition.cc STORAGE_ENGINE DEFAULT STATIC_ONLY
RECOMPILE_FOR_EMBEDDED)
......
......@@ -73,6 +73,7 @@
#include <waiting_threads.h>
#include "debug_sync.h"
#include "sql_callback.h"
#include "threadpool.h"
#ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
#include "../storage/perfschema/pfs_server.h"
......@@ -5236,6 +5237,8 @@ default_service_handling(char **argv,
int mysqld_main(int argc, char **argv)
{
my_progname= argv[0];
/*
When several instances are running on the same machine, we
need to have an unique named hEventShudown through the
......@@ -7132,6 +7135,10 @@ SHOW_VAR status_vars[]= {
{"Tc_log_max_pages_used", (char*) &tc_log_max_pages_used, SHOW_LONG},
{"Tc_log_page_size", (char*) &tc_log_page_size, SHOW_LONG},
{"Tc_log_page_waits", (char*) &tc_log_page_waits, SHOW_LONG},
#endif
#ifndef EMBEDDED_LIBRARY
{"Threadpool_idle_threads", (char *) &tp_stats.num_waiting_threads, SHOW_INT},
{"Threadpool_threads", (char *) &tp_stats.num_worker_threads, SHOW_INT},
#endif
{"Threads_cached", (char*) &cached_thread_count, SHOW_LONG_NOFLUSH},
{"Threads_connected", (char*) &connection_count, SHOW_INT},
......@@ -8018,7 +8025,9 @@ static int get_options(int *argc_ptr, char ***argv_ptr)
else if (thread_handling == SCHEDULER_NO_THREADS)
one_thread_scheduler(thread_scheduler);
else
pool_of_threads_scheduler(thread_scheduler); /* purecov: tested */
pool_of_threads_scheduler(thread_scheduler, &max_connections,
&connection_count);
one_thread_per_connection_scheduler(extra_thread_scheduler,
&extra_max_connections,
&extra_connection_count);
......
......@@ -842,7 +842,7 @@ my_real_read(NET *net, size_t *complen)
DBUG_PRINT("info",("vio_read returned %ld errno: %d",
(long) length, vio_errno(net->vio)));
#if !defined(__WIN__) || defined(MYSQL_SERVER)
#if !defined(__WIN__) && defined(MYSQL_SERVER)
/*
We got an error that there was no data on the socket. We now set up
an alarm to not 'read forever', change the socket to the blocking
......@@ -874,7 +874,7 @@ my_real_read(NET *net, size_t *complen)
continue;
}
}
#endif /* (!defined(__WIN__) || defined(MYSQL_SERVER) */
#endif /* (!defined(__WIN__) && defined(MYSQL_SERVER) */
if (thr_alarm_in_use(&alarmed) && !thr_got_alarm(&alarmed) &&
interrupted)
{ /* Probably in MIT threads */
......
......@@ -79,7 +79,7 @@ static void scheduler_wait_sync_end(void) {
one_thread_scheduler() or one_thread_per_connection_scheduler() in
mysqld.cc, so this init function will always be called.
*/
static void scheduler_init() {
void scheduler_init() {
thr_set_lock_wait_callback(scheduler_wait_lock_begin,
scheduler_wait_lock_end);
thr_set_sync_wait_callback(scheduler_wait_sync_begin,
......@@ -124,25 +124,6 @@ void one_thread_scheduler(scheduler_functions *func)
}
#ifdef HAVE_POOL_OF_THREADS
/*
thd_scheduler keeps the link between THD and events.
It's embedded in the THD class.
*/
thd_scheduler::thd_scheduler()
: m_psi(NULL), logged_in(FALSE), io_event(NULL), thread_attached(FALSE)
{
}
thd_scheduler::~thd_scheduler()
{
my_free(io_event);
}
#endif
/*
no pluggable schedulers in mariadb.
......
......@@ -76,13 +76,11 @@ void one_thread_per_connection_scheduler(scheduler_functions *func,
ulong *arg_max_connections, uint *arg_connection_count);
void one_thread_scheduler(scheduler_functions *func);
#if defined(HAVE_LIBEVENT) && !defined(EMBEDDED_LIBRARY)
#define HAVE_POOL_OF_THREADS 1
struct event;
class thd_scheduler
/*
To be used for pool-of-threads (implemeneted differently on various OSs)
*/
struct thd_scheduler
{
public:
/*
......@@ -96,29 +94,33 @@ class thd_scheduler
differently.
*/
PSI_thread *m_psi;
bool logged_in;
struct event* io_event;
LIST list;
bool thread_attached; /* Indicates if THD is attached to the OS thread */
thd_scheduler();
~thd_scheduler();
bool init(THD* parent_thd);
bool thread_attach();
void thread_detach();
void *data; /* scheduler-specific data structure */
#ifndef DBUG_OFF
bool set_explain;
char dbug_explain[512];
#endif
};
void pool_of_threads_scheduler(scheduler_functions* func);
#else
void *thd_get_scheduler_data(THD *thd);
void thd_set_scheduler_data(THD *thd, void *data);
PSI_thread* thd_get_psi(THD *thd);
void thd_set_psi(THD *thd, PSI_thread *psi);
#define pool_of_threads_scheduler(A) \
one_thread_per_connection_scheduler(A, &max_connections, \
&connection_count)
/* Common thread pool routines, suitable for different implementations */
extern void threadpool_remove_connection(THD *thd);
extern int threadpool_process_request(THD *thd);
extern int threadpool_add_connection(THD *thd);
class thd_scheduler
{};
#endif
extern scheduler_functions *thread_scheduler;
#endif /* SCHEDULER_INCLUDED */
#if !defined(EMBEDDED_LIBRARY)
#define HAVE_POOL_OF_THREADS 1
void pool_of_threads_scheduler(scheduler_functions* func,
ulong *arg_max_connections,
uint *arg_connection_count);
#else
#define pool_of_threads_scheduler(A,B,C) \
one_thread_per_connection_scheduler(A, B, C)
#endif
......@@ -1538,33 +1538,8 @@ void THD::awake(killed_state state_to_set)
#ifdef SIGNAL_WITH_VIO_CLOSE
if (this != current_thd)
{
/*
Before sending a signal, let's close the socket of the thread
that is being killed ("this", which is not the current thread).
This is to make sure it does not block if the signal is lost.
This needs to be done only on platforms where signals are not
a reliable interruption mechanism.
Note that the downside of this mechanism is that we could close
the connection while "this" target thread is in the middle of
sending a result to the application, thus violating the client-
server protocol.
On the other hand, without closing the socket we have a race
condition. If "this" target thread passes the check of
thd->killed, and then the current thread runs through
THD::awake(), sets the 'killed' flag and completes the
signaling, and then the target thread runs into read(), it will
block on the socket. As a result of the discussions around
Bug#37780, it has been decided that we accept the race
condition. A second KILL awakes the target from read().
If we are killing ourselves, we know that we are not blocked.
We also know that we will check thd->killed before we go for
reading the next statement.
*/
close_active_vio();
if(active_vio)
vio_shutdown(active_vio, SHUT_RDWR);
}
#endif
......@@ -1740,6 +1715,10 @@ bool THD::store_globals()
mysys_var->stack_ends_here= thread_stack + // for consistency, see libevent_thread_proc
STACK_DIRECTION * (long)my_thread_stack_size;
#ifdef _WIN32
if (net.vio)
net.vio->thread_id= real_id; /* Required to support IO cancelation on XP */
#endif
/*
We have to call thr_lock_info_init() again here as THD may have been
created in another thread
......
......@@ -2339,6 +2339,10 @@ class THD :public Statement,
{
mysql_mutex_lock(&LOCK_thd_data);
active_vio = vio;
#ifdef _WIN32
/* Required to support cancelation on XP */
active_vio->thread_id = pthread_self();
#endif
mysql_mutex_unlock(&LOCK_thd_data);
}
inline void clear_active_vio()
......
......@@ -890,6 +890,7 @@ static int check_connection(THD *thd)
DBUG_PRINT("info",
("New connection received on %s", vio_description(net->vio)));
#ifdef SIGNAL_WITH_VIO_CLOSE
thd->set_active_vio(net->vio);
#endif
......@@ -1175,7 +1176,7 @@ void do_handle_one_connection(THD *thd_arg)
/* We need to set this because of time_out_user_resource_limits */
thd->start_utime= thd->thr_create_utime;
if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
if (MYSQL_CALLBACK_ELSE(thd->scheduler, init_new_connection_thread, (), 0))
{
close_connection(thd, ER_OUT_OF_RESOURCES);
statistic_increment(aborted_connects,&LOCK_status);
......
......@@ -678,6 +678,7 @@ void cleanup_items(Item *item)
@retval
1 request of thread shutdown (see dispatch_command() description)
*/
int skip_net_wait_timeout = 0;
bool do_command(THD *thd)
{
......@@ -700,8 +701,10 @@ bool do_command(THD *thd)
the client, the connection is closed or "net_wait_timeout"
number of seconds has passed.
*/
if(!skip_net_wait_timeout)
my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
/*
XXX: this code is here only to clear possible errors of init_connect.
Consider moving to init_connect() instead.
......
......@@ -50,6 +50,7 @@
#ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
#include "../storage/perfschema/pfs_server.h"
#endif /* WITH_PERFSCHEMA_STORAGE_ENGINE */
#include "threadpool.h"
/*
The rule for this file: everything should be 'static'. When a sys_var
......@@ -1804,7 +1805,13 @@ static Sys_var_enum Sys_thread_handling(
", pool-of-threads"
#endif
, READ_ONLY GLOBAL_VAR(thread_handling), CMD_LINE(REQUIRED_ARG),
thread_handling_names, DEFAULT(0));
thread_handling_names,
#ifdef HAVE_POOL_OF_THREADS
DEFAULT(2)
#else
DEFAULT(0)
#endif
);
#ifdef HAVE_QUERY_CACHE
static bool fix_query_cache_size(sys_var *self, THD *thd, enum_var_type type)
......@@ -2173,15 +2180,68 @@ static Sys_var_ulong Sys_thread_cache_size(
GLOBAL_VAR(thread_cache_size), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, 16384), DEFAULT(0), BLOCK_SIZE(1));
#ifdef HAVE_POOL_OF_THREADS
static Sys_var_ulong Sys_thread_pool_size(
"thread_pool_size",
"How many threads we should create to handle query requests in "
"case of 'thread_handling=pool-of-threads'",
GLOBAL_VAR(thread_pool_size), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1, 16384), DEFAULT(20), BLOCK_SIZE(0));
#ifndef HAVE_POOL_OF_THREADS
static bool fix_tp_max_threads(sys_var *, THD *, enum_var_type)
{
#ifdef _WIN32
tp_set_max_threads(threadpool_max_threads);
#endif
return false;
}
#ifdef _WIN32
static bool fix_tp_min_threads(sys_var *, THD *, enum_var_type)
{
tp_set_min_threads(threadpool_min_threads);
return false;
}
#endif
#ifdef _WIN32
static Sys_var_uint Sys_threadpool_min_threads(
"thread_pool_min_threads",
"Minimuim number of threads in the thread pool.",
GLOBAL_VAR(threadpool_min_threads), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1, 256), DEFAULT(1), BLOCK_SIZE(1),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_tp_min_threads)
);
#else
static Sys_var_uint Sys_threadpool_idle_thread_timeout(
"thread_pool_idle_timeout",
"Timeout in seconds for an idle thread in the thread pool."
"Worker thread will be shut down after timeout",
GLOBAL_VAR(threadpool_idle_timeout), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1, UINT_MAX/100), DEFAULT(60000), BLOCK_SIZE(1)
);
static Sys_var_uint Sys_threadpool_size(
"thread_pool_size",
"Number of concurrently executing threads in the pool. "
"Leaving value default (0) sets it to the number of processors.",
GLOBAL_VAR(threadpool_size), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, 128), DEFAULT(0), BLOCK_SIZE(1)
);
static Sys_var_uint Sys_threadpool_stall_limit(
"thread_pool_stall_limit",
"Maximum query execution time before in milliseconds,"
"before an executing non-yielding thread is considered stalled."
"If a worker thread is stalled, additional worker thread "
"may be created to handle remaining clients.",
GLOBAL_VAR(threadpool_stall_limit), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(60, UINT_MAX), DEFAULT(500), BLOCK_SIZE(1)
);
#endif /*! WIN32 */
static Sys_var_uint Sys_threadpool_max_threads(
"thread_pool_max_threads",
"Maximum allowed number of worker threads in the thread pool",
GLOBAL_VAR(threadpool_max_threads), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1, UINT_MAX), DEFAULT(3000), BLOCK_SIZE(1),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_tp_max_threads)
);
#endif /* !HAVE_POOL_OF_THREADS */
/**
Can't change the 'next' tx_isolation if we are already in a
transaction.
......
/* Threadpool parameters */
#ifdef _WIN32
extern uint threadpool_min_threads; /* Minimum threads in pool */
#else
extern uint threadpool_idle_timeout; /* Shutdown idle worker threads after this timeout */
extern uint threadpool_size; /* Number of parallel executing threads */
extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall checks*/
#endif
extern uint threadpool_max_threads; /* Maximum threads in pool */
/*
Threadpool statistics
*/
struct TP_STATISTICS
{
/* Current number of worker thread. */
volatile int num_worker_threads;
/* Current number of idle threads. */
volatile int num_waiting_threads;
/* Number of login requests are queued but not yet processed. */
volatile int pending_login_requests;
/* Number of threads that are starting. */
volatile int pending_thread_starts;
/* Number of threads that are being shut down */
volatile int pending_thread_shutdowns;
/* Time (in milliseconds) since pool is blocked (num_waiting_threads is 0) */
ulonglong pool_block_duration;
/* Maximum duration of the pending login, im milliseconds. */
ulonglong pending_login_duration;
/* Time since last thread was created */
ulonglong time_since_last_thread_creation;
/* Number of requests processed since pool monitor run last time. */
volatile int requests_dequeued;
volatile int requests_completed;
};
extern TP_STATISTICS tp_stats;
/* Functions to set threadpool parameters */
extern void tp_set_min_threads(uint val);
extern void tp_set_max_threads(uint val);
/* Activate threadpool scheduler */
extern void tp_scheduler(void);
#include <my_global.h>
#include <violite.h>
#include <sql_priv.h>
#include <sql_class.h>
#include <my_pthread.h>
#include <scheduler.h>
#include <sql_connect.h>
#include <sql_audit.h>
#include <debug_sync.h>
extern bool login_connection(THD *thd);
extern bool do_command(THD *thd);
extern void prepare_new_connection_state(THD* thd);
extern void end_connection(THD *thd);
extern void thd_cleanup(THD *thd);
extern void delete_thd(THD *thd);
/* Threadpool parameters */
#ifdef _WIN32
uint threadpool_min_threads;
#else
uint threadpool_idle_timeout;
uint threadpool_size;
uint threadpool_stall_limit;
#endif
uint threadpool_max_threads;
/*
Attach/associate the connection with the OS thread, for command processing.
*/
static inline bool thread_attach(THD* thd, char *stack_start, PSI_thread **save_psi_thread)
{
DBUG_ENTER("thread_attach");
if (PSI_server)
{
*save_psi_thread= PSI_server->get_thread();
PSI_server->set_thread(thd->event_scheduler.m_psi);
}
else
*save_psi_thread= NULL;
/*
We need to know the start of the stack so that we could check for
stack overruns.
*/
thd->thread_stack= stack_start;
/* Calls close_connection() on failure */
if (setup_connection_thread_globals(thd))
{
DBUG_RETURN(TRUE);
}
/* clear errors from processing the previous THD */
my_errno= 0;
thd->mysys_var->abort= 0;
#ifndef DBUG_OFF
if (thd->event_scheduler.set_explain)
DBUG_SET(thd->event_scheduler.dbug_explain);
#endif
DBUG_RETURN(FALSE);
}
/*
Detach/disassociate the connection with the OS thread.
*/
static inline void thread_detach(THD* thd, PSI_thread *restore_psi_thread)
{
DBUG_ENTER("thread_detach");
thd->mysys_var = NULL;
#ifndef DBUG_OFF
/*
If during the session @@session.dbug was assigned, the
dbug options/state has been pushed. Check if this is the
case, to be able to restore the state when we attach this
logical connection to a physical thread.
*/
if (_db_is_pushed_())
{
thd->event_scheduler.set_explain= TRUE;
if (DBUG_EXPLAIN(thd->event_scheduler.dbug_explain, sizeof(thd->event_scheduler.dbug_explain)))
sql_print_error("thd_scheduler: DBUG_EXPLAIN buffer is too small");
}
/* DBUG_POP() is a no-op in case there is no session state */
DBUG_POP();
#endif
if (PSI_server)
PSI_server->set_thread(restore_psi_thread);
pthread_setspecific(THR_THD, NULL);
DBUG_VOID_RETURN;
}
int threadpool_add_connection(THD *thd)
{
int retval=1;
PSI_thread *psi_thread;
#ifndef DBUG_OFF
thd->event_scheduler.set_explain = 0;
#endif
thread_attach(thd, (char *)&thd, &psi_thread);
ulonglong now= microsecond_interval_timer();
thd->prior_thr_create_utime= now;
thd->start_utime= now;
thd->thr_create_utime= now;
if (PSI_server)
{
thd->event_scheduler.m_psi =
PSI_server->new_thread(key_thread_one_connection, thd, thd->thread_id);
PSI_server->set_thread(thd->event_scheduler.m_psi);
}
if (setup_connection_thread_globals(thd) == 0)
{
if (login_connection(thd) == 0)
{
prepare_new_connection_state(thd);
retval = thd_is_connection_alive(thd)?0:-1;
thd->net.reading_or_writing= 1;
}
}
thread_detach(thd, psi_thread);
return retval;
}
void threadpool_remove_connection(THD *thd)
{
PSI_thread *save_psi_thread;
thread_attach(thd, (char *)&thd, &save_psi_thread);
thd->killed= KILL_CONNECTION;
thd->net.reading_or_writing= 0;
end_connection(thd);
close_connection(thd, 0);
mysql_mutex_lock(&thd->LOCK_thd_data);
thd->event_scheduler.data= NULL;
mysql_mutex_unlock(&thd->LOCK_thd_data);
unlink_thd(thd);
mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_broadcast(&COND_thread_count);
DBUG_POP();
if (PSI_server)
PSI_server->delete_current_thread();
pthread_setspecific(THR_THD, NULL);
}
int threadpool_process_request(THD *thd)
{
int retval= 0;
PSI_thread *psi_thread;
thread_attach(thd, (char *)&thd, &psi_thread);
if (thd->killed == KILL_CONNECTION)
{
/*
kill flag can be set have been killed by
timeout handler or by a KILL command
*/
thread_detach(thd, psi_thread);
return 1;
}
for(;;)
{
Vio *vio;
thd->net.reading_or_writing= 0;
mysql_audit_release(thd);
if ((retval= do_command(thd)) != 0)
break ;
if (!thd_is_connection_alive(thd))
{
retval= 1;
break;
}
vio= thd->net.vio;
if (!vio->has_data(vio))
{
/*
More info on this debug sync is in sql_parse.cc
*/
DEBUG_SYNC(thd, "before_do_command_net_read");
break;
}
}
thread_detach(thd, psi_thread);
if (!retval)
thd->net.reading_or_writing= 1;
return retval;
}
/*
Scheduler struct, individual functions are implemented
in threadpool_unix.cc or threadpool_win.cc
*/
extern bool tp_init();
extern void tp_add_connection(THD*);
extern void tp_wait_begin(THD *, int);
extern void tp_wait_end(THD*);
extern void tp_post_kill_notification(THD *thd);
extern void tp_end(void);
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
NULL,
NULL,
tp_init, // init
NULL, // init_new_connection_thread
tp_add_connection, // add_connection
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
tp_post_kill_notification, // post_kill_notification
NULL, // end_thread
tp_end // end
};
extern void scheduler_init();
void pool_of_threads_scheduler(struct scheduler_functions *func,
ulong *arg_max_connections,
uint *arg_connection_count)
{
*func = tp_scheduler_functions;
func->max_threads= *arg_max_connections + 1;
func->max_connections= arg_max_connections;
func->connection_count= arg_connection_count;
scheduler_init();
}
#include <my_global.h>
#include <violite.h>
#include <sql_priv.h>
#include <sql_class.h>
#include <my_pthread.h>
#include <scheduler.h>
#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
#include <sys/queue.h>
#include <time.h>
#include <threadpool.h>
#ifdef __linux__
#include <sys/epoll.h>
typedef struct epoll_event native_event;
#endif
#if defined (__FreeBSD__) || defined (__APPLE__)
#include <sys/event.h>
typedef struct kevent native_event;
#endif
#if defined (__sun)
#include <port.h>
typedef port_event_t native_event;
#endif
static PSI_mutex_key key_group_mutex;
static PSI_mutex_key key_timer_mutex;
static PSI_mutex_info mutex_list[]=
{
{ &key_group_mutex, "group_mutex", 0},
{ &key_timer_mutex, "timer_mutex", PSI_FLAG_GLOBAL}
};
static PSI_cond_key key_worker_cond;
static PSI_cond_key key_timer_cond;
static PSI_cond_info cond_list[]=
{
{ &key_worker_cond, "worker_cond", 0},
{ &key_timer_cond, "timer_cond", PSI_FLAG_GLOBAL}
};
static PSI_thread_key key_worker_thread;
static PSI_thread_key key_timer_thread;
static PSI_thread_info thread_list[] =
{
{&key_worker_thread, "worker_thread", 0},
{&key_timer_thread, "timer_thread", PSI_FLAG_GLOBAL}
};
TP_STATISTICS tp_stats;
struct thread_group_t;
/* Per-thread structure for workers */
struct worker_thread_t
{
mysql_cond_t cond;
bool woken;
thread_group_t* thread_group;
ulonglong event_count; /* Stats: number of executed requests */
SLIST_ENTRY(worker_thread_t) ptr;
};
/*
Data associated with an io event (also can be sent with with explicit
post_event())
*/
struct pool_event_t
{
STAILQ_ENTRY (pool_event_t) next;
void *data;
};
static pool_event_t POOL_SHUTDOWN_EVENT;
struct thread_group_t
{
mysql_mutex_t mutex;
STAILQ_HEAD(queue_listhead, pool_event_t) queue;
SLIST_HEAD(wait_listhead, worker_thread_t) waiting_threads;
int pollfd;
int thread_count;
int active_thread_count;
int pending_thread_start_count;
int connection_count;
bool shutdown;
bool stalled;
int shutdown_pipe[2];
worker_thread_t *listener;
pthread_attr_t *pthread_attr;
ulonglong last_thread_creation_time;
/* Stats for the deadlock detection timer routine.*/
ulonglong io_event_count;
ulonglong queue_event_count;
} MY_ALIGNED(512);
static thread_group_t all_groups[128];
/* Global timer for all groups */
struct pool_timer_t
{
mysql_mutex_t mutex;
mysql_cond_t cond;
int tick_interval;
volatile ulonglong current_microtime;
volatile ulonglong next_timeout_check;
bool shutdown;
};
static pool_timer_t pool_timer;
struct connection_t
{
pool_event_t event;
THD *thd;
thread_group_t *thread_group;
ulonglong abs_wait_timeout;
bool logged_in;
bool waiting;
};
/* Externals functions and variables we use */
extern uint thread_created;
extern void scheduler_init();
extern pthread_attr_t *get_connection_attrib(void);
extern int skip_net_wait_timeout;
static void post_event(thread_group_t *thread_group, pool_event_t* ev);
static int wake_thread(thread_group_t *thread_group);
static void handle_event(pool_event_t *ev);
static int wake_or_create_thread(thread_group_t *thread_group);
static int create_worker(thread_group_t *thread_group);
static void *worker_main(void *param);
static void check_stall(thread_group_t *thread_group);
static void connection_abort(connection_t *connection);
void tp_post_kill_notification(THD *thd);
static void set_wait_timeout(connection_t *connection);
static void set_next_timeout_check(ulonglong abstime);
/**
Asynchronous network IO.
We use native edge-triggered network IO multiplexing facility.
This maps to different APIs on different Unixes.
Supported are currently Linux with epoll, Solaris with event ports,
OSX and BSD with kevent. All those API's are used with one-shot flags
(the event is signalled once client has written something into the socket,
then socket is removed from the "poll-set" until the command is finished,
and we need to re-arm/re-register socket)
No implementation for poll/select/AIO is currently provided.
The API closely resembles all of the above mentioned platform APIs
and consists of following functions.
- io_poll_create()
Creates an io_poll descriptor
On Linux: epoll_create()
- io_poll_associate_fd(int poll_fd, int fd, void *data)
Associate file descriptor with io poll descriptor
On Linux : epoll_ctl(..EPOLL_CTL_ADD))
- io_poll_disassociate_fd(int pollfd, int fd)
Associate file descriptor with io poll descriptor
On Linux: epoll_ctl(..EPOLL_CTL_DEL)
- io_poll_start_read(int poll_fd,int fd, void *data)
The same as io_poll_associate_fd(), but cannot be used before
io_poll_associate_fd() was called.
On Linux : epoll_ctl(..EPOLL_CTL_MOD)
- io_poll_wait (int pollfd, native_event *native_events, int maxevents,
int timeout_ms)
wait until one or more descriptors added with io_poll_associate_fd()
or io_poll_start_read() becomes readable. Data associated with
descriptors can be retrieved from native_events array, using
native_event_get_userdata() function.
On Linux: epoll_wait()
*/
#if defined (__linux__)
static int io_poll_create()
{
return epoll_create(1);
}
int io_poll_associate_fd(int pollfd, int fd, void *data)
{
struct epoll_event ev;
ev.data.ptr= data;
ev.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
return epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &ev);
}
int io_poll_start_read(int pollfd, int fd, void *data)
{
struct epoll_event ev;
ev.data.ptr= data;
ev.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT;
return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
}
void io_poll_disassociate_fd(int pollfd, int fd)
{
struct epoll_event ev;
epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
}
int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
int timeout_ms)
{
int ret;
do
{
ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms);
}
while(ret == -1 && errno == EINTR);
return ret;
}
static void *native_event_get_userdata(native_event *event)
{
return event->data.ptr;
}
#elif defined (__FreeBSD__) || defined (__APPLE__)
int io_poll_create()
{
return kqueue();
}
int io_poll_start_read(int pollfd, int fd, void *data)
{
struct kevent ke;
EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ENABLE|EV_CLEAR,
0, 0, data);
return kevent(pollfd, &ke, 1, 0, 0, 0);
}
int io_poll_associate_fd(int pollfd, int fd, void *data)
{
return io_poll_start_read(poolfd,fd, data);
}
int io_poll_disassociate_fd(thread_group_t *thread_group, int fd)
{
struct kevent ke;
EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
return kevent(thread_group->pollfd, &ke, 1, 0, 0, 0);
}
int io_poll_wait(int pollfd, struct kevent *events, int maxevents, int timeout_ms)
{
struct timespec ts;
int ret;
if (timeout_ms >= 0)
{
ts.tv_sec= timeout_ms/1000;
ts.tv_nsec= (timeout_ms%1000)*1000000;
}
do
{
ret= kevent(pollfd, 0, 0, events, maxevents,
(timeout_ms >= 0)?&ts:NULL);
}
while (ret == -1 && errno == EINTR);
if (ret > 0)
{
/* Disable monitoring for the events we that we dequeued */
for (int i=0; i < ret; i++)
{
struct kevent *ke = &events[i];
EV_SET(ke, ke->ident, EVFILT_READ, EV_ADD|EV_DISABLE,
0, 0, ke->udata);
}
kevent(pollfd, events, ret, 0, 0, 0);
}
return ret;
}
static void* native_event_get_userdata(native_event *event)
{
return event->udata;
}
#elif defined (__sun)
static int io_poll_create()
{
return port_create();
}
int io_poll_start_read(int pollfd, int fd, void *data)
{
return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
}
static int io_poll_associate_fd(int pollfd, int fd, void *data)
{
return io_poll_start_read(pollfd, fd, data);
}
int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
{
struct timespec ts;
int ret;
uint_t nget= 1;
if (timeout_ms >= 0)
{
ts.tv_sec= timeout_ms/1000;
ts.tv_nsec= (timeout_ms%1000)*1000000;
}
do
{
ret= port_getn(pollfd, events, maxevents, &nget,
(timeout_ms >= 0)?&ts:NULL);
}
while (ret == -1 && errno == EINTR);
return nget;
}
static void* native_event_get_userdata(native_event *event)
{
return event->portev_user;
}
#else
#error not ported yet to this OS
#endif
/* Dequeue element from a workqueue */
static pool_event_t *queue_get(thread_group_t *thread_group)
{
DBUG_ENTER("queue_get");
pool_event_t *ev= NULL;
thread_group->queue_event_count++;
ev= STAILQ_FIRST(&thread_group->queue);
if (ev)
{
STAILQ_REMOVE_HEAD(&thread_group->queue,next);
}
DBUG_RETURN(ev);
}
/* Check if workqueue is empty. */
static bool queue_is_empty(thread_group_t* thread_group)
{
DBUG_ENTER("queue_is_empty");
bool empty= (STAILQ_FIRST(&thread_group->queue) == NULL);
DBUG_RETURN(empty);
}
static void queue_put(thread_group_t *thread_group, pool_event_t *event)
{
DBUG_ENTER("queue_put");
STAILQ_INSERT_TAIL(&thread_group->queue, event, next);
DBUG_VOID_RETURN;
}
static void increment_active_threads(thread_group_t *thread_group)
{
my_atomic_add32(&tp_stats.num_waiting_threads,-1);
thread_group->active_thread_count++;
}
static void decrement_active_threads(thread_group_t *thread_group)
{
my_atomic_add32(&tp_stats.num_waiting_threads,1);
thread_group->active_thread_count--;
}
/*
Handle wait timeout :
Find connections that have been idle for too long and kill them.
Also, recalculate time when next timeout check should run.
*/
static void timeout_check(pool_timer_t *timer)
{
DBUG_ENTER("timeout_check");
mysql_mutex_lock(&LOCK_thread_count);
I_List_iterator<THD> it(threads);
/* Reset next timeout check, it will be recalculated in the loop below */
my_atomic_fas64((volatile int64*)&timer->next_timeout_check, ULONGLONG_MAX);
THD *thd;
while ((thd=it++))
{
if (thd->net.reading_or_writing != 1)
continue;
connection_t *connection= (connection_t *)thd->scheduler.data;
if (!connection)
continue;
if(connection->abs_wait_timeout < timer->current_microtime)
{
/* Wait timeout exceeded, kill connection. */
mysql_mutex_lock(&thd->LOCK_thd_data);
thd->killed = THD::KILL_CONNECTION;
tp_post_kill_notification(thd);
mysql_mutex_unlock(&thd->LOCK_thd_data);
}
else
{
set_next_timeout_check(connection->abs_wait_timeout);
}
}
mysql_mutex_unlock(&LOCK_thread_count);
DBUG_VOID_RETURN;
}
/*
Timer thread.
Periodically, check if one of the thread groups is stalled. Stalls happen if
events are not being dequeued from the queue, or from the network, Primary
reason for stall can be a lengthy executing non-blocking request. It could
also happen that thread is waiting but wait_begin/wait_end is forgotten by
storage engine. Timer thread will create a new thread in group in case of
a stall.
Besides checking for stalls, timer thread is also responsible for terminating
clients that have been idle for longer than wait_timeout seconds.
*/
static void* timer_thread(void *param)
{
uint i;
pool_timer_t* timer=(pool_timer_t *)param;
timer->next_timeout_check= ULONGLONG_MAX;
timer->current_microtime= my_micro_time();
my_thread_init();
DBUG_ENTER("timer_thread");
for(;;)
{
struct timespec ts;
set_timespec_nsec(ts,timer->tick_interval*1000000);
mysql_mutex_lock(&timer->mutex);
int err = mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
if (timer->shutdown)
break;
if (err == ETIMEDOUT)
{
timer->current_microtime= my_micro_time();
/* Check stallls in thread groups */
for(i=0; i< threadpool_size;i++)
{
if(all_groups[i].connection_count)
check_stall(&all_groups[i]);
}
/* Check if any client exceeded wait_timeout */
if (timer->next_timeout_check <= timer->current_microtime)
timeout_check(timer);
}
mysql_mutex_unlock(&timer->mutex);
}
DBUG_POP();
my_thread_end();
return NULL;
}
void check_stall(thread_group_t *thread_group)
{
if (mysql_mutex_trylock(&thread_group->mutex) != 0)
{
/* Something happens. Don't disturb */
return;
}
/*
Check if listener is present. If not, check whether any IO
events were dequeued since last time. If not, this means
listener is either in tight loop or thd_wait_begin()
was forgotten. Create a new worker(it will make itself listener).
*/
if (!thread_group->listener && !thread_group->io_event_count)
{
wake_or_create_thread(thread_group);
mysql_mutex_unlock(&thread_group->mutex);
return;
}
/* Reset io event count */
thread_group->io_event_count= 0;
/*
Check whether requests from the workqueue are being dequeued.
*/
if (!queue_is_empty(thread_group) && !thread_group->queue_event_count)
{
thread_group->stalled= true;
wake_or_create_thread(thread_group);
}
/* Reset queue event count */
thread_group->queue_event_count= 0;
mysql_mutex_unlock(&thread_group->mutex);
}
static void start_timer(pool_timer_t* timer)
{
pthread_t thread_id;
DBUG_ENTER("start_timer");
mysql_mutex_init(key_timer_mutex,&timer->mutex, NULL);
mysql_cond_init(key_timer_cond, &timer->cond, NULL);
timer->shutdown = false;
mysql_thread_create(key_timer_thread,&thread_id, NULL, timer_thread, timer);
DBUG_VOID_RETURN;
}
static void stop_timer(pool_timer_t *timer)
{
DBUG_ENTER("stop_timer");
mysql_mutex_lock(&timer->mutex);
timer->shutdown = true;
mysql_cond_signal(&timer->cond);
mysql_mutex_unlock(&timer->mutex);
DBUG_VOID_RETURN;
}
#define MAX_EVENTS 1024
/*
Poll for socket events and distribute them to worker threads.
In many case current thread will handle single event itself.
*/
static pool_event_t * listener(worker_thread_t *current_thread,
thread_group_t *thread_group)
{
DBUG_ENTER("listener");
for(;;)
{
native_event ev[MAX_EVENTS];
int cnt;
if (thread_group->shutdown)
{
DBUG_RETURN(&POOL_SHUTDOWN_EVENT);
}
do
{
cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
}
while(cnt <= 0 && errno == EINTR);
if (cnt <=0)
{
DBUG_ASSERT(thread_group->shutdown);
DBUG_RETURN(&POOL_SHUTDOWN_EVENT);
}
/*
Put events to queue, maybe wakeup workers.
If queue is currently empty, listener will return
so the current thread handles query itself, this avoids
wakeups and context switches. But if queue is not empty
this smells like a flood of queries, and the listener
stays.
*/
mysql_mutex_lock(&thread_group->mutex);
if (thread_group->shutdown)
{
mysql_mutex_unlock(&thread_group->mutex);
DBUG_RETURN(&POOL_SHUTDOWN_EVENT);
}
thread_group->io_event_count += cnt;
bool pick_event= queue_is_empty(thread_group);
for(int i=(pick_event)?1:0; i < cnt ; i++)
{
pool_event_t *e= (pool_event_t *)native_event_get_userdata(&ev[i]);
queue_put(thread_group, e);
}
/* Wake at most one worker thread */
if(thread_group->active_thread_count==0 &&
/*!queue_is_empty(thread_group)*/ !pick_event)
{
if(wake_thread(thread_group))
{
if(thread_group->thread_count == 1)
create_worker(thread_group);
}
}
mysql_mutex_unlock(&thread_group->mutex);
if (pick_event)
DBUG_RETURN((pool_event_t *)(native_event_get_userdata(&ev[0])));
}
}
/* Creates a new worker thread. thread_mutex must be held when calling this function */
static int create_worker(thread_group_t *thread_group)
{
pthread_t thread_id;
int err;
DBUG_ENTER("create_worker");
if (tp_stats.num_worker_threads >= (int)threadpool_max_threads)
{
DBUG_PRINT("info",
("Cannot create new thread (maximum allowed threads reached)"));
DBUG_RETURN(-1);
}
err= pthread_create(&thread_id, thread_group->pthread_attr, worker_main, thread_group);
if (!err)
{
thread_group->pending_thread_start_count++;
thread_group->last_thread_creation_time=my_micro_time();
}
DBUG_RETURN(err);
}
/*
Wakes a worker thread, or creates a new one.
Worker creation is throttled, so we avoid too many threads
to be created during the short time.
*/
static int wake_or_create_thread(thread_group_t *thread_group)
{
ulonglong now;
ulonglong time_since_last_thread_created;
DBUG_ENTER("wake_or_create_thread");
if (wake_thread(thread_group) == 0)
DBUG_RETURN(0);
if (thread_group->pending_thread_start_count > 0)
DBUG_RETURN(-1);
if (thread_group->thread_count < 4)
{
DBUG_RETURN(create_worker(thread_group));
}
now = my_micro_time();
time_since_last_thread_created =
(now - thread_group->last_thread_creation_time)/1000;
if (thread_group->active_thread_count == 0)
{
/*
We're better off creating a new thread here with no delay, as
others threads (at least 4) are all blocking and there was no sleeping
thread to wakeup. It smells like deadlock or very slowly executing
requests, e.g sleeps or user locks.
*/
DBUG_RETURN(create_worker(thread_group));
}
/* Throttle thread creation. */
if ((thread_group->thread_count < 8 && time_since_last_thread_created > 50)
|| (thread_group->thread_count < 16 && time_since_last_thread_created > 100)
|| (time_since_last_thread_created > 200))
{
DBUG_RETURN(create_worker(thread_group));
}
DBUG_RETURN(-1);
}
/* Initialize thread group */
int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
{
DBUG_ENTER("thread_group_init");
memset(thread_group, 0, sizeof(thread_group_t));
thread_group->pthread_attr = thread_attr;
mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
STAILQ_INIT(&thread_group->queue);
SLIST_INIT(&thread_group->waiting_threads);
thread_group->pending_thread_start_count= 0;
thread_group->pollfd= io_poll_create();
thread_group->stalled= false;
if (thread_group->pollfd < 0)
{
DBUG_RETURN(-1);
}
if (pipe(thread_group->shutdown_pipe))
{
DBUG_RETURN(-1);
}
if (io_poll_associate_fd(thread_group->pollfd,
thread_group->shutdown_pipe[0], &POOL_SHUTDOWN_EVENT))
{
DBUG_RETURN(-1);
}
DBUG_RETURN(0);
}
/*
Wake single sleeping thread in pool. Optionally, tell this thread
to listen to socket io notification.
*/
static int wake_thread(thread_group_t *thread_group)
{
DBUG_ENTER("wake_thread");
worker_thread_t *thread = SLIST_FIRST(&thread_group->waiting_threads);
if(thread)
{
thread->woken= true;
SLIST_REMOVE_HEAD(&thread_group->waiting_threads, ptr);
if (mysql_cond_signal(&thread->cond))
abort();
DBUG_RETURN(0);
}
DBUG_RETURN(-1); /* no thread- missed wakeup*/
}
/*
Shutdown thread group.
*/
static void thread_group_close(thread_group_t *thread_group)
{
DBUG_ENTER("thread_group_close");
char c= 0;
mysql_mutex_lock(&thread_group->mutex);
thread_group->shutdown= true;
thread_group->listener= NULL;
/* Wake listener. */
if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
DBUG_VOID_RETURN;
/* Wake all workers. */
while(wake_thread(thread_group) == 0) {};
mysql_mutex_unlock(&thread_group->mutex);
#if 0
/* Wait until workers terminate */
while(thread_group->thread_count)
usleep(1000);
#endif
DBUG_VOID_RETURN;
}
/*
Post a task to the workqueue, maybe wake a worker so
it picks the task.
*/
static void post_event(thread_group_t *thread_group, pool_event_t* ev)
{
DBUG_ENTER("post_event");
mysql_mutex_lock(&thread_group->mutex);
STAILQ_INSERT_TAIL(&thread_group->queue, ev, next);
if (thread_group->active_thread_count == 0)
{
wake_or_create_thread(thread_group);
}
mysql_mutex_unlock(&thread_group->mutex);
DBUG_VOID_RETURN;
}
/*
Check if pool is already overcommited.
This is used to prevent too many threads executing at the same time,
if the workload is not CPU bound.
*/
static bool too_many_threads(thread_group_t *thread_group)
{
return (thread_group->active_thread_count > 4 && !thread_group->stalled);
}
/*
Dequeue a work item.
If it is not immediately available, thread will sleep until
work is available (it also can become IO listener for a while).
*/
int get_event(worker_thread_t *current_thread, thread_group_t *thread_group,
pool_event_t **ev, struct timespec *ts)
{
DBUG_ENTER("get_event");
pool_event_t *first_event = NULL;
int err=0;
mysql_mutex_lock(&thread_group->mutex);
decrement_active_threads(thread_group);
DBUG_ASSERT(thread_group->active_thread_count >= 0);
do
{
if (thread_group->shutdown)
break;
/* Check if queue is not empty */
if (!too_many_threads(thread_group))
{
first_event= queue_get(thread_group);
if(first_event)
break;
}
/* If there is currently no listener in the group, become one. */
if(!thread_group->listener)
{
thread_group->listener= current_thread;
mysql_mutex_unlock(&thread_group->mutex);
first_event= listener(current_thread, thread_group);
mysql_mutex_lock(&thread_group->mutex);
/* There is no listener anymore, it just returned. */
thread_group->listener= NULL;
break;
}
/*
Last thing we try before going to sleep is to
pick a single event via epoll, without waiting (timeout 0)
*/
if (!too_many_threads(thread_group))
{
native_event nev;
if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
{
thread_group->io_event_count++;
first_event = (pool_event_t *)native_event_get_userdata(&nev);
break;
}
}
/* And now, finally sleep */
current_thread->woken = false; /* wake() sets this to true */
/*
Add current thread to the head of the waiting list and wait.
It is important to add thread to the head rather than tail
as it ensures LIFO wakeup order (hot caches, working inactivity timeout)
*/
SLIST_INSERT_HEAD(&thread_group->waiting_threads, current_thread, ptr);
if(ts)
err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex, ts);
else
err = mysql_cond_wait(&current_thread->cond, &thread_group->mutex);
if (!current_thread->woken)
{
/*
Thread was not signalled by wake(), it might be a spurious wakeup or
a timeout. Anyhow, we need to remove ourselves from the list now.
If thread was explicitly woken, than caller removed us from the list.
*/
SLIST_REMOVE(&thread_group->waiting_threads, current_thread, worker_thread_t, ptr);
}
if(err)
break;
}
while(true);
thread_group->stalled= false;
increment_active_threads(thread_group);
mysql_mutex_unlock(&thread_group->mutex);
if (first_event)
*ev = first_event;
else
*ev = &POOL_SHUTDOWN_EVENT;
DBUG_RETURN(err);
}
/*
Tells the pool that thread starts waiting on IO, lock, condition,
sleep() or similar.
Will wake another worker, and if there is no listener will
promote a listener,
*/
void wait_begin(thread_group_t *thread_group)
{
DBUG_ENTER("wait_begin");
mysql_mutex_lock(&thread_group->mutex);
decrement_active_threads(thread_group);
DBUG_ASSERT(thread_group->active_thread_count >=0);
DBUG_ASSERT(thread_group->connection_count > 0);
if((thread_group->active_thread_count == 0) &&
(!queue_is_empty(thread_group) || !thread_group->listener))
{
wake_or_create_thread(thread_group);
}
mysql_mutex_unlock(&thread_group->mutex);
DBUG_VOID_RETURN;
}
/*
Tells the pool current thread finished waiting.
*/
void wait_end(thread_group_t *thread_group)
{
DBUG_ENTER("wait_end");
mysql_mutex_lock(&thread_group->mutex);
increment_active_threads(thread_group);
mysql_mutex_unlock(&thread_group->mutex);
DBUG_VOID_RETURN;
}
/* Scheduler */
connection_t *alloc_connection(THD *thd)
{
DBUG_ENTER("alloc_connection");
connection_t* connection = (connection_t *)my_malloc(sizeof(connection_t),0);
if (connection)
{
connection->thd = thd;
connection->waiting= false;
connection->logged_in= false;
connection->abs_wait_timeout= ULONGLONG_MAX;
}
DBUG_RETURN(connection);
}
/*
Add a new connection to thread pool..
*/
void tp_add_connection(THD *thd)
{
DBUG_ENTER("tp_add_connection");
threads.append(thd);
mysql_mutex_unlock(&LOCK_thread_count);
connection_t *c= alloc_connection(thd);
if(c)
{
c->thread_group= &all_groups[c->thd->thread_id%threadpool_size];
mysql_mutex_lock(&c->thread_group->mutex);
c->thread_group->connection_count++;
mysql_mutex_unlock(&c->thread_group->mutex);
c->thd->scheduler.data = c;
post_event(c->thread_group,&c->event);
}
DBUG_VOID_RETURN;
}
static void connection_abort(connection_t *c)
{
DBUG_ENTER("connection_abort");
mysql_mutex_lock(&c->thread_group->mutex);
c->thread_group->connection_count--;
mysql_mutex_unlock(&c->thread_group->mutex);
threadpool_remove_connection(c->thd);
my_free(c);
DBUG_VOID_RETURN;
}
void tp_post_kill_notification(THD *thd)
{
DBUG_ENTER("tp_post_kill_notification");
if (current_thd == thd || thd->system_thread)
DBUG_VOID_RETURN;
if (thd->net.vio)
vio_shutdown(thd->net.vio, SHUT_RD);
DBUG_VOID_RETURN;
}
void tp_wait_begin(THD *thd, int type)
{
DBUG_ENTER("tp_wait_begin");
if (!thd)
DBUG_VOID_RETURN;
connection_t *connection = (connection_t *)thd->scheduler.data;
if(connection)
{
DBUG_ASSERT(!connection->waiting);
connection->waiting= true;
wait_begin(connection->thread_group);
}
DBUG_VOID_RETURN;
}
void tp_wait_end(THD *thd)
{
DBUG_ENTER("tp_wait_end");
if (!thd)
DBUG_VOID_RETURN;
connection_t *connection = (connection_t *)thd->scheduler.data;
if(connection)
{
DBUG_ASSERT(connection->waiting);
connection->waiting = false;
wait_end(connection->thread_group);
}
DBUG_VOID_RETURN;
}
static void set_next_timeout_check(ulonglong abstime)
{
DBUG_ENTER("set_next_timeout_check");
while(abstime < pool_timer.next_timeout_check)
{
longlong old= (longlong)pool_timer.next_timeout_check;
my_atomic_cas64((volatile int64*)&pool_timer.next_timeout_check,
&old, abstime);
}
DBUG_VOID_RETURN;
}
static void set_wait_timeout(connection_t *c)
{
DBUG_ENTER("set_wait_timeout");
/*
Calculate wait deadline for this connection.
Instead of using my_micro_time() which has a syscall
overhead, use pool_timer.current_microtime and take
into account that its value could be off by at most
one tick interval.
*/
c->abs_wait_timeout= pool_timer.current_microtime +
1000LL*pool_timer.tick_interval +
1000000LL*c->thd->variables.net_wait_timeout;
set_next_timeout_check(c->abs_wait_timeout);
DBUG_VOID_RETURN;
}
static void handle_event(pool_event_t *ev)
{
DBUG_ENTER("handle_event");
/* Normal case, handle query on connection */
connection_t *c = (connection_t*)(void *)ev;
bool do_login = (!c->logged_in);
int ret;
if (do_login)
{
ret= threadpool_add_connection(c->thd);
c->logged_in= true;
}
else
{
ret= threadpool_process_request(c->thd);
}
if(!ret)
{
set_wait_timeout(c);
int fd = c->thd->net.vio->sd;
if (do_login)
{
ret= io_poll_associate_fd(c->thread_group->pollfd, fd, c);
}
else
ret= io_poll_start_read(c->thread_group->pollfd, fd, c);
}
if (ret)
{
connection_abort(c);
}
DBUG_VOID_RETURN;
}
static void *worker_main(void *param)
{
worker_thread_t this_thread;
thread_created++;
pthread_detach_this_thread();
my_thread_init();
DBUG_ENTER("worker_main");
thread_group_t *thread_group = (thread_group_t *)param;
/* Init per-thread structure */
mysql_cond_init(key_worker_cond, &this_thread.cond, NULL);
this_thread.thread_group= thread_group;
this_thread.event_count=0;
mysql_mutex_lock(&thread_group->mutex);
tp_stats.num_worker_threads++;
thread_group->thread_count++;
thread_group->active_thread_count++;
thread_group->pending_thread_start_count--;
mysql_mutex_unlock(&thread_group->mutex);
/* Run event loop */
for(;;)
{
struct pool_event_t *ev;
struct timespec ts;
set_timespec(ts,threadpool_idle_timeout);
if (get_event(&this_thread, thread_group, &ev, &ts)
|| ev == &POOL_SHUTDOWN_EVENT)
{
break;
}
this_thread.event_count++;
handle_event(ev);
}
/* Thread shutdown: cleanup per-worker-thread structure. */
mysql_cond_destroy(&this_thread.cond);
mysql_mutex_lock(&thread_group->mutex);
thread_group->active_thread_count--;
thread_group->thread_count--;
tp_stats.num_worker_threads--;
mysql_mutex_unlock(&thread_group->mutex);
/* If it is the last thread in pool and pool is terminating, destroy pool.*/
if (thread_group->shutdown && (thread_group->thread_count == 0))
{
/* last thread existing, cleanup the pool structure */
mysql_mutex_destroy(&thread_group->mutex);
}
DBUG_POP();
my_thread_end();
return NULL;
}
static bool started=false;
bool tp_init()
{
DBUG_ENTER("tp_init");
started = true;
scheduler_init();
skip_net_wait_timeout= 1;
if (threadpool_size == 0)
{
threadpool_size= my_getncpus();
}
for(uint i=0; i < threadpool_size; i++)
{
thread_group_init(&all_groups[i], get_connection_attrib());
}
#define PSI_register(X) \
if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list))
PSI_register(mutex);
PSI_register(cond);
PSI_register(thread);
pool_timer.tick_interval= threadpool_stall_limit;
start_timer(&pool_timer);
DBUG_RETURN(0);
}
void tp_end()
{
DBUG_ENTER("tp_end");
if (!started)
DBUG_VOID_RETURN;
stop_timer(&pool_timer);
for(uint i=0; i< threadpool_size; i++)
{
thread_group_close(&all_groups[i]);
}
DBUG_VOID_RETURN;
}
#ifdef _WIN32_WINNT
#undef _WIN32_WINNT
#endif
#define _WIN32_WINNT 0x0601
#include <my_global.h>
#include <violite.h>
#include <sql_priv.h>
#include <sql_class.h>
#include <my_pthread.h>
#include <scheduler.h>
#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
#include <threadpool.h>
#include <windows.h>
TP_STATISTICS tp_stats;
#define WEAK_SYMBOL(return_type, function, ...) \
typedef return_type (WINAPI *pFN_##function)(__VA_ARGS__); \
static pFN_##function my_##function = (pFN_##function) \
(GetProcAddress(GetModuleHandle("kernel32"),#function))
WEAK_SYMBOL(VOID, CancelThreadpoolIo, PTP_IO);
#define CancelThreadpoolIo my_CancelThreadpoolIo
WEAK_SYMBOL(VOID, CloseThreadpool, PTP_POOL);
#define CloseThreadpool my_CloseThreadpool
WEAK_SYMBOL(VOID, CloseThreadpoolIo, PTP_IO);
#define CloseThreadpoolIo my_CloseThreadpoolIo
WEAK_SYMBOL(VOID, CloseThreadpoolTimer,PTP_TIMER);
#define CloseThreadpoolTimer my_CloseThreadpoolTimer
WEAK_SYMBOL(VOID, CloseThreadpoolWait,PTP_WAIT);
#define CloseThreadpoolWait my_CloseThreadpoolWait
WEAK_SYMBOL(PTP_POOL, CreateThreadpool,PVOID);
#define CreateThreadpool my_CreateThreadpool
WEAK_SYMBOL(PTP_IO, CreateThreadpoolIo, HANDLE, PTP_WIN32_IO_CALLBACK, PVOID ,
PTP_CALLBACK_ENVIRON);
#define CreateThreadpoolIo my_CreateThreadpoolIo
WEAK_SYMBOL(PTP_TIMER, CreateThreadpoolTimer, PTP_TIMER_CALLBACK ,
PVOID pv, PTP_CALLBACK_ENVIRON pcbe);
#define CreateThreadpoolTimer my_CreateThreadpoolTimer
WEAK_SYMBOL(PTP_WAIT, CreateThreadpoolWait, PTP_WAIT_CALLBACK, PVOID,
PTP_CALLBACK_ENVIRON);
#define CreateThreadpoolWait my_CreateThreadpoolWait
WEAK_SYMBOL(VOID, DisassociateCurrentThreadFromCallback, PTP_CALLBACK_INSTANCE);
#define DisassociateCurrentThreadFromCallback my_DisassociateCurrentThreadFromCallback
WEAK_SYMBOL(DWORD, FlsAlloc, PFLS_CALLBACK_FUNCTION);
#define FlsAlloc my_FlsAlloc
WEAK_SYMBOL(PVOID, FlsGetValue, DWORD);
#define FlsGetValue my_FlsGetValue
WEAK_SYMBOL(BOOL, FlsSetValue, DWORD, PVOID);
#define FlsSetValue my_FlsSetValue
WEAK_SYMBOL(VOID, SetThreadpoolThreadMaximum, PTP_POOL, DWORD);
#define SetThreadpoolThreadMaximum my_SetThreadpoolThreadMaximum
WEAK_SYMBOL(BOOL, SetThreadpoolThreadMinimum, PTP_POOL, DWORD);
#define SetThreadpoolThreadMinimum my_SetThreadpoolThreadMinimum
WEAK_SYMBOL(VOID, SetThreadpoolTimer, PTP_TIMER, PFILETIME,DWORD,DWORD);
#define SetThreadpoolTimer my_SetThreadpoolTimer
WEAK_SYMBOL(VOID, SetThreadpoolWait, PTP_WAIT,HANDLE,PFILETIME);
#define SetThreadpoolWait my_SetThreadpoolWait
WEAK_SYMBOL(VOID, StartThreadpoolIo, PTP_IO);
#define StartThreadpoolIo my_StartThreadpoolIo
WEAK_SYMBOL(VOID, WaitForThreadpoolIoCallbacks,PTP_IO, BOOL);
#define WaitForThreadpoolIoCallbacks my_WaitForThreadpoolIoCallbacks
WEAK_SYMBOL(VOID, WaitForThreadpoolTimerCallbacks, PTP_TIMER, BOOL);
#define WaitForThreadpoolTimerCallbacks my_WaitForThreadpoolTimerCallbacks
WEAK_SYMBOL(VOID, WaitForThreadpoolWaitCallbacks, PTP_WAIT, BOOL);
#define WaitForThreadpoolWaitCallbacks my_WaitForThreadpoolWaitCallbacks
WEAK_SYMBOL(BOOL, SetFileCompletionNotificationModes, HANDLE, UCHAR);
#define SetFileCompletionNotificationModes my_SetFileCompletionNotificationModes
WEAK_SYMBOL(BOOL, TrySubmitThreadpoolCallback, PTP_SIMPLE_CALLBACK pfns,
PVOID pv,PTP_CALLBACK_ENVIRON pcbe);
#define TrySubmitThreadpoolCallback my_TrySubmitThreadpoolCallback
WEAK_SYMBOL(PTP_WORK, CreateThreadpoolWork, PTP_WORK_CALLBACK pfnwk, PVOID pv,
PTP_CALLBACK_ENVIRON pcbe);
#define CreateThreadpoolWork my_CreateThreadpoolWork
WEAK_SYMBOL(VOID, SubmitThreadpoolWork,PTP_WORK pwk);
#define SubmitThreadpoolWork my_SubmitThreadpoolWork
WEAK_SYMBOL(VOID, CloseThreadpoolWork, PTP_WORK pwk);
#define CloseThreadpoolWork my_CloseThreadpoolWork
#if _MSC_VER >= 1600
/* Stack size manipulation available only on Win7+ /declarations in VS10 */
WEAK_SYMBOL(BOOL, SetThreadpoolStackInformation, PTP_POOL,
PTP_POOL_STACK_INFORMATION);
#define SetThreadpoolStackInformation my_SetThreadpoolStackInformation
#endif
#if _MSC_VER < 1600
#define SetThreadpoolCallbackPriority(env,prio)
typedef enum _TP_CALLBACK_PRIORITY {
TP_CALLBACK_PRIORITY_HIGH,
TP_CALLBACK_PRIORITY_NORMAL,
TP_CALLBACK_PRIORITY_LOW,
TP_CALLBACK_PRIORITY_INVALID
} TP_CALLBACK_PRIORITY;
#endif
/* Log a warning */
static void tp_log_warning(const char *msg, const char *fct)
{
sql_print_warning("Threadpool: %s. %s failed (last error %d)",msg, fct,
GetLastError());
}
PTP_POOL pool;
DWORD fls;
extern int skip_net_wait_timeout;
static bool skip_completion_port_on_success = false;
/*
Threadpool callbacks.
io_completion_callback - handle client request
timer_callback - handle wait timeout (kill connection)
shm_read_callback, shm_close_callback - shared memory stuff
login_callback - user login (submitted as threadpool work)
*/
static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PTP_TIMER timer);
static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io);
static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result);
static void CALLBACK shm_close_callback(PTP_CALLBACK_INSTANCE instance,
PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result);
#define CONNECTION_SIGNATURE 0xAFFEAFFE
static void check_thread_init();
/* Get current time as Windows time */
static ulonglong now()
{
ulonglong current_time;
GetSystemTimeAsFileTime((PFILETIME)&current_time);
return current_time;
}
/*
Connection structure, encapsulates THD + structures for asynchronous
IO and pool.
*/
struct connection_t
{
THD *thd;
bool logged_in;
HANDLE handle;
OVERLAPPED overlapped;
/* absolute time for wait timeout (as Windows time) */
volatile ulonglong timeout;
PTP_CLEANUP_GROUP cleanup_group;
TP_CALLBACK_ENVIRON callback_environ;
PTP_IO io;
PTP_TIMER timer;
PTP_WAIT shm_read;
};
void init_connection(connection_t *connection)
{
connection->logged_in = false;
connection->handle= 0;
connection->io= 0;
connection->shm_read= 0;
connection->timer= 0;
connection->logged_in = false;
connection->timeout= ULONGLONG_MAX;
memset(&connection->overlapped, 0, sizeof(OVERLAPPED));
InitializeThreadpoolEnvironment(&connection->callback_environ);
SetThreadpoolCallbackPool(&connection->callback_environ, pool);
connection->thd = 0;
}
int init_io(connection_t *connection, THD *thd)
{
connection->thd= thd;
Vio *vio = thd->net.vio;
switch(vio->type)
{
case VIO_TYPE_SSL:
case VIO_TYPE_TCPIP:
connection->handle= (HANDLE)vio->sd;
break;
case VIO_TYPE_NAMEDPIPE:
connection->handle= (HANDLE)vio->hPipe;
break;
case VIO_TYPE_SHARED_MEMORY:
connection->shm_read= CreateThreadpoolWait(shm_read_callback, connection,
&connection->callback_environ);
if (!connection->shm_read)
{
tp_log_warning("Allocation failed", "CreateThreadpoolWait");
return -1;
}
break;
default:
abort();
}
if (connection->handle)
{
/* Performance tweaks (s. MSDN documentation)*/
UCHAR flags = FILE_SKIP_SET_EVENT_ON_HANDLE;
if (skip_completion_port_on_success)
{
flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
}
(void)SetFileCompletionNotificationModes(connection->handle, flags);
/* Assign io completion callback */
connection->io = CreateThreadpoolIo(connection->handle,
io_completion_callback, connection, &connection->callback_environ);
if(!connection->io)
{
tp_log_warning("Allocation failed", "CreateThreadpoolWait");
return -1;
}
}
connection->timer = CreateThreadpoolTimer(timer_callback, connection,
&connection->callback_environ);
if (!connection->timer)
{
tp_log_warning("Allocation failed", "CreateThreadpoolWait");
return -1;
}
return 0;
}
/*
Start asynchronous read
*/
int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
{
/* Start async read */
DWORD num_bytes = 0;
static char c;
WSABUF buf;
buf.buf= &c;
buf.len= 0;
DWORD flags=0;
DWORD last_error= 0;
int retval;
Vio *vio= connection->thd->net.vio;
if (vio->type == VIO_TYPE_SHARED_MEMORY)
{
SetThreadpoolWait(connection->shm_read, vio->event_server_wrote, NULL);
return 0;
}
if (vio->type == VIO_CLOSED)
{
return -1;
}
DBUG_ASSERT(vio->type == VIO_TYPE_TCPIP ||
vio->type == VIO_TYPE_SSL ||
vio->type == VIO_TYPE_NAMEDPIPE);
OVERLAPPED *overlapped= &connection->overlapped;
PTP_IO io= connection->io;
StartThreadpoolIo(io);
if (vio->type == VIO_TYPE_TCPIP || vio->type == VIO_TYPE_SSL)
{
/* Start async io (sockets). */
if (WSARecv(vio->sd , &buf, 1, &num_bytes, &flags,
overlapped, NULL) == 0)
{
retval= last_error= 0;
}
else
{
retval= -1;
last_error= WSAGetLastError();
}
}
else
{
/* Start async io (named pipe) */
if (ReadFile(vio->hPipe, &c, 0, &num_bytes ,overlapped))
{
retval= last_error= 0;
}
else
{
retval= -1;
last_error= GetLastError();
}
}
if (retval == 0 || last_error == ERROR_MORE_DATA)
{
/*
IO successfully finished (synchronously).
If skip_completion_port_on_success is set, we need to handle it right
here, because completion callback would not be executed by the pool.
*/
if(skip_completion_port_on_success)
{
CancelThreadpoolIo(io);
io_completion_callback(instance, connection, overlapped, last_error,
num_bytes, io);
}
return 0;
}
if(last_error == ERROR_IO_PENDING)
{
return 0;
}
/* Some error occured */
CancelThreadpoolIo(io);
return -1;
}
int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
{
if (threadpool_add_connection(connection->thd) == 0
&& init_io(connection, connection->thd) == 0
&& start_io(connection, instance) == 0)
{
return 0;
}
return -1;
}
/*
Recalculate wait timeout, maybe reset timer.
*/
void set_wait_timeout(connection_t *connection, ulonglong old_timeout)
{
ulonglong new_timeout = now() +
10000000LL*connection->thd->variables.net_wait_timeout;
if (new_timeout < old_timeout)
{
SetThreadpoolTimer(connection->timer, (PFILETIME) &new_timeout, 0, 1000);
}
connection->timeout = new_timeout;
}
/*
Terminates (idle) connection by closing the socket.
This will activate io_completion_callback() in a different thread
*/
void post_kill_notification(connection_t *connection)
{
check_thread_init();
THD *thd=connection->thd;
mysql_mutex_lock(&thd->LOCK_thd_data);
thd->killed = KILL_CONNECTION;
vio_shutdown(thd->net.vio, SHUT_RDWR);
thd->mysys_var= NULL;
mysql_mutex_unlock(&thd->LOCK_thd_data);
}
/* Connection destructor */
void destroy_connection(connection_t *connection)
{
if (connection->thd)
{
threadpool_remove_connection(connection->thd);
}
if (connection->io)
{
WaitForThreadpoolIoCallbacks(connection->io, TRUE);
CloseThreadpoolIo(connection->io);
}
if(connection->shm_read)
{
WaitForThreadpoolWaitCallbacks(connection->shm_read, TRUE);
CloseThreadpoolWait(connection->shm_read);
}
if(connection->timer)
{
SetThreadpoolTimer(connection->timer, 0, 0, 0);
WaitForThreadpoolTimerCallbacks(connection->timer, TRUE);
CloseThreadpoolTimer(connection->timer);
}
DestroyThreadpoolEnvironment(&connection->callback_environ);
}
/*
This function should be called first whenever a callback is invoked in the
threadpool, does my_thread_init() if not yet done
*/
extern ulong thread_created;
static void check_thread_init()
{
if (FlsGetValue(fls) == NULL)
{
FlsSetValue(fls, (void *)1);
my_thread_init();
thread_created++;
InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads);
}
}
/*
Take care of proper cleanup when threadpool threads exit.
We do not control how threads are created, thus it is our responsibility to
check that my_thread_init() is called on thread initialization and
my_thread_end() on thread destruction. On Windows, FlsAlloc() provides the
thread destruction callbacks.
*/
static VOID WINAPI thread_destructor(void *data)
{
if(data)
{
if (InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads) >= 0)
{
/*
The above check for number of thread >= 0 is due to shutdown code (
see tp_end()) where we forcefully set num_worker_threads to 0, even
if not all threads have shut down yet to the point they would ran Fls
destructors, even after CloseThreadpool(). See also comment in tp_end().
*/
mysql_mutex_lock(&LOCK_thread_count);
my_thread_end();
mysql_mutex_unlock(&LOCK_thread_count);
}
}
}
/* Scheduler callback : init */
bool tp_init(void)
{
fls= FlsAlloc(thread_destructor);
pool= CreateThreadpool(NULL);
if(!pool)
{
sql_print_error("Can't create threadpool. "
"CreateThreadpool() failed with %d. Likely cause is memory pressure",
GetLastError());
exit(1);
}
if (threadpool_max_threads)
{
SetThreadpoolThreadMaximum(pool,threadpool_max_threads);
}
if (threadpool_min_threads)
{
if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads))
{
tp_log_warning( "Can't set threadpool minimum threads",
"SetThreadpoolThreadMinimum");
}
}
/*
Control stack size (OS must be Win7 or later, plus corresponding SDK)
*/
#if _MSC_VER >=1600
if (SetThreadpoolStackInformation)
{
TP_POOL_STACK_INFORMATION stackinfo;
stackinfo.StackCommit = 0;
stackinfo.StackReserve = my_thread_stack_size;
if (!SetThreadpoolStackInformation(pool, &stackinfo))
{
tp_log_warning("Can't set threadpool stack size",
"SetThreadpoolStackInformation");
}
}
#endif
skip_net_wait_timeout = 1;
return 0;
}
/*
Scheduler callback : Destroy the scheduler.
*/
extern "C" uint THR_thread_count;
extern "C" mysql_mutex_t THR_LOCK_threads;
extern "C" mysql_cond_t THR_COND_threads;
void tp_end(void)
{
if(pool)
{
SetThreadpoolThreadMaximum(pool, 0);
CloseThreadpool(pool);
/*
Tell my_global_thread_end() we're complete.
This would not be necessary if CloseThreadpool() would synchronously
release all threads and wait until they disappear and call all their FLS
destrructors . However, threads in the pool are released asynchronously
and might spend some time in the CRT shutdown code. Thus zero
num_worker_threads, to avoid thread destructor's my_thread_end()s after
this point.
*/
LONG remaining_threads=
InterlockedExchange( (volatile long *)&tp_stats.num_worker_threads, 0);
if (remaining_threads)
{
mysql_mutex_lock(&THR_LOCK_threads);
THR_thread_count -= remaining_threads;
mysql_cond_signal(&THR_COND_threads);
mysql_mutex_unlock(&THR_LOCK_threads);
}
}
skip_net_wait_timeout= 0;
}
/*
Notify pool about connection being killed.
*/
void tp_post_kill_notification(THD *thd)
{
if (current_thd == thd)
return; /* There is nothing to do.*/
if (thd->system_thread)
return; /* Will crash if we attempt to kill system thread. */
Vio *vio= thd->net.vio;
vio_shutdown(vio, SD_BOTH);
}
/*
Handle read completion/notification.
*/
static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io)
{
if(instance)
{
check_thread_init();
}
connection_t *connection = (connection_t*)context;
THD *thd= connection->thd;
ulonglong old_timeout = connection->timeout;
connection->timeout = ULONGLONG_MAX;
if (threadpool_process_request(connection->thd))
goto error;
set_wait_timeout(connection, old_timeout);
if(start_io(connection, instance))
goto error;
return;
error:
/* Some error has occured. */
if (instance)
DisassociateCurrentThreadFromCallback(instance);
destroy_connection(connection);
my_free(connection);
}
/* Simple callback for login */
static void CALLBACK login_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PTP_WORK work)
{
if(instance)
{
check_thread_init();
}
connection_t *connection =(connection_t *)context;
if (login(connection, instance) != 0)
{
destroy_connection(connection);
my_free(connection);
}
}
/*
Timer callback.
Invoked when connection times out (wait_timeout)
*/
static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
PVOID parameter, PTP_TIMER timer)
{
check_thread_init();
connection_t *con= (connection_t*)parameter;
ulonglong timeout= con->timeout;
if (timeout <= now())
{
con->thd->killed = KILL_CONNECTION;
if(con->thd->net.vio)
vio_shutdown(con->thd->net.vio, SD_BOTH);
}
else if(timeout != ULONGLONG_MAX)
{
/*
Reset timer.
There is a tiny possibility of a race condition, since the value of timeout
could have changed to smaller value in the thread doing io callback.
Given the relative unimportance of the wait timeout, we accept race
condition.
*/
SetThreadpoolTimer(timer, (PFILETIME)&timeout, 0, 1000);
}
}
/*
Shared memory read callback.
Invoked when read event is set on connection.
*/
static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PTP_WAIT wait,TP_WAIT_RESULT wait_result)
{
connection_t *con= (connection_t *)context;
/* Disarm wait. */
SetThreadpoolWait(wait, NULL, NULL);
/*
This is an autoreset event, and one wakeup is eaten already by threadpool,
and the current state is "not set". Thus we need to reset the event again,
or vio_read will hang.
*/
HANDLE h = con->thd->net.vio->event_server_wrote;
SetEvent(h);
io_completion_callback(instance, context, NULL, 0, 0 , 0);
}
/*
Notify the thread pool about a new connection.
NOTE: LOCK_thread_count is locked on entry. This function must unlock it.
*/
void tp_add_connection(THD *thd)
{
bool success = false;
connection_t *con = (connection_t *)my_malloc(sizeof(connection_t), 0);
if (con)
threads.append(thd);
mysql_mutex_unlock(&LOCK_thread_count);
if(!con)
{
tp_log_warning("Allocation failed", "tp_add_connection");
return;
}
init_connection(con);
con->thd= thd;
/* Try to login asynchronously, using threads in the pool */
PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ);
if (wrk)
{
SubmitThreadpoolWork(wrk);
CloseThreadpoolWork(wrk);
}
else
{
/* Likely memory pressure */
login_callback(NULL, con, NULL); /* deletes connection if something goes wrong */
}
}
/*
Sets the number of idle threads the thread pool maintains in anticipation of new
requests.
*/
void tp_set_min_threads(uint val)
{
if (pool)
SetThreadpoolThreadMinimum(pool, val);
}
void tp_set_max_threads(uint val)
{
if (pool)
SetThreadpoolThreadMaximum(pool, val);
}
void tp_wait_begin(THD *thd, int type)
{
if (thd && thd->event_scheduler.data)
{
/* TODO: call CallbackMayRunLong() */
}
}
void tp_wait_end(THD *thd)
{
/* Do we need to do anything ? */
}
......@@ -49,6 +49,25 @@ static my_bool has_no_data(Vio *vio __attribute__((unused)))
return FALSE;
}
#ifdef _WIN32
my_bool vio_shared_memory_has_data(Vio *vio)
{
return (vio->shared_memory_remain > 0);
}
int vio_shared_memory_shutdown(Vio *vio, int how)
{
SetEvent(vio->event_conn_closed);
SetEvent(vio->event_server_wrote);
return 0;
}
int vio_pipe_shutdown(Vio *vio, int how)
{
return vio_socket_shutdown(vio, how); /* cancels io */
}
#endif
/*
* Helper to fill most of the Vio* with defaults.
*/
......@@ -89,6 +108,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->poll_read =no_poll_read;
vio->is_connected =vio_is_connected_pipe;
vio->has_data =has_no_data;
vio->shutdown =vio_pipe_shutdown;
vio->timeout=vio_win32_timeout;
/* Set default timeout */
......@@ -116,7 +136,8 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->poll_read =no_poll_read;
vio->is_connected =vio_is_connected_shared_memory;
vio->has_data =has_no_data;
vio->has_data =vio_shared_memory_has_data;
vio->shutdown =vio_shared_memory_shutdown;
/* Currently, shared memory is on Windows only, hence the below is ok*/
vio->timeout= vio_win32_timeout;
......@@ -145,6 +166,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->poll_read =vio_poll_read;
vio->is_connected =vio_is_connected;
vio->has_data =vio_ssl_has_data;
vio->shutdown =vio_socket_shutdown;
DBUG_VOID_RETURN;
}
#endif /* HAVE_OPENSSL */
......@@ -163,6 +185,7 @@ static void vio_init(Vio* vio, enum enum_vio_type type,
vio->timeout =vio_timeout;
vio->poll_read =vio_poll_read;
vio->is_connected =vio_is_connected;
vio->shutdown =vio_socket_shutdown;
vio->has_data= (flags & VIO_BUFFERED_READ) ?
vio_buff_has_data : has_no_data;
DBUG_VOID_RETURN;
......
......@@ -39,6 +39,7 @@ size_t vio_read_pipe(Vio *vio, uchar * buf, size_t size);
size_t vio_write_pipe(Vio *vio, const uchar * buf, size_t size);
my_bool vio_is_connected_pipe(Vio *vio);
int vio_close_pipe(Vio * vio);
int vio_shutdown_pipe(Vio *vio,int how);
#endif
#ifdef HAVE_SMEM
......@@ -46,8 +47,11 @@ size_t vio_read_shared_memory(Vio *vio, uchar * buf, size_t size);
size_t vio_write_shared_memory(Vio *vio, const uchar * buf, size_t size);
my_bool vio_is_connected_shared_memory(Vio *vio);
int vio_close_shared_memory(Vio * vio);
my_bool vio_shared_memory_has_data(Vio *vio);
int vio_shutdown_shared_memory(Vio *vio, int how);
#endif
int vio_socket_shutdown(Vio *vio, int how);
void vio_timeout(Vio *vio,uint which, uint timeout);
my_bool vio_buff_has_data(Vio *vio);
......
......@@ -131,6 +131,60 @@ size_t vio_write(Vio * vio, const uchar* buf, size_t size)
DBUG_RETURN(r);
}
#ifdef _WIN32
static void CALLBACK cancel_io_apc(ULONG_PTR data)
{
CancelIo((HANDLE)data);
}
/*
Cancel IO on Windows.
On XP, issue CancelIo as asynchronous procedure call to the thread that started
IO. On Vista+, simpler cancelation is done with CancelIoEx.
*/
static int cancel_io(HANDLE handle, DWORD thread_id)
{
static BOOL (WINAPI *fp_CancelIoEx) (HANDLE, OVERLAPPED *);
static volatile int first_time= 1;
int rc;
HANDLE thread_handle;
if (first_time)
{
/* Try to load CancelIoEx using GetProcAddress */
InterlockedCompareExchangePointer((volatile void *)&fp_CancelIoEx,
GetProcAddress(GetModuleHandle("kernel32"), "CancelIoEx"), NULL);
first_time =0;
}
if (fp_CancelIoEx)
{
return fp_CancelIoEx(handle, NULL)? 0 :-1;
}
thread_handle= OpenThread(THREAD_SET_CONTEXT, FALSE, thread_id);
if (thread_handle)
{
rc= QueueUserAPC(cancel_io_apc, thread_handle, (ULONG_PTR)handle);
CloseHandle(thread_handle);
}
return rc;
}
#endif
int vio_socket_shutdown(Vio *vio, int how)
{
#ifdef _WIN32
return cancel_io((HANDLE)vio->sd, vio->thread_id);
#else
return shutdown(vio->sd, how);
#endif
}
int vio_blocking(Vio * vio __attribute__((unused)), my_bool set_blocking_mode,
my_bool *old_mode)
{
......@@ -726,6 +780,22 @@ void vio_timeout(Vio *vio, uint which, uint timeout)
#ifdef __WIN__
/*
Disable posting IO completion event to the port.
In some cases (synchronous timed IO) we want to skip IOCP notifications.
*/
static void disable_iocp_notification(OVERLAPPED *overlapped)
{
HANDLE *handle = &(overlapped->hEvent);
*handle = ((HANDLE)((ULONG_PTR) *handle|1));
}
/* Enable posting IO completion event to the port */
static void enable_iocp_notification(OVERLAPPED *overlapped)
{
HANDLE *handle = &(overlapped->hEvent);
*handle = (HANDLE)((ULONG_PTR) *handle & ~1);
}
/*
Finish pending IO on pipe. Honor wait timeout
......@@ -737,7 +807,7 @@ static size_t pipe_complete_io(Vio* vio, char* buf, size_t size, DWORD timeout_m
DBUG_ENTER("pipe_complete_io");
ret= WaitForSingleObject(vio->pipe_overlapped.hEvent, timeout_ms);
ret= WaitForSingleObjectEx(vio->pipe_overlapped.hEvent, timeout_ms, TRUE);
/*
WaitForSingleObjects will normally return WAIT_OBJECT_O (success, IO completed)
or WAIT_TIMEOUT.
......@@ -768,6 +838,7 @@ size_t vio_read_pipe(Vio * vio, uchar *buf, size_t size)
DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf,
(uint) size));
disable_iocp_notification(&vio->pipe_overlapped);
if (ReadFile(vio->hPipe, buf, (DWORD)size, &bytes_read,
&(vio->pipe_overlapped)))
{
......@@ -777,13 +848,14 @@ size_t vio_read_pipe(Vio * vio, uchar *buf, size_t size)
{
if (GetLastError() != ERROR_IO_PENDING)
{
enable_iocp_notification(&vio->pipe_overlapped);
DBUG_PRINT("error",("ReadFile() returned last error %d",
GetLastError()));
DBUG_RETURN((size_t)-1);
}
retval= pipe_complete_io(vio, buf, size,vio->read_timeout_ms);
}
enable_iocp_notification(&vio->pipe_overlapped);
DBUG_PRINT("exit", ("%lld", (longlong)retval));
DBUG_RETURN(retval);
}
......@@ -796,7 +868,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size)
DBUG_ENTER("vio_write_pipe");
DBUG_PRINT("enter", ("sd: %d buf: 0x%lx size: %u", vio->sd, (long) buf,
(uint) size));
disable_iocp_notification(&vio->pipe_overlapped);
if (WriteFile(vio->hPipe, buf, (DWORD)size, &bytes_written,
&(vio->pipe_overlapped)))
{
......@@ -804,6 +876,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size)
}
else
{
enable_iocp_notification(&vio->pipe_overlapped);
if (GetLastError() != ERROR_IO_PENDING)
{
DBUG_PRINT("vio_error",("WriteFile() returned last error %d",
......@@ -812,7 +885,7 @@ size_t vio_write_pipe(Vio * vio, const uchar* buf, size_t size)
}
retval= pipe_complete_io(vio, (char *)buf, size, vio->write_timeout_ms);
}
enable_iocp_notification(&vio->pipe_overlapped);
DBUG_PRINT("exit", ("%lld", (longlong)retval));
DBUG_RETURN(retval);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment