Commit a81bc310 authored by Jeff Layton's avatar Jeff Layton Committed by Ilya Dryomov

ceph: take the inode lock before acquiring cap refs

Most of the time, we (or the vfs layer) takes the inode_lock and then
acquires caps, but ceph_read_iter does the opposite, and that can lead
to a deadlock.

When there are multiple clients treading over the same data, we can end
up in a situation where a reader takes caps and then tries to acquire
the inode_lock. Another task holds the inode_lock and issues a request
to the MDS which needs to revoke the caps, but that can't happen until
the inode_lock is unwedged.

Fix this by having ceph_read_iter take the inode_lock earlier, before
attempting to acquire caps.

Fixes: 321fe13c ("ceph: add buffered/direct exclusionary locking for reads and writes")
Link: https://tracker.ceph.com/issues/36348Signed-off-by: default avatarJeff Layton <jlayton@kernel.org>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 31f4f5b4
...@@ -1264,14 +1264,24 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -1264,14 +1264,24 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode); inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
if (iocb->ki_flags & IOCB_DIRECT)
ceph_start_io_direct(inode);
else
ceph_start_io_read(inode);
if (fi->fmode & CEPH_FILE_MODE_LAZY) if (fi->fmode & CEPH_FILE_MODE_LAZY)
want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
else else
want = CEPH_CAP_FILE_CACHE; want = CEPH_CAP_FILE_CACHE;
ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1, ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1,
&got, &pinned_page); &got, &pinned_page);
if (ret < 0) if (ret < 0) {
if (iocb->ki_flags & IOCB_DIRECT)
ceph_end_io_direct(inode);
else
ceph_end_io_read(inode);
return ret; return ret;
}
if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
(iocb->ki_flags & IOCB_DIRECT) || (iocb->ki_flags & IOCB_DIRECT) ||
...@@ -1283,16 +1293,12 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -1283,16 +1293,12 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
if (ci->i_inline_version == CEPH_INLINE_NONE) { if (ci->i_inline_version == CEPH_INLINE_NONE) {
if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) { if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
ceph_start_io_direct(inode);
ret = ceph_direct_read_write(iocb, to, ret = ceph_direct_read_write(iocb, to,
NULL, NULL); NULL, NULL);
ceph_end_io_direct(inode);
if (ret >= 0 && ret < len) if (ret >= 0 && ret < len)
retry_op = CHECK_EOF; retry_op = CHECK_EOF;
} else { } else {
ceph_start_io_read(inode);
ret = ceph_sync_read(iocb, to, &retry_op); ret = ceph_sync_read(iocb, to, &retry_op);
ceph_end_io_read(inode);
} }
} else { } else {
retry_op = READ_INLINE; retry_op = READ_INLINE;
...@@ -1303,11 +1309,10 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -1303,11 +1309,10 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
ceph_cap_string(got)); ceph_cap_string(got));
ceph_add_rw_context(fi, &rw_ctx); ceph_add_rw_context(fi, &rw_ctx);
ceph_start_io_read(inode);
ret = generic_file_read_iter(iocb, to); ret = generic_file_read_iter(iocb, to);
ceph_end_io_read(inode);
ceph_del_rw_context(fi, &rw_ctx); ceph_del_rw_context(fi, &rw_ctx);
} }
dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
if (pinned_page) { if (pinned_page) {
...@@ -1315,6 +1320,12 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -1315,6 +1320,12 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
pinned_page = NULL; pinned_page = NULL;
} }
ceph_put_cap_refs(ci, got); ceph_put_cap_refs(ci, got);
if (iocb->ki_flags & IOCB_DIRECT)
ceph_end_io_direct(inode);
else
ceph_end_io_read(inode);
if (retry_op > HAVE_RETRIED && ret >= 0) { if (retry_op > HAVE_RETRIED && ret >= 0) {
int statret; int statret;
struct page *page = NULL; struct page *page = NULL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment