Commit f8d213bd authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-34178: Improve the spin loops

srw_mutex_impl<spinloop>::wait_and_lock(): Invoke srw_pause() and
reload the lock word on each loop. Thanks to Mark Callaghan for
suggesting this.

ssux_lock_impl<spinloop>::rd_wait(): Actually implement a spin loop
on the rw-lock component without blocking on the mutex component.
If there is a conflict with wr_lock(), wait for writer.lock to be
released without actually acquiring it.

Reviewed by: Debarun Banerjee
Tested by: Matthias Leich
parent 6cde03ae
......@@ -72,10 +72,13 @@ inline void pthread_mutex_wrapper<true>::wr_lock()
# endif
#endif
template<bool spinloop> class ssux_lock_impl;
/** Futex-based mutex */
template<bool spinloop>
class srw_mutex_impl final
{
friend ssux_lock_impl<spinloop>;
/** The lock word, containing HOLDER + 1 if the lock is being held,
plus the number of waiters */
std::atomic<uint32_t> lock;
......@@ -97,6 +100,8 @@ class srw_mutex_impl final
inline void wait(uint32_t lk);
/** Wake up one wait() thread */
void wake();
/** Wake up all wait() threads */
inline void wake_all();
public:
/** @return whether the mutex is being held or waited for */
bool is_locked_or_waiting() const
......@@ -209,22 +214,25 @@ class ssux_lock_impl
/** @return whether the lock is being held or waited for */
bool is_vacant() const { return !is_locked_or_waiting(); }
#endif /* !DBUG_OFF */
bool rd_lock_try()
private:
/** Try to acquire a shared latch.
@return the lock word value if the latch was not acquired
@retval 0 if the latch was acquired */
uint32_t rd_lock_try_low()
{
uint32_t lk= 0;
while (!readers.compare_exchange_weak(lk, lk + 1,
std::memory_order_acquire,
std::memory_order_relaxed))
if (lk & WRITER)
return false;
return true;
return lk;
return 0;
}
public:
bool u_lock_try()
{
return writer.wr_lock_try();
}
bool rd_lock_try() { return rd_lock_try_low() == 0; }
bool u_lock_try() { return writer.wr_lock_try(); }
bool wr_lock_try()
{
......
......@@ -191,6 +191,13 @@ void srw_mutex_impl<spinloop>::wake()
pthread_mutex_unlock(&mutex);
}
template<bool spinloop>
inline void srw_mutex_impl<spinloop>::wake_all()
{
pthread_mutex_lock(&mutex);
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mutex);
}
template<bool spinloop>
void ssux_lock_impl<spinloop>::wake()
{
pthread_mutex_lock(&writer.mutex);
......@@ -207,6 +214,8 @@ inline void srw_mutex_impl<spinloop>::wait(uint32_t lk)
{ WaitOnAddress(&lock, &lk, 4, INFINITE); }
template<bool spinloop>
void srw_mutex_impl<spinloop>::wake() { WakeByAddressSingle(&lock); }
template<bool spinloop>
inline void srw_mutex_impl<spinloop>::wake_all() { WakeByAddressAll(&lock); }
template<bool spinloop>
inline void ssux_lock_impl<spinloop>::wait(uint32_t lk)
......@@ -244,6 +253,8 @@ inline void srw_mutex_impl<spinloop>::wait(uint32_t lk)
{ SRW_FUTEX(&lock, WAIT, lk); }
template<bool spinloop>
void srw_mutex_impl<spinloop>::wake() { SRW_FUTEX(&lock, WAKE, 1); }
template<bool spinloop>
void srw_mutex_impl<spinloop>::wake_all() { SRW_FUTEX(&lock, WAKE, INT_MAX); }
template<bool spinloop>
inline void ssux_lock_impl<spinloop>::wait(uint32_t lk)
......@@ -304,9 +315,8 @@ void srw_mutex_impl<spinloop>::wait_and_lock()
for (auto spin= srv_n_spin_wait_rounds;;)
{
DBUG_ASSERT(~HOLDER & lk);
if (lk & HOLDER)
lk= lock.load(std::memory_order_relaxed);
else
if (!(lk & HOLDER))
{
#ifdef IF_NOT_FETCH_OR_GOTO
static_assert(HOLDER == (1U << 31), "compatibility");
......@@ -316,10 +326,10 @@ void srw_mutex_impl<spinloop>::wait_and_lock()
if (!((lk= lock.fetch_or(HOLDER, std::memory_order_relaxed)) & HOLDER))
goto acquired;
#endif
srw_pause(delay);
}
if (!--spin)
break;
srw_pause(delay);
}
}
......@@ -392,14 +402,52 @@ template void ssux_lock_impl<false>::wr_wait(uint32_t);
template<bool spinloop>
void ssux_lock_impl<spinloop>::rd_wait()
{
const unsigned delay= srw_pause_delay();
if (spinloop)
{
for (auto spin= srv_n_spin_wait_rounds; spin; spin--)
{
srw_pause(delay);
if (rd_lock_try())
return;
}
}
/* Subscribe to writer.wake() or write.wake_all() calls by
concurrently executing rd_wait() or writer.wr_unlock(). */
uint32_t wl= 1 + writer.lock.fetch_add(1, std::memory_order_acquire);
for (;;)
{
writer.wr_lock();
bool acquired= rd_lock_try();
writer.wr_unlock();
if (acquired)
if (UNIV_LIKELY(writer.HOLDER & wl))
writer.wait(wl);
uint32_t lk= rd_lock_try_low();
if (!lk)
break;
if (UNIV_UNLIKELY(lk == WRITER)) /* A wr_lock() just succeeded. */
/* Immediately wake up (also) wr_lock(). We may also unnecessarily
wake up other concurrent threads that are executing rd_wait().
If we invoked writer.wake() here to wake up just one thread,
we could wake up a rd_wait(), which then would invoke writer.wake(),
waking up possibly another rd_wait(), and we could end up doing
lots of non-productive context switching until the wr_lock()
is finally woken up. */
writer.wake_all();
srw_pause(delay);
wl= writer.lock.load(std::memory_order_acquire);
ut_ad(wl);
}
/* Unsubscribe writer.wake() and writer.wake_all(). */
wl= writer.lock.fetch_sub(1, std::memory_order_release);
ut_ad(wl);
/* Wake any other threads that may be blocked in writer.wait().
All other waiters than this rd_wait() would end up acquiring writer.lock
and waking up other threads on unlock(). */
if (wl > 1)
writer.wake_all();
}
template void ssux_lock_impl<true>::rd_wait();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment