Commit 263932d5 authored by Marko Mäkelä

MDEV-33325 Crash in flst_read_addr on corrupted data

flst_read_addr(): Remove assertions. Instead, we will check these
conditions in the callers and avoid a crash in case of corruption.
We will check the conditions more carefully, because the callers
know more exact bounds for the page numbers and the byte offsets
within pages.
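
The relaxed reader now keeps only a debug assertion on the source
pointer; decoded addresses are returned as-is and validated by the
callers. A minimal sketch, mirroring the fut0lst.h hunk below:

    inline fil_addr_t flst_read_addr(const byte *faddr)
    {
      /* Only assert that faddr points into the payload of a page;
      do not crash on a corrupted page number or byte offset. */
      ut_ad(ut_align_offset(faddr, srv_page_size) >= FIL_PAGE_DATA);
      return fil_addr_t{mach_read_from_4(faddr + FIL_ADDR_PAGE),
                        mach_read_from_2(faddr + FIL_ADDR_BYTE)};
    }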

flst_remove(), flst_add_first(), flst_add_last(): Add a parameter
for passing fil_space_t::free_limit. None of the lists may point to
pages that are beyond the current initialized length of the
tablespace.
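
The bounds checks enabled by the new parameter follow one pattern
throughout the patch. A hypothetical helper, not part of the patch,
that sketches it (assuming the fil_addr_t layout and the
FIL_PAGE_DATA/FIL_PAGE_DATA_END constants of the InnoDB headers):

    /* Illustration only: a file-list address is acceptable if its
    page number lies below fil_space_t::free_limit (or is FIL_NULL
    where an empty link is allowed) and its byte offset points into
    the payload area of a page. */
    static bool flst_addr_is_sane(const fil_addr_t &addr, uint32_t limit,
                                  ulint physical_size, bool null_ok)
    {
      if (addr.page >= limit)
        return null_ok && addr.page == FIL_NULL;
      return addr.boffset >= FIL_PAGE_DATA &&
             addr.boffset < physical_size - FIL_PAGE_DATA_END;
    }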

trx_rseg_mem_restore(): Access the first page of the tablespace,
so that we will correctly recover rseg->space->free_limit
in case some log-based recovery is pending.
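
A sketch of that access, taken from the trx0rseg.cc hunk below: merely
reading the tablespace header page through the buffer pool applies any
pending redo log to it, which refreshes rseg->space->free_limit as a
side effect, and the page is released again right away:

    page_id_t page_id{rseg->space->id, 0};
    dberr_t err;
    if (!buf_page_get_gen(page_id, 0, RW_S_LATCH, nullptr, BUF_GET,
                          mtr, &err))
      return err;
    mtr->release_last_page();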

ibuf_remove_free_page(): Only look up the root page once, and
validate the last page number.
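
Roughly, per the ibuf0ibuf.cc hunk below, the root is latched once in
the main mini-transaction, the last page of PAGE_BTR_IBUF_FREE_LIST is
checked against the free limit of the system tablespace, and the
remembered savepoint is upgraded later instead of fetching the root a
second time (fragment, not a complete function):

    const auto root_savepoint = mtr.get_savepoint();
    buf_block_t* root = ibuf_tree_root_get(&mtr);   /* single lookup */
    const uint32_t page_no = flst_get_last(PAGE_HEADER
                                           + PAGE_BTR_IBUF_FREE_LIST
                                           + root->page.frame).page;
    if (page_no >= fil_system.sys_space->free_limit) {
      /* FIL_NULL (empty list) or a pointer past the initialized
      size of the tablespace: give up instead of crashing */
    }
    /* ... later, reuse the already latched root: */
    mtr.upgrade_buffer_fix(root_savepoint, RW_X_LATCH);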

Reviewed by: Debarun Banerjee
parent da47c037
......@@ -562,7 +562,8 @@ btr_page_alloc_for_ibuf(
{
buf_page_make_young_if_needed(&new_block->page);
*err= flst_remove(root, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, new_block,
PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);
PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
fil_system.sys_space->free_limit, mtr);
}
ut_d(if (*err == DB_SUCCESS)
flst_validate(root, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr));
......@@ -666,7 +667,8 @@ btr_page_free_for_ibuf(
buf_block_t *root= btr_get_latched_root(*index, mtr);
dberr_t err=
flst_add_first(root, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
block, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE, mtr);
block, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
fil_system.sys_space->free_limit, mtr);
ut_d(if (err == DB_SUCCESS)
flst_validate(root, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST, mtr));
return err;
......
......@@ -261,6 +261,7 @@ inline void xdes_init(const buf_block_t &block, xdes_t *descr, mtr_t *mtr)
}
/** Mark a page used in an extent descriptor.
@param[in] space tablespace
@param[in,out] seg_inode segment inode
@param[in,out] iblock segment inode page
@param[in] page page number
......@@ -270,7 +271,8 @@ inline void xdes_init(const buf_block_t &block, xdes_t *descr, mtr_t *mtr)
@return error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
fseg_mark_page_used(fseg_inode_t *seg_inode, buf_block_t *iblock,
fseg_mark_page_used(const fil_space_t *space,
fseg_inode_t *seg_inode, buf_block_t *iblock,
uint32_t page, xdes_t *descr, buf_block_t *xdes, mtr_t *mtr)
{
ut_ad(fil_page_get_type(iblock->page.frame) == FIL_PAGE_INODE);
......@@ -280,15 +282,16 @@ fseg_mark_page_used(fseg_inode_t *seg_inode, buf_block_t *iblock,
const uint16_t xoffset= uint16_t(descr - xdes->page.frame + XDES_FLST_NODE);
const uint16_t ioffset= uint16_t(seg_inode - iblock->page.frame);
const uint32_t limit= space->free_limit;
if (!xdes_get_n_used(descr))
{
/* We move the extent from the free list to the NOT_FULL list */
if (dberr_t err= flst_remove(iblock, uint16_t(FSEG_FREE + ioffset),
xdes, xoffset, mtr))
xdes, xoffset, limit, mtr))
return err;
if (dberr_t err= flst_add_last(iblock, uint16_t(FSEG_NOT_FULL + ioffset),
xdes, xoffset, mtr))
xdes, xoffset, limit, mtr))
return err;
}
......@@ -305,10 +308,10 @@ fseg_mark_page_used(fseg_inode_t *seg_inode, buf_block_t *iblock,
{
/* We move the extent from the NOT_FULL list to the FULL list */
if (dberr_t err= flst_remove(iblock, uint16_t(FSEG_NOT_FULL + ioffset),
xdes, xoffset, mtr))
xdes, xoffset, limit, mtr))
return err;
if (dberr_t err= flst_add_last(iblock, uint16_t(FSEG_FULL + ioffset),
xdes, xoffset, mtr))
xdes, xoffset, limit, mtr))
return err;
mtr->write<4>(*iblock, seg_inode + FSEG_NOT_FULL_N_USED,
not_full_n_used - FSP_EXTENT_SIZE);
......@@ -891,7 +894,7 @@ fsp_fill_free_list(
xdes_set_free<false>(*xdes, descr, FSP_IBUF_BITMAP_OFFSET, mtr);
xdes_set_state(*xdes, descr, XDES_FREE_FRAG, mtr);
if (dberr_t err= flst_add_last(header, FSP_HEADER_OFFSET + FSP_FREE_FRAG,
xdes, xoffset, mtr))
xdes, xoffset, space->free_limit, mtr))
return err;
byte *n_used= FSP_HEADER_OFFSET + FSP_FRAG_N_USED + header->page.frame;
mtr->write<4>(*header, n_used, 2U + mach_read_from_4(n_used));
......@@ -900,7 +903,7 @@ fsp_fill_free_list(
{
if (dberr_t err=
flst_add_last(header, FSP_HEADER_OFFSET + FSP_FREE,
xdes, xoffset, mtr))
xdes, xoffset, space->free_limit, mtr))
return err;
count++;
}
......@@ -951,7 +954,11 @@ static xdes_t *fsp_alloc_free_extent(fil_space_t *space, uint32_t hint,
first = flst_get_first(FSP_HEADER_OFFSET + FSP_FREE
+ header->page.frame);
if (first.page == FIL_NULL) {
if (first.page >= space->free_limit) {
if (first.page != FIL_NULL) {
goto flst_corrupted;
}
*err = fsp_fill_free_list(false, space, header, mtr);
if (UNIV_UNLIKELY(*err != DB_SUCCESS)) {
goto corrupted;
......@@ -962,6 +969,17 @@ static xdes_t *fsp_alloc_free_extent(fil_space_t *space, uint32_t hint,
if (first.page == FIL_NULL) {
return nullptr; /* No free extents left */
}
if (first.page >= space->free_limit) {
goto flst_corrupted;
}
}
if (first.boffset < FSP_HEADER_OFFSET + FSP_HEADER_SIZE
|| first.boffset >= space->physical_size()
- (XDES_SIZE + FIL_PAGE_DATA_END)) {
flst_corrupted:
*err = DB_CORRUPTION;
goto corrupted;
}
descr = xdes_lst_get_descriptor(*space, first, mtr,
......@@ -974,7 +992,7 @@ static xdes_t *fsp_alloc_free_extent(fil_space_t *space, uint32_t hint,
*err = flst_remove(header, FSP_HEADER_OFFSET + FSP_FREE, desc_block,
static_cast<uint16_t>(descr - desc_block->page.frame
+ XDES_FLST_NODE),
mtr);
space->free_limit, mtr);
if (UNIV_UNLIKELY(*err != DB_SUCCESS)) {
return nullptr;
}
......@@ -991,11 +1009,12 @@ MY_ATTRIBUTE((nonnull, warn_unused_result))
@param[in,out] xdes extent descriptor page
@param[in,out] descr extent descriptor
@param[in] bit slot to allocate in the extent
@param[in] space tablespace
@param[in,out] mtr mini-transaction
@return error code */
static dberr_t
fsp_alloc_from_free_frag(buf_block_t *header, buf_block_t *xdes, xdes_t *descr,
uint32_t bit, mtr_t *mtr)
uint32_t bit, fil_space_t *space, mtr_t *mtr)
{
if (UNIV_UNLIKELY(xdes_get_state(descr) != XDES_FREE_FRAG ||
!xdes_is_free(descr, bit)))
......@@ -1008,14 +1027,15 @@ fsp_alloc_from_free_frag(buf_block_t *header, buf_block_t *xdes, xdes_t *descr,
if (xdes_is_full(descr))
{
const uint32_t limit= space->free_limit;
/* The fragment is full: move it to another list */
const uint16_t xoffset=
static_cast<uint16_t>(descr - xdes->page.frame + XDES_FLST_NODE);
if (dberr_t err= flst_remove(header, FSP_HEADER_OFFSET + FSP_FREE_FRAG,
xdes, xoffset, mtr))
xdes, xoffset, limit, mtr))
return err;
if (dberr_t err= flst_add_last(header, FSP_HEADER_OFFSET + FSP_FULL_FRAG,
xdes, xoffset, mtr))
xdes, xoffset, limit, mtr))
return err;
xdes_set_state(*xdes, descr, XDES_FULL_FRAG, mtr);
n_used-= FSP_EXTENT_SIZE;
......@@ -1079,8 +1099,11 @@ buf_block_t *fsp_alloc_free_page(fil_space_t *space, uint32_t hint,
/* Else take the first extent in free_frag list */
fil_addr_t first = flst_get_first(FSP_HEADER_OFFSET + FSP_FREE_FRAG +
block->page.frame);
if (first.page == FIL_NULL)
if (first.page >= space->free_limit)
{
if (first.page != FIL_NULL)
goto flst_corrupted;
/* There are no partially full fragments: allocate a free extent
and add it to the FREE_FRAG list. NOTE that the allocation may
have as a side-effect that an extent containing a descriptor
......@@ -1091,13 +1114,23 @@ buf_block_t *fsp_alloc_free_page(fil_space_t *space, uint32_t hint,
return nullptr;
*err= flst_add_last(block, FSP_HEADER_OFFSET + FSP_FREE_FRAG, xdes,
static_cast<uint16_t>(descr - xdes->page.frame +
XDES_FLST_NODE), mtr);
XDES_FLST_NODE),
space->free_limit, mtr);
if (UNIV_UNLIKELY(*err != DB_SUCCESS))
return nullptr;
xdes_set_state(*xdes, descr, XDES_FREE_FRAG, mtr);
}
else
{
if (first.boffset < FSP_HEADER_OFFSET + FSP_HEADER_SIZE ||
first.boffset >= space->physical_size() -
(XDES_SIZE + FIL_PAGE_DATA_END))
{
flst_corrupted:
*err= DB_CORRUPTION;
goto err_exit;
}
descr= xdes_lst_get_descriptor(*space, first, mtr, &xdes, err);
if (!descr)
return nullptr;
......@@ -1144,7 +1177,7 @@ buf_block_t *fsp_alloc_free_page(fil_space_t *space, uint32_t hint,
}
}
*err= fsp_alloc_from_free_frag(block, xdes, descr, free, mtr);
*err= fsp_alloc_from_free_frag(block, xdes, descr, free, space, mtr);
if (UNIV_UNLIKELY(*err != DB_SUCCESS))
goto corrupted;
return fsp_page_create(space, page_no, init_mtr);
......@@ -1183,7 +1216,8 @@ static dberr_t fsp_free_extent(fil_space_t* space, page_no_t offset,
space->free_len++;
return flst_add_last(block, FSP_HEADER_OFFSET + FSP_FREE,
xdes, static_cast<uint16_t>(descr - xdes->page.frame +
XDES_FLST_NODE), mtr);
XDES_FLST_NODE),
space->free_limit, mtr);
}
MY_ATTRIBUTE((nonnull))
......@@ -1237,16 +1271,17 @@ static dberr_t fsp_free_page(fil_space_t *space, page_no_t offset, mtr_t *mtr)
const uint16_t xoffset= static_cast<uint16_t>(descr - xdes->page.frame
+ XDES_FLST_NODE);
const uint32_t limit = space->free_limit;
if (state == XDES_FULL_FRAG) {
/* The fragment was full: move it to another list */
err = flst_remove(header, FSP_HEADER_OFFSET + FSP_FULL_FRAG,
xdes, xoffset, mtr);
xdes, xoffset, limit, mtr);
if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
return err;
}
err = flst_add_last(header, FSP_HEADER_OFFSET + FSP_FREE_FRAG,
xdes, xoffset, mtr);
xdes, xoffset, limit, mtr);
if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
return err;
}
......@@ -1268,7 +1303,7 @@ static dberr_t fsp_free_page(fil_space_t *space, page_no_t offset, mtr_t *mtr)
if (!xdes_get_n_used(descr)) {
/* The extent has become free: move it to another list */
err = flst_remove(header, FSP_HEADER_OFFSET + FSP_FREE_FRAG,
xdes, xoffset, mtr);
xdes, xoffset, limit, mtr);
if (err == DB_SUCCESS) {
err = fsp_free_extent(space, offset, mtr);
}
......@@ -1362,7 +1397,7 @@ static dberr_t fsp_alloc_seg_inode_page(fil_space_t *space,
#endif
return flst_add_last(header, FSP_HEADER_OFFSET + FSP_SEG_INODES_FREE,
block, FSEG_INODE_PAGE_NODE, mtr);
block, FSEG_INODE_PAGE_NODE, space->free_limit, mtr);
}
MY_ATTRIBUTE((nonnull, warn_unused_result))
......@@ -1418,12 +1453,13 @@ fsp_alloc_seg_inode(fil_space_t *space, buf_block_t *header,
{
/* There are no other unused headers left on the page: move it
to another list */
const uint32_t limit= space->free_limit;
*err= flst_remove(header, FSP_HEADER_OFFSET + FSP_SEG_INODES_FREE,
block, FSEG_INODE_PAGE_NODE, mtr);
block, FSEG_INODE_PAGE_NODE, limit, mtr);
if (UNIV_UNLIKELY(*err != DB_SUCCESS))
return nullptr;
*err= flst_add_last(header, FSP_HEADER_OFFSET + FSP_SEG_INODES_FULL,
block, FSEG_INODE_PAGE_NODE, mtr);
block, FSEG_INODE_PAGE_NODE, limit, mtr);
if (UNIV_UNLIKELY(*err != DB_SUCCESS))
return nullptr;
}
......@@ -1456,16 +1492,17 @@ static void fsp_free_seg_inode(fil_space_t *space, fseg_inode_t *inode,
}
const ulint physical_size= space->physical_size();
const uint32_t limit= space->free_limit;
if (ULINT_UNDEFINED == fsp_seg_inode_page_find_free(iblock->page.frame, 0,
physical_size))
{
/* Move the page to another list */
if (flst_remove(header, FSP_HEADER_OFFSET + FSP_SEG_INODES_FULL,
iblock, FSEG_INODE_PAGE_NODE, mtr) != DB_SUCCESS)
iblock, FSEG_INODE_PAGE_NODE, limit, mtr) != DB_SUCCESS)
return;
if (flst_add_last(header, FSP_HEADER_OFFSET + FSP_SEG_INODES_FREE,
iblock, FSEG_INODE_PAGE_NODE, mtr) != DB_SUCCESS)
iblock, FSEG_INODE_PAGE_NODE, limit, mtr) != DB_SUCCESS)
return;
}
......@@ -1477,7 +1514,7 @@ static void fsp_free_seg_inode(fil_space_t *space, fseg_inode_t *inode,
/* There are no other used headers left on the page: free it */
if (flst_remove(header, FSP_HEADER_OFFSET + FSP_SEG_INODES_FREE,
iblock, FSEG_INODE_PAGE_NODE, mtr) == DB_SUCCESS)
iblock, FSEG_INODE_PAGE_NODE, limit, mtr) == DB_SUCCESS)
fsp_free_page(space, iblock->page.id().page_no(), mtr);
}
......@@ -1850,7 +1887,8 @@ static dberr_t fseg_fill_free_list(const fseg_inode_t *inode,
static_cast<uint16_t>(inode - iblock->page.frame +
FSEG_FREE), xdes,
static_cast<uint16_t>(descr - xdes->page.frame +
XDES_FLST_NODE), mtr))
XDES_FLST_NODE),
space->free_limit, mtr))
return err;
xdes_set_state(*xdes, descr, XDES_FSEG, mtr);
mtr->memcpy(*xdes, descr + XDES_ID, inode + FSEG_ID, 8);
......@@ -1885,11 +1923,25 @@ fseg_alloc_free_extent(
ut_ad(!memcmp(FSEG_MAGIC_N_BYTES, FSEG_MAGIC_N + inode, 4));
ut_d(space->modify_check(*mtr));
if (UNIV_UNLIKELY(page_offset(inode) < FSEG_ARR_OFFSET))
{
corrupted:
*err= DB_CORRUPTION;
space->set_corrupted();
return nullptr;
}
if (flst_get_len(inode + FSEG_FREE))
{
const fil_addr_t first= flst_get_first(inode + FSEG_FREE);
if (first.page >= space->free_limit ||
first.boffset < FSP_HEADER_OFFSET + FSP_HEADER_SIZE ||
first.boffset >= space->physical_size() -
(XDES_SIZE + FIL_PAGE_DATA_END))
goto corrupted;
/* Segment free list is not empty, allocate from it */
return xdes_lst_get_descriptor(*space, flst_get_first(inode + FSEG_FREE),
mtr, xdes, err);
return xdes_lst_get_descriptor(*space, first, mtr, xdes, err);
}
xdes_t* descr= fsp_alloc_free_extent(space, 0, xdes, mtr, err);
......@@ -1901,7 +1953,8 @@ fseg_alloc_free_extent(
static_cast<uint16_t>(inode - iblock->page.frame +
FSEG_FREE), *xdes,
static_cast<uint16_t>(descr - (*xdes)->page.frame +
XDES_FLST_NODE), mtr);
XDES_FLST_NODE),
space->free_limit, mtr);
if (UNIV_LIKELY(*err != DB_SUCCESS))
return nullptr;
/* Try to fill the segment free list */
......@@ -2042,7 +2095,8 @@ fseg_alloc_free_page_low(
+ FSEG_FREE), xdes,
static_cast<uint16_t>(ret_descr
- xdes->page.frame
+ XDES_FLST_NODE), mtr);
+ XDES_FLST_NODE),
space->free_limit, mtr);
if (UNIV_UNLIKELY(*err != DB_SUCCESS)) {
return nullptr;
}
......@@ -2088,6 +2142,14 @@ fseg_alloc_free_page_low(
return nullptr;
}
if (first.page >= space->free_limit
|| first.boffset < FSP_HEADER_OFFSET + FSP_HEADER_SIZE
|| first.boffset >= space->physical_size()
- (XDES_SIZE + FIL_PAGE_DATA_END)) {
*err= DB_CORRUPTION;
return nullptr;
}
ret_descr = xdes_lst_get_descriptor(*space, first, mtr, &xdes);
if (!ret_descr) {
return nullptr;
......@@ -2181,8 +2243,8 @@ fseg_alloc_free_page_low(
ut_ad(xdes == xxdes);
ut_ad(xdes_is_free(ret_descr, ret_page % extent_size));
*err = fseg_mark_page_used(seg_inode, iblock, ret_page,
ret_descr, xdes, mtr);
*err = fseg_mark_page_used(space, seg_inode, iblock, ret_page,
ret_descr, xdes, mtr);
if (UNIV_UNLIKELY(*err != DB_SUCCESS)) {
return nullptr;
}
......@@ -2524,18 +2586,19 @@ fseg_free_page_low(
const uint16_t xoffset= uint16_t(descr - xdes->page.frame
+ XDES_FLST_NODE);
const uint16_t ioffset= uint16_t(seg_inode - iblock->page.frame);
const uint32_t limit = space->free_limit;
if (xdes_is_full(descr)) {
/* The fragment is full: move it to another list */
err = flst_remove(iblock,
static_cast<uint16_t>(FSEG_FULL + ioffset),
xdes, xoffset, mtr);
xdes, xoffset, limit, mtr);
if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
return err;
}
err = flst_add_last(iblock, static_cast<uint16_t>(FSEG_NOT_FULL
+ ioffset),
xdes, xoffset, mtr);
xdes, xoffset, limit, mtr);
if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
return err;
}
......@@ -2553,7 +2616,7 @@ fseg_free_page_low(
if (!xdes_get_n_used(descr)) {
err = flst_remove(iblock, static_cast<uint16_t>(FSEG_NOT_FULL
+ ioffset),
xdes, xoffset, mtr);
xdes, xoffset, limit, mtr);
if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
return err;
}
......@@ -2698,11 +2761,12 @@ fseg_free_extent(
#endif /* BTR_CUR_HASH_ADAPT */
uint16_t lst;
uint32_t limit = space->free_limit;
if (xdes_is_full(descr)) {
lst = static_cast<uint16_t>(FSEG_FULL + ioffset);
remove:
err = flst_remove(iblock, lst, xdes, xoffset, mtr);
err = flst_remove(iblock, lst, xdes, xoffset, limit, mtr);
if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
return err;
}
......@@ -2712,7 +2776,7 @@ fseg_free_extent(
} else {
err = flst_remove(
iblock, static_cast<uint16_t>(FSEG_NOT_FULL + ioffset),
xdes, xoffset, mtr);
xdes, xoffset, limit, mtr);
if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
return err;
}
......@@ -2962,7 +3026,10 @@ fseg_get_first_extent(
return nullptr;
}
if (first.page == FIL_NULL)
if (first.page >= space->free_limit ||
first.boffset < FSP_HEADER_OFFSET + FSP_HEADER_SIZE ||
first.boffset >= space->physical_size() -
(XDES_SIZE + FIL_PAGE_DATA_END))
goto corrupted;
return xdes_lst_get_descriptor(*space, first, mtr, nullptr, err);
......
......@@ -113,17 +113,18 @@ static void flst_add_to_empty(buf_block_t *base, uint16_t boffset,
}
/** Insert a node after another one.
@param[in,out] base base node block
@param[in] boffset byte offset of the base node
@param[in,out] cur insert position block
@param[in] coffset byte offset of the insert position
@param[in,out] add block to be added
@param[in] aoffset byte offset of the block to be added
@param[in,out] mtr mini-transaction */
@param base base node block
@param boffset byte offset of the base node
@param cur insert position block
@param coffset byte offset of the insert position
@param add block to be added
@param aoffset byte offset of the block to be added
@param limit fil_space_t::free_limit
@param mtr mini-transaction */
static dberr_t flst_insert_after(buf_block_t *base, uint16_t boffset,
buf_block_t *cur, uint16_t coffset,
buf_block_t *add, uint16_t aoffset,
mtr_t *mtr)
uint32_t limit, mtr_t *mtr)
{
ut_ad(base != cur || boffset != coffset);
ut_ad(base != add || boffset != aoffset);
......@@ -139,6 +140,15 @@ static dberr_t flst_insert_after(buf_block_t *base, uint16_t boffset,
MTR_MEMO_PAGE_SX_FIX));
fil_addr_t next_addr= flst_get_next_addr(cur->page.frame + coffset);
if (next_addr.page >= limit)
{
if (UNIV_UNLIKELY(next_addr.page != FIL_NULL))
return DB_CORRUPTION;
}
else if (UNIV_UNLIKELY(next_addr.boffset < FIL_PAGE_DATA ||
next_addr.boffset >= base->physical_size() -
FIL_PAGE_DATA_END))
return DB_CORRUPTION;
flst_write_addr(*add, add->page.frame + aoffset + FLST_PREV,
cur->page.id().page_no(), coffset, mtr);
......@@ -167,18 +177,19 @@ static dberr_t flst_insert_after(buf_block_t *base, uint16_t boffset,
}
/** Insert a node before another one.
@param[in,out] base base node block
@param[in] boffset byte offset of the base node
@param[in,out] cur insert position block
@param[in] coffset byte offset of the insert position
@param[in,out] add block to be added
@param[in] aoffset byte offset of the block to be added
@param[in,out] mtr mini-transaction
@param base base node block
@param boffset byte offset of the base node
@param cur insert position block
@param coffset byte offset of the insert position
@param add block to be added
@param aoffset byte offset of the block to be added
@param limit fil_space_t::free_limit
@param mtr mini-transaction
@return error code */
static dberr_t flst_insert_before(buf_block_t *base, uint16_t boffset,
buf_block_t *cur, uint16_t coffset,
buf_block_t *add, uint16_t aoffset,
mtr_t *mtr)
uint32_t limit, mtr_t *mtr)
{
ut_ad(base != cur || boffset != coffset);
ut_ad(base != add || boffset != aoffset);
......@@ -194,6 +205,15 @@ static dberr_t flst_insert_before(buf_block_t *base, uint16_t boffset,
MTR_MEMO_PAGE_SX_FIX));
fil_addr_t prev_addr= flst_get_prev_addr(cur->page.frame + coffset);
if (prev_addr.page >= limit)
{
if (UNIV_UNLIKELY(prev_addr.page != FIL_NULL))
return DB_CORRUPTION;
}
else if (UNIV_UNLIKELY(prev_addr.boffset < FIL_PAGE_DATA ||
prev_addr.boffset >= base->physical_size() -
FIL_PAGE_DATA_END))
return DB_CORRUPTION;
flst_write_addr(*add, add->page.frame + aoffset + FLST_PREV,
prev_addr.page, prev_addr.boffset, mtr);
......@@ -234,14 +254,9 @@ void flst_init(const buf_block_t& block, byte *base, mtr_t *mtr)
flst_zero_both(block, base + FLST_FIRST, mtr);
}
/** Append a file list node to a list.
@param[in,out] base base node block
@param[in] boffset byte offset of the base node
@param[in,out] add block to be added
@param[in] aoffset byte offset of the node to be added
@param[in,outr] mtr mini-transaction */
dberr_t flst_add_last(buf_block_t *base, uint16_t boffset,
buf_block_t *add, uint16_t aoffset, mtr_t *mtr)
buf_block_t *add, uint16_t aoffset,
uint32_t limit, mtr_t *mtr)
{
ut_ad(base != add || boffset != aoffset);
ut_ad(boffset < base->physical_size());
......@@ -258,6 +273,13 @@ dberr_t flst_add_last(buf_block_t *base, uint16_t boffset,
else
{
fil_addr_t addr= flst_get_last(base->page.frame + boffset);
if (UNIV_UNLIKELY(addr.page >= limit))
return DB_CORRUPTION;
else if (UNIV_UNLIKELY(addr.boffset < FIL_PAGE_DATA ||
addr.boffset >= base->physical_size() -
FIL_PAGE_DATA_END))
return DB_CORRUPTION;
buf_block_t *cur= add;
dberr_t err;
if (addr.page != add->page.id().page_no() &&
......@@ -266,19 +288,13 @@ dberr_t flst_add_last(buf_block_t *base, uint16_t boffset,
BUF_GET_POSSIBLY_FREED, mtr, &err)))
return err;
return flst_insert_after(base, boffset, cur, addr.boffset,
add, aoffset, mtr);
add, aoffset, limit, mtr);
}
}
/** Prepend a file list node to a list.
@param[in,out] base base node block
@param[in] boffset byte offset of the base node
@param[in,out] add block to be added
@param[in] aoffset byte offset of the node to be added
@param[in,out] mtr mini-transaction
@return error code */
dberr_t flst_add_first(buf_block_t *base, uint16_t boffset,
buf_block_t *add, uint16_t aoffset, mtr_t *mtr)
buf_block_t *add, uint16_t aoffset,
uint32_t limit, mtr_t *mtr)
{
ut_ad(base != add || boffset != aoffset);
ut_ad(boffset < base->physical_size());
......@@ -296,6 +312,12 @@ dberr_t flst_add_first(buf_block_t *base, uint16_t boffset,
else
{
fil_addr_t addr= flst_get_first(base->page.frame + boffset);
if (UNIV_UNLIKELY(addr.page >= limit))
return DB_CORRUPTION;
else if (UNIV_UNLIKELY(addr.boffset < FIL_PAGE_DATA ||
addr.boffset >= base->physical_size() -
FIL_PAGE_DATA_END))
return DB_CORRUPTION;
buf_block_t *cur= add;
dberr_t err;
if (addr.page != add->page.id().page_no() &&
......@@ -304,19 +326,13 @@ dberr_t flst_add_first(buf_block_t *base, uint16_t boffset,
BUF_GET_POSSIBLY_FREED, mtr, &err)))
return err;
return flst_insert_before(base, boffset, cur, addr.boffset,
add, aoffset, mtr);
add, aoffset, limit, mtr);
}
}
/** Remove a file list node.
@param[in,out] base base node block
@param[in] boffset byte offset of the base node
@param[in,out] cur block to be removed
@param[in] coffset byte offset of the current record to be removed
@param[in,out] mtr mini-transaction
@return error code */
dberr_t flst_remove(buf_block_t *base, uint16_t boffset,
buf_block_t *cur, uint16_t coffset, mtr_t *mtr)
buf_block_t *cur, uint16_t coffset,
uint32_t limit, mtr_t *mtr)
{
ut_ad(boffset < base->physical_size());
ut_ad(coffset < cur->physical_size());
......@@ -329,9 +345,27 @@ dberr_t flst_remove(buf_block_t *base, uint16_t boffset,
const fil_addr_t next_addr= flst_get_next_addr(cur->page.frame + coffset);
dberr_t err= DB_SUCCESS;
if (prev_addr.page == FIL_NULL)
if (next_addr.page >= limit)
{
if (next_addr.page != FIL_NULL)
return DB_CORRUPTION;
}
else if (UNIV_UNLIKELY(next_addr.boffset < FIL_PAGE_DATA ||
next_addr.boffset >= base->physical_size() -
FIL_PAGE_DATA_END))
return DB_CORRUPTION;
if (prev_addr.page >= limit)
{
if (prev_addr.page != FIL_NULL)
return DB_CORRUPTION;
flst_write_addr(*base, base->page.frame + boffset + FLST_FIRST,
next_addr.page, next_addr.boffset, mtr);
}
else if (UNIV_UNLIKELY(prev_addr.boffset < FIL_PAGE_DATA ||
prev_addr.boffset >= base->physical_size() -
FIL_PAGE_DATA_END))
return DB_CORRUPTION;
else
{
buf_block_t *b= cur;
......@@ -375,25 +409,19 @@ void flst_validate(const buf_block_t *base, uint16_t boffset, mtr_t *mtr)
ut_ad(mtr->memo_contains_flagged(base, MTR_MEMO_PAGE_X_FIX |
MTR_MEMO_PAGE_SX_FIX));
/* We use two mini-transaction handles: the first is used to lock
the base node, and prevent other threads from modifying the list.
The second is used to traverse the list. We cannot run the second
mtr without committing it at times, because if the list is long,
the x-locked pages could fill the buffer, resulting in a deadlock. */
mtr_t mtr2;
const uint32_t len= flst_get_len(base->page.frame + boffset);
fil_addr_t addr= flst_get_first(base->page.frame + boffset);
for (uint32_t i= len; i--; )
{
mtr2.start();
ut_ad(addr.boffset >= FIL_PAGE_DATA);
ut_ad(addr.boffset < base->physical_size() - FIL_PAGE_DATA_END);
const buf_block_t *b=
buf_page_get_gen(page_id_t(base->page.id().space(), addr.page),
base->zip_size(), RW_SX_LATCH, nullptr, BUF_GET, mtr);
ut_ad(b);
addr= flst_get_next_addr(b->page.frame + addr.boffset);
mtr2.commit();
mtr->release_last_page();
}
ut_ad(addr.page == FIL_NULL);
......@@ -402,13 +430,14 @@ void flst_validate(const buf_block_t *base, uint16_t boffset, mtr_t *mtr)
for (uint32_t i= len; i--; )
{
mtr2.start();
ut_ad(addr.boffset >= FIL_PAGE_DATA);
ut_ad(addr.boffset < base->physical_size() - FIL_PAGE_DATA_END);
const buf_block_t *b=
buf_page_get_gen(page_id_t(base->page.id().space(), addr.page),
base->zip_size(), RW_SX_LATCH, nullptr, BUF_GET, mtr);
ut_ad(b);
addr= flst_get_prev_addr(b->page.frame + addr.boffset);
mtr2.commit();
mtr->release_last_page();
}
ut_ad(addr.page == FIL_NULL);
......
......@@ -1831,7 +1831,7 @@ static bool ibuf_add_free_page()
err = flst_add_last(ibuf_root, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
block, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
&mtr);
fil_system.sys_space->free_limit, &mtr);
if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
goto corrupted;
}
......@@ -1862,7 +1862,6 @@ Removes a page from the free list and frees it to the fsp system. */
static void ibuf_remove_free_page()
{
mtr_t mtr;
mtr_t mtr2;
page_t* header_page;
log_free_check();
......@@ -1889,26 +1888,28 @@ static void ibuf_remove_free_page()
return;
}
ibuf_mtr_start(&mtr2);
buf_block_t* root = ibuf_tree_root_get(&mtr2);
const auto root_savepoint = mtr.get_savepoint();
buf_block_t* root = ibuf_tree_root_get(&mtr);
if (UNIV_UNLIKELY(!root)) {
ibuf_mtr_commit(&mtr2);
goto early_exit;
}
mysql_mutex_unlock(&ibuf_mutex);
const uint32_t page_no = flst_get_last(PAGE_HEADER
+ PAGE_BTR_IBUF_FREE_LIST
+ root->page.frame).page;
if (page_no >= fil_system.sys_space->free_limit) {
goto early_exit;
}
mysql_mutex_unlock(&ibuf_mutex);
/* NOTE that we must release the latch on the ibuf tree root
because in fseg_free_page we access level 1 pages, and the root
is a level 2 page. */
ibuf_mtr_commit(&mtr2);
root->page.lock.u_unlock();
mtr.lock_register(root_savepoint, MTR_MEMO_BUF_FIX);
ibuf_exit(&mtr);
/* Since pessimistic inserts were prevented, we know that the
......@@ -1931,15 +1932,7 @@ static void ibuf_remove_free_page()
ibuf_enter(&mtr);
mysql_mutex_lock(&ibuf_mutex);
root = ibuf_tree_root_get(&mtr, &err);
if (UNIV_UNLIKELY(!root)) {
mysql_mutex_unlock(&ibuf_pessimistic_insert_mutex);
goto func_exit;
}
ut_ad(page_no == flst_get_last(PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST
+ root->page.frame).page);
mtr.upgrade_buffer_fix(root_savepoint, RW_X_LATCH);
/* Remove the page from the free list and update the ibuf size data */
if (buf_block_t* block =
......@@ -1948,7 +1941,7 @@ static void ibuf_remove_free_page()
err = flst_remove(root, PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST,
block,
PAGE_HEADER + PAGE_BTR_IBUF_FREE_LIST_NODE,
&mtr);
fil_system.sys_space->free_limit, &mtr);
}
mysql_mutex_unlock(&ibuf_pessimistic_insert_mutex);
......
......@@ -78,34 +78,40 @@ void flst_init(const buf_block_t &block, byte *base, mtr_t *mtr)
MY_ATTRIBUTE((nonnull));
/** Append a file list node to a list.
@param[in,out] base base node block
@param[in] boffset byte offset of the base node
@param[in,out] add block to be added
@param[in] aoffset byte offset of the node to be added
@param[in,out] mtr mini-transaction
@param base base node block
@param boffset byte offset of the base node
@param add block to be added
@param aoffset byte offset of the node to be added
@param limit fil_space_t::free_limit
@param mtr mini-transaction
@return error code */
dberr_t flst_add_last(buf_block_t *base, uint16_t boffset,
buf_block_t *add, uint16_t aoffset, mtr_t *mtr)
buf_block_t *add, uint16_t aoffset,
uint32_t limit, mtr_t *mtr)
MY_ATTRIBUTE((nonnull, warn_unused_result));
/** Prepend a file list node to a list.
@param[in,out] base base node block
@param[in] boffset byte offset of the base node
@param[in,out] add block to be added
@param[in] aoffset byte offset of the node to be added
@param[in,out] mtr mini-transaction
@param base base node block
@param boffset byte offset of the base node
@param add block to be added
@param aoffset byte offset of the node to be added
@param limit fil_space_t::free_limit
@param mtr mini-transaction
@return error code */
dberr_t flst_add_first(buf_block_t *base, uint16_t boffset,
buf_block_t *add, uint16_t aoffset, mtr_t *mtr)
buf_block_t *add, uint16_t aoffset,
uint32_t limit, mtr_t *mtr)
MY_ATTRIBUTE((nonnull, warn_unused_result));
/** Remove a file list node.
@param[in,out] base base node block
@param[in] boffset byte offset of the base node
@param[in,out] cur block to be removed
@param[in] coffset byte offset of the current record to be removed
@param[in,out] mtr mini-transaction
@param base base node block
@param boffset byte offset of the base node
@param cur block to be removed
@param coffset byte offset of the current record to be removed
@param limit fil_space_t::free_limit
@param mtr mini-transaction
@return error code */
dberr_t flst_remove(buf_block_t *base, uint16_t boffset,
buf_block_t *cur, uint16_t coffset, mtr_t *mtr)
buf_block_t *cur, uint16_t coffset,
uint32_t limit, mtr_t *mtr)
MY_ATTRIBUTE((nonnull, warn_unused_result));
/** @return the length of a list */
......@@ -117,11 +123,9 @@ inline uint32_t flst_get_len(const flst_base_node_t *base)
/** @return a file address */
inline fil_addr_t flst_read_addr(const byte *faddr)
{
fil_addr_t addr= { mach_read_from_4(faddr + FIL_ADDR_PAGE),
mach_read_from_2(faddr + FIL_ADDR_BYTE) };
ut_a(addr.page == FIL_NULL || addr.boffset >= FIL_PAGE_DATA);
ut_a(ut_align_offset(faddr, srv_page_size) >= FIL_PAGE_DATA);
return addr;
ut_ad(ut_align_offset(faddr, srv_page_size) >= FIL_PAGE_DATA);
return fil_addr_t{mach_read_from_4(faddr + FIL_ADDR_PAGE),
mach_read_from_2(faddr + FIL_ADDR_BYTE)};
}
/** @return list first node address */
......
......@@ -266,7 +266,8 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
that is known to be corrupted. */
ut_a(flst_add_first(rseg_header, TRX_RSEG + TRX_RSEG_HISTORY, undo_page,
uint16_t(page_offset(undo_header) +
TRX_UNDO_HISTORY_NODE), mtr) == DB_SUCCESS);
TRX_UNDO_HISTORY_NODE), rseg->space->free_limit,
mtr) == DB_SUCCESS);
mtr->write<2>(*undo_page, TRX_UNDO_SEG_HDR + TRX_UNDO_STATE +
undo_page->page.frame, undo_state);
......@@ -356,6 +357,19 @@ inline dberr_t purge_sys_t::iterator::free_history_rseg(trx_rseg_t &rseg) const
mtr_t mtr;
bool freed= false;
uint32_t rseg_ref= 0;
const auto last_boffset= srv_page_size - TRX_UNDO_LOG_OLD_HDR_SIZE;
/* Technically, rseg.space->free_limit is not protected by
rseg.latch, which we are holding, but rseg.space->latch. The value
that we are reading may become stale (too small) if other pages are
being allocated in this tablespace, for other rollback
segments. Nothing can be added to this rseg without holding
rseg.latch, and hence we can validate the entire file-based list
against the limit that we are reading here.
Note: The read here may look like a data race. On none of our target
architectures this should be an actual problem, because the uint32_t
value should always fit in a register and be correctly aligned. */
const auto last_page= rseg.space->free_limit;
mtr.start();
......@@ -371,13 +385,23 @@ inline dberr_t purge_sys_t::iterator::free_history_rseg(trx_rseg_t &rseg) const
}
hdr_addr= flst_get_last(TRX_RSEG + TRX_RSEG_HISTORY + rseg_hdr->page.frame);
hdr_addr.boffset= static_cast<uint16_t>(hdr_addr.boffset -
TRX_UNDO_HISTORY_NODE);
loop:
if (hdr_addr.page == FIL_NULL)
goto func_exit;
if (hdr_addr.page >= last_page ||
hdr_addr.boffset < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE ||
hdr_addr.boffset >= last_boffset)
{
corrupted:
err= DB_CORRUPTION;
goto func_exit;
}
hdr_addr.boffset= static_cast<uint16_t>(hdr_addr.boffset -
TRX_UNDO_HISTORY_NODE);
loop:
buf_block_t *b=
buf_page_get_gen(page_id_t(rseg.space->id, hdr_addr.page),
0, RW_X_LATCH, nullptr, BUF_GET_POSSIBLY_FREED,
......@@ -426,11 +450,18 @@ inline dberr_t purge_sys_t::iterator::free_history_rseg(trx_rseg_t &rseg) const
fil_addr_t prev_hdr_addr=
flst_get_prev_addr(b->page.frame + hdr_addr.boffset +
TRX_UNDO_HISTORY_NODE);
if (prev_hdr_addr.page == FIL_NULL);
else if (prev_hdr_addr.page >= last_page ||
prev_hdr_addr.boffset < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE ||
prev_hdr_addr.boffset >= last_boffset)
goto corrupted;
prev_hdr_addr.boffset= static_cast<uint16_t>(prev_hdr_addr.boffset -
TRX_UNDO_HISTORY_NODE);
err= flst_remove(rseg_hdr, TRX_RSEG + TRX_RSEG_HISTORY, b,
uint16_t(hdr_addr.boffset + TRX_UNDO_HISTORY_NODE), &mtr);
uint16_t(hdr_addr.boffset + TRX_UNDO_HISTORY_NODE),
last_page, &mtr);
if (UNIV_UNLIKELY(err != DB_SUCCESS))
goto func_exit;
......@@ -490,6 +521,9 @@ inline dberr_t purge_sys_t::iterator::free_history_rseg(trx_rseg_t &rseg) const
ut_ad(rseg_hdr->page.id() == rseg.page_id());
mtr.memo_push(rseg_hdr, MTR_MEMO_PAGE_X_FIX);
if (hdr_addr.page == FIL_NULL)
goto func_exit;
goto loop;
}
......@@ -780,13 +814,17 @@ bool purge_sys_t::rseg_get_next_history_log()
{
const byte *log_hdr= undo_page->page.frame + rseg->last_offset();
prev_log_addr= flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE);
if (prev_log_addr.boffset < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE ||
prev_log_addr.boffset >= srv_page_size - TRX_UNDO_LOG_OLD_HDR_SIZE)
goto corrupted;
prev_log_addr.boffset = static_cast<uint16_t>(prev_log_addr.boffset -
TRX_UNDO_HISTORY_NODE);
}
else
prev_log_addr.page= FIL_NULL;
goto corrupted;
if (prev_log_addr.page == FIL_NULL)
if (prev_log_addr.page >= rseg->space->free_limit)
corrupted:
rseg->last_page_no= FIL_NULL;
else
{
......
......@@ -448,7 +448,14 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, mtr_t *mtr)
{
if (!rseg->space)
return DB_TABLESPACE_NOT_FOUND;
/* Access the tablespace header page to recover rseg->space->free_limit */
page_id_t page_id{rseg->space->id, 0};
dberr_t err;
if (!buf_page_get_gen(page_id, 0, RW_S_LATCH, nullptr, BUF_GET, mtr, &err))
return err;
mtr->release_last_page();
page_id.set_page_no(rseg->page_no);
const buf_block_t *rseg_hdr=
buf_page_get_gen(rseg->page_id(), 0, RW_S_LATCH, nullptr, BUF_GET, mtr,
&err);
......@@ -518,6 +525,11 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, mtr_t *mtr)
fil_addr_t node_addr= flst_get_last(TRX_RSEG + TRX_RSEG_HISTORY +
rseg_hdr->page.frame);
if (node_addr.page >= rseg->space->free_limit ||
node_addr.boffset < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE ||
node_addr.boffset >= srv_page_size - TRX_UNDO_LOG_OLD_HDR_SIZE)
return DB_CORRUPTION;
node_addr.boffset= static_cast<uint16_t>(node_addr.boffset -
TRX_UNDO_HISTORY_NODE);
rseg->last_page_no= node_addr.page;
......
......@@ -513,7 +513,7 @@ trx_undo_seg_create(fil_space_t *space, buf_block_t *rseg_hdr, ulint *id,
*err = flst_add_last(block, TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST,
block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE,
mtr);
space->free_limit, mtr);
*id = slot_no;
mtr->write<4>(*rseg_hdr, TRX_RSEG + TRX_RSEG_UNDO_SLOTS
......@@ -696,7 +696,8 @@ buf_block_t *trx_undo_add_page(trx_undo_t *undo, mtr_t *mtr, dberr_t *err)
mtr->undo_create(*new_block);
trx_undo_page_init(*new_block);
*err= flst_add_last(header_block, TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST,
new_block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE, mtr);
new_block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE,
rseg->space->free_limit, mtr);
if (UNIV_UNLIKELY(*err != DB_SUCCESS))
new_block= nullptr;
else
......@@ -747,9 +748,11 @@ trx_undo_free_page(
buf_page_make_young_if_needed(&header_block->page);
const uint32_t limit = rseg->space->free_limit;
*err = flst_remove(header_block, TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST,
undo_block, TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE,
mtr);
limit, mtr);
if (UNIV_UNLIKELY(*err != DB_SUCCESS)) {
return FIL_NULL;
......@@ -758,7 +761,13 @@ trx_undo_free_page(
const fil_addr_t last_addr = flst_get_last(
TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST
+ header_block->page.frame);
if (UNIV_UNLIKELY(last_addr.page == page_no)) {
if (UNIV_UNLIKELY(last_addr.page == page_no)
|| UNIV_UNLIKELY(last_addr.page != FIL_NULL
&& last_addr.page >= limit)
|| UNIV_UNLIKELY(last_addr.boffset < TRX_UNDO_PAGE_HDR
+ TRX_UNDO_PAGE_NODE)
|| UNIV_UNLIKELY(last_addr.boffset >= srv_page_size
- TRX_UNDO_LOG_OLD_HDR_SIZE)) {
*err = DB_CORRUPTION;
return FIL_NULL;
}
......@@ -975,8 +984,8 @@ trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no)
ut_ad(id < TRX_RSEG_N_SLOTS);
mtr.start();
const buf_block_t* block = buf_page_get(
page_id_t(rseg->space->id, page_no), 0, RW_X_LATCH, &mtr);
const page_id_t page_id{rseg->space->id, page_no};
const buf_block_t* block = buf_page_get(page_id, 0, RW_X_LATCH, &mtr);
if (UNIV_UNLIKELY(!block)) {
corrupted:
mtr.commit();
......@@ -1078,6 +1087,15 @@ trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no)
fil_addr_t last_addr = flst_get_last(
TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + block->page.frame);
if (last_addr.page >= rseg->space->free_limit
|| last_addr.boffset < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE
|| last_addr.boffset >= srv_page_size
- TRX_UNDO_LOG_OLD_HDR_SIZE) {
corrupted_undo:
ut_free(undo);
goto corrupted;
}
undo->last_page_no = last_addr.page;
undo->top_page_no = last_addr.page;
......@@ -1086,8 +1104,7 @@ trx_undo_mem_create_at_db_start(trx_rseg_t *rseg, ulint id, uint32_t page_no)
RW_X_LATCH, &mtr);
if (UNIV_UNLIKELY(!last)) {
ut_free(undo);
goto corrupted;
goto corrupted_undo;
}
if (const trx_undo_rec_t* rec = trx_undo_page_get_last_rec(
......