Commit d488b9d0 authored by Trond Myklebust's avatar Trond Myklebust Committed by Anna Schumaker

pnfs/flexfiles: enable localio support

If the DS is local to this client use localio to write the data.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
Signed-off-by: default avatarMike Snitzer <snitzer@kernel.org>
Reviewed-by: default avatarNeilBrown <neilb@suse.de>
Reviewed-by: default avatarJeff Layton <jlayton@kernel.org>
Signed-off-by: default avatarAnna Schumaker <anna.schumaker@oracle.com>
parent fa88a7d6
......@@ -11,6 +11,7 @@
#include <linux/nfs_mount.h>
#include <linux/nfs_page.h>
#include <linux/module.h>
#include <linux/file.h>
#include <linux/sched/mm.h>
#include <linux/sunrpc/metrics.h>
......@@ -162,6 +163,21 @@ decode_name(struct xdr_stream *xdr, u32 *id)
return 0;
}
static struct nfsd_file *
ff_local_open_fh(struct nfs_client *clp, const struct cred *cred,
struct nfs_fh *fh, fmode_t mode)
{
if (mode & FMODE_WRITE) {
/*
* Always request read and write access since this corresponds
* to a rw layout.
*/
mode |= FMODE_READ;
}
return nfs_local_open_fh(clp, cred, fh, mode);
}
static bool ff_mirror_match_fh(const struct nfs4_ff_layout_mirror *m1,
const struct nfs4_ff_layout_mirror *m2)
{
......@@ -1756,6 +1772,7 @@ ff_layout_read_pagelist(struct nfs_pgio_header *hdr)
struct pnfs_layout_segment *lseg = hdr->lseg;
struct nfs4_pnfs_ds *ds;
struct rpc_clnt *ds_clnt;
struct nfsd_file *localio;
struct nfs4_ff_layout_mirror *mirror;
const struct cred *ds_cred;
loff_t offset = hdr->args.offset;
......@@ -1802,11 +1819,18 @@ ff_layout_read_pagelist(struct nfs_pgio_header *hdr)
hdr->args.offset = offset;
hdr->mds_offset = offset;
/* Start IO accounting for local read */
localio = ff_local_open_fh(ds->ds_clp, ds_cred, fh, FMODE_READ);
if (localio) {
hdr->task.tk_start = ktime_get();
ff_layout_read_record_layoutstats_start(&hdr->task, hdr);
}
/* Perform an asynchronous read to ds */
nfs_initiate_pgio(ds_clnt, hdr, ds_cred, ds->ds_clp->rpc_ops,
vers == 3 ? &ff_layout_read_call_ops_v3 :
&ff_layout_read_call_ops_v4,
0, RPC_TASK_SOFTCONN, NULL);
0, RPC_TASK_SOFTCONN, localio);
put_cred(ds_cred);
return PNFS_ATTEMPTED;
......@@ -1826,6 +1850,7 @@ ff_layout_write_pagelist(struct nfs_pgio_header *hdr, int sync)
struct pnfs_layout_segment *lseg = hdr->lseg;
struct nfs4_pnfs_ds *ds;
struct rpc_clnt *ds_clnt;
struct nfsd_file *localio;
struct nfs4_ff_layout_mirror *mirror;
const struct cred *ds_cred;
loff_t offset = hdr->args.offset;
......@@ -1870,11 +1895,19 @@ ff_layout_write_pagelist(struct nfs_pgio_header *hdr, int sync)
*/
hdr->args.offset = offset;
/* Start IO accounting for local write */
localio = ff_local_open_fh(ds->ds_clp, ds_cred, fh,
FMODE_READ|FMODE_WRITE);
if (localio) {
hdr->task.tk_start = ktime_get();
ff_layout_write_record_layoutstats_start(&hdr->task, hdr);
}
/* Perform an asynchronous write */
nfs_initiate_pgio(ds_clnt, hdr, ds_cred, ds->ds_clp->rpc_ops,
vers == 3 ? &ff_layout_write_call_ops_v3 :
&ff_layout_write_call_ops_v4,
sync, RPC_TASK_SOFTCONN, NULL);
sync, RPC_TASK_SOFTCONN, localio);
put_cred(ds_cred);
return PNFS_ATTEMPTED;
......@@ -1908,6 +1941,7 @@ static int ff_layout_initiate_commit(struct nfs_commit_data *data, int how)
struct pnfs_layout_segment *lseg = data->lseg;
struct nfs4_pnfs_ds *ds;
struct rpc_clnt *ds_clnt;
struct nfsd_file *localio;
struct nfs4_ff_layout_mirror *mirror;
const struct cred *ds_cred;
u32 idx;
......@@ -1946,10 +1980,18 @@ static int ff_layout_initiate_commit(struct nfs_commit_data *data, int how)
if (fh)
data->args.fh = fh;
/* Start IO accounting for local commit */
localio = ff_local_open_fh(ds->ds_clp, ds_cred, fh,
FMODE_READ|FMODE_WRITE);
if (localio) {
data->task.tk_start = ktime_get();
ff_layout_commit_record_layoutstats_start(&data->task, data);
}
ret = nfs_initiate_commit(ds_clnt, data, ds->ds_clp->rpc_ops,
vers == 3 ? &ff_layout_commit_call_ops_v3 :
&ff_layout_commit_call_ops_v4,
how, RPC_TASK_SOFTCONN, NULL);
how, RPC_TASK_SOFTCONN, localio);
put_cred(ds_cred);
return ret;
out_err:
......
......@@ -395,6 +395,12 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg,
/* connect success, check rsize/wsize limit */
if (!status) {
/*
* ds_clp is put in destroy_ds().
* keep ds_clp even if DS is local, so that if local IO cannot
* proceed somehow, we can fall back to NFS whenever we want.
*/
nfs_local_probe(ds->ds_clp);
max_payload =
nfs_block_size(rpc_max_payload(ds->ds_clp->cl_rpcclient),
NULL);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment