Commit 96ef13b2 authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Add a new helper rpc_wake_up_queued_task()

In all cases where we currently use rpc_wake_up_task(), we almost always
know on which waitqueue the rpc_task is actually sleeping. This will allows
us to simplify the queue locking in a future patch.
Signed-off-by: default avatarTrond Myklebust <Trond.Myklebust@netapp.com>
parent fde95c75
...@@ -33,7 +33,6 @@ struct rpc_wait_queue; ...@@ -33,7 +33,6 @@ struct rpc_wait_queue;
struct rpc_wait { struct rpc_wait {
struct list_head list; /* wait queue links */ struct list_head list; /* wait queue links */
struct list_head links; /* Links to related tasks */ struct list_head links; /* Links to related tasks */
struct rpc_wait_queue * rpc_waitq; /* RPC wait queue we're on */
}; };
/* /*
...@@ -80,6 +79,7 @@ struct rpc_task { ...@@ -80,6 +79,7 @@ struct rpc_task {
struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could
* be any workqueue * be any workqueue
*/ */
struct rpc_wait_queue *tk_waitqueue; /* RPC wait queue we're on */
union { union {
struct work_struct tk_work; /* Async task work queue */ struct work_struct tk_work; /* Async task work queue */
struct rpc_wait tk_wait; /* RPC wait */ struct rpc_wait tk_wait; /* RPC wait */
...@@ -233,6 +233,8 @@ void rpc_init_wait_queue(struct rpc_wait_queue *, const char *); ...@@ -233,6 +233,8 @@ void rpc_init_wait_queue(struct rpc_wait_queue *, const char *);
void rpc_sleep_on(struct rpc_wait_queue *, struct rpc_task *, void rpc_sleep_on(struct rpc_wait_queue *, struct rpc_task *,
rpc_action action, rpc_action timer); rpc_action action, rpc_action timer);
void rpc_wake_up_task(struct rpc_task *); void rpc_wake_up_task(struct rpc_task *);
void rpc_wake_up_queued_task(struct rpc_wait_queue *,
struct rpc_task *);
void rpc_wake_up(struct rpc_wait_queue *); void rpc_wake_up(struct rpc_wait_queue *);
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *); struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
void rpc_wake_up_status(struct rpc_wait_queue *, int); void rpc_wake_up_status(struct rpc_wait_queue *, int);
......
...@@ -1535,7 +1535,7 @@ void rpc_show_tasks(void) ...@@ -1535,7 +1535,7 @@ void rpc_show_tasks(void)
proc = -1; proc = -1;
if (RPC_IS_QUEUED(t)) if (RPC_IS_QUEUED(t))
rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq); rpc_waitq = rpc_qname(t->tk_waitqueue);
printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n", printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
t->tk_pid, proc, t->tk_pid, proc,
......
...@@ -148,7 +148,7 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task * ...@@ -148,7 +148,7 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *
list_add(&task->u.tk_wait.list, &queue->tasks[0]); list_add(&task->u.tk_wait.list, &queue->tasks[0]);
else else
list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]); list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
task->u.tk_wait.rpc_waitq = queue; task->tk_waitqueue = queue;
queue->qlen++; queue->qlen++;
rpc_set_queued(task); rpc_set_queued(task);
...@@ -175,11 +175,8 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task) ...@@ -175,11 +175,8 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
* Remove request from queue. * Remove request from queue.
* Note: must be called with spin lock held. * Note: must be called with spin lock held.
*/ */
static void __rpc_remove_wait_queue(struct rpc_task *task) static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{ {
struct rpc_wait_queue *queue;
queue = task->u.tk_wait.rpc_waitq;
if (RPC_IS_PRIORITY(queue)) if (RPC_IS_PRIORITY(queue))
__rpc_remove_wait_queue_priority(task); __rpc_remove_wait_queue_priority(task);
else else
...@@ -364,11 +361,12 @@ EXPORT_SYMBOL_GPL(rpc_sleep_on); ...@@ -364,11 +361,12 @@ EXPORT_SYMBOL_GPL(rpc_sleep_on);
/** /**
* __rpc_do_wake_up_task - wake up a single rpc_task * __rpc_do_wake_up_task - wake up a single rpc_task
* @queue: wait queue
* @task: task to be woken up * @task: task to be woken up
* *
* Caller must hold queue->lock, and have cleared the task queued flag. * Caller must hold queue->lock, and have cleared the task queued flag.
*/ */
static void __rpc_do_wake_up_task(struct rpc_task *task) static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{ {
dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n", dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
task->tk_pid, jiffies); task->tk_pid, jiffies);
...@@ -383,7 +381,7 @@ static void __rpc_do_wake_up_task(struct rpc_task *task) ...@@ -383,7 +381,7 @@ static void __rpc_do_wake_up_task(struct rpc_task *task)
} }
__rpc_disable_timer(task); __rpc_disable_timer(task);
__rpc_remove_wait_queue(task); __rpc_remove_wait_queue(queue, task);
rpc_make_runnable(task); rpc_make_runnable(task);
...@@ -391,36 +389,38 @@ static void __rpc_do_wake_up_task(struct rpc_task *task) ...@@ -391,36 +389,38 @@ static void __rpc_do_wake_up_task(struct rpc_task *task)
} }
/* /*
* Wake up the specified task * Wake up a queued task while the queue lock is being held
*/ */
static void __rpc_wake_up_task(struct rpc_task *task) static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{ {
if (!RPC_IS_QUEUED(task) || task->tk_waitqueue != queue)
return;
if (rpc_start_wakeup(task)) { if (rpc_start_wakeup(task)) {
if (RPC_IS_QUEUED(task)) __rpc_do_wake_up_task(queue, task);
__rpc_do_wake_up_task(task);
rpc_finish_wakeup(task); rpc_finish_wakeup(task);
} }
} }
/* /*
* Wake up the specified task * Wake up a task on a specific queue
*/ */
void rpc_wake_up_task(struct rpc_task *task) void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{ {
rcu_read_lock_bh(); rcu_read_lock_bh();
if (rpc_start_wakeup(task)) { spin_lock(&queue->lock);
if (RPC_IS_QUEUED(task)) { rpc_wake_up_task_queue_locked(queue, task);
struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq; spin_unlock(&queue->lock);
/* Note: we're already in a bh-safe context */
spin_lock(&queue->lock);
__rpc_do_wake_up_task(task);
spin_unlock(&queue->lock);
}
rpc_finish_wakeup(task);
}
rcu_read_unlock_bh(); rcu_read_unlock_bh();
} }
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
/*
* Wake up the specified task
*/
void rpc_wake_up_task(struct rpc_task *task)
{
rpc_wake_up_queued_task(task->tk_waitqueue, task);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_task); EXPORT_SYMBOL_GPL(rpc_wake_up_task);
/* /*
...@@ -471,7 +471,7 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu ...@@ -471,7 +471,7 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu
new_owner: new_owner:
rpc_set_waitqueue_owner(queue, task->tk_owner); rpc_set_waitqueue_owner(queue, task->tk_owner);
out: out:
__rpc_wake_up_task(task); rpc_wake_up_task_queue_locked(queue, task);
return task; return task;
} }
...@@ -490,7 +490,7 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue) ...@@ -490,7 +490,7 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
task = __rpc_wake_up_next_priority(queue); task = __rpc_wake_up_next_priority(queue);
else { else {
task_for_first(task, &queue->tasks[0]) task_for_first(task, &queue->tasks[0])
__rpc_wake_up_task(task); rpc_wake_up_task_queue_locked(queue, task);
} }
spin_unlock(&queue->lock); spin_unlock(&queue->lock);
rcu_read_unlock_bh(); rcu_read_unlock_bh();
...@@ -515,7 +515,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue) ...@@ -515,7 +515,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
head = &queue->tasks[queue->maxpriority]; head = &queue->tasks[queue->maxpriority];
for (;;) { for (;;) {
list_for_each_entry_safe(task, next, head, u.tk_wait.list) list_for_each_entry_safe(task, next, head, u.tk_wait.list)
__rpc_wake_up_task(task); rpc_wake_up_task_queue_locked(queue, task);
if (head == &queue->tasks[0]) if (head == &queue->tasks[0])
break; break;
head--; head--;
...@@ -543,7 +543,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) ...@@ -543,7 +543,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
for (;;) { for (;;) {
list_for_each_entry_safe(task, next, head, u.tk_wait.list) { list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
task->tk_status = status; task->tk_status = status;
__rpc_wake_up_task(task); rpc_wake_up_task_queue_locked(queue, task);
} }
if (head == &queue->tasks[0]) if (head == &queue->tasks[0])
break; break;
...@@ -562,10 +562,8 @@ static void rpc_run_timer(unsigned long ptr) ...@@ -562,10 +562,8 @@ static void rpc_run_timer(unsigned long ptr)
struct rpc_task *task = (struct rpc_task *)ptr; struct rpc_task *task = (struct rpc_task *)ptr;
void (*callback)(struct rpc_task *); void (*callback)(struct rpc_task *);
if (!rpc_start_wakeup(task))
goto out;
if (RPC_IS_QUEUED(task)) { if (RPC_IS_QUEUED(task)) {
struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq; struct rpc_wait_queue *queue = task->tk_waitqueue;
callback = task->tk_timeout_fn; callback = task->tk_timeout_fn;
dprintk("RPC: %5u running timer\n", task->tk_pid); dprintk("RPC: %5u running timer\n", task->tk_pid);
...@@ -573,11 +571,9 @@ static void rpc_run_timer(unsigned long ptr) ...@@ -573,11 +571,9 @@ static void rpc_run_timer(unsigned long ptr)
callback(task); callback(task);
/* Note: we're already in a bh-safe context */ /* Note: we're already in a bh-safe context */
spin_lock(&queue->lock); spin_lock(&queue->lock);
__rpc_do_wake_up_task(task); rpc_wake_up_task_queue_locked(queue, task);
spin_unlock(&queue->lock); spin_unlock(&queue->lock);
} }
rpc_finish_wakeup(task);
out:
smp_mb__before_clear_bit(); smp_mb__before_clear_bit();
clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate); clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
smp_mb__after_clear_bit(); smp_mb__after_clear_bit();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment