Commit 6ba94429, authored by Frederic Weisbecker, committed by Tejun Heo

workqueue: Reorder sysfs code

The sysfs code usually belongs at the bottom of the file since it deals
with high-level objects. In the workqueue code it is misplaced, such
that we would need to work around function references to allow the sysfs
code to call APIs like apply_workqueue_attrs().

Let's move that block further down in the file, almost to the bottom.

And declare workqueue_sysfs_unregister() just before destroy_workqueue(),
which references it.

tj: Moved workqueue_sysfs_unregister() forward declaration where other
    forward declarations are.
Suggested-by: Tejun Heo <tj@kernel.org>
Cc: Christoph Lameter <cl@linux.com>
Cc: Kevin Hilman <khilman@linaro.org>
Cc: Lai Jiangshan <laijs@cn.fujitsu.com>
Cc: Mike Galbraith <bitbucket@online.de>
Cc: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Cc: Tejun Heo <tj@kernel.org>
Cc: Viresh Kumar <viresh.kumar@linaro.org>
Signed-off-by: Frederic Weisbecker <fweisbec@gmail.com>
Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
Signed-off-by: Tejun Heo <tj@kernel.org>
parent bffc4375
...@@ -332,6 +332,7 @@ EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq); ...@@ -332,6 +332,7 @@ EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
static int worker_thread(void *__worker); static int worker_thread(void *__worker);
static void copy_workqueue_attrs(struct workqueue_attrs *to, static void copy_workqueue_attrs(struct workqueue_attrs *to,
const struct workqueue_attrs *from); const struct workqueue_attrs *from);
static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
#define CREATE_TRACE_POINTS #define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h> #include <trace/events/workqueue.h>
...@@ -3001,792 +3002,475 @@ int execute_in_process_context(work_func_t fn, struct execute_work *ew) ...@@ -3001,792 +3002,475 @@ int execute_in_process_context(work_func_t fn, struct execute_work *ew)
} }
EXPORT_SYMBOL_GPL(execute_in_process_context); EXPORT_SYMBOL_GPL(execute_in_process_context);
#ifdef CONFIG_SYSFS /**
/* * free_workqueue_attrs - free a workqueue_attrs
* Workqueues with WQ_SYSFS flag set is visible to userland via * @attrs: workqueue_attrs to free
* /sys/bus/workqueue/devices/WQ_NAME. All visible workqueues have the
* following attributes.
*
* per_cpu RO bool : whether the workqueue is per-cpu or unbound
* max_active RW int : maximum number of in-flight work items
*
* Unbound workqueues have the following extra attributes.
* *
* id RO int : the associated pool ID * Undo alloc_workqueue_attrs().
* nice RW int : nice value of the workers
* cpumask RW mask : bitmask of allowed CPUs for the workers
*/ */
struct wq_device { void free_workqueue_attrs(struct workqueue_attrs *attrs)
struct workqueue_struct *wq;
struct device dev;
};
static struct workqueue_struct *dev_to_wq(struct device *dev)
{ {
struct wq_device *wq_dev = container_of(dev, struct wq_device, dev); if (attrs) {
free_cpumask_var(attrs->cpumask);
return wq_dev->wq; kfree(attrs);
}
} }
static ssize_t per_cpu_show(struct device *dev, struct device_attribute *attr, /**
char *buf) * alloc_workqueue_attrs - allocate a workqueue_attrs
* @gfp_mask: allocation mask to use
*
* Allocate a new workqueue_attrs, initialize with default settings and
* return it.
*
* Return: The allocated new workqueue_attr on success. %NULL on failure.
*/
struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); struct workqueue_attrs *attrs;
return scnprintf(buf, PAGE_SIZE, "%d\n", (bool)!(wq->flags & WQ_UNBOUND)); attrs = kzalloc(sizeof(*attrs), gfp_mask);
if (!attrs)
goto fail;
if (!alloc_cpumask_var(&attrs->cpumask, gfp_mask))
goto fail;
cpumask_copy(attrs->cpumask, cpu_possible_mask);
return attrs;
fail:
free_workqueue_attrs(attrs);
return NULL;
} }
static DEVICE_ATTR_RO(per_cpu);
static ssize_t max_active_show(struct device *dev, static void copy_workqueue_attrs(struct workqueue_attrs *to,
struct device_attribute *attr, char *buf) const struct workqueue_attrs *from)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); to->nice = from->nice;
cpumask_copy(to->cpumask, from->cpumask);
return scnprintf(buf, PAGE_SIZE, "%d\n", wq->saved_max_active); /*
* Unlike hash and equality test, this function doesn't ignore
* ->no_numa as it is used for both pool and wq attrs. Instead,
* get_unbound_pool() explicitly clears ->no_numa after copying.
*/
to->no_numa = from->no_numa;
} }
static ssize_t max_active_store(struct device *dev, /* hash value of the content of @attr */
struct device_attribute *attr, const char *buf, static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
size_t count)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); u32 hash = 0;
int val;
if (sscanf(buf, "%d", &val) != 1 || val <= 0)
return -EINVAL;
workqueue_set_max_active(wq, val); hash = jhash_1word(attrs->nice, hash);
return count; hash = jhash(cpumask_bits(attrs->cpumask),
BITS_TO_LONGS(nr_cpumask_bits) * sizeof(long), hash);
return hash;
} }
static DEVICE_ATTR_RW(max_active);
static struct attribute *wq_sysfs_attrs[] = {
&dev_attr_per_cpu.attr,
&dev_attr_max_active.attr,
NULL,
};
ATTRIBUTE_GROUPS(wq_sysfs);
static ssize_t wq_pool_ids_show(struct device *dev, /* content equality test */
struct device_attribute *attr, char *buf) static bool wqattrs_equal(const struct workqueue_attrs *a,
const struct workqueue_attrs *b)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); if (a->nice != b->nice)
const char *delim = ""; return false;
int node, written = 0; if (!cpumask_equal(a->cpumask, b->cpumask))
return false;
rcu_read_lock_sched(); return true;
for_each_node(node) {
written += scnprintf(buf + written, PAGE_SIZE - written,
"%s%d:%d", delim, node,
unbound_pwq_by_node(wq, node)->pool->id);
delim = " ";
}
written += scnprintf(buf + written, PAGE_SIZE - written, "\n");
rcu_read_unlock_sched();
return written;
} }
static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr, /**
char *buf) * init_worker_pool - initialize a newly zalloc'd worker_pool
* @pool: worker_pool to initialize
*
* Initiailize a newly zalloc'd @pool. It also allocates @pool->attrs.
*
* Return: 0 on success, -errno on failure. Even on failure, all fields
* inside @pool proper are initialized and put_unbound_pool() can be called
* on @pool safely to release it.
*/
static int init_worker_pool(struct worker_pool *pool)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); spin_lock_init(&pool->lock);
int written; pool->id = -1;
pool->cpu = -1;
pool->node = NUMA_NO_NODE;
pool->flags |= POOL_DISASSOCIATED;
INIT_LIST_HEAD(&pool->worklist);
INIT_LIST_HEAD(&pool->idle_list);
hash_init(pool->busy_hash);
mutex_lock(&wq->mutex); init_timer_deferrable(&pool->idle_timer);
written = scnprintf(buf, PAGE_SIZE, "%d\n", wq->unbound_attrs->nice); pool->idle_timer.function = idle_worker_timeout;
mutex_unlock(&wq->mutex); pool->idle_timer.data = (unsigned long)pool;
return written; setup_timer(&pool->mayday_timer, pool_mayday_timeout,
} (unsigned long)pool);
/* prepare workqueue_attrs for sysfs store operations */ mutex_init(&pool->manager_arb);
static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq) mutex_init(&pool->attach_mutex);
{ INIT_LIST_HEAD(&pool->workers);
struct workqueue_attrs *attrs;
attrs = alloc_workqueue_attrs(GFP_KERNEL); ida_init(&pool->worker_ida);
if (!attrs) INIT_HLIST_NODE(&pool->hash_node);
return NULL; pool->refcnt = 1;
mutex_lock(&wq->mutex); /* shouldn't fail above this point */
copy_workqueue_attrs(attrs, wq->unbound_attrs); pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
mutex_unlock(&wq->mutex); if (!pool->attrs)
return attrs; return -ENOMEM;
return 0;
} }
static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr, static void rcu_free_wq(struct rcu_head *rcu)
const char *buf, size_t count)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); struct workqueue_struct *wq =
struct workqueue_attrs *attrs; container_of(rcu, struct workqueue_struct, rcu);
int ret;
attrs = wq_sysfs_prep_attrs(wq);
if (!attrs)
return -ENOMEM;
if (sscanf(buf, "%d", &attrs->nice) == 1 && if (!(wq->flags & WQ_UNBOUND))
attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE) free_percpu(wq->cpu_pwqs);
ret = apply_workqueue_attrs(wq, attrs);
else else
ret = -EINVAL; free_workqueue_attrs(wq->unbound_attrs);
free_workqueue_attrs(attrs); kfree(wq->rescuer);
return ret ?: count; kfree(wq);
} }
static ssize_t wq_cpumask_show(struct device *dev, static void rcu_free_pool(struct rcu_head *rcu)
struct device_attribute *attr, char *buf)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
int written;
mutex_lock(&wq->mutex); ida_destroy(&pool->worker_ida);
written = scnprintf(buf, PAGE_SIZE, "%*pb\n", free_workqueue_attrs(pool->attrs);
cpumask_pr_args(wq->unbound_attrs->cpumask)); kfree(pool);
mutex_unlock(&wq->mutex);
return written;
} }
static ssize_t wq_cpumask_store(struct device *dev, /**
struct device_attribute *attr, * put_unbound_pool - put a worker_pool
const char *buf, size_t count) * @pool: worker_pool to put
*
* Put @pool. If its refcnt reaches zero, it gets destroyed in sched-RCU
* safe manner. get_unbound_pool() calls this function on its failure path
* and this function should be able to release pools which went through,
* successfully or not, init_worker_pool().
*
* Should be called with wq_pool_mutex held.
*/
static void put_unbound_pool(struct worker_pool *pool)
{ {
struct workqueue_struct *wq = dev_to_wq(dev); DECLARE_COMPLETION_ONSTACK(detach_completion);
struct workqueue_attrs *attrs; struct worker *worker;
int ret;
attrs = wq_sysfs_prep_attrs(wq); lockdep_assert_held(&wq_pool_mutex);
if (!attrs)
return -ENOMEM;
ret = cpumask_parse(buf, attrs->cpumask); if (--pool->refcnt)
if (!ret) return;
ret = apply_workqueue_attrs(wq, attrs);
free_workqueue_attrs(attrs); /* sanity checks */
return ret ?: count; if (WARN_ON(!(pool->cpu < 0)) ||
} WARN_ON(!list_empty(&pool->worklist)))
return;
static ssize_t wq_numa_show(struct device *dev, struct device_attribute *attr, /* release id and unhash */
char *buf) if (pool->id >= 0)
{ idr_remove(&worker_pool_idr, pool->id);
struct workqueue_struct *wq = dev_to_wq(dev); hash_del(&pool->hash_node);
int written;
mutex_lock(&wq->mutex);
written = scnprintf(buf, PAGE_SIZE, "%d\n",
!wq->unbound_attrs->no_numa);
mutex_unlock(&wq->mutex);
return written;
}
static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
const char *buf, size_t count)
{
struct workqueue_struct *wq = dev_to_wq(dev);
struct workqueue_attrs *attrs;
int v, ret;
attrs = wq_sysfs_prep_attrs(wq);
if (!attrs)
return -ENOMEM;
ret = -EINVAL; /*
if (sscanf(buf, "%d", &v) == 1) { * Become the manager and destroy all workers. Grabbing
attrs->no_numa = !v; * manager_arb prevents @pool's workers from blocking on
ret = apply_workqueue_attrs(wq, attrs); * attach_mutex.
} */
mutex_lock(&pool->manager_arb);
free_workqueue_attrs(attrs); spin_lock_irq(&pool->lock);
return ret ?: count; while ((worker = first_idle_worker(pool)))
} destroy_worker(worker);
WARN_ON(pool->nr_workers || pool->nr_idle);
spin_unlock_irq(&pool->lock);
static struct device_attribute wq_sysfs_unbound_attrs[] = { mutex_lock(&pool->attach_mutex);
__ATTR(pool_ids, 0444, wq_pool_ids_show, NULL), if (!list_empty(&pool->workers))
__ATTR(nice, 0644, wq_nice_show, wq_nice_store), pool->detach_completion = &detach_completion;
__ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store), mutex_unlock(&pool->attach_mutex);
__ATTR(numa, 0644, wq_numa_show, wq_numa_store),
__ATTR_NULL,
};
static struct bus_type wq_subsys = { if (pool->detach_completion)
.name = "workqueue", wait_for_completion(pool->detach_completion);
.dev_groups = wq_sysfs_groups,
};
static int __init wq_sysfs_init(void) mutex_unlock(&pool->manager_arb);
{
return subsys_virtual_register(&wq_subsys, NULL);
}
core_initcall(wq_sysfs_init);
static void wq_device_release(struct device *dev) /* shut down the timers */
{ del_timer_sync(&pool->idle_timer);
struct wq_device *wq_dev = container_of(dev, struct wq_device, dev); del_timer_sync(&pool->mayday_timer);
kfree(wq_dev); /* sched-RCU protected to allow dereferences from get_work_pool() */
call_rcu_sched(&pool->rcu, rcu_free_pool);
} }
/** /**
* workqueue_sysfs_register - make a workqueue visible in sysfs * get_unbound_pool - get a worker_pool with the specified attributes
* @wq: the workqueue to register * @attrs: the attributes of the worker_pool to get
* *
* Expose @wq in sysfs under /sys/bus/workqueue/devices. * Obtain a worker_pool which has the same attributes as @attrs, bump the
* alloc_workqueue*() automatically calls this function if WQ_SYSFS is set * reference count and return it. If there already is a matching
* which is the preferred method. * worker_pool, it will be used; otherwise, this function attempts to
* create a new one.
* *
* Workqueue user should use this function directly iff it wants to apply * Should be called with wq_pool_mutex held.
* workqueue_attrs before making the workqueue visible in sysfs; otherwise,
* apply_workqueue_attrs() may race against userland updating the
* attributes.
* *
* Return: 0 on success, -errno on failure. * Return: On success, a worker_pool with the same attributes as @attrs.
* On failure, %NULL.
*/ */
int workqueue_sysfs_register(struct workqueue_struct *wq) static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
{ {
struct wq_device *wq_dev; u32 hash = wqattrs_hash(attrs);
int ret; struct worker_pool *pool;
int node;
/* lockdep_assert_held(&wq_pool_mutex);
* Adjusting max_active or creating new pwqs by applyting
* attributes breaks ordering guarantee. Disallow exposing ordered
* workqueues.
*/
if (WARN_ON(wq->flags & __WQ_ORDERED))
return -EINVAL;
wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL); /* do we already have a matching pool? */
if (!wq_dev) hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
return -ENOMEM; if (wqattrs_equal(pool->attrs, attrs)) {
pool->refcnt++;
return pool;
}
}
wq_dev->wq = wq; /* nope, create a new one */
wq_dev->dev.bus = &wq_subsys; pool = kzalloc(sizeof(*pool), GFP_KERNEL);
wq_dev->dev.init_name = wq->name; if (!pool || init_worker_pool(pool) < 0)
wq_dev->dev.release = wq_device_release; goto fail;
lockdep_set_subclass(&pool->lock, 1); /* see put_pwq() */
copy_workqueue_attrs(pool->attrs, attrs);
/* /*
* unbound_attrs are created separately. Suppress uevent until * no_numa isn't a worker_pool attribute, always clear it. See
* everything is ready. * 'struct workqueue_attrs' comments for detail.
*/ */
dev_set_uevent_suppress(&wq_dev->dev, true); pool->attrs->no_numa = false;
ret = device_register(&wq_dev->dev);
if (ret) {
kfree(wq_dev);
wq->wq_dev = NULL;
return ret;
}
if (wq->flags & WQ_UNBOUND) {
struct device_attribute *attr;
for (attr = wq_sysfs_unbound_attrs; attr->attr.name; attr++) { /* if cpumask is contained inside a NUMA node, we belong to that node */
ret = device_create_file(&wq_dev->dev, attr); if (wq_numa_enabled) {
if (ret) { for_each_node(node) {
device_unregister(&wq_dev->dev); if (cpumask_subset(pool->attrs->cpumask,
wq->wq_dev = NULL; wq_numa_possible_cpumask[node])) {
return ret; pool->node = node;
break;
} }
} }
} }
dev_set_uevent_suppress(&wq_dev->dev, false); if (worker_pool_assign_id(pool) < 0)
kobject_uevent(&wq_dev->dev.kobj, KOBJ_ADD); goto fail;
return 0;
}
/** /* create and start the initial worker */
* workqueue_sysfs_unregister - undo workqueue_sysfs_register() if (!create_worker(pool))
* @wq: the workqueue to unregister goto fail;
*
* If @wq is registered to sysfs by workqueue_sysfs_register(), unregister.
*/
static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
{
struct wq_device *wq_dev = wq->wq_dev;
if (!wq->wq_dev) /* install */
return; hash_add(unbound_pool_hash, &pool->hash_node, hash);
wq->wq_dev = NULL; return pool;
device_unregister(&wq_dev->dev); fail:
if (pool)
put_unbound_pool(pool);
return NULL;
} }
#else /* CONFIG_SYSFS */
static void workqueue_sysfs_unregister(struct workqueue_struct *wq) { }
#endif /* CONFIG_SYSFS */
/** static void rcu_free_pwq(struct rcu_head *rcu)
* free_workqueue_attrs - free a workqueue_attrs
* @attrs: workqueue_attrs to free
*
* Undo alloc_workqueue_attrs().
*/
void free_workqueue_attrs(struct workqueue_attrs *attrs)
{ {
if (attrs) { kmem_cache_free(pwq_cache,
free_cpumask_var(attrs->cpumask); container_of(rcu, struct pool_workqueue, rcu));
kfree(attrs);
}
} }
/** /*
* alloc_workqueue_attrs - allocate a workqueue_attrs * Scheduled on system_wq by put_pwq() when an unbound pwq hits zero refcnt
* @gfp_mask: allocation mask to use * and needs to be destroyed.
*
* Allocate a new workqueue_attrs, initialize with default settings and
* return it.
*
* Return: The allocated new workqueue_attr on success. %NULL on failure.
*/ */
struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask) static void pwq_unbound_release_workfn(struct work_struct *work)
{ {
struct workqueue_attrs *attrs; struct pool_workqueue *pwq = container_of(work, struct pool_workqueue,
unbound_release_work);
struct workqueue_struct *wq = pwq->wq;
struct worker_pool *pool = pwq->pool;
bool is_last;
attrs = kzalloc(sizeof(*attrs), gfp_mask); if (WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND)))
if (!attrs) return;
goto fail;
if (!alloc_cpumask_var(&attrs->cpumask, gfp_mask))
goto fail;
cpumask_copy(attrs->cpumask, cpu_possible_mask); mutex_lock(&wq->mutex);
return attrs; list_del_rcu(&pwq->pwqs_node);
fail: is_last = list_empty(&wq->pwqs);
free_workqueue_attrs(attrs); mutex_unlock(&wq->mutex);
return NULL;
} mutex_lock(&wq_pool_mutex);
put_unbound_pool(pool);
mutex_unlock(&wq_pool_mutex);
call_rcu_sched(&pwq->rcu, rcu_free_pwq);
static void copy_workqueue_attrs(struct workqueue_attrs *to,
const struct workqueue_attrs *from)
{
to->nice = from->nice;
cpumask_copy(to->cpumask, from->cpumask);
/* /*
* Unlike hash and equality test, this function doesn't ignore * If we're the last pwq going away, @wq is already dead and no one
* ->no_numa as it is used for both pool and wq attrs. Instead, * is gonna access it anymore. Schedule RCU free.
* get_unbound_pool() explicitly clears ->no_numa after copying.
*/ */
to->no_numa = from->no_numa; if (is_last)
} call_rcu_sched(&wq->rcu, rcu_free_wq);
/* hash value of the content of @attr */
static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
{
u32 hash = 0;
hash = jhash_1word(attrs->nice, hash);
hash = jhash(cpumask_bits(attrs->cpumask),
BITS_TO_LONGS(nr_cpumask_bits) * sizeof(long), hash);
return hash;
}
/* content equality test */
static bool wqattrs_equal(const struct workqueue_attrs *a,
const struct workqueue_attrs *b)
{
if (a->nice != b->nice)
return false;
if (!cpumask_equal(a->cpumask, b->cpumask))
return false;
return true;
} }
/** /**
* init_worker_pool - initialize a newly zalloc'd worker_pool * pwq_adjust_max_active - update a pwq's max_active to the current setting
* @pool: worker_pool to initialize * @pwq: target pool_workqueue
*
* Initiailize a newly zalloc'd @pool. It also allocates @pool->attrs.
* *
* Return: 0 on success, -errno on failure. Even on failure, all fields * If @pwq isn't freezing, set @pwq->max_active to the associated
* inside @pool proper are initialized and put_unbound_pool() can be called * workqueue's saved_max_active and activate delayed work items
* on @pool safely to release it. * accordingly. If @pwq is freezing, clear @pwq->max_active to zero.
*/ */
static int init_worker_pool(struct worker_pool *pool) static void pwq_adjust_max_active(struct pool_workqueue *pwq)
{ {
spin_lock_init(&pool->lock); struct workqueue_struct *wq = pwq->wq;
pool->id = -1; bool freezable = wq->flags & WQ_FREEZABLE;
pool->cpu = -1;
pool->node = NUMA_NO_NODE;
pool->flags |= POOL_DISASSOCIATED;
INIT_LIST_HEAD(&pool->worklist);
INIT_LIST_HEAD(&pool->idle_list);
hash_init(pool->busy_hash);
init_timer_deferrable(&pool->idle_timer);
pool->idle_timer.function = idle_worker_timeout;
pool->idle_timer.data = (unsigned long)pool;
setup_timer(&pool->mayday_timer, pool_mayday_timeout, /* for @wq->saved_max_active */
(unsigned long)pool); lockdep_assert_held(&wq->mutex);
mutex_init(&pool->manager_arb); /* fast exit for non-freezable wqs */
mutex_init(&pool->attach_mutex); if (!freezable && pwq->max_active == wq->saved_max_active)
INIT_LIST_HEAD(&pool->workers); return;
ida_init(&pool->worker_ida); spin_lock_irq(&pwq->pool->lock);
INIT_HLIST_NODE(&pool->hash_node);
pool->refcnt = 1;
/* shouldn't fail above this point */ /*
pool->attrs = alloc_workqueue_attrs(GFP_KERNEL); * During [un]freezing, the caller is responsible for ensuring that
if (!pool->attrs) * this function is called at least once after @workqueue_freezing
return -ENOMEM; * is updated and visible.
return 0; */
} if (!freezable || !workqueue_freezing) {
pwq->max_active = wq->saved_max_active;
static void rcu_free_wq(struct rcu_head *rcu) while (!list_empty(&pwq->delayed_works) &&
{ pwq->nr_active < pwq->max_active)
struct workqueue_struct *wq = pwq_activate_first_delayed(pwq);
container_of(rcu, struct workqueue_struct, rcu);
if (!(wq->flags & WQ_UNBOUND)) /*
free_percpu(wq->cpu_pwqs); * Need to kick a worker after thawed or an unbound wq's
else * max_active is bumped. It's a slow path. Do it always.
free_workqueue_attrs(wq->unbound_attrs); */
wake_up_worker(pwq->pool);
} else {
pwq->max_active = 0;
}
kfree(wq->rescuer); spin_unlock_irq(&pwq->pool->lock);
kfree(wq);
} }
static void rcu_free_pool(struct rcu_head *rcu) /* initialize newly alloced @pwq which is associated with @wq and @pool */
static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
struct worker_pool *pool)
{ {
struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu); BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
ida_destroy(&pool->worker_ida); memset(pwq, 0, sizeof(*pwq));
free_workqueue_attrs(pool->attrs);
kfree(pool); pwq->pool = pool;
pwq->wq = wq;
pwq->flush_color = -1;
pwq->refcnt = 1;
INIT_LIST_HEAD(&pwq->delayed_works);
INIT_LIST_HEAD(&pwq->pwqs_node);
INIT_LIST_HEAD(&pwq->mayday_node);
INIT_WORK(&pwq->unbound_release_work, pwq_unbound_release_workfn);
} }
/** /* sync @pwq with the current state of its associated wq and link it */
* put_unbound_pool - put a worker_pool static void link_pwq(struct pool_workqueue *pwq)
* @pool: worker_pool to put
*
* Put @pool. If its refcnt reaches zero, it gets destroyed in sched-RCU
* safe manner. get_unbound_pool() calls this function on its failure path
* and this function should be able to release pools which went through,
* successfully or not, init_worker_pool().
*
* Should be called with wq_pool_mutex held.
*/
static void put_unbound_pool(struct worker_pool *pool)
{ {
DECLARE_COMPLETION_ONSTACK(detach_completion); struct workqueue_struct *wq = pwq->wq;
struct worker *worker;
lockdep_assert_held(&wq_pool_mutex); lockdep_assert_held(&wq->mutex);
if (--pool->refcnt) /* may be called multiple times, ignore if already linked */
if (!list_empty(&pwq->pwqs_node))
return; return;
/* sanity checks */ /* set the matching work_color */
if (WARN_ON(!(pool->cpu < 0)) || pwq->work_color = wq->work_color;
WARN_ON(!list_empty(&pool->worklist)))
return;
/* release id and unhash */ /* sync max_active to the current setting */
if (pool->id >= 0) pwq_adjust_max_active(pwq);
idr_remove(&worker_pool_idr, pool->id);
hash_del(&pool->hash_node);
/* /* link in @pwq */
* Become the manager and destroy all workers. Grabbing list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
* manager_arb prevents @pool's workers from blocking on }
* attach_mutex.
*/
mutex_lock(&pool->manager_arb);
spin_lock_irq(&pool->lock); /* obtain a pool matching @attr and create a pwq associating the pool and @wq */
while ((worker = first_idle_worker(pool))) static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
destroy_worker(worker); const struct workqueue_attrs *attrs)
WARN_ON(pool->nr_workers || pool->nr_idle); {
spin_unlock_irq(&pool->lock); struct worker_pool *pool;
struct pool_workqueue *pwq;
mutex_lock(&pool->attach_mutex); lockdep_assert_held(&wq_pool_mutex);
if (!list_empty(&pool->workers))
pool->detach_completion = &detach_completion;
mutex_unlock(&pool->attach_mutex);
if (pool->detach_completion) pool = get_unbound_pool(attrs);
wait_for_completion(pool->detach_completion); if (!pool)
return NULL;
mutex_unlock(&pool->manager_arb); pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
if (!pwq) {
put_unbound_pool(pool);
return NULL;
}
/* shut down the timers */ init_pwq(pwq, wq, pool);
del_timer_sync(&pool->idle_timer); return pwq;
del_timer_sync(&pool->mayday_timer); }
/* sched-RCU protected to allow dereferences from get_work_pool() */ /* undo alloc_unbound_pwq(), used only in the error path */
call_rcu_sched(&pool->rcu, rcu_free_pool); static void free_unbound_pwq(struct pool_workqueue *pwq)
{
lockdep_assert_held(&wq_pool_mutex);
if (pwq) {
put_unbound_pool(pwq->pool);
kmem_cache_free(pwq_cache, pwq);
}
} }
/** /**
* get_unbound_pool - get a worker_pool with the specified attributes * wq_calc_node_cpumask - calculate a wq_attrs' cpumask for the specified node
* @attrs: the attributes of the worker_pool to get * @attrs: the wq_attrs of interest
* @node: the target NUMA node
* @cpu_going_down: if >= 0, the CPU to consider as offline
* @cpumask: outarg, the resulting cpumask
* *
* Obtain a worker_pool which has the same attributes as @attrs, bump the * Calculate the cpumask a workqueue with @attrs should use on @node. If
* reference count and return it. If there already is a matching * @cpu_going_down is >= 0, that cpu is considered offline during
* worker_pool, it will be used; otherwise, this function attempts to * calculation. The result is stored in @cpumask.
* create a new one.
* *
* Should be called with wq_pool_mutex held. * If NUMA affinity is not enabled, @attrs->cpumask is always used. If
* enabled and @node has online CPUs requested by @attrs, the returned
* cpumask is the intersection of the possible CPUs of @node and
* @attrs->cpumask.
* *
* Return: On success, a worker_pool with the same attributes as @attrs. * The caller is responsible for ensuring that the cpumask of @node stays
* On failure, %NULL. * stable.
*
* Return: %true if the resulting @cpumask is different from @attrs->cpumask,
* %false if equal.
*/ */
static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs) static bool wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node,
int cpu_going_down, cpumask_t *cpumask)
{ {
u32 hash = wqattrs_hash(attrs); if (!wq_numa_enabled || attrs->no_numa)
struct worker_pool *pool; goto use_dfl;
int node;
lockdep_assert_held(&wq_pool_mutex); /* does @node have any online CPUs @attrs wants? */
cpumask_and(cpumask, cpumask_of_node(node), attrs->cpumask);
if (cpu_going_down >= 0)
cpumask_clear_cpu(cpu_going_down, cpumask);
/* do we already have a matching pool? */ if (cpumask_empty(cpumask))
hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) { goto use_dfl;
if (wqattrs_equal(pool->attrs, attrs)) {
pool->refcnt++;
return pool;
}
}
/* nope, create a new one */
pool = kzalloc(sizeof(*pool), GFP_KERNEL);
if (!pool || init_worker_pool(pool) < 0)
goto fail;
lockdep_set_subclass(&pool->lock, 1); /* see put_pwq() */
copy_workqueue_attrs(pool->attrs, attrs);
/*
* no_numa isn't a worker_pool attribute, always clear it. See
* 'struct workqueue_attrs' comments for detail.
*/
pool->attrs->no_numa = false;
/* if cpumask is contained inside a NUMA node, we belong to that node */
if (wq_numa_enabled) {
for_each_node(node) {
if (cpumask_subset(pool->attrs->cpumask,
wq_numa_possible_cpumask[node])) {
pool->node = node;
break;
}
}
}
if (worker_pool_assign_id(pool) < 0)
goto fail;
/* create and start the initial worker */
if (!create_worker(pool))
goto fail;
/* install */
hash_add(unbound_pool_hash, &pool->hash_node, hash);
return pool;
fail:
if (pool)
put_unbound_pool(pool);
return NULL;
}
/* RCU callback: hand the pool_workqueue embedding @rcu back to pwq_cache. */
static void rcu_free_pwq(struct rcu_head *rcu)
{
	struct pool_workqueue *pwq =
		container_of(rcu, struct pool_workqueue, rcu);

	kmem_cache_free(pwq_cache, pwq);
}
/*
 * Work function which tears down an unbound pool_workqueue.  put_pwq()
 * schedules it on system_wq once the pwq's refcnt has hit zero.
 */
static void pwq_unbound_release_workfn(struct work_struct *work)
{
	struct pool_workqueue *dying;
	struct workqueue_struct *owner;
	struct worker_pool *backing;
	bool wq_emptied;

	dying = container_of(work, struct pool_workqueue, unbound_release_work);
	owner = dying->wq;
	backing = dying->pool;

	/* only pwqs of unbound workqueues are expected here */
	if (WARN_ON_ONCE(!(owner->flags & WQ_UNBOUND)))
		return;

	/* unlink from the wq and note whether that emptied its pwq list */
	mutex_lock(&owner->mutex);
	list_del_rcu(&dying->pwqs_node);
	wq_emptied = list_empty(&owner->pwqs);
	mutex_unlock(&owner->mutex);

	/* put the pool; put_unbound_pool() wants wq_pool_mutex held */
	mutex_lock(&wq_pool_mutex);
	put_unbound_pool(backing);
	mutex_unlock(&wq_pool_mutex);

	/* the pwq itself is released from an RCU callback */
	call_rcu_sched(&dying->rcu, rcu_free_pwq);

	/*
	 * With its last pwq going away, @owner is already dead and no one
	 * is going to access it anymore.  Schedule its RCU free too.
	 */
	if (wq_emptied)
		call_rcu_sched(&owner->rcu, rcu_free_wq);
}
/**
 * pwq_adjust_max_active - update a pwq's max_active to the current setting
 * @pwq: target pool_workqueue
 *
 * Unless @pwq is freezing, copy the owning workqueue's saved_max_active
 * into @pwq->max_active and activate delayed work items up to that limit.
 * While @pwq is freezing, @pwq->max_active is forced to zero instead.
 */
static void pwq_adjust_max_active(struct pool_workqueue *pwq)
{
	struct workqueue_struct *owner = pwq->wq;
	bool can_freeze = owner->flags & WQ_FREEZABLE;

	/* @owner->saved_max_active is read below */
	lockdep_assert_held(&owner->mutex);

	/* non-freezable wq already in sync: nothing to do */
	if (!can_freeze && pwq->max_active == owner->saved_max_active)
		return;

	spin_lock_irq(&pwq->pool->lock);

	/*
	 * During [un]freezing, the caller is responsible for ensuring that
	 * this function is called at least once after @workqueue_freezing
	 * is updated and visible.
	 */
	if (can_freeze && workqueue_freezing) {
		pwq->max_active = 0;
	} else {
		pwq->max_active = owner->saved_max_active;

		/* activate delayed items while there is room under the limit */
		while (!list_empty(&pwq->delayed_works) &&
		       pwq->nr_active < pwq->max_active)
			pwq_activate_first_delayed(pwq);

		/*
		 * A worker needs to be kicked after thawing or after an
		 * unbound wq's max_active is bumped.  This is a slow path,
		 * so just do it unconditionally.
		 */
		wake_up_worker(pwq->pool);
	}

	spin_unlock_irq(&pwq->pool->lock);
}
/*
 * Set up a freshly allocated @pwq so that it ties @wq to @pool.  The whole
 * structure is zeroed first; only the non-zero fields are filled in after.
 */
static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
		     struct worker_pool *pool)
{
	/* the pwq address must not have any WORK_STRUCT_FLAG_MASK bit set */
	BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);

	memset(pwq, 0, sizeof(*pwq));

	/* list heads and the release work item */
	INIT_LIST_HEAD(&pwq->delayed_works);
	INIT_LIST_HEAD(&pwq->pwqs_node);
	INIT_LIST_HEAD(&pwq->mayday_node);
	INIT_WORK(&pwq->unbound_release_work, pwq_unbound_release_workfn);

	/* association and initial counters */
	pwq->wq = wq;
	pwq->pool = pool;
	pwq->refcnt = 1;
	pwq->flush_color = -1;
}
/*
 * Bring @pwq in line with the current state of its workqueue and add it to
 * that workqueue's pwq list.  Calling this on an already linked @pwq is a
 * no-op.
 */
static void link_pwq(struct pool_workqueue *pwq)
{
	struct workqueue_struct *owner = pwq->wq;

	lockdep_assert_held(&owner->mutex);

	/* an empty pwqs_node means @pwq has not been linked yet */
	if (list_empty(&pwq->pwqs_node)) {
		/* adopt the wq's current work_color */
		pwq->work_color = owner->work_color;

		/* pick up the current max_active setting */
		pwq_adjust_max_active(pwq);

		/* and finally make @pwq visible on the wq's list */
		list_add_rcu(&pwq->pwqs_node, &owner->pwqs);
	}
}
/*
 * Get a pool matching @attrs and allocate a pwq linking that pool to @wq.
 * Returns the initialized pwq, or NULL if either the pool or the pwq could
 * not be obtained.  Must be called with wq_pool_mutex held.
 */
static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
					const struct workqueue_attrs *attrs)
{
	struct pool_workqueue *new_pwq;
	struct worker_pool *backing;

	lockdep_assert_held(&wq_pool_mutex);

	backing = get_unbound_pool(attrs);
	if (!backing)
		return NULL;

	new_pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, backing->node);
	if (new_pwq)
		init_pwq(new_pwq, wq, backing);
	else
		put_unbound_pool(backing);	/* undo get_unbound_pool() */

	return new_pwq;
}
/*
 * Reverse alloc_unbound_pwq(); only the error path uses this.  A NULL @pwq
 * is accepted and ignored.  Must be called with wq_pool_mutex held.
 */
static void free_unbound_pwq(struct pool_workqueue *pwq)
{
	lockdep_assert_held(&wq_pool_mutex);

	if (!pwq)
		return;

	put_unbound_pool(pwq->pool);
	kmem_cache_free(pwq_cache, pwq);
}
/**
* wq_calc_node_cpumask - calculate a wq_attrs' cpumask for the specified node
* @attrs: the wq_attrs of interest
* @node: the target NUMA node
* @cpu_going_down: if >= 0, the CPU to consider as offline
* @cpumask: outarg, the resulting cpumask
*
* Calculate the cpumask a workqueue with @attrs should use on @node. If
* @cpu_going_down is >= 0, that cpu is considered offline during
* calculation. The result is stored in @cpumask.
*
* If NUMA affinity is not enabled, @attrs->cpumask is always used. If
* enabled and @node has online CPUs requested by @attrs, the returned
* cpumask is the intersection of the possible CPUs of @node and
* @attrs->cpumask.
*
* The caller is responsible for ensuring that the cpumask of @node stays
* stable.
*
* Return: %true if the resulting @cpumask is different from @attrs->cpumask,
* %false if equal.
*/
static bool wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node,
int cpu_going_down, cpumask_t *cpumask)
{
if (!wq_numa_enabled || attrs->no_numa)
goto use_dfl;
/* does @node have any online CPUs @attrs wants? */
cpumask_and(cpumask, cpumask_of_node(node), attrs->cpumask);
if (cpu_going_down >= 0)
cpumask_clear_cpu(cpu_going_down, cpumask);
if (cpumask_empty(cpumask))
goto use_dfl;
/* yeap, return possible CPUs in @node that @attrs wants */ /* yeap, return possible CPUs in @node that @attrs wants */
cpumask_and(cpumask, attrs->cpumask, wq_numa_possible_cpumask[node]); cpumask_and(cpumask, attrs->cpumask, wq_numa_possible_cpumask[node]);
...@@ -4817,202 +4501,519 @@ static int workqueue_cpu_up_callback(struct notifier_block *nfb, ...@@ -4817,202 +4501,519 @@ static int workqueue_cpu_up_callback(struct notifier_block *nfb,
else if (pool->cpu < 0) else if (pool->cpu < 0)
restore_unbound_workers_cpumask(pool, cpu); restore_unbound_workers_cpumask(pool, cpu);
mutex_unlock(&pool->attach_mutex); mutex_unlock(&pool->attach_mutex);
} }
/* update NUMA affinity of unbound workqueues */
list_for_each_entry(wq, &workqueues, list)
wq_update_unbound_numa(wq, cpu, true);
mutex_unlock(&wq_pool_mutex);
break;
}
return NOTIFY_OK;
}
/*
 * Workqueues should be brought down after normal priority CPU notifiers.
 * This will be registered as low priority CPU notifier.
 *
 * Only CPU_DOWN_PREPARE is acted upon (CPU_TASKS_FROZEN is masked out of
 * @action first); every other action is ignored.  @hcpu carries the CPU
 * number.  Always returns NOTIFY_OK.
 */
static int workqueue_cpu_down_callback(struct notifier_block *nfb,
						 unsigned long action,
						 void *hcpu)
{
	int cpu = (unsigned long)hcpu;
	struct work_struct unbind_work;
	struct workqueue_struct *wq;

	switch (action & ~CPU_TASKS_FROZEN) {
	case CPU_DOWN_PREPARE:
		/* unbinding per-cpu workers should happen on the local CPU */
		INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
		queue_work_on(cpu, system_highpri_wq, &unbind_work);

		/* update NUMA affinity of unbound workqueues */
		mutex_lock(&wq_pool_mutex);
		list_for_each_entry(wq, &workqueues, list)
			wq_update_unbound_numa(wq, cpu, false);
		mutex_unlock(&wq_pool_mutex);

		/*
		 * wait for per-cpu unbinding to finish; unbind_work lives on
		 * this stack frame and must be done before it is destroyed
		 */
		flush_work(&unbind_work);
		destroy_work_on_stack(&unbind_work);
		break;
	}
	return NOTIFY_OK;
}
#ifdef CONFIG_SMP
/* request used by work_on_cpu() to run @fn(@arg) from a work item */
struct work_for_cpu {
	struct work_struct work;	/* work item, handled by work_for_cpu_fn() */
	long (*fn)(void *);		/* function to call */
	void *arg;			/* argument passed to @fn */
	long ret;			/* @fn's return value, stored by work_for_cpu_fn() */
};
/* work handler: invoke the requested function and record what it returned */
static void work_for_cpu_fn(struct work_struct *work)
{
	struct work_for_cpu *req;

	req = container_of(work, struct work_for_cpu, work);
	req->ret = req->fn(req->arg);
}
/**
 * work_on_cpu - run a function in user context on a particular cpu
 * @cpu: the cpu to run on
 * @fn: the function to run
 * @arg: the function arg
 *
 * It is up to the caller to ensure that the cpu doesn't go offline.
 * The caller must not hold any locks which would prevent @fn from completing.
 *
 * The request is kept on the caller's stack, so this function does not
 * return until the work item has been flushed.
 *
 * Return: The value @fn returns.
 */
long work_on_cpu(int cpu, long (*fn)(void *), void *arg)
{
	struct work_for_cpu wfc = { .fn = fn, .arg = arg };

	INIT_WORK_ONSTACK(&wfc.work, work_for_cpu_fn);
	schedule_work_on(cpu, &wfc.work);
	/* wfc.ret is only read, and wfc only torn down, after the flush */
	flush_work(&wfc.work);
	destroy_work_on_stack(&wfc.work);
	return wfc.ret;
}
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */
#ifdef CONFIG_FREEZER
/**
 * freeze_workqueues_begin - begin freezing workqueues
 *
 * Start freezing workqueues.  After this function returns, all freezable
 * workqueues will queue new works to their delayed_works list instead of
 * pool->worklist.
 *
 * Sets workqueue_freezing and then re-evaluates max_active of every pwq of
 * every workqueue on the workqueues list.  Warns once if freezing was
 * already in progress.
 *
 * CONTEXT:
 * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
 */
void freeze_workqueues_begin(void)
{
	struct workqueue_struct *wq;
	struct pool_workqueue *pwq;

	mutex_lock(&wq_pool_mutex);

	/* a second begin without an intervening thaw is unexpected */
	WARN_ON_ONCE(workqueue_freezing);
	workqueue_freezing = true;

	/* let each pwq pick up the new freezing state */
	list_for_each_entry(wq, &workqueues, list) {
		mutex_lock(&wq->mutex);
		for_each_pwq(pwq, wq)
			pwq_adjust_max_active(pwq);
		mutex_unlock(&wq->mutex);
	}

	mutex_unlock(&wq_pool_mutex);
}
/**
 * freeze_workqueues_busy - are freezable workqueues still busy?
 *
 * Check whether freezing is complete.  This function must be called
 * between freeze_workqueues_begin() and thaw_workqueues().
 *
 * A workqueue counts as busy as soon as one of its pwqs has a non-zero
 * nr_active; the scan stops at the first such pwq.  Workqueues without
 * WQ_FREEZABLE are not looked at.
 *
 * CONTEXT:
 * Grabs and releases wq_pool_mutex.
 *
 * Return:
 * %true if some freezable workqueues are still busy.  %false if freezing
 * is complete.
 */
bool freeze_workqueues_busy(void)
{
	bool busy = false;
	struct workqueue_struct *wq;
	struct pool_workqueue *pwq;

	mutex_lock(&wq_pool_mutex);

	/* only meaningful while a freeze is in progress */
	WARN_ON_ONCE(!workqueue_freezing);

	list_for_each_entry(wq, &workqueues, list) {
		if (!(wq->flags & WQ_FREEZABLE))
			continue;
		/*
		 * nr_active is monotonically decreasing.  It's safe
		 * to peek without lock.
		 */
		rcu_read_lock_sched();
		for_each_pwq(pwq, wq) {
			WARN_ON_ONCE(pwq->nr_active < 0);
			if (pwq->nr_active) {
				busy = true;
				/* leave the RCU read section before bailing out */
				rcu_read_unlock_sched();
				goto out_unlock;
			}
		}
		rcu_read_unlock_sched();
	}
out_unlock:
	mutex_unlock(&wq_pool_mutex);
	return busy;
}
/**
 * thaw_workqueues - thaw workqueues
 *
 * Undo freeze_workqueues_begin().  Clears the freezing state and has every
 * pwq re-evaluate its max_active, which restores normal queueing and moves
 * the works collected while frozen to their respective pool worklists.
 * Does nothing if workqueues are not currently being frozen.
 *
 * CONTEXT:
 * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
 */
void thaw_workqueues(void)
{
	struct workqueue_struct *wq;
	struct pool_workqueue *pwq;

	mutex_lock(&wq_pool_mutex);

	if (workqueue_freezing) {
		workqueue_freezing = false;

		/* restore max_active and repopulate worklist */
		list_for_each_entry(wq, &workqueues, list) {
			mutex_lock(&wq->mutex);
			for_each_pwq(pwq, wq)
				pwq_adjust_max_active(pwq);
			mutex_unlock(&wq->mutex);
		}
	}

	mutex_unlock(&wq_pool_mutex);
}
#endif /* CONFIG_FREEZER */
#ifdef CONFIG_SYSFS
/*
 * Workqueues with WQ_SYSFS flag set are visible to userland via
 * /sys/bus/workqueue/devices/WQ_NAME.  All visible workqueues have the
 * following attributes.
 *
 *  per_cpu	RO bool	: whether the workqueue is per-cpu or unbound
 *  max_active	RW int	: maximum number of in-flight work items
 *
 * Unbound workqueues have the following extra attributes.
 *
 *  pool_ids	RO int	: the associated pool ID of each node ("node:id" pairs)
 *  nice	RW int	: nice value of the workers
 *  cpumask	RW mask	: bitmask of allowed CPUs for the workers
 *  numa	RW bool	: whether NUMA affinity is enabled
 */
/* sysfs device wrapper pairing a struct device with the workqueue it exposes */
struct wq_device {
	struct workqueue_struct *wq;	/* workqueue this device represents */
	struct device dev;		/* embedded device; dev_to_wq() maps it back to @wq */
};
/* map a wq_device's embedded @dev back to the workqueue it represents */
static struct workqueue_struct *dev_to_wq(struct device *dev)
{
	return container_of(dev, struct wq_device, dev)->wq;
}
static ssize_t per_cpu_show(struct device *dev, struct device_attribute *attr,
char *buf)
{
struct workqueue_struct *wq = dev_to_wq(dev);
return scnprintf(buf, PAGE_SIZE, "%d\n", (bool)!(wq->flags & WQ_UNBOUND));
}
static DEVICE_ATTR_RO(per_cpu);
static ssize_t max_active_show(struct device *dev,
struct device_attribute *attr, char *buf)
{
struct workqueue_struct *wq = dev_to_wq(dev);
return scnprintf(buf, PAGE_SIZE, "%d\n", wq->saved_max_active);
}
/*
 * sysfs "max_active" write side: parse a decimal integer from @buf and
 * apply it through workqueue_set_max_active().  Input that does not start
 * with an integer, or a value that is not strictly positive, is rejected
 * with -EINVAL.  Returns @count on success.
 */
static ssize_t max_active_store(struct device *dev,
				struct device_attribute *attr, const char *buf,
				size_t count)
{
	int requested;

	if (sscanf(buf, "%d", &requested) != 1)
		return -EINVAL;
	if (requested <= 0)
		return -EINVAL;

	workqueue_set_max_active(dev_to_wq(dev), requested);
	return count;
}
static DEVICE_ATTR_RW(max_active);
/* NULL-terminated list of the per_cpu and max_active device attributes */
static struct attribute *wq_sysfs_attrs[] = {
	&dev_attr_per_cpu.attr,
	&dev_attr_max_active.attr,
	NULL,
};
/* provides wq_sysfs_groups, which wq_subsys uses as its dev_groups */
ATTRIBUTE_GROUPS(wq_sysfs);
/*
 * sysfs "pool_ids": print, for every node, the ID of the pool behind the
 * pwq that unbound_pwq_by_node() returns for it.  The output is a single
 * line of space-separated "node:pool_id" pairs terminated by a newline.
 * Returns the number of bytes written to @buf.
 */
static ssize_t wq_pool_ids_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct workqueue_struct *wq = dev_to_wq(dev);
	const char *delim = "";
	int node, written = 0;

	/* the per-node pwq lookups are done inside a sched-RCU read section */
	rcu_read_lock_sched();
	for_each_node(node) {
		written += scnprintf(buf + written, PAGE_SIZE - written,
				     "%s%d:%d", delim, node,
				     unbound_pwq_by_node(wq, node)->pool->id);
		/* every pair after the first is preceded by a space */
		delim = " ";
	}
	written += scnprintf(buf + written, PAGE_SIZE - written, "\n");
	rcu_read_unlock_sched();

	return written;
}
/*
 * sysfs "nice" read side: print the nice value from the workqueue's
 * unbound_attrs.  The attrs are read with wq->mutex held.
 */
static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
			    char *buf)
{
	struct workqueue_struct *wq = dev_to_wq(dev);
	int len;

	mutex_lock(&wq->mutex);
	len = scnprintf(buf, PAGE_SIZE, "%d\n", wq->unbound_attrs->nice);
	mutex_unlock(&wq->mutex);

	return len;
}
/*
 * Build a private copy of @wq's unbound_attrs for a sysfs store operation
 * to modify.  The copy is taken with wq->mutex held.  Returns the new
 * attrs, or NULL if allocation failed.
 */
static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
{
	struct workqueue_attrs *snapshot = alloc_workqueue_attrs(GFP_KERNEL);

	if (snapshot) {
		mutex_lock(&wq->mutex);
		copy_workqueue_attrs(snapshot, wq->unbound_attrs);
		mutex_unlock(&wq->mutex);
	}

	return snapshot;
}
/*
 * sysfs "nice" write side: parse a nice value from @buf into a copy of the
 * workqueue's attrs and apply the copy.  Returns @count on success,
 * -ENOMEM if the copy could not be allocated, -EINVAL if @buf holds no
 * integer or the value lies outside [MIN_NICE, MAX_NICE], or whatever
 * error apply_workqueue_attrs() reports.
 */
static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
			     const char *buf, size_t count)
{
	struct workqueue_struct *wq = dev_to_wq(dev);
	struct workqueue_attrs *new_attrs;
	int err = -EINVAL;

	new_attrs = wq_sysfs_prep_attrs(wq);
	if (!new_attrs)
		return -ENOMEM;

	/* only a successfully parsed, in-range value gets applied */
	if (sscanf(buf, "%d", &new_attrs->nice) == 1 &&
	    new_attrs->nice >= MIN_NICE && new_attrs->nice <= MAX_NICE)
		err = apply_workqueue_attrs(wq, new_attrs);

	free_workqueue_attrs(new_attrs);
	return err ? err : count;
}
/*
 * sysfs "cpumask" read side: print the cpumask from the workqueue's
 * unbound_attrs using the "%*pb" format.  The attrs are read with
 * wq->mutex held.  Returns the number of bytes written to @buf.
 */
static ssize_t wq_cpumask_show(struct device *dev,
			       struct device_attribute *attr, char *buf)
{
	struct workqueue_struct *wq = dev_to_wq(dev);
	int written;

	mutex_lock(&wq->mutex);
	written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
			    cpumask_pr_args(wq->unbound_attrs->cpumask));
	mutex_unlock(&wq->mutex);
	return written;
}
/*
 * sysfs "cpumask" write side: parse @buf into the cpumask of a copy of the
 * workqueue's attrs and apply the copy.  Returns @count on success,
 * -ENOMEM if the copy could not be allocated, or the error reported by
 * cpumask_parse() or apply_workqueue_attrs().
 */
static ssize_t wq_cpumask_store(struct device *dev,
				struct device_attribute *attr,
				const char *buf, size_t count)
{
	struct workqueue_struct *wq = dev_to_wq(dev);
	struct workqueue_attrs *new_attrs;
	int err;

	new_attrs = wq_sysfs_prep_attrs(wq);
	if (!new_attrs)
		return -ENOMEM;

	err = cpumask_parse(buf, new_attrs->cpumask);
	if (err)
		goto out_free;

	err = apply_workqueue_attrs(wq, new_attrs);

out_free:
	free_workqueue_attrs(new_attrs);
	return err ? err : count;
}
static ssize_t wq_numa_show(struct device *dev, struct device_attribute *attr,
char *buf)
{
struct workqueue_struct *wq = dev_to_wq(dev);
int written;
/* update NUMA affinity of unbound workqueues */ mutex_lock(&wq->mutex);
list_for_each_entry(wq, &workqueues, list) written = scnprintf(buf, PAGE_SIZE, "%d\n",
wq_update_unbound_numa(wq, cpu, true); !wq->unbound_attrs->no_numa);
mutex_unlock(&wq->mutex);
mutex_unlock(&wq_pool_mutex); return written;
break;
}
return NOTIFY_OK;
} }
/* static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
* Workqueues should be brought down after normal priority CPU notifiers. const char *buf, size_t count)
* This will be registered as low priority CPU notifier.
*/
static int workqueue_cpu_down_callback(struct notifier_block *nfb,
unsigned long action,
void *hcpu)
{ {
int cpu = (unsigned long)hcpu; struct workqueue_struct *wq = dev_to_wq(dev);
struct work_struct unbind_work; struct workqueue_attrs *attrs;
struct workqueue_struct *wq; int v, ret;
switch (action & ~CPU_TASKS_FROZEN) {
case CPU_DOWN_PREPARE:
/* unbinding per-cpu workers should happen on the local CPU */
INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
queue_work_on(cpu, system_highpri_wq, &unbind_work);
/* update NUMA affinity of unbound workqueues */ attrs = wq_sysfs_prep_attrs(wq);
mutex_lock(&wq_pool_mutex); if (!attrs)
list_for_each_entry(wq, &workqueues, list) return -ENOMEM;
wq_update_unbound_numa(wq, cpu, false);
mutex_unlock(&wq_pool_mutex);
/* wait for per-cpu unbinding to finish */ ret = -EINVAL;
flush_work(&unbind_work); if (sscanf(buf, "%d", &v) == 1) {
destroy_work_on_stack(&unbind_work); attrs->no_numa = !v;
break; ret = apply_workqueue_attrs(wq, attrs);
} }
return NOTIFY_OK;
free_workqueue_attrs(attrs);
return ret ?: count;
} }
#ifdef CONFIG_SMP static struct device_attribute wq_sysfs_unbound_attrs[] = {
__ATTR(pool_ids, 0444, wq_pool_ids_show, NULL),
__ATTR(nice, 0644, wq_nice_show, wq_nice_store),
__ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store),
__ATTR(numa, 0644, wq_numa_show, wq_numa_store),
__ATTR_NULL,
};
struct work_for_cpu { static struct bus_type wq_subsys = {
struct work_struct work; .name = "workqueue",
long (*fn)(void *); .dev_groups = wq_sysfs_groups,
void *arg;
long ret;
}; };
static void work_for_cpu_fn(struct work_struct *work) static int __init wq_sysfs_init(void)
{ {
struct work_for_cpu *wfc = container_of(work, struct work_for_cpu, work); return subsys_virtual_register(&wq_subsys, NULL);
wfc->ret = wfc->fn(wfc->arg);
} }
core_initcall(wq_sysfs_init);
/** static void wq_device_release(struct device *dev)
* work_on_cpu - run a function in user context on a particular cpu
* @cpu: the cpu to run on
* @fn: the function to run
* @arg: the function arg
*
* It is up to the caller to ensure that the cpu doesn't go offline.
* The caller must not hold any locks which would prevent @fn from completing.
*
* Return: The value @fn returns.
*/
long work_on_cpu(int cpu, long (*fn)(void *), void *arg)
{ {
struct work_for_cpu wfc = { .fn = fn, .arg = arg }; struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
INIT_WORK_ONSTACK(&wfc.work, work_for_cpu_fn); kfree(wq_dev);
schedule_work_on(cpu, &wfc.work);
flush_work(&wfc.work);
destroy_work_on_stack(&wfc.work);
return wfc.ret;
} }
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */
#ifdef CONFIG_FREEZER
/** /**
* freeze_workqueues_begin - begin freezing workqueues * workqueue_sysfs_register - make a workqueue visible in sysfs
* @wq: the workqueue to register
* *
* Start freezing workqueues. After this function returns, all freezable * Expose @wq in sysfs under /sys/bus/workqueue/devices.
* workqueues will queue new works to their delayed_works list instead of * alloc_workqueue*() automatically calls this function if WQ_SYSFS is set
* pool->worklist. * which is the preferred method.
* *
* CONTEXT: * Workqueue user should use this function directly iff it wants to apply
* Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's. * workqueue_attrs before making the workqueue visible in sysfs; otherwise,
* apply_workqueue_attrs() may race against userland updating the
* attributes.
*
* Return: 0 on success, -errno on failure.
*/ */
void freeze_workqueues_begin(void) int workqueue_sysfs_register(struct workqueue_struct *wq)
{ {
struct workqueue_struct *wq; struct wq_device *wq_dev;
struct pool_workqueue *pwq; int ret;
mutex_lock(&wq_pool_mutex);
WARN_ON_ONCE(workqueue_freezing); /*
workqueue_freezing = true; * Adjusting max_active or creating new pwqs by applyting
* attributes breaks ordering guarantee. Disallow exposing ordered
* workqueues.
*/
if (WARN_ON(wq->flags & __WQ_ORDERED))
return -EINVAL;
list_for_each_entry(wq, &workqueues, list) { wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
mutex_lock(&wq->mutex); if (!wq_dev)
for_each_pwq(pwq, wq) return -ENOMEM;
pwq_adjust_max_active(pwq);
mutex_unlock(&wq->mutex);
}
mutex_unlock(&wq_pool_mutex); wq_dev->wq = wq;
} wq_dev->dev.bus = &wq_subsys;
wq_dev->dev.init_name = wq->name;
wq_dev->dev.release = wq_device_release;
/** /*
* freeze_workqueues_busy - are freezable workqueues still busy? * unbound_attrs are created separately. Suppress uevent until
* * everything is ready.
* Check whether freezing is complete. This function must be called */
* between freeze_workqueues_begin() and thaw_workqueues(). dev_set_uevent_suppress(&wq_dev->dev, true);
*
* CONTEXT:
* Grabs and releases wq_pool_mutex.
*
* Return:
* %true if some freezable workqueues are still busy. %false if freezing
* is complete.
*/
bool freeze_workqueues_busy(void)
{
bool busy = false;
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
mutex_lock(&wq_pool_mutex); ret = device_register(&wq_dev->dev);
if (ret) {
kfree(wq_dev);
wq->wq_dev = NULL;
return ret;
}
WARN_ON_ONCE(!workqueue_freezing); if (wq->flags & WQ_UNBOUND) {
struct device_attribute *attr;
list_for_each_entry(wq, &workqueues, list) { for (attr = wq_sysfs_unbound_attrs; attr->attr.name; attr++) {
if (!(wq->flags & WQ_FREEZABLE)) ret = device_create_file(&wq_dev->dev, attr);
continue; if (ret) {
/* device_unregister(&wq_dev->dev);
* nr_active is monotonically decreasing. It's safe wq->wq_dev = NULL;
* to peek without lock. return ret;
*/
rcu_read_lock_sched();
for_each_pwq(pwq, wq) {
WARN_ON_ONCE(pwq->nr_active < 0);
if (pwq->nr_active) {
busy = true;
rcu_read_unlock_sched();
goto out_unlock;
} }
} }
rcu_read_unlock_sched();
} }
out_unlock:
mutex_unlock(&wq_pool_mutex); dev_set_uevent_suppress(&wq_dev->dev, false);
return busy; kobject_uevent(&wq_dev->dev.kobj, KOBJ_ADD);
return 0;
} }
/** /**
* thaw_workqueues - thaw workqueues * workqueue_sysfs_unregister - undo workqueue_sysfs_register()
* * @wq: the workqueue to unregister
* Thaw workqueues. Normal queueing is restored and all collected
* frozen works are transferred to their respective pool worklists.
* *
* CONTEXT: * If @wq is registered to sysfs by workqueue_sysfs_register(), unregister.
* Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
*/ */
void thaw_workqueues(void) static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
{ {
struct workqueue_struct *wq; struct wq_device *wq_dev = wq->wq_dev;
struct pool_workqueue *pwq;
mutex_lock(&wq_pool_mutex);
if (!workqueue_freezing)
goto out_unlock;
workqueue_freezing = false;
/* restore max_active and repopulate worklist */ if (!wq->wq_dev)
list_for_each_entry(wq, &workqueues, list) { return;
mutex_lock(&wq->mutex);
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
mutex_unlock(&wq->mutex);
}
out_unlock: wq->wq_dev = NULL;
mutex_unlock(&wq_pool_mutex); device_unregister(&wq_dev->dev);
} }
#endif /* CONFIG_FREEZER */ #else /* CONFIG_SYSFS */
static void workqueue_sysfs_unregister(struct workqueue_struct *wq) { }
#endif /* CONFIG_SYSFS */
static void __init wq_numa_init(void) static void __init wq_numa_init(void)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment