Commit 572d2075 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-12353: Reduce log volume of page_cur_delete_rec()

mrec_ext_t: Introduce DELETE_ROW_FORMAT_REDUNDANT,
DELETE_ROW_FORMAT_DYNAMIC.

mtr_t::page_delete(): Write DELETE_ROW_FORMAT_REDUNDANT or
DELETE_ROW_FORMAT_DYNAMIC log records. We log the byte offset
of the preceding record, so that on recovery we can easily
find everything to update. For DELETE_ROW_FORMAT_DYNAMIC,
we must also write the header and data size of the record.

We will retain the physical logging for ROW_FORMAT=COMPRESSED pages.

page_zip_dir_balance_slot(): Renamed from page_dir_balance_slot(),
and specialized for ROW_FORMAT=COMPRESSED only.

page_rec_set_n_owned(), page_dir_slot_set_n_owned(),
page_dir_balance_slot(): New variants that do not write any log.

page_mem_free(): Take data_size, extra_size as parameters.
Always zerofill the record payload.

page_cur_delete_rec(): For other than ROW_FORMAT=COMPRESSED,
only write log by mtr_t::page_delete().
parent bc76cfe8
......@@ -536,7 +536,7 @@ inline void mtr_t::log_write_extended(const buf_block_t &block, byte type)
}
/** Write log for partly initializing a B-tree or R-tree page.
@param block B-tree page
@param block B-tree or R-tree page
@param comp false=ROW_FORMAT=REDUNDANT, true=COMPACT or DYNAMIC */
inline void mtr_t::page_create(const buf_block_t &block, bool comp)
{
......@@ -545,6 +545,58 @@ inline void mtr_t::page_create(const buf_block_t &block, bool comp)
log_write_extended(block, comp);
}
/** Write log for deleting a B-tree or R-tree record in ROW_FORMAT=REDUNDANT.
@param block B-tree or R-tree page
@param prev_rec byte offset of the predecessor of the record to delete,
starting from PAGE_OLD_INFIMUM */
inline void mtr_t::page_delete(const buf_block_t &block, ulint prev_rec)
{
ut_ad(!block.zip_size());
ut_ad(prev_rec < block.physical_size());
set_modified();
if (m_log_mode != MTR_LOG_ALL)
return;
size_t len= (prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4);
byte *l= log_write<EXTENDED>(block.page.id, &block.page, len, true);
ut_d(byte *end= l + len);
*l++= DELETE_ROW_FORMAT_REDUNDANT;
l= mlog_encode_varint(l, prev_rec);
ut_ad(end == l);
m_log.close(l);
m_last_offset= FIL_PAGE_TYPE;
}
/** Write log for deleting a COMPACT or DYNAMIC B-tree or R-tree record.
@param block      B-tree or R-tree page
@param prev_rec   byte offset of the predecessor of the record to delete,
starting from PAGE_NEW_INFIMUM
@param hdr_size   record header size, excluding REC_N_NEW_EXTRA_BYTES
@param data_size  data payload size, in bytes */
inline void mtr_t::page_delete(const buf_block_t &block, ulint prev_rec,
                               size_t hdr_size, size_t data_size)
{
  ut_ad(!block.zip_size());
  set_modified();
  ut_ad(hdr_size < MIN_3BYTE);
  ut_ad(prev_rec < block.physical_size());
  ut_ad(data_size < block.physical_size());
  if (m_log_mode != MTR_LOG_ALL)
    return;
  /* Payload: 1 subtype byte, then the variable-length encodings of
  prev_rec, hdr_size and data_size. */
  size_t len= prev_rec < MIN_2BYTE ? 2 : prev_rec < MIN_3BYTE ? 3 : 4;
  len+= hdr_size < MIN_2BYTE ? 1 : 2;
  len+= data_size < MIN_2BYTE ? 1 : data_size < MIN_3BYTE ? 2 : 3;
  byte *l= log_write<EXTENDED>(block.page.id, &block.page, len, true);
  ut_d(byte *end= l + len);
  *l++= DELETE_ROW_FORMAT_DYNAMIC;
  l= mlog_encode_varint(l, prev_rec);
  l= mlog_encode_varint(l, hdr_size);
  l= mlog_encode_varint(l, data_size);
  ut_ad(end == l);
  m_log.close(l);
  m_last_offset= FIL_PAGE_TYPE;
}
/** Write log for initializing an undo log page.
@param block undo page */
inline void mtr_t::undo_create(const buf_block_t &block)
......
......@@ -491,9 +491,23 @@ struct mtr_t {
@param id page identifier */
inline void free(const page_id_t id);
/** Write log for partly initializing a B-tree or R-tree page.
@param block B-tree page
@param block B-tree or R-tree page
@param comp false=ROW_FORMAT=REDUNDANT, true=COMPACT or DYNAMIC */
inline void page_create(const buf_block_t &block, bool comp);
/** Write log for deleting a B-tree or R-tree record in ROW_FORMAT=REDUNDANT.
@param block B-tree or R-tree page
@param prev_rec byte offset of the predecessor of the record to delete,
starting from PAGE_OLD_INFIMUM */
inline void page_delete(const buf_block_t &block, ulint prev_rec);
/** Write log for deleting a COMPACT or DYNAMIC B-tree or R-tree record.
@param block B-tree or R-tree page
@param prev_rec byte offset of the predecessor of the record to delete,
starting from PAGE_NEW_INFIMUM
@param hdr_size record header size, excluding REC_N_NEW_EXTRA_BYTES
@param data_size data payload size, in bytes */
inline void page_delete(const buf_block_t &block, ulint prev_rec,
size_t hdr_size, size_t data_size);
/** Write log for initializing an undo log page.
@param block undo page */
inline void undo_create(const buf_block_t &block);
......
......@@ -262,7 +262,18 @@ enum mrec_ext_t
/** Append a record to an undo log page.
This is equivalent to the old MLOG_UNDO_INSERT record.
The current byte offset will be reset to FIL_PAGE_TYPE. */
UNDO_APPEND= 3
UNDO_APPEND= 3,
/** Delete a record on a ROW_FORMAT=REDUNDANT page.
We point to the predecessor of the record to be deleted.
The current byte offset will be reset to FIL_PAGE_TYPE.
This is similar to the old MLOG_REC_DELETE record. */
DELETE_ROW_FORMAT_REDUNDANT= 8,
/** Delete a record on a ROW_FORMAT=COMPACT or DYNAMIC page.
We point to the predecessor of the record to be deleted
and include the total size of the record being deleted.
The current byte offset will be reset to FIL_PAGE_TYPE.
This is similar to the old MLOG_COMP_REC_DELETE record. */
DELETE_ROW_FORMAT_DYNAMIC= 9
};
......
......@@ -201,6 +201,21 @@ page_cur_delete_rec(
mtr_t* mtr) /*!< in/out: mini-transaction */
MY_ATTRIBUTE((nonnull));
/** Apply a DELETE_ROW_FORMAT_REDUNDANT record that was written by
page_cur_delete_rec() for a ROW_FORMAT=REDUNDANT page.
@param block B-tree or R-tree page in ROW_FORMAT=REDUNDANT
@param prev byte offset of the predecessor, relative to PAGE_OLD_INFIMUM */
void page_apply_delete_redundant(const buf_block_t &block, ulint prev);
/** Apply a DELETE_ROW_FORMAT_DYNAMIC record that was written by
page_cur_delete_rec() for a ROW_FORMAT=COMPACT or DYNAMIC page.
@param block B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC
@param prev byte offset of the predecessor, relative to PAGE_NEW_INFIMUM
@param hdr_size record header size, excluding REC_N_NEW_EXTRA_BYTES
@param data_size data payload size, in bytes */
void page_apply_delete_dynamic(const buf_block_t &block, ulint prev,
size_t hdr_size, size_t data_size);
/** Search the right position for a page cursor.
@param[in] block buffer block
@param[in] index index tree
......
......@@ -410,7 +410,7 @@ inline trx_id_t page_get_max_trx_id(const page_t *page)
Set the number of owned records.
@tparam compressed whether to update any ROW_FORMAT=COMPRESSED page as well
@param[in,out] block index page
@param[in,out] rec ROW_FORMAT=REDUNDANT record
@param[in,out] rec record in block.frame
@param[in] n_owned number of records skipped in the sparse page directory
@param[in] comp whether ROW_FORMAT is one of COMPACT,DYNAMIC,COMPRESSED
@param[in,out] mtr mini-transaction */
......@@ -643,7 +643,7 @@ page_rec_check(
@return pointer to record */
inline rec_t *page_dir_slot_get_rec(page_dir_slot_t *slot)
{
return page_align(slot) + mach_read_from_2(slot);
return page_align(slot) + mach_read_from_2(my_assume_aligned<2>(slot));
}
inline const rec_t *page_dir_slot_get_rec(const page_dir_slot_t *slot)
{
......
......@@ -45,6 +45,7 @@ Created 9/20/1997 Heikki Tuuri
#include "mtr0mtr.h"
#include "mtr0log.h"
#include "page0page.h"
#include "page0cur.h"
#include "trx0undo.h"
#include "ibuf0ibuf.h"
#include "trx0undo.h"
......@@ -282,14 +283,14 @@ struct log_phys_t : public log_rec_t
goto next;
case EXTENDED:
if (UNIV_UNLIKELY(block.page.id.page_no() < 3 ||
block.page.zip.ssize) &&
!srv_force_recovery)
block.page.zip.ssize))
goto record_corrupted;
static_assert(INIT_ROW_FORMAT_REDUNDANT == 0, "compatiblity");
static_assert(INIT_ROW_FORMAT_DYNAMIC == 1, "compatibility");
if (UNIV_UNLIKELY(!rlen))
goto record_corrupted;
switch (*l) {
uint8_t ll;
default:
goto record_corrupted;
case INIT_ROW_FORMAT_REDUNDANT:
......@@ -308,6 +309,39 @@ struct log_phys_t : public log_rec_t
goto record_corrupted;
undo_append(block, ++l, --rlen);
break;
case DELETE_ROW_FORMAT_REDUNDANT:
if (UNIV_UNLIKELY(rlen < 2 || rlen > 4))
goto record_corrupted;
rlen--;
ll= mlog_decode_varint_length(*++l);
if (UNIV_UNLIKELY(ll != rlen))
goto record_corrupted;
page_apply_delete_redundant(block, mlog_decode_varint(l));
break;
case DELETE_ROW_FORMAT_DYNAMIC:
if (UNIV_UNLIKELY(rlen < 2))
goto record_corrupted;
rlen--;
ll= mlog_decode_varint_length(*++l);
if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
goto record_corrupted;
size_t prev_rec= mlog_decode_varint(l);
ut_ad(prev_rec != MLOG_DECODE_ERROR);
rlen-= ll;
l+= ll;
ll= mlog_decode_varint_length(*l);
if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
goto record_corrupted;
size_t hdr_size= mlog_decode_varint(l);
ut_ad(hdr_size != MLOG_DECODE_ERROR);
rlen-= ll;
l+= ll;
ll= mlog_decode_varint_length(*l);
if (UNIV_UNLIKELY(ll > 3 || ll != rlen))
goto record_corrupted;
page_apply_delete_dynamic(block, prev_rec, hdr_size,
mlog_decode_varint(l));
break;
}
last_offset= FIL_PAGE_TYPE;
goto next_after_applying;
......
......@@ -785,17 +785,15 @@ page_cur_open_on_rnd_user_rec(
}
/**
Set the owned records field of the record pointed to by a directory slot.
@param[in,out] block file page
@param[in] slot sparse directory slot
@param[in,out] n number of records owned by the directory slot
@param[in,out] mtr mini-transaction */
static void page_dir_slot_set_n_owned(buf_block_t *block,
const page_dir_slot_t *slot,
ulint n, mtr_t *mtr)
Set the number of owned records.
@param[in,out] rec record in block.frame
@param[in] n_owned number of records skipped in the sparse page directory
@param[in] comp whether ROW_FORMAT is COMPACT or DYNAMIC */
static void page_rec_set_n_owned(rec_t *rec, ulint n_owned, bool comp)
{
rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot));
page_rec_set_n_owned<true>(block, rec, n, page_rec_is_comp(rec), mtr);
rec-= comp ? REC_NEW_N_OWNED : REC_OLD_N_OWNED;
*rec= static_cast<byte>((*rec & ~REC_N_OWNED_MASK) |
(n_owned << REC_N_OWNED_SHIFT));
}
/**
......@@ -874,12 +872,13 @@ static void page_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr)
Try to balance an underfilled directory slot with an adjacent one,
so that there are at least the minimum number of records owned by the slot;
this may result in merging the two slots.
@param[in,out] block index page
@param[in,out] block ROW_FORMAT=COMPRESSED page
@param[in] s the slot to be balanced
@param[in,out] mtr mini-transaction */
static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
static void page_zip_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
{
ut_ad(!block->page.zip.data || page_is_comp(block->frame));
ut_ad(block->page.zip.data);
ut_ad(page_is_comp(block->frame));
ut_ad(s > 0);
const ulint n_slots = page_dir_get_n_slots(block->frame);
......@@ -892,21 +891,23 @@ static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
ut_ad(s < n_slots);
page_dir_slot_t* slot = page_dir_get_nth_slot(block->frame, s);
page_dir_slot_t* up_slot = slot - PAGE_DIR_SLOT_SIZE;
const ulint up_n_owned = page_dir_slot_get_n_owned(up_slot);
rec_t* const up_rec = const_cast<rec_t*>
(page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE));
rec_t* const slot_rec = const_cast<rec_t*>
(page_dir_slot_get_rec(slot));
const ulint up_n_owned = rec_get_n_owned_new(up_rec);
ut_ad(page_dir_slot_get_n_owned(slot)
ut_ad(rec_get_n_owned_new(page_dir_slot_get_rec(slot))
== PAGE_DIR_SLOT_MIN_N_OWNED - 1);
if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
compile_time_assert(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1
<= PAGE_DIR_SLOT_MAX_N_OWNED);
/* Merge the slots. */
ulint n_owned = page_dir_slot_get_n_owned(slot);
page_dir_slot_set_n_owned(block, slot, 0, mtr);
page_dir_slot_set_n_owned(block, up_slot, n_owned
+ page_dir_slot_get_n_owned(up_slot),
mtr);
page_rec_set_n_owned<true>(block, slot_rec, 0, true, mtr);
page_rec_set_n_owned<true>(block, up_rec, up_n_owned
+ (PAGE_DIR_SLOT_MIN_N_OWNED - 1),
true, mtr);
/* Shift the slots */
page_dir_slot_t* last_slot = page_dir_get_nth_slot(
block->frame, n_slots - 1);
......@@ -916,48 +917,92 @@ static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
byte *n_slots_p= my_assume_aligned<2>
(n_slots_f + block->frame);
mtr->write<2>(*block, n_slots_p, n_slots - 1);
if (UNIV_LIKELY_NULL(block->page.zip.data)) {
memset_aligned<2>(last_slot, 0, 2);
memcpy_aligned<2>(n_slots_f + block->page.zip.data,
n_slots_p, 2);
} else {
mtr->memmove(*block, PAGE_DIR_SLOT_SIZE
+ page_offset(last_slot),
page_offset(last_slot), slot - last_slot);
mtr->write<2>(*block, last_slot, 0U);
}
memset_aligned<2>(last_slot, 0, 2);
return;
}
/* Transfer one record to the underfilled slot */
rec_t* old_rec = const_cast<rec_t*>(page_dir_slot_get_rec(slot));
rec_t* new_rec;
if (page_is_comp(block->frame)) {
new_rec = rec_get_next_ptr(old_rec, TRUE);
page_rec_set_n_owned<true>(block, old_rec, 0, true, mtr);
page_rec_set_n_owned<true>(block, slot_rec, 0, true, mtr);
rec_t* new_rec = rec_get_next_ptr(slot_rec, TRUE);
page_rec_set_n_owned<true>(block, new_rec,
PAGE_DIR_SLOT_MIN_N_OWNED,
true, mtr);
if (UNIV_LIKELY_NULL(block->page.zip.data)) {
mach_write_to_2(slot, page_offset(new_rec));
goto func_exit;
page_rec_set_n_owned(up_rec, up_n_owned - 1, true);
}
/**
Try to balance an underfilled directory slot with an adjacent one,
so that there are at least the minimum number of records owned by the slot;
this may result in merging the two slots.
@param[in,out] block index page
@param[in] s the slot to be balanced */
static void page_dir_balance_slot(const buf_block_t &block, ulint s)
{
const bool comp= page_is_comp(block.frame);
ut_ad(!block.page.zip.data);
ut_ad(s > 0);
const ulint n_slots = page_dir_get_n_slots(block.frame);
if (UNIV_UNLIKELY(s + 1 == n_slots)) {
/* The last directory slot cannot be balanced. */
return;
}
} else {
new_rec = rec_get_next_ptr(old_rec, FALSE);
page_rec_set_n_owned<false>(block, old_rec, 0, false, mtr);
page_rec_set_n_owned<false>(block, new_rec,
PAGE_DIR_SLOT_MIN_N_OWNED,
false, mtr);
ut_ad(s < n_slots);
page_dir_slot_t* slot = page_dir_get_nth_slot(block.frame, s);
rec_t* const up_rec = const_cast<rec_t*>
(page_dir_slot_get_rec(slot - PAGE_DIR_SLOT_SIZE));
rec_t* const slot_rec = const_cast<rec_t*>
(page_dir_slot_get_rec(slot));
const ulint up_n_owned = comp
? rec_get_n_owned_new(up_rec)
: rec_get_n_owned_old(up_rec);
ut_ad(page_dir_slot_get_n_owned(slot)
== PAGE_DIR_SLOT_MIN_N_OWNED - 1);
if (up_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
compile_time_assert(2 * PAGE_DIR_SLOT_MIN_N_OWNED - 1
<= PAGE_DIR_SLOT_MAX_N_OWNED);
/* Merge the slots. */
page_rec_set_n_owned(slot_rec, 0, comp);
page_rec_set_n_owned(up_rec, up_n_owned
+ (PAGE_DIR_SLOT_MIN_N_OWNED - 1), comp);
/* Shift the slots */
page_dir_slot_t* last_slot = page_dir_get_nth_slot(
block.frame, n_slots - 1);
memmove_aligned<2>(last_slot + PAGE_DIR_SLOT_SIZE, last_slot,
slot - last_slot);
memset_aligned<2>(last_slot, 0, 2);
constexpr uint16_t n_slots_f = PAGE_N_DIR_SLOTS + PAGE_HEADER;
byte *n_slots_p= my_assume_aligned<2>
(n_slots_f + block.frame);
mach_write_to_2(n_slots_p, n_slots - 1);
return;
}
/* Transfer one record to the underfilled slot */
rec_t* new_rec;
if (comp) {
page_rec_set_n_owned(slot_rec, 0, true);
new_rec = rec_get_next_ptr(slot_rec, TRUE);
page_rec_set_n_owned(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED, true);
page_rec_set_n_owned(up_rec, up_n_owned - 1, true);
} else {
page_rec_set_n_owned(slot_rec, 0, false);
new_rec = rec_get_next_ptr(slot_rec, FALSE);
page_rec_set_n_owned(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED,
false);
page_rec_set_n_owned(up_rec, up_n_owned - 1, false);
}
mtr->write<2>(*block, slot, page_offset(new_rec));
func_exit:
page_dir_slot_set_n_owned(block, up_slot, up_n_owned - 1, mtr);
mach_write_to_2(slot, page_offset(new_rec));
}
/** Allocate space for inserting an index record.
......@@ -1766,111 +1811,77 @@ page_cur_insert_rec_zip(
return insert_rec;
}
/** Prepend a record to the PAGE_FREE list.
/** Prepend a record to the PAGE_FREE list, or shrink PAGE_HEAP_TOP.
@param[in,out] block index page
@param[in,out] rec record being deleted
@param[in] index the index that the page belongs to
@param[in] offsets rec_get_offsets(rec, index)
@param[in,out] mtr mini-transaction */
static void page_mem_free(buf_block_t *block, rec_t *rec,
const dict_index_t *index, const offset_t *offsets,
mtr_t *mtr)
@param[in] data_size record payload size, in bytes
@param[in] extra_size record header size, in bytes */
static void page_mem_free(const buf_block_t &block, rec_t *rec,
size_t data_size, size_t extra_size)
{
ut_ad(rec_offs_validate(rec, index, offsets));
ut_ad(page_align(rec) == block->frame);
const rec_t *free= page_header_get_ptr(block->frame, PAGE_FREE);
ut_ad(page_align(rec) == block.frame);
ut_ad(!block.page.zip.data);
const rec_t *free= page_header_get_ptr(block.frame, PAGE_FREE);
if (UNIV_LIKELY_NULL(block->page.zip.data))
{
page_header_reset_last_insert(block, mtr);
page_zip_dir_delete(block, rec, index, offsets, free, mtr);
return;
}
const uint16_t n_heap= page_header_get_field(block->frame, PAGE_N_HEAP) - 1;
ut_ad(page_get_n_recs(block->frame) < (n_heap & 0x7fff));
alignas(4) byte page_header[6];
const bool deleting_last= n_heap == ((n_heap & 0x8000)
const uint16_t n_heap= page_header_get_field(block.frame, PAGE_N_HEAP) - 1;
ut_ad(page_get_n_recs(block.frame) < (n_heap & 0x7fff));
const bool deleting_top= n_heap == ((n_heap & 0x8000)
? (rec_get_heap_no_new(rec) | 0x8000)
: rec_get_heap_no_old(rec));
if (deleting_last)
if (deleting_top)
{
const uint16_t heap_top= page_header_get_offs(block->frame, PAGE_HEAP_TOP);
const size_t extra_savings= heap_top -
page_offset(rec_get_end(rec, offsets));
byte *page_heap_top= my_assume_aligned<2>(PAGE_HEAP_TOP + PAGE_HEADER +
block.frame);
const uint16_t heap_top= mach_read_from_2(page_heap_top);
const size_t extra_savings= heap_top - page_offset(rec + data_size);
ut_ad(extra_savings < heap_top);
/* When deleting the last record, do not add it to the PAGE_FREE list.
Instead, decrement PAGE_HEAP_TOP and PAGE_N_HEAP. */
mach_write_to_2(page_header, page_offset(rec_get_start(rec, offsets)));
mach_write_to_2(my_assume_aligned<2>(page_header + 2), n_heap);
mach_write_to_2(page_heap_top, page_offset(rec - extra_size));
mach_write_to_2(my_assume_aligned<2>(page_heap_top + 2), n_heap);
static_assert(PAGE_N_HEAP == PAGE_HEAP_TOP + 2, "compatibility");
mtr->memcpy(*block, my_assume_aligned<4>(PAGE_HEAP_TOP + PAGE_HEADER +
block->frame), page_header, 4);
if (extra_savings)
{
uint16_t garbage= page_header_get_field(block->frame, PAGE_GARBAGE);
mach_write_to_2(page_header, garbage - extra_savings);
size_t len= 2;
if (page_header_get_field(block->frame, PAGE_LAST_INSERT))
{
memset_aligned<2>(page_header + 2, 0, 2);
len= 4;
byte *page_garbage= my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER +
block.frame);
uint16_t garbage= mach_read_from_2(page_garbage);
ut_ad(garbage >= extra_savings);
mach_write_to_2(page_garbage, garbage - extra_savings);
}
mtr->memcpy(*block, my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER +
block->frame),
page_header, len);
}
else
mtr->write<2,mtr_t::OPT>(*block, my_assume_aligned<2>
(PAGE_LAST_INSERT + PAGE_HEADER + block->frame),
0U);
}
else
{
mach_write_to_2(page_header, page_offset(rec));
mach_write_to_2(my_assume_aligned<2>(page_header + 2),
rec_offs_size(offsets) +
page_header_get_field(block->frame, PAGE_GARBAGE));
static_assert(PAGE_FREE + 2 == PAGE_GARBAGE, "compatibility");
static_assert(PAGE_FREE + 4 == PAGE_LAST_INSERT, "compatibility");
size_t size;
if (page_header_get_field(block->frame, PAGE_LAST_INSERT))
{
memset_aligned<2>(page_header + 4, 0, 2);
size= 6;
}
else
size= 4;
mtr->memcpy(*block, my_assume_aligned<4>(PAGE_FREE + PAGE_HEADER +
block->frame), page_header, size);
byte *page_free= my_assume_aligned<2>(PAGE_FREE + PAGE_HEADER +
block.frame);
byte *page_garbage= my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER +
block.frame);
mach_write_to_2(page_free, page_offset(rec));
mach_write_to_2(page_garbage, mach_read_from_2(page_garbage) +
extra_size + data_size);
}
mtr->write<2>(*block, PAGE_N_RECS + PAGE_HEADER + block->frame,
ulint(page_get_n_recs(block->frame)) - 1);
memset_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER + block.frame, 0, 2);
byte *page_n_recs= my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER +
block.frame);
mach_write_to_2(page_n_recs, mach_read_from_2(page_n_recs) - 1);
if (!deleting_last)
const byte* const end= rec + data_size;
if (!deleting_top)
{
uint16_t next= free
? ((n_heap & 0x8000)
? static_cast<uint16_t>(free - rec)
: static_cast<uint16_t>(page_offset(free)))
: static_cast<uint16_t>(free - block.frame))
: 0;
mtr->write<2>(*block, rec - REC_NEXT, next);
mach_write_to_2(rec - REC_NEXT, next);
}
if (srv_immediate_scrub_data_uncompressed)
{
size_t size= rec_offs_data_size(offsets);
if (deleting_last)
{
const size_t extra_size= rec_offs_extra_size(offsets);
else
rec-= extra_size;
size+= extra_size;
}
mtr->memset(block, page_offset(rec), size, 0);
}
memset(rec, 0, end - rec);
}
/***********************************************************//**
......@@ -1886,7 +1897,6 @@ page_cur_delete_rec(
mtr_t* mtr) /*!< in/out: mini-transaction */
{
page_dir_slot_t* cur_dir_slot;
page_dir_slot_t* prev_slot;
rec_t* current_rec;
rec_t* prev_rec = NULL;
rec_t* next_rec;
......@@ -1946,10 +1956,8 @@ page_cur_delete_rec(
/* Find the next and the previous record. Note that the cursor is
left at the next record. */
ut_ad(cur_slot_no > 0);
prev_slot = page_dir_get_nth_slot(block->frame, cur_slot_no - 1);
rec = const_cast<rec_t*>(page_dir_slot_get_rec(prev_slot));
rec = const_cast<rec_t*>
(page_dir_slot_get_rec(cur_dir_slot + PAGE_DIR_SLOT_SIZE));
/* rec now points to the record of the previous directory slot. Look
for the immediate predecessor of current_rec in a loop. */
......@@ -1989,47 +1997,243 @@ page_cur_delete_rec(
mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
(next_rec - prev_rec));
mach_write_to_1(slot_rec - REC_NEW_N_OWNED,
(slot_rec[-REC_NEW_N_OWNED]
& ~REC_N_OWNED_MASK)
slot_rec[-REC_NEW_N_OWNED] = static_cast<byte>(
(slot_rec[-REC_NEW_N_OWNED] & ~REC_N_OWNED_MASK)
| (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
} else {
page_header_reset_last_insert(block, mtr);
page_zip_dir_delete(block, rec, index, offsets,
page_header_get_ptr(block->frame,
PAGE_FREE),
mtr);
if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
page_zip_dir_balance_slot(block, cur_slot_no, mtr);
}
return;
}
if (current_rec == slot_rec) {
slot_rec = prev_rec;
mtr->write<2>(*block, cur_dir_slot,
page_offset(slot_rec));
mach_write_to_2(cur_dir_slot, page_offset(slot_rec));
}
const size_t data_size = rec_offs_data_size(offsets);
const size_t extra_size = rec_offs_extra_size(offsets);
if (page_is_comp(block->frame)) {
mtr->write<2>(*block, prev_rec - REC_NEXT,
static_cast<uint16_t>
mtr->page_delete(*block, page_offset(prev_rec)
- PAGE_NEW_INFIMUM,
extra_size - REC_N_NEW_EXTRA_BYTES,
data_size);
mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
(next_rec - prev_rec));
mtr->write<1>(*block, slot_rec - REC_NEW_N_OWNED,
(slot_rec[-REC_NEW_N_OWNED]
& ~REC_N_OWNED_MASK)
| (cur_n_owned - 1)
<< REC_N_OWNED_SHIFT);
slot_rec[-REC_NEW_N_OWNED] = static_cast<byte>(
(slot_rec[-REC_NEW_N_OWNED] & ~REC_N_OWNED_MASK)
| (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
} else {
mtr->write<2>(*block, prev_rec - REC_NEXT,
page_offset(next_rec));
mtr->write<1>(*block, slot_rec - REC_OLD_N_OWNED,
(slot_rec[-REC_OLD_N_OWNED]
& ~REC_N_OWNED_MASK)
| (cur_n_owned - 1)
<< REC_N_OWNED_SHIFT);
}
mtr->page_delete(*block, page_offset(prev_rec)
- PAGE_OLD_INFIMUM);
memcpy(prev_rec - REC_NEXT, current_rec - REC_NEXT, 2);
slot_rec[-REC_OLD_N_OWNED] = static_cast<byte>(
(slot_rec[-REC_OLD_N_OWNED] & ~REC_N_OWNED_MASK)
| (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
}
/* Free the memory occupied by the record */
page_mem_free(block, current_rec, index, offsets, mtr);
page_mem_free(*block, current_rec, data_size, extra_size);
/* Now we have decremented the number of owned records of the slot.
If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
slots. */
if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
page_dir_balance_slot(block, cur_slot_no, mtr);
page_dir_balance_slot(*block, cur_slot_no);
}
ut_ad(page_is_comp(block->frame)
? page_simple_validate_new(block->frame)
: page_simple_validate_old(block->frame));
}
/** Apply a DELETE_ROW_FORMAT_REDUNDANT record that was written by
page_cur_delete_rec() for a ROW_FORMAT=REDUNDANT page.
@param block B-tree or R-tree page in ROW_FORMAT=REDUNDANT
@param prev byte offset of the predecessor, relative to PAGE_OLD_INFIMUM */
void page_apply_delete_redundant(const buf_block_t &block, ulint prev)
{
/* Recovery applies this against a page whose contents may be arbitrarily
corrupted; every value derived from the page is validated before use, and
any inconsistency aborts the apply with a diagnostic. */
const uint16_t n_slots= page_dir_get_n_slots(block.frame);
ulint n_recs= page_get_n_recs(block.frame);
/* Sanity checks: a non-empty ROW_FORMAT=REDUNDANT index page that belongs
to this block, whose supremum record has no successor. */
if (UNIV_UNLIKELY(!n_recs || n_slots < 2 ||
!fil_page_index_page_check(block.frame) ||
page_get_page_no(block.frame) != block.page.id.page_no() ||
mach_read_from_2(my_assume_aligned<2>
(PAGE_OLD_SUPREMUM - REC_NEXT +
block.frame)) ||
page_is_comp(block.frame)))
{
corrupted:
ib::error() << "Not applying DELETE_ROW_FORMAT_REDUNDANT"
" due to corruption on " << block.page.id;
return;
}
/* slot starts at the last (lowest-addressed) directory slot; all record
pointers must lie below it in the page. */
byte *slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
rec_t *prev_rec= block.frame + PAGE_OLD_INFIMUM + prev;
if (UNIV_UNLIKELY(prev_rec > slot))
goto corrupted;
/* The record to delete is the successor of prev_rec. In
ROW_FORMAT=REDUNDANT the next-record field is an absolute page offset. */
uint16_t n= mach_read_from_2(prev_rec - REC_NEXT);
rec_t *rec= block.frame + n;
if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES ||
slot < rec))
goto corrupted;
/* Recompute the record size from the page itself; the log record does not
carry it for ROW_FORMAT=REDUNDANT. */
const ulint extra_size= REC_N_OLD_EXTRA_BYTES + rec_get_n_fields_old(rec) *
(rec_get_1byte_offs_flag(rec) ? 1 : 2);
const ulint data_size= rec_get_data_size_old(rec);
if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + extra_size ||
slot < rec + data_size))
goto corrupted;
/* next is the record that prev_rec will point to after the delete. */
n= mach_read_from_2(rec - REC_NEXT);
rec_t *next= block.frame + n;
if (n == PAGE_OLD_SUPREMUM);
else if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES ||
slot < next))
goto corrupted;
/* Walk forward from rec to the record that owns its directory slot (the
one with a nonzero n_owned field), bounding the walk by n_recs to defend
against a cyclic next-record list. */
rec_t *s= rec;
ulint slot_owned;
for (ulint i= n_recs; !(slot_owned= rec_get_n_owned_old(s)); )
{
n= mach_read_from_2(s - REC_NEXT);
s= block.frame + n;
if (n == PAGE_OLD_SUPREMUM);
else if (UNIV_UNLIKELY(n < PAGE_OLD_SUPREMUM_END + REC_N_OLD_EXTRA_BYTES ||
slot < s))
goto corrupted;
if (UNIV_UNLIKELY(!i--)) /* Corrupted (cyclic?) next-record list */
goto corrupted;
}
/* One record is being removed from this slot's ownership. */
slot_owned--;
/* The first slot is always pointing to the infimum record.
Find the directory slot pointing to s. */
const byte * const first_slot= block.frame + srv_page_size - (PAGE_DIR + 2);
alignas(2) byte slot_offs[2];
mach_write_to_2(slot_offs, s - block.frame);
static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");
while (memcmp_aligned<2>(slot, slot_offs, 2))
if ((slot+= 2) == first_slot)
goto corrupted;
/* If the slot points at the record being deleted, repoint it to the
predecessor. */
if (rec == s)
{
s= prev_rec;
mach_write_to_2(slot, s - block.frame);
}
/* Unlink rec by copying its next-record pointer into prev_rec. */
memcpy(prev_rec - REC_NEXT, rec - REC_NEXT, 2);
/* Update the owning record's n_owned count in place. */
s-= REC_OLD_N_OWNED;
*s= static_cast<byte>((*s & ~REC_N_OWNED_MASK) |
slot_owned << REC_N_OWNED_SHIFT);
/* Release the record's storage (PAGE_FREE list, or shrink PAGE_HEAP_TOP). */
page_mem_free(block, rec, data_size, extra_size);
if (slot_owned < PAGE_DIR_SLOT_MIN_N_OWNED)
page_dir_balance_slot(block, (first_slot - slot) / 2);
ut_ad(page_simple_validate_old(block.frame));
}
/** Apply a DELETE_ROW_FORMAT_DYNAMIC record that was written by
page_cur_delete_rec() for a ROW_FORMAT=COMPACT or DYNAMIC page.
@param block B-tree or R-tree page in ROW_FORMAT=COMPACT or DYNAMIC
@param prev byte offset of the predecessor, relative to PAGE_NEW_INFIMUM
@param hdr_size record header size, excluding REC_N_NEW_EXTRA_BYTES
@param data_size data payload size, in bytes */
void page_apply_delete_dynamic(const buf_block_t &block, ulint prev,
size_t hdr_size, size_t data_size)
{
/* Recovery applies this against a page whose contents may be arbitrarily
corrupted; every value derived from the page is validated before use, and
any inconsistency aborts the apply with a diagnostic. */
const uint16_t n_slots= page_dir_get_n_slots(block.frame);
ulint n_recs= page_get_n_recs(block.frame);
/* Sanity checks: a non-empty COMPACT/DYNAMIC index page that belongs to
this block, whose supremum record has no successor. */
if (UNIV_UNLIKELY(!n_recs || n_slots < 2 ||
!fil_page_index_page_check(block.frame) ||
page_get_page_no(block.frame) != block.page.id.page_no() ||
mach_read_from_2(my_assume_aligned<2>
(PAGE_NEW_SUPREMUM - REC_NEXT +
block.frame)) ||
!page_is_comp(block.frame)))
{
corrupted:
ib::error() << "Not applying DELETE_ROW_FORMAT_DYNAMIC"
" due to corruption on " << block.page.id;
return;
}
/* slot starts at the last (lowest-addressed) directory slot; all record
pointers must lie below it in the page. */
byte *slot= page_dir_get_nth_slot(block.frame, n_slots - 1);
/* In COMPACT/DYNAMIC format the next-record field is a relative offset,
hence the running accumulation into n below. */
uint16_t n= static_cast<uint16_t>(PAGE_NEW_INFIMUM + prev);
rec_t *prev_rec= block.frame + n;
if (UNIV_UNLIKELY(prev_rec > slot))
goto corrupted;
/* The record to delete is the successor of prev_rec. */
n+= mach_read_from_2(prev_rec - REC_NEXT);
rec_t *rec= block.frame + n;
if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
slot < rec))
goto corrupted;
/* The record size was carried in the log record (hdr_size, data_size). */
const ulint extra_size= REC_N_NEW_EXTRA_BYTES + hdr_size;
if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + extra_size ||
slot < rec + data_size))
goto corrupted;
/* next is the record that prev_rec will point to after the delete. */
n+= mach_read_from_2(rec - REC_NEXT);
rec_t *next= block.frame + n;
if (n == PAGE_NEW_SUPREMUM);
else if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
slot < next))
goto corrupted;
/* Walk forward from rec to the record that owns its directory slot (the
one with a nonzero n_owned field), bounding the walk by n_recs to defend
against a cyclic next-record list. */
rec_t *s= rec;
n= static_cast<uint16_t>(rec - block.frame);
ulint slot_owned;
for (ulint i= n_recs; !(slot_owned= rec_get_n_owned_new(s)); )
{
n+= mach_read_from_2(s - REC_NEXT);
s= block.frame + n;
if (n == PAGE_NEW_SUPREMUM);
else if (UNIV_UNLIKELY(n < PAGE_NEW_SUPREMUM_END + REC_N_NEW_EXTRA_BYTES ||
slot < s))
goto corrupted;
if (UNIV_UNLIKELY(!i--)) /* Corrupted (cyclic?) next-record list */
goto corrupted;
}
/* One record is being removed from this slot's ownership. */
slot_owned--;
/* The first slot is always pointing to the infimum record.
Find the directory slot pointing to s. */
const byte * const first_slot= block.frame + srv_page_size - (PAGE_DIR + 2);
alignas(2) byte slot_offs[2];
mach_write_to_2(slot_offs, s - block.frame);
static_assert(PAGE_DIR_SLOT_SIZE == 2, "compatibility");
while (memcmp_aligned<2>(slot, slot_offs, 2))
if ((slot+= 2) == first_slot)
goto corrupted;
/* If the slot points at the record being deleted, repoint it to the
predecessor. */
if (rec == s)
{
s= prev_rec;
mach_write_to_2(slot, s - block.frame);
}
/* Unlink rec by storing the relative offset of next in prev_rec. */
mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>(next - prev_rec));
/* Update the owning record's n_owned count in place. */
s-= REC_NEW_N_OWNED;
*s= static_cast<byte>((*s & ~REC_N_OWNED_MASK) |
slot_owned << REC_N_OWNED_SHIFT);
/* Release the record's storage (PAGE_FREE list, or shrink PAGE_HEAP_TOP). */
page_mem_free(block, rec, data_size, extra_size);
if (slot_owned < PAGE_DIR_SLOT_MIN_N_OWNED)
page_dir_balance_slot(block, (first_slot - slot) / 2);
ut_ad(page_simple_validate_new(block.frame));
}
#ifdef UNIV_COMPILE_TEST_FUNCS
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment