io_uring.c 105 KB
Newer Older
Jens Axboe's avatar
Jens Axboe committed
1 2 3 4 5 6
// SPDX-License-Identifier: GPL-2.0
/*
 * Shared application/kernel submission and completion ring pairs, for
 * supporting fast/efficient IO.
 *
 * A note on the read/write ordering memory barriers that are matched between
7 8 9 10 11 12 13
 * the application and kernel side.
 *
 * After the application reads the CQ ring tail, it must use an
 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
 * before writing the tail (using smp_load_acquire to read the tail will
 * do). It also needs a smp_mb() before updating CQ head (ordering the
 * entry load(s) with the head store), pairing with an implicit barrier
14
 * through a control-dependency in io_get_cqe (smp_store_release to
15 16 17 18 19 20 21 22 23 24 25 26 27 28
 * store head will do). Failure to do so could lead to reading invalid
 * CQ entries.
 *
 * Likewise, the application must use an appropriate smp_wmb() before
 * writing the SQ tail (ordering SQ entry stores with the tail store),
 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
 * to store the tail will do). And it needs a barrier ordering the SQ
 * head load before writing new SQ entries (smp_load_acquire to read
 * head will do).
 *
 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
 * updating the SQ tail; a full memory barrier smp_mb() is needed
 * between.
Jens Axboe's avatar
Jens Axboe committed
29 30 31 32 33 34 35 36 37 38 39
 *
 * Also see the examples in the liburing library:
 *
 *	git://git.kernel.dk/liburing
 *
 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
 * from data shared between the kernel and application. This is done both
 * for ordering purposes, but also to ensure that once a value is loaded from
 * data that the application could potentially modify, it remains stable.
 *
 * Copyright (C) 2018-2019 Jens Axboe
40
 * Copyright (c) 2018-2019 Christoph Hellwig
Jens Axboe's avatar
Jens Axboe committed
41 42 43 44 45
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/syscalls.h>
46
#include <net/compat.h>
Jens Axboe's avatar
Jens Axboe committed
47 48
#include <linux/refcount.h>
#include <linux/uio.h>
49
#include <linux/bits.h>
Jens Axboe's avatar
Jens Axboe committed
50 51 52 53 54 55 56 57 58

#include <linux/sched/signal.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/fdtable.h>
#include <linux/mm.h>
#include <linux/mman.h>
#include <linux/percpu.h>
#include <linux/slab.h>
59
#include <linux/bvec.h>
Jens Axboe's avatar
Jens Axboe committed
60 61 62
#include <linux/net.h>
#include <net/sock.h>
#include <net/af_unix.h>
63
#include <net/scm.h>
Jens Axboe's avatar
Jens Axboe committed
64 65 66 67
#include <linux/anon_inodes.h>
#include <linux/sched/mm.h>
#include <linux/uaccess.h>
#include <linux/nospec.h>
68
#include <linux/highmem.h>
69
#include <linux/fsnotify.h>
70
#include <linux/fadvise.h>
71
#include <linux/task_work.h>
72
#include <linux/io_uring.h>
73
#include <linux/audit.h>
74
#include <linux/security.h>
Jens Axboe's avatar
Jens Axboe committed
75

76 77 78
#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>

Jens Axboe's avatar
Jens Axboe committed
79 80
#include <uapi/linux/io_uring.h>

81
#include "io-wq.h"
Jens Axboe's avatar
Jens Axboe committed
82

83
#include "io_uring.h"
84
#include "opdef.h"
85
#include "refs.h"
86
#include "tctx.h"
87
#include "sqpoll.h"
88
#include "fdinfo.h"
89
#include "kbuf.h"
90
#include "rsrc.h"
91
#include "cancel.h"
Jens Axboe's avatar
Jens Axboe committed
92
#include "net.h"
93
#include "notif.h"
94

95
#include "timeout.h"
96
#include "poll.h"
97
#include "alloc_cache.h"
98

99
#define IORING_MAX_ENTRIES	32768
100
#define IORING_MAX_CQ_ENTRIES	(2 * IORING_MAX_ENTRIES)
101

102 103
#define IORING_MAX_RESTRICTIONS	(IORING_RESTRICTION_LAST + \
				 IORING_REGISTER_LAST + IORING_OP_LAST)
Jens Axboe's avatar
Jens Axboe committed
104

105 106 107
#define SQE_COMMON_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_LINK | \
			  IOSQE_IO_HARDLINK | IOSQE_ASYNC)

108 109
#define SQE_VALID_FLAGS	(SQE_COMMON_FLAGS | IOSQE_BUFFER_SELECT | \
			IOSQE_IO_DRAIN | IOSQE_CQE_SKIP_SUCCESS)
110

111
#define IO_REQ_CLEAN_FLAGS (REQ_F_BUFFER_SELECTED | REQ_F_NEED_CLEANUP | \
112 113
				REQ_F_POLLED | REQ_F_INFLIGHT | REQ_F_CREDS | \
				REQ_F_ASYNC_DATA)
114

115 116 117
#define IO_REQ_CLEAN_SLOW_FLAGS (REQ_F_REFCOUNT | REQ_F_LINK | REQ_F_HARDLINK |\
				 IO_REQ_CLEAN_FLAGS)

118 119
#define IO_TCTX_REFS_CACHE_NR	(1U << 10)

120
#define IO_COMPL_BATCH			32
121
#define IO_REQ_ALLOC_BATCH		8
122

123 124
enum {
	IO_CHECK_CQ_OVERFLOW_BIT,
125
	IO_CHECK_CQ_DROPPED_BIT,
126 127
};

128 129 130 131 132
enum {
	IO_EVENTFD_OP_SIGNAL_BIT,
	IO_EVENTFD_OP_FREE_BIT,
};

133 134 135
struct io_defer_entry {
	struct list_head	list;
	struct io_kiocb		*req;
136
	u32			seq;
Jens Axboe's avatar
Jens Axboe committed
137 138
};

139 140
/* requests with any of those set should undergo io_disarm_next() */
#define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
141
#define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
142

143
static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
144
					 struct task_struct *task,
145
					 bool cancel_all);
146

147
static void io_dismantle_req(struct io_kiocb *req);
148
static void io_clean_op(struct io_kiocb *req);
149
static void io_queue_sqe(struct io_kiocb *req);
150
static void io_move_task_work_from_local(struct io_ring_ctx *ctx);
151
static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
152

Jens Axboe's avatar
Jens Axboe committed
153 154 155 156 157
static struct kmem_cache *req_cachep;

struct sock *io_uring_get_socket(struct file *file)
{
#if defined(CONFIG_UNIX)
158
	if (io_is_uring_fops(file)) {
Jens Axboe's avatar
Jens Axboe committed
159 160 161 162 163 164 165 166 167
		struct io_ring_ctx *ctx = file->private_data;

		return ctx->ring_sock->sk;
	}
#endif
	return NULL;
}
EXPORT_SYMBOL(io_uring_get_socket);

168 169
static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
{
170
	if (!wq_list_empty(&ctx->submit_state.compl_reqs))
171 172 173
		__io_submit_flush_completions(ctx);
}

174 175 176 177 178
static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
{
	return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
}

179 180 181 182 183 184 185 186 187
static bool io_match_linked(struct io_kiocb *head)
{
	struct io_kiocb *req;

	io_for_each_link(req, head) {
		if (req->flags & REQ_F_INFLIGHT)
			return true;
	}
	return false;
188 189 190 191 192 193
}

/*
 * As io_match_task() but protected against racing with linked timeouts.
 * User must not hold timeout_lock.
 */
194 195
bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
			bool cancel_all)
196
{
197 198
	bool matched;

199 200
	if (task && head->task != task)
		return false;
201 202 203 204 205 206 207 208 209 210 211 212 213 214
	if (cancel_all)
		return true;

	if (head->flags & REQ_F_LINK_TIMEOUT) {
		struct io_ring_ctx *ctx = head->ctx;

		/* protect against races with linked timeouts */
		spin_lock_irq(&ctx->timeout_lock);
		matched = io_match_linked(head);
		spin_unlock_irq(&ctx->timeout_lock);
	} else {
		matched = io_match_linked(head);
	}
	return matched;
215 216
}

217 218 219
static inline void req_fail_link_node(struct io_kiocb *req, int res)
{
	req_set_fail(req);
220
	io_req_set_res(req, res, 0);
221 222
}

223 224 225
static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx)
{
	wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list);
226 227
}

228
static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
Jens Axboe's avatar
Jens Axboe committed
229 230 231
{
	struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);

232
	complete(&ctx->ref_comp);
Jens Axboe's avatar
Jens Axboe committed
233 234
}

235
static __cold void io_fallback_req_func(struct work_struct *work)
236 237 238 239 240
{
	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
						fallback_work.work);
	struct llist_node *node = llist_del_all(&ctx->fallback_llist);
	struct io_kiocb *req, *tmp;
241
	bool locked = false;
242 243

	percpu_ref_get(&ctx->refs);
244
	llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
245
		req->io_task_work.func(req, &locked);
246

247
	if (locked) {
248
		io_submit_flush_completions(ctx);
249 250
		mutex_unlock(&ctx->uring_lock);
	}
251 252 253
	percpu_ref_put(&ctx->refs);
}

254 255 256 257 258 259 260 261 262 263 264 265 266 267
static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits)
{
	unsigned hash_buckets = 1U << bits;
	size_t hash_size = hash_buckets * sizeof(table->hbs[0]);

	table->hbs = kmalloc(hash_size, GFP_KERNEL);
	if (!table->hbs)
		return -ENOMEM;

	table->hash_bits = bits;
	init_hash_table(table, hash_buckets);
	return 0;
}

268
static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
Jens Axboe's avatar
Jens Axboe committed
269 270
{
	struct io_ring_ctx *ctx;
271
	int hash_bits;
Jens Axboe's avatar
Jens Axboe committed
272 273 274 275 276

	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return NULL;

277 278
	xa_init(&ctx->io_bl_xa);

279 280
	/*
	 * Use 5 bits less than the max cq entries, that should give us around
281 282
	 * 32 entries per hash list if totally full and uniformly spread, but
	 * don't keep too many buckets to not overconsume memory.
283
	 */
284 285
	hash_bits = ilog2(p->cq_entries) - 5;
	hash_bits = clamp(hash_bits, 1, 8);
286
	if (io_alloc_hash_table(&ctx->cancel_table, hash_bits))
287
		goto err;
288 289
	if (io_alloc_hash_table(&ctx->cancel_table_locked, hash_bits))
		goto err;
290

291 292 293 294 295 296
	ctx->dummy_ubuf = kzalloc(sizeof(*ctx->dummy_ubuf), GFP_KERNEL);
	if (!ctx->dummy_ubuf)
		goto err;
	/* set invalid range, so io_import_fixed() fails meeting it */
	ctx->dummy_ubuf->ubuf = -1UL;

297
	if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
298
			    0, GFP_KERNEL))
299
		goto err;
Jens Axboe's avatar
Jens Axboe committed
300 301

	ctx->flags = p->flags;
302
	init_waitqueue_head(&ctx->sqo_sq_wait);
303
	INIT_LIST_HEAD(&ctx->sqd_list);
304
	INIT_LIST_HEAD(&ctx->cq_overflow_list);
305
	INIT_LIST_HEAD(&ctx->io_buffers_cache);
306
	io_alloc_cache_init(&ctx->apoll_cache);
Jens Axboe's avatar
Jens Axboe committed
307
	io_alloc_cache_init(&ctx->netmsg_cache);
308
	init_completion(&ctx->ref_comp);
309
	xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
Jens Axboe's avatar
Jens Axboe committed
310
	mutex_init(&ctx->uring_lock);
311
	init_waitqueue_head(&ctx->cq_wait);
Jens Axboe's avatar
Jens Axboe committed
312
	spin_lock_init(&ctx->completion_lock);
313
	spin_lock_init(&ctx->timeout_lock);
314
	INIT_WQ_LIST(&ctx->iopoll_list);
315 316
	INIT_LIST_HEAD(&ctx->io_buffers_pages);
	INIT_LIST_HEAD(&ctx->io_buffers_comp);
317
	INIT_LIST_HEAD(&ctx->defer_list);
318
	INIT_LIST_HEAD(&ctx->timeout_list);
319
	INIT_LIST_HEAD(&ctx->ltimeout_list);
320 321
	spin_lock_init(&ctx->rsrc_ref_lock);
	INIT_LIST_HEAD(&ctx->rsrc_ref_list);
322 323
	INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
	init_llist_head(&ctx->rsrc_put_llist);
324
	init_llist_head(&ctx->work_llist);
325
	INIT_LIST_HEAD(&ctx->tctx_list);
326 327
	ctx->submit_state.free_list.next = NULL;
	INIT_WQ_LIST(&ctx->locked_free_list);
328
	INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
329
	INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
Jens Axboe's avatar
Jens Axboe committed
330
	return ctx;
331
err:
332
	kfree(ctx->dummy_ubuf);
333
	kfree(ctx->cancel_table.hbs);
334
	kfree(ctx->cancel_table_locked.hbs);
335 336
	kfree(ctx->io_bl);
	xa_destroy(&ctx->io_bl_xa);
337 338
	kfree(ctx);
	return NULL;
Jens Axboe's avatar
Jens Axboe committed
339 340
}

341 342 343 344 345 346 347 348
static void io_account_cq_overflow(struct io_ring_ctx *ctx)
{
	struct io_rings *r = ctx->rings;

	WRITE_ONCE(r->cq_overflow, READ_ONCE(r->cq_overflow) + 1);
	ctx->cq_extra--;
}

349
static bool req_need_defer(struct io_kiocb *req, u32 seq)
350
{
351 352
	if (unlikely(req->flags & REQ_F_IO_DRAIN)) {
		struct io_ring_ctx *ctx = req->ctx;
353

354
		return seq + READ_ONCE(ctx->cq_extra) != ctx->cached_cq_tail;
355
	}
356

357
	return false;
358 359
}

360 361 362 363
static inline void io_req_track_inflight(struct io_kiocb *req)
{
	if (!(req->flags & REQ_F_INFLIGHT)) {
		req->flags |= REQ_F_INFLIGHT;
364
		atomic_inc(&req->task->io_uring->inflight_tracked);
365 366 367
	}
}

368 369
static struct io_kiocb *__io_prep_linked_timeout(struct io_kiocb *req)
{
370 371 372
	if (WARN_ON_ONCE(!req->link))
		return NULL;

373 374
	req->flags &= ~REQ_F_ARM_LTIMEOUT;
	req->flags |= REQ_F_LINK_TIMEOUT;
375 376

	/* linked timeouts should have two refs once prep'ed */
377
	io_req_set_refcount(req);
378 379
	__io_req_set_refcount(req->link, 2);
	return req->link;
380 381 382 383
}

static inline struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
{
384
	if (likely(!(req->flags & REQ_F_ARM_LTIMEOUT)))
385 386 387 388
		return NULL;
	return __io_prep_linked_timeout(req);
}

389 390 391 392 393 394 395 396 397 398 399
static noinline void __io_arm_ltimeout(struct io_kiocb *req)
{
	io_queue_linked_timeout(__io_prep_linked_timeout(req));
}

static inline void io_arm_ltimeout(struct io_kiocb *req)
{
	if (unlikely(req->flags & REQ_F_ARM_LTIMEOUT))
		__io_arm_ltimeout(req);
}

400 401 402 403 404
static void io_prep_async_work(struct io_kiocb *req)
{
	const struct io_op_def *def = &io_op_defs[req->opcode];
	struct io_ring_ctx *ctx = req->ctx;

405 406
	if (!(req->flags & REQ_F_CREDS)) {
		req->flags |= REQ_F_CREDS;
407
		req->creds = get_current_cred();
408
	}
409

410 411
	req->work.list.next = NULL;
	req->work.flags = 0;
412
	req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
413 414 415
	if (req->flags & REQ_F_FORCE_ASYNC)
		req->work.flags |= IO_WQ_WORK_CONCURRENT;

416 417 418
	if (req->file && !io_req_ffs_set(req))
		req->flags |= io_file_get_flags(req->file) << REQ_F_SUPPORT_NOWAIT_BIT;

419 420 421
	if (req->flags & REQ_F_ISREG) {
		if (def->hash_reg_file || (ctx->flags & IORING_SETUP_IOPOLL))
			io_wq_hash_work(&req->work, file_inode(req->file));
422
	} else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
423 424 425
		if (def->unbound_nonreg_file)
			req->work.flags |= IO_WQ_WORK_UNBOUND;
	}
426
}
427

428
static void io_prep_async_link(struct io_kiocb *req)
429
{
430
	struct io_kiocb *cur;
431

432 433 434
	if (req->flags & REQ_F_LINK_TIMEOUT) {
		struct io_ring_ctx *ctx = req->ctx;

435
		spin_lock_irq(&ctx->timeout_lock);
436 437
		io_for_each_link(cur, req)
			io_prep_async_work(cur);
438
		spin_unlock_irq(&ctx->timeout_lock);
439 440 441 442
	} else {
		io_for_each_link(cur, req)
			io_prep_async_work(cur);
	}
443 444
}

445
void io_queue_iowq(struct io_kiocb *req, bool *dont_use)
446
{
447
	struct io_kiocb *link = io_prep_linked_timeout(req);
448
	struct io_uring_task *tctx = req->task->io_uring;
449

450 451
	BUG_ON(!tctx);
	BUG_ON(!tctx->io_wq);
452

453 454
	/* init ->work of the whole link before punting */
	io_prep_async_link(req);
455 456 457 458 459 460 461 462 463 464 465

	/*
	 * Not expected to happen, but if we do have a bug where this _can_
	 * happen, catch it here and ensure the request is marked as
	 * canceled. That will make io-wq go through the usual work cancel
	 * procedure rather than attempt to run this request (or create a new
	 * worker for it).
	 */
	if (WARN_ON_ONCE(!same_thread_group(req->task, current)))
		req->work.flags |= IO_WQ_WORK_CANCEL;

466
	trace_io_uring_queue_async_work(req, io_wq_is_hashed(&req->work));
467
	io_wq_enqueue(tctx->io_wq, &req->work);
468 469
	if (link)
		io_queue_linked_timeout(link);
470 471
}

472
static __cold void io_queue_deferred(struct io_ring_ctx *ctx)
473
{
474
	while (!list_empty(&ctx->defer_list)) {
475 476
		struct io_defer_entry *de = list_first_entry(&ctx->defer_list,
						struct io_defer_entry, list);
477

478
		if (req_need_defer(de->req, de->seq))
479
			break;
480
		list_del_init(&de->list);
481
		io_req_task_queue(de->req);
482
		kfree(de);
483
	}
484 485
}

486 487

static void io_eventfd_ops(struct rcu_head *rcu)
488 489
{
	struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu);
490 491 492 493
	int ops = atomic_xchg(&ev_fd->ops, 0);

	if (ops & BIT(IO_EVENTFD_OP_SIGNAL_BIT))
		eventfd_signal(ev_fd->cq_ev_fd, 1);
494

495 496 497 498 499 500 501 502
	/* IO_EVENTFD_OP_FREE_BIT may not be set here depending on callback
	 * ordering in a race but if references are 0 we know we have to free
	 * it regardless.
	 */
	if (atomic_dec_and_test(&ev_fd->refs)) {
		eventfd_ctx_put(ev_fd->cq_ev_fd);
		kfree(ev_fd);
	}
503 504
}

505
static void io_eventfd_signal(struct io_ring_ctx *ctx)
506
{
507
	struct io_ev_fd *ev_fd = NULL;
508 509 510 511 512 513 514 515 516 517 518 519 520 521 522

	rcu_read_lock();
	/*
	 * rcu_dereference ctx->io_ev_fd once and use it for both for checking
	 * and eventfd_signal
	 */
	ev_fd = rcu_dereference(ctx->io_ev_fd);

	/*
	 * Check again if ev_fd exists incase an io_eventfd_unregister call
	 * completed between the NULL check of ctx->io_ev_fd at the start of
	 * the function and rcu_read_lock.
	 */
	if (unlikely(!ev_fd))
		goto out;
523
	if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED)
524
		goto out;
525 526
	if (ev_fd->eventfd_async && !io_wq_current_is_worker())
		goto out;
527

528
	if (likely(eventfd_signal_allowed())) {
529
		eventfd_signal(ev_fd->cq_ev_fd, 1);
530 531 532 533 534 535 536 537
	} else {
		atomic_inc(&ev_fd->refs);
		if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops))
			call_rcu(&ev_fd->rcu, io_eventfd_ops);
		else
			atomic_dec(&ev_fd->refs);
	}

538 539
out:
	rcu_read_unlock();
540 541
}

542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564
static void io_eventfd_flush_signal(struct io_ring_ctx *ctx)
{
	bool skip;

	spin_lock(&ctx->completion_lock);

	/*
	 * Eventfd should only get triggered when at least one event has been
	 * posted. Some applications rely on the eventfd notification count
	 * only changing IFF a new CQE has been added to the CQ ring. There's
	 * no depedency on 1:1 relationship between how many times this
	 * function is called (and hence the eventfd count) and number of CQEs
	 * posted to the CQ ring.
	 */
	skip = ctx->cached_cq_tail == ctx->evfd_last_cq_tail;
	ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
	spin_unlock(&ctx->completion_lock);
	if (skip)
		return;

	io_eventfd_signal(ctx);
}

565 566 567 568 569 570 571 572 573 574 575
void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
{
	if (ctx->off_timeout_used || ctx->drain_active) {
		spin_lock(&ctx->completion_lock);
		if (ctx->off_timeout_used)
			io_flush_timeouts(ctx);
		if (ctx->drain_active)
			io_queue_deferred(ctx);
		spin_unlock(&ctx->completion_lock);
	}
	if (ctx->has_evfd)
576
		io_eventfd_flush_signal(ctx);
577 578
}

579
static inline void io_cqring_ev_posted(struct io_ring_ctx *ctx)
580
{
581
	io_commit_cqring_flush(ctx);
582
	io_cqring_wake(ctx);
583 584
}

585 586 587 588 589 590 591 592 593 594 595 596 597
static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
	__releases(ctx->completion_lock)
{
	io_commit_cqring(ctx);
	spin_unlock(&ctx->completion_lock);
	io_cqring_ev_posted(ctx);
}

void io_cq_unlock_post(struct io_ring_ctx *ctx)
{
	__io_cq_unlock_post(ctx);
}

598
/* Returns true if there are no backlogged entries after the flush */
599
static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
600
{
601
	bool all_flushed;
602
	size_t cqe_size = sizeof(struct io_uring_cqe);
603

604
	if (!force && __io_cqring_events(ctx) == ctx->cq_entries)
605
		return false;
606

607 608 609
	if (ctx->flags & IORING_SETUP_CQE32)
		cqe_size <<= 1;

610
	io_cq_lock(ctx);
611
	while (!list_empty(&ctx->cq_overflow_list)) {
612
		struct io_uring_cqe *cqe = io_get_cqe(ctx);
613
		struct io_overflow_cqe *ocqe;
614

615 616
		if (!cqe && !force)
			break;
617 618 619
		ocqe = list_first_entry(&ctx->cq_overflow_list,
					struct io_overflow_cqe, list);
		if (cqe)
620
			memcpy(cqe, &ocqe->cqe, cqe_size);
621
		else
622 623
			io_account_cq_overflow(ctx);

624 625
		list_del(&ocqe->list);
		kfree(ocqe);
626 627
	}

628 629
	all_flushed = list_empty(&ctx->cq_overflow_list);
	if (all_flushed) {
630
		clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
631
		atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
632
	}
633

634
	io_cq_unlock_post(ctx);
635
	return all_flushed;
636 637
}

638
static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx)
639
{
640 641
	bool ret = true;

642
	if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
643 644 645
		/* iopoll syncs against uring_lock, not completion_lock */
		if (ctx->flags & IORING_SETUP_IOPOLL)
			mutex_lock(&ctx->uring_lock);
646
		ret = __io_cqring_overflow_flush(ctx, false);
647 648 649
		if (ctx->flags & IORING_SETUP_IOPOLL)
			mutex_unlock(&ctx->uring_lock);
	}
650 651

	return ret;
652 653
}

654
void __io_put_task(struct task_struct *task, int nr)
655 656 657
{
	struct io_uring_task *tctx = task->io_uring;

658 659 660 661 662 663
	percpu_counter_sub(&tctx->inflight, nr);
	if (unlikely(atomic_read(&tctx->in_idle)))
		wake_up(&tctx->wait);
	put_task_struct_many(task, nr);
}

664
void io_task_refs_refill(struct io_uring_task *tctx)
665 666 667 668 669 670 671 672
{
	unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;

	percpu_counter_add(&tctx->inflight, refill);
	refcount_add(refill, &current->usage);
	tctx->cached_refs += refill;
}

673 674 675 676 677 678 679 680 681 682 683 684
static __cold void io_uring_drop_tctx_refs(struct task_struct *task)
{
	struct io_uring_task *tctx = task->io_uring;
	unsigned int refs = tctx->cached_refs;

	if (refs) {
		tctx->cached_refs = 0;
		percpu_counter_sub(&tctx->inflight, refs);
		put_task_struct_many(task, refs);
	}
}

685 686
static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
				     s32 res, u32 cflags, u64 extra1, u64 extra2)
Jens Axboe's avatar
Jens Axboe committed
687
{
688
	struct io_overflow_cqe *ocqe;
689 690
	size_t ocq_size = sizeof(struct io_overflow_cqe);
	bool is_cqe32 = (ctx->flags & IORING_SETUP_CQE32);
Jens Axboe's avatar
Jens Axboe committed
691

692 693
	if (is_cqe32)
		ocq_size += sizeof(struct io_uring_cqe);
Jens Axboe's avatar
Jens Axboe committed
694

695
	ocqe = kmalloc(ocq_size, GFP_ATOMIC | __GFP_ACCOUNT);
696
	trace_io_uring_cqe_overflow(ctx, user_data, res, cflags, ocqe);
697 698 699 700 701 702
	if (!ocqe) {
		/*
		 * If we're in ring overflow flush mode, or in task cancel mode,
		 * or cannot allocate an overflow entry, then we need to drop it
		 * on the floor.
		 */
703
		io_account_cq_overflow(ctx);
704
		set_bit(IO_CHECK_CQ_DROPPED_BIT, &ctx->check_cq);
705
		return false;
Jens Axboe's avatar
Jens Axboe committed
706
	}
707
	if (list_empty(&ctx->cq_overflow_list)) {
708
		set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
709
		atomic_or(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
710

711
	}
712
	ocqe->cqe.user_data = user_data;
713 714
	ocqe->cqe.res = res;
	ocqe->cqe.flags = cflags;
715 716 717 718
	if (is_cqe32) {
		ocqe->cqe.big_cqe[0] = extra1;
		ocqe->cqe.big_cqe[1] = extra2;
	}
719 720
	list_add_tail(&ocqe->list, &ctx->cq_overflow_list);
	return true;
Jens Axboe's avatar
Jens Axboe committed
721 722
}

723 724 725 726 727 728 729 730 731 732 733
bool io_req_cqe_overflow(struct io_kiocb *req)
{
	if (!(req->flags & REQ_F_CQE32_INIT)) {
		req->extra1 = 0;
		req->extra2 = 0;
	}
	return io_cqring_event_overflow(req->ctx, req->cqe.user_data,
					req->cqe.res, req->cqe.flags,
					req->extra1, req->extra2);
}

734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753
/*
 * writes to the cq entry need to come after reading head; the
 * control dependency is enough as we're using WRITE_ONCE to
 * fill the cq entry
 */
struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx)
{
	struct io_rings *rings = ctx->rings;
	unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
	unsigned int free, queued, len;


	/* userspace may cheat modifying the tail, be safe and do min */
	queued = min(__io_cqring_events(ctx), ctx->cq_entries);
	free = ctx->cq_entries - queued;
	/* we need a contiguous range, limit based on the current array offset */
	len = min(free, ctx->cq_entries - off);
	if (!len)
		return NULL;

754 755 756 757 758
	if (ctx->flags & IORING_SETUP_CQE32) {
		off <<= 1;
		len <<= 1;
	}

759 760
	ctx->cqe_cached = &rings->cqes[off];
	ctx->cqe_sentinel = ctx->cqe_cached + len;
761 762

	ctx->cached_cq_tail++;
763
	ctx->cqe_cached++;
764 765 766
	if (ctx->flags & IORING_SETUP_CQE32)
		ctx->cqe_cached++;
	return &rings->cqes[off];
767 768
}

769 770
bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
		     bool allow_overflow)
771
{
772 773
	struct io_uring_cqe *cqe;

774
	ctx->cq_extra++;
775 776 777 778 779 780 781 782

	/*
	 * If we can't get a cq entry, userspace overflowed the
	 * submission (by quite a lot). Increment the overflow count in
	 * the ring.
	 */
	cqe = io_get_cqe(ctx);
	if (likely(cqe)) {
783 784
		trace_io_uring_complete(ctx, NULL, user_data, res, cflags, 0, 0);

785 786 787
		WRITE_ONCE(cqe->user_data, user_data);
		WRITE_ONCE(cqe->res, res);
		WRITE_ONCE(cqe->flags, cflags);
788 789 790 791 792

		if (ctx->flags & IORING_SETUP_CQE32) {
			WRITE_ONCE(cqe->big_cqe[0], 0);
			WRITE_ONCE(cqe->big_cqe[1], 0);
		}
793 794
		return true;
	}
795 796 797 798 799

	if (allow_overflow)
		return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);

	return false;
800 801
}

802
bool io_post_aux_cqe(struct io_ring_ctx *ctx,
803 804
		     u64 user_data, s32 res, u32 cflags,
		     bool allow_overflow)
805 806 807
{
	bool filled;

808
	io_cq_lock(ctx);
809
	filled = io_fill_cqe_aux(ctx, user_data, res, cflags, allow_overflow);
810
	io_cq_unlock_post(ctx);
811 812 813
	return filled;
}

814
static void __io_req_complete_put(struct io_kiocb *req)
Jens Axboe's avatar
Jens Axboe committed
815
{
816 817 818 819
	/*
	 * If we're the last reference to this request, add to our locked
	 * free_list cache.
	 */
820
	if (req_ref_put_and_test(req)) {
821 822
		struct io_ring_ctx *ctx = req->ctx;

823
		if (req->flags & IO_REQ_LINK_FLAGS) {
824
			if (req->flags & IO_DISARM_MASK)
825 826 827 828 829 830
				io_disarm_next(req);
			if (req->link) {
				io_req_task_queue(req->link);
				req->link = NULL;
			}
		}
831
		io_req_put_rsrc(req);
832 833 834 835 836 837
		/*
		 * Selected buffer deallocation in io_clean_op() assumes that
		 * we don't hold ->completion_lock. Clean them here to avoid
		 * deadlocks.
		 */
		io_put_kbuf_comp(req);
838 839
		io_dismantle_req(req);
		io_put_task(req->task, 1);
840
		wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
841
		ctx->locked_free_nr++;
842
	}
843 844
}

845
void __io_req_complete_post(struct io_kiocb *req)
846
{
847
	if (!(req->flags & REQ_F_CQE_SKIP))
848
		__io_fill_cqe_req(req->ctx, req);
849 850 851
	__io_req_complete_put(req);
}

852
void io_req_complete_post(struct io_kiocb *req)
853 854 855
{
	struct io_ring_ctx *ctx = req->ctx;

856
	io_cq_lock(ctx);
857
	__io_req_complete_post(req);
858
	io_cq_unlock_post(ctx);
859 860
}

861
inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags)
862
{
863
	io_req_complete_post(req);
864 865
}

866
void io_req_complete_failed(struct io_kiocb *req, s32 res)
867
{
868
	req_set_fail(req);
869 870
	io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
	io_req_complete_post(req);
871 872
}

873 874 875 876 877 878 879 880 881 882
/*
 * Don't initialise the fields below on every allocation, but do that in
 * advance and keep them valid across allocations.
 */
static void io_preinit_req(struct io_kiocb *req, struct io_ring_ctx *ctx)
{
	req->ctx = ctx;
	req->link = NULL;
	req->async_data = NULL;
	/* not necessary, but safer to zero */
883
	req->cqe.res = 0;
884 885
}

886
static void io_flush_cached_locked_reqs(struct io_ring_ctx *ctx,
887
					struct io_submit_state *state)
888
{
889
	spin_lock(&ctx->completion_lock);
890
	wq_list_splice(&ctx->locked_free_list, &state->free_list);
891
	ctx->locked_free_nr = 0;
892
	spin_unlock(&ctx->completion_lock);
893 894
}

895 896 897 898 899 900
/*
 * A request might get retired back into the request caches even before opcode
 * handlers and io_issue_sqe() are done with it, e.g. inline completion path.
 * Because of that, io_alloc_req() should be called only under ->uring_lock
 * and with extra caution to not get a request that is still worked on.
 */
901
__cold bool __io_alloc_req_refill(struct io_ring_ctx *ctx)
902
	__must_hold(&ctx->uring_lock)
Jens Axboe's avatar
Jens Axboe committed
903
{
904
	gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
905
	void *reqs[IO_REQ_ALLOC_BATCH];
906
	int ret, i;
907

908 909 910 911 912
	/*
	 * If we have more than a batch's worth of requests in our IRQ side
	 * locked cache, grab the lock and move them over to our submission
	 * side cache.
	 */
913
	if (data_race(ctx->locked_free_nr) > IO_COMPL_BATCH) {
914
		io_flush_cached_locked_reqs(ctx, &ctx->submit_state);
915
		if (!io_req_cache_empty(ctx))
916 917
			return true;
	}
918

919
	ret = kmem_cache_alloc_bulk(req_cachep, gfp, ARRAY_SIZE(reqs), reqs);
920

921 922 923 924 925
	/*
	 * Bulk alloc is all-or-nothing. If we fail to get a batch,
	 * retry single alloc to be on the safe side.
	 */
	if (unlikely(ret <= 0)) {
926 927
		reqs[0] = kmem_cache_alloc(req_cachep, gfp);
		if (!reqs[0])
928
			return false;
929
		ret = 1;
Jens Axboe's avatar
Jens Axboe committed
930
	}
931

932
	percpu_ref_get_many(&ctx->refs, ret);
933
	for (i = 0; i < ret; i++) {
934
		struct io_kiocb *req = reqs[i];
935 936

		io_preinit_req(req, ctx);
937
		io_req_add_to_cache(req, ctx);
938
	}
939 940 941
	return true;
}

942
static inline void io_dismantle_req(struct io_kiocb *req)
Jens Axboe's avatar
Jens Axboe committed
943
{
944
	unsigned int flags = req->flags;
945

946
	if (unlikely(flags & IO_REQ_CLEAN_FLAGS))
947
		io_clean_op(req);
948 949
	if (!(flags & REQ_F_FIXED_FILE))
		io_put_file(req->file);
950 951
}

952
__cold void io_free_req(struct io_kiocb *req)
953
{
954
	struct io_ring_ctx *ctx = req->ctx;
955

956
	io_req_put_rsrc(req);
957
	io_dismantle_req(req);
958
	io_put_task(req->task, 1);
959

960
	spin_lock(&ctx->completion_lock);
961
	wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
962
	ctx->locked_free_nr++;
963
	spin_unlock(&ctx->completion_lock);
964 965
}

966 967 968 969
static void __io_req_find_next_prep(struct io_kiocb *req)
{
	struct io_ring_ctx *ctx = req->ctx;

970
	io_cq_lock(ctx);
971
	io_disarm_next(req);
972
	io_cq_unlock_post(ctx);
973 974 975
}

static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
976
{
977
	struct io_kiocb *nxt;
978

979 980 981 982 983 984
	/*
	 * If LINK is set, we have dependent requests in this chain. If we
	 * didn't fail this request, queue the first one up, moving any other
	 * dependencies to the next request. In case of failure, fail the rest
	 * of the chain.
	 */
985 986
	if (unlikely(req->flags & IO_DISARM_MASK))
		__io_req_find_next_prep(req);
987 988 989
	nxt = req->link;
	req->link = NULL;
	return nxt;
990
}
991

992
static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
993 994 995
{
	if (!ctx)
		return;
996 997
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
998
	if (*locked) {
999
		io_submit_flush_completions(ctx);
1000
		mutex_unlock(&ctx->uring_lock);
1001
		*locked = false;
1002 1003 1004 1005
	}
	percpu_ref_put(&ctx->refs);
}

1006 1007 1008
static unsigned int handle_tw_list(struct llist_node *node,
				   struct io_ring_ctx **ctx, bool *locked,
				   struct llist_node *last)
1009
{
1010 1011
	unsigned int count = 0;

Dylan Yudaken's avatar
Dylan Yudaken committed
1012
	while (node != last) {
1013
		struct llist_node *next = node->next;
1014 1015 1016
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);

1017 1018
		prefetch(container_of(next, struct io_kiocb, io_task_work.node));

1019 1020 1021 1022 1023 1024 1025 1026 1027
		if (req->ctx != *ctx) {
			ctx_flush_and_put(*ctx, locked);
			*ctx = req->ctx;
			/* if not contended, grab and improve batching */
			*locked = mutex_trylock(&(*ctx)->uring_lock);
			percpu_ref_get(&(*ctx)->refs);
		}
		req->io_task_work.func(req, locked);
		node = next;
1028
		count++;
Dylan Yudaken's avatar
Dylan Yudaken committed
1029
	}
1030 1031

	return count;
1032 1033
}

1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063
/**
 * io_llist_xchg - swap all entries in a lock-less list
 * @head:	the head of lock-less list to delete all entries
 * @new:	new entry as the head of the list
 *
 * If list is empty, return NULL, otherwise, return the pointer to the first entry.
 * The order of entries returned is from the newest to the oldest added one.
 */
static inline struct llist_node *io_llist_xchg(struct llist_head *head,
					       struct llist_node *new)
{
	return xchg(&head->first, new);
}

/**
 * io_llist_cmpxchg - possibly swap all entries in a lock-less list
 * @head:	the head of lock-less list to delete all entries
 * @old:	expected old value of the first entry of the list
 * @new:	new entry as the head of the list
 *
 * perform a cmpxchg on the first entry of the list.
 */

static inline struct llist_node *io_llist_cmpxchg(struct llist_head *head,
						  struct llist_node *old,
						  struct llist_node *new)
{
	return cmpxchg(&head->first, old, new);
}

1064
void tctx_task_work(struct callback_head *cb)
1065
{
1066
	bool uring_locked = false;
1067
	struct io_ring_ctx *ctx = NULL;
1068 1069
	struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
						  task_work);
Dylan Yudaken's avatar
Dylan Yudaken committed
1070 1071
	struct llist_node fake = {};
	struct llist_node *node = io_llist_xchg(&tctx->task_list, &fake);
1072 1073
	unsigned int loops = 1;
	unsigned int count = handle_tw_list(node, &ctx, &uring_locked, NULL);
Dylan Yudaken's avatar
Dylan Yudaken committed
1074 1075 1076

	node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
	while (node != &fake) {
1077
		loops++;
Dylan Yudaken's avatar
Dylan Yudaken committed
1078
		node = io_llist_xchg(&tctx->task_list, &fake);
1079
		count += handle_tw_list(node, &ctx, &uring_locked, &fake);
Dylan Yudaken's avatar
Dylan Yudaken committed
1080
		node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
1081
	}
1082

1083
	ctx_flush_and_put(ctx, &uring_locked);
1084 1085 1086 1087

	/* relaxed read is enough as only the task itself sets ->in_idle */
	if (unlikely(atomic_read(&tctx->in_idle)))
		io_uring_drop_tctx_refs(current);
1088 1089

	trace_io_uring_task_work_run(tctx, count, loops);
1090 1091
}

1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106
static void io_req_local_work_add(struct io_kiocb *req)
{
	struct io_ring_ctx *ctx = req->ctx;

	if (!llist_add(&req->io_task_work.node, &ctx->work_llist))
		return;

	if (unlikely(atomic_read(&req->task->io_uring->in_idle))) {
		io_move_task_work_from_local(ctx);
		return;
	}

	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

1107 1108
	if (ctx->has_evfd)
		io_eventfd_signal(ctx);
1109 1110 1111 1112 1113
	io_cqring_wake(ctx);

}

static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
1114
{
1115
	struct io_uring_task *tctx = req->task->io_uring;
1116
	struct io_ring_ctx *ctx = req->ctx;
1117
	struct llist_node *node;
1118

1119 1120 1121 1122 1123
	if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
		io_req_local_work_add(req);
		return;
	}

1124
	/* task_work already pending, we're done */
1125
	if (!llist_add(&req->io_task_work.node, &tctx->task_list))
1126
		return;
1127

1128 1129 1130
	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

1131
	if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
1132
		return;
1133

1134
	node = llist_del_all(&tctx->task_list);
1135

1136 1137 1138
	while (node) {
		req = container_of(node, struct io_kiocb, io_task_work.node);
		node = node->next;
1139
		if (llist_add(&req->io_task_work.node,
1140 1141 1142
			      &req->ctx->fallback_llist))
			schedule_delayed_work(&req->ctx->fallback_work, 1);
	}
1143 1144
}

1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211
void io_req_task_work_add(struct io_kiocb *req)
{
	__io_req_task_work_add(req, true);
}

static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
	struct llist_node *node;

	node = llist_del_all(&ctx->work_llist);
	while (node) {
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);

		node = node->next;
		__io_req_task_work_add(req, false);
	}
}

int io_run_local_work(struct io_ring_ctx *ctx)
{
	bool locked;
	struct llist_node *node;
	struct llist_node fake;
	struct llist_node *current_final = NULL;
	int ret;

	if (unlikely(ctx->submitter_task != current)) {
		/* maybe this is before any submissions */
		if (!ctx->submitter_task)
			return 0;

		return -EEXIST;
	}

	locked = mutex_trylock(&ctx->uring_lock);

	node = io_llist_xchg(&ctx->work_llist, &fake);
	ret = 0;
again:
	while (node != current_final) {
		struct llist_node *next = node->next;
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    io_task_work.node);
		prefetch(container_of(next, struct io_kiocb, io_task_work.node));
		req->io_task_work.func(req, &locked);
		ret++;
		node = next;
	}

	if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
		atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

	node = io_llist_cmpxchg(&ctx->work_llist, &fake, NULL);
	if (node != &fake) {
		current_final = &fake;
		node = io_llist_xchg(&ctx->work_llist, &fake);
		goto again;
	}

	if (locked) {
		io_submit_flush_completions(ctx);
		mutex_unlock(&ctx->uring_lock);
	}
	return ret;
}

1212
static void io_req_tw_post(struct io_kiocb *req, bool *locked)
1213
{
1214
	io_req_complete_post(req);
1215
}
1216

1217
void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags)
1218
{
1219
	io_req_set_res(req, res, cflags);
1220
	req->io_task_work.func = io_req_tw_post;
1221
	io_req_task_work_add(req);
1222 1223
}

1224
static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
1225
{
1226
	/* not needed for normal modes, but SQPOLL depends on it */
1227
	io_tw_lock(req->ctx, locked);
1228
	io_req_complete_failed(req, req->cqe.res);
1229 1230
}

1231
void io_req_task_submit(struct io_kiocb *req, bool *locked)
1232
{
1233
	io_tw_lock(req->ctx, locked);
1234
	/* req->task == current here, checking PF_EXITING is safe */
1235
	if (likely(!(req->task->flags & PF_EXITING)))
1236
		io_queue_sqe(req);
1237
	else
1238
		io_req_complete_failed(req, -EFAULT);
1239 1240
}

1241
void io_req_task_queue_fail(struct io_kiocb *req, int ret)
1242
{
1243
	io_req_set_res(req, ret, 0);
1244
	req->io_task_work.func = io_req_task_cancel;
1245
	io_req_task_work_add(req);
1246 1247
}

1248
void io_req_task_queue(struct io_kiocb *req)
1249
{
1250
	req->io_task_work.func = io_req_task_submit;
1251
	io_req_task_work_add(req);
1252 1253
}

1254
void io_queue_next(struct io_kiocb *req)
1255
{
1256
	struct io_kiocb *nxt = io_req_find_next(req);
1257 1258

	if (nxt)
1259
		io_req_task_queue(nxt);
1260 1261
}

1262
void io_free_batch_list(struct io_ring_ctx *ctx, struct io_wq_work_node *node)
1263
	__must_hold(&ctx->uring_lock)
1264
{
1265
	struct task_struct *task = NULL;
1266
	int task_refs = 0;
1267

1268 1269 1270
	do {
		struct io_kiocb *req = container_of(node, struct io_kiocb,
						    comp_list);
1271

1272 1273 1274 1275 1276 1277
		if (unlikely(req->flags & IO_REQ_CLEAN_SLOW_FLAGS)) {
			if (req->flags & REQ_F_REFCOUNT) {
				node = req->comp_list.next;
				if (!req_ref_put_and_test(req))
					continue;
			}
1278 1279 1280 1281 1282
			if ((req->flags & REQ_F_POLLED) && req->apoll) {
				struct async_poll *apoll = req->apoll;

				if (apoll->double_poll)
					kfree(apoll->double_poll);
1283 1284
				if (!io_alloc_cache_put(&ctx->apoll_cache, &apoll->cache))
					kfree(apoll);
1285 1286
				req->flags &= ~REQ_F_POLLED;
			}
1287
			if (req->flags & IO_REQ_LINK_FLAGS)
1288
				io_queue_next(req);
1289 1290
			if (unlikely(req->flags & IO_REQ_CLEAN_FLAGS))
				io_clean_op(req);
1291
		}
1292 1293
		if (!(req->flags & REQ_F_FIXED_FILE))
			io_put_file(req->file);
1294

1295
		io_req_put_rsrc_locked(req, ctx);
1296

1297 1298 1299 1300 1301 1302 1303
		if (req->task != task) {
			if (task)
				io_put_task(task, task_refs);
			task = req->task;
			task_refs = 0;
		}
		task_refs++;
1304
		node = req->comp_list.next;
1305
		io_req_add_to_cache(req, ctx);
1306
	} while (node);
1307 1308 1309

	if (task)
		io_put_task(task, task_refs);
1310 1311
}

1312
static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
1313
	__must_hold(&ctx->uring_lock)
1314
{
1315
	struct io_wq_work_node *node, *prev;
1316
	struct io_submit_state *state = &ctx->submit_state;
1317

1318 1319 1320 1321
	spin_lock(&ctx->completion_lock);
	wq_list_for_each(node, prev, &state->compl_reqs) {
		struct io_kiocb *req = container_of(node, struct io_kiocb,
					    comp_list);
1322

1323 1324
		if (!(req->flags & REQ_F_CQE_SKIP))
			__io_fill_cqe_req(ctx, req);
1325
	}
1326
	__io_cq_unlock_post(ctx);
1327

1328
	io_free_batch_list(ctx, state->compl_reqs.first);
1329
	INIT_WQ_LIST(&state->compl_reqs);
1330 1331
}

1332 1333 1334 1335
/*
 * Drop reference to request, return next in chain (if there is one) if this
 * was the last reference to this request.
 */
1336
static inline struct io_kiocb *io_put_req_find_next(struct io_kiocb *req)
1337
{
1338 1339
	struct io_kiocb *nxt = NULL;

1340
	if (req_ref_put_and_test(req)) {
1341
		if (unlikely(req->flags & IO_REQ_LINK_FLAGS))
1342
			nxt = io_req_find_next(req);
1343
		io_free_req(req);
1344
	}
1345
	return nxt;
Jens Axboe's avatar
Jens Axboe committed
1346 1347
}

1348
static unsigned io_cqring_events(struct io_ring_ctx *ctx)
1349 1350 1351
{
	/* See comment at the top of this file */
	smp_rmb();
1352
	return __io_cqring_events(ctx);
1353 1354
}

1355 1356 1357 1358
/*
 * We can't just wait for polled events to come to us, we have to actively
 * find and complete them.
 */
1359
static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
1360 1361 1362 1363 1364
{
	if (!(ctx->flags & IORING_SETUP_IOPOLL))
		return;

	mutex_lock(&ctx->uring_lock);
1365
	while (!wq_list_empty(&ctx->iopoll_list)) {
1366
		/* let it sleep and repeat later if can't complete a request */
1367
		if (io_do_iopoll(ctx, true) == 0)
1368
			break;
1369 1370 1371
		/*
		 * Ensure we allow local-to-the-cpu processing to take place,
		 * in this case we need to ensure that we reap all events.
1372
		 * Also let task_work, etc. to progress by releasing the mutex
1373
		 */
1374 1375 1376 1377 1378
		if (need_resched()) {
			mutex_unlock(&ctx->uring_lock);
			cond_resched();
			mutex_lock(&ctx->uring_lock);
		}
1379 1380 1381 1382
	}
	mutex_unlock(&ctx->uring_lock);
}

1383
static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
1384
{
1385
	unsigned int nr_events = 0;
1386
	int ret = 0;
1387
	unsigned long check_cq;
1388

1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399
	check_cq = READ_ONCE(ctx->check_cq);
	if (unlikely(check_cq)) {
		if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
			__io_cqring_overflow_flush(ctx, false);
		/*
		 * Similarly do not spin if we have not informed the user of any
		 * dropped CQE.
		 */
		if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT))
			return -EBADR;
	}
1400 1401 1402 1403 1404 1405
	/*
	 * Don't enter poll loop if we already have events pending.
	 * If we do, we can potentially be spinning for commands that
	 * already triggered a CQE (eg in error).
	 */
	if (io_cqring_events(ctx))
1406
		return 0;
1407

1408
	do {
1409 1410 1411 1412 1413 1414 1415 1416 1417 1418
		/*
		 * If a submit got punted to a workqueue, we can have the
		 * application entering polling for a command before it gets
		 * issued. That app will hold the uring_lock for the duration
		 * of the poll right here, so we need to take a breather every
		 * now and then to ensure that the issue has a chance to add
		 * the poll to the issued list. Otherwise we can spin here
		 * forever, while the workqueue is stuck trying to acquire the
		 * very same mutex.
		 */
1419
		if (wq_list_empty(&ctx->iopoll_list)) {
1420 1421
			u32 tail = ctx->cached_cq_tail;

1422
			mutex_unlock(&ctx->uring_lock);
1423
			ret = io_run_task_work_ctx(ctx);
1424
			mutex_lock(&ctx->uring_lock);
1425 1426
			if (ret < 0)
				break;
1427

1428 1429
			/* some requests don't go through iopoll_list */
			if (tail != ctx->cached_cq_tail ||
1430
			    wq_list_empty(&ctx->iopoll_list))
1431
				break;
1432
		}
1433 1434 1435 1436 1437 1438
		ret = io_do_iopoll(ctx, !min);
		if (ret < 0)
			break;
		nr_events += ret;
		ret = 0;
	} while (nr_events < min && !need_resched());
1439

1440 1441
	return ret;
}
1442 1443

void io_req_task_complete(struct io_kiocb *req, bool *locked)
1444
{
1445 1446 1447 1448 1449 1450 1451
	if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
		unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;

		req->cqe.flags |= io_put_kbuf(req, issue_flags);
	}

	if (*locked)
1452
		io_req_complete_defer(req);
1453
	else
1454
		io_req_complete_post(req);
1455 1456
}

1457 1458 1459
/*
 * After the iocb has been issued, it's safe to be found on the poll list.
 * Adding the kiocb to the list AFTER submission ensures that we don't
1460
 * find it from a io_do_iopoll() thread before the issuer is done
1461 1462
 * accessing the kiocb cookie.
 */
1463
static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
1464 1465
{
	struct io_ring_ctx *ctx = req->ctx;
1466
	const bool needs_lock = issue_flags & IO_URING_F_UNLOCKED;
1467 1468

	/* workqueue context doesn't hold uring_lock, grab it now */
1469
	if (unlikely(needs_lock))
1470
		mutex_lock(&ctx->uring_lock);
1471 1472 1473 1474 1475 1476

	/*
	 * Track whether we have multiple files in our lists. This will impact
	 * how we do polling eventually, not spinning if we're on potentially
	 * different devices.
	 */
1477
	if (wq_list_empty(&ctx->iopoll_list)) {
1478 1479
		ctx->poll_multi_queue = false;
	} else if (!ctx->poll_multi_queue) {
1480 1481
		struct io_kiocb *list_req;

1482 1483
		list_req = container_of(ctx->iopoll_list.first, struct io_kiocb,
					comp_list);
1484
		if (list_req->file != req->file)
1485
			ctx->poll_multi_queue = true;
1486 1487 1488 1489 1490 1491
	}

	/*
	 * For fast devices, IO may have already completed. If it has, add
	 * it to the front so we find it first.
	 */
1492
	if (READ_ONCE(req->iopoll_completed))
1493
		wq_list_add_head(&req->comp_list, &ctx->iopoll_list);
1494
	else
1495
		wq_list_add_tail(&req->comp_list, &ctx->iopoll_list);
1496

1497
	if (unlikely(needs_lock)) {
1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509
		/*
		 * If IORING_SETUP_SQPOLL is enabled, sqes are either handle
		 * in sq thread task context or in io worker task context. If
		 * current task context is sq thread, we don't need to check
		 * whether should wake up sq thread.
		 */
		if ((ctx->flags & IORING_SETUP_SQPOLL) &&
		    wq_has_sleeper(&ctx->sq_data->wait))
			wake_up(&ctx->sq_data->wait);

		mutex_unlock(&ctx->uring_lock);
	}
1510 1511
}

1512 1513
static bool io_bdev_nowait(struct block_device *bdev)
{
1514
	return !bdev || blk_queue_nowait(bdev_get_queue(bdev));
1515 1516
}

Jens Axboe's avatar
Jens Axboe committed
1517 1518 1519 1520 1521
/*
 * If we tracked the file through the SCM inflight mechanism, we could support
 * any file. For now, just ensure that anything potentially problematic is done
 * inline.
 */
1522
static bool __io_file_supports_nowait(struct file *file, umode_t mode)
Jens Axboe's avatar
Jens Axboe committed
1523
{
1524
	if (S_ISBLK(mode)) {
Christoph Hellwig's avatar
Christoph Hellwig committed
1525 1526
		if (IS_ENABLED(CONFIG_BLOCK) &&
		    io_bdev_nowait(I_BDEV(file->f_mapping->host)))
1527 1528 1529
			return true;
		return false;
	}
1530
	if (S_ISSOCK(mode))
Jens Axboe's avatar
Jens Axboe committed
1531
		return true;
1532
	if (S_ISREG(mode)) {
Christoph Hellwig's avatar
Christoph Hellwig committed
1533 1534
		if (IS_ENABLED(CONFIG_BLOCK) &&
		    io_bdev_nowait(file->f_inode->i_sb->s_bdev) &&
1535
		    !io_is_uring_fops(file))
1536 1537 1538
			return true;
		return false;
	}
Jens Axboe's avatar
Jens Axboe committed
1539

1540 1541 1542
	/* any ->read/write should understand O_NONBLOCK */
	if (file->f_flags & O_NONBLOCK)
		return true;
1543
	return file->f_mode & FMODE_NOWAIT;
Jens Axboe's avatar
Jens Axboe committed
1544
}
1545

1546 1547 1548 1549 1550
/*
 * If we tracked the file through the SCM inflight mechanism, we could support
 * any file. For now, just ensure that anything potentially problematic is done
 * inline.
 */
1551
unsigned int io_file_get_flags(struct file *file)
1552 1553 1554
{
	umode_t mode = file_inode(file)->i_mode;
	unsigned int res = 0;
1555

1556 1557 1558 1559
	if (S_ISREG(mode))
		res |= FFS_ISREG;
	if (__io_file_supports_nowait(file, mode))
		res |= FFS_NOWAIT;
1560 1561
	if (io_file_need_scm(file))
		res |= FFS_SCM;
1562
	return res;
Jens Axboe's avatar
Jens Axboe committed
1563 1564
}

1565
bool io_alloc_async_data(struct io_kiocb *req)
1566
{
1567 1568
	WARN_ON_ONCE(!io_op_defs[req->opcode].async_size);
	req->async_data = kmalloc(io_op_defs[req->opcode].async_size, GFP_KERNEL);
1569 1570 1571 1572 1573
	if (req->async_data) {
		req->flags |= REQ_F_ASYNC_DATA;
		return false;
	}
	return true;
1574 1575
}

1576
int io_req_prep_async(struct io_kiocb *req)
1577
{
1578 1579 1580 1581 1582
	const struct io_op_def *def = &io_op_defs[req->opcode];

	/* assign early for deferred execution for non-fixed file */
	if (def->needs_file && !(req->flags & REQ_F_FIXED_FILE))
		req->file = io_file_get_normal(req, req->cqe.fd);
1583
	if (!def->prep_async)
1584 1585 1586
		return 0;
	if (WARN_ON_ONCE(req_has_async_data(req)))
		return -EFAULT;
1587 1588 1589 1590
	if (!io_op_defs[req->opcode].manual_alloc) {
		if (io_alloc_async_data(req))
			return -EAGAIN;
	}
1591
	return def->prep_async(req);
1592 1593
}

1594 1595
static u32 io_get_sequence(struct io_kiocb *req)
{
1596
	u32 seq = req->ctx->cached_sq_head;
1597
	struct io_kiocb *cur;
1598

1599
	/* need original cached_sq_head, but it was increased for each req */
1600
	io_for_each_link(cur, req)
1601 1602
		seq--;
	return seq;
1603 1604
}

1605
static __cold void io_drain_req(struct io_kiocb *req)
1606
{
1607
	struct io_ring_ctx *ctx = req->ctx;
1608
	struct io_defer_entry *de;
1609
	int ret;
1610
	u32 seq = io_get_sequence(req);
1611

1612
	/* Still need defer if there is pending req in defer list. */
1613
	spin_lock(&ctx->completion_lock);
1614
	if (!req_need_defer(req, seq) && list_empty_careful(&ctx->defer_list)) {
1615
		spin_unlock(&ctx->completion_lock);
1616
queue:
1617
		ctx->drain_active = false;
1618 1619
		io_req_task_queue(req);
		return;
1620
	}
1621
	spin_unlock(&ctx->completion_lock);
1622

1623
	ret = io_req_prep_async(req);
1624 1625 1626 1627 1628
	if (ret) {
fail:
		io_req_complete_failed(req, ret);
		return;
	}
1629
	io_prep_async_link(req);
1630
	de = kmalloc(sizeof(*de), GFP_KERNEL);
1631
	if (!de) {
1632
		ret = -ENOMEM;
1633
		goto fail;
1634
	}
1635

1636
	spin_lock(&ctx->completion_lock);
1637
	if (!req_need_defer(req, seq) && list_empty(&ctx->defer_list)) {
1638
		spin_unlock(&ctx->completion_lock);
1639
		kfree(de);
1640
		goto queue;
1641 1642
	}

1643
	trace_io_uring_defer(req);
1644
	de->req = req;
1645
	de->seq = seq;
1646
	list_add_tail(&de->list, &ctx->defer_list);
1647
	spin_unlock(&ctx->completion_lock);
1648 1649
}

1650
static void io_clean_op(struct io_kiocb *req)
Pavel Begunkov's avatar
Pavel Begunkov committed
1651
{
1652 1653
	if (req->flags & REQ_F_BUFFER_SELECTED) {
		spin_lock(&req->ctx->completion_lock);
1654
		io_put_kbuf_comp(req);
1655 1656
		spin_unlock(&req->ctx->completion_lock);
	}
Pavel Begunkov's avatar
Pavel Begunkov committed
1657

1658
	if (req->flags & REQ_F_NEED_CLEANUP) {
1659
		const struct io_op_def *def = &io_op_defs[req->opcode];
1660

1661 1662
		if (def->cleanup)
			def->cleanup(req);
Pavel Begunkov's avatar
Pavel Begunkov committed
1663
	}
1664 1665 1666 1667 1668
	if ((req->flags & REQ_F_POLLED) && req->apoll) {
		kfree(req->apoll->double_poll);
		kfree(req->apoll);
		req->apoll = NULL;
	}
1669 1670 1671 1672 1673
	if (req->flags & REQ_F_INFLIGHT) {
		struct io_uring_task *tctx = req->task->io_uring;

		atomic_dec(&tctx->inflight_tracked);
	}
1674
	if (req->flags & REQ_F_CREDS)
1675
		put_cred(req->creds);
1676 1677 1678 1679
	if (req->flags & REQ_F_ASYNC_DATA) {
		kfree(req->async_data);
		req->async_data = NULL;
	}
1680
	req->flags &= ~IO_REQ_CLEAN_FLAGS;
Pavel Begunkov's avatar
Pavel Begunkov committed
1681 1682
}

1683 1684 1685 1686 1687 1688
static bool io_assign_file(struct io_kiocb *req, unsigned int issue_flags)
{
	if (req->file || !io_op_defs[req->opcode].needs_file)
		return true;

	if (req->flags & REQ_F_FIXED_FILE)
1689
		req->file = io_file_get_fixed(req, req->cqe.fd, issue_flags);
1690
	else
1691
		req->file = io_file_get_normal(req, req->cqe.fd);
1692

1693
	return !!req->file;
1694 1695
}

1696
static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
Jens Axboe's avatar
Jens Axboe committed
1697
{
1698
	const struct io_op_def *def = &io_op_defs[req->opcode];
1699
	const struct cred *creds = NULL;
1700
	int ret;
Jens Axboe's avatar
Jens Axboe committed
1701

1702 1703 1704
	if (unlikely(!io_assign_file(req, issue_flags)))
		return -EBADF;

1705
	if (unlikely((req->flags & REQ_F_CREDS) && req->creds != current_cred()))
1706
		creds = override_creds(req->creds);
1707

1708
	if (!def->audit_skip)
1709 1710
		audit_uring_entry(req->opcode);

1711
	ret = def->issue(req, issue_flags);
Jens Axboe's avatar
Jens Axboe committed
1712

1713
	if (!def->audit_skip)
1714 1715
		audit_uring_exit(!ret, ret);

1716 1717
	if (creds)
		revert_creds(creds);
1718

1719 1720
	if (ret == IOU_OK) {
		if (issue_flags & IO_URING_F_COMPLETE_DEFER)
1721
			io_req_complete_defer(req);
1722 1723 1724
		else
			io_req_complete_post(req);
	} else if (ret != IOU_ISSUE_SKIP_COMPLETE)
1725
		return ret;
1726

1727
	/* If the op doesn't have a file, we're not polling for it */
1728
	if ((req->ctx->flags & IORING_SETUP_IOPOLL) && req->file)
1729
		io_iopoll_req_issued(req, issue_flags);
1730 1731

	return 0;
Jens Axboe's avatar
Jens Axboe committed
1732 1733
}

1734 1735 1736 1737 1738
int io_poll_issue(struct io_kiocb *req, bool *locked)
{
	io_tw_lock(req->ctx, locked);
	if (unlikely(req->task->flags & PF_EXITING))
		return -EFAULT;
1739
	return io_issue_sqe(req, IO_URING_F_NONBLOCK);
1740 1741
}

1742
struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
1743 1744 1745 1746 1747 1748 1749
{
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);

	req = io_put_req_find_next(req);
	return req ? &req->work : NULL;
}

void io_wq_submit_work(struct io_wq_work *work)
{
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
	const struct io_op_def *def = &io_op_defs[req->opcode];
	unsigned int issue_flags = IO_URING_F_UNLOCKED;
	bool needs_poll = false;
	int ret = 0, err = -ECANCELED;

	/* one will be dropped by ->io_free_work() after returning to io-wq */
	if (!(req->flags & REQ_F_REFCOUNT))
		__io_req_set_refcount(req, 2);
	else
		req_ref_get(req);

	io_arm_ltimeout(req);

	/* either cancelled or io-wq is dying, so don't touch tctx->iowq */
	if (work->flags & IO_WQ_WORK_CANCEL) {
fail:
		io_req_task_queue_fail(req, err);
		return;
	}
	if (!io_assign_file(req, issue_flags)) {
		err = -EBADF;
		work->flags |= IO_WQ_WORK_CANCEL;
		goto fail;
	}

	if (req->flags & REQ_F_FORCE_ASYNC) {
		bool opcode_poll = def->pollin || def->pollout;

		if (opcode_poll && file_can_poll(req->file)) {
			needs_poll = true;
			issue_flags |= IO_URING_F_NONBLOCK;
		}
	}

	do {
		ret = io_issue_sqe(req, issue_flags);
		if (ret != -EAGAIN)
			break;
		/*
		 * We can get EAGAIN for iopolled IO even though we're
		 * forcing a sync submission from here, since we can't
		 * wait for request slots on the block side.
		 */
		if (!needs_poll) {
			if (!(req->ctx->flags & IORING_SETUP_IOPOLL))
				break;
			cond_resched();
			continue;
		}

		if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK)
			return;
		/* aborted or ready, in either case retry blocking */
		needs_poll = false;
		issue_flags &= ~IO_URING_F_NONBLOCK;
	} while (1);

	/* avoid locking problems by failing it from a clean context */
	if (ret < 0)
		io_req_task_queue_fail(req, ret);
}

inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
				      unsigned int issue_flags)
{
	struct io_ring_ctx *ctx = req->ctx;
	struct file *file = NULL;
	unsigned long file_ptr;

	io_ring_submit_lock(ctx, issue_flags);

	if (unlikely((unsigned int)fd >= ctx->nr_user_files))
		goto out;
	fd = array_index_nospec(fd, ctx->nr_user_files);
	file_ptr = io_fixed_file_slot(&ctx->file_table, fd)->file_ptr;
	file = (struct file *) (file_ptr & FFS_MASK);
	file_ptr &= ~FFS_MASK;
	/* mask in overlapping REQ_F and FFS bits */
	req->flags |= (file_ptr << REQ_F_SUPPORT_NOWAIT_BIT);
	io_req_set_rsrc_node(req, ctx, 0);
	WARN_ON_ONCE(file && !test_bit(fd, ctx->file_table.bitmap));
out:
	io_ring_submit_unlock(ctx, issue_flags);
	return file;
}

struct file *io_file_get_normal(struct io_kiocb *req, int fd)
{
	struct file *file = fget(fd);

	trace_io_uring_file_get(req, fd);

	/* we don't allow fixed io_uring files */
	if (file && io_is_uring_fops(file))
		io_req_track_inflight(req);
	return file;
}
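
/*
 * Illustrative userspace sketch (not kernel code): the fixed-file path above
 * is what IOSQE_FIXED_FILE selects. Assuming liburing, a registered file is
 * addressed by its index in the registered table rather than by fd:
 *
 *	int fds[1] = { open("data", O_RDONLY) };
 *	io_uring_register_files(&ring, fds, 1);
 *	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_read(sqe, 0, buf, sizeof(buf), 0);	// 0 = table index
 *	sqe->flags |= IOSQE_FIXED_FILE;
 *	io_uring_submit(&ring);
 *
 * ring, fds, and buf are hypothetical; error handling is omitted.
 */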

static void io_queue_async(struct io_kiocb *req, int ret)
	__must_hold(&req->ctx->uring_lock)
{
	struct io_kiocb *linked_timeout;

	if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
		io_req_complete_failed(req, ret);
		return;
	}

	linked_timeout = io_prep_linked_timeout(req);

	switch (io_arm_poll_handler(req, 0)) {
	case IO_APOLL_READY:
		io_kbuf_recycle(req, 0);
		io_req_task_queue(req);
		break;
	case IO_APOLL_ABORTED:
		/*
		 * Queued up for async execution, worker will release
		 * submit reference when the iocb is actually submitted.
		 */
		io_kbuf_recycle(req, 0);
		io_queue_iowq(req, NULL);
		break;
	case IO_APOLL_OK:
		break;
	}

	if (linked_timeout)
		io_queue_linked_timeout(linked_timeout);
}

static inline void io_queue_sqe(struct io_kiocb *req)
	__must_hold(&req->ctx->uring_lock)
{
	int ret;

	ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);

	/*
	 * We async punt it if the file wasn't marked NOWAIT, or if the file
	 * doesn't support non-blocking read/write attempts
	 */
	if (likely(!ret))
		io_arm_ltimeout(req);
	else
		io_queue_async(req, ret);
}

static void io_queue_sqe_fallback(struct io_kiocb *req)
	__must_hold(&req->ctx->uring_lock)
{
	if (unlikely(req->flags & REQ_F_FAIL)) {
		/*
		 * We don't submit, fail them all, for that replace hardlinks
		 * with normal links. Extra REQ_F_LINK is tolerated.
		 */
		req->flags &= ~REQ_F_HARDLINK;
		req->flags |= REQ_F_LINK;
		io_req_complete_failed(req, req->cqe.res);
	} else if (unlikely(req->ctx->drain_active)) {
		io_drain_req(req);
	} else {
		int ret = io_req_prep_async(req);

		if (unlikely(ret))
			io_req_complete_failed(req, ret);
		else
			io_queue_iowq(req, NULL);
	}
}

/*
 * Check SQE restrictions (opcode and flags).
 *
 * Returns 'true' if SQE is allowed, 'false' otherwise.
 */
static inline bool io_check_restriction(struct io_ring_ctx *ctx,
					struct io_kiocb *req,
					unsigned int sqe_flags)
{
	if (!test_bit(req->opcode, ctx->restrictions.sqe_op))
		return false;

	if ((sqe_flags & ctx->restrictions.sqe_flags_required) !=
	    ctx->restrictions.sqe_flags_required)
		return false;

	if (sqe_flags & ~(ctx->restrictions.sqe_flags_allowed |
			  ctx->restrictions.sqe_flags_required))
		return false;

	return true;
}
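
/*
 * Illustrative sketch (not kernel code): the restriction bitmaps checked above
 * are filled in via io_uring_register(). A ring created with
 * IORING_SETUP_R_DISABLED can, for example, be limited to read SQEs only:
 *
 *	struct io_uring_restriction res = {
 *		.opcode	= IORING_RESTRICTION_SQE_OP,
 *		.sqe_op	= IORING_OP_READ,
 *	};
 *	syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_RESTRICTIONS,
 *		&res, 1);
 *	syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_ENABLE_RINGS,
 *		NULL, 0);
 *
 * ring_fd is assumed to come from io_uring_setup(); error handling omitted.
 */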

static void io_init_req_drain(struct io_kiocb *req)
{
	struct io_ring_ctx *ctx = req->ctx;
	struct io_kiocb *head = ctx->submit_state.link.head;

	ctx->drain_active = true;
	if (head) {
		/*
		 * If we need to drain a request in the middle of a link, drain
		 * the head request and the next request/link after the current
		 * link. Considering sequential execution of links,
		 * REQ_F_IO_DRAIN will be maintained for every request of our
		 * link.
		 */
		head->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
		ctx->drain_next = true;
	}
}

static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
		       const struct io_uring_sqe *sqe)
	__must_hold(&ctx->uring_lock)
{
	const struct io_op_def *def;
	unsigned int sqe_flags;
	int personality;
	u8 opcode;

	/* req is partially pre-initialised, see io_preinit_req() */
	req->opcode = opcode = READ_ONCE(sqe->opcode);
	/* same numerical values with corresponding REQ_F_*, safe to copy */
	req->flags = sqe_flags = READ_ONCE(sqe->flags);
	req->cqe.user_data = READ_ONCE(sqe->user_data);
	req->file = NULL;
	req->rsrc_node = NULL;
	req->task = current;

	if (unlikely(opcode >= IORING_OP_LAST)) {
		req->opcode = 0;
		return -EINVAL;
	}
	def = &io_op_defs[opcode];
	if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) {
		/* enforce forwards compatibility on users */
		if (sqe_flags & ~SQE_VALID_FLAGS)
			return -EINVAL;
		if (sqe_flags & IOSQE_BUFFER_SELECT) {
			if (!def->buffer_select)
				return -EOPNOTSUPP;
			req->buf_index = READ_ONCE(sqe->buf_group);
		}
		if (sqe_flags & IOSQE_CQE_SKIP_SUCCESS)
			ctx->drain_disabled = true;
		if (sqe_flags & IOSQE_IO_DRAIN) {
			if (ctx->drain_disabled)
				return -EOPNOTSUPP;
			io_init_req_drain(req);
		}
	}
	if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) {
		if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags))
			return -EACCES;
		/* knock it to the slow queue path, will be drained there */
		if (ctx->drain_active)
			req->flags |= REQ_F_FORCE_ASYNC;
		/* if there is no link, we're at "next" request and need to drain */
		if (unlikely(ctx->drain_next) && !ctx->submit_state.link.head) {
			ctx->drain_next = false;
			ctx->drain_active = true;
			req->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
		}
	}

	if (!def->ioprio && sqe->ioprio)
		return -EINVAL;
	if (!def->iopoll && (ctx->flags & IORING_SETUP_IOPOLL))
		return -EINVAL;

	if (def->needs_file) {
		struct io_submit_state *state = &ctx->submit_state;

		req->cqe.fd = READ_ONCE(sqe->fd);

		/*
		 * Plug now if we have more than 2 IO left after this, and the
		 * target is potentially a read/write to block based storage.
		 */
		if (state->need_plug && def->plug) {
			state->plug_started = true;
			state->need_plug = false;
			blk_start_plug_nr_ios(&state->plug, state->submit_nr);
		}
	}

	personality = READ_ONCE(sqe->personality);
	if (personality) {
		int ret;

		req->creds = xa_load(&ctx->personalities, personality);
		if (!req->creds)
			return -EINVAL;
		get_cred(req->creds);
		ret = security_uring_override_creds(req->creds);
		if (ret) {
			put_cred(req->creds);
			return ret;
		}
		req->flags |= REQ_F_CREDS;
	}

	return def->prep(req, sqe);
}

static __cold int io_submit_fail_init(const struct io_uring_sqe *sqe,
				      struct io_kiocb *req, int ret)
{
	struct io_ring_ctx *ctx = req->ctx;
	struct io_submit_link *link = &ctx->submit_state.link;
	struct io_kiocb *head = link->head;

	trace_io_uring_req_failed(sqe, req, ret);

	/*
	 * Avoid breaking links in the middle as it renders links with SQPOLL
	 * unusable. Instead of failing eagerly, continue assembling the link if
	 * applicable and mark the head with REQ_F_FAIL. The link flushing code
	 * should find the flag and handle the rest.
	 */
	req_fail_link_node(req, ret);
	if (head && !(head->flags & REQ_F_FAIL))
		req_fail_link_node(head, -ECANCELED);

	if (!(req->flags & IO_REQ_LINK_FLAGS)) {
		if (head) {
			link->last->link = req;
			link->head = NULL;
			req = head;
		}
		io_queue_sqe_fallback(req);
		return ret;
	}

	if (head)
		link->last->link = req;
	else
		link->head = req;
	link->last = req;
	return 0;
}

static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
			 const struct io_uring_sqe *sqe)
	__must_hold(&ctx->uring_lock)
{
	struct io_submit_link *link = &ctx->submit_state.link;
	int ret;

	ret = io_init_req(ctx, req, sqe);
	if (unlikely(ret))
		return io_submit_fail_init(sqe, req, ret);

	/* don't need @sqe from now on */
	trace_io_uring_submit_sqe(req, true);

	/*
	 * If we already have a head request, queue this one for async
	 * submittal once the head completes. If we don't have a head but
	 * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
	 * submitted sync once the chain is complete. If none of those
	 * conditions are true (normal request), then just queue it.
	 */
	if (unlikely(link->head)) {
		ret = io_req_prep_async(req);
		if (unlikely(ret))
			return io_submit_fail_init(sqe, req, ret);

		trace_io_uring_link(req, link->head);
		link->last->link = req;
		link->last = req;

		if (req->flags & IO_REQ_LINK_FLAGS)
			return 0;
		/* last request of the link, flush it */
		req = link->head;
		link->head = NULL;
		if (req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL))
			goto fallback;

	} else if (unlikely(req->flags & (IO_REQ_LINK_FLAGS |
					  REQ_F_FORCE_ASYNC | REQ_F_FAIL))) {
		if (req->flags & IO_REQ_LINK_FLAGS) {
			link->head = req;
			link->last = req;
		} else {
fallback:
			io_queue_sqe_fallback(req);
		}
		return 0;
	}

	io_queue_sqe(req);
	return 0;
}
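
/*
 * Illustrative sketch (not kernel code): the link assembly above is driven by
 * IOSQE_IO_LINK on the submission side. With liburing, chaining a write so it
 * only runs after the read completed successfully looks roughly like:
 *
 *	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_read(sqe, in_fd, buf, sizeof(buf), 0);
 *	sqe->flags |= IOSQE_IO_LINK;
 *	sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_write(sqe, out_fd, buf, sizeof(buf), 0);
 *	io_uring_submit(&ring);
 *
 * ring/in_fd/out_fd/buf are hypothetical; if a link member fails, the rest of
 * the chain is completed with -ECANCELED.
 */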

/*
 * Batched submission is done, ensure local IO is flushed out.
 */
static void io_submit_state_end(struct io_ring_ctx *ctx)
{
	struct io_submit_state *state = &ctx->submit_state;

	if (unlikely(state->link.head))
		io_queue_sqe_fallback(state->link.head);
	/* flush only after queuing links as they can generate completions */
	io_submit_flush_completions(ctx);
	if (state->plug_started)
		blk_finish_plug(&state->plug);
}

/*
 * Start submission side cache.
 */
static void io_submit_state_start(struct io_submit_state *state,
				  unsigned int max_ios)
{
	state->plug_started = false;
	state->need_plug = max_ios > 2;
	state->submit_nr = max_ios;
	/* set only head, no need to init link_last in advance */
	state->link.head = NULL;
}

static void io_commit_sqring(struct io_ring_ctx *ctx)
{
	struct io_rings *rings = ctx->rings;

	/*
	 * Ensure any loads from the SQEs are done at this point,
	 * since once we write the new head, the application could
	 * write new data to them.
	 */
	smp_store_release(&rings->sq.head, ctx->cached_sq_head);
}

/*
 * Fetch an sqe, if one is available. Note this returns a pointer to memory
 * that is mapped by userspace. This means that care needs to be taken to
 * ensure that reads are stable, as we cannot rely on userspace always
 * being a good citizen. If members of the sqe are validated and then later
 * used, it's important that those reads are done through READ_ONCE() to
 * prevent a re-load down the line.
 */
static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx)
{
	unsigned head, mask = ctx->sq_entries - 1;
	unsigned sq_idx = ctx->cached_sq_head++ & mask;

	/*
	 * The cached sq head (or cq tail) serves two purposes:
	 *
	 * 1) allows us to batch the cost of updating the user visible
	 *    head.
	 * 2) allows the kernel side to track the head on its own, even
	 *    though the application is the one updating it.
	 */
	head = READ_ONCE(ctx->sq_array[sq_idx]);
	if (likely(head < ctx->sq_entries)) {
		/* double index for 128-byte SQEs, twice as long */
		if (ctx->flags & IORING_SETUP_SQE128)
			head <<= 1;
		return &ctx->sq_sqes[head];
	}

	/* drop invalid entries */
	ctx->cq_extra--;
	WRITE_ONCE(ctx->rings->sq_dropped,
		   READ_ONCE(ctx->rings->sq_dropped) + 1);
	return NULL;
}
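
/*
 * Illustrative sketch of the userspace side (not kernel code): the producer
 * writes an SQE, publishes its index in the SQ array, then updates the tail
 * with a release store so the kernel loads above see a stable entry. The
 * sq_tail/sq_ring_mask/sq_array/sqes pointers are assumed to come from
 * mmap()ing the ring:
 *
 *	unsigned tail = *sq_tail;		// only userspace writes the tail
 *	unsigned idx = tail & *sq_ring_mask;
 *	sqes[idx] = my_sqe;			// fill in the SQE itself
 *	sq_array[idx] = idx;			// publish its index
 *	__atomic_store_n(sq_tail, tail + 1, __ATOMIC_RELEASE);
 *
 * liburing's io_uring_submit() performs the equivalent steps.
 */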

int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
	__must_hold(&ctx->uring_lock)
{
	unsigned int entries = io_sqring_entries(ctx);
	unsigned int left;
	int ret;

	if (unlikely(!entries))
		return 0;
	/* make sure SQ entry isn't read before tail */
	ret = left = min3(nr, ctx->sq_entries, entries);
	io_get_task_refs(left);
	io_submit_state_start(&ctx->submit_state, left);

	do {
		const struct io_uring_sqe *sqe;
		struct io_kiocb *req;

		if (unlikely(!io_alloc_req_refill(ctx)))
			break;
		req = io_alloc_req(ctx);
		sqe = io_get_sqe(ctx);
		if (unlikely(!sqe)) {
			io_req_add_to_cache(req, ctx);
			break;
		}

		/*
		 * Continue submitting even for sqe failure if the
		 * ring was setup with IORING_SETUP_SUBMIT_ALL
		 */
		if (unlikely(io_submit_sqe(ctx, req, sqe)) &&
		    !(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
			left--;
			break;
		}
	} while (--left);

	if (unlikely(left)) {
		ret -= left;
		/* try again if it submitted nothing and can't allocate a req */
		if (!ret && io_req_cache_empty(ctx))
			ret = -EAGAIN;
		current->io_uring->cached_refs += left;
	}

	io_submit_state_end(ctx);
	 /* Commit SQ ring head once we've consumed and submitted all SQEs */
	io_commit_sqring(ctx);
	return ret;
}

struct io_wait_queue {
	struct wait_queue_entry wq;
	struct io_ring_ctx *ctx;
	unsigned cq_tail;
	unsigned nr_timeouts;
};

static inline bool io_has_work(struct io_ring_ctx *ctx)
{
	return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
	       ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
		!llist_empty(&ctx->work_llist));
}

static inline bool io_should_wake(struct io_wait_queue *iowq)
{
	struct io_ring_ctx *ctx = iowq->ctx;
	int dist = ctx->cached_cq_tail - (int) iowq->cq_tail;

	/*
	 * Wake up if we have enough events, or if a timeout occurred since we
	 * started waiting. For timeouts, we always want to return to userspace,
	 * regardless of event count.
	 */
	return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
}

static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
			    int wake_flags, void *key)
{
	struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
							wq);
	struct io_ring_ctx *ctx = iowq->ctx;

	/*
	 * Cannot safely flush overflowed CQEs from here, ensure we wake up
	 * the task, and the next invocation will do it.
	 */
	if (io_should_wake(iowq) || io_has_work(ctx))
		return autoremove_wake_function(curr, mode, wake_flags, key);
	return -1;
}

int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
	if (io_run_task_work_ctx(ctx) > 0)
		return 1;
	if (task_sigpending(current))
		return -EINTR;
	return 0;
}

/* when this returns >0, the caller should retry */
static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
					  struct io_wait_queue *iowq,
					  ktime_t timeout)
{
	int ret;
	unsigned long check_cq;

	/* make sure we run task_work before checking for signals */
	ret = io_run_task_work_sig(ctx);
	if (ret || io_should_wake(iowq))
		return ret;

	check_cq = READ_ONCE(ctx->check_cq);
	if (unlikely(check_cq)) {
		/* let the caller flush overflows, retry */
		if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
			return 1;
		if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT))
			return -EBADR;
	}
	if (!schedule_hrtimeout(&timeout, HRTIMER_MODE_ABS))
		return -ETIME;
	return 1;
}

/*
 * Wait until events become available, if we don't already have some. The
 * application must reap them itself, as they reside on the shared cq ring.
 */
static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
			  const sigset_t __user *sig, size_t sigsz,
			  struct __kernel_timespec __user *uts)
{
	struct io_wait_queue iowq;
	struct io_rings *rings = ctx->rings;
	ktime_t timeout = KTIME_MAX;
	int ret;

	do {
		/* always run at least 1 task work to process local work */
		ret = io_run_task_work_ctx(ctx);
		if (ret < 0)
			return ret;
		io_cqring_overflow_flush(ctx);
		if (io_cqring_events(ctx) >= min_events)
			return 0;
	} while (ret > 0);

	if (sig) {
#ifdef CONFIG_COMPAT
		if (in_compat_syscall())
			ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
						      sigsz);
		else
#endif
			ret = set_user_sigmask(sig, sigsz);

		if (ret)
			return ret;
	}

	if (uts) {
		struct timespec64 ts;

		if (get_timespec64(&ts, uts))
			return -EFAULT;
		timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
	}

	init_waitqueue_func_entry(&iowq.wq, io_wake_function);
	iowq.wq.private = current;
	INIT_LIST_HEAD(&iowq.wq.entry);
	iowq.ctx = ctx;
	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
	iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;

	trace_io_uring_cqring_wait(ctx, min_events);
	do {
		/* if we can't even flush overflow, don't wait for more */
		if (!io_cqring_overflow_flush(ctx)) {
			ret = -EBUSY;
			break;
		}
		prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
						TASK_INTERRUPTIBLE);
		ret = io_cqring_wait_schedule(ctx, &iowq, timeout);
		cond_resched();
	} while (ret > 0);

	finish_wait(&ctx->cq_wait, &iowq.wq);
	restore_saved_sigmask_unless(ret == -EINTR);

	return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
}
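
/*
 * Illustrative sketch (not kernel code): the CQ-side counterpart of the wait
 * loop above, as seen from userspace through liburing:
 *
 *	struct io_uring_cqe *cqe;
 *	int ret = io_uring_wait_cqe(&ring, &cqe);	// blocks only if the
 *	if (!ret) {					// CQ ring is empty
 *		handle(cqe->user_data, cqe->res);
 *		io_uring_cqe_seen(&ring, cqe);		// advances the CQ head
 *	}
 *
 * ring and handle() are hypothetical; io_uring_wait_cqe() maps onto
 * io_uring_enter(IORING_ENTER_GETEVENTS) when it has to block.
 */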

static void io_mem_free(void *ptr)
{
	struct page *page;

	if (!ptr)
		return;

	page = virt_to_head_page(ptr);
	if (put_page_testzero(page))
		free_compound_page(page);
}

static void *io_mem_alloc(size_t size)
{
	gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP;

	return (void *) __get_free_pages(gfp, get_order(size));
}

static unsigned long rings_size(struct io_ring_ctx *ctx, unsigned int sq_entries,
				unsigned int cq_entries, size_t *sq_offset)
{
	struct io_rings *rings;
	size_t off, sq_array_size;

	off = struct_size(rings, cqes, cq_entries);
	if (off == SIZE_MAX)
		return SIZE_MAX;
	if (ctx->flags & IORING_SETUP_CQE32) {
		if (check_shl_overflow(off, 1, &off))
			return SIZE_MAX;
	}

#ifdef CONFIG_SMP
	off = ALIGN(off, SMP_CACHE_BYTES);
	if (off == 0)
		return SIZE_MAX;
#endif

	if (sq_offset)
		*sq_offset = off;

	sq_array_size = array_size(sizeof(u32), sq_entries);
	if (sq_array_size == SIZE_MAX)
		return SIZE_MAX;

	if (check_add_overflow(off, sq_array_size, &off))
		return SIZE_MAX;

	return off;
}
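
/*
 * The single rings allocation sized above is laid out roughly as follows
 * (a sketch; the SQ array offset is what rings_size() returns via *sq_offset):
 *
 *	struct io_rings                  header, incl. sq/cq heads and tails
 *	struct io_uring_cqe[cq_entries]  CQEs (each entry doubled for CQE32)
 *	<padding up to SMP_CACHE_BYTES>
 *	u32 sq_array[sq_entries]         indices into the separate SQE array
 *
 * Userspace maps this one region via IORING_OFF_SQ_RING/IORING_OFF_CQ_RING,
 * and the SQE array separately via IORING_OFF_SQES.
 */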

static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg,
			       unsigned int eventfd_async)
{
	struct io_ev_fd *ev_fd;
	__s32 __user *fds = arg;
	int fd;

	ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
					lockdep_is_held(&ctx->uring_lock));
	if (ev_fd)
		return -EBUSY;

	if (copy_from_user(&fd, fds, sizeof(*fds)))
		return -EFAULT;

	ev_fd = kmalloc(sizeof(*ev_fd), GFP_KERNEL);
	if (!ev_fd)
		return -ENOMEM;

	ev_fd->cq_ev_fd = eventfd_ctx_fdget(fd);
	if (IS_ERR(ev_fd->cq_ev_fd)) {
		int ret = PTR_ERR(ev_fd->cq_ev_fd);
		kfree(ev_fd);
		return ret;
	}

	spin_lock(&ctx->completion_lock);
	ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
	spin_unlock(&ctx->completion_lock);

	ev_fd->eventfd_async = eventfd_async;
	ctx->has_evfd = true;
	rcu_assign_pointer(ctx->io_ev_fd, ev_fd);
	atomic_set(&ev_fd->refs, 1);
	atomic_set(&ev_fd->ops, 0);
	return 0;
}

static int io_eventfd_unregister(struct io_ring_ctx *ctx)
{
	struct io_ev_fd *ev_fd;

	ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
					lockdep_is_held(&ctx->uring_lock));
	if (ev_fd) {
		ctx->has_evfd = false;
		rcu_assign_pointer(ctx->io_ev_fd, NULL);
		if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_FREE_BIT), &ev_fd->ops))
			call_rcu(&ev_fd->rcu, io_eventfd_ops);
		return 0;
	}

	return -ENXIO;
}
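
/*
 * Illustrative sketch (not kernel code): the eventfd wired up above is
 * installed from userspace with io_uring_register(), e.g.:
 *
 *	int efd = eventfd(0, EFD_CLOEXEC);
 *	syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_EVENTFD,
 *		&efd, 1);
 *
 * IORING_REGISTER_EVENTFD_ASYNC instead limits notifications to completions
 * that happened out of line; ring_fd is assumed to come from io_uring_setup()
 * and error handling is omitted.
 */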

static void io_req_caches_free(struct io_ring_ctx *ctx)
{
	struct io_submit_state *state = &ctx->submit_state;
	int nr = 0;

	mutex_lock(&ctx->uring_lock);
	io_flush_cached_locked_reqs(ctx, state);

	while (!io_req_cache_empty(ctx)) {
		struct io_wq_work_node *node;
		struct io_kiocb *req;

		node = wq_stack_extract(&state->free_list);
		req = container_of(node, struct io_kiocb, comp_list);
		kmem_cache_free(req_cachep, req);
		nr++;
	}
	if (nr)
		percpu_ref_put_many(&ctx->refs, nr);
	mutex_unlock(&ctx->uring_lock);
}

static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
{
	io_sq_thread_finish(ctx);

	if (ctx->mm_account) {
		mmdrop(ctx->mm_account);
		ctx->mm_account = NULL;
	}

	io_rsrc_refs_drop(ctx);
	/* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
	io_wait_rsrc_data(ctx->buf_data);
	io_wait_rsrc_data(ctx->file_data);

	mutex_lock(&ctx->uring_lock);
	if (ctx->buf_data)
		__io_sqe_buffers_unregister(ctx);
	if (ctx->file_data)
		__io_sqe_files_unregister(ctx);
	if (ctx->rings)
		__io_cqring_overflow_flush(ctx, true);
	io_eventfd_unregister(ctx);
	io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free);
	io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
	mutex_unlock(&ctx->uring_lock);
	io_destroy_buffers(ctx);
	if (ctx->sq_creds)
		put_cred(ctx->sq_creds);
	if (ctx->submitter_task)
		put_task_struct(ctx->submitter_task);

	/* there are no registered resources left, nobody uses it */
	if (ctx->rsrc_node)
		io_rsrc_node_destroy(ctx->rsrc_node);
	if (ctx->rsrc_backup_node)
		io_rsrc_node_destroy(ctx->rsrc_backup_node);
	flush_delayed_work(&ctx->rsrc_put_work);
	flush_delayed_work(&ctx->fallback_work);

	WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list));
	WARN_ON_ONCE(!llist_empty(&ctx->rsrc_put_llist));

#if defined(CONFIG_UNIX)
	if (ctx->ring_sock) {
		ctx->ring_sock->file = NULL; /* so that iput() is called */
		sock_release(ctx->ring_sock);
	}
#endif
	WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
	WARN_ON_ONCE(ctx->notif_slots || ctx->nr_notif_slots);

	io_mem_free(ctx->rings);
	io_mem_free(ctx->sq_sqes);

	percpu_ref_exit(&ctx->refs);
	free_uid(ctx->user);
	io_req_caches_free(ctx);
	if (ctx->hash_map)
		io_wq_put_hash(ctx->hash_map);
	kfree(ctx->cancel_table.hbs);
	kfree(ctx->cancel_table_locked.hbs);
	kfree(ctx->dummy_ubuf);
	kfree(ctx->io_bl);
	xa_destroy(&ctx->io_bl_xa);
	kfree(ctx);
}

static __poll_t io_uring_poll(struct file *file, poll_table *wait)
{
	struct io_ring_ctx *ctx = file->private_data;
	__poll_t mask = 0;

	poll_wait(file, &ctx->cq_wait, wait);
	/*
	 * synchronizes with barrier from wq_has_sleeper call in
	 * io_commit_cqring
	 */
	smp_rmb();
	if (!io_sqring_full(ctx))
		mask |= EPOLLOUT | EPOLLWRNORM;

	/*
	 * Don't flush cqring overflow list here, just do a simple check.
	 * Otherwise there could possibly be an ABBA deadlock:
	 *      CPU0                    CPU1
	 *      ----                    ----
	 * lock(&ctx->uring_lock);
	 *                              lock(&ep->mtx);
	 *                              lock(&ctx->uring_lock);
	 * lock(&ep->mtx);
	 *
	 * Users may get EPOLLIN meanwhile seeing nothing in cqring, this
	 * pushes them to do the flush.
	 */

	if (io_cqring_events(ctx) || io_has_work(ctx))
		mask |= EPOLLIN | EPOLLRDNORM;

	return mask;
}
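
/*
 * Illustrative sketch (not kernel code): because the ring fd supports poll,
 * it can be dropped into an existing epoll loop and EPOLLIN used as a hint
 * to go reap the CQ ring:
 *
 *	struct epoll_event ev = { .events = EPOLLIN, .data.fd = ring_fd };
 *	epoll_ctl(epfd, EPOLL_CTL_ADD, ring_fd, &ev);
 *
 * As the comment above notes, EPOLLIN can be reported while the CQ ring still
 * looks empty until overflowed CQEs are flushed, so it is only a hint.
 * epfd and ring_fd are hypothetical.
 */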

static int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id)
{
	const struct cred *creds;

	creds = xa_erase(&ctx->personalities, id);
	if (creds) {
		put_cred(creds);
		return 0;
	}

	return -EINVAL;
}

struct io_tctx_exit {
	struct callback_head		task_work;
	struct completion		completion;
	struct io_ring_ctx		*ctx;
};

static __cold void io_tctx_exit_cb(struct callback_head *cb)
{
	struct io_uring_task *tctx = current->io_uring;
	struct io_tctx_exit *work;

	work = container_of(cb, struct io_tctx_exit, task_work);
	/*
	 * When @in_idle, we're in cancellation and it's racy to remove the
	 * node. It'll be removed by the end of cancellation, just ignore it.
	 */
	if (!atomic_read(&tctx->in_idle))
		io_uring_del_tctx_node((unsigned long)work->ctx);
	complete(&work->completion);
}

static __cold bool io_cancel_ctx_cb(struct io_wq_work *work, void *data)
{
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);

	return req->ctx == data;
}

static __cold void io_ring_exit_work(struct work_struct *work)
{
	struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work);
	unsigned long timeout = jiffies + HZ * 60 * 5;
	unsigned long interval = HZ / 20;
	struct io_tctx_exit exit;
	struct io_tctx_node *node;
	int ret;

	/*
	 * If we're doing polled IO and end up having requests being
	 * submitted async (out-of-line), then completions can come in while
	 * we're waiting for refs to drop. We need to reap these manually,
	 * as nobody else will be looking for them.
	 */
	do {
		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
			io_move_task_work_from_local(ctx);

		while (io_uring_try_cancel_requests(ctx, NULL, true))
			cond_resched();

		if (ctx->sq_data) {
			struct io_sq_data *sqd = ctx->sq_data;
			struct task_struct *tsk;

			io_sq_thread_park(sqd);
			tsk = sqd->thread;
			if (tsk && tsk->io_uring && tsk->io_uring->io_wq)
				io_wq_cancel_cb(tsk->io_uring->io_wq,
						io_cancel_ctx_cb, ctx, true);
			io_sq_thread_unpark(sqd);
		}

		io_req_caches_free(ctx);

		if (WARN_ON_ONCE(time_after(jiffies, timeout))) {
			/* there is little hope left, don't run it too often */
			interval = HZ * 60;
		}
	} while (!wait_for_completion_timeout(&ctx->ref_comp, interval));

	init_completion(&exit.completion);
	init_task_work(&exit.task_work, io_tctx_exit_cb);
	exit.ctx = ctx;
	/*
	 * Some may use context even when all refs and requests have been put,
	 * and they are free to do so while still holding uring_lock or
	 * completion_lock, see io_req_task_submit(). Apart from other work,
	 * this lock/unlock section also waits for them to finish.
	 */
	mutex_lock(&ctx->uring_lock);
	while (!list_empty(&ctx->tctx_list)) {
		WARN_ON_ONCE(time_after(jiffies, timeout));

		node = list_first_entry(&ctx->tctx_list, struct io_tctx_node,
					ctx_node);
		/* don't spin on a single task if cancellation failed */
		list_rotate_left(&ctx->tctx_list);
		ret = task_work_add(node->task, &exit.task_work, TWA_SIGNAL);
		if (WARN_ON_ONCE(ret))
			continue;

		mutex_unlock(&ctx->uring_lock);
		wait_for_completion(&exit.completion);
		mutex_lock(&ctx->uring_lock);
	}
	mutex_unlock(&ctx->uring_lock);
	spin_lock(&ctx->completion_lock);
	spin_unlock(&ctx->completion_lock);

	io_ring_ctx_free(ctx);
}

static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
{
	unsigned long index;
	struct creds *creds;

	mutex_lock(&ctx->uring_lock);
	percpu_ref_kill(&ctx->refs);
	if (ctx->rings)
		__io_cqring_overflow_flush(ctx, true);
	xa_for_each(&ctx->personalities, index, creds)
		io_unregister_personality(ctx, index);
	if (ctx->rings)
		io_poll_remove_all(ctx, NULL, true);
	mutex_unlock(&ctx->uring_lock);

	/* failed during ring init, it couldn't have issued any requests */
	if (ctx->rings) {
		io_kill_timeouts(ctx, NULL, true);
		/* if we failed setting up the ctx, we might not have any rings */
		io_iopoll_try_reap_events(ctx);
	}

	INIT_WORK(&ctx->exit_work, io_ring_exit_work);
	/*
	 * Use system_unbound_wq to avoid spawning tons of event kworkers
	 * if we're exiting a ton of rings at the same time. It just adds
	 * noise and overhead, there's no discernible change in runtime
	 * over using system_wq.
	 */
	queue_work(system_unbound_wq, &ctx->exit_work);
}

static int io_uring_release(struct inode *inode, struct file *file)
{
	struct io_ring_ctx *ctx = file->private_data;

	file->private_data = NULL;
	io_ring_ctx_wait_and_kill(ctx);
	return 0;
}

struct io_task_cancel {
	struct task_struct *task;
	bool all;
};

static bool io_cancel_task_cb(struct io_wq_work *work, void *data)
{
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
	struct io_task_cancel *cancel = data;

	return io_match_task_safe(req, cancel->task, cancel->all);
}

static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
					 struct task_struct *task,
					 bool cancel_all)
{
	struct io_defer_entry *de;
	LIST_HEAD(list);

	spin_lock(&ctx->completion_lock);
	list_for_each_entry_reverse(de, &ctx->defer_list, list) {
		if (io_match_task_safe(de->req, task, cancel_all)) {
			list_cut_position(&list, &ctx->defer_list, &de->list);
			break;
		}
	}
	spin_unlock(&ctx->completion_lock);
	if (list_empty(&list))
		return false;

	while (!list_empty(&list)) {
		de = list_first_entry(&list, struct io_defer_entry, list);
		list_del_init(&de->list);
		io_req_complete_failed(de->req, -ECANCELED);
		kfree(de);
	}
	return true;
}

static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
{
	struct io_tctx_node *node;
	enum io_wq_cancel cret;
	bool ret = false;

	mutex_lock(&ctx->uring_lock);
	list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
		struct io_uring_task *tctx = node->task->io_uring;

		/*
		 * io_wq will stay alive while we hold uring_lock, because it's
		 * killed after ctx nodes, which requires to take the lock.
		 */
		if (!tctx || !tctx->io_wq)
			continue;
		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true);
		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
	}
	mutex_unlock(&ctx->uring_lock);

	return ret;
}

static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
						struct task_struct *task,
						bool cancel_all)
{
	struct io_task_cancel cancel = { .task = task, .all = cancel_all, };
	struct io_uring_task *tctx = task ? task->io_uring : NULL;
	enum io_wq_cancel cret;
	bool ret = false;

	/* failed during ring init, it couldn't have issued any requests */
	if (!ctx->rings)
		return false;

	if (!task) {
		ret |= io_uring_try_cancel_iowq(ctx);
	} else if (tctx && tctx->io_wq) {
		/*
		 * Cancels requests of all rings, not only @ctx, but
		 * it's fine as the task is in exit/exec.
		 */
		cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_task_cb,
				       &cancel, true);
		ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
	}

	/* SQPOLL thread does its own polling */
	if ((!(ctx->flags & IORING_SETUP_SQPOLL) && cancel_all) ||
	    (ctx->sq_data && ctx->sq_data->thread == current)) {
		while (!wq_list_empty(&ctx->iopoll_list)) {
			io_iopoll_try_reap_events(ctx);
			ret = true;
		}
	}

	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
		ret |= io_run_local_work(ctx) > 0;
	ret |= io_cancel_defer_files(ctx, task, cancel_all);
	mutex_lock(&ctx->uring_lock);
	ret |= io_poll_remove_all(ctx, task, cancel_all);
	mutex_unlock(&ctx->uring_lock);
	ret |= io_kill_timeouts(ctx, task, cancel_all);
	if (task)
		ret |= io_run_task_work() > 0;
	return ret;
}

static s64 tctx_inflight(struct io_uring_task *tctx, bool tracked)
{
	if (tracked)
		return atomic_read(&tctx->inflight_tracked);
	return percpu_counter_sum(&tctx->inflight);
}

/*
 * Find any io_uring ctx that this task has registered or done IO on, and cancel
 * requests. @sqd should be not-null IFF it's an SQPOLL thread cancellation.
 */
__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
{
	struct io_uring_task *tctx = current->io_uring;
	struct io_ring_ctx *ctx;
	s64 inflight;
	DEFINE_WAIT(wait);

	WARN_ON_ONCE(sqd && sqd->thread != current);

	if (!current->io_uring)
		return;
	if (tctx->io_wq)
		io_wq_exit_start(tctx->io_wq);

	atomic_inc(&tctx->in_idle);
	do {
		bool loop = false;

		io_uring_drop_tctx_refs(current);
		/* read completions before cancelations */
		inflight = tctx_inflight(tctx, !cancel_all);
		if (!inflight)
			break;

		if (!sqd) {
			struct io_tctx_node *node;
			unsigned long index;

			xa_for_each(&tctx->xa, index, node) {
				/* sqpoll task will cancel all its requests */
				if (node->ctx->sq_data)
					continue;
				loop |= io_uring_try_cancel_requests(node->ctx,
							current, cancel_all);
			}
		} else {
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				loop |= io_uring_try_cancel_requests(ctx,
								     current,
								     cancel_all);
		}

		if (loop) {
			cond_resched();
			continue;
		}

		prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
		io_run_task_work();
		io_uring_drop_tctx_refs(current);

		/*
		 * If we've seen completions, retry without waiting. This
		 * avoids a race where a completion comes in before we did
		 * prepare_to_wait().
		 */
		if (inflight == tctx_inflight(tctx, !cancel_all))
			schedule();
		finish_wait(&tctx->wait, &wait);
	} while (1);

	io_uring_clean_tctx(tctx);
	if (cancel_all) {
		/*
		 * We shouldn't run task_works after cancel, so just leave
		 * ->in_idle set for normal exit.
		 */
		atomic_dec(&tctx->in_idle);
		/* for exec all current's requests should be gone, kill tctx */
		__io_uring_free(current);
	}
}

void __io_uring_cancel(bool cancel_all)
{
	io_uring_cancel_generic(cancel_all, NULL);
}

static void *io_uring_validate_mmap_request(struct file *file,
					    loff_t pgoff, size_t sz)
{
	struct io_ring_ctx *ctx = file->private_data;
	loff_t offset = pgoff << PAGE_SHIFT;
	struct page *page;
	void *ptr;

	switch (offset) {
	case IORING_OFF_SQ_RING:
	case IORING_OFF_CQ_RING:
		ptr = ctx->rings;
		break;
	case IORING_OFF_SQES:
		ptr = ctx->sq_sqes;
		break;
	default:
		return ERR_PTR(-EINVAL);
	}

	page = virt_to_head_page(ptr);
	if (sz > page_size(page))
		return ERR_PTR(-EINVAL);

	return ptr;
}

#ifdef CONFIG_MMU

static __cold int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
{
	size_t sz = vma->vm_end - vma->vm_start;
	unsigned long pfn;
	void *ptr;

	ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
	if (IS_ERR(ptr))
		return PTR_ERR(ptr);

	pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
	return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
}

#else /* !CONFIG_MMU */

static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
{
	return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
}

static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
{
	return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
}

static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
	unsigned long addr, unsigned long len,
	unsigned long pgoff, unsigned long flags)
{
	void *ptr;

	ptr = io_uring_validate_mmap_request(file, pgoff, len);
	if (IS_ERR(ptr))
		return PTR_ERR(ptr);

	return (unsigned long) ptr;
}

#endif /* !CONFIG_MMU */

static int io_validate_ext_arg(unsigned flags, const void __user *argp, size_t argsz)
{
	if (flags & IORING_ENTER_EXT_ARG) {
		struct io_uring_getevents_arg arg;

		if (argsz != sizeof(arg))
			return -EINVAL;
		if (copy_from_user(&arg, argp, sizeof(arg)))
			return -EFAULT;
	}
	return 0;
}

static int io_get_ext_arg(unsigned flags, const void __user *argp, size_t *argsz,
			  struct __kernel_timespec __user **ts,
			  const sigset_t __user **sig)
{
	struct io_uring_getevents_arg arg;

	/*
	 * If EXT_ARG isn't set, then we have no timespec and the argp pointer
	 * is just a pointer to the sigset_t.
	 */
	if (!(flags & IORING_ENTER_EXT_ARG)) {
		*sig = (const sigset_t __user *) argp;
		*ts = NULL;
		return 0;
	}

	/*
	 * EXT_ARG is set - ensure we agree on the size of it and copy in our
	 * timespec and sigset_t pointers if good.
	 */
	if (*argsz != sizeof(arg))
		return -EINVAL;
	if (copy_from_user(&arg, argp, sizeof(arg)))
		return -EFAULT;
	if (arg.pad)
		return -EINVAL;
	*sig = u64_to_user_ptr(arg.sigmask);
	*argsz = arg.sigmask_sz;
	*ts = u64_to_user_ptr(arg.ts);
	return 0;
}
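
/*
 * Illustrative sketch (not kernel code): with IORING_ENTER_EXT_ARG the last
 * two io_uring_enter() arguments describe this struct instead of a bare
 * sigset_t, which is how a wait timeout is passed without a separate request:
 *
 *	struct __kernel_timespec ts = { .tv_sec = 1 };
 *	struct io_uring_getevents_arg arg = {
 *		.sigmask	= 0,
 *		.sigmask_sz	= _NSIG / 8,
 *		.ts		= (__u64)(unsigned long)&ts,
 *	};
 *	syscall(__NR_io_uring_enter, ring_fd, 0, 1,
 *		IORING_ENTER_GETEVENTS | IORING_ENTER_EXT_ARG,
 *		&arg, sizeof(arg));
 *
 * ring_fd is assumed; liburing's io_uring_wait_cqe_timeout() wraps this.
 */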

SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
		u32, min_complete, u32, flags, const void __user *, argp,
		size_t, argsz)
{
	struct io_ring_ctx *ctx;
	struct fd f;
	long ret;

	if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
			       IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
			       IORING_ENTER_REGISTERED_RING)))
		return -EINVAL;

	/*
	 * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
	 * need only dereference our task private array to find it.
	 */
	if (flags & IORING_ENTER_REGISTERED_RING) {
		struct io_uring_task *tctx = current->io_uring;

		if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX))
			return -EINVAL;
		fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
		f.file = tctx->registered_rings[fd];
		f.flags = 0;
		if (unlikely(!f.file))
			return -EBADF;
	} else {
		f = fdget(fd);
		if (unlikely(!f.file))
			return -EBADF;
		ret = -EOPNOTSUPP;
		if (unlikely(!io_is_uring_fops(f.file)))
			goto out;
	}

	ctx = f.file->private_data;
	ret = -EBADFD;
	if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED))
		goto out;

	/*
	 * For SQ polling, the thread will do all submissions and completions.
	 * Just return the requested submit count, and wake the thread if
	 * we were asked to.
	 */
	ret = 0;
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		io_cqring_overflow_flush(ctx);

		if (unlikely(ctx->sq_data->thread == NULL)) {
			ret = -EOWNERDEAD;
			goto out;
		}
		if (flags & IORING_ENTER_SQ_WAKEUP)
			wake_up(&ctx->sq_data->wait);
		if (flags & IORING_ENTER_SQ_WAIT) {
			ret = io_sqpoll_wait_sq(ctx);
			if (ret)
				goto out;
		}
		ret = to_submit;
	} else if (to_submit) {
		ret = io_uring_add_tctx_node(ctx);
		if (unlikely(ret))
			goto out;

		mutex_lock(&ctx->uring_lock);
		ret = io_submit_sqes(ctx, to_submit);
		if (ret != to_submit) {
			mutex_unlock(&ctx->uring_lock);
			goto out;
		}
		if ((flags & IORING_ENTER_GETEVENTS) && ctx->syscall_iopoll)
			goto iopoll_locked;
		mutex_unlock(&ctx->uring_lock);
	}

	if (flags & IORING_ENTER_GETEVENTS) {
		int ret2;

		if (ctx->syscall_iopoll) {
			/*
			 * We disallow the app entering submit/complete with
			 * polling, but we still need to lock the ring to
			 * prevent racing with polled issue that got punted to
			 * a workqueue.
			 */
			mutex_lock(&ctx->uring_lock);
iopoll_locked:
			ret2 = io_validate_ext_arg(flags, argp, argsz);
			if (likely(!ret2)) {
				min_complete = min(min_complete,
						   ctx->cq_entries);
				ret2 = io_iopoll_check(ctx, min_complete);
			}
			mutex_unlock(&ctx->uring_lock);
		} else {
			const sigset_t __user *sig;
			struct __kernel_timespec __user *ts;

			ret2 = io_get_ext_arg(flags, argp, &argsz, &ts, &sig);
			if (likely(!ret2)) {
				min_complete = min(min_complete,
						   ctx->cq_entries);
				ret2 = io_cqring_wait(ctx, min_complete, sig,
						      argsz, ts);
			}
		}

		if (!ret) {
			ret = ret2;

			/*
			 * EBADR indicates that one or more CQE were dropped.
			 * Once the user has been informed we can clear the bit
			 * as they are obviously ok with those drops.
			 */
			if (unlikely(ret2 == -EBADR))
				clear_bit(IO_CHECK_CQ_DROPPED_BIT,
					  &ctx->check_cq);
		}
	}
out:
	fdput(f);
	return ret;
}
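
/*
 * Illustrative sketch (not kernel code) of the common non-SQPOLL call above:
 * submit everything queued in the SQ ring and block until one completion is
 * available, in a single system call:
 *
 *	int ret = syscall(__NR_io_uring_enter, ring_fd, to_submit, 1,
 *			  IORING_ENTER_GETEVENTS, NULL, 0);
 *
 * With IORING_SETUP_SQPOLL the same call reduces to a wakeup check, as
 * handled earlier in the function; liburing's io_uring_submit_and_wait()
 * wraps this pattern. ring_fd and to_submit are assumed.
 */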

static const struct file_operations io_uring_fops = {
	.release	= io_uring_release,
	.mmap		= io_uring_mmap,
#ifndef CONFIG_MMU
	.get_unmapped_area = io_uring_nommu_get_unmapped_area,
	.mmap_capabilities = io_uring_nommu_mmap_capabilities,
#endif
	.poll		= io_uring_poll,
#ifdef CONFIG_PROC_FS
	.show_fdinfo	= io_uring_show_fdinfo,
#endif
};

bool io_is_uring_fops(struct file *file)
{
	return file->f_op == &io_uring_fops;
}

static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx,
					 struct io_uring_params *p)
{
	struct io_rings *rings;
	size_t size, sq_array_offset;

	/* make sure these are sane, as we already accounted them */
	ctx->sq_entries = p->sq_entries;
	ctx->cq_entries = p->cq_entries;

	size = rings_size(ctx, p->sq_entries, p->cq_entries, &sq_array_offset);
	if (size == SIZE_MAX)
		return -EOVERFLOW;

	rings = io_mem_alloc(size);
	if (!rings)
		return -ENOMEM;

	ctx->rings = rings;
	ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
	rings->sq_ring_mask = p->sq_entries - 1;
	rings->cq_ring_mask = p->cq_entries - 1;
	rings->sq_ring_entries = p->sq_entries;
	rings->cq_ring_entries = p->cq_entries;

	if (p->flags & IORING_SETUP_SQE128)
		size = array_size(2 * sizeof(struct io_uring_sqe), p->sq_entries);
	else
		size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
	if (size == SIZE_MAX) {
		io_mem_free(ctx->rings);
		ctx->rings = NULL;
		return -EOVERFLOW;
	}

	ctx->sq_sqes = io_mem_alloc(size);
	if (!ctx->sq_sqes) {
		io_mem_free(ctx->rings);
		ctx->rings = NULL;
		return -ENOMEM;
	}

	return 0;
}

static int io_uring_install_fd(struct io_ring_ctx *ctx, struct file *file)
{
	int ret, fd;

	fd = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
	if (fd < 0)
		return fd;

	ret = __io_uring_add_tctx_node(ctx, false);
	if (ret) {
		put_unused_fd(fd);
		return ret;
	}
	fd_install(fd, file);
	return fd;
}

/*
 * Allocate an anonymous fd, this is what constitutes the application
 * visible backing of an io_uring instance. The application mmaps this
 * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
 * we have to tie this fd to a socket for file garbage collection purposes.
 */
static struct file *io_uring_get_file(struct io_ring_ctx *ctx)
{
	struct file *file;
#if defined(CONFIG_UNIX)
	int ret;

	ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
				&ctx->ring_sock);
	if (ret)
		return ERR_PTR(ret);
#endif

	file = anon_inode_getfile_secure("[io_uring]", &io_uring_fops, ctx,
					 O_RDWR | O_CLOEXEC, NULL);
#if defined(CONFIG_UNIX)
	if (IS_ERR(file)) {
		sock_release(ctx->ring_sock);
		ctx->ring_sock = NULL;
	} else {
		ctx->ring_sock->file = file;
	}
#endif
	return file;
}

3366 3367
static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
				  struct io_uring_params __user *params)
Jens Axboe's avatar
Jens Axboe committed
3368 3369
{
	struct io_ring_ctx *ctx;
3370
	struct file *file;
Jens Axboe's avatar
Jens Axboe committed
3371 3372
	int ret;

3373
	if (!entries)
Jens Axboe's avatar
Jens Axboe committed
3374
		return -EINVAL;
3375 3376 3377 3378 3379
	if (entries > IORING_MAX_ENTRIES) {
		if (!(p->flags & IORING_SETUP_CLAMP))
			return -EINVAL;
		entries = IORING_MAX_ENTRIES;
	}
Jens Axboe's avatar
Jens Axboe committed
3380 3381 3382 3383 3384

	/*
	 * Use twice as many entries for the CQ ring. It's possible for the
	 * application to drive a higher depth than the size of the SQ ring,
	 * since the sqes are only used at submission time. This allows for
3385 3386 3387
	 * some flexibility in overcommitting a bit. If the application has
	 * set IORING_SETUP_CQSIZE, it will have passed in the desired number
	 * of CQ ring entries manually.
Jens Axboe's avatar
Jens Axboe committed
3388 3389
	 */
	p->sq_entries = roundup_pow_of_two(entries);
3390 3391 3392 3393 3394 3395
	if (p->flags & IORING_SETUP_CQSIZE) {
		/*
		 * If IORING_SETUP_CQSIZE is set, we do the same roundup
		 * to a power-of-two, if it isn't already. We do NOT impose
		 * any cq vs sq ring sizing.
		 */
3396
		if (!p->cq_entries)
3397
			return -EINVAL;
3398 3399 3400 3401 3402
		if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
			if (!(p->flags & IORING_SETUP_CLAMP))
				return -EINVAL;
			p->cq_entries = IORING_MAX_CQ_ENTRIES;
		}
3403 3404 3405
		p->cq_entries = roundup_pow_of_two(p->cq_entries);
		if (p->cq_entries < p->sq_entries)
			return -EINVAL;
3406 3407 3408
	} else {
		p->cq_entries = 2 * p->sq_entries;
	}
Jens Axboe's avatar
Jens Axboe committed
3409 3410

	ctx = io_ring_ctx_alloc(p);
3411
	if (!ctx)
Jens Axboe's avatar
Jens Axboe committed
3412
		return -ENOMEM;
3413 3414 3415 3416 3417 3418 3419 3420 3421 3422 3423

	/*
	 * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
	 * space applications don't need to do io completion events
	 * polling again, they can rely on io_sq_thread to do polling
	 * work, which can reduce cpu usage and uring_lock contention.
	 */
	if (ctx->flags & IORING_SETUP_IOPOLL &&
	    !(ctx->flags & IORING_SETUP_SQPOLL))
		ctx->syscall_iopoll = 1;

Jens Axboe's avatar
Jens Axboe committed
3424
	ctx->compat = in_compat_syscall();
3425 3426
	if (!capable(CAP_IPC_LOCK))
		ctx->user = get_uid(current_user());
3427

3428
	/*
3429 3430
	 * For SQPOLL, we just need a wakeup, always. For !SQPOLL, if
	 * COOP_TASKRUN is set, then IPIs are never needed by the app.
3431
	 */
3432 3433 3434
	ret = -EINVAL;
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		/* IPI related flags don't make sense with SQPOLL */
3435
		if (ctx->flags & (IORING_SETUP_COOP_TASKRUN |
3436 3437
				  IORING_SETUP_TASKRUN_FLAG |
				  IORING_SETUP_DEFER_TASKRUN))
3438
			goto err;
3439
		ctx->notify_method = TWA_SIGNAL_NO_IPI;
3440 3441 3442
	} else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) {
		ctx->notify_method = TWA_SIGNAL_NO_IPI;
	} else {
3443 3444
		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG &&
		    !(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
3445
			goto err;
3446
		ctx->notify_method = TWA_SIGNAL;
3447
	}
3448

3449 3450 3451 3452 3453 3454 3455 3456 3457 3458
	/*
	 * For DEFER_TASKRUN we require the completion task to be the same as the
	 * submission task. This implies that there is only one submitter, so enforce
	 * that.
	 */
	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN &&
	    !(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) {
		goto err;
	}

3459 3460 3461 3462 3463 3464
	/*
	 * This is just grabbed for accounting purposes. When a process exits,
	 * the mm is exited and dropped before the files, hence we need to hang
	 * on to this mm purely for the purposes of being able to unaccount
	 * memory (locked/pinned vm). It's not used for anything else.
	 */
3465
	mmgrab(current->mm);
3466
	ctx->mm_account = current->mm;
3467

Jens Axboe's avatar
Jens Axboe committed
3468 3469 3470 3471
	ret = io_allocate_scq_urings(ctx, p);
	if (ret)
		goto err;

3472
	ret = io_sq_offload_create(ctx, p);
Jens Axboe's avatar
Jens Axboe committed
3473 3474
	if (ret)
		goto err;
3475
	/* always set a rsrc node */
3476 3477 3478
	ret = io_rsrc_node_switch_start(ctx);
	if (ret)
		goto err;
3479
	io_rsrc_node_switch(ctx, NULL);
Jens Axboe's avatar
Jens Axboe committed
3480 3481

	memset(&p->sq_off, 0, sizeof(p->sq_off));
3482 3483 3484 3485 3486 3487 3488
	p->sq_off.head = offsetof(struct io_rings, sq.head);
	p->sq_off.tail = offsetof(struct io_rings, sq.tail);
	p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
	p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
	p->sq_off.flags = offsetof(struct io_rings, sq_flags);
	p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
	p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
Jens Axboe's avatar
Jens Axboe committed
3489 3490

	memset(&p->cq_off, 0, sizeof(p->cq_off));
3491 3492 3493 3494 3495 3496
	p->cq_off.head = offsetof(struct io_rings, cq.head);
	p->cq_off.tail = offsetof(struct io_rings, cq.tail);
	p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
	p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
	p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
	p->cq_off.cqes = offsetof(struct io_rings, cqes);
	p->cq_off.flags = offsetof(struct io_rings, cq_flags);

	p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
			IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
			IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
			IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
			IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
			IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
			IORING_FEAT_LINKED_FILE;

	if (copy_to_user(params, p, sizeof(*p))) {
		ret = -EFAULT;
		goto err;
	}

	file = io_uring_get_file(ctx);
	if (IS_ERR(file)) {
		ret = PTR_ERR(file);
		goto err;
	}

	/*
	 * Install ring fd as the very last thing, so we don't risk someone
	 * having closed it before we finish setup
	 */
	ret = io_uring_install_fd(ctx, file);
	if (ret < 0) {
		/* fput will clean it up */
		fput(file);
		return ret;
	}

	trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
	return ret;
err:
	io_ring_ctx_wait_and_kill(ctx);
	return ret;
}

/*
 * Sets up an io_uring context and returns the fd. The application asks for
 * a ring size; we return the actual sq/cq ring sizes (among other things)
 * in the params structure passed in.
 */
static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
{
	struct io_uring_params p;
	int i;

	if (copy_from_user(&p, params, sizeof(p)))
		return -EFAULT;
	for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
		if (p.resv[i])
			return -EINVAL;
	}

	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
			IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
			IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
			IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
			IORING_SETUP_SQE128 | IORING_SETUP_CQE32 |
			IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN))
		return -EINVAL;

	return io_uring_create(entries, &p, params);
}

SYSCALL_DEFINE2(io_uring_setup, u32, entries,
		struct io_uring_params __user *, params)
{
	return io_uring_setup(entries, params);
}
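/*
 * Example (userspace sketch, illustrative only; raw syscalls shown, most
 * applications would use liburing instead): setting up a ring and mapping
 * the SQ/CQ regions using the offsets returned in io_uring_params above.
 *
 *	struct io_uring_params p = { 0 };
 *	int fd = syscall(__NR_io_uring_setup, 256, &p);
 *	size_t sq_sz = p.sq_off.array + p.sq_entries * sizeof(__u32);
 *	size_t cq_sz = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);
 *
 *	With IORING_FEAT_SINGLE_MMAP, one mapping covers both rings:
 *
 *	void *rings = mmap(NULL, sq_sz > cq_sz ? sq_sz : cq_sz,
 *			   PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
 *			   fd, IORING_OFF_SQ_RING);
 *	struct io_uring_sqe *sqes = mmap(NULL,
 *			   p.sq_entries * sizeof(struct io_uring_sqe),
 *			   PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
 *			   fd, IORING_OFF_SQES);
 */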

static __cold int io_probe(struct io_ring_ctx *ctx, void __user *arg,
			   unsigned nr_args)
{
	struct io_uring_probe *p;
	size_t size;
	int i, ret;

	size = struct_size(p, ops, nr_args);
	if (size == SIZE_MAX)
		return -EOVERFLOW;
	p = kzalloc(size, GFP_KERNEL);
	if (!p)
		return -ENOMEM;

	ret = -EFAULT;
	if (copy_from_user(p, arg, size))
		goto out;
	ret = -EINVAL;
	if (memchr_inv(p, 0, size))
		goto out;

	p->last_op = IORING_OP_LAST - 1;
	if (nr_args > IORING_OP_LAST)
		nr_args = IORING_OP_LAST;

	for (i = 0; i < nr_args; i++) {
		p->ops[i].op = i;
		if (!io_op_defs[i].not_supported)
			p->ops[i].flags = IO_URING_OP_SUPPORTED;
	}
	p->ops_len = i;

	ret = 0;
	if (copy_to_user(arg, p, size))
		ret = -EFAULT;
out:
	kfree(p);
	return ret;
}
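/*
 * Example (userspace sketch, illustrative only; ring_fd is assumed to come
 * from io_uring_setup()): querying opcode support through
 * IORING_REGISTER_PROBE, which lands in io_probe() above. The probe buffer
 * must be zeroed before the call.
 *
 *	struct io_uring_probe *probe;
 *	size_t len = sizeof(*probe) +
 *		     IORING_OP_LAST * sizeof(struct io_uring_probe_op);
 *
 *	probe = calloc(1, len);
 *	if (!syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_PROBE,
 *		     probe, IORING_OP_LAST)) {
 *		// e.g. probe->ops[IORING_OP_READV].flags & IO_URING_OP_SUPPORTED
 *	}
 *	free(probe);
 */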

static int io_register_personality(struct io_ring_ctx *ctx)
{
	const struct cred *creds;
	u32 id;
	int ret;

	creds = get_current_cred();

	ret = xa_alloc_cyclic(&ctx->personalities, &id, (void *)creds,
			XA_LIMIT(0, USHRT_MAX), &ctx->pers_next, GFP_KERNEL);
	if (ret < 0) {
		put_cred(creds);
		return ret;
	}
	return id;
}
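/*
 * Example (userspace sketch, illustrative only; ring_fd is assumed to come
 * from io_uring_setup()): registering the caller's current credentials and
 * referencing them later from an SQE via the returned personality id.
 *
 *	int id = syscall(__NR_io_uring_register, ring_fd,
 *			 IORING_REGISTER_PERSONALITY, NULL, 0);
 *	...
 *	sqe->personality = id;	// issue this request with those creds
 */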

static __cold int io_register_restrictions(struct io_ring_ctx *ctx,
					   void __user *arg, unsigned int nr_args)
{
	struct io_uring_restriction *res;
	size_t size;
	int i, ret;

	/* Restrictions allowed only if rings started disabled */
	if (!(ctx->flags & IORING_SETUP_R_DISABLED))
		return -EBADFD;

	/* We allow only a single restrictions registration */
	if (ctx->restrictions.registered)
		return -EBUSY;

	if (!arg || nr_args > IORING_MAX_RESTRICTIONS)
		return -EINVAL;

	size = array_size(nr_args, sizeof(*res));
	if (size == SIZE_MAX)
		return -EOVERFLOW;

	res = memdup_user(arg, size);
	if (IS_ERR(res))
		return PTR_ERR(res);

	ret = 0;

	for (i = 0; i < nr_args; i++) {
		switch (res[i].opcode) {
		case IORING_RESTRICTION_REGISTER_OP:
			if (res[i].register_op >= IORING_REGISTER_LAST) {
				ret = -EINVAL;
				goto out;
			}

			__set_bit(res[i].register_op,
				  ctx->restrictions.register_op);
			break;
		case IORING_RESTRICTION_SQE_OP:
			if (res[i].sqe_op >= IORING_OP_LAST) {
				ret = -EINVAL;
				goto out;
			}

			__set_bit(res[i].sqe_op, ctx->restrictions.sqe_op);
			break;
		case IORING_RESTRICTION_SQE_FLAGS_ALLOWED:
			ctx->restrictions.sqe_flags_allowed = res[i].sqe_flags;
			break;
		case IORING_RESTRICTION_SQE_FLAGS_REQUIRED:
			ctx->restrictions.sqe_flags_required = res[i].sqe_flags;
			break;
		default:
			ret = -EINVAL;
			goto out;
		}
	}

out:
	/* Reset all restrictions if an error happened */
	if (ret != 0)
		memset(&ctx->restrictions, 0, sizeof(ctx->restrictions));
	else
		ctx->restrictions.registered = true;

	kfree(res);
	return ret;
}

static int io_register_enable_rings(struct io_ring_ctx *ctx)
{
	if (!(ctx->flags & IORING_SETUP_R_DISABLED))
		return -EBADFD;

	if (ctx->restrictions.registered)
		ctx->restricted = 1;

	ctx->flags &= ~IORING_SETUP_R_DISABLED;
	if (ctx->sq_data && wq_has_sleeper(&ctx->sq_data->wait))
		wake_up(&ctx->sq_data->wait);
	return 0;
}
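/*
 * Example (userspace sketch, illustrative only): the intended flow for the
 * two helpers above - create the ring disabled, register restrictions,
 * then enable it.
 *
 *	struct io_uring_params p = { .flags = IORING_SETUP_R_DISABLED };
 *	int fd = syscall(__NR_io_uring_setup, 8, &p);
 *
 *	struct io_uring_restriction res = {
 *		.opcode = IORING_RESTRICTION_SQE_OP,
 *		.sqe_op = IORING_OP_NOP,	// only allow NOP SQEs
 *	};
 *	syscall(__NR_io_uring_register, fd, IORING_REGISTER_RESTRICTIONS, &res, 1);
 *	syscall(__NR_io_uring_register, fd, IORING_REGISTER_ENABLE_RINGS, NULL, 0);
 */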

static __cold int io_register_iowq_aff(struct io_ring_ctx *ctx,
				       void __user *arg, unsigned len)
{
	struct io_uring_task *tctx = current->io_uring;
	cpumask_var_t new_mask;
	int ret;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&new_mask, GFP_KERNEL))
		return -ENOMEM;

	cpumask_clear(new_mask);
	if (len > cpumask_size())
		len = cpumask_size();

	if (in_compat_syscall()) {
		ret = compat_get_bitmap(cpumask_bits(new_mask),
					(const compat_ulong_t __user *)arg,
					len * 8 /* CHAR_BIT */);
	} else {
		ret = copy_from_user(new_mask, arg, len);
	}

	if (ret) {
		free_cpumask_var(new_mask);
		return -EFAULT;
	}

	ret = io_wq_cpu_affinity(tctx->io_wq, new_mask);
	free_cpumask_var(new_mask);
	return ret;
}
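/*
 * Example (userspace sketch, illustrative only; ring_fd is assumed to have
 * been created by this task so its io-wq exists): pinning io-wq workers to
 * CPU 0 via IORING_REGISTER_IOWQ_AFF, parsed by io_register_iowq_aff()
 * above. nr_args is the byte length of the mask.
 *
 *	cpu_set_t mask;
 *
 *	CPU_ZERO(&mask);
 *	CPU_SET(0, &mask);
 *	syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_IOWQ_AFF,
 *		&mask, sizeof(mask));
 */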

static __cold int io_unregister_iowq_aff(struct io_ring_ctx *ctx)
{
	struct io_uring_task *tctx = current->io_uring;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	return io_wq_cpu_affinity(tctx->io_wq, NULL);
}

static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
					       void __user *arg)
	__must_hold(&ctx->uring_lock)
{
	struct io_tctx_node *node;
	struct io_uring_task *tctx = NULL;
	struct io_sq_data *sqd = NULL;
	__u32 new_count[2];
	int i, ret;

	if (copy_from_user(new_count, arg, sizeof(new_count)))
		return -EFAULT;
	for (i = 0; i < ARRAY_SIZE(new_count); i++)
		if (new_count[i] > INT_MAX)
			return -EINVAL;

	if (ctx->flags & IORING_SETUP_SQPOLL) {
		sqd = ctx->sq_data;
		if (sqd) {
			/*
			 * Observe the correct sqd->lock -> ctx->uring_lock
			 * ordering. Fine to drop uring_lock here, we hold
			 * a ref to the ctx.
			 */
			refcount_inc(&sqd->refs);
			mutex_unlock(&ctx->uring_lock);
			mutex_lock(&sqd->lock);
			mutex_lock(&ctx->uring_lock);
			if (sqd->thread)
				tctx = sqd->thread->io_uring;
		}
	} else {
		tctx = current->io_uring;
	}

	BUILD_BUG_ON(sizeof(new_count) != sizeof(ctx->iowq_limits));

	for (i = 0; i < ARRAY_SIZE(new_count); i++)
		if (new_count[i])
			ctx->iowq_limits[i] = new_count[i];
	ctx->iowq_limits_set = true;

	if (tctx && tctx->io_wq) {
		ret = io_wq_max_workers(tctx->io_wq, new_count);
		if (ret)
			goto err;
	} else {
		memset(new_count, 0, sizeof(new_count));
	}

	if (sqd) {
		mutex_unlock(&sqd->lock);
		io_put_sq_data(sqd);
	}

	if (copy_to_user(arg, new_count, sizeof(new_count)))
		return -EFAULT;

	/* that's it for SQPOLL, only the SQPOLL task creates requests */
	if (sqd)
		return 0;

	/* now propagate the restriction to all registered users */
	list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
		struct io_uring_task *tctx = node->task->io_uring;

		if (WARN_ON_ONCE(!tctx->io_wq))
			continue;

		for (i = 0; i < ARRAY_SIZE(new_count); i++)
			new_count[i] = ctx->iowq_limits[i];
		/* ignore errors, it always returns zero anyway */
		(void)io_wq_max_workers(tctx->io_wq, new_count);
	}
	return 0;
err:
	if (sqd) {
		mutex_unlock(&sqd->lock);
		io_put_sq_data(sqd);
	}
	return ret;
}
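/*
 * Example (userspace sketch, illustrative only; ring_fd is assumed to come
 * from io_uring_setup()): capping io-wq workers via
 * IORING_REGISTER_IOWQ_MAX_WORKERS. new_count[0] bounds the bounded
 * (regular file) workers, new_count[1] the unbounded ones; a zero leaves
 * the current limit untouched, and the previous values are copied back.
 *
 *	__u32 new_count[2] = { 4, 8 };
 *
 *	syscall(__NR_io_uring_register, ring_fd,
 *		IORING_REGISTER_IOWQ_MAX_WORKERS, new_count, 2);
 */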

static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
			       void __user *arg, unsigned nr_args)
	__releases(ctx->uring_lock)
	__acquires(ctx->uring_lock)
{
	int ret;

	/*
	 * We don't quiesce the refs for register anymore and so it can't be
	 * dying as we're holding a file ref here.
	 */
	if (WARN_ON_ONCE(percpu_ref_is_dying(&ctx->refs)))
		return -ENXIO;

	if (ctx->restricted) {
		if (opcode >= IORING_REGISTER_LAST)
			return -EINVAL;
		opcode = array_index_nospec(opcode, IORING_REGISTER_LAST);
		if (!test_bit(opcode, ctx->restrictions.register_op))
			return -EACCES;
	}

	switch (opcode) {
	case IORING_REGISTER_BUFFERS:
		ret = -EFAULT;
		if (!arg)
			break;
		ret = io_sqe_buffers_register(ctx, arg, nr_args, NULL);
		break;
	case IORING_UNREGISTER_BUFFERS:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_sqe_buffers_unregister(ctx);
		break;
	case IORING_REGISTER_FILES:
		ret = -EFAULT;
		if (!arg)
			break;
		ret = io_sqe_files_register(ctx, arg, nr_args, NULL);
		break;
	case IORING_UNREGISTER_FILES:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_sqe_files_unregister(ctx);
		break;
	case IORING_REGISTER_FILES_UPDATE:
		ret = io_register_files_update(ctx, arg, nr_args);
		break;
	case IORING_REGISTER_EVENTFD:
		ret = -EINVAL;
		if (nr_args != 1)
			break;
		ret = io_eventfd_register(ctx, arg, 0);
		break;
	case IORING_REGISTER_EVENTFD_ASYNC:
		ret = -EINVAL;
		if (nr_args != 1)
			break;
		ret = io_eventfd_register(ctx, arg, 1);
		break;
	case IORING_UNREGISTER_EVENTFD:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_eventfd_unregister(ctx);
		break;
	case IORING_REGISTER_PROBE:
		ret = -EINVAL;
		if (!arg || nr_args > 256)
			break;
		ret = io_probe(ctx, arg, nr_args);
		break;
	case IORING_REGISTER_PERSONALITY:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_register_personality(ctx);
		break;
	case IORING_UNREGISTER_PERSONALITY:
		ret = -EINVAL;
		if (arg)
			break;
		ret = io_unregister_personality(ctx, nr_args);
		break;
	case IORING_REGISTER_ENABLE_RINGS:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_register_enable_rings(ctx);
		break;
	case IORING_REGISTER_RESTRICTIONS:
		ret = io_register_restrictions(ctx, arg, nr_args);
		break;
	case IORING_REGISTER_FILES2:
		ret = io_register_rsrc(ctx, arg, nr_args, IORING_RSRC_FILE);
		break;
	case IORING_REGISTER_FILES_UPDATE2:
		ret = io_register_rsrc_update(ctx, arg, nr_args,
					      IORING_RSRC_FILE);
		break;
	case IORING_REGISTER_BUFFERS2:
		ret = io_register_rsrc(ctx, arg, nr_args, IORING_RSRC_BUFFER);
		break;
	case IORING_REGISTER_BUFFERS_UPDATE:
		ret = io_register_rsrc_update(ctx, arg, nr_args,
					      IORING_RSRC_BUFFER);
		break;
	case IORING_REGISTER_IOWQ_AFF:
		ret = -EINVAL;
		if (!arg || !nr_args)
			break;
		ret = io_register_iowq_aff(ctx, arg, nr_args);
		break;
	case IORING_UNREGISTER_IOWQ_AFF:
		ret = -EINVAL;
		if (arg || nr_args)
			break;
		ret = io_unregister_iowq_aff(ctx);
		break;
	case IORING_REGISTER_IOWQ_MAX_WORKERS:
		ret = -EINVAL;
		if (!arg || nr_args != 2)
			break;
		ret = io_register_iowq_max_workers(ctx, arg);
		break;
	case IORING_REGISTER_RING_FDS:
		ret = io_ringfd_register(ctx, arg, nr_args);
		break;
	case IORING_UNREGISTER_RING_FDS:
		ret = io_ringfd_unregister(ctx, arg, nr_args);
		break;
	case IORING_REGISTER_PBUF_RING:
		ret = -EINVAL;
		if (!arg || nr_args != 1)
			break;
		ret = io_register_pbuf_ring(ctx, arg);
		break;
	case IORING_UNREGISTER_PBUF_RING:
		ret = -EINVAL;
		if (!arg || nr_args != 1)
			break;
		ret = io_unregister_pbuf_ring(ctx, arg);
		break;
	case IORING_REGISTER_SYNC_CANCEL:
		ret = -EINVAL;
		if (!arg || nr_args != 1)
			break;
		ret = io_sync_cancel(ctx, arg);
		break;
	case IORING_REGISTER_FILE_ALLOC_RANGE:
		ret = -EINVAL;
		if (!arg || nr_args)
			break;
		ret = io_register_file_alloc_range(ctx, arg);
		break;
	default:
		ret = -EINVAL;
		break;
	}

	return ret;
}

SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
		void __user *, arg, unsigned int, nr_args)
{
	struct io_ring_ctx *ctx;
	long ret = -EBADF;
	struct fd f;

	f = fdget(fd);
	if (!f.file)
		return -EBADF;

	ret = -EOPNOTSUPP;
	if (!io_is_uring_fops(f.file))
		goto out_fput;

	ctx = f.file->private_data;

	io_run_task_work_ctx(ctx);

	mutex_lock(&ctx->uring_lock);
	ret = __io_uring_register(ctx, opcode, arg, nr_args);
	mutex_unlock(&ctx->uring_lock);
	trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, ret);
out_fput:
	fdput(f);
	return ret;
}

static int __init io_uring_init(void)
{
#define __BUILD_BUG_VERIFY_OFFSET_SIZE(stype, eoffset, esize, ename) do { \
	BUILD_BUG_ON(offsetof(stype, ename) != eoffset); \
	BUILD_BUG_ON(sizeof_field(stype, ename) != esize); \
} while (0)

#define BUILD_BUG_SQE_ELEM(eoffset, etype, ename) \
	__BUILD_BUG_VERIFY_OFFSET_SIZE(struct io_uring_sqe, eoffset, sizeof(etype), ename)
#define BUILD_BUG_SQE_ELEM_SIZE(eoffset, esize, ename) \
	__BUILD_BUG_VERIFY_OFFSET_SIZE(struct io_uring_sqe, eoffset, esize, ename)
	BUILD_BUG_ON(sizeof(struct io_uring_sqe) != 64);
	BUILD_BUG_SQE_ELEM(0,  __u8,   opcode);
	BUILD_BUG_SQE_ELEM(1,  __u8,   flags);
	BUILD_BUG_SQE_ELEM(2,  __u16,  ioprio);
	BUILD_BUG_SQE_ELEM(4,  __s32,  fd);
	BUILD_BUG_SQE_ELEM(8,  __u64,  off);
	BUILD_BUG_SQE_ELEM(8,  __u64,  addr2);
	BUILD_BUG_SQE_ELEM(8,  __u32,  cmd_op);
	BUILD_BUG_SQE_ELEM(12, __u32, __pad1);
	BUILD_BUG_SQE_ELEM(16, __u64,  addr);
	BUILD_BUG_SQE_ELEM(16, __u64,  splice_off_in);
	BUILD_BUG_SQE_ELEM(24, __u32,  len);
	BUILD_BUG_SQE_ELEM(28,     __kernel_rwf_t, rw_flags);
	BUILD_BUG_SQE_ELEM(28, /* compat */   int, rw_flags);
	BUILD_BUG_SQE_ELEM(28, /* compat */ __u32, rw_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  fsync_flags);
	BUILD_BUG_SQE_ELEM(28, /* compat */ __u16,  poll_events);
	BUILD_BUG_SQE_ELEM(28, __u32,  poll32_events);
	BUILD_BUG_SQE_ELEM(28, __u32,  sync_range_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  msg_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  timeout_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  accept_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  cancel_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  open_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  statx_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  fadvise_advice);
	BUILD_BUG_SQE_ELEM(28, __u32,  splice_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  rename_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  unlink_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  hardlink_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  xattr_flags);
	BUILD_BUG_SQE_ELEM(28, __u32,  msg_ring_flags);
	BUILD_BUG_SQE_ELEM(32, __u64,  user_data);
	BUILD_BUG_SQE_ELEM(40, __u16,  buf_index);
	BUILD_BUG_SQE_ELEM(40, __u16,  buf_group);
	BUILD_BUG_SQE_ELEM(42, __u16,  personality);
	BUILD_BUG_SQE_ELEM(44, __s32,  splice_fd_in);
	BUILD_BUG_SQE_ELEM(44, __u32,  file_index);
	BUILD_BUG_SQE_ELEM(44, __u16,  addr_len);
	BUILD_BUG_SQE_ELEM(46, __u16,  __pad3[0]);
	BUILD_BUG_SQE_ELEM(48, __u64,  addr3);
	BUILD_BUG_SQE_ELEM_SIZE(48, 0, cmd);
	BUILD_BUG_SQE_ELEM(56, __u64,  __pad2);

	BUILD_BUG_ON(sizeof(struct io_uring_files_update) !=
		     sizeof(struct io_uring_rsrc_update));
	BUILD_BUG_ON(sizeof(struct io_uring_rsrc_update) >
		     sizeof(struct io_uring_rsrc_update2));

	/* ->buf_index is u16 */
	BUILD_BUG_ON(offsetof(struct io_uring_buf_ring, bufs) != 0);
	BUILD_BUG_ON(offsetof(struct io_uring_buf, resv) !=
		     offsetof(struct io_uring_buf_ring, tail));

	/* should fit into one byte */
	BUILD_BUG_ON(SQE_VALID_FLAGS >= (1 << 8));
	BUILD_BUG_ON(SQE_COMMON_FLAGS >= (1 << 8));
	BUILD_BUG_ON((SQE_VALID_FLAGS | SQE_COMMON_FLAGS) != SQE_VALID_FLAGS);

	BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof(int));

	BUILD_BUG_ON(sizeof(atomic_t) != sizeof(u32));

	io_uring_optable_init();

	req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC |
				SLAB_ACCOUNT);
	return 0;
};
__initcall(io_uring_init);