Commit d92be859 authored by Vlad Lesin's avatar Vlad Lesin

MDEV-18976: Implement a CHECKSUM redo log record for improved validation

This is a draft implementation without a test.
parent 6859e80d
--source include/have_innodb.inc
--source include/have_debug.inc
# Disable page flushing so that the redo log records still have to be applied on recovery (e.g. on --prepare).
#SET @old_debug_dbug=@@global.debug_dbug;
SET GLOBAL debug_dbug="+d,ib_log_checkpoint_avoid";
#SET @old_innodb_page_cleaner_disabled_debug=@@global.innodb_page_cleaner_disabled_debug;
SET GLOBAL innodb_page_cleaner_disabled_debug=ON;
SET GLOBAL innodb_redo_log_checksum=ON;
CREATE TABLE t(i INT) ENGINE INNODB;
INSERT INTO t VALUES (1), (2), (3), (4), (5);
--source include/kill_mysqld.inc
--source include/start_mysqld.inc
DROP TABLE t;
#SET GLOBAL innodb_page_cleaner_disabled_debug=@old_innodb_page_cleaner_disabled_debug;
#SET GLOBAL debug_dbug=@old_debug_dbug;
......@@ -2504,10 +2504,6 @@ void buf_page_free(const page_id_t page_id,
buf_block_t *block= reinterpret_cast<buf_block_t*>
(buf_pool.page_hash_get_low(page_id, fold));
/* TODO: try to all this part of mtr_t::free() */
if (srv_immediate_scrub_data_uncompressed || mtr->is_page_compressed())
mtr->add_freed_offset(page_id);
if (!block || block->page.state() != BUF_BLOCK_FILE_PAGE)
{
/* FIXME: if block!=NULL, convert to BUF_BLOCK_FILE_PAGE,
......
......@@ -2637,9 +2637,9 @@ fseg_free_extent(
for (ulint i = 0; i < FSP_EXTENT_SIZE; i++) {
if (!xdes_is_free(descr, i)) {
buf_page_free(
page_id_t(space->id, first_page_in_extent + 1),
mtr, __FILE__, __LINE__);
page_id_t freed_page_id(space->id, first_page_in_extent + 1);
buf_page_free(freed_page_id, mtr, __FILE__, __LINE__);
mtr->add_freed_offset(freed_page_id);
}
}
}
......
......@@ -19914,6 +19914,11 @@ static MYSQL_SYSVAR_BOOL(immediate_scrub_data_uncompressed,
"Enable scrubbing of data",
NULL, NULL, FALSE);
/* Boolean system variable redo_log_checksum (set as
innodb_redo_log_checksum in the test script), backed by the global
srv_redo_log_checksum. The last argument, FALSE, is presumably the
default value; no check or update callbacks are registered (both NULL).
mtr_t::commit() consults srv_redo_log_checksum to decide whether page
checksum records are written. */
static MYSQL_SYSVAR_BOOL(
redo_log_checksum, srv_redo_log_checksum, 0,
"Write redo log record with page crc for each modified page on mtr commit",
NULL, NULL, FALSE);
static MYSQL_SYSVAR_BOOL(background_scrub_data_uncompressed,
deprecated::innodb_background_scrub_data_uncompressed,
PLUGIN_VAR_OPCMDARG, innodb_deprecated_ignored, NULL,
......@@ -20140,6 +20145,7 @@ static struct st_mysql_sys_var* innobase_system_variables[]= {
MYSQL_SYSVAR(background_thread),
MYSQL_SYSVAR(encrypt_temporary_tables),
MYSQL_SYSVAR(redo_log_checksum),
NULL
};
......
......@@ -171,7 +171,7 @@ class page_id_t
m_id= (m_id & ~uint64_t{0} << 32) | page_no;
}
ulonglong raw() { return m_id; }
ulonglong raw() const { return m_id; }
private:
/** The page identifier */
uint64_t m_id;
......
......@@ -384,8 +384,8 @@ template<byte type>
inline byte *mtr_t::log_write(const page_id_t id, const buf_page_t *bpage,
size_t len, bool alloc, size_t offset)
{
static_assert(!(type & 15) && type != RESERVED && type != OPTION &&
type <= FILE_CHECKPOINT, "invalid type");
static_assert(!(type & 15) && type != RESERVED && type <= FILE_CHECKPOINT,
"invalid type");
ut_ad(type >= FILE_CREATE || is_named_space(id.space()));
ut_ad(!bpage || bpage->id() == id);
constexpr bool have_len= type != INIT_PAGE && type != FREE_PAGE;
......@@ -541,9 +541,13 @@ inline void mtr_t::init(buf_block_t *b)
inline void mtr_t::free(fil_space_t &space, uint32_t offset)
{
page_id_t freed_page_id(space.id, offset);
if (srv_redo_log_checksum || srv_immediate_scrub_data_uncompressed
|| is_page_compressed())
add_freed_offset(freed_page_id);
if (m_log_mode == MTR_LOG_ALL)
m_log.close(log_write<FREE_PAGE>(freed_page_id, nullptr));
ut_ad(!m_user_space || m_user_space == &space);
if (&space == fil_system.sys_space)
freed_system_tablespace_page();
......@@ -673,3 +677,19 @@ inline void mtr_t::trim_pages(const page_id_t id)
m_log.close(l);
set_trim_pages();
}
inline void mtr_t::page_checksum(const page_id_t id, uint32_t crc,
lsn_t flushed_lsn)
{
if (m_log_mode != MTR_LOG_ALL)
return;
static_assert(sizeof(crc) == 4, "compatibility");
static_assert(sizeof(flushed_lsn) == 8, "compatibility");
byte* l = log_write<OPTION>(id, nullptr, 4 + 8 + 1, true);
*l++ = CHECKSUM;
mach_write_to_4(l, crc);
l += 4;
mach_write_to_8(l, flushed_lsn);
l += 8;
m_log.close(l);
}
......@@ -344,7 +344,7 @@ struct mtr_t {
/** Check if we are holding exclusive tablespace latch
@param space tablespace to search for
@return whether space.latch is being held */
bool memo_contains(const fil_space_t& space)
bool memo_contains(const fil_space_t& space) const
MY_ATTRIBUTE((warn_unused_result));
......@@ -378,7 +378,7 @@ struct mtr_t {
mtr_buf_t* get_memo() { return &m_memo; }
/** @return true if system tablespace page has been freed */
bool is_freed_system_tablespace_page()
bool is_freed_system_tablespace_page() const
{
return m_freed_in_system_tablespace;
}
......@@ -577,6 +577,9 @@ struct mtr_t {
@param id first page identifier that will not be in the file */
inline void trim_pages(const page_id_t id);
/** Write an OPTION log record of subtype CHECKSUM for a page.
Nothing is written unless the log mode is MTR_LOG_ALL.
@param id           page identifier
@param crc          page checksum (stored as 4 bytes)
@param flushed_lsn  LSN stored after the checksum (8 bytes) */
inline void page_checksum(const page_id_t id, uint32_t crc,
lsn_t flushed_lsn);
/** Write a log record about a file operation.
@param type file operation
@param space_id tablespace identifier
......@@ -645,6 +648,42 @@ struct mtr_t {
{ ut_ad(!m_commit || m_start); return m_start && !m_commit; }
/** @return whether the mini-transaction has been committed */
bool has_committed() const { ut_ad(!m_commit || m_start); return m_commit; }
/** Check whether a page has been recorded as freed in this
mini-transaction.
@param id  page identifier
@return whether m_freed_pages exists, id belongs to the tablespace whose
pages were freed, and the page number is contained in m_freed_pages */
bool page_is_freed(page_id_t id) const
{
  if (m_freed_pages == nullptr)
    return false;

  /* Without a user tablespace, the freed pages must belong to the
  system tablespace. */
  ut_ad(m_user_space || is_freed_system_tablespace_page());
  const fil_space_t *const space=
    m_user_space ? m_user_space : fil_system.sys_space;
  ut_ad(memo_contains(*space));

  return id.space() == space->id && m_freed_pages->contains(id.page_no());
}
/** Compute the page checksum that is stored in and verified against
CHECKSUM redo log records.
Three byte ranges of the page are folded with ut_fold_binary() and summed:
[FIL_PAGE_OFFSET, FIL_PAGE_LSN),
[FIL_PAGE_TYPE, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION) and
[FIL_PAGE_DATA, srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM).
@param page  page frame (srv_page_size bytes)
@return the sum of the folded ranges, truncated to 32 bits */
static uint32_t page_crc(const byte* page)
{
  /* Since the field FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, and in
  versions <= 4.1.x FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID, are written
  outside the buffer pool to the first pages of data files, we have to
  skip them in the page checksum calculation. We must also skip the
  field FIL_PAGE_SPACE_OR_CHKSUM where the checksum is stored, and also
  the last 8 bytes of page because there we store the old formula
  checksum. FIL_PAGE_LSN is left out as well: the first range ends
  there and the second one starts at FIL_PAGE_TYPE. */
  return static_cast<uint32_t>(
    ut_fold_binary(page + FIL_PAGE_OFFSET, FIL_PAGE_LSN - FIL_PAGE_OFFSET)
    /* The length must be measured from the start of this range
    (FIL_PAGE_TYPE); measuring it from FIL_PAGE_PREV would run over the
    fields that have to be skipped and into the third range. */
    + ut_fold_binary(page + FIL_PAGE_TYPE,
                     FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION - FIL_PAGE_TYPE)
    + ut_fold_binary(page + FIL_PAGE_DATA, srv_page_size - FIL_PAGE_DATA
                     - FIL_PAGE_END_LSN_OLD_CHKSUM));
}
private:
/** whether start() has been called */
bool m_start= false;
......
......@@ -289,6 +289,9 @@ enum mrec_ext_t
TRIM_PAGES= 10
};
/** Subtypes of OPTION redo log records. The subtype is stored as the
first payload byte of the record, so the numeric values are part of the
redo log format and existing codes must not be changed. */
enum mrec_opt_t {
  /** page checksum record: 4-byte checksum followed by an 8-byte LSN */
  CHECKSUM= 0
};
/** Redo log record types for file-level operations. These bit
patterns will be written to redo log files, so the existing codes or
......
......@@ -410,6 +410,7 @@ extern ulong srv_max_purge_lag_delay;
extern my_bool innodb_encrypt_temporary_tables;
extern my_bool srv_immediate_scrub_data_uncompressed;
extern my_bool srv_redo_log_checksum;
/*-------------------------------------------*/
/** Modes of operation */
......
......@@ -266,7 +266,6 @@ struct log_phys_t : public log_rec_t
next_not_same_page:
last_offset= 1; /* the next record must not be same_page */
}
next:
l+= rlen;
continue;
}
......@@ -280,7 +279,32 @@ struct log_phys_t : public log_rec_t
switch (b & 0x70) {
case OPTION:
goto next;
{
if (UNIV_UNLIKELY(rlen != 1 + 4 + 8 || *l != CHECKSUM))
goto record_corrupted;
++l;
uint32_t crc = mach_read_from_4(l);
l += 4;
// lsn_t flushed_lsn = mach_read_from_8(l);
l += 8;
uint32_t calc_crc = mtr_t::page_crc(frame);
// lsn_t flushed_lsn_from_page = mach_read_from_8(frame + FIL_PAGE_LSN);
if (calc_crc != crc) {
ib::warn() << "Page checksum stored in redo log record " << crc
<< " does not match counted checksum " << calc_crc
<< " for page " << block.page.id();
}
/*
if (flushed_lsn_from_page != flushed_lsn) {
ib::warn() << "Page LSN stored in redo log record " << flushed_lsn
<< " does not match " << flushed_lsn_from_page
<< " stored on page " << block.page.id();
failed = true;
}
*/
applied = APPLIED_YES;
continue;
}
case EXTENDED:
if (UNIV_UNLIKELY(block.page.id().page_no() < 3 ||
block.page.zip.ssize))
......@@ -1970,8 +1994,11 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply)
}
last_offset= FIL_PAGE_TYPE;
break;
case RESERVED:
case OPTION:
if (UNIV_UNLIKELY(rlen != 1 + 4 + 8 || *l != CHECKSUM))
goto record_corrupted;
break;
case RESERVED:
continue;
case WRITE:
case MEMMOVE:
......
......@@ -32,6 +32,7 @@ Created 11/26/1995 Heikki Tuuri
#include "page0types.h"
#include "mtr0log.h"
#include "log0recv.h"
#include <unordered_set>
/** Iterate over a memo block in reverse. */
template <typename Functor>
......@@ -300,6 +301,50 @@ struct ReleaseAll {
}
};
class WriteOptionCRC {
public:
WriteOptionCRC(mtr_t &mtr) : m_mtr(mtr) {}
/** @return true always. */
bool operator()(mtr_memo_slot_t *slot)
{
if (slot->type & MTR_MEMO_MODIFY)
{
#ifdef UNIV_DEBUG
switch (slot->type & ~MTR_MEMO_MODIFY) {
case MTR_MEMO_BUF_FIX:
case MTR_MEMO_PAGE_S_FIX:
case MTR_MEMO_PAGE_SX_FIX:
case MTR_MEMO_PAGE_X_FIX:
break;
default:
ut_ad("invalid type" == 0);
break;
}
#endif /* UNIV_DEBUG */
buf_block_t *block= reinterpret_cast<buf_block_t*>(slot->object);
byte *page = block->frame;
ulonglong page_id_raw = block->page.id().raw();
if (!m_visited_pages.count(page_id_raw)) {
static_assert(FIL_PAGE_SPACE_OR_CHKSUM == FIL_PAGE_OFFSET - 4,
"compatibility");
static_assert(FIL_PAGE_TYPE == FIL_PAGE_LSN + 8, "compatibility");
uint32_t crc = mtr_t::page_crc(page);
lsn_t lsn = mach_read_from_8(page + FIL_PAGE_LSN);
if (!m_mtr.page_is_freed(block->page.id()))
m_mtr.page_checksum(block->page.id(), crc, lsn);
m_visited_pages.insert(page_id_raw);
}
}
return true;
}
private:
mtr_t &m_mtr;
std::unordered_set<ulonglong> m_visited_pages;
};
#ifdef UNIV_DEBUG
/** Check that all slots have been handled. */
struct DebugCheck {
......@@ -400,6 +445,12 @@ void mtr_t::commit()
{
ut_ad(!srv_read_only_mode || m_log_mode == MTR_LOG_NO_REDO);
if (srv_redo_log_checksum)
{
Iterate<WriteOptionCRC> iteration(WriteOptionCRC(*this));
m_memo.for_each_block(iteration);
}
std::pair<lsn_t,bool> lsns;
if (const ulint len= prepare_write())
......@@ -969,7 +1020,7 @@ bool mtr_t::memo_contains(const rw_lock_t &lock, mtr_memo_type_t type)
/** Check if we are holding exclusive tablespace latch
@param space tablespace to search for
@return whether space.latch is being held */
bool mtr_t::memo_contains(const fil_space_t& space)
bool mtr_t::memo_contains(const fil_space_t& space) const
{
Iterate<Find> iteration(Find(&space, MTR_MEMO_SPACE_X_LOCK));
if (m_memo.for_each_block_in_reverse(iteration))
......
......@@ -392,6 +392,8 @@ my_bool innodb_encrypt_temporary_tables;
my_bool srv_immediate_scrub_data_uncompressed;
my_bool srv_redo_log_checksum;
/* Array of English strings describing the current state of an
i/o handler thread */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment