Commit d3b0753d authored by unknown's avatar unknown

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-4.1

into mc05.(none):/space2/tomas/mysql-4.1-ndb-test

parents a9eb4993 6b099d2c
......@@ -129,6 +129,8 @@ private:
/*
* Operate on entire tuple. Used by TUX where the table has a single
* Uint32 array attribute representing an index tree node.
*
* XXX this signal is no longer used by TUX and can be removed
*/
class TupStoreTh {
friend class Dbtup;
......
......@@ -69,7 +69,7 @@ class TuxFragReq {
friend class Dblqh;
friend class Dbtux;
public:
STATIC_CONST( SignalLength = 9 );
STATIC_CONST( SignalLength = 14 );
private:
Uint32 userPtr;
Uint32 userRef;
......@@ -80,6 +80,9 @@ private:
Uint32 fragOff;
Uint32 tableType;
Uint32 primaryTableId;
Uint32 tupIndexFragPtrI;
Uint32 tupTableFragPtrI[2];
Uint32 accTableFragPtrI[2];
};
class TuxFragConf {
......
......@@ -2432,6 +2432,7 @@ void Dbacc::execACC_LOCKREQ(Signal* signal)
}
fragrecptr.i = req->fragPtrI;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
ndbrequire(req->fragId == fragrecptr.p->myfid);
// caller must be explicit here
ndbrequire(req->accOpPtr == RNIL);
// seize operation to hold the lock
......
......@@ -1225,6 +1225,18 @@ Dblqh::sendAddFragReq(Signal* signal)
tuxreq->fragOff = addfragptr.p->lh3DistrBits;
tuxreq->tableType = addfragptr.p->tableType;
tuxreq->primaryTableId = addfragptr.p->primaryTableId;
// pointer to index fragment in TUP
tuxreq->tupIndexFragPtrI =
addfragptr.p->addfragStatus == AddFragRecord::WAIT_TWO_TUX ?
fragptr.p->tupFragptr[0] : fragptr.p->tupFragptr[1];
// pointers to table fragments in TUP and ACC
FragrecordPtr tFragPtr;
tFragPtr.i = fragptr.p->tableFragptr;
ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
tuxreq->tupTableFragPtrI[0] = tFragPtr.p->tupFragptr[0];
tuxreq->tupTableFragPtrI[1] = tFragPtr.p->tupFragptr[1];
tuxreq->accTableFragPtrI[0] = tFragPtr.p->accFragptr[0];
tuxreq->accTableFragPtrI[1] = tFragPtr.p->accFragptr[1];
sendSignal(fragptr.p->tuxBlockref, GSN_TUXFRAGREQ,
signal, TuxFragReq::SignalLength, JBB);
return;
......
......@@ -996,6 +996,14 @@ public:
Dbtup(const class Configuration &);
virtual ~Dbtup();
/*
* TUX index in TUP has single Uint32 array attribute which stores an
* index node. TUX uses following methods.
*/
int tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node);
void tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node);
void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node);
private:
BLOCK_DEFINES(Dbtup);
......
......@@ -179,6 +179,64 @@ Dbtup::execTUP_QUERY_TH(Signal* signal)
return;
}
int
Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
terrorCode = 0;
if (! allocTh(fragPtr.p, tablePtr.p, NORMAL_PAGE, signal, pageOffset, pagePtr)) {
jam();
ndbrequire(terrorCode != 0);
return terrorCode;
}
pageId = pagePtr.i;
Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
return 0;
}
void
Dbtup::tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
ndbrequire(node == &pagePtr.p->pageWord[pageOffset] + attrDataOffset);
freeTh(fragPtr.p, tablePtr.p, signal, pagePtr.p, pageOffset);
}
void
Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tablePtr;
tablePtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
PagePtr pagePtr;
pagePtr.i = pageId;
ptrCheckGuard(pagePtr, cnoOfPage, page);
Uint32 attrDescIndex = tablePtr.p->tabDescriptor + (0 << ZAD_LOG_SIZE);
Uint32 attrDataOffset = AttributeOffset::getOffset(tableDescriptor[attrDescIndex + 1].tabDescr);
node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
}
void
Dbtup::execTUP_STORE_TH(Signal* signal)
{
......
......@@ -25,6 +25,9 @@
#include <DataBuffer.hpp>
#include <md5_hash.hpp>
// big brother
#include <Dbtup.hpp>
// signal classes
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/TuxContinueB.hpp>
......@@ -92,15 +95,13 @@ public:
Dbtux(const Configuration& conf);
virtual ~Dbtux();
// pointer to TUP instance in this thread
Dbtup* c_tup;
private:
// sizes are in words (Uint32)
static const unsigned MaxIndexFragments = 2 * NO_OF_FRAG_PER_NODE;
static const unsigned MaxIndexAttributes = MAX_ATTRIBUTES_IN_INDEX;
#ifdef VM_TRACE
static const unsigned MaxNodeHandles = 10000; // More space for printTree
#else
static const unsigned MaxNodeHandles = 128; // enough for 1 operation
#endif
static const unsigned MaxAttrDataSize = 2048;
public:
static const unsigned DescPageSize = 256;
......@@ -153,8 +154,7 @@ private:
static const unsigned AttributeHeaderSize = 1;
/*
* Logical tuple address, "local key". Identifies both table tuples
* and index tuples. The code assumes it is one word.
* Logical tuple address, "local key". Identifies table tuples.
*/
typedef Uint32 TupAddr;
static const unsigned NullTupAddr = (Uint32)-1;
......@@ -168,8 +168,18 @@ private:
Uint32 m_pageId; // page i-value
Uint16 m_pageOffset; // page offset in words
TupLoc();
TupLoc(Uint32 pageId, Uint16 pageOffset);
bool operator==(const TupLoc& loc) const;
bool operator!=(const TupLoc& loc) const;
};
/*
* There is no const member NullTupLoc since the compiler may not be
* able to optimize it to TupLoc() constants. Instead null values are
* constructed on the stack with TupLoc().
*/
#define NullTupLoc TupLoc()
// tree definitions
/*
......@@ -183,7 +193,7 @@ private:
TupAddr m_tupAddr; // address of original tuple
Uint16 m_tupVersion; // version
Uint8 m_fragBit; // which duplicated table fragment
Uint8 unused1;
Uint8 pad1;
TreeEnt();
// methods
int cmp(const TreeEnt ent) const;
......@@ -196,7 +206,7 @@ private:
* prefix 3) max and min entries 4) rest of entries 5) one extra entry
* used as work space.
*
* struct TreeNode part 1
* struct TreeNode part 1, size 6 words
* min prefix part 2, size TreeHead::m_prefSize
* max prefix part 2, size TreeHead::m_prefSize
* max entry part 3
......@@ -204,6 +214,10 @@ private:
* rest of entries part 4
* work entry part 5
*
* There are 3 links to other nodes: left child, right child, parent.
* These are in TupLoc format but the pageIds and pageOffsets are
* stored in separate arrays (saves 1 word).
*
* Occupancy (number of entries) is at least 1 except temporarily when
* a node is about to be removed. If occupancy is 1, only max entry
* is present but both min and max prefixes are set.
......@@ -211,11 +225,12 @@ private:
struct TreeNode;
friend struct TreeNode;
struct TreeNode {
TupAddr m_link[3]; // link to 0-left child 1-right child 2-parent
Uint8 m_side; // we are 0-left child 1-right child 2-root
Uint32 m_linkPI[3]; // link to 0-left child 1-right child 2-parent
Uint16 m_linkPO[3]; // page offsets for above real page ids
unsigned m_side : 2; // we are 0-left child 1-right child 2-root
int m_balance : 2; // balance -1, 0, +1
unsigned pad1 : 4;
Uint8 m_occup; // current number of entries
Int8 m_balance; // balance -1, 0, +1
Uint8 unused1;
Uint32 m_nodeScan; // list of scans at this node
TreeNode();
};
......@@ -243,7 +258,7 @@ private:
Uint8 m_prefSize; // words in min/max prefix each
Uint8 m_minOccup; // min entries in internal node
Uint8 m_maxOccup; // max entries in node
TupAddr m_root; // root node
TupLoc m_root; // root node
TreeHead();
// methods
unsigned getSize(AccSize acc) const;
......@@ -261,8 +276,7 @@ private:
struct TreePos;
friend struct TreePos;
struct TreePos {
TupAddr m_addr; // logical node address
TupLoc m_loc; // physical address
TupLoc m_loc; // physical node address
Uint16 m_pos; // position 0 to m_occup
Uint8 m_match; // at an existing entry
Uint8 m_dir; // from link (0-2) or within node (3)
......@@ -443,9 +457,11 @@ private:
Uint16 m_descOff;
Uint16 m_numAttrs;
TreeHead m_tree;
Uint32 m_nodeList; // node cache of current operation
Uint32 m_nodeFree; // one node pre-allocated for insert
TupLoc m_freeLoc; // one node pre-allocated for insert
DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2];
Uint32 m_accTableFragPtrI[2];
union {
Uint32 nextPool;
};
......@@ -476,62 +492,39 @@ private:
// node handles
/*
* A tree operation builds a cache of accessed nodes. This allows
* different implementations of index memory access. The cache is
* committed and released at the end of the operation.
* A node handle is a reference to a tree node in TUP. It is used to
* operate on the node. Node handles are allocated on the stack.
*/
struct NodeHandle;
friend struct NodeHandle;
struct NodeHandle {
enum Flags {
// bits 0,1 mark need for left,right prefix
DoInsert = (1 << 2),
DoDelete = (1 << 3),
DoUpdate = (1 << 4)
};
Dbtux& m_tux; // this block
Frag& m_frag; // fragment using the node
TupAddr m_addr; // logical node address
TupLoc m_loc; // physical node address
AccSize m_acc; // accessed size
unsigned m_flags; // flags
union {
Uint32 m_next; // next active node under fragment
Uint32 nextPool;
};
TreeNode* m_node; // pointer to node storage
Uint32 m_cache[MaxTreeNodeSize];
NodeHandle(Dbtux& tux, Frag& frag);
AccSize m_acc; // accessed size
NodeHandle(Frag& frag);
NodeHandle(const NodeHandle& node);
NodeHandle& operator=(const NodeHandle& node);
// getters
TupAddr getLink(unsigned i);
TupLoc getLink(unsigned i);
unsigned getChilds(); // cannot spell
unsigned getSide();
unsigned getOccup();
int getBalance();
Uint32 getNodeScan();
Data getPref(unsigned i);
TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i);
// setters
void setLink(unsigned i, TupAddr addr);
void setLink(unsigned i, TupLoc loc);
void setSide(unsigned i);
void setOccup(unsigned n);
void setBalance(int b);
void setNodeScan(Uint32 scanPtrI);
// operations XXX maybe these should move to Dbtux level
void pushUp(Signal* signal, unsigned pos, const TreeEnt& ent);
void popDown(Signal* signal, unsigned pos, TreeEnt& ent);
void pushDown(Signal* signal, unsigned pos, TreeEnt& ent);
void popUp(Signal* signal, unsigned pos, TreeEnt& ent);
void slide(Signal* signal, Ptr<NodeHandle> nodePtr, unsigned i);
void linkScan(Dbtux::ScanOpPtr scanPtr);
void unlinkScan(Dbtux::ScanOpPtr scanPtr);
bool islinkScan(Dbtux::ScanOpPtr scanPtr);
// for ndbrequire
void progError(int line, int cause, const char* extra);
// access other parts of the node
Data getPref(unsigned i);
TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i);
// for ndbrequire and ndbassert
void progError(int line, int cause, const char* file);
};
typedef Ptr<NodeHandle> NodeHandlePtr;
ArrayPool<NodeHandle> c_nodeHandlePool;
// parameters for methods
......@@ -562,17 +555,6 @@ private:
ReadPar();
};
/*
* Node storage operation.
*/
struct StorePar {
TupStoreTh::OpCode m_opCode;// operation code
unsigned m_offset; // data offset in words
unsigned m_size; // number of words
Uint32 m_errorCode; // terrorCode from TUP
StorePar();
};
/*
* Tree search for entry.
*/
......@@ -642,20 +624,26 @@ private:
void execTUX_MAINT_REQ(Signal* signal);
void tupReadAttrs(Signal* signal, const Frag& frag, ReadPar& readPar);
void tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar);
void tupStoreTh(Signal* signal, const Frag& frag, NodeHandlePtr nodePtr, StorePar storePar);
/*
* DbtuxNode.cpp
*/
void seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr);
void preallocNode(Signal* signal, Frag& frag, Uint32& errorCode);
void findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr);
void selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr, AccSize acc);
void insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc);
void deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr);
void accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc);
void setNodePref(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i);
void commitNodes(Signal* signal, Frag& frag, bool updateOk);
int allocNode(Signal* signal, NodeHandle& node);
void accessNode(Signal* signal, NodeHandle& node, AccSize acc);
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc);
void insertNode(Signal* signal, NodeHandle& node, AccSize acc);
void deleteNode(Signal* signal, NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node, unsigned i);
// node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i);
// scans linked to node
void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);
/*
* DbtuxTree.cpp
......@@ -663,8 +651,8 @@ private:
void treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& treePos);
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
/*
* DbtuxScan.cpp
......@@ -698,23 +686,24 @@ private:
struct PrintPar {
char m_path[100]; // LR prefix
unsigned m_side; // expected side
TupAddr m_parent; // expected parent address
TupLoc m_parent; // expected parent address
int m_depth; // returned depth
unsigned m_occup; // returned occupancy
bool m_ok; // returned status
PrintPar();
};
void printTree(Signal* signal, Frag& frag, NdbOut& out);
void printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar& par);
void printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
friend class NdbOut& operator<<(NdbOut&, const TupLoc&);
friend class NdbOut& operator<<(NdbOut&, const TreeEnt&);
friend class NdbOut& operator<<(NdbOut&, const TreeNode&);
friend class NdbOut& operator<<(NdbOut&, const TreeHead&);
friend class NdbOut& operator<<(NdbOut&, const TreePos&);
friend class NdbOut& operator<<(NdbOut&, const DescAttr&);
friend class NdbOut& operator<<(NdbOut&, const ScanOp&);
friend class NdbOut& operator<<(NdbOut&, const Index&);
friend class NdbOut& operator<<(NdbOut&, const Frag&);
friend class NdbOut& operator<<(NdbOut&, const NodeHandle&);
friend class NdbOut& operator<<(NdbOut&, const ScanOp&);
FILE* debugFile;
NdbOut debugOut;
unsigned debugFlags;
......@@ -831,8 +820,45 @@ Dbtux::ConstData::operator=(Data data)
return *this;
}
// Dbtux::TupLoc
inline
Dbtux::TupLoc::TupLoc() :
m_pageId(RNIL),
m_pageOffset(0)
{
}
inline
Dbtux::TupLoc::TupLoc(Uint32 pageId, Uint16 pageOffset) :
m_pageId(pageId),
m_pageOffset(pageOffset)
{
}
inline bool
Dbtux::TupLoc::operator==(const TupLoc& loc) const
{
return m_pageId == loc.m_pageId && m_pageOffset == loc.m_pageOffset;
}
inline bool
Dbtux::TupLoc::operator!=(const TupLoc& loc) const
{
return ! (*this == loc);
}
// Dbtux::TreeEnt
inline
Dbtux::TreeEnt::TreeEnt() :
m_tupAddr(NullTupAddr),
m_tupVersion(0),
m_fragBit(255),
pad1(0)
{
}
inline int
Dbtux::TreeEnt::cmp(const TreeEnt ent) const
{
......@@ -852,8 +878,36 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const
return 0;
}
// Dbtux::TreeNode
inline
Dbtux::TreeNode::TreeNode() :
m_side(2),
m_balance(0),
pad1(0),
m_occup(0),
m_nodeScan(RNIL)
{
m_linkPI[0] = NullTupLoc.m_pageId;
m_linkPO[0] = NullTupLoc.m_pageOffset;
m_linkPI[1] = NullTupLoc.m_pageId;
m_linkPO[1] = NullTupLoc.m_pageOffset;
m_linkPI[2] = NullTupLoc.m_pageId;
m_linkPO[2] = NullTupLoc.m_pageOffset;
}
// Dbtux::TreeHead
inline
Dbtux::TreeHead::TreeHead() :
m_nodeSize(0),
m_prefSize(0),
m_minOccup(0),
m_maxOccup(0),
m_root()
{
}
inline unsigned
Dbtux::TreeHead::getSize(AccSize acc) const
{
......@@ -885,52 +939,10 @@ Dbtux::TreeHead::getEntList(TreeNode* node) const
return (TreeEnt*)ptr;
}
// Dbtux
// constructors
inline
Dbtux::TupLoc::TupLoc() :
m_pageId(RNIL),
m_pageOffset(0)
{
}
inline
Dbtux::TreeEnt::TreeEnt() :
m_tupAddr(NullTupAddr),
m_tupVersion(0),
m_fragBit(255),
unused1(0)
{
}
inline
Dbtux::TreeNode::TreeNode() :
m_side(255),
m_occup(0),
m_balance(0),
unused1(0xa1),
m_nodeScan(RNIL)
{
m_link[0] = NullTupAddr;
m_link[1] = NullTupAddr;
m_link[2] = NullTupAddr;
}
inline
Dbtux::TreeHead::TreeHead() :
m_nodeSize(0),
m_prefSize(0),
m_minOccup(0),
m_maxOccup(0),
m_root(0)
{
}
// Dbtux::TreePos
inline
Dbtux::TreePos::TreePos() :
m_addr(NullTupAddr),
m_loc(),
m_pos(ZNIL),
m_match(false),
......@@ -939,6 +951,8 @@ Dbtux::TreePos::TreePos() :
{
}
// Dbtux::DescPage
inline
Dbtux::DescPage::DescPage() :
m_nextPage(RNIL),
......@@ -953,6 +967,41 @@ Dbtux::DescPage::DescPage() :
}
}
// Dbtux::ScanOp
inline
Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
m_state(Undef),
m_lockwait(false),
m_userPtr(RNIL),
m_userRef(RNIL),
m_tableId(RNIL),
m_indexId(RNIL),
m_fragPtrI(RNIL),
m_transId1(0),
m_transId2(0),
m_savePointId(0),
m_accLockOp(RNIL),
m_readCommitted(0),
m_lockMode(0),
m_keyInfo(0),
m_boundMin(scanBoundPool),
m_boundMax(scanBoundPool),
m_scanPos(),
m_lastEnt(),
m_nodeScan(RNIL)
{
m_bound[0] = &m_boundMin;
m_bound[1] = &m_boundMax;
m_boundCnt[0] = 0;
m_boundCnt[1] = 0;
for (unsigned i = 0; i < MaxAccLockOps; i++) {
m_accLockOps[i] = RNIL;
}
}
// Dbtux::Index
inline
Dbtux::Index::Index() :
m_state(NotDefined),
......@@ -969,6 +1018,8 @@ Dbtux::Index::Index() :
};
};
// Dbtux::Frag
inline
Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) :
m_tableId(RNIL),
......@@ -979,12 +1030,18 @@ Dbtux::Frag::Frag(ArrayPool<ScanOp>& scanOpPool) :
m_descOff(0),
m_numAttrs(ZNIL),
m_tree(),
m_nodeList(RNIL),
m_nodeFree(RNIL),
m_scanList(scanOpPool)
m_freeLoc(),
m_scanList(scanOpPool),
m_tupIndexFragPtrI(RNIL)
{
m_tupTableFragPtrI[0] = RNIL;
m_tupTableFragPtrI[1] = RNIL;
m_accTableFragPtrI[0] = RNIL;
m_accTableFragPtrI[1] = RNIL;
}
// Dbtux::FragOp
inline
Dbtux::FragOp::FragOp() :
m_userPtr(RNIL),
......@@ -997,160 +1054,107 @@ Dbtux::FragOp::FragOp() :
{
};
// Dbtux::NodeHandle
inline
Dbtux::NodeHandle::NodeHandle(Dbtux& tux, Frag& frag) :
m_tux(tux),
Dbtux::NodeHandle::NodeHandle(Frag& frag) :
m_frag(frag),
m_addr(NullTupAddr),
m_loc(),
m_acc(AccNone),
m_flags(0),
m_next(RNIL),
m_node(0)
m_node(0),
m_acc(AccNone)
{
}
inline
Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
m_state(Undef),
m_lockwait(false),
m_userPtr(RNIL),
m_userRef(RNIL),
m_tableId(RNIL),
m_indexId(RNIL),
m_fragPtrI(RNIL),
m_transId1(0),
m_transId2(0),
m_savePointId(0),
m_accLockOp(RNIL),
m_readCommitted(0),
m_lockMode(0),
m_keyInfo(0),
m_boundMin(scanBoundPool),
m_boundMax(scanBoundPool),
m_scanPos(),
m_lastEnt(),
m_nodeScan(RNIL)
Dbtux::NodeHandle::NodeHandle(const NodeHandle& node) :
m_frag(node.m_frag),
m_loc(node.m_loc),
m_node(node.m_node),
m_acc(node.m_acc)
{
m_bound[0] = &m_boundMin;
m_bound[1] = &m_boundMax;
m_boundCnt[0] = 0;
m_boundCnt[1] = 0;
for (unsigned i = 0; i < MaxAccLockOps; i++) {
m_accLockOps[i] = RNIL;
}
}
inline
Dbtux::CopyPar::CopyPar() :
m_items(0),
m_headers(true),
m_maxwords(~0), // max unsigned
// output
m_numitems(0),
m_numwords(0)
inline Dbtux::NodeHandle&
Dbtux::NodeHandle::operator=(const NodeHandle& node)
{
ndbassert(&m_frag == &node.m_frag);
m_loc = node.m_loc;
m_node = node.m_node;
m_acc = node.m_acc;
return *this;
}
inline
Dbtux::ReadPar::ReadPar() :
m_first(0),
m_count(0),
m_data(0),
m_size(0)
inline Dbtux::TupLoc
Dbtux::NodeHandle::getLink(unsigned i)
{
ndbrequire(i <= 2);
return TupLoc(m_node->m_linkPI[i], m_node->m_linkPO[i]);
}
inline
Dbtux::StorePar::StorePar() :
m_opCode(TupStoreTh::OpUndefined),
m_offset(0),
m_size(0),
m_errorCode(0)
inline unsigned
Dbtux::NodeHandle::getChilds()
{
return (getLink(0) != NullTupLoc) + (getLink(1) != NullTupLoc);
}
inline
Dbtux::SearchPar::SearchPar() :
m_data(0),
m_ent()
inline unsigned
Dbtux::NodeHandle::getSide()
{
return m_node->m_side;
}
inline
Dbtux::CmpPar::CmpPar() :
m_data1(0),
m_data2(0),
m_len2(0),
m_first(0),
m_numEq(0)
inline unsigned
Dbtux::NodeHandle::getOccup()
{
return m_node->m_occup;
}
inline
Dbtux::BoundPar::BoundPar() :
m_data1(0),
m_data2(0),
m_count1(0),
m_len2(0),
m_dir(255)
inline int
Dbtux::NodeHandle::getBalance()
{
return m_node->m_balance;
}
#ifdef VM_TRACE
inline
Dbtux::PrintPar::PrintPar() :
// caller fills in
m_path(),
m_side(255),
m_parent(NullTupAddr),
// default return values
m_depth(0),
m_occup(0),
m_ok(true)
inline Uint32
Dbtux::NodeHandle::getNodeScan()
{
return m_node->m_nodeScan;
}
#endif
// node handles
inline Dbtux::TupAddr
Dbtux::NodeHandle::getLink(unsigned i)
inline void
Dbtux::NodeHandle::setLink(unsigned i, TupLoc loc)
{
ndbrequire(i <= 2);
return m_node->m_link[i];
}
inline unsigned
Dbtux::NodeHandle::getChilds()
{
return
(m_node->m_link[0] != NullTupAddr) +
(m_node->m_link[1] != NullTupAddr);
m_node->m_linkPI[i] = loc.m_pageId;
m_node->m_linkPO[i] = loc.m_pageOffset;
}
inline Dbtux::TupAddr
Dbtux::NodeHandle::getSide()
inline void
Dbtux::NodeHandle::setSide(unsigned i)
{
return m_node->m_side;
ndbrequire(i <= 2);
m_node->m_side = i;
}
inline unsigned
Dbtux::NodeHandle::getOccup()
inline void
Dbtux::NodeHandle::setOccup(unsigned n)
{
return m_node->m_occup;
TreeHead& tree = m_frag.m_tree;
ndbrequire(n <= tree.m_maxOccup);
m_node->m_occup = n;
}
inline int
Dbtux::NodeHandle::getBalance()
inline void
Dbtux::NodeHandle::setBalance(int b)
{
return m_node->m_balance;
ndbrequire(abs(b) <= 1);
m_node->m_balance = b;
}
inline Uint32
Dbtux::NodeHandle::getNodeScan()
inline void
Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI)
{
return m_node->m_nodeScan;
m_node->m_nodeScan = scanPtrI;
}
inline Dbtux::Data
......@@ -1184,45 +1188,69 @@ Dbtux::NodeHandle::getMinMax(unsigned i)
return getEnt(i == 0 ? 0 : occup - 1);
}
inline void
Dbtux::NodeHandle::setLink(unsigned i, TupAddr addr)
// parameters for methods
inline
Dbtux::CopyPar::CopyPar() :
m_items(0),
m_headers(true),
m_maxwords(~0), // max unsigned
// output
m_numitems(0),
m_numwords(0)
{
ndbrequire(i <= 2);
m_node->m_link[i] = addr;
m_flags |= DoUpdate;
}
inline void
Dbtux::NodeHandle::setSide(unsigned i)
inline
Dbtux::ReadPar::ReadPar() :
m_first(0),
m_count(0),
m_data(0),
m_size(0)
{
// ndbrequire(i <= 1);
m_node->m_side = i;
m_flags |= DoUpdate;
}
inline void
Dbtux::NodeHandle::setOccup(unsigned n)
inline
Dbtux::SearchPar::SearchPar() :
m_data(0),
m_ent()
{
TreeHead& tree = m_frag.m_tree;
ndbrequire(n <= tree.m_maxOccup);
m_node->m_occup = n;
m_flags |= DoUpdate;
}
inline void
Dbtux::NodeHandle::setBalance(int b)
inline
Dbtux::CmpPar::CmpPar() :
m_data1(0),
m_data2(0),
m_len2(0),
m_first(0),
m_numEq(0)
{
ndbrequire(abs(b) <= 1);
m_node->m_balance = b;
m_flags |= DoUpdate;
}
inline void
Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI)
inline
Dbtux::BoundPar::BoundPar() :
m_data1(0),
m_data2(0),
m_count1(0),
m_len2(0),
m_dir(255)
{
}
#ifdef VM_TRACE
inline
Dbtux::PrintPar::PrintPar() :
// caller fills in
m_path(),
m_side(255),
m_parent(),
// default return values
m_depth(0),
m_occup(0),
m_ok(true)
{
m_node->m_nodeScan = scanPtrI;
m_flags |= DoUpdate;
}
#endif
// other methods
......
......@@ -97,7 +97,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
PrintPar par;
strcpy(par.m_path, ".");
par.m_side = 2;
par.m_parent = NullTupAddr;
par.m_parent = NullTupLoc;
printNode(signal, frag, out, tree.m_root, par);
out.m_out->flush();
if (! par.m_ok) {
......@@ -106,26 +106,24 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
signal->theData[1] = 1;
execDUMP_STATE_ORD(signal);
if (debugFile != 0) {
commitNodes(signal, frag, false);
printTree(signal, frag, debugOut);
}
}
ndbrequire(false);
}
commitNodes(signal, frag, false);
}
void
Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar& par)
Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
{
if (addr == NullTupAddr) {
if (loc == NullTupLoc) {
par.m_depth = 0;
return;
}
TreeHead& tree = frag.m_tree;
NodeHandlePtr nodePtr;
selectNode(signal, frag, nodePtr, addr, AccFull);
out << par.m_path << " " << *nodePtr.p << endl;
NodeHandle node(frag);
selectNode(signal, node, loc, AccFull);
out << par.m_path << " " << node << endl;
// check children
PrintPar cpar[2];
ndbrequire(strlen(par.m_path) + 1 < sizeof(par.m_path));
......@@ -133,57 +131,57 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar
sprintf(cpar[i].m_path, "%s%c", par.m_path, "LR"[i]);
cpar[i].m_side = i;
cpar[i].m_depth = 0;
cpar[i].m_parent = addr;
printNode(signal, frag, out, nodePtr.p->getLink(i), cpar[i]);
cpar[i].m_parent = loc;
printNode(signal, frag, out, node.getLink(i), cpar[i]);
if (! cpar[i].m_ok) {
par.m_ok = false;
}
}
// check child-parent links
if (nodePtr.p->getLink(2) != par.m_parent) {
if (node.getLink(2) != par.m_parent) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "parent addr " << hex << nodePtr.p->getLink(2);
out << "parent loc " << hex << node.getLink(2);
out << " should be " << hex << par.m_parent << endl;
}
if (nodePtr.p->getSide() != par.m_side) {
if (node.getSide() != par.m_side) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "side " << dec << nodePtr.p->getSide();
out << "side " << dec << node.getSide();
out << " should be " << dec << par.m_side << endl;
}
// check balance
const int balance = -cpar[0].m_depth + cpar[1].m_depth;
if (nodePtr.p->getBalance() != balance) {
if (node.getBalance() != balance) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "balance " << nodePtr.p->getBalance();
out << "balance " << node.getBalance();
out << " should be " << balance << endl;
}
if (abs(nodePtr.p->getBalance()) > 1) {
if (abs(node.getBalance()) > 1) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "balance " << nodePtr.p->getBalance() << " is invalid" << endl;
out << "balance " << node.getBalance() << " is invalid" << endl;
}
// check occupancy
if (nodePtr.p->getOccup() > tree.m_maxOccup) {
if (node.getOccup() > tree.m_maxOccup) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "occupancy " << nodePtr.p->getOccup();
out << "occupancy " << node.getOccup();
out << " greater than max " << tree.m_maxOccup << endl;
}
// check for occupancy of interior node
if (nodePtr.p->getChilds() == 2 && nodePtr.p->getOccup() < tree.m_minOccup) {
if (node.getChilds() == 2 && node.getOccup() < tree.m_minOccup) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "occupancy " << nodePtr.p->getOccup() << " of interior node";
out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl;
}
// check missed half-leaf/leaf merge
for (unsigned i = 0; i <= 1; i++) {
if (nodePtr.p->getLink(i) != NullTupAddr &&
nodePtr.p->getLink(1 - i) == NullTupAddr &&
nodePtr.p->getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
if (node.getLink(i) != NullTupLoc &&
node.getLink(1 - i) == NullTupLoc &&
node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
par.m_ok = false;
out << par.m_path << " *** ";
out << "missed merge with child " << i << endl;
......@@ -191,7 +189,19 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupAddr addr, PrintPar
}
// return values
par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth);
par.m_occup = nodePtr.p->getOccup();
par.m_occup = node.getOccup();
}
NdbOut&
operator<<(NdbOut& out, const Dbtux::TupLoc& loc)
{
if (loc == Dbtux::NullTupLoc) {
out << "null";
} else {
out << hex << loc.m_pageId;
out << "." << dec << loc.m_pageOffset;
}
return out;
}
NdbOut&
......@@ -206,10 +216,13 @@ operator<<(NdbOut& out, const Dbtux::TreeEnt& ent)
NdbOut&
operator<<(NdbOut& out, const Dbtux::TreeNode& node)
{
Dbtux::TupLoc link0(node.m_linkPI[0], node.m_linkPO[0]);
Dbtux::TupLoc link1(node.m_linkPI[1], node.m_linkPO[1]);
Dbtux::TupLoc link2(node.m_linkPI[2], node.m_linkPO[2]);
out << "[TreeNode " << hex << &node;
out << " [left " << hex << node.m_link[0] << "]";
out << " [right " << hex << node.m_link[1] << "]";
out << " [up " << hex << node.m_link[2] << "]";
out << " [left " << link0 << "]";
out << " [right " << link1 << "]";
out << " [up " << link2 << "]";
out << " [side " << dec << node.m_side << "]";
out << " [occup " << dec << node.m_occup << "]";
out << " [balance " << dec << (int)node.m_balance << "]";
......@@ -238,7 +251,7 @@ NdbOut&
operator<<(NdbOut& out, const Dbtux::TreePos& pos)
{
out << "[TreePos " << hex << &pos;
out << " [addr " << hex << pos.m_addr << "]";
out << " [loc " << pos.m_loc << "]";
out << " [pos " << dec << pos.m_pos << "]";
out << " [match " << dec << pos.m_match << "]";
out << " [dir " << dec << pos.m_dir << "]";
......@@ -338,9 +351,8 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node)
const Dbtux::Frag& frag = node.m_frag;
const Dbtux::TreeHead& tree = frag.m_tree;
out << "[NodeHandle " << hex << &node;
out << " [addr " << hex << node.m_addr << "]";
out << " [loc " << node.m_loc << "]";
out << " [acc " << dec << node.m_acc << "]";
out << " [flags " << hex << node.m_flags << "]";
out << " [node " << *node.m_node << "]";
if (node.m_acc >= Dbtux::AccPref) {
for (unsigned i = 0; i <= 1; i++) {
......
......@@ -21,6 +21,7 @@
Dbtux::Dbtux(const Configuration& conf) :
SimulatedBlock(DBTUX, conf),
c_tup(0),
c_descPageList(RNIL),
#ifdef VM_TRACE
debugFile(0),
......@@ -123,6 +124,8 @@ Dbtux::execSTTOR(Signal* signal)
case 1:
jam();
CLEAR_ERROR_INSERT_VALUE;
c_tup = (Dbtup*)globalData.getBlock(DBTUP);
ndbrequire(c_tup != 0);
break;
case 3:
jam();
......@@ -175,12 +178,11 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
c_fragPool.setSize(nFragment);
c_descPagePool.setSize(nDescPage);
c_fragOpPool.setSize(MaxIndexFragments);
c_nodeHandlePool.setSize(MaxNodeHandles);
c_scanOpPool.setSize(nScanOp);
c_scanBoundPool.setSize(nScanBoundWords);
/*
* Index id is physical array index. We seize and initialize all
* index records now. This assumes ArrayPool is an array.
* index records now.
*/
IndexPtr indexPtr;
while (1) {
......
......@@ -72,7 +72,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
}
ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p;
ndbrequire(frag.m_nodeList == RNIL);
// set up index entry
TreeEnt ent;
ent.m_tupAddr = req->tupAddr;
......@@ -143,17 +142,18 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
}
/*
* At most one new node is inserted in the operation. We keep one
* free node pre-allocated so the operation cannot fail. This also
* gives a real TupAddr for links to the new node.
* free node pre-allocated so the operation cannot fail.
*/
if (frag.m_nodeFree == RNIL) {
if (frag.m_freeLoc == NullTupLoc) {
jam();
preallocNode(signal, frag, req->errorCode);
NodeHandle node(frag);
req->errorCode = allocNode(signal, node);
if (req->errorCode != 0) {
jam();
break;
}
ndbrequire(frag.m_nodeFree != RNIL);
frag.m_freeLoc = node.m_loc;
ndbrequire(frag.m_freeLoc != NullTupLoc);
}
treeAdd(signal, frag, treePos, ent);
break;
......@@ -175,7 +175,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
}
// commit and release nodes
commitNodes(signal, frag, req->errorCode == 0);
#ifdef VM_TRACE
if (debugFlags & DebugTree) {
printTree(signal, frag, debugOut);
......@@ -199,7 +198,7 @@ Dbtux::tupReadAttrs(Signal* signal, const Frag& frag, ReadPar& readPar)
req->requestInfo = 0;
req->tableId = frag.m_tableId;
req->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff);
req->fragPtrI = RNIL;
req->fragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
req->tupAddr = ent.m_tupAddr;
req->tupVersion = ent.m_tupVersion;
req->pageId = RNIL;
......@@ -246,7 +245,7 @@ Dbtux::tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar)
req->requestInfo = TupReadAttrs::ReadKeys;
req->tableId = frag.m_tableId;
req->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff);
req->fragPtrI = RNIL;
req->fragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
req->tupAddr = ent.m_tupAddr;
req->tupVersion = RNIL; // not used
req->pageId = RNIL;
......@@ -270,100 +269,3 @@ Dbtux::tupReadKeys(Signal* signal, const Frag& frag, ReadPar& readPar)
readPar.m_count = numKeys;
readPar.m_size = copyPar.m_numwords;
}
/*
* Operate on index node tuple in TUP. The data is copied between node
* cache and index storage via signal data.
*/
void
Dbtux::tupStoreTh(Signal* signal, const Frag& frag, NodeHandlePtr nodePtr, StorePar storePar)
{
const TreeHead& tree = frag.m_tree;
// define the direct signal
TupStoreTh* req = (TupStoreTh*)signal->getDataPtrSend();
req->errorCode = RNIL;
req->tableId = frag.m_indexId;
req->fragId = frag.m_fragId;
req->fragPtrI = RNIL;
req->tupAddr = nodePtr.p->m_addr;
req->tupVersion = 0;
req->pageId = nodePtr.p->m_loc.m_pageId;
req->pageOffset = nodePtr.p->m_loc.m_pageOffset;
req->bufferId = 0;
req->opCode = storePar.m_opCode;
ndbrequire(storePar.m_offset + storePar.m_size <= tree.m_nodeSize);
req->dataOffset = storePar.m_offset;
req->dataSize = storePar.m_size;
// the node cache
ndbrequire(nodePtr.p->m_node != 0);
// the buffer in signal data
Uint32* const buffer = (Uint32*)req + TupStoreTh::SignalLength;
// copy in data
switch (storePar.m_opCode) {
case TupStoreTh::OpRead:
jam();
#ifdef VM_TRACE
{
Uint32* dst = buffer + storePar.m_offset;
memset(dst, 0xa9, storePar.m_size << 2);
}
#endif
break;
case TupStoreTh::OpInsert:
jam();
// fallthru
case TupStoreTh::OpUpdate:
jam();
// copy from cache to signal data
{
Uint32* dst = buffer + storePar.m_offset;
const Uint32* src = (const Uint32*)nodePtr.p->m_node + storePar.m_offset;
memcpy(dst, src, storePar.m_size << 2);
}
break;
case TupStoreTh::OpDelete:
jam();
break;
default:
ndbrequire(false);
break;
}
// execute
EXECUTE_DIRECT(DBTUP, GSN_TUP_STORE_TH, signal, TupStoreTh::SignalLength);
jamEntry();
if (req->errorCode != 0) {
jam();
storePar.m_errorCode = req->errorCode;
return;
}
ndbrequire(req->errorCode == 0);
// copy out data
switch (storePar.m_opCode) {
case TupStoreTh::OpRead:
jam();
{
Uint32* dst = (Uint32*)nodePtr.p->m_node + storePar.m_offset;
const Uint32* src = (const Uint32*)buffer + storePar.m_offset;
memcpy(dst, src, storePar.m_size << 2);
}
// fallthru
case TupStoreTh::OpInsert:
jam();
// fallthru
case TupStoreTh::OpUpdate:
jam();
nodePtr.p->m_addr = req->tupAddr;
nodePtr.p->m_loc.m_pageId = req->pageId;
nodePtr.p->m_loc.m_pageOffset = req->pageOffset;
break;
case TupStoreTh::OpDelete:
jam();
nodePtr.p->m_addr = NullTupAddr;
nodePtr.p->m_loc.m_pageId = RNIL;
nodePtr.p->m_loc.m_pageOffset = 0;
break;
default:
ndbrequire(false);
break;
}
}
......@@ -85,6 +85,11 @@ Dbtux::execTUXFRAGREQ(Signal* signal)
fragPtr.p->m_fragOff = req->fragOff;
fragPtr.p->m_fragId = req->fragId;
fragPtr.p->m_numAttrs = req->noOfAttr;
fragPtr.p->m_tupIndexFragPtrI = req->tupIndexFragPtrI;
fragPtr.p->m_tupTableFragPtrI[0] = req->tupTableFragPtrI[0];
fragPtr.p->m_tupTableFragPtrI[1] = req->tupTableFragPtrI[1];
fragPtr.p->m_accTableFragPtrI[0] = req->accTableFragPtrI[0];
fragPtr.p->m_accTableFragPtrI[1] = req->accTableFragPtrI[1];
// add the fragment to the index
indexPtr.p->m_fragId[indexPtr.p->m_numFrags] = req->fragId;
indexPtr.p->m_fragPtrI[indexPtr.p->m_numFrags] = fragPtr.i;
......@@ -197,6 +202,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
jam();
// initialize tree header
TreeHead& tree = fragPtr.p->m_tree;
new (&tree) TreeHead();
// make these configurable later
tree.m_nodeSize = MAX_TTREE_NODE_SIZE;
tree.m_prefSize = MAX_TTREE_PREF_SIZE;
......@@ -222,8 +228,8 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
break;
}
tree.m_minOccup = tree.m_maxOccup - maxSlack;
// root node does not exist
tree.m_root = NullTupAddr;
// root node does not exist (also set by ctor)
tree.m_root = NullTupLoc;
// fragment is defined
c_fragOpPool.release(fragOpPtr);
}
......@@ -310,12 +316,6 @@ Dbtux::dropIndex(Signal* signal, IndexPtr indexPtr, Uint32 senderRef, Uint32 sen
unsigned i = --indexPtr.p->m_numFrags;
FragPtr fragPtr;
c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]);
Frag& frag = *fragPtr.p;
ndbrequire(frag.m_nodeList == RNIL);
if (frag.m_nodeFree != RNIL) {
c_nodeHandlePool.release(frag.m_nodeFree);
frag.m_nodeFree = RNIL;
}
c_fragPool.release(fragPtr);
// the real time break is not used for anything currently
signal->theData[0] = TuxContinueB::DropIndex;
......
......@@ -18,175 +18,107 @@
#include "Dbtux.hpp"
/*
* Node handles.
*
* We use the "cache" implementation. Node operations are done on
* cached copies. Index memory is updated at the end of the operation.
* At most one node is inserted and it is always pre-allocated.
*
* An alternative "pointer" implementation which writes directly into
* index memory is planned for later.
* Allocate index node in TUP.
*/
// Dbtux
void
Dbtux::seizeNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr)
{
if (! c_nodeHandlePool.seize(nodePtr)) {
jam();
return;
}
new (nodePtr.p) NodeHandle(*this, frag);
nodePtr.p->m_next = frag.m_nodeList;
frag.m_nodeList = nodePtr.i;
// node cache used always
nodePtr.p->m_node = (TreeNode*)nodePtr.p->m_cache;
new (nodePtr.p->m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
TreeNode* node = nodePtr.p->m_node;
memset(tree.getPref(node, 0), 0xa2, tree.m_prefSize << 2);
memset(tree.getPref(node, 1), 0xa2, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node);
memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
}
void
Dbtux::preallocNode(Signal* signal, Frag& frag, Uint32& errorCode)
int
Dbtux::allocNode(Signal* signal, NodeHandle& node)
{
ndbrequire(frag.m_nodeFree == RNIL);
NodeHandlePtr nodePtr;
seizeNode(signal, frag, nodePtr);
ndbrequire(nodePtr.i != RNIL);
// remove from cache XXX ugly
frag.m_nodeFree = frag.m_nodeList;
frag.m_nodeList = nodePtr.p->m_next;
StorePar storePar;
storePar.m_opCode = TupStoreTh::OpInsert;
storePar.m_offset = 0;
storePar.m_size = 0;
tupStoreTh(signal, frag, nodePtr, storePar);
if (storePar.m_errorCode != 0) {
jam();
errorCode = storePar.m_errorCode;
c_nodeHandlePool.release(nodePtr);
frag.m_nodeFree = RNIL;
}
Frag& frag = node.m_frag;
Uint32 pageId = NullTupLoc.m_pageId;
Uint32 pageOffset = NullTupLoc.m_pageOffset;
Uint32* node32 = 0;
int errorCode = c_tup->tuxAllocNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
if (errorCode == 0) {
jam();
node.m_loc = TupLoc(pageId, pageOffset);
node.m_node = reinterpret_cast<TreeNode*>(node32);
node.m_acc = AccNone;
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
}
return errorCode;
}
/*
* Find node in the cache. XXX too slow, use direct links instead
* Access more of the node.
*/
void
Dbtux::findNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr)
Dbtux::accessNode(Signal* signal, NodeHandle& node, AccSize acc)
{
NodeHandlePtr tmpPtr;
tmpPtr.i = frag.m_nodeList;
while (tmpPtr.i != RNIL) {
jam();
c_nodeHandlePool.getPtr(tmpPtr);
if (tmpPtr.p->m_addr == addr) {
jam();
nodePtr = tmpPtr;
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
if (node.m_acc >= acc)
return;
}
tmpPtr.i = tmpPtr.p->m_next;
}
nodePtr.i = RNIL;
nodePtr.p = 0;
// XXX could do prefetch
node.m_acc = acc;
}
/*
* Get handle for existing node.
* Set handle to point to existing node.
*/
void
Dbtux::selectNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, TupAddr addr, AccSize acc)
Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc)
{
ndbrequire(addr != NullTupAddr && acc > AccNone);
NodeHandlePtr tmpPtr;
// search in cache
findNode(signal, frag, tmpPtr, addr);
if (tmpPtr.i == RNIL) {
jam();
// add new node
seizeNode(signal, frag, tmpPtr);
ndbrequire(tmpPtr.i != RNIL);
tmpPtr.p->m_addr = addr;
}
if (tmpPtr.p->m_acc < acc) {
jam();
accessNode(signal, frag, tmpPtr, acc);
}
nodePtr = tmpPtr;
Frag& frag = node.m_frag;
ndbrequire(loc != NullTupLoc);
Uint32 pageId = loc.m_pageId;
Uint32 pageOffset = loc.m_pageOffset;
Uint32* node32 = 0;
c_tup->tuxGetNode(frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
node.m_loc = loc;
node.m_node = reinterpret_cast<TreeNode*>(node32);
node.m_acc = AccNone;
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
accessNode(signal, node, acc);
}
/*
* Create new node in the cache and mark it for insert.
* Set handle to point to new node. Uses the pre-allocated node.
*/
void
Dbtux::insertNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc)
Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc)
{
ndbrequire(acc > AccNone);
NodeHandlePtr tmpPtr;
// use the pre-allocated node
tmpPtr.i = frag.m_nodeFree;
frag.m_nodeFree = RNIL;
c_nodeHandlePool.getPtr(tmpPtr);
// move it to the cache
tmpPtr.p->m_next = frag.m_nodeList;
frag.m_nodeList = tmpPtr.i;
tmpPtr.p->m_acc = acc;
tmpPtr.p->m_flags |= NodeHandle::DoInsert;
nodePtr = tmpPtr;
}
/*
* Mark existing node for deletion.
*/
void
Dbtux::deleteNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr)
{
NodeHandlePtr tmpPtr = nodePtr;
ndbrequire(tmpPtr.p->getOccup() == 0);
tmpPtr.p->m_flags |= NodeHandle::DoDelete;
// scans have already been moved by popDown or popUp
Frag& frag = node.m_frag;
TupLoc loc = frag.m_freeLoc;
frag.m_freeLoc = NullTupLoc;
selectNode(signal, node, loc, acc);
new (node.m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
memset(tree.getPref(node.m_node, 0), 0xa2, tree.m_prefSize << 2);
memset(tree.getPref(node.m_node, 1), 0xa2, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node.m_node);
memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
}
/*
* Access more of the node.
* Delete existing node.
*/
void
Dbtux::accessNode(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, AccSize acc)
Dbtux::deleteNode(Signal* signal, NodeHandle& node)
{
TreeHead& tree = frag.m_tree;
NodeHandlePtr tmpPtr = nodePtr;
if (tmpPtr.p->m_acc >= acc)
return;
if (! (tmpPtr.p->m_flags & NodeHandle::DoInsert)) {
jam();
StorePar storePar;
storePar.m_opCode = TupStoreTh::OpRead;
storePar.m_offset = tree.getSize(tmpPtr.p->m_acc);
storePar.m_size = tree.getSize(acc) - tree.getSize(tmpPtr.p->m_acc);
tmpPtr.p->m_tux.tupStoreTh(signal, frag, tmpPtr, storePar);
ndbrequire(storePar.m_errorCode == 0);
}
tmpPtr.p->m_acc = acc;
Frag& frag = node.m_frag;
ndbrequire(node.getOccup() == 0);
TupLoc loc = node.m_loc;
Uint32 pageId = loc.m_pageId;
Uint32 pageOffset = loc.m_pageOffset;
Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node);
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
// invalidate handle and storage
node.m_loc = NullTupLoc;
node.m_node = 0;
}
/*
* Set prefix.
*/
void
Dbtux::setNodePref(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i)
Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i)
{
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
NodeHandlePtr tmpPtr = nodePtr;
ReadPar readPar;
ndbrequire(i <= 1);
readPar.m_ent = tmpPtr.p->getMinMax(i);
readPar.m_ent = node.getMinMax(i);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
// leave in signal data
......@@ -198,61 +130,11 @@ Dbtux::setNodePref(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned
copyPar.m_items = readPar.m_count;
copyPar.m_headers = true;
copyPar.m_maxwords = tree.m_prefSize;
Data pref = tmpPtr.p->getPref(i);
Data pref = node.getPref(i);
copyAttrs(pref, readPar.m_data, copyPar);
nodePtr.p->m_flags |= NodeHandle::DoUpdate;
}
/*
* Commit and release nodes at the end of an operation. Used also on
* error since no changes have been made (updateOk false).
*/
void
Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk)
{
TreeHead& tree = frag.m_tree;
NodeHandlePtr nodePtr;
nodePtr.i = frag.m_nodeList;
frag.m_nodeList = RNIL;
while (nodePtr.i != RNIL) {
c_nodeHandlePool.getPtr(nodePtr);
const unsigned flags = nodePtr.p->m_flags;
if (flags & NodeHandle::DoDelete) {
jam();
ndbrequire(updateOk);
// delete
StorePar storePar;
storePar.m_opCode = TupStoreTh::OpDelete;
nodePtr.p->m_tux.tupStoreTh(signal, frag, nodePtr, storePar);
ndbrequire(storePar.m_errorCode == 0);
} else if (flags & NodeHandle::DoUpdate) {
jam();
ndbrequire(updateOk);
// set prefixes
if (flags & (1 << 0)) {
jam();
setNodePref(signal, frag, nodePtr, 0);
}
if (flags & (1 << 1)) {
jam();
setNodePref(signal, frag, nodePtr, 1);
}
// update
StorePar storePar;
storePar.m_opCode = TupStoreTh::OpUpdate;
storePar.m_offset = 0;
storePar.m_size = tree.getSize(nodePtr.p->m_acc);
nodePtr.p->m_tux.tupStoreTh(signal, frag, nodePtr, storePar);
ndbrequire(storePar.m_errorCode == 0);
}
// release
NodeHandlePtr tmpPtr = nodePtr;
nodePtr.i = nodePtr.p->m_next;
c_nodeHandlePool.release(tmpPtr);
}
}
// Dbtux::NodeHandle
// node operations
/*
* Add entry at position. Move entries greater than or equal to the old
......@@ -264,25 +146,26 @@ Dbtux::commitNodes(Signal* signal, Frag& frag, bool updateOk)
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*/
void
Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent)
{
TreeHead& tree = m_frag.m_tree;
const unsigned occup = getOccup();
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup < tree.m_maxOccup && pos <= occup);
// fix scans
ScanOpPtr scanPtr;
scanPtr.i = getNodeScan();
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
m_tux.c_scanOpPool.getPtr(scanPtr);
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup);
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
if (scanPos.m_pos >= pos) {
jam();
#ifdef VM_TRACE
if (m_tux.debugFlags & m_tux.DebugScan) {
m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
m_tux.debugOut << "At pushUp pos=" << pos << " " << *this << endl;
if (debugFlags & DebugScan) {
debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pushUp pos=" << pos << " " << node << endl;
}
#endif
scanPos.m_pos++;
......@@ -290,7 +173,7 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
TreeEnt* const entList = tree.getEntList(m_node);
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
......@@ -298,18 +181,18 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
tmpList[i] = tmpList[i - 1];
}
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// fix prefixes
if (occup == 0 || pos == 0)
m_flags |= (1 << 0);
setNodePref(signal, node, 0);
if (occup == 0 || pos == occup)
m_flags |= (1 << 1);
entList[0] = entList[occup + 1];
setOccup(occup + 1);
m_flags |= DoUpdate;
setNodePref(signal, node, 1);
}
/*
* Remove and return entry at position. Move entries greater than the
* removed one to the left. This is the opposite of pushUp.
* removed one to the left. This is the opposite of nodePushUp.
*
* D
* ^ ^
......@@ -317,46 +200,47 @@ Dbtux::NodeHandle::pushUp(Signal* signal, unsigned pos, const TreeEnt& ent)
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*/
void
Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
{
TreeHead& tree = m_frag.m_tree;
const unsigned occup = getOccup();
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr;
// move scans whose entry disappears
scanPtr.i = getNodeScan();
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
m_tux.c_scanOpPool.getPtr(scanPtr);
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup);
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (m_tux.debugFlags & m_tux.DebugScan) {
m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
m_tux.debugOut << "At popDown pos=" << pos << " " << *this << endl;
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popDown pos=" << pos << " " << node << endl;
}
#endif
m_tux.scanNext(signal, scanPtr);
scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
}
// fix other scans
scanPtr.i = getNodeScan();
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
m_tux.c_scanOpPool.getPtr(scanPtr);
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup);
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos > pos) {
jam();
#ifdef VM_TRACE
if (m_tux.debugFlags & m_tux.DebugScan) {
m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
m_tux.debugOut << "At popDown pos=" << pos << " " << *this << endl;
if (debugFlags & DebugScan) {
debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popDown pos=" << pos << " " << node << endl;
}
#endif
scanPos.m_pos--;
......@@ -364,7 +248,7 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
TreeEnt* const entList = tree.getEntList(m_node);
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
ent = tmpList[pos];
......@@ -372,13 +256,13 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
jam();
tmpList[i] = tmpList[i + 1];
}
entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefixes
if (occup != 1 && pos == 0)
m_flags |= (1 << 0);
setNodePref(signal, node, 0);
if (occup != 1 && pos == occup - 1)
m_flags |= (1 << 1);
entList[0] = entList[occup - 1];
setOccup(occup - 1);
m_flags |= DoUpdate;
setNodePref(signal, node, 1);
}
/*
......@@ -391,47 +275,48 @@ Dbtux::NodeHandle::popDown(Signal* signal, unsigned pos, TreeEnt& ent)
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*/
void
Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
{
TreeHead& tree = m_frag.m_tree;
const unsigned occup = getOccup();
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr;
// move scans whose entry disappears
scanPtr.i = getNodeScan();
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
m_tux.c_scanOpPool.getPtr(scanPtr);
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup);
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == 0) {
jam();
#ifdef VM_TRACE
if (m_tux.debugFlags & m_tux.DebugScan) {
m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
m_tux.debugOut << "At pushDown pos=" << pos << " " << *this << endl;
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pushDown pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
m_tux.scanNext(signal, scanPtr);
scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
}
// fix other scans
scanPtr.i = getNodeScan();
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
m_tux.c_scanOpPool.getPtr(scanPtr);
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup);
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
ndbrequire(scanPos.m_pos != 0);
if (scanPos.m_pos <= pos) {
jam();
#ifdef VM_TRACE
if (m_tux.debugFlags & m_tux.DebugScan) {
m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
m_tux.debugOut << "At pushDown pos=" << pos << " " << *this << endl;
if (debugFlags & DebugScan) {
debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pushDown pos=" << pos << " " << node << endl;
}
#endif
scanPos.m_pos--;
......@@ -439,7 +324,7 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
TreeEnt* const entList = tree.getEntList(m_node);
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt oldMin = tmpList[0];
......@@ -449,18 +334,18 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
}
tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefixes
if (true)
m_flags |= (1 << 0);
setNodePref(signal, node, 0);
if (occup == 1 || pos == occup - 1)
m_flags |= (1 << 1);
entList[0] = entList[occup];
m_flags |= DoUpdate;
setNodePref(signal, node, 1);
}
/*
* Remove and return entry at position. Move entries less than the
* removed one to the right. Replace min entry by the input entry.
* This is the opposite of pushDown.
* This is the opposite of nodePushDown.
*
* X D
* v ^ ^
......@@ -468,47 +353,48 @@ Dbtux::NodeHandle::pushDown(Signal* signal, unsigned pos, TreeEnt& ent)
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*/
void
Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
{
TreeHead& tree = m_frag.m_tree;
const unsigned occup = getOccup();
Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr;
// move scans whose entry disappears
scanPtr.i = getNodeScan();
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
m_tux.c_scanOpPool.getPtr(scanPtr);
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup);
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (m_tux.debugFlags & m_tux.DebugScan) {
m_tux.debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
m_tux.debugOut << "At popUp pos=" << pos << " " << *this << endl;
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popUp pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
m_tux.scanNext(signal, scanPtr);
scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
}
// fix other scans
scanPtr.i = getNodeScan();
scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) {
jam();
m_tux.c_scanOpPool.getPtr(scanPtr);
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_addr == m_addr && scanPos.m_pos < occup);
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos < pos) {
jam();
#ifdef VM_TRACE
if (m_tux.debugFlags & m_tux.DebugScan) {
m_tux.debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
m_tux.debugOut << "At popUp pos=" << pos << " " << *this << endl;
if (debugFlags & DebugScan) {
debugOut << "Fix scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popUp pos=" << pos << " " << node << endl;
}
#endif
scanPos.m_pos++;
......@@ -516,7 +402,7 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
scanPtr.i = scanPtr.p->m_nodeScan;
}
// fix node
TreeEnt* const entList = tree.getEntList(m_node);
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
......@@ -526,12 +412,12 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
tmpList[i] = tmpList[i - 1];
}
tmpList[0] = newMin;
entList[0] = entList[occup];
// fix prefixes
if (true)
m_flags |= (1 << 0);
setNodePref(signal, node, 0);
if (occup == 1 || pos == occup - 1)
m_flags |= (1 << 1);
entList[0] = entList[occup];
m_flags |= DoUpdate;
setNodePref(signal, node, 1);
}
/*
......@@ -539,14 +425,15 @@ Dbtux::NodeHandle::popUp(Signal* signal, unsigned pos, TreeEnt& ent)
* after the max (i=1). XXX can be optimized
*/
void
Dbtux::NodeHandle::slide(Signal* signal, NodeHandlePtr nodePtr, unsigned i)
Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i)
{
Frag& frag = dstNode.m_frag;
TreeHead& tree = frag.m_tree;
ndbrequire(i <= 1);
TreeHead& tree = m_frag.m_tree;
while (getOccup() < tree.m_maxOccup && nodePtr.p->getOccup() != 0) {
while (dstNode.getOccup() < tree.m_maxOccup && srcNode.getOccup() != 0) {
TreeEnt ent;
nodePtr.p->popDown(signal, i == 0 ? nodePtr.p->getOccup() - 1 : 0, ent);
pushUp(signal, i == 0 ? 0 : getOccup(), ent);
nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent);
nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent);
}
}
......@@ -555,50 +442,50 @@ Dbtux::NodeHandle::slide(Signal* signal, NodeHandlePtr nodePtr, unsigned i)
* ordering does not matter.
*/
void
Dbtux::NodeHandle::linkScan(Dbtux::ScanOpPtr scanPtr)
Dbtux::linkScan(NodeHandle& node, ScanOpPtr scanPtr)
{
#ifdef VM_TRACE
if (m_tux.debugFlags & m_tux.DebugScan) {
m_tux.debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl;
m_tux.debugOut << "To node " << *this << endl;
if (debugFlags & DebugScan) {
debugOut << "Link scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "To node " << node << endl;
}
#endif
ndbrequire(! islinkScan(scanPtr) && scanPtr.p->m_nodeScan == RNIL);
scanPtr.p->m_nodeScan = getNodeScan();
setNodeScan(scanPtr.i);
ndbrequire(! islinkScan(node, scanPtr) && scanPtr.p->m_nodeScan == RNIL);
scanPtr.p->m_nodeScan = node.getNodeScan();
node.setNodeScan(scanPtr.i);
}
/*
* Unlink a scan from the list under the node.
*/
void
Dbtux::NodeHandle::unlinkScan(Dbtux::ScanOpPtr scanPtr)
Dbtux::unlinkScan(NodeHandle& node, ScanOpPtr scanPtr)
{
#ifdef VM_TRACE
if (m_tux.debugFlags & m_tux.DebugScan) {
m_tux.debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl;
m_tux.debugOut << "From node " << *this << endl;
if (debugFlags & DebugScan) {
debugOut << "Unlink scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "From node " << node << endl;
}
#endif
Dbtux::ScanOpPtr currPtr;
currPtr.i = getNodeScan();
Dbtux::ScanOpPtr prevPtr;
ScanOpPtr currPtr;
currPtr.i = node.getNodeScan();
ScanOpPtr prevPtr;
prevPtr.i = RNIL;
while (true) {
jam();
m_tux.c_scanOpPool.getPtr(currPtr);
c_scanOpPool.getPtr(currPtr);
Uint32 nextPtrI = currPtr.p->m_nodeScan;
if (currPtr.i == scanPtr.i) {
jam();
if (prevPtr.i == RNIL) {
setNodeScan(nextPtrI);
node.setNodeScan(nextPtrI);
} else {
jam();
prevPtr.p->m_nodeScan = nextPtrI;
}
scanPtr.p->m_nodeScan = RNIL;
// check for duplicates
ndbrequire(! islinkScan(scanPtr));
ndbrequire(! islinkScan(node, scanPtr));
return;
}
prevPtr = currPtr;
......@@ -610,13 +497,13 @@ Dbtux::NodeHandle::unlinkScan(Dbtux::ScanOpPtr scanPtr)
* Check if a scan is linked to this node. Only for ndbrequire.
*/
bool
Dbtux::NodeHandle::islinkScan(Dbtux::ScanOpPtr scanPtr)
Dbtux::islinkScan(NodeHandle& node, ScanOpPtr scanPtr)
{
Dbtux::ScanOpPtr currPtr;
currPtr.i = getNodeScan();
ScanOpPtr currPtr;
currPtr.i = node.getNodeScan();
while (currPtr.i != RNIL) {
jam();
m_tux.c_scanOpPool.getPtr(currPtr);
c_scanOpPool.getPtr(currPtr);
if (currPtr.i == scanPtr.i) {
jam();
return true;
......@@ -627,7 +514,7 @@ Dbtux::NodeHandle::islinkScan(Dbtux::ScanOpPtr scanPtr)
}
void
Dbtux::NodeHandle::progError(int line, int cause, const char* extra)
Dbtux::NodeHandle::progError(int line, int cause, const char* file)
{
m_tux.progError(line, cause, extra);
ErrorReporter::handleAssert("Dbtux::NodeHandle: assert failed", file, line);
}
......@@ -42,12 +42,11 @@ Dbtux::execACC_SCANREQ(Signal* signal)
}
ndbrequire(fragPtr.i != RNIL);
Frag& frag = *fragPtr.p;
ndbrequire(frag.m_nodeList == RNIL);
// must be normal DIH/TC fragment
ndbrequire(frag.m_fragId < (1 << frag.m_fragOff));
TreeHead& tree = frag.m_tree;
// check for empty fragment
if (tree.m_root == NullTupAddr) {
if (tree.m_root == NullTupLoc) {
jam();
AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
conf->scanPtr = req->senderData;
......@@ -241,7 +240,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl;
}
#endif
ndbrequire(frag.m_nodeList == RNIL);
// handle unlock previous and close scan
switch (req->scanFlag) {
case NextScanReq::ZSCAN_NEXT:
......@@ -275,13 +273,13 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
case NextScanReq::ZSCAN_CLOSE:
jam();
// unlink from tree node first to avoid state changes
if (scan.m_scanPos.m_addr != NullTupAddr) {
if (scan.m_scanPos.m_loc != NullTupLoc) {
jam();
const TupAddr addr = scan.m_scanPos.m_addr;
NodeHandlePtr nodePtr;
selectNode(signal, frag, nodePtr, addr, AccHead);
nodePtr.p->unlinkScan(scanPtr);
scan.m_scanPos.m_addr = NullTupAddr;
const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandle node(frag);
selectNode(signal, node, loc, AccHead);
unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc;
}
if (scan.m_lockwait) {
jam();
......@@ -295,7 +293,6 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_state = ScanOp::Aborting;
commitNodes(signal, frag, true);
return;
}
if (scan.m_state == ScanOp::Locked) {
......@@ -350,7 +347,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry();
commitNodes(signal, frag, true);
return; // stop
}
if (scan.m_lockwait) {
......@@ -365,7 +361,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
// if TC has ordered scan close, it will be detected here
sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
commitNodes(signal, frag, true);
return; // stop
}
if (scan.m_state == ScanOp::First) {
......@@ -407,8 +402,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
lockReq->userRef = reference();
lockReq->tableId = scan.m_tableId;
lockReq->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff);
// should cache this at fragment create
lockReq->fragPtrI = RNIL;
lockReq->fragPtrI = frag.m_accTableFragPtrI[ent.m_fragBit];
const Uint32* const buf32 = static_cast<Uint32*>(keyPar.m_data);
const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32);
lockReq->hashValue = md5_hash(buf64, keyPar.m_size);
......@@ -445,7 +439,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry();
commitNodes(signal, frag, true);
return; // stop
break;
case AccLockReq::Refused:
......@@ -458,7 +451,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry();
commitNodes(signal, frag, true);
return; // stop
break;
case AccLockReq::NoFreeOp:
......@@ -471,7 +463,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
signal->theData[1] = true;
EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
jamEntry();
commitNodes(signal, frag, true);
return; // stop
break;
default:
......@@ -555,7 +546,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
scan.m_lastEnt = ent;
// next time look for next entry
scan.m_state = ScanOp::Next;
commitNodes(signal, frag, true);
return;
}
// XXX in ACC this is checked before req->checkLcpStop
......@@ -569,7 +559,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
unsigned signalLength = 3;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
commitNodes(signal, frag, true);
return;
}
ndbrequire(false);
......@@ -700,15 +689,15 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
TreeHead& tree = frag.m_tree;
if (tree.m_root == NullTupAddr) {
if (tree.m_root == NullTupLoc) {
// tree may have become empty
jam();
scan.m_state = ScanOp::Last;
return;
}
TreePos pos;
pos.m_addr = tree.m_root;
NodeHandlePtr nodePtr;
pos.m_loc = tree.m_root;
NodeHandle node(frag);
// unpack lower bound
const ScanBound& bound = *scan.m_bound[0];
ScanBoundIterator iter;
......@@ -725,20 +714,20 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
boundPar.m_dir = 0;
loop: {
jam();
selectNode(signal, frag, nodePtr, pos.m_addr, AccPref);
const unsigned occup = nodePtr.p->getOccup();
selectNode(signal, node, pos.m_loc, AccPref);
const unsigned occup = node.getOccup();
ndbrequire(occup != 0);
for (unsigned i = 0; i <= 1; i++) {
jam();
// compare prefix
boundPar.m_data2 = nodePtr.p->getPref(i);
boundPar.m_data2 = node.getPref(i);
boundPar.m_len2 = tree.m_prefSize;
int ret = cmpScanBound(frag, boundPar);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read full value
ReadPar readPar;
readPar.m_ent = nodePtr.p->getMinMax(i);
readPar.m_ent = node.getMinMax(i);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
......@@ -751,11 +740,11 @@ loop: {
}
if (i == 0 && ret < 0) {
jam();
const TupAddr tupAddr = nodePtr.p->getLink(i);
if (tupAddr != NullTupAddr) {
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
pos.m_addr = tupAddr;
pos.m_loc = loc;
goto loop;
}
// start scanning this node
......@@ -764,34 +753,34 @@ loop: {
pos.m_dir = 3;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
nodePtr.p->linkScan(scanPtr);
linkScan(node, scanPtr);
return;
}
if (i == 1 && ret > 0) {
jam();
const TupAddr tupAddr = nodePtr.p->getLink(i);
if (tupAddr != NullTupAddr) {
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to right subtree
pos.m_addr = tupAddr;
pos.m_loc = loc;
goto loop;
}
// start scanning upwards
pos.m_dir = 1;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
nodePtr.p->linkScan(scanPtr);
linkScan(node, scanPtr);
return;
}
}
// read rest of current node
accessNode(signal, frag, nodePtr, AccFull);
accessNode(signal, node, AccFull);
// look for first entry
ndbrequire(occup >= 2);
for (unsigned j = 1; j < occup; j++) {
jam();
ReadPar readPar;
readPar.m_ent = nodePtr.p->getEnt(j);
readPar.m_ent = node.getEnt(j);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
......@@ -809,7 +798,7 @@ loop: {
pos.m_dir = 3;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
nodePtr.p->linkScan(scanPtr);
linkScan(node, scanPtr);
return;
}
}
......@@ -869,31 +858,31 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
// use copy of position
TreePos pos = scan.m_scanPos;
// get and remember original node
NodeHandlePtr origNodePtr;
selectNode(signal, frag, origNodePtr, pos.m_addr, AccHead);
ndbrequire(origNodePtr.p->islinkScan(scanPtr));
NodeHandle origNode(frag);
selectNode(signal, origNode, pos.m_loc, AccHead);
ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop
NodeHandlePtr nodePtr = origNodePtr;
NodeHandle node = origNode;
while (true) {
jam();
if (pos.m_dir == 2) {
// coming up from root ends the scan
jam();
pos.m_addr = NullTupAddr;
pos.m_loc = NullTupLoc;
scan.m_state = ScanOp::Last;
break;
}
if (nodePtr.p->m_addr != pos.m_addr) {
if (node.m_loc != pos.m_loc) {
jam();
selectNode(signal, frag, nodePtr, pos.m_addr, AccHead);
selectNode(signal, node, pos.m_loc, AccHead);
}
if (pos.m_dir == 4) {
// coming down from parent proceed to left child
jam();
TupAddr addr = nodePtr.p->getLink(0);
if (addr != NullTupAddr) {
TupLoc loc = node.getLink(0);
if (loc != NullTupLoc) {
jam();
pos.m_addr = addr;
pos.m_loc = loc;
pos.m_dir = 4; // unchanged
continue;
}
......@@ -910,10 +899,10 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (pos.m_dir == 3) {
// within node
jam();
unsigned occup = nodePtr.p->getOccup();
unsigned occup = node.getOccup();
ndbrequire(occup >= 1);
// access full node
accessNode(signal, frag, nodePtr, AccFull);
accessNode(signal, node, AccFull);
// advance position
if (! pos.m_match)
pos.m_match = true;
......@@ -921,7 +910,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
pos.m_pos++;
if (pos.m_pos < occup) {
jam();
pos.m_ent = nodePtr.p->getEnt(pos.m_pos);
pos.m_ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged
// XXX implement prefix optimization
ReadPar readPar;
......@@ -938,7 +927,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (ret < 0) {
jam();
// hit upper bound of single range scan
pos.m_addr = NullTupAddr;
pos.m_loc = NullTupLoc;
scan.m_state = ScanOp::Last;
break;
}
......@@ -952,10 +941,10 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break;
}
// after node proceed to right child
TupAddr addr = nodePtr.p->getLink(1);
if (addr != NullTupAddr) {
TupLoc loc = node.getLink(1);
if (loc != NullTupLoc) {
jam();
pos.m_addr = addr;
pos.m_loc = loc;
pos.m_dir = 4;
continue;
}
......@@ -965,8 +954,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if (pos.m_dir == 1) {
// coming from right child proceed to parent
jam();
pos.m_addr = nodePtr.p->getLink(2);
pos.m_dir = nodePtr.p->getSide();
pos.m_loc = node.getLink(2);
pos.m_dir = node.getSide();
continue;
}
ndbrequire(false);
......@@ -975,16 +964,16 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_scanPos = pos;
// relink
if (scan.m_state == ScanOp::Current) {
ndbrequire(pos.m_addr == nodePtr.p->m_addr);
if (origNodePtr.i != nodePtr.i) {
ndbrequire(pos.m_loc == node.m_loc);
if (origNode.m_loc != node.m_loc) {
jam();
origNodePtr.p->unlinkScan(scanPtr);
nodePtr.p->linkScan(scanPtr);
unlinkScan(origNode, scanPtr);
linkScan(node, scanPtr);
}
} else if (scan.m_state == ScanOp::Last) {
jam();
ndbrequire(pos.m_addr == NullTupAddr);
origNodePtr.p->unlinkScan(scanPtr);
ndbrequire(pos.m_loc == NullTupLoc);
unlinkScan(origNode, scanPtr);
} else {
ndbrequire(false);
}
......@@ -1044,7 +1033,6 @@ void
Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
{
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
// unlock all not unlocked by LQH
for (unsigned i = 0; i < MaxAccLockOps; i++) {
......@@ -1069,7 +1057,6 @@ Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
releaseScanOp(scanPtr);
commitNodes(signal, frag, true);
}
void
......
......@@ -30,19 +30,19 @@ Dbtux::treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& tree
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
treePos.m_addr = tree.m_root;
NodeHandlePtr nodePtr;
if (treePos.m_addr == NullTupAddr) {
treePos.m_loc = tree.m_root;
if (treePos.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_pos = 0;
treePos.m_match = false;
return;
}
NodeHandle node(frag);
loop: {
jam();
selectNode(signal, frag, nodePtr, treePos.m_addr, AccPref);
const unsigned occup = nodePtr.p->getOccup();
selectNode(signal, node, treePos.m_loc, AccPref);
const unsigned occup = node.getOccup();
ndbrequire(occup != 0);
// number of equal initial attributes in bounding node
unsigned numEq = ZNIL;
......@@ -51,7 +51,7 @@ loop: {
// compare prefix
CmpPar cmpPar;
cmpPar.m_data1 = searchPar.m_data;
cmpPar.m_data2 = nodePtr.p->getPref(i);
cmpPar.m_data2 = node.getPref(i);
cmpPar.m_len2 = tree.m_prefSize;
cmpPar.m_first = 0;
cmpPar.m_numEq = 0;
......@@ -60,7 +60,7 @@ loop: {
jam();
// read full value
ReadPar readPar;
readPar.m_ent = nodePtr.p->getMinMax(i);
readPar.m_ent = node.getMinMax(i);
ndbrequire(cmpPar.m_numEq < numAttrs);
readPar.m_first = cmpPar.m_numEq;
readPar.m_count = numAttrs - cmpPar.m_numEq;
......@@ -78,19 +78,18 @@ loop: {
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchPar.m_ent.cmp(nodePtr.p->getMinMax(i));
ret = searchPar.m_ent.cmp(node.getMinMax(i));
}
if (i == 0 ? (ret < 0) : (ret > 0)) {
jam();
const TupAddr tupAddr = nodePtr.p->getLink(i);
if (tupAddr != NullTupAddr) {
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to left/right subtree
treePos.m_addr = tupAddr;
treePos.m_loc = loc;
goto loop;
}
// position is immediately before/after this node
// XXX disallow second case
treePos.m_pos = (i == 0 ? 0 : occup);
treePos.m_match = false;
return;
......@@ -103,8 +102,8 @@ loop: {
return;
}
}
// read rest of the bounding node
accessNode(signal, frag, nodePtr, AccFull);
// access rest of the bounding node
accessNode(signal, node, AccFull);
// position is strictly within the node
ndbrequire(occup >= 2);
const unsigned numWithin = occup - 2;
......@@ -115,7 +114,7 @@ loop: {
if (numEq < numAttrs) {
jam();
ReadPar readPar;
readPar.m_ent = nodePtr.p->getEnt(j);
readPar.m_ent = node.getEnt(j);
readPar.m_first = numEq;
readPar.m_count = numAttrs - numEq;
readPar.m_data = 0; // leave in signal data
......@@ -132,7 +131,7 @@ loop: {
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchPar.m_ent.cmp(nodePtr.p->getEnt(j));
ret = searchPar.m_ent.cmp(node.getEnt(j));
}
if (ret <= 0) {
jam();
......@@ -157,94 +156,94 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
{
TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos;
NodeHandlePtr nodePtr;
NodeHandle node(frag);
// check for empty tree
if (treePos.m_addr == NullTupAddr) {
if (treePos.m_loc == NullTupLoc) {
jam();
insertNode(signal, frag, nodePtr, AccPref);
nodePtr.p->pushUp(signal, 0, ent);
nodePtr.p->setSide(2);
tree.m_root = nodePtr.p->m_addr;
insertNode(signal, node, AccPref);
nodePushUp(signal, node, 0, ent);
node.setSide(2);
tree.m_root = node.m_loc;
return;
}
// access full node
selectNode(signal, frag, nodePtr, treePos.m_addr, AccFull);
selectNode(signal, node, treePos.m_loc, AccFull);
// check if it is bounding node
if (pos != 0 && pos != nodePtr.p->getOccup()) {
if (pos != 0 && pos != node.getOccup()) {
jam();
// check if room for one more
if (nodePtr.p->getOccup() < tree.m_maxOccup) {
if (node.getOccup() < tree.m_maxOccup) {
jam();
nodePtr.p->pushUp(signal, pos, ent);
nodePushUp(signal, node, pos, ent);
return;
}
// returns min entry
nodePtr.p->pushDown(signal, pos - 1, ent);
nodePushDown(signal, node, pos - 1, ent);
// find position to add the removed min entry
TupAddr childAddr = nodePtr.p->getLink(0);
if (childAddr == NullTupAddr) {
TupLoc childLoc = node.getLink(0);
if (childLoc == NullTupLoc) {
jam();
// left child will be added
pos = 0;
} else {
jam();
// find glb node
while (childAddr != NullTupAddr) {
while (childLoc != NullTupLoc) {
jam();
selectNode(signal, frag, nodePtr, childAddr, AccHead);
childAddr = nodePtr.p->getLink(1);
selectNode(signal, node, childLoc, AccHead);
childLoc = node.getLink(1);
}
// access full node again
accessNode(signal, frag, nodePtr, AccFull);
pos = nodePtr.p->getOccup();
accessNode(signal, node, AccFull);
pos = node.getOccup();
}
// fall thru to next case
}
// adding new min or max
unsigned i = (pos == 0 ? 0 : 1);
ndbrequire(nodePtr.p->getLink(i) == NullTupAddr);
ndbrequire(node.getLink(i) == NullTupLoc);
// check if the half-leaf/leaf has room for one more
if (nodePtr.p->getOccup() < tree.m_maxOccup) {
if (node.getOccup() < tree.m_maxOccup) {
jam();
nodePtr.p->pushUp(signal, pos, ent);
nodePushUp(signal, node, pos, ent);
return;
}
// add a new node
NodeHandlePtr childPtr;
insertNode(signal, frag, childPtr, AccPref);
childPtr.p->pushUp(signal, 0, ent);
NodeHandle childNode(frag);
insertNode(signal, childNode, AccPref);
nodePushUp(signal, childNode, 0, ent);
// connect parent and child
nodePtr.p->setLink(i, childPtr.p->m_addr);
childPtr.p->setLink(2, nodePtr.p->m_addr);
childPtr.p->setSide(i);
node.setLink(i, childNode.m_loc);
childNode.setLink(2, node.m_loc);
childNode.setSide(i);
// re-balance tree at each node
while (true) {
// height of subtree i has increased by 1
int j = (i == 0 ? -1 : +1);
int b = nodePtr.p->getBalance();
int b = node.getBalance();
if (b == 0) {
// perfectly balanced
jam();
nodePtr.p->setBalance(j);
node.setBalance(j);
// height change propagates up
} else if (b == -j) {
// height of shorter subtree increased
jam();
nodePtr.p->setBalance(0);
node.setBalance(0);
// height of tree did not change - done
break;
} else if (b == j) {
// height of longer subtree increased
jam();
NodeHandlePtr childPtr;
selectNode(signal, frag, childPtr, nodePtr.p->getLink(i), AccHead);
int b2 = childPtr.p->getBalance();
NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(i), AccHead);
int b2 = childNode.getBalance();
if (b2 == b) {
jam();
treeRotateSingle(signal, frag, nodePtr, i);
treeRotateSingle(signal, frag, node, i);
} else if (b2 == -b) {
jam();
treeRotateDouble(signal, frag, nodePtr, i);
treeRotateDouble(signal, frag, node, i);
} else {
// height of subtree increased so it cannot be perfectly balanced
ndbrequire(false);
......@@ -254,14 +253,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
} else {
ndbrequire(false);
}
TupAddr parentAddr = nodePtr.p->getLink(2);
if (parentAddr == NullTupAddr) {
TupLoc parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) {
jam();
// root node - done
break;
}
i = nodePtr.p->getSide();
selectNode(signal, frag, nodePtr, parentAddr, AccHead);
i = node.getSide();
selectNode(signal, node, parentLoc, AccHead);
}
}
......@@ -273,101 +272,101 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
{
TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos;
NodeHandlePtr nodePtr;
NodeHandle node(frag);
// access full node
selectNode(signal, frag, nodePtr, treePos.m_addr, AccFull);
selectNode(signal, node, treePos.m_loc, AccFull);
TreeEnt ent;
// check interior node first
if (nodePtr.p->getChilds() == 2) {
if (node.getChilds() == 2) {
jam();
ndbrequire(nodePtr.p->getOccup() >= tree.m_minOccup);
ndbrequire(node.getOccup() >= tree.m_minOccup);
// check if no underflow
if (nodePtr.p->getOccup() > tree.m_minOccup) {
if (node.getOccup() > tree.m_minOccup) {
jam();
nodePtr.p->popDown(signal, pos, ent);
nodePopDown(signal, node, pos, ent);
return;
}
// save current handle
NodeHandlePtr parentPtr = nodePtr;
NodeHandle parentNode = node;
// find glb node
TupAddr childAddr = nodePtr.p->getLink(0);
while (childAddr != NullTupAddr) {
TupLoc childLoc = node.getLink(0);
while (childLoc != NullTupLoc) {
jam();
selectNode(signal, frag, nodePtr, childAddr, AccHead);
childAddr = nodePtr.p->getLink(1);
selectNode(signal, node, childLoc, AccHead);
childLoc = node.getLink(1);
}
// access full node again
accessNode(signal, frag, nodePtr, AccFull);
accessNode(signal, node, AccFull);
// use glb max as new parent min
ent = nodePtr.p->getEnt(nodePtr.p->getOccup() - 1);
parentPtr.p->popUp(signal, pos, ent);
ent = node.getEnt(node.getOccup() - 1);
nodePopUp(signal, parentNode, pos, ent);
// set up to remove glb max
pos = nodePtr.p->getOccup() - 1;
pos = node.getOccup() - 1;
// fall thru to next case
}
// remove the element
nodePtr.p->popDown(signal, pos, ent);
ndbrequire(nodePtr.p->getChilds() <= 1);
nodePopDown(signal, node, pos, ent);
ndbrequire(node.getChilds() <= 1);
// handle half-leaf
for (unsigned i = 0; i <= 1; i++) {
jam();
TupAddr childAddr = nodePtr.p->getLink(i);
if (childAddr != NullTupAddr) {
TupLoc childLoc = node.getLink(i);
if (childLoc != NullTupLoc) {
// move to child
selectNode(signal, frag, nodePtr, childAddr, AccFull);
selectNode(signal, node, childLoc, AccFull);
// balance of half-leaf parent requires child to be leaf
break;
}
}
ndbrequire(nodePtr.p->getChilds() == 0);
ndbrequire(node.getChilds() == 0);
// get parent if any
TupAddr parentAddr = nodePtr.p->getLink(2);
NodeHandlePtr parentPtr;
unsigned i = nodePtr.p->getSide();
TupLoc parentLoc = node.getLink(2);
NodeHandle parentNode(frag);
unsigned i = node.getSide();
// move all that fits into parent
if (parentAddr != NullTupAddr) {
if (parentLoc != NullTupLoc) {
jam();
selectNode(signal, frag, parentPtr, nodePtr.p->getLink(2), AccFull);
parentPtr.p->slide(signal, nodePtr, i);
selectNode(signal, parentNode, node.getLink(2), AccFull);
nodeSlide(signal, parentNode, node, i);
// fall thru to next case
}
// non-empty leaf
if (nodePtr.p->getOccup() >= 1) {
if (node.getOccup() >= 1) {
jam();
return;
}
// remove empty leaf
deleteNode(signal, frag, nodePtr);
if (parentAddr == NullTupAddr) {
deleteNode(signal, node);
if (parentLoc == NullTupLoc) {
jam();
// tree is now empty
tree.m_root = NullTupAddr;
tree.m_root = NullTupLoc;
return;
}
nodePtr = parentPtr;
nodePtr.p->setLink(i, NullTupAddr);
node = parentNode;
node.setLink(i, NullTupLoc);
#ifdef dbtux_min_occup_less_max_occup
// check if we created a half-leaf
if (nodePtr.p->getBalance() == 0) {
if (node.getBalance() == 0) {
jam();
// move entries from the other child
TupAddr childAddr = nodePtr.p->getLink(1 - i);
NodeHandlePtr childPtr;
selectNode(signal, frag, childPtr, childAddr, AccFull);
nodePtr.p->slide(signal, childPtr, 1 - i);
if (childPtr.p->getOccup() == 0) {
jam();
deleteNode(signal, frag, childPtr);
nodePtr.p->setLink(1 - i, NullTupAddr);
TupLoc childLoc = node.getLink(1 - i);
NodeHandle childNode(frag);
selectNode(signal, childNode, childLoc, AccFull);
nodeSlide(signal, node, childNode, 1 - i);
if (childNode.getOccup() == 0) {
jam();
deleteNode(signal, childNode);
node.setLink(1 - i, NullTupLoc);
// we are balanced again but our parent balance changes by -1
parentAddr = nodePtr.p->getLink(2);
if (parentAddr == NullTupAddr) {
parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) {
jam();
return;
}
// fix side and become parent
i = nodePtr.p->getSide();
selectNode(signal, frag, nodePtr, parentAddr, AccHead);
i = node.getSide();
selectNode(signal, node, parentLoc, AccHead);
}
}
#endif
......@@ -375,50 +374,50 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
while (true) {
// height of subtree i has decreased by 1
int j = (i == 0 ? -1 : +1);
int b = nodePtr.p->getBalance();
int b = node.getBalance();
if (b == 0) {
// perfectly balanced
jam();
nodePtr.p->setBalance(-j);
node.setBalance(-j);
// height of tree did not change - done
return;
} else if (b == j) {
// height of longer subtree has decreased
jam();
nodePtr.p->setBalance(0);
node.setBalance(0);
// height change propagates up
} else if (b == -j) {
// height of shorter subtree has decreased
jam();
NodeHandlePtr childPtr;
// child on the other side
selectNode(signal, frag, childPtr, nodePtr.p->getLink(1 - i), AccHead);
int b2 = childPtr.p->getBalance();
NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(1 - i), AccHead);
int b2 = childNode.getBalance();
if (b2 == b) {
jam();
treeRotateSingle(signal, frag, nodePtr, 1 - i);
treeRotateSingle(signal, frag, node, 1 - i);
// height of tree decreased and propagates up
} else if (b2 == -b) {
jam();
treeRotateDouble(signal, frag, nodePtr, 1 - i);
treeRotateDouble(signal, frag, node, 1 - i);
// height of tree decreased and propagates up
} else {
jam();
treeRotateSingle(signal, frag, nodePtr, 1 - i);
treeRotateSingle(signal, frag, node, 1 - i);
// height of tree did not change - done
return;
}
} else {
ndbrequire(false);
}
TupAddr parentAddr = nodePtr.p->getLink(2);
if (parentAddr == NullTupAddr) {
TupLoc parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) {
jam();
// root node - done
return;
}
i = nodePtr.p->getSide();
selectNode(signal, frag, nodePtr, parentAddr, AccHead);
i = node.getSide();
selectNode(signal, node, parentLoc, AccHead);
}
}
......@@ -441,55 +440,55 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
void
Dbtux::treeRotateSingle(Signal* signal,
Frag& frag,
NodeHandlePtr& nodePtr,
NodeHandle& node,
unsigned i)
{
ndbrequire(i <= 1);
/*
5 is the old top node that have been unbalanced due to an insert or
delete. The balance is still the old balance before the update.
Verify that n5Bal is 1 if RR rotate and -1 if LL rotate.
Verify that bal5 is 1 if RR rotate and -1 if LL rotate.
*/
NodeHandlePtr n5Ptr = nodePtr;
const TupAddr n5Addr = n5Ptr.p->m_addr;
const int n5Bal = n5Ptr.p->getBalance();
const int n5side = n5Ptr.p->getSide();
ndbrequire(n5Bal + (1 - i) == i);
NodeHandle node5 = node;
const TupLoc loc5 = node5.m_loc;
const int bal5 = node5.getBalance();
const int side5 = node5.getSide();
ndbrequire(bal5 + (1 - i) == i);
/*
3 is the new root of this part of the tree which is to swap place with
node 5. For an insert to cause this it must have the same balance as 5.
For deletes it can have the balance 0.
*/
TupAddr n3Addr = n5Ptr.p->getLink(i);
NodeHandlePtr n3Ptr;
selectNode(signal, frag, n3Ptr, n3Addr, AccHead);
const int n3Bal = n3Ptr.p->getBalance();
TupLoc loc3 = node5.getLink(i);
NodeHandle node3(frag);
selectNode(signal, node3, loc3, AccHead);
const int bal3 = node3.getBalance();
/*
2 must always be there but is not changed. Thus we mereley check that it
exists.
*/
ndbrequire(n3Ptr.p->getLink(i) != NullTupAddr);
ndbrequire(node3.getLink(i) != NullTupLoc);
/*
4 is not necessarily there but if it is there it will move from one
side of 3 to the other side of 5. For LL it moves from the right side
to the left side and for RR it moves from the left side to the right
side. This means that it also changes parent from 3 to 5.
*/
TupAddr n4Addr = n3Ptr.p->getLink(1 - i);
NodeHandlePtr n4Ptr;
if (n4Addr != NullTupAddr) {
jam();
selectNode(signal, frag, n4Ptr, n4Addr, AccHead);
ndbrequire(n4Ptr.p->getSide() == (1 - i) &&
n4Ptr.p->getLink(2) == n3Addr);
n4Ptr.p->setSide(i);
n4Ptr.p->setLink(2, n5Addr);
TupLoc loc4 = node3.getLink(1 - i);
NodeHandle node4(frag);
if (loc4 != NullTupLoc) {
jam();
selectNode(signal, node4, loc4, AccHead);
ndbrequire(node4.getSide() == (1 - i) &&
node4.getLink(2) == loc3);
node4.setSide(i);
node4.setLink(2, loc5);
}//if
/*
Retrieve the address of 5's parent before it is destroyed
*/
TupAddr n0Addr = n5Ptr.p->getLink(2);
TupLoc loc0 = node5.getLink(2);
/*
The next step is to perform the rotation. 3 will inherit 5's parent
......@@ -503,22 +502,22 @@ Dbtux::treeRotateSingle(Signal* signal,
1. 3 must have had 5 as parent before the change.
2. 3's side is left for LL and right for RR before change.
*/
ndbrequire(n3Ptr.p->getLink(2) == n5Addr);
ndbrequire(n3Ptr.p->getSide() == i);
n3Ptr.p->setLink(1 - i, n5Addr);
n3Ptr.p->setLink(2, n0Addr);
n3Ptr.p->setSide(n5side);
n5Ptr.p->setLink(i, n4Addr);
n5Ptr.p->setLink(2, n3Addr);
n5Ptr.p->setSide(1 - i);
if (n0Addr != NullTupAddr) {
jam();
NodeHandlePtr n0Ptr;
selectNode(signal, frag, n0Ptr, n0Addr, AccHead);
n0Ptr.p->setLink(n5side, n3Addr);
ndbrequire(node3.getLink(2) == loc5);
ndbrequire(node3.getSide() == i);
node3.setLink(1 - i, loc5);
node3.setLink(2, loc0);
node3.setSide(side5);
node5.setLink(i, loc4);
node5.setLink(2, loc3);
node5.setSide(1 - i);
if (loc0 != NullTupLoc) {
jam();
NodeHandle node0(frag);
selectNode(signal, node0, loc0, AccHead);
node0.setLink(side5, loc3);
} else {
jam();
frag.m_tree.m_root = n3Addr;
frag.m_tree.m_root = loc3;
}//if
/* The final step of the change is to update the balance of 3 and
5 that changed places. There are two cases here. The first case is
......@@ -531,22 +530,22 @@ Dbtux::treeRotateSingle(Signal* signal,
In this case 5 will change balance but still be unbalanced and 3 will
be unbalanced in the opposite direction of 5.
*/
if (n3Bal == n5Bal) {
if (bal3 == bal5) {
jam();
n3Ptr.p->setBalance(0);
n5Ptr.p->setBalance(0);
} else if (n3Bal == 0) {
node3.setBalance(0);
node5.setBalance(0);
} else if (bal3 == 0) {
jam();
n3Ptr.p->setBalance(-n5Bal);
n5Ptr.p->setBalance(n5Bal);
node3.setBalance(-bal5);
node5.setBalance(bal5);
} else {
ndbrequire(false);
}//if
/*
Set nodePtr to 3 as return parameter for enabling caller to continue
Set node to 3 as return parameter for enabling caller to continue
traversing the tree.
*/
nodePtr = n3Ptr;
node = node3;
}
/*
......@@ -651,105 +650,105 @@ Dbtux::treeRotateSingle(Signal* signal,
*
*/
void
Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandlePtr& nodePtr, unsigned i)
Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i)
{
// old top node
NodeHandlePtr n6Ptr = nodePtr;
const TupAddr n6Addr = n6Ptr.p->m_addr;
NodeHandle node6 = node;
const TupLoc loc6 = node6.m_loc;
// the un-updated balance
const int n6Bal = n6Ptr.p->getBalance();
const unsigned n6Side = n6Ptr.p->getSide();
const int bal6 = node6.getBalance();
const unsigned side6 = node6.getSide();
// level 1
TupAddr n2Addr = n6Ptr.p->getLink(i);
NodeHandlePtr n2Ptr;
selectNode(signal, frag, n2Ptr, n2Addr, AccHead);
const int n2Bal = n2Ptr.p->getBalance();
TupLoc loc2 = node6.getLink(i);
NodeHandle node2(frag);
selectNode(signal, node2, loc2, AccHead);
const int bal2 = node2.getBalance();
// level 2
TupAddr n4Addr = n2Ptr.p->getLink(1 - i);
NodeHandlePtr n4Ptr;
selectNode(signal, frag, n4Ptr, n4Addr, AccHead);
const int n4Bal = n4Ptr.p->getBalance();
TupLoc loc4 = node2.getLink(1 - i);
NodeHandle node4(frag);
selectNode(signal, node4, loc4, AccHead);
const int bal4 = node4.getBalance();
ndbrequire(i <= 1);
ndbrequire(n6Bal + (1 - i) == i);
ndbrequire(n2Bal == -n6Bal);
ndbrequire(n2Ptr.p->getLink(2) == n6Addr);
ndbrequire(n2Ptr.p->getSide() == i);
ndbrequire(n4Ptr.p->getLink(2) == n2Addr);
ndbrequire(bal6 + (1 - i) == i);
ndbrequire(bal2 == -bal6);
ndbrequire(node2.getLink(2) == loc6);
ndbrequire(node2.getSide() == i);
ndbrequire(node4.getLink(2) == loc2);
// level 3
TupAddr n3Addr = n4Ptr.p->getLink(i);
TupAddr n5Addr = n4Ptr.p->getLink(1 - i);
TupLoc loc3 = node4.getLink(i);
TupLoc loc5 = node4.getLink(1 - i);
// fill up leaf before it becomes internal
if (n3Addr == NullTupAddr && n5Addr == NullTupAddr) {
if (loc3 == NullTupLoc && loc5 == NullTupLoc) {
jam();
TreeHead& tree = frag.m_tree;
accessNode(signal, frag, n2Ptr, AccFull);
accessNode(signal, frag, n4Ptr, AccFull);
n4Ptr.p->slide(signal, n2Ptr, i);
accessNode(signal, node2, AccFull);
accessNode(signal, node4, AccFull);
nodeSlide(signal, node4, node2, i);
// implied by rule of merging half-leaves with leaves
ndbrequire(n4Ptr.p->getOccup() >= tree.m_minOccup);
ndbrequire(n2Ptr.p->getOccup() != 0);
ndbrequire(node4.getOccup() >= tree.m_minOccup);
ndbrequire(node2.getOccup() != 0);
} else {
if (n3Addr != NullTupAddr) {
if (loc3 != NullTupLoc) {
jam();
NodeHandlePtr n3Ptr;
selectNode(signal, frag, n3Ptr, n3Addr, AccHead);
n3Ptr.p->setLink(2, n2Addr);
n3Ptr.p->setSide(1 - i);
NodeHandle node3(frag);
selectNode(signal, node3, loc3, AccHead);
node3.setLink(2, loc2);
node3.setSide(1 - i);
}
if (n5Addr != NullTupAddr) {
if (loc5 != NullTupLoc) {
jam();
NodeHandlePtr n5Ptr;
selectNode(signal, frag, n5Ptr, n5Addr, AccHead);
n5Ptr.p->setLink(2, n6Ptr.p->m_addr);
n5Ptr.p->setSide(i);
NodeHandle node5(frag);
selectNode(signal, node5, loc5, AccHead);
node5.setLink(2, node6.m_loc);
node5.setSide(i);
}
}
// parent
TupAddr n0Addr = n6Ptr.p->getLink(2);
NodeHandlePtr n0Ptr;
TupLoc loc0 = node6.getLink(2);
NodeHandle node0(frag);
// perform the rotation
n6Ptr.p->setLink(i, n5Addr);
n6Ptr.p->setLink(2, n4Addr);
n6Ptr.p->setSide(1 - i);
node6.setLink(i, loc5);
node6.setLink(2, loc4);
node6.setSide(1 - i);
n2Ptr.p->setLink(1 - i, n3Addr);
n2Ptr.p->setLink(2, n4Addr);
node2.setLink(1 - i, loc3);
node2.setLink(2, loc4);
n4Ptr.p->setLink(i, n2Addr);
n4Ptr.p->setLink(1 - i, n6Addr);
n4Ptr.p->setLink(2, n0Addr);
n4Ptr.p->setSide(n6Side);
node4.setLink(i, loc2);
node4.setLink(1 - i, loc6);
node4.setLink(2, loc0);
node4.setSide(side6);
if (n0Addr != NullTupAddr) {
if (loc0 != NullTupLoc) {
jam();
selectNode(signal, frag, n0Ptr, n0Addr, AccHead);
n0Ptr.p->setLink(n6Side, n4Addr);
selectNode(signal, node0, loc0, AccHead);
node0.setLink(side6, loc4);
} else {
jam();
frag.m_tree.m_root = n4Addr;
frag.m_tree.m_root = loc4;
}
// set balance of changed nodes
n4Ptr.p->setBalance(0);
if (n4Bal == 0) {
node4.setBalance(0);
if (bal4 == 0) {
jam();
n2Ptr.p->setBalance(0);
n6Ptr.p->setBalance(0);
} else if (n4Bal == -n2Bal) {
node2.setBalance(0);
node6.setBalance(0);
} else if (bal4 == -bal2) {
jam();
n2Ptr.p->setBalance(0);
n6Ptr.p->setBalance(n2Bal);
} else if (n4Bal == n2Bal) {
node2.setBalance(0);
node6.setBalance(bal2);
} else if (bal4 == bal2) {
jam();
n2Ptr.p->setBalance(-n2Bal);
n6Ptr.p->setBalance(0);
node2.setBalance(-bal2);
node6.setBalance(0);
} else {
ndbrequire(false);
}
// new top node
nodePtr = n4Ptr;
node = node4;
}
......@@ -10,6 +10,8 @@ libdbtux_a_SOURCES = \
DbtuxCmp.cpp \
DbtuxDebug.cpp
INCLUDES_LOC = -I$(top_srcdir)/ndb/src/kernel/blocks/dbtup
include $(top_srcdir)/ndb/config/common.mk.am
include $(top_srcdir)/ndb/config/type_kernel.mk.am
......
index maintenance overhead
==========================
"mc02" 2x1700 MHz linux-2.4.9 gcc-2.96 -O3 one db-node
case a: index on Unsigned
testOIBasic -case u -table 1 -index 1 -fragtype small -threads 10 -rows 100000 -subloop 1 -nologging
case b: index on Varchar(5) + Varchar(5) + Varchar(20) + Unsigned
testOIBasic -case u -table 2 -index 4 -fragtype small -threads 10 -rows 100000 -subloop 1 -nologging
1 million rows, pk update without index, pk update with index
shows ms / 1000 rows for each and pct overhead
the figures are based on single run on idle machine
040616 mc02/a 40 ms 87 ms 114 pct
mc02/b 51 ms 128 ms 148 pct
optim 1 mc02/a 38 ms 85 ms 124 pct
mc02/b 51 ms 123 ms 140 pct
optim 2 mc02/a 41 ms 80 ms 96 pct
mc02/b 51 ms 117 ms 128 pct
optim 3 mc02/a 43 ms 80 ms 85 pct
mc02/b 54 ms 118 ms 117 pct
optim 4 mc02/a 42 ms 80 ms 87 pct
mc02/b 51 ms 119 ms 129 pct
optim 5 mc02/a 43 ms 77 ms 77 pct
mc02/b 54 ms 118 ms 117 pct
optim 6 mc02/a 42 ms 70 ms 66 pct
mc02/b 53 ms 109 ms 105 pct
vim: set et:
......@@ -525,7 +525,7 @@ void Qmgr::execCM_REGREQ(Signal* signal)
cmRegConf->dynamicId = TdynId;
c_clusterNodes.copyto(NdbNodeBitmask::Size, cmRegConf->allNdbNodes);
sendSignal(Tblockref, GSN_CM_REGCONF, signal,
CmRegConf::SignalLength, JBB);
CmRegConf::SignalLength, JBA);
DEBUG_START(GSN_CM_REGCONF, refToNode(Tblockref), "");
/**
......@@ -847,7 +847,7 @@ void Qmgr::execCM_NODEINFOCONF(Signal* signal)
nodePtr.i = getOwnNodeId();
ptrAss(nodePtr, nodeRec);
ndbrequire(nodePtr.p->phase == ZSTARTING);
ndbrequire(c_start.m_gsn = GSN_CM_NODEINFOREQ);
ndbrequire(c_start.m_gsn == GSN_CM_NODEINFOREQ);
c_start.m_nodes.clearWaitingFor(nodeId);
/**
......@@ -1019,7 +1019,7 @@ void Qmgr::execCM_ADD(Signal* signal)
ndbrequire(addNodePtr.i == nodePtr.i);
switch(type){
case CmAdd::Prepare:
ndbrequire(c_start.m_gsn = GSN_CM_NODEINFOREQ);
ndbrequire(c_start.m_gsn == GSN_CM_NODEINFOREQ);
/**
* Wait for CM_NODEINFO_CONF
*/
......
......@@ -39,6 +39,7 @@ struct Opt {
NdbDictionary::Object::FragmentType m_fragtype;
const char* m_index;
unsigned m_loop;
bool m_nologging;
unsigned m_rows;
unsigned m_scanrd;
unsigned m_scanex;
......@@ -54,6 +55,7 @@ struct Opt {
m_fragtype(NdbDictionary::Object::FragUndefined),
m_index(0),
m_loop(1),
m_nologging(false),
m_rows(1000),
m_scanrd(240),
m_scanex(240),
......@@ -82,6 +84,7 @@ printhelp()
<< " -fragtype T fragment type single/small/medium/large" << endl
<< " -index xyz only given index numbers (digits 1-9)" << endl
<< " -loop N loop count full suite forever=0 [" << d.m_loop << "]" << endl
<< " -nologging create tables in no-logging mode" << endl
<< " -rows N rows per thread [" << d.m_rows << "]" << endl
<< " -scanrd N scan read parallelism [" << d.m_scanrd << "]" << endl
<< " -scanex N scan exclusive parallelism [" << d.m_scanex << "]" << endl
......@@ -476,7 +479,7 @@ tt1 = {
"TT1", 5, tt1col, 4, tt1itab
};
// tt2 + tt2x1 tt2x2 tt2x3
// tt2 + tt2x1 tt2x2 tt2x3 tt2x4
static const Col
tt2col[] = {
......@@ -505,6 +508,14 @@ tt2x3col[] = {
{ 1, tt2col[4] }
};
static const ICol
tt2x4col[] = {
{ 0, tt2col[4] },
{ 1, tt2col[3] },
{ 2, tt2col[2] },
{ 3, tt2col[1] }
};
static const ITab
tt2x1 = {
"TT2X1", 2, tt2x1col
......@@ -520,16 +531,22 @@ tt2x3 = {
"TT2X3", 2, tt2x3col
};
static const ITab
tt2x4 = {
"TT2X4", 4, tt2x4col
};
static const ITab
tt2itab[] = {
tt2x1,
tt2x2,
tt2x3
tt2x3,
tt2x4
};
static const Tab
tt2 = {
"TT2", 5, tt2col, 3, tt2itab
"TT2", 5, tt2col, 4, tt2itab
};
// all tables
......@@ -823,6 +840,9 @@ createtable(Par par)
if (par.m_fragtype != NdbDictionary::Object::FragUndefined) {
t.setFragmentType(par.m_fragtype);
}
if (par.m_nologging) {
t.setLogging(false);
}
for (unsigned k = 0; k < tab.m_cols; k++) {
const Col& col = tab.m_col[k];
NdbDictionary::Column c(col.m_name);
......@@ -2202,7 +2222,6 @@ pkupdateindexbuild(Par par)
{
if (par.m_no == 0) {
CHK(createindex(par) == 0);
CHK(invalidateindex(par) == 0);
} else {
CHK(pkupdate(par) == 0);
}
......@@ -2493,6 +2512,7 @@ tbusybuild(Par par)
RUNSTEP(par, pkinsert, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, MT);
RUNSTEP(par, dropindex, ST);
}
......@@ -2500,9 +2520,28 @@ tbusybuild(Par par)
}
static int
ttiming(Par par)
ttimebuild(Par par)
{
Tmr t1;
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) {
RUNSTEP(par, pkinsert, MT);
t1.on();
RUNSTEP(par, createindex, ST);
t1.off(par.m_totrows);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, dropindex, ST);
}
LL1("build index - " << t1.time());
return 0;
}
static int
ttimemaint(Par par)
{
Tmr t0, t1, t2;
Tmr t1, t2;
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
......@@ -2511,16 +2550,13 @@ ttiming(Par par)
t1.on();
RUNSTEP(par, pkupdate, MT);
t1.off(par.m_totrows);
t0.on();
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
t0.off(par.m_totrows);
t2.on();
RUNSTEP(par, pkupdate, MT);
t2.off(par.m_totrows);
RUNSTEP(par, dropindex, ST);
}
LL1("build index - " << t0.time());
LL1("update - " << t1.time());
LL1("update indexed - " << t2.time());
LL1("overhead - " << t2.over(t1));
......@@ -2551,7 +2587,8 @@ tcaselist[] = {
TCase("b", tpkops, "pk operations and scan reads"),
TCase("c", tmixedops, "pk operations and scan operations"),
TCase("d", tbusybuild, "pk operations and index build"),
TCase("t", ttiming, "time index build and maintenance"),
TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"),
TCase("z", tdrop, "drop test tables")
};
......@@ -2689,6 +2726,10 @@ NDB_COMMAND(testOIBasic, "testOIBasic", "testOIBasic", "testOIBasic", 65535)
continue;
}
}
if (strcmp(arg, "-nologging") == 0) {
g_opt.m_nologging = true;
continue;
}
if (strcmp(arg, "-rows") == 0) {
if (++argv, --argc > 0) {
g_opt.m_rows = atoi(argv[0]);
......
#!/bin/sh
# NAME
# make-html-reports.sh
#
# SYNOPSIS
# make-html-reports.sh [-q] [ -R <YYYY-MM-DD> ] [ -s <src dir> ] [ -d <dst dir> ] [ -c <conf dir> ]
#
# DESCRIPTION
#
# OPTIONS
#
# EXAMPLES
#
#
# ENVIRONMENT
# NDB_PROJ_HOME Home dir for ndb
#
# FILES
# $NDB_PROJ_HOME/lib/funcs.sh general shell script functions
#
#
# SEE ALSO
#
# DIAGNOSTICTS
#
# VERSION
# 1.0
#
# AUTHOR
# Jonas Oreland
#
progname=`basename $0`
synopsis="make-html-reports.sh [ -R <YYYY-MM-DD> ] [ -s <src dir> ] [ -d <dst dir> ] [ -c <conf dir> ]"
: ${NDB_PROJ_HOME:?} # If undefined, exit with error message
: ${NDB_LOCAL_BUILD_OPTIONS:=--} # If undef, set to --. Keeps getopts happy.
# You may have to experiment a bit
# to get quoting right (if you need it).
. $NDB_PROJ_HOME/lib/funcs.sh # Load some good stuff
# defaults for options related variables
#
src_dir=`pwd`
dst_dir=`pwd`
conf_dir=`pwd`
report_date=`date '+%Y-%m-%d'`
uniq_id=$$.$$
verbose=yes
# used if error when parsing the options environment variable
#
env_opterr="options environment variable: <<$options>>"
# Option parsing, for the options variable as well as the command line.
#
# We want to be able to set options in an environment variable,
# as well as on the command line. In order not to have to repeat
# the same getopts information twice, we loop two times over the
# getopts while loop. The first time, we process options from
# the options environment variable, the second time we process
# options from the command line.
#
# The things to change are the actual options and what they do.
#
#
for optstring in "$options" "" # 1. options variable 2. cmd line
do
while getopts q:s:R:d:c: i $optstring # optstring empty => no arg => cmd line
do
case $i in
src_dir=$1
run=$2
date=$3
src_file=$src_dir/report.txt
q) verbose="";; # echo important things
d) dst_dir=$OPTARG;; # Destination directory
s) src_dir=$OPTARG;; # Destination directory
c) conf_dir=$OPTARG;; #
R) report_date=$OPTARG;; #
\?) syndie $env_opterr;; # print synopsis and exit
esac
done
[ -n "$optstring" ] && OPTIND=1 # Reset for round 2, cmdline options
env_opterr= # Round 2 should not use the value
done
shift `expr $OPTIND - 1`
src_dir=`abspath $src_dir`
dst_dir=`abspath $dst_dir`
conf_dir=`abspath $conf_dir`
if [ ! -f $src_dir/report.txt ]
then
echo "$src_dir/report.txt is missing"
exit 1
fi
###
#
# General html functions
trim(){
echo $*
}
header(){
cat <<EOF
<html><head><title>$*</title></head>
......@@ -166,64 +83,7 @@ hr(){
EOF
}
# --- option parsing done ---
# -- Verify
trace "Verifying arguments"
summary_file=$src_dir/reports/summary.$report_date
if [ ! -r $summary_file ]
then
syndie "Invalid src directory or report date: $summary_file not found"
fi
if [ ! -d $conf_dir/configurations ]
then
syndie "Invalid src directory: $conf_dir/configurations not found"
fi
if [ ! -d $conf_dir/testcases ]
then
syndie "Invalid src directory: $conf_dir/testcases not found"
fi
if [ ! -d $dst_dir ]
then
syndie "Invalid dst dir..."
fi
# --- option verifying done ---
trace "src_dir: $src_dir"
trace "dst_dir: $dst_dir"
trace "conf_dir: $conf_dir"
trace "report date: $report_date"
###
config_spec(){
cat <<EOF
<a href=#$1>$1</a>
EOF
}
config_spec_include(){
# Print the $1 file to the file we are generating
cat <<EOF
<a name=$1><pre>
EOF
if [ -r $conf_dir/configurations/$1 ]
then
cat -E $conf_dir/configurations/$1 | sed 's/\$/<BR>/g'
else
cat <<EOF
Config spec $1 not found
EOF
fi
cat <<EOF
</pre></a>
EOF
}
time_spec(){
# $1 - secs
_ts_tmp=$1
......@@ -232,8 +92,14 @@ time_spec(){
_ts_tmp=`expr $_ts_tmp / 60`
_ts_m=`expr $_ts_tmp % 60`
if [ $_ts_tmp -ge 60 ]
then
_ts_tmp=`expr $_ts_tmp / 60`
else
_ts_tmp=0
fi
a=3
_ts_h=$_ts_tmp
if [ $_ts_h -gt 0 ]
......@@ -247,191 +113,77 @@ time_spec(){
echo $ret
}
log_spec(){
_ff_=$src_dir/log/$report_date/$1.$2/test.$3.out
if [ -r $_ff_ ] && [ -s $_ff_ ]
then
_f2_=$dst_dir/log.$report_date.$1.$2.$3.out.gz
if [ -r $_f2_ ]
then
rm $_f2_
fi
cp $_ff_ $dst_dir/log.$report_date.$1.$2.$3.out
gzip $dst_dir/log.$report_date.$1.$2.$3.out
rm -f $dst_dir/log.$report_date.$1.$2.$3.out
echo "<a href=log.$report_date.$1.$2.$3.out.gz>Log file</a>"
else
echo "-"
fi
}
### Main
err_spec(){
_ff_=$src_dir/log/$report_date/$1.$2/test.$3.err.tar
if [ -r $_ff_ ] && [ -s $_ff_ ]
then
cp $_ff_ $dst_dir/err.$report_date.$1.$2.$3.err.tar
gzip $dst_dir/err.$report_date.$1.$2.$3.err.tar
rm -f $dst_dir/err.$report_date.$1.$2.$3.err.tar
echo "<a href=err.$report_date.$1.$2.$3.err.tar.gz>Error tarball</a>"
else
echo "-"
fi
}
report_file=$src_dir/report.html
summary_file=$src_dir/summary.html
command_spec(){
echo $* | sed 's/;/<BR>/g'
}
passed=0
failed=0
total=0
### Main
pass(){
passed=`expr $passed + 1`
}
html_summary_file=$dst_dir/summary.$report_date.html
fail(){
failed=`expr $failed + 1`
}
trace "Creating summary"
(
eval `grep "TOTAL" $summary_file | awk -F";" '{ printf("test_file=\"%s\"; elapsed=\"%s\"; started=\"%s\"; stopped=\"%s\"", $2, $3, $4, $5); }'`
header "Autotest summary $report_date"
heading 1 "Autotest summary $report_date"
table
row ; column `bold test file: `; column $test_file ; end_row
row ; column `bold Started:` ; column "$started "; end_row
row ; column `bold Stopped:` ; column "$stopped "; end_row
row ; column `bold Elapsed:` ; column "`time_spec $elapsed secs`" ; end_row
end_table
hr
header Report $run $date
table "border=1"
row
c_column `bold Report`
c_column `bold Tag`
c_column `bold Version`
c_column `bold Distr-Config`
c_column `bold Db-Config`
c_column `bold Type`
c_column `bold Test file`
c_column `bold Make`
c_column `bold Config`
c_column `bold Test time`
c_column `bold Passed`
c_column `bold Failed`
column `bold Test case`
column `bold Result`
column `bold Elapsed`
column `bold Log`
end_row
) > $report_file
grep -v "^#" $summary_file | grep -v TOTAL | sed 's/;/ /g' | \
while read tag version config template type test_file make_res make_time conf_res conf_time test_time passed failed
do
row
if [ -r $src_dir/reports/report.$tag.$version.$config.$template.$type.$test_file.$report_date ]
then
column "<a href=\"report.$tag.$version.$config.$template.$type.$test_file.$report_date.html\">report</a>"
else
column "-"
fi
column $tag
column $version
column $config
column $template
column $type
column $test_file
column "$make_res(`time_spec $make_time`)"
column "$conf_res(`time_spec $conf_time`)"
c_column "`time_spec $test_time`"
c_column `bold $passed`
c_column `bold $failed`
end_row
done
end_table
footer
) > $html_summary_file
for i in $src_dir/reports/report.*.$report_date
cat $src_file | while read line
do
f=`basename $i`
trace "Creating report: $f"
eval `echo $f | awk -F"." '{printf("tag=%s;version=%s;config=%s;template=%s;type=%s;test_file=%s", $2, $3, $4, $5, $6, $7);}'`
eval `echo $line | awk -F";" '{ printf("prg=\"%s\"; no=\"%s\"; res=\"%s\"; time=\"%s\"", $1, $2, $3, $4); }'`
prg=`trim $prg`
no=`trim $no`
res=`trim $res`
time=`trim $time`
res_dir="<a href=\"result-$run/$date/result.$no/\">log</a>"
ts=`time_spec $time`
res_txt=""
case $res in
0) pass; res_txt="PASSED"; res_dir="&nbsp;";;
*) fail; res_txt="FAILED";;
esac
total=`expr $total + $time`
(
header "Autotest report $report_date"
heading 1 "Autotest report $report_date"
table #"border=1"
row ; column `bold Tag:`; column $tag ; end_row
row ; column `bold Version:` ; column $version ; end_row
row ; column `bold Configuration:` ; column `config_spec $config`; end_row
row ; column `bold Template:` ; column `config_spec $template`; end_row
row ; column `bold Type:` ; column $type ; end_row
row ; column `bold Test file:` ; column $test_file; end_row
end_table
hr
table "border=1"
row
c_column `bold Test case`
c_column `bold Result`
c_column `bold Test time`
c_column `bold Logfile`
c_column `bold Error tarfile`
column $prg
column $res_txt
column $ts
column $res_dir
end_row
) >> $report_file
grep -v "^#" $i | sed 's/;/ /g' | \
while read test_no test_res test_time cmd
do
(
row
column "`command_spec $cmd`"
case "$test_res" in
0)
column "PASSED";;
1001)
column "API error";;
1002)
column "Max time expired";;
1003)
column "Mgm port busy";;
*)
column "Unknown: $test_res";;
esac
column "`time_spec $test_time`"
column "`log_spec $tag $version $test_no`"
column "`err_spec $tag $version $test_no`"
column $run
column $date
column $passed
column $failed
column `time_spec $total`
column "<a href=\"result-$run/$date/report.html\">report</a>"
column "<a href=\"result-$run/$date/log.txt\">log.txt</a>"
end_row
done
end_table
# Last on page we include spec
# of used machines and template for config
# for future reference
hr
table "border=1"
row; column `bold Configuration:` $config; end_row
row; column `config_spec_include $config`; end_row
end_table
hr
table "border=1"
row; column `bold Template:` $template; end_row
row; column `config_spec_include $template`; end_row
end_table
footer
) > $dst_dir/$f.html
) > $summary_file
done
# Re creating index
trace "Recreating index"
(
header "Autotest super-duper index"
heading 1 "<center>Autotest super-duper index</center>"
hr
for i in `ls $dst_dir/summary.*.html | sort -r -n`
do
f=`basename $i`
cat <<EOF
<p><a href=$f>$f</a></p>
EOF
done
end_table
footer
) > $dst_dir/index.html
) >> $report_file
exit 0
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment