Commit f66f7342 authored by Pavel Begunkov, committed by Jens Axboe

io_uring: skip spinlocking for ->task_complete

->task_complete was added to serialise CQE posting by doing it from the
task context only (or from the fallback wq when the task is dead), and now
we can use that to avoid taking ->completion_lock while filling CQ entries.
The patch skips spinlocking only in two spots,
__io_submit_flush_completions() and flushing in io_aux_cqe(); that is
safer and covers all the cases we care about. Extra care is taken to force
taking the lock while queueing overflow entries.

It fundamentally relies on SINGLE_ISSUER to have only one task posting
events. It also needs to take overflowed CQEs into account: flushing of
those happens in the CQ wait path, so this implementation also requires
DEFER_TASKRUN to limit waiters. For the same reason we disable it for
SQPOLL, and for IOPOLL, which would not benefit from it in any case. The
DEFER_TASKRUN, SQPOLL and IOPOLL requirements may be relaxed in the
future.
Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/2a8c91fd82cfcdcc1d2e5bac7051fe2c183bda73.1670384893.git.asml.silence@gmail.com
[axboe: modify to apply]
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 6d043ee1
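The fast path this patch adds only applies to rings that provide the
serialisation described above: IORING_SETUP_DEFER_TASKRUN together with
IORING_SETUP_SINGLE_ISSUER, and neither SQPOLL nor IOPOLL. As a purely
illustrative sketch (not part of the patch), a userspace ring that
qualifies could be set up with liburing roughly like this:

/*
 * Illustrative only: a ring whose completions are posted from the
 * submitting task, which is the configuration the lockless CQE posting
 * in the diff below is written for. Assumes a liburing and kernel new
 * enough to support both setup flags.
 */
#include <liburing.h>
#include <stdio.h>

int main(void)
{
	struct io_uring ring;
	unsigned int flags = IORING_SETUP_SINGLE_ISSUER |
			     IORING_SETUP_DEFER_TASKRUN;
	int ret = io_uring_queue_init(8, &ring, flags);

	if (ret < 0) {
		fprintf(stderr, "io_uring_queue_init: %d\n", ret);
		return 1;
	}

	/* submit and reap CQEs strictly from this task ... */

	io_uring_queue_exit(&ring);
	return 0;
}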
@@ -584,13 +584,25 @@ void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
 		io_eventfd_flush_signal(ctx);
 }
 
+static inline void __io_cq_lock(struct io_ring_ctx *ctx)
+	__acquires(ctx->completion_lock)
+{
+	if (!ctx->task_complete)
+		spin_lock(&ctx->completion_lock);
+}
+
+static inline void __io_cq_unlock(struct io_ring_ctx *ctx)
+{
+	if (!ctx->task_complete)
+		spin_unlock(&ctx->completion_lock);
+}
+
 /* keep it inlined for io_submit_flush_completions() */
-static inline void io_cq_unlock_post_inline(struct io_ring_ctx *ctx)
+static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
 	__releases(ctx->completion_lock)
 {
 	io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
+	__io_cq_unlock(ctx);
 	io_commit_cqring_flush(ctx);
 	io_cqring_wake(ctx);
 }

@@ -598,7 +610,10 @@ static inline void io_cq_unlock_post_inline(struct io_ring_ctx *ctx)
 void io_cq_unlock_post(struct io_ring_ctx *ctx)
 	__releases(ctx->completion_lock)
 {
-	io_cq_unlock_post_inline(ctx);
+	io_commit_cqring(ctx);
+	spin_unlock(&ctx->completion_lock);
+	io_commit_cqring_flush(ctx);
+	io_cqring_wake(ctx);
 }
 
 /* Returns true if there are no backlogged entries after the flush */

@@ -785,12 +800,13 @@ struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow)
 	return &rings->cqes[off];
 }
 
-static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
-			    bool allow_overflow)
+static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
+			    u32 cflags)
 {
 	struct io_uring_cqe *cqe;
 
-	lockdep_assert_held(&ctx->completion_lock);
+	if (!ctx->task_complete)
+		lockdep_assert_held(&ctx->completion_lock);
 
 	ctx->cq_extra++;

@@ -813,10 +829,6 @@ static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32
 		}
 		return true;
 	}
-	if (allow_overflow)
-		return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
 	return false;
 }

@@ -830,7 +842,17 @@ static void __io_flush_post_cqes(struct io_ring_ctx *ctx)
 	for (i = 0; i < state->cqes_count; i++) {
 		struct io_uring_cqe *cqe = &state->cqes[i];
 
-		io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags, true);
+		if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) {
+			if (ctx->task_complete) {
+				spin_lock(&ctx->completion_lock);
+				io_cqring_event_overflow(ctx, cqe->user_data,
+							cqe->res, cqe->flags, 0, 0);
+				spin_unlock(&ctx->completion_lock);
+			} else {
+				io_cqring_event_overflow(ctx, cqe->user_data,
+							cqe->res, cqe->flags, 0, 0);
+			}
+		}
 	}
 	state->cqes_count = 0;
 }

@@ -841,7 +863,10 @@ static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u
 	bool filled;
 
 	io_cq_lock(ctx);
-	filled = io_fill_cqe_aux(ctx, user_data, res, cflags, allow_overflow);
+	filled = io_fill_cqe_aux(ctx, user_data, res, cflags);
+	if (!filled && allow_overflow)
+		filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
+
 	io_cq_unlock_post(ctx);
 	return filled;
 }

@@ -865,10 +890,10 @@ bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32
 	lockdep_assert_held(&ctx->uring_lock);
 
 	if (ctx->submit_state.cqes_count == length) {
-		io_cq_lock(ctx);
+		__io_cq_lock(ctx);
 		__io_flush_post_cqes(ctx);
 		/* no need to flush - flush is deferred */
-		io_cq_unlock(ctx);
+		__io_cq_unlock_post(ctx);
 	}
 
 	/* For defered completions this is not as strict as it is otherwise,

@@ -1403,7 +1428,7 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
 	struct io_wq_work_node *node, *prev;
 	struct io_submit_state *state = &ctx->submit_state;
 
-	io_cq_lock(ctx);
+	__io_cq_lock(ctx);
 	/* must come first to preserve CQE ordering in failure cases */
 	if (state->cqes_count)
 		__io_flush_post_cqes(ctx);

@@ -1411,10 +1436,18 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
 					    comp_list);
 
-		if (!(req->flags & REQ_F_CQE_SKIP))
-			__io_fill_cqe_req(ctx, req);
+		if (!(req->flags & REQ_F_CQE_SKIP) &&
+		    unlikely(!__io_fill_cqe_req(ctx, req))) {
+			if (ctx->task_complete) {
+				spin_lock(&ctx->completion_lock);
+				io_req_cqe_overflow(req);
+				spin_unlock(&ctx->completion_lock);
+			} else {
+				io_req_cqe_overflow(req);
+			}
+		}
 	}
-	io_cq_unlock_post_inline(ctx);
+	__io_cq_unlock_post(ctx);
 
 	if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
 		io_free_batch_list(ctx, state->compl_reqs.first);
@@ -133,7 +133,7 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
 	 */
 	cqe = io_get_cqe(ctx);
 	if (unlikely(!cqe))
-		return io_req_cqe_overflow(req);
+		return false;
 
 	trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
 				req->cqe.res, req->cqe.flags,

@@ -156,6 +156,14 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
 	return true;
 }
 
+static inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
+				   struct io_kiocb *req)
+{
+	if (likely(__io_fill_cqe_req(ctx, req)))
+		return true;
+	return io_req_cqe_overflow(req);
+}
+
 static inline void req_set_fail(struct io_kiocb *req)
 {
 	req->flags |= REQ_F_FAIL;