Commit 27131549 authored by Jens Axboe

Merge branch 'for-5.12/io_uring' into io_uring-worker.v3

* for-5.12/io_uring: (21 commits)
  io_uring: run task_work on io_uring_register()
  io_uring: fix leaving invalid req->flags
  io_uring: wait potential ->release() on resurrect
  io_uring: keep generic rsrc infra generic
  io_uring: zero ref_node after killing it
  io_uring: make the !CONFIG_NET helpers a bit more robust
  io_uring: don't hold uring_lock when calling io_run_task_work*
  io_uring: fail io-wq submission from a task_work
  io_uring: don't take uring_lock during iowq cancel
  io_uring: fail links more in io_submit_sqe()
  io_uring: don't do async setup for links' heads
  io_uring: do io_*_prep() early in io_submit_sqe()
  io_uring: split sqe-prep and async setup
  io_uring: don't submit link on error
  io_uring: move req link into submit_state
  io_uring: move io_init_req() into io_submit_sqe()
  io_uring: move io_init_req()'s definition
  io_uring: don't duplicate ->file check in sfr
  io_uring: keep io_*_prep() naming consistent
  io_uring: kill fictitious submit iteration index
  ...
parents d99676af b6c23dd5
......@@ -104,6 +104,10 @@
#define IORING_MAX_RESTRICTIONS (IORING_RESTRICTION_LAST + \
IORING_REGISTER_LAST + IORING_OP_LAST)
#define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \
IOSQE_IO_HARDLINK | IOSQE_ASYNC | \
IOSQE_BUFFER_SELECT)
struct io_uring {
u32 head ____cacheline_aligned_in_smp;
u32 tail ____cacheline_aligned_in_smp;
......@@ -232,6 +236,7 @@ struct fixed_rsrc_data {
struct fixed_rsrc_ref_node *node;
struct percpu_ref refs;
struct completion done;
bool quiesce;
};
struct io_buffer {
......@@ -279,8 +284,14 @@ struct io_comp_state {
struct list_head locked_free_list;
};
/*
 * Chain of linked requests being assembled during submission.
 * @head is the first request of the chain; @last is its current tail, i.e.
 * the request whose ->link pointer receives the next linked request.
 */
struct io_submit_link {
struct io_kiocb *head;
struct io_kiocb *last;
};
struct io_submit_state {
struct blk_plug plug;
struct io_submit_link link;
/*
* io_kiocb alloc cache
......@@ -1028,8 +1039,7 @@ static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
static void destroy_fixed_rsrc_ref_node(struct fixed_rsrc_ref_node *ref_node);
static struct fixed_rsrc_ref_node *alloc_fixed_rsrc_ref_node(
struct io_ring_ctx *ctx);
static void init_fixed_file_ref_node(struct io_ring_ctx *ctx,
struct fixed_rsrc_ref_node *ref_node);
static void io_ring_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc);
static bool io_rw_reissue(struct io_kiocb *req);
static void io_cqring_fill_event(struct io_kiocb *req, long res);
......@@ -1096,6 +1106,21 @@ static inline void io_set_resource_node(struct io_kiocb *req)
}
}
/*
 * Try to bring @ref back to life and re-arm @compl.
 *
 * Returns true if a temporary reference could be taken: @ref is then
 * resurrected, @compl is reinitialised and the temporary reference dropped.
 * Returns false if no reference could be taken; @ref is left as it is and,
 * unless @compl had already been completed, an RCU grace period is waited
 * out before returning.
 */
static bool io_refs_resurrect(struct percpu_ref *ref, struct completion *compl)
{
if (!percpu_ref_tryget(ref)) {
/* already at zero, wait for ->release() */
if (!try_wait_for_completion(compl))
synchronize_rcu();
return false;
}
/* we hold our own reference across the resurrect/reinit pair */
percpu_ref_resurrect(ref);
reinit_completion(compl);
percpu_ref_put(ref);
return true;
}
static bool io_match_task(struct io_kiocb *head,
struct task_struct *task,
struct files_struct *files)
......@@ -2329,7 +2354,9 @@ static void io_req_task_cancel(struct callback_head *cb)
struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
struct io_ring_ctx *ctx = req->ctx;
__io_req_task_cancel(req, -ECANCELED);
mutex_lock(&ctx->uring_lock);
__io_req_task_cancel(req, req->result);
mutex_unlock(&ctx->uring_lock);
percpu_ref_put(&ctx->refs);
}
......@@ -2364,11 +2391,22 @@ static void io_req_task_queue(struct io_kiocb *req)
req->task_work.func = io_req_task_submit;
ret = io_req_task_work_add(req);
if (unlikely(ret)) {
req->result = -ECANCELED;
percpu_ref_get(&req->ctx->refs);
io_req_task_work_add_fallback(req, io_req_task_cancel);
}
}
static void io_req_task_queue_fail(struct io_kiocb *req, int ret)
{
percpu_ref_get(&req->ctx->refs);
req->result = ret;
req->task_work.func = io_req_task_cancel;
if (unlikely(io_req_task_work_add(req)))
io_req_task_work_add_fallback(req, io_req_task_cancel);
}
static inline void io_queue_next(struct io_kiocb *req)
{
struct io_kiocb *nxt = io_req_find_next(req);
......@@ -3467,19 +3505,9 @@ static inline int io_rw_prep_async(struct io_kiocb *req, int rw)
static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
ssize_t ret;
ret = io_prep_rw(req, sqe);
if (ret)
return ret;
if (unlikely(!(req->file->f_mode & FMODE_READ)))
return -EBADF;
/* either don't need iovec imported or already have it */
if (!req->async_data)
return 0;
return io_rw_prep_async(req, READ);
return io_prep_rw(req, sqe);
}
/*
......@@ -3607,10 +3635,7 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags)
ret = io_iter_do_read(req, iter);
if (ret == -EIOCBQUEUED) {
/* it's faster to check here than delegate to kfree */
if (iovec)
kfree(iovec);
return 0;
goto out_free;
} else if (ret == -EAGAIN) {
/* IOPOLL retry should happen for io-wq threads */
if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL))
......@@ -3631,6 +3656,7 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags)
if (ret2)
return ret2;
iovec = NULL;
rw = req->async_data;
/* now use our persistent iterator, if we aren't already */
iter = &rw->iter;
......@@ -3657,24 +3683,18 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags)
} while (ret > 0 && ret < io_size);
done:
kiocb_done(kiocb, ret, issue_flags);
out_free:
/* it's faster to check here than delegate to kfree */
if (iovec)
kfree(iovec);
return 0;
}
static int io_write_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
ssize_t ret;
ret = io_prep_rw(req, sqe);
if (ret)
return ret;
if (unlikely(!(req->file->f_mode & FMODE_WRITE)))
return -EBADF;
/* either don't need iovec imported or already have it */
if (!req->async_data)
return 0;
return io_rw_prep_async(req, WRITE);
return io_prep_rw(req, sqe);
}
static int io_write(struct io_kiocb *req, unsigned int issue_flags)
......@@ -4011,7 +4031,7 @@ static int io_nop(struct io_kiocb *req, unsigned int issue_flags)
return 0;
}
static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
static int io_fsync_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_ring_ctx *ctx = req->ctx;
......@@ -4598,13 +4618,10 @@ static int io_close(struct io_kiocb *req, unsigned int issue_flags)
return 0;
}
static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
static int io_sfr_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_ring_ctx *ctx = req->ctx;
if (!req->file)
return -EBADF;
if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
return -EINVAL;
if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
......@@ -4664,11 +4681,21 @@ static int io_sendmsg_copy_hdr(struct io_kiocb *req,
req->sr_msg.msg_flags, &iomsg->free_iov);
}
/*
 * Async preparation for send-type requests.
 *
 * Opcodes whose io_op_defs entry does not set needs_async_data are a no-op.
 * Otherwise the message header is copied into req->async_data through
 * io_sendmsg_copy_hdr() and, on success, the request is flagged
 * REQ_F_NEED_CLEANUP.
 *
 * Returns 0 on success or the error from io_sendmsg_copy_hdr().
 */
static int io_sendmsg_prep_async(struct io_kiocb *req)
{
	int ret = 0;

	if (io_op_defs[req->opcode].needs_async_data) {
		ret = io_sendmsg_copy_hdr(req, req->async_data);
		if (ret == 0)
			req->flags |= REQ_F_NEED_CLEANUP;
	}
	return ret;
}
static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_async_msghdr *async_msg = req->async_data;
struct io_sr_msg *sr = &req->sr_msg;
int ret;
if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
return -EINVAL;
......@@ -4681,13 +4708,7 @@ static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
if (req->ctx->compat)
sr->msg_flags |= MSG_CMSG_COMPAT;
#endif
if (!async_msg || !io_op_defs[req->opcode].needs_async_data)
return 0;
ret = io_sendmsg_copy_hdr(req, async_msg);
if (!ret)
req->flags |= REQ_F_NEED_CLEANUP;
return ret;
return 0;
}
static int io_sendmsg(struct io_kiocb *req, unsigned int issue_flags)
......@@ -4881,13 +4902,22 @@ static inline unsigned int io_put_recv_kbuf(struct io_kiocb *req)
return io_put_kbuf(req, req->sr_msg.kbuf);
}
static int io_recvmsg_prep(struct io_kiocb *req,
const struct io_uring_sqe *sqe)
/*
 * Async preparation for receive-type requests.
 *
 * Does nothing for opcodes whose io_op_defs entry does not set
 * needs_async_data. Otherwise copies the message header into
 * req->async_data with io_recvmsg_copy_hdr() and marks the request
 * REQ_F_NEED_CLEANUP when that succeeds.
 *
 * Returns 0 on success or the error from io_recvmsg_copy_hdr().
 */
static int io_recvmsg_prep_async(struct io_kiocb *req)
{
	int err;

	if (!io_op_defs[req->opcode].needs_async_data)
		return 0;

	err = io_recvmsg_copy_hdr(req, req->async_data);
	if (err)
		return err;

	req->flags |= REQ_F_NEED_CLEANUP;
	return 0;
}
static int io_recvmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_sr_msg *sr = &req->sr_msg;
if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
return -EINVAL;
......@@ -4900,13 +4930,7 @@ static int io_recvmsg_prep(struct io_kiocb *req,
if (req->ctx->compat)
sr->msg_flags |= MSG_CMSG_COMPAT;
#endif
if (!async_msg || !io_op_defs[req->opcode].needs_async_data)
return 0;
ret = io_recvmsg_copy_hdr(req, async_msg);
if (!ret)
req->flags |= REQ_F_NEED_CLEANUP;
return ret;
return 0;
}
static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
......@@ -5059,10 +5083,17 @@ static int io_accept(struct io_kiocb *req, unsigned int issue_flags)
return 0;
}
/*
 * Async preparation for connect: copy the user-supplied socket address
 * (req->connect.addr / addr_len) into the address buffer held in
 * req->async_data. Returns whatever move_addr_to_kernel() returns.
 */
static int io_connect_prep_async(struct io_kiocb *req)
{
	struct io_async_connect *async = req->async_data;

	return move_addr_to_kernel(req->connect.addr, req->connect.addr_len,
				   &async->address);
}
static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_connect *conn = &req->connect;
struct io_async_connect *io = req->async_data;
if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
return -EINVAL;
......@@ -5071,12 +5102,7 @@ static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
conn->addr = u64_to_user_ptr(READ_ONCE(sqe->addr));
conn->addr_len = READ_ONCE(sqe->addr2);
if (!io)
return 0;
return move_addr_to_kernel(conn->addr, conn->addr_len,
&io->address);
return 0;
}
static int io_connect(struct io_kiocb *req, unsigned int issue_flags)
......@@ -5121,56 +5147,32 @@ static int io_connect(struct io_kiocb *req, unsigned int issue_flags)
return 0;
}
#else /* !CONFIG_NET */
static int io_sendmsg_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
return -EOPNOTSUPP;
}
static int io_sendmsg(struct io_kiocb *req, unsigned int issue_flags)
{
return -EOPNOTSUPP;
}
static int io_send(struct io_kiocb *req, unsigned int issue_flags)
{
return -EOPNOTSUPP;
}
static int io_recvmsg_prep(struct io_kiocb *req,
const struct io_uring_sqe *sqe)
{
return -EOPNOTSUPP;
}
static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
{
return -EOPNOTSUPP;
}
static int io_recv(struct io_kiocb *req, unsigned int issue_flags)
{
return -EOPNOTSUPP;
}
static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
return -EOPNOTSUPP;
}
static int io_accept(struct io_kiocb *req, unsigned int issue_flags)
{
return -EOPNOTSUPP;
}
static int io_connect_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
return -EOPNOTSUPP;
}
static int io_connect(struct io_kiocb *req, unsigned int issue_flags)
{
return -EOPNOTSUPP;
}
/*
 * !CONFIG_NET stubs: every networking opcode handler fails with -EOPNOTSUPP.
 *
 *   IO_NETOP_FN(op)          defines io_<op>()
 *   IO_NETOP_PREP(op)        as above, plus io_<op>_prep()
 *   IO_NETOP_PREP_ASYNC(op)  as above, plus io_<op>_prep_async()
 *
 * Each macro body must end WITHOUT a trailing backslash: a continuation on
 * the last line would splice the next #define into this macro's body.
 */
#define IO_NETOP_FN(op) \
static int io_##op(struct io_kiocb *req, unsigned int issue_flags) \
{ \
	return -EOPNOTSUPP; \
}

#define IO_NETOP_PREP(op) \
IO_NETOP_FN(op) \
static int io_##op##_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe) \
{ \
	return -EOPNOTSUPP; \
}

#define IO_NETOP_PREP_ASYNC(op) \
IO_NETOP_PREP(op) \
static int io_##op##_prep_async(struct io_kiocb *req) \
{ \
	return -EOPNOTSUPP; \
}

/* No trailing ';' - each expansion already ends with a function body. */
IO_NETOP_PREP_ASYNC(sendmsg)
IO_NETOP_PREP_ASYNC(recvmsg)
IO_NETOP_PREP_ASYNC(connect)
IO_NETOP_PREP(accept)
IO_NETOP_FN(send)
IO_NETOP_FN(recv)
#endif /* CONFIG_NET */
struct io_poll_table {
......@@ -6084,9 +6086,9 @@ static int io_req_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
case IORING_OP_POLL_REMOVE:
return io_poll_remove_prep(req, sqe);
case IORING_OP_FSYNC:
return io_prep_fsync(req, sqe);
return io_fsync_prep(req, sqe);
case IORING_OP_SYNC_FILE_RANGE:
return io_prep_sfr(req, sqe);
return io_sfr_prep(req, sqe);
case IORING_OP_SENDMSG:
case IORING_OP_SEND:
return io_sendmsg_prep(req, sqe);
......@@ -6144,14 +6146,39 @@ static int io_req_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
return-EINVAL;
}
static int io_req_defer_prep(struct io_kiocb *req,
const struct io_uring_sqe *sqe)
static int io_req_prep_async(struct io_kiocb *req)
{
if (!sqe)
switch (req->opcode) {
case IORING_OP_READV:
case IORING_OP_READ_FIXED:
case IORING_OP_READ:
return io_rw_prep_async(req, READ);
case IORING_OP_WRITEV:
case IORING_OP_WRITE_FIXED:
case IORING_OP_WRITE:
return io_rw_prep_async(req, WRITE);
case IORING_OP_SENDMSG:
case IORING_OP_SEND:
return io_sendmsg_prep_async(req);
case IORING_OP_RECVMSG:
case IORING_OP_RECV:
return io_recvmsg_prep_async(req);
case IORING_OP_CONNECT:
return io_connect_prep_async(req);
}
return 0;
}
static int io_req_defer_prep(struct io_kiocb *req)
{
if (!io_op_defs[req->opcode].needs_async_data)
return 0;
if (io_alloc_async_data(req))
/* some opcodes init it during the initial prep */
if (req->async_data)
return 0;
if (__io_alloc_async_data(req))
return -EAGAIN;
return io_req_prep(req, sqe);
return io_req_prep_async(req);
}
static u32 io_get_sequence(struct io_kiocb *req)
......@@ -6167,7 +6194,7 @@ static u32 io_get_sequence(struct io_kiocb *req)
return total_submitted - nr_reqs;
}
static int io_req_defer(struct io_kiocb *req, const struct io_uring_sqe *sqe)
static int io_req_defer(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
struct io_defer_entry *de;
......@@ -6184,11 +6211,9 @@ static int io_req_defer(struct io_kiocb *req, const struct io_uring_sqe *sqe)
if (!req_need_defer(req, seq) && list_empty_careful(&ctx->defer_list))
return 0;
if (!req->async_data) {
ret = io_req_defer_prep(req, sqe);
if (ret)
return ret;
}
ret = io_req_defer_prep(req);
if (ret)
return ret;
io_prep_async_link(req);
de = kmalloc(sizeof(*de), GFP_KERNEL);
if (!de)
......@@ -6427,29 +6452,11 @@ static void io_wq_submit_work(struct io_wq_work *work)
} while (1);
}
/* avoid locking problems by failing it from a clean context */
if (ret) {
struct io_ring_ctx *lock_ctx = NULL;
if (req->ctx->flags & IORING_SETUP_IOPOLL)
lock_ctx = req->ctx;
/*
* io_iopoll_complete() does not hold completion_lock to
* complete polled io, so here for polled io, we can not call
* io_req_complete() directly, otherwise there maybe concurrent
* access to cqring, defer_list, etc, which is not safe. Given
* that io_iopoll_complete() is always called under uring_lock,
* so here for polled io, we also get uring_lock to complete
* it.
*/
if (lock_ctx)
mutex_lock(&lock_ctx->uring_lock);
req_set_fail_links(req);
io_req_complete(req, ret);
if (lock_ctx)
mutex_unlock(&lock_ctx->uring_lock);
/* io-wq is going to take one down */
refcount_inc(&req->refs);
io_req_task_queue_fail(req, ret);
}
}
......@@ -6607,11 +6614,11 @@ static void __io_queue_sqe(struct io_kiocb *req)
io_queue_linked_timeout(linked_timeout);
}
static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
static void io_queue_sqe(struct io_kiocb *req)
{
int ret;
ret = io_req_defer(req, sqe);
ret = io_req_defer(req);
if (ret) {
if (ret != -EIOCBQUEUED) {
fail_req:
......@@ -6620,42 +6627,148 @@ static void io_queue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe)
io_req_complete(req, ret);
}
} else if (req->flags & REQ_F_FORCE_ASYNC) {
if (!req->async_data) {
ret = io_req_defer_prep(req, sqe);
if (unlikely(ret))
goto fail_req;
}
ret = io_req_defer_prep(req);
if (unlikely(ret))
goto fail_req;
io_queue_async_work(req);
} else {
if (sqe) {
ret = io_req_prep(req, sqe);
if (unlikely(ret))
goto fail_req;
}
__io_queue_sqe(req);
}
}
static inline void io_queue_link_head(struct io_kiocb *req)
/*
* Check SQE restrictions (opcode and flags).
*
* Returns 'true' if SQE is allowed, 'false' otherwise.
*/
static inline bool io_check_restriction(struct io_ring_ctx *ctx,
struct io_kiocb *req,
unsigned int sqe_flags)
{
if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
io_put_req(req);
io_req_complete(req, -ECANCELED);
} else
io_queue_sqe(req, NULL);
if (!ctx->restricted)
return true;
if (!test_bit(req->opcode, ctx->restrictions.sqe_op))
return false;
if ((sqe_flags & ctx->restrictions.sqe_flags_required) !=
ctx->restrictions.sqe_flags_required)
return false;
if (sqe_flags & ~(ctx->restrictions.sqe_flags_allowed |
ctx->restrictions.sqe_flags_required))
return false;
return true;
}
struct io_submit_link {
struct io_kiocb *head;
struct io_kiocb *last;
};
static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
const struct io_uring_sqe *sqe)
{
struct io_submit_state *state;
unsigned int sqe_flags;
int id, ret = 0;
req->opcode = READ_ONCE(sqe->opcode);
/* same numerical values with corresponding REQ_F_*, safe to copy */
req->flags = sqe_flags = READ_ONCE(sqe->flags);
req->user_data = READ_ONCE(sqe->user_data);
req->async_data = NULL;
req->file = NULL;
req->ctx = ctx;
req->link = NULL;
req->fixed_rsrc_refs = NULL;
/* one is dropped after submission, the other at completion */
refcount_set(&req->refs, 2);
req->task = current;
req->result = 0;
static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
struct io_submit_link *link)
/* enforce forwards compatibility on users */
if (unlikely(sqe_flags & ~SQE_VALID_FLAGS)) {
req->flags = 0;
return -EINVAL;
}
if (unlikely(req->opcode >= IORING_OP_LAST))
return -EINVAL;
if (unlikely(io_sq_thread_acquire_mm_files(ctx, req)))
return -EFAULT;
if (unlikely(!io_check_restriction(ctx, req, sqe_flags)))
return -EACCES;
if ((sqe_flags & IOSQE_BUFFER_SELECT) &&
!io_op_defs[req->opcode].buffer_select)
return -EOPNOTSUPP;
id = READ_ONCE(sqe->personality);
if (id) {
struct io_identity *iod;
iod = idr_find(&ctx->personality_idr, id);
if (unlikely(!iod))
return -EINVAL;
refcount_inc(&iod->count);
__io_req_init_async(req);
get_cred(iod->creds);
req->work.identity = iod;
req->work.flags |= IO_WQ_WORK_CREDS;
}
state = &ctx->submit_state;
/*
* Plug now if we have more than 1 IO left after this, and the target
* is potentially a read/write to block based storage.
*/
if (!state->plug_started && state->ios_left > 1 &&
io_op_defs[req->opcode].plug) {
blk_start_plug(&state->plug);
state->plug_started = true;
}
if (io_op_defs[req->opcode].needs_file) {
bool fixed = req->flags & REQ_F_FIXED_FILE;
req->file = io_file_get(state, req, READ_ONCE(sqe->fd), fixed);
if (unlikely(!req->file))
ret = -EBADF;
}
state->ios_left--;
return ret;
}
static int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
const struct io_uring_sqe *sqe)
{
struct io_ring_ctx *ctx = req->ctx;
struct io_submit_link *link = &ctx->submit_state.link;
int ret;
ret = io_init_req(ctx, req, sqe);
if (unlikely(ret)) {
fail_req:
io_put_req(req);
io_req_complete(req, ret);
if (link->head) {
/* fail even hard links since we don't submit */
link->head->flags |= REQ_F_FAIL_LINK;
io_put_req(link->head);
io_req_complete(link->head, -ECANCELED);
link->head = NULL;
}
return ret;
}
ret = io_req_prep(req, sqe);
if (unlikely(ret))
goto fail_req;
/* don't need @sqe from now on */
trace_io_uring_submit_sqe(ctx, req->opcode, req->user_data,
true, ctx->flags & IORING_SETUP_SQPOLL);
/*
* If we already have a head request, queue this one for async
* submittal once the head completes. If we don't have a head but
......@@ -6677,19 +6790,16 @@ static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
head->flags |= REQ_F_IO_DRAIN;
ctx->drain_next = 1;
}
ret = io_req_defer_prep(req, sqe);
if (unlikely(ret)) {
/* fail even hard links since we don't submit */
head->flags |= REQ_F_FAIL_LINK;
return ret;
}
ret = io_req_defer_prep(req);
if (unlikely(ret))
goto fail_req;
trace_io_uring_link(ctx, req, head);
link->last->link = req;
link->last = req;
/* last request of a link, enqueue the link */
if (!(req->flags & (REQ_F_LINK | REQ_F_HARDLINK))) {
io_queue_link_head(head);
io_queue_sqe(head);
link->head = NULL;
}
} else {
......@@ -6698,13 +6808,10 @@ static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
ctx->drain_next = 0;
}
if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) {
ret = io_req_defer_prep(req, sqe);
if (unlikely(ret))
req->flags |= REQ_F_FAIL_LINK;
link->head = req;
link->last = req;
} else {
io_queue_sqe(req, sqe);
io_queue_sqe(req);
}
}
......@@ -6717,6 +6824,8 @@ static int io_submit_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
static void io_submit_state_end(struct io_submit_state *state,
struct io_ring_ctx *ctx)
{
if (state->link.head)
io_queue_sqe(state->link.head);
if (state->comp.nr)
io_submit_flush_completions(&state->comp, ctx);
if (state->plug_started)
......@@ -6732,6 +6841,8 @@ static void io_submit_state_start(struct io_submit_state *state,
{
state->plug_started = false;
state->ios_left = max_ios;
/* set only head, no need to init link_last in advance */
state->link.head = NULL;
}
static void io_commit_sqring(struct io_ring_ctx *ctx)
......@@ -6777,117 +6888,9 @@ static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx)
return NULL;
}
/*
 * Validate an SQE's opcode and flags against the ring's restrictions.
 *
 * A ring without restrictions accepts everything. Otherwise the opcode must
 * be set in restrictions.sqe_op, every flag in sqe_flags_required must be
 * present in @sqe_flags, and no flag outside
 * (sqe_flags_allowed | sqe_flags_required) may be set.
 *
 * Returns 'true' if SQE is allowed, 'false' otherwise.
 */
static inline bool io_check_restriction(struct io_ring_ctx *ctx,
					struct io_kiocb *req,
					unsigned int sqe_flags)
{
	if (!ctx->restricted)
		return true;

	return test_bit(req->opcode, ctx->restrictions.sqe_op) &&
	       (sqe_flags & ctx->restrictions.sqe_flags_required) ==
			ctx->restrictions.sqe_flags_required &&
	       !(sqe_flags & ~(ctx->restrictions.sqe_flags_allowed |
			       ctx->restrictions.sqe_flags_required));
}
#define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \
IOSQE_IO_HARDLINK | IOSQE_ASYNC | \
IOSQE_BUFFER_SELECT)
/*
 * Initialise a freshly allocated request from its SQE.
 *
 * Copies opcode, flags and user_data out of @sqe, resets the per-request
 * fields, then validates the flags, the opcode, the ring restrictions and
 * buffer-select support. A non-zero sqe->personality is looked up in
 * ctx->personality_idr and attached to the request's work item. Finally the
 * block plug is started if worthwhile and the file is resolved for opcodes
 * that need one.
 *
 * Returns 0 on success, or -EINVAL, -EFAULT, -EACCES, -EOPNOTSUPP or -EBADF
 * as set by the individual checks below.
 */
static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
const struct io_uring_sqe *sqe)
{
struct io_submit_state *state;
unsigned int sqe_flags;
int id, ret = 0;
req->opcode = READ_ONCE(sqe->opcode);
/* same numerical values with corresponding REQ_F_*, safe to copy */
req->flags = sqe_flags = READ_ONCE(sqe->flags);
req->user_data = READ_ONCE(sqe->user_data);
req->async_data = NULL;
req->file = NULL;
req->ctx = ctx;
req->link = NULL;
req->fixed_rsrc_refs = NULL;
/* one is dropped after submission, the other at completion */
refcount_set(&req->refs, 2);
req->task = current;
req->result = 0;
/* enforce forwards compatibility on users */
/*
 * NOTE(review): on this error path req->flags still holds the rejected
 * bits copied above - confirm the failure/completion code tolerates that.
 */
if (unlikely(sqe_flags & ~SQE_VALID_FLAGS))
return -EINVAL;
if (unlikely(req->opcode >= IORING_OP_LAST))
return -EINVAL;
if (unlikely(io_sq_thread_acquire_mm_files(ctx, req)))
return -EFAULT;
if (unlikely(!io_check_restriction(ctx, req, sqe_flags)))
return -EACCES;
/* buffer selection requested for an opcode that does not support it */
if ((sqe_flags & IOSQE_BUFFER_SELECT) &&
!io_op_defs[req->opcode].buffer_select)
return -EOPNOTSUPP;
id = READ_ONCE(sqe->personality);
if (id) {
struct io_identity *iod;
iod = idr_find(&ctx->personality_idr, id);
if (unlikely(!iod))
return -EINVAL;
/* take references on the identity and its creds for this request */
refcount_inc(&iod->count);
__io_req_init_async(req);
get_cred(iod->creds);
req->work.identity = iod;
req->work.flags |= IO_WQ_WORK_CREDS;
}
state = &ctx->submit_state;
/*
* Plug now if we have more than 1 IO left after this, and the target
* is potentially a read/write to block based storage.
*/
if (!state->plug_started && state->ios_left > 1 &&
io_op_defs[req->opcode].plug) {
blk_start_plug(&state->plug);
state->plug_started = true;
}
if (io_op_defs[req->opcode].needs_file) {
bool fixed = req->flags & REQ_F_FIXED_FILE;
req->file = io_file_get(state, req, READ_ONCE(sqe->fd), fixed);
/* not an early return: ios_left below is still decremented */
if (unlikely(!req->file))
ret = -EBADF;
}
state->ios_left--;
return ret;
}
static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
{
struct io_submit_link link;
int i, submitted = 0;
int submitted = 0;
/* if we have a backlog and couldn't flush it all, return BUSY */
if (test_bit(0, &ctx->sq_check_overflow)) {
......@@ -6903,14 +6906,11 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
percpu_counter_add(&current->io_uring->inflight, nr);
refcount_add(nr, &current->usage);
io_submit_state_start(&ctx->submit_state, nr);
link.head = NULL;
for (i = 0; i < nr; i++) {
while (submitted < nr) {
const struct io_uring_sqe *sqe;
struct io_kiocb *req;
int err;
req = io_alloc_req(ctx);
if (unlikely(!req)) {
......@@ -6925,20 +6925,8 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
}
/* will complete beyond this point, count as submitted */
submitted++;
err = io_init_req(ctx, req, sqe);
if (unlikely(err)) {
fail_req:
io_put_req(req);
io_req_complete(req, err);
if (io_submit_sqe(ctx, req, sqe))
break;
}
trace_io_uring_submit_sqe(ctx, req->opcode, req->user_data,
true, ctx->flags & IORING_SETUP_SQPOLL);
err = io_submit_sqe(req, sqe, &link);
if (err)
goto fail_req;
}
if (unlikely(submitted != nr)) {
......@@ -6950,10 +6938,8 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
percpu_counter_sub(&tctx->inflight, unused);
put_task_struct_many(current, unused);
}
if (link.head)
io_queue_link_head(link.head);
io_submit_state_end(&ctx->submit_state, ctx);
io_submit_state_end(&ctx->submit_state, ctx);
/* Commit SQ ring head once we've consumed and submitted all SQEs */
io_commit_sqring(ctx);
......@@ -7328,38 +7314,57 @@ static void io_sqe_rsrc_set_node(struct io_ring_ctx *ctx,
percpu_ref_get(&rsrc_data->refs);
}
static int io_rsrc_ref_quiesce(struct fixed_rsrc_data *data,
struct io_ring_ctx *ctx,
struct fixed_rsrc_ref_node *backup_node)
static void io_sqe_rsrc_kill_node(struct io_ring_ctx *ctx, struct fixed_rsrc_data *data)
{
struct fixed_rsrc_ref_node *ref_node;
int ret;
struct fixed_rsrc_ref_node *ref_node = NULL;
io_rsrc_ref_lock(ctx);
ref_node = data->node;
data->node = NULL;
io_rsrc_ref_unlock(ctx);
if (ref_node)
percpu_ref_kill(&ref_node->refs);
}
percpu_ref_kill(&data->refs);
static int io_rsrc_ref_quiesce(struct fixed_rsrc_data *data,
struct io_ring_ctx *ctx,
void (*rsrc_put)(struct io_ring_ctx *ctx,
struct io_rsrc_put *prsrc))
{
struct fixed_rsrc_ref_node *backup_node;
int ret;
if (data->quiesce)
return -ENXIO;
/* wait for all refs nodes to complete */
flush_delayed_work(&ctx->rsrc_put_work);
data->quiesce = true;
do {
ret = -ENOMEM;
backup_node = alloc_fixed_rsrc_ref_node(ctx);
if (!backup_node)
break;
backup_node->rsrc_data = data;
backup_node->rsrc_put = rsrc_put;
io_sqe_rsrc_kill_node(ctx, data);
percpu_ref_kill(&data->refs);
flush_delayed_work(&ctx->rsrc_put_work);
ret = wait_for_completion_interruptible(&data->done);
if (!ret)
if (!ret || !io_refs_resurrect(&data->refs, &data->done))
break;
io_sqe_rsrc_set_node(ctx, data, backup_node);
backup_node = NULL;
mutex_unlock(&ctx->uring_lock);
ret = io_run_task_work_sig();
if (ret < 0) {
percpu_ref_resurrect(&data->refs);
reinit_completion(&data->done);
io_sqe_rsrc_set_node(ctx, data, backup_node);
return ret;
}
} while (1);
mutex_lock(&ctx->uring_lock);
} while (ret >= 0);
data->quiesce = false;
destroy_fixed_rsrc_ref_node(backup_node);
return 0;
if (backup_node)
destroy_fixed_rsrc_ref_node(backup_node);
return ret;
}
static struct fixed_rsrc_data *alloc_fixed_rsrc_data(struct io_ring_ctx *ctx)
......@@ -7390,18 +7395,17 @@ static void free_fixed_rsrc_data(struct fixed_rsrc_data *data)
static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
{
struct fixed_rsrc_data *data = ctx->file_data;
struct fixed_rsrc_ref_node *backup_node;
unsigned nr_tables, i;
int ret;
if (!data)
/*
* percpu_ref_is_dying() is to stop parallel files unregister
* Since we possibly drop uring lock later in this function to
* run task work.
*/
if (!data || percpu_ref_is_dying(&data->refs))
return -ENXIO;
backup_node = alloc_fixed_rsrc_ref_node(ctx);
if (!backup_node)
return -ENOMEM;
init_fixed_file_ref_node(ctx, backup_node);
ret = io_rsrc_ref_quiesce(data, ctx, backup_node);
ret = io_rsrc_ref_quiesce(data, ctx, io_ring_file_put);
if (ret)
return ret;
......@@ -8743,7 +8747,9 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)
css_put(ctx->sqo_blkcg_css);
#endif
mutex_lock(&ctx->uring_lock);
io_sqe_files_unregister(ctx);
mutex_unlock(&ctx->uring_lock);
io_eventfd_unregister(ctx);
io_destroy_buffers(ctx);
idr_destroy(&ctx->personality_idr);
......@@ -10078,10 +10084,8 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
mutex_lock(&ctx->uring_lock);
if (ret) {
percpu_ref_resurrect(&ctx->refs);
goto out_quiesce;
}
if (ret && io_refs_resurrect(&ctx->refs, &ctx->ref_comp))
return ret;
}
if (ctx->restricted) {
......@@ -10173,7 +10177,6 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
if (io_register_op_must_quiesce(opcode)) {
/* bring the ctx back to life */
percpu_ref_reinit(&ctx->refs);
out_quiesce:
reinit_completion(&ctx->ref_comp);
}
return ret;
......@@ -10196,6 +10199,8 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
ctx = f.file->private_data;
io_run_task_work();
mutex_lock(&ctx->uring_lock);
ret = __io_uring_register(ctx, opcode, arg, nr_args);
mutex_unlock(&ctx->uring_lock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment