Commit 6799266b authored by joreland@mysql.com's avatar joreland@mysql.com

Merge mysql.com:/home/jonas/src/mysql-4.1

into mysql.com:/home/jonas/src/mysql-5.0
parents 5ffca20d 27f7a6c4
...@@ -7090,7 +7090,7 @@ void Dbacc::checkSendLcpConfLab(Signal* signal) ...@@ -7090,7 +7090,7 @@ void Dbacc::checkSendLcpConfLab(Signal* signal)
break; break;
}//switch }//switch
lcpConnectptr.p->noOfLcpConf++; lcpConnectptr.p->noOfLcpConf++;
ndbrequire(lcpConnectptr.p->noOfLcpConf <= 2); ndbrequire(lcpConnectptr.p->noOfLcpConf <= 4);
fragrecptr.p->fragState = ACTIVEFRAG; fragrecptr.p->fragState = ACTIVEFRAG;
rlpPageptr.i = fragrecptr.p->zeroPagePtr; rlpPageptr.i = fragrecptr.p->zeroPagePtr;
ptrCheckGuard(rlpPageptr, cpagesize, page8); ptrCheckGuard(rlpPageptr, cpagesize, page8);
...@@ -7108,7 +7108,7 @@ void Dbacc::checkSendLcpConfLab(Signal* signal) ...@@ -7108,7 +7108,7 @@ void Dbacc::checkSendLcpConfLab(Signal* signal)
}//for }//for
signal->theData[0] = fragrecptr.p->lcpLqhPtr; signal->theData[0] = fragrecptr.p->lcpLqhPtr;
sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPCONF, signal, 1, JBB); sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPCONF, signal, 1, JBB);
if (lcpConnectptr.p->noOfLcpConf == 2) { if (lcpConnectptr.p->noOfLcpConf == 4) {
jam(); jam();
releaseLcpConnectRec(signal); releaseLcpConnectRec(signal);
rootfragrecptr.i = fragrecptr.p->myroot; rootfragrecptr.i = fragrecptr.p->myroot;
...@@ -7139,6 +7139,13 @@ void Dbacc::execACC_CONTOPREQ(Signal* signal) ...@@ -7139,6 +7139,13 @@ void Dbacc::execACC_CONTOPREQ(Signal* signal)
/* LOCAL FRAG ID */ /* LOCAL FRAG ID */
tresult = 0; tresult = 0;
ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec); ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
if(ERROR_INSERTED(3002) && lcpConnectptr.p->noOfLcpConf < 2)
{
sendSignalWithDelay(cownBlockref, GSN_ACC_CONTOPREQ, signal, 300,
signal->getLength());
return;
}
ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE); ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
rootfragrecptr.i = lcpConnectptr.p->rootrecptr; rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec); ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
...@@ -7172,6 +7179,15 @@ void Dbacc::execACC_CONTOPREQ(Signal* signal) ...@@ -7172,6 +7179,15 @@ void Dbacc::execACC_CONTOPREQ(Signal* signal)
}//while }//while
signal->theData[0] = fragrecptr.p->lcpLqhPtr; signal->theData[0] = fragrecptr.p->lcpLqhPtr;
sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_CONTOPCONF, signal, 1, JBA); sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_CONTOPCONF, signal, 1, JBA);
lcpConnectptr.p->noOfLcpConf++;
if (lcpConnectptr.p->noOfLcpConf == 4) {
jam();
releaseLcpConnectRec(signal);
rootfragrecptr.i = fragrecptr.p->myroot;
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
rootfragrecptr.p->rootState = ACTIVEROOT;
}//if
return; /* ALL QUEUED OPERATION ARE RESTARTED IF NEEDED. */ return; /* ALL QUEUED OPERATION ARE RESTARTED IF NEEDED. */
}//Dbacc::execACC_CONTOPREQ() }//Dbacc::execACC_CONTOPREQ()
......
...@@ -966,7 +966,6 @@ public: ...@@ -966,7 +966,6 @@ public:
enum LcpState { enum LcpState {
LCP_IDLE = 0, LCP_IDLE = 0,
LCP_STARTED = 1,
LCP_COMPLETED = 2, LCP_COMPLETED = 2,
LCP_WAIT_FRAGID = 3, LCP_WAIT_FRAGID = 3,
LCP_WAIT_TUP_PREPLCP = 4, LCP_WAIT_TUP_PREPLCP = 4,
...@@ -2265,7 +2264,7 @@ private: ...@@ -2265,7 +2264,7 @@ private:
void sendCopyActiveConf(Signal* signal,Uint32 tableId); void sendCopyActiveConf(Signal* signal,Uint32 tableId);
void checkLcpCompleted(Signal* signal); void checkLcpCompleted(Signal* signal);
void checkLcpHoldop(Signal* signal); void checkLcpHoldop(Signal* signal);
void checkLcpStarted(Signal* signal); bool checkLcpStarted(Signal* signal);
void checkLcpTupprep(Signal* signal); void checkLcpTupprep(Signal* signal);
void getNextFragForLcp(Signal* signal); void getNextFragForLcp(Signal* signal);
void initLcpLocAcc(Signal* signal, Uint32 fragId); void initLcpLocAcc(Signal* signal, Uint32 fragId);
......
...@@ -10426,8 +10426,8 @@ void Dblqh::execTUP_LCPSTARTED(Signal* signal) ...@@ -10426,8 +10426,8 @@ void Dblqh::execTUP_LCPSTARTED(Signal* signal)
void Dblqh::lcpStartedLab(Signal* signal) void Dblqh::lcpStartedLab(Signal* signal)
{ {
checkLcpStarted(signal); if (checkLcpStarted(signal))
if (lcpPtr.p->lcpState == LcpRecord::LCP_STARTED) { {
jam(); jam();
/* ---------------------------------------------------------------------- /* ----------------------------------------------------------------------
* THE LOCAL CHECKPOINT HAS BEEN STARTED. IT IS NOW TIME TO * THE LOCAL CHECKPOINT HAS BEEN STARTED. IT IS NOW TIME TO
...@@ -10507,26 +10507,7 @@ void Dblqh::execLQH_RESTART_OP(Signal* signal) ...@@ -10507,26 +10507,7 @@ void Dblqh::execLQH_RESTART_OP(Signal* signal)
lcpPtr.i = signal->theData[1]; lcpPtr.i = signal->theData[1];
ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord); ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
ndbrequire(fragptr.p->fragStatus == Fragrecord::BLOCKED); ndbrequire(fragptr.p->fragStatus == Fragrecord::BLOCKED);
if (lcpPtr.p->lcpState == LcpRecord::LCP_STARTED) { restartOperationsLab(signal);
jam();
/***********************************************************************/
/* THIS SIGNAL CAN ONLY BE RECEIVED WHEN FRAGMENT IS BLOCKED AND
* THE LOCAL CHECKPOINT HAS BEEN STARTED. THE BLOCKING WILL BE
* REMOVED AS SOON AS ALL OPERATIONS HAVE BEEN STARTED.
***********************************************************************/
restartOperationsLab(signal);
} else if (lcpPtr.p->lcpState == LcpRecord::LCP_BLOCKED_COMP) {
jam();
/*******************************************************************>
* THE CHECKPOINT IS COMPLETED BUT HAS NOT YET STARTED UP
* ALL OPERATIONS AGAIN.
* WE PERFORM THIS START-UP BEFORE CONTINUING WITH THE NEXT
* FRAGMENT OF THE LOCAL CHECKPOINT TO AVOID ANY STRANGE ERRORS.
*******************************************************************> */
restartOperationsLab(signal);
} else {
ndbrequire(false);
}
}//Dblqh::execLQH_RESTART_OP() }//Dblqh::execLQH_RESTART_OP()
void Dblqh::restartOperationsLab(Signal* signal) void Dblqh::restartOperationsLab(Signal* signal)
...@@ -11075,7 +11056,8 @@ void Dblqh::checkLcpHoldop(Signal* signal) ...@@ -11075,7 +11056,8 @@ void Dblqh::checkLcpHoldop(Signal* signal)
* *
* SUBROUTINE SHORT NAME = CLS * SUBROUTINE SHORT NAME = CLS
* ========================================================================== */ * ========================================================================== */
void Dblqh::checkLcpStarted(Signal* signal) bool
Dblqh::checkLcpStarted(Signal* signal)
{ {
LcpLocRecordPtr clsLcpLocptr; LcpLocRecordPtr clsLcpLocptr;
...@@ -11085,7 +11067,7 @@ void Dblqh::checkLcpStarted(Signal* signal) ...@@ -11085,7 +11067,7 @@ void Dblqh::checkLcpStarted(Signal* signal)
do { do {
ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord); ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_WAIT_STARTED){ if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_WAIT_STARTED){
return; return false;
}//if }//if
clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc; clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc;
i++; i++;
...@@ -11096,12 +11078,13 @@ void Dblqh::checkLcpStarted(Signal* signal) ...@@ -11096,12 +11078,13 @@ void Dblqh::checkLcpStarted(Signal* signal)
do { do {
ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord); ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::TUP_WAIT_STARTED){ if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::TUP_WAIT_STARTED){
return; return false;
}//if }//if
clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc; clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc;
i++; i++;
} while (clsLcpLocptr.i != RNIL); } while (clsLcpLocptr.i != RNIL);
lcpPtr.p->lcpState = LcpRecord::LCP_STARTED;
return true;
}//Dblqh::checkLcpStarted() }//Dblqh::checkLcpStarted()
/* ========================================================================== /* ==========================================================================
...@@ -11262,20 +11245,12 @@ void Dblqh::sendAccContOp(Signal* signal) ...@@ -11262,20 +11245,12 @@ void Dblqh::sendAccContOp(Signal* signal)
do { do {
ptrCheckGuard(sacLcpLocptr, clcpLocrecFileSize, lcpLocRecord); ptrCheckGuard(sacLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
sacLcpLocptr.p->accContCounter = 0; sacLcpLocptr.p->accContCounter = 0;
if(sacLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_STARTED){ /* ------------------------------------------------------------------- */
/* ------------------------------------------------------------------- */ /*SEND START OPERATIONS TO ACC AGAIN */
/*SEND START OPERATIONS TO ACC AGAIN */ /* ------------------------------------------------------------------- */
/* ------------------------------------------------------------------- */ signal->theData[0] = lcpPtr.p->lcpAccptr;
signal->theData[0] = lcpPtr.p->lcpAccptr; signal->theData[1] = sacLcpLocptr.p->locFragid;
signal->theData[1] = sacLcpLocptr.p->locFragid; sendSignal(fragptr.p->accBlockref, GSN_ACC_CONTOPREQ, signal, 2, JBA);
sendSignal(fragptr.p->accBlockref, GSN_ACC_CONTOPREQ, signal, 2, JBA);
count++;
} else if(sacLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_COMPLETED){
signal->theData[0] = sacLcpLocptr.i;
sendSignal(reference(), GSN_ACC_CONTOPCONF, signal, 1, JBB);
} else {
ndbrequire(false);
}
sacLcpLocptr.i = sacLcpLocptr.p->nextLcpLoc; sacLcpLocptr.i = sacLcpLocptr.p->nextLcpLoc;
} while (sacLcpLocptr.i != RNIL); } while (sacLcpLocptr.i != RNIL);
...@@ -11311,9 +11286,18 @@ void Dblqh::sendStartLcp(Signal* signal) ...@@ -11311,9 +11286,18 @@ void Dblqh::sendStartLcp(Signal* signal)
signal->theData[0] = stlLcpLocptr.i; signal->theData[0] = stlLcpLocptr.i;
signal->theData[1] = cownref; signal->theData[1] = cownref;
signal->theData[2] = stlLcpLocptr.p->tupRef; signal->theData[2] = stlLcpLocptr.p->tupRef;
sendSignal(fragptr.p->tupBlockref, GSN_TUP_LCPREQ, signal, 3, JBA); if(ERROR_INSERTED(5077))
sendSignalWithDelay(fragptr.p->tupBlockref, GSN_TUP_LCPREQ,
signal, 5000, 3);
else
sendSignal(fragptr.p->tupBlockref, GSN_TUP_LCPREQ, signal, 3, JBA);
stlLcpLocptr.i = stlLcpLocptr.p->nextLcpLoc; stlLcpLocptr.i = stlLcpLocptr.p->nextLcpLoc;
} while (stlLcpLocptr.i != RNIL); } while (stlLcpLocptr.i != RNIL);
if(ERROR_INSERTED(5077))
{
ndbout_c("Delayed TUP_LCPREQ with 5 sec");
}
}//Dblqh::sendStartLcp() }//Dblqh::sendStartLcp()
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment