Commit 5a1af9ed authored by unknown's avatar unknown

BUG#9891 - ndb lcp

Crash if ACC_CONTOPREQ was sent while ACC_LCPCONF was still in the job buffer.
  If ACC_LCPCONF had arrived earlier (before TUP_LCPSTARTED),
  operations could lock up,
  but they would be restarted on the next LCP.

-- LQH
1) Better check for LCP started that will also return true
   if ACC or TUP already has completed
    
2) Remove incorrect if statement that prevented operations from
   being started if ACC has completed

-- ACC
Make sure all ACC_CONTOPCONF are sent before releasing lcp record
  i.e. use noOfLcpConf == 4 (2 ACC_LCPCONF + 2 ACC_CONTOPCONF)

Check for == 4 also when sending ACC_CONTOPCONF


ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
  Make sure all ACC_CONTOPCONF are sent before releasing lcp record
    i.e. use noOfLcpConf == 4 (2 ACC_LCPCONF + 2 ACC_CONTOPCONF)
  
  Check for == 4 also when sending ACC_CONTOPCONF
ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  Remove LCP_STARTED state
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  1) Better check for LCP started that will also return true
     if ACC or TUP already has completed
  
  2) Remove incorrect if statement that prevented operations from
     being started if ACC has completed
parent 06c001ec
......@@ -8486,7 +8486,7 @@ void Dbacc::checkSendLcpConfLab(Signal* signal)
break;
}//switch
lcpConnectptr.p->noOfLcpConf++;
ndbrequire(lcpConnectptr.p->noOfLcpConf <= 2);
ndbrequire(lcpConnectptr.p->noOfLcpConf <= 4);
fragrecptr.p->fragState = ACTIVEFRAG;
rlpPageptr.i = fragrecptr.p->zeroPagePtr;
ptrCheckGuard(rlpPageptr, cpagesize, page8);
......@@ -8504,7 +8504,7 @@ void Dbacc::checkSendLcpConfLab(Signal* signal)
}//for
signal->theData[0] = fragrecptr.p->lcpLqhPtr;
sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPCONF, signal, 1, JBB);
if (lcpConnectptr.p->noOfLcpConf == 2) {
if (lcpConnectptr.p->noOfLcpConf == 4) {
jam();
releaseLcpConnectRec(signal);
rootfragrecptr.i = fragrecptr.p->myroot;
......@@ -8535,6 +8535,13 @@ void Dbacc::execACC_CONTOPREQ(Signal* signal)
/* LOCAL FRAG ID */
tresult = 0;
ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
if(ERROR_INSERTED(3002) && lcpConnectptr.p->noOfLcpConf < 2)
{
sendSignalWithDelay(cownBlockref, GSN_ACC_CONTOPREQ, signal, 300,
signal->getLength());
return;
}
ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
......@@ -8568,6 +8575,15 @@ void Dbacc::execACC_CONTOPREQ(Signal* signal)
}//while
signal->theData[0] = fragrecptr.p->lcpLqhPtr;
sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_CONTOPCONF, signal, 1, JBA);
lcpConnectptr.p->noOfLcpConf++;
if (lcpConnectptr.p->noOfLcpConf == 4) {
jam();
releaseLcpConnectRec(signal);
rootfragrecptr.i = fragrecptr.p->myroot;
ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
rootfragrecptr.p->rootState = ACTIVEROOT;
}//if
return; /* ALL QUEUED OPERATION ARE RESTARTED IF NEEDED. */
}//Dbacc::execACC_CONTOPREQ()
......
......@@ -968,7 +968,6 @@ public:
enum LcpState {
LCP_IDLE = 0,
LCP_STARTED = 1,
LCP_COMPLETED = 2,
LCP_WAIT_FRAGID = 3,
LCP_WAIT_TUP_PREPLCP = 4,
......@@ -2266,7 +2265,7 @@ private:
void sendCopyActiveConf(Signal* signal,Uint32 tableId);
void checkLcpCompleted(Signal* signal);
void checkLcpHoldop(Signal* signal);
void checkLcpStarted(Signal* signal);
bool checkLcpStarted(Signal* signal);
void checkLcpTupprep(Signal* signal);
void getNextFragForLcp(Signal* signal);
void initLcpLocAcc(Signal* signal, Uint32 fragId);
......
......@@ -10351,8 +10351,8 @@ void Dblqh::execTUP_LCPSTARTED(Signal* signal)
void Dblqh::lcpStartedLab(Signal* signal)
{
checkLcpStarted(signal);
if (lcpPtr.p->lcpState == LcpRecord::LCP_STARTED) {
if (checkLcpStarted(signal))
{
jam();
/* ----------------------------------------------------------------------
* THE LOCAL CHECKPOINT HAS BEEN STARTED. IT IS NOW TIME TO
......@@ -10432,26 +10432,7 @@ void Dblqh::execLQH_RESTART_OP(Signal* signal)
lcpPtr.i = signal->theData[1];
ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
ndbrequire(fragptr.p->fragStatus == Fragrecord::BLOCKED);
if (lcpPtr.p->lcpState == LcpRecord::LCP_STARTED) {
jam();
/***********************************************************************/
/* THIS SIGNAL CAN ONLY BE RECEIVED WHEN FRAGMENT IS BLOCKED AND
* THE LOCAL CHECKPOINT HAS BEEN STARTED. THE BLOCKING WILL BE
* REMOVED AS SOON AS ALL OPERATIONS HAVE BEEN STARTED.
***********************************************************************/
restartOperationsLab(signal);
} else if (lcpPtr.p->lcpState == LcpRecord::LCP_BLOCKED_COMP) {
jam();
/*******************************************************************>
* THE CHECKPOINT IS COMPLETED BUT HAS NOT YET STARTED UP
* ALL OPERATIONS AGAIN.
* WE PERFORM THIS START-UP BEFORE CONTINUING WITH THE NEXT
* FRAGMENT OF THE LOCAL CHECKPOINT TO AVOID ANY STRANGE ERRORS.
*******************************************************************> */
restartOperationsLab(signal);
} else {
ndbrequire(false);
}
}//Dblqh::execLQH_RESTART_OP()
void Dblqh::restartOperationsLab(Signal* signal)
......@@ -11000,7 +10981,8 @@ void Dblqh::checkLcpHoldop(Signal* signal)
*
* SUBROUTINE SHORT NAME = CLS
* ========================================================================== */
void Dblqh::checkLcpStarted(Signal* signal)
bool
Dblqh::checkLcpStarted(Signal* signal)
{
LcpLocRecordPtr clsLcpLocptr;
......@@ -11010,7 +10992,7 @@ void Dblqh::checkLcpStarted(Signal* signal)
do {
ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_WAIT_STARTED){
return;
return false;
}//if
clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc;
i++;
......@@ -11021,12 +11003,13 @@ void Dblqh::checkLcpStarted(Signal* signal)
do {
ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::TUP_WAIT_STARTED){
return;
return false;
}//if
clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc;
i++;
} while (clsLcpLocptr.i != RNIL);
lcpPtr.p->lcpState = LcpRecord::LCP_STARTED;
return true;
}//Dblqh::checkLcpStarted()
/* ==========================================================================
......@@ -11187,20 +11170,12 @@ void Dblqh::sendAccContOp(Signal* signal)
do {
ptrCheckGuard(sacLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
sacLcpLocptr.p->accContCounter = 0;
if(sacLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_STARTED){
/* ------------------------------------------------------------------- */
/*SEND START OPERATIONS TO ACC AGAIN */
/* ------------------------------------------------------------------- */
signal->theData[0] = lcpPtr.p->lcpAccptr;
signal->theData[1] = sacLcpLocptr.p->locFragid;
sendSignal(fragptr.p->accBlockref, GSN_ACC_CONTOPREQ, signal, 2, JBA);
count++;
} else if(sacLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_COMPLETED){
signal->theData[0] = sacLcpLocptr.i;
sendSignal(reference(), GSN_ACC_CONTOPCONF, signal, 1, JBB);
} else {
ndbrequire(false);
}
sacLcpLocptr.i = sacLcpLocptr.p->nextLcpLoc;
} while (sacLcpLocptr.i != RNIL);
......@@ -11236,9 +11211,18 @@ void Dblqh::sendStartLcp(Signal* signal)
signal->theData[0] = stlLcpLocptr.i;
signal->theData[1] = cownref;
signal->theData[2] = stlLcpLocptr.p->tupRef;
if(ERROR_INSERTED(5077))
sendSignalWithDelay(fragptr.p->tupBlockref, GSN_TUP_LCPREQ,
signal, 5000, 3);
else
sendSignal(fragptr.p->tupBlockref, GSN_TUP_LCPREQ, signal, 3, JBA);
stlLcpLocptr.i = stlLcpLocptr.p->nextLcpLoc;
} while (stlLcpLocptr.i != RNIL);
if(ERROR_INSERTED(5077))
{
ndbout_c("Delayed TUP_LCPREQ with 5 sec");
}
}//Dblqh::sendStartLcp()
/* ------------------------------------------------------------------------- */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment