Commit fe97b47c authored by Chuck Lever, committed by Anna Schumaker

xprtrdma: Use workqueue to process RPC/RDMA replies

The reply tasklet is fast, but it's single threaded. After reply
traffic saturates a single CPU, there's no more reply processing
capacity.

Replace the tasklet with a workqueue to spread reply handling across
all CPUs.  This also moves RPC/RDMA reply handling out of the soft
IRQ context and into a context that allows sleeps.
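
For reference, the change follows the standard tasklet-to-workqueue
conversion pattern sketched below (hypothetical demo_* names, not the
xprtrdma code itself): each reply carries its own work_struct, the
completion handler queues it, and the worker runs in process context
on whichever CPU the workqueue picks.

	#include <linux/workqueue.h>

	struct demo_reply {
		struct work_struct	work;	/* one work item per reply */
		/* ... reply state ... */
	};

	static struct workqueue_struct *demo_wq;

	/* Runs in process context: may sleep, unlike a tasklet. */
	static void demo_worker(struct work_struct *work)
	{
		struct demo_reply *rep =
				container_of(work, struct demo_reply, work);

		/* ... process the reply ... */
	}

	/* Called from the completion upcall in place of
	 * tasklet_schedule(); separate replies can run concurrently
	 * on separate CPUs.
	 */
	static void demo_completion(struct demo_reply *rep)
	{
		queue_work(demo_wq, &rep->work);
	}

	static int demo_wq_init(void)		/* at module load */
	{
		demo_wq = alloc_workqueue("demo",
					  WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
		return demo_wq ? 0 : -ENOMEM;
	}

	static void demo_reply_init(struct demo_reply *rep)	/* per reply */
	{
		INIT_WORK(&rep->work, demo_worker);
	}
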
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Sagi Grimberg <sagig@mellanox.com>
Tested-By: Devesh Sharma <devesh.sharma@avagotech.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent 1e465fd4
net/sunrpc/xprtrdma/rpc_rdma.c
@@ -723,8 +723,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
 	schedule_delayed_work(&ep->rep_connect_worker, 0);
 }
 
-/*
- * Called as a tasklet to do req/reply match and complete a request
+/* Process received RPC/RDMA messages.
+ *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
  */
@@ -752,13 +752,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	if (headerp->rm_vers != rpcrdma_version)
 		goto out_badversion;
 
-	/* Get XID and try for a match. */
-	spin_lock(&xprt->transport_lock);
+	/* Match incoming rpcrdma_rep to an rpcrdma_req to
+	 * get context for handling any incoming chunks.
+	 */
+	spin_lock_bh(&xprt->transport_lock);
 	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
 	if (!rqst)
 		goto out_nomatch;
 
-	/* get request object */
 	req = rpcr_to_rdmar(rqst);
 	if (req->rl_reply)
 		goto out_duplicate;
@@ -859,7 +860,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 		xprt_release_rqst_cong(rqst->rq_task);
 
 	xprt_complete_rqst(rqst->rq_task, status);
-	spin_unlock(&xprt->transport_lock);
+	spin_unlock_bh(&xprt->transport_lock);
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
 			__func__, xprt, rqst, status);
 	return;
...@@ -882,14 +883,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) ...@@ -882,14 +883,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
goto repost; goto repost;
out_nomatch: out_nomatch:
spin_unlock(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n", dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
__func__, be32_to_cpu(headerp->rm_xid), __func__, be32_to_cpu(headerp->rm_xid),
rep->rr_len); rep->rr_len);
goto repost; goto repost;
out_duplicate: out_duplicate:
spin_unlock(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
dprintk("RPC: %s: " dprintk("RPC: %s: "
"duplicate reply %p to RPC request %p: xid 0x%08x\n", "duplicate reply %p to RPC request %p: xid 0x%08x\n",
__func__, rep, req, be32_to_cpu(headerp->rm_xid)); __func__, rep, req, be32_to_cpu(headerp->rm_xid));
......
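
Note the switch from spin_lock() to spin_lock_bh() throughout the
handler above: as a tasklet, rpcrdma_reply_handler ran in softirq
context, where bottom halves are implicitly disabled; as a work item
it runs in process context, so it must disable bottom halves itself
around transport_lock, which other paths still take from BH context.
A minimal sketch of the pattern (hypothetical demo_lock, not the
xprtrdma code):

	static DEFINE_SPINLOCK(demo_lock);	/* also taken from softirq */

	static void demo_process_context_path(void)
	{
		/* The _bh variant prevents a softirq arriving on this
		 * CPU from preempting us and spinning forever on a
		 * lock we already hold.
		 */
		spin_lock_bh(&demo_lock);
		/* ... req/reply matching ... */
		spin_unlock_bh(&demo_lock);
	}
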
net/sunrpc/xprtrdma/transport.c
@@ -732,6 +732,7 @@ void xprt_rdma_cleanup(void)
 		dprintk("RPC:       %s: xprt_unregister returned %i\n",
 			__func__, rc);
 
+	rpcrdma_destroy_wq();
 	frwr_destroy_recovery_wq();
 }
 
@@ -743,8 +744,15 @@ int xprt_rdma_init(void)
 	if (rc)
 		return rc;
 
+	rc = rpcrdma_alloc_wq();
+	if (rc) {
+		frwr_destroy_recovery_wq();
+		return rc;
+	}
+
 	rc = xprt_register_transport(&xprt_rdma);
 	if (rc) {
+		rpcrdma_destroy_wq();
 		frwr_destroy_recovery_wq();
 		return rc;
 	}
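
Because rpcrdma_receive_wq is a single module-global queue shared by
all transport instances, it is created once at module load and torn
down at module unload. Each failure path in xprt_rdma_init() unwinds
everything created before it, in reverse order of creation, and
xprt_rdma_cleanup() mirrors the same ordering.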
net/sunrpc/xprtrdma/verbs.c
@@ -100,6 +100,35 @@ rpcrdma_run_tasklet(unsigned long data)
 
 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
 
+static struct workqueue_struct *rpcrdma_receive_wq;
+
+int
+rpcrdma_alloc_wq(void)
+{
+	struct workqueue_struct *recv_wq;
+
+	recv_wq = alloc_workqueue("xprtrdma_receive",
+				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+				  0);
+	if (!recv_wq)
+		return -ENOMEM;
+
+	rpcrdma_receive_wq = recv_wq;
+	return 0;
+}
+
+void
+rpcrdma_destroy_wq(void)
+{
+	struct workqueue_struct *wq;
+
+	if (rpcrdma_receive_wq) {
+		wq = rpcrdma_receive_wq;
+		rpcrdma_receive_wq = NULL;
+		destroy_workqueue(wq);
+	}
+}
+
 static void
 rpcrdma_schedule_tasklet(struct list_head *sched_list)
 {
...@@ -196,7 +225,16 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) ...@@ -196,7 +225,16 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
} }
static void static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list) rpcrdma_receive_worker(struct work_struct *work)
{
struct rpcrdma_rep *rep =
container_of(work, struct rpcrdma_rep, rr_work);
rpcrdma_reply_handler(rep);
}
static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc)
{ {
struct rpcrdma_rep *rep = struct rpcrdma_rep *rep =
(struct rpcrdma_rep *)(unsigned long)wc->wr_id; (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -219,8 +257,9 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
 	prefetch(rdmab_to_msg(rep->rr_rdmabuf));
 
 out_schedule:
-	list_add_tail(&rep->rr_list, sched_list);
+	queue_work(rpcrdma_receive_wq, &rep->rr_work);
 	return;
+
 out_fail:
 	if (wc->status != IB_WC_WR_FLUSH_ERR)
 		pr_err("RPC:       %s: rep %p: %s\n",
@@ -239,7 +278,6 @@ static void
 rpcrdma_recvcq_poll(struct ib_cq *cq)
 {
 	struct ib_wc *pos, wcs[4];
-	LIST_HEAD(sched_list);
 	int count, rc;
 
 	do {
@@ -251,10 +289,8 @@ rpcrdma_recvcq_poll(struct ib_cq *cq)
 		count = rc;
 		while (count-- > 0)
-			rpcrdma_recvcq_process_wc(pos++, &sched_list);
+			rpcrdma_recvcq_process_wc(pos++);
 	} while (rc == ARRAY_SIZE(wcs));
-
-	rpcrdma_schedule_tasklet(&sched_list);
 }
 
 /* Handle provider receive completion upcalls.
@@ -272,12 +308,9 @@ static void
 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 {
 	struct ib_wc wc;
-	LIST_HEAD(sched_list);
 
 	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-		rpcrdma_recvcq_process_wc(&wc, &sched_list);
-	if (!list_empty(&sched_list))
-		rpcrdma_schedule_tasklet(&sched_list);
+		rpcrdma_recvcq_process_wc(&wc);
 	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
 		rpcrdma_sendcq_process_wc(&wc);
 }
@@ -913,6 +946,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 	rep->rr_device = ia->ri_device;
 	rep->rr_rxprt = r_xprt;
+	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
 	return rep;
 
 out_free:
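
On the flag choices in rpcrdma_alloc_wq() above: WQ_UNBOUND lets reply
work items run on any CPU rather than the CPU that fielded the
completion interrupt, which is what spreads the load; WQ_MEM_RECLAIM
guarantees a rescuer thread so that reply processing can make forward
progress under memory pressure (NFS replies may be needed to complete
writeback); and WQ_HIGHPRI queues work to the high-priority worker
pool to keep reply latency down.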
net/sunrpc/xprtrdma/xprt_rdma.h
@@ -164,6 +164,7 @@ struct rpcrdma_rep {
 	unsigned int		rr_len;
 	struct ib_device	*rr_device;
 	struct rpcrdma_xprt	*rr_rxprt;
+	struct work_struct	rr_work;
 	struct list_head	rr_list;
 	struct rpcrdma_regbuf	*rr_rdmabuf;
 };
@@ -430,6 +431,9 @@ unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
 int frwr_alloc_recovery_wq(void);
 void frwr_destroy_recovery_wq(void);
 
+int rpcrdma_alloc_wq(void);
+void rpcrdma_destroy_wq(void);
+
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */