Commit 21ea39ea authored by unknown's avatar unknown

ndb: wl-1893: range scanning backwards, ndb kernel


ndb/include/kernel/signaldata/AccScan.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/include/kernel/signaldata/ScanFrag.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/include/kernel/signaldata/ScanTab.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/include/ndbapi/NdbIndexScanOperation.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/include/ndbapi/NdbScanOperation.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/common/debugger/signaldata/ScanTab.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/Dbtux.hpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/src/ndbapi/NdbScanOperation.cpp:
  wl-1893: range scanning backwards, ndb kernel
ndb/tools/select_all.cpp:
  wl-1893: range scanning backwards, ndb kernel
sql/ha_ndbcluster.cc:
  wl-1893: range scanning backwards, ndb kernel
parent 5e45d161
......@@ -52,9 +52,11 @@ private:
*/
static Uint32 getLockMode(const Uint32 & requestInfo);
static Uint32 getReadCommittedFlag(const Uint32 & requestInfo);
static Uint32 getDescendingFlag(const Uint32 & requestInfo);
static void setLockMode(Uint32 & requestInfo, Uint32 lockMode);
static void setReadCommittedFlag(Uint32 & requestInfo, Uint32 readCommitted);
static void setDescendingFlag(Uint32 & requestInfo, Uint32 descending);
};
/**
......@@ -62,14 +64,16 @@ private:
*
* l = Lock Mode - 1 Bit 2
* h = Read Committed - 1 Bit 5
* z = Descending (TUX) - 1 Bit 6
*
* 1111111111222222222233
* 01234567890123456789012345678901
* l h
* l hz
*/
#define AS_LOCK_MODE_SHIFT (2)
#define AS_LOCK_MODE_MASK (1)
#define AS_READ_COMMITTED_SHIFT (5)
#define AS_DESCENDING_SHIFT (6)
inline
Uint32
......@@ -83,6 +87,12 @@ AccScanReq::getReadCommittedFlag(const Uint32 & requestInfo){
return (requestInfo >> AS_READ_COMMITTED_SHIFT) & 1;
}
inline
Uint32
AccScanReq::getDescendingFlag(const Uint32 & requestInfo){
return (requestInfo >> AS_DESCENDING_SHIFT) & 1;
}
inline
void
AccScanReq::setLockMode(UintR & requestInfo, UintR val){
......@@ -97,6 +107,13 @@ AccScanReq::setReadCommittedFlag(UintR & requestInfo, UintR val){
requestInfo |= (val << AS_READ_COMMITTED_SHIFT);
}
inline
void
AccScanReq::setDescendingFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "AccScanReq::setDescendingFlag");
requestInfo |= (val << AS_DESCENDING_SHIFT);
}
class AccScanConf {
/**
* Sender(s)
......
......@@ -56,6 +56,7 @@ public:
static Uint32 getKeyinfoFlag(const Uint32 & requestInfo);
static Uint32 getReadCommittedFlag(const Uint32 & requestInfo);
static Uint32 getRangeScanFlag(const Uint32 & requestInfo);
static Uint32 getDescendingFlag(const Uint32 & requestInfo);
static Uint32 getAttrLen(const Uint32 & requestInfo);
static Uint32 getScanPrio(const Uint32 & requestInfo);
......@@ -64,6 +65,7 @@ public:
static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo);
static void setReadCommittedFlag(Uint32 & requestInfo, Uint32 readCommitted);
static void setRangeScanFlag(Uint32 & requestInfo, Uint32 rangeScan);
static void setDescendingFlag(Uint32 & requestInfo, Uint32 descending);
static void setAttrLen(Uint32 & requestInfo, Uint32 attrLen);
static void setScanPrio(Uint32& requestInfo, Uint32 prio);
};
......@@ -197,11 +199,12 @@ public:
* k = Keyinfo - 1 Bit 8
* r = read committed - 1 Bit 9
* x = range scan - 1 Bit 6
* z = descending - 1 Bit 10
* p = Scan prio - 4 Bits (12-15) -> max 15
*
* 1111111111222222222233
* 01234567890123456789012345678901
* lxhkr ppppaaaaaaaaaaaaaaaa
* lxhkrz ppppaaaaaaaaaaaaaaaa
*/
#define SF_LOCK_MODE_SHIFT (5)
#define SF_LOCK_MODE_MASK (1)
......@@ -210,6 +213,7 @@ public:
#define SF_KEYINFO_SHIFT (8)
#define SF_READ_COMMITTED_SHIFT (9)
#define SF_RANGE_SCAN_SHIFT (6)
#define SF_DESCENDING_SHIFT (10)
#define SF_ATTR_LEN_SHIFT (16)
#define SF_ATTR_LEN_MASK (65535)
......@@ -241,6 +245,12 @@ ScanFragReq::getRangeScanFlag(const Uint32 & requestInfo){
return (requestInfo >> SF_RANGE_SCAN_SHIFT) & 1;
}
inline
Uint32
ScanFragReq::getDescendingFlag(const Uint32 & requestInfo){
return (requestInfo >> SF_DESCENDING_SHIFT) & 1;
}
inline
Uint32
ScanFragReq::getReadCommittedFlag(const Uint32 & requestInfo){
......@@ -301,6 +311,13 @@ ScanFragReq::setRangeScanFlag(UintR & requestInfo, UintR val){
requestInfo |= (val << SF_RANGE_SCAN_SHIFT);
}
inline
void
ScanFragReq::setDescendingFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "ScanFragReq::setDescendingFlag");
requestInfo |= (val << SF_DESCENDING_SHIFT);
}
inline
void
ScanFragReq::setAttrLen(UintR & requestInfo, UintR val){
......
......@@ -35,6 +35,7 @@ class ScanTabReq {
*/
friend class NdbConnection;
friend class NdbScanOperation;
friend class NdbIndexScanOperation;
/**
* For printing
......@@ -79,6 +80,7 @@ private:
static Uint8 getHoldLockFlag(const UintR & requestInfo);
static Uint8 getReadCommittedFlag(const UintR & requestInfo);
static Uint8 getRangeScanFlag(const UintR & requestInfo);
static Uint8 getDescendingFlag(const UintR & requestInfo);
static Uint8 getKeyinfoFlag(const UintR & requestInfo);
static Uint16 getScanBatch(const UintR & requestInfo);
static Uint8 getDistributionKeyFlag(const UintR & requestInfo);
......@@ -92,6 +94,7 @@ private:
static void setHoldLockFlag(UintR & requestInfo, Uint32 flag);
static void setReadCommittedFlag(UintR & requestInfo, Uint32 flag);
static void setRangeScanFlag(UintR & requestInfo, Uint32 flag);
static void setDescendingFlag(UintR & requestInfo, Uint32 flag);
static void setKeyinfoFlag(UintR & requestInfo, Uint32 flag);
static void setScanBatch(Uint32& requestInfo, Uint32 sz);
static void setDistributionKeyFlag(Uint32& requestInfo, Uint32 flag);
......@@ -105,13 +108,14 @@ private:
h = Hold lock mode - 1 Bit 10
c = Read Committed - 1 Bit 11
k = Keyinfo - 1 Bit 12
z = Descending (TUX) - 1 Bit 14
x = Range Scan (TUX) - 1 Bit 15
b = Scan batch - 10 Bit 16-25 (max 1023)
d = Distribution key flag
1111111111222222222233
01234567890123456789012345678901
ppppppppl hck xbbbbbbbbbb
ppppppppl hck zxbbbbbbbbbb
*/
#define PARALLELL_SHIFT (0)
......@@ -132,6 +136,9 @@ private:
#define RANGE_SCAN_SHIFT (15)
#define RANGE_SCAN_MASK (1)
#define DESCENDING_SHIFT (14)
#define DESCENDING_MASK (1)
#define SCAN_BATCH_SHIFT (16)
#define SCAN_BATCH_MASK (1023)
......@@ -167,6 +174,12 @@ ScanTabReq::getRangeScanFlag(const UintR & requestInfo){
return (Uint8)((requestInfo >> RANGE_SCAN_SHIFT) & RANGE_SCAN_MASK);
}
inline
Uint8
ScanTabReq::getDescendingFlag(const UintR & requestInfo){
return (Uint8)((requestInfo >> DESCENDING_SHIFT) & DESCENDING_MASK);
}
inline
Uint16
ScanTabReq::getScanBatch(const Uint32 & requestInfo){
......@@ -214,6 +227,13 @@ ScanTabReq::setRangeScanFlag(UintR & requestInfo, Uint32 flag){
requestInfo |= (flag << RANGE_SCAN_SHIFT);
}
inline
void
ScanTabReq::setDescendingFlag(UintR & requestInfo, Uint32 flag){
ASSERT_BOOL(flag, "ScanTabReq::setDescendingFlag");
requestInfo |= (flag << DESCENDING_SHIFT);
}
inline
void
ScanTabReq::setScanBatch(Uint32 & requestInfo, Uint32 flag){
......
......@@ -39,6 +39,7 @@ public:
* @param batch No of rows to fetch from each fragment at a time
* @param LockMode Scan lock handling
* @param order_by Order result set in index order
* @param order_desc Order descending, ignored unless order_by
* @returns NdbResultSet.
* @see NdbScanOperation::readTuples
*/
......@@ -46,6 +47,7 @@ public:
Uint32 batch = 0,
Uint32 parallel = 0,
bool order_by = false,
bool order_desc = false,
bool read_range_no = false);
inline int readTuples(int parallell){
......@@ -128,6 +130,7 @@ public:
int get_range_no();
bool getSorted() const { return m_ordered; }
bool getDescending() const { return m_descending; }
private:
NdbIndexScanOperation(Ndb* aNdb);
virtual ~NdbIndexScanOperation();
......
......@@ -232,7 +232,8 @@ protected:
int getKeyFromKEYINFO20(Uint32* data, unsigned size);
NdbOperation* takeOverScanOp(OperationType opType, NdbConnection*);
Uint32 m_ordered;
bool m_ordered;
bool m_descending;
Uint32 m_read_range_no;
};
......
......@@ -30,13 +30,14 @@ printSCANTABREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiv
fprintf(output, " apiConnectPtr: H\'%.8x",
sig->apiConnectPtr);
fprintf(output, " requestInfo: H\'%.8x:\n", requestInfo);
fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u, Keyinfo: %u Holdlock: %u, RangeScan: %u ReadCommitted: %u\n DistributionKeyFlag: %u",
fprintf(output, " Parallellism: %u, Batch: %u LockMode: %u Keyinfo: %u Holdlock: %u RangeScan: %u Descending: %u ReadCommitted: %u\n DistributionKeyFlag: %u",
sig->getParallelism(requestInfo),
sig->getScanBatch(requestInfo),
sig->getLockMode(requestInfo),
sig->getKeyinfoFlag(requestInfo),
sig->getHoldLockFlag(requestInfo),
sig->getRangeScanFlag(requestInfo),
sig->getDescendingFlag(requestInfo),
sig->getReadCommittedFlag(requestInfo),
sig->getDistributionKeyFlag(requestInfo));
......
......@@ -572,6 +572,7 @@ public:
Uint8 scanLockMode;
Uint8 readCommitted;
Uint8 rangeScan;
Uint8 descending;
Uint8 scanTcWaiting;
Uint8 scanKeyinfoFlag;
Uint8 m_last_row;
......
......@@ -7637,6 +7637,7 @@ void Dblqh::continueAfterReceivingAllAiLab(Signal* signal)
req->requestInfo = 0;
AccScanReq::setLockMode(req->requestInfo, scanptr.p->scanLockMode);
AccScanReq::setReadCommittedFlag(req->requestInfo, scanptr.p->readCommitted);
AccScanReq::setDescendingFlag(req->requestInfo, scanptr.p->descending);
req->transId1 = tcConnectptr.p->transid[0];
req->transId2 = tcConnectptr.p->transid[1];
req->savePointId = tcConnectptr.p->savePointId;
......@@ -8628,6 +8629,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
const Uint32 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo);
const Uint32 idx = ScanFragReq::getRangeScanFlag(reqinfo);
const Uint32 descending = ScanFragReq::getDescendingFlag(reqinfo);
const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo);
const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo);
......@@ -8649,6 +8651,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanLockMode = scanLockMode;
scanptr.p->readCommitted = readCommitted;
scanptr.p->rangeScan = idx;
scanptr.p->descending = descending;
scanptr.p->scanState = ScanRecord::SCAN_FREE;
scanptr.p->scanFlag = ZFALSE;
scanptr.p->scanLocalref[0] = 0;
......
......@@ -8791,6 +8791,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
ScanFragReq::setKeyinfoFlag(tmp, ScanTabReq::getKeyinfoFlag(ri));
ScanFragReq::setReadCommittedFlag(tmp,ScanTabReq::getReadCommittedFlag(ri));
ScanFragReq::setRangeScanFlag(tmp, ScanTabReq::getRangeScanFlag(ri));
ScanFragReq::setDescendingFlag(tmp, ScanTabReq::getDescendingFlag(ri));
ScanFragReq::setAttrLen(tmp, scanTabReq->attrLenKeyLen & 0xFFFF);
scanptr.p->scanRequestInfo = tmp;
......
......@@ -406,6 +406,7 @@ private:
Uint32 m_accLockOp;
Uint8 m_readCommitted; // no locking
Uint8 m_lockMode;
Uint8 m_descending;
ScanBound m_boundMin;
ScanBound m_boundMax;
ScanBound* m_bound[2]; // pointers to above 2
......@@ -638,7 +639,7 @@ private:
void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal);
void scanFirst(ScanOpPtr scanPtr);
void scanNext(ScanOpPtr scanPtr);
void scanNext(ScanOpPtr scanPtr, bool fromMaintReq);
bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
......@@ -650,7 +651,9 @@ private:
*/
void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
void searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
void searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/*
* DbtuxCmp.cpp
......@@ -1029,6 +1032,7 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
m_accLockOp(RNIL),
m_readCommitted(0),
m_lockMode(0),
m_descending(0),
m_boundMin(scanBoundPool),
m_boundMax(scanBoundPool),
m_scanPos(),
......
......@@ -103,7 +103,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
*
* Following example illustrates this. We are at (a=2, b=3).
*
* dir bounds strict return
* idir bounds strict return
* 0 a >= 2 and b >= 3 no -1
* 0 a >= 2 and b > 3 yes +1
* 1 a <= 2 and b <= 3 no +1
......@@ -112,11 +112,11 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
* The attributes are normalized and have variable size given in words.
*/
int
Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
{
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// direction 0-lower 1-upper
ndbrequire(dir <= 1);
ndbrequire(idir <= 1);
// number of words of data left
unsigned len2 = maxlen;
// in case of no bounds, init last type to something non-strict
......@@ -171,5 +171,5 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
}
// all attributes were equal
const int strict = (type & 0x1);
return (dir == 0 ? (strict == 0 ? -1 : +1) : (strict == 0 ? +1 : -1));
return (idir == 0 ? (strict == 0 ? -1 : +1) : (strict == 0 ? +1 : -1));
}
......@@ -347,6 +347,7 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan)
out << "]";
out << " [readCommitted " << dec << scan.m_readCommitted << "]";
out << " [lockMode " << dec << scan.m_lockMode << "]";
out << " [descending " << dec << scan.m_descending << "]";
out << " [pos " << scan.m_scanPos << "]";
out << " [ent " << scan.m_scanEnt << "]";
for (unsigned i = 0; i <= 1; i++) {
......
......@@ -491,7 +491,7 @@ Dbtux::moveScanList(NodeHandle& node, unsigned pos)
debugOut << "At pos=" << pos << " " << node << endl;
}
#endif
scanNext(scanPtr);
scanNext(scanPtr, true);
ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
}
scanPtr.i = nextPtrI;
......
......@@ -74,17 +74,18 @@ Dbtux::execACC_SCANREQ(Signal* signal)
scanPtr.p->m_savePointId = req->savePointId;
scanPtr.p->m_readCommitted = AccScanReq::getReadCommittedFlag(req->requestInfo);
scanPtr.p->m_lockMode = AccScanReq::getLockMode(req->requestInfo);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Seize scan " << scanPtr.i << " " << *scanPtr.p << endl;
}
#endif
scanPtr.p->m_descending = AccScanReq::getDescendingFlag(req->requestInfo);
/*
* readCommitted lockMode keyInfo
* 1 0 0 - read committed (no lock)
* 0 0 0 - read latest (read lock)
* 0 1 1 - read exclusive (write lock)
*/
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Seize scan " << scanPtr.i << " " << *scanPtr.p << endl;
}
#endif
// conf
AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
conf->scanPtr = req->senderData;
......@@ -418,7 +419,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::Next) {
jam();
// look for next
scanNext(scanPtr);
scanNext(scanPtr, false);
}
// for reading tuple key in Current or Locked state
Data pkData = c_dataBuffer;
......@@ -697,8 +698,10 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
TreeHead& tree = frag.m_tree;
// set up index keys for this operation
setKeyAttrs(frag);
// unpack lower bound into c_dataBuffer
const ScanBound& bound = *scan.m_bound[0];
// scan direction 0, 1
const unsigned idir = scan.m_descending;
// unpack start key into c_dataBuffer
const ScanBound& bound = *scan.m_bound[idir];
ScanBoundIterator iter;
bound.first(iter);
for (unsigned j = 0; j < bound.getSize(); j++) {
......@@ -706,11 +709,10 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
c_dataBuffer[j] = *iter.data;
bound.next(iter);
}
// search for scan start position
TreePos treePos;
searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos);
if (treePos.m_loc == NullTupLoc) {
// empty tree
// empty result set
jam();
scan.m_state = ScanOp::Last;
return;
......@@ -728,7 +730,8 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
* Move to next entry. The scan is already linked to some node. When
* we leave, if an entry was found, it will be linked to a possibly
* different node. The scan has a position, and a direction which tells
* from where we came to this position. This is one of:
* from where we came to this position. This is one of (all comments
* are in terms of ascending scan):
*
* 0 - up from left child (scan this node next)
* 1 - up from right child (proceed to parent)
......@@ -740,7 +743,7 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
* re-organizations need not worry about scan direction.
*/
void
Dbtux::scanNext(ScanOpPtr scanPtr)
Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......@@ -753,8 +756,11 @@ Dbtux::scanNext(ScanOpPtr scanPtr)
ndbrequire(scan.m_state != ScanOp::Locked);
// set up index keys for this operation
setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer
const ScanBound& bound = *scan.m_bound[1];
// scan direction
const unsigned idir = scan.m_descending; // 0, 1
const int jdir = 1 - 2 * (int)idir; // 1, -1
// unpack end key into c_dataBuffer
const ScanBound& bound = *scan.m_bound[1 - idir];
ScanBoundIterator iter;
bound.first(iter);
for (unsigned j = 0; j < bound.getSize(); j++) {
......@@ -774,6 +780,11 @@ Dbtux::scanNext(ScanOpPtr scanPtr)
TreeEnt ent;
while (true) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Scan next pos " << pos << " " << node << endl;
}
#endif
if (pos.m_dir == 2) {
// coming up from root ends the scan
jam();
......@@ -788,7 +799,7 @@ Dbtux::scanNext(ScanOpPtr scanPtr)
if (pos.m_dir == 4) {
// coming down from parent proceed to left child
jam();
TupLoc loc = node.getLink(0);
TupLoc loc = node.getLink(idir);
if (loc != NullTupLoc) {
jam();
pos.m_loc = loc;
......@@ -796,34 +807,42 @@ Dbtux::scanNext(ScanOpPtr scanPtr)
continue;
}
// pretend we came from left child
pos.m_dir = 0;
pos.m_dir = idir;
}
const unsigned occup = node.getOccup();
if (occup == 0) {
jam();
ndbrequire(fromMaintReq);
// move back to parent - see comment in treeRemoveInner
pos.m_loc = node.getLink(2);
pos.m_dir = node.getSide();
continue;
}
if (pos.m_dir == 0) {
if (pos.m_dir == idir) {
// coming up from left child scan current node
jam();
pos.m_pos = 0;
pos.m_pos = idir == 0 ? 0 : occup - 1;
pos.m_match = false;
pos.m_dir = 3;
}
if (pos.m_dir == 3) {
// within node
jam();
unsigned occup = node.getOccup();
ndbrequire(occup >= 1);
// advance position
if (! pos.m_match)
pos.m_match = true;
else
pos.m_pos++;
// becomes ZNIL (which is > occup) if 0 and scan descending
pos.m_pos += jdir;
if (pos.m_pos < occup) {
jam();
ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged
// read and compare all attributes
readKeyAttrs(frag, ent, 0, c_entryKey);
int ret = cmpScanBound(frag, 1, c_dataBuffer, scan.m_boundCnt[1], c_entryKey);
int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, scan.m_boundCnt[1 - idir], c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
if (jdir * ret < 0) {
jam();
// hit upper bound of single range scan
pos.m_loc = NullTupLoc;
......@@ -840,7 +859,7 @@ Dbtux::scanNext(ScanOpPtr scanPtr)
break;
}
// after node proceed to right child
TupLoc loc = node.getLink(1);
TupLoc loc = node.getLink(1 - idir);
if (loc != NullTupLoc) {
jam();
pos.m_loc = loc;
......@@ -848,9 +867,9 @@ Dbtux::scanNext(ScanOpPtr scanPtr)
continue;
}
// pretend we came from right child
pos.m_dir = 1;
pos.m_dir = 1 - idir;
}
if (pos.m_dir == 1) {
if (pos.m_dir == 1 - idir) {
// coming up from right child proceed to parent
jam();
pos.m_loc = node.getLink(2);
......
......@@ -253,22 +253,33 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo
/*
* Search for scan start position.
*
* Similar to searchToAdd.
* Similar to searchToAdd. The routines differ somewhat depending on
* scan direction and are done by separate methods.
*/
void
Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
if (tree.m_root != NullTupLoc) {
if (! descending)
searchToScanAscending(frag, boundInfo, boundCount, treePos);
else
searchToScanDescending(frag, boundInfo, boundCount, treePos);
return;
}
// empty tree
}
void
Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag);
// always before entry
treePos.m_match = false;
while (true) {
jam();
selectNode(currNode, currNode.m_loc);
......@@ -283,6 +294,7 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret < 0) {
// bound is left of this node
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
......@@ -300,11 +312,11 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo
// start scanning this node
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
} else if (ret > 0) {
// bound is at or right of this node
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
......@@ -316,7 +328,7 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo
continue;
}
} else {
ndbassert(false);
ndbrequire(false);
}
break;
}
......@@ -328,20 +340,19 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo
ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
// start scanning from current entry
// found first entry satisfying the bound
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
}
// bound is to right of this node
if (! bottomNode.isNull()) {
jam();
// start scanning the l.u.b
treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
......@@ -349,3 +360,90 @@ Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePo
treePos.m_loc = currNode.m_loc;
treePos.m_dir = 1;
}
void
Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag);
// always before entry
treePos.m_match = false;
while (true) {
jam();
selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
ret = cmpScanBound(frag, 1, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare all attributes
readKeyAttrs(frag, currNode.getMinMax(0), 0, c_entryKey);
ret = cmpScanBound(frag, 1, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret < 0) {
// bound is left of this node
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b but remember the bottom node
bottomNode = currNode;
currNode = glbNode;
} else {
// empty result set
return;
}
} else if (ret > 0) {
// bound is at or right of this node
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
ndbrequire(false);
}
break;
}
for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
jam();
int ret;
// read and compare attributes
readKeyAttrs(frag, currNode.getEnt(j), 0, c_entryKey);
ret = cmpScanBound(frag, 1, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
if (j > 0) {
// start scanning from previous entry
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j - 1;
treePos.m_dir = 3;
return;
}
// start scanning upwards (pretend we came from left child)
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_dir = 0;
return;
}
}
// start scanning this node
treePos.m_loc = currNode.m_loc;
treePos.m_pos = currNode.getOccup() - 1;
treePos.m_dir = 3;
}
......@@ -226,6 +226,9 @@ Dbtux::treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos)
// borrow max entry from semi/leaf
Uint32 scanList = RNIL;
nodePopDown(glbNode, glbNode.getOccup() - 1, ent, &scanList);
// g.l.b may be empty now
// a descending scan may try to enter the empty g.l.b
// we prevent this in scanNext
nodePopUp(lubNode, pos, ent, scanList);
if (glbNode.getLink(0) != NullTupLoc) {
jam();
......
......@@ -116,7 +116,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
Uint32 batch,
Uint32 parallel)
{
m_ordered = 0;
m_ordered = m_descending = false;
Uint32 fragCount = m_currentTable->m_fragmentCount;
if (parallel > fragCount || parallel == 0) {
......@@ -1191,9 +1191,10 @@ NdbIndexScanOperation::readTuples(LockMode lm,
Uint32 batch,
Uint32 parallel,
bool order_by,
bool order_desc,
bool read_range_no){
int res = NdbScanOperation::readTuples(lm, batch, 0);
if(read_range_no)
if(!res && read_range_no)
{
m_read_range_no = 1;
Uint32 word = 0;
......@@ -1202,7 +1203,12 @@ NdbIndexScanOperation::readTuples(LockMode lm,
res = -1;
}
if(!res && order_by){
m_ordered = 1;
m_ordered = true;
if (order_desc) {
m_descending = true;
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
ScanTabReq::setDescendingFlag(req->requestInfo, true);
}
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
m_sort_columns = cnt; // -1 for NDB$NODE
m_current_api_receiver = m_sent_receivers_count;
......@@ -1266,12 +1272,14 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
r1 = (skip ? r1->next() : r1);
r2 = (skip ? r2->next() : r2);
const int jdir = 1 - 2 * (int)m_descending;
assert(jdir == 1 || jdir == -1);
while(cols > 0){
Uint32 * d1 = (Uint32*)r1->aRef();
Uint32 * d2 = (Uint32*)r2->aRef();
unsigned r1_null = r1->isNULL();
if((r1_null ^ (unsigned)r2->isNULL())){
return (r1_null ? -1 : 1);
return (r1_null ? -1 : 1) * jdir;
}
const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
Uint32 len = r1->theAttrSize * r1->theArraySize;
......@@ -1280,7 +1288,7 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true);
if(r){
assert(r != NdbSqlUtil::CmpUnknown);
return r;
return r * jdir;
}
}
cols--;
......
......@@ -34,13 +34,14 @@ int scanReadRecords(Ndb*,
bool headers,
bool useHexFormat,
char delim,
bool orderby);
bool orderby,
bool descending);
static const char* opt_connect_str= 0;
static const char* _dbname = "TEST_DB";
static const char* _delimiter = "\t";
static int _unqualified, _header, _parallelism, _useHexFormat, _lock,
_order;
_order, _descending;
static struct my_option my_long_options[] =
{
......@@ -57,6 +58,9 @@ static struct my_option my_long_options[] =
{ "order", 'o', "Sort resultset according to index",
(gptr*) &_order, (gptr*) &_order, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "descending", 'z', "Sort descending (requires order flag)",
(gptr*) &_descending, (gptr*) &_descending, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "header", 'h', "Print header",
(gptr*) &_header, (gptr*) &_header, 0,
GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0 },
......@@ -151,6 +155,11 @@ int main(int argc, char** argv){
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
if (_descending && ! _order) {
ndbout << " Descending flag given without order flag" << endl;
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
if (scanReadRecords(&MyNdb,
pTab,
pIdx,
......@@ -158,7 +167,7 @@ int main(int argc, char** argv){
_lock,
_header,
_useHexFormat,
(char)*_delimiter, _order) != 0){
(char)*_delimiter, _order, _descending) != 0){
return NDBT_ProgramExit(NDBT_FAILED);
}
......@@ -173,7 +182,7 @@ int scanReadRecords(Ndb* pNdb,
int _lock,
bool headers,
bool useHexFormat,
char delimiter, bool order){
char delimiter, bool order, bool descending){
int retryAttempt = 0;
const int retryMax = 100;
......@@ -225,13 +234,13 @@ int scanReadRecords(Ndb* pNdb,
break;
case 3:
rs = pIOp->readTuples(NdbScanOperation::LM_CommittedRead, 0, parallel,
true);
true, descending);
break;
case 4:
rs = pIOp->readTuples(NdbScanOperation::LM_Read, 0, parallel, true);
rs = pIOp->readTuples(NdbScanOperation::LM_Read, 0, parallel, true, descending);
break;
case 5:
rs = pIOp->readTuples(NdbScanOperation::LM_Exclusive, 0, parallel, true);
rs = pIOp->readTuples(NdbScanOperation::LM_Exclusive, 0, parallel, true, descending);
break;
case 0:
default:
......
......@@ -4926,7 +4926,7 @@ ha_ndbcluster::read_multi_range_first(key_multi_range **found_range_p,
end_of_buffer -= reclength;
}
else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab))
&& !scanOp->readTuples(lm, 0, parallelism, sorted, true) &&
&& !scanOp->readTuples(lm, 0, parallelism, sorted, false, true) &&
!define_read_attrs(end_of_buffer-reclength, scanOp))
{
m_multi_cursor= scanOp;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment