Commit f856038c authored by unknown's avatar unknown

Piggy back close scan req

parent 9ff05718
......@@ -1155,7 +1155,6 @@ public:
union { Uint32 m_queued_count; Uint32 scanReceivedOperations; };
DLList<ScanFragRec>::Head m_queued_scan_frags; // In TC !sent to API
DLList<ScanFragRec>::Head m_delivered_scan_frags;// Delivered to API
DLList<ScanFragRec>::Head m_completed_scan_frags;// Completed
// Id of the next fragment to be scanned. Used by scan fragment
// processes when they are ready for the next fragment
......@@ -1433,7 +1432,7 @@ private:
void releaseScanResources(ScanRecordPtr);
ScanRecordPtr seizeScanrec(Signal* signal);
void sendScanFragReq(Signal*, ScanRecord*, ScanFragRec*);
void sendScanTabConf(Signal* signal, ScanRecord*);
void sendScanTabConf(Signal* signal, ScanRecordPtr);
void close_scan_req(Signal*, ScanRecordPtr, bool received_req);
void close_scan_req_send_conf(Signal*, ScanRecordPtr);
......
......@@ -6586,10 +6586,8 @@ void Dbtc::timeOutFoundFragLab(Signal* signal, UintR TscanConPtr)
*/
ptr.p->scanFragState = ScanFragRec::COMPLETED;
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(ptr);
comp.add(ptr);
run.release(ptr);
ptr.p->stopFragTimer();
}
......@@ -6865,7 +6863,6 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
jam();
ScanFragRecPtr ptr;
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
for(run.first(ptr); !ptr.isNull(); ){
jam();
......@@ -6875,8 +6872,7 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
refToNode(curr.p->lqhBlockref) == failedNodeId){
jam();
run.remove(curr);
comp.add(curr);
run.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
found = true;
......@@ -8809,14 +8805,10 @@ void Dbtc::releaseScanResources(ScanRecordPtr scanPtr)
ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
releaseTcCon();
ScanFragList x(c_scan_frag_pool,
scanPtr.p->m_completed_scan_frags);
x.release();
ndbrequire(scanPtr.p->m_running_scan_frags.isEmpty());
ndbrequire(scanPtr.p->m_queued_scan_frags.isEmpty());
ndbrequire(scanPtr.p->m_delivered_scan_frags.isEmpty());
ndbassert(scanPtr.p->scanApiRec == apiConnectptr.i);
ndbassert(apiConnectptr.p->apiScanRec == scanPtr.i);
......@@ -8868,10 +8860,8 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
if(tabPtr.p->checkTable(schemaVersion) == false){
jam();
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr);
comp.add(scanFragptr);
run.release(scanFragptr);
scanError(signal, scanptr, tabPtr.p->getErrorCode(schemaVersion));
return;
}
......@@ -8889,10 +8879,8 @@ void Dbtc::execDIGETPRIMCONF(Signal* signal)
updateBuddyTimer(apiConnectptr);
{
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr);
comp.add(scanFragptr);
run.release(scanFragptr);
}
close_scan_req_send_conf(signal, scanptr);
return;
......@@ -8945,10 +8933,8 @@ void Dbtc::execDIGETPRIMREF(Signal* signal)
ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr);
comp.add(scanFragptr);
run.release(scanFragptr);
scanError(signal, scanptr, errCode);
}//Dbtc::execDIGETPRIMREF()
......@@ -8993,10 +8979,8 @@ void Dbtc::execSCAN_FRAGREF(Signal* signal)
{
scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr);
comp.add(scanFragptr);
run.release(scanFragptr);
scanFragptr.p->stopFragTimer();
}
scanError(signal, scanptr, errCode);
......@@ -9095,10 +9079,8 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
} else {
jam();
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
run.remove(scanFragptr);
comp.add(scanFragptr);
run.release(scanFragptr);
scanFragptr.p->stopFragTimer();
scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
}
......@@ -9147,7 +9129,7 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
if(scanptr.p->m_queued_count > /** Min */ 0){
jam();
sendScanTabConf(signal, scanptr.p);
sendScanTabConf(signal, scanptr);
}
}//Dbtc::execSCAN_FRAGCONF()
......@@ -9308,7 +9290,6 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
{
ScanFragRecPtr ptr;
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
ScanFragList completed(c_scan_frag_pool, scanP->m_completed_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags);
......@@ -9350,7 +9331,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
} else {
jam();
completed.add(curr);
c_scan_frag_pool.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
}
......@@ -9378,7 +9359,7 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
ScanFragNextReq::SignalLength, JBB);
} else {
jam();
completed.add(curr);
c_scan_frag_pool.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
}
......@@ -9492,10 +9473,10 @@ void Dbtc::sendScanFragReq(Signal* signal,
}//Dbtc::sendScanFragReq()
void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
jam();
Uint32* ops = signal->getDataPtrSend()+4;
Uint32 op_count = scanP->m_queued_count;
Uint32 op_count = scanPtr.p->m_queued_count;
if(4 + 3 * op_count > 25){
jam();
ops += 21;
......@@ -9507,9 +9488,9 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
conf->transId1 = apiConnectptr.p->transid[0];
conf->transId2 = apiConnectptr.p->transid[1];
ScanFragRecPtr ptr;
ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags);
ScanFragList completed(c_scan_frag_pool, scanP->m_completed_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
{
ScanFragList queued(c_scan_frag_pool, scanPtr.p->m_queued_scan_frags);
ScanFragList delivered(c_scan_frag_pool,scanPtr.p->m_delivered_scan_frags);
for(queued.first(ptr); !ptr.isNull(); ){
ndbrequire(ptr.p->scanFragState == ScanFragRec::QUEUED_FOR_DELIVERY);
ScanFragRecPtr curr = ptr; // Remove while iterating...
......@@ -9526,11 +9507,18 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
curr.p->stopFragTimer();
} else {
(* --ops) = ScanTabConf::EndOfData; ops++;
completed.add(curr);
c_scan_frag_pool.release(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer();
}
}
}
if(scanPtr.p->m_delivered_scan_frags.isEmpty() &&
scanPtr.p->m_running_scan_frags.isEmpty()){
conf->requestInfo = op_count | ScanTabConf::EndOfData;
releaseScanResources(scanPtr);
}
if(4 + 3 * op_count > 25){
jam();
......@@ -9544,7 +9532,7 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecord * scanP) {
sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABCONF, signal,
ScanTabConf::SignalLength + 3 * op_count, JBB);
}
scanP->m_queued_count = 0;
scanPtr.p->m_queued_count = 0;
}//Dbtc::sendScanTabConf()
......@@ -10446,7 +10434,6 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
DUMP_SFR(sp.p->m_running_scan_frags);
DUMP_SFR(sp.p->m_queued_scan_frags);
DUMP_SFR(sp.p->m_delivered_scan_frags);
DUMP_SFR(sp.p->m_completed_scan_frags);
// Request dump of ApiConnectRecord
dumpState->args[0] = DumpStateOrd::TcDumpOneApiConnectRec;
......
......@@ -120,6 +120,15 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
}
}
}
if (conf->requestInfo & ScanTabConf::EndOfData) {
if(theScanningOp->m_ordered)
theScanningOp->m_api_receivers_count = 0;
if(theScanningOp->m_api_receivers_count +
theScanningOp->m_conf_receivers_count +
theScanningOp->m_sent_receivers_count){
abort();
}
}
return 0;
} else {
#ifdef NDB_NO_DROPPED_SIGNAL
......
......@@ -520,21 +520,10 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
/**
* No completed & no sent -> EndOfData
*/
if(send_next_scan(0, true) == 0){ // Close scan
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
theError.code = -1; // make sure user gets error if he tries again
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
return 1;
}
retVal = -1; //return_code;
} else {
retVal = -3;
}
idx = last;
}
if(retVal == 0)
break;
......@@ -685,6 +674,7 @@ void NdbScanOperation::closeScan()
if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan
ndbout_c("sending close %d %d", m_api_receivers_count, m_conf_receivers_count);
send_next_scan(0, true); // Close scan
}
......@@ -1344,19 +1334,9 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
return 0;
}
TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) &&
send_next_scan(0, true) == 0 &&
theError.code == 0){
theError.code = -1;
if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
return 1;
}
setErrorCode(theError.code);
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
return -1;
}
int
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment