Commit 11bfc5ec authored by pekka@mysql.com's avatar pekka@mysql.com

NDB wl-1533 tux optim: after wl-1942 can remove signal from many methods

parent abcd2994
...@@ -32,7 +32,6 @@ ...@@ -32,7 +32,6 @@
// signal classes // signal classes
#include <signaldata/DictTabInfo.hpp> #include <signaldata/DictTabInfo.hpp>
#include <signaldata/TuxContinueB.hpp> #include <signaldata/TuxContinueB.hpp>
#include <signaldata/BuildIndx.hpp>
#include <signaldata/TupFrag.hpp> #include <signaldata/TupFrag.hpp>
#include <signaldata/AlterIndx.hpp> #include <signaldata/AlterIndx.hpp>
#include <signaldata/DropTab.hpp> #include <signaldata/DropTab.hpp>
...@@ -478,7 +477,7 @@ private: ...@@ -478,7 +477,7 @@ private:
Uint16 m_numAttrs; Uint16 m_numAttrs;
bool m_storeNullKey; bool m_storeNullKey;
TreeHead m_tree; TreeHead m_tree;
TupLoc m_freeLoc; // one node pre-allocated for insert TupLoc m_freeLoc; // list of free index nodes
DLList<ScanOp> m_scanList; // current scans on this fragment DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI; Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2]; Uint32 m_tupTableFragPtrI[2];
...@@ -582,24 +581,24 @@ private: ...@@ -582,24 +581,24 @@ private:
* DbtuxNode.cpp * DbtuxNode.cpp
*/ */
int allocNode(Signal* signal, NodeHandle& node); int allocNode(Signal* signal, NodeHandle& node);
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc); void selectNode(NodeHandle& node, TupLoc loc);
void insertNode(Signal* signal, NodeHandle& node); void insertNode(NodeHandle& node);
void deleteNode(Signal* signal, NodeHandle& node); void deleteNode(NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node); void setNodePref(NodeHandle& node);
// node operations // node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList); void nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList);
void nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos); void nodePushUpScans(NodeHandle& node, unsigned pos);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList); void nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList);
void nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos); void nodePopDownScans(NodeHandle& node, unsigned pos);
void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList); void nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList);
void nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos); void nodePushDownScans(NodeHandle& node, unsigned pos);
void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList); void nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList);
void nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos); void nodePopUpScans(NodeHandle& node, unsigned pos);
void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i); void nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i);
// scans linked to node // scans linked to node
void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList); void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList);
void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList); void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList);
void moveScanList(Signal* signal, NodeHandle& node, unsigned pos); void moveScanList(NodeHandle& node, unsigned pos);
void linkScan(NodeHandle& node, ScanOpPtr scanPtr); void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr); void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr); bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);
...@@ -608,20 +607,20 @@ private: ...@@ -608,20 +607,20 @@ private:
* DbtuxTree.cpp * DbtuxTree.cpp
*/ */
// add entry // add entry
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent); void treeAdd(Frag& frag, TreePos treePos, TreeEnt ent);
void treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent); void treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent);
void treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i); void treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i);
void treeAddRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i); void treeAddRebalance(Frag& frag, NodeHandle node, unsigned i);
// remove entry // remove entry
void treeRemove(Signal* signal, Frag& frag, TreePos treePos); void treeRemove(Frag& frag, TreePos treePos);
void treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos); void treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos);
void treeRemoveSemi(Signal* signal, Frag& frag, NodeHandle node, unsigned i); void treeRemoveSemi(Frag& frag, NodeHandle node, unsigned i);
void treeRemoveLeaf(Signal* signal, Frag& frag, NodeHandle node); void treeRemoveLeaf(Frag& frag, NodeHandle node);
void treeRemoveNode(Signal* signal, Frag& frag, NodeHandle node); void treeRemoveNode(Frag& frag, NodeHandle node);
void treeRemoveRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i); void treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i);
// rotate // rotate
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); void treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); void treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i);
/* /*
* DbtuxScan.cpp * DbtuxScan.cpp
...@@ -633,9 +632,9 @@ private: ...@@ -633,9 +632,9 @@ private:
void execACCKEYCONF(Signal* signal); void execACCKEYCONF(Signal* signal);
void execACCKEYREF(Signal* signal); void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal); void execACC_ABORTCONF(Signal* signal);
void scanFirst(Signal* signal, ScanOpPtr scanPtr); void scanFirst(ScanOpPtr scanPtr);
void scanNext(Signal* signal, ScanOpPtr scanPtr); void scanNext(ScanOpPtr scanPtr);
bool scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent); bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr); void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp); void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
void removeAccLockOp(ScanOp& scan, Uint32 accLockOp); void removeAccLockOp(ScanOp& scan, Uint32 accLockOp);
...@@ -644,9 +643,9 @@ private: ...@@ -644,9 +643,9 @@ private:
/* /*
* DbtuxSearch.cpp * DbtuxSearch.cpp
*/ */
void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/* /*
* DbtuxCmp.cpp * DbtuxCmp.cpp
...@@ -670,7 +669,7 @@ private: ...@@ -670,7 +669,7 @@ private:
PrintPar(); PrintPar();
}; };
void printTree(Signal* signal, Frag& frag, NdbOut& out); void printTree(Signal* signal, Frag& frag, NdbOut& out);
void printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par); void printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
friend class NdbOut& operator<<(NdbOut&, const TupLoc&); friend class NdbOut& operator<<(NdbOut&, const TupLoc&);
friend class NdbOut& operator<<(NdbOut&, const TreeEnt&); friend class NdbOut& operator<<(NdbOut&, const TreeEnt&);
friend class NdbOut& operator<<(NdbOut&, const TreeNode&); friend class NdbOut& operator<<(NdbOut&, const TreeNode&);
......
...@@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) ...@@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
strcpy(par.m_path, "."); strcpy(par.m_path, ".");
par.m_side = 2; par.m_side = 2;
par.m_parent = NullTupLoc; par.m_parent = NullTupLoc;
printNode(signal, frag, out, tree.m_root, par); printNode(frag, out, tree.m_root, par);
out.m_out->flush(); out.m_out->flush();
if (! par.m_ok) { if (! par.m_ok) {
if (debugFile == 0) { if (debugFile == 0) {
...@@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) ...@@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
} }
void void
Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par) Dbtux::printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
{ {
if (loc == NullTupLoc) { if (loc == NullTupLoc) {
par.m_depth = 0; par.m_depth = 0;
...@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
} }
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, loc); selectNode(node, loc);
out << par.m_path << " " << node << endl; out << par.m_path << " " << node << endl;
// check children // check children
PrintPar cpar[2]; PrintPar cpar[2];
...@@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
cpar[i].m_side = i; cpar[i].m_side = i;
cpar[i].m_depth = 0; cpar[i].m_depth = 0;
cpar[i].m_parent = loc; cpar[i].m_parent = loc;
printNode(signal, frag, out, node.getLink(i), cpar[i]); printNode(frag, out, node.getLink(i), cpar[i]);
if (! cpar[i].m_ok) { if (! cpar[i].m_ok) {
par.m_ok = false; par.m_ok = false;
} }
......
...@@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
switch (opCode) { switch (opCode) {
case TuxMaintReq::OpAdd: case TuxMaintReq::OpAdd:
jam(); jam();
searchToAdd(signal, frag, c_searchKey, ent, treePos); searchToAdd(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << (treePos.m_match ? " - error" : "") << endl; debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
...@@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break; break;
} }
/* /*
* At most one new node is inserted in the operation. We keep one * At most one new node is inserted in the operation. Pre-allocate
* free node pre-allocated so the operation cannot fail. * it so that the operation cannot fail.
*/ */
if (frag.m_freeLoc == NullTupLoc) { if (frag.m_freeLoc == NullTupLoc) {
jam(); jam();
...@@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
jam(); jam();
break; break;
} }
// link to freelist
node.setLink(0, frag.m_freeLoc);
frag.m_freeLoc = node.m_loc; frag.m_freeLoc = node.m_loc;
ndbrequire(frag.m_freeLoc != NullTupLoc); ndbrequire(frag.m_freeLoc != NullTupLoc);
} }
treeAdd(signal, frag, treePos, ent); treeAdd(frag, treePos, ent);
break; break;
case TuxMaintReq::OpRemove: case TuxMaintReq::OpRemove:
jam(); jam();
searchToRemove(signal, frag, c_searchKey, ent, treePos); searchToRemove(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl; debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
...@@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
} }
break; break;
} }
treeRemove(signal, frag, treePos); treeRemove(frag, treePos);
break; break;
default: default:
ndbrequire(false); ndbrequire(false);
......
...@@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node) ...@@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node)
* Set handle to point to existing node. * Set handle to point to existing node.
*/ */
void void
Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc) Dbtux::selectNode(NodeHandle& node, TupLoc loc)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
ndbrequire(loc != NullTupLoc); ndbrequire(loc != NullTupLoc);
...@@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc) ...@@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
} }
/* /*
* Set handle to point to new node. Uses the pre-allocated node. * Set handle to point to new node. Uses a pre-allocated node.
*/ */
void void
Dbtux::insertNode(Signal* signal, NodeHandle& node) Dbtux::insertNode(NodeHandle& node)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TupLoc loc = frag.m_freeLoc; // unlink from freelist
frag.m_freeLoc = NullTupLoc; selectNode(node, frag.m_freeLoc);
selectNode(signal, node, loc); frag.m_freeLoc = node.getLink(0);
new (node.m_node) TreeNode(); new (node.m_node) TreeNode();
#ifdef VM_TRACE #ifdef VM_TRACE
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node) ...@@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node)
} }
/* /*
* Delete existing node. * Delete existing node. Simply put it on the freelist.
*/ */
void void
Dbtux::deleteNode(Signal* signal, NodeHandle& node) Dbtux::deleteNode(NodeHandle& node)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
ndbrequire(node.getOccup() == 0); ndbrequire(node.getOccup() == 0);
TupLoc loc = node.m_loc; // link to freelist
Uint32 pageId = loc.getPageId(); node.setLink(0, frag.m_freeLoc);
Uint32 pageOffset = loc.getPageOffset(); frag.m_freeLoc = node.m_loc;
Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node); // invalidate the handle
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
jamEntry();
// invalidate handle and storage
node.m_loc = NullTupLoc; node.m_loc = NullTupLoc;
node.m_node = 0; node.m_node = 0;
} }
...@@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node) ...@@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
* attribute headers for now. XXX use null mask instead * attribute headers for now. XXX use null mask instead
*/ */
void void
Dbtux::setNodePref(Signal* signal, NodeHandle& node) Dbtux::setNodePref(NodeHandle& node)
{ {
const Frag& frag = node.m_frag; const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
...@@ -121,7 +118,7 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node) ...@@ -121,7 +118,7 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node)
* Add list of scans at the new entry. * Add list of scans at the new entry.
*/ */
void void
Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList) Dbtux::nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -129,7 +126,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ...@@ -129,7 +126,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
ndbrequire(occup < tree.m_maxOccup && pos <= occup); ndbrequire(occup < tree.m_maxOccup && pos <= occup);
// fix old scans // fix old scans
if (node.getNodeScan() != RNIL) if (node.getNodeScan() != RNIL)
nodePushUpScans(signal, node, pos); nodePushUpScans(node, pos);
// fix node // fix node
TreeEnt* const entList = tree.getEntList(node.m_node); TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0]; entList[occup] = entList[0];
...@@ -146,11 +143,11 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ...@@ -146,11 +143,11 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
addScanList(node, pos, scanList); addScanList(node, pos, scanList);
// fix prefix // fix prefix
if (occup == 0 || pos == 0) if (occup == 0 || pos == 0)
setNodePref(signal, node); setNodePref(node);
} }
void void
Dbtux::nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos) Dbtux::nodePushUpScans(NodeHandle& node, unsigned pos)
{ {
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
...@@ -187,7 +184,7 @@ Dbtux::nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos) ...@@ -187,7 +184,7 @@ Dbtux::nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos)
* else moved forward. * else moved forward.
*/ */
void void
Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList) Dbtux::nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -196,12 +193,12 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, ...@@ -196,12 +193,12 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent,
if (node.getNodeScan() != RNIL) { if (node.getNodeScan() != RNIL) {
// remove or move scans at this position // remove or move scans at this position
if (scanList == 0) if (scanList == 0)
moveScanList(signal, node, pos); moveScanList(node, pos);
else else
removeScanList(node, pos, *scanList); removeScanList(node, pos, *scanList);
// fix other scans // fix other scans
if (node.getNodeScan() != RNIL) if (node.getNodeScan() != RNIL)
nodePopDownScans(signal, node, pos); nodePopDownScans(node, pos);
} }
// fix node // fix node
TreeEnt* const entList = tree.getEntList(node.m_node); TreeEnt* const entList = tree.getEntList(node.m_node);
...@@ -216,11 +213,11 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, ...@@ -216,11 +213,11 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent,
node.setOccup(occup - 1); node.setOccup(occup - 1);
// fix prefix // fix prefix
if (occup != 1 && pos == 0) if (occup != 1 && pos == 0)
setNodePref(signal, node); setNodePref(node);
} }
void void
Dbtux::nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos) Dbtux::nodePopDownScans(NodeHandle& node, unsigned pos)
{ {
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
...@@ -258,7 +255,7 @@ Dbtux::nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos) ...@@ -258,7 +255,7 @@ Dbtux::nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos)
* Return list of scans at the removed position 0. * Return list of scans at the removed position 0.
*/ */
void void
Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList) Dbtux::nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -269,7 +266,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent ...@@ -269,7 +266,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
removeScanList(node, 0, scanList); removeScanList(node, 0, scanList);
// fix other scans // fix other scans
if (node.getNodeScan() != RNIL) if (node.getNodeScan() != RNIL)
nodePushDownScans(signal, node, pos); nodePushDownScans(node, pos);
} }
// fix node // fix node
TreeEnt* const entList = tree.getEntList(node.m_node); TreeEnt* const entList = tree.getEntList(node.m_node);
...@@ -285,11 +282,11 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent ...@@ -285,11 +282,11 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
entList[0] = entList[occup]; entList[0] = entList[occup];
// fix prefix // fix prefix
if (true) if (true)
setNodePref(signal, node); setNodePref(node);
} }
void void
Dbtux::nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos) Dbtux::nodePushDownScans(NodeHandle& node, unsigned pos)
{ {
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
...@@ -328,7 +325,7 @@ Dbtux::nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos) ...@@ -328,7 +325,7 @@ Dbtux::nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos)
* Move scans at removed entry and add scans at the new entry. * Move scans at removed entry and add scans at the new entry.
*/ */
void void
Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList) Dbtux::nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -336,10 +333,10 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, U ...@@ -336,10 +333,10 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, U
ndbrequire(occup <= tree.m_maxOccup && pos < occup); ndbrequire(occup <= tree.m_maxOccup && pos < occup);
if (node.getNodeScan() != RNIL) { if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears // move scans whose entry disappears
moveScanList(signal, node, pos); moveScanList(node, pos);
// fix other scans // fix other scans
if (node.getNodeScan() != RNIL) if (node.getNodeScan() != RNIL)
nodePopUpScans(signal, node, pos); nodePopUpScans(node, pos);
} }
// fix node // fix node
TreeEnt* const entList = tree.getEntList(node.m_node); TreeEnt* const entList = tree.getEntList(node.m_node);
...@@ -358,11 +355,11 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, U ...@@ -358,11 +355,11 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, U
addScanList(node, 0, scanList); addScanList(node, 0, scanList);
// fix prefix // fix prefix
if (true) if (true)
setNodePref(signal, node); setNodePref(node);
} }
void void
Dbtux::nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos) Dbtux::nodePopUpScans(NodeHandle& node, unsigned pos)
{ {
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
...@@ -392,7 +389,7 @@ Dbtux::nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos) ...@@ -392,7 +389,7 @@ Dbtux::nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos)
* (i=0) or after the max (i=1). Expensive but not often used. * (i=0) or after the max (i=1). Expensive but not often used.
*/ */
void void
Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i) Dbtux::nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i)
{ {
Frag& frag = dstNode.m_frag; Frag& frag = dstNode.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -400,8 +397,8 @@ Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsig ...@@ -400,8 +397,8 @@ Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsig
while (cnt != 0) { while (cnt != 0) {
TreeEnt ent; TreeEnt ent;
Uint32 scanList = RNIL; Uint32 scanList = RNIL;
nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList); nodePopDown(srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList); nodePushUp(dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
cnt--; cnt--;
} }
} }
...@@ -476,7 +473,7 @@ Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList) ...@@ -476,7 +473,7 @@ Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList)
* method scanNext(). * method scanNext().
*/ */
void void
Dbtux::moveScanList(Signal* signal, NodeHandle& node, unsigned pos) Dbtux::moveScanList(NodeHandle& node, unsigned pos)
{ {
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
...@@ -494,7 +491,7 @@ Dbtux::moveScanList(Signal* signal, NodeHandle& node, unsigned pos) ...@@ -494,7 +491,7 @@ Dbtux::moveScanList(Signal* signal, NodeHandle& node, unsigned pos)
debugOut << "At pos=" << pos << " " << node << endl; debugOut << "At pos=" << pos << " " << node << endl;
} }
#endif #endif
scanNext(signal, scanPtr); scanNext(scanPtr);
ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos)); ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
} }
scanPtr.i = nextPtrI; scanPtr.i = nextPtrI;
......
...@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ...@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jam(); jam();
const TupLoc loc = scan.m_scanPos.m_loc; const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, loc); selectNode(node, loc);
unlinkScan(node, scanPtr); unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc; scan.m_scanPos.m_loc = NullTupLoc;
} }
...@@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::First) { if (scan.m_state == ScanOp::First) {
jam(); jam();
// search is done only once in single range scan // search is done only once in single range scan
scanFirst(signal, scanPtr); scanFirst(scanPtr);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & DebugScan) {
debugOut << "First scan " << scanPtr.i << " " << scan << endl; debugOut << "First scan " << scanPtr.i << " " << scan << endl;
...@@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::Next) { if (scan.m_state == ScanOp::Next) {
jam(); jam();
// look for next // look for next
scanNext(signal, scanPtr); scanNext(scanPtr);
} }
// for reading tuple key in Current or Locked state // for reading tuple key in Current or Locked state
Data pkData = c_dataBuffer; Data pkData = c_dataBuffer;
...@@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal) ...@@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
* by scanNext. * by scanNext.
*/ */
void void
Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) Dbtux::scanFirst(ScanOpPtr scanPtr)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
...@@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
} }
// search for scan start position // search for scan start position
TreePos treePos; TreePos treePos;
searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos); searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
if (treePos.m_loc == NullTupLoc) { if (treePos.m_loc == NullTupLoc) {
// empty tree // empty tree
jam(); jam();
...@@ -710,7 +710,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -710,7 +710,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
scan.m_state = ScanOp::Next; scan.m_state = ScanOp::Next;
// link the scan to node found // link the scan to node found
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc); selectNode(node, treePos.m_loc);
linkScan(node, scanPtr); linkScan(node, scanPtr);
} }
...@@ -730,7 +730,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -730,7 +730,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
* re-organizations need not worry about scan direction. * re-organizations need not worry about scan direction.
*/ */
void void
Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) Dbtux::scanNext(ScanOpPtr scanPtr)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
...@@ -739,20 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -739,20 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
debugOut << "Next in scan " << scanPtr.i << " " << scan << endl; debugOut << "Next in scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
if (scan.m_state == ScanOp::Locked) { // cannot be moved away from tuple we have locked
jam(); ndbrequire(scan.m_state != ScanOp::Locked);
// version of a tuple locked by us cannot disappear (assert only)
ndbassert(false);
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL;
scan.m_state = ScanOp::Current;
}
// set up index keys for this operation // set up index keys for this operation
setKeyAttrs(frag); setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer // unpack upper bound into c_dataBuffer
...@@ -768,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -768,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
TreePos pos = scan.m_scanPos; TreePos pos = scan.m_scanPos;
// get and remember original node // get and remember original node
NodeHandle origNode(frag); NodeHandle origNode(frag);
selectNode(signal, origNode, pos.m_loc); selectNode(origNode, pos.m_loc);
ndbrequire(islinkScan(origNode, scanPtr)); ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop // current node in loop
NodeHandle node = origNode; NodeHandle node = origNode;
...@@ -785,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -785,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
} }
if (node.m_loc != pos.m_loc) { if (node.m_loc != pos.m_loc) {
jam(); jam();
selectNode(signal, node, pos.m_loc); selectNode(node, pos.m_loc);
} }
if (pos.m_dir == 4) { if (pos.m_dir == 4) {
// coming down from parent proceed to left child // coming down from parent proceed to left child
...@@ -833,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -833,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break; break;
} }
// can we see it // can we see it
if (! scanVisible(signal, scanPtr, ent)) { if (! scanVisible(scanPtr, ent)) {
jam(); jam();
continue; continue;
} }
...@@ -896,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -896,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
* which are not analyzed or handled yet. * which are not analyzed or handled yet.
*/ */
bool bool
Dbtux::scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent) Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
{ {
const ScanOp& scan = *scanPtr.p; const ScanOp& scan = *scanPtr.p;
const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
* TODO optimize for initial equal attrs in node min/max * TODO optimize for initial equal attrs in node min/max
*/ */
void void
Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
...@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear ...@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
while (true) { while (true) {
jam(); jam();
selectNode(signal, currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
int ret; int ret;
// compare prefix // compare prefix
unsigned start = 0; unsigned start = 0;
...@@ -164,7 +164,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear ...@@ -164,7 +164,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
* to it. * to it.
*/ */
void void
Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
...@@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s ...@@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) { while (true) {
jam(); jam();
selectNode(signal, currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
int ret; int ret;
// compare prefix // compare prefix
unsigned start = 0; unsigned start = 0;
...@@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s ...@@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
* Similar to searchToAdd. * Similar to searchToAdd.
*/ */
void void
Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos) Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag); NodeHandle currNode(frag);
...@@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo ...@@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
while (true) { while (true) {
jam(); jam();
selectNode(signal, currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
int ret; int ret;
// compare prefix // compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize); ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
......
...@@ -22,27 +22,27 @@ ...@@ -22,27 +22,27 @@
* is the common case given slack in nodes. * is the common case given slack in nodes.
*/ */
void void
Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) Dbtux::treeAdd(Frag& frag, TreePos treePos, TreeEnt ent)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
NodeHandle node(frag); NodeHandle node(frag);
if (treePos.m_loc != NullTupLoc) { if (treePos.m_loc != NullTupLoc) {
// non-empty tree // non-empty tree
jam(); jam();
selectNode(signal, node, treePos.m_loc); selectNode(node, treePos.m_loc);
unsigned pos = treePos.m_pos; unsigned pos = treePos.m_pos;
if (node.getOccup() < tree.m_maxOccup) { if (node.getOccup() < tree.m_maxOccup) {
// node has room // node has room
jam(); jam();
nodePushUp(signal, node, pos, ent, RNIL); nodePushUp(node, pos, ent, RNIL);
return; return;
} }
treeAddFull(signal, frag, node, pos, ent); treeAddFull(frag, node, pos, ent);
return; return;
} }
jam(); jam();
insertNode(signal, node); insertNode(node);
nodePushUp(signal, node, 0, ent, RNIL); nodePushUp(node, 0, ent, RNIL);
node.setSide(2); node.setSide(2);
tree.m_root = node.m_loc; tree.m_root = node.m_loc;
} }
...@@ -53,7 +53,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -53,7 +53,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
* entry of this node. The min entry could be the entry to add. * entry of this node. The min entry could be the entry to add.
*/ */
void void
Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent) Dbtux::treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
TupLoc loc = lubNode.getLink(0); TupLoc loc = lubNode.getLink(0);
...@@ -62,7 +62,7 @@ Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, ...@@ -62,7 +62,7 @@ Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
NodeHandle glbNode(frag); NodeHandle glbNode(frag);
do { do {
jam(); jam();
selectNode(signal, glbNode, loc); selectNode(glbNode, loc);
loc = glbNode.getLink(1); loc = glbNode.getLink(1);
} while (loc != NullTupLoc); } while (loc != NullTupLoc);
if (glbNode.getOccup() < tree.m_maxOccup) { if (glbNode.getOccup() < tree.m_maxOccup) {
...@@ -72,16 +72,16 @@ Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, ...@@ -72,16 +72,16 @@ Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
if (pos != 0) { if (pos != 0) {
jam(); jam();
// add the new entry and return min entry // add the new entry and return min entry
nodePushDown(signal, lubNode, pos - 1, ent, scanList); nodePushDown(lubNode, pos - 1, ent, scanList);
} }
// g.l.b node receives min entry from l.u.b node // g.l.b node receives min entry from l.u.b node
nodePushUp(signal, glbNode, glbNode.getOccup(), ent, scanList); nodePushUp(glbNode, glbNode.getOccup(), ent, scanList);
return; return;
} }
treeAddNode(signal, frag, lubNode, pos, ent, glbNode, 1); treeAddNode(frag, lubNode, pos, ent, glbNode, 1);
return; return;
} }
treeAddNode(signal, frag, lubNode, pos, ent, lubNode, 0); treeAddNode(frag, lubNode, pos, ent, lubNode, 0);
} }
/* /*
...@@ -90,10 +90,10 @@ Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, ...@@ -90,10 +90,10 @@ Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
* becomes the new g.l.b node. * becomes the new g.l.b node.
*/ */
void void
Dbtux::treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i) Dbtux::treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i)
{ {
NodeHandle glbNode(frag); NodeHandle glbNode(frag);
insertNode(signal, glbNode); insertNode(glbNode);
// connect parent and child // connect parent and child
parentNode.setLink(i, glbNode.m_loc); parentNode.setLink(i, glbNode.m_loc);
glbNode.setLink(2, parentNode.m_loc); glbNode.setLink(2, parentNode.m_loc);
...@@ -102,12 +102,12 @@ Dbtux::treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, ...@@ -102,12 +102,12 @@ Dbtux::treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
if (pos != 0) { if (pos != 0) {
jam(); jam();
// add the new entry and return min entry // add the new entry and return min entry
nodePushDown(signal, lubNode, pos - 1, ent, scanList); nodePushDown(lubNode, pos - 1, ent, scanList);
} }
// g.l.b node receives min entry from l.u.b node // g.l.b node receives min entry from l.u.b node
nodePushUp(signal, glbNode, 0, ent, scanList); nodePushUp(glbNode, 0, ent, scanList);
// re-balance the tree // re-balance the tree
treeAddRebalance(signal, frag, parentNode, i); treeAddRebalance(frag, parentNode, i);
} }
/* /*
...@@ -115,7 +115,7 @@ Dbtux::treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, ...@@ -115,7 +115,7 @@ Dbtux::treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
* parent of the added node. * parent of the added node.
*/ */
void void
Dbtux::treeAddRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i) Dbtux::treeAddRebalance(Frag& frag, NodeHandle node, unsigned i)
{ {
while (true) { while (true) {
// height of subtree i has increased by 1 // height of subtree i has increased by 1
...@@ -136,14 +136,14 @@ Dbtux::treeAddRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i) ...@@ -136,14 +136,14 @@ Dbtux::treeAddRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i)
// height of longer subtree increased // height of longer subtree increased
jam(); jam();
NodeHandle childNode(frag); NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(i)); selectNode(childNode, node.getLink(i));
int b2 = childNode.getBalance(); int b2 = childNode.getBalance();
if (b2 == b) { if (b2 == b) {
jam(); jam();
treeRotateSingle(signal, frag, node, i); treeRotateSingle(frag, node, i);
} else if (b2 == -b) { } else if (b2 == -b) {
jam(); jam();
treeRotateDouble(signal, frag, node, i); treeRotateDouble(frag, node, i);
} else { } else {
// height of subtree increased so it cannot be perfectly balanced // height of subtree increased so it cannot be perfectly balanced
ndbrequire(false); ndbrequire(false);
...@@ -160,7 +160,7 @@ Dbtux::treeAddRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i) ...@@ -160,7 +160,7 @@ Dbtux::treeAddRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i)
break; break;
} }
i = node.getSide(); i = node.getSide();
selectNode(signal, node, parentLoc); selectNode(node, parentLoc);
} }
} }
...@@ -171,38 +171,38 @@ Dbtux::treeAddRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i) ...@@ -171,38 +171,38 @@ Dbtux::treeAddRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i)
* not need to consider merge of semi-leaf and leaf. * not need to consider merge of semi-leaf and leaf.
*/ */
void void
Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) Dbtux::treeRemove(Frag& frag, TreePos treePos)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos; unsigned pos = treePos.m_pos;
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc); selectNode(node, treePos.m_loc);
TreeEnt ent; TreeEnt ent;
if (node.getOccup() > tree.m_minOccup) { if (node.getOccup() > tree.m_minOccup) {
// no underflow in any node type // no underflow in any node type
jam(); jam();
nodePopDown(signal, node, pos, ent, 0); nodePopDown(node, pos, ent, 0);
return; return;
} }
if (node.getChilds() == 2) { if (node.getChilds() == 2) {
// underflow in interior node // underflow in interior node
jam(); jam();
treeRemoveInner(signal, frag, node, pos); treeRemoveInner(frag, node, pos);
return; return;
} }
// remove entry in semi/leaf // remove entry in semi/leaf
nodePopDown(signal, node, pos, ent, 0); nodePopDown(node, pos, ent, 0);
if (node.getLink(0) != NullTupLoc) { if (node.getLink(0) != NullTupLoc) {
jam(); jam();
treeRemoveSemi(signal, frag, node, 0); treeRemoveSemi(frag, node, 0);
return; return;
} }
if (node.getLink(1) != NullTupLoc) { if (node.getLink(1) != NullTupLoc) {
jam(); jam();
treeRemoveSemi(signal, frag, node, 1); treeRemoveSemi(frag, node, 1);
return; return;
} }
treeRemoveLeaf(signal, frag, node); treeRemoveLeaf(frag, node);
} }
/* /*
...@@ -211,7 +211,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -211,7 +211,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
* node becomes the min entry of this node. * node becomes the min entry of this node.
*/ */
void void
Dbtux::treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos) Dbtux::treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
TreeEnt ent; TreeEnt ent;
...@@ -220,19 +220,19 @@ Dbtux::treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned ...@@ -220,19 +220,19 @@ Dbtux::treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned
TupLoc loc = lubNode.getLink(0); TupLoc loc = lubNode.getLink(0);
do { do {
jam(); jam();
selectNode(signal, glbNode, loc); selectNode(glbNode, loc);
loc = glbNode.getLink(1); loc = glbNode.getLink(1);
} while (loc != NullTupLoc); } while (loc != NullTupLoc);
// borrow max entry from semi/leaf // borrow max entry from semi/leaf
Uint32 scanList = RNIL; Uint32 scanList = RNIL;
nodePopDown(signal, glbNode, glbNode.getOccup() - 1, ent, &scanList); nodePopDown(glbNode, glbNode.getOccup() - 1, ent, &scanList);
nodePopUp(signal, lubNode, pos, ent, scanList); nodePopUp(lubNode, pos, ent, scanList);
if (glbNode.getLink(0) != NullTupLoc) { if (glbNode.getLink(0) != NullTupLoc) {
jam(); jam();
treeRemoveSemi(signal, frag, glbNode, 0); treeRemoveSemi(frag, glbNode, 0);
return; return;
} }
treeRemoveLeaf(signal, frag, glbNode); treeRemoveLeaf(frag, glbNode);
} }
/* /*
...@@ -241,21 +241,21 @@ Dbtux::treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned ...@@ -241,21 +241,21 @@ Dbtux::treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned
* The leaf may become empty. * The leaf may become empty.
*/ */
void void
Dbtux::treeRemoveSemi(Signal* signal, Frag& frag, NodeHandle semiNode, unsigned i) Dbtux::treeRemoveSemi(Frag& frag, NodeHandle semiNode, unsigned i)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
ndbrequire(semiNode.getChilds() < 2); ndbrequire(semiNode.getChilds() < 2);
TupLoc leafLoc = semiNode.getLink(i); TupLoc leafLoc = semiNode.getLink(i);
NodeHandle leafNode(frag); NodeHandle leafNode(frag);
selectNode(signal, leafNode, leafLoc); selectNode(leafNode, leafLoc);
if (semiNode.getOccup() < tree.m_minOccup) { if (semiNode.getOccup() < tree.m_minOccup) {
jam(); jam();
unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - semiNode.getOccup()); unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - semiNode.getOccup());
nodeSlide(signal, semiNode, leafNode, cnt, i); nodeSlide(semiNode, leafNode, cnt, i);
if (leafNode.getOccup() == 0) { if (leafNode.getOccup() == 0) {
// remove empty leaf // remove empty leaf
jam(); jam();
treeRemoveNode(signal, frag, leafNode); treeRemoveNode(frag, leafNode);
} }
} }
} }
...@@ -266,14 +266,14 @@ Dbtux::treeRemoveSemi(Signal* signal, Frag& frag, NodeHandle semiNode, unsigned ...@@ -266,14 +266,14 @@ Dbtux::treeRemoveSemi(Signal* signal, Frag& frag, NodeHandle semiNode, unsigned
* do nothing. * do nothing.
*/ */
void void
Dbtux::treeRemoveLeaf(Signal* signal, Frag& frag, NodeHandle leafNode) Dbtux::treeRemoveLeaf(Frag& frag, NodeHandle leafNode)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
TupLoc parentLoc = leafNode.getLink(2); TupLoc parentLoc = leafNode.getLink(2);
if (parentLoc != NullTupLoc) { if (parentLoc != NullTupLoc) {
jam(); jam();
NodeHandle parentNode(frag); NodeHandle parentNode(frag);
selectNode(signal, parentNode, parentLoc); selectNode(parentNode, parentLoc);
unsigned i = leafNode.getSide(); unsigned i = leafNode.getSide();
if (parentNode.getLink(1 - i) == NullTupLoc) { if (parentNode.getLink(1 - i) == NullTupLoc) {
// parent is semi-leaf // parent is semi-leaf
...@@ -281,14 +281,14 @@ Dbtux::treeRemoveLeaf(Signal* signal, Frag& frag, NodeHandle leafNode) ...@@ -281,14 +281,14 @@ Dbtux::treeRemoveLeaf(Signal* signal, Frag& frag, NodeHandle leafNode)
if (parentNode.getOccup() < tree.m_minOccup) { if (parentNode.getOccup() < tree.m_minOccup) {
jam(); jam();
unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - parentNode.getOccup()); unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - parentNode.getOccup());
nodeSlide(signal, parentNode, leafNode, cnt, i); nodeSlide(parentNode, leafNode, cnt, i);
} }
} }
} }
if (leafNode.getOccup() == 0) { if (leafNode.getOccup() == 0) {
jam(); jam();
// remove empty leaf // remove empty leaf
treeRemoveNode(signal, frag, leafNode); treeRemoveNode(frag, leafNode);
} }
} }
...@@ -296,20 +296,20 @@ Dbtux::treeRemoveLeaf(Signal* signal, Frag& frag, NodeHandle leafNode) ...@@ -296,20 +296,20 @@ Dbtux::treeRemoveLeaf(Signal* signal, Frag& frag, NodeHandle leafNode)
* Remove empty leaf. * Remove empty leaf.
*/ */
void void
Dbtux::treeRemoveNode(Signal* signal, Frag& frag, NodeHandle leafNode) Dbtux::treeRemoveNode(Frag& frag, NodeHandle leafNode)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
ndbrequire(leafNode.getChilds() == 0); ndbrequire(leafNode.getChilds() == 0);
TupLoc parentLoc = leafNode.getLink(2); TupLoc parentLoc = leafNode.getLink(2);
unsigned i = leafNode.getSide(); unsigned i = leafNode.getSide();
deleteNode(signal, leafNode); deleteNode(leafNode);
if (parentLoc != NullTupLoc) { if (parentLoc != NullTupLoc) {
jam(); jam();
NodeHandle parentNode(frag); NodeHandle parentNode(frag);
selectNode(signal, parentNode, parentLoc); selectNode(parentNode, parentLoc);
parentNode.setLink(i, NullTupLoc); parentNode.setLink(i, NullTupLoc);
// re-balance the tree // re-balance the tree
treeRemoveRebalance(signal, frag, parentNode, i); treeRemoveRebalance(frag, parentNode, i);
return; return;
} }
// tree is now empty // tree is now empty
...@@ -321,7 +321,7 @@ Dbtux::treeRemoveNode(Signal* signal, Frag& frag, NodeHandle leafNode) ...@@ -321,7 +321,7 @@ Dbtux::treeRemoveNode(Signal* signal, Frag& frag, NodeHandle leafNode)
* parent of the removed node. * parent of the removed node.
*/ */
void void
Dbtux::treeRemoveRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i) Dbtux::treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i)
{ {
while (true) { while (true) {
// height of subtree i has decreased by 1 // height of subtree i has decreased by 1
...@@ -343,19 +343,19 @@ Dbtux::treeRemoveRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned ...@@ -343,19 +343,19 @@ Dbtux::treeRemoveRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned
jam(); jam();
// child on the other side // child on the other side
NodeHandle childNode(frag); NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(1 - i)); selectNode(childNode, node.getLink(1 - i));
int b2 = childNode.getBalance(); int b2 = childNode.getBalance();
if (b2 == b) { if (b2 == b) {
jam(); jam();
treeRotateSingle(signal, frag, node, 1 - i); treeRotateSingle(frag, node, 1 - i);
// height of tree decreased and propagates up // height of tree decreased and propagates up
} else if (b2 == -b) { } else if (b2 == -b) {
jam(); jam();
treeRotateDouble(signal, frag, node, 1 - i); treeRotateDouble(frag, node, 1 - i);
// height of tree decreased and propagates up // height of tree decreased and propagates up
} else { } else {
jam(); jam();
treeRotateSingle(signal, frag, node, 1 - i); treeRotateSingle(frag, node, 1 - i);
// height of tree did not change - done // height of tree did not change - done
return; return;
} }
...@@ -369,7 +369,7 @@ Dbtux::treeRemoveRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned ...@@ -369,7 +369,7 @@ Dbtux::treeRemoveRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned
return; return;
} }
i = node.getSide(); i = node.getSide();
selectNode(signal, node, parentLoc); selectNode(node, parentLoc);
} }
} }
...@@ -390,10 +390,7 @@ Dbtux::treeRemoveRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned ...@@ -390,10 +390,7 @@ Dbtux::treeRemoveRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned
* all optional. If 4 are there it changes side. * all optional. If 4 are there it changes side.
*/ */
void void
Dbtux::treeRotateSingle(Signal* signal, Dbtux::treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i)
Frag& frag,
NodeHandle& node,
unsigned i)
{ {
ndbrequire(i <= 1); ndbrequire(i <= 1);
/* /*
...@@ -413,7 +410,7 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -413,7 +410,7 @@ Dbtux::treeRotateSingle(Signal* signal,
*/ */
TupLoc loc3 = node5.getLink(i); TupLoc loc3 = node5.getLink(i);
NodeHandle node3(frag); NodeHandle node3(frag);
selectNode(signal, node3, loc3); selectNode(node3, loc3);
const int bal3 = node3.getBalance(); const int bal3 = node3.getBalance();
/* /*
2 must always be there but is not changed. Thus we mereley check that it 2 must always be there but is not changed. Thus we mereley check that it
...@@ -430,7 +427,7 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -430,7 +427,7 @@ Dbtux::treeRotateSingle(Signal* signal,
NodeHandle node4(frag); NodeHandle node4(frag);
if (loc4 != NullTupLoc) { if (loc4 != NullTupLoc) {
jam(); jam();
selectNode(signal, node4, loc4); selectNode(node4, loc4);
ndbrequire(node4.getSide() == (1 - i) && ndbrequire(node4.getSide() == (1 - i) &&
node4.getLink(2) == loc3); node4.getLink(2) == loc3);
node4.setSide(i); node4.setSide(i);
...@@ -465,7 +462,7 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -465,7 +462,7 @@ Dbtux::treeRotateSingle(Signal* signal,
if (loc0 != NullTupLoc) { if (loc0 != NullTupLoc) {
jam(); jam();
NodeHandle node0(frag); NodeHandle node0(frag);
selectNode(signal, node0, loc0); selectNode(node0, loc0);
node0.setLink(side5, loc3); node0.setLink(side5, loc3);
} else { } else {
jam(); jam();
...@@ -602,7 +599,7 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -602,7 +599,7 @@ Dbtux::treeRotateSingle(Signal* signal,
* *
*/ */
void void
Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i) Dbtux::treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -616,13 +613,13 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i ...@@ -616,13 +613,13 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
// level 1 // level 1
TupLoc loc2 = node6.getLink(i); TupLoc loc2 = node6.getLink(i);
NodeHandle node2(frag); NodeHandle node2(frag);
selectNode(signal, node2, loc2); selectNode(node2, loc2);
const int bal2 = node2.getBalance(); const int bal2 = node2.getBalance();
// level 2 // level 2
TupLoc loc4 = node2.getLink(1 - i); TupLoc loc4 = node2.getLink(1 - i);
NodeHandle node4(frag); NodeHandle node4(frag);
selectNode(signal, node4, loc4); selectNode(node4, loc4);
const int bal4 = node4.getBalance(); const int bal4 = node4.getBalance();
ndbrequire(i <= 1); ndbrequire(i <= 1);
...@@ -643,7 +640,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i ...@@ -643,7 +640,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
jam(); jam();
unsigned cnt = tree.m_minOccup - node4.getOccup(); unsigned cnt = tree.m_minOccup - node4.getOccup();
ndbrequire(cnt < node2.getOccup()); ndbrequire(cnt < node2.getOccup());
nodeSlide(signal, node4, node2, cnt, i); nodeSlide(node4, node2, cnt, i);
ndbrequire(node4.getOccup() >= tree.m_minOccup); ndbrequire(node4.getOccup() >= tree.m_minOccup);
ndbrequire(node2.getOccup() != 0); ndbrequire(node2.getOccup() != 0);
} }
...@@ -651,14 +648,14 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i ...@@ -651,14 +648,14 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
if (loc3 != NullTupLoc) { if (loc3 != NullTupLoc) {
jam(); jam();
NodeHandle node3(frag); NodeHandle node3(frag);
selectNode(signal, node3, loc3); selectNode(node3, loc3);
node3.setLink(2, loc2); node3.setLink(2, loc2);
node3.setSide(1 - i); node3.setSide(1 - i);
} }
if (loc5 != NullTupLoc) { if (loc5 != NullTupLoc) {
jam(); jam();
NodeHandle node5(frag); NodeHandle node5(frag);
selectNode(signal, node5, loc5); selectNode(node5, loc5);
node5.setLink(2, node6.m_loc); node5.setLink(2, node6.m_loc);
node5.setSide(i); node5.setSide(i);
} }
...@@ -681,7 +678,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i ...@@ -681,7 +678,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
if (loc0 != NullTupLoc) { if (loc0 != NullTupLoc) {
jam(); jam();
selectNode(signal, node0, loc0); selectNode(node0, loc0);
node0.setLink(side6, loc4); node0.setLink(side6, loc4);
} else { } else {
jam(); jam();
......
...@@ -42,7 +42,7 @@ struct Opt { ...@@ -42,7 +42,7 @@ struct Opt {
CHARSET_INFO* m_cs; CHARSET_INFO* m_cs;
bool m_dups; bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype; NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_idxloop; unsigned m_subsubloop;
const char* m_index; const char* m_index;
unsigned m_loop; unsigned m_loop;
bool m_nologging; bool m_nologging;
...@@ -66,7 +66,7 @@ struct Opt { ...@@ -66,7 +66,7 @@ struct Opt {
m_cs(0), m_cs(0),
m_dups(false), m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined), m_fragtype(NdbDictionary::Object::FragUndefined),
m_idxloop(4), m_subsubloop(4),
m_index(0), m_index(0),
m_loop(1), m_loop(1),
m_nologging(false), m_nologging(false),
...@@ -79,7 +79,7 @@ struct Opt { ...@@ -79,7 +79,7 @@ struct Opt {
m_seed(0), m_seed(0),
m_subloop(4), m_subloop(4),
m_table(0), m_table(0),
m_threads(4), m_threads(6), // table + 5 indexes
m_v(1) { m_v(1) {
} }
}; };
...@@ -208,6 +208,8 @@ struct Par : public Opt { ...@@ -208,6 +208,8 @@ struct Par : public Opt {
Set& set() const { assert(m_set != 0); return *m_set; } Set& set() const { assert(m_set != 0); return *m_set; }
Tmr* m_tmr; Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; } Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
unsigned m_lno;
unsigned m_slno;
unsigned m_totrows; unsigned m_totrows;
// value calculation // value calculation
unsigned m_range; unsigned m_range;
...@@ -226,6 +228,8 @@ struct Par : public Opt { ...@@ -226,6 +228,8 @@ struct Par : public Opt {
m_tab(0), m_tab(0),
m_set(0), m_set(0),
m_tmr(0), m_tmr(0),
m_lno(0),
m_slno(0),
m_totrows(m_threads * m_rows), m_totrows(m_threads * m_rows),
m_range(m_rows), m_range(m_rows),
m_pctrange(0), m_pctrange(0),
...@@ -2069,7 +2073,8 @@ pkinsert(Par par) ...@@ -2069,7 +2073,8 @@ pkinsert(Par par)
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
Lst lst; Lst lst;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j); unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock(); set.lock();
if (set.exist(i) || set.pending(i)) { if (set.exist(i) || set.pending(i)) {
set.unlock(); set.unlock();
...@@ -2174,7 +2179,8 @@ pkdelete(Par par) ...@@ -2174,7 +2179,8 @@ pkdelete(Par par)
Lst lst; Lst lst;
bool deadlock = false; bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j); unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock(); set.lock();
if (! set.exist(i) || set.pending(i)) { if (! set.exist(i) || set.pending(i)) {
set.unlock(); set.unlock();
...@@ -2398,7 +2404,7 @@ static int ...@@ -2398,7 +2404,7 @@ static int
scanreadindex(Par par, const ITab& itab) scanreadindex(Par par, const ITab& itab)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) { for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows); BSet bset(tab, itab, par.m_rows);
bset.calc(par); bset.calc(par);
CHK(scanreadindex(par, itab, bset) == 0); CHK(scanreadindex(par, itab, bset) == 0);
...@@ -2645,7 +2651,7 @@ static int ...@@ -2645,7 +2651,7 @@ static int
scanupdateindex(Par par, const ITab& itab) scanupdateindex(Par par, const ITab& itab)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) { for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows); BSet bset(tab, itab, par.m_rows);
bset.calc(par); bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0); CHK(scanupdateindex(par, itab, bset) == 0);
...@@ -2685,6 +2691,53 @@ readverify(Par par) ...@@ -2685,6 +2691,53 @@ readverify(Par par)
return 0; return 0;
} }
static int
readverifyfull(Par par)
{
par.m_verify = true;
if (par.m_no == 0)
CHK(scanreadtable(par) == 0);
else {
const Tab& tab = par.tab();
unsigned i = par.m_no;
if (i <= tab.m_itabs && useindex(i)) {
const ITab& itab = tab.m_itab[i - 1];
BSet bset(tab, itab, par.m_rows);
CHK(scanreadindex(par, itab, bset) == 0);
}
}
return 0;
}
static int
pkops(Par par)
{
par.m_randomkey = true;
for (unsigned i = 0; i < par.m_subsubloop; i++) {
unsigned sel = urandom(10);
if (par.m_slno % 2 == 0) {
// favor insert
if (sel < 8) {
CHK(pkinsert(par) == 0);
} else if (sel < 9) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
} else {
// favor delete
if (sel < 1) {
CHK(pkinsert(par) == 0);
} else if (sel < 2) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
}
}
return 0;
}
static int static int
pkupdatescanread(Par par) pkupdatescanread(Par par)
{ {
...@@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode) ...@@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
thr.m_par.m_tab = par.m_tab; thr.m_par.m_tab = par.m_tab;
thr.m_par.m_set = par.m_set; thr.m_par.m_set = par.m_set;
thr.m_par.m_tmr = par.m_tmr; thr.m_par.m_tmr = par.m_tmr;
thr.m_par.m_lno = par.m_lno;
thr.m_par.m_slno = par.m_slno;
thr.m_func = func; thr.m_func = func;
thr.start(); thr.start();
} }
...@@ -2953,8 +3008,8 @@ tbuild(Par par) ...@@ -2953,8 +3008,8 @@ tbuild(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
if (i % 2 == 0) { if (par.m_slno % 2 == 0) {
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
...@@ -2964,9 +3019,9 @@ tbuild(Par par) ...@@ -2964,9 +3019,9 @@ tbuild(Par par)
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
} }
RUNSTEP(par, pkupdate, MT); RUNSTEP(par, pkupdate, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, pkdelete, MT); RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST); RUNSTEP(par, dropindex, ST);
} }
return 0; return 0;
...@@ -2974,6 +3029,22 @@ tbuild(Par par) ...@@ -2974,6 +3029,22 @@ tbuild(Par par)
static int static int
tpkops(Par par) tpkops(Par par)
{
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkops, MT);
LL2("rows=" << par.set().count());
RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
static int
tpkopsread(Par par)
{ {
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
...@@ -2982,7 +3053,7 @@ tpkops(Par par) ...@@ -2982,7 +3053,7 @@ tpkops(Par par)
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdatescanread, MT); RUNSTEP(par, pkupdatescanread, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
} }
...@@ -3001,7 +3072,7 @@ tmixedops(Par par) ...@@ -3001,7 +3072,7 @@ tmixedops(Par par)
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, mixedoperations, MT); RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
} }
...@@ -3015,7 +3086,7 @@ tbusybuild(Par par) ...@@ -3015,7 +3086,7 @@ tbusybuild(Par par)
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdateindexbuild, MT); RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
...@@ -3031,7 +3102,7 @@ ttimebuild(Par par) ...@@ -3031,7 +3102,7 @@ ttimebuild(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
t1.on(); t1.on();
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
...@@ -3050,7 +3121,7 @@ ttimemaint(Par par) ...@@ -3050,7 +3121,7 @@ ttimemaint(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
t1.on(); t1.on();
RUNSTEP(par, pkupdate, MT); RUNSTEP(par, pkupdate, MT);
...@@ -3075,7 +3146,7 @@ ttimescan(Par par) ...@@ -3075,7 +3146,7 @@ ttimescan(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
par.m_tmr = &t1; par.m_tmr = &t1;
...@@ -3097,7 +3168,7 @@ ttimepkread(Par par) ...@@ -3097,7 +3168,7 @@ ttimepkread(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
par.m_tmr = &t1; par.m_tmr = &t1;
...@@ -3133,9 +3204,10 @@ struct TCase { ...@@ -3133,9 +3204,10 @@ struct TCase {
static const TCase static const TCase
tcaselist[] = { tcaselist[] = {
TCase("a", tbuild, "index build"), TCase("a", tbuild, "index build"),
TCase("b", tpkops, "pk operations and scan reads"), TCase("b", tpkops, "pk operations"),
TCase("c", tmixedops, "pk operations and scan operations"), TCase("c", tpkopsread, "pk operations and scan reads"),
TCase("d", tbusybuild, "pk operations and index build"), TCase("d", tmixedops, "pk operations and scan operations"),
TCase("e", tbusybuild, "pk operations and index build"),
TCase("t", ttimebuild, "time index build"), TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"), TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"), TCase("v", ttimescan, "time full scan table vs index on pk"),
...@@ -3193,10 +3265,10 @@ runtest(Par par) ...@@ -3193,10 +3265,10 @@ runtest(Par par)
Thr& thr = *g_thrlist[n]; Thr& thr = *g_thrlist[n];
assert(thr.m_thread != 0); assert(thr.m_thread != 0);
} }
for (unsigned l = 0; par.m_loop == 0 || l < par.m_loop; l++) { for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
LL1("loop " << l); LL1("loop " << par.m_lno);
if (par.m_seed == 0) if (par.m_seed == 0)
srandom(l); srandom(par.m_lno);
for (unsigned i = 0; i < tcasecount; i++) { for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i]; const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0) if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment