Commit dbc347ef authored by Xiubo Li's avatar Xiubo Li Committed by Ilya Dryomov

ceph: add ceph_cap_unlink_work to fire check_caps() immediately

When unlinking a file the check caps could be delayed for more than
5 seconds, but in MDS side it maybe waiting for the clients to
release caps.

This will use the cap_wq work queue and a dedicated list to help
fire the check_caps() and dirty buffer flushing immediately.

Link: https://tracker.ceph.com/issues/50223Signed-off-by: default avatarXiubo Li <xiubli@redhat.com>
Reviewed-by: default avatarMilind Changire <mchangir@redhat.com>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 902d6d01
...@@ -4772,7 +4772,22 @@ int ceph_drop_caps_for_unlink(struct inode *inode) ...@@ -4772,7 +4772,22 @@ int ceph_drop_caps_for_unlink(struct inode *inode)
if (__ceph_caps_dirty(ci)) { if (__ceph_caps_dirty(ci)) {
struct ceph_mds_client *mdsc = struct ceph_mds_client *mdsc =
ceph_inode_to_fs_client(inode)->mdsc; ceph_inode_to_fs_client(inode)->mdsc;
__cap_delay_requeue_front(mdsc, ci);
doutc(mdsc->fsc->client, "%p %llx.%llx\n", inode,
ceph_vinop(inode));
spin_lock(&mdsc->cap_unlink_delay_lock);
ci->i_ceph_flags |= CEPH_I_FLUSH;
if (!list_empty(&ci->i_cap_delay_list))
list_del_init(&ci->i_cap_delay_list);
list_add_tail(&ci->i_cap_delay_list,
&mdsc->cap_unlink_delay_list);
spin_unlock(&mdsc->cap_unlink_delay_lock);
/*
* Fire the work immediately, because the MDS maybe
* waiting for caps release.
*/
ceph_queue_cap_unlink_work(mdsc);
} }
} }
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
......
...@@ -2484,6 +2484,50 @@ void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) ...@@ -2484,6 +2484,50 @@ void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
} }
} }
void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc)
{
struct ceph_client *cl = mdsc->fsc->client;
if (mdsc->stopping)
return;
if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) {
doutc(cl, "caps unlink work queued\n");
} else {
doutc(cl, "failed to queue caps unlink work\n");
}
}
static void ceph_cap_unlink_work(struct work_struct *work)
{
struct ceph_mds_client *mdsc =
container_of(work, struct ceph_mds_client, cap_unlink_work);
struct ceph_client *cl = mdsc->fsc->client;
doutc(cl, "begin\n");
spin_lock(&mdsc->cap_unlink_delay_lock);
while (!list_empty(&mdsc->cap_unlink_delay_list)) {
struct ceph_inode_info *ci;
struct inode *inode;
ci = list_first_entry(&mdsc->cap_unlink_delay_list,
struct ceph_inode_info,
i_cap_delay_list);
list_del_init(&ci->i_cap_delay_list);
inode = igrab(&ci->netfs.inode);
if (inode) {
spin_unlock(&mdsc->cap_unlink_delay_lock);
doutc(cl, "on %p %llx.%llx\n", inode,
ceph_vinop(inode));
ceph_check_caps(ci, CHECK_CAPS_FLUSH);
iput(inode);
spin_lock(&mdsc->cap_unlink_delay_lock);
}
}
spin_unlock(&mdsc->cap_unlink_delay_lock);
doutc(cl, "done\n");
}
/* /*
* requests * requests
*/ */
...@@ -5359,6 +5403,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) ...@@ -5359,6 +5403,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
INIT_LIST_HEAD(&mdsc->cap_delay_list); INIT_LIST_HEAD(&mdsc->cap_delay_list);
INIT_LIST_HEAD(&mdsc->cap_wait_list); INIT_LIST_HEAD(&mdsc->cap_wait_list);
spin_lock_init(&mdsc->cap_delay_lock); spin_lock_init(&mdsc->cap_delay_lock);
INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list);
spin_lock_init(&mdsc->cap_unlink_delay_lock);
INIT_LIST_HEAD(&mdsc->snap_flush_list); INIT_LIST_HEAD(&mdsc->snap_flush_list);
spin_lock_init(&mdsc->snap_flush_lock); spin_lock_init(&mdsc->snap_flush_lock);
mdsc->last_cap_flush_tid = 1; mdsc->last_cap_flush_tid = 1;
...@@ -5367,6 +5413,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) ...@@ -5367,6 +5413,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
spin_lock_init(&mdsc->cap_dirty_lock); spin_lock_init(&mdsc->cap_dirty_lock);
init_waitqueue_head(&mdsc->cap_flushing_wq); init_waitqueue_head(&mdsc->cap_flushing_wq);
INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work);
err = ceph_metric_init(&mdsc->metric); err = ceph_metric_init(&mdsc->metric);
if (err) if (err)
goto err_mdsmap; goto err_mdsmap;
...@@ -5640,6 +5687,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) ...@@ -5640,6 +5687,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
ceph_cleanup_global_and_empty_realms(mdsc); ceph_cleanup_global_and_empty_realms(mdsc);
cancel_work_sync(&mdsc->cap_reclaim_work); cancel_work_sync(&mdsc->cap_reclaim_work);
cancel_work_sync(&mdsc->cap_unlink_work);
cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
doutc(cl, "done\n"); doutc(cl, "done\n");
......
...@@ -462,6 +462,8 @@ struct ceph_mds_client { ...@@ -462,6 +462,8 @@ struct ceph_mds_client {
unsigned long last_renew_caps; /* last time we renewed our caps */ unsigned long last_renew_caps; /* last time we renewed our caps */
struct list_head cap_delay_list; /* caps with delayed release */ struct list_head cap_delay_list; /* caps with delayed release */
spinlock_t cap_delay_lock; /* protects cap_delay_list */ spinlock_t cap_delay_lock; /* protects cap_delay_list */
struct list_head cap_unlink_delay_list; /* caps with delayed release for unlink */
spinlock_t cap_unlink_delay_lock; /* protects cap_unlink_delay_list */
struct list_head snap_flush_list; /* cap_snaps ready to flush */ struct list_head snap_flush_list; /* cap_snaps ready to flush */
spinlock_t snap_flush_lock; spinlock_t snap_flush_lock;
...@@ -475,6 +477,8 @@ struct ceph_mds_client { ...@@ -475,6 +477,8 @@ struct ceph_mds_client {
struct work_struct cap_reclaim_work; struct work_struct cap_reclaim_work;
atomic_t cap_reclaim_pending; atomic_t cap_reclaim_pending;
struct work_struct cap_unlink_work;
/* /*
* Cap reservations * Cap reservations
* *
...@@ -574,6 +578,7 @@ extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, ...@@ -574,6 +578,7 @@ extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session); struct ceph_mds_session *session);
extern void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc); extern void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc);
extern void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr); extern void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr);
extern void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc);
extern int ceph_iterate_session_caps(struct ceph_mds_session *session, extern int ceph_iterate_session_caps(struct ceph_mds_session *session,
int (*cb)(struct inode *, int mds, void *), int (*cb)(struct inode *, int mds, void *),
void *arg); void *arg);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment