Commit 604d1b02 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: take snap_rwsem when accessing snap realm's cached_context

When ceph inode's i_head_snapc is NULL, __ceph_mark_dirty_caps()
accesses snap realm's cached_context. So we need take read lock
of snap_rwsem.
Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent 86056090
...@@ -1413,9 +1413,11 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ...@@ -1413,9 +1413,11 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
ceph_cap_string(was | mask)); ceph_cap_string(was | mask));
ci->i_dirty_caps |= mask; ci->i_dirty_caps |= mask;
if (was == 0) { if (was == 0) {
if (!ci->i_head_snapc) if (!ci->i_head_snapc) {
WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
ci->i_head_snapc = ceph_get_snap_context( ci->i_head_snapc = ceph_get_snap_context(
ci->i_snap_realm->cached_context); ci->i_snap_realm->cached_context);
}
dout(" inode %p now dirty snapc %p auth cap %p\n", dout(" inode %p now dirty snapc %p auth cap %p\n",
&ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
BUG_ON(!list_empty(&ci->i_dirty_item)); BUG_ON(!list_empty(&ci->i_dirty_item));
......
...@@ -1727,6 +1727,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) ...@@ -1727,6 +1727,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
int mask = 0; int mask = 0;
int err = 0; int err = 0;
int inode_dirty_flags = 0; int inode_dirty_flags = 0;
bool lock_snap_rwsem = false;
if (ceph_snap(inode) != CEPH_NOSNAP) if (ceph_snap(inode) != CEPH_NOSNAP)
return -EROFS; return -EROFS;
...@@ -1742,6 +1743,18 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) ...@@ -1742,6 +1743,18 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
issued = __ceph_caps_issued(ci, NULL); issued = __ceph_caps_issued(ci, NULL);
if (!ci->i_head_snapc &&
(issued & (CEPH_CAP_ANY_EXCL | CEPH_CAP_FILE_WR))) {
lock_snap_rwsem = true;
if (!down_read_trylock(&mdsc->snap_rwsem)) {
spin_unlock(&ci->i_ceph_lock);
down_read(&mdsc->snap_rwsem);
spin_lock(&ci->i_ceph_lock);
issued = __ceph_caps_issued(ci, NULL);
}
}
dout("setattr %p issued %s\n", inode, ceph_cap_string(issued)); dout("setattr %p issued %s\n", inode, ceph_cap_string(issued));
if (ia_valid & ATTR_UID) { if (ia_valid & ATTR_UID) {
...@@ -1890,6 +1903,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) ...@@ -1890,6 +1903,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
release &= issued; release &= issued;
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
if (lock_snap_rwsem)
up_read(&mdsc->snap_rwsem);
if (inode_dirty_flags) if (inode_dirty_flags)
__mark_inode_dirty(inode, inode_dirty_flags); __mark_inode_dirty(inode, inode_dirty_flags);
......
...@@ -911,6 +911,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, ...@@ -911,6 +911,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
struct inode *inode = d_inode(dentry); struct inode *inode = d_inode(dentry);
struct ceph_vxattr *vxattr; struct ceph_vxattr *vxattr;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
int issued; int issued;
int err; int err;
int dirty = 0; int dirty = 0;
...@@ -920,6 +921,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, ...@@ -920,6 +921,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
char *newval = NULL; char *newval = NULL;
struct ceph_inode_xattr *xattr = NULL; struct ceph_inode_xattr *xattr = NULL;
int required_blob_size; int required_blob_size;
bool lock_snap_rwsem = false;
if (!ceph_is_valid_xattr(name)) if (!ceph_is_valid_xattr(name))
return -EOPNOTSUPP; return -EOPNOTSUPP;
...@@ -951,9 +953,20 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, ...@@ -951,9 +953,20 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
retry: retry:
issued = __ceph_caps_issued(ci, NULL); issued = __ceph_caps_issued(ci, NULL);
dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
goto do_sync; goto do_sync;
if (!lock_snap_rwsem && !ci->i_head_snapc) {
lock_snap_rwsem = true;
if (!down_read_trylock(&mdsc->snap_rwsem)) {
spin_unlock(&ci->i_ceph_lock);
down_read(&mdsc->snap_rwsem);
spin_lock(&ci->i_ceph_lock);
goto retry;
}
}
dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
__build_xattrs(inode); __build_xattrs(inode);
required_blob_size = __get_required_blob_size(ci, name_len, val_len); required_blob_size = __get_required_blob_size(ci, name_len, val_len);
...@@ -966,7 +979,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, ...@@ -966,7 +979,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
dout(" preaallocating new blob size=%d\n", required_blob_size); dout(" preaallocating new blob size=%d\n", required_blob_size);
blob = ceph_buffer_new(required_blob_size, GFP_NOFS); blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
if (!blob) if (!blob)
goto out; goto do_sync_unlocked;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
if (ci->i_xattrs.prealloc_blob) if (ci->i_xattrs.prealloc_blob)
ceph_buffer_put(ci->i_xattrs.prealloc_blob); ceph_buffer_put(ci->i_xattrs.prealloc_blob);
...@@ -984,6 +997,8 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, ...@@ -984,6 +997,8 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
} }
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
if (lock_snap_rwsem)
up_read(&mdsc->snap_rwsem);
if (dirty) if (dirty)
__mark_inode_dirty(inode, dirty); __mark_inode_dirty(inode, dirty);
return err; return err;
...@@ -991,6 +1006,8 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, ...@@ -991,6 +1006,8 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
do_sync: do_sync:
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
do_sync_unlocked: do_sync_unlocked:
if (lock_snap_rwsem)
up_read(&mdsc->snap_rwsem);
err = ceph_sync_setxattr(dentry, name, value, size, flags); err = ceph_sync_setxattr(dentry, name, value, size, flags);
out: out:
kfree(newname); kfree(newname);
...@@ -1044,10 +1061,12 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) ...@@ -1044,10 +1061,12 @@ int __ceph_removexattr(struct dentry *dentry, const char *name)
struct inode *inode = d_inode(dentry); struct inode *inode = d_inode(dentry);
struct ceph_vxattr *vxattr; struct ceph_vxattr *vxattr;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
int issued; int issued;
int err; int err;
int required_blob_size; int required_blob_size;
int dirty; int dirty;
bool lock_snap_rwsem = false;
if (!ceph_is_valid_xattr(name)) if (!ceph_is_valid_xattr(name))
return -EOPNOTSUPP; return -EOPNOTSUPP;
...@@ -1064,10 +1083,21 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) ...@@ -1064,10 +1083,21 @@ int __ceph_removexattr(struct dentry *dentry, const char *name)
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
retry: retry:
issued = __ceph_caps_issued(ci, NULL); issued = __ceph_caps_issued(ci, NULL);
dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
goto do_sync; goto do_sync;
if (!lock_snap_rwsem && !ci->i_head_snapc) {
lock_snap_rwsem = true;
if (!down_read_trylock(&mdsc->snap_rwsem)) {
spin_unlock(&ci->i_ceph_lock);
down_read(&mdsc->snap_rwsem);
spin_lock(&ci->i_ceph_lock);
goto retry;
}
}
dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
__build_xattrs(inode); __build_xattrs(inode);
required_blob_size = __get_required_blob_size(ci, 0, 0); required_blob_size = __get_required_blob_size(ci, 0, 0);
...@@ -1080,7 +1110,7 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) ...@@ -1080,7 +1110,7 @@ int __ceph_removexattr(struct dentry *dentry, const char *name)
dout(" preaallocating new blob size=%d\n", required_blob_size); dout(" preaallocating new blob size=%d\n", required_blob_size);
blob = ceph_buffer_new(required_blob_size, GFP_NOFS); blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
if (!blob) if (!blob)
goto out; goto do_sync_unlocked;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
if (ci->i_xattrs.prealloc_blob) if (ci->i_xattrs.prealloc_blob)
ceph_buffer_put(ci->i_xattrs.prealloc_blob); ceph_buffer_put(ci->i_xattrs.prealloc_blob);
...@@ -1094,14 +1124,17 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) ...@@ -1094,14 +1124,17 @@ int __ceph_removexattr(struct dentry *dentry, const char *name)
ci->i_xattrs.dirty = true; ci->i_xattrs.dirty = true;
inode->i_ctime = CURRENT_TIME; inode->i_ctime = CURRENT_TIME;
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
if (lock_snap_rwsem)
up_read(&mdsc->snap_rwsem);
if (dirty) if (dirty)
__mark_inode_dirty(inode, dirty); __mark_inode_dirty(inode, dirty);
return err; return err;
do_sync: do_sync:
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
do_sync_unlocked: do_sync_unlocked:
if (lock_snap_rwsem)
up_read(&mdsc->snap_rwsem);
err = ceph_send_removexattr(dentry, name); err = ceph_send_removexattr(dentry, name);
out:
return err; return err;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment