lock0lock.h, lock0lock.c:

  Remove a spurious deadlock when >= 2 insert transactions have to wait because there is a next-key lock set
parent e2cc7f9c
...@@ -537,6 +537,14 @@ extern lock_sys_t* lock_sys; ...@@ -537,6 +537,14 @@ extern lock_sys_t* lock_sys;
the bit is set; locks of this type are created the bit is set; locks of this type are created
when records are removed from the index chain when records are removed from the index chain
of records */ of records */
#define LOCK_INSERT_INTENTION 1024 /* this bit is set when we place a waiting
gap type record lock request in order to let
an insert of an index record to wait until
there are no conflicting locks by other
transactions on the gap; note that this flag
remains set when the waiting lock is granted,
or if the lock is inherited to a neighboring
record */
/* When lock bits are reset, the following flags are available: */ /* When lock bits are reset, the following flags are available: */
#define LOCK_RELEASE_WAIT 1 #define LOCK_RELEASE_WAIT 1
......
...@@ -96,7 +96,9 @@ is only performed when a transaction itself requests a lock! ...@@ -96,7 +96,9 @@ is only performed when a transaction itself requests a lock!
An insert is allowed to a gap if there are no explicit lock requests by An insert is allowed to a gap if there are no explicit lock requests by
other transactions on the next record. It does not matter if these lock other transactions on the next record. It does not matter if these lock
requests are granted or waiting, gap bit set or not. On the other hand, an requests are granted or waiting, gap bit set or not, with the exception
that a gap type request set by another transaction to wait for
its turn to do an insert is ignored. On the other hand, an
implicit x-lock by another transaction does not prevent an insert, which implicit x-lock by another transaction does not prevent an insert, which
allows for more concurrency when using an Oracle-style sequence number allows for more concurrency when using an Oracle-style sequence number
generator for the primary key with many transactions doing inserts generator for the primary key with many transactions doing inserts
...@@ -544,8 +546,7 @@ lock_sec_rec_cons_read_sees( ...@@ -544,8 +546,7 @@ lock_sec_rec_cons_read_sees(
index record */ index record */
rec_t* rec, /* in: user record which should be read or rec_t* rec, /* in: user record which should be read or
passed over by a read cursor */ passed over by a read cursor */
dict_index_t* index __attribute__((unused)), dict_index_t* index, /* in: non-clustered index */
/* in: non-clustered index */
read_view_t* view) /* in: consistent read view */ read_view_t* view) /* in: consistent read view */
{ {
dulint max_trx_id; dulint max_trx_id;
...@@ -616,7 +617,7 @@ UNIV_INLINE ...@@ -616,7 +617,7 @@ UNIV_INLINE
ulint ulint
lock_get_type( lock_get_type(
/*==========*/ /*==========*/
/* out: LOCK_TABLE or LOCK_RECa */ /* out: LOCK_TABLE or LOCK_REC */
lock_t* lock) /* in: lock */ lock_t* lock) /* in: lock */
{ {
ut_ad(lock); ut_ad(lock);
...@@ -717,6 +718,46 @@ lock_rec_set_gap( ...@@ -717,6 +718,46 @@ lock_rec_set_gap(
} }
} }
/*************************************************************************
Gets the waiting insert flag of a record lock. */
UNIV_INLINE
ibool
lock_rec_get_insert_intention(
/*==========================*/
/* out: TRUE if gap flag set */
lock_t* lock) /* in: record lock */
{
ut_ad(lock);
ut_ad(lock_get_type(lock) == LOCK_REC);
if (lock->type_mode & LOCK_INSERT_INTENTION) {
return(TRUE);
}
return(FALSE);
}
/*************************************************************************
Sets the waiting insert flag of a record lock. */
UNIV_INLINE
void
lock_rec_set_insert_intention(
/*==========================*/
lock_t* lock, /* in: record lock */
ibool val) /* in: value to set: TRUE or FALSE */
{
ut_ad(lock);
ut_ad((val == TRUE) || (val == FALSE));
ut_ad(lock_get_type(lock) == LOCK_REC);
if (val) {
lock->type_mode = lock->type_mode | LOCK_INSERT_INTENTION;
} else {
lock->type_mode = lock->type_mode & ~LOCK_INSERT_INTENTION;
}
}
/************************************************************************* /*************************************************************************
Calculates if lock mode 1 is stronger or equal to lock mode 2. */ Calculates if lock mode 1 is stronger or equal to lock mode 2. */
UNIV_INLINE UNIV_INLINE
...@@ -1331,6 +1372,12 @@ lock_rec_other_has_expl_req( ...@@ -1331,6 +1372,12 @@ lock_rec_other_has_expl_req(
into account, or 0 if not */ into account, or 0 if not */
ulint wait, /* in: LOCK_WAIT if also waiting locks are ulint wait, /* in: LOCK_WAIT if also waiting locks are
taken into account, or 0 if not */ taken into account, or 0 if not */
ibool ignore_insert_intention,/* in: FALSE if also locks where the
LOCK_INSERT_INTENTION
flag is set should be taken into account; we can
use this option to prevent unnecessary deadlocks
between inserts which have needed to set a wait
lock to wait for some next-key lock to be released */
rec_t* rec, /* in: record to look at */ rec_t* rec, /* in: record to look at */
trx_t* trx) /* in: transaction, or NULL if requests trx_t* trx) /* in: transaction, or NULL if requests
by any transaction are wanted */ by any transaction are wanted */
...@@ -1338,8 +1385,11 @@ lock_rec_other_has_expl_req( ...@@ -1338,8 +1385,11 @@ lock_rec_other_has_expl_req(
lock_t* lock; lock_t* lock;
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad((mode == LOCK_X) || (mode == LOCK_S)); ut_ad(mode == LOCK_X || mode == LOCK_S);
ut_ad(gap == 0 || gap == LOCK_GAP);
ut_ad(wait == 0 || wait == LOCK_WAIT);
ut_ad(ignore_insert_intention == TRUE
|| ignore_insert_intention == FALSE);
lock = lock_rec_get_first(rec); lock = lock_rec_get_first(rec);
while (lock) { while (lock) {
...@@ -1349,8 +1399,15 @@ lock_rec_other_has_expl_req( ...@@ -1349,8 +1399,15 @@ lock_rec_other_has_expl_req(
&& (wait || !lock_get_wait(lock)) && (wait || !lock_get_wait(lock))
&& lock_mode_stronger_or_eq(lock_get_mode(lock), mode)) { && lock_mode_stronger_or_eq(lock_get_mode(lock), mode)) {
if (ignore_insert_intention
&& lock_rec_get_insert_intention(lock)) {
/* We can ignore this lock */
} else {
return(lock); return(lock);
} }
}
lock = lock_rec_get_next(rec, lock); lock = lock_rec_get_next(rec, lock);
} }
...@@ -1526,7 +1583,10 @@ lock_rec_enqueue_waiting( ...@@ -1526,7 +1583,10 @@ lock_rec_enqueue_waiting(
DB_QUE_THR_SUSPENDED */ DB_QUE_THR_SUSPENDED */
ulint type_mode,/* in: lock mode this transaction is ulint type_mode,/* in: lock mode this transaction is
requesting: LOCK_S or LOCK_X, ORed with requesting: LOCK_S or LOCK_X, ORed with
LOCK_GAP if a gap lock is requested */ LOCK_GAP if a gap lock is requested, ORed
with LOCK_INSERT_INTENTION if the waiting
lock request is set when performing an
insert of an index record */
rec_t* rec, /* in: record */ rec_t* rec, /* in: record */
dict_index_t* index, /* in: index of record */ dict_index_t* index, /* in: index of record */
que_thr_t* thr) /* in: query thread */ que_thr_t* thr) /* in: query thread */
...@@ -1613,11 +1673,12 @@ lock_rec_add_to_queue( ...@@ -1613,11 +1673,12 @@ lock_rec_add_to_queue(
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad((type_mode & (LOCK_WAIT | LOCK_GAP)) ut_ad((type_mode & (LOCK_WAIT | LOCK_GAP))
|| ((type_mode & LOCK_MODE_MASK) != LOCK_S) || ((type_mode & LOCK_MODE_MASK) != LOCK_S)
|| !lock_rec_other_has_expl_req(LOCK_X, 0, LOCK_WAIT, rec, trx)); || !lock_rec_other_has_expl_req(LOCK_X, 0, LOCK_WAIT,
FALSE, rec, trx));
ut_ad((type_mode & (LOCK_WAIT | LOCK_GAP)) ut_ad((type_mode & (LOCK_WAIT | LOCK_GAP))
|| ((type_mode & LOCK_MODE_MASK) != LOCK_X) || ((type_mode & LOCK_MODE_MASK) != LOCK_X)
|| !lock_rec_other_has_expl_req(LOCK_S, 0, LOCK_WAIT, rec, trx)); || !lock_rec_other_has_expl_req(LOCK_S, 0, LOCK_WAIT,
FALSE,rec, trx));
type_mode = type_mode | LOCK_REC; type_mode = type_mode | LOCK_REC;
page = buf_frame_align(rec); page = buf_frame_align(rec);
...@@ -1752,8 +1813,8 @@ lock_rec_lock_slow( ...@@ -1752,8 +1813,8 @@ lock_rec_lock_slow(
nothing */ nothing */
err = DB_SUCCESS; err = DB_SUCCESS;
} else if (lock_rec_other_has_expl_req(confl_mode, 0, LOCK_WAIT, rec, } else if (lock_rec_other_has_expl_req(confl_mode, 0, LOCK_WAIT,
trx)) { FALSE, rec, trx)) {
/* If another transaction has a non-gap conflicting request in /* If another transaction has a non-gap conflicting request in
the queue, as this transaction does not have a lock strong the queue, as this transaction does not have a lock strong
enough already granted on the record, we have to wait. */ enough already granted on the record, we have to wait. */
...@@ -2056,7 +2117,8 @@ lock_rec_inherit_to_gap( ...@@ -2056,7 +2117,8 @@ lock_rec_inherit_to_gap(
lock = lock_rec_get_first(rec); lock = lock_rec_get_first(rec);
while (lock != NULL) { while (lock != NULL) {
lock_rec_add_to_queue((lock->type_mode | LOCK_GAP) & ~LOCK_WAIT, lock_rec_add_to_queue(((lock->type_mode | LOCK_GAP)
& ~LOCK_WAIT),
heir, lock->index, lock->trx); heir, lock->index, lock->trx);
lock = lock_rec_get_next(rec, lock); lock = lock_rec_get_next(rec, lock);
} }
...@@ -2312,7 +2374,7 @@ list start is moved to another page. */ ...@@ -2312,7 +2374,7 @@ list start is moved to another page. */
void void
lock_move_rec_list_start( lock_move_rec_list_start(
/*=====================*/ /*=====================*/
page_t* new_page __attribute__((unused)),/*in: index page to move to */ page_t* new_page, /* in: index page to move to */
page_t* page, /* in: index page */ page_t* page, /* in: index page */
rec_t* rec, /* in: record on page: this is the rec_t* rec, /* in: record on page: this is the
first record NOT copied */ first record NOT copied */
...@@ -2782,7 +2844,7 @@ lock_deadlock_recursive( ...@@ -2782,7 +2844,7 @@ lock_deadlock_recursive(
we return TRUE */ we return TRUE */
{ {
lock_t* lock; lock_t* lock;
ulint bit_no = 0; /* remove warning */ ulint bit_no;
trx_t* lock_trx; trx_t* lock_trx;
char* err_buf; char* err_buf;
...@@ -3486,6 +3548,10 @@ lock_rec_print( ...@@ -3486,6 +3548,10 @@ lock_rec_print(
buf += sprintf(buf, " gap type lock"); buf += sprintf(buf, " gap type lock");
} }
if (lock_rec_get_insert_intention(lock)) {
buf += sprintf(buf, " insert intention");
}
if (lock_get_wait(lock)) { if (lock_get_wait(lock)) {
buf += sprintf(buf, " waiting"); buf += sprintf(buf, " waiting");
} }
...@@ -3503,6 +3569,15 @@ lock_rec_print( ...@@ -3503,6 +3569,15 @@ lock_rec_print(
IB__FILE__, __LINE__, &mtr); IB__FILE__, __LINE__, &mtr);
if (page) { if (page) {
page = buf_page_get_nowait(space, page_no, RW_S_LATCH, &mtr); page = buf_page_get_nowait(space, page_no, RW_S_LATCH, &mtr);
if (!page) {
/* Let us try to get an X-latch. If the current thread
is holding an X-latch on the page, we cannot get an
S-latch. */
page = buf_page_get_nowait(space, page_no, RW_X_LATCH,
&mtr);
}
} }
if (page) { if (page) {
...@@ -3869,7 +3944,7 @@ lock_rec_queue_validate( ...@@ -3869,7 +3944,7 @@ lock_rec_queue_validate(
impl_trx = lock_clust_rec_some_has_impl(rec, index); impl_trx = lock_clust_rec_some_has_impl(rec, index);
if (impl_trx && lock_rec_other_has_expl_req(LOCK_S, 0, if (impl_trx && lock_rec_other_has_expl_req(LOCK_S, 0,
LOCK_WAIT, rec, impl_trx)) { LOCK_WAIT, FALSE, rec, impl_trx)) {
ut_a(lock_rec_has_expl(LOCK_X, rec, impl_trx)); ut_a(lock_rec_has_expl(LOCK_X, rec, impl_trx));
} }
...@@ -3884,7 +3959,7 @@ lock_rec_queue_validate( ...@@ -3884,7 +3959,7 @@ lock_rec_queue_validate(
impl_trx = lock_sec_rec_some_has_impl_off_kernel(rec, index); impl_trx = lock_sec_rec_some_has_impl_off_kernel(rec, index);
if (impl_trx && lock_rec_other_has_expl_req(LOCK_S, 0, if (impl_trx && lock_rec_other_has_expl_req(LOCK_S, 0,
LOCK_WAIT, rec, impl_trx)) { LOCK_WAIT, FALSE, rec, impl_trx)) {
ut_a(lock_rec_has_expl(LOCK_X, rec, impl_trx)); ut_a(lock_rec_has_expl(LOCK_X, rec, impl_trx));
} }
...@@ -3909,12 +3984,10 @@ lock_rec_queue_validate( ...@@ -3909,12 +3984,10 @@ lock_rec_queue_validate(
if (lock_get_mode(lock) == LOCK_S) { if (lock_get_mode(lock) == LOCK_S) {
ut_a(!lock_rec_other_has_expl_req(LOCK_X, ut_a(!lock_rec_other_has_expl_req(LOCK_X,
0, 0, rec, 0, 0, FALSE, rec, lock->trx));
lock->trx));
} else { } else {
ut_a(!lock_rec_other_has_expl_req(LOCK_S, ut_a(!lock_rec_other_has_expl_req(LOCK_S,
0, 0, rec, 0, 0, FALSE, rec, lock->trx));
lock->trx));
} }
} else if (lock_get_wait(lock) && !lock_rec_get_gap(lock)) { } else if (lock_get_wait(lock) && !lock_rec_get_gap(lock)) {
...@@ -4153,11 +4226,19 @@ lock_rec_insert_check_and_lock( ...@@ -4153,11 +4226,19 @@ lock_rec_insert_check_and_lock(
*inherit = TRUE; *inherit = TRUE;
/* If another transaction has an explicit lock request, gap or not, /* If another transaction has an explicit lock request, gap or not,
waiting or granted, on the successor, the insert has to wait */ waiting or granted, on the successor, the insert has to wait.
if (lock_rec_other_has_expl_req(LOCK_S, LOCK_GAP, LOCK_WAIT, next_rec, An exception is the case where the lock by the another transaction
trx)) { is a gap type lock which it placed to wait for its turn to insert. We
err = lock_rec_enqueue_waiting(LOCK_X | LOCK_GAP, next_rec, do not consider that kind of a lock conflicting with our insert. This
eliminates an unnecessary deadlock which resulted when 2 transactions
had to wait for their insert. Both had waiting gap type lock requests
on the successor, which produced an unnecessary deadlock. */
if (lock_rec_other_has_expl_req(LOCK_S, LOCK_GAP, LOCK_WAIT,
TRUE, next_rec, trx)) {
err = lock_rec_enqueue_waiting(LOCK_X | LOCK_GAP
| LOCK_INSERT_INTENTION, next_rec,
index, thr); index, thr);
} else { } else {
err = DB_SUCCESS; err = DB_SUCCESS;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment