Commit b703c784 authored by unknown's avatar unknown

tux optim 12 - remove max prefix + related

parent 89647b1d
......@@ -88,7 +88,7 @@
* Ordered index constants. Make configurable per index later.
*/
#define MAX_TTREE_NODE_SIZE 64 // total words in node
#define MAX_TTREE_PREF_SIZE 4 // words in min/max prefix each
#define MAX_TTREE_PREF_SIZE 4 // words in min prefix
#define MAX_TTREE_NODE_SLACK 3 // diff between max and min occupancy
/*
......
......@@ -77,10 +77,14 @@
#define jam() jamLine(60000 + __LINE__)
#define jamEntry() jamEntryLine(60000 + __LINE__)
#endif
#ifdef DBTUX_CMP_CPP
#ifdef DBTUX_SEARCH_CPP
#define jam() jamLine(70000 + __LINE__)
#define jamEntry() jamEntryLine(70000 + __LINE__)
#endif
#ifdef DBTUX_CMP_CPP
#define jam() jamLine(80000 + __LINE__)
#define jamEntry() jamEntryLine(80000 + __LINE__)
#endif
#ifdef DBTUX_DEBUG_CPP
#define jam() jamLine(90000 + __LINE__)
#define jamEntry() jamEntryLine(90000 + __LINE__)
......@@ -112,6 +116,7 @@ public:
static const unsigned DescPageSize = 256;
private:
static const unsigned MaxTreeNodeSize = MAX_TTREE_NODE_SIZE;
static const unsigned MaxPrefSize = MAX_TTREE_PREF_SIZE;
static const unsigned ScanBoundSegmentSize = 7;
static const unsigned MaxAccLockOps = MAX_PARALLEL_OP_PER_SCAN;
BLOCK_DEFINES(Dbtux);
......@@ -206,19 +211,19 @@ private:
unsigned m_fragBit : 1; // which duplicated table fragment
TreeEnt();
// methods
bool eq(const TreeEnt ent) const;
int cmp(const TreeEnt ent) const;
};
static const unsigned TreeEntSize = sizeof(TreeEnt) >> 2;
static const TreeEnt NullTreeEnt;
/*
* Tree node has 1) fixed part 2) actual table data for min and max
* prefix 3) max and min entries 4) rest of entries 5) one extra entry
* Tree node has 1) fixed part 2) a prefix of index key data for min
* entry 3) max and min entries 4) rest of entries 5) one extra entry
* used as work space.
*
* struct TreeNode part 1, size 6 words
* min prefix part 2, size TreeHead::m_prefSize
* max prefix part 2, size TreeHead::m_prefSize
* max entry part 3
* min entry part 3
* rest of entries part 4
......@@ -265,14 +270,14 @@ private:
friend struct TreeHead;
struct TreeHead {
Uint8 m_nodeSize; // words in tree node
Uint8 m_prefSize; // words in min/max prefix each
Uint8 m_prefSize; // words in min prefix
Uint8 m_minOccup; // min entries in internal node
Uint8 m_maxOccup; // max entries in node
TupLoc m_root; // root node
TreeHead();
// methods
unsigned getSize(AccSize acc) const;
Data getPref(TreeNode* node, unsigned i) const;
Data getPref(TreeNode* node) const;
TreeEnt* getEntList(TreeNode* node) const;
};
......@@ -514,6 +519,8 @@ private:
NodeHandle(Frag& frag);
NodeHandle(const NodeHandle& node);
NodeHandle& operator=(const NodeHandle& node);
// check if unassigned
bool isNull();
// getters
TupLoc getLink(unsigned i);
unsigned getChilds(); // cannot spell
......@@ -528,7 +535,7 @@ private:
void setBalance(int b);
void setNodeScan(Uint32 scanPtrI);
// access other parts of the node
Data getPref(unsigned i);
Data getPref();
TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i);
// for ndbrequire and ndbassert
......@@ -618,7 +625,7 @@ private:
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc);
void insertNode(Signal* signal, NodeHandle& node, AccSize acc);
void deleteNode(Signal* signal, NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node, unsigned i);
void setNodePref(Signal* signal, NodeHandle& node);
// node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
......@@ -633,7 +640,6 @@ private:
/*
* DbtuxTree.cpp
*/
void treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
......@@ -657,12 +663,20 @@ private:
void removeAccLockOp(ScanOp& scan, Uint32 accLockOp);
void releaseScanOp(ScanOpPtr& scanPtr);
/*
* DbtuxSearch.cpp
*/
void searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/*
* DbtuxCmp.cpp
*/
int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2 = MaxAttrDataSize);
int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2);
int cmpScanBound(const Frag& frag, const BoundPar boundPar);
int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey);
int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey);
/*
* DbtuxDebug.cpp
......@@ -675,6 +689,7 @@ private:
TupLoc m_parent; // expected parent address
int m_depth; // returned depth
unsigned m_occup; // returned occupancy
TreeEnt m_minmax[2]; // returned subtree min and max
bool m_ok; // returned status
PrintPar();
};
......@@ -699,6 +714,8 @@ private:
DebugTree = 4, // log and check tree after each op
DebugScan = 8 // log scans
};
static const int DataFillByte = 0xa2;
static const int NodeFillByte = 0xa4;
#endif
// start up info
......@@ -859,13 +876,18 @@ Dbtux::TreeEnt::TreeEnt() :
{
}
inline bool
Dbtux::TreeEnt::eq(const TreeEnt ent) const
{
return
m_tupLoc == ent.m_tupLoc &&
m_tupVersion == ent.m_tupVersion &&
m_fragBit == ent.m_fragBit;
}
inline int
Dbtux::TreeEnt::cmp(const TreeEnt ent) const
{
if (m_fragBit < ent.m_fragBit)
return -1;
if (m_fragBit > ent.m_fragBit)
return +1;
if (m_tupLoc.m_pageId < ent.m_tupLoc.m_pageId)
return -1;
if (m_tupLoc.m_pageId > ent.m_tupLoc.m_pageId)
......@@ -878,6 +900,10 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const
return -1;
if (m_tupVersion > ent.m_tupVersion)
return +1;
if (m_fragBit < ent.m_fragBit)
return -1;
if (m_fragBit > ent.m_fragBit)
return +1;
return 0;
}
......@@ -920,7 +946,7 @@ Dbtux::TreeHead::getSize(AccSize acc) const
case AccHead:
return NodeHeadSize;
case AccPref:
return NodeHeadSize + 2 * m_prefSize + 2 * TreeEntSize;
return NodeHeadSize + m_prefSize + 2 * TreeEntSize;
case AccFull:
return m_nodeSize;
}
......@@ -929,16 +955,16 @@ Dbtux::TreeHead::getSize(AccSize acc) const
}
inline Dbtux::Data
Dbtux::TreeHead::getPref(TreeNode* node, unsigned i) const
Dbtux::TreeHead::getPref(TreeNode* node) const
{
Uint32* ptr = (Uint32*)node + NodeHeadSize + i * m_prefSize;
Uint32* ptr = (Uint32*)node + NodeHeadSize;
return ptr;
}
inline Dbtux::TreeEnt*
Dbtux::TreeHead::getEntList(TreeNode* node) const
{
Uint32* ptr = (Uint32*)node + NodeHeadSize + 2 * m_prefSize;
Uint32* ptr = (Uint32*)node + NodeHeadSize + m_prefSize;
return (TreeEnt*)ptr;
}
......@@ -1087,6 +1113,12 @@ Dbtux::NodeHandle::operator=(const NodeHandle& node)
return *this;
}
inline bool
Dbtux::NodeHandle::isNull()
{
return m_node == 0;
}
inline Dbtux::TupLoc
Dbtux::NodeHandle::getLink(unsigned i)
{
......@@ -1161,11 +1193,11 @@ Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI)
}
inline Dbtux::Data
Dbtux::NodeHandle::getPref(unsigned i)
Dbtux::NodeHandle::getPref()
{
TreeHead& tree = m_frag.m_tree;
ndbrequire(m_acc >= AccPref && i <= 1);
return tree.getPref(m_node, i);
ndbrequire(m_acc >= AccPref);
return tree.getPref(m_node);
}
inline Dbtux::TreeEnt
......
This diff is collapsed.
......@@ -137,16 +137,17 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
par.m_ok = false;
}
}
static const char* const sep = " *** ";
// check child-parent links
if (node.getLink(2) != par.m_parent) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "parent loc " << hex << node.getLink(2);
out << " should be " << hex << par.m_parent << endl;
}
if (node.getSide() != par.m_side) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "side " << dec << node.getSide();
out << " should be " << dec << par.m_side << endl;
}
......@@ -154,26 +155,26 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
const int balance = -cpar[0].m_depth + cpar[1].m_depth;
if (node.getBalance() != balance) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "balance " << node.getBalance();
out << " should be " << balance << endl;
}
if (abs(node.getBalance()) > 1) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "balance " << node.getBalance() << " is invalid" << endl;
}
// check occupancy
if (node.getOccup() > tree.m_maxOccup) {
if (node.getOccup() == 0 || node.getOccup() > tree.m_maxOccup) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "occupancy " << node.getOccup();
out << " greater than max " << tree.m_maxOccup << endl;
out << " zero or greater than max " << tree.m_maxOccup << endl;
}
// check for occupancy of interior node
if (node.getChilds() == 2 && node.getOccup() < tree.m_minOccup) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl;
}
......@@ -183,13 +184,74 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
node.getLink(1 - i) == NullTupLoc &&
node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "missed merge with child " << i << endl;
}
}
// check inline prefix
{ ConstData data1 = node.getPref();
Uint32 data2[MaxPrefSize];
memset(data2, DataFillByte, MaxPrefSize << 2);
readKeyAttrs(frag, node.getMinMax(0), 0, c_searchKey);
copyAttrs(frag, c_searchKey, data2, tree.m_prefSize);
for (unsigned n = 0; n < tree.m_prefSize; n++) {
if (data1[n] != data2[n]) {
par.m_ok = false;
out << par.m_path << sep;
out << "inline prefix mismatch word " << n;
out << " value " << hex << data1[n];
out << " should be " << hex << data2[n] << endl;
break;
}
}
}
// check ordering within node
for (unsigned j = 1; j < node.getOccup(); j++) {
unsigned start = 0;
const TreeEnt ent1 = node.getEnt(j - 1);
const TreeEnt ent2 = node.getEnt(j);
if (j == 1) {
readKeyAttrs(frag, ent1, start, c_searchKey);
} else {
memcpy(c_searchKey, c_entryKey, frag.m_numAttrs << 2);
}
readKeyAttrs(frag, ent2, start, c_entryKey);
int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
if (ret == 0)
ret = ent1.cmp(ent2);
if (ret != -1) {
par.m_ok = false;
out << par.m_path << sep;
out << " disorder within node at pos " << j << endl;
}
}
// check ordering wrt subtrees
for (unsigned i = 0; i <= 1; i++) {
if (node.getLink(i) == NullTupLoc)
continue;
const TreeEnt ent1 = cpar[i].m_minmax[1 - i];
const TreeEnt ent2 = node.getMinMax(i);
unsigned start = 0;
readKeyAttrs(frag, ent1, start, c_searchKey);
readKeyAttrs(frag, ent2, start, c_entryKey);
int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
if (ret == 0)
ret = ent1.cmp(ent2);
if (ret != (i == 0 ? -1 : +1)) {
par.m_ok = false;
out << par.m_path << sep;
out << " disorder wrt subtree " << i << endl;
}
}
// return values
par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth);
par.m_occup = node.getOccup();
for (unsigned i = 0; i <= 1; i++) {
if (node.getLink(i) == NullTupLoc)
par.m_minmax[i] = node.getMinMax(i);
else
par.m_minmax[i] = cpar[i].m_minmax[i];
}
}
NdbOut&
......@@ -355,20 +417,19 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node)
out << " [acc " << dec << node.m_acc << "]";
out << " [node " << *node.m_node << "]";
if (node.m_acc >= Dbtux::AccPref) {
for (unsigned i = 0; i <= 1; i++) {
out << " [pref " << dec << i;
const Uint32* data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + i * tree.m_prefSize;
for (unsigned j = 0; j < node.m_frag.m_tree.m_prefSize; j++)
out << " " << hex << data[j];
out << "]";
}
const Uint32* data;
out << " [pref";
data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize;
for (unsigned j = 0; j < tree.m_prefSize; j++)
out << " " << hex << data[j];
out << "]";
out << " [entList";
unsigned numpos = node.m_node->m_occup;
if (node.m_acc < Dbtux::AccFull && numpos > 2) {
numpos = 2;
out << "(" << dec << numpos << ")";
}
const Uint32* data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + 2 * tree.m_prefSize;
data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + tree.m_prefSize;
const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data;
for (unsigned pos = 0; pos < numpos; pos++)
out << " " << entList[pos];
......
......@@ -26,7 +26,12 @@ Dbtux::Dbtux(const Configuration& conf) :
#ifdef VM_TRACE
debugFile(0),
debugOut(*new NullOutputStream()),
// until ndb_mgm supports dump
#ifdef DBTUX_DEBUG_TREE
debugFlags(DebugTree),
#else
debugFlags(0),
#endif
#endif
c_internalStartPhase(0),
c_typeOfStart(NodeState::ST_ILLEGAL_TYPE),
......@@ -314,6 +319,9 @@ Dbtux::copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2
keyAttrs += 1;
data1 += 1;
}
#ifdef VM_TRACE
memset(data2, DataFillByte, len2 << 2);
#endif
}
BLOCK_FUNCTIONS(Dbtux);
......@@ -110,20 +110,19 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
debugOut << " entry=" << ent;
debugOut << endl;
}
#endif
// find position in tree
TreePos treePos;
treeSearch(signal, frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << endl;
}
#endif
// do the operation
req->errorCode = 0;
TreePos treePos;
switch (opCode) {
case TuxMaintReq::OpAdd:
jam();
searchToAdd(signal, frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << endl;
}
#endif
if (treePos.m_match) {
jam();
// there is no "Building" state so this will have to do
......@@ -152,6 +151,12 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
case TuxMaintReq::OpRemove:
jam();
searchToRemove(signal, frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << endl;
}
#endif
if (! treePos.m_match) {
jam();
// there is no "Building" state so this will have to do
......@@ -167,7 +172,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
ndbrequire(false);
break;
}
// commit and release nodes
#ifdef VM_TRACE
if (debugFlags & DebugTree) {
printTree(signal, frag, debugOut);
......
......@@ -85,10 +85,9 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc)
new (node.m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
memset(node.getPref(0), 0xa2, tree.m_prefSize << 2);
memset(node.getPref(1), 0xa2, tree.m_prefSize << 2);
memset(node.getPref(), DataFillByte, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node.m_node);
memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
memset(entList, NodeFillByte, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
}
......@@ -116,12 +115,12 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
* attribute headers for now. XXX use null mask instead
*/
void
Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i)
Dbtux::setNodePref(Signal* signal, NodeHandle& node)
{
const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree;
readKeyAttrs(frag, node.getMinMax(i), 0, c_entryKey);
copyAttrs(frag, c_entryKey, node.getPref(i), tree.m_prefSize);
readKeyAttrs(frag, node.getMinMax(0), 0, c_entryKey);
copyAttrs(frag, c_entryKey, node.getPref(), tree.m_prefSize);
}
// node operations
......@@ -173,11 +172,9 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// fix prefixes
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(signal, node, 0);
if (occup == 0 || pos == occup)
setNodePref(signal, node, 1);
setNodePref(signal, node);
}
/*
......@@ -248,11 +245,9 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
}
entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefixes
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(signal, node, 0);
if (occup != 1 && pos == occup - 1)
setNodePref(signal, node, 1);
setNodePref(signal, node);
}
/*
......@@ -325,11 +320,9 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefixes
// fix prefix
if (true)
setNodePref(signal, node, 0);
if (occup == 1 || pos == occup - 1)
setNodePref(signal, node, 1);
setNodePref(signal, node);
}
/*
......@@ -403,11 +396,9 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
}
tmpList[0] = newMin;
entList[0] = entList[occup];
// fix prefixes
// fix prefix
if (true)
setNodePref(signal, node, 0);
if (occup == 1 || pos == occup - 1)
setNodePref(signal, node, 1);
setNodePref(signal, node);
}
/*
......
......@@ -689,16 +689,9 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
TreeHead& tree = frag.m_tree;
if (tree.m_root == NullTupLoc) {
// tree may have become empty
jam();
scan.m_state = ScanOp::Last;
return;
}
TreePos pos;
pos.m_loc = tree.m_root;
NodeHandle node(frag);
// unpack lower bound
// set up index keys for this operation
setKeyAttrs(frag);
// unpack lower bound into c_dataBuffer
const ScanBound& bound = *scan.m_bound[0];
ScanBoundIterator iter;
bound.first(iter);
......@@ -707,103 +700,22 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
c_dataBuffer[j] = *iter.data;
bound.next(iter);
}
// comparison parameters
BoundPar boundPar;
boundPar.m_data1 = c_dataBuffer;
boundPar.m_count1 = scan.m_boundCnt[0];
boundPar.m_dir = 0;
loop: {
// search for scan start position
TreePos treePos;
searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
if (treePos.m_loc == NullTupLoc) {
// empty tree
jam();
selectNode(signal, node, pos.m_loc, AccPref);
const unsigned occup = node.getOccup();
ndbrequire(occup != 0);
for (unsigned i = 0; i <= 1; i++) {
jam();
// compare prefix
boundPar.m_data2 = node.getPref(i);
boundPar.m_len2 = tree.m_prefSize;
int ret = cmpScanBound(frag, boundPar);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read full value
ReadPar readPar;
readPar.m_ent = node.getMinMax(i);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
tupReadAttrs(signal, frag, readPar);
// compare full value
boundPar.m_data2 = readPar.m_data;
boundPar.m_len2 = ZNIL; // big
ret = cmpScanBound(frag, boundPar);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (i == 0 && ret < 0) {
jam();
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
pos.m_loc = loc;
goto loop;
}
// start scanning this node
pos.m_pos = 0;
pos.m_match = false;
pos.m_dir = 3;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
linkScan(node, scanPtr);
return;
}
if (i == 1 && ret > 0) {
jam();
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to right subtree
pos.m_loc = loc;
goto loop;
}
// start scanning upwards
pos.m_dir = 1;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
linkScan(node, scanPtr);
return;
}
}
// read rest of current node
accessNode(signal, node, AccFull);
// look for first entry
ndbrequire(occup >= 2);
for (unsigned j = 1; j < occup; j++) {
jam();
ReadPar readPar;
readPar.m_ent = node.getEnt(j);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
tupReadAttrs(signal, frag, readPar);
// compare
boundPar.m_data2 = readPar.m_data;
boundPar.m_len2 = ZNIL; // big
int ret = cmpScanBound(frag, boundPar);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
jam();
// start scanning this node
pos.m_pos = j;
pos.m_match = false;
pos.m_dir = 3;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
linkScan(node, scanPtr);
return;
}
}
ndbrequire(false);
scan.m_state = ScanOp::Last;
return;
}
// set position and state
scan.m_scanPos = treePos;
scan.m_state = ScanOp::Next;
// link the scan to node found
NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc, AccFull);
linkScan(node, scanPtr);
}
/*
......@@ -841,7 +753,9 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_accLockOp = RNIL;
scan.m_state = ScanOp::Current;
}
// unpack upper bound
// set up index keys for this operation
setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer
const ScanBound& bound = *scan.m_bound[1];
ScanBoundIterator iter;
bound.first(iter);
......@@ -850,11 +764,6 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
c_dataBuffer[j] = *iter.data;
bound.next(iter);
}
// comparison parameters
BoundPar boundPar;
boundPar.m_data1 = c_dataBuffer;
boundPar.m_count1 = scan.m_boundCnt[1];
boundPar.m_dir = 1;
// use copy of position
TreePos pos = scan.m_scanPos;
// get and remember original node
......@@ -912,17 +821,9 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
jam();
pos.m_ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged
// XXX implement prefix optimization
ReadPar readPar;
readPar.m_ent = pos.m_ent;
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
tupReadAttrs(signal, frag, readPar);
// compare
boundPar.m_data2 = readPar.m_data;
boundPar.m_len2 = ZNIL; // big
int ret = cmpScanBound(frag, boundPar);
// read and compare all attributes
readKeyAttrs(frag, pos.m_ent, 0, c_entryKey);
int ret = cmpScanBound(frag, 1, c_dataBuffer, scan.m_boundCnt[1], c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
jam();
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define DBTUX_SEARCH_CPP
#include "Dbtux.hpp"
/*
* Search for entry to add.
*
* Similar to searchToRemove (see below).
*
* TODO optimize for initial equal attrs in node min/max
*/
void
Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
return;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
/*
* In order to not (yet) change old behaviour, a position between
* 2 nodes returns the one at the bottom of the tree.
*/
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
int ret;
// compare prefix
unsigned start = 0;
ret = cmpSearchKey(frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare remaining attributes
ndbrequire(start < numAttrs);
readKeyAttrs(frag, currNode.getMinMax(0), start, c_entryKey);
ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getMinMax(0));
}
if (ret < 0) {
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b but remember the bottom node
bottomNode = currNode;
currNode = glbNode;
}
} else if (ret > 0) {
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = true;
return;
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
jam();
int ret;
// read and compare attributes
unsigned start = 0;
readKeyAttrs(frag, currNode.getEnt(j), start, c_entryKey);
ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getEnt(j));
}
if (ret <= 0) {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = (ret == 0);
return;
}
}
if (! bottomNode.isNull()) {
jam();
// backwards compatible for now
treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
return;
}
treePos.m_loc = currNode.m_loc;
treePos.m_pos = currNode.getOccup();
treePos.m_match = false;
}
/*
* Search for entry to remove.
*
* Compares search key to each node min. A move to right subtree can
* overshoot target node. The last such node is saved. The final node
* is a half-leaf or leaf. If search key is less than final node min
* then the saved node is the g.l.b of the final node and we move back
* to it.
*/
void
Dbtux::searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
return;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
int ret;
// compare prefix
unsigned start = 0;
ret = cmpSearchKey(frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare remaining attributes
ndbrequire(start < numAttrs);
readKeyAttrs(frag, currNode.getMinMax(0), start, c_entryKey);
ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getMinMax(0));
}
if (ret < 0) {
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b
currNode = glbNode;
}
} else if (ret > 0) {
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = true;
return;
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
// pos 0 was handled above
for (unsigned j = 1, occup = currNode.getOccup(); j < occup; j++) {
jam();
// compare only the entry
if (searchEnt.eq(currNode.getEnt(j))) {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = true;
return;
}
}
treePos.m_loc = currNode.m_loc;
treePos.m_pos = currNode.getOccup();
treePos.m_match = false;
}
/*
* Search for scan start position.
*
* Similar to searchToAdd.
*/
void
Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
return;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
int ret;
// compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare all attributes
readKeyAttrs(frag, currNode.getMinMax(0), 0, c_entryKey);
ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret < 0) {
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b but remember the bottom node
bottomNode = currNode;
currNode = glbNode;
} else {
// start scanning this node
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
} else if (ret > 0) {
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
ndbassert(false);
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
jam();
int ret;
// read and compare attributes
readKeyAttrs(frag, currNode.getEnt(j), 0, c_entryKey);
ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
// start scanning from current entry
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
}
if (! bottomNode.isNull()) {
jam();
// start scanning the l.u.b
treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
// start scanning upwards (pretend we came from right child)
treePos.m_loc = currNode.m_loc;
treePos.m_dir = 1;
}
......@@ -17,112 +17,6 @@
#define DBTUX_TREE_CPP
#include "Dbtux.hpp"
/*
* Search for entry.
*
* Search key is index attribute data and tree entry value. Start from
* root node and compare the key to min/max of each node. Use linear
* search on the final (bounding) node. Initial attributes which are
* same in min/max need not be checked.
*/
void
Dbtux::treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
treePos.m_loc = tree.m_root;
if (treePos.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_pos = 0;
treePos.m_match = false;
return;
}
NodeHandle node(frag);
loop: {
jam();
selectNode(signal, node, treePos.m_loc, AccPref);
const unsigned occup = node.getOccup();
ndbrequire(occup != 0);
// number of equal initial attributes in bounding node
unsigned start = ZNIL;
for (unsigned i = 0; i <= 1; i++) {
jam();
unsigned start1 = 0;
// compare prefix
int ret = cmpSearchKey(frag, start1, searchKey, node.getPref(i), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare remaining attributes
readKeyAttrs(frag, node.getMinMax(i), start1, c_entryKey);
ret = cmpSearchKey(frag, start1, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (start > start1)
start = start1;
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(node.getMinMax(i));
}
if (i == 0 ? (ret < 0) : (ret > 0)) {
jam();
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to left/right subtree
treePos.m_loc = loc;
goto loop;
}
// position is immediately before/after this node
treePos.m_pos = (i == 0 ? 0 : occup);
treePos.m_match = false;
return;
}
if (ret == 0) {
jam();
// position is at first/last entry
treePos.m_pos = (i == 0 ? 0 : occup - 1);
treePos.m_match = true;
return;
}
}
// access rest of the bounding node
accessNode(signal, node, AccFull);
// position is strictly within the node
ndbrequire(occup >= 2);
const unsigned numWithin = occup - 2;
for (unsigned j = 1; j <= numWithin; j++) {
jam();
int ret = 0;
if (start < numAttrs) {
jam();
// read and compare remaining attributes
unsigned start1 = start;
readKeyAttrs(frag, node.getEnt(j), start1, c_entryKey);
ret = cmpSearchKey(frag, start1, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(node.getEnt(j));
}
if (ret <= 0) {
jam();
// position is before or at this entry
treePos.m_pos = j;
treePos.m_match = (ret == 0);
return;
}
}
// position is before last entry
treePos.m_pos = occup - 1;
treePos.m_match = false;
return;
}
}
/*
* Add entry.
*/
......
......@@ -7,6 +7,7 @@ libdbtux_a_SOURCES = \
DbtuxNode.cpp \
DbtuxTree.cpp \
DbtuxScan.cpp \
DbtuxSearch.cpp \
DbtuxCmp.cpp \
DbtuxDebug.cpp
......
......@@ -49,4 +49,7 @@ optim 10 mc02/a 44 ms 65 ms 46 pct
optim 11 mc02/a 43 ms 63 ms 46 pct
mc02/b 52 ms 86 ms 63 pct
optim 12 mc02/a 38 ms 55 ms 43 pct
mc02/b 47 ms 77 ms 63 pct
vim: set et:
......@@ -2525,7 +2525,7 @@ tbusybuild(Par par)
for (unsigned i = 0; i < par.m_subloop; i++) {
RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, MT);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, dropindex, ST);
}
return 0;
......@@ -2564,9 +2564,11 @@ ttimemaint(Par par)
t1.off(par.m_totrows);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
t2.on();
RUNSTEP(par, pkupdate, MT);
t2.off(par.m_totrows);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, dropindex, ST);
}
LL1("update - " << t1.time());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment