Commit a5e0f5f6 authored by joreland@mysql.com's avatar joreland@mysql.com

Fix scan error bug

(last known :-))
parent 43a0e76e
...@@ -6433,7 +6433,6 @@ void Dbtc::execSCAN_HBREP(Signal* signal) ...@@ -6433,7 +6433,6 @@ void Dbtc::execSCAN_HBREP(Signal* signal)
c_scan_frag_pool.getPtr(scanFragptr); c_scan_frag_pool.getPtr(scanFragptr);
switch (scanFragptr.p->scanFragState){ switch (scanFragptr.p->scanFragState){
case ScanFragRec::LQH_ACTIVE: case ScanFragRec::LQH_ACTIVE:
//case ScanFragRec::LQH_ACTIVE_CLOSE:
break; break;
default: default:
DEBUG("execSCAN_HBREP: scanFragState="<<scanFragptr.p->scanFragState); DEBUG("execSCAN_HBREP: scanFragState="<<scanFragptr.p->scanFragState);
...@@ -6514,7 +6513,6 @@ void Dbtc::timeOutFoundFragLab(Signal* signal, UintR TscanConPtr) ...@@ -6514,7 +6513,6 @@ void Dbtc::timeOutFoundFragLab(Signal* signal, UintR TscanConPtr)
/** /**
* The node has died * The node has died
*/ */
ndbout_c("Node %d has died", nodeId);
ptr.p->scanFragState = ScanFragRec::COMPLETED; ptr.p->scanFragState = ScanFragRec::COMPLETED;
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags); ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
...@@ -8358,10 +8356,6 @@ void Dbtc::execSCAN_TABREQ(Signal* signal) ...@@ -8358,10 +8356,6 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
scanParallel = (scanConcurrency + 15) / 16; scanParallel = (scanConcurrency + 15) / 16;
noOprecPerFrag = (scanConcurrency >= 16 ? 16 : scanConcurrency & 15); noOprecPerFrag = (scanConcurrency >= 16 ? 16 : scanConcurrency & 15);
} }
#ifdef VM_TRACE
ndbout_c("noOprecPerFrag=%d", noOprecPerFrag);
ndbout_c("scanParallel=%d", scanParallel);
#endif
jamEntry(); jamEntry();
apiConnectptr.i = scanTabReq->apiConnectPtr; apiConnectptr.i = scanTabReq->apiConnectPtr;
...@@ -8708,11 +8702,6 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal) ...@@ -8708,11 +8702,6 @@ void Dbtc::execDI_FCOUNTCONF(Signal* signal)
for (list.first(ptr); !ptr.isNull(); list.next(ptr)){ for (list.first(ptr); !ptr.isNull(); list.next(ptr)){
jam(); jam();
#ifdef VM_TRACE
ndbout_c("DIGETPRIMREQ(%d, %d)",
scanptr.p->scanTableref, scanptr.p->scanNextFragId);
#endif
ptr.p->lqhBlockref = 0; ptr.p->lqhBlockref = 0;
ptr.p->startFragTimer(ctcTimer); ptr.p->startFragTimer(ctcTimer);
ptr.p->scanFragId = scanptr.p->scanNextFragId++; ptr.p->scanFragId = scanptr.p->scanNextFragId++;
...@@ -8759,9 +8748,6 @@ void Dbtc::abortScanLab(Signal* signal, ScanRecordPtr scanptr, Uint32 errCode) ...@@ -8759,9 +8748,6 @@ void Dbtc::abortScanLab(Signal* signal, ScanRecordPtr scanptr, Uint32 errCode)
void Dbtc::releaseScanResources(ScanRecordPtr scanPtr) void Dbtc::releaseScanResources(ScanRecordPtr scanPtr)
{ {
#ifdef VM_TRACE
ndbout_c("releaseScanResources: %d", scanPtr.i);
#endif
if (apiConnectptr.p->cachePtr != RNIL) { if (apiConnectptr.p->cachePtr != RNIL) {
cachePtr.i = apiConnectptr.p->cachePtr; cachePtr.i = apiConnectptr.p->cachePtr;
ptrCheckGuard(cachePtr, ccacheFilesize, cacheRecord); ptrCheckGuard(cachePtr, ccacheFilesize, cacheRecord);
...@@ -9041,26 +9027,15 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal) ...@@ -9041,26 +9027,15 @@ void Dbtc::execSCAN_FRAGCONF(Signal* signal)
const Uint32 status = conf->fragmentCompleted; const Uint32 status = conf->fragmentCompleted;
DEBUG(apiConnectptr.i << " " << scanFragptr.i <<
" execSCAN_FRAGCONF() status: " << status
<< " ops: " << noCompletedOps << " from: " << refToNode(signal->getSendersBlockRef()));
if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){ if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
jam(); jam();
if(status == ZFALSE){ if(status == ZFALSE){
/** /**
* We have started closing = we sent a close -> ignore this * We have started closing = we sent a close -> ignore this
*/ */
DEBUG(apiConnectptr.i << " " << scanFragptr.i <<
" Received SCANFRAG_CONF wo/ close when in "
" CLOSING_SCAN:" << status << " " << noCompletedOps);
return; return;
} else { } else {
jam(); jam();
DEBUG(apiConnectptr.i << " " << scanFragptr.i
<< " Received SCANFRAG_CONF w/ close when in "
" CLOSING_SCAN:" << status << " " << noCompletedOps);
ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags); ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags); ScanFragList comp(c_scan_frag_pool, scanptr.p->m_completed_scan_frags);
...@@ -9249,9 +9224,6 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) ...@@ -9249,9 +9224,6 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
void void
Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
#ifdef VM_TRACE
ndbout_c("%d close_scan_req", apiConnectptr.i);
#endif
ScanRecord* scanP = scanPtr.p; ScanRecord* scanP = scanPtr.p;
scanPtr.p->scanState = ScanRecord::CLOSING_SCAN; scanPtr.p->scanState = ScanRecord::CLOSING_SCAN;
scanPtr.p->m_close_scan_req = req_received; scanPtr.p->m_close_scan_req = req_received;
...@@ -9295,7 +9267,6 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){ ...@@ -9295,7 +9267,6 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
nextReq->senderData = curr.i; nextReq->senderData = curr.i;
sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB); ScanFragNextReq::SignalLength, JBB);
ndbout_c("%d running -> closing", curr.i);
} }
// Close delivered // Close delivered
...@@ -9316,13 +9287,11 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){ ...@@ -9316,13 +9287,11 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB); ScanFragNextReq::SignalLength, JBB);
ndbout_c("%d delivered -> closing (%d)", curr.i, curr.p->m_ops);
} else { } else {
jam(); jam();
completed.add(curr); completed.add(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer(); curr.p->stopFragTimer();
ndbout_c("%d delivered -> completed", curr.i);
} }
}//for }//for
...@@ -9346,14 +9315,11 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){ ...@@ -9346,14 +9315,11 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
nextReq->senderData = curr.i; nextReq->senderData = curr.i;
sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB); ScanFragNextReq::SignalLength, JBB);
ndbout_c("%d queued -> closing", curr.i);
} else { } else {
jam(); jam();
completed.add(curr); completed.add(curr);
curr.p->scanFragState = ScanFragRec::COMPLETED; curr.p->scanFragState = ScanFragRec::COMPLETED;
curr.p->stopFragTimer(); curr.p->stopFragTimer();
ndbout_c("%d queued -> completed", curr.i);
} }
} }
} }
...@@ -9369,7 +9335,7 @@ Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){ ...@@ -9369,7 +9335,7 @@ Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){
ndbrequire(scanPtr.p->m_delivered_scan_frags.isEmpty()); ndbrequire(scanPtr.p->m_delivered_scan_frags.isEmpty());
//ndbrequire(scanPtr.p->m_running_scan_frags.isEmpty()); //ndbrequire(scanPtr.p->m_running_scan_frags.isEmpty());
#if 1 #if 0
{ {
ScanFragList comp(c_scan_frag_pool, scanPtr.p->m_completed_scan_frags); ScanFragList comp(c_scan_frag_pool, scanPtr.p->m_completed_scan_frags);
ScanFragRecPtr ptr; ScanFragRecPtr ptr;
...@@ -9382,8 +9348,6 @@ Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){ ...@@ -9382,8 +9348,6 @@ Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){
if(!scanPtr.p->m_running_scan_frags.isEmpty()){ if(!scanPtr.p->m_running_scan_frags.isEmpty()){
jam(); jam();
ndbout_c("%d close_scan_req_send_conf: not ready", apiConnectptr.i);
return; return;
} }
...@@ -9394,12 +9358,9 @@ Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){ ...@@ -9394,12 +9358,9 @@ Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){
/** /**
* The API hasn't order closing yet * The API hasn't order closing yet
*/ */
ndbout_c("%d close_scan_req_send_conf: api not ready", apiConnectptr.i);
return; return;
} }
ndbout_c("%d close_scan_req_send_conf: ready", apiConnectptr.i);
if(!apiFail){ if(!apiFail){
jam(); jam();
Uint32 ref = apiConnectptr.p->ndbapiBlockref; Uint32 ref = apiConnectptr.p->ndbapiBlockref;
......
...@@ -67,7 +67,6 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){ ...@@ -67,7 +67,6 @@ NdbConnection::receiveSCAN_TABREF(NdbApiSignal* aSignal){
return 0; return 0;
} }
assert(theScanningOp->m_sent_receivers_count); assert(theScanningOp->m_sent_receivers_count);
theScanningOp->m_sent_receivers_count--;
theScanningOp->m_conf_receivers_count++; theScanningOp->m_conf_receivers_count++;
return 0; return 0;
} }
......
...@@ -135,8 +135,6 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -135,8 +135,6 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
m_ordered = 0; m_ordered = 0;
Uint32 fragCount = m_currentTable->m_fragmentCount; Uint32 fragCount = m_currentTable->m_fragmentCount;
ndbout_c("batch: %d parallell: %d fragCount: %d",
batch, parallell, fragCount);
if(batch + parallell == 0){ // Max speed if(batch + parallell == 0){ // Max speed
batch = 16; batch = 16;
...@@ -156,9 +154,6 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -156,9 +154,6 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
else if(parallell == 0) else if(parallell == 0)
parallell = fragCount; parallell = fragCount;
ndbout_c("batch: %d parallell: %d fragCount: %d",
batch, parallell, fragCount);
assert(parallell > 0); assert(parallell > 0);
// It is only possible to call openScan if // It is only possible to call openScan if
...@@ -300,6 +295,7 @@ NdbScanOperation::fix_receivers(Uint32 parallell, bool keyInfo){ ...@@ -300,6 +295,7 @@ NdbScanOperation::fix_receivers(Uint32 parallell, bool keyInfo){
*/ */
void void
NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
if(theError.code == 0){
Uint32 idx = tRec->m_list_index; Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1; Uint32 last = m_sent_receivers_count - 1;
if(idx != last){ if(idx != last){
...@@ -314,6 +310,7 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ ...@@ -314,6 +310,7 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
m_conf_receivers_count = last + 1; m_conf_receivers_count = last + 1;
tRec->m_list_index = last; tRec->m_list_index = last;
tRec->m_current_row = 0; tRec->m_current_row = 0;
}
} }
/** /**
...@@ -321,6 +318,7 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){ ...@@ -321,6 +318,7 @@ NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
*/ */
void void
NdbScanOperation::receiver_completed(NdbReceiver* tRec){ NdbScanOperation::receiver_completed(NdbReceiver* tRec){
if(theError.code == 0){
Uint32 idx = tRec->m_list_index; Uint32 idx = tRec->m_list_index;
Uint32 last = m_sent_receivers_count - 1; Uint32 last = m_sent_receivers_count - 1;
if(idx != last){ if(idx != last){
...@@ -329,6 +327,7 @@ NdbScanOperation::receiver_completed(NdbReceiver* tRec){ ...@@ -329,6 +327,7 @@ NdbScanOperation::receiver_completed(NdbReceiver* tRec){
move->m_list_index = idx; move->m_list_index = idx;
} }
m_sent_receivers_count = last; m_sent_receivers_count = last;
}
} }
/***************************************************************************** /*****************************************************************************
...@@ -512,11 +511,6 @@ int NdbScanOperation::nextResult(bool fetchAllowed) ...@@ -512,11 +511,6 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
*/ */
theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theWaiter.m_state = WAIT_SCAN;
ndbout_c("%d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue; continue;
...@@ -531,11 +525,6 @@ int NdbScanOperation::nextResult(bool fetchAllowed) ...@@ -531,11 +525,6 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
if(send_next_scan(0, true) == 0){ // Close scan if(send_next_scan(0, true) == 0){ // Close scan
theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theWaiter.m_state = WAIT_SCAN;
ndbout_c("%d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) { if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
return 1; return 1;
...@@ -653,11 +642,7 @@ NdbScanOperation::doSend(int ProcessorId) ...@@ -653,11 +642,7 @@ NdbScanOperation::doSend(int ProcessorId)
void NdbScanOperation::closeScan() void NdbScanOperation::closeScan()
{ {
ndbout_c("closeScan %d : api: %d conf: %d sent: %d", int self = pthread_self() ;
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
do { do {
TransporterFacade* tp = TransporterFacade::instance(); TransporterFacade* tp = TransporterFacade::instance();
...@@ -671,50 +656,17 @@ void NdbScanOperation::closeScan() ...@@ -671,50 +656,17 @@ void NdbScanOperation::closeScan()
break; break;
} }
/**
* Wait for all running scans...
*/
while(m_sent_receivers_count){
theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN;
ndbout_c("%d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
break;
case -1:
setErrorCode(4008);
case -2:
m_sent_receivers_count = 0;
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
}
}
if(seq != tp->getNodeSequence(nodeId)){
theNdbCon->theReleaseOnClose = true;
break;
}
if(m_api_receivers_count+m_conf_receivers_count){ if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan // Send close scan
send_next_scan(0, true); // Close scan send_next_scan(0, true); // Close scan
}
/** /**
* wait for close scan conf * wait for close scan conf
*/ */
do { while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count){
theNdb->theWaiter.m_node = nodeId; theNdb->theWaiter.m_node = nodeId;
theNdb->theWaiter.m_state = WAIT_SCAN; theNdb->theWaiter.m_state = WAIT_SCAN;
ndbout_c("%d : api: %d conf: %d sent: %d",
__LINE__,
m_api_receivers_count,
m_conf_receivers_count,
m_sent_receivers_count);
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT); int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){ switch(return_code){
case 0: case 0:
...@@ -724,8 +676,8 @@ void NdbScanOperation::closeScan() ...@@ -724,8 +676,8 @@ void NdbScanOperation::closeScan()
case -2: case -2:
m_api_receivers_count = 0; m_api_receivers_count = 0;
m_conf_receivers_count = 0; m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
} }
} while(m_api_receivers_count+m_conf_receivers_count);
} }
} while(0); } while(0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment