Commit 6e2a1982 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

Threadpool -address review comments

parent 73e8e025
...@@ -9,30 +9,28 @@ extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall c ...@@ -9,30 +9,28 @@ extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall c
extern uint threadpool_max_threads; /* Maximum threads in pool */ extern uint threadpool_max_threads; /* Maximum threads in pool */
extern uint threadpool_oversubscribe; /* Maximum active threads in group */ extern uint threadpool_oversubscribe; /* Maximum active threads in group */
/*
Functions used by scheduler.
OS-specific implementations are in
threadpool_unix.cc or threadpool_win.cc
*/
extern bool tp_init();
extern void tp_add_connection(THD*);
extern void tp_wait_begin(THD *, int);
extern void tp_wait_end(THD*);
extern void tp_post_kill_notification(THD *thd);
extern void tp_end(void);
/* /*
Threadpool statistics Threadpool statistics
*/ */
struct TP_STATISTICS struct TP_STATISTICS
{ {
/* Current number of worker thread. */ /* Current number of worker thread. */
volatile int num_worker_threads; volatile int32 num_worker_threads;
/* Current number of idle threads. */ /* Current number of idle threads. */
volatile int num_waiting_threads; volatile int32 num_waiting_threads;
/* Number of login requests are queued but not yet processed. */
volatile int pending_login_requests;
/* Number of threads that are starting. */
volatile int pending_thread_starts;
/* Number of threads that are being shut down */
volatile int pending_thread_shutdowns;
/* Time (in milliseconds) since pool is blocked (num_waiting_threads is 0) */
ulonglong pool_block_duration;
/* Maximum duration of the pending login, im milliseconds. */
ulonglong pending_login_duration;
/* Time since last thread was created */
ulonglong time_since_last_thread_creation;
/* Number of requests processed since pool monitor run last time. */
volatile int requests_dequeued;
volatile int requests_completed;
}; };
extern TP_STATISTICS tp_stats; extern TP_STATISTICS tp_stats;
......
...@@ -7,15 +7,9 @@ ...@@ -7,15 +7,9 @@
#include <sql_connect.h> #include <sql_connect.h>
#include <sql_audit.h> #include <sql_audit.h>
#include <debug_sync.h> #include <debug_sync.h>
#include <threadpool.h>
extern bool login_connection(THD *thd);
extern bool do_command(THD *thd);
extern void prepare_new_connection_state(THD* thd);
extern void end_connection(THD *thd);
extern void thd_cleanup(THD *thd);
extern void delete_thd(THD *thd);
/* Threadpool parameters */ /* Threadpool parameters */
uint threadpool_min_threads; uint threadpool_min_threads;
...@@ -27,14 +21,15 @@ uint threadpool_oversubscribe; ...@@ -27,14 +21,15 @@ uint threadpool_oversubscribe;
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys); extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
extern bool do_command(THD*);
/* /*
Worker threads contexts, and THD contexts. Worker threads contexts, and THD contexts.
===================================== =========================================
Both worker threads and connections have their sets of thread local variables Both worker threads and connections have their sets of thread local variables
At the moment it is mysys_var (which has e.g dbug my_error and similar At the moment it is mysys_var (this has specific data for dbug, my_error and
goodies inside), and PSI per-client structure. similar goodies), and PSI per-client structure.
Whenever query is executed following needs to be done: Whenever query is executed following needs to be done:
...@@ -77,7 +72,7 @@ struct Worker_thread_context ...@@ -77,7 +72,7 @@ struct Worker_thread_context
/* /*
Attach/associate the connection with the OS thread, Attach/associate the connection with the OS thread,
*/ */
static inline bool thread_attach(THD* thd) static bool thread_attach(THD* thd)
{ {
pthread_setspecific(THR_KEY_mysys,thd->mysys_var); pthread_setspecific(THR_KEY_mysys,thd->mysys_var);
thd->thread_stack=(char*)&thd; thd->thread_stack=(char*)&thd;
...@@ -95,11 +90,10 @@ int threadpool_add_connection(THD *thd) ...@@ -95,11 +90,10 @@ int threadpool_add_connection(THD *thd)
worker_context.save(); worker_context.save();
/* /*
Create a new connection context: mysys_thread_var and PSI thread Create a new connection context: mysys_thread_var and PSI thread
Store them in thd->mysys_var and thd->scheduler.m_psi. Store them in THD.
*/ */
/* Use my_thread_init() to create new mysys_thread_var. */
pthread_setspecific(THR_KEY_mysys, 0); pthread_setspecific(THR_KEY_mysys, 0);
my_thread_init(); my_thread_init();
thd->mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys); thd->mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
...@@ -125,21 +119,29 @@ int threadpool_add_connection(THD *thd) ...@@ -125,21 +119,29 @@ int threadpool_add_connection(THD *thd)
thd->start_utime= now; thd->start_utime= now;
thd->thr_create_utime= now; thd->thr_create_utime= now;
if (setup_connection_thread_globals(thd) == 0) if (!setup_connection_thread_globals(thd))
{ {
if (login_connection(thd) == 0) if (!login_connection(thd))
{ {
prepare_new_connection_state(thd); prepare_new_connection_state(thd);
retval = thd_is_connection_alive(thd)?0:-1;
thd->net.reading_or_writing= 1; /*
Check if THD is ok, as prepare_new_connection_state()
can fail, for example if init command failed.
*/
if (thd_is_connection_alive(thd))
{
retval= 0;
thd->net.reading_or_writing= 1;
thd->skip_wait_timeout= true;
}
} }
} }
thd->skip_wait_timeout= true;
worker_context.restore(); worker_context.restore();
return retval; return retval;
} }
void threadpool_remove_connection(THD *thd) void threadpool_remove_connection(THD *thd)
{ {
...@@ -147,9 +149,7 @@ void threadpool_remove_connection(THD *thd) ...@@ -147,9 +149,7 @@ void threadpool_remove_connection(THD *thd)
worker_context.save(); worker_context.save();
thread_attach(thd); thread_attach(thd);
thd->killed= KILL_CONNECTION; thd->killed= KILL_CONNECTION;
thd->net.reading_or_writing= 0; thd->net.reading_or_writing= 0;
end_connection(thd); end_connection(thd);
...@@ -163,11 +163,13 @@ void threadpool_remove_connection(THD *thd) ...@@ -163,11 +163,13 @@ void threadpool_remove_connection(THD *thd)
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_broadcast(&COND_thread_count); mysql_cond_broadcast(&COND_thread_count);
/* Free resources (thread_var and PSI connection specific struct)*/ /*
Free resources associated with this connection:
mysys thread_var and PSI thread.
*/
my_thread_end(); my_thread_end();
worker_context.restore(); worker_context.restore();
} }
int threadpool_process_request(THD *thd) int threadpool_process_request(THD *thd)
...@@ -181,8 +183,8 @@ int threadpool_process_request(THD *thd) ...@@ -181,8 +183,8 @@ int threadpool_process_request(THD *thd)
if (thd->killed >= KILL_CONNECTION) if (thd->killed >= KILL_CONNECTION)
{ {
/* /*
kill flag can be set have been killed by killed flag was set by timeout handler
timeout handler or by a KILL command or KILL command. Return error.
*/ */
worker_context.restore(); worker_context.restore();
return 1; return 1;
...@@ -206,33 +208,18 @@ int threadpool_process_request(THD *thd) ...@@ -206,33 +208,18 @@ int threadpool_process_request(THD *thd)
vio= thd->net.vio; vio= thd->net.vio;
if (!vio->has_data(vio)) if (!vio->has_data(vio))
{ {
/* /* More info on this debug sync is in sql_parse.cc*/
More info on this debug sync is in sql_parse.cc
*/
DEBUG_SYNC(thd, "before_do_command_net_read"); DEBUG_SYNC(thd, "before_do_command_net_read");
thd->net.reading_or_writing= 1;
break; break;
} }
} }
if (!retval)
thd->net.reading_or_writing= 1;
worker_context.restore(); worker_context.restore();
return retval; return retval;
} }
/*
Scheduler struct, individual functions are implemented
in threadpool_unix.cc or threadpool_win.cc
*/
extern bool tp_init();
extern void tp_add_connection(THD*);
extern void tp_wait_begin(THD *, int);
extern void tp_wait_end(THD*);
extern void tp_post_kill_notification(THD *thd);
extern void tp_end(void);
static scheduler_functions tp_scheduler_functions= static scheduler_functions tp_scheduler_functions=
{ {
0, // max_threads 0, // max_threads
...@@ -255,7 +242,7 @@ void pool_of_threads_scheduler(struct scheduler_functions *func, ...@@ -255,7 +242,7 @@ void pool_of_threads_scheduler(struct scheduler_functions *func,
uint *arg_connection_count) uint *arg_connection_count)
{ {
*func = tp_scheduler_functions; *func = tp_scheduler_functions;
func->max_threads= *arg_max_connections + 1; func->max_threads= threadpool_max_threads;
func->max_connections= arg_max_connections; func->max_connections= arg_max_connections;
func->connection_count= arg_connection_count; func->connection_count= arg_connection_count;
scheduler_init(); scheduler_init();
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment