Commit 89b52fe1 authored by Yan, Zheng; committed by Ilya Dryomov

ceph: fix flushing caps

Currently, ceph_fsync() only flushes dirty caps and waits for them to be
flushed. It doesn't wait for caps that are already being flushed.
This patch makes ceph_fsync() wait for pending flushing caps too.
Besides, this patch also makes caps_are_flushed() properly handle
tid wrapping.
Signed-off-by: Yan, Zheng <zyan@redhat.com>
parent 41445999
...@@ -1097,8 +1097,7 @@ void ceph_queue_caps_release(struct inode *inode) ...@@ -1097,8 +1097,7 @@ void ceph_queue_caps_release(struct inode *inode)
* caller should hold snap_rwsem (read), s_mutex. * caller should hold snap_rwsem (read), s_mutex.
*/ */
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
int op, int used, int want, int retain, int flushing, int op, int used, int want, int retain, int flushing)
unsigned *pflush_tid)
__releases(cap->ci->i_ceph_lock) __releases(cap->ci->i_ceph_lock)
{ {
struct ceph_inode_info *ci = cap->ci; struct ceph_inode_info *ci = cap->ci;
...@@ -1170,8 +1169,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, ...@@ -1170,8 +1169,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
* first ack clean Ax. * first ack clean Ax.
*/ */
flush_tid = ++ci->i_cap_flush_last_tid; flush_tid = ++ci->i_cap_flush_last_tid;
if (pflush_tid)
*pflush_tid = flush_tid;
dout(" cap_flush_tid %d\n", (int)flush_tid); dout(" cap_flush_tid %d\n", (int)flush_tid);
for (i = 0; i < CEPH_CAP_BITS; i++) for (i = 0; i < CEPH_CAP_BITS; i++)
if (flushing & (1 << i)) if (flushing & (1 << i))
...@@ -1724,7 +1721,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1724,7 +1721,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
/* __send_cap drops i_ceph_lock */ /* __send_cap drops i_ceph_lock */
delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
want, retain, flushing, NULL); want, retain, flushing);
goto retry; /* retake i_ceph_lock and restart our cap scan. */ goto retry; /* retake i_ceph_lock and restart our cap scan. */
} }
...@@ -1753,12 +1750,12 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1753,12 +1750,12 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
/* /*
* Try to flush dirty caps back to the auth mds. * Try to flush dirty caps back to the auth mds.
*/ */
static int try_flush_caps(struct inode *inode, unsigned *flush_tid) static int try_flush_caps(struct inode *inode, u16 flush_tid[])
{ {
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
int flushing = 0;
struct ceph_mds_session *session = NULL; struct ceph_mds_session *session = NULL;
int flushing = 0;
retry: retry:
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
...@@ -1787,17 +1784,19 @@ static int try_flush_caps(struct inode *inode, unsigned *flush_tid) ...@@ -1787,17 +1784,19 @@ static int try_flush_caps(struct inode *inode, unsigned *flush_tid)
/* __send_cap drops i_ceph_lock */ /* __send_cap drops i_ceph_lock */
delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
cap->issued | cap->implemented, flushing, cap->issued | cap->implemented, flushing);
flush_tid);
if (!delayed)
goto out_unlocked;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
__cap_delay_requeue(mdsc, ci); if (delayed)
__cap_delay_requeue(mdsc, ci);
} }
flushing = ci->i_flushing_caps;
if (flushing)
memcpy(flush_tid, ci->i_cap_flush_tid,
sizeof(ci->i_cap_flush_tid));
out: out:
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
out_unlocked:
if (session) if (session)
mutex_unlock(&session->s_mutex); mutex_unlock(&session->s_mutex);
return flushing; return flushing;
...@@ -1806,19 +1805,22 @@ static int try_flush_caps(struct inode *inode, unsigned *flush_tid) ...@@ -1806,19 +1805,22 @@ static int try_flush_caps(struct inode *inode, unsigned *flush_tid)
/* /*
* Return true if we've flushed caps through the given flush_tid. * Return true if we've flushed caps through the given flush_tid.
*/ */
static int caps_are_flushed(struct inode *inode, unsigned tid) static int caps_are_flushed(struct inode *inode, u16 flush_tid[])
{ {
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
int i, ret = 1; int i, ret = 1;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
for (i = 0; i < CEPH_CAP_BITS; i++) for (i = 0; i < CEPH_CAP_BITS; i++) {
if ((ci->i_flushing_caps & (1 << i)) && if (!(ci->i_flushing_caps & (1 << i)))
ci->i_cap_flush_tid[i] <= tid) { continue;
// tid only has 16 bits. we need to handle wrapping
if ((s16)(ci->i_cap_flush_tid[i] - flush_tid[i]) <= 0) {
/* still flushing this bit */ /* still flushing this bit */
ret = 0; ret = 0;
break; break;
} }
}
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
return ret; return ret;
} }
...@@ -1871,7 +1873,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) ...@@ -1871,7 +1873,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
{ {
struct inode *inode = file->f_mapping->host; struct inode *inode = file->f_mapping->host;
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
unsigned flush_tid; u16 flush_tid[CEPH_CAP_BITS];
int ret; int ret;
int dirty; int dirty;
...@@ -1883,7 +1885,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) ...@@ -1883,7 +1885,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
return ret; return ret;
mutex_lock(&inode->i_mutex); mutex_lock(&inode->i_mutex);
dirty = try_flush_caps(inode, &flush_tid); dirty = try_flush_caps(inode, flush_tid);
dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
/* /*
...@@ -1892,7 +1894,6 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) ...@@ -1892,7 +1894,6 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
* wait for that) * wait for that)
*/ */
if (!datasync && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { if (!datasync && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
dout("fsync waiting for flush_tid %u\n", flush_tid);
ret = wait_event_interruptible(ci->i_cap_wq, ret = wait_event_interruptible(ci->i_cap_wq,
caps_are_flushed(inode, flush_tid)); caps_are_flushed(inode, flush_tid));
} }
...@@ -1911,14 +1912,14 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) ...@@ -1911,14 +1912,14 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
{ {
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
unsigned flush_tid; u16 flush_tid[CEPH_CAP_BITS];
int err = 0; int err = 0;
int dirty; int dirty;
int wait = wbc->sync_mode == WB_SYNC_ALL; int wait = wbc->sync_mode == WB_SYNC_ALL;
dout("write_inode %p wait=%d\n", inode, wait); dout("write_inode %p wait=%d\n", inode, wait);
if (wait) { if (wait) {
dirty = try_flush_caps(inode, &flush_tid); dirty = try_flush_caps(inode, flush_tid);
if (dirty) if (dirty)
err = wait_event_interruptible(ci->i_cap_wq, err = wait_event_interruptible(ci->i_cap_wq,
caps_are_flushed(inode, flush_tid)); caps_are_flushed(inode, flush_tid));
...@@ -1988,7 +1989,7 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, ...@@ -1988,7 +1989,7 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
__ceph_caps_used(ci), __ceph_caps_used(ci),
__ceph_caps_wanted(ci), __ceph_caps_wanted(ci),
cap->issued | cap->implemented, cap->issued | cap->implemented,
ci->i_flushing_caps, NULL); ci->i_flushing_caps);
if (delayed) { if (delayed) {
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
__cap_delay_requeue(mdsc, ci); __cap_delay_requeue(mdsc, ci);
...@@ -2027,7 +2028,7 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, ...@@ -2027,7 +2028,7 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
__ceph_caps_used(ci), __ceph_caps_used(ci),
__ceph_caps_wanted(ci), __ceph_caps_wanted(ci),
cap->issued | cap->implemented, cap->issued | cap->implemented,
ci->i_flushing_caps, NULL); ci->i_flushing_caps);
if (delayed) { if (delayed) {
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
__cap_delay_requeue(mdsc, ci); __cap_delay_requeue(mdsc, ci);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment