Commit cdd067e6 authored by unknown's avatar unknown

ndb - bug#31257

    handle partially complete LCP better in SR


storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp:
  remove partially complete LCP from "node" when doign removeNodeFromTable
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp:
  remove partially complete LCP from "node" when doign removeNodeFromTable
parent db693ec9
...@@ -318,6 +318,7 @@ public: ...@@ -318,6 +318,7 @@ public:
Uint8 noOfStartedChkpt; Uint8 noOfStartedChkpt;
MasterLCPConf::State lcpStateAtTakeOver; MasterLCPConf::State lcpStateAtTakeOver;
Uint32 m_remove_node_from_table_lcp_id;
}; };
typedef Ptr<NodeRecord> NodeRecordPtr; typedef Ptr<NodeRecord> NodeRecordPtr;
/**********************************************************************/ /**********************************************************************/
......
...@@ -4989,6 +4989,18 @@ void Dbdih::startRemoveFailedNode(Signal* signal, NodeRecordPtr failedNodePtr) ...@@ -4989,6 +4989,18 @@ void Dbdih::startRemoveFailedNode(Signal* signal, NodeRecordPtr failedNodePtr)
return; return;
} }
/**
* If node has node complete LCP
* we need to remove it as undo might not be complete
* bug#31257
*/
failedNodePtr.p->m_remove_node_from_table_lcp_id = RNIL;
if (c_lcpState.m_LCP_COMPLETE_REP_Counter_LQH.isWaitingFor(failedNodePtr.i))
{
jam();
failedNodePtr.p->m_remove_node_from_table_lcp_id = SYSFILE->latestLCP_ID;
}
jam(); jam();
signal->theData[0] = DihContinueB::ZREMOVE_NODE_FROM_TABLE; signal->theData[0] = DihContinueB::ZREMOVE_NODE_FROM_TABLE;
signal->theData[1] = failedNodePtr.i; signal->theData[1] = failedNodePtr.i;
...@@ -5630,6 +5642,11 @@ void Dbdih::removeNodeFromTable(Signal* signal, ...@@ -5630,6 +5642,11 @@ void Dbdih::removeNodeFromTable(Signal* signal,
return; return;
}//if }//if
NodeRecordPtr nodePtr;
nodePtr.i = nodeId;
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
const Uint32 lcpId = nodePtr.p->m_remove_node_from_table_lcp_id;
/** /**
* For each fragment * For each fragment
*/ */
...@@ -5637,7 +5654,6 @@ void Dbdih::removeNodeFromTable(Signal* signal, ...@@ -5637,7 +5654,6 @@ void Dbdih::removeNodeFromTable(Signal* signal,
Uint32 noOfRemovedLcpReplicas = 0; // No of replicas in LCP removed Uint32 noOfRemovedLcpReplicas = 0; // No of replicas in LCP removed
Uint32 noOfRemainingLcpReplicas = 0;// No of replicas in LCP remaining Uint32 noOfRemainingLcpReplicas = 0;// No of replicas in LCP remaining
//const Uint32 lcpId = SYSFILE->latestLCP_ID;
const bool lcpOngoingFlag = (tabPtr.p->tabLcpStatus== TabRecord::TLS_ACTIVE); const bool lcpOngoingFlag = (tabPtr.p->tabLcpStatus== TabRecord::TLS_ACTIVE);
const bool unlogged = (tabPtr.p->tabStorage != TabRecord::ST_NORMAL); const bool unlogged = (tabPtr.p->tabStorage != TabRecord::ST_NORMAL);
...@@ -5672,6 +5688,23 @@ void Dbdih::removeNodeFromTable(Signal* signal, ...@@ -5672,6 +5688,23 @@ void Dbdih::removeNodeFromTable(Signal* signal,
noOfRemovedLcpReplicas ++; noOfRemovedLcpReplicas ++;
replicaPtr.p->lcpOngoingFlag = false; replicaPtr.p->lcpOngoingFlag = false;
} }
if (lcpId != RNIL)
{
jam();
Uint32 lcpNo = prevLcpNo(replicaPtr.p->nextLcp);
if (replicaPtr.p->lcpStatus[lcpNo] == ZVALID &&
replicaPtr.p->lcpId[lcpNo] == SYSFILE->latestLCP_ID)
{
jam();
replicaPtr.p->lcpStatus[lcpNo] = ZINVALID;
replicaPtr.p->lcpId[lcpNo] = 0;
replicaPtr.p->nextLcp = lcpNo;
ndbout_c("REMOVING lcp: %u from table: %u frag: %u node: %u",
SYSFILE->latestLCP_ID,
tabPtr.i, fragNo, nodeId);
}
}
} }
} }
if (!found) if (!found)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment