Commit 86f262f1 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-12353: Reduce log volume by an UNDO_INIT record

We introduce an EXTENDED log record for initializing an undo log page.
The size of the record will be 2 bytes plus the optional page identifier.
The entire undo page will be initialized, except the space that is
already reserved for TRX_UNDO_SEG_HDR in trx_undo_seg_create().

mtr_t::undo_create(): Write the UNDO_INIT record.

trx_undo_page_init(): Initialize the undo page corresponding to the
UNDO_INIT record. Unlike the former MLOG_UNDO_INIT record, we will
initialize almost the entire page, including initializing the
TRX_UNDO_PAGE_NODE to an empty list node, so that the subsequent call
to flst_init() will avoid writing log for the undo page.
parent 3ee100b0
...@@ -521,18 +521,33 @@ inline void mtr_t::free(const page_id_t id) ...@@ -521,18 +521,33 @@ inline void mtr_t::free(const page_id_t id)
m_log.close(log_write<FREE_PAGE>(id, nullptr)); m_log.close(log_write<FREE_PAGE>(id, nullptr));
} }
/** Partly initialize a B-tree page. /** Write an EXTENDED log record.
@param block B-tree page @param block buffer pool page
@param comp false=ROW_FORMAT=REDUNDANT, true=COMPACT or DYNAMIC */ @param type extended record subtype; @see mrec_ext_t */
inline void mtr_t::page_create(const buf_block_t &block, bool comp) inline void mtr_t::log_write_extended(const buf_block_t &block, byte type)
{ {
set_modified(); set_modified();
if (m_log_mode != MTR_LOG_ALL) if (m_log_mode != MTR_LOG_ALL)
return; return;
byte *l= log_write<EXTENDED>(block.page.id, &block.page, 1, true); byte *l= log_write<EXTENDED>(block.page.id, &block.page, 1, true);
static_assert(false == INIT_ROW_FORMAT_REDUNDANT, "encoding"); *l++= type;
static_assert(true == INIT_ROW_FORMAT_DYNAMIC, "encoding");
*l++= comp;
m_log.close(l); m_log.close(l);
m_last_offset= FIL_PAGE_TYPE; m_last_offset= FIL_PAGE_TYPE;
} }
/** Write log for partly initializing a B-tree or R-tree page.
@param block B-tree page
@param comp false=ROW_FORMAT=REDUNDANT, true=COMPACT or DYNAMIC */
inline void mtr_t::page_create(const buf_block_t &block, bool comp)
{
static_assert(false == INIT_ROW_FORMAT_REDUNDANT, "encoding");
static_assert(true == INIT_ROW_FORMAT_DYNAMIC, "encoding");
log_write_extended(block, comp);
}
/** Write log for initializing an undo log page.
@param block undo page */
inline void mtr_t::undo_create(const buf_block_t &block)
{
log_write_extended(block, UNDO_INIT);
}
...@@ -490,10 +490,13 @@ struct mtr_t { ...@@ -490,10 +490,13 @@ struct mtr_t {
/** Free a page. /** Free a page.
@param id page identifier */ @param id page identifier */
inline void free(const page_id_t id); inline void free(const page_id_t id);
/** Partly initialize a B-tree page. /** Write log for partly initializing a B-tree or R-tree page.
@param block B-tree page @param block B-tree page
@param comp false=ROW_FORMAT=REDUNDANT, true=COMPACT or DYNAMIC */ @param comp false=ROW_FORMAT=REDUNDANT, true=COMPACT or DYNAMIC */
inline void page_create(const buf_block_t &block, bool comp); inline void page_create(const buf_block_t &block, bool comp);
/** Write log for initializing an undo log page.
@param block undo page */
inline void undo_create(const buf_block_t &block);
/** Write a log record about a file operation. /** Write a log record about a file operation.
@param type file operation @param type file operation
...@@ -526,6 +529,11 @@ struct mtr_t { ...@@ -526,6 +529,11 @@ struct mtr_t {
inline byte *log_write(const page_id_t id, const buf_page_t *bpage, inline byte *log_write(const page_id_t id, const buf_page_t *bpage,
size_t len= 0, bool alloc= false, size_t offset= 0); size_t len= 0, bool alloc= false, size_t offset= 0);
/** Write an EXTENDED log record.
@param block buffer pool page
@param type extended record subtype; @see mrec_ext_t */
inline void log_write_extended(const buf_block_t &block, byte type);
/** Prepare to write the mini-transaction log to the redo log buffer. /** Prepare to write the mini-transaction log to the redo log buffer.
@return number of bytes to write in finish_write() */ @return number of bytes to write in finish_write() */
inline ulint prepare_write(); inline ulint prepare_write();
......
...@@ -254,7 +254,11 @@ enum mrec_ext_t ...@@ -254,7 +254,11 @@ enum mrec_ext_t
/** Partly initialize a ROW_FORMAT=COMPACT or DYNAMIC index page, /** Partly initialize a ROW_FORMAT=COMPACT or DYNAMIC index page,
including writing the "infimum" and "supremum" pseudo-records. including writing the "infimum" and "supremum" pseudo-records.
The current byte offset will be reset to FIL_PAGE_TYPE. */ The current byte offset will be reset to FIL_PAGE_TYPE. */
INIT_ROW_FORMAT_DYNAMIC= 1 INIT_ROW_FORMAT_DYNAMIC= 1,
/** Initialize an undo log page.
This is roughly (not exactly) equivalent to the old MLOG_UNDO_INIT record.
The current byte offset will be reset to FIL_PAGE_TYPE. */
UNDO_INIT= 2
}; };
......
...@@ -156,6 +156,13 @@ trx_undo_get_first_rec(const fil_space_t &space, uint32_t page_no, ...@@ -156,6 +156,13 @@ trx_undo_get_first_rec(const fil_space_t &space, uint32_t page_no,
uint16_t offset, ulint mode, buf_block_t*& block, uint16_t offset, ulint mode, buf_block_t*& block,
mtr_t *mtr); mtr_t *mtr);
/** Initialize an undo log page.
NOTE: This corresponds to a redo log record and must not be changed!
@see mtr_t::undo_create()
@param[in,out] block undo log page */
void trx_undo_page_init(const buf_block_t &block)
MY_ATTRIBUTE((nonnull));
/** Allocate an undo log page. /** Allocate an undo log page.
@param[in,out] undo undo log @param[in,out] undo undo log
@param[in,out] mtr mini-transaction that does not hold any page latch @param[in,out] mtr mini-transaction that does not hold any page latch
......
...@@ -44,10 +44,8 @@ Created 9/20/1997 Heikki Tuuri ...@@ -44,10 +44,8 @@ Created 9/20/1997 Heikki Tuuri
#include "buf0flu.h" #include "buf0flu.h"
#include "mtr0mtr.h" #include "mtr0mtr.h"
#include "mtr0log.h" #include "mtr0log.h"
#include "page0cur.h" #include "page0page.h"
#include "page0zip.h" #include "trx0undo.h"
#include "btr0btr.h"
#include "btr0cur.h"
#include "ibuf0ibuf.h" #include "ibuf0ibuf.h"
#include "trx0undo.h" #include "trx0undo.h"
#include "trx0rec.h" #include "trx0rec.h"
...@@ -259,10 +257,20 @@ struct log_phys_t : public log_rec_t ...@@ -259,10 +257,20 @@ struct log_phys_t : public log_rec_t
!srv_force_recovery) !srv_force_recovery)
goto record_corrupted; goto record_corrupted;
static_assert(INIT_ROW_FORMAT_REDUNDANT == 0, "compatiblity"); static_assert(INIT_ROW_FORMAT_REDUNDANT == 0, "compatiblity");
static_assert(INIT_ROW_FORMAT_DYNAMIC == 1, "compatibilit"); static_assert(INIT_ROW_FORMAT_DYNAMIC == 1, "compatibility");
if (UNIV_UNLIKELY(rlen != 1 || *l > INIT_ROW_FORMAT_DYNAMIC)) if (UNIV_UNLIKELY(rlen != 1))
goto record_corrupted;
switch (*l) {
default:
goto record_corrupted; goto record_corrupted;
page_create_low(&block, *l != INIT_ROW_FORMAT_REDUNDANT); case INIT_ROW_FORMAT_REDUNDANT:
case INIT_ROW_FORMAT_DYNAMIC:
page_create_low(&block, *l != INIT_ROW_FORMAT_REDUNDANT);
break;
case UNDO_INIT:
trx_undo_page_init(block);
break;
}
last_offset= FIL_PAGE_TYPE; last_offset= FIL_PAGE_TYPE;
goto next_after_applying; goto next_after_applying;
case WRITE: case WRITE:
......
...@@ -292,25 +292,37 @@ trx_undo_get_first_rec(const fil_space_t &space, uint32_t page_no, ...@@ -292,25 +292,37 @@ trx_undo_get_first_rec(const fil_space_t &space, uint32_t page_no,
/*============== UNDO LOG FILE COPY CREATION AND FREEING ==================*/ /*============== UNDO LOG FILE COPY CREATION AND FREEING ==================*/
/** Initialize the fields in an undo log segment page. /** Initialize an undo log page.
@param[in,out] undo_block undo log segment page NOTE: This corresponds to a redo log record and must not be changed!
@param[in,out] mtr mini-transaction */ @see mtr_t::undo_create()
static void trx_undo_page_init(const buf_block_t *undo_block, mtr_t *mtr) @param[in,out] block undo log page */
void trx_undo_page_init(const buf_block_t &block)
{ {
static_assert(FIL_PAGE_TYPE_ALLOCATED == 0, "compatibility"); mach_write_to_2(my_assume_aligned<2>(FIL_PAGE_TYPE + block.frame),
/* FIXME: FIL_PAGE_TYPE should be FIL_PAGE_TYPE_ALLOCATED here! */ FIL_PAGE_UNDO_LOG);
ut_ad(mach_read_from_2(FIL_PAGE_TYPE + undo_block->frame) < 0x100); static_assert(TRX_UNDO_PAGE_HDR == FIL_PAGE_DATA, "compatibility");
mtr->write<1>(*undo_block, FIL_PAGE_TYPE + 1 + undo_block->frame, memset_aligned<2>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE + block.frame,
FIL_PAGE_UNDO_LOG); 0, 2);
compile_time_assert(TRX_UNDO_PAGE_TYPE == 0); mach_write_to_2(my_assume_aligned<2>
compile_time_assert(TRX_UNDO_PAGE_START == 2); (TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_START + block.frame),
compile_time_assert(TRX_UNDO_PAGE_NODE == TRX_UNDO_PAGE_FREE + 2); TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
memcpy_aligned<2>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE + block.frame,
alignas(4) byte hdr[6]; TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_START + block.frame, 2);
mach_write_to_4(hdr, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE); /* The following corresponds to flst_zero_both(), but without writing log. */
memcpy_aligned<2>(hdr + 4, hdr + 2, 2); memset_aligned<4>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE + FLST_PREV +
static_assert(TRX_UNDO_PAGE_FREE == 4, "compatibility"); FIL_ADDR_PAGE + block.frame, 0xff, 4);
mtr->memcpy(*undo_block, undo_block->frame + TRX_UNDO_PAGE_HDR, hdr, 6); memset_aligned<2>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE + FLST_PREV +
FIL_ADDR_BYTE + block.frame, 0, 2);
memset_aligned<2>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE + FLST_NEXT +
FIL_ADDR_PAGE + block.frame, 0xff, 4);
memset_aligned<2>(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE + FLST_NEXT +
FIL_ADDR_BYTE + block.frame, 0, 2);
static_assert(TRX_UNDO_PAGE_NODE + FLST_NEXT + FIL_ADDR_BYTE + 2 ==
TRX_UNDO_PAGE_HDR_SIZE, "compatibility");
/* Preserve TRX_UNDO_SEG_HDR, but clear the rest of the page. */
memset_aligned<2>(TRX_UNDO_SEG_HDR + TRX_UNDO_SEG_HDR_SIZE + block.frame, 0,
srv_page_size - (TRX_UNDO_SEG_HDR + TRX_UNDO_SEG_HDR_SIZE +
FIL_PAGE_DATA_END));
} }
/** Look for a free slot for an undo log segment. /** Look for a free slot for an undo log segment.
...@@ -383,11 +395,12 @@ trx_undo_seg_create(fil_space_t *space, buf_block_t *rseg_hdr, ulint *id, ...@@ -383,11 +395,12 @@ trx_undo_seg_create(fil_space_t *space, buf_block_t *rseg_hdr, ulint *id,
buf_block_dbg_add_level(block, SYNC_TRX_UNDO_PAGE); buf_block_dbg_add_level(block, SYNC_TRX_UNDO_PAGE);
trx_undo_page_init(block, mtr); mtr->undo_create(*block);
trx_undo_page_init(*block);
mtr->write<2,mtr_t::OPT>(*block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE mtr->write<2>(*block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
+ block->frame, + block->frame,
TRX_UNDO_SEG_HDR + TRX_UNDO_SEG_HDR_SIZE); TRX_UNDO_SEG_HDR + TRX_UNDO_SEG_HDR_SIZE);
mtr->write<2,mtr_t::OPT>(*block, TRX_UNDO_SEG_HDR + TRX_UNDO_LAST_LOG mtr->write<2,mtr_t::OPT>(*block, TRX_UNDO_SEG_HDR + TRX_UNDO_LAST_LOG
+ block->frame, 0U); + block->frame, 0U);
...@@ -565,7 +578,8 @@ buf_block_t* trx_undo_add_page(trx_undo_t* undo, mtr_t* mtr) ...@@ -565,7 +578,8 @@ buf_block_t* trx_undo_add_page(trx_undo_t* undo, mtr_t* mtr)
buf_block_dbg_add_level(new_block, SYNC_TRX_UNDO_PAGE); buf_block_dbg_add_level(new_block, SYNC_TRX_UNDO_PAGE);
undo->last_page_no = new_block->page.id.page_no(); undo->last_page_no = new_block->page.id.page_no();
trx_undo_page_init(new_block, mtr); mtr->undo_create(*new_block);
trx_undo_page_init(*new_block);
flst_add_last(header_block, TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST, flst_add_last(header_block, TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST,
new_block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE, mtr); new_block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE, mtr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment