Commit 636b927e authored by Tejun Heo

workqueue: Make unbound workqueues to use per-cpu pool_workqueues

A pwq (pool_workqueue) represents an association between a workqueue and a
worker_pool. When a work item is queued, the workqueue selects the pwq to
use, which in turn determines the pool, and queues the work item to the pool
through the pwq. pwq is also what implements the maximum concurrency limit -
@max_active.
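
The association can be pictured with a small model. The sketch below is plain
userspace C, not kernel code: the struct names mirror the kernel ones, but every
field, the NR_CPUS constant and model_queue_work() are simplified stand-ins.
It shows a workqueue holding one pwq per CPU, each pwq pointing at a worker_pool
and capping in-flight work at max_active (in the kernel, the over-limit item
would be parked on the pwq's inactive list rather than rejected):

  #include <stdbool.h>
  #include <stdio.h>

  #define NR_CPUS 4

  struct worker_pool {
  	int cpu;			/* which CPU this pool serves */
  };

  struct pool_workqueue {
  	struct worker_pool *pool;	/* pool the pwq feeds work into */
  	int nr_active;			/* work items currently executing */
  	int max_active;			/* per-pwq concurrency cap */
  };

  struct workqueue {
  	struct pool_workqueue cpu_pwq[NR_CPUS];	/* one pwq per CPU */
  };

  /* Queueing resolves wq + cpu -> pwq; the pwq supplies the pool and the cap. */
  static bool model_queue_work(struct workqueue *wq, int cpu)
  {
  	struct pool_workqueue *pwq = &wq->cpu_pwq[cpu];

  	if (pwq->nr_active >= pwq->max_active)
  		return false;	/* kernel would put it on the inactive list */

  	pwq->nr_active++;	/* dispatched to pwq->pool */
  	return true;
  }

  int main(void)
  {
  	static struct worker_pool pool[NR_CPUS];
  	struct workqueue wq;

  	for (int cpu = 0; cpu < NR_CPUS; cpu++) {
  		pool[cpu].cpu = cpu;
  		wq.cpu_pwq[cpu] = (struct pool_workqueue){
  			.pool = &pool[cpu],
  			.max_active = 2,
  		};
  	}

  	/* The third item on CPU 0 exceeds max_active == 2 and is deferred. */
  	for (int i = 0; i < 3; i++)
  		printf("item %d on cpu0: %s\n", i,
  		       model_queue_work(&wq, 0) ? "active" : "deferred");

  	return 0;
  }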

As a per-cpu workqueue should be associated with a different worker_pool on
each CPU, it always had per-cpu pwq's that are accessed through wq->cpu_pwq.
However, unbound workqueues were sharing a pwq within each NUMA node by
default. The sharing has several downsides:

* Because @max_active is per-pwq, the meaning of @max_active changes
  depending on the machine configuration and whether workqueue NUMA locality
  support is enabled.

* Makes per-cpu and unbound code deviate.

* Gets in the way of making workqueue CPU locality awareness more flexible.

This patch makes unbound workqueues use per-cpu pwq's the same way per-cpu
workqueues do by making the following changes:

* wq->numa_pwq_tbl[] is removed and unbound workqueues now use wq->cpu_pwq
  just like per-cpu workqueues. wq->cpu_pwq is now RCU protected for unbound
  workqueues.

* numa_pwq_tbl_install() is renamed to install_unbound_pwq() and installs
  the specified pwq to the target CPU's wq->cpu_pwq.

* apply_wqattrs_prepare() now always allocates a separate pwq for each CPU
  unless the workqueue is ordered. If ordered, all CPUs use wq->dfl_pwq.
  This makes the return value of wq_calc_node_cpumask() unnecessary. It now
  returns void.

* @max_active now means the same thing for both per-cpu and unbound
  workqueues. WQ_UNBOUND_MAX_ACTIVE now equals WQ_MAX_ACTIVE and
  documentation is updated accordingly. WQ_UNBOUND_MAX_ACTIVE is no longer
  used in workqueue implementation and will be removed later.

* All unbound pwq operations which used to be per-numa-node are now per-cpu.

For most unbound workqueue users, this shouldn't cause noticeable changes.
Work item issue and completion will be slightly faster, flush_workqueue()
will become somewhat more expensive, and the total concurrency limit will
likely become higher. All @max_active==1 use cases are currently being
audited for conversion into alloc_ordered_workqueue() and they shouldn't be
affected once the audit and conversion are complete.
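
For illustration only (not part of this patch), a caller-side sketch of the two
paths after this change. The workqueue names and the init/exit functions are
made up; alloc_workqueue(), alloc_ordered_workqueue() and destroy_workqueue()
are the existing APIs:

  #include <linux/module.h>
  #include <linux/workqueue.h>

  /* "my_unbound_wq" and "my_ordered_wq" are made-up example names. */
  static struct workqueue_struct *my_unbound_wq;
  static struct workqueue_struct *my_ordered_wq;

  static int __init my_wq_example_init(void)
  {
  	/* up to 16 concurrent work items per CPU after this patch */
  	my_unbound_wq = alloc_workqueue("my_unbound_wq", WQ_UNBOUND, 16);
  	if (!my_unbound_wq)
  		return -ENOMEM;

  	/* strict one-at-a-time execution in queueing order */
  	my_ordered_wq = alloc_ordered_workqueue("my_ordered_wq", 0);
  	if (!my_ordered_wq) {
  		destroy_workqueue(my_unbound_wq);
  		return -ENOMEM;
  	}

  	return 0;
  }

  static void __exit my_wq_example_exit(void)
  {
  	destroy_workqueue(my_ordered_wq);
  	destroy_workqueue(my_unbound_wq);
  }

  module_init(my_wq_example_init);
  module_exit(my_wq_example_exit);
  MODULE_LICENSE("GPL");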

One area where the behavior change may be more noticeable is
workqueue_congested(), as the reported congestion state is now per CPU
instead of per NUMA node. There are only two users of this interface -
drivers/infiniband/hw/hfi1 and net/smc. Maintainers of both subsystems are
cc'd. Inputs on the behavior change would be very much appreciated.
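
To make the new granularity concrete, here is a hypothetical caller-side helper
(not from this patch; the function name is made up) that treats the workqueue as
congested if any polled CPU reports congestion. With this change, each CPU's
answer reflects its own pwq rather than its NUMA node's:

  #include <linux/cpumask.h>
  #include <linux/workqueue.h>

  /* Hypothetical helper: congestion is now tracked per CPU, not per node. */
  static bool my_wq_congested_anywhere(struct workqueue_struct *wq)
  {
  	int cpu;

  	for_each_online_cpu(cpu)
  		if (workqueue_congested(cpu, wq))
  			return true;

  	return false;
  }
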
Signed-off-by: Tejun Heo <tj@kernel.org>
Acked-by: Dennis Dalessandro <dennis.dalessandro@cornelisnetworks.com>
Cc: Jason Gunthorpe <jgg@ziepe.ca>
Cc: Leon Romanovsky <leon@kernel.org>
Cc: Karsten Graul <kgraul@linux.ibm.com>
Cc: Wenjia Zhang <wenjia@linux.ibm.com>
Cc: Jan Karcher <jaka@linux.ibm.com>
parent 4cbfd3de
@@ -220,17 +220,16 @@ resources, scheduled and executed.
 ``max_active``
 --------------
 
-``@max_active`` determines the maximum number of execution contexts
-per CPU which can be assigned to the work items of a wq. For example,
-with ``@max_active`` of 16, at most 16 work items of the wq can be
-executing at the same time per CPU.
+``@max_active`` determines the maximum number of execution contexts per
+CPU which can be assigned to the work items of a wq. For example, with
+``@max_active`` of 16, at most 16 work items of the wq can be executing
+at the same time per CPU. This is always a per-CPU attribute, even for
+unbound workqueues.
 
-Currently, for a bound wq, the maximum limit for ``@max_active`` is
-512 and the default value used when 0 is specified is 256. For an
-unbound wq, the limit is higher of 512 and 4 *
-``num_possible_cpus()``. These values are chosen sufficiently high
-such that they are not the limiting factor while providing protection
-in runaway cases.
+The maximum limit for ``@max_active`` is 512 and the default value used
+when 0 is specified is 256. These values are chosen sufficiently high
+such that they are not the limiting factor while providing protection in
+runaway cases.
 
 The number of active work items of a wq is usually regulated by the
 users of the wq, more specifically, by how many work items the users
...
@@ -343,14 +343,10 @@ enum {
 	__WQ_ORDERED_EXPLICIT	= 1 << 19, /* internal: alloc_ordered_workqueue() */
 
 	WQ_MAX_ACTIVE		= 512,	  /* I like 512, better ideas? */
-	WQ_MAX_UNBOUND_PER_CPU	= 4,	  /* 4 * #cpus for unbound wq */
+	WQ_UNBOUND_MAX_ACTIVE	= WQ_MAX_ACTIVE,
 	WQ_DFL_ACTIVE		= WQ_MAX_ACTIVE / 2,
 };
 
-/* unbound wq's aren't per-cpu, scale max_active according to #cpus */
-#define WQ_UNBOUND_MAX_ACTIVE	\
-	max_t(int, WQ_MAX_ACTIVE, num_possible_cpus() * WQ_MAX_UNBOUND_PER_CPU)
-
 /*
  * System-wide workqueues which are always present.
  *
@@ -391,7 +387,7 @@ extern struct workqueue_struct *system_freezable_power_efficient_wq;
  * alloc_workqueue - allocate a workqueue
  * @fmt: printf format for the name of the workqueue
  * @flags: WQ_* flags
- * @max_active: max in-flight work items, 0 for default
+ * @max_active: max in-flight work items per CPU, 0 for default
  * remaining args: args for @fmt
  *
  * Allocate a workqueue with the specified parameters. For detailed
...
@@ -321,8 +321,7 @@ struct workqueue_struct {
 
 	/* hot fields used during command issue, aligned to cacheline */
 	unsigned int		flags ____cacheline_aligned; /* WQ: WQ_* flags */
-	struct pool_workqueue __percpu **cpu_pwq; /* I: per-cpu pwqs */
-	struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */
+	struct pool_workqueue __percpu __rcu **cpu_pwq; /* I: per-cpu pwqs */
 };
 
 static struct kmem_cache *pwq_cache;
@@ -608,35 +607,6 @@ static int worker_pool_assign_id(struct worker_pool *pool)
 	return ret;
 }
 
-/**
- * unbound_pwq_by_node - return the unbound pool_workqueue for the given node
- * @wq: the target workqueue
- * @node: the node ID
- *
- * This must be called with any of wq_pool_mutex, wq->mutex or RCU
- * read locked.
- * If the pwq needs to be used beyond the locking in effect, the caller is
- * responsible for guaranteeing that the pwq stays online.
- *
- * Return: The unbound pool_workqueue for @node.
- */
-static struct pool_workqueue *unbound_pwq_by_node(struct workqueue_struct *wq,
-						  int node)
-{
-	assert_rcu_or_wq_mutex_or_pool_mutex(wq);
-
-	/*
-	 * XXX: @node can be NUMA_NO_NODE if CPU goes offline while a
-	 * delayed item is pending. The plan is to keep CPU -> NODE
-	 * mapping valid and stable across CPU on/offlines. Once that
-	 * happens, this workaround can be removed.
-	 */
-	if (unlikely(node == NUMA_NO_NODE))
-		return wq->dfl_pwq;
-
-	return rcu_dereference_raw(wq->numa_pwq_tbl[node]);
-}
-
 static unsigned int work_color_to_flags(int color)
 {
 	return color << WORK_STRUCT_COLOR_SHIFT;
@@ -1676,16 +1646,14 @@ static void __queue_work(int cpu, struct workqueue_struct *wq,
 	rcu_read_lock();
 retry:
 	/* pwq which will be used unless @work is executing elsewhere */
-	if (wq->flags & WQ_UNBOUND) {
-		if (req_cpu == WORK_CPU_UNBOUND)
+	if (req_cpu == WORK_CPU_UNBOUND) {
+		if (wq->flags & WQ_UNBOUND)
 			cpu = wq_select_unbound_cpu(raw_smp_processor_id());
-		pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
-	} else {
-		if (req_cpu == WORK_CPU_UNBOUND)
+		else
 			cpu = raw_smp_processor_id();
-		pwq = *per_cpu_ptr(wq->cpu_pwq, cpu);
 	}
 
+	pwq = rcu_dereference(*per_cpu_ptr(wq->cpu_pwq, cpu));
 	pool = pwq->pool;
 
 	/*
@@ -1715,12 +1683,11 @@ static void __queue_work(int cpu, struct workqueue_struct *wq,
 	}
 
 	/*
-	 * pwq is determined and locked. For unbound pools, we could have
-	 * raced with pwq release and it could already be dead. If its
-	 * refcnt is zero, repeat pwq selection. Note that pwqs never die
-	 * without another pwq replacing it in the numa_pwq_tbl or while
-	 * work items are executing on it, so the retrying is guaranteed to
-	 * make forward-progress.
+	 * pwq is determined and locked. For unbound pools, we could have raced
+	 * with pwq release and it could already be dead. If its refcnt is zero,
+	 * repeat pwq selection. Note that unbound pwqs never die without
+	 * another pwq replacing it in cpu_pwq or while work items are executing
+	 * on it, so the retrying is guaranteed to make forward-progress.
 	 */
 	if (unlikely(!pwq->refcnt)) {
 		if (wq->flags & WQ_UNBOUND) {
@@ -3818,12 +3785,8 @@ static void rcu_free_wq(struct rcu_head *rcu)
 		container_of(rcu, struct workqueue_struct, rcu);
 
 	wq_free_lockdep(wq);
-
-	if (!(wq->flags & WQ_UNBOUND))
-		free_percpu(wq->cpu_pwq);
-	else
-		free_workqueue_attrs(wq->unbound_attrs);
-
+	free_percpu(wq->cpu_pwq);
+	free_workqueue_attrs(wq->unbound_attrs);
 	kfree(wq);
 }
 
@@ -4174,11 +4137,8 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
  *
  * The caller is responsible for ensuring that the cpumask of @node stays
  * stable.
- *
- * Return: %true if the resulting @cpumask is different from @attrs->cpumask,
- * %false if equal.
  */
-static bool wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node,
+static void wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node,
 				 int cpu_going_down, cpumask_t *cpumask)
 {
 	if (!wq_numa_enabled || attrs->no_numa)
@@ -4195,23 +4155,18 @@ static bool wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node,
 	/* yeap, return possible CPUs in @node that @attrs wants */
 	cpumask_and(cpumask, attrs->cpumask, wq_numa_possible_cpumask[node]);
 
-	if (cpumask_empty(cpumask)) {
+	if (cpumask_empty(cpumask))
 		pr_warn_once("WARNING: workqueue cpumask: online intersect > "
 				"possible intersect\n");
-		return false;
-	}
-
-	return !cpumask_equal(cpumask, attrs->cpumask);
+	return;
 
 use_dfl:
 	cpumask_copy(cpumask, attrs->cpumask);
-	return false;
 }
 
-/* install @pwq into @wq's numa_pwq_tbl[] for @node and return the old pwq */
-static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
-						   int node,
-						   struct pool_workqueue *pwq)
+/* install @pwq into @wq's cpu_pwq and return the old pwq */
+static struct pool_workqueue *install_unbound_pwq(struct workqueue_struct *wq,
+						  int cpu, struct pool_workqueue *pwq)
 {
 	struct pool_workqueue *old_pwq;
@@ -4221,8 +4176,8 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
 	/* link_pwq() can handle duplicate calls */
 	link_pwq(pwq);
 
-	old_pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
-	rcu_assign_pointer(wq->numa_pwq_tbl[node], pwq);
+	old_pwq = rcu_access_pointer(*per_cpu_ptr(wq->cpu_pwq, cpu));
+	rcu_assign_pointer(*per_cpu_ptr(wq->cpu_pwq, cpu), pwq);
 	return old_pwq;
 }
@@ -4239,10 +4194,10 @@ struct apply_wqattrs_ctx {
 static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
 {
 	if (ctx) {
-		int node;
+		int cpu;
 
-		for_each_node(node)
-			put_pwq_unlocked(ctx->pwq_tbl[node]);
+		for_each_possible_cpu(cpu)
+			put_pwq_unlocked(ctx->pwq_tbl[cpu]);
 		put_pwq_unlocked(ctx->dfl_pwq);
 
 		free_workqueue_attrs(ctx->attrs);
@@ -4259,11 +4214,11 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 {
 	struct apply_wqattrs_ctx *ctx;
 	struct workqueue_attrs *new_attrs, *tmp_attrs;
-	int node;
+	int cpu;
 
 	lockdep_assert_held(&wq_pool_mutex);
 
-	ctx = kzalloc(struct_size(ctx, pwq_tbl, nr_node_ids), GFP_KERNEL);
+	ctx = kzalloc(struct_size(ctx, pwq_tbl, nr_cpu_ids), GFP_KERNEL);
 
 	new_attrs = alloc_workqueue_attrs();
 	tmp_attrs = alloc_workqueue_attrs();
@@ -4297,14 +4252,16 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 	if (!ctx->dfl_pwq)
 		goto out_free;
 
-	for_each_node(node) {
-		if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
-			ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
-			if (!ctx->pwq_tbl[node])
-				goto out_free;
-		} else {
+	for_each_possible_cpu(cpu) {
+		if (new_attrs->no_numa) {
 			ctx->dfl_pwq->refcnt++;
-			ctx->pwq_tbl[node] = ctx->dfl_pwq;
+			ctx->pwq_tbl[cpu] = ctx->dfl_pwq;
+		} else {
+			wq_calc_node_cpumask(new_attrs, cpu_to_node(cpu), -1,
+					     tmp_attrs->cpumask);
+			ctx->pwq_tbl[cpu] = alloc_unbound_pwq(wq, tmp_attrs);
+			if (!ctx->pwq_tbl[cpu])
+				goto out_free;
 		}
 	}
 
@@ -4327,7 +4284,7 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 /* set attrs and install prepared pwqs, @ctx points to old pwqs on return */
 static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
 {
-	int node;
+	int cpu;
 
 	/* all pwqs have been created successfully, let's install'em */
 	mutex_lock(&ctx->wq->mutex);
@@ -4335,9 +4292,9 @@ static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
 	copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
 
 	/* save the previous pwq and install the new one */
-	for_each_node(node)
-		ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
-							  ctx->pwq_tbl[node]);
+	for_each_possible_cpu(cpu)
+		ctx->pwq_tbl[cpu] = install_unbound_pwq(ctx->wq, cpu,
+							ctx->pwq_tbl[cpu]);
 
 	/* @dfl_pwq might not have been used, ensure it's linked */
 	link_pwq(ctx->dfl_pwq);
@@ -4466,20 +4423,13 @@ static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
 	cpumask = target_attrs->cpumask;
 
 	copy_workqueue_attrs(target_attrs, wq->unbound_attrs);
-	pwq = unbound_pwq_by_node(wq, node);
 
-	/*
-	 * Let's determine what needs to be done. If the target cpumask is
-	 * different from the default pwq's, we need to compare it to @pwq's
-	 * and create a new one if they don't match. If the target cpumask
-	 * equals the default pwq's, the default pwq should be used.
-	 */
-	if (wq_calc_node_cpumask(wq->dfl_pwq->pool->attrs, node, off_cpu, cpumask)) {
-		if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
-			return;
-	} else {
-		goto use_dfl_pwq;
-	}
+	/* nothing to do if the target cpumask matches the current pwq */
+	wq_calc_node_cpumask(wq->dfl_pwq->pool->attrs, node, off_cpu, cpumask);
+	pwq = rcu_dereference_protected(*per_cpu_ptr(wq->cpu_pwq, cpu),
+					lockdep_is_held(&wq_pool_mutex));
+	if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
+		return;
 
 	/* create a new pwq */
 	pwq = alloc_unbound_pwq(wq, target_attrs);
@@ -4491,7 +4441,7 @@ static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
 
 	/* Install the new pwq. */
 	mutex_lock(&wq->mutex);
-	old_pwq = numa_pwq_tbl_install(wq, node, pwq);
+	old_pwq = install_unbound_pwq(wq, cpu, pwq);
 	goto out_unlock;
 
 use_dfl_pwq:
@@ -4499,7 +4449,7 @@ static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
 	raw_spin_lock_irq(&wq->dfl_pwq->pool->lock);
 	get_pwq(wq->dfl_pwq);
 	raw_spin_unlock_irq(&wq->dfl_pwq->pool->lock);
-	old_pwq = numa_pwq_tbl_install(wq, node, wq->dfl_pwq);
+	old_pwq = install_unbound_pwq(wq, cpu, wq->dfl_pwq);
 out_unlock:
 	mutex_unlock(&wq->mutex);
 	put_pwq_unlocked(old_pwq);
@@ -4510,11 +4460,11 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 	bool highpri = wq->flags & WQ_HIGHPRI;
 	int cpu, ret;
 
-	if (!(wq->flags & WQ_UNBOUND)) {
-		wq->cpu_pwq = alloc_percpu(struct pool_workqueue *);
-		if (!wq->cpu_pwq)
-			goto enomem;
+	wq->cpu_pwq = alloc_percpu(struct pool_workqueue *);
+	if (!wq->cpu_pwq)
+		goto enomem;
 
+	if (!(wq->flags & WQ_UNBOUND)) {
 		for_each_possible_cpu(cpu) {
 			struct pool_workqueue **pwq_p =
 				per_cpu_ptr(wq->cpu_pwq, cpu);
@@ -4562,13 +4512,11 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 static int wq_clamp_max_active(int max_active, unsigned int flags,
 			       const char *name)
 {
-	int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
-
-	if (max_active < 1 || max_active > lim)
+	if (max_active < 1 || max_active > WQ_MAX_ACTIVE)
 		pr_warn("workqueue: max_active %d requested for %s is out of range, clamping between %d and %d\n",
-			max_active, name, 1, lim);
+			max_active, name, 1, WQ_MAX_ACTIVE);
 
-	return clamp_val(max_active, 1, lim);
+	return clamp_val(max_active, 1, WQ_MAX_ACTIVE);
 }
 
 /*
@@ -4612,7 +4560,6 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 					 unsigned int flags,
 					 int max_active, ...)
 {
-	size_t tbl_size = 0;
 	va_list args;
 	struct workqueue_struct *wq;
 	struct pool_workqueue *pwq;
@@ -4632,10 +4579,7 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 		flags |= WQ_UNBOUND;
 
 	/* allocate wq and format name */
-	if (flags & WQ_UNBOUND)
-		tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
-
-	wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
+	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
 	if (!wq)
 		return NULL;
@@ -4730,7 +4674,7 @@ static bool pwq_busy(struct pool_workqueue *pwq)
 void destroy_workqueue(struct workqueue_struct *wq)
 {
 	struct pool_workqueue *pwq;
-	int cpu, node;
+	int cpu;
 
 	/*
 	 * Remove it from sysfs first so that sanity check failure doesn't
@@ -4789,29 +4733,23 @@ void destroy_workqueue(struct workqueue_struct *wq)
 	list_del_rcu(&wq->list);
 	mutex_unlock(&wq_pool_mutex);
 
-	if (!(wq->flags & WQ_UNBOUND)) {
-		for_each_possible_cpu(cpu)
-			put_pwq_unlocked(*per_cpu_ptr(wq->cpu_pwq, cpu));
-	} else {
-		/*
-		 * We're the sole accessor of @wq at this point. Directly
-		 * access numa_pwq_tbl[] and dfl_pwq to put the base refs.
-		 * @wq will be freed when the last pwq is released.
-		 */
-		for_each_node(node) {
-			pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
-			RCU_INIT_POINTER(wq->numa_pwq_tbl[node], NULL);
-			put_pwq_unlocked(pwq);
-		}
+	/*
+	 * We're the sole accessor of @wq. Directly access cpu_pwq and dfl_pwq
+	 * to put the base refs. @wq will be auto-destroyed from the last
+	 * pwq_put. RCU read lock prevents @wq from going away from under us.
+	 */
+	rcu_read_lock();
 
-		/*
-		 * Put dfl_pwq. @wq may be freed any time after dfl_pwq is
-		 * put. Don't access it afterwards.
-		 */
-		pwq = wq->dfl_pwq;
-		wq->dfl_pwq = NULL;
+	for_each_possible_cpu(cpu) {
+		pwq = rcu_access_pointer(*per_cpu_ptr(wq->cpu_pwq, cpu));
+		RCU_INIT_POINTER(*per_cpu_ptr(wq->cpu_pwq, cpu), NULL);
 		put_pwq_unlocked(pwq);
 	}
+
+	put_pwq_unlocked(wq->dfl_pwq);
+	wq->dfl_pwq = NULL;
+
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(destroy_workqueue);
@@ -4888,10 +4826,11 @@ bool current_is_workqueue_rescuer(void)
  * unreliable and only useful as advisory hints or for debugging.
  *
  * If @cpu is WORK_CPU_UNBOUND, the test is performed on the local CPU.
- * Note that both per-cpu and unbound workqueues may be associated with
- * multiple pool_workqueues which have separate congested states. A
- * workqueue being congested on one CPU doesn't mean the workqueue is also
- * contested on other CPUs / NUMA nodes.
+ *
+ * With the exception of ordered workqueues, all workqueues have per-cpu
+ * pool_workqueues, each with its own congested state. A workqueue being
+ * congested on one CPU doesn't mean that the workqueue is contested on any
+ * other CPUs.
 *
 * Return:
 * %true if congested, %false otherwise.
@@ -4907,12 +4846,9 @@ bool workqueue_congested(int cpu, struct workqueue_struct *wq)
 	if (cpu == WORK_CPU_UNBOUND)
 		cpu = smp_processor_id();
 
-	if (!(wq->flags & WQ_UNBOUND))
-		pwq = *per_cpu_ptr(wq->cpu_pwq, cpu);
-	else
-		pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
-
+	pwq = *per_cpu_ptr(wq->cpu_pwq, cpu);
 	ret = !list_empty(&pwq->inactive_works);
+
 	preempt_enable();
 	rcu_read_unlock();
@@ -6434,7 +6370,7 @@ void __init workqueue_init_early(void)
 	system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
 	system_long_wq = alloc_workqueue("events_long", 0, 0);
 	system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
-					    WQ_UNBOUND_MAX_ACTIVE);
+					    WQ_MAX_ACTIVE);
 	system_freezable_wq = alloc_workqueue("events_freezable",
 					      WQ_FREEZABLE, 0);
 	system_power_efficient_wq = alloc_workqueue("events_power_efficient",
...