Commit a3b8d42b authored by joreland@mysql.com's avatar joreland@mysql.com

Update error handling of new scan

Still known bugs :-(
parent 5e4af7e5
......@@ -367,7 +367,7 @@ public:
/**
* Length of signal
*/
STATIC_CONST( SignalLength = 4 );
STATIC_CONST( SignalLength = 5 );
private:
......@@ -380,7 +380,7 @@ private:
UintR transId1; // DATA 1
UintR transId2; // DATA 2
UintR errorCode; // DATA 3
// UintR sendScanNextReqWithClose; // DATA 4
UintR closeNeeded; // DATA 4
};
......
......@@ -633,17 +633,7 @@ private:
#ifdef VM_TRACE
void printState();
#endif
bool checkState_TransId(const Uint32 * transId) const {
const Uint32 tTmp1 = transId[0];
const Uint32 tTmp2 = transId[1];
Uint64 tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32);
bool b = theStatus == Connected && theTransactionId == tRecTransId;
#ifdef NDB_NO_DROPPED_SIGNAL
if(!b) abort();
#endif
return b;
}
bool checkState_TransId(const Uint32 * transId) const;
};
inline
......@@ -678,6 +668,19 @@ NdbConnection::checkMagicNumber()
}
}
inline
bool
NdbConnection::checkState_TransId(const Uint32 * transId) const {
const Uint32 tTmp1 = transId[0];
const Uint32 tTmp2 = transId[1];
Uint64 tRecTransId = (Uint64)tTmp1 + ((Uint64)tTmp2 << 32);
bool b = theStatus == Connected && theTransactionId == tRecTransId;
#ifdef NDB_NO_DROPPED_SIGNAL
if(!b) abort();
#endif
return b;
}
/************************************************************************************************
void setTransactionId(Uint64 aTransactionId);
......
......@@ -146,7 +146,7 @@ protected:
int send_next_scan(Uint32 cnt, bool close);
void receiver_delivered(NdbReceiver*);
void receiver_completed(NdbReceiver*);
void execCLOSE_SCAN_REP(Uint32 errCode);
void execCLOSE_SCAN_REP();
NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*);
......
......@@ -120,7 +120,7 @@ printSCANTABREF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
fprintf(output, " Errorcode: %u\n", sig->errorCode);
// fprintf(output, " sendScanNextReqWithClose: %u\n", sig->sendScanNextReqWithClose);
fprintf(output, " closeNeeded: %u\n", sig->closeNeeded);
return false;
}
......
......@@ -6785,7 +6785,8 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal)
if (findTransaction(transid1, transid2, senderData) != ZOK){
jam();
DEBUG("Received SCAN_NEXTREQ in LQH with close flag when closed");
DEBUG(senderData <<
" Received SCAN_NEXTREQ in LQH with close flag when closed");
ndbrequire(nextReq->closeFlag == ZTRUE);
return;
}
......@@ -6825,6 +6826,10 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal)
return;
}//if
if(ERROR_INSERTED(5036)){
return;
}
scanptr.i = tcConnectptr.p->tcScanRec;
ndbrequire(scanptr.i != RNIL);
c_scanRecordPool.getPtr(scanptr);
......@@ -6841,6 +6846,10 @@ void Dblqh::execSCAN_NEXTREQ(Signal* signal)
if(ERROR_INSERTED(5034)){
CLEAR_ERROR_INSERT_VALUE;
}
if(ERROR_INSERTED(5036)){
CLEAR_ERROR_INSERT_VALUE;
return;
}
closeScanRequestLab(signal);
return;
}//if
......@@ -8517,6 +8526,11 @@ void Dblqh::sendKeyinfo20(Signal* signal,
* ------------------------------------------------------------------------ */
void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
{
if(ERROR_INSERTED(5037)){
CLEAR_ERROR_INSERT_VALUE;
return;
}
scanptr.p->scanTcWaiting = ZFALSE;
ScanFragConf * conf = (ScanFragConf*)&signal->theData[0];
......
......@@ -1194,6 +1194,9 @@ public:
// Scan is on ordered index
Uint8 rangeScan;
// Close is ordered
bool m_close_scan_req;
};
typedef Ptr<ScanRecord> ScanRecordPtr;
......@@ -1414,15 +1417,15 @@ private:
Uint32 buddyPtr,
UintR transid1,
UintR transid2);
void initScanrec(Signal* signal,
void initScanrec(ScanRecordPtr, const class ScanTabReq*,
const UintR scanParallel,
const UintR noOprecPerFrag);
void initScanfragrec(Signal* signal);
void releaseScanResources(ScanRecordPtr);
void seizeScanrec(Signal* signal);
void sendScanFragReq(Signal* signal);
void sendScanTabConf(Signal* signal);
void close_scan_req(Signal*, ScanRecordPtr);
ScanRecordPtr seizeScanrec(Signal* signal);
void sendScanFragReq(Signal* signal, ScanRecord*, ScanFragRec*);
void sendScanTabConf(Signal* signal, ScanRecord*);
void close_scan_req(Signal*, ScanRecordPtr, bool received_req);
void close_scan_req_send_conf(Signal*, ScanRecordPtr);
void checkGcp(Signal* signal);
......@@ -1557,11 +1560,11 @@ private:
void systemErrorLab(Signal* signal);
void sendSignalErrorRefuseLab(Signal* signal);
void scanTabRefLab(Signal* signal, Uint32 errCode);
void diFcountReqLab(Signal* signal);
void diFcountReqLab(Signal* signal, ScanRecordPtr);
void signalErrorRefuseLab(Signal* signal);
void abort080Lab(Signal* signal);
void packKeyData000Lab(Signal* signal, BlockReference TBRef);
void abortScanLab(Signal* signal, Uint32 errCode);
void abortScanLab(Signal* signal, ScanRecordPtr, Uint32 errCode);
void sendAbortedAfterTimeout(Signal* signal, int Tcheck);
void abort010Lab(Signal* signal);
void abort015Lab(Signal* signal);
......@@ -1589,7 +1592,7 @@ private:
void attrinfo020Lab(Signal* signal);
void scanReleaseResourcesLab(Signal* signal);
void scanCompletedLab(Signal* signal);
void scanFragError(Signal* signal, Uint32 errorCode);
void scanError(Signal* signal, ScanRecordPtr, Uint32 errorCode);
void diverify010Lab(Signal* signal);
void intstartphase2x010Lab(Signal* signal);
void intstartphase3x010Lab(Signal* signal);
......@@ -1699,7 +1702,6 @@ private:
ApiConnectRecordPtr timeOutptr;
ScanRecord *scanRecord;
ScanRecordPtr scanptr;
UintR cscanrecFileSize;
UnsafeArrayPool<ScanFragRec> c_scan_frag_pool;
......
This diff is collapsed.
......@@ -54,6 +54,9 @@ void Dbtup::execSEND_PACKED(Signal* signal)
void Dbtup::bufferTRANSID_AI(Signal* signal, BlockReference aRef,
Uint32 Tlen)
{
if(Tlen == 3)
return;
Uint32 hostId = refToNode(aRef);
Uint32 Theader = ((refToBlock(aRef) << 16)+(Tlen-3));
......
......@@ -61,7 +61,14 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){
const ScanTabRef * ref = CAST_CONSTPTR(ScanTabRef, aSignal->getDataPtr());
if(checkState_TransId(&ref->transId1)){
theScanningOp->execCLOSE_SCAN_REP(ref->errorCode);
theScanningOp->theError.code = ref->errorCode;
if(!ref->closeNeeded){
theScanningOp->execCLOSE_SCAN_REP();
return 0;
}
assert(theScanningOp->m_sent_receivers_count);
theScanningOp->m_sent_receivers_count--;
theScanningOp->m_conf_receivers_count++;
return 0;
}
return -1;
......@@ -88,11 +95,10 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
if(checkState_TransId(&conf->transId1)){
if (conf->requestInfo == ScanTabConf::EndOfData) {
theScanningOp->execCLOSE_SCAN_REP(0);
theScanningOp->execCLOSE_SCAN_REP();
return 0;
}
int noComp = -1;
for(Uint32 i = 0; i<len; i += 3){
Uint32 ptrI = * ops++;
Uint32 tcPtrI = * ops++;
......@@ -108,15 +114,13 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
/**
*
*/
noComp++;
theScanningOp->receiver_delivered(tOp);
} else if(info == ScanTabConf::EndOfData){
noComp++;
theScanningOp->receiver_completed(tOp);
}
}
}
return noComp;
return 0;
}
return -1;
......
......@@ -135,6 +135,8 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
m_ordered = 0;
Uint32 fragCount = m_currentTable->m_fragmentCount;
ndbout_c("batch: %d parallell: %d fragCount: %d",
batch, parallell, fragCount);
if(batch + parallell == 0){ // Max speed
batch = 16;
......@@ -154,6 +156,9 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
else if(parallell == 0)
parallell = fragCount;
ndbout_c("batch: %d parallell: %d fragCount: %d",
batch, parallell, fragCount);
assert(parallell > 0);
// It is only possible to call openScan if
......@@ -486,6 +491,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
last = m_api_receivers_count;
do {
if(theError.code){
setErrorCode(theError.code);
return -1;
}
Uint32 cnt = m_conf_receivers_count;
Uint32 sent = m_sent_receivers_count;
......@@ -502,12 +512,17 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
*/
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
ndbout_c("%d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
} else {
idx = last;
retVal = -1; //return_code;
retVal = -2; //return_code;
}
} else if(retVal == 2){
/**
......@@ -516,6 +531,11 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
if(send_next_scan(0, true) == 0){ // Close scan
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
ndbout_c("%d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
return 1;
......@@ -633,6 +653,12 @@ NdbScanOperation::doSend(int ProcessorId)
void NdbScanOperation::closeScan()
{
ndbout_c("closeScan %d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
do {
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
......@@ -651,6 +677,11 @@ void NdbScanOperation::closeScan()
while(m_sent_receivers_count){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
ndbout_c("%d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
......@@ -679,6 +710,11 @@ void NdbScanOperation::closeScan()
do {
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
ndbout_c("%d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
......@@ -701,22 +737,7 @@ void NdbScanOperation::closeScan()
}
void
NdbScanOperation::execCLOSE_SCAN_REP(Uint32 errCode){
/**
* We will receive no further signals from this scan
*/
if(!errCode){
/**
* Normal termination
*/
theNdbCon->theCommitStatus = NdbConnection::Committed;
theNdbCon->theCompletionStatus = NdbConnection::CompletedSuccess;
} else {
/**
* Something is fishy
*/
abort();
}
NdbScanOperation::execCLOSE_SCAN_REP(){
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
......@@ -1206,7 +1227,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(u_idx)){
Uint32 tmp = m_sent_receivers_count;
while(m_sent_receivers_count > 0){
while(m_sent_receivers_count > 0 && !theError.code){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
......@@ -1223,6 +1244,10 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
if(theError.code){
setErrorCode(theError.code);
return -1;
}
}
} else {
return 2;
......@@ -1279,9 +1304,12 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(0, true) == 0){
if(seq == tp->getNodeSequence(nodeId) &&
send_next_scan(0, true) == 0 &&
theError.code == 0){
return 1;
}
setErrorCode(theError.code);
return -1;
}
......
......@@ -706,8 +706,11 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
tFirstDataPtr = int2void(tFirstData);
if (tFirstDataPtr == 0) goto InvalidSignal;
if (tWaitState == WAIT_SCAN){
tCon = void2con(tFirstDataPtr);
assert(tFirstDataPtr != 0 &&
void2con(tFirstDataPtr)->checkMagicNumber() == 0);
if (tCon->checkMagicNumber() == 0){
tReturnCode = tCon->receiveSCAN_TABREF(aSignal);
if (tReturnCode != -1){
......@@ -715,7 +718,6 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
break;
}
}
goto InvalidSignal;
}
case GSN_SCAN_TABINFO:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment