Commit 86767bcc authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-29593 Purge misses a chance to free not-yet-reused undo pages

trx_purge_truncate_rseg_history(): If all other conditions for
invoking trx_purge_remove_log_hdr() hold, but the state is
TRX_UNDO_CACHED instead of TRX_UNDO_TO_PURGE, detach and free it.

Tested by: Matthias Leich
parent 40eff3f8
......@@ -343,7 +343,7 @@ struct mtr_t {
{
mtr_memo_slot_t &slot= m_memo[savepoint];
ut_ad(slot.type <= MTR_MEMO_BUF_FIX);
ut_ad(type <= MTR_MEMO_BUF_FIX);
ut_ad(type < MTR_MEMO_S_LOCK);
slot.type= type;
}
......
......@@ -275,7 +275,7 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
if (undo->state != TRX_UNDO_CACHED) {
/* The undo log segment will not be reused */
ut_a(undo->id < TRX_RSEG_N_SLOTS);
compile_time_assert(FIL_NULL == 0xffffffff);
static_assert(FIL_NULL == 0xffffffff, "");
mtr->memset(rseg_header,
TRX_RSEG + TRX_RSEG_UNDO_SLOTS
+ undo->id * TRX_RSEG_SLOT_SIZE, 4, 0xff);
......@@ -385,45 +385,11 @@ static dberr_t trx_purge_remove_log_hdr(buf_block_t *rseg, buf_block_t* log,
uint16_t(offset + TRX_UNDO_HISTORY_NODE), mtr);
}
MY_ATTRIBUTE((nonnull, warn_unused_result))
/** Free an undo log segment, and remove the header from the history list.
@param[in,out] mtr mini-transaction
@param[in,out] rseg rollback segment
@param[in] hdr_addr file address of log_hdr
@return error code */
static dberr_t
trx_purge_free_segment(mtr_t &mtr, trx_rseg_t* rseg, fil_addr_t hdr_addr)
/** Free an undo log segment.
@param block rollback segment header page
@param mtr mini-transaction */
static void trx_purge_free_segment(buf_block_t *block, mtr_t &mtr)
{
mtr.commit();
log_free_check();
mtr.start();
const page_id_t hdr_page_id{rseg->space->id, hdr_addr.page};
dberr_t err;
buf_block_t *rseg_hdr= rseg->get(&mtr, &err);
if (!rseg_hdr)
return err;
buf_block_t *block= buf_page_get_gen(hdr_page_id, 0, RW_X_LATCH,
nullptr, BUF_GET_POSSIBLY_FREED,
&mtr, &err);
if (!block)
return err;
const uint32_t seg_size=
flst_get_len(TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + block->page.frame);
err= trx_purge_remove_log_hdr(rseg_hdr, block, hdr_addr.boffset, &mtr);
if (UNIV_UNLIKELY(err != DB_SUCCESS))
return err;
ut_ad(rseg->curr_size >= seg_size);
rseg->curr_size-= seg_size;
rseg->history_size--;
byte *hist= TRX_RSEG + TRX_RSEG_HISTORY_SIZE + rseg_hdr->page.frame;
ut_ad(mach_read_from_4(hist) >= seg_size);
mtr.write<4>(*rseg_hdr, hist, mach_read_from_4(hist) - seg_size);
while (!fseg_free_step_not_header(TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER +
block->page.frame, &mtr))
{
......@@ -444,9 +410,9 @@ trx_purge_free_segment(mtr_t &mtr, trx_rseg_t* rseg, fil_addr_t hdr_addr)
{
block->unfix();
block->page.lock.x_unlock();
block= buf_page_get_gen(id, 0, RW_X_LATCH, nullptr, BUF_GET, &mtr, &err);
block= buf_page_get_gen(id, 0, RW_X_LATCH, nullptr, BUF_GET, &mtr);
if (!block)
return err;
return;
}
else
mtr.memo_push(block, MTR_MEMO_PAGE_X_MODIFY);
......@@ -454,102 +420,135 @@ trx_purge_free_segment(mtr_t &mtr, trx_rseg_t* rseg, fil_addr_t hdr_addr)
while (!fseg_free_step(TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER +
block->page.frame, &mtr));
return DB_SUCCESS;
}
/** Remove unnecessary history data from a rollback segment.
@param[in,out] rseg rollback segment
@param[in] limit truncate anything before this
@return error code */
static
dberr_t
trx_purge_truncate_rseg_history(
trx_rseg_t& rseg,
const purge_sys_t::iterator& limit)
static dberr_t
trx_purge_truncate_rseg_history(trx_rseg_t& rseg,
const purge_sys_t::iterator& limit)
{
fil_addr_t hdr_addr;
mtr_t mtr;
fil_addr_t hdr_addr;
mtr_t mtr;
mtr.start();
log_free_check();
mtr.start();
dberr_t err;
buf_block_t* rseg_hdr = rseg.get(&mtr, &err);
if (!rseg_hdr) {
goto func_exit;
}
dberr_t err;
reget:
buf_block_t *rseg_hdr= rseg.get(&mtr, &err);
if (!rseg_hdr)
{
func_exit:
mtr.commit();
return err;
}
hdr_addr = flst_get_last(TRX_RSEG + TRX_RSEG_HISTORY
+ rseg_hdr->page.frame);
hdr_addr.boffset = static_cast<uint16_t>(hdr_addr.boffset
- TRX_UNDO_HISTORY_NODE);
hdr_addr= flst_get_last(TRX_RSEG + TRX_RSEG_HISTORY + rseg_hdr->page.frame);
hdr_addr.boffset= static_cast<uint16_t>(hdr_addr.boffset -
TRX_UNDO_HISTORY_NODE);
loop:
if (hdr_addr.page == FIL_NULL) {
func_exit:
mtr.commit();
return err;
}
if (hdr_addr.page == FIL_NULL)
goto func_exit;
buf_block_t* block = buf_page_get_gen(page_id_t(rseg.space->id,
hdr_addr.page),
0, RW_X_LATCH, nullptr,
BUF_GET_POSSIBLY_FREED,
&mtr, &err);
if (!block) {
goto func_exit;
}
buf_block_t *b=
buf_page_get_gen(page_id_t(rseg.space->id, hdr_addr.page),
0, RW_X_LATCH, nullptr, BUF_GET_POSSIBLY_FREED,
&mtr, &err);
if (!b)
goto func_exit;
const trx_id_t undo_trx_no = mach_read_from_8(
block->page.frame + hdr_addr.boffset + TRX_UNDO_TRX_NO);
const trx_id_t undo_trx_no=
mach_read_from_8(b->page.frame + hdr_addr.boffset + TRX_UNDO_TRX_NO);
if (undo_trx_no >= limit.trx_no) {
if (undo_trx_no == limit.trx_no) {
err = trx_undo_truncate_start(
&rseg, hdr_addr.page,
hdr_addr.boffset, limit.undo_no);
}
if (undo_trx_no >= limit.trx_no)
{
if (undo_trx_no == limit.trx_no)
err = trx_undo_truncate_start(&rseg, hdr_addr.page,
hdr_addr.boffset, limit.undo_no);
goto func_exit;
}
goto func_exit;
}
fil_addr_t prev_hdr_addr=
flst_get_prev_addr(b->page.frame + hdr_addr.boffset +
TRX_UNDO_HISTORY_NODE);
prev_hdr_addr.boffset= static_cast<uint16_t>(prev_hdr_addr.boffset -
TRX_UNDO_HISTORY_NODE);
err= trx_purge_remove_log_hdr(rseg_hdr, b, hdr_addr.boffset, &mtr);
if (UNIV_UNLIKELY(err != DB_SUCCESS))
goto func_exit;
fil_addr_t prev_hdr_addr = flst_get_prev_addr(
block->page.frame + hdr_addr.boffset + TRX_UNDO_HISTORY_NODE);
prev_hdr_addr.boffset = static_cast<uint16_t>(prev_hdr_addr.boffset
- TRX_UNDO_HISTORY_NODE);
if (!rseg.is_referenced()
&& rseg.needs_purge <= (purge_sys.head.trx_no
? purge_sys.head.trx_no
: purge_sys.tail.trx_no)
&& mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_STATE
+ block->page.frame)
== TRX_UNDO_TO_PURGE
&& !mach_read_from_2(block->page.frame + hdr_addr.boffset
+ TRX_UNDO_NEXT_LOG)) {
/* We can free the whole log segment.
This will call trx_purge_remove_log_hdr(). */
err = trx_purge_free_segment(mtr, &rseg, hdr_addr);
} else {
/* Remove the log hdr from the rseg history. */
rseg.history_size--;
err = trx_purge_remove_log_hdr(rseg_hdr, block,
hdr_addr.boffset, &mtr);
}
rseg_hdr->fix();
mtr.commit();
if (err != DB_SUCCESS) {
return err;
}
mtr.start();
if (mach_read_from_2(b->page.frame + hdr_addr.boffset + TRX_UNDO_NEXT_LOG) ||
rseg.is_referenced() ||
rseg.needs_purge > (purge_sys.head.trx_no
? purge_sys.head.trx_no
: purge_sys.tail.trx_no))
/* We cannot free the entire undo page. */;
else
{
const uint32_t seg_size=
flst_get_len(TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + b->page.frame);
switch (mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_STATE +
b->page.frame)) {
case TRX_UNDO_TO_PURGE:
{
byte *hist= TRX_RSEG + TRX_RSEG_HISTORY_SIZE + rseg_hdr->page.frame;
ut_ad(mach_read_from_4(hist) >= seg_size);
mtr.write<4>(*rseg_hdr, hist, mach_read_from_4(hist) - seg_size);
}
free_segment:
ut_ad(rseg.curr_size >= seg_size);
rseg.curr_size-= seg_size;
trx_purge_free_segment(b, mtr);
break;
case TRX_UNDO_CACHED:
/* rseg.undo_cached must point to this page */
trx_undo_t *undo= UT_LIST_GET_FIRST(rseg.undo_cached);
for (; undo; undo= UT_LIST_GET_NEXT(undo_list, undo))
if (undo->hdr_page_no == hdr_addr.page)
goto found_cached;
ut_ad("inconsistent undo logs" == 0);
break;
found_cached:
UT_LIST_REMOVE(rseg.undo_cached, undo);
static_assert(FIL_NULL == 0xffffffff, "");
if (UNIV_UNLIKELY(mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT +
rseg_hdr->page.frame)))
trx_rseg_format_upgrade(rseg_hdr, &mtr);
mtr.memset(rseg_hdr, TRX_RSEG + TRX_RSEG_UNDO_SLOTS +
undo->id * TRX_RSEG_SLOT_SIZE, 4, 0xff);
ut_free(undo);
mtr.write<8,mtr_t::MAYBE_NOP>(*rseg_hdr, TRX_RSEG + TRX_RSEG_MAX_TRX_ID +
rseg_hdr->page.frame,
trx_sys.get_max_trx_id() - 1);
MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_CACHED);
MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED);
goto free_segment;
}
}
hdr_addr = prev_hdr_addr;
hdr_addr= prev_hdr_addr;
rseg_hdr = rseg.get(&mtr, &err);
if (!rseg_hdr) {
goto func_exit;
}
mtr.commit();
ut_ad(rseg.history_size > 0);
rseg.history_size--;
log_free_check();
mtr.start();
rseg_hdr->page.lock.x_lock();
if (UNIV_UNLIKELY(rseg_hdr->page.id() != rseg.page_id()))
{
rseg_hdr->unfix();
rseg_hdr->page.lock.x_unlock();
goto reget;
}
mtr.memo_push(rseg_hdr, MTR_MEMO_PAGE_X_MODIFY);
goto loop;
goto loop;
}
/** Cleanse purge queue to remove the rseg that reside in undo-tablespace
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment