Commit aa1df3a3 authored by Pavel Begunkov's avatar Pavel Begunkov Committed by Jens Axboe

io_uring: fix CQE reordering

Overflowing CQEs may result in reordering, which is buggy in case of
links, F_MORE and so on. If we guarantee that we don't reorder for
the unlikely event of a CQ ring overflow, then we can further extend
this to not have to terminate multishot requests if it happens. For
other operations, like zerocopy sends, we have no choice but to honor
CQE ordering.
Reported-by: default avatarDylan Yudaken <dylany@fb.com>
Signed-off-by: default avatarPavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/ec3bc55687b0768bbe20fb62d7d06cfced7d7e70.1663892031.git.asml.silence@gmail.comSigned-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent a75155fa
......@@ -609,7 +609,7 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
io_cq_lock(ctx);
while (!list_empty(&ctx->cq_overflow_list)) {
struct io_uring_cqe *cqe = io_get_cqe(ctx);
struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true);
struct io_overflow_cqe *ocqe;
if (!cqe && !force)
......@@ -736,12 +736,19 @@ bool io_req_cqe_overflow(struct io_kiocb *req)
* control dependency is enough as we're using WRITE_ONCE to
* fill the cq entry
*/
struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx)
struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow)
{
struct io_rings *rings = ctx->rings;
unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
unsigned int free, queued, len;
/*
* Posting into the CQ when there are pending overflowed CQEs may break
* ordering guarantees, which will affect links, F_MORE users and more.
* Force overflow the completion.
*/
if (!overflow && (ctx->check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)))
return NULL;
/* userspace may cheat modifying the tail, be safe and do min */
queued = min(__io_cqring_events(ctx), ctx->cq_entries);
......@@ -2394,6 +2401,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
if (ret < 0)
return ret;
io_cqring_overflow_flush(ctx);
if (io_cqring_events(ctx) >= min_events)
return 0;
} while (ret > 0);
......
......@@ -24,7 +24,7 @@ enum {
IOU_STOP_MULTISHOT = -ECANCELED,
};
struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx);
struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow);
bool io_req_cqe_overflow(struct io_kiocb *req);
int io_run_task_work_sig(struct io_ring_ctx *ctx);
int __io_run_local_work(struct io_ring_ctx *ctx, bool locked);
......@@ -93,7 +93,8 @@ static inline void io_cq_lock(struct io_ring_ctx *ctx)
void io_cq_unlock_post(struct io_ring_ctx *ctx);
static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
static inline struct io_uring_cqe *io_get_cqe_overflow(struct io_ring_ctx *ctx,
bool overflow)
{
if (likely(ctx->cqe_cached < ctx->cqe_sentinel)) {
struct io_uring_cqe *cqe = ctx->cqe_cached;
......@@ -105,7 +106,12 @@ static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
return cqe;
}
return __io_get_cqe(ctx);
return __io_get_cqe(ctx, overflow);
}
static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
{
return io_get_cqe_overflow(ctx, false);
}
static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment