Commit 05589553 authored by Xiaoguang Wang, committed by Jens Axboe

io_uring: refactor file register/unregister/update handling

While diving into the io_uring fileset register/unregister/update code, we
found a bug in the fileset update handling. io_uring fileset update uses a
percpu_ref variable to check whether we can put the previously registered
files: only when the refcount of this percpu_ref reaches zero can we safely
put these files. But this doesn't work well. If an application issues
requests continually, the percpu_ref never gets a chance to reach zero and
stays in atomic mode forever, which also defeats the gains of the fileset
register/unregister/update feature, namely reducing the atomic operation
overhead of fput/fget.
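
For illustration, a rough sketch of the old scheme (hypothetical helper, not
the exact pre-patch code; the real logic lives in io_queue_file_removal(),
io_atomic_switch() and io_file_data_ref_zero()):

	/*
	 * Old scheme, simplified: a single percpu_ref guards the whole
	 * fixed-file table, and replaced files are only put once that
	 * ref drops to zero.
	 */
	static void old_style_update(struct fixed_file_data *data)
	{
		/* queue the replaced files, then force the ref into atomic mode */
		percpu_ref_switch_to_atomic(&data->refs, NULL);
		/*
		 * Every submission does percpu_ref_get(&data->refs) and every
		 * completion does percpu_ref_put(&data->refs). With requests
		 * arriving back to back the count never reaches zero, so the
		 * release callback never runs, the queued files are never put,
		 * and the ref stays in (slower) atomic mode.
		 */
	}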

To fix this issue, whenever an application performs an IORING_REGISTER_FILES
or IORING_REGISTER_FILES_UPDATE operation, we allocate a new percpu_ref and
kill the old one; new requests take their references against the new
percpu_ref. Once all previous requests complete, the old percpu_refs are
dropped and the registered files can be put safely.
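
In outline (a simplified sketch with a hypothetical helper, not the exact code
in the diff below; see alloc_fixed_file_ref_node() and __io_sqe_files_update()
for the real implementation):

	/*
	 * New scheme, simplified: every register/update gets its own ref node.
	 * Requests take references on the current node only, so a superseded
	 * node can reach zero and release its files even while new requests
	 * keep arriving on the new node.
	 */
	static void switch_to_new_node(struct fixed_file_data *data,
				       struct fixed_file_ref_node *new_node)
	{
		percpu_ref_kill(data->cur_refs);   /* no new gets on the old node */
		data->cur_refs = &new_node->refs;  /* new requests use the new node */
		/*
		 * Once the last in-flight request referencing the old node drops
		 * its ref, the node's release callback runs, puts the replaced
		 * files and frees the node.
		 */
	}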

Link: https://lore.kernel.org/io-uring/5a8dac33-4ca2-4847-b091-f7dcd3ad0ff3@linux.alibaba.com/T/#t
Signed-off-by: Xiaoguang Wang <xiaoguang.wang@linux.alibaba.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 458ef2a2
@@ -186,14 +186,23 @@ struct fixed_file_table {
 	struct file		**files;
 };
 
+struct fixed_file_ref_node {
+	struct percpu_ref		refs;
+	struct list_head		node;
+	struct list_head		file_list;
+	struct fixed_file_data		*file_data;
+	struct work_struct		work;
+};
+
 struct fixed_file_data {
 	struct fixed_file_table		*table;
 	struct io_ring_ctx		*ctx;
 
+	struct percpu_ref		*cur_refs;
 	struct percpu_ref		refs;
-	struct llist_head		put_llist;
-	struct work_struct		ref_work;
 	struct completion		done;
+	struct list_head		ref_list;
+	spinlock_t			lock;
 };
 
 struct io_buffer {
@@ -618,6 +627,8 @@ struct io_kiocb {
 	struct list_head	inflight_entry;
 
+	struct percpu_ref	*fixed_file_refs;
+
 	union {
 		/*
 		 * Only commands that never go async can use the below fields,
@@ -848,7 +859,6 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
 				 struct io_uring_files_update *ip,
 				 unsigned nr_args);
 static int io_grab_files(struct io_kiocb *req);
-static void io_ring_file_ref_flush(struct fixed_file_data *data);
 static void io_cleanup_req(struct io_kiocb *req);
 static int io_file_get(struct io_submit_state *state, struct io_kiocb *req,
 		       int fd, struct file **out_file, bool fixed);
@@ -1341,7 +1351,7 @@ static inline void io_put_file(struct io_kiocb *req, struct file *file,
 			  bool fixed)
 {
 	if (fixed)
-		percpu_ref_put(&req->ctx->file_data->refs);
+		percpu_ref_put(req->fixed_file_refs);
 	else
 		fput(file);
 }
@@ -1393,21 +1403,18 @@ struct req_batch {
 
 static void io_free_req_many(struct io_ring_ctx *ctx, struct req_batch *rb)
 {
-	int fixed_refs = rb->to_free;
-
 	if (!rb->to_free)
 		return;
 	if (rb->need_iter) {
 		int i, inflight = 0;
 		unsigned long flags;
 
-		fixed_refs = 0;
 		for (i = 0; i < rb->to_free; i++) {
 			struct io_kiocb *req = rb->reqs[i];
 
 			if (req->flags & REQ_F_FIXED_FILE) {
 				req->file = NULL;
-				fixed_refs++;
+				percpu_ref_put(req->fixed_file_refs);
 			}
 			if (req->flags & REQ_F_INFLIGHT)
 				inflight++;
@@ -1433,8 +1440,6 @@ static void io_free_req_many(struct io_ring_ctx *ctx, struct req_batch *rb)
 	}
 do_free:
 	kmem_cache_free_bulk(req_cachep, rb->to_free, rb->reqs);
-	if (fixed_refs)
-		percpu_ref_put_many(&ctx->file_data->refs, fixed_refs);
 	percpu_ref_put_many(&ctx->refs, rb->to_free);
 	rb->to_free = rb->need_iter = 0;
 }
@@ -5331,7 +5336,8 @@ static int io_file_get(struct io_submit_state *state, struct io_kiocb *req,
 		file = io_file_from_index(ctx, fd);
 		if (!file)
 			return -EBADF;
-		percpu_ref_get(&ctx->file_data->refs);
+		req->fixed_file_refs = ctx->file_data->cur_refs;
+		percpu_ref_get(req->fixed_file_refs);
 	} else {
 		trace_io_uring_file_get(ctx, fd);
 		file = __io_file_get(state, fd);
@@ -6124,43 +6130,36 @@ static void io_file_ref_kill(struct percpu_ref *ref)
 	complete(&data->done);
 }
 
-static void io_file_ref_exit_and_free(struct work_struct *work)
-{
-	struct fixed_file_data *data;
-
-	data = container_of(work, struct fixed_file_data, ref_work);
-
-	/*
-	 * Ensure any percpu-ref atomic switch callback has run, it could have
-	 * been in progress when the files were being unregistered. Once
-	 * that's done, we can safely exit and free the ref and containing
-	 * data structure.
-	 */
-	rcu_barrier();
-	percpu_ref_exit(&data->refs);
-	kfree(data);
-}
-
 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
 {
 	struct fixed_file_data *data = ctx->file_data;
+	struct fixed_file_ref_node *ref_node = NULL;
 	unsigned nr_tables, i;
+	unsigned long flags;
 
 	if (!data)
 		return -ENXIO;
 
-	percpu_ref_kill_and_confirm(&data->refs, io_file_ref_kill);
-	flush_work(&data->ref_work);
+	spin_lock_irqsave(&data->lock, flags);
+	if (!list_empty(&data->ref_list))
+		ref_node = list_first_entry(&data->ref_list,
+				struct fixed_file_ref_node, node);
+	spin_unlock_irqrestore(&data->lock, flags);
+	if (ref_node)
+		percpu_ref_kill(&ref_node->refs);
+
+	percpu_ref_kill(&data->refs);
+
+	/* wait for all refs nodes to complete */
 	wait_for_completion(&data->done);
-	io_ring_file_ref_flush(data);
 
 	__io_sqe_files_unregister(ctx);
 	nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
 	for (i = 0; i < nr_tables; i++)
 		kfree(data->table[i].files);
 	kfree(data->table);
-	INIT_WORK(&data->ref_work, io_file_ref_exit_and_free);
-	queue_work(system_wq, &data->ref_work);
+	percpu_ref_exit(&data->refs);
+	kfree(data);
 	ctx->file_data = NULL;
 	ctx->nr_user_files = 0;
 	return 0;
@@ -6385,46 +6384,72 @@ static void io_ring_file_put(struct io_ring_ctx *ctx, struct file *file)
 }
 
 struct io_file_put {
-	struct llist_node llist;
+	struct list_head list;
 	struct file *file;
 };
 
-static void io_ring_file_ref_flush(struct fixed_file_data *data)
+static void io_file_put_work(struct work_struct *work)
 {
+	struct fixed_file_ref_node *ref_node;
+	struct fixed_file_data *file_data;
+	struct io_ring_ctx *ctx;
 	struct io_file_put *pfile, *tmp;
-	struct llist_node *node;
+	unsigned long flags;
 
-	while ((node = llist_del_all(&data->put_llist)) != NULL) {
-		llist_for_each_entry_safe(pfile, tmp, node, llist) {
-			io_ring_file_put(data->ctx, pfile->file);
-			kfree(pfile);
-		}
+	ref_node = container_of(work, struct fixed_file_ref_node, work);
+	file_data = ref_node->file_data;
+	ctx = file_data->ctx;
+
+	list_for_each_entry_safe(pfile, tmp, &ref_node->file_list, list) {
+		list_del_init(&pfile->list);
+		io_ring_file_put(ctx, pfile->file);
+		kfree(pfile);
 	}
+
+	spin_lock_irqsave(&file_data->lock, flags);
+	list_del_init(&ref_node->node);
+	spin_unlock_irqrestore(&file_data->lock, flags);
+
+	percpu_ref_exit(&ref_node->refs);
+	kfree(ref_node);
+	percpu_ref_put(&file_data->refs);
 }
 
-static void io_ring_file_ref_switch(struct work_struct *work)
+static void io_file_data_ref_zero(struct percpu_ref *ref)
 {
-	struct fixed_file_data *data;
+	struct fixed_file_ref_node *ref_node;
 
-	data = container_of(work, struct fixed_file_data, ref_work);
-	io_ring_file_ref_flush(data);
-	percpu_ref_switch_to_percpu(&data->refs);
+	ref_node = container_of(ref, struct fixed_file_ref_node, refs);
+
+	queue_work(system_wq, &ref_node->work);
 }
 
-static void io_file_data_ref_zero(struct percpu_ref *ref)
+static struct fixed_file_ref_node *alloc_fixed_file_ref_node(
+			struct io_ring_ctx *ctx)
 {
-	struct fixed_file_data *data;
+	struct fixed_file_ref_node *ref_node;
 
-	data = container_of(ref, struct fixed_file_data, refs);
+	ref_node = kzalloc(sizeof(*ref_node), GFP_KERNEL);
+	if (!ref_node)
+		return ERR_PTR(-ENOMEM);
 
-	/*
-	 * We can't safely switch from inside this context, punt to wq. If
-	 * the table ref is going away, the table is being unregistered.
-	 * Don't queue up the async work for that case, the caller will
-	 * handle it.
-	 */
-	if (!percpu_ref_is_dying(&data->refs))
-		queue_work(system_wq, &data->ref_work);
+	if (percpu_ref_init(&ref_node->refs, io_file_data_ref_zero,
+			    0, GFP_KERNEL)) {
+		kfree(ref_node);
+		return ERR_PTR(-ENOMEM);
+	}
+	INIT_LIST_HEAD(&ref_node->node);
+	INIT_LIST_HEAD(&ref_node->file_list);
+	INIT_WORK(&ref_node->work, io_file_put_work);
+	ref_node->file_data = ctx->file_data;
+	return ref_node;
+
+}
+
+static void destroy_fixed_file_ref_node(struct fixed_file_ref_node *ref_node)
+{
+	percpu_ref_exit(&ref_node->refs);
+	kfree(ref_node);
 }
 
 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
@@ -6435,6 +6460,8 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
 	struct file *file;
 	int fd, ret = 0;
 	unsigned i;
+	struct fixed_file_ref_node *ref_node;
+	unsigned long flags;
 
 	if (ctx->file_data)
 		return -EBUSY;
@@ -6448,6 +6475,7 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
 		return -ENOMEM;
 	ctx->file_data->ctx = ctx;
 	init_completion(&ctx->file_data->done);
+	INIT_LIST_HEAD(&ctx->file_data->ref_list);
 
 	nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
 	ctx->file_data->table = kcalloc(nr_tables,
@@ -6459,15 +6487,13 @@ static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
 		return -ENOMEM;
 	}
 
-	if (percpu_ref_init(&ctx->file_data->refs, io_file_data_ref_zero,
+	if (percpu_ref_init(&ctx->file_data->refs, io_file_ref_kill,
 				PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
 		kfree(ctx->file_data->table);
 		kfree(ctx->file_data);
 		ctx->file_data = NULL;
 		return -ENOMEM;
 	}
-	ctx->file_data->put_llist.first = NULL;
-	INIT_WORK(&ctx->file_data->ref_work, io_ring_file_ref_switch);
 	if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
 		percpu_ref_exit(&ctx->file_data->refs);
 		kfree(ctx->file_data->table);
} }
ret = io_sqe_files_scm(ctx); ret = io_sqe_files_scm(ctx);
if (ret) if (ret) {
io_sqe_files_unregister(ctx); io_sqe_files_unregister(ctx);
return ret;
}
ref_node = alloc_fixed_file_ref_node(ctx);
if (IS_ERR(ref_node)) {
io_sqe_files_unregister(ctx);
return PTR_ERR(ref_node);
}
ctx->file_data->cur_refs = &ref_node->refs;
spin_lock_irqsave(&ctx->file_data->lock, flags);
list_add(&ref_node->node, &ctx->file_data->ref_list);
spin_unlock_irqrestore(&ctx->file_data->lock, flags);
percpu_ref_get(&ctx->file_data->refs);
return ret; return ret;
} }
@@ -6579,30 +6618,21 @@ static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
 #endif
 }
 
-static void io_atomic_switch(struct percpu_ref *ref)
-{
-	struct fixed_file_data *data;
-
-	/*
-	 * Juggle reference to ensure we hit zero, if needed, so we can
-	 * switch back to percpu mode
-	 */
-	data = container_of(ref, struct fixed_file_data, refs);
-	percpu_ref_put(&data->refs);
-	percpu_ref_get(&data->refs);
-}
-
 static int io_queue_file_removal(struct fixed_file_data *data,
 				 struct file *file)
 {
 	struct io_file_put *pfile;
+	struct percpu_ref *refs = data->cur_refs;
+	struct fixed_file_ref_node *ref_node;
 
 	pfile = kzalloc(sizeof(*pfile), GFP_KERNEL);
 	if (!pfile)
 		return -ENOMEM;
 
+	ref_node = container_of(refs, struct fixed_file_ref_node, refs);
 	pfile->file = file;
-	llist_add(&pfile->llist, &data->put_llist);
+	list_add(&pfile->list, &ref_node->file_list);
 	return 0;
 }
@@ -6611,17 +6641,23 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
 				 unsigned nr_args)
 {
 	struct fixed_file_data *data = ctx->file_data;
-	bool ref_switch = false;
+	struct fixed_file_ref_node *ref_node;
 	struct file *file;
 	__s32 __user *fds;
 	int fd, i, err;
 	__u32 done;
+	unsigned long flags;
+	bool needs_switch = false;
 
 	if (check_add_overflow(up->offset, nr_args, &done))
 		return -EOVERFLOW;
 	if (done > ctx->nr_user_files)
 		return -EINVAL;
 
+	ref_node = alloc_fixed_file_ref_node(ctx);
+	if (IS_ERR(ref_node))
+		return PTR_ERR(ref_node);
+
 	done = 0;
 	fds = u64_to_user_ptr(up->fds);
 	while (nr_args) {
@@ -6642,7 +6678,7 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
 			if (err)
 				break;
 			table->files[index] = NULL;
-			ref_switch = true;
+			needs_switch = true;
 		}
 		if (fd != -1) {
 			file = fget(fd);
@@ -6673,11 +6709,19 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
 		up->offset++;
 	}
-	if (ref_switch)
-		percpu_ref_switch_to_atomic(&data->refs, io_atomic_switch);
+
+	if (needs_switch) {
+		percpu_ref_kill(data->cur_refs);
+		spin_lock_irqsave(&data->lock, flags);
+		list_add(&ref_node->node, &data->ref_list);
+		data->cur_refs = &ref_node->refs;
+		spin_unlock_irqrestore(&data->lock, flags);
+		percpu_ref_get(&ctx->file_data->refs);
+	} else
+		destroy_fixed_file_ref_node(ref_node);
 
 	return done ? done : err;
 }
 
 static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
 			       unsigned nr_args)
 {