Commit 23dc2e55 authored by marko's avatar marko

branches/zip: Correctly identify deleted records when deciding whether to

copy BLOB pointers.

page_zip_dir_find_free_low(): New function,
split from page_zip_dir_find_free().

Add comments about processing the records in heap_no order.

Fix some typographic errors in comments and improve formatting.

page_zip_decompress(): Initialize (clear) the BLOB pointers in deleted records.

page_zip_clear_rec(): Relocate page_zip_validate() assertions, so that they
will not fail if a record containing BLOB pointers is being deleted.

Note that page_zip_validate() will fail if page_zip_clear_rec() is unable
to clear the record.
parent d21b11ba
......@@ -118,21 +118,15 @@ page_zip_dir_find(
Find the slot of the given free record in the dense page directory. */
UNIV_INLINE
byte*
page_zip_dir_find_free(
/*===================*/
/* out: dense directory slot,
or NULL if record not found */
page_zip_des_t* page_zip, /* in: compressed page */
ulint offset) /* in: offset of user record */
page_zip_dir_find_free_low(
/*=======================*/
/* out: dense directory slot,
or NULL if record not found */
byte* slot, /* in: start of deleted records */
byte* end, /* in: end of deleted records */
ulint offset) /* in: offset of user record */
{
byte* slot;
byte* end;
ut_ad(page_zip_simple_validate(page_zip));
slot = end = page_zip->data + page_zip->size;
slot -= page_zip_dir_size(page_zip);
end -= page_zip_dir_user_size(page_zip);
ut_ad(slot <= end);
for (; slot < end; slot += PAGE_ZIP_DIR_SLOT_SIZE) {
if ((mach_read_from_2(slot) & PAGE_ZIP_DIR_SLOT_MASK)
......@@ -144,6 +138,26 @@ page_zip_dir_find_free(
return(NULL);
}
/*****************************************************************
Find the slot of the given free record in the dense page directory. */
UNIV_INLINE
byte*
page_zip_dir_find_free(
/*===================*/
/* out: dense directory slot,
or NULL if record not found */
page_zip_des_t* page_zip, /* in: compressed page */
ulint offset) /* in: offset of user record */
{
byte* end = page_zip->data + page_zip->size;
ut_ad(page_zip_simple_validate(page_zip));
return(page_zip_dir_find_free_low(
end - page_zip_dir_size(page_zip),
end - page_zip_dir_user_size(page_zip), offset));
}
/*****************************************************************
Read a given slot in the dense page directory. */
UNIV_INLINE
......@@ -631,6 +645,7 @@ page_zip_compress(
goto recs_done;
}
/* Compress the records in heap_no order. */
if (page_is_leaf(page)) {
/* BTR_EXTERN_FIELD_REF storage */
byte* externs;
......@@ -670,7 +685,7 @@ page_zip_compress(
/* Check if there are any externally stored columns.
For each externally stored column, store the
BTR_EXTERN_FIELD_REF separately._*/
BTR_EXTERN_FIELD_REF separately. */
for (i = 0; i < n_fields; i++) {
ulint len;
......@@ -706,8 +721,8 @@ page_zip_compress(
c_stream.next_in,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
c_stream.next_in +=
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
c_stream.next_in
+= DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
/* Skip also roll_ptr */
i++;
......@@ -742,12 +757,18 @@ page_zip_compress(
+ c_stream.avail_out
+ 1/* end of modification log */);
c_stream.next_in +=
BTR_EXTERN_FIELD_REF_SIZE;
c_stream.next_in
+= BTR_EXTERN_FIELD_REF_SIZE;
if (slot >= page_get_n_recs(
(page_t*) page)) {
/* Skip deleted records */
/* Skip deleted records. */
if (UNIV_LIKELY_NULL(
page_zip_dir_find_free_low(
buf_end - PAGE_ZIP_DIR_SLOT_SIZE
* n_dense,
buf_end - PAGE_ZIP_DIR_SLOT_SIZE
* page_get_n_recs((page_t*) page),
ut_align_offset(rec,
UNIV_PAGE_SIZE)))) {
continue;
}
......@@ -1501,6 +1522,7 @@ page_zip_decompress(
*offsets = n;
}
/* Decompress the records in heap_no order. */
for (slot = 0; slot < n_dense; slot++) {
byte* const last = d_stream.next_out;
rec_t* rec = recs[slot];
......@@ -1615,10 +1637,17 @@ page_zip_decompress(
goto zlib_error;
}
d_stream.avail_in
-= BTR_EXTERN_FIELD_REF_SIZE;
/* Clear the BLOB pointer in case
the record was deleted. The BLOB
pointers of existing records will
be copied later from "externs" in
this function. */
memset(d_stream.next_out, 0,
BTR_EXTERN_FIELD_REF_SIZE);
d_stream.next_out
+= BTR_EXTERN_FIELD_REF_SIZE;
d_stream.avail_in
-= BTR_EXTERN_FIELD_REF_SIZE;
}
}
......@@ -1732,7 +1761,7 @@ page_zip_decompress(
goto recs_done;
}
slot = 0;
/* Restore the uncompressed columns in heap_no order. */
if (page_is_leaf(page)) {
const byte* externs;
......@@ -1748,19 +1777,39 @@ page_zip_decompress(
ulint i;
ulint len;
byte* dst;
rec_t* rec = recs[slot];
rec_t* rec = *recs++;
if (trx_id_col != ULINT_UNDEFINED
|| slot < page_get_n_recs(page)) {
if (UNIV_UNLIKELY(trx_id_col != ULINT_UNDEFINED)) {
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
}
/* Check if there are any externally stored columns.
For each externally stored column, restore the
BTR_EXTERN_FIELD_REF separately. */
dst = rec_get_nth_field(rec, offsets,
trx_id_col, &len);
ut_ad(len >= DATA_TRX_ID_LEN
+ DATA_ROLL_PTR_LEN);
storage -= DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
memcpy(dst, storage,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
if (UNIV_LIKELY(!page_zip_dir_find_free(
page_zip, ut_align_offset(
rec, UNIV_PAGE_SIZE)))) {
goto process_externs;
}
} else if (UNIV_LIKELY(!page_zip_dir_find_free(
page_zip, ut_align_offset(
rec, UNIV_PAGE_SIZE)))) {
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
process_externs:
/* Check if there are any externally stored
columns in this existing (not deleted)
record. For each externally stored column,
restore the BTR_EXTERN_FIELD_REF separately. */
if (slot < page_get_n_recs(page)) {
for (i = 0; i < rec_offs_n_fields(offsets);
i++) {
if (!rec_offs_nth_extern(offsets, i)) {
......@@ -1779,20 +1828,10 @@ page_zip_decompress(
page_zip->n_blobs++;
}
}
if (trx_id_col != ULINT_UNDEFINED) {
dst = rec_get_nth_field(rec, offsets,
trx_id_col, &len);
ut_ad(len >= DATA_TRX_ID_LEN
+ DATA_ROLL_PTR_LEN);
storage -= DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
memcpy(dst, storage,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
}
} while (++slot < n_dense);
} while (--n_dense);
} else {
do {
rec_t* rec = recs[slot];
rec_t* rec = *recs++;
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
......@@ -1803,7 +1842,7 @@ page_zip_decompress(
memcpy(rec_get_end(rec, offsets) - REC_NODE_PTR_SIZE,
storage, REC_NODE_PTR_SIZE);
} while (++slot < n_dense);
} while (--n_dense);
}
recs_done:
......@@ -1864,7 +1903,11 @@ page_zip_validate(
}
if (memcmp(page + PAGE_HEADER, temp_page + PAGE_HEADER,
UNIV_PAGE_SIZE - PAGE_HEADER - FIL_PAGE_DATA_END)) {
fputs("content mismatch\n", stderr);
/* This can happen if a record containing externally
stored columns was deleted and page_zip_clear_rec() failed
to clear the record. However, page_zip_validate() will
only be invoked in debug builds. */
fputs("page_zip_validate(): content mismatch\n", stderr);
valid = FALSE;
}
......@@ -2021,7 +2064,7 @@ page_zip_write_rec(
}
/* Store separately trx_id, roll_ptr and
the BTR_EXTERN_FIELD_REF of each BLOB column._*/
the BTR_EXTERN_FIELD_REF of each BLOB column. */
for (i = 0; i < rec_offs_n_fields(offsets); i++) {
const byte* src;
......@@ -2301,9 +2344,9 @@ page_zip_clear_rec(
mtr_t* mtr) /* in: mini-transaction */
{
ulint heap_no;
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, ut_align_down(rec, UNIV_PAGE_SIZE)));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
page_t* page = ut_align_down(rec, UNIV_PAGE_SIZE);
/* page_zip_validate() would fail here if a record
containing externally stored columns is being deleted. */
ut_ad(rec_offs_validate(rec, index, offsets));
ut_ad(!page_zip_dir_find(page_zip,
ut_align_offset(rec, UNIV_PAGE_SIZE)));
......@@ -2318,9 +2361,6 @@ page_zip_clear_rec(
+ page_zip_get_trailer_len(page_zip, index, NULL)
< page_zip->size) {
byte* data;
page_t* page;
page = ut_align_down(rec, UNIV_PAGE_SIZE);
/* Clear only the data bytes, because the allocator and
the decompressor depend on the extra bytes. */
......@@ -2356,6 +2396,9 @@ page_zip_clear_rec(
*data++ = (heap_no - 1) << 1 | 1;
ut_ad(!*data);
page_zip->m_end = data - page_zip->data;
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
} else {
/* There is not enough space to log the clearing.
Try to clear the block and to recompress the page. */
......@@ -2366,11 +2409,16 @@ page_zip_clear_rec(
/* Do not touch the extra bytes, because the
decompressor depends on them. */
memset(rec, 0, rec_offs_data_size(offsets));
if (UNIV_UNLIKELY(!page_zip_compress(page_zip,
ut_align_down(rec, UNIV_PAGE_SIZE),
if (UNIV_UNLIKELY(!page_zip_compress(page_zip, page,
index, mtr))) {
/* Compression failed. Restore the block. */
memcpy(rec, buf, rec_offs_data_size(offsets));
/* From now on, page_zip_validate() would fail
if a record containing externally stored columns
is being deleted. However, page_zip_validate()
will only be invoked in debug builds. Should
this be an issue, we could here zero out the
BLOB pointers contained in rec. */
}
mem_free(buf);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment