Commit b8296570 authored by marko's avatar marko

branches/zip: Fix bugs triggered by running out of space.

btr_root_raise_and_insert(): If page_copy_rec_list_end() fails,
copy the pages byte for byte.

page_zip_compress(): Ensure that the uncompressed storage area will
fit on the compressed page.
parent 8099eba8
...@@ -1055,6 +1055,7 @@ btr_root_raise_and_insert( ...@@ -1055,6 +1055,7 @@ btr_root_raise_and_insert(
page_zip_des_t* new_page_zip; page_zip_des_t* new_page_zip;
root = btr_cur_get_page(cursor); root = btr_cur_get_page(cursor);
root_page_zip = buf_block_get_page_zip(buf_block_align(root));
tree = btr_cur_get_tree(cursor); tree = btr_cur_get_tree(cursor);
ut_ad(dict_tree_get_page(tree) == buf_frame_get_page_no(root)); ut_ad(dict_tree_get_page(tree) == buf_frame_get_page_no(root));
...@@ -1072,6 +1073,9 @@ btr_root_raise_and_insert( ...@@ -1072,6 +1073,9 @@ btr_root_raise_and_insert(
new_page = btr_page_alloc(tree, 0, FSP_NO_DIR, level, mtr); new_page = btr_page_alloc(tree, 0, FSP_NO_DIR, level, mtr);
new_page_zip = buf_block_get_page_zip(buf_block_align(new_page)); new_page_zip = buf_block_get_page_zip(buf_block_align(new_page));
ut_a(!new_page_zip == !root_page_zip);
ut_a(!new_page_zip || new_page_zip->size == root_page_zip->size);
ut_a(!root_page_zip || !root_page_zip->n_blobs);
btr_page_create(new_page, new_page_zip, tree, level, mtr); btr_page_create(new_page, new_page_zip, tree, level, mtr);
...@@ -1079,14 +1083,20 @@ btr_root_raise_and_insert( ...@@ -1079,14 +1083,20 @@ btr_root_raise_and_insert(
btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr); btr_page_set_next(new_page, new_page_zip, FIL_NULL, mtr);
btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr); btr_page_set_prev(new_page, new_page_zip, FIL_NULL, mtr);
/* Move the records from root to the new page */ /* Copy the records from root to the new page one by one. */
if (UNIV_UNLIKELY(!page_copy_rec_list_end(new_page, new_page_zip, if (UNIV_UNLIKELY(!page_copy_rec_list_end(new_page, new_page_zip,
page_get_infimum_rec(root), cursor->index, mtr))) { page_get_infimum_rec(root), cursor->index, mtr))) {
/* This should always succeed, as new_page ut_a(new_page_zip);
is created from the scratch and receives
the records in sorted order. */ /* Copy the pages byte for byte. This will succeed. */
ut_error; buf_frame_copy(new_page, root);
memcpy(new_page_zip->data, root_page_zip->data,
new_page_zip->size);
new_page_zip->n_blobs = 0;
new_page_zip->m_start = root_page_zip->m_start;
new_page_zip->m_end = root_page_zip->m_end;
page_zip_compress_write_log(new_page_zip, new_page, mtr);
} }
/* If this is a pessimistic insert which is actually done to /* If this is a pessimistic insert which is actually done to
...@@ -1114,7 +1124,6 @@ btr_root_raise_and_insert( ...@@ -1114,7 +1124,6 @@ btr_root_raise_and_insert(
| REC_INFO_MIN_REC_FLAG); | REC_INFO_MIN_REC_FLAG);
/* Rebuild the root page to get free space */ /* Rebuild the root page to get free space */
root_page_zip = buf_block_get_page_zip(buf_block_align(root));
if (UNIV_LIKELY_NULL(root_page_zip)) { if (UNIV_LIKELY_NULL(root_page_zip)) {
page_create_zip(root, root_page_zip, cursor->index, page_create_zip(root, root_page_zip, cursor->index,
level + 1, mtr); level + 1, mtr);
......
...@@ -560,6 +560,7 @@ page_zip_compress( ...@@ -560,6 +560,7 @@ page_zip_compress(
byte* buf; /* compressed payload of the page */ byte* buf; /* compressed payload of the page */
byte* buf_end;/* end of buf */ byte* buf_end;/* end of buf */
ulint n_dense; ulint n_dense;
ulint slot_size;/* amount of uncompressed bytes per record */
const rec_t** recs; /* dense page directory, sorted by address */ const rec_t** recs; /* dense page directory, sorted by address */
mem_heap_t* heap; mem_heap_t* heap;
ulint trx_id_col; ulint trx_id_col;
...@@ -641,8 +642,8 @@ page_zip_compress( ...@@ -641,8 +642,8 @@ page_zip_compress(
ut_ad(trx_id_col > 0); ut_ad(trx_id_col > 0);
ut_ad(trx_id_col != ULINT_UNDEFINED); ut_ad(trx_id_col != ULINT_UNDEFINED);
c_stream.avail_out -= n_dense * (PAGE_ZIP_DIR_SLOT_SIZE slot_size = PAGE_ZIP_DIR_SLOT_SIZE
+ DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
} else { } else {
/* Signal the absence of trx_id /* Signal the absence of trx_id
in page_zip_fields_encode() */ in page_zip_fields_encode() */
...@@ -650,14 +651,18 @@ page_zip_compress( ...@@ -650,14 +651,18 @@ page_zip_compress(
index, DATA_TRX_ID) index, DATA_TRX_ID)
== ULINT_UNDEFINED); == ULINT_UNDEFINED);
trx_id_col = 0; trx_id_col = 0;
c_stream.avail_out -= n_dense * PAGE_ZIP_DIR_SLOT_SIZE; slot_size = PAGE_ZIP_DIR_SLOT_SIZE;
} }
} else { } else {
c_stream.avail_out -= n_dense * (PAGE_ZIP_DIR_SLOT_SIZE slot_size = PAGE_ZIP_DIR_SLOT_SIZE + REC_NODE_PTR_SIZE;
+ REC_NODE_PTR_SIZE);
trx_id_col = ULINT_UNDEFINED; trx_id_col = ULINT_UNDEFINED;
} }
if (UNIV_UNLIKELY(c_stream.avail_out < n_dense * slot_size)) {
goto zlib_error;
}
c_stream.avail_out -= n_dense * slot_size;
c_stream.avail_in = page_zip_fields_encode( c_stream.avail_in = page_zip_fields_encode(
n_fields, index, trx_id_col, fields); n_fields, index, trx_id_col, fields);
c_stream.next_in = fields; c_stream.next_in = fields;
...@@ -2951,6 +2956,7 @@ page_zip_compress_write_log( ...@@ -2951,6 +2956,7 @@ page_zip_compress_write_log(
mlog_close(mtr, log_ptr); mlog_close(mtr, log_ptr);
/* TODO: omit the unused bytes at page_zip->m_end */ /* TODO: omit the unused bytes at page_zip->m_end */
/* TODO: omit some bytes at page header */
mlog_catenate_string(mtr, page_zip->data, page_zip->size); mlog_catenate_string(mtr, page_zip->data, page_zip->size);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment