Commit 05f0fe6b authored by Tejun Heo's avatar Tejun Heo

RCU, workqueue: Implement rcu_work

There are cases where RCU callback needs to be bounced to a sleepable
context.  This is currently done by the RCU callback queueing a work
item, which can be cumbersome to write and confusing to read.

This patch introduces rcu_work, a workqueue work variant which gets
executed after a RCU grace period, and converts the open coded
bouncing in fs/aio and kernel/cgroup.

v3: Dropped queue_rcu_work_on().  Documented rcu grace period behavior
    after queue_rcu_work().

v2: Use rcu_barrier() instead of synchronize_rcu() to wait for
    completion of previously queued rcu callback as per Paul.
Signed-off-by: default avatarTejun Heo <tj@kernel.org>
Acked-by: default avatar"Paul E. McKenney" <paulmck@linux.vnet.ibm.com>
Cc: Linus Torvalds <torvalds@linux-foundation.org>
parent c698ca52
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include <linux/threads.h> #include <linux/threads.h>
#include <linux/atomic.h> #include <linux/atomic.h>
#include <linux/cpumask.h> #include <linux/cpumask.h>
#include <linux/rcupdate.h>
struct workqueue_struct; struct workqueue_struct;
...@@ -120,6 +121,14 @@ struct delayed_work { ...@@ -120,6 +121,14 @@ struct delayed_work {
int cpu; int cpu;
}; };
struct rcu_work {
struct work_struct work;
struct rcu_head rcu;
/* target workqueue ->rcu uses to queue ->work */
struct workqueue_struct *wq;
};
/** /**
* struct workqueue_attrs - A struct for workqueue attributes. * struct workqueue_attrs - A struct for workqueue attributes.
* *
...@@ -151,6 +160,11 @@ static inline struct delayed_work *to_delayed_work(struct work_struct *work) ...@@ -151,6 +160,11 @@ static inline struct delayed_work *to_delayed_work(struct work_struct *work)
return container_of(work, struct delayed_work, work); return container_of(work, struct delayed_work, work);
} }
static inline struct rcu_work *to_rcu_work(struct work_struct *work)
{
return container_of(work, struct rcu_work, work);
}
struct execute_work { struct execute_work {
struct work_struct work; struct work_struct work;
}; };
...@@ -266,6 +280,12 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; } ...@@ -266,6 +280,12 @@ static inline unsigned int work_static(struct work_struct *work) { return 0; }
#define INIT_DEFERRABLE_WORK_ONSTACK(_work, _func) \ #define INIT_DEFERRABLE_WORK_ONSTACK(_work, _func) \
__INIT_DELAYED_WORK_ONSTACK(_work, _func, TIMER_DEFERRABLE) __INIT_DELAYED_WORK_ONSTACK(_work, _func, TIMER_DEFERRABLE)
#define INIT_RCU_WORK(_work, _func) \
INIT_WORK(&(_work)->work, (_func))
#define INIT_RCU_WORK_ONSTACK(_work, _func) \
INIT_WORK_ONSTACK(&(_work)->work, (_func))
/** /**
* work_pending - Find out whether a work item is currently pending * work_pending - Find out whether a work item is currently pending
* @work: The work item in question * @work: The work item in question
...@@ -447,6 +467,7 @@ extern bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq, ...@@ -447,6 +467,7 @@ extern bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct delayed_work *work, unsigned long delay); struct delayed_work *work, unsigned long delay);
extern bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq, extern bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay); struct delayed_work *dwork, unsigned long delay);
extern bool queue_rcu_work(struct workqueue_struct *wq, struct rcu_work *rwork);
extern void flush_workqueue(struct workqueue_struct *wq); extern void flush_workqueue(struct workqueue_struct *wq);
extern void drain_workqueue(struct workqueue_struct *wq); extern void drain_workqueue(struct workqueue_struct *wq);
...@@ -463,6 +484,8 @@ extern bool flush_delayed_work(struct delayed_work *dwork); ...@@ -463,6 +484,8 @@ extern bool flush_delayed_work(struct delayed_work *dwork);
extern bool cancel_delayed_work(struct delayed_work *dwork); extern bool cancel_delayed_work(struct delayed_work *dwork);
extern bool cancel_delayed_work_sync(struct delayed_work *dwork); extern bool cancel_delayed_work_sync(struct delayed_work *dwork);
extern bool flush_rcu_work(struct rcu_work *rwork);
extern void workqueue_set_max_active(struct workqueue_struct *wq, extern void workqueue_set_max_active(struct workqueue_struct *wq,
int max_active); int max_active);
extern struct work_struct *current_work(void); extern struct work_struct *current_work(void);
......
...@@ -1604,6 +1604,40 @@ bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq, ...@@ -1604,6 +1604,40 @@ bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
} }
EXPORT_SYMBOL_GPL(mod_delayed_work_on); EXPORT_SYMBOL_GPL(mod_delayed_work_on);
static void rcu_work_rcufn(struct rcu_head *rcu)
{
struct rcu_work *rwork = container_of(rcu, struct rcu_work, rcu);
/* read the comment in __queue_work() */
local_irq_disable();
__queue_work(WORK_CPU_UNBOUND, rwork->wq, &rwork->work);
local_irq_enable();
}
/**
* queue_rcu_work - queue work after a RCU grace period
* @wq: workqueue to use
* @rwork: work to queue
*
* Return: %false if @rwork was already pending, %true otherwise. Note
* that a full RCU grace period is guaranteed only after a %true return.
* While @rwork is guarnateed to be executed after a %false return, the
* execution may happen before a full RCU grace period has passed.
*/
bool queue_rcu_work(struct workqueue_struct *wq, struct rcu_work *rwork)
{
struct work_struct *work = &rwork->work;
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
rwork->wq = wq;
call_rcu(&rwork->rcu, rcu_work_rcufn);
return true;
}
return false;
}
EXPORT_SYMBOL(queue_rcu_work);
/** /**
* worker_enter_idle - enter idle state * worker_enter_idle - enter idle state
* @worker: worker which is entering idle state * @worker: worker which is entering idle state
...@@ -3001,6 +3035,26 @@ bool flush_delayed_work(struct delayed_work *dwork) ...@@ -3001,6 +3035,26 @@ bool flush_delayed_work(struct delayed_work *dwork)
} }
EXPORT_SYMBOL(flush_delayed_work); EXPORT_SYMBOL(flush_delayed_work);
/**
* flush_rcu_work - wait for a rwork to finish executing the last queueing
* @rwork: the rcu work to flush
*
* Return:
* %true if flush_rcu_work() waited for the work to finish execution,
* %false if it was already idle.
*/
bool flush_rcu_work(struct rcu_work *rwork)
{
if (test_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&rwork->work))) {
rcu_barrier();
flush_work(&rwork->work);
return true;
} else {
return flush_work(&rwork->work);
}
}
EXPORT_SYMBOL(flush_rcu_work);
static bool __cancel_work(struct work_struct *work, bool is_dwork) static bool __cancel_work(struct work_struct *work, bool is_dwork)
{ {
unsigned long flags; unsigned long flags;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment