Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
1b31325e
Commit
1b31325e
authored
Oct 10, 2004
by
unknown
Browse files
Options
Browse Files
Download
Plain Diff
Merge
sql/ha_ndbcluster.cc: SCCS merged
parents
a54801ee
f5571c3d
Changes
17
Hide whitespace changes
Inline
Side-by-side
Showing
17 changed files
with
530 additions
and
318 deletions
+530
-318
mysql-test/ndb/ndb_range_bounds.pl
mysql-test/ndb/ndb_range_bounds.pl
+133
-0
ndb/include/kernel/signaldata/TuxBound.hpp
ndb/include/kernel/signaldata/TuxBound.hpp
+0
-1
ndb/include/ndbapi/NdbIndexScanOperation.hpp
ndb/include/ndbapi/NdbIndexScanOperation.hpp
+18
-28
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+0
-1
ndb/src/kernel/blocks/dbtux/Dbtux.hpp
ndb/src/kernel/blocks/dbtux/Dbtux.hpp
+56
-27
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
+20
-46
ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
+8
-10
ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
+2
-2
ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
+2
-2
ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
+14
-0
ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
+6
-6
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+52
-55
ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
+43
-19
ndb/src/kernel/blocks/dbtux/Times.txt
ndb/src/kernel/blocks/dbtux/Times.txt
+12
-0
ndb/test/ndbapi/testOIBasic.cpp
ndb/test/ndbapi/testOIBasic.cpp
+21
-4
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.cc
+142
-115
sql/ha_ndbcluster.h
sql/ha_ndbcluster.h
+1
-2
No files found.
mysql-test/ndb/ndb_range_bounds.pl
0 → 100644
View file @
1b31325e
#
# test range scan bounds
# output to mysql-test/t/ndb_range_bounds.test
#
# give option --all to generate all cases
#
use
strict
;
use
integer
;
my
$all
=
shift
;
!
defined
(
$all
)
||
(
$all
eq
'
--all
'
&&
!
defined
(
shift
))
or
die
"
only available option is --all
";
my
$table
=
'
t
';
print
<<EOF;
--source include/have_ndb.inc
--disable_warnings
drop table if exists $table;
--enable_warnings
# test range scan bounds
# generated by mysql-test/ndb/ndb_range_bounds.pl
# all selects must return 0
EOF
sub
cut
($$@)
{
my
(
$op
,
$key
,
@v
)
=
@_
;
$op
=
'
==
'
if
$op
eq
'
=
';
my
(
@w
);
eval
"
\
@w
= grep(
\
$_
$op
$key
,
\
@v
)
";
$@
and
die
$@
;
return
@w
;
}
sub
mkdummy
(\@) {
my
(
$val
)
=
@_
;
return
{
'
dummy
'
=>
1
,
'
exp
'
=>
'
9 = 9
',
'
cnt
'
=>
scalar
@$val
,
};
}
sub
mkone
($$$\@) {
my
(
$col
,
$op
,
$key
,
$val
)
=
@_
;
my
$cnt
=
scalar
cut
(
$op
,
$key
,
@$val
);
return
{
'
exp
'
=>
"
$col
$op
$key
",
'
cnt
'
=>
$cnt
,
};
}
sub
mktwo
($$$$$\@) {
my
(
$col
,
$op1
,
$key1
,
$op2
,
$key2
,
$val
)
=
@_
;
my
$cnt
=
scalar
cut
(
$op2
,
$key2
,
cut
(
$op1
,
$key1
,
@$val
));
return
{
'
exp
'
=>
"
$col
$op1
$key1
and
$col
$op2
$key2
",
'
cnt
'
=>
$cnt
,
};
}
sub
mkall
($$$\@) {
my
(
$col
,
$key1
,
$key2
,
$val
)
=
@_
;
my
@a
=
();
my
$p
=
mkdummy
(
@$val
);
push
(
@a
,
$p
)
if
$all
;
my
@ops1
=
$all
?
qw(< <= = >= >)
:
qw(= >= >)
;
my
@ops2
=
$all
?
qw(< <= = >= >)
:
qw(< <=)
;
for
my
$op1
(
@ops1
)
{
my
$p
=
mkone
(
$col
,
$op1
,
$key1
,
@$val
);
push
(
@a
,
$p
)
if
$all
||
$p
->
{
cnt
}
!=
0
;
for
my
$op2
(
@ops2
)
{
my
$p
=
mktwo
(
$col
,
$op1
,
$key1
,
$op2
,
$key2
,
@$val
);
push
(
@a
,
$p
)
if
$all
||
$p
->
{
cnt
}
!=
0
;
}
}
return
\
@a
;
}
for
my
$nn
("
bcd
",
"")
{
my
%
nn
;
for
my
$x
(
qw(b c d)
)
{
$nn
{
$x
}
=
$nn
=~
/$x/
?
"
not null
"
:
"
null
";
}
print
<<EOF;
create table $table (
a int primary key,
b int $nn{b},
c int $nn{c},
d int $nn{d},
index (b, c, d)
) engine=ndb;
EOF
my
@val
=
(
0
..
4
);
my
$v0
=
0
;
for
my
$v1
(
@val
)
{
for
my
$v2
(
@val
)
{
for
my
$v3
(
@val
)
{
print
"
insert into
$table
values(
$v0
,
$v1
,
$v2
,
$v3
);
\n
";
$v0
++
;
}
}
}
my
$key1
=
1
;
my
$key2
=
3
;
my
$a1
=
mkall
('
b
',
$key1
,
$key2
,
@val
);
my
$a2
=
mkall
('
c
',
$key1
,
$key2
,
@val
);
my
$a3
=
mkall
('
d
',
$key1
,
$key2
,
@val
);
for
my
$p1
(
@$a1
)
{
my
$cnt1
=
$p1
->
{
cnt
}
*
@val
*
@val
;
print
"
select count(*) -
$cnt1
from
$table
";
print
"
where
$p1
->{exp};
\n
";
for
my
$p2
(
@$a2
)
{
my
$cnt2
=
$p1
->
{
cnt
}
*
$p2
->
{
cnt
}
*
@val
;
print
"
select count(*) -
$cnt2
from
$table
";
print
"
where
$p1
->{exp} and
$p2
->{exp};
\n
";
for
my
$p3
(
@$a3
)
{
my
$cnt3
=
$p1
->
{
cnt
}
*
$p2
->
{
cnt
}
*
$p3
->
{
cnt
};
print
"
select count(*) -
$cnt3
from
$table
";
print
"
where
$p1
->{exp} and
$p2
->{exp} and
$p3
->{exp};
\n
";
}
}
}
print
<<EOF;
drop table $table;
EOF
}
# vim: set sw=2:
ndb/include/kernel/signaldata/TuxBound.hpp
View file @
1b31325e
...
...
@@ -48,7 +48,6 @@ private:
Uint32
tuxScanPtrI
;
/*
* Number of words of bound info included after fixed signal data.
* Starts with 5 unused words (word 0 is length used by LQH).
*/
Uint32
boundAiLength
;
};
...
...
ndb/include/ndbapi/NdbIndexScanOperation.hpp
View file @
1b31325e
...
...
@@ -55,28 +55,12 @@ public:
return
readTuples
(
LM_Exclusive
,
0
,
parallell
,
false
);
}
/**
* @name Define Range Scan
*
* A range scan is a scan on an ordered index. The operation is on
* the index table but tuples are returned from the primary table.
* The index contains all tuples where at least one index key has not
* null value.
*
* A range scan is currently opened via a normal open scan method.
* Bounds can be defined for each index key. After setting bounds,
* usual scan methods can be used (get value, interpreter, take over).
* These operate on the primary table.
*
* @{
*/
/**
* Type of ordered index key bound. The values (0-4) will not change
* and can be used explicitly (e.g. they could be computed).
*/
enum
BoundType
{
BoundLE
=
0
,
///< lower bound
,
BoundLE
=
0
,
///< lower bound
BoundLT
=
1
,
///< lower bound, strict
BoundGE
=
2
,
///< upper bound
BoundGT
=
3
,
///< upper bound, strict
...
...
@@ -86,20 +70,28 @@ public:
/**
* Define bound on index key in range scan.
*
* Each index key can have lower and/or upper bound, or can be set
* equal to a value. The bounds can be defined in any order but
* a duplicate definition is an error.
* Each index key can have lower and/or upper bound. Setting the key
* equal to a value defines both upper and lower bounds. The bounds
* can be defined in any order. Conflicting definitions is an error.
*
* For equality, it is better to use BoundEQ instead of the equivalent
* pair of BoundLE and BoundGE. This is especially true when table
* distribution key is an initial part of the index key.
*
* The bounds must specify a single range i.e. they are on an initial
* sequence of index keys and the condition is equality for all but
* (at most) the last key which has a lower and/or upper bound.
* The sets of lower and upper bounds must be on initial sequences of
* index keys. All but possibly the last bound must be non-strict.
* So "a >= 2 and b > 3" is ok but "a > 2 and b >= 3" is not.
*
* The scan may currently return tuples for which the bounds are not
* satisfied. For example, "a <= 2 and b <= 3" scans the index up to
* (a=2, b=3) but also returns any (a=1, b=4).
*
* NULL is treated like a normal value which is less than any not-NULL
* value and equal to another NULL value. To
search for
NULL use
* value and equal to another NULL value. To
compare against
NULL use
* setBound with null pointer (0).
*
* An index stores also all-NULL keys
(this may become optional).
*
Doing index scan with empty
bound set returns all table tuples.
* An index stores also all-NULL keys
. Doing index scan with empty
* bound set returns all table tuples.
*
* @param attrName Attribute name, alternatively:
* @param anAttrId Index column id (starting from 0)
...
...
@@ -117,8 +109,6 @@ public:
*/
int
setBound
(
Uint32
anAttrId
,
int
type
,
const
void
*
aValue
,
Uint32
len
=
0
);
/** @} *********************************************************************/
/**
* Reset bounds and put operation in list that will be
* sent on next execute
...
...
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
View file @
1b31325e
...
...
@@ -7683,7 +7683,6 @@ void Dblqh::accScanConfScanLab(Signal* signal)
Uint32
boundAiLength
=
tcConnectptr
.
p
->
primKeyLen
-
4
;
if
(
scanptr
.
p
->
rangeScan
)
{
jam
();
// bound info length is in first of the 5 header words
TuxBoundInfo
*
const
req
=
(
TuxBoundInfo
*
)
signal
->
getDataPtrSend
();
req
->
errorCode
=
RNIL
;
req
->
tuxScanPtrI
=
scanptr
.
p
->
scanAccPtr
;
...
...
ndb/src/kernel/blocks/dbtux/Dbtux.hpp
View file @
1b31325e
...
...
@@ -172,12 +172,21 @@ private:
* Physical tuple address in TUP. Provides fast access to table tuple
* or index node. Valid within the db node and across timeslices.
* Not valid between db nodes or across restarts.
*
* To avoid wasting an Uint16 the pageid is split in two.
*/
struct
TupLoc
{
Uint32
m_pageId
;
// page i-value
private:
Uint16
m_pageId1
;
// page i-value (big-endian)
Uint16
m_pageId2
;
Uint16
m_pageOffset
;
// page offset in words
public:
TupLoc
();
TupLoc
(
Uint32
pageId
,
Uint16
pageOffset
);
Uint32
getPageId
()
const
;
void
setPageId
(
Uint32
pageId
);
Uint32
getPageOffset
()
const
;
void
setPageOffset
(
Uint32
pageOffset
);
bool
operator
==
(
const
TupLoc
&
loc
)
const
;
bool
operator
!=
(
const
TupLoc
&
loc
)
const
;
};
...
...
@@ -224,18 +233,13 @@ private:
* work entry part 5
*
* There are 3 links to other nodes: left child, right child, parent.
* These are in TupLoc format but the pageIds and pageOffsets are
* stored in separate arrays (saves 1 word).
*
* Occupancy (number of entries) is at least 1 except temporarily when
* a node is about to be removed. If occupancy is 1, only max entry
* is present but both min and max prefixes are set.
* a node is about to be removed.
*/
struct
TreeNode
;
friend
struct
TreeNode
;
struct
TreeNode
{
Uint32
m_linkPI
[
3
];
// link to 0-left child 1-right child 2-parent
Uint16
m_linkPO
[
3
];
// page offsets for above real page ids
TupLoc
m_link
[
3
];
// link to 0-left child 1-right child 2-parent
unsigned
m_side
:
2
;
// we are 0-left child 1-right child 2-root
int
m_balance
:
2
;
// balance -1, 0, +1
unsigned
pad1
:
4
;
...
...
@@ -805,22 +809,52 @@ Dbtux::ConstData::operator=(Data data)
inline
Dbtux
::
TupLoc
::
TupLoc
()
:
m_pageId
(
RNIL
),
m_pageId1
(
RNIL
>>
16
),
m_pageId2
(
RNIL
&
0xFFFF
),
m_pageOffset
(
0
)
{
}
inline
Dbtux
::
TupLoc
::
TupLoc
(
Uint32
pageId
,
Uint16
pageOffset
)
:
m_pageId
(
pageId
),
m_pageId1
(
pageId
>>
16
),
m_pageId2
(
pageId
&
0xFFFF
),
m_pageOffset
(
pageOffset
)
{
}
inline
Uint32
Dbtux
::
TupLoc
::
getPageId
()
const
{
return
(
m_pageId1
<<
16
)
|
m_pageId2
;
}
inline
void
Dbtux
::
TupLoc
::
setPageId
(
Uint32
pageId
)
{
m_pageId1
=
(
pageId
>>
16
);
m_pageId2
=
(
pageId
&
0xFFFF
);
}
inline
Uint32
Dbtux
::
TupLoc
::
getPageOffset
()
const
{
return
(
Uint32
)
m_pageOffset
;
}
inline
void
Dbtux
::
TupLoc
::
setPageOffset
(
Uint32
pageOffset
)
{
m_pageOffset
=
(
Uint16
)
pageOffset
;
}
inline
bool
Dbtux
::
TupLoc
::
operator
==
(
const
TupLoc
&
loc
)
const
{
return
m_pageId
==
loc
.
m_pageId
&&
m_pageOffset
==
loc
.
m_pageOffset
;
return
m_pageId1
==
loc
.
m_pageId1
&&
m_pageId2
==
loc
.
m_pageId2
&&
m_pageOffset
==
loc
.
m_pageOffset
;
}
inline
bool
...
...
@@ -851,13 +885,13 @@ Dbtux::TreeEnt::eq(const TreeEnt ent) const
inline
int
Dbtux
::
TreeEnt
::
cmp
(
const
TreeEnt
ent
)
const
{
if
(
m_tupLoc
.
m_pageId
<
ent
.
m_tupLoc
.
m_pageId
)
if
(
m_tupLoc
.
getPageId
()
<
ent
.
m_tupLoc
.
getPageId
()
)
return
-
1
;
if
(
m_tupLoc
.
m_pageId
>
ent
.
m_tupLoc
.
m_pageId
)
if
(
m_tupLoc
.
getPageId
()
>
ent
.
m_tupLoc
.
getPageId
()
)
return
+
1
;
if
(
m_tupLoc
.
m_pageOffset
<
ent
.
m_tupLoc
.
m_pageOffset
)
if
(
m_tupLoc
.
getPageOffset
()
<
ent
.
m_tupLoc
.
getPageOffset
()
)
return
-
1
;
if
(
m_tupLoc
.
m_pageOffset
>
ent
.
m_tupLoc
.
m_pageOffset
)
if
(
m_tupLoc
.
getPageOffset
()
>
ent
.
m_tupLoc
.
getPageOffset
()
)
return
+
1
;
if
(
m_tupVersion
<
ent
.
m_tupVersion
)
return
-
1
;
...
...
@@ -880,12 +914,9 @@ Dbtux::TreeNode::TreeNode() :
m_occup
(
0
),
m_nodeScan
(
RNIL
)
{
m_linkPI
[
0
]
=
NullTupLoc
.
m_pageId
;
m_linkPO
[
0
]
=
NullTupLoc
.
m_pageOffset
;
m_linkPI
[
1
]
=
NullTupLoc
.
m_pageId
;
m_linkPO
[
1
]
=
NullTupLoc
.
m_pageOffset
;
m_linkPI
[
2
]
=
NullTupLoc
.
m_pageId
;
m_linkPO
[
2
]
=
NullTupLoc
.
m_pageOffset
;
m_link
[
0
]
=
NullTupLoc
;
m_link
[
1
]
=
NullTupLoc
;
m_link
[
2
]
=
NullTupLoc
;
}
// Dbtux::TreeHead
...
...
@@ -913,7 +944,6 @@ Dbtux::TreeHead::getSize(AccSize acc) const
case
AccFull
:
return
m_nodeSize
;
}
abort
();
return
0
;
}
...
...
@@ -1088,13 +1118,13 @@ inline Dbtux::TupLoc
Dbtux
::
NodeHandle
::
getLink
(
unsigned
i
)
{
ndbrequire
(
i
<=
2
);
return
TupLoc
(
m_node
->
m_linkPI
[
i
],
m_node
->
m_linkPO
[
i
])
;
return
m_node
->
m_link
[
i
]
;
}
inline
unsigned
Dbtux
::
NodeHandle
::
getChilds
()
{
return
(
getLink
(
0
)
!=
NullTupLoc
)
+
(
getLink
(
1
)
!=
NullTupLoc
);
return
(
m_node
->
m_link
[
0
]
!=
NullTupLoc
)
+
(
m_node
->
m_link
[
1
]
!=
NullTupLoc
);
}
inline
unsigned
...
...
@@ -1125,8 +1155,7 @@ inline void
Dbtux
::
NodeHandle
::
setLink
(
unsigned
i
,
TupLoc
loc
)
{
ndbrequire
(
i
<=
2
);
m_node
->
m_linkPI
[
i
]
=
loc
.
m_pageId
;
m_node
->
m_linkPO
[
i
]
=
loc
.
m_pageOffset
;
m_node
->
m_link
[
i
]
=
loc
;
}
inline
void
...
...
@@ -1224,7 +1253,7 @@ Dbtux::getTupAddr(const Frag& frag, TreeEnt ent)
const
Uint32
tableFragPtrI
=
frag
.
m_tupTableFragPtrI
[
ent
.
m_fragBit
];
const
TupLoc
tupLoc
=
ent
.
m_tupLoc
;
Uint32
tupAddr
=
NullTupAddr
;
c_tup
->
tuxGetTupAddr
(
tableFragPtrI
,
tupLoc
.
m_pageId
,
tupLoc
.
m_pageOffset
,
tupAddr
);
c_tup
->
tuxGetTupAddr
(
tableFragPtrI
,
tupLoc
.
getPageId
(),
tupLoc
.
getPageOffset
()
,
tupAddr
);
jamEntry
();
return
tupAddr
;
}
...
...
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
View file @
1b31325e
...
...
@@ -87,21 +87,23 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
/*
* Scan bound vs node prefix or entry.
*
* Compare lower or upper bound and index
attribute data. The attribute
*
data may be partial in which case CmpUnknown may be returned.
*
Returns -1 if the boundary is to the left of the compared key and +1
*
if the boundary is to the right of the compared ke
y.
* Compare lower or upper bound and index
entry data. The entry data
*
may be partial in which case CmpUnknown may be returned. Otherwise
*
returns -1 if the bound is to the left of the entry and +1 if the
*
bound is to the right of the entr
y.
*
* T
o get this behaviour we treat equality a little bit special. If the
*
boundary is a lower bound then the boundary is to the left of all
*
equal keys and if it is an upper bound then the boundary is to the
*
right of all equal keys
.
* T
he routine is similar to cmpSearchKey, but 0 is never returned.
*
Suppose all attributes compare equal. Recall that all bounds except
*
possibly the last one are non-strict. Use the given bound direction
*
(0-lower 1-upper) and strictness of last bound to return -1 or +1
.
*
* When searching for the first key we are using the lower bound to try
* to find the first key that is to the right of the boundary. Then we
* start scanning from this tuple (including the tuple itself) until we
* find the first key which is to the right of the boundary. Then we
* stop and do not include that key in the scan result.
* Following example illustrates this. We are at (a=2, b=3).
*
* dir bounds strict return
* 0 a >= 2 and b >= 3 no -1
* 0 a >= 2 and b > 3 yes +1
* 1 a <= 2 and b <= 3 no +1
* 1 a <= 2 and b < 3 yes -1
*/
int
Dbtux
::
cmpScanBound
(
const
Frag
&
frag
,
unsigned
dir
,
ConstData
boundInfo
,
unsigned
boundCount
,
ConstData
entryData
,
unsigned
maxlen
)
...
...
@@ -111,12 +113,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
ndbrequire
(
dir
<=
1
);
// number of words of data left
unsigned
len2
=
maxlen
;
/*
* No boundary means full scan, low boundary is to the right of all
* keys. Thus we should always return -1. For upper bound we are to
* the right of all keys, thus we should always return +1. We achieve
* this behaviour by initializing type to 4.
*/
// in case of no bounds, init last type to something non-strict
unsigned
type
=
4
;
while
(
boundCount
!=
0
)
{
if
(
len2
<=
AttributeHeaderSize
)
{
...
...
@@ -124,7 +121,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
return
NdbSqlUtil
::
CmpUnknown
;
}
len2
-=
AttributeHeaderSize
;
// get and skip bound type
// get and skip bound type
(it is used after the loop)
type
=
boundInfo
[
0
];
boundInfo
+=
1
;
if
(
!
boundInfo
.
ah
().
isNULL
())
{
...
...
@@ -166,30 +163,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
entryData
+=
AttributeHeaderSize
+
entryData
.
ah
().
getDataSize
();
boundCount
-=
1
;
}
if
(
dir
==
0
)
{
jam
();
/*
* Looking for the lower bound. If strict lower bound then the
* boundary is to the right of the compared key and otherwise (equal
* included in range) then the boundary is to the left of the key.
*/
if
(
type
==
1
)
{
jam
();
return
+
1
;
}
return
-
1
;
}
else
{
jam
();
/*
* Looking for the upper bound. If strict upper bound then the
* boundary is to the left of all equal keys and otherwise (equal
* included in the range) then the boundary is to the right of all
* equal keys.
*/
if
(
type
==
3
)
{
jam
();
return
-
1
;
}
return
+
1
;
}
// all attributes were equal
const
int
strict
=
(
type
&
0x1
);
return
(
dir
==
0
?
(
strict
==
0
?
-
1
:
+
1
)
:
(
strict
==
0
?
+
1
:
-
1
));
}
ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
View file @
1b31325e
...
...
@@ -256,8 +256,8 @@ operator<<(NdbOut& out, const Dbtux::TupLoc& loc)
if
(
loc
==
Dbtux
::
NullTupLoc
)
{
out
<<
"null"
;
}
else
{
out
<<
dec
<<
loc
.
m_pageId
;
out
<<
"."
<<
dec
<<
loc
.
m_pageOffset
;
out
<<
dec
<<
loc
.
getPageId
()
;
out
<<
"."
<<
dec
<<
loc
.
getPageOffset
()
;
}
return
out
;
}
...
...
@@ -274,13 +274,10 @@ operator<<(NdbOut& out, const Dbtux::TreeEnt& ent)
NdbOut
&
operator
<<
(
NdbOut
&
out
,
const
Dbtux
::
TreeNode
&
node
)
{
Dbtux
::
TupLoc
link0
(
node
.
m_linkPI
[
0
],
node
.
m_linkPO
[
0
]);
Dbtux
::
TupLoc
link1
(
node
.
m_linkPI
[
1
],
node
.
m_linkPO
[
1
]);
Dbtux
::
TupLoc
link2
(
node
.
m_linkPI
[
2
],
node
.
m_linkPO
[
2
]);
out
<<
"[TreeNode "
<<
hex
<<
&
node
;
out
<<
" [left "
<<
link0
<<
"]"
;
out
<<
" [right "
<<
link1
<<
"]"
;
out
<<
" [up "
<<
link2
<<
"]"
;
out
<<
" [left "
<<
node
.
m_link
[
0
]
<<
"]"
;
out
<<
" [right "
<<
node
.
m_link
[
1
]
<<
"]"
;
out
<<
" [up "
<<
node
.
m_link
[
2
]
<<
"]"
;
out
<<
" [side "
<<
dec
<<
node
.
m_side
<<
"]"
;
out
<<
" [occup "
<<
dec
<<
node
.
m_occup
<<
"]"
;
out
<<
" [balance "
<<
dec
<<
(
int
)
node
.
m_balance
<<
"]"
;
...
...
@@ -427,8 +424,9 @@ operator<<(NdbOut& out, const Dbtux::NodeHandle& node)
}
data
=
(
const
Uint32
*
)
node
.
m_node
+
Dbtux
::
NodeHeadSize
+
tree
.
m_prefSize
;
const
Dbtux
::
TreeEnt
*
entList
=
(
const
Dbtux
::
TreeEnt
*
)
data
;
for
(
unsigned
pos
=
0
;
pos
<
numpos
;
pos
++
)
out
<<
" "
<<
entList
[
pos
];
// print entries in logical order
for
(
unsigned
pos
=
1
;
pos
<=
numpos
;
pos
++
)
out
<<
" "
<<
entList
[
pos
%
numpos
];
out
<<
"]"
;
}
out
<<
"]"
;
...
...
ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp
View file @
1b31325e
...
...
@@ -245,7 +245,7 @@ Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData)
const
Uint32
numAttrs
=
frag
.
m_numAttrs
-
start
;
// skip to start position in keyAttrs only
keyAttrs
+=
start
;
int
ret
=
c_tup
->
tuxReadAttrs
(
tableFragPtrI
,
tupLoc
.
m_pageId
,
tupLoc
.
m_pageOffset
,
tupVersion
,
keyAttrs
,
numAttrs
,
keyData
);
int
ret
=
c_tup
->
tuxReadAttrs
(
tableFragPtrI
,
tupLoc
.
getPageId
(),
tupLoc
.
getPageOffset
()
,
tupVersion
,
keyAttrs
,
numAttrs
,
keyData
);
jamEntry
();
// TODO handle error
ndbrequire
(
ret
>
0
);
...
...
@@ -256,7 +256,7 @@ Dbtux::readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize)
{
const
Uint32
tableFragPtrI
=
frag
.
m_tupTableFragPtrI
[
ent
.
m_fragBit
];
const
TupLoc
tupLoc
=
ent
.
m_tupLoc
;
int
ret
=
c_tup
->
tuxReadPk
(
tableFragPtrI
,
tupLoc
.
m_pageId
,
tupLoc
.
m_pageOffset
,
pkData
);
int
ret
=
c_tup
->
tuxReadPk
(
tableFragPtrI
,
tupLoc
.
getPageId
(),
tupLoc
.
getPageOffset
()
,
pkData
);
jamEntry
();
// TODO handle error
ndbrequire
(
ret
>
0
);
...
...
ndb/src/kernel/blocks/dbtux/DbtuxMaint.cpp
View file @
1b31325e
...
...
@@ -120,7 +120,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
searchToAdd
(
signal
,
frag
,
c_searchKey
,
ent
,
treePos
);
#ifdef VM_TRACE
if
(
debugFlags
&
DebugMaint
)
{
debugOut
<<
treePos
<<
endl
;
debugOut
<<
treePos
<<
(
treePos
.
m_match
?
" - error"
:
""
)
<<
endl
;
}
#endif
if
(
treePos
.
m_match
)
{
...
...
@@ -154,7 +154,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
searchToRemove
(
signal
,
frag
,
c_searchKey
,
ent
,
treePos
);
#ifdef VM_TRACE
if
(
debugFlags
&
DebugMaint
)
{
debugOut
<<
treePos
<<
endl
;
debugOut
<<
treePos
<<
(
!
treePos
.
m_match
?
" - error"
:
""
)
<<
endl
;
}
#endif
if
(
!
treePos
.
m_match
)
{
...
...
ndb/src/kernel/blocks/dbtux/DbtuxMeta.cpp
View file @
1b31325e
...
...
@@ -235,6 +235,20 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
tree
.
m_minOccup
=
tree
.
m_maxOccup
-
maxSlack
;
// root node does not exist (also set by ctor)
tree
.
m_root
=
NullTupLoc
;
#ifdef VM_TRACE
if
(
debugFlags
&
DebugMeta
)
{
if
(
fragOpPtr
.
p
->
m_fragNo
==
0
)
{
debugOut
<<
"Index id="
<<
indexPtr
.
i
;
debugOut
<<
" nodeSize="
<<
tree
.
m_nodeSize
;
debugOut
<<
" headSize="
<<
NodeHeadSize
;
debugOut
<<
" prefSize="
<<
tree
.
m_prefSize
;
debugOut
<<
" entrySize="
<<
TreeEntSize
;
debugOut
<<
" minOccup="
<<
tree
.
m_minOccup
;
debugOut
<<
" maxOccup="
<<
tree
.
m_maxOccup
;
debugOut
<<
endl
;
}
}
#endif
// fragment is defined
c_fragOpPool
.
release
(
fragOpPtr
);
}
...
...
ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
View file @
1b31325e
...
...
@@ -24,8 +24,8 @@ int
Dbtux
::
allocNode
(
Signal
*
signal
,
NodeHandle
&
node
)
{
Frag
&
frag
=
node
.
m_frag
;
Uint32
pageId
=
NullTupLoc
.
m_pageId
;
Uint32
pageOffset
=
NullTupLoc
.
m_pageOffset
;
Uint32
pageId
=
NullTupLoc
.
getPageId
()
;
Uint32
pageOffset
=
NullTupLoc
.
getPageOffset
()
;
Uint32
*
node32
=
0
;
int
errorCode
=
c_tup
->
tuxAllocNode
(
signal
,
frag
.
m_tupIndexFragPtrI
,
pageId
,
pageOffset
,
node32
);
jamEntry
();
...
...
@@ -60,8 +60,8 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc, AccSize acc)
{
Frag
&
frag
=
node
.
m_frag
;
ndbrequire
(
loc
!=
NullTupLoc
);
Uint32
pageId
=
loc
.
m_pageId
;
Uint32
pageOffset
=
loc
.
m_pageOffset
;
Uint32
pageId
=
loc
.
getPageId
()
;
Uint32
pageOffset
=
loc
.
getPageOffset
()
;
Uint32
*
node32
=
0
;
c_tup
->
tuxGetNode
(
frag
.
m_tupIndexFragPtrI
,
pageId
,
pageOffset
,
node32
);
jamEntry
();
...
...
@@ -100,8 +100,8 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
Frag
&
frag
=
node
.
m_frag
;
ndbrequire
(
node
.
getOccup
()
==
0
);
TupLoc
loc
=
node
.
m_loc
;
Uint32
pageId
=
loc
.
m_pageId
;
Uint32
pageOffset
=
loc
.
m_pageOffset
;
Uint32
pageId
=
loc
.
getPageId
()
;
Uint32
pageOffset
=
loc
.
getPageOffset
()
;
Uint32
*
node32
=
reinterpret_cast
<
Uint32
*>
(
node
.
m_node
);
c_tup
->
tuxFreeNode
(
signal
,
frag
.
m_tupIndexFragPtrI
,
pageId
,
pageOffset
,
node32
);
jamEntry
();
...
...
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
View file @
1b31325e
...
...
@@ -108,15 +108,23 @@ Dbtux::execACC_SCANREQ(Signal* signal)
/*
* Receive bounds for scan in single direct call. The bounds can arrive
* in any order. Attribute ids are those of index table.
*
* Replace EQ by equivalent LE + GE. Check for conflicting bounds.
* Check that sets of lower and upper bounds are on initial sequences of
* keys and that all but possibly last bound is non-strict.
*
* Finally save the sets of lower and upper bounds (i.e. start key and
* end key). Full bound type (< 4) is included but only the strict bit
* is used since lower and upper have now been separated.
*/
void
Dbtux
::
execTUX_BOUND_INFO
(
Signal
*
signal
)
{
jamEntry
();
struct
BoundInfo
{
int
type
;
unsigned
offset
;
unsigned
size
;
int
type
;
};
TuxBoundInfo
*
const
sig
=
(
TuxBoundInfo
*
)
signal
->
getDataPtrSend
();
const
TuxBoundInfo
reqCopy
=
*
(
const
TuxBoundInfo
*
)
sig
;
...
...
@@ -124,18 +132,11 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
// get records
ScanOp
&
scan
=
*
c_scanOpPool
.
getPtr
(
req
->
tuxScanPtrI
);
Index
&
index
=
*
c_indexPool
.
getPtr
(
scan
.
m_indexId
);
// collect
bound info for each index attribute
BoundInfo
boundInfo
[
MaxIndexAttributes
][
2
];
// collect
lower and upper bounds
BoundInfo
boundInfo
[
2
][
MaxIndexAttributes
];
// largest attrId seen plus one
Uint32
maxAttrId
=
0
;
// skip 5 words
Uint32
maxAttrId
[
2
]
=
{
0
,
0
};
unsigned
offset
=
0
;
if
(
req
->
boundAiLength
<
offset
)
{
jam
();
scan
.
m_state
=
ScanOp
::
Invalid
;
sig
->
errorCode
=
TuxBoundInfo
::
InvalidAttrInfo
;
return
;
}
const
Uint32
*
const
data
=
(
Uint32
*
)
sig
+
TuxBoundInfo
::
SignalLength
;
// walk through entries
while
(
offset
+
2
<=
req
->
boundAiLength
)
{
...
...
@@ -156,32 +157,35 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
sig
->
errorCode
=
TuxBoundInfo
::
InvalidAttrInfo
;
return
;
}
while
(
maxAttrId
<=
attrId
)
{
BoundInfo
*
b
=
boundInfo
[
maxAttrId
++
];
b
[
0
].
type
=
b
[
1
].
type
=
-
1
;
}
BoundInfo
*
b
=
boundInfo
[
attrId
]
;
if
(
type
==
0
||
type
==
1
||
type
==
4
)
{
if
(
b
[
0
].
type
!=
-
1
)
{
jam
();
scan
.
m_state
=
ScanOp
::
Invalid
;
sig
->
errorCode
=
TuxBoundInfo
::
InvalidBounds
;
return
;
for
(
unsigned
j
=
0
;
j
<=
1
;
j
++
)
{
// check if lower/upper bit matches
const
unsigned
luBit
=
(
j
<<
1
)
;
if
((
type
&
0x2
)
!=
luBit
&&
type
!=
4
)
continue
;
// EQ -> LE, GE
const
unsigned
type2
=
(
type
&
0x1
)
|
luBit
;
// fill in any gap
while
(
maxAttrId
[
j
]
<=
attrId
)
{
BoundInfo
&
b
=
boundInfo
[
j
][
maxAttrId
[
j
]
++
]
;
b
.
type
=
-
1
;
}
b
[
0
].
offset
=
offset
;
b
[
0
].
size
=
2
+
dataSize
;
b
[
0
].
type
=
type
;
}
if
(
type
==
2
||
type
==
3
||
type
==
4
)
{
if
(
b
[
1
].
type
!=
-
1
)
{
jam
();
scan
.
m_state
=
ScanOp
::
Invalid
;
sig
->
errorCode
=
TuxBoundInfo
::
InvalidBounds
;
return
;
BoundInfo
&
b
=
boundInfo
[
j
][
attrId
];
if
(
b
.
type
!=
-
1
)
{
// compare with previous bound
if
(
b
.
type
!=
type2
||
b
.
size
!=
2
+
dataSize
||
memcmp
(
&
data
[
b
.
offset
+
2
],
&
data
[
offset
+
2
],
dataSize
<<
2
)
!=
0
)
{
jam
();
scan
.
m_state
=
ScanOp
::
Invalid
;
sig
->
errorCode
=
TuxBoundInfo
::
InvalidBounds
;
return
;
}
}
else
{
// enter new bound
b
.
type
=
type2
;
b
.
offset
=
offset
;
b
.
size
=
2
+
dataSize
;
}
b
[
1
].
offset
=
offset
;
b
[
1
].
size
=
2
+
dataSize
;
b
[
1
].
type
=
type
;
}
// jump to next
offset
+=
2
+
dataSize
;
...
...
@@ -192,34 +196,27 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
sig
->
errorCode
=
TuxBoundInfo
::
InvalidAttrInfo
;
return
;
}
// save the bounds in index attribute id order
scan
.
m_boundCnt
[
0
]
=
0
;
scan
.
m_boundCnt
[
1
]
=
0
;
for
(
unsigned
i
=
0
;
i
<
maxAttrId
;
i
++
)
{
jam
();
const
BoundInfo
*
b
=
boundInfo
[
i
];
// current limitation - check all but last is equality
if
(
i
+
1
<
maxAttrId
)
{
if
(
b
[
0
].
type
!=
4
||
b
[
1
].
type
!=
4
)
{
for
(
unsigned
j
=
0
;
j
<=
1
;
j
++
)
{
// save lower/upper bound in index attribute id order
for
(
unsigned
i
=
0
;
i
<
maxAttrId
[
j
];
i
++
)
{
jam
();
const
BoundInfo
&
b
=
boundInfo
[
j
][
i
];
// check for gap or strict bound before last
if
(
b
.
type
==
-
1
||
(
i
+
1
<
maxAttrId
[
j
]
&&
(
b
.
type
&
0x1
)))
{
jam
();
scan
.
m_state
=
ScanOp
::
Invalid
;
sig
->
errorCode
=
TuxBoundInfo
::
InvalidBounds
;
return
;
}
}
for
(
unsigned
j
=
0
;
j
<=
1
;
j
++
)
{
if
(
b
[
j
].
type
!=
-
1
)
{
bool
ok
=
scan
.
m_bound
[
j
]
->
append
(
&
data
[
b
.
offset
],
b
.
size
);
if
(
!
ok
)
{
jam
();
bool
ok
=
scan
.
m_bound
[
j
]
->
append
(
&
data
[
b
[
j
].
offset
],
b
[
j
].
size
);
if
(
!
ok
)
{
jam
();
scan
.
m_state
=
ScanOp
::
Invalid
;
sig
->
errorCode
=
TuxBoundInfo
::
OutOfBuffers
;
return
;
}
scan
.
m_boundCnt
[
j
]
++
;
scan
.
m_state
=
ScanOp
::
Invalid
;
sig
->
errorCode
=
TuxBoundInfo
::
OutOfBuffers
;
return
;
}
}
scan
.
m_boundCnt
[
j
]
=
maxAttrId
[
j
];
}
// no error
sig
->
errorCode
=
0
;
...
...
ndb/src/kernel/blocks/dbtux/DbtuxSearch.cpp
View file @
1b31325e
...
...
@@ -31,10 +31,11 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
const
unsigned
numAttrs
=
frag
.
m_numAttrs
;
NodeHandle
currNode
(
frag
);
currNode
.
m_loc
=
tree
.
m_root
;
// assume success
treePos
.
m_match
=
false
;
if
(
currNode
.
m_loc
==
NullTupLoc
)
{
// empty tree
jam
();
treePos
.
m_match
=
false
;
return
;
}
NodeHandle
glbNode
(
frag
);
// potential g.l.b of final node
...
...
@@ -93,6 +94,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
jam
();
treePos
.
m_loc
=
currNode
.
m_loc
;
treePos
.
m_pos
=
0
;
// failed
treePos
.
m_match
=
true
;
return
;
}
...
...
@@ -100,9 +102,16 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
}
// access rest of current node
accessNode
(
signal
,
currNode
,
AccFull
);
for
(
unsigned
j
=
0
,
occup
=
currNode
.
getOccup
();
j
<
occup
;
j
++
)
{
// anticipate
treePos
.
m_loc
=
currNode
.
m_loc
;
// binary search
int
lo
=
-
1
;
int
hi
=
currNode
.
getOccup
();
int
ret
;
while
(
1
)
{
jam
();
int
ret
;
// hi - lo > 1 implies lo < j < hi
int
j
=
(
hi
+
lo
)
/
2
;
// read and compare attributes
unsigned
start
=
0
;
readKeyAttrs
(
frag
,
currNode
.
getEnt
(
j
),
start
,
c_entryKey
);
...
...
@@ -113,25 +122,38 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
// keys are equal, compare entry values
ret
=
searchEnt
.
cmp
(
currNode
.
getEnt
(
j
));
}
if
(
ret
<=
0
)
{
jam
();
treePos
.
m_loc
=
currNode
.
m_loc
;
if
(
ret
<
0
)
hi
=
j
;
else
if
(
ret
>
0
)
lo
=
j
;
else
{
treePos
.
m_pos
=
j
;
treePos
.
m_match
=
(
ret
==
0
);
// failed
treePos
.
m_match
=
true
;
return
;
}
if
(
hi
-
lo
==
1
)
break
;
}
if
(
!
bottomNode
.
isNull
()
)
{
if
(
ret
<
0
)
{
jam
();
// backwards compatible for now
treePos
.
m_loc
=
bottomNode
.
m_loc
;
treePos
.
m_pos
=
0
;
treePos
.
m_match
=
false
;
treePos
.
m_pos
=
hi
;
return
;
}
treePos
.
m_loc
=
currNode
.
m_loc
;
treePos
.
m_pos
=
currNode
.
getOccup
();
treePos
.
m_match
=
false
;
if
(
hi
<
currNode
.
getOccup
())
{
jam
();
treePos
.
m_pos
=
hi
;
return
;
}
if
(
bottomNode
.
isNull
())
{
jam
();
treePos
.
m_pos
=
hi
;
return
;
}
jam
();
// backwards compatible for now
treePos
.
m_loc
=
bottomNode
.
m_loc
;
treePos
.
m_pos
=
0
;
}
/*
...
...
@@ -150,9 +172,12 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
const
unsigned
numAttrs
=
frag
.
m_numAttrs
;
NodeHandle
currNode
(
frag
);
currNode
.
m_loc
=
tree
.
m_root
;
// assume success
treePos
.
m_match
=
true
;
if
(
currNode
.
m_loc
==
NullTupLoc
)
{
// empty tree
jam
();
// failed
treePos
.
m_match
=
false
;
return
;
}
...
...
@@ -206,27 +231,26 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
jam
();
treePos
.
m_loc
=
currNode
.
m_loc
;
treePos
.
m_pos
=
0
;
treePos
.
m_match
=
true
;
return
;
}
break
;
}
// access rest of current node
accessNode
(
signal
,
currNode
,
AccFull
);
// anticipate
treePos
.
m_loc
=
currNode
.
m_loc
;
// pos 0 was handled above
for
(
unsigned
j
=
1
,
occup
=
currNode
.
getOccup
();
j
<
occup
;
j
++
)
{
jam
();
// compare only the entry
if
(
searchEnt
.
eq
(
currNode
.
getEnt
(
j
)))
{
jam
();
treePos
.
m_loc
=
currNode
.
m_loc
;
treePos
.
m_pos
=
j
;
treePos
.
m_match
=
true
;
return
;
}
}
treePos
.
m_loc
=
currNode
.
m_loc
;
treePos
.
m_pos
=
currNode
.
getOccup
();
// failed
treePos
.
m_match
=
false
;
}
...
...
ndb/src/kernel/blocks/dbtux/Times.txt
View file @
1b31325e
...
...
@@ -108,4 +108,16 @@ charsets mc02/a 35 ms 60 ms 71 pct
[ case b: TUX can no longer use pointers to TUP data ]
optim 15 mc02/a 34 ms 60 ms 72 pct
mc02/b 42 ms 85 ms 100 pct
mc02/c 5 ms 12 ms 110 pct
mc02/d 178 ms 242 ms 35 pct
[ corrected wasted space in index node ]
optim 16 mc02/a 34 ms 53 ms 53 pct
mc02/b 42 ms 75 ms 75 pct
[ case a, b: binary search of bounding node when adding entry ]
vim: set et:
ndb/test/ndbapi/testOIBasic.cpp
View file @
1b31325e
...
...
@@ -212,6 +212,8 @@ struct Par : public Opt {
// value calculation
unsigned
m_range
;
unsigned
m_pctrange
;
// choice of key
bool
m_randomkey
;
// do verify after read
bool
m_verify
;
// deadlock possible
...
...
@@ -227,6 +229,7 @@ struct Par : public Opt {
m_totrows
(
m_threads
*
m_rows
),
m_range
(
m_rows
),
m_pctrange
(
0
),
m_randomkey
(
false
),
m_verify
(
false
),
m_deadlock
(
false
)
{
}
...
...
@@ -1965,9 +1968,21 @@ BSet::calcpk(Par par, unsigned i)
int
BSet
::
setbnd
(
Par
par
)
const
{
for
(
unsigned
j
=
0
;
j
<
m_bvals
;
j
++
)
{
const
BVal
&
bval
=
*
m_bval
[
j
];
CHK
(
bval
.
setbnd
(
par
)
==
0
);
if
(
m_bvals
!=
0
)
{
unsigned
p1
=
urandom
(
m_bvals
);
unsigned
p2
=
10009
;
// prime
// random order
for
(
unsigned
j
=
0
;
j
<
m_bvals
;
j
++
)
{
unsigned
k
=
p1
+
p2
*
j
;
const
BVal
&
bval
=
*
m_bval
[
k
%
m_bvals
];
CHK
(
bval
.
setbnd
(
par
)
==
0
);
}
// duplicate
if
(
urandom
(
5
)
==
0
)
{
unsigned
k
=
urandom
(
m_bvals
);
const
BVal
&
bval
=
*
m_bval
[
k
];
CHK
(
bval
.
setbnd
(
par
)
==
0
);
}
}
return
0
;
}
...
...
@@ -2107,7 +2122,8 @@ pkupdate(Par par)
Lst
lst
;
bool
deadlock
=
false
;
for
(
unsigned
j
=
0
;
j
<
par
.
m_rows
;
j
++
)
{
unsigned
i
=
thrrow
(
par
,
j
);
unsigned
j2
=
!
par
.
m_randomkey
?
j
:
urandom
(
par
.
m_rows
);
unsigned
i
=
thrrow
(
par
,
j2
);
set
.
lock
();
if
(
!
set
.
exist
(
i
)
||
set
.
pending
(
i
))
{
set
.
unlock
();
...
...
@@ -2710,6 +2726,7 @@ pkupdateindexbuild(Par par)
if
(
par
.
m_no
==
0
)
{
CHK
(
createindex
(
par
)
==
0
);
}
else
{
par
.
m_randomkey
=
true
;
CHK
(
pkupdate
(
par
)
==
0
);
}
return
0
;
...
...
sql/ha_ndbcluster.cc
View file @
1b31325e
...
...
@@ -1227,114 +1227,158 @@ inline int ha_ndbcluster::next_result(byte *buf)
DBUG_RETURN
(
HA_ERR_END_OF_FILE
);
}
/*
Set bounds for
a ordered index scan, use key_range
Set bounds for
ordered index scan.
*/
int
ha_ndbcluster
::
set_bounds
(
NdbIndexScanOperation
*
op
,
const
key_range
*
key
,
int
bound
)
const
key_range
*
keys
[
2
])
{
uint
key_len
,
key_store_len
,
tot_len
,
key_tot_len
;
byte
*
key_ptr
;
KEY
*
key_info
=
table
->
key_info
+
active_index
;
KEY_PART_INFO
*
key_part
=
key_info
->
key_part
;
KEY_PART_INFO
*
end
=
key_part
+
key_info
->
key_parts
;
Field
*
field
;
bool
key_nullable
,
key_null
;
const
KEY
*
const
key_info
=
table
->
key_info
+
active_index
;
const
uint
key_parts
=
key_info
->
key_parts
;
uint
key_tot_len
[
2
];
uint
tot_len
;
int
i
,
j
;
DBUG_ENTER
(
"set_bounds"
);
DBUG_PRINT
(
"enter"
,
(
"bound: %d"
,
bound
));
DBUG_PRINT
(
"enter"
,
(
"key_parts: %d"
,
key_info
->
key_parts
));
DBUG_PRINT
(
"enter"
,
(
"key->length: %d"
,
key
->
length
));
DBUG_PRINT
(
"enter"
,
(
"key->flag: %d"
,
key
->
flag
));
DBUG_PRINT
(
"info"
,
(
"key_parts=%d"
,
key_parts
));
// Set bounds using key data
tot_len
=
0
;
key_ptr
=
(
byte
*
)
key
->
key
;
key_tot_len
=
key
->
length
;
for
(;
key_part
!=
end
;
key_part
++
)
for
(
j
=
0
;
j
<=
1
;
j
++
)
{
field
=
key_part
->
field
;
key_len
=
key_part
->
length
;
key_store_len
=
key_part
->
store_length
;
key_nullable
=
(
bool
)
key_part
->
null_bit
;
key_null
=
(
field
->
maybe_null
()
&&
*
key_ptr
);
tot_len
+=
key_store_len
;
const
char
*
bounds
[]
=
{
"LE"
,
"LT"
,
"GE"
,
"GT"
,
"EQ"
};
DBUG_ASSERT
(
bound
>=
0
&&
bound
<=
4
);
DBUG_PRINT
(
"info"
,
(
"Set Bound%s on %s %s %s"
,
bounds
[
bound
],
field
->
field_name
,
key_nullable
?
"NULLABLE"
:
""
,
key_null
?
"NULL"
:
""
));
DBUG_PRINT
(
"info"
,
(
"Total length %d"
,
tot_len
));
DBUG_DUMP
(
"key"
,
(
char
*
)
key_ptr
,
key_store_len
);
if
(
op
->
setBound
(
field
->
field_name
,
bound
,
key_null
?
0
:
(
key_nullable
?
key_ptr
+
1
:
key_ptr
),
key_null
?
0
:
key_len
)
!=
0
)
ERR_RETURN
(
op
->
getNdbError
());
key_ptr
+=
key_store_len
;
if
(
tot_len
>=
key_tot_len
)
break
;
/*
Only one bound which is not EQ can be set
so if this bound was not EQ, bail out and make
a best effort attempt
*/
if
(
bound
!=
NdbIndexScanOperation
::
BoundEQ
)
break
;
const
key_range
*
key
=
keys
[
j
];
if
(
key
!=
NULL
)
{
// for key->flag see ha_rkey_function
DBUG_PRINT
(
"info"
,
(
"key %d length=%d flag=%d"
,
j
,
key
->
length
,
key
->
flag
));
key_tot_len
[
j
]
=
key
->
length
;
}
else
{
DBUG_PRINT
(
"info"
,
(
"key %d not present"
,
j
));
key_tot_len
[
j
]
=
0
;
}
}
tot_len
=
0
;
DBUG_RETURN
(
0
);
}
for
(
i
=
0
;
i
<
key_parts
;
i
++
)
{
KEY_PART_INFO
*
key_part
=
&
key_info
->
key_part
[
i
];
Field
*
field
=
key_part
->
field
;
uint
part_len
=
key_part
->
length
;
uint
part_store_len
=
key_part
->
store_length
;
bool
part_nullable
=
(
bool
)
key_part
->
null_bit
;
// Info about each key part
struct
part_st
{
bool
part_last
;
const
key_range
*
key
;
const
byte
*
part_ptr
;
bool
part_null
;
int
bound_type
;
const
char
*
bound_ptr
;
};
struct
part_st
part
[
2
];
for
(
j
=
0
;
j
<=
1
;
j
++
)
{
struct
part_st
&
p
=
part
[
j
];
p
.
key
=
NULL
;
p
.
bound_type
=
-
1
;
if
(
tot_len
<
key_tot_len
[
j
])
{
p
.
part_last
=
(
tot_len
+
part_store_len
>=
key_tot_len
[
j
]);
p
.
key
=
keys
[
j
];
p
.
part_ptr
=
&
p
.
key
->
key
[
tot_len
];
p
.
part_null
=
(
field
->
maybe_null
()
&&
*
p
.
part_ptr
);
p
.
bound_ptr
=
(
const
char
*
)
p
.
part_null
?
0
:
part_nullable
?
p
.
part_ptr
+
1
:
p
.
part_ptr
;
if
(
j
==
0
)
{
switch
(
p
.
key
->
flag
)
{
case
HA_READ_KEY_EXACT
:
p
.
bound_type
=
NdbIndexScanOperation
::
BoundEQ
;
break
;
case
HA_READ_KEY_OR_NEXT
:
p
.
bound_type
=
NdbIndexScanOperation
::
BoundLE
;
break
;
case
HA_READ_AFTER_KEY
:
if
(
!
p
.
part_last
)
p
.
bound_type
=
NdbIndexScanOperation
::
BoundLE
;
else
p
.
bound_type
=
NdbIndexScanOperation
::
BoundLT
;
break
;
default:
break
;
}
}
if
(
j
==
1
)
{
switch
(
p
.
key
->
flag
)
{
case
HA_READ_BEFORE_KEY
:
if
(
!
p
.
part_last
)
p
.
bound_type
=
NdbIndexScanOperation
::
BoundGE
;
else
p
.
bound_type
=
NdbIndexScanOperation
::
BoundGT
;
break
;
case
HA_READ_AFTER_KEY
:
// weird
p
.
bound_type
=
NdbIndexScanOperation
::
BoundGE
;
break
;
default:
break
;
}
}
#ifndef DBUG_OFF
if
(
p
.
bound_type
==
-
1
)
{
DBUG_PRINT
(
"error"
,
(
"key %d unknown flag %d"
,
j
,
p
.
key
->
flag
));
DBUG_ASSERT
(
false
);
// Stop setting bounds but continue with what we have
DBUG_RETURN
(
0
);
}
}
}
const
char
*
key_flag_strs
[]
=
{
"HA_READ_KEY_EXACT"
,
"HA_READ_KEY_OR_NEXT"
,
"HA_READ_KEY_OR_PREV"
,
"HA_READ_AFTER_KEY"
,
"HA_READ_BEFORE_KEY"
,
"HA_READ_PREFIX"
,
"HA_READ_PREFIX_LAST"
,
"HA_READ_PREFIX_LAST_OR_PREV"
,
"HA_READ_MBR_CONTAIN"
,
"HA_READ_MBR_INTERSECT"
,
"HA_READ_MBR_WITHIN"
,
"HA_READ_MBR_DISJOINT"
,
"HA_READ_MBR_EQUAL"
};
// Seen with e.g. b = 1 and c > 1
if
(
part
[
0
].
bound_type
==
NdbIndexScanOperation
::
BoundLE
&&
part
[
1
].
bound_type
==
NdbIndexScanOperation
::
BoundGE
&&
memcmp
(
part
[
0
].
part_ptr
,
part
[
1
].
part_ptr
,
part_store_len
)
==
0
)
{
DBUG_PRINT
(
"info"
,
(
"replace LE/GE pair by EQ"
));
part
[
0
].
bound_type
=
NdbIndexScanOperation
::
BoundEQ
;
part
[
1
].
bound_type
=
-
1
;
}
// Not seen but was in previous version
if
(
part
[
0
].
bound_type
==
NdbIndexScanOperation
::
BoundEQ
&&
part
[
1
].
bound_type
==
NdbIndexScanOperation
::
BoundGE
&&
memcmp
(
part
[
0
].
part_ptr
,
part
[
1
].
part_ptr
,
part_store_len
)
==
0
)
{
DBUG_PRINT
(
"info"
,
(
"remove GE from EQ/GE pair"
));
part
[
1
].
bound_type
=
-
1
;
}
const
int
no_of_key_flags
=
sizeof
(
key_flag_strs
)
/
sizeof
(
char
*
);
for
(
j
=
0
;
j
<=
1
;
j
++
)
{
struct
part_st
&
p
=
part
[
j
];
// Set bound if not done with this key
if
(
p
.
key
!=
NULL
)
{
DBUG_PRINT
(
"info"
,
(
"key %d:%d offset=%d length=%d last=%d bound=%d"
,
j
,
i
,
tot_len
,
part_len
,
p
.
part_last
,
p
.
bound_type
));
DBUG_DUMP
(
"info"
,
(
const
char
*
)
p
.
part_ptr
,
part_store_len
);
// Set bound if not cancelled via type -1
if
(
p
.
bound_type
!=
-
1
)
if
(
op
->
setBound
(
field
->
field_name
,
p
.
bound_type
,
p
.
bound_ptr
))
ERR_RETURN
(
op
->
getNdbError
());
}
}
void
print_key
(
const
key_range
*
key
,
const
char
*
info
)
{
if
(
key
)
{
const
char
*
str
=
key
->
flag
<
no_of_key_flags
?
key_flag_strs
[
key
->
flag
]
:
"Unknown flag"
;
DBUG_LOCK_FILE
;
fprintf
(
DBUG_FILE
,
"%s: %s, length=%d, key="
,
info
,
str
,
key
->
length
);
uint
i
;
for
(
i
=
0
;
i
<
key
->
length
-
1
;
i
++
)
fprintf
(
DBUG_FILE
,
"%0d "
,
key
->
key
[
i
]);
fprintf
(
DBUG_FILE
,
"
\n
"
);
DBUG_UNLOCK_FILE
;
tot_len
+=
part_store_len
;
}
return
;
DBUG_RETURN
(
0
)
;
}
#endif
/*
Start ordered index scan in NDB
...
...
@@ -1353,13 +1397,10 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_PRINT
(
"enter"
,
(
"index: %u, sorted: %d"
,
active_index
,
sorted
));
DBUG_PRINT
(
"enter"
,
(
"Starting new ordered scan on %s"
,
m_tabname
));
DBUG_EXECUTE
(
"enter"
,
print_key
(
start_key
,
"start_key"
););
DBUG_EXECUTE
(
"enter"
,
print_key
(
end_key
,
"end_key"
););
// Check that sorted seems to be initialised
DBUG_ASSERT
(
sorted
==
0
||
sorted
==
1
);
if
(
m_active_cursor
==
0
)
if
(
m_active_cursor
==
0
)
{
restart
=
false
;
NdbOperation
::
LockMode
lm
=
...
...
@@ -1380,29 +1421,15 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
if
(
op
->
reset_bounds
())
DBUG_RETURN
(
ndb_err
(
m_active_trans
));
}
if
(
start_key
&&
set_bounds
(
op
,
start_key
,
(
start_key
->
flag
==
HA_READ_KEY_EXACT
)
?
NdbIndexScanOperation
::
BoundEQ
:
(
start_key
->
flag
==
HA_READ_AFTER_KEY
)
?
NdbIndexScanOperation
::
BoundLT
:
NdbIndexScanOperation
::
BoundLE
))
DBUG_RETURN
(
1
);
if
(
end_key
)
{
if
(
start_key
&&
start_key
->
flag
==
HA_READ_KEY_EXACT
)
{
DBUG_PRINT
(
"info"
,
(
"start_key is HA_READ_KEY_EXACT ignoring end_key"
));
}
else
if
(
set_bounds
(
op
,
end_key
,
(
end_key
->
flag
==
HA_READ_AFTER_KEY
)
?
NdbIndexScanOperation
::
BoundGE
:
NdbIndexScanOperation
::
BoundGT
))
DBUG_RETURN
(
1
);
const
key_range
*
keys
[
2
]
=
{
start_key
,
end_key
};
int
ret
=
set_bounds
(
op
,
keys
);
if
(
ret
)
DBUG_RETURN
(
ret
);
}
if
(
!
restart
)
if
(
!
restart
)
{
DBUG_RETURN
(
define_read_attrs
(
buf
,
op
));
}
...
...
sql/ha_ndbcluster.h
View file @
1b31325e
...
...
@@ -214,8 +214,7 @@ class ha_ndbcluster: public handler
int
set_primary_key
(
NdbOperation
*
op
,
const
byte
*
key
);
int
set_primary_key
(
NdbOperation
*
op
);
int
set_primary_key_from_old_data
(
NdbOperation
*
op
,
const
byte
*
old_data
);
int
set_bounds
(
NdbIndexScanOperation
*
ndb_op
,
const
key_range
*
key
,
int
bound
);
int
set_bounds
(
NdbIndexScanOperation
*
ndb_op
,
const
key_range
*
keys
[
2
]);
int
key_cmp
(
uint
keynr
,
const
byte
*
old_row
,
const
byte
*
new_row
);
void
print_results
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment