Commit 76e5492b authored by Chuck Lever's avatar Chuck Lever

NFSD: Invoke svc_encode_result_payload() in "read" NFSD encoders

Have the NFSD encoders annotate the boundaries of every
direct-data-placement eligible result data payload. Then change
svcrdma to use that annotation instead of the xdr->page_len
when handling Write chunks.

For NFSv4 on RDMA, that enables the ability to recognize multiple
result payloads per compound. This is a pre-requisite for supporting
multiple Write chunks per RPC transaction.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent 03493bca
...@@ -707,6 +707,7 @@ int ...@@ -707,6 +707,7 @@ int
nfs3svc_encode_readlinkres(struct svc_rqst *rqstp, __be32 *p) nfs3svc_encode_readlinkres(struct svc_rqst *rqstp, __be32 *p)
{ {
struct nfsd3_readlinkres *resp = rqstp->rq_resp; struct nfsd3_readlinkres *resp = rqstp->rq_resp;
struct kvec *head = rqstp->rq_res.head;
*p++ = resp->status; *p++ = resp->status;
p = encode_post_op_attr(rqstp, p, &resp->fh); p = encode_post_op_attr(rqstp, p, &resp->fh);
...@@ -720,6 +721,8 @@ nfs3svc_encode_readlinkres(struct svc_rqst *rqstp, __be32 *p) ...@@ -720,6 +721,8 @@ nfs3svc_encode_readlinkres(struct svc_rqst *rqstp, __be32 *p)
*p = 0; *p = 0;
rqstp->rq_res.tail[0].iov_len = 4 - (resp->len&3); rqstp->rq_res.tail[0].iov_len = 4 - (resp->len&3);
} }
if (svc_encode_result_payload(rqstp, head->iov_len, resp->len))
return 0;
return 1; return 1;
} else } else
return xdr_ressize_check(rqstp, p); return xdr_ressize_check(rqstp, p);
...@@ -730,6 +733,7 @@ int ...@@ -730,6 +733,7 @@ int
nfs3svc_encode_readres(struct svc_rqst *rqstp, __be32 *p) nfs3svc_encode_readres(struct svc_rqst *rqstp, __be32 *p)
{ {
struct nfsd3_readres *resp = rqstp->rq_resp; struct nfsd3_readres *resp = rqstp->rq_resp;
struct kvec *head = rqstp->rq_res.head;
*p++ = resp->status; *p++ = resp->status;
p = encode_post_op_attr(rqstp, p, &resp->fh); p = encode_post_op_attr(rqstp, p, &resp->fh);
...@@ -746,6 +750,9 @@ nfs3svc_encode_readres(struct svc_rqst *rqstp, __be32 *p) ...@@ -746,6 +750,9 @@ nfs3svc_encode_readres(struct svc_rqst *rqstp, __be32 *p)
*p = 0; *p = 0;
rqstp->rq_res.tail[0].iov_len = 4 - (resp->count & 3); rqstp->rq_res.tail[0].iov_len = 4 - (resp->count & 3);
} }
if (svc_encode_result_payload(rqstp, head->iov_len,
resp->count))
return 0;
return 1; return 1;
} else } else
return xdr_ressize_check(rqstp, p); return xdr_ressize_check(rqstp, p);
......
...@@ -3756,8 +3756,8 @@ static __be32 nfsd4_encode_splice_read( ...@@ -3756,8 +3756,8 @@ static __be32 nfsd4_encode_splice_read(
{ {
struct xdr_stream *xdr = &resp->xdr; struct xdr_stream *xdr = &resp->xdr;
struct xdr_buf *buf = xdr->buf; struct xdr_buf *buf = xdr->buf;
int status, space_left;
u32 eof; u32 eof;
int space_left;
__be32 nfserr; __be32 nfserr;
__be32 *p = xdr->p - 2; __be32 *p = xdr->p - 2;
...@@ -3768,14 +3768,13 @@ static __be32 nfsd4_encode_splice_read( ...@@ -3768,14 +3768,13 @@ static __be32 nfsd4_encode_splice_read(
nfserr = nfsd_splice_read(read->rd_rqstp, read->rd_fhp, nfserr = nfsd_splice_read(read->rd_rqstp, read->rd_fhp,
file, read->rd_offset, &maxcount, &eof); file, read->rd_offset, &maxcount, &eof);
read->rd_length = maxcount; read->rd_length = maxcount;
if (nfserr) { if (nfserr)
/* goto out_err;
* nfsd_splice_actor may have already messed with the status = svc_encode_result_payload(read->rd_rqstp,
* page length; reset it so as not to confuse buf->head[0].iov_len, maxcount);
* xdr_truncate_encode: if (status) {
*/ nfserr = nfserrno(status);
buf->page_len = 0; goto out_err;
return nfserr;
} }
*(p++) = htonl(eof); *(p++) = htonl(eof);
...@@ -3806,6 +3805,15 @@ static __be32 nfsd4_encode_splice_read( ...@@ -3806,6 +3805,15 @@ static __be32 nfsd4_encode_splice_read(
xdr->end = (__be32 *)((void *)xdr->end + space_left); xdr->end = (__be32 *)((void *)xdr->end + space_left);
return 0; return 0;
out_err:
/*
* nfsd_splice_actor may have already messed with the
* page length; reset it so as not to confuse
* xdr_truncate_encode in our caller.
*/
buf->page_len = 0;
return nfserr;
} }
static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp, static __be32 nfsd4_encode_readv(struct nfsd4_compoundres *resp,
...@@ -3897,6 +3905,7 @@ nfsd4_encode_readlink(struct nfsd4_compoundres *resp, __be32 nfserr, struct nfsd ...@@ -3897,6 +3905,7 @@ nfsd4_encode_readlink(struct nfsd4_compoundres *resp, __be32 nfserr, struct nfsd
int zero = 0; int zero = 0;
struct xdr_stream *xdr = &resp->xdr; struct xdr_stream *xdr = &resp->xdr;
int length_offset = xdr->buf->len; int length_offset = xdr->buf->len;
int status;
__be32 *p; __be32 *p;
p = xdr_reserve_space(xdr, 4); p = xdr_reserve_space(xdr, 4);
...@@ -3917,9 +3926,13 @@ nfsd4_encode_readlink(struct nfsd4_compoundres *resp, __be32 nfserr, struct nfsd ...@@ -3917,9 +3926,13 @@ nfsd4_encode_readlink(struct nfsd4_compoundres *resp, __be32 nfserr, struct nfsd
(char *)p, &maxcount); (char *)p, &maxcount);
if (nfserr == nfserr_isdir) if (nfserr == nfserr_isdir)
nfserr = nfserr_inval; nfserr = nfserr_inval;
if (nfserr) { if (nfserr)
xdr_truncate_encode(xdr, length_offset); goto out_err;
return nfserr; status = svc_encode_result_payload(readlink->rl_rqstp, length_offset,
maxcount);
if (status) {
nfserr = nfserrno(status);
goto out_err;
} }
wire_count = htonl(maxcount); wire_count = htonl(maxcount);
...@@ -3929,6 +3942,10 @@ nfsd4_encode_readlink(struct nfsd4_compoundres *resp, __be32 nfserr, struct nfsd ...@@ -3929,6 +3942,10 @@ nfsd4_encode_readlink(struct nfsd4_compoundres *resp, __be32 nfserr, struct nfsd
write_bytes_to_xdr_buf(xdr->buf, length_offset + 4 + maxcount, write_bytes_to_xdr_buf(xdr->buf, length_offset + 4 + maxcount,
&zero, 4 - (maxcount&3)); &zero, 4 - (maxcount&3));
return 0; return 0;
out_err:
xdr_truncate_encode(xdr, length_offset);
return nfserr;
} }
static __be32 static __be32
......
...@@ -469,6 +469,7 @@ int ...@@ -469,6 +469,7 @@ int
nfssvc_encode_readlinkres(struct svc_rqst *rqstp, __be32 *p) nfssvc_encode_readlinkres(struct svc_rqst *rqstp, __be32 *p)
{ {
struct nfsd_readlinkres *resp = rqstp->rq_resp; struct nfsd_readlinkres *resp = rqstp->rq_resp;
struct kvec *head = rqstp->rq_res.head;
*p++ = resp->status; *p++ = resp->status;
if (resp->status != nfs_ok) if (resp->status != nfs_ok)
...@@ -483,6 +484,8 @@ nfssvc_encode_readlinkres(struct svc_rqst *rqstp, __be32 *p) ...@@ -483,6 +484,8 @@ nfssvc_encode_readlinkres(struct svc_rqst *rqstp, __be32 *p)
*p = 0; *p = 0;
rqstp->rq_res.tail[0].iov_len = 4 - (resp->len&3); rqstp->rq_res.tail[0].iov_len = 4 - (resp->len&3);
} }
if (svc_encode_result_payload(rqstp, head->iov_len, resp->len))
return 0;
return 1; return 1;
} }
...@@ -490,6 +493,7 @@ int ...@@ -490,6 +493,7 @@ int
nfssvc_encode_readres(struct svc_rqst *rqstp, __be32 *p) nfssvc_encode_readres(struct svc_rqst *rqstp, __be32 *p)
{ {
struct nfsd_readres *resp = rqstp->rq_resp; struct nfsd_readres *resp = rqstp->rq_resp;
struct kvec *head = rqstp->rq_res.head;
*p++ = resp->status; *p++ = resp->status;
if (resp->status != nfs_ok) if (resp->status != nfs_ok)
...@@ -507,6 +511,8 @@ nfssvc_encode_readres(struct svc_rqst *rqstp, __be32 *p) ...@@ -507,6 +511,8 @@ nfssvc_encode_readres(struct svc_rqst *rqstp, __be32 *p)
*p = 0; *p = 0;
rqstp->rq_res.tail[0].iov_len = 4 - (resp->count&3); rqstp->rq_res.tail[0].iov_len = 4 - (resp->count&3);
} }
if (svc_encode_result_payload(rqstp, head->iov_len, resp->count))
return 0;
return 1; return 1;
} }
......
...@@ -448,7 +448,6 @@ static ssize_t svc_rdma_encode_write_chunk(__be32 *src, ...@@ -448,7 +448,6 @@ static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
* svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list * svc_rdma_encode_write_list - Encode RPC Reply's Write chunk list
* @rctxt: Reply context with information about the RPC Call * @rctxt: Reply context with information about the RPC Call
* @sctxt: Send context for the RPC Reply * @sctxt: Send context for the RPC Reply
* @length: size in bytes of the payload in the first Write chunk
* *
* The client provides a Write chunk list in the Call message. Fill * The client provides a Write chunk list in the Call message. Fill
* in the segments in the first Write chunk in the Reply's transport * in the segments in the first Write chunk in the Reply's transport
...@@ -465,12 +464,12 @@ static ssize_t svc_rdma_encode_write_chunk(__be32 *src, ...@@ -465,12 +464,12 @@ static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
*/ */
static ssize_t static ssize_t
svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt, svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
struct svc_rdma_send_ctxt *sctxt, struct svc_rdma_send_ctxt *sctxt)
unsigned int length)
{ {
ssize_t len, ret; ssize_t len, ret;
ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt, length); ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
rctxt->rc_read_payload_length);
if (ret < 0) if (ret < 0)
return ret; return ret;
len = ret; len = ret;
...@@ -923,21 +922,12 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) ...@@ -923,21 +922,12 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
goto err0; goto err0;
if (wr_lst) { if (wr_lst) {
/* XXX: Presume the client sent only one Write chunk */ /* XXX: Presume the client sent only one Write chunk */
unsigned long offset; ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr,
unsigned int length; rctxt->rc_read_payload_offset,
rctxt->rc_read_payload_length);
if (rctxt->rc_read_payload_length) {
offset = rctxt->rc_read_payload_offset;
length = rctxt->rc_read_payload_length;
} else {
offset = xdr->head[0].iov_len;
length = xdr->page_len;
}
ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr, offset,
length);
if (ret < 0) if (ret < 0)
goto err2; goto err2;
if (svc_rdma_encode_write_list(rctxt, sctxt, length) < 0) if (svc_rdma_encode_write_list(rctxt, sctxt) < 0)
goto err0; goto err0;
} else { } else {
if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0) if (xdr_stream_encode_item_absent(&sctxt->sc_stream) < 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment