Commit 6b077aa8 authored by marko's avatar marko

branches/zip: Eliminate page corruption in btr_compress() when

page_zip_reorganize() was invoked.

btr_compress(): Obtain orig_pred and orig_succ after copying the records.
Add a debug assertion about FIL_PAGE_PREV.

page_copy_rec_list_end(), page_copy_rec_list_start(): Change the return
type from ibool to rec_t.  Adjust the return value after invoking
page_zip_reorganize().
parent 0c0e7b92
...@@ -2312,12 +2312,12 @@ btr_compress( ...@@ -2312,12 +2312,12 @@ btr_compress(
/* Move records to the merge page */ /* Move records to the merge page */
if (is_left) { if (is_left) {
rec_t* orig_pred = page_rec_get_prev( rec_t* orig_pred = page_copy_rec_list_start(
page_get_supremum_rec(merge_page));
if (UNIV_UNLIKELY(!page_copy_rec_list_start(
merge_page, merge_page_zip, merge_page, merge_page_zip,
page_get_supremum_rec(page), page_get_supremum_rec(page),
cursor->index, mtr))) { cursor->index, mtr);
if (UNIV_UNLIKELY(!orig_pred)) {
return(FALSE); return(FALSE);
} }
...@@ -2331,8 +2331,7 @@ btr_compress( ...@@ -2331,8 +2331,7 @@ btr_compress(
} else { } else {
mem_heap_t* heap = NULL; mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE]; ulint offsets_[REC_OFFS_NORMAL_SIZE];
rec_t* orig_succ = page_rec_get_next( rec_t* orig_succ;
page_get_infimum_rec(merge_page));
#ifdef UNIV_BTR_DEBUG #ifdef UNIV_BTR_DEBUG
byte fil_page_prev[4]; byte fil_page_prev[4];
#endif /* UNIV_BTR_DEBUG */ #endif /* UNIV_BTR_DEBUG */
...@@ -2352,12 +2351,16 @@ btr_compress( ...@@ -2352,12 +2351,16 @@ btr_compress(
memset(merge_page + FIL_PAGE_PREV, 0xff, 4); memset(merge_page + FIL_PAGE_PREV, 0xff, 4);
} }
if (UNIV_UNLIKELY(!page_copy_rec_list_end( orig_succ = page_copy_rec_list_end(
merge_page, merge_page_zip, merge_page, merge_page_zip,
page_get_infimum_rec(page), page_get_infimum_rec(page),
cursor->index, mtr))) { cursor->index, mtr);
if (UNIV_UNLIKELY(!orig_succ)) {
ut_a(merge_page_zip); ut_a(merge_page_zip);
/* FIL_PAGE_PREV was restored from merge_page_zip. */ /* FIL_PAGE_PREV was restored from merge_page_zip. */
ut_ad(!memcmp(fil_page_prev,
merge_page + FIL_PAGE_PREV, 4));
return(FALSE); return(FALSE);
} }
......
...@@ -655,10 +655,14 @@ Copies records from page to new_page, from the given record onward, ...@@ -655,10 +655,14 @@ Copies records from page to new_page, from the given record onward,
including that record. Infimum and supremum records are not copied. including that record. Infimum and supremum records are not copied.
The records are copied to the start of the record list on new_page. */ The records are copied to the start of the record list on new_page. */
ibool rec_t*
page_copy_rec_list_end( page_copy_rec_list_end(
/*===================*/ /*===================*/
/* out: TRUE on success */ /* out: pointer to the original
successor of the infimum record
on new_page, or NULL on zip overflow
(new_page will be decompressed
from new_page_zip) */
page_t* new_page, /* in/out: index page to copy to */ page_t* new_page, /* in/out: index page to copy to */
page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */ page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */
rec_t* rec, /* in: record on page */ rec_t* rec, /* in: record on page */
...@@ -670,12 +674,14 @@ Copies records from page to new_page, up to the given record, NOT ...@@ -670,12 +674,14 @@ Copies records from page to new_page, up to the given record, NOT
including that record. Infimum and supremum records are not copied. including that record. Infimum and supremum records are not copied.
The records are copied to the end of the record list on new_page. */ The records are copied to the end of the record list on new_page. */
ibool rec_t*
page_copy_rec_list_start( page_copy_rec_list_start(
/*=====================*/ /*=====================*/
/* out: TRUE on success; FALSE on /* out: pointer to the original
compression failure (new_page will predecessor of the supremum record
be decompressed from new_page_zip) */ on new_page, or NULL on zip overflow
(new_page will be decompressed
from new_page_zip) */
page_t* new_page, /* in/out: index page to copy to */ page_t* new_page, /* in/out: index page to copy to */
page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */ page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */
rec_t* rec, /* in: record on page */ rec_t* rec, /* in: record on page */
......
...@@ -573,12 +573,14 @@ Copies records from page to new_page, from a given record onward, ...@@ -573,12 +573,14 @@ Copies records from page to new_page, from a given record onward,
including that record. Infimum and supremum records are not copied. including that record. Infimum and supremum records are not copied.
The records are copied to the start of the record list on new_page. */ The records are copied to the start of the record list on new_page. */
ibool rec_t*
page_copy_rec_list_end( page_copy_rec_list_end(
/*===================*/ /*===================*/
/* out: TRUE on success; FALSE on /* out: pointer to the original
compression failure (new_page will successor of the infimum record
be decompressed from new_page_zip) */ on new_page, or NULL on zip overflow
(new_page will be decompressed
from new_page_zip) */
page_t* new_page, /* in/out: index page to copy to */ page_t* new_page, /* in/out: index page to copy to */
page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */ page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */
rec_t* rec, /* in: record on page */ rec_t* rec, /* in: record on page */
...@@ -586,6 +588,7 @@ page_copy_rec_list_end( ...@@ -586,6 +588,7 @@ page_copy_rec_list_end(
mtr_t* mtr) /* in: mtr */ mtr_t* mtr) /* in: mtr */
{ {
page_t* page = ut_align_down(rec, UNIV_PAGE_SIZE); page_t* page = ut_align_down(rec, UNIV_PAGE_SIZE);
rec_t* ret = page_rec_get_next(page_get_infimum_rec(new_page));
ulint log_mode= 0; /* remove warning */ ulint log_mode= 0; /* remove warning */
/* page_zip_validate() will fail here if btr_compress() /* page_zip_validate() will fail here if btr_compress()
...@@ -608,15 +611,29 @@ page_copy_rec_list_end( ...@@ -608,15 +611,29 @@ page_copy_rec_list_end(
mtr_set_log_mode(mtr, log_mode); mtr_set_log_mode(mtr, log_mode);
if (UNIV_UNLIKELY(!page_zip_compress( if (UNIV_UNLIKELY(!page_zip_compress(
new_page_zip, new_page, index, mtr)) new_page_zip, new_page, index, mtr))) {
&& UNIV_UNLIKELY(!page_zip_reorganize( /* Before trying to reorganize the page,
store the number of preceding records on the page. */
ulint ret_pos
= page_rec_get_n_recs_before(ret);
if (UNIV_UNLIKELY(!page_zip_reorganize(
new_page_zip, new_page, index, mtr))) { new_page_zip, new_page, index, mtr))) {
if (UNIV_UNLIKELY(!page_zip_decompress( if (UNIV_UNLIKELY(!page_zip_decompress(
new_page_zip, new_page))) { new_page_zip, new_page))) {
ut_error; ut_error;
} }
return(FALSE); return(NULL);
} else {
/* The page was reorganized:
Seek to ret_pos. */
ret = new_page + PAGE_NEW_INFIMUM;
do {
ret = rec_get_next_ptr(ret, TRUE);
} while (--ret_pos);
}
} }
} }
...@@ -629,7 +646,7 @@ page_copy_rec_list_end( ...@@ -629,7 +646,7 @@ page_copy_rec_list_end(
btr_search_move_or_delete_hash_entries(new_page, page, index); btr_search_move_or_delete_hash_entries(new_page, page, index);
return(TRUE); return(ret);
} }
/***************************************************************** /*****************************************************************
...@@ -637,12 +654,14 @@ Copies records from page to new_page, up to the given record, ...@@ -637,12 +654,14 @@ Copies records from page to new_page, up to the given record,
NOT including that record. Infimum and supremum records are not copied. NOT including that record. Infimum and supremum records are not copied.
The records are copied to the end of the record list on new_page. */ The records are copied to the end of the record list on new_page. */
ibool rec_t*
page_copy_rec_list_start( page_copy_rec_list_start(
/*=====================*/ /*=====================*/
/* out: TRUE on success; FALSE on /* out: pointer to the original
compression failure (new_page will predecessor of the supremum record
be decompressed from new_page_zip) */ on new_page, or NULL on zip overflow
(new_page will be decompressed
from new_page_zip) */
page_t* new_page, /* in/out: index page to copy to */ page_t* new_page, /* in/out: index page to copy to */
page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */ page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */
rec_t* rec, /* in: record on page */ rec_t* rec, /* in: record on page */
...@@ -655,13 +674,15 @@ page_copy_rec_list_start( ...@@ -655,13 +674,15 @@ page_copy_rec_list_start(
rec_t* old_end; rec_t* old_end;
ulint log_mode = 0 /* remove warning */; ulint log_mode = 0 /* remove warning */;
mem_heap_t* heap = NULL; mem_heap_t* heap = NULL;
rec_t* ret = page_rec_get_prev(
page_get_supremum_rec(new_page));
ulint offsets_[REC_OFFS_NORMAL_SIZE]; ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_; ulint* offsets = offsets_;
*offsets_ = (sizeof offsets_) / sizeof *offsets_; *offsets_ = (sizeof offsets_) / sizeof *offsets_;
if (page_rec_is_infimum(rec)) { if (page_rec_is_infimum(rec)) {
return(TRUE); return(ret);
} }
if (UNIV_LIKELY_NULL(new_page_zip)) { if (UNIV_LIKELY_NULL(new_page_zip)) {
...@@ -700,8 +721,13 @@ page_copy_rec_list_start( ...@@ -700,8 +721,13 @@ page_copy_rec_list_start(
mtr_set_log_mode(mtr, log_mode); mtr_set_log_mode(mtr, log_mode);
if (UNIV_UNLIKELY(!page_zip_compress( if (UNIV_UNLIKELY(!page_zip_compress(
new_page_zip, new_page, index, mtr)) new_page_zip, new_page, index, mtr))) {
&& UNIV_UNLIKELY(!page_zip_reorganize( /* Before trying to reorganize the page,
store the number of preceding records on the page. */
ulint ret_pos
= page_rec_get_n_recs_before(ret);
if (UNIV_UNLIKELY(!page_zip_reorganize(
new_page_zip, new_page, index, mtr))) { new_page_zip, new_page, index, mtr))) {
if (UNIV_UNLIKELY(!page_zip_decompress( if (UNIV_UNLIKELY(!page_zip_decompress(
...@@ -709,7 +735,16 @@ page_copy_rec_list_start( ...@@ -709,7 +735,16 @@ page_copy_rec_list_start(
ut_error; ut_error;
} }
return(FALSE); return(NULL);
} else {
/* The page was reorganized:
Seek to ret_pos. */
ret = new_page + PAGE_NEW_INFIMUM;
do {
ret = rec_get_next_ptr(ret, TRUE);
} while (--ret_pos);
}
} }
} }
...@@ -722,7 +757,7 @@ page_copy_rec_list_start( ...@@ -722,7 +757,7 @@ page_copy_rec_list_start(
btr_search_move_or_delete_hash_entries(new_page, page, index); btr_search_move_or_delete_hash_entries(new_page, page, index);
return(TRUE); return(ret);
} }
/************************************************************** /**************************************************************
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment