Commit c8fe9b17 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: Asynchronous IO support

The basic idea of AIO support is simple, just call kiocb::ki_complete()
in OSD request's complete callback. But there are several special cases.

when IO span multiple objects, we need to wait until all OSD requests
are complete, then call kiocb::ki_complete(). Error handling in this case
is tricky too. For simplify, AIO both span multiple objects and extends
i_size are not allowed.

Another special case is check EOF for reading (other client can write to
the file and extend i_size concurrently). For simplify, the direct-IO/AIO
code path does do the check, fallback to normal syn read instead.
Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent 458c4703
...@@ -397,8 +397,9 @@ int ceph_release(struct inode *inode, struct file *file) ...@@ -397,8 +397,9 @@ int ceph_release(struct inode *inode, struct file *file)
} }
enum { enum {
CHECK_EOF = 1, HAVE_RETRIED = 1,
READ_INLINE = 2, CHECK_EOF = 2,
READ_INLINE = 3,
}; };
/* /*
...@@ -411,17 +412,14 @@ enum { ...@@ -411,17 +412,14 @@ enum {
static int striped_read(struct inode *inode, static int striped_read(struct inode *inode,
u64 off, u64 len, u64 off, u64 len,
struct page **pages, int num_pages, struct page **pages, int num_pages,
int *checkeof, bool o_direct, int *checkeof)
unsigned long buf_align)
{ {
struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
u64 pos, this_len, left; u64 pos, this_len, left;
int io_align, page_align; int page_align, pages_left;
int pages_left; int read, ret;
int read;
struct page **page_pos; struct page **page_pos;
int ret;
bool hit_stripe, was_short; bool hit_stripe, was_short;
/* /*
...@@ -432,12 +430,8 @@ static int striped_read(struct inode *inode, ...@@ -432,12 +430,8 @@ static int striped_read(struct inode *inode,
page_pos = pages; page_pos = pages;
pages_left = num_pages; pages_left = num_pages;
read = 0; read = 0;
io_align = off & ~PAGE_MASK;
more: more:
if (o_direct)
page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
else
page_align = pos & ~PAGE_MASK; page_align = pos & ~PAGE_MASK;
this_len = left; this_len = left;
ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
...@@ -457,8 +451,7 @@ static int striped_read(struct inode *inode, ...@@ -457,8 +451,7 @@ static int striped_read(struct inode *inode,
if (was_short && (pos + ret < inode->i_size)) { if (was_short && (pos + ret < inode->i_size)) {
int zlen = min(this_len - ret, int zlen = min(this_len - ret,
inode->i_size - pos - ret); inode->i_size - pos - ret);
int zoff = (o_direct ? buf_align : io_align) + int zoff = (off & ~PAGE_MASK) + read + ret;
read + ret;
dout(" zero gap %llu to %llu\n", dout(" zero gap %llu to %llu\n",
pos + ret, pos + ret + zlen); pos + ret, pos + ret + zlen);
ceph_zero_page_vector_range(zoff, zlen, pages); ceph_zero_page_vector_range(zoff, zlen, pages);
...@@ -521,46 +514,21 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, ...@@ -521,46 +514,21 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
if (ret < 0) if (ret < 0)
return ret; return ret;
if (iocb->ki_flags & IOCB_DIRECT) {
while (iov_iter_count(i)) {
size_t start;
ssize_t n;
n = dio_get_pagev_size(i);
pages = dio_get_pages_alloc(i, n, &start, &num_pages);
if (IS_ERR(pages))
return PTR_ERR(pages);
ret = striped_read(inode, off, n,
pages, num_pages, checkeof,
1, start);
ceph_put_page_vector(pages, num_pages, true);
if (ret <= 0)
break;
off += ret;
iov_iter_advance(i, ret);
if (ret < n)
break;
}
} else {
num_pages = calc_pages_for(off, len); num_pages = calc_pages_for(off, len);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages)) if (IS_ERR(pages))
return PTR_ERR(pages); return PTR_ERR(pages);
ret = striped_read(inode, off, len, pages, ret = striped_read(inode, off, len, pages,
num_pages, checkeof, 0, 0); num_pages, checkeof);
if (ret > 0) { if (ret > 0) {
int l, k = 0; int l, k = 0;
size_t left = ret; size_t left = ret;
while (left) { while (left) {
size_t page_off = off & ~PAGE_MASK; size_t page_off = off & ~PAGE_MASK;
size_t copy = min_t(size_t, size_t copy = min_t(size_t, left,
PAGE_SIZE - page_off, left); PAGE_SIZE - page_off);
l = copy_page_to_iter(pages[k++], page_off, l = copy_page_to_iter(pages[k++], page_off, copy, i);
copy, i);
off += l; off += l;
left -= l; left -= l;
if (l < copy) if (l < copy)
...@@ -568,7 +536,6 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, ...@@ -568,7 +536,6 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
} }
} }
ceph_release_page_vector(pages, num_pages); ceph_release_page_vector(pages, num_pages);
}
if (off > iocb->ki_pos) { if (off > iocb->ki_pos) {
ret = off - iocb->ki_pos; ret = off - iocb->ki_pos;
...@@ -579,6 +546,113 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, ...@@ -579,6 +546,113 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
return ret; return ret;
} }
struct ceph_aio_request {
struct kiocb *iocb;
size_t total_len;
int write;
int error;
struct list_head osd_reqs;
unsigned num_reqs;
atomic_t pending_reqs;
struct ceph_cap_flush *prealloc_cf;
};
static void ceph_aio_complete(struct inode *inode,
struct ceph_aio_request *aio_req)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int ret;
if (!atomic_dec_and_test(&aio_req->pending_reqs))
return;
ret = aio_req->error;
if (!ret)
ret = aio_req->total_len;
dout("ceph_aio_complete %p rc %d\n", inode, ret);
if (ret >= 0 && aio_req->write) {
int dirty;
loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
if (endoff > i_size_read(inode)) {
if (ceph_inode_set_size(inode, endoff))
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
}
spin_lock(&ci->i_ceph_lock);
ci->i_inline_version = CEPH_INLINE_NONE;
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
&aio_req->prealloc_cf);
spin_unlock(&ci->i_ceph_lock);
if (dirty)
__mark_inode_dirty(inode, dirty);
}
ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :
CEPH_CAP_FILE_RD));
aio_req->iocb->ki_complete(aio_req->iocb, ret, 0);
ceph_free_cap_flush(aio_req->prealloc_cf);
kfree(aio_req);
}
static void ceph_aio_complete_req(struct ceph_osd_request *req,
struct ceph_msg *msg)
{
int rc = req->r_result;
struct inode *inode = req->r_inode;
struct ceph_aio_request *aio_req = req->r_priv;
struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
int num_pages = calc_pages_for((u64)osd_data->alignment,
osd_data->length);
dout("ceph_aio_complete_req %p rc %d bytes %llu\n",
inode, rc, osd_data->length);
if (rc == -EOLDSNAPC) {
BUG_ON(1);
}
if (!aio_req->write) {
if (rc == -ENOENT)
rc = 0;
if (rc >= 0 && osd_data->length > rc) {
int zoff = osd_data->alignment + rc;
int zlen = osd_data->length - rc;
/*
* If read is satisfied by single OSD request,
* it can pass EOF. Otherwise read is within
* i_size.
*/
if (aio_req->num_reqs == 1) {
loff_t i_size = i_size_read(inode);
loff_t endoff = aio_req->iocb->ki_pos + rc;
if (endoff < i_size)
zlen = min_t(size_t, zlen,
i_size - endoff);
aio_req->total_len = rc + zlen;
}
if (zlen > 0)
ceph_zero_page_vector_range(zoff, zlen,
osd_data->pages);
}
}
ceph_put_page_vector(osd_data->pages, num_pages, false);
ceph_osdc_put_request(req);
if (rc < 0)
cmpxchg(&aio_req->error, 0, rc);
ceph_aio_complete(inode, aio_req);
return;
}
/* /*
* Write commit request unsafe callback, called to tell us when a * Write commit request unsafe callback, called to tell us when a
* request is unsafe (that is, in flight--has been handed to the * request is unsafe (that is, in flight--has been handed to the
...@@ -612,16 +686,10 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) ...@@ -612,16 +686,10 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
} }
/*
* Synchronous write, straight from __user pointer or user pages.
*
* If write spans object boundary, just do multiple writes. (For a
* correct atomic write, we should e.g. take write locks on all
* objects, rollback on failure, etc.)
*/
static ssize_t static ssize_t
ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
struct ceph_snap_context *snapc) struct ceph_snap_context *snapc,
struct ceph_cap_flush **pcf)
{ {
struct file *file = iocb->ki_filp; struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file); struct inode *inode = file_inode(file);
...@@ -630,24 +698,26 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, ...@@ -630,24 +698,26 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
struct ceph_vino vino; struct ceph_vino vino;
struct ceph_osd_request *req; struct ceph_osd_request *req;
struct page **pages; struct page **pages;
int num_pages; struct ceph_aio_request *aio_req = NULL;
int written = 0; int num_pages = 0;
int flags; int flags;
int check_caps = 0;
int ret; int ret;
struct timespec mtime = CURRENT_TIME; struct timespec mtime = CURRENT_TIME;
size_t count = iov_iter_count(from); size_t count = iov_iter_count(iter);
loff_t pos = iocb->ki_pos;
bool write = iov_iter_rw(iter) == WRITE;
if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
return -EROFS; return -EROFS;
dout("sync_direct_write on file %p %lld~%u\n", file, pos, dout("sync_direct_read_write (%s) on file %p %lld~%u\n",
(unsigned)count); (write ? "write" : "read"), file, pos, (unsigned)count);
ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
if (ret < 0) if (ret < 0)
return ret; return ret;
if (write) {
ret = invalidate_inode_pages2_range(inode->i_mapping, ret = invalidate_inode_pages2_range(inode->i_mapping,
pos >> PAGE_CACHE_SHIFT, pos >> PAGE_CACHE_SHIFT,
(pos + count) >> PAGE_CACHE_SHIFT); (pos + count) >> PAGE_CACHE_SHIFT);
...@@ -657,17 +727,23 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, ...@@ -657,17 +727,23 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
flags = CEPH_OSD_FLAG_ORDERSNAP | flags = CEPH_OSD_FLAG_ORDERSNAP |
CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_ONDISK |
CEPH_OSD_FLAG_WRITE; CEPH_OSD_FLAG_WRITE;
} else {
flags = CEPH_OSD_FLAG_READ;
}
while (iov_iter_count(from) > 0) { while (iov_iter_count(iter) > 0) {
u64 len = dio_get_pagev_size(from); u64 size = dio_get_pagev_size(iter);
size_t start; size_t start = 0;
ssize_t n; ssize_t len;
vino = ceph_vino(inode); vino = ceph_vino(inode);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
vino, pos, &len, 0, vino, pos, &size, 0,
2,/*include a 'startsync' command*/ /*include a 'startsync' command*/
CEPH_OSD_OP_WRITE, flags, snapc, write ? 2 : 1,
write ? CEPH_OSD_OP_WRITE :
CEPH_OSD_OP_READ,
flags, snapc,
ci->i_truncate_seq, ci->i_truncate_seq,
ci->i_truncate_size, ci->i_truncate_size,
false); false);
...@@ -676,58 +752,135 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, ...@@ -676,58 +752,135 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
break; break;
} }
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); len = size;
pages = dio_get_pages_alloc(iter, len, &start, &num_pages);
n = len;
pages = dio_get_pages_alloc(from, len, &start, &num_pages);
if (IS_ERR(pages)) { if (IS_ERR(pages)) {
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
ret = PTR_ERR(pages); ret = PTR_ERR(pages);
break; break;
} }
/*
* To simplify error handling, allow AIO when IO within i_size
* or IO can be satisfied by single OSD request.
*/
if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
(len == count || pos + count <= i_size_read(inode))) {
aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
if (aio_req) {
aio_req->iocb = iocb;
aio_req->write = write;
INIT_LIST_HEAD(&aio_req->osd_reqs);
if (write) {
swap(aio_req->prealloc_cf, *pcf);
}
}
/* ignore error */
}
if (write) {
/* /*
* throw out any page cache pages in this range. this * throw out any page cache pages in this range. this
* may block. * may block.
*/ */
truncate_inode_pages_range(inode->i_mapping, pos, truncate_inode_pages_range(inode->i_mapping, pos,
(pos+n) | (PAGE_CACHE_SIZE-1)); (pos+len) | (PAGE_CACHE_SIZE - 1));
osd_req_op_extent_osd_data_pages(req, 0, pages, n, start,
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
}
osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
false, false); false, false);
/* BUG_ON(vino.snap != CEPH_NOSNAP); */
ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); if (aio_req) {
aio_req->total_len += len;
aio_req->num_reqs++;
atomic_inc(&aio_req->pending_reqs);
req->r_callback = ceph_aio_complete_req;
req->r_inode = inode;
req->r_priv = aio_req;
list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
pos += len;
iov_iter_advance(iter, len);
continue;
}
ret = ceph_osdc_start_request(req->r_osdc, req, false);
if (!ret) if (!ret)
ret = ceph_osdc_wait_request(&fsc->client->osdc, req); ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
size = i_size_read(inode);
if (!write) {
if (ret == -ENOENT)
ret = 0;
if (ret >= 0 && ret < len && pos + ret < size) {
int zlen = min_t(size_t, len - ret,
size - pos - ret);
ceph_zero_page_vector_range(start + ret, zlen,
pages);
ret += zlen;
}
if (ret >= 0)
len = ret;
}
ceph_put_page_vector(pages, num_pages, false); ceph_put_page_vector(pages, num_pages, false);
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
if (ret) if (ret < 0)
break; break;
pos += n;
written += n;
iov_iter_advance(from, n);
if (pos > i_size_read(inode)) { pos += len;
check_caps = ceph_inode_set_size(inode, pos); iov_iter_advance(iter, len);
if (check_caps)
if (!write && pos >= size)
break;
if (write && pos > size) {
if (ceph_inode_set_size(inode, pos))
ceph_check_caps(ceph_inode(inode), ceph_check_caps(ceph_inode(inode),
CHECK_CAPS_AUTHONLY, CHECK_CAPS_AUTHONLY,
NULL); NULL);
} }
} }
if (ret != -EOLDSNAPC && written > 0) { if (aio_req) {
if (aio_req->num_reqs == 0) {
kfree(aio_req);
return ret;
}
ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
CEPH_CAP_FILE_RD);
while (!list_empty(&aio_req->osd_reqs)) {
req = list_first_entry(&aio_req->osd_reqs,
struct ceph_osd_request,
r_unsafe_item);
list_del_init(&req->r_unsafe_item);
if (ret >= 0)
ret = ceph_osdc_start_request(req->r_osdc,
req, false);
if (ret < 0) {
req->r_result = ret;
ceph_aio_complete_req(req, NULL);
}
}
return -EIOCBQUEUED;
}
if (ret != -EOLDSNAPC && pos > iocb->ki_pos) {
ret = pos - iocb->ki_pos;
iocb->ki_pos = pos; iocb->ki_pos = pos;
ret = written;
} }
return ret; return ret;
} }
/* /*
* Synchronous write, straight from __user pointer or user pages. * Synchronous write, straight from __user pointer or user pages.
* *
...@@ -897,8 +1050,14 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -897,8 +1050,14 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
ceph_cap_string(got)); ceph_cap_string(got));
if (ci->i_inline_version == CEPH_INLINE_NONE) { if (ci->i_inline_version == CEPH_INLINE_NONE) {
/* hmm, this isn't really async... */ if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
ret = ceph_direct_read_write(iocb, to,
NULL, NULL);
if (ret >= 0 && ret < len)
retry_op = CHECK_EOF;
} else {
ret = ceph_sync_read(iocb, to, &retry_op); ret = ceph_sync_read(iocb, to, &retry_op);
}
} else { } else {
retry_op = READ_INLINE; retry_op = READ_INLINE;
} }
...@@ -916,7 +1075,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -916,7 +1075,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
pinned_page = NULL; pinned_page = NULL;
} }
ceph_put_cap_refs(ci, got); ceph_put_cap_refs(ci, got);
if (retry_op && ret >= 0) { if (retry_op > HAVE_RETRIED && ret >= 0) {
int statret; int statret;
struct page *page = NULL; struct page *page = NULL;
loff_t i_size; loff_t i_size;
...@@ -973,7 +1132,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -973,7 +1132,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
read += ret; read += ret;
len -= ret; len -= ret;
retry_op = 0; retry_op = HAVE_RETRIED;
goto again; goto again;
} }
} }
...@@ -1088,8 +1247,8 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) ...@@ -1088,8 +1247,8 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
/* we might need to revert back to that point */ /* we might need to revert back to that point */
data = *from; data = *from;
if (iocb->ki_flags & IOCB_DIRECT) if (iocb->ki_flags & IOCB_DIRECT)
written = ceph_sync_direct_write(iocb, &data, pos, written = ceph_direct_read_write(iocb, &data, snapc,
snapc); &prealloc_cf);
else else
written = ceph_sync_write(iocb, &data, pos, snapc); written = ceph_sync_write(iocb, &data, pos, snapc);
if (written == -EOLDSNAPC) { if (written == -EOLDSNAPC) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment