Commit c4433055 authored by Trond Myklebust

SUNRPC: Fix RPC receive hangs

The RPC code is occasionally hanging when the receive code fails to
empty the socket buffer due to a partial read of the data. When we
convert that to an EAGAIN, it appears we occasionally leave data in the
socket. The fix is to just keep reading until the socket returns
EAGAIN/EWOULDBLOCK.
Reported-by: Catalin Marinas <catalin.marinas@arm.com>
Reported-by: Cristian Marussi <cristian.marussi@arm.com>
Reported-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
Tested-by: Catalin Marinas <catalin.marinas@arm.com>
Tested-by: Cristian Marussi <cristian.marussi@arm.com>
parent 0a9a4304
...@@ -398,7 +398,7 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, ...@@ -398,7 +398,7 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
goto out; goto out;
if (ret != want) if (ret != want)
goto eagain; goto out;
seek = 0; seek = 0;
} else { } else {
seek -= buf->head[0].iov_len; seek -= buf->head[0].iov_len;
...@@ -418,7 +418,7 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, ...@@ -418,7 +418,7 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
goto out; goto out;
if (ret != want) if (ret != want)
goto eagain; goto out;
seek = 0; seek = 0;
} else { } else {
seek -= buf->page_len; seek -= buf->page_len;
...@@ -433,7 +433,7 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, ...@@ -433,7 +433,7 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC)) if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
goto out; goto out;
if (ret != want) if (ret != want)
goto eagain; goto out;
} else } else
offset += buf->tail[0].iov_len; offset += buf->tail[0].iov_len;
ret = -EMSGSIZE; ret = -EMSGSIZE;
...@@ -441,9 +441,6 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags, ...@@ -441,9 +441,6 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
out: out:
*read = offset - seek_init; *read = offset - seek_init;
return ret; return ret;
eagain:
ret = -EAGAIN;
goto out;
sock_err: sock_err:
offset += seek; offset += seek;
goto out; goto out;
...@@ -486,19 +483,18 @@ xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg, ...@@ -486,19 +483,18 @@ xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
if (transport->recv.offset == transport->recv.len) { if (transport->recv.offset == transport->recv.len) {
if (xs_read_stream_request_done(transport)) if (xs_read_stream_request_done(transport))
msg->msg_flags |= MSG_EOR; msg->msg_flags |= MSG_EOR;
return transport->recv.copied; return read;
} }
switch (ret) { switch (ret) {
default:
break;
case -EMSGSIZE: case -EMSGSIZE:
return transport->recv.copied; return read;
case 0: case 0:
return -ESHUTDOWN; return -ESHUTDOWN;
default:
if (ret < 0)
return ret;
} }
return -EAGAIN; return ret < 0 ? ret : read;
} }
static size_t static size_t
...@@ -537,7 +533,7 @@ xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags) ...@@ -537,7 +533,7 @@ xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
ret = xs_read_stream_request(transport, msg, flags, req); ret = xs_read_stream_request(transport, msg, flags, req);
if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
xprt_complete_bc_request(req, ret); xprt_complete_bc_request(req, transport->recv.copied);
return ret; return ret;
} }
...@@ -570,7 +566,7 @@ xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags) ...@@ -570,7 +566,7 @@ xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
spin_lock(&xprt->queue_lock); spin_lock(&xprt->queue_lock);
if (msg->msg_flags & (MSG_EOR|MSG_TRUNC)) if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
xprt_complete_rqst(req->rq_task, ret); xprt_complete_rqst(req->rq_task, transport->recv.copied);
xprt_unpin_rqst(req); xprt_unpin_rqst(req);
out: out:
spin_unlock(&xprt->queue_lock); spin_unlock(&xprt->queue_lock);
...@@ -591,10 +587,8 @@ xs_read_stream(struct sock_xprt *transport, int flags) ...@@ -591,10 +587,8 @@ xs_read_stream(struct sock_xprt *transport, int flags)
if (ret <= 0) if (ret <= 0)
goto out_err; goto out_err;
transport->recv.offset = ret; transport->recv.offset = ret;
if (ret != want) { if (transport->recv.offset != want)
ret = -EAGAIN; return transport->recv.offset;
goto out_err;
}
transport->recv.len = be32_to_cpu(transport->recv.fraghdr) & transport->recv.len = be32_to_cpu(transport->recv.fraghdr) &
RPC_FRAGMENT_SIZE_MASK; RPC_FRAGMENT_SIZE_MASK;
transport->recv.offset -= sizeof(transport->recv.fraghdr); transport->recv.offset -= sizeof(transport->recv.fraghdr);
...@@ -602,6 +596,9 @@ xs_read_stream(struct sock_xprt *transport, int flags) ...@@ -602,6 +596,9 @@ xs_read_stream(struct sock_xprt *transport, int flags)
} }
switch (be32_to_cpu(transport->recv.calldir)) { switch (be32_to_cpu(transport->recv.calldir)) {
default:
msg.msg_flags |= MSG_TRUNC;
break;
case RPC_CALL: case RPC_CALL:
ret = xs_read_stream_call(transport, &msg, flags); ret = xs_read_stream_call(transport, &msg, flags);
break; break;
...@@ -616,6 +613,8 @@ xs_read_stream(struct sock_xprt *transport, int flags) ...@@ -616,6 +613,8 @@ xs_read_stream(struct sock_xprt *transport, int flags)
goto out_err; goto out_err;
read += ret; read += ret;
if (transport->recv.offset < transport->recv.len) { if (transport->recv.offset < transport->recv.len) {
if (!(msg.msg_flags & MSG_TRUNC))
return read;
ret = xs_read_discard(transport->sock, &msg, flags, ret = xs_read_discard(transport->sock, &msg, flags,
transport->recv.len - transport->recv.offset); transport->recv.len - transport->recv.offset);
if (ret <= 0) if (ret <= 0)
...@@ -623,7 +622,7 @@ xs_read_stream(struct sock_xprt *transport, int flags) ...@@ -623,7 +622,7 @@ xs_read_stream(struct sock_xprt *transport, int flags)
transport->recv.offset += ret; transport->recv.offset += ret;
read += ret; read += ret;
if (transport->recv.offset != transport->recv.len) if (transport->recv.offset != transport->recv.len)
return -EAGAIN; return read;
} }
if (xs_read_stream_request_done(transport)) { if (xs_read_stream_request_done(transport)) {
trace_xs_stream_read_request(transport); trace_xs_stream_read_request(transport);
...@@ -653,7 +652,7 @@ static void xs_stream_data_receive(struct sock_xprt *transport) ...@@ -653,7 +652,7 @@ static void xs_stream_data_receive(struct sock_xprt *transport)
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
for (;;) { for (;;) {
ret = xs_read_stream(transport, MSG_DONTWAIT); ret = xs_read_stream(transport, MSG_DONTWAIT);
if (ret <= 0) if (ret < 0)
break; break;
read += ret; read += ret;
cond_resched(); cond_resched();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment