Commit 835a922a authored by Neil Brown's avatar Neil Brown Committed by Linus Torvalds

[PATCH] kNFSd: Support zero-copy read for NFSD

From Hirokazu Takahashi <taka@valinux.co.jp>

This patch changes read and readdir in nfsd.

read:
  If the file supports readpage, we use it to collect pages out of the page
  cache and to attach them directly to the outgoing nfs reply.
  This reduces the number of copies by one, and if the filesystem/device
  driver didn't copy the data, and if the network card can support not copying
  the data, then you get zero-copy reads.

readdir:
  A separate page is used for storing the readdir response so that a full
  PAGE_SIZE bytes of reply can be supported.
parent 032d3607
...@@ -436,35 +436,28 @@ static int ...@@ -436,35 +436,28 @@ static int
nfsd3_proc_readdir(struct svc_rqst *rqstp, struct nfsd3_readdirargs *argp, nfsd3_proc_readdir(struct svc_rqst *rqstp, struct nfsd3_readdirargs *argp,
struct nfsd3_readdirres *resp) struct nfsd3_readdirres *resp)
{ {
u32 * buffer;
int nfserr, count; int nfserr, count;
unsigned int want;
dprintk("nfsd: READDIR(3) %s %d bytes at %d\n", dprintk("nfsd: READDIR(3) %s %d bytes at %d\n",
SVCFH_fmt(&argp->fh), SVCFH_fmt(&argp->fh),
argp->count, (u32) argp->cookie); argp->count, (u32) argp->cookie);
/* Reserve buffer space for status, attributes and verifier */
svcbuf_reserve(&rqstp->rq_res, &buffer, &count,
1 + NFS3_POST_OP_ATTR_WORDS + 2);
/* Make sure we've room for the NULL ptr & eof flag, and shrink to /* Make sure we've room for the NULL ptr & eof flag, and shrink to
* client read size */ * client read size */
if ((count -= 2) > (want = (argp->count >> 2) - 2)) count = (argp->count >> 2) - 2;
count = want;
/* Read directory and encode entries on the fly */ /* Read directory and encode entries on the fly */
fh_copy(&resp->fh, &argp->fh); fh_copy(&resp->fh, &argp->fh);
resp->buflen = count; resp->buflen = count;
resp->common.err = nfs_ok; resp->common.err = nfs_ok;
resp->buffer = buffer; resp->buffer = argp->buffer;
resp->offset = NULL; resp->offset = NULL;
resp->rqstp = rqstp; resp->rqstp = rqstp;
nfserr = nfsd_readdir(rqstp, &resp->fh, (loff_t*) &argp->cookie, nfserr = nfsd_readdir(rqstp, &resp->fh, (loff_t*) &argp->cookie,
&resp->common, nfs3svc_encode_entry); &resp->common, nfs3svc_encode_entry);
memcpy(resp->verf, argp->verf, 8); memcpy(resp->verf, argp->verf, 8);
resp->count = resp->buffer - buffer; resp->count = resp->buffer - argp->buffer;
if (resp->offset) if (resp->offset)
xdr_encode_hyper(resp->offset, argp->cookie); xdr_encode_hyper(resp->offset, argp->cookie);
...@@ -479,35 +472,29 @@ static int ...@@ -479,35 +472,29 @@ static int
nfsd3_proc_readdirplus(struct svc_rqst *rqstp, struct nfsd3_readdirargs *argp, nfsd3_proc_readdirplus(struct svc_rqst *rqstp, struct nfsd3_readdirargs *argp,
struct nfsd3_readdirres *resp) struct nfsd3_readdirres *resp)
{ {
u32 * buffer; int nfserr, count;
int nfserr, count, want;
loff_t offset; loff_t offset;
dprintk("nfsd: READDIR+(3) %s %d bytes at %d\n", dprintk("nfsd: READDIR+(3) %s %d bytes at %d\n",
SVCFH_fmt(&argp->fh), SVCFH_fmt(&argp->fh),
argp->count, (u32) argp->cookie); argp->count, (u32) argp->cookie);
/* Reserve buffer space for status, attributes and verifier */
svcbuf_reserve(&rqstp->rq_res, &buffer, &count,
1 + NFS3_POST_OP_ATTR_WORDS + 2);
/* Make sure we've room for the NULL ptr & eof flag, and shrink to /* Make sure we've room for the NULL ptr & eof flag, and shrink to
* client read size */ * client read size */
if ((count -= 2) > (want = argp->count >> 2)) count = (argp->count >> 2) - 2;
count = want;
/* Read directory and encode entries on the fly */ /* Read directory and encode entries on the fly */
fh_copy(&resp->fh, &argp->fh); fh_copy(&resp->fh, &argp->fh);
resp->buflen = count; resp->buflen = count;
resp->common.err = nfs_ok; resp->common.err = nfs_ok;
resp->buffer = buffer; resp->buffer = argp->buffer;
resp->rqstp = rqstp; resp->rqstp = rqstp;
offset = argp->cookie; offset = argp->cookie;
nfserr = nfsd_readdir(rqstp, &resp->fh, &offset, nfserr = nfsd_readdir(rqstp, &resp->fh, &offset,
&resp->common, nfs3svc_encode_entry_plus); &resp->common, nfs3svc_encode_entry_plus);
memcpy(resp->verf, argp->verf, 8); memcpy(resp->verf, argp->verf, 8);
resp->count = resp->buffer - buffer; resp->count = resp->buffer - argp->buffer;
if (resp->offset) if (resp->offset)
xdr_encode_hyper(resp->offset, offset); xdr_encode_hyper(resp->offset, offset);
......
...@@ -490,6 +490,12 @@ nfs3svc_decode_readdirargs(struct svc_rqst *rqstp, u32 *p, ...@@ -490,6 +490,12 @@ nfs3svc_decode_readdirargs(struct svc_rqst *rqstp, u32 *p,
args->dircount = ~0; args->dircount = ~0;
args->count = ntohl(*p++); args->count = ntohl(*p++);
if (args->count > PAGE_SIZE)
args->count = PAGE_SIZE;
svc_take_page(rqstp);
args->buffer = page_address(rqstp->rq_respages[rqstp->rq_resused-1]);
return xdr_argsize_check(rqstp, p); return xdr_argsize_check(rqstp, p);
} }
...@@ -504,6 +510,9 @@ nfs3svc_decode_readdirplusargs(struct svc_rqst *rqstp, u32 *p, ...@@ -504,6 +510,9 @@ nfs3svc_decode_readdirplusargs(struct svc_rqst *rqstp, u32 *p,
args->dircount = ntohl(*p++); args->dircount = ntohl(*p++);
args->count = ntohl(*p++); args->count = ntohl(*p++);
svc_take_page(rqstp);
args->buffer = page_address(rqstp->rq_respages[rqstp->rq_resused-1]);
return xdr_argsize_check(rqstp, p); return xdr_argsize_check(rqstp, p);
} }
...@@ -600,7 +609,6 @@ nfs3svc_encode_readres(struct svc_rqst *rqstp, u32 *p, ...@@ -600,7 +609,6 @@ nfs3svc_encode_readres(struct svc_rqst *rqstp, u32 *p,
*p++ = htonl(resp->count); /* xdr opaque count */ *p++ = htonl(resp->count); /* xdr opaque count */
xdr_ressize_check(rqstp, p); xdr_ressize_check(rqstp, p);
/* now update rqstp->rq_res to reflect data aswell */ /* now update rqstp->rq_res to reflect data aswell */
rqstp->rq_res.page_base = 0;
rqstp->rq_res.page_len = resp->count; rqstp->rq_res.page_len = resp->count;
if (resp->count & 3) { if (resp->count & 3) {
/* need to pad the tail */ /* need to pad the tail */
...@@ -676,11 +684,16 @@ nfs3svc_encode_readdirres(struct svc_rqst *rqstp, u32 *p, ...@@ -676,11 +684,16 @@ nfs3svc_encode_readdirres(struct svc_rqst *rqstp, u32 *p,
if (resp->status == 0) { if (resp->status == 0) {
/* stupid readdir cookie */ /* stupid readdir cookie */
memcpy(p, resp->verf, 8); p += 2; memcpy(p, resp->verf, 8); p += 2;
xdr_ressize_check(rqstp, p);
p = resp->buffer; p = resp->buffer;
*p++ = 0; /* no more entries */ *p++ = 0; /* no more entries */
*p++ = htonl(resp->common.err == nfserr_eof); *p++ = htonl(resp->common.err == nfserr_eof);
} rqstp->rq_res.page_len = ((unsigned long)p & ~PAGE_MASK);
rqstp->rq_res.len =
rqstp->rq_res.head[0].iov_len+
rqstp->rq_res.page_len;
return 1;
} else
return xdr_ressize_check(rqstp, p); return xdr_ressize_check(rqstp, p);
} }
......
...@@ -467,7 +467,6 @@ static int ...@@ -467,7 +467,6 @@ static int
nfsd_proc_readdir(struct svc_rqst *rqstp, struct nfsd_readdirargs *argp, nfsd_proc_readdir(struct svc_rqst *rqstp, struct nfsd_readdirargs *argp,
struct nfsd_readdirres *resp) struct nfsd_readdirres *resp)
{ {
u32 * buffer;
int nfserr, count; int nfserr, count;
loff_t offset; loff_t offset;
...@@ -475,19 +474,15 @@ nfsd_proc_readdir(struct svc_rqst *rqstp, struct nfsd_readdirargs *argp, ...@@ -475,19 +474,15 @@ nfsd_proc_readdir(struct svc_rqst *rqstp, struct nfsd_readdirargs *argp,
SVCFH_fmt(&argp->fh), SVCFH_fmt(&argp->fh),
argp->count, argp->cookie); argp->count, argp->cookie);
/* Reserve buffer space for status */
svcbuf_reserve(&rqstp->rq_res, &buffer, &count, 1);
/* Shrink to the client read size */ /* Shrink to the client read size */
if (count > (argp->count >> 2)) count = (argp->count >> 2) - 2;
count = argp->count >> 2;
/* Make sure we've room for the NULL ptr & eof flag */ /* Make sure we've room for the NULL ptr & eof flag */
count -= 2; count -= 2;
if (count < 0) if (count < 0)
count = 0; count = 0;
resp->buffer = buffer; resp->buffer = argp->buffer;
resp->offset = NULL; resp->offset = NULL;
resp->buflen = count; resp->buflen = count;
resp->common.err = nfs_ok; resp->common.err = nfs_ok;
...@@ -496,7 +491,7 @@ nfsd_proc_readdir(struct svc_rqst *rqstp, struct nfsd_readdirargs *argp, ...@@ -496,7 +491,7 @@ nfsd_proc_readdir(struct svc_rqst *rqstp, struct nfsd_readdirargs *argp,
nfserr = nfsd_readdir(rqstp, &argp->fh, &offset, nfserr = nfsd_readdir(rqstp, &argp->fh, &offset,
&resp->common, nfssvc_encode_entry); &resp->common, nfssvc_encode_entry);
resp->count = resp->buffer - buffer; resp->count = resp->buffer - argp->buffer;
if (resp->offset) if (resp->offset)
*resp->offset = (u32)offset; *resp->offset = (u32)offset;
......
...@@ -337,6 +337,11 @@ nfssvc_decode_readdirargs(struct svc_rqst *rqstp, u32 *p, ...@@ -337,6 +337,11 @@ nfssvc_decode_readdirargs(struct svc_rqst *rqstp, u32 *p,
return 0; return 0;
args->cookie = ntohl(*p++); args->cookie = ntohl(*p++);
args->count = ntohl(*p++); args->count = ntohl(*p++);
if (args->count > PAGE_SIZE)
args->count = PAGE_SIZE;
svc_take_page(rqstp);
args->buffer = page_address(rqstp->rq_respages[rqstp->rq_resused-1]);
return xdr_argsize_check(rqstp, p); return xdr_argsize_check(rqstp, p);
} }
...@@ -385,7 +390,6 @@ nfssvc_encode_readres(struct svc_rqst *rqstp, u32 *p, ...@@ -385,7 +390,6 @@ nfssvc_encode_readres(struct svc_rqst *rqstp, u32 *p,
xdr_ressize_check(rqstp, p); xdr_ressize_check(rqstp, p);
/* now update rqstp->rq_res to reflect data aswell */ /* now update rqstp->rq_res to reflect data aswell */
rqstp->rq_res.page_base = 0;
rqstp->rq_res.page_len = resp->count; rqstp->rq_res.page_len = resp->count;
if (resp->count & 3) { if (resp->count & 3) {
/* need to pad the tail */ /* need to pad the tail */
...@@ -404,11 +408,16 @@ int ...@@ -404,11 +408,16 @@ int
nfssvc_encode_readdirres(struct svc_rqst *rqstp, u32 *p, nfssvc_encode_readdirres(struct svc_rqst *rqstp, u32 *p,
struct nfsd_readdirres *resp) struct nfsd_readdirres *resp)
{ {
xdr_ressize_check(rqstp, p);
p = resp->buffer; p = resp->buffer;
*p++ = 0; /* no more entries */ *p++ = 0; /* no more entries */
*p++ = htonl((resp->common.err == nfserr_eof)); *p++ = htonl((resp->common.err == nfserr_eof));
rqstp->rq_res.page_len = ((unsigned long)p & ~PAGE_MASK);
rqstp->rq_res.len =
rqstp->rq_res.head[0].iov_len+
rqstp->rq_res.page_len;
return xdr_ressize_check(rqstp, p); return 1;
} }
int int
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* dentry, don't worry--they have been taken care of. * dentry, don't worry--they have been taken care of.
* *
* Copyright (C) 1995-1999 Olaf Kirch <okir@monad.swb.de> * Copyright (C) 1995-1999 Olaf Kirch <okir@monad.swb.de>
* Zerocopy NFS support (C) 2002 Hirokazu Takahashi <taka@valinux.co.jp>
*/ */
#include <linux/config.h> #include <linux/config.h>
...@@ -28,6 +29,7 @@ ...@@ -28,6 +29,7 @@
#include <linux/net.h> #include <linux/net.h>
#include <linux/unistd.h> #include <linux/unistd.h>
#include <linux/slab.h> #include <linux/slab.h>
#include <linux/pagemap.h>
#include <linux/in.h> #include <linux/in.h>
#include <linux/module.h> #include <linux/module.h>
#include <linux/namei.h> #include <linux/namei.h>
...@@ -570,6 +572,61 @@ nfsd_get_raparms(dev_t dev, ino_t ino) ...@@ -570,6 +572,61 @@ nfsd_get_raparms(dev_t dev, ino_t ino)
return ra; return ra;
} }
/*
 * Read actor that attaches cached pages associated with a file to the
 * reply held in the svc_rqst rather than copying their contents.  An
 * extra reference is taken on every page stored in rq_respages so that
 * the pages can be passed to the network sendmsg/sendpage routines
 * directly.  They will be released after the sending has completed.
 *
 * desc->buf carries the struct svc_rqst.  Returns the number of bytes
 * accepted from this page, which is never more than desc->count.
 */
static int
nfsd_read_actor(read_descriptor_t *desc, struct page *page, unsigned long offset , unsigned long size)
{
	struct svc_rqst *rqstp = (struct svc_rqst *)desc->buf;
	unsigned long remaining = desc->count;
	unsigned long chunk = (size > remaining) ? remaining : size;
	int first = (rqstp->rq_res.page_len == 0);

	/*
	 * Take a reference and record the page, unless it is the page
	 * already sitting in the last used rq_respages slot.
	 */
	if (first || page != rqstp->rq_respages[rqstp->rq_resused-1]) {
		get_page(page);
		rqstp->rq_respages[rqstp->rq_resused++] = page;
	}

	/* Only the first chunk of page data sets where the data starts. */
	if (first)
		rqstp->rq_res.page_base = offset;
	rqstp->rq_res.page_len += chunk;

	desc->count = remaining - chunk;
	desc->written += chunk;
	return chunk;
}
static inline ssize_t
nfsd_getpages(struct file *filp, struct svc_rqst *rqstp, unsigned long count)
{
read_descriptor_t desc;
ssize_t retval;
if (!count)
return 0;
svc_pushback_unused_pages(rqstp);
desc.written = 0;
desc.count = count;
desc.buf = (char *)rqstp;
desc.error = 0;
do_generic_file_read(filp, &filp->f_pos, &desc, nfsd_read_actor);
retval = desc.written;
if (!retval)
retval = desc.error;
return retval;
}
/* /*
* Read data from a file. count must contain the requested read count * Read data from a file. count must contain the requested read count
* on entry. On return, *count contains the number of bytes actually read. * on entry. On return, *count contains the number of bytes actually read.
...@@ -601,10 +658,15 @@ nfsd_read(struct svc_rqst *rqstp, struct svc_fh *fhp, loff_t offset, ...@@ -601,10 +658,15 @@ nfsd_read(struct svc_rqst *rqstp, struct svc_fh *fhp, loff_t offset,
if (ra) if (ra)
file.f_ra = ra->p_ra; file.f_ra = ra->p_ra;
if (inode->i_mapping->a_ops->readpage) {
file.f_pos = offset;
err = nfsd_getpages(&file, rqstp, *count);
} else {
oldfs = get_fs(); oldfs = get_fs();
set_fs(KERNEL_DS); set_fs(KERNEL_DS);
err = vfs_readv(&file, vec, vlen, *count, &offset); err = vfs_readv(&file, vec, vlen, *count, &offset);
set_fs(oldfs); set_fs(oldfs);
}
/* Write back readahead params */ /* Write back readahead params */
if (ra) if (ra)
......
...@@ -77,6 +77,7 @@ struct nfsd_readdirargs { ...@@ -77,6 +77,7 @@ struct nfsd_readdirargs {
struct svc_fh fh; struct svc_fh fh;
__u32 cookie; __u32 cookie;
__u32 count; __u32 count;
u32 * buffer;
}; };
struct nfsd_attrstat { struct nfsd_attrstat {
......
...@@ -96,6 +96,7 @@ struct nfsd3_readdirargs { ...@@ -96,6 +96,7 @@ struct nfsd3_readdirargs {
__u32 dircount; __u32 dircount;
__u32 count; __u32 count;
__u32 * verf; __u32 * verf;
u32 * buffer;
}; };
struct nfsd3_commitargs { struct nfsd3_commitargs {
......
...@@ -192,6 +192,19 @@ static void inline svc_pushback_allpages(struct svc_rqst *rqstp) ...@@ -192,6 +192,19 @@ static void inline svc_pushback_allpages(struct svc_rqst *rqstp)
} }
} }
/*
 * Walk rq_respages backwards from the last used slot.  Every non-NULL
 * page found is stored at rq_argpages[rq_arghi++] and its rq_respages
 * slot is cleared.  The walk stops once the slot that rq_res.pages
 * points at has been handled, or when rq_resused reaches zero.
 */
static void inline svc_pushback_unused_pages(struct svc_rqst *rqstp)
{
	while (rqstp->rq_resused) {
		struct page **slot = &rqstp->rq_respages[--rqstp->rq_resused];

		if (*slot != NULL) {
			rqstp->rq_argpages[rqstp->rq_arghi++] = *slot;
			*slot = NULL;
		}
		if (slot == rqstp->rq_res.pages)
			break;
	}
}
static void inline svc_free_allpages(struct svc_rqst *rqstp) static void inline svc_free_allpages(struct svc_rqst *rqstp)
{ {
while (rqstp->rq_resused) { while (rqstp->rq_resused) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment