Commit e9c389c3 authored by Eugene Kosov's avatar Eugene Kosov

MDEV-22701 InnoDB: encapsulate trx_sys.mutex and trx_sys.trx_list into a separate class

thread_safe_trx_ilist_t: almost generic one

UT_LIST was replaced with ilist<t>

innobase_kill_query: wrong comment removed.
parent ead98fe5
......@@ -2063,6 +2063,40 @@ inline void buf_pool_t::write_unlock_all_page_hash()
old_page_hash->write_unlock_all();
}
namespace
{
struct find_interesting_trx
{
void operator()(const trx_t &trx)
{
if (trx.state == TRX_STATE_NOT_STARTED)
return;
if (trx.mysql_thd == nullptr)
return;
if (withdraw_started <= trx.start_time)
return;
if (!found)
{
ib::warn() << "The following trx might hold "
"the blocks in buffer pool to "
"be withdrawn. Buffer pool "
"resizing can complete only "
"after all the transactions "
"below release the blocks.";
found= true;
}
lock_trx_print_wait_and_mvcc_state(stderr, &trx, current_time);
}
bool &found;
time_t withdraw_started;
time_t current_time;
};
} // namespace
/** Resize from srv_buf_pool_old_size to srv_buf_pool_size. */
inline void buf_pool_t::resize()
......@@ -2160,30 +2194,9 @@ inline void buf_pool_t::resize()
}
lock_mutex_enter();
mutex_enter(&trx_sys.mutex);
bool found = false;
for (trx_t* trx = UT_LIST_GET_FIRST(trx_sys.trx_list);
trx != NULL;
trx = UT_LIST_GET_NEXT(trx_list, trx)) {
if (trx->state != TRX_STATE_NOT_STARTED
&& trx->mysql_thd != NULL
&& withdraw_started > trx->start_time) {
if (!found) {
ib::warn() <<
"The following trx might hold"
" the blocks in buffer pool to"
" be withdrawn. Buffer pool"
" resizing can complete only"
" after all the transactions"
" below release the blocks.";
found = true;
}
lock_trx_print_wait_and_mvcc_state(
stderr, trx, current_time);
}
}
mutex_exit(&trx_sys.mutex);
trx_sys.trx_list.for_each(find_interesting_trx{
found, withdraw_started, current_time});
lock_mutex_exit();
withdraw_started = current_time;
......
......@@ -4824,23 +4824,23 @@ static void innobase_kill_query(handlerton*, THD *thd, enum thd_kill_levels)
DBUG_VOID_RETURN;
#endif /* WITH_WSREP */
lock_mutex_enter();
mutex_enter(&trx_sys.mutex);
trx_sys.trx_list.freeze();
trx_mutex_enter(trx);
/* It is possible that innobase_close_connection() is concurrently
being executed on our victim. Even if the trx object is later
reused for another client connection or a background transaction,
its trx->mysql_thd will differ from our thd.
trx_t::state changes are protected by trx_t::mutex, and
trx_sys.trx_list is protected by trx_sys.mutex, in
both trx_create() and trx_free().
trx_sys.trx_list is thread-safe. It's freezed to 'protect'
trx_t. However, trx_t::commit_in_memory() changes a trx_t::state
of autocommit non-locking transactions without any protection.
At this point, trx may have been reallocated for another client
connection, or for a background operation. In that case, either
trx_t::state or trx_t::mysql_thd should not match our expectations. */
bool cancel= trx->mysql_thd == thd && trx->state == TRX_STATE_ACTIVE &&
!trx->lock.was_chosen_as_deadlock_victim;
mutex_exit(&trx_sys.mutex);
trx_sys.trx_list.unfreeze();
if (!cancel);
else if (lock_t *lock= trx->lock.wait_lock)
lock_cancel_waiting_and_release(lock);
......
......@@ -41,8 +41,7 @@ Created 3/26/1996 Heikki Tuuri
#ifdef WITH_WSREP
#include "trx0xa.h"
#endif /* WITH_WSREP */
typedef UT_LIST_BASE_NODE_T(trx_t) trx_ut_list_t;
#include "ilist.h"
/** Checks if a page address is the trx sys header page.
@param[in] page_id page id
......@@ -803,6 +802,49 @@ class rw_trx_hash_t
}
};
class thread_safe_trx_ilist_t
{
public:
void create() { mutex_create(LATCH_ID_TRX_SYS, &mutex); }
void close() { mutex_free(&mutex); }
bool empty() const
{
mutex_enter(&mutex);
auto result= trx_list.empty();
mutex_exit(&mutex);
return result;
}
void push_front(trx_t &trx)
{
mutex_enter(&mutex);
trx_list.push_front(trx);
mutex_exit(&mutex);
}
void remove(trx_t &trx)
{
mutex_enter(&mutex);
trx_list.remove(trx);
mutex_exit(&mutex);
}
template <typename Callable> void for_each(Callable &&callback) const
{
mutex_enter(&mutex);
for (const auto &trx : trx_list)
callback(trx);
mutex_exit(&mutex);
}
void freeze() const { mutex_enter(&mutex); }
void unfreeze() const { mutex_exit(&mutex); }
private:
alignas(CACHE_LINE_SIZE) mutable TrxSysMutex mutex;
alignas(CACHE_LINE_SIZE) ilist<trx_t> trx_list;
};
/** The transaction system central memory data structure. */
class trx_sys_t
......@@ -833,11 +875,8 @@ class trx_sys_t
*/
MY_ALIGNED(CACHE_LINE_SIZE) Atomic_counter<uint32_t> rseg_history_len;
/** Mutex protecting trx_list AND NOTHING ELSE. */
MY_ALIGNED(CACHE_LINE_SIZE) mutable TrxSysMutex mutex;
/** List of all transactions. */
MY_ALIGNED(CACHE_LINE_SIZE) trx_ut_list_t trx_list;
thread_safe_trx_ilist_t trx_list;
MY_ALIGNED(CACHE_LINE_SIZE)
/** Temporary rollback segments */
......@@ -978,7 +1017,6 @@ class trx_sys_t
void snapshot_ids(trx_t *caller_trx, trx_ids_t *ids, trx_id_t *max_trx_id,
trx_id_t *min_trx_no)
{
ut_ad(!mutex_own(&mutex));
snapshot_ids_arg arg(ids);
while ((arg.m_id= get_rw_trx_hash_version()) != get_max_trx_id())
......@@ -1075,9 +1113,7 @@ class trx_sys_t
*/
void register_trx(trx_t *trx)
{
mutex_enter(&mutex);
UT_LIST_ADD_FIRST(trx_list, trx);
mutex_exit(&mutex);
trx_list.push_front(*trx);
}
......@@ -1088,9 +1124,7 @@ class trx_sys_t
*/
void deregister_trx(trx_t *trx)
{
mutex_enter(&mutex);
UT_LIST_REMOVE(trx_list, trx);
mutex_exit(&mutex);
trx_list.remove(*trx);
}
......@@ -1109,14 +1143,11 @@ class trx_sys_t
{
size_t count= 0;
mutex_enter(&mutex);
for (const trx_t *trx= UT_LIST_GET_FIRST(trx_list); trx;
trx= UT_LIST_GET_NEXT(trx_list, trx))
{
if (trx->read_view.is_open())
trx_list.for_each([&count](const trx_t &trx) {
if (trx.read_view.is_open())
++count;
}
mutex_exit(&mutex);
});
return count;
}
......
......@@ -35,6 +35,7 @@ Created 3/26/1996 Heikki Tuuri
#include "ut0vec.h"
#include "fts0fts.h"
#include "read0types.h"
#include "ilist.h"
#include <vector>
#include <set>
......@@ -713,7 +714,7 @@ struct trx_rsegs_t {
trx_temp_undo_t m_noredo;
};
struct trx_t {
struct trx_t : ilist_node<> {
private:
/**
Count of references.
......@@ -909,10 +910,6 @@ struct trx_t {
/*!< how many tables the current SQL
statement uses, except those
in consistent read */
/*------------------------------*/
UT_LIST_NODE_T(trx_t) trx_list; /*!< list of all transactions;
protected by trx_sys.mutex */
/*------------------------------*/
dberr_t error_state; /*!< 0 if no error, otherwise error
number; NOTE That ONLY the thread
doing the transaction is allowed to
......
......@@ -4654,15 +4654,14 @@ struct lock_print_info
purge_trx(purge_sys.query ? purge_sys.query->trx : NULL)
{}
void operator()(const trx_t* trx) const
void operator()(const trx_t &trx) const
{
ut_ad(mutex_own(&trx_sys.mutex));
if (UNIV_UNLIKELY(trx == purge_trx))
if (UNIV_UNLIKELY(&trx == purge_trx))
return;
lock_trx_print_wait_and_mvcc_state(file, trx, now);
lock_trx_print_wait_and_mvcc_state(file, &trx, now);
if (trx->will_lock && srv_print_innodb_lock_monitor)
lock_trx_print_locks(file, trx);
if (trx.will_lock && srv_print_innodb_lock_monitor)
lock_trx_print_locks(file, &trx);
}
FILE* const file;
......@@ -4682,11 +4681,8 @@ lock_print_info_all_transactions(
ut_ad(lock_mutex_own());
fprintf(file, "LIST OF TRANSACTIONS FOR EACH SESSION:\n");
const time_t now = time(NULL);
mutex_enter(&trx_sys.mutex);
ut_list_map(trx_sys.trx_list, lock_print_info(file, now));
mutex_exit(&trx_sys.mutex);
trx_sys.trx_list.for_each(lock_print_info(file, time(nullptr)));
lock_mutex_exit();
ut_ad(lock_validate());
......
......@@ -245,10 +245,8 @@ void ReadView::open(trx_t *trx)
void trx_sys_t::clone_oldest_view(ReadViewBase *view) const
{
view->snapshot(nullptr);
mutex_enter(&mutex);
/* Find oldest view. */
for (const trx_t *trx= UT_LIST_GET_FIRST(trx_list); trx;
trx= UT_LIST_GET_NEXT(trx_list, trx))
trx->read_view.append_to(view);
mutex_exit(&mutex);
trx_list.for_each([view](const trx_t &trx) {
trx.read_view.append_to(view);
});
}
......@@ -1220,19 +1220,13 @@ static void fetch_data_into_cache(trx_i_s_cache_t *cache)
trx_i_s_cache_clear(cache);
/* Capture the state of transactions */
mutex_enter(&trx_sys.mutex);
for (const trx_t *trx= UT_LIST_GET_FIRST(trx_sys.trx_list);
trx != NULL;
trx= UT_LIST_GET_NEXT(trx_list, trx))
{
if (trx_is_started(trx) && trx != purge_sys.query->trx)
trx_sys.trx_list.for_each([cache](const trx_t &trx) {
if (!cache->is_truncated && trx_is_started(&trx) &&
&trx != purge_sys.query->trx)
{
fetch_data_into_cache_low(cache, trx);
if (cache->is_truncated)
break;
}
}
mutex_exit(&trx_sys.mutex);
fetch_data_into_cache_low(cache, &trx);
}
});
cache->is_truncated= false;
}
......
......@@ -205,8 +205,7 @@ trx_sys_t::create()
ut_ad(this == &trx_sys);
ut_ad(!is_initialised());
m_initialised = true;
mutex_create(LATCH_ID_TRX_SYS, &mutex);
UT_LIST_INIT(trx_list, &trx_t::trx_list);
trx_list.create();
rseg_history_len= 0;
rw_trx_hash.init();
......@@ -320,8 +319,8 @@ trx_sys_t::close()
}
}
ut_a(UT_LIST_GET_LEN(trx_list) == 0);
mutex_free(&mutex);
ut_a(trx_list.empty());
trx_list.close();
m_initialised = false;
}
......@@ -330,15 +329,11 @@ ulint trx_sys_t::any_active_transactions()
{
uint32_t total_trx= 0;
mutex_enter(&mutex);
for (trx_t* trx= UT_LIST_GET_FIRST(trx_sys.trx_list);
trx != NULL;
trx= UT_LIST_GET_NEXT(trx_list, trx))
{
if (trx->state == TRX_STATE_COMMITTED_IN_MEMORY ||
(trx->state == TRX_STATE_ACTIVE && trx->id))
trx_sys.trx_list.for_each([&total_trx](const trx_t &trx) {
if (trx.state == TRX_STATE_COMMITTED_IN_MEMORY ||
(trx.state == TRX_STATE_ACTIVE && trx.id))
total_trx++;
}
mutex_exit(&mutex);
});
return total_trx;
}
......@@ -977,8 +977,9 @@ trx_start_low(
ut_a(trx->lock.table_locks.empty());
/* No other thread can access this trx object through rw_trx_hash,
still it can be found through trx_sys.trx_list, which means state
change must be protected by e.g. trx->mutex.
still it can be found through trx_sys.trx_list. Sometimes it's
possible to indirectly protect trx_t::state by freezing
trx_sys.trx_list.
For now we update it without mutex protection, because original code
did it this way. It has to be reviewed and fixed properly. */
......@@ -1343,9 +1344,9 @@ inline void trx_t::commit_in_memory(const mtr_t *mtr)
/* This state change is not protected by any mutex, therefore
there is an inherent race here around state transition during
printouts. We ignore this race for the sake of efficiency.
However, the trx_sys_t::mutex will protect the trx_t instance
and it cannot be removed from the trx_list and freed
without first acquiring the trx_sys_t::mutex. */
However, the freezing of trx_sys.trx_list will protect the trx_t
instance and it cannot be removed from the trx_list and freed
without first unfreezing trx_list. */
ut_ad(trx_state_eq(this, TRX_STATE_ACTIVE));
MONITOR_INC(MONITOR_TRX_NL_RO_COMMIT);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment