Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
abcd2994
Commit
abcd2994
authored
Oct 17, 2004
by
pekka@mysql.com
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
NDB wl-1942 dbtux - move scans correctly at index tree re-org
parent
a04f98d3
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
265 additions
and
152 deletions
+265
-152
ndb/src/kernel/blocks/dbtux/Dbtux.hpp
ndb/src/kernel/blocks/dbtux/Dbtux.hpp
+11
-4
ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
+233
-135
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+5
-3
ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
+13
-10
ndb/src/kernel/blocks/dbtux/Times.txt
ndb/src/kernel/blocks/dbtux/Times.txt
+3
-0
No files found.
ndb/src/kernel/blocks/dbtux/Dbtux.hpp
View file @
abcd2994
...
@@ -587,12 +587,19 @@ private:
...
@@ -587,12 +587,19 @@ private:
void
deleteNode
(
Signal
*
signal
,
NodeHandle
&
node
);
void
deleteNode
(
Signal
*
signal
,
NodeHandle
&
node
);
void
setNodePref
(
Signal
*
signal
,
NodeHandle
&
node
);
void
setNodePref
(
Signal
*
signal
,
NodeHandle
&
node
);
// node operations
// node operations
void
nodePushUp
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
const
TreeEnt
&
ent
);
void
nodePushUp
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
const
TreeEnt
&
ent
,
Uint32
scanList
);
void
nodePopDown
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
TreeEnt
&
ent
);
void
nodePushUpScans
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
);
void
nodePushDown
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
TreeEnt
&
ent
);
void
nodePopDown
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
TreeEnt
&
en
,
Uint32
*
scanList
);
void
nodePopUp
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
TreeEnt
&
ent
);
void
nodePopDownScans
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
);
void
nodePushDown
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
TreeEnt
&
ent
,
Uint32
&
scanList
);
void
nodePushDownScans
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
);
void
nodePopUp
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
TreeEnt
&
ent
,
Uint32
scanList
);
void
nodePopUpScans
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
);
void
nodeSlide
(
Signal
*
signal
,
NodeHandle
&
dstNode
,
NodeHandle
&
srcNode
,
unsigned
cnt
,
unsigned
i
);
void
nodeSlide
(
Signal
*
signal
,
NodeHandle
&
dstNode
,
NodeHandle
&
srcNode
,
unsigned
cnt
,
unsigned
i
);
// scans linked to node
// scans linked to node
void
addScanList
(
NodeHandle
&
node
,
unsigned
pos
,
Uint32
scanList
);
void
removeScanList
(
NodeHandle
&
node
,
unsigned
pos
,
Uint32
&
scanList
);
void
moveScanList
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
);
void
linkScan
(
NodeHandle
&
node
,
ScanOpPtr
scanPtr
);
void
linkScan
(
NodeHandle
&
node
,
ScanOpPtr
scanPtr
);
void
unlinkScan
(
NodeHandle
&
node
,
ScanOpPtr
scanPtr
);
void
unlinkScan
(
NodeHandle
&
node
,
ScanOpPtr
scanPtr
);
bool
islinkScan
(
NodeHandle
&
node
,
ScanOpPtr
scanPtr
);
bool
islinkScan
(
NodeHandle
&
node
,
ScanOpPtr
scanPtr
);
...
...
ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
View file @
abcd2994
...
@@ -117,18 +117,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node)
...
@@ -117,18 +117,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node)
* v
* v
* A B C D E _ _ => A B C X D E _
* A B C D E _ _ => A B C X D E _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Add list of scans at the new entry.
*/
*/
void
void
Dbtux
::
nodePushUp
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
const
TreeEnt
&
ent
)
Dbtux
::
nodePushUp
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
const
TreeEnt
&
ent
,
Uint32
scanList
)
{
{
Frag
&
frag
=
node
.
m_frag
;
Frag
&
frag
=
node
.
m_frag
;
TreeHead
&
tree
=
frag
.
m_tree
;
TreeHead
&
tree
=
frag
.
m_tree
;
const
unsigned
occup
=
node
.
getOccup
();
const
unsigned
occup
=
node
.
getOccup
();
ndbrequire
(
occup
<
tree
.
m_maxOccup
&&
pos
<=
occup
);
ndbrequire
(
occup
<
tree
.
m_maxOccup
&&
pos
<=
occup
);
// fix scans
// fix old scans
if
(
node
.
getNodeScan
()
!=
RNIL
)
nodePushUpScans
(
signal
,
node
,
pos
);
// fix node
TreeEnt
*
const
entList
=
tree
.
getEntList
(
node
.
m_node
);
entList
[
occup
]
=
entList
[
0
];
TreeEnt
*
const
tmpList
=
entList
+
1
;
for
(
unsigned
i
=
occup
;
i
>
pos
;
i
--
)
{
jam
();
tmpList
[
i
]
=
tmpList
[
i
-
1
];
}
tmpList
[
pos
]
=
ent
;
entList
[
0
]
=
entList
[
occup
+
1
];
node
.
setOccup
(
occup
+
1
);
// add new scans
if
(
scanList
!=
RNIL
)
addScanList
(
node
,
pos
,
scanList
);
// fix prefix
if
(
occup
==
0
||
pos
==
0
)
setNodePref
(
signal
,
node
);
}
void
Dbtux
::
nodePushUpScans
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
)
{
const
unsigned
occup
=
node
.
getOccup
();
ScanOpPtr
scanPtr
;
ScanOpPtr
scanPtr
;
scanPtr
.
i
=
node
.
getNodeScan
();
scanPtr
.
i
=
node
.
getNodeScan
();
while
(
scanPtr
.
i
!=
RNIL
)
{
do
{
jam
();
jam
();
c_scanOpPool
.
getPtr
(
scanPtr
);
c_scanOpPool
.
getPtr
(
scanPtr
);
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
...
@@ -144,21 +171,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
...
@@ -144,21 +171,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
scanPos
.
m_pos
++
;
scanPos
.
m_pos
++
;
}
}
scanPtr
.
i
=
scanPtr
.
p
->
m_nodeScan
;
scanPtr
.
i
=
scanPtr
.
p
->
m_nodeScan
;
}
}
while
(
scanPtr
.
i
!=
RNIL
);
// fix node
TreeEnt
*
const
entList
=
tree
.
getEntList
(
node
.
m_node
);
entList
[
occup
]
=
entList
[
0
];
TreeEnt
*
const
tmpList
=
entList
+
1
;
for
(
unsigned
i
=
occup
;
i
>
pos
;
i
--
)
{
jam
();
tmpList
[
i
]
=
tmpList
[
i
-
1
];
}
tmpList
[
pos
]
=
ent
;
entList
[
0
]
=
entList
[
occup
+
1
];
node
.
setOccup
(
occup
+
1
);
// fix prefix
if
(
occup
==
0
||
pos
==
0
)
setNodePref
(
signal
,
node
);
}
}
/*
/*
...
@@ -169,42 +182,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
...
@@ -169,42 +182,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
* ^ ^
* ^ ^
* A B C D E F _ => A B C E F _ _
* A B C D E F _ => A B C E F _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Scans at removed entry are returned if non-zero location is passed or
* else moved forward.
*/
*/
void
void
Dbtux
::
nodePopDown
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
TreeEnt
&
ent
)
Dbtux
::
nodePopDown
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
TreeEnt
&
ent
,
Uint32
*
scanList
)
{
{
Frag
&
frag
=
node
.
m_frag
;
Frag
&
frag
=
node
.
m_frag
;
TreeHead
&
tree
=
frag
.
m_tree
;
TreeHead
&
tree
=
frag
.
m_tree
;
const
unsigned
occup
=
node
.
getOccup
();
const
unsigned
occup
=
node
.
getOccup
();
ndbrequire
(
occup
<=
tree
.
m_maxOccup
&&
pos
<
occup
);
ndbrequire
(
occup
<=
tree
.
m_maxOccup
&&
pos
<
occup
);
ScanOpPtr
scanPtr
;
if
(
node
.
getNodeScan
()
!=
RNIL
)
{
// move scans whose entry disappears
// remove or move scans at this position
scanPtr
.
i
=
node
.
getNodeScan
();
if
(
scanList
==
0
)
while
(
scanPtr
.
i
!=
RNIL
)
{
moveScanList
(
signal
,
node
,
pos
);
jam
();
else
c_scanOpPool
.
getPtr
(
scanPtr
);
removeScanList
(
node
,
pos
,
*
scanList
);
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
// fix other scans
ndbrequire
(
scanPos
.
m_loc
==
node
.
m_loc
&&
scanPos
.
m_pos
<
occup
);
if
(
node
.
getNodeScan
()
!=
RNIL
)
const
Uint32
nextPtrI
=
scanPtr
.
p
->
m_nodeScan
;
nodePopDownScans
(
signal
,
node
,
pos
);
if
(
scanPos
.
m_pos
==
pos
)
{
jam
();
#ifdef VM_TRACE
if
(
debugFlags
&
DebugScan
)
{
debugOut
<<
"Move scan "
<<
scanPtr
.
i
<<
" "
<<
*
scanPtr
.
p
<<
endl
;
debugOut
<<
"At popDown pos="
<<
pos
<<
" "
<<
node
<<
endl
;
}
#endif
scanNext
(
signal
,
scanPtr
);
}
}
scanPtr
.
i
=
nextPtrI
;
// fix node
TreeEnt
*
const
entList
=
tree
.
getEntList
(
node
.
m_node
);
entList
[
occup
]
=
entList
[
0
];
TreeEnt
*
const
tmpList
=
entList
+
1
;
ent
=
tmpList
[
pos
];
for
(
unsigned
i
=
pos
;
i
<
occup
-
1
;
i
++
)
{
jam
();
tmpList
[
i
]
=
tmpList
[
i
+
1
];
}
}
// fix other scans
entList
[
0
]
=
entList
[
occup
-
1
];
node
.
setOccup
(
occup
-
1
);
// fix prefix
if
(
occup
!=
1
&&
pos
==
0
)
setNodePref
(
signal
,
node
);
}
void
Dbtux
::
nodePopDownScans
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
)
{
const
unsigned
occup
=
node
.
getOccup
();
ScanOpPtr
scanPtr
;
scanPtr
.
i
=
node
.
getNodeScan
();
scanPtr
.
i
=
node
.
getNodeScan
();
while
(
scanPtr
.
i
!=
RNIL
)
{
do
{
jam
();
jam
();
c_scanOpPool
.
getPtr
(
scanPtr
);
c_scanOpPool
.
getPtr
(
scanPtr
);
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
ndbrequire
(
scanPos
.
m_loc
==
node
.
m_loc
&&
scanPos
.
m_pos
<
occup
);
ndbrequire
(
scanPos
.
m_loc
==
node
.
m_loc
&&
scanPos
.
m_pos
<
occup
);
// handled before
ndbrequire
(
scanPos
.
m_pos
!=
pos
);
ndbrequire
(
scanPos
.
m_pos
!=
pos
);
if
(
scanPos
.
m_pos
>
pos
)
{
if
(
scanPos
.
m_pos
>
pos
)
{
jam
();
jam
();
...
@@ -217,21 +243,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
...
@@ -217,21 +243,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos
.
m_pos
--
;
scanPos
.
m_pos
--
;
}
}
scanPtr
.
i
=
scanPtr
.
p
->
m_nodeScan
;
scanPtr
.
i
=
scanPtr
.
p
->
m_nodeScan
;
}
}
while
(
scanPtr
.
i
!=
RNIL
);
// fix node
TreeEnt
*
const
entList
=
tree
.
getEntList
(
node
.
m_node
);
entList
[
occup
]
=
entList
[
0
];
TreeEnt
*
const
tmpList
=
entList
+
1
;
ent
=
tmpList
[
pos
];
for
(
unsigned
i
=
pos
;
i
<
occup
-
1
;
i
++
)
{
jam
();
tmpList
[
i
]
=
tmpList
[
i
+
1
];
}
entList
[
0
]
=
entList
[
occup
-
1
];
node
.
setOccup
(
occup
-
1
);
// fix prefix
if
(
occup
!=
1
&&
pos
==
0
)
setNodePref
(
signal
,
node
);
}
}
/*
/*
...
@@ -242,43 +254,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
...
@@ -242,43 +254,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
* ^ v ^
* ^ v ^
* A B C D E _ _ => B C D X E _ _
* A B C D E _ _ => B C D X E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Return list of scans at the removed position 0.
*/
*/
void
void
Dbtux
::
nodePushDown
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
TreeEnt
&
ent
)
Dbtux
::
nodePushDown
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
TreeEnt
&
ent
,
Uint32
&
scanList
)
{
{
Frag
&
frag
=
node
.
m_frag
;
Frag
&
frag
=
node
.
m_frag
;
TreeHead
&
tree
=
frag
.
m_tree
;
TreeHead
&
tree
=
frag
.
m_tree
;
const
unsigned
occup
=
node
.
getOccup
();
const
unsigned
occup
=
node
.
getOccup
();
ndbrequire
(
occup
<=
tree
.
m_maxOccup
&&
pos
<
occup
);
ndbrequire
(
occup
<=
tree
.
m_maxOccup
&&
pos
<
occup
);
ScanOpPtr
scanPtr
;
if
(
node
.
getNodeScan
()
!=
RNIL
)
{
// move scans whose entry disappears
// remove scans at 0
scanPtr
.
i
=
node
.
getNodeScan
();
removeScanList
(
node
,
0
,
scanList
);
while
(
scanPtr
.
i
!=
RNIL
)
{
// fix other scans
jam
();
if
(
node
.
getNodeScan
()
!=
RNIL
)
c_scanOpPool
.
getPtr
(
scanPtr
);
nodePushDownScans
(
signal
,
node
,
pos
);
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
ndbrequire
(
scanPos
.
m_loc
==
node
.
m_loc
&&
scanPos
.
m_pos
<
occup
);
const
Uint32
nextPtrI
=
scanPtr
.
p
->
m_nodeScan
;
if
(
scanPos
.
m_pos
==
0
)
{
jam
();
#ifdef VM_TRACE
if
(
debugFlags
&
DebugScan
)
{
debugOut
<<
"Move scan "
<<
scanPtr
.
i
<<
" "
<<
*
scanPtr
.
p
<<
endl
;
debugOut
<<
"At pushDown pos="
<<
pos
<<
" "
<<
node
<<
endl
;
}
#endif
// here we may miss a valid entry "X" XXX known bug
scanNext
(
signal
,
scanPtr
);
}
}
scanPtr
.
i
=
nextPtrI
;
// fix node
TreeEnt
*
const
entList
=
tree
.
getEntList
(
node
.
m_node
);
entList
[
occup
]
=
entList
[
0
];
TreeEnt
*
const
tmpList
=
entList
+
1
;
TreeEnt
oldMin
=
tmpList
[
0
];
for
(
unsigned
i
=
0
;
i
<
pos
;
i
++
)
{
jam
();
tmpList
[
i
]
=
tmpList
[
i
+
1
];
}
}
// fix other scans
tmpList
[
pos
]
=
ent
;
ent
=
oldMin
;
entList
[
0
]
=
entList
[
occup
];
// fix prefix
if
(
true
)
setNodePref
(
signal
,
node
);
}
void
Dbtux
::
nodePushDownScans
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
)
{
const
unsigned
occup
=
node
.
getOccup
();
ScanOpPtr
scanPtr
;
scanPtr
.
i
=
node
.
getNodeScan
();
scanPtr
.
i
=
node
.
getNodeScan
();
while
(
scanPtr
.
i
!=
RNIL
)
{
do
{
jam
();
jam
();
c_scanOpPool
.
getPtr
(
scanPtr
);
c_scanOpPool
.
getPtr
(
scanPtr
);
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
ndbrequire
(
scanPos
.
m_loc
==
node
.
m_loc
&&
scanPos
.
m_pos
<
occup
);
ndbrequire
(
scanPos
.
m_loc
==
node
.
m_loc
&&
scanPos
.
m_pos
<
occup
);
// handled before
ndbrequire
(
scanPos
.
m_pos
!=
0
);
ndbrequire
(
scanPos
.
m_pos
!=
0
);
if
(
scanPos
.
m_pos
<=
pos
)
{
if
(
scanPos
.
m_pos
<=
pos
)
{
jam
();
jam
();
...
@@ -291,22 +312,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
...
@@ -291,22 +312,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
scanPos
.
m_pos
--
;
scanPos
.
m_pos
--
;
}
}
scanPtr
.
i
=
scanPtr
.
p
->
m_nodeScan
;
scanPtr
.
i
=
scanPtr
.
p
->
m_nodeScan
;
}
}
while
(
scanPtr
.
i
!=
RNIL
);
// fix node
TreeEnt
*
const
entList
=
tree
.
getEntList
(
node
.
m_node
);
entList
[
occup
]
=
entList
[
0
];
TreeEnt
*
const
tmpList
=
entList
+
1
;
TreeEnt
oldMin
=
tmpList
[
0
];
for
(
unsigned
i
=
0
;
i
<
pos
;
i
++
)
{
jam
();
tmpList
[
i
]
=
tmpList
[
i
+
1
];
}
tmpList
[
pos
]
=
ent
;
ent
=
oldMin
;
entList
[
0
]
=
entList
[
occup
];
// fix prefix
if
(
true
)
setNodePref
(
signal
,
node
);
}
}
/*
/*
...
@@ -318,39 +324,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
...
@@ -318,39 +324,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
* v ^ ^
* v ^ ^
* A B C D E _ _ => X A B C E _ _
* A B C D E _ _ => X A B C E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Move scans at removed entry and add scans at the new entry.
*/
*/
void
void
Dbtux
::
nodePopUp
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
TreeEnt
&
ent
)
Dbtux
::
nodePopUp
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
,
TreeEnt
&
ent
,
Uint32
scanList
)
{
{
Frag
&
frag
=
node
.
m_frag
;
Frag
&
frag
=
node
.
m_frag
;
TreeHead
&
tree
=
frag
.
m_tree
;
TreeHead
&
tree
=
frag
.
m_tree
;
const
unsigned
occup
=
node
.
getOccup
();
const
unsigned
occup
=
node
.
getOccup
();
ndbrequire
(
occup
<=
tree
.
m_maxOccup
&&
pos
<
occup
);
ndbrequire
(
occup
<=
tree
.
m_maxOccup
&&
pos
<
occup
);
ScanOpPtr
scanPtr
;
if
(
node
.
getNodeScan
()
!=
RNIL
)
{
// move scans whose entry disappears
// move scans whose entry disappears
scanPtr
.
i
=
node
.
getNodeScan
();
moveScanList
(
signal
,
node
,
pos
);
while
(
scanPtr
.
i
!=
RNIL
)
{
// fix other scans
jam
();
if
(
node
.
getNodeScan
()
!=
RNIL
)
c_scanOpPool
.
getPtr
(
scanPtr
);
nodePopUpScans
(
signal
,
node
,
pos
);
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
ndbrequire
(
scanPos
.
m_loc
==
node
.
m_loc
&&
scanPos
.
m_pos
<
occup
);
const
Uint32
nextPtrI
=
scanPtr
.
p
->
m_nodeScan
;
if
(
scanPos
.
m_pos
==
pos
)
{
jam
();
#ifdef VM_TRACE
if
(
debugFlags
&
DebugScan
)
{
debugOut
<<
"Move scan "
<<
scanPtr
.
i
<<
" "
<<
*
scanPtr
.
p
<<
endl
;
debugOut
<<
"At popUp pos="
<<
pos
<<
" "
<<
node
<<
endl
;
}
#endif
// here we may miss a valid entry "X" XXX known bug
scanNext
(
signal
,
scanPtr
);
}
}
scanPtr
.
i
=
nextPtrI
;
// fix node
TreeEnt
*
const
entList
=
tree
.
getEntList
(
node
.
m_node
);
entList
[
occup
]
=
entList
[
0
];
TreeEnt
*
const
tmpList
=
entList
+
1
;
TreeEnt
newMin
=
ent
;
ent
=
tmpList
[
pos
];
for
(
unsigned
i
=
pos
;
i
>
0
;
i
--
)
{
jam
();
tmpList
[
i
]
=
tmpList
[
i
-
1
];
}
}
// fix other scans
tmpList
[
0
]
=
newMin
;
entList
[
0
]
=
entList
[
occup
];
// add scans
if
(
scanList
!=
RNIL
)
addScanList
(
node
,
0
,
scanList
);
// fix prefix
if
(
true
)
setNodePref
(
signal
,
node
);
}
void
Dbtux
::
nodePopUpScans
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
)
{
const
unsigned
occup
=
node
.
getOccup
();
ScanOpPtr
scanPtr
;
scanPtr
.
i
=
node
.
getNodeScan
();
scanPtr
.
i
=
node
.
getNodeScan
();
while
(
scanPtr
.
i
!=
RNIL
)
{
do
{
jam
();
jam
();
c_scanOpPool
.
getPtr
(
scanPtr
);
c_scanOpPool
.
getPtr
(
scanPtr
);
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
...
@@ -367,22 +384,7 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
...
@@ -367,22 +384,7 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos
.
m_pos
++
;
scanPos
.
m_pos
++
;
}
}
scanPtr
.
i
=
scanPtr
.
p
->
m_nodeScan
;
scanPtr
.
i
=
scanPtr
.
p
->
m_nodeScan
;
}
}
while
(
scanPtr
.
i
!=
RNIL
);
// fix node
TreeEnt
*
const
entList
=
tree
.
getEntList
(
node
.
m_node
);
entList
[
occup
]
=
entList
[
0
];
TreeEnt
*
const
tmpList
=
entList
+
1
;
TreeEnt
newMin
=
ent
;
ent
=
tmpList
[
pos
];
for
(
unsigned
i
=
pos
;
i
>
0
;
i
--
)
{
jam
();
tmpList
[
i
]
=
tmpList
[
i
-
1
];
}
tmpList
[
0
]
=
newMin
;
entList
[
0
]
=
entList
[
occup
];
// fix prefix
if
(
true
)
setNodePref
(
signal
,
node
);
}
}
/*
/*
...
@@ -397,12 +399,108 @@ Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsig
...
@@ -397,12 +399,108 @@ Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsig
ndbrequire
(
i
<=
1
);
ndbrequire
(
i
<=
1
);
while
(
cnt
!=
0
)
{
while
(
cnt
!=
0
)
{
TreeEnt
ent
;
TreeEnt
ent
;
nodePopDown
(
signal
,
srcNode
,
i
==
0
?
srcNode
.
getOccup
()
-
1
:
0
,
ent
);
Uint32
scanList
=
RNIL
;
nodePushUp
(
signal
,
dstNode
,
i
==
0
?
0
:
dstNode
.
getOccup
(),
ent
);
nodePopDown
(
signal
,
srcNode
,
i
==
0
?
srcNode
.
getOccup
()
-
1
:
0
,
ent
,
&
scanList
);
nodePushUp
(
signal
,
dstNode
,
i
==
0
?
0
:
dstNode
.
getOccup
(),
ent
,
scanList
);
cnt
--
;
cnt
--
;
}
}
}
}
// scans linked to node
/*
* Add list of scans to node at given position.
*/
void
Dbtux
::
addScanList
(
NodeHandle
&
node
,
unsigned
pos
,
Uint32
scanList
)
{
ScanOpPtr
scanPtr
;
scanPtr
.
i
=
scanList
;
do
{
jam
();
c_scanOpPool
.
getPtr
(
scanPtr
);
#ifdef VM_TRACE
if
(
debugFlags
&
DebugScan
)
{
debugOut
<<
"Add scan "
<<
scanPtr
.
i
<<
" "
<<
*
scanPtr
.
p
<<
endl
;
debugOut
<<
"To pos="
<<
pos
<<
" "
<<
node
<<
endl
;
}
#endif
const
Uint32
nextPtrI
=
scanPtr
.
p
->
m_nodeScan
;
scanPtr
.
p
->
m_nodeScan
=
RNIL
;
linkScan
(
node
,
scanPtr
);
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
// set position but leave direction alone
scanPos
.
m_loc
=
node
.
m_loc
;
scanPos
.
m_pos
=
pos
;
scanPtr
.
i
=
nextPtrI
;
}
while
(
scanPtr
.
i
!=
RNIL
);
}
/*
* Remove list of scans from node at given position. The return
* location must point to existing list (in fact RNIL always).
*/
void
Dbtux
::
removeScanList
(
NodeHandle
&
node
,
unsigned
pos
,
Uint32
&
scanList
)
{
ScanOpPtr
scanPtr
;
scanPtr
.
i
=
node
.
getNodeScan
();
do
{
jam
();
c_scanOpPool
.
getPtr
(
scanPtr
);
const
Uint32
nextPtrI
=
scanPtr
.
p
->
m_nodeScan
;
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
ndbrequire
(
scanPos
.
m_loc
==
node
.
m_loc
);
if
(
scanPos
.
m_pos
==
pos
)
{
jam
();
#ifdef VM_TRACE
if
(
debugFlags
&
DebugScan
)
{
debugOut
<<
"Remove scan "
<<
scanPtr
.
i
<<
" "
<<
*
scanPtr
.
p
<<
endl
;
debugOut
<<
"Fron pos="
<<
pos
<<
" "
<<
node
<<
endl
;
}
#endif
unlinkScan
(
node
,
scanPtr
);
scanPtr
.
p
->
m_nodeScan
=
scanList
;
scanList
=
scanPtr
.
i
;
// unset position but leave direction alone
scanPos
.
m_loc
=
NullTupLoc
;
scanPos
.
m_pos
=
ZNIL
;
}
scanPtr
.
i
=
nextPtrI
;
}
while
(
scanPtr
.
i
!=
RNIL
);
}
/*
* Move list of scans away from entry about to be removed. Uses scan
* method scanNext().
*/
void
Dbtux
::
moveScanList
(
Signal
*
signal
,
NodeHandle
&
node
,
unsigned
pos
)
{
ScanOpPtr
scanPtr
;
scanPtr
.
i
=
node
.
getNodeScan
();
do
{
jam
();
c_scanOpPool
.
getPtr
(
scanPtr
);
TreePos
&
scanPos
=
scanPtr
.
p
->
m_scanPos
;
const
Uint32
nextPtrI
=
scanPtr
.
p
->
m_nodeScan
;
ndbrequire
(
scanPos
.
m_loc
==
node
.
m_loc
);
if
(
scanPos
.
m_pos
==
pos
)
{
jam
();
#ifdef VM_TRACE
if
(
debugFlags
&
DebugScan
)
{
debugOut
<<
"Move scan "
<<
scanPtr
.
i
<<
" "
<<
*
scanPtr
.
p
<<
endl
;
debugOut
<<
"At pos="
<<
pos
<<
" "
<<
node
<<
endl
;
}
#endif
scanNext
(
signal
,
scanPtr
);
ndbrequire
(
!
(
scanPos
.
m_loc
==
node
.
m_loc
&&
scanPos
.
m_pos
==
pos
));
}
scanPtr
.
i
=
nextPtrI
;
}
while
(
scanPtr
.
i
!=
RNIL
);
}
/*
/*
* Link scan to the list under the node. The list is single-linked and
* Link scan to the list under the node. The list is single-linked and
* ordering does not matter.
* ordering does not matter.
...
...
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
View file @
abcd2994
...
@@ -716,7 +716,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
...
@@ -716,7 +716,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
/*
/*
* Move to next entry. The scan is already linked to some node. When
* Move to next entry. The scan is already linked to some node. When
* we leave, if an
y
entry was found, it will be linked to a possibly
* we leave, if an entry was found, it will be linked to a possibly
* different node. The scan has a position, and a direction which tells
* different node. The scan has a position, and a direction which tells
* from where we came to this position. This is one of:
* from where we came to this position. This is one of:
*
*
...
@@ -725,6 +725,9 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
...
@@ -725,6 +725,9 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
* 2 - up from root (the scan ends)
* 2 - up from root (the scan ends)
* 3 - left to right within node (at end proceed to right child)
* 3 - left to right within node (at end proceed to right child)
* 4 - down from parent (proceed to left child)
* 4 - down from parent (proceed to left child)
*
* If an entry was found, scan direction is 3. Therefore tree
* re-organizations need not worry about scan direction.
*/
*/
void
void
Dbtux
::
scanNext
(
Signal
*
signal
,
ScanOpPtr
scanPtr
)
Dbtux
::
scanNext
(
Signal
*
signal
,
ScanOpPtr
scanPtr
)
...
@@ -739,9 +742,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
...
@@ -739,9 +742,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
if
(
scan
.
m_state
==
ScanOp
::
Locked
)
{
if
(
scan
.
m_state
==
ScanOp
::
Locked
)
{
jam
();
jam
();
// version of a tuple locked by us cannot disappear (assert only)
// version of a tuple locked by us cannot disappear (assert only)
#ifdef dbtux_wl_1942_is_done
ndbassert
(
false
);
ndbassert
(
false
);
#endif
AccLockReq
*
const
lockReq
=
(
AccLockReq
*
)
signal
->
getDataPtrSend
();
AccLockReq
*
const
lockReq
=
(
AccLockReq
*
)
signal
->
getDataPtrSend
();
lockReq
->
returnCode
=
RNIL
;
lockReq
->
returnCode
=
RNIL
;
lockReq
->
requestInfo
=
AccLockReq
::
Unlock
;
lockReq
->
requestInfo
=
AccLockReq
::
Unlock
;
...
@@ -864,6 +865,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
...
@@ -864,6 +865,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan
.
m_scanPos
=
pos
;
scan
.
m_scanPos
=
pos
;
// relink
// relink
if
(
scan
.
m_state
==
ScanOp
::
Current
)
{
if
(
scan
.
m_state
==
ScanOp
::
Current
)
{
ndbrequire
(
pos
.
m_match
==
true
&&
pos
.
m_dir
==
3
);
ndbrequire
(
pos
.
m_loc
==
node
.
m_loc
);
ndbrequire
(
pos
.
m_loc
==
node
.
m_loc
);
if
(
origNode
.
m_loc
!=
node
.
m_loc
)
{
if
(
origNode
.
m_loc
!=
node
.
m_loc
)
{
jam
();
jam
();
...
...
ndb/src/kernel/blocks/dbtux/DbtuxTree.cpp
View file @
abcd2994
...
@@ -34,7 +34,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
...
@@ -34,7 +34,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
if
(
node
.
getOccup
()
<
tree
.
m_maxOccup
)
{
if
(
node
.
getOccup
()
<
tree
.
m_maxOccup
)
{
// node has room
// node has room
jam
();
jam
();
nodePushUp
(
signal
,
node
,
pos
,
ent
);
nodePushUp
(
signal
,
node
,
pos
,
ent
,
RNIL
);
return
;
return
;
}
}
treeAddFull
(
signal
,
frag
,
node
,
pos
,
ent
);
treeAddFull
(
signal
,
frag
,
node
,
pos
,
ent
);
...
@@ -42,7 +42,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
...
@@ -42,7 +42,7 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
}
}
jam
();
jam
();
insertNode
(
signal
,
node
);
insertNode
(
signal
,
node
);
nodePushUp
(
signal
,
node
,
0
,
ent
);
nodePushUp
(
signal
,
node
,
0
,
ent
,
RNIL
);
node
.
setSide
(
2
);
node
.
setSide
(
2
);
tree
.
m_root
=
node
.
m_loc
;
tree
.
m_root
=
node
.
m_loc
;
}
}
...
@@ -68,13 +68,14 @@ Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
...
@@ -68,13 +68,14 @@ Dbtux::treeAddFull(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
if
(
glbNode
.
getOccup
()
<
tree
.
m_maxOccup
)
{
if
(
glbNode
.
getOccup
()
<
tree
.
m_maxOccup
)
{
// g.l.b node has room
// g.l.b node has room
jam
();
jam
();
Uint32
scanList
=
RNIL
;
if
(
pos
!=
0
)
{
if
(
pos
!=
0
)
{
jam
();
jam
();
// add the new entry and return min entry
// add the new entry and return min entry
nodePushDown
(
signal
,
lubNode
,
pos
-
1
,
ent
);
nodePushDown
(
signal
,
lubNode
,
pos
-
1
,
ent
,
scanList
);
}
}
// g.l.b node receives min entry from l.u.b node
// g.l.b node receives min entry from l.u.b node
nodePushUp
(
signal
,
glbNode
,
glbNode
.
getOccup
(),
ent
);
nodePushUp
(
signal
,
glbNode
,
glbNode
.
getOccup
(),
ent
,
scanList
);
return
;
return
;
}
}
treeAddNode
(
signal
,
frag
,
lubNode
,
pos
,
ent
,
glbNode
,
1
);
treeAddNode
(
signal
,
frag
,
lubNode
,
pos
,
ent
,
glbNode
,
1
);
...
@@ -97,13 +98,14 @@ Dbtux::treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
...
@@ -97,13 +98,14 @@ Dbtux::treeAddNode(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned pos,
parentNode
.
setLink
(
i
,
glbNode
.
m_loc
);
parentNode
.
setLink
(
i
,
glbNode
.
m_loc
);
glbNode
.
setLink
(
2
,
parentNode
.
m_loc
);
glbNode
.
setLink
(
2
,
parentNode
.
m_loc
);
glbNode
.
setSide
(
i
);
glbNode
.
setSide
(
i
);
Uint32
scanList
=
RNIL
;
if
(
pos
!=
0
)
{
if
(
pos
!=
0
)
{
jam
();
jam
();
// add the new entry and return min entry
// add the new entry and return min entry
nodePushDown
(
signal
,
lubNode
,
pos
-
1
,
ent
);
nodePushDown
(
signal
,
lubNode
,
pos
-
1
,
ent
,
scanList
);
}
}
// g.l.b node receives min entry from l.u.b node
// g.l.b node receives min entry from l.u.b node
nodePushUp
(
signal
,
glbNode
,
0
,
ent
);
nodePushUp
(
signal
,
glbNode
,
0
,
ent
,
scanList
);
// re-balance the tree
// re-balance the tree
treeAddRebalance
(
signal
,
frag
,
parentNode
,
i
);
treeAddRebalance
(
signal
,
frag
,
parentNode
,
i
);
}
}
...
@@ -179,7 +181,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
...
@@ -179,7 +181,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
if
(
node
.
getOccup
()
>
tree
.
m_minOccup
)
{
if
(
node
.
getOccup
()
>
tree
.
m_minOccup
)
{
// no underflow in any node type
// no underflow in any node type
jam
();
jam
();
nodePopDown
(
signal
,
node
,
pos
,
ent
);
nodePopDown
(
signal
,
node
,
pos
,
ent
,
0
);
return
;
return
;
}
}
if
(
node
.
getChilds
()
==
2
)
{
if
(
node
.
getChilds
()
==
2
)
{
...
@@ -189,7 +191,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
...
@@ -189,7 +191,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
return
;
return
;
}
}
// remove entry in semi/leaf
// remove entry in semi/leaf
nodePopDown
(
signal
,
node
,
pos
,
ent
);
nodePopDown
(
signal
,
node
,
pos
,
ent
,
0
);
if
(
node
.
getLink
(
0
)
!=
NullTupLoc
)
{
if
(
node
.
getLink
(
0
)
!=
NullTupLoc
)
{
jam
();
jam
();
treeRemoveSemi
(
signal
,
frag
,
node
,
0
);
treeRemoveSemi
(
signal
,
frag
,
node
,
0
);
...
@@ -222,8 +224,9 @@ Dbtux::treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned
...
@@ -222,8 +224,9 @@ Dbtux::treeRemoveInner(Signal* signal, Frag& frag, NodeHandle lubNode, unsigned
loc
=
glbNode
.
getLink
(
1
);
loc
=
glbNode
.
getLink
(
1
);
}
while
(
loc
!=
NullTupLoc
);
}
while
(
loc
!=
NullTupLoc
);
// borrow max entry from semi/leaf
// borrow max entry from semi/leaf
nodePopDown
(
signal
,
glbNode
,
glbNode
.
getOccup
()
-
1
,
ent
);
Uint32
scanList
=
RNIL
;
nodePopUp
(
signal
,
lubNode
,
pos
,
ent
);
nodePopDown
(
signal
,
glbNode
,
glbNode
.
getOccup
()
-
1
,
ent
,
&
scanList
);
nodePopUp
(
signal
,
lubNode
,
pos
,
ent
,
scanList
);
if
(
glbNode
.
getLink
(
0
)
!=
NullTupLoc
)
{
if
(
glbNode
.
getLink
(
0
)
!=
NullTupLoc
)
{
jam
();
jam
();
treeRemoveSemi
(
signal
,
frag
,
glbNode
,
0
);
treeRemoveSemi
(
signal
,
frag
,
glbNode
,
0
);
...
...
ndb/src/kernel/blocks/dbtux/Times.txt
View file @
abcd2994
...
@@ -129,4 +129,7 @@ optim 17 mc02/a 35 ms 52 ms 49 pct
...
@@ -129,4 +129,7 @@ optim 17 mc02/a 35 ms 52 ms 49 pct
[ allow slack (2) in interior nodes - almost no effect?? ]
[ allow slack (2) in interior nodes - almost no effect?? ]
wl-1942 mc02/a 35 ms 52 ms 49 pct
mc02/b 42 ms 75 ms 76 pct
vim: set et:
vim: set et:
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment