Commit 34e83fc6 authored by Kent Overstreet's avatar Kent Overstreet Committed by Benjamin LaHaise

aio: reqs_active -> reqs_available

The number of outstanding kiocbs is one of the few shared things left that
has to be touched for every kiocb - it'd be nice to make it percpu.

We can make it per cpu by treating it like an allocation problem: we have
a maximum number of kiocbs that can be outstanding (i.e.  slots) - then we
just allocate and free slots, and we know how to write per cpu allocators.

So as prep work for that, we convert reqs_active to reqs_available.
Signed-off-by: default avatarKent Overstreet <koverstreet@google.com>
Cc: Zach Brown <zab@redhat.com>
Cc: Felipe Balbi <balbi@ti.com>
Cc: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Cc: Mark Fasheh <mfasheh@suse.com>
Cc: Joel Becker <jlbec@evilplan.org>
Cc: Rusty Russell <rusty@rustcorp.com.au>
Cc: Jens Axboe <axboe@kernel.dk>
Cc: Asai Thambi S P <asamymuthupa@micron.com>
Cc: Selvan Mani <smani@micron.com>
Cc: Sam Bradshaw <sbradshaw@micron.com>
Cc: Jeff Moyer <jmoyer@redhat.com>
Cc: Al Viro <viro@zeniv.linux.org.uk>
Cc: Benjamin LaHaise <bcrl@kvack.org>
Reviewed-by: default avatar"Theodore Ts'o" <tytso@mit.edu>
Signed-off-by: default avatarAndrew Morton <akpm@linux-foundation.org>
Signed-off-by: default avatarBenjamin LaHaise <bcrl@kvack.org>
parent 0c45355f
...@@ -94,7 +94,13 @@ struct kioctx { ...@@ -94,7 +94,13 @@ struct kioctx {
struct work_struct rcu_work; struct work_struct rcu_work;
struct { struct {
atomic_t reqs_active; /*
* This counts the number of available slots in the ringbuffer,
* so we avoid overflowing it: it's decremented (if positive)
* when allocating a kiocb and incremented when the resulting
* io_event is pulled off the ringbuffer.
*/
atomic_t reqs_available;
} ____cacheline_aligned_in_smp; } ____cacheline_aligned_in_smp;
struct { struct {
...@@ -404,19 +410,20 @@ static void free_ioctx(struct kioctx *ctx) ...@@ -404,19 +410,20 @@ static void free_ioctx(struct kioctx *ctx)
head = ring->head; head = ring->head;
kunmap_atomic(ring); kunmap_atomic(ring);
while (atomic_read(&ctx->reqs_active) > 0) { while (atomic_read(&ctx->reqs_available) < ctx->nr_events - 1) {
wait_event(ctx->wait, wait_event(ctx->wait,
head != ctx->tail || (head != ctx->tail) ||
atomic_read(&ctx->reqs_active) <= 0); (atomic_read(&ctx->reqs_available) >=
ctx->nr_events - 1));
avail = (head <= ctx->tail ? ctx->tail : ctx->nr_events) - head; avail = (head <= ctx->tail ? ctx->tail : ctx->nr_events) - head;
atomic_sub(avail, &ctx->reqs_active); atomic_add(avail, &ctx->reqs_available);
head += avail; head += avail;
head %= ctx->nr_events; head %= ctx->nr_events;
} }
WARN_ON(atomic_read(&ctx->reqs_active) < 0); WARN_ON(atomic_read(&ctx->reqs_available) > ctx->nr_events - 1);
aio_free_ring(ctx); aio_free_ring(ctx);
...@@ -475,6 +482,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) ...@@ -475,6 +482,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
if (aio_setup_ring(ctx) < 0) if (aio_setup_ring(ctx) < 0)
goto out_freectx; goto out_freectx;
atomic_set(&ctx->reqs_available, ctx->nr_events - 1);
/* limit the number of system wide aios */ /* limit the number of system wide aios */
spin_lock(&aio_nr_lock); spin_lock(&aio_nr_lock);
if (aio_nr + nr_events > aio_max_nr || if (aio_nr + nr_events > aio_max_nr ||
...@@ -586,7 +595,7 @@ void exit_aio(struct mm_struct *mm) ...@@ -586,7 +595,7 @@ void exit_aio(struct mm_struct *mm)
"exit_aio:ioctx still alive: %d %d %d\n", "exit_aio:ioctx still alive: %d %d %d\n",
atomic_read(&ctx->users), atomic_read(&ctx->users),
atomic_read(&ctx->dead), atomic_read(&ctx->dead),
atomic_read(&ctx->reqs_active)); atomic_read(&ctx->reqs_available));
/* /*
* We don't need to bother with munmap() here - * We don't need to bother with munmap() here -
* exit_mmap(mm) is coming and it'll unmap everything. * exit_mmap(mm) is coming and it'll unmap everything.
...@@ -615,12 +624,9 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx) ...@@ -615,12 +624,9 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
{ {
struct kiocb *req; struct kiocb *req;
if (atomic_read(&ctx->reqs_active) >= ctx->nr_events) if (atomic_dec_if_positive(&ctx->reqs_available) <= 0)
return NULL; return NULL;
if (atomic_inc_return(&ctx->reqs_active) > ctx->nr_events - 1)
goto out_put;
req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO); req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
if (unlikely(!req)) if (unlikely(!req))
goto out_put; goto out_put;
...@@ -630,7 +636,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx) ...@@ -630,7 +636,7 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
return req; return req;
out_put: out_put:
atomic_dec(&ctx->reqs_active); atomic_inc(&ctx->reqs_available);
return NULL; return NULL;
} }
...@@ -701,7 +707,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2) ...@@ -701,7 +707,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
/* /*
* Take rcu_read_lock() in case the kioctx is being destroyed, as we * Take rcu_read_lock() in case the kioctx is being destroyed, as we
* need to issue a wakeup after decrementing reqs_active. * need to issue a wakeup after incrementing reqs_available.
*/ */
rcu_read_lock(); rcu_read_lock();
...@@ -719,7 +725,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2) ...@@ -719,7 +725,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
*/ */
if (unlikely(xchg(&iocb->ki_cancel, if (unlikely(xchg(&iocb->ki_cancel,
KIOCB_CANCELLED) == KIOCB_CANCELLED)) { KIOCB_CANCELLED) == KIOCB_CANCELLED)) {
atomic_dec(&ctx->reqs_active); atomic_inc(&ctx->reqs_available);
/* Still need the wake_up in case free_ioctx is waiting */ /* Still need the wake_up in case free_ioctx is waiting */
goto put_rq; goto put_rq;
} }
...@@ -857,7 +863,7 @@ static long aio_read_events_ring(struct kioctx *ctx, ...@@ -857,7 +863,7 @@ static long aio_read_events_ring(struct kioctx *ctx,
pr_debug("%li h%u t%u\n", ret, head, ctx->tail); pr_debug("%li h%u t%u\n", ret, head, ctx->tail);
atomic_sub(ret, &ctx->reqs_active); atomic_add(ret, &ctx->reqs_available);
out: out:
mutex_unlock(&ctx->ring_lock); mutex_unlock(&ctx->ring_lock);
...@@ -1241,7 +1247,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, ...@@ -1241,7 +1247,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
aio_put_req(req); /* drop extra ref to req */ aio_put_req(req); /* drop extra ref to req */
return 0; return 0;
out_put_req: out_put_req:
atomic_dec(&ctx->reqs_active); atomic_inc(&ctx->reqs_available);
aio_put_req(req); /* drop extra ref to req */ aio_put_req(req); /* drop extra ref to req */
aio_put_req(req); /* drop i/o ref to req */ aio_put_req(req); /* drop i/o ref to req */
return ret; return ret;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment