Commit 6fc580c0 authored by mronstrom@mysql.com's avatar mronstrom@mysql.com

Finalised first step of scan optimisations

Everything is working but a few things is still remaining
parent e30e90fb
...@@ -68,7 +68,7 @@ ...@@ -68,7 +68,7 @@
* API can order a multiple of this number of records at a time since * API can order a multiple of this number of records at a time since
* fragments can be scanned in parallel. * fragments can be scanned in parallel.
*/ */
#define MAX_PARALLEL_OP_PER_SCAN 64 #define MAX_PARALLEL_OP_PER_SCAN 512
/* /*
* When calculating the number of records sent from LQH in each batch * When calculating the number of records sent from LQH in each batch
* one uses SCAN_BATCH_SIZE divided by the expected size of signals * one uses SCAN_BATCH_SIZE divided by the expected size of signals
...@@ -83,7 +83,7 @@ ...@@ -83,7 +83,7 @@
* batch size from all nodes. This parameter should most likely be * batch size from all nodes. This parameter should most likely be
* configurable, or dependent on sendBufferSize. * configurable, or dependent on sendBufferSize.
*/ */
#define MAX_SCAN_BATCH_SIZE 196608 #define MAX_SCAN_BATCH_SIZE 262144
/* /*
* Maximum number of Parallel Scan queries on one hash index fragment * Maximum number of Parallel Scan queries on one hash index fragment
*/ */
......
...@@ -6113,15 +6113,15 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ(Signal * signal){ ...@@ -6113,15 +6113,15 @@ void Dbdih::execCREATE_FRAGMENTATION_REQ(Signal * signal){
switch(fragmentType){ switch(fragmentType){
case DictTabInfo::AllNodesSmallTable: case DictTabInfo::AllNodesSmallTable:
jam(); jam();
noOfFragments = cnoOfNodeGroups; noOfFragments = csystemnodes;
break; break;
case DictTabInfo::AllNodesMediumTable: case DictTabInfo::AllNodesMediumTable:
jam(); jam();
noOfFragments = 2 * cnoOfNodeGroups; noOfFragments = csystemnodes;
break; break;
case DictTabInfo::AllNodesLargeTable: case DictTabInfo::AllNodesLargeTable:
jam(); jam();
noOfFragments = 8 * cnoOfNodeGroups; noOfFragments = csystemnodes;
break; break;
case DictTabInfo::SingleFragment: case DictTabInfo::SingleFragment:
jam(); jam();
......
...@@ -532,7 +532,8 @@ public: ...@@ -532,7 +532,8 @@ public:
SCAN = 1, SCAN = 1,
COPY = 2 COPY = 2
}; };
UintR scanAccOpPtr[MAX_PARALLEL_OP_PER_SCAN]; UintR scan_acc_op_ptr[MAX_PARALLEL_OP_PER_SCAN];
Uint32 scan_acc_index;
UintR scanApiOpPtr; UintR scanApiOpPtr;
UintR scanLocalref[2]; UintR scanLocalref[2];
Uint32 scan_batch_len; Uint32 scan_batch_len;
...@@ -2219,6 +2220,13 @@ private: ...@@ -2219,6 +2220,13 @@ private:
void execTUX_ADD_ATTRREF(Signal* signal); void execTUX_ADD_ATTRREF(Signal* signal);
// Statement blocks // Statement blocks
void init_acc_ptr_list(ScanRecord*);
bool seize_acc_ptr_list(ScanRecord*, Uint32);
void release_acc_ptr_list(ScanRecord*);
Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32);
void set_acc_ptr_in_scan_record(ScanRecord*, Uint32, Uint32);
void removeTable(Uint32 tableId); void removeTable(Uint32 tableId);
void sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId); void sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId);
void sendEMPTY_LCP_CONF(Signal* signal, bool idle); void sendEMPTY_LCP_CONF(Signal* signal, bool idle);
...@@ -2248,7 +2256,6 @@ private: ...@@ -2248,7 +2256,6 @@ private:
void sendAttrinfoSignal(Signal* signal); void sendAttrinfoSignal(Signal* signal);
void sendLqhAttrinfoSignal(Signal* signal); void sendLqhAttrinfoSignal(Signal* signal);
void sendKeyinfoAcc(Signal* signal); void sendKeyinfoAcc(Signal* signal);
void initScanAccOp(Signal* signal);
Uint32 initScanrec(const class ScanFragReq *); Uint32 initScanrec(const class ScanFragReq *);
void initScanTc(Signal* signal, void initScanTc(Signal* signal,
Uint32 transid1, Uint32 transid1,
......
...@@ -2098,7 +2098,6 @@ void Dblqh::execTIME_SIGNAL(Signal* signal) ...@@ -2098,7 +2098,6 @@ void Dblqh::execTIME_SIGNAL(Signal* signal)
ScanRecordPtr TscanPtr; ScanRecordPtr TscanPtr;
c_scanRecordPool.getPtr(TscanPtr, tTcConptr.p->tcScanRec); c_scanRecordPool.getPtr(TscanPtr, tTcConptr.p->tcScanRec);
ndbout << " scanState = " << TscanPtr.p->scanState << endl; ndbout << " scanState = " << TscanPtr.p->scanState << endl;
//TscanPtr.p->scanAccOpPtr[16];
//TscanPtr.p->scanLocalref[2]; //TscanPtr.p->scanLocalref[2];
ndbout << " copyPtr="<<TscanPtr.p->copyPtr ndbout << " copyPtr="<<TscanPtr.p->copyPtr
<< " scanAccPtr="<<TscanPtr.p->scanAccPtr << " scanAccPtr="<<TscanPtr.p->scanAccPtr
...@@ -3584,7 +3583,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) ...@@ -3584,7 +3583,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
takeOverErrorLab(signal); takeOverErrorLab(signal);
return; return;
}//if }//if
Uint32 accOpPtr = scanptr.p->scanAccOpPtr[ttcScanOp]; Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, ttcScanOp);
if (accOpPtr == RNIL) { if (accOpPtr == RNIL) {
jam(); jam();
releaseActiveFrag(signal); releaseActiveFrag(signal);
...@@ -6984,7 +6983,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal) ...@@ -6984,7 +6983,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
// Update timer on tcConnectRecord // Update timer on tcConnectRecord
tcConnectptr.p->tcTimer = cLqhTimeOutCount; tcConnectptr.p->tcTimer = cLqhTimeOutCount;
initScanAccOp(signal); init_acc_ptr_list(scanptr.p);
scanptr.p->scanCompletedOperations = 0; scanptr.p->scanCompletedOperations = 0;
scanptr.p->scan_batch_len= 0; scanptr.p->scan_batch_len= 0;
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
...@@ -7029,9 +7028,8 @@ void Dblqh::continueScanReleaseAfterBlockedLab(Signal* signal) ...@@ -7029,9 +7028,8 @@ void Dblqh::continueScanReleaseAfterBlockedLab(Signal* signal)
c_scanRecordPool.getPtr(scanptr); c_scanRecordPool.getPtr(scanptr);
scanptr.p->scanState = ScanRecord::WAIT_RELEASE_LOCK; scanptr.p->scanState = ScanRecord::WAIT_RELEASE_LOCK;
signal->theData[0] = scanptr.p->scanAccPtr; signal->theData[0] = scanptr.p->scanAccPtr;
ndbrequire((scanptr.p->scanReleaseCounter -1) < MAX_PARALLEL_OP_PER_SCAN); signal->theData[1]=
signal->theData[1] = get_acc_ptr_from_scan_record(scanptr.p, scanptr.p->scanReleaseCounter -1);
scanptr.p->scanAccOpPtr[scanptr.p->scanReleaseCounter -1];
signal->theData[2] = NextScanReq::ZSCAN_COMMIT; signal->theData[2] = NextScanReq::ZSCAN_COMMIT;
if (! scanptr.p->rangeScan) if (! scanptr.p->rangeScan)
sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB); sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
...@@ -7220,6 +7218,43 @@ void Dblqh::scanLockReleasedLab(Signal* signal) ...@@ -7220,6 +7218,43 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
}//if }//if
}//Dblqh::scanLockReleasedLab() }//Dblqh::scanLockReleasedLab()
bool
Dblqh::seize_acc_ptr_list(ScanRecord* scanP, Uint32 batch_size)
{
scanP->scan_acc_index = 0;
return true;
}
void
Dblqh::release_acc_ptr_list(ScanRecord* scanP)
{
scanP->scan_acc_index = 0;
}
void
Dblqh::init_acc_ptr_list(ScanRecord* scanP)
{
scanP->scan_acc_index = 0;
}
Uint32
Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP, Uint32 index)
{
ndbrequire((index < MAX_PARALLEL_OP_PER_SCAN) &&
index < scanP->scan_acc_index);
return scanP->scan_acc_op_ptr[index];
}
void
Dblqh::set_acc_ptr_in_scan_record(ScanRecord* scanP,
Uint32 index, Uint32 acc)
{
ndbrequire((index == 0 || scanP->scan_acc_index == index) &&
(index < MAX_PARALLEL_OP_PER_SCAN));
scanP->scan_acc_index= index + 1;
scanP->scan_acc_op_ptr[index]= acc;
}
/* ------------------------------------------------------------------------- /* -------------------------------------------------------------------------
* SCAN_FRAGREQ: Request to start scanning the specified fragment of a table. * SCAN_FRAGREQ: Request to start scanning the specified fragment of a table.
* ------------------------------------------------------------------------- */ * ------------------------------------------------------------------------- */
...@@ -7615,7 +7650,7 @@ void Dblqh::continueFirstScanAfterBlockedLab(Signal* signal) ...@@ -7615,7 +7650,7 @@ void Dblqh::continueFirstScanAfterBlockedLab(Signal* signal)
scanptr.i = tcConnectptr.p->tcScanRec; scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr); c_scanRecordPool.getPtr(scanptr);
scanptr.p->scanState = ScanRecord::WAIT_NEXT_SCAN; scanptr.p->scanState = ScanRecord::WAIT_NEXT_SCAN;
initScanAccOp(signal); init_acc_ptr_list(scanptr.p);
signal->theData[0] = scanptr.p->scanAccPtr; signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1] = RNIL; signal->theData[1] = RNIL;
signal->theData[2] = NextScanReq::ZSCAN_NEXT; signal->theData[2] = NextScanReq::ZSCAN_NEXT;
...@@ -7784,9 +7819,9 @@ void Dblqh::nextScanConfScanLab(Signal* signal) ...@@ -7784,9 +7819,9 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
return; return;
}//if }//if
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN); set_acc_ptr_in_scan_record(scanptr.p,
scanptr.p->scanAccOpPtr[scanptr.p->scanCompletedOperations] = scanptr.p->scanCompletedOperations,
nextScanConf->accOperationPtr; nextScanConf->accOperationPtr);
scanptr.p->scanLocalref[0] = nextScanConf->localKey[0]; scanptr.p->scanLocalref[0] = nextScanConf->localKey[0];
scanptr.p->scanLocalref[1] = nextScanConf->localKey[1]; scanptr.p->scanLocalref[1] = nextScanConf->localKey[1];
scanptr.p->scanLocalFragid = nextScanConf->fragId; scanptr.p->scanLocalFragid = nextScanConf->fragId;
...@@ -7870,7 +7905,6 @@ void Dblqh::nextScanConfLoopLab(Signal* signal) ...@@ -7870,7 +7905,6 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
tupKeyReq->keyRef1 = scanptr.p->scanLocalref[0]; tupKeyReq->keyRef1 = scanptr.p->scanLocalref[0];
tupKeyReq->keyRef2 = scanptr.p->scanLocalref[1]; tupKeyReq->keyRef2 = scanptr.p->scanLocalref[1];
tupKeyReq->attrBufLen = 0; tupKeyReq->attrBufLen = 0;
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
tupKeyReq->opRef = scanptr.p->scanApiOpPtr; tupKeyReq->opRef = scanptr.p->scanApiOpPtr;
tupKeyReq->applRef = scanptr.p->scanApiBlockref; tupKeyReq->applRef = scanptr.p->scanApiBlockref;
tupKeyReq->schemaVersion = scanptr.p->scanSchemaVersion; tupKeyReq->schemaVersion = scanptr.p->scanSchemaVersion;
...@@ -7886,7 +7920,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal) ...@@ -7886,7 +7920,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal, EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal,
TupKeyReq::SignalLength); TupKeyReq::SignalLength);
} }
}//Dblqh::nextScanConfLoopLab() }
/* ------------------------------------------------------------------------- /* -------------------------------------------------------------------------
* RECEPTION OF FURTHER KEY INFORMATION WHEN KEY SIZE > 16 BYTES. * RECEPTION OF FURTHER KEY INFORMATION WHEN KEY SIZE > 16 BYTES.
...@@ -8024,10 +8058,12 @@ void Dblqh::continueScanAfterBlockedLab(Signal* signal) ...@@ -8024,10 +8058,12 @@ void Dblqh::continueScanAfterBlockedLab(Signal* signal)
if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_ABORT) { if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_ABORT) {
jam(); jam();
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
accOpPtr = scanptr.p->scanAccOpPtr[scanptr.p->scanCompletedOperations]; accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
scanptr.p->scanCompletedOperations);
} else if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_COMMIT) { } else if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_COMMIT) {
jam(); jam();
accOpPtr = scanptr.p->scanAccOpPtr[scanptr.p->scanCompletedOperations - 1]; accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
scanptr.p->scanCompletedOperations);
} else { } else {
jam(); jam();
accOpPtr = RNIL; // The value is not used in ACC accOpPtr = RNIL; // The value is not used in ACC
...@@ -8219,6 +8255,7 @@ void Dblqh::tupScanCloseConfLab(Signal* signal) ...@@ -8219,6 +8255,7 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
scanptr.p->scan_batch_len= 0; scanptr.p->scan_batch_len= 0;
sendScanFragConf(signal, ZSCAN_FRAG_CLOSED); sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
}//if }//if
release_acc_ptr_list(scanptr.p);
finishScanrec(signal); finishScanrec(signal);
releaseScanrec(signal); releaseScanrec(signal);
tcConnectptr.p->tcScanRec = RNIL; tcConnectptr.p->tcScanRec = RNIL;
...@@ -8227,20 +8264,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal) ...@@ -8227,20 +8264,6 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
releaseTcrec(signal, tcConnectptr); releaseTcrec(signal, tcConnectptr);
}//Dblqh::tupScanCloseConfLab() }//Dblqh::tupScanCloseConfLab()
/* =========================================================================
* ======= INITIATE SCAN_ACC_OP_PTR TO RNIL IN SCAN RECORD =======
*
* SUBROUTINE SHORT NAME = ISA
* ========================================================================= */
void Dblqh::initScanAccOp(Signal* signal)
{
UintR tisaIndex;
for (tisaIndex = 0; tisaIndex < MAX_PARALLEL_OP_PER_SCAN; tisaIndex++) {
scanptr.p->scanAccOpPtr[tisaIndex] = RNIL;
}//for
}//Dblqh::initScanAccOp()
/* ========================================================================= /* =========================================================================
* ======= INITIATE SCAN RECORD ======= * ======= INITIATE SCAN RECORD =======
* *
...@@ -8283,10 +8306,11 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8283,10 +8306,11 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanTcWaiting = ZTRUE; scanptr.p->scanTcWaiting = ZTRUE;
scanptr.p->scanNumber = ~0; scanptr.p->scanNumber = ~0;
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr; scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
for (Uint32 i = 0; i < scanConcurrentOperations; i++) {
scanptr.p->scanAccOpPtr[i] = 0;
}//for
if (!seize_acc_ptr_list(scanptr.p, scanConcurrentOperations)) {
jam();
return ScanFragRef::ZTOO_MANY_ACTIVE_SCAN_ERROR;
}
/** /**
* Used for scan take over * Used for scan take over
*/ */
...@@ -8341,38 +8365,9 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8341,38 +8365,9 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
#endif #endif
c_scanTakeOverHash.add(scanptr); c_scanTakeOverHash.add(scanptr);
} }
init_acc_ptr_list(scanptr.p);
return ZOK; return ZOK;
}
#if 0
if (! rangeScan) {
jam();
for (Int32 i = NR_ScanNo - 1; i >= 0; i--) {
jam();
if (fragptr.p->fragScanRec[i] == ZNIL) {
jam();
scanptr.p->scanNumber = i;
fragptr.p->fragScanRec[i] = scanptr.i;
return ZOK;
}//if
}//for
} else {
jam();
// put in second half of fragScanRec of primary table fragment
FragrecordPtr tFragPtr;
tFragPtr.i = fragptr.p->tableFragptr;
ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
for (Uint32 i = NR_MinRangeScanNo; i < NR_MaxRangeScanNo; i++) {
if (tFragPtr.p->fragScanRec[i] == ZNIL) {
jam();
scanptr.p->scanNumber = i;
tFragPtr.p->fragScanRec[i] = scanptr.i;
return ZOK;
}
}
}
return ZNO_FREE_FRAG_SCAN_REC_ERROR;
#endif
}//Dblqh::initScanrec()
/* ========================================================================= /* =========================================================================
* ======= INITIATE TC RECORD AT SCAN ======= * ======= INITIATE TC RECORD AT SCAN =======
...@@ -8943,7 +8938,7 @@ void Dblqh::nextScanConfCopyLab(Signal* signal) ...@@ -8943,7 +8938,7 @@ void Dblqh::nextScanConfCopyLab(Signal* signal)
return; return;
} }
scanptr.p->scanAccOpPtr[0] = nextScanConf->accOperationPtr; set_acc_ptr_in_scan_record(scanptr.p, 0, nextScanConf->accOperationPtr);
initCopyTc(signal); initCopyTc(signal);
if (tcConnectptr.p->primKeyLen > 4) { if (tcConnectptr.p->primKeyLen > 4) {
jam(); jam();
...@@ -9217,8 +9212,9 @@ void Dblqh::continueCopyAfterBlockedLab(Signal* signal) ...@@ -9217,8 +9212,9 @@ void Dblqh::continueCopyAfterBlockedLab(Signal* signal)
scanptr.i = tcConnectptr.p->tcScanRec; scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr); c_scanRecordPool.getPtr(scanptr);
tcConnectptr.p->errorCode = 0; tcConnectptr.p->errorCode = 0;
Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0);
signal->theData[0] = scanptr.p->scanAccPtr; signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1] = scanptr.p->scanAccOpPtr[0]; signal->theData[1] = acc_op_ptr;
signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT; signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT;
sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB); sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
return; return;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment