Commit 10e6fc10 authored by Chuck Lever

svcrdma: Post the Reply chunk and Send WR together

Reduce the doorbell and Send completion rates when sending RPC/RDMA
replies that have Reply chunks. NFS READDIR procedures typically
return their result in a Reply chunk, for example.

Instead of calling ib_post_send() to post the Write WRs for the
Reply chunk, and then calling it again to post the Send WR that
conveys the transport header, chain the Write WRs to the Send WR
and call ib_post_send() only once.

Thanks to the Send Queue completion ordering rules, the completion
of the Send WR guarantees that the Write WRs posted before it have
also completed successfully. Thus all Write WRs for the Reply chunk
can remain unsignaled. Instead of handling a Write completion and
then a Send completion, only the Send completion is seen, and it
handles cleanup for both the Writes and the Send.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
parent a1f5788a
...@@ -262,19 +262,24 @@ extern void svc_rdma_release_ctxt(struct svc_xprt *xprt, void *ctxt); ...@@ -262,19 +262,24 @@ extern void svc_rdma_release_ctxt(struct svc_xprt *xprt, void *ctxt);
extern int svc_rdma_recvfrom(struct svc_rqst *); extern int svc_rdma_recvfrom(struct svc_rqst *);
/* svc_rdma_rw.c */ /* svc_rdma_rw.c */
extern void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
struct svc_rdma_chunk_ctxt *cc);
extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma); extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
extern void svc_rdma_cc_init(struct svcxprt_rdma *rdma, extern void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
struct svc_rdma_chunk_ctxt *cc); struct svc_rdma_chunk_ctxt *cc);
extern void svc_rdma_cc_release(struct svcxprt_rdma *rdma, extern void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
struct svc_rdma_chunk_ctxt *cc, struct svc_rdma_chunk_ctxt *cc,
enum dma_data_direction dir); enum dma_data_direction dir);
extern void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt);
extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_chunk *chunk, const struct svc_rdma_chunk *chunk,
const struct xdr_buf *xdr); const struct xdr_buf *xdr);
extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, extern int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt, const struct svc_rdma_pcl *write_pcl,
struct svc_rdma_send_ctxt *sctxt, const struct svc_rdma_pcl *reply_pcl,
const struct xdr_buf *xdr); struct svc_rdma_send_ctxt *sctxt,
const struct xdr_buf *xdr);
extern int svc_rdma_process_read_list(struct svcxprt_rdma *rdma, extern int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
struct svc_rqst *rqstp, struct svc_rqst *rqstp,
struct svc_rdma_recv_ctxt *head); struct svc_rdma_recv_ctxt *head);
......
...@@ -230,10 +230,18 @@ static void svc_rdma_write_info_free(struct svc_rdma_write_info *info) ...@@ -230,10 +230,18 @@ static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
queue_work(svcrdma_wq, &info->wi_work); queue_work(svcrdma_wq, &info->wi_work);
} }
static void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma, /**
struct svc_rdma_chunk_ctxt *cc) * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
* @rdma: controlling transport
* @ctxt: Send context that is being released
*/
void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt)
{ {
svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount); struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;
if (!cc->cc_sqecount)
return;
svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE); svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
} }
...@@ -254,7 +262,6 @@ static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc) ...@@ -254,7 +262,6 @@ static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
switch (wc->status) { switch (wc->status) {
case IB_WC_SUCCESS: case IB_WC_SUCCESS:
trace_svcrdma_wc_reply(&cc->cc_cid); trace_svcrdma_wc_reply(&cc->cc_cid);
svc_rdma_reply_chunk_release(rdma, cc);
return; return;
case IB_WC_WR_FLUSH_ERR: case IB_WC_WR_FLUSH_ERR:
trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid); trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
...@@ -263,7 +270,6 @@ static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc) ...@@ -263,7 +270,6 @@ static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
trace_svcrdma_wc_reply_err(wc, &cc->cc_cid); trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
} }
svc_rdma_reply_chunk_release(rdma, cc);
svc_xprt_deferred_close(&rdma->sc_xprt); svc_xprt_deferred_close(&rdma->sc_xprt);
} }
...@@ -637,9 +643,10 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, ...@@ -637,9 +643,10 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
} }
/** /**
* svc_rdma_send_reply_chunk - Write all segments in the Reply chunk * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
* @rdma: controlling RDMA transport * @rdma: controlling RDMA transport
* @rctxt: Write and Reply chunks provisioned by the client * @write_pcl: Write chunk list provided by client
* @reply_pcl: Reply chunk provided by client
* @sctxt: Send WR resources * @sctxt: Send WR resources
* @xdr: xdr_buf containing an RPC Reply * @xdr: xdr_buf containing an RPC Reply
* *
...@@ -650,35 +657,44 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, ...@@ -650,35 +657,44 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
* %-ENOTCONN if posting failed (connection is lost), * %-ENOTCONN if posting failed (connection is lost),
* %-EIO if rdma_rw initialization failed (DMA mapping, etc). * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
*/ */
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt, const struct svc_rdma_pcl *write_pcl,
struct svc_rdma_send_ctxt *sctxt, const struct svc_rdma_pcl *reply_pcl,
const struct xdr_buf *xdr) struct svc_rdma_send_ctxt *sctxt,
const struct xdr_buf *xdr)
{ {
struct svc_rdma_write_info *info = &sctxt->sc_reply_info; struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
struct svc_rdma_chunk_ctxt *cc = &info->wi_cc; struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
struct ib_send_wr *first_wr;
struct list_head *pos;
struct ib_cqe *cqe;
int ret; int ret;
if (likely(pcl_is_empty(&rctxt->rc_reply_pcl)))
return 0; /* client provided no Reply chunk */
info->wi_rdma = rdma; info->wi_rdma = rdma;
info->wi_chunk = pcl_first_chunk(&rctxt->rc_reply_pcl); info->wi_chunk = pcl_first_chunk(reply_pcl);
info->wi_seg_off = 0; info->wi_seg_off = 0;
info->wi_seg_no = 0; info->wi_seg_no = 0;
svc_rdma_cc_init(rdma, &info->wi_cc);
info->wi_cc.cc_cqe.done = svc_rdma_reply_done; info->wi_cc.cc_cqe.done = svc_rdma_reply_done;
ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr, ret = pcl_process_nonpayloads(write_pcl, xdr,
svc_rdma_xb_write, info); svc_rdma_xb_write, info);
if (ret < 0) if (ret < 0)
return ret; return ret;
trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount); first_wr = sctxt->sc_wr_chain;
ret = svc_rdma_post_chunk_ctxt(rdma, cc); cqe = &cc->cc_cqe;
if (ret < 0) list_for_each(pos, &cc->cc_rwctxts) {
return ret; struct svc_rdma_rw_ctxt *rwc;
rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
rdma->sc_port_num, cqe, first_wr);
cqe = NULL;
}
sctxt->sc_wr_chain = first_wr;
sctxt->sc_sqecount += cc->cc_sqecount;
trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
return xdr->len; return xdr->len;
} }
......
...@@ -205,6 +205,7 @@ struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma) ...@@ -205,6 +205,7 @@ struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma)
xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf, xdr_init_encode(&ctxt->sc_stream, &ctxt->sc_hdrbuf,
ctxt->sc_xprt_buf, NULL); ctxt->sc_xprt_buf, NULL);
svc_rdma_cc_init(rdma, &ctxt->sc_reply_info.wi_cc);
ctxt->sc_send_wr.num_sge = 0; ctxt->sc_send_wr.num_sge = 0;
ctxt->sc_cur_sge_no = 0; ctxt->sc_cur_sge_no = 0;
ctxt->sc_page_count = 0; ctxt->sc_page_count = 0;
...@@ -226,6 +227,8 @@ static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma, ...@@ -226,6 +227,8 @@ static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma,
struct ib_device *device = rdma->sc_cm_id->device; struct ib_device *device = rdma->sc_cm_id->device;
unsigned int i; unsigned int i;
svc_rdma_reply_chunk_release(rdma, ctxt);
if (ctxt->sc_page_count) if (ctxt->sc_page_count)
release_pages(ctxt->sc_pages, ctxt->sc_page_count); release_pages(ctxt->sc_pages, ctxt->sc_page_count);
...@@ -867,16 +870,10 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, ...@@ -867,16 +870,10 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
* in sc_sges[0], and the RPC xdr_buf is prepared in following sges. * in sc_sges[0], and the RPC xdr_buf is prepared in following sges.
* *
* Depending on whether a Write list or Reply chunk is present, * Depending on whether a Write list or Reply chunk is present,
* the server may send all, a portion of, or none of the xdr_buf. * the server may Send all, a portion of, or none of the xdr_buf.
* In the latter case, only the transport header (sc_sges[0]) is * In the latter case, only the transport header (sc_sges[0]) is
* transmitted. * transmitted.
* *
* RDMA Send is the last step of transmitting an RPC reply. Pages
* involved in the earlier RDMA Writes are here transferred out
* of the rqstp and into the sctxt's page array. These pages are
* DMA unmapped by each Write completion, but the subsequent Send
* completion finally releases these pages.
*
* Assumptions: * Assumptions:
* - The Reply's transport header will never be larger than a page. * - The Reply's transport header will never be larger than a page.
*/ */
...@@ -885,6 +882,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, ...@@ -885,6 +882,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt, const struct svc_rdma_recv_ctxt *rctxt,
struct svc_rqst *rqstp) struct svc_rqst *rqstp)
{ {
struct ib_send_wr *send_wr = &sctxt->sc_send_wr;
int ret; int ret;
ret = svc_rdma_map_reply_msg(rdma, sctxt, &rctxt->rc_write_pcl, ret = svc_rdma_map_reply_msg(rdma, sctxt, &rctxt->rc_write_pcl,
...@@ -892,13 +890,16 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, ...@@ -892,13 +890,16 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
if (ret < 0) if (ret < 0)
return ret; return ret;
/* Transfer pages involved in RDMA Writes to the sctxt's
* page array. Completion handling releases these pages.
*/
svc_rdma_save_io_pages(rqstp, sctxt); svc_rdma_save_io_pages(rqstp, sctxt);
if (rctxt->rc_inv_rkey) { if (rctxt->rc_inv_rkey) {
sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV; send_wr->opcode = IB_WR_SEND_WITH_INV;
sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey; send_wr->ex.invalidate_rkey = rctxt->rc_inv_rkey;
} else { } else {
sctxt->sc_send_wr.opcode = IB_WR_SEND; send_wr->opcode = IB_WR_SEND;
} }
return svc_rdma_post_send(rdma, sctxt); return svc_rdma_post_send(rdma, sctxt);
...@@ -1012,10 +1013,15 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) ...@@ -1012,10 +1013,15 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (!p) if (!p)
goto put_ctxt; goto put_ctxt;
ret = svc_rdma_send_reply_chunk(rdma, rctxt, sctxt, &rqstp->rq_res); rc_size = 0;
if (ret < 0) if (!pcl_is_empty(&rctxt->rc_reply_pcl)) {
goto reply_chunk; ret = svc_rdma_prepare_reply_chunk(rdma, &rctxt->rc_write_pcl,
rc_size = ret; &rctxt->rc_reply_pcl, sctxt,
&rqstp->rq_res);
if (ret < 0)
goto reply_chunk;
rc_size = ret;
}
*p++ = *rdma_argp; *p++ = *rdma_argp;
*p++ = *(rdma_argp + 1); *p++ = *(rdma_argp + 1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment