Commit fd796e41 authored by Paul E. McKenney's avatar Paul E. McKenney

rcu-tasks: Use fewer callbacks queues if callback flood ends

By default, when lock contention is encountered, the RCU Tasks flavors
of RCU switch to using per-CPU queueing.  However, if the callback
flood ends, per-CPU queueing continues to be used, which introduces
significant additional overhead, especially for callback invocation,
which fans out a series of workqueue handlers.

This commit therefore switches back to single-queue operation if at the
beginning of a grace period there are very few callbacks.  The definition
of "very few" is set by the rcupdate.rcu_task_collapse_lim module
parameter, which defaults to 10.  This switch happens in two phases,
with the first phase causing future callbacks to be enqueued on CPU 0's
queue, but with all queues continuing to be checked for grace periods
and callback invocation.  The second phase checks to see if an RCU grace
period has elapsed and if all remaining RCU-Tasks callbacks are queued
on CPU 0.  If so, only CPU 0 is checked for future grace periods and
callback operation.

Of course, the return of contention anywhere during this process will
result in returning to per-CPU callback queueing.
Reported-by: default avatarMartin Lau <kafai@fb.com>
Cc: Neeraj Upadhyay <neeraj.iitr10@gmail.com>
Signed-off-by: default avatarPaul E. McKenney <paulmck@kernel.org>
parent 2cee0789
......@@ -4805,6 +4805,14 @@
period to instead use normal non-expedited
grace-period processing.
rcupdate.rcu_task_collapse_lim= [KNL]
Set the maximum number of callbacks present
at the beginning of a grace period that allows
the RCU Tasks flavors to collapse back to using
a single callback queue. This switching only
occurs when rcupdate.rcu_task_enqueue_lim is
set to the default value of -1.
rcupdate.rcu_task_contend_lim= [KNL]
Set the minimum number of callback-queuing-time
lock-contention events per jiffy required to
......
......@@ -68,6 +68,7 @@ struct rcu_tasks_percpu {
* @percpu_enqueue_shift: Shift down CPU ID this much when enqueuing callbacks.
* @percpu_enqueue_lim: Number of per-CPU callback queues in use for enqueuing.
* @percpu_dequeue_lim: Number of per-CPU callback queues in use for dequeuing.
* @percpu_dequeue_gpseq: RCU grace-period number to propagate enqueue limit to dequeuers.
* @barrier_q_mutex: Serialize barrier operations.
* @barrier_q_count: Number of queues being waited on.
* @barrier_q_completion: Barrier wait/wakeup mechanism.
......@@ -98,6 +99,7 @@ struct rcu_tasks {
int percpu_enqueue_shift;
int percpu_enqueue_lim;
int percpu_dequeue_lim;
unsigned long percpu_dequeue_gpseq;
struct mutex barrier_q_mutex;
atomic_t barrier_q_count;
struct completion barrier_q_completion;
......@@ -148,6 +150,8 @@ module_param(rcu_task_enqueue_lim, int, 0444);
static bool rcu_task_cb_adjust;
static int rcu_task_contend_lim __read_mostly = 100;
module_param(rcu_task_contend_lim, int, 0444);
static int rcu_task_collapse_lim __read_mostly = 10;
module_param(rcu_task_collapse_lim, int, 0444);
/* RCU tasks grace-period state for debugging. */
#define RTGS_INIT 0
......@@ -269,6 +273,7 @@ static void call_rcu_tasks_generic(struct rcu_head *rhp, rcu_callback_t func,
rhp->next = NULL;
rhp->func = func;
local_irq_save(flags);
rcu_read_lock();
rtpcp = per_cpu_ptr(rtp->rtpcpu,
smp_processor_id() >> READ_ONCE(rtp->percpu_enqueue_shift));
if (!raw_spin_trylock_rcu_node(rtpcp)) { // irqs already disabled.
......@@ -294,12 +299,13 @@ static void call_rcu_tasks_generic(struct rcu_head *rhp, rcu_callback_t func,
raw_spin_lock_irqsave(&rtp->cbs_gbl_lock, flags);
if (rtp->percpu_enqueue_lim != nr_cpu_ids) {
WRITE_ONCE(rtp->percpu_enqueue_shift, ilog2(nr_cpu_ids));
WRITE_ONCE(rtp->percpu_enqueue_lim, nr_cpu_ids);
WRITE_ONCE(rtp->percpu_dequeue_lim, nr_cpu_ids);
smp_store_release(&rtp->percpu_enqueue_lim, nr_cpu_ids);
pr_info("Switching %s to per-CPU callback queuing.\n", rtp->name);
}
raw_spin_unlock_irqrestore(&rtp->cbs_gbl_lock, flags);
}
rcu_read_unlock();
/* We can't create the thread unless interrupts are enabled. */
if (needwake && READ_ONCE(rtp->kthread_ptr))
irq_work_queue(&rtpcp->rtp_irq_work);
......@@ -369,15 +375,25 @@ static int rcu_tasks_need_gpcb(struct rcu_tasks *rtp)
{
int cpu;
unsigned long flags;
long n;
long ncbs = 0;
long ncbsnz = 0;
int needgpcb = 0;
for (cpu = 0; cpu < smp_load_acquire(&rtp->percpu_dequeue_lim); cpu++) {
struct rcu_tasks_percpu *rtpcp = per_cpu_ptr(rtp->rtpcpu, cpu);
/* Advance and accelerate any new callbacks. */
if (rcu_segcblist_empty(&rtpcp->cblist))
if (!rcu_segcblist_n_cbs(&rtpcp->cblist))
continue;
raw_spin_lock_irqsave_rcu_node(rtpcp, flags);
// Should we shrink down to a single callback queue?
n = rcu_segcblist_n_cbs(&rtpcp->cblist);
if (n) {
ncbs += n;
if (cpu > 0)
ncbsnz += n;
}
rcu_segcblist_advance(&rtpcp->cblist, rcu_seq_current(&rtp->tasks_gp_seq));
(void)rcu_segcblist_accelerate(&rtpcp->cblist, rcu_seq_snap(&rtp->tasks_gp_seq));
if (rcu_segcblist_pend_cbs(&rtpcp->cblist))
......@@ -386,6 +402,34 @@ static int rcu_tasks_need_gpcb(struct rcu_tasks *rtp)
needgpcb |= 0x1;
raw_spin_unlock_irqrestore_rcu_node(rtpcp, flags);
}
// Shrink down to a single callback queue if appropriate.
// This is done in two stages: (1) If there are no more than
// rcu_task_collapse_lim callbacks on CPU 0 and none on any other
// CPU, limit enqueueing to CPU 0. (2) After an RCU grace period,
// if there has not been an increase in callbacks, limit dequeuing
// to CPU 0. Note the matching RCU read-side critical section in
// call_rcu_tasks_generic().
if (rcu_task_cb_adjust && ncbs <= rcu_task_collapse_lim) {
raw_spin_lock_irqsave(&rtp->cbs_gbl_lock, flags);
if (rtp->percpu_enqueue_lim > 1) {
WRITE_ONCE(rtp->percpu_enqueue_shift, ilog2(nr_cpu_ids));
smp_store_release(&rtp->percpu_enqueue_lim, 1);
rtp->percpu_dequeue_gpseq = get_state_synchronize_rcu();
pr_info("Starting switch %s to CPU-0 callback queuing.\n", rtp->name);
}
raw_spin_unlock_irqrestore(&rtp->cbs_gbl_lock, flags);
}
if (rcu_task_cb_adjust && !ncbsnz &&
poll_state_synchronize_rcu(rtp->percpu_dequeue_gpseq)) {
raw_spin_lock_irqsave(&rtp->cbs_gbl_lock, flags);
if (rtp->percpu_enqueue_lim < rtp->percpu_dequeue_lim) {
WRITE_ONCE(rtp->percpu_dequeue_lim, 1);
pr_info("Completing switch %s to CPU-0 callback queuing.\n", rtp->name);
}
raw_spin_unlock_irqrestore(&rtp->cbs_gbl_lock, flags);
}
return needgpcb;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment