Commit 804f3bfe authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

Threadpool : Rest of monty's review

parent c5c32972
...@@ -1159,7 +1159,7 @@ void my_net_set_read_timeout(NET *net, uint timeout) ...@@ -1159,7 +1159,7 @@ void my_net_set_read_timeout(NET *net, uint timeout)
{ {
DBUG_ENTER("my_net_set_read_timeout"); DBUG_ENTER("my_net_set_read_timeout");
DBUG_PRINT("enter", ("timeout: %d", timeout)); DBUG_PRINT("enter", ("timeout: %d", timeout));
if(net->read_timeout == timeout) if (net->read_timeout == timeout)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
net->read_timeout= timeout; net->read_timeout= timeout;
#ifdef NO_ALARM #ifdef NO_ALARM
...@@ -1174,7 +1174,7 @@ void my_net_set_write_timeout(NET *net, uint timeout) ...@@ -1174,7 +1174,7 @@ void my_net_set_write_timeout(NET *net, uint timeout)
{ {
DBUG_ENTER("my_net_set_write_timeout"); DBUG_ENTER("my_net_set_write_timeout");
DBUG_PRINT("enter", ("timeout: %d", timeout)); DBUG_PRINT("enter", ("timeout: %d", timeout));
if(net->write_timeout == timeout) if (net->write_timeout == timeout)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
net->write_timeout= timeout; net->write_timeout= timeout;
#ifdef NO_ALARM #ifdef NO_ALARM
......
...@@ -1823,11 +1823,7 @@ static Sys_var_enum Sys_thread_handling( ...@@ -1823,11 +1823,7 @@ static Sys_var_enum Sys_thread_handling(
#endif #endif
, READ_ONLY GLOBAL_VAR(thread_handling), CMD_LINE(REQUIRED_ARG), , READ_ONLY GLOBAL_VAR(thread_handling), CMD_LINE(REQUIRED_ARG),
thread_handling_names, thread_handling_names,
#ifdef HAVE_POOL_OF_THREADS
DEFAULT(2)
#else
DEFAULT(0) DEFAULT(0)
#endif
); );
#ifdef HAVE_QUERY_CACHE #ifdef HAVE_QUERY_CACHE
......
...@@ -158,10 +158,6 @@ void threadpool_remove_connection(THD *thd) ...@@ -158,10 +158,6 @@ void threadpool_remove_connection(THD *thd)
end_connection(thd); end_connection(thd);
close_connection(thd, 0); close_connection(thd, 0);
mysql_mutex_lock(&thd->LOCK_thd_data);
thd->event_scheduler.data= NULL;
mysql_mutex_unlock(&thd->LOCK_thd_data);
unlink_thd(thd); unlink_thd(thd);
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_broadcast(&COND_thread_count); mysql_cond_broadcast(&COND_thread_count);
......
...@@ -334,7 +334,9 @@ static void* native_event_get_userdata(native_event *event) ...@@ -334,7 +334,9 @@ static void* native_event_get_userdata(native_event *event)
{ {
return event->udata; return event->udata;
} }
#elif defined (__sun) #elif defined (__sun)
static int io_poll_create() static int io_poll_create()
{ {
return port_create(); return port_create();
...@@ -980,15 +982,16 @@ static bool too_many_threads(thread_group_t *thread_group) ...@@ -980,15 +982,16 @@ static bool too_many_threads(thread_group_t *thread_group)
Pending event in our case means that there is either a pending login request Pending event in our case means that there is either a pending login request
(if connection is not yet logged in), or there are unread bytes on the socket. (if connection is not yet logged in), or there are unread bytes on the socket.
If there are no pending events currently, thread will wait. If timeout specified If there are no pending events currently, thread will wait.
int abstime parameter passes, the function returns NULL. If timeout specified in abstime parameter passes, the function returns NULL.
@param current_thread - current worker thread @param current_thread - current worker thread
@param thread_group - current thread group @param thread_group - current thread group
@param abstime - absolute wait timeout @param abstime - absolute wait timeout
@return @return
connection with pending event. NULL is returned if timeout has expired,or on shutdown. connection with pending event.
NULL is returned if timeout has expired,or on shutdown.
*/ */
connection_t *get_event(worker_thread_t *current_thread, connection_t *get_event(worker_thread_t *current_thread,
...@@ -1001,13 +1004,14 @@ connection_t *get_event(worker_thread_t *current_thread, ...@@ -1001,13 +1004,14 @@ connection_t *get_event(worker_thread_t *current_thread,
mysql_mutex_lock(&thread_group->mutex); mysql_mutex_lock(&thread_group->mutex);
DBUG_ASSERT(thread_group->active_thread_count >= 0); DBUG_ASSERT(thread_group->active_thread_count >= 0);
do for(;;)
{ {
bool oversubscribed = too_many_threads(thread_group);
if (thread_group->shutdown) if (thread_group->shutdown)
break; break;
/* Check if queue is not empty */ /* Check if queue is not empty */
if (!too_many_threads(thread_group)) if (!oversubscribed)
{ {
connection = queue_get(thread_group); connection = queue_get(thread_group);
if(connection) if(connection)
...@@ -1034,7 +1038,7 @@ connection_t *get_event(worker_thread_t *current_thread, ...@@ -1034,7 +1038,7 @@ connection_t *get_event(worker_thread_t *current_thread,
Last thing we try before going to sleep is to Last thing we try before going to sleep is to
pick a single event via epoll, without waiting (timeout 0) pick a single event via epoll, without waiting (timeout 0)
*/ */
if (!too_many_threads(thread_group)) if (!oversubscribed)
{ {
native_event nev; native_event nev;
if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1) if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
...@@ -1057,9 +1061,14 @@ connection_t *get_event(worker_thread_t *current_thread, ...@@ -1057,9 +1061,14 @@ connection_t *get_event(worker_thread_t *current_thread,
thread_group->active_thread_count--; thread_group->active_thread_count--;
if(abstime) if(abstime)
err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex, abstime); {
err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex,
abstime);
}
else else
{
err = mysql_cond_wait(&current_thread->cond, &thread_group->mutex); err = mysql_cond_wait(&current_thread->cond, &thread_group->mutex);
}
thread_group->active_thread_count++; thread_group->active_thread_count++;
if (!current_thread->woken) if (!current_thread->woken)
...@@ -1074,9 +1083,7 @@ connection_t *get_event(worker_thread_t *current_thread, ...@@ -1074,9 +1083,7 @@ connection_t *get_event(worker_thread_t *current_thread,
if(err) if(err)
break; break;
} }
while(true);
thread_group->stalled= false; thread_group->stalled= false;
mysql_mutex_unlock(&thread_group->mutex); mysql_mutex_unlock(&thread_group->mutex);
...@@ -1163,9 +1170,7 @@ void tp_add_connection(THD *thd) ...@@ -1163,9 +1170,7 @@ void tp_add_connection(THD *thd)
connection_t *connection= alloc_connection(thd); connection_t *connection= alloc_connection(thd);
if(connection) if(connection)
{ {
mysql_mutex_lock(&thd->LOCK_thd_data);
thd->event_scheduler.data= connection; thd->event_scheduler.data= connection;
mysql_mutex_unlock(&thd->LOCK_thd_data);
/* Assign connection to a group. */ /* Assign connection to a group. */
thread_group_t *group= thread_group_t *group=
...@@ -1183,7 +1188,11 @@ void tp_add_connection(THD *thd) ...@@ -1183,7 +1188,11 @@ void tp_add_connection(THD *thd)
*/ */
queue_put(group, connection); queue_put(group, connection);
} }
else
{
/* Allocation failed */
threadpool_remove_connection(thd);
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -1358,10 +1367,8 @@ static int start_io(connection_t *connection) ...@@ -1358,10 +1367,8 @@ static int start_io(connection_t *connection)
if (group != connection->thread_group) if (group != connection->thread_group)
{ {
if (change_group(connection, connection->thread_group, group)) if (change_group(connection, connection->thread_group, group))
{
return -1; return -1;
} }
}
/* /*
Bind to poll descriptor if not yet done. Bind to poll descriptor if not yet done.
...@@ -1385,12 +1392,12 @@ static void handle_event(connection_t *connection) ...@@ -1385,12 +1392,12 @@ static void handle_event(connection_t *connection)
if (!connection->logged_in) if (!connection->logged_in)
{ {
err = threadpool_add_connection(connection->thd); err= threadpool_add_connection(connection->thd);
connection->logged_in= true; connection->logged_in= true;
} }
else else
{ {
err = threadpool_process_request(connection->thd); err= threadpool_process_request(connection->thd);
} }
if(!err) if(!err)
...@@ -1400,12 +1407,13 @@ static void handle_event(connection_t *connection) ...@@ -1400,12 +1407,13 @@ static void handle_event(connection_t *connection)
} }
if (err) if (err)
{
connection_abort(connection); connection_abort(connection);
}
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/** /**
Worker thread's main Worker thread's main
*/ */
...@@ -1505,6 +1513,7 @@ void tp_end() ...@@ -1505,6 +1513,7 @@ void tp_end()
{ {
thread_group_close(&all_groups[i]); thread_group_close(&all_groups[i]);
} }
started= false;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
......
...@@ -652,12 +652,10 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, ...@@ -652,12 +652,10 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
*/ */
void tp_add_connection(THD *thd) void tp_add_connection(THD *thd)
{ {
connection_t *con = (connection_t *)malloc(sizeof(connection_t));
if (con)
threads.append(thd); threads.append(thd);
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
connection_t *con = (connection_t *)malloc(sizeof(connection_t));
if(!con) if(!con)
{ {
tp_log_warning("Allocation failed", "tp_add_connection"); tp_log_warning("Allocation failed", "tp_add_connection");
...@@ -667,6 +665,8 @@ void tp_add_connection(THD *thd) ...@@ -667,6 +665,8 @@ void tp_add_connection(THD *thd)
init_connection(con); init_connection(con);
con->thd= thd; con->thd= thd;
thd->event_scheduler.data= con;
/* Try to login asynchronously, using threads in the pool */ /* Try to login asynchronously, using threads in the pool */
PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ); PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ);
if (wrk) if (wrk)
...@@ -721,3 +721,4 @@ int tp_get_idle_thread_count() ...@@ -721,3 +721,4 @@ int tp_get_idle_thread_count()
{ {
return 0; return 0;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment