Commit 99726042 authored by marko's avatar marko

branches/zip: Eliminate page corruption in btr_compress() when

page_zip_reorganize() was invoked.

btr_compress(): Obtain orig_pred and orig_succ after copying the records.
Add a debug assertion about FIL_PAGE_PREV.

page_copy_rec_list_end(), page_copy_rec_list_start(): Change the return
type from ibool to rec_t.  Adjust the return value after invoking
page_zip_reorganize().
parent 05c16ac6
......@@ -2312,12 +2312,12 @@ btr_compress(
/* Move records to the merge page */
if (is_left) {
rec_t* orig_pred = page_rec_get_prev(
page_get_supremum_rec(merge_page));
if (UNIV_UNLIKELY(!page_copy_rec_list_start(
rec_t* orig_pred = page_copy_rec_list_start(
merge_page, merge_page_zip,
page_get_supremum_rec(page),
cursor->index, mtr))) {
cursor->index, mtr);
if (UNIV_UNLIKELY(!orig_pred)) {
return(FALSE);
}
......@@ -2331,8 +2331,7 @@ btr_compress(
} else {
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
rec_t* orig_succ = page_rec_get_next(
page_get_infimum_rec(merge_page));
rec_t* orig_succ;
#ifdef UNIV_BTR_DEBUG
byte fil_page_prev[4];
#endif /* UNIV_BTR_DEBUG */
......@@ -2352,12 +2351,16 @@ btr_compress(
memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
}
if (UNIV_UNLIKELY(!page_copy_rec_list_end(
orig_succ = page_copy_rec_list_end(
merge_page, merge_page_zip,
page_get_infimum_rec(page),
cursor->index, mtr))) {
cursor->index, mtr);
if (UNIV_UNLIKELY(!orig_succ)) {
ut_a(merge_page_zip);
/* FIL_PAGE_PREV was restored from merge_page_zip. */
ut_ad(!memcmp(fil_page_prev,
merge_page + FIL_PAGE_PREV, 4));
return(FALSE);
}
......
......@@ -655,10 +655,14 @@ Copies records from page to new_page, from the given record onward,
including that record. Infimum and supremum records are not copied.
The records are copied to the start of the record list on new_page. */
ibool
rec_t*
page_copy_rec_list_end(
/*===================*/
/* out: TRUE on success */
/* out: pointer to the original
successor of the infimum record
on new_page, or NULL on zip overflow
(new_page will be decompressed
from new_page_zip) */
page_t* new_page, /* in/out: index page to copy to */
page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */
rec_t* rec, /* in: record on page */
......@@ -670,12 +674,14 @@ Copies records from page to new_page, up to the given record, NOT
including that record. Infimum and supremum records are not copied.
The records are copied to the end of the record list on new_page. */
ibool
rec_t*
page_copy_rec_list_start(
/*=====================*/
/* out: TRUE on success; FALSE on
compression failure (new_page will
be decompressed from new_page_zip) */
/* out: pointer to the original
predecessor of the supremum record
on new_page, or NULL on zip overflow
(new_page will be decompressed
from new_page_zip) */
page_t* new_page, /* in/out: index page to copy to */
page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */
rec_t* rec, /* in: record on page */
......
......@@ -573,12 +573,14 @@ Copies records from page to new_page, from a given record onward,
including that record. Infimum and supremum records are not copied.
The records are copied to the start of the record list on new_page. */
ibool
rec_t*
page_copy_rec_list_end(
/*===================*/
/* out: TRUE on success; FALSE on
compression failure (new_page will
be decompressed from new_page_zip) */
/* out: pointer to the original
successor of the infimum record
on new_page, or NULL on zip overflow
(new_page will be decompressed
from new_page_zip) */
page_t* new_page, /* in/out: index page to copy to */
page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */
rec_t* rec, /* in: record on page */
......@@ -586,6 +588,7 @@ page_copy_rec_list_end(
mtr_t* mtr) /* in: mtr */
{
page_t* page = ut_align_down(rec, UNIV_PAGE_SIZE);
rec_t* ret = page_rec_get_next(page_get_infimum_rec(new_page));
ulint log_mode= 0; /* remove warning */
/* page_zip_validate() will fail here if btr_compress()
......@@ -608,15 +611,29 @@ page_copy_rec_list_end(
mtr_set_log_mode(mtr, log_mode);
if (UNIV_UNLIKELY(!page_zip_compress(
new_page_zip, new_page, index, mtr))
&& UNIV_UNLIKELY(!page_zip_reorganize(
new_page_zip, new_page, index, mtr))) {
/* Before trying to reorganize the page,
store the number of preceding records on the page. */
ulint ret_pos
= page_rec_get_n_recs_before(ret);
if (UNIV_UNLIKELY(!page_zip_reorganize(
new_page_zip, new_page, index, mtr))) {
if (UNIV_UNLIKELY(!page_zip_decompress(
new_page_zip, new_page))) {
ut_error;
}
return(NULL);
} else {
/* The page was reorganized:
Seek to ret_pos. */
ret = new_page + PAGE_NEW_INFIMUM;
if (UNIV_UNLIKELY(!page_zip_decompress(
new_page_zip, new_page))) {
ut_error;
do {
ret = rec_get_next_ptr(ret, TRUE);
} while (--ret_pos);
}
return(FALSE);
}
}
......@@ -629,7 +646,7 @@ page_copy_rec_list_end(
btr_search_move_or_delete_hash_entries(new_page, page, index);
return(TRUE);
return(ret);
}
/*****************************************************************
......@@ -637,12 +654,14 @@ Copies records from page to new_page, up to the given record,
NOT including that record. Infimum and supremum records are not copied.
The records are copied to the end of the record list on new_page. */
ibool
rec_t*
page_copy_rec_list_start(
/*=====================*/
/* out: TRUE on success; FALSE on
compression failure (new_page will
be decompressed from new_page_zip) */
/* out: pointer to the original
predecessor of the supremum record
on new_page, or NULL on zip overflow
(new_page will be decompressed
from new_page_zip) */
page_t* new_page, /* in/out: index page to copy to */
page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */
rec_t* rec, /* in: record on page */
......@@ -655,13 +674,15 @@ page_copy_rec_list_start(
rec_t* old_end;
ulint log_mode = 0 /* remove warning */;
mem_heap_t* heap = NULL;
rec_t* ret = page_rec_get_prev(
page_get_supremum_rec(new_page));
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
*offsets_ = (sizeof offsets_) / sizeof *offsets_;
if (page_rec_is_infimum(rec)) {
return(TRUE);
return(ret);
}
if (UNIV_LIKELY_NULL(new_page_zip)) {
......@@ -700,16 +721,30 @@ page_copy_rec_list_start(
mtr_set_log_mode(mtr, log_mode);
if (UNIV_UNLIKELY(!page_zip_compress(
new_page_zip, new_page, index, mtr))
&& UNIV_UNLIKELY(!page_zip_reorganize(
new_page_zip, new_page, index, mtr))) {
/* Before trying to reorganize the page,
store the number of preceding records on the page. */
ulint ret_pos
= page_rec_get_n_recs_before(ret);
if (UNIV_UNLIKELY(!page_zip_decompress(
new_page_zip, new_page))) {
ut_error;
}
if (UNIV_UNLIKELY(!page_zip_reorganize(
new_page_zip, new_page, index, mtr))) {
if (UNIV_UNLIKELY(!page_zip_decompress(
new_page_zip, new_page))) {
ut_error;
}
return(FALSE);
return(NULL);
} else {
/* The page was reorganized:
Seek to ret_pos. */
ret = new_page + PAGE_NEW_INFIMUM;
do {
ret = rec_get_next_ptr(ret, TRUE);
} while (--ret_pos);
}
}
}
......@@ -722,7 +757,7 @@ page_copy_rec_list_start(
btr_search_move_or_delete_hash_entries(new_page, page, index);
return(TRUE);
return(ret);
}
/**************************************************************
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment