Commit 0906cd0c authored by marko's avatar marko

branches/zip: Implement crash recovery of writing BLOB pointers.

page_zip_parse_write_blob_ptr(): New function for applying the redo log
record MLOG_ZIP_WRITE_BLOB_PTR.

page_zip_write_blob_ptr(): Write the necessary information to the redo log.

page0zip.c: Tighten the assertions to ensure that blob_ptr < page_zip->n_blobs.

page_zip_write_node_ptr(): Use memcpy() instead of mach_write_to_4().
parent 44b22ebe
......@@ -117,8 +117,20 @@ page_zip_write_rec(
ulint create) /* in: nonzero=insert, zero=update */
__attribute__((nonnull));
/***************************************************************
Parses a log record of writing a BLOB pointer of a record. */
byte*
page_zip_parse_write_blob_ptr(
/*==========================*/
/* out: end of log record or NULL */
byte* ptr, /* in: redo log buffer */
byte* end_ptr,/* in: redo log buffer end */
page_t* page, /* in/out: uncompressed page */
page_zip_des_t* page_zip);/* in/out: compressed page */
/**************************************************************************
Write the BLOB pointer of a record on the leaf page of a clustered index.
Write a BLOB pointer of a record on the leaf page of a clustered index.
The information must already have been updated on the uncompressed page. */
void
......
......@@ -897,6 +897,10 @@ recv_parse_or_apply_log_rec_body(
ptr = page_zip_parse_write_node_ptr(
ptr, end_ptr, page, page_zip);
break;
case MLOG_ZIP_WRITE_BLOB_PTR:
ptr = page_zip_parse_write_blob_ptr(
ptr, end_ptr, page, page_zip);
break;
case MLOG_ZIP_WRITE_HEADER:
ptr = page_zip_parse_write_header(
ptr, end_ptr, page, page_zip);
......
......@@ -2043,7 +2043,7 @@ page_zip_write_rec(
page_zip, rec, index);
byte* ext_end = externs - page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(blob_no <= page_zip->n_blobs);
ut_ad(blob_no < page_zip->n_blobs);
externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE;
if (create) {
......@@ -2058,7 +2058,7 @@ page_zip_write_rec(
externs - ext_end);
}
ut_a(blob_no + n_ext <= page_zip->n_blobs);
ut_a(blob_no + n_ext < page_zip->n_blobs);
}
/* Store separately trx_id, roll_ptr and
......@@ -2153,8 +2153,66 @@ page_zip_write_rec(
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
}
/***************************************************************
Parses a log record of writing a BLOB pointer of a record. */
byte*
page_zip_parse_write_blob_ptr(
/*==========================*/
/* out: end of log record or NULL */
byte* ptr, /* in: redo log buffer */
byte* end_ptr,/* in: redo log buffer end */
page_t* page, /* in/out: uncompressed page */
page_zip_des_t* page_zip)/* in/out: compressed page */
{
ulint offset;
ulint z_offset;
ut_ad(!page == !page_zip);
if (UNIV_UNLIKELY(end_ptr < ptr + (2 + 2 + BTR_EXTERN_FIELD_REF_SIZE))) {
return(NULL);
}
offset = mach_read_from_2(ptr);
z_offset = mach_read_from_2(ptr + 2);
if (UNIV_UNLIKELY(offset < PAGE_ZIP_START)
|| UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)
|| UNIV_UNLIKELY(z_offset >= UNIV_PAGE_SIZE)
|| UNIV_UNLIKELY(!page_zip)) {
corrupt:
recv_sys->found_corrupt_log = TRUE;
return(NULL);
}
if (page) {
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
if (UNIV_UNLIKELY(!page_is_leaf(page))) {
goto corrupt;
}
memcpy(page + offset,
ptr + 4, BTR_EXTERN_FIELD_REF_SIZE);
memcpy(page_zip->data + z_offset,
ptr + 4, BTR_EXTERN_FIELD_REF_SIZE);
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
}
return(ptr + (2 + 2 + BTR_EXTERN_FIELD_REF_SIZE));
}
/**************************************************************************
Write the BLOB pointer of a record on the leaf page of a clustered index.
Write a BLOB pointer of a record on the leaf page of a clustered index.
The information must already have been updated on the uncompressed page. */
void
......@@ -2189,7 +2247,7 @@ page_zip_write_blob_ptr(
blob_no = page_zip_get_n_prev_extern(page_zip, rec, index)
+ rec_get_n_extern_new(rec, index, n);
ut_a(blob_no <= page_zip->n_blobs);
ut_a(blob_no < page_zip->n_blobs);
/* The heap number of the first user record is 2. */
if (dict_index_is_clust(index)) {
......@@ -2205,9 +2263,10 @@ page_zip_write_blob_ptr(
field = rec_get_nth_field((rec_t*) rec, offsets, n, &len);
memcpy(externs - (blob_no + 1) * BTR_EXTERN_FIELD_REF_SIZE,
field + len - BTR_EXTERN_FIELD_REF_SIZE,
BTR_EXTERN_FIELD_REF_SIZE);
externs -= (blob_no + 1) * BTR_EXTERN_FIELD_REF_SIZE;
field += len - BTR_EXTERN_FIELD_REF_SIZE;
memcpy(externs, field, BTR_EXTERN_FIELD_REF_SIZE);
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip,
......@@ -2215,9 +2274,19 @@ page_zip_write_blob_ptr(
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
if (mtr) {
mlog_write_initial_log_record(
(rec_t*) rec, MLOG_ZIP_WRITE_BLOB_PTR, mtr);
/* TODO: write n */
byte* log_ptr = mlog_open(mtr,
11 + 2 + BTR_EXTERN_FIELD_REF_SIZE);
if (UNIV_UNLIKELY(!log_ptr)) {
return;
}
log_ptr = mlog_write_initial_log_record_fast((byte*) field,
MLOG_ZIP_WRITE_BLOB_PTR, log_ptr, mtr);
mach_write_to_2(log_ptr, externs - page_zip->data);
log_ptr += 2;
memcpy(log_ptr, externs, BTR_EXTERN_FIELD_REF_SIZE);
log_ptr += BTR_EXTERN_FIELD_REF_SIZE;
mlog_close(mtr, log_ptr);
}
}
......@@ -2238,7 +2307,7 @@ page_zip_parse_write_node_ptr(
ut_ad(!page == !page_zip);
if (UNIV_UNLIKELY(end_ptr < ptr + (2 + 2 + 4))) {
if (UNIV_UNLIKELY(end_ptr < ptr + (2 + 2 + REC_NODE_PTR_SIZE))) {
return(NULL);
}
......@@ -2287,9 +2356,6 @@ page_zip_parse_write_node_ptr(
goto corrupt;
}
#if REC_NODE_PTR_SIZE != 4
# error "REC_NODE_PTR_SIZE != 4"
#endif
memcpy(field, ptr + 4, REC_NODE_PTR_SIZE);
memcpy(storage, ptr + 4, REC_NODE_PTR_SIZE);
......@@ -2298,7 +2364,7 @@ page_zip_parse_write_node_ptr(
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
}
return(ptr + 6);
return(ptr + (2 + 2 + REC_NODE_PTR_SIZE));
}
/**************************************************************************
......@@ -2344,7 +2410,7 @@ page_zip_write_node_ptr(
memcpy(storage, field, REC_NODE_PTR_SIZE);
if (mtr) {
byte* log_ptr = mlog_open(mtr, 11 + 4 + 2);
byte* log_ptr = mlog_open(mtr, 11 + 2 + REC_NODE_PTR_SIZE);
if (UNIV_UNLIKELY(!log_ptr)) {
return;
}
......@@ -2353,8 +2419,7 @@ page_zip_write_node_ptr(
MLOG_ZIP_WRITE_NODE_PTR, log_ptr, mtr);
mach_write_to_2(log_ptr, storage - page_zip->data);
log_ptr += 2;
mach_write_to_4(log_ptr, ptr);
log_ptr += 4;
memcpy(log_ptr, field, REC_NODE_PTR_SIZE);
mlog_close(mtr, log_ptr + 6);
}
}
......@@ -2787,7 +2852,7 @@ page_zip_write_header_log(
ut_ad(length < 256);
/* If no logging is requested, we may return now */
if (log_ptr == NULL) {
if (UNIV_UNLIKELY(!log_ptr)) {
return;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment