Commit cfa56f90 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

address second round review comments

parent 8877fe73
...@@ -483,7 +483,7 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, ...@@ -483,7 +483,7 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
struct timespec wait_timeout; struct timespec wait_timeout;
enum enum_thr_lock_result result= THR_LOCK_ABORTED; enum enum_thr_lock_result result= THR_LOCK_ABORTED;
const char *old_proc_info; const char *old_proc_info;
my_bool use_wait_callbacks; my_bool use_wait_callbacks= FALSE;
DBUG_ENTER("wait_for_lock"); DBUG_ENTER("wait_for_lock");
/* /*
...@@ -540,8 +540,6 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, ...@@ -540,8 +540,6 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
use_wait_callbacks= TRUE; use_wait_callbacks= TRUE;
(*before_lock_wait)(); (*before_lock_wait)();
} }
else
use_wait_callbacks= FALSE;
set_timespec(wait_timeout, lock_wait_timeout); set_timespec(wait_timeout, lock_wait_timeout);
while (!thread_var->abort || in_wait_list) while (!thread_var->abort || in_wait_list)
......
...@@ -48,26 +48,26 @@ extern "C" ...@@ -48,26 +48,26 @@ extern "C"
{ {
static void scheduler_wait_lock_begin(void) { static void scheduler_wait_lock_begin(void) {
THD *thd=current_thd; THD *thd=current_thd;
scheduler_functions *func= thd->scheduler; if(thd)
MYSQL_CALLBACK(func, thd_wait_begin, (thd, THD_WAIT_TABLE_LOCK)); MYSQL_CALLBACK(thd->scheduler, thd_wait_begin, (thd, THD_WAIT_TABLE_LOCK));
} }
static void scheduler_wait_lock_end(void) { static void scheduler_wait_lock_end(void) {
THD *thd=current_thd; THD *thd=current_thd;
scheduler_functions *func= thd->scheduler; if(thd)
MYSQL_CALLBACK(func, thd_wait_end, (thd)); MYSQL_CALLBACK(thd->scheduler, thd_wait_end, (thd));
} }
static void scheduler_wait_sync_begin(void) { static void scheduler_wait_sync_begin(void) {
THD *thd=current_thd; THD *thd=current_thd;
scheduler_functions *func= thd ? thd->scheduler : thread_scheduler; if(thd)
MYSQL_CALLBACK(func, thd_wait_begin, (thd, THD_WAIT_TABLE_LOCK)); MYSQL_CALLBACK(thd->scheduler, thd_wait_begin, (thd, THD_WAIT_TABLE_LOCK));
} }
static void scheduler_wait_sync_end(void) { static void scheduler_wait_sync_end(void) {
THD *thd=current_thd; THD *thd=current_thd;
scheduler_functions *func= thd ? thd->scheduler : thread_scheduler; if(thd)
MYSQL_CALLBACK(func, thd_wait_end, (thd)); MYSQL_CALLBACK(thd->scheduler, thd_wait_end, (thd));
} }
}; };
/**@}*/ /**@}*/
......
...@@ -76,6 +76,7 @@ void one_thread_per_connection_scheduler(scheduler_functions *func, ...@@ -76,6 +76,7 @@ void one_thread_per_connection_scheduler(scheduler_functions *func,
ulong *arg_max_connections, uint *arg_connection_count); ulong *arg_max_connections, uint *arg_connection_count);
void one_thread_scheduler(scheduler_functions *func); void one_thread_scheduler(scheduler_functions *func);
extern void scheduler_init();
/* /*
To be used for pool-of-threads (implemeneted differently on various OSs) To be used for pool-of-threads (implemeneted differently on various OSs)
......
...@@ -1523,7 +1523,7 @@ void THD::awake(killed_state state_to_set) ...@@ -1523,7 +1523,7 @@ void THD::awake(killed_state state_to_set)
if (state_to_set >= KILL_CONNECTION || state_to_set == NOT_KILLED) if (state_to_set >= KILL_CONNECTION || state_to_set == NOT_KILLED)
{ {
#ifdef SIGNAL_WITH_VIO_CLOSE #ifdef SIGNAL_WITH_VIO_CLOSE
if (this != current_thd) if (this != current_thd)
{ {
if(active_vio) if(active_vio)
vio_shutdown(active_vio, SHUT_RDWR); vio_shutdown(active_vio, SHUT_RDWR);
...@@ -3935,10 +3935,16 @@ extern "C" bool thd_sqlcom_can_generate_row_events(const MYSQL_THD thd) ...@@ -3935,10 +3935,16 @@ extern "C" bool thd_sqlcom_can_generate_row_events(const MYSQL_THD thd)
*/ */
extern "C" void thd_wait_begin(MYSQL_THD thd, int wait_type) extern "C" void thd_wait_begin(MYSQL_THD thd, int wait_type)
{ {
if(!thd)
thd= current_thd; if (unlikely(!thread_scheduler) || !thread_scheduler->thd_wait_begin)
if (thd) return;
MYSQL_CALLBACK(thd->scheduler, thd_wait_begin, (thd, wait_type)); if (thd == NULL)
{
thd=current_thd;
if(unlikely(thd == NULL))
return;
}
MYSQL_CALLBACK(thd->scheduler, thd_wait_begin, (thd, wait_type));
} }
/** /**
...@@ -3950,10 +3956,16 @@ extern "C" void thd_wait_begin(MYSQL_THD thd, int wait_type) ...@@ -3950,10 +3956,16 @@ extern "C" void thd_wait_begin(MYSQL_THD thd, int wait_type)
*/ */
extern "C" void thd_wait_end(MYSQL_THD thd) extern "C" void thd_wait_end(MYSQL_THD thd)
{ {
if(!thd) if (unlikely(!thread_scheduler) || ! thread_scheduler->thd_wait_begin)
thd= current_thd; return;
if (thd) if (thd == NULL)
MYSQL_CALLBACK(thd->scheduler, thd_wait_end, (thd)); {
thd=current_thd;
if(unlikely(thd == NULL))
return;
}
if(likely(thd->scheduler == thread_scheduler))
thread_scheduler->thd_wait_end(thd);
} }
#endif // INNODB_COMPATIBILITY_HOOKS */ #endif // INNODB_COMPATIBILITY_HOOKS */
......
...@@ -4278,6 +4278,8 @@ inline int handler::ha_update_tmp_row(const uchar *old_data, uchar *new_data) ...@@ -4278,6 +4278,8 @@ inline int handler::ha_update_tmp_row(const uchar *old_data, uchar *new_data)
return error; return error;
} }
extern pthread_attr_t *get_connection_attrib(void);
#endif /* MYSQL_SERVER */ #endif /* MYSQL_SERVER */
#endif /* SQL_CLASS_INCLUDED */ #endif /* SQL_CLASS_INCLUDED */
...@@ -46,11 +46,10 @@ extern TP_STATISTICS tp_stats; ...@@ -46,11 +46,10 @@ extern TP_STATISTICS tp_stats;
/* Functions to set threadpool parameters */ /* Functions to set threadpool parameters */
extern void tp_set_min_threads(uint val); extern void tp_set_min_threads(uint val);
extern void tp_set_max_threads(uint val); extern void tp_set_max_threads(uint val);
extern int tp_set_threadpool_size(uint val); extern void tp_set_threadpool_size(uint val);
extern void tp_set_threadpool_stall_limit(uint val); extern void tp_set_threadpool_stall_limit(uint val);
/* Activate threadpool scheduler */ /* Activate threadpool scheduler */
extern void tp_scheduler(void); extern void tp_scheduler(void);
extern int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff); extern int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff);
...@@ -136,7 +136,7 @@ int threadpool_add_connection(THD *thd) ...@@ -136,7 +136,7 @@ int threadpool_add_connection(THD *thd)
{ {
retval= 0; retval= 0;
thd->net.reading_or_writing= 1; thd->net.reading_or_writing= 1;
thd->skip_wait_timeout= true; thd->skip_wait_timeout= true;
} }
} }
} }
...@@ -185,10 +185,11 @@ int threadpool_process_request(THD *thd) ...@@ -185,10 +185,11 @@ int threadpool_process_request(THD *thd)
killed flag was set by timeout handler killed flag was set by timeout handler
or KILL command. Return error. or KILL command. Return error.
*/ */
worker_context.restore(); retval= 1;
return 1; goto end;
} }
for(;;) for(;;)
{ {
Vio *vio; Vio *vio;
...@@ -196,12 +197,12 @@ int threadpool_process_request(THD *thd) ...@@ -196,12 +197,12 @@ int threadpool_process_request(THD *thd)
mysql_audit_release(thd); mysql_audit_release(thd);
if ((retval= do_command(thd)) != 0) if ((retval= do_command(thd)) != 0)
break ; goto end;
if (!thd_is_connection_alive(thd)) if (!thd_is_connection_alive(thd))
{ {
retval= 1; retval= 1;
break; goto end;
} }
vio= thd->net.vio; vio= thd->net.vio;
...@@ -210,10 +211,11 @@ int threadpool_process_request(THD *thd) ...@@ -210,10 +211,11 @@ int threadpool_process_request(THD *thd)
/* More info on this debug sync is in sql_parse.cc*/ /* More info on this debug sync is in sql_parse.cc*/
DEBUG_SYNC(thd, "before_do_command_net_read"); DEBUG_SYNC(thd, "before_do_command_net_read");
thd->net.reading_or_writing= 1; thd->net.reading_or_writing= 1;
break; goto end;
} }
} }
end:
worker_context.restore(); worker_context.restore();
return retval; return retval;
} }
...@@ -234,8 +236,6 @@ static scheduler_functions tp_scheduler_functions= ...@@ -234,8 +236,6 @@ static scheduler_functions tp_scheduler_functions=
tp_end // end tp_end // end
}; };
extern void scheduler_init();
void pool_of_threads_scheduler(struct scheduler_functions *func, void pool_of_threads_scheduler(struct scheduler_functions *func,
ulong *arg_max_connections, ulong *arg_max_connections,
uint *arg_connection_count) uint *arg_connection_count)
......
...@@ -24,6 +24,11 @@ typedef struct kevent native_event; ...@@ -24,6 +24,11 @@ typedef struct kevent native_event;
typedef port_event_t native_event; typedef port_event_t native_event;
#endif #endif
/** Maximum number of native events a listener can read in one go */
#define MAX_EVENTS 1024
/** Indicates that threadpool was initialized*/
static bool threadpool_started= false;
/* /*
Define PSI Keys for performance schema. Define PSI Keys for performance schema.
...@@ -130,7 +135,7 @@ static uint group_count; ...@@ -130,7 +135,7 @@ static uint group_count;
Used for printing "pool blocked" message, see Used for printing "pool blocked" message, see
print_pool_blocked_message(); print_pool_blocked_message();
*/ */
static time_t pool_block_start; static ulonglong pool_block_start;
/* Global timer for all groups */ /* Global timer for all groups */
struct pool_timer_t struct pool_timer_t
...@@ -145,11 +150,6 @@ struct pool_timer_t ...@@ -145,11 +150,6 @@ struct pool_timer_t
static pool_timer_t pool_timer; static pool_timer_t pool_timer;
/* Externals functions and variables we use */
extern void scheduler_init();
extern pthread_attr_t *get_connection_attrib(void);
static void queue_put(thread_group_t *thread_group, connection_t *connection); static void queue_put(thread_group_t *thread_group, connection_t *connection);
static int wake_thread(thread_group_t *thread_group); static int wake_thread(thread_group_t *thread_group);
static void handle_event(connection_t *connection); static void handle_event(connection_t *connection);
...@@ -462,6 +462,9 @@ static void timeout_check(pool_timer_t *timer) ...@@ -462,6 +462,9 @@ static void timeout_check(pool_timer_t *timer)
Besides checking for stalls, timer thread is also responsible for terminating Besides checking for stalls, timer thread is also responsible for terminating
clients that have been idle for longer than wait_timeout seconds. clients that have been idle for longer than wait_timeout seconds.
TODO: Let the timer sleep for long time if there is no work to be done.
Currently it wakes up rather often on and idle server.
*/ */
static void* timer_thread(void *param) static void* timer_thread(void *param)
...@@ -491,7 +494,7 @@ static void* timer_thread(void *param) ...@@ -491,7 +494,7 @@ static void* timer_thread(void *param)
{ {
timer->current_microtime= microsecond_interval_timer(); timer->current_microtime= microsecond_interval_timer();
/* Check stallls in thread groups */ /* Check stalls in thread groups */
for(i=0; i< array_elements(all_groups);i++) for(i=0; i< array_elements(all_groups);i++)
{ {
if(all_groups[i].connection_count) if(all_groups[i].connection_count)
...@@ -574,7 +577,6 @@ static void stop_timer(pool_timer_t *timer) ...@@ -574,7 +577,6 @@ static void stop_timer(pool_timer_t *timer)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
#define MAX_EVENTS 1024
/** /**
Poll for socket events and distribute them to worker threads Poll for socket events and distribute them to worker threads
...@@ -586,10 +588,8 @@ static connection_t * listener(worker_thread_t *current_thread, ...@@ -586,10 +588,8 @@ static connection_t * listener(worker_thread_t *current_thread,
thread_group_t *thread_group) thread_group_t *thread_group)
{ {
DBUG_ENTER("listener"); DBUG_ENTER("listener");
connection_t *retval= NULL; connection_t *retval= NULL;
for(;;) for(;;)
{ {
native_event ev[MAX_EVENTS]; native_event ev[MAX_EVENTS];
...@@ -767,7 +767,6 @@ static int create_worker(thread_group_t *thread_group) ...@@ -767,7 +767,6 @@ static int create_worker(thread_group_t *thread_group)
{ {
my_errno= errno; my_errno= errno;
} }
end: end:
if (err) if (err)
...@@ -897,11 +896,10 @@ static int wake_thread(thread_group_t *thread_group) ...@@ -897,11 +896,10 @@ static int wake_thread(thread_group_t *thread_group)
{ {
thread->woken= true; thread->woken= true;
thread_group->waiting_threads.remove(thread); thread_group->waiting_threads.remove(thread);
if (mysql_cond_signal(&thread->cond)) mysql_cond_signal(&thread->cond);
abort();
DBUG_RETURN(0); DBUG_RETURN(0);
} }
DBUG_RETURN(-1); /* no thread- missed wakeup*/ DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
} }
...@@ -1188,7 +1186,7 @@ void tp_add_connection(THD *thd) ...@@ -1188,7 +1186,7 @@ void tp_add_connection(THD *thd)
/* Assign connection to a group. */ /* Assign connection to a group. */
thread_group_t *group= thread_group_t *group=
&all_groups[connection->thd->thread_id%group_count]; &all_groups[thd->thread_id%group_count];
connection->thread_group=group; connection->thread_group=group;
...@@ -1416,12 +1414,13 @@ static void handle_event(connection_t *connection) ...@@ -1416,12 +1414,13 @@ static void handle_event(connection_t *connection)
err= threadpool_process_request(connection->thd); err= threadpool_process_request(connection->thd);
} }
if (!err) if(err)
{ goto end;
set_wait_timeout(connection);
err= start_io(connection); set_wait_timeout(connection);
} err= start_io(connection);
end:
if (err) if (err)
connection_abort(connection); connection_abort(connection);
...@@ -1481,11 +1480,10 @@ static void *worker_main(void *param) ...@@ -1481,11 +1480,10 @@ static void *worker_main(void *param)
} }
static bool started=false;
bool tp_init() bool tp_init()
{ {
DBUG_ENTER("tp_init"); DBUG_ENTER("tp_init");
started = true; threadpool_started= true;
scheduler_init(); scheduler_init();
for(uint i=0; i < array_elements(all_groups); i++) for(uint i=0; i < array_elements(all_groups); i++)
...@@ -1493,7 +1491,12 @@ bool tp_init() ...@@ -1493,7 +1491,12 @@ bool tp_init()
thread_group_init(&all_groups[i], get_connection_attrib()); thread_group_init(&all_groups[i], get_connection_attrib());
} }
tp_set_threadpool_size(threadpool_size); tp_set_threadpool_size(threadpool_size);
if(group_count == 0)
{
/* Something went wrong */
sql_print_error("Can't set threadpool size to %d",threadpool_size);
DBUG_RETURN(1);
}
PSI_register(mutex); PSI_register(mutex);
PSI_register(cond); PSI_register(cond);
PSI_register(thread); PSI_register(thread);
...@@ -1508,7 +1511,7 @@ void tp_end() ...@@ -1508,7 +1511,7 @@ void tp_end()
{ {
DBUG_ENTER("tp_end"); DBUG_ENTER("tp_end");
if (!started) if (!threadpool_started)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
stop_timer(&pool_timer); stop_timer(&pool_timer);
...@@ -1516,18 +1519,18 @@ void tp_end() ...@@ -1516,18 +1519,18 @@ void tp_end()
{ {
thread_group_close(&all_groups[i]); thread_group_close(&all_groups[i]);
} }
started= false; threadpool_started= false;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/** Ensure that poll descriptors are created when threadpool_size changes */ /** Ensure that poll descriptors are created when threadpool_size changes */
int tp_set_threadpool_size(uint size) void tp_set_threadpool_size(uint size)
{ {
bool success= true; bool success= true;
if (!started) if (!threadpool_started)
return 0; return;
for(uint i=0; i< size; i++) for(uint i=0; i< size; i++)
{ {
...@@ -1537,21 +1540,25 @@ int tp_set_threadpool_size(uint size) ...@@ -1537,21 +1540,25 @@ int tp_set_threadpool_size(uint size)
{ {
group->pollfd= io_poll_create(); group->pollfd= io_poll_create();
success= (group->pollfd >= 0); success= (group->pollfd >= 0);
if(!success)
{
sql_print_error("io_poll_create() failed, errno=%d\n", errno);
break;
}
} }
mysql_mutex_unlock(&all_groups[i].mutex); mysql_mutex_unlock(&all_groups[i].mutex);
if (!success) if (!success)
{ {
group_count= i-1; group_count= i;
return -1; return;
} }
} }
group_count= size; group_count= size;
return 0;
} }
void tp_set_threadpool_stall_limit(uint limit) void tp_set_threadpool_stall_limit(uint limit)
{ {
if (!started) if (!threadpool_started)
return; return;
mysql_mutex_lock(&(pool_timer.mutex)); mysql_mutex_lock(&(pool_timer.mutex));
pool_timer.tick_interval= limit; pool_timer.tick_interval= limit;
...@@ -1582,20 +1589,23 @@ int tp_get_idle_thread_count() ...@@ -1582,20 +1589,23 @@ int tp_get_idle_thread_count()
/* Report threadpool problems */ /* Report threadpool problems */
#define BLOCK_MSG_DELAY 30 /**
Delay in microseconds, after which "pool blocked" message is printed.
(30 sec == 30 Mio usec)
*/
#define BLOCK_MSG_DELAY 30*1000000
static const char *max_threads_reached_msg= #define MAX_THREADS_REACHED_MSG \
"Threadpool could not create additional thread to handle queries, because the " "Threadpool could not create additional thread to handle queries, because the \
"number of allowed threads was reached. Increasing 'thread_pool_max_threads' " number of allowed threads was reached. Increasing 'thread_pool_max_threads' \
"parameter can help in this situation.\n" parameter can help in this situation.\n \
"If 'extra_port' parameter is set, you can still connect to the database with " If 'extra_port' parameter is set, you can still connect to the database with \
"superuser account (it must be TCP connection using extra_port as TCP port) " superuser account (it must be TCP connection using extra_port as TCP port) \
"and troubleshoot the situation. " and troubleshoot the situation. \
"A likely cause of pool blocks are clients that lock resources for long time. " A likely cause of pool blocks are clients that lock resources for long time. \
"'show processlist' or 'show engine innodb status' can give additional hints."; 'show processlist' or 'show engine innodb status' can give additional hints."
static const char *create_thread_error_msg= #define CREATE_THREAD_ERROR_MSG "Can't create threads in threadpool (errno=%d)."
"Can't create threads in threadpool (errno=%d).";
/** /**
Write a message when blocking situation in threadpool occurs. Write a message when blocking situation in threadpool occurs.
...@@ -1606,10 +1616,10 @@ static const char *create_thread_error_msg= ...@@ -1606,10 +1616,10 @@ static const char *create_thread_error_msg=
static void print_pool_blocked_message(bool max_threads_reached) static void print_pool_blocked_message(bool max_threads_reached)
{ {
time_t now; ulonglong now;
static bool msg_written; static bool msg_written;
now= time(NULL); now= microsecond_interval_timer();
if (pool_block_start == 0) if (pool_block_start == 0)
{ {
pool_block_start= now; pool_block_start= now;
...@@ -1620,12 +1630,12 @@ static void print_pool_blocked_message(bool max_threads_reached) ...@@ -1620,12 +1630,12 @@ static void print_pool_blocked_message(bool max_threads_reached)
if (now > pool_block_start + BLOCK_MSG_DELAY && !msg_written) if (now > pool_block_start + BLOCK_MSG_DELAY && !msg_written)
{ {
if (max_threads_reached) if (max_threads_reached)
sql_print_error(max_threads_reached_msg); sql_print_error(MAX_THREADS_REACHED_MSG);
else else
sql_print_error(create_thread_error_msg, my_errno); sql_print_error(CREATE_THREAD_ERROR_MSG, my_errno);
sql_print_information("Threadpool has been blocked for %u seconds\n", sql_print_information("Threadpool has been blocked for %u seconds\n",
(uint)(now- pool_block_start)); (uint)((now- pool_block_start)/1000000));
/* avoid reperated messages for the same blocking situation */ /* avoid reperated messages for the same blocking situation */
msg_written= true; msg_written= true;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment