Commit d46b4248 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-24142 preparation: Add srw_mutex and srw_lock::u_lock()

The PERFORMANCE_SCHEMA insists on distinguishing read-update-write
locks from read-write locks, so we must add
template<bool support_u_lock> in rd_lock() and wr_lock() operations.

rd_lock::read_trylock(): Add template<bool prioritize_updater=false>
which is used by the srw_lock_low::read_lock() loop. As long as
an UPDATE lock has already been granted to some thread, we will grant
subsequent READ lock requests even if a waiting WRITE lock request
exists. This will be necessary to be compatible with existing usage
pattern of InnoDB rw_lock_t where the holder of SX-latch (which we
will rename to UPDATE latch) may acquire an additional S-latch
on the same object. For normal read-write locks without update operations
this should make no difference at all, because the rw_lock::UPDATER
flag would never be set.
parent 3872e585
......@@ -20,7 +20,7 @@ this program; if not, write to the Free Software Foundation, Inc.,
#include <atomic>
#include "my_dbug.h"
/** Simple read-write lock based on std::atomic */
/** Simple read-update-write lock based on std::atomic */
class rw_lock
{
/** The lock word */
......@@ -35,6 +35,8 @@ class rw_lock
static constexpr uint32_t WRITER_WAITING= 1U << 30;
/** Flag to indicate that write_lock() or write_lock_wait() is pending */
static constexpr uint32_t WRITER_PENDING= WRITER | WRITER_WAITING;
/** Flag to indicate that an update lock exists */
static constexpr uint32_t UPDATER= 1U << 29;
/** Start waiting for an exclusive lock.
@return current value of the lock word */
......@@ -45,13 +47,14 @@ class rw_lock
@return whether the exclusive lock was acquired */
bool write_lock_wait_try(uint32_t &l)
{
l= WRITER_WAITING;
return lock.compare_exchange_strong(l, WRITER, std::memory_order_acquire,
std::memory_order_relaxed);
}
/** Try to acquire a shared lock.
@tparam prioritize_updater whether to ignore WRITER_WAITING for UPDATER
@param l the value of the lock word
@return whether the lock was acquired */
template<bool prioritize_updater= false>
bool read_trylock(uint32_t &l)
{
l= UNLOCKED;
......@@ -59,16 +62,53 @@ class rw_lock
std::memory_order_relaxed))
{
DBUG_ASSERT(!(WRITER & l) || !(~WRITER_PENDING & l));
if (l & WRITER_PENDING)
DBUG_ASSERT((~(WRITER_PENDING | UPDATER) & l) < UPDATER);
if (prioritize_updater
? (WRITER & l) || ((WRITER_WAITING | UPDATER) & l) == WRITER_WAITING
: (WRITER_PENDING & l))
return false;
}
return true;
}
/** Try to acquire an update lock.
@param l the value of the lock word
@return whether the lock was acquired */
bool update_trylock(uint32_t &l)
{
l= UNLOCKED;
while (!lock.compare_exchange_strong(l, l | UPDATER,
std::memory_order_acquire,
std::memory_order_relaxed))
{
DBUG_ASSERT(!(WRITER & l) || !(~WRITER_PENDING & l));
DBUG_ASSERT((~(WRITER_PENDING | UPDATER) & l) < UPDATER);
if ((WRITER_PENDING | UPDATER) & l)
return false;
}
return true;
}
/** Try to upgrade an update lock to an exclusive lock.
@return whether the update lock was upgraded to exclusive */
bool upgrade_trylock()
{
auto l= UPDATER;
while (!lock.compare_exchange_strong(l, l ^ (WRITER | UPDATER),
std::memory_order_acquire,
std::memory_order_relaxed))
{
DBUG_ASSERT(!(~l & (UPDATER - 1)));
DBUG_ASSERT(((WRITER | UPDATER) & l) == UPDATER);
if (~(WRITER_WAITING | UPDATER) & l)
return false;
}
DBUG_ASSERT((l & ~WRITER_WAITING) == UPDATER);
return true;
}
/** Wait for an exclusive lock.
@return whether the exclusive lock was acquired */
bool write_lock_poll()
{
uint32_t l;
auto l= WRITER_WAITING;
if (write_lock_wait_try(l))
return true;
if (!(l & WRITER_WAITING))
......@@ -88,15 +128,26 @@ class rw_lock
bool read_unlock()
{
auto l= lock.fetch_sub(1, std::memory_order_release);
DBUG_ASSERT(~WRITER_PENDING & l); /* at least one read lock */
DBUG_ASSERT(~(WRITER_PENDING | UPDATER) & l); /* at least one read lock */
DBUG_ASSERT(!(l & WRITER)); /* no write lock must have existed */
return (~WRITER_PENDING & l) == 1;
}
/** Release an update lock.
@return whether any writers may have to be woken up */
bool update_unlock()
{
auto l= lock.fetch_and(~UPDATER, std::memory_order_release);
/* the update lock must have existed */
DBUG_ASSERT((l & (WRITER | UPDATER)) == UPDATER);
return !(~(WRITER_PENDING | UPDATER) & l);
}
/** Release an exclusive lock */
void write_unlock()
{
IF_DBUG_ASSERT(auto l=,) lock.fetch_sub(WRITER, std::memory_order_release);
DBUG_ASSERT(l & WRITER); /* the write lock must have existed */
IF_DBUG_ASSERT(auto l=,)
lock.fetch_and(~WRITER, std::memory_order_release);
/* the write lock must have existed */
DBUG_ASSERT((l & (WRITER | UPDATER)) == WRITER);
}
/** Try to acquire a shared lock.
@return whether the lock was acquired */
......@@ -113,6 +164,9 @@ class rw_lock
/** @return whether an exclusive lock is being held by any thread */
bool is_write_locked() const
{ return !!(lock.load(std::memory_order_relaxed) & WRITER); }
/** @return whether an update lock is being held by any thread */
bool is_update_locked() const
{ return !!(lock.load(std::memory_order_relaxed) & UPDATER); }
/** @return whether a shared lock is being held by any thread */
bool is_read_locked() const
{
......
......@@ -25,6 +25,22 @@ this program; if not, write to the Free Software Foundation, Inc.,
# define SRW_LOCK_DUMMY /* Use dummy implementation for debugging purposes */
#endif
#ifdef SRW_LOCK_DUMMY
/** An exclusive-only variant of srw_lock */
class srw_mutex
{
pthread_mutex_t lock;
public:
void init() { pthread_mutex_init(&lock, nullptr); }
void destroy() { pthread_mutex_destroy(&lock); }
void wr_lock() { pthread_mutex_lock(&lock); }
void wr_unlock() { pthread_mutex_unlock(&lock); }
bool wr_lock_try() { return !pthread_mutex_trylock(&lock); }
};
#else
# define srw_mutex srw_lock_low
#endif
#include "rw_lock.h"
/** Slim reader-writer lock with no recursion */
......@@ -43,8 +59,12 @@ class srw_lock_low final : private rw_lock
/** Wait for a read lock.
@param l lock word from a failed read_trylock() */
void read_lock(uint32_t l);
/** Wait for a write lock after a failed write_trylock() */
void write_lock();
/** Wait for an update lock.
@param l lock word from a failed update_trylock() */
void update_lock(uint32_t l);
/** Wait for a write lock after a failed write_trylock() or upgrade_trylock()
@param holding_u whether we already hold u_lock() */
void write_lock(bool holding_u);
/** Wait for signal
@param l lock word from a failed acquisition */
inline void writer_wait(uint32_t l);
......@@ -65,9 +85,15 @@ class srw_lock_low final : private rw_lock
#endif
bool rd_lock_try() { uint32_t l; return read_trylock(l); }
bool wr_lock_try() { return write_trylock(); }
template<bool support_u_lock= false>
void rd_lock() { uint32_t l; if (!read_trylock(l)) read_lock(l); }
void wr_lock() { if (!write_trylock()) write_lock(); }
void u_lock() { uint32_t l; if (!update_trylock(l)) update_lock(l); }
bool u_lock_try() { uint32_t l; return update_trylock(l); }
void u_wr_upgrade() { if (!upgrade_trylock()) write_lock(true); }
template<bool support_u_lock= false>
void wr_lock() { if (!write_trylock()) write_lock(false); }
void rd_unlock();
void u_unlock();
void wr_unlock();
};
......@@ -87,8 +113,12 @@ class srw_lock
srw_lock_low lock;
PSI_rwlock *pfs_psi;
template<bool support_u_lock>
ATTRIBUTE_NOINLINE void psi_rd_lock(const char *file, unsigned line);
template<bool support_u_lock>
ATTRIBUTE_NOINLINE void psi_wr_lock(const char *file, unsigned line);
ATTRIBUTE_NOINLINE void psi_u_lock(const char *file, unsigned line);
ATTRIBUTE_NOINLINE void psi_u_wr_upgrade(const char *file, unsigned line);
public:
void init(mysql_pfs_key_t key)
{
......@@ -104,10 +134,11 @@ class srw_lock
}
lock.destroy();
}
template<bool support_u_lock= false>
void rd_lock(const char *file, unsigned line)
{
if (psi_likely(pfs_psi != nullptr))
psi_rd_lock(file, line);
psi_rd_lock<support_u_lock>(file, line);
else
lock.rd_lock();
}
......@@ -117,10 +148,24 @@ class srw_lock
PSI_RWLOCK_CALL(unlock_rwlock)(pfs_psi);
lock.rd_unlock();
}
void u_lock(const char *file, unsigned line)
{
if (psi_likely(pfs_psi != nullptr))
psi_u_lock(file, line);
else
lock.u_lock();
}
void u_unlock()
{
if (pfs_psi)
PSI_RWLOCK_CALL(unlock_rwlock)(pfs_psi);
lock.u_unlock();
}
template<bool support_u_lock= false>
void wr_lock(const char *file, unsigned line)
{
if (psi_likely(pfs_psi != nullptr))
psi_wr_lock(file, line);
psi_wr_lock<support_u_lock>(file, line);
else
lock.wr_lock();
}
......@@ -130,7 +175,15 @@ class srw_lock
PSI_RWLOCK_CALL(unlock_rwlock)(pfs_psi);
lock.wr_unlock();
}
void u_wr_upgrade(const char *file, unsigned line)
{
if (psi_likely(pfs_psi != nullptr))
psi_u_wr_upgrade(file, line);
else
lock.u_wr_upgrade();
}
bool rd_lock_try() { return lock.rd_lock_try(); }
bool u_lock_try() { return lock.u_lock_try(); }
bool wr_lock_try() { return lock.wr_lock_try(); }
};
#endif
......@@ -130,7 +130,7 @@ void srw_lock_low::read_lock(uint32_t l)
for (auto spin= srv_n_spin_wait_rounds; spin; spin--)
{
ut_delay(srv_spin_wait_delay);
if (read_trylock(l))
if (read_trylock<true>(l))
return;
else if (l == WRITER_WAITING)
goto wake_writer;
......@@ -138,11 +138,50 @@ void srw_lock_low::read_lock(uint32_t l)
readers_wait(l);
}
while (!read_trylock(l));
while (!read_trylock<true>(l));
}
/** Wait for a write lock after a failed write_trylock() */
void srw_lock_low::write_lock()
/** Wait for an update lock.
@param lock word value from a failed update_trylock() */
void srw_lock_low::update_lock(uint32_t l)
{
do
{
if (l == WRITER_WAITING)
{
wake_writer:
#ifdef SRW_LOCK_DUMMY
pthread_mutex_lock(&mutex);
{
pthread_cond_signal(&cond_exclusive);
pthread_cond_wait(&cond_shared, &mutex);
l= value();
}
while (l == WRITER_WAITING);
pthread_mutex_unlock(&mutex);
continue;
#else
writer_wake();
#endif
}
else
for (auto spin= srv_n_spin_wait_rounds; spin; spin--)
{
ut_delay(srv_spin_wait_delay);
if (update_trylock(l))
return;
else if (l == WRITER_WAITING)
goto wake_writer;
}
readers_wait(l);
}
while (!update_trylock(l));
}
/** Wait for a write lock after a failed write_trylock() or upgrade_trylock()
@param holding_u whether we already hold u_lock() */
void srw_lock_low::write_lock(bool holding_u)
{
for (;;)
{
......@@ -150,6 +189,7 @@ void srw_lock_low::write_lock()
/* We are the first writer to be granted the lock. Spin for a while. */
for (auto spin= srv_n_spin_wait_rounds; spin; spin--)
{
l= holding_u ? WRITER_WAITING | UPDATER : WRITER_WAITING;
if (write_lock_wait_try(l))
return;
if (!(l & WRITER_WAITING))
......@@ -157,13 +197,22 @@ void srw_lock_low::write_lock()
ut_delay(srv_spin_wait_delay);
}
l= holding_u ? WRITER_WAITING | UPDATER : WRITER_WAITING;
if (write_lock_wait_try(l))
return;
if (!(l & WRITER_WAITING))
{
if (l == UNLOCKED && write_trylock())
return;
switch (l) {
case UNLOCKED:
DBUG_ASSERT(!holding_u);
if (write_trylock())
return;
break;
case UPDATER:
if (holding_u && upgrade_trylock())
return;
}
l= write_lock_wait_start() | WRITER_WAITING;
}
else
......@@ -175,9 +224,12 @@ void srw_lock_low::write_lock()
void srw_lock_low::rd_unlock() { if (read_unlock()) writer_wake(); }
void srw_lock_low::u_unlock() { if (update_unlock()) writer_wake(); }
void srw_lock_low::wr_unlock() { write_unlock(); readers_wake(); }
#ifdef UNIV_PFS_RWLOCK
template<bool support_u_lock>
void srw_lock::psi_rd_lock(const char *file, unsigned line)
{
PSI_rwlock_locker_state state;
......@@ -185,7 +237,9 @@ void srw_lock::psi_rd_lock(const char *file, unsigned line)
const bool nowait= lock.read_trylock(l);
if (PSI_rwlock_locker *locker= PSI_RWLOCK_CALL(start_rwlock_rdwait)
(&state, pfs_psi,
nowait ? PSI_RWLOCK_TRYREADLOCK : PSI_RWLOCK_READLOCK, file, line))
support_u_lock
? (nowait ? PSI_RWLOCK_TRYSHAREDLOCK : PSI_RWLOCK_SHAREDLOCK)
: (nowait ? PSI_RWLOCK_TRYREADLOCK : PSI_RWLOCK_READLOCK), file, line))
{
if (!nowait)
lock.read_lock(l);
......@@ -195,13 +249,32 @@ void srw_lock::psi_rd_lock(const char *file, unsigned line)
lock.read_lock(l);
}
template void srw_lock::psi_rd_lock<false>(const char *, unsigned);
template void srw_lock::psi_rd_lock<true>(const char *, unsigned);
void srw_lock::psi_u_lock(const char *file, unsigned line)
{
PSI_rwlock_locker_state state;
if (PSI_rwlock_locker *locker= PSI_RWLOCK_CALL(start_rwlock_wrwait)
(&state, pfs_psi, PSI_RWLOCK_SHAREDEXCLUSIVELOCK, file, line))
{
lock.u_lock();
PSI_RWLOCK_CALL(end_rwlock_rdwait)(locker, 0);
}
else
lock.u_lock();
}
template<bool support_u_lock>
void srw_lock::psi_wr_lock(const char *file, unsigned line)
{
PSI_rwlock_locker_state state;
const bool nowait= lock.write_trylock();
if (PSI_rwlock_locker *locker= PSI_RWLOCK_CALL(start_rwlock_wrwait)
(&state, pfs_psi,
nowait ? PSI_RWLOCK_TRYWRITELOCK : PSI_RWLOCK_WRITELOCK,
support_u_lock
? (nowait ? PSI_RWLOCK_TRYEXCLUSIVELOCK : PSI_RWLOCK_EXCLUSIVELOCK)
: (nowait ? PSI_RWLOCK_TRYWRITELOCK : PSI_RWLOCK_WRITELOCK),
file, line))
{
if (!nowait)
......@@ -211,4 +284,24 @@ void srw_lock::psi_wr_lock(const char *file, unsigned line)
else if (!nowait)
lock.wr_lock();
}
template void srw_lock::psi_wr_lock<false>(const char *, unsigned);
template void srw_lock::psi_wr_lock<true>(const char *, unsigned);
void srw_lock::psi_u_wr_upgrade(const char *file, unsigned line)
{
PSI_rwlock_locker_state state;
const bool nowait= lock.upgrade_trylock();
if (PSI_rwlock_locker *locker= PSI_RWLOCK_CALL(start_rwlock_wrwait)
(&state, pfs_psi,
nowait ? PSI_RWLOCK_TRYEXCLUSIVELOCK : PSI_RWLOCK_EXCLUSIVELOCK,
file, line))
{
if (!nowait)
lock.write_lock(true);
PSI_RWLOCK_CALL(end_rwlock_rdwait)(locker, 0);
}
else if (!nowait)
lock.write_lock(true);
}
#endif /* UNIV_PFS_RWLOCK */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment