Commit 3d188805 authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Chunk reading of replies from the server

Read the TCP data in chunks of max 2MB so that we do not hog the
socket lock.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@primarydata.com>
parent 024fbf9c
...@@ -52,6 +52,8 @@ ...@@ -52,6 +52,8 @@
#include "sunrpc.h" #include "sunrpc.h"
#define RPC_TCP_READ_CHUNK_SZ (3*512*1024)
static void xs_close(struct rpc_xprt *xprt); static void xs_close(struct rpc_xprt *xprt);
static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
struct socket *sock); struct socket *sock);
...@@ -1479,6 +1481,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns ...@@ -1479,6 +1481,7 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
.offset = offset, .offset = offset,
.count = len, .count = len,
}; };
size_t ret;
dprintk("RPC: xs_tcp_data_recv started\n"); dprintk("RPC: xs_tcp_data_recv started\n");
do { do {
...@@ -1507,9 +1510,14 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns ...@@ -1507,9 +1510,14 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
/* Skip over any trailing bytes on short reads */ /* Skip over any trailing bytes on short reads */
xs_tcp_read_discard(transport, &desc); xs_tcp_read_discard(transport, &desc);
} while (desc.count); } while (desc.count);
ret = len - desc.count;
if (ret < rd_desc->count)
rd_desc->count -= ret;
else
rd_desc->count = 0;
trace_xs_tcp_data_recv(transport); trace_xs_tcp_data_recv(transport);
dprintk("RPC: xs_tcp_data_recv done\n"); dprintk("RPC: xs_tcp_data_recv done\n");
return len - desc.count; return ret;
} }
static void xs_tcp_data_receive(struct sock_xprt *transport) static void xs_tcp_data_receive(struct sock_xprt *transport)
...@@ -1517,7 +1525,6 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) ...@@ -1517,7 +1525,6 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
struct rpc_xprt *xprt = &transport->xprt; struct rpc_xprt *xprt = &transport->xprt;
struct sock *sk; struct sock *sk;
read_descriptor_t rd_desc = { read_descriptor_t rd_desc = {
.count = 2*1024*1024,
.arg.data = xprt, .arg.data = xprt,
}; };
unsigned long total = 0; unsigned long total = 0;
...@@ -1531,16 +1538,16 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) ...@@ -1531,16 +1538,16 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
for (loop = 0; loop < 64; loop++) { for (loop = 0; loop < 64; loop++) {
rd_desc.count = RPC_TCP_READ_CHUNK_SZ;
lock_sock(sk); lock_sock(sk);
read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
if (read <= 0) { if (rd_desc.count != 0 || read < 0) {
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
release_sock(sk); release_sock(sk);
break; break;
} }
release_sock(sk); release_sock(sk);
total += read; total += read;
rd_desc.count = 65536;
} }
if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
queue_work(xprtiod_workqueue, &transport->recv_worker); queue_work(xprtiod_workqueue, &transport->recv_worker);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment