Commit 5b80147e authored by NeilBrown's avatar NeilBrown Committed by Chuck Lever

SUNRPC: only have one thread waking up at a time

Currently if several items of work become available in quick succession,
that number of threads (if available) will be woken.  By the time some
of them wake up another thread that was already cache-warm might have
come along and completed the work.  Anecdotal evidence suggests as many
as 15% of wakes find nothing to do once they get to the point of
looking.

This patch changes svc_pool_wake_idle_thread() to wake the first thread
on the queue but NOT remove it.  Subsequent calls will wake the same
thread.  Once that thread starts it will dequeue itself and after
dequeueing some work to do, it will wake the next thread if there is more
work ready.  This results in a more orderly increase in the number of
busy threads.

As a bonus, this allows us to reduce locking around the idle queue.
svc_pool_wake_idle_thread() no longer needs to take a lock (beyond
rcu_read_lock()) as it doesn't manipulate the queue, it just looks at
the first item.

The thread itself can avoid locking by using the new
llist_del_first_this() interface.  This will safely remove the thread
itself if it is the head.  If it isn't the head, it will do nothing.
If multiple threads call this concurrently only one will succeed.  The
others will do nothing, so no corruption can result.

If a thread wakes up and finds that it cannot dequeue itself that means
either
- that it wasn't woken because it was the head of the queue.  Maybe the
  freezer woke it.  In that case it can go back to sleep (after trying
  to freeze of course).
- some other thread found there was nothing to do very recently, and
  placed itself on the head of the queue in front of this thread.
  It must check again after placing itself there, so it can be deemed to
  be responsible for any pending work, and this thread can go back to
  sleep until woken.

No code ever tests for busy threads any more.  Only each thread itself
cares if it is busy.  So svc_thread_busy() is no longer needed.
Signed-off-by: default avatarNeilBrown <neilb@suse.de>
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent d7926ee8
...@@ -266,17 +266,6 @@ enum { ...@@ -266,17 +266,6 @@ enum {
RQ_DATA, /* request has data */ RQ_DATA, /* request has data */
}; };
/**
* svc_thread_busy - check if a thread as busy
* @rqstp: the thread which might be busy
*
* A thread is only busy when it is not an the idle list.
*/
static inline bool svc_thread_busy(const struct svc_rqst *rqstp)
{
return !llist_on_list(&rqstp->rq_idle);
}
#define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net) #define SVC_NET(rqst) (rqst->rq_xprt ? rqst->rq_xprt->xpt_net : rqst->rq_bc_net)
/* /*
......
...@@ -642,7 +642,6 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node) ...@@ -642,7 +642,6 @@ svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
folio_batch_init(&rqstp->rq_fbatch); folio_batch_init(&rqstp->rq_fbatch);
init_llist_node(&rqstp->rq_idle);
rqstp->rq_server = serv; rqstp->rq_server = serv;
rqstp->rq_pool = pool; rqstp->rq_pool = pool;
...@@ -704,17 +703,16 @@ void svc_pool_wake_idle_thread(struct svc_pool *pool) ...@@ -704,17 +703,16 @@ void svc_pool_wake_idle_thread(struct svc_pool *pool)
struct llist_node *ln; struct llist_node *ln;
rcu_read_lock(); rcu_read_lock();
spin_lock_bh(&pool->sp_lock); ln = READ_ONCE(pool->sp_idle_threads.first);
ln = llist_del_first_init(&pool->sp_idle_threads);
spin_unlock_bh(&pool->sp_lock);
if (ln) { if (ln) {
rqstp = llist_entry(ln, struct svc_rqst, rq_idle); rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
WRITE_ONCE(rqstp->rq_qtime, ktime_get()); WRITE_ONCE(rqstp->rq_qtime, ktime_get());
if (!task_is_running(rqstp->rq_task)) {
wake_up_process(rqstp->rq_task); wake_up_process(rqstp->rq_task);
rcu_read_unlock();
percpu_counter_inc(&pool->sp_threads_woken);
trace_svc_wake_up(rqstp->rq_task->pid); trace_svc_wake_up(rqstp->rq_task->pid);
percpu_counter_inc(&pool->sp_threads_woken);
}
rcu_read_unlock();
return; return;
} }
rcu_read_unlock(); rcu_read_unlock();
......
...@@ -732,20 +732,19 @@ static void svc_thread_wait_for_work(struct svc_rqst *rqstp) ...@@ -732,20 +732,19 @@ static void svc_thread_wait_for_work(struct svc_rqst *rqstp)
if (svc_thread_should_sleep(rqstp)) { if (svc_thread_should_sleep(rqstp)) {
set_current_state(TASK_IDLE | TASK_FREEZABLE); set_current_state(TASK_IDLE | TASK_FREEZABLE);
llist_add(&rqstp->rq_idle, &pool->sp_idle_threads); llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
if (likely(svc_thread_should_sleep(rqstp)))
schedule();
if (unlikely(!svc_thread_should_sleep(rqstp))) while (!llist_del_first_this(&pool->sp_idle_threads,
/* Work just became available. This thread cannot simply &rqstp->rq_idle)) {
* choose not to sleep as it *must* wait until removed. /* Work just became available. This thread can only
* So wake the first waiter - whether it is this * handle it after removing rqstp from the idle
* thread or some other, it will get the work done. * list. If that attempt failed, some other thread
*/ * must have queued itself after finding no
svc_pool_wake_idle_thread(pool); * work to do, so that thread has taken responsibly
* for this new work. This thread can safely sleep
/* Since a thread cannot remove itself from an llist, * until woken again.
* schedule until someone else removes @rqstp from
* the idle list.
*/ */
while (!svc_thread_busy(rqstp)) {
schedule(); schedule();
set_current_state(TASK_IDLE | TASK_FREEZABLE); set_current_state(TASK_IDLE | TASK_FREEZABLE);
} }
...@@ -835,6 +834,15 @@ static void svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt) ...@@ -835,6 +834,15 @@ static void svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
svc_xprt_release(rqstp); svc_xprt_release(rqstp);
} }
static void svc_thread_wake_next(struct svc_rqst *rqstp)
{
if (!svc_thread_should_sleep(rqstp))
/* More work pending after I dequeued some,
* wake another worker
*/
svc_pool_wake_idle_thread(rqstp->rq_pool);
}
/** /**
* svc_recv - Receive and process the next request on any transport * svc_recv - Receive and process the next request on any transport
* @rqstp: an idle RPC service thread * @rqstp: an idle RPC service thread
...@@ -854,13 +862,16 @@ void svc_recv(struct svc_rqst *rqstp) ...@@ -854,13 +862,16 @@ void svc_recv(struct svc_rqst *rqstp)
clear_bit(SP_TASK_PENDING, &pool->sp_flags); clear_bit(SP_TASK_PENDING, &pool->sp_flags);
if (svc_thread_should_stop(rqstp)) if (svc_thread_should_stop(rqstp)) {
svc_thread_wake_next(rqstp);
return; return;
}
rqstp->rq_xprt = svc_xprt_dequeue(pool); rqstp->rq_xprt = svc_xprt_dequeue(pool);
if (rqstp->rq_xprt) { if (rqstp->rq_xprt) {
struct svc_xprt *xprt = rqstp->rq_xprt; struct svc_xprt *xprt = rqstp->rq_xprt;
svc_thread_wake_next(rqstp);
/* Normally we will wait up to 5 seconds for any required /* Normally we will wait up to 5 seconds for any required
* cache information to be provided. When there are no * cache information to be provided. When there are no
* idle threads, we reduce the wait time. * idle threads, we reduce the wait time.
...@@ -885,6 +896,7 @@ void svc_recv(struct svc_rqst *rqstp) ...@@ -885,6 +896,7 @@ void svc_recv(struct svc_rqst *rqstp)
if (req) { if (req) {
list_del(&req->rq_bc_list); list_del(&req->rq_bc_list);
spin_unlock_bh(&serv->sv_cb_lock); spin_unlock_bh(&serv->sv_cb_lock);
svc_thread_wake_next(rqstp);
svc_process_bc(req, rqstp); svc_process_bc(req, rqstp);
return; return;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment