Commit 41a0d523 authored by Davidlohr Bueso's avatar Davidlohr Bueso Committed by Linus Torvalds

ipc,msg: shorten critical region in msgrcv

do_msgrcv() is the last msg queue function that abuses the ipc lock Take
it only when needed when actually updating msq.
Signed-off-by: default avatarDavidlohr Bueso <davidlohr.bueso@hp.com>
Cc: Andi Kleen <andi@firstfloor.org>
Cc: Rik van Riel <riel@redhat.com>
Tested-by: default avatarSedat Dilek <sedat.dilek@gmail.com>
Signed-off-by: default avatarAndrew Morton <akpm@linux-foundation.org>
Signed-off-by: default avatarLinus Torvalds <torvalds@linux-foundation.org>
parent 3dd1f784
...@@ -885,21 +885,19 @@ static struct msg_msg *find_msg(struct msg_queue *msq, long *msgtyp, int mode) ...@@ -885,21 +885,19 @@ static struct msg_msg *find_msg(struct msg_queue *msq, long *msgtyp, int mode)
return ERR_PTR(-EAGAIN); return ERR_PTR(-EAGAIN);
} }
long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgflg,
long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,
int msgflg,
long (*msg_handler)(void __user *, struct msg_msg *, size_t)) long (*msg_handler)(void __user *, struct msg_msg *, size_t))
{ {
struct msg_queue *msq;
struct msg_msg *msg;
int mode; int mode;
struct msg_queue *msq;
struct ipc_namespace *ns; struct ipc_namespace *ns;
struct msg_msg *copy = NULL; struct msg_msg *msg, *copy = NULL;
ns = current->nsproxy->ipc_ns; ns = current->nsproxy->ipc_ns;
if (msqid < 0 || (long) bufsz < 0) if (msqid < 0 || (long) bufsz < 0)
return -EINVAL; return -EINVAL;
if (msgflg & MSG_COPY) { if (msgflg & MSG_COPY) {
copy = prepare_copy(buf, min_t(size_t, bufsz, ns->msg_ctlmax)); copy = prepare_copy(buf, min_t(size_t, bufsz, ns->msg_ctlmax));
if (IS_ERR(copy)) if (IS_ERR(copy))
...@@ -907,8 +905,10 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, ...@@ -907,8 +905,10 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,
} }
mode = convert_mode(&msgtyp, msgflg); mode = convert_mode(&msgtyp, msgflg);
msq = msg_lock_check(ns, msqid); rcu_read_lock();
msq = msq_obtain_object_check(ns, msqid);
if (IS_ERR(msq)) { if (IS_ERR(msq)) {
rcu_read_unlock();
free_copy(copy); free_copy(copy);
return PTR_ERR(msq); return PTR_ERR(msq);
} }
...@@ -918,10 +918,10 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, ...@@ -918,10 +918,10 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,
msg = ERR_PTR(-EACCES); msg = ERR_PTR(-EACCES);
if (ipcperms(ns, &msq->q_perm, S_IRUGO)) if (ipcperms(ns, &msq->q_perm, S_IRUGO))
goto out_unlock; goto out_unlock1;
ipc_lock_object(&msq->q_perm);
msg = find_msg(msq, &msgtyp, mode); msg = find_msg(msq, &msgtyp, mode);
if (!IS_ERR(msg)) { if (!IS_ERR(msg)) {
/* /*
* Found a suitable message. * Found a suitable message.
...@@ -929,7 +929,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, ...@@ -929,7 +929,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,
*/ */
if ((bufsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) { if ((bufsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {
msg = ERR_PTR(-E2BIG); msg = ERR_PTR(-E2BIG);
goto out_unlock; goto out_unlock0;
} }
/* /*
* If we are copying, then do not unlink message and do * If we are copying, then do not unlink message and do
...@@ -937,8 +937,9 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, ...@@ -937,8 +937,9 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,
*/ */
if (msgflg & MSG_COPY) { if (msgflg & MSG_COPY) {
msg = copy_msg(msg, copy); msg = copy_msg(msg, copy);
goto out_unlock; goto out_unlock0;
} }
list_del(&msg->m_list); list_del(&msg->m_list);
msq->q_qnum--; msq->q_qnum--;
msq->q_rtime = get_seconds(); msq->q_rtime = get_seconds();
...@@ -947,14 +948,16 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, ...@@ -947,14 +948,16 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,
atomic_sub(msg->m_ts, &ns->msg_bytes); atomic_sub(msg->m_ts, &ns->msg_bytes);
atomic_dec(&ns->msg_hdrs); atomic_dec(&ns->msg_hdrs);
ss_wakeup(&msq->q_senders, 0); ss_wakeup(&msq->q_senders, 0);
msg_unlock(msq);
break; goto out_unlock0;
} }
/* No message waiting. Wait for a message */ /* No message waiting. Wait for a message */
if (msgflg & IPC_NOWAIT) { if (msgflg & IPC_NOWAIT) {
msg = ERR_PTR(-ENOMSG); msg = ERR_PTR(-ENOMSG);
goto out_unlock; goto out_unlock0;
} }
list_add_tail(&msr_d.r_list, &msq->q_receivers); list_add_tail(&msr_d.r_list, &msq->q_receivers);
msr_d.r_tsk = current; msr_d.r_tsk = current;
msr_d.r_msgtype = msgtyp; msr_d.r_msgtype = msgtyp;
...@@ -965,8 +968,9 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, ...@@ -965,8 +968,9 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,
msr_d.r_maxsize = bufsz; msr_d.r_maxsize = bufsz;
msr_d.r_msg = ERR_PTR(-EAGAIN); msr_d.r_msg = ERR_PTR(-EAGAIN);
current->state = TASK_INTERRUPTIBLE; current->state = TASK_INTERRUPTIBLE;
msg_unlock(msq);
ipc_unlock_object(&msq->q_perm);
rcu_read_unlock();
schedule(); schedule();
/* Lockless receive, part 1: /* Lockless receive, part 1:
...@@ -977,7 +981,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, ...@@ -977,7 +981,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,
* Prior to destruction, expunge_all(-EIRDM) changes r_msg. * Prior to destruction, expunge_all(-EIRDM) changes r_msg.
* Thus if r_msg is -EAGAIN, then the queue not yet destroyed. * Thus if r_msg is -EAGAIN, then the queue not yet destroyed.
* rcu_read_lock() prevents preemption between reading r_msg * rcu_read_lock() prevents preemption between reading r_msg
* and the spin_lock() inside ipc_lock_by_ptr(). * and acquiring the q_perm.lock in ipc_lock_object().
*/ */
rcu_read_lock(); rcu_read_lock();
...@@ -996,32 +1000,34 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, ...@@ -996,32 +1000,34 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp,
* If there is a message or an error then accept it without * If there is a message or an error then accept it without
* locking. * locking.
*/ */
if (msg != ERR_PTR(-EAGAIN)) { if (msg != ERR_PTR(-EAGAIN))
rcu_read_unlock(); goto out_unlock1;
break;
}
/* Lockless receive, part 3: /* Lockless receive, part 3:
* Acquire the queue spinlock. * Acquire the queue spinlock.
*/ */
ipc_lock_by_ptr(&msq->q_perm); ipc_lock_object(&msq->q_perm);
rcu_read_unlock();
/* Lockless receive, part 4: /* Lockless receive, part 4:
* Repeat test after acquiring the spinlock. * Repeat test after acquiring the spinlock.
*/ */
msg = (struct msg_msg*)msr_d.r_msg; msg = (struct msg_msg*)msr_d.r_msg;
if (msg != ERR_PTR(-EAGAIN)) if (msg != ERR_PTR(-EAGAIN))
goto out_unlock; goto out_unlock0;
list_del(&msr_d.r_list); list_del(&msr_d.r_list);
if (signal_pending(current)) { if (signal_pending(current)) {
msg = ERR_PTR(-ERESTARTNOHAND); msg = ERR_PTR(-ERESTARTNOHAND);
out_unlock: goto out_unlock0;
msg_unlock(msq);
break;
} }
ipc_unlock_object(&msq->q_perm);
} }
out_unlock0:
ipc_unlock_object(&msq->q_perm);
out_unlock1:
rcu_read_unlock();
if (IS_ERR(msg)) { if (IS_ERR(msg)) {
free_copy(copy); free_copy(copy);
return PTR_ERR(msg); return PTR_ERR(msg);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment