Commit e2faea4c authored by Kurt Hackel's avatar Kurt Hackel Committed by Mark Fasheh

[PATCH] ocfs2/dlm: fixes

* fix a hang which can occur during shutdown migration
* do not allow nodes to join during recovery
* when restarting lock mastery, do not ignore nodes which come up
* more than one node could become recovery master, fix this
* sleep to allow some time for heartbeat state to catch up to network
* extra debug info for bad recovery state problems
* make DLM_RECO_NODE_DATA_DONE a valid state for non-master recovery nodes
* prune all locks from dead nodes on $RECOVERY lock resources
* do NOT automatically add new nodes to mle nodemaps until they have properly
  joined the domain
* make sure dlm_pick_recovery_master only exits when all nodes have synced
* properly handle dlmunlock errors in dlm_pick_recovery_master
* do not propagate network errors in dlm_send_begin_reco_message
* dead nodes were not being put in the recovery map sometimes, fix this
* dlmunlock was failing to clear the unlock actions on DLM_DENIED
Signed-off-by: default avatarKurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: default avatarMark Fasheh <mark.fasheh@oracle.com>
parent 0d419a6a
...@@ -657,6 +657,7 @@ void dlm_complete_thread(struct dlm_ctxt *dlm); ...@@ -657,6 +657,7 @@ void dlm_complete_thread(struct dlm_ctxt *dlm);
int dlm_launch_recovery_thread(struct dlm_ctxt *dlm); int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
void dlm_complete_recovery_thread(struct dlm_ctxt *dlm); void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
void dlm_wait_for_recovery(struct dlm_ctxt *dlm); void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
void dlm_put(struct dlm_ctxt *dlm); void dlm_put(struct dlm_ctxt *dlm);
struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm); struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm);
......
...@@ -573,8 +573,11 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data) ...@@ -573,8 +573,11 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
spin_lock(&dlm_domain_lock); spin_lock(&dlm_domain_lock);
dlm = __dlm_lookup_domain_full(query->domain, query->name_len); dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
/* Once the dlm ctxt is marked as leaving then we don't want /* Once the dlm ctxt is marked as leaving then we don't want
* to be put in someone's domain map. */ * to be put in someone's domain map.
* Also, explicitly disallow joining at certain troublesome
* times (ie. during recovery). */
if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) { if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
int bit = query->node_idx;
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
if (dlm->dlm_state == DLM_CTXT_NEW && if (dlm->dlm_state == DLM_CTXT_NEW &&
...@@ -586,6 +589,19 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data) ...@@ -586,6 +589,19 @@ static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data)
} else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) { } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
/* Disallow parallel joins. */ /* Disallow parallel joins. */
response = JOIN_DISALLOW; response = JOIN_DISALLOW;
} else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
mlog(ML_NOTICE, "node %u trying to join, but recovery "
"is ongoing.\n", bit);
response = JOIN_DISALLOW;
} else if (test_bit(bit, dlm->recovery_map)) {
mlog(ML_NOTICE, "node %u trying to join, but it "
"still needs recovery.\n", bit);
response = JOIN_DISALLOW;
} else if (test_bit(bit, dlm->domain_map)) {
mlog(ML_NOTICE, "node %u trying to join, but it "
"is still in the domain! needs recovery?\n",
bit);
response = JOIN_DISALLOW;
} else { } else {
/* Alright we're fully a part of this domain /* Alright we're fully a part of this domain
* so we keep some state as to who's joining * so we keep some state as to who's joining
......
...@@ -1050,17 +1050,10 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, ...@@ -1050,17 +1050,10 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
node = dlm_bitmap_diff_iter_next(&bdi, &sc); node = dlm_bitmap_diff_iter_next(&bdi, &sc);
while (node >= 0) { while (node >= 0) {
if (sc == NODE_UP) { if (sc == NODE_UP) {
/* a node came up. easy. might not even need /* a node came up. clear any old vote from
* to talk to it if its node number is higher * the response map and set it in the vote map
* or if we are already blocked. */ * then restart the mastery. */
mlog(0, "node up! %d\n", node); mlog(ML_NOTICE, "node %d up while restarting\n", node);
if (blocked)
goto next;
if (node > dlm->node_num) {
mlog(0, "node > this node. skipping.\n");
goto next;
}
/* redo the master request, but only for the new node */ /* redo the master request, but only for the new node */
mlog(0, "sending request to new node\n"); mlog(0, "sending request to new node\n");
...@@ -2005,6 +1998,15 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, ...@@ -2005,6 +1998,15 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
break; break;
mlog(0, "timed out during migration\n"); mlog(0, "timed out during migration\n");
/* avoid hang during shutdown when migrating lockres
* to a node which also goes down */
if (dlm_is_node_dead(dlm, target)) {
mlog(0, "%s:%.*s: expected migration target %u "
"is no longer up. restarting.\n",
dlm->name, res->lockname.len,
res->lockname.name, target);
ret = -ERESTARTSYS;
}
} }
if (ret == -ERESTARTSYS) { if (ret == -ERESTARTSYS) {
/* migration failed, detach and clean up mle */ /* migration failed, detach and clean up mle */
......
...@@ -256,6 +256,27 @@ static int dlm_recovery_thread(void *data) ...@@ -256,6 +256,27 @@ static int dlm_recovery_thread(void *data)
return 0; return 0;
} }
/* returns true when the recovery master has contacted us */
static int dlm_reco_master_ready(struct dlm_ctxt *dlm)
{
int ready;
spin_lock(&dlm->spinlock);
ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM);
spin_unlock(&dlm->spinlock);
return ready;
}
/* returns true if node is no longer in the domain
* could be dead or just not joined */
int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
{
int dead;
spin_lock(&dlm->spinlock);
dead = test_bit(node, dlm->domain_map);
spin_unlock(&dlm->spinlock);
return dead;
}
/* callers of the top-level api calls (dlmlock/dlmunlock) should /* callers of the top-level api calls (dlmlock/dlmunlock) should
* block on the dlm->reco.event when recovery is in progress. * block on the dlm->reco.event when recovery is in progress.
* the dlm recovery thread will set this state when it begins * the dlm recovery thread will set this state when it begins
...@@ -297,6 +318,7 @@ static void dlm_end_recovery(struct dlm_ctxt *dlm) ...@@ -297,6 +318,7 @@ static void dlm_end_recovery(struct dlm_ctxt *dlm)
static int dlm_do_recovery(struct dlm_ctxt *dlm) static int dlm_do_recovery(struct dlm_ctxt *dlm)
{ {
int status = 0; int status = 0;
int ret;
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
...@@ -343,10 +365,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) ...@@ -343,10 +365,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
goto master_here; goto master_here;
if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
/* choose a new master */ /* choose a new master, returns 0 if this node
if (!dlm_pick_recovery_master(dlm)) { * is the master, -EEXIST if it's another node.
* this does not return until a new master is chosen
* or recovery completes entirely. */
ret = dlm_pick_recovery_master(dlm);
if (!ret) {
/* already notified everyone. go. */ /* already notified everyone. go. */
dlm->reco.new_master = dlm->node_num;
goto master_here; goto master_here;
} }
mlog(0, "another node will master this recovery session.\n"); mlog(0, "another node will master this recovery session.\n");
...@@ -371,8 +396,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) ...@@ -371,8 +396,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
if (status < 0) { if (status < 0) {
mlog(ML_ERROR, "error %d remastering locks for node %u, " mlog(ML_ERROR, "error %d remastering locks for node %u, "
"retrying.\n", status, dlm->reco.dead_node); "retrying.\n", status, dlm->reco.dead_node);
/* yield a bit to allow any final network messages
* to get handled on remaining nodes */
msleep(100);
} else { } else {
/* success! see if any other nodes need recovery */ /* success! see if any other nodes need recovery */
mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n",
dlm->name, dlm->reco.dead_node, dlm->node_num);
dlm_reset_recovery(dlm); dlm_reset_recovery(dlm);
} }
dlm_end_recovery(dlm); dlm_end_recovery(dlm);
...@@ -477,7 +507,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -477,7 +507,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
BUG(); BUG();
break; break;
case DLM_RECO_NODE_DATA_DEAD: case DLM_RECO_NODE_DATA_DEAD:
mlog(0, "node %u died after " mlog(ML_NOTICE, "node %u died after "
"requesting recovery info for " "requesting recovery info for "
"node %u\n", ndata->node_num, "node %u\n", ndata->node_num,
dead_node); dead_node);
...@@ -485,6 +515,19 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -485,6 +515,19 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
// start all over // start all over
destroy = 1; destroy = 1;
status = -EAGAIN; status = -EAGAIN;
/* instead of spinning like crazy here,
* wait for the domain map to catch up
* with the network state. otherwise this
* can be hit hundreds of times before
* the node is really seen as dead. */
wait_event_timeout(dlm->dlm_reco_thread_wq,
dlm_is_node_dead(dlm,
ndata->node_num),
msecs_to_jiffies(1000));
mlog(0, "waited 1 sec for %u, "
"dead? %s\n", ndata->node_num,
dlm_is_node_dead(dlm, ndata->node_num) ?
"yes" : "no");
goto leave; goto leave;
case DLM_RECO_NODE_DATA_RECEIVING: case DLM_RECO_NODE_DATA_RECEIVING:
case DLM_RECO_NODE_DATA_REQUESTED: case DLM_RECO_NODE_DATA_REQUESTED:
...@@ -678,11 +721,27 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) ...@@ -678,11 +721,27 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
dlm = item->dlm; dlm = item->dlm;
dead_node = item->u.ral.dead_node; dead_node = item->u.ral.dead_node;
reco_master = item->u.ral.reco_master; reco_master = item->u.ral.reco_master;
mres = (struct dlm_migratable_lockres *)data;
if (dead_node != dlm->reco.dead_node ||
reco_master != dlm->reco.new_master) {
/* show extra debug info if the recovery state is messed */
mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), "
"request(dead=%u, master=%u)\n",
dlm->name, dlm->reco.dead_node, dlm->reco.new_master,
dead_node, reco_master);
mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u "
"entry[0]={c=%"MLFu64",l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n",
dlm->name, mres->lockname_len, mres->lockname, mres->master,
mres->num_locks, mres->total_locks, mres->flags,
mres->ml[0].cookie, mres->ml[0].list, mres->ml[0].flags,
mres->ml[0].type, mres->ml[0].convert_type,
mres->ml[0].highest_blocked, mres->ml[0].node);
BUG();
}
BUG_ON(dead_node != dlm->reco.dead_node); BUG_ON(dead_node != dlm->reco.dead_node);
BUG_ON(reco_master != dlm->reco.new_master); BUG_ON(reco_master != dlm->reco.new_master);
mres = (struct dlm_migratable_lockres *)data;
/* lock resources should have already been moved to the /* lock resources should have already been moved to the
* dlm->reco.resources list. now move items from that list * dlm->reco.resources list. now move items from that list
* to a temp list if the dead owner matches. note that the * to a temp list if the dead owner matches. note that the
...@@ -757,15 +816,18 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data) ...@@ -757,15 +816,18 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
continue; continue;
switch (ndata->state) { switch (ndata->state) {
/* should have moved beyond INIT but not to FINALIZE yet */
case DLM_RECO_NODE_DATA_INIT: case DLM_RECO_NODE_DATA_INIT:
case DLM_RECO_NODE_DATA_DEAD: case DLM_RECO_NODE_DATA_DEAD:
case DLM_RECO_NODE_DATA_DONE:
case DLM_RECO_NODE_DATA_FINALIZE_SENT: case DLM_RECO_NODE_DATA_FINALIZE_SENT:
mlog(ML_ERROR, "bad ndata state for node %u:" mlog(ML_ERROR, "bad ndata state for node %u:"
" state=%d\n", ndata->node_num, " state=%d\n", ndata->node_num,
ndata->state); ndata->state);
BUG(); BUG();
break; break;
/* these states are possible at this point, anywhere along
* the line of recovery */
case DLM_RECO_NODE_DATA_DONE:
case DLM_RECO_NODE_DATA_RECEIVING: case DLM_RECO_NODE_DATA_RECEIVING:
case DLM_RECO_NODE_DATA_REQUESTED: case DLM_RECO_NODE_DATA_REQUESTED:
case DLM_RECO_NODE_DATA_REQUESTING: case DLM_RECO_NODE_DATA_REQUESTING:
...@@ -799,13 +861,31 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm, ...@@ -799,13 +861,31 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
{ {
struct dlm_lock_resource *res; struct dlm_lock_resource *res;
struct list_head *iter, *iter2; struct list_head *iter, *iter2;
struct dlm_lock *lock;
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
list_for_each_safe(iter, iter2, &dlm->reco.resources) { list_for_each_safe(iter, iter2, &dlm->reco.resources) {
res = list_entry (iter, struct dlm_lock_resource, recovering); res = list_entry (iter, struct dlm_lock_resource, recovering);
/* always prune any $RECOVERY entries for dead nodes,
* otherwise hangs can occur during later recovery */
if (dlm_is_recovery_lock(res->lockname.name, if (dlm_is_recovery_lock(res->lockname.name,
res->lockname.len)) res->lockname.len)) {
spin_lock(&res->spinlock);
list_for_each_entry(lock, &res->granted, list) {
if (lock->ml.node == dead_node) {
mlog(0, "AHA! there was "
"a $RECOVERY lock for dead "
"node %u (%s)!\n",
dead_node, dlm->name);
list_del_init(&lock->list);
dlm_lock_put(lock);
break;
}
}
spin_unlock(&res->spinlock);
continue; continue;
}
if (res->owner == dead_node) { if (res->owner == dead_node) {
mlog(0, "found lockres owned by dead node while " mlog(0, "found lockres owned by dead node while "
"doing recovery for node %u. sending it.\n", "doing recovery for node %u. sending it.\n",
...@@ -1179,7 +1259,7 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data) ...@@ -1179,7 +1259,7 @@ static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
again: again:
ret = dlm_lockres_master_requery(dlm, res, &real_master); ret = dlm_lockres_master_requery(dlm, res, &real_master);
if (ret < 0) { if (ret < 0) {
mlog(0, "dlm_lockres_master_requery failure: %d\n", mlog(0, "dlm_lockres_master_requery ret=%d\n",
ret); ret);
goto again; goto again;
} }
...@@ -1757,6 +1837,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -1757,6 +1837,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
struct dlm_lock_resource *res; struct dlm_lock_resource *res;
int i; int i;
struct list_head *bucket; struct list_head *bucket;
struct dlm_lock *lock;
/* purge any stale mles */ /* purge any stale mles */
...@@ -1780,10 +1861,25 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -1780,10 +1861,25 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
bucket = &(dlm->resources[i]); bucket = &(dlm->resources[i]);
list_for_each(iter, bucket) { list_for_each(iter, bucket) {
res = list_entry (iter, struct dlm_lock_resource, list); res = list_entry (iter, struct dlm_lock_resource, list);
/* always prune any $RECOVERY entries for dead nodes,
* otherwise hangs can occur during later recovery */
if (dlm_is_recovery_lock(res->lockname.name, if (dlm_is_recovery_lock(res->lockname.name,
res->lockname.len)) res->lockname.len)) {
spin_lock(&res->spinlock);
list_for_each_entry(lock, &res->granted, list) {
if (lock->ml.node == dead_node) {
mlog(0, "AHA! there was "
"a $RECOVERY lock for dead "
"node %u (%s)!\n",
dead_node, dlm->name);
list_del_init(&lock->list);
dlm_lock_put(lock);
break;
}
}
spin_unlock(&res->spinlock);
continue; continue;
}
spin_lock(&res->spinlock); spin_lock(&res->spinlock);
/* zero the lvb if necessary */ /* zero the lvb if necessary */
dlm_revalidate_lvb(dlm, res, dead_node); dlm_revalidate_lvb(dlm, res, dead_node);
...@@ -1869,12 +1965,9 @@ void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data) ...@@ -1869,12 +1965,9 @@ void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data)
return; return;
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
set_bit(idx, dlm->live_nodes_map); set_bit(idx, dlm->live_nodes_map);
/* do NOT notify mle attached to the heartbeat events.
/* notify any mles attached to the heartbeat events */ * new nodes are not interesting in mastery until joined. */
dlm_hb_event_notify_attached(dlm, idx, 1);
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
dlm_put(dlm); dlm_put(dlm);
...@@ -1897,7 +1990,18 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st) ...@@ -1897,7 +1990,18 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st)
mlog(0, "unlockast for recovery lock fired!\n"); mlog(0, "unlockast for recovery lock fired!\n");
} }
/*
* dlm_pick_recovery_master will continually attempt to use
* dlmlock() on the special "$RECOVERY" lockres with the
* LKM_NOQUEUE flag to get an EX. every thread that enters
* this function on each node racing to become the recovery
* master will not stop attempting this until either:
* a) this node gets the EX (and becomes the recovery master),
* or b) dlm->reco.new_master gets set to some nodenum
* != O2NM_INVALID_NODE_NUM (another node will do the reco).
* so each time a recovery master is needed, the entire cluster
* will sync at this point. if the new master dies, that will
* be detected in dlm_do_recovery */
static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
{ {
enum dlm_status ret; enum dlm_status ret;
...@@ -1906,23 +2010,45 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) ...@@ -1906,23 +2010,45 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n", mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
retry: again:
memset(&lksb, 0, sizeof(lksb)); memset(&lksb, 0, sizeof(lksb));
ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY, ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast); DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
dlm->name, ret, lksb.status);
if (ret == DLM_NORMAL) { if (ret == DLM_NORMAL) {
mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n", mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
dlm->name, dlm->node_num); dlm->name, dlm->node_num);
/* I am master, send message to all nodes saying
* that I am beginning a recovery session */ /* got the EX lock. check to see if another node
* just became the reco master */
if (dlm_reco_master_ready(dlm)) {
mlog(0, "%s: got reco EX lock, but %u will "
"do the recovery\n", dlm->name,
dlm->reco.new_master);
status = -EEXIST;
} else {
status = dlm_send_begin_reco_message(dlm, status = dlm_send_begin_reco_message(dlm,
dlm->reco.dead_node); dlm->reco.dead_node);
/* this always succeeds */
BUG_ON(status);
/* set the new_master to this node */
spin_lock(&dlm->spinlock);
dlm->reco.new_master = dlm->node_num;
spin_unlock(&dlm->spinlock);
}
/* recovery lock is a special case. ast will not get fired, /* recovery lock is a special case. ast will not get fired,
* so just go ahead and unlock it. */ * so just go ahead and unlock it. */
ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm); ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm);
if (ret == DLM_DENIED) {
mlog(0, "got DLM_DENIED, trying LKM_CANCEL\n");
ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm);
}
if (ret != DLM_NORMAL) { if (ret != DLM_NORMAL) {
/* this would really suck. this could only happen /* this would really suck. this could only happen
* if there was a network error during the unlock * if there was a network error during the unlock
...@@ -1930,20 +2056,42 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) ...@@ -1930,20 +2056,42 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
* is actually "done" and the lock structure is * is actually "done" and the lock structure is
* even freed. we can continue, but only * even freed. we can continue, but only
* because this specific lock name is special. */ * because this specific lock name is special. */
mlog(0, "dlmunlock returned %d\n", ret); mlog(ML_ERROR, "dlmunlock returned %d\n", ret);
}
if (status < 0) {
mlog(0, "failed to send recovery message. "
"must retry with new node map.\n");
goto retry;
} }
} else if (ret == DLM_NOTQUEUED) { } else if (ret == DLM_NOTQUEUED) {
mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n", mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
dlm->name, dlm->node_num); dlm->name, dlm->node_num);
/* another node is master. wait on /* another node is master. wait on
* reco.new_master != O2NM_INVALID_NODE_NUM */ * reco.new_master != O2NM_INVALID_NODE_NUM
* for at most one second */
wait_event_timeout(dlm->dlm_reco_thread_wq,
dlm_reco_master_ready(dlm),
msecs_to_jiffies(1000));
if (!dlm_reco_master_ready(dlm)) {
mlog(0, "%s: reco master taking awhile\n",
dlm->name);
goto again;
}
/* another node has informed this one that it is reco master */
mlog(0, "%s: reco master %u is ready to recover %u\n",
dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
status = -EEXIST; status = -EEXIST;
} else {
struct dlm_lock_resource *res;
/* dlmlock returned something other than NOTQUEUED or NORMAL */
mlog(ML_ERROR, "%s: got %s from dlmlock($RECOVERY), "
"lksb.status=%s\n", dlm->name, dlm_errname(ret),
dlm_errname(lksb.status));
res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
DLM_RECOVERY_LOCK_NAME_LEN);
if (res) {
dlm_print_one_lock_resource(res);
dlm_lockres_put(res);
} else {
mlog(ML_ERROR, "recovery lock not found\n");
}
BUG();
} }
return status; return status;
...@@ -1982,7 +2130,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -1982,7 +2130,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
mlog(0, "not sending begin reco to self\n"); mlog(0, "not sending begin reco to self\n");
continue; continue;
} }
retry:
ret = -EINVAL; ret = -EINVAL;
mlog(0, "attempting to send begin reco msg to %d\n", mlog(0, "attempting to send begin reco msg to %d\n",
nodenum); nodenum);
...@@ -1991,8 +2139,17 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -1991,8 +2139,17 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
/* negative status is handled ok by caller here */ /* negative status is handled ok by caller here */
if (ret >= 0) if (ret >= 0)
ret = status; ret = status;
if (dlm_is_host_down(ret)) {
/* node is down. not involved in recovery
* so just keep going */
mlog(0, "%s: node %u was down when sending "
"begin reco msg (%d)\n", dlm->name, nodenum, ret);
ret = 0;
}
if (ret < 0) { if (ret < 0) {
struct dlm_lock_resource *res; struct dlm_lock_resource *res;
/* this is now a serious problem, possibly ENOMEM
* in the network stack. must retry */
mlog_errno(ret); mlog_errno(ret);
mlog(ML_ERROR, "begin reco of dlm %s to node %u " mlog(ML_ERROR, "begin reco of dlm %s to node %u "
" returned %d\n", dlm->name, nodenum, ret); " returned %d\n", dlm->name, nodenum, ret);
...@@ -2004,7 +2161,10 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) ...@@ -2004,7 +2161,10 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
} else { } else {
mlog(ML_ERROR, "recovery lock not found\n"); mlog(ML_ERROR, "recovery lock not found\n");
} }
break; /* sleep for a bit in hopes that we can avoid
* another ENOMEM */
msleep(100);
goto retry;
} }
} }
...@@ -2027,19 +2187,34 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) ...@@ -2027,19 +2187,34 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
spin_lock(&dlm->spinlock); spin_lock(&dlm->spinlock);
if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
mlog(0, "new_master already set to %u!\n", if (test_bit(dlm->reco.new_master, dlm->recovery_map)) {
dlm->reco.new_master); mlog(0, "%s: new_master %u died, changing "
"to %u\n", dlm->name, dlm->reco.new_master,
br->node_idx);
} else {
mlog(0, "%s: new_master %u NOT DEAD, changing "
"to %u\n", dlm->name, dlm->reco.new_master,
br->node_idx);
/* may not have seen the new master as dead yet */
}
} }
if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) { if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) {
mlog(0, "dead_node already set to %u!\n", mlog(ML_NOTICE, "%s: dead_node previously set to %u, "
dlm->reco.dead_node); "node %u changing it to %u\n", dlm->name,
dlm->reco.dead_node, br->node_idx, br->dead_node);
} }
dlm->reco.new_master = br->node_idx; dlm->reco.new_master = br->node_idx;
dlm->reco.dead_node = br->dead_node; dlm->reco.dead_node = br->dead_node;
if (!test_bit(br->dead_node, dlm->recovery_map)) { if (!test_bit(br->dead_node, dlm->recovery_map)) {
mlog(ML_ERROR, "recovery master %u sees %u as dead, but this " mlog(0, "recovery master %u sees %u as dead, but this "
"node has not yet. marking %u as dead\n", "node has not yet. marking %u as dead\n",
br->node_idx, br->dead_node, br->dead_node); br->node_idx, br->dead_node, br->dead_node);
if (!test_bit(br->dead_node, dlm->domain_map) ||
!test_bit(br->dead_node, dlm->live_nodes_map))
mlog(0, "%u not in domain/live_nodes map "
"so setting it in reco map manually\n",
br->dead_node);
set_bit(br->dead_node, dlm->recovery_map);
__dlm_hb_node_down(dlm, br->dead_node); __dlm_hb_node_down(dlm, br->dead_node);
} }
spin_unlock(&dlm->spinlock); spin_unlock(&dlm->spinlock);
......
...@@ -188,6 +188,19 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm, ...@@ -188,6 +188,19 @@ static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm,
actions &= ~(DLM_UNLOCK_REMOVE_LOCK| actions &= ~(DLM_UNLOCK_REMOVE_LOCK|
DLM_UNLOCK_REGRANT_LOCK| DLM_UNLOCK_REGRANT_LOCK|
DLM_UNLOCK_CLEAR_CONVERT_TYPE); DLM_UNLOCK_CLEAR_CONVERT_TYPE);
} else if (status == DLM_RECOVERING ||
status == DLM_MIGRATING ||
status == DLM_FORWARD) {
/* must clear the actions because this unlock
* is about to be retried. cannot free or do
* any list manipulation. */
mlog(0, "%s:%.*s: clearing actions, %s\n",
dlm->name, res->lockname.len,
res->lockname.name,
status==DLM_RECOVERING?"recovering":
(status==DLM_MIGRATING?"migrating":
"forward"));
actions = 0;
} }
if (flags & LKM_CANCEL) if (flags & LKM_CANCEL)
lock->cancel_pending = 0; lock->cancel_pending = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment