Commit c34056a3 authored by Tejun Heo's avatar Tejun Heo

workqueue: introduce worker

Separate out worker thread related information to struct worker from
struct cpu_workqueue_struct and implement helper functions to deal
with the new struct worker.  The only change which is visible outside
is that now workqueue worker are all named "kworker/CPUID:WORKERID"
where WORKERID is allocated from per-cpu ida.

This is in preparation of concurrency managed workqueue where shared
multiple workers would be available per cpu.
Signed-off-by: default avatarTejun Heo <tj@kernel.org>
parent 73f53c4a
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
#include <linux/kallsyms.h> #include <linux/kallsyms.h>
#include <linux/debug_locks.h> #include <linux/debug_locks.h>
#include <linux/lockdep.h> #include <linux/lockdep.h>
#include <linux/idr.h>
/* /*
* Structure fields follow one of the following exclusion rules. * Structure fields follow one of the following exclusion rules.
...@@ -46,6 +47,15 @@ ...@@ -46,6 +47,15 @@
* W: workqueue_lock protected. * W: workqueue_lock protected.
*/ */
struct cpu_workqueue_struct;
struct worker {
struct work_struct *current_work; /* L: work being processed */
struct task_struct *task; /* I: worker task */
struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
int id; /* I: worker id */
};
/* /*
* The per-CPU workqueue (if single thread, we always use the first * The per-CPU workqueue (if single thread, we always use the first
* possible cpu). The lower WORK_STRUCT_FLAG_BITS of * possible cpu). The lower WORK_STRUCT_FLAG_BITS of
...@@ -58,15 +68,14 @@ struct cpu_workqueue_struct { ...@@ -58,15 +68,14 @@ struct cpu_workqueue_struct {
struct list_head worklist; struct list_head worklist;
wait_queue_head_t more_work; wait_queue_head_t more_work;
struct work_struct *current_work;
unsigned int cpu; unsigned int cpu;
struct worker *worker;
struct workqueue_struct *wq; /* I: the owning workqueue */ struct workqueue_struct *wq; /* I: the owning workqueue */
int work_color; /* L: current color */ int work_color; /* L: current color */
int flush_color; /* L: flushing color */ int flush_color; /* L: flushing color */
int nr_in_flight[WORK_NR_COLORS]; int nr_in_flight[WORK_NR_COLORS];
/* L: nr of in_flight works */ /* L: nr of in_flight works */
struct task_struct *thread;
}; };
/* /*
...@@ -214,6 +223,9 @@ static inline void debug_work_deactivate(struct work_struct *work) { } ...@@ -214,6 +223,9 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
/* Serializes the accesses to the list of workqueues. */ /* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock); static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues); static LIST_HEAD(workqueues);
static DEFINE_PER_CPU(struct ida, worker_ida);
static int worker_thread(void *__worker);
static int singlethread_cpu __read_mostly; static int singlethread_cpu __read_mostly;
...@@ -428,6 +440,105 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, ...@@ -428,6 +440,105 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
} }
EXPORT_SYMBOL_GPL(queue_delayed_work_on); EXPORT_SYMBOL_GPL(queue_delayed_work_on);
static struct worker *alloc_worker(void)
{
struct worker *worker;
worker = kzalloc(sizeof(*worker), GFP_KERNEL);
return worker;
}
/**
* create_worker - create a new workqueue worker
* @cwq: cwq the new worker will belong to
* @bind: whether to set affinity to @cpu or not
*
* Create a new worker which is bound to @cwq. The returned worker
* can be started by calling start_worker() or destroyed using
* destroy_worker().
*
* CONTEXT:
* Might sleep. Does GFP_KERNEL allocations.
*
* RETURNS:
* Pointer to the newly created worker.
*/
static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
{
int id = -1;
struct worker *worker = NULL;
spin_lock(&workqueue_lock);
while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
spin_unlock(&workqueue_lock);
if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
goto fail;
spin_lock(&workqueue_lock);
}
spin_unlock(&workqueue_lock);
worker = alloc_worker();
if (!worker)
goto fail;
worker->cwq = cwq;
worker->id = id;
worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
cwq->cpu, id);
if (IS_ERR(worker->task))
goto fail;
if (bind)
kthread_bind(worker->task, cwq->cpu);
return worker;
fail:
if (id >= 0) {
spin_lock(&workqueue_lock);
ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
spin_unlock(&workqueue_lock);
}
kfree(worker);
return NULL;
}
/**
* start_worker - start a newly created worker
* @worker: worker to start
*
* Start @worker.
*
* CONTEXT:
* spin_lock_irq(cwq->lock).
*/
static void start_worker(struct worker *worker)
{
wake_up_process(worker->task);
}
/**
* destroy_worker - destroy a workqueue worker
* @worker: worker to be destroyed
*
* Destroy @worker.
*/
static void destroy_worker(struct worker *worker)
{
int cpu = worker->cwq->cpu;
int id = worker->id;
/* sanity check frenzy */
BUG_ON(worker->current_work);
kthread_stop(worker->task);
kfree(worker);
spin_lock(&workqueue_lock);
ida_remove(&per_cpu(worker_ida, cpu), id);
spin_unlock(&workqueue_lock);
}
/** /**
* cwq_dec_nr_in_flight - decrement cwq's nr_in_flight * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
* @cwq: cwq of interest * @cwq: cwq of interest
...@@ -468,7 +579,7 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color) ...@@ -468,7 +579,7 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
/** /**
* process_one_work - process single work * process_one_work - process single work
* @cwq: cwq to process work for * @worker: self
* @work: work to process * @work: work to process
* *
* Process @work. This function contains all the logics necessary to * Process @work. This function contains all the logics necessary to
...@@ -480,9 +591,9 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color) ...@@ -480,9 +591,9 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
* CONTEXT: * CONTEXT:
* spin_lock_irq(cwq->lock) which is released and regrabbed. * spin_lock_irq(cwq->lock) which is released and regrabbed.
*/ */
static void process_one_work(struct cpu_workqueue_struct *cwq, static void process_one_work(struct worker *worker, struct work_struct *work)
struct work_struct *work)
{ {
struct cpu_workqueue_struct *cwq = worker->cwq;
work_func_t f = work->func; work_func_t f = work->func;
int work_color; int work_color;
#ifdef CONFIG_LOCKDEP #ifdef CONFIG_LOCKDEP
...@@ -497,7 +608,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq, ...@@ -497,7 +608,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
#endif #endif
/* claim and process */ /* claim and process */
debug_work_deactivate(work); debug_work_deactivate(work);
cwq->current_work = work; worker->current_work = work;
work_color = get_work_color(work); work_color = get_work_color(work);
list_del_init(&work->entry); list_del_init(&work->entry);
...@@ -524,30 +635,33 @@ static void process_one_work(struct cpu_workqueue_struct *cwq, ...@@ -524,30 +635,33 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
spin_lock_irq(&cwq->lock); spin_lock_irq(&cwq->lock);
/* we're done with it, release */ /* we're done with it, release */
cwq->current_work = NULL; worker->current_work = NULL;
cwq_dec_nr_in_flight(cwq, work_color); cwq_dec_nr_in_flight(cwq, work_color);
} }
static void run_workqueue(struct cpu_workqueue_struct *cwq) static void run_workqueue(struct worker *worker)
{ {
struct cpu_workqueue_struct *cwq = worker->cwq;
spin_lock_irq(&cwq->lock); spin_lock_irq(&cwq->lock);
while (!list_empty(&cwq->worklist)) { while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry); struct work_struct, entry);
process_one_work(cwq, work); process_one_work(worker, work);
} }
spin_unlock_irq(&cwq->lock); spin_unlock_irq(&cwq->lock);
} }
/** /**
* worker_thread - the worker thread function * worker_thread - the worker thread function
* @__cwq: cwq to serve * @__worker: self
* *
* The cwq worker thread function. * The cwq worker thread function.
*/ */
static int worker_thread(void *__cwq) static int worker_thread(void *__worker)
{ {
struct cpu_workqueue_struct *cwq = __cwq; struct worker *worker = __worker;
struct cpu_workqueue_struct *cwq = worker->cwq;
DEFINE_WAIT(wait); DEFINE_WAIT(wait);
if (cwq->wq->flags & WQ_FREEZEABLE) if (cwq->wq->flags & WQ_FREEZEABLE)
...@@ -566,11 +680,11 @@ static int worker_thread(void *__cwq) ...@@ -566,11 +680,11 @@ static int worker_thread(void *__cwq)
if (kthread_should_stop()) if (kthread_should_stop())
break; break;
if (unlikely(!cpumask_equal(&cwq->thread->cpus_allowed, if (unlikely(!cpumask_equal(&worker->task->cpus_allowed,
get_cpu_mask(cwq->cpu)))) get_cpu_mask(cwq->cpu))))
set_cpus_allowed_ptr(cwq->thread, set_cpus_allowed_ptr(worker->task,
get_cpu_mask(cwq->cpu)); get_cpu_mask(cwq->cpu));
run_workqueue(cwq); run_workqueue(worker);
} }
return 0; return 0;
...@@ -873,7 +987,7 @@ int flush_work(struct work_struct *work) ...@@ -873,7 +987,7 @@ int flush_work(struct work_struct *work)
goto already_gone; goto already_gone;
prev = &work->entry; prev = &work->entry;
} else { } else {
if (cwq->current_work != work) if (!cwq->worker || cwq->worker->current_work != work)
goto already_gone; goto already_gone;
prev = &cwq->worklist; prev = &cwq->worklist;
} }
...@@ -937,7 +1051,7 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq, ...@@ -937,7 +1051,7 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
int running = 0; int running = 0;
spin_lock_irq(&cwq->lock); spin_lock_irq(&cwq->lock);
if (unlikely(cwq->current_work == work)) { if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
insert_wq_barrier(cwq, &barr, cwq->worklist.next); insert_wq_barrier(cwq, &barr, cwq->worklist.next);
running = 1; running = 1;
} }
...@@ -1225,7 +1339,7 @@ int current_is_keventd(void) ...@@ -1225,7 +1339,7 @@ int current_is_keventd(void)
BUG_ON(!keventd_wq); BUG_ON(!keventd_wq);
cwq = get_cwq(cpu, keventd_wq); cwq = get_cwq(cpu, keventd_wq);
if (current == cwq->thread) if (current == cwq->worker->task)
ret = 1; ret = 1;
return ret; return ret;
...@@ -1279,38 +1393,6 @@ static void free_cwqs(struct cpu_workqueue_struct *cwqs) ...@@ -1279,38 +1393,6 @@ static void free_cwqs(struct cpu_workqueue_struct *cwqs)
#endif #endif
} }
static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
struct workqueue_struct *wq = cwq->wq;
struct task_struct *p;
p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
/*
* Nobody can add the work_struct to this cwq,
* if (caller is __create_workqueue)
* nobody should see this wq
* else // caller is CPU_UP_PREPARE
* cpu is not on cpu_online_map
* so we can abort safely.
*/
if (IS_ERR(p))
return PTR_ERR(p);
cwq->thread = p;
return 0;
}
static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
struct task_struct *p = cwq->thread;
if (p != NULL) {
if (cpu >= 0)
kthread_bind(p, cpu);
wake_up_process(p);
}
}
struct workqueue_struct *__create_workqueue_key(const char *name, struct workqueue_struct *__create_workqueue_key(const char *name,
unsigned int flags, unsigned int flags,
struct lock_class_key *key, struct lock_class_key *key,
...@@ -1318,7 +1400,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name, ...@@ -1318,7 +1400,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
{ {
bool singlethread = flags & WQ_SINGLE_THREAD; bool singlethread = flags & WQ_SINGLE_THREAD;
struct workqueue_struct *wq; struct workqueue_struct *wq;
int err = 0, cpu; bool failed = false;
unsigned int cpu;
wq = kzalloc(sizeof(*wq), GFP_KERNEL); wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq) if (!wq)
...@@ -1348,20 +1431,21 @@ struct workqueue_struct *__create_workqueue_key(const char *name, ...@@ -1348,20 +1431,21 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK); BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
cwq->wq = wq;
cwq->cpu = cpu; cwq->cpu = cpu;
cwq->wq = wq;
cwq->flush_color = -1; cwq->flush_color = -1;
spin_lock_init(&cwq->lock); spin_lock_init(&cwq->lock);
INIT_LIST_HEAD(&cwq->worklist); INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work); init_waitqueue_head(&cwq->more_work);
if (err) if (failed)
continue; continue;
err = create_workqueue_thread(cwq, cpu); cwq->worker = create_worker(cwq,
if (cpu_online(cpu) && !singlethread) cpu_online(cpu) && !singlethread);
start_workqueue_thread(cwq, cpu); if (cwq->worker)
start_worker(cwq->worker);
else else
start_workqueue_thread(cwq, -1); failed = true;
} }
spin_lock(&workqueue_lock); spin_lock(&workqueue_lock);
...@@ -1370,7 +1454,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name, ...@@ -1370,7 +1454,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
cpu_maps_update_done(); cpu_maps_update_done();
if (err) { if (failed) {
destroy_workqueue(wq); destroy_workqueue(wq);
wq = NULL; wq = NULL;
} }
...@@ -1406,9 +1490,9 @@ void destroy_workqueue(struct workqueue_struct *wq) ...@@ -1406,9 +1490,9 @@ void destroy_workqueue(struct workqueue_struct *wq)
struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq); struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
int i; int i;
if (cwq->thread) { if (cwq->worker) {
kthread_stop(cwq->thread); destroy_worker(cwq->worker);
cwq->thread = NULL; cwq->worker = NULL;
} }
for (i = 0; i < WORK_NR_COLORS; i++) for (i = 0; i < WORK_NR_COLORS; i++)
...@@ -1495,6 +1579,11 @@ EXPORT_SYMBOL_GPL(work_on_cpu); ...@@ -1495,6 +1579,11 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
void __init init_workqueues(void) void __init init_workqueues(void)
{ {
unsigned int cpu;
for_each_possible_cpu(cpu)
ida_init(&per_cpu(worker_ida, cpu));
singlethread_cpu = cpumask_first(cpu_possible_mask); singlethread_cpu = cpumask_first(cpu_possible_mask);
hotcpu_notifier(workqueue_cpu_callback, 0); hotcpu_notifier(workqueue_cpu_callback, 0);
keventd_wq = create_workqueue("events"); keventd_wq = create_workqueue("events");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment