Commit f119108f authored by marko's avatar marko

branches/zip: Add the redo log type MLOG_ZIP_PAGE_COMPRESS. Remove

MLOG_ZIP_LIST_START_COPY and MLOG_ZIP_LIST_END_COPY.

btr_compress(): Simplify a debug assertion.

page_zip_compress_write_log(), page_zip_parse_compress(): New functions.

page_cur_parse_insert_rec(): Simplify the code.

page_parse_create_zip(): Removed.

page_create_zip(), page_copy_rec_list_end(),
page_copy_rec_list_start(): Invoke page_zip_compress_write_log().
parent c65c50ae
......@@ -2214,8 +2214,7 @@ btr_compress(
max_ins_size = page_get_max_insert_size(merge_page, n_recs);
ut_ad(page_validate(merge_page, cursor->index));
ut_ad(page_get_max_insert_size(merge_page, n_recs)
== max_ins_size_reorg);
ut_ad(max_ins_size == max_ins_size_reorg);
if (UNIV_UNLIKELY(data_size > max_ins_size)) {
......
......@@ -137,13 +137,8 @@ flag value must give the length also! */
on a compressed page */
#define MLOG_ZIP_WRITE_HEADER ((byte)49) /* write to compressed page
header */
#define MLOG_ZIP_PAGE_CREATE ((byte)50) /* create a compressed
index page */
#define MLOG_ZIP_LIST_START_COPY ((byte)51) /* copy compact record list
start to a compressed page */
#define MLOG_ZIP_LIST_END_COPY ((byte)52) /* copy compact record list
end to a compressed page */
#define MLOG_BIGGEST_TYPE ((byte)52) /* biggest value (used in
#define MLOG_ZIP_PAGE_COMPRESS ((byte)50) /* compress an index page */
#define MLOG_BIGGEST_TYPE ((byte)50) /* biggest value (used in
asserts) */
/*******************************************************************
......
......@@ -178,7 +178,7 @@ including that record. Infimum and supremum records are not copied. */
void
page_copy_rec_list_end_to_created_page(
/*===================================*/
page_t* new_page, /* in: index page to copy to */
page_t* new_page, /* in/out: index page to copy to */
rec_t* rec, /* in: first record to copy */
dict_index_t* index, /* in: record descriptor */
mtr_t* mtr); /* in: mtr */
......
......@@ -266,6 +266,30 @@ page_zip_write_header(
mtr_t* mtr) /* in: mini-transaction, or NULL */
__attribute__((nonnull(1,2)));
/**************************************************************************
Write a log record of compressing an index page. */
void
page_zip_compress_write_log(
/*========================*/
const page_zip_des_t* page_zip,/* in: compressed page */
const page_t* page, /* in: uncompressed page */
mtr_t* mtr) /* in: mini-transaction */
__attribute__((nonnull));
/**************************************************************************
Parses a log record of compressing an index page. */
byte*
page_zip_parse_compress(
/*====================*/
/* out: end of log record or NULL */
byte* ptr, /* in: buffer */
byte* end_ptr,/* in: buffer end */
page_t* page, /* out: uncompressed page */
page_zip_des_t* page_zip)/* out: compressed page */
__attribute__((nonnull(1,2)));
#ifdef UNIV_MATERIALIZE
# undef UNIV_INLINE
# define UNIV_INLINE UNIV_INLINE_ORIGINAL
......
......@@ -905,16 +905,9 @@ recv_parse_or_apply_log_rec_body(
ptr = page_zip_parse_write_header(
ptr, end_ptr, page, page_zip);
break;
case MLOG_ZIP_PAGE_CREATE:
if (NULL != (ptr = mlog_parse_index(
ptr, end_ptr, TRUE, &index))) {
ptr = page_parse_create_zip(ptr, end_ptr,
page, page_zip, index, mtr);
}
break;
case MLOG_ZIP_LIST_START_COPY:
case MLOG_ZIP_LIST_END_COPY:
ut_error; /* TODO */
case MLOG_ZIP_PAGE_COMPRESS:
ptr = page_zip_parse_compress(
ptr, end_ptr, page, page_zip);
break;
default:
ptr = NULL;
......
......@@ -705,7 +705,6 @@ page_cur_parse_insert_rec(
page_zip_des_t* page_zip,/* in/out: compressed page, or NULL */
mtr_t* mtr) /* in: mtr or NULL */
{
ulint offset = 0; /* remove warning */
ulint origin_offset;
ulint end_seg_len;
ulint mismatch_index;
......@@ -720,24 +719,29 @@ page_cur_parse_insert_rec(
ulint* offsets = offsets_;
*offsets_ = (sizeof offsets_) / sizeof *offsets_;
if (!is_short) {
if (is_short) {
cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
} else {
ulint offset;
/* Read the cursor rec offset as a 2-byte ulint */
if (end_ptr < ptr + 2) {
if (UNIV_UNLIKELY(end_ptr < ptr + 2)) {
return(NULL);
}
offset = mach_read_from_2(ptr);
ptr += 2;
cursor_rec = page + offset;
if (offset >= UNIV_PAGE_SIZE) {
if (UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)) {
recv_sys->found_corrupt_log = TRUE;
return(NULL);
}
ptr += 2;
}
ptr = mach_parse_compressed(ptr, end_ptr, &end_seg_len);
......@@ -783,7 +787,7 @@ page_cur_parse_insert_rec(
ut_a(mismatch_index < UNIV_PAGE_SIZE);
}
if (end_ptr < ptr + (end_seg_len >> 1)) {
if (UNIV_UNLIKELY(end_ptr < ptr + (end_seg_len >> 1))) {
return(NULL);
}
......@@ -799,12 +803,6 @@ page_cur_parse_insert_rec(
/* Read from the log the inserted index record end segment which
differs from the cursor record */
if (is_short) {
cursor_rec = page_rec_get_prev(page_get_supremum_rec(page));
} else {
cursor_rec = page + offset;
}
offsets = rec_get_offsets(cursor_rec, index, offsets,
ULINT_UNDEFINED, &heap);
......@@ -832,7 +830,7 @@ page_cur_parse_insert_rec(
"mismatch index %lu, end_seg_len %lu\n"
"parsed len %lu\n",
(ulong) is_short, (ulong) info_and_status_bits,
(ulong) offset,
(ulong) ut_align_offset(cursor_rec, UNIV_PAGE_SIZE),
(ulong) origin_offset,
(ulong) mismatch_index, (ulong) end_seg_len,
(ulong) (ptr - ptr2));
......@@ -1203,7 +1201,7 @@ including that record. Infimum and supremum records are not copied. */
void
page_copy_rec_list_end_to_created_page(
/*===================================*/
page_t* new_page, /* in: index page to copy to */
page_t* new_page, /* in/out: index page to copy to */
rec_t* rec, /* in: first record to copy */
dict_index_t* index, /* in: record descriptor */
mtr_t* mtr) /* in: mtr */
......
......@@ -482,40 +482,6 @@ page_create(
return(page_create_low(frame, comp));
}
/***************************************************************
Parses a redo log record of creating a compressed page. */
byte*
page_parse_create_zip(
/*==================*/
/* out: end of log record or NULL */
byte* ptr, /* in: buffer */
byte* end_ptr,/* in: buffer end */
page_t* page, /* in/out: page or NULL */
page_zip_des_t* page_zip,/* in/out: compressed page or NULL */
dict_index_t* index, /* in: index of the page */
mtr_t* mtr) /* in: mtr or NULL */
{
ulint level;
ut_ad(ptr && end_ptr && index);
if (UNIV_UNLIKELY(ptr + 2 > end_ptr)) {
return(NULL);
}
level = mach_read_from_2(ptr);
ptr += 2;
if (page) {
page_create_zip(page, page_zip, index, level, mtr);
}
return(ptr);
}
/**************************************************************
Create a compressed B-tree index page. */
......@@ -530,22 +496,11 @@ page_create_zip(
ulint level, /* in: the B-tree level of the page */
mtr_t* mtr) /* in: mini-transaction handle */
{
byte* log_ptr;
ut_ad(frame && page_zip && index);
ut_ad(dict_table_is_comp(index->table));
log_ptr = mlog_open(mtr, 11 + 2);
if (log_ptr) {
log_ptr = mlog_write_initial_log_record_fast(frame,
MLOG_ZIP_PAGE_CREATE, log_ptr, mtr);
mach_write_to_2(log_ptr, level);
log_ptr += 2;
mlog_close(mtr, log_ptr);
}
mach_write_to_2(frame + PAGE_HEADER + PAGE_LEVEL, level);
page_create_low(frame, TRUE);
mach_write_to_2(frame + PAGE_HEADER + PAGE_LEVEL, level);
if (UNIV_UNLIKELY(!page_zip_compress(page_zip, frame, index))) {
/* The compression of a newly created page
......@@ -553,6 +508,8 @@ page_create_zip(
ut_error;
}
page_zip_compress_write_log(page_zip, frame, mtr);
return(frame);
}
......@@ -643,19 +600,26 @@ page_copy_rec_list_end(
mtr_t* mtr) /* in: mtr */
{
page_t* page;
ulint log_mode = 0; /* remove warning */
ut_ad(!new_page_zip || page_zip_validate(new_page_zip, new_page));
if (UNIV_LIKELY_NULL(new_page_zip)) {
log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
}
if (page_dir_get_n_heap(new_page) == 2) {
page_copy_rec_list_end_to_created_page(
new_page, rec, index, mtr);
} else {
ut_ad(!new_page_zip
|| page_zip_validate(new_page_zip, new_page));
page_copy_rec_list_end_no_locks(new_page, rec, index, mtr);
}
page = ut_align_down(rec, UNIV_PAGE_SIZE);
if (UNIV_LIKELY_NULL(new_page_zip)) {
mtr_set_log_mode(mtr, log_mode);
if (UNIV_UNLIKELY(!page_zip_compress(new_page_zip,
new_page, index))) {
......@@ -665,6 +629,8 @@ page_copy_rec_list_end(
}
return(FALSE);
}
page_zip_compress_write_log(new_page_zip, new_page, mtr);
}
/* Update the lock table, MAX_TRX_ID, and possible hash index */
......@@ -700,6 +666,7 @@ page_copy_rec_list_start(
page_cur_t cur2;
page_t* page;
rec_t* old_end;
ulint log_mode = 0 /* remove warning */;
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
......@@ -710,6 +677,10 @@ page_copy_rec_list_start(
return(TRUE);
}
if (UNIV_LIKELY_NULL(new_page_zip)) {
log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
}
page = ut_align_down(rec, UNIV_PAGE_SIZE);
page_cur_set_before_first(page, &cur1);
......@@ -739,6 +710,8 @@ page_copy_rec_list_start(
}
if (UNIV_LIKELY_NULL(new_page_zip)) {
mtr_set_log_mode(mtr, log_mode);
if (UNIV_UNLIKELY(!page_zip_compress(new_page_zip,
new_page, index))) {
......@@ -749,6 +722,8 @@ page_copy_rec_list_start(
/* TODO: try btr_page_reorganize() */
return(FALSE);
}
page_zip_compress_write_log(new_page_zip, new_page, mtr);
}
/* Update MAX_TRX_ID, the lock table, and possible hash index */
......
......@@ -2849,3 +2849,82 @@ page_zip_write_header_log(
mlog_catenate_string(mtr, page_zip->data + offset, length);
}
/**************************************************************************
Write a log record of compressing an index page. */
void
page_zip_compress_write_log(
/*========================*/
const page_zip_des_t* page_zip,/* in: compressed page */
const page_t* page, /* in: uncompressed page */
mtr_t* mtr) /* in: mini-transaction */
{
byte* log_ptr;
ut_ad(page_zip_validate(page_zip, page));
log_ptr = mlog_open(mtr, 11 + 2);
if (!log_ptr) {
return;
}
log_ptr = mlog_write_initial_log_record_fast((page_t*) page,
MLOG_ZIP_PAGE_COMPRESS, log_ptr, mtr);
mach_write_to_2(log_ptr, page_zip->size);
log_ptr += 2;
mlog_close(mtr, log_ptr);
/* TODO: omit the unused bytes at page_zip->m_end */
mlog_catenate_string(mtr, page_zip->data, page_zip->size);
}
/**************************************************************************
Parses a log record of compressing an index page. */
byte*
page_zip_parse_compress(
/*====================*/
/* out: end of log record or NULL */
byte* ptr, /* in: buffer */
byte* end_ptr,/* in: buffer end */
page_t* page, /* out: uncompressed page */
page_zip_des_t* page_zip)/* out: compressed page */
{
ulint size;
ut_ad(ptr && end_ptr);
if (UNIV_UNLIKELY(ptr + 2 > end_ptr)) {
return(NULL);
}
size = mach_read_from_2(ptr);
ptr += 2;
if (UNIV_UNLIKELY(ptr + size > end_ptr)) {
return(NULL);
}
if (page) {
if (UNIV_UNLIKELY(!page_zip)
|| UNIV_UNLIKELY(page_zip->size != size)) {
corrupt:
recv_sys->found_corrupt_log = TRUE;
return(NULL);
}
memcpy(page_zip->data, ptr, size);
if (UNIV_UNLIKELY(!page_zip_decompress(page_zip, page))) {
goto corrupt;
}
}
return(ptr + size);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment