Commit c27577a1 authored by Vladislav Vaintroub's avatar Vladislav Vaintroub

MDEV-21326 : Address TSAN warnings in tpool.

1. Fix places where data race warnings were relevant.

tls_worker_data::m_state should be modified under mutex protection,
since both maintainence timer and current worker set this flag.

2. Suppress warnings that are legitimate, yet harmless.
Apparently, the dirty reads in waitable_task::get_ref_count() or
write_slots->pending_io_count()

Avoiding race entirely without side-effects here is tricky,
and the effects of race is harmless.

The worst thing that can happen due to race is an extra wait notification,
under rare circumstances.
parent 1bbb67b3
...@@ -4082,12 +4082,15 @@ void os_aio_free() ...@@ -4082,12 +4082,15 @@ void os_aio_free()
be other, synchronous, pending writes. */ be other, synchronous, pending writes. */
void os_aio_wait_until_no_pending_writes() void os_aio_wait_until_no_pending_writes()
{ {
if (write_slots->pending_io_count()) bool notify_wait = write_slots->pending_io_count() > 0;
{
if (notify_wait)
tpool::tpool_wait_begin(); tpool::tpool_wait_begin();
write_slots->wait(); write_slots->wait();
if (notify_wait)
tpool::tpool_wait_end(); tpool::tpool_wait_end();
}
} }
......
...@@ -1243,12 +1243,15 @@ extern tpool::waitable_task purge_worker_task; ...@@ -1243,12 +1243,15 @@ extern tpool::waitable_task purge_worker_task;
/** Wait for pending purge jobs to complete. */ /** Wait for pending purge jobs to complete. */
static void trx_purge_wait_for_workers_to_complete() static void trx_purge_wait_for_workers_to_complete()
{ {
if (purge_worker_task.get_ref_count()) bool notify_wait = purge_worker_task.is_running();
{
if (notify_wait)
tpool::tpool_wait_begin(); tpool::tpool_wait_begin();
purge_worker_task.wait(); purge_worker_task.wait();
if(notify_wait)
tpool::tpool_wait_end(); tpool::tpool_wait_end();
}
/* There should be no outstanding tasks as long /* There should be no outstanding tasks as long
as the worker threads are active. */ as the worker threads are active. */
......
...@@ -100,8 +100,8 @@ class waitable_task :public task ...@@ -100,8 +100,8 @@ class waitable_task :public task
waitable_task(callback_func func, void* arg, task_group* group = nullptr); waitable_task(callback_func func, void* arg, task_group* group = nullptr);
void add_ref() override; void add_ref() override;
void release() override; void release() override;
bool is_running() { return m_ref_count > 0; } TPOOL_SUPPRESS_TSAN bool is_running() { return get_ref_count() > 0; }
bool get_ref_count() {return m_ref_count;} TPOOL_SUPPRESS_TSAN int get_ref_count() {return m_ref_count;}
void wait(); void wait();
virtual ~waitable_task() {}; virtual ~waitable_task() {};
}; };
......
...@@ -121,6 +121,10 @@ struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data ...@@ -121,6 +121,10 @@ struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
{ {
return m_state & LONG_TASK; return m_state & LONG_TASK;
} }
bool is_waiting()
{
return m_state & WAITING;
}
std::chrono::system_clock::time_point m_task_start_time; std::chrono::system_clock::time_point m_task_start_time;
worker_data() : worker_data() :
m_cv(), m_cv(),
...@@ -738,10 +742,10 @@ void thread_pool_generic::submit_task(task* task) ...@@ -738,10 +742,10 @@ void thread_pool_generic::submit_task(task* task)
/* Notify thread pool that current thread is going to wait */ /* Notify thread pool that current thread is going to wait */
void thread_pool_generic::wait_begin() void thread_pool_generic::wait_begin()
{ {
if (!tls_worker_data || tls_worker_data->is_long_task()) if (!tls_worker_data || tls_worker_data->is_long_task() || tls_worker_data->is_waiting())
return; return;
tls_worker_data->m_state |= worker_data::WAITING;
std::unique_lock<std::mutex> lk(m_mtx); std::unique_lock<std::mutex> lk(m_mtx);
tls_worker_data->m_state |= worker_data::WAITING;
m_waiting_task_count++; m_waiting_task_count++;
/* Maintain concurrency */ /* Maintain concurrency */
...@@ -754,10 +758,10 @@ void thread_pool_generic::wait_begin() ...@@ -754,10 +758,10 @@ void thread_pool_generic::wait_begin()
void thread_pool_generic::wait_end() void thread_pool_generic::wait_end()
{ {
if (tls_worker_data && (tls_worker_data->m_state & worker_data::WAITING)) if (tls_worker_data && tls_worker_data->is_waiting())
{ {
tls_worker_data->m_state &= ~worker_data::WAITING;
std::unique_lock<std::mutex> lk(m_mtx); std::unique_lock<std::mutex> lk(m_mtx);
tls_worker_data->m_state &= ~worker_data::WAITING;
m_waiting_task_count--; m_waiting_task_count--;
} }
} }
......
...@@ -21,6 +21,22 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ ...@@ -21,6 +21,22 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include <assert.h> #include <assert.h>
#include <algorithm> #include <algorithm>
/* Suppress TSAN warnings, that we believe are not critical. */
#if defined(__has_feature)
#define TPOOL_HAS_FEATURE(...) __has_feature(__VA_ARGS__)
#else
#define TPOOL_HAS_FEATURE(...) 0
#endif
#if TPOOL_HAS_FEATURE(address_sanitizer)
#define TPOOL_SUPPRESS_TSAN __attribute__((no_sanitize("thread"),noinline))
#elif defined(__GNUC__) && defined (__SANITIZE_THREAD__)
#define TPOOL_SUPPRESS_TSAN __attribute__((no_sanitize_thread,noinline))
#else
#define TPOOL_SUPPRESS_TSAN
#endif
namespace tpool namespace tpool
{ {
...@@ -106,7 +122,7 @@ template<typename T> class cache ...@@ -106,7 +122,7 @@ template<typename T> class cache
m_waiters--; m_waiters--;
} }
size_t size() TPOOL_SUPPRESS_TSAN size_t size()
{ {
return m_cache.size(); return m_cache.size();
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment