Commit 45f6262f authored by Kristian Nielsen

MDEV-5262, MDEV-5914, MDEV-5941, MDEV-6020: Deadlocks during parallel replication causing replication to fail.

After-review changes. Fix InnoDB coding style issues.
parent ba4e56d8
@@ -4218,21 +4218,23 @@ innobase_kill_query(
 	trx = thd_to_trx(thd);
-	if (trx)
-	{
-		THD *cur = current_thd;
-		THD *owner = trx->current_lock_mutex_owner;
-		/* Cancel a pending lock request. */
-		if (owner != cur)
-			lock_mutex_enter();
-		trx_mutex_enter(trx);
-		if (trx->lock.wait_lock)
-			lock_cancel_waiting_and_release(trx->lock.wait_lock);
-		trx_mutex_exit(trx);
-		if (owner != cur)
-			lock_mutex_exit();
-	}
+	if (trx) {
+		THD *cur = current_thd;
+		THD *owner = trx->current_lock_mutex_owner;
+		/* Cancel a pending lock request. */
+		if (owner != cur) {
+			lock_mutex_enter();
+		}
+		trx_mutex_enter(trx);
+		if (trx->lock.wait_lock) {
+			lock_cancel_waiting_and_release(trx->lock.wait_lock);
+		}
+		trx_mutex_exit(trx);
+		if (owner != cur) {
+			lock_mutex_exit();
+		}
+	}
 	DBUG_VOID_RETURN;
 }
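A note on the owner check reformatted above: as the comment in lock_report_waiters_to_mysql() further down explains, thd_report_wait_for() runs while this thread already holds lock_sys->mutex, and if it decides to kill the blocking transaction it calls straight back into innobase_kill_query() on the same thread. The owning THD is recorded in trx->current_lock_mutex_owner so the kill path can skip re-acquiring the non-recursive mutex. A minimal compilable sketch of that guard, using pthread stand-ins rather than the real lock_sys objects:

#include <pthread.h>
#include <stdio.h>

/* Hypothetical stand-ins: lock_sys_mutex plays the role of
   lock_sys->mutex, current_lock_mutex_owner that of
   trx->current_lock_mutex_owner, and a plain void* identifies a THD. */
static pthread_mutex_t lock_sys_mutex = PTHREAD_MUTEX_INITIALIZER;
static void *current_lock_mutex_owner;

/* Analogue of innobase_kill_query(): may run on an arbitrary thread,
   or re-entrantly on the thread that already holds lock_sys_mutex. */
static void kill_query(void *self)
{
	/* Re-locking a non-recursive mutex we already hold would
	   self-deadlock, so only lock when some other thread owns it. */
	int need_lock = (current_lock_mutex_owner != self);

	if (need_lock) {
		pthread_mutex_lock(&lock_sys_mutex);
	}
	puts("cancel pending lock request");
	if (need_lock) {
		pthread_mutex_unlock(&lock_sys_mutex);
	}
}

/* Analogue of the wait-reporting path: holds the mutex, records itself
   as owner, and may call back into kill_query() on this same thread. */
static void report_wait_and_maybe_kill(void *self)
{
	pthread_mutex_lock(&lock_sys_mutex);
	current_lock_mutex_owner = self;

	kill_query(self);	/* upper layer chose a victim: callback */

	current_lock_mutex_owner = NULL;
	pthread_mutex_unlock(&lock_sys_mutex);
}

int main(void)
{
	int thd;	/* dummy THD identity */

	report_wait_and_maybe_kill(&thd);
	return 0;
}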
@@ -399,9 +399,9 @@ UNIV_INTERN ibool lock_print_waits = FALSE;
 /* Buffer to collect THDs to report waits for. */
 struct thd_wait_reports {
-	struct thd_wait_reports *next;
-	ulint used;
-	trx_t *waitees[64];
+	struct thd_wait_reports *next;	/*!< List link */
+	ulint used;			/*!< How many elements in waitees[] */
+	trx_t *waitees[64];		/*!< Trxs for thd_report_wait_for() */
 };
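The struct collects waitees in fixed chunks of 64 and chains further heap chunks only on overflow, so the common case needs no allocation at all. A self-contained sketch of the append-and-cleanup logic with simplified types (malloc standing in for mem_alloc):

#include <stdlib.h>

typedef struct trx trx_t;	/* opaque, for illustration only */

struct thd_wait_reports {
	struct thd_wait_reports *next;	/* list link */
	unsigned long used;		/* elements used in waitees[] */
	trx_t *waitees[64];		/* one chunk of waitees */
};

/* Append one waitee to the chain, allocating a fresh chunk only when
   the current tail is full.  Returns 0 on allocation failure; the
   caller in lock_deadlock_search() reacts to that by flagging the
   search as too deep. */
static int waitee_append(struct thd_wait_reports **tail, trx_t *trx)
{
	struct thd_wait_reports *p = *tail;

	if (p->used == sizeof(p->waitees) / sizeof(p->waitees[0])) {
		p->next = malloc(sizeof(*p));
		if (!p->next) {
			return 0;
		}
		p = p->next;
		p->next = NULL;
		p->used = 0;
		*tail = p;
	}
	p->waitees[p->used++] = trx;
	return 1;
}

int main(void)
{
	/* The first chunk lives on the stack, mirroring waitee_buf in
	   lock_deadlock_check_and_resolve(); only overflow chunks come
	   from the heap, and only they are freed afterwards. */
	struct thd_wait_reports head = { NULL, 0, { NULL } };
	struct thd_wait_reports *tail = &head;
	struct thd_wait_reports *p;
	int i;

	for (i = 0; i < 100; i++) {
		waitee_append(&tail, (trx_t *)0);
	}
	for (p = head.next; p; ) {
		struct thd_wait_reports *q = p->next;
		free(p);
		p = q;
	}
	return 0;
}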
@@ -3915,55 +3915,58 @@ lock_deadlock_search(
 		return(ctx->start->id);
 	} else {
-		/* We do not need to report autoinc locks to the upper
-		layer. These locks are released before commit, so they can
-		not cause deadlocks with binlog-fixed commit order. */
-		if (waitee_ptr && (lock_get_type_low(lock) != LOCK_TABLE ||
-				   lock_get_mode(lock) != LOCK_AUTO_INC)) {
-			if (waitee_ptr->used == sizeof(waitee_ptr->waitees)/
-						sizeof(waitee_ptr->waitees[0])) {
-				waitee_ptr->next =
-					(struct thd_wait_reports *)
-					mem_alloc(sizeof(*waitee_ptr));
-				waitee_ptr = waitee_ptr->next;
-				if (!waitee_ptr) {
-					ctx->too_deep = TRUE;
-					return(ctx->start->id);
-				}
-				waitee_ptr->next = NULL;
-				waitee_ptr->used = 0;
-			}
-			waitee_ptr->waitees[waitee_ptr->used++] = lock->trx;
-		}
-		if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
-			/* Another trx ahead has requested a lock in an
-			incompatible mode, and is itself waiting for a lock. */
-			++ctx->cost;
-			/* Save current search state. */
-			if (!lock_deadlock_push(ctx, lock, heap_no)) {
-				/* Unable to save current search state, stack
-				size not big enough. */
-				ctx->too_deep = TRUE;
-				return(ctx->start->id);
-			}
-			ctx->wait_lock = lock->trx->lock.wait_lock;
-			lock = lock_get_first_lock(ctx, &heap_no);
-			if (lock->trx->lock.deadlock_mark > ctx->mark_start) {
-				lock = lock_get_next_lock(ctx, lock, heap_no);
-			}
-		} else {
-			lock = lock_get_next_lock(ctx, lock, heap_no);
-		}
+		/* We do not need to report autoinc locks to the upper
+		layer. These locks are released before commit, so they
+		can not cause deadlocks with binlog-fixed commit
+		order. */
+		if (waitee_ptr &&
+		    (lock_get_type_low(lock) != LOCK_TABLE ||
+		     lock_get_mode(lock) != LOCK_AUTO_INC)) {
+			if (waitee_ptr->used ==
+			    sizeof(waitee_ptr->waitees) /
+			    sizeof(waitee_ptr->waitees[0])) {
+				waitee_ptr->next =
+					(struct thd_wait_reports *)
+					mem_alloc(sizeof(*waitee_ptr));
+				waitee_ptr = waitee_ptr->next;
+				if (!waitee_ptr) {
+					ctx->too_deep = TRUE;
+					return(ctx->start->id);
+				}
+				waitee_ptr->next = NULL;
+				waitee_ptr->used = 0;
+			}
+			waitee_ptr->waitees[waitee_ptr->used++] = lock->trx;
+		}
+		if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
+			/* Another trx ahead has requested a lock in an
+			incompatible mode, and is itself waiting for a lock. */
+			++ctx->cost;
+			/* Save current search state. */
+			if (!lock_deadlock_push(ctx, lock, heap_no)) {
+				/* Unable to save current search state, stack
+				size not big enough. */
+				ctx->too_deep = TRUE;
+				return(ctx->start->id);
+			}
+			ctx->wait_lock = lock->trx->lock.wait_lock;
+			lock = lock_get_first_lock(ctx, &heap_no);
+			if (lock->trx->lock.deadlock_mark > ctx->mark_start) {
+				lock = lock_get_next_lock(ctx, lock, heap_no);
+			}
+		} else {
+			lock = lock_get_next_lock(ctx, lock, heap_no);
+		}
 	}
 }
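The deadlock_mark > ctx->mark_start test reads like the usual generation-counter trick for visited marks (assumed here from the visible lines; the counter itself lives outside this hunk): each search records the current value of a global counter as its mark_start, marking a trx stores a fresh counter value into it, and a trx counts as visited only if its mark is newer than the search's start. No per-trx flag ever needs clearing between searches. A toy illustration:

#include <stdio.h>

/* Global counter, bumped on every mark; each search remembers where
   it started.  Mirrors the assumed roles of the mark counter,
   trx->lock.deadlock_mark and ctx->mark_start. */
static unsigned long long mark_counter;
static unsigned long long node_mark[8];

static void mark_visited(int node)
{
	node_mark[node] = ++mark_counter;
}

static int is_visited(int node, unsigned long long mark_start)
{
	/* Marks older than the start of this search do not count, so
	   earlier searches are "erased" without touching any node. */
	return node_mark[node] > mark_start;
}

int main(void)
{
	unsigned long long mark_start = mark_counter;	/* search 1 */

	mark_visited(1);
	mark_visited(2);
	printf("search 1 sees node 2: %d\n", is_visited(2, mark_start));
	printf("search 1 sees node 3: %d\n", is_visited(3, mark_start));

	mark_start = mark_counter;			/* search 2 */
	printf("search 2 sees node 2: %d\n", is_visited(2, mark_start));
	return 0;
}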
@@ -4028,22 +4031,28 @@ lock_deadlock_trx_rollback(
 	trx_mutex_exit(trx);
 }
-static void
-mysql_report_waiters(struct thd_wait_reports *waitee_buf_ptr,
-		     THD *mysql_thd,
-		     trx_id_t victim_trx_id)
+static
+void
+lock_report_waiters_to_mysql(
+/*=======================*/
+	struct thd_wait_reports*	waitee_buf_ptr,	/*!< in: set of trxs */
+	THD*				mysql_thd,	/*!< in: THD */
+	trx_id_t			victim_trx_id)	/*!< in: Trx selected
+							as deadlock victim, if
+							any */
 {
-	struct thd_wait_reports *p = waitee_buf_ptr;
-	while (p) {
-		struct thd_wait_reports *q;
-		ulint i = 0;
+	struct thd_wait_reports*	p;
+	struct thd_wait_reports*	q;
+	ulint				i;
+	p = waitee_buf_ptr;
+	while (p) {
+		i = 0;
 		while (i < p->used) {
 			trx_t *w_trx = p->waitees[i];
 			/* There is no need to report waits to a trx already
 			selected as a victim. */
-			if (w_trx->id != victim_trx_id)
-			{
+			if (w_trx->id != victim_trx_id) {
 				/* If thd_report_wait_for() decides to kill the
 				transaction, then we will get a call back into
 				innobase_kill_query. We mark this by setting
@@ -4056,8 +4065,9 @@ mysql_report_waiters(struct thd_wait_reports *waitee_buf_ptr,
 			++i;
 		}
 		q = p->next;
-		if (p != waitee_buf_ptr)
+		if (p != waitee_buf_ptr) {
 			mem_free(p);
+		}
 		p = q;
 	}
 }
@@ -4078,9 +4088,10 @@ lock_deadlock_check_and_resolve(
 	const lock_t*	lock,	/*!< in: lock the transaction is requesting */
 	const trx_t*	trx)	/*!< in: transaction */
 {
-	trx_id_t	victim_trx_id;
-	struct thd_wait_reports	waitee_buf, *waitee_buf_ptr;
-	THD*		start_mysql_thd;
+	trx_id_t		victim_trx_id;
+	struct thd_wait_reports	waitee_buf;
+	struct thd_wait_reports*waitee_buf_ptr;
+	THD*			start_mysql_thd;
 	ut_ad(trx != NULL);
 	ut_ad(lock != NULL);
@@ -4088,10 +4099,11 @@ lock_deadlock_check_and_resolve(
 	assert_trx_in_list(trx);
 	start_mysql_thd = trx->mysql_thd;
-	if (start_mysql_thd && thd_need_wait_for(start_mysql_thd))
+	if (start_mysql_thd && thd_need_wait_for(start_mysql_thd)) {
 		waitee_buf_ptr = &waitee_buf;
-	else
+	} else {
 		waitee_buf_ptr = NULL;
+	}
 	/* Try and resolve as many deadlocks as possible. */
 	do {
@@ -4113,9 +4125,11 @@ lock_deadlock_check_and_resolve(
 		victim_trx_id = lock_deadlock_search(&ctx, waitee_buf_ptr);
 		/* Report waits to upper layer, as needed. */
-		if (waitee_buf_ptr)
-			mysql_report_waiters(waitee_buf_ptr, start_mysql_thd,
-					     victim_trx_id);
+		if (waitee_buf_ptr) {
+			lock_report_waiters_to_mysql(waitee_buf_ptr,
+						     start_mysql_thd,
+						     victim_trx_id);
+		}
 		/* Search too deep, we rollback the joining transaction. */
 		if (ctx.too_deep) {
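One detail worth noting in this hunk: waitee_buf lives on the caller's stack, and waitee_buf_ptr stays NULL whenever thd_need_wait_for() says the upper layer does not want wait reports, so ordinary connections pay nothing for the reporting machinery. A small sketch of that opt-in shape, with hypothetical names:

#include <stddef.h>

struct wait_report_buf {
	unsigned long used;
};

/* The search collects waits only when the caller supplied a buffer. */
static void deadlock_search(struct wait_report_buf *waitee_buf_ptr)
{
	if (waitee_buf_ptr) {
		waitee_buf_ptr->used++;
	}
}

static void check_and_resolve(int need_wait_reports)
{
	struct wait_report_buf waitee_buf = { 0 };
	struct wait_report_buf *waitee_buf_ptr;

	/* Mirrors thd_need_wait_for() deciding whether reporting is on. */
	waitee_buf_ptr = need_wait_reports ? &waitee_buf : NULL;

	deadlock_search(waitee_buf_ptr);
}

int main(void)
{
	check_and_resolve(0);	/* normal client connection: no reports */
	check_and_resolve(1);	/* parallel replication worker: reports on */
	return 0;
}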
@@ -1921,10 +1921,11 @@ trx_weight_ge(
 	/* First ask the upper server layer if it has any preference for which
 	to prefer as a deadlock victim. */
 	pref= thd_deadlock_victim_preference(a->mysql_thd, b->mysql_thd);
-	if (pref < 0)
+	if (pref < 0) {
 		return FALSE;
-	else if (pref > 0)
+	} else if (pref > 0) {
 		return TRUE;
+	}
 	/* Upper server layer had no preference, we fall back to comparing the
 	number of altered/locked rows. */
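The contract assumed here for thd_deadlock_victim_preference(): a negative value asks that a be treated as lighter (the likelier deadlock victim, given the doc comment below that heavier transactions are chosen as victims more reluctantly), a positive value does the same for b, and zero defers to InnoDB's row-count fallback. A compilable sketch of this override-then-fallback shape, with made-up names and a toy preference modeling parallel replication's rule that the transaction earlier in commit order must not be rolled back:

#include <stdbool.h>

/* Hypothetical stand-in for thd_deadlock_victim_preference(): < 0
   prefers a as victim, > 0 prefers b, 0 means no preference.  A
   smaller sequence number stands for "earlier in commit order". */
static int victim_preference(long a_seq, long b_seq)
{
	return (int)(b_seq - a_seq);	/* earlier trx must not be the victim */
}

/* Same shape as trx_weight_ge(): true when a weighs at least as much
   as b, i.e. we are more reluctant to pick a as the deadlock victim. */
static bool weight_ge(long a_seq, long a_rows, long b_seq, long b_rows)
{
	int pref = victim_preference(a_seq, b_seq);

	if (pref < 0) {
		return false;
	} else if (pref > 0) {
		return true;
	}
	/* No preference from the upper layer: fall back to comparing
	   the number of altered/locked rows. */
	return a_rows >= b_rows;
}

int main(void)
{
	/* The preference overrides row counts: the earlier transaction
	   (seq 1) is heavier even though it touched fewer rows. */
	return weight_ge(1, 10, 2, 500) ? 0 : 1;
}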
@@ -4704,13 +4704,13 @@ innobase_kill_connection(
 	trx = thd_to_trx(thd);
-	if (trx)
-	{
+	if (trx) {
 		THD *cur = current_thd;
 		THD *owner = trx->current_lock_mutex_owner;
-		if (owner != cur)
+		if (owner != cur) {
 			lock_mutex_enter();
+		}
 		trx_mutex_enter(trx);
 		/* Cancel a pending lock request. */
@@ -4718,8 +4718,9 @@ innobase_kill_connection(
 			lock_cancel_waiting_and_release(trx->lock.wait_lock);
 		trx_mutex_exit(trx);
-		if (owner != cur)
+		if (owner != cur) {
 			lock_mutex_exit();
+		}
 	}
 	DBUG_VOID_RETURN;
@@ -399,9 +399,9 @@ UNIV_INTERN ibool lock_print_waits = FALSE;
 /* Buffer to collect THDs to report waits for. */
 struct thd_wait_reports {
-	struct thd_wait_reports *next;
-	ulint used;
-	trx_t *waitees[64];
+	struct thd_wait_reports *next;	/*!< List link */
+	ulint used;			/*!< How many elements in waitees[] */
+	trx_t *waitees[64];		/*!< Trxs for thd_report_wait_for() */
 };
@@ -3938,55 +3938,58 @@ lock_deadlock_search(
 		return(ctx->start->id);
 	} else {
-		/* We do not need to report autoinc locks to the upper
-		layer. These locks are released before commit, so they can
-		not cause deadlocks with binlog-fixed commit order. */
-		if (waitee_ptr && (lock_get_type_low(lock) != LOCK_TABLE ||
-				   lock_get_mode(lock) != LOCK_AUTO_INC)) {
-			if (waitee_ptr->used == sizeof(waitee_ptr->waitees)/
-						sizeof(waitee_ptr->waitees[0])) {
-				waitee_ptr->next =
-					(struct thd_wait_reports *)
-					mem_alloc(sizeof(*waitee_ptr));
-				waitee_ptr = waitee_ptr->next;
-				if (!waitee_ptr) {
-					ctx->too_deep = TRUE;
-					return(ctx->start->id);
-				}
-				waitee_ptr->next = NULL;
-				waitee_ptr->used = 0;
-			}
-			waitee_ptr->waitees[waitee_ptr->used++] = lock->trx;
-		}
-		if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
-			/* Another trx ahead has requested a lock in an
-			incompatible mode, and is itself waiting for a lock. */
-			++ctx->cost;
-			/* Save current search state. */
-			if (!lock_deadlock_push(ctx, lock, heap_no)) {
-				/* Unable to save current search state, stack
-				size not big enough. */
-				ctx->too_deep = TRUE;
-				return(ctx->start->id);
-			}
-			ctx->wait_lock = lock->trx->lock.wait_lock;
-			lock = lock_get_first_lock(ctx, &heap_no);
-			if (lock->trx->lock.deadlock_mark > ctx->mark_start) {
-				lock = lock_get_next_lock(ctx, lock, heap_no);
-			}
-		} else {
-			lock = lock_get_next_lock(ctx, lock, heap_no);
-		}
+		/* We do not need to report autoinc locks to the upper
+		layer. These locks are released before commit, so they
+		can not cause deadlocks with binlog-fixed commit
+		order. */
+		if (waitee_ptr &&
+		    (lock_get_type_low(lock) != LOCK_TABLE ||
+		     lock_get_mode(lock) != LOCK_AUTO_INC)) {
+			if (waitee_ptr->used ==
+			    sizeof(waitee_ptr->waitees) /
+			    sizeof(waitee_ptr->waitees[0])) {
+				waitee_ptr->next =
+					(struct thd_wait_reports *)
+					mem_alloc(sizeof(*waitee_ptr));
+				waitee_ptr = waitee_ptr->next;
+				if (!waitee_ptr) {
+					ctx->too_deep = TRUE;
+					return(ctx->start->id);
+				}
+				waitee_ptr->next = NULL;
+				waitee_ptr->used = 0;
+			}
+			waitee_ptr->waitees[waitee_ptr->used++] = lock->trx;
+		}
+		if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
+			/* Another trx ahead has requested a lock in an
+			incompatible mode, and is itself waiting for a lock. */
+			++ctx->cost;
+			/* Save current search state. */
+			if (!lock_deadlock_push(ctx, lock, heap_no)) {
+				/* Unable to save current search state, stack
+				size not big enough. */
+				ctx->too_deep = TRUE;
+				return(ctx->start->id);
+			}
+			ctx->wait_lock = lock->trx->lock.wait_lock;
+			lock = lock_get_first_lock(ctx, &heap_no);
+			if (lock->trx->lock.deadlock_mark > ctx->mark_start) {
+				lock = lock_get_next_lock(ctx, lock, heap_no);
+			}
+		} else {
+			lock = lock_get_next_lock(ctx, lock, heap_no);
+		}
 	}
 }
@@ -4051,22 +4054,28 @@ lock_deadlock_trx_rollback(
 	trx_mutex_exit(trx);
 }
-static void
-mysql_report_waiters(struct thd_wait_reports *waitee_buf_ptr,
-		     THD *mysql_thd,
-		     trx_id_t victim_trx_id)
+static
+void
+lock_report_waiters_to_mysql(
+/*=======================*/
+	struct thd_wait_reports*	waitee_buf_ptr,	/*!< in: set of trxs */
+	THD*				mysql_thd,	/*!< in: THD */
+	trx_id_t			victim_trx_id)	/*!< in: Trx selected
+							as deadlock victim, if
+							any */
 {
-	struct thd_wait_reports *p = waitee_buf_ptr;
-	while (p) {
-		struct thd_wait_reports *q;
-		ulint i = 0;
+	struct thd_wait_reports*	p;
+	struct thd_wait_reports*	q;
+	ulint				i;
+	p = waitee_buf_ptr;
+	while (p) {
+		i = 0;
 		while (i < p->used) {
 			trx_t *w_trx = p->waitees[i];
 			/* There is no need to report waits to a trx already
 			selected as a victim. */
-			if (w_trx->id != victim_trx_id)
-			{
+			if (w_trx->id != victim_trx_id) {
 				/* If thd_report_wait_for() decides to kill the
 				transaction, then we will get a call back into
 				innobase_kill_query. We mark this by setting
@@ -4079,8 +4088,9 @@ mysql_report_waiters(struct thd_wait_reports *waitee_buf_ptr,
 			++i;
 		}
 		q = p->next;
-		if (p != waitee_buf_ptr)
+		if (p != waitee_buf_ptr) {
 			mem_free(p);
+		}
 		p = q;
 	}
 }
@@ -4101,9 +4111,10 @@ lock_deadlock_check_and_resolve(
 	const lock_t*	lock,	/*!< in: lock the transaction is requesting */
 	const trx_t*	trx)	/*!< in: transaction */
 {
-	trx_id_t	victim_trx_id;
-	struct thd_wait_reports	waitee_buf, *waitee_buf_ptr;
-	THD*		start_mysql_thd;
+	trx_id_t		victim_trx_id;
+	struct thd_wait_reports	waitee_buf;
+	struct thd_wait_reports*waitee_buf_ptr;
+	THD*			start_mysql_thd;
 	ut_ad(trx != NULL);
 	ut_ad(lock != NULL);
@@ -4111,10 +4122,11 @@ lock_deadlock_check_and_resolve(
 	assert_trx_in_list(trx);
 	start_mysql_thd = trx->mysql_thd;
-	if (start_mysql_thd && thd_need_wait_for(start_mysql_thd))
+	if (start_mysql_thd && thd_need_wait_for(start_mysql_thd)) {
 		waitee_buf_ptr = &waitee_buf;
-	else
+	} else {
 		waitee_buf_ptr = NULL;
+	}
 	/* Try and resolve as many deadlocks as possible. */
 	do {
@@ -4136,9 +4148,11 @@ lock_deadlock_check_and_resolve(
 		victim_trx_id = lock_deadlock_search(&ctx, waitee_buf_ptr);
 		/* Report waits to upper layer, as needed. */
-		if (waitee_buf_ptr)
-			mysql_report_waiters(waitee_buf_ptr, start_mysql_thd,
-					     victim_trx_id);
+		if (waitee_buf_ptr) {
+			lock_report_waiters_to_mysql(waitee_buf_ptr,
+						     start_mysql_thd,
+						     victim_trx_id);
+		}
 		/* Search too deep, we rollback the joining transaction. */
 		if (ctx.too_deep) {
@@ -2142,9 +2142,8 @@ trx_assert_started(
 #endif /* UNIV_DEBUG */
 /*******************************************************************//**
-Compares the "weight" (or size) of two transactions. Transactions that
-have edited non-transactional tables are considered heavier than ones
-that have not.
+Compares the "weight" (or size) of two transactions. The heavier the weight,
+the more reluctant we will be to choose the transaction as a deadlock victim.
 @return TRUE if weight(a) >= weight(b) */
 UNIV_INTERN
 ibool
@@ -2158,10 +2157,11 @@ trx_weight_ge(
 	/* First ask the upper server layer if it has any preference for which
 	to prefer as a deadlock victim. */
 	pref= thd_deadlock_victim_preference(a->mysql_thd, b->mysql_thd);
-	if (pref < 0)
+	if (pref < 0) {
 		return FALSE;
-	else if (pref > 0)
+	} else if (pref > 0) {
 		return TRUE;
+	}
 	/* Upper server layer had no preference, we fall back to comparing the
 	number of altered/locked rows. */