Commit 4a8aa70e authored by Marko Mäkelä's avatar Marko Mäkelä

Bug#53674: InnoDB: Error: unlock row could not find a 4 mode lock on the record

In semi-consistent read, only unlock freshly locked non-matching records.

Define DB_SUCCESS_LOCKED_REC for indicating a successful operation
where a record lock was created.

lock_rec_lock_fast(): Return LOCK_REC_SUCCESS,
LOCK_REC_SUCCESS_CREATED, or LOCK_REC_FAIL instead of TRUE/FALSE.

lock_sec_rec_read_check_and_lock(),
lock_clust_rec_read_check_and_lock(), lock_rec_enqueue_waiting(),
lock_rec_lock_slow(), lock_rec_lock(), row_ins_set_shared_rec_lock(),
row_ins_set_exclusive_rec_lock(), sel_set_rec_lock(),
row_sel_get_clust_rec_for_mysql(): Return DB_SUCCESS_LOCKED_REC if a
new record lock was created. Adjust callers.

row_unlock_for_mysql(): Correct the function documentation.

row_prebuilt_t::new_rec_locks: Correct the documentation.
parent 9938680e
create table bug53674(a int)engine=innodb;
insert into bug53674 values (1),(2);
start transaction;
select * from bug53674 for update;
a
1
2
select * from bug53674 where a=(select a from bug53674 where a > 1);
a
2
drop table bug53674;
--log-bin --innodb-locks-unsafe-for-binlog --binlog-format=mixed
-- source include/have_innodb.inc
create table bug53674(a int)engine=innodb;
insert into bug53674 values (1),(2);
start transaction;
select * from bug53674 for update;
select * from bug53674 where a=(select a from bug53674 where a > 1);
drop table bug53674;
...@@ -10,6 +10,8 @@ Created 5/24/1996 Heikki Tuuri ...@@ -10,6 +10,8 @@ Created 5/24/1996 Heikki Tuuri
#define db0err_h #define db0err_h
#define DB_SUCCESS_LOCKED_REC 9 /* like DB_SUCCESS, but a new
explicit record lock was created */
#define DB_SUCCESS 10 #define DB_SUCCESS 10
/* The following are error codes */ /* The following are error codes */
......
...@@ -292,14 +292,15 @@ lock_sec_rec_modify_check_and_lock( ...@@ -292,14 +292,15 @@ lock_sec_rec_modify_check_and_lock(
dict_index_t* index, /* in: secondary index */ dict_index_t* index, /* in: secondary index */
que_thr_t* thr); /* in: query thread */ que_thr_t* thr); /* in: query thread */
/************************************************************************* /*************************************************************************
Like the counterpart for a clustered index below, but now we read a Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record. */ secondary index record. */
ulint ulint
lock_sec_rec_read_check_and_lock( lock_sec_rec_read_check_and_lock(
/*=============================*/ /*=============================*/
/* out: DB_SUCCESS, DB_LOCK_WAIT, /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */ DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set, ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set,
does nothing */ does nothing */
rec_t* rec, /* in: user record or page supremum record rec_t* rec, /* in: user record or page supremum record
...@@ -324,8 +325,9 @@ lock on the record. */ ...@@ -324,8 +325,9 @@ lock on the record. */
ulint ulint
lock_clust_rec_read_check_and_lock( lock_clust_rec_read_check_and_lock(
/*===============================*/ /*===============================*/
/* out: DB_SUCCESS, DB_LOCK_WAIT, /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */ DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set, ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set,
does nothing */ does nothing */
rec_t* rec, /* in: user record or page supremum record rec_t* rec, /* in: user record or page supremum record
......
...@@ -246,22 +246,20 @@ row_update_for_mysql( ...@@ -246,22 +246,20 @@ row_update_for_mysql(
row_prebuilt_t* prebuilt); /* in: prebuilt struct in MySQL row_prebuilt_t* prebuilt); /* in: prebuilt struct in MySQL
handle */ handle */
/************************************************************************* /*************************************************************************
This can only be used when srv_locks_unsafe_for_binlog is TRUE or This can only be used when srv_locks_unsafe_for_binlog is TRUE or this
session is using a READ COMMITTED isolation level. Before session is using a READ COMMITTED or READ UNCOMMITTED isolation level.
calling this function we must use trx_reset_new_rec_lock_info() and Before calling this function row_search_for_mysql() must have
trx_register_new_rec_lock() to store the information which new record locks initialized prebuilt->new_rec_locks to store the information which new
really were set. This function removes a newly set lock under prebuilt->pcur, record locks really were set. This function removes a newly set
and also under prebuilt->clust_pcur. Currently, this is only used and tested clustered index record lock under prebuilt->pcur or
in the case of an UPDATE or a DELETE statement, where the row lock is of the prebuilt->clust_pcur. Thus, this implements a 'mini-rollback' that
LOCK_X type. releases the latest clustered index record lock we set. */
Thus, this implements a 'mini-rollback' that releases the latest record
locks we set. */
int int
row_unlock_for_mysql( row_unlock_for_mysql(
/*=================*/ /*=================*/
/* out: error code or DB_SUCCESS */ /* out: error code or DB_SUCCESS */
row_prebuilt_t* prebuilt, /* in: prebuilt struct in MySQL row_prebuilt_t* prebuilt, /* in/out: prebuilt struct in MySQL
handle */ handle */
ibool has_latches_on_recs);/* TRUE if called so that we have ibool has_latches_on_recs);/* TRUE if called so that we have
the latches on the records under pcur the latches on the records under pcur
...@@ -660,18 +658,17 @@ struct row_prebuilt_struct { ...@@ -660,18 +658,17 @@ struct row_prebuilt_struct {
ulint new_rec_locks; /* normally 0; if ulint new_rec_locks; /* normally 0; if
srv_locks_unsafe_for_binlog is srv_locks_unsafe_for_binlog is
TRUE or session is using READ TRUE or session is using READ
COMMITTED isolation level, in a COMMITTED or READ UNCOMMITTED
cursor search, if we set a new isolation level, set in
record lock on an index, this is row_search_for_mysql() if we set a new
incremented; this is used in record lock on the secondary
releasing the locks under the or clustered index; this is
cursors if we are performing an used in row_unlock_for_mysql()
UPDATE and we determine after when releasing the lock under
retrieving the row that it does the cursor if we determine
not need to be locked; thus, after retrieving the row that
these can be used to implement a it does not need to be locked
'mini-rollback' that releases ('mini-rollback') */
the latest record locks */
ulint mysql_prefix_len;/* byte offset of the end of ulint mysql_prefix_len;/* byte offset of the end of
the last requested column */ the last requested column */
ulint mysql_row_len; /* length in bytes of a row in the ulint mysql_row_len; /* length in bytes of a row in the
......
...@@ -1739,11 +1739,12 @@ ulint ...@@ -1739,11 +1739,12 @@ ulint
lock_rec_enqueue_waiting( lock_rec_enqueue_waiting(
/*=====================*/ /*=====================*/
/* out: DB_LOCK_WAIT, DB_DEADLOCK, or /* out: DB_LOCK_WAIT, DB_DEADLOCK, or
DB_QUE_THR_SUSPENDED, or DB_SUCCESS; DB_QUE_THR_SUSPENDED, or DB_SUCCESS_LOCKED_REC;
DB_SUCCESS means that there was a deadlock, DB_SUCCESS_LOCKED_REC means that there
but another transaction was chosen as a was a deadlock, but another
victim, and we got the lock immediately: transaction was chosen as a victim,
no need to wait then */ and we got the lock immediately: no
need to wait then */
ulint type_mode,/* in: lock mode this transaction is ulint type_mode,/* in: lock mode this transaction is
requesting: LOCK_S or LOCK_X, possibly ORed requesting: LOCK_S or LOCK_X, possibly ORed
with LOCK_GAP or LOCK_REC_NOT_GAP, ORed with LOCK_GAP or LOCK_REC_NOT_GAP, ORed
...@@ -1804,7 +1805,7 @@ lock_rec_enqueue_waiting( ...@@ -1804,7 +1805,7 @@ lock_rec_enqueue_waiting(
if (trx->wait_lock == NULL) { if (trx->wait_lock == NULL) {
return(DB_SUCCESS); return(DB_SUCCESS_LOCKED_REC);
} }
trx->que_state = TRX_QUE_LOCK_WAIT; trx->que_state = TRX_QUE_LOCK_WAIT;
...@@ -1903,6 +1904,16 @@ lock_rec_add_to_queue( ...@@ -1903,6 +1904,16 @@ lock_rec_add_to_queue(
return(lock_rec_create(type_mode, rec, index, trx)); return(lock_rec_create(type_mode, rec, index, trx));
} }
/** Record locking request status */
enum lock_rec_req_status {
/** Failed to acquire a lock */
LOCK_REC_FAIL,
/** Succeeded in acquiring a lock (implicit or already acquired) */
LOCK_REC_SUCCESS,
/** Explicitly created a new lock */
LOCK_REC_SUCCESS_CREATED
};
/************************************************************************* /*************************************************************************
This is a fast routine for locking a record in the most common cases: This is a fast routine for locking a record in the most common cases:
there are no explicit locks on the page, or there is just one lock, owned there are no explicit locks on the page, or there is just one lock, owned
...@@ -1911,10 +1922,10 @@ which does NOT look at implicit locks! Checks lock compatibility within ...@@ -1911,10 +1922,10 @@ which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case of explicit locks. This function sets a normal next-key lock, or in the case of
a page supremum record, a gap type lock. */ a page supremum record, a gap type lock. */
UNIV_INLINE UNIV_INLINE
ibool enum lock_rec_req_status
lock_rec_lock_fast( lock_rec_lock_fast(
/*===============*/ /*===============*/
/* out: TRUE if locking succeeded */ /* out: whether the locking succeeded */
ibool impl, /* in: if TRUE, no lock is set if no wait ibool impl, /* in: if TRUE, no lock is set if no wait
is necessary: we assume that the caller will is necessary: we assume that the caller will
set an implicit lock */ set an implicit lock */
...@@ -1950,19 +1961,19 @@ lock_rec_lock_fast( ...@@ -1950,19 +1961,19 @@ lock_rec_lock_fast(
lock_rec_create(mode, rec, index, trx); lock_rec_create(mode, rec, index, trx);
} }
return(TRUE); return(LOCK_REC_SUCCESS_CREATED);
} }
if (lock_rec_get_next_on_page(lock)) { if (lock_rec_get_next_on_page(lock)) {
return(FALSE); return(LOCK_REC_FAIL);
} }
if (lock->trx != trx if (lock->trx != trx
|| lock->type_mode != (mode | LOCK_REC) || lock->type_mode != (mode | LOCK_REC)
|| lock_rec_get_n_bits(lock) <= heap_no) { || lock_rec_get_n_bits(lock) <= heap_no) {
return(FALSE); return(LOCK_REC_FAIL);
} }
if (!impl) { if (!impl) {
...@@ -1971,10 +1982,11 @@ lock_rec_lock_fast( ...@@ -1971,10 +1982,11 @@ lock_rec_lock_fast(
if (!lock_rec_get_nth_bit(lock, heap_no)) { if (!lock_rec_get_nth_bit(lock, heap_no)) {
lock_rec_set_nth_bit(lock, heap_no); lock_rec_set_nth_bit(lock, heap_no);
return(LOCK_REC_SUCCESS_CREATED);
} }
} }
return(TRUE); return(LOCK_REC_SUCCESS);
} }
/************************************************************************* /*************************************************************************
...@@ -1986,8 +1998,9 @@ static ...@@ -1986,8 +1998,9 @@ static
ulint ulint
lock_rec_lock_slow( lock_rec_lock_slow(
/*===============*/ /*===============*/
/* out: DB_SUCCESS, DB_LOCK_WAIT, or error /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
code */ DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
ibool impl, /* in: if TRUE, no lock is set if no wait is ibool impl, /* in: if TRUE, no lock is set if no wait is
necessary: we assume that the caller will set necessary: we assume that the caller will set
an implicit lock */ an implicit lock */
...@@ -1998,7 +2011,6 @@ lock_rec_lock_slow( ...@@ -1998,7 +2011,6 @@ lock_rec_lock_slow(
que_thr_t* thr) /* in: query thread */ que_thr_t* thr) /* in: query thread */
{ {
trx_t* trx; trx_t* trx;
ulint err;
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
...@@ -2017,26 +2029,21 @@ lock_rec_lock_slow( ...@@ -2017,26 +2029,21 @@ lock_rec_lock_slow(
/* The trx already has a strong enough lock on rec: do /* The trx already has a strong enough lock on rec: do
nothing */ nothing */
err = DB_SUCCESS;
} else if (lock_rec_other_has_conflicting(mode, rec, trx)) { } else if (lock_rec_other_has_conflicting(mode, rec, trx)) {
/* If another transaction has a non-gap conflicting request in /* If another transaction has a non-gap conflicting request in
the queue, as this transaction does not have a lock strong the queue, as this transaction does not have a lock strong
enough already granted on the record, we have to wait. */ enough already granted on the record, we have to wait. */
err = lock_rec_enqueue_waiting(mode, rec, index, thr); return(lock_rec_enqueue_waiting(mode, rec, index, thr));
} else { } else if (!impl) {
if (!impl) { /* Set the requested lock on the record */
/* Set the requested lock on the record */
lock_rec_add_to_queue(LOCK_REC | mode, rec, index, lock_rec_add_to_queue(LOCK_REC | mode, rec, index, trx);
trx); return(DB_SUCCESS_LOCKED_REC);
}
err = DB_SUCCESS;
} }
return(err); return(DB_SUCCESS);
} }
/************************************************************************* /*************************************************************************
...@@ -2049,8 +2056,9 @@ static ...@@ -2049,8 +2056,9 @@ static
ulint ulint
lock_rec_lock( lock_rec_lock(
/*==========*/ /*==========*/
/* out: DB_SUCCESS, DB_LOCK_WAIT, or error /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
code */ DB_LOCK_WAIT, DB_DEADLOCK, or
DB_QUE_THR_SUSPENDED */
ibool impl, /* in: if TRUE, no lock is set if no wait is ibool impl, /* in: if TRUE, no lock is set if no wait is
necessary: we assume that the caller will set necessary: we assume that the caller will set
an implicit lock */ an implicit lock */
...@@ -2060,8 +2068,6 @@ lock_rec_lock( ...@@ -2060,8 +2068,6 @@ lock_rec_lock(
dict_index_t* index, /* in: index of record */ dict_index_t* index, /* in: index of record */
que_thr_t* thr) /* in: query thread */ que_thr_t* thr) /* in: query thread */
{ {
ulint err;
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
|| lock_table_has(thr_get_trx(thr), index->table, LOCK_IS)); || lock_table_has(thr_get_trx(thr), index->table, LOCK_IS));
...@@ -2073,17 +2079,19 @@ lock_rec_lock( ...@@ -2073,17 +2079,19 @@ lock_rec_lock(
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP || mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP
|| mode - (LOCK_MODE_MASK & mode) == 0); || mode - (LOCK_MODE_MASK & mode) == 0);
if (lock_rec_lock_fast(impl, mode, rec, index, thr)) { /* We try a simplified and faster subroutine for the most
common cases */
/* We try a simplified and faster subroutine for the most switch (lock_rec_lock_fast(impl, mode, rec, index, thr)) {
common cases */ case LOCK_REC_SUCCESS:
return(DB_SUCCESS);
err = DB_SUCCESS; case LOCK_REC_SUCCESS_CREATED:
} else { return(DB_SUCCESS_LOCKED_REC);
err = lock_rec_lock_slow(impl, mode, rec, index, thr); case LOCK_REC_FAIL:
return(lock_rec_lock_slow(impl, mode, rec, index, thr));
} }
return(err); ut_error;
return(DB_ERROR);
} }
/************************************************************************* /*************************************************************************
...@@ -4832,7 +4840,7 @@ lock_rec_insert_check_and_lock( ...@@ -4832,7 +4840,7 @@ lock_rec_insert_check_and_lock(
lock = lock_rec_get_first(next_rec); lock = lock_rec_get_first(next_rec);
if (lock == NULL) { if (UNIV_LIKELY(lock == NULL)) {
/* We optimize CPU time usage in the simplest case */ /* We optimize CPU time usage in the simplest case */
lock_mutex_exit_kernel(); lock_mutex_exit_kernel();
...@@ -4840,8 +4848,7 @@ lock_rec_insert_check_and_lock( ...@@ -4840,8 +4848,7 @@ lock_rec_insert_check_and_lock(
if (!(index->type & DICT_CLUSTERED)) { if (!(index->type & DICT_CLUSTERED)) {
/* Update the page max trx id field */ /* Update the page max trx id field */
page_update_max_trx_id(buf_frame_align(rec), page_update_max_trx_id(buf_frame_align(rec), trx->id);
thr_get_trx(thr)->id);
} }
return(DB_SUCCESS); return(DB_SUCCESS);
...@@ -4873,11 +4880,16 @@ lock_rec_insert_check_and_lock( ...@@ -4873,11 +4880,16 @@ lock_rec_insert_check_and_lock(
lock_mutex_exit_kernel(); lock_mutex_exit_kernel();
if (!(index->type & DICT_CLUSTERED) && (err == DB_SUCCESS)) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
/* fall through */
case DB_SUCCESS:
if (index->type & DICT_CLUSTERED) {
break;
}
/* Update the page max trx id field */ /* Update the page max trx id field */
page_update_max_trx_id(buf_frame_align(rec), page_update_max_trx_id(buf_frame_align(rec), trx->id);
thr_get_trx(thr)->id);
} }
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
...@@ -4984,6 +4996,10 @@ lock_clust_rec_modify_check_and_lock( ...@@ -4984,6 +4996,10 @@ lock_clust_rec_modify_check_and_lock(
ut_ad(lock_rec_queue_validate(rec, index, offsets)); ut_ad(lock_rec_queue_validate(rec, index, offsets));
if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
err = DB_SUCCESS;
}
return(err); return(err);
} }
...@@ -5043,25 +5059,29 @@ lock_sec_rec_modify_check_and_lock( ...@@ -5043,25 +5059,29 @@ lock_sec_rec_modify_check_and_lock(
} }
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
if (err == DB_SUCCESS) { if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
/* Update the page max trx id field */ /* Update the page max trx id field */
/* It might not be necessary to do this if
err == DB_SUCCESS (no new lock created),
but it should not cost too much performance. */
page_update_max_trx_id(buf_frame_align(rec), page_update_max_trx_id(buf_frame_align(rec),
thr_get_trx(thr)->id); thr_get_trx(thr)->id);
err = DB_SUCCESS;
} }
return(err); return(err);
} }
/************************************************************************* /*************************************************************************
Like the counterpart for a clustered index below, but now we read a Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record. */ secondary index record. */
ulint ulint
lock_sec_rec_read_check_and_lock( lock_sec_rec_read_check_and_lock(
/*=============================*/ /*=============================*/
/* out: DB_SUCCESS, DB_LOCK_WAIT, /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */ DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set, ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set,
does nothing */ does nothing */
rec_t* rec, /* in: user record or page supremum record rec_t* rec, /* in: user record or page supremum record
...@@ -5126,8 +5146,9 @@ lock on the record. */ ...@@ -5126,8 +5146,9 @@ lock on the record. */
ulint ulint
lock_clust_rec_read_check_and_lock( lock_clust_rec_read_check_and_lock(
/*===============================*/ /*===============================*/
/* out: DB_SUCCESS, DB_LOCK_WAIT, /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */ DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set, ulint flags, /* in: if BTR_NO_LOCKING_FLAG bit is set,
does nothing */ does nothing */
rec_t* rec, /* in: user record or page supremum record rec_t* rec, /* in: user record or page supremum record
...@@ -5206,16 +5227,21 @@ lock_clust_rec_read_check_and_lock_alt( ...@@ -5206,16 +5227,21 @@ lock_clust_rec_read_check_and_lock_alt(
mem_heap_t* tmp_heap = NULL; mem_heap_t* tmp_heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE]; ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_; ulint* offsets = offsets_;
ulint ret; ulint err;
*offsets_ = (sizeof offsets_) / sizeof *offsets_; *offsets_ = (sizeof offsets_) / sizeof *offsets_;
offsets = rec_get_offsets(rec, index, offsets, offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &tmp_heap); ULINT_UNDEFINED, &tmp_heap);
ret = lock_clust_rec_read_check_and_lock(flags, rec, index, err = lock_clust_rec_read_check_and_lock(flags, rec, index,
offsets, mode, gap_mode, thr); offsets, mode, gap_mode, thr);
if (tmp_heap) { if (tmp_heap) {
mem_heap_free(tmp_heap); mem_heap_free(tmp_heap);
} }
return(ret);
if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
err = DB_SUCCESS;
}
return(err);
} }
...@@ -1114,7 +1114,8 @@ static ...@@ -1114,7 +1114,8 @@ static
ulint ulint
row_ins_set_shared_rec_lock( row_ins_set_shared_rec_lock(
/*========================*/ /*========================*/
/* out: DB_SUCCESS or error code */ /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
or error code */
ulint type, /* in: LOCK_ORDINARY, LOCK_GAP, or ulint type, /* in: LOCK_ORDINARY, LOCK_GAP, or
LOCK_REC_NOT_GAP type lock */ LOCK_REC_NOT_GAP type lock */
rec_t* rec, /* in: record */ rec_t* rec, /* in: record */
...@@ -1145,7 +1146,8 @@ static ...@@ -1145,7 +1146,8 @@ static
ulint ulint
row_ins_set_exclusive_rec_lock( row_ins_set_exclusive_rec_lock(
/*===========================*/ /*===========================*/
/* out: DB_SUCCESS or error code */ /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
or error code */
ulint type, /* in: LOCK_ORDINARY, LOCK_GAP, or ulint type, /* in: LOCK_ORDINARY, LOCK_GAP, or
LOCK_REC_NOT_GAP type lock */ LOCK_REC_NOT_GAP type lock */
rec_t* rec, /* in: record */ rec_t* rec, /* in: record */
...@@ -1195,9 +1197,7 @@ row_ins_check_foreign_constraint( ...@@ -1195,9 +1197,7 @@ row_ins_check_foreign_constraint(
dict_table_t* check_table; dict_table_t* check_table;
dict_index_t* check_index; dict_index_t* check_index;
ulint n_fields_cmp; ulint n_fields_cmp;
rec_t* rec;
btr_pcur_t pcur; btr_pcur_t pcur;
ibool moved;
int cmp; int cmp;
ulint err; ulint err;
ulint i; ulint i;
...@@ -1328,12 +1328,12 @@ row_ins_check_foreign_constraint( ...@@ -1328,12 +1328,12 @@ row_ins_check_foreign_constraint(
/* Scan index records and check if there is a matching record */ /* Scan index records and check if there is a matching record */
for (;;) { do {
rec = btr_pcur_get_rec(&pcur); rec_t* rec = btr_pcur_get_rec(&pcur);
if (page_rec_is_infimum(rec)) { if (page_rec_is_infimum(rec)) {
goto next_rec; continue;
} }
offsets = rec_get_offsets(rec, check_index, offsets = rec_get_offsets(rec, check_index,
...@@ -1343,12 +1343,13 @@ row_ins_check_foreign_constraint( ...@@ -1343,12 +1343,13 @@ row_ins_check_foreign_constraint(
err = row_ins_set_shared_rec_lock( err = row_ins_set_shared_rec_lock(
LOCK_ORDINARY, rec, check_index, offsets, thr); LOCK_ORDINARY, rec, check_index, offsets, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
break; case DB_SUCCESS:
continue;
default:
goto end_scan;
} }
goto next_rec;
} }
cmp = cmp_dtuple_rec(entry, rec, offsets); cmp = cmp_dtuple_rec(entry, rec, offsets);
...@@ -1359,9 +1360,12 @@ row_ins_check_foreign_constraint( ...@@ -1359,9 +1360,12 @@ row_ins_check_foreign_constraint(
err = row_ins_set_shared_rec_lock( err = row_ins_set_shared_rec_lock(
LOCK_ORDINARY, rec, check_index, LOCK_ORDINARY, rec, check_index,
offsets, thr); offsets, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break; break;
default:
goto end_scan;
} }
} else { } else {
/* Found a matching record. Lock only /* Found a matching record. Lock only
...@@ -1372,15 +1376,18 @@ row_ins_check_foreign_constraint( ...@@ -1372,15 +1376,18 @@ row_ins_check_foreign_constraint(
LOCK_REC_NOT_GAP, rec, check_index, LOCK_REC_NOT_GAP, rec, check_index,
offsets, thr); offsets, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break; break;
default:
goto end_scan;
} }
if (check_ref) { if (check_ref) {
err = DB_SUCCESS; err = DB_SUCCESS;
break; goto end_scan;
} else if (foreign->type != 0) { } else if (foreign->type != 0) {
/* There is an ON UPDATE or ON DELETE /* There is an ON UPDATE or ON DELETE
condition: check them in a separate condition: check them in a separate
...@@ -1406,7 +1413,7 @@ row_ins_check_foreign_constraint( ...@@ -1406,7 +1413,7 @@ row_ins_check_foreign_constraint(
err = DB_FOREIGN_DUPLICATE_KEY; err = DB_FOREIGN_DUPLICATE_KEY;
} }
break; goto end_scan;
} }
} else { } else {
row_ins_foreign_report_err( row_ins_foreign_report_err(
...@@ -1414,48 +1421,39 @@ row_ins_check_foreign_constraint( ...@@ -1414,48 +1421,39 @@ row_ins_check_foreign_constraint(
thr, foreign, rec, entry); thr, foreign, rec, entry);
err = DB_ROW_IS_REFERENCED; err = DB_ROW_IS_REFERENCED;
break; goto end_scan;
} }
} }
} } else {
ut_a(cmp < 0);
if (cmp < 0) {
err = row_ins_set_shared_rec_lock( err = row_ins_set_shared_rec_lock(
LOCK_GAP, rec, check_index, offsets, thr); LOCK_GAP, rec, check_index, offsets, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
break; case DB_SUCCESS:
} if (check_ref) {
err = DB_NO_REFERENCED_ROW;
if (check_ref) { row_ins_foreign_report_add_err(
err = DB_NO_REFERENCED_ROW; trx, foreign, rec, entry);
row_ins_foreign_report_add_err( } else {
trx, foreign, rec, entry); err = DB_SUCCESS;
} else { }
err = DB_SUCCESS;
} }
break; goto end_scan;
} }
} while (btr_pcur_move_to_next(&pcur, &mtr));
ut_a(cmp == 0); if (check_ref) {
next_rec: row_ins_foreign_report_add_err(
moved = btr_pcur_move_to_next(&pcur, &mtr); trx, foreign, btr_pcur_get_rec(&pcur), entry);
err = DB_NO_REFERENCED_ROW;
if (!moved) { } else {
if (check_ref) { err = DB_SUCCESS;
rec = btr_pcur_get_rec(&pcur);
row_ins_foreign_report_add_err(
trx, foreign, rec, entry);
err = DB_NO_REFERENCED_ROW;
} else {
err = DB_SUCCESS;
}
break;
}
} }
end_scan:
btr_pcur_close(&pcur); btr_pcur_close(&pcur);
mtr_commit(&mtr); mtr_commit(&mtr);
...@@ -1641,10 +1639,8 @@ row_ins_scan_sec_index_for_duplicate( ...@@ -1641,10 +1639,8 @@ row_ins_scan_sec_index_for_duplicate(
ulint i; ulint i;
int cmp; int cmp;
ulint n_fields_cmp; ulint n_fields_cmp;
rec_t* rec;
btr_pcur_t pcur; btr_pcur_t pcur;
ulint err = DB_SUCCESS; ulint err = DB_SUCCESS;
ibool moved;
unsigned allow_duplicates; unsigned allow_duplicates;
mtr_t mtr; mtr_t mtr;
mem_heap_t* heap = NULL; mem_heap_t* heap = NULL;
...@@ -1680,12 +1676,12 @@ row_ins_scan_sec_index_for_duplicate( ...@@ -1680,12 +1676,12 @@ row_ins_scan_sec_index_for_duplicate(
/* Scan index records and check if there is a duplicate */ /* Scan index records and check if there is a duplicate */
for (;;) { do {
rec = btr_pcur_get_rec(&pcur); rec_t* rec = btr_pcur_get_rec(&pcur);
if (page_rec_is_infimum(rec)) { if (page_rec_is_infimum(rec)) {
goto next_rec; continue;
} }
offsets = rec_get_offsets(rec, index, offsets, offsets = rec_get_offsets(rec, index, offsets,
...@@ -1706,14 +1702,18 @@ row_ins_scan_sec_index_for_duplicate( ...@@ -1706,14 +1702,18 @@ row_ins_scan_sec_index_for_duplicate(
LOCK_ORDINARY, rec, index, offsets, thr); LOCK_ORDINARY, rec, index, offsets, thr);
} }
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break; break;
default:
goto end_scan;
} }
if (page_rec_is_supremum(rec)) { if (page_rec_is_supremum(rec)) {
goto next_rec; continue;
} }
cmp = cmp_dtuple_rec(entry, rec, offsets); cmp = cmp_dtuple_rec(entry, rec, offsets);
...@@ -1725,23 +1725,15 @@ row_ins_scan_sec_index_for_duplicate( ...@@ -1725,23 +1725,15 @@ row_ins_scan_sec_index_for_duplicate(
thr_get_trx(thr)->error_info = index; thr_get_trx(thr)->error_info = index;
break; goto end_scan;
} }
} else {
ut_a(cmp < 0);
goto end_scan;
} }
} while (btr_pcur_move_to_next(&pcur, &mtr));
if (cmp < 0) { end_scan:
break;
}
ut_a(cmp == 0);
next_rec:
moved = btr_pcur_move_to_next(&pcur, &mtr);
if (!moved) {
break;
}
}
if (UNIV_LIKELY_NULL(heap)) { if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap); mem_heap_free(heap);
} }
...@@ -1837,7 +1829,11 @@ row_ins_duplicate_error_in_clust( ...@@ -1837,7 +1829,11 @@ row_ins_duplicate_error_in_clust(
cursor->index, offsets, thr); cursor->index, offsets, thr);
} }
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto func_exit; goto func_exit;
} }
...@@ -1875,7 +1871,11 @@ row_ins_duplicate_error_in_clust( ...@@ -1875,7 +1871,11 @@ row_ins_duplicate_error_in_clust(
cursor->index, offsets, thr); cursor->index, offsets, thr);
} }
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto func_exit; goto func_exit;
} }
......
...@@ -1455,22 +1455,20 @@ row_update_for_mysql( ...@@ -1455,22 +1455,20 @@ row_update_for_mysql(
} }
/************************************************************************* /*************************************************************************
This can only be used when srv_locks_unsafe_for_binlog is TRUE or This can only be used when srv_locks_unsafe_for_binlog is TRUE or this
this session is using a READ COMMITTED isolation level. Before session is using a READ COMMITTED or READ UNCOMMITTED isolation level.
calling this function we must use trx_reset_new_rec_lock_info() and Before calling this function row_search_for_mysql() must have
trx_register_new_rec_lock() to store the information which new record locks initialized prebuilt->new_rec_locks to store the information which new
really were set. This function removes a newly set lock under prebuilt->pcur, record locks really were set. This function removes a newly set
and also under prebuilt->clust_pcur. Currently, this is only used and tested clustered index record lock under prebuilt->pcur or
in the case of an UPDATE or a DELETE statement, where the row lock is of the prebuilt->clust_pcur. Thus, this implements a 'mini-rollback' that
LOCK_X type. releases the latest clustered index record lock we set. */
Thus, this implements a 'mini-rollback' that releases the latest record
locks we set. */
int int
row_unlock_for_mysql( row_unlock_for_mysql(
/*=================*/ /*=================*/
/* out: error code or DB_SUCCESS */ /* out: error code or DB_SUCCESS */
row_prebuilt_t* prebuilt, /* in: prebuilt struct in MySQL row_prebuilt_t* prebuilt, /* in/out: prebuilt struct in MySQL
handle */ handle */
ibool has_latches_on_recs)/* TRUE if called so that we have ibool has_latches_on_recs)/* TRUE if called so that we have
the latches on the records under pcur the latches on the records under pcur
......
...@@ -754,8 +754,14 @@ row_sel_get_clust_rec( ...@@ -754,8 +754,14 @@ row_sel_get_clust_rec(
0, clust_rec, index, offsets, 0, clust_rec, index, offsets,
node->row_lock_mode, lock_type, thr); node->row_lock_mode, lock_type, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS:
case DB_SUCCESS_LOCKED_REC:
/* Declare the variable uninitialized in Valgrind.
It should be set to DB_SUCCESS at func_exit. */
UNIV_MEM_INVALID(&err, sizeof err);
break;
default:
goto err_exit; goto err_exit;
} }
} else { } else {
...@@ -826,7 +832,8 @@ UNIV_INLINE ...@@ -826,7 +832,8 @@ UNIV_INLINE
ulint ulint
sel_set_rec_lock( sel_set_rec_lock(
/*=============*/ /*=============*/
/* out: DB_SUCCESS or error code */ /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
or error code */
rec_t* rec, /* in: record */ rec_t* rec, /* in: record */
dict_index_t* index, /* in: index */ dict_index_t* index, /* in: index */
const ulint* offsets,/* in: rec_get_offsets(rec, index) */ const ulint* offsets,/* in: rec_get_offsets(rec, index) */
...@@ -1374,11 +1381,15 @@ row_sel( ...@@ -1374,11 +1381,15 @@ row_sel(
node->row_lock_mode, node->row_lock_mode,
lock_type, thr); lock_type, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
/* Note that in this case we will store in pcur /* Note that in this case we will store in pcur
the PREDECESSOR of the record we are waiting the PREDECESSOR of the record we are waiting
the lock for */ the lock for */
goto lock_wait_or_error; goto lock_wait_or_error;
} }
} }
...@@ -1429,8 +1440,12 @@ row_sel( ...@@ -1429,8 +1440,12 @@ row_sel(
err = sel_set_rec_lock(rec, index, offsets, err = sel_set_rec_lock(rec, index, offsets,
node->row_lock_mode, lock_type, thr); node->row_lock_mode, lock_type, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error; goto lock_wait_or_error;
} }
} }
...@@ -2745,7 +2760,8 @@ static ...@@ -2745,7 +2760,8 @@ static
ulint ulint
row_sel_get_clust_rec_for_mysql( row_sel_get_clust_rec_for_mysql(
/*============================*/ /*============================*/
/* out: DB_SUCCESS or error code */ /* out: DB_SUCCESS, DB_SUCCESS_LOCKED_REC,
or error code */
row_prebuilt_t* prebuilt,/* in: prebuilt struct in the handle */ row_prebuilt_t* prebuilt,/* in: prebuilt struct in the handle */
dict_index_t* sec_index,/* in: secondary index where rec resides */ dict_index_t* sec_index,/* in: secondary index where rec resides */
rec_t* rec, /* in: record in a non-clustered index; if rec_t* rec, /* in: record in a non-clustered index; if
...@@ -2826,6 +2842,7 @@ row_sel_get_clust_rec_for_mysql( ...@@ -2826,6 +2842,7 @@ row_sel_get_clust_rec_for_mysql(
clust_rec = NULL; clust_rec = NULL;
err = DB_SUCCESS;
goto func_exit; goto func_exit;
} }
...@@ -2840,8 +2857,11 @@ row_sel_get_clust_rec_for_mysql( ...@@ -2840,8 +2857,11 @@ row_sel_get_clust_rec_for_mysql(
err = lock_clust_rec_read_check_and_lock( err = lock_clust_rec_read_check_and_lock(
0, clust_rec, clust_index, *offsets, 0, clust_rec, clust_index, *offsets,
prebuilt->select_lock_type, LOCK_REC_NOT_GAP, thr); prebuilt->select_lock_type, LOCK_REC_NOT_GAP, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS:
case DB_SUCCESS_LOCKED_REC:
break;
default:
goto err_exit; goto err_exit;
} }
} else { } else {
...@@ -2900,6 +2920,8 @@ row_sel_get_clust_rec_for_mysql( ...@@ -2900,6 +2920,8 @@ row_sel_get_clust_rec_for_mysql(
rec, sec_index, clust_rec, clust_index)); rec, sec_index, clust_rec, clust_index));
#endif #endif
} }
err = DB_SUCCESS;
} }
func_exit: func_exit:
...@@ -2912,7 +2934,6 @@ row_sel_get_clust_rec_for_mysql( ...@@ -2912,7 +2934,6 @@ row_sel_get_clust_rec_for_mysql(
btr_pcur_store_position(prebuilt->clust_pcur, mtr); btr_pcur_store_position(prebuilt->clust_pcur, mtr);
} }
err = DB_SUCCESS;
err_exit: err_exit:
return(err); return(err);
} }
...@@ -3626,8 +3647,12 @@ row_search_for_mysql( ...@@ -3626,8 +3647,12 @@ row_search_for_mysql(
prebuilt->select_lock_type, prebuilt->select_lock_type,
LOCK_GAP, thr); LOCK_GAP, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error; goto lock_wait_or_error;
} }
} }
...@@ -3724,8 +3749,12 @@ row_search_for_mysql( ...@@ -3724,8 +3749,12 @@ row_search_for_mysql(
prebuilt->select_lock_type, prebuilt->select_lock_type,
LOCK_ORDINARY, thr); LOCK_ORDINARY, thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error; goto lock_wait_or_error;
} }
} }
...@@ -3856,8 +3885,11 @@ row_search_for_mysql( ...@@ -3856,8 +3885,11 @@ row_search_for_mysql(
prebuilt->select_lock_type, LOCK_GAP, prebuilt->select_lock_type, LOCK_GAP,
thr); thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error; goto lock_wait_or_error;
} }
} }
...@@ -3891,8 +3923,11 @@ row_search_for_mysql( ...@@ -3891,8 +3923,11 @@ row_search_for_mysql(
prebuilt->select_lock_type, LOCK_GAP, prebuilt->select_lock_type, LOCK_GAP,
thr); thr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error; goto lock_wait_or_error;
} }
} }
...@@ -3961,15 +3996,21 @@ row_search_for_mysql( ...@@ -3961,15 +3996,21 @@ row_search_for_mysql(
switch (err) { switch (err) {
rec_t* old_vers; rec_t* old_vers;
case DB_SUCCESS: case DB_SUCCESS_LOCKED_REC:
if (srv_locks_unsafe_for_binlog if (srv_locks_unsafe_for_binlog
|| trx->isolation_level <= TRX_ISO_READ_COMMITTED) { || trx->isolation_level
<= TRX_ISO_READ_COMMITTED) {
/* Note that a record of /* Note that a record of
prebuilt->index was locked. */ prebuilt->index was locked. */
prebuilt->new_rec_locks = 1; prebuilt->new_rec_locks = 1;
} }
err = DB_SUCCESS;
case DB_SUCCESS:
break; break;
case DB_LOCK_WAIT: case DB_LOCK_WAIT:
/* Never unlock rows that were part of a conflict. */
prebuilt->new_rec_locks = 0;
if (UNIV_LIKELY(prebuilt->row_read_type if (UNIV_LIKELY(prebuilt->row_read_type
!= ROW_READ_TRY_SEMI_CONSISTENT) != ROW_READ_TRY_SEMI_CONSISTENT)
|| unique_search || unique_search
...@@ -3999,7 +4040,6 @@ row_search_for_mysql( ...@@ -3999,7 +4040,6 @@ row_search_for_mysql(
if (UNIV_LIKELY(trx->wait_lock != NULL)) { if (UNIV_LIKELY(trx->wait_lock != NULL)) {
lock_cancel_waiting_and_release( lock_cancel_waiting_and_release(
trx->wait_lock); trx->wait_lock);
prebuilt->new_rec_locks = 0;
} else { } else {
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
...@@ -4011,9 +4051,6 @@ row_search_for_mysql( ...@@ -4011,9 +4051,6 @@ row_search_for_mysql(
ULINT_UNDEFINED, ULINT_UNDEFINED,
&heap); &heap);
err = DB_SUCCESS; err = DB_SUCCESS;
/* Note that a record of
prebuilt->index was locked. */
prebuilt->new_rec_locks = 1;
break; break;
} }
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
...@@ -4151,27 +4188,30 @@ row_search_for_mysql( ...@@ -4151,27 +4188,30 @@ row_search_for_mysql(
err = row_sel_get_clust_rec_for_mysql(prebuilt, index, rec, err = row_sel_get_clust_rec_for_mysql(prebuilt, index, rec,
thr, &clust_rec, thr, &clust_rec,
&offsets, &heap, &mtr); &offsets, &heap, &mtr);
if (err != DB_SUCCESS) { switch (err) {
case DB_SUCCESS:
if (clust_rec == NULL) {
/* The record did not exist in the read view */
ut_ad(prebuilt->select_lock_type == LOCK_NONE);
goto next_rec;
}
break;
case DB_SUCCESS_LOCKED_REC:
ut_a(clust_rec != NULL);
if (srv_locks_unsafe_for_binlog
|| trx->isolation_level
<= TRX_ISO_READ_COMMITTED) {
/* Note that the clustered index record
was locked. */
prebuilt->new_rec_locks = 2;
}
err = DB_SUCCESS;
break;
default:
goto lock_wait_or_error; goto lock_wait_or_error;
} }
if (clust_rec == NULL) {
/* The record did not exist in the read view */
ut_ad(prebuilt->select_lock_type == LOCK_NONE);
goto next_rec;
}
if ((srv_locks_unsafe_for_binlog
|| trx->isolation_level <= TRX_ISO_READ_COMMITTED)
&& prebuilt->select_lock_type != LOCK_NONE) {
/* Note that both the secondary index record
and the clustered index record were locked. */
ut_ad(prebuilt->new_rec_locks == 1);
prebuilt->new_rec_locks = 2;
}
if (UNIV_UNLIKELY(rec_get_deleted_flag(clust_rec, comp))) { if (UNIV_UNLIKELY(rec_get_deleted_flag(clust_rec, comp))) {
/* The record is delete marked: we can skip it */ /* The record is delete marked: we can skip it */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment