Commit ea52a3eb authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-26356 Adaptive purge scheduling based on redo log fill factor

This should be equivalent to pull request #1889 by Krunal Bauskar.

The existing logic in purge_coordinator_state::do_purge()
activates a number of the configured innodb_purge_threads
based on the history list length. Activating more purge worker
tasks should shrink the history list faster. But, more purge
workers will also generate more redo log, which may slow down
writes by user connections.

row_purge_parse_undo_rec(): Revert the work-around that was added in
commit 46904424.

purge_coordinator_state: Keep track of the redo log fill factor
(how big percentage of innodb_log_file_size is being occupied by
log records that were generated since the latest checkpoint).
If the redo log is getting full, log checkpoints will be triggered
more frequently, and user threads may end up waiting in
log_free_check(). We try to reduce purge-induced jitter in overall
throughput by throttling down the active number of purge tasks as
the log checkpoint age is approaching the log size (in other words,
the redo log fill factor is approaching 100%).
parent 717a3215
...@@ -1035,12 +1035,6 @@ row_purge_parse_undo_rec( ...@@ -1035,12 +1035,6 @@ row_purge_parse_undo_rec(
goto err_exit; goto err_exit;
} }
/* FIXME: We are acquiring exclusive dict_sys.latch only to
avoid increased wait times in
trx_purge_get_next_rec() and trx_purge_truncate_history(). */
dict_sys.lock(SRW_LOCK_CALL);
dict_sys.unlock();
already_locked: already_locked:
ut_ad(!node->table->is_temporary()); ut_ad(!node->table->is_temporary());
......
...@@ -533,7 +533,19 @@ struct purge_coordinator_state ...@@ -533,7 +533,19 @@ struct purge_coordinator_state
ulint n_use_threads; ulint n_use_threads;
ulint n_threads; ulint n_threads;
ulint lsn_lwm;
ulint lsn_hwm;
ulonglong start_time;
ulint lsn_age_factor;
static constexpr ulint adaptive_purge_threshold= 20;
static constexpr ulint safety_net= 20;
ulint series[innodb_purge_threads_MAX + 1];
inline void compute_series();
inline void lazy_init(); inline void lazy_init();
void refresh(bool full);
public: public:
inline void do_purge(); inline void do_purge();
}; };
...@@ -1731,8 +1743,15 @@ inline void purge_coordinator_state::do_purge() ...@@ -1731,8 +1743,15 @@ inline void purge_coordinator_state::do_purge()
{ {
loop: loop:
wakeup= false; wakeup= false;
const auto now= my_interval_timer();
const auto sigcount= m_running; const auto sigcount= m_running;
if (now - start_time >= 1000000)
{
refresh(false);
start_time= now;
}
const auto old_activity_count= srv_sys.activity_count; const auto old_activity_count= srv_sys.activity_count;
const auto history_size= trx_sys.history_size(); const auto history_size= trx_sys.history_size();
...@@ -1749,16 +1768,37 @@ inline void purge_coordinator_state::do_purge() ...@@ -1749,16 +1768,37 @@ inline void purge_coordinator_state::do_purge()
n_threads= n_use_threads= srv_n_purge_threads; n_threads= n_use_threads= srv_n_purge_threads;
srv_purge_thread_count_changed= 0; srv_purge_thread_count_changed= 0;
} }
refresh(true);
start_time= now;
} }
else if (history_size > m_history_length || else if (history_size > m_history_length)
(srv_max_purge_lag && m_history_length > srv_max_purge_lag))
{ {
/* dynamically adjust the purge thread based on redo log fill factor */ /* dynamically adjust the purge thread based on redo log fill factor */
if (n_threads > n_use_threads) if (n_use_threads < n_threads && lsn_age_factor < lsn_lwm)
{
more_threads:
++n_use_threads; ++n_use_threads;
lsn_hwm= lsn_lwm;
lsn_lwm-= series[n_use_threads];
}
else if (n_use_threads > 1 && lsn_age_factor >= lsn_hwm)
{
fewer_threads:
--n_use_threads;
lsn_lwm= lsn_hwm;
lsn_hwm+= series[n_use_threads];
}
else if (n_use_threads == 1 && lsn_age_factor >= 100 - safety_net)
{
wakeup= true;
break;
}
} }
else if (n_threads > n_use_threads &&
srv_max_purge_lag && m_history_length > srv_max_purge_lag)
goto more_threads;
else if (n_use_threads > 1 && old_activity_count == srv_sys.activity_count) else if (n_use_threads > 1 && old_activity_count == srv_sys.activity_count)
--n_use_threads; goto fewer_threads;
ut_ad(n_use_threads); ut_ad(n_use_threads);
ut_ad(n_use_threads <= n_threads); ut_ad(n_use_threads <= n_threads);
...@@ -1795,11 +1835,68 @@ inline void purge_coordinator_state::do_purge() ...@@ -1795,11 +1835,68 @@ inline void purge_coordinator_state::do_purge()
m_running= 0; m_running= 0;
} }
inline void purge_coordinator_state::compute_series()
{
ulint points= n_threads;
memset(series, 0, sizeof series);
constexpr ulint spread= 100 - adaptive_purge_threshold - safety_net;
/* We distribute spread across n_threads,
e.g.: spread of 60 is distributed across n_threads=4 as: 6+12+18+24 */
const ulint additional_points= (points * (points + 1)) / 2;
if (spread % additional_points == 0)
{
/* Arithmetic progression is possible. */
const ulint delta= spread / additional_points;
ulint growth= delta;
do
{
series[points--]= growth;
growth += delta;
}
while (points);
return;
}
/* Use average distribution to spread across the points */
const ulint delta= spread / points;
ulint total= 0;
do
{
series[points--]= delta;
total+= delta;
}
while (points);
for (points= 1; points <= n_threads && total++ < spread; )
series[points++]++;
}
inline void purge_coordinator_state::lazy_init() inline void purge_coordinator_state::lazy_init()
{ {
if (n_threads) if (n_threads)
return; return;
n_threads= n_use_threads= srv_n_purge_threads; n_threads= n_use_threads= srv_n_purge_threads;
refresh(true);
start_time= my_interval_timer();
}
void purge_coordinator_state::refresh(bool full)
{
if (full)
{
compute_series();
lsn_lwm= adaptive_purge_threshold;
lsn_hwm= adaptive_purge_threshold + series[n_threads];
}
mysql_mutex_lock(&log_sys.mutex);
const lsn_t last= log_sys.last_checkpoint_lsn,
max_age= log_sys.max_checkpoint_age;
mysql_mutex_unlock(&log_sys.mutex);
lsn_age_factor= ((log_sys.get_lsn() - last) * 100) / max_age;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment