Commit 432277eb authored by pekka@mysql.com's avatar pekka@mysql.com

tux optim 12 - remove max prefix + related

parent 3f20e69c
......@@ -88,7 +88,7 @@
* Ordered index constants. Make configurable per index later.
*/
#define MAX_TTREE_NODE_SIZE 64 // total words in node
#define MAX_TTREE_PREF_SIZE 4 // words in min/max prefix each
#define MAX_TTREE_PREF_SIZE 4 // words in min prefix
#define MAX_TTREE_NODE_SLACK 3 // diff between max and min occupancy
/*
......
......@@ -77,10 +77,14 @@
#define jam() jamLine(60000 + __LINE__)
#define jamEntry() jamEntryLine(60000 + __LINE__)
#endif
#ifdef DBTUX_CMP_CPP
#ifdef DBTUX_SEARCH_CPP
#define jam() jamLine(70000 + __LINE__)
#define jamEntry() jamEntryLine(70000 + __LINE__)
#endif
#ifdef DBTUX_CMP_CPP
#define jam() jamLine(80000 + __LINE__)
#define jamEntry() jamEntryLine(80000 + __LINE__)
#endif
#ifdef DBTUX_DEBUG_CPP
#define jam() jamLine(90000 + __LINE__)
#define jamEntry() jamEntryLine(90000 + __LINE__)
......@@ -112,6 +116,7 @@ public:
static const unsigned DescPageSize = 256;
private:
static const unsigned MaxTreeNodeSize = MAX_TTREE_NODE_SIZE;
static const unsigned MaxPrefSize = MAX_TTREE_PREF_SIZE;
static const unsigned ScanBoundSegmentSize = 7;
static const unsigned MaxAccLockOps = MAX_PARALLEL_OP_PER_SCAN;
BLOCK_DEFINES(Dbtux);
......@@ -206,19 +211,19 @@ private:
unsigned m_fragBit : 1; // which duplicated table fragment
TreeEnt();
// methods
bool eq(const TreeEnt ent) const;
int cmp(const TreeEnt ent) const;
};
static const unsigned TreeEntSize = sizeof(TreeEnt) >> 2;
static const TreeEnt NullTreeEnt;
/*
* Tree node has 1) fixed part 2) actual table data for min and max
* prefix 3) max and min entries 4) rest of entries 5) one extra entry
* Tree node has 1) fixed part 2) a prefix of index key data for min
* entry 3) max and min entries 4) rest of entries 5) one extra entry
* used as work space.
*
* struct TreeNode part 1, size 6 words
* min prefix part 2, size TreeHead::m_prefSize
* max prefix part 2, size TreeHead::m_prefSize
* max entry part 3
* min entry part 3
* rest of entries part 4
......@@ -265,14 +270,14 @@ private:
friend struct TreeHead;
struct TreeHead {
Uint8 m_nodeSize; // words in tree node
Uint8 m_prefSize; // words in min/max prefix each
Uint8 m_prefSize; // words in min prefix
Uint8 m_minOccup; // min entries in internal node
Uint8 m_maxOccup; // max entries in node
TupLoc m_root; // root node
TreeHead();
// methods
unsigned getSize(AccSize acc) const;
Data getPref(TreeNode* node, unsigned i) const;
Data getPref(TreeNode* node) const;
TreeEnt* getEntList(TreeNode* node) const;
};
......@@ -514,6 +519,8 @@ private:
NodeHandle(Frag& frag);
NodeHandle(const NodeHandle& node);
NodeHandle& operator=(const NodeHandle& node);
// check if unassigned
bool isNull();
// getters
TupLoc getLink(unsigned i);
unsigned getChilds(); // cannot spell
......@@ -528,7 +535,7 @@ private:
void setBalance(int b);
void setNodeScan(Uint32 scanPtrI);
// access other parts of the node
Data getPref(unsigned i);
Data getPref();
TreeEnt getEnt(unsigned pos);
TreeEnt getMinMax(unsigned i);
// for ndbrequire and ndbassert
......@@ -618,7 +625,7 @@ private:
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc);
void insertNode(Signal* signal, NodeHandle& node, AccSize acc);
void deleteNode(Signal* signal, NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node, unsigned i);
void setNodePref(Signal* signal, NodeHandle& node);
// node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent);
......@@ -633,7 +640,6 @@ private:
/*
* DbtuxTree.cpp
*/
void treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
......@@ -657,12 +663,20 @@ private:
void removeAccLockOp(ScanOp& scan, Uint32 accLockOp);
void releaseScanOp(ScanOpPtr& scanPtr);
/*
* DbtuxSearch.cpp
*/
void searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/*
* DbtuxCmp.cpp
*/
int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2 = MaxAttrDataSize);
int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2);
int cmpScanBound(const Frag& frag, const BoundPar boundPar);
int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey);
int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen = MaxAttrDataSize);
int cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey);
/*
* DbtuxDebug.cpp
......@@ -675,6 +689,7 @@ private:
TupLoc m_parent; // expected parent address
int m_depth; // returned depth
unsigned m_occup; // returned occupancy
TreeEnt m_minmax[2]; // returned subtree min and max
bool m_ok; // returned status
PrintPar();
};
......@@ -699,6 +714,8 @@ private:
DebugTree = 4, // log and check tree after each op
DebugScan = 8 // log scans
};
static const int DataFillByte = 0xa2;
static const int NodeFillByte = 0xa4;
#endif
// start up info
......@@ -859,13 +876,18 @@ Dbtux::TreeEnt::TreeEnt() :
{
}
inline bool
Dbtux::TreeEnt::eq(const TreeEnt ent) const
{
return
m_tupLoc == ent.m_tupLoc &&
m_tupVersion == ent.m_tupVersion &&
m_fragBit == ent.m_fragBit;
}
inline int
Dbtux::TreeEnt::cmp(const TreeEnt ent) const
{
if (m_fragBit < ent.m_fragBit)
return -1;
if (m_fragBit > ent.m_fragBit)
return +1;
if (m_tupLoc.m_pageId < ent.m_tupLoc.m_pageId)
return -1;
if (m_tupLoc.m_pageId > ent.m_tupLoc.m_pageId)
......@@ -878,6 +900,10 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const
return -1;
if (m_tupVersion > ent.m_tupVersion)
return +1;
if (m_fragBit < ent.m_fragBit)
return -1;
if (m_fragBit > ent.m_fragBit)
return +1;
return 0;
}
......@@ -920,7 +946,7 @@ Dbtux::TreeHead::getSize(AccSize acc) const
case AccHead:
return NodeHeadSize;
case AccPref:
return NodeHeadSize + 2 * m_prefSize + 2 * TreeEntSize;
return NodeHeadSize + m_prefSize + 2 * TreeEntSize;
case AccFull:
return m_nodeSize;
}
......@@ -929,16 +955,16 @@ Dbtux::TreeHead::getSize(AccSize acc) const
}
inline Dbtux::Data
Dbtux::TreeHead::getPref(TreeNode* node, unsigned i) const
Dbtux::TreeHead::getPref(TreeNode* node) const
{
Uint32* ptr = (Uint32*)node + NodeHeadSize + i * m_prefSize;
Uint32* ptr = (Uint32*)node + NodeHeadSize;
return ptr;
}
inline Dbtux::TreeEnt*
Dbtux::TreeHead::getEntList(TreeNode* node) const
{
Uint32* ptr = (Uint32*)node + NodeHeadSize + 2 * m_prefSize;
Uint32* ptr = (Uint32*)node + NodeHeadSize + m_prefSize;
return (TreeEnt*)ptr;
}
......@@ -1087,6 +1113,12 @@ Dbtux::NodeHandle::operator=(const NodeHandle& node)
return *this;
}
inline bool
Dbtux::NodeHandle::isNull()
{
return m_node == 0;
}
inline Dbtux::TupLoc
Dbtux::NodeHandle::getLink(unsigned i)
{
......@@ -1161,11 +1193,11 @@ Dbtux::NodeHandle::setNodeScan(Uint32 scanPtrI)
}
inline Dbtux::Data
Dbtux::NodeHandle::getPref(unsigned i)
Dbtux::NodeHandle::getPref()
{
TreeHead& tree = m_frag.m_tree;
ndbrequire(m_acc >= AccPref && i <= 1);
return tree.getPref(m_node, i);
ndbrequire(m_acc >= AccPref);
return tree.getPref(m_node);
}
inline Dbtux::TreeEnt
......
......@@ -25,14 +25,14 @@
* prefix may be partial in which case CmpUnknown may be returned.
*/
int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2)
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, ConstData entryData, unsigned maxlen)
{
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// number of words of attribute data left
unsigned len2 = maxlen2;
unsigned len2 = maxlen;
// skip to right position in search key
data1 += start;
searchKey += start;
int ret = 0;
while (start < numAttrs) {
if (len2 < AttributeHeaderSize) {
......@@ -41,20 +41,20 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstDat
break;
}
len2 -= AttributeHeaderSize;
if (*data1 != 0) {
if (! data2.ah().isNULL()) {
if (*searchKey != 0) {
if (! entryData.ah().isNULL()) {
jam();
// current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start];
const unsigned typeId = descAttr.m_typeId;
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
ndbrequire(size1 != 0 && size1 == data2.ah().getDataSize());
ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
const unsigned size2 = min(size1, len2);
len2 -= size2;
// compare
const Uint32* const p1 = *data1;
const Uint32* const p2 = &data2[AttributeHeaderSize];
const Uint32* const p1 = *searchKey;
const Uint32* const p2 = &entryData[AttributeHeaderSize];
ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
if (ret != 0) {
jam();
......@@ -67,15 +67,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstDat
break;
}
} else {
if (! data2.ah().isNULL()) {
if (! entryData.ah().isNULL()) {
jam();
// NULL > not NULL
ret = +1;
break;
}
}
data1 += 1;
data2 += AttributeHeaderSize + data2.ah().getDataSize();
searchKey += 1;
entryData += AttributeHeaderSize + entryData.ah().getDataSize();
start++;
}
// XXX until data format errors are handled
......@@ -89,17 +89,17 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstDat
* Start position is updated as in previous routine.
*/
int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2)
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData searchKey, TableData entryKey)
{
const unsigned numAttrs = frag.m_numAttrs;
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// skip to right position
data1 += start;
data2 += start;
searchKey += start;
entryKey += start;
int ret = 0;
while (start < numAttrs) {
if (*data1 != 0) {
if (*data2 != 0) {
if (*searchKey != 0) {
if (*entryKey != 0) {
jam();
// current attribute
const DescAttr& descAttr = descEnt.m_descAttr[start];
......@@ -107,8 +107,8 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableDat
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare
const Uint32* const p1 = *data1;
const Uint32* const p2 = *data2;
const Uint32* const p1 = *searchKey;
const Uint32* const p2 = *entryKey;
ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
if (ret != 0) {
jam();
......@@ -121,15 +121,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableDat
break;
}
} else {
if (*data2 != 0) {
if (*entryKey != 0) {
jam();
// NULL > not NULL
ret = +1;
break;
}
}
data1 += 1;
data2 += 1;
searchKey += 1;
entryKey += 1;
start++;
}
// XXX until data format errors are handled
......@@ -137,71 +137,68 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableDat
return ret;
}
/*
* Scan bound vs tree entry.
* Scan bound vs node prefix.
*
* Compare lower or upper bound and index attribute data. The attribute
* data may be partial in which case CmpUnknown may be returned.
* Returns -1 if the boundary is to the left of the compared key and +1 if
* the boundary is to the right of the compared key.
* Returns -1 if the boundary is to the left of the compared key and +1
* if the boundary is to the right of the compared key.
*
* To get this behaviour we treat equality a little bit special.
* If the boundary is a lower bound then the boundary is to the left of all
* equal keys and if it is an upper bound then the boundary is to the right
* of all equal keys.
* To get this behaviour we treat equality a little bit special. If the
* boundary is a lower bound then the boundary is to the left of all
* equal keys and if it is an upper bound then the boundary is to the
* right of all equal keys.
*
* When searching for the first key we are using the lower bound to try
* to find the first key that is to the right of the boundary.
* Then we start scanning from this tuple (including the tuple itself)
* until we find the first key which is to the right of the boundary. Then
* we stop and do not include that key in the scan result.
* to find the first key that is to the right of the boundary. Then we
* start scanning from this tuple (including the tuple itself) until we
* find the first key which is to the right of the boundary. Then we
* stop and do not include that key in the scan result.
*/
int
Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar)
Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
{
unsigned type = 4;
int ret = 0;
/*
No boundary means full scan, low boundary is to the right of all keys.
Thus we should always return -1. For upper bound we are to the right of
all keys, thus we should always return +1. We achieve this behaviour
by initialising return value to 0 and set type to 4.
*/
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
ConstData data1 = boundPar.m_data1;
ConstData data2 = boundPar.m_data2;
// direction 0-lower 1-upper
const unsigned dir = boundPar.m_dir;
ndbrequire(dir <= 1);
// number of words of data left
unsigned len2 = boundPar.m_len2;
for (unsigned i = 0; i < boundPar.m_count1; i++) {
unsigned len2 = maxlen;
/*
* No boundary means full scan, low boundary is to the right of all
* keys. Thus we should always return -1. For upper bound we are to
* the right of all keys, thus we should always return +1. We achieve
* this behaviour by initializing type to 4.
*/
unsigned type = 4;
while (boundCount != 0) {
if (len2 < AttributeHeaderSize) {
jam();
return NdbSqlUtil::CmpUnknown;
}
len2 -= AttributeHeaderSize;
// get and skip bound type
type = data1[0];
data1 += 1;
ndbrequire(! data1.ah().isNULL());
if (! data2.ah().isNULL()) {
type = boundInfo[0];
boundInfo += 1;
ndbrequire(! boundInfo.ah().isNULL());
if (! entryData.ah().isNULL()) {
jam();
// current attribute
const unsigned index = data1.ah().getAttributeId();
const unsigned index = boundInfo.ah().getAttributeId();
const DescAttr& descAttr = descEnt.m_descAttr[index];
const unsigned typeId = descAttr.m_typeId;
ndbrequire(data2.ah().getAttributeId() == descAttr.m_primaryAttrId);
ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
// full data size
const unsigned size1 = data1.ah().getDataSize();
ndbrequire(size1 != 0 && size1 == data2.ah().getDataSize());
const unsigned size1 = boundInfo.ah().getDataSize();
ndbrequire(size1 != 0 && size1 == entryData.ah().getDataSize());
const unsigned size2 = min(size1, len2);
len2 -= size2;
// compare
const Uint32* const p1 = &data1[AttributeHeaderSize];
const Uint32* const p2 = &data2[AttributeHeaderSize];
ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = &entryData[AttributeHeaderSize];
int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
// XXX until data format errors are handled
ndbrequire(ret != NdbSqlUtil::CmpError);
if (ret != 0) {
jam();
return ret;
......@@ -209,21 +206,21 @@ Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar)
} else {
jam();
/*
NULL is bigger than any bound, thus the boundary is always to the
left of NULL
* NULL is bigger than any bound, thus the boundary is always to
* the left of NULL.
*/
return -1;
}
data1 += AttributeHeaderSize + data1.ah().getDataSize();
data2 += AttributeHeaderSize + data2.ah().getDataSize();
boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
entryData += AttributeHeaderSize + entryData.ah().getDataSize();
boundCount -= 1;
}
ndbassert(ret == 0);
if (dir == 0) {
jam();
/*
Looking for the lower bound. If strict lower bound then the boundary is
to the right of the compared key and otherwise (equal included in range)
then the boundary is to the left of the key.
* Looking for the lower bound. If strict lower bound then the
* boundary is to the right of the compared key and otherwise (equal
* included in range) then the boundary is to the left of the key.
*/
if (type == 1) {
jam();
......@@ -233,9 +230,10 @@ Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar)
} else {
jam();
/*
Looking for the upper bound. If strict upper bound then the boundary is
to the left of all equal keys and otherwise (equal included in the
range) then the boundary is to the right of all equal keys.
* Looking for the upper bound. If strict upper bound then the
* boundary is to the left of all equal keys and otherwise (equal
* included in the range) then the boundary is to the right of all
* equal keys.
*/
if (type == 3) {
jam();
......@@ -245,3 +243,67 @@ Dbtux::cmpScanBound(const Frag& frag, const BoundPar boundPar)
}
}
/*
* Scan bound vs tree entry.
*/
int
Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, TableData entryKey)
{
const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
// direction 0-lower 1-upper
ndbrequire(dir <= 1);
// initialize type to equality
unsigned type = 4;
while (boundCount != 0) {
// get and skip bound type
type = boundInfo[0];
boundInfo += 1;
ndbrequire(! boundInfo.ah().isNULL());
if (*entryKey != 0) {
jam();
// current attribute
const unsigned index = boundInfo.ah().getAttributeId();
const DescAttr& descAttr = descEnt.m_descAttr[index];
const unsigned typeId = descAttr.m_typeId;
// full data size
const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
// compare
const Uint32* const p1 = &boundInfo[AttributeHeaderSize];
const Uint32* const p2 = *entryKey;
int ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
// XXX until data format errors are handled
ndbrequire(ret != NdbSqlUtil::CmpError);
if (ret != 0) {
jam();
return ret;
}
} else {
jam();
/*
* NULL is bigger than any bound, thus the boundary is always to
* the left of NULL.
*/
return -1;
}
boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
entryKey += 1;
boundCount -= 1;
}
if (dir == 0) {
// lower bound
jam();
if (type == 1) {
jam();
return +1;
}
return -1;
} else {
// upper bound
jam();
if (type == 3) {
jam();
return -1;
}
return +1;
}
}
......@@ -137,16 +137,17 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
par.m_ok = false;
}
}
static const char* const sep = " *** ";
// check child-parent links
if (node.getLink(2) != par.m_parent) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "parent loc " << hex << node.getLink(2);
out << " should be " << hex << par.m_parent << endl;
}
if (node.getSide() != par.m_side) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "side " << dec << node.getSide();
out << " should be " << dec << par.m_side << endl;
}
......@@ -154,26 +155,26 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
const int balance = -cpar[0].m_depth + cpar[1].m_depth;
if (node.getBalance() != balance) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "balance " << node.getBalance();
out << " should be " << balance << endl;
}
if (abs(node.getBalance()) > 1) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "balance " << node.getBalance() << " is invalid" << endl;
}
// check occupancy
if (node.getOccup() > tree.m_maxOccup) {
if (node.getOccup() == 0 || node.getOccup() > tree.m_maxOccup) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "occupancy " << node.getOccup();
out << " greater than max " << tree.m_maxOccup << endl;
out << " zero or greater than max " << tree.m_maxOccup << endl;
}
// check for occupancy of interior node
if (node.getChilds() == 2 && node.getOccup() < tree.m_minOccup) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl;
}
......@@ -183,13 +184,74 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
node.getLink(1 - i) == NullTupLoc &&
node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) {
par.m_ok = false;
out << par.m_path << " *** ";
out << par.m_path << sep;
out << "missed merge with child " << i << endl;
}
}
// check inline prefix
{ ConstData data1 = node.getPref();
Uint32 data2[MaxPrefSize];
memset(data2, DataFillByte, MaxPrefSize << 2);
readKeyAttrs(frag, node.getMinMax(0), 0, c_searchKey);
copyAttrs(frag, c_searchKey, data2, tree.m_prefSize);
for (unsigned n = 0; n < tree.m_prefSize; n++) {
if (data1[n] != data2[n]) {
par.m_ok = false;
out << par.m_path << sep;
out << "inline prefix mismatch word " << n;
out << " value " << hex << data1[n];
out << " should be " << hex << data2[n] << endl;
break;
}
}
}
// check ordering within node
for (unsigned j = 1; j < node.getOccup(); j++) {
unsigned start = 0;
const TreeEnt ent1 = node.getEnt(j - 1);
const TreeEnt ent2 = node.getEnt(j);
if (j == 1) {
readKeyAttrs(frag, ent1, start, c_searchKey);
} else {
memcpy(c_searchKey, c_entryKey, frag.m_numAttrs << 2);
}
readKeyAttrs(frag, ent2, start, c_entryKey);
int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
if (ret == 0)
ret = ent1.cmp(ent2);
if (ret != -1) {
par.m_ok = false;
out << par.m_path << sep;
out << " disorder within node at pos " << j << endl;
}
}
// check ordering wrt subtrees
for (unsigned i = 0; i <= 1; i++) {
if (node.getLink(i) == NullTupLoc)
continue;
const TreeEnt ent1 = cpar[i].m_minmax[1 - i];
const TreeEnt ent2 = node.getMinMax(i);
unsigned start = 0;
readKeyAttrs(frag, ent1, start, c_searchKey);
readKeyAttrs(frag, ent2, start, c_entryKey);
int ret = cmpSearchKey(frag, start, c_searchKey, c_entryKey);
if (ret == 0)
ret = ent1.cmp(ent2);
if (ret != (i == 0 ? -1 : +1)) {
par.m_ok = false;
out << par.m_path << sep;
out << " disorder wrt subtree " << i << endl;
}
}
// return values
par.m_depth = 1 + max(cpar[0].m_depth, cpar[1].m_depth);
par.m_occup = node.getOccup();
for (unsigned i = 0; i <= 1; i++) {
if (node.getLink(i) == NullTupLoc)
par.m_minmax[i] = node.getMinMax(i);
else
par.m_minmax[i] = cpar[i].m_minmax[i];
}
}
NdbOut&
......@@ -355,20 +417,19 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node)
out << " [acc " << dec << node.m_acc << "]";
out << " [node " << *node.m_node << "]";
if (node.m_acc >= Dbtux::AccPref) {
for (unsigned i = 0; i <= 1; i++) {
out << " [pref " << dec << i;
const Uint32* data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + i * tree.m_prefSize;
for (unsigned j = 0; j < node.m_frag.m_tree.m_prefSize; j++)
const Uint32* data;
out << " [pref";
data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize;
for (unsigned j = 0; j < tree.m_prefSize; j++)
out << " " << hex << data[j];
out << "]";
}
out << " [entList";
unsigned numpos = node.m_node->m_occup;
if (node.m_acc < Dbtux::AccFull && numpos > 2) {
numpos = 2;
out << "(" << dec << numpos << ")";
}
const Uint32* data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + 2 * tree.m_prefSize;
data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + tree.m_prefSize;
const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data;
for (unsigned pos = 0; pos < numpos; pos++)
out << " " << entList[pos];
......
......@@ -26,7 +26,12 @@ Dbtux::Dbtux(const Configuration& conf) :
#ifdef VM_TRACE
debugFile(0),
debugOut(*new NullOutputStream()),
// until ndb_mgm supports dump
#ifdef DBTUX_DEBUG_TREE
debugFlags(DebugTree),
#else
debugFlags(0),
#endif
#endif
c_internalStartPhase(0),
c_typeOfStart(NodeState::ST_ILLEGAL_TYPE),
......@@ -314,6 +319,9 @@ Dbtux::copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2
keyAttrs += 1;
data1 += 1;
}
#ifdef VM_TRACE
memset(data2, DataFillByte, len2 << 2);
#endif
}
BLOCK_FUNCTIONS(Dbtux);
......@@ -111,19 +111,18 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
debugOut << endl;
}
#endif
// find position in tree
// do the operation
req->errorCode = 0;
TreePos treePos;
treeSearch(signal, frag, c_searchKey, ent, treePos);
switch (opCode) {
case TuxMaintReq::OpAdd:
jam();
searchToAdd(signal, frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << endl;
}
#endif
// do the operation
req->errorCode = 0;
switch (opCode) {
case TuxMaintReq::OpAdd:
jam();
if (treePos.m_match) {
jam();
// there is no "Building" state so this will have to do
......@@ -152,6 +151,12 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break;
case TuxMaintReq::OpRemove:
jam();
searchToRemove(signal, frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
if (debugFlags & DebugMaint) {
debugOut << treePos << endl;
}
#endif
if (! treePos.m_match) {
jam();
// there is no "Building" state so this will have to do
......@@ -167,7 +172,6 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
ndbrequire(false);
break;
}
// commit and release nodes
#ifdef VM_TRACE
if (debugFlags & DebugTree) {
printTree(signal, frag, debugOut);
......
......@@ -85,10 +85,9 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc)
new (node.m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
memset(node.getPref(0), 0xa2, tree.m_prefSize << 2);
memset(node.getPref(1), 0xa2, tree.m_prefSize << 2);
memset(node.getPref(), DataFillByte, tree.m_prefSize << 2);
TreeEnt* entList = tree.getEntList(node.m_node);
memset(entList, 0xa4, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
memset(entList, NodeFillByte, (tree.m_maxOccup + 1) * (TreeEntSize << 2));
#endif
}
......@@ -116,12 +115,12 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
* attribute headers for now. XXX use null mask instead
*/
void
Dbtux::setNodePref(Signal* signal, NodeHandle& node, unsigned i)
Dbtux::setNodePref(Signal* signal, NodeHandle& node)
{
const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree;
readKeyAttrs(frag, node.getMinMax(i), 0, c_entryKey);
copyAttrs(frag, c_entryKey, node.getPref(i), tree.m_prefSize);
readKeyAttrs(frag, node.getMinMax(0), 0, c_entryKey);
copyAttrs(frag, c_entryKey, node.getPref(), tree.m_prefSize);
}
// node operations
......@@ -173,11 +172,9 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// fix prefixes
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(signal, node, 0);
if (occup == 0 || pos == occup)
setNodePref(signal, node, 1);
setNodePref(signal, node);
}
/*
......@@ -248,11 +245,9 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
}
entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefixes
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(signal, node, 0);
if (occup != 1 && pos == occup - 1)
setNodePref(signal, node, 1);
setNodePref(signal, node);
}
/*
......@@ -325,11 +320,9 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefixes
// fix prefix
if (true)
setNodePref(signal, node, 0);
if (occup == 1 || pos == occup - 1)
setNodePref(signal, node, 1);
setNodePref(signal, node);
}
/*
......@@ -403,11 +396,9 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
}
tmpList[0] = newMin;
entList[0] = entList[occup];
// fix prefixes
// fix prefix
if (true)
setNodePref(signal, node, 0);
if (occup == 1 || pos == occup - 1)
setNodePref(signal, node, 1);
setNodePref(signal, node);
}
/*
......
......@@ -689,16 +689,9 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
TreeHead& tree = frag.m_tree;
if (tree.m_root == NullTupLoc) {
// tree may have become empty
jam();
scan.m_state = ScanOp::Last;
return;
}
TreePos pos;
pos.m_loc = tree.m_root;
NodeHandle node(frag);
// unpack lower bound
// set up index keys for this operation
setKeyAttrs(frag);
// unpack lower bound into c_dataBuffer
const ScanBound& bound = *scan.m_bound[0];
ScanBoundIterator iter;
bound.first(iter);
......@@ -707,103 +700,22 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
c_dataBuffer[j] = *iter.data;
bound.next(iter);
}
// comparison parameters
BoundPar boundPar;
boundPar.m_data1 = c_dataBuffer;
boundPar.m_count1 = scan.m_boundCnt[0];
boundPar.m_dir = 0;
loop: {
jam();
selectNode(signal, node, pos.m_loc, AccPref);
const unsigned occup = node.getOccup();
ndbrequire(occup != 0);
for (unsigned i = 0; i <= 1; i++) {
jam();
// compare prefix
boundPar.m_data2 = node.getPref(i);
boundPar.m_len2 = tree.m_prefSize;
int ret = cmpScanBound(frag, boundPar);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read full value
ReadPar readPar;
readPar.m_ent = node.getMinMax(i);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
tupReadAttrs(signal, frag, readPar);
// compare full value
boundPar.m_data2 = readPar.m_data;
boundPar.m_len2 = ZNIL; // big
ret = cmpScanBound(frag, boundPar);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (i == 0 && ret < 0) {
jam();
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
pos.m_loc = loc;
goto loop;
}
// start scanning this node
pos.m_pos = 0;
pos.m_match = false;
pos.m_dir = 3;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
linkScan(node, scanPtr);
return;
}
if (i == 1 && ret > 0) {
jam();
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
// search for scan start position
TreePos treePos;
searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
if (treePos.m_loc == NullTupLoc) {
// empty tree
jam();
// continue to right subtree
pos.m_loc = loc;
goto loop;
}
// start scanning upwards
pos.m_dir = 1;
scan.m_scanPos = pos;
scan.m_state = ScanOp::Next;
linkScan(node, scanPtr);
scan.m_state = ScanOp::Last;
return;
}
}
// read rest of current node
accessNode(signal, node, AccFull);
// look for first entry
ndbrequire(occup >= 2);
for (unsigned j = 1; j < occup; j++) {
jam();
ReadPar readPar;
readPar.m_ent = node.getEnt(j);
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
tupReadAttrs(signal, frag, readPar);
// compare
boundPar.m_data2 = readPar.m_data;
boundPar.m_len2 = ZNIL; // big
int ret = cmpScanBound(frag, boundPar);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
jam();
// start scanning this node
pos.m_pos = j;
pos.m_match = false;
pos.m_dir = 3;
scan.m_scanPos = pos;
// set position and state
scan.m_scanPos = treePos;
scan.m_state = ScanOp::Next;
// link the scan to node found
NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc, AccFull);
linkScan(node, scanPtr);
return;
}
}
ndbrequire(false);
}
}
/*
......@@ -841,7 +753,9 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_accLockOp = RNIL;
scan.m_state = ScanOp::Current;
}
// unpack upper bound
// set up index keys for this operation
setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer
const ScanBound& bound = *scan.m_bound[1];
ScanBoundIterator iter;
bound.first(iter);
......@@ -850,11 +764,6 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
c_dataBuffer[j] = *iter.data;
bound.next(iter);
}
// comparison parameters
BoundPar boundPar;
boundPar.m_data1 = c_dataBuffer;
boundPar.m_count1 = scan.m_boundCnt[1];
boundPar.m_dir = 1;
// use copy of position
TreePos pos = scan.m_scanPos;
// get and remember original node
......@@ -912,17 +821,9 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
jam();
pos.m_ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged
// XXX implement prefix optimization
ReadPar readPar;
readPar.m_ent = pos.m_ent;
readPar.m_first = 0;
readPar.m_count = frag.m_numAttrs;
readPar.m_data = 0; // leave in signal data
tupReadAttrs(signal, frag, readPar);
// compare
boundPar.m_data2 = readPar.m_data;
boundPar.m_len2 = ZNIL; // big
int ret = cmpScanBound(frag, boundPar);
// read and compare all attributes
readKeyAttrs(frag, pos.m_ent, 0, c_entryKey);
int ret = cmpScanBound(frag, 1, c_dataBuffer, scan.m_boundCnt[1], c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
jam();
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define DBTUX_SEARCH_CPP
#include "Dbtux.hpp"
/*
* Search for entry to add.
*
* Similar to searchToRemove (see below).
*
* TODO optimize for initial equal attrs in node min/max
*/
void
Dbtux::searchToAdd(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
return;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
/*
* In order to not (yet) change old behaviour, a position between
* 2 nodes returns the one at the bottom of the tree.
*/
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
int ret;
// compare prefix
unsigned start = 0;
ret = cmpSearchKey(frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare remaining attributes
ndbrequire(start < numAttrs);
readKeyAttrs(frag, currNode.getMinMax(0), start, c_entryKey);
ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getMinMax(0));
}
if (ret < 0) {
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b but remember the bottom node
bottomNode = currNode;
currNode = glbNode;
}
} else if (ret > 0) {
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = true;
return;
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
jam();
int ret;
// read and compare attributes
unsigned start = 0;
readKeyAttrs(frag, currNode.getEnt(j), start, c_entryKey);
ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getEnt(j));
}
if (ret <= 0) {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = (ret == 0);
return;
}
}
if (! bottomNode.isNull()) {
jam();
// backwards compatible for now
treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
return;
}
treePos.m_loc = currNode.m_loc;
treePos.m_pos = currNode.getOccup();
treePos.m_match = false;
}
/*
* Search for entry to remove.
*
* Compares search key to each node min. A move to right subtree can
* overshoot target node. The last such node is saved. The final node
* is a half-leaf or leaf. If search key is less than final node min
* then the saved node is the g.l.b of the final node and we move back
* to it.
*/
void
Dbtux::searchToRemove(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
return;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
int ret;
// compare prefix
unsigned start = 0;
ret = cmpSearchKey(frag, start, searchKey, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare remaining attributes
ndbrequire(start < numAttrs);
readKeyAttrs(frag, currNode.getMinMax(0), start, c_entryKey);
ret = cmpSearchKey(frag, start, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(currNode.getMinMax(0));
}
if (ret < 0) {
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b
currNode = glbNode;
}
} else if (ret > 0) {
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = true;
return;
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
// pos 0 was handled above
for (unsigned j = 1, occup = currNode.getOccup(); j < occup; j++) {
jam();
// compare only the entry
if (searchEnt.eq(currNode.getEnt(j))) {
jam();
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = true;
return;
}
}
treePos.m_loc = currNode.m_loc;
treePos.m_pos = currNode.getOccup();
treePos.m_match = false;
}
/*
* Search for scan start position.
*
* Similar to searchToAdd.
*/
void
Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag);
currNode.m_loc = tree.m_root;
if (currNode.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_match = false;
return;
}
NodeHandle glbNode(frag); // potential g.l.b of final node
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
int ret;
// compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare all attributes
readKeyAttrs(frag, currNode.getMinMax(0), 0, c_entryKey);
ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret < 0) {
jam();
const TupLoc loc = currNode.getLink(0);
if (loc != NullTupLoc) {
jam();
// continue to left subtree
currNode.m_loc = loc;
continue;
}
if (! glbNode.isNull()) {
jam();
// move up to the g.l.b but remember the bottom node
bottomNode = currNode;
currNode = glbNode;
} else {
// start scanning this node
treePos.m_loc = currNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
} else if (ret > 0) {
jam();
const TupLoc loc = currNode.getLink(1);
if (loc != NullTupLoc) {
jam();
// save potential g.l.b
glbNode = currNode;
// continue to right subtree
currNode.m_loc = loc;
continue;
}
} else {
ndbassert(false);
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
jam();
int ret;
// read and compare attributes
readKeyAttrs(frag, currNode.getEnt(j), 0, c_entryKey);
ret = cmpScanBound(frag, 0, boundInfo, boundCount, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
// start scanning from current entry
treePos.m_loc = currNode.m_loc;
treePos.m_pos = j;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
}
if (! bottomNode.isNull()) {
jam();
// start scanning the l.u.b
treePos.m_loc = bottomNode.m_loc;
treePos.m_pos = 0;
treePos.m_match = false;
treePos.m_dir = 3;
return;
}
// start scanning upwards (pretend we came from right child)
treePos.m_loc = currNode.m_loc;
treePos.m_dir = 1;
}
......@@ -17,112 +17,6 @@
#define DBTUX_TREE_CPP
#include "Dbtux.hpp"
/*
* Search for entry.
*
* Search key is index attribute data and tree entry value. Start from
* root node and compare the key to min/max of each node. Use linear
* search on the final (bounding) node. Initial attributes which are
* same in min/max need not be checked.
*/
void
Dbtux::treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos)
{
const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs;
treePos.m_loc = tree.m_root;
if (treePos.m_loc == NullTupLoc) {
// empty tree
jam();
treePos.m_pos = 0;
treePos.m_match = false;
return;
}
NodeHandle node(frag);
loop: {
jam();
selectNode(signal, node, treePos.m_loc, AccPref);
const unsigned occup = node.getOccup();
ndbrequire(occup != 0);
// number of equal initial attributes in bounding node
unsigned start = ZNIL;
for (unsigned i = 0; i <= 1; i++) {
jam();
unsigned start1 = 0;
// compare prefix
int ret = cmpSearchKey(frag, start1, searchKey, node.getPref(i), tree.m_prefSize);
if (ret == NdbSqlUtil::CmpUnknown) {
jam();
// read and compare remaining attributes
readKeyAttrs(frag, node.getMinMax(i), start1, c_entryKey);
ret = cmpSearchKey(frag, start1, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (start > start1)
start = start1;
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(node.getMinMax(i));
}
if (i == 0 ? (ret < 0) : (ret > 0)) {
jam();
const TupLoc loc = node.getLink(i);
if (loc != NullTupLoc) {
jam();
// continue to left/right subtree
treePos.m_loc = loc;
goto loop;
}
// position is immediately before/after this node
treePos.m_pos = (i == 0 ? 0 : occup);
treePos.m_match = false;
return;
}
if (ret == 0) {
jam();
// position is at first/last entry
treePos.m_pos = (i == 0 ? 0 : occup - 1);
treePos.m_match = true;
return;
}
}
// access rest of the bounding node
accessNode(signal, node, AccFull);
// position is strictly within the node
ndbrequire(occup >= 2);
const unsigned numWithin = occup - 2;
for (unsigned j = 1; j <= numWithin; j++) {
jam();
int ret = 0;
if (start < numAttrs) {
jam();
// read and compare remaining attributes
unsigned start1 = start;
readKeyAttrs(frag, node.getEnt(j), start1, c_entryKey);
ret = cmpSearchKey(frag, start1, searchKey, c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
}
if (ret == 0) {
jam();
// keys are equal, compare entry values
ret = searchEnt.cmp(node.getEnt(j));
}
if (ret <= 0) {
jam();
// position is before or at this entry
treePos.m_pos = j;
treePos.m_match = (ret == 0);
return;
}
}
// position is before last entry
treePos.m_pos = occup - 1;
treePos.m_match = false;
return;
}
}
/*
* Add entry.
*/
......
......@@ -7,6 +7,7 @@ libdbtux_a_SOURCES = \
DbtuxNode.cpp \
DbtuxTree.cpp \
DbtuxScan.cpp \
DbtuxSearch.cpp \
DbtuxCmp.cpp \
DbtuxDebug.cpp
......
......@@ -49,4 +49,7 @@ optim 10 mc02/a 44 ms 65 ms 46 pct
optim 11 mc02/a 43 ms 63 ms 46 pct
mc02/b 52 ms 86 ms 63 pct
optim 12 mc02/a 38 ms 55 ms 43 pct
mc02/b 47 ms 77 ms 63 pct
vim: set et:
......@@ -2525,7 +2525,7 @@ tbusybuild(Par par)
for (unsigned i = 0; i < par.m_subloop; i++) {
RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, MT);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, dropindex, ST);
}
return 0;
......@@ -2564,9 +2564,11 @@ ttimemaint(Par par)
t1.off(par.m_totrows);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST);
t2.on();
RUNSTEP(par, pkupdate, MT);
t2.off(par.m_totrows);
RUNSTEP(par, readverify, ST);
RUNSTEP(par, dropindex, ST);
}
LL1("update - " << t1.time());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment