Commit a573e148 authored by unknown's avatar unknown

Merge joreland@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into mysql.com:/home/jonas/src/mysql-4.1-ndb
parents 046e747c dd451232
......@@ -7,10 +7,12 @@
use strict;
use integer;
use Getopt::Long;
my $all = shift;
!defined($all) || ($all eq '--all' && !defined(shift))
or die "only available option is --all";
my $opt_all = 0;
my $opt_cnt = 5;
GetOptions("all" => \$opt_all, "cnt=i" => \$opt_cnt)
or die "options are: --all --cnt=N";
my $table = 't';
......@@ -67,15 +69,18 @@ sub mkall ($$$\@) {
my($col, $key1, $key2, $val) = @_;
my @a = ();
my $p = mkdummy(@$val);
push(@a, $p) if $all;
my @ops1 = $all ? qw(< <= = >= >) : qw(= >= >);
my @ops2 = $all ? qw(< <= = >= >) : qw(< <=);
push(@a, $p) if $opt_all;
my @ops = qw(< <= = >= >);
for my $op (@ops) {
my $p = mkone($col, $op, $key1, @$val);
push(@a, $p) if $opt_all || $p->{cnt} != 0;
}
my @ops1 = $opt_all ? @ops : qw(= >= >);
my @ops2 = $opt_all ? @ops : qw(<= <);
for my $op1 (@ops1) {
my $p = mkone($col, $op1, $key1, @$val);
push(@a, $p) if $all || $p->{cnt} != 0;
for my $op2 (@ops2) {
my $p = mktwo($col, $op1, $key1, $op2, $key2, @$val);
push(@a, $p) if $all || $p->{cnt} != 0;
push(@a, $p) if $opt_all || $p->{cnt} != 0;
}
}
return \@a;
......@@ -95,7 +100,7 @@ create table $table (
index (b, c, d)
) engine=ndb;
EOF
my @val = (0..4);
my @val = (0..($opt_cnt-1));
my $v0 = 0;
for my $v1 (@val) {
for my $v2 (@val) {
......
......@@ -250,8 +250,8 @@ private:
static const unsigned NodeHeadSize = sizeof(TreeNode) >> 2;
/*
* Tree nodes are not always accessed fully, for cache reasons. There
* are 3 access sizes.
* Tree node "access size" was for an early version with signal
* interface to TUP. It is now used only to compute sizes.
*/
enum AccSize {
AccNone = 0,
......@@ -284,7 +284,7 @@ private:
* m_occup), and whether the position is at an existing entry or
* before one (if any). Position m_occup points past the node and is
* also represented by position 0 of next node. Includes direction
* and copy of entry used by scan.
* used by scan.
*/
struct TreePos;
friend struct TreePos;
......@@ -292,8 +292,7 @@ private:
TupLoc m_loc; // physical node address
Uint16 m_pos; // position 0 to m_occup
Uint8 m_match; // at an existing entry
Uint8 m_dir; // from link (0-2) or within node (3)
TreeEnt m_ent; // copy of current entry
Uint8 m_dir; // see scanNext()
TreePos();
};
......@@ -374,6 +373,10 @@ private:
* a separate lock wait flag. It may be for current entry or it may
* be for an entry we were moved away from. In any case nothing
* happens with current entry before lock wait flag is cleared.
*
* An unfinished scan is always linked to some tree node, and has
* current position and direction (see comments at scanNext). There
* is also a copy of latest entry found.
*/
struct ScanOp;
friend struct ScanOp;
......@@ -412,7 +415,7 @@ private:
ScanBound* m_bound[2]; // pointers to above 2
Uint16 m_boundCnt[2]; // number of bounds in each
TreePos m_scanPos; // position
TreeEnt m_lastEnt; // last entry returned
TreeEnt m_scanEnt; // latest entry found
Uint32 m_nodeScan; // next scan at node (single-linked)
union {
Uint32 nextPool;
......@@ -519,7 +522,6 @@ private:
Frag& m_frag; // fragment using the node
TupLoc m_loc; // physical node address
TreeNode* m_node; // pointer to node storage
AccSize m_acc; // accessed size
NodeHandle(Frag& frag);
NodeHandle(const NodeHandle& node);
NodeHandle& operator=(const NodeHandle& node);
......@@ -580,9 +582,8 @@ private:
* DbtuxNode.cpp
*/
int allocNode(Signal* signal, NodeHandle& node);
void accessNode(Signal* signal, NodeHandle& node, AccSize acc);
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc);
void insertNode(Signal* signal, NodeHandle& node, AccSize acc);
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc);
void insertNode(Signal* signal, NodeHandle& node);
void deleteNode(Signal* signal, NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node);
// node operations
......@@ -968,8 +969,7 @@ Dbtux::TreePos::TreePos() :
m_loc(),
m_pos(ZNIL),
m_match(false),
m_dir(255),
m_ent()
m_dir(255)
{
}
......@@ -1010,7 +1010,7 @@ Dbtux::ScanOp::ScanOp(ScanBoundPool& scanBoundPool) :
m_boundMin(scanBoundPool),
m_boundMax(scanBoundPool),
m_scanPos(),
m_lastEnt(),
m_scanEnt(),
m_nodeScan(RNIL)
{
m_bound[0] = &m_boundMin;
......@@ -1084,8 +1084,7 @@ inline
Dbtux::NodeHandle::NodeHandle(Frag& frag) :
m_frag(frag),
m_loc(),
m_node(0),
m_acc(AccNone)
m_node(0)
{
}
......@@ -1093,8 +1092,7 @@ inline
Dbtux::NodeHandle::NodeHandle(const NodeHandle& node) :
m_frag(node.m_frag),
m_loc(node.m_loc),
m_node(node.m_node),
m_acc(node.m_acc)
m_node(node.m_node)
{
}
......@@ -1104,7 +1102,6 @@ Dbtux::NodeHandle::operator=(const NodeHandle& node)
ndbassert(&m_frag == &node.m_frag);
m_loc = node.m_loc;
m_node = node.m_node;
m_acc = node.m_acc;
return *this;
}
......@@ -1190,7 +1187,6 @@ inline Dbtux::Data
Dbtux::NodeHandle::getPref()
{
TreeHead& tree = m_frag.m_tree;
ndbrequire(m_acc >= AccPref);
return tree.getPref(m_node);
}
......@@ -1201,11 +1197,6 @@ Dbtux::NodeHandle::getEnt(unsigned pos)
TreeEnt* entList = tree.getEntList(m_node);
const unsigned occup = m_node->m_occup;
ndbrequire(pos < occup);
if (pos == 0 || pos == occup - 1) {
ndbrequire(m_acc >= AccPref)
} else {
ndbrequire(m_acc == AccFull)
}
return entList[(1 + pos) % occup];
}
......
......@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
}
TreeHead& tree = frag.m_tree;
NodeHandle node(frag);
selectNode(signal, node, loc, AccFull);
selectNode(signal, node, loc);
out << par.m_path << " " << node << endl;
// check children
PrintPar cpar[2];
......@@ -310,7 +310,6 @@ operator<<(NdbOut& out, const Dbtux::TreePos& pos)
out << " [pos " << dec << pos.m_pos << "]";
out << " [match " << dec << pos.m_match << "]";
out << " [dir " << dec << pos.m_dir << "]";
out << " [ent " << pos.m_ent << "]";
out << "]";
return out;
}
......@@ -347,6 +346,7 @@ operator<<(NdbOut& out, const Dbtux::ScanOp& scan)
out << " [lockMode " << dec << scan.m_lockMode << "]";
out << " [keyInfo " << dec << scan.m_keyInfo << "]";
out << " [pos " << scan.m_scanPos << "]";
out << " [ent " << scan.m_scanEnt << "]";
for (unsigned i = 0; i <= 1; i++) {
out << " [bound " << dec << i;
Dbtux::ScanBound& bound = *scan.m_bound[i];
......@@ -407,28 +407,21 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node)
const Dbtux::TreeHead& tree = frag.m_tree;
out << "[NodeHandle " << hex << &node;
out << " [loc " << node.m_loc << "]";
out << " [acc " << dec << node.m_acc << "]";
out << " [node " << *node.m_node << "]";
if (node.m_acc >= Dbtux::AccPref) {
const Uint32* data;
out << " [pref";
data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize;
for (unsigned j = 0; j < tree.m_prefSize; j++)
out << " " << hex << data[j];
out << "]";
out << " [entList";
unsigned numpos = node.m_node->m_occup;
if (node.m_acc < Dbtux::AccFull && numpos > 2) {
numpos = 2;
out << "(" << dec << numpos << ")";
}
data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + tree.m_prefSize;
const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data;
// print entries in logical order
for (unsigned pos = 1; pos <= numpos; pos++)
out << " " << entList[pos % numpos];
out << "]";
}
const Uint32* data;
out << " [pref";
data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize;
for (unsigned j = 0; j < tree.m_prefSize; j++)
out << " " << hex << data[j];
out << "]";
out << " [entList";
unsigned numpos = node.m_node->m_occup;
data = (const Uint32*)node.m_node + Dbtux::NodeHeadSize + tree.m_prefSize;
const Dbtux::TreeEnt* entList = (const Dbtux::TreeEnt*)data;
// print entries in logical order
for (unsigned pos = 1; pos <= numpos; pos++)
out << " " << entList[pos % numpos];
out << "]";
out << "]";
return out;
}
......
......@@ -33,30 +33,16 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node)
jam();
node.m_loc = TupLoc(pageId, pageOffset);
node.m_node = reinterpret_cast<TreeNode*>(node32);
node.m_acc = AccNone;
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
}
return errorCode;
}
/*
* Access more of the node.
*/
void
Dbtux::accessNode(Signal* signal, NodeHandle& node, AccSize acc)
{
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
if (node.m_acc >= acc)
return;
// XXX could do prefetch
node.m_acc = acc;
}
/*
* Set handle to point to existing node.
*/
void
Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc)
Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
{
Frag& frag = node.m_frag;
ndbrequire(loc != NullTupLoc);
......@@ -67,21 +53,19 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc)
jamEntry();
node.m_loc = loc;
node.m_node = reinterpret_cast<TreeNode*>(node32);
node.m_acc = AccNone;
ndbrequire(node.m_loc != NullTupLoc && node.m_node != 0);
accessNode(signal, node, acc);
}
/*
* Set handle to point to new node. Uses the pre-allocated node.
*/
void
Dbtux::insertNode(Signal* signal, NodeHandle& node, AccSize acc)
Dbtux::insertNode(Signal* signal, NodeHandle& node)
{
Frag& frag = node.m_frag;
TupLoc loc = frag.m_freeLoc;
frag.m_freeLoc = NullTupLoc;
selectNode(signal, node, loc, acc);
selectNode(signal, node, loc);
new (node.m_node) TreeNode();
#ifdef VM_TRACE
TreeHead& tree = frag.m_tree;
......
......@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jam();
const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandle node(frag);
selectNode(signal, node, loc, AccHead);
selectNode(signal, node, loc);
unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc;
}
......@@ -350,7 +350,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_lockwait) {
jam();
// LQH asks if we are waiting for lock and we tell it to ask again
const TreeEnt ent = scan.m_scanPos.m_ent;
const TreeEnt ent = scan.m_scanEnt;
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scan.m_userPtr;
conf->accOperationPtr = RNIL; // no tuple returned
......@@ -385,7 +385,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
ndbrequire(scan.m_accLockOp == RNIL);
if (! scan.m_readCommitted) {
jam();
const TreeEnt ent = scan.m_scanPos.m_ent;
const TreeEnt ent = scan.m_scanEnt;
// read tuple key
readTablePk(frag, ent, pkData, pkSize);
// get read lock or exclusive lock
......@@ -473,7 +473,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
// we have lock or do not need one
jam();
// read keys if not already done (uses signal)
const TreeEnt ent = scan.m_scanPos.m_ent;
const TreeEnt ent = scan.m_scanEnt;
if (scan.m_keyInfo) {
jam();
if (pkSize == 0) {
......@@ -536,8 +536,6 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
total += length;
}
}
// remember last entry returned
scan.m_lastEnt = ent;
// next time look for next entry
scan.m_state = ScanOp::Next;
return;
......@@ -712,20 +710,21 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
scan.m_state = ScanOp::Next;
// link the scan to node found
NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc, AccFull);
selectNode(signal, node, treePos.m_loc);
linkScan(node, scanPtr);
}
/*
* Move to next entry. The scan is already linked to some node. When
* we leave, if any entry was found, it will be linked to a possibly
* different node. The scan has a direction, one of:
* different node. The scan has a position, and a direction which tells
* from where we came to this position. This is one of:
*
* 0 - coming up from left child
* 1 - coming up from right child (proceed to parent immediately)
* 2 - coming up from root (the scan ends)
* 3 - left to right within node
* 4 - coming down from parent to left or right child
* 0 - up from left child (scan this node next)
* 1 - up from right child (proceed to parent)
* 2 - up from root (the scan ends)
* 3 - left to right within node (at end proceed to right child)
* 4 - down from parent (proceed to left child)
*/
void
Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
......@@ -768,10 +767,12 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
TreePos pos = scan.m_scanPos;
// get and remember original node
NodeHandle origNode(frag);
selectNode(signal, origNode, pos.m_loc, AccHead);
selectNode(signal, origNode, pos.m_loc);
ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop
NodeHandle node = origNode;
// copy of entry found
TreeEnt ent;
while (true) {
jam();
if (pos.m_dir == 2) {
......@@ -783,7 +784,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
}
if (node.m_loc != pos.m_loc) {
jam();
selectNode(signal, node, pos.m_loc, AccHead);
selectNode(signal, node, pos.m_loc);
}
if (pos.m_dir == 4) {
// coming down from parent proceed to left child
......@@ -799,7 +800,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
pos.m_dir = 0;
}
if (pos.m_dir == 0) {
// coming from left child scan current node
// coming up from left child scan current node
jam();
pos.m_pos = 0;
pos.m_match = false;
......@@ -810,8 +811,6 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
jam();
unsigned occup = node.getOccup();
ndbrequire(occup >= 1);
// access full node
accessNode(signal, node, AccFull);
// advance position
if (! pos.m_match)
pos.m_match = true;
......@@ -819,10 +818,10 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
pos.m_pos++;
if (pos.m_pos < occup) {
jam();
pos.m_ent = node.getEnt(pos.m_pos);
ent = node.getEnt(pos.m_pos);
pos.m_dir = 3; // unchanged
// read and compare all attributes
readKeyAttrs(frag, pos.m_ent, 0, c_entryKey);
readKeyAttrs(frag, ent, 0, c_entryKey);
int ret = cmpScanBound(frag, 1, c_dataBuffer, scan.m_boundCnt[1], c_entryKey);
ndbrequire(ret != NdbSqlUtil::CmpUnknown);
if (ret < 0) {
......@@ -833,7 +832,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break;
}
// can we see it
if (! scanVisible(signal, scanPtr, pos.m_ent)) {
if (! scanVisible(signal, scanPtr, ent)) {
jam();
continue;
}
......@@ -853,7 +852,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
pos.m_dir = 1;
}
if (pos.m_dir == 1) {
// coming from right child proceed to parent
// coming up from right child proceed to parent
jam();
pos.m_loc = node.getLink(2);
pos.m_dir = node.getSide();
......@@ -871,6 +870,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
unlinkScan(origNode, scanPtr);
linkScan(node, scanPtr);
}
// copy found entry
scan.m_scanEnt = ent;
} else if (scan.m_state == ScanOp::Last) {
jam();
ndbrequire(pos.m_loc == NullTupLoc);
......@@ -888,7 +889,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
/*
* Check if an entry is visible to the scan.
*
* There is a special check to never return same tuple twice in a row.
* There is a special check to never accept same tuple twice in a row.
* This is faster than asking TUP. It also fixes some special cases
* which are not analyzed or handled yet.
*/
......@@ -903,8 +904,8 @@ Dbtux::scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent)
Uint32 tupAddr = getTupAddr(frag, ent);
Uint32 tupVersion = ent.m_tupVersion;
// check for same tuple twice in row
if (scan.m_lastEnt.m_tupLoc == ent.m_tupLoc &&
scan.m_lastEnt.m_fragBit == fragBit) {
if (scan.m_scanEnt.m_tupLoc == ent.m_tupLoc &&
scan.m_scanEnt.m_fragBit == fragBit) {
jam();
return false;
}
......
......@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
selectNode(signal, currNode, currNode.m_loc);
int ret;
// compare prefix
unsigned start = 0;
......@@ -100,8 +100,6 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
// anticipate
treePos.m_loc = currNode.m_loc;
// binary search
......@@ -184,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
selectNode(signal, currNode, currNode.m_loc);
int ret;
// compare prefix
unsigned start = 0;
......@@ -235,8 +233,6 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
// anticipate
treePos.m_loc = currNode.m_loc;
// pos 0 was handled above
......@@ -275,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo
NodeHandle bottomNode(frag);
while (true) {
jam();
selectNode(signal, currNode, currNode.m_loc, AccPref);
selectNode(signal, currNode, currNode.m_loc);
int ret;
// compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
......@@ -324,8 +320,6 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo
}
break;
}
// access rest of current node
accessNode(signal, currNode, AccFull);
for (unsigned j = 0, occup = currNode.getOccup(); j < occup; j++) {
jam();
int ret;
......
......@@ -29,14 +29,13 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// check for empty tree
if (treePos.m_loc == NullTupLoc) {
jam();
insertNode(signal, node, AccPref);
insertNode(signal, node);
nodePushUp(signal, node, 0, ent);
node.setSide(2);
tree.m_root = node.m_loc;
return;
}
// access full node
selectNode(signal, node, treePos.m_loc, AccFull);
selectNode(signal, node, treePos.m_loc);
// check if it is bounding node
if (pos != 0 && pos != node.getOccup()) {
jam();
......@@ -59,11 +58,9 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// find glb node
while (childLoc != NullTupLoc) {
jam();
selectNode(signal, node, childLoc, AccHead);
selectNode(signal, node, childLoc);
childLoc = node.getLink(1);
}
// access full node again
accessNode(signal, node, AccFull);
pos = node.getOccup();
}
// fall thru to next case
......@@ -79,7 +76,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
}
// add a new node
NodeHandle childNode(frag);
insertNode(signal, childNode, AccPref);
insertNode(signal, childNode);
nodePushUp(signal, childNode, 0, ent);
// connect parent and child
node.setLink(i, childNode.m_loc);
......@@ -105,7 +102,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// height of longer subtree increased
jam();
NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(i), AccHead);
selectNode(signal, childNode, node.getLink(i));
int b2 = childNode.getBalance();
if (b2 == b) {
jam();
......@@ -129,7 +126,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
break;
}
i = node.getSide();
selectNode(signal, node, parentLoc, AccHead);
selectNode(signal, node, parentLoc);
}
}
......@@ -142,8 +139,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos;
NodeHandle node(frag);
// access full node
selectNode(signal, node, treePos.m_loc, AccFull);
selectNode(signal, node, treePos.m_loc);
TreeEnt ent;
// check interior node first
if (node.getChilds() == 2) {
......@@ -161,11 +157,9 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
TupLoc childLoc = node.getLink(0);
while (childLoc != NullTupLoc) {
jam();
selectNode(signal, node, childLoc, AccHead);
selectNode(signal, node, childLoc);
childLoc = node.getLink(1);
}
// access full node again
accessNode(signal, node, AccFull);
// use glb max as new parent min
ent = node.getEnt(node.getOccup() - 1);
nodePopUp(signal, parentNode, pos, ent);
......@@ -183,7 +177,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
TupLoc childLoc = node.getLink(i);
if (childLoc != NullTupLoc) {
// move to child
selectNode(signal, node, childLoc, AccFull);
selectNode(signal, node, childLoc);
// balance of half-leaf parent requires child to be leaf
break;
}
......@@ -196,7 +190,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
// move all that fits into parent
if (parentLoc != NullTupLoc) {
jam();
selectNode(signal, parentNode, node.getLink(2), AccFull);
selectNode(signal, parentNode, node.getLink(2));
nodeSlide(signal, parentNode, node, i);
// fall thru to next case
}
......@@ -222,7 +216,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
// move entries from the other child
TupLoc childLoc = node.getLink(1 - i);
NodeHandle childNode(frag);
selectNode(signal, childNode, childLoc, AccFull);
selectNode(signal, childNode, childLoc);
nodeSlide(signal, node, childNode, 1 - i);
if (childNode.getOccup() == 0) {
jam();
......@@ -236,7 +230,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
}
// fix side and become parent
i = node.getSide();
selectNode(signal, node, parentLoc, AccHead);
selectNode(signal, node, parentLoc);
}
}
#endif
......@@ -261,7 +255,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
jam();
// child on the other side
NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(1 - i), AccHead);
selectNode(signal, childNode, node.getLink(1 - i));
int b2 = childNode.getBalance();
if (b2 == b) {
jam();
......@@ -287,7 +281,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
return;
}
i = node.getSide();
selectNode(signal, node, parentLoc, AccHead);
selectNode(signal, node, parentLoc);
}
}
......@@ -331,7 +325,7 @@ Dbtux::treeRotateSingle(Signal* signal,
*/
TupLoc loc3 = node5.getLink(i);
NodeHandle node3(frag);
selectNode(signal, node3, loc3, AccHead);
selectNode(signal, node3, loc3);
const int bal3 = node3.getBalance();
/*
2 must always be there but is not changed. Thus we mereley check that it
......@@ -348,7 +342,7 @@ Dbtux::treeRotateSingle(Signal* signal,
NodeHandle node4(frag);
if (loc4 != NullTupLoc) {
jam();
selectNode(signal, node4, loc4, AccHead);
selectNode(signal, node4, loc4);
ndbrequire(node4.getSide() == (1 - i) &&
node4.getLink(2) == loc3);
node4.setSide(i);
......@@ -383,7 +377,7 @@ Dbtux::treeRotateSingle(Signal* signal,
if (loc0 != NullTupLoc) {
jam();
NodeHandle node0(frag);
selectNode(signal, node0, loc0, AccHead);
selectNode(signal, node0, loc0);
node0.setLink(side5, loc3);
} else {
jam();
......@@ -532,13 +526,13 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
// level 1
TupLoc loc2 = node6.getLink(i);
NodeHandle node2(frag);
selectNode(signal, node2, loc2, AccHead);
selectNode(signal, node2, loc2);
const int bal2 = node2.getBalance();
// level 2
TupLoc loc4 = node2.getLink(1 - i);
NodeHandle node4(frag);
selectNode(signal, node4, loc4, AccHead);
selectNode(signal, node4, loc4);
const int bal4 = node4.getBalance();
ndbrequire(i <= 1);
......@@ -556,8 +550,6 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
if (loc3 == NullTupLoc && loc5 == NullTupLoc) {
jam();
TreeHead& tree = frag.m_tree;
accessNode(signal, node2, AccFull);
accessNode(signal, node4, AccFull);
nodeSlide(signal, node4, node2, i);
// implied by rule of merging half-leaves with leaves
ndbrequire(node4.getOccup() >= tree.m_minOccup);
......@@ -566,14 +558,14 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
if (loc3 != NullTupLoc) {
jam();
NodeHandle node3(frag);
selectNode(signal, node3, loc3, AccHead);
selectNode(signal, node3, loc3);
node3.setLink(2, loc2);
node3.setSide(1 - i);
}
if (loc5 != NullTupLoc) {
jam();
NodeHandle node5(frag);
selectNode(signal, node5, loc5, AccHead);
selectNode(signal, node5, loc5);
node5.setLink(2, node6.m_loc);
node5.setSide(i);
}
......@@ -596,7 +588,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
if (loc0 != NullTupLoc) {
jam();
selectNode(signal, node0, loc0, AccHead);
selectNode(signal, node0, loc0);
node0.setLink(side6, loc4);
} else {
jam();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment