Commit 5864a2fd authored by Manfred Spraul's avatar Manfred Spraul Committed by Linus Torvalds

ipc/sem.c: fix complex_count vs. simple op race

Commit 6d07b68c ("ipc/sem.c: optimize sem_lock()") introduced a
race:

sem_lock has a fast path that allows parallel simple operations.
There are two reasons why a simple operation cannot run in parallel:
 - a non-simple operations is ongoing (sma->sem_perm.lock held)
 - a complex operation is sleeping (sma->complex_count != 0)

As both facts are stored independently, a thread can bypass the current
checks by sleeping in the right positions.  See below for more details
(or kernel bugzilla 105651).

The patch fixes that by creating one variable (complex_mode)
that tracks both reasons why parallel operations are not possible.

The patch also updates stale documentation regarding the locking.

With regards to stable kernels:
The patch is required for all kernels that include the
commit 6d07b68c ("ipc/sem.c: optimize sem_lock()") (3.10?)

The alternative is to revert the patch that introduced the race.

The patch is safe for backporting, i.e. it makes no assumptions
about memory barriers in spin_unlock_wait().

Background:
Here is the race of the current implementation:

Thread A: (simple op)
- does the first "sma->complex_count == 0" test

Thread B: (complex op)
- does sem_lock(): This includes an array scan. But the scan can't
  find Thread A, because Thread A does not own sem->lock yet.
- the thread does the operation, increases complex_count,
  drops sem_lock, sleeps

Thread A:
- spin_lock(&sem->lock), spin_is_locked(sma->sem_perm.lock)
- sleeps before the complex_count test

Thread C: (complex op)
- does sem_lock (no array scan, complex_count==1)
- wakes up Thread B.
- decrements complex_count

Thread A:
- does the complex_count test

Bug:
Now both thread A and thread C operate on the same array, without
any synchronization.

Fixes: 6d07b68c ("ipc/sem.c: optimize sem_lock()")
Link: http://lkml.kernel.org/r/1469123695-5661-1-git-send-email-manfred@colorfullife.com
Reported-by: <felixh@informatik.uni-bremen.de>
Cc: "H. Peter Anvin" <hpa@zytor.com>
Cc: Peter Zijlstra <peterz@infradead.org>
Cc: Davidlohr Bueso <dave@stgolabs.net>
Cc: Thomas Gleixner <tglx@linutronix.de>
Cc: Ingo Molnar <mingo@elte.hu>
Cc: <1vier1@web.de>
Cc: <stable@vger.kernel.org>	[3.10+]
Signed-off-by: default avatarAndrew Morton <akpm@linux-foundation.org>
Signed-off-by: default avatarLinus Torvalds <torvalds@linux-foundation.org>
parent 65deb8af
...@@ -21,6 +21,7 @@ struct sem_array { ...@@ -21,6 +21,7 @@ struct sem_array {
struct list_head list_id; /* undo requests on this array */ struct list_head list_id; /* undo requests on this array */
int sem_nsems; /* no. of semaphores in array */ int sem_nsems; /* no. of semaphores in array */
int complex_count; /* pending complex operations */ int complex_count; /* pending complex operations */
bool complex_mode; /* no parallel simple ops */
}; };
#ifdef CONFIG_SYSVIPC #ifdef CONFIG_SYSVIPC
......
...@@ -162,14 +162,21 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it); ...@@ -162,14 +162,21 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it);
/* /*
* Locking: * Locking:
* a) global sem_lock() for read/write
* sem_undo.id_next, * sem_undo.id_next,
* sem_array.complex_count, * sem_array.complex_count,
* sem_array.pending{_alter,_cont}, * sem_array.complex_mode
* sem_array.sem_undo: global sem_lock() for read/write * sem_array.pending{_alter,_const},
* sem_undo.proc_next: only "current" is allowed to read/write that field. * sem_array.sem_undo
* *
* b) global or semaphore sem_lock() for read/write:
* sem_array.sem_base[i].pending_{const,alter}: * sem_array.sem_base[i].pending_{const,alter}:
* global or semaphore sem_lock() for read/write * sem_array.complex_mode (for read)
*
* c) special:
* sem_undo_list.list_proc:
* * undo_list->lock for write
* * rcu for read
*/ */
#define sc_semmsl sem_ctls[0] #define sc_semmsl sem_ctls[0]
...@@ -260,30 +267,61 @@ static void sem_rcu_free(struct rcu_head *head) ...@@ -260,30 +267,61 @@ static void sem_rcu_free(struct rcu_head *head)
} }
/* /*
* Wait until all currently ongoing simple ops have completed. * Enter the mode suitable for non-simple operations:
* Caller must own sem_perm.lock. * Caller must own sem_perm.lock.
* New simple ops cannot start, because simple ops first check
* that sem_perm.lock is free.
* that a) sem_perm.lock is free and b) complex_count is 0.
*/ */
static void sem_wait_array(struct sem_array *sma) static void complexmode_enter(struct sem_array *sma)
{ {
int i; int i;
struct sem *sem; struct sem *sem;
if (sma->complex_count) { if (sma->complex_mode) {
/* The thread that increased sma->complex_count waited on /* We are already in complex_mode. Nothing to do */
* all sem->lock locks. Thus we don't need to wait again.
*/
return; return;
} }
/* We need a full barrier after seting complex_mode:
* The write to complex_mode must be visible
* before we read the first sem->lock spinlock state.
*/
smp_store_mb(sma->complex_mode, true);
for (i = 0; i < sma->sem_nsems; i++) { for (i = 0; i < sma->sem_nsems; i++) {
sem = sma->sem_base + i; sem = sma->sem_base + i;
spin_unlock_wait(&sem->lock); spin_unlock_wait(&sem->lock);
} }
/*
* spin_unlock_wait() is not a memory barriers, it is only a
* control barrier. The code must pair with spin_unlock(&sem->lock),
* thus just the control barrier is insufficient.
*
* smp_rmb() is sufficient, as writes cannot pass the control barrier.
*/
smp_rmb();
} }
/*
* Try to leave the mode that disallows simple operations:
* Caller must own sem_perm.lock.
*/
static void complexmode_tryleave(struct sem_array *sma)
{
if (sma->complex_count) {
/* Complex ops are sleeping.
* We must stay in complex mode
*/
return;
}
/*
* Immediately after setting complex_mode to false,
* a simple op can start. Thus: all memory writes
* performed by the current operation must be visible
* before we set complex_mode to false.
*/
smp_store_release(&sma->complex_mode, false);
}
#define SEM_GLOBAL_LOCK (-1)
/* /*
* If the request contains only one semaphore operation, and there are * If the request contains only one semaphore operation, and there are
* no complex transactions pending, lock only the semaphore involved. * no complex transactions pending, lock only the semaphore involved.
...@@ -300,57 +338,43 @@ static inline int sem_lock(struct sem_array *sma, struct sembuf *sops, ...@@ -300,57 +338,43 @@ static inline int sem_lock(struct sem_array *sma, struct sembuf *sops,
/* Complex operation - acquire a full lock */ /* Complex operation - acquire a full lock */
ipc_lock_object(&sma->sem_perm); ipc_lock_object(&sma->sem_perm);
/* And wait until all simple ops that are processed /* Prevent parallel simple ops */
* right now have dropped their locks. complexmode_enter(sma);
*/ return SEM_GLOBAL_LOCK;
sem_wait_array(sma);
return -1;
} }
/* /*
* Only one semaphore affected - try to optimize locking. * Only one semaphore affected - try to optimize locking.
* The rules are: * Optimized locking is possible if no complex operation
* - optimized locking is possible if no complex operation
* is either enqueued or processed right now. * is either enqueued or processed right now.
* - The test for enqueued complex ops is simple: *
* sma->complex_count != 0 * Both facts are tracked by complex_mode.
* - Testing for complex ops that are processed right now is
* a bit more difficult. Complex ops acquire the full lock
* and first wait that the running simple ops have completed.
* (see above)
* Thus: If we own a simple lock and the global lock is free
* and complex_count is now 0, then it will stay 0 and
* thus just locking sem->lock is sufficient.
*/ */
sem = sma->sem_base + sops->sem_num; sem = sma->sem_base + sops->sem_num;
if (sma->complex_count == 0) { /*
* Initial check for complex_mode. Just an optimization,
* no locking, no memory barrier.
*/
if (!sma->complex_mode) {
/* /*
* It appears that no complex operation is around. * It appears that no complex operation is around.
* Acquire the per-semaphore lock. * Acquire the per-semaphore lock.
*/ */
spin_lock(&sem->lock); spin_lock(&sem->lock);
/* Then check that the global lock is free */
if (!spin_is_locked(&sma->sem_perm.lock)) {
/* /*
* We need a memory barrier with acquire semantics, * See 51d7d5205d33
* otherwise we can race with another thread that does: * ("powerpc: Add smp_mb() to arch_spin_is_locked()"):
* complex_count++; * A full barrier is required: the write of sem->lock
* spin_unlock(sem_perm.lock); * must be visible before the read is executed
*/ */
smp_acquire__after_ctrl_dep(); smp_mb();
/* if (!smp_load_acquire(&sma->complex_mode)) {
* Now repeat the test of complex_count:
* It can't change anymore until we drop sem->lock.
* Thus: if is now 0, then it will stay 0.
*/
if (sma->complex_count == 0) {
/* fast path successful! */ /* fast path successful! */
return sops->sem_num; return sops->sem_num;
} }
}
spin_unlock(&sem->lock); spin_unlock(&sem->lock);
} }
...@@ -369,15 +393,16 @@ static inline int sem_lock(struct sem_array *sma, struct sembuf *sops, ...@@ -369,15 +393,16 @@ static inline int sem_lock(struct sem_array *sma, struct sembuf *sops,
/* Not a false alarm, thus complete the sequence for a /* Not a false alarm, thus complete the sequence for a
* full lock. * full lock.
*/ */
sem_wait_array(sma); complexmode_enter(sma);
return -1; return SEM_GLOBAL_LOCK;
} }
} }
static inline void sem_unlock(struct sem_array *sma, int locknum) static inline void sem_unlock(struct sem_array *sma, int locknum)
{ {
if (locknum == -1) { if (locknum == SEM_GLOBAL_LOCK) {
unmerge_queues(sma); unmerge_queues(sma);
complexmode_tryleave(sma);
ipc_unlock_object(&sma->sem_perm); ipc_unlock_object(&sma->sem_perm);
} else { } else {
struct sem *sem = sma->sem_base + locknum; struct sem *sem = sma->sem_base + locknum;
...@@ -529,6 +554,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params) ...@@ -529,6 +554,7 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
} }
sma->complex_count = 0; sma->complex_count = 0;
sma->complex_mode = true; /* dropped by sem_unlock below */
INIT_LIST_HEAD(&sma->pending_alter); INIT_LIST_HEAD(&sma->pending_alter);
INIT_LIST_HEAD(&sma->pending_const); INIT_LIST_HEAD(&sma->pending_const);
INIT_LIST_HEAD(&sma->list_id); INIT_LIST_HEAD(&sma->list_id);
...@@ -2184,10 +2210,10 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it) ...@@ -2184,10 +2210,10 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it)
/* /*
* The proc interface isn't aware of sem_lock(), it calls * The proc interface isn't aware of sem_lock(), it calls
* ipc_lock_object() directly (in sysvipc_find_ipc). * ipc_lock_object() directly (in sysvipc_find_ipc).
* In order to stay compatible with sem_lock(), we must wait until * In order to stay compatible with sem_lock(), we must
* all simple semop() calls have left their critical regions. * enter / leave complex_mode.
*/ */
sem_wait_array(sma); complexmode_enter(sma);
sem_otime = get_semotime(sma); sem_otime = get_semotime(sma);
...@@ -2204,6 +2230,8 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it) ...@@ -2204,6 +2230,8 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it)
sem_otime, sem_otime,
sma->sem_ctime); sma->sem_ctime);
complexmode_tryleave(sma);
return 0; return 0;
} }
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment