Commit 46075704 authored by unknown's avatar unknown

wl2126 - ndb - add support for batching range scans

1) removed acc keyinfo
2) introduce handling of multiple bounds


ndb/include/kernel/signaldata/AccScan.hpp:
  Removed keyinfo from AccScan
ndb/include/kernel/signaldata/TuxBound.hpp:
  Add data word
ndb/include/ndbapi/NdbIndexScanOperation.hpp:
  Added new_bound call, which is used when batching range scans
ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
  removed keyinfo from acc scan
ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  removed scan keyinfo
ndb/src/kernel/blocks/dblqh/DblqhInit.cpp:
  removed scan keyinfo
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  1) removed add scan,
     let LQH read keyinfo from tup instead
  2) introduce copy_bounds which handles multiple bounds
ndb/src/kernel/blocks/dbtux/Dbtux.hpp:
  removed scan keyinfo
ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp:
  removed scan keyinfo
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp:
  removed scan keyinfo
ndb/src/ndbapi/NdbScanOperation.cpp:
  handle multiple bounds
parent 3ef57e28
......@@ -51,11 +51,9 @@ private:
* Previously there where also a scan type
*/
static Uint32 getLockMode(const Uint32 & requestInfo);
static Uint32 getKeyinfoFlag(const Uint32 & requestInfo);
static Uint32 getReadCommittedFlag(const Uint32 & requestInfo);
static void setLockMode(Uint32 & requestInfo, Uint32 lockMode);
static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo);
static void setReadCommittedFlag(Uint32 & requestInfo, Uint32 readCommitted);
};
......@@ -63,17 +61,14 @@ private:
* Request Info
*
* l = Lock Mode - 1 Bit 2
* k = Keyinfo - 1 Bit 4
* h = Read Committed - 1 Bit 5
*
* 1111111111222222222233
* 01234567890123456789012345678901
* l kh
* l h
*/
#define AS_LOCK_MODE_SHIFT (2)
#define AS_LOCK_MODE_MASK (1)
#define AS_KEYINFO_SHIFT (4)
#define AS_READ_COMMITTED_SHIFT (5)
inline
......@@ -82,12 +77,6 @@ AccScanReq::getLockMode(const Uint32 & requestInfo){
return (requestInfo >> AS_LOCK_MODE_SHIFT) & AS_LOCK_MODE_MASK;
}
inline
Uint32
AccScanReq::getKeyinfoFlag(const Uint32 & requestInfo){
return (requestInfo >> AS_KEYINFO_SHIFT) & 1;
}
inline
Uint32
AccScanReq::getReadCommittedFlag(const Uint32 & requestInfo){
......@@ -101,13 +90,6 @@ AccScanReq::setLockMode(UintR & requestInfo, UintR val){
requestInfo |= (val << AS_LOCK_MODE_SHIFT);
}
inline
void
AccScanReq::setKeyinfoFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "AccScanReq::setKeyinfoFlag");
requestInfo |= (val << AS_KEYINFO_SHIFT);
}
inline
void
AccScanReq::setReadCommittedFlag(UintR & requestInfo, UintR val){
......
......@@ -50,6 +50,8 @@ private:
* Number of words of bound info included after fixed signal data.
*/
Uint32 boundAiLength;
Uint32 data[1];
};
#endif
......@@ -115,6 +115,12 @@ public:
*/
int reset_bounds();
/**
* Set new bound on operation,
* used when batching index reads
*/
int set_new_bound();
bool getSorted() const { return m_ordered; }
private:
NdbIndexScanOperation(Ndb* aNdb);
......@@ -132,6 +138,9 @@ private:
int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*);
Uint32 m_sort_columns;
Uint32 m_this_bound_start;
Uint32 * m_first_bound_word;
};
#endif
......@@ -8949,7 +8949,6 @@ void Dbacc::execACC_SCANREQ(Signal* signal)
rootfragrecptr.p->scan[i] = scanPtr.i;
scanPtr.p->scanBucketState = ScanRec::FIRST_LAP;
scanPtr.p->scanLockMode = AccScanReq::getLockMode(tscanFlag);
scanPtr.p->scanKeyinfoFlag = AccScanReq::getKeyinfoFlag(tscanFlag);
scanPtr.p->scanReadCommittedFlag = AccScanReq::getReadCommittedFlag(tscanFlag);
/* TWELVE BITS OF THE ELEMENT HEAD ARE SCAN */
......@@ -10083,7 +10082,6 @@ void Dbacc::sendNextScanConf(Signal* signal)
{
scanPtr.p->scanTimer = scanPtr.p->scanContinuebCounter;
Uint32 blockNo = refToBlock(scanPtr.p->scanUserblockref);
if (!scanPtr.p->scanKeyinfoFlag){
jam();
/** ---------------------------------------------------------------------
* LQH WILL NOT HAVE ANY USE OF THE TUPLE KEY LENGTH IN THIS CASE AND
......@@ -10097,39 +10095,6 @@ void Dbacc::sendNextScanConf(Signal* signal)
signal->theData[5] = fragrecptr.p->localkeylen;
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 6);
return;
}//if
fragrecptr.i = operationRecPtr.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
readTablePk(operationRecPtr.p->localdata[0]);
signal->theData[0] = scanPtr.p->scanUserptr;
signal->theData[1] = operationRecPtr.i;
signal->theData[2] = operationRecPtr.p->fid;
signal->theData[3] = operationRecPtr.p->localdata[0];
signal->theData[4] = operationRecPtr.p->localdata[1];
signal->theData[5] = fragrecptr.p->localkeylen;
signal->theData[6] = fragrecptr.p->keyLength;
signal->theData[7] = ckeys[0];
signal->theData[8] = ckeys[1];
signal->theData[9] = ckeys[2];
signal->theData[10] = ckeys[3];
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 11);
const Uint32 keyLength = fragrecptr.p->keyLength;
Uint32 total = 4;
while (total < keyLength) {
jam();
Uint32 length = keyLength - total;
if (length > 20)
length = 20;
signal->theData[0] = scanPtr.p->scanUserptr;
signal->theData[1] = operationRecPtr.i; // not used by LQH
signal->theData[2] = operationRecPtr.p->fid; // not used by LQH
signal->theData[3] = length;
memcpy(&signal->theData[4], &ckeys[total], length << 2);
EXECUTE_DIRECT(blockNo, GSN_ACC_SCAN_INFO24, signal, 4 + length);
// wl-2066 remove GSN_ACC_SCAN_INFO
total += length;
}//if
}//Dbacc::sendNextScanConf()
/*---------------------------------------------------------------------------
......@@ -11233,13 +11198,12 @@ Dbacc::execDUMP_STATE_ORD(Signal* signal)
scanPtr.p->minBucketIndexToRescan,
scanPtr.p->maxBucketIndexToRescan);
infoEvent(" scanBucketState=%d, scanLockHeld=%d, userBlockRef=%d, "
"scanMask=%d scanLockMode=%d, keyInfoFlag=%d",
"scanMask=%d scanLockMode=%d",
scanPtr.p->scanBucketState,
scanPtr.p->scanLockHeld,
scanPtr.p->scanUserblockref,
scanPtr.p->scanMask,
scanPtr.p->scanLockMode,
scanPtr.p->scanKeyinfoFlag);
scanPtr.p->scanLockMode);
return;
}
......
......@@ -29,6 +29,9 @@
#include <signaldata/LqhTransConf.hpp>
#include <signaldata/LqhFrag.hpp>
// primary key is stored in TUP
#include <../dbtup/Dbtup.hpp>
#ifdef DBLQH_C
// Constants
/* ------------------------------------------------------------------------- */
......@@ -500,9 +503,7 @@ public:
WAIT_DELETE_STORED_PROC_ID_COPY = 6,
WAIT_ACC_COPY = 7,
WAIT_ACC_SCAN = 8,
WAIT_SCAN_KEYINFO = 9,
WAIT_SCAN_NEXTREQ = 10,
WAIT_COPY_KEYINFO = 11,
WAIT_CLOSE_SCAN = 12,
WAIT_CLOSE_COPY = 13,
WAIT_RELEASE_LOCK = 14,
......@@ -2025,6 +2026,7 @@ public:
Uint8 opExec;
Uint8 operation;
Uint8 reclenAiLqhkey;
Uint8 m_offset_current_keybuf;
Uint8 replicaType;
Uint8 simpleRead;
Uint8 seqNoReplica;
......@@ -2128,8 +2130,6 @@ private:
void execACC_SCANREF(Signal* signal);
void execNEXT_SCANCONF(Signal* signal);
void execNEXT_SCANREF(Signal* signal);
void execACC_SCAN_INFO(Signal* signal);
void execACC_SCAN_INFO24(Signal* signal);
void execACC_TO_REF(Signal* signal);
void execSTORED_PROCCONF(Signal* signal);
void execSTORED_PROCREF(Signal* signal);
......@@ -2250,7 +2250,7 @@ private:
void finishScanrec(Signal* signal);
void releaseScanrec(Signal* signal);
void seizeScanrec(Signal* signal);
void sendKeyinfo20(Signal* signal, ScanRecord *, TcConnectionrec *);
Uint32 sendKeyinfo20(Signal* signal, ScanRecord *, TcConnectionrec *);
void sendScanFragConf(Signal* signal, Uint32 scanCompleted);
void initCopyrec(Signal* signal);
void initCopyTc(Signal* signal);
......@@ -2379,6 +2379,8 @@ private:
void seizeAttrinbuf(Signal* signal);
Uint32 seize_attrinbuf();
Uint32 release_attrinbuf(Uint32);
Uint32 copy_bounds(Uint32 * dst, TcConnectionrec*);
void seizeFragmentrec(Signal* signal);
void seizePageRef(Signal* signal);
void seizeTcrec();
......@@ -2425,7 +2427,6 @@ private:
void localCommitLab(Signal* signal);
void abortErrorLab(Signal* signal);
void continueAfterReceivingAllAiLab(Signal* signal);
void sendScanFragRefLateLab(Signal* signal);
void abortStateHandlerLab(Signal* signal);
void writeAttrinfoLab(Signal* signal);
void scanAttrinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length);
......@@ -2492,7 +2493,7 @@ private:
void nextScanConfScanLab(Signal* signal);
void nextScanConfCopyLab(Signal* signal);
void continueScanNextReqLab(Signal* signal);
bool keyinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length);
void keyinfoLab(const Uint32 * src, const Uint32 * end);
void copySendTupkeyReqLab(Signal* signal);
void storedProcConfScanLab(Signal* signal);
void storedProcConfCopyLab(Signal* signal);
......@@ -2548,7 +2549,6 @@ private:
void accScanConfScanLab(Signal* signal);
void accScanConfCopyLab(Signal* signal);
void scanLockReleasedLab(Signal* signal);
void accScanInfoEnterLab(Signal* signal, Uint32* dataPtr, Uint32 length);
void openSrFourthNextLab(Signal* signal);
void closingInitLab(Signal* signal);
void closeExecSrCompletedLab(Signal* signal);
......@@ -2563,6 +2563,8 @@ private:
void initData();
void initRecords();
Dbtup* c_tup;
Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst);
// ----------------------------------------------------------------
// These are variables handling the records. For most records one
// pointer to the array of structs, one pointer-struct, a file size
......
......@@ -274,8 +274,6 @@ Dblqh::Dblqh(const class Configuration & conf):
addRecSignal(GSN_ACC_SCANREF, &Dblqh::execACC_SCANREF);
addRecSignal(GSN_NEXT_SCANCONF, &Dblqh::execNEXT_SCANCONF);
addRecSignal(GSN_NEXT_SCANREF, &Dblqh::execNEXT_SCANREF);
addRecSignal(GSN_ACC_SCAN_INFO, &Dblqh::execACC_SCAN_INFO);
addRecSignal(GSN_ACC_SCAN_INFO24, &Dblqh::execACC_SCAN_INFO24);
addRecSignal(GSN_STORED_PROCCONF, &Dblqh::execSTORED_PROCCONF);
addRecSignal(GSN_STORED_PROCREF, &Dblqh::execSTORED_PROCREF);
addRecSignal(GSN_COPY_FRAGREQ, &Dblqh::execCOPY_FRAGREQ);
......
This diff is collapsed.
......@@ -406,7 +406,6 @@ private:
Uint32 m_accLockOp;
Uint8 m_readCommitted; // no locking
Uint8 m_lockMode;
Uint8 m_keyInfo;
ScanBound m_boundMin;
ScanBound m_boundMax;
ScanBound* m_bound[2]; // pointers to above 2
......@@ -1030,7 +1029,6 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
m_accLockOp(RNIL),
m_readCommitted(0),
m_lockMode(0),
m_keyInfo(0),
m_boundMin(scanBoundPool),
m_boundMax(scanBoundPool),
m_scanPos(),
......
......@@ -347,7 +347,6 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan)
out << "]";
out << " [readCommitted " << dec << scan.m_readCommitted << "]";
out << " [lockMode " << dec << scan.m_lockMode << "]";
out << " [keyInfo " << dec << scan.m_keyInfo << "]";
out << " [pos " << scan.m_scanPos << "]";
out << " [ent " << scan.m_scanEnt << "]";
for (unsigned i = 0; i <= 1; i++) {
......
......@@ -73,7 +73,6 @@ Dbtux::execACC_SCANREQ(Signal* signal)
scanPtr.p->m_savePointId = req->savePointId;
scanPtr.p->m_readCommitted = AccScanReq::getReadCommittedFlag(req->requestInfo);
scanPtr.p->m_lockMode = AccScanReq::getLockMode(req->requestInfo);
scanPtr.p->m_keyInfo = AccScanReq::getKeyinfoFlag(req->requestInfo);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Seize scan " << scanPtr.i << " " << *scanPtr.p << endl;
......@@ -473,13 +472,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam();
// read keys if not already done (uses signal)
const TreeEnt ent = scan.m_scanEnt;
if (scan.m_keyInfo) {
jam();
if (pkSize == 0) {
jam();
readTablePk(frag, ent, pkData, pkSize);
}
}
// conf signal
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scan.m_userPtr;
......@@ -501,15 +493,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
conf->localKeyLength = 1;
unsigned signalLength = 6;
// add key info
if (scan.m_keyInfo) {
jam();
conf->keyLength = pkSize;
// piggy-back first 4 words of key data
for (unsigned i = 0; i < 4; i++) {
conf->key[i] = i < pkSize ? pkData[i] : 0;
}
signalLength = 11;
}
if (! scan.m_readCommitted) {
sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
......@@ -517,24 +500,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
Uint32 blockNo = refToBlock(scan.m_userRef);
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
}
// send rest of key data
if (scan.m_keyInfo && pkSize > 4) {
unsigned total = 4;
while (total < pkSize) {
jam();
unsigned length = pkSize - total;
if (length > 20)
length = 20;
signal->theData[0] = scan.m_userPtr;
signal->theData[1] = 0;
signal->theData[2] = 0;
signal->theData[3] = length;
memcpy(&signal->theData[4], &pkData[total], length << 2);
sendSignal(scan.m_userRef, GSN_ACC_SCAN_INFO24,
signal, 4 + length, JBB);
total += length;
}
}
// next time look for next entry
scan.m_state = ScanOp::Next;
return;
......
......@@ -129,7 +129,6 @@ NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
Uint32 parallel)
{
m_ordered = 0;
Uint32 fragCount = m_currentTable->m_fragmentCount;
if (parallel > fragCount || parallel == 0) {
......@@ -1205,6 +1204,9 @@ NdbIndexScanOperation::readTuples(LockMode lm,
#endif
}
}
m_this_bound_start = 0;
m_first_bound_word = theKEYINFOptr;
return rs;
}
......@@ -1573,7 +1575,8 @@ NdbScanOperation::restart()
}
int
NdbIndexScanOperation::reset_bounds(){
NdbIndexScanOperation::reset_bounds()
{
int res;
{
......@@ -1593,6 +1596,8 @@ NdbIndexScanOperation::reset_bounds(){
theTotalNrOfKeyWordInSignal = 0;
theNoOfTupKeyLeft = m_accessTable->m_noOfDistributionKeys;
theDistrKeyIndicator_ = 0;
m_this_bound_start = 0;
m_first_bound_word = theKEYINFOptr;
m_transConnection
->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp,
this);
......@@ -1601,3 +1606,14 @@ NdbIndexScanOperation::reset_bounds(){
}
return res;
}
int
NdbIndexScanOperation::set_new_bound()
{
Uint32 bound_head = * m_first_bound_word;
bound_head |= (theTupKeyLen - m_this_bound_start) << 16;
* m_first_bound_word = bound_head;
m_first_bound_word = theKEYINFOptr;
m_this_bound_start = theTupKeyLen;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment