Commit f3230111 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-12353: Introduce MLOG_ZIP_WRITE_STRING

Log the low-level operations for ROW_FORMAT=COMPRESSED index pages
using a new record, MLOG_ZIP_WRITE_STRING. We will still use
MLOG_1BYTE,..., MLOG_8BYTES or MLOG_WRITE_STRING for operations
on other than index pages (such as the page allocation bitmap pages).

We will stop writing the record MLOG_ZIP_PAGE_COMPRESS later, after
replacing all MLOG_REC_ and MLOG_COMP_REC_ that update index pages.
parent db5cdc31
......@@ -120,6 +120,9 @@ enum mlog_id_t {
/** initialize an ibuf bitmap page (used in MariaDB 10.2 and 10.3) */
MLOG_IBUF_BITMAP_INIT = 27,
/** MDEV-12353 WIP: write to a ROW_FORMAT=COMPRESSED page */
MLOG_ZIP_WRITE_STRING = 29,
/** write a string to a page */
MLOG_WRITE_STRING = 30,
......
......@@ -259,6 +259,7 @@ page_zip_write_rec(
/***********************************************************//**
Parses a log record of writing a BLOB pointer of a record.
@return end of log record or NULL */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_zip_parse_write_blob_ptr(
/*==========================*/
......@@ -285,6 +286,7 @@ page_zip_write_blob_ptr(
/***********************************************************//**
Parses a log record of writing the node pointer of a record.
@return end of log record or NULL */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_zip_parse_write_node_ptr(
/*==========================*/
......@@ -331,6 +333,7 @@ page_zip_write_trx_id_and_roll_ptr(
@param[in,out] page_zip compressed page
@return end of log record
@retval NULL if the log record is incomplete */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_zip_parse_write_trx_id(
const byte* ptr,
......@@ -398,6 +401,7 @@ page_zip_dir_add_slot(
/***********************************************************//**
Parses a log record of writing to the header of a page.
@return end of log record or NULL */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_zip_parse_write_header(
/*========================*/
......@@ -448,6 +452,7 @@ page_zip_copy_recs(
@param[in,out] block ROW_FORMAT=COMPRESSED block, or NULL for parsing only
@return end of log record
@retval NULL if the log record is incomplete */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte* page_zip_parse_compress(const byte* ptr, const byte* end_ptr,
buf_block_t* block);
......
......@@ -319,15 +319,6 @@ page_zip_des_init(
memset(page_zip, 0, sizeof *page_zip);
}
/**********************************************************************//**
Write a log record of writing to the uncompressed header portion of a page. */
void
page_zip_write_header_log(
/*======================*/
const byte* data,/*!< in: data on the uncompressed page */
ulint length, /*!< in: length of the data */
mtr_t* mtr); /*!< in: mini-transaction */
/**********************************************************************//**
Write data to the uncompressed header portion of a page. The data must
already have been written to the uncompressed page.
......@@ -351,14 +342,21 @@ page_zip_write_header(
pos = page_offset(str);
ut_ad(pos < PAGE_DATA);
ut_ad(pos + length < PAGE_DATA);
memcpy(page_zip->data + pos, str, length);
/* The following would fail in page_cur_insert_rec_zip(). */
/* ut_ad(page_zip_validate(page_zip, str - pos)); */
if (mtr) {
page_zip_write_header_log(str, length, mtr);
if (!mtr) {
} else if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + length)) {
log_ptr = mlog_write_initial_log_record_fast(
str, MLOG_ZIP_WRITE_STRING, log_ptr, mtr);
mach_write_to_2(log_ptr, pos);
mach_write_to_2(log_ptr + 2, length);
memcpy(log_ptr + 4, str, length);
mlog_close(mtr, log_ptr + 4 + length);
}
}
......
......@@ -1737,6 +1737,40 @@ recv_parse_or_apply_log_rec_body(
contents can be ignored. We do not write or apply
this record yet. */
break;
case MLOG_ZIP_WRITE_STRING:
ut_ad(!page_zip
|| !fil_page_get_type(page_zip->data)
|| fil_page_get_type(page_zip->data) == FIL_PAGE_INDEX
|| fil_page_get_type(page_zip->data) == FIL_PAGE_RTREE);
if (ptr + 4 > end_ptr) {
goto truncated;
} else {
const ulint ofs = mach_read_from_2(ptr);
const ulint len = mach_read_from_2(ptr + 2);
if (ofs < FIL_PAGE_PREV || !len) {
goto corrupted;
}
ptr += 4 + len;
if (ptr > end_ptr) {
goto truncated;
}
if (!page_zip) {
break;
}
ut_ad(ofs + len <= block->zip_size());
memcpy(page_zip->data + ofs, old_ptr + 4, len);
if (ofs >= FIL_PAGE_TYPE +2
|| ofs + len < FIL_PAGE_TYPE + 2) {
break;
}
/* Ensure that buf_flush_init_for_writing()
will treat the page as an index page, and
not overwrite the compressed page with the
contents of the uncompressed page. */
memcpy_aligned<2>(&page[FIL_PAGE_TYPE],
&page_zip->data[FIL_PAGE_TYPE], 2);
}
break;
case MLOG_WRITE_STRING:
ut_ad(!page_zip
|| fil_page_get_type(page_zip->data)
......@@ -1813,11 +1847,12 @@ recv_parse_or_apply_log_rec_body(
}
break;
default:
ptr = NULL;
ib::error() << "Incorrect log record type "
<< ib::hex(unsigned(type));
corrupted:
recv_sys.found_corrupt_log = true;
truncated:
ptr = NULL;
}
if (index) {
......@@ -2064,9 +2099,16 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
- FIL_PAGE_END_LSN_OLD_CHKSUM
+ page, end_lsn);
if (page_zip) {
mach_write_to_8(FIL_PAGE_LSN + page_zip->data,
end_lsn);
if (UNIV_LIKELY_NULL(page_zip)) {
memcpy_aligned<8>(FIL_PAGE_LSN
+ page_zip->data,
FIL_PAGE_LSN + page, 8);
if (fil_page_index_page_check(page)
&& !page_zip_decompress(page_zip, page,
true)) {
ib::error() << "corrupted page "
<< block->page.id;
}
}
}
}
......@@ -3936,6 +3978,9 @@ static const char* get_mlog_string(mlog_id_t type)
case MLOG_IBUF_BITMAP_INIT:
return("MLOG_IBUF_BITMAP_INIT");
case MLOG_ZIP_WRITE_STRING:
return("MLOG_ZIP_WRITE_STRING");
case MLOG_WRITE_STRING:
return("MLOG_WRITE_STRING");
......
......@@ -3815,6 +3815,7 @@ page_zip_write_rec(
/***********************************************************//**
Parses a log record of writing a BLOB pointer of a record.
@return end of log record or NULL */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_zip_parse_write_blob_ptr(
/*==========================*/
......@@ -3935,24 +3936,22 @@ page_zip_write_blob_ptr(
ut_a(page_zip_validate(page_zip, page, index));
#endif /* UNIV_ZIP_DEBUG */
if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + FIELD_REF_SIZE)) {
if (byte* log_ptr = mlog_open(mtr, 11 + 2 + FIELD_REF_SIZE)) {
log_ptr = mlog_write_initial_log_record_low(
MLOG_ZIP_WRITE_BLOB_PTR,
MLOG_ZIP_WRITE_STRING,
block->page.id.space(), block->page.id.page_no(),
log_ptr, mtr);
mach_write_to_2(log_ptr, page_offset(field));
log_ptr += 2;
mach_write_to_2(log_ptr, ulint(externs - page_zip->data));
log_ptr += 2;
memcpy(log_ptr, externs, BTR_EXTERN_FIELD_REF_SIZE);
log_ptr += BTR_EXTERN_FIELD_REF_SIZE;
mlog_close(mtr, log_ptr);
mach_write_to_2(log_ptr + 2, BTR_EXTERN_FIELD_REF_SIZE);
memcpy(log_ptr + 4, externs, BTR_EXTERN_FIELD_REF_SIZE);
mlog_close(mtr, log_ptr + 4 + BTR_EXTERN_FIELD_REF_SIZE);
}
}
/***********************************************************//**
Parses a log record of writing the node pointer of a record.
@return end of log record or NULL */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_zip_parse_write_node_ptr(
/*==========================*/
......@@ -4066,18 +4065,15 @@ page_zip_write_node_ptr(
mach_write_to_4(field, ptr);
memcpy(storage, field, REC_NODE_PTR_SIZE);
if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + REC_NODE_PTR_SIZE)) {
if (byte* log_ptr = mlog_open(mtr, 11 + 2 + REC_NODE_PTR_SIZE)) {
log_ptr = mlog_write_initial_log_record_low(
MLOG_ZIP_WRITE_NODE_PTR,
MLOG_ZIP_WRITE_STRING,
block->page.id.space(), block->page.id.page_no(),
log_ptr, mtr);
mach_write_to_2(log_ptr, page_offset(field));
log_ptr += 2;
mach_write_to_2(log_ptr, ulint(storage - page_zip->data));
log_ptr += 2;
memcpy(log_ptr, field, REC_NODE_PTR_SIZE);
log_ptr += REC_NODE_PTR_SIZE;
mlog_close(mtr, log_ptr);
mach_write_to_2(log_ptr + 2, REC_NODE_PTR_SIZE);
memcpy(log_ptr + 4, storage, REC_NODE_PTR_SIZE);
mlog_close(mtr, log_ptr + 4 + REC_NODE_PTR_SIZE);
}
}
......@@ -4120,9 +4116,10 @@ page_zip_write_trx_id_and_roll_ptr(
UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
constexpr ulint sys_len = DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
storage = page_zip_dir_start(page_zip)
- (rec_get_heap_no_new(rec) - 1)
* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
* sys_len;
compile_time_assert(DATA_TRX_ID + 1 == DATA_ROLL_PTR);
field = rec_get_nth_field(rec, offsets, trx_id_col, &len);
......@@ -4131,35 +4128,27 @@ page_zip_write_trx_id_and_roll_ptr(
== rec_get_nth_field(rec, offsets, trx_id_col + 1, &len));
ut_ad(len == DATA_ROLL_PTR_LEN);
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(!memcmp(storage, field, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN));
ut_a(!memcmp(storage, field, sys_len));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
compile_time_assert(DATA_TRX_ID_LEN == 6);
mach_write_to_6(field, trx_id);
compile_time_assert(DATA_ROLL_PTR_LEN == 7);
mach_write_to_7(field + DATA_TRX_ID_LEN, roll_ptr);
memcpy(storage, field, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
memcpy(storage, field, sys_len);
UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
rec_offs_extra_size(offsets));
UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
if (mtr) {
byte* log_ptr = mlog_open(
mtr, 11 + 2 + 2 + DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
if (UNIV_UNLIKELY(!log_ptr)) {
return;
}
if (!mtr) {
} else if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + sys_len)) {
log_ptr = mlog_write_initial_log_record_fast(
(byte*) field, MLOG_ZIP_WRITE_TRX_ID, log_ptr, mtr);
mach_write_to_2(log_ptr, page_offset(field));
log_ptr += 2;
field, MLOG_ZIP_WRITE_STRING, log_ptr, mtr);
mach_write_to_2(log_ptr, ulint(storage - page_zip->data));
log_ptr += 2;
memcpy(log_ptr, field, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
log_ptr += DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
mlog_close(mtr, log_ptr);
mach_write_to_2(log_ptr + 2, sys_len);
memcpy(log_ptr + 4, field, sys_len);
mlog_close(mtr, log_ptr + 4 + sys_len);
}
}
......@@ -4170,6 +4159,7 @@ page_zip_write_trx_id_and_roll_ptr(
@param[in,out] page_zip compressed page
@return end of log record
@retval NULL if the log record is incomplete */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_zip_parse_write_trx_id(
const byte* ptr,
......@@ -4585,6 +4575,7 @@ page_zip_dir_add_slot(
/***********************************************************//**
Parses a log record of writing to the header of a page.
@return end of log record or NULL */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_zip_parse_write_header(
/*========================*/
......@@ -4640,39 +4631,6 @@ page_zip_parse_write_header(
return(ptr + len);
}
/**********************************************************************//**
Write a log record of writing to the uncompressed header portion of a page. */
void
page_zip_write_header_log(
/*======================*/
const byte* data, /*!< in: data on the uncompressed page */
ulint length, /*!< in: length of the data */
mtr_t* mtr) /*!< in: mini-transaction */
{
byte* log_ptr = mlog_open(mtr, 11 + 1 + 1);
ulint offset = page_offset(data);
ut_ad(offset < PAGE_DATA);
ut_ad(offset + length < PAGE_DATA);
compile_time_assert(PAGE_DATA < 256U);
ut_ad(length > 0);
ut_ad(length < 256);
/* If no logging is requested, we may return now */
if (UNIV_UNLIKELY(!log_ptr)) {
return;
}
log_ptr = mlog_write_initial_log_record_fast(
(byte*) data, MLOG_ZIP_WRITE_HEADER, log_ptr, mtr);
*log_ptr++ = (byte) offset;
*log_ptr++ = (byte) length;
mlog_close(mtr, log_ptr);
mlog_catenate_string(mtr, data, length);
}
/**********************************************************************//**
Reorganize and compress a page. This is a low-level operation for
compressed pages, to be used when page_zip_compress() fails.
......@@ -4885,6 +4843,7 @@ page_zip_copy_recs(
@param[in,out] block ROW_FORMAT=COMPRESSED block, or NULL for parsing only
@return end of log record
@retval NULL if the log record is incomplete */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte* page_zip_parse_compress(const byte* ptr, const byte* end_ptr,
buf_block_t* block)
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment