Commit e6f03715 authored by Vladislav Vaintroub

MDEV-22696 Change TP_pool_generic::set_pool_size() logic so that it marks each connection to change group before the next socket read.
parent 9aa6042a
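
To make the control flow of the change easier to follow, here is a minimal standalone sketch of the pattern the commit implements: resizing the pool only *marks* every connection, and each connection fixes its group lazily just before its next read. The Pool/Conn types and the on_ready_to_read() name below are illustrative only, not MariaDB APIs; the real code uses TP_pool_generic::set_pool_size(), server_threads.iterate() and TP_connection_generic::start_io().

#include <atomic>
#include <cstddef>
#include <vector>

struct Conn
{
  std::size_t thread_id;
  std::size_t group;          // group the connection currently belongs to
  bool fix_group= false;      // set when the pool has been resized
};

struct Pool
{
  std::atomic<std::size_t> group_count{1};
  std::vector<Conn*> conns;

  // Roughly mirrors set_pool_size(): update the group count and only
  // mark every connection; no migration happens on this thread.
  void set_pool_size(std::size_t n)
  {
    group_count= n;
    for (Conn *c : conns)
      c->fix_group= true;
  }

  // Roughly mirrors start_io(): before arming the next socket read,
  // re-derive the group from thread_id and migrate if it changed.
  void on_ready_to_read(Conn &c)
  {
    if (c.fix_group)
    {
      c.fix_group= false;
      std::size_t new_group= c.thread_id % group_count;
      if (new_group != c.group)
        c.group= new_group;   // the real code calls change_group() here
    }
    // ... start the asynchronous read for this connection ...
  }
};

Deferring the migration to the connection's own next I/O event presumably avoids having the resizing thread move connections between groups while they may be in the middle of socket I/O.
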
@@ -1375,6 +1375,12 @@ static void set_next_timeout_check(ulonglong abstime)
   DBUG_VOID_RETURN;
 }
+static size_t get_group_id(my_thread_id tid)
+{
+  return size_t(tid % group_count);
+}
 TP_connection_generic::TP_connection_generic(CONNECT *c):
   TP_connection(c),
   thread_group(0),
@@ -1382,10 +1388,14 @@ TP_connection_generic::TP_connection_generic(CONNECT *c):
   prev_in_queue(0),
   abs_wait_timeout(ULONGLONG_MAX),
   bound_to_poll_descriptor(false),
-  waiting(false)
+  waiting(false),
+  fix_group(false)
 #ifdef HAVE_IOCP
 , overlapped()
 #endif
+#ifdef _WIN32
+, vio_type(c->vio_type)
+#endif
 {
   DBUG_ASSERT(c->vio_type != VIO_CLOSED);
@@ -1398,8 +1408,7 @@ TP_connection_generic::TP_connection_generic(CONNECT *c):
   /* Assign connection to a group. */
   thread_group_t *group=
-    &all_groups[c->thread_id%group_count];
+    &all_groups[get_group_id(c->thread_id)];
   thread_group=group;
   mysql_mutex_lock(&group->mutex);
@@ -1473,6 +1482,7 @@ static int change_group(TP_connection_generic *c,
   return ret;
 }
 int TP_connection_generic::start_io()
 {
   /*
@@ -1485,14 +1495,17 @@ int TP_connection_generic::start_io()
     So we recalculate in which group the connection should be, based
     on thread_id and current group count, and migrate if necessary.
   */
-  thread_group_t *group =
-    &all_groups[thd->thread_id%group_count];
+  if (fix_group)
+  {
+    fix_group = false;
+    thread_group_t *new_group= &all_groups[get_group_id(thd->thread_id)];
-  if (group != thread_group)
+  if (new_group != thread_group)
   {
-    if (change_group(this, thread_group, group))
+    if (change_group(this, thread_group, new_group))
       return -1;
   }
+  }
   /*
     Bind to poll descriptor if not yet done.
@@ -1615,6 +1628,14 @@ TP_pool_generic::~TP_pool_generic()
 }
+static my_bool thd_reset_group(THD* thd, void*)
+{
+  auto c= (TP_connection_generic*)thd->event_scheduler.data;
+  if(c)
+    c->fix_group= true;
+  return FALSE;
+}
 /** Ensure that poll descriptors are created when threadpool_size changes */
 int TP_pool_generic::set_pool_size(uint size)
 {
@@ -1641,6 +1662,7 @@ int TP_pool_generic::set_pool_size(uint size)
     }
   }
   group_count= size;
+  server_threads.iterate(thd_reset_group);
   return 0;
 }
@@ -89,6 +89,7 @@ struct TP_connection_generic :public TP_connection
   TP_file_handle fd;
   bool bound_to_poll_descriptor;
   int waiting;
+  bool fix_group;
 #ifdef HAVE_IOCP
   OVERLAPPED overlapped;
 #endif