Commit 2c3b2247 authored by marko's avatar marko

branches/zip: Shorten the log record MLOG_ZIP_PAGE_COMPRESS.

page_zip_copy(), page_zip_compress_write_log(): Add parameter 'index'.

page_zip_parse_write_header(): Check for !page_zip only if page != NULL.

page_zip_compress_write_log(), page_zip_parse_compress(): Omit some
fields in the page header.  Omit the unused bytes between the modification
log and the page trailer.

parse_or_apply_log_rec_body(): Remove a bogus debug assertion.
parent 64678b8b
......@@ -1099,7 +1099,7 @@ btr_root_raise_and_insert(
/* Copy the page byte for byte. */
page_zip_copy(new_page_zip, new_page,
root_page_zip, root, mtr);
root_page_zip, root, cursor->index, mtr);
}
/* If this is a pessimistic insert which is actually done to
......@@ -1809,7 +1809,7 @@ func_start:
ut_a(new_page_zip);
page_zip_copy(new_page_zip, new_page,
page_zip, page, mtr);
page_zip, page, cursor->index, mtr);
page_delete_rec_list_end(move_limit - page
+ new_page, cursor->index,
ULINT_UNDEFINED, ULINT_UNDEFINED,
......@@ -1837,7 +1837,7 @@ func_start:
ut_a(new_page_zip);
page_zip_copy(new_page_zip, new_page,
page_zip, page, mtr);
page_zip, page, cursor->index, mtr);
page_delete_rec_list_start(move_limit - page
+ new_page, cursor->index,
new_page_zip, mtr);
......@@ -2147,7 +2147,7 @@ btr_lift_page_up(
/* Copy the page byte for byte. */
page_zip_copy(father_page_zip, father_page,
buf_block_get_page_zip(buf_block_align(page)),
page, mtr);
page, index, mtr);
}
lock_update_copy_and_discard(father_page, page);
......
......@@ -277,6 +277,7 @@ page_zip_copy(
page_t* page, /* out: copy of src */
const page_zip_des_t* src_zip, /* in: compressed page */
const page_t* src, /* in: page */
dict_index_t* index, /* in: index of the B-tree */
mtr_t* mtr) /* in: mini-transaction */
__attribute__((nonnull(1,2,3,4)));
......@@ -288,6 +289,7 @@ page_zip_compress_write_log(
/*========================*/
const page_zip_des_t* page_zip,/* in: compressed page */
const page_t* page, /* in: uncompressed page */
dict_index_t* index, /* in: index of the B-tree node */
mtr_t* mtr) /* in: mini-transaction */
__attribute__((nonnull));
......
......@@ -174,7 +174,8 @@ page_zip_alloc(
if (page_zip_compress(page_zip, page, index)) {
if (mtr) {
page_zip_compress_write_log(page_zip, page, mtr);
page_zip_compress_write_log(
page_zip, page, index, mtr);
}
} else {
/* Unable to compress the page */
......
......@@ -913,7 +913,6 @@ recv_parse_or_apply_log_rec_body(
recv_sys->found_corrupt_log = TRUE;
}
ut_ad(!page || ptr);
if (index) {
dict_table_t* table = index->table;
......
......@@ -1122,7 +1122,7 @@ use_heap:
}
/* 9. Write log record of compressing the page. */
page_zip_compress_write_log(page_zip_orig, page, mtr);
page_zip_compress_write_log(page_zip_orig, page, index, mtr);
return(insert_rec);
}
......
......@@ -496,7 +496,7 @@ page_create_zip(
ut_error;
}
page_zip_compress_write_log(page_zip, frame, mtr);
page_zip_compress_write_log(page_zip, frame, index, mtr);
return(frame);
}
......@@ -618,7 +618,8 @@ page_copy_rec_list_end(
return(FALSE);
}
page_zip_compress_write_log(new_page_zip, new_page, mtr);
page_zip_compress_write_log(new_page_zip,
new_page, index, mtr);
}
/* Update the lock table, MAX_TRX_ID, and possible hash index */
......@@ -711,7 +712,8 @@ page_copy_rec_list_start(
return(FALSE);
}
page_zip_compress_write_log(new_page_zip, new_page, mtr);
page_zip_compress_write_log(new_page_zip,
new_page, index, mtr);
}
/* Update MAX_TRX_ID, the lock table, and possible hash index */
......
......@@ -2855,6 +2855,7 @@ page_zip_parse_write_header(
ulint offset;
ulint len;
ut_ad(ptr && end_ptr);
ut_ad(!page == !page_zip);
if (UNIV_UNLIKELY(end_ptr < ptr + (1 + 1))) {
......@@ -2865,8 +2866,8 @@ page_zip_parse_write_header(
offset = (ulint) *ptr++;
len = (ulint) *ptr++;
if (UNIV_UNLIKELY(!len) || UNIV_UNLIKELY(offset + len >= PAGE_DATA)
|| UNIV_UNLIKELY(!page_zip)) {
if (UNIV_UNLIKELY(!len) || UNIV_UNLIKELY(offset + len >= PAGE_DATA)) {
corrupt:
recv_sys->found_corrupt_log = TRUE;
return(NULL);
......@@ -2878,6 +2879,10 @@ page_zip_parse_write_header(
}
if (page) {
if (UNIV_UNLIKELY(!page_zip)) {
goto corrupt;
}
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
......@@ -2938,6 +2943,7 @@ page_zip_copy(
page_t* page, /* out: copy of src */
const page_zip_des_t* src_zip, /* in: compressed page */
const page_t* src, /* in: page */
dict_index_t* index, /* in: index of the B-tree */
mtr_t* mtr) /* in: mini-transaction */
{
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
......@@ -2977,7 +2983,7 @@ page_zip_copy(
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
page_zip_compress_write_log(page_zip, page, mtr);
page_zip_compress_write_log(page_zip, page, index, mtr);
}
/**************************************************************************
......@@ -2988,28 +2994,63 @@ page_zip_compress_write_log(
/*========================*/
const page_zip_des_t* page_zip,/* in: compressed page */
const page_t* page, /* in: uncompressed page */
dict_index_t* index, /* in: index of the B-tree node */
mtr_t* mtr) /* in: mini-transaction */
{
byte* log_ptr;
ulint trailer_size;
ut_ad(page_zip_validate(page_zip, page));
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
log_ptr = mlog_open(mtr, 11 + 2);
log_ptr = mlog_open(mtr, 11 + 2 + 2);
if (!log_ptr) {
return;
}
/* Read the number of user records.
Subtract 2 for the infimum and supremum records. */
trailer_size = page_dir_get_n_heap(page_zip->data) - 2;
/* Multiply by uncompressed of size stored per record */
if (page_is_leaf(page)) {
if (dict_index_is_clust(index)) {
trailer_size *= PAGE_ZIP_DIR_SLOT_SIZE
+ DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
} else {
trailer_size *= PAGE_ZIP_DIR_SLOT_SIZE;
}
} else {
trailer_size *= PAGE_ZIP_DIR_SLOT_SIZE + REC_NODE_PTR_SIZE;
}
/* Add the space occupied by BLOB pointers. */
trailer_size += page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
ut_a(page_zip->m_end > PAGE_DATA);
#if FIL_PAGE_DATA > PAGE_DATA
# error "FIL_PAGE_DATA > PAGE_DATA"
#endif
ut_a(page_zip->m_end + trailer_size <= page_zip->size);
log_ptr = mlog_write_initial_log_record_fast((page_t*) page,
MLOG_ZIP_PAGE_COMPRESS, log_ptr, mtr);
mach_write_to_2(log_ptr, page_zip->size);
mach_write_to_2(log_ptr, page_zip->m_end - FIL_PAGE_TYPE);
log_ptr += 2;
mach_write_to_2(log_ptr, trailer_size);
log_ptr += 2;
mlog_close(mtr, log_ptr);
/* TODO: omit the unused bytes at page_zip->m_end */
/* TODO: omit some bytes at page header */
mlog_catenate_string(mtr, page_zip->data, page_zip->size);
/* Write FIL_PAGE_PREV and FIL_PAGE_NEXT */
mlog_catenate_string(mtr, page_zip->data + FIL_PAGE_PREV, 4);
mlog_catenate_string(mtr, page_zip->data + FIL_PAGE_NEXT, 4);
/* Write most of the page header, the compressed stream and
the modification log. */
mlog_catenate_string(mtr, page_zip->data + FIL_PAGE_TYPE,
page_zip->m_end - FIL_PAGE_TYPE);
/* Write the uncompressed trailer of the compressed page. */
mlog_catenate_string(mtr, page_zip->data + page_zip->size
- trailer_size, trailer_size);
}
/**************************************************************************
......@@ -3025,39 +3066,51 @@ page_zip_parse_compress(
page_zip_des_t* page_zip)/* out: compressed page */
{
ulint size;
ulint trailer_size;
ut_ad(ptr && end_ptr);
ut_ad(!page == !page_zip);
if (UNIV_UNLIKELY(ptr + 2 > end_ptr)) {
if (UNIV_UNLIKELY(ptr + (2 + 2) > end_ptr)) {
return(NULL);
}
size = mach_read_from_2(ptr);
ptr += 2;
trailer_size = mach_read_from_2(ptr);
ptr += 2;
if (UNIV_UNLIKELY(ptr + size > end_ptr)) {
if (UNIV_UNLIKELY(ptr + 8 + size + trailer_size > end_ptr)) {
return(NULL);
}
if (page) {
if (UNIV_UNLIKELY(!page_zip)
|| UNIV_UNLIKELY(page_zip->size != size)) {
|| UNIV_UNLIKELY(page_zip->size < size)) {
corrupt:
recv_sys->found_corrupt_log = TRUE;
return(NULL);
}
memcpy(page_zip->data, ptr, size);
memcpy(page_zip->data + FIL_PAGE_PREV, ptr, 4);
memcpy(page_zip->data + FIL_PAGE_NEXT, ptr + 4, 4);
memcpy(page_zip->data + FIL_PAGE_TYPE, ptr + 8, size);
memset(page_zip->data + FIL_PAGE_TYPE + size, 0,
page_zip->size - trailer_size
- (FIL_PAGE_TYPE + size));
memcpy(page_zip->data + page_zip->size - trailer_size,
ptr + 8 + size, trailer_size);
if (UNIV_UNLIKELY(!page_zip_decompress(page_zip, page))) {
goto corrupt;
}
}
return(ptr + size);
return(ptr + 8 + size + trailer_size);
}
/**************************************************************************
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment