Commit 2b1ac852 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: try getting buffer capability for readahead/fadvise

For readahead/fadvise cases, caller of ceph_readpages does not
hold buffer capability. Pages can be added to page cache while
there is no buffer capability. This can cause data integrity
issue.
Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent 5c341ee3
...@@ -315,7 +315,32 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) ...@@ -315,7 +315,32 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
struct page **pages; struct page **pages;
pgoff_t next_index; pgoff_t next_index;
int nr_pages = 0; int nr_pages = 0;
int ret; int got = 0;
int ret = 0;
if (!current->journal_info) {
/* caller of readpages does not hold buffer and read caps
* (fadvise, madvise and readahead cases) */
int want = CEPH_CAP_FILE_CACHE;
ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got);
if (ret < 0) {
dout("start_read %p, error getting cap\n", inode);
} else if (!(got & want)) {
dout("start_read %p, no cache cap\n", inode);
ret = 0;
}
if (ret <= 0) {
if (got)
ceph_put_cap_refs(ci, got);
while (!list_empty(page_list)) {
page = list_entry(page_list->prev,
struct page, lru);
list_del(&page->lru);
put_page(page);
}
return ret;
}
}
off = (u64) page_offset(page); off = (u64) page_offset(page);
...@@ -338,15 +363,18 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) ...@@ -338,15 +363,18 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
CEPH_OSD_FLAG_READ, NULL, CEPH_OSD_FLAG_READ, NULL,
ci->i_truncate_seq, ci->i_truncate_size, ci->i_truncate_seq, ci->i_truncate_size,
false); false);
if (IS_ERR(req)) if (IS_ERR(req)) {
return PTR_ERR(req); ret = PTR_ERR(req);
goto out;
}
/* build page vector */ /* build page vector */
nr_pages = calc_pages_for(0, len); nr_pages = calc_pages_for(0, len);
pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL); pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
if (!pages) {
ret = -ENOMEM; ret = -ENOMEM;
if (!pages) goto out_put;
goto out; }
for (i = 0; i < nr_pages; ++i) { for (i = 0; i < nr_pages; ++i) {
page = list_entry(page_list->prev, struct page, lru); page = list_entry(page_list->prev, struct page, lru);
BUG_ON(PageLocked(page)); BUG_ON(PageLocked(page));
...@@ -378,6 +406,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) ...@@ -378,6 +406,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
if (ret < 0) if (ret < 0)
goto out_pages; goto out_pages;
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
/* After adding locked pages to page cache, the inode holds cache cap.
* So we can drop our cap refs. */
if (got)
ceph_put_cap_refs(ci, got);
return nr_pages; return nr_pages;
out_pages: out_pages:
...@@ -386,8 +420,11 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) ...@@ -386,8 +420,11 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
unlock_page(pages[i]); unlock_page(pages[i]);
} }
ceph_put_page_vector(pages, nr_pages, false); ceph_put_page_vector(pages, nr_pages, false);
out: out_put:
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
out:
if (got)
ceph_put_cap_refs(ci, got);
return ret; return ret;
} }
...@@ -424,7 +461,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, ...@@ -424,7 +461,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
rc = start_read(inode, page_list, max); rc = start_read(inode, page_list, max);
if (rc < 0) if (rc < 0)
goto out; goto out;
BUG_ON(rc == 0);
} }
out: out:
ceph_fscache_readpages_cancel(inode, page_list); ceph_fscache_readpages_cancel(inode, page_list);
...@@ -1371,9 +1407,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) ...@@ -1371,9 +1407,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got)); inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));
if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) || if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
ci->i_inline_version == CEPH_INLINE_NONE) ci->i_inline_version == CEPH_INLINE_NONE) {
current->journal_info = vma->vm_file;
ret = filemap_fault(vma, vmf); ret = filemap_fault(vma, vmf);
else current->journal_info = NULL;
} else
ret = -EAGAIN; ret = -EAGAIN;
dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
......
...@@ -2479,6 +2479,27 @@ static void check_max_size(struct inode *inode, loff_t endoff) ...@@ -2479,6 +2479,27 @@ static void check_max_size(struct inode *inode, loff_t endoff)
ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
} }
int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
{
int ret, err = 0;
BUG_ON(need & ~CEPH_CAP_FILE_RD);
BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
ret = ceph_pool_perm_check(ci, need);
if (ret < 0)
return ret;
ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
if (ret) {
if (err == -EAGAIN) {
ret = 0;
} else if (err < 0) {
ret = err;
}
}
return ret;
}
/* /*
* Wait for caps, and take cap references. If we can't get a WR cap * Wait for caps, and take cap references. If we can't get a WR cap
* due to a small max_size, make sure we check_max_size (and possibly * due to a small max_size, make sure we check_max_size (and possibly
......
...@@ -1249,8 +1249,9 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -1249,8 +1249,9 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
ceph_cap_string(got)); ceph_cap_string(got));
current->journal_info = filp;
ret = generic_file_read_iter(iocb, to); ret = generic_file_read_iter(iocb, to);
current->journal_info = NULL;
} }
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
......
...@@ -905,6 +905,8 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn, ...@@ -905,6 +905,8 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page); loff_t endoff, int *got, struct page **pinned_page);
extern int ceph_try_get_caps(struct ceph_inode_info *ci,
int need, int want, int *got);
/* for counting open files by mode */ /* for counting open files by mode */
extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode); extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment