Commit 3738daa6 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: fetch inline data when getting Fcr cap refs

we can't use getattr to fetch inline data after getting Fcr caps,
because it can cause deadlock. The solution is try bringing inline
data to page cache when not holding any cap, and hope the inline
data page is still there after getting the Fcr caps. If the page
is still there, pin it in page cache for later IO.
Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent 01deead0
...@@ -1207,6 +1207,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) ...@@ -1207,6 +1207,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
struct inode *inode = file_inode(vma->vm_file); struct inode *inode = file_inode(vma->vm_file);
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *fi = vma->vm_file->private_data; struct ceph_file_info *fi = vma->vm_file->private_data;
struct page *pinned_page = NULL;
loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT; loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
int want, got, ret; int want, got, ret;
...@@ -1218,7 +1219,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) ...@@ -1218,7 +1219,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
want = CEPH_CAP_FILE_CACHE; want = CEPH_CAP_FILE_CACHE;
while (1) { while (1) {
got = 0; got = 0;
ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1,
&got, &pinned_page);
if (ret == 0) if (ret == 0)
break; break;
if (ret != -ERESTARTSYS) { if (ret != -ERESTARTSYS) {
...@@ -1233,6 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) ...@@ -1233,6 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
if (pinned_page)
page_cache_release(pinned_page);
ceph_put_cap_refs(ci, got); ceph_put_cap_refs(ci, got);
return ret; return ret;
...@@ -1266,7 +1270,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ...@@ -1266,7 +1270,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
want = CEPH_CAP_FILE_BUFFER; want = CEPH_CAP_FILE_BUFFER;
while (1) { while (1) {
got = 0; got = 0;
ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len); ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
&got, NULL);
if (ret == 0) if (ret == 0)
break; break;
if (ret != -ERESTARTSYS) { if (ret != -ERESTARTSYS) {
......
...@@ -2057,15 +2057,17 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got) ...@@ -2057,15 +2057,17 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
* requested from the MDS. * requested from the MDS.
*/ */
static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
int *got, loff_t endoff, int *check_max, int *err) loff_t endoff, int *got, struct page **pinned_page,
int *check_max, int *err)
{ {
struct inode *inode = &ci->vfs_inode; struct inode *inode = &ci->vfs_inode;
int ret = 0; int ret = 0;
int have, implemented; int have, implemented, _got = 0;
int file_wanted; int file_wanted;
dout("get_cap_refs %p need %s want %s\n", inode, dout("get_cap_refs %p need %s want %s\n", inode,
ceph_cap_string(need), ceph_cap_string(want)); ceph_cap_string(need), ceph_cap_string(want));
again:
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
/* make sure file is actually open */ /* make sure file is actually open */
...@@ -2075,7 +2077,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, ...@@ -2075,7 +2077,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
ceph_cap_string(need), ceph_cap_string(file_wanted)); ceph_cap_string(need), ceph_cap_string(file_wanted));
*err = -EBADF; *err = -EBADF;
ret = 1; ret = 1;
goto out; goto out_unlock;
} }
/* finish pending truncate */ /* finish pending truncate */
...@@ -2095,7 +2097,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, ...@@ -2095,7 +2097,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
*check_max = 1; *check_max = 1;
ret = 1; ret = 1;
} }
goto out; goto out_unlock;
} }
/* /*
* If a sync write is in progress, we must wait, so that we * If a sync write is in progress, we must wait, so that we
...@@ -2103,7 +2105,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, ...@@ -2103,7 +2105,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
*/ */
if (__ceph_have_pending_cap_snap(ci)) { if (__ceph_have_pending_cap_snap(ci)) {
dout("get_cap_refs %p cap_snap_pending\n", inode); dout("get_cap_refs %p cap_snap_pending\n", inode);
goto out; goto out_unlock;
} }
} }
...@@ -2120,18 +2122,50 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, ...@@ -2120,18 +2122,50 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
inode, ceph_cap_string(have), ceph_cap_string(not), inode, ceph_cap_string(have), ceph_cap_string(not),
ceph_cap_string(revoking)); ceph_cap_string(revoking));
if ((revoking & not) == 0) { if ((revoking & not) == 0) {
*got = need | (have & want); _got = need | (have & want);
__take_cap_refs(ci, *got); __take_cap_refs(ci, _got);
ret = 1; ret = 1;
} }
} else { } else {
dout("get_cap_refs %p have %s needed %s\n", inode, dout("get_cap_refs %p have %s needed %s\n", inode,
ceph_cap_string(have), ceph_cap_string(need)); ceph_cap_string(have), ceph_cap_string(need));
} }
out: out_unlock:
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
if (ci->i_inline_version != CEPH_INLINE_NONE &&
(_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
i_size_read(inode) > 0) {
int ret1;
struct page *page = find_get_page(inode->i_mapping, 0);
if (page) {
if (PageUptodate(page)) {
*pinned_page = page;
goto out;
}
page_cache_release(page);
}
/*
* drop cap refs first because getattr while holding
* caps refs can cause deadlock.
*/
ceph_put_cap_refs(ci, _got);
_got = 0;
/* getattr request will bring inline data into page cache */
ret1 = __ceph_do_getattr(inode, NULL,
CEPH_STAT_CAP_INLINE_DATA, true);
if (ret1 >= 0) {
ret = 0;
goto again;
}
*err = ret1;
ret = 1;
}
out:
dout("get_cap_refs %p ret %d got %s\n", inode, dout("get_cap_refs %p ret %d got %s\n", inode,
ret, ceph_cap_string(*got)); ret, ceph_cap_string(_got));
*got = _got;
return ret; return ret;
} }
...@@ -2168,8 +2202,8 @@ static void check_max_size(struct inode *inode, loff_t endoff) ...@@ -2168,8 +2202,8 @@ static void check_max_size(struct inode *inode, loff_t endoff)
* due to a small max_size, make sure we check_max_size (and possibly * due to a small max_size, make sure we check_max_size (and possibly
* ask the mds) so we don't get hung up indefinitely. * ask the mds) so we don't get hung up indefinitely.
*/ */
int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got, int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff) loff_t endoff, int *got, struct page **pinned_page)
{ {
int check_max, ret, err; int check_max, ret, err;
...@@ -2179,8 +2213,8 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got, ...@@ -2179,8 +2213,8 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got,
check_max = 0; check_max = 0;
err = 0; err = 0;
ret = wait_event_interruptible(ci->i_cap_wq, ret = wait_event_interruptible(ci->i_cap_wq,
try_get_cap_refs(ci, need, want, try_get_cap_refs(ci, need, want, endoff,
got, endoff, got, pinned_page,
&check_max, &err)); &check_max, &err));
if (err) if (err)
ret = err; ret = err;
......
...@@ -805,6 +805,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -805,6 +805,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
size_t len = iocb->ki_nbytes; size_t len = iocb->ki_nbytes;
struct inode *inode = file_inode(filp); struct inode *inode = file_inode(filp);
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct page *pinned_page = NULL;
ssize_t ret; ssize_t ret;
int want, got = 0; int want, got = 0;
int checkeof = 0, read = 0; int checkeof = 0, read = 0;
...@@ -817,7 +818,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -817,7 +818,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
else else
want = CEPH_CAP_FILE_CACHE; want = CEPH_CAP_FILE_CACHE;
ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
if (ret < 0) if (ret < 0)
return ret; return ret;
...@@ -840,6 +841,10 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -840,6 +841,10 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
} }
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
if (pinned_page) {
page_cache_release(pinned_page);
pinned_page = NULL;
}
ceph_put_cap_refs(ci, got); ceph_put_cap_refs(ci, got);
if (checkeof && ret >= 0) { if (checkeof && ret >= 0) {
...@@ -924,7 +929,8 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) ...@@ -924,7 +929,8 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
else else
want = CEPH_CAP_FILE_BUFFER; want = CEPH_CAP_FILE_BUFFER;
got = 0; got = 0;
err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count); err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
&got, NULL);
if (err < 0) if (err < 0)
goto out; goto out;
...@@ -1225,7 +1231,7 @@ static long ceph_fallocate(struct file *file, int mode, ...@@ -1225,7 +1231,7 @@ static long ceph_fallocate(struct file *file, int mode,
else else
want = CEPH_CAP_FILE_BUFFER; want = CEPH_CAP_FILE_BUFFER;
ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
if (ret < 0) if (ret < 0)
goto unlock; goto unlock;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment