Commit f292e873 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'io_uring-5.12-2021-03-05' of git://git.kernel.dk/linux-block

Pull io_uring fixes from Jens Axboe:
 "A bit of a mix between fallout from the worker change, cleanups and
  reductions now possible from that change, and fixes in general. In
  detail:

   - Fully serialize manager and worker creation, fixing races due to
     that.

   - Clean up some naming that had gone stale.

   - SQPOLL fixes.

   - Fix race condition around task_work rework that went into this
     merge window.

   - Implement unshare. Used for when the original task does unshare(2)
     or setuid/seteuid and friends, drops the original workers and forks
     new ones.

   - Drop the only remaining piece of state shuffling we had left, which
     was cred. Move it into issue instead, and we can drop all of that
     code too.

   - Kill f_op->flush() usage. That was such a nasty hack that we had
     out of necessity, we no longer need it.

   - Following from ->flush() removal, we can also drop various bits of
     ctx state related to SQPOLL and cancelations.

   - Fix an issue with IOPOLL retry, which originally was fallout from a
     filemap change (removing iov_iter_revert()), but uncovered an issue
     with iovec re-import too late.

   - Fix an issue with system suspend.

   - Use xchg() for fallback work, instead of cmpxchg().

   - Properly destroy io-wq on exec.

   - Add create_io_thread() core helper, and use that in io-wq and
     io_uring. This allows us to remove various silly completion events
     related to thread setup.

   - A few error handling fixes.

  This should be the grunt of fixes necessary for the new workers, next
  week should be quieter. We've got a pending series from Pavel on
  cancelations, and how tasks and rings are indexed. Outside of that,
  should just be minor fixes. Even with these fixes, we're still killing
  a net ~80 lines"

* tag 'io_uring-5.12-2021-03-05' of git://git.kernel.dk/linux-block: (41 commits)
  io_uring: don't restrict issue_flags for io_openat
  io_uring: make SQPOLL thread parking saner
  io-wq: kill hashed waitqueue before manager exits
  io_uring: clear IOCB_WAITQ for non -EIOCBQUEUED return
  io_uring: don't keep looping for more events if we can't flush overflow
  io_uring: move to using create_io_thread()
  kernel: provide create_io_thread() helper
  io_uring: reliably cancel linked timeouts
  io_uring: cancel-match based on flags
  io-wq: ensure all pending work is canceled on exit
  io_uring: ensure that threads freeze on suspend
  io_uring: remove extra in_idle wake up
  io_uring: inline __io_queue_async_work()
  io_uring: inline io_req_clean_work()
  io_uring: choose right tctx->io_wq for try cancel
  io_uring: fix -EAGAIN retry with IOPOLL
  io-wq: fix error path leak of buffered write hash map
  io_uring: remove sqo_task
  io_uring: kill sqo_dead and sqo submission halting
  io_uring: ignore double poll add on the same waitqueue head
  ...
parents 6d47254c e45cff58
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include <linux/rculist_nulls.h> #include <linux/rculist_nulls.h>
#include <linux/cpu.h> #include <linux/cpu.h>
#include <linux/tracehook.h> #include <linux/tracehook.h>
#include <linux/freezer.h>
#include "../kernel/sched/sched.h" #include "../kernel/sched/sched.h"
#include "io-wq.h" #include "io-wq.h"
...@@ -52,9 +53,6 @@ struct io_worker { ...@@ -52,9 +53,6 @@ struct io_worker {
struct io_wq_work *cur_work; struct io_wq_work *cur_work;
spinlock_t lock; spinlock_t lock;
const struct cred *cur_creds;
const struct cred *saved_creds;
struct completion ref_done; struct completion ref_done;
struct rcu_head rcu; struct rcu_head rcu;
...@@ -117,7 +115,10 @@ struct io_wq { ...@@ -117,7 +115,10 @@ struct io_wq {
struct io_wq_hash *hash; struct io_wq_hash *hash;
refcount_t refs; refcount_t refs;
struct completion done; struct completion exited;
atomic_t worker_refs;
struct completion worker_done;
struct hlist_node cpuhp_node; struct hlist_node cpuhp_node;
...@@ -126,6 +127,17 @@ struct io_wq { ...@@ -126,6 +127,17 @@ struct io_wq {
static enum cpuhp_state io_wq_online; static enum cpuhp_state io_wq_online;
struct io_cb_cancel_data {
work_cancel_fn *fn;
void *data;
int nr_running;
int nr_pending;
bool cancel_all;
};
static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
struct io_cb_cancel_data *match);
static bool io_worker_get(struct io_worker *worker) static bool io_worker_get(struct io_worker *worker)
{ {
return refcount_inc_not_zero(&worker->ref); return refcount_inc_not_zero(&worker->ref);
...@@ -175,11 +187,6 @@ static void io_worker_exit(struct io_worker *worker) ...@@ -175,11 +187,6 @@ static void io_worker_exit(struct io_worker *worker)
worker->flags = 0; worker->flags = 0;
preempt_enable(); preempt_enable();
if (worker->saved_creds) {
revert_creds(worker->saved_creds);
worker->cur_creds = worker->saved_creds = NULL;
}
raw_spin_lock_irq(&wqe->lock); raw_spin_lock_irq(&wqe->lock);
if (flags & IO_WORKER_F_FREE) if (flags & IO_WORKER_F_FREE)
hlist_nulls_del_rcu(&worker->nulls_node); hlist_nulls_del_rcu(&worker->nulls_node);
...@@ -188,7 +195,9 @@ static void io_worker_exit(struct io_worker *worker) ...@@ -188,7 +195,9 @@ static void io_worker_exit(struct io_worker *worker)
raw_spin_unlock_irq(&wqe->lock); raw_spin_unlock_irq(&wqe->lock);
kfree_rcu(worker, rcu); kfree_rcu(worker, rcu);
io_wq_put(wqe->wq); if (atomic_dec_and_test(&wqe->wq->worker_refs))
complete(&wqe->wq->worker_done);
do_exit(0);
} }
static inline bool io_wqe_run_queue(struct io_wqe *wqe) static inline bool io_wqe_run_queue(struct io_wqe *wqe)
...@@ -263,12 +272,6 @@ static void io_wqe_dec_running(struct io_worker *worker) ...@@ -263,12 +272,6 @@ static void io_wqe_dec_running(struct io_worker *worker)
io_wqe_wake_worker(wqe, acct); io_wqe_wake_worker(wqe, acct);
} }
static void io_worker_start(struct io_worker *worker)
{
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
io_wqe_inc_running(worker);
}
/* /*
* Worker will start processing some work. Move it to the busy list, if * Worker will start processing some work. Move it to the busy list, if
* it's currently on the freelist * it's currently on the freelist
...@@ -319,10 +322,6 @@ static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker) ...@@ -319,10 +322,6 @@ static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
worker->flags |= IO_WORKER_F_FREE; worker->flags |= IO_WORKER_F_FREE;
hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
} }
if (worker->saved_creds) {
revert_creds(worker->saved_creds);
worker->cur_creds = worker->saved_creds = NULL;
}
} }
static inline unsigned int io_get_work_hash(struct io_wq_work *work) static inline unsigned int io_get_work_hash(struct io_wq_work *work)
...@@ -397,18 +396,6 @@ static void io_flush_signals(void) ...@@ -397,18 +396,6 @@ static void io_flush_signals(void)
} }
} }
static void io_wq_switch_creds(struct io_worker *worker,
struct io_wq_work *work)
{
const struct cred *old_creds = override_creds(work->creds);
worker->cur_creds = work->creds;
if (worker->saved_creds)
put_cred(old_creds); /* creds set by previous switch */
else
worker->saved_creds = old_creds;
}
static void io_assign_current_work(struct io_worker *worker, static void io_assign_current_work(struct io_worker *worker,
struct io_wq_work *work) struct io_wq_work *work)
{ {
...@@ -458,8 +445,6 @@ static void io_worker_handle_work(struct io_worker *worker) ...@@ -458,8 +445,6 @@ static void io_worker_handle_work(struct io_worker *worker)
unsigned int hash = io_get_work_hash(work); unsigned int hash = io_get_work_hash(work);
next_hashed = wq_next_work(work); next_hashed = wq_next_work(work);
if (work->creds && worker->cur_creds != work->creds)
io_wq_switch_creds(worker, work);
wq->do_work(work); wq->do_work(work);
io_assign_current_work(worker, NULL); io_assign_current_work(worker, NULL);
...@@ -495,8 +480,13 @@ static int io_wqe_worker(void *data) ...@@ -495,8 +480,13 @@ static int io_wqe_worker(void *data)
struct io_worker *worker = data; struct io_worker *worker = data;
struct io_wqe *wqe = worker->wqe; struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq; struct io_wq *wq = wqe->wq;
char buf[TASK_COMM_LEN];
io_worker_start(worker); worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
io_wqe_inc_running(worker);
sprintf(buf, "iou-wrk-%d", wq->task_pid);
set_task_comm(current, buf);
while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
set_current_state(TASK_INTERRUPTIBLE); set_current_state(TASK_INTERRUPTIBLE);
...@@ -571,67 +561,11 @@ void io_wq_worker_sleeping(struct task_struct *tsk) ...@@ -571,67 +561,11 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
raw_spin_unlock_irq(&worker->wqe->lock); raw_spin_unlock_irq(&worker->wqe->lock);
} }
static int task_thread(void *data, int index)
{
struct io_worker *worker = data;
struct io_wqe *wqe = worker->wqe;
struct io_wqe_acct *acct = &wqe->acct[index];
struct io_wq *wq = wqe->wq;
char buf[TASK_COMM_LEN];
sprintf(buf, "iou-wrk-%d", wq->task_pid);
set_task_comm(current, buf);
current->pf_io_worker = worker;
worker->task = current;
set_cpus_allowed_ptr(current, cpumask_of_node(wqe->node));
current->flags |= PF_NO_SETAFFINITY;
raw_spin_lock_irq(&wqe->lock);
hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
list_add_tail_rcu(&worker->all_list, &wqe->all_list);
worker->flags |= IO_WORKER_F_FREE;
if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND;
if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
worker->flags |= IO_WORKER_F_FIXED;
acct->nr_workers++;
raw_spin_unlock_irq(&wqe->lock);
io_wqe_worker(data);
do_exit(0);
}
static int task_thread_bound(void *data)
{
return task_thread(data, IO_WQ_ACCT_BOUND);
}
static int task_thread_unbound(void *data)
{
return task_thread(data, IO_WQ_ACCT_UNBOUND);
}
pid_t io_wq_fork_thread(int (*fn)(void *), void *arg)
{
unsigned long flags = CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|
CLONE_IO|SIGCHLD;
struct kernel_clone_args args = {
.flags = ((lower_32_bits(flags) | CLONE_VM |
CLONE_UNTRACED) & ~CSIGNAL),
.exit_signal = (lower_32_bits(flags) & CSIGNAL),
.stack = (unsigned long)fn,
.stack_size = (unsigned long)arg,
};
return kernel_clone(&args);
}
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{ {
struct io_wqe_acct *acct = &wqe->acct[index];
struct io_worker *worker; struct io_worker *worker;
pid_t pid; struct task_struct *tsk;
__set_current_state(TASK_RUNNING); __set_current_state(TASK_RUNNING);
...@@ -645,17 +579,32 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) ...@@ -645,17 +579,32 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
spin_lock_init(&worker->lock); spin_lock_init(&worker->lock);
init_completion(&worker->ref_done); init_completion(&worker->ref_done);
refcount_inc(&wq->refs); atomic_inc(&wq->worker_refs);
if (index == IO_WQ_ACCT_BOUND) tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
pid = io_wq_fork_thread(task_thread_bound, worker); if (IS_ERR(tsk)) {
else if (atomic_dec_and_test(&wq->worker_refs))
pid = io_wq_fork_thread(task_thread_unbound, worker); complete(&wq->worker_done);
if (pid < 0) {
io_wq_put(wq);
kfree(worker); kfree(worker);
return false; return false;
} }
tsk->pf_io_worker = worker;
worker->task = tsk;
set_cpus_allowed_ptr(tsk, cpumask_of_node(wqe->node));
tsk->flags |= PF_NOFREEZE | PF_NO_SETAFFINITY;
raw_spin_lock_irq(&wqe->lock);
hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
list_add_tail_rcu(&worker->all_list, &wqe->all_list);
worker->flags |= IO_WORKER_F_FREE;
if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND;
if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
worker->flags |= IO_WORKER_F_FIXED;
acct->nr_workers++;
raw_spin_unlock_irq(&wqe->lock);
wake_up_new_task(tsk);
return true; return true;
} }
...@@ -664,6 +613,8 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) ...@@ -664,6 +613,8 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
{ {
struct io_wqe_acct *acct = &wqe->acct[index]; struct io_wqe_acct *acct = &wqe->acct[index];
if (acct->nr_workers && test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state))
return false;
/* if we have available workers or no work, no need */ /* if we have available workers or no work, no need */
if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe)) if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
return false; return false;
...@@ -697,6 +648,7 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe, ...@@ -697,6 +648,7 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe,
static bool io_wq_worker_wake(struct io_worker *worker, void *data) static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{ {
set_notify_signal(worker->task);
wake_up_process(worker->task); wake_up_process(worker->task);
return false; return false;
} }
...@@ -725,6 +677,23 @@ static void io_wq_check_workers(struct io_wq *wq) ...@@ -725,6 +677,23 @@ static void io_wq_check_workers(struct io_wq *wq)
} }
} }
static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
return true;
}
static void io_wq_cancel_pending(struct io_wq *wq)
{
struct io_cb_cancel_data match = {
.fn = io_wq_work_match_all,
.cancel_all = true,
};
int node;
for_each_node(node)
io_wqe_cancel_pending_work(wq->wqes[node], &match);
}
/* /*
* Manager thread. Tasked with creating new workers, if we need them. * Manager thread. Tasked with creating new workers, if we need them.
*/ */
...@@ -732,25 +701,38 @@ static int io_wq_manager(void *data) ...@@ -732,25 +701,38 @@ static int io_wq_manager(void *data)
{ {
struct io_wq *wq = data; struct io_wq *wq = data;
char buf[TASK_COMM_LEN]; char buf[TASK_COMM_LEN];
int node;
sprintf(buf, "iou-mgr-%d", wq->task_pid); sprintf(buf, "iou-mgr-%d", wq->task_pid);
set_task_comm(current, buf); set_task_comm(current, buf);
current->flags |= PF_IO_WORKER;
wq->manager = current;
complete(&wq->done);
do { do {
set_current_state(TASK_INTERRUPTIBLE); set_current_state(TASK_INTERRUPTIBLE);
io_wq_check_workers(wq); io_wq_check_workers(wq);
schedule_timeout(HZ); schedule_timeout(HZ);
try_to_freeze();
if (fatal_signal_pending(current)) if (fatal_signal_pending(current))
set_bit(IO_WQ_BIT_EXIT, &wq->state); set_bit(IO_WQ_BIT_EXIT, &wq->state);
} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)); } while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
io_wq_check_workers(wq); io_wq_check_workers(wq);
wq->manager = NULL;
io_wq_put(wq); rcu_read_lock();
for_each_node(node)
io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
rcu_read_unlock();
/* we might not ever have created any workers */
if (atomic_read(&wq->worker_refs))
wait_for_completion(&wq->worker_done);
spin_lock_irq(&wq->hash->wait.lock);
for_each_node(node)
list_del_init(&wq->wqes[node]->wait.entry);
spin_unlock_irq(&wq->hash->wait.lock);
io_wq_cancel_pending(wq);
complete(&wq->exited);
do_exit(0); do_exit(0);
} }
...@@ -787,23 +769,20 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work) ...@@ -787,23 +769,20 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
static int io_wq_fork_manager(struct io_wq *wq) static int io_wq_fork_manager(struct io_wq *wq)
{ {
int ret; struct task_struct *tsk;
if (wq->manager) if (wq->manager)
return 0; return 0;
clear_bit(IO_WQ_BIT_EXIT, &wq->state); reinit_completion(&wq->worker_done);
refcount_inc(&wq->refs); tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE);
current->flags |= PF_IO_WORKER; if (!IS_ERR(tsk)) {
ret = io_wq_fork_thread(io_wq_manager, wq); wq->manager = get_task_struct(tsk);
current->flags &= ~PF_IO_WORKER; wake_up_new_task(tsk);
if (ret >= 0) {
wait_for_completion(&wq->done);
return 0; return 0;
} }
io_wq_put(wq); return PTR_ERR(tsk);
return ret;
} }
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
...@@ -813,7 +792,8 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) ...@@ -813,7 +792,8 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
unsigned long flags; unsigned long flags;
/* Can only happen if manager creation fails after exec */ /* Can only happen if manager creation fails after exec */
if (unlikely(io_wq_fork_manager(wqe->wq))) { if (io_wq_fork_manager(wqe->wq) ||
test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
work->flags |= IO_WQ_WORK_CANCEL; work->flags |= IO_WQ_WORK_CANCEL;
wqe->wq->do_work(work); wqe->wq->do_work(work);
return; return;
...@@ -849,14 +829,6 @@ void io_wq_hash_work(struct io_wq_work *work, void *val) ...@@ -849,14 +829,6 @@ void io_wq_hash_work(struct io_wq_work *work, void *val)
work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT)); work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
} }
struct io_cb_cancel_data {
work_cancel_fn *fn;
void *data;
int nr_running;
int nr_pending;
bool cancel_all;
};
static bool io_wq_worker_cancel(struct io_worker *worker, void *data) static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{ {
struct io_cb_cancel_data *match = data; struct io_cb_cancel_data *match = data;
...@@ -1043,16 +1015,18 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) ...@@ -1043,16 +1015,18 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
} }
wq->task_pid = current->pid; wq->task_pid = current->pid;
init_completion(&wq->done); init_completion(&wq->exited);
refcount_set(&wq->refs, 1); refcount_set(&wq->refs, 1);
init_completion(&wq->worker_done);
atomic_set(&wq->worker_refs, 0);
ret = io_wq_fork_manager(wq); ret = io_wq_fork_manager(wq);
if (!ret) if (!ret)
return wq; return wq;
io_wq_put(wq);
io_wq_put_hash(data->hash);
err: err:
io_wq_put_hash(data->hash);
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
for_each_node(node) for_each_node(node)
kfree(wq->wqes[node]); kfree(wq->wqes[node]);
...@@ -1063,6 +1037,16 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) ...@@ -1063,6 +1037,16 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
return ERR_PTR(ret); return ERR_PTR(ret);
} }
static void io_wq_destroy_manager(struct io_wq *wq)
{
if (wq->manager) {
wake_up_process(wq->manager);
wait_for_completion(&wq->exited);
put_task_struct(wq->manager);
wq->manager = NULL;
}
}
static void io_wq_destroy(struct io_wq *wq) static void io_wq_destroy(struct io_wq *wq)
{ {
int node; int node;
...@@ -1070,26 +1054,16 @@ static void io_wq_destroy(struct io_wq *wq) ...@@ -1070,26 +1054,16 @@ static void io_wq_destroy(struct io_wq *wq)
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
set_bit(IO_WQ_BIT_EXIT, &wq->state); set_bit(IO_WQ_BIT_EXIT, &wq->state);
if (wq->manager) io_wq_destroy_manager(wq);
wake_up_process(wq->manager);
rcu_read_lock();
for_each_node(node)
io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
rcu_read_unlock();
spin_lock_irq(&wq->hash->wait.lock);
for_each_node(node) { for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node]; struct io_wqe *wqe = wq->wqes[node];
WARN_ON_ONCE(!wq_list_empty(&wqe->work_list));
list_del_init(&wqe->wait.entry);
kfree(wqe); kfree(wqe);
} }
spin_unlock_irq(&wq->hash->wait.lock);
io_wq_put_hash(wq->hash); io_wq_put_hash(wq->hash);
kfree(wq->wqes); kfree(wq->wqes);
kfree(wq); kfree(wq);
} }
void io_wq_put(struct io_wq *wq) void io_wq_put(struct io_wq *wq)
...@@ -1098,6 +1072,13 @@ void io_wq_put(struct io_wq *wq) ...@@ -1098,6 +1072,13 @@ void io_wq_put(struct io_wq *wq)
io_wq_destroy(wq); io_wq_destroy(wq);
} }
void io_wq_put_and_exit(struct io_wq *wq)
{
set_bit(IO_WQ_BIT_EXIT, &wq->state);
io_wq_destroy_manager(wq);
io_wq_put(wq);
}
static bool io_wq_worker_affinity(struct io_worker *worker, void *data) static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{ {
struct task_struct *task = worker->task; struct task_struct *task = worker->task;
......
...@@ -79,8 +79,8 @@ static inline void wq_list_del(struct io_wq_work_list *list, ...@@ -79,8 +79,8 @@ static inline void wq_list_del(struct io_wq_work_list *list,
struct io_wq_work { struct io_wq_work {
struct io_wq_work_node list; struct io_wq_work_node list;
const struct cred *creds;
unsigned flags; unsigned flags;
unsigned short personality;
}; };
static inline struct io_wq_work *wq_next_work(struct io_wq_work *work) static inline struct io_wq_work *wq_next_work(struct io_wq_work *work)
...@@ -114,12 +114,11 @@ struct io_wq_data { ...@@ -114,12 +114,11 @@ struct io_wq_data {
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data); struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data);
void io_wq_put(struct io_wq *wq); void io_wq_put(struct io_wq *wq);
void io_wq_put_and_exit(struct io_wq *wq);
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work); void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
void io_wq_hash_work(struct io_wq_work *work, void *val); void io_wq_hash_work(struct io_wq_work *work, void *val);
pid_t io_wq_fork_thread(int (*fn)(void *), void *arg);
static inline bool io_wq_is_hashed(struct io_wq_work *work) static inline bool io_wq_is_hashed(struct io_wq_work *work)
{ {
return work->flags & IO_WQ_WORK_HASHED; return work->flags & IO_WQ_WORK_HASHED;
......
...@@ -74,13 +74,11 @@ ...@@ -74,13 +74,11 @@
#include <linux/fsnotify.h> #include <linux/fsnotify.h>
#include <linux/fadvise.h> #include <linux/fadvise.h>
#include <linux/eventpoll.h> #include <linux/eventpoll.h>
#include <linux/fs_struct.h>
#include <linux/splice.h> #include <linux/splice.h>
#include <linux/task_work.h> #include <linux/task_work.h>
#include <linux/pagemap.h> #include <linux/pagemap.h>
#include <linux/io_uring.h> #include <linux/io_uring.h>
#include <linux/blk-cgroup.h> #include <linux/freezer.h>
#include <linux/audit.h>
#define CREATE_TRACE_POINTS #define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h> #include <trace/events/io_uring.h>
...@@ -276,7 +274,7 @@ struct io_sq_data { ...@@ -276,7 +274,7 @@ struct io_sq_data {
unsigned long state; unsigned long state;
struct completion startup; struct completion startup;
struct completion completion; struct completion parked;
struct completion exited; struct completion exited;
}; };
...@@ -338,7 +336,6 @@ struct io_ring_ctx { ...@@ -338,7 +336,6 @@ struct io_ring_ctx {
unsigned int drain_next: 1; unsigned int drain_next: 1;
unsigned int eventfd_async: 1; unsigned int eventfd_async: 1;
unsigned int restricted: 1; unsigned int restricted: 1;
unsigned int sqo_dead: 1;
unsigned int sqo_exec: 1; unsigned int sqo_exec: 1;
/* /*
...@@ -380,11 +377,6 @@ struct io_ring_ctx { ...@@ -380,11 +377,6 @@ struct io_ring_ctx {
struct io_rings *rings; struct io_rings *rings;
/*
* For SQPOLL usage
*/
struct task_struct *sqo_task;
/* Only used for accounting purposes */ /* Only used for accounting purposes */
struct mm_struct *mm_account; struct mm_struct *mm_account;
...@@ -688,7 +680,6 @@ enum { ...@@ -688,7 +680,6 @@ enum {
REQ_F_POLLED_BIT, REQ_F_POLLED_BIT,
REQ_F_BUFFER_SELECTED_BIT, REQ_F_BUFFER_SELECTED_BIT,
REQ_F_NO_FILE_TABLE_BIT, REQ_F_NO_FILE_TABLE_BIT,
REQ_F_WORK_INITIALIZED_BIT,
REQ_F_LTIMEOUT_ACTIVE_BIT, REQ_F_LTIMEOUT_ACTIVE_BIT,
REQ_F_COMPLETE_INLINE_BIT, REQ_F_COMPLETE_INLINE_BIT,
...@@ -712,7 +703,7 @@ enum { ...@@ -712,7 +703,7 @@ enum {
/* fail rest of links */ /* fail rest of links */
REQ_F_FAIL_LINK = BIT(REQ_F_FAIL_LINK_BIT), REQ_F_FAIL_LINK = BIT(REQ_F_FAIL_LINK_BIT),
/* on inflight list */ /* on inflight list, should be cancelled and waited on exit reliably */
REQ_F_INFLIGHT = BIT(REQ_F_INFLIGHT_BIT), REQ_F_INFLIGHT = BIT(REQ_F_INFLIGHT_BIT),
/* read/write uses file position */ /* read/write uses file position */
REQ_F_CUR_POS = BIT(REQ_F_CUR_POS_BIT), REQ_F_CUR_POS = BIT(REQ_F_CUR_POS_BIT),
...@@ -730,8 +721,6 @@ enum { ...@@ -730,8 +721,6 @@ enum {
REQ_F_BUFFER_SELECTED = BIT(REQ_F_BUFFER_SELECTED_BIT), REQ_F_BUFFER_SELECTED = BIT(REQ_F_BUFFER_SELECTED_BIT),
/* doesn't need file table for this request */ /* doesn't need file table for this request */
REQ_F_NO_FILE_TABLE = BIT(REQ_F_NO_FILE_TABLE_BIT), REQ_F_NO_FILE_TABLE = BIT(REQ_F_NO_FILE_TABLE_BIT),
/* io_wq_work is initialized */
REQ_F_WORK_INITIALIZED = BIT(REQ_F_WORK_INITIALIZED_BIT),
/* linked timeout is active, i.e. prepared by link's head */ /* linked timeout is active, i.e. prepared by link's head */
REQ_F_LTIMEOUT_ACTIVE = BIT(REQ_F_LTIMEOUT_ACTIVE_BIT), REQ_F_LTIMEOUT_ACTIVE = BIT(REQ_F_LTIMEOUT_ACTIVE_BIT),
/* completion is deferred through io_comp_state */ /* completion is deferred through io_comp_state */
...@@ -1080,9 +1069,7 @@ static bool io_match_task(struct io_kiocb *head, ...@@ -1080,9 +1069,7 @@ static bool io_match_task(struct io_kiocb *head,
return true; return true;
io_for_each_link(req, head) { io_for_each_link(req, head) {
if (!(req->flags & REQ_F_WORK_INITIALIZED)) if (req->flags & REQ_F_INFLIGHT)
continue;
if (req->file && req->file->f_op == &io_uring_fops)
return true; return true;
if (req->task->files == files) if (req->task->files == files)
return true; return true;
...@@ -1096,24 +1083,6 @@ static inline void req_set_fail_links(struct io_kiocb *req) ...@@ -1096,24 +1083,6 @@ static inline void req_set_fail_links(struct io_kiocb *req)
req->flags |= REQ_F_FAIL_LINK; req->flags |= REQ_F_FAIL_LINK;
} }
static inline void __io_req_init_async(struct io_kiocb *req)
{
memset(&req->work, 0, sizeof(req->work));
req->flags |= REQ_F_WORK_INITIALIZED;
}
/*
* Note: must call io_req_init_async() for the first time you
* touch any members of io_wq_work.
*/
static inline void io_req_init_async(struct io_kiocb *req)
{
if (req->flags & REQ_F_WORK_INITIALIZED)
return;
__io_req_init_async(req);
}
static void io_ring_ctx_ref_free(struct percpu_ref *ref) static void io_ring_ctx_ref_free(struct percpu_ref *ref)
{ {
struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs); struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
...@@ -1196,37 +1165,11 @@ static bool req_need_defer(struct io_kiocb *req, u32 seq) ...@@ -1196,37 +1165,11 @@ static bool req_need_defer(struct io_kiocb *req, u32 seq)
return false; return false;
} }
static void io_req_clean_work(struct io_kiocb *req)
{
if (!(req->flags & REQ_F_WORK_INITIALIZED))
return;
if (req->work.creds) {
put_cred(req->work.creds);
req->work.creds = NULL;
}
if (req->flags & REQ_F_INFLIGHT) {
struct io_ring_ctx *ctx = req->ctx;
struct io_uring_task *tctx = req->task->io_uring;
unsigned long flags;
spin_lock_irqsave(&ctx->inflight_lock, flags);
list_del(&req->inflight_entry);
spin_unlock_irqrestore(&ctx->inflight_lock, flags);
req->flags &= ~REQ_F_INFLIGHT;
if (atomic_read(&tctx->in_idle))
wake_up(&tctx->wait);
}
req->flags &= ~REQ_F_WORK_INITIALIZED;
}
static void io_req_track_inflight(struct io_kiocb *req) static void io_req_track_inflight(struct io_kiocb *req)
{ {
struct io_ring_ctx *ctx = req->ctx; struct io_ring_ctx *ctx = req->ctx;
if (!(req->flags & REQ_F_INFLIGHT)) { if (!(req->flags & REQ_F_INFLIGHT)) {
io_req_init_async(req);
req->flags |= REQ_F_INFLIGHT; req->flags |= REQ_F_INFLIGHT;
spin_lock_irq(&ctx->inflight_lock); spin_lock_irq(&ctx->inflight_lock);
...@@ -1240,8 +1183,6 @@ static void io_prep_async_work(struct io_kiocb *req) ...@@ -1240,8 +1183,6 @@ static void io_prep_async_work(struct io_kiocb *req)
const struct io_op_def *def = &io_op_defs[req->opcode]; const struct io_op_def *def = &io_op_defs[req->opcode];
struct io_ring_ctx *ctx = req->ctx; struct io_ring_ctx *ctx = req->ctx;
io_req_init_async(req);
if (req->flags & REQ_F_FORCE_ASYNC) if (req->flags & REQ_F_FORCE_ASYNC)
req->work.flags |= IO_WQ_WORK_CONCURRENT; req->work.flags |= IO_WQ_WORK_CONCURRENT;
...@@ -1252,8 +1193,6 @@ static void io_prep_async_work(struct io_kiocb *req) ...@@ -1252,8 +1193,6 @@ static void io_prep_async_work(struct io_kiocb *req)
if (def->unbound_nonreg_file) if (def->unbound_nonreg_file)
req->work.flags |= IO_WQ_WORK_UNBOUND; req->work.flags |= IO_WQ_WORK_UNBOUND;
} }
if (!req->work.creds)
req->work.creds = get_current_cred();
} }
static void io_prep_async_link(struct io_kiocb *req) static void io_prep_async_link(struct io_kiocb *req)
...@@ -1264,7 +1203,7 @@ static void io_prep_async_link(struct io_kiocb *req) ...@@ -1264,7 +1203,7 @@ static void io_prep_async_link(struct io_kiocb *req)
io_prep_async_work(cur); io_prep_async_work(cur);
} }
static struct io_kiocb *__io_queue_async_work(struct io_kiocb *req) static void io_queue_async_work(struct io_kiocb *req)
{ {
struct io_ring_ctx *ctx = req->ctx; struct io_ring_ctx *ctx = req->ctx;
struct io_kiocb *link = io_prep_linked_timeout(req); struct io_kiocb *link = io_prep_linked_timeout(req);
...@@ -1275,18 +1214,9 @@ static struct io_kiocb *__io_queue_async_work(struct io_kiocb *req) ...@@ -1275,18 +1214,9 @@ static struct io_kiocb *__io_queue_async_work(struct io_kiocb *req)
trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req, trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req,
&req->work, req->flags); &req->work, req->flags);
io_wq_enqueue(tctx->io_wq, &req->work);
return link;
}
static void io_queue_async_work(struct io_kiocb *req)
{
struct io_kiocb *link;
/* init ->work of the whole link before punting */ /* init ->work of the whole link before punting */
io_prep_async_link(req); io_prep_async_link(req);
link = __io_queue_async_work(req); io_wq_enqueue(tctx->io_wq, &req->work);
if (link) if (link)
io_queue_linked_timeout(link); io_queue_linked_timeout(link);
} }
...@@ -1521,18 +1451,22 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force, ...@@ -1521,18 +1451,22 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force,
return all_flushed; return all_flushed;
} }
static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force, static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force,
struct task_struct *tsk, struct task_struct *tsk,
struct files_struct *files) struct files_struct *files)
{ {
bool ret = true;
if (test_bit(0, &ctx->cq_check_overflow)) { if (test_bit(0, &ctx->cq_check_overflow)) {
/* iopoll syncs against uring_lock, not completion_lock */ /* iopoll syncs against uring_lock, not completion_lock */
if (ctx->flags & IORING_SETUP_IOPOLL) if (ctx->flags & IORING_SETUP_IOPOLL)
mutex_lock(&ctx->uring_lock); mutex_lock(&ctx->uring_lock);
__io_cqring_overflow_flush(ctx, force, tsk, files); ret = __io_cqring_overflow_flush(ctx, force, tsk, files);
if (ctx->flags & IORING_SETUP_IOPOLL) if (ctx->flags & IORING_SETUP_IOPOLL)
mutex_unlock(&ctx->uring_lock); mutex_unlock(&ctx->uring_lock);
} }
return ret;
} }
static void __io_cqring_fill_event(struct io_kiocb *req, long res, long cflags) static void __io_cqring_fill_event(struct io_kiocb *req, long res, long cflags)
...@@ -1714,9 +1648,19 @@ static void io_dismantle_req(struct io_kiocb *req) ...@@ -1714,9 +1648,19 @@ static void io_dismantle_req(struct io_kiocb *req)
io_put_file(req, req->file, (req->flags & REQ_F_FIXED_FILE)); io_put_file(req, req->file, (req->flags & REQ_F_FIXED_FILE));
if (req->fixed_rsrc_refs) if (req->fixed_rsrc_refs)
percpu_ref_put(req->fixed_rsrc_refs); percpu_ref_put(req->fixed_rsrc_refs);
io_req_clean_work(req);
if (req->flags & REQ_F_INFLIGHT) {
struct io_ring_ctx *ctx = req->ctx;
unsigned long flags;
spin_lock_irqsave(&ctx->inflight_lock, flags);
list_del(&req->inflight_entry);
spin_unlock_irqrestore(&ctx->inflight_lock, flags);
req->flags &= ~REQ_F_INFLIGHT;
}
} }
/* must to be called somewhat shortly after putting a request */
static inline void io_put_task(struct task_struct *task, int nr) static inline void io_put_task(struct task_struct *task, int nr)
{ {
struct io_uring_task *tctx = task->io_uring; struct io_uring_task *tctx = task->io_uring;
...@@ -1800,15 +1744,7 @@ static void io_fail_links(struct io_kiocb *req) ...@@ -1800,15 +1744,7 @@ static void io_fail_links(struct io_kiocb *req)
trace_io_uring_fail_link(req, link); trace_io_uring_fail_link(req, link);
io_cqring_fill_event(link, -ECANCELED); io_cqring_fill_event(link, -ECANCELED);
/*
* It's ok to free under spinlock as they're not linked anymore,
* but avoid REQ_F_WORK_INITIALIZED because it may deadlock on
* work.fs->lock.
*/
if (link->flags & REQ_F_WORK_INITIALIZED)
io_put_req_deferred(link, 2); io_put_req_deferred(link, 2);
else
io_double_put_req(link);
link = nxt; link = nxt;
} }
io_commit_cqring(ctx); io_commit_cqring(ctx);
...@@ -1845,6 +1781,18 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req) ...@@ -1845,6 +1781,18 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
return __io_req_find_next(req); return __io_req_find_next(req);
} }
static void ctx_flush_and_put(struct io_ring_ctx *ctx)
{
if (!ctx)
return;
if (ctx->submit_state.comp.nr) {
mutex_lock(&ctx->uring_lock);
io_submit_flush_completions(&ctx->submit_state.comp, ctx);
mutex_unlock(&ctx->uring_lock);
}
percpu_ref_put(&ctx->refs);
}
static bool __tctx_task_work(struct io_uring_task *tctx) static bool __tctx_task_work(struct io_uring_task *tctx)
{ {
struct io_ring_ctx *ctx = NULL; struct io_ring_ctx *ctx = NULL;
...@@ -1862,30 +1810,20 @@ static bool __tctx_task_work(struct io_uring_task *tctx) ...@@ -1862,30 +1810,20 @@ static bool __tctx_task_work(struct io_uring_task *tctx)
node = list.first; node = list.first;
while (node) { while (node) {
struct io_wq_work_node *next = node->next; struct io_wq_work_node *next = node->next;
struct io_ring_ctx *this_ctx;
struct io_kiocb *req; struct io_kiocb *req;
req = container_of(node, struct io_kiocb, io_task_work.node); req = container_of(node, struct io_kiocb, io_task_work.node);
this_ctx = req->ctx; if (req->ctx != ctx) {
req->task_work.func(&req->task_work); ctx_flush_and_put(ctx);
node = next; ctx = req->ctx;
percpu_ref_get(&ctx->refs);
if (!ctx) {
ctx = this_ctx;
} else if (ctx != this_ctx) {
mutex_lock(&ctx->uring_lock);
io_submit_flush_completions(&ctx->submit_state.comp, ctx);
mutex_unlock(&ctx->uring_lock);
ctx = this_ctx;
}
} }
if (ctx && ctx->submit_state.comp.nr) { req->task_work.func(&req->task_work);
mutex_lock(&ctx->uring_lock); node = next;
io_submit_flush_completions(&ctx->submit_state.comp, ctx);
mutex_unlock(&ctx->uring_lock);
} }
ctx_flush_and_put(ctx);
return list.first != NULL; return list.first != NULL;
} }
...@@ -1893,10 +1831,10 @@ static void tctx_task_work(struct callback_head *cb) ...@@ -1893,10 +1831,10 @@ static void tctx_task_work(struct callback_head *cb)
{ {
struct io_uring_task *tctx = container_of(cb, struct io_uring_task, task_work); struct io_uring_task *tctx = container_of(cb, struct io_uring_task, task_work);
clear_bit(0, &tctx->task_state);
while (__tctx_task_work(tctx)) while (__tctx_task_work(tctx))
cond_resched(); cond_resched();
clear_bit(0, &tctx->task_state);
} }
static int io_task_work_add(struct task_struct *tsk, struct io_kiocb *req, static int io_task_work_add(struct task_struct *tsk, struct io_kiocb *req,
...@@ -2010,7 +1948,7 @@ static void __io_req_task_submit(struct io_kiocb *req) ...@@ -2010,7 +1948,7 @@ static void __io_req_task_submit(struct io_kiocb *req)
/* ctx stays valid until unlock, even if we drop all ours ctx->refs */ /* ctx stays valid until unlock, even if we drop all ours ctx->refs */
mutex_lock(&ctx->uring_lock); mutex_lock(&ctx->uring_lock);
if (!ctx->sqo_dead && !(current->flags & PF_EXITING) && !current->in_execve) if (!(current->flags & PF_EXITING) && !current->in_execve)
__io_queue_sqe(req); __io_queue_sqe(req);
else else
__io_req_task_cancel(req, -EFAULT); __io_req_task_cancel(req, -EFAULT);
...@@ -2472,23 +2410,32 @@ static bool io_resubmit_prep(struct io_kiocb *req) ...@@ -2472,23 +2410,32 @@ static bool io_resubmit_prep(struct io_kiocb *req)
return false; return false;
return !io_setup_async_rw(req, iovec, inline_vecs, &iter, false); return !io_setup_async_rw(req, iovec, inline_vecs, &iter, false);
} }
#endif
static bool io_rw_reissue(struct io_kiocb *req) static bool io_rw_should_reissue(struct io_kiocb *req)
{ {
#ifdef CONFIG_BLOCK
umode_t mode = file_inode(req->file)->i_mode; umode_t mode = file_inode(req->file)->i_mode;
struct io_ring_ctx *ctx = req->ctx;
if (!S_ISBLK(mode) && !S_ISREG(mode)) if (!S_ISBLK(mode) && !S_ISREG(mode))
return false; return false;
if ((req->flags & REQ_F_NOWAIT) || io_wq_current_is_worker()) if ((req->flags & REQ_F_NOWAIT) || (io_wq_current_is_worker() &&
!(ctx->flags & IORING_SETUP_IOPOLL)))
return false; return false;
/* /*
* If ref is dying, we might be running poll reap from the exit work. * If ref is dying, we might be running poll reap from the exit work.
* Don't attempt to reissue from that path, just let it fail with * Don't attempt to reissue from that path, just let it fail with
* -EAGAIN. * -EAGAIN.
*/ */
if (percpu_ref_is_dying(&req->ctx->refs)) if (percpu_ref_is_dying(&ctx->refs))
return false;
return true;
}
#endif
static bool io_rw_reissue(struct io_kiocb *req)
{
#ifdef CONFIG_BLOCK
if (!io_rw_should_reissue(req))
return false; return false;
lockdep_assert_held(&req->ctx->uring_lock); lockdep_assert_held(&req->ctx->uring_lock);
...@@ -2531,6 +2478,19 @@ static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2) ...@@ -2531,6 +2478,19 @@ static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
{ {
struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb); struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
#ifdef CONFIG_BLOCK
/* Rewind iter, if we have one. iopoll path resubmits as usual */
if (res == -EAGAIN && io_rw_should_reissue(req)) {
struct io_async_rw *rw = req->async_data;
if (rw)
iov_iter_revert(&rw->iter,
req->result - iov_iter_count(&rw->iter));
else if (!io_resubmit_prep(req))
res = -EIO;
}
#endif
if (kiocb->ki_flags & IOCB_WRITE) if (kiocb->ki_flags & IOCB_WRITE)
kiocb_end_write(req); kiocb_end_write(req);
...@@ -3279,6 +3239,8 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags) ...@@ -3279,6 +3239,8 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags)
ret = io_iter_do_read(req, iter); ret = io_iter_do_read(req, iter);
if (ret == -EIOCBQUEUED) { if (ret == -EIOCBQUEUED) {
if (req->async_data)
iov_iter_revert(iter, io_size - iov_iter_count(iter));
goto out_free; goto out_free;
} else if (ret == -EAGAIN) { } else if (ret == -EAGAIN) {
/* IOPOLL retry should happen for io-wq threads */ /* IOPOLL retry should happen for io-wq threads */
...@@ -3324,6 +3286,7 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags) ...@@ -3324,6 +3286,7 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags)
if (ret == -EIOCBQUEUED) if (ret == -EIOCBQUEUED)
return 0; return 0;
/* we got some bytes, but not all. retry. */ /* we got some bytes, but not all. retry. */
kiocb->ki_flags &= ~IOCB_WAITQ;
} while (ret > 0 && ret < io_size); } while (ret > 0 && ret < io_size);
done: done:
kiocb_done(kiocb, ret, issue_flags); kiocb_done(kiocb, ret, issue_flags);
...@@ -3410,6 +3373,8 @@ static int io_write(struct io_kiocb *req, unsigned int issue_flags) ...@@ -3410,6 +3373,8 @@ static int io_write(struct io_kiocb *req, unsigned int issue_flags)
/* no retry on NONBLOCK nor RWF_NOWAIT */ /* no retry on NONBLOCK nor RWF_NOWAIT */
if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT)) if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT))
goto done; goto done;
if (ret2 == -EIOCBQUEUED && req->async_data)
iov_iter_revert(iter, io_size - iov_iter_count(iter));
if (!force_nonblock || ret2 != -EAGAIN) { if (!force_nonblock || ret2 != -EAGAIN) {
/* IOPOLL retry should happen for io-wq threads */ /* IOPOLL retry should happen for io-wq threads */
if ((req->ctx->flags & IORING_SETUP_IOPOLL) && ret2 == -EAGAIN) if ((req->ctx->flags & IORING_SETUP_IOPOLL) && ret2 == -EAGAIN)
...@@ -3588,7 +3553,6 @@ static int __io_splice_prep(struct io_kiocb *req, ...@@ -3588,7 +3553,6 @@ static int __io_splice_prep(struct io_kiocb *req,
* Splice operation will be punted aync, and here need to * Splice operation will be punted aync, and here need to
* modify io_wq_work.flags, so initialize io_wq_work firstly. * modify io_wq_work.flags, so initialize io_wq_work firstly.
*/ */
io_req_init_async(req);
req->work.flags |= IO_WQ_WORK_UNBOUND; req->work.flags |= IO_WQ_WORK_UNBOUND;
} }
...@@ -3864,7 +3828,7 @@ static int io_openat2(struct io_kiocb *req, unsigned int issue_flags) ...@@ -3864,7 +3828,7 @@ static int io_openat2(struct io_kiocb *req, unsigned int issue_flags)
static int io_openat(struct io_kiocb *req, unsigned int issue_flags) static int io_openat(struct io_kiocb *req, unsigned int issue_flags)
{ {
return io_openat2(req, issue_flags & IO_URING_F_NONBLOCK); return io_openat2(req, issue_flags);
} }
static int io_remove_buffers_prep(struct io_kiocb *req, static int io_remove_buffers_prep(struct io_kiocb *req,
...@@ -5003,6 +4967,9 @@ static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt, ...@@ -5003,6 +4967,9 @@ static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt,
pt->error = -EINVAL; pt->error = -EINVAL;
return; return;
} }
/* double add on the same waitqueue head, ignore */
if (poll->head == head)
return;
poll = kmalloc(sizeof(*poll), GFP_ATOMIC); poll = kmalloc(sizeof(*poll), GFP_ATOMIC);
if (!poll) { if (!poll) {
pt->error = -ENOMEM; pt->error = -ENOMEM;
...@@ -5538,6 +5505,7 @@ static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, ...@@ -5538,6 +5505,7 @@ static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
data->mode = io_translate_timeout_mode(flags); data->mode = io_translate_timeout_mode(flags);
hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode); hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
io_req_track_inflight(req);
return 0; return 0;
} }
...@@ -5945,8 +5913,22 @@ static void __io_clean_op(struct io_kiocb *req) ...@@ -5945,8 +5913,22 @@ static void __io_clean_op(struct io_kiocb *req)
static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
{ {
struct io_ring_ctx *ctx = req->ctx; struct io_ring_ctx *ctx = req->ctx;
const struct cred *creds = NULL;
int ret; int ret;
if (req->work.personality) {
const struct cred *new_creds;
if (!(issue_flags & IO_URING_F_NONBLOCK))
mutex_lock(&ctx->uring_lock);
new_creds = idr_find(&ctx->personality_idr, req->work.personality);
if (!(issue_flags & IO_URING_F_NONBLOCK))
mutex_unlock(&ctx->uring_lock);
if (!new_creds)
return -EINVAL;
creds = override_creds(new_creds);
}
switch (req->opcode) { switch (req->opcode) {
case IORING_OP_NOP: case IORING_OP_NOP:
ret = io_nop(req, issue_flags); ret = io_nop(req, issue_flags);
...@@ -6053,6 +6035,9 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) ...@@ -6053,6 +6035,9 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
break; break;
} }
if (creds)
revert_creds(creds);
if (ret) if (ret)
return ret; return ret;
...@@ -6216,18 +6201,10 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req) ...@@ -6216,18 +6201,10 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
static void __io_queue_sqe(struct io_kiocb *req) static void __io_queue_sqe(struct io_kiocb *req)
{ {
struct io_kiocb *linked_timeout = io_prep_linked_timeout(req); struct io_kiocb *linked_timeout = io_prep_linked_timeout(req);
const struct cred *old_creds = NULL;
int ret; int ret;
if ((req->flags & REQ_F_WORK_INITIALIZED) && req->work.creds &&
req->work.creds != current_cred())
old_creds = override_creds(req->work.creds);
ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER); ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
if (old_creds)
revert_creds(old_creds);
/* /*
* We async punt it if the file wasn't marked NOWAIT, or if the file * We async punt it if the file wasn't marked NOWAIT, or if the file
* doesn't support non-blocking read/write attempts * doesn't support non-blocking read/write attempts
...@@ -6314,7 +6291,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, ...@@ -6314,7 +6291,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
{ {
struct io_submit_state *state; struct io_submit_state *state;
unsigned int sqe_flags; unsigned int sqe_flags;
int id, ret = 0; int ret = 0;
req->opcode = READ_ONCE(sqe->opcode); req->opcode = READ_ONCE(sqe->opcode);
/* same numerical values with corresponding REQ_F_*, safe to copy */ /* same numerical values with corresponding REQ_F_*, safe to copy */
...@@ -6346,15 +6323,9 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, ...@@ -6346,15 +6323,9 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
!io_op_defs[req->opcode].buffer_select) !io_op_defs[req->opcode].buffer_select)
return -EOPNOTSUPP; return -EOPNOTSUPP;
id = READ_ONCE(sqe->personality); req->work.list.next = NULL;
if (id) { req->work.flags = 0;
__io_req_init_async(req); req->work.personality = READ_ONCE(sqe->personality);
req->work.creds = idr_find(&ctx->personality_idr, id);
if (unlikely(!req->work.creds))
return -EINVAL;
get_cred(req->work.creds);
}
state = &ctx->submit_state; state = &ctx->submit_state;
/* /*
...@@ -6616,8 +6587,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries) ...@@ -6616,8 +6587,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
if (!list_empty(&ctx->iopoll_list)) if (!list_empty(&ctx->iopoll_list))
io_do_iopoll(ctx, &nr_events, 0); io_do_iopoll(ctx, &nr_events, 0);
if (to_submit && !ctx->sqo_dead && if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)))
likely(!percpu_ref_is_dying(&ctx->refs)))
ret = io_submit_sqes(ctx, to_submit); ret = io_submit_sqes(ctx, to_submit);
mutex_unlock(&ctx->uring_lock); mutex_unlock(&ctx->uring_lock);
} }
...@@ -6686,7 +6656,7 @@ static void io_sq_thread_parkme(struct io_sq_data *sqd) ...@@ -6686,7 +6656,7 @@ static void io_sq_thread_parkme(struct io_sq_data *sqd)
* wait_task_inactive(). * wait_task_inactive().
*/ */
preempt_disable(); preempt_disable();
complete(&sqd->completion); complete(&sqd->parked);
schedule_preempt_disabled(); schedule_preempt_disabled();
preempt_enable(); preempt_enable();
} }
...@@ -6703,7 +6673,6 @@ static int io_sq_thread(void *data) ...@@ -6703,7 +6673,6 @@ static int io_sq_thread(void *data)
sprintf(buf, "iou-sqp-%d", sqd->task_pid); sprintf(buf, "iou-sqp-%d", sqd->task_pid);
set_task_comm(current, buf); set_task_comm(current, buf);
sqd->thread = current;
current->pf_io_worker = NULL; current->pf_io_worker = NULL;
if (sqd->sq_cpu != -1) if (sqd->sq_cpu != -1)
...@@ -6712,8 +6681,6 @@ static int io_sq_thread(void *data) ...@@ -6712,8 +6681,6 @@ static int io_sq_thread(void *data)
set_cpus_allowed_ptr(current, cpu_online_mask); set_cpus_allowed_ptr(current, cpu_online_mask);
current->flags |= PF_NO_SETAFFINITY; current->flags |= PF_NO_SETAFFINITY;
complete(&sqd->completion);
wait_for_completion(&sqd->startup); wait_for_completion(&sqd->startup);
while (!io_sq_thread_should_stop(sqd)) { while (!io_sq_thread_should_stop(sqd)) {
...@@ -6770,6 +6737,7 @@ static int io_sq_thread(void *data) ...@@ -6770,6 +6737,7 @@ static int io_sq_thread(void *data)
io_ring_set_wakeup_flag(ctx); io_ring_set_wakeup_flag(ctx);
schedule(); schedule();
try_to_freeze();
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
io_ring_clear_wakeup_flag(ctx); io_ring_clear_wakeup_flag(ctx);
} }
...@@ -6784,18 +6752,25 @@ static int io_sq_thread(void *data) ...@@ -6784,18 +6752,25 @@ static int io_sq_thread(void *data)
io_run_task_work(); io_run_task_work();
/* /*
* Clear thread under lock so that concurrent parks work correctly * Ensure that we park properly if racing with someone trying to park
* while we're exiting. If we fail to grab the lock, check park and
* park if necessary. The ordering with the park bit and the lock
* ensures that we catch this reliably.
*/ */
complete_all(&sqd->completion); if (!mutex_trylock(&sqd->lock)) {
if (io_sq_thread_should_park(sqd))
io_sq_thread_parkme(sqd);
mutex_lock(&sqd->lock); mutex_lock(&sqd->lock);
}
sqd->thread = NULL; sqd->thread = NULL;
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) { list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
ctx->sqo_exec = 1; ctx->sqo_exec = 1;
io_ring_set_wakeup_flag(ctx); io_ring_set_wakeup_flag(ctx);
} }
mutex_unlock(&sqd->lock);
complete(&sqd->exited); complete(&sqd->exited);
mutex_unlock(&sqd->lock);
do_exit(0); do_exit(0);
} }
...@@ -6917,11 +6892,16 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, ...@@ -6917,11 +6892,16 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
trace_io_uring_cqring_wait(ctx, min_events); trace_io_uring_cqring_wait(ctx, min_events);
do { do {
io_cqring_overflow_flush(ctx, false, NULL, NULL); /* if we can't even flush overflow, don't wait for more */
if (!io_cqring_overflow_flush(ctx, false, NULL, NULL)) {
ret = -EBUSY;
break;
}
prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
TASK_INTERRUPTIBLE); TASK_INTERRUPTIBLE);
ret = io_cqring_wait_schedule(ctx, &iowq, &timeout); ret = io_cqring_wait_schedule(ctx, &iowq, &timeout);
finish_wait(&ctx->wait, &iowq.wq); finish_wait(&ctx->wait, &iowq.wq);
cond_resched();
} while (ret > 0); } while (ret > 0);
restore_saved_sigmask_unless(ret == -EINTR); restore_saved_sigmask_unless(ret == -EINTR);
...@@ -7091,40 +7071,42 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx) ...@@ -7091,40 +7071,42 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
static void io_sq_thread_unpark(struct io_sq_data *sqd) static void io_sq_thread_unpark(struct io_sq_data *sqd)
__releases(&sqd->lock) __releases(&sqd->lock)
{ {
if (!sqd->thread)
return;
if (sqd->thread == current) if (sqd->thread == current)
return; return;
clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
if (sqd->thread)
wake_up_state(sqd->thread, TASK_PARKED); wake_up_state(sqd->thread, TASK_PARKED);
mutex_unlock(&sqd->lock); mutex_unlock(&sqd->lock);
} }
static bool io_sq_thread_park(struct io_sq_data *sqd) static void io_sq_thread_park(struct io_sq_data *sqd)
__acquires(&sqd->lock) __acquires(&sqd->lock)
{ {
if (sqd->thread == current) if (sqd->thread == current)
return true; return;
mutex_lock(&sqd->lock);
if (!sqd->thread) {
mutex_unlock(&sqd->lock);
return false;
}
set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
mutex_lock(&sqd->lock);
if (sqd->thread) {
wake_up_process(sqd->thread); wake_up_process(sqd->thread);
wait_for_completion(&sqd->completion); wait_for_completion(&sqd->parked);
return true; }
} }
static void io_sq_thread_stop(struct io_sq_data *sqd) static void io_sq_thread_stop(struct io_sq_data *sqd)
{ {
if (!sqd->thread) if (test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state))
return; return;
mutex_lock(&sqd->lock);
if (sqd->thread) {
set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state)); WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state));
wake_up_process(sqd->thread); wake_up_process(sqd->thread);
mutex_unlock(&sqd->lock);
wait_for_completion(&sqd->exited); wait_for_completion(&sqd->exited);
WARN_ON_ONCE(sqd->thread);
} else {
mutex_unlock(&sqd->lock);
}
} }
static void io_put_sq_data(struct io_sq_data *sqd) static void io_put_sq_data(struct io_sq_data *sqd)
...@@ -7203,7 +7185,7 @@ static struct io_sq_data *io_get_sq_data(struct io_uring_params *p) ...@@ -7203,7 +7185,7 @@ static struct io_sq_data *io_get_sq_data(struct io_uring_params *p)
mutex_init(&sqd->lock); mutex_init(&sqd->lock);
init_waitqueue_head(&sqd->wait); init_waitqueue_head(&sqd->wait);
init_completion(&sqd->startup); init_completion(&sqd->startup);
init_completion(&sqd->completion); init_completion(&sqd->parked);
init_completion(&sqd->exited); init_completion(&sqd->exited);
return sqd; return sqd;
} }
...@@ -7834,6 +7816,8 @@ void __io_uring_free(struct task_struct *tsk) ...@@ -7834,6 +7816,8 @@ void __io_uring_free(struct task_struct *tsk)
struct io_uring_task *tctx = tsk->io_uring; struct io_uring_task *tctx = tsk->io_uring;
WARN_ON_ONCE(!xa_empty(&tctx->xa)); WARN_ON_ONCE(!xa_empty(&tctx->xa));
WARN_ON_ONCE(tctx->io_wq);
percpu_counter_destroy(&tctx->inflight); percpu_counter_destroy(&tctx->inflight);
kfree(tctx); kfree(tctx);
tsk->io_uring = NULL; tsk->io_uring = NULL;
...@@ -7841,21 +7825,22 @@ void __io_uring_free(struct task_struct *tsk) ...@@ -7841,21 +7825,22 @@ void __io_uring_free(struct task_struct *tsk)
static int io_sq_thread_fork(struct io_sq_data *sqd, struct io_ring_ctx *ctx) static int io_sq_thread_fork(struct io_sq_data *sqd, struct io_ring_ctx *ctx)
{ {
struct task_struct *tsk;
int ret; int ret;
clear_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); clear_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
reinit_completion(&sqd->completion); reinit_completion(&sqd->parked);
ctx->sqo_dead = ctx->sqo_exec = 0; ctx->sqo_exec = 0;
sqd->task_pid = current->pid; sqd->task_pid = current->pid;
current->flags |= PF_IO_WORKER; tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
ret = io_wq_fork_thread(io_sq_thread, sqd); if (IS_ERR(tsk))
current->flags &= ~PF_IO_WORKER; return PTR_ERR(tsk);
if (ret < 0) { ret = io_uring_alloc_task_context(tsk, ctx);
sqd->thread = NULL; if (ret)
set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
sqd->thread = tsk;
wake_up_new_task(tsk);
return ret; return ret;
}
wait_for_completion(&sqd->completion);
return io_uring_alloc_task_context(sqd->thread, ctx);
} }
static int io_sq_offload_create(struct io_ring_ctx *ctx, static int io_sq_offload_create(struct io_ring_ctx *ctx,
...@@ -7878,6 +7863,7 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx, ...@@ -7878,6 +7863,7 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx,
fdput(f); fdput(f);
} }
if (ctx->flags & IORING_SETUP_SQPOLL) { if (ctx->flags & IORING_SETUP_SQPOLL) {
struct task_struct *tsk;
struct io_sq_data *sqd; struct io_sq_data *sqd;
ret = -EPERM; ret = -EPERM;
...@@ -7919,15 +7905,16 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx, ...@@ -7919,15 +7905,16 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx,
} }
sqd->task_pid = current->pid; sqd->task_pid = current->pid;
current->flags |= PF_IO_WORKER; tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
ret = io_wq_fork_thread(io_sq_thread, sqd); if (IS_ERR(tsk)) {
current->flags &= ~PF_IO_WORKER; ret = PTR_ERR(tsk);
if (ret < 0) {
sqd->thread = NULL;
goto err; goto err;
} }
wait_for_completion(&sqd->completion); ret = io_uring_alloc_task_context(tsk, ctx);
ret = io_uring_alloc_task_context(sqd->thread, ctx); if (ret)
set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
sqd->thread = tsk;
wake_up_new_task(tsk);
if (ret) if (ret)
goto err; goto err;
} else if (p->flags & IORING_SETUP_SQ_AFF) { } else if (p->flags & IORING_SETUP_SQ_AFF) {
...@@ -7946,6 +7933,7 @@ static void io_sq_offload_start(struct io_ring_ctx *ctx) ...@@ -7946,6 +7933,7 @@ static void io_sq_offload_start(struct io_ring_ctx *ctx)
{ {
struct io_sq_data *sqd = ctx->sq_data; struct io_sq_data *sqd = ctx->sq_data;
ctx->flags &= ~IORING_SETUP_R_DISABLED;
if (ctx->flags & IORING_SETUP_SQPOLL) if (ctx->flags & IORING_SETUP_SQPOLL)
complete(&sqd->startup); complete(&sqd->startup);
} }
...@@ -8384,7 +8372,7 @@ static void io_req_cache_free(struct list_head *list, struct task_struct *tsk) ...@@ -8384,7 +8372,7 @@ static void io_req_cache_free(struct list_head *list, struct task_struct *tsk)
} }
} }
static void io_req_caches_free(struct io_ring_ctx *ctx, struct task_struct *tsk) static void io_req_caches_free(struct io_ring_ctx *ctx)
{ {
struct io_submit_state *submit_state = &ctx->submit_state; struct io_submit_state *submit_state = &ctx->submit_state;
struct io_comp_state *cs = &ctx->submit_state.comp; struct io_comp_state *cs = &ctx->submit_state.comp;
...@@ -8444,7 +8432,7 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx) ...@@ -8444,7 +8432,7 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)
percpu_ref_exit(&ctx->refs); percpu_ref_exit(&ctx->refs);
free_uid(ctx->user); free_uid(ctx->user);
io_req_caches_free(ctx, NULL); io_req_caches_free(ctx);
if (ctx->hash_map) if (ctx->hash_map)
io_wq_put_hash(ctx->hash_map); io_wq_put_hash(ctx->hash_map);
kfree(ctx->cancel_hash); kfree(ctx->cancel_hash);
...@@ -8512,16 +8500,13 @@ static int io_remove_personalities(int id, void *p, void *data) ...@@ -8512,16 +8500,13 @@ static int io_remove_personalities(int id, void *p, void *data)
return 0; return 0;
} }
static void io_run_ctx_fallback(struct io_ring_ctx *ctx) static bool io_run_ctx_fallback(struct io_ring_ctx *ctx)
{ {
struct callback_head *work, *head, *next; struct callback_head *work, *next;
bool executed = false;
do { do {
do { work = xchg(&ctx->exit_task_work, NULL);
head = NULL;
work = READ_ONCE(ctx->exit_task_work);
} while (cmpxchg(&ctx->exit_task_work, work, head) != work);
if (!work) if (!work)
break; break;
...@@ -8531,7 +8516,10 @@ static void io_run_ctx_fallback(struct io_ring_ctx *ctx) ...@@ -8531,7 +8516,10 @@ static void io_run_ctx_fallback(struct io_ring_ctx *ctx)
work = next; work = next;
cond_resched(); cond_resched();
} while (work); } while (work);
executed = true;
} while (1); } while (1);
return executed;
} }
static void io_ring_exit_work(struct work_struct *work) static void io_ring_exit_work(struct work_struct *work)
...@@ -8547,7 +8535,6 @@ static void io_ring_exit_work(struct work_struct *work) ...@@ -8547,7 +8535,6 @@ static void io_ring_exit_work(struct work_struct *work)
*/ */
do { do {
io_uring_try_cancel_requests(ctx, NULL, NULL); io_uring_try_cancel_requests(ctx, NULL, NULL);
io_run_ctx_fallback(ctx);
} while (!wait_for_completion_timeout(&ctx->ref_comp, HZ/20)); } while (!wait_for_completion_timeout(&ctx->ref_comp, HZ/20));
io_ring_ctx_free(ctx); io_ring_ctx_free(ctx);
} }
...@@ -8556,10 +8543,6 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) ...@@ -8556,10 +8543,6 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
{ {
mutex_lock(&ctx->uring_lock); mutex_lock(&ctx->uring_lock);
percpu_ref_kill(&ctx->refs); percpu_ref_kill(&ctx->refs);
if (WARN_ON_ONCE((ctx->flags & IORING_SETUP_SQPOLL) && !ctx->sqo_dead))
ctx->sqo_dead = 1;
/* if force is set, the ring is going away. always drop after that */ /* if force is set, the ring is going away. always drop after that */
ctx->cq_overflow_flushed = 1; ctx->cq_overflow_flushed = 1;
if (ctx->rings) if (ctx->rings)
...@@ -8648,7 +8631,8 @@ static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx, ...@@ -8648,7 +8631,8 @@ static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
struct files_struct *files) struct files_struct *files)
{ {
struct io_task_cancel cancel = { .task = task, .files = files, }; struct io_task_cancel cancel = { .task = task, .files = files, };
struct io_uring_task *tctx = current->io_uring; struct task_struct *tctx_task = task ?: current;
struct io_uring_task *tctx = tctx_task->io_uring;
while (1) { while (1) {
enum io_wq_cancel cret; enum io_wq_cancel cret;
...@@ -8671,6 +8655,7 @@ static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx, ...@@ -8671,6 +8655,7 @@ static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
ret |= io_poll_remove_all(ctx, task, files); ret |= io_poll_remove_all(ctx, task, files);
ret |= io_kill_timeouts(ctx, task, files); ret |= io_kill_timeouts(ctx, task, files);
ret |= io_run_task_work(); ret |= io_run_task_work();
ret |= io_run_ctx_fallback(ctx);
io_cqring_overflow_flush(ctx, true, task, files); io_cqring_overflow_flush(ctx, true, task, files);
if (!ret) if (!ret)
break; break;
...@@ -8718,17 +8703,6 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx, ...@@ -8718,17 +8703,6 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx,
} }
} }
static void io_disable_sqo_submit(struct io_ring_ctx *ctx)
{
mutex_lock(&ctx->uring_lock);
ctx->sqo_dead = 1;
mutex_unlock(&ctx->uring_lock);
/* make sure callers enter the ring to get error */
if (ctx->rings)
io_ring_set_wakeup_flag(ctx);
}
/* /*
* We need to iteratively cancel requests, in case a request has dependent * We need to iteratively cancel requests, in case a request has dependent
* hard links. These persist even for failure of cancelations, hence keep * hard links. These persist even for failure of cancelations, hence keep
...@@ -8738,16 +8712,18 @@ static void io_uring_cancel_task_requests(struct io_ring_ctx *ctx, ...@@ -8738,16 +8712,18 @@ static void io_uring_cancel_task_requests(struct io_ring_ctx *ctx,
struct files_struct *files) struct files_struct *files)
{ {
struct task_struct *task = current; struct task_struct *task = current;
bool did_park = false;
if ((ctx->flags & IORING_SETUP_SQPOLL) && ctx->sq_data) { if ((ctx->flags & IORING_SETUP_SQPOLL) && ctx->sq_data) {
io_disable_sqo_submit(ctx); /* never started, nothing to cancel */
did_park = io_sq_thread_park(ctx->sq_data); if (ctx->flags & IORING_SETUP_R_DISABLED) {
if (did_park) { io_sq_offload_start(ctx);
return;
}
io_sq_thread_park(ctx->sq_data);
task = ctx->sq_data->thread; task = ctx->sq_data->thread;
if (task)
atomic_inc(&task->io_uring->in_idle); atomic_inc(&task->io_uring->in_idle);
} }
}
io_cancel_defer_files(ctx, task, files); io_cancel_defer_files(ctx, task, files);
...@@ -8755,10 +8731,10 @@ static void io_uring_cancel_task_requests(struct io_ring_ctx *ctx, ...@@ -8755,10 +8731,10 @@ static void io_uring_cancel_task_requests(struct io_ring_ctx *ctx,
if (!files) if (!files)
io_uring_try_cancel_requests(ctx, task, NULL); io_uring_try_cancel_requests(ctx, task, NULL);
if (did_park) { if (task)
atomic_dec(&task->io_uring->in_idle); atomic_dec(&task->io_uring->in_idle);
if (ctx->sq_data)
io_sq_thread_unpark(ctx->sq_data); io_sq_thread_unpark(ctx->sq_data);
}
} }
/* /*
...@@ -8786,10 +8762,6 @@ static int io_uring_add_task_file(struct io_ring_ctx *ctx, struct file *file) ...@@ -8786,10 +8762,6 @@ static int io_uring_add_task_file(struct io_ring_ctx *ctx, struct file *file)
fput(file); fput(file);
return ret; return ret;
} }
/* one and only SQPOLL file note, held by sqo_task */
WARN_ON_ONCE((ctx->flags & IORING_SETUP_SQPOLL) &&
current != ctx->sqo_task);
} }
tctx->last = file; tctx->last = file;
} }
...@@ -8819,13 +8791,17 @@ static void io_uring_del_task_file(struct file *file) ...@@ -8819,13 +8791,17 @@ static void io_uring_del_task_file(struct file *file)
fput(file); fput(file);
} }
static void io_uring_remove_task_files(struct io_uring_task *tctx) static void io_uring_clean_tctx(struct io_uring_task *tctx)
{ {
struct file *file; struct file *file;
unsigned long index; unsigned long index;
xa_for_each(&tctx->xa, index, file) xa_for_each(&tctx->xa, index, file)
io_uring_del_task_file(file); io_uring_del_task_file(file);
if (tctx->io_wq) {
io_wq_put_and_exit(tctx->io_wq);
tctx->io_wq = NULL;
}
} }
void __io_uring_files_cancel(struct files_struct *files) void __io_uring_files_cancel(struct files_struct *files)
...@@ -8840,13 +8816,8 @@ void __io_uring_files_cancel(struct files_struct *files) ...@@ -8840,13 +8816,8 @@ void __io_uring_files_cancel(struct files_struct *files)
io_uring_cancel_task_requests(file->private_data, files); io_uring_cancel_task_requests(file->private_data, files);
atomic_dec(&tctx->in_idle); atomic_dec(&tctx->in_idle);
if (files) { if (files)
io_uring_remove_task_files(tctx); io_uring_clean_tctx(tctx);
if (tctx->io_wq) {
io_wq_put(tctx->io_wq);
tctx->io_wq = NULL;
}
}
} }
static s64 tctx_inflight(struct io_uring_task *tctx) static s64 tctx_inflight(struct io_uring_task *tctx)
...@@ -8863,11 +8834,12 @@ static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx) ...@@ -8863,11 +8834,12 @@ static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx)
if (!sqd) if (!sqd)
return; return;
io_disable_sqo_submit(ctx); io_sq_thread_park(sqd);
if (!io_sq_thread_park(sqd)) if (!sqd->thread || !sqd->thread->io_uring) {
io_sq_thread_unpark(sqd);
return; return;
}
tctx = ctx->sq_data->thread->io_uring; tctx = ctx->sq_data->thread->io_uring;
atomic_inc(&tctx->in_idle); atomic_inc(&tctx->in_idle);
do { do {
/* read completions before cancelations */ /* read completions before cancelations */
...@@ -8903,7 +8875,6 @@ void __io_uring_task_cancel(void) ...@@ -8903,7 +8875,6 @@ void __io_uring_task_cancel(void)
/* make sure overflow events are dropped */ /* make sure overflow events are dropped */
atomic_inc(&tctx->in_idle); atomic_inc(&tctx->in_idle);
/* trigger io_disable_sqo_submit() */
if (tctx->sqpoll) { if (tctx->sqpoll) {
struct file *file; struct file *file;
unsigned long index; unsigned long index;
...@@ -8933,53 +8904,9 @@ void __io_uring_task_cancel(void) ...@@ -8933,53 +8904,9 @@ void __io_uring_task_cancel(void)
atomic_dec(&tctx->in_idle); atomic_dec(&tctx->in_idle);
io_uring_remove_task_files(tctx); io_uring_clean_tctx(tctx);
} /* all current's requests should be gone, we can kill tctx */
__io_uring_free(current);
static int io_uring_flush(struct file *file, void *data)
{
struct io_uring_task *tctx = current->io_uring;
struct io_ring_ctx *ctx = file->private_data;
/* Ignore helper thread files exit */
if (current->flags & PF_IO_WORKER)
return 0;
if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
io_uring_cancel_task_requests(ctx, NULL);
io_req_caches_free(ctx, current);
}
io_run_ctx_fallback(ctx);
if (!tctx)
return 0;
/* we should have cancelled and erased it before PF_EXITING */
WARN_ON_ONCE((current->flags & PF_EXITING) &&
xa_load(&tctx->xa, (unsigned long)file));
/*
* fput() is pending, will be 2 if the only other ref is our potential
* task file note. If the task is exiting, drop regardless of count.
*/
if (atomic_long_read(&file->f_count) != 2)
return 0;
if (ctx->flags & IORING_SETUP_SQPOLL) {
/* there is only one file note, which is owned by sqo_task */
WARN_ON_ONCE(ctx->sqo_task != current &&
xa_load(&tctx->xa, (unsigned long)file));
/* sqo_dead check is for when this happens after cancellation */
WARN_ON_ONCE(ctx->sqo_task == current && !ctx->sqo_dead &&
!xa_load(&tctx->xa, (unsigned long)file));
io_disable_sqo_submit(ctx);
}
if (!(ctx->flags & IORING_SETUP_SQPOLL) || ctx->sqo_task == current)
io_uring_del_task_file(file);
return 0;
} }
static void *io_uring_validate_mmap_request(struct file *file, static void *io_uring_validate_mmap_request(struct file *file,
...@@ -9060,22 +8987,14 @@ static int io_sqpoll_wait_sq(struct io_ring_ctx *ctx) ...@@ -9060,22 +8987,14 @@ static int io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
do { do {
if (!io_sqring_full(ctx)) if (!io_sqring_full(ctx))
break; break;
prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE); prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);
if (unlikely(ctx->sqo_dead)) {
ret = -EOWNERDEAD;
goto out;
}
if (!io_sqring_full(ctx)) if (!io_sqring_full(ctx))
break; break;
schedule(); schedule();
} while (!signal_pending(current)); } while (!signal_pending(current));
finish_wait(&ctx->sqo_sq_wait, &wait); finish_wait(&ctx->sqo_sq_wait, &wait);
out:
return ret; return ret;
} }
...@@ -9157,8 +9076,6 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, ...@@ -9157,8 +9076,6 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
ctx->sqo_exec = 0; ctx->sqo_exec = 0;
} }
ret = -EOWNERDEAD; ret = -EOWNERDEAD;
if (unlikely(ctx->sqo_dead))
goto out;
if (flags & IORING_ENTER_SQ_WAKEUP) if (flags & IORING_ENTER_SQ_WAKEUP)
wake_up(&ctx->sq_data->wait); wake_up(&ctx->sq_data->wait);
if (flags & IORING_ENTER_SQ_WAIT) { if (flags & IORING_ENTER_SQ_WAIT) {
...@@ -9313,7 +9230,6 @@ static void io_uring_show_fdinfo(struct seq_file *m, struct file *f) ...@@ -9313,7 +9230,6 @@ static void io_uring_show_fdinfo(struct seq_file *m, struct file *f)
static const struct file_operations io_uring_fops = { static const struct file_operations io_uring_fops = {
.release = io_uring_release, .release = io_uring_release,
.flush = io_uring_flush,
.mmap = io_uring_mmap, .mmap = io_uring_mmap,
#ifndef CONFIG_MMU #ifndef CONFIG_MMU
.get_unmapped_area = io_uring_nommu_get_unmapped_area, .get_unmapped_area = io_uring_nommu_get_unmapped_area,
...@@ -9468,7 +9384,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, ...@@ -9468,7 +9384,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
ctx->compat = in_compat_syscall(); ctx->compat = in_compat_syscall();
if (!capable(CAP_IPC_LOCK)) if (!capable(CAP_IPC_LOCK))
ctx->user = get_uid(current_user()); ctx->user = get_uid(current_user());
ctx->sqo_task = current;
/* /*
* This is just grabbed for accounting purposes. When a process exits, * This is just grabbed for accounting purposes. When a process exits,
...@@ -9531,7 +9446,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, ...@@ -9531,7 +9446,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
*/ */
ret = io_uring_install_fd(ctx, file); ret = io_uring_install_fd(ctx, file);
if (ret < 0) { if (ret < 0) {
io_disable_sqo_submit(ctx);
/* fput will clean it up */ /* fput will clean it up */
fput(file); fput(file);
return ret; return ret;
...@@ -9540,7 +9454,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, ...@@ -9540,7 +9454,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p,
trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags); trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
return ret; return ret;
err: err:
io_disable_sqo_submit(ctx);
io_ring_ctx_wait_and_kill(ctx); io_ring_ctx_wait_and_kill(ctx);
return ret; return ret;
} }
...@@ -9708,10 +9621,7 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx) ...@@ -9708,10 +9621,7 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx)
if (ctx->restrictions.registered) if (ctx->restrictions.registered)
ctx->restricted = 1; ctx->restricted = 1;
ctx->flags &= ~IORING_SETUP_R_DISABLED;
io_sq_offload_start(ctx); io_sq_offload_start(ctx);
return 0; return 0;
} }
......
...@@ -38,7 +38,7 @@ void __io_uring_free(struct task_struct *tsk); ...@@ -38,7 +38,7 @@ void __io_uring_free(struct task_struct *tsk);
static inline void io_uring_task_cancel(void) static inline void io_uring_task_cancel(void)
{ {
if (current->io_uring && !xa_empty(&current->io_uring->xa)) if (current->io_uring)
__io_uring_task_cancel(); __io_uring_task_cancel();
} }
static inline void io_uring_files_cancel(struct files_struct *files) static inline void io_uring_files_cancel(struct files_struct *files)
......
...@@ -31,6 +31,7 @@ struct kernel_clone_args { ...@@ -31,6 +31,7 @@ struct kernel_clone_args {
/* Number of elements in *set_tid */ /* Number of elements in *set_tid */
size_t set_tid_size; size_t set_tid_size;
int cgroup; int cgroup;
int io_thread;
struct cgroup *cgrp; struct cgroup *cgrp;
struct css_set *cset; struct css_set *cset;
}; };
...@@ -82,6 +83,7 @@ extern void exit_files(struct task_struct *); ...@@ -82,6 +83,7 @@ extern void exit_files(struct task_struct *);
extern void exit_itimers(struct signal_struct *); extern void exit_itimers(struct signal_struct *);
extern pid_t kernel_clone(struct kernel_clone_args *kargs); extern pid_t kernel_clone(struct kernel_clone_args *kargs);
struct task_struct *create_io_thread(int (*fn)(void *), void *arg, int node);
struct task_struct *fork_idle(int); struct task_struct *fork_idle(int);
struct mm_struct *copy_init_mm(void); struct mm_struct *copy_init_mm(void);
extern pid_t kernel_thread(int (*fn)(void *), void *arg, unsigned long flags); extern pid_t kernel_thread(int (*fn)(void *), void *arg, unsigned long flags);
......
...@@ -1940,6 +1940,8 @@ static __latent_entropy struct task_struct *copy_process( ...@@ -1940,6 +1940,8 @@ static __latent_entropy struct task_struct *copy_process(
p = dup_task_struct(current, node); p = dup_task_struct(current, node);
if (!p) if (!p)
goto fork_out; goto fork_out;
if (args->io_thread)
p->flags |= PF_IO_WORKER;
/* /*
* This _must_ happen before we call free_task(), i.e. before we jump * This _must_ happen before we call free_task(), i.e. before we jump
...@@ -2410,6 +2412,34 @@ struct mm_struct *copy_init_mm(void) ...@@ -2410,6 +2412,34 @@ struct mm_struct *copy_init_mm(void)
return dup_mm(NULL, &init_mm); return dup_mm(NULL, &init_mm);
} }
/*
* This is like kernel_clone(), but shaved down and tailored to just
* creating io_uring workers. It returns a created task, or an error pointer.
* The returned task is inactive, and the caller must fire it up through
* wake_up_new_task(p). All signals are blocked in the created task.
*/
struct task_struct *create_io_thread(int (*fn)(void *), void *arg, int node)
{
unsigned long flags = CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|
CLONE_IO;
struct kernel_clone_args args = {
.flags = ((lower_32_bits(flags) | CLONE_VM |
CLONE_UNTRACED) & ~CSIGNAL),
.exit_signal = (lower_32_bits(flags) & CSIGNAL),
.stack = (unsigned long)fn,
.stack_size = (unsigned long)arg,
.io_thread = 1,
};
struct task_struct *tsk;
tsk = copy_process(NULL, 0, node, &args);
if (!IS_ERR(tsk)) {
sigfillset(&tsk->blocked);
sigdelsetmask(&tsk->blocked, sigmask(SIGKILL));
}
return tsk;
}
/* /*
* Ok, this is the main fork-routine. * Ok, this is the main fork-routine.
* *
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment