Commit 56f5599f authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-27610 Unnecessary wait in InnoDB crash recovery

In recv_sys_t::apply(), we were unnecessarily looking up pages
in buf_pool.page_hash and potentially waiting for exclusive page latches.

Before buf_page_get_low() would return an x-latched page,
that page will have to be read and buf_page_read_complete() would
have invoked recv_recover_page() to apply the log to the page.

Therefore, it suffices to invoke recv_read_in_area() to trigger
a transition from RECV_NOT_PROCESSED.

recv_read_in_area(): Take the iterator as a parameter, and remove
page_id lookups. Should the page already be in buf_pool.page_hash,
buf_page_init_for_read() will return nullptr to buf_read_page_low()
and buf_read_page_background().

recv_sys_t::apply(): Replace goto, remove dead code, and add assertions
to guarantee that the iteration will make progress.

Reviewed by: Vladislav Lesin
parent 216834b0
......@@ -2504,31 +2504,30 @@ void recv_recover_page(fil_space_t* space, buf_page_t* bpage)
ut_ad(mtr.has_committed());
}
/** Reads in pages which have hashed log records, from an area around a given
page number.
@param[in] page_id page id */
static void recv_read_in_area(page_id_t page_id)
/** Read pages for which log needs to be applied.
@param page_id first page identifier to read
@param i iterator to recv_sys.pages */
static void recv_read_in_area(page_id_t page_id, recv_sys_t::map::iterator i)
{
uint32_t page_nos[32];
ut_ad(page_id == i->first);
page_id.set_page_no(ut_2pow_round(page_id.page_no(), 32U));
const uint32_t up_limit = page_id.page_no() + 32;
uint32_t* p = page_nos;
const page_id_t up_limit{page_id + 31};
uint32_t* p= page_nos;
for (recv_sys_t::map::iterator i= recv_sys.pages.lower_bound(page_id);
i != recv_sys.pages.end()
&& i->first.space() == page_id.space()
&& i->first.page_no() < up_limit; i++) {
if (i->second.state == page_recv_t::RECV_NOT_PROCESSED
&& !buf_pool.page_hash_contains(i->first)) {
i->second.state = page_recv_t::RECV_BEING_READ;
*p++ = i->first.page_no();
for (; i != recv_sys.pages.end() && i->first <= up_limit; i++)
{
if (i->second.state == page_recv_t::RECV_NOT_PROCESSED)
{
i->second.state= page_recv_t::RECV_BEING_READ;
*p++= i->first.page_no();
}
}
if (p != page_nos) {
if (p != page_nos)
{
mutex_exit(&recv_sys.mutex);
buf_read_recv_pages(page_id.space(), page_nos,
ulint(p - page_nos));
buf_read_recv_pages(page_id.space(), page_nos, ulint(p - page_nos));
mutex_enter(&recv_sys.mutex);
}
}
......@@ -2694,10 +2693,9 @@ void recv_sys_t::apply(bool last_batch)
for (map::iterator p= pages.begin(); p != pages.end(); )
{
const page_id_t page_id= p->first;
page_recv_t &recs= p->second;
ut_ad(!recs.log.empty());
ut_ad(!p->second.log.empty());
switch (recs.state) {
switch (p->second.state) {
case page_recv_t::RECV_BEING_READ:
case page_recv_t::RECV_BEING_PROCESSED:
p++;
......@@ -2708,35 +2706,17 @@ void recv_sys_t::apply(bool last_batch)
mutex_exit(&mutex);
free_block= buf_LRU_get_free_block(false);
mutex_enter(&mutex);
next_page:
p= pages.lower_bound(page_id);
}
continue;
case page_recv_t::RECV_NOT_PROCESSED:
mtr.start();
mtr.set_log_mode(MTR_LOG_NO_REDO);
if (buf_block_t *block= buf_page_get_low(page_id, 0, RW_X_LATCH,
nullptr, BUF_GET_IF_IN_POOL,
__FILE__, __LINE__,
&mtr, nullptr, false))
{
buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
recv_recover_page(block, mtr, p);
ut_ad(mtr.has_committed());
}
else
{
mtr.commit();
recv_read_in_area(page_id);
break;
}
map::iterator r= p++;
r->second.log.clear();
pages.erase(r);
ut_ad(p == pages.end() || p->first > page_id);
continue;
case page_recv_t::RECV_NOT_PROCESSED:
recv_read_in_area(page_id, p);
}
goto next_page;
p= pages.lower_bound(page_id);
/* Ensure that progress will be made. */
ut_ad(p == pages.end() || p->first > page_id ||
p->second.state >= page_recv_t::RECV_BEING_READ);
}
buf_pool.free_block(free_block);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment