Commit e27fc964 authored by Tejun Heo, committed by Paul E. McKenney

rcu: increase synchronize_sched_expedited() batching

The fix in commit #6a0cc49 requires more than three concurrent instances
of synchronize_sched_expedited() before batching is possible.  This
patch uses a ticket-counter-like approach that is also not unrelated to
Lai Jiangshan's Ring RCU to allow sharing of expedited grace periods even
when there are only two concurrent instances of synchronize_sched_expedited().

This commit builds on Tejun's original posting, which may be found at
http://lkml.org/lkml/2010/11/9/204, adding memory barriers, avoiding
overflow of signed integers (other than via atomic_t), and fixing the
detection of batching.
Signed-off-by: Tejun Heo <tj@kernel.org>
Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
parent 46fdb093
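The patch pairs a "started" counter with a "done" counter, used like the two halves of a ticket-lock word (see the comment block added in the diff below): callers take a ticket on entry and, after forcing a grace period, publish it so that concurrent callers can piggyback. As a rough orientation before reading the diff, here is a minimal stand-alone user-space sketch of that idea; the names (gp_started, gp_done, expedited_sync, force_grace_period) are hypothetical, C11 atomics stand in for the kernel's atomic_t and barriers, and the retry, CPU-hotplug, and synchronize_sched() fallback paths are omitted.

#include <limits.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/* Wraparound-safe comparisons, as added to the header by this patch. */
#define UINT_CMP_GE(a, b) (UINT_MAX / 2 >= (a) - (b))
#define UINT_CMP_LT(a, b) (UINT_MAX / 2 < (a) - (b))

/* Counters playing the roles of sync_sched_expedited_started/_done. */
static atomic_uint gp_started;
static atomic_uint gp_done;

/* Stand-in for a successful try_stop_cpus() pass that forces a grace period. */
static void force_grace_period(void)
{
}

/*
 * True if a grace period whose ticket is at or past firstsnap has already
 * been published; such a waiter need not force a grace period itself.
 */
static bool piggybacked(unsigned firstsnap)
{
	return UINT_CMP_GE(atomic_load(&gp_done), firstsnap);
}

static void expedited_sync(void)
{
	/* Take a ticket: old value + 1, like atomic_inc_return(). */
	unsigned firstsnap = atomic_fetch_add(&gp_started, 1) + 1;
	unsigned s;

	force_grace_period();

	/* Publish our ticket, but only if nobody newer got there first. */
	s = atomic_load(&gp_done);
	while (UINT_CMP_LT(s, firstsnap) &&
	       !atomic_compare_exchange_weak(&gp_done, &s, firstsnap))
		;	/* a failed CAS reloads s; just retry */
}

int main(void)
{
	expedited_sync();
	expedited_sync();
	printf("started=%u done=%u ticket 1 already covered: %d\n",
	       atomic_load(&gp_started), atomic_load(&gp_done),
	       piggybacked(1));
	return 0;
}

Built with a C11 compiler, this prints "started=2 done=2 ticket 1 already covered: 1": once a grace period with an equal or newer ticket has been published, a waiter holding that ticket can return without stopping the CPUs itself, which is the batching the patch is after.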
@@ -47,6 +47,8 @@
 extern int rcutorture_runnable; /* for sysctl */
 #endif /* #ifdef CONFIG_RCU_TORTURE_TEST */
 
+#define UINT_CMP_GE(a, b) (UINT_MAX / 2 >= (a) - (b))
+#define UINT_CMP_LT(a, b) (UINT_MAX / 2 < (a) - (b))
 #define ULONG_CMP_GE(a, b) (ULONG_MAX / 2 >= (a) - (b))
 #define ULONG_CMP_LT(a, b) (ULONG_MAX / 2 < (a) - (b))
 
...
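The two UINT_CMP_*() macros added above mirror the existing ULONG_CMP_*() helpers: they compare free-running counters modulo wraparound, treating a difference of less than half the counter space as "later". The short stand-alone program below (not part of the patch; the variable names are illustrative) shows why the modular form is needed once a counter wraps past UINT_MAX, where a plain >= comparison goes wrong.

#include <limits.h>
#include <stdio.h>

#define UINT_CMP_GE(a, b) (UINT_MAX / 2 >= (a) - (b))
#define UINT_CMP_LT(a, b) (UINT_MAX / 2 < (a) - (b))

int main(void)
{
	unsigned before = UINT_MAX - 1;	/* counter just before wrapping */
	unsigned after = before + 3;	/* counter shortly after wrapping (== 1) */

	/* The raw comparison is fooled by the wrap... */
	printf("after >= before:            %d\n", after >= before);
	/* ...but the modular comparisons still order the two correctly. */
	printf("UINT_CMP_GE(after, before): %d\n", UINT_CMP_GE(after, before));
	printf("UINT_CMP_LT(before, after): %d\n", UINT_CMP_LT(before, after));
	return 0;
}

It prints 0, 1, and 1: the raw >= claims the newer value is older once the counter wraps, while the UINT_CMP_*() forms keep the ordering correct as long as the two values are within half the counter space of each other.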
@@ -1025,7 +1025,8 @@ EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
 
 #else /* #ifndef CONFIG_SMP */
 
-static atomic_t synchronize_sched_expedited_count = ATOMIC_INIT(0);
+static atomic_t sync_sched_expedited_started = ATOMIC_INIT(0);
+static atomic_t sync_sched_expedited_done = ATOMIC_INIT(0);
 
 static int synchronize_sched_expedited_cpu_stop(void *data)
 {
@@ -1041,8 +1042,6 @@ static int synchronize_sched_expedited_cpu_stop(void *data)
 	 * robustness against future implementation changes.
 	 */
 	smp_mb(); /* See above comment block. */
-	if (cpumask_first(cpu_online_mask) == smp_processor_id())
-		atomic_inc(&synchronize_sched_expedited_count);
 	return 0;
 }
 
@@ -1056,43 +1055,86 @@ static int synchronize_sched_expedited_cpu_stop(void *data)
  * lock that is acquired by a CPU-hotplug notifier. Failing to
  * observe this restriction will result in deadlock.
  *
- * The synchronize_sched_expedited_cpu_stop() function is called
- * in stop-CPU context, but in order to keep overhead down to a dull
- * roar, we don't force this function to wait for its counterparts
- * on other CPUs. One instance of this function will increment the
- * synchronize_sched_expedited_count variable per call to
- * try_stop_cpus(), but there is no guarantee what order this instance
- * will occur in. The worst case is that it is last on one call
- * to try_stop_cpus(), and the first on the next call. This means
- * that piggybacking requires that synchronize_sched_expedited_count
- * be incremented by 3: this guarantees that the piggybacking
- * task has waited through an entire cycle of context switches,
- * even in the worst case.
+ * This implementation can be thought of as an application of ticket
+ * locking to RCU, with sync_sched_expedited_started and
+ * sync_sched_expedited_done taking on the roles of the halves
+ * of the ticket-lock word. Each task atomically increments
+ * sync_sched_expedited_started upon entry, snapshotting the old value,
+ * then attempts to stop all the CPUs. If this succeeds, then each
+ * CPU will have executed a context switch, resulting in an RCU-sched
+ * grace period. We are then done, so we use atomic_cmpxchg() to
+ * update sync_sched_expedited_done to match our snapshot -- but
+ * only if someone else has not already advanced past our snapshot.
+ *
+ * On the other hand, if try_stop_cpus() fails, we check the value
+ * of sync_sched_expedited_done. If it has advanced past our
+ * initial snapshot, then someone else must have forced a grace period
+ * some time after we took our snapshot. In this case, our work is
+ * done for us, and we can simply return. Otherwise, we try again,
+ * but keep our initial snapshot for purposes of checking for someone
+ * doing our work for us.
+ *
+ * If we fail too many times in a row, we fall back to synchronize_sched().
  */
 void synchronize_sched_expedited(void)
 {
-	int snap, trycount = 0;
+	int firstsnap, s, snap, trycount = 0;
 
-	smp_mb(); /* ensure prior mod happens before capturing snap. */
-	snap = atomic_read(&synchronize_sched_expedited_count) + 2;
+	/* Note that atomic_inc_return() implies full memory barrier. */
+	firstsnap = snap = atomic_inc_return(&sync_sched_expedited_started);
 	get_online_cpus();
+
+	/*
+	 * Each pass through the following loop attempts to force a
+	 * context switch on each CPU.
+	 */
 	while (try_stop_cpus(cpu_online_mask,
 			     synchronize_sched_expedited_cpu_stop,
 			     NULL) == -EAGAIN) {
 		put_online_cpus();
+
+		/* No joy, try again later. Or just synchronize_sched(). */
 		if (trycount++ < 10)
 			udelay(trycount * num_online_cpus());
 		else {
 			synchronize_sched();
 			return;
 		}
-		if (atomic_read(&synchronize_sched_expedited_count) - snap > 0) {
+
+		/* Check to see if someone else did our work for us. */
+		s = atomic_read(&sync_sched_expedited_done);
+		if (UINT_CMP_GE((unsigned)s, (unsigned)firstsnap)) {
 			smp_mb(); /* ensure test happens before caller kfree */
 			return;
 		}
+
+		/*
+		 * Refetching sync_sched_expedited_started allows later
+		 * callers to piggyback on our grace period. We subtract
+		 * 1 to get the same token that the last incrementer got.
+		 * We retry after they started, so our grace period works
+		 * for them, and they started after our first try, so their
+		 * grace period works for us.
+		 */
 		get_online_cpus();
+		snap = atomic_read(&sync_sched_expedited_started) - 1;
+		smp_mb(); /* ensure read is before try_stop_cpus(). */
 	}
-	smp_mb__after_atomic_inc(); /* ensure post-GP actions seen after GP. */
+
+	/*
+	 * Everyone up to our most recent fetch is covered by our grace
+	 * period. Update the counter, but only if our work is still
+	 * relevant -- which it won't be if someone who started later
+	 * than we did beat us to the punch.
+	 */
+	do {
+		s = atomic_read(&sync_sched_expedited_done);
+		if (UINT_CMP_GE((unsigned)s, (unsigned)snap)) {
			smp_mb(); /* ensure test happens before caller kfree */
+			break;
+		}
+	} while (atomic_cmpxchg(&sync_sched_expedited_done, s, snap) != s);
+
 	put_online_cpus();
 }
 EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
...
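The do/while added at the end of synchronize_sched_expedited() only ever moves sync_sched_expedited_done forward: a slow caller holding a stale snapshot either loses the cmpxchg race or sees that the counter has already passed its snapshot and gives up. The stand-alone sketch below renders just that publication step, with C11 seq_cst atomics standing in for atomic_read()/atomic_cmpxchg() and the explicit smp_mb() calls; the publish() wrapper and counter name are hypothetical.

#include <limits.h>
#include <stdatomic.h>
#include <stdio.h>

#define UINT_CMP_GE(a, b) (UINT_MAX / 2 >= (a) - (b))

static atomic_uint done;	/* plays the role of sync_sched_expedited_done */

/*
 * Advance done to snap, but only if done has not already moved to or past
 * snap; a stale publisher must never roll the counter backwards.
 */
static void publish(unsigned snap)
{
	unsigned s = atomic_load(&done);

	do {
		if (UINT_CMP_GE(s, snap))
			return;	/* someone newer already got there */
	} while (!atomic_compare_exchange_weak(&done, &s, snap));
}

int main(void)
{
	publish(5);	/* caller that snapshotted 5 finishes first */
	publish(3);	/* slower caller with an older snapshot: no effect */
	publish(7);	/* newer caller: moves the counter forward */
	printf("done = %u\n", atomic_load(&done));	/* prints 7 */
	return 0;
}

Here publish(3) has no effect because done has already reached 5, which is exactly the "someone who started later than we did beat us to the punch" case in the patch's comment, while publish(7) advances the counter so that still-later waiters can piggyback.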