Commit 6644925a authored by Alexander Aring's avatar Alexander Aring Committed by David Teigland

dlm: do not use ref counts for rsb in the toss state

In the past we had problems when an rsb had a reference counter greater
than one while in the toss state.  An rsb in the toss state is not
actively used for locking, and should not have any other references
apart from the single ref keeping it on the rsb hash.  Shift to freeing
rsb's directly rather than using kref_put to free them, since the ref
counting is not meant to be used in this state.  Add warnings if ref
counting is seen while an rsb is in the toss state.
Signed-off-by: default avatarAlexander Aring <aahringo@redhat.com>
Signed-off-by: default avatarDavid Teigland <teigland@redhat.com>
parent 6c648035
......@@ -325,6 +325,8 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
static inline void hold_rsb(struct dlm_rsb *r)
{
/* rsbs in toss state never get referenced */
WARN_ON(rsb_flag(r, RSB_TOSS));
kref_get(&r->res_ref);
}
......@@ -631,6 +633,11 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
list_move(&r->res_rsbs_list, &ls->ls_keep);
rsb_clear_flag(r, RSB_TOSS);
/* rsb got out of toss state, it becomes alive again
* and we reinit the reference counter that is only
* valid for keep state rsbs
*/
kref_init(&r->res_ref);
goto out_unlock;
......@@ -765,6 +772,11 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
list_move(&r->res_rsbs_list, &ls->ls_keep);
rsb_clear_flag(r, RSB_TOSS);
/* rsb got out of toss state, it becomes alive again
* and we reinit the reference counter that is only
* valid for keep state rsbs
*/
kref_init(&r->res_ref);
goto out_unlock;
......@@ -1112,8 +1124,6 @@ static void toss_rsb(struct kref *kref)
struct dlm_ls *ls = r->res_ls;
DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
kref_init(&r->res_ref);
WARN_ON(rsb_flag(r, RSB_TOSS));
rsb_set_flag(r, RSB_TOSS);
list_move(&r->res_rsbs_list, &ls->ls_toss);
r->res_toss_time = jiffies;
......@@ -1129,16 +1139,20 @@ static void toss_rsb(struct kref *kref)
static void unhold_rsb(struct dlm_rsb *r)
{
int rv;
/* rsbs in toss state never get referenced */
WARN_ON(rsb_flag(r, RSB_TOSS));
rv = kref_put(&r->res_ref, toss_rsb);
DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
static void kill_rsb(struct kref *kref)
void free_toss_rsb(struct dlm_rsb *r)
{
struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS));
/* All work is done after the return from kref_put() so we
can release the write_lock before the remove and free. */
/* check if all work is done after the rsb is on toss list
* and it can be freed.
*/
DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
......@@ -1147,6 +1161,8 @@ static void kill_rsb(struct kref *kref)
DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
dlm_free_rsb(r);
}
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
......@@ -1611,15 +1627,10 @@ static void shrink_bucket(struct dlm_ls *ls)
continue;
}
if (!kref_put(&r->res_ref, kill_rsb)) {
log_error(ls, "tossed rsb in use %s", r->res_name);
continue;
}
list_del(&r->res_rsbs_list);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params);
dlm_free_rsb(r);
free_toss_rsb(r);
}
if (need_shrink)
......@@ -1680,19 +1691,13 @@ static void shrink_bucket(struct dlm_ls *ls)
continue;
}
if (!kref_put(&r->res_ref, kill_rsb)) {
spin_unlock_bh(&ls->ls_rsbtbl_lock);
log_error(ls, "remove_name in use %s", name);
continue;
}
list_del(&r->res_rsbs_list);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params);
send_remove(r);
spin_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_free_rsb(r);
free_toss_rsb(r);
}
}
......@@ -4180,18 +4185,12 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
return;
}
if (kref_put(&r->res_ref, kill_rsb)) {
list_del(&r->res_rsbs_list);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params);
spin_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_free_rsb(r);
} else {
log_error(ls, "receive_remove from %d rsb ref error",
from_nodeid);
dlm_print_rsb(r);
spin_unlock_bh(&ls->ls_rsbtbl_lock);
}
free_toss_rsb(r);
}
static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
......
......@@ -18,6 +18,7 @@ void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
uint32_t saved_seq);
void dlm_receive_buffer(const union dlm_packet *p, int nodeid);
int dlm_modes_compat(int mode1, int mode2);
void free_toss_rsb(struct dlm_rsb *r);
void dlm_put_rsb(struct dlm_rsb *r);
void dlm_hold_rsb(struct dlm_rsb *r);
int dlm_put_lkb(struct dlm_lkb *lkb);
......
......@@ -889,7 +889,7 @@ void dlm_clear_toss(struct dlm_ls *ls)
list_del(&r->res_rsbs_list);
rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
dlm_rhash_rsb_params);
dlm_free_rsb(r);
free_toss_rsb(r);
count++;
}
spin_unlock_bh(&ls->ls_rsbtbl_lock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment