Commit 6ceea368 authored by Chuck Lever, committed by Anna Schumaker

xprtrdma: Refactor Receive accounting

Clean up: Divide the work cleanly:

- rpcrdma_wc_receive is responsible only for RDMA Receives
- rpcrdma_reply_handler is responsible only for RPC Replies
- the posted send and receive counts both belong in rpcrdma_ep
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent b674c4b4
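
The hunks below move the posted-Receive counter out of struct rpcrdma_buffer (rb_posted_receives) and into struct rpcrdma_ep (rep_receive_count): the Receive completion handler now decrements the count and replenishes the Receive pool itself, and rpcrdma_reply_handler no longer posts Receives. As a rough orientation aid only, here is a user-space toy model of that accounting pattern; it is not the kernel code, and names such as ep_model, post_recvs_model, and wc_receive_model are invented for illustration.

#include <stdio.h>

/* Toy model: the count of posted Receives lives with the endpoint,
 * mirroring rep_receive_count in struct rpcrdma_ep after this patch.
 */
struct ep_model {
	int receive_count;	/* Receives currently posted */
};

/* Replenish Receives up to the current target; the real
 * rpcrdma_post_recvs() computes "needed" from the credit grant.
 */
static void post_recvs_model(struct ep_model *ep, int needed)
{
	int count = 0;

	if (ep->receive_count > needed)
		return;
	needed -= ep->receive_count;
	count = needed;		/* pretend every post succeeds */
	ep->receive_count += count;
	printf("posted %d, now %d outstanding\n", count, ep->receive_count);
}

/* One Receive completion: account for it, then replenish,
 * as rpcrdma_wc_receive() now does before dispatching the reply.
 */
static void wc_receive_model(struct ep_model *ep, int credit_target)
{
	--ep->receive_count;
	post_recvs_model(ep, credit_target);
}

int main(void)
{
	struct ep_model ep = { .receive_count = 0 };

	post_recvs_model(&ep, 3);	/* initial posting at connect time */
	wc_receive_model(&ep, 3);	/* each completion keeps the pool full */
	wc_receive_model(&ep, 3);
	return 0;
}

The real rpcrdma_post_recvs() derives its target from the server's credit grant plus backchannel requests, allocates rpcrdma_rep objects as needed, and handles ib_post_recv() failures; the model above omits all of that.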
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -570,7 +570,7 @@ TRACE_EVENT(xprtrdma_post_recvs,
 		__entry->r_xprt = r_xprt;
 		__entry->count = count;
 		__entry->status = status;
-		__entry->posted = r_xprt->rx_buf.rb_posted_receives;
+		__entry->posted = r_xprt->rx_ep.rep_receive_count;
 		__assign_str(addr, rpcrdma_addrstr(r_xprt));
 		__assign_str(port, rpcrdma_portstr(r_xprt));
 	),
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -207,7 +207,6 @@ int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
 	if (rc < 0)
 		goto failed_marshal;
-	rpcrdma_post_recvs(r_xprt, true);
 	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
 		goto drop_connection;
 	return 0;
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1312,11 +1312,6 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	u32 credits;
 	__be32 *p;
-	--buf->rb_posted_receives;
-	if (rep->rr_hdrbuf.head[0].iov_len == 0)
-		goto out_badstatus;
 	/* Fixed transport header fields */
 	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
 			rep->rr_hdrbuf.head[0].iov_base);
@@ -1361,31 +1356,21 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
 	trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
-	rpcrdma_post_recvs(r_xprt, false);
 	queue_work(rpcrdma_receive_wq, &rep->rr_work);
 	return;
 out_badversion:
 	trace_xprtrdma_reply_vers(rep);
-	goto repost;
+	goto out;
-/* The RPC transaction has already been terminated, or the header
- * is corrupt.
- */
 out_norqst:
 	spin_unlock(&xprt->queue_lock);
 	trace_xprtrdma_reply_rqst(rep);
-	goto repost;
+	goto out;
 out_shortreply:
 	trace_xprtrdma_reply_short(rep);
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
- */
-repost:
-	rpcrdma_post_recvs(r_xprt, false);
-out_badstatus:
+out:
 	rpcrdma_recv_buffer_put(rep);
 }
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -78,6 +78,7 @@ static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_mrs_destroy(struct rpcrdma_buffer *buf);
 static int rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt, bool temp);
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
+static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
@@ -189,11 +190,13 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 	struct ib_cqe *cqe = wc->wr_cqe;
 	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
 					       rr_cqe);
+	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-	/* WARNING: Only wr_id and status are reliable at this point */
+	/* WARNING: Only wr_cqe and status are reliable at this point */
 	trace_xprtrdma_wc_receive(wc);
+	--r_xprt->rx_ep.rep_receive_count;
 	if (wc->status != IB_WC_SUCCESS)
-		goto out_fail;
+		goto out_flushed;
 	/* status == SUCCESS means all fields in wc are trustworthy */
 	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
@@ -204,17 +207,16 @@ rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 				   rdmab_addr(rep->rr_rdmabuf),
 				   wc->byte_len, DMA_FROM_DEVICE);
-out_schedule:
+	rpcrdma_post_recvs(r_xprt, false);
 	rpcrdma_reply_handler(rep);
 	return;
-out_fail:
+out_flushed:
 	if (wc->status != IB_WC_WR_FLUSH_ERR)
 		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
 		       ib_wc_status_msg(wc->status),
 		       wc->status, wc->vendor_err);
-	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, 0);
-	goto out_schedule;
+	rpcrdma_recv_buffer_put(rep);
 }
 static void
@@ -581,6 +583,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	init_waitqueue_head(&ep->rep_connect_wait);
 	INIT_DELAYED_WORK(&ep->rep_disconnect_worker,
 			  rpcrdma_disconnect_worker);
+	ep->rep_receive_count = 0;
 	sendcq = ib_alloc_cq(ia->ri_device, NULL,
 			     ep->rep_attr.cap.max_send_wr + 1,
@@ -1174,7 +1177,6 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 	}
 	buf->rb_credits = 1;
-	buf->rb_posted_receives = 0;
 	INIT_LIST_HEAD(&buf->rb_recv_bufs);
 	rc = rpcrdma_sendctxs_create(r_xprt);
@@ -1511,25 +1513,20 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 	return 0;
 }
-/**
- * rpcrdma_post_recvs - Maybe post some Receive buffers
- * @r_xprt: controlling transport
- * @temp: when true, allocate temp rpcrdma_rep objects
- *
- */
-void
+static void
 rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 	struct ib_recv_wr *wr, *bad_wr;
 	int needed, count, rc;
 	rc = 0;
 	count = 0;
 	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
-	if (buf->rb_posted_receives > needed)
+	if (ep->rep_receive_count > needed)
 		goto out;
-	needed -= buf->rb_posted_receives;
+	needed -= ep->rep_receive_count;
 	count = 0;
 	wr = NULL;
@@ -1577,7 +1574,7 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 			--count;
 		}
 	}
-	buf->rb_posted_receives += count;
+	ep->rep_receive_count += count;
 out:
 	trace_xprtrdma_post_recvs(r_xprt, count, rc);
 }
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -102,6 +102,7 @@ struct rpcrdma_ep {
 	struct rpcrdma_connect_private rep_cm_private;
 	struct rdma_conn_param rep_remote_cma;
 	struct delayed_work rep_disconnect_worker;
+	int rep_receive_count;
 };
 /* Pre-allocate extra Work Requests for handling backward receives
@@ -404,7 +405,6 @@ struct rpcrdma_buffer {
 	unsigned long rb_flags;
 	u32 rb_max_requests;
 	u32 rb_credits;		/* most recent credit grant */
-	int rb_posted_receives;
 	u32 rb_bc_srv_max_requests;
 	spinlock_t rb_reqslock;	/* protect rb_allreqs */
@@ -560,7 +560,6 @@ void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
 int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
 		    struct rpcrdma_req *);
-void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 /*
  * Buffer calls - xprtrdma/verbs.c