Commit d2c593c2 authored by Marko Mäkelä

MDEV-22877 Avoid unnecessary buf_pool.page_hash S-latch acquisition

MDEV-15053 did not remove all unnecessary buf_pool.page_hash S-latch
acquisition. There are code paths where we are holding buf_pool.mutex
(which will sufficiently protect buf_pool.page_hash against changes)
and unnecessarily acquire the latch. Many invocations of
buf_page_hash_get_locked() can be replaced with the much simpler
buf_pool.page_hash_get_low().

In the worst case the thread that is holding buf_pool.mutex will become
a victim of MDEV-22871, suffering from a spurious reader-reader conflict
with another thread that genuinely needs to acquire a buf_pool.page_hash
S-latch.

In many places, we were also evaluating page_id_t::fold() while holding
buf_pool.mutex. Low-level functions such as buf_pool.page_hash_get_low()
must get the page_id_t::fold() as a parameter.

buf_buddy_relocate(): Defer the hash_lock acquisition to the critical
section that starts by calling buf_page_t::can_relocate().
parent 0b5dc626
......@@ -7023,9 +7023,11 @@ static void btr_blob_free(buf_block_t *block, bool all, mtr_t *mtr)
ut_ad(mtr->memo_contains_flagged(block, MTR_MEMO_PAGE_X_FIX));
mtr->commit();
const ulint fold= page_id.fold();
mutex_enter(&buf_pool.mutex);
if (buf_page_t *bpage= buf_pool.page_hash_get_low(page_id))
if (buf_page_t *bpage= buf_pool.page_hash_get_low(page_id, fold))
if(!buf_LRU_free_page(bpage, all) && all && bpage->zip.data)
/* Attempt to deallocate the redundant copy of the uncompressed page
if the whole ROW_FORMAT=COMPRESSED block cannot be deallocated. */
......
......@@ -2074,7 +2074,6 @@ btr_search_hash_table_validate(ulint hash_table_id)
for (; node != NULL; node = node->next) {
const buf_block_t* block
= buf_pool.block_from_ahi((byte*) node->data);
const buf_block_t* hash_block;
index_id_t page_index_id;
if (UNIV_LIKELY(block->page.state()
......@@ -2085,29 +2084,22 @@ btr_search_hash_table_validate(ulint hash_table_id)
the block is being freed
(BUF_BLOCK_REMOVE_HASH, see the
assertion and the comment below) */
hash_block = buf_block_hash_get(
block->page.id());
} else {
hash_block = NULL;
}
if (hash_block) {
ut_a(hash_block == block);
} else {
/* When a block is being freed,
buf_LRU_search_and_free_block() first
removes the block from
buf_pool.page_hash by calling
buf_LRU_block_remove_hashed_page().
After that, it invokes
btr_search_drop_page_hash_index() to
remove the block from
btr_search_sys->hash_tables[i]. */
ut_a(block->page.state()
== BUF_BLOCK_REMOVE_HASH);
const page_id_t id(block->page.id());
if (const buf_page_t* hash_page
= buf_pool.page_hash_get_low(
id, id.fold())) {
ut_ad(hash_page == &block->page);
goto state_ok;
}
}
/* When a block is being freed,
buf_LRU_search_and_free_block() first removes
the block from buf_pool.page_hash by calling
buf_LRU_block_remove_hashed_page(). Then it
invokes btr_search_drop_page_hash_index(). */
ut_a(block->page.state() == BUF_BLOCK_REMOVE_HASH);
state_ok:
ut_ad(!dict_index_is_ibuf(block->index));
ut_ad(block->page.id().space()
== block->index->table->space_id);
......
......@@ -508,12 +508,9 @@ static bool buf_buddy_relocate(void* src, void* dst, ulint i, bool force)
ut_ad(space != BUF_BUDDY_STAMP_FREE);
const page_id_t page_id(space, offset);
const ulint fold= page_id.fold();
rw_lock_t* hash_lock = buf_pool.hash_lock_get(page_id);
rw_lock_x_lock(hash_lock);
bpage = buf_pool.page_hash_get_low(page_id);
bpage = buf_pool.page_hash_get_low(page_id, fold);
if (!bpage || bpage->zip.data != src) {
/* The block has probably been freshly
......@@ -521,8 +518,6 @@ static bool buf_buddy_relocate(void* src, void* dst, ulint i, bool force)
added to buf_pool.page_hash yet. Obviously,
it cannot be relocated. */
rw_lock_x_unlock(hash_lock);
if (!force || space != 0 || offset != 0) {
return(false);
}
......@@ -534,8 +529,6 @@ static bool buf_buddy_relocate(void* src, void* dst, ulint i, bool force)
while (bpage != NULL) {
if (bpage->zip.data == src) {
ut_ad(bpage->id() == page_id);
hash_lock = buf_pool.hash_lock_get(page_id);
rw_lock_x_lock(hash_lock);
break;
}
bpage = UT_LIST_GET_NEXT(LRU, bpage);
......@@ -551,9 +544,6 @@ static bool buf_buddy_relocate(void* src, void* dst, ulint i, bool force)
have to relocate all blocks covered by src.
For the sake of simplicity, give up. */
ut_ad(page_zip_get_size(&bpage->zip) < size);
rw_lock_x_unlock(hash_lock);
return(false);
}
......@@ -561,6 +551,13 @@ static bool buf_buddy_relocate(void* src, void* dst, ulint i, bool force)
contain uninitialized data. */
UNIV_MEM_ASSERT_W(src, size);
if (!bpage->can_relocate()) {
return false;
}
rw_lock_t * hash_lock = buf_pool.hash_lock_get_low(fold);
rw_lock_x_lock(hash_lock);
if (bpage->can_relocate()) {
/* Relocate the compressed page. */
const ulonglong ns = my_interval_timer();
......
This diff is collapsed.
......@@ -1230,7 +1230,7 @@ static bool buf_flush_check_neighbor(const page_id_t id,
ut_ad(flush == IORequest::LRU || flush == IORequest::FLUSH_LIST);
ut_ad(mutex_own(&buf_pool.mutex));
buf_page_t *bpage= buf_pool.page_hash_get_low(id);
buf_page_t *bpage= buf_pool.page_hash_get_low(id, id.fold());
if (!bpage || buf_pool.watch_is_sentinel(*bpage))
return false;
......@@ -1409,9 +1409,11 @@ buf_flush_try_neighbors(
}
}
const ulint fold = id.fold();
mutex_enter(&buf_pool.mutex);
bpage = buf_page_hash_get(id);
bpage = buf_pool.page_hash_get_low(id, fold);
if (bpage == NULL) {
mutex_exit(&buf_pool.mutex);
......
......@@ -1159,7 +1159,8 @@ bool buf_LRU_free_page(buf_page_t *bpage, bool zip)
/* We must hold an exclusive hash_lock to prevent
bpage->can_relocate() from changing due to a concurrent
execution of buf_page_get_low(). */
rw_lock_t* hash_lock = buf_pool.hash_lock_get(id);
const ulint fold = id.fold();
rw_lock_t* hash_lock = buf_pool.hash_lock_get_low(fold);
rw_lock_x_lock(hash_lock);
if (UNIV_UNLIKELY(!bpage->can_relocate())) {
......@@ -1216,7 +1217,7 @@ bool buf_LRU_free_page(buf_page_t *bpage, bool zip)
rw_lock_x_lock(hash_lock);
ut_ad(!buf_pool.page_hash_get_low(id));
ut_ad(!buf_pool.page_hash_get_low(id, fold));
ut_ad(b->zip_size());
UNIV_MEM_DESC(b->zip.data, b->zip_size());
......@@ -1238,8 +1239,7 @@ bool buf_LRU_free_page(buf_page_t *bpage, bool zip)
ut_ad(b->in_LRU_list);
ut_ad(b->in_page_hash);
HASH_INSERT(buf_page_t, hash, buf_pool.page_hash,
id.fold(), b);
HASH_INSERT(buf_page_t, hash, buf_pool.page_hash, fold, b);
/* Insert b where bpage was in the LRU list. */
if (prev_b) {
......
......@@ -119,14 +119,16 @@ static buf_page_t* buf_page_init_for_read(ulint mode, const page_id_t page_id,
rw_lock_x_lock_gen(&block->lock, BUF_IO_READ);
}
const ulint fold= page_id.fold();
mutex_enter(&buf_pool.mutex);
/* We must acquire hash_lock this early to prevent
a race condition with buf_pool_t::watch_remove() */
rw_lock_t *hash_lock= buf_pool.hash_lock_get(page_id);
rw_lock_t *hash_lock= buf_pool.hash_lock_get_low(fold);
rw_lock_x_lock(hash_lock);
buf_page_t *hash_page= buf_pool.page_hash_get_low(page_id);
buf_page_t *hash_page= buf_pool.page_hash_get_low(page_id, fold);
if (hash_page && !buf_pool.watch_is_sentinel(*hash_page))
{
/* The page is already in the buffer pool. */
......@@ -157,7 +159,7 @@ static buf_page_t* buf_page_init_for_read(ulint mode, const page_id_t page_id,
block->page.set_state(BUF_BLOCK_FILE_PAGE);
ut_ad(!block->page.in_page_hash);
ut_d(block->page.in_page_hash= true);
HASH_INSERT(buf_page_t, hash, buf_pool.page_hash, page_id.fold(), bpage);
HASH_INSERT(buf_page_t, hash, buf_pool.page_hash, fold, bpage);
rw_lock_x_unlock(hash_lock);
/* The block must be put to the LRU list, to the old blocks */
......@@ -198,7 +200,7 @@ static buf_page_t* buf_page_init_for_read(ulint mode, const page_id_t page_id,
check the page_hash again, as it may have been modified. */
if (UNIV_UNLIKELY(lru))
{
hash_page= buf_pool.page_hash_get_low(page_id);
hash_page= buf_pool.page_hash_get_low(page_id, fold);
if (UNIV_UNLIKELY(hash_page && !buf_pool.watch_is_sentinel(*hash_page)))
{
......@@ -230,7 +232,7 @@ static buf_page_t* buf_page_init_for_read(ulint mode, const page_id_t page_id,
ut_ad(!bpage->in_page_hash);
ut_d(bpage->in_page_hash= true);
HASH_INSERT(buf_page_t, hash, buf_pool.page_hash, page_id.fold(), bpage);
HASH_INSERT(buf_page_t, hash, buf_pool.page_hash, fold, bpage);
bpage->set_io_fix(BUF_IO_READ);
rw_lock_x_unlock(hash_lock);
......@@ -425,7 +427,7 @@ buf_read_ahead_random(const page_id_t page_id, ulint zip_size, bool ibuf)
{
const ulint fold= i.fold();
rw_lock_t *hash_lock= buf_pool.page_hash_lock<false>(fold);
const buf_page_t* bpage= buf_pool.page_hash_get_low(i);
const buf_page_t* bpage= buf_pool.page_hash_get_low(i, fold);
bool found= bpage && bpage->is_accessed() && buf_page_peek_if_young(bpage);
rw_lock_s_unlock(hash_lock);
if (found && !--count)
......@@ -619,7 +621,7 @@ buf_read_ahead_linear(const page_id_t page_id, ulint zip_size, bool ibuf)
{
const ulint fold= i.fold();
rw_lock_t *hash_lock= buf_pool.page_hash_lock<false>(fold);
const buf_page_t* bpage= buf_pool.page_hash_get_low(i);
const buf_page_t* bpage= buf_pool.page_hash_get_low(i, fold);
if (i == page_id)
{
/* Read the natural predecessor and successor page addresses from
......
......@@ -3335,7 +3335,7 @@ ibuf_insert_low(
/* We check if the index page is suitable for buffered entries */
if (buf_page_hash_get(page_id)
if (buf_pool.page_hash_contains(page_id)
|| lock_rec_expl_exist_on_page(page_id.space(),
page_id.page_no())) {
......@@ -3576,9 +3576,9 @@ ibuf_insert(
would always trigger the buffer pool watch during purge and
thus prevent the buffering of delete operations. We assume
that the issuer of IBUF_OP_DELETE has called
buf_pool_watch_set(space, page_no). */
buf_pool_t::watch_set(). */
if (buf_page_get_also_watch(page_id)) {
if (buf_pool.page_hash_contains<true>(page_id)) {
/* A buffer pool watch has been set or the
page has been read into the buffer pool.
Do not buffer the request. If a purge operation
......
......@@ -711,72 +711,6 @@ void buf_page_monitor(const buf_page_t *bpage, buf_io_fix io_type);
@retval DB_DECRYPTION_FAILED if the page cannot be decrypted */
dberr_t buf_page_read_complete(buf_page_t *bpage, const fil_node_t &node);
/** Returns the control block of a file page, NULL if not found.
If the block is found and lock is not NULL then the appropriate
page_hash lock is acquired in the specified lock mode. Otherwise,
mode value is ignored. It is up to the caller to release the
lock. If the block is found and the lock is NULL then the page_hash
lock is released by this function.
@param[in] page_id page id
@param[in,out] lock lock of the page hash acquired if bpage is
found, NULL otherwise. If NULL is passed then the hash_lock is released by
this function.
@param[in] lock_mode RW_LOCK_X or RW_LOCK_S. Ignored if
lock == NULL
@param[in] watch if true, return watch sentinel also.
@return pointer to the bpage or NULL; if NULL, lock is also NULL or
a watch sentinel. */
UNIV_INLINE
buf_page_t*
buf_page_hash_get_locked(
const page_id_t page_id,
rw_lock_t** lock,
ulint lock_mode,
bool watch = false);
/** Returns the control block of a file page, NULL if not found.
If the block is found and lock is not NULL then the appropriate
page_hash lock is acquired in the specified lock mode. Otherwise,
mode value is ignored. It is up to the caller to release the
lock. If the block is found and the lock is NULL then the page_hash
lock is released by this function.
@param[in] page_id page id
@param[in,out] lock lock of the page hash acquired if bpage is
found, NULL otherwise. If NULL is passed then the hash_lock is released by
this function.
@param[in] lock_mode RW_LOCK_X or RW_LOCK_S. Ignored if
lock == NULL
@return pointer to the block or NULL; if NULL, lock is also NULL. */
UNIV_INLINE
buf_block_t*
buf_block_hash_get_locked(
const page_id_t page_id,
rw_lock_t** lock,
ulint lock_mode);
/* There are four different ways we can try to get a bpage or block
from the page hash:
1) Caller already holds the appropriate page hash lock: in the case call
buf_pool_t::page_hash_get_low().
2) Caller wants to hold page hash lock in x-mode
3) Caller wants to hold page hash lock in s-mode
4) Caller doesn't want to hold page hash lock */
#define buf_page_hash_get_s_locked(page_id, l) \
buf_page_hash_get_locked(page_id, l, RW_LOCK_S)
#define buf_page_hash_get_x_locked(page_id, l) \
buf_page_hash_get_locked(page_id, l, RW_LOCK_X)
#define buf_page_hash_get(page_id) \
buf_page_hash_get_locked(page_id, nullptr, RW_LOCK_S)
#define buf_page_get_also_watch(page_id) \
buf_page_hash_get_locked(page_id, nullptr, RW_LOCK_S, true)
#define buf_block_hash_get_s_locked(page_id, l) \
buf_block_hash_get_locked(page_id, l, RW_LOCK_S)
#define buf_block_hash_get_x_locked(page_id, l) \
buf_block_hash_get_locked(page_id, l, RW_LOCK_X)
#define buf_block_hash_get(page_id) \
buf_block_hash_get_locked(page_id, nullptr, RW_LOCK_S)
/** Calculate aligned buffer pool size based on srv_buf_pool_chunk_unit,
if needed.
@param[in] size size in bytes
......@@ -1649,7 +1583,7 @@ class buf_pool_t
This function does not return if the block is not identified.
@param ptr pointer to within a page frame
@return pointer to block, never NULL */
inline buf_block_t* block_from_ahi(const byte *ptr) const;
inline buf_block_t *block_from_ahi(const byte *ptr) const;
#endif /* BTR_CUR_HASH_ADAPT */
bool is_block_lock(const BPageLock *l) const
......@@ -1718,20 +1652,81 @@ class buf_pool_t
}
/** Look up a block descriptor.
@param id page identifier
@param id page identifier
@param fold id.fold()
@return block descriptor, possibly in watch[]
@retval nullptr if not found*/
buf_page_t *page_hash_get_low(const page_id_t id)
buf_page_t *page_hash_get_low(const page_id_t id, const ulint fold)
{
ut_ad(id.fold() == fold);
ut_ad(mutex_own(&mutex) ||
rw_lock_own_flagged(hash_lock_get(id),
rw_lock_own_flagged(hash_lock_get_low(fold),
RW_LOCK_FLAG_X | RW_LOCK_FLAG_S));
buf_page_t* bpage;
buf_page_t *bpage;
/* Look for the page in the hash table */
HASH_SEARCH(hash, page_hash, id.fold(), buf_page_t*, bpage,
HASH_SEARCH(hash, page_hash, fold, buf_page_t*, bpage,
ut_ad(bpage->in_page_hash), id == bpage->id());
return bpage;
}
private:
/** Look up a block descriptor.
@tparam exclusive whether the latch is to be acquired exclusively
@tparam watch whether to allow watch_is_sentinel()
@param page_id page identifier
@param fold page_id.fold()
@param hash_lock pointer to the acquired latch (to be released by caller)
@return pointer to the block
@retval nullptr if no block was found; !lock || !*lock will also hold */
template<bool exclusive,bool watch>
buf_page_t *page_hash_get_locked(const page_id_t page_id, ulint fold,
rw_lock_t **hash_lock)
{
ut_ad(hash_lock || !exclusive);
rw_lock_t *latch= page_hash_lock<exclusive>(fold);
buf_page_t *bpage= page_hash_get_low(page_id, fold);
if (!bpage || watch_is_sentinel(*bpage))
{
if (exclusive)
rw_lock_x_unlock(latch);
else
rw_lock_s_unlock(latch);
if (hash_lock)
*hash_lock= nullptr;
return watch ? bpage : nullptr;
}
ut_ad(bpage->in_file());
ut_ad(page_id == bpage->id());
if (hash_lock)
*hash_lock= latch; /* to be released by the caller */
else if (exclusive)
rw_lock_x_unlock(latch);
else
rw_lock_s_unlock(latch);
return bpage;
}
public:
/** Look up a block descriptor.
@tparam exclusive whether the latch is to be acquired exclusively
@param page_id page identifier
@param fold page_id.fold()
@param hash_lock pointer to the acquired latch (to be released by caller)
@return pointer to the block
@retval nullptr if no block was found; !lock || !*lock will also hold */
template<bool exclusive>
buf_page_t *page_hash_get_locked(const page_id_t page_id, ulint fold,
rw_lock_t **hash_lock)
{ return page_hash_get_locked<exclusive,false>(page_id, fold, hash_lock); }
/** @return whether the buffer pool contains a page
@tparam watch whether to allow watch_is_sentinel()
@param page_id page identifier */
template<bool watch= false>
bool page_hash_contains(const page_id_t page_id)
{
return page_hash_get_locked<false,watch>(page_id, page_id.fold(), nullptr);
}
/** Acquire exclusive latches on all page_hash buckets. */
void page_hash_lock_all() const
......@@ -1779,9 +1774,10 @@ class buf_pool_t
@return whether the page was read to the buffer pool */
bool watch_occurred(const page_id_t id)
{
rw_lock_t *hash_lock= page_hash_lock<false>(id.fold());
const ulint fold= id.fold();
rw_lock_t *hash_lock= page_hash_lock<false>(fold);
/* The page must exist because watch_set() increments buf_fix_count. */
buf_page_t *bpage= page_hash_get_low(id);
buf_page_t *bpage= page_hash_get_low(id, fold);
const bool is_sentinel= watch_is_sentinel(*bpage);
rw_lock_s_unlock(hash_lock);
return !is_sentinel;
......@@ -1791,7 +1787,7 @@ class buf_pool_t
exclusive page hash latch. The *hash_lock may be released,
relocated, and reacquired.
@param id page identifier
@param hash_lock page_hash latch that is held in RW_LOCK_X mode
@param hash_lock exclusively held page_hash latch
@return a buffer pool block corresponding to id
@retval nullptr if the block was not present, and a watch was installed */
inline buf_page_t *watch_set(const page_id_t id, rw_lock_t **hash_lock);
......@@ -1804,7 +1800,7 @@ class buf_pool_t
const ulint fold= id.fold();
rw_lock_t *hash_lock= page_hash_lock<true>(fold);
/* The page must exist because watch_set() increments buf_fix_count. */
buf_page_t *watch= page_hash_get_low(id);
buf_page_t *watch= page_hash_get_low(id, fold);
if (watch->unfix() == 0 && watch_is_sentinel(*watch))
{
/* The following is based on watch_remove(). */
......@@ -1824,7 +1820,7 @@ class buf_pool_t
mutex_exit(&mutex);
}
else
rw_lock_x_unlock(hash_lock);
rw_lock_x_unlock(hash_lock);
}
/** Remove the sentinel block for the watch before replacing it with a
......
......@@ -122,33 +122,6 @@ inline bool buf_page_peek_if_too_old(const buf_page_t *bpage)
}
}
/*********************************************************************//**
Gets the buf_block_t handle of a buffered file block if an uncompressed
page frame exists, or NULL.
@return control block, or NULL */
UNIV_INLINE
buf_block_t*
buf_page_get_block(
/*===============*/
buf_page_t* bpage) /*!< in: control block, or NULL */
{
if (bpage != NULL) {
ut_ad(mutex_own(&buf_pool.mutex)
|| rw_lock_own_flagged(buf_pool.hash_lock_get(bpage->id()),
RW_LOCK_FLAG_X | RW_LOCK_FLAG_S));
switch (bpage->state()) {
case BUF_BLOCK_FILE_PAGE:
return reinterpret_cast<buf_block_t*>(bpage);
case BUF_BLOCK_ZIP_PAGE:
return nullptr;
default:
ut_ad(0);
}
}
return(NULL);
}
#ifdef UNIV_DEBUG
/*********************************************************************//**
Gets a pointer to the memory frame of a block.
......@@ -337,115 +310,6 @@ buf_block_buf_fix_dec(
#endif /* UNIV_DEBUG */
}
/** Returns the control block of a file page, NULL if not found.
If the block is found and lock is not NULL then the appropriate
page_hash lock is acquired in the specified lock mode. Otherwise,
mode value is ignored. It is up to the caller to release the
lock. If the block is found and the lock is NULL then the page_hash
lock is released by this function.
@param[in] page_id page id
@param[in,out] lock lock of the page hash acquired if bpage is
found, NULL otherwise. If NULL is passed then the hash_lock is released by
this function.
@param[in] lock_mode RW_LOCK_X or RW_LOCK_S
@param[in] watch if true, return watch sentinel also.
@return pointer to the bpage or NULL; if NULL, lock is also NULL or
a watch sentinel. */
UNIV_INLINE
buf_page_t*
buf_page_hash_get_locked(
const page_id_t page_id,
rw_lock_t** lock,
ulint lock_mode,
bool watch)
{
ut_ad(lock_mode == RW_LOCK_X || lock_mode == RW_LOCK_S);
ut_ad(lock || lock_mode == RW_LOCK_S);
if (lock != NULL) {
*lock = NULL;
}
const ulint fold= page_id.fold();
rw_lock_t* hash_lock = lock_mode == RW_LOCK_S
? buf_pool.page_hash_lock<false>(fold)
: buf_pool.page_hash_lock<true>(fold);
buf_page_t* bpage = buf_pool.page_hash_get_low(page_id);
if (!bpage || buf_pool.watch_is_sentinel(*bpage)) {
if (!watch) {
bpage = NULL;
}
goto unlock_and_exit;
}
ut_ad(bpage->in_file());
ut_ad(page_id == bpage->id());
if (lock) {
/* To be released by the caller */
*lock = hash_lock;
return bpage;
}
unlock_and_exit:
if (lock_mode == RW_LOCK_S) {
rw_lock_s_unlock(hash_lock);
} else {
rw_lock_x_unlock(hash_lock);
}
return(bpage);
}
/** Returns the control block of a file page, NULL if not found.
If the block is found and lock is not NULL then the appropriate
page_hash lock is acquired in the specified lock mode. Otherwise,
mode value is ignored. It is up to the caller to release the
lock. If the block is found and the lock is NULL then the page_hash
lock is released by this function.
@param[in] page_id page id
@param[in,out] lock lock of the page hash acquired if bpage is
found, NULL otherwise. If NULL is passed then the hash_lock is released by
this function.
@param[in] lock_mode RW_LOCK_X or RW_LOCK_S. Ignored if
lock == NULL
@return pointer to the block or NULL; if NULL, lock is also NULL. */
UNIV_INLINE
buf_block_t*
buf_block_hash_get_locked(
const page_id_t page_id,
rw_lock_t** lock,
ulint lock_mode)
{
buf_page_t* bpage = buf_page_hash_get_locked(page_id, lock,
lock_mode);
buf_block_t* block = buf_page_get_block(bpage);
if (block != NULL) {
ut_ad(block->page.state() == BUF_BLOCK_FILE_PAGE);
ut_ad(!lock || rw_lock_own(*lock, lock_mode));
return(block);
} else if (bpage) {
/* It is not a block. Just a bpage */
ut_ad(bpage->in_file());
if (lock) {
if (lock_mode == RW_LOCK_S) {
rw_lock_s_unlock(*lock);
} else {
rw_lock_x_unlock(*lock);
}
}
*lock = NULL;
return(NULL);
}
ut_ad(lock == NULL ||*lock == NULL);
return(NULL);
}
/********************************************************************//**
Releases a compressed-only page acquired with buf_page_get_zip(). */
UNIV_INLINE
......
......@@ -2537,7 +2537,7 @@ static void recv_read_in_area(page_id_t page_id)
&& i->first.space() == page_id.space()
&& i->first.page_no() < up_limit; i++) {
if (i->second.state == page_recv_t::RECV_NOT_PROCESSED
&& !buf_page_hash_get(i->first)) {
&& !buf_pool.page_hash_contains(i->first)) {
i->second.state = page_recv_t::RECV_BEING_READ;
*p++ = i->first.page_no();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment