Commit 83701246 authored by Yan, Zheng; committed by Ilya Dryomov

ceph: sync read inline data

We can't use getattr to fetch inline data while holding the Fr cap,
because doing so can cause a deadlock. If we need to sync-read inline
data, drop the cap references first, then use getattr to fetch the
inline data.
Signed-off-by: Yan, Zheng <zyan@redhat.com>
parent 3738daa6
...@@ -192,17 +192,30 @@ static int readpage_nounlock(struct file *filp, struct page *page) ...@@ -192,17 +192,30 @@ static int readpage_nounlock(struct file *filp, struct page *page)
struct ceph_osd_client *osdc = struct ceph_osd_client *osdc =
&ceph_inode_to_client(inode)->client->osdc; &ceph_inode_to_client(inode)->client->osdc;
int err = 0; int err = 0;
u64 off = page_offset(page);
u64 len = PAGE_CACHE_SIZE; u64 len = PAGE_CACHE_SIZE;
err = ceph_readpage_from_fscache(inode, page); if (off >= i_size_read(inode)) {
zero_user_segment(page, err, PAGE_CACHE_SIZE);
SetPageUptodate(page);
return 0;
}
/*
* Uptodate inline data should have been added into page cache
* while getting Fcr caps.
*/
if (ci->i_inline_version != CEPH_INLINE_NONE)
return -EINVAL;
err = ceph_readpage_from_fscache(inode, page);
if (err == 0) if (err == 0)
goto out; goto out;
dout("readpage inode %p file %p page %p index %lu\n", dout("readpage inode %p file %p page %p index %lu\n",
inode, filp, page, page->index); inode, filp, page, page->index);
err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
(u64) page_offset(page), &len, off, &len,
ci->i_truncate_seq, ci->i_truncate_size, ci->i_truncate_seq, ci->i_truncate_size,
&page, 1, 0); &page, 1, 0);
if (err == -ENOENT) if (err == -ENOENT)
...@@ -384,6 +397,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, ...@@ -384,6 +397,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
int rc = 0; int rc = 0;
int max = 0; int max = 0;
if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
return -EINVAL;
rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list, rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
&nr_pages); &nr_pages);
...@@ -1219,8 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) ...@@ -1219,8 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
want = CEPH_CAP_FILE_CACHE; want = CEPH_CAP_FILE_CACHE;
while (1) { while (1) {
got = 0; got = 0;
ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want,
&got, &pinned_page); -1, &got, &pinned_page);
if (ret == 0) if (ret == 0)
break; break;
if (ret != -ERESTARTSYS) { if (ret != -ERESTARTSYS) {
...@@ -1231,7 +1247,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) ...@@ -1231,7 +1247,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
dout("filemap_fault %p %llu~%zd got cap refs on %s\n", dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got)); inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
ci->i_inline_version == CEPH_INLINE_NONE)
ret = filemap_fault(vma, vmf); ret = filemap_fault(vma, vmf);
else
ret = -EAGAIN;
dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
...@@ -1239,6 +1259,42 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) ...@@ -1239,6 +1259,42 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
page_cache_release(pinned_page); page_cache_release(pinned_page);
ceph_put_cap_refs(ci, got); ceph_put_cap_refs(ci, got);
if (ret != -EAGAIN)
return ret;
/* read inline data */
if (off >= PAGE_CACHE_SIZE) {
/* does not support inline data > PAGE_SIZE */
ret = VM_FAULT_SIGBUS;
} else {
int ret1;
struct address_space *mapping = inode->i_mapping;
struct page *page = find_or_create_page(mapping, 0,
mapping_gfp_mask(mapping) &
~__GFP_FS);
if (!page) {
ret = VM_FAULT_OOM;
goto out;
}
ret1 = __ceph_do_getattr(inode, page,
CEPH_STAT_CAP_INLINE_DATA, true);
if (ret1 < 0 || off >= i_size_read(inode)) {
unlock_page(page);
page_cache_release(page);
ret = VM_FAULT_SIGBUS;
goto out;
}
if (ret1 < PAGE_CACHE_SIZE)
zero_user_segment(page, ret1, PAGE_CACHE_SIZE);
else
flush_dcache_page(page);
SetPageUptodate(page);
vmf->page = page;
ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
}
out:
dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
inode, off, (size_t)PAGE_CACHE_SIZE, ret);
return ret; return ret;
} }
......
...@@ -333,6 +333,11 @@ int ceph_release(struct inode *inode, struct file *file) ...@@ -333,6 +333,11 @@ int ceph_release(struct inode *inode, struct file *file)
return 0; return 0;
} }
enum {
CHECK_EOF = 1,
READ_INLINE = 2,
};
/* /*
* Read a range of bytes striped over one or more objects. Iterate over * Read a range of bytes striped over one or more objects. Iterate over
* objects we stripe over. (That's not atomic, but good enough for now.) * objects we stripe over. (That's not atomic, but good enough for now.)
...@@ -412,7 +417,7 @@ static int striped_read(struct inode *inode, ...@@ -412,7 +417,7 @@ static int striped_read(struct inode *inode,
ret = read; ret = read;
/* did we bounce off eof? */ /* did we bounce off eof? */
if (pos + left > inode->i_size) if (pos + left > inode->i_size)
*checkeof = 1; *checkeof = CHECK_EOF;
} }
dout("striped_read returns %d\n", ret); dout("striped_read returns %d\n", ret);
...@@ -808,7 +813,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -808,7 +813,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
struct page *pinned_page = NULL; struct page *pinned_page = NULL;
ssize_t ret; ssize_t ret;
int want, got = 0; int want, got = 0;
int checkeof = 0, read = 0; int retry_op = 0, read = 0;
again: again:
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
...@@ -830,8 +835,12 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -830,8 +835,12 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
ceph_cap_string(got)); ceph_cap_string(got));
if (ci->i_inline_version == CEPH_INLINE_NONE) {
/* hmm, this isn't really async... */ /* hmm, this isn't really async... */
ret = ceph_sync_read(iocb, to, &checkeof); ret = ceph_sync_read(iocb, to, &retry_op);
} else {
retry_op = READ_INLINE;
}
} else { } else {
dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
...@@ -846,12 +855,50 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -846,12 +855,50 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
pinned_page = NULL; pinned_page = NULL;
} }
ceph_put_cap_refs(ci, got); ceph_put_cap_refs(ci, got);
if (retry_op && ret >= 0) {
int statret;
struct page *page = NULL;
loff_t i_size;
if (retry_op == READ_INLINE) {
page = __page_cache_alloc(GFP_NOFS);
if (!page)
return -ENOMEM;
}
if (checkeof && ret >= 0) { statret = __ceph_do_getattr(inode, page,
int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); CEPH_STAT_CAP_INLINE_DATA, !!page);
if (statret < 0) {
__free_page(page);
if (statret == -ENODATA) {
BUG_ON(retry_op != READ_INLINE);
goto again;
}
return statret;
}
i_size = i_size_read(inode);
if (retry_op == READ_INLINE) {
/* does not support inline data > PAGE_SIZE */
if (i_size > PAGE_CACHE_SIZE) {
ret = -EIO;
} else if (iocb->ki_pos < i_size) {
loff_t end = min_t(loff_t, i_size,
iocb->ki_pos + len);
if (statret < end)
zero_user_segment(page, statret, end);
ret = copy_page_to_iter(page,
iocb->ki_pos & ~PAGE_MASK,
end - iocb->ki_pos, to);
iocb->ki_pos += ret;
} else {
ret = 0;
}
__free_pages(page, 0);
return ret;
}
/* hit EOF or hole? */ /* hit EOF or hole? */
if (statret == 0 && iocb->ki_pos < inode->i_size && if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
ret < len) { ret < len) {
dout("sync_read hit hole, ppos %lld < size %lld" dout("sync_read hit hole, ppos %lld < size %lld"
", reading more\n", iocb->ki_pos, ", reading more\n", iocb->ki_pos,
...@@ -859,7 +906,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -859,7 +906,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
read += ret; read += ret;
len -= ret; len -= ret;
checkeof = 0; retry_op = 0;
goto again; goto again;
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment