Commit 1e25202a authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-34830: LSN in the future is not being treated as serious corruption

The invariant of write-ahead logging is that before any change to a
page is written to the data file, the corresponding log record must
must first have been durably written.

On crash recovery, there were some sloppy checks for this. Let us
implement accurate checks and flag an inconsistency as a hard error,
so that we can avoid further corruption of a corrupted database.
For data extraction from the corrupted database, innodb_force_recovery=6
can be used.

A section of the test mariabackup.innodb_redo_overwrite
that is parsing some mariadb-backup --backup output has
been removed, because that output "redo log block is overwritten"
would often be missing in a Microsoft Windows environment
as a result of these changes.

recv_sys_t::max_page_lsn: Replaces recv_max_page_lsn.

recv_sys_t::early_batch: Whether apply(false) is executing.
Before the final recovery batch, we will not have read the
log records until the end and therefore will not know the final LSN.

recv_lsn_checks_on: Remove.

recv_sys_t::validate_checkpoint(): Validate the write-ahead-logging
condition at the end of the recovery. This includes validating
max_page_lsn in case a multi-batch recovery was executed.

recv_dblwr_t::validate_page(): Keep track of the maximum LSN
(if we are checking a non-doublewrite copy of a page) but
do not complain LSN being in the future. The doublewrite buffer
is a special case, because it will be read early during recovery.
Besides, starting with commit 762bcb81
the dblwr=true copies of pages may legitimately be "too new".

recv_sys_t::check_page_lsn(): Validate FIL_PAGE_LSN during recovery.
Update max_page_lsn if needed. Do not flag an error if early_batch.

recv_dblwr_t::find_page(): Find a valid page with the smallest
FIL_PAGE_LSN that is large enough for recovery. Invoke
recv_sys_t::check_page_lsn() on the chosen LSN so that
"LSN in the future" can be flagged.

recv_dblwr_t::restore_first_page(): Require the recv_sys.mutex
to be held by the caller, and return an error code.

buf_dblwr_t::recover(): Simplify the message output. Do attempt
doublewrite recovery on user page read error. Ignore doublewrite
pages whose FIL_PAGE_LSN is outside the usable bounds.

buf_page_is_corrupted(): Distinguish the return values
CORRUPTED_FUTURE_LSN and CORRUPTED_OTHER.

buf_page_check_corrupt(): Return the error code DB_CORRUPTION
in case the LSN is in the future.

Datafile::read_first_page(): Handle FSP_SPACE_FLAGS=0xffffffff
in the same way on both 32-bit and 64-bit architectures.
parent 9f0b1066
......@@ -523,7 +523,7 @@ is_page_corrupted(
normal method. */
if (is_encrypted && key_version != 0) {
is_corrupted = use_full_crc32
? buf_page_is_corrupted(true, buf, flags)
? !!buf_page_is_corrupted(false, buf, flags)
: !fil_space_verify_crypt_checksum(buf, zip_size);
if (is_corrupted && log_file) {
......
......@@ -8,8 +8,10 @@ INSERT INTO t2 VALUES(1);
# Corrupt the pages
SELECT * FROM t1;
ERROR 42000: Unknown storage engine 'InnoDB'
FOUND 1 /InnoDB: Page \[page id: space=[1-9][0-9]*, page number=3\] log sequence number 1311768467463790320 is in the future!/ in mysqld.1.err
SELECT * FROM t1;
Got one of the listed errors
a
1
SELECT * FROM t2;
a
1
......@@ -27,7 +29,7 @@ SET GLOBAL innodb_flush_log_at_trx_commit=1;
DELETE FROM t1 WHERE pk=3;
# Kill the server
disconnect con1;
# Corrupt the pages
# Corrupt the page
SELECT * FROM t1;
pk
1
......
......@@ -22,7 +22,7 @@ flush table t1 for export;
# Kill the server
# restart
FOUND 1 /InnoDB: Restoring page \[page id: space=[1-9][0-9]*, page number=0\] of datafile/ in mysqld.1.err
FOUND 1 /InnoDB: Recovered page \[page id: space=[1-9][0-9]*, page number=3\]/ in mysqld.1.err
# restart
XA ROLLBACK 'x';
check table t1;
Table Op Msg_type Msg_text
......@@ -44,7 +44,7 @@ connection default;
flush table t1 for export;
# Kill the server
# restart
FOUND 1 /InnoDB: Restoring page \[page id: space=[1-9][0-9]*, page number=0\] of datafile/ in mysqld.1.err
FOUND 2 /InnoDB: (Restoring|Recovering) page \[page id: space=[1-9][0-9]*, page number=[03]\]/ in mysqld.1.err
XA ROLLBACK 'x';
check table t1;
Table Op Msg_type Msg_text
......
......@@ -5,14 +5,19 @@ call mtr.add_suppression("InnoDB: Plugin initialization aborted");
call mtr.add_suppression("Plugin 'InnoDB' init function returned error");
call mtr.add_suppression("Plugin 'InnoDB' registration as a STORAGE ENGINE failed");
call mtr.add_suppression("InnoDB: Database page corruption on disk or a failed read of file '.*test.t1\\.ibd' page");
call mtr.add_suppression("InnoDB: Failed to read page 3 from file '.*test.t1\\.ibd': Page read from tablespace is corrupted.");
call mtr.add_suppression("InnoDB: Failed to read page 3 from file '.*test.t1\\.ibd': Data structure corruption");
call mtr.add_suppression("InnoDB: (Unable to apply log to|Discarding log for) corrupted page .*, page number=3\\]");
call mtr.add_suppression("InnoDB: Table `test`.`t1` is corrupted. Please drop the table and recreate.");
call mtr.add_suppression("InnoDB: File '.*test/t1\\.ibd' is corrupted");
call mtr.add_suppression("InnoDB: A long wait .* was observed for dict_sys");
call mtr.add_suppression("InnoDB: Page \\[page id: space=[1-9][0-9]*, page number=3\\] log sequence number 1311768467463790320 is in the future!");
call mtr.add_suppression("InnoDB: Your database may be corrupt");
call mtr.add_suppression("InnoDB: MySQL-8\\.0 tablespace in .*test/t2\\.ibd");
call mtr.add_suppression("InnoDB: Restart in MySQL for migration/recovery\\.");
--enable_query_log
let INNODB_PAGE_SIZE=`select @@innodb_page_size`;
let ALGO=`select @@innodb_checksum_algorithm`;
CREATE TABLE t1(a BIGINT PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES(1);
# Force a redo log checkpoint.
......@@ -30,15 +35,32 @@ INSERT INTO t2 VALUES(1);
--echo # Corrupt the pages
perl;
do "$ENV{MTR_SUITE_DIR}/include/crc32.pl";
my $polynomial = 0x82f63b78; # CRC-32C
my $algo = $ENV{ALGO};
my $ps = $ENV{INNODB_PAGE_SIZE};
my $file = "$ENV{MYSQLD_DATADIR}/test/t1.ibd";
open(FILE, "+<$file") || die "Unable to open $file";
binmode FILE;
sysseek(FILE, 3*$ps, 0) || die "Unable to seek $file\n";
die "Unable to read $file" unless sysread(FILE, $page, $ps) == $ps;
# Replace the a=1 with a=0.
$page =~ s/\x80\x0\x0\x0\x0\x0\x0\x1/\x80\x0\x0\x0\x0\x0\x0\x0/;
# Assign a future FIL_PAGE_LSN
substr($page, 16, 8) = pack("NN", 0x12345678, 0x9abcdef0);
substr($page, $ps - 8, 8) = pack("NN", 0x9abcdef0, 0x9abcdef0);
if ($algo =~ /full_crc32/)
{
my $ck = mycrc32(substr($page, 0, $ps - 4), 0, $polynomial);
substr($page, $ps - 4, 4) = pack("N", $ck);
}
else
{
# Replace the innodb_checksum_algorithm=crc32 checksum
my $ck= pack("N",
mycrc32(substr($page, 4, 22), 0, $polynomial) ^
mycrc32(substr($page, 38, $ps - 38 - 8), 0, $polynomial));
substr ($page, 0, 4) = $ck;
substr ($page, $ps - 8, 4) = $ck;
}
sysseek(FILE, 3*$ps, 0) || die "Unable to seek $file\n";
syswrite(FILE, $page, $ps)==$ps || die "Unable to write $file\n";
close FILE or die "close";
......@@ -46,20 +68,23 @@ close FILE or die "close";
$file = "$ENV{MYSQLD_DATADIR}/test/t2.ibd";
open(FILE, "+<$file") || die "Unable to open $file";
binmode FILE;
# Corrupt pages 1 to 3. MLOG_INIT_FILE_PAGE2 should protect us!
# Unfortunately, we are not immune to page 0 corruption.
seek (FILE, $ps, SEEK_SET) or die "seek";
print FILE chr(0xff) x ($ps * 3);
# Corrupt pages 0 to 3. INIT_PAGE should protect us!
print FILE chr(0xff) x ($ps * 4);
close FILE or die "close";
EOF
--source include/start_mysqld.inc
--error ER_UNKNOWN_STORAGE_ENGINE
SELECT * FROM t1;
let SEARCH_FILE= $MYSQLTEST_VARDIR/log/mysqld.1.err;
let SEARCH_PATTERN=InnoDB: Page \\[page id: space=[1-9][0-9]*, page number=3\\] log sequence number 1311768467463790320 is in the future!;
--source include/search_pattern_in_file.inc
let $restart_parameters=--innodb_force_recovery=1;
--source include/restart_mysqld.inc
--error ER_NO_SUCH_TABLE_IN_ENGINE,ER_TABLE_CORRUPT
--error 0,ER_NO_SUCH_TABLE_IN_ENGINE
SELECT * FROM t1;
SELECT * FROM t2;
CHECK TABLE t2;
......@@ -81,13 +106,36 @@ DELETE FROM t1 WHERE pk=3;
--source ../include/no_checkpoint_end.inc
disconnect con1;
--echo # Corrupt the pages
--echo # Corrupt the page
perl;
do "$ENV{MTR_SUITE_DIR}/include/crc32.pl";
my $polynomial = 0x82f63b78; # CRC-32C
my $algo = $ENV{ALGO};
my $ps = $ENV{INNODB_PAGE_SIZE};
my $file = "$ENV{MYSQLD_DATADIR}/test/t1.ibd";
open(FILE, "+<$file") || die "Unable to open $file";
binmode FILE;
seek (FILE, $ENV{INNODB_PAGE_SIZE} * 3, SEEK_SET) or die "seek";
print FILE "junk";
sysseek(FILE, $ps * 3, SEEK_SET) or die "seek";
sysread(FILE, $page, $ps)==$ps||die "Unable to read $file\n";
# Set FIL_PAGE_LSN to the maximum
substr($page, 16, 8) = chr(255) x 8;
substr($page, $ps - 8, 8) = chr(255) x 8;
if ($algo =~ /full_crc32/)
{
my $ck = mycrc32(substr($page, 0, $ps - 4), 0, $polynomial);
substr($page, $ps - 4, 4) = pack("N", $ck);
}
else
{
# Replace the innodb_checksum_algorithm=crc32 checksum
my $ck= pack("N",
mycrc32(substr($page, 4, 22), 0, $polynomial) ^
mycrc32(substr($page_, 38, $ps - 38 - 8), 0, $polynomial));
substr ($page, 0, 4) = $ck;
substr ($page, $ps - 8, 4) = $ck;
}
sysseek(FILE, $ps * 3, SEEK_SET) or die "seek";
syswrite(FILE, $page);
close FILE or die "close";
EOF
--source include/start_mysqld.inc
......
......@@ -18,6 +18,8 @@ call mtr.add_suppression("InnoDB: Checksum mismatch in datafile: ");
call mtr.add_suppression("InnoDB: Inconsistent tablespace ID in .*t1\\.ibd");
call mtr.add_suppression("\\[Warning\\] Found 1 prepared XA transactions");
call mtr.add_suppression("InnoDB: Header page consists of zero bytes in datafile:");
call mtr.add_suppression("InnoDB: The log was only scanned up to \\d+, while the current LSN at the time of the latest checkpoint \\d+ was 0 and the maximum LSN on a data page was 18446744073709551615!");
call mtr.add_suppression("InnoDB: Plugin initialization aborted");
--enable_query_log
let INNODB_PAGE_SIZE=`select @@innodb_page_size`;
......@@ -74,7 +76,26 @@ syswrite(FILE, chr(0) x ($page_size/2));
sysseek(FILE, 3*$page_size, 0);
sysread(FILE, $page, $page_size)==$page_size||die "Unable to read $name\n";
sysseek(FILE, 3*$page_size, 0)||die "Unable to seek $fname\n";
syswrite(FILE, chr(0) x ($page_size/2));
my $corrupted = $page;
# Set FIL_PAGE_LSN to the maximum
substr($corrupted, 16, 8) = chr(255) x 8;
substr($corrupted, $page_size - 8, 8) = chr(255) x 8;
if ($algo =~ /full_crc32/)
{
my $ck = mycrc32(substr($corrupted, 0, $page_size - 4), 0, $polynomial);
substr($corrupted, $page_size - 4, 4) = pack("N", $ck);
}
else
{
# Replace the innodb_checksum_algorithm=crc32 checksum
my $ck= pack("N",
mycrc32(substr($corrupted, 4, 22), 0, $polynomial) ^
mycrc32(substr($corrupted_, 38, $page_size - 38 - 8), 0,
$polynomial));
substr ($corrupted, 0, 4) = $ck;
substr ($corrupted, $page_size - 8, 4) = $ck;
}
syswrite(FILE, $corrupted);
close FILE;
# Change the flag offset of page 0 in doublewrite buffer
......@@ -118,8 +139,26 @@ EOF
--source include/start_mysqld.inc
let SEARCH_PATTERN=InnoDB: Restoring page \[page id: space=[1-9][0-9]*, page number=0\] of datafile;
--source include/search_pattern_in_file.inc
let SEARCH_PATTERN=InnoDB: Recovered page \[page id: space=[1-9][0-9]*, page number=3\];
let SEARCH_PATTERN=InnoDB: The log was only scanned up to \\d+, while the current LSN at the time of the latest checkpoint \\d+ was 0 and the maximum LSN on a data page was 18446744073709551615!
--source include/search_pattern_in_file.inc
--error ER_XAER_NOTA
XA ROLLBACK 'x';
let $shutdown_timeout=0;
--source include/shutdown_mysqld.inc
let $shutdown_timeout=;
# Corrupt the file in a better way.
perl;
use IO::Handle;
my $fname= "$ENV{'MYSQLD_DATADIR'}test/t1.ibd";
my $page_size = $ENV{INNODB_PAGE_SIZE};
open(FILE, "+<", $fname) or die;
sysseek(FILE, ($page_size/2), 0);
syswrite(FILE, chr(0) x ($page_size/2));
sysseek(FILE, 3*$page_size, 0);
syswrite(FILE, chr(0) x ($page_size/2));
close FILE;
EOF
--source include/start_mysqld.inc
XA ROLLBACK 'x';
check table t1;
select f1, f2 from t1;
......@@ -147,7 +186,7 @@ close FILE;
EOF
--source include/start_mysqld.inc
let SEARCH_PATTERN=InnoDB: Restoring page \[page id: space=[1-9][0-9]*, page number=0\] of datafile;
let SEARCH_PATTERN=InnoDB: (Restoring|Recovering) page \[page id: space=[1-9][0-9]*, page number=[03]\];
--source include/search_pattern_in_file.inc
XA ROLLBACK 'x';
check table t1;
......
......@@ -22,6 +22,5 @@ INSERT INTO t SELECT * FROM t;
INSERT INTO t SELECT * FROM t;
INSERT INTO t SELECT * FROM t;
# xtrabackup backup
FOUND 1 /failed: redo log block is overwritten/ in backup.log
FOUND 1 /failed: redo log block checksum does not match/ in backup.log
DROP TABLE t;
......@@ -31,13 +31,9 @@ INSERT INTO t SELECT * FROM t;
--disable_result_log
--error 1
--exec $XTRABACKUP --defaults-file=$MYSQLTEST_VARDIR/my.cnf --backup --target-dir=$targetdir --dbug=+d,mariabackup_events > $backuplog
--exec $XTRABACKUP --defaults-file=$MYSQLTEST_VARDIR/my.cnf --backup --target-dir=$targetdir --dbug=+d,mariabackup_events
--enable_result_log
--let SEARCH_PATTERN=failed: redo log block is overwritten
--let SEARCH_FILE=$backuplog
--source include/search_pattern_in_file.inc
--remove_file $backuplog
--rmdir $targetdir
--let before_innodb_log_copy_thread_started=INSERT INTO test.t VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9)
......
......@@ -510,43 +510,55 @@ buf_page_is_checksum_valid_crc32(
return checksum_field1 == crc32;
}
#ifndef UNIV_INNOCHECKSUM
/** Checks whether the lsn present in the page is lesser than the
peek current lsn.
@param[in] check_lsn lsn to check
@param[in] read_buf page. */
static void buf_page_check_lsn(bool check_lsn, const byte* read_buf)
@param check_lsn lsn to check
@param read_buf page frame
@return whether the FIL_PAGE_LSN is invalid */
static bool buf_page_check_lsn(bool check_lsn, const byte *read_buf)
{
#ifndef UNIV_INNOCHECKSUM
if (check_lsn && recv_lsn_checks_on) {
const lsn_t current_lsn = log_sys.get_lsn();
const lsn_t page_lsn
= mach_read_from_8(read_buf + FIL_PAGE_LSN);
if (!check_lsn)
return false;
lsn_t current_lsn= log_sys.get_lsn();
if (UNIV_UNLIKELY(current_lsn == LOG_START_LSN + LOG_BLOCK_HDR_SIZE) &&
srv_force_recovery == SRV_FORCE_NO_LOG_REDO)
return false;
const lsn_t page_lsn= mach_read_from_8(read_buf + FIL_PAGE_LSN);
/* Since we are going to reset the page LSN during the import
phase it makes no sense to spam the log with error messages. */
if (current_lsn < page_lsn) {
if (UNIV_LIKELY(current_lsn >= page_lsn))
return false;
mysql_mutex_lock(&recv_sys.mutex);
current_lsn= recv_sys.check_page_lsn(page_lsn);
mysql_mutex_unlock(&recv_sys.mutex);
if (!current_lsn)
return false;
const uint32_t space_id= mach_read_from_4(read_buf + FIL_PAGE_SPACE_ID);
const uint32_t page_no= mach_read_from_4(read_buf + FIL_PAGE_OFFSET);
const uint32_t space_id = mach_read_from_4(
read_buf + FIL_PAGE_SPACE_ID);
const uint32_t page_no = mach_read_from_4(
read_buf + FIL_PAGE_OFFSET);
sql_print_error("InnoDB: Page "
"[page id: space=" UINT32PF ", page number=" UINT32PF "]"
" log sequence number " LSN_PF
" is in the future! Current system log sequence number "
LSN_PF ".",
space_id, page_no, page_lsn, current_lsn);
ib::error() << "Page " << page_id_t(space_id, page_no)
<< " log sequence number " << page_lsn
<< " is in the future! Current system"
<< " log sequence number "
<< current_lsn << ".";
if (srv_force_recovery)
return false;
ib::error() << "Your database may be corrupt or"
sql_print_error("InnoDB: Your database may be corrupt or"
" you may have copied the InnoDB"
" tablespace but not the InnoDB"
" log files. "
<< FORCE_RECOVERY_MSG;
" tablespace but not the ib_logfile0. %s",
FORCE_RECOVERY_MSG);
}
}
#endif /* !UNIV_INNOCHECKSUM */
return true;
}
#endif
/** Check if a buffer is all zeroes.
......@@ -563,7 +575,7 @@ bool buf_is_zeroes(span<const byte> buf)
@param[in] read_buf database page
@param[in] fsp_flags tablespace flags
@return whether the page is corrupted */
bool
buf_page_is_corrupted_reason
buf_page_is_corrupted(
bool check_lsn,
const byte* read_buf,
......@@ -574,14 +586,14 @@ buf_page_is_corrupted(
const uint size = buf_page_full_crc32_size(
read_buf, &compressed, &corrupted);
if (corrupted) {
return true;
return CORRUPTED_OTHER;
}
const byte* end = read_buf + (size - FIL_PAGE_FCRC32_CHECKSUM);
uint crc32 = mach_read_from_4(end);
if (!crc32 && size == srv_page_size
&& buf_is_zeroes(span<const byte>(read_buf, size))) {
return false;
return NOT_CORRUPTED;
}
DBUG_EXECUTE_IF(
......@@ -594,7 +606,7 @@ buf_page_is_corrupted(
if (crc32 != ut_crc32(read_buf,
size - FIL_PAGE_FCRC32_CHECKSUM)) {
return true;
return CORRUPTED_OTHER;
}
static_assert(FIL_PAGE_FCRC32_KEY_VERSION == 0, "alignment");
static_assert(FIL_PAGE_LSN % 4 == 0, "alignment");
......@@ -606,11 +618,15 @@ buf_page_is_corrupted(
end - (FIL_PAGE_FCRC32_END_LSN
- FIL_PAGE_FCRC32_CHECKSUM),
4)) {
return true;
return CORRUPTED_OTHER;
}
buf_page_check_lsn(check_lsn, read_buf);
return false;
return
#ifndef UNIV_INNOCHECKSUM
buf_page_check_lsn(check_lsn, read_buf)
? CORRUPTED_FUTURE_LSN :
#endif
NOT_CORRUPTED;
}
const ulint zip_size = fil_space_t::zip_size(fsp_flags);
......@@ -631,7 +647,13 @@ buf_page_is_corrupted(
&& FSP_FLAGS_HAS_PAGE_COMPRESSION(fsp_flags)
#endif
) {
return(false);
check_lsn:
return
#ifndef UNIV_INNOCHECKSUM
buf_page_check_lsn(check_lsn, read_buf)
? CORRUPTED_FUTURE_LSN :
#endif
NOT_CORRUPTED;
}
static_assert(FIL_PAGE_LSN % 4 == 0, "alignment");
......@@ -644,15 +666,16 @@ buf_page_is_corrupted(
/* Stored log sequence numbers at the start and the end
of page do not match */
return(true);
return CORRUPTED_OTHER;
}
buf_page_check_lsn(check_lsn, read_buf);
/* Check whether the checksum fields have correct values */
if (zip_size) {
return !page_zip_verify_checksum(read_buf, zip_size);
if (!page_zip_verify_checksum(read_buf, zip_size)) {
return CORRUPTED_OTHER;
}
goto check_lsn;
}
const uint32_t checksum_field1 = mach_read_from_4(
......@@ -689,7 +712,7 @@ buf_page_is_corrupted(
}
if (all_zeroes) {
return false;
return NOT_CORRUPTED;
}
}
......@@ -698,13 +721,17 @@ buf_page_is_corrupted(
case SRV_CHECKSUM_ALGORITHM_STRICT_FULL_CRC32:
case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
#endif /* !UNIV_INNOCHECKSUM */
return !buf_page_is_checksum_valid_crc32(
read_buf, checksum_field1, checksum_field2);
if (!buf_page_is_checksum_valid_crc32(read_buf,
checksum_field1,
checksum_field2)) {
return CORRUPTED_OTHER;
}
goto check_lsn;
#ifndef UNIV_INNOCHECKSUM
default:
if (checksum_field1 == BUF_NO_CHECKSUM_MAGIC
&& checksum_field2 == BUF_NO_CHECKSUM_MAGIC) {
return false;
goto check_lsn;
}
const uint32_t crc32 = buf_calc_page_crc32(read_buf);
......@@ -722,27 +749,33 @@ buf_page_is_corrupted(
DBUG_EXECUTE_IF(
"page_intermittent_checksum_mismatch", {
static int page_counter;
if (page_counter++ == 3) return true;
if (page_counter++ == 3)
return CORRUPTED_OTHER;
});
if ((checksum_field1 != crc32
|| checksum_field2 != crc32)
&& checksum_field2
!= buf_calc_page_old_checksum(read_buf)) {
return true;
return CORRUPTED_OTHER;
}
}
switch (checksum_field1) {
case 0:
case BUF_NO_CHECKSUM_MAGIC:
return false;
}
return (checksum_field1 != crc32 || checksum_field2 != crc32)
break;
default:
if ((checksum_field1 != crc32
|| checksum_field2 != crc32)
&& checksum_field1
!= buf_calc_page_new_checksum(read_buf);
!= buf_calc_page_new_checksum(read_buf)) {
return CORRUPTED_OTHER;
}
}
}
#endif /* !UNIV_INNOCHECKSUM */
goto check_lsn;
}
#ifndef UNIV_INNOCHECKSUM
......@@ -3599,6 +3632,7 @@ or decrypt/decompress just failed.
@return whether the operation succeeded
@retval DB_SUCCESS if page has been read and is not corrupted
@retval DB_PAGE_CORRUPTED if page based on checksum check is corrupted
@retval DB_CORRUPTION if the page LSN is in the future
@retval DB_DECRYPTION_FAILED if page post encryption checksum matches but
after decryption normal page checksum does not match. */
static dberr_t buf_page_check_corrupt(buf_page_t *bpage,
......@@ -3635,8 +3669,18 @@ static dberr_t buf_page_check_corrupt(buf_page_t *bpage,
node.space->is_compressed())) {
err = DB_PAGE_CORRUPTED;
}
} else if (buf_page_is_corrupted(true, dst_frame, node.space->flags)) {
} else {
switch (buf_page_is_corrupted(true, dst_frame,
node.space->flags)) {
case NOT_CORRUPTED:
break;
case CORRUPTED_OTHER:
err = DB_PAGE_CORRUPTED;
break;
case CORRUPTED_FUTURE_LSN:
err = DB_CORRUPTION;
break;
}
}
if (seems_encrypted && err == DB_PAGE_CORRUPTED
......@@ -3733,20 +3777,23 @@ dberr_t buf_page_t::read_complete(const fil_node_t &node)
if (belongs_to_unzip_LRU())
memset_aligned<UNIV_PAGE_SIZE_MIN>(frame, 0, srv_page_size);
if (err == DB_PAGE_CORRUPTED)
{
ib::error() << "Database page corruption on disk"
" or a failed read of file '"
<< node.name << "' page " << expected_id
<< ". You may have to recover from a backup.";
switch (err) {
default:
break;
case DB_PAGE_CORRUPTED:
sql_print_error("InnoDB: Database page corruption on disk"
" or a failed read of file '%s' page "
"[page id: space=" UINT32PF ", page number=" UINT32PF
"]. You may have to recover from a backup.",
node.name, expected_id.space(), expected_id.page_no());
buf_page_print(read_frame, zip_size());
sql_print_information("InnoDB: You can use CHECK TABLE to scan"
" your table for corruption. %s",
FORCE_RECOVERY_MSG);
/* fall through */
case DB_CORRUPTION:
node.space->set_corrupted();
ib::info() << " You can use CHECK TABLE to scan"
" your table for corruption. "
<< FORCE_RECOVERY_MSG;
}
if (!srv_force_recovery)
......
......@@ -33,6 +33,7 @@ Created 2011/12/19
#include "trx0sys.h"
#include "fil0crypt.h"
#include "fil0pagecompress.h"
#include "log.h"
using st_::span;
......@@ -365,25 +366,17 @@ void buf_dblwr_t::recover()
for (recv_dblwr_t::list::iterator i= recv_sys.dblwr.pages.begin();
i != recv_sys.dblwr.pages.end(); ++i, ++page_no_dblwr)
{
byte *page= *i;
const uint32_t page_no= page_get_page_no(page);
const uint32_t page_no= page_get_page_no(*i);
if (!page_no) /* recovered via recv_dblwr_t::restore_first_page() */
continue;
const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN);
if (recv_sys.parse_start_lsn > lsn)
/* Pages written before the checkpoint are not useful for recovery. */
const lsn_t lsn= mach_read_from_8(*i + FIL_PAGE_LSN);
if (recv_sys.parse_start_lsn > lsn || lsn > recv_sys.scanned_lsn)
/* Pages written before or after the recovery range are not usable. */
continue;
const ulint space_id= page_get_space_id(page);
const uint32_t space_id= page_get_space_id(*i);
const page_id_t page_id(space_id, page_no);
if (recv_sys.scanned_lsn < lsn)
{
ib::info() << "Ignoring a doublewrite copy of page " << page_id
<< " with future log sequence number " << lsn;
continue;
}
fil_space_t *space= fil_space_t::get(space_id);
if (!space)
......@@ -395,17 +388,21 @@ void buf_dblwr_t::recover()
/* Do not report the warning for undo tablespaces, because they
can be truncated in place. */
if (!srv_is_undo_tablespace(space_id))
ib::warn() << "A copy of page " << page_no
<< " in the doublewrite buffer slot " << page_no_dblwr
<< " is beyond the end of " << space->chain.start->name
<< " (" << space->size << " pages)";
sql_print_warning("InnoDB: A copy of page "
"[page id: space=" UINT32PF
", page number=" UINT32PF "]"
" in the doublewrite buffer slot " UINT32PF
" is beyond the end of %s (" UINT32PF " pages)",
page_id.space(), page_id.page_no(),
page_no_dblwr, space->chain.start->name,
space->size);
next_page:
space->release();
continue;
}
const ulint physical_size= space->physical_size();
ut_ad(!buf_is_zeroes(span<const byte>(page, physical_size)));
ut_ad(!buf_is_zeroes(span<const byte>(*i, physical_size)));
/* We want to ensure that for partial reads the unread portion of
the page is NUL. */
......@@ -417,14 +414,13 @@ void buf_dblwr_t::recover()
physical_size, read_buf);
if (UNIV_UNLIKELY(fio.err != DB_SUCCESS))
{
ib::warn() << "Double write buffer recovery: " << page_id
<< " ('" << space->chain.start->name
<< "') read failed with error: " << fio.err;
continue;
}
if (buf_is_zeroes(span<const byte>(read_buf, physical_size)))
sql_print_warning("InnoDB: Double write buffer recovery: "
"[page id: space=" UINT32PF
", page number=" UINT32PF "]"
" ('%s') read failed with error: %s",
page_id.space(), page_id.page_no(), fio.node->name,
ut_strerr(fio.err));
else if (buf_is_zeroes(span<const byte>(read_buf, physical_size)))
{
/* We will check if the copy in the doublewrite buffer is
valid. If not, we will ignore this page (there should be redo
......@@ -434,14 +430,14 @@ void buf_dblwr_t::recover()
goto next_page;
else
/* We intentionally skip this message for all-zero pages. */
ib::info() << "Trying to recover page " << page_id
<< " from the doublewrite buffer.";
page= recv_sys.dblwr.find_page(page_id, space, buf);
if (!page)
goto next_page;
sql_print_information("InnoDB: Trying to recover page "
"[page id: space=" UINT32PF
", page number=" UINT32PF "]"
" from the doublewrite buffer.",
page_id.space(), page_id.page_no());
if (byte *page= recv_sys.dblwr.find_page(page_id, space, buf))
{
/* Write the good page from the doublewrite buffer to the intended
position. */
space->reacquire();
......@@ -450,8 +446,14 @@ void buf_dblwr_t::recover()
physical_size, page);
if (fio.err == DB_SUCCESS)
ib::info() << "Recovered page " << page_id << " to '" << fio.node->name
<< "' from the doublewrite buffer.";
sql_print_information("InnoDB: Recovered page "
"[page id: space=" UINT32PF
", page number=" UINT32PF "]"
" to '%s' from the doublewrite buffer.",
page_id.space(), page_id.page_no(),
fio.node->name);
}
goto next_page;
}
......
......@@ -305,7 +305,7 @@ Datafile::read_first_page(bool read_only_mode)
m_flags = fsp_header_get_flags(m_first_page);
if (!fil_space_t::is_valid_flags(m_flags, m_space_id)) {
ulint cflags = fsp_flags_convert_from_101(m_flags);
if (cflags == ULINT_UNDEFINED) {
if (unsigned(cflags) == ~0U) {
switch (fsp_flags_is_incompatible_mysql(m_flags)) {
case 0:
sql_print_error("InnoDB: Invalid flags 0x%zx in %s",
......@@ -571,8 +571,13 @@ dberr_t Datafile::validate_first_page(lsn_t *flush_lsn)
goto err_exit;
}
if (buf_page_is_corrupted(false, m_first_page, m_flags)) {
/* Look for checksum and other corruptions. */
switch (buf_page_is_corrupted(false, m_first_page, m_flags)) {
case NOT_CORRUPTED:
break;
case CORRUPTED_FUTURE_LSN:
error_txt = "LSN is in the future";
goto err_exit;
case CORRUPTED_OTHER:
error_txt = "Checksum mismatch";
goto err_exit;
}
......
......@@ -590,9 +590,11 @@ SysTablespace::read_lsn_and_check_flags(lsn_t* flushed_lsn)
err = it->validate_first_page(flushed_lsn);
if (err != DB_SUCCESS) {
if (recv_sys.dblwr.restore_first_page(
it->m_space_id, it->m_filepath,
it->handle())) {
mysql_mutex_lock(&recv_sys.mutex);
err = recv_sys.dblwr.restore_first_page(
it->m_space_id, it->m_filepath, it->handle());
mysql_mutex_unlock(&recv_sys.mutex);
if (err != DB_SUCCESS) {
it->close();
return(err);
}
......
......@@ -308,12 +308,20 @@ buf_block_modify_clock_inc(
@return whether the buffer is all zeroes */
bool buf_is_zeroes(st_::span<const byte> buf);
/** Reason why buf_page_is_corrupted() fails */
enum buf_page_is_corrupted_reason
{
CORRUPTED_FUTURE_LSN= -1,
NOT_CORRUPTED= 0,
CORRUPTED_OTHER
};
/** Check if a page is corrupt.
@param[in] check_lsn whether the LSN should be checked
@param[in] read_buf database page
@param[in] fsp_flags tablespace flags
@return whether the page is corrupted */
bool
buf_page_is_corrupted_reason
buf_page_is_corrupted(
bool check_lsn,
const byte* read_buf,
......
......@@ -131,28 +131,32 @@ struct recv_dblwr_t
@param space the tablespace of the page (not available for page 0)
@param tmp_buf 2*srv_page_size for decrypting and decompressing any
page_compressed or encrypted pages
@param dblwr whether this is the doublewrite copy of the page
@return whether the page is valid */
bool validate_page(const page_id_t page_id, const byte *page,
const fil_space_t *space, byte *tmp_buf);
const fil_space_t *space, byte *tmp_buf,
bool dblwr= false);
/** Find a doublewrite copy of a page.
/** Find a doublewrite copy of a page with the smallest FIL_PAGE_LSN
that is large enough for recovery.
@param page_id page identifier
@param space tablespace (not available for page_id.page_no()==0)
@param space tablespace (nullptr for page_id.page_no()==0)
@param tmp_buf 2*srv_page_size for decrypting and decompressing any
page_compressed or encrypted pages
@return page frame
@retval NULL if no valid page for page_id was found */
byte* find_page(const page_id_t page_id, const fil_space_t *space= NULL,
byte *tmp_buf= NULL);
byte* find_page(const page_id_t page_id, const fil_space_t *space= nullptr,
byte *tmp_buf= nullptr);
/** Restore the first page of the given tablespace from
doublewrite buffer.
@param space_id tablespace identifier
@param name tablespace filepath
@param file tablespace file handle
@return whether the operation failed */
bool restore_first_page(
ulint space_id, const char *name, pfs_os_file_t file);
@return error code
@retval DB_SUCCESS if the page could be recovered */
dberr_t restore_first_page(ulint space_id, const char *name,
pfs_os_file_t file);
/** Restore the first page of the given tablespace from
doublewrite buffer.
......@@ -260,6 +264,10 @@ struct recv_sys_t
/** set when an inconsistency with the file system contents is detected
during log scan or apply */
bool found_corrupt_fs;
/** whether apply(false) is executing */
bool early_batch;
/** the maximum FIL_PAGE_LSN read during recovery */
lsn_t max_page_lsn;
public:
/** whether we are applying redo log records during crash recovery */
bool recovery_on;
......@@ -470,6 +478,18 @@ struct recv_sys_t
/** @return whether log file corruption was found */
bool is_corrupt_log() const { return UNIV_UNLIKELY(found_corrupt_log); }
/** Check if a FIL_PAGE_LSN is valid during recovery.
@param lsn the FIL_PAGE_LSN
@return the current log sequence number
@retval 0 if the current log sequence number is unknown */
ATTRIBUTE_COLD lsn_t check_page_lsn(lsn_t lsn);
/** Check if recovery reached a consistent log sequence number.
@param start_lsn the checkpoint LSN
@param end_lsn the end LSN of the FILE_CHECKPOINT mini-transaction
@return whether the recovery failed to process enough log */
inline bool validate_checkpoint(lsn_t start_lsn, lsn_t end_lsn) const;
/** Attempt to initialize a page based on redo log records.
@param page_id page identifier
@return the recovered block
......@@ -512,11 +532,6 @@ Protected by log_sys.mutex. */
extern bool recv_no_log_write;
#endif /* UNIV_DEBUG */
/** TRUE if buf_page_is_corrupted() should check if the log sequence
number (FIL_PAGE_LSN) is in the future. Initially FALSE, and set by
recv_recovery_from_checkpoint_start(). */
extern bool recv_lsn_checks_on;
/** Size of the parsing buffer; it must accommodate RECV_SCAN_SIZE many
times! */
#define RECV_PARSING_BUF_SIZE (2U << 20)
......
......@@ -66,11 +66,6 @@ Protected by log_sys.mutex. */
bool recv_no_log_write = false;
#endif /* UNIV_DEBUG */
/** TRUE if buf_page_is_corrupted() should check if the log sequence
number (FIL_PAGE_LSN) is in the future. Initially FALSE, and set by
recv_recovery_from_checkpoint_start(). */
bool recv_lsn_checks_on;
/** If the following is TRUE, the buffer pool file pages must be invalidated
after recovery and no ibuf operations are allowed; this becomes TRUE if
the log record hash table becomes too full, and log records must be merged
......@@ -82,11 +77,6 @@ true means that recovery is running and no operations on the log file
are allowed yet: the variable name is misleading. */
bool recv_no_ibuf_operations;
/** The maximum lsn we see for a page during the recovery process. If this
is bigger than the lsn we are able to scan up to, that is an indication that
the recovery failed and the database may be corrupt. */
static lsn_t recv_max_page_lsn;
/** Stored physical log record */
struct log_phys_t : public log_rec_t
{
......@@ -1472,12 +1462,13 @@ void recv_sys_t::create()
recovered_lsn = 0;
found_corrupt_log = false;
found_corrupt_fs = false;
early_batch = false;
max_page_lsn= 0;
mlog_checkpoint_lsn = 0;
progress_time = time(NULL);
ut_ad(pages.empty());
pages_it = pages.end();
recv_max_page_lsn = 0;
memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces);
last_stored_lsn = 1;
......@@ -2233,6 +2224,25 @@ ATTRIBUTE_COLD void recv_sys_t::wait_for_pool(size_t pages)
buf_flush_sync_batch(recovered_lsn);
}
/** Check if a FIL_PAGE_LSN is valid during recovery.
@param lsn the FIL_PAGE_LSN
@return the current log sequence number
@retval 0 if the current log sequence number is unknown */
ATTRIBUTE_COLD lsn_t recv_sys_t::check_page_lsn(lsn_t lsn)
{
ut_ad(lsn);
mysql_mutex_assert_owner(&mutex);
if (recovered_lsn >= lsn)
lsn= 0;
else
{
if (lsn >= max_page_lsn)
max_page_lsn= lsn;
lsn= early_batch ? 0 : recovered_lsn;
}
return lsn;
}
/** Register a redo log snippet for a page.
@param it page iterator
@param start_lsn start LSN of the mini-transaction
......@@ -3117,8 +3127,7 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
buf_pool.corrupted_evict(&block->page,
block->page.state() &
buf_page_t::LRU_MASK);
block = nullptr;
goto done;
return nullptr;
}
if (!start_lsn) {
......@@ -3157,12 +3166,6 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
mtr.discard_modifications();
mtr.commit();
done:
/* FIXME: do this in page read, protected with recv_sys.mutex! */
if (recv_max_page_lsn < page_lsn) {
recv_max_page_lsn = page_lsn;
}
return block;
}
......@@ -3682,6 +3685,7 @@ void recv_sys_t::apply(bool last_batch)
DBUG_ASSERT(!last_batch == mysql_mutex_is_owner(&log_sys.mutex));
#endif /* SAFE_MUTEX */
mysql_mutex_lock(&mutex);
early_batch= !last_batch;
garbage_collect();
......@@ -4111,7 +4115,6 @@ recv_group_scan_log_recs(
recv_sys.scanned_lsn = *contiguous_lsn;
recv_sys.recovered_lsn = *contiguous_lsn;
recv_sys.scanned_checkpoint_no = 0;
ut_ad(recv_max_page_lsn == 0);
mysql_mutex_unlock(&recv_sys.mutex);
lsn_t start_lsn;
......@@ -4448,6 +4451,22 @@ dberr_t recv_recovery_read_max_checkpoint()
return err;
}
inline
bool recv_sys_t::validate_checkpoint(lsn_t start_lsn, lsn_t end_lsn) const
{
if (recovered_lsn >= start_lsn &&
recovered_lsn >= end_lsn &&
recovered_lsn >= max_page_lsn)
return false;
sql_print_error("InnoDB: The log was only scanned up to "
LSN_PF ", while the current LSN at the "
"time of the latest checkpoint " LSN_PF
" was " LSN_PF
" and the maximum LSN on a data page was " LSN_PF "!",
recovered_lsn, start_lsn, end_lsn, max_page_lsn);
return true;
}
/** Start recovering from a redo log checkpoint.
@param[in] flush_lsn FIL_PAGE_FILE_FLUSH_LSN
of first system tablespace page
......@@ -4660,8 +4679,10 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
recv_sys.parse_start_lsn = checkpoint_lsn;
if (srv_operation <= SRV_OPERATION_EXPORT_RESTORED) {
mysql_mutex_lock(&recv_sys.mutex);
deferred_spaces.deferred_dblwr();
buf_dblwr.recover();
mysql_mutex_unlock(&recv_sys.mutex);
}
ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN);
......@@ -4694,25 +4715,8 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
}
if (!log_sys.is_physical()) {
} else if (recv_sys.recovered_lsn < checkpoint_lsn
|| recv_sys.recovered_lsn < end_lsn) {
sql_print_error("InnoDB: The log was only scanned up to "
LSN_PF ", while the current LSN at the "
"time of the latest checkpoint " LSN_PF
" was " LSN_PF "!",
recv_sys.recovered_lsn,
checkpoint_lsn, end_lsn);
} else if (recv_sys.validate_checkpoint(checkpoint_lsn, end_lsn)) {
goto err_exit;
} else if (log_sys.log.scanned_lsn < checkpoint_lsn
|| log_sys.log.scanned_lsn < end_lsn
|| log_sys.log.scanned_lsn < recv_max_page_lsn) {
sql_print_error("InnoDB: We scanned the log up to " LSN_PF
". A checkpoint was at " LSN_PF
" and the maximum LSN on a database page was "
LSN_PF ". It is possible that the"
" database is now corrupt!",
log_sys.log.scanned_lsn, checkpoint_lsn,
recv_max_page_lsn);
}
log_sys.next_checkpoint_lsn = checkpoint_lsn;
......@@ -4755,7 +4759,6 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
err = recv_rename_files();
}
recv_lsn_checks_on = true;
mysql_mutex_unlock(&recv_sys.mutex);
/* The database is now ready to start almost normal processing of user
......@@ -4771,11 +4774,14 @@ recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
bool recv_dblwr_t::validate_page(const page_id_t page_id,
const byte *page,
const fil_space_t *space,
byte *tmp_buf)
byte *tmp_buf, bool dblwr)
{
mysql_mutex_assert_owner(&recv_sys.mutex);
ulint flags;
if (page_id.page_no() == 0)
{
ulint flags= fsp_header_get_flags(page);
flags= fsp_header_get_flags(page);
if (!fil_space_t::is_valid_flags(flags, page_id.space()))
{
ulint cflags= fsp_flags_convert_from_101(flags);
......@@ -4790,7 +4796,22 @@ bool recv_dblwr_t::validate_page(const page_id_t page_id,
}
/* Page 0 is never page_compressed or encrypted. */
return !buf_page_is_corrupted(true, page, flags);
goto check_if_corrupted;
}
flags= space->flags;
if (space->full_crc32())
{
check_if_corrupted:
if (buf_page_is_corrupted(false, page, flags))
return false;
if (!dblwr)
/* The doublewrite buffer copy may legitimately carry a future
FIL_PAGE_LSN. recv_dblwr_t::find_page() and buf_dblwr_t::recover()
will deal with that. */
recv_sys.check_page_lsn(mach_read_from_8(page + FIL_PAGE_LSN));
return true;
}
ut_ad(tmp_buf);
......@@ -4800,9 +4821,6 @@ bool recv_dblwr_t::validate_page(const page_id_t page_id,
const bool expect_encrypted= space->crypt_data &&
space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
if (space->full_crc32())
return !buf_page_is_corrupted(true, page, space->flags);
if (expect_encrypted &&
mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))
{
......@@ -4829,17 +4847,17 @@ bool recv_dblwr_t::validate_page(const page_id_t page_id,
return false; /* decompression failed */
if (decomp == srv_page_size)
return false; /* the page was not compressed (invalid page type) */
return !buf_page_is_corrupted(true, tmp_page, space->flags);
page= tmp_page;
}
return !buf_page_is_corrupted(true, page, space->flags);
goto check_if_corrupted;
}
byte *recv_dblwr_t::find_page(const page_id_t page_id,
const fil_space_t *space, byte *tmp_buf)
{
byte *result= NULL;
lsn_t max_lsn= 0;
byte *result= nullptr;
lsn_t min_lsn= LSN_MAX;
for (byte *page : pages)
{
......@@ -4855,19 +4873,21 @@ byte *recv_dblwr_t::find_page(const page_id_t page_id,
}
const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN);
if (lsn <= max_lsn ||
!validate_page(page_id, page, space, tmp_buf))
if (lsn > min_lsn ||
lsn < recv_sys.parse_start_lsn ||
!validate_page(page_id, page, space, tmp_buf, true))
{
/* Mark processed for subsequent iterations in buf_dblwr_t::recover() */
memset(page + FIL_PAGE_LSN, 0, 8);
memset_aligned<8>(page + FIL_PAGE_LSN, 0, 8);
continue;
}
ut_a(page_get_page_no(page) == page_id.page_no());
max_lsn= lsn;
min_lsn= lsn;
result= page;
}
if (result)
recv_sys.check_page_lsn(min_lsn);
return result;
}
......@@ -4926,30 +4946,36 @@ uint32_t recv_dblwr_t::find_first_page(const char *name, pfs_os_file_t file)
return 0;
}
bool recv_dblwr_t::restore_first_page(ulint space_id, const char *name,
dberr_t recv_dblwr_t::restore_first_page(ulint space_id, const char *name,
pfs_os_file_t file)
{
mysql_mutex_assert_owner(&recv_sys.mutex);
if (pages.empty())
return DB_CORRUPTION;
const page_id_t page_id(space_id, 0);
const byte* page= find_page(page_id);
const byte *page= find_page(page_id);
if (!page)
{
/* If the first page of the given user tablespace is not there
in the doublewrite buffer, then the recovery is going to fail
now. Report error only when doublewrite buffer is not empty */
if (pages.size())
ib::error() << "Corrupted page " << page_id << " of datafile '"
<< name <<"' could not be found in the "
<<"doublewrite buffer.";
return true;
sql_print_error("InnoDB: Corrupted page "
"[page id: space=" UINT32PF ", page number=0]"
" of datafile '%s' could not be found"
" in the doublewrite buffer", page_id.space(), name);
return DB_CORRUPTION;
}
ulint physical_size= fil_space_t::physical_size(
mach_read_from_4(page + FSP_HEADER_OFFSET + FSP_SPACE_FLAGS));
ib::info() << "Restoring page " << page_id << " of datafile '"
<< name << "' from the doublewrite buffer. Writing "
<< physical_size << " bytes into file '" << name << "'";
unsigned physical_size=
fil_space_t::physical_size(mach_read_from_4(FSP_HEADER_OFFSET +
FSP_SPACE_FLAGS + page));
sql_print_information("InnoDB: Restoring page "
"[page id: space=" UINT32PF ", page number=0]"
" of datafile '%s' from the doublewrite buffer",
page_id.space(), name);
return os_file_write(
IORequestWrite, name, file, page, 0, physical_size) !=
DB_SUCCESS;
return os_file_write(IORequestWrite, name, file, page, 0, physical_size);
}
......@@ -743,29 +743,30 @@ srv_undo_tablespaces_init(bool create_new_db)
srv_operation == SRV_OPERATION_RESTORE_DELTA)
? srv_undo_tablespaces : TRX_SYS_N_RSEGS;
if (dberr_t err= srv_all_undo_tablespaces_open(create_new_db, n_undo))
return err;
mysql_mutex_lock(&recv_sys.mutex);
dberr_t err= srv_all_undo_tablespaces_open(create_new_db, n_undo);
mysql_mutex_unlock(&recv_sys.mutex);
/* Initialize srv_undo_space_id_start=0 when there are no
dedicated undo tablespaces. */
if (srv_undo_tablespaces_open == 0)
srv_undo_space_id_start= 0;
if (create_new_db)
if (err == DB_SUCCESS && create_new_db)
{
mtr_t mtr;
for (ulint i= 0; i < srv_undo_tablespaces; ++i)
{
mtr.start();
dberr_t err= fsp_header_init(fil_space_get(srv_undo_space_id_start + i),
err= fsp_header_init(fil_space_get(srv_undo_space_id_start + i),
SRV_UNDO_TABLESPACE_SIZE_IN_PAGES, &mtr);
mtr.commit();
if (err)
return err;
break;
}
}
return DB_SUCCESS;
return err;
}
/** Create the temporary file tablespace.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment