Commit 55e8b1a1, authored by Andrew Morton, committed by Linus Torvalds

[PATCH] lockless semop

From: Manfred Spraul <manfred@colorfullife.com>

Attached is the lockless semop patch. I did another test run with
idle=poll on a Pentium III, and the result remained unchanged: 99.9% direct
fast path, 0.1% race with wakeup against writing the final result code:

http://khack.osdl.org/stp/282936/environment/proc/slabinfo

That means there is no immediate need to add the two-stage
implementation to finish_wait.

It reduces the number of operations on the semaphore array spinlock by 1/3.
parent 1af764e1
...@@ -59,6 +59,8 @@ ...@@ -59,6 +59,8 @@
* (c) 1999 Manfred Spraul <manfreds@colorfullife.com> * (c) 1999 Manfred Spraul <manfreds@colorfullife.com>
* Enforced range limit on SEM_UNDO * Enforced range limit on SEM_UNDO
* (c) 2001 Red Hat Inc <alan@redhat.com> * (c) 2001 Red Hat Inc <alan@redhat.com>
* Lockless wakeup
* (c) 2003 Manfred Spraul <manfred@colorfullife.com>
*/ */
#include <linux/config.h> #include <linux/config.h>
...@@ -118,6 +120,40 @@ void __init sem_init (void) ...@@ -118,6 +120,40 @@ void __init sem_init (void)
#endif #endif
} }
/*
* Lockless wakeup algorithm:
* Without the check/retry algorithm a lockless wakeup is possible:
* - queue.status is initialized to -EINTR before blocking.
* - wakeup is performed by
* * unlinking the queue entry from sma->sem_pending
* * setting queue.status to IN_WAKEUP
* This is the notification for the blocked thread that a
* result value is imminent.
* * call wake_up_process
* * set queue.status to the final value.
* - the previously blocked thread checks queue.status:
* * if it's IN_WAKEUP, then it must wait until the value changes
* * if it's not -EINTR, then the operation was completed by
* update_queue. semtimedop can return queue.status without
* performing any operation on the semaphore array.
* * otherwise it must acquire the spinlock and check what's up.
*
* The two-stage algorithm is necessary to protect against the following
* races:
* - if queue.status is set after wake_up_process, then the woken up idle
* thread could race forward and try (and fail) to acquire sma->lock
* before update_queue had a chance to set queue.status
* - if queue.status is written before wake_up_process and if the
* blocked process is woken up by a signal between writing
* queue.status and the wake_up_process, then the woken up
* process could return from semtimedop and die by calling
* sys_exit before wake_up_process is called. Then wake_up_process
* will oops, because the task structure is already invalid.
* (yes, this happened on s390 with sysv msg).
*
*/
/*
 * Transient queue.status value. A waker stores it immediately before
 * calling wake_up_process() and replaces it with the final result right
 * afterwards; a sleeper that reads it spins with cpu_relax() until the
 * value changes. It is positive, whereas the final values written by
 * update_queue() (error <= 0) and freeary() (-EIDRM) are never positive.
 */
#define IN_WAKEUP 1
static int newary (key_t key, int nsems, int semflg) static int newary (key_t key, int nsems, int semflg)
{ {
int id; int id;
...@@ -331,16 +367,25 @@ static void update_queue (struct sem_array * sma) ...@@ -331,16 +367,25 @@ static void update_queue (struct sem_array * sma)
int error; int error;
struct sem_queue * q; struct sem_queue * q;
for (q = sma->sem_pending; q; q = q->next) { q = sma->sem_pending;
while(q) {
error = try_atomic_semop(sma, q->sops, q->nsops, error = try_atomic_semop(sma, q->sops, q->nsops,
q->undo, q->pid); q->undo, q->pid);
/* Does q->sleeper still need to sleep? */ /* Does q->sleeper still need to sleep? */
if (error <= 0) { if (error <= 0) {
q->status = error; struct sem_queue *n;
remove_from_queue(sma,q); remove_from_queue(sma,q);
n = q->next;
q->status = IN_WAKEUP;
wake_up_process(q->sleeper); wake_up_process(q->sleeper);
/* hands-off: q will disappear immediately after
* writing q->status.
*/
q->status = error;
q = n;
} else {
q = q->next;
} }
} }
} }
...@@ -409,10 +454,16 @@ static void freeary (struct sem_array *sma, int id) ...@@ -409,10 +454,16 @@ static void freeary (struct sem_array *sma, int id)
un->semid = -1; un->semid = -1;
/* Wake up all pending processes and let them fail with EIDRM. */ /* Wake up all pending processes and let them fail with EIDRM. */
for (q = sma->sem_pending; q; q = q->next) { q = sma->sem_pending;
q->status = -EIDRM; while(q) {
struct sem_queue *n;
/* lazy remove_from_queue: we are killing the whole queue */
q->prev = NULL; q->prev = NULL;
n = q->next;
q->status = IN_WAKEUP;
wake_up_process(q->sleeper); /* doesn't sleep */ wake_up_process(q->sleeper); /* doesn't sleep */
q->status = -EIDRM; /* hands-off q */
q = n;
} }
/* Remove the semaphore set from the ID array*/ /* Remove the semaphore set from the ID array*/
...@@ -1083,6 +1134,18 @@ asmlinkage long sys_semtimedop(int semid, struct sembuf __user *tsops, ...@@ -1083,6 +1134,18 @@ asmlinkage long sys_semtimedop(int semid, struct sembuf __user *tsops,
else else
schedule(); schedule();
error = queue.status;
while(unlikely(error == IN_WAKEUP)) {
cpu_relax();
error = queue.status;
}
if (error != -EINTR) {
/* fast path: update_queue already obtained all requested
* resources */
goto out_free;
}
sma = sem_lock(semid); sma = sem_lock(semid);
if(sma==NULL) { if(sma==NULL) {
if(queue.prev != NULL) if(queue.prev != NULL)
...@@ -1095,7 +1158,7 @@ asmlinkage long sys_semtimedop(int semid, struct sembuf __user *tsops, ...@@ -1095,7 +1158,7 @@ asmlinkage long sys_semtimedop(int semid, struct sembuf __user *tsops,
* If queue.status != -EINTR we are woken up by another process * If queue.status != -EINTR we are woken up by another process
*/ */
error = queue.status; error = queue.status;
if (queue.status != -EINTR) { if (error != -EINTR) {
goto out_unlock_free; goto out_unlock_free;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment