Commit 508b32d8 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: request xattrs if xattr_version is zero

Following sequence of events can happen.
  - Client releases an inode, queues cap release message.
  - A 'lookup' reply brings the same inode back, but the reply
    doesn't contain xattrs because MDS didn't receive the cap release
    message and thought client already has up-to-data xattrs.

The fix is force sending a getattr request to MDS if xattrs_version
is 0. The getattr mask is set to CEPH_STAT_CAP_XATTR, so MDS knows client
does not have xattr.
Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent b76f8239
...@@ -826,8 +826,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) ...@@ -826,8 +826,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
ceph_put_cap_refs(ci, got); ceph_put_cap_refs(ci, got);
if (checkeof && ret >= 0) { if (checkeof && ret >= 0) {
int statret = ceph_do_getattr(inode, int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
CEPH_STAT_CAP_SIZE);
/* hit EOF or hole? */ /* hit EOF or hole? */
if (statret == 0 && iocb->ki_pos < inode->i_size && if (statret == 0 && iocb->ki_pos < inode->i_size &&
...@@ -995,7 +994,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) ...@@ -995,7 +994,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
mutex_lock(&inode->i_mutex); mutex_lock(&inode->i_mutex);
if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) { if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
if (ret < 0) { if (ret < 0) {
offset = ret; offset = ret;
goto out; goto out;
......
...@@ -766,7 +766,7 @@ static int fill_inode(struct inode *inode, ...@@ -766,7 +766,7 @@ static int fill_inode(struct inode *inode,
/* xattrs */ /* xattrs */
/* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */ /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) &&
le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) { le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) {
if (ci->i_xattrs.blob) if (ci->i_xattrs.blob)
ceph_buffer_put(ci->i_xattrs.blob); ceph_buffer_put(ci->i_xattrs.blob);
...@@ -1907,7 +1907,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) ...@@ -1907,7 +1907,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
* Verify that we have a lease on the given mask. If not, * Verify that we have a lease on the given mask. If not,
* do a getattr against an mds. * do a getattr against an mds.
*/ */
int ceph_do_getattr(struct inode *inode, int mask) int ceph_do_getattr(struct inode *inode, int mask, bool force)
{ {
struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_client *mdsc = fsc->mdsc;
...@@ -1920,7 +1920,7 @@ int ceph_do_getattr(struct inode *inode, int mask) ...@@ -1920,7 +1920,7 @@ int ceph_do_getattr(struct inode *inode, int mask)
} }
dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode); dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1)) if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
return 0; return 0;
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS); req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
...@@ -1948,7 +1948,7 @@ int ceph_permission(struct inode *inode, int mask) ...@@ -1948,7 +1948,7 @@ int ceph_permission(struct inode *inode, int mask)
if (mask & MAY_NOT_BLOCK) if (mask & MAY_NOT_BLOCK)
return -ECHILD; return -ECHILD;
err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED); err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED, false);
if (!err) if (!err)
err = generic_permission(inode, mask); err = generic_permission(inode, mask);
...@@ -1966,7 +1966,7 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, ...@@ -1966,7 +1966,7 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
int err; int err;
err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL); err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL, false);
if (!err) { if (!err) {
generic_fillattr(inode, stat); generic_fillattr(inode, stat);
stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino); stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
......
...@@ -19,7 +19,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg) ...@@ -19,7 +19,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg)
struct ceph_ioctl_layout l; struct ceph_ioctl_layout l;
int err; int err;
err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT); err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT, false);
if (!err) { if (!err) {
l.stripe_unit = ceph_file_layout_su(ci->i_layout); l.stripe_unit = ceph_file_layout_su(ci->i_layout);
l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout); l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
...@@ -74,7 +74,7 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) ...@@ -74,7 +74,7 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
return -EFAULT; return -EFAULT;
/* validate changed params against current layout */ /* validate changed params against current layout */
err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT); err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT, false);
if (err) if (err)
return err; return err;
......
...@@ -714,7 +714,7 @@ extern void ceph_queue_vmtruncate(struct inode *inode); ...@@ -714,7 +714,7 @@ extern void ceph_queue_vmtruncate(struct inode *inode);
extern void ceph_queue_invalidate(struct inode *inode); extern void ceph_queue_invalidate(struct inode *inode);
extern void ceph_queue_writeback(struct inode *inode); extern void ceph_queue_writeback(struct inode *inode);
extern int ceph_do_getattr(struct inode *inode, int mask); extern int ceph_do_getattr(struct inode *inode, int mask, bool force);
extern int ceph_permission(struct inode *inode, int mask); extern int ceph_permission(struct inode *inode, int mask);
extern int ceph_setattr(struct dentry *dentry, struct iattr *attr); extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
......
...@@ -736,24 +736,20 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, ...@@ -736,24 +736,20 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
dout("getxattr %p ver=%lld index_ver=%lld\n", inode, dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
ci->i_xattrs.version, ci->i_xattrs.index_version); ci->i_xattrs.version, ci->i_xattrs.index_version);
if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) && if (ci->i_xattrs.version == 0 ||
(ci->i_xattrs.index_version >= ci->i_xattrs.version)) { !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
goto get_xattr;
} else {
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
/* get xattrs from mds (if we don't already have them) */ /* get xattrs from mds (if we don't already have them) */
err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR); err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
if (err) if (err)
return err; return err;
}
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
}
err = __build_xattrs(inode); err = __build_xattrs(inode);
if (err < 0) if (err < 0)
goto out; goto out;
get_xattr:
err = -ENODATA; /* == ENOATTR */ err = -ENODATA; /* == ENOATTR */
xattr = __get_xattr(ci, name); xattr = __get_xattr(ci, name);
if (!xattr) if (!xattr)
...@@ -798,23 +794,18 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) ...@@ -798,23 +794,18 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
dout("listxattr %p ver=%lld index_ver=%lld\n", inode, dout("listxattr %p ver=%lld index_ver=%lld\n", inode,
ci->i_xattrs.version, ci->i_xattrs.index_version); ci->i_xattrs.version, ci->i_xattrs.index_version);
if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) && if (ci->i_xattrs.version == 0 ||
(ci->i_xattrs.index_version >= ci->i_xattrs.version)) { !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
goto list_xattr;
} else {
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR); err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
if (err) if (err)
return err; return err;
}
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
}
err = __build_xattrs(inode); err = __build_xattrs(inode);
if (err < 0) if (err < 0)
goto out; goto out;
list_xattr:
/* /*
* Start with virtual dir xattr names (if any) (including * Start with virtual dir xattr names (if any) (including
* terminating '\0' characters for each). * terminating '\0' characters for each).
...@@ -968,7 +959,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, ...@@ -968,7 +959,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
retry: retry:
issued = __ceph_caps_issued(ci, NULL); issued = __ceph_caps_issued(ci, NULL);
dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued)); dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
if (!(issued & CEPH_CAP_XATTR_EXCL)) if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
goto do_sync; goto do_sync;
__build_xattrs(inode); __build_xattrs(inode);
...@@ -1077,7 +1068,7 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) ...@@ -1077,7 +1068,7 @@ int __ceph_removexattr(struct dentry *dentry, const char *name)
issued = __ceph_caps_issued(ci, NULL); issued = __ceph_caps_issued(ci, NULL);
dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued)); dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
if (!(issued & CEPH_CAP_XATTR_EXCL)) if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
goto do_sync; goto do_sync;
__build_xattrs(inode); __build_xattrs(inode);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment