Commit d257cc8c authored by Waiman Long's avatar Waiman Long Committed by Peter Zijlstra

locking/rwsem: Make handoff bit handling more consistent

There are some inconsistency in the way that the handoff bit is being
handled in readers and writers that lead to a race condition.

Firstly, when a queue head writer set the handoff bit, it will clear
it when the writer is being killed or interrupted on its way out
without acquiring the lock. That is not the case for a queue head
reader. The handoff bit will simply be inherited by the next waiter.

Secondly, in the out_nolock path of rwsem_down_read_slowpath(), both
the waiter and handoff bits are cleared if the wait queue becomes
empty.  For rwsem_down_write_slowpath(), however, the handoff bit is
not checked and cleared if the wait queue is empty. This can
potentially make the handoff bit set with empty wait queue.

Worse, the situation in rwsem_down_write_slowpath() relies on wstate,
a variable set outside of the critical section containing the ->count
manipulation, this leads to race condition where RWSEM_FLAG_HANDOFF
can be double subtracted, corrupting ->count.

To make the handoff bit handling more consistent and robust, extract
out handoff bit clearing code into the new rwsem_del_waiter() helper
function. Also, completely eradicate wstate; always evaluate
everything inside the same critical section.

The common function will only use atomic_long_andnot() to clear bits
when the wait queue is empty to avoid possible race condition.  If the
first waiter with handoff bit set is killed or interrupted to exit the
slowpath without acquiring the lock, the next waiter will inherit the
handoff bit.

While at it, simplify the trylock for loop in
rwsem_down_write_slowpath() to make it easier to read.

Fixes: 4f23dbc1 ("locking/rwsem: Implement lock handoff to prevent lock starvation")
Reported-by: default avatarZhenhua Ma <mazhenhua@xiaomi.com>
Suggested-by: default avatarPeter Zijlstra <peterz@infradead.org>
Signed-off-by: default avatarWaiman Long <longman@redhat.com>
Signed-off-by: default avatarPeter Zijlstra (Intel) <peterz@infradead.org>
Link: https://lkml.kernel.org/r/20211116012912.723980-1-longman@redhat.com
parent 13605725
...@@ -105,9 +105,9 @@ ...@@ -105,9 +105,9 @@
* atomic_long_cmpxchg() will be used to obtain writer lock. * atomic_long_cmpxchg() will be used to obtain writer lock.
* *
* There are three places where the lock handoff bit may be set or cleared. * There are three places where the lock handoff bit may be set or cleared.
* 1) rwsem_mark_wake() for readers. * 1) rwsem_mark_wake() for readers -- set, clear
* 2) rwsem_try_write_lock() for writers. * 2) rwsem_try_write_lock() for writers -- set, clear
* 3) Error path of rwsem_down_write_slowpath(). * 3) rwsem_del_waiter() -- clear
* *
* For all the above cases, wait_lock will be held. A writer must also * For all the above cases, wait_lock will be held. A writer must also
* be the first one in the wait_list to be eligible for setting the handoff * be the first one in the wait_list to be eligible for setting the handoff
...@@ -334,6 +334,9 @@ struct rwsem_waiter { ...@@ -334,6 +334,9 @@ struct rwsem_waiter {
struct task_struct *task; struct task_struct *task;
enum rwsem_waiter_type type; enum rwsem_waiter_type type;
unsigned long timeout; unsigned long timeout;
/* Writer only, not initialized in reader */
bool handoff_set;
}; };
#define rwsem_first_waiter(sem) \ #define rwsem_first_waiter(sem) \
list_first_entry(&sem->wait_list, struct rwsem_waiter, list) list_first_entry(&sem->wait_list, struct rwsem_waiter, list)
...@@ -344,12 +347,6 @@ enum rwsem_wake_type { ...@@ -344,12 +347,6 @@ enum rwsem_wake_type {
RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */ RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */
}; };
enum writer_wait_state {
WRITER_NOT_FIRST, /* Writer is not first in wait list */
WRITER_FIRST, /* Writer is first in wait list */
WRITER_HANDOFF /* Writer is first & handoff needed */
};
/* /*
* The typical HZ value is either 250 or 1000. So set the minimum waiting * The typical HZ value is either 250 or 1000. So set the minimum waiting
* time to at least 4ms or 1 jiffy (if it is higher than 4ms) in the wait * time to at least 4ms or 1 jiffy (if it is higher than 4ms) in the wait
...@@ -365,6 +362,31 @@ enum writer_wait_state { ...@@ -365,6 +362,31 @@ enum writer_wait_state {
*/ */
#define MAX_READERS_WAKEUP 0x100 #define MAX_READERS_WAKEUP 0x100
static inline void
rwsem_add_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
{
lockdep_assert_held(&sem->wait_lock);
list_add_tail(&waiter->list, &sem->wait_list);
/* caller will set RWSEM_FLAG_WAITERS */
}
/*
* Remove a waiter from the wait_list and clear flags.
*
* Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of
* this function. Modify with care.
*/
static inline void
rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
{
lockdep_assert_held(&sem->wait_lock);
list_del(&waiter->list);
if (likely(!list_empty(&sem->wait_list)))
return;
atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS, &sem->count);
}
/* /*
* handle the lock release when processes blocked on it that can now run * handle the lock release when processes blocked on it that can now run
* - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
...@@ -376,6 +398,8 @@ enum writer_wait_state { ...@@ -376,6 +398,8 @@ enum writer_wait_state {
* preferably when the wait_lock is released * preferably when the wait_lock is released
* - woken process blocks are discarded from the list after having task zeroed * - woken process blocks are discarded from the list after having task zeroed
* - writers are only marked woken if downgrading is false * - writers are only marked woken if downgrading is false
*
* Implies rwsem_del_waiter() for all woken readers.
*/ */
static void rwsem_mark_wake(struct rw_semaphore *sem, static void rwsem_mark_wake(struct rw_semaphore *sem,
enum rwsem_wake_type wake_type, enum rwsem_wake_type wake_type,
...@@ -490,17 +514,24 @@ static void rwsem_mark_wake(struct rw_semaphore *sem, ...@@ -490,17 +514,24 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
adjustment = woken * RWSEM_READER_BIAS - adjustment; adjustment = woken * RWSEM_READER_BIAS - adjustment;
lockevent_cond_inc(rwsem_wake_reader, woken); lockevent_cond_inc(rwsem_wake_reader, woken);
oldcount = atomic_long_read(&sem->count);
if (list_empty(&sem->wait_list)) { if (list_empty(&sem->wait_list)) {
/* hit end of list above */ /*
* Combined with list_move_tail() above, this implies
* rwsem_del_waiter().
*/
adjustment -= RWSEM_FLAG_WAITERS; adjustment -= RWSEM_FLAG_WAITERS;
} if (oldcount & RWSEM_FLAG_HANDOFF)
adjustment -= RWSEM_FLAG_HANDOFF;
} else if (woken) {
/* /*
* When we've woken a reader, we no longer need to force writers * When we've woken a reader, we no longer need to force
* to give up the lock and we can clear HANDOFF. * writers to give up the lock and we can clear HANDOFF.
*/ */
if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF)) if (oldcount & RWSEM_FLAG_HANDOFF)
adjustment -= RWSEM_FLAG_HANDOFF; adjustment -= RWSEM_FLAG_HANDOFF;
}
if (adjustment) if (adjustment)
atomic_long_add(adjustment, &sem->count); atomic_long_add(adjustment, &sem->count);
...@@ -532,12 +563,12 @@ static void rwsem_mark_wake(struct rw_semaphore *sem, ...@@ -532,12 +563,12 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
* race conditions between checking the rwsem wait list and setting the * race conditions between checking the rwsem wait list and setting the
* sem->count accordingly. * sem->count accordingly.
* *
* If wstate is WRITER_HANDOFF, it will make sure that either the handoff * Implies rwsem_del_waiter() on success.
* bit is set or the lock is acquired with handoff bit cleared.
*/ */
static inline bool rwsem_try_write_lock(struct rw_semaphore *sem, static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
enum writer_wait_state wstate) struct rwsem_waiter *waiter)
{ {
bool first = rwsem_first_waiter(sem) == waiter;
long count, new; long count, new;
lockdep_assert_held(&sem->wait_lock); lockdep_assert_held(&sem->wait_lock);
...@@ -546,13 +577,19 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem, ...@@ -546,13 +577,19 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
do { do {
bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF); bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
if (has_handoff && wstate == WRITER_NOT_FIRST) if (has_handoff) {
if (!first)
return false; return false;
/* First waiter inherits a previously set handoff bit */
waiter->handoff_set = true;
}
new = count; new = count;
if (count & RWSEM_LOCK_MASK) { if (count & RWSEM_LOCK_MASK) {
if (has_handoff || (wstate != WRITER_HANDOFF)) if (has_handoff || (!rt_task(waiter->task) &&
!time_after(jiffies, waiter->timeout)))
return false; return false;
new |= RWSEM_FLAG_HANDOFF; new |= RWSEM_FLAG_HANDOFF;
...@@ -569,9 +606,17 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem, ...@@ -569,9 +606,17 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
* We have either acquired the lock with handoff bit cleared or * We have either acquired the lock with handoff bit cleared or
* set the handoff bit. * set the handoff bit.
*/ */
if (new & RWSEM_FLAG_HANDOFF) if (new & RWSEM_FLAG_HANDOFF) {
waiter->handoff_set = true;
lockevent_inc(rwsem_wlock_handoff);
return false; return false;
}
/*
* Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
* success.
*/
list_del(&waiter->list);
rwsem_set_owner(sem); rwsem_set_owner(sem);
return true; return true;
} }
...@@ -956,7 +1001,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat ...@@ -956,7 +1001,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
} }
adjustment += RWSEM_FLAG_WAITERS; adjustment += RWSEM_FLAG_WAITERS;
} }
list_add_tail(&waiter.list, &sem->wait_list); rwsem_add_waiter(sem, &waiter);
/* we're now waiting on the lock, but no longer actively locking */ /* we're now waiting on the lock, but no longer actively locking */
count = atomic_long_add_return(adjustment, &sem->count); count = atomic_long_add_return(adjustment, &sem->count);
...@@ -1002,11 +1047,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat ...@@ -1002,11 +1047,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
return sem; return sem;
out_nolock: out_nolock:
list_del(&waiter.list); rwsem_del_waiter(sem, &waiter);
if (list_empty(&sem->wait_list)) {
atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,
&sem->count);
}
raw_spin_unlock_irq(&sem->wait_lock); raw_spin_unlock_irq(&sem->wait_lock);
__set_current_state(TASK_RUNNING); __set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_rlock_fail); lockevent_inc(rwsem_rlock_fail);
...@@ -1020,9 +1061,7 @@ static struct rw_semaphore * ...@@ -1020,9 +1061,7 @@ static struct rw_semaphore *
rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
{ {
long count; long count;
enum writer_wait_state wstate;
struct rwsem_waiter waiter; struct rwsem_waiter waiter;
struct rw_semaphore *ret = sem;
DEFINE_WAKE_Q(wake_q); DEFINE_WAKE_Q(wake_q);
/* do optimistic spinning and steal lock if possible */ /* do optimistic spinning and steal lock if possible */
...@@ -1038,16 +1077,13 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) ...@@ -1038,16 +1077,13 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
waiter.task = current; waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_WRITE; waiter.type = RWSEM_WAITING_FOR_WRITE;
waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT; waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
waiter.handoff_set = false;
raw_spin_lock_irq(&sem->wait_lock); raw_spin_lock_irq(&sem->wait_lock);
rwsem_add_waiter(sem, &waiter);
/* account for this before adding a new element to the list */
wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;
list_add_tail(&waiter.list, &sem->wait_list);
/* we're now waiting on the lock */ /* we're now waiting on the lock */
if (wstate == WRITER_NOT_FIRST) { if (rwsem_first_waiter(sem) != &waiter) {
count = atomic_long_read(&sem->count); count = atomic_long_read(&sem->count);
/* /*
...@@ -1083,13 +1119,16 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) ...@@ -1083,13 +1119,16 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
/* wait until we successfully acquire the lock */ /* wait until we successfully acquire the lock */
set_current_state(state); set_current_state(state);
for (;;) { for (;;) {
if (rwsem_try_write_lock(sem, wstate)) { if (rwsem_try_write_lock(sem, &waiter)) {
/* rwsem_try_write_lock() implies ACQUIRE on success */ /* rwsem_try_write_lock() implies ACQUIRE on success */
break; break;
} }
raw_spin_unlock_irq(&sem->wait_lock); raw_spin_unlock_irq(&sem->wait_lock);
if (signal_pending_state(state, current))
goto out_nolock;
/* /*
* After setting the handoff bit and failing to acquire * After setting the handoff bit and failing to acquire
* the lock, attempt to spin on owner to accelerate lock * the lock, attempt to spin on owner to accelerate lock
...@@ -1098,7 +1137,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) ...@@ -1098,7 +1137,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
* In this case, we attempt to acquire the lock again * In this case, we attempt to acquire the lock again
* without sleeping. * without sleeping.
*/ */
if (wstate == WRITER_HANDOFF) { if (waiter.handoff_set) {
enum owner_state owner_state; enum owner_state owner_state;
preempt_disable(); preempt_disable();
...@@ -1109,66 +1148,26 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) ...@@ -1109,66 +1148,26 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
goto trylock_again; goto trylock_again;
} }
/* Block until there are no active lockers. */
for (;;) {
if (signal_pending_state(state, current))
goto out_nolock;
schedule(); schedule();
lockevent_inc(rwsem_sleep_writer); lockevent_inc(rwsem_sleep_writer);
set_current_state(state); set_current_state(state);
/*
* If HANDOFF bit is set, unconditionally do
* a trylock.
*/
if (wstate == WRITER_HANDOFF)
break;
if ((wstate == WRITER_NOT_FIRST) &&
(rwsem_first_waiter(sem) == &waiter))
wstate = WRITER_FIRST;
count = atomic_long_read(&sem->count);
if (!(count & RWSEM_LOCK_MASK))
break;
/*
* The setting of the handoff bit is deferred
* until rwsem_try_write_lock() is called.
*/
if ((wstate == WRITER_FIRST) && (rt_task(current) ||
time_after(jiffies, waiter.timeout))) {
wstate = WRITER_HANDOFF;
lockevent_inc(rwsem_wlock_handoff);
break;
}
}
trylock_again: trylock_again:
raw_spin_lock_irq(&sem->wait_lock); raw_spin_lock_irq(&sem->wait_lock);
} }
__set_current_state(TASK_RUNNING); __set_current_state(TASK_RUNNING);
list_del(&waiter.list);
raw_spin_unlock_irq(&sem->wait_lock); raw_spin_unlock_irq(&sem->wait_lock);
lockevent_inc(rwsem_wlock); lockevent_inc(rwsem_wlock);
return sem;
return ret;
out_nolock: out_nolock:
__set_current_state(TASK_RUNNING); __set_current_state(TASK_RUNNING);
raw_spin_lock_irq(&sem->wait_lock); raw_spin_lock_irq(&sem->wait_lock);
list_del(&waiter.list); rwsem_del_waiter(sem, &waiter);
if (!list_empty(&sem->wait_list))
if (unlikely(wstate == WRITER_HANDOFF))
atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count);
if (list_empty(&sem->wait_list))
atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
else
rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q); rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
raw_spin_unlock_irq(&sem->wait_lock); raw_spin_unlock_irq(&sem->wait_lock);
wake_up_q(&wake_q); wake_up_q(&wake_q);
lockevent_inc(rwsem_wlock_fail); lockevent_inc(rwsem_wlock_fail);
return ERR_PTR(-EINTR); return ERR_PTR(-EINTR);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment