Commit 045d2a6d authored by Trond Myklebust's avatar Trond Myklebust

NFSv4.1: Delay callback processing when there are referring triples

If CB_SEQUENCE tells us that the processing of this request depends on
the completion of one or more referring triples (see RFC 5661 Section
2.10.6.3), delay the callback processing until after the RPC requests
being referred to have completed.
If we end up delaying for more than 1/2 second, then fall back to
returning NFS4ERR_DELAY in reply to the callback.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@primarydata.com>
parent e09c978a
...@@ -454,8 +454,8 @@ static bool referring_call_exists(struct nfs_client *clp, ...@@ -454,8 +454,8 @@ static bool referring_call_exists(struct nfs_client *clp,
((u32 *)&rclist->rcl_sessionid.data)[3], ((u32 *)&rclist->rcl_sessionid.data)[3],
ref->rc_sequenceid, ref->rc_slotid); ref->rc_sequenceid, ref->rc_slotid);
status = nfs4_slot_seqid_in_use(tbl, ref->rc_slotid, status = nfs4_slot_wait_on_seqid(tbl, ref->rc_slotid,
ref->rc_sequenceid); ref->rc_sequenceid, HZ >> 1) < 0;
if (status) if (status)
goto out; goto out;
} }
......
...@@ -686,6 +686,8 @@ static void nfs41_sequence_free_slot(struct nfs4_sequence_res *res) ...@@ -686,6 +686,8 @@ static void nfs41_sequence_free_slot(struct nfs4_sequence_res *res)
res->sr_slot = NULL; res->sr_slot = NULL;
if (send_new_highest_used_slotid) if (send_new_highest_used_slotid)
nfs41_notify_server(session->clp); nfs41_notify_server(session->clp);
if (waitqueue_active(&tbl->slot_waitq))
wake_up_all(&tbl->slot_waitq);
} }
int nfs41_sequence_done(struct rpc_task *task, struct nfs4_sequence_res *res) int nfs41_sequence_done(struct rpc_task *task, struct nfs4_sequence_res *res)
......
...@@ -28,6 +28,7 @@ static void nfs4_init_slot_table(struct nfs4_slot_table *tbl, const char *queue) ...@@ -28,6 +28,7 @@ static void nfs4_init_slot_table(struct nfs4_slot_table *tbl, const char *queue)
tbl->highest_used_slotid = NFS4_NO_SLOT; tbl->highest_used_slotid = NFS4_NO_SLOT;
spin_lock_init(&tbl->slot_tbl_lock); spin_lock_init(&tbl->slot_tbl_lock);
rpc_init_priority_wait_queue(&tbl->slot_tbl_waitq, queue); rpc_init_priority_wait_queue(&tbl->slot_tbl_waitq, queue);
init_waitqueue_head(&tbl->slot_waitq);
init_completion(&tbl->complete); init_completion(&tbl->complete);
} }
...@@ -192,7 +193,8 @@ static int nfs4_slot_get_seqid(struct nfs4_slot_table *tbl, u32 slotid, ...@@ -192,7 +193,8 @@ static int nfs4_slot_get_seqid(struct nfs4_slot_table *tbl, u32 slotid,
* RPC call in question is still in flight. This function is mainly * RPC call in question is still in flight. This function is mainly
* intended for use by the callback channel. * intended for use by the callback channel.
*/ */
bool nfs4_slot_seqid_in_use(struct nfs4_slot_table *tbl, u32 slotid, u32 seq_nr) static bool nfs4_slot_seqid_in_use(struct nfs4_slot_table *tbl,
u32 slotid, u32 seq_nr)
{ {
u32 cur_seq; u32 cur_seq;
bool ret = false; bool ret = false;
...@@ -205,6 +207,24 @@ bool nfs4_slot_seqid_in_use(struct nfs4_slot_table *tbl, u32 slotid, u32 seq_nr) ...@@ -205,6 +207,24 @@ bool nfs4_slot_seqid_in_use(struct nfs4_slot_table *tbl, u32 slotid, u32 seq_nr)
return ret; return ret;
} }
/*
* nfs4_slot_wait_on_seqid - wait until a slot sequence id is complete
*
* Given a slot table, slot id and sequence number, wait until the
* corresponding RPC call completes. This function is mainly
* intended for use by the callback channel.
*/
int nfs4_slot_wait_on_seqid(struct nfs4_slot_table *tbl,
u32 slotid, u32 seq_nr,
unsigned long timeout)
{
if (wait_event_timeout(tbl->slot_waitq,
!nfs4_slot_seqid_in_use(tbl, slotid, seq_nr),
timeout) == 0)
return -ETIMEDOUT;
return 0;
}
/* /*
* nfs4_alloc_slot - efficiently look for a free slot * nfs4_alloc_slot - efficiently look for a free slot
* *
......
...@@ -36,6 +36,7 @@ struct nfs4_slot_table { ...@@ -36,6 +36,7 @@ struct nfs4_slot_table {
unsigned long used_slots[SLOT_TABLE_SZ]; /* used/unused bitmap */ unsigned long used_slots[SLOT_TABLE_SZ]; /* used/unused bitmap */
spinlock_t slot_tbl_lock; spinlock_t slot_tbl_lock;
struct rpc_wait_queue slot_tbl_waitq; /* allocators may wait here */ struct rpc_wait_queue slot_tbl_waitq; /* allocators may wait here */
wait_queue_head_t slot_waitq; /* Completion wait on slot */
u32 max_slots; /* # slots in table */ u32 max_slots; /* # slots in table */
u32 max_slotid; /* Max allowed slotid value */ u32 max_slotid; /* Max allowed slotid value */
u32 highest_used_slotid; /* sent to server on each SEQ. u32 highest_used_slotid; /* sent to server on each SEQ.
...@@ -78,7 +79,9 @@ extern int nfs4_setup_slot_table(struct nfs4_slot_table *tbl, ...@@ -78,7 +79,9 @@ extern int nfs4_setup_slot_table(struct nfs4_slot_table *tbl,
extern void nfs4_shutdown_slot_table(struct nfs4_slot_table *tbl); extern void nfs4_shutdown_slot_table(struct nfs4_slot_table *tbl);
extern struct nfs4_slot *nfs4_alloc_slot(struct nfs4_slot_table *tbl); extern struct nfs4_slot *nfs4_alloc_slot(struct nfs4_slot_table *tbl);
extern struct nfs4_slot *nfs4_lookup_slot(struct nfs4_slot_table *tbl, u32 slotid); extern struct nfs4_slot *nfs4_lookup_slot(struct nfs4_slot_table *tbl, u32 slotid);
extern bool nfs4_slot_seqid_in_use(struct nfs4_slot_table *tbl, u32 slotid, u32 seq_nr); extern int nfs4_slot_wait_on_seqid(struct nfs4_slot_table *tbl,
u32 slotid, u32 seq_nr,
unsigned long timeout);
extern bool nfs4_try_to_lock_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot); extern bool nfs4_try_to_lock_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot);
extern void nfs4_free_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot); extern void nfs4_free_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot);
extern void nfs4_slot_tbl_drain_complete(struct nfs4_slot_table *tbl); extern void nfs4_slot_tbl_drain_complete(struct nfs4_slot_table *tbl);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment