Commit 2fc13d04 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-19313 Threadpool : provide information schema tables for internals of generic threadpool

Added thread_pool_groups, thread_pool_queues, thread_pool_waits and
thread_pool_stats tables to information_schema.
parent 5f18bd3a
...@@ -164,6 +164,7 @@ IF ((CMAKE_SYSTEM_NAME MATCHES "Linux" OR ...@@ -164,6 +164,7 @@ IF ((CMAKE_SYSTEM_NAME MATCHES "Linux" OR
ENDIF() ENDIF()
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_generic.cc) SET(SQL_SOURCE ${SQL_SOURCE} threadpool_generic.cc)
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_common.cc) SET(SQL_SOURCE ${SQL_SOURCE} threadpool_common.cc)
MYSQL_ADD_PLUGIN(thread_pool_info thread_pool_info.cc DEFAULT STATIC_ONLY NOT_EMBEDDED)
ENDIF() ENDIF()
IF(WIN32) IF(WIN32)
......
...@@ -7461,7 +7461,7 @@ static int debug_status_func(THD *thd, SHOW_VAR *var, char *buff, ...@@ -7461,7 +7461,7 @@ static int debug_status_func(THD *thd, SHOW_VAR *var, char *buff,
#endif #endif
#ifdef HAVE_POOL_OF_THREADS #ifdef HAVE_POOL_OF_THREADS
int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff, static int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff,
enum enum_var_type scope) enum enum_var_type scope)
{ {
var->type= SHOW_INT; var->type= SHOW_INT;
......
/* Copyright(C) 2019 MariaDB
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include <mysql_version.h>
#include <mysql/plugin.h>
#include <my_global.h>
#include <sql_class.h>
#include <table.h>
#include <mysql/plugin.h>
#include <sql_show.h>
#include <threadpool_generic.h>
static ST_FIELD_INFO groups_fields_info[] =
{
{"GROUP_ID", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
{"CONNECTIONS", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
{"THREADS", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
{"ACTIVE_THREADS",6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
{"STANDBY_THREADS", 6, MYSQL_TYPE_LONG, 0, 0, 0,0},
{"QUEUE_LENGTH", 6, MYSQL_TYPE_LONG, 0,0, 0, 0},
{"HAS_LISTENER",1,MYSQL_TYPE_TINY, 0, 0, 0, 0},
{"IS_STALLED",1,MYSQL_TYPE_TINY, 0, 0, 0, 0},
{0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0}
};
static int groups_fill_table(THD* thd, TABLE_LIST* tables, COND*)
{
if (!all_groups)
return 0;
TABLE* table = tables->table;
for (uint i = 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
{
thread_group_t* group = &all_groups[i];
/* ID */
table->field[0]->store(i, true);
/* CONNECTION_COUNT */
table->field[1]->store(group->connection_count, true);
/* THREAD_COUNT */
table->field[2]->store(group->thread_count, true);
/* ACTIVE_THREAD_COUNT */
table->field[3]->store(group->active_thread_count, true);
/* STANDBY_THREAD_COUNT */
table->field[4]->store(group->waiting_threads.elements(), true);
/* QUEUE LENGTH */
uint queue_len = group->queues[TP_PRIORITY_LOW].elements()
+ group->queues[TP_PRIORITY_HIGH].elements();
table->field[5]->store(queue_len, true);
/* HAS_LISTENER */
table->field[6]->store((longlong)(group->listener != 0), true);
/* IS_STALLED */
table->field[7]->store(group->stalled, true);
if (schema_table_store_record(thd, table))
return 1;
}
return 0;
}
static int groups_init(void* p)
{
ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
schema->fields_info = groups_fields_info;
schema->fill_table = groups_fill_table;
return 0;
}
static ST_FIELD_INFO queues_field_info[] =
{
{"GROUP_ID", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
{"POSITION",6,MYSQL_TYPE_LONG, 0, 0, 0, 0},
{"PRIORITY", 1, MYSQL_TYPE_LONG, 0, 0, 0, 0},
{"CONNECTION_ID", 19, MYSQL_TYPE_LONGLONG, MY_I_S_UNSIGNED, 0, 0, 0},
{"QUEUEING_TIME_MICROSECONDS", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
{0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0}
};
typedef connection_queue_t::Iterator connection_queue_iterator;
static int queues_fill_table(THD* thd, TABLE_LIST* tables, COND*)
{
if (!all_groups)
return 0;
TABLE* table = tables->table;
for (uint group_id = 0;
group_id < threadpool_max_size && all_groups[group_id].pollfd != INVALID_HANDLE_VALUE;
group_id++)
{
thread_group_t* group = &all_groups[group_id];
mysql_mutex_lock(&group->mutex);
bool err = false;
int pos = 0;
ulonglong now = microsecond_interval_timer();
for (uint prio = 0; prio < NQUEUES && !err; prio++)
{
connection_queue_iterator it(group->queues[prio]);
TP_connection_generic* c;
while ((c = it++) != 0)
{
/* GROUP_ID */
table->field[0]->store(group_id, true);
/* POSITION */
table->field[1]->store(pos++, true);
/* PRIORITY */
table->field[2]->store(prio, true);
/* CONNECTION_ID */
table->field[3]->store(c->thd->thread_id, true);
/* QUEUEING_TIME */
table->field[4]->store(now - c->enqueue_time, true);
err = schema_table_store_record(thd, table);
if (err)
break;
}
}
mysql_mutex_unlock(&group->mutex);
if (err)
return 1;
}
return 0;
}
static int queues_init(void* p)
{
ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
schema->fields_info = queues_field_info;
schema->fill_table = queues_fill_table;
return 0;
}
static ST_FIELD_INFO stats_fields_info[] =
{
{"GROUP_ID", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
{"THREAD_CREATIONS",19,MYSQL_TYPE_LONGLONG,0,0, 0,0},
{"THREAD_CREATIONS_DUE_TO_STALL",19,MYSQL_TYPE_LONGLONG,0,0, 0,0},
{"WAKES",19,MYSQL_TYPE_LONGLONG,0,0, 0,0},
{"WAKES_DUE_TO_STALL",19,MYSQL_TYPE_LONGLONG,0,0, 0,0},
{"THROTTLES",19,MYSQL_TYPE_LONGLONG,0,0, 0,0},
{"STALLS",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0},
{"POLLS_BY_LISTENER",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0},
{"POLLS_BY_WORKER",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0},
{"DEQUEUES_BY_LISTENER",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0},
{"DEQUEUES_BY_WORKER",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0},
{0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0}
};
static int stats_fill_table(THD* thd, TABLE_LIST* tables, COND*)
{
if (!all_groups)
return 0;
TABLE* table = tables->table;
for (uint i = 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
{
table->field[0]->store(i, true);
thread_group_t* group = &all_groups[i];
mysql_mutex_lock(&group->mutex);
thread_group_counters_t* counters = &group->counters;
table->field[1]->store(counters->thread_creations, true);
table->field[2]->store(counters->thread_creations_due_to_stall, true);
table->field[3]->store(counters->wakes, true);
table->field[4]->store(counters->wakes_due_to_stall, true);
table->field[5]->store(counters->throttles, true);
table->field[6]->store(counters->stalls, true);
table->field[7]->store(counters->polls_by_listener, true);
table->field[8]->store(counters->polls_by_worker, true);
table->field[9]->store(counters->dequeues_by_listener, true);
table->field[10]->store(counters->dequeues_by_worker, true);
mysql_mutex_unlock(&group->mutex);
if (schema_table_store_record(thd, table))
return 1;
}
return 0;
}
static int stats_reset_table()
{
for (uint i = 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
{
thread_group_t* group = &all_groups[i];
mysql_mutex_lock(&group->mutex);
memset(&group->counters, 0, sizeof(group->counters));
mysql_mutex_unlock(&group->mutex);
}
return 0;
}
static int stats_init(void* p)
{
ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
schema->fields_info = stats_fields_info;
schema->fill_table = stats_fill_table;
schema->reset_table = stats_reset_table;
return 0;
}
static ST_FIELD_INFO waits_fields_info[] =
{
{"REASON", 16, MYSQL_TYPE_STRING, 0, 0, 0, 0},
{"COUNT",19,MYSQL_TYPE_LONGLONG,0,0, 0,0},
{0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0}
};
/* See thd_wait_type enum for explanation*/
static const LEX_CSTRING wait_reasons[THD_WAIT_LAST] =
{
{STRING_WITH_LEN("UNKNOWN")},
{STRING_WITH_LEN("SLEEP")},
{STRING_WITH_LEN("DISKIO")},
{STRING_WITH_LEN("ROW_LOCK")},
{STRING_WITH_LEN("GLOBAL_LOCK")},
{STRING_WITH_LEN("META_DATA_LOCK")},
{STRING_WITH_LEN("TABLE_LOCK")},
{STRING_WITH_LEN("USER_LOCK")},
{STRING_WITH_LEN("BINLOG")},
{STRING_WITH_LEN("GROUP_COMMIT")},
{STRING_WITH_LEN("SYNC")},
{STRING_WITH_LEN("NET")}
};
extern Atomic_counter<unsigned long long> tp_waits[THD_WAIT_LAST];
static int waits_fill_table(THD* thd, TABLE_LIST* tables, COND*)
{
if (!all_groups)
return 0;
TABLE* table = tables->table;
for (auto i = 0; i < THD_WAIT_LAST; i++)
{
table->field[0]->store(wait_reasons[i].str, wait_reasons[i].length, system_charset_info);
table->field[1]->store(tp_waits[i], true);
if (schema_table_store_record(thd, table))
return 1;
}
return 0;
}
static int waits_reset_table()
{
for (auto i = 0; i < THD_WAIT_LAST; i++)
tp_waits[i] = 0;
return 0;
}
static int waits_init(void* p)
{
ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
schema->fields_info = waits_fields_info;
schema->fill_table = waits_fill_table;
schema->reset_table = waits_reset_table;
return 0;
}
static struct st_mysql_information_schema plugin_descriptor =
{ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };
maria_declare_plugin(thread_pool_info)
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&plugin_descriptor,
"THREAD_POOL_GROUPS",
"Vladislav Vaintroub",
"Provides information about threadpool groups.",
PLUGIN_LICENSE_GPL,
groups_init,
0,
0x0100,
NULL,
NULL,
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
},
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&plugin_descriptor,
"THREAD_POOL_QUEUES",
"Vladislav Vaintroub",
"Provides information about threadpool queues.",
PLUGIN_LICENSE_GPL,
queues_init,
0,
0x0100,
NULL,
NULL,
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
},
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&plugin_descriptor,
"THREAD_POOL_STATS",
"Vladislav Vaintroub",
"Provides performance counter information for threadpool.",
PLUGIN_LICENSE_GPL,
stats_init,
0,
0x0100,
NULL,
NULL,
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
},
{
MYSQL_INFORMATION_SCHEMA_PLUGIN,
&plugin_descriptor,
"THREAD_POOL_WAITS",
"Vladislav Vaintroub",
"Provides wait counters for threadpool.",
PLUGIN_LICENSE_GPL,
waits_init,
0,
0x0100,
NULL,
NULL,
"1.0",
MariaDB_PLUGIN_MATURITY_STABLE
}
maria_declare_plugin_end;
...@@ -64,8 +64,6 @@ extern int tp_get_thread_count(); ...@@ -64,8 +64,6 @@ extern int tp_get_thread_count();
/* Activate threadpool scheduler */ /* Activate threadpool scheduler */
extern void tp_scheduler(void); extern void tp_scheduler(void);
extern int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff,
enum enum_var_type scope);
enum TP_PRIORITY { enum TP_PRIORITY {
TP_PRIORITY_HIGH, TP_PRIORITY_HIGH,
...@@ -88,6 +86,8 @@ enum TP_STATE ...@@ -88,6 +86,8 @@ enum TP_STATE
inside threadpool_win.cc and threadpool_unix.cc inside threadpool_win.cc and threadpool_unix.cc
*/ */
class CONNECT;
struct TP_connection struct TP_connection
{ {
THD* thd; THD* thd;
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
#include <sql_audit.h> #include <sql_audit.h>
#include <debug_sync.h> #include <debug_sync.h>
#include <threadpool.h> #include <threadpool.h>
#include <my_counter.h>
/* Threadpool parameters */ /* Threadpool parameters */
...@@ -153,9 +153,8 @@ static TP_PRIORITY get_priority(TP_connection *c) ...@@ -153,9 +153,8 @@ static TP_PRIORITY get_priority(TP_connection *c)
DBUG_ASSERT(c->thd == current_thd); DBUG_ASSERT(c->thd == current_thd);
TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority; TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority;
if (prio == TP_PRIORITY_AUTO) if (prio == TP_PRIORITY_AUTO)
{ prio= c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
return c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
}
return prio; return prio;
} }
...@@ -463,12 +462,17 @@ void tp_timeout_handler(TP_connection *c) ...@@ -463,12 +462,17 @@ void tp_timeout_handler(TP_connection *c)
mysql_mutex_unlock(&thd->LOCK_thd_kill); mysql_mutex_unlock(&thd->LOCK_thd_kill);
} }
MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) Atomic_counter<unsigned long long> tp_waits[THD_WAIT_LAST];
static void tp_wait_begin(THD *thd, int type) static void tp_wait_begin(THD *thd, int type)
{ {
TP_connection *c = get_TP_connection(thd); TP_connection *c = get_TP_connection(thd);
if (c) if (c)
{
DBUG_ASSERT(type > 0 && type < THD_WAIT_LAST);
tp_waits[type]++;
c->wait_begin(type); c->wait_begin(type);
}
} }
......
...@@ -13,56 +13,27 @@ ...@@ -13,56 +13,27 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
#if (defined HAVE_POOL_OF_THREADS) && !defined(EMBEDDED_LIBRARY)
#include "threadpool_generic.h"
#include "mariadb.h" #include "mariadb.h"
#include <violite.h> #include <violite.h>
#include <sql_priv.h> #include <sql_priv.h>
#include <sql_class.h> #include <sql_class.h>
#include <my_pthread.h> #include <my_pthread.h>
#include <scheduler.h> #include <scheduler.h>
#ifdef HAVE_POOL_OF_THREADS
#ifdef _WIN32
/* AIX may define this, too ?*/
#define HAVE_IOCP
#endif
#ifdef HAVE_IOCP
#define OPTIONAL_IO_POLL_READ_PARAM this
#else
#define OPTIONAL_IO_POLL_READ_PARAM 0
#endif
#ifdef _WIN32
typedef HANDLE TP_file_handle;
#else
typedef int TP_file_handle;
#define INVALID_HANDLE_VALUE -1
#endif
#include <sql_connect.h> #include <sql_connect.h>
#include <mysqld.h> #include <mysqld.h>
#include <debug_sync.h> #include <debug_sync.h>
#include <time.h> #include <time.h>
#include <sql_plist.h> #include <sql_plist.h>
#include <threadpool.h> #include <threadpool.h>
#include <time.h>
#ifdef __linux__
#include <sys/epoll.h>
typedef struct epoll_event native_event;
#elif defined(HAVE_KQUEUE)
#include <sys/event.h>
typedef struct kevent native_event;
#elif defined (__sun)
#include <port.h>
typedef port_event_t native_event;
#elif defined (HAVE_IOCP)
typedef OVERLAPPED_ENTRY native_event;
#else
#error threadpool is not available on this platform
#endif
#ifdef HAVE_IOCP
#define OPTIONAL_IO_POLL_READ_PARAM this
#else
#define OPTIONAL_IO_POLL_READ_PARAM 0
#endif
static void io_poll_close(TP_file_handle fd) static void io_poll_close(TP_file_handle fd)
{ {
...@@ -119,86 +90,7 @@ static PSI_thread_info thread_list[] = ...@@ -119,86 +90,7 @@ static PSI_thread_info thread_list[] =
#define PSI_register(X) /* no-op */ #define PSI_register(X) /* no-op */
#endif #endif
thread_group_t *all_groups;
struct thread_group_t;
/* Per-thread structure for workers */
struct worker_thread_t
{
ulonglong event_count; /* number of request handled by this thread */
thread_group_t* thread_group;
worker_thread_t *next_in_list;
worker_thread_t **prev_in_list;
mysql_cond_t cond;
bool woken;
};
typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
&worker_thread_t::next_in_list,
&worker_thread_t::prev_in_list>
>
worker_list_t;
struct TP_connection_generic:public TP_connection
{
TP_connection_generic(CONNECT *c);
~TP_connection_generic();
virtual int init(){ return 0; };
virtual void set_io_timeout(int sec);
virtual int start_io();
virtual void wait_begin(int type);
virtual void wait_end();
thread_group_t *thread_group;
TP_connection_generic *next_in_queue;
TP_connection_generic **prev_in_queue;
ulonglong abs_wait_timeout;
ulonglong dequeue_time;
TP_file_handle fd;
bool bound_to_poll_descriptor;
int waiting;
#ifdef HAVE_IOCP
OVERLAPPED overlapped;
#endif
#ifdef _WIN32
enum_vio_type vio_type;
#endif
};
typedef I_P_List<TP_connection_generic,
I_P_List_adapter<TP_connection_generic,
&TP_connection_generic::next_in_queue,
&TP_connection_generic::prev_in_queue>,
I_P_List_null_counter,
I_P_List_fast_push_back<TP_connection_generic> >
connection_queue_t;
const int NQUEUES=2; /* We have high and low priority queues*/
struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
{
mysql_mutex_t mutex;
connection_queue_t queues[NQUEUES];
worker_list_t waiting_threads;
worker_thread_t *listener;
pthread_attr_t *pthread_attr;
TP_file_handle pollfd;
int thread_count;
int active_thread_count;
int connection_count;
/* Stats for the deadlock detection timer routine.*/
int io_event_count;
int queue_event_count;
ulonglong last_thread_creation_time;
int shutdown_pipe[2];
bool shutdown;
bool stalled;
};
static thread_group_t *all_groups;
static uint group_count; static uint group_count;
static int32 shutdown_group_count; static int32 shutdown_group_count;
...@@ -224,9 +116,9 @@ static pool_timer_t pool_timer; ...@@ -224,9 +116,9 @@ static pool_timer_t pool_timer;
static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection); static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection);
static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt); static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt);
static int wake_thread(thread_group_t *thread_group); static int wake_thread(thread_group_t *thread_group,bool due_to_stall);
static int wake_or_create_thread(thread_group_t *thread_group); static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall=false);
static int create_worker(thread_group_t *thread_group); static int create_worker(thread_group_t *thread_group, bool due_to_stall);
static void *worker_main(void *param); static void *worker_main(void *param);
static void check_stall(thread_group_t *thread_group); static void check_stall(thread_group_t *thread_group);
static void set_next_timeout_check(ulonglong abstime); static void set_next_timeout_check(ulonglong abstime);
...@@ -563,11 +455,11 @@ static void queue_init(thread_group_t *thread_group) ...@@ -563,11 +455,11 @@ static void queue_init(thread_group_t *thread_group)
static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt) static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt)
{ {
ulonglong now= pool_timer.current_microtime; ulonglong now= threadpool_exact_stats?microsecond_interval_timer():pool_timer.current_microtime;
for(int i=0; i < cnt; i++) for(int i=0; i < cnt; i++)
{ {
TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]); TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]);
c->dequeue_time= now; c->enqueue_time= now;
thread_group->queues[c->priority].push_back(c); thread_group->queues[c->priority].push_back(c);
} }
} }
...@@ -681,7 +573,7 @@ void check_stall(thread_group_t *thread_group) ...@@ -681,7 +573,7 @@ void check_stall(thread_group_t *thread_group)
for (;;) for (;;)
{ {
c= thread_group->queues[TP_PRIORITY_LOW].front(); c= thread_group->queues[TP_PRIORITY_LOW].front();
if (c && pool_timer.current_microtime - c->dequeue_time > 1000ULL * threadpool_prio_kickup_timer) if (c && pool_timer.current_microtime - c->enqueue_time > 1000ULL * threadpool_prio_kickup_timer)
{ {
thread_group->queues[TP_PRIORITY_LOW].remove(c); thread_group->queues[TP_PRIORITY_LOW].remove(c);
thread_group->queues[TP_PRIORITY_HIGH].push_back(c); thread_group->queues[TP_PRIORITY_HIGH].push_back(c);
...@@ -698,7 +590,7 @@ void check_stall(thread_group_t *thread_group) ...@@ -698,7 +590,7 @@ void check_stall(thread_group_t *thread_group)
*/ */
if (!thread_group->listener && !thread_group->io_event_count) if (!thread_group->listener && !thread_group->io_event_count)
{ {
wake_or_create_thread(thread_group); wake_or_create_thread(thread_group, true);
mysql_mutex_unlock(&thread_group->mutex); mysql_mutex_unlock(&thread_group->mutex);
return; return;
} }
...@@ -735,7 +627,8 @@ void check_stall(thread_group_t *thread_group) ...@@ -735,7 +627,8 @@ void check_stall(thread_group_t *thread_group)
if (!is_queue_empty(thread_group) && !thread_group->queue_event_count) if (!is_queue_empty(thread_group) && !thread_group->queue_event_count)
{ {
thread_group->stalled= true; thread_group->stalled= true;
wake_or_create_thread(thread_group); TP_INCREMENT_GROUP_COUNTER(thread_group,stalls);
wake_or_create_thread(thread_group,true);
} }
/* Reset queue event count */ /* Reset queue event count */
...@@ -790,13 +683,13 @@ static TP_connection_generic * listener(worker_thread_t *current_thread, ...@@ -790,13 +683,13 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
break; break;
cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1); cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_listener);
if (cnt <=0) if (cnt <=0)
{ {
DBUG_ASSERT(thread_group->shutdown); DBUG_ASSERT(thread_group->shutdown);
break; break;
} }
mysql_mutex_lock(&thread_group->mutex); mysql_mutex_lock(&thread_group->mutex);
if (thread_group->shutdown) if (thread_group->shutdown)
...@@ -864,7 +757,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread, ...@@ -864,7 +757,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
if(thread_group->active_thread_count==0) if(thread_group->active_thread_count==0)
{ {
/* We added some work items to queue, now wake a worker. */ /* We added some work items to queue, now wake a worker. */
if(wake_thread(thread_group)) if(wake_thread(thread_group, false))
{ {
/* /*
Wake failed, hence groups has no idle threads. Now check if there are Wake failed, hence groups has no idle threads. Now check if there are
...@@ -882,7 +775,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread, ...@@ -882,7 +775,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
create thread, but waiting for timer would be an inefficient and create thread, but waiting for timer would be an inefficient and
pointless delay. pointless delay.
*/ */
create_worker(thread_group); create_worker(thread_group, false);
} }
} }
} }
...@@ -919,7 +812,7 @@ static void add_thread_count(thread_group_t *thread_group, int32 count) ...@@ -919,7 +812,7 @@ static void add_thread_count(thread_group_t *thread_group, int32 count)
per group to prevent deadlocks (one listener + one worker) per group to prevent deadlocks (one listener + one worker)
*/ */
static int create_worker(thread_group_t *thread_group) static int create_worker(thread_group_t *thread_group, bool due_to_stall)
{ {
pthread_t thread_id; pthread_t thread_id;
bool max_threads_reached= false; bool max_threads_reached= false;
...@@ -942,6 +835,11 @@ static int create_worker(thread_group_t *thread_group) ...@@ -942,6 +835,11 @@ static int create_worker(thread_group_t *thread_group)
thread_group->last_thread_creation_time=microsecond_interval_timer(); thread_group->last_thread_creation_time=microsecond_interval_timer();
statistic_increment(thread_created,&LOCK_status); statistic_increment(thread_created,&LOCK_status);
add_thread_count(thread_group, 1); add_thread_count(thread_group, 1);
TP_INCREMENT_GROUP_COUNTER(thread_group,thread_creations);
if(due_to_stall)
{
TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations_due_to_stall);
}
} }
else else
{ {
...@@ -993,15 +891,17 @@ static ulonglong microsecond_throttling_interval(thread_group_t *thread_group) ...@@ -993,15 +891,17 @@ static ulonglong microsecond_throttling_interval(thread_group_t *thread_group)
Worker creation is throttled, so we avoid too many threads Worker creation is throttled, so we avoid too many threads
to be created during the short time. to be created during the short time.
*/ */
static int wake_or_create_thread(thread_group_t *thread_group) static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall)
{ {
DBUG_ENTER("wake_or_create_thread"); DBUG_ENTER("wake_or_create_thread");
if (thread_group->shutdown) if (thread_group->shutdown)
DBUG_RETURN(0); DBUG_RETURN(0);
if (wake_thread(thread_group) == 0) if (wake_thread(thread_group, due_to_stall) == 0)
{
DBUG_RETURN(0); DBUG_RETURN(0);
}
if (thread_group->thread_count > thread_group->connection_count) if (thread_group->thread_count > thread_group->connection_count)
DBUG_RETURN(-1); DBUG_RETURN(-1);
...@@ -1015,7 +915,7 @@ static int wake_or_create_thread(thread_group_t *thread_group) ...@@ -1015,7 +915,7 @@ static int wake_or_create_thread(thread_group_t *thread_group)
idle thread to wakeup. Smells like a potential deadlock or very slowly idle thread to wakeup. Smells like a potential deadlock or very slowly
executing requests, e.g sleeps or user locks. executing requests, e.g sleeps or user locks.
*/ */
DBUG_RETURN(create_worker(thread_group)); DBUG_RETURN(create_worker(thread_group, due_to_stall));
} }
ulonglong now = microsecond_interval_timer(); ulonglong now = microsecond_interval_timer();
...@@ -1026,9 +926,10 @@ static int wake_or_create_thread(thread_group_t *thread_group) ...@@ -1026,9 +926,10 @@ static int wake_or_create_thread(thread_group_t *thread_group)
if (time_since_last_thread_created > if (time_since_last_thread_created >
microsecond_throttling_interval(thread_group)) microsecond_throttling_interval(thread_group))
{ {
DBUG_RETURN(create_worker(thread_group)); DBUG_RETURN(create_worker(thread_group, due_to_stall));
} }
TP_INCREMENT_GROUP_COUNTER(thread_group,throttles);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
...@@ -1074,7 +975,7 @@ void thread_group_destroy(thread_group_t *thread_group) ...@@ -1074,7 +975,7 @@ void thread_group_destroy(thread_group_t *thread_group)
Wake sleeping thread from waiting list Wake sleeping thread from waiting list
*/ */
static int wake_thread(thread_group_t *thread_group) static int wake_thread(thread_group_t *thread_group,bool due_to_stall)
{ {
DBUG_ENTER("wake_thread"); DBUG_ENTER("wake_thread");
worker_thread_t *thread = thread_group->waiting_threads.front(); worker_thread_t *thread = thread_group->waiting_threads.front();
...@@ -1083,6 +984,11 @@ static int wake_thread(thread_group_t *thread_group) ...@@ -1083,6 +984,11 @@ static int wake_thread(thread_group_t *thread_group)
thread->woken= true; thread->woken= true;
thread_group->waiting_threads.remove(thread); thread_group->waiting_threads.remove(thread);
mysql_cond_signal(&thread->cond); mysql_cond_signal(&thread->cond);
TP_INCREMENT_GROUP_COUNTER(thread_group, wakes);
if (due_to_stall)
{
TP_INCREMENT_GROUP_COUNTER(thread_group, wakes_due_to_stall);
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */ DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
...@@ -1140,7 +1046,7 @@ static void thread_group_close(thread_group_t *thread_group) ...@@ -1140,7 +1046,7 @@ static void thread_group_close(thread_group_t *thread_group)
wake_listener(thread_group); wake_listener(thread_group);
/* Wake all workers. */ /* Wake all workers. */
while(wake_thread(thread_group) == 0) while(wake_thread(thread_group, false) == 0)
{ {
} }
...@@ -1224,7 +1130,10 @@ TP_connection_generic *get_event(worker_thread_t *current_thread, ...@@ -1224,7 +1130,10 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
{ {
connection = queue_get(thread_group); connection = queue_get(thread_group);
if(connection) if(connection)
{
TP_INCREMENT_GROUP_COUNTER(thread_group,dequeues_by_worker);
break; break;
}
} }
/* If there is currently no listener in the group, become one. */ /* If there is currently no listener in the group, become one. */
...@@ -1235,7 +1144,10 @@ TP_connection_generic *get_event(worker_thread_t *current_thread, ...@@ -1235,7 +1144,10 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
mysql_mutex_unlock(&thread_group->mutex); mysql_mutex_unlock(&thread_group->mutex);
connection = listener(current_thread, thread_group); connection = listener(current_thread, thread_group);
if (connection)
{
TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues_by_listener);
}
mysql_mutex_lock(&thread_group->mutex); mysql_mutex_lock(&thread_group->mutex);
thread_group->active_thread_count++; thread_group->active_thread_count++;
/* There is no listener anymore, it just returned. */ /* There is no listener anymore, it just returned. */
...@@ -1251,9 +1163,9 @@ TP_connection_generic *get_event(worker_thread_t *current_thread, ...@@ -1251,9 +1163,9 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
*/ */
if (!oversubscribed) if (!oversubscribed)
{ {
native_event ev[MAX_EVENTS]; native_event ev[MAX_EVENTS];
int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0); int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0);
TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_worker);
if (cnt > 0) if (cnt > 0)
{ {
queue_put(thread_group, ev, cnt); queue_put(thread_group, ev, cnt);
...@@ -1300,6 +1212,7 @@ TP_connection_generic *get_event(worker_thread_t *current_thread, ...@@ -1300,6 +1212,7 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
} }
thread_group->stalled= false; thread_group->stalled= false;
mysql_mutex_unlock(&thread_group->mutex); mysql_mutex_unlock(&thread_group->mutex);
DBUG_RETURN(connection); DBUG_RETURN(connection);
...@@ -1515,7 +1428,7 @@ static int change_group(TP_connection_generic *c, ...@@ -1515,7 +1428,7 @@ static int change_group(TP_connection_generic *c,
new_group->connection_count++; new_group->connection_count++;
/* Ensure that there is a listener in the new group. */ /* Ensure that there is a listener in the new group. */
if (!new_group->thread_count) if (!new_group->thread_count)
ret= create_worker(new_group); ret= create_worker(new_group, false);
mysql_mutex_unlock(&new_group->mutex); mysql_mutex_unlock(&new_group->mutex);
return ret; return ret;
} }
...@@ -1775,4 +1688,6 @@ static void print_pool_blocked_message(bool max_threads_reached) ...@@ -1775,4 +1688,6 @@ static void print_pool_blocked_message(bool max_threads_reached)
} }
} }
#endif /* HAVE_POOL_OF_THREADS */ #endif /* HAVE_POOL_OF_THREADS */
/* Copyright(C) 2019 MariaDB
*
* This program is free software; you can redistribute itand /or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; version 2 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#if defined (HAVE_POOL_OF_THREADS)
#include <my_global.h>
#include <sql_plist.h>
#include <my_pthread.h>
#include <threadpool.h>
#include <mysqld.h>
#include <violite.h>
#ifdef _WIN32
#include <windows.h>
/* AIX may define this, too ?*/
#define HAVE_IOCP
#endif
#ifdef _WIN32
typedef HANDLE TP_file_handle;
#else
typedef int TP_file_handle;
#define INVALID_HANDLE_VALUE -1
#endif
#ifdef __linux__
#include <sys/epoll.h>
typedef struct epoll_event native_event;
#elif defined(HAVE_KQUEUE)
#include <sys/event.h>
typedef struct kevent native_event;
#elif defined (__sun)
#include <port.h>
typedef port_event_t native_event;
#elif defined (HAVE_IOCP)
typedef OVERLAPPED_ENTRY native_event;
#else
#error threadpool is not available on this platform
#endif
struct thread_group_t;
/* Per-thread structure for workers */
struct worker_thread_t
{
ulonglong event_count; /* number of request handled by this thread */
thread_group_t* thread_group;
worker_thread_t* next_in_list;
worker_thread_t** prev_in_list;
mysql_cond_t cond;
bool woken;
};
typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
& worker_thread_t::next_in_list,
& worker_thread_t::prev_in_list>,
I_P_List_counter
>
worker_list_t;
struct TP_connection_generic :public TP_connection
{
TP_connection_generic(CONNECT* c);
~TP_connection_generic();
virtual int init() { return 0; };
virtual void set_io_timeout(int sec);
virtual int start_io();
virtual void wait_begin(int type);
virtual void wait_end();
thread_group_t* thread_group;
TP_connection_generic* next_in_queue;
TP_connection_generic** prev_in_queue;
ulonglong abs_wait_timeout;
ulonglong enqueue_time;
TP_file_handle fd;
bool bound_to_poll_descriptor;
int waiting;
#ifdef HAVE_IOCP
OVERLAPPED overlapped;
#endif
#ifdef _WIN32
enum_vio_type vio_type;
#endif
};
typedef I_P_List<TP_connection_generic,
I_P_List_adapter<TP_connection_generic,
& TP_connection_generic::next_in_queue,
& TP_connection_generic::prev_in_queue>,
I_P_List_counter,
I_P_List_fast_push_back<TP_connection_generic> >
connection_queue_t;
const int NQUEUES = 2; /* We have high and low priority queues*/
struct thread_group_counters_t
{
ulonglong thread_creations;
ulonglong thread_creations_due_to_stall;
ulonglong wakes;
ulonglong wakes_due_to_stall;
ulonglong throttles;
ulonglong stalls;
ulonglong dequeues_by_worker;
ulonglong dequeues_by_listener;
ulonglong polls_by_listener;
ulonglong polls_by_worker;
};
struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
{
mysql_mutex_t mutex;
connection_queue_t queues[NQUEUES];
worker_list_t waiting_threads;
worker_thread_t* listener;
pthread_attr_t* pthread_attr;
TP_file_handle pollfd;
int thread_count;
int active_thread_count;
int connection_count;
/* Stats for the deadlock detection timer routine.*/
int io_event_count;
int queue_event_count;
ulonglong last_thread_creation_time;
int shutdown_pipe[2];
bool shutdown;
bool stalled;
thread_group_counters_t counters;
};
#define TP_INCREMENT_GROUP_COUNTER(group,var) group->counters.var++;
extern thread_group_t* all_groups;
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment