Commit ccc62d90 authored by Trond Myklebust's avatar Trond Myklebust

RPC: Convert rpciod into a work queue for greater flexibility.

Signed-off-by: default avatarTrond Myklebust <trond.myklebust@fys.uio.no>
parent 72e57cd5
...@@ -215,7 +215,6 @@ nfs_complete_unlink(struct dentry *dentry) ...@@ -215,7 +215,6 @@ nfs_complete_unlink(struct dentry *dentry)
spin_lock(&dentry->d_lock); spin_lock(&dentry->d_lock);
dentry->d_flags &= ~DCACHE_NFSFS_RENAMED; dentry->d_flags &= ~DCACHE_NFSFS_RENAMED;
spin_unlock(&dentry->d_lock); spin_unlock(&dentry->d_lock);
if (data->task.tk_rpcwait == &nfs_delete_queue) rpc_wake_up_task(&data->task);
rpc_wake_up_task(&data->task);
nfs_put_unlinkdata(data); nfs_put_unlinkdata(data);
} }
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#include <linux/timer.h> #include <linux/timer.h>
#include <linux/sunrpc/types.h> #include <linux/sunrpc/types.h>
#include <linux/wait.h> #include <linux/wait.h>
#include <linux/workqueue.h>
#include <linux/sunrpc/xdr.h> #include <linux/sunrpc/xdr.h>
/* /*
...@@ -25,11 +26,18 @@ struct rpc_message { ...@@ -25,11 +26,18 @@ struct rpc_message {
struct rpc_cred * rpc_cred; /* Credentials */ struct rpc_cred * rpc_cred; /* Credentials */
}; };
struct rpc_wait_queue;
struct rpc_wait {
struct list_head list; /* wait queue links */
struct list_head links; /* Links to related tasks */
wait_queue_head_t waitq; /* sync: sleep on this q */
struct rpc_wait_queue * rpc_waitq; /* RPC wait queue we're on */
};
/* /*
* This is the RPC task struct * This is the RPC task struct
*/ */
struct rpc_task { struct rpc_task {
struct list_head tk_list; /* wait queue links */
#ifdef RPC_DEBUG #ifdef RPC_DEBUG
unsigned long tk_magic; /* 0xf00baa */ unsigned long tk_magic; /* 0xf00baa */
#endif #endif
...@@ -37,7 +45,6 @@ struct rpc_task { ...@@ -37,7 +45,6 @@ struct rpc_task {
struct rpc_clnt * tk_client; /* RPC client */ struct rpc_clnt * tk_client; /* RPC client */
struct rpc_rqst * tk_rqstp; /* RPC request */ struct rpc_rqst * tk_rqstp; /* RPC request */
int tk_status; /* result of last operation */ int tk_status; /* result of last operation */
struct rpc_wait_queue * tk_rpcwait; /* RPC wait queue we're on */
/* /*
* RPC call state * RPC call state
...@@ -70,13 +77,18 @@ struct rpc_task { ...@@ -70,13 +77,18 @@ struct rpc_task {
* you have a pathological interest in kernel oopses. * you have a pathological interest in kernel oopses.
*/ */
struct timer_list tk_timer; /* kernel timer */ struct timer_list tk_timer; /* kernel timer */
wait_queue_head_t tk_wait; /* sync: sleep on this q */
unsigned long tk_timeout; /* timeout for rpc_sleep() */ unsigned long tk_timeout; /* timeout for rpc_sleep() */
unsigned short tk_flags; /* misc flags */ unsigned short tk_flags; /* misc flags */
unsigned char tk_active : 1;/* Task has been activated */ unsigned char tk_active : 1;/* Task has been activated */
unsigned char tk_priority : 2;/* Task priority */ unsigned char tk_priority : 2;/* Task priority */
unsigned long tk_runstate; /* Task run status */ unsigned long tk_runstate; /* Task run status */
struct list_head tk_links; /* links to related tasks */ struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could
* be any workqueue
*/
union {
struct work_struct tk_work; /* Async task work queue */
struct rpc_wait tk_wait; /* RPC wait */
} u;
#ifdef RPC_DEBUG #ifdef RPC_DEBUG
unsigned short tk_pid; /* debugging aid */ unsigned short tk_pid; /* debugging aid */
#endif #endif
...@@ -87,11 +99,11 @@ struct rpc_task { ...@@ -87,11 +99,11 @@ struct rpc_task {
/* support walking a list of tasks on a wait queue */ /* support walking a list of tasks on a wait queue */
#define task_for_each(task, pos, head) \ #define task_for_each(task, pos, head) \
list_for_each(pos, head) \ list_for_each(pos, head) \
if ((task=list_entry(pos, struct rpc_task, tk_list)),1) if ((task=list_entry(pos, struct rpc_task, u.tk_wait.list)),1)
#define task_for_first(task, head) \ #define task_for_first(task, head) \
if (!list_empty(head) && \ if (!list_empty(head) && \
((task=list_entry((head)->next, struct rpc_task, tk_list)),1)) ((task=list_entry((head)->next, struct rpc_task, u.tk_wait.list)),1))
/* .. and walking list of all tasks */ /* .. and walking list of all tasks */
#define alltask_for_each(task, pos, head) \ #define alltask_for_each(task, pos, head) \
...@@ -126,22 +138,38 @@ typedef void (*rpc_action)(struct rpc_task *); ...@@ -126,22 +138,38 @@ typedef void (*rpc_action)(struct rpc_task *);
#define RPC_IS_SOFT(t) ((t)->tk_flags & RPC_TASK_SOFT) #define RPC_IS_SOFT(t) ((t)->tk_flags & RPC_TASK_SOFT)
#define RPC_TASK_UNINTERRUPTIBLE(t) ((t)->tk_flags & RPC_TASK_NOINTR) #define RPC_TASK_UNINTERRUPTIBLE(t) ((t)->tk_flags & RPC_TASK_NOINTR)
#define RPC_TASK_SLEEPING 0 #define RPC_TASK_RUNNING 0
#define RPC_TASK_RUNNING 1 #define RPC_TASK_QUEUED 1
#define RPC_IS_SLEEPING(t) (test_bit(RPC_TASK_SLEEPING, &(t)->tk_runstate)) #define RPC_TASK_WAKEUP 2
#define RPC_IS_RUNNING(t) (test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
#define RPC_IS_RUNNING(t) (test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
#define rpc_set_running(t) (set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)) #define rpc_set_running(t) (set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
#define rpc_clear_running(t) (clear_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)) #define rpc_test_and_set_running(t) \
(test_and_set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate))
#define rpc_clear_running(t) \
do { \
smp_mb__before_clear_bit(); \
clear_bit(RPC_TASK_RUNNING, &(t)->tk_runstate); \
smp_mb__after_clear_bit(); \
} while (0)
#define rpc_set_sleeping(t) (set_bit(RPC_TASK_SLEEPING, &(t)->tk_runstate)) #define RPC_IS_QUEUED(t) (test_bit(RPC_TASK_QUEUED, &(t)->tk_runstate))
#define rpc_set_queued(t) (set_bit(RPC_TASK_QUEUED, &(t)->tk_runstate))
#define rpc_clear_queued(t) \
do { \
smp_mb__before_clear_bit(); \
clear_bit(RPC_TASK_QUEUED, &(t)->tk_runstate); \
smp_mb__after_clear_bit(); \
} while (0)
#define rpc_clear_sleeping(t) \ #define rpc_start_wakeup(t) \
(test_and_set_bit(RPC_TASK_WAKEUP, &(t)->tk_runstate) == 0)
#define rpc_finish_wakeup(t) \
do { \ do { \
smp_mb__before_clear_bit(); \ smp_mb__before_clear_bit(); \
clear_bit(RPC_TASK_SLEEPING, &(t)->tk_runstate); \ clear_bit(RPC_TASK_WAKEUP, &(t)->tk_runstate); \
smp_mb__after_clear_bit(); \ smp_mb__after_clear_bit(); \
} while(0) } while (0)
/* /*
* Task priorities. * Task priorities.
...@@ -209,13 +237,10 @@ void rpc_killall_tasks(struct rpc_clnt *); ...@@ -209,13 +237,10 @@ void rpc_killall_tasks(struct rpc_clnt *);
int rpc_execute(struct rpc_task *); int rpc_execute(struct rpc_task *);
void rpc_run_child(struct rpc_task *parent, struct rpc_task *child, void rpc_run_child(struct rpc_task *parent, struct rpc_task *child,
rpc_action action); rpc_action action);
int rpc_add_wait_queue(struct rpc_wait_queue *, struct rpc_task *);
void rpc_remove_wait_queue(struct rpc_task *);
void rpc_init_priority_wait_queue(struct rpc_wait_queue *, const char *); void rpc_init_priority_wait_queue(struct rpc_wait_queue *, const char *);
void rpc_init_wait_queue(struct rpc_wait_queue *, const char *); void rpc_init_wait_queue(struct rpc_wait_queue *, const char *);
void rpc_sleep_on(struct rpc_wait_queue *, struct rpc_task *, void rpc_sleep_on(struct rpc_wait_queue *, struct rpc_task *,
rpc_action action, rpc_action timer); rpc_action action, rpc_action timer);
void rpc_add_timer(struct rpc_task *, rpc_action);
void rpc_wake_up_task(struct rpc_task *); void rpc_wake_up_task(struct rpc_task *);
void rpc_wake_up(struct rpc_wait_queue *); void rpc_wake_up(struct rpc_wait_queue *);
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *); struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
......
...@@ -41,16 +41,9 @@ static mempool_t *rpc_buffer_mempool; ...@@ -41,16 +41,9 @@ static mempool_t *rpc_buffer_mempool;
static void __rpc_default_timer(struct rpc_task *task); static void __rpc_default_timer(struct rpc_task *task);
static void rpciod_killall(void); static void rpciod_killall(void);
static void rpc_free(struct rpc_task *task); static void rpc_free(struct rpc_task *task);
static void rpc_async_schedule(void *);
/*
* When an asynchronous RPC task is activated within a bottom half
* handler, or while executing another RPC task, it is put on
* schedq, and rpciod is woken up.
*/
static RPC_WAITQ(schedq, "schedq");
/* /*
* RPC tasks that create another task (e.g. for contacting the portmapper) * RPC tasks that create another task (e.g. for contacting the portmapper)
...@@ -71,12 +64,9 @@ static LIST_HEAD(all_tasks); ...@@ -71,12 +64,9 @@ static LIST_HEAD(all_tasks);
/* /*
* rpciod-related stuff * rpciod-related stuff
*/ */
static DECLARE_WAIT_QUEUE_HEAD(rpciod_idle);
static DECLARE_COMPLETION(rpciod_killer);
static DECLARE_MUTEX(rpciod_sema); static DECLARE_MUTEX(rpciod_sema);
static unsigned int rpciod_users; static unsigned int rpciod_users;
static pid_t rpciod_pid; static struct workqueue_struct *rpciod_workqueue;
static int rpc_inhibit;
/* /*
* Spinlock for wait queues. Access to the latter also has to be * Spinlock for wait queues. Access to the latter also has to be
...@@ -108,16 +98,13 @@ __rpc_disable_timer(struct rpc_task *task) ...@@ -108,16 +98,13 @@ __rpc_disable_timer(struct rpc_task *task)
* without calling del_timer_sync(). The latter could cause a * without calling del_timer_sync(). The latter could cause a
* deadlock if called while we're holding spinlocks... * deadlock if called while we're holding spinlocks...
*/ */
static void static void rpc_run_timer(struct rpc_task *task)
rpc_run_timer(struct rpc_task *task)
{ {
void (*callback)(struct rpc_task *); void (*callback)(struct rpc_task *);
spin_lock_bh(&rpc_queue_lock);
callback = task->tk_timeout_fn; callback = task->tk_timeout_fn;
task->tk_timeout_fn = NULL; task->tk_timeout_fn = NULL;
spin_unlock_bh(&rpc_queue_lock); if (callback && RPC_IS_QUEUED(task)) {
if (callback) {
dprintk("RPC: %4d running timer\n", task->tk_pid); dprintk("RPC: %4d running timer\n", task->tk_pid);
callback(task); callback(task);
} }
...@@ -142,17 +129,6 @@ __rpc_add_timer(struct rpc_task *task, rpc_action timer) ...@@ -142,17 +129,6 @@ __rpc_add_timer(struct rpc_task *task, rpc_action timer)
mod_timer(&task->tk_timer, jiffies + task->tk_timeout); mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
} }
/*
* Set up a timer for an already sleeping task.
*/
void rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
spin_lock_bh(&rpc_queue_lock);
if (!RPC_IS_RUNNING(task))
__rpc_add_timer(task, timer);
spin_unlock_bh(&rpc_queue_lock);
}
/* /*
* Delete any timer for the current task. Because we use del_timer_sync(), * Delete any timer for the current task. Because we use del_timer_sync(),
* this function should never be called while holding rpc_queue_lock. * this function should never be called while holding rpc_queue_lock.
...@@ -172,16 +148,17 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct r ...@@ -172,16 +148,17 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct r
struct list_head *q; struct list_head *q;
struct rpc_task *t; struct rpc_task *t;
INIT_LIST_HEAD(&task->u.tk_wait.links);
q = &queue->tasks[task->tk_priority]; q = &queue->tasks[task->tk_priority];
if (unlikely(task->tk_priority > queue->maxpriority)) if (unlikely(task->tk_priority > queue->maxpriority))
q = &queue->tasks[queue->maxpriority]; q = &queue->tasks[queue->maxpriority];
list_for_each_entry(t, q, tk_list) { list_for_each_entry(t, q, u.tk_wait.list) {
if (t->tk_cookie == task->tk_cookie) { if (t->tk_cookie == task->tk_cookie) {
list_add_tail(&task->tk_list, &t->tk_links); list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
return; return;
} }
} }
list_add_tail(&task->tk_list, q); list_add_tail(&task->u.tk_wait.list, q);
} }
/* /*
...@@ -192,37 +169,21 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct r ...@@ -192,37 +169,21 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct r
* improve overall performance. * improve overall performance.
* Everyone else gets appended to the queue to ensure proper FIFO behavior. * Everyone else gets appended to the queue to ensure proper FIFO behavior.
*/ */
static int __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task) static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{ {
if (task->tk_rpcwait == queue) BUG_ON (RPC_IS_QUEUED(task));
return 0;
if (task->tk_rpcwait) {
printk(KERN_WARNING "RPC: doubly enqueued task!\n");
return -EWOULDBLOCK;
}
if (RPC_IS_PRIORITY(queue)) if (RPC_IS_PRIORITY(queue))
__rpc_add_wait_queue_priority(queue, task); __rpc_add_wait_queue_priority(queue, task);
else if (RPC_IS_SWAPPER(task)) else if (RPC_IS_SWAPPER(task))
list_add(&task->tk_list, &queue->tasks[0]); list_add(&task->u.tk_wait.list, &queue->tasks[0]);
else else
list_add_tail(&task->tk_list, &queue->tasks[0]); list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
task->tk_rpcwait = queue; task->u.tk_wait.rpc_waitq = queue;
rpc_set_queued(task);
dprintk("RPC: %4d added to queue %p \"%s\"\n", dprintk("RPC: %4d added to queue %p \"%s\"\n",
task->tk_pid, queue, rpc_qname(queue)); task->tk_pid, queue, rpc_qname(queue));
return 0;
}
int rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
{
int result;
spin_lock_bh(&rpc_queue_lock);
result = __rpc_add_wait_queue(q, task);
spin_unlock_bh(&rpc_queue_lock);
return result;
} }
/* /*
...@@ -232,12 +193,12 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task) ...@@ -232,12 +193,12 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{ {
struct rpc_task *t; struct rpc_task *t;
if (!list_empty(&task->tk_links)) { if (!list_empty(&task->u.tk_wait.links)) {
t = list_entry(task->tk_links.next, struct rpc_task, tk_list); t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
list_move(&t->tk_list, &task->tk_list); list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
list_splice_init(&task->tk_links, &t->tk_links); list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
} }
list_del(&task->tk_list); list_del(&task->u.tk_wait.list);
} }
/* /*
...@@ -246,31 +207,17 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task) ...@@ -246,31 +207,17 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
*/ */
static void __rpc_remove_wait_queue(struct rpc_task *task) static void __rpc_remove_wait_queue(struct rpc_task *task)
{ {
struct rpc_wait_queue *queue = task->tk_rpcwait; struct rpc_wait_queue *queue;
queue = task->u.tk_wait.rpc_waitq;
if (!queue)
return;
if (RPC_IS_PRIORITY(queue)) if (RPC_IS_PRIORITY(queue))
__rpc_remove_wait_queue_priority(task); __rpc_remove_wait_queue_priority(task);
else else
list_del(&task->tk_list); list_del(&task->u.tk_wait.list);
task->tk_rpcwait = NULL;
dprintk("RPC: %4d removed from queue %p \"%s\"\n", dprintk("RPC: %4d removed from queue %p \"%s\"\n",
task->tk_pid, queue, rpc_qname(queue)); task->tk_pid, queue, rpc_qname(queue));
} }
void
rpc_remove_wait_queue(struct rpc_task *task)
{
if (!task->tk_rpcwait)
return;
spin_lock_bh(&rpc_queue_lock);
__rpc_remove_wait_queue(task);
spin_unlock_bh(&rpc_queue_lock);
}
static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority) static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{ {
queue->priority = priority; queue->priority = priority;
...@@ -319,34 +266,31 @@ EXPORT_SYMBOL(rpc_init_wait_queue); ...@@ -319,34 +266,31 @@ EXPORT_SYMBOL(rpc_init_wait_queue);
* Note: If the task is ASYNC, this must be called with * Note: If the task is ASYNC, this must be called with
* the spinlock held to protect the wait queue operation. * the spinlock held to protect the wait queue operation.
*/ */
static inline void static void rpc_make_runnable(struct rpc_task *task)
rpc_make_runnable(struct rpc_task *task)
{ {
if (task->tk_timeout_fn) { int do_ret;
printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
BUG_ON(task->tk_timeout_fn);
do_ret = rpc_test_and_set_running(task);
rpc_clear_queued(task);
if (do_ret)
return; return;
}
rpc_set_running(task);
if (RPC_IS_ASYNC(task)) { if (RPC_IS_ASYNC(task)) {
if (RPC_IS_SLEEPING(task)) { int status;
int status;
status = __rpc_add_wait_queue(&schedq, task); INIT_WORK(&task->u.tk_work, rpc_async_schedule, (void *)task);
if (status < 0) { status = queue_work(task->tk_workqueue, &task->u.tk_work);
printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); if (status < 0) {
task->tk_status = status; printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
return; task->tk_status = status;
} return;
rpc_clear_sleeping(task);
wake_up(&rpciod_idle);
} }
} else { } else
rpc_clear_sleeping(task); wake_up(&task->u.tk_wait.waitq);
wake_up(&task->tk_wait);
}
} }
/* /*
* Place a newly initialized task on the schedq. * Place a newly initialized task on the workqueue.
*/ */
static inline void static inline void
rpc_schedule_run(struct rpc_task *task) rpc_schedule_run(struct rpc_task *task)
...@@ -355,33 +299,18 @@ rpc_schedule_run(struct rpc_task *task) ...@@ -355,33 +299,18 @@ rpc_schedule_run(struct rpc_task *task)
if (RPC_IS_ACTIVATED(task)) if (RPC_IS_ACTIVATED(task))
return; return;
task->tk_active = 1; task->tk_active = 1;
rpc_set_sleeping(task);
rpc_make_runnable(task); rpc_make_runnable(task);
} }
/*
* For other people who may need to wake the I/O daemon
* but should (for now) know nothing about its innards
*/
void rpciod_wake_up(void)
{
if(rpciod_pid==0)
printk(KERN_ERR "rpciod: wot no daemon?\n");
wake_up(&rpciod_idle);
}
/* /*
* Prepare for sleeping on a wait queue. * Prepare for sleeping on a wait queue.
* By always appending tasks to the list we ensure FIFO behavior. * By always appending tasks to the list we ensure FIFO behavior.
* NB: An RPC task will only receive interrupt-driven events as long * NB: An RPC task will only receive interrupt-driven events as long
* as it's on a wait queue. * as it's on a wait queue.
*/ */
static void static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
rpc_action action, rpc_action timer) rpc_action action, rpc_action timer)
{ {
int status;
dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid, dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
rpc_qname(q), jiffies); rpc_qname(q), jiffies);
...@@ -391,24 +320,14 @@ __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, ...@@ -391,24 +320,14 @@ __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
} }
/* Mark the task as being activated if so needed */ /* Mark the task as being activated if so needed */
if (!RPC_IS_ACTIVATED(task)) { if (!RPC_IS_ACTIVATED(task))
task->tk_active = 1; task->tk_active = 1;
rpc_set_sleeping(task);
}
status = __rpc_add_wait_queue(q, task); __rpc_add_wait_queue(q, task);
if (status) {
printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); BUG_ON(task->tk_callback != NULL);
task->tk_status = status; task->tk_callback = action;
} else { __rpc_add_timer(task, timer);
rpc_clear_running(task);
if (task->tk_callback) {
dprintk(KERN_ERR "RPC: %4d overwrites an active callback\n", task->tk_pid);
BUG();
}
task->tk_callback = action;
__rpc_add_timer(task, timer);
}
} }
void void
...@@ -424,16 +343,14 @@ rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, ...@@ -424,16 +343,14 @@ rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
} }
/** /**
* __rpc_wake_up_task - wake up a single rpc_task * __rpc_do_wake_up_task - wake up a single rpc_task
* @task: task to be woken up * @task: task to be woken up
* *
* Caller must hold rpc_queue_lock * Caller must hold rpc_queue_lock, and have cleared the task queued flag.
*/ */
static void static void __rpc_do_wake_up_task(struct rpc_task *task)
__rpc_wake_up_task(struct rpc_task *task)
{ {
dprintk("RPC: %4d __rpc_wake_up_task (now %ld inh %d)\n", dprintk("RPC: %4d __rpc_wake_up_task (now %ld)\n", task->tk_pid, jiffies);
task->tk_pid, jiffies, rpc_inhibit);
#ifdef RPC_DEBUG #ifdef RPC_DEBUG
if (task->tk_magic != 0xf00baa) { if (task->tk_magic != 0xf00baa) {
...@@ -448,18 +365,27 @@ __rpc_wake_up_task(struct rpc_task *task) ...@@ -448,18 +365,27 @@ __rpc_wake_up_task(struct rpc_task *task)
printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task); printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
return; return;
} }
if (RPC_IS_RUNNING(task))
return;
__rpc_disable_timer(task); __rpc_disable_timer(task);
if (task->tk_rpcwait != &schedq) __rpc_remove_wait_queue(task);
__rpc_remove_wait_queue(task);
rpc_make_runnable(task); rpc_make_runnable(task);
dprintk("RPC: __rpc_wake_up_task done\n"); dprintk("RPC: __rpc_wake_up_task done\n");
} }
/*
* Wake up the specified task
*/
static void __rpc_wake_up_task(struct rpc_task *task)
{
if (rpc_start_wakeup(task)) {
if (RPC_IS_QUEUED(task))
__rpc_do_wake_up_task(task);
rpc_finish_wakeup(task);
}
}
/* /*
* Default timeout handler if none specified by user * Default timeout handler if none specified by user
*/ */
...@@ -474,14 +400,16 @@ __rpc_default_timer(struct rpc_task *task) ...@@ -474,14 +400,16 @@ __rpc_default_timer(struct rpc_task *task)
/* /*
* Wake up the specified task * Wake up the specified task
*/ */
void void rpc_wake_up_task(struct rpc_task *task)
rpc_wake_up_task(struct rpc_task *task)
{ {
if (RPC_IS_RUNNING(task)) if (rpc_start_wakeup(task)) {
return; if (RPC_IS_QUEUED(task)) {
spin_lock_bh(&rpc_queue_lock); spin_lock_bh(&rpc_queue_lock);
__rpc_wake_up_task(task); __rpc_do_wake_up_task(task);
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&rpc_queue_lock);
}
rpc_finish_wakeup(task);
}
} }
/* /*
...@@ -497,11 +425,11 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu ...@@ -497,11 +425,11 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu
*/ */
q = &queue->tasks[queue->priority]; q = &queue->tasks[queue->priority];
if (!list_empty(q)) { if (!list_empty(q)) {
task = list_entry(q->next, struct rpc_task, tk_list); task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
if (queue->cookie == task->tk_cookie) { if (queue->cookie == task->tk_cookie) {
if (--queue->nr) if (--queue->nr)
goto out; goto out;
list_move_tail(&task->tk_list, q); list_move_tail(&task->u.tk_wait.list, q);
} }
/* /*
* Check if we need to switch queues. * Check if we need to switch queues.
...@@ -519,7 +447,7 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu ...@@ -519,7 +447,7 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu
else else
q = q - 1; q = q - 1;
if (!list_empty(q)) { if (!list_empty(q)) {
task = list_entry(q->next, struct rpc_task, tk_list); task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
goto new_queue; goto new_queue;
} }
} while (q != &queue->tasks[queue->priority]); } while (q != &queue->tasks[queue->priority]);
...@@ -571,7 +499,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue) ...@@ -571,7 +499,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
head = &queue->tasks[queue->maxpriority]; head = &queue->tasks[queue->maxpriority];
for (;;) { for (;;) {
while (!list_empty(head)) { while (!list_empty(head)) {
task = list_entry(head->next, struct rpc_task, tk_list); task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
__rpc_wake_up_task(task); __rpc_wake_up_task(task);
} }
if (head == &queue->tasks[0]) if (head == &queue->tasks[0])
...@@ -597,7 +525,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) ...@@ -597,7 +525,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
head = &queue->tasks[queue->maxpriority]; head = &queue->tasks[queue->maxpriority];
for (;;) { for (;;) {
while (!list_empty(head)) { while (!list_empty(head)) {
task = list_entry(head->next, struct rpc_task, tk_list); task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
task->tk_status = status; task->tk_status = status;
__rpc_wake_up_task(task); __rpc_wake_up_task(task);
} }
...@@ -629,21 +557,22 @@ __rpc_atrun(struct rpc_task *task) ...@@ -629,21 +557,22 @@ __rpc_atrun(struct rpc_task *task)
/* /*
* This is the RPC `scheduler' (or rather, the finite state machine). * This is the RPC `scheduler' (or rather, the finite state machine).
*/ */
static int static int __rpc_execute(struct rpc_task *task)
__rpc_execute(struct rpc_task *task)
{ {
int status = 0; int status = 0;
dprintk("RPC: %4d rpc_execute flgs %x\n", dprintk("RPC: %4d rpc_execute flgs %x\n",
task->tk_pid, task->tk_flags); task->tk_pid, task->tk_flags);
if (!RPC_IS_RUNNING(task)) { BUG_ON(RPC_IS_QUEUED(task));
printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n");
return 0;
}
restarted: restarted:
while (1) { while (1) {
/*
* Garbage collection of pending timers...
*/
rpc_delete_timer(task);
/* /*
* Execute any pending callback. * Execute any pending callback.
*/ */
...@@ -660,7 +589,9 @@ __rpc_execute(struct rpc_task *task) ...@@ -660,7 +589,9 @@ __rpc_execute(struct rpc_task *task)
*/ */
save_callback=task->tk_callback; save_callback=task->tk_callback;
task->tk_callback=NULL; task->tk_callback=NULL;
lock_kernel();
save_callback(task); save_callback(task);
unlock_kernel();
} }
/* /*
...@@ -668,43 +599,35 @@ __rpc_execute(struct rpc_task *task) ...@@ -668,43 +599,35 @@ __rpc_execute(struct rpc_task *task)
* tk_action may be NULL when the task has been killed * tk_action may be NULL when the task has been killed
* by someone else. * by someone else.
*/ */
if (RPC_IS_RUNNING(task)) { if (!RPC_IS_QUEUED(task)) {
/*
* Garbage collection of pending timers...
*/
rpc_delete_timer(task);
if (!task->tk_action) if (!task->tk_action)
break; break;
lock_kernel();
task->tk_action(task); task->tk_action(task);
/* micro-optimization to avoid spinlock */ unlock_kernel();
if (RPC_IS_RUNNING(task))
continue;
} }
/* /*
* Check whether task is sleeping. * Lockless check for whether task is sleeping or not.
*/ */
spin_lock_bh(&rpc_queue_lock); if (!RPC_IS_QUEUED(task))
if (!RPC_IS_RUNNING(task)) { continue;
rpc_set_sleeping(task); rpc_clear_running(task);
if (RPC_IS_ASYNC(task)) { if (RPC_IS_ASYNC(task)) {
spin_unlock_bh(&rpc_queue_lock); /* Careful! we may have raced... */
if (RPC_IS_QUEUED(task))
return 0; return 0;
} if (rpc_test_and_set_running(task))
return 0;
continue;
} }
spin_unlock_bh(&rpc_queue_lock);
if (!RPC_IS_SLEEPING(task))
continue;
/* sync task: sleep here */ /* sync task: sleep here */
dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid); dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
if (current->pid == rpciod_pid)
printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");
if (RPC_TASK_UNINTERRUPTIBLE(task)) { if (RPC_TASK_UNINTERRUPTIBLE(task)) {
__wait_event(task->tk_wait, !RPC_IS_SLEEPING(task)); __wait_event(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task));
} else { } else {
__wait_event_interruptible(task->tk_wait, !RPC_IS_SLEEPING(task), status); __wait_event_interruptible(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task), status);
/* /*
* When a sync task receives a signal, it exits with * When a sync task receives a signal, it exits with
* -ERESTARTSYS. In order to catch any callbacks that * -ERESTARTSYS. In order to catch any callbacks that
...@@ -718,11 +641,14 @@ __rpc_execute(struct rpc_task *task) ...@@ -718,11 +641,14 @@ __rpc_execute(struct rpc_task *task)
rpc_wake_up_task(task); rpc_wake_up_task(task);
} }
} }
rpc_set_running(task);
dprintk("RPC: %4d sync task resuming\n", task->tk_pid); dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
} }
if (task->tk_exit) { if (task->tk_exit) {
lock_kernel();
task->tk_exit(task); task->tk_exit(task);
unlock_kernel();
/* If tk_action is non-null, the user wants us to restart */ /* If tk_action is non-null, the user wants us to restart */
if (task->tk_action) { if (task->tk_action) {
if (!RPC_ASSASSINATED(task)) { if (!RPC_ASSASSINATED(task)) {
...@@ -741,7 +667,6 @@ __rpc_execute(struct rpc_task *task) ...@@ -741,7 +667,6 @@ __rpc_execute(struct rpc_task *task)
/* Release all resources associated with the task */ /* Release all resources associated with the task */
rpc_release_task(task); rpc_release_task(task);
return status; return status;
} }
...@@ -757,57 +682,16 @@ __rpc_execute(struct rpc_task *task) ...@@ -757,57 +682,16 @@ __rpc_execute(struct rpc_task *task)
int int
rpc_execute(struct rpc_task *task) rpc_execute(struct rpc_task *task)
{ {
int status = -EIO; BUG_ON(task->tk_active);
if (rpc_inhibit) {
printk(KERN_INFO "RPC: execution inhibited!\n");
goto out_release;
}
status = -EWOULDBLOCK;
if (task->tk_active) {
printk(KERN_ERR "RPC: active task was run twice!\n");
goto out_err;
}
task->tk_active = 1; task->tk_active = 1;
rpc_set_running(task); rpc_set_running(task);
return __rpc_execute(task); return __rpc_execute(task);
out_release:
rpc_release_task(task);
out_err:
return status;
} }
/* static void rpc_async_schedule(void *arg)
* This is our own little scheduler for async RPC tasks.
*/
static void
__rpc_schedule(void)
{ {
struct rpc_task *task; __rpc_execute((struct rpc_task *)arg);
int count = 0;
dprintk("RPC: rpc_schedule enter\n");
while (1) {
task_for_first(task, &schedq.tasks[0]) {
__rpc_remove_wait_queue(task);
spin_unlock_bh(&rpc_queue_lock);
__rpc_execute(task);
spin_lock_bh(&rpc_queue_lock);
} else {
break;
}
if (++count >= 200 || need_resched()) {
count = 0;
spin_unlock_bh(&rpc_queue_lock);
schedule();
spin_lock_bh(&rpc_queue_lock);
}
}
dprintk("RPC: rpc_schedule leave\n");
} }
/* /*
...@@ -865,7 +749,6 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action call ...@@ -865,7 +749,6 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action call
task->tk_client = clnt; task->tk_client = clnt;
task->tk_flags = flags; task->tk_flags = flags;
task->tk_exit = callback; task->tk_exit = callback;
init_waitqueue_head(&task->tk_wait);
if (current->uid != current->fsuid || current->gid != current->fsgid) if (current->uid != current->fsuid || current->gid != current->fsgid)
task->tk_flags |= RPC_TASK_SETUID; task->tk_flags |= RPC_TASK_SETUID;
...@@ -876,7 +759,11 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action call ...@@ -876,7 +759,11 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action call
task->tk_priority = RPC_PRIORITY_NORMAL; task->tk_priority = RPC_PRIORITY_NORMAL;
task->tk_cookie = (unsigned long)current; task->tk_cookie = (unsigned long)current;
INIT_LIST_HEAD(&task->tk_links);
/* Initialize workqueue for async tasks */
task->tk_workqueue = rpciod_workqueue;
if (!RPC_IS_ASYNC(task))
init_waitqueue_head(&task->u.tk_wait.waitq);
/* Add to global list of all tasks */ /* Add to global list of all tasks */
spin_lock(&rpc_sched_lock); spin_lock(&rpc_sched_lock);
...@@ -966,19 +853,9 @@ rpc_release_task(struct rpc_task *task) ...@@ -966,19 +853,9 @@ rpc_release_task(struct rpc_task *task)
list_del(&task->tk_task); list_del(&task->tk_task);
spin_unlock(&rpc_sched_lock); spin_unlock(&rpc_sched_lock);
/* Protect the execution below. */ BUG_ON (RPC_IS_QUEUED(task));
spin_lock_bh(&rpc_queue_lock);
/* Disable timer to prevent zombie wakeup */
__rpc_disable_timer(task);
/* Remove from any wait queue we're still on */
__rpc_remove_wait_queue(task);
task->tk_active = 0; task->tk_active = 0;
spin_unlock_bh(&rpc_queue_lock);
/* Synchronously delete any running timer */ /* Synchronously delete any running timer */
rpc_delete_timer(task); rpc_delete_timer(task);
...@@ -1094,82 +971,6 @@ rpc_killall_tasks(struct rpc_clnt *clnt) ...@@ -1094,82 +971,6 @@ rpc_killall_tasks(struct rpc_clnt *clnt)
static DECLARE_MUTEX_LOCKED(rpciod_running); static DECLARE_MUTEX_LOCKED(rpciod_running);
static inline int
rpciod_task_pending(void)
{
return !list_empty(&schedq.tasks[0]);
}
/*
* This is the rpciod kernel thread
*/
static int
rpciod(void *ptr)
{
int rounds = 0;
lock_kernel();
/*
* Let our maker know we're running ...
*/
rpciod_pid = current->pid;
up(&rpciod_running);
daemonize("rpciod");
allow_signal(SIGKILL);
dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid);
spin_lock_bh(&rpc_queue_lock);
while (rpciod_users) {
DEFINE_WAIT(wait);
if (signalled()) {
spin_unlock_bh(&rpc_queue_lock);
rpciod_killall();
flush_signals(current);
spin_lock_bh(&rpc_queue_lock);
}
__rpc_schedule();
if (current->flags & PF_FREEZE) {
spin_unlock_bh(&rpc_queue_lock);
refrigerator(PF_FREEZE);
spin_lock_bh(&rpc_queue_lock);
}
if (++rounds >= 64) { /* safeguard */
spin_unlock_bh(&rpc_queue_lock);
schedule();
rounds = 0;
spin_lock_bh(&rpc_queue_lock);
}
dprintk("RPC: rpciod back to sleep\n");
prepare_to_wait(&rpciod_idle, &wait, TASK_INTERRUPTIBLE);
if (!rpciod_task_pending() && !signalled()) {
spin_unlock_bh(&rpc_queue_lock);
schedule();
rounds = 0;
spin_lock_bh(&rpc_queue_lock);
}
finish_wait(&rpciod_idle, &wait);
dprintk("RPC: switch to rpciod\n");
}
spin_unlock_bh(&rpc_queue_lock);
dprintk("RPC: rpciod shutdown commences\n");
if (!list_empty(&all_tasks)) {
printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
rpciod_killall();
}
dprintk("RPC: rpciod exiting\n");
unlock_kernel();
rpciod_pid = 0;
complete_and_exit(&rpciod_killer, 0);
return 0;
}
static void static void
rpciod_killall(void) rpciod_killall(void)
{ {
...@@ -1178,9 +979,7 @@ rpciod_killall(void) ...@@ -1178,9 +979,7 @@ rpciod_killall(void)
while (!list_empty(&all_tasks)) { while (!list_empty(&all_tasks)) {
clear_thread_flag(TIF_SIGPENDING); clear_thread_flag(TIF_SIGPENDING);
rpc_killall_tasks(NULL); rpc_killall_tasks(NULL);
spin_lock_bh(&rpc_queue_lock); flush_workqueue(rpciod_workqueue);
__rpc_schedule();
spin_unlock_bh(&rpc_queue_lock);
if (!list_empty(&all_tasks)) { if (!list_empty(&all_tasks)) {
dprintk("rpciod_killall: waiting for tasks to exit\n"); dprintk("rpciod_killall: waiting for tasks to exit\n");
yield(); yield();
...@@ -1198,28 +997,30 @@ rpciod_killall(void) ...@@ -1198,28 +997,30 @@ rpciod_killall(void)
int int
rpciod_up(void) rpciod_up(void)
{ {
struct workqueue_struct *wq;
int error = 0; int error = 0;
down(&rpciod_sema); down(&rpciod_sema);
dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users); dprintk("rpciod_up: users %d\n", rpciod_users);
rpciod_users++; rpciod_users++;
if (rpciod_pid) if (rpciod_workqueue)
goto out; goto out;
/* /*
* If there's no pid, we should be the first user. * If there's no pid, we should be the first user.
*/ */
if (rpciod_users > 1) if (rpciod_users > 1)
printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users); printk(KERN_WARNING "rpciod_up: no workqueue, %d users??\n", rpciod_users);
/* /*
* Create the rpciod thread and wait for it to start. * Create the rpciod thread and wait for it to start.
*/ */
error = kernel_thread(rpciod, NULL, 0); error = -ENOMEM;
if (error < 0) { wq = create_workqueue("rpciod");
printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error); if (wq == NULL) {
printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
rpciod_users--; rpciod_users--;
goto out; goto out;
} }
down(&rpciod_running); rpciod_workqueue = wq;
error = 0; error = 0;
out: out:
up(&rpciod_sema); up(&rpciod_sema);
...@@ -1230,20 +1031,21 @@ void ...@@ -1230,20 +1031,21 @@ void
rpciod_down(void) rpciod_down(void)
{ {
down(&rpciod_sema); down(&rpciod_sema);
dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users); dprintk("rpciod_down sema %d\n", rpciod_users);
if (rpciod_users) { if (rpciod_users) {
if (--rpciod_users) if (--rpciod_users)
goto out; goto out;
} else } else
printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid); printk(KERN_WARNING "rpciod_down: no users??\n");
if (!rpciod_pid) { if (!rpciod_workqueue) {
dprintk("rpciod_down: Nothing to do!\n"); dprintk("rpciod_down: Nothing to do!\n");
goto out; goto out;
} }
rpciod_killall();
kill_proc(rpciod_pid, SIGKILL, 1); destroy_workqueue(rpciod_workqueue);
wait_for_completion(&rpciod_killer); rpciod_workqueue = NULL;
out: out:
up(&rpciod_sema); up(&rpciod_sema);
} }
...@@ -1261,7 +1063,12 @@ void rpc_show_tasks(void) ...@@ -1261,7 +1063,12 @@ void rpc_show_tasks(void)
} }
printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout " printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
"-rpcwait -action- --exit--\n"); "-rpcwait -action- --exit--\n");
alltask_for_each(t, le, &all_tasks) alltask_for_each(t, le, &all_tasks) {
const char *rpc_waitq = "none";
if (RPC_IS_QUEUED(t))
rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n", printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
t->tk_pid, t->tk_pid,
(t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1), (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
...@@ -1269,8 +1076,9 @@ void rpc_show_tasks(void) ...@@ -1269,8 +1076,9 @@ void rpc_show_tasks(void)
t->tk_client, t->tk_client,
(t->tk_client ? t->tk_client->cl_prog : 0), (t->tk_client ? t->tk_client->cl_prog : 0),
t->tk_rqstp, t->tk_timeout, t->tk_rqstp, t->tk_timeout,
rpc_qname(t->tk_rpcwait), rpc_waitq,
t->tk_action, t->tk_exit); t->tk_action, t->tk_exit);
}
spin_unlock(&rpc_sched_lock); spin_unlock(&rpc_sched_lock);
} }
#endif #endif
......
...@@ -1103,7 +1103,7 @@ xprt_write_space(struct sock *sk) ...@@ -1103,7 +1103,7 @@ xprt_write_space(struct sock *sk)
goto out; goto out;
spin_lock_bh(&xprt->sock_lock); spin_lock_bh(&xprt->sock_lock);
if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending) if (xprt->snd_task)
rpc_wake_up_task(xprt->snd_task); rpc_wake_up_task(xprt->snd_task);
spin_unlock_bh(&xprt->sock_lock); spin_unlock_bh(&xprt->sock_lock);
out: out:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment