Commit 7ce469a5 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: fix splice read for no Fc capability case

When iov_iter type is ITER_PIPE, copy_page_to_iter() increases
the page's reference and add the page to a pipe_buffer. It also
set the pipe_buffer's ops to page_cache_pipe_buf_ops. The comfirm
callback in page_cache_pipe_buf_ops expects the page is from page
cache and uptodate, otherwise it return error.

For ceph_sync_read() case, pages are not from page cache. So we
can't call copy_page_to_iter() when iov_iter type is ITER_PIPE.
The fix is using iov_iter_get_pages_alloc() to allocate pages
for the pipe. (the code is similar to default_file_splice_read)
Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent 2b1ac852
...@@ -458,71 +458,60 @@ enum { ...@@ -458,71 +458,60 @@ enum {
* only return a short read to the caller if we hit EOF. * only return a short read to the caller if we hit EOF.
*/ */
static int striped_read(struct inode *inode, static int striped_read(struct inode *inode,
u64 off, u64 len, u64 pos, u64 len,
struct page **pages, int num_pages, struct page **pages, int num_pages,
int *checkeof) int page_align, int *checkeof)
{ {
struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
u64 pos, this_len, left; u64 this_len;
loff_t i_size; loff_t i_size;
int page_align, pages_left; int page_idx;
int read, ret; int ret, read = 0;
struct page **page_pos;
bool hit_stripe, was_short; bool hit_stripe, was_short;
/* /*
* we may need to do multiple reads. not atomic, unfortunately. * we may need to do multiple reads. not atomic, unfortunately.
*/ */
pos = off;
left = len;
page_pos = pages;
pages_left = num_pages;
read = 0;
more: more:
page_align = pos & ~PAGE_MASK; this_len = len;
this_len = left; page_idx = (page_align + read) >> PAGE_SHIFT;
ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
&ci->i_layout, pos, &this_len, &ci->i_layout, pos, &this_len,
ci->i_truncate_seq, ci->i_truncate_seq, ci->i_truncate_size,
ci->i_truncate_size, pages + page_idx, num_pages - page_idx,
page_pos, pages_left, page_align); ((page_align + read) & ~PAGE_MASK));
if (ret == -ENOENT) if (ret == -ENOENT)
ret = 0; ret = 0;
hit_stripe = this_len < left; hit_stripe = this_len < len;
was_short = ret >= 0 && ret < this_len; was_short = ret >= 0 && ret < this_len;
dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read, dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
i_size = i_size_read(inode); i_size = i_size_read(inode);
if (ret >= 0) { if (ret >= 0) {
int didpages;
if (was_short && (pos + ret < i_size)) { if (was_short && (pos + ret < i_size)) {
int zlen = min(this_len - ret, i_size - pos - ret); int zlen = min(this_len - ret, i_size - pos - ret);
int zoff = (off & ~PAGE_MASK) + read + ret; int zoff = page_align + read + ret;
dout(" zero gap %llu to %llu\n", dout(" zero gap %llu to %llu\n",
pos + ret, pos + ret + zlen); pos + ret, pos + ret + zlen);
ceph_zero_page_vector_range(zoff, zlen, pages); ceph_zero_page_vector_range(zoff, zlen, pages);
ret += zlen; ret += zlen;
} }
didpages = (page_align + ret) >> PAGE_SHIFT; read += ret;
pos += ret; pos += ret;
read = pos - off; len -= ret;
left -= ret;
page_pos += didpages;
pages_left -= didpages;
/* hit stripe and need continue*/ /* hit stripe and need continue*/
if (left && hit_stripe && pos < i_size) if (len && hit_stripe && pos < i_size)
goto more; goto more;
} }
if (read > 0) { if (read > 0) {
ret = read; ret = read;
/* did we bounce off eof? */ /* did we bounce off eof? */
if (pos + left > i_size) if (pos + len > i_size)
*checkeof = CHECK_EOF; *checkeof = CHECK_EOF;
} }
...@@ -536,15 +525,16 @@ static int striped_read(struct inode *inode, ...@@ -536,15 +525,16 @@ static int striped_read(struct inode *inode,
* *
* If the read spans object boundary, just do multiple reads. * If the read spans object boundary, just do multiple reads.
*/ */
static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
int *checkeof) int *checkeof)
{ {
struct file *file = iocb->ki_filp; struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file); struct inode *inode = file_inode(file);
struct page **pages; struct page **pages;
u64 off = iocb->ki_pos; u64 off = iocb->ki_pos;
int num_pages, ret; int num_pages;
size_t len = iov_iter_count(i); ssize_t ret;
size_t len = iov_iter_count(to);
dout("sync_read on file %p %llu~%u %s\n", file, off, dout("sync_read on file %p %llu~%u %s\n", file, off,
(unsigned)len, (unsigned)len,
...@@ -563,35 +553,56 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, ...@@ -563,35 +553,56 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
if (ret < 0) if (ret < 0)
return ret; return ret;
num_pages = calc_pages_for(off, len); if (unlikely(to->type & ITER_PIPE)) {
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); size_t page_off;
if (IS_ERR(pages)) ret = iov_iter_get_pages_alloc(to, &pages, len,
return PTR_ERR(pages); &page_off);
ret = striped_read(inode, off, len, pages, if (ret <= 0)
num_pages, checkeof); return -ENOMEM;
if (ret > 0) { num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
int l, k = 0;
size_t left = ret; ret = striped_read(inode, off, ret, pages, num_pages,
page_off, checkeof);
while (left) { if (ret > 0) {
size_t page_off = off & ~PAGE_MASK; iov_iter_advance(to, ret);
size_t copy = min_t(size_t, left, off += ret;
PAGE_SIZE - page_off); } else {
l = copy_page_to_iter(pages[k++], page_off, copy, i); iov_iter_advance(to, 0);
off += l; }
left -= l; ceph_put_page_vector(pages, num_pages, false);
if (l < copy) } else {
break; num_pages = calc_pages_for(off, len);
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages))
return PTR_ERR(pages);
ret = striped_read(inode, off, len, pages, num_pages,
(off & ~PAGE_MASK), checkeof);
if (ret > 0) {
int l, k = 0;
size_t left = ret;
while (left) {
size_t page_off = off & ~PAGE_MASK;
size_t copy = min_t(size_t, left,
PAGE_SIZE - page_off);
l = copy_page_to_iter(pages[k++], page_off,
copy, to);
off += l;
left -= l;
if (l < copy)
break;
}
} }
ceph_release_page_vector(pages, num_pages);
} }
ceph_release_page_vector(pages, num_pages);
if (off > iocb->ki_pos) { if (off > iocb->ki_pos) {
ret = off - iocb->ki_pos; ret = off - iocb->ki_pos;
iocb->ki_pos = off; iocb->ki_pos = off;
} }
dout("sync_read result %d\n", ret); dout("sync_read result %zd\n", ret);
return ret; return ret;
} }
...@@ -1771,6 +1782,7 @@ const struct file_operations ceph_file_fops = { ...@@ -1771,6 +1782,7 @@ const struct file_operations ceph_file_fops = {
.fsync = ceph_fsync, .fsync = ceph_fsync,
.lock = ceph_lock, .lock = ceph_lock,
.flock = ceph_flock, .flock = ceph_flock,
.splice_read = generic_file_splice_read,
.splice_write = iter_file_splice_write, .splice_write = iter_file_splice_write,
.unlocked_ioctl = ceph_ioctl, .unlocked_ioctl = ceph_ioctl,
.compat_ioctl = ceph_ioctl, .compat_ioctl = ceph_ioctl,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment