Commit 0c0829bc authored by Chuck Lever's avatar Chuck Lever Committed by Anna Schumaker

xprtrdma: Don't wake pending tasks until disconnect is done

Transport disconnect processing does a "wake pending tasks" at
various points.

Suppose an RPC Reply is being processed. The RPC task that Reply
goes with is waiting on the pending queue. If a disconnect wake-up
happens before reply processing is done, that reply, even if it is
good, is thrown away, and the RPC has to be sent again.

This window apparently does not exist for socket transports because
there is a lock held while a reply is being received which prevents
the wake-up call until after reply processing is done.

To resolve this, all RPC replies being processed on an RPC-over-RDMA
transport have to complete before pending tasks are awoken due to a
transport disconnect.

Callers that already hold the transport write lock may invoke
->ops->close directly. Others use a generic helper that schedules
a close when the write lock can be taken safely.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent 3d433ad8
......@@ -193,14 +193,15 @@ static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
*/
int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpc_xprt *xprt = rqst->rq_xprt;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
int rc;
if (!xprt_connected(rqst->rq_xprt))
goto drop_connection;
if (!xprt_connected(xprt))
return -ENOTCONN;
if (!xprt_request_get_cong(rqst->rq_xprt, rqst))
if (!xprt_request_get_cong(xprt, rqst))
return -EBADSLT;
rc = rpcrdma_bc_marshal_reply(rqst);
......@@ -215,7 +216,7 @@ int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
if (rc != -ENOTCONN)
return rc;
drop_connection:
xprt_disconnect_done(rqst->rq_xprt);
xprt_rdma_close(xprt);
return -ENOTCONN;
}
......@@ -338,7 +339,7 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
out_overflow:
pr_warn("RPC/RDMA backchannel overflow\n");
xprt_disconnect_done(xprt);
xprt_force_disconnect(xprt);
/* This receive buffer gets reposted automatically
* when the connection is re-established.
*/
......
......@@ -200,11 +200,10 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
svc_rdma_send_ctxt_put(rdma, ctxt);
goto drop_connection;
}
return rc;
return 0;
drop_connection:
dprintk("svcrdma: failed to send bc call\n");
xprt_disconnect_done(xprt);
return -ENOTCONN;
}
......@@ -225,8 +224,11 @@ xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
ret = -ENOTCONN;
rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
if (!test_bit(XPT_DEAD, &sxprt->xpt_flags)) {
ret = rpcrdma_bc_send_request(rdma, rqst);
if (ret == -ENOTCONN)
svc_close_xprt(sxprt);
}
mutex_unlock(&sxprt->xpt_mutex);
......
......@@ -437,8 +437,7 @@ xprt_setup_rdma(struct xprt_create *args)
* Caller holds @xprt's send lock to prevent activity on this
* transport while the connection is torn down.
*/
static void
xprt_rdma_close(struct rpc_xprt *xprt)
void xprt_rdma_close(struct rpc_xprt *xprt)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_ep *ep = &r_xprt->rx_ep;
......@@ -453,13 +452,13 @@ xprt_rdma_close(struct rpc_xprt *xprt)
if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
rpcrdma_ia_remove(ia);
return;
goto out;
}
if (ep->rep_connected == -ENODEV)
return;
if (ep->rep_connected > 0)
xprt->reestablish_timeout = 0;
xprt_disconnect_done(xprt);
rpcrdma_ep_disconnect(ep, ia);
/* Prepare @xprt for the next connection by reinitializing
......@@ -467,6 +466,10 @@ xprt_rdma_close(struct rpc_xprt *xprt)
*/
r_xprt->rx_buf.rb_credits = 1;
xprt->cwnd = RPC_CWNDSHIFT;
out:
++xprt->connect_cookie;
xprt_disconnect_done(xprt);
}
/**
......@@ -717,7 +720,7 @@ xprt_rdma_send_request(struct rpc_rqst *rqst)
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
if (!xprt_connected(xprt))
goto drop_connection;
return -ENOTCONN;
if (!xprt_request_get_cong(xprt, rqst))
return -EBADSLT;
......@@ -749,8 +752,8 @@ xprt_rdma_send_request(struct rpc_rqst *rqst)
if (rc != -ENOTCONN)
return rc;
drop_connection:
xprt_disconnect_done(xprt);
return -ENOTCONN; /* implies disconnect */
xprt_rdma_close(xprt);
return -ENOTCONN;
}
void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
......
......@@ -280,7 +280,6 @@ rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
ep->rep_connected = -EAGAIN;
goto disconnected;
case RDMA_CM_EVENT_DISCONNECTED:
++xprt->connect_cookie;
ep->rep_connected = -ECONNABORTED;
disconnected:
xprt_force_disconnect(xprt);
......
......@@ -647,6 +647,7 @@ static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
extern unsigned int xprt_rdma_max_inline_read;
void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap);
void xprt_rdma_free_addresses(struct rpc_xprt *xprt);
void xprt_rdma_close(struct rpc_xprt *xprt);
void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq);
int xprt_rdma_init(void);
void xprt_rdma_cleanup(void);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment