Commit 8dd87e69 authored by Yasufumi Kinoshita's avatar Yasufumi Kinoshita

Bug#59354 : Bug #12659252 : ASSERT !OTHER_LOCK AT LOCK_REC_ADD_TO_QUEUE DURING A DELETE OPERATION

The converted implicit lock should wait for the prior conflicting lock if found.

rb://1437 approved by Marko
parent 112a93a7
...@@ -498,6 +498,8 @@ static SHOW_VAR innodb_status_variables[]= { ...@@ -498,6 +498,8 @@ static SHOW_VAR innodb_status_variables[]= {
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
{"purge_trx_id_age", {"purge_trx_id_age",
(char*) &export_vars.innodb_purge_trx_id_age, SHOW_LONG}, (char*) &export_vars.innodb_purge_trx_id_age, SHOW_LONG},
{"purge_view_trx_id_age",
(char*) &export_vars.innodb_purge_view_trx_id_age, SHOW_LONG},
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
{NullS, NullS, SHOW_LONG} {NullS, NullS, SHOW_LONG}
}; };
...@@ -9283,6 +9285,13 @@ static MYSQL_SYSVAR_UINT(limit_optimistic_insert_debug, ...@@ -9283,6 +9285,13 @@ static MYSQL_SYSVAR_UINT(limit_optimistic_insert_debug,
btr_cur_limit_optimistic_insert_debug, PLUGIN_VAR_RQCMDARG, btr_cur_limit_optimistic_insert_debug, PLUGIN_VAR_RQCMDARG,
"Artificially limit the number of records per B-tree page (0=unlimited).", "Artificially limit the number of records per B-tree page (0=unlimited).",
NULL, NULL, 0, 0, UINT_MAX32, 0); NULL, NULL, 0, 0, UINT_MAX32, 0);
static MYSQL_SYSVAR_BOOL(trx_purge_view_update_only_debug,
srv_purge_view_update_only_debug, PLUGIN_VAR_NOCMDARG,
"Pause actual purging any delete-marked records, but merely update the purge view. "
"It is to create artificially the situation the purge view have been updated "
"but the each purges were not done yet.",
NULL, NULL, FALSE);
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
static struct st_mysql_sys_var* innobase_system_variables[]= { static struct st_mysql_sys_var* innobase_system_variables[]= {
...@@ -9333,6 +9342,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= { ...@@ -9333,6 +9342,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
MYSQL_SYSVAR(trx_rseg_n_slots_debug), MYSQL_SYSVAR(trx_rseg_n_slots_debug),
MYSQL_SYSVAR(limit_optimistic_insert_debug), MYSQL_SYSVAR(limit_optimistic_insert_debug),
MYSQL_SYSVAR(trx_purge_view_update_only_debug),
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
NULL NULL
}; };
......
...@@ -685,6 +685,13 @@ extern lock_sys_t* lock_sys; ...@@ -685,6 +685,13 @@ extern lock_sys_t* lock_sys;
remains set when the waiting lock is granted, remains set when the waiting lock is granted,
or if the lock is inherited to a neighboring or if the lock is inherited to a neighboring
record */ record */
#define LOCK_CONV_BY_OTHER 4096 /* this bit is set when the lock is created
by other transaction */
/* Checks if this is a waiting lock created by lock->trx itself.
@param type_mode lock->type_mode
@return whether it is a waiting lock belonging to lock->trx */
#define lock_is_wait_not_by_other(type_mode) \
((type_mode & (LOCK_CONV_BY_OTHER | LOCK_WAIT)) == LOCK_WAIT)
/* When lock bits are reset, the following flags are available: */ /* When lock bits are reset, the following flags are available: */
#define LOCK_RELEASE_WAIT 1 #define LOCK_RELEASE_WAIT 1
......
...@@ -176,6 +176,10 @@ extern ulint srv_fatal_semaphore_wait_threshold; ...@@ -176,6 +176,10 @@ extern ulint srv_fatal_semaphore_wait_threshold;
#define SRV_SEMAPHORE_WAIT_EXTENSION 7200 #define SRV_SEMAPHORE_WAIT_EXTENSION 7200
extern ulint srv_dml_needed_delay; extern ulint srv_dml_needed_delay;
#ifdef UNIV_DEBUG
extern my_bool srv_purge_view_update_only_debug;
#endif /* UNIV_DEBUG */
extern mutex_t* kernel_mutex_temp;/* mutex protecting the server, trx structs, extern mutex_t* kernel_mutex_temp;/* mutex protecting the server, trx structs,
query threads, and lock table: we allocate query threads, and lock table: we allocate
it from dynamic memory to get it to the it from dynamic memory to get it to the
...@@ -571,6 +575,7 @@ struct export_var_struct{ ...@@ -571,6 +575,7 @@ struct export_var_struct{
ulint innodb_rows_deleted; ulint innodb_rows_deleted;
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
ulint innodb_purge_trx_id_age; ulint innodb_purge_trx_id_age;
ulint innodb_purge_view_trx_id_age;
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
}; };
......
...@@ -725,12 +725,16 @@ lock_reset_lock_and_trx_wait( ...@@ -725,12 +725,16 @@ lock_reset_lock_and_trx_wait(
/*=========================*/ /*=========================*/
lock_t* lock) /* in: record lock */ lock_t* lock) /* in: record lock */
{ {
ut_ad((lock->trx)->wait_lock == lock);
ut_ad(lock_get_wait(lock)); ut_ad(lock_get_wait(lock));
/* Reset the back pointer in trx to this waiting lock request */ /* Reset the back pointer in trx to this waiting lock request */
(lock->trx)->wait_lock = NULL; if (!(lock->type_mode & LOCK_CONV_BY_OTHER)) {
ut_ad((lock->trx)->wait_lock == lock);
(lock->trx)->wait_lock = NULL;
} else {
ut_ad(lock_get_type(lock) == LOCK_REC);
}
lock->type_mode = lock->type_mode & ~LOCK_WAIT; lock->type_mode = lock->type_mode & ~LOCK_WAIT;
} }
...@@ -1437,9 +1441,9 @@ lock_rec_has_expl( ...@@ -1437,9 +1441,9 @@ lock_rec_has_expl(
while (lock) { while (lock) {
if (lock->trx == trx if (lock->trx == trx
&& !lock_is_wait_not_by_other(lock->type_mode)
&& lock_mode_stronger_or_eq(lock_get_mode(lock), && lock_mode_stronger_or_eq(lock_get_mode(lock),
precise_mode & LOCK_MODE_MASK) precise_mode & LOCK_MODE_MASK)
&& !lock_get_wait(lock)
&& (!lock_rec_get_rec_not_gap(lock) && (!lock_rec_get_rec_not_gap(lock)
|| (precise_mode & LOCK_REC_NOT_GAP) || (precise_mode & LOCK_REC_NOT_GAP)
|| page_rec_is_supremum(rec)) || page_rec_is_supremum(rec))
...@@ -1723,7 +1727,7 @@ lock_rec_create( ...@@ -1723,7 +1727,7 @@ lock_rec_create(
HASH_INSERT(lock_t, hash, lock_sys->rec_hash, HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
lock_rec_fold(space, page_no), lock); lock_rec_fold(space, page_no), lock);
if (type_mode & LOCK_WAIT) { if (lock_is_wait_not_by_other(type_mode)) {
lock_set_lock_and_trx_wait(lock, trx); lock_set_lock_and_trx_wait(lock, trx);
} }
...@@ -1752,10 +1756,11 @@ lock_rec_enqueue_waiting( ...@@ -1752,10 +1756,11 @@ lock_rec_enqueue_waiting(
lock request is set when performing an lock request is set when performing an
insert of an index record */ insert of an index record */
rec_t* rec, /* in: record */ rec_t* rec, /* in: record */
lock_t* lock, /* in: lock object; NULL if a new
one should be created. */
dict_index_t* index, /* in: index of record */ dict_index_t* index, /* in: index of record */
que_thr_t* thr) /* in: query thread */ que_thr_t* thr) /* in: query thread */
{ {
lock_t* lock;
trx_t* trx; trx_t* trx;
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
...@@ -1785,8 +1790,16 @@ lock_rec_enqueue_waiting( ...@@ -1785,8 +1790,16 @@ lock_rec_enqueue_waiting(
stderr); stderr);
} }
/* Enqueue the lock request that will wait to be granted */ if (lock == NULL) {
lock = lock_rec_create(type_mode | LOCK_WAIT, rec, index, trx); /* Enqueue the lock request that will wait to be granted */
lock = lock_rec_create(type_mode | LOCK_WAIT, rec, index, trx);
} else {
ut_ad(lock->type_mode & LOCK_WAIT);
ut_ad(lock->type_mode & LOCK_CONV_BY_OTHER);
lock->type_mode &= ~LOCK_CONV_BY_OTHER;
lock_set_lock_and_trx_wait(lock, trx);
}
/* Check if a deadlock occurs: if yes, remove the lock request and /* Check if a deadlock occurs: if yes, remove the lock request and
return an error code */ return an error code */
...@@ -2011,6 +2024,7 @@ lock_rec_lock_slow( ...@@ -2011,6 +2024,7 @@ lock_rec_lock_slow(
que_thr_t* thr) /* in: query thread */ que_thr_t* thr) /* in: query thread */
{ {
trx_t* trx; trx_t* trx;
lock_t* lock;
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
...@@ -2025,7 +2039,27 @@ lock_rec_lock_slow( ...@@ -2025,7 +2039,27 @@ lock_rec_lock_slow(
trx = thr_get_trx(thr); trx = thr_get_trx(thr);
if (lock_rec_has_expl(mode, rec, trx)) { lock = lock_rec_has_expl(mode, rec, trx);
if (lock) {
if (lock->type_mode & LOCK_CONV_BY_OTHER) {
/* This lock or lock waiting was created by the other
transaction, not by the transaction (trx) itself.
So, the transaction (trx) should treat it collectly
according as whether granted or not. */
if (lock->type_mode & LOCK_WAIT) {
/* This lock request was not granted yet.
Should wait for granted. */
goto enqueue_waiting;
} else {
/* This lock request was already granted.
Just clearing the flag. */
lock->type_mode &= ~LOCK_CONV_BY_OTHER;
}
}
/* The trx already has a strong enough lock on rec: do /* The trx already has a strong enough lock on rec: do
nothing */ nothing */
...@@ -2035,7 +2069,9 @@ lock_rec_lock_slow( ...@@ -2035,7 +2069,9 @@ lock_rec_lock_slow(
the queue, as this transaction does not have a lock strong the queue, as this transaction does not have a lock strong
enough already granted on the record, we have to wait. */ enough already granted on the record, we have to wait. */
return(lock_rec_enqueue_waiting(mode, rec, index, thr)); ut_ad(lock == NULL);
enqueue_waiting:
return(lock_rec_enqueue_waiting(mode, rec, lock, index, thr));
} else if (!impl) { } else if (!impl) {
/* Set the requested lock on the record */ /* Set the requested lock on the record */
...@@ -2171,7 +2207,8 @@ lock_grant( ...@@ -2171,7 +2207,8 @@ lock_grant(
TRX_QUE_LOCK_WAIT state, and there is no need to end the lock wait TRX_QUE_LOCK_WAIT state, and there is no need to end the lock wait
for it */ for it */
if (lock->trx->que_state == TRX_QUE_LOCK_WAIT) { if (!(lock->type_mode & LOCK_CONV_BY_OTHER)
&& lock->trx->que_state == TRX_QUE_LOCK_WAIT) {
trx_end_lock_wait(lock->trx); trx_end_lock_wait(lock->trx);
} }
} }
...@@ -2188,6 +2225,7 @@ lock_rec_cancel( ...@@ -2188,6 +2225,7 @@ lock_rec_cancel(
{ {
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad(lock_get_type(lock) == LOCK_REC); ut_ad(lock_get_type(lock) == LOCK_REC);
ut_ad(!(lock->type_mode & LOCK_CONV_BY_OTHER));
/* Reset the bit (there can be only one set bit) in the lock bitmap */ /* Reset the bit (there can be only one set bit) in the lock bitmap */
lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock)); lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock));
...@@ -2331,8 +2369,12 @@ lock_rec_reset_and_release_wait( ...@@ -2331,8 +2369,12 @@ lock_rec_reset_and_release_wait(
lock = lock_rec_get_first(rec); lock = lock_rec_get_first(rec);
while (lock != NULL) { while (lock != NULL) {
if (lock_get_wait(lock)) { if (lock_is_wait_not_by_other(lock->type_mode)) {
lock_rec_cancel(lock); lock_rec_cancel(lock);
} else if (lock_get_wait(lock)) {
/* just reset LOCK_WAIT */
lock_rec_reset_nth_bit(lock, heap_no);
lock_reset_lock_and_trx_wait(lock);
} else { } else {
lock_rec_reset_nth_bit(lock, heap_no); lock_rec_reset_nth_bit(lock, heap_no);
} }
...@@ -3383,6 +3425,7 @@ lock_table_create( ...@@ -3383,6 +3425,7 @@ lock_table_create(
ut_ad(table && trx); ut_ad(table && trx);
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad(!(type_mode & LOCK_CONV_BY_OTHER));
if ((type_mode & LOCK_MODE_MASK) == LOCK_AUTO_INC) { if ((type_mode & LOCK_MODE_MASK) == LOCK_AUTO_INC) {
++table->n_waiting_or_granted_auto_inc_locks; ++table->n_waiting_or_granted_auto_inc_locks;
...@@ -3900,6 +3943,7 @@ lock_cancel_waiting_and_release( ...@@ -3900,6 +3943,7 @@ lock_cancel_waiting_and_release(
lock_t* lock) /* in: waiting lock request */ lock_t* lock) /* in: waiting lock request */
{ {
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad(!(lock->type_mode & LOCK_CONV_BY_OTHER));
if (lock_get_type(lock) == LOCK_REC) { if (lock_get_type(lock) == LOCK_REC) {
...@@ -4871,7 +4915,7 @@ lock_rec_insert_check_and_lock( ...@@ -4871,7 +4915,7 @@ lock_rec_insert_check_and_lock(
/* Note that we may get DB_SUCCESS also here! */ /* Note that we may get DB_SUCCESS also here! */
err = lock_rec_enqueue_waiting(LOCK_X | LOCK_GAP err = lock_rec_enqueue_waiting(LOCK_X | LOCK_GAP
| LOCK_INSERT_INTENTION, | LOCK_INSERT_INTENTION,
next_rec, index, thr); next_rec, NULL, index, thr);
} else { } else {
err = DB_SUCCESS; err = DB_SUCCESS;
} }
...@@ -4941,10 +4985,23 @@ lock_rec_convert_impl_to_expl( ...@@ -4941,10 +4985,23 @@ lock_rec_convert_impl_to_expl(
if (!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, rec, if (!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, rec,
impl_trx)) { impl_trx)) {
ulint type_mode = (LOCK_REC | LOCK_X
| LOCK_REC_NOT_GAP);
/* If the delete-marked record was locked already,
we should reserve lock waiting for impl_trx as
implicit lock. Because cannot lock at this moment.*/
if (rec_get_deleted_flag(rec, rec_offs_comp(offsets))
&& lock_rec_other_has_conflicting(
LOCK_X | LOCK_REC_NOT_GAP,
rec, impl_trx)) {
type_mode |= (LOCK_WAIT | LOCK_CONV_BY_OTHER);
}
lock_rec_add_to_queue( lock_rec_add_to_queue(
LOCK_REC | LOCK_X | LOCK_REC_NOT_GAP, type_mode, rec, index, impl_trx);
rec, index, impl_trx);
} }
} }
} }
......
...@@ -2195,7 +2195,10 @@ row_ins_index_entry( ...@@ -2195,7 +2195,10 @@ row_ins_index_entry(
err = row_ins_index_entry_low(BTR_MODIFY_LEAF, index, entry, err = row_ins_index_entry_low(BTR_MODIFY_LEAF, index, entry,
ext_vec, n_ext_vec, thr); ext_vec, n_ext_vec, thr);
if (err != DB_FAIL) { if (err != DB_FAIL) {
if (index == dict_table_get_first_index(index->table)
&& thr_get_trx(thr)->mysql_thd != 0) {
DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
}
return(err); return(err);
} }
......
...@@ -48,6 +48,10 @@ Created 10/8/1995 Heikki Tuuri ...@@ -48,6 +48,10 @@ Created 10/8/1995 Heikki Tuuri
#include "srv0start.h" #include "srv0start.h"
#include "row0mysql.h" #include "row0mysql.h"
#include "ha_prototypes.h" #include "ha_prototypes.h"
#include "read0read.h"
#include "m_string.h" /* for my_sys.h */
#include "my_sys.h" /* DEBUG_SYNC_C */
/* This is set to TRUE if the MySQL user has set it in MySQL; currently /* This is set to TRUE if the MySQL user has set it in MySQL; currently
affects only FOREIGN KEY definition parsing */ affects only FOREIGN KEY definition parsing */
...@@ -1435,6 +1439,10 @@ srv_suspend_mysql_thread( ...@@ -1435,6 +1439,10 @@ srv_suspend_mysql_thread(
trx = thr_get_trx(thr); trx = thr_get_trx(thr);
if (trx->mysql_thd != 0) {
DEBUG_SYNC_C("srv_suspend_mysql_thread_enter");
}
os_event_set(srv_lock_timeout_thread_event); os_event_set(srv_lock_timeout_thread_event);
mutex_enter(&kernel_mutex); mutex_enter(&kernel_mutex);
...@@ -1920,6 +1928,16 @@ srv_export_innodb_status(void) ...@@ -1920,6 +1928,16 @@ srv_export_innodb_status(void)
export_vars.innodb_purge_trx_id_age = export_vars.innodb_purge_trx_id_age =
ut_dulint_minus(trx_sys->max_trx_id, purge_sys->done_trx_no); ut_dulint_minus(trx_sys->max_trx_id, purge_sys->done_trx_no);
} }
if (!purge_sys->view
|| ut_dulint_cmp(trx_sys->max_trx_id,
purge_sys->view->up_limit_id) < 0) {
export_vars.innodb_purge_view_trx_id_age = 0;
} else {
export_vars.innodb_purge_view_trx_id_age =
ut_dulint_minus(trx_sys->max_trx_id,
purge_sys->view->up_limit_id);
}
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
mutex_exit(&srv_innodb_monitor_mutex); mutex_exit(&srv_innodb_monitor_mutex);
......
...@@ -34,6 +34,10 @@ trx_purge_t* purge_sys = NULL; ...@@ -34,6 +34,10 @@ trx_purge_t* purge_sys = NULL;
which needs no purge */ which needs no purge */
trx_undo_rec_t trx_purge_dummy_rec; trx_undo_rec_t trx_purge_dummy_rec;
#ifdef UNIV_DEBUG
my_bool srv_purge_view_update_only_debug;
#endif /* UNIV_DEBUG */
/********************************************************************* /*********************************************************************
Checks if trx_id is >= purge_view: then it is guaranteed that its update Checks if trx_id is >= purge_view: then it is guaranteed that its update
undo log still exists in the system. */ undo log still exists in the system. */
...@@ -1079,6 +1083,13 @@ trx_purge(void) ...@@ -1079,6 +1083,13 @@ trx_purge(void)
rw_lock_x_unlock(&(purge_sys->latch)); rw_lock_x_unlock(&(purge_sys->latch));
#ifdef UNIV_DEBUG
if (srv_purge_view_update_only_debug) {
mutex_exit(&(purge_sys->mutex));
return(0);
}
#endif
purge_sys->state = TRX_PURGE_ON; purge_sys->state = TRX_PURGE_ON;
/* Handle at most 20 undo log pages in one purge batch */ /* Handle at most 20 undo log pages in one purge batch */
......
...@@ -580,6 +580,8 @@ static SHOW_VAR innodb_status_variables[]= { ...@@ -580,6 +580,8 @@ static SHOW_VAR innodb_status_variables[]= {
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
{"purge_trx_id_age", {"purge_trx_id_age",
(char*) &export_vars.innodb_purge_trx_id_age, SHOW_LONG}, (char*) &export_vars.innodb_purge_trx_id_age, SHOW_LONG},
{"purge_view_trx_id_age",
(char*) &export_vars.innodb_purge_view_trx_id_age, SHOW_LONG},
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
{NullS, NullS, SHOW_LONG} {NullS, NullS, SHOW_LONG}
}; };
...@@ -11271,6 +11273,13 @@ static MYSQL_SYSVAR_UINT(limit_optimistic_insert_debug, ...@@ -11271,6 +11273,13 @@ static MYSQL_SYSVAR_UINT(limit_optimistic_insert_debug,
btr_cur_limit_optimistic_insert_debug, PLUGIN_VAR_RQCMDARG, btr_cur_limit_optimistic_insert_debug, PLUGIN_VAR_RQCMDARG,
"Artificially limit the number of records per B-tree page (0=unlimited).", "Artificially limit the number of records per B-tree page (0=unlimited).",
NULL, NULL, 0, 0, UINT_MAX32, 0); NULL, NULL, 0, 0, UINT_MAX32, 0);
static MYSQL_SYSVAR_BOOL(trx_purge_view_update_only_debug,
srv_purge_view_update_only_debug, PLUGIN_VAR_NOCMDARG,
"Pause actual purging any delete-marked records, but merely update the purge view. "
"It is to create artificially the situation the purge view have been updated "
"but the each purges were not done yet.",
NULL, NULL, FALSE);
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
static struct st_mysql_sys_var* innobase_system_variables[]= { static struct st_mysql_sys_var* innobase_system_variables[]= {
...@@ -11337,6 +11346,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= { ...@@ -11337,6 +11346,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
MYSQL_SYSVAR(trx_rseg_n_slots_debug), MYSQL_SYSVAR(trx_rseg_n_slots_debug),
MYSQL_SYSVAR(limit_optimistic_insert_debug), MYSQL_SYSVAR(limit_optimistic_insert_debug),
MYSQL_SYSVAR(trx_purge_view_update_only_debug),
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
NULL NULL
}; };
......
...@@ -796,14 +796,22 @@ lock_rec_get_page_no( ...@@ -796,14 +796,22 @@ lock_rec_get_page_no(
remains set when the waiting lock is granted, remains set when the waiting lock is granted,
or if the lock is inherited to a neighboring or if the lock is inherited to a neighboring
record */ record */
#if (LOCK_WAIT|LOCK_GAP|LOCK_REC_NOT_GAP|LOCK_INSERT_INTENTION)&LOCK_MODE_MASK #define LOCK_CONV_BY_OTHER 4096 /*!< this bit is set when the lock is created
by other transaction */
#if (LOCK_WAIT|LOCK_GAP|LOCK_REC_NOT_GAP|LOCK_INSERT_INTENTION|LOCK_CONV_BY_OTHER)&LOCK_MODE_MASK
# error # error
#endif #endif
#if (LOCK_WAIT|LOCK_GAP|LOCK_REC_NOT_GAP|LOCK_INSERT_INTENTION)&LOCK_TYPE_MASK #if (LOCK_WAIT|LOCK_GAP|LOCK_REC_NOT_GAP|LOCK_INSERT_INTENTION|LOCK_CONV_BY_OTHER)&LOCK_TYPE_MASK
# error # error
#endif #endif
/* @} */ /* @} */
/** Checks if this is a waiting lock created by lock->trx itself.
@param type_mode lock->type_mode
@return whether it is a waiting lock belonging to lock->trx */
#define lock_is_wait_not_by_other(type_mode) \
((type_mode & (LOCK_CONV_BY_OTHER | LOCK_WAIT)) == LOCK_WAIT)
/** Lock operation struct */ /** Lock operation struct */
typedef struct lock_op_struct lock_op_t; typedef struct lock_op_struct lock_op_t;
/** Lock operation struct */ /** Lock operation struct */
......
...@@ -247,6 +247,10 @@ extern ulint srv_fatal_semaphore_wait_threshold; ...@@ -247,6 +247,10 @@ extern ulint srv_fatal_semaphore_wait_threshold;
#define SRV_SEMAPHORE_WAIT_EXTENSION 7200 #define SRV_SEMAPHORE_WAIT_EXTENSION 7200
extern ulint srv_dml_needed_delay; extern ulint srv_dml_needed_delay;
#ifdef UNIV_DEBUG
extern my_bool srv_purge_view_update_only_debug;
#endif /* UNIV_DEBUG */
extern mutex_t* kernel_mutex_temp;/* mutex protecting the server, trx structs, extern mutex_t* kernel_mutex_temp;/* mutex protecting the server, trx structs,
query threads, and lock table: we allocate query threads, and lock table: we allocate
it from dynamic memory to get it to the it from dynamic memory to get it to the
...@@ -652,6 +656,8 @@ struct export_var_struct{ ...@@ -652,6 +656,8 @@ struct export_var_struct{
ulint innodb_rows_deleted; /*!< srv_n_rows_deleted */ ulint innodb_rows_deleted; /*!< srv_n_rows_deleted */
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
ulint innodb_purge_trx_id_age; /*!< max_trx_id - purged trx_id */ ulint innodb_purge_trx_id_age; /*!< max_trx_id - purged trx_id */
ulint innodb_purge_view_trx_id_age; /*!< rw_max_trx_id
- purged view's min trx_id */
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
}; };
......
...@@ -791,12 +791,16 @@ lock_reset_lock_and_trx_wait( ...@@ -791,12 +791,16 @@ lock_reset_lock_and_trx_wait(
/*=========================*/ /*=========================*/
lock_t* lock) /*!< in: record lock */ lock_t* lock) /*!< in: record lock */
{ {
ut_ad((lock->trx)->wait_lock == lock);
ut_ad(lock_get_wait(lock)); ut_ad(lock_get_wait(lock));
/* Reset the back pointer in trx to this waiting lock request */ /* Reset the back pointer in trx to this waiting lock request */
(lock->trx)->wait_lock = NULL; if (!(lock->type_mode & LOCK_CONV_BY_OTHER)) {
ut_ad((lock->trx)->wait_lock == lock);
(lock->trx)->wait_lock = NULL;
} else {
ut_ad(lock_get_type_low(lock) == LOCK_REC);
}
lock->type_mode &= ~LOCK_WAIT; lock->type_mode &= ~LOCK_WAIT;
} }
...@@ -1420,9 +1424,9 @@ lock_rec_has_expl( ...@@ -1420,9 +1424,9 @@ lock_rec_has_expl(
while (lock) { while (lock) {
if (lock->trx == trx if (lock->trx == trx
&& !lock_is_wait_not_by_other(lock->type_mode)
&& lock_mode_stronger_or_eq(lock_get_mode(lock), && lock_mode_stronger_or_eq(lock_get_mode(lock),
precise_mode & LOCK_MODE_MASK) precise_mode & LOCK_MODE_MASK)
&& !lock_get_wait(lock)
&& (!lock_rec_get_rec_not_gap(lock) && (!lock_rec_get_rec_not_gap(lock)
|| (precise_mode & LOCK_REC_NOT_GAP) || (precise_mode & LOCK_REC_NOT_GAP)
|| heap_no == PAGE_HEAP_NO_SUPREMUM) || heap_no == PAGE_HEAP_NO_SUPREMUM)
...@@ -1721,7 +1725,7 @@ lock_rec_create( ...@@ -1721,7 +1725,7 @@ lock_rec_create(
HASH_INSERT(lock_t, hash, lock_sys->rec_hash, HASH_INSERT(lock_t, hash, lock_sys->rec_hash,
lock_rec_fold(space, page_no), lock); lock_rec_fold(space, page_no), lock);
if (UNIV_UNLIKELY(type_mode & LOCK_WAIT)) { if (lock_is_wait_not_by_other(type_mode)) {
lock_set_lock_and_trx_wait(lock, trx); lock_set_lock_and_trx_wait(lock, trx);
} }
...@@ -1752,10 +1756,11 @@ lock_rec_enqueue_waiting( ...@@ -1752,10 +1756,11 @@ lock_rec_enqueue_waiting(
const buf_block_t* block, /*!< in: buffer block containing const buf_block_t* block, /*!< in: buffer block containing
the record */ the record */
ulint heap_no,/*!< in: heap number of the record */ ulint heap_no,/*!< in: heap number of the record */
lock_t* lock, /*!< in: lock object; NULL if a new
one should be created. */
dict_index_t* index, /*!< in: index of record */ dict_index_t* index, /*!< in: index of record */
que_thr_t* thr) /*!< in: query thread */ que_thr_t* thr) /*!< in: query thread */
{ {
lock_t* lock;
trx_t* trx; trx_t* trx;
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
...@@ -1789,9 +1794,17 @@ lock_rec_enqueue_waiting( ...@@ -1789,9 +1794,17 @@ lock_rec_enqueue_waiting(
stderr); stderr);
} }
/* Enqueue the lock request that will wait to be granted */ if (lock == NULL) {
lock = lock_rec_create(type_mode | LOCK_WAIT, /* Enqueue the lock request that will wait to be granted */
block, heap_no, index, trx); lock = lock_rec_create(type_mode | LOCK_WAIT,
block, heap_no, index, trx);
} else {
ut_ad(lock->type_mode & LOCK_WAIT);
ut_ad(lock->type_mode & LOCK_CONV_BY_OTHER);
lock->type_mode &= ~LOCK_CONV_BY_OTHER;
lock_set_lock_and_trx_wait(lock, trx);
}
/* Check if a deadlock occurs: if yes, remove the lock request and /* Check if a deadlock occurs: if yes, remove the lock request and
return an error code */ return an error code */
...@@ -2036,6 +2049,7 @@ lock_rec_lock_slow( ...@@ -2036,6 +2049,7 @@ lock_rec_lock_slow(
que_thr_t* thr) /*!< in: query thread */ que_thr_t* thr) /*!< in: query thread */
{ {
trx_t* trx; trx_t* trx;
lock_t* lock;
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad((LOCK_MODE_MASK & mode) != LOCK_S ut_ad((LOCK_MODE_MASK & mode) != LOCK_S
...@@ -2050,7 +2064,27 @@ lock_rec_lock_slow( ...@@ -2050,7 +2064,27 @@ lock_rec_lock_slow(
trx = thr_get_trx(thr); trx = thr_get_trx(thr);
if (lock_rec_has_expl(mode, block, heap_no, trx)) { lock = lock_rec_has_expl(mode, block, heap_no, trx);
if (lock) {
if (lock->type_mode & LOCK_CONV_BY_OTHER) {
/* This lock or lock waiting was created by the other
transaction, not by the transaction (trx) itself.
So, the transaction (trx) should treat it collectly
according as whether granted or not. */
if (lock->type_mode & LOCK_WAIT) {
/* This lock request was not granted yet.
Should wait for granted. */
goto enqueue_waiting;
} else {
/* This lock request was already granted.
Just clearing the flag. */
lock->type_mode &= ~LOCK_CONV_BY_OTHER;
}
}
/* The trx already has a strong enough lock on rec: do /* The trx already has a strong enough lock on rec: do
nothing */ nothing */
...@@ -2060,8 +2094,10 @@ lock_rec_lock_slow( ...@@ -2060,8 +2094,10 @@ lock_rec_lock_slow(
the queue, as this transaction does not have a lock strong the queue, as this transaction does not have a lock strong
enough already granted on the record, we have to wait. */ enough already granted on the record, we have to wait. */
ut_ad(lock == NULL);
enqueue_waiting:
return(lock_rec_enqueue_waiting(mode, block, heap_no, return(lock_rec_enqueue_waiting(mode, block, heap_no,
index, thr)); lock, index, thr));
} else if (!impl) { } else if (!impl) {
/* Set the requested lock on the record */ /* Set the requested lock on the record */
...@@ -2203,7 +2239,8 @@ lock_grant( ...@@ -2203,7 +2239,8 @@ lock_grant(
TRX_QUE_LOCK_WAIT state, and there is no need to end the lock wait TRX_QUE_LOCK_WAIT state, and there is no need to end the lock wait
for it */ for it */
if (lock->trx->que_state == TRX_QUE_LOCK_WAIT) { if (!(lock->type_mode & LOCK_CONV_BY_OTHER)
&& lock->trx->que_state == TRX_QUE_LOCK_WAIT) {
trx_end_lock_wait(lock->trx); trx_end_lock_wait(lock->trx);
} }
} }
...@@ -2220,6 +2257,7 @@ lock_rec_cancel( ...@@ -2220,6 +2257,7 @@ lock_rec_cancel(
{ {
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad(lock_get_type_low(lock) == LOCK_REC); ut_ad(lock_get_type_low(lock) == LOCK_REC);
ut_ad(!(lock->type_mode & LOCK_CONV_BY_OTHER));
/* Reset the bit (there can be only one set bit) in the lock bitmap */ /* Reset the bit (there can be only one set bit) in the lock bitmap */
lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock)); lock_rec_reset_nth_bit(lock, lock_rec_find_set_bit(lock));
...@@ -2362,8 +2400,12 @@ lock_rec_reset_and_release_wait( ...@@ -2362,8 +2400,12 @@ lock_rec_reset_and_release_wait(
lock = lock_rec_get_first(block, heap_no); lock = lock_rec_get_first(block, heap_no);
while (lock != NULL) { while (lock != NULL) {
if (lock_get_wait(lock)) { if (lock_is_wait_not_by_other(lock->type_mode)) {
lock_rec_cancel(lock); lock_rec_cancel(lock);
} else if (lock_get_wait(lock)) {
/* just reset LOCK_WAIT */
lock_rec_reset_nth_bit(lock, heap_no);
lock_reset_lock_and_trx_wait(lock);
} else { } else {
lock_rec_reset_nth_bit(lock, heap_no); lock_rec_reset_nth_bit(lock, heap_no);
} }
...@@ -3588,6 +3630,7 @@ lock_table_create( ...@@ -3588,6 +3630,7 @@ lock_table_create(
ut_ad(table && trx); ut_ad(table && trx);
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad(!(type_mode & LOCK_CONV_BY_OTHER));
if ((type_mode & LOCK_MODE_MASK) == LOCK_AUTO_INC) { if ((type_mode & LOCK_MODE_MASK) == LOCK_AUTO_INC) {
++table->n_waiting_or_granted_auto_inc_locks; ++table->n_waiting_or_granted_auto_inc_locks;
...@@ -4139,6 +4182,7 @@ lock_cancel_waiting_and_release( ...@@ -4139,6 +4182,7 @@ lock_cancel_waiting_and_release(
lock_t* lock) /*!< in: waiting lock request */ lock_t* lock) /*!< in: waiting lock request */
{ {
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad(!(lock->type_mode & LOCK_CONV_BY_OTHER));
if (lock_get_type_low(lock) == LOCK_REC) { if (lock_get_type_low(lock) == LOCK_REC) {
...@@ -5153,7 +5197,7 @@ lock_rec_insert_check_and_lock( ...@@ -5153,7 +5197,7 @@ lock_rec_insert_check_and_lock(
err = lock_rec_enqueue_waiting(LOCK_X | LOCK_GAP err = lock_rec_enqueue_waiting(LOCK_X | LOCK_GAP
| LOCK_INSERT_INTENTION, | LOCK_INSERT_INTENTION,
block, next_rec_heap_no, block, next_rec_heap_no,
index, thr); NULL, index, thr);
} else { } else {
err = DB_SUCCESS; err = DB_SUCCESS;
} }
...@@ -5229,10 +5273,23 @@ lock_rec_convert_impl_to_expl( ...@@ -5229,10 +5273,23 @@ lock_rec_convert_impl_to_expl(
if (!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, block, if (!lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, block,
heap_no, impl_trx)) { heap_no, impl_trx)) {
ulint type_mode = (LOCK_REC | LOCK_X
| LOCK_REC_NOT_GAP);
/* If the delete-marked record was locked already,
we should reserve lock waiting for impl_trx as
implicit lock. Because cannot lock at this moment.*/
if (rec_get_deleted_flag(rec, rec_offs_comp(offsets))
&& lock_rec_other_has_conflicting(
LOCK_X | LOCK_REC_NOT_GAP, block,
heap_no, impl_trx)) {
type_mode |= (LOCK_WAIT | LOCK_CONV_BY_OTHER);
}
lock_rec_add_to_queue( lock_rec_add_to_queue(
LOCK_REC | LOCK_X | LOCK_REC_NOT_GAP, type_mode, block, heap_no, index, impl_trx);
block, heap_no, index, impl_trx);
} }
} }
} }
......
...@@ -2265,7 +2265,10 @@ row_ins_index_entry( ...@@ -2265,7 +2265,10 @@ row_ins_index_entry(
err = row_ins_index_entry_low(BTR_MODIFY_LEAF, index, entry, err = row_ins_index_entry_low(BTR_MODIFY_LEAF, index, entry,
n_ext, thr); n_ext, thr);
if (err != DB_FAIL) { if (err != DB_FAIL) {
if (index == dict_table_get_first_index(index->table)
&& thr_get_trx(thr)->mysql_thd != 0) {
DEBUG_SYNC_C("row_ins_clust_index_entry_leaf_after");
}
return(err); return(err);
} }
......
...@@ -85,6 +85,7 @@ Created 10/8/1995 Heikki Tuuri ...@@ -85,6 +85,7 @@ Created 10/8/1995 Heikki Tuuri
#include "ha_prototypes.h" #include "ha_prototypes.h"
#include "trx0i_s.h" #include "trx0i_s.h"
#include "os0sync.h" /* for HAVE_ATOMIC_BUILTINS */ #include "os0sync.h" /* for HAVE_ATOMIC_BUILTINS */
#include "read0read.h"
#ifdef __WIN__ #ifdef __WIN__
/* error LNK2001: unresolved external symbol _debug_sync_C_callback_ptr */ /* error LNK2001: unresolved external symbol _debug_sync_C_callback_ptr */
...@@ -1983,6 +1984,16 @@ srv_export_innodb_status(void) ...@@ -1983,6 +1984,16 @@ srv_export_innodb_status(void)
export_vars.innodb_purge_trx_id_age = export_vars.innodb_purge_trx_id_age =
ut_dulint_minus(trx_sys->max_trx_id, purge_sys->done_trx_no); ut_dulint_minus(trx_sys->max_trx_id, purge_sys->done_trx_no);
} }
if (!purge_sys->view
|| ut_dulint_cmp(trx_sys->max_trx_id,
purge_sys->view->up_limit_id) < 0) {
export_vars.innodb_purge_view_trx_id_age = 0;
} else {
export_vars.innodb_purge_view_trx_id_age =
ut_dulint_minus(trx_sys->max_trx_id,
purge_sys->view->up_limit_id);
}
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
mutex_exit(&srv_innodb_monitor_mutex); mutex_exit(&srv_innodb_monitor_mutex);
......
...@@ -51,6 +51,10 @@ UNIV_INTERN trx_purge_t* purge_sys = NULL; ...@@ -51,6 +51,10 @@ UNIV_INTERN trx_purge_t* purge_sys = NULL;
which needs no purge */ which needs no purge */
UNIV_INTERN trx_undo_rec_t trx_purge_dummy_rec; UNIV_INTERN trx_undo_rec_t trx_purge_dummy_rec;
#ifdef UNIV_DEBUG
UNIV_INTERN my_bool srv_purge_view_update_only_debug;
#endif /* UNIV_DEBUG */
/*****************************************************************//** /*****************************************************************//**
Checks if trx_id is >= purge_view: then it is guaranteed that its update Checks if trx_id is >= purge_view: then it is guaranteed that its update
undo log still exists in the system. undo log still exists in the system.
...@@ -1142,6 +1146,13 @@ trx_purge(void) ...@@ -1142,6 +1146,13 @@ trx_purge(void)
rw_lock_x_unlock(&(purge_sys->latch)); rw_lock_x_unlock(&(purge_sys->latch));
#ifdef UNIV_DEBUG
if (srv_purge_view_update_only_debug) {
mutex_exit(&(purge_sys->mutex));
return(0);
}
#endif
purge_sys->state = TRX_PURGE_ON; purge_sys->state = TRX_PURGE_ON;
/* Handle at most 20 undo log pages in one purge batch */ /* Handle at most 20 undo log pages in one purge batch */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment