Commit 15788d1d authored by Chuck Lever, committed by Trond Myklebust

xprtrdma: Do not refresh Receive Queue while it is draining

Currently the Receive completion handler refreshes the Receive Queue
whenever a successful Receive completion occurs.

On disconnect, xprtrdma drains the Receive Queue. The first few
Receive completions after a disconnect are typically successful,
until the first flushed Receive.

This means the Receive completion handler continues to post more
Receive WRs after the drain sentinel has been posted. The late-
posted Receives flush after the drain sentinel has completed,
leading to a crash later in rpcrdma_xprt_disconnect().

To prevent this crash, xprtrdma has to ensure that the Receive
handler stops posting Receives before ib_drain_rq() posts its
drain sentinel.
Suggested-by: Tom Talpey <tom@talpey.com>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>

parent 32e6b681
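For illustration, the following is a condensed sketch of the coordination this patch establishes between the Receive-posting path and the drain path. It is a simplification, not the kernel code itself: struct ep_sketch, post_recvs_sketch(), and drain_sketch() are hypothetical stand-ins, while the re_receiving counter and re_done completion mirror the fields added in the diff below (the real logic lives in rpcrdma_post_recvs() and rpcrdma_xprt_drain()).

/* Condensed, hypothetical sketch of the re_receiving/re_done handshake. */
#include <linux/atomic.h>
#include <linux/completion.h>

struct ep_sketch {				/* stand-in for struct rpcrdma_ep */
	atomic_t		re_receiving;	/* nonzero while a poster or drainer is inside */
	struct completion	re_done;	/* signalled when the poster leaves */
};

/* Receive-refresh path, normally run from the Receive completion handler.
 * Once the drainer has claimed re_receiving, posting is skipped entirely,
 * so no Receive WR can be posted behind the drain sentinel.
 */
static void post_recvs_sketch(struct ep_sketch *ep)
{
	if (atomic_inc_return(&ep->re_receiving) > 1)
		goto out;

	/* ... assemble and ib_post_recv() a batch of Receive WRs ... */

	/* Someone (typically the drainer) incremented the counter while
	 * we were posting: wake it so it can proceed.
	 */
	if (atomic_dec_return(&ep->re_receiving) > 0)
		complete(&ep->re_done);
out:
	return;
}

/* Drain path. After the increment, any new poster sees a count > 1 and
 * backs off; a poster already inside is waited for via re_done.
 */
static void drain_sketch(struct ep_sketch *ep)
{
	if (atomic_inc_return(&ep->re_receiving) > 1)
		wait_for_completion(&ep->re_done);

	/* ... ib_drain_rq() can now post its drain sentinel safely ... */
}

The counter thus doubles as a claim flag: whichever side finds it already raised knows the other side is active, so the poster backs off and the drainer waits on re_done before ib_drain_rq() posts its sentinel. The completion must be re-armed for each new connection, which is why the patch adds reinit_completion(&ep->re_done) to rpcrdma_ep_create().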
@@ -101,6 +101,12 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 	struct rdma_cm_id *id = ep->re_id;
 
+	/* Wait for rpcrdma_post_recvs() to leave its critical
+	 * section.
+	 */
+	if (atomic_inc_return(&ep->re_receiving) > 1)
+		wait_for_completion(&ep->re_done);
+
 	/* Flush Receives, then wait for deferred Reply work
 	 * to complete.
 	 */
@@ -414,6 +420,7 @@ static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
 	__module_get(THIS_MODULE);
 	device = id->device;
 	ep->re_id = id;
+	reinit_completion(&ep->re_done);
 
 	ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
 	ep->re_inline_send = xprt_rdma_max_inline_write;
@@ -1385,6 +1392,9 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 	if (!temp)
 		needed += RPCRDMA_MAX_RECV_BATCH;
 
+	if (atomic_inc_return(&ep->re_receiving) > 1)
+		goto out;
+
 	/* fast path: all needed reps can be found on the free list */
 	wr = NULL;
 	while (needed) {
@@ -1410,6 +1420,9 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 	rc = ib_post_recv(ep->re_id->qp, wr,
 			  (const struct ib_recv_wr **)&bad_wr);
+	if (atomic_dec_return(&ep->re_receiving) > 0)
+		complete(&ep->re_done);
+
 out:
 	trace_xprtrdma_post_recvs(r_xprt, count, rc);
 	if (rc) {
@@ -83,6 +83,7 @@ struct rpcrdma_ep {
 	unsigned int		re_max_inline_recv;
 	int			re_async_rc;
 	int			re_connect_status;
+	atomic_t		re_receiving;
 	atomic_t		re_force_disconnect;
 	struct ib_qp_init_attr	re_attr;
 	wait_queue_head_t	re_connect_wait;