Commit 484ec098 authored by unknown's avatar unknown

Merge perch.ndb.mysql.com:/home/jonas/src/41-work

into  perch.ndb.mysql.com:/home/jonas/src/mysql-4.1-ndb
parents 5823817a 9c2562d1
...@@ -1044,6 +1044,8 @@ private: ...@@ -1044,6 +1044,8 @@ private:
void removeStoredReplica(FragmentstorePtr regFragptr, void removeStoredReplica(FragmentstorePtr regFragptr,
ReplicaRecordPtr replicaPtr); ReplicaRecordPtr replicaPtr);
void searchStoredReplicas(FragmentstorePtr regFragptr); void searchStoredReplicas(FragmentstorePtr regFragptr);
bool setup_create_replica(FragmentstorePtr, CreateReplicaRecord*,
ConstPtr<ReplicaRecord>);
void updateNodeInfo(FragmentstorePtr regFragptr); void updateNodeInfo(FragmentstorePtr regFragptr);
//------------------------------------ //------------------------------------
......
...@@ -8344,14 +8344,30 @@ Dbdih::resetReplicaSr(TabRecordPtr tabPtr){ ...@@ -8344,14 +8344,30 @@ Dbdih::resetReplicaSr(TabRecordPtr tabPtr){
resetReplicaLcp(replicaPtr.p, newestRestorableGCI); resetReplicaLcp(replicaPtr.p, newestRestorableGCI);
/* ----------------------------------------------------------------- /**
* LINK THE REPLICA INTO THE STORED REPLICA LIST. WE WILL USE THIS * Make sure we can also find REDO for restoring replica...
* NODE AS A STORED REPLICA. */
* WE MUST FIRST LINK IT OUT OF THE LIST OF OLD STORED REPLICAS. {
* --------------------------------------------------------------- */ CreateReplicaRecord createReplica;
removeOldStoredReplica(fragPtr, replicaPtr); ConstPtr<ReplicaRecord> constReplicaPtr;
linkStoredReplica(fragPtr, replicaPtr); constReplicaPtr.i = replicaPtr.i;
constReplicaPtr.p = replicaPtr.p;
if (setup_create_replica(fragPtr,
&createReplica, constReplicaPtr))
{
removeOldStoredReplica(fragPtr, replicaPtr);
linkStoredReplica(fragPtr, replicaPtr);
}
else
{
infoEvent("Forcing take-over of node %d due to unsufficient REDO"
" for table %d fragment: %d",
nodePtr.i, tabPtr.i, i);
setNodeActiveStatus(nodePtr.i,
Sysfile::NS_NotActive_NotTakenOver);
}
}
} }
default: default:
jam(); jam();
...@@ -9399,6 +9415,7 @@ void Dbdih::calculateKeepGciLab(Signal* signal, Uint32 tableId, Uint32 fragId) ...@@ -9399,6 +9415,7 @@ void Dbdih::calculateKeepGciLab(Signal* signal, Uint32 tableId, Uint32 fragId)
FragmentstorePtr fragPtr; FragmentstorePtr fragPtr;
getFragstore(tabPtr.p, fragId, fragPtr); getFragstore(tabPtr.p, fragId, fragPtr);
checkKeepGci(tabPtr, fragId, fragPtr.p, fragPtr.p->storedReplicas); checkKeepGci(tabPtr, fragId, fragPtr.p, fragPtr.p->storedReplicas);
checkKeepGci(tabPtr, fragId, fragPtr.p, fragPtr.p->oldStoredReplicas);
fragId++; fragId++;
if (fragId >= tabPtr.p->totalfragments) { if (fragId >= tabPtr.p->totalfragments) {
jam(); jam();
...@@ -12281,16 +12298,75 @@ void Dbdih::removeTooNewCrashedReplicas(ReplicaRecordPtr rtnReplicaPtr) ...@@ -12281,16 +12298,75 @@ void Dbdih::removeTooNewCrashedReplicas(ReplicaRecordPtr rtnReplicaPtr)
/* CHECKPOINT WITHOUT NEEDING ANY EXTRA LOGGING FACILITIES.*/ /* CHECKPOINT WITHOUT NEEDING ANY EXTRA LOGGING FACILITIES.*/
/* A MAXIMUM OF FOUR NODES IS RETRIEVED. */ /* A MAXIMUM OF FOUR NODES IS RETRIEVED. */
/*************************************************************************/ /*************************************************************************/
bool
Dbdih::setup_create_replica(FragmentstorePtr fragPtr,
CreateReplicaRecord* createReplicaPtrP,
ConstPtr<ReplicaRecord> replicaPtr)
{
createReplicaPtrP->dataNodeId = replicaPtr.p->procNode;
createReplicaPtrP->replicaRec = replicaPtr.i;
/* ----------------------------------------------------------------- */
/* WE NEED TO SEARCH FOR A PROPER LOCAL CHECKPOINT TO USE FOR THE */
/* SYSTEM RESTART. */
/* ----------------------------------------------------------------- */
Uint32 startGci;
Uint32 startLcpNo;
Uint32 stopGci = SYSFILE->newestRestorableGCI;
bool result = findStartGci(replicaPtr,
stopGci,
startGci,
startLcpNo);
if (!result)
{
jam();
/* --------------------------------------------------------------- */
/* WE COULD NOT FIND ANY LOCAL CHECKPOINT. THE FRAGMENT THUS DO NOT*/
/* CONTAIN ANY VALID LOCAL CHECKPOINT. IT DOES HOWEVER CONTAIN A */
/* VALID FRAGMENT LOG. THUS BY FIRST CREATING THE FRAGMENT AND THEN*/
/* EXECUTING THE FRAGMENT LOG WE CAN CREATE THE FRAGMENT AS */
/* DESIRED. THIS SHOULD ONLY OCCUR AFTER CREATING A FRAGMENT. */
/* */
/* TO INDICATE THAT NO LOCAL CHECKPOINT IS TO BE USED WE SET THE */
/* LOCAL CHECKPOINT TO ZNIL. */
/* --------------------------------------------------------------- */
createReplicaPtrP->lcpNo = ZNIL;
}
else
{
jam();
/* --------------------------------------------------------------- */
/* WE FOUND A PROPER LOCAL CHECKPOINT TO RESTART FROM. */
/* SET LOCAL CHECKPOINT ID AND LOCAL CHECKPOINT NUMBER. */
/* --------------------------------------------------------------- */
createReplicaPtrP->lcpNo = startLcpNo;
arrGuard(startLcpNo, MAX_LCP_STORED);
createReplicaPtrP->createLcpId = replicaPtr.p->lcpId[startLcpNo];
}//if
/* ----------------------------------------------------------------- */
/* WE HAVE EITHER FOUND A LOCAL CHECKPOINT OR WE ARE PLANNING TO */
/* EXECUTE THE LOG FROM THE INITIAL CREATION OF THE TABLE. IN BOTH */
/* CASES WE NEED TO FIND A SET OF LOGS THAT CAN EXECUTE SUCH THAT */
/* WE RECOVER TO THE SYSTEM RESTART GLOBAL CHECKPOINT. */
/* -_--------------------------------------------------------------- */
return findLogNodes(createReplicaPtrP, fragPtr, startGci, stopGci);
}
void Dbdih::searchStoredReplicas(FragmentstorePtr fragPtr) void Dbdih::searchStoredReplicas(FragmentstorePtr fragPtr)
{ {
Uint32 nextReplicaPtrI; Uint32 nextReplicaPtrI;
ConstPtr<ReplicaRecord> replicaPtr; Ptr<ReplicaRecord> replicaPtr;
replicaPtr.i = fragPtr.p->storedReplicas; replicaPtr.i = fragPtr.p->storedReplicas;
while (replicaPtr.i != RNIL) { while (replicaPtr.i != RNIL) {
jam(); jam();
ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord); ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
nextReplicaPtrI = replicaPtr.p->nextReplica; nextReplicaPtrI = replicaPtr.p->nextReplica;
ConstPtr<ReplicaRecord> constReplicaPtr;
constReplicaPtr.i = replicaPtr.i;
constReplicaPtr.p = replicaPtr.p;
NodeRecordPtr nodePtr; NodeRecordPtr nodePtr;
nodePtr.i = replicaPtr.p->procNode; nodePtr.i = replicaPtr.p->procNode;
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord); ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
...@@ -12310,69 +12386,13 @@ void Dbdih::searchStoredReplicas(FragmentstorePtr fragPtr) ...@@ -12310,69 +12386,13 @@ void Dbdih::searchStoredReplicas(FragmentstorePtr fragPtr)
createReplicaPtr.i = cnoOfCreateReplicas; createReplicaPtr.i = cnoOfCreateReplicas;
ptrCheckGuard(createReplicaPtr, 4, createReplicaRecord); ptrCheckGuard(createReplicaPtr, 4, createReplicaRecord);
cnoOfCreateReplicas++; cnoOfCreateReplicas++;
createReplicaPtr.p->dataNodeId = replicaPtr.p->procNode;
createReplicaPtr.p->replicaRec = replicaPtr.i;
/* ----------------------------------------------------------------- */
/* WE NEED TO SEARCH FOR A PROPER LOCAL CHECKPOINT TO USE FOR THE */
/* SYSTEM RESTART. */
/* ----------------------------------------------------------------- */
Uint32 startGci;
Uint32 startLcpNo;
Uint32 stopGci = SYSFILE->newestRestorableGCI;
bool result = findStartGci(replicaPtr,
stopGci,
startGci,
startLcpNo);
if (!result) {
jam();
/* --------------------------------------------------------------- */
/* WE COULD NOT FIND ANY LOCAL CHECKPOINT. THE FRAGMENT THUS DO NOT*/
/* CONTAIN ANY VALID LOCAL CHECKPOINT. IT DOES HOWEVER CONTAIN A */
/* VALID FRAGMENT LOG. THUS BY FIRST CREATING THE FRAGMENT AND THEN*/
/* EXECUTING THE FRAGMENT LOG WE CAN CREATE THE FRAGMENT AS */
/* DESIRED. THIS SHOULD ONLY OCCUR AFTER CREATING A FRAGMENT. */
/* */
/* TO INDICATE THAT NO LOCAL CHECKPOINT IS TO BE USED WE SET THE */
/* LOCAL CHECKPOINT TO ZNIL. */
/* --------------------------------------------------------------- */
createReplicaPtr.p->lcpNo = ZNIL;
} else {
jam();
/* --------------------------------------------------------------- */
/* WE FOUND A PROPER LOCAL CHECKPOINT TO RESTART FROM. */
/* SET LOCAL CHECKPOINT ID AND LOCAL CHECKPOINT NUMBER. */
/* --------------------------------------------------------------- */
createReplicaPtr.p->lcpNo = startLcpNo;
arrGuard(startLcpNo, MAX_LCP_STORED);
createReplicaPtr.p->createLcpId = replicaPtr.p->lcpId[startLcpNo];
}//if
if(ERROR_INSERTED(7073) || ERROR_INSERTED(7074)){
jam();
nodePtr.p->nodeStatus = NodeRecord::DEAD;
}
/* ----------------------------------------------------------------- */
/* WE HAVE EITHER FOUND A LOCAL CHECKPOINT OR WE ARE PLANNING TO */
/* EXECUTE THE LOG FROM THE INITIAL CREATION OF THE TABLE. IN BOTH */
/* CASES WE NEED TO FIND A SET OF LOGS THAT CAN EXECUTE SUCH THAT */
/* WE RECOVER TO THE SYSTEM RESTART GLOBAL CHECKPOINT. */
/* -_--------------------------------------------------------------- */
if (!findLogNodes(createReplicaPtr.p, fragPtr, startGci, stopGci)) {
jam();
/* --------------------------------------------------------------- */
/* WE WERE NOT ABLE TO FIND ANY WAY OF RESTORING THIS REPLICA. */
/* THIS IS A POTENTIAL SYSTEM ERROR. */
/* --------------------------------------------------------------- */
cnoOfCreateReplicas--;
return;
}//if
if(ERROR_INSERTED(7073) || ERROR_INSERTED(7074)){
jam();
nodePtr.p->nodeStatus = NodeRecord::ALIVE;
}
/**
* Should have been checked in resetReplicaSr
*/
ndbrequire(setup_create_replica(fragPtr,
createReplicaPtr.p,
constReplicaPtr));
break; break;
} }
default: default:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment