Commit 221c5eb2 authored by Jens Axboe

io_uring: add support for IORING_OP_POLL

This is basically a direct port of bfe4037e, which implements a
one-shot poll command through aio. Description below is based on that
commit as well. However, instead of adding a POLL command and relying
on io_cancel(2) to remove it, we mimic the epoll(2) interface of
having a command to add a poll notification, IORING_OP_POLL_ADD,
and one to remove it again, IORING_OP_POLL_REMOVE.

To poll for a file descriptor the application should submit an sqe of
type IORING_OP_POLL_ADD. It will poll the fd for the events specified in the
poll_events field.

Unlike poll or epoll without EPOLLONESHOT, this interface always works in
one-shot mode: once the sqe is completed, it will have to be resubmitted.

Reviewed-by: Hannes Reinecke <hare@suse.com>
Based-on-code-from: Christoph Hellwig <hch@lst.de>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent c16361c1
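
For context, here is a minimal userspace sketch (not part of this commit) of how an application might fill a submission queue entry for the new IORING_OP_POLL_ADD opcode, using only the UAPI fields this patch adds. Ring setup, io_uring_enter(2) submission and CQE reaping are omitted; prep_poll_add() and its arguments are hypothetical helpers, and sqe is assumed to point at a free entry in the mapped submission queue.

#include <string.h>
#include <poll.h>
#include <linux/io_uring.h>

/*
 * Hypothetical helper: prepare a one-shot poll request.  On completion
 * the CQE's res field carries the (e)poll mask of events that fired
 * (the kernel always adds EPOLLERR/EPOLLHUP to the watched set).
 * Because the interface is one-shot, a new POLL_ADD must be submitted
 * to keep watching the fd.
 */
static void prep_poll_add(struct io_uring_sqe *sqe, int fd,
			  unsigned short poll_mask, unsigned long long tag)
{
	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_POLL_ADD;
	sqe->fd = fd;
	sqe->poll_events = poll_mask;	/* e.g. POLLIN */
	sqe->user_data = tag;		/* echoed back in the CQE; also the
					 * key for IORING_OP_POLL_REMOVE */
}
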
@@ -161,6 +161,7 @@ struct io_ring_ctx {
	 * manipulate the list, hence no extra locking is needed there.
	 */
	struct list_head poll_list;
	struct list_head cancel_list;
} ____cacheline_aligned_in_smp;

#if defined(CONFIG_UNIX)
@@ -176,8 +177,20 @@ struct sqe_submit {
	bool needs_fixed_file;
};

struct io_poll_iocb {
	struct file *file;
	struct wait_queue_head *head;
	__poll_t events;
	bool woken;
	bool canceled;
	struct wait_queue_entry wait;
};

struct io_kiocb {
	union {
		struct kiocb rw;
		struct io_poll_iocb poll;
	};

	struct sqe_submit submit;
@@ -261,6 +274,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
	init_waitqueue_head(&ctx->wait);
	spin_lock_init(&ctx->completion_lock);
	INIT_LIST_HEAD(&ctx->poll_list);
	INIT_LIST_HEAD(&ctx->cancel_list);
	return ctx;
}
@@ -1058,6 +1072,246 @@ static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
	return 0;
}

static void io_poll_remove_one(struct io_kiocb *req)
{
	struct io_poll_iocb *poll = &req->poll;

	spin_lock(&poll->head->lock);
	WRITE_ONCE(poll->canceled, true);
	if (!list_empty(&poll->wait.entry)) {
		list_del_init(&poll->wait.entry);
		queue_work(req->ctx->sqo_wq, &req->work);
	}
	spin_unlock(&poll->head->lock);

	list_del_init(&req->list);
}

static void io_poll_remove_all(struct io_ring_ctx *ctx)
{
	struct io_kiocb *req;

	spin_lock_irq(&ctx->completion_lock);
	while (!list_empty(&ctx->cancel_list)) {
		req = list_first_entry(&ctx->cancel_list, struct io_kiocb, list);
		io_poll_remove_one(req);
	}
	spin_unlock_irq(&ctx->completion_lock);
}

/*
 * Find a running poll command that matches one specified in sqe->addr,
 * and remove it if found.
 */
static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
	struct io_ring_ctx *ctx = req->ctx;
	struct io_kiocb *poll_req, *next;
	int ret = -ENOENT;

	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
		return -EINVAL;
	if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
	    sqe->poll_events)
		return -EINVAL;

	spin_lock_irq(&ctx->completion_lock);
	list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
		if (READ_ONCE(sqe->addr) == poll_req->user_data) {
			io_poll_remove_one(poll_req);
			ret = 0;
			break;
		}
	}
	spin_unlock_irq(&ctx->completion_lock);

	io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
	io_free_req(req);
	return 0;
}

static void io_poll_complete(struct io_kiocb *req, __poll_t mask)
{
	io_cqring_add_event(req->ctx, req->user_data, mangle_poll(mask), 0);
	io_fput(req);
	io_free_req(req);
}

static void io_poll_complete_work(struct work_struct *work)
{
	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
	struct io_poll_iocb *poll = &req->poll;
	struct poll_table_struct pt = { ._key = poll->events };
	struct io_ring_ctx *ctx = req->ctx;
	__poll_t mask = 0;

	if (!READ_ONCE(poll->canceled))
		mask = vfs_poll(poll->file, &pt) & poll->events;

	/*
	 * Note that ->ki_cancel callers also delete iocb from active_reqs after
	 * calling ->ki_cancel. We need the ctx_lock roundtrip here to
	 * synchronize with them. In the cancellation case the list_del_init
	 * itself is not actually needed, but harmless so we keep it in to
	 * avoid further branches in the fast path.
	 */
	spin_lock_irq(&ctx->completion_lock);
	if (!mask && !READ_ONCE(poll->canceled)) {
		add_wait_queue(poll->head, &poll->wait);
		spin_unlock_irq(&ctx->completion_lock);
		return;
	}
	list_del_init(&req->list);
	spin_unlock_irq(&ctx->completion_lock);

	io_poll_complete(req, mask);
}

static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
			void *key)
{
	struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
							wait);
	struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
	struct io_ring_ctx *ctx = req->ctx;
	__poll_t mask = key_to_poll(key);

	poll->woken = true;

	/* for instances that support it check for an event match first: */
	if (mask) {
		unsigned long flags;

		if (!(mask & poll->events))
			return 0;

		/* try to complete the iocb inline if we can: */
		if (spin_trylock_irqsave(&ctx->completion_lock, flags)) {
			list_del(&req->list);
			spin_unlock_irqrestore(&ctx->completion_lock, flags);

			list_del_init(&poll->wait.entry);
			io_poll_complete(req, mask);
			return 1;
		}
	}

	list_del_init(&poll->wait.entry);
	queue_work(ctx->sqo_wq, &req->work);
	return 1;
}

struct io_poll_table {
	struct poll_table_struct pt;
	struct io_kiocb *req;
	int error;
};

static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
			       struct poll_table_struct *p)
{
	struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);

	if (unlikely(pt->req->poll.head)) {
		pt->error = -EINVAL;
		return;
	}

	pt->error = 0;
	pt->req->poll.head = head;
	add_wait_queue(head, &pt->req->poll.wait);
}

static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
	struct io_poll_iocb *poll = &req->poll;
	struct io_ring_ctx *ctx = req->ctx;
	struct io_poll_table ipt;
	unsigned flags;
	__poll_t mask;
	u16 events;
	int fd;

	if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
		return -EINVAL;
	if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
		return -EINVAL;

	INIT_WORK(&req->work, io_poll_complete_work);
	events = READ_ONCE(sqe->poll_events);
	poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;

	flags = READ_ONCE(sqe->flags);
	fd = READ_ONCE(sqe->fd);

	if (flags & IOSQE_FIXED_FILE) {
		if (unlikely(!ctx->user_files || fd >= ctx->nr_user_files))
			return -EBADF;
		poll->file = ctx->user_files[fd];
		req->flags |= REQ_F_FIXED_FILE;
	} else {
		poll->file = fget(fd);
	}
	if (unlikely(!poll->file))
		return -EBADF;

	poll->head = NULL;
	poll->woken = false;
	poll->canceled = false;

	ipt.pt._qproc = io_poll_queue_proc;
	ipt.pt._key = poll->events;
	ipt.req = req;
	ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */

	/* initialized the list so that we can do list_empty checks */
	INIT_LIST_HEAD(&poll->wait.entry);
	init_waitqueue_func_entry(&poll->wait, io_poll_wake);

	/* one for removal from waitqueue, one for this function */
	refcount_set(&req->refs, 2);

	mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
	if (unlikely(!poll->head)) {
		/* we did not manage to set up a waitqueue, done */
		goto out;
	}

	spin_lock_irq(&ctx->completion_lock);
	spin_lock(&poll->head->lock);
	if (poll->woken) {
		/* wake_up context handles the rest */
		mask = 0;
		ipt.error = 0;
	} else if (mask || ipt.error) {
		/* if we get an error or a mask we are done */
		WARN_ON_ONCE(list_empty(&poll->wait.entry));
		list_del_init(&poll->wait.entry);
	} else {
		/* actually waiting for an event */
		list_add_tail(&req->list, &ctx->cancel_list);
	}
	spin_unlock(&poll->head->lock);
	spin_unlock_irq(&ctx->completion_lock);

out:
	if (unlikely(ipt.error)) {
		if (!(flags & IOSQE_FIXED_FILE))
			fput(poll->file);
		/*
		 * Drop one of our refs to this req, __io_submit_sqe() will
		 * drop the other one since we're returning an error.
		 */
		io_free_req(req);
		return ipt.error;
	}

	if (mask)
		io_poll_complete(req, mask);
	io_free_req(req);
	return 0;
}

static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
			   const struct sqe_submit *s, bool force_nonblock,
			   struct io_submit_state *state)
@@ -1093,6 +1347,12 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
	case IORING_OP_FSYNC:
		ret = io_fsync(req, s->sqe, force_nonblock);
		break;
	case IORING_OP_POLL_ADD:
		ret = io_poll_add(req, s->sqe);
		break;
	case IORING_OP_POLL_REMOVE:
		ret = io_poll_remove(req, s->sqe);
		break;
	default:
		ret = -EINVAL;
		break;
@@ -2131,6 +2391,7 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
	percpu_ref_kill(&ctx->refs);
	mutex_unlock(&ctx->uring_lock);

	io_poll_remove_all(ctx);
	io_iopoll_reap_events(ctx);
	wait_for_completion(&ctx->ctx_done);
	io_ring_ctx_free(ctx);
...
@@ -25,6 +25,7 @@ struct io_uring_sqe {
	union {
		__kernel_rwf_t rw_flags;
		__u32 fsync_flags;
		__u16 poll_events;
	};
	__u64 user_data;	/* data to be passed back at completion time */
	union {
@@ -51,6 +52,8 @@ struct io_uring_sqe {
#define IORING_OP_FSYNC		3
#define IORING_OP_READ_FIXED	4
#define IORING_OP_WRITE_FIXED	5
#define IORING_OP_POLL_ADD	6
#define IORING_OP_POLL_REMOVE	7

/*
 * sqe->fsync_flags
...
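
Again purely as an illustration (not part of the commit), continuing the sketch above with the same includes: cancelling a pending poll mirrors io_poll_remove() in fs/io_uring.c, which walks ctx->cancel_list comparing sqe->addr against each pending request's user_data. A hypothetical helper might look like this, where tag is the user_data value used for the original POLL_ADD.

/*
 * Hypothetical helper: cancel a previously submitted POLL_ADD.  The
 * removal's own CQE completes with res = 0 if a matching poll request
 * was found (the cancelled poll then also completes, with an empty
 * event mask), or with -ENOENT if nothing matched.
 */
static void prep_poll_remove(struct io_uring_sqe *sqe,
			     unsigned long long tag)
{
	memset(sqe, 0, sizeof(*sqe));
	sqe->opcode = IORING_OP_POLL_REMOVE;
	sqe->addr = tag;	/* user_data of the POLL_ADD to cancel */
	sqe->user_data = tag;	/* identifies this removal's own CQE */
}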