Commit 9a6a180b authored by Chuck Lever, committed by J. Bruce Fields

svcrdma: Use rdma_rw API in RPC reply path

The current svcrdma sendto code path posts one RDMA Write WR at a
time. Each of these Writes typically carries a small number of pages
(for instance, up to 30 pages for mlx4 devices). That means a 1MB
NFS READ reply requires 9 ib_post_send() calls for the Write WRs,
and one for the Send WR carrying the actual RPC Reply message.
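(Assuming the common 4KB page size, a 1MB payload spans 256 pages; at up to
30 pages per Write WR, that is ceil(256 / 30) = 9 separate ib_post_send()
calls before the Send itself.)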

Instead, use the new rdma_rw API. The details of Write WR chain
construction and memory registration are taken care of in the RDMA
core. svcrdma can focus on the details of the RPC-over-RDMA
protocol. This gives three main benefits:

1. All Write WRs for one RDMA segment are posted in a single chain.
As few as one ib_post_send() for each Write chunk.

2. The Write path can now use FRWR to register the Write buffers.
If the device's maximum page list depth is large, this means a
single Write WR is needed for each RPC's Write chunk data.

3. The new code introduces support for RPCs that carry both a Write
list and a Reply chunk. This combination can be used for an NFSv4
READ where the data payload is large, and thus is removed from the
Payload Stream, but the Payload Stream is still larger than the
inline threshold.
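To illustrate the shape of the new Write path, here is a minimal sketch of
pushing one Write-chunk segment through the rdma_rw API. The "sketch_*"
names are hypothetical; the real helpers (svc_rdma_send_write_chunk and
friends) live in svc_rdma_rw.c, added by the parent commit, and carry
considerably more state than shown.

/* A minimal sketch, not the patch itself: post one RDMA segment of a
 * Write chunk with the rdma_rw API. All "sketch_*" names are
 * hypothetical placeholders.
 */
#include <linux/scatterlist.h>
#include <linux/dma-mapping.h>
#include <rdma/rw.h>

struct sketch_write_ctxt {
	struct rdma_rw_ctx	rw_ctx;	/* owned by the rdma_rw core */
	struct ib_cqe		cqe;	/* caller sets .done to its Write
					 * completion handler */
	struct scatterlist	*sgl;	/* pages carrying the payload */
	unsigned int		sg_cnt;
};

static int sketch_write_segment(struct ib_qp *qp, u8 port_num,
				struct sketch_write_ctxt *ctxt,
				u64 remote_offset, u32 rkey)
{
	int ret;

	/* DMA-map the scatterlist and build the entire Write WR chain;
	 * the core may register the buffer with FRWR when that suits
	 * the device better than a long chain of WRs.
	 */
	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, qp, port_num,
			       ctxt->sgl, ctxt->sg_cnt, 0,
			       remote_offset, rkey, DMA_TO_DEVICE);
	if (ret < 0)
		return ret;

	/* One ib_post_send() for the whole chain; ctxt->cqe.done is
	 * invoked when the last Write WR in the chain completes.
	 */
	return rdma_rw_ctx_post(&ctxt->rw_ctx, qp, port_num,
				&ctxt->cqe, NULL);
}

Each such rdma_rw_ctx corresponds to one per-segment context in the new
design: the Write completion handler uses it to DMA-unmap the segment's
pages, while releasing the pages is deferred to the Send completion.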
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
parent f13193f5
...@@ -212,7 +212,6 @@ extern int svc_rdma_xdr_decode_req(struct xdr_buf *);
extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
				     struct rpcrdma_msg *,
				     enum rpcrdma_errcode, __be32 *);
-extern void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *, int);
extern void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *, int);
extern void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *, int,
					    __be32, __be64, u32);
...
...@@ -90,9 +90,9 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
 * Caller holds the connection's mutex and has already marshaled
 * the RPC/RDMA request.
 *
- * This is similar to svc_rdma_reply, but takes an rpc_rqst
- * instead, does not support chunks, and avoids blocking memory
- * allocation.
+ * This is similar to svc_rdma_send_reply_msg, but takes a struct
+ * rpc_rqst instead, does not support chunks, and avoids blocking
+ * memory allocation.
 *
 * XXX: There is still an opportunity to block in svc_rdma_send()
 * if there are no SQ entries to post the Send. This may occur if
...
/*
+ * Copyright (c) 2016 Oracle. All rights reserved.
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
...@@ -40,6 +41,63 @@
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */
/* Operation
*
* The main entry point is svc_rdma_sendto. This is called by the
* RPC server when an RPC Reply is ready to be transmitted to a client.
*
* The passed-in svc_rqst contains a struct xdr_buf which holds an
* XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
* transport header, post all Write WRs needed for this Reply, then post
* a Send WR conveying the transport header and the RPC message itself to
* the client.
*
* svc_rdma_sendto must fully transmit the Reply before returning, as
* the svc_rqst will be recycled as soon as sendto returns. Remaining
* resources referred to by the svc_rqst are also recycled at that time.
* Therefore any resources that must remain longer must be detached
* from the svc_rqst and released later.
*
* Page Management
*
* The I/O that performs Reply transmission is asynchronous, and may
* complete well after sendto returns. Thus pages under I/O must be
* removed from the svc_rqst before sendto returns.
*
* The logic here depends on Send Queue and completion ordering. Since
* the Send WR is always posted last, it will always complete last. Thus
* when it completes, it is guaranteed that all previous Write WRs have
* also completed.
*
* Write WRs are constructed and posted. Each Write segment gets its own
* svc_rdma_rw_ctxt, allowing the Write completion handler to find and
* DMA-unmap the pages under I/O for that Write segment. The Write
* completion handler does not release any pages.
*
* When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt.
* The ownership of all of the Reply's pages is transferred into that
* ctxt, the Send WR is posted, and sendto returns.
*
* The svc_rdma_op_ctxt is presented when the Send WR completes. The
* Send completion handler finally releases the Reply's pages.
*
* This mechanism also assumes that completions on the transport's Send
* Completion Queue do not run in parallel. Otherwise a Write completion
* and Send completion running at the same time could release pages that
* are still DMA-mapped.
*
* Error Handling
*
* - If the Send WR is posted successfully, it will either complete
* successfully, or get flushed. Either way, the Send completion
* handler releases the Reply's pages.
* - If the Send WR cannot be posted, the forward path releases
* the Reply's pages.
*
* This handles the case, without the use of page reference counting,
* where two different Write segments send portions of the same page.
*/
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
...@@ -55,6 +113,133 @@ static u32 xdr_padsize(u32 len)
	return (len & 3) ? (4 - (len & 3)) : 0;
}
/* Returns length of transport header, in bytes.
*/
static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
{
unsigned int nsegs;
__be32 *p;
p = rdma_resp;
/* RPC-over-RDMA V1 replies never have a Read list. */
p += rpcrdma_fixed_maxsz + 1;
/* Skip Write list. */
while (*p++ != xdr_zero) {
nsegs = be32_to_cpup(p++);
p += nsegs * rpcrdma_segment_maxsz;
}
/* Skip Reply chunk. */
if (*p++ != xdr_zero) {
nsegs = be32_to_cpup(p++);
p += nsegs * rpcrdma_segment_maxsz;
}
return (unsigned long)p - (unsigned long)rdma_resp;
}
/* One Write chunk is copied from Call transport header to Reply
* transport header. Each segment's length field is updated to
* reflect number of bytes consumed in the segment.
*
* Returns number of segments in this chunk.
*/
static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
unsigned int remaining)
{
unsigned int i, nsegs;
u32 seg_len;
/* Write list discriminator */
*dst++ = *src++;
/* number of segments in this chunk */
nsegs = be32_to_cpup(src);
*dst++ = *src++;
for (i = nsegs; i; i--) {
/* segment's RDMA handle */
*dst++ = *src++;
/* bytes returned in this segment */
seg_len = be32_to_cpu(*src);
if (remaining >= seg_len) {
/* entire segment was consumed */
*dst = *src;
remaining -= seg_len;
} else {
/* segment only partly filled */
*dst = cpu_to_be32(remaining);
remaining = 0;
}
dst++; src++;
/* segment's RDMA offset */
*dst++ = *src++;
*dst++ = *src++;
}
return nsegs;
}
/* The client provided a Write list in the Call message. Fill in
* the segments in the first Write chunk in the Reply's transport
* header with the number of bytes consumed in each segment.
* Remaining chunks are returned unused.
*
* Assumptions:
* - Client has provided only one Write chunk
*/
static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
unsigned int consumed)
{
unsigned int nsegs;
__be32 *p, *q;
/* RPC-over-RDMA V1 replies never have a Read list. */
p = rdma_resp + rpcrdma_fixed_maxsz + 1;
q = wr_ch;
while (*q != xdr_zero) {
nsegs = xdr_encode_write_chunk(p, q, consumed);
q += 2 + nsegs * rpcrdma_segment_maxsz;
p += 2 + nsegs * rpcrdma_segment_maxsz;
consumed = 0;
}
/* Terminate Write list */
*p++ = xdr_zero;
/* Reply chunk discriminator; may be replaced later */
*p = xdr_zero;
}
/* The client provided a Reply chunk in the Call message. Fill in
* the segments in the Reply chunk in the Reply message with the
* number of bytes consumed in each segment.
*
* Assumptions:
* - Reply can always fit in the provided Reply chunk
*/
static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
unsigned int consumed)
{
__be32 *p;
/* Find the Reply chunk in the Reply's xprt header.
* RPC-over-RDMA V1 replies never have a Read list.
*/
p = rdma_resp + rpcrdma_fixed_maxsz + 1;
/* Skip past Write list */
while (*p++ != xdr_zero)
p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
xdr_encode_write_chunk(p, rp_ch, consumed);
}
int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
		     struct xdr_buf *xdr,
		     struct svc_rdma_req_map *vec,
...@@ -123,45 +308,14 @@ int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
	return 0;
}
static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
struct xdr_buf *xdr,
u32 xdr_off, size_t len, int dir)
{
struct page *page;
dma_addr_t dma_addr;
if (xdr_off < xdr->head[0].iov_len) {
/* This offset is in the head */
xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
page = virt_to_page(xdr->head[0].iov_base);
} else {
xdr_off -= xdr->head[0].iov_len;
if (xdr_off < xdr->page_len) {
/* This offset is in the page list */
xdr_off += xdr->page_base;
page = xdr->pages[xdr_off >> PAGE_SHIFT];
xdr_off &= ~PAGE_MASK;
} else {
/* This offset is in the tail */
xdr_off -= xdr->page_len;
xdr_off += (unsigned long)
xdr->tail[0].iov_base & ~PAGE_MASK;
page = virt_to_page(xdr->tail[0].iov_base);
}
}
dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
min_t(size_t, PAGE_SIZE, len), dir);
return dma_addr;
}
/* Parse the RPC Call's transport header.
 */
-static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
-				      struct rpcrdma_write_array **write,
-				      struct rpcrdma_write_array **reply)
+static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
+				      __be32 **write, __be32 **reply)
{
	__be32 *p;

-	p = (__be32 *)&rmsgp->rm_body.rm_chunks[0];
+	p = rdma_argp + rpcrdma_fixed_maxsz;

	/* Read list */
	while (*p++ != xdr_zero)
...@@ -169,7 +323,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
	/* Write list */
	if (*p != xdr_zero) {
-		*write = (struct rpcrdma_write_array *)p;
+		*write = p;
		while (*p++ != xdr_zero)
			p += 1 + be32_to_cpu(*p) * 4;
	} else {
...@@ -179,7 +333,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
	/* Reply chunk */
	if (*p != xdr_zero)
-		*reply = (struct rpcrdma_write_array *)p;
+		*reply = p;
	else
		*reply = NULL;
}
...@@ -210,6 +364,32 @@ static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
	return be32_to_cpup(p);
}
/* ib_dma_map_page() is used here because svc_rdma_dma_unmap()
* is used during completion to DMA-unmap this memory, and
* it uses ib_dma_unmap_page() exclusively.
*/
static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
struct svc_rdma_op_ctxt *ctxt,
unsigned int sge_no,
unsigned char *base,
unsigned int len)
{
unsigned long offset = (unsigned long)base & ~PAGE_MASK;
struct ib_device *dev = rdma->sc_cm_id->device;
dma_addr_t dma_addr;
dma_addr = ib_dma_map_page(dev, virt_to_page(base),
offset, len, DMA_TO_DEVICE);
if (ib_dma_mapping_error(dev, dma_addr))
return -EIO;
ctxt->sge[sge_no].addr = dma_addr;
ctxt->sge[sge_no].length = len;
ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
svc_rdma_count_mappings(rdma, ctxt);
return 0;
}
static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
				 struct svc_rdma_op_ctxt *ctxt,
				 unsigned int sge_no,
...@@ -253,222 +433,73 @@ int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
	return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
}
/* Assumptions: /* Load the xdr_buf into the ctxt's sge array, and DMA map each
* - The specified write_len can be represented in sc_max_sge * PAGE_SIZE * element as it is added.
*
* Returns the number of sge elements loaded on success, or
* a negative errno on failure.
*/ */
static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp, static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
u32 rmr, u64 to, struct svc_rdma_op_ctxt *ctxt,
u32 xdr_off, int write_len, struct xdr_buf *xdr, __be32 *wr_lst)
struct svc_rdma_req_map *vec)
{ {
struct ib_rdma_wr write_wr; unsigned int len, sge_no, remaining, page_off;
struct ib_sge *sge; struct page **ppages;
int xdr_sge_no; unsigned char *base;
int sge_no; u32 xdr_pad;
int sge_bytes; int ret;
int sge_off;
int bc;
struct svc_rdma_op_ctxt *ctxt;
if (vec->count > RPCSVC_MAXPAGES) { sge_no = 1;
pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
return -EIO;
}
dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, " ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++,
"write_len=%d, vec->sge=%p, vec->count=%lu\n", xdr->head[0].iov_base,
rmr, (unsigned long long)to, xdr_off, xdr->head[0].iov_len);
write_len, vec->sge, vec->count); if (ret < 0)
return ret;
ctxt = svc_rdma_get_context(xprt); /* If a Write chunk is present, the xdr_buf's page list
ctxt->direction = DMA_TO_DEVICE; * is not included inline. However the Upper Layer may
sge = ctxt->sge; * have added XDR padding in the tail buffer, and that
* should not be included inline.
/* Find the SGE associated with xdr_off */ */
for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count; if (wr_lst) {
xdr_sge_no++) { base = xdr->tail[0].iov_base;
if (vec->sge[xdr_sge_no].iov_len > bc) len = xdr->tail[0].iov_len;
break; xdr_pad = xdr_padsize(xdr->page_len);
bc -= vec->sge[xdr_sge_no].iov_len;
}
sge_off = bc; if (len && xdr_pad) {
bc = write_len; base += xdr_pad;
sge_no = 0; len -= xdr_pad;
/* Copy the remaining SGE */
while (bc != 0) {
sge_bytes = min_t(size_t,
bc, vec->sge[xdr_sge_no].iov_len-sge_off);
sge[sge_no].length = sge_bytes;
sge[sge_no].addr =
dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
sge_bytes, DMA_TO_DEVICE);
xdr_off += sge_bytes;
if (ib_dma_mapping_error(xprt->sc_cm_id->device,
sge[sge_no].addr))
goto err;
svc_rdma_count_mappings(xprt, ctxt);
sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
ctxt->count++;
sge_off = 0;
sge_no++;
xdr_sge_no++;
if (xdr_sge_no > vec->count) {
pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
goto err;
}
bc -= sge_bytes;
if (sge_no == xprt->sc_max_sge)
break;
} }
/* Prepare WRITE WR */ goto tail;
memset(&write_wr, 0, sizeof write_wr);
ctxt->cqe.done = svc_rdma_wc_write;
write_wr.wr.wr_cqe = &ctxt->cqe;
write_wr.wr.sg_list = &sge[0];
write_wr.wr.num_sge = sge_no;
write_wr.wr.opcode = IB_WR_RDMA_WRITE;
write_wr.wr.send_flags = IB_SEND_SIGNALED;
write_wr.rkey = rmr;
write_wr.remote_addr = to;
/* Post It */
atomic_inc(&rdma_stat_write);
if (svc_rdma_send(xprt, &write_wr.wr))
goto err;
return write_len - bc;
err:
svc_rdma_unmap_dma(ctxt);
svc_rdma_put_context(ctxt, 0);
return -EIO;
}
noinline
static int send_write_chunks(struct svcxprt_rdma *xprt,
struct rpcrdma_write_array *wr_ary,
struct rpcrdma_msg *rdma_resp,
struct svc_rqst *rqstp,
struct svc_rdma_req_map *vec)
{
u32 xfer_len = rqstp->rq_res.page_len;
int write_len;
u32 xdr_off;
int chunk_off;
int chunk_no;
int nchunks;
struct rpcrdma_write_array *res_ary;
int ret;
res_ary = (struct rpcrdma_write_array *)
&rdma_resp->rm_body.rm_chunks[1];
/* Write chunks start at the pagelist */
nchunks = be32_to_cpu(wr_ary->wc_nchunks);
for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
xfer_len && chunk_no < nchunks;
chunk_no++) {
struct rpcrdma_segment *arg_ch;
u64 rs_offset;
arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));
/* Prepare the response chunk given the length actually
* written */
xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
arg_ch->rs_handle,
arg_ch->rs_offset,
write_len);
chunk_off = 0;
while (write_len) {
ret = send_write(xprt, rqstp,
be32_to_cpu(arg_ch->rs_handle),
rs_offset + chunk_off,
xdr_off,
write_len,
vec);
if (ret <= 0)
goto out_err;
chunk_off += ret;
xdr_off += ret;
xfer_len -= ret;
write_len -= ret;
}
} }
/* Update the req with the number of chunks actually used */
svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
return rqstp->rq_res.page_len; ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
page_off = xdr->page_base & ~PAGE_MASK;
remaining = xdr->page_len;
while (remaining) {
len = min_t(u32, PAGE_SIZE - page_off, remaining);
out_err: ret = svc_rdma_dma_map_page(rdma, ctxt, sge_no++,
pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret); *ppages++, page_off, len);
return -EIO; if (ret < 0)
} return ret;
noinline
static int send_reply_chunks(struct svcxprt_rdma *xprt,
struct rpcrdma_write_array *rp_ary,
struct rpcrdma_msg *rdma_resp,
struct svc_rqst *rqstp,
struct svc_rdma_req_map *vec)
{
u32 xfer_len = rqstp->rq_res.len;
int write_len;
u32 xdr_off;
int chunk_no;
int chunk_off;
int nchunks;
struct rpcrdma_segment *ch;
struct rpcrdma_write_array *res_ary;
int ret;
/* XXX: need to fix when reply lists occur with read-list and or remaining -= len;
* write-list */ page_off = 0;
res_ary = (struct rpcrdma_write_array *)
&rdma_resp->rm_body.rm_chunks[2];
/* xdr offset starts at RPC message */
nchunks = be32_to_cpu(rp_ary->wc_nchunks);
for (xdr_off = 0, chunk_no = 0;
xfer_len && chunk_no < nchunks;
chunk_no++) {
u64 rs_offset;
ch = &rp_ary->wc_array[chunk_no].wc_target;
write_len = min(xfer_len, be32_to_cpu(ch->rs_length));
/* Prepare the reply chunk given the length actually
* written */
xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
ch->rs_handle, ch->rs_offset,
write_len);
chunk_off = 0;
while (write_len) {
ret = send_write(xprt, rqstp,
be32_to_cpu(ch->rs_handle),
rs_offset + chunk_off,
xdr_off,
write_len,
vec);
if (ret <= 0)
goto out_err;
chunk_off += ret;
xdr_off += ret;
xfer_len -= ret;
write_len -= ret;
}
} }
/* Update the req with the number of chunks actually used */
svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
return rqstp->rq_res.len; base = xdr->tail[0].iov_base;
len = xdr->tail[0].iov_len;
tail:
if (len) {
ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, base, len);
if (ret < 0)
return ret;
}
out_err: return sge_no - 1;
pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
return -EIO;
} }
/* The svc_rqst and all resources it owns are released as soon as
...@@ -525,90 +556,66 @@ int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
	return svc_rdma_send(rdma, send_wr);
}
/* This function prepares the portion of the RPCRDMA message to be /* Prepare the portion of the RPC Reply that will be transmitted
* sent in the RDMA_SEND. This function is called after data sent via * via RDMA Send. The RPC-over-RDMA transport header is prepared
* RDMA has already been transmitted. There are three cases: * in sge[0], and the RPC xdr_buf is prepared in following sges.
* - The RPCRDMA header, RPC header, and payload are all sent in a *
* single RDMA_SEND. This is the "inline" case. * Depending on whether a Write list or Reply chunk is present,
* - The RPCRDMA header and some portion of the RPC header and data * the server may send all, a portion of, or none of the xdr_buf.
* are sent via this RDMA_SEND and another portion of the data is * In the latter case, only the transport header (sge[0]) is
* sent via RDMA. * transmitted.
* - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC *
* header and data are all transmitted via RDMA. * RDMA Send is the last step of transmitting an RPC reply. Pages
* In all three cases, this function prepares the RPCRDMA header in * involved in the earlier RDMA Writes are here transferred out
* sge[0], the 'type' parameter indicates the type to place in the * of the rqstp and into the ctxt's page array. These pages are
* RPCRDMA header, and the 'byte_count' field indicates how much of * DMA unmapped by each Write completion, but the subsequent Send
* the XDR to include in this RDMA_SEND. NB: The offset of the payload * completion finally releases these pages.
* to send is zero in the XDR. *
* Assumptions:
* - The Reply's transport header will never be larger than a page.
*/ */
static int send_reply(struct svcxprt_rdma *rdma, static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
__be32 *rdma_argp, __be32 *rdma_resp,
struct svc_rqst *rqstp, struct svc_rqst *rqstp,
struct page *page, __be32 *wr_lst, __be32 *rp_ch)
struct rpcrdma_msg *rdma_resp,
struct svc_rdma_req_map *vec,
int byte_count,
u32 inv_rkey)
{ {
struct svc_rdma_op_ctxt *ctxt; struct svc_rdma_op_ctxt *ctxt;
u32 xdr_off; u32 inv_rkey;
int sge_no; int ret;
int sge_bytes;
int ret = -EIO; dprintk("svcrdma: sending %s reply: head=%zu, pagelen=%u, tail=%zu\n",
(rp_ch ? "RDMA_NOMSG" : "RDMA_MSG"),
rqstp->rq_res.head[0].iov_len,
rqstp->rq_res.page_len,
rqstp->rq_res.tail[0].iov_len);
/* Prepare the context */
ctxt = svc_rdma_get_context(rdma); ctxt = svc_rdma_get_context(rdma);
ctxt->direction = DMA_TO_DEVICE;
ctxt->pages[0] = page;
ctxt->count = 1;
/* Prepare the SGE for the RPCRDMA Header */ ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey; svc_rdma_reply_hdr_len(rdma_resp));
ctxt->sge[0].length = if (ret < 0)
svc_rdma_xdr_get_reply_hdr_len((__be32 *)rdma_resp);
ctxt->sge[0].addr =
ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
ctxt->sge[0].length, DMA_TO_DEVICE);
if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
goto err; goto err;
svc_rdma_count_mappings(rdma, ctxt);
ctxt->direction = DMA_TO_DEVICE;
/* Map the payload indicated by 'byte_count' */ if (!rp_ch) {
xdr_off = 0; ret = svc_rdma_map_reply_msg(rdma, ctxt,
for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) { &rqstp->rq_res, wr_lst);
sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count); if (ret < 0)
byte_count -= sge_bytes;
ctxt->sge[sge_no].addr =
dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
sge_bytes, DMA_TO_DEVICE);
xdr_off += sge_bytes;
if (ib_dma_mapping_error(rdma->sc_cm_id->device,
ctxt->sge[sge_no].addr))
goto err;
svc_rdma_count_mappings(rdma, ctxt);
ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
ctxt->sge[sge_no].length = sge_bytes;
}
if (byte_count != 0) {
pr_err("svcrdma: Could not map %d bytes\n", byte_count);
goto err; goto err;
} }
svc_rdma_save_io_pages(rqstp, ctxt); svc_rdma_save_io_pages(rqstp, ctxt);
if (sge_no > rdma->sc_max_sge) { inv_rkey = 0;
pr_err("svcrdma: Too many sges (%d)\n", sge_no); if (rdma->sc_snd_w_inv)
goto err; inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
} ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, inv_rkey);
ret = svc_rdma_post_send_wr(rdma, ctxt, sge_no, inv_rkey);
if (ret) if (ret)
goto err; goto err;
return 0; return 0;
err: err:
pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
svc_rdma_unmap_dma(ctxt); svc_rdma_unmap_dma(ctxt);
svc_rdma_put_context(ctxt, 1); svc_rdma_put_context(ctxt, 1);
return ret; return ret;
...@@ -618,41 +625,36 @@ void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
{
}
/**
* svc_rdma_sendto - Transmit an RPC reply
* @rqstp: processed RPC request, reply XDR already in ::rq_res
*
* Any resources still associated with @rqstp are released upon return.
* If no reply message was possible, the connection is closed.
*
* Returns:
* %0 if an RPC reply has been successfully posted,
* %-ENOMEM if a resource shortage occurred (connection is lost),
* %-ENOTCONN if posting failed (connection is lost).
*/
int svc_rdma_sendto(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
-	struct rpcrdma_msg *rdma_argp;
-	struct rpcrdma_msg *rdma_resp;
-	struct rpcrdma_write_array *wr_ary, *rp_ary;
-	int ret;
-	int inline_bytes;
+	__be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
+	struct xdr_buf *xdr = &rqstp->rq_res;
	struct page *res_page;
-	struct svc_rdma_req_map *vec;
-	u32 inv_rkey;
-	__be32 *p;
-	dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
+	int ret;

-	/* Get the RDMA request header. The receive logic always
-	 * places this at the start of page 0.
+	/* Find the call's chunk lists to decide how to send the reply.
+	 * Receive places the Call's xprt header at the start of page 0.
	 */
	rdma_argp = page_address(rqstp->rq_pages[0]);
-	svc_rdma_get_write_arrays(rdma_argp, &wr_ary, &rp_ary);
-	inv_rkey = 0;
-	if (rdma->sc_snd_w_inv)
-		inv_rkey = svc_rdma_get_inv_rkey(&rdma_argp->rm_xid,
-						 (__be32 *)wr_ary,
-						 (__be32 *)rp_ary);
-	/* Build an req vec for the XDR */
-	vec = svc_rdma_get_req_map(rdma);
-	ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
-	if (ret)
-		goto err0;
-	inline_bytes = rqstp->rq_res.len;
+	svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);
+	dprintk("svcrdma: preparing response for XID 0x%08x\n",
+		be32_to_cpup(rdma_argp));

	/* Create the RDMA response header. xprt->xpt_mutex,
	 * acquired in svc_send(), serializes RPC replies. The
...@@ -666,54 +668,46 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
		goto err0;
	rdma_resp = page_address(res_page);

-	p = &rdma_resp->rm_xid;
-	*p++ = rdma_argp->rm_xid;
-	*p++ = rdma_argp->rm_vers;
+	p = rdma_resp;
+	*p++ = *rdma_argp;
+	*p++ = *(rdma_argp + 1);
	*p++ = rdma->sc_fc_credits;
-	*p++ = rp_ary ? rdma_nomsg : rdma_msg;
+	*p++ = rp_ch ? rdma_nomsg : rdma_msg;

	/* Start with empty chunks */
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p = xdr_zero;

-	/* Send any write-chunk data and build resp write-list */
-	if (wr_ary) {
-		ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
+	if (wr_lst) {
+		/* XXX: Presume the client sent only one Write chunk */
+		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
		if (ret < 0)
			goto err1;
-		inline_bytes -= ret + xdr_padsize(ret);
+		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
	}
-	/* Send any reply-list data and update resp reply-list */
-	if (rp_ary) {
-		ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
+	if (rp_ch) {
+		ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
		if (ret < 0)
			goto err1;
-		inline_bytes -= ret;
+		svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
	}

-	/* Post a fresh Receive buffer _before_ sending the reply */
	ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
	if (ret)
		goto err1;
-	ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec,
-			 inline_bytes, inv_rkey);
+	ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp,
+				      wr_lst, rp_ch);
	if (ret < 0)
		goto err0;
-	svc_rdma_put_req_map(rdma, vec);
-	dprintk("svcrdma: send_reply returns %d\n", ret);
-	return ret;
+	return 0;

 err1:
	put_page(res_page);
 err0:
-	svc_rdma_put_req_map(rdma, vec);
	pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
	       ret);
-	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	return -ENOTCONN;
}
...
...@@ -1053,6 +1053,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
	memset(&qp_attr, 0, sizeof qp_attr);
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
+	qp_attr.port_num = newxprt->sc_cm_id->port_num;
+	qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests;
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
...