Commit 6b0c7440 authored by Andreas Gruenbacher's avatar Andreas Gruenbacher Committed by Bob Peterson

gfs2: Clean up glock work enqueuing

This patch adds a standardized queueing mechanism for glock work
with spin_lock protection to prevent races.
Signed-off-by: default avatarAndreas Gruenbacher <agruenba@redhat.com>
Signed-off-by: default avatarBob Peterson <rpeterso@redhat.com>
parent 6f6597ba
...@@ -152,20 +152,34 @@ static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl) ...@@ -152,20 +152,34 @@ static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl)
spin_unlock(&lru_lock); spin_unlock(&lru_lock);
} }
/** /*
* gfs2_glock_put() - Decrement reference count on glock * Enqueue the glock on the work queue. Passes one glock reference on to the
* @gl: The glock to put * work queue.
*
*/ */
static void __gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) {
if (!queue_delayed_work(glock_workqueue, &gl->gl_work, delay)) {
/*
* We are holding the lockref spinlock, and the work was still
* queued above. The queued work (glock_work_func) takes that
* spinlock before dropping its glock reference(s), so it
* cannot have dropped them in the meantime.
*/
GLOCK_BUG_ON(gl, gl->gl_lockref.count < 2);
gl->gl_lockref.count--;
}
}
void gfs2_glock_put(struct gfs2_glock *gl) static void gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) {
spin_lock(&gl->gl_lockref.lock);
__gfs2_glock_queue_work(gl, delay);
spin_unlock(&gl->gl_lockref.lock);
}
static void __gfs2_glock_put(struct gfs2_glock *gl)
{ {
struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
struct address_space *mapping = gfs2_glock2aspace(gl); struct address_space *mapping = gfs2_glock2aspace(gl);
if (lockref_put_or_lock(&gl->gl_lockref))
return;
lockref_mark_dead(&gl->gl_lockref); lockref_mark_dead(&gl->gl_lockref);
gfs2_glock_remove_from_lru(gl); gfs2_glock_remove_from_lru(gl);
...@@ -177,6 +191,20 @@ void gfs2_glock_put(struct gfs2_glock *gl) ...@@ -177,6 +191,20 @@ void gfs2_glock_put(struct gfs2_glock *gl)
sdp->sd_lockstruct.ls_ops->lm_put_lock(gl); sdp->sd_lockstruct.ls_ops->lm_put_lock(gl);
} }
/**
* gfs2_glock_put() - Decrement reference count on glock
* @gl: The glock to put
*
*/
void gfs2_glock_put(struct gfs2_glock *gl)
{
if (lockref_put_or_lock(&gl->gl_lockref))
return;
__gfs2_glock_put(gl);
}
/** /**
* may_grant - check if its ok to grant a new lock * may_grant - check if its ok to grant a new lock
* @gl: The glock * @gl: The glock
...@@ -482,8 +510,7 @@ __acquires(&gl->gl_lockref.lock) ...@@ -482,8 +510,7 @@ __acquires(&gl->gl_lockref.lock)
target == LM_ST_UNLOCKED && target == LM_ST_UNLOCKED &&
test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags)) { test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags)) {
finish_xmote(gl, target); finish_xmote(gl, target);
if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) gfs2_glock_queue_work(gl, 0);
gfs2_glock_put(gl);
} }
else if (ret) { else if (ret) {
pr_err("lm_lock ret %d\n", ret); pr_err("lm_lock ret %d\n", ret);
...@@ -492,8 +519,7 @@ __acquires(&gl->gl_lockref.lock) ...@@ -492,8 +519,7 @@ __acquires(&gl->gl_lockref.lock)
} }
} else { /* lock_nolock */ } else { /* lock_nolock */
finish_xmote(gl, target); finish_xmote(gl, target);
if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) gfs2_glock_queue_work(gl, 0);
gfs2_glock_put(gl);
} }
spin_lock(&gl->gl_lockref.lock); spin_lock(&gl->gl_lockref.lock);
...@@ -565,8 +591,7 @@ __acquires(&gl->gl_lockref.lock) ...@@ -565,8 +591,7 @@ __acquires(&gl->gl_lockref.lock)
clear_bit(GLF_LOCK, &gl->gl_flags); clear_bit(GLF_LOCK, &gl->gl_flags);
smp_mb__after_atomic(); smp_mb__after_atomic();
gl->gl_lockref.count++; gl->gl_lockref.count++;
if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) __gfs2_glock_queue_work(gl, 0);
gl->gl_lockref.count--;
return; return;
out_unlock: out_unlock:
...@@ -601,11 +626,11 @@ static void glock_work_func(struct work_struct *work) ...@@ -601,11 +626,11 @@ static void glock_work_func(struct work_struct *work)
{ {
unsigned long delay = 0; unsigned long delay = 0;
struct gfs2_glock *gl = container_of(work, struct gfs2_glock, gl_work.work); struct gfs2_glock *gl = container_of(work, struct gfs2_glock, gl_work.work);
int drop_ref = 0; unsigned int drop_refs = 1;
if (test_and_clear_bit(GLF_REPLY_PENDING, &gl->gl_flags)) { if (test_and_clear_bit(GLF_REPLY_PENDING, &gl->gl_flags)) {
finish_xmote(gl, gl->gl_reply); finish_xmote(gl, gl->gl_reply);
drop_ref = 1; drop_refs++;
} }
spin_lock(&gl->gl_lockref.lock); spin_lock(&gl->gl_lockref.lock);
if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) && if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
...@@ -623,17 +648,25 @@ static void glock_work_func(struct work_struct *work) ...@@ -623,17 +648,25 @@ static void glock_work_func(struct work_struct *work)
} }
} }
run_queue(gl, 0); run_queue(gl, 0);
spin_unlock(&gl->gl_lockref.lock); if (delay) {
if (!delay) /* Keep one glock reference for the work we requeue. */
gfs2_glock_put(gl); drop_refs--;
else {
if (gl->gl_name.ln_type != LM_TYPE_INODE) if (gl->gl_name.ln_type != LM_TYPE_INODE)
delay = 0; delay = 0;
if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0) __gfs2_glock_queue_work(gl, delay);
gfs2_glock_put(gl);
} }
if (drop_ref)
gfs2_glock_put(gl); /*
* Drop the remaining glock references manually here. (Mind that
* __gfs2_glock_queue_work depends on the lockref spinlock begin held
* here as well.)
*/
gl->gl_lockref.count -= drop_refs;
if (!gl->gl_lockref.count) {
__gfs2_glock_put(gl);
return;
}
spin_unlock(&gl->gl_lockref.lock);
} }
/** /**
...@@ -986,8 +1019,7 @@ int gfs2_glock_nq(struct gfs2_holder *gh) ...@@ -986,8 +1019,7 @@ int gfs2_glock_nq(struct gfs2_holder *gh)
test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))) { test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))) {
set_bit(GLF_REPLY_PENDING, &gl->gl_flags); set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
gl->gl_lockref.count++; gl->gl_lockref.count++;
if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) __gfs2_glock_queue_work(gl, 0);
gl->gl_lockref.count--;
} }
run_queue(gl, 1); run_queue(gl, 1);
spin_unlock(&gl->gl_lockref.lock); spin_unlock(&gl->gl_lockref.lock);
...@@ -1047,17 +1079,15 @@ void gfs2_glock_dq(struct gfs2_holder *gh) ...@@ -1047,17 +1079,15 @@ void gfs2_glock_dq(struct gfs2_holder *gh)
gfs2_glock_add_to_lru(gl); gfs2_glock_add_to_lru(gl);
trace_gfs2_glock_queue(gh, 0); trace_gfs2_glock_queue(gh, 0);
if (unlikely(!fast_path)) {
gl->gl_lockref.count++;
if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
!test_bit(GLF_DEMOTE, &gl->gl_flags) &&
gl->gl_name.ln_type == LM_TYPE_INODE)
delay = gl->gl_hold_time;
__gfs2_glock_queue_work(gl, delay);
}
spin_unlock(&gl->gl_lockref.lock); spin_unlock(&gl->gl_lockref.lock);
if (likely(fast_path))
return;
gfs2_glock_hold(gl);
if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
!test_bit(GLF_DEMOTE, &gl->gl_flags) &&
gl->gl_name.ln_type == LM_TYPE_INODE)
delay = gl->gl_hold_time;
if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0)
gfs2_glock_put(gl);
} }
void gfs2_glock_dq_wait(struct gfs2_holder *gh) void gfs2_glock_dq_wait(struct gfs2_holder *gh)
...@@ -1233,9 +1263,8 @@ void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state) ...@@ -1233,9 +1263,8 @@ void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state)
spin_lock(&gl->gl_lockref.lock); spin_lock(&gl->gl_lockref.lock);
handle_callback(gl, state, delay, true); handle_callback(gl, state, delay, true);
__gfs2_glock_queue_work(gl, delay);
spin_unlock(&gl->gl_lockref.lock); spin_unlock(&gl->gl_lockref.lock);
if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0)
gfs2_glock_put(gl);
} }
/** /**
...@@ -1294,10 +1323,8 @@ void gfs2_glock_complete(struct gfs2_glock *gl, int ret) ...@@ -1294,10 +1323,8 @@ void gfs2_glock_complete(struct gfs2_glock *gl, int ret)
gl->gl_lockref.count++; gl->gl_lockref.count++;
set_bit(GLF_REPLY_PENDING, &gl->gl_flags); set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
__gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock); spin_unlock(&gl->gl_lockref.lock);
if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
gfs2_glock_put(gl);
} }
static int glock_cmp(void *priv, struct list_head *a, struct list_head *b) static int glock_cmp(void *priv, struct list_head *a, struct list_head *b)
...@@ -1355,8 +1382,7 @@ __acquires(&lru_lock) ...@@ -1355,8 +1382,7 @@ __acquires(&lru_lock)
if (demote_ok(gl)) if (demote_ok(gl))
handle_callback(gl, LM_ST_UNLOCKED, 0, false); handle_callback(gl, LM_ST_UNLOCKED, 0, false);
WARN_ON(!test_and_clear_bit(GLF_LOCK, &gl->gl_flags)); WARN_ON(!test_and_clear_bit(GLF_LOCK, &gl->gl_flags));
if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) __gfs2_glock_queue_work(gl, 0);
gl->gl_lockref.count--;
spin_unlock(&gl->gl_lockref.lock); spin_unlock(&gl->gl_lockref.lock);
cond_resched_lock(&lru_lock); cond_resched_lock(&lru_lock);
} }
...@@ -1462,13 +1488,12 @@ static void glock_hash_walk(glock_examiner examiner, const struct gfs2_sbd *sdp) ...@@ -1462,13 +1488,12 @@ static void glock_hash_walk(glock_examiner examiner, const struct gfs2_sbd *sdp)
static void thaw_glock(struct gfs2_glock *gl) static void thaw_glock(struct gfs2_glock *gl)
{ {
if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags)) if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags)) {
goto out;
set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) {
out:
gfs2_glock_put(gl); gfs2_glock_put(gl);
return;
} }
set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
gfs2_glock_queue_work(gl, 0);
} }
/** /**
...@@ -1484,9 +1509,8 @@ static void clear_glock(struct gfs2_glock *gl) ...@@ -1484,9 +1509,8 @@ static void clear_glock(struct gfs2_glock *gl)
spin_lock(&gl->gl_lockref.lock); spin_lock(&gl->gl_lockref.lock);
if (gl->gl_state != LM_ST_UNLOCKED) if (gl->gl_state != LM_ST_UNLOCKED)
handle_callback(gl, LM_ST_UNLOCKED, 0, false); handle_callback(gl, LM_ST_UNLOCKED, 0, false);
__gfs2_glock_queue_work(gl, 0);
spin_unlock(&gl->gl_lockref.lock); spin_unlock(&gl->gl_lockref.lock);
if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
gfs2_glock_put(gl);
} }
/** /**
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment