Commit 722df777 authored by Vlad Lesin's avatar Vlad Lesin

MDEV-33757 Get rid of TrxUndoRsegs code

TrxUndoRsegs is wrapper for vector of trx_rseg_t*. It has two
constructors, both initialize the vector with only one element. And they
are used to push transactions rseg(the singular) to purge queue. There is
no function to add elements to the vector. The default constructor is used
only for declaration of NullElement.

The TrxUndoRsegs was introduced in WL#6915 in MySQL 5.7 and. MySQL 5.7
would unnecessarily let the purge of history parse the
temporary undo records, and then look up the table (via a global hash
table), and only at the point of processing the parsed undo log record
determine that the table is a temporary table and the undo record must be
thrown away.

In MariaDB 10.2 we have two disjoint sets of rollback segments (128 for
persistent, 128 for temporary), and purge does not even see the temporary
tables. The only reason why temporary tables are visible to other threads
is a SQL layer bug (MDEV-17805).

purge_sys_t::choose_next_log(): merge the relevant part
of TrxUndoRsegsIterator::set_next() to the start of
purge_sys_t::choose_next_log().

purge_sys_t::rseg_get_next_history_log(): add a tail call of
purge_sys_t::choose_next_log() and adjust the callers, to simplify the
control flow further.

purge_sys.pq_mutex and purge_sys.purge_queue: make it private by adding
some simple accessor function.

trx_purge_cleanse_purge_queue(): make it a member of purge_sys_t to have
have access to private purge_sys.pq_mutex and purge_sys.purge_queue,
simplify the code with using simple array copy and clearing purge queue
instead of poping each purge queue element.

rseg_t::last_commit_and_offset: exchange trx_no and offset bits to avoid
bitwise operations during pushing to/popping from purge queue.

Thanks Marko Mäkelä for historical overview of TrxUndoRsegs development.

Reviewed by: Marko Mäkelä
parent ccb7a1e9
...@@ -55,80 +55,79 @@ Run a purge batch. ...@@ -55,80 +55,79 @@ Run a purge batch.
@return number of undo log pages handled in the batch */ @return number of undo log pages handled in the batch */
ulint trx_purge(ulint n_tasks, ulint history_size); ulint trx_purge(ulint n_tasks, ulint history_size);
/** Rollback segements from a given transaction with trx-no
scheduled for purge. */
class TrxUndoRsegs {
private:
typedef std::vector<trx_rseg_t*, ut_allocator<trx_rseg_t*> >
trx_rsegs_t;
public:
typedef trx_rsegs_t::iterator iterator;
typedef trx_rsegs_t::const_iterator const_iterator;
TrxUndoRsegs() = default;
/** Constructor */
TrxUndoRsegs(trx_rseg_t& rseg)
: trx_no(rseg.last_trx_no()), m_rsegs(1, &rseg) {}
/** Constructor */
TrxUndoRsegs(trx_id_t trx_no, trx_rseg_t& rseg)
: trx_no(trx_no), m_rsegs(1, &rseg) {}
bool operator!=(const TrxUndoRsegs& other) const
{ return trx_no != other.trx_no; }
bool empty() const { return m_rsegs.empty(); }
void erase(iterator& it) { m_rsegs.erase(it); }
iterator begin() { return(m_rsegs.begin()); }
iterator end() { return(m_rsegs.end()); }
const_iterator begin() const { return m_rsegs.begin(); }
const_iterator end() const { return m_rsegs.end(); }
/** Compare two TrxUndoRsegs based on trx_no.
@param elem1 first element to compare
@param elem2 second element to compare
@return true if elem1 > elem2 else false.*/
bool operator()(const TrxUndoRsegs& lhs, const TrxUndoRsegs& rhs)
{
return(lhs.trx_no > rhs.trx_no);
}
/** Copy of trx_rseg_t::last_trx_no() */
trx_id_t trx_no= 0;
private:
/** Rollback segments of a transaction, scheduled for purge. */
trx_rsegs_t m_rsegs{};
};
typedef std::priority_queue<
TrxUndoRsegs,
std::vector<TrxUndoRsegs, ut_allocator<TrxUndoRsegs> >,
TrxUndoRsegs> purge_pq_t;
/** Chooses the rollback segment with the oldest committed transaction */
struct TrxUndoRsegsIterator {
/** Constructor */
TrxUndoRsegsIterator();
/** Sets the next rseg to purge in purge_sys.
Executed in the purge coordinator thread.
@retval false when nothing is to be purged
@retval true when purge_sys.rseg->latch was locked */
inline bool set_next();
private:
// Disable copying
TrxUndoRsegsIterator(const TrxUndoRsegsIterator&);
TrxUndoRsegsIterator& operator=(const TrxUndoRsegsIterator&);
/** The current element to process */
TrxUndoRsegs m_rsegs;
/** Track the current element in m_rsegs */
TrxUndoRsegs::const_iterator m_iter;
};
/** The control structure used in the purge operation */ /** The control structure used in the purge operation */
class purge_sys_t class purge_sys_t
{ {
friend TrxUndoRsegsIterator; /** Min-heap based priority queue over fixed size array */
class purge_queue
{
/** Array of indexes in trx_sys.rseg_array. */
alignas(CPU_LEVEL1_DCACHE_LINESIZE) byte m_array[TRX_SYS_N_RSEGS];
/** Pointer to the end of m_array. */
byte *m_end= m_array;
public:
struct trx_rseg_cmp
{
/** Compare two trx_rseg_t* based on trx_no.
@param lhs first index in trx_sys.rseg_array to compare
@param rhs second index in trx_sys.rseg_array to compare
@return whether lhs>rhs */
bool operator()(const byte lhs, const byte rhs)
{
ut_ad(lhs < TRX_SYS_N_RSEGS);
ut_ad(rhs < TRX_SYS_N_RSEGS);
/* We can compare without trx_rseg_t::latch, because rseg last
commit is always set before pushing rseg to purge queue. */
return trx_sys.rseg_array[lhs].last_commit_and_offset >
trx_sys.rseg_array[rhs].last_commit_and_offset;
}
};
byte *begin() { return m_array; }
byte *end() { return m_end; }
const byte *c_begin() const { return m_array; }
const byte *c_end() const { return m_end; }
size_t size() const
{
size_t s= c_end() - c_begin();
ut_ad(s <= TRX_SYS_N_RSEGS);
return s;
}
bool empty() const { return !size(); }
void clear() { m_end= m_array; }
/** Push index of trx_sys.rseg_array into min-heap.
@param i index to push */
void push_rseg_index(byte i)
{
ut_ad(i < TRX_SYS_N_RSEGS);
ut_ad(size() + 1 <= TRX_SYS_N_RSEGS);
*m_end++= i;
std::push_heap(begin(), end(), trx_rseg_cmp());
}
/** Push rseg to priority queue.
@param rseg trx_rseg_t pointer to push */
void push(const trx_rseg_t *rseg)
{
ut_ad(rseg >= trx_sys.rseg_array);
ut_ad(rseg < trx_sys.rseg_array + TRX_SYS_N_RSEGS);
byte i= byte(rseg - trx_sys.rseg_array);
push_rseg_index(i);
}
/** Pop rseg from priority queue.
@return pointer to popped trx_rseg_t object */
trx_rseg_t *pop()
{
ut_ad(!empty());
std::pop_heap(begin(), end(), trx_rseg_cmp());
byte i= *--m_end;
ut_ad(i < TRX_SYS_N_RSEGS);
return &trx_sys.rseg_array[i];
}
};
public: public:
/** latch protecting view, m_enabled */ /** latch protecting view, m_enabled */
alignas(CPU_LEVEL1_DCACHE_LINESIZE) mutable srw_spin_lock latch; alignas(CPU_LEVEL1_DCACHE_LINESIZE) mutable srw_spin_lock latch;
...@@ -244,15 +243,27 @@ class purge_sys_t ...@@ -244,15 +243,27 @@ class purge_sys_t
record */ record */
uint16_t hdr_offset; /*!< Header byte offset on the page */ uint16_t hdr_offset; /*!< Header byte offset on the page */
/** Binary min-heap of indexes in trx_sys.rseg_array, ordered on
rseg_t::last_trx_no(). It is protected by the pq_mutex */
purge_queue purge_queue;
/** Mutex protecting purge_queue */
mysql_mutex_t pq_mutex;
TrxUndoRsegsIterator
rseg_iter; /*!< Iterator to get the next rseg
to process */
public: public:
purge_pq_t purge_queue; /*!< Binary min-heap, ordered on /** Push to purge queue without acquiring pq_mutex.
TrxUndoRsegs::trx_no. It is protected @param rseg rseg to push */
by the pq_mutex */ void enqueue(trx_rseg_t &rseg)
mysql_mutex_t pq_mutex; /*!< Mutex protecting purge_queue */ {
mysql_mutex_assert_owner(&pq_mutex);
purge_queue.push(&rseg);
}
/** Acquare purge_queue_mutex */
void queue_lock() { mysql_mutex_lock(&pq_mutex); }
/** Release purge queue mutex */
void queue_unlock() { mysql_mutex_unlock(&pq_mutex); }
/** innodb_undo_log_truncate=ON state; /** innodb_undo_log_truncate=ON state;
only modified by purge_coordinator_callback() */ only modified by purge_coordinator_callback() */
...@@ -332,8 +343,9 @@ class purge_sys_t ...@@ -332,8 +343,9 @@ class purge_sys_t
/** Update the last not yet purged history log info in rseg when /** Update the last not yet purged history log info in rseg when
we have purged a whole undo log. Advances also purge_trx_no we have purged a whole undo log. Advances also purge_trx_no
past the purged log. */ past the purged log.
void rseg_get_next_history_log(); @return whether anything is to be purged */
bool rseg_get_next_history_log();
public: public:
/** /**
...@@ -438,6 +450,11 @@ class purge_sys_t ...@@ -438,6 +450,11 @@ class purge_sys_t
@param already_stopped True indicates purge threads were @param already_stopped True indicates purge threads were
already stopped */ already stopped */
void stop_FTS(const dict_table_t &table, bool already_stopped=false); void stop_FTS(const dict_table_t &table, bool already_stopped=false);
/** Cleanse purge queue to remove the rseg that reside in undo-tablespace
marked for truncate.
@param space undo tablespace being truncated */
void cleanse_purge_queue(const fil_space_t &space);
}; };
/** The global data structure coordinating a purge */ /** The global data structure coordinating a purge */
......
...@@ -170,19 +170,21 @@ struct alignas(CPU_LEVEL1_DCACHE_LINESIZE) trx_rseg_t ...@@ -170,19 +170,21 @@ struct alignas(CPU_LEVEL1_DCACHE_LINESIZE) trx_rseg_t
/** Last not yet purged undo log header; FIL_NULL if all purged */ /** Last not yet purged undo log header; FIL_NULL if all purged */
uint32_t last_page_no; uint32_t last_page_no;
/** trx_t::no | last_offset << 48 */ /** trx_t::no << 16 | last_offset */
uint64_t last_commit_and_offset; uint64_t last_commit_and_offset;
/** @return the commit ID of the last committed transaction */ /** @return the commit ID of the last committed transaction */
trx_id_t last_trx_no() const trx_id_t last_trx_no() const
{ return last_commit_and_offset & ((1ULL << 48) - 1); } { return last_commit_and_offset >> 16; }
/** @return header offset of the last committed transaction */ /** @return header offset of the last committed transaction */
uint16_t last_offset() const uint16_t last_offset() const
{ return static_cast<uint16_t>(last_commit_and_offset >> 48); } {
return static_cast<uint16_t>(last_commit_and_offset & ((1ULL << 16) - 1));
}
void set_last_commit(uint16_t last_offset, trx_id_t trx_no) void set_last_commit(uint16_t last_offset, trx_id_t trx_no)
{ {
last_commit_and_offset= static_cast<uint64_t>(last_offset) << 48 | trx_no; last_commit_and_offset= trx_no << 16 | static_cast<uint64_t>(last_offset);
} }
/** @return the page identifier */ /** @return the page identifier */
......
...@@ -56,84 +56,6 @@ purge_sys_t purge_sys; ...@@ -56,84 +56,6 @@ purge_sys_t purge_sys;
my_bool srv_purge_view_update_only_debug; my_bool srv_purge_view_update_only_debug;
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
/** Sentinel value */
static const TrxUndoRsegs NullElement;
/** Default constructor */
TrxUndoRsegsIterator::TrxUndoRsegsIterator()
: m_rsegs(NullElement), m_iter(m_rsegs.begin())
{
}
/** Sets the next rseg to purge in purge_sys.
Executed in the purge coordinator thread.
@retval false when nothing is to be purged
@retval true when purge_sys.rseg->latch was locked */
inline bool TrxUndoRsegsIterator::set_next()
{
ut_ad(!purge_sys.next_stored);
mysql_mutex_lock(&purge_sys.pq_mutex);
/* Only purge consumes events from the priority queue, user
threads only produce the events. */
/* Check if there are more rsegs to process in the
current element. */
if (m_iter != m_rsegs.end()) {
/* We are still processing rollback segment from
the same transaction and so expected transaction
number shouldn't increase. Undo the increment of
expected commit done by caller assuming rollback
segments from given transaction are done. */
purge_sys.tail.trx_no = (*m_iter)->last_trx_no();
} else if (!purge_sys.purge_queue.empty()) {
m_rsegs = purge_sys.purge_queue.top();
purge_sys.purge_queue.pop();
ut_ad(purge_sys.purge_queue.empty()
|| purge_sys.purge_queue.top() != m_rsegs);
m_iter = m_rsegs.begin();
} else {
/* Queue is empty, reset iterator. */
purge_sys.rseg = NULL;
mysql_mutex_unlock(&purge_sys.pq_mutex);
m_rsegs = NullElement;
m_iter = m_rsegs.begin();
return false;
}
purge_sys.rseg = *m_iter++;
mysql_mutex_unlock(&purge_sys.pq_mutex);
/* We assume in purge of externally stored fields that space
id is in the range of UNDO tablespace space ids */
ut_ad(purge_sys.rseg->space->id == TRX_SYS_SPACE
|| srv_is_undo_tablespace(purge_sys.rseg->space->id));
purge_sys.rseg->latch.wr_lock(SRW_LOCK_CALL);
trx_id_t last_trx_no = purge_sys.rseg->last_trx_no();
purge_sys.hdr_offset = purge_sys.rseg->last_offset();
purge_sys.hdr_page_no = purge_sys.rseg->last_page_no;
/* Only the purge_coordinator_task will access this object
purge_sys.rseg_iter, or any of purge_sys.hdr_page_no,
purge_sys.tail.
The field purge_sys.head and purge_sys.view are modified by
purge_sys_t::clone_end_view()
in the purge_coordinator_task
while holding exclusive purge_sys.latch.
The purge_sys.view may also be modified by
purge_sys_t::wake_if_not_active() while holding exclusive
purge_sys.latch.
The purge_sys.head may be read by
purge_truncation_callback(). */
ut_ad(last_trx_no == m_rsegs.trx_no);
ut_a(purge_sys.hdr_page_no != FIL_NULL);
ut_a(purge_sys.tail.trx_no <= last_trx_no);
purge_sys.tail.trx_no = last_trx_no;
return(true);
}
/** Build a purge 'query' graph. The actual purge is performed by executing /** Build a purge 'query' graph. The actual purge is performed by executing
this query graph. this query graph.
@return own: the query graph */ @return own: the query graph */
...@@ -571,42 +493,17 @@ inline dberr_t purge_sys_t::iterator::free_history_rseg(trx_rseg_t &rseg) const ...@@ -571,42 +493,17 @@ inline dberr_t purge_sys_t::iterator::free_history_rseg(trx_rseg_t &rseg) const
goto loop; goto loop;
} }
/** Cleanse purge queue to remove the rseg that reside in undo-tablespace void purge_sys_t::cleanse_purge_queue(const fil_space_t &space)
marked for truncate.
@param[in] space undo tablespace being truncated */
static void trx_purge_cleanse_purge_queue(const fil_space_t& space)
{ {
typedef std::vector<TrxUndoRsegs> purge_elem_list_t; byte purge_elem_list[TRX_SYS_N_RSEGS];
purge_elem_list_t purge_elem_list; mysql_mutex_lock(&pq_mutex);
std::copy(purge_queue.c_begin(), purge_queue.c_end(), purge_elem_list);
mysql_mutex_lock(&purge_sys.pq_mutex); byte *purge_list_end = purge_elem_list + purge_queue.size();
purge_queue.clear();
/* Remove rseg instances that are in the purge queue before we start for (byte *elem = purge_elem_list; elem < purge_list_end; ++elem)
truncate of corresponding UNDO truncate. */ if (trx_sys.rseg_array[*elem].space != &space)
while (!purge_sys.purge_queue.empty()) { purge_queue.push_rseg_index(*elem);
purge_elem_list.push_back(purge_sys.purge_queue.top()); mysql_mutex_unlock(&pq_mutex);
purge_sys.purge_queue.pop();
}
for (purge_elem_list_t::iterator it = purge_elem_list.begin();
it != purge_elem_list.end();
++it) {
for (TrxUndoRsegs::iterator it2 = it->begin();
it2 != it->end();
++it2) {
if ((*it2)->space == &space) {
it->erase(it2);
break;
}
}
if (!it->empty()) {
purge_sys.purge_queue.push(*it);
}
}
mysql_mutex_unlock(&purge_sys.pq_mutex);
} }
dberr_t purge_sys_t::iterator::free_history() const dberr_t purge_sys_t::iterator::free_history() const
...@@ -750,7 +647,7 @@ TRANSACTIONAL_TARGET void trx_purge_truncate_history() ...@@ -750,7 +647,7 @@ TRANSACTIONAL_TARGET void trx_purge_truncate_history()
const char *file_name= UT_LIST_GET_FIRST(space->chain)->name; const char *file_name= UT_LIST_GET_FIRST(space->chain)->name;
sql_print_information("InnoDB: Truncating %s", file_name); sql_print_information("InnoDB: Truncating %s", file_name);
trx_purge_cleanse_purge_queue(*space); purge_sys.cleanse_purge_queue(*space);
/* Lock all modified pages of the tablespace. /* Lock all modified pages of the tablespace.
...@@ -869,7 +766,7 @@ buf_block_t *purge_sys_t::get_page(page_id_t id) ...@@ -869,7 +766,7 @@ buf_block_t *purge_sys_t::get_page(page_id_t id)
return nullptr; return nullptr;
} }
void purge_sys_t::rseg_get_next_history_log() bool purge_sys_t::rseg_get_next_history_log()
{ {
fil_addr_t prev_log_addr; fil_addr_t prev_log_addr;
...@@ -917,12 +814,13 @@ void purge_sys_t::rseg_get_next_history_log() ...@@ -917,12 +814,13 @@ void purge_sys_t::rseg_get_next_history_log()
can never produce events from an empty rollback segment. */ can never produce events from an empty rollback segment. */
mysql_mutex_lock(&pq_mutex); mysql_mutex_lock(&pq_mutex);
purge_queue.push(*rseg); purge_queue.push(rseg);
mysql_mutex_unlock(&pq_mutex); mysql_mutex_unlock(&pq_mutex);
} }
} }
rseg->latch.wr_unlock(); rseg->latch.wr_unlock();
return choose_next_log();
} }
/** Position the purge sys "iterator" on the undo record to use for purging. /** Position the purge sys "iterator" on the undo record to use for purging.
...@@ -930,11 +828,37 @@ void purge_sys_t::rseg_get_next_history_log() ...@@ -930,11 +828,37 @@ void purge_sys_t::rseg_get_next_history_log()
@retval true when purge_sys.rseg->latch was locked */ @retval true when purge_sys.rseg->latch was locked */
bool purge_sys_t::choose_next_log() bool purge_sys_t::choose_next_log()
{ {
if (!rseg_iter.set_next()) ut_ad(!next_stored);
return false;
hdr_offset= rseg->last_offset(); mysql_mutex_lock(&pq_mutex);
hdr_page_no= rseg->last_page_no; if (purge_queue.empty()) {
rseg = nullptr;
mysql_mutex_unlock(&purge_sys.pq_mutex);
return false;
}
rseg= purge_queue.pop();
mysql_mutex_unlock(&purge_sys.pq_mutex);
/* We assume in purge of externally stored fields that space
id is in the range of UNDO tablespace space ids */
ut_ad(rseg->space == fil_system.sys_space ||
srv_is_undo_tablespace(rseg->space->id));
rseg->latch.wr_lock(SRW_LOCK_CALL);
trx_id_t last_trx_no = rseg->last_trx_no();
hdr_offset = rseg->last_offset();
hdr_page_no = rseg->last_page_no;
/* Only the purge_coordinator_task will access this any of
purge_sys.hdr_page_no, purge_sys.tail. The field purge_sys.head and
purge_sys.view are modified by clone_end_view() in the
purge_coordinator_task while holding exclusive purge_sys.latch. The
purge_sys.view may also be modified by wake_if_not_active() while holding
exclusive purge_sys.latch. The purge_sys.head may be read by
purge_truncation_callback(). */
ut_a(hdr_page_no != FIL_NULL);
ut_a(tail.trx_no <= last_trx_no);
tail.trx_no = last_trx_no;
if (!rseg->needs_purge) if (!rseg->needs_purge)
{ {
...@@ -993,12 +917,9 @@ inline trx_purge_rec_t purge_sys_t::get_next_rec(roll_ptr_t roll_ptr) ...@@ -993,12 +917,9 @@ inline trx_purge_rec_t purge_sys_t::get_next_rec(roll_ptr_t roll_ptr)
if (!offset) if (!offset)
{ {
/* It is the dummy undo log record, which means that there is no /* It is the dummy undo log record, which means that there is no need to
need to purge this undo log */ purge this undo log. Look for the next undo log and record to purge */
rseg_get_next_history_log(); if (rseg_get_next_history_log())
/* Look for the next undo log and record to purge */
if (choose_next_log())
rseg->latch.wr_unlock(); rseg->latch.wr_unlock();
return {nullptr, 1}; return {nullptr, 1};
} }
...@@ -1046,9 +967,8 @@ inline trx_purge_rec_t purge_sys_t::get_next_rec(roll_ptr_t roll_ptr) ...@@ -1046,9 +967,8 @@ inline trx_purge_rec_t purge_sys_t::get_next_rec(roll_ptr_t roll_ptr)
else else
{ {
got_no_rec: got_no_rec:
rseg_get_next_history_log();
/* Look for the next undo log and record to purge */ /* Look for the next undo log and record to purge */
locked= choose_next_log(); locked= rseg_get_next_history_log();
} }
if (locked) if (locked)
......
...@@ -544,7 +544,7 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, mtr_t *mtr) ...@@ -544,7 +544,7 @@ static dberr_t trx_rseg_mem_restore(trx_rseg_t *rseg, mtr_t *mtr)
if (rseg->last_page_no != FIL_NULL) if (rseg->last_page_no != FIL_NULL)
/* There is no need to cover this operation by the purge /* There is no need to cover this operation by the purge
mutex because we are still bootstrapping. */ mutex because we are still bootstrapping. */
purge_sys.purge_queue.push(*rseg); purge_sys.enqueue(*rseg);
} }
return err; return err;
...@@ -584,7 +584,17 @@ dberr_t trx_rseg_array_init() ...@@ -584,7 +584,17 @@ dberr_t trx_rseg_array_init()
#endif #endif
mtr_t mtr; mtr_t mtr;
dberr_t err = DB_SUCCESS; dberr_t err = DB_SUCCESS;
/* mariabackup --prepare only deals with the redo log and the data
files, not with transactions or the data dictionary, that's why
trx_lists_init_at_db_start() does not invoke purge_sys.create() and
purge queue mutex stays uninitialized, and trx_rseg_mem_restore() quits
before initializing undo log lists. */
if (srv_operation != SRV_OPERATION_RESTORE)
/* Acquiring purge queue mutex here should be fine from the
deadlock prevention point of view, because executing that
function is a prerequisite for starting the purge subsystem or
any transactions. */
purge_sys.queue_lock();
for (ulint rseg_id = 0; rseg_id < TRX_SYS_N_RSEGS; rseg_id++) { for (ulint rseg_id = 0; rseg_id < TRX_SYS_N_RSEGS; rseg_id++) {
mtr.start(); mtr.start();
if (const buf_block_t* sys = trx_sysf_get(&mtr, false)) { if (const buf_block_t* sys = trx_sysf_get(&mtr, false)) {
...@@ -640,7 +650,8 @@ dberr_t trx_rseg_array_init() ...@@ -640,7 +650,8 @@ dberr_t trx_rseg_array_init()
mtr.commit(); mtr.commit();
} }
if (srv_operation != SRV_OPERATION_RESTORE)
purge_sys.queue_unlock();
if (err != DB_SUCCESS) { if (err != DB_SUCCESS) {
for (auto& rseg : trx_sys.rseg_array) { for (auto& rseg : trx_sys.rseg_array) {
while (auto u = UT_LIST_GET_FIRST(rseg.undo_list)) { while (auto u = UT_LIST_GET_FIRST(rseg.undo_list)) {
......
...@@ -1138,15 +1138,23 @@ inline void trx_t::write_serialisation_history(mtr_t *mtr) ...@@ -1138,15 +1138,23 @@ inline void trx_t::write_serialisation_history(mtr_t *mtr)
} }
else if (rseg->last_page_no == FIL_NULL) else if (rseg->last_page_no == FIL_NULL)
{ {
mysql_mutex_lock(&purge_sys.pq_mutex); /* trx_sys.assign_new_trx_no() and
purge_sys.enqueue() must be invoked in the same
critical section protected with purge queue mutex to avoid rseg with
greater last commit number to be pushed to purge queue prior to rseg with
lesser last commit number. In other words pushing to purge queue must be
serialized along with assigning trx_no. Otherwise purge coordinator
thread can also fetch redo log records from rseg with greater last commit
number before rseg with lesser one. */
purge_sys.queue_lock();
trx_sys.assign_new_trx_no(this); trx_sys.assign_new_trx_no(this);
const trx_id_t end{rw_trx_hash_element->no}; const trx_id_t end{rw_trx_hash_element->no};
rseg->last_page_no= undo->hdr_page_no;
/* end cannot be less than anything in rseg. User threads only /* end cannot be less than anything in rseg. User threads only
produce events when a rollback segment is empty. */ produce events when a rollback segment is empty. */
purge_sys.purge_queue.push(TrxUndoRsegs{end, *rseg});
mysql_mutex_unlock(&purge_sys.pq_mutex);
rseg->last_page_no= undo->hdr_page_no;
rseg->set_last_commit(undo->hdr_offset, end); rseg->set_last_commit(undo->hdr_offset, end);
purge_sys.enqueue(*rseg);
purge_sys.queue_unlock();
} }
else else
trx_sys.assign_new_trx_no(this); trx_sys.assign_new_trx_no(this);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment