Commit 44689eb7 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-32050: Improve srv_wake_purge_thread_if_not_active()

purge_sys_t::wake_if_not_active(): Replaces
srv_wake_purge_thread_if_not_active().

innodb_ddl_recovery_done(): Move the wakeup call to
srv_init_purge_tasks().

purge_coordinator_timer: Remove. The srv_master_callback() already
invokes purge_sys.wake_if_not_active() once per second.

Reviewed by: Vladislav Lesin and Vladislav Vaintroub
parent 14685b10
......@@ -238,7 +238,7 @@ static void innodb_max_purge_lag_wait_update(THD *thd, st_mysql_sys_var *,
const lsn_t lsn= log_sys.get_lsn();
if ((lsn - last) / 4 >= max_age / 5)
buf_flush_ahead(last + max_age / 5, false);
srv_wake_purge_thread_if_not_active();
purge_sys.wake_if_not_active();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
mysql_mutex_lock(&LOCK_global_system_variables);
......@@ -2159,8 +2159,6 @@ static void innodb_ddl_recovery_done(handlerton*)
if (srv_start_after_restore && !high_level_read_only)
drop_garbage_tables_after_restore();
srv_init_purge_tasks();
purge_sys.coordinator_startup();
srv_wake_purge_thread_if_not_active();
}
}
......@@ -16274,7 +16272,7 @@ innodb_show_status(
DBUG_RETURN(0);
}
srv_wake_purge_thread_if_not_active();
purge_sys.wake_if_not_active();
/* We let the InnoDB Monitor to output at most MAX_STATUS_SIZE
bytes of text. */
......@@ -19386,7 +19384,7 @@ static void innodb_undo_log_truncate_update(THD *thd, struct st_mysql_sys_var*,
void*, const void *save)
{
if ((srv_undo_log_truncate= *static_cast<const my_bool*>(save)))
srv_wake_purge_thread_if_not_active();
purge_sys.wake_if_not_active();
}
static MYSQL_SYSVAR_BOOL(undo_log_truncate, srv_undo_log_truncate,
......
......@@ -570,10 +570,6 @@ Frees the data structures created in srv_init(). */
void
srv_free(void);
/** Wake up the purge if there is work to do. */
void
srv_wake_purge_thread_if_not_active();
/******************************************************************//**
Outputs to a file the output of the InnoDB Monitor.
@return FALSE if not all information printed
......
......@@ -235,12 +235,12 @@ class purge_sys_t
bool paused()
{ return m_paused != 0; }
/** Enable purge at startup. Not protected by latch; the main thread
will wait for purge_sys.enabled() in srv_start() */
/** Enable purge at startup. */
void coordinator_startup()
{
ut_ad(!enabled());
m_enabled.store(true, std::memory_order_relaxed);
wake_if_not_active();
}
/** Disable purge at shutdown */
......@@ -320,6 +320,9 @@ class purge_sys_t
latch.wr_unlock();
}
/** Wake up the purge threads if there is work to do. */
void wake_if_not_active();
/** Update end_view at the end of a purge batch.
@param head the new head of the purge queue */
inline void clone_end_view(const iterator &head);
......
......@@ -82,7 +82,7 @@ static void row_mysql_delay_if_needed()
const lsn_t lsn= log_sys.get_lsn();
if ((lsn - last) / 4 >= max_age / 5)
buf_flush_ahead(last + max_age / 5, false);
srv_wake_purge_thread_if_not_active();
purge_sys.wake_if_not_active();
std::this_thread::sleep_for(std::chrono::microseconds(delay));
}
}
......
......@@ -1301,12 +1301,10 @@ static tpool::task_group purge_truncation_task_group(1);
static tpool::waitable_task purge_truncation_task
(purge_truncation_callback, nullptr, &purge_truncation_task_group);
static tpool::timer *purge_coordinator_timer;
/** Wake up the purge threads if there is work to do. */
void srv_wake_purge_thread_if_not_active()
void purge_sys_t::wake_if_not_active()
{
if (purge_sys.enabled() && !purge_sys.paused() &&
if (enabled() && !paused() && !purge_state.m_running &&
(srv_undo_log_truncate || trx_sys.history_exists()) &&
++purge_state.m_running == 1)
srv_thread_pool->submit_task(&purge_coordinator_task);
......@@ -1389,8 +1387,8 @@ void purge_sys_t::resume()
if (paused == 1)
{
ib::info() << "Resuming purge";
purge_state.m_running = 0;
srv_wake_purge_thread_if_not_active();
purge_state.m_running= 1;
srv_thread_pool->submit_task(&purge_coordinator_task);
MONITOR_ATOMIC_INC(MONITOR_PURGE_RESUME_COUNT);
}
latch.wr_unlock();
......@@ -1535,8 +1533,7 @@ void srv_master_callback(void*)
ut_a(srv_shutdown_state <= SRV_SHUTDOWN_INITIATED);
MONITOR_INC(MONITOR_MASTER_THREAD_SLEEP);
if (!purge_state.m_running)
srv_wake_purge_thread_if_not_active();
purge_sys.wake_if_not_active();
ulonglong counter_time= microsecond_interval_timer();
srv_sync_log_buffer_in_background();
MONITOR_INC_TIME_IN_MICRO_SECS(MONITOR_SRV_LOG_FLUSH_MICROSECOND,
......@@ -1629,16 +1626,11 @@ inline void purge_coordinator_state::do_purge()
ut_ad(!srv_read_only_mode);
lazy_init();
ut_ad(n_threads);
bool wakeup= false;
purge_coordinator_timer->disarm();
while (purge_sys.enabled() && !purge_sys.paused())
{
loop:
wakeup= false;
const auto now= my_interval_timer();
const auto sigcount= m_running;
if (now - start_time >= 1000000)
{
......@@ -1683,10 +1675,7 @@ inline void purge_coordinator_state::do_purge()
lsn_hwm+= series[n_use_threads];
}
else if (n_use_threads == 1 && lsn_age_factor >= 100 - safety_net)
{
wakeup= true;
break;
}
}
else if (n_threads > n_use_threads &&
srv_max_purge_lag && m_history_length > srv_max_purge_lag)
......@@ -1701,50 +1690,30 @@ inline void purge_coordinator_state::do_purge()
if (!history_size)
{
no_history:
srv_dml_needed_delay= 0;
purge_truncation_task.wait();
trx_purge_truncate_history();
}
else
{
ulint n_pages_handled= trx_purge(n_use_threads, history_size);
if (purge_sys.truncate.current ||
srv_shutdown_state != SRV_SHUTDOWN_NONE)
{
purge_truncation_task.wait();
trx_purge_truncate_history();
}
else
srv_thread_pool->submit_task(&purge_truncation_task);
if (n_pages_handled)
continue;
}
if (srv_dml_needed_delay);
else if (m_running == sigcount)
{
/* Purge was not woken up by srv_wake_purge_thread_if_not_active() */
/* The magic number 5000 is an approximation for the case where we have
cached undo log records which prevent truncate of rollback segments. */
wakeup= history_size >= 5000 ||
(history_size && history_size != trx_sys.history_size_approx());
break;
}
ulint n_pages_handled= trx_purge(n_use_threads, history_size);
if (!trx_sys.history_exists())
goto no_history;
if (purge_sys.truncate.current || srv_shutdown_state != SRV_SHUTDOWN_NONE)
{
srv_dml_needed_delay= 0;
break;
purge_truncation_task.wait();
trx_purge_truncate_history();
}
else
srv_thread_pool->submit_task(&purge_truncation_task);
if (!n_pages_handled)
break;
if (!srv_purge_should_exit(history_size))
goto loop;
}
if (wakeup)
purge_coordinator_timer->set_time(10, 0);
m_running= 0;
}
......@@ -1878,15 +1847,12 @@ static void purge_coordinator_callback(void*)
void srv_init_purge_tasks()
{
purge_create_background_thds(srv_n_purge_threads);
purge_coordinator_timer= srv_thread_pool->create_timer
(purge_coordinator_callback, nullptr);
purge_sys.coordinator_startup();
}
static void srv_shutdown_purge_tasks()
{
purge_coordinator_task.disable();
delete purge_coordinator_timer;
purge_coordinator_timer= nullptr;
purge_worker_task.wait();
std::unique_lock<std::mutex> lk(purge_thd_mutex);
while (!purge_thds.empty())
......@@ -1944,7 +1910,7 @@ void srv_purge_shutdown()
{
history_size= trx_sys.history_size();
ut_a(!purge_sys.paused());
srv_wake_purge_thread_if_not_active();
srv_thread_pool->submit_task(&purge_coordinator_task);
purge_coordinator_task.wait();
}
purge_sys.coordinator_shutdown();
......
......@@ -127,10 +127,13 @@ TRANSACTIONAL_INLINE inline bool TrxUndoRsegsIterator::set_next()
/* Only the purge_coordinator_task will access this object
purge_sys.rseg_iter, or any of purge_sys.hdr_page_no,
purge_sys.tail.
The field purge_sys.head and purge_sys.view are only modified by
The field purge_sys.head and purge_sys.view are modified by
purge_sys_t::clone_end_view()
in the purge_coordinator_task
while holding exclusive purge_sys.latch.
The purge_sys.view may also be modified by
purge_sys_t::wake_if_not_active() while holding exclusive
purge_sys.latch.
The purge_sys.head may be read by
purge_truncation_callback(). */
ut_ad(last_trx_no == m_rsegs.trx_no);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment