Commit cb0a081e authored by unknown's avatar unknown

NDB wl-2151 Fix bounds setting table handler vs TUX


mysql-test/ndb/ndb_range_bounds.pl:
  wl-2151 Fix bounds setting table handler vs TUX
ndb/include/kernel/signaldata/TuxBound.hpp:
  wl-2151 Fix bounds setting table handler vs TUX
ndb/include/ndbapi/NdbIndexScanOperation.hpp:
  wl-2151 Fix bounds setting table handler vs TUX
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  wl-2151 Fix bounds setting table handler vs TUX
ndb/src/kernel/blocks/dbtux/DbtuxCmp.cpp:
  wl-2151 Fix bounds setting table handler vs TUX
ndb/src/kernel/blocks/dbtux/DbtuxScan.cpp:
  wl-2151 Fix bounds setting table handler vs TUX
ndb/test/ndbapi/testOIBasic.cpp:
  wl-2151 Fix bounds setting table handler vs TUX
sql/ha_ndbcluster.cc:
  wl-2151 Fix bounds setting table handler vs TUX
sql/ha_ndbcluster.h:
  wl-2151 Fix bounds setting table handler vs TUX
parent 83f1a0fe
#
# test range scan bounds
# output to mysql-test/t/ndb_range_bounds.test
#
# give option --all to generate all cases
#
use strict;
use integer;
my $all = shift;
!defined($all) || ($all eq '--all' && !defined(shift))
or die "only available option is --all";
my $table = 't';
print <<EOF;
--source include/have_ndb.inc
--disable_warnings
drop table if exists $table;
--enable_warnings
# test range scan bounds
# generated by mysql-test/ndb/ndb_range_bounds.pl
# all selects must return 0
EOF
sub cut ($$@) {
my($op, $key, @v) = @_;
$op = '==' if $op eq '=';
my(@w);
eval "\@w = grep(\$_ $op $key, \@v)";
$@ and die $@;
return @w;
}
sub mkdummy (\@) {
my ($val) = @_;
return {
'dummy' => 1,
'exp' => '9 = 9',
'cnt' => scalar @$val,
};
}
sub mkone ($$$\@) {
my($col, $op, $key, $val) = @_;
my $cnt = scalar cut($op, $key, @$val);
return {
'exp' => "$col $op $key",
'cnt' => $cnt,
};
}
sub mktwo ($$$$$\@) {
my($col, $op1, $key1, $op2, $key2, $val) = @_;
my $cnt = scalar cut($op2, $key2, cut($op1, $key1, @$val));
return {
'exp' => "$col $op1 $key1 and $col $op2 $key2",
'cnt' => $cnt,
};
}
sub mkall ($$$\@) {
my($col, $key1, $key2, $val) = @_;
my @a = ();
my $p = mkdummy(@$val);
push(@a, $p) if $all;
my @ops1 = $all ? qw(< <= = >= >) : qw(= >= >);
my @ops2 = $all ? qw(< <= = >= >) : qw(< <=);
for my $op1 (@ops1) {
my $p = mkone($col, $op1, $key1, @$val);
push(@a, $p) if $all || $p->{cnt} != 0;
for my $op2 (@ops2) {
my $p = mktwo($col, $op1, $key1, $op2, $key2, @$val);
push(@a, $p) if $all || $p->{cnt} != 0;
}
}
return \@a;
}
for my $nn ("bcd", "") {
my %nn;
for my $x (qw(b c d)) {
$nn{$x} = $nn =~ /$x/ ? "not null" : "null";
}
print <<EOF;
create table $table (
a int primary key,
b int $nn{b},
c int $nn{c},
d int $nn{d},
index (b, c, d)
) engine=ndb;
EOF
my @val = (0..4);
my $v0 = 0;
for my $v1 (@val) {
for my $v2 (@val) {
for my $v3 (@val) {
print "insert into $table values($v0, $v1, $v2, $v3);\n";
$v0++;
}
}
}
my $key1 = 1;
my $key2 = 3;
my $a1 = mkall('b', $key1, $key2, @val);
my $a2 = mkall('c', $key1, $key2, @val);
my $a3 = mkall('d', $key1, $key2, @val);
for my $p1 (@$a1) {
my $cnt1 = $p1->{cnt} * @val * @val;
print "select count(*) - $cnt1 from $table";
print " where $p1->{exp};\n";
for my $p2 (@$a2) {
my $cnt2 = $p1->{cnt} * $p2->{cnt} * @val;
print "select count(*) - $cnt2 from $table";
print " where $p1->{exp} and $p2->{exp};\n";
for my $p3 (@$a3) {
my $cnt3 = $p1->{cnt} * $p2->{cnt} * $p3->{cnt};
print "select count(*) - $cnt3 from $table";
print " where $p1->{exp} and $p2->{exp} and $p3->{exp};\n";
}
}
}
print <<EOF;
drop table $table;
EOF
}
# vim: set sw=2:
...@@ -48,7 +48,6 @@ private: ...@@ -48,7 +48,6 @@ private:
Uint32 tuxScanPtrI; Uint32 tuxScanPtrI;
/* /*
* Number of words of bound info included after fixed signal data. * Number of words of bound info included after fixed signal data.
* Starts with 5 unused words (word 0 is length used by LQH).
*/ */
Uint32 boundAiLength; Uint32 boundAiLength;
}; };
......
...@@ -55,28 +55,12 @@ public: ...@@ -55,28 +55,12 @@ public:
return readTuples(LM_Exclusive, 0, parallell, false); return readTuples(LM_Exclusive, 0, parallell, false);
} }
/**
* @name Define Range Scan
*
* A range scan is a scan on an ordered index. The operation is on
* the index table but tuples are returned from the primary table.
* The index contains all tuples where at least one index key has not
* null value.
*
* A range scan is currently opened via a normal open scan method.
* Bounds can be defined for each index key. After setting bounds,
* usual scan methods can be used (get value, interpreter, take over).
* These operate on the primary table.
*
* @{
*/
/** /**
* Type of ordered index key bound. The values (0-4) will not change * Type of ordered index key bound. The values (0-4) will not change
* and can be used explicitly (e.g. they could be computed). * and can be used explicitly (e.g. they could be computed).
*/ */
enum BoundType { enum BoundType {
BoundLE = 0, ///< lower bound, BoundLE = 0, ///< lower bound
BoundLT = 1, ///< lower bound, strict BoundLT = 1, ///< lower bound, strict
BoundGE = 2, ///< upper bound BoundGE = 2, ///< upper bound
BoundGT = 3, ///< upper bound, strict BoundGT = 3, ///< upper bound, strict
...@@ -86,20 +70,28 @@ public: ...@@ -86,20 +70,28 @@ public:
/** /**
* Define bound on index key in range scan. * Define bound on index key in range scan.
* *
* Each index key can have lower and/or upper bound, or can be set * Each index key can have lower and/or upper bound. Setting the key
* equal to a value. The bounds can be defined in any order but * equal to a value defines both upper and lower bounds. The bounds
* a duplicate definition is an error. * can be defined in any order. Conflicting definitions is an error.
*
* For equality, it is better to use BoundEQ instead of the equivalent
* pair of BoundLE and BoundGE. This is especially true when table
* distribution key is an initial part of the index key.
* *
* The bounds must specify a single range i.e. they are on an initial * The sets of lower and upper bounds must be on initial sequences of
* sequence of index keys and the condition is equality for all but * index keys. All but possibly the last bound must be non-strict.
* (at most) the last key which has a lower and/or upper bound. * So "a >= 2 and b > 3" is ok but "a > 2 and b >= 3" is not.
*
* The scan may currently return tuples for which the bounds are not
* satisfied. For example, "a <= 2 and b <= 3" scans the index up to
* (a=2, b=3) but also returns any (a=1, b=4).
* *
* NULL is treated like a normal value which is less than any not-NULL * NULL is treated like a normal value which is less than any not-NULL
* value and equal to another NULL value. To search for NULL use * value and equal to another NULL value. To compare against NULL use
* setBound with null pointer (0). * setBound with null pointer (0).
* *
* An index stores also all-NULL keys (this may become optional). * An index stores also all-NULL keys. Doing index scan with empty
* Doing index scan with empty bound set returns all table tuples. * bound set returns all table tuples.
* *
* @param attrName Attribute name, alternatively: * @param attrName Attribute name, alternatively:
* @param anAttrId Index column id (starting from 0) * @param anAttrId Index column id (starting from 0)
...@@ -117,8 +109,6 @@ public: ...@@ -117,8 +109,6 @@ public:
*/ */
int setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len = 0); int setBound(Uint32 anAttrId, int type, const void* aValue, Uint32 len = 0);
/** @} *********************************************************************/
/** /**
* Reset bounds and put operation in list that will be * Reset bounds and put operation in list that will be
* sent on next execute * sent on next execute
......
...@@ -7683,7 +7683,6 @@ void Dblqh::accScanConfScanLab(Signal* signal) ...@@ -7683,7 +7683,6 @@ void Dblqh::accScanConfScanLab(Signal* signal)
Uint32 boundAiLength = tcConnectptr.p->primKeyLen - 4; Uint32 boundAiLength = tcConnectptr.p->primKeyLen - 4;
if (scanptr.p->rangeScan) { if (scanptr.p->rangeScan) {
jam(); jam();
// bound info length is in first of the 5 header words
TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend(); TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
req->errorCode = RNIL; req->errorCode = RNIL;
req->tuxScanPtrI = scanptr.p->scanAccPtr; req->tuxScanPtrI = scanptr.p->scanAccPtr;
......
...@@ -87,21 +87,23 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons ...@@ -87,21 +87,23 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
/* /*
* Scan bound vs node prefix or entry. * Scan bound vs node prefix or entry.
* *
* Compare lower or upper bound and index attribute data. The attribute * Compare lower or upper bound and index entry data. The entry data
* data may be partial in which case CmpUnknown may be returned. * may be partial in which case CmpUnknown may be returned. Otherwise
* Returns -1 if the boundary is to the left of the compared key and +1 * returns -1 if the bound is to the left of the entry and +1 if the
* if the boundary is to the right of the compared key. * bound is to the right of the entry.
* *
* To get this behaviour we treat equality a little bit special. If the * The routine is similar to cmpSearchKey, but 0 is never returned.
* boundary is a lower bound then the boundary is to the left of all * Suppose all attributes compare equal. Recall that all bounds except
* equal keys and if it is an upper bound then the boundary is to the * possibly the last one are non-strict. Use the given bound direction
* right of all equal keys. * (0-lower 1-upper) and strictness of last bound to return -1 or +1.
* *
* When searching for the first key we are using the lower bound to try * Following example illustrates this. We are at (a=2, b=3).
* to find the first key that is to the right of the boundary. Then we *
* start scanning from this tuple (including the tuple itself) until we * dir bounds strict return
* find the first key which is to the right of the boundary. Then we * 0 a >= 2 and b >= 3 no -1
* stop and do not include that key in the scan result. * 0 a >= 2 and b > 3 yes +1
* 1 a <= 2 and b <= 3 no +1
* 1 a <= 2 and b < 3 yes -1
*/ */
int int
Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen) Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigned boundCount, ConstData entryData, unsigned maxlen)
...@@ -111,12 +113,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne ...@@ -111,12 +113,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
ndbrequire(dir <= 1); ndbrequire(dir <= 1);
// number of words of data left // number of words of data left
unsigned len2 = maxlen; unsigned len2 = maxlen;
/* // in case of no bounds, init last type to something non-strict
* No boundary means full scan, low boundary is to the right of all
* keys. Thus we should always return -1. For upper bound we are to
* the right of all keys, thus we should always return +1. We achieve
* this behaviour by initializing type to 4.
*/
unsigned type = 4; unsigned type = 4;
while (boundCount != 0) { while (boundCount != 0) {
if (len2 <= AttributeHeaderSize) { if (len2 <= AttributeHeaderSize) {
...@@ -124,7 +121,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne ...@@ -124,7 +121,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
return NdbSqlUtil::CmpUnknown; return NdbSqlUtil::CmpUnknown;
} }
len2 -= AttributeHeaderSize; len2 -= AttributeHeaderSize;
// get and skip bound type // get and skip bound type (it is used after the loop)
type = boundInfo[0]; type = boundInfo[0];
boundInfo += 1; boundInfo += 1;
if (! boundInfo.ah().isNULL()) { if (! boundInfo.ah().isNULL()) {
...@@ -166,30 +163,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne ...@@ -166,30 +163,7 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned dir, ConstData boundInfo, unsigne
entryData += AttributeHeaderSize + entryData.ah().getDataSize(); entryData += AttributeHeaderSize + entryData.ah().getDataSize();
boundCount -= 1; boundCount -= 1;
} }
if (dir == 0) { // all attributes were equal
jam(); const int strict = (type & 0x1);
/* return (dir == 0 ? (strict == 0 ? -1 : +1) : (strict == 0 ? +1 : -1));
* Looking for the lower bound. If strict lower bound then the
* boundary is to the right of the compared key and otherwise (equal
* included in range) then the boundary is to the left of the key.
*/
if (type == 1) {
jam();
return +1;
}
return -1;
} else {
jam();
/*
* Looking for the upper bound. If strict upper bound then the
* boundary is to the left of all equal keys and otherwise (equal
* included in the range) then the boundary is to the right of all
* equal keys.
*/
if (type == 3) {
jam();
return -1;
}
return +1;
}
} }
...@@ -108,15 +108,23 @@ Dbtux::execACC_SCANREQ(Signal* signal) ...@@ -108,15 +108,23 @@ Dbtux::execACC_SCANREQ(Signal* signal)
/* /*
* Receive bounds for scan in single direct call. The bounds can arrive * Receive bounds for scan in single direct call. The bounds can arrive
* in any order. Attribute ids are those of index table. * in any order. Attribute ids are those of index table.
*
* Replace EQ by equivalent LE + GE. Check for conflicting bounds.
* Check that sets of lower and upper bounds are on initial sequences of
* keys and that all but possibly last bound is non-strict.
*
* Finally save the sets of lower and upper bounds (i.e. start key and
* end key). Full bound type (< 4) is included but only the strict bit
* is used since lower and upper have now been separated.
*/ */
void void
Dbtux::execTUX_BOUND_INFO(Signal* signal) Dbtux::execTUX_BOUND_INFO(Signal* signal)
{ {
jamEntry(); jamEntry();
struct BoundInfo { struct BoundInfo {
int type;
unsigned offset; unsigned offset;
unsigned size; unsigned size;
int type;
}; };
TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend(); TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend();
const TuxBoundInfo reqCopy = *(const TuxBoundInfo*)sig; const TuxBoundInfo reqCopy = *(const TuxBoundInfo*)sig;
...@@ -124,18 +132,11 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal) ...@@ -124,18 +132,11 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
// get records // get records
ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI); ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI);
Index& index = *c_indexPool.getPtr(scan.m_indexId); Index& index = *c_indexPool.getPtr(scan.m_indexId);
// collect bound info for each index attribute // collect lower and upper bounds
BoundInfo boundInfo[MaxIndexAttributes][2]; BoundInfo boundInfo[2][MaxIndexAttributes];
// largest attrId seen plus one // largest attrId seen plus one
Uint32 maxAttrId = 0; Uint32 maxAttrId[2] = { 0, 0 };
// skip 5 words
unsigned offset = 0; unsigned offset = 0;
if (req->boundAiLength < offset) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
return;
}
const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength; const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
// walk through entries // walk through entries
while (offset + 2 <= req->boundAiLength) { while (offset + 2 <= req->boundAiLength) {
...@@ -156,32 +157,35 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal) ...@@ -156,32 +157,35 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
sig->errorCode = TuxBoundInfo::InvalidAttrInfo; sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
return; return;
} }
while (maxAttrId <= attrId) { for (unsigned j = 0; j <= 1; j++) {
BoundInfo* b = boundInfo[maxAttrId++]; // check if lower/upper bit matches
b[0].type = b[1].type = -1; const unsigned luBit = (j << 1);
} if ((type & 0x2) != luBit && type != 4)
BoundInfo* b = boundInfo[attrId]; continue;
if (type == 0 || type == 1 || type == 4) { // EQ -> LE, GE
if (b[0].type != -1) { const unsigned type2 = (type & 0x1) | luBit;
// fill in any gap
while (maxAttrId[j] <= attrId) {
BoundInfo& b = boundInfo[j][maxAttrId[j]++];
b.type = -1;
}
BoundInfo& b = boundInfo[j][attrId];
if (b.type != -1) {
// compare with previous bound
if (b.type != type2 ||
b.size != 2 + dataSize ||
memcmp(&data[b.offset + 2], &data[offset + 2], dataSize << 2) != 0) {
jam(); jam();
scan.m_state = ScanOp::Invalid; scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidBounds; sig->errorCode = TuxBoundInfo::InvalidBounds;
return; return;
} }
b[0].offset = offset; } else {
b[0].size = 2 + dataSize; // enter new bound
b[0].type = type; b.type = type2;
} b.offset = offset;
if (type == 2 || type == 3 || type == 4) { b.size = 2 + dataSize;
if (b[1].type != -1) {
jam();
scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidBounds;
return;
} }
b[1].offset = offset;
b[1].size = 2 + dataSize;
b[1].type = type;
} }
// jump to next // jump to next
offset += 2 + dataSize; offset += 2 + dataSize;
...@@ -192,34 +196,27 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal) ...@@ -192,34 +196,27 @@ Dbtux::execTUX_BOUND_INFO(Signal* signal)
sig->errorCode = TuxBoundInfo::InvalidAttrInfo; sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
return; return;
} }
// save the bounds in index attribute id order for (unsigned j = 0; j <= 1; j++) {
scan.m_boundCnt[0] = 0; // save lower/upper bound in index attribute id order
scan.m_boundCnt[1] = 0; for (unsigned i = 0; i < maxAttrId[j]; i++) {
for (unsigned i = 0; i < maxAttrId; i++) {
jam(); jam();
const BoundInfo* b = boundInfo[i]; const BoundInfo& b = boundInfo[j][i];
// current limitation - check all but last is equality // check for gap or strict bound before last
if (i + 1 < maxAttrId) { if (b.type == -1 || (i + 1 < maxAttrId[j] && (b.type & 0x1))) {
if (b[0].type != 4 || b[1].type != 4) {
jam(); jam();
scan.m_state = ScanOp::Invalid; scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::InvalidBounds; sig->errorCode = TuxBoundInfo::InvalidBounds;
return; return;
} }
} bool ok = scan.m_bound[j]->append(&data[b.offset], b.size);
for (unsigned j = 0; j <= 1; j++) {
if (b[j].type != -1) {
jam();
bool ok = scan.m_bound[j]->append(&data[b[j].offset], b[j].size);
if (! ok) { if (! ok) {
jam(); jam();
scan.m_state = ScanOp::Invalid; scan.m_state = ScanOp::Invalid;
sig->errorCode = TuxBoundInfo::OutOfBuffers; sig->errorCode = TuxBoundInfo::OutOfBuffers;
return; return;
} }
scan.m_boundCnt[j]++;
}
} }
scan.m_boundCnt[j] = maxAttrId[j];
} }
// no error // no error
sig->errorCode = 0; sig->errorCode = 0;
......
...@@ -1965,10 +1965,22 @@ BSet::calcpk(Par par, unsigned i) ...@@ -1965,10 +1965,22 @@ BSet::calcpk(Par par, unsigned i)
int int
BSet::setbnd(Par par) const BSet::setbnd(Par par) const
{ {
if (m_bvals != 0) {
unsigned p1 = urandom(m_bvals);
unsigned p2 = 10009; // prime
// random order
for (unsigned j = 0; j < m_bvals; j++) { for (unsigned j = 0; j < m_bvals; j++) {
const BVal& bval = *m_bval[j]; unsigned k = p1 + p2 * j;
const BVal& bval = *m_bval[k % m_bvals];
CHK(bval.setbnd(par) == 0);
}
// duplicate
if (urandom(5) == 0) {
unsigned k = urandom(m_bvals);
const BVal& bval = *m_bval[k];
CHK(bval.setbnd(par) == 0); CHK(bval.setbnd(par) == 0);
} }
}
return 0; return 0;
} }
......
...@@ -1222,114 +1222,158 @@ inline int ha_ndbcluster::next_result(byte *buf) ...@@ -1222,114 +1222,158 @@ inline int ha_ndbcluster::next_result(byte *buf)
DBUG_RETURN(HA_ERR_END_OF_FILE); DBUG_RETURN(HA_ERR_END_OF_FILE);
} }
/* /*
Set bounds for a ordered index scan, use key_range Set bounds for ordered index scan.
*/ */
int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
const key_range *key, const key_range *keys[2])
int bound)
{ {
uint key_len, key_store_len, tot_len, key_tot_len; const KEY *const key_info= table->key_info + active_index;
byte *key_ptr; const uint key_parts= key_info->key_parts;
KEY* key_info= table->key_info + active_index; uint key_tot_len[2];
KEY_PART_INFO* key_part= key_info->key_part; uint tot_len;
KEY_PART_INFO* end= key_part+key_info->key_parts; int i, j;
Field* field;
bool key_nullable, key_null;
DBUG_ENTER("set_bounds"); DBUG_ENTER("set_bounds");
DBUG_PRINT("enter", ("bound: %d", bound)); DBUG_PRINT("info", ("key_parts=%d", key_parts));
DBUG_PRINT("enter", ("key_parts: %d", key_info->key_parts));
DBUG_PRINT("enter", ("key->length: %d", key->length));
DBUG_PRINT("enter", ("key->flag: %d", key->flag));
// Set bounds using key data for (j= 0; j <= 1; j++)
tot_len= 0;
key_ptr= (byte *) key->key;
key_tot_len= key->length;
for (; key_part != end; key_part++)
{ {
field= key_part->field; const key_range *key= keys[j];
key_len= key_part->length; if (key != NULL)
key_store_len= key_part->store_length; {
key_nullable= (bool) key_part->null_bit; // for key->flag see ha_rkey_function
key_null= (field->maybe_null() && *key_ptr); DBUG_PRINT("info", ("key %d length=%d flag=%d",
tot_len+= key_store_len; j, key->length, key->flag));
key_tot_len[j]= key->length;
const char* bounds[]= {"LE", "LT", "GE", "GT", "EQ"}; }
DBUG_ASSERT(bound >= 0 && bound <= 4); else
DBUG_PRINT("info", ("Set Bound%s on %s %s %s", {
bounds[bound], DBUG_PRINT("info", ("key %d not present", j));
field->field_name, key_tot_len[j]= 0;
key_nullable ? "NULLABLE" : "", }
key_null ? "NULL":"")); }
DBUG_PRINT("info", ("Total length %d", tot_len)); tot_len= 0;
DBUG_DUMP("key", (char*) key_ptr, key_store_len);
if (op->setBound(field->field_name,
bound,
key_null ? 0 : (key_nullable ? key_ptr + 1 : key_ptr),
key_null ? 0 : key_len) != 0)
ERR_RETURN(op->getNdbError());
key_ptr+= key_store_len;
if (tot_len >= key_tot_len) for (i= 0; i < key_parts; i++)
{
KEY_PART_INFO *key_part= &key_info->key_part[i];
Field *field= key_part->field;
uint part_len= key_part->length;
uint part_store_len= key_part->store_length;
bool part_nullable= (bool) key_part->null_bit;
// Info about each key part
struct part_st {
bool part_last;
const key_range *key;
const byte *part_ptr;
bool part_null;
int bound_type;
const char* bound_ptr;
};
struct part_st part[2];
for (j= 0; j <= 1; j++)
{
struct part_st &p = part[j];
p.key= NULL;
p.bound_type= -1;
if (tot_len < key_tot_len[j])
{
p.part_last= (tot_len + part_store_len >= key_tot_len[j]);
p.key= keys[j];
p.part_ptr= &p.key->key[tot_len];
p.part_null= (field->maybe_null() && *p.part_ptr);
p.bound_ptr= (const char *)
p.part_null ? 0 : part_nullable ? p.part_ptr + 1 : p.part_ptr;
if (j == 0)
{
switch (p.key->flag)
{
case HA_READ_KEY_EXACT:
p.bound_type= NdbIndexScanOperation::BoundEQ;
break; break;
case HA_READ_KEY_OR_NEXT:
/* p.bound_type= NdbIndexScanOperation::BoundLE;
Only one bound which is not EQ can be set break;
so if this bound was not EQ, bail out and make case HA_READ_AFTER_KEY:
a best effort attempt if (! p.part_last)
*/ p.bound_type= NdbIndexScanOperation::BoundLE;
if (bound != NdbIndexScanOperation::BoundEQ) else
p.bound_type= NdbIndexScanOperation::BoundLT;
break;
default:
break;
}
}
if (j == 1) {
switch (p.key->flag)
{
case HA_READ_BEFORE_KEY:
if (! p.part_last)
p.bound_type= NdbIndexScanOperation::BoundGE;
else
p.bound_type= NdbIndexScanOperation::BoundGT;
break;
case HA_READ_AFTER_KEY: // weird
p.bound_type= NdbIndexScanOperation::BoundGE;
break;
default:
break; break;
} }
}
if (p.bound_type == -1)
{
DBUG_PRINT("error", ("key %d unknown flag %d", j, p.key->flag));
DBUG_ASSERT(false);
// Stop setting bounds but continue with what we have
DBUG_RETURN(0); DBUG_RETURN(0);
} }
}
#ifndef DBUG_OFF }
const char* key_flag_strs[] =
{ "HA_READ_KEY_EXACT",
"HA_READ_KEY_OR_NEXT",
"HA_READ_KEY_OR_PREV",
"HA_READ_AFTER_KEY",
"HA_READ_BEFORE_KEY",
"HA_READ_PREFIX",
"HA_READ_PREFIX_LAST",
"HA_READ_PREFIX_LAST_OR_PREV",
"HA_READ_MBR_CONTAIN",
"HA_READ_MBR_INTERSECT",
"HA_READ_MBR_WITHIN",
"HA_READ_MBR_DISJOINT",
"HA_READ_MBR_EQUAL"
};
const int no_of_key_flags = sizeof(key_flag_strs)/sizeof(char*); // Seen with e.g. b = 1 and c > 1
if (part[0].bound_type == NdbIndexScanOperation::BoundLE &&
part[1].bound_type == NdbIndexScanOperation::BoundGE &&
memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
{
DBUG_PRINT("info", ("replace LE/GE pair by EQ"));
part[0].bound_type= NdbIndexScanOperation::BoundEQ;
part[1].bound_type= -1;
}
// Not seen but was in previous version
if (part[0].bound_type == NdbIndexScanOperation::BoundEQ &&
part[1].bound_type == NdbIndexScanOperation::BoundGE &&
memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
{
DBUG_PRINT("info", ("remove GE from EQ/GE pair"));
part[1].bound_type= -1;
}
void print_key(const key_range* key, const char* info) for (j= 0; j <= 1; j++)
{ {
if (key) struct part_st &p = part[j];
// Set bound if not done with this key
if (p.key != NULL)
{ {
const char* str= key->flag < no_of_key_flags ? DBUG_PRINT("info", ("key %d:%d offset=%d length=%d last=%d bound=%d",
key_flag_strs[key->flag] : "Unknown flag"; j, i, tot_len, part_len, p.part_last, p.bound_type));
DBUG_DUMP("info", (const char*)p.part_ptr, part_store_len);
DBUG_LOCK_FILE; // Set bound if not cancelled via type -1
fprintf(DBUG_FILE,"%s: %s, length=%d, key=", info, str, key->length); if (p.bound_type != -1)
uint i; if (op->setBound(field->field_name, p.bound_type, p.bound_ptr))
for (i=0; i<key->length-1; i++) ERR_RETURN(op->getNdbError());
fprintf(DBUG_FILE,"%0d ", key->key[i]); }
fprintf(DBUG_FILE, "\n");
DBUG_UNLOCK_FILE;
} }
return;
tot_len+= part_store_len;
}
DBUG_RETURN(0);
} }
#endif
/* /*
Start ordered index scan in NDB Start ordered index scan in NDB
...@@ -1348,10 +1392,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -1348,10 +1392,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted)); DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted));
DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname)); DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
DBUG_EXECUTE("enter", print_key(start_key, "start_key");); if (m_active_cursor == 0)
DBUG_EXECUTE("enter", print_key(end_key, "end_key"););
if(m_active_cursor == 0)
{ {
restart= false; restart= false;
NdbOperation::LockMode lm= NdbOperation::LockMode lm=
...@@ -1373,28 +1414,14 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -1373,28 +1414,14 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_RETURN(ndb_err(m_active_trans)); DBUG_RETURN(ndb_err(m_active_trans));
} }
if (start_key &&
set_bounds(op, start_key,
(start_key->flag == HA_READ_KEY_EXACT) ?
NdbIndexScanOperation::BoundEQ :
(start_key->flag == HA_READ_AFTER_KEY) ?
NdbIndexScanOperation::BoundLT :
NdbIndexScanOperation::BoundLE))
DBUG_RETURN(1);
if (end_key)
{ {
if (start_key && start_key->flag == HA_READ_KEY_EXACT) const key_range *keys[2]= { start_key, end_key };
{ int ret= set_bounds(op, keys);
DBUG_PRINT("info", ("start_key is HA_READ_KEY_EXACT ignoring end_key")); if (ret)
} DBUG_RETURN(ret);
else if (set_bounds(op, end_key,
(end_key->flag == HA_READ_AFTER_KEY) ?
NdbIndexScanOperation::BoundGE :
NdbIndexScanOperation::BoundGT))
DBUG_RETURN(1);
} }
if(!restart)
if (!restart)
{ {
DBUG_RETURN(define_read_attrs(buf, op)); DBUG_RETURN(define_read_attrs(buf, op));
} }
......
...@@ -214,8 +214,7 @@ class ha_ndbcluster: public handler ...@@ -214,8 +214,7 @@ class ha_ndbcluster: public handler
int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key(NdbOperation *op); int set_primary_key(NdbOperation *op);
int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data); int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data);
int set_bounds(NdbIndexScanOperation *ndb_op, const key_range *key, int set_bounds(NdbIndexScanOperation *ndb_op, const key_range *keys[2]);
int bound);
int key_cmp(uint keynr, const byte * old_row, const byte * new_row); int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
void print_results(); void print_results();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment