ocfs2: dlm_remaster_locks() should never exit without completing

We cannot restart recovery. Once we begin to recover a node, keep the state
of the recovery intact and follow through, regardless of any other node
deaths that may occur.

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 00209f4..22a0b05 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -480,6 +480,7 @@
 
 	status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
 	if (status < 0) {
+		/* we should never hit this anymore */
 		mlog(ML_ERROR, "error %d remastering locks for node %u, "
 		     "retrying.\n", status, dlm->reco.dead_node);
 		/* yield a bit to allow any final network messages
@@ -506,9 +507,16 @@
 	int destroy = 0;
 	int pass = 0;
 
-	status = dlm_init_recovery_area(dlm, dead_node);
-	if (status < 0)
-		goto leave;
+	do {
+		/* we have become recovery master.  there is no escaping
+		 * this, so keep retrying until the recovery area is set up. */
+		status = dlm_init_recovery_area(dlm, dead_node);
+		if (status < 0) {
+			mlog(ML_ERROR, "%s: failed to alloc recovery area, "
+			     "retrying\n", dlm->name);
+			msleep(1000);
+		}
+	} while (status != 0);
 
 	/* safe to access the node data list without a lock, since this
 	 * process is the only one to change the list */
@@ -525,16 +533,36 @@
 			continue;
 		}
 
-		status = dlm_request_all_locks(dlm, ndata->node_num, dead_node);
-		if (status < 0) {
-			mlog_errno(status);
-			if (dlm_is_host_down(status))
-				ndata->state = DLM_RECO_NODE_DATA_DEAD;
-			else {
-				destroy = 1;
-				goto leave;
+		do {
+			status = dlm_request_all_locks(dlm, ndata->node_num,
+						       dead_node);
+			if (status < 0) {
+				mlog_errno(status);
+				if (dlm_is_host_down(status)) {
+					/* node died, ignore it for recovery */
+					status = 0;
+					ndata->state = DLM_RECO_NODE_DATA_DEAD;
+					/* wait for the domain map to catch up
+					 * with the network state. */
+					wait_event_timeout(dlm->dlm_reco_thread_wq,
+							   dlm_is_node_dead(dlm,
+								ndata->node_num),
+							   msecs_to_jiffies(1000));
+					mlog(0, "waited 1 sec for %u, "
+					     "dead? %s\n", ndata->node_num,
+					     dlm_is_node_dead(dlm, ndata->node_num) ?
+					     "yes" : "no");
+				} else {
+					/* -ENOMEM on the other node */
+					mlog(0, "%s: node %u returned "
+					     "%d during recovery, retrying "
+					     "after a short wait\n",
+					     dlm->name, ndata->node_num,
+					     status);
+					msleep(100);
+				}
 			}
-		}
+		} while (status != 0);
 
 		switch (ndata->state) {
 			case DLM_RECO_NODE_DATA_INIT:
@@ -546,10 +574,9 @@
 				mlog(0, "node %u died after requesting "
 				     "recovery info for node %u\n",
 				     ndata->node_num, dead_node);
-				// start all over
-				destroy = 1;
-				status = -EAGAIN;
-				goto leave;
+				/* fine.  don't need this node's info.
+				 * continue without it. */
+				break;
 			case DLM_RECO_NODE_DATA_REQUESTING:
 				ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
 				mlog(0, "now receiving recovery data from "
@@ -593,28 +620,12 @@
 					BUG();
 					break;
 				case DLM_RECO_NODE_DATA_DEAD:
-					mlog(ML_NOTICE, "node %u died after "
+					mlog(0, "node %u died after "
 					     "requesting recovery info for "
 					     "node %u\n", ndata->node_num,
 					     dead_node);
 					spin_unlock(&dlm_reco_state_lock);
-					// start all over
-					destroy = 1;
-					status = -EAGAIN;
-					/* instead of spinning like crazy here,
-					 * wait for the domain map to catch up
-					 * with the network state.  otherwise this
-					 * can be hit hundreds of times before
-					 * the node is really seen as dead. */
-					wait_event_timeout(dlm->dlm_reco_thread_wq,
-							   dlm_is_node_dead(dlm,
-								ndata->node_num),
-							   msecs_to_jiffies(1000));
-					mlog(0, "waited 1 sec for %u, "
-					     "dead? %s\n", ndata->node_num,
-					     dlm_is_node_dead(dlm, ndata->node_num) ?
-					     "yes" : "no");
-					goto leave;
+					break;
 				case DLM_RECO_NODE_DATA_RECEIVING:
 				case DLM_RECO_NODE_DATA_REQUESTED:
 					mlog(0, "%s: node %u still in state %s\n",
@@ -659,7 +670,7 @@
 			     jiffies, dlm->reco.dead_node,
 			     dlm->node_num, dlm->reco.new_master);
 			destroy = 1;
-			status = ret;
+			status = 0;
 			/* rescan everything marked dirty along the way */
 			dlm_kick_thread(dlm, NULL);
 			break;
@@ -672,7 +683,6 @@
 
 	}
 
-leave:
 	if (destroy)
 		dlm_destroy_recovery_area(dlm, dead_node);
 
@@ -832,24 +842,22 @@
 
 	if (dead_node != dlm->reco.dead_node ||
 	    reco_master != dlm->reco.new_master) {
-		/* show extra debug info if the recovery state is messed */
-		mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), "
-		     "request(dead=%u, master=%u)\n",
-		     dlm->name, dlm->reco.dead_node, dlm->reco.new_master,
-		     dead_node, reco_master);
-		mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u "
-		     "entry[0]={c=%u:%llu,l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n",
-		     dlm->name, mres->lockname_len, mres->lockname, mres->master,
-		     mres->num_locks, mres->total_locks, mres->flags,
-		     dlm_get_lock_cookie_node(mres->ml[0].cookie),
-		     dlm_get_lock_cookie_seq(mres->ml[0].cookie),
-		     mres->ml[0].list, mres->ml[0].flags,
-		     mres->ml[0].type, mres->ml[0].convert_type,
-		     mres->ml[0].highest_blocked, mres->ml[0].node);
-		BUG();
+		/* worker could have been created before the recovery master
+		 * died.  if so, do not continue, but do not error. */
+		if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
+			mlog(ML_NOTICE, "%s: will not send recovery state, "
+			     "recovery master %u died, thread=(dead=%u,mas=%u)"
+			     " current=(dead=%u,mas=%u)\n", dlm->name,
+			     reco_master, dead_node, reco_master,
+			     dlm->reco.dead_node, dlm->reco.new_master);
+		} else {
+			mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, "
+			     "master=%u), request(dead=%u, master=%u)\n",
+			     dlm->name, dlm->reco.dead_node,
+			     dlm->reco.new_master, dead_node, reco_master);
+		}
+		goto leave;
 	}
-	BUG_ON(dead_node != dlm->reco.dead_node);
-	BUG_ON(reco_master != dlm->reco.new_master);
 
 	/* lock resources should have already been moved to the
  	 * dlm->reco.resources list.  now move items from that list
@@ -889,7 +897,7 @@
 			     dlm->name, reco_master, dead_node, ret);
 		}
 	}
-
+leave:
 	free_page((unsigned long)data);
 }