Commit 6c7a64e5 authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Add socket transmit queue offset tracking

Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent e1806c7b
...@@ -42,6 +42,13 @@ struct sock_xprt { ...@@ -42,6 +42,13 @@ struct sock_xprt {
flags; flags;
} recv; } recv;
/*
* State of TCP transmit queue
*/
struct {
u32 offset;
} xmit;
/* /*
* Connection of transports * Connection of transports
*/ */
......
...@@ -461,7 +461,7 @@ static int xs_nospace(struct rpc_task *task) ...@@ -461,7 +461,7 @@ static int xs_nospace(struct rpc_task *task)
int ret = -EAGAIN; int ret = -EAGAIN;
dprintk("RPC: %5u xmit incomplete (%u left of %u)\n", dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
task->tk_pid, req->rq_slen - req->rq_bytes_sent, task->tk_pid, req->rq_slen - transport->xmit.offset,
req->rq_slen); req->rq_slen);
/* Protect against races with write_space */ /* Protect against races with write_space */
...@@ -528,19 +528,22 @@ static int xs_local_send_request(struct rpc_task *task) ...@@ -528,19 +528,22 @@ static int xs_local_send_request(struct rpc_task *task)
req->rq_svec->iov_base, req->rq_svec->iov_len); req->rq_svec->iov_base, req->rq_svec->iov_len);
req->rq_xtime = ktime_get(); req->rq_xtime = ktime_get();
status = xs_sendpages(transport->sock, NULL, 0, xdr, req->rq_bytes_sent, status = xs_sendpages(transport->sock, NULL, 0, xdr,
transport->xmit.offset,
true, &sent); true, &sent);
dprintk("RPC: %s(%u) = %d\n", dprintk("RPC: %s(%u) = %d\n",
__func__, xdr->len - req->rq_bytes_sent, status); __func__, xdr->len - transport->xmit.offset, status);
if (status == -EAGAIN && sock_writeable(transport->inet)) if (status == -EAGAIN && sock_writeable(transport->inet))
status = -ENOBUFS; status = -ENOBUFS;
if (likely(sent > 0) || status == 0) { if (likely(sent > 0) || status == 0) {
req->rq_bytes_sent += sent; transport->xmit.offset += sent;
req->rq_xmit_bytes_sent += sent; req->rq_bytes_sent = transport->xmit.offset;
if (likely(req->rq_bytes_sent >= req->rq_slen)) { if (likely(req->rq_bytes_sent >= req->rq_slen)) {
req->rq_xmit_bytes_sent += transport->xmit.offset;
req->rq_bytes_sent = 0; req->rq_bytes_sent = 0;
transport->xmit.offset = 0;
return 0; return 0;
} }
status = -EAGAIN; status = -EAGAIN;
...@@ -592,10 +595,10 @@ static int xs_udp_send_request(struct rpc_task *task) ...@@ -592,10 +595,10 @@ static int xs_udp_send_request(struct rpc_task *task)
return -ENOTCONN; return -ENOTCONN;
req->rq_xtime = ktime_get(); req->rq_xtime = ktime_get();
status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen, status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
xdr, req->rq_bytes_sent, true, &sent); xdr, 0, true, &sent);
dprintk("RPC: xs_udp_send_request(%u) = %d\n", dprintk("RPC: xs_udp_send_request(%u) = %d\n",
xdr->len - req->rq_bytes_sent, status); xdr->len, status);
/* firewall is blocking us, don't return -EAGAIN or we end up looping */ /* firewall is blocking us, don't return -EAGAIN or we end up looping */
if (status == -EPERM) if (status == -EPERM)
...@@ -684,17 +687,20 @@ static int xs_tcp_send_request(struct rpc_task *task) ...@@ -684,17 +687,20 @@ static int xs_tcp_send_request(struct rpc_task *task)
while (1) { while (1) {
sent = 0; sent = 0;
status = xs_sendpages(transport->sock, NULL, 0, xdr, status = xs_sendpages(transport->sock, NULL, 0, xdr,
req->rq_bytes_sent, zerocopy, &sent); transport->xmit.offset,
zerocopy, &sent);
dprintk("RPC: xs_tcp_send_request(%u) = %d\n", dprintk("RPC: xs_tcp_send_request(%u) = %d\n",
xdr->len - req->rq_bytes_sent, status); xdr->len - transport->xmit.offset, status);
/* If we've sent the entire packet, immediately /* If we've sent the entire packet, immediately
* reset the count of bytes sent. */ * reset the count of bytes sent. */
req->rq_bytes_sent += sent; transport->xmit.offset += sent;
req->rq_xmit_bytes_sent += sent; req->rq_bytes_sent = transport->xmit.offset;
if (likely(req->rq_bytes_sent >= req->rq_slen)) { if (likely(req->rq_bytes_sent >= req->rq_slen)) {
req->rq_xmit_bytes_sent += transport->xmit.offset;
req->rq_bytes_sent = 0; req->rq_bytes_sent = 0;
transport->xmit.offset = 0;
return 0; return 0;
} }
...@@ -760,18 +766,13 @@ static int xs_tcp_send_request(struct rpc_task *task) ...@@ -760,18 +766,13 @@ static int xs_tcp_send_request(struct rpc_task *task)
*/ */
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
struct rpc_rqst *req; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
if (task != xprt->snd_task) if (task != xprt->snd_task)
return; return;
if (task == NULL) if (task == NULL)
goto out_release; goto out_release;
req = task->tk_rqstp; if (transport->xmit.offset == 0 || !xprt_connected(xprt))
if (req == NULL)
goto out_release;
if (req->rq_bytes_sent == 0)
goto out_release;
if (req->rq_bytes_sent == req->rq_snd_buf.len)
goto out_release; goto out_release;
set_bit(XPRT_CLOSE_WAIT, &xprt->state); set_bit(XPRT_CLOSE_WAIT, &xprt->state);
out_release: out_release:
...@@ -2021,6 +2022,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, ...@@ -2021,6 +2022,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
write_unlock_bh(&sk->sk_callback_lock); write_unlock_bh(&sk->sk_callback_lock);
} }
transport->xmit.offset = 0;
/* Tell the socket layer to start connecting... */ /* Tell the socket layer to start connecting... */
xprt->stat.connect_count++; xprt->stat.connect_count++;
xprt->stat.connect_start = jiffies; xprt->stat.connect_start = jiffies;
...@@ -2384,6 +2387,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) ...@@ -2384,6 +2387,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
transport->recv.len = 0; transport->recv.len = 0;
transport->recv.copied = 0; transport->recv.copied = 0;
transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
transport->xmit.offset = 0;
/* Tell the socket layer to start connecting... */ /* Tell the socket layer to start connecting... */
xprt->stat.connect_count++; xprt->stat.connect_count++;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment