Commit 4d1c7b79 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

some more whitespace, remove pending_thread_start_count. increment counters...

some more whitespace, remove pending_thread_start_count. increment counters (thread_group->count, thread_group->active_thread_count) whenever mysql_create_thread returns success.
parent d782e098
...@@ -58,7 +58,7 @@ static PSI_thread_info thread_list[] = ...@@ -58,7 +58,7 @@ static PSI_thread_info thread_list[] =
/* Macro to simplify performance schema registration */ /* Macro to simplify performance schema registration */
#define PSI_register(X) \ #define PSI_register(X) \
if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list)) if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list))
struct thread_group_t; struct thread_group_t;
...@@ -112,7 +112,6 @@ struct thread_group_t ...@@ -112,7 +112,6 @@ struct thread_group_t
int pollfd; int pollfd;
int thread_count; int thread_count;
int active_thread_count; int active_thread_count;
int pending_thread_start_count;
int connection_count; int connection_count;
/* Stats for the deadlock detection timer routine.*/ /* Stats for the deadlock detection timer routine.*/
int io_event_count; int io_event_count;
...@@ -691,8 +690,7 @@ static connection_t * listener(worker_thread_t *current_thread, ...@@ -691,8 +690,7 @@ static connection_t * listener(worker_thread_t *current_thread,
Wake failed, hence groups has no idle threads. Now check if there are Wake failed, hence groups has no idle threads. Now check if there are
any threads in the group except listener. any threads in the group except listener.
*/ */
if(thread_group->thread_count == 1 && if(thread_group->thread_count == 1)
thread_group->pending_thread_start_count == 0)
{ {
/* /*
Currently there is no worker thread in the group, as indicated by Currently there is no worker thread in the group, as indicated by
...@@ -714,7 +712,22 @@ static connection_t * listener(worker_thread_t *current_thread, ...@@ -714,7 +712,22 @@ static connection_t * listener(worker_thread_t *current_thread,
DBUG_RETURN(retval); DBUG_RETURN(retval);
} }
/**
Adjust thread counters in group or global
whenever thread is created or is about to exit
@param thread_group
@param count - 1, when new thread is created
-1, when thread is about to exit
*/
static void add_thread_count(thread_group_t *thread_group, int32 count)
{
thread_group->thread_count += count;
/* worker starts out and end in "active" state */
thread_group->active_thread_count += count;
my_atomic_add32(&tp_stats.num_worker_threads, count);
}
/** /**
...@@ -740,13 +753,15 @@ static int create_worker(thread_group_t *thread_group) ...@@ -740,13 +753,15 @@ static int create_worker(thread_group_t *thread_group)
max_threads_reached= true; max_threads_reached= true;
goto end; goto end;
} }
err= mysql_thread_create(key_worker_thread, &thread_id, err= mysql_thread_create(key_worker_thread, &thread_id,
thread_group->pthread_attr, worker_main, thread_group); thread_group->pthread_attr, worker_main, thread_group);
if (!err) if (!err)
{ {
thread_group->pending_thread_start_count++;
thread_group->last_thread_creation_time=microsecond_interval_timer(); thread_group->last_thread_creation_time=microsecond_interval_timer();
thread_created++;
add_thread_count(thread_group, 1);
} }
else else
{ {
...@@ -803,12 +818,12 @@ static int wake_or_create_thread(thread_group_t *thread_group) ...@@ -803,12 +818,12 @@ static int wake_or_create_thread(thread_group_t *thread_group)
{ {
DBUG_ENTER("wake_or_create_thread"); DBUG_ENTER("wake_or_create_thread");
if (thread_group->shutdown)
DBUG_RETURN(0);
if (wake_thread(thread_group) == 0) if (wake_thread(thread_group) == 0)
DBUG_RETURN(0); DBUG_RETURN(0);
if (thread_group->pending_thread_start_count > 0)
DBUG_RETURN(-1);
if (thread_group->thread_count > thread_group->connection_count) if (thread_group->thread_count > thread_group->connection_count)
DBUG_RETURN(-1); DBUG_RETURN(-1);
...@@ -903,8 +918,7 @@ static void thread_group_close(thread_group_t *thread_group) ...@@ -903,8 +918,7 @@ static void thread_group_close(thread_group_t *thread_group)
DBUG_ENTER("thread_group_close"); DBUG_ENTER("thread_group_close");
mysql_mutex_lock(&thread_group->mutex); mysql_mutex_lock(&thread_group->mutex);
if (thread_group->thread_count == 0 && if (thread_group->thread_count == 0)
thread_group->pending_thread_start_count == 0)
{ {
mysql_mutex_unlock(&thread_group->mutex); mysql_mutex_unlock(&thread_group->mutex);
thread_group_destroy(thread_group); thread_group_destroy(thread_group);
...@@ -954,10 +968,10 @@ static void queue_put(thread_group_t *thread_group, connection_t *connection) ...@@ -954,10 +968,10 @@ static void queue_put(thread_group_t *thread_group, connection_t *connection)
mysql_mutex_lock(&thread_group->mutex); mysql_mutex_lock(&thread_group->mutex);
thread_group->queue.push_back(connection); thread_group->queue.push_back(connection);
if (thread_group->active_thread_count == 0) if (thread_group->active_thread_count == 0)
{
wake_or_create_thread(thread_group); wake_or_create_thread(thread_group);
}
mysql_mutex_unlock(&thread_group->mutex); mysql_mutex_unlock(&thread_group->mutex);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -1060,7 +1074,7 @@ connection_t *get_event(worker_thread_t *current_thread, ...@@ -1060,7 +1074,7 @@ connection_t *get_event(worker_thread_t *current_thread,
thread_group->waiting_threads.push_front(current_thread); thread_group->waiting_threads.push_front(current_thread);
thread_group->active_thread_count--; thread_group->active_thread_count--;
if(abstime) if (abstime)
{ {
err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex, err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex,
abstime); abstime);
...@@ -1081,7 +1095,7 @@ connection_t *get_event(worker_thread_t *current_thread, ...@@ -1081,7 +1095,7 @@ connection_t *get_event(worker_thread_t *current_thread,
thread_group->waiting_threads.remove(current_thread); thread_group->waiting_threads.remove(current_thread);
} }
if(err) if (err)
break; break;
} }
...@@ -1107,7 +1121,7 @@ void wait_begin(thread_group_t *thread_group) ...@@ -1107,7 +1121,7 @@ void wait_begin(thread_group_t *thread_group)
DBUG_ASSERT(thread_group->active_thread_count >=0); DBUG_ASSERT(thread_group->active_thread_count >=0);
DBUG_ASSERT(thread_group->connection_count > 0); DBUG_ASSERT(thread_group->connection_count > 0);
if((thread_group->active_thread_count == 0) && if ((thread_group->active_thread_count == 0) &&
(thread_group->queue.is_empty() || !thread_group->listener)) (thread_group->queue.is_empty() || !thread_group->listener))
{ {
/* /*
...@@ -1168,7 +1182,7 @@ void tp_add_connection(THD *thd) ...@@ -1168,7 +1182,7 @@ void tp_add_connection(THD *thd)
threads.append(thd); threads.append(thd);
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
connection_t *connection= alloc_connection(thd); connection_t *connection= alloc_connection(thd);
if(connection) if (connection)
{ {
thd->event_scheduler.data= connection; thd->event_scheduler.data= connection;
...@@ -1243,7 +1257,7 @@ void tp_wait_begin(THD *thd, int type) ...@@ -1243,7 +1257,7 @@ void tp_wait_begin(THD *thd, int type)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
connection_t *connection = (connection_t *)thd->event_scheduler.data; connection_t *connection = (connection_t *)thd->event_scheduler.data;
if(connection) if (connection)
{ {
DBUG_ASSERT(!connection->waiting); DBUG_ASSERT(!connection->waiting);
connection->waiting= true; connection->waiting= true;
...@@ -1264,7 +1278,7 @@ void tp_wait_end(THD *thd) ...@@ -1264,7 +1278,7 @@ void tp_wait_end(THD *thd)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
connection_t *connection = (connection_t *)thd->event_scheduler.data; connection_t *connection = (connection_t *)thd->event_scheduler.data;
if(connection) if (connection)
{ {
DBUG_ASSERT(connection->waiting); DBUG_ASSERT(connection->waiting);
connection->waiting = false; connection->waiting = false;
...@@ -1290,6 +1304,7 @@ static void set_next_timeout_check(ulonglong abstime) ...@@ -1290,6 +1304,7 @@ static void set_next_timeout_check(ulonglong abstime)
/** /**
Set wait timeout for connection. Set wait timeout for connection.
*/ */
static void set_wait_timeout(connection_t *c) static void set_wait_timeout(connection_t *c)
{ {
DBUG_ENTER("set_wait_timeout"); DBUG_ENTER("set_wait_timeout");
...@@ -1316,6 +1331,7 @@ static void set_wait_timeout(connection_t *c) ...@@ -1316,6 +1331,7 @@ static void set_wait_timeout(connection_t *c)
migrate to a different group because group_count has changed migrate to a different group because group_count has changed
after thread_pool_size setting. after thread_pool_size setting.
*/ */
static int change_group(connection_t *c, static int change_group(connection_t *c,
thread_group_t *old_group, thread_group_t *old_group,
thread_group_t *new_group) thread_group_t *new_group)
...@@ -1340,7 +1356,7 @@ static int change_group(connection_t *c, ...@@ -1340,7 +1356,7 @@ static int change_group(connection_t *c,
c->thread_group= new_group; c->thread_group= new_group;
new_group->connection_count++; new_group->connection_count++;
/* Ensure that there is a listener in the new group. */ /* Ensure that there is a listener in the new group. */
if(!new_group->thread_count && !new_group->pending_thread_start_count) if (!new_group->thread_count)
ret= create_worker(new_group); ret= create_worker(new_group);
mysql_mutex_unlock(&new_group->mutex); mysql_mutex_unlock(&new_group->mutex);
return ret; return ret;
...@@ -1373,7 +1389,7 @@ static int start_io(connection_t *connection) ...@@ -1373,7 +1389,7 @@ static int start_io(connection_t *connection)
/* /*
Bind to poll descriptor if not yet done. Bind to poll descriptor if not yet done.
*/ */
if(!connection->bound_to_poll_descriptor) if (!connection->bound_to_poll_descriptor)
{ {
connection->bound_to_poll_descriptor= true; connection->bound_to_poll_descriptor= true;
return io_poll_associate_fd(group->pollfd, fd, connection); return io_poll_associate_fd(group->pollfd, fd, connection);
...@@ -1400,7 +1416,7 @@ static void handle_event(connection_t *connection) ...@@ -1400,7 +1416,7 @@ static void handle_event(connection_t *connection)
err= threadpool_process_request(connection->thd); err= threadpool_process_request(connection->thd);
} }
if(!err) if (!err)
{ {
set_wait_timeout(connection); set_wait_timeout(connection);
err= start_io(connection); err= start_io(connection);
...@@ -1427,7 +1443,6 @@ static void *worker_main(void *param) ...@@ -1427,7 +1443,6 @@ static void *worker_main(void *param)
DBUG_ENTER("worker_main"); DBUG_ENTER("worker_main");
thread_created++;
thread_group_t *thread_group = (thread_group_t *)param; thread_group_t *thread_group = (thread_group_t *)param;
/* Init per-thread structure */ /* Init per-thread structure */
...@@ -1435,13 +1450,6 @@ static void *worker_main(void *param) ...@@ -1435,13 +1450,6 @@ static void *worker_main(void *param)
this_thread.thread_group= thread_group; this_thread.thread_group= thread_group;
this_thread.event_count=0; this_thread.event_count=0;
my_atomic_add32(&tp_stats.num_worker_threads, 1);
mysql_mutex_lock(&thread_group->mutex);
thread_group->thread_count++;
thread_group->active_thread_count++;
thread_group->pending_thread_start_count--;
mysql_mutex_unlock(&thread_group->mutex);
/* Run event loop */ /* Run event loop */
for(;;) for(;;)
{ {
...@@ -1450,9 +1458,7 @@ static void *worker_main(void *param) ...@@ -1450,9 +1458,7 @@ static void *worker_main(void *param)
set_timespec(ts,threadpool_idle_timeout); set_timespec(ts,threadpool_idle_timeout);
connection = get_event(&this_thread, thread_group, &ts); connection = get_event(&this_thread, thread_group, &ts);
if (!connection) if (!connection)
{
break; break;
}
this_thread.event_count++; this_thread.event_count++;
handle_event(connection); handle_event(connection);
} }
...@@ -1460,19 +1466,16 @@ static void *worker_main(void *param) ...@@ -1460,19 +1466,16 @@ static void *worker_main(void *param)
/* Thread shutdown: cleanup per-worker-thread structure. */ /* Thread shutdown: cleanup per-worker-thread structure. */
mysql_cond_destroy(&this_thread.cond); mysql_cond_destroy(&this_thread.cond);
bool last_thread; /* last thread in group exits */
mysql_mutex_lock(&thread_group->mutex); mysql_mutex_lock(&thread_group->mutex);
thread_group->active_thread_count--; add_thread_count(thread_group, -1);
thread_group->thread_count--; last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown);
mysql_mutex_unlock(&thread_group->mutex); mysql_mutex_unlock(&thread_group->mutex);
my_atomic_add32(&tp_stats.num_worker_threads, -1);
/* If it is the last thread in group and pool is terminating, destroy group.*/ /* Last thread in group exits and pool is terminating, destroy group.*/
if (thread_group->shutdown if (last_thread)
&& thread_group->thread_count == 0
&& thread_group->pending_thread_start_count == 0)
{
thread_group_destroy(thread_group); thread_group_destroy(thread_group);
}
my_thread_end(); my_thread_end();
return NULL; return NULL;
} }
...@@ -1519,6 +1522,7 @@ void tp_end() ...@@ -1519,6 +1522,7 @@ void tp_end()
/** Ensure that poll descriptors are created when threadpool_size changes */ /** Ensure that poll descriptors are created when threadpool_size changes */
int tp_set_threadpool_size(uint size) int tp_set_threadpool_size(uint size)
{ {
bool success= true; bool success= true;
...@@ -1560,9 +1564,9 @@ void tp_set_threadpool_stall_limit(uint limit) ...@@ -1560,9 +1564,9 @@ void tp_set_threadpool_stall_limit(uint limit)
Calculate number of idle/waiting threads in the pool. Calculate number of idle/waiting threads in the pool.
Sum idle threads over all groups. Sum idle threads over all groups.
D Don't do any locking, it is not required for stats.
on't do any locking, it is not required for stats.
*/ */
int tp_get_idle_thread_count() int tp_get_idle_thread_count()
{ {
int sum=0; int sum=0;
...@@ -1599,6 +1603,7 @@ static const char *create_thread_error_msg= ...@@ -1599,6 +1603,7 @@ static const char *create_thread_error_msg=
It will be just a single message for each blocking situation (to prevent It will be just a single message for each blocking situation (to prevent
log flood). log flood).
*/ */
static void print_pool_blocked_message(bool max_threads_reached) static void print_pool_blocked_message(bool max_threads_reached)
{ {
time_t now; time_t now;
...@@ -1612,9 +1617,9 @@ static void print_pool_blocked_message(bool max_threads_reached) ...@@ -1612,9 +1617,9 @@ static void print_pool_blocked_message(bool max_threads_reached)
return; return;
} }
if(now > pool_block_start + BLOCK_MSG_DELAY && !msg_written) if (now > pool_block_start + BLOCK_MSG_DELAY && !msg_written)
{ {
if(max_threads_reached) if (max_threads_reached)
sql_print_error(max_threads_reached_msg); sql_print_error(max_threads_reached_msg);
else else
sql_print_error(create_thread_error_msg, my_errno); sql_print_error(create_thread_error_msg, my_errno);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment