Commit de60fbdf authored by marko's avatar marko

branches/zip: Fixes to allow the speedc test to run without assertion failures.

page_zip_dir_decode(): Correct the handling of the free list.

page_zip_set_extra_bytes(): Remove off-by-one error in the first loop.

page_zip_apply_log(): Replace parameter end with size, and let end=data+size.

page_zip_decompress(): Properly handle pages where the first user record has
more than REC_N_NEW_EXTRA_BYTES extra bytes.

page_delete_rec_list_end(): Remove page_zip_temp.  This operation will be
done completely in-place on page_zip.  If page_zip is specified, clear the
data bytes and the info and status bits of deleted records and clear the
removed directory slots.

page_dir_delete_slot(): Clear the last directory slot, which will be removed.
parent 818d1229
......@@ -829,11 +829,8 @@ page_delete_rec_list_end(
ulint slot_index;
rec_t* last_rec;
rec_t* prev_rec;
rec_t* rec2;
ulint count;
ulint n_owned;
page_t* page;
page_zip_des_t* page_zip_temp;
ut_ad(size == ULINT_UNDEFINED || size < UNIV_PAGE_SIZE);
ut_ad(!page_zip || page_rec_is_comp(rec));
......@@ -847,18 +844,11 @@ page_delete_rec_list_end(
return;
}
if (page_zip && !page_zip_available(page_zip, 17)) {
/* Try compressing the page afterwards. */
page_zip_temp = NULL;
} else {
page_zip_temp = page_zip;
}
/* Reset the last insert info in the page header and increment
the modify clock for the frame */
page = ut_align_down(rec, UNIV_PAGE_SIZE);
page_header_set_ptr(page, page_zip_temp, PAGE_LAST_INSERT, NULL);
page_header_set_ptr(page, page_zip, PAGE_LAST_INSERT, NULL);
/* The page gets invalid for optimistic searches: increment the
frame modify clock */
......@@ -874,6 +864,7 @@ page_delete_rec_list_end(
last_rec = page_rec_get_prev(page_get_supremum_rec(page));
if ((size == ULINT_UNDEFINED) || (n_recs == ULINT_UNDEFINED)) {
rec_t* rec2 = rec;
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
......@@ -881,12 +872,24 @@ page_delete_rec_list_end(
/* Calculate the sum of sizes and the number of records */
size = 0;
n_recs = 0;
rec2 = rec;
do {
ulint s;
offsets = rec_get_offsets(rec2, index, offsets,
ULINT_UNDEFINED, &heap);
if (1 /* TODO: UNIV_LIKELY_NULL(page_zip) */) {
/* Clear the data bytes of the deleted
record in order to improve the
compression ratio of the page. The
extra bytes of the record cannot be
cleared, because page_mem_alloc()
needs them in order to determine the
size of the deleted record. */
memset(rec2, 0, rec_offs_data_size(offsets));
}
s = rec_offs_size(offsets);
ut_ad(rec2 - page + s - rec_offs_extra_size(offsets)
< UNIV_PAGE_SIZE);
......@@ -908,10 +911,10 @@ page_delete_rec_list_end(
of the records owned by the supremum record, as it is allowed to be
less than PAGE_DIR_SLOT_MIN_N_OWNED */
rec2 = rec;
count = 0;
if (page_is_comp(page)) {
rec_t* rec2 = rec;
ulint count = 0;
while (rec_get_n_owned_new(rec2) == 0) {
count++;
......@@ -921,7 +924,35 @@ page_delete_rec_list_end(
ut_ad(rec_get_n_owned_new(rec2) > count);
n_owned = rec_get_n_owned_new(rec2) - count;
slot_index = page_dir_find_owner_slot(rec2);
slot = page_dir_get_nth_slot(page, slot_index);
if (1 /* TODO: UNIV_UNLIKELY(page_zip != NULL) */) {
ulint n_slots;
rec2 = rec;
do {
/* The compression algorithm expects
info_bits and n_owned to be 0
for deleted records. */
rec2[-REC_N_NEW_EXTRA_BYTES] = 0;
rec2 = rec_get_next_ptr(rec2, TRUE);
}
while (rec2);
/* The compression algorithm expects the removed
slots in the page directory to be cleared. */
n_slots = page_dir_get_n_slots(page) - slot_index - 1;
ut_ad(n_slots > 0);
ut_ad(n_slots < UNIV_PAGE_SIZE / PAGE_DIR_SLOT_SIZE);
memset(slot - (n_slots * PAGE_DIR_SLOT_SIZE), 0,
n_slots * PAGE_DIR_SLOT_SIZE);
}
} else {
rec_t* rec2 = rec;
ulint count = 0;
while (rec_get_n_owned_old(rec2) == 0) {
count++;
......@@ -931,38 +962,30 @@ page_delete_rec_list_end(
ut_ad(rec_get_n_owned_old(rec2) > count);
n_owned = rec_get_n_owned_old(rec2) - count;
}
slot_index = page_dir_find_owner_slot(rec2);
slot = page_dir_get_nth_slot(page, slot_index);
}
page_dir_slot_set_rec(slot, page_zip_temp,
page_dir_slot_set_rec(slot, page_zip,
page_get_supremum_rec(page));
page_dir_slot_set_n_owned(slot, page_zip_temp, n_owned);
page_dir_slot_set_n_owned(slot, page_zip, n_owned);
page_dir_set_n_slots(page, page_zip_temp, slot_index + 1);
page_dir_set_n_slots(page, page_zip, slot_index + 1);
/* Remove the record chain segment from the record chain */
page_rec_set_next(prev_rec, page_get_supremum_rec(page),
page_zip_temp);
page_rec_set_next(prev_rec, page_get_supremum_rec(page), page_zip);
/* Catenate the deleted chain segment to the page free list */
page_rec_set_next(last_rec, page_header_get_ptr(page, PAGE_FREE),
page_zip_temp);
page_header_set_ptr(page, page_zip_temp, PAGE_FREE, rec);
page_zip);
page_header_set_ptr(page, page_zip, PAGE_FREE, rec);
page_header_set_field(page, page_zip_temp, PAGE_GARBAGE,
page_header_set_field(page, page_zip, PAGE_GARBAGE,
size + page_header_get_field(page, PAGE_GARBAGE));
page_header_set_field(page, page_zip_temp, PAGE_N_RECS,
page_header_set_field(page, page_zip, PAGE_N_RECS,
(ulint)(page_get_n_recs(page) - n_recs));
if (UNIV_LIKELY_NULL(page_zip) && UNIV_UNLIKELY(!page_zip_temp)) {
/* Reorganize and compress the page. */
/* TODO: coverage test. Is this allowed? */
btr_page_reorganize(page, index, mtr);
}
}
/*****************************************************************
......@@ -1210,7 +1233,10 @@ page_dir_delete_slot(
page_zip, rec);
}
/* 4. Update the page header */
/* 4. Zero out the last slot, which will be removed */
mach_write_to_2(page_dir_get_nth_slot(page, n_slots - 1), 0);
/* 5. Update the page header */
page_header_set_field(page, page_zip, PAGE_N_DIR_SLOTS, n_slots - 1);
}
......
......@@ -364,6 +364,10 @@ page_zip_dir_decode(
n_recs = page_get_n_recs(page);
if (UNIV_UNLIKELY(n_recs > n_dense)) {
return(FALSE);
}
/* Traverse the list of stored records in the sorting order,
starting from the first user record. */
......@@ -397,7 +401,7 @@ page_zip_dir_decode(
}
/* Copy the rest of the dense directory. */
for (i = 0; i < n_dense; i++) {
for (; i < n_dense; i++) {
ulint offs = page_zip_dir_get(page_zip, i);
if (UNIV_UNLIKELY(offs & ~PAGE_ZIP_DIR_SLOT_MASK)) {
......@@ -434,8 +438,6 @@ page_zip_set_extra_bytes(
for (i = 0; i < n; i++) {
offs = page_zip_dir_get(page_zip, i);
rec_set_next_offs_new(rec, NULL, offs);
rec = page + offs;
if (UNIV_UNLIKELY(offs & PAGE_ZIP_DIR_SLOT_DEL)) {
info_bits |= REC_INFO_DELETED_FLAG;
......@@ -452,6 +454,8 @@ page_zip_set_extra_bytes(
return(FALSE);
}
rec_set_next_offs_new(rec, NULL, offs);
rec = page + offs;
rec[-REC_N_NEW_EXTRA_BYTES] = info_bits;
info_bits = 0;
}
......@@ -506,9 +510,11 @@ page_zip_apply_log(
/* out: pointer to end of modification log,
or NULL on failure */
const byte* data, /* in: modification log */
const byte* end, /* in: end of compressed page */
ulint size, /* in: maximum length of the log, in bytes */
page_t* page) /* in/out: uncompressed page */
{
const byte* const end = data + size;
/* Apply the modification log. */
while (*data) {
ulint ulint_len;
......@@ -608,15 +614,6 @@ page_zip_decompress(
d_stream.avail_in = page_zip->size - (PAGE_DATA + 1)
- n_dense * PAGE_ZIP_DIR_SLOT_SIZE;
if (UNIV_LIKELY(n_dense > 0)
&& *recs == page + (PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES)) {
dst = page + (PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES);
recs++;
n_dense--;
} else {
dst = page + PAGE_ZIP_START;
}
info_bits = 0;
if (mach_read_from_2((page_t*) page + (PAGE_HEADER + PAGE_LEVEL))) {
......@@ -629,16 +626,36 @@ page_zip_decompress(
heap_status = REC_STATUS_ORDINARY | 2 << REC_HEAP_NO_SHIFT;
}
dst = page + PAGE_ZIP_START;
if (UNIV_LIKELY(n_dense > 0)) {
n_dense--;
if (*recs == page + (PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES)) {
dst = page + (PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES);
recs++;
} else {
/* This is a special case: we are
decompressing the extra bytes of the first
user record. As dst will not be pointing to a
record, we do not set the heap_no and status
bits. On the next round of the loop, dst will
point to the first user record. */
goto first_inflate;
}
}
while (n_dense--) {
/* set heap_no and the status bits */
mach_write_to_2(dst - REC_NEW_HEAP_NO, heap_status);
heap_status += 1 << REC_HEAP_NO_SHIFT;
first_inflate:
d_stream.next_out = dst;
d_stream.avail_out = *recs - dst - REC_N_NEW_EXTRA_BYTES;
ut_ad(d_stream.avail_out < UNIV_PAGE_SIZE
- PAGE_ZIP_START - PAGE_DIR);
/* set heap_no and the status bits */
mach_write_to_2(dst - REC_NEW_HEAP_NO, heap_status);
heap_status += 1 << REC_HEAP_NO_SHIFT;
err = inflate(&d_stream, Z_NO_FLUSH);
switch (err) {
case Z_OK:
......@@ -695,9 +712,7 @@ zlib_error:
const byte* mod_log_ptr;
mod_log_ptr = page_zip_apply_log(
page_zip->data + page_zip->m_start,
page_zip->data + page_zip->size
- n_dense * PAGE_ZIP_DIR_SLOT_SIZE,
page);
d_stream.avail_in, page);
if (UNIV_UNLIKELY(!mod_log_ptr)) {
return(FALSE);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment