Commit 4b5bf654 authored by Marko Mäkelä's avatar Marko Mäkelä

srw_lock, rw_lock: Implement update mode

FIXME: Pass the line number information to performance_schema

FIXME: Remove PSI_RWLOCK_SHAREDLOCK and friends, just use
PSI_RWLOCK_READLOCK and friends.
parent b74dae03
...@@ -20,7 +20,7 @@ this program; if not, write to the Free Software Foundation, Inc., ...@@ -20,7 +20,7 @@ this program; if not, write to the Free Software Foundation, Inc.,
#include <atomic> #include <atomic>
#include "my_dbug.h" #include "my_dbug.h"
/** Simple read-write lock based on std::atomic */ /** Simple read-update-write lock based on std::atomic */
class rw_lock class rw_lock
{ {
/** The lock word */ /** The lock word */
...@@ -35,6 +35,8 @@ class rw_lock ...@@ -35,6 +35,8 @@ class rw_lock
static constexpr uint32_t WRITER_WAITING= 1U << 30; static constexpr uint32_t WRITER_WAITING= 1U << 30;
/** Flag to indicate that write_lock() or write_lock_wait() is pending */ /** Flag to indicate that write_lock() or write_lock_wait() is pending */
static constexpr uint32_t WRITER_PENDING= WRITER | WRITER_WAITING; static constexpr uint32_t WRITER_PENDING= WRITER | WRITER_WAITING;
/** Flag to indicate that an update lock exists */
static constexpr uint32_t UPDATER= 1U << 29;
/** Start waiting for an exclusive lock. /** Start waiting for an exclusive lock.
@return current value of the lock word */ @return current value of the lock word */
...@@ -45,13 +47,14 @@ class rw_lock ...@@ -45,13 +47,14 @@ class rw_lock
@return whether the exclusive lock was acquired */ @return whether the exclusive lock was acquired */
bool write_lock_wait_try(uint32_t &l) bool write_lock_wait_try(uint32_t &l)
{ {
l= WRITER_WAITING;
return lock.compare_exchange_strong(l, WRITER, std::memory_order_acquire, return lock.compare_exchange_strong(l, WRITER, std::memory_order_acquire,
std::memory_order_relaxed); std::memory_order_relaxed);
} }
/** Try to acquire a shared lock. /** Try to acquire a shared lock.
@tparam prioritize_updater whether to ignore WRITER_WAITING for UPDATER
@param l the value of the lock word @param l the value of the lock word
@return whether the lock was acquired */ @return whether the lock was acquired */
template<bool prioritize_updater= false>
bool read_trylock(uint32_t &l) bool read_trylock(uint32_t &l)
{ {
l= UNLOCKED; l= UNLOCKED;
...@@ -59,16 +62,53 @@ class rw_lock ...@@ -59,16 +62,53 @@ class rw_lock
std::memory_order_relaxed)) std::memory_order_relaxed))
{ {
DBUG_ASSERT(!(WRITER & l) || !(~WRITER_PENDING & l)); DBUG_ASSERT(!(WRITER & l) || !(~WRITER_PENDING & l));
if (l & WRITER_PENDING) DBUG_ASSERT((~(WRITER_PENDING | UPDATER) & l) < UPDATER);
if (prioritize_updater
? (WRITER & l) || ((WRITER_WAITING | UPDATER) & l) == WRITER_WAITING
: (WRITER_PENDING & l))
return false; return false;
} }
return true; return true;
} }
/** Try to acquire an update lock.
@param l the value of the lock word
@return whether the lock was acquired */
bool update_trylock(uint32_t &l)
{
l= UNLOCKED;
while (!lock.compare_exchange_strong(l, l | UPDATER,
std::memory_order_acquire,
std::memory_order_relaxed))
{
DBUG_ASSERT(!(WRITER & l) || !(~WRITER_PENDING & l));
DBUG_ASSERT((~(WRITER_PENDING | UPDATER) & l) < UPDATER);
if ((WRITER_PENDING | UPDATER) & l)
return false;
}
return true;
}
/** Try to upgrade an update lock to an exclusive lock.
@return whether the update lock was upgraded to exclusive */
bool upgrade_trylock()
{
auto l= UPDATER;
while (!lock.compare_exchange_strong(l, l ^ (WRITER | UPDATER),
std::memory_order_acquire,
std::memory_order_relaxed))
{
DBUG_ASSERT(!(~l & (UPDATER - 1)));
DBUG_ASSERT(((WRITER | UPDATER) & l) == UPDATER);
if (~(WRITER_WAITING | UPDATER) & l)
return false;
}
DBUG_ASSERT((l & ~WRITER_WAITING) == UPDATER);
return true;
}
/** Wait for an exclusive lock. /** Wait for an exclusive lock.
@return whether the exclusive lock was acquired */ @return whether the exclusive lock was acquired */
bool write_lock_poll() bool write_lock_poll()
{ {
uint32_t l; auto l= WRITER_WAITING;
if (write_lock_wait_try(l)) if (write_lock_wait_try(l))
return true; return true;
if (!(l & WRITER_WAITING)) if (!(l & WRITER_WAITING))
...@@ -88,15 +128,26 @@ class rw_lock ...@@ -88,15 +128,26 @@ class rw_lock
bool read_unlock() bool read_unlock()
{ {
auto l= lock.fetch_sub(1, std::memory_order_release); auto l= lock.fetch_sub(1, std::memory_order_release);
DBUG_ASSERT(~WRITER_PENDING & l); /* at least one read lock */ DBUG_ASSERT(~(WRITER_PENDING | UPDATER) & l); /* at least one read lock */
DBUG_ASSERT(!(l & WRITER)); /* no write lock must have existed */ DBUG_ASSERT(!(l & WRITER)); /* no write lock must have existed */
return (~WRITER_PENDING & l) == 1; return (~WRITER_PENDING & l) == 1;
} }
/** Release an update lock.
@return whether any writers may have to be woken up */
bool update_unlock()
{
auto l= lock.fetch_and(~UPDATER, std::memory_order_release);
/* the update lock must have existed */
DBUG_ASSERT((l & (WRITER | UPDATER)) == UPDATER);
return !(~(WRITER_PENDING | UPDATER) & l);
}
/** Release an exclusive lock */ /** Release an exclusive lock */
void write_unlock() void write_unlock()
{ {
IF_DBUG_ASSERT(auto l=,) lock.fetch_sub(WRITER, std::memory_order_release); IF_DBUG_ASSERT(auto l=,)
DBUG_ASSERT(l & WRITER); /* the write lock must have existed */ lock.fetch_and(~WRITER, std::memory_order_release);
/* the write lock must have existed */
DBUG_ASSERT((l & (WRITER | UPDATER)) == WRITER);
} }
/** Try to acquire a shared lock. /** Try to acquire a shared lock.
@return whether the lock was acquired */ @return whether the lock was acquired */
...@@ -113,6 +164,9 @@ class rw_lock ...@@ -113,6 +164,9 @@ class rw_lock
/** @return whether an exclusive lock is being held by any thread */ /** @return whether an exclusive lock is being held by any thread */
bool is_write_locked() const bool is_write_locked() const
{ return !!(lock.load(std::memory_order_relaxed) & WRITER); } { return !!(lock.load(std::memory_order_relaxed) & WRITER); }
/** @return whether an update lock is being held by any thread */
bool is_update_locked() const
{ return !!(lock.load(std::memory_order_relaxed) & UPDATER); }
/** @return whether a shared lock is being held by any thread */ /** @return whether a shared lock is being held by any thread */
bool is_read_locked() const bool is_read_locked() const
{ {
......
...@@ -33,7 +33,7 @@ class srw_mutex ...@@ -33,7 +33,7 @@ class srw_mutex
public: public:
void init() { pthread_mutex_init(&lock, nullptr); } void init() { pthread_mutex_init(&lock, nullptr); }
void destroy() { pthread_mutex_destroy(&lock); } void destroy() { pthread_mutex_destroy(&lock); }
void wr_lock() { pthread_mutex_lock(&lock); } template<bool update=false> void wr_lock() { pthread_mutex_lock(&lock); }
void wr_unlock() { pthread_mutex_unlock(&lock); } void wr_unlock() { pthread_mutex_unlock(&lock); }
bool wr_lock_try() { return !pthread_mutex_trylock(&lock); } bool wr_lock_try() { return !pthread_mutex_trylock(&lock); }
}; };
...@@ -58,8 +58,12 @@ class srw_lock_low final : private rw_lock ...@@ -58,8 +58,12 @@ class srw_lock_low final : private rw_lock
/** Wait for a read lock. /** Wait for a read lock.
@param l lock word from a failed read_trylock() */ @param l lock word from a failed read_trylock() */
void read_lock(uint32_t l); void read_lock(uint32_t l);
/** Wait for a write lock after a failed write_trylock() */ /** Wait for an update lock.
void write_lock(); @param l lock word from a failed update_trylock() */
void update_lock(uint32_t l);
/** Wait for a write lock after a failed write_trylock() or upgrade_trylock()
@param holding_u whether we already hold u_lock() */
void write_lock(bool holding_u);
/** Wait for signal /** Wait for signal
@param l lock word from a failed acquisition */ @param l lock word from a failed acquisition */
inline void wait(uint32_t l); inline void wait(uint32_t l);
...@@ -77,9 +81,15 @@ class srw_lock_low final : private rw_lock ...@@ -77,9 +81,15 @@ class srw_lock_low final : private rw_lock
#endif #endif
bool rd_lock_try() { uint32_t l; return read_trylock(l); } bool rd_lock_try() { uint32_t l; return read_trylock(l); }
bool wr_lock_try() { return write_trylock(); } bool wr_lock_try() { return write_trylock(); }
template<bool update=false>
void rd_lock() { uint32_t l; if (!read_trylock(l)) read_lock(l); } void rd_lock() { uint32_t l; if (!read_trylock(l)) read_lock(l); }
void wr_lock() { if (!write_trylock()) write_lock(); } void u_lock() { uint32_t l; if (!update_trylock(l)) update_lock(l); }
bool u_lock_try() { uint32_t l; return update_trylock(l); }
void u_wr_upgrade() { if (!upgrade_trylock()) write_lock(true); }
template<bool update=false>
void wr_lock() { if (!write_trylock()) write_lock(false); }
void rd_unlock(); void rd_unlock();
void u_unlock();
void wr_unlock(); void wr_unlock();
}; };
...@@ -110,6 +120,7 @@ class srw_lock ...@@ -110,6 +120,7 @@ class srw_lock
} }
lock.destroy(); lock.destroy();
} }
template<bool update= false>
void rd_lock() void rd_lock()
{ {
uint32_t l; uint32_t l;
...@@ -119,7 +130,8 @@ class srw_lock ...@@ -119,7 +130,8 @@ class srw_lock
{ {
PSI_rwlock_locker_state state; PSI_rwlock_locker_state state;
PSI_rwlock_locker *locker= PSI_RWLOCK_CALL(start_rwlock_rdwait) PSI_rwlock_locker *locker= PSI_RWLOCK_CALL(start_rwlock_rdwait)
(&state, pfs_psi, PSI_RWLOCK_READLOCK, __FILE__, __LINE__); (&state, pfs_psi, update ? PSI_RWLOCK_SHAREDLOCK : PSI_RWLOCK_READLOCK,
__FILE__, __LINE__);
lock.read_lock(l); lock.read_lock(l);
if (locker) if (locker)
PSI_RWLOCK_CALL(end_rwlock_rdwait)(locker, 0); PSI_RWLOCK_CALL(end_rwlock_rdwait)(locker, 0);
...@@ -133,6 +145,29 @@ class srw_lock ...@@ -133,6 +145,29 @@ class srw_lock
PSI_RWLOCK_CALL(unlock_rwlock)(pfs_psi); PSI_RWLOCK_CALL(unlock_rwlock)(pfs_psi);
lock.rd_unlock(); lock.rd_unlock();
} }
void u_lock()
{
if (pfs_psi)
{
if (lock.u_lock_try())
return;
PSI_rwlock_locker_state state;
PSI_rwlock_locker *locker= PSI_RWLOCK_CALL(start_rwlock_wrwait)
(&state, pfs_psi, PSI_RWLOCK_SHAREDEXCLUSIVELOCK, __FILE__, __LINE__);
lock.u_lock();
if (locker)
PSI_RWLOCK_CALL(end_rwlock_rdwait)(locker, 0);
return;
}
lock.u_lock();
}
void u_unlock()
{
if (pfs_psi)
PSI_RWLOCK_CALL(unlock_rwlock)(pfs_psi);
lock.u_unlock();
}
template<bool update= false>
void wr_lock() void wr_lock()
{ {
if (lock.write_trylock()) if (lock.write_trylock())
...@@ -141,13 +176,15 @@ class srw_lock ...@@ -141,13 +176,15 @@ class srw_lock
{ {
PSI_rwlock_locker_state state; PSI_rwlock_locker_state state;
PSI_rwlock_locker *locker= PSI_RWLOCK_CALL(start_rwlock_wrwait) PSI_rwlock_locker *locker= PSI_RWLOCK_CALL(start_rwlock_wrwait)
(&state, pfs_psi, PSI_RWLOCK_WRITELOCK, __FILE__, __LINE__); (&state, pfs_psi,
lock.write_lock(); update ? PSI_RWLOCK_EXCLUSIVELOCK : PSI_RWLOCK_WRITELOCK,
__FILE__, __LINE__);
lock.write_lock(false);
if (locker) if (locker)
PSI_RWLOCK_CALL(end_rwlock_rdwait)(locker, 0); PSI_RWLOCK_CALL(end_rwlock_rdwait)(locker, 0);
return; return;
} }
lock.write_lock(); lock.write_lock(false);
} }
void wr_unlock() void wr_unlock()
{ {
...@@ -155,7 +192,24 @@ class srw_lock ...@@ -155,7 +192,24 @@ class srw_lock
PSI_RWLOCK_CALL(unlock_rwlock)(pfs_psi); PSI_RWLOCK_CALL(unlock_rwlock)(pfs_psi);
lock.wr_unlock(); lock.wr_unlock();
} }
void u_wr_upgrade()
{
if (lock.upgrade_trylock())
return;
if (pfs_psi)
{
PSI_rwlock_locker_state state;
PSI_rwlock_locker *locker= PSI_RWLOCK_CALL(start_rwlock_wrwait)
(&state, pfs_psi, PSI_RWLOCK_WRITELOCK, __FILE__, __LINE__);
lock.write_lock(true);
if (locker)
PSI_RWLOCK_CALL(end_rwlock_rdwait)(locker, 0);
return;
}
lock.write_lock(true);
}
bool rd_lock_try() { return lock.rd_lock_try(); } bool rd_lock_try() { return lock.rd_lock_try(); }
bool u_lock_try() { return lock.u_lock_try(); }
bool wr_lock_try() { return lock.wr_lock_try(); } bool wr_lock_try() { return lock.wr_lock_try(); }
}; };
#endif #endif
...@@ -114,7 +114,7 @@ void srw_lock_low::read_lock(uint32_t l) ...@@ -114,7 +114,7 @@ void srw_lock_low::read_lock(uint32_t l)
for (auto spin= srv_n_spin_wait_rounds; spin; spin--) for (auto spin= srv_n_spin_wait_rounds; spin; spin--)
{ {
ut_delay(srv_spin_wait_delay); ut_delay(srv_spin_wait_delay);
if (read_trylock(l)) if (read_trylock<true>(l))
return; return;
else if (l == WRITER_WAITING) else if (l == WRITER_WAITING)
goto wake_writer; goto wake_writer;
...@@ -122,11 +122,50 @@ void srw_lock_low::read_lock(uint32_t l) ...@@ -122,11 +122,50 @@ void srw_lock_low::read_lock(uint32_t l)
wait(l); wait(l);
} }
while (!read_trylock(l)); while (!read_trylock<true>(l));
} }
/** Wait for a write lock after a failed write_trylock() */ /** Wait for an update lock.
void srw_lock_low::write_lock() @param lock word value from a failed update_trylock() */
void srw_lock_low::update_lock(uint32_t l)
{
do
{
if (l == WRITER_WAITING)
{
wake_writer:
#ifdef SRW_LOCK_DUMMY
pthread_mutex_lock(&mutex);
{
pthread_cond_signal(&cond);
pthread_cond_wait(&cond, &mutex);
l= value();
}
while (l == WRITER_WAITING);
pthread_mutex_unlock(&mutex);
continue;
#else
wake_one();
#endif
}
else
for (auto spin= srv_n_spin_wait_rounds; spin; spin--)
{
ut_delay(srv_spin_wait_delay);
if (update_trylock(l))
return;
else if (l == WRITER_WAITING)
goto wake_writer;
}
wait(l);
}
while (!update_trylock(l));
}
/** Wait for a write lock after a failed write_trylock() or upgrade_trylock()
@param holding_u whether we already hold u_lock() */
void srw_lock_low::write_lock(bool holding_u)
{ {
for (;;) for (;;)
{ {
...@@ -134,6 +173,7 @@ void srw_lock_low::write_lock() ...@@ -134,6 +173,7 @@ void srw_lock_low::write_lock()
/* We are the first writer to be granted the lock. Spin for a while. */ /* We are the first writer to be granted the lock. Spin for a while. */
for (auto spin= srv_n_spin_wait_rounds; spin; spin--) for (auto spin= srv_n_spin_wait_rounds; spin; spin--)
{ {
l= holding_u ? WRITER_WAITING | UPDATER : WRITER_WAITING;
if (write_lock_wait_try(l)) if (write_lock_wait_try(l))
return; return;
if (!(l & WRITER_WAITING)) if (!(l & WRITER_WAITING))
...@@ -141,13 +181,22 @@ void srw_lock_low::write_lock() ...@@ -141,13 +181,22 @@ void srw_lock_low::write_lock()
ut_delay(srv_spin_wait_delay); ut_delay(srv_spin_wait_delay);
} }
l= holding_u ? WRITER_WAITING | UPDATER : WRITER_WAITING;
if (write_lock_wait_try(l)) if (write_lock_wait_try(l))
return; return;
if (!(l & WRITER_WAITING)) if (!(l & WRITER_WAITING))
{ {
if (l == UNLOCKED && write_trylock()) switch (l) {
return; case UNLOCKED:
DBUG_ASSERT(!holding_u);
if (write_trylock())
return;
break;
case UPDATER:
if (holding_u && upgrade_trylock())
return;
}
l= write_lock_wait_start() | WRITER_WAITING; l= write_lock_wait_start() | WRITER_WAITING;
} }
else else
...@@ -158,5 +207,5 @@ void srw_lock_low::write_lock() ...@@ -158,5 +207,5 @@ void srw_lock_low::write_lock()
} }
void srw_lock_low::rd_unlock() { if (read_unlock()) wake_one(); } void srw_lock_low::rd_unlock() { if (read_unlock()) wake_one(); }
void srw_lock_low::u_unlock() { if (update_unlock()) wake_one(); }
void srw_lock_low::wr_unlock() { write_unlock(); wake_all(); } void srw_lock_low::wr_unlock() { write_unlock(); wake_all(); }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment