Commit 17e59ed3 authored by Marko Mäkelä

MDEV-33454 release row locks for non-modified rows at XA PREPARE

From the correctness point of view, it should be safe to release
all locks on index records that were not modified by the transaction.
Doing so should make the locks after XA PREPARE fully compatible
with what would happen if the server were restarted: InnoDB table
IX locks and exclusive record locks would be resurrected based on
undo log records.
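
To make the restart analogy concrete, here is a minimal sketch (toy types only; UndoRec, ResurrectedLocks and resurrect_from_undo are illustrative names, not InnoDB code) of what crash recovery can resurrect for a prepared transaction: only locks derivable from its undo log records, that is, only locks on rows it actually modified.

  #include <cstdint>
  #include <set>
  #include <vector>

  // One undo log record exists per row modified by the transaction.
  struct UndoRec { uint64_t table_id; uint64_t rec_id; };

  struct ResurrectedLocks
  {
    std::set<uint64_t> ix_tables; // table IX locks
    std::set<uint64_t> x_records; // exclusive record locks
  };

  // Locks that recovery could rebuild for a prepared transaction: nothing
  // is recorded for rows the transaction merely read and locked.
  ResurrectedLocks resurrect_from_undo(const std::vector<UndoRec> &undo)
  {
    ResurrectedLocks out;
    for (const UndoRec &u : undo)
    {
      out.ix_tables.insert(u.table_id); // IX on each modified row's table
      out.x_records.insert(u.rec_id);   // X on the modified row itself
    }
    return out;
  }

Releasing the locks on unmodified rows at XA PREPARE therefore leaves the transaction holding essentially the same set of locks that a restart would restore.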

Concurrently running transactions that are waiting for a lock may invoke
lock_rec_convert_impl_to_expl() to create an explicit record lock object
on behalf of the lock-owning transaction so that they can attach
their waiting lock request to the explicit record lock object. Explicit
locks would be released by trx_t::release_locks() during commit or
rollback.
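
A minimal sketch of this conversion, assuming toy types (Trx, RecLock, lock_queue and the two functions below are illustrative stand-ins, not the real lock_rec_convert_impl_to_expl() interface):

  #include <cstdint>
  #include <list>
  #include <unordered_map>

  struct RecLock;

  struct Trx
  {
    uint64_t id;
    std::list<RecLock*> trx_locks; // freed by release_locks() at commit/rollback
  };

  struct RecLock
  {
    Trx *owner;
    uint64_t rec_id; // stands in for (page_id, heap_no)
    bool waiting;
  };

  // One queue of explicit lock objects per record.
  static std::unordered_map<uint64_t, std::list<RecLock>> lock_queue;

  // Called by a waiting transaction: materialize an explicit lock object for
  // the record's implicit owner, so the waiter has something to queue behind.
  RecLock *convert_impl_to_expl(Trx *owner, uint64_t rec_id)
  {
    std::list<RecLock> &q= lock_queue[rec_id];
    for (RecLock &l : q)
      if (l.owner == owner && !l.waiting)
        return &l;                              // already explicit
    q.push_back(RecLock{owner, rec_id, false}); // granted, not waiting
    owner->trx_locks.push_back(&q.back());
    return &q.back();
  }

  // The waiter then appends its own waiting request behind the owner's lock.
  RecLock *enqueue_waiting(Trx *waiter, uint64_t rec_id)
  {
    lock_queue[rec_id].push_back(RecLock{waiter, rec_id, true});
    return &lock_queue[rec_id].back();
  }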

Any clustered index record whose DB_TRX_ID belongs to a transaction that
is in active or XA PREPARE state will be implicitly locked by that
transaction. On XA PREPARE, we can release explicit exclusive locks on
records whose DB_TRX_ID does not match the current transaction identifier.
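
A sketch of that PREPARE-time decision, again with assumed toy types (ClustRec, RecXLock and release_unmodified_on_prepare are hypothetical; the actual code is lock_rec_unlock_unmodified() in the diff below):

  #include <cstdint>
  #include <vector>

  struct ClustRec { uint64_t db_trx_id; }; // hidden DB_TRX_ID system column
  struct RecXLock { ClustRec *rec; };      // granted exclusive, not gap

  // At XA PREPARE: drop explicit exclusive locks on rows this transaction
  // did not modify; rows with DB_TRX_ID == trx_id remain implicitly locked
  // even without an explicit lock object.
  void release_unmodified_on_prepare(uint64_t trx_id,
                                     std::vector<RecXLock> &trx_locks)
  {
    for (auto it= trx_locks.begin(); it != trx_locks.end(); )
    {
      if (it->rec->db_trx_id == trx_id)
        ++it;                    // modified by us: keep the lock
      else
        it= trx_locks.erase(it); // merely locked, not modified: release
    }
  }

Secondary index records carry no DB_TRX_ID, so the actual implementation consults lock_sec_rec_some_has_impl() for them instead, as seen in the diff below.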

lock_rec_unlock_unmodified(): Release record locks that are not implicitly
held by the current transaction.

lock_release_on_prepare_try(), lock_release_on_prepare():
Invoke lock_rec_unlock_unmodified().

row_trx_id_offset(): Declare non-static.

lock_rec_unlock(): Replaces lock_rec_unlock_supremum().

Reviewed by: Vladislav Lesin
parent fa8a46eb
@@ -4182,7 +4182,7 @@ TRANSACTIONAL_TARGET static bool lock_release_try(trx_t *trx)
ulint count= 1000;
/* We will not attempt hardware lock elision (memory transaction)
here. Both lock_rec_dequeue_from_page() and lock_table_dequeue()
would likely lead to a memory transaction due to a system call, to
would likely lead to a memory transaction abort due to a system call, to
wake up a waiting transaction. */
lock_sys.rd_lock(SRW_LOCK_CALL);
trx->mutex_lock();
@@ -4352,28 +4352,82 @@ void lock_release_on_drop(trx_t *trx)
}
}
/** Reset lock bit for supremum and rebuild waiting queue.
/** Reset a lock bit and rebuild waiting queue.
@param cell rec hash cell of in_lock
@param lock the lock with supremum bit set */
static void lock_rec_unlock_supremum(hash_cell_t &cell, lock_t *lock)
static void lock_rec_unlock(hash_cell_t &cell, lock_t *lock, ulint heap_no)
{
ut_ad(lock_rec_get_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM));
ut_ad(lock_rec_get_nth_bit(lock, heap_no));
#ifdef SAFE_MUTEX
ut_ad(!mysql_mutex_is_owner(&lock_sys.wait_mutex));
#endif /* SAFE_MUTEX */
ut_ad(!lock->is_table());
ut_ad(lock_sys.is_writer() || lock->trx->mutex_is_owner());
lock_rec_reset_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM);
lock_rec_reset_nth_bit(lock, heap_no);
lock_t *first_lock= lock_sys_t::get_first(
cell, lock->un_member.rec_lock.page_id, PAGE_HEAP_NO_SUPREMUM);
lock_t *first_lock=
lock_sys_t::get_first(cell, lock->un_member.rec_lock.page_id, heap_no);
lock_rec_rebuild_waiting_queue(
#if defined(UNIV_DEBUG) || !defined(DBUG_OFF)
lock->trx,
#endif /* defined(UNIV_DEBUG) || !defined(DBUG_OFF) */
cell, first_lock, PAGE_HEAP_NO_SUPREMUM);
cell, first_lock, heap_no);
}
/** Release locks to unmodified records on a clustered index page.
@param cell lock_sys.rec_hash cell of lock
@param lock record lock
@param offsets storage for rec_get_offsets()
@param heap storage for rec_get_offsets()
@param mtr mini-transaction (will be started and committed) */
static void lock_rec_unlock_unmodified(hash_cell_t &cell, lock_t *lock,
rec_offs *&offsets, mem_heap_t *&heap,
mtr_t &mtr)
{
ut_ad(!lock->is_waiting());
dict_index_t *const index= lock->index;
mtr.start();
if (buf_block_t *block=
btr_block_get(*index, lock->un_member.rec_lock.page_id.page_no(),
RW_S_LATCH, true, &mtr))
{
if (UNIV_UNLIKELY(!page_is_leaf(block->page.frame)))
{
ut_ad("corrupted lock system" == 0);
goto func_exit;
}
for (ulint i= PAGE_HEAP_NO_USER_LOW; i < lock_rec_get_n_bits(lock); ++i)
{
if (!lock_rec_get_nth_bit(lock, i));
else if (const rec_t *rec=
page_find_rec_with_heap_no(block->page.frame, i))
{
if (index->is_clust())
{
if (trx_read_trx_id(rec + row_trx_id_offset(rec, index)) ==
lock->trx->id)
continue;
unlock_rec:
lock_rec_unlock(cell, lock, i);
}
else
{
offsets= rec_get_offsets(rec, index, offsets, index->n_core_fields,
ULINT_UNDEFINED, &heap);
if (lock->trx !=
lock_sec_rec_some_has_impl(lock->trx, rec, index, offsets))
goto unlock_rec;
}
}
}
}
func_exit:
mtr.commit();
}
/** Release non-exclusive locks on XA PREPARE,
@@ -4391,6 +4445,12 @@ static bool lock_release_on_prepare_try(trx_t *trx)
DBUG_ASSERT(trx->state == TRX_STATE_PREPARED);
bool all_released= true;
mtr_t mtr;
rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
rec_offs *offsets= offsets_;
mem_heap_t *heap= nullptr;
rec_offs_init(offsets_);
lock_sys.rd_lock(SRW_LOCK_CALL);
trx->mutex_lock();
@@ -4407,20 +4467,24 @@ static bool lock_release_on_prepare_try(trx_t *trx)
if (!lock->is_table())
{
ut_ad(!lock->index->table->is_temporary());
bool supremum_bit = lock_rec_get_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM);
bool rec_granted_exclusive_not_gap =
bool supremum_bit= lock_rec_get_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM);
bool rec_granted_exclusive_not_gap=
lock->is_rec_granted_exclusive_not_gap();
if (!supremum_bit && rec_granted_exclusive_not_gap)
continue;
auto &lock_hash= lock_sys.hash_get(lock->type_mode);
auto cell= lock_hash.cell_get(lock->un_member.rec_lock.page_id.fold());
if (UNIV_UNLIKELY(lock->type_mode & (LOCK_PREDICATE | LOCK_PRDT_PAGE)))
continue; /* SPATIAL INDEX locking is broken. */
auto cell=
lock_sys.rec_hash.cell_get(lock->un_member.rec_lock.page_id.fold());
auto latch= lock_sys_t::hash_table::latch(cell);
if (latch->try_acquire())
{
if (!rec_granted_exclusive_not_gap)
lock_rec_dequeue_from_page(lock, false);
else if (supremum_bit)
lock_rec_unlock_supremum(*cell, lock);
lock_rec_unlock(*cell, lock, PAGE_HEAP_NO_SUPREMUM);
else
lock_rec_unlock_unmodified(*cell, lock, offsets, heap, mtr);
latch->release();
}
else
@@ -4453,6 +4517,8 @@ static bool lock_release_on_prepare_try(trx_t *trx)
lock_sys.rd_unlock();
trx->mutex_unlock();
if (UNIV_LIKELY_NULL(heap))
mem_heap_free(heap);
return all_released;
}
@@ -4466,52 +4532,71 @@ void lock_release_on_prepare(trx_t *trx)
if (lock_release_on_prepare_try(trx))
return;
LockMutexGuard g{SRW_LOCK_CALL};
trx->mutex_lock();
mtr_t mtr;
rec_offs offsets_[REC_OFFS_NORMAL_SIZE];
rec_offs *offsets= offsets_;
mem_heap_t *heap= nullptr;
rec_offs_init(offsets_);
for (lock_t *prev, *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock;
lock= prev)
{
ut_ad(lock->trx == trx);
prev= UT_LIST_GET_PREV(trx_locks, lock);
if (!lock->is_table())
LockMutexGuard g{SRW_LOCK_CALL};
trx->mutex_lock();
for (lock_t *prev, *lock= UT_LIST_GET_LAST(trx->lock.trx_locks); lock;
lock= prev)
{
ut_ad(!lock->index->table->is_temporary());
if (!lock->is_rec_granted_exclusive_not_gap())
lock_rec_dequeue_from_page(lock, false);
else if (lock_rec_get_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM))
ut_ad(lock->trx == trx);
prev= UT_LIST_GET_PREV(trx_locks, lock);
if (!lock->is_table())
{
auto &lock_hash= lock_sys.hash_get(lock->type_mode);
auto cell= lock_hash.cell_get(lock->un_member.rec_lock.page_id.fold());
lock_rec_unlock_supremum(*cell, lock);
ut_ad(!lock->index->table->is_temporary());
if (!lock->is_rec_granted_exclusive_not_gap())
lock_rec_dequeue_from_page(lock, false);
else if (UNIV_UNLIKELY(lock->type_mode &
(LOCK_PREDICATE | LOCK_PRDT_PAGE)))
/* SPATIAL INDEX locking is broken. */;
else
{
auto cell= lock_sys.rec_hash.cell_get(lock->un_member.rec_lock.
page_id.fold());
if (lock_rec_get_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM))
lock_rec_unlock(*cell, lock, PAGE_HEAP_NO_SUPREMUM);
else
{
ut_ad(lock->trx->isolation_level > TRX_ISO_READ_COMMITTED ||
/* Insert-intention lock is valid for supremum for isolation
level > TRX_ISO_READ_COMMITTED */
lock->mode() == LOCK_X ||
!lock_rec_get_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM));
lock_rec_unlock_unmodified(*cell, lock, offsets, heap, mtr);
}
}
}
else
ut_ad(lock->trx->isolation_level > TRX_ISO_READ_COMMITTED ||
/* Insert-intention lock is valid for supremum for isolation
level > TRX_ISO_READ_COMMITTED */
lock->mode() == LOCK_X ||
!lock_rec_get_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM));
}
else
{
ut_d(dict_table_t *table= lock->un_member.tab_lock.table);
ut_ad(!table->is_temporary());
switch (lock->mode()) {
case LOCK_IS:
case LOCK_S:
lock_table_dequeue(lock, false);
break;
case LOCK_IX:
case LOCK_X:
ut_ad(table->id >= DICT_HDR_FIRST_ID || trx->dict_operation);
/* fall through */
default:
break;
{
ut_d(dict_table_t *table= lock->un_member.tab_lock.table);
ut_ad(!table->is_temporary());
switch (lock->mode()) {
case LOCK_IS:
case LOCK_S:
lock_table_dequeue(lock, false);
break;
case LOCK_IX:
case LOCK_X:
ut_ad(table->id >= DICT_HDR_FIRST_ID || trx->dict_operation);
/* fall through */
default:
break;
}
}
}
}
trx->mutex_unlock();
if (UNIV_LIKELY_NULL(heap))
mem_heap_free(heap);
}
/** Release locks on a table whose creation is being rolled back */