Commit ff35e5ef authored by Davidlohr Bueso's avatar Davidlohr Bueso Committed by Linus Torvalds

ipc,msg: provide barrier pairings for lockless receive

We currently use a full barrier on the sender side to to avoid receiver
tasks disappearing on us while still performing on the sender side wakeup.
 We lack however, the proper CPU-CPU interactions pairing on the receiver
side which busy-waits for the message.  Similarly, we do not need a full
smp_mb, and can relax the semantics for the writer and reader sides of the
message.  This is safe as we are only ordering loads and stores to r_msg.
And in both smp_wmb and smp_rmb, there are no stores after the calls
_anyway_.

This obviously applies for pipelined_send and expunge_all, for EIRDM when
destroying a queue.
Signed-off-by: default avatarDavidlohr Bueso <dbueso@suse.de>
Cc: Manfred Spraul <manfred@colorfullife.com>
Signed-off-by: default avatarAndrew Morton <akpm@linux-foundation.org>
Signed-off-by: default avatarLinus Torvalds <torvalds@linux-foundation.org>
parent c5c8975b
...@@ -196,7 +196,7 @@ static void expunge_all(struct msg_queue *msq, int res) ...@@ -196,7 +196,7 @@ static void expunge_all(struct msg_queue *msq, int res)
* or dealing with -EAGAIN cases. See lockless receive part 1 * or dealing with -EAGAIN cases. See lockless receive part 1
* and 2 in do_msgrcv(). * and 2 in do_msgrcv().
*/ */
smp_mb(); smp_wmb(); /* barrier (B) */
msr->r_msg = ERR_PTR(res); msr->r_msg = ERR_PTR(res);
} }
} }
...@@ -580,7 +580,8 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) ...@@ -580,7 +580,8 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
/* initialize pipelined send ordering */ /* initialize pipelined send ordering */
msr->r_msg = NULL; msr->r_msg = NULL;
wake_up_process(msr->r_tsk); wake_up_process(msr->r_tsk);
smp_mb(); /* see barrier comment below */ /* barrier (B) see barrier comment below */
smp_wmb();
msr->r_msg = ERR_PTR(-E2BIG); msr->r_msg = ERR_PTR(-E2BIG);
} else { } else {
msr->r_msg = NULL; msr->r_msg = NULL;
...@@ -589,11 +590,12 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) ...@@ -589,11 +590,12 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
wake_up_process(msr->r_tsk); wake_up_process(msr->r_tsk);
/* /*
* Ensure that the wakeup is visible before * Ensure that the wakeup is visible before
* setting r_msg, as the receiving end depends * setting r_msg, as the receiving can otherwise
* on it. See lockless receive part 1 and 2 in * exit - once r_msg is set, the receiver can
* do_msgrcv(). * continue. See lockless receive part 1 and 2
* in do_msgrcv(). Barrier (B).
*/ */
smp_mb(); smp_wmb();
msr->r_msg = msg; msr->r_msg = msg;
return 1; return 1;
...@@ -932,12 +934,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl ...@@ -932,12 +934,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
/* Lockless receive, part 2: /* Lockless receive, part 2:
* Wait until pipelined_send or expunge_all are outside of * Wait until pipelined_send or expunge_all are outside of
* wake_up_process(). There is a race with exit(), see * wake_up_process(). There is a race with exit(), see
* ipc/mqueue.c for the details. * ipc/mqueue.c for the details. The correct serialization
* ensures that a receiver cannot continue without the wakeup
* being visibible _before_ setting r_msg:
*
* CPU 0 CPU 1
* <loop receiver>
* smp_rmb(); (A) <-- pair -. <waker thread>
* <load ->r_msg> | msr->r_msg = NULL;
* | wake_up_process();
* <continue> `------> smp_wmb(); (B)
* msr->r_msg = msg;
*
* Where (A) orders the message value read and where (B) orders
* the write to the r_msg -- done in both pipelined_send and
* expunge_all.
*/ */
msg = (struct msg_msg *)msr_d.r_msg; for (;;) {
while (msg == NULL) { /*
cpu_relax(); * Pairs with writer barrier in pipelined_send
* or expunge_all.
*/
smp_rmb(); /* barrier (A) */
msg = (struct msg_msg *)msr_d.r_msg; msg = (struct msg_msg *)msr_d.r_msg;
if (msg)
break;
/*
* The cpu_relax() call is a compiler barrier
* which forces everything in this loop to be
* re-loaded.
*/
cpu_relax();
} }
/* Lockless receive, part 3: /* Lockless receive, part 3:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment