Commit 2aff00ff authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

Threadpool : Rest of monty's review

parent 49f9ee3d
......@@ -1159,7 +1159,7 @@ void my_net_set_read_timeout(NET *net, uint timeout)
{
DBUG_ENTER("my_net_set_read_timeout");
DBUG_PRINT("enter", ("timeout: %d", timeout));
if(net->read_timeout == timeout)
if (net->read_timeout == timeout)
DBUG_VOID_RETURN;
net->read_timeout= timeout;
#ifdef NO_ALARM
......@@ -1174,7 +1174,7 @@ void my_net_set_write_timeout(NET *net, uint timeout)
{
DBUG_ENTER("my_net_set_write_timeout");
DBUG_PRINT("enter", ("timeout: %d", timeout));
if(net->write_timeout == timeout)
if (net->write_timeout == timeout)
DBUG_VOID_RETURN;
net->write_timeout= timeout;
#ifdef NO_ALARM
......
......@@ -1630,7 +1630,7 @@ void THD::disconnect()
/* Disconnect even if a active vio is not associated. */
if (net.vio != vio)
vio_close(net.vio);
vio_close(net.vio);
mysql_mutex_unlock(&LOCK_thd_data);
}
......
......@@ -1823,11 +1823,7 @@ static Sys_var_enum Sys_thread_handling(
#endif
, READ_ONLY GLOBAL_VAR(thread_handling), CMD_LINE(REQUIRED_ARG),
thread_handling_names,
#ifdef HAVE_POOL_OF_THREADS
DEFAULT(2)
#else
DEFAULT(0)
#endif
);
#ifdef HAVE_QUERY_CACHE
......
......@@ -158,10 +158,6 @@ void threadpool_remove_connection(THD *thd)
end_connection(thd);
close_connection(thd, 0);
mysql_mutex_lock(&thd->LOCK_thd_data);
thd->event_scheduler.data= NULL;
mysql_mutex_unlock(&thd->LOCK_thd_data);
unlink_thd(thd);
mysql_mutex_unlock(&LOCK_thread_count);
mysql_cond_broadcast(&COND_thread_count);
......
......@@ -334,7 +334,9 @@ static void* native_event_get_userdata(native_event *event)
{
return event->udata;
}
#elif defined (__sun)
static int io_poll_create()
{
return port_create();
......@@ -980,15 +982,16 @@ static bool too_many_threads(thread_group_t *thread_group)
Pending event in our case means that there is either a pending login request
(if connection is not yet logged in), or there are unread bytes on the socket.
If there are no pending events currently, thread will wait. If timeout specified
int abstime parameter passes, the function returns NULL.
If there are no pending events currently, thread will wait.
If timeout specified in abstime parameter passes, the function returns NULL.
@param current_thread - current worker thread
@param thread_group - current thread group
@param abstime - absolute wait timeout
@return
connection with pending event. NULL is returned if timeout has expired,or on shutdown.
connection with pending event.
NULL is returned if timeout has expired,or on shutdown.
*/
connection_t *get_event(worker_thread_t *current_thread,
......@@ -1001,13 +1004,14 @@ connection_t *get_event(worker_thread_t *current_thread,
mysql_mutex_lock(&thread_group->mutex);
DBUG_ASSERT(thread_group->active_thread_count >= 0);
do
for(;;)
{
bool oversubscribed = too_many_threads(thread_group);
if (thread_group->shutdown)
break;
/* Check if queue is not empty */
if (!too_many_threads(thread_group))
if (!oversubscribed)
{
connection = queue_get(thread_group);
if(connection)
......@@ -1034,7 +1038,7 @@ connection_t *get_event(worker_thread_t *current_thread,
Last thing we try before going to sleep is to
pick a single event via epoll, without waiting (timeout 0)
*/
if (!too_many_threads(thread_group))
if (!oversubscribed)
{
native_event nev;
if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
......@@ -1057,9 +1061,14 @@ connection_t *get_event(worker_thread_t *current_thread,
thread_group->active_thread_count--;
if(abstime)
err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex, abstime);
{
err = mysql_cond_timedwait(&current_thread->cond, &thread_group->mutex,
abstime);
}
else
{
err = mysql_cond_wait(&current_thread->cond, &thread_group->mutex);
}
thread_group->active_thread_count++;
if (!current_thread->woken)
......@@ -1074,9 +1083,7 @@ connection_t *get_event(worker_thread_t *current_thread,
if(err)
break;
}
while(true);
thread_group->stalled= false;
mysql_mutex_unlock(&thread_group->mutex);
......@@ -1163,9 +1170,7 @@ void tp_add_connection(THD *thd)
connection_t *connection= alloc_connection(thd);
if(connection)
{
mysql_mutex_lock(&thd->LOCK_thd_data);
thd->event_scheduler.data= connection;
mysql_mutex_unlock(&thd->LOCK_thd_data);
/* Assign connection to a group. */
thread_group_t *group=
......@@ -1183,7 +1188,11 @@ void tp_add_connection(THD *thd)
*/
queue_put(group, connection);
}
else
{
/* Allocation failed */
threadpool_remove_connection(thd);
}
DBUG_VOID_RETURN;
}
......@@ -1358,9 +1367,7 @@ static int start_io(connection_t *connection)
if (group != connection->thread_group)
{
if (change_group(connection, connection->thread_group, group))
{
return -1;
}
}
/*
......@@ -1385,12 +1392,12 @@ static void handle_event(connection_t *connection)
if (!connection->logged_in)
{
err = threadpool_add_connection(connection->thd);
err= threadpool_add_connection(connection->thd);
connection->logged_in= true;
}
else
{
err = threadpool_process_request(connection->thd);
err= threadpool_process_request(connection->thd);
}
if(!err)
......@@ -1400,12 +1407,13 @@ static void handle_event(connection_t *connection)
}
if (err)
{
connection_abort(connection);
}
DBUG_VOID_RETURN;
}
/**
Worker thread's main
*/
......@@ -1505,6 +1513,7 @@ void tp_end()
{
thread_group_close(&all_groups[i]);
}
started= false;
DBUG_VOID_RETURN;
}
......
......@@ -652,12 +652,10 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
*/
void tp_add_connection(THD *thd)
{
connection_t *con = (connection_t *)malloc(sizeof(connection_t));
if (con)
threads.append(thd);
threads.append(thd);
mysql_mutex_unlock(&LOCK_thread_count);
connection_t *con = (connection_t *)malloc(sizeof(connection_t));
if(!con)
{
tp_log_warning("Allocation failed", "tp_add_connection");
......@@ -667,6 +665,8 @@ void tp_add_connection(THD *thd)
init_connection(con);
con->thd= thd;
thd->event_scheduler.data= con;
/* Try to login asynchronously, using threads in the pool */
PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ);
if (wrk)
......@@ -720,4 +720,5 @@ void tp_wait_end(THD *thd)
int tp_get_idle_thread_count()
{
return 0;
}
\ No newline at end of file
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment