Commit 9db2b327 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-34759: buf_page_get_low() is unnecessarily acquiring exclusive latch

buf_page_ibuf_merge_try(): A new, separate function for invoking
ibuf_merge_or_delete_for_page() when needed. Use the already requested
page latch for determining if the call is necessary. If it is and
if we are currently holding rw_latch==RW_S_LATCH, upgrading to an exclusive
latch may involve waiting that another thread acquires and releases
a U or X latch on the page. If we have to wait, we must recheck if the
call to ibuf_merge_or_delete_for_page() is still needed. If the page
turns out to be corrupted, we will release and fail the operation.
Finally, the exclusive page latch will be downgraded to the originally
requested latch.

ssux_lock_impl::rd_u_upgrade_try(): Attempt to upgrade a shared lock to
an update lock.

sux_lock::s_x_upgrade_try(): Attempt to upgrade a shared lock to
exclusive.

sux_lock::s_x_upgrade(): Upgrade a shared lock to exclusive.
Return whether a wait was elided.

ssux_lock_impl::u_rd_downgrade(), sux_lock::u_s_downgrade():
Downgrade an update lock to shared.
parent 3e5e97b2
......@@ -2404,6 +2404,78 @@ buf_zip_decompress(
return(FALSE);
}
ATTRIBUTE_COLD
/** Try to merge buffered changes to a buffer pool page.
@param block buffer-fixed and latched block
@param rw_latch RW_X_LATCH, RW_SX_LATCH, RW_S_LATCH held on block
@param err error code
@return whether the page is invalid (corrupted) */
static bool buf_page_ibuf_merge_try(buf_block_t *block, ulint rw_latch,
dberr_t *err)
{
ut_ad(block->page.lock.have_any());
ut_ad(block->page.buf_fix_count());
if (fil_page_get_type(block->page.frame) != FIL_PAGE_INDEX ||
!page_is_leaf(block->page.frame))
return false;
if (rw_latch != RW_X_LATCH)
{
if (rw_latch == RW_S_LATCH)
{
if (!block->page.lock.s_x_upgrade())
{
uint32_t state;
state= block->page.state();
if (state < buf_page_t::UNFIXED)
{
fail:
block->page.lock.x_unlock();
return true;
}
ut_ad(state & ~buf_page_t::LRU_MASK);
ut_ad(state < buf_page_t::READ_FIX);
if (state < buf_page_t::IBUF_EXIST || state >= buf_page_t::REINIT)
/* ibuf_merge_or_delete_for_page() was already invoked in
another thread. */
goto downgrade_to_s;
}
}
else
{
ut_ad(rw_latch == RW_SX_LATCH);
block->page.lock.u_x_upgrade();
}
}
ut_ad(block->page.lock.have_x());
block->page.clear_ibuf_exist();
if (dberr_t e= ibuf_merge_or_delete_for_page(block, block->page.id(),
block->zip_size()))
{
if (err)
*err= e;
goto fail;
}
switch (rw_latch) {
default:
ut_ad(rw_latch == RW_X_LATCH);
break;
case RW_SX_LATCH:
block->page.lock.x_u_downgrade();
break;
case RW_S_LATCH:
downgrade_to_s:
block->page.lock.x_u_downgrade();
block->page.lock.u_s_downgrade();
break;
}
return false;
}
/** Low level function used to get access to a database page.
@param[in] page_id page id
@param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
......@@ -2443,6 +2515,7 @@ buf_page_get_low(
|| (rw_latch == RW_X_LATCH)
|| (rw_latch == RW_SX_LATCH)
|| (rw_latch == RW_NO_LATCH));
ut_ad(rw_latch != RW_NO_LATCH || !allow_ibuf_merge);
if (err) {
*err = DB_SUCCESS;
......@@ -2844,54 +2917,9 @@ buf_page_get_low(
state to FREED). Therefore, after acquiring the page latch we
must recheck the state. */
if (state >= buf_page_t::UNFIXED
&& allow_ibuf_merge
&& fil_page_get_type(block->page.frame) == FIL_PAGE_INDEX
&& page_is_leaf(block->page.frame)) {
block->page.lock.x_lock();
state = block->page.state();
ut_ad(state < buf_page_t::READ_FIX);
if (state >= buf_page_t::IBUF_EXIST
&& state < buf_page_t::REINIT) {
block->page.clear_ibuf_exist();
if (dberr_t local_err =
ibuf_merge_or_delete_for_page(block, page_id,
block->zip_size())) {
if (err) {
*err = local_err;
}
goto release_and_ignore_block;
}
} else if (state < buf_page_t::UNFIXED) {
release_and_ignore_block:
block->page.lock.x_unlock();
goto ignore_block;
}
#ifdef BTR_CUR_HASH_ADAPT
btr_search_drop_page_hash_index(block, true);
#endif /* BTR_CUR_HASH_ADAPT */
switch (rw_latch) {
case RW_NO_LATCH:
block->page.lock.x_unlock();
break;
case RW_S_LATCH:
block->page.lock.x_unlock();
block->page.lock.s_lock();
break;
case RW_SX_LATCH:
block->page.lock.x_u_downgrade();
break;
default:
ut_ad(rw_latch == RW_X_LATCH);
}
mtr->memo_push(block, mtr_memo_type_t(rw_latch));
} else {
switch (rw_latch) {
case RW_NO_LATCH:
ut_ad(!allow_ibuf_merge);
mtr->memo_push(block, MTR_MEMO_BUF_FIX);
return block;
case RW_S_LATCH:
......@@ -2915,17 +2943,23 @@ buf_page_get_low(
state = block->page.state();
if (UNIV_UNLIKELY(state < buf_page_t::UNFIXED)) {
corrupted:
mtr->release_last_page();
goto ignore_unfixed;
}
ut_ad(state < buf_page_t::READ_FIX
|| state > buf_page_t::WRITE_FIX);
if (state >= buf_page_t::IBUF_EXIST && state < buf_page_t::REINIT
&& allow_ibuf_merge
&& buf_page_ibuf_merge_try(block, rw_latch, err)) {
ut_ad(block == mtr->at_savepoint(mtr->get_savepoint() - 1));
mtr->lock_register(mtr->get_savepoint() - 1, MTR_MEMO_BUF_FIX);
goto corrupted;
}
#ifdef BTR_CUR_HASH_ADAPT
btr_search_drop_page_hash_index(block, true);
#endif /* BTR_CUR_HASH_ADAPT */
}
ut_ad(page_id_t(page_get_space_id(block->page.frame),
page_get_page_no(block->page.frame)) == page_id);
......
......@@ -282,6 +282,8 @@ class ssux_lock_impl
#endif
}
bool rd_u_upgrade_try() { return writer.wr_lock_try(); }
void u_wr_upgrade()
{
DBUG_ASSERT(writer.is_locked());
......@@ -296,6 +298,13 @@ class ssux_lock_impl
readers.store(0, std::memory_order_release);
/* Note: Any pending rd_lock() will not be woken up until u_unlock() */
}
void u_rd_downgrade()
{
DBUG_ASSERT(writer.is_locked());
ut_d(uint32_t lk=) readers.fetch_add(1, std::memory_order_relaxed);
ut_ad(lk < WRITER);
u_unlock();
}
void rd_unlock()
{
......
......@@ -198,6 +198,30 @@ class sux_lock final
/** Upgrade an update lock */
inline void u_x_upgrade();
inline void u_x_upgrade(const char *file, unsigned line);
/** @return whether a shared lock was upgraded to exclusive */
bool s_x_upgrade_try()
{
ut_ad(have_s());
ut_ad(!have_u_or_x());
if (!lock.rd_u_upgrade_try())
return false;
claim_ownership();
s_unlock();
lock.u_wr_upgrade();
recursive= RECURSIVE_X;
return true;
}
__attribute__((warn_unused_result))
/** @return whether the operation succeeded without waiting */
bool s_x_upgrade()
{
if (s_x_upgrade_try())
return true;
s_unlock();
x_lock();
return false;
}
/** Downgrade a single exclusive lock to an update lock */
void x_u_downgrade()
{
......@@ -206,6 +230,16 @@ class sux_lock final
recursive*= RECURSIVE_U;
lock.wr_u_downgrade();
}
/** Downgrade a single update lock to a shared lock */
void u_s_downgrade()
{
ut_ad(have_u_or_x());
ut_ad(recursive == RECURSIVE_U);
recursive= 0;
set_new_owner(0);
lock.u_rd_downgrade();
ut_d(s_lock_register());
}
/** Acquire an exclusive lock or upgrade an update lock
@return whether U locks were upgraded to X */
......
......@@ -92,6 +92,25 @@ static void test_ssux_lock()
ssux.wr_u_downgrade();
ssux.u_unlock();
}
for (auto j= M_ROUNDS; j--; )
{
ssux.rd_lock();
assert(!critical);
if (ssux.rd_u_upgrade_try())
{
assert(!critical);
ssux.rd_unlock();
ssux.u_wr_upgrade();
assert(!critical);
critical= true;
critical= false;
ssux.wr_u_downgrade();
ssux.u_rd_downgrade();
}
assert(!critical);
ssux.rd_unlock();
}
}
}
......@@ -129,6 +148,14 @@ static void test_sux_lock()
critical= false;
sux.x_u_downgrade();
sux.u_unlock();
sux.s_lock();
std::ignore= sux.s_x_upgrade();
assert(!critical);
sux.x_lock();
critical= true;
sux.x_unlock();
critical= false;
sux.x_unlock();
}
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment