Commit 84e3f9ce authored by Marko Mäkelä

MDEV-12353: Reduce log volume by an UNDO_APPEND record

We introduce an EXTENDED log record subtype, UNDO_APPEND, for appending
an undo log record to an undo log page. It is equivalent to the
MLOG_UNDO_INSERT record that was removed in commit f802c989,
but uses a more compact encoding.

mtr_t::log_write(): Fix a bug that affects longer log
record writes in the !same_page && !have_offset case.
Similar code is already implemented for the have_offset code path.
The bug was unobservable before we started to write longer
EXTENDED records. All !have_offset records (FREE_PAGE, INIT_PAGE,
EXTENDED) that were written so far are short, and we never write
RESERVED or OPTION records.

mtr_t::undo_append(): Write an UNDO_APPEND record.

log_phys_t::undo_append(): Apply an UNDO_APPEND record.

trx_undo_page_set_next_prev_and_add(),
trx_undo_page_report_modify(),
trx_undo_page_report_rename():
Invoke mtr_t::undo_append() instead of emitting WRITE records.
parent 86f262f1
......@@ -439,7 +439,7 @@ inline byte *mtr_t::log_write(const page_id_t id, const buf_page_t *bpage,
}
else if (len >= 3 && end + len > &log_ptr[16])
{
len+= end - log_ptr - 16;
len+= end - log_ptr - 15;
if (len >= MIN_3BYTE)
len+= 2;
else if (len >= MIN_2BYTE)
......@@ -447,7 +447,7 @@ inline byte *mtr_t::log_write(const page_id_t id, const buf_page_t *bpage,
end= log_ptr;
*end++= type | same_page;
mlog_encode_varint(end, len);
end= mlog_encode_varint(end, len);
if (!same_page)
{
......@@ -551,3 +551,32 @@ inline void mtr_t::undo_create(const buf_block_t &block)
{
log_write_extended(block, UNDO_INIT);
}
/** Write log for appending an undo log record.
@param block undo page
@param data record within the undo page
@param len length of the undo record, in bytes */
inline void mtr_t::undo_append(const buf_block_t &block,
const void *data, size_t len)
{
ut_ad(len > 2);
set_modified();
if (m_log_mode != MTR_LOG_ALL)
return;
const bool small= len < mtr_buf_t::MAX_DATA_SIZE - (3 + 3 + 5 + 5);
byte *end= log_write<EXTENDED>(block.page.id, &block.page, len + 1, small);
if (UNIV_LIKELY(small))
{
*end++= UNDO_APPEND;
::memcpy(end, data, len);
m_log.close(end + len);
}
else
{
m_log.close(end);
byte type= UNDO_APPEND;
m_log.push(&type, 1);
m_log.push(static_cast<const byte*>(data), static_cast<uint32_t>(len));
}
m_last_offset= FIL_PAGE_TYPE;
}
......@@ -497,6 +497,12 @@ struct mtr_t {
/** Write log for initializing an undo log page.
@param block undo page */
inline void undo_create(const buf_block_t &block);
/** Write log for appending an undo log record.
@param block undo page
@param data record within the undo page
@param len length of the undo record, in bytes */
inline void undo_append(const buf_block_t &block,
const void *data, size_t len);
/** Write a log record about a file operation.
@param type file operation
......
......@@ -258,7 +258,11 @@ enum mrec_ext_t
/** Initialize an undo log page.
This is roughly (not exactly) equivalent to the old MLOG_UNDO_INIT record.
The current byte offset will be reset to FIL_PAGE_TYPE. */
UNDO_INIT= 2
UNDO_INIT= 2,
/** Append a record to an undo log page.
This is equivalent to the old MLOG_UNDO_INSERT record.
The current byte offset will be reset to FIL_PAGE_TYPE. */
UNDO_APPEND= 3
};
......
......@@ -152,6 +152,35 @@ struct log_phys_t : public log_rec_t
len+= static_cast<uint16_t>(size);
}
/** Apply an UNDO_APPEND record.
@see mtr_t::undo_append()
@param block undo log page
@param data undo log record
@param len length of the undo log record */
static void undo_append(const buf_block_t &block, const byte *data,
size_t len)
{
ut_ad(len > 2);
byte *free_p= my_assume_aligned<2>
(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE + block.frame);
const uint16_t free= mach_read_from_2(free_p);
if (UNIV_UNLIKELY(free < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE ||
free + len + 6 >= srv_page_size - FIL_PAGE_DATA_END))
{
ib::error() << "Not applying UNDO_APPEND due to corruption on "
<< block.page.id;
return;
}
byte *p= block.frame + free;
mach_write_to_2(free_p, free + 4 + len);
memcpy(p, free_p, 2);
p+= 2;
memcpy(p, data, len);
p+= len;
mach_write_to_2(p, free);
}
/** The status of apply() */
enum apply_status {
/** The page was not affected */
......@@ -258,18 +287,27 @@ struct log_phys_t : public log_rec_t
goto record_corrupted;
static_assert(INIT_ROW_FORMAT_REDUNDANT == 0, "compatiblity");
static_assert(INIT_ROW_FORMAT_DYNAMIC == 1, "compatibility");
if (UNIV_UNLIKELY(rlen != 1))
if (UNIV_UNLIKELY(!rlen))
goto record_corrupted;
switch (*l) {
default:
goto record_corrupted;
case INIT_ROW_FORMAT_REDUNDANT:
case INIT_ROW_FORMAT_DYNAMIC:
if (UNIV_UNLIKELY(rlen != 1))
goto record_corrupted;
page_create_low(&block, *l != INIT_ROW_FORMAT_REDUNDANT);
break;
case UNDO_INIT:
if (UNIV_UNLIKELY(rlen != 1))
goto record_corrupted;
trx_undo_page_init(block);
break;
case UNDO_APPEND:
if (UNIV_UNLIKELY(rlen <= 3))
goto record_corrupted;
undo_append(block, ++l, --rlen);
break;
}
last_offset= FIL_PAGE_TYPE;
goto next_after_applying;
......@@ -1814,7 +1852,7 @@ bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t store, bool apply)
goto record_corrupted;
break;
case EXTENDED:
if (UNIV_UNLIKELY(rlen != 1))
if (UNIV_UNLIKELY(!rlen))
goto record_corrupted;
last_offset= FIL_PAGE_TYPE;
break;
......
......@@ -80,36 +80,31 @@ trx_undo_page_set_next_prev_and_add(
written on this undo page. */
mtr_t* mtr) /*!< in: mtr */
{
ut_ad(page_align(ptr) == undo_block->frame);
ut_ad(page_align(ptr) == undo_block->frame);
if (UNIV_UNLIKELY(trx_undo_left(undo_block, ptr) < 2)) {
return(0);
}
byte* ptr_to_first_free = TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
+ undo_block->frame;
uint16_t first_free = mach_read_from_2(ptr_to_first_free);
ut_ad(ptr > &undo_block->frame[first_free]);
if (UNIV_UNLIKELY(trx_undo_left(undo_block, ptr) < 2))
return 0;
/* Write offset of the previous undo log record */
mach_write_to_2(ptr, first_free);
ptr += 2;
byte *ptr_to_first_free= my_assume_aligned<2>(TRX_UNDO_PAGE_HDR +
TRX_UNDO_PAGE_FREE +
undo_block->frame);
uint16_t end_of_rec = uint16_t(ptr - undo_block->frame);
const uint16_t first_free= mach_read_from_2(ptr_to_first_free);
/* Write offset of the next undo log record */
mach_write_to_2(undo_block->frame + first_free, end_of_rec);
/* Write offset of the previous undo log record */
memcpy(ptr, ptr_to_first_free, 2);
ptr += 2;
/* Update the offset to first free undo record */
mtr->write<2>(*undo_block, ptr_to_first_free, end_of_rec);
const uint16_t end_of_rec= static_cast<uint16_t>(ptr - undo_block->frame);
ut_ad(ptr > &undo_block->frame[first_free]);
ut_ad(ptr < &undo_block->frame[srv_page_size]);
mtr->memcpy(*undo_block, first_free,
ptr - &undo_block->frame[first_free]);
/* Update the offset to first free undo record */
mach_write_to_2(ptr_to_first_free, end_of_rec);
/* Write offset of the next undo log record */
memcpy(undo_block->frame + first_free, ptr_to_first_free, 2);
const byte *start= undo_block->frame + first_free + 2;
return(first_free);
mtr->undo_append(*undo_block, start, ptr - start - 2);
return first_free;
}
/** Virtual column undo log version. To distinguish it from a length value
......@@ -379,13 +374,14 @@ trx_undo_page_report_insert(
ut_ad(mach_read_from_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE
+ undo_block->frame) <= 2);
uint16_t first_free = mach_read_from_2(TRX_UNDO_PAGE_HDR
+ TRX_UNDO_PAGE_FREE
+ undo_block->frame);
uint16_t first_free = mach_read_from_2(my_assume_aligned<2>
(TRX_UNDO_PAGE_HDR
+ TRX_UNDO_PAGE_FREE
+ undo_block->frame));
byte* ptr = undo_block->frame + first_free;
ut_ad(first_free >= TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
ut_ad(first_free <= srv_page_size);
ut_ad(first_free <= srv_page_size - FIL_PAGE_DATA_END);
if (trx_undo_left(undo_block, ptr) < 2 + 1 + 11 + 11) {
/* Not enough space for writing the general parameters */
......@@ -779,8 +775,6 @@ trx_undo_page_report_modify(
virtual column info */
mtr_t* mtr) /*!< in: mtr */
{
byte* ptr;
ut_ad(index->is_primary());
ut_ad(rec_offs_validate(rec, index, offsets));
/* MariaDB 10.3.1+ in trx_undo_page_init() always initializes
......@@ -790,13 +784,15 @@ trx_undo_page_report_modify(
ut_ad(mach_read_from_2(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_TYPE
+ undo_block->frame) <= 2);
uint16_t first_free = mach_read_from_2(TRX_UNDO_PAGE_HDR
+ TRX_UNDO_PAGE_FREE
+ undo_block->frame);
ptr = undo_block->frame + first_free;
byte* ptr_to_first_free = my_assume_aligned<2>(TRX_UNDO_PAGE_HDR
+ TRX_UNDO_PAGE_FREE
+ undo_block->frame);
const uint16_t first_free = mach_read_from_2(ptr_to_first_free);
byte *ptr = undo_block->frame + first_free;
ut_ad(first_free >= TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
ut_ad(first_free <= srv_page_size);
ut_ad(first_free <= srv_page_size - FIL_PAGE_DATA_END);
if (trx_undo_left(undo_block, ptr) < 50) {
/* NOTE: the value 50 must be big enough so that the general
......@@ -1384,18 +1380,15 @@ trx_undo_page_report_modify(
}
mach_write_to_2(ptr, first_free);
ptr += 2;
const uint16_t new_free = static_cast<uint16_t>(
ptr - undo_block->frame);
ptr + 2 - undo_block->frame);
mach_write_to_2(undo_block->frame + first_free, new_free);
mtr->write<2>(*undo_block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
+ undo_block->frame, new_free);
ut_ad(ptr > &undo_block->frame[first_free]);
ut_ad(page_align(ptr) == undo_block->frame);
mtr->memcpy(*undo_block, first_free,
ptr - &undo_block->frame[first_free]);
return first_free;
mach_write_to_2(ptr_to_first_free, new_free);
const byte* start = &undo_block->frame[first_free + 2];
mtr->undo_append(*undo_block, start, ptr - start);
return(first_free);
}
/**********************************************************************//**
......@@ -1848,11 +1841,12 @@ uint16_t
trx_undo_page_report_rename(trx_t* trx, const dict_table_t* table,
buf_block_t* block, mtr_t* mtr)
{
byte* ptr_first_free = TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE
+ block->frame;
byte* ptr_first_free = my_assume_aligned<2>(TRX_UNDO_PAGE_HDR
+ TRX_UNDO_PAGE_FREE
+ block->frame);
const uint16_t first_free = mach_read_from_2(ptr_first_free);
ut_ad(first_free >= TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
ut_ad(first_free <= srv_page_size);
ut_ad(first_free <= srv_page_size - FIL_PAGE_DATA_END);
byte* const start = block->frame + first_free;
size_t len = strlen(table->name.m_name);
const size_t fixed = 2 + 1 + 11 + 11 + 2;
......@@ -1875,12 +1869,9 @@ trx_undo_page_report_rename(trx_t* trx, const dict_table_t* table,
memcpy(ptr, table->name.m_name, len);
ptr += len;
mach_write_to_2(ptr, first_free);
ptr += 2;
uint16_t offset = page_offset(ptr);
mach_write_to_2(start, offset);
mtr->write<2>(*block, ptr_first_free, offset);
ut_ad(page_align(ptr) == block->frame);
mtr->memcpy(*block, first_free, ptr - start);
mach_write_to_2(ptr_first_free, ptr + 2 - block->frame);
memcpy(start, ptr_first_free, 2);
mtr->undo_append(*block, start + 2, ptr - start - 2);
return first_free;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment