Commit 3d218024 authored by pekka@mysql.com's avatar pekka@mysql.com

NDB tux optim 16 - binary search on bounding node when adding entry

parent e0c36040
...@@ -424,8 +424,9 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node) ...@@ -424,8 +424,9 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node)
} }
data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + tree.m_prefSize; data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + tree.m_prefSize;
const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data; const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data;
for (unsigned pos = 0; pos < numpos; pos++) // print entries in logical order
out << " " << entList[pos]; for (unsigned pos = 1; pos <= numpos; pos++)
out << " " << entList[pos % numpos];
out << "]"; out << "]";
} }
out << "]"; out << "]";
......
...@@ -120,7 +120,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -120,7 +120,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
searchToAdd(signal, frag, c_searchKey, ent, treePos); searchToAdd(signal, frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << endl; debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
} }
#endif #endif
if (treePos.m_match) { if (treePos.m_match) {
...@@ -154,7 +154,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -154,7 +154,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
searchToRemove(signal, frag, c_searchKey, ent, treePos); searchToRemove(signal, frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << endl; debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
} }
#endif #endif
if (! treePos.m_match) { if (! treePos.m_match) {
......
...@@ -31,10 +31,11 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear ...@@ -31,10 +31,11 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag); NodeHandle currNode(frag);
currNode.m_loc = tree.m_root; currNode.m_loc = tree.m_root;
// assume success
treePos.m_match = false;
if (currNode.m_loc == NullTupLoc) { if (currNode.m_loc == NullTupLoc) {
// empty tree // empty tree
jam(); jam();
treePos.m_match = false;
return; return;
} }
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
...@@ -93,6 +94,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear ...@@ -93,6 +94,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
jam(); jam();
treePos.m_loc = currNode.m_loc; treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0; treePos.m_pos = 0;
// failed
treePos.m_match = true; treePos.m_match = true;
return; return;
} }
...@@ -100,9 +102,16 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear ...@@ -100,9 +102,16 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
} }
// access rest of current node // access rest of current node
accessNode(signal, currNode, AccFull); accessNode(signal, currNode, AccFull);
for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) { // anticipate
jam(); treePos.m_loc = currNode.m_loc;
// binary search
int lo = -1;
int hi = currNode.getOccup();
int ret; int ret;
while (1) {
jam();
// hi - lo > 1 implies lo < j < hi
int j = (hi + lo) / 2;
// read and compare attributes // read and compare attributes
unsigned start = 0; unsigned start = 0;
readKeyAttrs(frag, currNode.getEnt(j), start, c_entryKey); readKeyAttrs(frag, currNode.getEnt(j), start, c_entryKey);
...@@ -113,25 +122,38 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear ...@@ -113,25 +122,38 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
// keys are equal, compare entry values // keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getEnt(j)); ret = searchEnt.cmp(currNode.getEnt(j));
} }
if (ret <= 0) { if (ret < 0)
jam(); hi = j;
treePos.m_loc = currNode.m_loc; else if (ret > 0)
lo = j;
else {
treePos.m_pos = j; treePos.m_pos = j;
treePos.m_match = (ret == 0); // failed
treePos.m_match = true;
return; return;
} }
if (hi - lo == 1)
break;
}
if (ret < 0) {
jam();
treePos.m_pos = hi;
return;
}
if (hi < currNode.getOccup()) {
jam();
treePos.m_pos = hi;
return;
}
if (bottomNode.isNull()) {
jam();
treePos.m_pos = hi;
return;
} }
if (! bottomNode.isNull()) {
jam(); jam();
// backwards compatible for now // backwards compatible for now
treePos.m_loc = bottomNode.m_loc; treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0; treePos.m_pos = 0;
treePos.m_match = false;
return;
}
treePos.m_loc = currNode.m_loc;
treePos.m_pos = currNode.getOccup();
treePos.m_match = false;
} }
/* /*
...@@ -150,9 +172,12 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s ...@@ -150,9 +172,12 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag); NodeHandle currNode(frag);
currNode.m_loc = tree.m_root; currNode.m_loc = tree.m_root;
// assume success
treePos.m_match = true;
if (currNode.m_loc == NullTupLoc) { if (currNode.m_loc == NullTupLoc) {
// empty tree // empty tree
jam(); jam();
// failed
treePos.m_match = false; treePos.m_match = false;
return; return;
} }
...@@ -206,27 +231,26 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s ...@@ -206,27 +231,26 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
jam(); jam();
treePos.m_loc = currNode.m_loc; treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0; treePos.m_pos = 0;
treePos.m_match = true;
return; return;
} }
break; break;
} }
// access rest of current node // access rest of current node
accessNode(signal, currNode, AccFull); accessNode(signal, currNode, AccFull);
// anticipate
treePos.m_loc = currNode.m_loc;
// pos 0 was handled above // pos 0 was handled above
for (unsigned j = 1, occup = currNode.getOccup(); j < occup; j++) { for (unsigned j = 1, occup = currNode.getOccup(); j < occup; j++) {
jam(); jam();
// compare only the entry // compare only the entry
if (searchEnt.eq(currNode.getEnt(j))) { if (searchEnt.eq(currNode.getEnt(j))) {
jam(); jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j; treePos.m_pos = j;
treePos.m_match = true;
return; return;
} }
} }
treePos.m_loc = currNode.m_loc;
treePos.m_pos = currNode.getOccup(); treePos.m_pos = currNode.getOccup();
// failed
treePos.m_match = false; treePos.m_match = false;
} }
......
...@@ -115,4 +115,9 @@ optim 15 mc02/a 34 ms 60 ms 72 pct ...@@ -115,4 +115,9 @@ optim 15 mc02/a 34 ms 60 ms 72 pct
[ corrected wasted space in index node ] [ corrected wasted space in index node ]
optim 16 mc02/a 34 ms 53 ms 53 pct
mc02/b 42 ms 75 ms 75 pct
[ case a, b: binary search of bounding node when adding entry ]
vim: set et: vim: set et:
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment