Commit 98de7e01 authored by unknown's avatar unknown

bug#6775 - ndb

Queue scan on real fragment.
  Index fragment for range scans
  Table fragment for table scans


ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  Document meaning of fragPtrI and how it differs
    from scanTcRec->fragmentptr
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Queue scan on real fragment.
    Index fragment for range scans
    Table fragment for table scans
ndb/src/ndbapi/NdbConnection.cpp:
  Check tOp before assigning
parent 1b9ea9b3
...@@ -550,6 +550,11 @@ public: ...@@ -550,6 +550,11 @@ public:
UintR scanErrorCounter; UintR scanErrorCounter;
UintR scanLocalFragid; UintR scanLocalFragid;
UintR scanSchemaVersion; UintR scanSchemaVersion;
/**
* This is _always_ main table, even in range scan
* in which case scanTcrec->fragmentptr is different
*/
Uint32 fragPtrI; Uint32 fragPtrI;
UintR scanStoredProcId; UintR scanStoredProcId;
ScanState scanState; ScanState scanState;
......
...@@ -7703,6 +7703,9 @@ void Dblqh::abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode){ ...@@ -7703,6 +7703,9 @@ void Dblqh::abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode){
jam(); jam();
scanptr.i = scan_ptr_i; scanptr.i = scan_ptr_i;
c_scanRecordPool.getPtr(scanptr); c_scanRecordPool.getPtr(scanptr);
fragptr.i = tcConnectptr.p->fragmentptr;
ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
finishScanrec(signal); finishScanrec(signal);
releaseScanrec(signal); releaseScanrec(signal);
tcConnectptr.p->transactionState = TcConnectionrec::IDLE; tcConnectptr.p->transactionState = TcConnectionrec::IDLE;
...@@ -8570,10 +8573,12 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8570,10 +8573,12 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
/** /**
* Used for scan take over * Used for scan take over
*/ */
FragrecordPtr tFragPtr; {
tFragPtr.i = fragptr.p->tableFragptr; FragrecordPtr tFragPtr;
ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord); tFragPtr.i = fragptr.p->tableFragptr;
scanptr.p->fragPtrI = fragptr.p->tableFragptr; ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
scanptr.p->fragPtrI = fragptr.p->tableFragptr;
}
/** /**
* !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11 * !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11
...@@ -8582,8 +8587,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8582,8 +8587,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
Uint32 start = (idx ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ); Uint32 start = (idx ? MAX_PARALLEL_SCANS_PER_FRAG : 1 );
Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1); Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1);
stop += start; stop += start;
Uint32 free = tFragPtr.p->m_scanNumberMask.find(start); Uint32 free = fragptr.p->m_scanNumberMask.find(start);
if(free == Fragrecord::ScanNumberMask::NotFound || free >= stop){ if(free == Fragrecord::ScanNumberMask::NotFound || free >= stop){
jam(); jam();
...@@ -8597,16 +8602,16 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8597,16 +8602,16 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
*/ */
scanptr.p->scanState = ScanRecord::IN_QUEUE; scanptr.p->scanState = ScanRecord::IN_QUEUE;
LocalDLFifoList<ScanRecord> queue(c_scanRecordPool, LocalDLFifoList<ScanRecord> queue(c_scanRecordPool,
tFragPtr.p->m_queuedScans); fragptr.p->m_queuedScans);
queue.add(scanptr); queue.add(scanptr);
return ZOK; return ZOK;
} }
scanptr.p->scanNumber = free; scanptr.p->scanNumber = free;
tFragPtr.p->m_scanNumberMask.clear(free);// Update mask fragptr.p->m_scanNumberMask.clear(free);// Update mask
LocalDLList<ScanRecord> active(c_scanRecordPool, tFragPtr.p->m_activeScans); LocalDLList<ScanRecord> active(c_scanRecordPool, fragptr.p->m_activeScans);
active.add(scanptr); active.add(scanptr);
if(scanptr.p->scanKeyinfoFlag){ if(scanptr.p->scanKeyinfoFlag){
jam(); jam();
...@@ -8666,12 +8671,8 @@ void Dblqh::finishScanrec(Signal* signal) ...@@ -8666,12 +8671,8 @@ void Dblqh::finishScanrec(Signal* signal)
{ {
release_acc_ptr_list(scanptr.p); release_acc_ptr_list(scanptr.p);
FragrecordPtr tFragPtr;
tFragPtr.i = scanptr.p->fragPtrI;
ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
LocalDLFifoList<ScanRecord> queue(c_scanRecordPool, LocalDLFifoList<ScanRecord> queue(c_scanRecordPool,
tFragPtr.p->m_queuedScans); fragptr.p->m_queuedScans);
if(scanptr.p->scanState == ScanRecord::IN_QUEUE){ if(scanptr.p->scanState == ScanRecord::IN_QUEUE){
jam(); jam();
...@@ -8689,11 +8690,11 @@ void Dblqh::finishScanrec(Signal* signal) ...@@ -8689,11 +8690,11 @@ void Dblqh::finishScanrec(Signal* signal)
ndbrequire(tmp.p == scanptr.p); ndbrequire(tmp.p == scanptr.p);
} }
LocalDLList<ScanRecord> scans(c_scanRecordPool, tFragPtr.p->m_activeScans); LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans);
scans.release(scanptr); scans.release(scanptr);
const Uint32 scanNumber = scanptr.p->scanNumber; const Uint32 scanNumber = scanptr.p->scanNumber;
ndbrequire(!tFragPtr.p->m_scanNumberMask.get(scanNumber)); ndbrequire(!fragptr.p->m_scanNumberMask.get(scanNumber));
ScanRecordPtr restart; ScanRecordPtr restart;
/** /**
...@@ -8701,13 +8702,13 @@ void Dblqh::finishScanrec(Signal* signal) ...@@ -8701,13 +8702,13 @@ void Dblqh::finishScanrec(Signal* signal)
*/ */
if(scanNumber == NR_ScanNo || !queue.first(restart)){ if(scanNumber == NR_ScanNo || !queue.first(restart)){
jam(); jam();
tFragPtr.p->m_scanNumberMask.set(scanNumber); fragptr.p->m_scanNumberMask.set(scanNumber);
return; return;
} }
if(ERROR_INSERTED(5034)){ if(ERROR_INSERTED(5034)){
jam(); jam();
tFragPtr.p->m_scanNumberMask.set(scanNumber); fragptr.p->m_scanNumberMask.set(scanNumber);
return; return;
} }
...@@ -8718,7 +8719,7 @@ void Dblqh::finishScanrec(Signal* signal) ...@@ -8718,7 +8719,7 @@ void Dblqh::finishScanrec(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
restart.p->scanNumber = scanNumber; restart.p->scanNumber = scanNumber;
restart.p->scanState = ScanRecord::WAIT_ACC_SCAN; restart.p->scanState = ScanRecord::WAIT_ACC_SCAN;
queue.remove(restart); queue.remove(restart);
scans.add(restart); scans.add(restart);
if(restart.p->scanKeyinfoFlag){ if(restart.p->scanKeyinfoFlag){
......
...@@ -1086,8 +1086,11 @@ NdbConnection::getNdbIndexScanOperation(const NdbIndexImpl* index, ...@@ -1086,8 +1086,11 @@ NdbConnection::getNdbIndexScanOperation(const NdbIndexImpl* index,
if (indexTable != 0){ if (indexTable != 0){
NdbIndexScanOperation* tOp = NdbIndexScanOperation* tOp =
getNdbScanOperation((NdbTableImpl *) indexTable); getNdbScanOperation((NdbTableImpl *) indexTable);
tOp->m_currentTable = table; if(tOp)
if(tOp) tOp->m_cursor_type = NdbScanOperation::IndexCursor; {
tOp->m_currentTable = table;
tOp->m_cursor_type = NdbScanOperation::IndexCursor;
}
return tOp; return tOp;
} else { } else {
setOperationErrorCodeAbort(theNdb->theError.code); setOperationErrorCodeAbort(theNdb->theError.code);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment