Commit 6852a326 authored by Marko Mäkelä's avatar Marko Mäkelä

Bug#13340047 LATCHING ORDER VIOLATION IN IBUF_SET_ENTRY_COUNTER()

ibuf_insert_low(), the only caller of ibuf_set_entry_counter(), will
have latched an insert buffer bitmap page in bitmap_mtr before
invoking ibuf_set_entry_counter(). The latching order forbids any
further pages to be latched.

ibuf_set_entry_counter(): Renamed to ibuf_get_entry_counter(),
simplified the code and added comments.

Added the following symbols for predefined field numbers in change
buffer records:

#define IBUF_REC_FIELD_SPACE	0	/*!< in the pre-4.1 format,
					the page number. later, the space_id */
#define IBUF_REC_FIELD_MARKER	1	/*!< starting with 4.1, a marker
					consisting of 1 byte that is 0 */
#define IBUF_REC_FIELD_PAGE	2	/*!< starting with 4.1, the
					page number */
#define IBUF_REC_FIELD_METADATA	3	/* the metadata field */
#define IBUF_REC_FIELD_USER	4	/* first user field */

rb:802 approved by Sunny Bains
parent d946b1d3
...@@ -254,11 +254,20 @@ ibuf_count_check( ...@@ -254,11 +254,20 @@ ibuf_count_check(
list of the ibuf */ list of the ibuf */
/* @} */ /* @} */
#define IBUF_REC_FIELD_SPACE 0 /*!< in the pre-4.1 format,
the page number. later, the space_id */
#define IBUF_REC_FIELD_MARKER 1 /*!< starting with 4.1, a marker
consisting of 1 byte that is 0 */
#define IBUF_REC_FIELD_PAGE 2 /*!< starting with 4.1, the
page number */
#define IBUF_REC_FIELD_METADATA 3 /* the metadata field */
#define IBUF_REC_FIELD_USER 4 /* first user field */
/* Various constants for checking the type of an ibuf record and extracting /* Various constants for checking the type of an ibuf record and extracting
data from it. For details, see the description of the record format at the data from it. For details, see the description of the record format at the
top of this file. */ top of this file. */
/** @name Format of the fourth column of an insert buffer record /** @name Format of the IBUF_REC_FIELD_METADATA of an insert buffer record
The fourth column in the MySQL 5.5 format contains an operation The fourth column in the MySQL 5.5 format contains an operation
type, counter, and some flags. */ type, counter, and some flags. */
/* @{ */ /* @{ */
...@@ -1233,13 +1242,13 @@ ibuf_rec_get_page_no_func( ...@@ -1233,13 +1242,13 @@ ibuf_rec_get_page_no_func(
ut_ad(ibuf_inside(mtr)); ut_ad(ibuf_inside(mtr));
ut_ad(rec_get_n_fields_old(rec) > 2); ut_ad(rec_get_n_fields_old(rec) > 2);
field = rec_get_nth_field_old(rec, 1, &len); field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);
if (len == 1) { if (len == 1) {
/* This is of the >= 4.1.x record format */ /* This is of the >= 4.1.x record format */
ut_a(trx_sys_multiple_tablespace_format); ut_a(trx_sys_multiple_tablespace_format);
field = rec_get_nth_field_old(rec, 2, &len); field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_PAGE, &len);
} else { } else {
ut_a(trx_doublewrite_must_reset_space_ids); ut_a(trx_doublewrite_must_reset_space_ids);
ut_a(!trx_sys_multiple_tablespace_format); ut_a(!trx_sys_multiple_tablespace_format);
...@@ -1279,13 +1288,13 @@ ibuf_rec_get_space_func( ...@@ -1279,13 +1288,13 @@ ibuf_rec_get_space_func(
ut_ad(ibuf_inside(mtr)); ut_ad(ibuf_inside(mtr));
ut_ad(rec_get_n_fields_old(rec) > 2); ut_ad(rec_get_n_fields_old(rec) > 2);
field = rec_get_nth_field_old(rec, 1, &len); field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);
if (len == 1) { if (len == 1) {
/* This is of the >= 4.1.x record format */ /* This is of the >= 4.1.x record format */
ut_a(trx_sys_multiple_tablespace_format); ut_a(trx_sys_multiple_tablespace_format);
field = rec_get_nth_field_old(rec, 0, &len); field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_SPACE, &len);
ut_a(len == 4); ut_a(len == 4);
return(mach_read_from_4(field)); return(mach_read_from_4(field));
...@@ -1335,9 +1344,9 @@ ibuf_rec_get_info_func( ...@@ -1335,9 +1344,9 @@ ibuf_rec_get_info_func(
|| mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX)); || mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
ut_ad(ibuf_inside(mtr)); ut_ad(ibuf_inside(mtr));
fields = rec_get_n_fields_old(rec); fields = rec_get_n_fields_old(rec);
ut_a(fields > 4); ut_a(fields > IBUF_REC_FIELD_USER);
types = rec_get_nth_field_old(rec, 3, &len); types = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);
info_len_local = len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE; info_len_local = len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;
...@@ -1363,7 +1372,8 @@ ibuf_rec_get_info_func( ...@@ -1363,7 +1372,8 @@ ibuf_rec_get_info_func(
ut_a(op_local < IBUF_OP_COUNT); ut_a(op_local < IBUF_OP_COUNT);
ut_a((len - info_len_local) == ut_a((len - info_len_local) ==
(fields - 4) * DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE); (fields - IBUF_REC_FIELD_USER)
* DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE);
if (op) { if (op) {
*op = op_local; *op = op_local;
...@@ -1407,7 +1417,7 @@ ibuf_rec_get_op_type_func( ...@@ -1407,7 +1417,7 @@ ibuf_rec_get_op_type_func(
ut_ad(ibuf_inside(mtr)); ut_ad(ibuf_inside(mtr));
ut_ad(rec_get_n_fields_old(rec) > 2); ut_ad(rec_get_n_fields_old(rec) > 2);
(void) rec_get_nth_field_old(rec, 1, &len); (void) rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);
if (len > 1) { if (len > 1) {
/* This is a < 4.1.x format record */ /* This is a < 4.1.x format record */
...@@ -1436,12 +1446,12 @@ ibuf_rec_get_counter( ...@@ -1436,12 +1446,12 @@ ibuf_rec_get_counter(
const byte* ptr; const byte* ptr;
ulint len; ulint len;
if (rec_get_n_fields_old(rec) < 4) { if (rec_get_n_fields_old(rec) <= IBUF_REC_FIELD_METADATA) {
return(ULINT_UNDEFINED); return(ULINT_UNDEFINED);
} }
ptr = rec_get_nth_field_old(rec, 3, &len); ptr = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);
if (len >= 2) { if (len >= 2) {
...@@ -1666,7 +1676,7 @@ ibuf_build_entry_from_ibuf_rec_func( ...@@ -1666,7 +1676,7 @@ ibuf_build_entry_from_ibuf_rec_func(
|| mtr_memo_contains_page(mtr, ibuf_rec, MTR_MEMO_PAGE_S_FIX)); || mtr_memo_contains_page(mtr, ibuf_rec, MTR_MEMO_PAGE_S_FIX));
ut_ad(ibuf_inside(mtr)); ut_ad(ibuf_inside(mtr));
data = rec_get_nth_field_old(ibuf_rec, 1, &len); data = rec_get_nth_field_old(ibuf_rec, IBUF_REC_FIELD_MARKER, &len);
if (len > 1) { if (len > 1) {
/* This a < 4.1.x format record */ /* This a < 4.1.x format record */
...@@ -1678,13 +1688,13 @@ ibuf_build_entry_from_ibuf_rec_func( ...@@ -1678,13 +1688,13 @@ ibuf_build_entry_from_ibuf_rec_func(
ut_a(trx_sys_multiple_tablespace_format); ut_a(trx_sys_multiple_tablespace_format);
ut_a(*data == 0); ut_a(*data == 0);
ut_a(rec_get_n_fields_old(ibuf_rec) > 4); ut_a(rec_get_n_fields_old(ibuf_rec) > IBUF_REC_FIELD_USER);
n_fields = rec_get_n_fields_old(ibuf_rec) - 4; n_fields = rec_get_n_fields_old(ibuf_rec) - IBUF_REC_FIELD_USER;
tuple = dtuple_create(heap, n_fields); tuple = dtuple_create(heap, n_fields);
types = rec_get_nth_field_old(ibuf_rec, 3, &len); types = rec_get_nth_field_old(ibuf_rec, IBUF_REC_FIELD_METADATA, &len);
ibuf_rec_get_info(mtr, ibuf_rec, NULL, &comp, &info_len, NULL); ibuf_rec_get_info(mtr, ibuf_rec, NULL, &comp, &info_len, NULL);
...@@ -1698,7 +1708,8 @@ ibuf_build_entry_from_ibuf_rec_func( ...@@ -1698,7 +1708,8 @@ ibuf_build_entry_from_ibuf_rec_func(
for (i = 0; i < n_fields; i++) { for (i = 0; i < n_fields; i++) {
field = dtuple_get_nth_field(tuple, i); field = dtuple_get_nth_field(tuple, i);
data = rec_get_nth_field_old(ibuf_rec, i + 4, &len); data = rec_get_nth_field_old(
ibuf_rec, i + IBUF_REC_FIELD_USER, &len);
dfield_set_data(field, data, len); dfield_set_data(field, data, len);
...@@ -1745,7 +1756,7 @@ ibuf_rec_get_size( ...@@ -1745,7 +1756,7 @@ ibuf_rec_get_size(
field_offset = 2; field_offset = 2;
types_offset = DATA_ORDER_NULL_TYPE_BUF_SIZE; types_offset = DATA_ORDER_NULL_TYPE_BUF_SIZE;
} else { } else {
field_offset = 4; field_offset = IBUF_REC_FIELD_USER;
types_offset = DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE; types_offset = DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE;
} }
...@@ -1806,7 +1817,7 @@ ibuf_rec_get_volume_func( ...@@ -1806,7 +1817,7 @@ ibuf_rec_get_volume_func(
ut_ad(ibuf_inside(mtr)); ut_ad(ibuf_inside(mtr));
ut_ad(rec_get_n_fields_old(ibuf_rec) > 2); ut_ad(rec_get_n_fields_old(ibuf_rec) > 2);
data = rec_get_nth_field_old(ibuf_rec, 1, &len); data = rec_get_nth_field_old(ibuf_rec, IBUF_REC_FIELD_MARKER, &len);
pre_4_1 = (len > 1); pre_4_1 = (len > 1);
if (pre_4_1) { if (pre_4_1) {
...@@ -1829,7 +1840,8 @@ ibuf_rec_get_volume_func( ...@@ -1829,7 +1840,8 @@ ibuf_rec_get_volume_func(
ut_a(trx_sys_multiple_tablespace_format); ut_a(trx_sys_multiple_tablespace_format);
ut_a(*data == 0); ut_a(*data == 0);
types = rec_get_nth_field_old(ibuf_rec, 3, &len); types = rec_get_nth_field_old(
ibuf_rec, IBUF_REC_FIELD_METADATA, &len);
ibuf_rec_get_info(mtr, ibuf_rec, &op, &comp, &info_len, NULL); ibuf_rec_get_info(mtr, ibuf_rec, &op, &comp, &info_len, NULL);
...@@ -1859,7 +1871,8 @@ ibuf_rec_get_volume_func( ...@@ -1859,7 +1871,8 @@ ibuf_rec_get_volume_func(
} }
types += info_len; types += info_len;
n_fields = rec_get_n_fields_old(ibuf_rec) - 4; n_fields = rec_get_n_fields_old(ibuf_rec)
- IBUF_REC_FIELD_USER;
} }
data_size = ibuf_rec_get_size(ibuf_rec, types, n_fields, pre_4_1, comp); data_size = ibuf_rec_get_size(ibuf_rec, types, n_fields, pre_4_1, comp);
...@@ -1914,11 +1927,11 @@ ibuf_entry_build( ...@@ -1914,11 +1927,11 @@ ibuf_entry_build(
n_fields = dtuple_get_n_fields(entry); n_fields = dtuple_get_n_fields(entry);
tuple = dtuple_create(heap, n_fields + 4); tuple = dtuple_create(heap, n_fields + IBUF_REC_FIELD_USER);
/* 1) Space Id */ /* 1) Space Id */
field = dtuple_get_nth_field(tuple, 0); field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_SPACE);
buf = mem_heap_alloc(heap, 4); buf = mem_heap_alloc(heap, 4);
...@@ -1928,7 +1941,7 @@ ibuf_entry_build( ...@@ -1928,7 +1941,7 @@ ibuf_entry_build(
/* 2) Marker byte */ /* 2) Marker byte */
field = dtuple_get_nth_field(tuple, 1); field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_MARKER);
buf = mem_heap_alloc(heap, 1); buf = mem_heap_alloc(heap, 1);
...@@ -1940,7 +1953,7 @@ ibuf_entry_build( ...@@ -1940,7 +1953,7 @@ ibuf_entry_build(
/* 3) Page number */ /* 3) Page number */
field = dtuple_get_nth_field(tuple, 2); field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_PAGE);
buf = mem_heap_alloc(heap, 4); buf = mem_heap_alloc(heap, 4);
...@@ -1988,10 +2001,7 @@ ibuf_entry_build( ...@@ -1988,10 +2001,7 @@ ibuf_entry_build(
ulint fixed_len; ulint fixed_len;
const dict_field_t* ifield; const dict_field_t* ifield;
/* We add 4 below because we have the 4 extra fields at the field = dtuple_get_nth_field(tuple, i + IBUF_REC_FIELD_USER);
start of an ibuf record */
field = dtuple_get_nth_field(tuple, i + 4);
entry_field = dtuple_get_nth_field(entry, i); entry_field = dtuple_get_nth_field(entry, i);
dfield_copy(field, entry_field); dfield_copy(field, entry_field);
...@@ -2024,13 +2034,13 @@ ibuf_entry_build( ...@@ -2024,13 +2034,13 @@ ibuf_entry_build(
/* 4) Type info, part #2 */ /* 4) Type info, part #2 */
field = dtuple_get_nth_field(tuple, 3); field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_METADATA);
dfield_set_data(field, type_info, ti - type_info); dfield_set_data(field, type_info, ti - type_info);
/* Set all the types in the new tuple binary */ /* Set all the types in the new tuple binary */
dtuple_set_types_binary(tuple, n_fields + 4); dtuple_set_types_binary(tuple, n_fields + IBUF_REC_FIELD_USER);
return(tuple); return(tuple);
} }
...@@ -2090,11 +2100,11 @@ ibuf_new_search_tuple_build( ...@@ -2090,11 +2100,11 @@ ibuf_new_search_tuple_build(
ut_a(trx_sys_multiple_tablespace_format); ut_a(trx_sys_multiple_tablespace_format);
tuple = dtuple_create(heap, 3); tuple = dtuple_create(heap, IBUF_REC_FIELD_METADATA);
/* Store the space id in tuple */ /* Store the space id in tuple */
field = dtuple_get_nth_field(tuple, 0); field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_SPACE);
buf = mem_heap_alloc(heap, 4); buf = mem_heap_alloc(heap, 4);
...@@ -2104,7 +2114,7 @@ ibuf_new_search_tuple_build( ...@@ -2104,7 +2114,7 @@ ibuf_new_search_tuple_build(
/* Store the new format record marker byte */ /* Store the new format record marker byte */
field = dtuple_get_nth_field(tuple, 1); field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_MARKER);
buf = mem_heap_alloc(heap, 1); buf = mem_heap_alloc(heap, 1);
...@@ -2114,7 +2124,7 @@ ibuf_new_search_tuple_build( ...@@ -2114,7 +2124,7 @@ ibuf_new_search_tuple_build(
/* Store the page number in tuple */ /* Store the page number in tuple */
field = dtuple_get_nth_field(tuple, 2); field = dtuple_get_nth_field(tuple, IBUF_REC_FIELD_PAGE);
buf = mem_heap_alloc(heap, 4); buf = mem_heap_alloc(heap, 4);
...@@ -2122,7 +2132,7 @@ ibuf_new_search_tuple_build( ...@@ -2122,7 +2132,7 @@ ibuf_new_search_tuple_build(
dfield_set_data(field, buf, 4); dfield_set_data(field, buf, 4);
dtuple_set_types_binary(tuple, 3); dtuple_set_types_binary(tuple, IBUF_REC_FIELD_METADATA);
return(tuple); return(tuple);
} }
...@@ -2789,8 +2799,10 @@ ibuf_get_volume_buffered_hash( ...@@ -2789,8 +2799,10 @@ ibuf_get_volume_buffered_hash(
ulint fold; ulint fold;
ulint bitmask; ulint bitmask;
len = ibuf_rec_get_size(rec, types, rec_get_n_fields_old(rec) - 4, len = ibuf_rec_get_size(
FALSE, comp); rec, types,
rec_get_n_fields_old(rec) - IBUF_REC_FIELD_USER,
FALSE, comp);
fold = ut_fold_binary(data, len); fold = ut_fold_binary(data, len);
hash += (fold / (CHAR_BIT * sizeof *hash)) % size; hash += (fold / (CHAR_BIT * sizeof *hash)) % size;
...@@ -2842,8 +2854,8 @@ ibuf_get_volume_buffered_count_func( ...@@ -2842,8 +2854,8 @@ ibuf_get_volume_buffered_count_func(
ut_ad(ibuf_inside(mtr)); ut_ad(ibuf_inside(mtr));
n_fields = rec_get_n_fields_old(rec); n_fields = rec_get_n_fields_old(rec);
ut_ad(n_fields > 4); ut_ad(n_fields > IBUF_REC_FIELD_USER);
n_fields -= 4; n_fields -= IBUF_REC_FIELD_USER;
rec_get_nth_field_offs_old(rec, 1, &len); rec_get_nth_field_offs_old(rec, 1, &len);
/* This function is only invoked when buffering new /* This function is only invoked when buffering new
...@@ -2852,7 +2864,7 @@ ibuf_get_volume_buffered_count_func( ...@@ -2852,7 +2864,7 @@ ibuf_get_volume_buffered_count_func(
ut_a(len == 1); ut_a(len == 1);
ut_ad(trx_sys_multiple_tablespace_format); ut_ad(trx_sys_multiple_tablespace_format);
types = rec_get_nth_field_old(rec, 3, &len); types = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);
switch (UNIV_EXPECT(len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE, switch (UNIV_EXPECT(len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE,
IBUF_REC_INFO_SIZE)) { IBUF_REC_INFO_SIZE)) {
...@@ -3164,7 +3176,7 @@ ibuf_update_max_tablespace_id(void) ...@@ -3164,7 +3176,7 @@ ibuf_update_max_tablespace_id(void)
} else { } else {
rec = btr_pcur_get_rec(&pcur); rec = btr_pcur_get_rec(&pcur);
field = rec_get_nth_field_old(rec, 0, &len); field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_SPACE, &len);
ut_a(len == 4); ut_a(len == 4);
...@@ -3186,10 +3198,12 @@ ibuf_update_max_tablespace_id(void) ...@@ -3186,10 +3198,12 @@ ibuf_update_max_tablespace_id(void)
ibuf_get_entry_counter_low_func(rec,space,page_no) ibuf_get_entry_counter_low_func(rec,space,page_no)
#endif #endif
/****************************************************************//** /****************************************************************//**
Helper function for ibuf_set_entry_counter. Checks if rec is for (space, Helper function for ibuf_get_entry_counter_func. Checks if rec is for
page_no), and if so, reads counter value from it and returns that + 1. (space, page_no), and if so, reads counter value from it and returns
Otherwise, returns 0. that + 1.
@return new counter value, or 0 */ @retval ULINT_UNDEFINED if the record does not contain any counter
@retval 0 if the record is not for (space, page_no)
@retval 1 + previous counter value, otherwise */
static static
ulint ulint
ibuf_get_entry_counter_low_func( ibuf_get_entry_counter_low_func(
...@@ -3210,7 +3224,7 @@ ibuf_get_entry_counter_low_func( ...@@ -3210,7 +3224,7 @@ ibuf_get_entry_counter_low_func(
|| mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX)); || mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_S_FIX));
ut_ad(rec_get_n_fields_old(rec) > 2); ut_ad(rec_get_n_fields_old(rec) > 2);
field = rec_get_nth_field_old(rec, 1, &len); field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_MARKER, &len);
if (UNIV_UNLIKELY(len != 1)) { if (UNIV_UNLIKELY(len != 1)) {
/* pre-4.1 format */ /* pre-4.1 format */
...@@ -3223,7 +3237,7 @@ ibuf_get_entry_counter_low_func( ...@@ -3223,7 +3237,7 @@ ibuf_get_entry_counter_low_func(
ut_a(trx_sys_multiple_tablespace_format); ut_a(trx_sys_multiple_tablespace_format);
/* Check the tablespace identifier. */ /* Check the tablespace identifier. */
field = rec_get_nth_field_old(rec, 0, &len); field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_SPACE, &len);
ut_a(len == 4); ut_a(len == 4);
if (mach_read_from_4(field) != space) { if (mach_read_from_4(field) != space) {
...@@ -3232,7 +3246,7 @@ ibuf_get_entry_counter_low_func( ...@@ -3232,7 +3246,7 @@ ibuf_get_entry_counter_low_func(
} }
/* Check the page offset. */ /* Check the page offset. */
field = rec_get_nth_field_old(rec, 2, &len); field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_PAGE, &len);
ut_a(len == 4); ut_a(len == 4);
if (mach_read_from_4(field) != page_no) { if (mach_read_from_4(field) != page_no) {
...@@ -3241,7 +3255,7 @@ ibuf_get_entry_counter_low_func( ...@@ -3241,7 +3255,7 @@ ibuf_get_entry_counter_low_func(
} }
/* Check if the record contains a counter field. */ /* Check if the record contains a counter field. */
field = rec_get_nth_field_old(rec, 3, &len); field = rec_get_nth_field_old(rec, IBUF_REC_FIELD_METADATA, &len);
switch (len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE) { switch (len % DATA_NEW_ORDER_NULL_TYPE_BUF_SIZE) {
default: default:
...@@ -3257,147 +3271,61 @@ ibuf_get_entry_counter_low_func( ...@@ -3257,147 +3271,61 @@ ibuf_get_entry_counter_low_func(
} }
} }
#ifdef UNIV_DEBUG
# define ibuf_get_entry_counter(space,page_no,rec,mtr,exact_leaf) \
ibuf_get_entry_counter_func(space,page_no,rec,mtr,exact_leaf)
#else /* UNIV_DEBUG */
# define ibuf_get_entry_counter(space,page_no,rec,mtr,exact_leaf) \
ibuf_get_entry_counter_func(space,page_no,rec,exact_leaf)
#endif
/****************************************************************//** /****************************************************************//**
Set the counter field in entry to the correct value based on the current Calculate the counter field for an entry based on the current
last record in ibuf for (space, page_no). last record in ibuf for (space, page_no).
@return FALSE if we should abort this insertion to ibuf */ @return the counter field, or ULINT_UNDEFINED
if we should abort this insertion to ibuf */
static static
ibool ulint
ibuf_set_entry_counter( ibuf_get_entry_counter_func(
/*===================*/ /*========================*/
dtuple_t* entry, /*!< in/out: entry to patch */
ulint space, /*!< in: space id of entry */ ulint space, /*!< in: space id of entry */
ulint page_no, /*!< in: page number of entry */ ulint page_no, /*!< in: page number of entry */
btr_pcur_t* pcur, /*!< in: pcur positioned on the record const rec_t* rec, /*!< in: the record preceding the
found by btr_pcur_open(.., entry, insertion point */
PAGE_CUR_LE, ..., pcur, ...) */ #ifdef UNIV_DEBUG
ibool is_optimistic, /*!< in: is this an optimistic insert */ mtr_t* mtr, /*!< in: mini-transaction */
mtr_t* mtr) /*!< in: mtr */ #endif /* UNIV_DEBUG */
ibool only_leaf) /*!< in: TRUE if this is the only
leaf page that can contain entries
for (space,page_no), that is, there
was no exact match for (space,page_no)
in the node pointer */
{ {
dfield_t* field;
byte* data;
ulint counter = 0;
/* pcur points to either a user rec or to a page's infimum record. */
ut_ad(ibuf_inside(mtr)); ut_ad(ibuf_inside(mtr));
ut_ad(mtr_memo_contains(mtr, btr_pcur_get_block(pcur), ut_ad(mtr_memo_contains_page(mtr, rec, MTR_MEMO_PAGE_X_FIX));
MTR_MEMO_PAGE_X_FIX)); ut_ad(page_validate(page_align(rec), ibuf->index));
ut_ad(page_validate(btr_pcur_get_page(pcur), ibuf->index));
if (btr_pcur_is_on_user_rec(pcur)) {
counter = ibuf_get_entry_counter_low(
mtr, btr_pcur_get_rec(pcur), space, page_no);
if (UNIV_UNLIKELY(counter == ULINT_UNDEFINED)) {
/* The record lacks a counter field.
Such old records must be merged before
new records can be buffered. */
return(FALSE);
}
} else if (btr_pcur_is_before_first_in_tree(pcur, mtr)) {
/* Ibuf tree is either completely empty, or the insert
position is at the very first record of a non-empty tree. In
either case we have no previous records for (space,
page_no). */
counter = 0;
} else if (btr_pcur_is_before_first_on_page(pcur)) {
btr_cur_t* cursor = btr_pcur_get_btr_cur(pcur);
if (cursor->low_match < 3) {
/* If low_match < 3, we know that the father node
pointer did not contain the searched for (space,
page_no), which means that the search ended on the
right page regardless of the counter value, and
since we're at the infimum record, there are no
existing records. */
counter = 0;
} else {
rec_t* rec;
const page_t* page;
buf_block_t* block;
page_t* prev_page;
ulint prev_page_no;
ut_a(cursor->ibuf_cnt != ULINT_UNDEFINED);
page = btr_pcur_get_page(pcur);
prev_page_no = btr_page_get_prev(page, mtr);
ut_a(prev_page_no != FIL_NULL);
block = buf_page_get(
IBUF_SPACE_ID, 0, prev_page_no,
RW_X_LATCH, mtr);
buf_block_dbg_add_level(block, SYNC_IBUF_TREE_NODE); if (page_rec_is_supremum(rec)) {
/* This is just for safety. The record should be a
prev_page = buf_block_get_frame(block); page infimum or a user record. */
ut_ad(0);
rec = page_rec_get_prev( return(ULINT_UNDEFINED);
page_get_supremum_rec(prev_page)); } else if (!page_rec_is_infimum(rec)) {
return(ibuf_get_entry_counter_low(mtr, rec, space, page_no));
ut_ad(page_rec_is_user_rec(rec)); } else if (only_leaf
|| fil_page_get_prev(page_align(rec)) == FIL_NULL) {
counter = ibuf_get_entry_counter_low( /* The parent node pointer did not contain the
mtr, rec, space, page_no); searched for (space, page_no), which means that the
search ended on the correct page regardless of the
if (UNIV_UNLIKELY(counter == ULINT_UNDEFINED)) { counter value, and since we're at the infimum record,
/* The record lacks a counter field. there are no existing records. */
Such old records must be merged before return(0);
new records can be buffered. */
return(FALSE);
}
if (counter < cursor->ibuf_cnt) {
/* Search ended on the wrong page. */
if (is_optimistic) {
/* In an optimistic insert, we can
shift the insert position to the left
page, since it only needs an X-latch
on the page itself, which the
original search acquired for us. */
btr_cur_position(
ibuf->index, rec, block,
btr_pcur_get_btr_cur(pcur));
} else {
/* We can't shift the insert
position to the left page in a
pessimistic insert since it would
require an X-latch on the left
page's left page, so we have to
abort. */
return(FALSE);
}
} else {
/* The counter field in the father node is
the same as we would insert; we don't know
whether the insert should go to this page or
the left page (the later fields can differ),
so refuse the insert. */
return(FALSE);
}
}
} else { } else {
/* The cursor is not positioned at or before a user record. */ /* We used to read the previous page here. It would
return(FALSE); break the latching order, because the caller has
buffer-fixed an insert buffer bitmap page. */
return(ULINT_UNDEFINED);
} }
/* Patch counter value in already built entry. */
field = dtuple_get_nth_field(entry, 3);
data = dfield_get_data(field);
mach_write_to_2(data + IBUF_REC_OFFSET_COUNTER, counter);
return(TRUE);
} }
/*********************************************************************//** /*********************************************************************//**
...@@ -3604,16 +3532,27 @@ fail_exit: ...@@ -3604,16 +3532,27 @@ fail_exit:
} }
} }
/* Patch correct counter value to the entry to insert. This can if (!no_counter) {
change the insert position, which can result in the need to abort in /* Patch correct counter value to the entry to
some cases. */ insert. This can change the insert position, which can
if (!no_counter result in the need to abort in some cases. */
&& !ibuf_set_entry_counter(ibuf_entry, space, page_no, &pcur, ulint counter = ibuf_get_entry_counter(
mode == BTR_MODIFY_PREV, &mtr)) { space, page_no, btr_pcur_get_rec(&pcur), &mtr,
btr_pcur_get_btr_cur(&pcur)->low_match
< IBUF_REC_FIELD_METADATA);
dfield_t* field;
if (counter == ULINT_UNDEFINED) {
bitmap_fail: bitmap_fail:
ibuf_mtr_commit(&bitmap_mtr); ibuf_mtr_commit(&bitmap_mtr);
goto fail_exit;
}
goto fail_exit; field = dtuple_get_nth_field(
ibuf_entry, IBUF_REC_FIELD_METADATA);
mach_write_to_2(
dfield_get_data(field) + IBUF_REC_OFFSET_COUNTER,
counter);
} }
/* Set the bitmap bit denoting that the insert buffer contains /* Set the bitmap bit denoting that the insert buffer contains
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment