Commit d1f9c8ec authored by marko's avatar marko

branches/zip: Minor fixes to get a page with one record compress/decompress.

A page with multiple records or deleted records still does not compress
or decompress properly.

buf_flush_init_for_writing(): Initialize block->page_zip properly so that all
assertions in page0zip can be enabled.

page_zip_decompress(): Note that corrupt data should not lead to assertions.

page_zip_dir_set(): Correct the interface.  Fix off-by-one error.
page_zip_dir_get(): Fix off-by-one error.

page0zip.c: Replace n_heap with n_dense and add comments about
the infimum and supremum records whenever we subtract 2 from heap_no.
Fix some programming errors.
parent 13002bdf
...@@ -454,9 +454,6 @@ buf_flush_init_for_writing( ...@@ -454,9 +454,6 @@ buf_flush_init_for_writing(
ulint space, /* in: space id */ ulint space, /* in: space id */
ulint page_no) /* in: page number */ ulint page_no) /* in: page number */
{ {
#if 1 /* testing */
page_zip_des_t page_zip = { buf_frame_alloc(), UNIV_PAGE_SIZE, 0, 0 };
#endif /* testing */
/* Write the newest modification lsn to the page header and trailer */ /* Write the newest modification lsn to the page header and trailer */
mach_write_to_8(page + FIL_PAGE_LSN, newest_lsn); mach_write_to_8(page + FIL_PAGE_LSN, newest_lsn);
...@@ -483,10 +480,17 @@ buf_flush_init_for_writing( ...@@ -483,10 +480,17 @@ buf_flush_init_for_writing(
buf_calc_page_old_checksum(page) : BUF_NO_CHECKSUM_MAGIC); buf_calc_page_old_checksum(page) : BUF_NO_CHECKSUM_MAGIC);
#if 1 /* testing */ #if 1 /* testing */
if (page_is_comp(page)) { if (page_is_comp(page)) {
ut_a(page_zip_compress(&page_zip, page)); byte zip_data[16384];
fprintf(stderr, "page_zip.size==%lu\n", (ulong) page_zip.size); page_zip_des_t* page_zip = &buf_block_align(page)->page_zip;
page_zip->data = zip_data;
page_zip->size = sizeof zip_data;
page_zip->m_start = page_zip->m_end = 0;
ut_a(page_zip_compress(page_zip, page));
fprintf(stderr, "zip size==%lu+%lu\n",
(ulong) page_zip->m_start,
(ulong) 2 * page_dir_get_n_heap(page_zip->data));
page_zip->data = NULL;
} }
buf_frame_free(page_zip.data);
#endif /* testing */ #endif /* testing */
} }
......
...@@ -38,7 +38,9 @@ page_zip_compress( ...@@ -38,7 +38,9 @@ page_zip_compress(
const page_t* page); /* in: uncompressed page */ const page_t* page); /* in: uncompressed page */
/************************************************************************** /**************************************************************************
Decompress a page. */ Decompress a page. This function should tolerate errors on the compressed
page. Instead of letting assertions fail, it will return FALSE if an
inconsistency is detected. */
ibool ibool
page_zip_decompress( page_zip_decompress(
...@@ -102,14 +104,11 @@ UNIV_INLINE ...@@ -102,14 +104,11 @@ UNIV_INLINE
void void
page_zip_dir_set( page_zip_dir_set(
/*==============*/ /*==============*/
const page_zip_des_t* page_zip, /* in: compressed page */ page_zip_des_t* page_zip, /* in: compressed page */
ulint slot, /* in: slot ulint slot, /* in: slot (0=first user record) */
(0=first user record) */ ulint offs); /* in: offset, possibly ORed with
ulint offs) /* in: offset, possibly PAGE_ZIP_DIR_SLOT_DEL or
ORed with PAGE_ZIP_DIR_SLOT_OWNED */
PAGE_ZIP_DIR_SLOT_DEL or
PAGE_ZIP_DIR_SLOT_OWNED */
__attribute__((pure));
/************************************************************************** /**************************************************************************
Determine the encoded length of an integer in the modification log. */ Determine the encoded length of an integer in the modification log. */
UNIV_INLINE UNIV_INLINE
......
...@@ -166,7 +166,7 @@ page_zip_dir_get( ...@@ -166,7 +166,7 @@ page_zip_dir_get(
ut_ad(page_zip_simple_validate(page_zip)); ut_ad(page_zip_simple_validate(page_zip));
ut_ad(slot + 2 < page_dir_get_n_heap((page_t*) page_zip->data)); ut_ad(slot + 2 < page_dir_get_n_heap((page_t*) page_zip->data));
return(mach_read_from_2(page_zip->data + page_zip->size return(mach_read_from_2(page_zip->data + page_zip->size
- PAGE_ZIP_DIR_SLOT_SIZE * slot)); - PAGE_ZIP_DIR_SLOT_SIZE * (slot + 1)));
} }
/***************************************************************** /*****************************************************************
Write a given slot in the dense page directory. */ Write a given slot in the dense page directory. */
...@@ -174,18 +174,15 @@ UNIV_INLINE ...@@ -174,18 +174,15 @@ UNIV_INLINE
void void
page_zip_dir_set( page_zip_dir_set(
/*==============*/ /*==============*/
const page_zip_des_t* page_zip, /* in: compressed page */ page_zip_des_t* page_zip, /* in: compressed page */
ulint slot, /* in: slot ulint slot, /* in: slot (0=first user record) */
(0=first user record) */ ulint offs) /* in: offset, possibly ORed with
ulint offs) /* in: offset, possibly PAGE_ZIP_DIR_SLOT_DEL or
ORed with PAGE_ZIP_DIR_SLOT_OWNED */
PAGE_ZIP_DIR_SLOT_DEL or
PAGE_ZIP_DIR_SLOT_OWNED */
{ {
ut_ad(page_zip_simple_validate(page_zip)); ut_ad(page_zip_simple_validate(page_zip));
ut_ad(slot + 2 < page_dir_get_n_heap((page_t*) page_zip->data));
mach_write_to_2(page_zip->data + page_zip->size mach_write_to_2(page_zip->data + page_zip->size
- PAGE_ZIP_DIR_SLOT_SIZE * slot, - PAGE_ZIP_DIR_SLOT_SIZE * (slot + 1),
offs); offs);
} }
......
...@@ -17,6 +17,9 @@ Created June 2005 by Marko Makela ...@@ -17,6 +17,9 @@ Created June 2005 by Marko Makela
#include "ut0sort.h" #include "ut0sort.h"
#include "zlib.h" #include "zlib.h"
/* Please refer to ../include/page0zip.ic for a description of the
compressed page format. */
/* The infimum and supremum records are omitted from the compressed page. /* The infimum and supremum records are omitted from the compressed page.
On compress, we compare that the records are there, and on uncompress we On compress, we compare that the records are there, and on uncompress we
restore the records. */ restore the records. */
...@@ -107,7 +110,7 @@ page_zip_dir_encode( ...@@ -107,7 +110,7 @@ page_zip_dir_encode(
page_zip_dir_set(page_zip, i++, offs); page_zip_dir_set(page_zip, i++, offs);
/* Ensure that each heap_no occurs at most once. */ /* Ensure that each heap_no occurs at most once. */
ut_a(!recs[heap_no - 2]); ut_a(!recs[heap_no - 2]); /* exclude infimum and supremum */
recs[heap_no - 2] = rec; recs[heap_no - 2] = rec;
ut_a(rec_get_status(rec) == status); ut_a(rec_get_status(rec) == status);
...@@ -121,7 +124,7 @@ page_zip_dir_encode( ...@@ -121,7 +124,7 @@ page_zip_dir_encode(
rec = (page_t*) page + offs; rec = (page_t*) page + offs;
heap_no = rec_get_heap_no_new(rec); heap_no = rec_get_heap_no_new(rec);
ut_a(heap_no >= 2); ut_a(heap_no >= 2); /* only user records can be deleted */
ut_a(heap_no < n_heap); ut_a(heap_no < n_heap);
ut_a(!rec[-REC_N_NEW_EXTRA_BYTES]); /* info_bits and n_owned */ ut_a(!rec[-REC_N_NEW_EXTRA_BYTES]); /* info_bits and n_owned */
...@@ -130,14 +133,14 @@ page_zip_dir_encode( ...@@ -130,14 +133,14 @@ page_zip_dir_encode(
page_zip_dir_set(page_zip, i++, offs); page_zip_dir_set(page_zip, i++, offs);
/* Ensure that each heap_no occurs at most once. */ /* Ensure that each heap_no occurs at most once. */
ut_a(!recs[heap_no - 2]); ut_a(!recs[heap_no - 2]); /* exclude infimum and supremum */
recs[heap_no - 2] = rec; recs[heap_no - 2] = rec;
offs = rec_get_next_offs(rec, TRUE); offs = rec_get_next_offs(rec, TRUE);
} }
/* Ensure that each heap no occurs at least once. */ /* Ensure that each heap no occurs at least once. */
ut_a(i + 2 == n_heap); ut_a(i + 2/* infimum and supremum */ == n_heap);
} }
/************************************************************************** /**************************************************************************
...@@ -154,7 +157,7 @@ page_zip_compress( ...@@ -154,7 +157,7 @@ page_zip_compress(
z_stream c_stream; z_stream c_stream;
int err; int err;
byte* buf; byte* buf;
ulint n_heap; ulint n_dense;
const byte* src; const byte* src;
const byte** recs; /* dense page directory, sorted by address */ const byte** recs; /* dense page directory, sorted by address */
mem_heap_t* heap; mem_heap_t* heap;
...@@ -179,17 +182,18 @@ page_zip_compress( ...@@ -179,17 +182,18 @@ page_zip_compress(
== PAGE_NEW_SUPREMUM); == PAGE_NEW_SUPREMUM);
} }
n_heap = page_dir_get_n_heap((page_t*) page) - 2; /* The dense directory excludes the infimum and supremum records. */
ut_a(n_heap * PAGE_ZIP_DIR_SLOT_SIZE < page_zip->size); n_dense = page_dir_get_n_heap((page_t*) page) - 2;
ut_a(n_dense * PAGE_ZIP_DIR_SLOT_SIZE < page_zip->size);
heap = mem_heap_create(page_zip->size heap = mem_heap_create(page_zip->size
+ n_heap * ((sizeof *recs) - PAGE_ZIP_DIR_SLOT_SIZE)); + n_dense * ((sizeof *recs) - PAGE_ZIP_DIR_SLOT_SIZE));
recs = mem_heap_alloc(heap, n_heap * sizeof *recs); recs = mem_heap_alloc(heap, n_dense * sizeof *recs);
memset(recs, 0, n_heap * sizeof *recs); memset(recs, 0, n_dense * sizeof *recs);
buf = mem_heap_alloc(heap, page_zip->size buf = mem_heap_alloc(heap, page_zip->size
- PAGE_DATA - PAGE_ZIP_DIR_SLOT_SIZE * n_heap); - PAGE_DATA - PAGE_ZIP_DIR_SLOT_SIZE * n_dense);
page_zip_dir_encode(page, page_zip, recs); page_zip_dir_encode(page, page_zip, recs);
...@@ -202,19 +206,19 @@ page_zip_compress( ...@@ -202,19 +206,19 @@ page_zip_compress(
ut_a(err == Z_OK); ut_a(err == Z_OK);
c_stream.next_out = buf; c_stream.next_out = buf;
c_stream.avail_out = page_zip->size - (PAGE_DATA - 1) c_stream.avail_out = page_zip->size - (PAGE_DATA + 1)
- n_heap * PAGE_ZIP_DIR_SLOT_SIZE; - n_dense * PAGE_ZIP_DIR_SLOT_SIZE;
if (UNIV_LIKELY(n_heap > 0) if (UNIV_LIKELY(n_dense > 0)
&& *recs == page + (PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES)) { && *recs == page + (PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES)) {
src = page + (PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES); src = page + (PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES);
recs++; recs++;
n_heap--; n_dense--;
} else { } else {
src = page + PAGE_ZIP_START; src = page + PAGE_ZIP_START;
} }
while (n_heap--) { while (n_dense--) {
c_stream.next_in = (void*) src; c_stream.next_in = (void*) src;
c_stream.avail_in = *recs - src - REC_N_NEW_EXTRA_BYTES; c_stream.avail_in = *recs - src - REC_N_NEW_EXTRA_BYTES;
...@@ -354,7 +358,7 @@ page_zip_dir_decode( ...@@ -354,7 +358,7 @@ page_zip_dir_decode(
rec_t** recs, /* out: dense page directory sorted by rec_t** recs, /* out: dense page directory sorted by
ascending address (and heap_no) */ ascending address (and heap_no) */
rec_t** recs_aux,/* in/out: scratch area */ rec_t** recs_aux,/* in/out: scratch area */
ulint n_heap) /* in: number of user records, and ulint n_dense)/* in: number of user records, and
size of recs[] and recs_aux[] */ size of recs[] and recs_aux[] */
{ {
ulint i; ulint i;
...@@ -396,7 +400,7 @@ page_zip_dir_decode( ...@@ -396,7 +400,7 @@ page_zip_dir_decode(
} }
/* Copy the rest of the dense directory. */ /* Copy the rest of the dense directory. */
for (i = 0; i < n_heap; i++) { for (i = 0; i < n_dense; i++) {
ulint offs = page_zip_dir_get(page_zip, i); ulint offs = page_zip_dir_get(page_zip, i);
if (UNIV_UNLIKELY(offs & ~PAGE_ZIP_DIR_SLOT_MASK)) { if (UNIV_UNLIKELY(offs & ~PAGE_ZIP_DIR_SLOT_MASK)) {
...@@ -406,8 +410,8 @@ page_zip_dir_decode( ...@@ -406,8 +410,8 @@ page_zip_dir_decode(
recs[i] = page + offs; recs[i] = page + offs;
} }
if (UNIV_LIKELY(n_heap > 1)) { if (UNIV_LIKELY(n_dense > 1)) {
page_zip_dir_sort(recs, recs_aux, 0, n_heap - 1); page_zip_dir_sort(recs, recs_aux, 0, n_dense - 1);
} }
return(TRUE); return(TRUE);
} }
...@@ -433,6 +437,8 @@ page_zip_set_extra_bytes( ...@@ -433,6 +437,8 @@ page_zip_set_extra_bytes(
for (i = 0; i < n; i++) { for (i = 0; i < n; i++) {
offs = page_zip_dir_get(page_zip, i); offs = page_zip_dir_get(page_zip, i);
rec_set_next_offs_new(rec, NULL, offs);
rec = page + offs;
if (UNIV_UNLIKELY(offs & PAGE_ZIP_DIR_SLOT_DEL)) { if (UNIV_UNLIKELY(offs & PAGE_ZIP_DIR_SLOT_DEL)) {
info_bits |= REC_INFO_DELETED_FLAG; info_bits |= REC_INFO_DELETED_FLAG;
...@@ -443,15 +449,14 @@ page_zip_set_extra_bytes( ...@@ -443,15 +449,14 @@ page_zip_set_extra_bytes(
} else { } else {
n_owned++; n_owned++;
} }
rec[-REC_N_NEW_EXTRA_BYTES] = info_bits;
info_bits = 0;
offs &= PAGE_ZIP_DIR_SLOT_MASK; offs &= PAGE_ZIP_DIR_SLOT_MASK;
if (UNIV_UNLIKELY(!offs)) { if (UNIV_UNLIKELY(offs < PAGE_ZIP_START
+ REC_N_NEW_EXTRA_BYTES)) {
return(FALSE); return(FALSE);
} }
rec_set_next_offs_new(rec, NULL, offs);
rec = page + offs; rec[-REC_N_NEW_EXTRA_BYTES] = info_bits;
info_bits = 0;
} }
/* Set the next pointer of the last user record. */ /* Set the next pointer of the last user record. */
...@@ -462,7 +467,7 @@ page_zip_set_extra_bytes( ...@@ -462,7 +467,7 @@ page_zip_set_extra_bytes(
n = page_dir_get_n_heap(page); n = page_dir_get_n_heap(page);
if (i + 2 >= n) { if (i + 2/* infimum and supremum */ >= n) {
return(UNIV_LIKELY(i + 2 == n)); return(UNIV_LIKELY(i + 2 == n));
} }
...@@ -486,7 +491,7 @@ page_zip_set_extra_bytes( ...@@ -486,7 +491,7 @@ page_zip_set_extra_bytes(
rec[-REC_N_NEW_EXTRA_BYTES] = 0; /* info_bits and n_owned */ rec[-REC_N_NEW_EXTRA_BYTES] = 0; /* info_bits and n_owned */
rec_set_next_offs_new(rec, NULL, 0); rec_set_next_offs_new(rec, NULL, 0);
return(UNIV_LIKELY(i + 2 == n)); return(UNIV_LIKELY(i + 2/* infimum and supremum */ == n));
} }
/************************************************************************** /**************************************************************************
...@@ -531,7 +536,9 @@ page_zip_apply_log( ...@@ -531,7 +536,9 @@ page_zip_apply_log(
} }
/************************************************************************** /**************************************************************************
Decompress a page. */ Decompress a page. This function should tolerate errors on the compressed
page. Instead of letting assertions fail, it will return FALSE if an
inconsistency is detected. */
ibool ibool
page_zip_decompress( page_zip_decompress(
...@@ -547,24 +554,25 @@ page_zip_decompress( ...@@ -547,24 +554,25 @@ page_zip_decompress(
byte** recs; /* dense page directory, sorted by address */ byte** recs; /* dense page directory, sorted by address */
byte* dst; byte* dst;
ulint heap_status;/* heap_no and status bits */ ulint heap_status;/* heap_no and status bits */
ulint n_heap; ulint n_dense;
mem_heap_t* heap; mem_heap_t* heap;
ulint info_bits; ulint info_bits;
ut_ad(page_zip_simple_validate(page_zip)); ut_ad(page_zip_simple_validate(page_zip));
n_heap = page_dir_get_n_heap(page_zip->data) - 2; /* The dense directory excludes the infimum and supremum records. */
ut_a(n_heap * PAGE_ZIP_DIR_SLOT_SIZE < page_zip->size); n_dense = page_dir_get_n_heap(page_zip->data) - 2;
ut_a(n_dense * PAGE_ZIP_DIR_SLOT_SIZE < page_zip->size);
heap = mem_heap_create(n_heap * (2 * sizeof *recs)); heap = mem_heap_create(n_dense * (2 * sizeof *recs));
recs = mem_heap_alloc(heap, n_heap * (2 * sizeof *recs)); recs = mem_heap_alloc(heap, n_dense * (2 * sizeof *recs));
/* Copy the page header. */ /* Copy the page header. */
memcpy(page, page_zip->data, PAGE_DATA); memcpy(page, page_zip->data, PAGE_DATA);
/* Copy the page directory. */ /* Copy the page directory. */
if (UNIV_UNLIKELY(!page_zip_dir_decode(page_zip, page, if (UNIV_UNLIKELY(!page_zip_dir_decode(page_zip, page,
recs, recs + n_heap, n_heap))) { recs, recs + n_dense, n_dense))) {
mem_heap_free(heap); mem_heap_free(heap);
return(FALSE); return(FALSE);
} }
...@@ -594,12 +602,14 @@ page_zip_decompress( ...@@ -594,12 +602,14 @@ page_zip_decompress(
ut_a(err == Z_OK); ut_a(err == Z_OK);
d_stream.next_in = page_zip->data + PAGE_DATA; d_stream.next_in = page_zip->data + PAGE_DATA;
d_stream.avail_in = page_zip->size - n_heap - (PAGE_DATA + 1); d_stream.avail_in = page_zip->size - (PAGE_DATA + 1)
- n_dense * PAGE_ZIP_DIR_SLOT_SIZE;
if (UNIV_LIKELY(n_heap > 0) if (UNIV_LIKELY(n_dense > 0)
&& *recs == page + (PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES)) { && *recs == page + (PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES)) {
dst = page + (PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES); dst = page + (PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES);
recs++; recs++;
n_dense--;
} else { } else {
dst = page + PAGE_ZIP_START; dst = page + PAGE_ZIP_START;
} }
...@@ -616,10 +626,11 @@ page_zip_decompress( ...@@ -616,10 +626,11 @@ page_zip_decompress(
heap_status = REC_STATUS_ORDINARY | 2 << REC_HEAP_NO_SHIFT; heap_status = REC_STATUS_ORDINARY | 2 << REC_HEAP_NO_SHIFT;
} }
while (n_heap--) { while (n_dense--) {
d_stream.next_out = dst; d_stream.next_out = dst;
d_stream.avail_out = *recs - dst - REC_N_NEW_EXTRA_BYTES; d_stream.avail_out = *recs - dst - REC_N_NEW_EXTRA_BYTES;
ut_ad(d_stream.avail_out < UNIV_PAGE_SIZE);
/* set heap_no and the status bits */ /* set heap_no and the status bits */
mach_write_to_2(dst - REC_NEW_HEAP_NO, heap_status); mach_write_to_2(dst - REC_NEW_HEAP_NO, heap_status);
heap_status += REC_HEAP_NO_SHIFT; heap_status += REC_HEAP_NO_SHIFT;
...@@ -648,6 +659,9 @@ page_zip_decompress( ...@@ -648,6 +659,9 @@ page_zip_decompress(
- ut_align_offset(dst, UNIV_PAGE_SIZE); - ut_align_offset(dst, UNIV_PAGE_SIZE);
ut_a(d_stream.avail_out < UNIV_PAGE_SIZE); ut_a(d_stream.avail_out < UNIV_PAGE_SIZE);
/* set heap_no and the status bits */
mach_write_to_2(dst - REC_NEW_HEAP_NO, heap_status);
err = inflate(&d_stream, Z_FINISH); err = inflate(&d_stream, Z_FINISH);
if (err != Z_STREAM_END) { if (err != Z_STREAM_END) {
...@@ -667,7 +681,8 @@ zlib_error: ...@@ -667,7 +681,8 @@ zlib_error:
return(FALSE); return(FALSE);
} }
n_heap = page_dir_get_n_heap(page) - 2; /* The dense directory excludes the infimum and supremum records. */
n_dense = page_dir_get_n_heap(page) - 2;
page_zip->m_start = PAGE_DATA + d_stream.total_in; page_zip->m_start = PAGE_DATA + d_stream.total_in;
...@@ -677,7 +692,7 @@ zlib_error: ...@@ -677,7 +692,7 @@ zlib_error:
mod_log_ptr = page_zip_apply_log( mod_log_ptr = page_zip_apply_log(
page_zip->data + page_zip->m_start, page_zip->data + page_zip->m_start,
page_zip->data + page_zip->size page_zip->data + page_zip->size
- n_heap * PAGE_ZIP_DIR_SLOT_SIZE, - n_dense * PAGE_ZIP_DIR_SLOT_SIZE,
page); page);
if (UNIV_UNLIKELY(!mod_log_ptr)) { if (UNIV_UNLIKELY(!mod_log_ptr)) {
return(FALSE); return(FALSE);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment