Commit 9d91b4bd authored by marko's avatar marko

branches/zip: Bug fixes for BLOB handling. At least one bug remains:

page_zip_dir_delete() will need to handle BLOBs.

rec_set_field_extern_bits(), rec_set_field_extern_bits_new():
Add parameter offsets.

rec_offs_set_nth_extern(): New function to set an extern bit in offsets.
This will be called when an extern bit is set in a record.

page_cur_rec_insert(), page_cur_insert_rec_low(): Document that the
parameter "offsets" is in/out.

page_zip_dir_delete(): Note that the array of BLOB pointers will need
to be shifted.

page0zip.ic: Document the entry type for clearing a record.

page_zip_available(): Add parameter "index".  Remove parameters
"is_leaf" and "is_clustered".

page_zip_get_trailer_len(): New function for computing the trailer length
of the compressed page.

page_zip_apply_log(): Implement the modification log entry type for
clearing the data bytes of a record.

page_zip_decompress(): Initialize n_blobs when actually copying the
BLOB pointers to place.

page_zip_validate(): Add diagnostic messages for failures.  Check
also m_start, m_end, and n_blobs.

page_zip_write_blob_ptr(): Add page_zip_validate() assertion.
parent 65dfefe2
......@@ -150,7 +150,7 @@ page_cur_rec_insert(
page_zip_des_t* page_zip,/* in/out: compressed page, or NULL */
rec_t* rec, /* in: record to insert */
dict_index_t* index, /* in: record descriptor */
ulint* offsets,/* in: rec_get_offsets(rec, index) */
ulint* offsets,/* in/out: rec_get_offsets(rec, index) */
mtr_t* mtr); /* in: mini-transaction handle */
/***************************************************************
Inserts a record next to page cursor. Returns pointer to inserted record if
......@@ -167,7 +167,7 @@ page_cur_insert_rec_low(
page_zip_des_t* page_zip,/* in/out: compressed page, or NULL */
dict_index_t* index, /* in: record descriptor */
rec_t* rec, /* in: pointer to a physical record */
ulint* offsets,/* in: rec_get_offsets(rec, index) */
ulint* offsets,/* in/out: rec_get_offsets(rec, index) */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext, /* in: number of elements in vec */
mtr_t* mtr); /* in: mini-transaction handle */
......
......@@ -220,7 +220,7 @@ page_cur_rec_insert(
page_zip_des_t* page_zip,/* in/out: compressed page, or NULL */
rec_t* rec, /* in: record to insert */
dict_index_t* index, /* in: record descriptor */
ulint* offsets,/* in: rec_get_offsets(rec, index) */
ulint* offsets,/* in/out: rec_get_offsets(rec, index) */
mtr_t* mtr) /* in: mini-transaction handle */
{
return(page_cur_insert_rec_low(cursor, page_zip,
......
......@@ -209,7 +209,8 @@ page_zip_rec_set_owned(
__attribute__((nonnull));
/**************************************************************************
Shift the dense page directory when a record is deleted. */
Shift the dense page directory and the array of BLOB pointers
when a record is deleted. */
void
page_zip_dir_delete(
......
......@@ -48,9 +48,11 @@ covering the compressed portion of the page, as follows.
MODIFICATION LOG ENTRY FORMAT
- write record:
- heap_no-1 (1..2 bytes)
- (heap_no - 1) << 1 (1..2 bytes)
- extra bytes backwards
- data bytes
- clear record:
- (heap_no - 1) << 1 | 1 (1..2 bytes)
The integer values are stored in a variable-length format:
- 0xxxxxxx: 0..127
......@@ -100,11 +102,8 @@ page_zip_available(
/* out: TRUE if page_zip_write_rec()
will succeed */
const page_zip_des_t* page_zip,/* in: compressed page */
dict_index_t* index, /* in: index of the B-tree node */
ulint length, /* in: combined size of the record */
ulint is_leaf,/* in: nonzero=leaf node,
zero=node pointer page */
ulint is_clustered,/* in: nonzero=clustered index,
zero=secondary index */
ulint create) /* in: nonzero=add the record to
the heap */
__attribute__((warn_unused_result, nonnull, pure));
......@@ -164,8 +163,7 @@ page_zip_alloc(
ut_ad(page_is_comp((page_t*) page));
ut_ad(page_zip_validate(page_zip, page));
if (page_zip_available(page_zip, length, page_is_leaf(page),
dict_index_is_clust(index), create)) {
if (page_zip_available(page_zip, index, length, create)) {
return(TRUE);
}
......@@ -181,47 +179,67 @@ page_zip_alloc(
}
/* Check if there is enough space available after compression. */
return(page_zip_available(page_zip, length, page_is_leaf(page),
dict_index_is_clust(index), create));
return(page_zip_available(page_zip, index, length, create));
}
/**************************************************************************
Determine if enough space is available in the modification log. */
Determine if the length of the page trailer. */
UNIV_INLINE
ibool
page_zip_available(
/*===============*/
/* out: TRUE if enough space
is available */
page_zip_get_trailer_len(
/*=====================*/
/* out: length of the page trailer,
in bytes, not including the terminating
zero byte of the modification log */
const page_zip_des_t* page_zip,/* in: compressed page */
ulint length, /* in: combined size of the record */
ulint is_leaf,/* in: nonzero=leaf node,
zero=node pointer page */
ulint is_clustered,/* in: nonzero=clustered index,
zero=secondary index */
ulint create) /* in: nonzero=add the record to
the heap */
dict_index_t* index, /* in: index of the B-tree node */
ulint* entry_size)/* out: size of the uncompressed
portion of a user record */
{
ulint uncompressed_size;
ulint trailer_len;
ut_ad(page_zip_simple_validate(page_zip));
ut_ad(length > REC_N_NEW_EXTRA_BYTES);
if (UNIV_UNLIKELY(!is_leaf)) {
if (UNIV_UNLIKELY(!page_is_leaf((page_t*) page_zip->data))) {
uncompressed_size = PAGE_ZIP_DIR_SLOT_SIZE
+ REC_NODE_PTR_SIZE;
} else if (UNIV_UNLIKELY(is_clustered)) {
} else if (dict_index_is_clust(index)) {
uncompressed_size = PAGE_ZIP_DIR_SLOT_SIZE
+ DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
} else {
uncompressed_size = PAGE_ZIP_DIR_SLOT_SIZE;
}
trailer_len = (page_dir_get_n_heap((page_t*) page_zip->data) - 2)
if (entry_size) {
*entry_size = uncompressed_size;
}
return((page_dir_get_n_heap((page_t*) page_zip->data) - 2)
* uncompressed_size
+ page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE;
+ page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE);
}
/**************************************************************************
Determine if enough space is available in the modification log. */
UNIV_INLINE
ibool
page_zip_available(
/*===============*/
/* out: TRUE if enough space
is available */
const page_zip_des_t* page_zip,/* in: compressed page */
dict_index_t* index, /* in: index of the B-tree node */
ulint length, /* in: combined size of the record */
ulint create) /* in: nonzero=add the record to
the heap */
{
ulint uncompressed_size;
ulint trailer_len;
ut_ad(length > REC_N_NEW_EXTRA_BYTES);
trailer_len = page_zip_get_trailer_len(page_zip, index,
&uncompressed_size);
/* Subtract the fixed extra bytes and add the maximum
space needed for identifying the record (encoded heap_no). */
......
......@@ -470,6 +470,7 @@ rec_set_field_extern_bits(
/*======================*/
rec_t* rec, /* in: record */
dict_index_t* index, /* in: record descriptor */
ulint* offsets,/* in/out: rec_get_offsets(rec, index) */
const ulint* vec, /* in: array of field numbers */
ulint n_fields);/* in: number of fields numbers */
/***************************************************************
......
......@@ -891,7 +891,7 @@ page_cur_insert_rec_low(
page_zip_des_t* page_zip,/* in/out: compressed page, or NULL */
dict_index_t* index, /* in: record descriptor */
rec_t* rec, /* in: pointer to a physical record */
ulint* offsets,/* in: rec_get_offsets(rec, index) */
ulint* offsets,/* in/out: rec_get_offsets(rec, index) */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext, /* in: number of elements in vec */
mtr_t* mtr) /* in: mini-transaction handle */
......@@ -1009,7 +1009,8 @@ page_cur_insert_rec_low(
/* Set the "extern storage" flags */
if (UNIV_UNLIKELY(n_ext)) {
rec_set_field_extern_bits(insert_rec, index, ext, n_ext);
rec_set_field_extern_bits(insert_rec, index, offsets,
ext, n_ext);
}
/* 4. Insert the record in the linked list of records */
......
......@@ -1181,13 +1181,28 @@ page_zip_apply_log(
if (UNIV_UNLIKELY(data >= end)) {
return(NULL);
}
if (UNIV_UNLIKELY(val > n_dense)) {
if (UNIV_UNLIKELY((val >> 1) > n_dense)) {
return(NULL);
}
/* Determine the heap number and status bits of the record. */
rec = recs[val - 1];
hs = (val + 1) << REC_HEAP_NO_SHIFT;
rec = recs[(val >> 1) - 1];
if (val & 1) {
/* Clear the data bytes of the record. */
mem_heap_t* heap = NULL;
ulint* offs;
offs = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
memset(rec, 0, rec_offs_data_size(offs));
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
continue;
}
hs = ((val >> 1) + 1) << REC_HEAP_NO_SHIFT;
hs |= heap_status & ((1 << REC_HEAP_NO_SHIFT) - 1);
/* This may either be an old record that is being
......@@ -1435,8 +1450,6 @@ page_zip_decompress(
*offsets = n;
}
page_zip->n_blobs = 0;
while (n_dense--) {
byte* const last = d_stream.next_out;
rec_t* rec = *recsc++;
......@@ -1555,7 +1568,6 @@ page_zip_decompress(
-= BTR_EXTERN_FIELD_REF_SIZE;
d_stream.next_out
+= BTR_EXTERN_FIELD_REF_SIZE;
page_zip->n_blobs++;
}
}
......@@ -1662,6 +1674,7 @@ page_zip_decompress(
/* Copy the uncompressed fields. */
page_zip->n_blobs = 0;
storage = page_zip->data + page_zip->size
- n_dense * PAGE_ZIP_DIR_SLOT_SIZE;
if (trx_id_col != ULINT_UNDEFINED) {
......@@ -1701,6 +1714,8 @@ page_zip_decompress(
/* Copy the BLOB pointer */
memcpy(dst, externs,
BTR_EXTERN_FIELD_REF_SIZE);
page_zip->n_blobs++;
}
}
......@@ -1756,9 +1771,36 @@ page_zip_validate(
== page_zip);
ut_a(page_is_comp((page_t*) page));
valid = page_zip_decompress(&temp_page_zip, temp_page, NULL)
&& !memcmp(page, temp_page,
UNIV_PAGE_SIZE - FIL_PAGE_DATA_END);
valid = page_zip_decompress(&temp_page_zip, temp_page, NULL);
if (!valid) {
fputs("page_zip_validate(): failed to decompress\n", stderr);
goto func_exit;
}
if (page_zip->n_blobs != temp_page_zip.n_blobs) {
fprintf(stderr,
"page_zip_validate(): n_blobs mismatch: %lu!=%lu\n",
page_zip->n_blobs, temp_page_zip.n_blobs);
valid = FALSE;
}
if (page_zip->m_start != temp_page_zip.m_start) {
fprintf(stderr,
"page_zip_validate(): m_start mismatch: %lu!=%lu\n",
page_zip->m_start, temp_page_zip.m_start);
valid = FALSE;
}
if (page_zip->m_end != temp_page_zip.m_end) {
fprintf(stderr,
"page_zip_validate(): m_end mismatch: %lu!=%lu\n",
page_zip->m_end, temp_page_zip.m_end);
valid = FALSE;
}
if (memcmp(page + PAGE_HEADER, temp_page + PAGE_HEADER,
UNIV_PAGE_SIZE - PAGE_HEADER - FIL_PAGE_DATA_END)) {
fputs("content mismatch\n", stderr);
valid = FALSE;
}
func_exit:
buf_frame_free(temp_page);
return(valid);
}
......@@ -1810,10 +1852,10 @@ page_zip_write_rec(
/* Identify the record by writing its heap number - 1.
0 is reserved to indicate the end of the modification log. */
if (UNIV_UNLIKELY(heap_no - 1 >= 128)) {
*data++ = 0x80 | (heap_no - 1) >> 8;
if (UNIV_UNLIKELY(heap_no - 1 >= 64)) {
*data++ = 0x80 | (heap_no - 1) >> 7;
}
*data++ = heap_no - 1;
*data++ = (heap_no - 1) << 1;
{
const byte* start = rec_get_start((rec_t*) rec, offsets);
......@@ -2035,6 +2077,11 @@ page_zip_write_blob_ptr(
field + len - BTR_EXTERN_FIELD_REF_SIZE,
BTR_EXTERN_FIELD_REF_SIZE);
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip,
ut_align_down((rec_t*) rec, UNIV_PAGE_SIZE)));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
if (mtr) {
mlog_write_initial_log_record(
(rec_t*) rec, MLOG_ZIP_WRITE_BLOB_PTR, mtr);
......@@ -2160,20 +2207,35 @@ page_zip_clear_rec(
const ulint* offsets,/* in: rec_get_offsets(rec, index) */
mtr_t* mtr) /* in: mini-transaction */
{
ulint heap_no;
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, ut_align_down(rec, UNIV_PAGE_SIZE)));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
ut_ad(rec_offs_validate(rec, index, offsets));
if (page_zip_available(page_zip, rec_offs_size(offsets),
page_is_leaf(page_zip->data),
dict_index_is_clust(index), 0)) {
memset(rec - rec_offs_extra_size(offsets), 0,
rec_offs_extra_size(offsets) - REC_N_NEW_EXTRA_BYTES);
heap_no = rec_get_heap_no_new(rec);
if (page_zip->m_end
+ 1 + ((heap_no - 1) >= 64)/* size of the log entry */
+ page_zip_get_trailer_len(page_zip, index, NULL)
< page_zip->size) {
byte* data;
/* Do not touch the extra bytes, because the
decompressor depends on them. */
memset(rec, 0, rec_offs_data_size(offsets));
/* Log that the data was zeroed out. */
page_zip_write_rec(page_zip, rec, index, offsets, 0);
data = page_zip->data + page_zip->m_end;
ut_ad(!*data);
if (UNIV_UNLIKELY(heap_no - 1 >= 64)) {
*data++ = 0x80 | (heap_no - 1) >> 7;
}
*data++ = (heap_no - 1) << 1 | 1;
ut_ad(!*data);
page_zip->m_end = data - page_zip->data;
ut_ad(page_zip_validate(page_zip,
ut_align_down(rec, UNIV_PAGE_SIZE)));
} else {
/* There is not enough space to log the clearing.
Try to clear the block and to recompress the page. */
......@@ -2182,8 +2244,8 @@ page_zip_clear_rec(
memcpy(buf, rec - rec_offs_extra_size(offsets),
rec_offs_size(offsets));
memset(rec - rec_offs_extra_size(offsets), 0,
rec_offs_extra_size(offsets) - REC_N_NEW_EXTRA_BYTES);
/* Do not touch the extra bytes, because the
decompressor depends on them. */
memset(rec, 0, rec_offs_data_size(offsets));
/* TODO: maybe log the memset()s? */
......@@ -2257,7 +2319,8 @@ page_zip_rec_set_owned(
/**************************************************************************
Shift the dense page directory when a record is deleted. */
Shift the dense page directory and the array of BLOB pointers
when a record is deleted. */
void
page_zip_dir_delete(
......@@ -2295,6 +2358,8 @@ page_zip_dir_delete(
slot_rec - slot_free);
}
/* TODO: shift and zero fill the array of BLOB pointers, set n_blobs */
/* Write the entry for the deleted record.
The "owned" and "deleted" flags will be cleared. */
mach_write_to_2(slot_free, ut_align_offset(rec, UNIV_PAGE_SIZE));
......
......@@ -727,6 +727,18 @@ rec_set_nth_field_null_bit(
rec_2_set_field_end_info(rec, i, info);
}
/**********************************************************
Sets the extern bit in nth field of rec. */
UNIV_INLINE
void
rec_offs_set_nth_extern(
/*====================*/
ulint* offsets,/* in: array returned by rec_get_offsets() */
ulint n) /* in: nth field */
{
rec_offs_base(offsets)[1 + n] |= REC_OFFS_EXTERNAL;
}
/***************************************************************
Sets the ith field extern storage bit of an old-style record. */
static
......@@ -755,6 +767,7 @@ rec_set_field_extern_bits_new(
/*==========================*/
rec_t* rec, /* in: record */
dict_index_t* index, /* in: record descriptor */
ulint* offsets,/* in/out: rec_get_offsets(rec, index) */
const ulint* ext, /* in: array of field numbers */
ulint n_ext) /* in: number of elements in ext */
{
......@@ -820,6 +833,7 @@ rec_set_field_extern_bits_new(
/* set the extern bit */
len |= 0x40;
lens[1] = (byte) len;
rec_offs_set_nth_extern(offsets, i);
}
lens--;
} else {
......@@ -841,17 +855,22 @@ rec_set_field_extern_bits(
/*======================*/
rec_t* rec, /* in: record */
dict_index_t* index, /* in: record descriptor */
ulint* offsets,/* in/out: rec_get_offsets(rec, index) */
const ulint* vec, /* in: array of field numbers */
ulint n_fields)/* in: number of fields numbers */
{
if (dict_table_is_comp(index->table)) {
rec_set_field_extern_bits_new(rec, index, vec, n_fields);
ut_ad(rec_offs_validate(rec, index, offsets));
if (rec_offs_comp(offsets)) {
rec_set_field_extern_bits_new(rec, index, offsets,
vec, n_fields);
} else {
ut_a(!rec_get_1byte_offs_flag(rec));
ulint i;
for (i = 0; i < n_fields; i++) {
rec_set_nth_field_extern_bit_old(rec, vec[i]);
rec_offs_set_nth_extern(offsets, vec[i]);
}
}
}
......
......@@ -1406,7 +1406,7 @@ trx_undo_prev_version_build(
*old_vers = rec_convert_dtuple_to_rec(buf, index, entry);
/* Now set the extern bits in the old version of the record */
rec_set_field_extern_bits(*old_vers, index,
rec_set_field_extern_bits(*old_vers, index, offsets,
ext_vect, n_ext_vect);
mem_free(ext_vect);
} else {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment