Commit 830a7c67 authored by Sergey Vojtovich's avatar Sergey Vojtovich

MDEV-17441 - InnoDB transition to C++11 atomics

Simplified worker threads completion waiting, made it use std::atomic.
parent db9b96ef
...@@ -157,12 +157,9 @@ class purge_sys_t ...@@ -157,12 +157,9 @@ class purge_sys_t
MY_ALIGNED(CACHE_LINE_SIZE) MY_ALIGNED(CACHE_LINE_SIZE)
ReadView view; /*!< The purge will not remove undo logs ReadView view; /*!< The purge will not remove undo logs
which are >= this view (purge view) */ which are >= this view (purge view) */
/** Total number of tasks submitted by srv_purge_coordinator_thread. /** Number of not completed tasks. Accessed by srv_purge_coordinator
Not accessed by other threads. */ and srv_worker_thread by std::atomic. */
ulint n_submitted; std::atomic<ulint> n_tasks;
/** Number of completed tasks. Accessed by srv_purge_coordinator
and srv_worker_thread by my_atomic. */
ulint n_completed;
/** Iterator to the undo log records of committed transactions */ /** Iterator to the undo log records of committed transactions */
struct iterator struct iterator
...@@ -232,7 +229,7 @@ class purge_sys_t ...@@ -232,7 +229,7 @@ class purge_sys_t
uninitialised. Real initialisation happens in create(). uninitialised. Real initialisation happens in create().
*/ */
purge_sys_t() : event(NULL), m_enabled(false) {} purge_sys_t() : event(NULL), m_enabled(false), n_tasks(0) {}
/** Create the instance */ /** Create the instance */
......
...@@ -2445,7 +2445,7 @@ static bool srv_task_execute() ...@@ -2445,7 +2445,7 @@ static bool srv_task_execute()
UT_LIST_REMOVE(srv_sys.tasks, thr); UT_LIST_REMOVE(srv_sys.tasks, thr);
mutex_exit(&srv_sys.tasks_mutex); mutex_exit(&srv_sys.tasks_mutex);
que_run_threads(thr); que_run_threads(thr);
my_atomic_addlint(&purge_sys.n_completed, 1); purge_sys.n_tasks.fetch_sub(1, std::memory_order_release);
return true; return true;
} }
......
...@@ -164,8 +164,6 @@ void purge_sys_t::create() ...@@ -164,8 +164,6 @@ void purge_sys_t::create()
ut_ad(event); ut_ad(event);
m_paused= 0; m_paused= 0;
query= purge_graph_build(); query= purge_graph_build();
n_submitted= 0;
n_completed= 0;
next_stored= false; next_stored= false;
rseg= NULL; rseg= NULL;
page_no= 0; page_no= 0;
...@@ -185,6 +183,7 @@ void purge_sys_t::close() ...@@ -185,6 +183,7 @@ void purge_sys_t::close()
if (!event) return; if (!event) return;
ut_ad(!enabled()); ut_ad(!enabled());
ut_ad(n_tasks.load(std::memory_order_relaxed) == 0);
trx_t* trx = query->trx; trx_t* trx = query->trx;
que_graph_free(query); que_graph_free(query);
ut_ad(!trx->id); ut_ad(!trx->id);
...@@ -1266,8 +1265,7 @@ void ...@@ -1266,8 +1265,7 @@ void
trx_purge_wait_for_workers_to_complete() trx_purge_wait_for_workers_to_complete()
{ {
/* Ensure that the work queue empties out. */ /* Ensure that the work queue empties out. */
while (my_atomic_loadlint(&purge_sys.n_completed) while (purge_sys.n_tasks.load(std::memory_order_acquire)) {
!= purge_sys.n_submitted) {
if (srv_get_task_queue_length() > 0) { if (srv_get_task_queue_length() > 0) {
srv_release_threads(SRV_WORKER, 1); srv_release_threads(SRV_WORKER, 1);
...@@ -1298,9 +1296,8 @@ trx_purge( ...@@ -1298,9 +1296,8 @@ trx_purge(
srv_dml_needed_delay = trx_purge_dml_delay(); srv_dml_needed_delay = trx_purge_dml_delay();
/* The number of tasks submitted should be completed. */ /* All submitted tasks should be completed. */
ut_a(purge_sys.n_submitted ut_ad(purge_sys.n_tasks.load(std::memory_order_relaxed) == 0);
== my_atomic_loadlint(&purge_sys.n_completed));
rw_lock_x_lock(&purge_sys.latch); rw_lock_x_lock(&purge_sys.latch);
trx_sys.clone_oldest_view(); trx_sys.clone_oldest_view();
...@@ -1314,7 +1311,7 @@ trx_purge( ...@@ -1314,7 +1311,7 @@ trx_purge(
/* Fetch the UNDO recs that need to be purged. */ /* Fetch the UNDO recs that need to be purged. */
n_pages_handled = trx_purge_attach_undo_recs(n_purge_threads); n_pages_handled = trx_purge_attach_undo_recs(n_purge_threads);
purge_sys.n_submitted += n_purge_threads; purge_sys.n_tasks.store(n_purge_threads - 1, std::memory_order_relaxed);
/* Submit tasks to workers queue if using multi-threaded purge. */ /* Submit tasks to workers queue if using multi-threaded purge. */
for (ulint i = n_purge_threads; --i; ) { for (ulint i = n_purge_threads; --i; ) {
...@@ -1327,14 +1324,9 @@ trx_purge( ...@@ -1327,14 +1324,9 @@ trx_purge(
que_run_threads(thr); que_run_threads(thr);
my_atomic_addlint(&purge_sys.n_completed, 1); trx_purge_wait_for_workers_to_complete();
if (n_purge_threads > 1) { ut_ad(purge_sys.n_tasks.load(std::memory_order_relaxed) == 0);
trx_purge_wait_for_workers_to_complete();
}
ut_a(purge_sys.n_submitted
== my_atomic_loadlint(&purge_sys.n_completed));
if (truncate) { if (truncate) {
trx_purge_truncate_history(); trx_purge_truncate_history();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment