Commit 21987e59 authored by Marko Mäkelä

MDEV-20612 fixup: Reduce hash table lookups

Let us calculate the hash table cell address while we are calculating
the latch address, to avoid repeated computations of the address.
The latch address can be derived from the cell address with a simple
bitmask operation.
parent 7953bae2
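
The trick, as a standalone sketch (illustrative names and sizes, not the
InnoDB definitions): the padded hash array keeps one latch in slot 0 of
every (ELEMENTS_PER_LATCH + 1)-pointer group. Because the group size is a
power of two (the "one less than a power of 2" static_assert in the removed
lock_get() below), aligning any cell pointer down to the group boundary
recovers the latch without recomputing the hash, assuming the array base is
itself aligned to the group size (a cache line).

// Hedged sketch only; toy types, not the MariaDB source.
#include <cstdint>
#include <cstdio>

struct hash_cell { void *node; };        // stand-in for hash_cell_t
struct hash_latch { uint32_t word; };    // stand-in for the latch payload

constexpr uintptr_t ELEMENTS_PER_LATCH= 7;  // 7 cells + 1 latch = 64 bytes

hash_latch *latch_of(hash_cell *cell)
{
  // ut_align_down(cell, (ELEMENTS_PER_LATCH + 1) * sizeof *cell)
  uintptr_t p= reinterpret_cast<uintptr_t>(cell);
  p&= ~((ELEMENTS_PER_LATCH + 1) * sizeof(hash_cell) - 1);
  return reinterpret_cast<hash_latch*>(p);
}

int main()
{
  alignas(64) hash_cell array[16]= {};
  // cells 1..7 share the latch in slot 0; cells 9..15 the one in slot 8
  std::printf("%d\n", latch_of(&array[3]) ==
                      reinterpret_cast<hash_latch*>(&array[0]));   // 1
  std::printf("%d\n", latch_of(&array[12]) ==
                      reinterpret_cast<hash_latch*>(&array[8]));   // 1
}

The removed lock_get() achieved the same by masking a freshly recomputed
array index (calc_hash(fold, n) & ~ELEMENTS_PER_LATCH); after this change
only the already-known cell pointer is needed.
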
......@@ -3281,7 +3281,7 @@ ibuf_insert_low(
goto fail_exit;
} else {
LockGuard g{lock_sys.rec_hash, page_id};
if (lock_sys.rec_hash.get_first(page_id)) {
if (lock_sys_t::get_first(g.cell(), page_id)) {
goto commit_exit;
}
}
......
......@@ -645,7 +645,13 @@ class lock_sys_t
/** @return raw array index converted to padded index */
static ulint pad(ulint h) { return 1 + (h / ELEMENTS_PER_LATCH) + h; }
/** Get a latch. */
inline hash_latch *lock_get(ulint fold) const;
static hash_latch *latch(hash_cell_t *cell)
{
void *l= ut_align_down(cell, (ELEMENTS_PER_LATCH + 1) * sizeof *cell);
return static_cast<hash_latch*>(l);
}
/** Get a hash table cell. */
inline hash_cell_t *cell_get(ulint fold) const;
#ifdef UNIV_DEBUG
void assert_locked(const page_id_t id) const;
......@@ -653,20 +659,6 @@ class lock_sys_t
void assert_locked(const page_id_t) const {}
#endif
/** Get the first lock on a page.
@param id page number
@return first lock
@retval nullptr if none exists */
lock_t *get_first(const page_id_t id) const
{
assert_locked(id);
for (auto lock= static_cast<lock_t*>(array[calc_hash(id.fold())].node);
lock; lock= lock->hash)
if (lock->un_member.rec_lock.page_id == id)
return lock;
return nullptr;
}
private:
/** @return the hash value before any ELEMENTS_PER_LATCH padding */
static ulint hash(ulint fold, ulint n) { return ut_hash_ulint(fold, n); }
......@@ -676,14 +668,6 @@ class lock_sys_t
{
return pad(hash(fold, n_cells));
}
/** Get a page_hash latch. */
hash_latch *lock_get(ulint fold, ulint n) const
{
static_assert(!((ELEMENTS_PER_LATCH + 1) & ELEMENTS_PER_LATCH),
"must be one less than a power of 2");
return reinterpret_cast<hash_latch*>
(&array[calc_hash(fold, n) & ~ELEMENTS_PER_LATCH]);
}
};
private:
......@@ -798,13 +782,16 @@ class lock_sys_t
/** @return whether the current thread is the lock_sys.latch writer */
bool is_writer() const
{ return writer.load(std::memory_order_relaxed) == os_thread_get_curr_id(); }
/** Assert that a lock shard is exclusively latched by this thread */
/** Assert that a lock shard is exclusively latched (by some thread) */
void assert_locked(const lock_t &lock) const;
/** Assert that a table lock shard is exclusively latched by this thread */
void assert_locked(const dict_table_t &table) const;
/** Assert that a hash table cell is exclusively latched (by some thread) */
void assert_locked(const hash_cell_t &cell) const;
#else
void assert_locked(const lock_t &) const {}
void assert_locked(const dict_table_t &) const {}
void assert_locked(const hash_cell_t &) const {}
#endif
/**
......@@ -858,19 +845,21 @@ class lock_sys_t
hash_table &prdt_hash_get(bool page)
{ return page ? prdt_page_hash : prdt_hash; }
lock_t *get_first(ulint type_mode, const page_id_t id)
{
return hash_get(type_mode).get_first(id);
}
/** Get the first lock on a page.
@param cell hash table cell
@param id page number
@return first lock
@retval nullptr if none exists */
static inline lock_t *get_first(const hash_cell_t &cell, page_id_t id);
/** Get the first explicit lock request on a record.
@param hash lock hash table
@param cell first lock hash table cell
@param id page identifier
@param heap_no record identifier in page
@return first lock
@retval nullptr if none exists */
inline lock_t *get_first(hash_table &hash, const page_id_t id,
ulint heap_no);
static inline lock_t *get_first(const hash_cell_t &cell, page_id_t id,
ulint heap_no);
/** Remove locks on a discarded SPATIAL INDEX page.
@param id page to be discarded
......@@ -888,15 +877,31 @@ inline ulint lock_sys_t::hash_table::calc_hash(ulint fold) const
return calc_hash(fold, n_cells);
}
/** Get a latch. */
inline
lock_sys_t::hash_latch *lock_sys_t::hash_table::lock_get(ulint fold) const
/** Get a hash table cell. */
inline hash_cell_t *lock_sys_t::hash_table::cell_get(ulint fold) const
{
ut_ad(lock_sys.is_writer() || lock_sys.readers);
return lock_get(fold, n_cells);
return &array[calc_hash(fold)];
}
/** Get the first lock on a page.
@param cell hash table cell
@param id page number
@return first lock
@retval nullptr if none exists */
inline lock_t *lock_sys_t::get_first(const hash_cell_t &cell, page_id_t id)
{
lock_sys.assert_locked(cell);
for (auto lock= static_cast<lock_t*>(cell.node); lock; lock= lock->hash)
{
ut_ad(!lock->is_table());
if (lock->un_member.rec_lock.page_id == id)
return lock;
}
return nullptr;
}
/** lock_sys.latch guard */
/** lock_sys.latch exclusive guard */
struct LockMutexGuard
{
LockMutexGuard(SRW_LOCK_ARGS(const char *file, unsigned line))
......@@ -904,30 +909,39 @@ struct LockMutexGuard
~LockMutexGuard() { lock_sys.wr_unlock(); }
};
/** lock_sys.latch guard for a page_id_t shard */
/** lock_sys latch guard for 1 page_id_t */
struct LockGuard
{
LockGuard(lock_sys_t::hash_table &hash, const page_id_t id);
~LockGuard()
{
latch->release();
lock_sys_t::hash_table::latch(cell_)->release();
/* Must be last, to avoid a race with lock_sys_t::hash_table::resize() */
lock_sys.rd_unlock();
}
/** @return the hash array cell */
hash_cell_t &cell() const { return *cell_; }
private:
/** The hash bucket */
lock_sys_t::hash_latch *latch;
/** The hash array cell */
hash_cell_t *cell_;
};
/** lock_sys.latch guard for 2 page_id_t shards */
/** lock_sys latch guard for 2 page_id_t */
struct LockMultiGuard
{
LockMultiGuard(lock_sys_t::hash_table &hash,
const page_id_t id1, const page_id_t id2);
~LockMultiGuard();
/** @return the first hash array cell */
hash_cell_t &cell1() const { return *cell1_; }
/** @return the second hash array cell */
hash_cell_t &cell2() const { return *cell2_; }
private:
/** The hash buckets */
lock_sys_t::hash_latch *latch1, *latch2;
/** The first hash array cell */
hash_cell_t *cell1_;
/** The second hash array cell */
hash_cell_t *cell2_;
};
/*********************************************************************//**
......
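
For orientation, a self-contained sketch of the reworked guard pattern
(simplified stand-in types, not InnoDB's: std::shared_mutex plays
lock_sys.latch, std::mutex plays hash_latch). The constructor performs the
single cell_get() lookup, callers reuse g.cell() instead of hashing the
page id again, and the latch is re-derived from the stored cell pointer on
release:

#include <cstddef>
#include <mutex>
#include <shared_mutex>

struct Cell { void *node= nullptr; };

struct Table
{
  static constexpr size_t CELLS_PER_LATCH= 8;
  static constexpr size_t N_CELLS= 1024;
  Cell cells[N_CELLS];
  std::mutex latches[N_CELLS / CELLS_PER_LATCH];
  Cell *cell_get(size_t fold) { return &cells[fold % N_CELLS]; }
  std::mutex *latch(Cell *c)   // derived from the cell, as in the real code
  { return &latches[size_t(c - cells) / CELLS_PER_LATCH]; }
};

std::shared_mutex global_latch;  // stands in for lock_sys.latch

struct Guard
{
  Guard(Table &t, size_t fold) : t_(t)
  {
    global_latch.lock_shared();  // lock_sys.rd_lock()
    cell_= t.cell_get(fold);     // the one and only hash lookup
    t_.latch(cell_)->lock();     // hash.latch(cell_)->acquire()
  }
  ~Guard()
  {
    t_.latch(cell_)->unlock();
    global_latch.unlock_shared();  // must be last (cf. the resize() note)
  }
  Cell &cell() const { return *cell_; }
private:
  Table &t_;
  Cell *cell_;
};
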
......@@ -520,18 +520,23 @@ lock_rec_get_next_const(
const lock_t* lock); /*!< in: lock */
/** Get the first explicit lock request on a record.
@param hash lock hash table
@param cell first lock hash table cell
@param id page identifier
@param heap_no record identifier in page
@return first lock
@retval nullptr if none exists */
inline lock_t *lock_sys_t::get_first(lock_sys_t::hash_table &hash,
const page_id_t id, ulint heap_no)
inline lock_t *lock_sys_t::get_first(const hash_cell_t &cell, page_id_t id,
ulint heap_no)
{
for (lock_t *lock= hash.get_first(id);
lock; lock= lock_rec_get_next_on_page(lock))
if (lock_rec_get_nth_bit(lock, heap_no))
lock_sys.assert_locked(cell);
for (lock_t *lock= static_cast<lock_t*>(cell.node); lock; lock= lock->hash)
{
ut_ad(!lock->is_table());
if (lock->un_member.rec_lock.page_id == id &&
lock_rec_get_nth_bit(lock, heap_no))
return lock;
}
return nullptr;
}
......
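
Background for the heap_no overloads above: each record lock object is
immediately followed in memory by a bitmap with one bit per record heap
number on the page, and lock_rec_get_nth_bit() tests that bitmap. A toy
illustration of the addressing (hypothetical struct, not InnoDB's lock_t;
the same pointer arithmetic appears in lock_rec_has_to_wait_in_queue()
further down):

#include <cstdint>
#include <cstdio>
#include <cstdlib>

struct toy_lock { uint32_t type_mode; uint32_t n_bits; };

bool nth_bit(const toy_lock *lock, unsigned heap_no)
{
  // same addressing as "const byte* p = (const byte*) &lock[1]" below
  const uint8_t *bitmap= reinterpret_cast<const uint8_t*>(&lock[1]);
  return bitmap[heap_no / 8] & (uint8_t(1) << (heap_no % 8));
}

int main()
{
  // header and bitmap allocated as one block, as InnoDB does for lock_t
  toy_lock *lock= static_cast<toy_lock*>(std::calloc(1, sizeof(toy_lock) + 8));
  lock->n_bits= 64;
  reinterpret_cast<uint8_t*>(&lock[1])[10 / 8]|= 1 << (10 % 8);  // heap_no 10
  std::printf("%d %d\n", nth_bit(lock, 10), nth_bit(lock, 11));  // 1 0
  std::free(lock);
}
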
......@@ -173,13 +173,22 @@ void lock_sys_t::assert_locked(const dict_table_t &table) const
ut_ad(table.lock_mutex_is_owner());
}
/** Assert that a page shard is exclusively latched by this thread */
/** Assert that the hash cell for a page is exclusively latched by this thread */
void lock_sys_t::hash_table::assert_locked(const page_id_t id) const
{
if (lock_sys.is_writer())
return;
ut_ad(lock_sys.readers);
ut_ad(lock_get(id.fold())->is_locked());
ut_ad(latch(cell_get(id.fold()))->is_locked());
}
/** Assert that a hash table cell is exclusively latched (by some thread) */
void lock_sys_t::assert_locked(const hash_cell_t &cell) const
{
if (lock_sys.is_writer())
return;
ut_ad(lock_sys.readers);
ut_ad(hash_table::latch(const_cast<hash_cell_t*>(&cell))->is_locked());
}
#endif
......@@ -187,8 +196,8 @@ LockGuard::LockGuard(lock_sys_t::hash_table &hash, page_id_t id)
{
const auto id_fold= id.fold();
lock_sys.rd_lock(SRW_LOCK_CALL);
latch= hash.lock_get(id_fold);
latch->acquire();
cell_= hash.cell_get(id_fold);
hash.latch(cell_)->acquire();
}
LockMultiGuard::LockMultiGuard(lock_sys_t::hash_table &hash,
......@@ -197,8 +206,10 @@ LockMultiGuard::LockMultiGuard(lock_sys_t::hash_table &hash,
ut_ad(id1.space() == id2.space());
const auto id1_fold= id1.fold(), id2_fold= id2.fold();
lock_sys.rd_lock(SRW_LOCK_CALL);
latch1= hash.lock_get(id1_fold);
latch2= hash.lock_get(id2_fold);
cell1_= hash.cell_get(id1_fold);
cell2_= hash.cell_get(id2_fold);
auto latch1= hash.latch(cell1_), latch2= hash.latch(cell2_);
if (latch1 > latch2)
std::swap(latch1, latch2);
latch1->acquire();
......@@ -208,6 +219,8 @@ LockMultiGuard::LockMultiGuard(lock_sys_t::hash_table &hash,
LockMultiGuard::~LockMultiGuard()
{
auto latch1= lock_sys_t::hash_table::latch(cell1_),
latch2= lock_sys_t::hash_table::latch(cell2_);
latch1->release();
if (latch1 != latch2)
latch2->release();
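
The constructor and destructor above follow the usual address-ordering
protocol for taking two latches at once; a minimal sketch of just that
protocol, with std::mutex standing in for hash_latch:

#include <mutex>
#include <utility>

// Acquire two cell-group latches without deadlocking: lower address first,
// and only once if both page ids fold into the same latch group.
void acquire_pair(std::mutex *latch1, std::mutex *latch2)
{
  if (latch1 > latch2)    // std::less<> would be the strictly portable form
    std::swap(latch1, latch2);
  latch1->lock();
  if (latch1 != latch2)
    latch2->lock();
}

void release_pair(std::mutex *latch1, std::mutex *latch2)
{
  latch1->unlock();
  if (latch1 != latch2)
    latch2->unlock();
}
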
......@@ -792,21 +805,16 @@ lock_rec_get_prev(
const lock_t* in_lock,/*!< in: record lock */
ulint heap_no)/*!< in: heap number of the record */
{
lock_t* lock;
lock_t* found_lock = NULL;
ut_ad(!in_lock->is_table());
const page_id_t id{in_lock->un_member.rec_lock.page_id};
ut_ad(!in_lock->is_table());
const page_id_t id{in_lock->un_member.rec_lock.page_id};
hash_cell_t *cell= lock_sys.hash_get(in_lock->type_mode).cell_get(id.fold());
for (lock = lock_sys.get_first(in_lock->type_mode, id);
lock != in_lock;
lock = lock_rec_get_next_on_page(lock)) {
if (lock_rec_get_nth_bit(lock, heap_no)) {
found_lock = lock;
}
}
for (lock_t *lock= lock_sys_t::get_first(*cell, id); lock != in_lock;
lock= lock_rec_get_next_on_page(lock))
if (lock_rec_get_nth_bit(lock, heap_no))
return lock;
return found_lock;
return nullptr;
}
/*============= FUNCTIONS FOR ANALYZING RECORD LOCK QUEUE ================*/
......@@ -824,6 +832,7 @@ lock_rec_has_expl(
LOCK_REC_NOT_GAP, for a
supremum record we regard this
always a gap type request */
const hash_cell_t& cell, /*!< in: lock hash table cell */
const page_id_t id, /*!< in: page identifier */
ulint heap_no,/*!< in: heap number of the record */
const trx_t* trx) /*!< in: transaction */
......@@ -832,7 +841,7 @@ lock_rec_has_expl(
|| (precise_mode & LOCK_MODE_MASK) == LOCK_X);
ut_ad(!(precise_mode & LOCK_INSERT_INTENTION));
for (lock_t *lock= lock_sys.get_first(lock_sys.rec_hash, id, heap_no); lock;
for (lock_t *lock= lock_sys_t::get_first(cell, id, heap_no); lock;
lock= lock_rec_get_next(heap_no, lock))
if (lock->trx == trx &&
!(lock->type_mode & (LOCK_WAIT | LOCK_INSERT_INTENTION)) &&
......@@ -855,6 +864,7 @@ lock_t*
lock_rec_other_has_expl_req(
/*========================*/
lock_mode mode, /*!< in: LOCK_S or LOCK_X */
const hash_cell_t& cell, /*!< in: lock hash table cell */
const page_id_t id, /*!< in: page identifier */
bool wait, /*!< in: whether also waiting locks
are taken into account */
......@@ -871,7 +881,7 @@ lock_rec_other_has_expl_req(
return(NULL);
}
for (lock_t* lock = lock_sys.get_first(lock_sys.rec_hash, id, heap_no);
for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no);
lock; lock = lock_rec_get_next(heap_no, lock)) {
if (lock->trx != trx
&& !lock->is_gap()
......@@ -928,18 +938,22 @@ ATTRIBUTE_COLD ATTRIBUTE_NOINLINE static void lock_wait_wsrep(trx_t *trx)
if (lock->trx != trx)
victims.emplace(lock->trx);
}
else if (lock_t *lock=
(wait_lock->type_mode & LOCK_PREDICATE
? lock_sys.prdt_hash : lock_sys.rec_hash).
get_first(wait_lock->un_member.rec_lock.page_id))
else
{
const ulint heap_no= lock_rec_find_set_bit(wait_lock);
if (!lock_rec_get_nth_bit(lock, heap_no))
lock= lock_rec_get_next(heap_no, lock);
do
if (lock->trx != trx)
victims.emplace(lock->trx);
while ((lock= lock_rec_get_next(heap_no, lock)));
const page_id_t id{wait_lock->un_member.rec_lock.page_id};
hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
? lock_sys.prdt_hash : lock_sys.rec_hash).cell_get
(id.fold());
if (lock_t *lock= lock_sys_t::get_first(cell, id))
{
const ulint heap_no= lock_rec_find_set_bit(wait_lock);
if (!lock_rec_get_nth_bit(lock, heap_no))
lock= lock_rec_get_next(heap_no, lock);
do
if (lock->trx != trx)
victims.emplace(lock->trx);
while ((lock= lock_rec_get_next(heap_no, lock)));
}
}
if (victims.empty())
......@@ -981,13 +995,14 @@ lock_rec_other_has_conflicting(
possibly ORed to LOCK_GAP or
LOC_REC_NOT_GAP,
LOCK_INSERT_INTENTION */
const hash_cell_t& cell, /*!< in: lock hash table cell */
const page_id_t id, /*!< in: page identifier */
ulint heap_no,/*!< in: heap number of the record */
const trx_t* trx) /*!< in: our transaction */
{
bool is_supremum = (heap_no == PAGE_HEAP_NO_SUPREMUM);
for (lock_t* lock = lock_sys.get_first(lock_sys.rec_hash, id, heap_no);
for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no);
lock; lock = lock_rec_get_next(heap_no, lock)) {
if (lock_rec_has_to_wait(true, trx, mode, lock, is_supremum)) {
return(lock);
......@@ -1332,6 +1347,7 @@ lock_rec_add_to_queue(
/*==================*/
unsigned type_mode,/*!< in: lock mode, wait, gap
etc. flags */
hash_cell_t& cell, /*!< in,out: first hash table cell */
const page_id_t id, /*!< in: page identifier */
const page_t* page, /*!< in: buffer block containing
the record */
......@@ -1362,7 +1378,7 @@ lock_rec_add_to_queue(
: LOCK_S;
const lock_t* other_lock
= lock_rec_other_has_expl_req(
mode, id, false, heap_no, trx);
mode, cell, id, false, heap_no, trx);
#ifdef WITH_WSREP
if (UNIV_LIKELY_NULL(other_lock) && trx->is_wsrep()) {
/* Only BF transaction may be granted lock
......@@ -1396,7 +1412,7 @@ lock_rec_add_to_queue(
if (type_mode & LOCK_WAIT) {
goto create;
} else if (lock_t *first_lock = lock_sys.get_first(type_mode, id)) {
} else if (lock_t *first_lock = lock_sys_t::get_first(cell, id)) {
for (lock_t* lock = first_lock;;) {
if (lock->is_waiting()
&& lock_rec_get_nth_bit(lock, heap_no)) {
......@@ -1482,7 +1498,7 @@ lock_rec_lock(
const page_id_t id{block->page.id()};
LockGuard g{lock_sys.rec_hash, id};
if (lock_t *lock= lock_sys.rec_hash.get_first(id))
if (lock_t *lock= lock_sys_t::get_first(g.cell(), id))
{
dberr_t err= DB_SUCCESS;
trx->mutex_lock();
......@@ -1492,9 +1508,9 @@ lock_rec_lock(
lock_rec_get_n_bits(lock) <= heap_no)
{
/* Do nothing if the trx already has a strong enough lock on rec */
if (!lock_rec_has_expl(mode, id, heap_no, trx))
if (!lock_rec_has_expl(mode, g.cell(), id, heap_no, trx))
{
if (lock_t *c_lock= lock_rec_other_has_conflicting(mode, id,
if (lock_t *c_lock= lock_rec_other_has_conflicting(mode, g.cell(), id,
heap_no, trx))
/*
If another transaction has a non-gap conflicting
......@@ -1507,8 +1523,8 @@ lock_rec_lock(
else if (!impl)
{
/* Set the requested lock on the record. */
lock_rec_add_to_queue(mode, id, block->frame, heap_no, index,
trx, true);
lock_rec_add_to_queue(mode, g.cell(), id, block->frame, heap_no,
index, trx, true);
err= DB_SUCCESS_LOCKED_REC;
}
}
......@@ -1547,9 +1563,7 @@ Checks if a waiting record lock request still has to wait in a queue.
@return lock that is causing the wait */
static
const lock_t*
lock_rec_has_to_wait_in_queue(
/*==========================*/
const lock_t* wait_lock) /*!< in: waiting record lock */
lock_rec_has_to_wait_in_queue(const hash_cell_t &cell, const lock_t *wait_lock)
{
const lock_t* lock;
ulint heap_no;
......@@ -1564,8 +1578,8 @@ lock_rec_has_to_wait_in_queue(
bit_offset = heap_no / 8;
bit_mask = static_cast<ulint>(1) << (heap_no % 8);
for (lock = lock_sys.get_first(wait_lock->type_mode,
wait_lock->un_member.rec_lock.page_id);
for (lock = lock_sys_t::get_first(
cell, wait_lock->un_member.rec_lock.page_id);
lock != wait_lock;
lock = lock_rec_get_next_on_page_const(lock)) {
const byte* p = (const byte*) &lock[1];
......@@ -1646,18 +1660,22 @@ static void lock_wait_rpl_report(trx_t *trx)
if (!(lock->type_mode & LOCK_AUTO_INC) && lock->trx != trx)
thd_rpl_deadlock_check(thd, lock->trx->mysql_thd);
}
else if (lock_t *lock=
(wait_lock->type_mode & LOCK_PREDICATE
? lock_sys.prdt_hash : lock_sys.rec_hash).
get_first(wait_lock->un_member.rec_lock.page_id))
else
{
const ulint heap_no= lock_rec_find_set_bit(wait_lock);
if (!lock_rec_get_nth_bit(lock, heap_no))
lock= lock_rec_get_next(heap_no, lock);
do
if (lock->trx->mysql_thd != thd)
thd_rpl_deadlock_check(thd, lock->trx->mysql_thd);
while ((lock= lock_rec_get_next(heap_no, lock)));
const page_id_t id{wait_lock->un_member.rec_lock.page_id};
hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
? lock_sys.prdt_hash : lock_sys.rec_hash).cell_get
(id.fold());
if (lock_t *lock= lock_sys_t::get_first(cell, id))
{
const ulint heap_no= lock_rec_find_set_bit(wait_lock);
if (!lock_rec_get_nth_bit(lock, heap_no))
lock= lock_rec_get_next(heap_no, lock);
do
if (lock->trx->mysql_thd != thd)
thd_rpl_deadlock_check(thd, lock->trx->mysql_thd);
while ((lock= lock_rec_get_next(heap_no, lock)));
}
}
goto func_exit;
......@@ -1880,8 +1898,8 @@ static void lock_rec_cancel(lock_t *lock)
mysql_mutex_lock(&lock_sys.wait_mutex);
trx->mutex_lock();
lock_sys.hash_get(lock->type_mode).
assert_locked(lock->un_member.rec_lock.page_id);
ut_d(lock_sys.hash_get(lock->type_mode).
assert_locked(lock->un_member.rec_lock.page_id));
/* Reset the bit (there can be only one set bit) in the lock bitmap */
lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock));
......@@ -1908,7 +1926,6 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
const page_id_t page_id{in_lock->un_member.rec_lock.page_id};
auto& lock_hash = lock_sys.hash_get(in_lock->type_mode);
lock_hash.assert_locked(page_id);
ut_ad(lock_sys.is_writer() || in_lock->trx->mutex_is_owner());
ut_d(auto old_n_locks=)
......@@ -1916,6 +1933,8 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
ut_ad(old_n_locks);
const ulint rec_fold = page_id.fold();
hash_cell_t &cell = *lock_hash.cell_get(rec_fold);
lock_sys.assert_locked(cell);
HASH_DELETE(lock_t, hash, &lock_hash, rec_fold, in_lock);
ut_ad(lock_sys.is_writer() || in_lock->trx->mutex_is_owner());
......@@ -1930,7 +1949,7 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
grant locks if there are no conflicting locks ahead. Stop at
the first X lock that is waiting or has been granted. */
for (lock_t* lock = lock_hash.get_first(page_id);
for (lock_t* lock = lock_sys_t::get_first(cell, page_id);
lock != NULL;
lock = lock_rec_get_next_on_page(lock)) {
......@@ -1946,7 +1965,8 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex)
ut_ad(lock->trx->lock.wait_trx);
ut_ad(lock->trx->lock.wait_lock);
if (const lock_t* c = lock_rec_has_to_wait_in_queue(lock)) {
if (const lock_t* c = lock_rec_has_to_wait_in_queue(
cell, lock)) {
trx_t* c_trx = c->trx;
IF_WSREP(wsrep_assert_no_bf_bf_wait(c, lock, c_trx),);
lock->trx->lock.wait_trx = c_trx;
......@@ -1993,12 +2013,10 @@ Removes record lock objects set on an index page which is discarded. This
function does not move locks, or check for waiting locks, therefore the
lock bitmaps must already be reset when this function is called. */
static void
lock_rec_free_all_from_discard_page(const page_id_t id,
lock_rec_free_all_from_discard_page(page_id_t id, const hash_cell_t &cell,
lock_sys_t::hash_table &lock_hash)
{
lock_t *lock= lock_hash.get_first(id);
while (lock)
for (lock_t *lock= lock_sys_t::get_first(cell, id); lock; )
{
ut_ad(&lock_hash != &lock_sys.rec_hash ||
lock_rec_find_set_bit(lock) == ULINT_UNDEFINED);
......@@ -2016,11 +2034,10 @@ Resets the lock bits for a single record. Releases transactions waiting for
lock requests here. */
static
void
lock_rec_reset_and_release_wait(
const page_id_t id, /*!< in: page identifier */
ulint heap_no)/*!< in: heap number of record */
lock_rec_reset_and_release_wait(const hash_cell_t &cell, const page_id_t id,
ulint heap_no)
{
for (lock_t *lock= lock_sys.get_first(lock_sys.rec_hash, id, heap_no); lock;
for (lock_t *lock= lock_sys.get_first(cell, id, heap_no); lock;
lock= lock_rec_get_next(heap_no, lock))
{
if (lock->is_waiting())
......@@ -2044,9 +2061,11 @@ static
void
lock_rec_inherit_to_gap(
/*====================*/
hash_cell_t& heir_cell, /*!< heir hash table cell */
const page_id_t heir, /*!< in: page containing the
record which inherits */
const page_id_t id, /*!< in: page containing the
const hash_cell_t& donor_cell, /*!< donor hash table cell */
const page_id_t donor, /*!< in: page containing the
record from which inherited;
does NOT reset the locks on
this record */
......@@ -2062,7 +2081,7 @@ lock_rec_inherit_to_gap(
DO want S-locks/X-locks(taken for replace) set by a consistency
constraint to be inherited also then. */
for (lock_t* lock= lock_sys.get_first(lock_sys.rec_hash, id, heap_no);
for (lock_t* lock= lock_sys_t::get_first(donor_cell, donor, heap_no);
lock;
lock = lock_rec_get_next(heap_no, lock)) {
trx_t* lock_trx = lock->trx;
......@@ -2071,7 +2090,7 @@ lock_rec_inherit_to_gap(
|| lock->mode() !=
(lock_trx->duplicates ? LOCK_S : LOCK_X))) {
lock_rec_add_to_queue(LOCK_GAP | lock->mode(),
heir, heir_page,
heir_cell, heir, heir_page,
heir_heap_no,
lock->index, lock_trx, false);
}
......@@ -2097,12 +2116,13 @@ lock_rec_inherit_to_gap_if_gap_lock(
const page_id_t id{block->page.id()};
LockGuard g{lock_sys.rec_hash, id};
for (lock_t *lock= lock_sys.get_first(lock_sys.rec_hash, id, heap_no); lock;
for (lock_t *lock= lock_sys_t::get_first(g.cell(), id, heap_no); lock;
lock= lock_rec_get_next(heap_no, lock))
if (!lock->is_insert_intention() && (heap_no == PAGE_HEAP_NO_SUPREMUM ||
!lock->is_record_not_gap()) &&
!lock_table_has(lock->trx, lock->index->table, LOCK_X))
lock_rec_add_to_queue(LOCK_GAP | lock->mode(), id, block->frame,
lock_rec_add_to_queue(LOCK_GAP | lock->mode(),
g.cell(), id, block->frame,
heir_heap_no, lock->index, lock->trx, false);
}
......@@ -2111,11 +2131,12 @@ Moves the locks of a record to another record and resets the lock bits of
the donating record. */
static
void
lock_rec_move_low(
/*==============*/
lock_sys_t::hash_table* lock_hash, /*!< in: hash table to use */
lock_rec_move(
hash_cell_t& receiver_cell, /*!< in: hash table cell */
const buf_block_t& receiver, /*!< in: buffer block containing
the receiving record */
const page_id_t receiver_id, /*!< in: page identifier */
const hash_cell_t& donator_cell, /*!< in: hash table cell */
const page_id_t donator_id, /*!< in: page identifier of
the donating record */
ulint receiver_heap_no,/*!< in: heap_no of the record
......@@ -2125,15 +2146,11 @@ lock_rec_move_low(
ulint donator_heap_no)/*!< in: heap_no of the record
which gives the locks */
{
const page_id_t receiver_id{receiver.page.id()};
ut_ad(!lock_sys_t::get_first(receiver_cell,
receiver_id, receiver_heap_no));
/* If the lock is predicate lock, it resides on INFIMUM record */
ut_ad(!lock_sys.get_first(*lock_hash, receiver_id, receiver_heap_no)
|| lock_hash == &lock_sys.prdt_hash
|| lock_hash == &lock_sys.prdt_page_hash);
for (lock_t *lock =
lock_sys.get_first(*lock_hash, donator_id, donator_heap_no);
for (lock_t *lock = lock_sys_t::get_first(donator_cell, donator_id,
donator_heap_no);
lock != NULL;
lock = lock_rec_get_next(donator_heap_no, lock)) {
const auto type_mode = lock->type_mode;
......@@ -2149,14 +2166,15 @@ lock_rec_move_low(
/* Note that we FIRST reset the bit, and then set the lock:
the function works also if donator_id == receiver_id */
lock_rec_add_to_queue(type_mode, receiver_id, receiver.frame,
lock_rec_add_to_queue(type_mode, receiver_cell,
receiver_id, receiver.frame,
receiver_heap_no,
lock->index, lock_trx, true);
lock_trx->mutex_unlock();
}
ut_ad(!lock_sys.get_first(lock_sys.rec_hash,
donator_id, donator_heap_no));
ut_ad(!lock_sys_t::get_first(donator_cell, donator_id,
donator_heap_no));
}
/** Move all the granted locks to the front of the given lock list.
......@@ -2192,28 +2210,6 @@ lock_move_granted_locks_to_front(
}
}
/*************************************************************//**
Moves the locks of a record to another record and resets the lock bits of
the donating record. */
UNIV_INLINE
void
lock_rec_move(
/*==========*/
const buf_block_t& receiver, /*!< in: buffer block containing
the receiving record */
const page_id_t donator_id, /*!< in: page identifier of
the donating record */
ulint receiver_heap_no,/*!< in: heap_no of the record
which gets the locks; there
must be no lock requests
on it! */
ulint donator_heap_no)/*!< in: heap_no of the record
which gives the locks */
{
lock_rec_move_low(&lock_sys.rec_hash, receiver, donator_id,
receiver_heap_no, donator_heap_no);
}
/*************************************************************//**
Updates the lock table when we have reorganized a page. NOTE: we copy
also the locks set on the infimum of the page; the infimum may carry
......@@ -2234,19 +2230,21 @@ lock_move_reorganize_page(
UT_LIST_INIT(old_locks, &lock_t::trx_locks);
const page_id_t id{block->page.id()};
const auto id_fold= id.fold();
{
LockGuard g{lock_sys.rec_hash, id};
if (!lock_sys.rec_hash.get_first(id))
if (!lock_sys_t::get_first(g.cell(), id))
return;
}
/* We will modify arbitrary trx->lock.trx_locks. */
LockMutexGuard g{SRW_LOCK_CALL};
hash_cell_t &cell= *lock_sys.rec_hash.cell_get(id_fold);
/* Note: Predicate locks for SPATIAL INDEX are not affected by
page reorganize, because they do not refer to individual record
heap numbers. */
lock_t *lock= lock_sys.rec_hash.get_first(id);
lock_t *lock= lock_sys_t::get_first(cell, id);
if (!lock)
return;
......@@ -2332,8 +2330,8 @@ lock_move_reorganize_page(
/* NOTE that the old lock bitmap could be too
small for the new heap number! */
lock_rec_add_to_queue(lock->type_mode, id, block->frame, new_heap_no,
lock->index, lock_trx, true);
lock_rec_add_to_queue(lock->type_mode, cell, id, block->frame,
new_heap_no, lock->index, lock_trx, true);
}
lock_trx->mutex_unlock();
......@@ -2386,7 +2384,7 @@ lock_move_rec_list_end(
the original order, because new elements are inserted to a hash
table to the end of the hash chain, and lock_rec_add_to_queue
does not reuse locks if there are waiters in the queue. */
for (lock_t *lock= lock_sys.rec_hash.get_first(id); lock;
for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
lock= lock_rec_get_next_on_page(lock))
{
const rec_t *rec1= rec;
......@@ -2455,7 +2453,7 @@ lock_move_rec_list_end(
lock->type_mode&= ~LOCK_WAIT;
}
lock_rec_add_to_queue(type_mode, new_id, new_block->frame,
lock_rec_add_to_queue(type_mode, g.cell2(), new_id, new_block->frame,
rec2_heap_no, lock->index, lock_trx, true);
}
......@@ -2505,7 +2503,7 @@ lock_move_rec_list_start(
{
LockMultiGuard g{lock_sys.rec_hash, id, new_id};
for (lock_t *lock= lock_sys.rec_hash.get_first(id); lock;
for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
lock= lock_rec_get_next_on_page(lock))
{
const rec_t *rec1;
......@@ -2567,7 +2565,7 @@ lock_move_rec_list_start(
lock->type_mode&= ~LOCK_WAIT;
}
lock_rec_add_to_queue(type_mode, new_id, new_block->frame,
lock_rec_add_to_queue(type_mode, g.cell2(), new_id, new_block->frame,
rec2_heap_no, lock->index, lock_trx, true);
}
......@@ -2614,7 +2612,7 @@ lock_rtr_move_rec_list(
{
LockMultiGuard g{lock_sys.rec_hash, id, new_id};
for (lock_t *lock= lock_sys.rec_hash.get_first(id); lock;
for (lock_t *lock= lock_sys_t::get_first(g.cell1(), id); lock;
lock= lock_rec_get_next_on_page(lock))
{
const rec_t *rec1;
......@@ -2659,7 +2657,7 @@ lock_rtr_move_rec_list(
lock->type_mode&= ~LOCK_WAIT;
}
lock_rec_add_to_queue(type_mode, new_id, new_block->frame,
lock_rec_add_to_queue(type_mode, g.cell2(), new_id, new_block->frame,
rec2_heap_no, lock->index, lock_trx, true);
rec_move[moved].moved= true;
......@@ -2691,26 +2689,30 @@ lock_update_split_right(
/* Move the locks on the supremum of the left page to the supremum
of the right page */
lock_rec_move(*right_block, l, PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
lock_rec_move(g.cell2(), *right_block, r, g.cell1(), l,
PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
/* Inherit the locks to the supremum of left page from the successor
of the infimum on right page */
lock_rec_inherit_to_gap(l, r, left_block->frame, PAGE_HEAP_NO_SUPREMUM, h);
lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->frame,
PAGE_HEAP_NO_SUPREMUM, h);
}
#ifdef UNIV_DEBUG
static void lock_assert_no_spatial(const page_id_t id)
{
const auto id_fold= id.fold();
auto latch= lock_sys.prdt_page_hash.lock_get(id_fold);
auto cell= lock_sys.prdt_page_hash.cell_get(id_fold);
auto latch= lock_sys_t::hash_table::latch(cell);
latch->acquire();
/* there should exist no page lock on the left page,
otherwise, it will be blocked from merge */
ut_ad(!lock_sys.prdt_page_hash.get_first(id));
ut_ad(!lock_sys_t::get_first(*cell, id));
latch->release();
latch= lock_sys.prdt_hash.lock_get(id_fold);
cell= lock_sys.prdt_hash.cell_get(id_fold);
latch= lock_sys_t::hash_table::latch(cell);
latch->acquire();
ut_ad(!lock_sys.prdt_hash.get_first(id));
ut_ad(!lock_sys_t::get_first(*cell, id));
latch->release();
}
#endif
......@@ -2739,14 +2741,14 @@ lock_update_merge_right(
/* Inherit the locks from the supremum of the left page to the
original successor of infimum on the right page, to which the left
page was merged */
lock_rec_inherit_to_gap(r, l, right_block->frame,
lock_rec_inherit_to_gap(g.cell2(), r, g.cell1(), l, right_block->frame,
page_rec_get_heap_no(orig_succ),
PAGE_HEAP_NO_SUPREMUM);
/* Reset the locks on the supremum of the left page, releasing
waiting transactions */
lock_rec_reset_and_release_wait(l, PAGE_HEAP_NO_SUPREMUM);
lock_rec_free_all_from_discard_page(l, lock_sys.rec_hash);
lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);
lock_rec_free_all_from_discard_page(l, g.cell1(), lock_sys.rec_hash);
ut_d(lock_assert_no_spatial(l));
}
......@@ -2759,9 +2761,11 @@ of the root page will act as a dummy carrier of the locks of the record
to be updated. */
void lock_update_root_raise(const buf_block_t &block, const page_id_t root)
{
LockMultiGuard g{lock_sys.rec_hash, block.page.id(), root};
const page_id_t id{block.page.id()};
LockMultiGuard g{lock_sys.rec_hash, id, root};
/* Move the locks on the supremum of the root to the supremum of block */
lock_rec_move(block, root, PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
lock_rec_move(g.cell1(), block, id, g.cell2(), root,
PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
}
/** Update the lock table when a page is copied to another.
......@@ -2769,10 +2773,12 @@ void lock_update_root_raise(const buf_block_t &block, const page_id_t root)
@param old old page (not index root page) */
void lock_update_copy_and_discard(const buf_block_t &new_block, page_id_t old)
{
LockMultiGuard g{lock_sys.rec_hash, new_block.page.id(), old};
const page_id_t id{new_block.page.id()};
LockMultiGuard g{lock_sys.rec_hash, id, old};
/* Move the locks on the supremum of the old page to the supremum of new */
lock_rec_move(new_block, old, PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
lock_rec_free_all_from_discard_page(old, lock_sys.rec_hash);
lock_rec_move(g.cell1(), new_block, id, g.cell2(), old,
PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
lock_rec_free_all_from_discard_page(old, g.cell2(), lock_sys.rec_hash);
}
/*************************************************************//**
......@@ -2789,7 +2795,8 @@ lock_update_split_left(
LockMultiGuard g{lock_sys.rec_hash, l, r};
/* Inherit the locks to the supremum of the left page from the
successor of the infimum on the right page */
lock_rec_inherit_to_gap(l, r, left_block->frame, PAGE_HEAP_NO_SUPREMUM, h);
lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->frame,
PAGE_HEAP_NO_SUPREMUM, h);
}
/** Update the lock table when a page is merged to the left.
......@@ -2810,19 +2817,20 @@ void lock_update_merge_left(const buf_block_t& left, const rec_t *orig_pred,
{
/* Inherit the locks on the supremum of the left page to the
first record which was moved from the right page */
lock_rec_inherit_to_gap(l, l, left.frame,
lock_rec_inherit_to_gap(g.cell1(), l, g.cell1(), l, left.frame,
page_rec_get_heap_no(left_next_rec),
PAGE_HEAP_NO_SUPREMUM);
/* Reset the locks on the supremum of the left page,
releasing waiting transactions */
lock_rec_reset_and_release_wait(l, PAGE_HEAP_NO_SUPREMUM);
lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);
}
/* Move the locks from the supremum of right page to the supremum
of the left page */
lock_rec_move(left, right, PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
lock_rec_free_all_from_discard_page(right, lock_sys.rec_hash);
lock_rec_move(g.cell1(), left, l, g.cell2(), right,
PAGE_HEAP_NO_SUPREMUM, PAGE_HEAP_NO_SUPREMUM);
lock_rec_free_all_from_discard_page(right, g.cell2(), lock_sys.rec_hash);
/* there should exist no page lock on the right page,
otherwise, it will be blocked from merge */
......@@ -2848,8 +2856,9 @@ lock_rec_reset_and_inherit_gap_locks(
{
const page_id_t heir{heir_block.page.id()};
LockMultiGuard g{lock_sys.rec_hash, heir, donor};
lock_rec_reset_and_release_wait(heir, heir_heap_no);
lock_rec_inherit_to_gap(heir, donor, heir_block.frame, heir_heap_no, heap_no);
lock_rec_reset_and_release_wait(g.cell1(), heir, heir_heap_no);
lock_rec_inherit_to_gap(g.cell1(), heir, g.cell2(), donor, heir_block.frame,
heir_heap_no, heap_no);
}
/*************************************************************//**
......@@ -2871,7 +2880,7 @@ lock_update_discard(
const page_id_t page_id(block->page.id());
LockMultiGuard g{lock_sys.rec_hash, heir, page_id};
if (lock_sys.rec_hash.get_first(page_id)) {
if (lock_sys_t::get_first(g.cell2(), page_id)) {
ut_d(lock_assert_no_spatial(page_id));
/* Inherit all the locks on the page to the record and
reset all the locks on the page */
......@@ -2882,12 +2891,13 @@ lock_update_discard(
do {
heap_no = rec_get_heap_no_new(rec);
lock_rec_inherit_to_gap(heir, page_id,
lock_rec_inherit_to_gap(g.cell1(), heir,
g.cell2(), page_id,
heir_block->frame,
heir_heap_no, heap_no);
lock_rec_reset_and_release_wait(
page_id, heap_no);
g.cell2(), page_id, heap_no);
rec = page + rec_get_next_offs(rec, TRUE);
} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
......@@ -2897,30 +2907,33 @@ lock_update_discard(
do {
heap_no = rec_get_heap_no_old(rec);
lock_rec_inherit_to_gap(heir, page_id,
lock_rec_inherit_to_gap(g.cell1(), heir,
g.cell2(), page_id,
heir_block->frame,
heir_heap_no, heap_no);
lock_rec_reset_and_release_wait(
page_id, heap_no);
g.cell2(), page_id, heap_no);
rec = page + rec_get_next_offs(rec, FALSE);
} while (heap_no != PAGE_HEAP_NO_SUPREMUM);
}
lock_rec_free_all_from_discard_page(page_id,
lock_rec_free_all_from_discard_page(page_id, g.cell2(),
lock_sys.rec_hash);
} else {
const auto fold = page_id.fold();
auto latch = lock_sys.prdt_hash.lock_get(fold);
auto cell = lock_sys.prdt_hash.cell_get(fold);
auto latch = lock_sys_t::hash_table::latch(cell);
latch->acquire();
lock_rec_free_all_from_discard_page(page_id,
lock_rec_free_all_from_discard_page(page_id, *cell,
lock_sys.prdt_hash);
latch->release();
latch= lock_sys.prdt_page_hash.lock_get(fold);
cell = lock_sys.prdt_page_hash.cell_get(fold);
latch = lock_sys_t::hash_table::latch(cell);
latch->acquire();
lock_rec_free_all_from_discard_page(
page_id, lock_sys.prdt_page_hash);
lock_rec_free_all_from_discard_page(page_id, *cell,
lock_sys.prdt_page_hash);
latch->release();
}
}
......@@ -2988,10 +3001,11 @@ lock_update_delete(
/* Let the next record inherit the locks from rec, in gap mode */
lock_rec_inherit_to_gap(id, id, block->frame, next_heap_no, heap_no);
lock_rec_inherit_to_gap(g.cell(), id, g.cell(), id, block->frame,
next_heap_no, heap_no);
/* Reset the lock bits on rec and release waiting transactions */
lock_rec_reset_and_release_wait(id, heap_no);
lock_rec_reset_and_release_wait(g.cell(), id, heap_no);
}
/*********************************************************************//**
......@@ -3017,7 +3031,8 @@ lock_rec_store_on_page_infimum(
const page_id_t id{block->page.id()};
LockGuard g{lock_sys.rec_hash, id};
lock_rec_move(*block, id, PAGE_HEAP_NO_INFIMUM, heap_no);
lock_rec_move(g.cell(), *block, id, g.cell(), id,
PAGE_HEAP_NO_INFIMUM, heap_no);
}
/** Restore the explicit lock requests on a single record, where the
......@@ -3030,8 +3045,10 @@ void lock_rec_restore_from_page_infimum(const buf_block_t &block,
const rec_t *rec, page_id_t donator)
{
const ulint heap_no= page_rec_get_heap_no(rec);
LockMultiGuard g{lock_sys.rec_hash, block.page.id(), donator};
lock_rec_move(block, donator, heap_no, PAGE_HEAP_NO_INFIMUM);
const page_id_t id{block.page.id()};
LockMultiGuard g{lock_sys.rec_hash, id, donator};
lock_rec_move(g.cell1(), block, id, g.cell2(), donator, heap_no,
PAGE_HEAP_NO_INFIMUM);
}
/*========================= TABLE LOCKS ==============================*/
......@@ -3648,7 +3665,7 @@ lock_rec_unlock(
LockGuard g{lock_sys.rec_hash, id};
first_lock = lock_sys.get_first(lock_sys.rec_hash, id, heap_no);
first_lock = lock_sys_t::get_first(g.cell(), id, heap_no);
/* Find the last lock with the same lock_mode and transaction
on the record. */
......@@ -3690,7 +3707,8 @@ lock_rec_unlock(
ut_ad(lock->trx->lock.wait_trx);
ut_ad(lock->trx->lock.wait_lock);
if (const lock_t* c = lock_rec_has_to_wait_in_queue(lock)) {
if (const lock_t* c = lock_rec_has_to_wait_in_queue(g.cell(),
lock)) {
#ifdef WITH_WSREP
wsrep_assert_no_bf_bf_wait(c, lock, c->trx);
#endif /* WITH_WSREP */
......@@ -3742,7 +3760,8 @@ static bool lock_release_try(trx_t *trx)
lock->index->table->id >= DICT_HDR_FIRST_ID ||
trx->dict_operation);
auto &lock_hash= lock_sys.hash_get(lock->type_mode);
auto latch= lock_hash.lock_get(lock->un_member.rec_lock.page_id.fold());
auto cell= lock_hash.cell_get(lock->un_member.rec_lock.page_id.fold());
auto latch= lock_sys_t::hash_table::latch(cell);
if (!latch->try_acquire())
all_released= false;
else
......@@ -4304,11 +4323,12 @@ lock_rec_queue_validate(
lock_sys.wr_lock(SRW_LOCK_CALL);
}
lock_sys.rec_hash.assert_locked(id);
hash_cell_t &cell= *lock_sys.rec_hash.cell_get(id.fold());
lock_sys.assert_locked(cell);
if (!page_rec_is_user_rec(rec)) {
for (lock = lock_sys.get_first(lock_sys.rec_hash, id, heap_no);
for (lock = lock_sys_t::get_first(cell, id, heap_no);
lock != NULL;
lock = lock_rec_get_next_const(heap_no, lock)) {
......@@ -4319,7 +4339,7 @@ lock_rec_queue_validate(
ut_ad(trx_state_eq(lock->trx,
TRX_STATE_COMMITTED_IN_MEMORY)
|| !lock->is_waiting()
|| lock_rec_has_to_wait_in_queue(lock));
|| lock_rec_has_to_wait_in_queue(cell, lock));
lock->trx->mutex_unlock();
}
......@@ -4348,7 +4368,8 @@ lock_rec_queue_validate(
if (impl_trx->state == TRX_STATE_COMMITTED_IN_MEMORY) {
} else if (const lock_t* other_lock
= lock_rec_other_has_expl_req(
LOCK_S, id, true, heap_no, impl_trx)) {
LOCK_S, cell, id, true, heap_no,
impl_trx)) {
/* The impl_trx is holding an implicit lock on the
given record 'rec'. So there cannot be another
explicit granted lock. Also, there can be another
......@@ -4380,7 +4401,7 @@ lock_rec_queue_validate(
wsrep_report_bf_lock_wait(other_lock->trx->mysql_thd, other_lock->trx->id);
if (!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
id, heap_no,
cell, id, heap_no,
impl_trx)) {
ib::info() << "WSREP impl BF lock conflict";
}
......@@ -4389,14 +4410,15 @@ lock_rec_queue_validate(
{
ut_ad(other_lock->is_waiting());
ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
id, heap_no, impl_trx));
cell, id, heap_no,
impl_trx));
}
}
impl_trx->mutex_unlock();
}
for (lock = lock_sys.get_first(lock_sys.rec_hash, id, heap_no);
for (lock = lock_sys_t::get_first(cell, id, heap_no);
lock != NULL;
lock = lock_rec_get_next_const(heap_no, lock)) {
......@@ -4409,14 +4431,15 @@ lock_rec_queue_validate(
if (lock->is_waiting()) {
ut_a(lock->is_gap()
|| lock_rec_has_to_wait_in_queue(lock));
|| lock_rec_has_to_wait_in_queue(cell, lock));
} else if (!lock->is_gap()) {
const lock_mode mode = lock->mode() == LOCK_S
? LOCK_X : LOCK_S;
const lock_t* other_lock
= lock_rec_other_has_expl_req(
mode, id, false, heap_no, lock->trx);
mode, cell, id, false, heap_no,
lock->trx);
#ifdef WITH_WSREP
if (UNIV_UNLIKELY(other_lock && lock->trx->is_wsrep())) {
/* Only BF transaction may be granted
......@@ -4458,7 +4481,7 @@ static bool lock_rec_validate_page(const buf_block_t *block, bool latched)
LockGuard g{lock_sys.rec_hash, id};
loop:
lock = lock_sys.rec_hash.get_first(id);
lock = lock_sys_t::get_first(g.cell(), id);
if (!lock) {
goto function_exit;
......@@ -4686,7 +4709,7 @@ lock_rec_insert_check_and_lock(
BTR_NO_LOCKING_FLAG and skip the locking altogether. */
ut_ad(lock_table_has(trx, index->table, LOCK_IX));
*inherit= lock_sys.get_first(lock_sys.rec_hash, id, heap_no);
*inherit= lock_sys_t::get_first(g.cell(), id, heap_no);
if (*inherit)
{
......@@ -4706,8 +4729,9 @@ lock_rec_insert_check_and_lock(
on the successor, which produced an unnecessary deadlock. */
const unsigned type_mode= LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION;
if(lock_t *c_lock= lock_rec_other_has_conflicting(type_mode, id,
heap_no, trx))
if (lock_t *c_lock= lock_rec_other_has_conflicting(type_mode,
g.cell(), id,
heap_no, trx))
{
trx->mutex_lock();
err= lock_rec_enqueue_waiting(c_lock, type_mode, id, block->frame,
......@@ -4777,9 +4801,10 @@ lock_rec_convert_impl_to_expl_for_trx(
ut_ad(!trx_state_eq(trx, TRX_STATE_NOT_STARTED));
if (!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY) &&
!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, id, heap_no, trx))
lock_rec_add_to_queue(LOCK_X | LOCK_REC_NOT_GAP, id, page_align(rec),
heap_no, index, trx, true);
!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id, heap_no,
trx))
lock_rec_add_to_queue(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id,
page_align(rec), heap_no, index, trx, true);
}
trx->mutex_unlock();
......@@ -4793,6 +4818,7 @@ lock_rec_convert_impl_to_expl_for_trx(
struct lock_rec_other_trx_holds_expl_arg
{
const ulint heap_no;
const hash_cell_t &cell;
const page_id_t id;
const trx_t &impl_trx;
};
......@@ -4810,7 +4836,7 @@ static my_bool lock_rec_other_trx_holds_expl_callback(
lock_t *expl_lock= element->trx->state == TRX_STATE_COMMITTED_IN_MEMORY
? nullptr
: lock_rec_has_expl(LOCK_S | LOCK_REC_NOT_GAP,
arg->id, arg->heap_no, element->trx);
arg->cell, arg->id, arg->heap_no, element->trx);
/*
An explicit lock is held by trx other than the trx holding the implicit
lock.
......@@ -4844,6 +4870,7 @@ static void lock_rec_other_trx_holds_expl(trx_t *caller_trx, trx_t *trx,
if (trx)
{
ut_ad(!page_rec_is_metadata(rec));
const auto fold= id.fold();
LockMutexGuard g{SRW_LOCK_CALL};
ut_ad(trx->is_referenced());
const trx_state_t state{trx->state};
......@@ -4851,9 +4878,9 @@ static void lock_rec_other_trx_holds_expl(trx_t *caller_trx, trx_t *trx,
if (state == TRX_STATE_COMMITTED_IN_MEMORY)
/* The transaction was committed before we acquired LockMutexGuard. */
return;
lock_rec_other_trx_holds_expl_arg arg= { page_rec_get_heap_no(rec), id,
*trx };
lock_rec_other_trx_holds_expl_arg arg=
{ page_rec_get_heap_no(rec), *lock_sys.rec_hash.cell_get(fold), id, *trx };
trx_sys.rw_trx_hash.iterate(caller_trx,
lock_rec_other_trx_holds_expl_callback, &arg);
}
......@@ -5634,7 +5661,8 @@ bool lock_trx_has_expl_x_lock(const trx_t &trx, const dict_table_t &table,
if (!lock_table_has(&trx, &table, LOCK_X))
{
LockGuard g{lock_sys.rec_hash, id};
ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, id, heap_no, &trx));
ut_ad(lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP,
g.cell(), id, heap_no, &trx));
}
return true;
}
......@@ -5800,7 +5828,7 @@ namespace Deadlock
print(*next);
print("*** WAITING FOR THIS LOCK TO BE GRANTED:\n");
print(*wait_lock);
if (r == REPORT_BASIC);
else if (wait_lock->is_table())
{
if (const lock_t *lock=
......@@ -5816,24 +5844,27 @@ namespace Deadlock
else
ut_ad("no conflicting table lock found" == 0);
}
else if (const lock_t *lock=
(wait_lock->type_mode & LOCK_PREDICATE
? lock_sys.prdt_hash : lock_sys.rec_hash).
get_first(wait_lock->un_member.rec_lock.page_id))
else
{
const ulint heap_no= lock_rec_find_set_bit(wait_lock);
if (!lock_rec_get_nth_bit(lock, heap_no))
lock= lock_rec_get_next_const(heap_no, lock);
ut_ad(!lock->is_waiting());
print("*** CONFLICTING WITH:\n");
do
print(*lock);
while ((lock= lock_rec_get_next_const(heap_no, lock)) &&
!lock->is_waiting());
const page_id_t id{wait_lock->un_member.rec_lock.page_id};
hash_cell_t &cell= *(wait_lock->type_mode & LOCK_PREDICATE
? lock_sys.prdt_hash : lock_sys.rec_hash).
cell_get(id.fold());
if (const lock_t *lock= lock_sys_t::get_first(cell, id))
{
const ulint heap_no= lock_rec_find_set_bit(wait_lock);
if (!lock_rec_get_nth_bit(lock, heap_no))
lock= lock_rec_get_next_const(heap_no, lock);
ut_ad(!lock->is_waiting());
print("*** CONFLICTING WITH:\n");
do
print(*lock);
while ((lock= lock_rec_get_next_const(heap_no, lock)) &&
!lock->is_waiting());
}
else
ut_ad("no conflicting record lock found" == 0);
}
else
ut_ad("no conflicting record lock found" == 0);
if (next == cycle)
break;
}
......@@ -5948,16 +5979,17 @@ lock_update_split_and_merge(
/* Inherit the locks on the supremum of the left page to the
first record which was moved from the right page */
lock_rec_inherit_to_gap(l, l, left_block->frame,
lock_rec_inherit_to_gap(g.cell1(), l, g.cell1(), l, left_block->frame,
page_rec_get_heap_no(left_next_rec),
PAGE_HEAP_NO_SUPREMUM);
/* Reset the locks on the supremum of the left page,
releasing waiting transactions */
lock_rec_reset_and_release_wait(l, PAGE_HEAP_NO_SUPREMUM);
lock_rec_reset_and_release_wait(g.cell1(), l, PAGE_HEAP_NO_SUPREMUM);
/* Inherit the locks to the supremum of the left page from the
successor of the infimum on the right page */
lock_rec_inherit_to_gap(l, r, left_block->frame, PAGE_HEAP_NO_SUPREMUM,
lock_rec_inherit_to_gap(g.cell1(), l, g.cell2(), r, left_block->frame,
PAGE_HEAP_NO_SUPREMUM,
lock_get_min_heap_no(right_block));
}
......@@ -228,7 +228,7 @@ lock_t*
lock_prdt_has_lock(
/*===============*/
ulint precise_mode, /*!< in: LOCK_S or LOCK_X */
unsigned type_mode, /*!< in: LOCK_PREDICATE etc. */
hash_cell_t& cell, /*!< hash table cell of id */
const page_id_t id, /*!< in: page identifier */
lock_prdt_t* prdt, /*!< in: The predicate to be
attached to the new lock */
......@@ -238,8 +238,7 @@ lock_prdt_has_lock(
|| (precise_mode & LOCK_MODE_MASK) == LOCK_X);
ut_ad(!(precise_mode & LOCK_INSERT_INTENTION));
for (lock_t*lock= lock_sys.get_first(lock_sys.hash_get(type_mode),
id, PRDT_HEAPNO);
for (lock_t*lock= lock_sys_t::get_first(cell, id, PRDT_HEAPNO);
lock;
lock = lock_rec_get_next(PRDT_HEAPNO, lock)) {
ut_ad(lock->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE));
......@@ -282,14 +281,14 @@ lock_prdt_other_has_conflicting(
unsigned mode, /*!< in: LOCK_S or LOCK_X,
possibly ORed to LOCK_PREDICATE or
LOCK_PRDT_PAGE, LOCK_INSERT_INTENTION */
const hash_cell_t& cell, /*!< in: hash table cell */
const page_id_t id, /*!< in: page identifier */
lock_prdt_t* prdt, /*!< in: Predicates (currently)
the Minimum Bounding Rectangle)
the new lock will be on */
const trx_t* trx) /*!< in: our transaction */
{
for (lock_t* lock = lock_sys.get_first(lock_sys.hash_get(mode),
id, PRDT_HEAPNO);
for (lock_t* lock = lock_sys_t::get_first(cell, id, PRDT_HEAPNO);
lock != NULL;
lock = lock_rec_get_next(PRDT_HEAPNO, lock)) {
......@@ -381,9 +380,10 @@ lock_prdt_find_on_page(
lock_prdt_t* prdt, /*!< in: MBR with the lock */
const trx_t* trx) /*!< in: transaction */
{
lock_t* lock;
const page_id_t id{block->page.id()};
hash_cell_t& cell = *lock_sys.hash_get(type_mode).cell_get(id.fold());
for (lock = lock_sys.get_first(type_mode, block->page.id());
for (lock_t *lock = lock_sys_t::get_first(cell, id);
lock != NULL;
lock = lock_rec_get_next_on_page(lock)) {
......@@ -440,26 +440,29 @@ lock_prdt_add_to_queue(
#endif /* UNIV_DEBUG */
/* Try to extend a similar non-waiting lock on the same page */
if (type_mode & LOCK_WAIT) {
goto create;
}
for (lock_t* lock = lock_sys.get_first(type_mode, block->page.id());
lock; lock = lock_rec_get_next_on_page(lock)) {
if (lock->is_waiting()
&& lock->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)
&& lock_rec_get_nth_bit(lock, PRDT_HEAPNO)) {
goto create;
if (!(type_mode & LOCK_WAIT)) {
const page_id_t id{block->page.id()};
hash_cell_t& cell = *lock_sys.hash_get(type_mode).
cell_get(id.fold());
for (lock_t* lock = lock_sys_t::get_first(cell, id);
lock; lock = lock_rec_get_next_on_page(lock)) {
if (lock->is_waiting()
&& lock->type_mode
& (LOCK_PREDICATE | LOCK_PRDT_PAGE)
&& lock_rec_get_nth_bit(lock, PRDT_HEAPNO)) {
goto create;
}
}
}
if (lock_t* lock = lock_prdt_find_on_page(type_mode, block,
prdt, trx)) {
if (lock->type_mode & LOCK_PREDICATE) {
lock_prdt_enlarge_prdt(lock, prdt);
}
if (lock_t* lock = lock_prdt_find_on_page(type_mode, block,
prdt, trx)) {
if (lock->type_mode & LOCK_PREDICATE) {
lock_prdt_enlarge_prdt(lock, prdt);
}
return lock;
return lock;
}
}
create:
......@@ -509,8 +512,7 @@ lock_prdt_insert_check_and_lock(
ut_ad(lock_table_has(trx, index->table, LOCK_IX));
/* Only need to check locks on prdt_hash */
if (ut_d(lock_t *lock=)
lock_sys.get_first(lock_sys.prdt_hash, id, PRDT_HEAPNO))
if (ut_d(lock_t *lock=) lock_sys_t::get_first(g.cell(), id, PRDT_HEAPNO))
{
ut_ad(lock->type_mode & LOCK_PREDICATE);
......@@ -522,7 +524,8 @@ lock_prdt_insert_check_and_lock(
with each other */
const ulint mode= LOCK_X | LOCK_PREDICATE | LOCK_INSERT_INTENTION;
lock_t *c_lock= lock_prdt_other_has_conflicting(mode, id, prdt, trx);
lock_t *c_lock= lock_prdt_other_has_conflicting(mode, g.cell(), id,
prdt, trx);
if (c_lock)
{
......@@ -556,10 +559,12 @@ lock_prdt_update_parent(
lock_prdt_t* right_prdt, /*!< in: MBR on the new page */
const page_id_t page_id) /*!< in: parent page */
{
auto fold= page_id.fold();
LockMutexGuard g{SRW_LOCK_CALL};
hash_cell_t& cell = *lock_sys.prdt_hash.cell_get(fold);
/* Get all locks in parent */
for (lock_t *lock = lock_sys.prdt_hash.get_first(page_id);
for (lock_t *lock = lock_sys_t::get_first(cell, page_id);
lock;
lock = lock_rec_get_next_on_page(lock)) {
lock_prdt_t* lock_prdt;
......@@ -604,13 +609,13 @@ lock_prdt_update_split_low(
buf_block_t* new_block, /*!< in/out: the new half page */
lock_prdt_t* prdt, /*!< in: MBR on the old page */
lock_prdt_t* new_prdt, /*!< in: MBR on the new page */
const page_id_t page_id, /*!< in: page number */
const page_id_t id, /*!< in: page number */
unsigned type_mode) /*!< in: LOCK_PREDICATE or
LOCK_PRDT_PAGE */
{
lock_t* lock;
hash_cell_t& cell = *lock_sys.hash_get(type_mode).cell_get(id.fold());
for (lock = lock_sys.get_first(type_mode, page_id);
for (lock_t* lock = lock_sys_t::get_first(cell, id);
lock;
lock = lock_rec_get_next_on_page(lock)) {
/* First dealing with Page Lock */
......@@ -726,7 +731,7 @@ lock_prdt_lock(
LockGuard g{hash, id};
const unsigned prdt_mode = type_mode | mode;
lock_t* lock = hash.get_first(id);
lock_t* lock = lock_sys_t::get_first(g.cell(), id);
if (lock == NULL) {
lock = lock_rec_create(
......@@ -746,14 +751,14 @@ lock_prdt_lock(
trx->mutex_lock();
lock = lock_prdt_has_lock(
mode, type_mode, id, prdt, trx);
mode, g.cell(), id, prdt, trx);
if (lock == NULL) {
lock_t* wait_for;
wait_for = lock_prdt_other_has_conflicting(
prdt_mode, id, prdt, trx);
prdt_mode, g.cell(), id, prdt, trx);
if (wait_for != NULL) {
......@@ -811,7 +816,7 @@ lock_place_prdt_page_lock(
LockGuard g{lock_sys.prdt_page_hash, page_id};
const lock_t* lock = lock_sys.prdt_page_hash.get_first(page_id);
const lock_t* lock = lock_sys_t::get_first(g.cell(), page_id);
const ulint mode = LOCK_S | LOCK_PRDT_PAGE;
trx_t* trx = thr_get_trx(thr);
......@@ -847,7 +852,7 @@ lock_place_prdt_page_lock(
bool lock_test_prdt_page_lock(const trx_t *trx, const page_id_t page_id)
{
LockGuard g{lock_sys.prdt_page_hash, page_id};
lock_t *lock= lock_sys.prdt_page_hash.get_first(page_id);
lock_t *lock= lock_sys_t::get_first(g.cell(), page_id);
return !lock || trx == lock->trx;
}
......@@ -863,9 +868,9 @@ lock_prdt_rec_move(
{
LockMultiGuard g{lock_sys.prdt_hash, receiver->page.id(), donator};
for (lock_t *lock = lock_sys.get_first(lock_sys.prdt_hash,
donator, PRDT_HEAPNO);
lock != NULL;
for (lock_t *lock = lock_sys_t::get_first(g.cell2(), donator,
PRDT_HEAPNO);
lock;
lock = lock_rec_get_next(PRDT_HEAPNO, lock)) {
const auto type_mode = lock->type_mode;
......@@ -889,11 +894,11 @@ void lock_sys_t::prdt_page_free_from_discard(const page_id_t id, bool all)
{
const auto id_fold= id.fold();
rd_lock(SRW_LOCK_CALL);
auto latch= prdt_page_hash.lock_get(id_fold);
auto cell= prdt_page_hash.cell_get(id_fold);
auto latch= hash_table::latch(cell);
latch->acquire();
for (lock_t *lock= lock_sys.prdt_page_hash.get_first(id), *next; lock;
lock= next)
for (lock_t *lock= get_first(*cell, id), *next; lock; lock= next)
{
next= lock_rec_get_next_on_page(lock);
lock_rec_discard(prdt_page_hash, lock);
......@@ -902,10 +907,10 @@ void lock_sys_t::prdt_page_free_from_discard(const page_id_t id, bool all)
if (all)
{
latch->release();
latch= prdt_hash.lock_get(id_fold);
cell= prdt_hash.cell_get(id_fold);
latch= hash_table::latch(cell);
latch->acquire();
for (lock_t *lock= lock_sys.prdt_hash.get_first(id), *next; lock;
lock= next)
for (lock_t *lock= get_first(*cell, id), *next; lock; lock= next)
{
next= lock_rec_get_next_on_page(lock);
lock_rec_discard(prdt_hash, lock);
......@@ -913,11 +918,11 @@ void lock_sys_t::prdt_page_free_from_discard(const page_id_t id, bool all)
}
latch->release();
latch= rec_hash.lock_get(id_fold);
cell= rec_hash.cell_get(id_fold);
latch= hash_table::latch(cell);
latch->acquire();
for (lock_t *lock= lock_sys.rec_hash.get_first(id), *next; lock;
lock= next)
for (lock_t *lock= get_first(*cell, id), *next; lock; lock= next)
{
next= lock_rec_get_next_on_page(lock);
lock_rec_discard(rec_hash, lock);
......