Commit 66de4fef authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-16264 - some improvements

- wait notification, tpool_wait_begin()/tpool_wait_end() - to notify the
thread pool that the current thread is going to wait

Use it to wait for IOs to complete and also when purge waits for workers.
parent d3b2625b
...@@ -84,10 +84,12 @@ class io_slots ...@@ -84,10 +84,12 @@ class io_slots
private: private:
tpool::cache<tpool::aiocb> m_cache; tpool::cache<tpool::aiocb> m_cache;
tpool::task_group m_group; tpool::task_group m_group;
int m_max_aio;
public: public:
io_slots(int max_submitted_io, int max_callback_concurrency) : io_slots(int max_submitted_io, int max_callback_concurrency) :
m_cache(max_submitted_io), m_cache(max_submitted_io),
m_group(max_callback_concurrency) m_group(max_callback_concurrency),
m_max_aio(max_submitted_io)
{ {
} }
/* Get cached AIO control block */ /* Get cached AIO control block */
...@@ -112,6 +114,11 @@ class io_slots ...@@ -112,6 +114,11 @@ class io_slots
m_cache.wait(); m_cache.wait();
} }
/** @return number of AIO requests that were submitted but
have not yet completed. Slots missing from the free-list
cache are exactly the ones still in flight. */
size_t pending_io_count()
{
  /* static_cast instead of C-style cast; m_max_aio is a
  non-negative int configured at construction. */
  return static_cast<size_t>(m_max_aio) - m_cache.size();
}
tpool::task_group* get_task_group() tpool::task_group* get_task_group()
{ {
return &m_group; return &m_group;
...@@ -4058,7 +4065,12 @@ void os_aio_free() ...@@ -4058,7 +4065,12 @@ void os_aio_free()
be other, synchronous, pending writes. */ be other, synchronous, pending writes. */
void os_aio_wait_until_no_pending_writes() void os_aio_wait_until_no_pending_writes()
{ {
if (write_slots->pending_io_count())
{
tpool::tpool_wait_begin();
write_slots->wait(); write_slots->wait();
tpool::tpool_wait_end();
}
} }
......
...@@ -1239,7 +1239,13 @@ extern tpool::waitable_task purge_worker_task; ...@@ -1239,7 +1239,13 @@ extern tpool::waitable_task purge_worker_task;
/** Wait for pending purge jobs to complete. */ /** Wait for pending purge jobs to complete. */
static void trx_purge_wait_for_workers_to_complete() static void trx_purge_wait_for_workers_to_complete()
{ {
if (purge_worker_task.get_ref_count())
{
tpool::tpool_wait_begin();
purge_worker_task.wait(); purge_worker_task.wait();
tpool::tpool_wait_end();
}
/* There should be no outstanding tasks as long /* There should be no outstanding tasks as long
as the worker threads are active. */ as the worker threads are active. */
ut_ad(srv_get_task_queue_length() == 0); ut_ad(srv_get_task_queue_length() == 0);
......
...@@ -22,6 +22,7 @@ ADD_LIBRARY(tpool STATIC ...@@ -22,6 +22,7 @@ ADD_LIBRARY(tpool STATIC
tpool_generic.cc tpool_generic.cc
task_group.cc task_group.cc
task.cc task.cc
wait_notification.cc
${EXTRA_SOURCES} ${EXTRA_SOURCES}
) )
......
...@@ -214,6 +214,8 @@ class thread_pool ...@@ -214,6 +214,8 @@ class thread_pool
int bind(native_file_handle &fd) { return m_aio->bind(fd); } int bind(native_file_handle &fd) { return m_aio->bind(fd); }
void unbind(const native_file_handle &fd) { m_aio->unbind(fd); } void unbind(const native_file_handle &fd) { m_aio->unbind(fd); }
int submit_io(aiocb *cb) { return m_aio->submit_io(cb); } int submit_io(aiocb *cb) { return m_aio->submit_io(cb); }
virtual void wait_begin() {};
virtual void wait_end() {};
virtual ~thread_pool() {} virtual ~thread_pool() {}
}; };
const int DEFAULT_MIN_POOL_THREADS= 1; const int DEFAULT_MIN_POOL_THREADS= 1;
...@@ -221,6 +223,8 @@ const int DEFAULT_MAX_POOL_THREADS= 500; ...@@ -221,6 +223,8 @@ const int DEFAULT_MAX_POOL_THREADS= 500;
extern thread_pool * extern thread_pool *
create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS, create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS,
int max_threads= DEFAULT_MAX_POOL_THREADS); int max_threads= DEFAULT_MAX_POOL_THREADS);
extern "C" void tpool_wait_begin();
extern "C" void tpool_wait_end();
#ifdef _WIN32 #ifdef _WIN32
extern thread_pool * extern thread_pool *
create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS, create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS,
......
...@@ -70,8 +70,6 @@ namespace tpool ...@@ -70,8 +70,6 @@ namespace tpool
and also ensures that idle timeout works well. LIFO wakeup order ensures and also ensures that idle timeout works well. LIFO wakeup order ensures
that active threads stay active, and idle ones stay idle. that active threads stay active, and idle ones stay idle.
- to minimize spurious wakeups, some items are not put into the queue. Instead
submit() will pass the data directly to the thread it woke up.
*/ */
/** /**
...@@ -109,7 +107,8 @@ struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data ...@@ -109,7 +107,8 @@ struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
{ {
NONE = 0, NONE = 0,
EXECUTING_TASK = 1, EXECUTING_TASK = 1,
LONG_TASK = 2 LONG_TASK = 2,
WAITING = 4
}; };
int m_state; int m_state;
...@@ -154,6 +153,9 @@ struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data ...@@ -154,6 +153,9 @@ struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
} }
}; };
static thread_local worker_data* tls_worker_data;
class thread_pool_generic : public thread_pool class thread_pool_generic : public thread_pool
{ {
/** Cache for per-worker structures */ /** Cache for per-worker structures */
...@@ -186,6 +188,7 @@ class thread_pool_generic : public thread_pool ...@@ -186,6 +188,7 @@ class thread_pool_generic : public thread_pool
/** Overall number of enqueues*/ /** Overall number of enqueues*/
unsigned long long m_tasks_enqueued; unsigned long long m_tasks_enqueued;
unsigned long long m_group_enqueued;
/** Overall number of dequeued tasks. */ /** Overall number of dequeued tasks. */
unsigned long long m_tasks_dequeued; unsigned long long m_tasks_dequeued;
...@@ -212,6 +215,8 @@ class thread_pool_generic : public thread_pool ...@@ -212,6 +215,8 @@ class thread_pool_generic : public thread_pool
adjusting concurrency */ adjusting concurrency */
int m_long_tasks_count; int m_long_tasks_count;
int m_waiting_task_count;
/** Last time thread was created*/ /** Last time thread was created*/
std::chrono::system_clock::time_point m_last_thread_creation; std::chrono::system_clock::time_point m_last_thread_creation;
...@@ -237,7 +242,8 @@ class thread_pool_generic : public thread_pool ...@@ -237,7 +242,8 @@ class thread_pool_generic : public thread_pool
} }
bool add_thread(); bool add_thread();
bool wake(worker_wake_reason reason, task *t = nullptr); bool wake(worker_wake_reason reason, task *t = nullptr);
void wake_or_create_thread(); void maybe_wake_or_create_thread();
bool too_many_active_threads();
bool get_task(worker_data *thread_var, task **t); bool get_task(worker_data *thread_var, task **t);
bool wait_for_tasks(std::unique_lock<std::mutex> &lk, bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
worker_data *thread_var); worker_data *thread_var);
...@@ -250,6 +256,8 @@ class thread_pool_generic : public thread_pool ...@@ -250,6 +256,8 @@ class thread_pool_generic : public thread_pool
public: public:
thread_pool_generic(int min_threads, int max_threads); thread_pool_generic(int min_threads, int max_threads);
~thread_pool_generic(); ~thread_pool_generic();
void wait_begin() override;
void wait_end() override;
void submit_task(task *task) override; void submit_task(task *task) override;
virtual aio *create_native_aio(int max_io) override virtual aio *create_native_aio(int max_io) override
{ {
...@@ -447,31 +455,24 @@ bool thread_pool_generic::get_task(worker_data *thread_var, task **t) ...@@ -447,31 +455,24 @@ bool thread_pool_generic::get_task(worker_data *thread_var, task **t)
thread_var->m_state = worker_data::NONE; thread_var->m_state = worker_data::NONE;
if (m_task_queue.empty()) while (m_task_queue.empty())
{ {
if (m_in_shutdown) if (m_in_shutdown)
return false; return false;
if (!wait_for_tasks(lk, thread_var)) if (!wait_for_tasks(lk, thread_var))
return false; return false;
if (m_task_queue.empty())
/* Task was handed over directly by signaling thread.*/
if (thread_var->m_wake_reason == WAKE_REASON_TASK)
{ {
*t= thread_var->m_task; m_spurious_wakeups++;
goto end; continue;
} }
if (m_task_queue.empty())
return false;
} }
/* Dequeue from the task queue.*/ /* Dequeue from the task queue.*/
*t= m_task_queue.front(); *t= m_task_queue.front();
m_task_queue.pop(); m_task_queue.pop();
m_tasks_dequeued++; m_tasks_dequeued++;
end:
thread_var->m_state |= worker_data::EXECUTING_TASK; thread_var->m_state |= worker_data::EXECUTING_TASK;
thread_var->m_task_start_time = m_timestamp; thread_var->m_task_start_time = m_timestamp;
return true; return true;
...@@ -491,14 +492,18 @@ void thread_pool_generic::worker_end(worker_data* thread_data) ...@@ -491,14 +492,18 @@ void thread_pool_generic::worker_end(worker_data* thread_data)
} }
} }
extern "C" void set_tls_pool(tpool::thread_pool* pool);
/* The worker get/execute task loop.*/ /* The worker get/execute task loop.*/
void thread_pool_generic::worker_main(worker_data *thread_var) void thread_pool_generic::worker_main(worker_data *thread_var)
{ {
task* task; task* task;
set_tls_pool(this);
if(m_worker_init_callback) if(m_worker_init_callback)
m_worker_init_callback(); m_worker_init_callback();
tls_worker_data = thread_var;
while (get_task(thread_var, &task) && task) while (get_task(thread_var, &task) && task)
{ {
task->execute(); task->execute();
...@@ -557,12 +562,10 @@ void thread_pool_generic::maintainence() ...@@ -557,12 +562,10 @@ void thread_pool_generic::maintainence()
m_long_tasks_count++; m_long_tasks_count++;
} }
} }
maybe_wake_or_create_thread();
size_t thread_cnt = (int)thread_count(); size_t thread_cnt = (int)thread_count();
if (m_active_threads.size() - m_long_tasks_count < m_concurrency*OVERSUBSCRIBE_FACTOR)
{
wake_or_create_thread();
return;
}
if (m_last_activity == m_tasks_dequeued + m_wakeups && if (m_last_activity == m_tasks_dequeued + m_wakeups &&
m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt) m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt)
{ {
...@@ -638,7 +641,7 @@ bool thread_pool_generic::add_thread() ...@@ -638,7 +641,7 @@ bool thread_pool_generic::add_thread()
} }
/** Wake a standby thread, and hand the given task over to this thread. */ /** Wake a standby thread, and hand the given task over to this thread. */
bool thread_pool_generic::wake(worker_wake_reason reason, task *t) bool thread_pool_generic::wake(worker_wake_reason reason, task *)
{ {
assert(reason != WAKE_REASON_NONE); assert(reason != WAKE_REASON_NONE);
...@@ -650,10 +653,6 @@ bool thread_pool_generic::wake(worker_wake_reason reason, task *t) ...@@ -650,10 +653,6 @@ bool thread_pool_generic::wake(worker_wake_reason reason, task *t)
assert(var->m_wake_reason == WAKE_REASON_NONE); assert(var->m_wake_reason == WAKE_REASON_NONE);
var->m_wake_reason= reason; var->m_wake_reason= reason;
var->m_cv.notify_one(); var->m_cv.notify_one();
if (t)
{
var->m_task= t;
}
m_wakeups++; m_wakeups++;
return true; return true;
} }
...@@ -673,10 +672,11 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) : ...@@ -673,10 +672,11 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
m_tasks_dequeued(), m_tasks_dequeued(),
m_wakeups(), m_wakeups(),
m_spurious_wakeups(), m_spurious_wakeups(),
m_concurrency(std::thread::hardware_concurrency()), m_concurrency(std::thread::hardware_concurrency()*2),
m_in_shutdown(), m_in_shutdown(),
m_timestamp(), m_timestamp(),
m_long_tasks_count(), m_long_tasks_count(),
m_waiting_task_count(),
m_last_thread_creation(), m_last_thread_creation(),
m_min_threads(min_threads), m_min_threads(min_threads),
m_max_threads(max_threads), m_max_threads(max_threads),
...@@ -700,14 +700,15 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) : ...@@ -700,14 +700,15 @@ thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
} }
void thread_pool_generic::wake_or_create_thread() void thread_pool_generic::maybe_wake_or_create_thread()
{ {
assert(!m_task_queue.empty()); if (m_task_queue.empty())
return;
if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > m_concurrency)
return;
if (!m_standby_threads.empty()) if (!m_standby_threads.empty())
{ {
auto t= m_task_queue.front(); wake(WAKE_REASON_TASK);
m_task_queue.pop();
wake(WAKE_REASON_TASK, t);
} }
else else
{ {
...@@ -715,6 +716,11 @@ void thread_pool_generic::wake_or_create_thread() ...@@ -715,6 +716,11 @@ void thread_pool_generic::wake_or_create_thread()
} }
} }
/**
  Check whether the pool is oversubscribed: more runnable threads
  (active threads that are neither stuck in a long task nor blocked
  in a wait) than the target concurrency allows, with an
  OVERSUBSCRIBE_FACTOR slack.

  Written with the counters added on the right-hand side instead of
  subtracted from the unsigned size(): if the counts ever transiently
  exceed the active thread count, the original subtraction would wrap
  around to a huge unsigned value and report "too many" incorrectly.
*/
bool thread_pool_generic::too_many_active_threads()
{
  size_t busy= static_cast<size_t>(m_long_tasks_count) +
               static_cast<size_t>(m_waiting_task_count);
  return m_active_threads.size() >
         busy + m_concurrency * OVERSUBSCRIBE_FACTOR;
}
/** Submit a new task*/ /** Submit a new task*/
void thread_pool_generic::submit_task(task* task) void thread_pool_generic::submit_task(task* task)
...@@ -725,9 +731,35 @@ void thread_pool_generic::submit_task(task* task) ...@@ -725,9 +731,35 @@ void thread_pool_generic::submit_task(task* task)
task->add_ref(); task->add_ref();
m_tasks_enqueued++; m_tasks_enqueued++;
m_task_queue.push(task); m_task_queue.push(task);
maybe_wake_or_create_thread();
}
if (m_active_threads.size() - m_long_tasks_count < m_concurrency *OVERSUBSCRIBE_FACTOR) /* Notify thread pool that current thread is going to wait */
wake_or_create_thread(); void thread_pool_generic::wait_begin()
{
if (!tls_worker_data || tls_worker_data->is_long_task())
return;
tls_worker_data->m_state |= worker_data::WAITING;
std::unique_lock<std::mutex> lk(m_mtx);
m_waiting_task_count++;
/* Maintain concurrency */
if (m_task_queue.empty())
return;
if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count < m_concurrency)
maybe_wake_or_create_thread();
}
void thread_pool_generic::wait_end()
{
if (tls_worker_data && (tls_worker_data->m_state & worker_data::WAITING))
{
tls_worker_data->m_state &= ~worker_data::WAITING;
std::unique_lock<std::mutex> lk(m_mtx);
m_waiting_task_count--;
}
} }
/** /**
......
...@@ -105,6 +105,11 @@ template<typename T> class cache ...@@ -105,6 +105,11 @@ template<typename T> class cache
m_cv.wait(lk); m_cv.wait(lk);
m_waiters--; m_waiters--;
} }
/** Number of free (unused) elements currently in the cache.
NOTE(review): reads m_cache.size() without acquiring the cache's
mutex, so callers racing with get()/put() may observe a stale
value — presumably acceptable because callers use it only as a
heuristic (e.g. pending-I/O estimation); confirm. */
size_t size()
{
return m_cache.size();
}
}; };
......
#include <tpool.h>
/* Pool that owns the current thread, one slot per thread.
Zero-initialized (null) for threads that never joined a pool. */
static thread_local tpool::thread_pool *tls_thread_pool;

/* Record the owning thread pool for the calling thread.
Called by each worker during its startup. */
extern "C" void set_tls_pool(tpool::thread_pool *pool)
{
  tls_thread_pool= pool;
}
/* C-callable wait notification: tell the current thread's pool,
if the thread belongs to one, that it is about to block in a wait.
A no-op on threads that are not pool workers. */
extern "C" void tpool_wait_begin()
{
  if (auto *pool= tls_thread_pool)
    pool->wait_begin();
}
/* C-callable wait notification: tell the current thread's pool,
if the thread belongs to one, that its wait has ended.
A no-op on threads that are not pool workers. */
extern "C" void tpool_wait_end()
{
  if (auto *pool= tls_thread_pool)
    pool->wait_end();
}
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment