Commit 1dd4d568 authored by marko

branches/zip: Write compressed pages to disk.

os_aio_simulated_handle(): Temporarily disable os_file_check_page_trailers(),
which cannot be invoked on compressed pages.

dict_table_add_system_columns(): New function, split from
dict_table_add_to_cache().

mlog_parse_index(): Add system columns to the dummy table and identify
DB_TRX_ID and DB_ROLL_PTR in the dummy index.

buf_LRU_get_free_block(): Note that page_zip->data should be allocated from
an aligned memory pool.

buf_flush_buffered_writes(): Write compressed pages to disk.

buf_flush_post_to_doublewrite_buf(): Copy compressed pages to the
doublewrite buffer.  Zero fill any excess space.

buf_flush_init_for_writing(): Treat all compressed pages the same.

buf_read_page_low(): Read compressed pages from disk.

buf_page_io_complete(): Process compressed pages.

trx_sys_doublewrite_init_or_restore_pages(): Process compressed pages.

mlog_write_initial_log_record_fast(): Enable a debug printout
under #ifdef UNIV_LOG_DEBUG.

fsp_header_init(), fsp_fill_free_list(): Pass the compressed page size
to buf_page_create().

page_zip_compress_write_log(): Flatten the if-else if-else logic.

page_zip_parse_write_blob_ptr(): Do not test page_zip if page==NULL.

page_zip_parse_write_node_ptr(): Do not test page_zip if page==NULL.
Invoke mlog_close() correctly.

row_sel_store_row_id_to_prebuilt(): Add UNIV_UNLIKELY hint to an
assertion-like test.
parent 662318ba
......@@ -3626,6 +3626,7 @@ btr_store_big_rec_extern_fields(
if (UNIV_LIKELY_NULL(page_zip)) {
int err;
page_zip_des_t* blob_page_zip;
mach_write_to_2(page + FIL_PAGE_TYPE,
FIL_PAGE_TYPE_ZBLOB);
......@@ -3649,6 +3650,16 @@ btr_store_big_rec_extern_fields(
0, c_stream.avail_out);
mlog_log_string(page + FIL_PAGE_TYPE,
page_zip->size - FIL_PAGE_TYPE, &mtr);
/* Copy the page to compressed storage,
because it will be flushed to disk
from there. */
blob_page_zip = buf_block_get_page_zip(
buf_block_align(page));
ut_ad(blob_page_zip);
ut_ad(blob_page_zip->size == page_zip->size);
memcpy(blob_page_zip->data, page,
page_zip->size);
/* TODO: retain blob_page_zip, release page */
if (err == Z_OK && prev_page_no != FIL_NULL) {
......
......@@ -1961,20 +1961,44 @@ buf_page_io_complete(
if (io_type == BUF_IO_READ) {
ulint read_page_no;
ulint read_space_id;
byte* frame;
if (block->page_zip.size) {
ut_a(block->space);
switch (fil_page_get_type(block->page_zip.data)) {
frame = block->page_zip.data;
switch (fil_page_get_type(frame)) {
case FIL_PAGE_INDEX:
if (block->frame) {
if (!page_zip_decompress(
&block->page_zip,
block->frame)) {
goto corrupt;
}
}
break;
case FIL_PAGE_INODE:
case FIL_PAGE_IBUF_BITMAP:
case FIL_PAGE_TYPE_FSP_HDR:
case FIL_PAGE_TYPE_XDES:
case FIL_PAGE_TYPE_ZBLOB:
/* TODO: checksum, but do not decompress */
/* Copy to uncompressed storage. */
memcpy(block->frame, frame,
block->page_zip.size);
break;
default:
/* TODO: how to distinguish uncompressed
and compressed pages? */
case 0:
/* uninitialized page */
break;
default:
ut_print_timestamp(stderr);
fprintf(stderr,
"InnoDB: unknown compressed page type %lu\n",
fil_page_get_type(frame));
goto corrupt;
}
} else {
frame = block->frame;
}
/* If this page is not uninitialized and not in the
......@@ -2013,22 +2037,19 @@ buf_page_io_complete(
/* From version 3.23.38 up we store the page checksum
to the 4 first bytes of the page end lsn field */
if (buf_page_is_corrupted(block->frame/* TODO */,
block->page_zip.size)) {
if (buf_page_is_corrupted(frame, block->page_zip.size)) {
corrupt:
fprintf(stderr,
"InnoDB: Database page corruption on disk or a failed\n"
"InnoDB: file read of page %lu.\n", (ulong) block->offset);
fputs(
"InnoDB: You may have to recover from a backup.\n", stderr);
buf_page_print(block->frame, block->page_zip.size);
"InnoDB: file read of page %lu.\n"
"InnoDB: You may have to recover from a backup.\n",
(ulong) block->offset);
buf_page_print(frame, block->page_zip.size);
fprintf(stderr,
"InnoDB: Database page corruption on disk or a failed\n"
"InnoDB: file read of page %lu.\n", (ulong) block->offset);
fputs(
"InnoDB: You may have to recover from a backup.\n", stderr);
"InnoDB: file read of page %lu.\n"
"InnoDB: You may have to recover from a backup.\n",
(ulong) block->offset);
fputs(
"InnoDB: It is also possible that your operating\n"
"InnoDB: system has corrupted its own file cache\n"
......
......@@ -365,8 +365,22 @@ flush:
for (i = 0; i < trx_doublewrite->first_free; i++) {
block = trx_doublewrite->buf_block_arr[i];
if (UNIV_LIKELY(!block->page_zip.data) && UNIV_UNLIKELY(
memcmp(block->frame + (FIL_PAGE_LSN + 4),
ut_a(block->state == BUF_BLOCK_FILE_PAGE);
if (UNIV_UNLIKELY(block->page_zip.size)) {
ulint blk_size
= UNIV_PAGE_SIZE / block->page_zip.size;
fil_io(OS_FILE_WRITE | OS_AIO_SIMULATED_WAKE_LATER,
FALSE, block->space,
block->offset / blk_size,
(block->offset % blk_size)
* block->page_zip.size,
block->page_zip.size,
(void*)block->page_zip.data,
(void*)block);
continue;
} else if (UNIV_UNLIKELY(memcmp(
block->frame + (FIL_PAGE_LSN + 4),
block->frame + (UNIV_PAGE_SIZE
- FIL_PAGE_END_LSN_OLD_CHKSUM + 4), 4))) {
ut_print_timestamp(stderr);
......@@ -379,7 +393,6 @@ flush:
(ulong)block->io_fix,
(ulong)block->state);
}
ut_a(block->state == BUF_BLOCK_FILE_PAGE);
fil_io(OS_FILE_WRITE | OS_AIO_SIMULATED_WAKE_LATER,
FALSE, block->space, block->offset, 0, UNIV_PAGE_SIZE,
......@@ -417,6 +430,7 @@ buf_flush_post_to_doublewrite_buf(
/*==============================*/
buf_block_t* block) /* in: buffer block to write */
{
ulint zip_size;
try_again:
mutex_enter(&(trx_doublewrite->mutex));
......@@ -431,10 +445,21 @@ try_again:
goto try_again;
}
/* TODO: page_zip */
zip_size = block->page_zip.size;
if (UNIV_UNLIKELY(zip_size)) {
/* Copy the compressed page and clear the rest. */
memcpy(trx_doublewrite->write_buf
+ UNIV_PAGE_SIZE * trx_doublewrite->first_free,
block->page_zip.data, zip_size);
memset(trx_doublewrite->write_buf
+ UNIV_PAGE_SIZE * trx_doublewrite->first_free
+ zip_size, 0, UNIV_PAGE_SIZE - zip_size);
} else {
memcpy(trx_doublewrite->write_buf
+ UNIV_PAGE_SIZE * trx_doublewrite->first_free,
block->frame, UNIV_PAGE_SIZE);
}
trx_doublewrite->buf_block_arr[trx_doublewrite->first_free] = block;
......@@ -468,9 +493,10 @@ buf_flush_init_for_writing(
ulint zip_size = fil_space_get_zip_size(space);
if (zip_size && zip_size != ULINT_UNDEFINED) {
ut_a(page_zip);
ut_a(page_zip->size == zip_size);
switch (UNIV_EXPECT(fil_page_get_type(page), FIL_PAGE_INDEX)) {
case FIL_PAGE_TYPE_ZBLOB:
ut_ad(!page_zip);
mach_write_to_4(page + FIL_PAGE_OFFSET, page_no);
mach_write_to_4(page + FIL_PAGE_ZBLOB_SPACE_ID, space);
mach_write_to_8(page + FIL_PAGE_LSN, newest_lsn);
......@@ -484,8 +510,8 @@ buf_flush_init_for_writing(
case FIL_PAGE_IBUF_BITMAP:
case FIL_PAGE_TYPE_FSP_HDR:
case FIL_PAGE_TYPE_XDES:
/* This is essentially an uncompressed page. */
break;
/* These are essentially uncompressed pages. */
memcpy(page_zip->data, page, zip_size);
case FIL_PAGE_INDEX:
ut_a(zip_size == page_zip->size);
mach_write_to_4(page
......
......@@ -424,7 +424,7 @@ loop:
}
if (zip_size) {
/* TODO: allocate this from a separate pool */
/* TODO: allocate zip from an aligned pool */
block->page_zip.data = ut_malloc(zip_size);
} else {
block->page_zip.data = NULL;
......
......@@ -140,10 +140,20 @@ buf_read_page_low(
ut_a(block->state == BUF_BLOCK_FILE_PAGE);
if (zip_size) {
ulint zip_blk = UNIV_PAGE_SIZE / zip_size;
*err = fil_io(OS_FILE_READ | wake_later,
sync, space,
offset / zip_blk, (offset % zip_blk)
* zip_size, zip_size,
(void*)block->page_zip.data, (void*)block);
} else {
*err = fil_io(OS_FILE_READ | wake_later,
sync, space,
offset, 0, UNIV_PAGE_SIZE,
(void*)block->frame, (void*)block);
}
ut_a(*err == DB_SUCCESS);
if (sync) {
......
......@@ -793,29 +793,20 @@ dict_table_get_and_increment_handle_count(
}
/**************************************************************************
Adds a table object to the dictionary cache. */
Adds system columns to a table object. */
void
dict_table_add_to_cache(
/*====================*/
dict_table_t* table) /* in: table */
dict_table_add_system_columns(
/*==========================*/
dict_table_t* table) /* in/out: table */
{
ulint fold;
ulint id_fold;
ulint i;
ut_ad(table);
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&(dict_sys->mutex)));
#endif /* UNIV_SYNC_DEBUG */
ut_ad(table->n_def == table->n_cols - DATA_N_SYS_COLS);
ut_ad(table->magic_n == DICT_TABLE_MAGIC_N);
ut_ad(table->cached == FALSE);
fold = ut_fold_string(table->name);
id_fold = ut_fold_dulint(table->id);
table->cached = TRUE;
ut_ad(!table->cached);
/* NOTE: the system columns MUST be added in the following order
(so that they can be indexed by the numerical value of DATA_ROW_ID,
......@@ -849,6 +840,26 @@ dict_table_add_to_cache(
#if DATA_N_SYS_COLS != 4
#error "DATA_N_SYS_COLS != 4"
#endif
}
/**************************************************************************
Adds a table object to the dictionary cache. */
void
dict_table_add_to_cache(
/*====================*/
dict_table_t* table) /* in: table */
{
ulint fold;
ulint id_fold;
ulint i;
dict_table_add_system_columns(table);
table->cached = TRUE;
fold = ut_fold_string(table->name);
id_fold = ut_fold_dulint(table->id);
/* Look for a table with the same name: error if such exists */
{
......
......@@ -890,7 +890,7 @@ fsp_header_init(
mtr_x_lock(fil_space_get_latch(space), mtr);
page = buf_page_create(space, 0, 0/* TODO: zip_size!=16k? */, mtr);
page = buf_page_create(space, 0, zip_size, mtr);
buf_page_get(space, 0, RW_X_LATCH, mtr);
#ifdef UNIV_SYNC_DEBUG
buf_page_dbg_add_level(page, SYNC_FSP_PAGE);
......@@ -1218,6 +1218,7 @@ fsp_fill_free_list(
{
ulint limit;
ulint size;
ulint zip_size;
xdes_t* descr;
ulint count = 0;
ulint frag_n_used;
......@@ -1233,6 +1234,8 @@ fsp_fill_free_list(
size = mtr_read_ulint(header + FSP_SIZE, MLOG_4BYTES, mtr);
limit = mtr_read_ulint(header + FSP_FREE_LIMIT, MLOG_4BYTES, mtr);
zip_size = mach_read_from_4(FSP_PAGE_ZIP_SIZE + header);
if (space == 0 && srv_auto_extend_last_data_file
&& size < limit + FSP_EXTENT_SIZE * FSP_FREE_ADD) {
......@@ -1272,8 +1275,8 @@ fsp_fill_free_list(
pages should be ignored. */
if (i > 0) {
/* TODO: zip_size != 16384 */
descr_page = buf_page_create(space, i, 0, mtr);
descr_page = buf_page_create(
space, i, zip_size, mtr);
buf_page_get(space, i, RW_X_LATCH, mtr);
#ifdef UNIV_SYNC_DEBUG
buf_page_dbg_add_level(descr_page,
......@@ -1291,10 +1294,9 @@ fsp_fill_free_list(
mtr_start(&ibuf_mtr);
/* TODO: no ibuf on compressed tablespaces */
ibuf_page = buf_page_create(space,
i + FSP_IBUF_BITMAP_OFFSET,
0, &ibuf_mtr);
zip_size, &ibuf_mtr);
buf_page_get(space, i + FSP_IBUF_BITMAP_OFFSET,
RW_X_LATCH, &ibuf_mtr);
#ifdef UNIV_SYNC_DEBUG
......
......@@ -163,6 +163,13 @@ dict_table_autoinc_update(
dict_table_t* table, /* in: table */
ib_longlong value); /* in: value which was assigned to a row */
/**************************************************************************
Adds system columns to a table object. */
void
dict_table_add_system_columns(
/*==========================*/
dict_table_t* table); /* in/out: table */
/**************************************************************************
Adds a table object to the dictionary cache. */
void
......
......@@ -182,9 +182,9 @@ mlog_write_initial_log_record_fast(
mtr->n_log_recs++;
#ifdef UNIV_LOG_DEBUG
/* fprintf(stderr,
fprintf(stderr,
"Adding to mtr log record type %lu space %lu page no %lu\n",
type, space, offset); */
(ulong) type, space, offset);
#endif
#ifdef UNIV_DEBUG
......
......@@ -534,6 +534,7 @@ mlog_parse_index(
n = mach_read_from_2(ptr);
ptr += 2;
n_uniq = mach_read_from_2(ptr);
ptr += 2;
ut_ad(n_uniq <= n);
if (end_ptr < ptr + (n + 1) * 2) {
return(NULL);
......@@ -548,13 +549,13 @@ mlog_parse_index(
ind->table = table;
ind->n_uniq = n_uniq;
if (n_uniq != n) {
ut_a(n_uniq + DATA_ROLL_PTR <= n);
ind->type = DICT_CLUSTERED;
}
/* avoid ut_ad(index->cached) in dict_index_get_n_unique_in_tree */
ind->cached = TRUE;
if (comp) {
for (i = 0; i < n; i++) {
ulint len = mach_read_from_2(ptr += 2);
ulint len = mach_read_from_2(ptr);
ptr += 2;
/* The high-order bit of len is the NOT NULL flag;
the rest is 0 or 0x7fff for variable-length fields,
and 1..0x7ffe for fixed-length fields. */
......@@ -567,8 +568,25 @@ mlog_parse_index(
dict_index_add_col(ind,
dict_table_get_nth_col(table, i), 0);
}
ptr += 2;
dict_table_add_system_columns(table);
if (n_uniq != n) {
/* Identify DB_TRX_ID and DB_ROLL_PTR in the index. */
ut_a(dtype_get_len(dict_col_get_type(
dict_field_get_col(dict_index_get_nth_field(
ind, n_uniq + (DATA_TRX_ID - 1)))))
== DATA_TRX_ID_LEN);
ut_a(dtype_get_len(dict_col_get_type(
dict_field_get_col(dict_index_get_nth_field(
ind, n_uniq + (DATA_ROLL_PTR - 1)))))
== DATA_ROLL_PTR_LEN);
dict_table_get_nth_col(table, i + DATA_TRX_ID)
->clust_pos = n_uniq + (DATA_TRX_ID - 1);
dict_table_get_nth_col(table, i + DATA_ROLL_PTR)
->clust_pos = n_uniq + (DATA_ROLL_PTR - 1);
}
}
/* avoid ut_ad(index->cached) in dict_index_get_n_unique_in_tree */
ind->cached = TRUE;
*index = ind;
return(ptr);
}
......@@ -3943,7 +3943,8 @@ consecutive_loop:
/* Do the i/o with ordinary, synchronous i/o functions: */
if (slot->type == OS_FILE_WRITE) {
if (array == os_aio_write_array /* TODO: && !page_zip */) {
#if 0 /* TODO: && !page_zip */
if (array == os_aio_write_array) {
if ((total_len % UNIV_PAGE_SIZE != 0)
|| (slot->offset % UNIV_PAGE_SIZE != 0)) {
fprintf(stderr,
......@@ -3955,13 +3956,14 @@ consecutive_loop:
}
os_file_check_page_trailers(combined_buf, total_len);
}
#endif
ret = os_file_write(slot->name, slot->file, combined_buf,
slot->offset, slot->offset_high, total_len);
if (array == os_aio_write_array /* TODO: && !page_zip */) {
#if 0 /* TODO: && !page_zip */
if (array == os_aio_write_array) {
os_file_check_page_trailers(combined_buf, total_len);
}
#endif
} else {
ret = os_file_read(slot->file, combined_buf,
slot->offset, slot->offset_high, total_len);
......
......@@ -211,16 +211,14 @@ page_zip_compress_write_log(
Subtract 2 for the infimum and supremum records. */
trailer_size = page_dir_get_n_heap(page_zip->data) - 2;
/* Multiply by uncompressed of size stored per record */
if (page_is_leaf(page)) {
if (dict_index_is_clust(index)) {
if (!page_is_leaf(page)) {
trailer_size *= PAGE_ZIP_DIR_SLOT_SIZE + REC_NODE_PTR_SIZE;
} else if (dict_index_is_clust(index)) {
trailer_size *= PAGE_ZIP_DIR_SLOT_SIZE
+ DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
} else {
trailer_size *= PAGE_ZIP_DIR_SLOT_SIZE;
}
} else {
trailer_size *= PAGE_ZIP_DIR_SLOT_SIZE + REC_NODE_PTR_SIZE;
}
/* Add the space occupied by BLOB pointers. */
trailer_size += page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
ut_a(page_zip->m_end > PAGE_DATA);
......@@ -2311,8 +2309,7 @@ page_zip_parse_write_blob_ptr(
if (UNIV_UNLIKELY(offset < PAGE_ZIP_START)
|| UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)
|| UNIV_UNLIKELY(z_offset >= UNIV_PAGE_SIZE)
|| UNIV_UNLIKELY(!page_zip)) {
|| UNIV_UNLIKELY(z_offset >= UNIV_PAGE_SIZE)) {
corrupt:
recv_sys->found_corrupt_log = TRUE;
......@@ -2320,15 +2317,16 @@ corrupt:
}
if (page) {
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
if (UNIV_UNLIKELY(!page_is_leaf(page))) {
if (UNIV_UNLIKELY(!page_zip)
|| UNIV_UNLIKELY(!page_is_leaf(page))) {
goto corrupt;
}
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
memcpy(page + offset,
ptr + 4, BTR_EXTERN_FIELD_REF_SIZE);
memcpy(page_zip->data + z_offset,
......@@ -2451,8 +2449,7 @@ page_zip_parse_write_node_ptr(
if (UNIV_UNLIKELY(offset < PAGE_ZIP_START)
|| UNIV_UNLIKELY(offset >= UNIV_PAGE_SIZE)
|| UNIV_UNLIKELY(z_offset >= UNIV_PAGE_SIZE)
|| UNIV_UNLIKELY(!page_zip)) {
|| UNIV_UNLIKELY(z_offset >= UNIV_PAGE_SIZE)) {
corrupt:
recv_sys->found_corrupt_log = TRUE;
......@@ -2465,15 +2462,16 @@ corrupt:
byte* storage;
ulint heap_no;
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
if (UNIV_UNLIKELY(page_is_leaf(page))) {
if (UNIV_UNLIKELY(!page_zip)
|| UNIV_UNLIKELY(page_is_leaf(page))) {
goto corrupt;
}
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip, page));
#endif /* UNIV_DEBUG || UNIV_ZIP_DEBUG */
field = page + offset;
storage = page_zip->data + z_offset;
......@@ -2558,7 +2556,8 @@ page_zip_write_node_ptr(
mach_write_to_2(log_ptr, storage - page_zip->data);
log_ptr += 2;
memcpy(log_ptr, field, REC_NODE_PTR_SIZE);
mlog_close(mtr, log_ptr + 6);
log_ptr += REC_NODE_PTR_SIZE;
mlog_close(mtr, log_ptr);
}
}
......
......@@ -2369,7 +2369,7 @@ row_sel_store_row_id_to_prebuilt(
data = rec_get_nth_field(index_rec, offsets,
dict_index_get_sys_col_pos(index, DATA_ROW_ID), &len);
if (len != DATA_ROW_ID_LEN) {
if (UNIV_UNLIKELY(len != DATA_ROW_ID_LEN)) {
fprintf(stderr,
"InnoDB: Error: Row id field is wrong length %lu in ", (ulong) len);
dict_index_name_print(stderr, prebuilt->trx, index);
......
......@@ -465,19 +465,32 @@ trx_sys_doublewrite_init_or_restore_pages(
do nothing */
} else {
ulint zip_size;
/* Read in the actual page from the data files */
fil_io(OS_FILE_READ, TRUE, space_id, page_no, 0,
UNIV_PAGE_SIZE, read_buf, NULL);
/* Check if the page is corrupt */
ulint zip_blk;
if (space_id) {
zip_size = fil_space_get_zip_size(space_id);
if (UNIV_LIKELY(!zip_size)) {
goto read_uncompressed;
}
zip_blk = UNIV_PAGE_SIZE / zip_size;
/* Read in the actual page from the file */
fil_io(OS_FILE_READ, TRUE, space_id,
page_no / zip_blk,
(page_no % zip_blk)
* zip_size, zip_size, read_buf, NULL);
} else {
read_uncompressed:
zip_size = 0;
zip_blk = 1;
/* Read in the actual page from the file */
fil_io(OS_FILE_READ, TRUE, space_id, page_no,
0, UNIV_PAGE_SIZE, read_buf, NULL);
}
if (buf_page_is_corrupted(read_buf, zip_size)) {
/* Check if the page is corrupt */
if (UNIV_UNLIKELY(buf_page_is_corrupted(
read_buf, zip_size))) {
fprintf(stderr,
"InnoDB: Warning: database page corruption or a failed\n"
......@@ -506,9 +519,17 @@ trx_sys_doublewrite_init_or_restore_pages(
doublewrite buffer to the intended
position */
if (zip_size) {
fil_io(OS_FILE_WRITE, TRUE, space_id,
page_no / zip_blk,
(page_no % zip_blk)
* zip_size, zip_size,
page, NULL);
} else {
fil_io(OS_FILE_WRITE, TRUE, space_id,
page_no, 0,
UNIV_PAGE_SIZE, page, NULL);
}
fprintf(stderr,
"InnoDB: Recovered the page from the doublewrite buffer.\n");
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment