Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
mariadb
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
mariadb
Commits
8920cff4
Commit
8920cff4
authored
Oct 08, 2004
by
pekka@mysql.com
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
NDB wl-2151 Fix bounds setting table handler vs TUX
parent
ae87bd11
Changes
9
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
381 additions
and
251 deletions
+381
-251
mysql-test/ndb/ndb_range_bounds.pl
mysql-test/ndb/ndb_range_bounds.pl
+133
-0
ndb/include/kernel/signaldata/TuxBound.hpp
ndb/include/kernel/signaldata/TuxBound.hpp
+0
-1
ndb/include/ndbapi/NdbIndexScanOperation.hpp
ndb/include/ndbapi/NdbIndexScanOperation.hpp
+18
-28
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+0
-1
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
+20
-46
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
+52
-55
ndb/test/ndbapi/testOIBasic.cpp
ndb/test/ndbapi/testOIBasic.cpp
+15
-3
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.cc
+142
-115
sql/ha_ndbcluster.h
sql/ha_ndbcluster.h
+1
-2
No files found.
mysql-test/ndb/ndb_range_bounds.pl
0 → 100644
View file @
8920cff4
#
# test range scan bounds
# output to mysql-test/t/ndb_range_bounds.test
#
# give option --all to generate all cases
#
use
strict
;
use
integer
;
my
$all
=
shift
;
!
defined
(
$all
)
||
(
$all
eq
'
--all
'
&&
!
defined
(
shift
))
or
die
"
only available option is --all
";
my
$table
=
'
t
';
print
<<EOF;
--source include/have_ndb.inc
--disable_warnings
drop table if exists $table;
--enable_warnings
# test range scan bounds
# generated by mysql-test/ndb/ndb_range_bounds.pl
# all selects must return 0
EOF
sub
cut
($$@)
{
my
(
$op
,
$key
,
@v
)
=
@_
;
$op
=
'
==
'
if
$op
eq
'
=
';
my
(
@w
);
eval
"
\
@w
= grep(
\
$_
$op
$key
,
\
@v
)
";
$@
and
die
$@
;
return
@w
;
}
sub
mkdummy
(\@) {
my
(
$val
)
=
@_
;
return
{
'
dummy
'
=>
1
,
'
exp
'
=>
'
9 = 9
',
'
cnt
'
=>
scalar
@$val
,
};
}
sub
mkone
($$$\@) {
my
(
$col
,
$op
,
$key
,
$val
)
=
@_
;
my
$cnt
=
scalar
cut
(
$op
,
$key
,
@$val
);
return
{
'
exp
'
=>
"
$col
$op
$key
",
'
cnt
'
=>
$cnt
,
};
}
sub
mktwo
($$$$$\@) {
my
(
$col
,
$op1
,
$key1
,
$op2
,
$key2
,
$val
)
=
@_
;
my
$cnt
=
scalar
cut
(
$op2
,
$key2
,
cut
(
$op1
,
$key1
,
@$val
));
return
{
'
exp
'
=>
"
$col
$op1
$key1
and
$col
$op2
$key2
",
'
cnt
'
=>
$cnt
,
};
}
sub
mkall
($$$\@) {
my
(
$col
,
$key1
,
$key2
,
$val
)
=
@_
;
my
@a
=
();
my
$p
=
mkdummy
(
@$val
);
push
(
@a
,
$p
)
if
$all
;
my
@ops1
=
$all
?
qw(< <= = >= >)
:
qw(= >= >)
;
my
@ops2
=
$all
?
qw(< <= = >= >)
:
qw(< <=)
;
for
my
$op1
(
@ops1
)
{
my
$p
=
mkone
(
$col
,
$op1
,
$key1
,
@$val
);
push
(
@a
,
$p
)
if
$all
||
$p
->
{
cnt
}
!=
0
;
for
my
$op2
(
@ops2
)
{
my
$p
=
mktwo
(
$col
,
$op1
,
$key1
,
$op2
,
$key2
,
@$val
);
push
(
@a
,
$p
)
if
$all
||
$p
->
{
cnt
}
!=
0
;
}
}
return
\
@a
;
}
for
my
$nn
("
bcd
",
"")
{
my
%
nn
;
for
my
$x
(
qw(b c d)
)
{
$nn
{
$x
}
=
$nn
=~
/$x/
?
"
not null
"
:
"
null
";
}
print
<<EOF;
create table $table (
a int primary key,
b int $nn{b},
c int $nn{c},
d int $nn{d},
index (b, c, d)
) engine=ndb;
EOF
my
@val
=
(
0
..
4
);
my
$v0
=
0
;
for
my
$v1
(
@val
)
{
for
my
$v2
(
@val
)
{
for
my
$v3
(
@val
)
{
print
"
insert into
$table
values(
$v0
,
$v1
,
$v2
,
$v3
);
\n
";
$v0
++
;
}
}
}
my
$key1
=
1
;
my
$key2
=
3
;
my
$a1
=
mkall
('
b
',
$key1
,
$key2
,
@val
);
my
$a2
=
mkall
('
c
',
$key1
,
$key2
,
@val
);
my
$a3
=
mkall
('
d
',
$key1
,
$key2
,
@val
);
for
my
$p1
(
@$a1
)
{
my
$cnt1
=
$p1
->
{
cnt
}
*
@val
*
@val
;
print
"
select count(*) -
$cnt1
from
$table
";
print
"
where
$p1
->{exp};
\n
";
for
my
$p2
(
@$a2
)
{
my
$cnt2
=
$p1
->
{
cnt
}
*
$p2
->
{
cnt
}
*
@val
;
print
"
select count(*) -
$cnt2
from
$table
";
print
"
where
$p1
->{exp} and
$p2
->{exp};
\n
";
for
my
$p3
(
@$a3
)
{
my
$cnt3
=
$p1
->
{
cnt
}
*
$p2
->
{
cnt
}
*
$p3
->
{
cnt
};
print
"
select count(*) -
$cnt3
from
$table
";
print
"
where
$p1
->{exp} and
$p2
->{exp} and
$p3
->{exp};
\n
";
}
}
}
print
<<EOF;
drop table $table;
EOF
}
# vim: set sw=2:
ndb/include/kernel/signaldata/TuxBound.hpp
View file @
8920cff4
...
...
@@ -48,7 +48,6 @@ private:
Uint32
tuxScanPtrI
;
/*
* Number of words of bound info included after fixed signal data.
* Starts with 5 unused words (word 0 is length used by LQH).
*/
Uint32
boundAiLength
;
};
...
...
ndb/include/ndbapi/NdbIndexScanOperation.hpp
View file @
8920cff4
...
...
@@ -55,28 +55,12 @@ public:
return
readTuples
(
LM_Exclusive
,
0
,
parallell
,
false
);
}
/**
* @name Define Range Scan
*
* A range scan is a scan on an ordered index. The operation is on
* the index table but tuples are returned from the primary table.
* The index contains all tuples where at least one index key has not
* null value.
*
* A range scan is currently opened via a normal open scan method.
* Bounds can be defined for each index key. After setting bounds,
* usual scan methods can be used (get value, interpreter, take over).
* These operate on the primary table.
*
* @{
*/
/**
* Type of ordered index key bound. The values (0-4) will not change
* and can be used explicitly (e.g. they could be computed).
*/
enum
BoundType
{
BoundLE
=
0
,
///< lower bound
,
BoundLE
=
0
,
///< lower bound
BoundLT
=
1
,
///< lower bound, strict
BoundGE
=
2
,
///< upper bound
BoundGT
=
3
,
///< upper bound, strict
...
...
@@ -86,20 +70,28 @@ public:
/**
* Define bound on index key in range scan.
*
* Each index key can have lower and/or upper bound, or can be set
* equal to a value. The bounds can be defined in any order but
* a duplicate definition is an error.
* Each index key can have lower and/or upper bound. Setting the key
* equal to a value defines both upper and lower bounds. The bounds
* can be defined in any order. Conflicting definitions is an error.
*
* For equality, it is better to use BoundEQ instead of the equivalent
* pair of BoundLE and BoundGE. This is especially true when table
* distribution key is an initial part of the index key.
*
* The bounds must specify a single range i.e. they are on an initial
* sequence of index keys and the condition is equality for all but
* (at most) the last key which has a lower and/or upper bound.
* The sets of lower and upper bounds must be on initial sequences of
* index keys. All but possibly the last bound must be non-strict.
* So "a >= 2 and b > 3" is ok but "a > 2 and b >= 3" is not.
*
* The scan may currently return tuples for which the bounds are not
* satisfied. For example, "a <= 2 and b <= 3" scans the index up to
* (a=2, b=3) but also returns any (a=1, b=4).
*
* NULL is treated like a normal value which is less than any not-NULL
* value and equal to another NULL value. To
search for
NULL use
* value and equal to another NULL value. To
compare against
NULL use
* setBound with null pointer (0).
*
* An index stores also all-NULL keys
(this may become optional).
*
Doing index scan with empty
bound set returns all table tuples.
* An index stores also all-NULL keys
. Doing index scan with empty
* bound set returns all table tuples.
*
* @param attrName Attribute name, alternatively:
* @param anAttrId Index column id (starting from 0)
...
...
@@ -117,8 +109,6 @@ public:
*/
int
setBound
(
Uint32
anAttrId
,
int
type
,
const
void
*
aValue
,
Uint32
len
=
0
);
/** @} *********************************************************************/
/**
* Reset bounds and put operation in list that will be
* sent on next execute
...
...
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
View file @
8920cff4
...
...
@@ -7683,7 +7683,6 @@ void Dblqh::accScanConfScanLab(Signal* signal)
Uint32
boundAiLength
=
tcConnectptr
.
p
->
primKeyLen
-
4
;
if
(
scanptr
.
p
->
rangeScan
)
{
jam
();
// bound info length is in first of the 5 header words
TuxBoundInfo
*
const
req
=
(
TuxBoundInfo
*
)
signal
->
getDataPtrSend
();
req
->
errorCode
=
RNIL
;
req
->
tuxScanPtrI
=
scanptr
.
p
->
scanAccPtr
;
...
...
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp
View file @
8920cff4
...
...
@@ -87,21 +87,23 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
/*
* Scan bound vs node prefix or entry.
*
* Compare lower or upper bound and index
attribute data. The attribute
*
data may be partial in which case CmpUnknown may be returned.
*
Returns -1 if the boundary is to the left of the compared key and +1
*
if the boundary is to the right of the compared ke
y.
* Compare lower or upper bound and index
entry data. The entry data
*
may be partial in which case CmpUnknown may be returned. Otherwise
*
returns -1 if the bound is to the left of the entry and +1 if the
*
bound is to the right of the entr
y.
*
* T
o get this behaviour we treat equality a little bit special. If the
*
boundary is a lower bound then the boundary is to the left of all
*
equal keys and if it is an upper bound then the boundary is to the
*
right of all equal keys
.
* T
he routine is similar to cmpSearchKey, but 0 is never returned.
*
Suppose all attributes compare equal. Recall that all bounds except
*
possibly the last one are non-strict. Use the given bound direction
*
(0-lower 1-upper) and strictness of last bound to return -1 or +1
.
*
* When searching for the first key we are using the lower bound to try
* to find the first key that is to the right of the boundary. Then we
* start scanning from this tuple (including the tuple itself) until we
* find the first key which is to the right of the boundary. Then we
* stop and do not include that key in the scan result.
* Following example illustrates this. We are at (a=2, b=3).
*
* dir bounds strict return
* 0 a >= 2 and b >= 3 no -1
* 0 a >= 2 and b > 3 yes +1
* 1 a <= 2 and b <= 3 no +1
* 1 a <= 2 and b < 3 yes -1
*/
int
Dbtux
::
cmpScanBound
(
const
Frag
&
frag
,
unsigned
dir
,
ConstData
boundInfo
,
unsigned
boundCount
,
ConstData
entryData
,
unsigned
maxlen
)
...
...
@@ -111,12 +113,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
ndbrequire
(
dir
<=
1
);
// number of words of data left
unsigned
len2
=
maxlen
;
/*
* No boundary means full scan, low boundary is to the right of all
* keys. Thus we should always return -1. For upper bound we are to
* the right of all keys, thus we should always return +1. We achieve
* this behaviour by initializing type to 4.
*/
// in case of no bounds, init last type to something non-strict
unsigned
type
=
4
;
while
(
boundCount
!=
0
)
{
if
(
len2
<=
AttributeHeaderSize
)
{
...
...
@@ -124,7 +121,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
return
NdbSqlUtil
::
CmpUnknown
;
}
len2
-=
AttributeHeaderSize
;
// get and skip bound type
// get and skip bound type
(it is used after the loop)
type
=
boundInfo
[
0
];
boundInfo
+=
1
;
if
(
!
boundInfo
.
ah
().
isNULL
())
{
...
...
@@ -166,30 +163,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
entryData
+=
AttributeHeaderSize
+
entryData
.
ah
().
getDataSize
();
boundCount
-=
1
;
}
if
(
dir
==
0
)
{
jam
();
/*
* Looking for the lower bound. If strict lower bound then the
* boundary is to the right of the compared key and otherwise (equal
* included in range) then the boundary is to the left of the key.
*/
if
(
type
==
1
)
{
jam
();
return
+
1
;
}
return
-
1
;
}
else
{
jam
();
/*
* Looking for the upper bound. If strict upper bound then the
* boundary is to the left of all equal keys and otherwise (equal
* included in the range) then the boundary is to the right of all
* equal keys.
*/
if
(
type
==
3
)
{
jam
();
return
-
1
;
}
return
+
1
;
}
// all attributes were equal
const
int
strict
=
(
type
&
0x1
);
return
(
dir
==
0
?
(
strict
==
0
?
-
1
:
+
1
)
:
(
strict
==
0
?
+
1
:
-
1
));
}
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp
View file @
8920cff4
...
...
@@ -108,15 +108,23 @@ Dbtux::execACC_SCANREQ(Signal* signal)
/*
* Receive bounds for scan in single direct call. The bounds can arrive
* in any order. Attribute ids are those of index table.
*
* Replace EQ by equivalent LE + GE. Check for conflicting bounds.
* Check that sets of lower and upper bounds are on initial sequences of
* keys and that all but possibly last bound is non-strict.
*
* Finally save the sets of lower and upper bounds (i.e. start key and
* end key). Full bound type (< 4) is included but only the strict bit
* is used since lower and upper have now been separated.
*/
void
Dbtux
::
execTUX_BOUND_INFO
(
Signal
*
signal
)
{
jamEntry
();
struct
BoundInfo
{
int
type
;
unsigned
offset
;
unsigned
size
;
int
type
;
};
TuxBoundInfo
*
const
sig
=
(
TuxBoundInfo
*
)
signal
->
getDataPtrSend
();
const
TuxBoundInfo
reqCopy
=
*
(
const
TuxBoundInfo
*
)
sig
;
...
...
@@ -124,18 +132,11 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
// get records
ScanOp
&
scan
=
*
c_scanOpPool
.
getPtr
(
req
->
tuxScanPtrI
);
Index
&
index
=
*
c_indexPool
.
getPtr
(
scan
.
m_indexId
);
// collect
bound info for each index attribute
BoundInfo
boundInfo
[
MaxIndexAttributes
][
2
];
// collect
lower and upper bounds
BoundInfo
boundInfo
[
2
][
MaxIndexAttributes
];
// largest attrId seen plus one
Uint32
maxAttrId
=
0
;
// skip 5 words
Uint32
maxAttrId
[
2
]
=
{
0
,
0
};
unsigned
offset
=
0
;
if
(
req
->
boundAiLength
<
offset
)
{
jam
();
scan
.
m_state
=
ScanOp
::
Invalid
;
sig
->
errorCode
=
TuxBoundInfo
::
InvalidAttrInfo
;
return
;
}
const
Uint32
*
const
data
=
(
Uint32
*
)
sig
+
TuxBoundInfo
::
SignalLength
;
// walk through entries
while
(
offset
+
2
<=
req
->
boundAiLength
)
{
...
...
@@ -156,32 +157,35 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
sig
->
errorCode
=
TuxBoundInfo
::
InvalidAttrInfo
;
return
;
}
while
(
maxAttrId
<=
attrId
)
{
BoundInfo
*
b
=
boundInfo
[
maxAttrId
++
];
b
[
0
].
type
=
b
[
1
].
type
=
-
1
;
}
BoundInfo
*
b
=
boundInfo
[
attrId
];
if
(
type
==
0
||
type
==
1
||
type
==
4
)
{
if
(
b
[
0
].
type
!=
-
1
)
{
for
(
unsigned
j
=
0
;
j
<=
1
;
j
++
)
{
// check if lower/upper bit matches
const
unsigned
luBit
=
(
j
<<
1
);
if
((
type
&
0x2
)
!=
luBit
&&
type
!=
4
)
continue
;
// EQ -> LE, GE
const
unsigned
type2
=
(
type
&
0x1
)
|
luBit
;
// fill in any gap
while
(
maxAttrId
[
j
]
<=
attrId
)
{
BoundInfo
&
b
=
boundInfo
[
j
][
maxAttrId
[
j
]
++
];
b
.
type
=
-
1
;
}
BoundInfo
&
b
=
boundInfo
[
j
][
attrId
];
if
(
b
.
type
!=
-
1
)
{
// compare with previous bound
if
(
b
.
type
!=
type2
||
b
.
size
!=
2
+
dataSize
||
memcmp
(
&
data
[
b
.
offset
+
2
],
&
data
[
offset
+
2
],
dataSize
<<
2
)
!=
0
)
{
jam
();
scan
.
m_state
=
ScanOp
::
Invalid
;
sig
->
errorCode
=
TuxBoundInfo
::
InvalidBounds
;
return
;
}
b
[
0
].
offset
=
offset
;
b
[
0
].
size
=
2
+
dataSize
;
b
[
0
].
type
=
type
;
}
if
(
type
==
2
||
type
==
3
||
type
==
4
)
{
if
(
b
[
1
].
type
!=
-
1
)
{
jam
();
scan
.
m_state
=
ScanOp
::
Invalid
;
sig
->
errorCode
=
TuxBoundInfo
::
InvalidBounds
;
return
;
}
else
{
// enter new bound
b
.
type
=
type2
;
b
.
offset
=
offset
;
b
.
size
=
2
+
dataSize
;
}
b
[
1
].
offset
=
offset
;
b
[
1
].
size
=
2
+
dataSize
;
b
[
1
].
type
=
type
;
}
// jump to next
offset
+=
2
+
dataSize
;
...
...
@@ -192,34 +196,27 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
sig
->
errorCode
=
TuxBoundInfo
::
InvalidAttrInfo
;
return
;
}
// save the bounds in index attribute id order
scan
.
m_boundCnt
[
0
]
=
0
;
scan
.
m_boundCnt
[
1
]
=
0
;
for
(
unsigned
i
=
0
;
i
<
maxAttrId
;
i
++
)
{
for
(
unsigned
j
=
0
;
j
<=
1
;
j
++
)
{
// save lower/upper bound in index attribute id order
for
(
unsigned
i
=
0
;
i
<
maxAttrId
[
j
];
i
++
)
{
jam
();
const
BoundInfo
*
b
=
boundInfo
[
i
];
// current limitation - check all but last is equality
if
(
i
+
1
<
maxAttrId
)
{
if
(
b
[
0
].
type
!=
4
||
b
[
1
].
type
!=
4
)
{
const
BoundInfo
&
b
=
boundInfo
[
j
][
i
];
// check for gap or strict bound before last
if
(
b
.
type
==
-
1
||
(
i
+
1
<
maxAttrId
[
j
]
&&
(
b
.
type
&
0x1
)))
{
jam
();
scan
.
m_state
=
ScanOp
::
Invalid
;
sig
->
errorCode
=
TuxBoundInfo
::
InvalidBounds
;
return
;
}
}
for
(
unsigned
j
=
0
;
j
<=
1
;
j
++
)
{
if
(
b
[
j
].
type
!=
-
1
)
{
jam
();
bool
ok
=
scan
.
m_bound
[
j
]
->
append
(
&
data
[
b
[
j
].
offset
],
b
[
j
].
size
);
bool
ok
=
scan
.
m_bound
[
j
]
->
append
(
&
data
[
b
.
offset
],
b
.
size
);
if
(
!
ok
)
{
jam
();
scan
.
m_state
=
ScanOp
::
Invalid
;
sig
->
errorCode
=
TuxBoundInfo
::
OutOfBuffers
;
return
;
}
scan
.
m_boundCnt
[
j
]
++
;
}
}
scan
.
m_boundCnt
[
j
]
=
maxAttrId
[
j
];
}
// no error
sig
->
errorCode
=
0
;
...
...
ndb/test/ndbapi/testOIBasic.cpp
View file @
8920cff4
...
...
@@ -1965,10 +1965,22 @@ BSet::calcpk(Par par, unsigned i)
int
BSet
::
setbnd
(
Par
par
)
const
{
if
(
m_bvals
!=
0
)
{
unsigned
p1
=
urandom
(
m_bvals
);
unsigned
p2
=
10009
;
// prime
// random order
for
(
unsigned
j
=
0
;
j
<
m_bvals
;
j
++
)
{
const
BVal
&
bval
=
*
m_bval
[
j
];
unsigned
k
=
p1
+
p2
*
j
;
const
BVal
&
bval
=
*
m_bval
[
k
%
m_bvals
];
CHK
(
bval
.
setbnd
(
par
)
==
0
);
}
// duplicate
if
(
urandom
(
5
)
==
0
)
{
unsigned
k
=
urandom
(
m_bvals
);
const
BVal
&
bval
=
*
m_bval
[
k
];
CHK
(
bval
.
setbnd
(
par
)
==
0
);
}
}
return
0
;
}
...
...
sql/ha_ndbcluster.cc
View file @
8920cff4
...
...
@@ -1222,114 +1222,158 @@ inline int ha_ndbcluster::next_result(byte *buf)
DBUG_RETURN
(
HA_ERR_END_OF_FILE
);
}
/*
Set bounds for
a ordered index scan, use key_range
Set bounds for
ordered index scan.
*/
int
ha_ndbcluster
::
set_bounds
(
NdbIndexScanOperation
*
op
,
const
key_range
*
key
,
int
bound
)
const
key_range
*
keys
[
2
])
{
uint
key_len
,
key_store_len
,
tot_len
,
key_tot_len
;
byte
*
key_ptr
;
KEY
*
key_info
=
table
->
key_info
+
active_index
;
KEY_PART_INFO
*
key_part
=
key_info
->
key_part
;
KEY_PART_INFO
*
end
=
key_part
+
key_info
->
key_parts
;
Field
*
field
;
bool
key_nullable
,
key_null
;
const
KEY
*
const
key_info
=
table
->
key_info
+
active_index
;
const
uint
key_parts
=
key_info
->
key_parts
;
uint
key_tot_len
[
2
];
uint
tot_len
;
int
i
,
j
;
DBUG_ENTER
(
"set_bounds"
);
DBUG_PRINT
(
"enter"
,
(
"bound: %d"
,
bound
));
DBUG_PRINT
(
"enter"
,
(
"key_parts: %d"
,
key_info
->
key_parts
));
DBUG_PRINT
(
"enter"
,
(
"key->length: %d"
,
key
->
length
));
DBUG_PRINT
(
"enter"
,
(
"key->flag: %d"
,
key
->
flag
));
DBUG_PRINT
(
"info"
,
(
"key_parts=%d"
,
key_parts
));
// Set bounds using key data
tot_len
=
0
;
key_ptr
=
(
byte
*
)
key
->
key
;
key_tot_len
=
key
->
length
;
for
(;
key_part
!=
end
;
key_part
++
)
for
(
j
=
0
;
j
<=
1
;
j
++
)
{
field
=
key_part
->
field
;
key_len
=
key_part
->
length
;
key_store_len
=
key_part
->
store_length
;
key_nullable
=
(
bool
)
key_part
->
null_bit
;
key_null
=
(
field
->
maybe_null
()
&&
*
key_ptr
);
tot_len
+=
key_store_len
;
const
char
*
bounds
[]
=
{
"LE"
,
"LT"
,
"GE"
,
"GT"
,
"EQ"
};
DBUG_ASSERT
(
bound
>=
0
&&
bound
<=
4
);
DBUG_PRINT
(
"info"
,
(
"Set Bound%s on %s %s %s"
,
bounds
[
bound
],
field
->
field_name
,
key_nullable
?
"NULLABLE"
:
""
,
key_null
?
"NULL"
:
""
));
DBUG_PRINT
(
"info"
,
(
"Total length %d"
,
tot_len
));
DBUG_DUMP
(
"key"
,
(
char
*
)
key_ptr
,
key_store_len
);
if
(
op
->
setBound
(
field
->
field_name
,
bound
,
key_null
?
0
:
(
key_nullable
?
key_ptr
+
1
:
key_ptr
),
key_null
?
0
:
key_len
)
!=
0
)
ERR_RETURN
(
op
->
getNdbError
());
key_ptr
+=
key_store_len
;
const
key_range
*
key
=
keys
[
j
];
if
(
key
!=
NULL
)
{
// for key->flag see ha_rkey_function
DBUG_PRINT
(
"info"
,
(
"key %d length=%d flag=%d"
,
j
,
key
->
length
,
key
->
flag
));
key_tot_len
[
j
]
=
key
->
length
;
}
else
{
DBUG_PRINT
(
"info"
,
(
"key %d not present"
,
j
));
key_tot_len
[
j
]
=
0
;
}
}
tot_len
=
0
;
if
(
tot_len
>=
key_tot_len
)
for
(
i
=
0
;
i
<
key_parts
;
i
++
)
{
KEY_PART_INFO
*
key_part
=
&
key_info
->
key_part
[
i
];
Field
*
field
=
key_part
->
field
;
uint
part_len
=
key_part
->
length
;
uint
part_store_len
=
key_part
->
store_length
;
bool
part_nullable
=
(
bool
)
key_part
->
null_bit
;
// Info about each key part
struct
part_st
{
bool
part_last
;
const
key_range
*
key
;
const
byte
*
part_ptr
;
bool
part_null
;
int
bound_type
;
const
char
*
bound_ptr
;
};
struct
part_st
part
[
2
];
for
(
j
=
0
;
j
<=
1
;
j
++
)
{
struct
part_st
&
p
=
part
[
j
];
p
.
key
=
NULL
;
p
.
bound_type
=
-
1
;
if
(
tot_len
<
key_tot_len
[
j
])
{
p
.
part_last
=
(
tot_len
+
part_store_len
>=
key_tot_len
[
j
]);
p
.
key
=
keys
[
j
];
p
.
part_ptr
=
&
p
.
key
->
key
[
tot_len
];
p
.
part_null
=
(
field
->
maybe_null
()
&&
*
p
.
part_ptr
);
p
.
bound_ptr
=
(
const
char
*
)
p
.
part_null
?
0
:
part_nullable
?
p
.
part_ptr
+
1
:
p
.
part_ptr
;
if
(
j
==
0
)
{
switch
(
p
.
key
->
flag
)
{
case
HA_READ_KEY_EXACT
:
p
.
bound_type
=
NdbIndexScanOperation
::
BoundEQ
;
break
;
/*
Only one bound which is not EQ can be set
so if this bound was not EQ, bail out and make
a best effort attempt
*/
if
(
bound
!=
NdbIndexScanOperation
::
BoundEQ
)
case
HA_READ_KEY_OR_NEXT
:
p
.
bound_type
=
NdbIndexScanOperation
::
BoundLE
;
break
;
case
HA_READ_AFTER_KEY
:
if
(
!
p
.
part_last
)
p
.
bound_type
=
NdbIndexScanOperation
::
BoundLE
;
else
p
.
bound_type
=
NdbIndexScanOperation
::
BoundLT
;
break
;
default:
break
;
}
}
if
(
j
==
1
)
{
switch
(
p
.
key
->
flag
)
{
case
HA_READ_BEFORE_KEY
:
if
(
!
p
.
part_last
)
p
.
bound_type
=
NdbIndexScanOperation
::
BoundGE
;
else
p
.
bound_type
=
NdbIndexScanOperation
::
BoundGT
;
break
;
case
HA_READ_AFTER_KEY
:
// weird
p
.
bound_type
=
NdbIndexScanOperation
::
BoundGE
;
break
;
default:
break
;
}
}
if
(
p
.
bound_type
==
-
1
)
{
DBUG_PRINT
(
"error"
,
(
"key %d unknown flag %d"
,
j
,
p
.
key
->
flag
));
DBUG_ASSERT
(
false
);
// Stop setting bounds but continue with what we have
DBUG_RETURN
(
0
);
}
#ifndef DBUG_OFF
const
char
*
key_flag_strs
[]
=
{
"HA_READ_KEY_EXACT"
,
"HA_READ_KEY_OR_NEXT"
,
"HA_READ_KEY_OR_PREV"
,
"HA_READ_AFTER_KEY"
,
"HA_READ_BEFORE_KEY"
,
"HA_READ_PREFIX"
,
"HA_READ_PREFIX_LAST"
,
"HA_READ_PREFIX_LAST_OR_PREV"
,
"HA_READ_MBR_CONTAIN"
,
"HA_READ_MBR_INTERSECT"
,
"HA_READ_MBR_WITHIN"
,
"HA_READ_MBR_DISJOINT"
,
"HA_READ_MBR_EQUAL"
};
}
}
}
const
int
no_of_key_flags
=
sizeof
(
key_flag_strs
)
/
sizeof
(
char
*
);
// Seen with e.g. b = 1 and c > 1
if
(
part
[
0
].
bound_type
==
NdbIndexScanOperation
::
BoundLE
&&
part
[
1
].
bound_type
==
NdbIndexScanOperation
::
BoundGE
&&
memcmp
(
part
[
0
].
part_ptr
,
part
[
1
].
part_ptr
,
part_store_len
)
==
0
)
{
DBUG_PRINT
(
"info"
,
(
"replace LE/GE pair by EQ"
));
part
[
0
].
bound_type
=
NdbIndexScanOperation
::
BoundEQ
;
part
[
1
].
bound_type
=
-
1
;
}
// Not seen but was in previous version
if
(
part
[
0
].
bound_type
==
NdbIndexScanOperation
::
BoundEQ
&&
part
[
1
].
bound_type
==
NdbIndexScanOperation
::
BoundGE
&&
memcmp
(
part
[
0
].
part_ptr
,
part
[
1
].
part_ptr
,
part_store_len
)
==
0
)
{
DBUG_PRINT
(
"info"
,
(
"remove GE from EQ/GE pair"
));
part
[
1
].
bound_type
=
-
1
;
}
void
print_key
(
const
key_range
*
key
,
const
char
*
info
)
{
if
(
key
)
for
(
j
=
0
;
j
<=
1
;
j
++
)
{
struct
part_st
&
p
=
part
[
j
];
// Set bound if not done with this key
if
(
p
.
key
!=
NULL
)
{
const
char
*
str
=
key
->
flag
<
no_of_key_flags
?
key_flag_strs
[
key
->
flag
]
:
"Unknown flag"
;
DBUG_PRINT
(
"info"
,
(
"key %d:%d offset=%d length=%d last=%d bound=%d"
,
j
,
i
,
tot_len
,
part_len
,
p
.
part_last
,
p
.
bound_type
));
DBUG_DUMP
(
"info"
,
(
const
char
*
)
p
.
part_ptr
,
part_store_len
);
DBUG_LOCK_FILE
;
fprintf
(
DBUG_FILE
,
"%s: %s, length=%d, key="
,
info
,
str
,
key
->
length
);
uint
i
;
for
(
i
=
0
;
i
<
key
->
length
-
1
;
i
++
)
fprintf
(
DBUG_FILE
,
"%0d "
,
key
->
key
[
i
]);
fprintf
(
DBUG_FILE
,
"
\n
"
);
DBUG_UNLOCK_FILE
;
// Set bound if not cancelled via type -1
if
(
p
.
bound_type
!=
-
1
)
if
(
op
->
setBound
(
field
->
field_name
,
p
.
bound_type
,
p
.
bound_ptr
))
ERR_RETURN
(
op
->
getNdbError
());
}
}
return
;
tot_len
+=
part_store_len
;
}
DBUG_RETURN
(
0
);
}
#endif
/*
Start ordered index scan in NDB
...
...
@@ -1348,10 +1392,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_PRINT
(
"enter"
,
(
"index: %u, sorted: %d"
,
active_index
,
sorted
));
DBUG_PRINT
(
"enter"
,
(
"Starting new ordered scan on %s"
,
m_tabname
));
DBUG_EXECUTE
(
"enter"
,
print_key
(
start_key
,
"start_key"
););
DBUG_EXECUTE
(
"enter"
,
print_key
(
end_key
,
"end_key"
););
if
(
m_active_cursor
==
0
)
if
(
m_active_cursor
==
0
)
{
restart
=
false
;
NdbOperation
::
LockMode
lm
=
...
...
@@ -1373,28 +1414,14 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_RETURN
(
ndb_err
(
m_active_trans
));
}
if
(
start_key
&&
set_bounds
(
op
,
start_key
,
(
start_key
->
flag
==
HA_READ_KEY_EXACT
)
?
NdbIndexScanOperation
::
BoundEQ
:
(
start_key
->
flag
==
HA_READ_AFTER_KEY
)
?
NdbIndexScanOperation
::
BoundLT
:
NdbIndexScanOperation
::
BoundLE
))
DBUG_RETURN
(
1
);
if
(
end_key
)
{
if
(
start_key
&&
start_key
->
flag
==
HA_READ_KEY_EXACT
)
{
DBUG_PRINT
(
"info"
,
(
"start_key is HA_READ_KEY_EXACT ignoring end_key"
));
}
else
if
(
set_bounds
(
op
,
end_key
,
(
end_key
->
flag
==
HA_READ_AFTER_KEY
)
?
NdbIndexScanOperation
::
BoundGE
:
NdbIndexScanOperation
::
BoundGT
))
DBUG_RETURN
(
1
);
const
key_range
*
keys
[
2
]
=
{
start_key
,
end_key
};
int
ret
=
set_bounds
(
op
,
keys
);
if
(
ret
)
DBUG_RETURN
(
ret
);
}
if
(
!
restart
)
if
(
!
restart
)
{
DBUG_RETURN
(
define_read_attrs
(
buf
,
op
));
}
...
...
sql/ha_ndbcluster.h
View file @
8920cff4
...
...
@@ -214,8 +214,7 @@ class ha_ndbcluster: public handler
int
set_primary_key
(
NdbOperation
*
op
,
const
byte
*
key
);
int
set_primary_key
(
NdbOperation
*
op
);
int
set_primary_key_from_old_data
(
NdbOperation
*
op
,
const
byte
*
old_data
);
int
set_bounds
(
NdbIndexScanOperation
*
ndb_op
,
const
key_range
*
key
,
int
bound
);
int
set_bounds
(
NdbIndexScanOperation
*
ndb_op
,
const
key_range
*
keys
[
2
]);
int
key_cmp
(
uint
keynr
,
const
byte
*
old_row
,
const
byte
*
new_row
);
void
print_results
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment