Commit cee54fc9 authored by Trond Myklebust's avatar Trond Myklebust

NFSv4: Add functions to order RPC calls

 NFSv4 file state-changing functions such as OPEN, CLOSE, LOCK,... are all
 labelled with "sequence identifiers" in order to prevent the server from
 reordering RPC requests, as this could cause its file state to
 become out of sync with the client.

 Currently the NFS client code enforces this ordering locally using
 semaphores to restrict access to structures until the RPC call is done.
 This, of course, only works with synchronous RPC calls, since the
 user process must first grab the semaphore.
 By dropping semaphores, and instead teaching the RPC engine to hold
 the RPC calls until they are ready to be sent, we can extend this
 process to work nicely with asynchronous RPC calls too.

 This patch adds a new list called "rpc_sequence" that defines the order
 of the RPC calls to be sent. We add one such list for each state_owner.
 When an RPC call is ready to be sent, it checks if it is top of the
 rpc_sequence list. If so, it proceeds. If not, it goes back to sleep,
 and loops until it hits top of the list.
 Once the RPC call has completed, it can then bump the sequence id counter,
 and remove itself from the rpc_sequence list, and then wake up the next
 sleeper.

 Note that the state_owner sequence ids and lock_owner sequence ids are
 all indexed to the same rpc_sequence list, so OPEN, LOCK,... requests
 are all ordered w.r.t. each other.
Signed-off-by: default avatarTrond Myklebust <Trond.Myklebust@netapp.com>
parent 5e5ce5be
......@@ -92,6 +92,35 @@ struct nfs4_client {
unsigned char cl_id_uniquifier;
};
/*
* struct rpc_sequence ensures that RPC calls are sent in the exact
* order that they appear on the list.
*/
struct rpc_sequence {
struct rpc_wait_queue wait; /* RPC call delay queue */
spinlock_t lock; /* Protects the list */
struct list_head list; /* Defines sequence of RPC calls */
};
#define NFS_SEQID_CONFIRMED 1
struct nfs_seqid_counter {
struct rpc_sequence *sequence;
int flags;
u32 counter;
};
struct nfs_seqid {
struct list_head list;
struct nfs_seqid_counter *sequence;
struct rpc_task *task;
};
static inline void nfs_confirm_seqid(struct nfs_seqid_counter *seqid, int status)
{
if (seqid_mutating_err(-status))
seqid->flags |= NFS_SEQID_CONFIRMED;
}
/*
* NFS4 state_owners and lock_owners are simply labels for ordered
* sequences of RPC calls. Their sole purpose is to provide once-only
......@@ -106,12 +135,13 @@ struct nfs4_state_owner {
struct nfs4_client *so_client;
u32 so_id; /* 32-bit identifier, unique */
struct semaphore so_sema;
u32 so_seqid; /* protected by so_sema */
atomic_t so_count;
struct rpc_cred *so_cred; /* Associated cred */
struct list_head so_states;
struct list_head so_delegations;
struct nfs_seqid_counter so_seqid;
struct rpc_sequence so_sequence;
};
/*
......@@ -132,7 +162,7 @@ struct nfs4_lock_state {
fl_owner_t ls_owner; /* POSIX lock owner */
#define NFS_LOCK_INITIALIZED 1
int ls_flags;
u32 ls_seqid;
struct nfs_seqid_counter ls_seqid;
u32 ls_id;
nfs4_stateid ls_stateid;
atomic_t ls_count;
......@@ -224,12 +254,16 @@ extern struct nfs4_state * nfs4_get_open_state(struct inode *, struct nfs4_state
extern void nfs4_put_open_state(struct nfs4_state *);
extern void nfs4_close_state(struct nfs4_state *, mode_t);
extern struct nfs4_state *nfs4_find_state(struct inode *, struct rpc_cred *, mode_t mode);
extern void nfs4_increment_seqid(int status, struct nfs4_state_owner *sp);
extern void nfs4_schedule_state_recovery(struct nfs4_client *);
extern int nfs4_set_lock_state(struct nfs4_state *state, struct file_lock *fl);
extern void nfs4_increment_lock_seqid(int status, struct nfs4_lock_state *ls);
extern void nfs4_copy_stateid(nfs4_stateid *, struct nfs4_state *, fl_owner_t);
extern struct nfs_seqid *nfs_alloc_seqid(struct nfs_seqid_counter *counter);
extern int nfs_wait_on_sequence(struct nfs_seqid *seqid, struct rpc_task *task);
extern void nfs_increment_open_seqid(int status, struct nfs_seqid *seqid);
extern void nfs_increment_lock_seqid(int status, struct nfs_seqid *seqid);
extern void nfs_free_seqid(struct nfs_seqid *seqid);
extern const nfs4_stateid zero_stateid;
/* nfs4xdr.c */
......
This diff is collapsed.
......@@ -264,13 +264,16 @@ nfs4_alloc_state_owner(void)
{
struct nfs4_state_owner *sp;
sp = kmalloc(sizeof(*sp),GFP_KERNEL);
sp = kzalloc(sizeof(*sp),GFP_KERNEL);
if (!sp)
return NULL;
init_MUTEX(&sp->so_sema);
sp->so_seqid = 0; /* arbitrary */
INIT_LIST_HEAD(&sp->so_states);
INIT_LIST_HEAD(&sp->so_delegations);
rpc_init_wait_queue(&sp->so_sequence.wait, "Seqid_waitqueue");
sp->so_seqid.sequence = &sp->so_sequence;
spin_lock_init(&sp->so_sequence.lock);
INIT_LIST_HEAD(&sp->so_sequence.list);
atomic_set(&sp->so_count, 1);
return sp;
}
......@@ -553,12 +556,10 @@ static struct nfs4_lock_state *nfs4_alloc_lock_state(struct nfs4_state *state, f
struct nfs4_lock_state *lsp;
struct nfs4_client *clp = state->owner->so_client;
lsp = kmalloc(sizeof(*lsp), GFP_KERNEL);
lsp = kzalloc(sizeof(*lsp), GFP_KERNEL);
if (lsp == NULL)
return NULL;
lsp->ls_flags = 0;
lsp->ls_seqid = 0; /* arbitrary */
memset(lsp->ls_stateid.data, 0, sizeof(lsp->ls_stateid.data));
lsp->ls_seqid.sequence = &state->owner->so_sequence;
atomic_set(&lsp->ls_count, 1);
lsp->ls_owner = fl_owner;
spin_lock(&clp->cl_lock);
......@@ -673,29 +674,102 @@ void nfs4_copy_stateid(nfs4_stateid *dst, struct nfs4_state *state, fl_owner_t f
nfs4_put_lock_state(lsp);
}
/*
* Called with state->lock_sema and clp->cl_sem held.
*/
void nfs4_increment_lock_seqid(int status, struct nfs4_lock_state *lsp)
struct nfs_seqid *nfs_alloc_seqid(struct nfs_seqid_counter *counter)
{
struct rpc_sequence *sequence = counter->sequence;
struct nfs_seqid *new;
new = kmalloc(sizeof(*new), GFP_KERNEL);
if (new != NULL) {
new->sequence = counter;
new->task = NULL;
spin_lock(&sequence->lock);
list_add_tail(&new->list, &sequence->list);
spin_unlock(&sequence->lock);
}
return new;
}
void nfs_free_seqid(struct nfs_seqid *seqid)
{
if (status == NFS_OK || seqid_mutating_err(-status))
lsp->ls_seqid++;
struct rpc_sequence *sequence = seqid->sequence->sequence;
struct rpc_task *next = NULL;
spin_lock(&sequence->lock);
list_del(&seqid->list);
if (!list_empty(&sequence->list)) {
next = list_entry(sequence->list.next, struct nfs_seqid, list)->task;
if (next)
rpc_wake_up_task(next);
}
spin_unlock(&sequence->lock);
kfree(seqid);
}
/*
* Called with sp->so_sema and clp->cl_sem held.
*
* Increment the seqid if the OPEN/OPEN_DOWNGRADE/CLOSE succeeded, or
* failed with a seqid incrementing error -
* see comments nfs_fs.h:seqid_mutating_error()
*/
void nfs4_increment_seqid(int status, struct nfs4_state_owner *sp)
{
if (status == NFS_OK || seqid_mutating_err(-status))
sp->so_seqid++;
/* If the server returns BAD_SEQID, unhash state_owner here */
if (status == -NFS4ERR_BAD_SEQID)
* Called with sp->so_sema and clp->cl_sem held.
*
* Increment the seqid if the OPEN/OPEN_DOWNGRADE/CLOSE succeeded, or
* failed with a seqid incrementing error -
* see comments nfs_fs.h:seqid_mutating_error()
*/
static inline void nfs_increment_seqid(int status, struct nfs_seqid *seqid)
{
switch (status) {
case 0:
break;
case -NFS4ERR_BAD_SEQID:
case -NFS4ERR_STALE_CLIENTID:
case -NFS4ERR_STALE_STATEID:
case -NFS4ERR_BAD_STATEID:
case -NFS4ERR_BADXDR:
case -NFS4ERR_RESOURCE:
case -NFS4ERR_NOFILEHANDLE:
/* Non-seqid mutating errors */
return;
};
/*
* Note: no locking needed as we are guaranteed to be first
* on the sequence list
*/
seqid->sequence->counter++;
}
void nfs_increment_open_seqid(int status, struct nfs_seqid *seqid)
{
if (status == -NFS4ERR_BAD_SEQID) {
struct nfs4_state_owner *sp = container_of(seqid->sequence,
struct nfs4_state_owner, so_seqid);
nfs4_drop_state_owner(sp);
}
return nfs_increment_seqid(status, seqid);
}
/*
* Called with ls->lock_sema and clp->cl_sem held.
*
* Increment the seqid if the LOCK/LOCKU succeeded, or
* failed with a seqid incrementing error -
* see comments nfs_fs.h:seqid_mutating_error()
*/
void nfs_increment_lock_seqid(int status, struct nfs_seqid *seqid)
{
return nfs_increment_seqid(status, seqid);
}
int nfs_wait_on_sequence(struct nfs_seqid *seqid, struct rpc_task *task)
{
struct rpc_sequence *sequence = seqid->sequence->sequence;
int status = 0;
spin_lock(&sequence->lock);
if (sequence->list.next != &seqid->list) {
seqid->task = task;
rpc_sleep_on(&sequence->wait, task, NULL, NULL);
status = -EAGAIN;
}
spin_unlock(&sequence->lock);
return status;
}
static int reclaimer(void *);
......@@ -791,8 +865,6 @@ static int nfs4_reclaim_open_state(struct nfs4_state_recovery_ops *ops, struct n
if (state->state == 0)
continue;
status = ops->recover_open(sp, state);
list_for_each_entry(lock, &state->lock_states, ls_locks)
lock->ls_flags &= ~NFS_LOCK_INITIALIZED;
if (status >= 0) {
status = nfs4_reclaim_locks(ops, state);
if (status < 0)
......@@ -831,6 +903,26 @@ static int nfs4_reclaim_open_state(struct nfs4_state_recovery_ops *ops, struct n
return status;
}
static void nfs4_state_mark_reclaim(struct nfs4_client *clp)
{
struct nfs4_state_owner *sp;
struct nfs4_state *state;
struct nfs4_lock_state *lock;
/* Reset all sequence ids to zero */
list_for_each_entry(sp, &clp->cl_state_owners, so_list) {
sp->so_seqid.counter = 0;
sp->so_seqid.flags = 0;
list_for_each_entry(state, &sp->so_states, open_states) {
list_for_each_entry(lock, &state->lock_states, ls_locks) {
lock->ls_seqid.counter = 0;
lock->ls_seqid.flags = 0;
lock->ls_flags &= ~NFS_LOCK_INITIALIZED;
}
}
}
}
static int reclaimer(void *ptr)
{
struct reclaimer_args *args = (struct reclaimer_args *)ptr;
......@@ -864,6 +956,7 @@ static int reclaimer(void *ptr)
default:
ops = &nfs4_network_partition_recovery_ops;
};
nfs4_state_mark_reclaim(clp);
status = __nfs4_init_client(clp);
if (status)
goto out_error;
......
......@@ -604,7 +604,7 @@ static int encode_close(struct xdr_stream *xdr, const struct nfs_closeargs *arg)
RESERVE_SPACE(8+sizeof(arg->stateid.data));
WRITE32(OP_CLOSE);
WRITE32(arg->seqid);
WRITE32(arg->seqid->sequence->counter);
WRITEMEM(arg->stateid.data, sizeof(arg->stateid.data));
return 0;
......@@ -732,9 +732,9 @@ static int encode_lock(struct xdr_stream *xdr, const struct nfs_lockargs *arg)
struct nfs_open_to_lock *ol = opargs->u.open_lock;
RESERVE_SPACE(40);
WRITE32(ol->open_seqid);
WRITE32(ol->open_seqid->sequence->counter);
WRITEMEM(&ol->open_stateid, sizeof(ol->open_stateid));
WRITE32(ol->lock_seqid);
WRITE32(ol->lock_seqid->sequence->counter);
WRITE64(ol->lock_owner.clientid);
WRITE32(4);
WRITE32(ol->lock_owner.id);
......@@ -744,7 +744,7 @@ static int encode_lock(struct xdr_stream *xdr, const struct nfs_lockargs *arg)
RESERVE_SPACE(20);
WRITEMEM(&el->stateid, sizeof(el->stateid));
WRITE32(el->seqid);
WRITE32(el->seqid->sequence->counter);
}
return 0;
......@@ -775,7 +775,7 @@ static int encode_locku(struct xdr_stream *xdr, const struct nfs_lockargs *arg)
RESERVE_SPACE(44);
WRITE32(OP_LOCKU);
WRITE32(arg->type);
WRITE32(opargs->seqid);
WRITE32(opargs->seqid->sequence->counter);
WRITEMEM(&opargs->stateid, sizeof(opargs->stateid));
WRITE64(arg->offset);
WRITE64(arg->length);
......@@ -826,7 +826,7 @@ static inline void encode_openhdr(struct xdr_stream *xdr, const struct nfs_opena
*/
RESERVE_SPACE(8);
WRITE32(OP_OPEN);
WRITE32(arg->seqid);
WRITE32(arg->seqid->sequence->counter);
encode_share_access(xdr, arg->open_flags);
RESERVE_SPACE(16);
WRITE64(arg->clientid);
......@@ -941,7 +941,7 @@ static int encode_open_confirm(struct xdr_stream *xdr, const struct nfs_open_con
RESERVE_SPACE(8+sizeof(arg->stateid.data));
WRITE32(OP_OPEN_CONFIRM);
WRITEMEM(arg->stateid.data, sizeof(arg->stateid.data));
WRITE32(arg->seqid);
WRITE32(arg->seqid->sequence->counter);
return 0;
}
......@@ -953,7 +953,7 @@ static int encode_open_downgrade(struct xdr_stream *xdr, const struct nfs_closea
RESERVE_SPACE(8+sizeof(arg->stateid.data));
WRITE32(OP_OPEN_DOWNGRADE);
WRITEMEM(arg->stateid.data, sizeof(arg->stateid.data));
WRITE32(arg->seqid);
WRITE32(arg->seqid->sequence->counter);
encode_share_access(xdr, arg->open_flags);
return 0;
}
......@@ -1416,6 +1416,9 @@ static int nfs4_xdr_enc_close(struct rpc_rqst *req, uint32_t *p, struct nfs_clos
};
int status;
status = nfs_wait_on_sequence(args->seqid, req->rq_task);
if (status != 0)
goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
......@@ -1437,6 +1440,9 @@ static int nfs4_xdr_enc_open(struct rpc_rqst *req, uint32_t *p, struct nfs_opena
};
int status;
status = nfs_wait_on_sequence(args->seqid, req->rq_task);
if (status != 0)
goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
......@@ -1464,6 +1470,9 @@ static int nfs4_xdr_enc_open_confirm(struct rpc_rqst *req, uint32_t *p, struct n
};
int status;
status = nfs_wait_on_sequence(args->seqid, req->rq_task);
if (status != 0)
goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
......@@ -1485,6 +1494,9 @@ static int nfs4_xdr_enc_open_noattr(struct rpc_rqst *req, uint32_t *p, struct nf
};
int status;
status = nfs_wait_on_sequence(args->seqid, req->rq_task);
if (status != 0)
goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
......@@ -1506,6 +1518,9 @@ static int nfs4_xdr_enc_open_downgrade(struct rpc_rqst *req, uint32_t *p, struct
};
int status;
status = nfs_wait_on_sequence(args->seqid, req->rq_task);
if (status != 0)
goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
......@@ -1525,8 +1540,17 @@ static int nfs4_xdr_enc_lock(struct rpc_rqst *req, uint32_t *p, struct nfs_locka
struct compound_hdr hdr = {
.nops = 2,
};
struct nfs_lock_opargs *opargs = args->u.lock;
struct nfs_seqid *seqid;
int status;
if (opargs->new_lock_owner)
seqid = opargs->u.open_lock->lock_seqid;
else
seqid = opargs->u.exist_lock->seqid;
status = nfs_wait_on_sequence(seqid, req->rq_task);
if (status != 0)
goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
......@@ -1569,6 +1593,9 @@ static int nfs4_xdr_enc_locku(struct rpc_rqst *req, uint32_t *p, struct nfs_lock
};
int status;
status = nfs_wait_on_sequence(args->u.locku->seqid, req->rq_task);
if (status != 0)
goto out;
xdr_init_encode(&xdr, &req->rq_snd_buf, p);
encode_compound_hdr(&xdr, &hdr);
status = encode_putfh(&xdr, args->fh);
......
......@@ -96,12 +96,13 @@ struct nfs4_change_info {
u64 after;
};
struct nfs_seqid;
/*
* Arguments to the open call.
*/
struct nfs_openargs {
const struct nfs_fh * fh;
__u32 seqid;
struct nfs_seqid * seqid;
int open_flags;
__u64 clientid;
__u32 id;
......@@ -136,7 +137,7 @@ struct nfs_openres {
struct nfs_open_confirmargs {
const struct nfs_fh * fh;
nfs4_stateid stateid;
__u32 seqid;
struct nfs_seqid * seqid;
};
struct nfs_open_confirmres {
......@@ -149,7 +150,7 @@ struct nfs_open_confirmres {
struct nfs_closeargs {
struct nfs_fh * fh;
nfs4_stateid stateid;
__u32 seqid;
struct nfs_seqid * seqid;
int open_flags;
};
......@@ -165,15 +166,15 @@ struct nfs_lowner {
};
struct nfs_open_to_lock {
__u32 open_seqid;
struct nfs_seqid * open_seqid;
nfs4_stateid open_stateid;
__u32 lock_seqid;
struct nfs_seqid * lock_seqid;
struct nfs_lowner lock_owner;
};
struct nfs_exist_lock {
nfs4_stateid stateid;
__u32 seqid;
struct nfs_seqid * seqid;
};
struct nfs_lock_opargs {
......@@ -186,7 +187,7 @@ struct nfs_lock_opargs {
};
struct nfs_locku_opargs {
__u32 seqid;
struct nfs_seqid * seqid;
nfs4_stateid stateid;
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment