Commit 85275c87 authored by Chuck Lever, committed by Anna Schumaker

xprtrdma: Allocate RPC/RDMA send buffer separately from struct rpcrdma_req

The rl_base field is currently the buffer where each RPC/RDMA call
header is built.

The inline threshold is an agreed-on size limit for RDMA SEND
operations that pass between client and server. The sum of the
RPC/RDMA header size and the RPC header size must be less than or
equal to this threshold.
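
As a rough illustration, the constraint amounts to the following sketch
(the helper name rpcrdma_fits_inline is hypothetical; the
RPCRDMA_INLINE_WRITE_THRESHOLD() macro is the client's send-side
threshold, as used in the hunks below):

        /* True only if the marshaled RPC/RDMA header plus the RPC call
         * header fit within a single inline RDMA SEND. */
        static bool rpcrdma_fits_inline(struct rpc_rqst *rqst,
                                        size_t hdrlen, size_t rpclen)
        {
                return hdrlen + rpclen <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
        }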

Increasing the r/wsize maximum will require MAX_SEGS to grow
significantly, but the inline threshold size won't change (both sides
agree on it), and the server's inline threshold doesn't change either.

Since an RPC/RDMA header can never be larger than the inline
threshold, make all RPC/RDMA header buffers the size of the
inline threshold.
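
In terms of the helpers this series relies on (rpcrdma_alloc_regbuf() and
rdmab_to_msg(), both visible in the hunks below), the resulting setup is
roughly the following sketch, with names borrowed from
xprt_rdma_allocate():

        /* Size the RPC/RDMA header buffer to the negotiated inline
         * threshold rather than a fixed MAX_RPCRDMAHDR array. */
        min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
        rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
        if (IS_ERR(rb))
                goto out_fail;
        req->rl_rdmabuf = rb;

        /* The call header is then built directly in that buffer. */
        headerp = rdmab_to_msg(req->rl_rdmabuf);
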
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent 0ca77dc3
@@ -294,7 +294,7 @@ ssize_t
 rpcrdma_marshal_chunks(struct rpc_rqst *rqst, ssize_t result)
 {
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-       struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)req->rl_base;
+       struct rpcrdma_msg *headerp = rdmab_to_msg(req->rl_rdmabuf);

        if (req->rl_rtype != rpcrdma_noch)
                result = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
@@ -406,8 +406,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
        base = rqst->rq_svec[0].iov_base;
        rpclen = rqst->rq_svec[0].iov_len;

-       /* build RDMA header in private area at front */
-       headerp = (struct rpcrdma_msg *) req->rl_base;
+       headerp = rdmab_to_msg(req->rl_rdmabuf);
        /* don't byte-swap XID, it's already done in request */
        headerp->rm_xid = rqst->rq_xid;
        headerp->rm_vers = rpcrdma_version;
@@ -528,7 +527,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
        dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd"
                " headerp 0x%p base 0x%p lkey 0x%x\n",
                __func__, transfertypes[req->rl_wtype], hdrlen, rpclen, padlen,
-               headerp, base, req->rl_iov.lkey);
+               headerp, base, rdmab_lkey(req->rl_rdmabuf));

        /*
         * initialize send_iov's - normally only two: rdma chunk header and
@@ -537,9 +536,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
         * header and any write data. In all non-rdma cases, any following
         * data has been copied into the RPC header buffer.
         */
-       req->rl_send_iov[0].addr = req->rl_iov.addr;
+       req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
        req->rl_send_iov[0].length = hdrlen;
-       req->rl_send_iov[0].lkey = req->rl_iov.lkey;
+       req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

        req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
        req->rl_send_iov[1].length = rpclen;
...
@@ -470,6 +470,8 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
        if (req == NULL)
                return NULL;

+       if (req->rl_rdmabuf == NULL)
+               goto out_rdmabuf;
        if (req->rl_sendbuf == NULL)
                goto out_sendbuf;
        if (size > req->rl_sendbuf->rg_size)
@@ -480,6 +482,13 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
        req->rl_connect_cookie = 0;     /* our reserved value */
        return req->rl_sendbuf->rg_base;

+out_rdmabuf:
+       min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+       rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
+       if (IS_ERR(rb))
+               goto out_fail;
+       req->rl_rdmabuf = rb;
+
 out_sendbuf:
        /* XDR encoding and RPC/RDMA marshaling of this request has not
         * yet occurred. Thus a lower bound is needed to prevent buffer
...
@@ -1078,30 +1078,14 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 static struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
-       struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-       size_t wlen = cdata->inline_wsize;
-       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_req *req;
-       int rc;

-       rc = -ENOMEM;
-       req = kmalloc(sizeof(*req) + wlen, GFP_KERNEL);
+       req = kzalloc(sizeof(*req), GFP_KERNEL);
        if (req == NULL)
-               goto out;
-
-       memset(req, 0, sizeof(*req));
-       rc = rpcrdma_register_internal(ia, req->rl_base, wlen,
-                                      &req->rl_handle, &req->rl_iov);
-       if (rc)
-               goto out_free;
+               return ERR_PTR(-ENOMEM);

        req->rl_buffer = &r_xprt->rx_buf;
        return req;
-
-out_free:
-       kfree(req);
-out:
-       return ERR_PTR(rc);
 }

 static struct rpcrdma_rep *
@@ -1333,7 +1317,7 @@ rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
                return;

        rpcrdma_free_regbuf(ia, req->rl_sendbuf);
-       rpcrdma_deregister_internal(ia, req->rl_handle, &req->rl_iov);
+       rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
        kfree(req);
 }
...
@@ -268,12 +268,10 @@ struct rpcrdma_req {
        enum rpcrdma_chunktype  rl_rtype, rl_wtype;
        struct rpcrdma_buffer *rl_buffer; /* home base for this structure */
        struct rpcrdma_rep      *rl_reply;/* holder for reply buffer */
-       struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */
        struct ib_sge   rl_send_iov[4]; /* for active requests */
+       struct rpcrdma_regbuf *rl_rdmabuf;
        struct rpcrdma_regbuf *rl_sendbuf;
-       struct ib_sge   rl_iov;         /* for posting */
-       struct ib_mr    *rl_handle;     /* handle for mem in rl_iov */
-       char            rl_base[MAX_RPCRDMAHDR]; /* start of actual buffer */
+       struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
 };

 static inline struct rpcrdma_req *
...