Commit 86056090 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: avoid sending unnecessary FLUSHSNAP message

When a snap notification contains no new snapshot, we can avoid
sending a FLUSHSNAP message to the MDS. But we still need to create
a cap_snap in some cases, because it is required by the write path
and the page writeback path.
Signed-off-by: Yan, Zheng <zyan@redhat.com>
parent 5dda377c
...@@ -1297,11 +1297,8 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci, ...@@ -1297,11 +1297,8 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
if (capsnap->dirty_pages || capsnap->writing) if (capsnap->dirty_pages || capsnap->writing)
break; break;
/* /* should be removed by ceph_try_drop_cap_snap() */
* if cap writeback already occurred, we should have dropped BUG_ON(!capsnap->need_flush);
* the capsnap in ceph_put_wrbuffer_cap_refs.
*/
BUG_ON(capsnap->dirty == 0);
/* pick mds, take s_mutex */ /* pick mds, take s_mutex */
if (ci->i_auth_cap == NULL) { if (ci->i_auth_cap == NULL) {
...@@ -2347,6 +2344,27 @@ void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps) ...@@ -2347,6 +2344,27 @@ void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
} }
/*
* drop cap_snap that is not associated with any snapshot.
* we don't need to send FLUSHSNAP message for it.
*/
static int ceph_try_drop_cap_snap(struct ceph_cap_snap *capsnap)
{
	/*
	 * A cap_snap can be dropped without a FLUSHSNAP only when the
	 * MDS does not need to hear about it (no flush required) and
	 * nothing is still writing into it or waiting on writeback.
	 */
	if (capsnap->need_flush || capsnap->writing || capsnap->dirty_pages)
		return 0;

	dout("dropping cap_snap %p follows %llu\n",
	     capsnap, capsnap->follows);
	/* release the snap context reference held by the cap_snap */
	ceph_put_snap_context(capsnap->context);
	/* unhook from the per-inode and flushing lists before freeing */
	list_del(&capsnap->ci_item);
	list_del(&capsnap->flushing_item);
	ceph_put_cap_snap(capsnap);
	return 1;
}
/* /*
* Release cap refs. * Release cap refs.
* *
...@@ -2360,7 +2378,6 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) ...@@ -2360,7 +2378,6 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
{ {
struct inode *inode = &ci->vfs_inode; struct inode *inode = &ci->vfs_inode;
int last = 0, put = 0, flushsnaps = 0, wake = 0; int last = 0, put = 0, flushsnaps = 0, wake = 0;
struct ceph_cap_snap *capsnap;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
if (had & CEPH_CAP_PIN) if (had & CEPH_CAP_PIN)
...@@ -2382,17 +2399,17 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) ...@@ -2382,17 +2399,17 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
if (had & CEPH_CAP_FILE_WR) if (had & CEPH_CAP_FILE_WR)
if (--ci->i_wr_ref == 0) { if (--ci->i_wr_ref == 0) {
last++; last++;
if (!list_empty(&ci->i_cap_snaps)) { if (__ceph_have_pending_cap_snap(ci)) {
capsnap = list_first_entry(&ci->i_cap_snaps, struct ceph_cap_snap *capsnap =
struct ceph_cap_snap, list_last_entry(&ci->i_cap_snaps,
ci_item); struct ceph_cap_snap,
if (capsnap->writing) { ci_item);
capsnap->writing = 0; capsnap->writing = 0;
flushsnaps = if (ceph_try_drop_cap_snap(capsnap))
__ceph_finish_cap_snap(ci, put++;
capsnap); else if (__ceph_finish_cap_snap(ci, capsnap))
wake = 1; flushsnaps = 1;
} wake = 1;
} }
if (ci->i_wrbuffer_ref_head == 0 && if (ci->i_wrbuffer_ref_head == 0 &&
ci->i_dirty_caps == 0 && ci->i_dirty_caps == 0 &&
...@@ -2416,7 +2433,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) ...@@ -2416,7 +2433,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
ceph_flush_snaps(ci); ceph_flush_snaps(ci);
if (wake) if (wake)
wake_up_all(&ci->i_cap_wq); wake_up_all(&ci->i_cap_wq);
if (put) while (put-- > 0)
iput(inode); iput(inode);
} }
...@@ -2467,25 +2484,15 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, ...@@ -2467,25 +2484,15 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
capsnap->dirty_pages -= nr; capsnap->dirty_pages -= nr;
if (capsnap->dirty_pages == 0) { if (capsnap->dirty_pages == 0) {
complete_capsnap = 1; complete_capsnap = 1;
if (capsnap->dirty == 0) drop_capsnap = ceph_try_drop_cap_snap(capsnap);
/* cap writeback completed before we created
* the cap_snap; no FLUSHSNAP is needed */
drop_capsnap = 1;
} }
dout("put_wrbuffer_cap_refs on %p cap_snap %p " dout("put_wrbuffer_cap_refs on %p cap_snap %p "
" snap %lld %d/%d -> %d/%d %s%s%s\n", " snap %lld %d/%d -> %d/%d %s%s\n",
inode, capsnap, capsnap->context->seq, inode, capsnap, capsnap->context->seq,
ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr, ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
ci->i_wrbuffer_ref, capsnap->dirty_pages, ci->i_wrbuffer_ref, capsnap->dirty_pages,
last ? " (wrbuffer last)" : "", last ? " (wrbuffer last)" : "",
complete_capsnap ? " (complete capsnap)" : "", complete_capsnap ? " (complete capsnap)" : "");
drop_capsnap ? " (drop capsnap)" : "");
if (drop_capsnap) {
ceph_put_snap_context(capsnap->context);
list_del(&capsnap->ci_item);
list_del(&capsnap->flushing_item);
ceph_put_cap_snap(capsnap);
}
} }
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
......
...@@ -436,6 +436,14 @@ static int dup_array(u64 **dst, __le64 *src, u32 num) ...@@ -436,6 +436,14 @@ static int dup_array(u64 **dst, __le64 *src, u32 num)
return 0; return 0;
} }
/*
 * Return true if snap context @n contains at least one snapshot newer
 * than anything seen in @o.  Snap IDs in a context are stored in
 * descending order, so only the first entry of @n needs checking
 * against @o's sequence number.
 */
static bool has_new_snaps(struct ceph_snap_context *o,
			  struct ceph_snap_context *n)
{
	return n->num_snaps > 0 && n->snaps[0] > o->seq;
}
/* /*
* When a snapshot is applied, the size/mtime inode metadata is queued * When a snapshot is applied, the size/mtime inode metadata is queued
...@@ -455,7 +463,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) ...@@ -455,7 +463,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
{ {
struct inode *inode = &ci->vfs_inode; struct inode *inode = &ci->vfs_inode;
struct ceph_cap_snap *capsnap; struct ceph_cap_snap *capsnap;
struct ceph_snap_context *old_snapc; struct ceph_snap_context *old_snapc, *new_snapc;
int used, dirty; int used, dirty;
capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS); capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
...@@ -469,6 +477,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) ...@@ -469,6 +477,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
dirty = __ceph_caps_dirty(ci); dirty = __ceph_caps_dirty(ci);
old_snapc = ci->i_head_snapc; old_snapc = ci->i_head_snapc;
new_snapc = ci->i_snap_realm->cached_context;
/* /*
* If there is a write in progress, treat that as a dirty Fw, * If there is a write in progress, treat that as a dirty Fw,
...@@ -486,20 +495,37 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) ...@@ -486,20 +495,37 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
dout("queue_cap_snap %p already pending\n", inode); dout("queue_cap_snap %p already pending\n", inode);
goto update_snapc; goto update_snapc;
} }
if (ci->i_snap_realm->cached_context == ceph_empty_snapc) { if (ci->i_wrbuffer_ref_head == 0 &&
dout("queue_cap_snap %p empty snapc\n", inode); !(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) {
goto update_snapc;
}
if (!(dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) {
dout("queue_cap_snap %p nothing dirty|writing\n", inode); dout("queue_cap_snap %p nothing dirty|writing\n", inode);
goto update_snapc; goto update_snapc;
} }
BUG_ON(!old_snapc); BUG_ON(!old_snapc);
dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n", /*
inode, capsnap, old_snapc, ceph_cap_string(dirty)); * There is no need to send FLUSHSNAP message to MDS if there is
* no new snapshot. But when there is dirty pages or on-going
* writes, we still need to create cap_snap. cap_snap is needed
* by the write path and page writeback path.
*
* also see ceph_try_drop_cap_snap()
*/
if (has_new_snaps(old_snapc, new_snapc)) {
if (dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))
capsnap->need_flush = true;
} else {
if (!(used & CEPH_CAP_FILE_WR) &&
ci->i_wrbuffer_ref_head == 0) {
dout("queue_cap_snap %p "
"no new_snap|dirty_page|writing\n", inode);
goto update_snapc;
}
}
dout("queue_cap_snap %p cap_snap %p queuing under %p %s %s\n",
inode, capsnap, old_snapc, ceph_cap_string(dirty),
capsnap->need_flush ? "" : "no_flush");
ihold(inode); ihold(inode);
atomic_set(&capsnap->nref, 1); atomic_set(&capsnap->nref, 1);
...@@ -549,9 +575,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) ...@@ -549,9 +575,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
update_snapc: update_snapc:
if (ci->i_head_snapc) { if (ci->i_head_snapc) {
ci->i_head_snapc = ceph_get_snap_context( ci->i_head_snapc = ceph_get_snap_context(new_snapc);
ci->i_snap_realm->cached_context); dout(" new snapc is %p\n", new_snapc);
dout(" new snapc is %p\n", ci->i_head_snapc);
} }
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
......
...@@ -164,6 +164,7 @@ struct ceph_cap_snap { ...@@ -164,6 +164,7 @@ struct ceph_cap_snap {
int writing; /* a sync write is still in progress */ int writing; /* a sync write is still in progress */
int dirty_pages; /* dirty pages awaiting writeback */ int dirty_pages; /* dirty pages awaiting writeback */
bool inline_data; bool inline_data;
bool need_flush;
}; };
static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
...@@ -719,8 +720,8 @@ extern void ceph_snap_exit(void); ...@@ -719,8 +720,8 @@ extern void ceph_snap_exit(void);
static inline bool __ceph_have_pending_cap_snap(struct ceph_inode_info *ci) static inline bool __ceph_have_pending_cap_snap(struct ceph_inode_info *ci)
{ {
return !list_empty(&ci->i_cap_snaps) && return !list_empty(&ci->i_cap_snaps) &&
list_entry(ci->i_cap_snaps.prev, struct ceph_cap_snap, list_last_entry(&ci->i_cap_snaps, struct ceph_cap_snap,
ci_item)->writing; ci_item)->writing;
} }
/* inode.c */ /* inode.c */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment