Commit 8c547853 authored by Vladislav Vaintroub

Improve InnoDB purge: reduce context switching, better work distribution

Before this patch, InnoDB purge submitted innodb_purge_threads-1 tasks,
even if there was insufficient work for all of them. For example, with
innodb_purge_threads at the default (4) and updates on a single table,
only one thread could do useful work, yet three tasks were submitted.
This caused unnecessary wakeups and context switches.

The patch modifies trx_purge_attach_undo_recs() to create as many work
items as necessary. The purge coordinator now processes more tasks itself,
which reduces context switching and improves efficiency.
parent 2e580dc2
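
[Editor's note] In essence, the fan-out is now sized to the actual number of
work items, and the coordinator takes one share of the work itself. A minimal
standalone C++ sketch of the before/after dispatch logic; submit_worker_task()
and run_tasks_in_this_thread() are illustrative stand-ins for
srv_thread_pool->submit_task(&purge_worker_task) and
srv_purge_worker_task_low(), not the real APIs:

    #include <algorithm>
    #include <cstddef>

    void submit_worker_task() { /* wake one worker thread */ }
    void run_tasks_in_this_thread() { /* drain the shared work queue */ }

    // Old scheme: always wake n_tasks - 1 workers, even when only one
    // work item exists.
    void dispatch_old(std::size_t n_tasks)
    {
      for (std::size_t i = 0; i + 1 < n_tasks; i++)
        submit_worker_task();
      run_tasks_in_this_thread();
    }

    // New scheme: wake at most min(n_work_items, n_tasks) - 1 workers.
    // With updates on a single table (one work item), no worker is woken
    // at all; the coordinator handles everything.
    void dispatch_new(std::size_t n_tasks, std::size_t n_work_items)
    {
      if (!n_work_items)
        return;
      for (std::size_t i = 0; i + 1 < std::min(n_work_items, n_tasks); i++)
        submit_worker_task();
      run_tasks_in_this_thread();
    }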
@@ -624,6 +624,15 @@ Complete the shutdown tasks such as background DROP TABLE,
and optionally change buffer merge (on innodb_fast_shutdown=0). */
void srv_shutdown(bool ibuf_merge);
/**
Fetches and executes tasks from the purge work queue,
until the queue is empty.
This is the main part of the purge worker task, but it is
also executed by the coordinator.
@note needs current_thd to be set beforehand.
*/
void srv_purge_worker_task_low();
} /* extern "C" */
#ifdef UNIV_DEBUG
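[Editor's note] The @note above matters at the call sites: a worker task runs
on a thread-pool thread with no THD, so it must set one up first, while the
coordinator already has a THD and calls the function directly. A simplified
sketch of the worker-side pattern, mirroring purge_worker_callback() further
down in this diff:

    static void purge_worker_callback_sketch()
    {
      void *ctx;
      THD *thd= acquire_thd(&ctx);  // sets current_thd for this thread
      srv_purge_worker_task_low();  // fetch and execute queued purge tasks
      release_thd(thd, ctx);        // clears current_thd again
    }
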
@@ -1557,7 +1557,6 @@ static bool srv_purge_should_exit(size_t old_history_size)
/*********************************************************************//**
Fetch and execute a task from the work queue.
@param [in,out] slot purge worker thread slot
@return true if a task was executed */
static bool srv_task_execute()
{
@@ -1697,6 +1696,13 @@ static void release_thd(THD *thd, void *ctx)
  set_current_thd(0);
}

void srv_purge_worker_task_low()
{
  ut_ad(current_thd);
  while (srv_task_execute())
    ut_ad(purge_sys.running());
}

static void purge_worker_callback(void*)
{
  ut_ad(!current_thd);
@@ -1704,8 +1710,7 @@ static void purge_worker_callback(void*)
  ut_ad(srv_force_recovery < SRV_FORCE_NO_BACKGROUND);
  void *ctx;
  THD *thd= acquire_thd(&ctx);
  while (srv_task_execute())
    ut_ad(purge_sys.running());
  srv_purge_worker_task_low();
  release_thd(thd,ctx);
}
@@ -1214,119 +1214,138 @@ dict_table_t *purge_sys_t::close_and_reopen(table_id_t id, THD *thd,
/** Run a purge batch.
@param n_purge_threads number of purge threads
@param thd purge coordinator thread handle
@param n_work_items number of work items (currently tables) to process
@return new purge_sys.head */
static purge_sys_t::iterator
trx_purge_attach_undo_recs(ulint n_purge_threads, THD *thd)
static purge_sys_t::iterator trx_purge_attach_undo_recs(THD *thd,
                                                        ulint *n_work_items)
{
  que_thr_t* thr;
  que_thr_t *thr;
  ulint i;
  ut_a(n_purge_threads > 0);
  ut_a(UT_LIST_GET_LEN(purge_sys.query->thrs) >= n_purge_threads);
  purge_sys_t::iterator head = purge_sys.tail;
#ifdef UNIV_DEBUG
  i = 0;
  /* Debug code to validate some pre-requisites and reset done flag. */
  for (thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
       thr != NULL && i < n_purge_threads;
       thr = UT_LIST_GET_NEXT(thrs, thr), ++i) {
    purge_node_t* node;
    /* Get the purge node. */
    node = (purge_node_t*) thr->child;
    ut_ad(que_node_get_type(node) == QUE_NODE_PURGE);
    ut_ad(node->undo_recs.empty());
    ut_ad(!node->in_progress);
    ut_d(node->in_progress = true);
  }
  /* There should never be fewer nodes than threads, the inverse
  however is allowed because we only use purge threads as needed. */
  ut_ad(i == n_purge_threads);
#endif
  purge_sys_t::iterator head= purge_sys.tail;
  /* Fetch and parse the UNDO records. The UNDO records are added
  to a per purge node vector. */
  thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
  thr= nullptr;
  ut_ad(head <= purge_sys.tail);
  i = 0;
  i= 0;
  std::unordered_map<table_id_t, purge_node_t*>
    table_id_map(TRX_PURGE_TABLE_BUCKETS);
  purge_sys.m_active = true;
  std::unordered_map<table_id_t, purge_node_t *> table_id_map(
    TRX_PURGE_TABLE_BUCKETS);
  purge_sys.m_active= true;
  MDL_context* const mdl_context
    = static_cast<MDL_context*>(thd_mdl_context(thd));
  MDL_context *const mdl_context=
    static_cast<MDL_context *>(thd_mdl_context(thd));
  ut_ad(mdl_context);
  const size_t max_pages = std::min(buf_pool.curr_size * 3 / 4,
                                    size_t{srv_purge_batch_size});
  const size_t max_pages=
    std::min(buf_pool.curr_size * 3 / 4, size_t{srv_purge_batch_size});
  while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) {
  while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown)
  {
    /* Track the max {trx_id, undo_no} for truncating the
    UNDO logs once we have purged the records. */
    if (head <= purge_sys.tail) {
      head = purge_sys.tail;
    if (head <= purge_sys.tail)
    {
      head= purge_sys.tail;
    }
    /* Fetch the next record, and advance the purge_sys.tail. */
    trx_purge_rec_t purge_rec = purge_sys.fetch_next_rec();
    trx_purge_rec_t purge_rec= purge_sys.fetch_next_rec();
    if (!purge_rec.undo_rec) {
      if (!purge_rec.roll_ptr) {
    if (!purge_rec.undo_rec)
    {
      if (!purge_rec.roll_ptr)
      {
        break;
      }
      ut_ad(purge_rec.roll_ptr == 1);
      continue;
    }
    table_id_t table_id = trx_undo_rec_get_table_id(
      purge_rec.undo_rec);
    purge_node_t*& table_node = table_id_map[table_id];
    table_id_t table_id= trx_undo_rec_get_table_id(purge_rec.undo_rec);
    if (!table_node) {
      std::pair<dict_table_t*,MDL_ticket*> p;
      p.first = trx_purge_table_open(table_id, mdl_context,
                                     &p.second);
      if (p.first == reinterpret_cast<dict_table_t*>(-1)) {
        p.first = purge_sys.close_and_reopen(
          table_id, thd, &p.second);
    purge_node_t *&table_node= table_id_map[table_id];
    if (table_node)
      ut_ad(!table_node->in_progress);
    if (!table_node)
    {
      std::pair<dict_table_t *, MDL_ticket *> p;
      p.first= trx_purge_table_open(table_id, mdl_context, &p.second);
      if (p.first == reinterpret_cast<dict_table_t *>(-1))
      {
        p.first= purge_sys.close_and_reopen(table_id, thd, &p.second);
      }
      thr = UT_LIST_GET_NEXT(thrs, thr);
      if (!(++i % n_purge_threads)) {
        thr = UT_LIST_GET_FIRST(
          purge_sys.query->thrs);
      if (!thr)
        thr= UT_LIST_GET_FIRST(purge_sys.query->thrs);
      else
      {
        thr= UT_LIST_GET_NEXT(thrs, thr);
        if (!thr)
          thr= UT_LIST_GET_FIRST(purge_sys.query->thrs);
      }
      i++;
      table_node= static_cast<purge_node_t *>(thr->child);
      table_node = static_cast<purge_node_t*>(thr->child);
      ut_a(que_node_get_type(table_node) == QUE_NODE_PURGE);
      ut_d(auto i=)
      table_node->tables.emplace(table_id, p);
      ut_ad(i.second);
      if (p.first) {
      ut_d(auto pair=) table_node->tables.emplace(table_id, p);
      ut_ad(pair.second);
      if (p.first)
      {
        goto enqueue;
      }
    } else if (table_node->tables[table_id].first) {
enqueue:
    }
    else if (table_node->tables[table_id].first)
    {
enqueue:
      table_node->undo_recs.push(purge_rec);
      ut_ad(!table_node->in_progress);
    }
    if (purge_sys.n_pages_handled() >= max_pages) {
    if (purge_sys.n_pages_handled() >= max_pages)
    {
      break;
    }
  }
  purge_sys.m_active = false;
  purge_sys.m_active= false;
  *n_work_items= i;
#ifdef UNIV_DEBUG
  i= 0;
  /* Debug code to validate some pre-requisites and reset done flag. */
  for (thr= UT_LIST_GET_FIRST(purge_sys.query->thrs);
       thr != NULL && i < *n_work_items;
       thr= UT_LIST_GET_NEXT(thrs, thr), ++i)
  {
    purge_node_t *node;
    /* Get the purge node. */
    node= (purge_node_t *) thr->child;
    ut_ad(que_node_get_type(node) == QUE_NODE_PURGE);
    if (i < *n_work_items)
    {
      //BUGBUG - dummy node ut_ad(!node->undo_recs.empty());
      ut_ad(!node->in_progress);
      node->in_progress= true;
    }
    else
    {
      ut_ad(node->undo_recs.empty());
      ut_ad(!node->in_progress);
    }
  }
  /* There should never be fewer nodes than threads, the inverse
  however is allowed because we only use purge threads as needed. */
#endif
  ut_ad(head <= purge_sys.tail);
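
[Editor's note] The assignment above is a round-robin over the purge nodes,
keyed by table: the first record of a previously unseen table advances thr to
the next node, and every later record of that table reaches the same node
through table_id_map, so one table is never split across workers. A standalone
sketch of the policy, using plain STL containers in place of purge_sys's
structures (node_sketch and distribute() are illustrative names):

    #include <cstddef>
    #include <cstdint>
    #include <unordered_map>
    #include <vector>

    using table_id_t = std::uint64_t;

    // Each purge node accumulates the work of the tables assigned to it.
    struct node_sketch { std::vector<table_id_t> assigned_tables; };

    // Pin each distinct table to the next node, round-robin; records of
    // the same table always land on the node chosen for that table.
    void distribute(const std::vector<table_id_t> &record_tables,
                    std::vector<node_sketch> &nodes)
    {
      std::unordered_map<table_id_t, node_sketch *> table_map;
      std::size_t next = 0;
      for (table_id_t id : record_tables)
      {
        node_sketch *&node = table_map[id];
        if (!node)
        {
          node = &nodes[next];
          next = (next + 1) % nodes.size();
          node->assigned_tables.push_back(id);
        }
        // the real code would also push the undo record onto the node here
      }
    }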
@@ -1393,55 +1412,80 @@ TRANSACTIONAL_TARGET ulint trx_purge(ulint n_tasks, ulint history_size)
  purge_sys.clone_oldest_view();
#ifdef UNIV_DEBUG
  if (srv_purge_view_update_only_debug) {
    return(0);
  if (srv_purge_view_update_only_debug)
  {
    return (0);
  }
#endif /* UNIV_DEBUG */
  THD* const thd = current_thd;
  THD *const thd= current_thd;
  /* Fetch the UNDO recs that need to be purged. */
  const purge_sys_t::iterator head
    = trx_purge_attach_undo_recs(n_tasks, thd);
  const size_t n_pages = purge_sys.n_pages_handled();
  ulint n_work_items;
  const purge_sys_t::iterator head=
    trx_purge_attach_undo_recs(thd, &n_work_items);
  const size_t n_pages= purge_sys.n_pages_handled();
  {
    ulint delay = n_pages ? srv_max_purge_lag : 0;
    if (UNIV_UNLIKELY(delay)) {
      if (delay >= history_size) {
    ulint delay= n_pages ? srv_max_purge_lag : 0;
    if (UNIV_UNLIKELY(delay))
    {
      if (delay >= history_size)
      {
no_throttle:
        delay = 0;
      } else if (const ulint max_delay =
                 srv_max_purge_lag_delay) {
        delay = std::min(max_delay,
                         10000 * history_size / delay
                         - 5000);
      } else {
        delay= 0;
      }
      else if (const ulint max_delay= srv_max_purge_lag_delay)
      {
        delay= std::min(max_delay, 10000 * history_size / delay - 5000);
      }
      else
      {
        goto no_throttle;
      }
    }
    srv_dml_needed_delay = delay;
    srv_dml_needed_delay= delay;
  }
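
[Editor's note] For intuition about the formula: the delay grows linearly with
the history-list length once it exceeds innodb_max_purge_lag, capped by
innodb_max_purge_lag_delay (microseconds). A hedged recomputation with made-up
values; a history of 2,000,000 records and innodb_max_purge_lag=1,000,000
gives 10000*2 - 5000 = 15000 us:

    #include <algorithm>
    #include <cassert>
    #include <cstdint>

    int main()
    {
      // Illustrative settings, not server defaults.
      const std::uint64_t max_purge_lag = 1000000;  // innodb_max_purge_lag
      const std::uint64_t max_delay     = 300000;   // innodb_max_purge_lag_delay, us
      const std::uint64_t history_size  = 2000000;  // history list length

      std::uint64_t delay = 0;
      if (max_purge_lag && max_purge_lag < history_size && max_delay)
        delay = std::min(max_delay,
                         10000 * history_size / max_purge_lag - 5000);

      assert(delay == 15000);  // 15000 us of DML throttling
      return 0;
    }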
  que_thr_t* thr = nullptr;
  ut_ad(n_tasks);
  que_thr_t *thr= nullptr;
  /* Submit tasks to workers queue if using multi-threaded purge. */
  for (ulint i = n_tasks; --i; ) {
    thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
    ut_a(thr);
  if (n_work_items)
  {
    for (ulint i= 0; i < n_work_items; i++)
    {
      thr= que_fork_scheduler_round_robin(purge_sys.query, thr);
      if (!thr)
        break;
      srv_que_task_enqueue_low(thr);
      srv_thread_pool->submit_task(&purge_worker_task);
    }
  thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
  que_run_threads(thr);
    /*
      To reduce context switches, we submit at most n_tasks-1 worker tasks
      (fewer, if there is not enough work for all of them).
      The coordinator does a worker's job itself, instead of sitting idle,
      then waits for all the others to finish.
      This also means that if innodb_purge_threads=1, the coordinator does
      all the work alone.
    */
    for (ulint i= 0; i < std::min(n_work_items, n_tasks) - 1; i++)
    {
      srv_thread_pool->submit_task(&purge_worker_task);
    }
    srv_purge_worker_task_low();
    if (n_tasks > 1)
      trx_purge_wait_for_workers_to_complete();
  }
  for (thr = UT_LIST_GET_FIRST(purge_sys.query->thrs); thr;
       thr = UT_LIST_GET_NEXT(thrs, thr)) {
    purge_node_t* node = static_cast<purge_node_t*>(thr->child);
  ulint n= 0;
  for (thr= UT_LIST_GET_FIRST(purge_sys.query->thrs);
       thr != nullptr && n++ < n_work_items;
       thr= UT_LIST_GET_NEXT(thrs, thr))
  {
    purge_node_t *node= static_cast<purge_node_t *>(thr->child);
    trx_purge_close_tables(node, thd);
    node->tables.clear();
  }