Commit 2a77b2a5 authored by Marko Mäkelä

MDEV-12353: Replace MLOG_*LIST_*_DELETE and MLOG_*REC_DELETE

No longer write the following redo log records:
MLOG_COMP_LIST_END_DELETE, MLOG_LIST_END_DELETE,
MLOG_COMP_LIST_START_DELETE, MLOG_LIST_START_DELETE,
MLOG_REC_DELETE, MLOG_COMP_REC_DELETE.

Each individual deleted record will be logged separately
using physical log records.
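
For illustration only (not part of the patch; all names in this sketch
are made up): a minimal, self-contained C++ sketch of the difference.
A logical record such as MLOG_REC_DELETE told recovery to re-execute a
high-level delete through the index code, while a physical record simply
carries the bytes that changed, in the spirit of the mtr_t::write<N>()
calls introduced below.

#include <cstdint>
#include <cstring>
#include <vector>

/* Toy stand-in for a mini-transaction: it collects physical log records
of the form "these bytes were written at this offset of this page". */
struct sketch_mtr_t {
  struct log_rec {
    std::uint32_t page_no;
    std::uint16_t ofs;
    std::vector<std::uint8_t> bytes;
  };
  std::vector<log_rec> log;

  /* Loosely analogous to mtr_t::write<2>(block, ptr, val): apply the
  change to the page frame and append the same bytes to the log. */
  void write2(std::vector<std::uint8_t>& frame, std::uint32_t page_no,
              std::uint16_t ofs, std::uint16_t val) {
    frame[ofs] = static_cast<std::uint8_t>(val >> 8);
    frame[ofs + 1] = static_cast<std::uint8_t>(val);
    log.push_back({page_no, ofs, {frame[ofs], frame[ofs + 1]}});
  }
};

/* Recovery replays physical records by copying bytes; unlike with
MLOG_REC_DELETE, no record or index structure has to be interpreted. */
void sketch_recover(std::vector<std::uint8_t>& frame, const sketch_mtr_t& mtr) {
  for (const auto& r : mtr.log)
    std::memcpy(frame.data() + r.ofs, r.bytes.data(), r.bytes.size());
}

int main() {
  std::vector<std::uint8_t> page(16384, 0), restored(16384, 0);
  sketch_mtr_t mtr;
  mtr.write2(page, 3, 44, 0x1234); /* e.g. a 2-byte page header update */
  sketch_recover(restored, mtr);   /* restored matches page at offset 44 */
  return restored[44] == 0x12 && restored[45] == 0x34 ? 0 : 1;
}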

page_dir_slot_set_n_owned(),
page_zip_rec_set_owned(), page_zip_dir_delete(), page_zip_clear_rec():
Add the parameter mtr, and write redo log.

page_dir_slot_set_rec(): Remove. Merged into its callers, such as
page_dir_split_slot() and page_dir_balance_slot(), as lower-level
operations that write redo log when necessary.

page_rec_set_n_owned(): Replaces rec_set_n_owned_old(),
rec_set_n_owned_new().

rec_set_heap_no(): Replaces rec_set_heap_no_old(), rec_set_heap_no_new().

page_mem_free(), page_dir_split_slot(), page_dir_balance_slot():
Add the parameter mtr.

page_dir_set_n_slots(): Merge with the callers, such as page_dir_split_slot().

page_cur_insert_rec_low(), page_cur_insert_rec_zip():
Suppress the logging of lower-level operations.

page_cur_delete_rec_write_log(): Remove.

page_cur_delete_rec(): Do not tolerate mtr=NULL.

rec_convert_dtuple_to_rec_old(), rec_convert_dtuple_to_rec_comp():
Replace rec_set_heap_no_old() and rec_set_heap_no_new() with direct
access that does not involve redo logging.

mtr_t::memcpy(): Do allow non-redo-logged writes to the uncompressed
page frames of ROW_FORMAT=COMPRESSED pages.

buf_page_io_complete(): Evict the uncompressed page of
a ROW_FORMAT=COMPRESSED page after recovery. Because we no longer
write logical log records for deleting index records, but instead
write physical records that may refer directly to the compressed
page frame of a ROW_FORMAT=COMPRESSED page, and because on recovery
we will only apply the changes to the ROW_FORMAT=COMPRESSED page,
the uncompressed page frame can be stale until page_zip_decompress()
is executed.
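
For illustration only (a toy model with made-up names, not InnoDB code;
identity copying stands in for real compression): why the uncompressed
frame goes stale when recovery applies physical records to the compressed
frame only, and why something like page_zip_decompress() must run before
the frame may be used.

#include <cstddef>
#include <cstdint>
#include <vector>

struct sketch_page_t {
  std::vector<std::uint8_t> zip;   /* compressed frame: redo applies here */
  std::vector<std::uint8_t> frame; /* uncompressed frame: may become stale */
  bool frame_stale = false;

  void apply_physical_redo(std::size_t ofs, std::uint8_t value) {
    zip[ofs] = value;   /* recovery modifies the compressed frame only */
    frame_stale = true; /* the uncompressed copy no longer matches */
  }

  /* Analogous to forcing page_zip_decompress() before the frame is read. */
  const std::vector<std::uint8_t>& read_frame() {
    if (frame_stale) {
      frame = zip; /* "decompress" */
      frame_stale = false;
    }
    return frame;
  }
};

int main() {
  sketch_page_t p{std::vector<std::uint8_t>(512, 0),
                  std::vector<std::uint8_t>(512, 0)};
  p.apply_physical_redo(10, 0xff);
  return p.read_frame()[10] == 0xff ? 0 : 1;
}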

recv_parse_or_apply_log_rec_body(): After applying MLOG_ZIP_WRITE_STRING,
ensure that the FIL_PAGE_TYPE of the uncompressed page matches the
compressed page, because buf_flush_init_for_writing() assumes that
field to be valid.

mlog_init_t::mark_ibuf_exist(): Invoke page_zip_decompress(), because
the uncompressed page after buf_page_create() is not necessarily
up to date.

buf_LRU_block_remove_hashed(): Bypass a page_zip_validate() check
during redo log apply.

recv_apply_hashed_log_recs(): Invoke mlog_init.mark_ibuf_exist()
also for the last batch, to ensure that page_zip_decompress() will
be called for freshly initialized pages.
parent d00185c4
--loose-innodb-buffer-pool-stats
--loose-innodb-buffer-page
--loose-innodb-buffer-page-lru
--innodb-log-buffer-size=2m
--innodb-defragment=1
\ No newline at end of file
......@@ -4013,9 +4013,8 @@ btr_discard_only_page_on_level(
DBUG_ASSERT(index->table->instant);
DBUG_ASSERT(rec_is_alter_metadata(rec, *index));
btr_set_instant(block, *index, mtr);
rec = page_cur_insert_rec_low(
&cur,
index, rec, offsets, mtr);
rec = page_cur_insert_rec_low(&cur, index, rec,
offsets, mtr);
ut_ad(rec);
mem_heap_free(heap);
} else if (index->is_instant()) {
......
......@@ -202,16 +202,22 @@ inline void PageBulk::insertPage(const rec_t *rec, offset_t *offsets)
static_cast<uint16_t>(next_rec - insert_rec));
mach_write_to_2(m_cur_rec - REC_NEXT,
static_cast<uint16_t>(insert_rec - m_cur_rec));
rec_set_n_owned_new(insert_rec, NULL, 0);
rec_set_heap_no_new(insert_rec,
PAGE_HEAP_NO_USER_LOW + m_rec_no);
rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
rec_set_bit_field_2(insert_rec,
PAGE_HEAP_NO_USER_LOW + m_rec_no,
REC_NEW_HEAP_NO,
REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
} else {
mach_write_to_2(insert_rec - REC_NEXT,
mach_read_from_2(m_cur_rec - REC_NEXT));
mach_write_to_2(m_cur_rec - REC_NEXT, page_offset(insert_rec));
rec_set_n_owned_old(insert_rec, 0);
rec_set_heap_no_old(insert_rec,
PAGE_HEAP_NO_USER_LOW + m_rec_no);
rec_set_bit_field_1(insert_rec, 0, REC_OLD_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
rec_set_bit_field_2(insert_rec,
PAGE_HEAP_NO_USER_LOW + m_rec_no,
REC_OLD_HEAP_NO,
REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
}
/* 4. Set member variables. */
......@@ -282,8 +288,9 @@ inline void PageBulk::finishPage()
{
slot-= PAGE_DIR_SLOT_SIZE;
mach_write_to_2(slot, offset);
rec_set_n_owned_new(m_page + offset, nullptr, count);
count = 0;
rec_set_bit_field_1(m_page + offset, count, REC_NEW_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
count= 0;
}
uint16_t next= (mach_read_from_2(m_page + offset - REC_NEXT) + offset) &
......@@ -308,8 +315,8 @@ inline void PageBulk::finishPage()
{
slot-= PAGE_DIR_SLOT_SIZE;
mach_write_to_2(slot, page_offset(insert_rec));
rec_set_n_owned_old(insert_rec, count);
rec_set_bit_field_1(insert_rec, count, REC_OLD_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
count= 0;
}
......@@ -328,13 +335,26 @@ inline void PageBulk::finishPage()
count+= (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
page_dir_slot_set_n_owned(slot, nullptr, 0);
rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot));
rec_set_bit_field_1(rec, 0, m_is_comp ? REC_NEW_N_OWNED : REC_OLD_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
slot+= PAGE_DIR_SLOT_SIZE;
}
slot-= PAGE_DIR_SLOT_SIZE;
page_dir_slot_set_rec(slot, page_get_supremum_rec(m_page));
page_dir_slot_set_n_owned(slot, nullptr, count + 1);
if (m_is_comp)
{
mach_write_to_2(slot, PAGE_NEW_SUPREMUM);
rec_set_bit_field_1(m_page + PAGE_NEW_SUPREMUM, count + 1, REC_NEW_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
}
else
{
mach_write_to_2(slot, PAGE_OLD_SUPREMUM);
rec_set_bit_field_1(m_page + PAGE_OLD_SUPREMUM, count + 1, REC_OLD_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
}
ut_ad(!dict_index_is_spatial(m_index));
ut_ad(!page_get_instant(m_page));
......
......@@ -5535,13 +5535,27 @@ buf_page_io_complete(buf_page_t* bpage, bool dblwr, bool evict)
ut_ad(buf_pool->n_pend_reads > 0);
buf_pool->n_pend_reads--;
buf_pool->stat.n_pages_read++;
ut_ad(!uncompressed || !bpage->zip.data
|| !recv_recovery_is_on()
|| buf_page_can_relocate(bpage));
mutex_exit(block_mutex);
if (uncompressed) {
#if 1 /* MDEV-12353 FIXME: Remove this! */
if (UNIV_LIKELY_NULL(bpage->zip.data)
&& recv_recovery_is_on()) {
rw_lock_x_unlock_gen(
&reinterpret_cast<buf_block_t*>(bpage)
->lock, BUF_IO_READ);
if (!buf_LRU_free_page(bpage, false)) {
ut_ad(!"could not remove");
}
goto func_exit;
}
#endif
rw_lock_x_unlock_gen(&((buf_block_t*) bpage)->lock,
BUF_IO_READ);
}
mutex_exit(block_mutex);
} else {
/* Write means a flush operation: call the completion
routine in the flush system */
......@@ -5575,9 +5589,8 @@ buf_page_io_complete(buf_page_t* bpage, bool dblwr, bool evict)
DBUG_PRINT("ib_buf", ("%s page %u:%u",
io_type == BUF_IO_READ ? "read" : "wrote",
bpage->id.space(), bpage->id.page_no()));
func_exit:
mutex_exit(&buf_pool->mutex);
return DB_SUCCESS;
}
......
......@@ -1765,7 +1765,10 @@ buf_LRU_block_remove_hashed(
case FIL_PAGE_INDEX:
case FIL_PAGE_RTREE:
#if defined UNIV_ZIP_DEBUG && defined BTR_CUR_HASH_ADAPT
ut_a(page_zip_validate(
/* During recovery, we only update the
compressed page, not the uncompressed one. */
ut_a(recv_recovery_is_on()
|| page_zip_validate(
&bpage->zip, page,
((buf_block_t*) bpage)->index));
#endif /* UNIV_ZIP_DEBUG && BTR_CUR_HASH_ADAPT */
......
......@@ -209,22 +209,6 @@ page_cur_insert_rec_zip(
offset_t* offsets,/*!< in/out: rec_get_offsets(rec, index) */
mtr_t* mtr) /*!< in/out: mini-transaction */
MY_ATTRIBUTE((nonnull, warn_unused_result));
/*************************************************************//**
Copies records from page to a newly created page, from a given record onward,
including that record. Infimum and supremum records are not copied.
IMPORTANT: The caller will have to update IBUF_BITMAP_FREE
if this is a compressed leaf page in a secondary index.
This has to be done either within the same mini-transaction,
or by invoking ibuf_reset_free_bits() before mtr_commit(). */
ATTRIBUTE_COLD /* only used when crash-upgrading */
void
page_copy_rec_list_end_to_created_page(
/*===================================*/
page_t* new_page, /*!< in/out: index page to copy to */
rec_t* rec, /*!< in: first record to copy */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr); /*!< in: mtr */
/***********************************************************//**
Deletes a record at the page cursor. The cursor is moved to the
next record after the deleted one. */
......@@ -363,6 +347,7 @@ page_parse_copy_rec_list_to_created_page(
/***********************************************************//**
Parses log record of a record delete on a page.
@return pointer to record end or NULL */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_cur_parse_delete_rec(
/*======================*/
......
......@@ -406,6 +406,38 @@ inline trx_id_t page_get_max_trx_id(const page_t *page)
return mach_read_from_8(p);
}
/**
Set the number of owned records.
@tparam compressed whether to update any ROW_FORMAT=COMPRESSED page as well
@param[in,out] block index page
@param[in,out] rec record in block->frame
@param[in] n_owned number of records skipped in the sparse page directory
@param[in] comp whether ROW_FORMAT is one of COMPACT,DYNAMIC,COMPRESSED
@param[in,out] mtr mini-transaction */
template<bool compressed>
inline void page_rec_set_n_owned(buf_block_t *block, rec_t *rec, ulint n_owned,
bool comp, mtr_t *mtr)
{
ut_ad(block->frame == page_align(rec));
ut_ad(comp == (page_is_comp(block->frame) != 0));
if (page_zip_des_t *page_zip= compressed
? buf_block_get_page_zip(block) : nullptr)
{
ut_ad(comp);
rec_set_bit_field_1(rec, n_owned, REC_NEW_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
if (rec_get_status(rec) != REC_STATUS_SUPREMUM)
page_zip_rec_set_owned(block, rec, n_owned, mtr);
}
else
{
rec-= comp ? REC_NEW_N_OWNED : REC_OLD_N_OWNED;
mtr->write<1,mtr_t::OPT>(*block, rec, (*rec & ~REC_N_OWNED_MASK) |
(n_owned << REC_N_OWNED_SHIFT));
}
}
/*************************************************************//**
Sets the max trx id field value. */
void
......@@ -620,17 +652,6 @@ uint16_t
page_dir_get_n_slots(
/*=================*/
const page_t* page); /*!< in: index page */
/*************************************************************//**
Sets the number of dir slots in directory. */
UNIV_INLINE
void
page_dir_set_n_slots(
/*=================*/
page_t* page, /*!< in/out: page */
page_zip_des_t* page_zip,/*!< in/out: compressed page whose
uncompressed part will be updated, or NULL */
ulint n_slots);/*!< in: number of slots */
/** Gets the pointer to a directory slot.
@param n sparse directory slot number
@return pointer to the sparse directory slot */
......@@ -664,14 +685,6 @@ inline const rec_t *page_dir_slot_get_rec(const page_dir_slot_t *slot)
return page_dir_slot_get_rec(const_cast<rec_t*>(slot));
}
/***************************************************************//**
This is used to set the record offset in a directory slot. */
UNIV_INLINE
void
page_dir_slot_set_rec(
/*==================*/
page_dir_slot_t*slot, /*!< in: directory slot */
const rec_t* rec); /*!< in: record on the page */
/***************************************************************//**
Gets the number of records owned by a directory slot.
@return number of records */
UNIV_INLINE
......@@ -679,15 +692,6 @@ ulint
page_dir_slot_get_n_owned(
/*======================*/
const page_dir_slot_t* slot); /*!< in: page directory slot */
/***************************************************************//**
This is used to set the owned records field of a directory slot. */
UNIV_INLINE
void
page_dir_slot_set_n_owned(
/*======================*/
page_dir_slot_t*slot, /*!< in/out: directory slot */
page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */
ulint n); /*!< in: number of records owned by the slot */
/************************************************************//**
Calculates the space reserved for directory slots of a given
number of records. The exact value is a fraction number
......@@ -1138,6 +1142,7 @@ page_move_rec_list_start(
/**********************************************************//**
Parses a log record of a record list end or start deletion.
@return end of log record or NULL */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_parse_delete_rec_list(
/*=======================*/
......
......@@ -419,19 +419,6 @@ page_dir_get_n_slots(
{
return(page_header_get_field(page, PAGE_N_DIR_SLOTS));
}
/*************************************************************//**
Sets the number of dir slots in directory. */
UNIV_INLINE
void
page_dir_set_n_slots(
/*=================*/
page_t* page, /*!< in/out: page */
page_zip_des_t* page_zip,/*!< in/out: compressed page whose
uncompressed part will be updated, or NULL */
ulint n_slots)/*!< in: number of slots */
{
page_header_set_field(page, page_zip, PAGE_N_DIR_SLOTS, n_slots);
}
/*************************************************************//**
Gets the number of records in the heap.
......@@ -487,20 +474,6 @@ page_rec_check(
return(TRUE);
}
/***************************************************************//**
This is used to set the record offset in a directory slot. */
UNIV_INLINE
void
page_dir_slot_set_rec(
/*==================*/
page_dir_slot_t*slot, /*!< in: directory slot */
const rec_t* rec) /*!< in: record on the page */
{
ut_ad(page_rec_check(rec));
mach_write_to_2(slot, page_offset(rec));
}
/***************************************************************//**
Gets the number of records owned by a directory slot.
@return number of records */
......@@ -518,25 +491,6 @@ page_dir_slot_get_n_owned(
}
}
/***************************************************************//**
This is used to set the owned records field of a directory slot. */
UNIV_INLINE
void
page_dir_slot_set_n_owned(
/*======================*/
page_dir_slot_t*slot, /*!< in/out: directory slot */
page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */
ulint n) /*!< in: number of records owned by the slot */
{
rec_t* rec = (rec_t*) page_dir_slot_get_rec(slot);
if (page_rec_is_comp(slot)) {
rec_set_n_owned_new(rec, page_zip, n);
} else {
ut_ad(!page_zip);
rec_set_n_owned_old(rec, n);
}
}
/************************************************************//**
Calculates the space reserved for directory slots of a given number of
records. The exact value is a fraction number n * PAGE_DIR_SLOT_SIZE /
......
......@@ -40,6 +40,8 @@ typedef byte page_t;
#ifndef UNIV_INNOCHECKSUM
/** Index page cursor */
struct page_cur_t;
/** Buffer pool block */
struct buf_block_t;
/** Compressed index page */
typedef byte page_zip_t;
......@@ -150,9 +152,10 @@ must already have been written on the uncompressed page. */
void
page_zip_rec_set_owned(
/*===================*/
page_zip_des_t* page_zip,/*!< in/out: compressed page */
buf_block_t* block, /*!< in/out: ROW_FORMAT=COMPRESSED page */
const byte* rec, /*!< in: record on the uncompressed page */
ulint flag) /*!< in: the owned flag (nonzero=TRUE) */
ulint flag, /*!< in: the owned flag (nonzero=TRUE) */
mtr_t* mtr) /*!< in/out: mini-transaction */
MY_ATTRIBUTE((nonnull));
#endif /* !UNIV_INNOCHECKSUM */
#endif
......@@ -360,9 +360,10 @@ must already have been written on the uncompressed page. */
void
page_zip_rec_set_owned(
/*===================*/
page_zip_des_t* page_zip,/*!< in/out: compressed page */
buf_block_t* block, /*!< in/out: ROW_FORMAT=COMPRESSED page */
const byte* rec, /*!< in: record on the uncompressed page */
ulint flag) /*!< in: the owned flag (nonzero=TRUE) */
ulint flag, /*!< in: the owned flag (nonzero=TRUE) */
mtr_t* mtr) /*!< in/out: mini-transaction */
MY_ATTRIBUTE((nonnull));
/**********************************************************************//**
......@@ -385,9 +386,10 @@ page_zip_dir_delete(
byte* rec, /*!< in: deleted record */
const dict_index_t* index, /*!< in: index of rec */
const offset_t* offsets, /*!< in: rec_get_offsets(rec) */
const byte* free) /*!< in: previous start of
const byte* free, /*!< in: previous start of
the free list */
MY_ATTRIBUTE((nonnull(1,2,3,4)));
mtr_t* mtr) /*!< in/out: mini-transaction */
MY_ATTRIBUTE((nonnull(1,2,3,4,6)));
/**********************************************************************//**
Add a slot to the dense page directory. */
......
......@@ -241,15 +241,6 @@ rec_get_n_owned_old(
const rec_t* rec) /*!< in: old-style physical record */
MY_ATTRIBUTE((warn_unused_result));
/******************************************************//**
The following function is used to set the number of owned records. */
UNIV_INLINE
void
rec_set_n_owned_old(
/*================*/
rec_t* rec, /*!< in: old-style physical record */
ulint n_owned) /*!< in: the number of owned */
MY_ATTRIBUTE((nonnull));
/******************************************************//**
The following function is used to get the number of records owned by the
previous directory record.
@return number of owned records */
......@@ -260,16 +251,6 @@ rec_get_n_owned_new(
const rec_t* rec) /*!< in: new-style physical record */
MY_ATTRIBUTE((warn_unused_result));
/******************************************************//**
The following function is used to set the number of owned records. */
UNIV_INLINE
void
rec_set_n_owned_new(
/*================*/
rec_t* rec, /*!< in/out: new-style physical record */
page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */
ulint n_owned)/*!< in: the number of owned */
MY_ATTRIBUTE((nonnull(1)));
/******************************************************//**
The following function is used to retrieve the info bits of
a record.
@return info bits */
......@@ -418,16 +399,6 @@ rec_get_heap_no_old(
const rec_t* rec) /*!< in: physical record */
MY_ATTRIBUTE((warn_unused_result));
/******************************************************//**
The following function is used to set the heap number
field in an old-style record. */
UNIV_INLINE
void
rec_set_heap_no_old(
/*================*/
rec_t* rec, /*!< in: physical record */
ulint heap_no)/*!< in: the heap number */
MY_ATTRIBUTE((nonnull));
/******************************************************//**
The following function is used to get the order number
of a new-style record in the heap of the index page.
@return heap order number */
......@@ -438,16 +409,6 @@ rec_get_heap_no_new(
const rec_t* rec) /*!< in: physical record */
MY_ATTRIBUTE((warn_unused_result));
/******************************************************//**
The following function is used to set the heap number
field in a new-style record. */
UNIV_INLINE
void
rec_set_heap_no_new(
/*================*/
rec_t* rec, /*!< in/out: physical record */
ulint heap_no)/*!< in: the heap number */
MY_ATTRIBUTE((nonnull));
/******************************************************//**
The following function is used to test whether the data offsets
in the record are stored in one-byte or two-byte format.
@return TRUE if 1-byte form */
......
......@@ -503,19 +503,6 @@ rec_get_n_owned_old(
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT));
}
/******************************************************//**
The following function is used to set the number of owned records. */
UNIV_INLINE
void
rec_set_n_owned_old(
/*================*/
rec_t* rec, /*!< in: old-style physical record */
ulint n_owned) /*!< in: the number of owned */
{
rec_set_bit_field_1(rec, n_owned, REC_OLD_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
}
/******************************************************//**
The following function is used to get the number of records owned by the
previous directory record.
......@@ -530,23 +517,6 @@ rec_get_n_owned_new(
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT));
}
/******************************************************//**
The following function is used to set the number of owned records. */
UNIV_INLINE
void
rec_set_n_owned_new(
/*================*/
rec_t* rec, /*!< in/out: new-style physical record */
page_zip_des_t* page_zip,/*!< in/out: compressed page, or NULL */
ulint n_owned)/*!< in: the number of owned */
{
rec_set_bit_field_1(rec, n_owned, REC_NEW_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
if (page_zip && rec_get_status(rec) != REC_STATUS_SUPREMUM) {
page_zip_rec_set_owned(page_zip, rec, n_owned);
}
}
/******************************************************//**
The following function is used to retrieve the info bits of a record.
@return info bits */
......@@ -674,20 +644,6 @@ rec_get_heap_no_old(
REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT));
}
/******************************************************//**
The following function is used to set the heap number
field in an old-style record. */
UNIV_INLINE
void
rec_set_heap_no_old(
/*================*/
rec_t* rec, /*!< in: physical record */
ulint heap_no)/*!< in: the heap number */
{
rec_set_bit_field_2(rec, heap_no, REC_OLD_HEAP_NO,
REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
}
/******************************************************//**
The following function is used to get the order number
of a new-style record in the heap of the index page.
......@@ -702,20 +658,6 @@ rec_get_heap_no_new(
REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT));
}
/******************************************************//**
The following function is used to set the heap number
field in a new-style record. */
UNIV_INLINE
void
rec_set_heap_no_new(
/*================*/
rec_t* rec, /*!< in/out: physical record */
ulint heap_no)/*!< in: the heap number */
{
rec_set_bit_field_2(rec, heap_no, REC_NEW_HEAP_NO,
REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
}
/******************************************************//**
The following function is used to test whether the data offsets in the record
are stored in one-byte or two-byte format.
......
......@@ -313,7 +313,6 @@ class mlog_init_t
void mark_ibuf_exist(mtr_t& mtr)
{
ut_ad(mutex_own(&recv_sys.mutex));
ut_ad(!recv_no_ibuf_operations);
mtr.start();
for (const map::value_type& i : inits) {
......@@ -324,6 +323,21 @@ class mlog_init_t
i.first, 0, RW_X_LATCH, NULL,
BUF_GET_IF_IN_POOL, __FILE__, __LINE__,
&mtr)) {
if (UNIV_LIKELY_NULL(block->page.zip.data)
&& fil_page_type_is_index(
fil_page_get_type(
block->page.zip.data))
&& !page_zip_decompress(&block->page.zip,
block->frame,
true)) {
ib::error() << "corrupted page "
<< block->page.id;
}
if (recv_no_ibuf_operations) {
mtr.commit();
mtr.start();
continue;
}
mutex_exit(&recv_sys.mutex);
block->page.ibuf_exist = ibuf_page_exists(
block->page);
......@@ -1570,6 +1584,13 @@ recv_parse_or_apply_log_rec_body(
}
break;
case MLOG_REC_INSERT: case MLOG_COMP_REC_INSERT:
if (!page_zip) {
} else if (!page_zip_decompress(page_zip, page, true)) {
ib::error() << "corrupted page " << block->page.id;
} else {
ut_d(page_type = fil_page_get_type(page));
}
ut_ad(!page || fil_page_type_is_index(page_type));
if (NULL != (ptr = mlog_parse_index(
......@@ -1603,6 +1624,13 @@ recv_parse_or_apply_log_rec_body(
page, page_zip, mtr);
break;
case MLOG_REC_UPDATE_IN_PLACE: case MLOG_COMP_REC_UPDATE_IN_PLACE:
if (!page_zip) {
} else if (!page_zip_decompress(page_zip, page, true)) {
ib::error() << "corrupted page " << block->page.id;
} else {
ut_d(page_type = fil_page_get_type(page));
}
ut_ad(!page || fil_page_type_is_index(page_type));
if (NULL != (ptr = mlog_parse_index(
......@@ -2113,22 +2141,10 @@ static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
memcpy_aligned<8>(FIL_PAGE_LSN
+ page_zip->data,
FIL_PAGE_LSN + page, 8);
if (fil_page_index_page_check(page)
&& !page_zip_decompress(page_zip, page,
true)) {
ib::error() << "corrupted page "
<< block->page.id;
}
}
}
}
#ifdef UNIV_ZIP_DEBUG
ut_ad(!fil_page_index_page_check(page)
|| !page_zip
|| page_zip_validate_low(page_zip, page, NULL, FALSE));
#endif /* UNIV_ZIP_DEBUG */
if (start_lsn) {
buf_block_modify_clock_inc(block);
log_flush_order_mutex_enter();
......@@ -2479,7 +2495,7 @@ void recv_apply_hashed_log_recs(bool last_batch)
log_mutex_enter();
mutex_enter(&(recv_sys.mutex));
mlog_init.reset();
} else if (!recv_no_ibuf_operations) {
} else {
/* We skipped this in buf_page_create(). */
mlog_init.mark_ibuf_exist(mtr);
}
......
......@@ -276,8 +276,6 @@ void mtr_t::memcpy(const buf_block_t &b, ulint ofs, ulint len)
ut_ad(len);
ut_ad(ofs <= ulint(srv_page_size));
ut_ad(ofs + len <= ulint(srv_page_size));
ut_ad(ofs + len < PAGE_DATA || !b.page.zip.data ||
mach_read_from_2(b.frame + FIL_PAGE_TYPE) <= FIL_PAGE_TYPE_ZBLOB2);
set_modified();
if (get_log_mode() != MTR_LOG_ALL)
......@@ -287,6 +285,9 @@ void mtr_t::memcpy(const buf_block_t &b, ulint ofs, ulint len)
return;
}
ut_ad(ofs + len < PAGE_DATA || !b.page.zip.data ||
mach_read_from_2(b.frame + FIL_PAGE_TYPE) <= FIL_PAGE_TYPE_ZBLOB2);
byte *l= get_log()->open(11 + 2 + 2);
l= mlog_write_initial_log_record_low(MLOG_WRITE_STRING, b.page.id.space(),
b.page.id.page_no(), l, this);
......
......@@ -966,6 +966,24 @@ page_cur_insert_rec_write_log(
}
}
static void rec_set_heap_no(rec_t *rec, ulint heap_no, bool compact)
{
rec_set_bit_field_2(rec, heap_no,
compact ? REC_NEW_HEAP_NO : REC_OLD_HEAP_NO,
REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
}
static void rec_set_heap_no(const buf_block_t& block, rec_t *rec,
ulint heap_no, bool compact, mtr_t *mtr)
{
rec-= compact ? REC_NEW_HEAP_NO : REC_OLD_HEAP_NO;
// MDEV-12353 FIXME: try single-byte write if possible
mtr->write<2,mtr_t::OPT>(block, rec,
(mach_read_from_2(rec) & ~REC_HEAP_NO_MASK) |
(heap_no << REC_HEAP_NO_SHIFT));
}
/***********************************************************//**
Parses a log record of a record insert on a page.
@return end of log record or NULL */
......@@ -1115,15 +1133,13 @@ page_cur_parse_insert_rec(
memcpy(buf, rec_get_start(cursor_rec, offsets), mismatch_index);
memcpy(buf + mismatch_index, ptr, end_seg_len);
rec_set_heap_no(buf + origin_offset, PAGE_HEAP_NO_USER_LOW,
page_is_comp(page));
if (page_is_comp(page)) {
rec_set_heap_no_new(buf + origin_offset,
PAGE_HEAP_NO_USER_LOW);
rec_set_info_and_status_bits(buf + origin_offset,
info_and_status_bits);
} else {
rec_set_heap_no_old(buf + origin_offset,
PAGE_HEAP_NO_USER_LOW);
rec_set_info_bits_old(buf + origin_offset,
info_and_status_bits);
}
......@@ -1196,64 +1212,105 @@ page_direction_increment(
1U + page_header_get_field(page, PAGE_N_DIRECTION));
}
/** Split a directory slot which owns too many records.
@param[in,out] page index page
@param[in,out] page_zip ROW_FORMAT=COMPRESSED page, or NULL
@param[in] s the slot that needs to be split */
static void page_dir_split_slot(page_t* page, page_zip_des_t* page_zip,
ulint s)
/**
Set the owned records field of the record pointed to by a directory slot.
@tparam compressed whether to update any ROW_FORMAT=COMPRESSED page as well
@param[in,out] block file page
@param[in] slot sparse directory slot
@param[in] n number of records owned by the directory slot
@param[in,out] mtr mini-transaction */
template<bool compressed>
static void page_dir_slot_set_n_owned(buf_block_t *block,
const page_dir_slot_t *slot,
ulint n, mtr_t *mtr)
{
ut_ad(!page_zip || page_is_comp(page));
ut_ad(s);
page_dir_slot_t* slot = page_dir_get_nth_slot(page, s);
const ulint n_owned = PAGE_DIR_SLOT_MAX_N_OWNED + 1;
ut_ad(page_dir_slot_get_n_owned(slot) == n_owned);
compile_time_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
>= PAGE_DIR_SLOT_MIN_N_OWNED);
/* 1. We loop to find a record approximately in the middle of the
records owned by the slot. */
const rec_t* rec = page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE);
for (ulint i = n_owned / 2; i--; ) {
rec = page_rec_get_next_const(rec);
}
/* 2. Add a directory slot immediately below this one. */
const ulint n_slots = page_dir_get_n_slots(page);
page_dir_set_n_slots(page, page_zip, n_slots + 1);
page_dir_slot_t* last_slot = page_dir_get_nth_slot(page, n_slots);
memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE,
slot - last_slot);
/* 3. We store the appropriate values to the new slot. */
page_dir_slot_set_rec(slot, rec);
page_dir_slot_set_n_owned(slot, page_zip, n_owned / 2);
rec_t *rec= const_cast<rec_t*>(page_dir_slot_get_rec(slot));
page_rec_set_n_owned<compressed>(block, rec, n, page_rec_is_comp(rec), mtr);
}
/* 4. Finally, we update the number of records field of the
original slot */
page_dir_slot_set_n_owned(slot - PAGE_DIR_SLOT_SIZE,
page_zip, n_owned - (n_owned / 2));
/**
Split a directory slot which owns too many records.
@tparam compressed whether to update the ROW_FORMAT=COMPRESSED page as well
@param[in,out] block index page
@param[in] s the slot that needs to be split
@param[in,out] mtr mini-transaction */
template<bool compressed>
static void page_dir_split_slot(buf_block_t *block, ulint s, mtr_t* mtr)
{
ut_ad(!block->page.zip.data || page_is_comp(block->frame));
ut_ad(!compressed || block->page.zip.data);
ut_ad(s);
page_dir_slot_t *slot= page_dir_get_nth_slot(block->frame, s);
const ulint n_owned= PAGE_DIR_SLOT_MAX_N_OWNED + 1;
ut_ad(page_dir_slot_get_n_owned(slot) == n_owned);
compile_time_assert((PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2
>= PAGE_DIR_SLOT_MIN_N_OWNED);
/* 1. We loop to find a record approximately in the middle of the
records owned by the slot. */
const rec_t *rec= page_dir_slot_get_rec(slot + PAGE_DIR_SLOT_SIZE);
for (ulint i= n_owned / 2; i--; )
rec= page_rec_get_next_const(rec);
/* Add a directory slot immediately below this one. */
byte *n_slots_p= PAGE_N_DIR_SLOTS + PAGE_HEADER + block->frame;
const uint16_t n_slots= mach_read_from_2(n_slots_p);
page_dir_slot_t *last_slot= static_cast<page_dir_slot_t*>
(block->frame + srv_page_size - (PAGE_DIR + PAGE_DIR_SLOT_SIZE) -
n_slots * PAGE_DIR_SLOT_SIZE);
memmove_aligned<2>(last_slot, last_slot + PAGE_DIR_SLOT_SIZE,
slot - last_slot);
const ulint half_owned= n_owned / 2;
if (compressed && UNIV_LIKELY_NULL(block->page.zip.data))
{
/* Log changes to the compressed page header and the dense page
directory. */
mach_write_to_2(n_slots_p, n_slots + 1);
page_zip_write_header(&block->page.zip, n_slots_p, 2, mtr);
mach_write_to_2(slot, page_offset(rec));
page_rec_set_n_owned<true>(block, page_dir_slot_get_rec(slot), half_owned,
true, mtr);
page_rec_set_n_owned<true>(block,
page_dir_slot_get_rec(slot -
PAGE_DIR_SLOT_SIZE),
n_owned - half_owned, true, mtr);
}
else
{
mtr->write<2>(*block, n_slots_p, 1U + n_slots);
mtr->memcpy(*block, page_offset(last_slot), slot - last_slot);
mtr->write<2>(*block, slot, page_offset(rec));
const bool comp= page_is_comp(block->frame) != 0;
page_rec_set_n_owned<false>(block, page_dir_slot_get_rec(slot), half_owned,
comp, mtr);
page_rec_set_n_owned<false>(block,
page_dir_slot_get_rec(slot -
PAGE_DIR_SLOT_SIZE),
n_owned - half_owned, comp, mtr);
}
}
/** Try to balance an underfilled directory slot with an adjacent one,
/**
Try to balance an underfilled directory slot with an adjacent one,
so that there are at least the minimum number of records owned by the slot;
this may result in merging the two slots.
@param[in,out] page index page
@param[in,out] page_zip ROW_FORMAT=COMPRESSED page, or NULL
@param[in] s the slot to be balanced */
static void page_dir_balance_slot(page_t* page, page_zip_des_t* page_zip,
ulint s)
@param[in,out] block index page
@param[in] s the slot to be balanced
@param[in,out] mtr mini-transaction */
static void page_dir_balance_slot(buf_block_t *block, ulint s, mtr_t *mtr)
{
ut_ad(!page_zip || page_is_comp(page));
ut_ad(!block->page.zip.data || page_is_comp(block->frame));
ut_ad(s > 0);
const ulint n_slots = page_dir_get_n_slots(page);
const ulint n_slots = page_dir_get_n_slots(block->frame);
if (UNIV_UNLIKELY(s + 1 == n_slots)) {
/* The last directory slot cannot be balanced. */
......@@ -1262,9 +1319,10 @@ static void page_dir_balance_slot(page_t* page, page_zip_des_t* page_zip,
ut_ad(s < n_slots);
page_dir_slot_t* slot = page_dir_get_nth_slot(page, s);
page_dir_slot_t* slot = page_dir_get_nth_slot(block->frame, s);
page_dir_slot_t* up_slot = slot - PAGE_DIR_SLOT_SIZE;
const ulint up_n_owned = page_dir_slot_get_n_owned(up_slot);
page_zip_des_t* page_zip = buf_block_get_page_zip(block);
ut_ad(page_dir_slot_get_n_owned(slot)
== PAGE_DIR_SLOT_MIN_N_OWNED - 1);
......@@ -1274,17 +1332,33 @@ static void page_dir_balance_slot(page_t* page, page_zip_des_t* page_zip,
<= PAGE_DIR_SLOT_MAX_N_OWNED);
/* Merge the slots. */
ulint n_owned = page_dir_slot_get_n_owned(slot);
page_dir_slot_set_n_owned(slot, page_zip, 0);
page_dir_slot_set_n_owned(up_slot, page_zip,
n_owned
+ page_dir_slot_get_n_owned(up_slot));
page_dir_slot_set_n_owned<true>(block, slot, 0, mtr);
page_dir_slot_set_n_owned<true>(block, up_slot, n_owned
+ page_dir_slot_get_n_owned(
up_slot), mtr);
/* Shift the slots */
page_dir_slot_t* last_slot = page_dir_get_nth_slot(
page, n_slots - 1);
block->frame, n_slots - 1);
memmove_aligned<2>(last_slot + PAGE_DIR_SLOT_SIZE, last_slot,
slot - last_slot);
mach_write_to_2(last_slot, 0);
page_dir_set_n_slots(page, page_zip, n_slots - 1);
if (UNIV_LIKELY_NULL(page_zip)) {
memset_aligned<2>(last_slot, 0, 2);
mach_write_to_2(PAGE_N_DIR_SLOTS + PAGE_HEADER
+ block->frame, n_slots - 1);
page_zip_write_header(page_zip,
PAGE_N_DIR_SLOTS + PAGE_HEADER
+ block->frame, 2, mtr);
} else {
mtr->write<2>(*block,
PAGE_N_DIR_SLOTS + PAGE_HEADER
+ block->frame,
n_slots - 1);
mtr->write<2>(*block, last_slot, 0U);
mtr->memcpy(*block, page_offset(last_slot)
+ PAGE_DIR_SLOT_SIZE,
slot - last_slot);
}
return;
}
......@@ -1292,21 +1366,29 @@ static void page_dir_balance_slot(page_t* page, page_zip_des_t* page_zip,
rec_t* old_rec = const_cast<rec_t*>(page_dir_slot_get_rec(slot));
rec_t* new_rec;
if (page_is_comp(page)) {
if (page_is_comp(block->frame)) {
new_rec = rec_get_next_ptr(old_rec, TRUE);
rec_set_n_owned_new(old_rec, page_zip, 0);
rec_set_n_owned_new(new_rec, page_zip,
PAGE_DIR_SLOT_MIN_N_OWNED);
page_rec_set_n_owned<true>(block, old_rec, 0, true, mtr);
page_rec_set_n_owned<true>(block, new_rec,
PAGE_DIR_SLOT_MIN_N_OWNED,
true, mtr);
if (UNIV_LIKELY_NULL(page_zip)) {
mach_write_to_2(slot, page_offset(new_rec));
goto func_exit;
}
} else {
new_rec = rec_get_next_ptr(old_rec, FALSE);
rec_set_n_owned_old(old_rec, 0);
rec_set_n_owned_old(new_rec, PAGE_DIR_SLOT_MIN_N_OWNED);
page_rec_set_n_owned<false>(block, old_rec, 0, false, mtr);
page_rec_set_n_owned<false>(block, new_rec,
PAGE_DIR_SLOT_MIN_N_OWNED,
false, mtr);
}
page_dir_slot_set_rec(slot, new_rec);
page_dir_slot_set_n_owned(up_slot, page_zip, up_n_owned - 1);
mtr->write<2>(*block, slot, page_offset(new_rec));
func_exit:
page_dir_slot_set_n_owned<true>(block, up_slot, up_n_owned - 1, mtr);
}
/** Allocate space for inserting an index record.
......@@ -1346,7 +1428,6 @@ page_cur_insert_rec_low(
{
byte* insert_buf;
ulint rec_size;
page_t* page; /*!< the relevant page */
rec_t* last_insert; /*!< cursor position at previous
insert */
rec_t* free_rec; /*!< a free record that was reused,
......@@ -1356,19 +1437,27 @@ page_cur_insert_rec_low(
record */
rec_t* current_rec = cur->rec;
buf_block_t* block = cur->block;
ut_ad(rec_offs_validate(rec, index, offsets));
page = page_align(current_rec);
ut_ad(dict_table_is_comp(index->table)
== (ibool) !!page_is_comp(page));
ut_ad(fil_page_index_page_check(page));
ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
== (ibool) !!page_is_comp(block->frame));
ut_ad(fil_page_index_page_check(block->frame));
ut_ad(mach_read_from_8(PAGE_HEADER + PAGE_INDEX_ID + block->frame)
== index->id
|| index->is_dummy
|| mtr->is_inside_ibuf());
ut_ad(!page_rec_is_supremum(current_rec));
const mtr_log_t log_mode = mtr->set_log_mode(MTR_LOG_NONE);
/* We should not write log for ROW_FORMAT=COMPRESSED pages here. */
ut_ad(log_mode == MTR_LOG_NONE
|| log_mode == MTR_LOG_NO_REDO
|| !(index->table->flags & DICT_TF_MASK_ZIP_SSIZE));
/* 1. Get the size of the physical record in the page */
rec_size = rec_offs_size(offsets);
......@@ -1391,7 +1480,7 @@ page_cur_insert_rec_low(
/* 2. Try to find suitable space from page memory management */
free_rec = page_header_get_ptr(page, PAGE_FREE);
free_rec = page_header_get_ptr(block->frame, PAGE_FREE);
if (UNIV_LIKELY_NULL(free_rec)) {
/* Try to allocate from the head of the free list. */
offset_t foffsets_[REC_OFFS_NORMAL_SIZE];
......@@ -1401,7 +1490,7 @@ page_cur_insert_rec_low(
rec_offs_init(foffsets_);
foffsets = rec_get_offsets(
free_rec, index, foffsets, page_is_leaf(page),
free_rec, index, foffsets, page_is_leaf(block->frame),
ULINT_UNDEFINED, &heap);
if (rec_offs_size(foffsets) < rec_size) {
if (UNIV_LIKELY_NULL(heap)) {
......@@ -1413,16 +1502,16 @@ page_cur_insert_rec_low(
insert_buf = free_rec - rec_offs_extra_size(foffsets);
if (page_is_comp(page)) {
if (page_is_comp(block->frame)) {
heap_no = rec_get_heap_no_new(free_rec);
page_mem_alloc_free(page, NULL,
rec_get_next_ptr(free_rec, TRUE),
rec_size);
page_mem_alloc_free(block->frame, NULL,
rec_get_next_ptr(free_rec, TRUE),
rec_size);
} else {
heap_no = rec_get_heap_no_old(free_rec);
page_mem_alloc_free(page, NULL,
rec_get_next_ptr(free_rec, FALSE),
rec_size);
page_mem_alloc_free(block->frame, NULL,
rec_get_next_ptr(free_rec, FALSE),
rec_size);
}
if (UNIV_LIKELY_NULL(heap)) {
......@@ -1431,17 +1520,19 @@ page_cur_insert_rec_low(
} else {
use_heap:
free_rec = NULL;
insert_buf = page_mem_alloc_heap(page, NULL,
insert_buf = page_mem_alloc_heap(block->frame, NULL,
rec_size, &heap_no);
if (UNIV_UNLIKELY(insert_buf == NULL)) {
mtr->set_log_mode(log_mode);
return(NULL);
}
}
/* 3. Create the record */
insert_rec = rec_copy(insert_buf, rec, offsets);
rec_offs_make_valid(insert_rec, index, page_is_leaf(page), offsets);
rec_offs_make_valid(insert_rec, index, page_is_leaf(block->frame),
offsets);
/* 4. Insert the record in the linked list of records */
ut_ad(current_rec != insert_rec);
......@@ -1450,7 +1541,7 @@ page_cur_insert_rec_low(
/* next record after current before the insertion */
rec_t* next_rec = page_rec_get_next(current_rec);
#ifdef UNIV_DEBUG
if (page_is_comp(page)) {
if (page_is_comp(block->frame)) {
switch (rec_get_status(current_rec)) {
case REC_STATUS_ORDINARY:
case REC_STATUS_NODE_PTR:
......@@ -1476,56 +1567,59 @@ page_cur_insert_rec_low(
page_rec_set_next(current_rec, insert_rec);
}
page_header_set_field(page, NULL, PAGE_N_RECS,
1U + page_get_n_recs(page));
page_header_set_field(block->frame, NULL, PAGE_N_RECS,
1U + page_get_n_recs(block->frame));
/* 5. Set the n_owned field in the inserted record to zero,
and set the heap_no field */
if (page_is_comp(page)) {
rec_set_n_owned_new(insert_rec, NULL, 0);
rec_set_heap_no_new(insert_rec, heap_no);
} else {
rec_set_n_owned_old(insert_rec, 0);
rec_set_heap_no_old(insert_rec, heap_no);
}
page_rec_set_n_owned<false>(block, insert_rec, 0,
page_is_comp(block->frame), mtr);
rec_set_heap_no(*block, insert_rec, heap_no,
page_is_comp(block->frame), mtr);
UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
rec_offs_size(offsets));
/* 6. Update the last insertion info in page header */
last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
ut_ad(!last_insert || !page_is_comp(page)
last_insert = page_header_get_ptr(block->frame, PAGE_LAST_INSERT);
ut_ad(!last_insert || !page_is_comp(block->frame)
|| rec_get_node_ptr_flag(last_insert)
== rec_get_node_ptr_flag(insert_rec));
if (!dict_index_is_spatial(index)) {
byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + page;
byte* ptr = PAGE_HEADER + PAGE_DIRECTION_B + block->frame;
if (UNIV_UNLIKELY(last_insert == NULL)) {
no_direction:
page_direction_reset(ptr, page, NULL);
page_direction_reset(ptr, block->frame, NULL);
} else if (last_insert == current_rec
&& page_ptr_get_direction(ptr) != PAGE_LEFT) {
page_direction_increment(ptr, page, NULL, PAGE_RIGHT);
page_direction_increment(ptr, block->frame, NULL,
PAGE_RIGHT);
} else if (page_ptr_get_direction(ptr) != PAGE_RIGHT
&& page_rec_get_next(insert_rec) == last_insert) {
page_direction_increment(ptr, page, NULL, PAGE_LEFT);
page_direction_increment(ptr, block->frame, NULL,
PAGE_LEFT);
} else {
goto no_direction;
}
}
page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);
page_header_set_ptr(block->frame, NULL, PAGE_LAST_INSERT, insert_rec);
/* 7. It remains to update the owner record. */
{
rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
ulint n_owned;
if (page_is_comp(page)) {
if (page_is_comp(block->frame)) {
n_owned = rec_get_n_owned_new(owner_rec);
rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
rec_set_bit_field_1(owner_rec, n_owned + 1,
REC_NEW_N_OWNED, REC_N_OWNED_MASK,
REC_N_OWNED_SHIFT);
} else {
n_owned = rec_get_n_owned_old(owner_rec);
rec_set_n_owned_old(owner_rec, n_owned + 1);
rec_set_bit_field_1(owner_rec, n_owned + 1,
REC_OLD_N_OWNED, REC_N_OWNED_MASK,
REC_N_OWNED_SHIFT);
}
/* 8. Now we have incremented the n_owned field of the owner
......@@ -1533,17 +1627,16 @@ page_cur_insert_rec_low(
we have to split the corresponding directory slot in two. */
if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
page_dir_split_slot(
page, NULL,
page_dir_find_owner_slot(owner_rec));
page_dir_split_slot<false>(
block,
page_dir_find_owner_slot(owner_rec), mtr);
}
}
/* 9. Write log record of the insert */
if (UNIV_LIKELY(mtr != NULL)) {
page_cur_insert_rec_write_log(insert_rec, rec_size,
current_rec, index, mtr);
}
mtr->set_log_mode(log_mode);
page_cur_insert_rec_write_log(insert_rec, rec_size,
current_rec, index, mtr);
return(insert_rec);
}
......@@ -1903,8 +1996,10 @@ page_cur_insert_rec_zip(
/* 5. Set the n_owned field in the inserted record to zero,
and set the heap_no field */
rec_set_n_owned_new(insert_rec, NULL, 0);
rec_set_heap_no_new(insert_rec, heap_no);
rec_set_bit_field_1(insert_rec, 0, REC_NEW_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
rec_set_bit_field_2(insert_rec, heap_no, REC_NEW_HEAP_NO,
REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
rec_offs_size(offsets));
......@@ -1944,16 +2039,20 @@ page_cur_insert_rec_zip(
ulint n_owned;
n_owned = rec_get_n_owned_new(owner_rec);
rec_set_n_owned_new(owner_rec, page_zip, n_owned + 1);
rec_set_bit_field_1(owner_rec, n_owned + 1, REC_NEW_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
/* 8. Now we have incremented the n_owned field of the owner
record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
we have to split the corresponding directory slot in two. */
if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
page_dir_split_slot(
page, page_zip,
page_dir_find_owner_slot(owner_rec));
const mtr_log_t log_mode = mtr->set_log_mode(
MTR_LOG_NONE);
page_dir_split_slot<true>(
page_cur_get_block(cursor),
page_dir_find_owner_slot(owner_rec), mtr);
mtr->set_log_mode(log_mode);
}
}
......@@ -2045,12 +2144,13 @@ ATTRIBUTE_COLD /* only used when crash-upgrading */
void
page_copy_rec_list_end_to_created_page(
/*===================================*/
page_t* new_page, /*!< in/out: index page to copy to */
buf_block_t* block, /*!< in/out: index page to copy to */
rec_t* rec, /*!< in: first record to copy */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr) /*!< in: mtr */
{
page_dir_slot_t* slot = 0; /* remove warning */
page_t* new_page = block->frame;
byte* heap_top;
rec_t* insert_rec = 0; /* remove warning */
rec_t* prev_rec;
......@@ -2063,6 +2163,8 @@ page_copy_rec_list_end_to_created_page(
offset_t* offsets = offsets_;
rec_offs_init(offsets_);
/* The MLOG_(COMP_)LIST_END_COPY_CREATED record was never emitted
for ROW_FORMAT=COMPRESSED pages. */
ut_ad(!block->page.zip.data);
ut_ad(page_dir_get_n_heap(new_page) == PAGE_HEAP_NO_USER_LOW);
ut_ad(page_align(rec) != new_page);
ut_ad(page_rec_is_comp(rec) == page_is_comp(new_page));
......@@ -2084,9 +2186,10 @@ page_copy_rec_list_end_to_created_page(
#ifdef UNIV_DEBUG
/* To pass the debug tests we have to set these dummy values
in the debug version */
page_dir_set_n_slots(new_page, NULL, srv_page_size / 2);
page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP,
new_page + srv_page_size - 1);
mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + new_page,
srv_page_size / 2);
mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + new_page,
srv_page_size - 1);
#endif
prev_rec = page_get_infimum_rec(new_page);
if (page_is_comp(new_page)) {
......@@ -2105,22 +2208,21 @@ page_copy_rec_list_end_to_created_page(
ULINT_UNDEFINED, &heap);
insert_rec = rec_copy(heap_top, rec, offsets);
if (page_is_comp(new_page)) {
const bool comp = page_is_comp(new_page) != 0;
if (comp) {
rec_set_next_offs_new(prev_rec,
page_offset(insert_rec));
rec_set_n_owned_new(insert_rec, NULL, 0);
rec_set_heap_no_new(insert_rec,
PAGE_HEAP_NO_USER_LOW + n_recs);
} else {
rec_set_next_offs_old(prev_rec,
page_offset(insert_rec));
rec_set_n_owned_old(insert_rec, 0);
rec_set_heap_no_old(insert_rec,
PAGE_HEAP_NO_USER_LOW + n_recs);
}
page_rec_set_n_owned<false>(block, insert_rec, 0, comp, mtr);
rec_set_heap_no(insert_rec, PAGE_HEAP_NO_USER_LOW + n_recs,
page_is_comp(new_page));
count++;
n_recs++;
......@@ -2130,9 +2232,9 @@ page_copy_rec_list_end_to_created_page(
slot_index++;
slot = page_dir_get_nth_slot(new_page, slot_index);
page_dir_slot_set_rec(slot, insert_rec);
page_dir_slot_set_n_owned(slot, NULL, count);
mach_write_to_2(slot, page_offset(insert_rec));
page_dir_slot_set_n_owned<false>(block, slot, count,
mtr);
count = 0;
}
......@@ -2164,7 +2266,7 @@ page_copy_rec_list_end_to_created_page(
count += (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2;
page_dir_slot_set_n_owned(slot, NULL, 0);
page_dir_slot_set_n_owned<false>(block, slot, 0, mtr);
slot_index--;
}
......@@ -2173,18 +2275,24 @@ page_copy_rec_list_end_to_created_page(
mem_heap_free(heap);
}
slot = page_dir_get_nth_slot(new_page, 1 + slot_index);
if (page_is_comp(new_page)) {
rec_set_next_offs_new(insert_rec, PAGE_NEW_SUPREMUM);
mach_write_to_2(slot, PAGE_NEW_SUPREMUM);
rec_set_bit_field_1(new_page + PAGE_NEW_SUPREMUM, count + 1,
REC_NEW_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
} else {
rec_set_next_offs_old(insert_rec, PAGE_OLD_SUPREMUM);
mach_write_to_2(slot, PAGE_OLD_SUPREMUM);
rec_set_bit_field_1(new_page + PAGE_OLD_SUPREMUM, count + 1,
REC_OLD_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
}
slot = page_dir_get_nth_slot(new_page, 1 + slot_index);
page_dir_slot_set_rec(slot, page_get_supremum_rec(new_page));
page_dir_slot_set_n_owned(slot, NULL, count + 1);
page_dir_set_n_slots(new_page, NULL, 2 + slot_index);
mach_write_to_2(PAGE_HEADER + PAGE_N_DIR_SLOTS + new_page,
2 + slot_index);
page_header_set_ptr(new_page, NULL, PAGE_HEAP_TOP, heap_top);
page_dir_set_n_heap(new_page, NULL, PAGE_HEAP_NO_USER_LOW + n_recs);
page_header_set_field(new_page, NULL, PAGE_N_RECS, n_recs);
......@@ -2195,41 +2303,10 @@ page_copy_rec_list_end_to_created_page(
new_page, NULL);
}
/***********************************************************//**
Writes log record of a record delete on a page. */
UNIV_INLINE
void
page_cur_delete_rec_write_log(
/*==========================*/
rec_t* rec, /*!< in: record to be deleted */
const dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr) /*!< in: mini-transaction handle */
{
byte* log_ptr;
ut_ad(!!page_rec_is_comp(rec) == dict_table_is_comp(index->table));
ut_ad(mtr->is_named_space(index->table->space));
log_ptr = mlog_open_and_write_index(mtr, rec, index,
page_rec_is_comp(rec)
? MLOG_COMP_REC_DELETE
: MLOG_REC_DELETE, 2);
if (!log_ptr) {
/* Logging in mtr is switched off during crash recovery:
in that case mlog_open returns NULL */
return;
}
/* Write the cursor rec offset as a 2-byte ulint */
mach_write_to_2(log_ptr, page_offset(rec));
mlog_close(mtr, log_ptr + 2);
}
/***********************************************************//**
Parses log record of a record delete on a page.
@return pointer to record end or NULL */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_cur_parse_delete_rec(
/*======================*/
......@@ -2283,33 +2360,40 @@ page_cur_parse_delete_rec(
}
/** Prepend a record to the PAGE_FREE list.
@param[in,out] page index page
@param[in,out] page_zip ROW_FORMAT=COMPRESSED page, or NULL
@param[in,out] rec record being deleted
@param[in] index the index that the page belongs to
@param[in] offsets rec_get_offsets(rec, index) */
static void page_mem_free(page_t* page, page_zip_des_t* page_zip, rec_t* rec,
const dict_index_t* index, const offset_t* offsets)
@param[in,out] block index page
@param[in,out] rec record being deleted
@param[in] index the index that the page belongs to
@param[in] offsets rec_get_offsets(rec, index)
@param[in,out] mtr mini-transaction */
static void page_mem_free(buf_block_t *block, rec_t *rec,
const dict_index_t *index, const offset_t *offsets,
mtr_t *mtr)
{
ut_ad(rec_offs_validate(rec, index, offsets));
const rec_t* free = page_header_get_ptr(page, PAGE_FREE);
if (srv_immediate_scrub_data_uncompressed) {
/* scrub record */
memset(rec, 0, rec_offs_data_size(offsets));
}
page_rec_set_next(rec, free);
page_header_set_ptr(page, page_zip, PAGE_FREE, rec);
page_header_set_field(page, page_zip, PAGE_GARBAGE,
rec_offs_size(offsets)
+ page_header_get_field(page, PAGE_GARBAGE));
if (page_zip) {
page_zip_dir_delete(page_zip, rec, index, offsets, free);
} else {
page_header_set_field(page, page_zip, PAGE_N_RECS,
ulint(page_get_n_recs(page)) - 1);
}
ut_ad(rec_offs_validate(rec, index, offsets));
ut_ad(page_align(rec) == block->frame);
const rec_t *free= page_header_get_ptr(block->frame, PAGE_FREE);
if (UNIV_LIKELY_NULL(block->page.zip.data))
page_zip_dir_delete(&block->page.zip, rec, index, offsets, free, mtr);
else
{
if (srv_immediate_scrub_data_uncompressed)
mtr->memset(block, page_offset(rec), rec_offs_data_size(offsets), 0);
uint16_t next= free
? (page_is_comp(block->frame)
? static_cast<uint16_t>(free - rec)
: static_cast<uint16_t>(page_offset(free)))
: 0;
mtr->write<2>(*block, rec - REC_NEXT, next);
mtr->write<2>(*block, PAGE_FREE + PAGE_HEADER + block->frame,
page_offset(rec));
mtr->write<2>(*block, PAGE_GARBAGE + PAGE_HEADER + block->frame,
rec_offs_size(offsets)
+ page_header_get_field(block->frame, PAGE_GARBAGE));
mtr->write<2>(*block, PAGE_N_RECS + PAGE_HEADER + block->frame,
ulint(page_get_n_recs(block->frame)) - 1);
}
}
/***********************************************************//**
......@@ -2356,12 +2440,15 @@ page_cur_delete_rec(
ut_ad(page_rec_is_user_rec(current_rec));
if (page_get_n_recs(block->frame) == 1
#if 1 /* MDEV-12353 TODO: skip this for the physical log format */
/* Empty the page, unless we are applying the redo log
during crash recovery. During normal operation, the
page_create_empty() gets logged as one of MLOG_PAGE_CREATE,
MLOG_COMP_PAGE_CREATE, MLOG_ZIP_PAGE_COMPRESS. */
&& !recv_recovery_is_on()
#endif
&& !rec_is_alter_metadata(current_rec, *index)) {
/* Empty the page, unless we are applying the redo log
during crash recovery. During normal operation, the
page_create_empty() gets logged as one of MLOG_PAGE_CREATE,
MLOG_COMP_PAGE_CREATE, MLOG_ZIP_PAGE_COMPRESS. */
/* Empty the page. */
ut_ad(page_is_leaf(block->frame));
/* Usually, this should be the root page,
and the whole index tree should become empty.
......@@ -2383,10 +2470,7 @@ page_cur_delete_rec(
/* 1. Reset the last insert info in the page header and increment
the modify clock for the frame */
page_zip_des_t* const page_zip = buf_block_get_page_zip(block);
page_header_set_ptr(block->frame, page_zip, PAGE_LAST_INSERT, NULL);
page_header_reset_last_insert(block, mtr);
/* The page gets invalid for btr_pcur_restore_pos().
We avoid invoking buf_block_modify_clock_inc(block) because its
......@@ -2394,8 +2478,6 @@ page_cur_delete_rec(
used during IMPORT TABLESPACE. */
block->modify_clock++;
page_cur_delete_rec_write_log(current_rec, index, mtr);
/* 2. Find the next and the previous record. Note that the cursor is
left at the next record. */
......@@ -2416,34 +2498,72 @@ page_cur_delete_rec(
next_rec = cursor->rec;
/* 3. Remove the record from the linked list of records */
page_rec_set_next(prev_rec, next_rec);
/* 4. If the deleted record is pointed to by a dir slot, update the
record pointer in slot. In the following if-clause we assume that
prev_rec is owned by the same slot, i.e., PAGE_DIR_SLOT_MIN_N_OWNED
>= 2. */
/* 5. Update the number of owned records of the slot */
compile_time_assert(PAGE_DIR_SLOT_MIN_N_OWNED >= 2);
ut_ad(cur_n_owned > 1);
if (current_rec == page_dir_slot_get_rec(cur_dir_slot)) {
page_dir_slot_set_rec(cur_dir_slot, prev_rec);
}
rec_t* slot_rec = const_cast<rec_t*>
(page_dir_slot_get_rec(cur_dir_slot));
if (UNIV_LIKELY_NULL(block->page.zip.data)) {
ut_ad(page_is_comp(block->frame));
if (current_rec == slot_rec) {
page_zip_rec_set_owned(block, prev_rec, 1, mtr);
page_zip_rec_set_owned(block, slot_rec, 0, mtr);
slot_rec = prev_rec;
mach_write_to_2(cur_dir_slot, page_offset(slot_rec));
} else if (cur_n_owned == 1
&& !page_rec_is_supremum(slot_rec)) {
page_zip_rec_set_owned(block, slot_rec, 0, mtr);
}
/* 5. Update the number of owned records of the slot */
mach_write_to_2(prev_rec - REC_NEXT, static_cast<uint16_t>
(next_rec - prev_rec));
mach_write_to_1(slot_rec - REC_NEW_N_OWNED,
(slot_rec[-REC_NEW_N_OWNED]
& ~REC_N_OWNED_MASK)
| (cur_n_owned - 1) << REC_N_OWNED_SHIFT);
} else {
if (current_rec == slot_rec) {
slot_rec = prev_rec;
mtr->write<2>(*block, cur_dir_slot,
page_offset(slot_rec));
}
page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);
if (page_is_comp(block->frame)) {
mtr->write<2>(*block, prev_rec - REC_NEXT,
static_cast<uint16_t>
(next_rec - prev_rec));
mtr->write<1>(*block, slot_rec - REC_NEW_N_OWNED,
(slot_rec[-REC_NEW_N_OWNED]
& ~REC_N_OWNED_MASK)
| (cur_n_owned - 1)
<< REC_N_OWNED_SHIFT);
} else {
mtr->write<2>(*block, prev_rec - REC_NEXT,
page_offset(next_rec));
mtr->write<1>(*block, slot_rec - REC_OLD_N_OWNED,
(slot_rec[-REC_OLD_N_OWNED]
& ~REC_N_OWNED_MASK)
| (cur_n_owned - 1)
<< REC_N_OWNED_SHIFT);
}
}
/* 6. Free the memory occupied by the record */
page_mem_free(block->frame, page_zip, current_rec, index, offsets);
page_mem_free(block, current_rec, index, offsets, mtr);
/* 7. Now we have decremented the number of owned records of the slot.
If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
slots. */
if (cur_n_owned <= PAGE_DIR_SLOT_MIN_N_OWNED) {
page_dir_balance_slot(block->frame, page_zip, cur_slot_no);
page_dir_balance_slot(block, cur_slot_no, mtr);
}
}
......
......@@ -842,35 +842,10 @@ page_copy_rec_list_start(
return(ret);
}
/**********************************************************//**
Writes a log record of a record list end or start deletion. */
UNIV_INLINE
void
page_delete_rec_list_write_log(
/*===========================*/
rec_t* rec, /*!< in: record on page */
dict_index_t* index, /*!< in: record descriptor */
mlog_id_t type, /*!< in: operation type:
MLOG_LIST_END_DELETE, ... */
mtr_t* mtr) /*!< in: mtr */
{
byte* log_ptr;
ut_ad(type == MLOG_LIST_END_DELETE
|| type == MLOG_LIST_START_DELETE
|| type == MLOG_COMP_LIST_END_DELETE
|| type == MLOG_COMP_LIST_START_DELETE);
log_ptr = mlog_open_and_write_index(mtr, rec, index, type, 2);
if (log_ptr) {
/* Write the parameter as a 2-byte ulint */
mach_write_to_2(log_ptr, page_offset(rec));
mlog_close(mtr, log_ptr + 2);
}
}
/**********************************************************//**
Parses a log record of a record list end or start deletion.
@return end of log record or NULL */
ATTRIBUTE_COLD /* only used when crash-upgrading */
const byte*
page_parse_delete_rec_list(
/*=======================*/
......@@ -993,29 +968,20 @@ page_delete_rec_list_end(
}
}
/* Reset the last insert info in the page header and increment
the modify clock for the frame */
page_header_set_ptr(block->frame, page_zip, PAGE_LAST_INSERT, NULL);
/* The page gets invalid for optimistic searches: increment the
frame modify clock */
buf_block_modify_clock_inc(block);
page_delete_rec_list_write_log(rec, index, page_is_comp(block->frame)
? MLOG_COMP_LIST_END_DELETE
: MLOG_LIST_END_DELETE, mtr);
const bool is_leaf = page_is_leaf(block->frame);
byte* last_insert = my_assume_aligned<2>(PAGE_LAST_INSERT + PAGE_HEADER
+ block->frame);
if (page_zip) {
mtr_log_t log_mode;
if (UNIV_LIKELY_NULL(page_zip)) {
ut_ad(page_is_comp(block->frame));
/* Individual deletes are not logged */
log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
memset(last_insert, 0, 2);
page_zip_write_header(page_zip, last_insert, 2, mtr);
do {
page_cur_t cur;
......@@ -1034,12 +1000,11 @@ page_delete_rec_list_end(
mem_heap_free(heap);
}
/* Restore log mode */
mtr_set_log_mode(mtr, log_mode);
return;
}
mtr->write<2,mtr_t::OPT>(*block, last_insert, 0U);
prev_rec = page_rec_get_prev(rec);
last_rec = page_rec_get_prev(page_get_supremum_rec(block->frame));
......@@ -1100,6 +1065,20 @@ page_delete_rec_list_end(
slot_index = page_dir_find_owner_slot(rec2);
ut_ad(slot_index > 0);
slot = page_dir_get_nth_slot(block->frame, slot_index);
mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_NEW_SUPREMUM);
byte* owned = PAGE_NEW_SUPREMUM - REC_NEW_N_OWNED
+ block->frame;
byte new_owned = (*owned & ~REC_N_OWNED_MASK)
| static_cast<byte>(n_owned << REC_N_OWNED_SHIFT);
mtr->write<1,mtr_t::OPT>(*block, owned, new_owned);
mtr->write<2>(*block, prev_rec - REC_NEXT,
static_cast<uint16_t>
(PAGE_NEW_SUPREMUM - page_offset(prev_rec)));
uint16_t free = page_header_get_field(block->frame, PAGE_FREE);
mtr->write<2>(*block, last_rec - REC_NEXT, free
? static_cast<uint16_t>
(free - page_offset(last_rec))
: 0U);
} else {
rec_t* rec2 = rec;
ulint count = 0;
......@@ -1116,29 +1095,32 @@ page_delete_rec_list_end(
slot_index = page_dir_find_owner_slot(rec2);
ut_ad(slot_index > 0);
slot = page_dir_get_nth_slot(block->frame, slot_index);
mtr->write<2,mtr_t::OPT>(*block, slot, PAGE_OLD_SUPREMUM);
byte* owned = PAGE_OLD_SUPREMUM - REC_OLD_N_OWNED
+ block->frame;
byte new_owned = (*owned & ~REC_N_OWNED_MASK)
| static_cast<byte>(n_owned << REC_N_OWNED_SHIFT);
mtr->write<1,mtr_t::OPT>(*block, owned, new_owned);
mtr->write<2>(*block, prev_rec - REC_NEXT, PAGE_OLD_SUPREMUM);
mtr->write<2>(*block, last_rec - REC_NEXT,
page_header_get_field(block->frame, PAGE_FREE));
}
page_dir_slot_set_rec(slot, page_get_supremum_rec(block->frame));
page_dir_slot_set_n_owned(slot, NULL, n_owned);
page_dir_set_n_slots(block->frame, NULL, slot_index + 1);
/* Remove the record chain segment from the record chain */
page_rec_set_next(prev_rec, page_get_supremum_rec(block->frame));
mtr->write<2,mtr_t::OPT>(*block, PAGE_N_DIR_SLOTS + PAGE_HEADER
+ block->frame, slot_index + 1);
/* Catenate the deleted chain segment to the page free list */
page_rec_set_next(last_rec, page_header_get_ptr(block->frame,
PAGE_FREE));
page_header_set_ptr(block->frame, NULL, PAGE_FREE, rec);
mtr->write<2>(*block, PAGE_FREE + PAGE_HEADER + block->frame,
page_offset(rec));
byte* garbage = my_assume_aligned<2>(PAGE_GARBAGE + PAGE_HEADER
+ block->frame);
mtr->write<2>(*block, garbage, size + mach_read_from_2(garbage));
page_header_set_field(block->frame, NULL, PAGE_GARBAGE, size
+ page_header_get_field(block->frame,
PAGE_GARBAGE));
ut_ad(page_get_n_recs(block->frame) > n_recs);
page_header_set_field(block->frame, NULL, PAGE_N_RECS,
ulint{page_get_n_recs(block->frame) - n_recs});
byte* page_n_recs = my_assume_aligned<2>(PAGE_N_RECS + PAGE_HEADER
+ block->frame);
mtr->write<2>(*block, page_n_recs,
ulint{mach_read_from_2(page_n_recs)} - n_recs);
}
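Both branches above perform the same free-list catenation, differing only in how REC_NEXT is encoded: new-style (COMPACT) records store an offset relative to the record itself, where a zero link terminates the list, while old-style records store the absolute page offset. A toy model of the header bookkeeping and of both encodings, assuming nothing beyond the arithmetic visible above (all names are illustrative, not the InnoDB API):

#include <cassert>
#include <cstdint>

// Toy model of the three page header fields updated above.
struct PageModel {
  uint16_t page_free    = 0;  // PAGE_FREE: head of free list (0 = empty)
  uint16_t page_garbage = 0;  // PAGE_GARBAGE: total freed bytes
  uint16_t page_n_recs  = 0;  // PAGE_N_RECS: user records on page
};

// COMPACT: relative link, zero-terminated, hence the
// "free ? free - page_offset(last_rec) : 0" above.
static uint16_t free_link_new(uint16_t last_rec_offset, uint16_t free_head)
{
  return free_head ? uint16_t(free_head - last_rec_offset) : 0;
}

// REDUNDANT: absolute page offset (0 terminates).
static uint16_t free_link_old(uint16_t free_head)
{
  return free_head;
}

static void catenate_to_free_list(PageModel& p, uint16_t first_rec_offset,
                                  uint16_t bytes, uint16_t n_recs)
{
  p.page_free    = first_rec_offset;          // new free-list head
  p.page_garbage = uint16_t(p.page_garbage + bytes);
  p.page_n_recs  = uint16_t(p.page_n_recs - n_recs);
}

int main()
{
  PageModel p;
  p.page_n_recs = 10;
  // Delete a 3-record chain starting at offset 400, 120 bytes total,
  // whose last record sits at offset 500; the free list was empty:
  assert(free_link_new(500, p.page_free) == 0);
  catenate_to_free_list(p, 400, 120, 3);
  assert(p.page_free == 400 && p.page_garbage == 120 && p.page_n_recs == 7);
  // A later chain links to the previous head at offset 400:
  assert(free_link_new(650, p.page_free) == uint16_t(400 - 650));
  assert(free_link_old(p.page_free) == 400);
}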
/*************************************************************//**
......@@ -1187,22 +1169,9 @@ page_delete_rec_list_start(
return;
}
mlog_id_t type;
if (page_rec_is_comp(rec)) {
type = MLOG_COMP_LIST_START_DELETE;
} else {
type = MLOG_LIST_START_DELETE;
}
page_delete_rec_list_write_log(rec, index, type, mtr);
page_cur_set_before_first(block, &cur1);
page_cur_move_to_next(&cur1);
/* Individual deletes are not logged */
mtr_log_t log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
const bool is_leaf = page_rec_is_leaf(rec);
while (page_cur_get_rec(&cur1) != rec) {
......@@ -1215,10 +1184,6 @@ page_delete_rec_list_start(
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
/* Restore log mode */
mtr_set_log_mode(mtr, log_mode);
}
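The save/restore idiom above (switching the mini-transaction to MTR_LOG_NONE, then restoring the previous mode) is how whole-list deletion used to suppress per-record redo before physical logging made it unnecessary. A minimal model of the idiom; the toy mtr_t below is illustrative, but the setter returning the previous mode matches the usage visible above:

#include <cassert>

// Toy model of the mtr log-mode save/restore idiom.
enum mtr_log_t { MTR_LOG_ALL, MTR_LOG_NONE };

struct mtr_t { mtr_log_t log_mode = MTR_LOG_ALL; };

// Returns the previous mode, as the assignment
// "mtr_log_t log_mode = mtr_set_log_mode(...)" above implies.
static mtr_log_t mtr_set_log_mode(mtr_t* mtr, mtr_log_t mode)
{
  mtr_log_t old = mtr->log_mode;
  mtr->log_mode = mode;
  return old;
}

int main()
{
  mtr_t mtr;
  mtr_log_t log_mode = mtr_set_log_mode(&mtr, MTR_LOG_NONE);
  // ... individual deletes would go unlogged here ...
  mtr_set_log_mode(&mtr, log_mode);   // restore
  assert(mtr.log_mode == MTR_LOG_ALL);
}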
/*************************************************************//**
......
......@@ -4223,7 +4223,8 @@ page_zip_clear_rec(
page_zip_des_t* page_zip, /*!< in/out: compressed page */
byte* rec, /*!< in: record to clear */
const dict_index_t* index, /*!< in: index of rec */
const offset_t* offsets) /*!< in: rec_get_offsets(rec, index) */
const offset_t* offsets, /*!< in: rec_get_offsets(rec, index) */
mtr_t* mtr) /*!< in/out: mini-transaction */
{
ulint heap_no;
page_t* page = page_align(rec);
......@@ -4256,11 +4257,20 @@ page_zip_clear_rec(
rec_offs_n_fields(offsets) - 1,
&len);
ut_ad(len == REC_NODE_PTR_SIZE);
ut_ad(!rec_offs_any_extern(offsets));
memset(field, 0, REC_NODE_PTR_SIZE);
memset(storage - (heap_no - 1) * REC_NODE_PTR_SIZE,
0, REC_NODE_PTR_SIZE);
storage -= (heap_no - 1) * REC_NODE_PTR_SIZE;
clear_page_zip:
/* TODO: write MEMSET record */
memset(storage, 0, len);
if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + len)) {
log_ptr = mlog_write_initial_log_record_fast(
rec, MLOG_ZIP_WRITE_STRING, log_ptr, mtr);
mach_write_to_2(log_ptr, storage - page_zip->data);
mach_write_to_2(log_ptr + 2, len);
memcpy(log_ptr + 4, storage, len);
mlog_close(mtr, log_ptr + 4 + len);
}
} else if (dict_index_is_clust(index)) {
/* Clear trx_id and roll_ptr. On the compressed page,
there is an array of these fields immediately before the
......@@ -4269,14 +4279,9 @@ page_zip_clear_rec(
= dict_col_get_clust_pos(
dict_table_get_sys_col(
index->table, DATA_TRX_ID), index);
storage = page_zip_dir_start(page_zip);
field = rec_get_nth_field(rec, offsets, trx_id_pos, &len);
ut_ad(len == DATA_TRX_ID_LEN);
memset(field, 0, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
memset(storage - (heap_no - 1)
* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN),
0, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
if (rec_offs_any_extern(offsets)) {
ulint i;
......@@ -4295,6 +4300,12 @@ page_zip_clear_rec(
}
}
}
len = DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
storage = page_zip_dir_start(page_zip)
- (heap_no - 1)
* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
goto clear_page_zip;
} else {
ut_ad(!rec_offs_any_extern(offsets));
}
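Both paths of page_zip_clear_rec() above funnel into one MLOG_ZIP_WRITE_STRING record: an initial header of up to 11 bytes (one type byte plus the compressed space id and page number, matching the mlog_open(mtr, 11 + 2 + 2 + len) reservation), then a 2-byte offset into the compressed frame, a 2-byte length, and the raw bytes. A standalone sketch of the variable body, assuming the big-endian encoding of mach_write_to_2() (the helpers are illustrative, not the mlog API):

#include <cstdint>
#include <vector>

// Big-endian 2-byte write, as mach_write_to_2() does in the source.
static void write2(std::vector<uint8_t>& buf, uint16_t v)
{
  buf.push_back(uint8_t(v >> 8));
  buf.push_back(uint8_t(v));
}

// Illustrative body of an MLOG_ZIP_WRITE_STRING record (the header
// written by mlog_write_initial_log_record_fast() is omitted):
// 2 bytes offset within page_zip->data, 2 bytes length, then the data.
static std::vector<uint8_t> zip_write_string_body(
  uint16_t offset, const uint8_t* data, uint16_t len)
{
  std::vector<uint8_t> body;
  write2(body, offset);
  write2(body, len);
  body.insert(body.end(), data, data + len);
  return body;
}

int main()
{
  // Clearing DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN bytes would pass
  // len = 6 + 7 = 13; here a 6-byte REC_NODE_PTR_SIZE-style example:
  const uint8_t zeroes[6] = {0, 0, 0, 0, 0, 0};
  std::vector<uint8_t> body = zip_write_string_body(0x1234, zeroes, 6);
  return body.size() == 2 + 2 + 6 ? 0 : 1;
}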
......@@ -4338,18 +4349,33 @@ must already have been written on the uncompressed page. */
void
page_zip_rec_set_owned(
/*===================*/
page_zip_des_t* page_zip,/*!< in/out: compressed page */
buf_block_t* block, /*!< in/out: ROW_FORMAT=COMPRESSED page */
const byte* rec, /*!< in: record on the uncompressed page */
ulint flag) /*!< in: the owned flag (nonzero=TRUE) */
ulint flag, /*!< in: the owned flag (nonzero=TRUE) */
mtr_t* mtr) /*!< in/out: mini-transaction */
{
ut_ad(page_align(rec) == block->frame);
page_zip_des_t* const page_zip = &block->page.zip;
byte* slot = page_zip_dir_find(page_zip, page_offset(rec));
ut_a(slot);
UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
const byte b = *slot;
if (flag) {
*slot |= (PAGE_ZIP_DIR_SLOT_OWNED >> 8);
} else {
*slot &= ~(PAGE_ZIP_DIR_SLOT_OWNED >> 8);
}
if (b == *slot) {
} else if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2 + 1)) {
log_ptr = mlog_write_initial_log_record_low(
MLOG_ZIP_WRITE_STRING,
block->page.id.space(), block->page.id.page_no(),
log_ptr, mtr);
mach_write_to_2(log_ptr, slot - page_zip->data);
mach_write_to_2(log_ptr + 2, 1);
log_ptr[4] = *slot;
mlog_close(mtr, log_ptr + 5);
}
}
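A dense directory slot is two big-endian bytes holding the record's page offset plus flag bits in the high byte, which is why only the first slot byte is touched above. A sketch assuming PAGE_ZIP_DIR_SLOT_OWNED = 0x4000 as in page0zip (the helper and demo slot are illustrative):

#include <cassert>
#include <cstdint>

// Assumed flag bit in a dense directory slot (page0zip).
static const uint16_t PAGE_ZIP_DIR_SLOT_OWNED = 0x4000;

// The slot is stored big-endian, so the flag lives in the first byte,
// hence the (PAGE_ZIP_DIR_SLOT_OWNED >> 8) in page_zip_rec_set_owned().
static void set_owned(uint8_t* slot, bool flag)
{
  if (flag)
    slot[0] |= uint8_t(PAGE_ZIP_DIR_SLOT_OWNED >> 8);
  else
    slot[0] &= uint8_t(~(PAGE_ZIP_DIR_SLOT_OWNED >> 8));
}

int main()
{
  uint8_t slot[2] = {0x01, 0x2c};   // record at page offset 0x012c
  set_owned(slot, true);
  assert(slot[0] == 0x41 && slot[1] == 0x2c);
  set_owned(slot, false);
  assert(slot[0] == 0x01);
}

The b == *slot comparison above then lets the function skip the MLOG_ZIP_WRITE_STRING record entirely when the flag already had the desired value.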
/**********************************************************************//**
......@@ -4442,12 +4468,12 @@ page_zip_dir_delete(
byte* rec, /*!< in: deleted record */
const dict_index_t* index, /*!< in: index of rec */
const offset_t* offsets, /*!< in: rec_get_offsets(rec) */
const byte* free) /*!< in: previous start of
const byte* free, /*!< in: previous start of
the free list */
mtr_t* mtr) /*!< in/out: mini-transaction */
{
byte* slot_rec;
byte* slot_free;
ulint n_ext;
page_t* page = page_align(rec);
ut_ad(rec_offs_validate(rec, index, offsets));
......@@ -4458,6 +4484,15 @@ page_zip_dir_delete(
UNIV_MEM_ASSERT_RW(rec - rec_offs_extra_size(offsets),
rec_offs_extra_size(offsets));
mach_write_to_2(rec - REC_NEXT, free
? static_cast<uint16_t>(free - rec) : 0);
mach_write_to_2(PAGE_FREE + PAGE_HEADER + page, page_offset(rec));
byte* garbage = PAGE_GARBAGE + PAGE_HEADER + page;
mach_write_to_2(garbage, rec_offs_size(offsets)
+ mach_read_from_2(garbage));
compile_time_assert(PAGE_GARBAGE == PAGE_FREE + 2);
page_zip_write_header(page_zip, PAGE_FREE + PAGE_HEADER + page,
4, mtr);
slot_rec = page_zip_dir_find(page_zip, page_offset(rec));
ut_a(slot_rec);
......@@ -4465,8 +4500,9 @@ page_zip_dir_delete(
ut_ad(n_recs);
ut_ad(n_recs > 1 || page_get_page_no(page) == index->page);
/* This could not be done before page_zip_dir_find(). */
page_header_set_field(page, page_zip, PAGE_N_RECS,
n_recs - 1);
mach_write_to_2(PAGE_N_RECS + PAGE_HEADER + page, n_recs - 1);
page_zip_write_header(page_zip, PAGE_N_RECS + PAGE_HEADER + page,
2, mtr);
if (UNIV_UNLIKELY(!free)) {
/* Make the last slot the start of the free list. */
......@@ -4482,22 +4518,34 @@ page_zip_dir_delete(
slot_free += PAGE_ZIP_DIR_SLOT_SIZE;
}
if (UNIV_LIKELY(slot_rec > slot_free)) {
const ulint slot_len = slot_rec > slot_free
? ulint(slot_rec - slot_free)
: 0;
if (slot_len) {
memmove_aligned<2>(slot_free + PAGE_ZIP_DIR_SLOT_SIZE,
slot_free, ulint(slot_rec - slot_free));
slot_free, slot_len);
/* TODO: issue MEMMOVE record to reduce log volume */
}
/* Write the entry for the deleted record.
The "owned" and "deleted" flags will be cleared. */
mach_write_to_2(slot_free, page_offset(rec));
if (!page_is_leaf(page) || !dict_index_is_clust(index)) {
ut_ad(!rec_offs_any_extern(offsets));
goto skip_blobs;
if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2)) {
log_ptr = mlog_write_initial_log_record_fast(
rec, MLOG_ZIP_WRITE_STRING, log_ptr, mtr);
mach_write_to_2(log_ptr, slot_free - page_zip->data);
mach_write_to_2(log_ptr + 2, slot_len
+ PAGE_ZIP_DIR_SLOT_SIZE);
mlog_close(mtr, log_ptr + 4);
mlog_catenate_string(mtr, slot_free, slot_len
+ PAGE_ZIP_DIR_SLOT_SIZE);
}
n_ext = rec_offs_n_extern(offsets);
if (UNIV_UNLIKELY(n_ext != 0)) {
if (const ulint n_ext = rec_offs_n_extern(offsets)) {
ut_ad(index->is_primary());
ut_ad(page_is_leaf(page));
/* Shift and zero fill the array of BLOB pointers. */
ulint blob_no;
byte* externs;
......@@ -4510,24 +4558,34 @@ page_zip_dir_delete(
- (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW)
* PAGE_ZIP_CLUST_LEAF_SLOT_SIZE;
ext_end = externs - page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE;
externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE;
ext_end = externs - page_zip->n_blobs * FIELD_REF_SIZE;
page_zip->n_blobs -= static_cast<unsigned>(n_ext);
/* Shift and zero fill the array. */
memmove(ext_end + n_ext * BTR_EXTERN_FIELD_REF_SIZE, ext_end,
ulint(page_zip->n_blobs - blob_no)
memmove(ext_end + n_ext * FIELD_REF_SIZE, ext_end,
ulint(page_zip->n_blobs - n_ext - blob_no)
* BTR_EXTERN_FIELD_REF_SIZE);
memset(ext_end, 0, n_ext * BTR_EXTERN_FIELD_REF_SIZE);
memset(ext_end, 0, n_ext * FIELD_REF_SIZE);
/* TODO: use MEMMOVE and MEMSET records to reduce volume */
const ulint ext_len = ulint(page_zip->n_blobs - blob_no)
* FIELD_REF_SIZE;
if (byte* log_ptr = mlog_open(mtr, 11 + 2 + 2)) {
log_ptr = mlog_write_initial_log_record_fast(
rec, MLOG_ZIP_WRITE_STRING, log_ptr, mtr);
mach_write_to_2(log_ptr, ext_end - page_zip->data);
mach_write_to_2(log_ptr + 2, ext_len);
mlog_close(mtr, log_ptr + 4);
mlog_catenate_string(mtr, ext_end, ext_len);
}
page_zip->n_blobs -= static_cast<unsigned>(n_ext);
}
skip_blobs:
/* The compression algorithm expects info_bits and n_owned
to be 0 for deleted records. */
rec[-REC_N_NEW_EXTRA_BYTES] = 0; /* info_bits and n_owned */
page_zip_clear_rec(page_zip, rec, index, offsets);
page_zip_clear_rec(page_zip, rec, index, offsets, mtr);
}
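The BLOB-pointer handling above shifts the downward-growing externs array up by n_ext slots and zero-fills the vacated bottom. A scaled-down model of that memmove()/memset() pair; REF stands in for BTR_EXTERN_FIELD_REF_SIZE (20 bytes in InnoDB), shrunk here for brevity, and the blob layout is as implied by the pointer arithmetic above:

#include <cassert>
#include <cstdint>
#include <cstring>

static const size_t REF = 4;   // illustrative, not the real 20 bytes

int main()
{
  // Five refs; blob 0 occupies the highest slot, just below "externs",
  // and ext_end is the lowest address of the array.
  uint8_t buf[5 * REF];
  uint8_t* externs = buf + sizeof buf;
  for (size_t j = 0; j < 5; j++)        // tag blob j's bytes with j+1
    memset(externs - (j + 1) * REF, int(j + 1), REF);

  size_t n_blobs = 5, blob_no = 1, n_ext = 2;  // delete blobs 1 and 2
  uint8_t* ext_end = externs - n_blobs * REF;

  // Shift blobs 3..4 up by n_ext slots, then zero-fill the bottom,
  // exactly like the memmove()/memset() pair above.
  memmove(ext_end + n_ext * REF, ext_end,
          (n_blobs - n_ext - blob_no) * REF);
  memset(ext_end, 0, n_ext * REF);
  n_blobs -= n_ext;

  assert(externs[-1] == 1);              // blob 0 untouched
  assert(*(externs - 2 * REF) == 4);     // old blob 3 is now slot 1
  assert(*(externs - 3 * REF) == 5);     // old blob 4 is now slot 2
  assert(ext_end[0] == 0 && ext_end[2 * REF - 1] == 0);
  (void)n_blobs;
}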
/**********************************************************************//**
......
......@@ -1416,7 +1416,8 @@ rec_convert_dtuple_to_rec_old(
/* Set the info bits of the record */
rec_set_info_bits_old(rec, dtuple_get_info_bits(dtuple)
& REC_INFO_BITS_MASK);
rec_set_heap_no_old(rec, PAGE_HEAP_NO_USER_LOW);
rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW, REC_OLD_HEAP_NO,
REC_HEAP_NO_MASK, REC_HEAP_NO_SHIFT);
/* Store the data and the offsets */
......@@ -1529,7 +1530,9 @@ rec_convert_dtuple_to_rec_comp(
ut_ad(n_fields == ulint(index->n_fields) + 1);
rec_set_n_add_field(nulls, n_fields - 1
- index->n_core_fields);
rec_set_heap_no_new(rec, PAGE_HEAP_NO_USER_LOW);
rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW,
REC_NEW_HEAP_NO, REC_HEAP_NO_MASK,
REC_HEAP_NO_SHIFT);
rec_set_status(rec, REC_STATUS_INSTANT);
n_node_ptr_field = ULINT_UNDEFINED;
lens = nulls - UT_BITS_IN_BYTES(index->n_nullable);
......@@ -1545,8 +1548,9 @@ rec_convert_dtuple_to_rec_comp(
case REC_STATUS_ORDINARY:
ut_ad(n_fields <= dict_index_get_n_fields(index));
if (!temp) {
rec_set_heap_no_new(rec, PAGE_HEAP_NO_USER_LOW);
rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW,
REC_NEW_HEAP_NO, REC_HEAP_NO_MASK,
REC_HEAP_NO_SHIFT);
rec_set_status(
rec, n_fields == index->n_core_fields
? REC_STATUS_ORDINARY
......@@ -1569,7 +1573,9 @@ rec_convert_dtuple_to_rec_comp(
break;
case REC_STATUS_NODE_PTR:
ut_ad(!temp);
rec_set_heap_no_new(rec, PAGE_HEAP_NO_USER_LOW);
rec_set_bit_field_2(rec, PAGE_HEAP_NO_USER_LOW,
REC_NEW_HEAP_NO, REC_HEAP_NO_MASK,
REC_HEAP_NO_SHIFT);
rec_set_status(rec, status);
ut_ad(n_fields
== dict_index_get_n_unique_in_tree_nonleaf(index) + 1);
......
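The rec_set_bit_field_2() calls that replace rec_set_heap_no_old()/rec_set_heap_no_new() above come down to one read-modify-write of a big-endian 2-byte word in the record header. A standalone model, assuming REC_HEAP_NO_MASK = 0xFFF8, REC_HEAP_NO_SHIFT = 3 and REC_NEW_HEAP_NO = 4 (assumed values; the helper below is a model, not the real rec_set_bit_field_2):

#include <cassert>
#include <cstdint>

// Assumed values from include/rem0rec.h.
static const uint16_t REC_HEAP_NO_MASK  = 0xFFF8;
static const unsigned REC_HEAP_NO_SHIFT = 3;

// Patch a bit field inside the big-endian 2-byte header word located
// `offs` bytes before the record origin, without any redo logging.
static void set_heap_no(uint8_t* rec, unsigned offs, uint16_t heap_no)
{
  uint8_t* b = rec - offs;
  uint16_t v = uint16_t((b[0] << 8) | b[1]);
  v = uint16_t((v & ~REC_HEAP_NO_MASK)
               | uint16_t(heap_no << REC_HEAP_NO_SHIFT));
  b[0] = uint8_t(v >> 8);
  b[1] = uint8_t(v);
}

int main()
{
  uint8_t buf[8] = {0};
  uint8_t* rec = buf + 8;   // pretend the record origin is here
  set_heap_no(rec, 4, 2);   // REC_NEW_HEAP_NO, PAGE_HEAP_NO_USER_LOW
  assert((((buf[4] << 8) | buf[5]) & REC_HEAP_NO_MASK)
         >> REC_HEAP_NO_SHIFT == 2);
}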