Commit 0ba6f370 authored by Chuck Lever's avatar Chuck Lever Committed by Anna Schumaker

xprtrdma: Refactor rpcrdma_deferred_completion

Invoke a common routine for releasing hardware resources (for
example, invalidating MRs). This needs to be done whether an
RPC Reply has arrived or the RPC was terminated early.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent 531cca0c
...@@ -1293,6 +1293,20 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) ...@@ -1293,6 +1293,20 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
goto out; goto out;
} }
void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
/* Invalidate and unmap the data payloads before waking
* the waiting application. This guarantees the memory
* regions are properly fenced from the server before the
* application accesses the data. It also ensures proper
* send flow control: waking the next RPC waits until this
* RPC has relinquished all its Send Queue entries.
*/
if (!list_empty(&req->rl_registered))
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
&req->rl_registered);
}
/* Reply handling runs in the poll worker thread. Anything that /* Reply handling runs in the poll worker thread. Anything that
* might wait is deferred to a separate workqueue. * might wait is deferred to a separate workqueue.
*/ */
...@@ -1301,18 +1315,9 @@ void rpcrdma_deferred_completion(struct work_struct *work) ...@@ -1301,18 +1315,9 @@ void rpcrdma_deferred_completion(struct work_struct *work)
struct rpcrdma_rep *rep = struct rpcrdma_rep *rep =
container_of(work, struct rpcrdma_rep, rr_work); container_of(work, struct rpcrdma_rep, rr_work);
struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst); struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
/* Invalidate and unmap the data payloads before waking
* the waiting application. This guarantees the memory
* regions are properly fenced from the server before the
* application accesses the data. It also ensures proper
* send flow control: waking the next RPC waits until this
* RPC has relinquished all its Send Queue entries.
*/
rpcrdma_mark_remote_invalidation(&req->rl_registered, rep); rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &req->rl_registered); rpcrdma_release_rqst(rep->rr_rxprt, req);
rpcrdma_complete_rqst(rep); rpcrdma_complete_rqst(rep);
} }
...@@ -1374,6 +1379,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) ...@@ -1374,6 +1379,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
req = rpcr_to_rdmar(rqst); req = rpcr_to_rdmar(rqst);
req->rl_reply = rep; req->rl_reply = rep;
rep->rr_rqst = rqst; rep->rr_rqst = rqst;
clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n", dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
__func__, rep, req, be32_to_cpu(rep->rr_xid)); __func__, rep, req, be32_to_cpu(rep->rr_xid));
......
...@@ -678,15 +678,14 @@ xprt_rdma_free(struct rpc_task *task) ...@@ -678,15 +678,14 @@ xprt_rdma_free(struct rpc_task *task)
struct rpc_rqst *rqst = task->tk_rqstp; struct rpc_rqst *rqst = task->tk_rqstp;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
if (test_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags)) if (test_bit(RPCRDMA_REQ_F_BACKCHANNEL, &req->rl_flags))
return; return;
dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
if (!list_empty(&req->rl_registered)) if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags))
ia->ri_ops->ro_unmap_sync(r_xprt, &req->rl_registered); rpcrdma_release_rqst(r_xprt, req);
rpcrdma_buffer_put(req); rpcrdma_buffer_put(req);
} }
...@@ -742,6 +741,7 @@ xprt_rdma_send_request(struct rpc_task *task) ...@@ -742,6 +741,7 @@ xprt_rdma_send_request(struct rpc_task *task)
goto drop_connection; goto drop_connection;
req->rl_connect_cookie = xprt->connect_cookie; req->rl_connect_cookie = xprt->connect_cookie;
set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
goto drop_connection; goto drop_connection;
......
...@@ -386,6 +386,7 @@ struct rpcrdma_req { ...@@ -386,6 +386,7 @@ struct rpcrdma_req {
/* rl_flags */ /* rl_flags */
enum { enum {
RPCRDMA_REQ_F_BACKCHANNEL = 0, RPCRDMA_REQ_F_BACKCHANNEL = 0,
RPCRDMA_REQ_F_PENDING,
}; };
static inline void static inline void
...@@ -655,6 +656,8 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); ...@@ -655,6 +656,8 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
void rpcrdma_reply_handler(struct rpcrdma_rep *rep); void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req);
void rpcrdma_deferred_completion(struct work_struct *work); void rpcrdma_deferred_completion(struct work_struct *work);
static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment