Commit 227c0c96 authored by Jens Axboe

io_uring: internally retry short reads

We've had a few cases of applications not handling short reads
properly, which is understandable, as short reads aren't really
expected if the application isn't doing non-blocking IO.

Now that we retain the iov_iter over retries, we can implement internal
retry pretty trivially. This ensures that we don't return a short read,
even for buffered reads on page cache conflicts.

Clean up the deep nesting and hard-to-read flow of io_read() as well;
it's now much more straightforward to read and understand. Added a
few comments explaining the logic.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent ff6165b2
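
For context, here is a minimal userspace sketch of the kind of buffered read this change affects, using liburing. The file path, queue depth, and buffer size below are illustrative assumptions, not part of the commit; the point is that with this patch a buffered read that hits a page cache conflict is retried internally, so cqe->res should reflect the full transfer rather than a short count (barring EOF or an error).

/* Minimal liburing read sketch; build with -luring. Paths/sizes are placeholders. */
#include <fcntl.h>
#include <stdio.h>
#include <liburing.h>

int main(void)
{
	struct io_uring ring;
	struct io_uring_sqe *sqe;
	struct io_uring_cqe *cqe;
	static char buf[64 * 1024];		/* arbitrary example size */
	int fd, ret;

	fd = open("testfile", O_RDONLY);	/* placeholder path */
	if (fd < 0 || io_uring_queue_init(8, &ring, 0) < 0)
		return 1;

	sqe = io_uring_get_sqe(&ring);
	io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0);
	io_uring_submit(&ring);

	ret = io_uring_wait_cqe(&ring, &cqe);
	if (!ret) {
		/* cqe->res holds the read result; with internal retry the
		 * kernel re-drives short buffered reads before completing. */
		printf("read returned %d\n", cqe->res);
		io_uring_cqe_seen(&ring, cqe);
	}
	io_uring_queue_exit(&ring);
	return 0;
}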
@@ -510,6 +510,7 @@ struct io_async_rw {
 	struct iovec			fast_iov[UIO_FASTIOV];
 	const struct iovec		*free_iovec;
 	struct iov_iter			iter;
+	size_t				bytes_done;
 	struct wait_page_queue		wpq;
 };
 
@@ -916,7 +917,7 @@ static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
 			       bool needs_lock);
 static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec,
 			     const struct iovec *fast_iov,
-			     struct iov_iter *iter);
+			     struct iov_iter *iter, bool force);
 
 static struct kmem_cache *req_cachep;
 
@@ -2298,7 +2299,7 @@ static bool io_resubmit_prep(struct io_kiocb *req, int error)
 	ret = io_import_iovec(rw, req, &iovec, &iter, false);
 	if (ret < 0)
 		goto end_req;
-	ret = io_setup_async_rw(req, iovec, inline_vecs, &iter);
+	ret = io_setup_async_rw(req, iovec, inline_vecs, &iter, false);
 	if (!ret)
 		return true;
 	kfree(iovec);
@@ -2588,6 +2589,14 @@ static void kiocb_done(struct kiocb *kiocb, ssize_t ret,
 {
 	struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
 
+	/* add previously done IO, if any */
+	if (req->io && req->io->rw.bytes_done > 0) {
+		if (ret < 0)
+			ret = req->io->rw.bytes_done;
+		else
+			ret += req->io->rw.bytes_done;
+	}
+
 	if (req->flags & REQ_F_CUR_POS)
 		req->file->f_pos = kiocb->ki_pos;
 	if (ret >= 0 && kiocb->ki_complete == io_complete_rw)
@@ -2935,6 +2944,7 @@ static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec,
 
 	memcpy(&rw->iter, iter, sizeof(*iter));
 	rw->free_iovec = NULL;
+	rw->bytes_done = 0;
 	/* can only be fixed buffers, no need to do anything */
 	if (iter->type == ITER_BVEC)
 		return;
@@ -2971,9 +2981,9 @@ static int io_alloc_async_ctx(struct io_kiocb *req)
 
 static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec,
 			     const struct iovec *fast_iov,
-			     struct iov_iter *iter)
+			     struct iov_iter *iter, bool force)
 {
-	if (!io_op_defs[req->opcode].async_ctx)
+	if (!force && !io_op_defs[req->opcode].async_ctx)
 		return 0;
 	if (!req->io) {
 		if (__io_alloc_async_ctx(req))
@@ -3097,8 +3107,7 @@ static inline int kiocb_wait_page_queue_init(struct kiocb *kiocb,
  * succeed, or in rare cases where it fails, we then fall back to using the
  * async worker threads for a blocking retry.
  */
-static bool io_rw_should_retry(struct io_kiocb *req, struct iovec *iovec,
-			       struct iovec *fast_iov, struct iov_iter *iter)
+static bool io_rw_should_retry(struct io_kiocb *req)
 {
 	struct kiocb *kiocb = &req->rw.kiocb;
 	int ret;
@@ -3107,8 +3116,8 @@ static bool io_rw_should_retry(struct io_kiocb *req, struct iovec *iovec,
 	if (req->flags & REQ_F_NOWAIT)
 		return false;
 
-	/* already tried, or we're doing O_DIRECT */
-	if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ))
+	/* Only for buffered IO */
+	if (kiocb->ki_flags & IOCB_DIRECT)
 		return false;
 	/*
 	 * just use poll if we can, and don't attempt if the fs doesn't
@@ -3117,16 +3126,6 @@ static bool io_rw_should_retry(struct io_kiocb *req, struct iovec *iovec,
 	if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
 		return false;
 
-	/*
-	 * If request type doesn't require req->io to defer in general,
-	 * we need to allocate it here
-	 */
-	if (!req->io) {
-		if (__io_alloc_async_ctx(req))
-			return false;
-		io_req_map_rw(req, iovec, fast_iov, iter);
-	}
-
 	ret = kiocb_wait_page_queue_init(kiocb, &req->io->rw.wpq,
 						io_async_buf_func, req);
 	if (!ret) {
@@ -3153,8 +3152,8 @@ static int io_read(struct io_kiocb *req, bool force_nonblock,
 	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
 	struct kiocb *kiocb = &req->rw.kiocb;
 	struct iov_iter __iter, *iter = &__iter;
-	ssize_t io_size, ret, ret2;
 	size_t iov_count;
+	ssize_t io_size, ret, ret2 = 0;
 
 	if (req->io)
 		iter = &req->io->rw.iter;
@@ -3164,6 +3163,7 @@ static int io_read(struct io_kiocb *req, bool force_nonblock,
 		return ret;
 	io_size = ret;
 	req->result = io_size;
+	ret = 0;
 
 	/* Ensure we clear previously set non-block flag */
 	if (!force_nonblock)
@@ -3178,31 +3178,62 @@ static int io_read(struct io_kiocb *req, bool force_nonblock,
 	if (unlikely(ret))
 		goto out_free;
 
-	ret2 = io_iter_do_read(req, iter);
+	ret = io_iter_do_read(req, iter);
 
-	/* Catch -EAGAIN return for forced non-blocking submission */
-	if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) {
-		kiocb_done(kiocb, ret2, cs);
-	} else {
-copy_iov:
-		ret = io_setup_async_rw(req, iovec, inline_vecs, iter);
-		if (ret)
+	if (!ret) {
+		goto done;
+	} else if (ret == -EIOCBQUEUED) {
+		ret = 0;
+		goto out_free;
+	} else if (ret == -EAGAIN) {
+		ret = io_setup_async_rw(req, iovec, inline_vecs, iter, false);
+		if (ret)
 			goto out_free;
-		/* it's copied and will be cleaned with ->io */
-		iovec = NULL;
-		/* if we can retry, do so with the callbacks armed */
-		if (io_rw_should_retry(req, iovec, inline_vecs, iter)) {
-			ret2 = io_iter_do_read(req, iter);
-			if (ret2 == -EIOCBQUEUED) {
-				goto out_free;
-			} else if (ret2 != -EAGAIN) {
-				kiocb_done(kiocb, ret2, cs);
-				goto out_free;
-			}
-		}
+		return -EAGAIN;
+	} else if (ret < 0) {
+		goto out_free;
+	}
+
+	/* read it all, or we did blocking attempt. no retry. */
+	if (!iov_iter_count(iter) || !force_nonblock)
+		goto done;
+
+	io_size -= ret;
+copy_iov:
+	ret2 = io_setup_async_rw(req, iovec, inline_vecs, iter, true);
+	if (ret2) {
+		ret = ret2;
+		goto out_free;
+	}
+	/* it's copied and will be cleaned with ->io */
+	iovec = NULL;
+	/* now use our persistent iterator, if we aren't already */
+	iter = &req->io->rw.iter;
+retry:
+	req->io->rw.bytes_done += ret;
+	/* if we can retry, do so with the callbacks armed */
+	if (!io_rw_should_retry(req)) {
 		kiocb->ki_flags &= ~IOCB_WAITQ;
 		return -EAGAIN;
 	}
+
+	/*
+	 * Now retry read with the IOCB_WAITQ parts set in the iocb. If we
+	 * get -EIOCBQUEUED, then we'll get a notification when the desired
+	 * page gets unlocked. We can also get a partial read here, and if we
+	 * do, then just retry at the new offset.
+	 */
+	ret = io_iter_do_read(req, iter);
+	if (ret == -EIOCBQUEUED) {
+		ret = 0;
+		goto out_free;
+	} else if (ret > 0 && ret < io_size) {
+		/* we got some bytes, but not all. retry. */
+		goto retry;
+	}
+done:
+	kiocb_done(kiocb, ret, cs);
+	ret = 0;
 out_free:
 	if (iovec)
 		kfree(iovec);
@@ -3295,7 +3326,7 @@ static int io_write(struct io_kiocb *req, bool force_nonblock,
 		kiocb_done(kiocb, ret2, cs);
 	} else {
 copy_iov:
-		ret = io_setup_async_rw(req, iovec, inline_vecs, iter);
+		ret = io_setup_async_rw(req, iovec, inline_vecs, iter, false);
 		if (!ret)
 			return -EAGAIN;
 	}
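
For reference, the manual short-read handling an application would otherwise carry around a plain read(2) looks roughly like the sketch below (userspace code, not part of the patch); the reworked io_read() above performs the equivalent accumulate-and-retry internally, tracking progress in req->io->rw.bytes_done and folding it back into the result in kiocb_done().

#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/*
 * Keep reading until 'len' bytes arrive, EOF, or a hard error.
 * This is the loop applications traditionally need to cope with
 * short reads; the kernel-side retry above makes it unnecessary
 * for io_uring buffered reads.
 */
static ssize_t read_full(int fd, void *buf, size_t len)
{
	size_t done = 0;

	while (done < len) {
		ssize_t ret = read(fd, (char *)buf + done, len - done);

		if (ret < 0) {
			if (errno == EINTR)
				continue;
			/* report partial progress if we already got data */
			return done ? (ssize_t)done : -1;
		}
		if (ret == 0)	/* EOF */
			break;
		done += ret;
	}
	return done;
}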