Commit b0844011 authored by pekka@orca.ndb.mysql.com's avatar pekka@orca.ndb.mysql.com

Merge orca.ndb.mysql.com:/export/home/space/pekka/ndb/version/my50-bug20446

into  orca.ndb.mysql.com:/export/home/space/pekka/ndb/version/my51-bug20446
parents ae1b40ba 2fc65dba
...@@ -192,6 +192,7 @@ private: ...@@ -192,6 +192,7 @@ private:
unsigned m_tupVersion : 15; // version unsigned m_tupVersion : 15; // version
TreeEnt(); TreeEnt();
// methods // methods
bool eqtuple(const TreeEnt ent) const;
bool eq(const TreeEnt ent) const; bool eq(const TreeEnt ent) const;
int cmp(const TreeEnt ent) const; int cmp(const TreeEnt ent) const;
}; };
...@@ -270,8 +271,7 @@ private: ...@@ -270,8 +271,7 @@ private:
struct TreePos { struct TreePos {
TupLoc m_loc; // physical node address TupLoc m_loc; // physical node address
Uint16 m_pos; // position 0 to m_occup Uint16 m_pos; // position 0 to m_occup
Uint8 m_match; // at an existing entry Uint8 m_dir; // see scanNext
Uint8 m_dir; // see scanNext()
TreePos(); TreePos();
}; };
...@@ -362,12 +362,13 @@ private: ...@@ -362,12 +362,13 @@ private:
enum { enum {
Undef = 0, Undef = 0,
First = 1, // before first entry First = 1, // before first entry
Current = 2, // at current before locking Current = 2, // at some entry
Blocked = 3, // at current waiting for ACC lock Found = 3, // return current as next scan result
Locked = 4, // at current and locked or no lock needed Blocked = 4, // found and waiting for ACC lock
Next = 5, // looking for next extry Locked = 5, // found and locked or no lock needed
Last = 6, // after last entry Next = 6, // looking for next extry
Aborting = 7, // lock wait at scan close Last = 7, // after last entry
Aborting = 8, // lock wait at scan close
Invalid = 9 // cannot return REF to LQH currently Invalid = 9 // cannot return REF to LQH currently
}; };
Uint16 m_state; Uint16 m_state;
...@@ -544,6 +545,7 @@ private: ...@@ -544,6 +545,7 @@ private:
void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData); void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData);
void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize); void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
void copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize); void copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
void unpackBound(const ScanBound& bound, Data data);
/* /*
* DbtuxMeta.cpp * DbtuxMeta.cpp
...@@ -618,7 +620,9 @@ private: ...@@ -618,7 +620,9 @@ private:
void execACCKEYREF(Signal* signal); void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal); void execACC_ABORTCONF(Signal* signal);
void scanFirst(ScanOpPtr scanPtr); void scanFirst(ScanOpPtr scanPtr);
void scanFind(ScanOpPtr scanPtr);
void scanNext(ScanOpPtr scanPtr, bool fromMaintReq); void scanNext(ScanOpPtr scanPtr, bool fromMaintReq);
bool scanCheck(ScanOpPtr scanPtr, TreeEnt ent);
bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent); bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr); void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp); void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
...@@ -628,8 +632,8 @@ private: ...@@ -628,8 +632,8 @@ private:
/* /*
* DbtuxSearch.cpp * DbtuxSearch.cpp
*/ */
void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); bool searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); bool searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos); void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
void searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); void searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
void searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); void searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
...@@ -793,6 +797,14 @@ Dbtux::TreeEnt::TreeEnt() : ...@@ -793,6 +797,14 @@ Dbtux::TreeEnt::TreeEnt() :
{ {
} }
inline bool
Dbtux::TreeEnt::eqtuple(const TreeEnt ent) const
{
return
m_tupLoc == ent.m_tupLoc &&
m_fragBit == ent.m_fragBit;
}
inline bool inline bool
Dbtux::TreeEnt::eq(const TreeEnt ent) const Dbtux::TreeEnt::eq(const TreeEnt ent) const
{ {
...@@ -812,10 +824,25 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const ...@@ -812,10 +824,25 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const
return -1; return -1;
if (m_tupLoc.getPageOffset() > ent.m_tupLoc.getPageOffset()) if (m_tupLoc.getPageOffset() > ent.m_tupLoc.getPageOffset())
return +1; return +1;
if (m_tupVersion < ent.m_tupVersion) /*
return -1; * Guess if one tuple version has wrapped around. This is well
if (m_tupVersion > ent.m_tupVersion) * defined ordering on existing versions since versions are assigned
return +1; * consecutively and different versions exists only on uncommitted
* tuple. Assuming max 2**14 uncommitted ops on same tuple.
*/
const unsigned version_wrap_limit = (1 << (ZTUP_VERSION_BITS - 1));
if (m_tupVersion < ent.m_tupVersion) {
if (ent.m_tupVersion - m_tupVersion < version_wrap_limit)
return -1;
else
return +1;
}
if (m_tupVersion > ent.m_tupVersion) {
if (m_tupVersion - ent.m_tupVersion < version_wrap_limit)
return +1;
else
return -1;
}
return 0; return 0;
} }
...@@ -883,7 +910,6 @@ inline ...@@ -883,7 +910,6 @@ inline
Dbtux::TreePos::TreePos() : Dbtux::TreePos::TreePos() :
m_loc(), m_loc(),
m_pos(ZNIL), m_pos(ZNIL),
m_match(false),
m_dir(255) m_dir(255)
{ {
} }
......
...@@ -280,7 +280,6 @@ operator<<(NdbOut& out, const Dbtux::TreePos& pos) ...@@ -280,7 +280,6 @@ operator<<(NdbOut& out, const Dbtux::TreePos& pos)
out << "[TreePos " << hex << &pos; out << "[TreePos " << hex << &pos;
out << " [loc " << pos.m_loc << "]"; out << " [loc " << pos.m_loc << "]";
out << " [pos " << dec << pos.m_pos << "]"; out << " [pos " << dec << pos.m_pos << "]";
out << " [match " << dec << pos.m_match << "]";
out << " [dir " << dec << pos.m_dir << "]"; out << " [dir " << dec << pos.m_dir << "]";
out << "]"; out << "]";
return out; return out;
......
...@@ -318,4 +318,17 @@ Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 ...@@ -318,4 +318,17 @@ Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2
#endif #endif
} }
void
Dbtux::unpackBound(const ScanBound& bound, Data dest)
{
ScanBoundIterator iter;
bound.first(iter);
const unsigned n = bound.getSize();
unsigned j;
for (j = 0; j < n; j++) {
dest[j] = *iter.data;
bound.next(iter);
}
}
BLOCK_FUNCTIONS(Dbtux) BLOCK_FUNCTIONS(Dbtux)
...@@ -111,16 +111,17 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -111,16 +111,17 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
// do the operation // do the operation
req->errorCode = 0; req->errorCode = 0;
TreePos treePos; TreePos treePos;
bool ok;
switch (opCode) { switch (opCode) {
case TuxMaintReq::OpAdd: case TuxMaintReq::OpAdd:
jam(); jam();
searchToAdd(frag, c_searchKey, ent, treePos); ok = searchToAdd(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << (treePos.m_match ? " - error" : "") << endl; debugOut << treePos << (! ok ? " - error" : "") << endl;
} }
#endif #endif
if (treePos.m_match) { if (! ok) {
jam(); jam();
// there is no "Building" state so this will have to do // there is no "Building" state so this will have to do
if (indexPtr.p->m_state == Index::Online) { if (indexPtr.p->m_state == Index::Online) {
...@@ -150,13 +151,13 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -150,13 +151,13 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break; break;
case TuxMaintReq::OpRemove: case TuxMaintReq::OpRemove:
jam(); jam();
searchToRemove(frag, c_searchKey, ent, treePos); ok = searchToRemove(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl; debugOut << treePos << (! ok ? " - error" : "") << endl;
} }
#endif #endif
if (! treePos.m_match) { if (! ok) {
jam(); jam();
// there is no "Building" state so this will have to do // there is no "Building" state so this will have to do
if (indexPtr.p->m_state == Index::Online) { if (indexPtr.p->m_state == Index::Online) {
......
...@@ -424,21 +424,17 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -424,21 +424,17 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam(); jam();
// search is done only once in single range scan // search is done only once in single range scan
scanFirst(scanPtr); scanFirst(scanPtr);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "First scan " << scanPtr.i << " " << scan << endl;
}
#endif
} }
if (scan.m_state == ScanOp::Next) { if (scan.m_state == ScanOp::Current ||
scan.m_state == ScanOp::Next) {
jam(); jam();
// look for next // look for next
scanNext(scanPtr, false); scanFind(scanPtr);
} }
// for reading tuple key in Current or Locked state // for reading tuple key in Found or Locked state
Data pkData = c_dataBuffer; Data pkData = c_dataBuffer;
unsigned pkSize = 0; // indicates not yet done unsigned pkSize = 0; // indicates not yet done
if (scan.m_state == ScanOp::Current) { if (scan.m_state == ScanOp::Found) {
// found an entry to return // found an entry to return
jam(); jam();
ndbrequire(scan.m_accLockOp == RNIL); ndbrequire(scan.m_accLockOp == RNIL);
...@@ -512,8 +508,8 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -512,8 +508,8 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
jam(); jam();
// max ops should depend on max scans (assert only) // max ops should depend on max scans (assert only)
ndbassert(false); ndbassert(false);
// stay in Current state // stay in Found state
scan.m_state = ScanOp::Current; scan.m_state = ScanOp::Found;
signal->theData[0] = scan.m_userPtr; signal->theData[0] = scan.m_userPtr;
signal->theData[1] = true; signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
...@@ -700,44 +696,95 @@ Dbtux::execACC_ABORTCONF(Signal* signal) ...@@ -700,44 +696,95 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
} }
/* /*
* Find start position for single range scan. If it exists, sets state * Find start position for single range scan.
* to Next and links the scan to the node. The first entry is returned
* by scanNext.
*/ */
void void
Dbtux::scanFirst(ScanOpPtr scanPtr) Dbtux::scanFirst(ScanOpPtr scanPtr)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Enter first scan " << scanPtr.i << " " << scan << endl;
}
#endif
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
// set up index keys for this operation // set up index keys for this operation
setKeyAttrs(frag); setKeyAttrs(frag);
// scan direction 0, 1 // scan direction 0, 1
const unsigned idir = scan.m_descending; const unsigned idir = scan.m_descending;
// unpack start key into c_dataBuffer unpackBound(*scan.m_bound[idir], c_dataBuffer);
const ScanBound& bound = *scan.m_bound[idir];
ScanBoundIterator iter;
bound.first(iter);
for (unsigned j = 0; j < bound.getSize(); j++) {
jam();
c_dataBuffer[j] = *iter.data;
bound.next(iter);
}
TreePos treePos; TreePos treePos;
searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos); searchToScan(frag, c_dataBuffer, scan.m_boundCnt[idir], scan.m_descending, treePos);
if (treePos.m_loc == NullTupLoc) { if (treePos.m_loc != NullTupLoc) {
// empty result set scan.m_scanPos = treePos;
// link the scan to node found
NodeHandle node(frag);
selectNode(node, treePos.m_loc);
linkScan(node, scanPtr);
if (treePos.m_dir == 3) {
jam();
// check upper bound
TreeEnt ent = node.getEnt(treePos.m_pos);
if (scanCheck(scanPtr, ent))
scan.m_state = ScanOp::Current;
else
scan.m_state = ScanOp::Last;
} else {
scan.m_state = ScanOp::Next;
}
} else {
jam(); jam();
scan.m_state = ScanOp::Last; scan.m_state = ScanOp::Last;
return;
} }
// set position and state #ifdef VM_TRACE
scan.m_scanPos = treePos; if (debugFlags & DebugScan) {
scan.m_state = ScanOp::Next; debugOut << "Leave first scan " << scanPtr.i << " " << scan << endl;
// link the scan to node found }
NodeHandle node(frag); #endif
selectNode(node, treePos.m_loc); }
linkScan(node, scanPtr);
/*
* Look for entry to return as scan result.
*/
void
Dbtux::scanFind(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Enter find scan " << scanPtr.i << " " << scan << endl;
}
#endif
ndbrequire(scan.m_state == ScanOp::Current || scan.m_state == ScanOp::Next);
while (1) {
jam();
if (scan.m_state == ScanOp::Next)
scanNext(scanPtr, false);
if (scan.m_state == ScanOp::Current) {
jam();
const TreePos pos = scan.m_scanPos;
NodeHandle node(frag);
selectNode(node, pos.m_loc);
const TreeEnt ent = node.getEnt(pos.m_pos);
if (scanVisible(scanPtr, ent)) {
jam();
scan.m_state = ScanOp::Found;
scan.m_scanEnt = ent;
break;
}
} else {
jam();
break;
}
scan.m_state = ScanOp::Next;
}
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Leave find scan " << scanPtr.i << " " << scan << endl;
}
#endif
} }
/* /*
...@@ -755,6 +802,11 @@ Dbtux::scanFirst(ScanOpPtr scanPtr) ...@@ -755,6 +802,11 @@ Dbtux::scanFirst(ScanOpPtr scanPtr)
* *
* If an entry was found, scan direction is 3. Therefore tree * If an entry was found, scan direction is 3. Therefore tree
* re-organizations need not worry about scan direction. * re-organizations need not worry about scan direction.
*
* This method is also used to move a scan when its entry is removed
* (see moveScanList). If the scan is Blocked, we check if it remains
* Blocked on a different version of the tuple. Otherwise the tuple is
* lost and state becomes Current.
*/ */
void void
Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
...@@ -762,8 +814,8 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) ...@@ -762,8 +814,8 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & (DebugMaint | DebugScan)) {
debugOut << "Next in scan " << scanPtr.i << " " << scan << endl; debugOut << "Enter next scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
// cannot be moved away from tuple we have locked // cannot be moved away from tuple we have locked
...@@ -773,15 +825,7 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) ...@@ -773,15 +825,7 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
// scan direction // scan direction
const unsigned idir = scan.m_descending; // 0, 1 const unsigned idir = scan.m_descending; // 0, 1
const int jdir = 1 - 2 * (int)idir; // 1, -1 const int jdir = 1 - 2 * (int)idir; // 1, -1
// unpack end key into c_dataBuffer unpackBound(*scan.m_bound[1 - idir], c_dataBuffer);
const ScanBound& bound = *scan.m_bound[1 - idir];
ScanBoundIterator iter;
bound.first(iter);
for (unsigned j = 0; j < bound.getSize(); j++) {
jam();
c_dataBuffer[j] = *iter.data;
bound.next(iter);
}
// use copy of position // use copy of position
TreePos pos = scan.m_scanPos; TreePos pos = scan.m_scanPos;
// get and remember original node // get and remember original node
...@@ -795,15 +839,14 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) ...@@ -795,15 +839,14 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
while (true) { while (true) {
jam(); jam();
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & (DebugMaint | DebugScan)) {
debugOut << "Scan next pos " << pos << " " << node << endl; debugOut << "Current scan " << scanPtr.i << " pos " << pos << " node " << node << endl;
} }
#endif #endif
if (pos.m_dir == 2) { if (pos.m_dir == 2) {
// coming up from root ends the scan // coming up from root ends the scan
jam(); jam();
pos.m_loc = NullTupLoc; pos.m_loc = NullTupLoc;
scan.m_state = ScanOp::Last;
break; break;
} }
if (node.m_loc != pos.m_loc) { if (node.m_loc != pos.m_loc) {
...@@ -835,41 +878,22 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) ...@@ -835,41 +878,22 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
if (pos.m_dir == idir) { if (pos.m_dir == idir) {
// coming up from left child scan current node // coming up from left child scan current node
jam(); jam();
pos.m_pos = idir == 0 ? 0 : occup - 1; pos.m_pos = idir == 0 ? (Uint16)-1 : occup;
pos.m_match = false;
pos.m_dir = 3; pos.m_dir = 3;
} }
if (pos.m_dir == 3) { if (pos.m_dir == 3) {
// within node // before or within node
jam(); jam();
// advance position // advance position - becomes ZNIL (> occup) if 0 and descending
if (! pos.m_match) pos.m_pos += jdir;
pos.m_match = true;
else
// becomes ZNIL (which is > occup) if 0 and scan descending
pos.m_pos += jdir;
if (pos.m_pos < occup) { if (pos.m_pos < occup) {
jam(); jam();
ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged pos.m_dir = 3; // unchanged
// read and compare all attributes ent = node.getEnt(pos.m_pos);
readKeyAttrs(frag, ent, 0, c_entryKey); if (! scanCheck(scanPtr, ent)) {
int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, scan.m_boundCnt[1 - idir], c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (jdir * ret < 0) {
jam(); jam();
// hit upper bound of single range scan
pos.m_loc = NullTupLoc; pos.m_loc = NullTupLoc;
scan.m_state = ScanOp::Last;
break;
}
// can we see it
if (! scanVisible(scanPtr, ent)) {
jam();
continue;
} }
// found entry
scan.m_state = ScanOp::Current;
break; break;
} }
// after node proceed to right child // after node proceed to right child
...@@ -895,30 +919,63 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq) ...@@ -895,30 +919,63 @@ Dbtux::scanNext(ScanOpPtr scanPtr, bool fromMaintReq)
// copy back position // copy back position
scan.m_scanPos = pos; scan.m_scanPos = pos;
// relink // relink
if (scan.m_state == ScanOp::Current) { if (pos.m_loc != NullTupLoc) {
ndbrequire(pos.m_match == true && pos.m_dir == 3); ndbrequire(pos.m_dir == 3);
ndbrequire(pos.m_loc == node.m_loc); ndbrequire(pos.m_loc == node.m_loc);
if (origNode.m_loc != node.m_loc) { if (origNode.m_loc != node.m_loc) {
jam(); jam();
unlinkScan(origNode, scanPtr); unlinkScan(origNode, scanPtr);
linkScan(node, scanPtr); linkScan(node, scanPtr);
} }
// copy found entry if (scan.m_state != ScanOp::Blocked) {
scan.m_scanEnt = ent; scan.m_state = ScanOp::Current;
} else if (scan.m_state == ScanOp::Last) { } else {
jam();
ndbrequire(fromMaintReq);
TreeEnt& scanEnt = scan.m_scanEnt;
ndbrequire(scanEnt.m_tupLoc != NullTupLoc);
if (scanEnt.eqtuple(ent)) {
// remains blocked on another version
scanEnt = ent;
} else {
jam();
scanEnt.m_tupLoc = NullTupLoc;
scan.m_state = ScanOp::Current;
}
}
} else {
jam(); jam();
ndbrequire(pos.m_loc == NullTupLoc);
unlinkScan(origNode, scanPtr); unlinkScan(origNode, scanPtr);
} else { scan.m_state = ScanOp::Last;
ndbrequire(false);
} }
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & (DebugMaint | DebugScan)) {
debugOut << "Next out scan " << scanPtr.i << " " << scan << endl; debugOut << "Leave next scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
} }
/*
* Check end key. Return true if scan is still within range.
*/
bool
Dbtux::scanCheck(ScanOpPtr scanPtr, TreeEnt ent)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
const unsigned idir = scan.m_descending;
const int jdir = 1 - 2 * (int)idir;
unpackBound(*scan.m_bound[1 - idir], c_dataBuffer);
unsigned boundCnt = scan.m_boundCnt[1 - idir];
readKeyAttrs(frag, ent, 0, c_entryKey);
int ret = cmpScanBound(frag, 1 - idir, c_dataBuffer, boundCnt, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (jdir * ret > 0)
return true;
// hit upper bound of single range scan
return false;
}
/* /*
* Check if an entry is visible to the scan. * Check if an entry is visible to the scan.
* *
......
...@@ -21,22 +21,18 @@ ...@@ -21,22 +21,18 @@
* Search for entry to add. * Search for entry to add.
* *
* Similar to searchToRemove (see below). * Similar to searchToRemove (see below).
*
* TODO optimize for initial equal attrs in node min/max
*/ */
void bool
Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag); NodeHandle currNode(frag);
currNode.m_loc = tree.m_root; currNode.m_loc = tree.m_root;
// assume success
treePos.m_match = false;
if (currNode.m_loc == NullTupLoc) { if (currNode.m_loc == NullTupLoc) {
// empty tree // empty tree
jam(); jam();
return; return true;
} }
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
/* /*
...@@ -94,9 +90,8 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& ...@@ -94,9 +90,8 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
jam(); jam();
treePos.m_loc = currNode.m_loc; treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0; treePos.m_pos = 0;
// failed // entry found - error
treePos.m_match = true; return false;
return;
} }
break; break;
} }
...@@ -104,7 +99,7 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& ...@@ -104,7 +99,7 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
treePos.m_loc = currNode.m_loc; treePos.m_loc = currNode.m_loc;
// binary search // binary search
int lo = -1; int lo = -1;
unsigned hi = currNode.getOccup(); int hi = currNode.getOccup();
int ret; int ret;
while (1) { while (1) {
jam(); jam();
...@@ -126,9 +121,8 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& ...@@ -126,9 +121,8 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
lo = j; lo = j;
else { else {
treePos.m_pos = j; treePos.m_pos = j;
// failed // entry found - error
treePos.m_match = true; return false;
return;
} }
if (hi - lo == 1) if (hi - lo == 1)
break; break;
...@@ -136,22 +130,23 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& ...@@ -136,22 +130,23 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
if (ret < 0) { if (ret < 0) {
jam(); jam();
treePos.m_pos = hi; treePos.m_pos = hi;
return; return true;
} }
if (hi < currNode.getOccup()) { if (hi < currNode.getOccup()) {
jam(); jam();
treePos.m_pos = hi; treePos.m_pos = hi;
return; return true;
} }
if (bottomNode.isNull()) { if (bottomNode.isNull()) {
jam(); jam();
treePos.m_pos = hi; treePos.m_pos = hi;
return; return true;
} }
jam(); jam();
// backwards compatible for now // backwards compatible for now
treePos.m_loc = bottomNode.m_loc; treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0; treePos.m_pos = 0;
return true;
} }
/* /*
...@@ -163,21 +158,17 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& ...@@ -163,21 +158,17 @@ Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos&
* then the saved node is the g.l.b of the final node and we move back * then the saved node is the g.l.b of the final node and we move back
* to it. * to it.
*/ */
void bool
Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag); NodeHandle currNode(frag);
currNode.m_loc = tree.m_root; currNode.m_loc = tree.m_root;
// assume success
treePos.m_match = true;
if (currNode.m_loc == NullTupLoc) { if (currNode.m_loc == NullTupLoc) {
// empty tree // empty tree - failed
jam(); jam();
// failed return false;
treePos.m_match = false;
return;
} }
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) { while (true) {
...@@ -229,7 +220,7 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo ...@@ -229,7 +220,7 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo
jam(); jam();
treePos.m_loc = currNode.m_loc; treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0; treePos.m_pos = 0;
return; return true;
} }
break; break;
} }
...@@ -242,12 +233,12 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo ...@@ -242,12 +233,12 @@ Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePo
if (searchEnt.eq(currNode.getEnt(j))) { if (searchEnt.eq(currNode.getEnt(j))) {
jam(); jam();
treePos.m_pos = j; treePos.m_pos = j;
return; return true;
} }
} }
treePos.m_pos = currNode.getOccup(); treePos.m_pos = currNode.getOccup();
// failed // not found - failed
treePos.m_match = false; return false;
} }
/* /*
...@@ -278,8 +269,6 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun ...@@ -278,8 +269,6 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun
currNode.m_loc = tree.m_root; currNode.m_loc = tree.m_root;
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
// always before entry
treePos.m_match = false;
while (true) { while (true) {
jam(); jam();
selectNode(currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
...@@ -315,7 +304,7 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun ...@@ -315,7 +304,7 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun
treePos.m_dir = 3; treePos.m_dir = 3;
return; return;
} }
} else if (ret > 0) { } else {
// bound is at or right of this node // bound is at or right of this node
jam(); jam();
const TupLoc loc = currNode.getLink(1); const TupLoc loc = currNode.getLink(1);
...@@ -327,8 +316,6 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun ...@@ -327,8 +316,6 @@ Dbtux::searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCoun
currNode.m_loc = loc; currNode.m_loc = loc;
continue; continue;
} }
} else {
ndbrequire(false);
} }
break; break;
} }
...@@ -369,8 +356,6 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou ...@@ -369,8 +356,6 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou
currNode.m_loc = tree.m_root; currNode.m_loc = tree.m_root;
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
// always before entry
treePos.m_match = false;
while (true) { while (true) {
jam(); jam();
selectNode(currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
...@@ -403,7 +388,7 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou ...@@ -403,7 +388,7 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou
// empty result set // empty result set
return; return;
} }
} else if (ret > 0) { } else {
// bound is at or right of this node // bound is at or right of this node
jam(); jam();
const TupLoc loc = currNode.getLink(1); const TupLoc loc = currNode.getLink(1);
...@@ -415,8 +400,6 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou ...@@ -415,8 +400,6 @@ Dbtux::searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCou
currNode.m_loc = loc; currNode.m_loc = loc;
continue; continue;
} }
} else {
ndbrequire(false);
} }
break; break;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment