Commit 2ca904f0 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-13103 Deal with page_compressed page corruption

fil_page_decompress(): Replaces fil_decompress_page().
Allow the caller detect errors. Remove
duplicated code. Use the "safe" instead of "fast" variants of
decompression routines.

fil_page_compress(): Replaces fil_compress_page().
The length of the input buffer always was srv_page_size (innodb_page_size).
Remove printouts, and remove the fil_space_t* parameter.

buf_tmp_buffer_t::reserved: Make private; the accessors acquire()
and release() will use atomic memory access.

buf_pool_reserve_tmp_slot(): Make static. Remove the second parameter.
Do not acquire any mutex. Remove the allocation of the buffers.

buf_tmp_reserve_crypt_buf(), buf_tmp_reserve_compression_buf():
Refactored away from buf_pool_reserve_tmp_slot().

buf_page_decrypt_after_read(): Make static, and simplify the logic.
Use the encryption buffer also for decompressing.

buf_page_io_complete(), buf_dblwr_process(): Check more failures.

fil_space_encrypt(): Simplify the debug checks.

fil_space_t::printed_compression_failure: Remove.

fil_get_compression_alg_name(): Remove.

fil_iterate(): Allocate a buffer for compression and decompression
only once, instead of allocating and freeing it for every page
that uses compression, during IMPORT TABLESPACE. Also, validate the
page checksum before decryption, and reduce the scope of some variables.

fil_page_is_index_page(), fil_page_is_lzo_compressed(): Remove (unused).

AbstractCallback::operator()(): Remove the parameter 'offset'.
The check for it in FetchIndexRootPages::operator() was basically
redundant and dead code since the previous refactoring.
parent 2cdb483b
call mtr.add_suppression("InnoDB: Compression failed for space [0-9]+ name test/innodb_page_compressed[0-9] len [0-9]+ err 2 write_size [0-9]+.");
set global innodb_file_format = `Barracuda`; set global innodb_file_format = `Barracuda`;
set global innodb_file_per_table = on; set global innodb_file_per_table = on;
create table innodb_normal (c1 int not null auto_increment primary key, b char(200)) engine=innodb; create table innodb_normal (c1 int not null auto_increment primary key, b char(200)) engine=innodb;
......
call mtr.add_suppression("InnoDB: Compression failed for space [0-9]+ name test/innodb_page_compressed[0-9] len [0-9]+ err 2 write_size [0-9]+.");
set global innodb_compression_algorithm = snappy; set global innodb_compression_algorithm = snappy;
set global innodb_file_format = `Barracuda`; set global innodb_file_format = `Barracuda`;
set global innodb_file_per_table = on; set global innodb_file_per_table = on;
......
--source include/have_innodb.inc --source include/have_innodb.inc
--source include/not_embedded.inc --source include/not_embedded.inc
call mtr.add_suppression("InnoDB: Compression failed for space [0-9]+ name test/innodb_page_compressed[0-9] len [0-9]+ err 2 write_size [0-9]+.");
# All page compression test use the same # All page compression test use the same
--source include/innodb-page-compression.inc --source include/innodb-page-compression.inc
......
...@@ -2,8 +2,6 @@ ...@@ -2,8 +2,6 @@
-- source include/have_innodb_snappy.inc -- source include/have_innodb_snappy.inc
--source include/not_embedded.inc --source include/not_embedded.inc
call mtr.add_suppression("InnoDB: Compression failed for space [0-9]+ name test/innodb_page_compressed[0-9] len [0-9]+ err 2 write_size [0-9]+.");
# snappy # snappy
set global innodb_compression_algorithm = snappy; set global innodb_compression_algorithm = snappy;
......
This diff is collapsed.
...@@ -533,10 +533,11 @@ buf_dblwr_process() ...@@ -533,10 +533,11 @@ buf_dblwr_process()
} }
unaligned_read_buf = static_cast<byte*>( unaligned_read_buf = static_cast<byte*>(
ut_malloc_nokey(2 * UNIV_PAGE_SIZE)); ut_malloc_nokey(3 * UNIV_PAGE_SIZE));
read_buf = static_cast<byte*>( read_buf = static_cast<byte*>(
ut_align(unaligned_read_buf, UNIV_PAGE_SIZE)); ut_align(unaligned_read_buf, UNIV_PAGE_SIZE));
byte* const buf = read_buf + UNIV_PAGE_SIZE;
for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin(); for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin();
i != recv_dblwr.pages.end(); i != recv_dblwr.pages.end();
...@@ -604,24 +605,24 @@ buf_dblwr_process() ...@@ -604,24 +605,24 @@ buf_dblwr_process()
ignore this page (there should be redo log ignore this page (there should be redo log
records to initialize it). */ records to initialize it). */
} else { } else {
if (fil_page_is_compressed_encrypted(read_buf) || /* Decompress the page before
fil_page_is_compressed(read_buf)) { validating the checksum. */
/* Decompress the page before ulint decomp = fil_page_decompress(buf, read_buf);
validating the checksum. */ if (!decomp || (decomp != srv_page_size
fil_decompress_page( && page_size.is_compressed())) {
NULL, read_buf, srv_page_size, goto bad;
NULL, true);
} }
if (fil_space_verify_crypt_checksum( if (fil_space_verify_crypt_checksum(
read_buf, page_size, space_id, page_no) read_buf, page_size, space_id, page_no)
|| !buf_page_is_corrupted( || !buf_page_is_corrupted(
true, read_buf, page_size, space)) { true, read_buf, page_size, space)) {
/* The page is good; there is no need /* The page is good; there is no need
to consult the doublewrite buffer. */ to consult the doublewrite buffer. */
continue; continue;
} }
bad:
/* We intentionally skip this message for /* We intentionally skip this message for
is_all_zero pages. */ is_all_zero pages. */
ib::info() ib::info()
...@@ -629,19 +630,16 @@ buf_dblwr_process() ...@@ -629,19 +630,16 @@ buf_dblwr_process()
<< " from the doublewrite buffer."; << " from the doublewrite buffer.";
} }
/* Next, validate the doublewrite page. */ ulint decomp = fil_page_decompress(buf, page);
if (fil_page_is_compressed_encrypted(page) || if (!decomp || (decomp != srv_page_size
fil_page_is_compressed(page)) { && page_size.is_compressed())) {
/* Decompress the page before goto bad_doublewrite;
validating the checksum. */
fil_decompress_page(
NULL, page, srv_page_size, NULL, true);
} }
if (!fil_space_verify_crypt_checksum(page, page_size, if (!fil_space_verify_crypt_checksum(page, page_size,
space_id, page_no) space_id, page_no)
&& buf_page_is_corrupted(true, page, page_size, space)) { && buf_page_is_corrupted(true, page, page_size, space)) {
if (!is_all_zero) { if (!is_all_zero) {
bad_doublewrite:
ib::warn() << "A doublewrite copy of page " ib::warn() << "A doublewrite copy of page "
<< page_id << " is corrupted."; << page_id << " is corrupted.";
} }
......
...@@ -665,57 +665,38 @@ fil_space_encrypt( ...@@ -665,57 +665,38 @@ fil_space_encrypt(
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
if (tmp) { if (tmp) {
/* Verify that encrypted buffer is not corrupted */ /* Verify that encrypted buffer is not corrupted */
byte* tmp_mem = (byte *)malloc(UNIV_PAGE_SIZE);
dberr_t err = DB_SUCCESS; dberr_t err = DB_SUCCESS;
byte* src = src_frame; byte* src = src_frame;
bool page_compressed_encrypted = (mach_read_from_2(tmp+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED); bool page_compressed_encrypted = (mach_read_from_2(tmp+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
byte* comp_mem = NULL; byte uncomp_mem[UNIV_PAGE_SIZE_MAX];
byte* uncomp_mem = NULL; byte tmp_mem[UNIV_PAGE_SIZE_MAX];
if (page_compressed_encrypted) { if (page_compressed_encrypted) {
comp_mem = (byte *)malloc(UNIV_PAGE_SIZE); memcpy(uncomp_mem, src, srv_page_size);
uncomp_mem = (byte *)malloc(UNIV_PAGE_SIZE); ulint unzipped1 = fil_page_decompress(
memcpy(comp_mem, src_frame, UNIV_PAGE_SIZE); tmp_mem, uncomp_mem);
fil_decompress_page(uncomp_mem, comp_mem, ut_ad(unzipped1);
srv_page_size, NULL); if (unzipped1 != srv_page_size) {
src = uncomp_mem; src = uncomp_mem;
}
} }
bool corrupted1 = buf_page_is_corrupted(true, src, page_size, space); ut_ad(!buf_page_is_corrupted(true, src, page_size, space));
bool ok = fil_space_decrypt(crypt_data, tmp_mem, page_size, tmp, &err); ut_ad(fil_space_decrypt(crypt_data, tmp_mem, page_size, tmp,
&err));
ut_ad(err == DB_SUCCESS);
/* Need to decompress the page if it was also compressed */ /* Need to decompress the page if it was also compressed */
if (page_compressed_encrypted) { if (page_compressed_encrypted) {
memcpy(comp_mem, tmp_mem, UNIV_PAGE_SIZE); byte buf[UNIV_PAGE_SIZE_MAX];
fil_decompress_page(tmp_mem, comp_mem, memcpy(buf, tmp_mem, srv_page_size);
srv_page_size, NULL); ulint unzipped2 = fil_page_decompress(tmp_mem, buf);
} ut_ad(unzipped2);
bool corrupted = buf_page_is_corrupted(true, tmp_mem, page_size, space);
memcpy(tmp_mem+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, src+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 8);
bool different = memcmp(src, tmp_mem, page_size.physical());
if (!ok || corrupted || corrupted1 || err != DB_SUCCESS || different) {
fprintf(stderr, "ok %d corrupted %d corrupted1 %d err %d different %d\n",
ok , corrupted, corrupted1, err, different);
fprintf(stderr, "src_frame\n");
buf_page_print(src_frame, page_size);
fprintf(stderr, "encrypted_frame\n");
buf_page_print(tmp, page_size);
fprintf(stderr, "decrypted_frame\n");
buf_page_print(tmp_mem, page_size);
ut_ad(0);
} }
free(tmp_mem); memcpy(tmp_mem + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
src + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 8);
if (comp_mem) { ut_ad(!memcmp(src, tmp_mem, page_size.physical()));
free(comp_mem);
}
if (uncomp_mem) {
free(uncomp_mem);
}
} }
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
......
This diff is collapsed.
...@@ -5040,6 +5040,11 @@ ibuf_check_bitmap_on_import( ...@@ -5040,6 +5040,11 @@ ibuf_check_bitmap_on_import(
continue; continue;
} }
if (!bitmap_page) {
mutex_exit(&ibuf_mutex);
return DB_CORRUPTION;
}
for (i = FSP_IBUF_BITMAP_OFFSET + 1; for (i = FSP_IBUF_BITMAP_OFFSET + 1;
i < page_size.physical(); i < page_size.physical();
i++) { i++) {
......
...@@ -41,6 +41,7 @@ Created 11/5/1995 Heikki Tuuri ...@@ -41,6 +41,7 @@ Created 11/5/1995 Heikki Tuuri
#include "os0proc.h" #include "os0proc.h"
#include "log0log.h" #include "log0log.h"
#include "srv0srv.h" #include "srv0srv.h"
#include "my_atomic.h"
#include <ostream> #include <ostream>
// Forward declaration // Forward declaration
...@@ -1516,8 +1517,10 @@ NOTE! The definition appears here only for other modules of this ...@@ -1516,8 +1517,10 @@ NOTE! The definition appears here only for other modules of this
directory (buf) to see it. Do not use from outside! */ directory (buf) to see it. Do not use from outside! */
typedef struct { typedef struct {
bool reserved; /*!< true if this slot is reserved private:
int32 reserved; /*!< true if this slot is reserved
*/ */
public:
byte* crypt_buf; /*!< for encryption the data needs to be byte* crypt_buf; /*!< for encryption the data needs to be
copied to a separate buffer before it's copied to a separate buffer before it's
encrypted&written. this as a page can be encrypted&written. this as a page can be
...@@ -1528,6 +1531,21 @@ typedef struct { ...@@ -1528,6 +1531,21 @@ typedef struct {
byte* out_buf; /*!< resulting buffer after byte* out_buf; /*!< resulting buffer after
encryption/compression. This is a encryption/compression. This is a
pointer and not allocated. */ pointer and not allocated. */
/** Release the slot */
void release()
{
my_atomic_store32_explicit(&reserved, false,
MY_MEMORY_ORDER_RELAXED);
}
/** Acquire the slot
@return whether the slot was acquired */
bool acquire()
{
return !my_atomic_fas32_explicit(&reserved, true,
MY_MEMORY_ORDER_RELAXED);
}
} buf_tmp_buffer_t; } buf_tmp_buffer_t;
/** The common buffer control block structure /** The common buffer control block structure
......
...@@ -175,9 +175,6 @@ struct fil_space_t { ...@@ -175,9 +175,6 @@ struct fil_space_t {
/** MariaDB encryption data */ /** MariaDB encryption data */
fil_space_crypt_t* crypt_data; fil_space_crypt_t* crypt_data;
/** True if we have already printed compression failure */
bool printed_compression_failure;
/** True if the device this filespace is on supports atomic writes */ /** True if the device this filespace is on supports atomic writes */
bool atomic_write_supported; bool atomic_write_supported;
......
/***************************************************************************** /*****************************************************************************
Copyright (C) 2013, 2017 MariaDB Corporation. All Rights Reserved. Copyright (C) 2013, 2018 MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -30,40 +30,24 @@ atomic writes information to table space. ...@@ -30,40 +30,24 @@ atomic writes information to table space.
Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com
***********************************************************************/ ***********************************************************************/
/****************************************************************//** /** Compress a page_compressed page before writing to a data file.
For page compressed pages compress the page before actual write @param[in] buf page to be compressed
operation. @param[out] out_buf compressed page
@return compressed page to be written*/ @param[in] level compression level
UNIV_INTERN @param[in] block_size file system block size
byte* @param[in] encrypted whether the page will be subsequently encrypted
fil_compress_page( @return actual length of compressed page
/*==============*/ @retval 0 if the page was not compressed */
fil_space_t* space, /*!< in,out: tablespace (NULL during IMPORT) */ ulint fil_page_compress(const byte* buf, byte* out_buf, ulint level,
byte* buf, /*!< in: buffer from which to write; in aio ulint block_size, bool encrypted)
this must be appropriately aligned */ MY_ATTRIBUTE((nonnull, warn_unused_result));
byte* out_buf, /*!< out: compressed buffer */
ulint len, /*!< in: length of input buffer.*/ /** Decompress a page that may be subject to page_compressed compression.
ulint level, /* in: compression level */ @param[in,out] tmp_buf temporary buffer (of innodb_page_size)
ulint block_size, /*!< in: block size */ @param[in,out] buf compressed page buffer
bool encrypted, /*!< in: is page also encrypted */ @return size of the compressed data
ulint* out_len); /*!< out: actual length of compressed @retval 0 if decompression failed
page */ @retval srv_page_size if the page was not compressed */
ulint fil_page_decompress(byte* tmp_buf, byte* buf)
/****************************************************************//** MY_ATTRIBUTE((nonnull, warn_unused_result));
For page compressed pages decompress the page after actual read
operation. */
UNIV_INTERN
void
fil_decompress_page(
/*================*/
byte* page_buf, /*!< in: preallocated buffer or NULL */
byte* buf, /*!< out: buffer from which to read; in aio
this must be appropriately aligned */
ulong len, /*!< in: length of output buffer.*/
ulint* write_size, /*!< in/out: Actual payload size of
the compressed data. */
bool return_error=false);
/*!< in: true if only an error should
be produced when decompression fails.
By default this parameter is false. */
#endif #endif
/***************************************************************************** /*****************************************************************************
Copyright (C) 2013, 2017, MariaDB Corporation. All Rights Reserved. Copyright (C) 2013, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -38,18 +38,6 @@ fsp_flags_get_page_compression_level( ...@@ -38,18 +38,6 @@ fsp_flags_get_page_compression_level(
} }
/*******************************************************************//**
Find out wheather the page is index page or not
@return true if page type index page, false if not */
UNIV_INLINE
bool
fil_page_is_index_page(
/*===================*/
byte* buf) /*!< in: page */
{
return(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_INDEX);
}
/*******************************************************************//** /*******************************************************************//**
Find out wheather the page is page compressed Find out wheather the page is page compressed
@return true if page is page compressed, false if not */ @return true if page is page compressed, false if not */
...@@ -73,59 +61,3 @@ fil_page_is_compressed_encrypted( ...@@ -73,59 +61,3 @@ fil_page_is_compressed_encrypted(
{ {
return(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED); return(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
} }
/****************************************************************//**
Get the name of the compression algorithm used for page
compression.
@return compression algorithm name or "UNKNOWN" if not known*/
UNIV_INLINE
const char*
fil_get_compression_alg_name(
/*=========================*/
ib_uint64_t comp_alg) /*!<in: compression algorithm number */
{
switch(comp_alg) {
case PAGE_UNCOMPRESSED:
return ("uncompressed");
break;
case PAGE_ZLIB_ALGORITHM:
return ("ZLIB");
break;
case PAGE_LZ4_ALGORITHM:
return ("LZ4");
break;
case PAGE_LZO_ALGORITHM:
return ("LZO");
break;
case PAGE_LZMA_ALGORITHM:
return ("LZMA");
break;
case PAGE_BZIP2_ALGORITHM:
return ("BZIP2");
break;
case PAGE_SNAPPY_ALGORITHM:
return ("SNAPPY");
break;
/* No default to get compiler warning */
}
return ("NULL");
}
#ifndef UNIV_INNOCHECKSUM
/*******************************************************************//**
Find out wheather the page is page compressed with lzo method
@return true if page is page compressed with lzo method, false if not */
UNIV_INLINE
bool
fil_page_is_lzo_compressed(
/*=======================*/
byte* buf) /*!< in: page */
{
return((mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED &&
mach_read_from_8(buf+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION) == PAGE_LZO_ALGORITHM) ||
(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED &&
mach_read_from_2(buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE) == PAGE_LZO_ALGORITHM));
}
#endif /* UNIV_INNOCHECKSUM */
...@@ -40,6 +40,12 @@ Created 2012-02-08 by Sunny Bains. ...@@ -40,6 +40,12 @@ Created 2012-02-08 by Sunny Bains.
#include "row0quiesce.h" #include "row0quiesce.h"
#include "fil0pagecompress.h" #include "fil0pagecompress.h"
#include "ut0new.h" #include "ut0new.h"
#ifdef HAVE_LZO
#include "lzo/lzo1x.h"
#endif
#ifdef HAVE_SNAPPY
#include "snappy-c.h"
#endif
#include <vector> #include <vector>
...@@ -406,12 +412,9 @@ class AbstractCallback ...@@ -406,12 +412,9 @@ class AbstractCallback
updated then its state must be set to BUF_PAGE_NOT_USED. For updated then its state must be set to BUF_PAGE_NOT_USED. For
compressed tables the page descriptor memory will be at offset: compressed tables the page descriptor memory will be at offset:
block->frame + UNIV_PAGE_SIZE; block->frame + UNIV_PAGE_SIZE;
@param offset - physical offset within the file @param block block read from file, note it is not from the buffer pool
@param block - block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */ @retval DB_SUCCESS or error code. */
virtual dberr_t operator()( virtual dberr_t operator()(buf_block_t* block) UNIV_NOTHROW = 0;
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW = 0;
/** /**
@return the space id of the tablespace */ @return the space id of the tablespace */
...@@ -635,12 +638,9 @@ struct FetchIndexRootPages : public AbstractCallback { ...@@ -635,12 +638,9 @@ struct FetchIndexRootPages : public AbstractCallback {
} }
/** Called for each block as it is read from the file. /** Called for each block as it is read from the file.
@param offset physical offset in the file
@param block block to convert, it is not from the buffer pool. @param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */ @retval DB_SUCCESS or error code. */
virtual dberr_t operator() ( dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW;
/** Update the import configuration that will be used to import /** Update the import configuration that will be used to import
the tablespace. */ the tablespace. */
...@@ -657,13 +657,9 @@ struct FetchIndexRootPages : public AbstractCallback { ...@@ -657,13 +657,9 @@ struct FetchIndexRootPages : public AbstractCallback {
determine the exact row format. We can't get that from the tablespace determine the exact row format. We can't get that from the tablespace
header flags alone. header flags alone.
@param offset physical offset in the file
@param block block to convert, it is not from the buffer pool. @param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */ @retval DB_SUCCESS or error code. */
dberr_t dberr_t FetchIndexRootPages::operator()(buf_block_t* block) UNIV_NOTHROW
FetchIndexRootPages::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
{ {
if (is_interrupted()) return DB_INTERRUPTED; if (is_interrupted()) return DB_INTERRUPTED;
...@@ -671,15 +667,7 @@ FetchIndexRootPages::operator() ( ...@@ -671,15 +667,7 @@ FetchIndexRootPages::operator() (
ulint page_type = fil_page_get_type(page); ulint page_type = fil_page_get_type(page);
if (block->page.id.page_no() * m_page_size.physical() != offset) { if (page_type == FIL_PAGE_TYPE_XDES) {
ib::error() << "Page offset doesn't match file offset:"
" page offset: " << block->page.id.page_no()
<< ", file offset: "
<< (offset / m_page_size.physical());
return DB_CORRUPTION;
} else if (page_type == FIL_PAGE_TYPE_XDES) {
return set_current_xdes(block->page.id.page_no(), page); return set_current_xdes(block->page.id.page_no(), page);
} else if (fil_page_index_page_check(page) } else if (fil_page_index_page_check(page)
&& !is_free(block->page.id.page_no()) && !is_free(block->page.id.page_no())
...@@ -825,12 +813,9 @@ class PageConverter : public AbstractCallback { ...@@ -825,12 +813,9 @@ class PageConverter : public AbstractCallback {
} }
/** Called for each block as it is read from the file. /** Called for each block as it is read from the file.
@param offset physical offset in the file
@param block block to convert, it is not from the buffer pool. @param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */ @retval DB_SUCCESS or error code. */
virtual dberr_t operator() ( dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW;
private: private:
/** Update the page, set the space id, max trx id and index id. /** Update the page, set the space id, max trx id and index id.
@param block block read from file @param block block read from file
...@@ -1954,8 +1939,7 @@ PageConverter::update_page( ...@@ -1954,8 +1939,7 @@ PageConverter::update_page(
updated then its state must be set to BUF_PAGE_NOT_USED. updated then its state must be set to BUF_PAGE_NOT_USED.
@param block block read from file, note it is not from the buffer pool @param block block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */ @retval DB_SUCCESS or error code. */
dberr_t dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
PageConverter::operator() (os_offset_t, buf_block_t* block) UNIV_NOTHROW
{ {
/* If we already had an old page with matching number /* If we already had an old page with matching number
in the buffer pool, evict it now, because in the buffer pool, evict it now, because
...@@ -3299,15 +3283,29 @@ fil_iterate( ...@@ -3299,15 +3283,29 @@ fil_iterate(
const ulint size = callback.get_page_size().physical(); const ulint size = callback.get_page_size().physical();
ulint n_bytes = iter.n_io_buffers * size; ulint n_bytes = iter.n_io_buffers * size;
const ulint buf_size = srv_page_size
#ifdef HAVE_LZO
+ LZO1X_1_15_MEM_COMPRESS
#elif defined HAVE_SNAPPY
+ snappy_max_compressed_length(srv_page_size)
#endif
;
byte* page_compress_buf = static_cast<byte*>(malloc(buf_size));
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
if (!page_compress_buf) {
return DB_OUT_OF_MEMORY;
}
/* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless /* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
copying for non-index pages. Unfortunately, it is copying for non-index pages. Unfortunately, it is
required by buf_zip_decompress() */ required by buf_zip_decompress() */
dberr_t err = DB_SUCCESS;
for (offset = iter.start; offset < iter.end; offset += n_bytes) { for (offset = iter.start; offset < iter.end; offset += n_bytes) {
if (callback.is_interrupted()) { if (callback.is_interrupted()) {
return DB_INTERRUPTED; err = DB_INTERRUPTED;
goto func_exit;
} }
byte* io_buffer = iter.io_buffer; byte* io_buffer = iter.io_buffer;
...@@ -3337,11 +3335,12 @@ fil_iterate( ...@@ -3337,11 +3335,12 @@ fil_iterate(
IORequest read_request(IORequest::READ); IORequest read_request(IORequest::READ);
read_request.disable_partial_io_warnings(); read_request.disable_partial_io_warnings();
dberr_t err = os_file_read_no_error_handling( err = os_file_read_no_error_handling(
read_request, iter.file, readptr, offset, n_bytes, 0); read_request, iter.file, readptr, offset, n_bytes, 0);
if (err != DB_SUCCESS) { if (err != DB_SUCCESS) {
ib::error() << iter.filepath ib::error() << iter.filepath
<< ": os_file_read() failed"; << ": os_file_read() failed";
goto func_exit;
} }
bool updated = false; bool updated = false;
...@@ -3352,18 +3351,9 @@ fil_iterate( ...@@ -3352,18 +3351,9 @@ fil_iterate(
for (ulint i = 0; i < n_pages_read; for (ulint i = 0; i < n_pages_read;
block->page.id.set_page_no(block->page.id.page_no() + 1), block->page.id.set_page_no(block->page.id.page_no() + 1),
++i, page_off += size, block->frame += size) { ++i, page_off += size, block->frame += size) {
bool decrypted = false;
err = DB_SUCCESS;
byte* src = readptr + i * size; byte* src = readptr + i * size;
byte* dst = io_buffer + i * size;
bool frame_changed = false;
ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);
const bool page_compressed
= page_type
== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
|| page_type == FIL_PAGE_PAGE_COMPRESSED;
const ulint page_no = page_get_page_no(src); const ulint page_no = page_get_page_no(src);
if (!page_no && page_off) { if (!page_no && block->page.id.page_no()) {
const ulint* b = reinterpret_cast<const ulint*> const ulint* b = reinterpret_cast<const ulint*>
(src); (src);
const ulint* const e = b + size / sizeof *b; const ulint* const e = b + size / sizeof *b;
...@@ -3378,54 +3368,84 @@ fil_iterate( ...@@ -3378,54 +3368,84 @@ fil_iterate(
continue; continue;
} }
if (page_no != page_off / size) { if (page_no != block->page.id.page_no()) {
page_corrupted:
ib::warn() << callback.filename()
<< ": Page " << (offset / size)
<< " at offset " << offset
<< " looks corrupted.";
err = DB_CORRUPTION;
goto func_exit;
}
const bool page_compressed
= fil_page_is_compressed_encrypted(src)
|| fil_page_is_compressed(src);
if (page_compressed && block->page.zip.data) {
goto page_corrupted; goto page_corrupted;
} }
if (encrypted) { bool decrypted = false;
byte* dst = io_buffer + i * size;
bool frame_changed = false;
if (!encrypted) {
} else if (!mach_read_from_4(
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ src)) {
not_encrypted:
if (!page_compressed
&& !block->page.zip.data) {
block->frame = src;
frame_changed = true;
} else {
ut_ad(dst != src);
memcpy(dst, src, size);
}
} else {
if (!fil_space_verify_crypt_checksum(
src, callback.get_page_size(),
block->page.id.space(),
block->page.id.page_no())) {
goto page_corrupted;
}
decrypted = fil_space_decrypt( decrypted = fil_space_decrypt(
iter.crypt_data, dst, iter.crypt_data, dst,
callback.get_page_size(), src, &err); callback.get_page_size(), src, &err);
if (err != DB_SUCCESS) { if (err != DB_SUCCESS) {
return err; goto func_exit;
} }
if (decrypted) { if (!decrypted) {
updated = true; goto not_encrypted;
} else {
if (!page_compressed
&& !block->page.zip.data) {
block->frame = src;
frame_changed = true;
} else {
ut_ad(dst != src);
memcpy(dst, src, size);
}
} }
updated = true;
} }
/* If the original page is page_compressed, we need /* If the original page is page_compressed, we need
to decompress it before adjusting further. */ to decompress it before adjusting further. */
if (page_compressed) { if (page_compressed) {
fil_decompress_page(NULL, dst, ulong(size), ulint compress_length = fil_page_decompress(
NULL); page_compress_buf, dst);
ut_ad(compress_length != srv_page_size);
if (compress_length == 0) {
goto page_corrupted;
}
updated = true; updated = true;
} else if (buf_page_is_corrupted( } else if (buf_page_is_corrupted(
false, false,
encrypted && !frame_changed encrypted && !frame_changed
? dst : src, ? dst : src,
callback.get_page_size(), NULL)) { callback.get_page_size(), NULL)) {
page_corrupted: goto page_corrupted;
ib::warn() << callback.filename()
<< ": Page " << (offset / size)
<< " at offset " << offset
<< " looks corrupted.";
return DB_CORRUPTION;
} }
if ((err = callback(page_off, block)) != DB_SUCCESS) { if ((err = callback(block)) != DB_SUCCESS) {
return err; goto func_exit;
} else if (!updated) { } else if (!updated) {
updated = buf_block_get_state(block) updated = buf_block_get_state(block)
== BUF_BLOCK_FILE_PAGE; == BUF_BLOCK_FILE_PAGE;
...@@ -3475,20 +3495,18 @@ fil_iterate( ...@@ -3475,20 +3495,18 @@ fil_iterate(
src = io_buffer + (i * size); src = io_buffer + (i * size);
if (page_compressed) { if (page_compressed) {
ulint len = 0;
fil_compress_page(
NULL,
src,
NULL,
size,
0,/* FIXME: compression level */
512,/* FIXME: use proper block size */
encrypted,
&len);
ut_ad(len <= size);
memset(src + len, 0, size - len);
updated = true; updated = true;
if (ulint len = fil_page_compress(
src,
page_compress_buf,
0,/* FIXME: compression level */
512,/* FIXME: proper block size */
encrypted)) {
/* FIXME: remove memcpy() */
memcpy(src, page_compress_buf, len);
memset(src + len, 0,
srv_page_size - len);
}
} }
/* Encrypt the page if encryption was used. */ /* Encrypt the page if encryption was used. */
...@@ -3520,12 +3538,14 @@ fil_iterate( ...@@ -3520,12 +3538,14 @@ fil_iterate(
writeptr, offset, n_bytes); writeptr, offset, n_bytes);
if (err != DB_SUCCESS) { if (err != DB_SUCCESS) {
return err; goto func_exit;
} }
} }
} }
return DB_SUCCESS; func_exit:
free(page_compress_buf);
return err;
} }
/********************************************************************//** /********************************************************************//**
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment