Commit f4bbea90 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-30100 preparation: Simplify InnoDB transaction commit

trx_commit_cleanup(): Clean up any temporary undo log.
Replaces trx_undo_commit_cleanup() and trx_undo_seg_free().

trx_write_serialisation_history(): Commit the mini-transaction.
Do not touch temporary undo logs. Assume that a persistent rollback
segment has been assigned.

trx_serialise(): Merged into trx_write_serialisation_history().

trx_t::commit_low(): Correct some comments and assertions.

trx_t::commit_persist(): Only invoke commit_low() on a mini-transaction
if the persistent state needs to change.
parent eda75cad
......@@ -11498,7 +11498,7 @@ ha_innobase::commit_inplace_alter_table(
DEBUG_SYNC(m_user_thd, "innodb_alter_inplace_before_commit");
if (new_clustered) {
ut_ad(trx->has_logged());
ut_ad(trx->has_logged_persistent());
for (inplace_alter_handler_ctx** pctx = ctx_array; *pctx;
pctx++) {
auto ctx= static_cast<ha_innobase_inplace_ctx*>(*pctx);
......
......@@ -226,12 +226,6 @@ void trx_undo_set_state_at_prepare(trx_t *trx, trx_undo_t *undo, bool rollback,
mtr_t *mtr)
MY_ATTRIBUTE((nonnull));
/** Free temporary undo log after commit or rollback.
The information is not needed after a commit or rollback, therefore
the data can be discarded.
@param undo temporary undo log */
void trx_undo_commit_cleanup(trx_undo_t *undo);
/** At shutdown, frees the undo logs of a transaction. */
void
trx_undo_free_at_shutdown(trx_t *trx);
......
......@@ -969,93 +969,50 @@ trx_start_low(
ut_a(trx->error_state == DB_SUCCESS);
}
/** Set the serialisation number for a persistent committed transaction.
@param[in,out] trx committed transaction with persistent changes */
static
void
trx_serialise(trx_t* trx)
{
trx_rseg_t *rseg = trx->rsegs.m_redo.rseg;
ut_ad(rseg);
if (rseg->last_page_no == FIL_NULL) {
mysql_mutex_lock(&purge_sys.pq_mutex);
}
trx_sys.assign_new_trx_no(trx);
/* If the rollback segment is not empty then the
new trx_t::no can't be less than any trx_t::no
already in the rollback segment. User threads only
produce events when a rollback segment is empty. */
if (rseg->last_page_no == FIL_NULL) {
purge_sys.purge_queue.push(TrxUndoRsegs(trx->rw_trx_hash_element->no,
*rseg));
mysql_mutex_unlock(&purge_sys.pq_mutex);
}
}
/****************************************************************//**
Assign the transaction its history serialisation number and write the
update UNDO log record to the assigned rollback segment. */
static
void
trx_write_serialisation_history(
/*============================*/
trx_t* trx, /*!< in/out: transaction */
mtr_t* mtr) /*!< in/out: mini-transaction */
/** Assign the transaction its history serialisation number and write the
UNDO log to the assigned rollback segment.
@param trx persistent transaction
@param mtr mini-transaction */
static void trx_write_serialisation_history(trx_t *trx, mtr_t *mtr)
{
/* Change the undo log segment states from TRX_UNDO_ACTIVE to some
other state: these modifications to the file data structure define
the transaction as committed in the file based domain, at the
serialization point of the log sequence number lsn obtained below. */
/* We have to hold the rseg mutex because update log headers have
to be put to the history list in the (serialisation) order of the
UNDO trx number. This is required for the purge in-memory data
structures too. */
if (trx_undo_t* undo = trx->rsegs.m_noredo.undo) {
/* Undo log for temporary tables is discarded at transaction
commit. There is no purge for temporary tables, and also no
MVCC, because they are private to a session. */
mtr_t temp_mtr;
temp_mtr.start();
temp_mtr.set_log_mode(MTR_LOG_NO_REDO);
buf_block_t* block= buf_page_get(page_id_t(SRV_TMP_SPACE_ID,
undo->hdr_page_no),
0, RW_X_LATCH, mtr);
ut_a(block);
temp_mtr.write<2>(*block, TRX_UNDO_SEG_HDR + TRX_UNDO_STATE
+ block->page.frame, TRX_UNDO_TO_PURGE);
undo->state = TRX_UNDO_TO_PURGE;
temp_mtr.commit();
}
trx_rseg_t* rseg = trx->rsegs.m_redo.rseg;
if (!rseg) {
ut_ad(!trx->rsegs.m_redo.undo);
return;
}
trx_undo_t*& undo = trx->rsegs.m_redo.undo;
ut_ad(!trx->read_only);
/* Assign the transaction serialisation number and add any
undo log to the purge queue. */
if (undo) {
rseg->latch.wr_lock(SRW_LOCK_CALL);
ut_ad(undo->rseg == rseg);
trx_serialise(trx);
UT_LIST_REMOVE(rseg->undo_list, undo);
trx_purge_add_undo_to_history(trx, undo, mtr);
MONITOR_INC(MONITOR_TRX_COMMIT_UNDO);
rseg->latch.wr_unlock();
}
rseg->release();
ut_ad(!trx->read_only);
trx_rseg_t *rseg= trx->rsegs.m_redo.rseg;
trx_undo_t *&undo= trx->rsegs.m_redo.undo;
if (UNIV_LIKELY(undo != nullptr))
{
MONITOR_INC(MONITOR_TRX_COMMIT_UNDO);
/* We have to hold exclusive rseg->latch because undo log headers have
to be put to the history list in the (serialisation) order of the
UNDO trx number. This is required for purge_sys too. */
rseg->latch.wr_lock(SRW_LOCK_CALL);
ut_ad(undo->rseg == rseg);
/* Assign the transaction serialisation number and add any
undo log to the purge queue. */
if (rseg->last_page_no == FIL_NULL)
{
mysql_mutex_lock(&purge_sys.pq_mutex);
trx_sys.assign_new_trx_no(trx);
/* If the rollback segment is not empty, trx->no cannot be less
than any trx_t::no already in rseg. User threads only produce
events when a rollback segment is empty. */
purge_sys.purge_queue.push(TrxUndoRsegs(trx->rw_trx_hash_element->no,
*rseg));
mysql_mutex_unlock(&purge_sys.pq_mutex);
}
else
trx_sys.assign_new_trx_no(trx);
UT_LIST_REMOVE(rseg->undo_list, undo);
/* Change the undo log segment state from TRX_UNDO_ACTIVE, to
define the transaction as committed in the file based domain,
at mtr->commit_lsn() obtained in mtr->commit() below. */
trx_purge_add_undo_to_history(trx, undo, mtr);
rseg->release();
rseg->latch.wr_unlock();
}
else
rseg->release();
mtr->commit();
}
/********************************************************************
......@@ -1229,6 +1186,55 @@ void trx_t::evict_table(table_id_t table_id, bool reset_only)
}
}
/** Free temporary undo log after commit or rollback.
@param undo temporary undo log */
ATTRIBUTE_NOINLINE static void trx_commit_cleanup(trx_undo_t *&undo)
{
trx_rseg_t *const rseg= undo->rseg;
ut_ad(rseg->space == fil_system.temp_space);
rseg->latch.wr_lock(SRW_LOCK_CALL);
UT_LIST_REMOVE(rseg->undo_list, undo);
ut_ad(undo->state == TRX_UNDO_ACTIVE || undo->state == TRX_UNDO_PREPARED);
ut_ad(undo->id < TRX_RSEG_N_SLOTS);
/* Delete first the undo log segment in the file */
bool finished;
mtr_t mtr;
do
{
mtr.start();
mtr.set_log_mode(MTR_LOG_NO_REDO);
finished= true;
if (buf_block_t *block=
buf_page_get(page_id_t(SRV_TMP_SPACE_ID, undo->hdr_page_no), 0,
RW_X_LATCH, &mtr))
{
fseg_header_t *file_seg= TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER +
block->page.frame;
finished= fseg_free_step(file_seg, &mtr);
if (!finished);
else if (buf_block_t *rseg_header= rseg->get(&mtr, nullptr))
{
static_assert(FIL_NULL == 0xffffffff, "compatibility");
memset(rseg_header->page.frame + TRX_RSEG + TRX_RSEG_UNDO_SLOTS +
undo->id * TRX_RSEG_SLOT_SIZE, 0xff, 4);
}
}
mtr.commit();
}
while (!finished);
ut_ad(rseg->curr_size > undo->size);
rseg->curr_size-= undo->size;
rseg->latch.wr_unlock();
ut_free(undo);
undo= nullptr;
}
TRANSACTIONAL_INLINE inline void trx_t::commit_in_memory(const mtr_t *mtr)
{
/* We already detached from rseg in trx_write_serialisation_history() */
......@@ -1300,15 +1306,14 @@ TRANSACTIONAL_INLINE inline void trx_t::commit_in_memory(const mtr_t *mtr)
release_locks();
}
if (mtr)
if (trx_undo_t *&undo= rsegs.m_noredo.undo)
{
if (trx_undo_t *&undo= rsegs.m_noredo.undo)
{
ut_ad(undo->rseg == rsegs.m_noredo.rseg);
trx_undo_commit_cleanup(undo);
undo= nullptr;
}
ut_ad(undo->rseg == rsegs.m_noredo.rseg);
trx_commit_cleanup(undo);
}
if (mtr)
{
/* NOTE that we could possibly make a group commit more efficient
here: call std::this_thread::yield() here to allow also other trxs to come
to commit! */
......@@ -1346,8 +1351,6 @@ TRANSACTIONAL_INLINE inline void trx_t::commit_in_memory(const mtr_t *mtr)
trx_flush_log_if_needed(commit_lsn, this);
}
ut_ad(!rsegs.m_noredo.undo);
savepoints_discard();
if (fts_trx)
......@@ -1390,7 +1393,7 @@ TRANSACTIONAL_TARGET void trx_t::commit_low(mtr_t *mtr)
{
ut_ad(!mtr || mtr->is_active());
ut_d(bool aborted= in_rollback && error_state == DB_DEADLOCK);
ut_ad(!mtr == (aborted || !has_logged()));
ut_ad(!mtr == (aborted || !has_logged_persistent()));
ut_ad(!mtr || !aborted);
if (fts_trx && undo_no)
......@@ -1416,7 +1419,6 @@ TRANSACTIONAL_TARGET void trx_t::commit_low(mtr_t *mtr)
{
if (UNIV_UNLIKELY(apply_online_log))
apply_log();
trx_write_serialisation_history(this, mtr);
/* The following call commits the mini-transaction, making the
whole transaction committed in the file-based world, at this log
......@@ -1424,16 +1426,12 @@ TRANSACTIONAL_TARGET void trx_t::commit_low(mtr_t *mtr)
the log to disk, but in the logical sense the commit in the
file-based data structures (undo logs etc.) happens here.
NOTE that transaction numbers, which are assigned only to
transactions with an update undo log, do not necessarily come in
NOTE that transaction numbers do not necessarily come in
exactly the same order as commit lsn's, if the transactions have
different rollback segments. To get exactly the same order we
should hold the kernel mutex up to this point, adding to the
contention of the kernel mutex. However, if a transaction T2 is
different rollback segments. However, if a transaction T2 is
able to see modifications made by a transaction T1, T2 will always
get a bigger transaction number and a bigger commit lsn than T1. */
mtr->commit();
trx_write_serialisation_history(this, mtr);
}
else if (trx_rseg_t *rseg= rsegs.m_redo.rseg)
{
......@@ -1456,7 +1454,7 @@ void trx_t::commit_persist()
mtr_t *mtr= nullptr;
mtr_t local_mtr;
if (has_logged())
if (has_logged_persistent())
{
mtr= &local_mtr;
local_mtr.start();
......
......@@ -961,47 +961,6 @@ trx_undo_truncate_start(
goto loop;
}
/** Frees an undo log segment which is not in the history list.
@param undo temporary undo log */
static void trx_undo_seg_free(const trx_undo_t *undo)
{
ut_ad(undo->id < TRX_RSEG_N_SLOTS);
trx_rseg_t *const rseg= undo->rseg;
bool finished;
mtr_t mtr;
ut_ad(rseg->space == fil_system.temp_space);
do
{
mtr.start();
mtr.set_log_mode(MTR_LOG_NO_REDO);
finished= true;
if (buf_block_t *block=
buf_page_get(page_id_t(SRV_TMP_SPACE_ID, undo->hdr_page_no), 0,
RW_X_LATCH, &mtr))
{
fseg_header_t *file_seg= TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER +
block->page.frame;
finished= fseg_free_step(file_seg, &mtr);
if (!finished);
else if (buf_block_t* rseg_header = rseg->get(&mtr, nullptr))
{
static_assert(FIL_NULL == 0xffffffff, "compatibility");
mtr.memset(rseg_header, TRX_RSEG + TRX_RSEG_UNDO_SLOTS +
undo->id * TRX_RSEG_SLOT_SIZE, 4, 0xff);
}
}
mtr.commit();
}
while (!finished);
}
/*========== UNDO LOG MEMORY COPY INITIALIZATION =====================*/
/** Read an undo log when starting up the database.
......@@ -1508,27 +1467,6 @@ void trx_undo_set_state_at_prepare(trx_t *trx, trx_undo_t *undo, bool rollback,
trx_undo_write_xid(block, offset, undo->xid, mtr);
}
/** Free temporary undo log after commit or rollback.
The information is not needed after a commit or rollback, therefore
the data can be discarded.
@param undo temporary undo log */
void trx_undo_commit_cleanup(trx_undo_t *undo)
{
trx_rseg_t *rseg= undo->rseg;
ut_ad(rseg->space == fil_system.temp_space);
rseg->latch.wr_lock(SRW_LOCK_CALL);
UT_LIST_REMOVE(rseg->undo_list, undo);
ut_ad(undo->state == TRX_UNDO_TO_PURGE);
/* Delete first the undo log segment in the file */
trx_undo_seg_free(undo);
ut_ad(rseg->curr_size > undo->size);
rseg->curr_size-= undo->size;
rseg->latch.wr_unlock();
ut_free(undo);
}
/** At shutdown, frees the undo logs of a transaction. */
void trx_undo_free_at_shutdown(trx_t *trx)
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment