Commit bf9e688d authored by Andrew Morton's avatar Andrew Morton Committed by Linus Torvalds

[PATCH] create singlethread_workqueue()

From: Rusty Russell <rusty@rustcorp.com.au>

Workqueues are a great primitive for running things from user context from
a completely clean environment.  Unfortunately, they currently insist on
creating one thread per CPU, which is overkill for many situations, so the
more generic keventd workqueue is used for these.  Recently deadlocks using
keventd were demonstrated, showing that it is not suitable for all uses.

1) Clean up CPU iterators.  Always a nice touch.

2) Add __create_workqueue() and create_singlethread_workqueue(),
   keeping source compatibility.

3) Put workqueues in workqueue list even if !CONFIG_HOTPLUG_CPU (means
   we need a lock to protect that list).  Now we can tell if a wq is
   single-threaded using list_empty(&wq->list).

4) For single-threaded workqueues, override CPU in queue_work,
   delayed_work_timer_fn and flush_workqueue to be 0.  flush_workqueue
   now does redundant passes for single-threaded workqueues, but the
   code remains simple.

5) Make create_workqueue_thread return the thread, so we can easily
   kthread_bind for multi-threaded workqueues.


akpm fixes:

- Fix up is_single_threaded() handling

- single-threaded wq thread does not have "/0" appended.
parent 18d0f151
...@@ -49,7 +49,11 @@ struct work_struct { ...@@ -49,7 +49,11 @@ struct work_struct {
init_timer(&(_work)->timer); \ init_timer(&(_work)->timer); \
} while (0) } while (0)
extern struct workqueue_struct *create_workqueue(const char *name); extern struct workqueue_struct *__create_workqueue(const char *name,
int singlethread);
#define create_workqueue(name) __create_workqueue((name), 0)
#define create_singlethread_workqueue(name) __create_workqueue((name), 1)
extern void destroy_workqueue(struct workqueue_struct *wq); extern void destroy_workqueue(struct workqueue_struct *wq);
extern int FASTCALL(queue_work(struct workqueue_struct *wq, struct work_struct *work)); extern int FASTCALL(queue_work(struct workqueue_struct *wq, struct work_struct *work));
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
#include <linux/kthread.h> #include <linux/kthread.h>
/* /*
* The per-CPU workqueue. * The per-CPU workqueue (if single thread, we always use cpu 0's).
* *
* The sequence counters are for flush_scheduled_work(). It wants to wait * The sequence counters are for flush_scheduled_work(). It wants to wait
* until until all currently-scheduled works are completed, but it doesn't * until until all currently-scheduled works are completed, but it doesn't
...@@ -59,20 +59,19 @@ struct cpu_workqueue_struct { ...@@ -59,20 +59,19 @@ struct cpu_workqueue_struct {
struct workqueue_struct { struct workqueue_struct {
struct cpu_workqueue_struct cpu_wq[NR_CPUS]; struct cpu_workqueue_struct cpu_wq[NR_CPUS];
const char *name; const char *name;
struct list_head list; struct list_head list; /* Empty if single thread */
}; };
#ifdef CONFIG_HOTPLUG_CPU /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
/* All the workqueues on the system, for hotplug cpu to add/remove threads to each one as cpus come/go. */
threads to each one as cpus come/go. Protected by cpucontrol static spinlock_t workqueue_lock = SPIN_LOCK_UNLOCKED;
sem. */
static LIST_HEAD(workqueues); static LIST_HEAD(workqueues);
#define add_workqueue(wq) list_add(&(wq)->list, &workqueues)
#define del_workqueue(wq) list_del(&(wq)->list) /* If it's single threaded, it isn't in the list of workqueues. */
#else static inline int is_single_threaded(struct workqueue_struct *wq)
#define add_workqueue(wq) {
#define del_workqueue(wq) return list_empty(&wq->list);
#endif /* CONFIG_HOTPLUG_CPU */ }
/* Preempt must be disabled. */ /* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq, static void __queue_work(struct cpu_workqueue_struct *cwq,
...@@ -100,6 +99,8 @@ int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work) ...@@ -100,6 +99,8 @@ int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
int ret = 0, cpu = get_cpu(); int ret = 0, cpu = get_cpu();
if (!test_and_set_bit(0, &work->pending)) { if (!test_and_set_bit(0, &work->pending)) {
if (unlikely(is_single_threaded(wq)))
cpu = 0;
BUG_ON(!list_empty(&work->entry)); BUG_ON(!list_empty(&work->entry));
__queue_work(wq->cpu_wq + cpu, work); __queue_work(wq->cpu_wq + cpu, work);
ret = 1; ret = 1;
...@@ -112,8 +113,12 @@ static void delayed_work_timer_fn(unsigned long __data) ...@@ -112,8 +113,12 @@ static void delayed_work_timer_fn(unsigned long __data)
{ {
struct work_struct *work = (struct work_struct *)__data; struct work_struct *work = (struct work_struct *)__data;
struct workqueue_struct *wq = work->wq_data; struct workqueue_struct *wq = work->wq_data;
int cpu = smp_processor_id();
__queue_work(wq->cpu_wq + smp_processor_id(), work); if (unlikely(is_single_threaded(wq)))
cpu = 0;
__queue_work(wq->cpu_wq + cpu, work);
} }
int fastcall queue_delayed_work(struct workqueue_struct *wq, int fastcall queue_delayed_work(struct workqueue_struct *wq,
...@@ -234,12 +239,13 @@ void fastcall flush_workqueue(struct workqueue_struct *wq) ...@@ -234,12 +239,13 @@ void fastcall flush_workqueue(struct workqueue_struct *wq)
might_sleep(); might_sleep();
lock_cpu_hotplug(); lock_cpu_hotplug();
for (cpu = 0; cpu < NR_CPUS; cpu++) { for_each_online_cpu(cpu) {
DEFINE_WAIT(wait); DEFINE_WAIT(wait);
long sequence_needed; long sequence_needed;
if (!cpu_online(cpu)) if (is_single_threaded(wq))
continue; cwq = wq->cpu_wq + 0; /* Always use cpu 0's area. */
else
cwq = wq->cpu_wq + cpu; cwq = wq->cpu_wq + cpu;
if (cwq->thread == current) { if (cwq->thread == current) {
...@@ -266,7 +272,8 @@ void fastcall flush_workqueue(struct workqueue_struct *wq) ...@@ -266,7 +272,8 @@ void fastcall flush_workqueue(struct workqueue_struct *wq)
unlock_cpu_hotplug(); unlock_cpu_hotplug();
} }
static int create_workqueue_thread(struct workqueue_struct *wq, int cpu) static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
int cpu)
{ {
struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu; struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
struct task_struct *p; struct task_struct *p;
...@@ -280,18 +287,22 @@ static int create_workqueue_thread(struct workqueue_struct *wq, int cpu) ...@@ -280,18 +287,22 @@ static int create_workqueue_thread(struct workqueue_struct *wq, int cpu)
init_waitqueue_head(&cwq->more_work); init_waitqueue_head(&cwq->more_work);
init_waitqueue_head(&cwq->work_done); init_waitqueue_head(&cwq->work_done);
if (is_single_threaded(wq))
p = kthread_create(worker_thread, cwq, "%s", wq->name);
else
p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu); p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
if (IS_ERR(p)) if (IS_ERR(p))
return PTR_ERR(p); return NULL;
cwq->thread = p; cwq->thread = p;
kthread_bind(p, cpu); return p;
return 0;
} }
struct workqueue_struct *create_workqueue(const char *name) struct workqueue_struct *__create_workqueue(const char *name,
int singlethread)
{ {
int cpu, destroy = 0; int cpu, destroy = 0;
struct workqueue_struct *wq; struct workqueue_struct *wq;
struct task_struct *p;
BUG_ON(strlen(name) > 10); BUG_ON(strlen(name) > 10);
...@@ -303,15 +314,26 @@ struct workqueue_struct *create_workqueue(const char *name) ...@@ -303,15 +314,26 @@ struct workqueue_struct *create_workqueue(const char *name)
wq->name = name; wq->name = name;
/* We don't need the distraction of CPUs appearing and vanishing. */ /* We don't need the distraction of CPUs appearing and vanishing. */
lock_cpu_hotplug(); lock_cpu_hotplug();
for (cpu = 0; cpu < NR_CPUS; cpu++) { if (singlethread) {
if (!cpu_online(cpu)) INIT_LIST_HEAD(&wq->list);
continue; p = create_workqueue_thread(wq, 0);
if (create_workqueue_thread(wq, cpu) < 0) if (!p)
destroy = 1; destroy = 1;
else else
wake_up_process(wq->cpu_wq[cpu].thread); wake_up_process(p);
} else {
spin_lock(&workqueue_lock);
list_add(&wq->list, &workqueues);
spin_unlock_irq(&workqueue_lock);
for_each_online_cpu(cpu) {
p = create_workqueue_thread(wq, cpu);
if (p) {
kthread_bind(p, cpu);
wake_up_process(p);
} else
destroy = 1;
}
} }
add_workqueue(wq);
/* /*
* Was there any error during startup? If yes then clean up: * Was there any error during startup? If yes then clean up:
...@@ -347,11 +369,15 @@ void destroy_workqueue(struct workqueue_struct *wq) ...@@ -347,11 +369,15 @@ void destroy_workqueue(struct workqueue_struct *wq)
/* We don't need the distraction of CPUs appearing and vanishing. */ /* We don't need the distraction of CPUs appearing and vanishing. */
lock_cpu_hotplug(); lock_cpu_hotplug();
for (cpu = 0; cpu < NR_CPUS; cpu++) { if (is_single_threaded(wq))
if (cpu_online(cpu)) cleanup_workqueue_thread(wq, 0);
else {
for_each_online_cpu(cpu)
cleanup_workqueue_thread(wq, cpu); cleanup_workqueue_thread(wq, cpu);
spin_lock(&workqueue_lock);
list_del(&wq->list);
spin_unlock_irq(&workqueue_lock);
} }
del_workqueue(wq);
unlock_cpu_hotplug(); unlock_cpu_hotplug();
kfree(wq); kfree(wq);
} }
...@@ -467,7 +493,7 @@ void init_workqueues(void) ...@@ -467,7 +493,7 @@ void init_workqueues(void)
BUG_ON(!keventd_wq); BUG_ON(!keventd_wq);
} }
EXPORT_SYMBOL_GPL(create_workqueue); EXPORT_SYMBOL_GPL(__create_workqueue);
EXPORT_SYMBOL_GPL(queue_work); EXPORT_SYMBOL_GPL(queue_work);
EXPORT_SYMBOL_GPL(queue_delayed_work); EXPORT_SYMBOL_GPL(queue_delayed_work);
EXPORT_SYMBOL_GPL(flush_workqueue); EXPORT_SYMBOL_GPL(flush_workqueue);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment