Commit d70a98ae authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-32050: Revert the throttling of MDEV-26356

purge_coordinator_state::do_purge(): Simply use all innodb_purge_threads,
no matter what the LSN age is. During shutdown with innodb_fast_shutdown=0
this code could degrade to using only 1 thread.

Also, restore periodical "InnoDB: to purge" messages that were
accidentally disabled in commit 80585c9d.

Reviewed by: Vladislav Lesin and Vladislav Vaintroub
parent 2027c482
...@@ -610,16 +610,6 @@ srv_que_task_enqueue_low( ...@@ -610,16 +610,6 @@ srv_que_task_enqueue_low(
/*=====================*/ /*=====================*/
que_thr_t* thr); /*!< in: query thread */ que_thr_t* thr); /*!< in: query thread */
/**
Flag which is set, whenever innodb_purge_threads changes.
It is read and reset in srv_do_purge().
Thus it is Atomic_counter<int>, not bool, since unprotected
reads are used. We just need an atomic with relaxed memory
order, to please Thread Sanitizer.
*/
extern Atomic_counter<int> srv_purge_thread_count_changed;
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
/** @return whether purge or master task is active */ /** @return whether purge or master task is active */
bool srv_any_background_activity(); bool srv_any_background_activity();
......
...@@ -506,25 +506,8 @@ static srv_sys_t srv_sys; ...@@ -506,25 +506,8 @@ static srv_sys_t srv_sys;
struct purge_coordinator_state struct purge_coordinator_state
{ {
/** Snapshot of the last history length before the purge call.*/ /** Snapshot of the last history length before the purge call.*/
size_t m_history_length; size_t history_size;
Atomic_counter<int> m_running; Atomic_counter<int> m_running;
private:
ulint n_use_threads;
ulint n_threads;
ulint lsn_lwm;
ulint lsn_hwm;
ulonglong start_time;
ulint lsn_age_factor;
static constexpr ulint adaptive_purge_threshold= 20;
static constexpr ulint safety_net= 20;
ulint series[innodb_purge_threads_MAX + 1];
inline void compute_series();
inline void lazy_init();
void refresh(bool full);
public: public:
inline void do_purge(); inline void do_purge();
}; };
...@@ -1610,83 +1593,50 @@ static bool srv_task_execute() ...@@ -1610,83 +1593,50 @@ static bool srv_task_execute()
static void purge_create_background_thds(int ); static void purge_create_background_thds(int );
std::mutex purge_thread_count_mtx; /** Flag which is set, whenever innodb_purge_threads changes. */
static Atomic_relaxed<bool> srv_purge_thread_count_changed;
static std::mutex purge_thread_count_mtx;
void srv_update_purge_thread_count(uint n) void srv_update_purge_thread_count(uint n)
{ {
std::lock_guard<std::mutex> lk(purge_thread_count_mtx); std::lock_guard<std::mutex> lk(purge_thread_count_mtx);
purge_create_background_thds(n); purge_create_background_thds(n);
srv_n_purge_threads = n; srv_n_purge_threads = n;
srv_purge_thread_count_changed = 1; srv_purge_thread_count_changed = true;
} }
Atomic_counter<int> srv_purge_thread_count_changed;
inline void purge_coordinator_state::do_purge() inline void purge_coordinator_state::do_purge()
{ {
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
lazy_init();
ut_ad(n_threads);
while (purge_sys.enabled() && !purge_sys.paused()) if (!purge_sys.enabled() || purge_sys.paused())
{ return;
loop:
const auto now= my_interval_timer(); uint n_threads;
if (now - start_time >= 1000000)
{ {
refresh(false); std::lock_guard<std::mutex> lk(purge_thread_count_mtx);
start_time= now; n_threads= srv_n_purge_threads;
srv_purge_thread_count_changed= false;
goto first_loop;
} }
const auto old_activity_count= srv_sys.activity_count; do
const auto history_size= trx_sys.history_size(); {
if (UNIV_UNLIKELY(srv_purge_thread_count_changed)) if (UNIV_UNLIKELY(srv_purge_thread_count_changed))
{ {
/* Read the fresh value of srv_n_purge_threads, reset /* Read the fresh value of srv_n_purge_threads, reset
the changed flag. Both are protected by purge_thread_count_mtx. the changed flag. Both are protected by purge_thread_count_mtx. */
This code does not run concurrently, it is executed
by a single purge_coordinator thread, and no races
involving srv_purge_thread_count_changed are possible. */
{ {
std::lock_guard<std::mutex> lk(purge_thread_count_mtx); std::lock_guard<std::mutex> lk(purge_thread_count_mtx);
n_threads= n_use_threads= srv_n_purge_threads; n_threads= srv_n_purge_threads;
srv_purge_thread_count_changed= 0; srv_purge_thread_count_changed= false;
}
refresh(true);
start_time= now;
}
else if (history_size > m_history_length)
{
/* dynamically adjust the purge thread based on redo log fill factor */
if (n_use_threads < n_threads && lsn_age_factor < lsn_lwm)
{
more_threads:
++n_use_threads;
lsn_hwm= lsn_lwm;
lsn_lwm-= series[n_use_threads];
}
else if (n_use_threads > 1 && lsn_age_factor >= lsn_hwm)
{
fewer_threads:
--n_use_threads;
lsn_lwm= lsn_hwm;
lsn_hwm+= series[n_use_threads];
} }
else if (n_use_threads == 1 && lsn_age_factor >= 100 - safety_net)
break;
} }
else if (n_threads > n_use_threads && first_loop:
srv_max_purge_lag && m_history_length > srv_max_purge_lag) ut_ad(n_threads);
goto more_threads;
else if (n_use_threads > 1 && old_activity_count == srv_sys.activity_count)
goto fewer_threads;
ut_ad(n_use_threads);
ut_ad(n_use_threads <= n_threads);
m_history_length= history_size; history_size= trx_sys.history_size();
if (!history_size) if (!history_size)
{ {
...@@ -1697,7 +1647,7 @@ inline void purge_coordinator_state::do_purge() ...@@ -1697,7 +1647,7 @@ inline void purge_coordinator_state::do_purge()
break; break;
} }
ulint n_pages_handled= trx_purge(n_use_threads, history_size); ulint n_pages_handled= trx_purge(n_threads, history_size);
if (!trx_sys.history_exists()) if (!trx_sys.history_exists())
goto no_history; goto no_history;
if (purge_sys.truncate.current || srv_shutdown_state != SRV_SHUTDOWN_NONE) if (purge_sys.truncate.current || srv_shutdown_state != SRV_SHUTDOWN_NONE)
...@@ -1709,79 +1659,13 @@ inline void purge_coordinator_state::do_purge() ...@@ -1709,79 +1659,13 @@ inline void purge_coordinator_state::do_purge()
srv_thread_pool->submit_task(&purge_truncation_task); srv_thread_pool->submit_task(&purge_truncation_task);
if (!n_pages_handled) if (!n_pages_handled)
break; break;
if (!srv_purge_should_exit(history_size))
goto loop;
} }
while (purge_sys.enabled() && !purge_sys.paused() &&
!srv_purge_should_exit(history_size));
m_running= 0; m_running= 0;
} }
inline void purge_coordinator_state::compute_series()
{
ulint points= n_threads;
memset(series, 0, sizeof series);
constexpr ulint spread= 100 - adaptive_purge_threshold - safety_net;
/* We distribute spread across n_threads,
e.g.: spread of 60 is distributed across n_threads=4 as: 6+12+18+24 */
const ulint additional_points= (points * (points + 1)) / 2;
if (spread % additional_points == 0)
{
/* Arithmetic progression is possible. */
const ulint delta= spread / additional_points;
ulint growth= delta;
do
{
series[points--]= growth;
growth += delta;
}
while (points);
return;
}
/* Use average distribution to spread across the points */
const ulint delta= spread / points;
ulint total= 0;
do
{
series[points--]= delta;
total+= delta;
}
while (points);
for (points= 1; points <= n_threads && total++ < spread; )
series[points++]++;
}
inline void purge_coordinator_state::lazy_init()
{
if (n_threads)
return;
n_threads= n_use_threads= srv_n_purge_threads;
refresh(true);
start_time= my_interval_timer();
}
void purge_coordinator_state::refresh(bool full)
{
if (full)
{
compute_series();
lsn_lwm= adaptive_purge_threshold;
lsn_hwm= adaptive_purge_threshold + series[n_threads];
}
mysql_mutex_lock(&log_sys.mutex);
const lsn_t last= log_sys.last_checkpoint_lsn,
max_age= log_sys.max_checkpoint_age;
mysql_mutex_unlock(&log_sys.mutex);
lsn_age_factor= ulint(((log_sys.get_lsn() - last) * 100) / max_age);
}
static std::list<THD*> purge_thds; static std::list<THD*> purge_thds;
static std::mutex purge_thd_mutex; static std::mutex purge_thd_mutex;
extern void* thd_attach_thd(THD*); extern void* thd_attach_thd(THD*);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment