Commit fa6004ad authored by Davidlohr Bueso's avatar Davidlohr Bueso Committed by Ingo Molnar

ipc/mqueue: Implement lockless pipelined wakeups

This patch moves the wakeup_process() invocation so it is not done under
the info->lock by making use of a lockless wake_q. With this change, the
waiter is woken up once it is STATE_READY and it does not need to loop
on SMP if it is still in STATE_PENDING. In the timeout case we still need
to grab the info->lock to verify the state.

This change should also avoid the introduction of preempt_disable() in -rt
which avoids a busy-loop which pools for the STATE_PENDING -> STATE_READY
change if the waiter has a higher priority compared to the waker.

Additionally, this patch micro-optimizes wq_sleep by using the cheaper
cousin of set_current_state(TASK_INTERRUPTABLE) as we will block no
matter what, thus get rid of the implied barrier.
Signed-off-by: default avatarDavidlohr Bueso <dbueso@suse.de>
Signed-off-by: default avatarPeter Zijlstra (Intel) <peterz@infradead.org>
Acked-by: default avatarGeorge Spelvin <linux@horizon.com>
Acked-by: default avatarThomas Gleixner <tglx@linutronix.de>
Cc: Andrew Morton <akpm@linux-foundation.org>
Cc: Borislav Petkov <bp@alien8.de>
Cc: Chris Mason <clm@fb.com>
Cc: H. Peter Anvin <hpa@zytor.com>
Cc: Linus Torvalds <torvalds@linux-foundation.org>
Cc: Manfred Spraul <manfred@colorfullife.com>
Cc: Peter Zijlstra <peterz@infradead.org>
Cc: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
Cc: Steven Rostedt <rostedt@goodmis.org>
Cc: dave@stgolabs.net
Link: http://lkml.kernel.org/r/1430748166.1940.17.camel@stgolabs.netSigned-off-by: default avatarIngo Molnar <mingo@kernel.org>
parent 1d0dcb3a
...@@ -47,8 +47,7 @@ ...@@ -47,8 +47,7 @@
#define RECV 1 #define RECV 1
#define STATE_NONE 0 #define STATE_NONE 0
#define STATE_PENDING 1 #define STATE_READY 1
#define STATE_READY 2
struct posix_msg_tree_node { struct posix_msg_tree_node {
struct rb_node rb_node; struct rb_node rb_node;
...@@ -571,15 +570,12 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr, ...@@ -571,15 +570,12 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
wq_add(info, sr, ewp); wq_add(info, sr, ewp);
for (;;) { for (;;) {
set_current_state(TASK_INTERRUPTIBLE); __set_current_state(TASK_INTERRUPTIBLE);
spin_unlock(&info->lock); spin_unlock(&info->lock);
time = schedule_hrtimeout_range_clock(timeout, 0, time = schedule_hrtimeout_range_clock(timeout, 0,
HRTIMER_MODE_ABS, CLOCK_REALTIME); HRTIMER_MODE_ABS, CLOCK_REALTIME);
while (ewp->state == STATE_PENDING)
cpu_relax();
if (ewp->state == STATE_READY) { if (ewp->state == STATE_READY) {
retval = 0; retval = 0;
goto out; goto out;
...@@ -907,11 +903,15 @@ SYSCALL_DEFINE1(mq_unlink, const char __user *, u_name) ...@@ -907,11 +903,15 @@ SYSCALL_DEFINE1(mq_unlink, const char __user *, u_name)
* list of waiting receivers. A sender checks that list before adding the new * list of waiting receivers. A sender checks that list before adding the new
* message into the message array. If there is a waiting receiver, then it * message into the message array. If there is a waiting receiver, then it
* bypasses the message array and directly hands the message over to the * bypasses the message array and directly hands the message over to the
* receiver. * receiver. The receiver accepts the message and returns without grabbing the
* The receiver accepts the message and returns without grabbing the queue * queue spinlock:
* spinlock. Therefore an intermediate STATE_PENDING state and memory barriers *
* are necessary. The same algorithm is used for sysv semaphores, see * - Set pointer to message.
* ipc/sem.c for more details. * - Queue the receiver task for later wakeup (without the info->lock).
* - Update its state to STATE_READY. Now the receiver can continue.
* - Wake up the process after the lock is dropped. Should the process wake up
* before this wakeup (due to a timeout or a signal) it will either see
* STATE_READY and continue or acquire the lock to check the state again.
* *
* The same algorithm is used for senders. * The same algorithm is used for senders.
*/ */
...@@ -919,21 +919,29 @@ SYSCALL_DEFINE1(mq_unlink, const char __user *, u_name) ...@@ -919,21 +919,29 @@ SYSCALL_DEFINE1(mq_unlink, const char __user *, u_name)
/* pipelined_send() - send a message directly to the task waiting in /* pipelined_send() - send a message directly to the task waiting in
* sys_mq_timedreceive() (without inserting message into a queue). * sys_mq_timedreceive() (without inserting message into a queue).
*/ */
static inline void pipelined_send(struct mqueue_inode_info *info, static inline void pipelined_send(struct wake_q_head *wake_q,
struct mqueue_inode_info *info,
struct msg_msg *message, struct msg_msg *message,
struct ext_wait_queue *receiver) struct ext_wait_queue *receiver)
{ {
receiver->msg = message; receiver->msg = message;
list_del(&receiver->list); list_del(&receiver->list);
receiver->state = STATE_PENDING; wake_q_add(wake_q, receiver->task);
wake_up_process(receiver->task); /*
smp_wmb(); * Rely on the implicit cmpxchg barrier from wake_q_add such
* that we can ensure that updating receiver->state is the last
* write operation: As once set, the receiver can continue,
* and if we don't have the reference count from the wake_q,
* yet, at that point we can later have a use-after-free
* condition and bogus wakeup.
*/
receiver->state = STATE_READY; receiver->state = STATE_READY;
} }
/* pipelined_receive() - if there is task waiting in sys_mq_timedsend() /* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
* gets its message and put to the queue (we have one free place for sure). */ * gets its message and put to the queue (we have one free place for sure). */
static inline void pipelined_receive(struct mqueue_inode_info *info) static inline void pipelined_receive(struct wake_q_head *wake_q,
struct mqueue_inode_info *info)
{ {
struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND); struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);
...@@ -944,10 +952,9 @@ static inline void pipelined_receive(struct mqueue_inode_info *info) ...@@ -944,10 +952,9 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
} }
if (msg_insert(sender->msg, info)) if (msg_insert(sender->msg, info))
return; return;
list_del(&sender->list); list_del(&sender->list);
sender->state = STATE_PENDING; wake_q_add(wake_q, sender->task);
wake_up_process(sender->task);
smp_wmb();
sender->state = STATE_READY; sender->state = STATE_READY;
} }
...@@ -965,6 +972,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, ...@@ -965,6 +972,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
struct timespec ts; struct timespec ts;
struct posix_msg_tree_node *new_leaf = NULL; struct posix_msg_tree_node *new_leaf = NULL;
int ret = 0; int ret = 0;
WAKE_Q(wake_q);
if (u_abs_timeout) { if (u_abs_timeout) {
int res = prepare_timeout(u_abs_timeout, &expires, &ts); int res = prepare_timeout(u_abs_timeout, &expires, &ts);
...@@ -1049,7 +1057,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, ...@@ -1049,7 +1057,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
} else { } else {
receiver = wq_get_first_waiter(info, RECV); receiver = wq_get_first_waiter(info, RECV);
if (receiver) { if (receiver) {
pipelined_send(info, msg_ptr, receiver); pipelined_send(&wake_q, info, msg_ptr, receiver);
} else { } else {
/* adds message to the queue */ /* adds message to the queue */
ret = msg_insert(msg_ptr, info); ret = msg_insert(msg_ptr, info);
...@@ -1062,6 +1070,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr, ...@@ -1062,6 +1070,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
} }
out_unlock: out_unlock:
spin_unlock(&info->lock); spin_unlock(&info->lock);
wake_up_q(&wake_q);
out_free: out_free:
if (ret) if (ret)
free_msg(msg_ptr); free_msg(msg_ptr);
...@@ -1149,14 +1158,17 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr, ...@@ -1149,14 +1158,17 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
msg_ptr = wait.msg; msg_ptr = wait.msg;
} }
} else { } else {
WAKE_Q(wake_q);
msg_ptr = msg_get(info); msg_ptr = msg_get(info);
inode->i_atime = inode->i_mtime = inode->i_ctime = inode->i_atime = inode->i_mtime = inode->i_ctime =
CURRENT_TIME; CURRENT_TIME;
/* There is now free space in queue. */ /* There is now free space in queue. */
pipelined_receive(info); pipelined_receive(&wake_q, info);
spin_unlock(&info->lock); spin_unlock(&info->lock);
wake_up_q(&wake_q);
ret = 0; ret = 0;
} }
if (ret == 0) { if (ret == 0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment