Commit 95730372 authored by Vlad Lesin's avatar Vlad Lesin

MDEV-30165 X-lock on supremum for prepared transaction for RR

trx_t::set_skip_lock_inheritance() must be invoked at the very beginning
of lock_release_on_prepare(). Currently  trx_t::set_skip_lock_inheritance()
is invoked at the end of lock_release_on_prepare() when lock_sys and trx
are released, and there can be a case when locks on prepare are released,
but "not inherit gap locks" bit has not yet been set, and page split
inherits lock to supremum.

Also reset supremum bit and rebuild waiting queue when XA is prepared.

Reviewed by: Marko Mäkelä
parent 8513f8f1
CREATE TABLE t (
`a` INT NOT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB;
INSERT INTO t VALUES(10);
INSERT INTO t VALUES(20);
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
XA START '1';
SELECT * FROM t WHERE a > 20 FOR UPDATE;
a
INSERT INTO t VALUES(40);
XA END '1';
XA PREPARE '1';
connect con1,localhost,root;
SET innodb_lock_wait_timeout=1;
INSERT INTO t VALUES(50);
disconnect con1;
connection default;
XA COMMIT '1';
DROP TABLE t;
--source include/have_innodb.inc
--source include/count_sessions.inc
CREATE TABLE t (
`a` INT NOT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB;
INSERT INTO t VALUES(10);
INSERT INTO t VALUES(20);
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
XA START '1';
SELECT * FROM t WHERE a > 20 FOR UPDATE;
INSERT INTO t VALUES(40);
XA END '1';
XA PREPARE '1';
connect (con1,localhost,root);
SET innodb_lock_wait_timeout=1;
# This will be finished with lock wait timeout error if XA PREPARE did not
# reset lock on supremum
INSERT INTO t VALUES(50);
--disconnect con1
--connection default
XA COMMIT '1';
DROP TABLE t;
--source include/wait_until_count_sessions.inc
......@@ -560,6 +560,15 @@ lock_rec_get_next_const(
ulint heap_no,/*!< in: heap number of the record */
const lock_t* lock); /*!< in: lock */
/** Gets the first explicit lock request on a record.
@param hash hash chain the lock on
@param page_id page id containing the record
@param heap_no heap number of the record
@return first lock, NULL if none exists */
UNIV_INLINE
lock_t *lock_rec_get_first(hash_table_t *hash, const page_id_t page_id,
ulint heap_no);
/*********************************************************************//**
Gets the first explicit lock request on a record.
@return first lock, NULL if none exists */
......
......@@ -154,6 +154,22 @@ lock_rec_get_next_const(
return(lock_rec_get_next(heap_no, (lock_t*) lock));
}
/** Gets the first explicit lock request on a record.
@param hash hash chain the lock on
@param page_id page id containing the record
@param heap_no heap number of the record
@return first lock, NULL if none exists */
UNIV_INLINE
lock_t *lock_rec_get_first(hash_table_t *hash, const page_id_t page_id,
ulint heap_no)
{
for (lock_t *lock= lock_sys.get_first(*hash, page_id); lock;
lock= lock_rec_get_next_on_page(lock))
if (lock_rec_get_nth_bit(lock, heap_no))
return lock;
return nullptr;
}
/*********************************************************************//**
Gets the first explicit lock request on a record.
@return first lock, NULL if none exists */
......@@ -165,11 +181,7 @@ lock_rec_get_first(
const buf_block_t* block, /*!< in: block containing the record */
ulint heap_no)/*!< in: heap number of the record */
{
for (lock_t *lock= lock_sys.get_first(*hash, block->page.id());
lock; lock= lock_rec_get_next_on_page(lock))
if (lock_rec_get_nth_bit(lock, heap_no))
return lock;
return nullptr;
return lock_rec_get_first(hash, block->page.id(), heap_no);
}
/*********************************************************************//**
......
......@@ -4121,6 +4121,39 @@ lock_grant_and_move_on_rec(
}
}
/** Rebuild waiting queue after first_lock for heap_no. The queue is rebuilt
close to the way lock_rec_dequeue_from_page() does it.
@param trx transaction that has set a lock, which caused the queue
rebuild
@param first_lock the lock after which waiting queue will be rebuilt
@param heap_no heap no of the record for which waiting queue to rebuild */
static void lock_rec_rebuild_waiting_queue(trx_t *trx, lock_t *first_lock,
ulint heap_no)
{
ut_ad(lock_mutex_own());
ut_ad(trx_mutex_own(trx));
if (innodb_lock_schedule_algorithm == INNODB_LOCK_SCHEDULE_ALGORITHM_FCFS ||
thd_is_replication_slave_thread(trx->mysql_thd))
{
/* Check if we can now grant waiting lock requests */
for (lock_t *lock= first_lock; lock != NULL;
lock= lock_rec_get_next(heap_no, lock))
{
if (!lock_get_wait(lock))
continue;
const lock_t *c= lock_rec_has_to_wait_in_queue(lock);
if (!c)
{
/* Grant the lock */
ut_ad(trx != lock->trx);
lock_grant(lock);
}
}
}
else
lock_grant_and_move_on_rec(first_lock, heap_no);
}
/*************************************************************//**
Removes a granted record lock of a transaction from the queue and grants
locks to other transactions waiting in the queue if they now are entitled
......@@ -4181,28 +4214,7 @@ lock_rec_unlock(
released:
ut_a(!lock_get_wait(lock));
lock_rec_reset_nth_bit(lock, heap_no);
if (innodb_lock_schedule_algorithm
== INNODB_LOCK_SCHEDULE_ALGORITHM_FCFS ||
thd_is_replication_slave_thread(lock->trx->mysql_thd)) {
/* Check if we can now grant waiting lock requests */
for (lock = first_lock; lock != NULL;
lock = lock_rec_get_next(heap_no, lock)) {
if (!lock_get_wait(lock)) {
continue;
}
const lock_t* c = lock_rec_has_to_wait_in_queue(lock);
if (!c) {
/* Grant the lock */
ut_ad(trx != lock->trx);
lock_grant(lock);
}
}
} else {
lock_grant_and_move_on_rec(first_lock, heap_no);
}
lock_rec_rebuild_waiting_queue(trx, first_lock, heap_no);
lock_mutex_exit();
trx_mutex_exit(trx);
......@@ -4334,10 +4346,28 @@ void lock_release(trx_t* trx)
#endif
}
/** Reset lock bit for supremum and rebuild waiting queue.
@param lock the lock with supemum bit set */
static void lock_rec_unlock_supremum(lock_t *lock)
{
ut_ad(lock_rec_get_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM));
ut_ad(lock_mutex_own());
ut_ad(lock_get_type_low(lock) == LOCK_REC);
trx_mutex_enter(lock->trx);
lock_t *first_lock=
lock_rec_get_first(&lock_sys.rec_hash, lock->un_member.rec_lock.page_id,
PAGE_HEAP_NO_SUPREMUM);
lock_rec_reset_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM);
lock_rec_rebuild_waiting_queue(lock->trx, first_lock, PAGE_HEAP_NO_SUPREMUM);
trx_mutex_exit(lock->trx);
}
/** Release non-exclusive locks on XA PREPARE,
and release possible other transactions waiting because of these locks. */
void lock_release_on_prepare(trx_t *trx)
{
trx->set_skip_lock_inheritance();
ulint count= 0;
lock_mutex_enter();
ut_ad(!trx_mutex_own(trx));
......@@ -4349,8 +4379,10 @@ void lock_release_on_prepare(trx_t *trx)
if (lock_get_type_low(lock) == LOCK_REC)
{
ut_ad(!lock->index->table->is_temporary());
if (lock_rec_get_gap(lock) || lock_get_mode(lock) != LOCK_X)
if ((lock->type_mode & (LOCK_MODE_MASK | LOCK_GAP)) != LOCK_X)
lock_rec_dequeue_from_page(lock);
else if (lock_rec_get_nth_bit(lock, PAGE_HEAP_NO_SUPREMUM))
lock_rec_unlock_supremum(lock);
else
{
ut_ad(trx->dict_operation ||
......@@ -4397,7 +4429,6 @@ void lock_release_on_prepare(trx_t *trx)
lock_mutex_exit();
trx->set_skip_lock_inheritance();
}
/* True if a lock mode is S or X */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment