Commit b1691bc0 authored by Jeff Layton, committed by J. Bruce Fields

sunrpc: convert to lockless lookup of queued server threads

Testing has shown that the pool->sp_lock can be a bottleneck on a busy
server. Every time data is received on a socket, the server must take
that lock in order to dequeue a thread from the sp_threads list.

Address this problem by eliminating the sp_threads list (which contains
threads that are currently idle) and replacing it with a RQ_BUSY flag in
svc_rqst. This allows us to walk the sp_all_threads list under the
rcu_read_lock and find a suitable thread for the xprt by doing a
test_and_set_bit.
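
In outline, the fast path becomes an RCU-protected walk of the pool's
thread list that claims the first thread whose busy bit it can flip.
This is a condensed fragment of the svc_xprt_do_enqueue hunk below,
with the claiming step elided:

	rcu_read_lock();
	list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
		/* cheap lockless check before trying to claim it */
		if (test_bit(RQ_BUSY, &rqstp->rq_flags))
			continue;
		/* claim the thread under rq_lock, hand it the xprt, wake it */
	}
	rcu_read_unlock();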

Note that this approach still has a potential atomicity problem. We
don't want svc_xprt_do_enqueue to set the rqst->rq_xprt pointer unless
a test_and_set_bit of RQ_BUSY returned zero (which indicates that the
thread was idle). But by the time we check that, the bit could have been
flipped by a waking thread.

To address this, we acquire a new per-rqst spinlock (rq_lock) and take
that before doing the test_and_set_bit. If that returns false, then we
can set rq_xprt and drop the spinlock. Then, when the thread wakes up,
it must set the bit under the same spinlock and can trust that if it was
already set then the rq_xprt is also properly set.
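
Condensed from the hunks below, the two halves of that handshake look
like this; the enqueue side runs in svc_xprt_do_enqueue and the wakeup
side in svc_get_next_xprt:

	/* enqueue side: claim an apparently idle thread */
	spin_lock_bh(&rqstp->rq_lock);
	if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) {
		/* lost the race with a waking thread; move on */
		spin_unlock_bh(&rqstp->rq_lock);
		continue;
	}
	rqstp->rq_xprt = xprt;	/* safe: we now own this thread */
	svc_xprt_get(xprt);
	spin_unlock_bh(&rqstp->rq_lock);

	/* wakeup side: re-mark ourselves busy under the same lock */
	spin_lock_bh(&rqstp->rq_lock);
	set_bit(RQ_BUSY, &rqstp->rq_flags);
	spin_unlock_bh(&rqstp->rq_lock);
	xprt = rqstp->rq_xprt;	/* non-NULL only if an enqueuer claimed us */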

With this scheme, the case where we have an idle thread no longer needs
to take the highly contended pool->sp_lock at all, and that removes the
bottleneck.

That still leaves one issue: what of the case where we walk the whole
sp_all_threads list and don't find an idle thread? Because the search is
lockless, it's possible for the queueing to race with a thread that is
about to go to sleep. To address that, we queue the xprt and then search again.

If we find an idle thread at that point, we can't attach the xprt to it
directly since that might race with a different thread waking up and
finding it.  All we can do is wake the idle thread back up and let it
attempt to find the now-queued xprt.
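
Condensed from the svc_xprt_do_enqueue hunk below, that fallback is:

	if (!queued) {
		queued = true;
		spin_lock_bh(&pool->sp_lock);
		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
		spin_unlock_bh(&pool->sp_lock);
		goto redo_search;	/* a thread found now is only woken, not assigned */
	}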
Signed-off-by: Jeff Layton <jlayton@primarydata.com>
Tested-by: Chris Worley <chris.worley@primarydata.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
parent 403c7b44
include/linux/sunrpc/svc.h
@@ -45,7 +45,6 @@ struct svc_pool_stats {
 struct svc_pool {
 	unsigned int		sp_id;		/* pool id; also node id on NUMA */
 	spinlock_t		sp_lock;	/* protects all fields */
-	struct list_head	sp_threads;	/* idle server threads */
 	struct list_head	sp_sockets;	/* pending sockets */
 	unsigned int		sp_nrthreads;	/* # of threads in pool */
 	struct list_head	sp_all_threads;	/* all server threads */
@@ -221,7 +220,6 @@ static inline void svc_putu32(struct kvec *iov, __be32 val)
  * processed.
  */
 struct svc_rqst {
-	struct list_head	rq_list;	/* idle list */
 	struct list_head	rq_all;		/* all threads list */
 	struct rcu_head		rq_rcu_head;	/* for RCU deferred kfree */
 	struct svc_xprt *	rq_xprt;	/* transport ptr */
@@ -264,6 +262,7 @@ struct svc_rqst {
 						 * to prevent encrypting page
 						 * cache pages */
 #define	RQ_VICTIM	(5)			/* about to be shut down */
+#define	RQ_BUSY		(6)			/* request is busy */
 	unsigned long		rq_flags;	/* flags field */

 	void *			rq_argp;	/* decoded arguments */
@@ -285,6 +284,7 @@ struct svc_rqst {
 	struct auth_domain *	rq_gssclient;	/* "gss/"-style peer info */
 	struct svc_cacherep *	rq_cacherep;	/* cache info */
 	struct task_struct	*rq_task;	/* service thread */
+	spinlock_t		rq_lock;	/* per-request lock */
 };

 #define SVC_NET(svc_rqst) (svc_rqst->rq_xprt->xpt_net)
include/trace/events/sunrpc.h
@@ -419,7 +419,8 @@ TRACE_EVENT(xs_tcp_data_recv,
 		{ (1UL << RQ_USEDEFERRAL),	"RQ_USEDEFERRAL"},	\
 		{ (1UL << RQ_DROPME),		"RQ_DROPME"},		\
 		{ (1UL << RQ_SPLICE_OK),	"RQ_SPLICE_OK"},	\
-		{ (1UL << RQ_VICTIM),		"RQ_VICTIM"})
+		{ (1UL << RQ_VICTIM),		"RQ_VICTIM"},		\
+		{ (1UL << RQ_BUSY),		"RQ_BUSY"})

 TRACE_EVENT(svc_recv,
 	TP_PROTO(struct svc_rqst *rqst, int status),
net/sunrpc/svc.c
@@ -476,7 +476,6 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
 				i, serv->sv_name);

 		pool->sp_id = i;
-		INIT_LIST_HEAD(&pool->sp_threads);
 		INIT_LIST_HEAD(&pool->sp_sockets);
 		INIT_LIST_HEAD(&pool->sp_all_threads);
 		spin_lock_init(&pool->sp_lock);
@@ -614,12 +613,14 @@ svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
 		goto out_enomem;

 	serv->sv_nrthreads++;
+	__set_bit(RQ_BUSY, &rqstp->rq_flags);
+	spin_lock_init(&rqstp->rq_lock);
+	rqstp->rq_server = serv;
+	rqstp->rq_pool = pool;
 	spin_lock_bh(&pool->sp_lock);
 	pool->sp_nrthreads++;
 	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
 	spin_unlock_bh(&pool->sp_lock);
-	rqstp->rq_server = serv;
-	rqstp->rq_pool = pool;

 	rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
 	if (!rqstp->rq_argp)
net/sunrpc/svc_xprt.c
@@ -310,25 +310,6 @@ char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
 }
 EXPORT_SYMBOL_GPL(svc_print_addr);

-/*
- * Queue up an idle server thread.  Must have pool->sp_lock held.
- * Note: this is really a stack rather than a queue, so that we only
- * use as many different threads as we need, and the rest don't pollute
- * the cache.
- */
-static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
-	list_add(&rqstp->rq_list, &pool->sp_threads);
-}
-
-/*
- * Dequeue an nfsd thread.  Must have pool->sp_lock held.
- */
-static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
-	list_del(&rqstp->rq_list);
-}
-
 static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
 {
 	if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
@@ -343,6 +324,7 @@ static void svc_xprt_do_enqueue(struct svc_xprt *xprt)
 	struct svc_pool *pool;
 	struct svc_rqst	*rqstp;
 	int cpu;
+	bool queued = false;

 	if (!svc_xprt_has_something_to_do(xprt))
 		return;
@@ -360,37 +342,60 @@ static void svc_xprt_do_enqueue(struct svc_xprt *xprt)

 	cpu = get_cpu();
 	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
-	spin_lock_bh(&pool->sp_lock);

 	atomic_long_inc(&pool->sp_stats.packets);

-	if (!list_empty(&pool->sp_threads)) {
-		rqstp = list_entry(pool->sp_threads.next,
-				   struct svc_rqst,
-				   rq_list);
-		dprintk("svc: transport %p served by daemon %p\n",
-			xprt, rqstp);
-		svc_thread_dequeue(pool, rqstp);
-		if (rqstp->rq_xprt)
-			printk(KERN_ERR
-				"svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
-				rqstp, rqstp->rq_xprt);
-		/* Note the order of the following 3 lines:
-		 * We want to assign xprt to rqstp->rq_xprt only _after_
-		 * we've woken up the process, so that we don't race with
-		 * the lockless check in svc_get_next_xprt().
+redo_search:
+	/* find a thread for this xprt */
+	rcu_read_lock();
+	list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
+		/* Do a lockless check first */
+		if (test_bit(RQ_BUSY, &rqstp->rq_flags))
+			continue;
+
+		/*
+		 * Once the xprt has been queued, it can only be dequeued by
+		 * the task that intends to service it. All we can do at that
+		 * point is to try to wake this thread back up so that it can
+		 * do so.
 		 */
-		svc_xprt_get(xprt);
-		wake_up_process(rqstp->rq_task);
-		rqstp->rq_xprt = xprt;
+		if (!queued) {
+			spin_lock_bh(&rqstp->rq_lock);
+			if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) {
+				/* already busy, move on... */
+				spin_unlock_bh(&rqstp->rq_lock);
+				continue;
+			}
+
+			/* this one will do */
+			rqstp->rq_xprt = xprt;
+			svc_xprt_get(xprt);
+			spin_unlock_bh(&rqstp->rq_lock);
+		}
+		rcu_read_unlock();
+
 		atomic_long_inc(&pool->sp_stats.threads_woken);
-	} else {
+		wake_up_process(rqstp->rq_task);
+		put_cpu();
+		return;
+	}
+	rcu_read_unlock();
+
+	/*
+	 * We didn't find an idle thread to use, so we need to queue the xprt.
+	 * Do so and then search again. If we find one, we can't hook this one
+	 * up to it directly but we can wake the thread up in the hopes that it
+	 * will pick it up once it searches for a xprt to service.
+	 */
+	if (!queued) {
+		queued = true;
 		dprintk("svc: transport %p put into queue\n", xprt);
+		spin_lock_bh(&pool->sp_lock);
 		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
 		pool->sp_stats.sockets_queued++;
+		spin_unlock_bh(&pool->sp_lock);
+		goto redo_search;
 	}
-
-	spin_unlock_bh(&pool->sp_lock);
 	put_cpu();
 }
@@ -408,21 +413,26 @@ void svc_xprt_enqueue(struct svc_xprt *xprt)
 EXPORT_SYMBOL_GPL(svc_xprt_enqueue);

 /*
- * Dequeue the first transport.  Must be called with the pool->sp_lock held.
+ * Dequeue the first transport, if there is one.
  */
 static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
 {
-	struct svc_xprt	*xprt;
+	struct svc_xprt	*xprt = NULL;

 	if (list_empty(&pool->sp_sockets))
 		return NULL;

-	xprt = list_entry(pool->sp_sockets.next,
-			  struct svc_xprt, xpt_ready);
-	list_del_init(&xprt->xpt_ready);
+	spin_lock_bh(&pool->sp_lock);
+	if (likely(!list_empty(&pool->sp_sockets))) {
+		xprt = list_first_entry(&pool->sp_sockets,
+					struct svc_xprt, xpt_ready);
+		list_del_init(&xprt->xpt_ready);
+		svc_xprt_get(xprt);

-	dprintk("svc: transport %p dequeued, inuse=%d\n",
-		xprt, atomic_read(&xprt->xpt_ref.refcount));
+		dprintk("svc: transport %p dequeued, inuse=%d\n",
+			xprt, atomic_read(&xprt->xpt_ref.refcount));
+	}
+	spin_unlock_bh(&pool->sp_lock);

 	return xprt;
 }
@@ -497,16 +507,21 @@ void svc_wake_up(struct svc_serv *serv)

 	pool = &serv->sv_pools[0];

-	spin_lock_bh(&pool->sp_lock);
-	if (!list_empty(&pool->sp_threads)) {
-		rqstp = list_entry(pool->sp_threads.next,
-				   struct svc_rqst,
-				   rq_list);
+	rcu_read_lock();
+	list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
+		/* skip any that aren't queued */
+		if (test_bit(RQ_BUSY, &rqstp->rq_flags))
+			continue;
+		rcu_read_unlock();
 		dprintk("svc: daemon %p woken up.\n", rqstp);
 		wake_up_process(rqstp->rq_task);
-	} else
-		set_bit(SP_TASK_PENDING, &pool->sp_flags);
-	spin_unlock_bh(&pool->sp_lock);
+		return;
+	}
+	rcu_read_unlock();
+
+	/* No free entries available */
+	set_bit(SP_TASK_PENDING, &pool->sp_flags);
+	smp_wmb();
 }
 EXPORT_SYMBOL_GPL(svc_wake_up);
@@ -617,22 +632,47 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
 	return 0;
 }

+static bool
+rqst_should_sleep(struct svc_rqst *rqstp)
+{
+	struct svc_pool		*pool = rqstp->rq_pool;
+
+	/* did someone call svc_wake_up? */
+	if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags))
+		return false;
+
+	/* was a socket queued? */
+	if (!list_empty(&pool->sp_sockets))
+		return false;
+
+	/* are we shutting down? */
+	if (signalled() || kthread_should_stop())
+		return false;
+
+	/* are we freezing? */
+	if (freezing(current))
+		return false;
+
+	return true;
+}
+
 static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
 {
 	struct svc_xprt *xprt;
 	struct svc_pool		*pool = rqstp->rq_pool;
 	long			time_left = 0;

+	/* rq_xprt should be clear on entry */
+	WARN_ON_ONCE(rqstp->rq_xprt);
+
 	/* Normally we will wait up to 5 seconds for any required
 	 * cache information to be provided.
 	 */
 	rqstp->rq_chandle.thread_wait = 5*HZ;

-	spin_lock_bh(&pool->sp_lock);
 	xprt = svc_xprt_dequeue(pool);
 	if (xprt) {
 		rqstp->rq_xprt = xprt;
-		svc_xprt_get(xprt);

 		/* As there is a shortage of threads and this request
 		 * had to be queued, don't allow the thread to wait so
@@ -640,51 +680,38 @@ static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
 		 */
 		rqstp->rq_chandle.thread_wait = 1*HZ;
 		clear_bit(SP_TASK_PENDING, &pool->sp_flags);
-	} else {
-		if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags)) {
-			xprt = ERR_PTR(-EAGAIN);
-			goto out;
-		}
-		/*
-		 * We have to be able to interrupt this wait
-		 * to bring down the daemons ...
-		 */
-		set_current_state(TASK_INTERRUPTIBLE);
+		return xprt;
+	}

-		/* No data pending. Go to sleep */
-		svc_thread_enqueue(pool, rqstp);
-		spin_unlock_bh(&pool->sp_lock);
+	/*
+	 * We have to be able to interrupt this wait
+	 * to bring down the daemons ...
+	 */
+	set_current_state(TASK_INTERRUPTIBLE);
+	clear_bit(RQ_BUSY, &rqstp->rq_flags);
+	smp_mb();
+
+	if (likely(rqst_should_sleep(rqstp)))
+		time_left = schedule_timeout(timeout);
+	else
+		__set_current_state(TASK_RUNNING);

-		if (!(signalled() || kthread_should_stop())) {
-			time_left = schedule_timeout(timeout);
-			__set_current_state(TASK_RUNNING);
+	try_to_freeze();

-			try_to_freeze();
+	spin_lock_bh(&rqstp->rq_lock);
+	set_bit(RQ_BUSY, &rqstp->rq_flags);
+	spin_unlock_bh(&rqstp->rq_lock);

-			xprt = rqstp->rq_xprt;
-			if (xprt != NULL)
-				return xprt;
-		} else
-			__set_current_state(TASK_RUNNING);
+	xprt = rqstp->rq_xprt;
+	if (xprt != NULL)
+		return xprt;

-		spin_lock_bh(&pool->sp_lock);
-		if (!time_left)
-			atomic_long_inc(&pool->sp_stats.threads_timedout);
+	if (!time_left)
+		atomic_long_inc(&pool->sp_stats.threads_timedout);

-		xprt = rqstp->rq_xprt;
-		if (!xprt) {
-			svc_thread_dequeue(pool, rqstp);
-			spin_unlock_bh(&pool->sp_lock);
-			dprintk("svc: server %p, no data yet\n", rqstp);
-			if (signalled() || kthread_should_stop())
-				return ERR_PTR(-EINTR);
-			else
-				return ERR_PTR(-EAGAIN);
-		}
-	}
-out:
-	spin_unlock_bh(&pool->sp_lock);
-	return xprt;
+	if (signalled() || kthread_should_stop())
+		return ERR_PTR(-EINTR);
+	return ERR_PTR(-EAGAIN);
 }

 static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)