Commit c03872f5 authored by Kurt Hackel's avatar Kurt Hackel Committed by Mark Fasheh

[PATCH] ocfs2: dlm recovery fixes

when starting lock mastery (excepting the recovery lock) wait on any nodes
needing recovery. fix one instance where lock resources were left attached
to the recovery list after recovery completed.  ensure that the node_down
code is run uniformly regardless of which node found the dead node first.
Signed-off-by: default avatarKurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: default avatarMark Fasheh <mark.fasheh@oracle.com>
parent 9c6510a5
......@@ -658,6 +658,7 @@ void dlm_complete_thread(struct dlm_ctxt *dlm);
int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
......@@ -762,6 +763,11 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data);
int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
u8 nodenum, u8 *real_master);
int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res, u8 *real_master);
int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
......
......@@ -141,13 +141,23 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
res->lockname.len)) {
kick_thread = 1;
call_ast = 1;
} else {
mlog(0, "%s: returning DLM_NORMAL to "
"node %u for reco lock\n", dlm->name,
lock->ml.node);
}
} else {
/* for NOQUEUE request, unless we get the
* lock right away, return DLM_NOTQUEUED */
if (flags & LKM_NOQUEUE)
if (flags & LKM_NOQUEUE) {
status = DLM_NOTQUEUED;
else {
if (dlm_is_recovery_lock(res->lockname.name,
res->lockname.len)) {
mlog(0, "%s: returning NOTQUEUED to "
"node %u for reco lock\n", dlm->name,
lock->ml.node);
}
} else {
dlm_lock_get(lock);
list_add_tail(&lock->list, &res->blocked);
kick_thread = 1;
......
......@@ -239,6 +239,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 target);
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res);
int dlm_is_host_down(int errno)
......@@ -677,6 +679,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
struct dlm_node_iter iter;
unsigned int namelen;
int tries = 0;
int bit, wait_on_recovery = 0;
BUG_ON(!lockid);
......@@ -762,6 +765,18 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
set_bit(dlm->node_num, mle->maybe_map);
list_add(&mle->list, &dlm->master_list);
/* still holding the dlm spinlock, check the recovery map
* to see if there are any nodes that still need to be
* considered. these will not appear in the mle nodemap
* but they might own this lockres. wait on them. */
bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
if (bit < O2NM_MAX_NODES) {
mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
"recover before lock mastery can begin\n",
dlm->name, namelen, (char *)lockid, bit);
wait_on_recovery = 1;
}
}
/* at this point there is either a DLM_MLE_BLOCK or a
......@@ -779,6 +794,39 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock);
while (wait_on_recovery) {
/* any cluster changes that occurred after dropping the
* dlm spinlock would be detectable be a change on the mle,
* so we only need to clear out the recovery map once. */
if (dlm_is_recovery_lock(lockid, namelen)) {
mlog(ML_NOTICE, "%s: recovery map is not empty, but "
"must master $RECOVERY lock now\n", dlm->name);
if (!dlm_pre_master_reco_lockres(dlm, res))
wait_on_recovery = 0;
else {
mlog(0, "%s: waiting 500ms for heartbeat state "
"change\n", dlm->name);
msleep(500);
}
continue;
}
dlm_kick_recovery_thread(dlm);
msleep(100);
dlm_wait_for_recovery(dlm);
spin_lock(&dlm->spinlock);
bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
if (bit < O2NM_MAX_NODES) {
mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
"recover before lock mastery can begin\n",
dlm->name, namelen, (char *)lockid, bit);
wait_on_recovery = 1;
} else
wait_on_recovery = 0;
spin_unlock(&dlm->spinlock);
}
/* must wait for lock to be mastered elsewhere */
if (blocked)
goto wait;
......@@ -1835,6 +1883,61 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
mlog(0, "finished with dlm_assert_master_worker\n");
}
/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
* We cannot wait for node recovery to complete to begin mastering this
* lockres because this lockres is used to kick off recovery! ;-)
* So, do a pre-check on all living nodes to see if any of those nodes
* think that $RECOVERY is currently mastered by a dead node. If so,
* we wait a short time to allow that node to get notified by its own
* heartbeat stack, then check again. All $RECOVERY lock resources
* mastered by dead nodes are purged when the hearbeat callback is
* fired, so we can know for sure that it is safe to continue once
* the node returns a live node or no node. */
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res)
{
struct dlm_node_iter iter;
int nodenum;
int ret = 0;
u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
spin_lock(&dlm->spinlock);
dlm_node_iter_init(dlm->domain_map, &iter);
spin_unlock(&dlm->spinlock);
while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
/* do not send to self */
if (nodenum == dlm->node_num)
continue;
ret = dlm_do_master_requery(dlm, res, nodenum, &master);
if (ret < 0) {
mlog_errno(ret);
if (!dlm_is_host_down(ret))
BUG();
/* host is down, so answer for that node would be
* DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
}
if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
/* check to see if this master is in the recovery map */
spin_lock(&dlm->spinlock);
if (test_bit(master, dlm->recovery_map)) {
mlog(ML_NOTICE, "%s: node %u has not seen "
"node %u go down yet, and thinks the "
"dead node is mastering the recovery "
"lock. must wait.\n", dlm->name,
nodenum, master);
ret = -EAGAIN;
}
spin_unlock(&dlm->spinlock);
mlog(0, "%s: reco lock master is %u\n", dlm->name,
master);
break;
}
}
return ret;
}
/*
* DLM_MIGRATE_LOCKRES
......
......@@ -58,7 +58,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
static int dlm_recovery_thread(void *data);
void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
static int dlm_do_recovery(struct dlm_ctxt *dlm);
static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
......@@ -78,15 +78,9 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
u8 send_to,
struct dlm_lock_resource *res,
int total_locks);
static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 *real_master);
static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
struct dlm_migratable_lockres *mres);
static int dlm_do_master_requery(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 nodenum, u8 *real_master);
static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
u8 dead_node, u8 send_to);
......@@ -165,7 +159,7 @@ void dlm_dispatch_work(void *data)
* RECOVERY THREAD
*/
static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
{
/* wake the recovery thread
* this will wake the reco thread in one of three places
......@@ -1316,9 +1310,8 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 *real_master)
int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res, u8 *real_master)
{
struct dlm_node_iter iter;
int nodenum;
......@@ -1360,8 +1353,10 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
if (ret < 0) {
mlog_errno(ret);
BUG();
/* TODO: need to figure a way to restart this */
if (!dlm_is_host_down(ret))
BUG();
/* host is down, so answer for that node would be
* DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
}
if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
mlog(0, "lock master is %u\n", *real_master);
......@@ -1372,9 +1367,8 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
}
static int dlm_do_master_requery(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
u8 nodenum, u8 *real_master)
int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
u8 nodenum, u8 *real_master)
{
int ret = -EINVAL;
struct dlm_master_requery req;
......@@ -1739,6 +1733,13 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
} else
continue;
if (!list_empty(&res->recovering)) {
mlog(0, "%s:%.*s: lockres was "
"marked RECOVERING, owner=%u\n",
dlm->name, res->lockname.len,
res->lockname.name, res->owner);
list_del_init(&res->recovering);
}
spin_lock(&res->spinlock);
dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING;
......@@ -2258,7 +2259,10 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
mlog(0, "%u not in domain/live_nodes map "
"so setting it in reco map manually\n",
br->dead_node);
set_bit(br->dead_node, dlm->recovery_map);
/* force the recovery cleanup in __dlm_hb_node_down
* both of these will be cleared in a moment */
set_bit(br->dead_node, dlm->domain_map);
set_bit(br->dead_node, dlm->live_nodes_map);
__dlm_hb_node_down(dlm, br->dead_node);
}
spin_unlock(&dlm->spinlock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment