Commit f436f619 authored by unknown's avatar unknown

NDB wl-1533 tux optim: after wl-1942 can remove signal from many methods


ndb/src/kernel/blocks/dbtux/Dbtux.hpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
ndb/test/ndbapi/testOIBasic.cpp:
  wl-1533 tux optim: after wl-1942 can remove signal from many methods
parent e609483c
......@@ -32,7 +32,6 @@
// signal classes
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/TuxContinueB.hpp>
#include <signaldata/BuildIndx.hpp>
#include <signaldata/TupFrag.hpp>
#include <signaldata/AlterIndx.hpp>
#include <signaldata/DropTab.hpp>
......@@ -478,7 +477,7 @@ private:
Uint16 m_numAttrs;
bool m_storeNullKey;
TreeHead m_tree;
TupLoc m_freeLoc; // one node pre-allocated for insert
TupLoc m_freeLoc; // list of free index nodes
DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2];
......@@ -582,24 +581,24 @@ private:
* DbtuxNode.cpp
*/
int allocNode(Signal* signal, NodeHandle& node);
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc);
void insertNode(Signal* signal, NodeHandle& node);
void deleteNode(Signal* signal, NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node);
void selectNode(NodeHandle& node, TupLoc loc);
void insertNode(NodeHandle& node);
void deleteNode(NodeHandle& node);
void setNodePref(NodeHandle& node);
// node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList);
void nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList);
void nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos);
void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList);
void nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos);
void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList);
void nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos);
void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i);
void nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList);
void nodePushUpScans(NodeHandle& node, unsigned pos);
void nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList);
void nodePopDownScans(NodeHandle& node, unsigned pos);
void nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList);
void nodePushDownScans(NodeHandle& node, unsigned pos);
void nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList);
void nodePopUpScans(NodeHandle& node, unsigned pos);
void nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i);
// scans linked to node
void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList);
void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList);
void moveScanList(Signal* signal, NodeHandle& node, unsigned pos);
void moveScanList(NodeHandle& node, unsigned pos);
void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);
......@@ -608,20 +607,20 @@ private:
* DbtuxTree.cpp
*/
// add entry
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
void treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent);
void treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i);
void treeAddRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i);
void treeAdd(Frag& frag, TreePos treePos, TreeEnt ent);
void treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent);
void treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i);
void treeAddRebalance(Frag& frag, NodeHandle node, unsigned i);
// remove entry
void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
void treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos);
void treeRemoveSemi(Signal* signal, Frag& frag, NodeHandle node, unsigned i);
void treeRemoveLeaf(Signal* signal, Frag& frag, NodeHandle node);
void treeRemoveNode(Signal* signal, Frag& frag, NodeHandle node);
void treeRemoveRebalance(Signal* signal, Frag& frag, NodeHandle node, unsigned i);
void treeRemove(Frag& frag, TreePos treePos);
void treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos);
void treeRemoveSemi(Frag& frag, NodeHandle node, unsigned i);
void treeRemoveLeaf(Frag& frag, NodeHandle node);
void treeRemoveNode(Frag& frag, NodeHandle node);
void treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i);
// rotate
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
void treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i);
/*
* DbtuxScan.cpp
......@@ -633,9 +632,9 @@ private:
void execACCKEYCONF(Signal* signal);
void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal);
void scanFirst(Signal* signal, ScanOpPtr scanPtr);
void scanNext(Signal* signal, ScanOpPtr scanPtr);
bool scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent);
void scanFirst(ScanOpPtr scanPtr);
void scanNext(ScanOpPtr scanPtr);
bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
void removeAccLockOp(ScanOp& scan, Uint32 accLockOp);
......@@ -644,9 +643,9 @@ private:
/*
* DbtuxSearch.cpp
*/
void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/*
* DbtuxCmp.cpp
......@@ -670,7 +669,7 @@ private:
PrintPar();
};
void printTree(Signal* signal, Frag& frag, NdbOut& out);
void printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
void printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
friend class NdbOut& operator<<(NdbOut&, const TupLoc&);
friend class NdbOut& operator<<(NdbOut&, const TreeEnt&);
friend class NdbOut& operator<<(NdbOut&, const TreeNode&);
......
......@@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
strcpy(par.m_path, ".");
par.m_side = 2;
par.m_parent = NullTupLoc;
printNode(signal, frag, out, tree.m_root, par);
printNode(frag, out, tree.m_root, par);
out.m_out->flush();
if (! par.m_ok) {
if (debugFile == 0) {
......@@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
}
void
Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
Dbtux::printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
{
if (loc == NullTupLoc) {
par.m_depth = 0;
......@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
}
TreeHead& tree = frag.m_tree;
NodeHandle node(frag);
selectNode(signal, node, loc);
selectNode(node, loc);
out << par.m_path << " " << node << endl;
// check children
PrintPar cpar[2];
......@@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
cpar[i].m_side = i;
cpar[i].m_depth = 0;
cpar[i].m_parent = loc;
printNode(signal, frag, out, node.getLink(i), cpar[i]);
printNode(frag, out, node.getLink(i), cpar[i]);
if (! cpar[i].m_ok) {
par.m_ok = false;
}
......
......@@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
switch (opCode) {
case TuxMaintReq::OpAdd:
jam();
searchToAdd(signal, frag, c_searchKey, ent, treePos);
searchToAdd(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
......@@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
}
/*
* At most one new node is inserted in the operation. We keep one
* free node pre-allocated so the operation cannot fail.
* At most one new node is inserted in the operation. Pre-allocate
* it so that the operation cannot fail.
*/
if (frag.m_freeLoc == NullTupLoc) {
jam();
......@@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
jam();
break;
}
// link to freelist
node.setLink(0, frag.m_freeLoc);
frag.m_freeLoc = node.m_loc;
ndbrequire(frag.m_freeLoc != NullTupLoc);
}
treeAdd(signal, frag, treePos, ent);
treeAdd(frag, treePos, ent);
break;
case TuxMaintReq::OpRemove:
jam();
searchToRemove(signal, frag, c_searchKey, ent, treePos);
searchToRemove(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
......@@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
}
break;
}
treeRemove(signal, frag, treePos);
treeRemove(frag, treePos);
break;
default:
ndbrequire(false);
......
......@@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node)
* Set handle to point to existing node.
*/
void
Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
Dbtux::selectNode(NodeHandle& node, TupLoc loc)
{
Frag& frag = node.m_frag;
ndbrequire(loc != NullTupLoc);
......@@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
}
/*
* Set handle to point to new node. Uses the pre-allocated node.
* Set handle to point to new node. Uses a pre-allocated node.
*/
void
Dbtux::insertNode(Signal* signal, NodeHandle& node)
Dbtux::insertNode(NodeHandle& node)
{
Frag& frag = node.m_frag;
TupLoc loc = frag.m_freeLoc;
frag.m_freeLoc = NullTupLoc;
selectNode(signal, node, loc);
// unlink from freelist
selectNode(node, frag.m_freeLoc);
frag.m_freeLoc = node.getLink(0);
new (node.m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
......@@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node)
}
/*
* Delete existing node.
* Delete existing node. Simply put it on the freelist.
*/
void
Dbtux::deleteNode(Signal* signal, NodeHandle& node)
Dbtux::deleteNode(NodeHandle& node)
{
Frag& frag = node.m_frag;
ndbrequire(node.getOccup() == 0);
TupLoc loc = node.m_loc;
Uint32 pageId = loc.getPageId();
Uint32 pageOffset = loc.getPageOffset();
Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node);
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
jamEntry();
// invalidate handle and storage
// link to freelist
node.setLink(0, frag.m_freeLoc);
frag.m_freeLoc = node.m_loc;
// invalidate the handle
node.m_loc = NullTupLoc;
node.m_node = 0;
}
......@@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
* attribute headers for now. XXX use null mask instead
*/
void
Dbtux::setNodePref(Signal* signal, NodeHandle& node)
Dbtux::setNodePref(NodeHandle& node)
{
const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree;
......@@ -121,7 +118,7 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node)
* Add list of scans at the new entry.
*/
void
Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
Dbtux::nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
......@@ -129,7 +126,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
ndbrequire(occup < tree.m_maxOccup && pos <= occup);
// fix old scans
if (node.getNodeScan() != RNIL)
nodePushUpScans(signal, node, pos);
nodePushUpScans(node, pos);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
......@@ -146,11 +143,11 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
addScanList(node, pos, scanList);
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(signal, node);
setNodePref(node);
}
void
Dbtux::nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos)
Dbtux::nodePushUpScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
......@@ -187,7 +184,7 @@ Dbtux::nodePushUpScans(Signal* signal, NodeHandle& node, unsigned pos)
* else moved forward.
*/
void
Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
Dbtux::nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
......@@ -196,12 +193,12 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent,
if (node.getNodeScan() != RNIL) {
// remove or move scans at this position
if (scanList == 0)
moveScanList(signal, node, pos);
moveScanList(node, pos);
else
removeScanList(node, pos, *scanList);
// fix other scans
if (node.getNodeScan() != RNIL)
nodePopDownScans(signal, node, pos);
nodePopDownScans(node, pos);
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
......@@ -216,11 +213,11 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent,
node.setOccup(occup - 1);
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(signal, node);
setNodePref(node);
}
void
Dbtux::nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos)
Dbtux::nodePopDownScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
......@@ -258,7 +255,7 @@ Dbtux::nodePopDownScans(Signal* signal, NodeHandle& node, unsigned pos)
* Return list of scans at the removed position 0.
*/
void
Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
Dbtux::nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
......@@ -269,7 +266,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
removeScanList(node, 0, scanList);
// fix other scans
if (node.getNodeScan() != RNIL)
nodePushDownScans(signal, node, pos);
nodePushDownScans(node, pos);
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
......@@ -285,11 +282,11 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(signal, node);
setNodePref(node);
}
void
Dbtux::nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos)
Dbtux::nodePushDownScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
......@@ -328,7 +325,7 @@ Dbtux::nodePushDownScans(Signal* signal, NodeHandle& node, unsigned pos)
* Move scans at removed entry and add scans at the new entry.
*/
void
Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
Dbtux::nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
......@@ -336,10 +333,10 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, U
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears
moveScanList(signal, node, pos);
moveScanList(node, pos);
// fix other scans
if (node.getNodeScan() != RNIL)
nodePopUpScans(signal, node, pos);
nodePopUpScans(node, pos);
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
......@@ -358,11 +355,11 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent, U
addScanList(node, 0, scanList);
// fix prefix
if (true)
setNodePref(signal, node);
setNodePref(node);
}
void
Dbtux::nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos)
Dbtux::nodePopUpScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
......@@ -392,7 +389,7 @@ Dbtux::nodePopUpScans(Signal* signal, NodeHandle& node, unsigned pos)
* (i=0) or after the max (i=1). Expensive but not often used.
*/
void
Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i)
Dbtux::nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i)
{
Frag& frag = dstNode.m_frag;
TreeHead& tree = frag.m_tree;
......@@ -400,8 +397,8 @@ Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsig
while (cnt != 0) {
TreeEnt ent;
Uint32 scanList = RNIL;
nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
nodePopDown(srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
nodePushUp(dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
cnt--;
}
}
......@@ -476,7 +473,7 @@ Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList)
* method scanNext().
*/
void
Dbtux::moveScanList(Signal* signal, NodeHandle& node, unsigned pos)
Dbtux::moveScanList(NodeHandle& node, unsigned pos)
{
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
......@@ -494,7 +491,7 @@ Dbtux::moveScanList(Signal* signal, NodeHandle& node, unsigned pos)
debugOut << "At pos=" << pos << " " << node << endl;
}
#endif
scanNext(signal, scanPtr);
scanNext(scanPtr);
ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
}
scanPtr.i = nextPtrI;
......
......@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jam();
const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandle node(frag);
selectNode(signal, node, loc);
selectNode(node, loc);
unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc;
}
......@@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::First) {
jam();
// search is done only once in single range scan
scanFirst(signal, scanPtr);
scanFirst(scanPtr);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "First scan " << scanPtr.i << " " << scan << endl;
......@@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::Next) {
jam();
// look for next
scanNext(signal, scanPtr);
scanNext(scanPtr);
}
// for reading tuple key in Current or Locked state
Data pkData = c_dataBuffer;
......@@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
* by scanNext.
*/
void
Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
Dbtux::scanFirst(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......@@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
}
// search for scan start position
TreePos treePos;
searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
if (treePos.m_loc == NullTupLoc) {
// empty tree
jam();
......@@ -710,7 +710,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
scan.m_state = ScanOp::Next;
// link the scan to node found
NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc);
selectNode(node, treePos.m_loc);
linkScan(node, scanPtr);
}
......@@ -730,7 +730,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
* re-organizations need not worry about scan direction.
*/
void
Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
Dbtux::scanNext(ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......@@ -739,20 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
debugOut << "Next in scan " << scanPtr.i << " " << scan << endl;
}
#endif
if (scan.m_state == ScanOp::Locked) {
jam();
// version of a tuple locked by us cannot disappear (assert only)
ndbassert(false);
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL;
scan.m_state = ScanOp::Current;
}
// cannot be moved away from tuple we have locked
ndbrequire(scan.m_state != ScanOp::Locked);
// set up index keys for this operation
setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer
......@@ -768,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
TreePos pos = scan.m_scanPos;
// get and remember original node
NodeHandle origNode(frag);
selectNode(signal, origNode, pos.m_loc);
selectNode(origNode, pos.m_loc);
ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop
NodeHandle node = origNode;
......@@ -785,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
}
if (node.m_loc != pos.m_loc) {
jam();
selectNode(signal, node, pos.m_loc);
selectNode(node, pos.m_loc);
}
if (pos.m_dir == 4) {
// coming down from parent proceed to left child
......@@ -833,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break;
}
// can we see it
if (! scanVisible(signal, scanPtr, ent)) {
if (! scanVisible(scanPtr, ent)) {
jam();
continue;
}
......@@ -896,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
* which are not analyzed or handled yet.
*/
bool
Dbtux::scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent)
Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
{
const ScanOp& scan = *scanPtr.p;
const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......
......@@ -25,7 +25,7 @@
* TODO optimize for initial equal attrs in node min/max
*/
void
Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
......@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc);
selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
unsigned start = 0;
......@@ -164,7 +164,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
* to it.
*/
void
Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
......@@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc);
selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
unsigned start = 0;
......@@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
* Similar to searchToAdd.
*/
void
Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
......@@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc);
selectNode(currNode, currNode.m_loc);
int ret;
// compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
......
This diff is collapsed.
......@@ -42,7 +42,7 @@ struct Opt {
CHARSET_INFO* m_cs;
bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_idxloop;
unsigned m_subsubloop;
const char* m_index;
unsigned m_loop;
bool m_nologging;
......@@ -66,7 +66,7 @@ struct Opt {
m_cs(0),
m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined),
m_idxloop(4),
m_subsubloop(4),
m_index(0),
m_loop(1),
m_nologging(false),
......@@ -79,7 +79,7 @@ struct Opt {
m_seed(0),
m_subloop(4),
m_table(0),
m_threads(4),
m_threads(6), // table + 5 indexes
m_v(1) {
}
};
......@@ -208,6 +208,8 @@ struct Par : public Opt {
Set& set() const { assert(m_set != 0); return *m_set; }
Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
unsigned m_lno;
unsigned m_slno;
unsigned m_totrows;
// value calculation
unsigned m_range;
......@@ -226,6 +228,8 @@ struct Par : public Opt {
m_tab(0),
m_set(0),
m_tmr(0),
m_lno(0),
m_slno(0),
m_totrows(m_threads * m_rows),
m_range(m_rows),
m_pctrange(0),
......@@ -2069,7 +2073,8 @@ pkinsert(Par par)
CHK(con.startTransaction() == 0);
Lst lst;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j);
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
if (set.exist(i) || set.pending(i)) {
set.unlock();
......@@ -2174,7 +2179,8 @@ pkdelete(Par par)
Lst lst;
bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j);
unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock();
if (! set.exist(i) || set.pending(i)) {
set.unlock();
......@@ -2398,7 +2404,7 @@ static int
scanreadindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) {
for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanreadindex(par, itab, bset) == 0);
......@@ -2645,7 +2651,7 @@ static int
scanupdateindex(Par par, const ITab& itab)
{
const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) {
for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows);
bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0);
......@@ -2685,6 +2691,53 @@ readverify(Par par)
return 0;
}
static int
readverifyfull(Par par)
{
par.m_verify = true;
if (par.m_no == 0)
CHK(scanreadtable(par) == 0);
else {
const Tab& tab = par.tab();
unsigned i = par.m_no;
if (i <= tab.m_itabs && useindex(i)) {
const ITab& itab = tab.m_itab[i - 1];
BSet bset(tab, itab, par.m_rows);
CHK(scanreadindex(par, itab, bset) == 0);
}
}
return 0;
}
static int
pkops(Par par)
{
par.m_randomkey = true;
for (unsigned i = 0; i < par.m_subsubloop; i++) {
unsigned sel = urandom(10);
if (par.m_slno % 2 == 0) {
// favor insert
if (sel < 8) {
CHK(pkinsert(par) == 0);
} else if (sel < 9) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
} else {
// favor delete
if (sel < 1) {
CHK(pkinsert(par) == 0);
} else if (sel < 2) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
}
}
return 0;
}
static int
pkupdatescanread(Par par)
{
......@@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
thr.m_par.m_tab = par.m_tab;
thr.m_par.m_set = par.m_set;
thr.m_par.m_tmr = par.m_tmr;
thr.m_par.m_lno = par.m_lno;
thr.m_par.m_slno = par.m_slno;
thr.m_func = func;
thr.start();
}
......@@ -2953,8 +3008,8 @@ tbuild(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
if (i % 2 == 0) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
if (par.m_slno % 2 == 0) {
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkinsert, MT);
......@@ -2964,9 +3019,9 @@ tbuild(Par par)
RUNSTEP(par, invalidateindex, MT);
}
RUNSTEP(par, pkupdate, MT);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST);
}
return 0;
......@@ -2974,6 +3029,22 @@ tbuild(Par par)
static int
tpkops(Par par)
{
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkops, MT);
LL2("rows=" << par.set().count());
RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
static int
tpkopsread(Par par)
{
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
......@@ -2982,7 +3053,7 @@ tpkops(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdatescanread, MT);
RUNSTEP(par, readverify, ST);
}
......@@ -3001,7 +3072,7 @@ tmixedops(Par par)
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST);
}
......@@ -3015,7 +3086,7 @@ tbusybuild(Par par)
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
......@@ -3031,7 +3102,7 @@ ttimebuild(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, createindex, ST);
......@@ -3050,7 +3121,7 @@ ttimemaint(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, pkupdate, MT);
......@@ -3075,7 +3146,7 @@ ttimescan(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
par.m_tmr = &t1;
......@@ -3097,7 +3168,7 @@ ttimepkread(Par par)
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST);
par.m_tmr = &t1;
......@@ -3133,9 +3204,10 @@ struct TCase {
static const TCase
tcaselist[] = {
TCase("a", tbuild, "index build"),
TCase("b", tpkops, "pk operations and scan reads"),
TCase("c", tmixedops, "pk operations and scan operations"),
TCase("d", tbusybuild, "pk operations and index build"),
TCase("b", tpkops, "pk operations"),
TCase("c", tpkopsread, "pk operations and scan reads"),
TCase("d", tmixedops, "pk operations and scan operations"),
TCase("e", tbusybuild, "pk operations and index build"),
TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"),
......@@ -3193,10 +3265,10 @@ runtest(Par par)
Thr& thr = *g_thrlist[n];
assert(thr.m_thread != 0);
}
for (unsigned l = 0; par.m_loop == 0 || l < par.m_loop; l++) {
LL1("loop " << l);
for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
LL1("loop " << par.m_lno);
if (par.m_seed == 0)
srandom(l);
srandom(par.m_lno);
for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment