Commit 7a677c49 authored by unknown's avatar unknown

Merge jonas@perch:/home/jonas/src/51-work

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb
parents 85dd6969 3a20e8d7
...@@ -1394,6 +1394,7 @@ void Dblqh::execTUP_ADD_ATTCONF(Signal* signal) ...@@ -1394,6 +1394,7 @@ void Dblqh::execTUP_ADD_ATTCONF(Signal* signal)
if (! DictTabInfo::isOrderedIndex(addfragptr.p->tableType)) if (! DictTabInfo::isOrderedIndex(addfragptr.p->tableType))
{ {
fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED; fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED;
//fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY;
fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION; fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;
} }
else else
...@@ -2470,6 +2471,8 @@ void Dblqh::execTUPKEYCONF(Signal* signal) ...@@ -2470,6 +2471,8 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
jamEntry(); jamEntry();
tcConnectptr.i = tcIndex; tcConnectptr.i = tcIndex;
ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec); ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
TcConnectionrec * regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
FragrecordPtr regFragptr; FragrecordPtr regFragptr;
regFragptr.i = tcConnectptr.p->fragmentptr; regFragptr.i = tcConnectptr.p->fragmentptr;
...@@ -2497,6 +2500,32 @@ void Dblqh::execTUPKEYCONF(Signal* signal) ...@@ -2497,6 +2500,32 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
// Abort was not ready to start until this signal came back. Now we are ready // Abort was not ready to start until this signal came back. Now we are ready
// to start the abort. // to start the abort.
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{
jam();
ndbrequire(regTcPtr->m_nr_delete.m_cnt);
regTcPtr->m_nr_delete.m_cnt--;
if (regTcPtr->m_nr_delete.m_cnt)
{
jam();
/**
* Let operation wait for pending NR operations
* even for before writing log...(as it's simpler)
*/
#ifdef VM_TRACE
/**
* Only disk table can have pending ops...
*/
TablerecPtr tablePtr;
tablePtr.i = regTcPtr->tableref;
ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec);
ndbrequire(tablePtr.p->m_disk_table);
#endif
return;
}
}
abortCommonLab(signal); abortCommonLab(signal);
break; break;
case TcConnectionrec::WAIT_ACC_ABORT: case TcConnectionrec::WAIT_ACC_ABORT:
...@@ -2523,13 +2552,23 @@ void Dblqh::execTUPKEYREF(Signal* signal) ...@@ -2523,13 +2552,23 @@ void Dblqh::execTUPKEYREF(Signal* signal)
tcConnectptr.i = tupKeyRef->userRef; tcConnectptr.i = tupKeyRef->userRef;
terrorCode = tupKeyRef->errorCode; terrorCode = tupKeyRef->errorCode;
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
TcConnectionrec* regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
FragrecordPtr regFragptr; FragrecordPtr regFragptr;
regFragptr.i = tcConnectptr.p->fragmentptr; regFragptr.i = regTcPtr->fragmentptr;
c_fragment_pool.getPtr(regFragptr); c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr; fragptr = regFragptr;
TcConnectionrec* regTcPtr = tcConnectptr.p; if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{
jam();
ndbrequire(regTcPtr->m_nr_delete.m_cnt);
regTcPtr->m_nr_delete.m_cnt--;
ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP ||
regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT);
}
switch (tcConnectptr.p->transactionState) { switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP: case TcConnectionrec::WAIT_TUP:
jam(); jam();
...@@ -3767,7 +3806,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) ...@@ -3767,7 +3806,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1); EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
jamEntry(); jamEntry();
execACC_ABORTCONF(signal); packLqhkeyreqLab(signal);
} }
} }
...@@ -3890,16 +3929,17 @@ Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr) ...@@ -3890,16 +3929,17 @@ Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr)
if (TRACENR_FLAG) if (TRACENR_FLAG)
TRACENR(" performing DELETE key: " TRACENR(" performing DELETE key: "
<< dst[0] << endl); << dst[0] << endl);
regTcPtr.p->tupkeyData[0] = regTcPtr.p->m_row_id.ref();
if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr) nr_copy_delete_row(signal, regTcPtr, &regTcPtr.p->m_row_id, len);
{ ndbassert(regTcPtr.p->m_nr_delete.m_cnt);
regTcPtr.p->hashValue = calculateHash(tableId, dst); regTcPtr.p->m_nr_delete.m_cnt--; // No real op is run
} if (regTcPtr.p->m_nr_delete.m_cnt)
else
{ {
regTcPtr.p->hashValue = md5_hash((Uint64*)dst, len); jam();
return;
} }
goto run; packLqhkeyreqLab(signal);
return;
} }
else if (len == 0 && op == ZDELETE) else if (len == 0 && op == ZDELETE)
{ {
...@@ -3993,9 +4033,7 @@ Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr) ...@@ -3993,9 +4033,7 @@ Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr)
signal->theData[0] = regTcPtr.p->tupConnectrec; signal->theData[0] = regTcPtr.p->tupConnectrec;
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1); EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
regTcPtr.p->transactionState = TcConnectionrec::WAIT_ACC_ABORT; packLqhkeyreqLab(signal);
signal->theData[0] = regTcPtr.i;
execACC_ABORTCONF(signal);
} }
int int
...@@ -4149,7 +4187,6 @@ Dblqh::get_nr_op_info(Nr_op_info* op, Uint32 page_id) ...@@ -4149,7 +4187,6 @@ Dblqh::get_nr_op_info(Nr_op_info* op, Uint32 page_id)
op->m_gci = tcPtr.p->gci; op->m_gci = tcPtr.p->gci;
op->m_tup_frag_ptr_i = fragPtr.p->tupFragptr; op->m_tup_frag_ptr_i = fragPtr.p->tupFragptr;
ndbrequire(tcPtr.p->transactionState == TcConnectionrec::WAIT_TUP_COMMIT);
ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY); ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY);
ndbrequire(tcPtr.p->m_nr_delete.m_cnt); ndbrequire(tcPtr.p->m_nr_delete.m_cnt);
...@@ -4194,16 +4231,36 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op) ...@@ -4194,16 +4231,36 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op)
tcPtr.i = op->m_ptr_i; tcPtr.i = op->m_ptr_i;
ptrCheckGuard(tcPtr, ctcConnectrecFileSize, tcConnectionrec); ptrCheckGuard(tcPtr, ctcConnectrecFileSize, tcConnectionrec);
ndbrequire(tcPtr.p->transactionState == TcConnectionrec::WAIT_TUP_COMMIT);
ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY); ndbrequire(tcPtr.p->activeCreat == Fragrecord::AC_NR_COPY);
ndbrequire(tcPtr.p->m_nr_delete.m_cnt); ndbrequire(tcPtr.p->m_nr_delete.m_cnt);
tcPtr.p->m_nr_delete.m_cnt--; tcPtr.p->m_nr_delete.m_cnt--;
if (tcPtr.p->m_nr_delete.m_cnt == 0) if (tcPtr.p->m_nr_delete.m_cnt == 0)
{ {
jam();
tcConnectptr = tcPtr; tcConnectptr = tcPtr;
c_fragment_pool.getPtr(fragptr, tcPtr.p->fragmentptr); c_fragment_pool.getPtr(fragptr, tcPtr.p->fragmentptr);
if (tcPtr.p->abortState != TcConnectionrec::ABORT_IDLE)
{
jam();
tcPtr.p->activeCreat = Fragrecord::AC_NORMAL;
abortCommonLab(signal);
}
else if (tcPtr.p->operation == ZDELETE &&
LqhKeyReq::getNrCopyFlag(tcPtr.p->reqinfo))
{
/**
* This is run directly in handle_nr_copy
*/
jam();
packLqhkeyreqLab(signal); packLqhkeyreqLab(signal);
}
else
{
jam();
rwConcludedLab(signal);
}
return; return;
} }
...@@ -4319,7 +4376,6 @@ void Dblqh::execACCKEYCONF(Signal* signal) ...@@ -4319,7 +4376,6 @@ void Dblqh::execACCKEYCONF(Signal* signal)
return; return;
}//if }//if
// reset the activeCreat since that is only valid in cases where the record was not present.
/* ------------------------------------------------------------------------ /* ------------------------------------------------------------------------
* IT IS NOW TIME TO CONTACT THE TUPLE MANAGER. THE TUPLE MANAGER NEEDS THE * IT IS NOW TIME TO CONTACT THE TUPLE MANAGER. THE TUPLE MANAGER NEEDS THE
* INFORMATION ON WHICH TABLE AND FRAGMENT, THE LOCAL KEY AND IT NEEDS TO * INFORMATION ON WHICH TABLE AND FRAGMENT, THE LOCAL KEY AND IT NEEDS TO
...@@ -4536,6 +4592,7 @@ Dblqh::acckeyconf_load_diskpage(Signal* signal, TcConnectionrecPtr regTcPtr, ...@@ -4536,6 +4592,7 @@ Dblqh::acckeyconf_load_diskpage(Signal* signal, TcConnectionrecPtr regTcPtr,
} }
else else
{ {
regTcPtr.p->transactionState = TcConnectionrec::WAIT_TUP;
TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr(); TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr();
ref->userRef= regTcPtr.i; ref->userRef= regTcPtr.i;
ref->errorCode= ~0; ref->errorCode= ~0;
...@@ -4571,6 +4628,7 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal, ...@@ -4571,6 +4628,7 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal,
} }
else else
{ {
regTcPtr->transactionState = TcConnectionrec::WAIT_TUP;
TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr(); TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr();
ref->userRef= callbackData; ref->userRef= callbackData;
ref->errorCode= disk_page; ref->errorCode= disk_page;
...@@ -4592,9 +4650,11 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal, ...@@ -4592,9 +4650,11 @@ Dblqh::acckeyconf_load_diskpage_callback(Signal* signal,
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
void Dblqh::tupkeyConfLab(Signal* signal) void Dblqh::tupkeyConfLab(Signal* signal)
{ {
/* ---- GET OPERATION TYPE AND CHECK WHAT KIND OF OPERATION IS REQUESTED ---- */ /* ---- GET OPERATION TYPE AND CHECK WHAT KIND OF OPERATION IS REQUESTED --- */
const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0]; const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0];
TcConnectionrec * const regTcPtr = tcConnectptr.p; TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
if (regTcPtr->simpleRead) { if (regTcPtr->simpleRead) {
jam(); jam();
/* ---------------------------------------------------------------------- /* ----------------------------------------------------------------------
...@@ -4616,6 +4676,34 @@ void Dblqh::tupkeyConfLab(Signal* signal) ...@@ -4616,6 +4676,34 @@ void Dblqh::tupkeyConfLab(Signal* signal)
}//if }//if
regTcPtr->totSendlenAi = tupKeyConf->writeLength; regTcPtr->totSendlenAi = tupKeyConf->writeLength;
ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen); ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{
jam();
ndbrequire(regTcPtr->m_nr_delete.m_cnt);
regTcPtr->m_nr_delete.m_cnt--;
if (regTcPtr->m_nr_delete.m_cnt)
{
jam();
/**
* Let operation wait for pending NR operations
* even for before writing log...(as it's simpler)
*/
#ifdef VM_TRACE
/**
* Only disk table can have pending ops...
*/
TablerecPtr tablePtr;
tablePtr.i = regTcPtr->tableref;
ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec);
ndbrequire(tablePtr.p->m_disk_table);
#endif
return;
}
}
rwConcludedLab(signal); rwConcludedLab(signal);
return; return;
}//Dblqh::tupkeyConfLab() }//Dblqh::tupkeyConfLab()
...@@ -6325,27 +6413,19 @@ Dblqh::tupcommit_conf(Signal* signal, ...@@ -6325,27 +6413,19 @@ Dblqh::tupcommit_conf(Signal* signal,
/*SEND ANY COMMIT OR COMPLETE MESSAGES TO OTHER NODES. THEY WILL MERELY SEND */ /*SEND ANY COMMIT OR COMPLETE MESSAGES TO OTHER NODES. THEY WILL MERELY SEND */
/*THOSE SIGNALS INTERNALLY. */ /*THOSE SIGNALS INTERNALLY. */
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
if (tcPtrP->abortState == TcConnectionrec::ABORT_IDLE) { if (tcPtrP->abortState == TcConnectionrec::ABORT_IDLE)
{
jam(); jam();
if (activeCreat == Fragrecord::AC_NR_COPY && if (activeCreat == Fragrecord::AC_NR_COPY)
tcPtrP->m_nr_delete.m_cnt > 1)
{ {
jam(); jam();
/** ndbrequire(LqhKeyReq::getNrCopyFlag(tcPtrP->reqinfo));
* Nr delete waiting for disk delete to complete... ndbrequire(tcPtrP->m_nr_delete.m_cnt == 0);
*/
#ifdef VM_TRACE
TablerecPtr tablePtr;
tablePtr.i = tcPtrP->tableref;
ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec);
ndbrequire(tablePtr.p->m_disk_table);
#endif
tcPtrP->m_nr_delete.m_cnt--;
tcPtrP->transactionState = TcConnectionrec::WAIT_TUP_COMMIT;
return;
} }
packLqhkeyreqLab(signal); packLqhkeyreqLab(signal);
} else { }
else
{
ndbrequire(tcPtrP->abortState != TcConnectionrec::NEW_FROM_TC); ndbrequire(tcPtrP->abortState != TcConnectionrec::NEW_FROM_TC);
jam(); jam();
sendLqhTransconf(signal, LqhTransConf::Committed); sendLqhTransconf(signal, LqhTransConf::Committed);
...@@ -6549,7 +6629,7 @@ void Dblqh::execABORT(Signal* signal) ...@@ -6549,7 +6629,7 @@ void Dblqh::execABORT(Signal* signal)
}//if }//if
TcConnectionrec * const regTcPtr = tcConnectptr.p; TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
if (ERROR_INSERTED(5100)) if (ERROR_INSERTED(5100))
{ {
SET_ERROR_INSERT_VALUE(5101); SET_ERROR_INSERT_VALUE(5101);
...@@ -6574,10 +6654,10 @@ void Dblqh::execABORT(Signal* signal) ...@@ -6574,10 +6654,10 @@ void Dblqh::execABORT(Signal* signal)
sendSignal(TLqhRef, GSN_ABORT, signal, 4, JBB); sendSignal(TLqhRef, GSN_ABORT, signal, 4, JBB);
}//if }//if
regTcPtr->abortState = TcConnectionrec::ABORT_FROM_TC; regTcPtr->abortState = TcConnectionrec::ABORT_FROM_TC;
regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
const Uint32 commitAckMarker = regTcPtr->commitAckMarker; const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
if(commitAckMarker != RNIL){ if(commitAckMarker != RNIL)
{
jam(); jam();
#ifdef MARKER_TRACE #ifdef MARKER_TRACE
{ {
...@@ -6627,6 +6707,7 @@ void Dblqh::execABORTREQ(Signal* signal) ...@@ -6627,6 +6707,7 @@ void Dblqh::execABORTREQ(Signal* signal)
return; return;
}//if }//if
TcConnectionrec * const regTcPtr = tcConnectptr.p; TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
if (regTcPtr->transactionState != TcConnectionrec::PREPARED) { if (regTcPtr->transactionState != TcConnectionrec::PREPARED) {
warningReport(signal, 10); warningReport(signal, 10);
return; return;
...@@ -6634,7 +6715,7 @@ void Dblqh::execABORTREQ(Signal* signal) ...@@ -6634,7 +6715,7 @@ void Dblqh::execABORTREQ(Signal* signal)
regTcPtr->reqBlockref = reqBlockref; regTcPtr->reqBlockref = reqBlockref;
regTcPtr->reqRef = reqPtr; regTcPtr->reqRef = reqPtr;
regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC; regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC;
regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
abortCommonLab(signal); abortCommonLab(signal);
return; return;
}//Dblqh::execABORTREQ() }//Dblqh::execABORTREQ()
...@@ -6704,22 +6785,7 @@ void Dblqh::execACCKEYREF(Signal* signal) ...@@ -6704,22 +6785,7 @@ void Dblqh::execACCKEYREF(Signal* signal)
} }
if (tcPtr->activeCreat == Fragrecord::AC_NR_COPY) ndbrequire(tcPtr->activeCreat == Fragrecord::AC_NORMAL);
{
jam();
Uint32 op = tcPtr->operation;
switch(errCode){
case ZNO_TUPLE_FOUND:
ndbrequire(op == ZDELETE);
break;
break;
default:
ndbrequire(false);
}
tcPtr->activeCreat = Fragrecord::AC_IGNORED;
}
else
{
ndbrequire(!LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo)); ndbrequire(!LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo));
/** /**
...@@ -6739,7 +6805,6 @@ void Dblqh::execACCKEYREF(Signal* signal) ...@@ -6739,7 +6805,6 @@ void Dblqh::execACCKEYREF(Signal* signal)
(tcPtr->seqNoReplica == 0 || (tcPtr->seqNoReplica == 0 ||
errCode != ZTUPLE_ALREADY_EXIST || errCode != ZTUPLE_ALREADY_EXIST ||
(tcPtr->operation == ZREAD && (tcPtr->dirtyOp || tcPtr->opSimple))); (tcPtr->operation == ZREAD && (tcPtr->dirtyOp || tcPtr->opSimple)));
}
tcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH; tcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
abortCommonLab(signal); abortCommonLab(signal);
...@@ -6753,7 +6818,6 @@ void Dblqh::localAbortStateHandlerLab(Signal* signal) ...@@ -6753,7 +6818,6 @@ void Dblqh::localAbortStateHandlerLab(Signal* signal)
jam(); jam();
return; return;
}//if }//if
regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH; regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
regTcPtr->errorCode = terrorCode; regTcPtr->errorCode = terrorCode;
abortStateHandlerLab(signal); abortStateHandlerLab(signal);
...@@ -6929,11 +6993,6 @@ void Dblqh::abortErrorLab(Signal* signal) ...@@ -6929,11 +6993,6 @@ void Dblqh::abortErrorLab(Signal* signal)
regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH; regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
regTcPtr->errorCode = terrorCode; regTcPtr->errorCode = terrorCode;
}//if }//if
/* -----------------------------------------------------------------------
* ACTIVE CREATION IS RESET FOR ALL ERRORS WHICH SHOULD BE HANDLED
* WITH NORMAL ABORT HANDLING.
* ----------------------------------------------------------------------- */
regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
abortCommonLab(signal); abortCommonLab(signal);
return; return;
}//Dblqh::abortErrorLab() }//Dblqh::abortErrorLab()
...@@ -6942,8 +7001,9 @@ void Dblqh::abortCommonLab(Signal* signal) ...@@ -6942,8 +7001,9 @@ void Dblqh::abortCommonLab(Signal* signal)
{ {
TcConnectionrec * const regTcPtr = tcConnectptr.p; TcConnectionrec * const regTcPtr = tcConnectptr.p;
const Uint32 commitAckMarker = regTcPtr->commitAckMarker; const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
if(regTcPtr->activeCreat != Fragrecord::AC_IGNORED && const Uint32 activeCreat = regTcPtr->activeCreat;
commitAckMarker != RNIL){ if (commitAckMarker != RNIL)
{
/** /**
* There is no NR ongoing and we have a marker * There is no NR ongoing and we have a marker
*/ */
...@@ -6959,6 +7019,29 @@ void Dblqh::abortCommonLab(Signal* signal) ...@@ -6959,6 +7019,29 @@ void Dblqh::abortCommonLab(Signal* signal)
regTcPtr->commitAckMarker = RNIL; regTcPtr->commitAckMarker = RNIL;
} }
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{
jam();
if (regTcPtr->m_nr_delete.m_cnt)
{
jam();
/**
* Let operation wait for pending NR operations
*/
#ifdef VM_TRACE
/**
* Only disk table can have pending ops...
*/
TablerecPtr tablePtr;
tablePtr.i = regTcPtr->tableref;
ptrCheckGuard(tablePtr, ctabrecFileSize, tablerec);
ndbrequire(tablePtr.p->m_disk_table);
#endif
return;
}
}
fragptr.i = regTcPtr->fragmentptr; fragptr.i = regTcPtr->fragmentptr;
if (fragptr.i != RNIL) { if (fragptr.i != RNIL) {
jam(); jam();
...@@ -7034,25 +7117,6 @@ void Dblqh::execACC_ABORTCONF(Signal* signal) ...@@ -7034,25 +7117,6 @@ void Dblqh::execACC_ABORTCONF(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
TcConnectionrec * const regTcPtr = tcConnectptr.p; TcConnectionrec * const regTcPtr = tcConnectptr.p;
ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT); ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT);
if (regTcPtr->activeCreat == Fragrecord::AC_IGNORED) {
/* ----------------------------------------------------------------------
* A NORMAL EVENT DURING CREATION OF A FRAGMENT. WE NOW NEED TO CONTINUE
* WITH NORMAL COMMIT PROCESSING.
* --------------------------------------------------------------------- */
if (regTcPtr->currTupAiLen == regTcPtr->totReclenAi) {
jam();
regTcPtr->abortState = TcConnectionrec::ABORT_IDLE;
fragptr.i = regTcPtr->fragmentptr;
c_fragment_pool.getPtr(fragptr);
rwConcludedLab(signal);
return;
} else {
ndbrequire(regTcPtr->currTupAiLen < regTcPtr->totReclenAi);
jam();
regTcPtr->transactionState = TcConnectionrec::WAIT_AI_AFTER_ABORT;
return;
}//if
}//if
continueAbortLab(signal); continueAbortLab(signal);
return; return;
}//Dblqh::execACC_ABORTCONF() }//Dblqh::execACC_ABORTCONF()
...@@ -9450,7 +9514,7 @@ void Dblqh::initScanTc(const ScanFragReq* req, ...@@ -9450,7 +9514,7 @@ void Dblqh::initScanTc(const ScanFragReq* req,
tcConnectptr.p->m_offset_current_keybuf = 0; tcConnectptr.p->m_offset_current_keybuf = 0;
tcConnectptr.p->m_scan_curr_range_no = 0; tcConnectptr.p->m_scan_curr_range_no = 0;
tcConnectptr.p->m_dealloc = 0; tcConnectptr.p->m_dealloc = 0;
tcConnectptr.p->activeCreat = Fragrecord::AC_NORMAL;
TablerecPtr tTablePtr; TablerecPtr tTablePtr;
tTablePtr.i = tabptr.p->primaryTableId; tTablePtr.i = tabptr.p->primaryTableId;
ptrCheckGuard(tTablePtr, ctabrecFileSize, tablerec); ptrCheckGuard(tTablePtr, ctabrecFileSize, tablerec);
...@@ -9929,16 +9993,21 @@ void Dblqh::continueFirstCopyAfterBlockedLab(Signal* signal) ...@@ -9929,16 +9993,21 @@ void Dblqh::continueFirstCopyAfterBlockedLab(Signal* signal)
*/ */
fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY; fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY;
if (0) scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
if (false && fragptr.p->tabRef > 4)
{ {
ndbout_c("STOPPING COPY (%d -> %d %d %d)", ndbout_c("STOPPING COPY X = [ %d %d %d %d ]",
scanptr.p->scanBlockref, refToBlock(scanptr.p->scanBlockref),
scanptr.p->scanAccPtr, RNIL, NextScanReq::ZSCAN_NEXT); scanptr.p->scanAccPtr, RNIL, NextScanReq::ZSCAN_NEXT);
/**
* RESTART: > DUMP 7020 332 X
*/
return; return;
} }
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
signal->theData[0] = scanptr.p->scanAccPtr; signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1] = RNIL; signal->theData[1] = RNIL;
signal->theData[2] = NextScanReq::ZSCAN_NEXT; signal->theData[2] = NextScanReq::ZSCAN_NEXT;
...@@ -18351,6 +18420,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) ...@@ -18351,6 +18420,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
<< " tcBlockref = " << hex << tcRec.p->tcBlockref << " tcBlockref = " << hex << tcRec.p->tcBlockref
<< " reqBlockref = " << hex << tcRec.p->reqBlockref << " reqBlockref = " << hex << tcRec.p->reqBlockref
<< " primKeyLen = " << tcRec.p->primKeyLen << " primKeyLen = " << tcRec.p->primKeyLen
<< " nrcopyflag = " << LqhKeyReq::getNrCopyFlag(tcRec.p->reqinfo)
<< endl; << endl;
ndbout << " nextReplica = " << tcRec.p->nextReplica ndbout << " nextReplica = " << tcRec.p->nextReplica
<< " tcBlockref = " << hex << tcRec.p->tcBlockref << " tcBlockref = " << hex << tcRec.p->tcBlockref
...@@ -18421,6 +18491,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) ...@@ -18421,6 +18491,7 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
<< endl; << endl;
ndbout << " tupkeyData2 = " << tcRec.p->tupkeyData[2] ndbout << " tupkeyData2 = " << tcRec.p->tupkeyData[2]
<< " tupkeyData3 = " << tcRec.p->tupkeyData[3] << " tupkeyData3 = " << tcRec.p->tupkeyData[3]
<< " m_nr_delete.m_cnt = " << tcRec.p->m_nr_delete.m_cnt
<< endl; << endl;
switch (tcRec.p->transactionState) { switch (tcRec.p->transactionState) {
......
...@@ -484,6 +484,14 @@ Dbtup::load_diskpage(Signal* signal, ...@@ -484,6 +484,14 @@ Dbtup::load_diskpage(Signal* signal,
req.m_callback.m_callbackFunction= req.m_callback.m_callbackFunction=
safe_cast(&Dbtup::disk_page_load_callback); safe_cast(&Dbtup::disk_page_load_callback);
#ifdef ERROR_INSERTED
if (ERROR_INSERTED(4022))
{
flags |= Page_cache_client::DELAY_REQ;
req.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)3000;
}
#endif
if((res= m_pgman.get_page(signal, req, flags)) > 0) if((res= m_pgman.get_page(signal, req, flags)) > 0)
{ {
//ndbout_c("in cache"); //ndbout_c("in cache");
...@@ -3119,6 +3127,35 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData, ...@@ -3119,6 +3127,35 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData,
preq.m_callback.m_callbackFunction = preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::nr_delete_page_callback); safe_cast(&Dbtup::nr_delete_page_callback);
int flags = Page_cache_client::COMMIT_REQ; int flags = Page_cache_client::COMMIT_REQ;
#ifdef ERROR_INSERT
if (ERROR_INSERTED(4023) || ERROR_INSERTED(4024))
{
int rnd = rand() % 100;
int slp = 0;
if (ERROR_INSERTED(4024))
{
slp = 3000;
}
else if (rnd > 90)
{
slp = 3000;
}
else if (rnd > 70)
{
slp = 100;
}
ndbout_c("rnd: %d slp: %d", rnd, slp);
if (slp)
{
flags |= Page_cache_client::DELAY_REQ;
preq.m_delay_until_time = NdbTick_CurrentMillisecond()+(Uint64)slp;
}
}
#endif
res = m_pgman.get_page(signal, preq, flags); res = m_pgman.get_page(signal, preq, flags);
if (res == 0) if (res == 0)
{ {
......
...@@ -944,12 +944,16 @@ Pgman::process_callback(Signal* signal) ...@@ -944,12 +944,16 @@ Pgman::process_callback(Signal* signal)
int max_count = 1; int max_count = 1;
Page_sublist& pl_callback = *m_page_sublist[Page_entry::SL_CALLBACK]; Page_sublist& pl_callback = *m_page_sublist[Page_entry::SL_CALLBACK];
while (! pl_callback.isEmpty() && --max_count >= 0)
{
jam();
Ptr<Page_entry> ptr; Ptr<Page_entry> ptr;
pl_callback.first(ptr); pl_callback.first(ptr);
if (! process_callback(signal, ptr))
while (! ptr.isNull() && --max_count >= 0)
{
jam();
Ptr<Page_entry> curr = ptr;
pl_callback.next(ptr);
if (! process_callback(signal, curr))
{ {
jam(); jam();
break; break;
...@@ -987,6 +991,18 @@ Pgman::process_callback(Signal* signal, Ptr<Page_entry> ptr) ...@@ -987,6 +991,18 @@ Pgman::process_callback(Signal* signal, Ptr<Page_entry> ptr)
#ifdef VM_TRACE #ifdef VM_TRACE
debugOut << "PGMAN: " << req_ptr << " : process_callback" << endl; debugOut << "PGMAN: " << req_ptr << " : process_callback" << endl;
#endif #endif
#ifdef ERROR_INSERT
if (req_ptr.p->m_flags & Page_request::DELAY_REQ)
{
Uint64 now = NdbTick_CurrentMillisecond();
if (now < req_ptr.p->m_delay_until_time)
{
break;
}
}
#endif
b = globalData.getBlock(req_ptr.p->m_block); b = globalData.getBlock(req_ptr.p->m_block);
callback = req_ptr.p->m_callback; callback = req_ptr.p->m_callback;
...@@ -1314,6 +1330,24 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr) ...@@ -1314,6 +1330,24 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr)
state |= Page_entry::MAPPED; state |= Page_entry::MAPPED;
set_page_state(ptr, state); set_page_state(ptr, state);
{
/**
* Update lsn record on page
* as it can be modified/flushed wo/ update_lsn has been called
* (e.g. prealloc) and it then would get lsn 0, which is bad
* when running undo and following SR
*/
Ptr<GlobalPage> pagePtr;
m_global_page_pool.getPtr(pagePtr, ptr.p->m_real_page_i);
File_formats::Datafile::Data_page* page =
(File_formats::Datafile::Data_page*)pagePtr.p;
Uint64 lsn = 0;
lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += page->m_page_header.m_page_lsn_lo;
ptr.p->m_lsn = lsn;
}
ndbrequire(m_stats.m_current_io_waits > 0); ndbrequire(m_stats.m_current_io_waits > 0);
m_stats.m_current_io_waits--; m_stats.m_current_io_waits--;
...@@ -1576,6 +1610,12 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req) ...@@ -1576,6 +1610,12 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
bool only_request = ptr.p->m_requests.isEmpty(); bool only_request = ptr.p->m_requests.isEmpty();
if (req_flags & Page_request::DELAY_REQ)
{
jam();
only_request = false;
}
if (only_request && if (only_request &&
state & Page_entry::MAPPED) state & Page_entry::MAPPED)
{ {
...@@ -1623,6 +1663,9 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req) ...@@ -1623,6 +1663,9 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
req_ptr.p->m_block = page_req.m_block; req_ptr.p->m_block = page_req.m_block;
req_ptr.p->m_flags = page_req.m_flags; req_ptr.p->m_flags = page_req.m_flags;
req_ptr.p->m_callback = page_req.m_callback; req_ptr.p->m_callback = page_req.m_callback;
#ifdef ERROR_INSERT
req_ptr.p->m_delay_until_time = page_req.m_delay_until_time;
#endif
state |= Page_entry::REQUEST; state |= Page_entry::REQUEST;
if (only_request && req_flags & Page_request::EMPTY_PAGE) if (only_request && req_flags & Page_request::EMPTY_PAGE)
......
...@@ -256,12 +256,18 @@ private: ...@@ -256,12 +256,18 @@ private:
,DIRTY_REQ = 0x0200 // make page dirty wo/ update_lsn ,DIRTY_REQ = 0x0200 // make page dirty wo/ update_lsn
,UNLOCK_PAGE = 0x0400 ,UNLOCK_PAGE = 0x0400
,CORR_REQ = 0x0800 // correlated request (no LIRS update) ,CORR_REQ = 0x0800 // correlated request (no LIRS update)
#ifdef ERROR_INSERT
,DELAY_REQ = 0x1000 // Force request to be delayed
#endif
}; };
Uint16 m_block; Uint16 m_block;
Uint16 m_flags; Uint16 m_flags;
SimulatedBlock::Callback m_callback; SimulatedBlock::Callback m_callback;
#ifdef ERROR_INSERT
Uint64 m_delay_until_time;
#endif
Uint32 nextList; Uint32 nextList;
Uint32 m_magic; Uint32 m_magic;
}; };
...@@ -508,6 +514,10 @@ public: ...@@ -508,6 +514,10 @@ public:
struct Request { struct Request {
Local_key m_page; Local_key m_page;
SimulatedBlock::Callback m_callback; SimulatedBlock::Callback m_callback;
#ifdef ERROR_INSERT
Uint64 m_delay_until_time;
#endif
}; };
Ptr<GlobalPage> m_ptr; // TODO remove Ptr<GlobalPage> m_ptr; // TODO remove
...@@ -520,6 +530,9 @@ public: ...@@ -520,6 +530,9 @@ public:
,DIRTY_REQ = Pgman::Page_request::DIRTY_REQ ,DIRTY_REQ = Pgman::Page_request::DIRTY_REQ
,UNLOCK_PAGE = Pgman::Page_request::UNLOCK_PAGE ,UNLOCK_PAGE = Pgman::Page_request::UNLOCK_PAGE
,CORR_REQ = Pgman::Page_request::CORR_REQ ,CORR_REQ = Pgman::Page_request::CORR_REQ
#ifdef ERROR_INSERT
,DELAY_REQ = Pgman::Page_request::DELAY_REQ
#endif
}; };
/** /**
...@@ -588,6 +601,9 @@ Page_cache_client::get_page(Signal* signal, Request& req, Uint32 flags) ...@@ -588,6 +601,9 @@ Page_cache_client::get_page(Signal* signal, Request& req, Uint32 flags)
page_req.m_block = m_block; page_req.m_block = m_block;
page_req.m_flags = flags; page_req.m_flags = flags;
page_req.m_callback = req.m_callback; page_req.m_callback = req.m_callback;
#ifdef ERROR_INSERT
page_req.m_delay_until_time = req.m_delay_until_time;
#endif
int i = m_pgman->get_page(signal, entry_ptr, page_req); int i = m_pgman->get_page(signal, entry_ptr, page_req);
if (i > 0) if (i > 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment