From 5674905ffbc43173e4ebd2a97b0161b7aded2863 Mon Sep 17 00:00:00 2001
From: marko <>
Date: Thu, 26 Jan 2006 10:45:27 +0000
Subject: [PATCH] Implement semi-consistent read to reduce lock conflicts at
 the cost of breaking serializability.  (Bug #3300)

ha_innobase::unlock_row(): reset the "did semi consistent read" flag

ha_innobase::was_semi_consistent_read(),
ha_innobase::try_semi_consistent_read(): new methods

row_prebuilt_t, row_create_prebuilt(): add field row_read_type for
keeping track of semi-consistent reads

row_vers_build_for_semi_consistent_read(),
row_sel_build_committed_vers_for_mysql(): new functions

row_search_for_mysql(): implement semi-consistent reads
---
 handler/ha_innodb.cc |  48 ++++++++++--
 handler/ha_innodb.h  |   2 +
 include/row0mysql.h  |  29 ++++++++
 include/row0vers.h   |  26 +++++++
 row/row0mysql.c      |   2 +
 row/row0sel.c        | 169 ++++++++++++++++++++++++++++++++++++++++---
 row/row0vers.c       | 138 +++++++++++++++++++++++++++++++++++
 7 files changed, 397 insertions(+), 17 deletions(-)

diff --git a/handler/ha_innodb.cc b/handler/ha_innodb.cc
index 58438fd8c4a..433b03c01fd 100644
--- a/handler/ha_innodb.cc
+++ b/handler/ha_innodb.cc
@@ -3828,9 +3828,9 @@ ha_innobase::delete_row(
 }
 
 /**************************************************************************
-Removes a new lock set on a row. This can be called after a row has been read
-in the processing of an UPDATE or a DELETE query, if the option
-innodb_locks_unsafe_for_binlog is set. */
+Removes a new lock set on a row, if it was not read optimistically. This can 
+be called after a row has been read in the processing of an UPDATE or a DELETE
+query, if the option innodb_locks_unsafe_for_binlog is set. */
 
 void
 ha_innobase::unlock_row(void)
@@ -3840,7 +3840,7 @@ ha_innobase::unlock_row(void)
 
 	DBUG_ENTER("ha_innobase::unlock_row");
 
-	if (last_query_id != user_thd->query_id) {
+	if (UNIV_UNLIKELY(last_query_id != user_thd->query_id)) {
 		ut_print_timestamp(stderr);
 		sql_print_error("last_query_id is %lu != user_thd_query_id is "
 				"%lu", (ulong) last_query_id,
@@ -3848,9 +3848,45 @@ ha_innobase::unlock_row(void)
 		mem_analyze_corruption((byte *) prebuilt->trx);
 		ut_error;
 	}
-	
-	if (srv_locks_unsafe_for_binlog) {
+
+	switch (prebuilt->row_read_type) {
+	case ROW_READ_WITH_LOCKS:
+		if (!srv_locks_unsafe_for_binlog) {
+			break;
+		}
+		/* fall through */
+	case ROW_READ_TRY_SEMI_CONSISTENT:
 		row_unlock_for_mysql(prebuilt, FALSE);
+		break;
+	case ROW_READ_DID_SEMI_CONSISTENT:
+		prebuilt->row_read_type = ROW_READ_TRY_SEMI_CONSISTENT;
+		break;
+	}
+
+	DBUG_VOID_RETURN;
+}
+
+/* Tells if the last read was semi-consistent; see handler.h, row0mysql.h. */
+bool
+ha_innobase::was_semi_consistent_read(void)
+/*=======================================*/
+{
+	row_prebuilt_t*	prebuilt = (row_prebuilt_t*) innobase_prebuilt;
+
+	return(prebuilt->row_read_type == ROW_READ_DID_SEMI_CONSISTENT);
+}
+
+/* Requests or cancels semi-consistent reads; see handler.h, row0mysql.h. */
+void
+ha_innobase::try_semi_consistent_read(bool yes)
+/*===========================================*/
+{
+	row_prebuilt_t*	prebuilt = (row_prebuilt_t*) innobase_prebuilt;
+
+	if (yes && srv_locks_unsafe_for_binlog) {
+		prebuilt->row_read_type = ROW_READ_TRY_SEMI_CONSISTENT;
+	} else {
+		prebuilt->row_read_type = ROW_READ_WITH_LOCKS;
 	}
 }
 
diff --git a/handler/ha_innodb.h b/handler/ha_innodb.h
index 420ec2385df..d7f7e2fb0dc 100644
--- a/handler/ha_innodb.h
+++ b/handler/ha_innodb.h
@@ -119,6 +119,8 @@ class ha_innobase: public handler
   	int write_row(byte * buf);
   	int update_row(const byte * old_data, byte * new_data);
   	int delete_row(const byte * buf);
+	bool was_semi_consistent_read();
+	void try_semi_consistent_read(bool yes);
 	void unlock_row();
 
   	int index_init(uint index, bool sorted);
diff --git a/include/row0mysql.h b/include/row0mysql.h
index b5da4634d98..cb97dbaed79 100644
--- a/include/row0mysql.h
+++ b/include/row0mysql.h
@@ -612,6 +612,31 @@ struct row_prebuilt_struct {
 					that was decided in ha_innodb.cc,
 					::store_lock(), ::external_lock(),
 					etc. */
+	ulint		row_read_type;	/* ROW_READ_WITH_LOCKS if row locks
+					should be obtained for records
+					under an UPDATE or DELETE cursor.
+					If innodb_locks_unsafe_for_binlog
+					is TRUE, this can be set to
+					ROW_READ_TRY_SEMI_CONSISTENT, so that
+					if the row under an UPDATE or DELETE
+					cursor was locked by another
+					transaction, InnoDB will resort
+					to reading the last committed value
+					('semi-consistent read').  Then,
+					this field will be set to
+					ROW_READ_DID_SEMI_CONSISTENT to
+					indicate that.  If the row does not
+					match the WHERE condition, MySQL will
+					invoke handler::unlock_row() to
+					clear the flag back to
+					ROW_READ_TRY_SEMI_CONSISTENT and
+					to simply skip the row.  If
+					the row matches, the next call to
+					row_search_for_mysql() will lock
+					the row.
+					This eliminates lock waits in some
+					cases; note that this breaks
+					serializability. */
 	ulint		mysql_prefix_len;/* byte offset of the end of
 					the last requested column */
 	ulint		mysql_row_len;	/* length in bytes of a row in the
@@ -657,6 +682,10 @@ struct row_prebuilt_struct {
 #define ROW_RETRIEVE_PRIMARY_KEY	1
 #define ROW_RETRIEVE_ALL_COLS		2
 
+/* Values for row_read_type */
+#define ROW_READ_WITH_LOCKS		0
+#define ROW_READ_TRY_SEMI_CONSISTENT	1
+#define ROW_READ_DID_SEMI_CONSISTENT	2
 
 #ifndef UNIV_NONINL
 #include "row0mysql.ic"
diff --git a/include/row0vers.h b/include/row0vers.h
index 079d841f7f3..fafbe9a2402 100644
--- a/include/row0vers.h
+++ b/include/row0vers.h
@@ -92,6 +92,32 @@ row_vers_build_for_consistent_read(
 				record does not exist in the view, that is,
 				it was freshly inserted afterwards */
 
+/*********************************************************************
+Constructs the last committed version of a clustered index record,
+which should be seen by a semi-consistent read. */
+
+ulint
+row_vers_build_for_semi_consistent_read(
+/*====================================*/
+				/* out: DB_SUCCESS or DB_MISSING_HISTORY */
+	rec_t*		rec,	/* in: record in a clustered index; the
+				caller must have a latch on the page; this
+				latch locks the top of the stack of versions
+				of this record */
+	mtr_t*		mtr,	/* in: mtr holding the latch on rec */
+	dict_index_t*	index,	/* in: the clustered index */
+	ulint**		offsets,/* in/out: offsets returned by
+				rec_get_offsets(rec, index) */
+	mem_heap_t**	offset_heap,/* in/out: memory heap from which
+				the offsets are allocated */
+	mem_heap_t*	in_heap,/* in: memory heap from which the memory for
+				old_vers is allocated; memory for possible
+				intermediate versions is allocated and freed
+				locally within the function */
+	rec_t**		old_vers);/* out, own: rec, old version, or NULL if the
+				record has no committed version, that is,
+				it was freshly inserted and not committed */
+
+
 
 #ifndef UNIV_NONINL
 #include "row0vers.ic"
diff --git a/row/row0mysql.c b/row/row0mysql.c
index 05badcfa1d0..4cc060d55a7 100644
--- a/row/row0mysql.c
+++ b/row/row0mysql.c
@@ -626,6 +626,8 @@ row_create_prebuilt(
 	prebuilt->select_lock_type = LOCK_NONE;
 	prebuilt->stored_select_lock_type = 99999999;
 
+	prebuilt->row_read_type = ROW_READ_WITH_LOCKS;
+
 	prebuilt->sel_graph = NULL;
 
 	prebuilt->search_tuple = dtuple_create(heap,
diff --git a/row/row0sel.c b/row/row0sel.c
index ef61de0706e..1a930dd7a0b 100644
--- a/row/row0sel.c
+++ b/row/row0sel.c
@@ -535,6 +535,41 @@ row_sel_build_prev_vers(
 	return(err);
 }
 
+/*************************************************************************
+Builds the last committed version of a clustered index record for a
+semi-consistent read. */
+static
+ulint
+row_sel_build_committed_vers_for_mysql(
+/*===================================*/
+					/* out: DB_SUCCESS or error code */
+	dict_index_t*	clust_index,	/* in: clustered index */
+	row_prebuilt_t*	prebuilt,	/* in: prebuilt struct */
+	rec_t*		rec,		/* in: record in a clustered index */
+	ulint**		offsets,	/* in/out: offsets returned by
+					rec_get_offsets(rec, clust_index) */
+	mem_heap_t**	offset_heap,	/* in/out: memory heap from which
+					the offsets are allocated */
+	rec_t**		old_vers,	/* out: last committed version, or NULL
+					if the record has no committed
+					version: i.e., it was freshly inserted
+					and not yet committed */
+	mtr_t*		mtr)		/* in: mtr */
+{
+	ulint	err;
+
+	if (prebuilt->old_vers_heap) {
+		mem_heap_empty(prebuilt->old_vers_heap);
+	} else {
+		prebuilt->old_vers_heap = mem_heap_create(200);
+	}
+	/* Reuse one heap per prebuilt for the version built below. */
+	err = row_vers_build_for_semi_consistent_read(rec, mtr, clust_index,
+					offsets, offset_heap,
+					prebuilt->old_vers_heap, old_vers);
+	return(err);
+}
+
 /*************************************************************************
 Tests the conditions which determine when the index segment we are searching
 through has been exhausted. */
@@ -3066,7 +3101,6 @@ row_search_for_mysql(
 	rec_t*		rec;
 	rec_t*		result_rec;
 	rec_t*		clust_rec;
-	rec_t*		old_vers;
 	ulint		err				= DB_SUCCESS;
 	ibool		unique_search			= FALSE;
 	ibool		unique_search_from_clust_index	= FALSE;
@@ -3077,6 +3111,11 @@ row_search_for_mysql(
 					locking SELECT, and the isolation
 					level is <= TRX_ISO_READ_COMMITTED,
 					then this is set to FALSE */
+	ibool		did_semi_consistent_read	= FALSE;
+					/* if the returned record was locked
+					and we did a semi-consistent read
+					(fetch the newest committed version),
+					then this is set to TRUE */
 #ifdef UNIV_SEARCH_DEBUG
 	ulint		cnt				= 0;
 #endif /* UNIV_SEARCH_DEBUG */
@@ -3163,7 +3202,7 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
 		trx->search_latch_timeout = BTR_SEA_TIMEOUT;
 	}
 	
-	/* Reset the new record lock info if we srv_locks_unsafe_for_binlog
+	/* Reset the new record lock info if srv_locks_unsafe_for_binlog
 	is set. Then we are able to remove the record locks set here on an
 	individual row. */
 
@@ -3431,9 +3470,28 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
 	clust_index = dict_table_get_first_index(index->table);
 
 	if (UNIV_LIKELY(direction != 0)) {
-		if (!sel_restore_position_for_mysql(&same_user_rec,
-						BTR_SEARCH_LEAF,
-						pcur, moves_up, &mtr)) {
+		ibool	need_to_process = sel_restore_position_for_mysql(
+				&same_user_rec, BTR_SEARCH_LEAF,
+				pcur, moves_up, &mtr);
+
+		if (UNIV_UNLIKELY(need_to_process)) {
+			if (UNIV_UNLIKELY(prebuilt->row_read_type
+					== ROW_READ_DID_SEMI_CONSISTENT)) {
+				/* We did a semi-consistent read,
+				but the record was removed in
+				the meantime. */
+				prebuilt->row_read_type
+					= ROW_READ_TRY_SEMI_CONSISTENT;
+			}
+		} else if (UNIV_LIKELY(prebuilt->row_read_type
+			   != ROW_READ_DID_SEMI_CONSISTENT)) {
+
+			/* The cursor was positioned on the record
+			that we returned previously.  If we need
+			to repeat a semi-consistent read as a
+			pessimistic locking read, the record
+			cannot be skipped. */
+
 			goto next_rec;
 		}
 
@@ -3751,7 +3809,64 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
 					prebuilt->select_lock_type,
 					lock_type, thr);
 
-		if (err != DB_SUCCESS) {
+		switch (err) {
+			rec_t*	old_vers;
+		case DB_SUCCESS:
+			break;
+		case DB_LOCK_WAIT:
+			if (UNIV_LIKELY(prebuilt->row_read_type
+			    != ROW_READ_TRY_SEMI_CONSISTENT)
+			    || index != clust_index) {
+
+				goto lock_wait_or_error;
+			}
+
+			/* The following call returns 'offsets'
+			associated with 'old_vers' */
+			err = row_sel_build_committed_vers_for_mysql(
+					clust_index, prebuilt, rec,
+					&offsets, &heap,
+					&old_vers, &mtr);
+
+			if (err != DB_SUCCESS) {
+
+				goto lock_wait_or_error;
+			}
+
+			mutex_enter(&kernel_mutex);
+			if (trx->was_chosen_as_deadlock_victim) {
+				mutex_exit(&kernel_mutex);
+
+				goto lock_wait_or_error;
+			}
+			if (UNIV_LIKELY(trx->wait_lock != NULL)) {
+				lock_cancel_waiting_and_release(
+						trx->wait_lock);
+				trx_reset_new_rec_lock_info(trx);
+			} else {
+				mutex_exit(&kernel_mutex);
+
+				/* The lock was granted while we were
+				searching for the last committed version.
+				Do a normal locking read. */
+
+				offsets = rec_get_offsets(rec, index, offsets,
+						ULINT_UNDEFINED, &heap);
+				err = DB_SUCCESS;
+				break;
+			}
+			mutex_exit(&kernel_mutex);
+
+			if (old_vers == NULL) {
+				/* The row was not yet committed */
+
+				goto next_rec;
+			}
+
+			did_semi_consistent_read = TRUE;
+			rec = old_vers;
+			break;
+		default:
 
 			goto lock_wait_or_error;
 		}
@@ -3775,6 +3890,7 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
                             && !lock_clust_rec_cons_read_sees(rec, index,
 						offsets, trx->read_view)) {
 
+				rec_t*	old_vers;
 				/* The following call returns 'offsets'
 				associated with 'old_vers' */
 				err = row_sel_build_prev_vers_for_mysql(
@@ -3821,14 +3937,13 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
 		/* The record is delete-marked: we can skip it */
 
 		if (srv_locks_unsafe_for_binlog
-	    	    && prebuilt->select_lock_type != LOCK_NONE) {
+	    	    && prebuilt->select_lock_type != LOCK_NONE
+		    && !did_semi_consistent_read) {
 
 			/* No need to keep a lock on a delete-marked record
 			if we do not want to use next-key locking. */
 
 			row_unlock_for_mysql(prebuilt, TRUE);
-			
-			trx_reset_new_rec_lock_info(trx);
 		}
 		
 		goto next_rec;
@@ -3882,8 +3997,6 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
 				locking. */
 
 				row_unlock_for_mysql(prebuilt, TRUE);
-			
-				trx_reset_new_rec_lock_info(trx);
 			}
 
 			goto next_rec;
@@ -3990,6 +4103,19 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
 	goto normal_return;
 
 next_rec:
+	/* Reset the old and new "did semi-consistent read" flags. */
+	if (UNIV_UNLIKELY(prebuilt->row_read_type
+			== ROW_READ_DID_SEMI_CONSISTENT)) {
+		prebuilt->row_read_type = ROW_READ_TRY_SEMI_CONSISTENT;
+	}
+	did_semi_consistent_read = FALSE;
+
+	if (UNIV_UNLIKELY(srv_locks_unsafe_for_binlog)
+	    && prebuilt->select_lock_type != LOCK_NONE) {
+
+		trx_reset_new_rec_lock_info(trx);
+	}
+
 	/*-------------------------------------------------------------*/
 	/* PHASE 5: Move the cursor to the next index record */
 
@@ -4042,6 +4168,13 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
 	goto rec_loop;
 
 lock_wait_or_error:
+	/* Reset the old and new "did semi-consistent read" flags. */
+	if (UNIV_UNLIKELY(prebuilt->row_read_type
+				== ROW_READ_DID_SEMI_CONSISTENT)) {
+		prebuilt->row_read_type = ROW_READ_TRY_SEMI_CONSISTENT;
+	}
+	did_semi_consistent_read = FALSE;
+
 	/*-------------------------------------------------------------*/
 
 	btr_pcur_store_position(pcur, &mtr);
@@ -4126,6 +4259,20 @@ cursor lock count is done correctly. See bugs #12263 and #12456!
 	if (UNIV_LIKELY_NULL(heap)) {
 		mem_heap_free(heap);
 	}
+
+	/* Set or reset the "did semi-consistent read" flag on return.
+	The flag did_semi_consistent_read is set if and only if
+	the record being returned was fetched with a semi-consistent read. */
+	ut_ad(prebuilt->row_read_type != ROW_READ_WITH_LOCKS
+		|| !did_semi_consistent_read);
+
+	if (UNIV_UNLIKELY(prebuilt->row_read_type != ROW_READ_WITH_LOCKS)) {
+		if (UNIV_UNLIKELY(did_semi_consistent_read)) {
+			prebuilt->row_read_type = ROW_READ_DID_SEMI_CONSISTENT;
+		} else {
+			prebuilt->row_read_type = ROW_READ_TRY_SEMI_CONSISTENT;
+		}
+	}
 	return(err);
 }
 
diff --git a/row/row0vers.c b/row/row0vers.c
index 8e747423047..b32ab8822f4 100644
--- a/row/row0vers.c
+++ b/row/row0vers.c
@@ -490,3 +490,141 @@ row_vers_build_for_consistent_read(
 
 	return(err);
 }
+
+/*********************************************************************
+Constructs the last committed version of a clustered index record,
+which should be seen by a semi-consistent read. */
+
+ulint
+row_vers_build_for_semi_consistent_read(
+/*====================================*/
+				/* out: DB_SUCCESS or DB_MISSING_HISTORY */
+	rec_t*		rec,	/* in: record in a clustered index; the
+				caller must have a latch on the page; this
+				latch locks the top of the stack of versions
+				of this record */
+	mtr_t*		mtr,	/* in: mtr holding the latch on rec */
+	dict_index_t*	index,	/* in: the clustered index */
+	ulint**		offsets,/* in/out: offsets returned by
+				rec_get_offsets(rec, index) */
+	mem_heap_t**	offset_heap,/* in/out: memory heap from which
+				the offsets are allocated */
+	mem_heap_t*	in_heap,/* in: memory heap from which the memory for
+				old_vers is allocated; memory for possible
+				intermediate versions is allocated and freed
+				locally within the function */
+	rec_t**		old_vers)/* out, own: rec, old version, or NULL if the
+				record has no committed version, that is,
+				it was freshly inserted and not committed */
+{
+	rec_t*		version;
+	mem_heap_t*	heap		= NULL;
+	byte*		buf;
+	ulint		err;
+	dulint		rec_trx_id;
+
+	ut_ad(index->type & DICT_CLUSTERED);
+	ut_ad(mtr_memo_contains(mtr, buf_block_align(rec), MTR_MEMO_PAGE_X_FIX)
+		|| mtr_memo_contains(mtr, buf_block_align(rec),
+						MTR_MEMO_PAGE_S_FIX));
+#ifdef UNIV_SYNC_DEBUG
+	ut_ad(!rw_lock_own(&(purge_sys->latch), RW_LOCK_SHARED));
+#endif /* UNIV_SYNC_DEBUG */
+
+	ut_ad(rec_offs_validate(rec, index, *offsets));
+
+	rw_lock_s_lock(&(purge_sys->latch));
+	/* The S-latch on purge_sys prevents the purge view from
+	changing.  Thus, if we have an uncommitted transaction at
+	this point, then purge cannot remove its undo log even if
+	the transaction could commit now. */
+
+	version = rec;
+
+	for (;;) {
+		trx_t*		version_trx;
+		ibool		committed;
+		mem_heap_t*	heap2;
+		rec_t*		prev_version;
+		dulint		version_trx_id;
+
+		version_trx_id = row_get_rec_trx_id(
+					version, index, *offsets);
+		if (rec == version) {
+			rec_trx_id = version_trx_id;
+		}
+
+		/* Read conc_state under kernel_mutex: once the mutex is
+		released, the looked-up trx object may no longer be valid. */
+		mutex_enter(&kernel_mutex);
+		version_trx = trx_get_on_id(version_trx_id);
+		committed = !version_trx
+		    || version_trx->conc_state == TRX_NOT_STARTED
+		    || version_trx->conc_state == TRX_COMMITTED_IN_MEMORY;
+		mutex_exit(&kernel_mutex);
+
+		if (committed) {
+			/* We found a version that belongs to a
+			committed transaction: return it. */
+			if (rec == version) {
+				*old_vers = rec;
+				err = DB_SUCCESS;
+				break;
+			}
+
+			/* We assume that a rolled-back transaction stays in
+			TRX_ACTIVE state until all the changes have been
+			rolled back and the transaction is removed from
+			the global list of transactions. */
+			if (!ut_dulint_cmp(rec_trx_id, version_trx_id)) {
+				/* The transaction was committed while
+				we searched for earlier versions.
+				Return the current version as a
+				semi-consistent read. */
+				version = rec;
+				*offsets = rec_get_offsets(version,
+					index, *offsets,
+					ULINT_UNDEFINED, offset_heap);
+			}
+
+			buf = mem_heap_alloc(in_heap, rec_offs_size(*offsets));
+			*old_vers = rec_copy(buf, version, *offsets);
+			rec_offs_make_valid(*old_vers, index, *offsets);
+			err = DB_SUCCESS;
+
+			break;
+		}
+
+		heap2 = heap;
+		heap = mem_heap_create(1024);
+
+		err = trx_undo_prev_version_build(rec, mtr, version, index,
+						*offsets, heap, &prev_version);
+		if (heap2) {
+			mem_heap_free(heap2); /* free version */
+		}
+
+		if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
+			break;
+		}
+
+		if (prev_version == NULL) {
+			/* It was a freshly inserted version */
+			*old_vers = NULL;
+			err = DB_SUCCESS;
+
+			break;
+		}
+
+		version = prev_version;
+		*offsets = rec_get_offsets(version, index, *offsets,
+					ULINT_UNDEFINED, offset_heap);
+	}/* for (;;) */
+
+	if (heap) {
+		mem_heap_free(heap);
+	}
+	rw_lock_s_unlock(&(purge_sys->latch));
+
+	return(err);
+}
-- 
2.30.9