Commit 8296883f authored by unknown's avatar unknown

ndb - Fix problem with parallelism < fragcount & committedread


ndb/src/kernel/blocks/dbtc/Dbtc.hpp:
  Pre book fragments to keep count how many ops need to be continued by 
    api
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
  Pre book fragments to keep count how many ops need to be continued by 
    api
  
  Fixes problem with parallelism < fragcount && committed read
ndb/test/ndbapi/testScan.cpp:
  new testcase for scan parallelism
ndb/test/run-test/daily-basic-tests.txt:
  Run new testcase
ndb/test/src/HugoTransactions.cpp:
  Use parallelism parameter
parent 124bc850
......@@ -1177,7 +1177,7 @@ public:
Uint32 nextScan;
// Length of expected attribute information
Uint32 scanAiLength;
union { Uint32 scanAiLength; Uint32 m_booked_fragments_count; };
Uint32 scanKeyLen;
......
......@@ -3195,7 +3195,6 @@ void Dbtc::attrinfoDihReceivedLab(Signal* signal)
CacheRecord * const regCachePtr = cachePtr.p;
TcConnectRecord * const regTcPtr = tcConnectptr.p;
Uint16 Tnode = regTcPtr->tcNodedata[0];
Uint16 TscanTakeOverInd = regCachePtr->scanTakeOverInd;
TableRecordPtr localTabptr;
localTabptr.i = regCachePtr->tableref;
......@@ -8931,6 +8930,7 @@ void Dbtc::diFcountReqLab(Signal* signal, ScanRecordPtr scanptr)
}
scanptr.p->scanNextFragId = 0;
scanptr.p->m_booked_fragments_count= 0;
scanptr.p->scanState = ScanRecord::WAIT_FRAGMENT_COUNT;
if(!cachePtr.p->distributionKeyIndicator)
......@@ -9573,6 +9573,9 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
*/
jam();
ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag);
jam();
ndbassert(scanptr.p->m_booked_fragments_count);
scanptr.p->m_booked_fragments_count--;
scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
tcConnectptr.i = scanptr.p->scanTcrec;
......@@ -9816,6 +9819,7 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
}
Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId;
Uint32 booked = scanPtr.p->m_booked_fragments_count;
ScanTabConf * conf = (ScanTabConf*)&signal->theData[0];
conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
......@@ -9831,7 +9835,9 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
ScanFragRecPtr curr = ptr; // Remove while iterating...
queued.next(ptr);
bool done = curr.p->m_scan_frag_conf_status && --left;
bool done = curr.p->m_scan_frag_conf_status && (left == booked);
if(curr.p->m_scan_frag_conf_status && (booked < left))
booked++;
* ops++ = curr.p->m_apiPtr;
* ops++ = done ? RNIL : curr.i;
......@@ -9850,8 +9856,10 @@ void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
}
}
scanPtr.p->m_booked_fragments_count = booked;
if(scanPtr.p->m_delivered_scan_frags.isEmpty() &&
scanPtr.p->m_running_scan_frags.isEmpty()){
scanPtr.p->m_running_scan_frags.isEmpty())
{
conf->requestInfo = op_count | ScanTabConf::EndOfData;
releaseScanResources(scanPtr);
}
......
......@@ -1065,6 +1065,44 @@ int runScanRestart(NDBT_Context* ctx, NDBT_Step* step){
}
int
runScanParallelism(NDBT_Context* ctx, NDBT_Step* step){
int loops = ctx->getNumLoops() + 3;
int records = ctx->getNumRecords();
int abort = ctx->getProperty("AbortProb", 15);
Uint32 fib[] = { 1, 2 };
Uint32 parallelism = 0; // start with 0
int i = 0;
HugoTransactions hugoTrans(*ctx->getTab());
while (i<loops && !ctx->isTestStopped()) {
g_info << i << ": ";
if (hugoTrans.scanReadRecords(GETNDB(step), records, abort, parallelism,
NdbOperation::LM_Read) != 0){
return NDBT_FAILED;
}
if (hugoTrans.scanReadRecords(GETNDB(step), records, abort, parallelism,
NdbOperation::LM_Exclusive) != 0){
return NDBT_FAILED;
}
if (hugoTrans.scanReadRecords(GETNDB(step), records, abort, parallelism,
NdbOperation::LM_CommittedRead) != 0){
return NDBT_FAILED;
}
if (hugoTrans.scanUpdateRecords(GETNDB(step), records, abort, parallelism)
!= 0){
return NDBT_FAILED;
}
i++;
parallelism = fib[0];
Uint32 next = fib[0] + fib[1];
fib[0] = fib[1];
fib[1] = next;
}
return NDBT_OK;
}
NDBT_TESTSUITE(testScan);
TESTCASE("ScanRead",
"Verify scan requirement: It should be possible "\
......@@ -1515,6 +1553,12 @@ TESTCASE("ScanRestart",
STEP(runScanRestart);
FINALIZER(runClearTable);
}
TESTCASE("ScanParallelism",
"Test scan with different parallelism"){
INITIALIZER(runLoadTable);
STEP(runScanParallelism);
FINALIZER(runClearTable);
}
NDBT_TESTSUITE_END(testScan);
int main(int argc, const char** argv){
......
......@@ -345,6 +345,10 @@ max-time: 500
cmd: testScan
args: -n ScanRestart T1
max-time: 500
cmd: testScan
args: -n ScanParallelism
#
# DICT TESTS
max-time: 1500
......
......@@ -72,7 +72,7 @@ HugoTransactions::scanReadRecords(Ndb* pNdb,
return NDBT_FAILED;
}
if( pOp ->readTuples(lm) ) {
if( pOp ->readTuples(lm, 0, parallelism) ) {
ERR(pTrans->getNdbError());
pNdb->closeTransaction(pTrans);
return NDBT_FAILED;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment