Commit c9e7cf27 authored by Tejun Heo's avatar Tejun Heo

workqueue: move busy_hash from global_cwq to worker_pool

There's no functional necessity for the two pools on the same CPU to
share the busy hash table.  It's also likely to be a bottleneck when
implementing pools with user-specified attributes.

This patch makes busy_hash per-pool.  The conversion is mostly
straight-forward.  Changes worth noting are,

* Large block of changes in rebind_workers() is moving the block
  inside for_each_worker_pool() as now there are separate hash tables
  for each pool.  This changes the order of operations but doesn't
  break anything.

* Thre for_each_worker_pool() loops in gcwq_unbind_fn() are combined
  into one.  This again changes the order of operaitons but doesn't
  break anything.

This is part of an effort to remove global_cwq and make worker_pool
the top level abstraction, which in turn will help implementing worker
pools with user-specified attributes.
Signed-off-by: default avatarTejun Heo <tj@kernel.org>
Reviewed-by: default avatarLai Jiangshan <laijs@cn.fujitsu.com>
parent 7c3eed5c
...@@ -137,6 +137,10 @@ struct worker_pool { ...@@ -137,6 +137,10 @@ struct worker_pool {
struct timer_list idle_timer; /* L: worker idle timeout */ struct timer_list idle_timer; /* L: worker idle timeout */
struct timer_list mayday_timer; /* L: SOS timer for workers */ struct timer_list mayday_timer; /* L: SOS timer for workers */
/* workers are chained either in busy_hash or idle_list */
DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
/* L: hash of busy workers */
struct mutex assoc_mutex; /* protect POOL_DISASSOCIATED */ struct mutex assoc_mutex; /* protect POOL_DISASSOCIATED */
struct ida worker_ida; /* L: for worker IDs */ struct ida worker_ida; /* L: for worker IDs */
}; };
...@@ -150,10 +154,6 @@ struct global_cwq { ...@@ -150,10 +154,6 @@ struct global_cwq {
spinlock_t lock; /* the gcwq lock */ spinlock_t lock; /* the gcwq lock */
unsigned int cpu; /* I: the associated cpu */ unsigned int cpu; /* I: the associated cpu */
/* workers are chained either in busy_hash or pool idle_list */
DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
/* L: hash of busy workers */
struct worker_pool pools[NR_STD_WORKER_POOLS]; struct worker_pool pools[NR_STD_WORKER_POOLS];
/* normal and highpri pools */ /* normal and highpri pools */
} ____cacheline_aligned_in_smp; } ____cacheline_aligned_in_smp;
...@@ -255,8 +255,8 @@ EXPORT_SYMBOL_GPL(system_freezable_wq); ...@@ -255,8 +255,8 @@ EXPORT_SYMBOL_GPL(system_freezable_wq);
for ((pool) = &(gcwq)->pools[0]; \ for ((pool) = &(gcwq)->pools[0]; \
(pool) < &(gcwq)->pools[NR_STD_WORKER_POOLS]; (pool)++) (pool) < &(gcwq)->pools[NR_STD_WORKER_POOLS]; (pool)++)
#define for_each_busy_worker(worker, i, pos, gcwq) \ #define for_each_busy_worker(worker, i, pos, pool) \
hash_for_each(gcwq->busy_hash, i, pos, worker, hentry) hash_for_each(pool->busy_hash, i, pos, worker, hentry)
static inline int __next_gcwq_cpu(int cpu, const struct cpumask *mask, static inline int __next_gcwq_cpu(int cpu, const struct cpumask *mask,
unsigned int sw) unsigned int sw)
...@@ -892,11 +892,11 @@ static inline void worker_clr_flags(struct worker *worker, unsigned int flags) ...@@ -892,11 +892,11 @@ static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
/** /**
* find_worker_executing_work - find worker which is executing a work * find_worker_executing_work - find worker which is executing a work
* @gcwq: gcwq of interest * @pool: pool of interest
* @work: work to find worker for * @work: work to find worker for
* *
* Find a worker which is executing @work on @gcwq by searching * Find a worker which is executing @work on @pool by searching
* @gcwq->busy_hash which is keyed by the address of @work. For a worker * @pool->busy_hash which is keyed by the address of @work. For a worker
* to match, its current execution should match the address of @work and * to match, its current execution should match the address of @work and
* its work function. This is to avoid unwanted dependency between * its work function. This is to avoid unwanted dependency between
* unrelated work executions through a work item being recycled while still * unrelated work executions through a work item being recycled while still
...@@ -924,13 +924,13 @@ static inline void worker_clr_flags(struct worker *worker, unsigned int flags) ...@@ -924,13 +924,13 @@ static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
* Pointer to worker which is executing @work if found, NULL * Pointer to worker which is executing @work if found, NULL
* otherwise. * otherwise.
*/ */
static struct worker *find_worker_executing_work(struct global_cwq *gcwq, static struct worker *find_worker_executing_work(struct worker_pool *pool,
struct work_struct *work) struct work_struct *work)
{ {
struct worker *worker; struct worker *worker;
struct hlist_node *tmp; struct hlist_node *tmp;
hash_for_each_possible(gcwq->busy_hash, worker, tmp, hentry, hash_for_each_possible(pool->busy_hash, worker, tmp, hentry,
(unsigned long)work) (unsigned long)work)
if (worker->current_work == work && if (worker->current_work == work &&
worker->current_func == work->func) worker->current_func == work->func)
...@@ -1191,13 +1191,15 @@ static bool is_chained_work(struct workqueue_struct *wq) ...@@ -1191,13 +1191,15 @@ static bool is_chained_work(struct workqueue_struct *wq)
unsigned int cpu; unsigned int cpu;
for_each_gcwq_cpu(cpu) { for_each_gcwq_cpu(cpu) {
struct global_cwq *gcwq = get_gcwq(cpu); struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
struct worker_pool *pool = cwq->pool;
struct global_cwq *gcwq = pool->gcwq;
struct worker *worker; struct worker *worker;
struct hlist_node *pos; struct hlist_node *pos;
int i; int i;
spin_lock_irqsave(&gcwq->lock, flags); spin_lock_irqsave(&gcwq->lock, flags);
for_each_busy_worker(worker, i, pos, gcwq) { for_each_busy_worker(worker, i, pos, pool) {
if (worker->task != current) if (worker->task != current)
continue; continue;
spin_unlock_irqrestore(&gcwq->lock, flags); spin_unlock_irqrestore(&gcwq->lock, flags);
...@@ -1238,7 +1240,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, ...@@ -1238,7 +1240,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
/* determine gcwq to use */ /* determine gcwq to use */
if (!(wq->flags & WQ_UNBOUND)) { if (!(wq->flags & WQ_UNBOUND)) {
struct global_cwq *last_gcwq; struct worker_pool *last_pool;
if (cpu == WORK_CPU_UNBOUND) if (cpu == WORK_CPU_UNBOUND)
cpu = raw_smp_processor_id(); cpu = raw_smp_processor_id();
...@@ -1250,14 +1252,15 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq, ...@@ -1250,14 +1252,15 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
* non-reentrancy. * non-reentrancy.
*/ */
gcwq = get_gcwq(cpu); gcwq = get_gcwq(cpu);
last_gcwq = get_work_gcwq(work); last_pool = get_work_pool(work);
if (last_gcwq && last_gcwq != gcwq) { if (last_pool && last_pool->gcwq != gcwq) {
struct global_cwq *last_gcwq = last_pool->gcwq;
struct worker *worker; struct worker *worker;
spin_lock(&last_gcwq->lock); spin_lock(&last_gcwq->lock);
worker = find_worker_executing_work(last_gcwq, work); worker = find_worker_executing_work(last_pool, work);
if (worker && worker->current_cwq->wq == wq) if (worker && worker->current_cwq->wq == wq)
gcwq = last_gcwq; gcwq = last_gcwq;
...@@ -1722,31 +1725,32 @@ static void rebind_workers(struct global_cwq *gcwq) ...@@ -1722,31 +1725,32 @@ static void rebind_workers(struct global_cwq *gcwq)
*/ */
wake_up_process(worker->task); wake_up_process(worker->task);
} }
}
/* rebind busy workers */ /* rebind busy workers */
for_each_busy_worker(worker, i, pos, gcwq) { for_each_busy_worker(worker, i, pos, pool) {
struct work_struct *rebind_work = &worker->rebind_work; struct work_struct *rebind_work = &worker->rebind_work;
struct workqueue_struct *wq; struct workqueue_struct *wq;
if (test_and_set_bit(WORK_STRUCT_PENDING_BIT, if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
work_data_bits(rebind_work))) work_data_bits(rebind_work)))
continue; continue;
debug_work_activate(rebind_work); debug_work_activate(rebind_work);
/* /*
* wq doesn't really matter but let's keep @worker->pool * wq doesn't really matter but let's keep
* and @cwq->pool consistent for sanity. * @worker->pool and @cwq->pool consistent for
*/ * sanity.
if (std_worker_pool_pri(worker->pool)) */
wq = system_highpri_wq; if (std_worker_pool_pri(worker->pool))
else wq = system_highpri_wq;
wq = system_wq; else
wq = system_wq;
insert_work(get_cwq(gcwq->cpu, wq), rebind_work,
worker->scheduled.next, insert_work(get_cwq(gcwq->cpu, wq), rebind_work,
work_color_to_flags(WORK_NO_COLOR)); worker->scheduled.next,
work_color_to_flags(WORK_NO_COLOR));
}
} }
} }
...@@ -2197,7 +2201,7 @@ __acquires(&gcwq->lock) ...@@ -2197,7 +2201,7 @@ __acquires(&gcwq->lock)
* already processing the work. If so, defer the work to the * already processing the work. If so, defer the work to the
* currently executing one. * currently executing one.
*/ */
collision = find_worker_executing_work(gcwq, work); collision = find_worker_executing_work(pool, work);
if (unlikely(collision)) { if (unlikely(collision)) {
move_linked_works(work, &collision->scheduled, NULL); move_linked_works(work, &collision->scheduled, NULL);
return; return;
...@@ -2205,7 +2209,7 @@ __acquires(&gcwq->lock) ...@@ -2205,7 +2209,7 @@ __acquires(&gcwq->lock)
/* claim and dequeue */ /* claim and dequeue */
debug_work_deactivate(work); debug_work_deactivate(work);
hash_add(gcwq->busy_hash, &worker->hentry, (unsigned long)work); hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
worker->current_work = work; worker->current_work = work;
worker->current_func = work->func; worker->current_func = work->func;
worker->current_cwq = cwq; worker->current_cwq = cwq;
...@@ -2833,13 +2837,15 @@ EXPORT_SYMBOL_GPL(drain_workqueue); ...@@ -2833,13 +2837,15 @@ EXPORT_SYMBOL_GPL(drain_workqueue);
static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr) static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
{ {
struct worker *worker = NULL; struct worker *worker = NULL;
struct worker_pool *pool;
struct global_cwq *gcwq; struct global_cwq *gcwq;
struct cpu_workqueue_struct *cwq; struct cpu_workqueue_struct *cwq;
might_sleep(); might_sleep();
gcwq = get_work_gcwq(work); pool = get_work_pool(work);
if (!gcwq) if (!pool)
return false; return false;
gcwq = pool->gcwq;
spin_lock_irq(&gcwq->lock); spin_lock_irq(&gcwq->lock);
if (!list_empty(&work->entry)) { if (!list_empty(&work->entry)) {
...@@ -2853,7 +2859,7 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr) ...@@ -2853,7 +2859,7 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
if (unlikely(!cwq || gcwq != cwq->pool->gcwq)) if (unlikely(!cwq || gcwq != cwq->pool->gcwq))
goto already_gone; goto already_gone;
} else { } else {
worker = find_worker_executing_work(gcwq, work); worker = find_worker_executing_work(pool, work);
if (!worker) if (!worker)
goto already_gone; goto already_gone;
cwq = worker->current_cwq; cwq = worker->current_cwq;
...@@ -3482,18 +3488,20 @@ EXPORT_SYMBOL_GPL(workqueue_congested); ...@@ -3482,18 +3488,20 @@ EXPORT_SYMBOL_GPL(workqueue_congested);
*/ */
unsigned int work_busy(struct work_struct *work) unsigned int work_busy(struct work_struct *work)
{ {
struct global_cwq *gcwq = get_work_gcwq(work); struct worker_pool *pool = get_work_pool(work);
struct global_cwq *gcwq;
unsigned long flags; unsigned long flags;
unsigned int ret = 0; unsigned int ret = 0;
if (!gcwq) if (!pool)
return 0; return 0;
gcwq = pool->gcwq;
spin_lock_irqsave(&gcwq->lock, flags); spin_lock_irqsave(&gcwq->lock, flags);
if (work_pending(work)) if (work_pending(work))
ret |= WORK_BUSY_PENDING; ret |= WORK_BUSY_PENDING;
if (find_worker_executing_work(gcwq, work)) if (find_worker_executing_work(pool, work))
ret |= WORK_BUSY_RUNNING; ret |= WORK_BUSY_RUNNING;
spin_unlock_irqrestore(&gcwq->lock, flags); spin_unlock_irqrestore(&gcwq->lock, flags);
...@@ -3555,15 +3563,15 @@ static void gcwq_unbind_fn(struct work_struct *work) ...@@ -3555,15 +3563,15 @@ static void gcwq_unbind_fn(struct work_struct *work)
* ones which are still executing works from before the last CPU * ones which are still executing works from before the last CPU
* down must be on the cpu. After this, they may become diasporas. * down must be on the cpu. After this, they may become diasporas.
*/ */
for_each_worker_pool(pool, gcwq) for_each_worker_pool(pool, gcwq) {
list_for_each_entry(worker, &pool->idle_list, entry) list_for_each_entry(worker, &pool->idle_list, entry)
worker->flags |= WORKER_UNBOUND; worker->flags |= WORKER_UNBOUND;
for_each_busy_worker(worker, i, pos, gcwq) for_each_busy_worker(worker, i, pos, pool)
worker->flags |= WORKER_UNBOUND; worker->flags |= WORKER_UNBOUND;
for_each_worker_pool(pool, gcwq)
pool->flags |= POOL_DISASSOCIATED; pool->flags |= POOL_DISASSOCIATED;
}
gcwq_release_assoc_and_unlock(gcwq); gcwq_release_assoc_and_unlock(gcwq);
...@@ -3854,13 +3862,12 @@ static int __init init_workqueues(void) ...@@ -3854,13 +3862,12 @@ static int __init init_workqueues(void)
spin_lock_init(&gcwq->lock); spin_lock_init(&gcwq->lock);
gcwq->cpu = cpu; gcwq->cpu = cpu;
hash_init(gcwq->busy_hash);
for_each_worker_pool(pool, gcwq) { for_each_worker_pool(pool, gcwq) {
pool->gcwq = gcwq; pool->gcwq = gcwq;
pool->flags |= POOL_DISASSOCIATED; pool->flags |= POOL_DISASSOCIATED;
INIT_LIST_HEAD(&pool->worklist); INIT_LIST_HEAD(&pool->worklist);
INIT_LIST_HEAD(&pool->idle_list); INIT_LIST_HEAD(&pool->idle_list);
hash_init(pool->busy_hash);
init_timer_deferrable(&pool->idle_timer); init_timer_deferrable(&pool->idle_timer);
pool->idle_timer.function = idle_worker_timeout; pool->idle_timer.function = idle_worker_timeout;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment