Commit 21bec970 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-32050: Clean up online ALTER

UndorecApplier::assign_rec(): Remove. We will pass the undo record to
UndorecApplier::apply_undo_rec(). There is no need to copy the
undo record, because nothing else can write to the undo log pages
that belong to an active or incomplete transaction.

trx_t::apply_log(): Buffer-fix the undo page across mini-transaction
boundary in order to avoid repeated page lookups.

Reviewed by: Vladislav Lesin
parent 9bb5d9fe
......@@ -308,13 +308,13 @@ class UndorecApplier
{
/** Undo log block page id */
page_id_t page_id;
/** Undo log record pointer */
/** Pointer to within undo log record */
const trx_undo_rec_t *undo_rec;
/** Undo log record type */
byte type;
/** compiler information */
byte cmpl_info;
/** Offset of the undo log record within the block */
/** page_offset(undo_rec) of the start of undo_rec */
uint16_t offset;
/** Transaction id of the undo log */
const trx_id_t trx_id;
......@@ -338,15 +338,10 @@ class UndorecApplier
page_id= next_page_id;
}
/** Assign the undo log record and offset */
inline void assign_rec(const buf_block_t &block, uint16_t offset);
uint16_t get_offset() const { return offset; }
page_id_t get_page_id() const { return page_id; }
/** Handle the DML undo log and apply it on online indexes */
inline void apply_undo_rec();
inline void apply_undo_rec(const trx_undo_rec_t *rec);
~UndorecApplier()
{
......@@ -368,12 +363,7 @@ class UndorecApplier
/** Check whether the given roll pointer is generated by
the current undo log record information stored.
@return true if roll pointer matches with current undo log info */
bool is_same(roll_ptr_t roll_ptr) const
{
uint16_t offset= static_cast<uint16_t>(roll_ptr);
uint32_t page_no= static_cast<uint32_t>(roll_ptr >> 16);
return page_no == page_id.page_no() && offset == this->offset;
}
inline bool is_same(roll_ptr_t roll_ptr) const;
/** Clear the undo log record information */
void clear_undo_rec()
......
......@@ -3825,6 +3825,12 @@ dberr_t dict_table_t::clear(que_thr_t *thr)
return err;
}
inline bool UndorecApplier::is_same(roll_ptr_t roll_ptr) const
{
return uint16_t(roll_ptr) == offset &&
uint32_t(roll_ptr >> 16) == page_id.page_no();
}
const rec_t *
UndorecApplier::get_old_rec(const dtuple_t &tuple, dict_index_t *index,
const rec_t **clust_rec, rec_offs **offsets)
......@@ -3950,8 +3956,7 @@ void UndorecApplier::log_insert(const dtuple_t &tuple,
/* Update the row with virtual column values present
in the undo log or update vector */
if (type == TRX_UNDO_UPD_DEL_REC)
row_upd_replace_vcol(row, table, update, false,
nullptr,
row_upd_replace_vcol(row, table, update, false, nullptr,
(cmpl_info & UPD_NODE_NO_ORD_CHANGE)
? nullptr : undo_rec);
else
......@@ -4073,7 +4078,7 @@ void UndorecApplier::log_update(const dtuple_t &tuple,
if (table->n_v_cols)
row_upd_replace_vcol(row, table, update, false, nullptr,
(cmpl_info & UPD_NODE_NO_ORD_CHANGE)
? nullptr : this->undo_rec);
? nullptr : undo_rec);
bool success= true;
dict_index_t *index= dict_table_get_next_index(clust_index);
......
......@@ -1192,7 +1192,7 @@ static bool row_undo_mod_parse_undo_rec(undo_node_t* node, bool dict_locked)
row_upd_replace_vcol(node->row, node->table,
node->update, false, node->undo_row,
(node->cmpl_info & UPD_NODE_NO_ORD_CHANGE)
? NULL : ptr);
? nullptr : ptr);
}
return true;
......
......@@ -1060,10 +1060,8 @@ row_upd_replace_vcol(
bool is_undo_log = true;
/* We will read those unchanged (but indexed) virtual columns in */
if (ptr != NULL) {
const byte* end_ptr;
end_ptr = ptr + mach_read_from_2(ptr);
if (ptr) {
const byte* const end_ptr = ptr + mach_read_from_2(ptr);
ptr += 2;
while (ptr != end_ptr) {
......@@ -1189,7 +1187,7 @@ row_upd_replace(
*ext = NULL;
}
row_upd_replace_vcol(row, table, update, true, NULL, NULL);
row_upd_replace_vcol(row, table, update, true, nullptr, nullptr);
}
/***********************************************************//**
......
......@@ -2335,7 +2335,7 @@ trx_undo_prev_version_build(
update vector to dtuple vrow */
if (v_status & TRX_UNDO_GET_OLD_V_VALUE) {
row_upd_replace_vcol((dtuple_t*)*vrow, index->table, update,
false, NULL, NULL);
false, nullptr, nullptr);
}
#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
......
......@@ -299,18 +299,13 @@ trx_undo_get_first_rec(const fil_space_t &space, uint32_t page_no,
mtr);
}
inline void UndorecApplier::assign_rec(const buf_block_t &block,
uint16_t offset)
{
ut_ad(block.page.lock.have_s());
this->offset= offset;
this->undo_rec= trx_undo_rec_copy(block.page.frame + offset, heap);
}
inline void UndorecApplier::apply_undo_rec()
inline void UndorecApplier::apply_undo_rec(const trx_undo_rec_t *rec)
{
undo_rec= rec;
if (!undo_rec)
return;
offset= page_offset(undo_rec);
bool updated_extern= false;
undo_no_t undo_no= 0;
table_id_t table_id= 0;
......@@ -382,14 +377,14 @@ ATTRIBUTE_COLD void trx_t::apply_log()
undo->hdr_offset);
while (rec)
{
log_applier.assign_rec(*block, page_offset(rec));
block->page.fix();
mtr.commit();
log_applier.apply_undo_rec();
/* Since we are the only thread who could write to this undo page,
it is safe to dereference rec while only holding a buffer-fix. */
log_applier.apply_undo_rec(rec);
mtr.start();
block= buf_page_get(log_applier.get_page_id(), 0, RW_S_LATCH, &mtr);
if (UNIV_UNLIKELY(!block))
goto func_exit;
rec= trx_undo_page_get_next_rec(block, log_applier.get_offset(),
mtr.page_lock(block, RW_S_LATCH);
rec= trx_undo_page_get_next_rec(block, page_offset(rec),
page_id.page_no(), undo->hdr_offset);
}
......@@ -406,7 +401,6 @@ ATTRIBUTE_COLD void trx_t::apply_log()
break;
log_applier.assign_next(next_page_id);
}
func_exit:
mtr.commit();
apply_online_log= false;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment