Commit fac5a35a authored by Neil Brown's avatar Neil Brown Committed by Linus Torvalds

[PATCH] PATCH 8/16: NFSD: RPC lists tidyup

Change sunrpc to use more list.h lists

The sunrpc client code uses home-grown doubly linked
lists to group
   - idle server threads
   - pending server sockets
   - waiting rpc tasks
   - all rpc tasks.

This patch converts all of these lists to <linux/list.h> lists and
also makes the list of all server sockets for a particular server into
a list.h list instead of a single-link list.

Possibly the least obvious change is replacing RPC_INIT_WAITQ
with RPC_WAITQ and INIT_RPC_WAITQ.  These follow the model of
LIST_HEAD and INIT_LIST_HEAD defined in list.h and are needed
to initialise the list_head in the rpc_waitq properly.
parent a00181e0
......@@ -50,7 +50,7 @@
/*
* This is the wait queue all cluster daemons sleep on
*/
static struct rpc_wait_queue flushd_queue = RPC_INIT_WAITQ("nfs_flushd");
static RPC_WAITQ(flushd_queue, "nfs_flushd");
/*
* Local function declarations.
......
......@@ -24,7 +24,7 @@ struct nfs_unlinkdata {
};
static struct nfs_unlinkdata *nfs_deletes;
static struct rpc_wait_queue nfs_delete_queue = RPC_INIT_WAITQ("nfs_delete_queue");
static RPC_WAITQ(nfs_delete_queue, "nfs_delete_queue");
/**
* nfs_detach_unlinkdata - Remove asynchronous unlink from global list
......
......@@ -34,13 +34,11 @@ struct rpc_message {
* This is the RPC task struct
*/
struct rpc_task {
struct rpc_task * tk_prev; /* wait queue links */
struct rpc_task * tk_next;
struct list_head tk_list; /* wait queue links */
#ifdef RPC_DEBUG
unsigned long tk_magic; /* 0xf00baa */
#endif
struct rpc_task * tk_next_task; /* global list of tasks */
struct rpc_task * tk_prev_task; /* global list of tasks */
struct list_head tk_task; /* global list of tasks */
struct rpc_clnt * tk_client; /* RPC client */
struct rpc_rqst * tk_rqstp; /* RPC request */
int tk_status; /* result of last operation */
......@@ -88,6 +86,20 @@ struct rpc_task {
#define tk_auth tk_client->cl_auth
#define tk_xprt tk_client->cl_xprt
/* support walking a list of tasks on a wait queue */
#define task_for_each(task, pos, head) \
list_for_each(pos, head) \
if ((task=list_entry(pos, struct rpc_task, tk_list)),1)
#define task_for_first(task, head) \
if (!list_empty(head) && \
((task=list_entry((head)->next, struct rpc_task, tk_list)),1))
/* .. and walking list of all tasks */
#define alltask_for_each(task, pos, head) \
list_for_each(pos, head) \
if ((task=list_entry(pos, struct rpc_task, tk_task)),1)
typedef void (*rpc_action)(struct rpc_task *);
/*
......@@ -133,16 +145,24 @@ typedef void (*rpc_action)(struct rpc_task *);
* RPC synchronization objects
*/
struct rpc_wait_queue {
struct rpc_task * task;
struct list_head tasks;
#ifdef RPC_DEBUG
char * name;
#endif
};
#ifndef RPC_DEBUG
# define RPC_INIT_WAITQ(name) ((struct rpc_wait_queue) { NULL })
# define RPC_WAITQ_INIT(var,qname) ((struct rpc_wait_queue) {LIST_HEAD_INIT(var)})
# define RPC_WAITQ(var,qname) struct rpc_wait_queue var = RPC_WAITQ_INIT(var.tasks,qname)
# define INIT_RPC_WAITQ(ptr,qname) do { \
INIT_LIST_HEAD(&(ptr)->tasks); \
} while(0)
#else
# define RPC_INIT_WAITQ(name) ((struct rpc_wait_queue) { NULL, name })
# define RPC_WAITQ_INIT(var,qname) ((struct rpc_wait_queue) {LIST_HEAD_INIT(var.tasks), qname})
# define RPC_WAITQ(var,qname) struct rpc_wait_queue var = RPC_WAITQ_INIT(var,qname)
# define INIT_RPC_WAITQ(ptr,qname) do { \
INIT_LIST_HEAD(&(ptr)->tasks); (ptr)->name = qname; \
} while(0)
#endif
/*
......
......@@ -27,8 +27,8 @@
* We currently do not support more than one RPC program per daemon.
*/
struct svc_serv {
struct svc_rqst * sv_threads; /* idle server threads */
struct svc_sock * sv_sockets; /* pending sockets */
struct list_head sv_threads; /* idle server threads */
struct list_head sv_sockets; /* pending sockets */
struct svc_program * sv_program; /* RPC program */
struct svc_stat * sv_stats; /* RPC statistics */
spinlock_t sv_lock;
......@@ -36,7 +36,7 @@ struct svc_serv {
unsigned int sv_bufsz; /* datagram buffer size */
unsigned int sv_xdrsize; /* XDR buffer size */
struct svc_sock * sv_allsocks; /* all sockets */
struct list_head sv_allsocks; /* all sockets */
char * sv_name; /* service name */
};
......@@ -89,8 +89,7 @@ struct svc_buf {
* NOTE: First two items must be prev/next.
*/
struct svc_rqst {
struct svc_rqst * rq_prev; /* idle list */
struct svc_rqst * rq_next;
struct list_head rq_list; /* idle list */
struct svc_sock * rq_sock; /* socket */
struct sockaddr_in rq_addr; /* peer address */
int rq_addrlen;
......
......@@ -13,12 +13,10 @@
/*
* RPC server socket.
* NOTE: First two items must be prev/next.
*/
struct svc_sock {
struct svc_sock * sk_prev; /* list of ready sockets */
struct svc_sock * sk_next;
struct svc_sock * sk_list; /* list of all sockets */
struct list_head sk_ready; /* list of ready sockets */
struct list_head sk_list; /* list of all sockets */
struct socket * sk_sock; /* berkeley socket layer */
struct sock * sk_sk; /* INET layer */
spinlock_t sk_lock;
......
......@@ -12,60 +12,7 @@
#include <linux/timer.h>
#include <linux/tqueue.h>
#include <linux/sunrpc/debug.h>
/*
* These are the RPC list manipulation primitives used everywhere.
*/
struct rpc_listitem {
struct rpc_listitem * prev;
struct rpc_listitem * next;
};
static __inline__ void
__rpc_append_list(struct rpc_listitem **q, struct rpc_listitem *item)
{
struct rpc_listitem *next, *prev;
if (!(next = *q)) {
*q = item->next = item->prev = item;
} else {
prev = next->prev;
prev->next = item;
next->prev = item;
item->next = next;
item->prev = prev;
}
}
static __inline__ void
__rpc_insert_list(struct rpc_listitem **q, struct rpc_listitem *item)
{
__rpc_append_list(q, item);
*q = item;
}
static __inline__ void
__rpc_remove_list(struct rpc_listitem **q, struct rpc_listitem *item)
{
struct rpc_listitem *prev = item->prev,
*next = item->next;
if (item != prev) {
next->prev = prev;
prev->next = next;
} else {
next = NULL;
}
if (*q == item)
*q = next;
}
#define rpc_insert_list(q, i) \
__rpc_insert_list((struct rpc_listitem **) q, (struct rpc_listitem *) i)
#define rpc_append_list(q, i) \
__rpc_append_list((struct rpc_listitem **) q, (struct rpc_listitem *) i)
#define rpc_remove_list(q, i) \
__rpc_remove_list((struct rpc_listitem **) q, (struct rpc_listitem *) i)
#include <linux/list.h>
/*
* Shorthands
......
......@@ -103,7 +103,7 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
clnt->cl_vers = version->number;
clnt->cl_prot = xprt->prot;
clnt->cl_stats = program->stats;
clnt->cl_bindwait = RPC_INIT_WAITQ("bindwait");
INIT_RPC_WAITQ(&clnt->cl_bindwait, "bindwait");
if (!clnt->cl_port)
clnt->cl_autobind = 1;
......
......@@ -41,23 +41,23 @@ static void rpciod_killall(void);
* handler, or while executing another RPC task, it is put on
* schedq, and rpciod is woken up.
*/
static struct rpc_wait_queue schedq = RPC_INIT_WAITQ("schedq");
static RPC_WAITQ(schedq, "schedq");
/*
* RPC tasks that create another task (e.g. for contacting the portmapper)
* will wait on this queue for their child's completion
*/
static struct rpc_wait_queue childq = RPC_INIT_WAITQ("childq");
static RPC_WAITQ(childq, "childq");
/*
* RPC tasks sit here while waiting for conditions to improve.
*/
static struct rpc_wait_queue delay_queue = RPC_INIT_WAITQ("delayq");
static RPC_WAITQ(delay_queue, "delayq");
/*
* All RPC tasks are linked into this list
*/
static struct rpc_task * all_tasks;
static LIST_HEAD(all_tasks);
/*
* rpciod-related stuff
......@@ -194,9 +194,9 @@ __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
return -EWOULDBLOCK;
}
if (RPC_IS_SWAPPER(task))
rpc_insert_list(&queue->task, task);
list_add(&task->tk_list, &queue->tasks);
else
rpc_append_list(&queue->task, task);
list_add_tail(&task->tk_list, &queue->tasks);
task->tk_rpcwait = queue;
dprintk("RPC: %4d added to queue %p \"%s\"\n",
......@@ -228,7 +228,7 @@ __rpc_remove_wait_queue(struct rpc_task *task)
if (!queue)
return;
rpc_remove_list(&queue->task, task);
list_del(&task->tk_list);
task->tk_rpcwait = NULL;
dprintk("RPC: %4d removed from queue %p \"%s\"\n",
......@@ -450,11 +450,11 @@ rpc_wake_up_task(struct rpc_task *task)
struct rpc_task *
rpc_wake_up_next(struct rpc_wait_queue *queue)
{
struct rpc_task *task;
struct rpc_task *task = NULL;
dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
spin_lock_bh(&rpc_queue_lock);
if ((task = queue->task) != 0)
task_for_first(task, &queue->tasks)
__rpc_wake_up_task(task);
spin_unlock_bh(&rpc_queue_lock);
......@@ -470,9 +470,12 @@ rpc_wake_up_next(struct rpc_wait_queue *queue)
void
rpc_wake_up(struct rpc_wait_queue *queue)
{
struct rpc_task *task;
spin_lock_bh(&rpc_queue_lock);
while (queue->task)
__rpc_wake_up_task(queue->task);
while (!list_empty(&queue->tasks))
task_for_first(task, &queue->tasks)
__rpc_wake_up_task(task);
spin_unlock_bh(&rpc_queue_lock);
}
......@@ -489,10 +492,12 @@ rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
struct rpc_task *task;
spin_lock_bh(&rpc_queue_lock);
while ((task = queue->task) != NULL) {
while (!list_empty(&queue->tasks)) {
task_for_first(task, &queue->tasks) {
task->tk_status = status;
__rpc_wake_up_task(task);
}
}
spin_unlock_bh(&rpc_queue_lock);
}
......@@ -703,10 +708,8 @@ __rpc_schedule(void)
rpciod_tcp_dispatcher();
spin_lock_bh(&rpc_queue_lock);
if (!(task = schedq.task)) {
spin_unlock_bh(&rpc_queue_lock);
break;
}
task_for_first(task, &schedq.tasks) {
if (task->tk_lock) {
spin_unlock_bh(&rpc_queue_lock);
printk(KERN_ERR "RPC: Locked task was scheduled !!!!\n");
......@@ -720,6 +723,10 @@ __rpc_schedule(void)
spin_unlock_bh(&rpc_queue_lock);
__rpc_execute(task);
} else {
spin_unlock_bh(&rpc_queue_lock);
break;
}
if (++count >= 200 || need_resched()) {
count = 0;
......@@ -814,11 +821,7 @@ rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
/* Add to global list of all tasks */
spin_lock(&rpc_sched_lock);
task->tk_next_task = all_tasks;
task->tk_prev_task = NULL;
if (all_tasks)
all_tasks->tk_prev_task = task;
all_tasks = task;
list_add(&task->tk_task, &all_tasks);
spin_unlock(&rpc_sched_lock);
if (clnt)
......@@ -877,8 +880,6 @@ rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
void
rpc_release_task(struct rpc_task *task)
{
struct rpc_task *next, *prev;
dprintk("RPC: %4d release task\n", task->tk_pid);
#ifdef RPC_DEBUG
......@@ -892,15 +893,7 @@ rpc_release_task(struct rpc_task *task)
/* Remove from global task list */
spin_lock(&rpc_sched_lock);
prev = task->tk_prev_task;
next = task->tk_next_task;
if (next)
next->tk_prev_task = prev;
if (prev)
prev->tk_next_task = next;
else
all_tasks = next;
task->tk_next_task = task->tk_prev_task = NULL;
list_del(&task->tk_task);
spin_unlock(&rpc_sched_lock);
/* Protect the execution below. */
......@@ -954,14 +947,13 @@ static inline struct rpc_task *
rpc_find_parent(struct rpc_task *child)
{
struct rpc_task *task, *parent;
struct list_head *le;
parent = (struct rpc_task *) child->tk_calldata;
if ((task = childq.task) != NULL) {
do {
task_for_each(task, le, &childq.tasks)
if (task == parent)
return parent;
} while ((task = task->tk_next) != childq.task);
}
return NULL;
}
......@@ -1015,7 +1007,8 @@ rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
void
rpc_killall_tasks(struct rpc_clnt *clnt)
{
struct rpc_task **q, *rovr;
struct rpc_task *rovr;
struct list_head *le;
dprintk("RPC: killing all tasks for client %p\n", clnt);
......@@ -1023,13 +1016,12 @@ rpc_killall_tasks(struct rpc_clnt *clnt)
* Spin lock all_tasks to prevent changes...
*/
spin_lock(&rpc_sched_lock);
for (q = &all_tasks; (rovr = *q); q = &rovr->tk_next_task) {
alltask_for_each(rovr, le, &all_tasks)
if (!clnt || rovr->tk_client == clnt) {
rovr->tk_flags |= RPC_TASK_KILLED;
rpc_exit(rovr, -EIO);
rpc_wake_up_task(rovr);
}
}
spin_unlock(&rpc_sched_lock);
}
......@@ -1038,7 +1030,7 @@ static DECLARE_MUTEX_LOCKED(rpciod_running);
static inline int
rpciod_task_pending(void)
{
return schedq.task != NULL || xprt_tcp_pending();
return !list_empty(&schedq.tasks) || xprt_tcp_pending();
}
......@@ -1090,7 +1082,7 @@ rpciod(void *ptr)
}
dprintk("RPC: rpciod shutdown commences\n");
if (all_tasks) {
if (!list_empty(&all_tasks)) {
printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
rpciod_killall();
}
......@@ -1108,11 +1100,11 @@ rpciod_killall(void)
{
unsigned long flags;
while (all_tasks) {
while (!list_empty(&all_tasks)) {
clear_thread_flag(TIF_SIGPENDING);
rpc_killall_tasks(NULL);
__rpc_schedule();
if (all_tasks) {
if (!list_empty(&all_tasks)) {
dprintk("rpciod_killall: waiting for tasks to exit\n");
yield();
}
......@@ -1207,25 +1199,23 @@ rpciod_down(void)
#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
struct rpc_task *t = all_tasks, *next;
struct list_head *le;
struct rpc_task *t;
spin_lock(&rpc_sched_lock);
t = all_tasks;
if (!t) {
if (list_empty(&all_tasks)) {
spin_unlock(&rpc_sched_lock);
return;
}
printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
"-rpcwait -action- --exit--\n");
for (; t; t = next) {
next = t->tk_next_task;
alltask_for_each(t, le, &all_tasks)
printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
t->tk_pid, t->tk_msg.rpc_proc, t->tk_flags, t->tk_status,
t->tk_client, t->tk_client->cl_prog,
t->tk_rqstp, t->tk_timeout,
t->tk_rpcwait ? rpc_qname(t->tk_rpcwait) : " <NULL> ",
t->tk_action, t->tk_exit);
}
spin_unlock(&rpc_sched_lock);
}
#endif
......@@ -44,6 +44,9 @@ svc_create(struct svc_program *prog, unsigned int bufsize, unsigned int xdrsize)
serv->sv_stats = prog->pg_stats;
serv->sv_bufsz = bufsize? bufsize : 4096;
serv->sv_xdrsize = xdrsize;
INIT_LIST_HEAD(&serv->sv_threads);
INIT_LIST_HEAD(&serv->sv_sockets);
INIT_LIST_HEAD(&serv->sv_allsocks);
spin_lock_init(&serv->sv_lock);
serv->sv_name = prog->pg_name;
......@@ -72,8 +75,12 @@ svc_destroy(struct svc_serv *serv)
} else
printk("svc_destroy: no threads for serv=%p!\n", serv);
while ((svsk = serv->sv_allsocks) != NULL)
while (!list_empty(&serv->sv_allsocks)) {
svsk = list_entry(serv->sv_allsocks.next,
struct svc_sock,
sk_list);
svc_delete_socket(svsk);
}
/* Unregister service with the portmapper */
svc_register(serv, 0, 0);
......
......@@ -63,11 +63,14 @@ static int svc_udp_sendto(struct svc_rqst *);
/*
* Queue up an idle server thread. Must have serv->sv_lock held.
* Note: this is really a stack rather than a queue, so that we only
* use as many different threads as we need, and the rest don't polute
* the cache.
*/
static inline void
svc_serv_enqueue(struct svc_serv *serv, struct svc_rqst *rqstp)
{
rpc_append_list(&serv->sv_threads, rqstp);
list_add(&rqstp->rq_list, &serv->sv_threads);
}
/*
......@@ -76,7 +79,7 @@ svc_serv_enqueue(struct svc_serv *serv, struct svc_rqst *rqstp)
static inline void
svc_serv_dequeue(struct svc_serv *serv, struct svc_rqst *rqstp)
{
rpc_remove_list(&serv->sv_threads, rqstp);
list_del(&rqstp->rq_list);
}
/*
......@@ -110,7 +113,8 @@ svc_sock_enqueue(struct svc_sock *svsk)
/* NOTE: Local BH is already disabled by our caller. */
spin_lock(&serv->sv_lock);
if (serv->sv_threads && serv->sv_sockets)
if (!list_empty(&serv->sv_threads) &&
!list_empty(&serv->sv_sockets))
printk(KERN_ERR
"svc_sock_enqueue: threads and sockets both waiting??\n");
......@@ -126,7 +130,10 @@ svc_sock_enqueue(struct svc_sock *svsk)
*/
svsk->sk_busy = 1;
if ((rqstp = serv->sv_threads) != NULL) {
if (!list_empty(&serv->sv_threads)) {
rqstp = list_entry(serv->sv_threads.next,
struct svc_rqst,
rq_list);
dprintk("svc: socket %p served by daemon %p\n",
svsk->sk_sk, rqstp);
svc_serv_dequeue(serv, rqstp);
......@@ -139,7 +146,7 @@ svc_sock_enqueue(struct svc_sock *svsk)
wake_up(&rqstp->rq_wait);
} else {
dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
rpc_append_list(&serv->sv_sockets, svsk);
list_add_tail(&svsk->sk_ready, &serv->sv_sockets);
svsk->sk_qued = 1;
}
......@@ -155,14 +162,16 @@ svc_sock_dequeue(struct svc_serv *serv)
{
struct svc_sock *svsk;
if ((svsk = serv->sv_sockets) != NULL)
rpc_remove_list(&serv->sv_sockets, svsk);
if (list_empty(&serv->sv_sockets))
return NULL;
svsk = list_entry(serv->sv_sockets.next,
struct svc_sock, sk_ready);
list_del(&svsk->sk_ready);
if (svsk) {
dprintk("svc: socket %p dequeued, inuse=%d\n",
svsk->sk_sk, svsk->sk_inuse);
svsk->sk_qued = 0;
}
return svsk;
}
......@@ -238,7 +247,10 @@ svc_wake_up(struct svc_serv *serv)
struct svc_rqst *rqstp;
spin_lock_bh(&serv->sv_lock);
if ((rqstp = serv->sv_threads) != NULL) {
if (!list_empty(&serv->sv_threads)) {
rqstp = list_entry(serv->sv_threads.next,
struct svc_rqst,
rq_list);
dprintk("svc: daemon %p woken up.\n", rqstp);
/*
svc_serv_dequeue(serv, rqstp);
......@@ -958,8 +970,7 @@ if (svsk->sk_sk == NULL)
}
spin_lock_bh(&serv->sv_lock);
svsk->sk_list = serv->sv_allsocks;
serv->sv_allsocks = svsk;
list_add(&svsk->sk_list, &serv->sv_allsocks);
spin_unlock_bh(&serv->sv_lock);
dprintk("svc: svc_setup_socket created %p (inet %p)\n",
......@@ -1020,7 +1031,6 @@ svc_create_socket(struct svc_serv *serv, int protocol, struct sockaddr_in *sin)
void
svc_delete_socket(struct svc_sock *svsk)
{
struct svc_sock **rsk;
struct svc_serv *serv;
struct sock *sk;
......@@ -1034,17 +1044,9 @@ svc_delete_socket(struct svc_sock *svsk)
spin_lock_bh(&serv->sv_lock);
for (rsk = &serv->sv_allsocks; *rsk; rsk = &(*rsk)->sk_list) {
if (*rsk == svsk)
break;
}
if (!*rsk) {
spin_unlock_bh(&serv->sv_lock);
return;
}
*rsk = svsk->sk_list;
list_del(&svsk->sk_list);
if (svsk->sk_qued)
rpc_remove_list(&serv->sv_sockets, svsk);
list_del(&svsk->sk_ready);
svsk->sk_dead = 1;
......
......@@ -542,25 +542,15 @@ xprt_reconn_status(struct rpc_task *task)
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
struct rpc_task *head, *task;
struct rpc_rqst *req;
int safe = 0;
struct list_head *le;
struct rpc_task *task;
spin_lock_bh(&rpc_queue_lock);
if ((head = xprt->pending.task) != NULL) {
task = head;
do {
task_for_each(task, le, &xprt->pending.tasks)
if ((req = task->tk_rqstp) && req->rq_xid == xid)
goto out;
task = task->tk_next;
if (++safe > 100) {
printk("xprt_lookup_rqst: loop in Q!\n");
goto out_bad;
}
} while (task != head);
}
dprintk("RPC: unknown XID %08x in reply.\n", xid);
out_bad:
req = NULL;
out:
if (req && !__rpc_lock_task(req->rq_task))
......@@ -1487,9 +1477,9 @@ xprt_setup(struct socket *sock, int proto,
} else
xprt_default_timeout(&xprt->timeout, xprt->prot);
xprt->pending = RPC_INIT_WAITQ("xprt_pending");
xprt->sending = RPC_INIT_WAITQ("xprt_sending");
xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
INIT_RPC_WAITQ(&xprt->pending, "xprt_pending");
INIT_RPC_WAITQ(&xprt->sending, "xprt_sending");
INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");
/* initialize free list */
for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment