From 01bb13a34f0066678884d0e0f37b33517f7b256a Mon Sep 17 00:00:00 2001
From: sunny <>
Date: Fri, 29 Jan 2010 21:42:49 +0000
Subject: [PATCH] branches/zip: Two changes to fix the problem:

1. First scan the joining transaction's locks and check if no other
transaction is waiting for a lock held by the joining transaction.
If no other transaction is waiting then  no deadlock an occur and
we avoid doing an exhaustive search.

2. Change the direction of the lock traversal from backward to forward.
Previously we traversed backward from the lock that has to wait, the function
to that fetched the previous node was very inefficient resulting in O(n^2)
access to the rec lock list.

Fix Bug #49047 InnoDB deadlock detection is CPU intensive with many locks on a single row.

rb://218
---
 include/trx0trx.h |   8 ++-
 lock/lock0lock.c  | 130 ++++++++++++++++++++++++++++++++++++++++------
 2 files changed, 116 insertions(+), 22 deletions(-)

diff --git a/include/trx0trx.h b/include/trx0trx.h
index 5f2c1246f37..e2ad1f2f722 100644
--- a/include/trx0trx.h
+++ b/include/trx0trx.h
@@ -526,6 +526,9 @@ struct trx_struct{
 					/* 0, RW_S_LATCH, or RW_X_LATCH:
 					the latch mode trx currently holds
 					on dict_operation_lock */
+	unsigned	deadlock_mark:1;/*!< a mark field used in deadlock
+					checking algorithm. Always protected
+					by the kernel_mutex. */
 	time_t		start_time;	/*!< time the trx object was created
 					or the state last time became
 					TRX_ACTIVE */
@@ -640,11 +643,6 @@ struct trx_struct{
 			wait_thrs;	/*!< query threads belonging to this
 					trx that are in the QUE_THR_LOCK_WAIT
 					state */
-	ulint		deadlock_mark;	/*!< a mark field used in deadlock
-					checking algorithm.  This must be
-					in its own machine word, because
-					it can be changed by other
-					threads while holding kernel_mutex. */
 	/*------------------------------*/
 	mem_heap_t*	lock_heap;	/*!< memory heap for the locks of the
 					transaction */
diff --git a/lock/lock0lock.c b/lock/lock0lock.c
index 0fa67d31716..4552c1e7cde 100644
--- a/lock/lock0lock.c
+++ b/lock/lock0lock.c
@@ -401,7 +401,7 @@ lock_deadlock_recursive(
 /*====================*/
 	trx_t*	start,		/*!< in: recursion starting point */
 	trx_t*	trx,		/*!< in: a transaction waiting for a lock */
-	lock_t*	wait_lock,	/*!< in: the lock trx is waiting to be granted */
+	lock_t*	wait_lock,	/*!< in:  lock that is waiting to be granted */
 	ulint*	cost,		/*!< in/out: number of calculation steps thus
 				far: if this exceeds LOCK_MAX_N_STEPS_...
 				we return LOCK_VICTIM_IS_START */
@@ -411,7 +411,7 @@ lock_deadlock_recursive(
 
 /*********************************************************************//**
 Gets the nth bit of a record lock.
-@return	TRUE if bit set */
+@return	TRUE if bit set also if i == ULINT_UNDEFINED return FALSE*/
 UNIV_INLINE
 ibool
 lock_rec_get_nth_bit(
@@ -1222,7 +1222,7 @@ lock_rec_get_first_on_page(
 
 /*********************************************************************//**
 Gets the next explicit lock request on a record.
-@return	next lock, NULL if none exists */
+@return	next lock, NULL if none exists or if heap_no == ULINT_UNDEFINED */
 UNIV_INLINE
 lock_t*
 lock_rec_get_next(
@@ -3311,6 +3311,64 @@ lock_deadlock_occurs(
 	return(FALSE);
 }
 
+/********************************************************************//**
+Check that no other transaction is waiting on this transaction's locks.
+@return TRUE if some other transaction is waiting for this lock. */
+static
+ulint
+lock_trx_has_no_waiters(
+/*====================*/
+	const trx_t*	trx)		/*!< in: the transaction to check */
+{
+	const lock_t*	lock;
+
+	ut_ad(mutex_own(&kernel_mutex));
+
+	for (lock = UT_LIST_GET_FIRST(trx->trx_locks);
+	     lock != NULL;
+	     lock = UT_LIST_GET_NEXT(trx_locks, lock)) {
+
+		const lock_t*	wait_lock = lock;
+
+		/* Look for all transactions that could be waiting on this
+		transaction's locks. For that we need to search forward. */
+		if (lock_get_type_low(lock) == LOCK_REC) {
+
+			ulint	heap_no;
+
+			/* It's possible for heap_no to be undefined here.
+			This can happen during lock move from one page to
+			another when we split. */
+
+			heap_no = lock_rec_find_set_bit(lock);
+
+			do {
+				wait_lock = lock_rec_get_next(
+					heap_no, (lock_t*) wait_lock);
+
+				if (wait_lock != NULL
+				    && lock_has_to_wait(wait_lock, lock)) {
+
+					return(FALSE);
+				}
+			} while (wait_lock != NULL);
+		} else {
+			do {
+				wait_lock = UT_LIST_GET_NEXT(
+					un_member.tab_lock.locks, wait_lock);
+
+				if (wait_lock != NULL
+				    && lock_has_to_wait(wait_lock, lock) ) {
+
+					return(FALSE);
+				}
+			} while (wait_lock != NULL);
+		}
+	}
+
+	return(TRUE);
+}
+
 /********************************************************************//**
 Looks recursively for a deadlock.
 @return 0 if no deadlock found, LOCK_VICTIM_IS_START if there was a
@@ -3324,7 +3382,7 @@ lock_deadlock_recursive(
 /*====================*/
 	trx_t*	start,		/*!< in: recursion starting point */
 	trx_t*	trx,		/*!< in: a transaction waiting for a lock */
-	lock_t*	wait_lock,	/*!< in: the lock trx is waiting to be granted */
+	lock_t*	wait_lock,	/*!< in: lock that is waiting to be granted */
 	ulint*	cost,		/*!< in/out: number of calculation steps thus
 				far: if this exceeds LOCK_MAX_N_STEPS_...
 				we return LOCK_VICTIM_IS_START */
@@ -3332,10 +3390,10 @@ lock_deadlock_recursive(
 				LOCK_MAX_DEPTH_IN_DEADLOCK_CHECK, we
 				return LOCK_VICTIM_IS_START */
 {
+	ulint	ret;
 	lock_t*	lock;
-	ulint	bit_no		= ULINT_UNDEFINED;
 	trx_t*	lock_trx;
-	ulint	ret;
+	ulint	heap_no		= ULINT_UNDEFINED;
 
 	ut_a(trx);
 	ut_a(start);
@@ -3346,32 +3404,54 @@ lock_deadlock_recursive(
 		/* We have already exhaustively searched the subtree starting
 		from this trx */
 
+		return(0);
+	} else if (lock_trx_has_no_waiters(trx)) {
+		/* If no other transaction is waiting for this transaction
+		to release its locks then no deadlock can occur. */
 		return(0);
 	}
 
 	*cost = *cost + 1;
 
-	lock = wait_lock;
 
 	if (lock_get_type_low(wait_lock) == LOCK_REC) {
+		ulint		space;
+		ulint		page_no;
+
+		heap_no = lock_rec_find_set_bit(wait_lock);
+		ut_a(heap_no != ULINT_UNDEFINED);
+
+		space = wait_lock->un_member.rec_lock.space;
+		page_no = wait_lock->un_member.rec_lock.page_no;
+
+		lock = lock_rec_get_first_on_page_addr(space, page_no);
 
-		bit_no = lock_rec_find_set_bit(wait_lock);
+		/* Position the iterator on the first matching record lock. */
+		while (lock != NULL
+		       && lock != wait_lock
+		       && !lock_rec_get_nth_bit(lock, heap_no)) {
 
-		ut_a(bit_no != ULINT_UNDEFINED);
+			lock = lock_rec_get_next_on_page(lock);
+		}
+
+		if (lock == wait_lock) {
+			lock = NULL;
+		}
+
+		ut_ad(lock == NULL || lock_rec_get_nth_bit(lock, heap_no));
+
+	} else {
+		lock = wait_lock;
 	}
 
 	/* Look at the locks ahead of wait_lock in the lock queue */
 
 	for (;;) {
-		if (lock_get_type_low(lock) & LOCK_TABLE) {
+		/* Get previous table lock. */
+		if (heap_no == ULINT_UNDEFINED) {
 
-			lock = UT_LIST_GET_PREV(un_member.tab_lock.locks,
-						lock);
-		} else {
-			ut_ad(lock_get_type_low(lock) == LOCK_REC);
-			ut_a(bit_no != ULINT_UNDEFINED);
-
-			lock = (lock_t*) lock_rec_get_prev(lock, bit_no);
+			lock = UT_LIST_GET_PREV(
+				un_member.tab_lock.locks, lock);
 		}
 
 		if (lock == NULL) {
@@ -3493,12 +3573,28 @@ lock_deadlock_recursive(
 				ret = lock_deadlock_recursive(
 					start, lock_trx,
 					lock_trx->wait_lock, cost, depth + 1);
+
 				if (ret != 0) {
 
 					return(ret);
 				}
 			}
 		}
+		/* Get the next record lock to check. */
+		if (heap_no != ULINT_UNDEFINED) {
+
+			ut_a(lock != NULL);
+
+			do {
+				lock = lock_rec_get_next_on_page(lock);
+			} while (lock != NULL
+				&& lock != wait_lock
+				&& !lock_rec_get_nth_bit(lock, heap_no));
+
+			if (lock == wait_lock) {
+				lock = NULL;
+			}
+		}
 	}/* end of the 'for (;;)'-loop */
 }
 
-- 
2.30.9