Commit 3de70d46 authored by unknown's avatar unknown

tux optim 6 - remove node cache

parent b5d5d741
...@@ -102,11 +102,6 @@ private: ...@@ -102,11 +102,6 @@ private:
// sizes are in words (Uint32) // sizes are in words (Uint32)
static const unsigned MaxIndexFragments = 2 * NO_OF_FRAG_PER_NODE; static const unsigned MaxIndexFragments = 2 * NO_OF_FRAG_PER_NODE;
static const unsigned MaxIndexAttributes = MAX_ATTRIBUTES_IN_INDEX; static const unsigned MaxIndexAttributes = MAX_ATTRIBUTES_IN_INDEX;
#ifdef VM_TRACE
static const unsigned MaxNodeHandles = 10000; // More space for printTree
#else
static const unsigned MaxNodeHandles = 128; // enough for 1 operation
#endif
static const unsigned MaxAttrDataSize = 2048; static const unsigned MaxAttrDataSize = 2048;
public: public:
static const unsigned DescPageSize = 256; static const unsigned DescPageSize = 256;
...@@ -179,7 +174,7 @@ private: ...@@ -179,7 +174,7 @@ private:
}; };
/* /*
* There is no const variable NullTupLoc since the compiler may not be * There is no const member NullTupLoc since the compiler may not be
* able to optimize it to TupLoc() constants. Instead null values are * able to optimize it to TupLoc() constants. Instead null values are
* constructed on the stack with TupLoc(). * constructed on the stack with TupLoc().
*/ */
...@@ -462,8 +457,7 @@ private: ...@@ -462,8 +457,7 @@ private:
Uint16 m_descOff; Uint16 m_descOff;
Uint16 m_numAttrs; Uint16 m_numAttrs;
TreeHead m_tree; TreeHead m_tree;
Uint32 m_nodeList; // node cache of current operation TupLoc m_freeLoc; // one node pre-allocated for insert
Uint32 m_nodeFree; // one node pre-allocated for insert
DLList<ScanOp> m_scanList; // current scans on this fragment DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI; Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2]; Uint32 m_tupTableFragPtrI[2];
...@@ -498,9 +492,8 @@ private: ...@@ -498,9 +492,8 @@ private:
// node handles // node handles
/* /*
* A tree operation builds a cache of accessed nodes. This allows * A node handle is a reference to a tree node in TUP. It is used to
* different implementations of index memory access. The cache is * operate on the node. Node handles are allocated on the stack.
* committed and released at the end of the operation.
*/ */
struct NodeHandle; struct NodeHandle;
friend struct NodeHandle; friend struct NodeHandle;
...@@ -509,11 +502,9 @@ private: ...@@ -509,11 +502,9 @@ private:
TupLoc m_loc; // physical node address TupLoc m_loc; // physical node address
TreeNode* m_node; // pointer to node storage TreeNode* m_node; // pointer to node storage
AccSize m_acc; // accessed size AccSize m_acc; // accessed size
union {
Uint32 m_next; // next active node under fragment
Uint32 nextPool;
};
NodeHandle(Frag& frag); NodeHandle(Frag& frag);
NodeHandle(const NodeHandle& node);
NodeHandle& operator=(const NodeHandle& node);
// getters // getters
TupLoc getLink(unsigned i); TupLoc getLink(unsigned i);
unsigned getChilds(); // cannot spell unsigned getChilds(); // cannot spell
...@@ -521,20 +512,19 @@ private: ...@@ -521,20 +512,19 @@ private:
unsigned getOccup(); unsigned getOccup();
int getBalance(); int getBalance();
Uint32 getNodeScan(); Uint32 getNodeScan();
Data getPref(unsigned i);
TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i);
// setters // setters
void setLink(unsigned i, TupLoc loc); void setLink(unsigned i, TupLoc loc);
void setSide(unsigned i); void setSide(unsigned i);
void setOccup(unsigned n); void setOccup(unsigned n);
void setBalance(int b); void setBalance(int b);
void setNodeScan(Uint32 scanPtrI); void setNodeScan(Uint32 scanPtrI);
// access other parts of the node
Data getPref(unsigned i);
TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i);
// for ndbrequire and ndbassert // for ndbrequire and ndbassert
void progError(int line, int cause, const char* file); void progError(int line, int cause, const char* file);
}; };
typedef Ptr<NodeHandle> NodeHandlePtr;
ArrayPool<NodeHandle> c_nodeHandlePool;
// parameters for methods // parameters for methods
...@@ -565,17 +555,6 @@ private: ...@@ -565,17 +555,6 @@ private:
ReadPar(); ReadPar();
}; };
/*
* Node storage operation.
*/
struct StorePar {
TupStoreTh::OpCode m_opCode;// operation code
unsigned m_offset; // data offset in words
unsigned m_size; // number of words
Uint32 m_errorCode; // terrorCode from TUP
StorePar();
};
/* /*
* Tree search for entry. * Tree search for entry.
*/ */
...@@ -649,15 +628,12 @@ private: ...@@ -649,15 +628,12 @@ private:
/* /*
* DbtuxNode.cpp * DbtuxNode.cpp
*/ */
void seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr); int allocNode(Signal* signal, NodeHandle& node);
void preallocNode(Signal* signal, Frag& frag, Uint32& errorCode); void accessNode(Signal* signal, NodeHandle& node, AccSize acc);
void findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc); void selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc);
void selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc, AccSize acc); void insertNode(Signal* signal, NodeHandle& node, AccSize acc);
void insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc); void deleteNode(Signal* signal, NodeHandle& node);
void deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr);
void accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc);
void setNodePref(Signal* signal, NodeHandle& node, unsigned i); void setNodePref(Signal* signal, NodeHandle& node, unsigned i);
void commitNodes(Signal* signal, Frag& frag, bool updateOk);
// node operations // node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent); void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
...@@ -675,8 +651,8 @@ private: ...@@ -675,8 +651,8 @@ private:
void treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& treePos); void treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& treePos);
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent); void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
void treeRemove(Signal* signal, Frag& frag, TreePos treePos); void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i); void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i); void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
/* /*
* DbtuxScan.cpp * DbtuxScan.cpp
...@@ -1054,8 +1030,7 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) : ...@@ -1054,8 +1030,7 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) :
m_descOff(0), m_descOff(0),
m_numAttrs(ZNIL), m_numAttrs(ZNIL),
m_tree(), m_tree(),
m_nodeList(RNIL), m_freeLoc(),
m_nodeFree(RNIL),
m_scanList(scanOpPool), m_scanList(scanOpPool),
m_tupIndexFragPtrI(RNIL) m_tupIndexFragPtrI(RNIL)
{ {
...@@ -1086,11 +1061,29 @@ Dbtux::NodeHandle::NodeHandle(Frag& frag) : ...@@ -1086,11 +1061,29 @@ Dbtux::NodeHandle::NodeHandle(Frag& frag) :
m_frag(frag), m_frag(frag),
m_loc(), m_loc(),
m_node(0), m_node(0),
m_acc(AccNone), m_acc(AccNone)
m_next(RNIL)
{ {
} }
inline
Dbtux::NodeHandle::NodeHandle(const NodeHandle& node) :
m_frag(node.m_frag),
m_loc(node.m_loc),
m_node(node.m_node),
m_acc(node.m_acc)
{
}
inline Dbtux::NodeHandle&
Dbtux::NodeHandle::operator=(const NodeHandle& node)
{
ndbassert(&m_frag == &node.m_frag);
m_loc = node.m_loc;
m_node = node.m_node;
m_acc = node.m_acc;
return *this;
}
inline Dbtux::TupLoc inline Dbtux::TupLoc
Dbtux::NodeHandle::getLink(unsigned i) Dbtux::NodeHandle::getLink(unsigned i)
{ {
...@@ -1128,37 +1121,6 @@ Dbtux::NodeHandle::getNodeScan() ...@@ -1128,37 +1121,6 @@ Dbtux::NodeHandle::getNodeScan()
return m_node->m_nodeScan; return m_node->m_nodeScan;
} }
inline Dbtux::Data
Dbtux::NodeHandle::getPref(unsigned i)
{
TreeHead& tree = m_frag.m_tree;
ndbrequire(m_acc >= AccPref && i <= 1);
return tree.getPref(m_node, i);
}
inline Dbtux::TreeEnt
Dbtux::NodeHandle::getEnt(unsigned pos)
{
TreeHead& tree = m_frag.m_tree;
TreeEnt* entList = tree.getEntList(m_node);
const unsigned occup = m_node->m_occup;
ndbrequire(pos < occup);
if (pos == 0 || pos == occup - 1) {
ndbrequire(m_acc >= AccPref)
} else {
ndbrequire(m_acc == AccFull)
}
return entList[(1 + pos) % occup];
}
inline Dbtux::TreeEnt
Dbtux::NodeHandle::getMinMax(unsigned i)
{
const unsigned occup = m_node->m_occup;
ndbrequire(i <= 1 && occup != 0);
return getEnt(i == 0 ? 0 : occup - 1);
}
inline void inline void
Dbtux::NodeHandle::setLink(unsigned i, TupLoc loc) Dbtux::NodeHandle::setLink(unsigned i, TupLoc loc)
{ {
...@@ -1195,6 +1157,37 @@ Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI) ...@@ -1195,6 +1157,37 @@ Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI)
m_node->m_nodeScan = scanPtrI; m_node->m_nodeScan = scanPtrI;
} }
inline Dbtux::Data
Dbtux::NodeHandle::getPref(unsigned i)
{
TreeHead& tree = m_frag.m_tree;
ndbrequire(m_acc >= AccPref && i <= 1);
return tree.getPref(m_node, i);
}
inline Dbtux::TreeEnt
Dbtux::NodeHandle::getEnt(unsigned pos)
{
TreeHead& tree = m_frag.m_tree;
TreeEnt* entList = tree.getEntList(m_node);
const unsigned occup = m_node->m_occup;
ndbrequire(pos < occup);
if (pos == 0 || pos == occup - 1) {
ndbrequire(m_acc >= AccPref)
} else {
ndbrequire(m_acc == AccFull)
}
return entList[(1 + pos) % occup];
}
inline Dbtux::TreeEnt
Dbtux::NodeHandle::getMinMax(unsigned i)
{
const unsigned occup = m_node->m_occup;
ndbrequire(i <= 1 && occup != 0);
return getEnt(i == 0 ? 0 : occup - 1);
}
// parameters for methods // parameters for methods
inline inline
...@@ -1217,15 +1210,6 @@ Dbtux::ReadPar::ReadPar() : ...@@ -1217,15 +1210,6 @@ Dbtux::ReadPar::ReadPar() :
{ {
} }
inline
Dbtux::StorePar::StorePar() :
m_opCode(TupStoreTh::OpUndefined),
m_offset(0),
m_size(0),
m_errorCode(0)
{
}
inline inline
Dbtux::SearchPar::SearchPar() : Dbtux::SearchPar::SearchPar() :
m_data(0), m_data(0),
......
...@@ -106,13 +106,11 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) ...@@ -106,13 +106,11 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
signal->theData[1] = 1; signal->theData[1] = 1;
execDUMP_STATE_ORD(signal); execDUMP_STATE_ORD(signal);
if (debugFile != 0) { if (debugFile != 0) {
commitNodes(signal, frag, false);
printTree(signal, frag, debugOut); printTree(signal, frag, debugOut);
} }
} }
ndbrequire(false); ndbrequire(false);
} }
commitNodes(signal, frag, false);
} }
void void
...@@ -123,9 +121,9 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -123,9 +121,9 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
return; return;
} }
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
NodeHandlePtr nodePtr; NodeHandle node(frag);
selectNode(signal, frag, nodePtr, loc, AccFull); selectNode(signal, node, loc, AccFull);
out << par.m_path << " " << *nodePtr.p << endl; out << par.m_path << " " << node << endl;
// check children // check children
PrintPar cpar[2]; PrintPar cpar[2];
ndbrequire(strlen(par.m_path) + 1 < sizeof(par.m_path)); ndbrequire(strlen(par.m_path) + 1 < sizeof(par.m_path));
...@@ -134,56 +132,56 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -134,56 +132,56 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
cpar[i].m_side = i; cpar[i].m_side = i;
cpar[i].m_depth = 0; cpar[i].m_depth = 0;
cpar[i].m_parent = loc; cpar[i].m_parent = loc;
printNode(signal, frag, out, nodePtr.p->getLink(i), cpar[i]); printNode(signal, frag, out, node.getLink(i), cpar[i]);
if (! cpar[i].m_ok) { if (! cpar[i].m_ok) {
par.m_ok = false; par.m_ok = false;
} }
} }
// check child-parent links // check child-parent links
if (nodePtr.p->getLink(2) != par.m_parent) { if (node.getLink(2) != par.m_parent) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "parent loc " << hex << nodePtr.p->getLink(2); out << "parent loc " << hex << node.getLink(2);
out << " should be " << hex << par.m_parent << endl; out << " should be " << hex << par.m_parent << endl;
} }
if (nodePtr.p->getSide() != par.m_side) { if (node.getSide() != par.m_side) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "side " << dec << nodePtr.p->getSide(); out << "side " << dec << node.getSide();
out << " should be " << dec << par.m_side << endl; out << " should be " << dec << par.m_side << endl;
} }
// check balance // check balance
const int balance = -cpar[0].m_depth + cpar[1].m_depth; const int balance = -cpar[0].m_depth + cpar[1].m_depth;
if (nodePtr.p->getBalance() != balance) { if (node.getBalance() != balance) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "balance " << nodePtr.p->getBalance(); out << "balance " << node.getBalance();
out << " should be " << balance << endl; out << " should be " << balance << endl;
} }
if (abs(nodePtr.p->getBalance()) > 1) { if (abs(node.getBalance()) > 1) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "balance " << nodePtr.p->getBalance() << " is invalid" << endl; out << "balance " << node.getBalance() << " is invalid" << endl;
} }
// check occupancy // check occupancy
if (nodePtr.p->getOccup() > tree.m_maxOccup) { if (node.getOccup() > tree.m_maxOccup) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "occupancy " << nodePtr.p->getOccup(); out << "occupancy " << node.getOccup();
out << " greater than max " << tree.m_maxOccup << endl; out << " greater than max " << tree.m_maxOccup << endl;
} }
// check for occupancy of interior node // check for occupancy of interior node
if (nodePtr.p->getChilds() == 2 && nodePtr.p->getOccup() < tree.m_minOccup) { if (node.getChilds() == 2 && node.getOccup() < tree.m_minOccup) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "occupancy " << nodePtr.p->getOccup() << " of interior node"; out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl; out << " less than min " << tree.m_minOccup << endl;
} }
// check missed half-leaf/leaf merge // check missed half-leaf/leaf merge
for (unsigned i = 0; i <= 1; i++) { for (unsigned i = 0; i <= 1; i++) {
if (nodePtr.p->getLink(i) != NullTupLoc && if (node.getLink(i) != NullTupLoc &&
nodePtr.p->getLink(1 - i) == NullTupLoc && node.getLink(1 - i) == NullTupLoc &&
nodePtr.p->getOccup() + cpar[i].m_occup <= tree.m_maxOccup) { node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << " *** "; out << par.m_path << " *** ";
out << "missed merge with child " << i << endl; out << "missed merge with child " << i << endl;
...@@ -191,7 +189,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -191,7 +189,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
} }
// return values // return values
par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth); par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth);
par.m_occup = nodePtr.p->getOccup(); par.m_occup = node.getOccup();
} }
NdbOut& NdbOut&
......
...@@ -178,7 +178,6 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal) ...@@ -178,7 +178,6 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
c_fragPool.setSize(nFragment); c_fragPool.setSize(nFragment);
c_descPagePool.setSize(nDescPage); c_descPagePool.setSize(nDescPage);
c_fragOpPool.setSize(MaxIndexFragments); c_fragOpPool.setSize(MaxIndexFragments);
c_nodeHandlePool.setSize(MaxNodeHandles);
c_scanOpPool.setSize(nScanOp); c_scanOpPool.setSize(nScanOp);
c_scanBoundPool.setSize(nScanBoundWords); c_scanBoundPool.setSize(nScanBoundWords);
/* /*
......
...@@ -72,7 +72,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -72,7 +72,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
} }
ndbrequire(fragPtr.i != RNIL); ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p; Frag& frag = *fragPtr.p;
ndbrequire(frag.m_nodeList == RNIL);
// set up index entry // set up index entry
TreeEnt ent; TreeEnt ent;
ent.m_tupAddr = req->tupAddr; ent.m_tupAddr = req->tupAddr;
...@@ -143,17 +142,18 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -143,17 +142,18 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
} }
/* /*
* At most one new node is inserted in the operation. We keep one * At most one new node is inserted in the operation. We keep one
* free node pre-allocated so the operation cannot fail. This also * free node pre-allocated so the operation cannot fail.
* gives a real TupAddr for links to the new node.
*/ */
if (frag.m_nodeFree == RNIL) { if (frag.m_freeLoc == NullTupLoc) {
jam(); jam();
preallocNode(signal, frag, req->errorCode); NodeHandle node(frag);
req->errorCode = allocNode(signal, node);
if (req->errorCode != 0) { if (req->errorCode != 0) {
jam(); jam();
break; break;
} }
ndbrequire(frag.m_nodeFree != RNIL); frag.m_freeLoc = node.m_loc;
ndbrequire(frag.m_freeLoc != NullTupLoc);
} }
treeAdd(signal, frag, treePos, ent); treeAdd(signal, frag, treePos, ent);
break; break;
...@@ -175,7 +175,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -175,7 +175,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break; break;
} }
// commit and release nodes // commit and release nodes
commitNodes(signal, frag, req->errorCode == 0);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugTree) { if (debugFlags & DebugTree) {
printTree(signal, frag, debugOut); printTree(signal, frag, debugOut);
......
...@@ -316,12 +316,6 @@ Dbtux::dropIndex(Signal* signal, IndexPtr indexPtr, Uint32 senderRef, Uint32 sen ...@@ -316,12 +316,6 @@ Dbtux::dropIndex(Signal* signal, IndexPtr indexPtr, Uint32 senderRef, Uint32 sen
unsigned i = --indexPtr.p->m_numFrags; unsigned i = --indexPtr.p->m_numFrags;
FragPtr fragPtr; FragPtr fragPtr;
c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]); c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]);
Frag& frag = *fragPtr.p;
ndbrequire(frag.m_nodeList == RNIL);
if (frag.m_nodeFree != RNIL) {
c_nodeHandlePool.release(frag.m_nodeFree);
frag.m_nodeFree = RNIL;
}
c_fragPool.release(fragPtr); c_fragPool.release(fragPtr);
// the real time break is not used for anything currently // the real time break is not used for anything currently
signal->theData[0] = TuxContinueB::DropIndex; signal->theData[0] = TuxContinueB::DropIndex;
......
...@@ -18,161 +18,94 @@ ...@@ -18,161 +18,94 @@
#include "Dbtux.hpp" #include "Dbtux.hpp"
/* /*
* Node handles. * Allocate index node in TUP.
*
* Temporary version between "cache" and "pointer" implementations.
*/ */
int
// Dbtux Dbtux::allocNode(Signal* signal, NodeHandle& node)
void
Dbtux::seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr)
{
if (! c_nodeHandlePool.seize(nodePtr)) {
jam();
return;
}
new (nodePtr.p) NodeHandle(frag);
nodePtr.p->m_next = frag.m_nodeList;
frag.m_nodeList = nodePtr.i;
}
void
Dbtux::preallocNode(Signal* signal, Frag& frag, Uint32& errorCode)
{ {
ndbrequire(frag.m_nodeFree == RNIL); Frag& frag = node.m_frag;
NodeHandlePtr nodePtr;
seizeNode(signal, frag, nodePtr);
ndbrequire(nodePtr.i != RNIL);
// remove from cache XXX ugly
frag.m_nodeFree = frag.m_nodeList;
frag.m_nodeList = nodePtr.p->m_next;
// alloc index node in TUP
Uint32 pageId = NullTupLoc.m_pageId; Uint32 pageId = NullTupLoc.m_pageId;
Uint32 pageOffset = NullTupLoc.m_pageOffset; Uint32 pageOffset = NullTupLoc.m_pageOffset;
Uint32* node32 = 0; Uint32* node32 = 0;
errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); int errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
if (errorCode != 0) { if (errorCode == 0) {
jam(); jam();
c_nodeHandlePool.release(nodePtr); node.m_loc = TupLoc(pageId, pageOffset);
frag.m_nodeFree = RNIL; node.m_node = reinterpret_cast<TreeNode*>(node32);
return; node.m_acc = AccNone;
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
} }
nodePtr.p->m_loc = TupLoc(pageId, pageOffset); return errorCode;
nodePtr.p->m_node = reinterpret_cast<TreeNode*>(node32);
ndbrequire(nodePtr.p->m_loc != NullTupLoc && nodePtr.p->m_node != 0);
new (nodePtr.p->m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
TreeNode* node = nodePtr.p->m_node;
memset(tree.getPref(node, 0), 0xa2, tree.m_prefSize << 2);
memset(tree.getPref(node, 1), 0xa2, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node);
memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
} }
/* /*
* Find node in the cache. XXX too slow, use direct links instead * Access more of the node.
*/ */
void void
Dbtux::findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc) Dbtux::accessNode(Signal* signal, NodeHandle& node, AccSize acc)
{ {
NodeHandlePtr tmpPtr; ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
tmpPtr.i = frag.m_nodeList; if (node.m_acc >= acc)
while (tmpPtr.i != RNIL) { return;
jam(); // XXX could do prefetch
c_nodeHandlePool.getPtr(tmpPtr); node.m_acc = acc;
if (tmpPtr.p->m_loc == loc) {
jam();
nodePtr = tmpPtr;
return;
}
tmpPtr.i = tmpPtr.p->m_next;
}
nodePtr.i = RNIL;
nodePtr.p = 0;
} }
/* /*
* Get handle for existing node. * Set handle to point to existing node.
*/ */
void void
Dbtux::selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupLoc loc, AccSize acc) Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc)
{ {
ndbrequire(loc != NullTupLoc && acc > AccNone); Frag& frag = node.m_frag;
NodeHandlePtr tmpPtr; ndbrequire(loc != NullTupLoc);
// search in cache Uint32 pageId = loc.m_pageId;
findNode(signal, frag, tmpPtr, loc); Uint32 pageOffset = loc.m_pageOffset;
if (tmpPtr.i == RNIL) { Uint32* node32 = 0;
jam(); c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
// add new node node.m_loc = loc;
seizeNode(signal, frag, tmpPtr); node.m_node = reinterpret_cast<TreeNode*>(node32);
ndbrequire(tmpPtr.i != RNIL); node.m_acc = AccNone;
tmpPtr.p->m_loc = loc; ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
Uint32 pageId = loc.m_pageId; accessNode(signal, node, acc);
Uint32 pageOffset = loc.m_pageOffset;
Uint32* node32 = 0;
c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
tmpPtr.p->m_node = reinterpret_cast<TreeNode*>(node32);
ndbrequire(tmpPtr.p->m_loc != NullTupLoc && tmpPtr.p->m_node != 0);
}
if (tmpPtr.p->m_acc < acc) {
jam();
accessNode(signal, frag, tmpPtr, acc);
}
nodePtr = tmpPtr;
} }
/* /*
* Create new node in the cache using the pre-allocated node. * Set handle to point to new node. Uses the pre-allocated node.
*/ */
void void
Dbtux::insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc) Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc)
{ {
ndbrequire(acc > AccNone); Frag& frag = node.m_frag;
NodeHandlePtr tmpPtr; TupLoc loc = frag.m_freeLoc;
// use the pre-allocated node frag.m_freeLoc = NullTupLoc;
tmpPtr.i = frag.m_nodeFree; selectNode(signal, node, loc, acc);
frag.m_nodeFree = RNIL; new (node.m_node) TreeNode();
c_nodeHandlePool.getPtr(tmpPtr); #ifdef VM_TRACE
// move it to the cache TreeHead& tree = frag.m_tree;
tmpPtr.p->m_next = frag.m_nodeList; memset(tree.getPref(node.m_node, 0), 0xa2, tree.m_prefSize << 2);
frag.m_nodeList = tmpPtr.i; memset(tree.getPref(node.m_node, 1), 0xa2, tree.m_prefSize << 2);
tmpPtr.p->m_acc = acc; TreeEnt* entList = tree.getEntList(node.m_node);
nodePtr = tmpPtr; memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
} }
/* /*
* Delete existing node. * Delete existing node.
*/ */
void void
Dbtux::deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr) Dbtux::deleteNode(Signal* signal, NodeHandle& node)
{ {
NodeHandlePtr tmpPtr = nodePtr; Frag& frag = node.m_frag;
ndbrequire(tmpPtr.p->getOccup() == 0); ndbrequire(node.getOccup() == 0);
Uint32 pageId = tmpPtr.p->m_loc.m_pageId; TupLoc loc = node.m_loc;
Uint32 pageOffset = tmpPtr.p->m_loc.m_pageOffset; Uint32 pageId = loc.m_pageId;
Uint32* node32 = reinterpret_cast<Uint32*>(tmpPtr.p->m_node); Uint32 pageOffset = loc.m_pageOffset;
Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node);
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32); c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
// invalidate handle and storage // invalidate handle and storage
tmpPtr.p->m_loc = NullTupLoc; node.m_loc = NullTupLoc;
tmpPtr.p->m_node = 0; node.m_node = 0;
// scans have already been moved by nodePopDown or nodePopUp
}
/*
* Access more of the node.
*/
void
Dbtux::accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc)
{
NodeHandlePtr tmpPtr = nodePtr;
ndbrequire(tmpPtr.p->m_loc != NullTupLoc && tmpPtr.p->m_node != 0);
if (tmpPtr.p->m_acc >= acc)
return;
// XXX could do prefetch
tmpPtr.p->m_acc = acc;
} }
/* /*
...@@ -201,25 +134,6 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i) ...@@ -201,25 +134,6 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i)
copyAttrs(pref, readPar.m_data, copyPar); copyAttrs(pref, readPar.m_data, copyPar);
} }
/*
* Commit and release nodes at the end of an operation. Used also on
* error since no changes have been made (updateOk false).
*/
void
Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk)
{
NodeHandlePtr nodePtr;
nodePtr.i = frag.m_nodeList;
frag.m_nodeList = RNIL;
while (nodePtr.i != RNIL) {
c_nodeHandlePool.getPtr(nodePtr);
// release
NodeHandlePtr tmpPtr = nodePtr;
nodePtr.i = nodePtr.p->m_next;
c_nodeHandlePool.release(tmpPtr);
}
}
// node operations // node operations
/* /*
......
...@@ -42,7 +42,6 @@ Dbtux::execACC_SCANREQ(Signal* signal) ...@@ -42,7 +42,6 @@ Dbtux::execACC_SCANREQ(Signal* signal)
} }
ndbrequire(fragPtr.i != RNIL); ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p; Frag& frag = *fragPtr.p;
ndbrequire(frag.m_nodeList == RNIL);
// must be normal DIH/TC fragment // must be normal DIH/TC fragment
ndbrequire(frag.m_fragId < (1 << frag.m_fragOff)); ndbrequire(frag.m_fragId < (1 << frag.m_fragOff));
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -241,7 +240,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ...@@ -241,7 +240,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl; debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
ndbrequire(frag.m_nodeList == RNIL);
// handle unlock previous and close scan // handle unlock previous and close scan
switch (req->scanFlag) { switch (req->scanFlag) {
case NextScanReq::ZSCAN_NEXT: case NextScanReq::ZSCAN_NEXT:
...@@ -278,9 +276,9 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ...@@ -278,9 +276,9 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
if (scan.m_scanPos.m_loc != NullTupLoc) { if (scan.m_scanPos.m_loc != NullTupLoc) {
jam(); jam();
const TupLoc loc = scan.m_scanPos.m_loc; const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandlePtr nodePtr; NodeHandle node(frag);
selectNode(signal, frag, nodePtr, loc, AccHead); selectNode(signal, node, loc, AccHead);
unlinkScan(*nodePtr.p, scanPtr); unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc; scan.m_scanPos.m_loc = NullTupLoc;
} }
if (scan.m_lockwait) { if (scan.m_lockwait) {
...@@ -295,7 +293,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ...@@ -295,7 +293,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jamEntry(); jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success); ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_state = ScanOp::Aborting; scan.m_state = ScanOp::Aborting;
commitNodes(signal, frag, true);
return; return;
} }
if (scan.m_state == ScanOp::Locked) { if (scan.m_state == ScanOp::Locked) {
...@@ -350,7 +347,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -350,7 +347,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true; signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry(); jamEntry();
commitNodes(signal, frag, true);
return; // stop return; // stop
} }
if (scan.m_lockwait) { if (scan.m_lockwait) {
...@@ -365,7 +361,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -365,7 +361,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
// if TC has ordered scan close, it will be detected here // if TC has ordered scan close, it will be detected here
sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF, sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB); signal, signalLength, JBB);
commitNodes(signal, frag, true);
return; // stop return; // stop
} }
if (scan.m_state == ScanOp::First) { if (scan.m_state == ScanOp::First) {
...@@ -444,7 +439,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -444,7 +439,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true; signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry(); jamEntry();
commitNodes(signal, frag, true);
return; // stop return; // stop
break; break;
case AccLockReq::Refused: case AccLockReq::Refused:
...@@ -457,7 +451,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -457,7 +451,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true; signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry(); jamEntry();
commitNodes(signal, frag, true);
return; // stop return; // stop
break; break;
case AccLockReq::NoFreeOp: case AccLockReq::NoFreeOp:
...@@ -470,7 +463,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -470,7 +463,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true; signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2); EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry(); jamEntry();
commitNodes(signal, frag, true);
return; // stop return; // stop
break; break;
default: default:
...@@ -554,7 +546,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -554,7 +546,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
scan.m_lastEnt = ent; scan.m_lastEnt = ent;
// next time look for next entry // next time look for next entry
scan.m_state = ScanOp::Next; scan.m_state = ScanOp::Next;
commitNodes(signal, frag, true);
return; return;
} }
// XXX in ACC this is checked before req->checkLcpStop // XXX in ACC this is checked before req->checkLcpStop
...@@ -568,7 +559,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -568,7 +559,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
unsigned signalLength = 3; unsigned signalLength = 3;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB); signal, signalLength, JBB);
commitNodes(signal, frag, true);
return; return;
} }
ndbrequire(false); ndbrequire(false);
...@@ -707,7 +697,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -707,7 +697,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
} }
TreePos pos; TreePos pos;
pos.m_loc = tree.m_root; pos.m_loc = tree.m_root;
NodeHandlePtr nodePtr; NodeHandle node(frag);
// unpack lower bound // unpack lower bound
const ScanBound& bound = *scan.m_bound[0]; const ScanBound& bound = *scan.m_bound[0];
ScanBoundIterator iter; ScanBoundIterator iter;
...@@ -724,20 +714,20 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -724,20 +714,20 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
boundPar.m_dir = 0; boundPar.m_dir = 0;
loop: { loop: {
jam(); jam();
selectNode(signal, frag, nodePtr, pos.m_loc, AccPref); selectNode(signal, node, pos.m_loc, AccPref);
const unsigned occup = nodePtr.p->getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup != 0); ndbrequire(occup != 0);
for (unsigned i = 0; i <= 1; i++) { for (unsigned i = 0; i <= 1; i++) {
jam(); jam();
// compare prefix // compare prefix
boundPar.m_data2 = nodePtr.p->getPref(i); boundPar.m_data2 = node.getPref(i);
boundPar.m_len2 = tree.m_prefSize; boundPar.m_len2 = tree.m_prefSize;
int ret = cmpScanBound(frag, boundPar); int ret = cmpScanBound(frag, boundPar);
if (ret == NdbSqlUtil::CmpUnknown) { if (ret == NdbSqlUtil::CmpUnknown) {
jam(); jam();
// read full value // read full value
ReadPar readPar; ReadPar readPar;
readPar.m_ent = nodePtr.p->getMinMax(i); readPar.m_ent = node.getMinMax(i);
readPar.m_first = 0; readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs; readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data readPar.m_data = 0; // leave in signal data
...@@ -750,7 +740,7 @@ loop: { ...@@ -750,7 +740,7 @@ loop: {
} }
if (i == 0 && ret < 0) { if (i == 0 && ret < 0) {
jam(); jam();
const TupLoc loc = nodePtr.p->getLink(i); const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
jam(); jam();
// continue to left subtree // continue to left subtree
...@@ -763,12 +753,12 @@ loop: { ...@@ -763,12 +753,12 @@ loop: {
pos.m_dir = 3; pos.m_dir = 3;
scan.m_scanPos = pos; scan.m_scanPos = pos;
scan.m_state = ScanOp::Next; scan.m_state = ScanOp::Next;
linkScan(*nodePtr.p, scanPtr); linkScan(node, scanPtr);
return; return;
} }
if (i == 1 && ret > 0) { if (i == 1 && ret > 0) {
jam(); jam();
const TupLoc loc = nodePtr.p->getLink(i); const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
jam(); jam();
// continue to right subtree // continue to right subtree
...@@ -779,18 +769,18 @@ loop: { ...@@ -779,18 +769,18 @@ loop: {
pos.m_dir = 1; pos.m_dir = 1;
scan.m_scanPos = pos; scan.m_scanPos = pos;
scan.m_state = ScanOp::Next; scan.m_state = ScanOp::Next;
linkScan(*nodePtr.p, scanPtr); linkScan(node, scanPtr);
return; return;
} }
} }
// read rest of current node // read rest of current node
accessNode(signal, frag, nodePtr, AccFull); accessNode(signal, node, AccFull);
// look for first entry // look for first entry
ndbrequire(occup >= 2); ndbrequire(occup >= 2);
for (unsigned j = 1; j < occup; j++) { for (unsigned j = 1; j < occup; j++) {
jam(); jam();
ReadPar readPar; ReadPar readPar;
readPar.m_ent = nodePtr.p->getEnt(j); readPar.m_ent = node.getEnt(j);
readPar.m_first = 0; readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs; readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data readPar.m_data = 0; // leave in signal data
...@@ -808,7 +798,7 @@ loop: { ...@@ -808,7 +798,7 @@ loop: {
pos.m_dir = 3; pos.m_dir = 3;
scan.m_scanPos = pos; scan.m_scanPos = pos;
scan.m_state = ScanOp::Next; scan.m_state = ScanOp::Next;
linkScan(*nodePtr.p, scanPtr); linkScan(node, scanPtr);
return; return;
} }
} }
...@@ -868,11 +858,11 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -868,11 +858,11 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
// use copy of position // use copy of position
TreePos pos = scan.m_scanPos; TreePos pos = scan.m_scanPos;
// get and remember original node // get and remember original node
NodeHandlePtr origNodePtr; NodeHandle origNode(frag);
selectNode(signal, frag, origNodePtr, pos.m_loc, AccHead); selectNode(signal, origNode, pos.m_loc, AccHead);
ndbrequire(islinkScan(*origNodePtr.p, scanPtr)); ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop // current node in loop
NodeHandlePtr nodePtr = origNodePtr; NodeHandle node = origNode;
while (true) { while (true) {
jam(); jam();
if (pos.m_dir == 2) { if (pos.m_dir == 2) {
...@@ -882,14 +872,14 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -882,14 +872,14 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_state = ScanOp::Last; scan.m_state = ScanOp::Last;
break; break;
} }
if (nodePtr.p->m_loc != pos.m_loc) { if (node.m_loc != pos.m_loc) {
jam(); jam();
selectNode(signal, frag, nodePtr, pos.m_loc, AccHead); selectNode(signal, node, pos.m_loc, AccHead);
} }
if (pos.m_dir == 4) { if (pos.m_dir == 4) {
// coming down from parent proceed to left child // coming down from parent proceed to left child
jam(); jam();
TupLoc loc = nodePtr.p->getLink(0); TupLoc loc = node.getLink(0);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
jam(); jam();
pos.m_loc = loc; pos.m_loc = loc;
...@@ -909,10 +899,10 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -909,10 +899,10 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (pos.m_dir == 3) { if (pos.m_dir == 3) {
// within node // within node
jam(); jam();
unsigned occup = nodePtr.p->getOccup(); unsigned occup = node.getOccup();
ndbrequire(occup >= 1); ndbrequire(occup >= 1);
// access full node // access full node
accessNode(signal, frag, nodePtr, AccFull); accessNode(signal, node, AccFull);
// advance position // advance position
if (! pos.m_match) if (! pos.m_match)
pos.m_match = true; pos.m_match = true;
...@@ -920,7 +910,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -920,7 +910,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
pos.m_pos++; pos.m_pos++;
if (pos.m_pos < occup) { if (pos.m_pos < occup) {
jam(); jam();
pos.m_ent = nodePtr.p->getEnt(pos.m_pos); pos.m_ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged pos.m_dir = 3; // unchanged
// XXX implement prefix optimization // XXX implement prefix optimization
ReadPar readPar; ReadPar readPar;
...@@ -951,7 +941,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -951,7 +941,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break; break;
} }
// after node proceed to right child // after node proceed to right child
TupLoc loc = nodePtr.p->getLink(1); TupLoc loc = node.getLink(1);
if (loc != NullTupLoc) { if (loc != NullTupLoc) {
jam(); jam();
pos.m_loc = loc; pos.m_loc = loc;
...@@ -964,8 +954,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -964,8 +954,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (pos.m_dir == 1) { if (pos.m_dir == 1) {
// coming from right child proceed to parent // coming from right child proceed to parent
jam(); jam();
pos.m_loc = nodePtr.p->getLink(2); pos.m_loc = node.getLink(2);
pos.m_dir = nodePtr.p->getSide(); pos.m_dir = node.getSide();
continue; continue;
} }
ndbrequire(false); ndbrequire(false);
...@@ -974,16 +964,16 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -974,16 +964,16 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_scanPos = pos; scan.m_scanPos = pos;
// relink // relink
if (scan.m_state == ScanOp::Current) { if (scan.m_state == ScanOp::Current) {
ndbrequire(pos.m_loc == nodePtr.p->m_loc); ndbrequire(pos.m_loc == node.m_loc);
if (origNodePtr.i != nodePtr.i) { if (origNode.m_loc != node.m_loc) {
jam(); jam();
unlinkScan(*origNodePtr.p, scanPtr); unlinkScan(origNode, scanPtr);
linkScan(*nodePtr.p, scanPtr); linkScan(node, scanPtr);
} }
} else if (scan.m_state == ScanOp::Last) { } else if (scan.m_state == ScanOp::Last) {
jam(); jam();
ndbrequire(pos.m_loc == NullTupLoc); ndbrequire(pos.m_loc == NullTupLoc);
unlinkScan(*origNodePtr.p, scanPtr); unlinkScan(origNode, scanPtr);
} else { } else {
ndbrequire(false); ndbrequire(false);
} }
...@@ -1043,7 +1033,6 @@ void ...@@ -1043,7 +1033,6 @@ void
Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL); ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
// unlock all not unlocked by LQH // unlock all not unlocked by LQH
for (unsigned i = 0; i < MaxAccLockOps; i++) { for (unsigned i = 0; i < MaxAccLockOps; i++) {
...@@ -1068,7 +1057,6 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr) ...@@ -1068,7 +1057,6 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB); signal, signalLength, JBB);
releaseScanOp(scanPtr); releaseScanOp(scanPtr);
commitNodes(signal, frag, true);
} }
void void
......
This diff is collapsed.
...@@ -31,6 +31,7 @@ optim 4 mc02/a 42 ms 80 ms 87 pct ...@@ -31,6 +31,7 @@ optim 4 mc02/a 42 ms 80 ms 87 pct
optim 5 mc02/a 43 ms 77 ms 77 pct optim 5 mc02/a 43 ms 77 ms 77 pct
mc02/b 54 ms 118 ms 117 pct mc02/b 54 ms 118 ms 117 pct
optim 6 mc02/a 42 ms 70 ms 66 pct
mc02/b 53 ms 109 ms 105 pct
vim: set et: vim: set et:
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment