Commit 7e22cf6f authored by joreland@mysql.com's avatar joreland@mysql.com

Piggy back close scan req

parent 16a714c9
...@@ -1155,7 +1155,6 @@ public: ...@@ -1155,7 +1155,6 @@ public:
union { Uint32 m_queued_count; Uint32 scanReceivedOperations; }; union { Uint32 m_queued_count; Uint32 scanReceivedOperations; };
DLList<ScanFragRec>::Head m_queued_scan_frags; // In TC !sent to API DLList<ScanFragRec>::Head m_queued_scan_frags; // In TC !sent to API
DLList<ScanFragRec>::Head m_delivered_scan_frags;// Delivered to API DLList<ScanFragRec>::Head m_delivered_scan_frags;// Delivered to API
DLList<ScanFragRec>::Head m_completed_scan_frags;// Completed
// Id of the next fragment to be scanned. Used by scan fragment // Id of the next fragment to be scanned. Used by scan fragment
// processes when they are ready for the next fragment // processes when they are ready for the next fragment
...@@ -1433,7 +1432,7 @@ private: ...@@ -1433,7 +1432,7 @@ private:
void releaseScanResources(ScanRecordPtr); void releaseScanResources(ScanRecordPtr);
ScanRecordPtr seizeScanrec(Signal* signal); ScanRecordPtr seizeScanrec(Signal* signal);
void sendScanFragReq(Signal*, ScanRecord*, ScanFragRec*); void sendScanFragReq(Signal*, ScanRecord*, ScanFragRec*);
void sendScanTabConf(Signal* signal, ScanRecord*); void sendScanTabConf(Signal* signal, ScanRecordPtr);
void close_scan_req(Signal*, ScanRecordPtr, bool received_req); void close_scan_req(Signal*, ScanRecordPtr, bool received_req);
void close_scan_req_send_conf(Signal*, ScanRecordPtr); void close_scan_req_send_conf(Signal*, ScanRecordPtr);
......
...@@ -6586,10 +6586,8 @@ void Dbtc::timeOutFoundFragLab(Signal* signal, UintR TscanConPtr) ...@@ -6586,10 +6586,8 @@ void Dbtc::timeOutFoundFragLab(Signal* signal, UintR TscanConPtr)
*/ */
ptr.p->scanFragState = ScanFragRec::COMPLETED; ptr.p->scanFragState = ScanFragRec::COMPLETED;
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(ptr); run.release(ptr);
comp.add(ptr);
ptr.p->stopFragTimer(); ptr.p->stopFragTimer();
} }
...@@ -6865,7 +6863,6 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal, ...@@ -6865,7 +6863,6 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
jam(); jam();
ScanFragRecPtr ptr; ScanFragRecPtr ptr;
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
for(run.first(ptr); !ptr.isNull(); ){ for(run.first(ptr); !ptr.isNull(); ){
jam(); jam();
...@@ -6875,8 +6872,7 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal, ...@@ -6875,8 +6872,7 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
refToNode(curr.p->lqhBlockref) == failedNodeId){ refToNode(curr.p->lqhBlockref) == failedNodeId){
jam(); jam();
run.remove(curr); run.release(curr);
comp.add(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer(); curr.p->stopFragTimer();
found = true; found = true;
...@@ -8809,14 +8805,10 @@ void Dbtc::releaseScanResources(ScanRecordPtr scanPtr) ...@@ -8809,14 +8805,10 @@ void Dbtc::releaseScanResources(ScanRecordPtr scanPtr)
ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord); ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
releaseTcCon(); releaseTcCon();
ScanFragList x(c_scan_frag_pool,
scanPtr.p->m_completed_scan_frags);
x.release();
ndbrequire(scanPtr.p->m_running_scan_frags.isEmpty()); ndbrequire(scanPtr.p->m_running_scan_frags.isEmpty());
ndbrequire(scanPtr.p->m_queued_scan_frags.isEmpty()); ndbrequire(scanPtr.p->m_queued_scan_frags.isEmpty());
ndbrequire(scanPtr.p->m_delivered_scan_frags.isEmpty()); ndbrequire(scanPtr.p->m_delivered_scan_frags.isEmpty());
ndbassert(scanPtr.p->scanApiRec == apiConnectptr.i); ndbassert(scanPtr.p->scanApiRec == apiConnectptr.i);
ndbassert(apiConnectptr.p->apiScanRec == scanPtr.i); ndbassert(apiConnectptr.p->apiScanRec == scanPtr.i);
...@@ -8868,10 +8860,8 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal) ...@@ -8868,10 +8860,8 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
if(tabPtr.p->checkTable(schemaVersion) == false){ if(tabPtr.p->checkTable(schemaVersion) == false){
jam(); jam();
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr); run.release(scanFragptr);
comp.add(scanFragptr);
scanError(signal, scanptr, tabPtr.p->getErrorCode(schemaVersion)); scanError(signal, scanptr, tabPtr.p->getErrorCode(schemaVersion));
return; return;
} }
...@@ -8889,10 +8879,8 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal) ...@@ -8889,10 +8879,8 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
updateBuddyTimer(apiConnectptr); updateBuddyTimer(apiConnectptr);
{ {
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr); run.release(scanFragptr);
comp.add(scanFragptr);
} }
close_scan_req_send_conf(signal, scanptr); close_scan_req_send_conf(signal, scanptr);
return; return;
...@@ -8945,10 +8933,8 @@ void Dbtc::execDIGETPRIMREF(Signal* signal) ...@@ -8945,10 +8933,8 @@ void Dbtc::execDIGETPRIMREF(Signal* signal)
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord); ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr); run.release(scanFragptr);
comp.add(scanFragptr);
scanError(signal, scanptr, errCode); scanError(signal, scanptr, errCode);
}//Dbtc::execDIGETPRIMREF() }//Dbtc::execDIGETPRIMREF()
...@@ -8993,10 +8979,8 @@ void Dbtc::execSCAN_FRAGREF(Signal* signal) ...@@ -8993,10 +8979,8 @@ void Dbtc::execSCAN_FRAGREF(Signal* signal)
{ {
scanFragptr.p->scanFragState = ScanFragRec::COMPLETED; scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr); run.release(scanFragptr);
comp.add(scanFragptr);
scanFragptr.p->stopFragTimer(); scanFragptr.p->stopFragTimer();
} }
scanError(signal, scanptr, errCode); scanError(signal, scanptr, errCode);
...@@ -9095,10 +9079,8 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) ...@@ -9095,10 +9079,8 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
} else { } else {
jam(); jam();
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr); run.release(scanFragptr);
comp.add(scanFragptr);
scanFragptr.p->stopFragTimer(); scanFragptr.p->stopFragTimer();
scanFragptr.p->scanFragState = ScanFragRec::COMPLETED; scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
} }
...@@ -9147,7 +9129,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) ...@@ -9147,7 +9129,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
if(scanptr.p->m_queued_count > /** Min */ 0){ if(scanptr.p->m_queued_count > /** Min */ 0){
jam(); jam();
sendScanTabConf(signal, scanptr.p); sendScanTabConf(signal, scanptr);
} }
}//Dbtc::execSCAN_FRAGCONF() }//Dbtc::execSCAN_FRAGCONF()
...@@ -9308,7 +9290,6 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){ ...@@ -9308,7 +9290,6 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
{ {
ScanFragRecPtr ptr; ScanFragRecPtr ptr;
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags); ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
ScanFragList completed(c_scan_frag_pool, scanP->m_completed_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags); ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags); ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags);
...@@ -9350,7 +9331,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){ ...@@ -9350,7 +9331,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
} else { } else {
jam(); jam();
completed.add(curr); c_scan_frag_pool.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer(); curr.p->stopFragTimer();
} }
...@@ -9378,7 +9359,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){ ...@@ -9378,7 +9359,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
ScanFragNextReq::SignalLength, JBB); ScanFragNextReq::SignalLength, JBB);
} else { } else {
jam(); jam();
completed.add(curr); c_scan_frag_pool.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer(); curr.p->stopFragTimer();
} }
...@@ -9492,10 +9473,10 @@ void Dbtc::sendScanFragReq(Signal* signal, ...@@ -9492,10 +9473,10 @@ void Dbtc::sendScanFragReq(Signal* signal,
}//Dbtc::sendScanFragReq() }//Dbtc::sendScanFragReq()
void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) { void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
jam(); jam();
Uint32* ops = signal->getDataPtrSend()+4; Uint32* ops = signal->getDataPtrSend()+4;
Uint32 op_count = scanP->m_queued_count; Uint32 op_count = scanPtr.p->m_queued_count;
if(4 + 3 * op_count > 25){ if(4 + 3 * op_count > 25){
jam(); jam();
ops += 21; ops += 21;
...@@ -9507,9 +9488,9 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) { ...@@ -9507,9 +9488,9 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
conf->transId1 = apiConnectptr.p->transid[0]; conf->transId1 = apiConnectptr.p->transid[0];
conf->transId2 = apiConnectptr.p->transid[1]; conf->transId2 = apiConnectptr.p->transid[1];
ScanFragRecPtr ptr; ScanFragRecPtr ptr;
ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags); {
ScanFragList completed(c_scan_frag_pool, scanP->m_completed_scan_frags); ScanFragList queued(c_scan_frag_pool, scanPtr.p->m_queued_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags); ScanFragList delivered(c_scan_frag_pool,scanPtr.p->m_delivered_scan_frags);
for(queued.first(ptr); !ptr.isNull(); ){ for(queued.first(ptr); !ptr.isNull(); ){
ndbrequire(ptr.p->scanFragState == ScanFragRec::QUEUED_FOR_DELIVERY); ndbrequire(ptr.p->scanFragState == ScanFragRec::QUEUED_FOR_DELIVERY);
ScanFragRecPtr curr = ptr; // Remove while iterating... ScanFragRecPtr curr = ptr; // Remove while iterating...
...@@ -9526,11 +9507,18 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) { ...@@ -9526,11 +9507,18 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
curr.p->stopFragTimer(); curr.p->stopFragTimer();
} else { } else {
(* --ops) = ScanTabConf::EndOfData; ops++; (* --ops) = ScanTabConf::EndOfData; ops++;
completed.add(curr); c_scan_frag_pool.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer(); curr.p->stopFragTimer();
} }
} }
}
if(scanPtr.p->m_delivered_scan_frags.isEmpty() &&
scanPtr.p->m_running_scan_frags.isEmpty()){
conf->requestInfo = op_count | ScanTabConf::EndOfData;
releaseScanResources(scanPtr);
}
if(4 + 3 * op_count > 25){ if(4 + 3 * op_count > 25){
jam(); jam();
...@@ -9544,7 +9532,7 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) { ...@@ -9544,7 +9532,7 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABCONF, signal, sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABCONF, signal,
ScanTabConf::SignalLength + 3 * op_count, JBB); ScanTabConf::SignalLength + 3 * op_count, JBB);
} }
scanP->m_queued_count = 0; scanPtr.p->m_queued_count = 0;
}//Dbtc::sendScanTabConf() }//Dbtc::sendScanTabConf()
...@@ -10446,7 +10434,6 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal) ...@@ -10446,7 +10434,6 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
DUMP_SFR(sp.p->m_running_scan_frags); DUMP_SFR(sp.p->m_running_scan_frags);
DUMP_SFR(sp.p->m_queued_scan_frags); DUMP_SFR(sp.p->m_queued_scan_frags);
DUMP_SFR(sp.p->m_delivered_scan_frags); DUMP_SFR(sp.p->m_delivered_scan_frags);
DUMP_SFR(sp.p->m_completed_scan_frags);
// Request dump of ApiConnectRecord // Request dump of ApiConnectRecord
dumpState->args[0] = DumpStateOrd::TcDumpOneApiConnectRec; dumpState->args[0] = DumpStateOrd::TcDumpOneApiConnectRec;
......
...@@ -120,6 +120,15 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal, ...@@ -120,6 +120,15 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
} }
} }
} }
if (conf->requestInfo & ScanTabConf::EndOfData) {
if(theScanningOp->m_ordered)
theScanningOp->m_api_receivers_count = 0;
if(theScanningOp->m_api_receivers_count +
theScanningOp->m_conf_receivers_count +
theScanningOp->m_sent_receivers_count){
abort();
}
}
return 0; return 0;
} else { } else {
#ifdef NDB_NO_DROPPED_SIGNAL #ifdef NDB_NO_DROPPED_SIGNAL
......
...@@ -520,21 +520,10 @@ int NdbScanOperation::nextResult(bool fetchAllowed) ...@@ -520,21 +520,10 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
/** /**
* No completed & no sent -> EndOfData * No completed & no sent -> EndOfData
*/ */
if(send_next_scan(0, true) == 0){ // Close scan
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
theError.code = -1; // make sure user gets error if he tries again theError.code = -1; // make sure user gets error if he tries again
if(DEBUG_NEXT_RESULT) ndbout_c("return 1"); if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
return 1; return 1;
} }
retVal = -1; //return_code;
} else {
retVal = -3;
}
idx = last;
}
if(retVal == 0) if(retVal == 0)
break; break;
...@@ -685,6 +674,7 @@ void NdbScanOperation::closeScan() ...@@ -685,6 +674,7 @@ void NdbScanOperation::closeScan()
if(m_api_receivers_count+m_conf_receivers_count){ if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan // Send close scan
ndbout_c("sending close %d %d", m_api_receivers_count, m_conf_receivers_count);
send_next_scan(0, true); // Close scan send_next_scan(0, true); // Close scan
} }
...@@ -1344,19 +1334,9 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ ...@@ -1344,19 +1334,9 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
return 0; return 0;
} }
TransporterFacade* tp = TransporterFacade::instance(); theError.code = -1;
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) &&
send_next_scan(0, true) == 0 &&
theError.code == 0){
if(DEBUG_NEXT_RESULT) ndbout_c("return 1"); if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
return 1; return 1;
}
setErrorCode(theError.code);
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
return -1;
} }
int int
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment