ndb - bug#20895

  Fix occasional LCP hang.
  Make sure to consider only alive nodes in startNextChkpt.
parent 0d360045
...@@ -9561,15 +9561,19 @@ void Dbdih::startNextChkpt(Signal* signal) ...@@ -9561,15 +9561,19 @@ void Dbdih::startNextChkpt(Signal* signal)
nodePtr.i = replicaPtr.p->procNode; nodePtr.i = replicaPtr.p->procNode;
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord); ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
if (c_lcpState.m_participatingLQH.get(nodePtr.i))
{
if (replicaPtr.p->lcpOngoingFlag && if (replicaPtr.p->lcpOngoingFlag &&
replicaPtr.p->lcpIdStarted < lcpId) { replicaPtr.p->lcpIdStarted < lcpId)
{
jam(); jam();
//------------------------------------------------------------------- //-------------------------------------------------------------------
// We have found a replica on a node that performs local checkpoint // We have found a replica on a node that performs local checkpoint
// that is alive and that have not yet been started. // that is alive and that have not yet been started.
//------------------------------------------------------------------- //-------------------------------------------------------------------
if (nodePtr.p->noOfStartedChkpt < 2) { if (nodePtr.p->noOfStartedChkpt < 2)
{
jam(); jam();
/** /**
* Send LCP_FRAG_ORD to LQH * Send LCP_FRAG_ORD to LQH
...@@ -9587,7 +9591,9 @@ void Dbdih::startNextChkpt(Signal* signal) ...@@ -9587,7 +9591,9 @@ void Dbdih::startNextChkpt(Signal* signal)
nodePtr.p->noOfStartedChkpt = i + 1; nodePtr.p->noOfStartedChkpt = i + 1;
sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]); sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
} else if (nodePtr.p->noOfQueuedChkpt < 2) { }
else if (nodePtr.p->noOfQueuedChkpt < 2)
{
jam(); jam();
/** /**
* Put LCP_FRAG_ORD "in queue" * Put LCP_FRAG_ORD "in queue"
...@@ -9603,10 +9609,13 @@ void Dbdih::startNextChkpt(Signal* signal) ...@@ -9603,10 +9609,13 @@ void Dbdih::startNextChkpt(Signal* signal)
nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId; nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i; nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
nodePtr.p->noOfQueuedChkpt = i + 1; nodePtr.p->noOfQueuedChkpt = i + 1;
} else { }
else
{
jam(); jam();
if(save){ if(save)
{
/** /**
* Stop increasing value on first that was "full" * Stop increasing value on first that was "full"
*/ */
...@@ -9615,7 +9624,8 @@ void Dbdih::startNextChkpt(Signal* signal) ...@@ -9615,7 +9624,8 @@ void Dbdih::startNextChkpt(Signal* signal)
} }
busyNodes.set(nodePtr.i); busyNodes.set(nodePtr.i);
if(busyNodes.count() == lcpNodes){ if(busyNodes.count() == lcpNodes)
{
/** /**
* There were no possibility to start the local checkpoint * There were no possibility to start the local checkpoint
* and it was not possible to queue it up. In this case we * and it was not possible to queue it up. In this case we
...@@ -9628,6 +9638,7 @@ void Dbdih::startNextChkpt(Signal* signal) ...@@ -9628,6 +9638,7 @@ void Dbdih::startNextChkpt(Signal* signal)
}//if }//if
} }
}//while }//while
}
curr.fragmentId++; curr.fragmentId++;
if (curr.fragmentId >= tabPtr.p->totalfragments) { if (curr.fragmentId >= tabPtr.p->totalfragments) {
jam(); jam();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment