Commit d2727cef authored by Chuck Lever's avatar Chuck Lever

svcrdma: Post WRs for Write chunks in svc_rdma_sendto()

Refactor to eventually enable svcrdma to post the Write WRs for each
RPC response using the same ib_post_send() as the Send WR (ie, as a
single WR chain).

svc_rdma_result_payload (originally svc_rdma_read_payload) was added
so that the upper layer XDR encoder could identify a range of bytes
to be possibly conveyed by RDMA (if a Write chunk was provided by
the client).

The purpose of commit f6ad7759 ("svcrdma: Post RDMA Writes while
XDR encoding replies") was to post as much of the result payload
outside of svc_rdma_sendto() as possible because svc_rdma_sendto()
used to be called with the xpt_mutex held.

However, since commit ca4faf54 ("SUNRPC: Move xpt_mutex into
socket xpo_sendto methods"), the xpt_mutex is no longer held when
calling svc_rdma_sendto(). Thus, that benefit is no longer an issue.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent 10e6fc10
...@@ -272,8 +272,8 @@ extern void svc_rdma_cc_release(struct svcxprt_rdma *rdma, ...@@ -272,8 +272,8 @@ extern void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
enum dma_data_direction dir); enum dma_data_direction dir);
extern void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma, extern void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt); struct svc_rdma_send_ctxt *ctxt);
extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, extern int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
const struct svc_rdma_chunk *chunk, const struct svc_rdma_recv_ctxt *rctxt,
const struct xdr_buf *xdr); const struct xdr_buf *xdr);
extern int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma, extern int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_pcl *write_pcl, const struct svc_rdma_pcl *write_pcl,
......
...@@ -601,47 +601,65 @@ static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data) ...@@ -601,47 +601,65 @@ static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
return xdr->len; return xdr->len;
} }
/** static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
* svc_rdma_send_write_chunk - Write all segments in a Write chunk
* @rdma: controlling RDMA transport
* @chunk: Write chunk provided by the client
* @xdr: xdr_buf containing the data payload
*
* Returns a non-negative number of bytes the chunk consumed, or
* %-E2BIG if the payload was larger than the Write chunk,
* %-EINVAL if client provided too many segments,
* %-ENOMEM if rdma_rw context pool was exhausted,
* %-ENOTCONN if posting failed (connection is lost),
* %-EIO if rdma_rw initialization failed (DMA mapping, etc).
*/
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_chunk *chunk, const struct svc_rdma_chunk *chunk,
const struct xdr_buf *xdr) const struct xdr_buf *xdr)
{ {
struct svc_rdma_write_info *info; struct svc_rdma_write_info *info;
struct svc_rdma_chunk_ctxt *cc; struct svc_rdma_chunk_ctxt *cc;
struct xdr_buf payload;
int ret; int ret;
if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
chunk->ch_payload_length))
return -EMSGSIZE;
info = svc_rdma_write_info_alloc(rdma, chunk); info = svc_rdma_write_info_alloc(rdma, chunk);
if (!info) if (!info)
return -ENOMEM; return -ENOMEM;
cc = &info->wi_cc; cc = &info->wi_cc;
ret = svc_rdma_xb_write(xdr, info); ret = svc_rdma_xb_write(&payload, info);
if (ret != xdr->len) if (ret != payload.len)
goto out_err; goto out_err;
trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount); trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
ret = svc_rdma_post_chunk_ctxt(rdma, cc); ret = svc_rdma_post_chunk_ctxt(rdma, cc);
if (ret < 0) if (ret < 0)
goto out_err; goto out_err;
return xdr->len; return 0;
out_err: out_err:
svc_rdma_write_info_free(info); svc_rdma_write_info_free(info);
return ret; return ret;
} }
/**
* svc_rdma_send_write_list - Send all chunks on the Write list
* @rdma: controlling RDMA transport
* @rctxt: Write list provisioned by the client
* @xdr: xdr_buf containing an RPC Reply message
*
* Returns zero on success, or a negative errno if one or more
* Write chunks could not be sent.
*/
int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt,
const struct xdr_buf *xdr)
{
struct svc_rdma_chunk *chunk;
int ret;
pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
if (!chunk->ch_payload_length)
break;
ret = svc_rdma_send_write_chunk(rdma, chunk, xdr);
if (ret < 0)
return ret;
}
return 0;
}
/** /**
* svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
* @rdma: controlling RDMA transport * @rdma: controlling RDMA transport
......
...@@ -1013,6 +1013,10 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) ...@@ -1013,6 +1013,10 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
if (!p) if (!p)
goto put_ctxt; goto put_ctxt;
ret = svc_rdma_send_write_list(rdma, rctxt, &rqstp->rq_res);
if (ret < 0)
goto put_ctxt;
rc_size = 0; rc_size = 0;
if (!pcl_is_empty(&rctxt->rc_reply_pcl)) { if (!pcl_is_empty(&rctxt->rc_reply_pcl)) {
ret = svc_rdma_prepare_reply_chunk(rdma, &rctxt->rc_write_pcl, ret = svc_rdma_prepare_reply_chunk(rdma, &rctxt->rc_write_pcl,
...@@ -1064,45 +1068,33 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) ...@@ -1064,45 +1068,33 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
/** /**
* svc_rdma_result_payload - special processing for a result payload * svc_rdma_result_payload - special processing for a result payload
* @rqstp: svc_rqst to operate on * @rqstp: RPC transaction context
* @offset: payload's byte offset in @xdr * @offset: payload's byte offset in @rqstp->rq_res
* @length: size of payload, in bytes * @length: size of payload, in bytes
* *
* Assign the passed-in result payload to the current Write chunk,
* and advance to cur_result_payload to the next Write chunk, if
* there is one.
*
* Return values: * Return values:
* %0 if successful or nothing needed to be done * %0 if successful or nothing needed to be done
* %-EMSGSIZE on XDR buffer overflow
* %-E2BIG if the payload was larger than the Write chunk * %-E2BIG if the payload was larger than the Write chunk
* %-EINVAL if client provided too many segments
* %-ENOMEM if rdma_rw context pool was exhausted
* %-ENOTCONN if posting failed (connection is lost)
* %-EIO if rdma_rw initialization failed (DMA mapping, etc)
*/ */
int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset, int svc_rdma_result_payload(struct svc_rqst *rqstp, unsigned int offset,
unsigned int length) unsigned int length)
{ {
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
struct svc_rdma_chunk *chunk; struct svc_rdma_chunk *chunk;
struct svcxprt_rdma *rdma;
struct xdr_buf subbuf;
int ret;
chunk = rctxt->rc_cur_result_payload; chunk = rctxt->rc_cur_result_payload;
if (!length || !chunk) if (!length || !chunk)
return 0; return 0;
rctxt->rc_cur_result_payload = rctxt->rc_cur_result_payload =
pcl_next_chunk(&rctxt->rc_write_pcl, chunk); pcl_next_chunk(&rctxt->rc_write_pcl, chunk);
if (length > chunk->ch_length) if (length > chunk->ch_length)
return -E2BIG; return -E2BIG;
chunk->ch_position = offset; chunk->ch_position = offset;
chunk->ch_payload_length = length; chunk->ch_payload_length = length;
if (xdr_buf_subsegment(&rqstp->rq_res, &subbuf, offset, length))
return -EMSGSIZE;
rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
ret = svc_rdma_send_write_chunk(rdma, chunk, &subbuf);
if (ret < 0)
return ret;
return 0; return 0;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment