Commit 77310320 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: renew caps for read/write if mds session got killed.

When mds session gets killed, read/write operation may hang.
Client waits for Frw caps, but mds does not know what caps client
wants. To recover this, client sends an open request to mds. The
request will tell mds what caps client wants.
Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent d463a43d
...@@ -2317,7 +2317,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, ...@@ -2317,7 +2317,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
/* make sure file is actually open */ /* make sure file is actually open */
file_wanted = __ceph_caps_file_wanted(ci); file_wanted = __ceph_caps_file_wanted(ci);
if ((file_wanted & need) == 0) { if ((file_wanted & need) != need) {
dout("try_get_cap_refs need %s file_wanted %s, EBADF\n", dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
ceph_cap_string(need), ceph_cap_string(file_wanted)); ceph_cap_string(need), ceph_cap_string(file_wanted));
*err = -EBADF; *err = -EBADF;
...@@ -2412,13 +2412,27 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, ...@@ -2412,13 +2412,27 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
goto out_unlock; goto out_unlock;
} }
if (!__ceph_is_any_caps(ci) && if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) {
ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { int mds_wanted;
if (ACCESS_ONCE(mdsc->fsc->mount_state) ==
CEPH_MOUNT_SHUTDOWN) {
dout("get_cap_refs %p forced umount\n", inode); dout("get_cap_refs %p forced umount\n", inode);
*err = -EIO; *err = -EIO;
ret = 1; ret = 1;
goto out_unlock; goto out_unlock;
} }
mds_wanted = __ceph_caps_mds_wanted(ci);
if ((mds_wanted & need) != need) {
dout("get_cap_refs %p caps were dropped"
" (session killed?)\n", inode);
*err = -ESTALE;
ret = 1;
goto out_unlock;
}
if ((mds_wanted & file_wanted) ==
(file_wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR)))
ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED;
}
dout("get_cap_refs %p have %s needed %s\n", inode, dout("get_cap_refs %p have %s needed %s\n", inode,
ceph_cap_string(have), ceph_cap_string(need)); ceph_cap_string(have), ceph_cap_string(need));
...@@ -2487,7 +2501,7 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, ...@@ -2487,7 +2501,7 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
if (err == -EAGAIN) if (err == -EAGAIN)
continue; continue;
if (err < 0) if (err < 0)
return err; ret = err;
} else { } else {
ret = wait_event_interruptible(ci->i_cap_wq, ret = wait_event_interruptible(ci->i_cap_wq,
try_get_cap_refs(ci, need, want, endoff, try_get_cap_refs(ci, need, want, endoff,
...@@ -2496,7 +2510,14 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, ...@@ -2496,7 +2510,14 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
continue; continue;
if (err < 0) if (err < 0)
ret = err; ret = err;
if (ret < 0) }
if (ret < 0) {
if (err == -ESTALE) {
/* session was killed, try renew caps */
ret = ceph_renew_caps(&ci->vfs_inode);
if (ret == 0)
continue;
}
return ret; return ret;
} }
...@@ -3226,6 +3247,8 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ...@@ -3226,6 +3247,8 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
if (target < 0) { if (target < 0) {
__ceph_remove_cap(cap, false); __ceph_remove_cap(cap, false);
if (!ci->i_auth_cap)
ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
goto out_unlock; goto out_unlock;
} }
......
...@@ -191,6 +191,59 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) ...@@ -191,6 +191,59 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
return ret; return ret;
} }
/*
* try renew caps after session gets killed.
*/
int ceph_renew_caps(struct inode *inode)
{
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_request *req;
int err, flags, wanted;
spin_lock(&ci->i_ceph_lock);
wanted = __ceph_caps_file_wanted(ci);
if (__ceph_is_any_real_caps(ci) &&
(!(wanted & CEPH_CAP_ANY_WR) == 0 || ci->i_auth_cap)) {
int issued = __ceph_caps_issued(ci, NULL);
spin_unlock(&ci->i_ceph_lock);
dout("renew caps %p want %s issued %s updating mds_wanted\n",
inode, ceph_cap_string(wanted), ceph_cap_string(issued));
ceph_check_caps(ci, 0, NULL);
return 0;
}
spin_unlock(&ci->i_ceph_lock);
flags = 0;
if ((wanted & CEPH_CAP_FILE_RD) && (wanted & CEPH_CAP_FILE_WR))
flags = O_RDWR;
else if (wanted & CEPH_CAP_FILE_RD)
flags = O_RDONLY;
else if (wanted & CEPH_CAP_FILE_WR)
flags = O_WRONLY;
#ifdef O_LAZY
if (wanted & CEPH_CAP_FILE_LAZYIO)
flags |= O_LAZY;
#endif
req = prepare_open_request(inode->i_sb, flags, 0);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
}
req->r_inode = inode;
ihold(inode);
req->r_num_caps = 1;
req->r_fmode = -1;
err = ceph_mdsc_do_request(mdsc, NULL, req);
ceph_mdsc_put_request(req);
out:
dout("renew caps %p open result=%d\n", inode, err);
return err < 0 ? err : 0;
}
/* /*
* If we already have the requisite capabilities, we can satisfy * If we already have the requisite capabilities, we can satisfy
* the open request locally (no need to request new caps from the * the open request locally (no need to request new caps from the
......
...@@ -1133,6 +1133,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -1133,6 +1133,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
struct ceph_mds_client *mdsc = struct ceph_mds_client *mdsc =
ceph_sb_to_client(inode->i_sb)->mdsc; ceph_sb_to_client(inode->i_sb)->mdsc;
ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
while (true) { while (true) {
struct rb_node *n = rb_first(&ci->i_cap_flush_tree); struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
if (!n) if (!n)
...@@ -1181,7 +1183,9 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -1181,7 +1183,9 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
list_del(&cf->list); list_del(&cf->list);
ceph_free_cap_flush(cf); ceph_free_cap_flush(cf);
} }
while (drop--)
wake_up_all(&ci->i_cap_wq);
if (drop)
iput(inode); iput(inode);
return 0; return 0;
} }
......
...@@ -470,6 +470,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, ...@@ -470,6 +470,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */ #define CEPH_I_POOL_RD (1 << 5) /* can read from pool */
#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */ #define CEPH_I_POOL_WR (1 << 6) /* can write to pool */
#define CEPH_I_SEC_INITED (1 << 7) /* security initialized */ #define CEPH_I_SEC_INITED (1 << 7) /* security initialized */
#define CEPH_I_CAP_DROPPED (1 << 8) /* caps were forcibly dropped */
static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
long long release_count, long long release_count,
...@@ -932,6 +933,7 @@ extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc); ...@@ -932,6 +933,7 @@ extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
/* file.c */ /* file.c */
extern const struct file_operations ceph_file_fops; extern const struct file_operations ceph_file_fops;
extern int ceph_renew_caps(struct inode *inode);
extern int ceph_open(struct inode *inode, struct file *file); extern int ceph_open(struct inode *inode, struct file *file);
extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry, extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
struct file *file, unsigned flags, umode_t mode, struct file *file, unsigned flags, umode_t mode,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment