Commit 68da9ff3 authored by unknown's avatar unknown

wl2025 - fix suma & backup

pluse remove "first_batch_size" and instead put
batch_size on next_scan_req


ndb/include/kernel/signaldata/ScanFrag.hpp:
  Rename
  * concurrency -> batch_size_bytes
  * batch_byte_size -> batch_size_bytes
  
  Removed first_batch_size and
  added size parameters to SCAN_NEXTREQ
ndb/src/kernel/blocks/backup/Backup.cpp:
  Adopt to SCAN_FRAG_CHANGES
ndb/src/kernel/blocks/backup/Backup.hpp:
  Adopt to SCAN_FRAG_CHANGES
ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  Renamed
  scanCompletedOperations -> m_curr_batch_size_rows
  scanConcurrentOperations -> m_max_batch_size_rows
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Renamed
  scanCompletedOperations -> m_curr_batch_size_rows
  scanConcurrentOperations -> m_max_batch_size_rows
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
  Adopt to SCAN_FRAG_CHANGES
ndb/src/kernel/blocks/suma/Suma.cpp:
  Adopt to SCAN_FRAG_CHANGES
parent c378cb82
...@@ -33,7 +33,7 @@ class ScanFragReq { ...@@ -33,7 +33,7 @@ class ScanFragReq {
*/ */
friend class Dblqh; friend class Dblqh;
public: public:
STATIC_CONST( SignalLength = 13 ); STATIC_CONST( SignalLength = 12 );
public: public:
Uint32 senderData; Uint32 senderData;
...@@ -46,9 +46,8 @@ public: ...@@ -46,9 +46,8 @@ public:
Uint32 transId1; Uint32 transId1;
Uint32 transId2; Uint32 transId2;
Uint32 clientOpPtr; Uint32 clientOpPtr;
Uint32 concurrency; Uint32 batch_size_rows;
Uint32 batch_byte_size; Uint32 batch_size_bytes;
Uint32 first_batch_size;
static Uint32 getLockMode(const Uint32 & requestInfo); static Uint32 getLockMode(const Uint32 & requestInfo);
static Uint32 getHoldLockFlag(const Uint32 & requestInfo); static Uint32 getHoldLockFlag(const Uint32 & requestInfo);
...@@ -176,13 +175,15 @@ class ScanFragNextReq { ...@@ -176,13 +175,15 @@ class ScanFragNextReq {
friend bool printSCANFRAGNEXTREQ(FILE * output, const Uint32 * theData, friend bool printSCANFRAGNEXTREQ(FILE * output, const Uint32 * theData,
Uint32 len, Uint16 receiverBlockNo); Uint32 len, Uint16 receiverBlockNo);
public: public:
STATIC_CONST( SignalLength = 4 ); STATIC_CONST( SignalLength = 6 );
public: public:
Uint32 senderData; Uint32 senderData;
Uint32 closeFlag; Uint32 closeFlag;
Uint32 transId1; Uint32 transId1;
Uint32 transId2; Uint32 transId2;
Uint32 batch_size_rows;
Uint32 batch_size_bytes;
}; };
/** /**
......
...@@ -3324,7 +3324,6 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal) ...@@ -3324,7 +3324,6 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
req->requestInfo = 0; req->requestInfo = 0;
req->savePointId = 0; req->savePointId = 0;
req->tableId = table.tableId; req->tableId = table.tableId;
//ScanFragReq::setConcurrency(req->requestInfo, parallelism);
ScanFragReq::setLockMode(req->requestInfo, 0); ScanFragReq::setLockMode(req->requestInfo, 0);
ScanFragReq::setHoldLockFlag(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 1); ScanFragReq::setKeyinfoFlag(req->requestInfo, 1);
...@@ -3332,6 +3331,8 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal) ...@@ -3332,6 +3331,8 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
req->transId1 = 0; req->transId1 = 0;
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8); req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
req->clientOpPtr= filePtr.i; req->clientOpPtr= filePtr.i;
req->batch_size_rows= 16;
req->batch_size_bytes= 0;
sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB); ScanFragReq::SignalLength, JBB);
...@@ -3549,8 +3550,7 @@ Backup::OperationRecord::newFragment(Uint32 tableId, Uint32 fragNo) ...@@ -3549,8 +3550,7 @@ Backup::OperationRecord::newFragment(Uint32 tableId, Uint32 fragNo)
head->FragmentNo = htonl(fragNo); head->FragmentNo = htonl(fragNo);
head->ChecksumType = htonl(0); head->ChecksumType = htonl(0);
opNoDone = opNoConf = 0; opNoDone = opNoConf = opLen = 0;
memset(attrLen, 0, sizeof(attrLen));
newRecord(tmp + headSz); newRecord(tmp + headSz);
scanStart = tmp; scanStart = tmp;
scanStop = (tmp + headSz); scanStop = (tmp + headSz);
...@@ -3593,8 +3593,7 @@ Backup::OperationRecord::newScan() ...@@ -3593,8 +3593,7 @@ Backup::OperationRecord::newScan()
ndbrequire(16 * maxRecordSize < dataBuffer.getMaxWrite()); ndbrequire(16 * maxRecordSize < dataBuffer.getMaxWrite());
if(dataBuffer.getWritePtr(&tmp, 16 * maxRecordSize)) { if(dataBuffer.getWritePtr(&tmp, 16 * maxRecordSize)) {
jam(); jam();
opNoDone = opNoConf = 0; opNoDone = opNoConf = opLen = 0;
memset(attrLen, 0, sizeof(attrLen));
newRecord(tmp); newRecord(tmp);
scanStart = tmp; scanStart = tmp;
scanStop = tmp; scanStop = tmp;
...@@ -3604,12 +3603,12 @@ Backup::OperationRecord::newScan() ...@@ -3604,12 +3603,12 @@ Backup::OperationRecord::newScan()
} }
bool bool
Backup::OperationRecord::scanConf(Uint32 noOfOps, Uint32 opLen[]) Backup::OperationRecord::scanConf(Uint32 noOfOps, Uint32 total_len)
{ {
const Uint32 done = opNoDone-opNoConf; const Uint32 done = opNoDone-opNoConf;
ndbrequire(noOfOps == done); ndbrequire(noOfOps == done);
ndbrequire(memcmp(&attrLen[opNoConf], opLen, done << 2) == 0); ndbrequire(opLen == total_len);
opNoConf = opNoDone; opNoConf = opNoDone;
const Uint32 len = (scanStop - scanStart); const Uint32 len = (scanStop - scanStart);
...@@ -3652,8 +3651,8 @@ Backup::execSCAN_FRAGCONF(Signal* signal) ...@@ -3652,8 +3651,8 @@ Backup::execSCAN_FRAGCONF(Signal* signal)
c_backupFilePool.getPtr(filePtr, filePtrI); c_backupFilePool.getPtr(filePtr, filePtrI);
OperationRecord & op = filePtr.p->operation; OperationRecord & op = filePtr.p->operation;
//op.scanConf(conf->completedOps, conf->opReturnDataLen);
op.scanConf(conf->completedOps, conf->total_len);
const Uint32 completed = conf->fragmentCompleted; const Uint32 completed = conf->fragmentCompleted;
if(completed != 2) { if(completed != 2) {
jam(); jam();
...@@ -3722,6 +3721,8 @@ Backup::checkScan(Signal* signal, BackupFilePtr filePtr) ...@@ -3722,6 +3721,8 @@ Backup::checkScan(Signal* signal, BackupFilePtr filePtr)
req->closeFlag = 0; req->closeFlag = 0;
req->transId1 = 0; req->transId1 = 0;
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8); req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
req->batch_size_rows= 16;
req->batch_size_bytes= 0;
sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal, sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB); ScanFragNextReq::SignalLength, JBB);
return; return;
......
...@@ -236,7 +236,7 @@ public: ...@@ -236,7 +236,7 @@ public:
* Once per scan frag (next) req/conf * Once per scan frag (next) req/conf
*/ */
bool newScan(); bool newScan();
bool scanConf(Uint32 noOfOps, Uint32 opLen[]); bool scanConf(Uint32 noOfOps, Uint32 opLen);
/** /**
* Per record * Per record
...@@ -268,7 +268,7 @@ public: ...@@ -268,7 +268,7 @@ public:
Uint32 opNoDone; Uint32 opNoDone;
Uint32 opNoConf; Uint32 opNoConf;
Uint32 attrLen[16]; Uint32 opLen;
public: public:
Uint32* dst; Uint32* dst;
...@@ -713,7 +713,7 @@ Backup::OperationRecord::finished(){ ...@@ -713,7 +713,7 @@ Backup::OperationRecord::finished(){
return false; return false;
} }
attrLen[opNoDone] = attrSzTotal + sz_FixedKeys; opLen += attrSzTotal + sz_FixedKeys;
opNoDone++; opNoDone++;
scanStop = dst = (Uint32 *)dst_VariableData; scanStop = dst = (Uint32 *)dst_VariableData;
......
...@@ -532,15 +532,20 @@ public: ...@@ -532,15 +532,20 @@ public:
SCAN = 1, SCAN = 1,
COPY = 2 COPY = 2
}; };
UintR scan_acc_op_ptr[32]; UintR scan_acc_op_ptr[32];
Uint32 scan_acc_index; Uint32 scan_acc_index;
Uint32 scan_acc_attr_recs; Uint32 scan_acc_attr_recs;
UintR scanApiOpPtr; UintR scanApiOpPtr;
UintR scanLocalref[2]; UintR scanLocalref[2];
Uint32 scan_batch_len;
Uint32 batch_size; Uint32 m_max_batch_size_rows;
Uint32 first_batch_size; Uint32 m_max_batch_size_bytes;
Uint32 batch_byte_size;
Uint32 m_curr_batch_size_rows;
Uint32 m_curr_batch_size_bytes;
bool check_scan_batch_completed() const;
UintR copyPtr; UintR copyPtr;
union { union {
...@@ -559,8 +564,6 @@ public: ...@@ -559,8 +564,6 @@ public:
UintR scanAccPtr; UintR scanAccPtr;
UintR scanAiLength; UintR scanAiLength;
UintR scanCompletedOperations;
UintR scanConcurrentOperations;
UintR scanErrorCounter; UintR scanErrorCounter;
UintR scanLocalFragid; UintR scanLocalFragid;
UintR scanSchemaVersion; UintR scanSchemaVersion;
...@@ -2231,7 +2234,6 @@ private: ...@@ -2231,7 +2234,6 @@ private:
Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32, bool); Uint32 get_acc_ptr_from_scan_record(ScanRecord*, Uint32, bool);
void set_acc_ptr_in_scan_record(ScanRecord*, Uint32, Uint32); void set_acc_ptr_in_scan_record(ScanRecord*, Uint32, Uint32);
void i_get_acc_ptr(ScanRecord*, Uint32*&, Uint32); void i_get_acc_ptr(ScanRecord*, Uint32*&, Uint32);
bool check_scan_batch_completed(ScanRecord*);
void removeTable(Uint32 tableId); void removeTable(Uint32 tableId);
void sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId); void sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId);
...@@ -2933,9 +2935,13 @@ public: ...@@ -2933,9 +2935,13 @@ public:
inline inline
bool bool
Dblqh::check_scan_batch_completed(ScanRecord* scanP) Dblqh::ScanRecord::check_scan_batch_completed() const
{ {
return (scanP->scanCompletedOperations == scanP->scanConcurrentOperations) || Uint32 max_rows = m_max_batch_size_rows;
(scanP->scan_batch_len >= scanP->batch_byte_size); Uint32 max_bytes = m_max_batch_size_bytes;
return (max_rows > 0 && (m_curr_batch_size_rows >= max_rows)) ||
(max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes));
} }
#endif #endif
This diff is collapsed.
...@@ -9264,6 +9264,7 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) ...@@ -9264,6 +9264,7 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
nextReq->closeFlag = ZFALSE; nextReq->closeFlag = ZFALSE;
nextReq->transId1 = apiConnectptr.p->transid[0]; nextReq->transId1 = apiConnectptr.p->transid[0];
nextReq->transId2 = apiConnectptr.p->transid[1]; nextReq->transId2 = apiConnectptr.p->transid[1];
nextReq->batch_size_bytes= scanP->batch_byte_size;
ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags); ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags); ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
...@@ -9278,6 +9279,8 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal) ...@@ -9278,6 +9279,8 @@ void Dbtc::execSCAN_NEXTREQ(Signal* signal)
scanFragptr.p->m_ops = 0; scanFragptr.p->m_ops = 0;
nextReq->senderData = scanFragptr.i; nextReq->senderData = scanFragptr.i;
nextReq->batch_size_rows= scanFragptr.p->scanFragConcurrency;
sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal, sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB); ScanFragNextReq::SignalLength, JBB);
delivered.remove(scanFragptr); delivered.remove(scanFragptr);
...@@ -9487,10 +9490,9 @@ void Dbtc::sendScanFragReq(Signal* signal, ...@@ -9487,10 +9490,9 @@ void Dbtc::sendScanFragReq(Signal* signal,
req->savePointId = apiConnectptr.p->currSavePointId; req->savePointId = apiConnectptr.p->currSavePointId;
req->transId1 = apiConnectptr.p->transid[0]; req->transId1 = apiConnectptr.p->transid[0];
req->transId2 = apiConnectptr.p->transid[1]; req->transId2 = apiConnectptr.p->transid[1];
req->concurrency= scanFragP->scanFragConcurrency;
req->clientOpPtr = scanFragP->m_apiPtr; req->clientOpPtr = scanFragP->m_apiPtr;
req->batch_byte_size= scanP->batch_byte_size; req->batch_size_rows= scanFragP->scanFragConcurrency;
req->first_batch_size= scanP->first_batch_size; req->batch_size_bytes= scanP->batch_byte_size;
sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal, sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB); ScanFragReq::SignalLength, JBB);
updateBuddyTimer(apiConnectptr); updateBuddyTimer(apiConnectptr);
......
...@@ -1880,7 +1880,6 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){ ...@@ -1880,7 +1880,6 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
req->tableId = tabPtr.p->m_tableId; req->tableId = tabPtr.p->m_tableId;
req->requestInfo = 0; req->requestInfo = 0;
req->savePointId = 0; req->savePointId = 0;
//ScanFragReq::setConcurrency(req->requestInfo, parallelism);
ScanFragReq::setLockMode(req->requestInfo, 0); ScanFragReq::setLockMode(req->requestInfo, 0);
ScanFragReq::setHoldLockFlag(req->requestInfo, 0); ScanFragReq::setHoldLockFlag(req->requestInfo, 0);
ScanFragReq::setKeyinfoFlag(req->requestInfo, 0); ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
...@@ -1889,12 +1888,11 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){ ...@@ -1889,12 +1888,11 @@ SumaParticipant::SyncRecord::nextScan(Signal* signal){
req->schemaVersion = tabPtr.p->m_schemaVersion; req->schemaVersion = tabPtr.p->m_schemaVersion;
req->transId1 = 0; req->transId1 = 0;
req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8); req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
req->clientOpPtr = (ptrI << 16);
for(unsigned int i = 0; i<parallelism; i++){ req->batch_size_rows= 16;
//req->clientOpPtr[i] = (ptrI << 16) + (i + 1); req->batch_size_bytes= 0;
req->clientOpPtr = (ptrI << 16) + (i + 1); suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal,
} ScanFragReq::SignalLength, JBB);
suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal, 25, JBB);
signal->theData[0] = ptrI; signal->theData[0] = ptrI;
signal->theData[1] = 0; signal->theData[1] = 0;
...@@ -1996,6 +1994,8 @@ SumaParticipant::execSUB_SYNC_CONTINUE_CONF(Signal* signal){ ...@@ -1996,6 +1994,8 @@ SumaParticipant::execSUB_SYNC_CONTINUE_CONF(Signal* signal){
req->closeFlag = 0; req->closeFlag = 0;
req->transId1 = 0; req->transId1 = 0;
req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8); req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);
req->batch_size_rows = 16;
req->batch_size_bytes = 0;
sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal, sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB); ScanFragNextReq::SignalLength, JBB);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment