Commit 169c0099 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-12699 Improve crash recovery of corrupted data pages

InnoDB crash recovery used to read every data page for which
redo log exists. This is unnecessary for those pages that are
initialized by the redo log. If a newly created page is corrupted,
recovery could unnecessarily fail. It would suffice to reinitialize
the page based on the redo log records.

To add insult to injury, InnoDB crash recovery could hang if it
encountered a corrupted page. We will fix also that problem.
InnoDB would normally refuse to start up if it encounters a
corrupted page on recovery, but that can be overridden by
setting innodb_force_recovery=1.

Data pages are completely initialized by the records
MLOG_INIT_FILE_PAGE2 and MLOG_ZIP_PAGE_COMPRESS.
MariaDB 10.4 additionally recognizes MLOG_INIT_FREE_PAGE,
which notifies that a page has been freed and its contents
can be discarded (filled with zeroes).

The record MLOG_INDEX_LOAD notifies that redo logging has
been re-enabled after being disabled. We can avoid loading
the page if all buffered redo log records predate the
MLOG_INDEX_LOAD record.

For the internal tables of FULLTEXT INDEX, no MLOG_INDEX_LOAD
records were written before commit aa3f7a10.
Hence, we will skip these optimizations for tables whose
name starts with FTS_.

This is joint work with Thirunarayanan Balathandayuthapani.

fil_space_t::enable_lsn, file_name_t::enable_lsn: The LSN of the
latest recovered MLOG_INDEX_LOAD record for a tablespace.

mlog_init: Page initialization operations discovered during
redo log scanning. FIXME: This really belongs in recv_sys->addr_hash,
and should be removed in MDEV-19176.

recv_addr_state: Add the new state RECV_WILL_NOT_READ to
indicate that according to mlog_init, the page will be
initialized based on redo log record contents.

recv_add_to_hash_table(): Set the RECV_WILL_NOT_READ state
if appropriate. For now, we do not treat MLOG_ZIP_PAGE_COMPRESS
as page initialization. This works around bugs in the crash
recovery of ROW_FORMAT=COMPRESSED tables.

recv_mark_log_index_load(): Process a MLOG_INDEX_LOAD record
by resetting the state to RECV_NOT_PROCESSED and by updating
the fil_name_t::enable_lsn.

recv_init_crash_recovery_spaces(): Copy fil_name_t::enable_lsn
to fil_space_t::enable_lsn.

recv_recover_page(): Add the parameter init_lsn, to ignore
any log records that precede the page initialization.
Add DBUG output about skipped operations.

buf_page_create(): Initialize FIL_PAGE_LSN, so that
recv_recover_page() will not wrongly skip applying
the page-initialization record due to the field containing
some newer LSN as a leftover from a different page.
Do not invoke ibuf_merge_or_delete_for_page() during
crash recovery.

recv_apply_hashed_log_recs(): Remove some unnecessary lookups.
Note if a corrupted page was found during recovery.
After invoking buf_page_create(), do invoke
ibuf_merge_or_delete_for_page() via mlog_init.ibuf_merge()
in the last recovery batch.

ibuf_merge_or_delete_for_page(): Relax a debug assertion.

innobase_start_or_create_for_mysql(): Abort startup if
a corrupted page was found during recovery. Corrupted pages
will not be flagged if innodb_force_recovery is set.
However, the recv_sys->found_corrupt_fs flag can be set
regardless of innodb_force_recovery if file names are found
to be incorrect (for example, multiple files with the same
tablespace ID).
parent 376bf4ed
CREATE TABLE t1(a BIGINT PRIMARY KEY) ENGINE=InnoDB, ENCRYPTED=YES;
INSERT INTO t1 VALUES(1);
CREATE TABLE t2(a BIGINT PRIMARY KEY) ENGINE=InnoDB, ENCRYPTED=YES;
INSERT INTO t1 VALUES(2);
SET GLOBAL innodb_flush_log_at_trx_commit=1;
INSERT INTO t2 VALUES(2);
# Kill the server
# Corrupt the pages
SELECT * FROM t1;
ERROR 42000: Unknown storage engine 'InnoDB'
SELECT * FROM t1;
a
1
2
SELECT * FROM t2;
a
2
CHECK TABLE t1,t2;
Table Op Msg_type Msg_text
test.t1 check status OK
test.t2 check status OK
DROP TABLE t1, t2;
--source include/have_innodb.inc
--source include/have_file_key_management_plugin.inc
--disable_query_log
call mtr.add_suppression("InnoDB: Plugin initialization aborted");
call mtr.add_suppression("Plugin 'InnoDB' init function returned error");
call mtr.add_suppression("Plugin 'InnoDB' registration as a STORAGE ENGINE failed");
call mtr.add_suppression("InnoDB: Database page corruption on disk or a failed file read of tablespace test/t1 page");
call mtr.add_suppression("InnoDB: Failed to read file '.*test.t1\\.ibd' at offset 3: Table is encrypted but decrypt failed");
call mtr.add_suppression("InnoDB: The page \\[page id: space=\\d+, page number=3\\] in file '.*test.t1\\.ibd' cannot be decrypted");
call mtr.add_suppression("InnoDB: Table in tablespace \\d+ encrypted. However key management plugin or used key_version \\d+ is not found or used encryption algorithm or method does not match. Can't continue opening the table.");
--enable_query_log
let INNODB_PAGE_SIZE=`select @@innodb_page_size`;
CREATE TABLE t1(a BIGINT PRIMARY KEY) ENGINE=InnoDB, ENCRYPTED=YES;
INSERT INTO t1 VALUES(1);
# Force a redo log checkpoint.
--source include/restart_mysqld.inc
--source ../../suite/innodb/include/no_checkpoint_start.inc
CREATE TABLE t2(a BIGINT PRIMARY KEY) ENGINE=InnoDB, ENCRYPTED=YES;
INSERT INTO t1 VALUES(2);
SET GLOBAL innodb_flush_log_at_trx_commit=1;
INSERT INTO t2 VALUES(2);
--let CLEANUP_IF_CHECKPOINT=DROP TABLE t1,t2;
--source ../../suite/innodb/include/no_checkpoint_end.inc
--echo # Corrupt the pages
perl;
my $ps = $ENV{INNODB_PAGE_SIZE};
my $file = "$ENV{MYSQLD_DATADIR}/test/t1.ibd";
open(FILE, "+<$file") || die "Unable to open $file";
binmode FILE;
seek (FILE, $ENV{INNODB_PAGE_SIZE} * 3, SEEK_SET) or die "seek";
print FILE "junk";
close FILE or die "close";
$file = "$ENV{MYSQLD_DATADIR}/test/t2.ibd";
open(FILE, "+<$file") || die "Unable to open $file";
binmode FILE;
# Corrupt pages 1 to 3. MLOG_INIT_FILE_PAGE2 should protect us!
# Unfortunately, we are not immune to page 0 corruption.
seek (FILE, $ps, SEEK_SET) or die "seek";
print FILE chr(0xff) x ($ps * 3);
close FILE or die "close";
EOF
--source include/start_mysqld.inc
--error ER_UNKNOWN_STORAGE_ENGINE
SELECT * FROM t1;
let $restart_parameters=--innodb_force_recovery=1;
--source include/restart_mysqld.inc
SELECT * FROM t1;
SELECT * FROM t2;
CHECK TABLE t1,t2;
DROP TABLE t1, t2;
CREATE TABLE t1(a BIGINT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES(1);
CREATE TABLE t2(a BIGINT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES(2);
SET GLOBAL innodb_flush_log_at_trx_commit=1;
INSERT INTO t2 VALUES(1);
# Kill the server
# Corrupt the pages
SELECT * FROM t1;
ERROR 42000: Unknown storage engine 'InnoDB'
SELECT * FROM t1;
a
0
2
SELECT * FROM t2;
a
1
CHECK TABLE t1,t2;
Table Op Msg_type Msg_text
test.t1 check status OK
test.t2 check status OK
DROP TABLE t1, t2;
--source include/have_innodb.inc
--disable_query_log
call mtr.add_suppression("InnoDB: Plugin initialization aborted");
call mtr.add_suppression("Plugin 'InnoDB' init function returned error");
call mtr.add_suppression("Plugin 'InnoDB' registration as a STORAGE ENGINE failed");
call mtr.add_suppression("InnoDB: Database page corruption on disk or a failed file read of tablespace test/t1 page");
call mtr.add_suppression("InnoDB: Failed to read file '.*test.t1\\.ibd' at offset 3: Page read from tablespace is corrupted.");
--enable_query_log
let INNODB_PAGE_SIZE=`select @@innodb_page_size`;
CREATE TABLE t1(a BIGINT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES(1);
# Force a redo log checkpoint.
--source include/restart_mysqld.inc
--source ../include/no_checkpoint_start.inc
CREATE TABLE t2(a BIGINT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES(2);
SET GLOBAL innodb_flush_log_at_trx_commit=1;
INSERT INTO t2 VALUES(1);
--let CLEANUP_IF_CHECKPOINT=DROP TABLE t1,t2;
--source ../include/no_checkpoint_end.inc
--echo # Corrupt the pages
perl;
my $ps = $ENV{INNODB_PAGE_SIZE};
my $file = "$ENV{MYSQLD_DATADIR}/test/t1.ibd";
open(FILE, "+<$file") || die "Unable to open $file";
binmode FILE;
sysseek(FILE, 3*$ps, 0) || die "Unable to seek $file\n";
die "Unable to read $file" unless sysread(FILE, $page, $ps) == $ps;
# Replace the a=1 with a=0.
$page =~ s/\x80\x0\x0\x0\x0\x0\x0\x1/\x80\x0\x0\x0\x0\x0\x0\x0/;
sysseek(FILE, 3*$ps, 0) || die "Unable to seek $file\n";
syswrite(FILE, $page, $ps)==$ps || die "Unable to write $file\n";
close FILE or die "close";
$file = "$ENV{MYSQLD_DATADIR}/test/t2.ibd";
open(FILE, "+<$file") || die "Unable to open $file";
binmode FILE;
# Corrupt pages 1 to 3. MLOG_INIT_FILE_PAGE2 should protect us!
# Unfortunately, we are not immune to page 0 corruption.
seek (FILE, $ps, SEEK_SET) or die "seek";
print FILE chr(0xff) x ($ps * 3);
close FILE or die "close";
EOF
--source include/start_mysqld.inc
--error ER_UNKNOWN_STORAGE_ENGINE
SELECT * FROM t1;
let $restart_parameters=--innodb_force_recovery=1;
--source include/restart_mysqld.inc
SELECT * FROM t1;
SELECT * FROM t2;
CHECK TABLE t1,t2;
DROP TABLE t1, t2;
...@@ -5567,7 +5567,15 @@ buf_page_create( ...@@ -5567,7 +5567,15 @@ buf_page_create(
buf_block_free(free_block); buf_block_free(free_block);
return(buf_page_get_with_no_latch(page_id, page_size, mtr)); if (!recv_recovery_is_on()) {
return buf_page_get_with_no_latch(page_id, page_size,
mtr);
}
mutex_exit(&recv_sys->mutex);
block = buf_page_get_with_no_latch(page_id, page_size, mtr);
mutex_enter(&recv_sys->mutex);
return block;
} }
/* If we get here, the page was not in buf_pool: init it there */ /* If we get here, the page was not in buf_pool: init it there */
...@@ -5633,7 +5641,9 @@ buf_page_create( ...@@ -5633,7 +5641,9 @@ buf_page_create(
/* Delete possible entries for the page from the insert buffer: /* Delete possible entries for the page from the insert buffer:
such can exist if the page belonged to an index which was dropped */ such can exist if the page belonged to an index which was dropped */
ibuf_merge_or_delete_for_page(NULL, page_id, &page_size, TRUE); if (!recv_recovery_is_on()) {
ibuf_merge_or_delete_for_page(NULL, page_id, &page_size, TRUE);
}
frame = block->frame; frame = block->frame;
...@@ -5648,6 +5658,7 @@ buf_page_create( ...@@ -5648,6 +5658,7 @@ buf_page_create(
(3) key_version on encrypted pages (not page 0:0) */ (3) key_version on encrypted pages (not page 0:0) */
memset(frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 0, 8); memset(frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 0, 8);
memset(frame + FIL_PAGE_LSN, 0, 8);
#if defined UNIV_DEBUG || defined UNIV_BUF_DEBUG #if defined UNIV_DEBUG || defined UNIV_BUF_DEBUG
ut_a(++buf_dbg_counter % 5771 || buf_validate()); ut_a(++buf_dbg_counter % 5771 || buf_validate());
......
/***************************************************************************** /*****************************************************************************
Copyright (c) 1997, 2016, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 1997, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2016, 2018, MariaDB Corporation. Copyright (c) 2016, 2019, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
...@@ -4450,7 +4450,8 @@ ibuf_merge_or_delete_for_page( ...@@ -4450,7 +4450,8 @@ ibuf_merge_or_delete_for_page(
ulint dops[IBUF_OP_COUNT]; ulint dops[IBUF_OP_COUNT];
ut_ad(block == NULL || page_id == block->page.id); ut_ad(block == NULL || page_id == block->page.id);
ut_ad(block == NULL || buf_block_get_io_fix(block) == BUF_IO_READ); ut_ad(block == NULL || buf_block_get_io_fix(block) == BUF_IO_READ
|| recv_recovery_is_on());
if (srv_force_recovery >= SRV_FORCE_NO_IBUF_MERGE if (srv_force_recovery >= SRV_FORCE_NO_IBUF_MERGE
|| trx_sys_hdr_page(page_id) || trx_sys_hdr_page(page_id)
......
...@@ -235,6 +235,15 @@ class page_id_t { ...@@ -235,6 +235,15 @@ class page_id_t {
} }
bool operator!=(const page_id_t& rhs) const { return !(*this == rhs); } bool operator!=(const page_id_t& rhs) const { return !(*this == rhs); }
bool operator<(const page_id_t& rhs) const
{
if (m_space == rhs.m_space) {
return m_page_no < rhs.m_page_no;
}
return m_space < rhs.m_space;
}
/** Retrieve the tablespace id. /** Retrieve the tablespace id.
@return tablespace id */ @return tablespace id */
uint32_t space() const { return m_space; } uint32_t space() const { return m_space; }
......
...@@ -90,6 +90,9 @@ struct fil_space_t { ...@@ -90,6 +90,9 @@ struct fil_space_t {
Protected by log_sys->mutex. Protected by log_sys->mutex.
If and only if this is nonzero, the If and only if this is nonzero, the
tablespace will be in named_spaces. */ tablespace will be in named_spaces. */
/** Log sequence number of the latest MLOG_INDEX_LOAD record
that was found while parsing the redo log */
lsn_t enable_lsn;
bool stop_new_ops; bool stop_new_ops;
/*!< we set this true when we start /*!< we set this true when we start
deleting a single-table tablespace. deleting a single-table tablespace.
......
...@@ -155,10 +155,22 @@ struct file_name_t { ...@@ -155,10 +155,22 @@ struct file_name_t {
/** FSP_SIZE of tablespace */ /** FSP_SIZE of tablespace */
ulint size; ulint size;
/** the log sequence number of the last observed MLOG_INDEX_LOAD
record for the tablespace */
lsn_t enable_lsn;
/** Constructor */ /** Constructor */
file_name_t(std::string name_, bool deleted) : file_name_t(std::string name_, bool deleted) :
name(name_), space(NULL), status(deleted ? DELETED: NORMAL), name(name_), space(NULL), status(deleted ? DELETED: NORMAL),
size(0) {} size(0), enable_lsn(0) {}
/** Report a MLOG_INDEX_LOAD operation, meaning that
mlog_init for any earlier LSN must be skipped.
@param lsn log sequence number of the MLOG_INDEX_LOAD */
void mlog_index_load(lsn_t lsn)
{
if (enable_lsn < lsn) enable_lsn = lsn;
}
}; };
/** Map of dirty tablespaces during recovery */ /** Map of dirty tablespaces during recovery */
...@@ -174,6 +186,8 @@ static recv_spaces_t recv_spaces; ...@@ -174,6 +186,8 @@ static recv_spaces_t recv_spaces;
enum recv_addr_state { enum recv_addr_state {
/** not yet processed */ /** not yet processed */
RECV_NOT_PROCESSED, RECV_NOT_PROCESSED,
/** not processed; the page will be reinitialized */
RECV_WILL_NOT_READ,
/** page is being read */ /** page is being read */
RECV_BEING_READ, RECV_BEING_READ,
/** log records are being applied on the page */ /** log records are being applied on the page */
...@@ -220,6 +234,117 @@ void (*log_file_op)(ulint space_id, const byte* flags, ...@@ -220,6 +234,117 @@ void (*log_file_op)(ulint space_id, const byte* flags,
const byte* name, ulint len, const byte* name, ulint len,
const byte* new_name, ulint new_len); const byte* new_name, ulint new_len);
/** Information about initializing page contents during redo log processing */
class mlog_init_t
{
public:
/** A page initialization operation that was parsed from
the redo log */
struct init {
/** log sequence number of the page initialization */
lsn_t lsn;
/** Whether btr_page_create() avoided a read of the page.
At the end of the last recovery batch, ibuf_merge()
will invoke change buffer merge for pages that reside
in the buffer pool. (In the last batch, loading pages
would trigger change buffer merge.) */
bool created;
};
private:
typedef std::map<const page_id_t, init,
std::less<const page_id_t>,
ut_allocator<std::pair<const page_id_t, init> > >
map;
/** Map of page initialization operations.
FIXME: Merge this to recv_sys->addr_hash! */
map inits;
public:
/** Record that a page will be initialized by the redo log.
@param[in] space tablespace identifier
@param[in] page_no page number
@param[in] lsn log sequence number */
void add(ulint space, ulint page_no, lsn_t lsn)
{
ut_ad(mutex_own(&recv_sys->mutex));
const init init = { lsn, false };
std::pair<map::iterator, bool> p = inits.insert(
map::value_type(page_id_t(space, page_no), init));
ut_ad(!p.first->second.created);
if (!p.second && p.first->second.lsn < init.lsn) {
p.first->second = init;
}
}
/** Get the last stored lsn of the page id and its respective
init/load operation.
@param[in] page_id page id
@param[in,out] init initialize log or load log
@return the latest page initialization;
not valid after releasing recv_sys->mutex. */
init& last(page_id_t page_id)
{
ut_ad(mutex_own(&recv_sys->mutex));
return inits.find(page_id)->second;
}
/** At the end of each recovery batch, reset the 'created' flags. */
void reset()
{
ut_ad(mutex_own(&recv_sys->mutex));
ut_ad(recv_no_ibuf_operations);
for (map::iterator i= inits.begin(); i != inits.end(); i++) {
i->second.created = false;
}
}
/** On the last recovery batch, merge buffered changes to those
pages that were initialized by buf_page_create() and still reside
in the buffer pool. Stale pages are not allowed in the buffer pool.
Note: When MDEV-14481 implements redo log apply in the
background, we will have to ensure that buf_page_get_gen()
will not deliver stale pages to users (pages on which the
change buffer was not merged yet). Normally, the change
buffer merge is performed on I/O completion. Maybe, add a
flag to buf_page_t and perform the change buffer merge on
the first actual access?
@param[in,out] mtr dummy mini-transaction */
void ibuf_merge(mtr_t& mtr)
{
ut_ad(mutex_own(&recv_sys->mutex));
ut_ad(!recv_no_ibuf_operations);
mtr.start();
for (map::const_iterator i= inits.begin(); i != inits.end();
i++) {
if (!i->second.created) {
continue;
}
if (buf_block_t* block = buf_page_get_gen(
i->first, univ_page_size, RW_X_LATCH, NULL,
BUF_GET_IF_IN_POOL, __FILE__, __LINE__,
&mtr, NULL)) {
mutex_exit(&recv_sys->mutex);
ibuf_merge_or_delete_for_page(
block, i->first,
&block->page.size, true);
mtr.commit();
mtr.start();
mutex_enter(&recv_sys->mutex);
}
}
mtr.commit();
}
/** Clear the data structure */
void clear() { inits.clear(); }
};
static mlog_init_t mlog_init;
/** Process a MLOG_CREATE2 record that indicates that a tablespace /** Process a MLOG_CREATE2 record that indicates that a tablespace
is being shrunk in size. is being shrunk in size.
@param[in] space_id tablespace identifier @param[in] space_id tablespace identifier
...@@ -621,6 +746,7 @@ recv_sys_close() ...@@ -621,6 +746,7 @@ recv_sys_close()
} }
recv_spaces.clear(); recv_spaces.clear();
mlog_init.clear();
} }
/************************************************************ /************************************************************
...@@ -1843,6 +1969,18 @@ recv_add_to_hash_table( ...@@ -1843,6 +1969,18 @@ recv_add_to_hash_table(
recv_sys->n_addrs++; recv_sys->n_addrs++;
} }
switch (type) {
case MLOG_INIT_FILE_PAGE2:
case MLOG_ZIP_PAGE_COMPRESS:
/* Ignore any earlier redo log records for this page. */
ut_ad(recv_addr->state == RECV_NOT_PROCESSED
|| recv_addr->state == RECV_WILL_NOT_READ);
recv_addr->state = RECV_WILL_NOT_READ;
mlog_init.add(space, page_no, start_lsn);
default:
break;
}
UT_LIST_ADD_LAST(recv_addr->rec_list, recv); UT_LIST_ADD_LAST(recv_addr->rec_list, recv);
prev_field = &(recv->data); prev_field = &(recv->data);
...@@ -1911,9 +2049,10 @@ recv_data_copy_to_buf( ...@@ -1911,9 +2049,10 @@ recv_data_copy_to_buf(
lsn of a log record. lsn of a log record.
@param[in,out] block buffer pool page @param[in,out] block buffer pool page
@param[in,out] mtr mini-transaction @param[in,out] mtr mini-transaction
@param[in,out] recv_addr recovery address */ @param[in,out] recv_addr recovery address
@param[in] init_lsn the initial LSN where to start recovery */
static void recv_recover_page(buf_block_t* block, mtr_t& mtr, static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
recv_addr_t* recv_addr) recv_addr_t* recv_addr, lsn_t init_lsn = 0)
{ {
page_t* page; page_t* page;
page_zip_des_t* page_zip; page_zip_des_t* page_zip;
...@@ -1961,6 +2100,15 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr, ...@@ -1961,6 +2100,15 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
if (recv->start_lsn < page_lsn) { if (recv->start_lsn < page_lsn) {
/* Ignore this record, because there are later changes /* Ignore this record, because there are later changes
for this page. */ for this page. */
DBUG_LOG("ib_log", "apply skip "
<< get_mlog_string(recv->type)
<< " LSN " << recv->start_lsn << " < "
<< page_lsn);
} else if (recv->start_lsn < init_lsn) {
DBUG_LOG("ib_log", "init skip "
<< get_mlog_string(recv->type)
<< " LSN " << recv->start_lsn << " < "
<< init_lsn);
} else if (srv_was_tablespace_truncated( } else if (srv_was_tablespace_truncated(
fil_space_get(recv_addr->space)) fil_space_get(recv_addr->space))
&& recv->start_lsn && recv->start_lsn
...@@ -2207,6 +2355,7 @@ void recv_apply_hashed_log_recs(bool last_batch) ...@@ -2207,6 +2355,7 @@ void recv_apply_hashed_log_recs(bool last_batch)
case RECV_DISCARDED: case RECV_DISCARDED:
goto ignore; goto ignore;
case RECV_NOT_PROCESSED: case RECV_NOT_PROCESSED:
case RECV_WILL_NOT_READ:
break; break;
} }
...@@ -2220,19 +2369,95 @@ void recv_apply_hashed_log_recs(bool last_batch) ...@@ -2220,19 +2369,95 @@ void recv_apply_hashed_log_recs(bool last_batch)
const page_id_t page_id(recv_addr->space, const page_id_t page_id(recv_addr->space,
recv_addr->page_no); recv_addr->page_no);
mtr.start(); if (recv_addr->state == RECV_NOT_PROCESSED) {
mtr.set_log_mode(MTR_LOG_NONE); apply:
if (buf_block_t* block = buf_page_get_gen( mtr.start();
page_id, univ_page_size, RW_X_LATCH, mtr.set_log_mode(MTR_LOG_NONE);
NULL, BUF_GET_IF_IN_POOL, if (buf_block_t* block = buf_page_get_gen(
__FILE__, __LINE__, &mtr, NULL)) { page_id, univ_page_size,
buf_block_dbg_add_level( RW_X_LATCH, NULL,
block, SYNC_NO_ORDER_CHECK); BUF_GET_IF_IN_POOL,
recv_recover_page(block, mtr, recv_addr); __FILE__, __LINE__, &mtr, NULL)) {
ut_ad(mtr.has_committed()); buf_block_dbg_add_level(
block, SYNC_NO_ORDER_CHECK);
recv_recover_page(block, mtr,
recv_addr);
ut_ad(mtr.has_committed());
} else {
mtr.commit();
recv_read_in_area(page_id);
}
} else { } else {
mtr.commit(); mlog_init_t::init& i = mlog_init.last(page_id);
recv_read_in_area(page_id); const lsn_t end_lsn = UT_LIST_GET_LAST(
recv_addr->rec_list)->end_lsn;
if (end_lsn < i.lsn) {
DBUG_LOG("ib_log", "skip log for page "
<< page_id
<< " LSN " << end_lsn
<< " < " << i.lsn);
skip:
recv_addr->state = RECV_PROCESSED;
goto ignore;
}
fil_space_t* space = fil_space_acquire(
recv_addr->space);
if (!space) {
goto skip;
}
if (space->enable_lsn) {
do_read:
fil_space_release(space);
recv_addr->state = RECV_NOT_PROCESSED;
goto apply;
}
/* Determine if a tablespace could be
for an internal table for FULLTEXT INDEX.
For those tables, no MLOG_INDEX_LOAD record
used to be written when redo logging was
disabled. Hence, we cannot optimize
away page reads, because all the redo
log records for initializing and
modifying the page in the past could
be older than the page in the data
file.
The check is too broad, causing all
tables whose names start with FTS_ to
skip the optimization. */
if (strstr(space->name, "/FTS_")) {
goto do_read;
}
mtr.start();
mtr.set_log_mode(MTR_LOG_NONE);
buf_block_t* block = buf_page_create(
page_id, page_size_t(space->flags),
&mtr);
if (recv_addr->state == RECV_PROCESSED) {
/* The page happened to exist
in the buffer pool, or it was
just being read in. Before
buf_page_get_with_no_latch()
returned, all changes must have
been applied to the page already. */
mtr.commit();
} else {
i.created = true;
buf_block_dbg_add_level(
block, SYNC_NO_ORDER_CHECK);
mtr.x_latch_at_savepoint(0, block);
recv_recover_page(block, mtr,
recv_addr, i.lsn);
ut_ad(mtr.has_committed());
}
fil_space_release(space);
} }
} }
} }
...@@ -2240,7 +2465,13 @@ void recv_apply_hashed_log_recs(bool last_batch) ...@@ -2240,7 +2465,13 @@ void recv_apply_hashed_log_recs(bool last_batch)
/* Wait until all the pages have been processed */ /* Wait until all the pages have been processed */
while (recv_sys->n_addrs != 0) { while (recv_sys->n_addrs != 0) {
bool abort = recv_sys->found_corrupt_log; const bool abort = recv_sys->found_corrupt_log
|| recv_sys->found_corrupt_fs;
if (recv_sys->found_corrupt_fs && !srv_force_recovery) {
ib::info() << "Set innodb_force_recovery=1"
" to ignore corrupted pages.";
}
mutex_exit(&(recv_sys->mutex)); mutex_exit(&(recv_sys->mutex));
...@@ -2279,6 +2510,10 @@ void recv_apply_hashed_log_recs(bool last_batch) ...@@ -2279,6 +2510,10 @@ void recv_apply_hashed_log_recs(bool last_batch)
log_mutex_enter(); log_mutex_enter();
mutex_enter(&(recv_sys->mutex)); mutex_enter(&(recv_sys->mutex));
mlog_init.reset();
} else if (!recv_no_ibuf_operations) {
/* We skipped this in buf_page_create(). */
mlog_init.ibuf_merge(mtr);
} }
recv_sys->apply_log_recs = FALSE; recv_sys->apply_log_recs = FALSE;
...@@ -2480,9 +2715,17 @@ recv_report_corrupt_log( ...@@ -2480,9 +2715,17 @@ recv_report_corrupt_log(
} }
/** Report a MLOG_INDEX_LOAD operation. /** Report a MLOG_INDEX_LOAD operation.
@param[in] space_id tablespace identifier */ @param[in] space_id tablespace id
ATTRIBUTE_COLD static void recv_mlog_index_load(ulint space_id) @param[in] page_no page number
@param[in] lsn log sequence number */
ATTRIBUTE_COLD static void
recv_mlog_index_load(ulint space_id, ulint page_no, lsn_t lsn)
{ {
recv_spaces_t::iterator it = recv_spaces.find(space_id);
if (it != recv_spaces.end()) {
it->second.mlog_index_load(lsn);
}
if (log_optimized_ddl_op) { if (log_optimized_ddl_op) {
log_optimized_ddl_op(space_id); log_optimized_ddl_op(space_id);
} }
...@@ -2646,7 +2889,7 @@ bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t store, bool apply) ...@@ -2646,7 +2889,7 @@ bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t store, bool apply)
/* fall through */ /* fall through */
case MLOG_INDEX_LOAD: case MLOG_INDEX_LOAD:
if (type == MLOG_INDEX_LOAD) { if (type == MLOG_INDEX_LOAD) {
recv_mlog_index_load(space); recv_mlog_index_load(space, page_no, old_lsn);
} }
/* fall through */ /* fall through */
case MLOG_FILE_NAME: case MLOG_FILE_NAME:
...@@ -2800,7 +3043,7 @@ bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t store, bool apply) ...@@ -2800,7 +3043,7 @@ bool recv_parse_log_recs(lsn_t checkpoint_lsn, store_t store, bool apply)
break; break;
#endif /* UNIV_LOG_LSN_DEBUG */ #endif /* UNIV_LOG_LSN_DEBUG */
case MLOG_INDEX_LOAD: case MLOG_INDEX_LOAD:
recv_mlog_index_load(space); recv_mlog_index_load(space, page_no, old_lsn);
break; break;
case MLOG_FILE_NAME: case MLOG_FILE_NAME:
case MLOG_FILE_DELETE: case MLOG_FILE_DELETE:
...@@ -3344,6 +3587,7 @@ recv_init_crash_recovery_spaces(bool rescan, bool& missing_tablespace) ...@@ -3344,6 +3587,7 @@ recv_init_crash_recovery_spaces(bool rescan, bool& missing_tablespace)
/* The tablespace was found, and there /* The tablespace was found, and there
are some redo log records for it. */ are some redo log records for it. */
fil_names_dirty(i->second.space); fil_names_dirty(i->second.space);
i->second.space->enable_lsn = i->second.enable_lsn;
} else if (i->second.name == "") { } else if (i->second.name == "") {
ib::error() << "Missing MLOG_FILE_NAME" ib::error() << "Missing MLOG_FILE_NAME"
" or MLOG_FILE_DELETE" " or MLOG_FILE_DELETE"
......
...@@ -2227,7 +2227,8 @@ innobase_start_or_create_for_mysql() ...@@ -2227,7 +2227,8 @@ innobase_start_or_create_for_mysql()
recv_apply_hashed_log_recs(true); recv_apply_hashed_log_recs(true);
if (recv_sys->found_corrupt_log) { if (recv_sys->found_corrupt_log
|| recv_sys->found_corrupt_fs) {
return(srv_init_abort(DB_CORRUPTION)); return(srv_init_abort(DB_CORRUPTION));
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment