Commit c84c4b10 authored by Marko Mäkelä's avatar Marko Mäkelä

Bug#53674: InnoDB: Error: unlock row could not find a 4 mode lock on the record

In semi-consistent read, only unlock freshly locked non-matching records.

lock_rec_lock_fast(): Return LOCK_REC_SUCCESS,
LOCK_REC_SUCCESS_CREATED, or LOCK_REC_FAIL instead of TRUE/FALSE.

enum db_err: Add DB_SUCCESS_LOCKED_REC for indicating a successful
operation where a record lock was created.

lock_sec_rec_read_check_and_lock(),
lock_clust_rec_read_check_and_lock(), lock_rec_enqueue_waiting(),
lock_rec_lock_slow(), lock_rec_lock(), row_ins_set_shared_rec_lock(),
row_ins_set_exclusive_rec_lock(), sel_set_rec_lock(),
row_sel_get_clust_rec_for_mysql(): Return DB_SUCCESS_LOCKED_REC if a
new record lock was created. Adjust callers.

row_unlock_for_mysql(): Correct the function documentation.

row_prebuilt_t::new_rec_locks: Correct the documentation.
parent 306e1338
create table bug53674(a int)engine=innodb;
insert into bug53674 values (1),(2);
start transaction;
select * from bug53674 for update;
a
1
2
select * from bug53674 where a=(select a from bug53674 where a > 1);
a
2
drop table bug53674;
--log-bin --innodb-locks-unsafe-for-binlog --binlog-format=mixed
-- source include/have_innodb_plugin.inc
create table bug53674(a int)engine=innodb;
insert into bug53674 values (1),(2);
start transaction;
select * from bug53674 for update;
select * from bug53674 where a=(select a from bug53674 where a > 1);
drop table bug53674;
......@@ -28,6 +28,8 @@ Created 5/24/1996 Heikki Tuuri
enum db_err {
DB_SUCCESS_LOCKED_REC = 9, /*!< like DB_SUCCESS, but a new
explicit record lock was created */
DB_SUCCESS = 10,
/* The following are error codes */
......
......@@ -340,11 +340,12 @@ lock_sec_rec_modify_check_and_lock(
que_thr_t* thr, /*!< in: query thread */
mtr_t* mtr); /*!< in/out: mini-transaction */
/*********************************************************************//**
Like the counterpart for a clustered index below, but now we read a
Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record.
@return DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
UNIV_INTERN
ulint
enum db_err
lock_sec_rec_read_check_and_lock(
/*=============================*/
ulint flags, /*!< in: if BTR_NO_LOCKING_FLAG
......@@ -371,9 +372,10 @@ if the query thread should anyway be suspended for some reason; if not, then
puts the transaction and the query thread to the lock wait state and inserts a
waiting request for a record lock to the lock queue. Sets the requested mode
lock on the record.
@return DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
UNIV_INTERN
ulint
enum db_err
lock_clust_rec_read_check_and_lock(
/*===============================*/
ulint flags, /*!< in: if BTR_NO_LOCKING_FLAG
......
......@@ -264,27 +264,26 @@ row_update_for_mysql(
row_prebuilt_t* prebuilt); /*!< in: prebuilt struct in MySQL
handle */
/*********************************************************************//**
This can only be used when srv_locks_unsafe_for_binlog is TRUE or
session is using a READ COMMITTED isolation level. Before
calling this function we must use trx_reset_new_rec_lock_info() and
trx_register_new_rec_lock() to store the information which new record locks
really were set. This function removes a newly set lock under prebuilt->pcur,
and also under prebuilt->clust_pcur. Currently, this is only used and tested
in the case of an UPDATE or a DELETE statement, where the row lock is of the
LOCK_X type.
Thus, this implements a 'mini-rollback' that releases the latest record
locks we set.
@return error code or DB_SUCCESS */
This can only be used when srv_locks_unsafe_for_binlog is TRUE or this
session is using a READ COMMITTED or READ UNCOMMITTED isolation level.
Before calling this function row_search_for_mysql() must have
initialized prebuilt->new_rec_locks to store the information which new
record locks really were set. This function removes a newly set
clustered index record lock under prebuilt->pcur or
prebuilt->clust_pcur. Thus, this implements a 'mini-rollback' that
releases the latest clustered index record lock we set.
@return error code or DB_SUCCESS */
UNIV_INTERN
int
row_unlock_for_mysql(
/*=================*/
row_prebuilt_t* prebuilt, /*!< in: prebuilt struct in MySQL
row_prebuilt_t* prebuilt, /*!< in/out: prebuilt struct in MySQL
handle */
ibool has_latches_on_recs);/*!< TRUE if called so that we have
the latches on the records under pcur
and clust_pcur, and we do not need to
reposition the cursors. */
ibool has_latches_on_recs);/*!< in: TRUE if called
so that we have the latches on
the records under pcur and
clust_pcur, and we do not need
to reposition the cursors. */
/*********************************************************************//**
Creates an query graph node of 'update' type to be used in the MySQL
interface.
......@@ -702,18 +701,17 @@ struct row_prebuilt_struct {
ulint new_rec_locks; /*!< normally 0; if
srv_locks_unsafe_for_binlog is
TRUE or session is using READ
COMMITTED isolation level, in a
cursor search, if we set a new
record lock on an index, this is
incremented; this is used in
releasing the locks under the
cursors if we are performing an
UPDATE and we determine after
retrieving the row that it does
not need to be locked; thus,
these can be used to implement a
'mini-rollback' that releases
the latest record locks */
COMMITTED or READ UNCOMMITTED
isolation level, set in
row_search_for_mysql() if we set a new
record lock on the secondary
or clustered index; this is
used in row_unlock_for_mysql()
when releasing the lock under
the cursor if we determine
after retrieving the row that
it does not need to be locked
('mini-rollback') */
ulint mysql_prefix_len;/*!< byte offset of the end of
the last requested column */
ulint mysql_row_len; /*!< length in bytes of a row in the
......
......@@ -1733,11 +1733,11 @@ lock_rec_create(
Enqueues a waiting request for a lock which cannot be granted immediately.
Checks for deadlocks.
@return DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED, or
DB_SUCCESS; DB_SUCCESS means that there was a deadlock, but another
transaction was chosen as a victim, and we got the lock immediately:
no need to wait then */
DB_SUCCESS_LOCKED_REC; DB_SUCCESS_LOCKED_REC means that
there was a deadlock, but another transaction was chosen as a victim,
and we got the lock immediately: no need to wait then */
static
ulint
enum db_err
lock_rec_enqueue_waiting(
/*=====================*/
ulint type_mode,/*!< in: lock mode this
......@@ -1809,7 +1809,7 @@ lock_rec_enqueue_waiting(
if (trx->wait_lock == NULL) {
return(DB_SUCCESS);
return(DB_SUCCESS_LOCKED_REC);
}
trx->que_state = TRX_QUE_LOCK_WAIT;
......@@ -1925,6 +1925,16 @@ lock_rec_add_to_queue(
return(lock_rec_create(type_mode, block, heap_no, index, trx));
}
/** Record locking request status */
enum lock_rec_req_status {
/** Failed to acquire a lock */
LOCK_REC_FAIL,
/** Succeeded in acquiring a lock (implicit or already acquired) */
LOCK_REC_SUCCESS,
/** Explicitly created a new lock */
LOCK_REC_SUCCESS_CREATED
};
/*********************************************************************//**
This is a fast routine for locking a record in the most common cases:
there are no explicit locks on the page, or there is just one lock, owned
......@@ -1932,9 +1942,9 @@ by this transaction, and of the right type_mode. This is a low-level function
which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case of
a page supremum record, a gap type lock.
@return TRUE if locking succeeded */
@return whether the locking succeeded */
UNIV_INLINE
ibool
enum lock_rec_req_status
lock_rec_lock_fast(
/*===============*/
ibool impl, /*!< in: if TRUE, no lock is set
......@@ -1973,19 +1983,19 @@ lock_rec_lock_fast(
lock_rec_create(mode, block, heap_no, index, trx);
}
return(TRUE);
return(LOCK_REC_SUCCESS_CREATED);
}
if (lock_rec_get_next_on_page(lock)) {
return(FALSE);
return(LOCK_REC_FAIL);
}
if (lock->trx != trx
|| lock->type_mode != (mode | LOCK_REC)
|| lock_rec_get_n_bits(lock) <= heap_no) {
return(FALSE);
return(LOCK_REC_FAIL);
}
if (!impl) {
......@@ -1994,10 +2004,11 @@ lock_rec_lock_fast(
if (!lock_rec_get_nth_bit(lock, heap_no)) {
lock_rec_set_nth_bit(lock, heap_no);
return(LOCK_REC_SUCCESS_CREATED);
}
}
return(TRUE);
return(LOCK_REC_SUCCESS);
}
/*********************************************************************//**
......@@ -2005,9 +2016,10 @@ This is the general, and slower, routine for locking a record. This is a
low-level function which does NOT look at implicit locks! Checks lock
compatibility within explicit locks. This function sets a normal next-key
lock, or in the case of a page supremum record, a gap type lock.
@return DB_SUCCESS, DB_LOCK_WAIT, or error code */
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
static
ulint
enum db_err
lock_rec_lock_slow(
/*===============*/
ibool impl, /*!< in: if TRUE, no lock is set
......@@ -2024,7 +2036,6 @@ lock_rec_lock_slow(
que_thr_t* thr) /*!< in: query thread */
{
trx_t* trx;
ulint err;
ut_ad(mutex_own(&kernel_mutex));
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
......@@ -2043,27 +2054,23 @@ lock_rec_lock_slow(
/* The trx already has a strong enough lock on rec: do
nothing */
err = DB_SUCCESS;
} else if (lock_rec_other_has_conflicting(mode, block, heap_no, trx)) {
/* If another transaction has a non-gap conflicting request in
the queue, as this transaction does not have a lock strong
enough already granted on the record, we have to wait. */
err = lock_rec_enqueue_waiting(mode, block, heap_no,
index, thr);
} else {
if (!impl) {
/* Set the requested lock on the record */
return(lock_rec_enqueue_waiting(mode, block, heap_no,
index, thr));
} else if (!impl) {
/* Set the requested lock on the record */
lock_rec_add_to_queue(LOCK_REC | mode, block,
heap_no, index, trx);
}
err = DB_SUCCESS;
lock_rec_add_to_queue(LOCK_REC | mode, block,
heap_no, index, trx);
return(DB_SUCCESS_LOCKED_REC);
}
return(err);
return(DB_SUCCESS);
}
/*********************************************************************//**
......@@ -2072,9 +2079,10 @@ possible, enqueues a waiting lock request. This is a low-level function
which does NOT look at implicit locks! Checks lock compatibility within
explicit locks. This function sets a normal next-key lock, or in the case
of a page supremum record, a gap type lock.
@return DB_SUCCESS, DB_LOCK_WAIT, or error code */
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
static
ulint
enum db_err
lock_rec_lock(
/*==========*/
ibool impl, /*!< in: if TRUE, no lock is set
......@@ -2090,8 +2098,6 @@ lock_rec_lock(
dict_index_t* index, /*!< in: index of record */
que_thr_t* thr) /*!< in: query thread */
{
ulint err;
ut_ad(mutex_own(&kernel_mutex));
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
|| lock_table_has(thr_get_trx(thr), index->table, LOCK_IS));
......@@ -2103,18 +2109,20 @@ lock_rec_lock(
|| mode - (LOCK_MODE_MASK & mode) == LOCK_REC_NOT_GAP
|| mode - (LOCK_MODE_MASK & mode) == 0);
if (lock_rec_lock_fast(impl, mode, block, heap_no, index, thr)) {
/* We try a simplified and faster subroutine for the most
common cases */
err = DB_SUCCESS;
} else {
err = lock_rec_lock_slow(impl, mode, block,
heap_no, index, thr);
/* We try a simplified and faster subroutine for the most
common cases */
switch (lock_rec_lock_fast(impl, mode, block, heap_no, index, thr)) {
case LOCK_REC_SUCCESS:
return(DB_SUCCESS);
case LOCK_REC_SUCCESS_CREATED:
return(DB_SUCCESS_LOCKED_REC);
case LOCK_REC_FAIL:
return(lock_rec_lock_slow(impl, mode, block,
heap_no, index, thr));
}
return(err);
ut_error;
return(DB_ERROR);
}
/*********************************************************************//**
......@@ -5072,7 +5080,14 @@ lock_rec_insert_check_and_lock(
lock_mutex_exit_kernel();
if ((err == DB_SUCCESS) && !dict_index_is_clust(index)) {
switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
/* fall through */
case DB_SUCCESS:
if (dict_index_is_clust(index)) {
break;
}
/* Update the page max trx id field */
page_update_max_trx_id(block,
buf_block_get_page_zip(block),
......@@ -5195,6 +5210,10 @@ lock_clust_rec_modify_check_and_lock(
ut_ad(lock_rec_queue_validate(block, rec, index, offsets));
if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
err = DB_SUCCESS;
}
return(err);
}
......@@ -5261,22 +5280,27 @@ lock_sec_rec_modify_check_and_lock(
}
#endif /* UNIV_DEBUG */
if (err == DB_SUCCESS) {
if (err == DB_SUCCESS || err == DB_SUCCESS_LOCKED_REC) {
/* Update the page max trx id field */
/* It might not be necessary to do this if
err == DB_SUCCESS (no new lock created),
but it should not cost too much performance. */
page_update_max_trx_id(block,
buf_block_get_page_zip(block),
thr_get_trx(thr)->id, mtr);
err = DB_SUCCESS;
}
return(err);
}
/*********************************************************************//**
Like the counterpart for a clustered index below, but now we read a
Like lock_clust_rec_read_check_and_lock(), but reads a
secondary index record.
@return DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
UNIV_INTERN
ulint
enum db_err
lock_sec_rec_read_check_and_lock(
/*=============================*/
ulint flags, /*!< in: if BTR_NO_LOCKING_FLAG
......@@ -5297,8 +5321,8 @@ lock_sec_rec_read_check_and_lock(
LOCK_REC_NOT_GAP */
que_thr_t* thr) /*!< in: query thread */
{
ulint err;
ulint heap_no;
enum db_err err;
ulint heap_no;
ut_ad(!dict_index_is_clust(index));
ut_ad(block->frame == page_align(rec));
......@@ -5349,9 +5373,10 @@ if the query thread should anyway be suspended for some reason; if not, then
puts the transaction and the query thread to the lock wait state and inserts a
waiting request for a record lock to the lock queue. Sets the requested mode
lock on the record.
@return DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK, or DB_QUE_THR_SUSPENDED */
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, DB_LOCK_WAIT, DB_DEADLOCK,
or DB_QUE_THR_SUSPENDED */
UNIV_INTERN
ulint
enum db_err
lock_clust_rec_read_check_and_lock(
/*===============================*/
ulint flags, /*!< in: if BTR_NO_LOCKING_FLAG
......@@ -5372,8 +5397,8 @@ lock_clust_rec_read_check_and_lock(
LOCK_REC_NOT_GAP */
que_thr_t* thr) /*!< in: query thread */
{
ulint err;
ulint heap_no;
enum db_err err;
ulint heap_no;
ut_ad(dict_index_is_clust(index));
ut_ad(block->frame == page_align(rec));
......@@ -5444,17 +5469,22 @@ lock_clust_rec_read_check_and_lock_alt(
mem_heap_t* tmp_heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
ulint ret;
ulint err;
rec_offs_init(offsets_);
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &tmp_heap);
ret = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
err = lock_clust_rec_read_check_and_lock(flags, block, rec, index,
offsets, mode, gap_mode, thr);
if (tmp_heap) {
mem_heap_free(tmp_heap);
}
return(ret);
if (UNIV_UNLIKELY(err == DB_SUCCESS_LOCKED_REC)) {
err = DB_SUCCESS;
}
return(err);
}
/*******************************************************************//**
......
......@@ -1121,9 +1121,9 @@ row_ins_foreign_check_on_constraint(
/*********************************************************************//**
Sets a shared lock on a record. Used in locking possible duplicate key
records and also in checking foreign key constraints.
@return DB_SUCCESS or error code */
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
static
ulint
enum db_err
row_ins_set_shared_rec_lock(
/*========================*/
ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
......@@ -1134,7 +1134,7 @@ row_ins_set_shared_rec_lock(
const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
que_thr_t* thr) /*!< in: query thread */
{
ulint err;
enum db_err err;
ut_ad(rec_offs_validate(rec, index, offsets));
......@@ -1152,9 +1152,9 @@ row_ins_set_shared_rec_lock(
/*********************************************************************//**
Sets a exclusive lock on a record. Used in locking possible duplicate key
records
@return DB_SUCCESS or error code */
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
static
ulint
enum db_err
row_ins_set_exclusive_rec_lock(
/*===========================*/
ulint type, /*!< in: LOCK_ORDINARY, LOCK_GAP, or
......@@ -1165,7 +1165,7 @@ row_ins_set_exclusive_rec_lock(
const ulint* offsets,/*!< in: rec_get_offsets(rec, index) */
que_thr_t* thr) /*!< in: query thread */
{
ulint err;
enum db_err err;
ut_ad(rec_offs_validate(rec, index, offsets));
......@@ -1205,7 +1205,6 @@ row_ins_check_foreign_constraint(
dict_index_t* check_index;
ulint n_fields_cmp;
btr_pcur_t pcur;
ibool moved;
int cmp;
ulint err;
ulint i;
......@@ -1336,13 +1335,13 @@ row_ins_check_foreign_constraint(
/* Scan index records and check if there is a matching record */
for (;;) {
do {
const rec_t* rec = btr_pcur_get_rec(&pcur);
const buf_block_t* block = btr_pcur_get_block(&pcur);
if (page_rec_is_infimum(rec)) {
goto next_rec;
continue;
}
offsets = rec_get_offsets(rec, check_index,
......@@ -1353,12 +1352,13 @@ row_ins_check_foreign_constraint(
err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
rec, check_index,
offsets, thr);
if (err != DB_SUCCESS) {
break;
switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
continue;
default:
goto end_scan;
}
goto next_rec;
}
cmp = cmp_dtuple_rec(entry, rec, offsets);
......@@ -1369,9 +1369,12 @@ row_ins_check_foreign_constraint(
err = row_ins_set_shared_rec_lock(
LOCK_ORDINARY, block,
rec, check_index, offsets, thr);
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto end_scan;
}
} else {
/* Found a matching record. Lock only
......@@ -1382,15 +1385,18 @@ row_ins_check_foreign_constraint(
LOCK_REC_NOT_GAP, block,
rec, check_index, offsets, thr);
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto end_scan;
}
if (check_ref) {
err = DB_SUCCESS;
break;
goto end_scan;
} else if (foreign->type != 0) {
/* There is an ON UPDATE or ON DELETE
condition: check them in a separate
......@@ -1416,7 +1422,7 @@ row_ins_check_foreign_constraint(
err = DB_FOREIGN_DUPLICATE_KEY;
}
break;
goto end_scan;
}
/* row_ins_foreign_check_on_constraint
......@@ -1429,49 +1435,41 @@ row_ins_check_foreign_constraint(
thr, foreign, rec, entry);
err = DB_ROW_IS_REFERENCED;
break;
goto end_scan;
}
}
}
} else {
ut_a(cmp < 0);
if (cmp < 0) {
err = row_ins_set_shared_rec_lock(
LOCK_GAP, block,
rec, check_index, offsets, thr);
if (err != DB_SUCCESS) {
break;
}
if (check_ref) {
err = DB_NO_REFERENCED_ROW;
row_ins_foreign_report_add_err(
trx, foreign, rec, entry);
} else {
err = DB_SUCCESS;
switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
if (check_ref) {
err = DB_NO_REFERENCED_ROW;
row_ins_foreign_report_add_err(
trx, foreign, rec, entry);
} else {
err = DB_SUCCESS;
}
}
break;
goto end_scan;
}
} while (btr_pcur_move_to_next(&pcur, &mtr));
ut_a(cmp == 0);
next_rec:
moved = btr_pcur_move_to_next(&pcur, &mtr);
if (!moved) {
if (check_ref) {
rec = btr_pcur_get_rec(&pcur);
row_ins_foreign_report_add_err(
trx, foreign, rec, entry);
err = DB_NO_REFERENCED_ROW;
} else {
err = DB_SUCCESS;
}
break;
}
if (check_ref) {
row_ins_foreign_report_add_err(
trx, foreign, btr_pcur_get_rec(&pcur), entry);
err = DB_NO_REFERENCED_ROW;
} else {
err = DB_SUCCESS;
}
end_scan:
btr_pcur_close(&pcur);
mtr_commit(&mtr);
......@@ -1719,9 +1717,13 @@ row_ins_scan_sec_index_for_duplicate(
rec, index, offsets, thr);
}
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
goto end_scan;
}
if (page_rec_is_supremum(rec)) {
......@@ -1738,17 +1740,15 @@ row_ins_scan_sec_index_for_duplicate(
thr_get_trx(thr)->error_info = index;
break;
goto end_scan;
}
} else {
ut_a(cmp < 0);
goto end_scan;
}
if (cmp < 0) {
break;
}
ut_a(cmp == 0);
} while (btr_pcur_move_to_next(&pcur, &mtr));
end_scan:
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
......@@ -1837,7 +1837,11 @@ row_ins_duplicate_error_in_clust(
cursor->index, offsets, thr);
}
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto func_exit;
}
......@@ -1877,7 +1881,11 @@ row_ins_duplicate_error_in_clust(
rec, cursor->index, offsets, thr);
}
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto func_exit;
}
......
......@@ -1430,27 +1430,26 @@ row_update_for_mysql(
}
/*********************************************************************//**
This can only be used when srv_locks_unsafe_for_binlog is TRUE or
this session is using a READ COMMITTED isolation level. Before
calling this function we must use trx_reset_new_rec_lock_info() and
trx_register_new_rec_lock() to store the information which new record locks
really were set. This function removes a newly set lock under prebuilt->pcur,
and also under prebuilt->clust_pcur. Currently, this is only used and tested
in the case of an UPDATE or a DELETE statement, where the row lock is of the
LOCK_X type.
Thus, this implements a 'mini-rollback' that releases the latest record
locks we set.
@return error code or DB_SUCCESS */
This can only be used when srv_locks_unsafe_for_binlog is TRUE or this
session is using a READ COMMITTED or READ UNCOMMITTED isolation level.
Before calling this function row_search_for_mysql() must have
initialized prebuilt->new_rec_locks to store the information which new
record locks really were set. This function removes a newly set
clustered index record lock under prebuilt->pcur or
prebuilt->clust_pcur. Thus, this implements a 'mini-rollback' that
releases the latest clustered index record lock we set.
@return error code or DB_SUCCESS */
UNIV_INTERN
int
row_unlock_for_mysql(
/*=================*/
row_prebuilt_t* prebuilt, /*!< in: prebuilt struct in MySQL
row_prebuilt_t* prebuilt, /*!< in/out: prebuilt struct in MySQL
handle */
ibool has_latches_on_recs)/*!< TRUE if called so that we have
the latches on the records under pcur
and clust_pcur, and we do not need to
reposition the cursors. */
ibool has_latches_on_recs)/*!< in: TRUE if called so
that we have the latches on
the records under pcur and
clust_pcur, and we do not need
to reposition the cursors. */
{
btr_pcur_t* pcur = prebuilt->pcur;
btr_pcur_t* clust_pcur = prebuilt->clust_pcur;
......
......@@ -863,8 +863,14 @@ row_sel_get_clust_rec(
clust_rec, index, offsets,
node->row_lock_mode, lock_type, thr);
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS:
case DB_SUCCESS_LOCKED_REC:
/* Declare the variable uninitialized in Valgrind.
It should be set to DB_SUCCESS at func_exit. */
UNIV_MEM_INVALID(&err, sizeof err);
break;
default:
goto err_exit;
}
} else {
......@@ -934,9 +940,9 @@ row_sel_get_clust_rec(
/*********************************************************************//**
Sets a lock on a record.
@return DB_SUCCESS or error code */
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
UNIV_INLINE
ulint
enum db_err
sel_set_rec_lock(
/*=============*/
const buf_block_t* block, /*!< in: buffer block of rec */
......@@ -948,8 +954,8 @@ sel_set_rec_lock(
LOC_REC_NOT_GAP */
que_thr_t* thr) /*!< in: query thread */
{
trx_t* trx;
ulint err;
trx_t* trx;
enum db_err err;
trx = thr_get_trx(thr);
......@@ -1482,11 +1488,15 @@ row_sel(
node->row_lock_mode,
lock_type, thr);
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
/* Note that in this case we will store in pcur
the PREDECESSOR of the record we are waiting
the lock for */
goto lock_wait_or_error;
}
}
......@@ -1538,8 +1548,12 @@ row_sel(
rec, index, offsets,
node->row_lock_mode, lock_type, thr);
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error;
}
}
......@@ -2801,9 +2815,9 @@ row_sel_build_prev_vers_for_mysql(
Retrieves the clustered index record corresponding to a record in a
non-clustered index. Does the necessary locking. Used in the MySQL
interface.
@return DB_SUCCESS or error code */
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */
static
ulint
enum db_err
row_sel_get_clust_rec_for_mysql(
/*============================*/
row_prebuilt_t* prebuilt,/*!< in: prebuilt struct in the handle */
......@@ -2830,7 +2844,7 @@ row_sel_get_clust_rec_for_mysql(
dict_index_t* clust_index;
const rec_t* clust_rec;
rec_t* old_vers;
ulint err;
enum db_err err;
trx_t* trx;
*out_rec = NULL;
......@@ -2889,6 +2903,7 @@ row_sel_get_clust_rec_for_mysql(
clust_rec = NULL;
err = DB_SUCCESS;
goto func_exit;
}
......@@ -2904,8 +2919,11 @@ row_sel_get_clust_rec_for_mysql(
0, btr_pcur_get_block(prebuilt->clust_pcur),
clust_rec, clust_index, *offsets,
prebuilt->select_lock_type, LOCK_REC_NOT_GAP, thr);
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS:
case DB_SUCCESS_LOCKED_REC:
break;
default:
goto err_exit;
}
} else {
......@@ -2965,6 +2983,8 @@ row_sel_get_clust_rec_for_mysql(
rec, sec_index, clust_rec, clust_index));
#endif
}
err = DB_SUCCESS;
}
func_exit:
......@@ -2977,7 +2997,6 @@ row_sel_get_clust_rec_for_mysql(
btr_pcur_store_position(prebuilt->clust_pcur, mtr);
}
err = DB_SUCCESS;
err_exit:
return(err);
}
......@@ -3702,8 +3721,12 @@ row_search_for_mysql(
prebuilt->select_lock_type,
LOCK_GAP, thr);
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error;
}
}
......@@ -3801,8 +3824,12 @@ row_search_for_mysql(
prebuilt->select_lock_type,
LOCK_ORDINARY, thr);
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS_LOCKED_REC:
err = DB_SUCCESS;
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error;
}
}
......@@ -3932,8 +3959,11 @@ row_search_for_mysql(
prebuilt->select_lock_type, LOCK_GAP,
thr);
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error;
}
}
......@@ -3968,8 +3998,11 @@ row_search_for_mysql(
prebuilt->select_lock_type, LOCK_GAP,
thr);
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS_LOCKED_REC:
case DB_SUCCESS:
break;
default:
goto lock_wait_or_error;
}
}
......@@ -4039,15 +4072,21 @@ row_search_for_mysql(
switch (err) {
const rec_t* old_vers;
case DB_SUCCESS:
case DB_SUCCESS_LOCKED_REC:
if (srv_locks_unsafe_for_binlog
|| trx->isolation_level <= TRX_ISO_READ_COMMITTED) {
|| trx->isolation_level
<= TRX_ISO_READ_COMMITTED) {
/* Note that a record of
prebuilt->index was locked. */
prebuilt->new_rec_locks = 1;
}
err = DB_SUCCESS;
case DB_SUCCESS:
break;
case DB_LOCK_WAIT:
/* Never unlock rows that were part of a conflict. */
prebuilt->new_rec_locks = 0;
if (UNIV_LIKELY(prebuilt->row_read_type
!= ROW_READ_TRY_SEMI_CONSISTENT)
|| unique_search
......@@ -4077,7 +4116,6 @@ row_search_for_mysql(
if (UNIV_LIKELY(trx->wait_lock != NULL)) {
lock_cancel_waiting_and_release(
trx->wait_lock);
prebuilt->new_rec_locks = 0;
} else {
mutex_exit(&kernel_mutex);
......@@ -4089,9 +4127,6 @@ row_search_for_mysql(
ULINT_UNDEFINED,
&heap);
err = DB_SUCCESS;
/* Note that a record of
prebuilt->index was locked. */
prebuilt->new_rec_locks = 1;
break;
}
mutex_exit(&kernel_mutex);
......@@ -4228,27 +4263,30 @@ row_search_for_mysql(
err = row_sel_get_clust_rec_for_mysql(prebuilt, index, rec,
thr, &clust_rec,
&offsets, &heap, &mtr);
if (err != DB_SUCCESS) {
switch (err) {
case DB_SUCCESS:
if (clust_rec == NULL) {
/* The record did not exist in the read view */
ut_ad(prebuilt->select_lock_type == LOCK_NONE);
goto next_rec;
}
break;
case DB_SUCCESS_LOCKED_REC:
ut_a(clust_rec != NULL);
if (srv_locks_unsafe_for_binlog
|| trx->isolation_level
<= TRX_ISO_READ_COMMITTED) {
/* Note that the clustered index record
was locked. */
prebuilt->new_rec_locks = 2;
}
err = DB_SUCCESS;
break;
default:
goto lock_wait_or_error;
}
if (clust_rec == NULL) {
/* The record did not exist in the read view */
ut_ad(prebuilt->select_lock_type == LOCK_NONE);
goto next_rec;
}
if ((srv_locks_unsafe_for_binlog
|| trx->isolation_level <= TRX_ISO_READ_COMMITTED)
&& prebuilt->select_lock_type != LOCK_NONE) {
/* Note that both the secondary index record
and the clustered index record were locked. */
ut_ad(prebuilt->new_rec_locks == 1);
prebuilt->new_rec_locks = 2;
}
if (UNIV_UNLIKELY(rec_get_deleted_flag(clust_rec, comp))) {
/* The record is delete marked: we can skip it */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment