Commit 8a274c44 authored by mronstrom@mysql.com's avatar mronstrom@mysql.com

First step for WL 2025

Not yet fully working
Scan reads work fine, not scan updates
parent 6b8e4c93
...@@ -68,7 +68,22 @@ ...@@ -68,7 +68,22 @@
* API can order a multiple of this number of records at a time since * API can order a multiple of this number of records at a time since
* fragments can be scanned in parallel. * fragments can be scanned in parallel.
*/ */
#define MAX_PARALLEL_OP_PER_SCAN 16 #define MAX_PARALLEL_OP_PER_SCAN 64
/*
* When calculating the number of records sent from LQH in each batch
* one uses SCAN_BATCH_SIZE divided by the expected size of signals
* per row. This gives the batch size used for the scan. The NDB API
* will receive one batch from each node at a time so there has to be
* some care taken also so that the NDB API is not overloaded with
* signals.
*/
#define SCAN_BATCH_SIZE 32768
/*
* To protect the NDB API from overload we also define a maximum total
* batch size from all nodes. This parameter should most likely be
* configurable, or dependent on sendBufferSize.
*/
#define MAX_SCAN_BATCH_SIZE 196608
/* /*
* Maximum number of Parallel Scan queries on one hash index fragment * Maximum number of Parallel Scan queries on one hash index fragment
*/ */
......
...@@ -33,7 +33,7 @@ class ScanFragReq { ...@@ -33,7 +33,7 @@ class ScanFragReq {
*/ */
friend class Dblqh; friend class Dblqh;
public: public:
STATIC_CONST( SignalLength = 25 ); STATIC_CONST( SignalLength = 13 );
public: public:
Uint32 senderData; Uint32 senderData;
...@@ -45,9 +45,11 @@ public: ...@@ -45,9 +45,11 @@ public:
Uint32 schemaVersion; Uint32 schemaVersion;
Uint32 transId1; Uint32 transId1;
Uint32 transId2; Uint32 transId2;
Uint32 clientOpPtr[MAX_PARALLEL_OP_PER_SCAN]; Uint32 clientOpPtr;
Uint32 concurrency;
Uint32 batch_byte_size;
Uint32 first_batch_size;
static Uint32 getConcurrency(const Uint32 & requestInfo);
static Uint32 getLockMode(const Uint32 & requestInfo); static Uint32 getLockMode(const Uint32 & requestInfo);
static Uint32 getHoldLockFlag(const Uint32 & requestInfo); static Uint32 getHoldLockFlag(const Uint32 & requestInfo);
static Uint32 getKeyinfoFlag(const Uint32 & requestInfo); static Uint32 getKeyinfoFlag(const Uint32 & requestInfo);
...@@ -56,7 +58,6 @@ public: ...@@ -56,7 +58,6 @@ public:
static Uint32 getAttrLen(const Uint32 & requestInfo); static Uint32 getAttrLen(const Uint32 & requestInfo);
static Uint32 getScanPrio(const Uint32 & requestInfo); static Uint32 getScanPrio(const Uint32 & requestInfo);
static void setConcurrency(Uint32 & requestInfo, Uint32 concurrency);
static void setLockMode(Uint32 & requestInfo, Uint32 lockMode); static void setLockMode(Uint32 & requestInfo, Uint32 lockMode);
static void setHoldLockFlag(Uint32 & requestInfo, Uint32 holdLock); static void setHoldLockFlag(Uint32 & requestInfo, Uint32 holdLock);
static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo); static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo);
...@@ -79,7 +80,6 @@ class KeyInfo20 { ...@@ -79,7 +80,6 @@ class KeyInfo20 {
friend class NdbOperation; friend class NdbOperation;
friend class NdbScanReceiver; friend class NdbScanReceiver;
public: public:
//STATIC_CONST( SignalLength = 21 );
STATIC_CONST( HeaderLength = 5); STATIC_CONST( HeaderLength = 5);
STATIC_CONST( DataLength = 20 ); STATIC_CONST( DataLength = 20 );
...@@ -110,15 +110,15 @@ class ScanFragConf { ...@@ -110,15 +110,15 @@ class ScanFragConf {
friend class Backup; friend class Backup;
friend class Suma; friend class Suma;
public: public:
STATIC_CONST( SignalLength = 21 ); STATIC_CONST( SignalLength = 6 );
public: public:
Uint32 senderData; Uint32 senderData;
Uint32 completedOps; Uint32 completedOps;
Uint32 fragmentCompleted; Uint32 fragmentCompleted;
Uint32 opReturnDataLen[16];
Uint32 transId1; Uint32 transId1;
Uint32 transId2; Uint32 transId2;
Uint32 total_len;
}; };
class ScanFragRef { class ScanFragRef {
...@@ -188,7 +188,6 @@ public: ...@@ -188,7 +188,6 @@ public:
* Request Info * Request Info
* *
* a = Length of attrinfo - 16 Bits (16-31) * a = Length of attrinfo - 16 Bits (16-31)
* c = Concurrency - 5 Bits (0-4) -> Max 31
* l = Lock Mode - 1 Bit 5 * l = Lock Mode - 1 Bit 5
* h = Hold lock - 1 Bit 7 * h = Hold lock - 1 Bit 7
* k = Keyinfo - 1 Bit 8 * k = Keyinfo - 1 Bit 8
...@@ -198,11 +197,8 @@ public: ...@@ -198,11 +197,8 @@ public:
* *
* 1111111111222222222233 * 1111111111222222222233
* 01234567890123456789012345678901 * 01234567890123456789012345678901
* ccccclxhkr ppppaaaaaaaaaaaaaaaa * lxhkr ppppaaaaaaaaaaaaaaaa
*/ */
#define SF_CONCURRENCY_SHIFT (0)
#define SF_CONCURRENCY_MASK (31)
#define SF_LOCK_MODE_SHIFT (5) #define SF_LOCK_MODE_SHIFT (5)
#define SF_LOCK_MODE_MASK (1) #define SF_LOCK_MODE_MASK (1)
...@@ -217,12 +213,6 @@ public: ...@@ -217,12 +213,6 @@ public:
#define SF_PRIO_SHIFT 12 #define SF_PRIO_SHIFT 12
#define SF_PRIO_MASK 15 #define SF_PRIO_MASK 15
inline
Uint32
ScanFragReq::getConcurrency(const Uint32 & requestInfo){
return (requestInfo >> SF_CONCURRENCY_SHIFT) & SF_CONCURRENCY_MASK;
}
inline inline
Uint32 Uint32
ScanFragReq::getLockMode(const Uint32 & requestInfo){ ScanFragReq::getLockMode(const Uint32 & requestInfo){
...@@ -272,13 +262,6 @@ ScanFragReq::setScanPrio(UintR & requestInfo, UintR val){ ...@@ -272,13 +262,6 @@ ScanFragReq::setScanPrio(UintR & requestInfo, UintR val){
requestInfo |= (val << SF_PRIO_SHIFT); requestInfo |= (val << SF_PRIO_SHIFT);
} }
inline
void
ScanFragReq::setConcurrency(UintR & requestInfo, UintR val){
ASSERT_MAX(val, SF_CONCURRENCY_MASK, "ScanFragReq::setConcurrency");
requestInfo |= (val << SF_CONCURRENCY_SHIFT);
}
inline inline
void void
ScanFragReq::setLockMode(UintR & requestInfo, UintR val){ ScanFragReq::setLockMode(UintR & requestInfo, UintR val){
...@@ -324,7 +307,7 @@ ScanFragReq::setAttrLen(UintR & requestInfo, UintR val){ ...@@ -324,7 +307,7 @@ ScanFragReq::setAttrLen(UintR & requestInfo, UintR val){
inline inline
Uint32 Uint32
KeyInfo20::setScanInfo(Uint32 opNo, Uint32 scanNo){ KeyInfo20::setScanInfo(Uint32 opNo, Uint32 scanNo){
ASSERT_MAX(opNo, 15, "KeyInfo20::setScanInfo"); ASSERT_MAX(opNo, 1023, "KeyInfo20::setScanInfo");
ASSERT_MAX(scanNo, 255, "KeyInfo20::setScanInfo"); ASSERT_MAX(scanNo, 255, "KeyInfo20::setScanInfo");
return (opNo << 8) + scanNo; return (opNo << 8) + scanNo;
} }
...@@ -338,7 +321,7 @@ KeyInfo20::getScanNo(Uint32 scanInfo){ ...@@ -338,7 +321,7 @@ KeyInfo20::getScanNo(Uint32 scanInfo){
inline inline
Uint32 Uint32
KeyInfo20::getScanOp(Uint32 scanInfo){ KeyInfo20::getScanOp(Uint32 scanInfo){
return (scanInfo >> 8) & 0xF; return (scanInfo >> 8) & 0x1023;
} }
#endif #endif
...@@ -45,7 +45,7 @@ public: ...@@ -45,7 +45,7 @@ public:
/** /**
* Length of signal * Length of signal
*/ */
STATIC_CONST( SignalLength = 25 ); STATIC_CONST( StaticLength = 11 );
private: private:
...@@ -63,7 +63,8 @@ private: ...@@ -63,7 +63,8 @@ private:
UintR transId1; // DATA 6 UintR transId1; // DATA 6
UintR transId2; // DATA 7 UintR transId2; // DATA 7
UintR buddyConPtr; // DATA 8 UintR buddyConPtr; // DATA 8
UintR apiOperationPtr[16]; // DATA 9-25 UintR batch_byte_size; // DATA 9
UintR first_batch_size; // DATA 10
/** /**
* Get:ers for requestInfo * Get:ers for requestInfo
...@@ -95,11 +96,11 @@ private: ...@@ -95,11 +96,11 @@ private:
h = Hold lock mode - 1 Bit 10 h = Hold lock mode - 1 Bit 10
c = Read Committed - 1 Bit 11 c = Read Committed - 1 Bit 11
x = Range Scan (TUX) - 1 Bit 15 x = Range Scan (TUX) - 1 Bit 15
b = Scan batch - 5 Bit 16-19 (max 15) b = Scan batch - 10 Bit 16-25 (max 1023)
1111111111222222222233 1111111111222222222233
01234567890123456789012345678901 01234567890123456789012345678901
ppppppppl hc xbbbbb ppppppppl hc xbbbbbbbbbb
*/ */
#define PARALLELL_SHIFT (0) #define PARALLELL_SHIFT (0)
...@@ -118,7 +119,7 @@ private: ...@@ -118,7 +119,7 @@ private:
#define RANGE_SCAN_MASK (1) #define RANGE_SCAN_MASK (1)
#define SCAN_BATCH_SHIFT (16) #define SCAN_BATCH_SHIFT (16)
#define SCAN_BATCH_MASK (31) #define SCAN_BATCH_MASK (1023)
inline inline
Uint8 Uint8
...@@ -201,6 +202,7 @@ inline ...@@ -201,6 +202,7 @@ inline
void void
ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){ ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){
ASSERT_MAX(flag, SCAN_BATCH_MASK, "ScanTabReq::setScanBatch"); ASSERT_MAX(flag, SCAN_BATCH_MASK, "ScanTabReq::setScanBatch");
requestInfo &= ~(SCAN_BATCH_MASK << SCAN_BATCH_SHIFT);
requestInfo |= (flag << SCAN_BATCH_SHIFT); requestInfo |= (flag << SCAN_BATCH_SHIFT);
} }
...@@ -250,8 +252,8 @@ private: ...@@ -250,8 +252,8 @@ private:
Uint32 info; Uint32 info;
}; };
static Uint32 getLength(Uint32 opDataInfo) { return opDataInfo >> 5; }; static Uint32 getLength(Uint32 opDataInfo) { return opDataInfo >> 10; };
static Uint32 getRows(Uint32 opDataInfo) { return opDataInfo & 31;} static Uint32 getRows(Uint32 opDataInfo) { return opDataInfo & 1023;}
}; };
/** /**
......
...@@ -228,21 +228,21 @@ private: ...@@ -228,21 +228,21 @@ private:
* Scan Info * Scan Info
* *
t = Scan take over indicator - 1 Bit t = Scan take over indicator - 1 Bit
n = Take over node - 16 Bits -> max 65535 n = Take over node - 12 Bits -> max 65535
p = Scan Info - 12 Bits -> max 4095 p = Scan Info - 18 Bits -> max 4095
1111111111222222222233 1111111111222222222233
01234567890123456789012345678901 01234567890123456789012345678901
tpppppppppppp nnnnnnnnnnnnnnnn tpppppppppppppppppp nnnnnnnnnnnn
*/ */
#define TAKE_OVER_SHIFT (0) #define TAKE_OVER_SHIFT (0)
#define TAKE_OVER_NODE_SHIFT (16) #define TAKE_OVER_NODE_SHIFT (20)
#define TAKE_OVER_NODE_MASK (65535) #define TAKE_OVER_NODE_MASK (4095)
#define SCAN_INFO_SHIFT (1) #define SCAN_INFO_SHIFT (1)
#define SCAN_INFO_MASK (4095) #define SCAN_INFO_MASK (262143)
/** /**
* Attr Len * Attr Len
......
...@@ -75,6 +75,7 @@ private: ...@@ -75,6 +75,7 @@ private:
class NdbRecAttr * getValue(const class NdbColumnImpl*, char * user_dst_ptr); class NdbRecAttr * getValue(const class NdbColumnImpl*, char * user_dst_ptr);
void do_get_value(NdbReceiver*, Uint32 rows, Uint32 key_size); void do_get_value(NdbReceiver*, Uint32 rows, Uint32 key_size);
void prepareSend(); void prepareSend();
void calculate_batch_size(Uint32, Uint32, Uint32&, Uint32&, Uint32&);
int execKEYINFO20(Uint32 info, const Uint32* ptr, Uint32 len); int execKEYINFO20(Uint32 info, const Uint32* ptr, Uint32 len);
int execTRANSID_AI(const Uint32* ptr, Uint32 len); int execTRANSID_AI(const Uint32* ptr, Uint32 len);
......
...@@ -122,7 +122,6 @@ protected: ...@@ -122,7 +122,6 @@ protected:
NdbConnection *m_transConnection; NdbConnection *m_transConnection;
// Scan related variables // Scan related variables
Uint32 theBatchSize;
Uint32 theParallelism; Uint32 theParallelism;
Uint32 m_keyInfo; Uint32 m_keyInfo;
NdbApiSignal* theSCAN_TABREQ; NdbApiSignal* theSCAN_TABREQ;
......
...@@ -27,7 +27,7 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv ...@@ -27,7 +27,7 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
const UintR requestInfo = sig->requestInfo; const UintR requestInfo = sig->requestInfo;
fprintf(output, " apiConnectPtr: H\'%.8x\n", fprintf(output, " apiConnectPtr: H\'%.8x",
sig->apiConnectPtr); sig->apiConnectPtr);
fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo); fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo);
fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Holdlock: %u, RangeScan: %u\n", fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Holdlock: %u, RangeScan: %u\n",
...@@ -42,23 +42,8 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv ...@@ -42,23 +42,8 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
fprintf(output, " transId(1, 2): (H\'%.8x, H\'%.8x) storedProcId: H\'%.8x\n", fprintf(output, " transId(1, 2): (H\'%.8x, H\'%.8x) storedProcId: H\'%.8x\n",
sig->transId1, sig->transId2, sig->storedProcId); sig->transId1, sig->transId2, sig->storedProcId);
fprintf(output, " batch_byte_size: %d, first_batch_size: %d\n",
fprintf(output, " OperationPtr(s):\n "); sig->batch_byte_size, sig->first_batch_size);
Uint32 restLen = (len - 9);
const Uint32 * rest = &sig->apiOperationPtr[0];
while(restLen >= 7){
fprintf(output,
" H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x H\'%.8x\n",
rest[0], rest[1], rest[2], rest[3],
rest[4], rest[5], rest[6]);
restLen -= 7;
rest += 7;
}
if(restLen > 0){
for(Uint32 i = 0; i<restLen; i++)
fprintf(output, " H\'%.8x", rest[i]);
fprintf(output, "\n");
}
return false; return false;
} }
......
...@@ -3324,20 +3324,16 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal) ...@@ -3324,20 +3324,16 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
req->requestInfo = 0; req->requestInfo = 0;
req->savePointId = 0; req->savePointId = 0;
req->tableId = table.tableId; req->tableId = table.tableId;
ScanFragReq::setConcurrency(req->requestInfo, parallelism); //ScanFragReq::setConcurrency(req->requestInfo, parallelism);
ScanFragReq::setLockMode(req->requestInfo, 0); ScanFragReq::setLockMode(req->requestInfo, 0);
ScanFragReq::setHoldLockFlag(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 1); ScanFragReq::setKeyinfoFlag(req->requestInfo, 1);
ScanFragReq::setAttrLen(req->requestInfo,attrLen); ScanFragReq::setAttrLen(req->requestInfo,attrLen);
req->transId1 = 0; req->transId1 = 0;
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8); req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
req->clientOpPtr= filePtr.i;
Uint32 i; sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal,
for(i = 0; i<parallelism; i++) { ScanFragReq::SignalLength, JBB);
jam();
req->clientOpPtr[i] = filePtr.i;
}//for
sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 25, JBB);
signal->theData[0] = filePtr.i; signal->theData[0] = filePtr.i;
signal->theData[1] = 0; signal->theData[1] = 0;
...@@ -3351,6 +3347,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal) ...@@ -3351,6 +3347,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
signal->theData[7] = 0; signal->theData[7] = 0;
Uint32 dataPos = 8; Uint32 dataPos = 8;
Uint32 i;
for(i = 0; i<table.noOfAttributes; i++) { for(i = 0; i<table.noOfAttributes; i++) {
jam(); jam();
AttributePtr attr; AttributePtr attr;
...@@ -3655,7 +3652,7 @@ Backup::execSCAN_FRAGCONF(Signal* signal) ...@@ -3655,7 +3652,7 @@ Backup::execSCAN_FRAGCONF(Signal* signal)
c_backupFilePool.getPtr(filePtr, filePtrI); c_backupFilePool.getPtr(filePtr, filePtrI);
OperationRecord & op = filePtr.p->operation; OperationRecord & op = filePtr.p->operation;
op.scanConf(conf->completedOps, conf->opReturnDataLen); //op.scanConf(conf->completedOps, conf->opReturnDataLen);
const Uint32 completed = conf->fragmentCompleted; const Uint32 completed = conf->fragmentCompleted;
if(completed != 2) { if(completed != 2) {
......
...@@ -533,9 +533,11 @@ public: ...@@ -533,9 +533,11 @@ public:
COPY = 2 COPY = 2
}; };
UintR scanAccOpPtr[MAX_PARALLEL_OP_PER_SCAN]; UintR scanAccOpPtr[MAX_PARALLEL_OP_PER_SCAN];
UintR scanApiOpPtr[MAX_PARALLEL_OP_PER_SCAN]; UintR scanApiOpPtr;
UintR scanOpLength[MAX_PARALLEL_OP_PER_SCAN];
UintR scanLocalref[2]; UintR scanLocalref[2];
Uint32 scan_batch_len;
Uint32 first_batch_size;
Uint32 batch_byte_size;
UintR copyPtr; UintR copyPtr;
union { union {
Uint32 nextPool; Uint32 nextPool;
......
...@@ -890,7 +890,7 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* signal) ...@@ -890,7 +890,7 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* signal)
&ctcConnectrecFileSize)); &ctcConnectrecFileSize));
clogFileFileSize = 4 * cnoLogFiles; clogFileFileSize = 4 * cnoLogFiles;
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_SCAN, &cscanrecFileSize)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_SCAN, &cscanrecFileSize));
cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_SCANS_PER_FRAG; cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_OP_PER_SCAN;
initRecords(); initRecords();
initialiseRecordsLab(signal, 0, ref, senderData); initialiseRecordsLab(signal, 0, ref, senderData);
...@@ -2099,8 +2099,6 @@ void Dblqh::execTIME_SIGNAL(Signal* signal) ...@@ -2099,8 +2099,6 @@ void Dblqh::execTIME_SIGNAL(Signal* signal)
c_scanRecordPool.getPtr(TscanPtr, tTcConptr.p->tcScanRec); c_scanRecordPool.getPtr(TscanPtr, tTcConptr.p->tcScanRec);
ndbout << " scanState = " << TscanPtr.p->scanState << endl; ndbout << " scanState = " << TscanPtr.p->scanState << endl;
//TscanPtr.p->scanAccOpPtr[16]; //TscanPtr.p->scanAccOpPtr[16];
//TscanPtr.p->scanApiOpPtr[16];
//TscanPtr.p->scanOpLength[16];
//TscanPtr.p->scanLocalref[2]; //TscanPtr.p->scanLocalref[2];
ndbout << " copyPtr="<<TscanPtr.p->copyPtr ndbout << " copyPtr="<<TscanPtr.p->copyPtr
<< " scanAccPtr="<<TscanPtr.p->scanAccPtr << " scanAccPtr="<<TscanPtr.p->scanAccPtr
...@@ -6988,6 +6986,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal) ...@@ -6988,6 +6986,7 @@ void Dblqh::continueScanNextReqLab(Signal* signal)
initScanAccOp(signal); initScanAccOp(signal);
scanptr.p->scanCompletedOperations = 0; scanptr.p->scanCompletedOperations = 0;
scanptr.p->scan_batch_len= 0;
scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT; scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
scanNextLoopLab(signal); scanNextLoopLab(signal);
}//Dblqh::continueScanNextReqLab() }//Dblqh::continueScanNextReqLab()
...@@ -7142,6 +7141,7 @@ void Dblqh::closeScanRequestLab(Signal* signal) ...@@ -7142,6 +7141,7 @@ void Dblqh::closeScanRequestLab(Signal* signal)
}//if }//if
tcConnectptr.p->abortState = TcConnectionrec::ABORT_ACTIVE; tcConnectptr.p->abortState = TcConnectionrec::ABORT_ACTIVE;
scanptr.p->scanCompletedOperations = 0; scanptr.p->scanCompletedOperations = 0;
scanptr.p->scan_batch_len= 0;
sendScanFragConf(signal, ZTRUE); sendScanFragConf(signal, ZTRUE);
break; break;
case TcConnectionrec::SCAN_TUPKEY: case TcConnectionrec::SCAN_TUPKEY:
...@@ -7225,7 +7225,7 @@ void Dblqh::scanLockReleasedLab(Signal* signal) ...@@ -7225,7 +7225,7 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
* ------------------------------------------------------------------------- */ * ------------------------------------------------------------------------- */
void Dblqh::execSCAN_FRAGREQ(Signal* signal) void Dblqh::execSCAN_FRAGREQ(Signal* signal)
{ {
const ScanFragReq * const scanFragReq = (ScanFragReq *)&signal->theData[0]; ScanFragReq * const scanFragReq = (ScanFragReq *)&signal->theData[0];
ScanFragRef * ref; ScanFragRef * ref;
const Uint32 transid1 = scanFragReq->transId1; const Uint32 transid1 = scanFragReq->transId1;
const Uint32 transid2 = scanFragReq->transId2; const Uint32 transid2 = scanFragReq->transId2;
...@@ -7238,7 +7238,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal) ...@@ -7238,7 +7238,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
const Uint32 reqinfo = scanFragReq->requestInfo; const Uint32 reqinfo = scanFragReq->requestInfo;
const Uint32 fragId = scanFragReq->fragmentNo; const Uint32 fragId = scanFragReq->fragmentNo;
tabptr.i = scanFragReq->tableId; tabptr.i = scanFragReq->tableId;
const Uint32 scanConcurrentOperations = ScanFragReq::getConcurrency(reqinfo); const Uint32 scanConcurrentOperations = scanFragReq->concurrency;
const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo); const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
const Uint8 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo); const Uint8 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
const Uint8 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo); const Uint8 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
...@@ -7256,9 +7256,9 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal) ...@@ -7256,9 +7256,9 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
tcConnectptr.p->savePointId = scanFragReq->savePointId; tcConnectptr.p->savePointId = scanFragReq->savePointId;
} else { } else {
jam(); jam();
/* --------------------------------------------------------------------- /* --------------------------------------------------------------------
* NO FREE TC RECORD AVAILABLE, THUS WE CANNOT HANDLE THE REQUEST. * NO FREE TC RECORD AVAILABLE, THUS WE CANNOT HANDLE THE REQUEST.
* --------------------------------------------------------------------- */ * -------------------------------------------------------------------- */
errorCode = ZNO_TC_CONNECT_ERROR; errorCode = ZNO_TC_CONNECT_ERROR;
senderData = scanFragReq->senderData; senderData = scanFragReq->senderData;
goto error_handler_early; goto error_handler_early;
...@@ -7871,8 +7871,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal) ...@@ -7871,8 +7871,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
tupKeyReq->keyRef2 = scanptr.p->scanLocalref[1]; tupKeyReq->keyRef2 = scanptr.p->scanLocalref[1];
tupKeyReq->attrBufLen = 0; tupKeyReq->attrBufLen = 0;
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN); ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
tupKeyReq->opRef = tupKeyReq->opRef = scanptr.p->scanApiOpPtr;
scanptr.p->scanApiOpPtr[scanptr.p->scanCompletedOperations];
tupKeyReq->applRef = scanptr.p->scanApiBlockref; tupKeyReq->applRef = scanptr.p->scanApiBlockref;
tupKeyReq->schemaVersion = scanptr.p->scanSchemaVersion; tupKeyReq->schemaVersion = scanptr.p->scanSchemaVersion;
tupKeyReq->storedProcedure = scanptr.p->scanStoredProcId; tupKeyReq->storedProcedure = scanptr.p->scanStoredProcId;
...@@ -7963,7 +7962,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal) ...@@ -7963,7 +7962,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
tdata4 += tcConnectptr.p->primKeyLen;// Inform API about keyinfo len aswell tdata4 += tcConnectptr.p->primKeyLen;// Inform API about keyinfo len aswell
}//if }//if
ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN); ndbrequire(scanptr.p->scanCompletedOperations < MAX_PARALLEL_OP_PER_SCAN);
scanptr.p->scanOpLength[scanptr.p->scanCompletedOperations] = tdata4; scanptr.p->scan_batch_len+= tdata4;
scanptr.p->scanCompletedOperations++; scanptr.p->scanCompletedOperations++;
if ((scanptr.p->scanCompletedOperations == if ((scanptr.p->scanCompletedOperations ==
scanptr.p->scanConcurrentOperations) && scanptr.p->scanConcurrentOperations) &&
...@@ -8217,6 +8216,7 @@ void Dblqh::tupScanCloseConfLab(Signal* signal) ...@@ -8217,6 +8216,7 @@ void Dblqh::tupScanCloseConfLab(Signal* signal)
} else { } else {
jam(); jam();
scanptr.p->scanCompletedOperations = 0; scanptr.p->scanCompletedOperations = 0;
scanptr.p->scan_batch_len= 0;
sendScanFragConf(signal, ZSCAN_FRAG_CLOSED); sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
}//if }//if
finishScanrec(signal); finishScanrec(signal);
...@@ -8249,7 +8249,7 @@ void Dblqh::initScanAccOp(Signal* signal) ...@@ -8249,7 +8249,7 @@ void Dblqh::initScanAccOp(Signal* signal)
Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
{ {
const Uint32 reqinfo = scanFragReq->requestInfo; const Uint32 reqinfo = scanFragReq->requestInfo;
const Uint32 scanConcurrentOperations = ScanFragReq::getConcurrency(reqinfo); const Uint32 scanConcurrentOperations = scanFragReq->concurrency;
const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo); const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
const Uint32 scanLockHold = ScanFragReq::getHoldLockFlag(reqinfo); const Uint32 scanLockHold = ScanFragReq::getHoldLockFlag(reqinfo);
const Uint32 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo); const Uint32 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
...@@ -8267,7 +8267,10 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8267,7 +8267,10 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanTcrec = tcConnectptr.i; scanptr.p->scanTcrec = tcConnectptr.i;
scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion; scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion;
scanptr.p->scanCompletedOperations = 0; scanptr.p->scanCompletedOperations = 0;
scanptr.p->scan_batch_len= 0;
scanptr.p->scanConcurrentOperations = scanConcurrentOperations; scanptr.p->scanConcurrentOperations = scanConcurrentOperations;
scanptr.p->batch_byte_size= scanFragReq->batch_byte_size;
scanptr.p->first_batch_size= scanFragReq->first_batch_size;
scanptr.p->scanErrorCounter = 0; scanptr.p->scanErrorCounter = 0;
scanptr.p->scanLockMode = scanLockMode; scanptr.p->scanLockMode = scanLockMode;
scanptr.p->readCommitted = readCommitted; scanptr.p->readCommitted = readCommitted;
...@@ -8279,11 +8282,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8279,11 +8282,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanLocalFragid = 0; scanptr.p->scanLocalFragid = 0;
scanptr.p->scanTcWaiting = ZTRUE; scanptr.p->scanTcWaiting = ZTRUE;
scanptr.p->scanNumber = ~0; scanptr.p->scanNumber = ~0;
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
for (Uint32 i = 0; i < scanConcurrentOperations; i++) { for (Uint32 i = 0; i < scanConcurrentOperations; i++) {
jam();
scanptr.p->scanApiOpPtr[i] = scanFragReq->clientOpPtr[i];
scanptr.p->scanOpLength[i] = 0;
scanptr.p->scanAccOpPtr[i] = 0; scanptr.p->scanAccOpPtr[i] = 0;
}//for }//for
...@@ -8547,11 +8547,11 @@ void Dblqh::sendKeyinfo20(Signal* signal, ...@@ -8547,11 +8547,11 @@ void Dblqh::sendKeyinfo20(Signal* signal,
TdataBuf.i = TdataBuf.p->nextDatabuf; TdataBuf.i = TdataBuf.p->nextDatabuf;
} }
keyInfo->clientOpPtr = scanP->scanApiOpPtr[scanOp]; keyInfo->clientOpPtr = scanP->scanApiOpPtr;
keyInfo->keyLen = keyLen; keyInfo->keyLen = keyLen;
keyInfo->scanInfo_Node = KeyInfo20::setScanInfo(scanOp, keyInfo->scanInfo_Node = KeyInfo20::setScanInfo(scanOp,
scanP->scanNumber)+ scanP->scanNumber)+
(getOwnNodeId() << 16); (getOwnNodeId() << 20);
keyInfo->transId1 = tcConP->transid[0]; keyInfo->transId1 = tcConP->transid[0];
keyInfo->transId2 = tcConP->transid[1]; keyInfo->transId2 = tcConP->transid[1];
...@@ -8632,23 +8632,27 @@ void Dblqh::sendKeyinfo20(Signal* signal, ...@@ -8632,23 +8632,27 @@ void Dblqh::sendKeyinfo20(Signal* signal,
* ------------------------------------------------------------------------ */ * ------------------------------------------------------------------------ */
void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted) void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
{ {
Uint32 completed_ops= scanptr.p->scanCompletedOperations;
Uint32 total_len= scanptr.p->scan_batch_len;
scanptr.p->scanTcWaiting = ZFALSE;
if(ERROR_INSERTED(5037)){ if(ERROR_INSERTED(5037)){
CLEAR_ERROR_INSERT_VALUE; CLEAR_ERROR_INSERT_VALUE;
return; return;
} }
scanptr.p->scanTcWaiting = ZFALSE;
ScanFragConf * conf = (ScanFragConf*)&signal->theData[0]; ScanFragConf * conf = (ScanFragConf*)&signal->theData[0];
NodeId tc_node_id= refToNode(tcConnectptr.p->clientBlockref);
Uint32 trans_id1= tcConnectptr.p->transid[0];
Uint32 trans_id2= tcConnectptr.p->transid[1];
conf->senderData = tcConnectptr.p->clientConnectrec; conf->senderData = tcConnectptr.p->clientConnectrec;
conf->completedOps = scanptr.p->scanCompletedOperations; conf->completedOps = completed_ops;
conf->fragmentCompleted = scanCompleted; conf->fragmentCompleted = scanCompleted;
for(Uint32 i = 0; i<MAX_PARALLEL_OP_PER_SCAN; i++) conf->transId1 = trans_id1;
conf->opReturnDataLen[i] = scanptr.p->scanOpLength[i]; conf->transId2 = trans_id2;
conf->transId1 = tcConnectptr.p->transid[0]; conf->total_len= total_len;
conf->transId2 = tcConnectptr.p->transid[1];
sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF, sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF,
signal, ScanFragConf::SignalLength, JBB); signal, ScanFragConf::SignalLength, JBB);
}//Dblqh::sendScanFragConf() }//Dblqh::sendScanFragConf()
/* ######################################################################### */ /* ######################################################################### */
......
...@@ -1184,7 +1184,11 @@ public: ...@@ -1184,7 +1184,11 @@ public:
Uint32 scanTableref; Uint32 scanTableref;
// Number of operation records per scanned fragment // Number of operation records per scanned fragment
// Number of operations in first batch
// Max number of bytes per batch
Uint16 noOprecPerFrag; Uint16 noOprecPerFrag;
Uint16 first_batch_size;
Uint32 batch_byte_size;
// Shall the locks be held until the application have read the // Shall the locks be held until the application have read the
// records // records
...@@ -1417,17 +1421,13 @@ private: ...@@ -1417,17 +1421,13 @@ private:
UintR anApiConnectPtr); UintR anApiConnectPtr);
void handleScanStop(Signal* signal, UintR aFailedNode); void handleScanStop(Signal* signal, UintR aFailedNode);
void initScanTcrec(Signal* signal); void initScanTcrec(Signal* signal);
void initScanApirec(Signal* signal, void initScanrec(ScanRecordPtr, const class ScanTabReq*,
Uint32 buddyPtr,
UintR transid1,
UintR transid2);
void initScanrec(ScanRecordPtr, const class ScanTabReq*,
const UintR scanParallel, const UintR scanParallel,
const UintR noOprecPerFrag); const UintR noOprecPerFrag);
void initScanfragrec(Signal* signal); void initScanfragrec(Signal* signal);
void releaseScanResources(ScanRecordPtr); void releaseScanResources(ScanRecordPtr);
ScanRecordPtr seizeScanrec(Signal* signal); ScanRecordPtr seizeScanrec(Signal* signal);
void sendScanFragReq(Signal* signal, ScanRecord*, ScanFragRec*); void sendScanFragReq(Signal*, ScanRecord*, ScanFragRec*);
void sendScanTabConf(Signal* signal, ScanRecord*); void sendScanTabConf(Signal* signal, ScanRecord*);
void close_scan_req(Signal*, ScanRecordPtr, bool received_req); void close_scan_req(Signal*, ScanRecordPtr, bool received_req);
void close_scan_req_send_conf(Signal*, ScanRecordPtr); void close_scan_req_send_conf(Signal*, ScanRecordPtr);
......
This diff is collapsed.
...@@ -1844,7 +1844,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){ ...@@ -1844,7 +1844,7 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
req->tableId = tabPtr.p->m_tableId; req->tableId = tabPtr.p->m_tableId;
req->requestInfo = 0; req->requestInfo = 0;
req->savePointId = 0; req->savePointId = 0;
ScanFragReq::setConcurrency(req->requestInfo, parallelism); //ScanFragReq::setConcurrency(req->requestInfo, parallelism);
ScanFragReq::setLockMode(req->requestInfo, 0); ScanFragReq::setLockMode(req->requestInfo, 0);
ScanFragReq::setHoldLockFlag(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
...@@ -1853,9 +1853,10 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){ ...@@ -1853,9 +1853,10 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
req->schemaVersion = tabPtr.p->m_schemaVersion; req->schemaVersion = tabPtr.p->m_schemaVersion;
req->transId1 = 0; req->transId1 = 0;
req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8); req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
for(unsigned int i = 0; i<parallelism; i++){ for(unsigned int i = 0; i<parallelism; i++){
req->clientOpPtr[i] = (ptrI << 16) + (i + 1); //req->clientOpPtr[i] = (ptrI << 16) + (i + 1);
req->clientOpPtr = (ptrI << 16) + (i + 1);
} }
suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 25, JBB); suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 25, JBB);
......
...@@ -511,7 +511,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){ ...@@ -511,7 +511,7 @@ Configuration::calcSizeAlt(ConfigValues * ownConfig){
/*-----------------------------------------------------------------------*/ /*-----------------------------------------------------------------------*/
cfg.put(CFG_ACC_OP_RECS, cfg.put(CFG_ACC_OP_RECS,
((11 * noOfOperations) / 10 + 50) + ((11 * noOfOperations) / 10 + 50) +
(noOfLocalScanRecords * MAX_PARALLEL_SCANS_PER_FRAG) + (noOfLocalScanRecords * MAX_PARALLEL_OP_PER_SCAN) +
NODE_RECOVERY_SCAN_OP_RECORDS); NODE_RECOVERY_SCAN_OP_RECORDS);
cfg.put(CFG_ACC_OVERFLOW_RECS, cfg.put(CFG_ACC_OVERFLOW_RECS,
......
...@@ -177,7 +177,7 @@ NdbApiSignal::setSignal(int aNdbSignalType) ...@@ -177,7 +177,7 @@ NdbApiSignal::setSignal(int aNdbSignalType)
theTrace = TestOrd::TraceAPI; theTrace = TestOrd::TraceAPI;
theReceiversBlockNumber = DBTC; theReceiversBlockNumber = DBTC;
theVerId_signalNumber = GSN_SCAN_TABREQ; theVerId_signalNumber = GSN_SCAN_TABREQ;
theLength = 9; // ScanTabReq::SignalLength; theLength = ScanTabReq::StaticLength;
} }
break; break;
...@@ -186,7 +186,7 @@ NdbApiSignal::setSignal(int aNdbSignalType) ...@@ -186,7 +186,7 @@ NdbApiSignal::setSignal(int aNdbSignalType)
theTrace = TestOrd::TraceAPI; theTrace = TestOrd::TraceAPI;
theReceiversBlockNumber = DBTC; theReceiversBlockNumber = DBTC;
theVerId_signalNumber = GSN_SCAN_NEXTREQ; theVerId_signalNumber = GSN_SCAN_NEXTREQ;
theLength = 4; theLength = ScanNextReq::SignalLength;
} }
break; break;
......
...@@ -71,6 +71,8 @@ public: ...@@ -71,6 +71,8 @@ public:
const Uint32 * getDataPtr() const; const Uint32 * getDataPtr() const;
Uint32 * getDataPtrSend(); Uint32 * getDataPtrSend();
NodeId get_sender_node();
/** /**
* Fragmentation * Fragmentation
*/ */
...@@ -103,6 +105,17 @@ private: ...@@ -103,6 +105,17 @@ private:
NdbApiSignal *theNextSignal; NdbApiSignal *theNextSignal;
Uint32 *theRealData; Uint32 *theRealData;
}; };
/**********************************************************************
NodeId get_sender_node
Remark: Get the node id of the sender
***********************************************************************/
inline
NodeId
NdbApiSignal::get_sender_node()
{
return refToNode(theSendersBlockRef);
}
/********************************************************************** /**********************************************************************
void getLength void getLength
Remark: Get the length of the signal. Remark: Get the length of the signal.
......
...@@ -99,11 +99,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal, ...@@ -99,11 +99,12 @@ NdbConnection::receiveSCAN_TABCONF(NdbApiSignal* aSignal,
} }
for(Uint32 i = 0; i<len; i += 3){ for(Uint32 i = 0; i<len; i += 3){
Uint32 opCount, totalLen;
Uint32 ptrI = * ops++; Uint32 ptrI = * ops++;
Uint32 tcPtrI = * ops++; Uint32 tcPtrI = * ops++;
Uint32 info = * ops++; Uint32 info = * ops++;
Uint32 opCount = ScanTabConf::getRows(info); opCount = ScanTabConf::getRows(info);
Uint32 totalLen = ScanTabConf::getLength(info); totalLen = ScanTabConf::getLength(info);
void * tPtr = theNdb->int2void(ptrI); void * tPtr = theNdb->int2void(ptrI);
assert(tPtr); // For now assert(tPtr); // For now
......
...@@ -89,6 +89,47 @@ NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){ ...@@ -89,6 +89,47 @@ NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
#define KEY_ATTR_ID (~0) #define KEY_ATTR_ID (~0)
void
NdbReceiver::calculate_batch_size(Uint32 key_size,
Uint32 parallelism,
Uint32& batch_size,
Uint32& batch_byte_size,
Uint32& first_batch_size)
{
Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
NdbRecAttr *rec_attr= theFirstRecAttr;
while (rec_attr != NULL) {
Uint32 attr_size= rec_attr->attrSize() * rec_attr->arraySize();
attr_size= ((attr_size + 7) >> 2) << 2; //Even to word + overhead
tot_size+= attr_size;
rec_attr= rec_attr->next();
}
tot_size+= 32; //include signal overhead
/**
* Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
* bytes sent for each batch from each node. We do however ensure that
* no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
* batch.
*/
batch_byte_size= SCAN_BATCH_SIZE;
if (SCAN_BATCH_SIZE * parallelism > MAX_SCAN_BATCH_SIZE) {
batch_byte_size= MAX_SCAN_BATCH_SIZE / parallelism;
}
batch_size= batch_byte_size / tot_size;
#ifdef VM_TRACE
ndbout << "batch_byte_size = " << batch_byte_size << " batch_size = ";
ndbout << batch_size << "tot_size = " << tot_size << endl;
#endif
if (batch_size == 0) {
batch_size= 1;
} else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
batch_size= MAX_PARALLEL_OP_PER_SCAN;
}
first_batch_size= batch_size;
return;
}
void void
NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
if(rows > m_defined_rows){ if(rows > m_defined_rows){
...@@ -139,7 +180,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){ ...@@ -139,7 +180,7 @@ NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
} }
prepareSend(); prepareSend();
return ; //0; return;
} }
void void
......
...@@ -140,17 +140,9 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -140,17 +140,9 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
Uint32 fragCount = m_currentTable->m_fragmentCount; Uint32 fragCount = m_currentTable->m_fragmentCount;
if (batch + parallel == 0) { if (parallel > fragCount || parallel == 0) {
batch = 16;
parallel= fragCount;
} else {
if (batch == 0 && parallel > 0) { // Backward
batch = (parallel >= 16 ? 16 : parallel);
parallel = (parallel + 15) / 16;
}
if (parallel > fragCount || parallel == 0)
parallel = fragCount; parallel = fragCount;
} }
// It is only possible to call openScan if // It is only possible to call openScan if
// 1. this transcation don't already contain another scan operation // 1. this transcation don't already contain another scan operation
...@@ -201,7 +193,6 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -201,7 +193,6 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
} }
theParallelism = parallel; theParallelism = parallel;
theBatchSize = batch;
if(fix_receivers(parallel) == -1){ if(fix_receivers(parallel) == -1){
setErrorCodeAbort(4000); setErrorCodeAbort(4000);
...@@ -223,7 +214,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -223,7 +214,7 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
Uint32 reqInfo = 0; Uint32 reqInfo = 0;
ScanTabReq::setParallelism(reqInfo, parallel); ScanTabReq::setParallelism(reqInfo, parallel);
ScanTabReq::setScanBatch(reqInfo, batch); ScanTabReq::setScanBatch(reqInfo, 0);
ScanTabReq::setLockMode(reqInfo, lockExcl); ScanTabReq::setLockMode(reqInfo, lockExcl);
ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode); ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted); ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
...@@ -815,8 +806,23 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, ...@@ -815,8 +806,23 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
theReceiver.prepareSend(); theReceiver.prepareSend();
bool keyInfo = m_keyInfo; bool keyInfo = m_keyInfo;
Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0; Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0;
/**
* The number of records sent by each LQH is calculated and the kernel
* is informed of this number by updating the SCAN_TABREQ signal
*/
Uint32 batch_size, batch_byte_size, first_batch_size;
theReceiver.calculate_batch_size(key_size,
theParallelism,
batch_size,
batch_byte_size,
first_batch_size);
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
ScanTabReq::setScanBatch(req->requestInfo, batch_size);
req->batch_byte_size= batch_byte_size;
req->first_batch_size= first_batch_size;
for(Uint32 i = 0; i<theParallelism; i++){ for(Uint32 i = 0; i<theParallelism; i++){
m_receivers[i]->do_get_value(&theReceiver, theBatchSize, key_size); m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size);
} }
return 0; return 0;
} }
...@@ -856,23 +862,13 @@ NdbScanOperation::doSendScan(int aProcessorId) ...@@ -856,23 +862,13 @@ NdbScanOperation::doSendScan(int aProcessorId)
if (theOperationType == OpenRangeScanRequest) if (theOperationType == OpenRangeScanRequest)
req->attrLen += theTotalBoundAI_Len; req->attrLen += theTotalBoundAI_Len;
TransporterFacade *tp = TransporterFacade::instance(); TransporterFacade *tp = TransporterFacade::instance();
if(theParallelism > 16){ LinearSectionPtr ptr[3];
LinearSectionPtr ptr[3]; ptr[0].p = m_prepared_receivers;
ptr[0].p = m_prepared_receivers; ptr[0].sz = theParallelism;
ptr[0].sz = theParallelism; if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) {
if (tp->sendFragmentedSignal(tSignal, aProcessorId, ptr, 1) == -1) { setErrorCode(4002);
setErrorCode(4002); return -1;
return -1; }
}
} else {
tSignal->setLength(9+theParallelism);
memcpy(tSignal->getDataPtrSend()+9, m_prepared_receivers, 4*theParallelism);
if (tp->sendSignal(tSignal, aProcessorId) == -1) {
setErrorCode(4002);
return -1;
}
}
if (theOperationType == OpenRangeScanRequest) { if (theOperationType == OpenRangeScanRequest) {
// must have at least one signal since it contains attrLen for bounds // must have at least one signal since it contains attrLen for bounds
assert(theBoundATTRINFO != NULL); assert(theBoundATTRINFO != NULL);
...@@ -969,8 +965,8 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){ ...@@ -969,8 +965,8 @@ NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
} }
const Uint32 * src = (Uint32*)tRecAttr->aRef(); const Uint32 * src = (Uint32*)tRecAttr->aRef();
const Uint32 tScanInfo = src[len] & 0xFFFF; const Uint32 tScanInfo = src[len] & 0x3FFFF;
const Uint32 tTakeOverNode = src[len] >> 16; const Uint32 tTakeOverNode = src[len] >> 20;
{ {
UintR scanInfo = 0; UintR scanInfo = 0;
TcKeyReq::setTakeOverScanFlag(scanInfo, 1); TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment