Commit 6b2d1766 authored by Marko Mäkelä's avatar Marko Mäkelä

MDEV-24167 fixup: Avoid hangs in SRW_LOCK_DUMMY

In commit 1fdc161d we introduced
a mutex-and-condition-variable based fallback implementation
for platforms that lack a futex system call. That implementation
is prone to hangs.

Let us use separate condition variables for write and read requests.
parent c152f595
...@@ -52,6 +52,7 @@ class srw_lock_low final : private rw_lock ...@@ -52,6 +52,7 @@ class srw_lock_low final : private rw_lock
#ifdef SRW_LOCK_DUMMY #ifdef SRW_LOCK_DUMMY
pthread_mutex_t mutex; pthread_mutex_t mutex;
pthread_cond_t cond; pthread_cond_t cond;
pthread_cond_t cond_ex;
#endif #endif
/** @return pointer to the lock word */ /** @return pointer to the lock word */
rw_lock *word() { return static_cast<rw_lock*>(this); } rw_lock *word() { return static_cast<rw_lock*>(this); }
...@@ -66,11 +67,14 @@ class srw_lock_low final : private rw_lock ...@@ -66,11 +67,14 @@ class srw_lock_low final : private rw_lock
void write_lock(bool holding_u); void write_lock(bool holding_u);
/** Wait for signal /** Wait for signal
@param l lock word from a failed acquisition */ @param l lock word from a failed acquisition */
inline void wait(uint32_t l); inline void writer_wait(uint32_t l);
/** Wait for signal
@param l lock word from a failed acquisition */
inline void readers_wait(uint32_t l);
/** Send signal to one waiter */ /** Send signal to one waiter */
inline void wake_one(); inline void writer_wake();
/** Send signal to all waiters */ /** Send signal to all waiters */
inline void wake_all(); inline void readers_wake();
public: public:
#ifdef SRW_LOCK_DUMMY #ifdef SRW_LOCK_DUMMY
void init(); void init();
......
...@@ -25,6 +25,7 @@ void srw_lock_low::init() ...@@ -25,6 +25,7 @@ void srw_lock_low::init()
DBUG_ASSERT(!is_locked_or_waiting()); DBUG_ASSERT(!is_locked_or_waiting());
pthread_mutex_init(&mutex, nullptr); pthread_mutex_init(&mutex, nullptr);
pthread_cond_init(&cond, nullptr); pthread_cond_init(&cond, nullptr);
pthread_cond_init(&cond_ex, nullptr);
} }
void srw_lock_low::destroy() void srw_lock_low::destroy()
...@@ -32,9 +33,18 @@ void srw_lock_low::destroy() ...@@ -32,9 +33,18 @@ void srw_lock_low::destroy()
DBUG_ASSERT(!is_locked_or_waiting()); DBUG_ASSERT(!is_locked_or_waiting());
pthread_mutex_destroy(&mutex); pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond); pthread_cond_destroy(&cond);
pthread_cond_destroy(&cond_ex);
} }
inline void srw_lock_low::wait(uint32_t l) inline void srw_lock_low::writer_wait(uint32_t l)
{
pthread_mutex_lock(&mutex);
if (value() == l)
pthread_cond_wait(&cond_ex, &mutex);
pthread_mutex_unlock(&mutex);
}
inline void srw_lock_low::readers_wait(uint32_t l)
{ {
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
if (value() == l) if (value() == l)
...@@ -42,30 +52,39 @@ inline void srw_lock_low::wait(uint32_t l) ...@@ -42,30 +52,39 @@ inline void srw_lock_low::wait(uint32_t l)
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
} }
inline void srw_lock_low::wake_one() #if 0
inline void srw_lock_low::writer_wake()
{ {
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
pthread_cond_signal(&cond); pthread_cond_broadcast(&cond_ex);
if (!(value() & WRITER_PENDING))
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
} }
#endif
inline void srw_lock_low::wake_all() inline void srw_lock_low::writer_wake()
{ {
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
pthread_cond_broadcast(&cond); uint32_t l= value();
if (!(l & WRITER))
pthread_cond_broadcast((l & WRITER_PENDING) ? &cond_ex : &cond);
else
DBUG_ASSERT(!(l & ~WRITER_PENDING));
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
} }
# define readers_wake writer_wake
#else #else
static_assert(4 == sizeof(rw_lock), "ABI"); static_assert(4 == sizeof(rw_lock), "ABI");
# ifdef _WIN32 # ifdef _WIN32
# include <synchapi.h> # include <synchapi.h>
inline void srw_lock_low::wait(uint32_t l) inline void srw_lock_low::writer_wait(uint32_t l)
{ {
WaitOnAddress(word(), &l, 4, INFINITE); WaitOnAddress(word(), &l, 4, INFINITE);
} }
inline void srw_lock_low::wake_one() { WakeByAddressSingle(word()); } inline void srw_lock_low::writer_wake() { WakeByAddressSingle(word()); }
inline void srw_lock_low::wake_all() { WakeByAddressAll(word()); } inline void srw_lock_low::readers_wake() { WakeByAddressAll(word()); }
# else # else
# ifdef __linux__ # ifdef __linux__
# include <linux/futex.h> # include <linux/futex.h>
...@@ -81,10 +100,14 @@ inline void srw_lock_low::wake_all() { WakeByAddressAll(word()); } ...@@ -81,10 +100,14 @@ inline void srw_lock_low::wake_all() { WakeByAddressAll(word()); }
# error "no futex support" # error "no futex support"
# endif # endif
inline void srw_lock_low::wait(uint32_t l) { SRW_FUTEX(word(), WAIT, l); } inline void srw_lock_low::writer_wait(uint32_t l)
inline void srw_lock_low::wake_one() { SRW_FUTEX(word(), WAKE, 1); } {
inline void srw_lock_low::wake_all() { SRW_FUTEX(word(), WAKE, INT_MAX); } SRW_FUTEX(word(), WAIT, l);
}
inline void srw_lock_low::writer_wake() { SRW_FUTEX(word(), WAKE, 1); }
inline void srw_lock_low::readers_wake() { SRW_FUTEX(word(), WAKE, INT_MAX); }
# endif # endif
# define readers_wait writer_wait
#endif #endif
/** Wait for a read lock. /** Wait for a read lock.
...@@ -99,7 +122,7 @@ void srw_lock_low::read_lock(uint32_t l) ...@@ -99,7 +122,7 @@ void srw_lock_low::read_lock(uint32_t l)
#ifdef SRW_LOCK_DUMMY #ifdef SRW_LOCK_DUMMY
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
{ {
pthread_cond_signal(&cond); pthread_cond_signal(&cond_ex);
pthread_cond_wait(&cond, &mutex); pthread_cond_wait(&cond, &mutex);
l= value(); l= value();
} }
...@@ -107,7 +130,7 @@ void srw_lock_low::read_lock(uint32_t l) ...@@ -107,7 +130,7 @@ void srw_lock_low::read_lock(uint32_t l)
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
continue; continue;
#else #else
wake_one(); writer_wake();
#endif #endif
} }
else else
...@@ -120,7 +143,7 @@ void srw_lock_low::read_lock(uint32_t l) ...@@ -120,7 +143,7 @@ void srw_lock_low::read_lock(uint32_t l)
goto wake_writer; goto wake_writer;
} }
wait(l); readers_wait(l);
} }
while (!read_trylock<true>(l)); while (!read_trylock<true>(l));
} }
...@@ -137,7 +160,7 @@ void srw_lock_low::update_lock(uint32_t l) ...@@ -137,7 +160,7 @@ void srw_lock_low::update_lock(uint32_t l)
#ifdef SRW_LOCK_DUMMY #ifdef SRW_LOCK_DUMMY
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
{ {
pthread_cond_signal(&cond); pthread_cond_signal(&cond_ex);
pthread_cond_wait(&cond, &mutex); pthread_cond_wait(&cond, &mutex);
l= value(); l= value();
} }
...@@ -145,7 +168,7 @@ void srw_lock_low::update_lock(uint32_t l) ...@@ -145,7 +168,7 @@ void srw_lock_low::update_lock(uint32_t l)
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
continue; continue;
#else #else
wake_one(); writer_wake();
#endif #endif
} }
else else
...@@ -158,7 +181,7 @@ void srw_lock_low::update_lock(uint32_t l) ...@@ -158,7 +181,7 @@ void srw_lock_low::update_lock(uint32_t l)
goto wake_writer; goto wake_writer;
} }
wait(l); readers_wait(l);
} }
while (!update_trylock(l)); while (!update_trylock(l));
} }
...@@ -202,10 +225,10 @@ void srw_lock_low::write_lock(bool holding_u) ...@@ -202,10 +225,10 @@ void srw_lock_low::write_lock(bool holding_u)
else else
DBUG_ASSERT(~WRITER_WAITING & l); DBUG_ASSERT(~WRITER_WAITING & l);
wait(l); writer_wait(l);
} }
} }
void srw_lock_low::rd_unlock() { if (read_unlock()) wake_one(); } void srw_lock_low::rd_unlock() { if (read_unlock()) writer_wake(); }
void srw_lock_low::u_unlock() { if (update_unlock()) wake_one(); } void srw_lock_low::u_unlock() { if (update_unlock()) writer_wake(); }
void srw_lock_low::wr_unlock() { write_unlock(); wake_all(); } void srw_lock_low::wr_unlock() { write_unlock(); readers_wake(); }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment