Commit 183a9a16 authored by joreland@mysql.com's avatar joreland@mysql.com

ndb - Fix loop over multiple bounds in LQH

      Last part of NDB batching of range scans 
      (now only handler remains)
parent 005123f9
...@@ -6962,6 +6962,7 @@ void Dblqh::execSTORED_PROCREF(Signal* signal) ...@@ -6962,6 +6962,7 @@ void Dblqh::execSTORED_PROCREF(Signal* signal)
switch (scanptr.p->scanState) { switch (scanptr.p->scanState) {
case ScanRecord::WAIT_STORED_PROC_SCAN: case ScanRecord::WAIT_STORED_PROC_SCAN:
jam(); jam();
scanptr.p->scanCompletedStatus = ZTRUE;
scanptr.p->scanStoredProcId = signal->theData[2]; scanptr.p->scanStoredProcId = signal->theData[2];
tcConnectptr.p->errorCode = errorCode; tcConnectptr.p->errorCode = errorCode;
closeScanLab(signal); closeScanLab(signal);
...@@ -7763,42 +7764,52 @@ void Dblqh::accScanConfScanLab(Signal* signal) ...@@ -7763,42 +7764,52 @@ void Dblqh::accScanConfScanLab(Signal* signal)
tcConnectptr.p->errorCode = req->errorCode; tcConnectptr.p->errorCode = req->errorCode;
} }
} }
scanptr.p->scanState = ScanRecord::WAIT_STORED_PROC_SCAN;
signal->theData[0] = tcConnectptr.p->tupConnectrec;
signal->theData[1] = tcConnectptr.p->tableref;
signal->theData[2] = scanptr.p->scanSchemaVersion;
signal->theData[3] = ZSTORED_PROC_SCAN;
signal->theData[4] = scanptr.p->scanAiLength; scanptr.p->scanState = ScanRecord::WAIT_STORED_PROC_SCAN;
sendSignal(tcConnectptr.p->tcTupBlockref, if(scanptr.p->scanStoredProcId == RNIL)
GSN_STORED_PROCREQ, signal, 5, JBB); {
signal->theData[0] = tcConnectptr.p->tupConnectrec;
AttrbufPtr regAttrinbufptr;
Uint32 firstAttr = regAttrinbufptr.i = tcConnectptr.p->firstAttrinbuf;
while (regAttrinbufptr.i != RNIL) {
ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
jam(); jam();
Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN]; signal->theData[0] = tcConnectptr.p->tupConnectrec;
ndbrequire(dataLen != 0); signal->theData[1] = tcConnectptr.p->tableref;
// first 3 words already set in STORED_PROCREQ signal->theData[2] = scanptr.p->scanSchemaVersion;
MEMCOPY_NO_WORDS(&signal->theData[3], signal->theData[3] = ZSTORED_PROC_SCAN;
&regAttrinbufptr.p->attrbuf[0],
dataLen); signal->theData[4] = scanptr.p->scanAiLength;
sendSignal(tcConnectptr.p->tcTupBlockref, sendSignal(tcConnectptr.p->tcTupBlockref,
GSN_ATTRINFO, signal, dataLen + 3, JBB); GSN_STORED_PROCREQ, signal, 5, JBB);
regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
c_no_attrinbuf_recs++; signal->theData[0] = tcConnectptr.p->tupConnectrec;
}//while AttrbufPtr regAttrinbufptr;
Uint32 firstAttr = regAttrinbufptr.i = tcConnectptr.p->firstAttrinbuf;
/** while (regAttrinbufptr.i != RNIL) {
* Release attr info ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
*/ jam();
if(firstAttr != RNIL) Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
ndbrequire(dataLen != 0);
// first 3 words already set in STORED_PROCREQ
MEMCOPY_NO_WORDS(&signal->theData[3],
&regAttrinbufptr.p->attrbuf[0],
dataLen);
sendSignal(tcConnectptr.p->tcTupBlockref,
GSN_ATTRINFO, signal, dataLen + 3, JBB);
regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
c_no_attrinbuf_recs++;
}//while
/**
* Release attr info
*/
if(firstAttr != RNIL)
{
regAttrinbufptr.p->attrbuf[ZINBUF_NEXT] = cfirstfreeAttrinbuf;
cfirstfreeAttrinbuf = firstAttr;
tcConnectptr.p->firstAttrinbuf = tcConnectptr.p->lastAttrinbuf = RNIL;
}
}
else
{ {
regAttrinbufptr.p->attrbuf[ZINBUF_NEXT] = cfirstfreeAttrinbuf; jam();
cfirstfreeAttrinbuf = firstAttr; storedProcConfScanLab(signal);
tcConnectptr.p->firstAttrinbuf = tcConnectptr.p->lastAttrinbuf = RNIL;
} }
}//Dblqh::accScanConfScanLab() }//Dblqh::accScanConfScanLab()
...@@ -7823,7 +7834,7 @@ Dblqh::copy_bounds(Uint32 * dst, TcConnectionrec* tcPtrP) ...@@ -7823,7 +7834,7 @@ Dblqh::copy_bounds(Uint32 * dst, TcConnectionrec* tcPtrP)
Uint32 left = 4 - tcPtrP->m_offset_current_keybuf; // left in buf Uint32 left = 4 - tcPtrP->m_offset_current_keybuf; // left in buf
Uint32 totalLen = tcPtrP->primKeyLen - 4; Uint32 totalLen = tcPtrP->primKeyLen - 4;
regDatabufptr.i = tcPtrP->firstTupkeybuf; regDatabufptr.i = tcPtrP->firstTupkeybuf;
ndbassert(tcPtrP->primKeyLen >= 4); ndbassert(tcPtrP->primKeyLen >= 4);
ndbassert(tcPtrP->m_offset_current_keybuf < 4); ndbassert(tcPtrP->m_offset_current_keybuf < 4);
ndbassert(!(totalLen == 0 && regDatabufptr.i != RNIL)); ndbassert(!(totalLen == 0 && regDatabufptr.i != RNIL));
...@@ -7857,7 +7868,7 @@ Dblqh::copy_bounds(Uint32 * dst, TcConnectionrec* tcPtrP) ...@@ -7857,7 +7868,7 @@ Dblqh::copy_bounds(Uint32 * dst, TcConnectionrec* tcPtrP)
if(len < left) if(len < left)
{ {
offset = 4 - len; offset = len;
} }
else else
{ {
...@@ -7972,7 +7983,6 @@ void Dblqh::continueFirstScanAfterBlockedLab(Signal* signal) ...@@ -7972,7 +7983,6 @@ void Dblqh::continueFirstScanAfterBlockedLab(Signal* signal)
scanptr.i = tcConnectptr.p->tcScanRec; scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr); c_scanRecordPool.getPtr(scanptr);
scanptr.p->scanState = ScanRecord::WAIT_NEXT_SCAN; scanptr.p->scanState = ScanRecord::WAIT_NEXT_SCAN;
init_acc_ptr_list(scanptr.p);
signal->theData[0] = scanptr.p->scanAccPtr; signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1] = RNIL; signal->theData[1] = RNIL;
signal->theData[2] = NextScanReq::ZSCAN_NEXT; signal->theData[2] = NextScanReq::ZSCAN_NEXT;
...@@ -8532,11 +8542,11 @@ void Dblqh::accScanCloseConfLab(Signal* signal) ...@@ -8532,11 +8542,11 @@ void Dblqh::accScanCloseConfLab(Signal* signal)
scanptr.p->scanCompletedStatus != ZTRUE) scanptr.p->scanCompletedStatus != ZTRUE)
{ {
jam(); jam();
ndbout_c("Dont close scan"); releaseActiveFrag(signal);
//sendScanFragConf(signal, 0); continueAfterReceivingAllAiLab(signal);
// Don't delete scan in TUP return;
} }
scanptr.p->scanState = ScanRecord::WAIT_DELETE_STORED_PROC_ID_SCAN; scanptr.p->scanState = ScanRecord::WAIT_DELETE_STORED_PROC_ID_SCAN;
signal->theData[0] = tcConnectptr.p->tupConnectrec; signal->theData[0] = tcConnectptr.p->tupConnectrec;
signal->theData[1] = tcConnectptr.p->tableref; signal->theData[1] = tcConnectptr.p->tableref;
...@@ -8630,6 +8640,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8630,6 +8640,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanNumber = ~0; scanptr.p->scanNumber = ~0;
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr; scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
scanptr.p->m_last_row = 0; scanptr.p->m_last_row = 0;
scanptr.p->scanStoredProcId = RNIL;
if (max_rows == 0 || (max_bytes > 0 && max_rows > max_bytes)){ if (max_rows == 0 || (max_bytes > 0 && max_rows > max_bytes)){
jam(); jam();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment