Commit eaa7bfb5 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-12396 IMPORT TABLESPACE: Simplify validation

fil_iterate(): Validate the pages directly.

import_page_status_t, PageConverter::validate(): Remove.

AbstractCallback::filename(): New accessor.

AbstractCallback::is_interrupted(): Replaces periodic_check().

PageConverter::trigger_corruption(): Remove.
parent 6247c64c
......@@ -421,6 +421,8 @@ class AbstractCallback
return(m_page_size);
}
const char* filename() const { return m_filepath; }
/**
Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED. For
......@@ -437,6 +439,8 @@ class AbstractCallback
@return the space id of the tablespace */
virtual ulint get_space_id() const UNIV_NOTHROW = 0;
bool is_interrupted() const { return trx_is_interrupted(m_trx); }
protected:
/**
Get the data page depending on the table type, compressed or not.
......@@ -451,18 +455,6 @@ class AbstractCallback
return(buf_block_get_frame(block));
}
/** Check for session interrupt. If required we could
even flush to disk here every N pages.
@retval DB_SUCCESS or error code */
dberr_t periodic_check() UNIV_NOTHROW
{
if (trx_is_interrupted(m_trx)) {
return(DB_INTERRUPTED);
}
return(DB_SUCCESS);
}
/**
Get the physical offset of the extent descriptor within the page.
@param page_no - page number of the extent descriptor
......@@ -732,11 +724,7 @@ FetchIndexRootPages::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
{
dberr_t err;
if ((err = periodic_check()) != DB_SUCCESS) {
return(err);
}
if (is_interrupted()) return DB_INTERRUPTED;
const page_t* page = get_frame(block);
......@@ -749,9 +737,9 @@ FetchIndexRootPages::operator() (
block->page.offset,
(ulint) (offset / m_page_size));
err = DB_CORRUPTION;
return DB_CORRUPTION;
} else if (page_type == FIL_PAGE_TYPE_XDES) {
err = set_current_xdes(block->page.offset, page);
return set_current_xdes(block->page.offset, page);
} else if (page_type == FIL_PAGE_INDEX
&& !is_free(block->page.offset)
&& is_root_page(page)) {
......@@ -776,7 +764,7 @@ FetchIndexRootPages::operator() (
}
}
return(err);
return DB_SUCCESS;
}
/**
......@@ -900,14 +888,6 @@ class PageConverter : public AbstractCallback {
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW;
private:
/** Status returned by PageConverter::validate() */
enum import_page_status_t {
IMPORT_PAGE_STATUS_OK, /*!< Page is OK */
IMPORT_PAGE_STATUS_ALL_ZERO, /*!< Page is all zeros */
IMPORT_PAGE_STATUS_CORRUPTED /*!< Page is corrupted */
};
/**
Update the page, set the space id, max trx id and index id.
@param block - block read from file
......@@ -917,17 +897,6 @@ class PageConverter : public AbstractCallback {
buf_block_t* block,
ulint& page_type) UNIV_NOTHROW;
#if defined UNIV_DEBUG
/**
@return true error condition is enabled. */
bool trigger_corruption() UNIV_NOTHROW
{
return(false);
}
#else
#define trigger_corruption() (false)
#endif /* UNIV_DEBUG */
/**
Update the space, index id, trx id.
@param block - block to convert
......@@ -940,15 +909,6 @@ class PageConverter : public AbstractCallback {
@retval DB_SUCCESS or error code */
dberr_t update_records(buf_block_t* block) UNIV_NOTHROW;
/**
Validate the page, check for corruption.
@param offset - physical offset within file.
@param page - page read from file.
@return 0 on success, 1 if all zero, 2 if corrupted */
import_page_status_t validate(
os_offset_t offset,
buf_block_t* page) UNIV_NOTHROW;
/**
Validate the space flags and update tablespace header page.
@param block - block read from file, not from the buffer pool.
......@@ -2075,80 +2035,15 @@ PageConverter::update_page(
return(DB_CORRUPTION);
}
/**
Validate the page
@param offset - physical offset within file.
@param page - page read from file.
@return status */
PageConverter::import_page_status_t
PageConverter::validate(
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
{
buf_frame_t* page = get_frame(block);
/* Check that the page number corresponds to the offset in
the file. Flag as corrupt if it doesn't. Disable the check
for LSN in buf_page_is_corrupted() */
if (buf_page_is_corrupted(false, page, get_zip_size(), NULL)
|| (page_get_page_no(page) != offset / m_page_size
&& page_get_page_no(page) != 0)) {
return(IMPORT_PAGE_STATUS_CORRUPTED);
} else if (offset > 0 && page_get_page_no(page) == 0) {
ulint checksum;
checksum = mach_read_from_4(page + FIL_PAGE_SPACE_OR_CHKSUM);
if (checksum != 0) {
/* Checksum check passed in buf_page_is_corrupted(). */
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu checksum " ULINTPF
" should be zero.",
m_filepath, (ulong) (offset / m_page_size),
checksum);
}
const byte* b = page + FIL_PAGE_OFFSET;
const byte* e = page + m_page_size
- FIL_PAGE_END_LSN_OLD_CHKSUM;
/* If the page number is zero and offset > 0 then
the entire page MUST consist of zeroes. If not then
we flag it as corrupt. */
while (b != e) {
if (*b++ && !trigger_corruption()) {
return(IMPORT_PAGE_STATUS_CORRUPTED);
}
}
/* The page is all zero: do nothing. */
return(IMPORT_PAGE_STATUS_ALL_ZERO);
}
return(IMPORT_PAGE_STATUS_OK);
}
/**
Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED.
@param offset - physical offset within the file
@param block - block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */
dberr_t
PageConverter::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
PageConverter::operator() (os_offset_t, buf_block_t* block) UNIV_NOTHROW
{
ulint page_type;
dberr_t err = DB_SUCCESS;
if ((err = periodic_check()) != DB_SUCCESS) {
return(err);
}
if (is_compressed_table()) {
m_page_zip_ptr = &block->page.zip;
......@@ -2156,16 +2051,8 @@ PageConverter::operator() (
ut_ad(m_page_zip_ptr == 0);
}
switch(validate(offset, block)) {
case IMPORT_PAGE_STATUS_OK:
/* We have to decompress the compressed pages before
we can work on them */
if ((err = update_page(block, page_type)) != DB_SUCCESS) {
break;
}
dberr_t err = update_page(block, page_type);
if (err == DB_SUCCESS) {
/* Note: For compressed pages this function will write to the
zip descriptor and for uncompressed pages it will write to
page (ie. the block->frame). Therefore the caller should write
......@@ -2187,23 +2074,9 @@ PageConverter::operator() (
get_frame(block), get_zip_size(),
m_current_lsn);
}
break;
case IMPORT_PAGE_STATUS_ALL_ZERO:
/* The page is all zero: leave it as is. */
break;
case IMPORT_PAGE_STATUS_CORRUPTED:
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu at offset " UINT64PF " looks corrupted.",
m_filepath, (ulong) (offset / m_page_size), offset);
err = DB_CORRUPTION;
}
/* If we already had and old page with matching number
/* If we already had an old page with matching number
in the buffer pool, evict it now, because
we no longer evict the pages on DISCARD TABLESPACE. */
buf_page_get_gen(get_space_id(), get_zip_size(), block->page.offset,
......@@ -3519,8 +3392,6 @@ fil_iterate(
AbstractCallback& callback)
{
os_offset_t offset;
ulint page_no = 0;
ulint space_id = callback.get_space_id();
ulint n_bytes = iter.n_io_buffers * iter.page_size;
ut_ad(!srv_read_only_mode);
......@@ -3531,6 +3402,9 @@ fil_iterate(
const bool row_compressed = callback.get_zip_size() > 0;
for (offset = iter.start; offset < iter.end; offset += n_bytes) {
if (callback.is_interrupted()) {
return DB_INTERRUPTED;
}
byte* io_buffer = iter.io_buffer;
......@@ -3572,30 +3446,45 @@ fil_iterate(
os_offset_t page_off = offset;
ulint n_pages_read = (ulint) n_bytes / iter.page_size;
bool decrypted = false;
const ulint size = iter.page_size;
block->page.offset = page_off / size;
for (ulint i = 0; i < n_pages_read; ++i) {
ulint size = iter.page_size;
for (ulint i = 0; i < n_pages_read;
++i, page_off += size, block->frame += size,
block->page.offset++) {
dberr_t err = DB_SUCCESS;
byte* src = readptr + (i * size);
byte* dst = io_buffer + (i * size);
bool frame_changed = false;
ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);
const bool page_compressed
= page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
= page_type
== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
|| page_type == FIL_PAGE_PAGE_COMPRESSED;
const ulint page_no = page_get_page_no(src);
if (!page_no && page_off) {
const ulint* b = reinterpret_cast<const ulint*>
(src);
const ulint* const e = b + size / sizeof *b;
do {
if (*b++) {
goto page_corrupted;
}
} while (b != e);
/* Proceed to the next page,
because this one is all zero. */
continue;
}
if (page_no != page_off / size) {
goto page_corrupted;
}
/* If tablespace is encrypted, we need to decrypt
the page. Note that tablespaces are not in
fil_system during import. */
if (encrypted) {
decrypted = fil_space_decrypt(
iter.crypt_data,
dst, //dst
iter.page_size,
src, // src
&err);
iter.crypt_data, dst,
iter.page_size, src, &err);
if (err != DB_SUCCESS) {
return err;
......@@ -3619,10 +3508,20 @@ fil_iterate(
fil_decompress_page(NULL, dst, ulong(size),
NULL);
updated = true;
} else if (buf_page_is_corrupted(
false,
encrypted && !frame_changed
? dst : src,
callback.get_zip_size(), NULL)) {
page_corrupted:
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu at offset "
UINT64PF " looks corrupted.",
callback.filename(),
ulong(offset / size), offset);
return DB_CORRUPTION;
}
buf_block_set_file_page(block, space_id, page_no++);
if ((err = callback(page_off, block)) != DB_SUCCESS) {
return err;
} else if (!updated) {
......@@ -3714,9 +3613,6 @@ fil_iterate(
updated = true;
}
page_off += iter.page_size;
block->frame += iter.page_size;
}
/* A page was updated in the set, write back to disk. */
......@@ -3820,6 +3716,7 @@ fil_tablespace_iterate(
memset(&block, 0, sizeof block);
block.frame = page;
block.page.space = callback.get_space_id();
block.page.io_fix = BUF_IO_NONE;
block.page.buf_fix_count = 1;
block.page.state = BUF_BLOCK_FILE_PAGE;
......
......@@ -421,6 +421,8 @@ class AbstractCallback
return(m_page_size);
}
const char* filename() const { return m_filepath; }
/**
Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED. For
......@@ -437,6 +439,8 @@ class AbstractCallback
@return the space id of the tablespace */
virtual ulint get_space_id() const UNIV_NOTHROW = 0;
bool is_interrupted() const { return trx_is_interrupted(m_trx); }
protected:
/**
Get the data page depending on the table type, compressed or not.
......@@ -451,18 +455,6 @@ class AbstractCallback
return(buf_block_get_frame(block));
}
/** Check for session interrupt. If required we could
even flush to disk here every N pages.
@retval DB_SUCCESS or error code */
dberr_t periodic_check() UNIV_NOTHROW
{
if (trx_is_interrupted(m_trx)) {
return(DB_INTERRUPTED);
}
return(DB_SUCCESS);
}
/**
Get the physical offset of the extent descriptor within the page.
@param page_no - page number of the extent descriptor
......@@ -732,11 +724,7 @@ FetchIndexRootPages::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
{
dberr_t err;
if ((err = periodic_check()) != DB_SUCCESS) {
return(err);
}
if (is_interrupted()) return DB_INTERRUPTED;
const page_t* page = get_frame(block);
......@@ -749,9 +737,9 @@ FetchIndexRootPages::operator() (
block->page.offset,
(ulint) (offset / m_page_size));
err = DB_CORRUPTION;
return DB_CORRUPTION;
} else if (page_type == FIL_PAGE_TYPE_XDES) {
err = set_current_xdes(block->page.offset, page);
return set_current_xdes(block->page.offset, page);
} else if (page_type == FIL_PAGE_INDEX
&& !is_free(block->page.offset)
&& is_root_page(page)) {
......@@ -776,7 +764,7 @@ FetchIndexRootPages::operator() (
}
}
return(err);
return DB_SUCCESS;
}
/**
......@@ -900,14 +888,6 @@ class PageConverter : public AbstractCallback {
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW;
private:
/** Status returned by PageConverter::validate() */
enum import_page_status_t {
IMPORT_PAGE_STATUS_OK, /*!< Page is OK */
IMPORT_PAGE_STATUS_ALL_ZERO, /*!< Page is all zeros */
IMPORT_PAGE_STATUS_CORRUPTED /*!< Page is corrupted */
};
/**
Update the page, set the space id, max trx id and index id.
@param block - block read from file
......@@ -917,17 +897,6 @@ class PageConverter : public AbstractCallback {
buf_block_t* block,
ulint& page_type) UNIV_NOTHROW;
#if defined UNIV_DEBUG
/**
@return true error condition is enabled. */
bool trigger_corruption() UNIV_NOTHROW
{
return(false);
}
#else
#define trigger_corruption() (false)
#endif /* UNIV_DEBUG */
/**
Update the space, index id, trx id.
@param block - block to convert
......@@ -940,15 +909,6 @@ class PageConverter : public AbstractCallback {
@retval DB_SUCCESS or error code */
dberr_t update_records(buf_block_t* block) UNIV_NOTHROW;
/**
Validate the page, check for corruption.
@param offset - physical offset within file.
@param page - page read from file.
@return 0 on success, 1 if all zero, 2 if corrupted */
import_page_status_t validate(
os_offset_t offset,
buf_block_t* page) UNIV_NOTHROW;
/**
Validate the space flags and update tablespace header page.
@param block - block read from file, not from the buffer pool.
......@@ -2075,80 +2035,15 @@ PageConverter::update_page(
return(DB_CORRUPTION);
}
/**
Validate the page
@param offset - physical offset within file.
@param page - page read from file.
@return status */
PageConverter::import_page_status_t
PageConverter::validate(
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
{
buf_frame_t* page = get_frame(block);
/* Check that the page number corresponds to the offset in
the file. Flag as corrupt if it doesn't. Disable the check
for LSN in buf_page_is_corrupted() */
if (buf_page_is_corrupted(false, page, get_zip_size(), NULL)
|| (page_get_page_no(page) != offset / m_page_size
&& page_get_page_no(page) != 0)) {
return(IMPORT_PAGE_STATUS_CORRUPTED);
} else if (offset > 0 && page_get_page_no(page) == 0) {
ulint checksum;
checksum = mach_read_from_4(page + FIL_PAGE_SPACE_OR_CHKSUM);
if (checksum != 0) {
/* Checksum check passed in buf_page_is_corrupted(). */
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu checksum " ULINTPF
" should be zero.",
m_filepath, (ulong) (offset / m_page_size),
checksum);
}
const byte* b = page + FIL_PAGE_OFFSET;
const byte* e = page + m_page_size
- FIL_PAGE_END_LSN_OLD_CHKSUM;
/* If the page number is zero and offset > 0 then
the entire page MUST consist of zeroes. If not then
we flag it as corrupt. */
while (b != e) {
if (*b++ && !trigger_corruption()) {
return(IMPORT_PAGE_STATUS_CORRUPTED);
}
}
/* The page is all zero: do nothing. */
return(IMPORT_PAGE_STATUS_ALL_ZERO);
}
return(IMPORT_PAGE_STATUS_OK);
}
/**
Called for every page in the tablespace. If the page was not
updated then its state must be set to BUF_PAGE_NOT_USED.
@param offset - physical offset within the file
@param block - block read from file, note it is not from the buffer pool
@retval DB_SUCCESS or error code. */
dberr_t
PageConverter::operator() (
os_offset_t offset,
buf_block_t* block) UNIV_NOTHROW
PageConverter::operator() (os_offset_t, buf_block_t* block) UNIV_NOTHROW
{
ulint page_type;
dberr_t err = DB_SUCCESS;
if ((err = periodic_check()) != DB_SUCCESS) {
return(err);
}
if (is_compressed_table()) {
m_page_zip_ptr = &block->page.zip;
......@@ -2156,16 +2051,8 @@ PageConverter::operator() (
ut_ad(m_page_zip_ptr == 0);
}
switch(validate(offset, block)) {
case IMPORT_PAGE_STATUS_OK:
/* We have to decompress the compressed pages before
we can work on them */
if ((err = update_page(block, page_type)) != DB_SUCCESS) {
break;
}
dberr_t err = update_page(block, page_type);
if (err == DB_SUCCESS) {
/* Note: For compressed pages this function will write to the
zip descriptor and for uncompressed pages it will write to
page (ie. the block->frame). Therefore the caller should write
......@@ -2187,23 +2074,9 @@ PageConverter::operator() (
get_frame(block), get_zip_size(),
m_current_lsn);
}
break;
case IMPORT_PAGE_STATUS_ALL_ZERO:
/* The page is all zero: leave it as is. */
break;
case IMPORT_PAGE_STATUS_CORRUPTED:
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu at offset " UINT64PF " looks corrupted.",
m_filepath, (ulong) (offset / m_page_size), offset);
err = DB_CORRUPTION;
}
/* If we already had and old page with matching number
/* If we already had an old page with matching number
in the buffer pool, evict it now, because
we no longer evict the pages on DISCARD TABLESPACE. */
buf_page_get_gen(get_space_id(), get_zip_size(), block->page.offset,
......@@ -3519,8 +3392,6 @@ fil_iterate(
AbstractCallback& callback)
{
os_offset_t offset;
ulint page_no = 0;
ulint space_id = callback.get_space_id();
ulint n_bytes = iter.n_io_buffers * iter.page_size;
ut_ad(!srv_read_only_mode);
......@@ -3531,6 +3402,9 @@ fil_iterate(
const bool row_compressed = callback.get_zip_size() > 0;
for (offset = iter.start; offset < iter.end; offset += n_bytes) {
if (callback.is_interrupted()) {
return DB_INTERRUPTED;
}
byte* io_buffer = iter.io_buffer;
......@@ -3572,30 +3446,45 @@ fil_iterate(
os_offset_t page_off = offset;
ulint n_pages_read = (ulint) n_bytes / iter.page_size;
bool decrypted = false;
const ulint size = iter.page_size;
block->page.offset = page_off / size;
for (ulint i = 0; i < n_pages_read; ++i) {
ulint size = iter.page_size;
for (ulint i = 0; i < n_pages_read;
++i, page_off += size, block->frame += size,
block->page.offset++) {
dberr_t err = DB_SUCCESS;
byte* src = readptr + (i * size);
byte* dst = io_buffer + (i * size);
bool frame_changed = false;
ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);
const bool page_compressed
= page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
= page_type
== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
|| page_type == FIL_PAGE_PAGE_COMPRESSED;
const ulint page_no = page_get_page_no(src);
if (!page_no && page_off) {
const ulint* b = reinterpret_cast<const ulint*>
(src);
const ulint* const e = b + size / sizeof *b;
do {
if (*b++) {
goto page_corrupted;
}
} while (b != e);
/* Proceed to the next page,
because this one is all zero. */
continue;
}
if (page_no != page_off / size) {
goto page_corrupted;
}
/* If tablespace is encrypted, we need to decrypt
the page. Note that tablespaces are not in
fil_system during import. */
if (encrypted) {
decrypted = fil_space_decrypt(
iter.crypt_data,
dst, //dst
iter.page_size,
src, // src
&err);
iter.crypt_data, dst,
iter.page_size, src, &err);
if (err != DB_SUCCESS) {
return err;
......@@ -3619,10 +3508,20 @@ fil_iterate(
fil_decompress_page(NULL, dst, ulong(size),
NULL);
updated = true;
} else if (buf_page_is_corrupted(
false,
encrypted && !frame_changed
? dst : src,
callback.get_zip_size(), NULL)) {
page_corrupted:
ib_logf(IB_LOG_LEVEL_WARN,
"%s: Page %lu at offset "
UINT64PF " looks corrupted.",
callback.filename(),
ulong(offset / size), offset);
return DB_CORRUPTION;
}
buf_block_set_file_page(block, space_id, page_no++);
if ((err = callback(page_off, block)) != DB_SUCCESS) {
return err;
} else if (!updated) {
......@@ -3714,9 +3613,6 @@ fil_iterate(
updated = true;
}
page_off += iter.page_size;
block->frame += iter.page_size;
}
/* A page was updated in the set, write back to disk. */
......@@ -3820,6 +3716,7 @@ fil_tablespace_iterate(
memset(&block, 0, sizeof block);
block.frame = page;
block.page.space = callback.get_space_id();
block.page.io_fix = BUF_IO_NONE;
block.page.buf_fix_count = 1;
block.page.state = BUF_BLOCK_FILE_PAGE;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment