Commit f74023b9 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-15090 Reduce the overhead of writing undo log records

Remove unnecessary repeated lookups for undo pages.

trx_undo_assign(), trx_undo_assign_low(), trx_undo_seg_create(),
trx_undo_create(): Return the undo log block to the caller.
parent 5d3c3b49
......@@ -255,28 +255,25 @@ trx_undo_lists_init(
/** Assign an undo log for a persistent transaction.
A new undo log is created or a cached undo log reused.
@param[in,out] trx transaction
@param[out] err error code
@param[in,out] mtr mini-transaction
@retval DB_SUCCESS on success
@retval DB_TOO_MANY_CONCURRENT_TRXS
@retval DB_OUT_OF_FILE_SPACE
@retval DB_READ_ONLY
@retval DB_OUT_OF_MEMORY */
dberr_t
trx_undo_assign(trx_t* trx, mtr_t* mtr)
MY_ATTRIBUTE((nonnull, warn_unused_result));
@return the undo log block
@retval NULL on error */
buf_block_t*
trx_undo_assign(trx_t* trx, dberr_t* err, mtr_t* mtr)
MY_ATTRIBUTE((nonnull));
/** Assign an undo log for a transaction.
A new undo log is created or a cached undo log reused.
@param[in,out] trx transaction
@param[in] rseg rollback segment
@param[out] undo the undo log
@param[out] err error code
@param[in,out] mtr mini-transaction
@retval DB_SUCCESS on success
@retval DB_TOO_MANY_CONCURRENT_TRXS
@retval DB_OUT_OF_FILE_SPACE
@retval DB_READ_ONLY
@retval DB_OUT_OF_MEMORY */
dberr_t
trx_undo_assign_low(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo, mtr_t*mtr)
@return the undo log block
@retval NULL on error */
buf_block_t*
trx_undo_assign_low(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo,
dberr_t* err, mtr_t* mtr)
MY_ATTRIBUTE((nonnull, warn_unused_result));
/******************************************************************//**
Sets the state of the undo log segment at a transaction finish.
......
......@@ -3403,7 +3403,7 @@ row_import_for_mysql(
{
mtr_t mtr;
mtr.start();
err = trx_undo_assign(trx, &mtr);
trx_undo_assign(trx, &err, &mtr);
mtr.commit();
}
......
......@@ -1823,7 +1823,7 @@ row_truncate_table_for_mysql(
mutex_enter(&trx->undo_mutex);
mtr_t mtr;
mtr.start();
err = trx_undo_assign(trx, &mtr);
trx_undo_assign(trx, &err, &mtr);
mtr.commit();
mutex_exit(&trx->undo_mutex);
......
......@@ -1902,22 +1902,15 @@ trx_undo_report_rename(trx_t* trx, const dict_table_t* table)
ut_ad(!table->is_temporary());
mtr_t mtr;
dberr_t err;
mtr.start();
mutex_enter(&trx->undo_mutex);
dberr_t err = trx_undo_assign(trx, &mtr);
ut_ad((err == DB_SUCCESS) == (trx->rsegs.m_redo.undo != NULL));
if (trx_undo_t* undo = trx->rsegs.m_redo.undo) {
buf_block_t* block = buf_page_get_gen(
page_id_t(undo->space, undo->last_page_no),
univ_page_size, RW_X_LATCH,
buf_pool_is_obsolete(undo->withdraw_clock)
? NULL : undo->guess_block,
BUF_GET, __FILE__, __LINE__, &mtr, &err);
ut_ad((err == DB_SUCCESS) == !!block);
for (ut_d(int loop_count = 0); block;) {
if (buf_block_t* block = trx_undo_assign(trx, &err, &mtr)) {
trx_undo_t* undo = trx->rsegs.m_redo.undo;
ut_ad(err == DB_SUCCESS);
ut_ad(undo);
for (ut_d(int loop_count = 0);;) {
ut_ad(++loop_count < 2);
buf_block_dbg_add_level(block, SYNC_TRX_UNDO_PAGE);
ut_ad(undo->last_page_no == block->page.id.page_no());
if (ulint offset = trx_undo_page_report_rename(
......@@ -1981,8 +1974,6 @@ trx_undo_report_row_operation(
undo log record */
{
trx_t* trx;
ulint page_no;
buf_block_t* undo_block;
mtr_t mtr;
#ifdef UNIV_DEBUG
int loop_count = 0;
......@@ -2017,27 +2008,19 @@ trx_undo_report_row_operation(
}
mutex_enter(&trx->undo_mutex);
dberr_t err = *pundo
? DB_SUCCESS : trx_undo_assign_low(trx, rseg, pundo, &mtr);
dberr_t err;
buf_block_t* undo_block = trx_undo_assign_low(trx, rseg, pundo,
&err, &mtr);
trx_undo_t* undo = *pundo;
ut_ad((err == DB_SUCCESS) == (undo != NULL));
if (undo == NULL) {
ut_ad((err == DB_SUCCESS) == (undo_block != NULL));
if (undo_block == NULL) {
goto err_exit;
}
page_no = undo->last_page_no;
undo_block = buf_page_get_gen(
page_id_t(undo->space, page_no), univ_page_size, RW_X_LATCH,
buf_pool_is_obsolete(undo->withdraw_clock)
? NULL : undo->guess_block, BUF_GET, __FILE__, __LINE__,
&mtr, &err);
buf_block_dbg_add_level(undo_block, SYNC_TRX_UNDO_PAGE);
ut_ad(undo != NULL);
do {
ut_ad(page_no == undo_block->page.id.page_no());
page_t* undo_page = buf_block_get_frame(undo_block);
ulint offset = !rec
? trx_undo_page_report_insert(
......@@ -2082,7 +2065,7 @@ trx_undo_report_row_operation(
mtr_commit(&mtr);
undo->empty = FALSE;
undo->top_page_no = page_no;
undo->top_page_no = undo_block->page.id.page_no();
undo->top_offset = offset;
undo->top_undo_no = trx->undo_no++;
undo->guess_block = undo_block;
......@@ -2113,11 +2096,11 @@ trx_undo_report_row_operation(
}
*roll_ptr = trx_undo_build_roll_ptr(
!rec, rseg->id, page_no, offset);
!rec, rseg->id, undo->top_page_no, offset);
return(DB_SUCCESS);
}
ut_ad(page_no == undo->last_page_no);
ut_ad(undo_block->page.id.page_no() == undo->last_page_no);
/* We have to extend the undo log by one page */
......@@ -2129,7 +2112,6 @@ trx_undo_report_row_operation(
}
undo_block = trx_undo_add_page(trx, undo, &mtr);
page_no = undo->last_page_no;
DBUG_EXECUTE_IF("ib_err_ins_undo_page_add_failure",
undo_block = NULL;);
......
......@@ -416,34 +416,23 @@ trx_undo_page_init(
MLOG_2BYTES, mtr);
}
/***************************************************************//**
Creates a new undo log segment in file.
@return DB_SUCCESS if page creation OK possible error codes are:
DB_TOO_MANY_CONCURRENT_TRXS DB_OUT_OF_FILE_SPACE */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
trx_undo_seg_create(
/*================*/
trx_rsegf_t* rseg_hdr,/*!< in: rollback segment header, page
x-latched */
ulint* id, /*!< out: slot index within rseg header */
page_t** undo_page,
/*!< out: segment header page x-latched, NULL
if there was an error */
mtr_t* mtr) /*!< in: mtr */
/** Create an undo log segment.
@param[in,out] rseg_hdr rollback segment header (x-latched)
@param[out] id undo slot number
@param[out] err error code
@param[in,out] mtr mini-transaction
@return undo log block
@retval NULL on failure */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
buf_block_t*
trx_undo_seg_create(trx_rsegf_t* rseg_hdr, ulint* id, dberr_t* err, mtr_t* mtr)
{
ulint slot_no;
ulint space;
buf_block_t* block;
trx_upagef_t* page_hdr;
trx_usegf_t* seg_hdr;
ulint n_reserved;
bool success;
ut_ad(mtr != NULL);
ut_ad(id != NULL);
ut_ad(rseg_hdr != NULL);
slot_no = trx_rsegf_undo_find_free(rseg_hdr, mtr);
if (slot_no == ULINT_UNDEFINED) {
......@@ -451,7 +440,8 @@ trx_undo_seg_create(
" you have too many active transactions running"
" concurrently?";
return(DB_TOO_MANY_CONCURRENT_TRXS);
*err = DB_TOO_MANY_CONCURRENT_TRXS;
return NULL;
}
space = page_get_space_id(page_align(rseg_hdr));
......@@ -459,8 +449,8 @@ trx_undo_seg_create(
success = fsp_reserve_free_extents(&n_reserved, space, 2, FSP_UNDO,
mtr);
if (!success) {
return(DB_OUT_OF_FILE_SPACE);
*err = DB_OUT_OF_FILE_SPACE;
return NULL;
}
/* Allocate a new file segment for the undo log */
......@@ -471,38 +461,35 @@ trx_undo_seg_create(
fil_space_release_free_extents(space, n_reserved);
if (block == NULL) {
/* No space left */
return(DB_OUT_OF_FILE_SPACE);
*err = DB_OUT_OF_FILE_SPACE;
return NULL;
}
buf_block_dbg_add_level(block, SYNC_TRX_UNDO_PAGE);
*undo_page = buf_block_get_frame(block);
page_hdr = *undo_page + TRX_UNDO_PAGE_HDR;
seg_hdr = *undo_page + TRX_UNDO_SEG_HDR;
trx_undo_page_init(*undo_page, mtr);
trx_undo_page_init(block->frame, mtr);
mlog_write_ulint(page_hdr + TRX_UNDO_PAGE_FREE,
mlog_write_ulint(TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE + block->frame,
TRX_UNDO_SEG_HDR + TRX_UNDO_SEG_HDR_SIZE,
MLOG_2BYTES, mtr);
mlog_write_ulint(seg_hdr + TRX_UNDO_LAST_LOG, 0, MLOG_2BYTES, mtr);
mlog_write_ulint(TRX_UNDO_SEG_HDR + TRX_UNDO_LAST_LOG + block->frame,
0, MLOG_2BYTES, mtr);
flst_init(seg_hdr + TRX_UNDO_PAGE_LIST, mtr);
flst_init(TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + block->frame, mtr);
flst_add_last(seg_hdr + TRX_UNDO_PAGE_LIST,
page_hdr + TRX_UNDO_PAGE_NODE, mtr);
flst_add_last(TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + block->frame,
TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_NODE + block->frame,
mtr);
trx_rsegf_set_nth_undo(rseg_hdr, slot_no,
page_get_page_no(*undo_page), mtr);
*id = slot_no;
trx_rsegf_set_nth_undo(rseg_hdr, slot_no, block->page.id.page_no(),
mtr);
MONITOR_INC(MONITOR_NUM_UNDO_SLOT_USED);
return(DB_SUCCESS);
*err = DB_SUCCESS;
return block;
}
/**********************************************************************//**
......@@ -1288,58 +1275,50 @@ trx_undo_mem_free(
ut_free(undo);
}
/**********************************************************************//**
Creates a new undo log.
@return DB_SUCCESS if successful in creating the new undo lob object,
possible error codes are: DB_TOO_MANY_CONCURRENT_TRXS
DB_OUT_OF_FILE_SPACE DB_OUT_OF_MEMORY */
/** Create an undo log.
@param[in,out] trx transaction
@param[in,out] rseg rollback segment
@param[out] undo undo log object
@param[out] err error code
@param[in,out] mtr mini-transaction
@return undo log block
@retval NULL on failure */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
trx_undo_create(
/*============*/
trx_t* trx, /*!< in: transaction */
trx_rseg_t* rseg, /*!< in: rollback segment memory copy */
trx_undo_t** undo, /*!< out: the new undo log object, undefined
* if did not succeed */
mtr_t* mtr) /*!< in: mtr */
buf_block_t*
trx_undo_create(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo,
dberr_t* err, mtr_t* mtr)
{
trx_rsegf_t* rseg_header;
ulint page_no;
ulint offset;
ulint id;
page_t* undo_page;
dberr_t err;
ut_ad(mutex_own(&(rseg->mutex)));
if (rseg->curr_size == rseg->max_size) {
return(DB_OUT_OF_FILE_SPACE);
*err = DB_OUT_OF_FILE_SPACE;
return NULL;
}
rseg_header = trx_rsegf_get(rseg->space, rseg->page_no, mtr);
buf_block_t* block = trx_undo_seg_create(
trx_rsegf_get(rseg->space, rseg->page_no, mtr), &id, err, mtr);
err = trx_undo_seg_create(rseg_header, &id, &undo_page, mtr);
if (err != DB_SUCCESS) {
return(err);
if (!block) {
return block;
}
rseg->curr_size++;
page_no = page_get_page_no(undo_page);
offset = trx_undo_header_create(undo_page, trx->id, mtr);
ulint offset = trx_undo_header_create(block->frame, trx->id, mtr);
trx_undo_header_add_space_for_xid(undo_page, undo_page + offset, mtr);
trx_undo_header_add_space_for_xid(block->frame, block->frame + offset,
mtr);
*undo = trx_undo_mem_create(rseg, id, trx->id, trx->xid,
page_no, offset);
block->page.id.page_no(), offset);
if (*undo == NULL) {
return DB_OUT_OF_MEMORY;
*err = DB_OUT_OF_MEMORY;
/* FIXME: this will not free the undo block to the file */
return NULL;
} else if (rseg != trx->rsegs.m_redo.rseg) {
return DB_SUCCESS;
return block;
}
switch (trx_get_dict_operation(trx)) {
......@@ -1352,60 +1331,68 @@ trx_undo_create(
case TRX_DICT_OP_TABLE:
(*undo)->table_id = trx->table_id;
(*undo)->dict_operation = TRUE;
mlog_write_ulint(undo_page + offset + TRX_UNDO_DICT_TRANS,
mlog_write_ulint(block->frame + offset + TRX_UNDO_DICT_TRANS,
TRUE, MLOG_1BYTE, mtr);
mlog_write_ull(undo_page + offset + TRX_UNDO_TABLE_ID,
mlog_write_ull(block->frame + offset + TRX_UNDO_TABLE_ID,
trx->table_id, mtr);
}
return DB_SUCCESS;
*err = DB_SUCCESS;
return block;
}
/*================ UNDO LOG ASSIGNMENT AND CLEANUP =====================*/
/********************************************************************//**
Reuses a cached undo log.
@return the undo log memory object, NULL if none cached */
/** Reuse a cached undo log block.
@param[in,out] trx transaction
@param[in,out] rseg rollback segment
@param[out] pundo the undo log memory object
@param[in,out] mtr mini-transaction
@return the undo log block
@retval NULL if none cached */
static
trx_undo_t*
trx_undo_reuse_cached(
/*==================*/
trx_t* trx, /*!< in: transaction */
trx_rseg_t* rseg, /*!< in: rollback segment memory object */
mtr_t* mtr) /*!< in: mtr */
buf_block_t*
trx_undo_reuse_cached(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** pundo,
mtr_t* mtr)
{
trx_undo_t* undo;
page_t* undo_page;
ulint offset;
ut_ad(mutex_own(&rseg->mutex));
ut_ad(mutex_own(&(rseg->mutex)));
trx_undo_t* undo = UT_LIST_GET_FIRST(rseg->undo_cached);
if (!undo) {
return NULL;
}
undo = UT_LIST_GET_FIRST(rseg->undo_cached);
if (undo == NULL) {
return(NULL);
ut_ad(undo->size == 1);
ut_ad(undo->id < TRX_RSEG_N_SLOTS);
buf_block_t* block = buf_page_get(page_id_t(undo->space,
undo->hdr_page_no),
univ_page_size, RW_X_LATCH, mtr);
if (!block) {
return NULL;
}
buf_block_dbg_add_level(block, SYNC_TRX_UNDO_PAGE);
UT_LIST_REMOVE(rseg->undo_cached, undo);
MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_CACHED);
ut_ad(undo->size == 1);
ut_a(undo->id < TRX_RSEG_N_SLOTS);
*pundo = undo;
undo_page = trx_undo_page_get(
page_id_t(undo->space, undo->hdr_page_no), mtr);
offset = trx_undo_header_create(undo_page, trx->id, mtr);
ulint offset = trx_undo_header_create(block->frame, trx->id, mtr);
trx_undo_header_add_space_for_xid(undo_page, undo_page + offset, mtr);
trx_undo_header_add_space_for_xid(block->frame, block->frame + offset,
mtr);
trx_undo_mem_init_for_reuse(undo, trx->id, trx->xid, offset);
if (rseg != trx->rsegs.m_redo.rseg) {
return undo;
return block;
}
switch (trx_get_dict_operation(trx)) {
case TRX_DICT_OP_NONE:
return undo;
return block;
case TRX_DICT_OP_INDEX:
/* Do not discard the table on recovery. */
trx->table_id = 0;
......@@ -1413,52 +1400,61 @@ trx_undo_reuse_cached(
case TRX_DICT_OP_TABLE:
undo->table_id = trx->table_id;
undo->dict_operation = TRUE;
mlog_write_ulint(undo_page + offset + TRX_UNDO_DICT_TRANS,
mlog_write_ulint(block->frame + offset + TRX_UNDO_DICT_TRANS,
TRUE, MLOG_1BYTE, mtr);
mlog_write_ull(undo_page + offset + TRX_UNDO_TABLE_ID,
mlog_write_ull(block->frame + offset + TRX_UNDO_TABLE_ID,
trx->table_id, mtr);
}
return(undo);
return block;
}
/** Assign an undo log for a persistent transaction.
A new undo log is created or a cached undo log reused.
@param[in,out] trx transaction
@param[out] err error code
@param[in,out] mtr mini-transaction
@retval DB_SUCCESS on success
@retval DB_TOO_MANY_CONCURRENT_TRXS
@retval DB_OUT_OF_FILE_SPACE
@retval DB_READ_ONLY
@retval DB_OUT_OF_MEMORY */
dberr_t
trx_undo_assign(trx_t* trx, mtr_t* mtr)
@return the undo log block
@retval NULL on error */
buf_block_t*
trx_undo_assign(trx_t* trx, dberr_t* err, mtr_t* mtr)
{
dberr_t err = DB_SUCCESS;
ut_ad(mutex_own(&trx->undo_mutex));
ut_ad(mtr->get_log_mode() == MTR_LOG_ALL);
if (trx->rsegs.m_redo.undo) {
return DB_SUCCESS;
trx_undo_t* undo = trx->rsegs.m_redo.undo;
if (undo) {
return buf_page_get_gen(
page_id_t(undo->space, undo->last_page_no),
univ_page_size, RW_X_LATCH,
buf_pool_is_obsolete(undo->withdraw_clock)
? NULL : undo->guess_block,
BUF_GET, __FILE__, __LINE__, mtr, err);
}
trx_rseg_t* rseg = trx->rsegs.m_redo.rseg;
mutex_enter(&rseg->mutex);
if (!(trx->rsegs.m_redo.undo= trx_undo_reuse_cached(trx, rseg, mtr))) {
err = trx_undo_create(trx, rseg, &trx->rsegs.m_redo.undo, mtr);
if (err != DB_SUCCESS) {
buf_block_t* block = trx_undo_reuse_cached(
trx, rseg, &trx->rsegs.m_redo.undo, mtr);
if (!block) {
block = trx_undo_create(trx, rseg, &trx->rsegs.m_redo.undo,
err, mtr);
ut_ad(!block == (*err != DB_SUCCESS));
if (!block) {
goto func_exit;
}
} else {
*err = DB_SUCCESS;
}
UT_LIST_ADD_FIRST(rseg->undo_list, trx->rsegs.m_redo.undo);
func_exit:
mutex_exit(&rseg->mutex);
return err;
return block;
}
/** Assign an undo log for a transaction.
......@@ -1466,17 +1462,15 @@ A new undo log is created or a cached undo log reused.
@param[in,out] trx transaction
@param[in] rseg rollback segment
@param[out] undo the undo log
@param[out] err error code
@param[in,out] mtr mini-transaction
@retval DB_SUCCESS on success
@retval DB_TOO_MANY_CONCURRENT_TRXS
@retval DB_OUT_OF_FILE_SPACE
@retval DB_READ_ONLY
@retval DB_OUT_OF_MEMORY */
dberr_t
trx_undo_assign_low(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo, mtr_t*mtr)
@return the undo log block
@retval NULL on error */
buf_block_t*
trx_undo_assign_low(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo,
dberr_t* err, mtr_t* mtr)
{
const bool is_temp = rseg == trx->rsegs.m_noredo.rseg;
dberr_t err = DB_SUCCESS;
ut_ad(mutex_own(&trx->undo_mutex));
ut_ad(rseg == trx->rsegs.m_redo.rseg
......@@ -1484,30 +1478,42 @@ trx_undo_assign_low(trx_t* trx, trx_rseg_t* rseg, trx_undo_t** undo, mtr_t*mtr)
ut_ad(undo == (is_temp
? &trx->rsegs.m_noredo.undo
: &trx->rsegs.m_redo.undo));
ut_ad(!*undo);
ut_ad(mtr->get_log_mode()
== (is_temp ? MTR_LOG_NO_REDO : MTR_LOG_ALL));
mutex_enter(&rseg->mutex);
if (*undo) {
return buf_page_get_gen(
page_id_t((*undo)->space, (*undo)->last_page_no),
univ_page_size, RW_X_LATCH,
buf_pool_is_obsolete((*undo)->withdraw_clock)
? NULL : (*undo)->guess_block,
BUF_GET, __FILE__, __LINE__, mtr, err);
}
DBUG_EXECUTE_IF(
"ib_create_table_fail_too_many_trx",
err = DB_TOO_MANY_CONCURRENT_TRXS;
goto func_exit;
*err = DB_TOO_MANY_CONCURRENT_TRXS; return NULL;
);
if (!(*undo= trx_undo_reuse_cached(trx, rseg, mtr))) {
err = trx_undo_create(trx, rseg, undo, mtr);
if (err != DB_SUCCESS) {
mutex_enter(&rseg->mutex);
buf_block_t* block = trx_undo_reuse_cached(trx, rseg, undo, mtr);
if (!block) {
block = trx_undo_create(trx, rseg, undo, err, mtr);
ut_ad(!block == (*err != DB_SUCCESS));
if (!block) {
goto func_exit;
}
} else {
*err = DB_SUCCESS;
}
UT_LIST_ADD_FIRST(rseg->undo_list, *undo);
func_exit:
mutex_exit(&rseg->mutex);
return(err);
return block;
}
/******************************************************************//**
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment