Commit 5259a1cb authored by unknown's avatar unknown

ndb: wl-1893: range scanning backwards, handler


mysql-test/ndb/ndb_range_bounds.pl:
  test ordered scan
mysql-test/r/ndb_index_ordered.result:
  test ordered scan
mysql-test/t/ndb_index_ordered.test:
  test ordered scan
sql/ha_ndbcluster.cc:
  wl-1893: range scanning backwards, handler
sql/ha_ndbcluster.h:
  wl-1893: range scanning backwards, handler
parent 2dab2c1e
# #
# test range scan bounds # test range scan bounds
# output to mysql-test/t/ndb_range_bounds.test # give option --all to test all cases
#
# give option --all to generate all cases
# #
use strict; use strict;
use integer; use integer;
use Getopt::Long; use Getopt::Long;
use DBI;
my $opt_all = 0; my $opt_all = 0;
my $opt_cnt = 5; my $opt_cnt = 5;
GetOptions("all" => \$opt_all, "cnt=i" => \$opt_cnt) my $opt_verbose = 0;
or die "options are: --all --cnt=N"; GetOptions("all" => \$opt_all, "cnt=i" => \$opt_cnt, "verbose" => \$opt_verbose)
or die "options are: --all --cnt=N --verbose";
my $table = 't'; my $mysql_top = $ENV{MYSQL_TOP};
my $dsn = "dbi:mysql:database=test;host=localhost;mysql_read_default_file=$mysql_top/.target/var/my.cnf";
my $opts = { RaiseError => 0, PrintError => 0, AutoCommit => 1, };
print <<EOF; my $dbh;
--source include/have_ndb.inc my $sth;
my $sql;
--disable_warnings $dbh = DBI->connect($dsn, "root", undef, $opts) or die $DBI::errstr;
drop table if exists $table;
--enable_warnings
# test range scan bounds my $table = 't';
# generated by mysql-test/ndb/ndb_range_bounds.pl
# all selects must return 0
EOF $sql = "drop table if exists $table";
$dbh->do($sql) or die $DBI::errstr;
sub cut ($$@) { sub cut ($$$) {
my($op, $key, @v) = @_; my($op, $key, $val) = @_;
$op = '==' if $op eq '='; $op = '==' if $op eq '=';
my(@w); my(@w) = @$val;
eval "\@w = grep(\$_ $op $key, \@v)"; eval "\@w = grep(\$_ $op $key, \@w)";
$@ and die $@; $@ and die $@;
return @w; return [ @w ];
} }
sub mkdummy (\@) { sub mkdummy ($) {
my ($val) = @_; my ($val) = @_;
return { return {
'dummy' => 1, 'dummy' => 1,
'exp' => '9 = 9', 'exp' => '9 = 9',
'cnt' => scalar @$val, 'res' => $val,
}; };
} }
sub mkone ($$$\@) { sub mkone ($$$$) {
my($col, $op, $key, $val) = @_; my($col, $op, $key, $val) = @_;
my $cnt = scalar cut($op, $key, @$val); my $res = cut($op, $key, $val);
return { return {
'exp' => "$col $op $key", 'exp' => "$col $op $key",
'cnt' => $cnt, 'res' => $res,
}; };
} }
sub mktwo ($$$$$\@) { sub mktwo ($$$$$$) {
my($col, $op1, $key1, $op2, $key2, $val) = @_; my($col, $op1, $key1, $op2, $key2, $val) = @_;
my $cnt = scalar cut($op2, $key2, cut($op1, $key1, @$val)); my $res = cut($op2, $key2, cut($op1, $key1, $val));
return { return {
'exp' => "$col $op1 $key1 and $col $op2 $key2", 'exp' => "$col $op1 $key1 and $col $op2 $key2",
'cnt' => $cnt, 'res' => $res,
}; };
} }
sub mkall ($$$\@) { sub mkall ($$$$) {
my($col, $key1, $key2, $val) = @_; my($col, $key1, $key2, $val) = @_;
my @a = (); my @a = ();
my $p = mkdummy(@$val); my $p = mkdummy($val);
push(@a, $p) if $opt_all; push(@a, $p) if $opt_all;
my @ops = qw(< <= = >= >); my @ops = qw(< <= = >= >);
for my $op (@ops) { for my $op (@ops) {
my $p = mkone($col, $op, $key1, @$val); my $p = mkone($col, $op, $key1, $val);
push(@a, $p) if $opt_all || $p->{cnt} != 0; push(@a, $p) if $opt_all || @{$p->{res}} != 0;
} }
my @ops1 = $opt_all ? @ops : qw(= >= >); my @ops1 = $opt_all ? @ops : qw(= >= >);
my @ops2 = $opt_all ? @ops : qw(<= <); my @ops2 = $opt_all ? @ops : qw(<= <);
for my $op1 (@ops1) { for my $op1 (@ops1) {
for my $op2 (@ops2) { for my $op2 (@ops2) {
my $p = mktwo($col, $op1, $key1, $op2, $key2, @$val); my $p = mktwo($col, $op1, $key1, $op2, $key2, $val);
push(@a, $p) if $opt_all || $p->{cnt} != 0; push(@a, $p) if $opt_all || @{$p->{res}} != 0;
} }
} }
warn scalar(@a)." cases\n" if $opt_verbose;
return \@a; return \@a;
} }
my $casecnt = 0;
sub verify ($$$) {
my($sql, $ord, $res) = @_;
warn "$sql\n" if $opt_verbose;
$sth = $dbh->prepare($sql) or die "prepare: $sql: $DBI::errstr";
$sth->execute() or die "execute: $sql: $DBI::errstr";
#
# BUG: execute can return success on error so check again
#
$sth->err and die "execute: $sql: $DBI::errstr";
my @out = ();
for my $b (@{$res->[0]}) {
for my $c (@{$res->[1]}) {
for my $d (@{$res->[2]}) {
push(@out, [$b, $c, $d]);
}
}
}
if ($ord) {
@out = sort {
$ord * ($a->[0] - $b->[0]) ||
$ord * ($a->[1] - $b->[1]) ||
$ord * ($a->[2] - $b->[2]) ||
0
} @out;
}
my $cnt = scalar @out;
my $n = 0;
while (1) {
my $row = $sth->fetchrow_arrayref;
$row || last;
@$row == 3 or die "bad row: $sql: @$row";
for my $v (@$row) {
$v =~ s/^\s+|\s+$//g;
$v =~ /^\d+$/ or die "bad value: $sql: $v";
}
if ($ord) {
my $out = $out[$n];
$row->[0] == $out->[0] &&
$row->[1] == $out->[1] &&
$row->[2] == $out->[2] or
die "$sql: row $n: got row @$row != @$out";
}
$n++;
}
$sth->err and die "fetch: $sql: $DBI::errstr";
$n == $cnt or die "verify: $sql: got row count $n != $cnt";
$casecnt++;
}
for my $nn ("bcd", "") { for my $nn ("bcd", "") {
my %nn; my %nn;
for my $x (qw(b c d)) { for my $x (qw(b c d)) {
$nn{$x} = $nn =~ /$x/ ? "not null" : "null"; $nn{$x} = $nn =~ /$x/ ? "not null" : "null";
} }
print <<EOF; warn "create table\n";
$sql = <<EOF;
create table $table ( create table $table (
a int primary key, a int primary key,
b int $nn{b}, b int $nn{b},
c int $nn{c}, c int $nn{c},
d int $nn{d}, d int $nn{d},
index (b, c, d) index (b, c, d)
) engine=ndb; ) engine=ndb
EOF EOF
$dbh->do($sql) or die $DBI::errstr;
warn "insert\n";
$sql = "insert into $table values(?, ?, ?, ?)";
$sth = $dbh->prepare($sql) or die $DBI::errstr;
my @val = (0..($opt_cnt-1)); my @val = (0..($opt_cnt-1));
my $v0 = 0; my $v0 = 0;
for my $v1 (@val) { for my $v1 (@val) {
for my $v2 (@val) { for my $v2 (@val) {
for my $v3 (@val) { for my $v3 (@val) {
print "insert into $table values($v0, $v1, $v2, $v3);\n"; $sth->bind_param(1, $v0) or die $DBI::errstr;
$sth->bind_param(2, $v1) or die $DBI::errstr;
$sth->bind_param(3, $v2) or die $DBI::errstr;
$sth->bind_param(4, $v3) or die $DBI::errstr;
$sth->execute or die $DBI::errstr;
$v0++; $v0++;
} }
} }
} }
warn "generate cases\n";
my $key1 = 1; my $key1 = 1;
my $key2 = 3; my $key2 = 3;
my $a1 = mkall('b', $key1, $key2, @val); my $a1 = mkall('b', $key1, $key2, \@val);
my $a2 = mkall('c', $key1, $key2, @val); my $a2 = mkall('c', $key1, $key2, \@val);
my $a3 = mkall('d', $key1, $key2, @val); my $a3 = mkall('d', $key1, $key2, \@val);
warn "select\n";
for my $ord (0, +1, -1) {
my $orderby =
$ord == 0 ? "" :
$ord == +1 ? " order by b, c, d" :
$ord == -1 ? " order by b desc, c desc, d desc" : die "not here";
for my $p1 (@$a1) { for my $p1 (@$a1) {
my $cnt1 = $p1->{cnt} * @val * @val; my $res = [ $p1->{res}, \@val, \@val ];
print "select count(*) - $cnt1 from $table"; $sql = "select b, c, d from $table" .
print " where $p1->{exp};\n"; " where $p1->{exp}" .
$orderby;
verify($sql, $ord, $res);
for my $p2 (@$a2) { for my $p2 (@$a2) {
my $cnt2 = $p1->{cnt} * $p2->{cnt} * @val; my $res = [ $p1->{res}, $p2->{res}, \@val ];
print "select count(*) - $cnt2 from $table"; $sql = "select b, c, d from $table" .
print " where $p1->{exp} and $p2->{exp};\n"; " where $p1->{exp} and $p2->{exp}" .
$orderby;
verify($sql, $ord, $res);
for my $p3 (@$a3) { for my $p3 (@$a3) {
my $cnt3 = $p1->{cnt} * $p2->{cnt} * $p3->{cnt}; my $res = [ $p1->{res}, $p2->{res}, $p3->{res} ];
print "select count(*) - $cnt3 from $table"; $sql = "select b, c, d from $table" .
print " where $p1->{exp} and $p2->{exp} and $p3->{exp};\n"; " where $p1->{exp} and $p2->{exp} and $p3->{exp}" .
$orderby;
verify($sql, $ord, $res);
} }
} }
} }
print <<EOF; }
drop table $table; warn "drop table\n";
EOF $sql = "drop table $table";
$dbh->do($sql) or die $DBI::errstr;
} }
warn "verified $casecnt cases\n";
warn "done\n";
# vim: set sw=2: # vim: set sw=2:
...@@ -275,6 +275,114 @@ a b c ...@@ -275,6 +275,114 @@ a b c
1 1 1 1 1 1
4 4 NULL 4 4 NULL
drop table t1; drop table t1;
create table t1 (
a int unsigned primary key,
b int unsigned,
c char(10),
key bc (b, c)
) engine=ndb;
insert into t1 values(1,1,'a'),(2,2,'b'),(3,3,'c'),(4,4,'d'),(5,5,'e');
insert into t1 select a*7,10*b,'f' from t1;
insert into t1 select a*13,10*b,'g' from t1;
insert into t1 select a*17,10*b,'h' from t1;
insert into t1 select a*19,10*b,'i' from t1;
insert into t1 select a*23,10*b,'j' from t1;
insert into t1 select a*29,10*b,'k' from t1;
select b, c from t1 where b <= 10 and c <'f' order by b, c;
b c
1 a
2 b
3 c
4 d
5 e
select b, c from t1 where b <= 10 and c <'f' order by b desc, c desc;
b c
5 e
4 d
3 c
2 b
1 a
select b, c from t1 where b=4000 and c<'k' order by b, c;
b c
4000 h
4000 i
4000 i
4000 i
4000 j
4000 j
4000 j
4000 j
4000 j
4000 j
select b, c from t1 where b=4000 and c<'k' order by b desc, c desc;
b c
4000 j
4000 j
4000 j
4000 j
4000 j
4000 j
4000 i
4000 i
4000 i
4000 h
select b, c from t1 where 1000<=b and b<=100000 and c<'j' order by b, c;
b c
1000 h
1000 i
1000 i
1000 i
2000 h
2000 i
2000 i
2000 i
3000 h
3000 i
3000 i
3000 i
4000 h
4000 i
4000 i
4000 i
5000 h
5000 i
5000 i
5000 i
10000 i
20000 i
30000 i
40000 i
50000 i
select b, c from t1 where 1000<=b and b<=100000 and c<'j' order by b desc, c desc;
b c
50000 i
40000 i
30000 i
20000 i
10000 i
5000 i
5000 i
5000 i
5000 h
4000 i
4000 i
4000 i
4000 h
3000 i
3000 i
3000 i
3000 h
2000 i
2000 i
2000 i
2000 h
1000 i
1000 i
1000 i
1000 h
select min(b), max(b) from t1;
min(b) max(b)
1 5000000
CREATE TABLE test1 ( CREATE TABLE test1 (
SubscrID int(11) NOT NULL auto_increment, SubscrID int(11) NOT NULL auto_increment,
UsrID int(11) NOT NULL default '0', UsrID int(11) NOT NULL default '0',
......
...@@ -147,6 +147,35 @@ select * from t1 use index (bc) where b < 4 order by a; ...@@ -147,6 +147,35 @@ select * from t1 use index (bc) where b < 4 order by a;
select * from t1 use index (bc) where b IS NOT NULL order by a; select * from t1 use index (bc) where b IS NOT NULL order by a;
drop table t1; drop table t1;
#
# Order by again, including descending.
#
create table t1 (
a int unsigned primary key,
b int unsigned,
c char(10),
key bc (b, c)
) engine=ndb;
insert into t1 values(1,1,'a'),(2,2,'b'),(3,3,'c'),(4,4,'d'),(5,5,'e');
insert into t1 select a*7,10*b,'f' from t1;
insert into t1 select a*13,10*b,'g' from t1;
insert into t1 select a*17,10*b,'h' from t1;
insert into t1 select a*19,10*b,'i' from t1;
insert into t1 select a*23,10*b,'j' from t1;
insert into t1 select a*29,10*b,'k' from t1;
#
select b, c from t1 where b <= 10 and c <'f' order by b, c;
select b, c from t1 where b <= 10 and c <'f' order by b desc, c desc;
#
select b, c from t1 where b=4000 and c<'k' order by b, c;
select b, c from t1 where b=4000 and c<'k' order by b desc, c desc;
select b, c from t1 where 1000<=b and b<=100000 and c<'j' order by b, c;
select b, c from t1 where 1000<=b and b<=100000 and c<'j' order by b desc, c desc;
#
select min(b), max(b) from t1;
# #
# Bug #6435 # Bug #6435
CREATE TABLE test1 ( CREATE TABLE test1 (
......
...@@ -771,7 +771,7 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase) ...@@ -771,7 +771,7 @@ int ha_ndbcluster::build_index_list(TABLE *tab, enum ILBP phase)
KEY* key_info= tab->key_info; KEY* key_info= tab->key_info;
const char **key_name= tab->keynames.type_names; const char **key_name= tab->keynames.type_names;
NdbDictionary::Dictionary *dict= m_ndb->getDictionary(); NdbDictionary::Dictionary *dict= m_ndb->getDictionary();
DBUG_ENTER("build_index_list"); DBUG_ENTER("ha_ndbcluster::build_index_list");
// Save information about all known indexes // Save information about all known indexes
for (i= 0; i < tab->keys; i++, key_info++, key_name++) for (i= 0; i < tab->keys; i++, key_info++, key_name++)
...@@ -860,7 +860,7 @@ int ha_ndbcluster::check_index_fields_not_null(uint inx) ...@@ -860,7 +860,7 @@ int ha_ndbcluster::check_index_fields_not_null(uint inx)
KEY* key_info= table->key_info + inx; KEY* key_info= table->key_info + inx;
KEY_PART_INFO* key_part= key_info->key_part; KEY_PART_INFO* key_part= key_info->key_part;
KEY_PART_INFO* end= key_part+key_info->key_parts; KEY_PART_INFO* end= key_part+key_info->key_parts;
DBUG_ENTER("check_index_fields_not_null"); DBUG_ENTER("ha_ndbcluster::check_index_fields_not_null");
for (; key_part != end; key_part++) for (; key_part != end; key_part++)
{ {
...@@ -922,6 +922,7 @@ static const ulong index_type_flags[]= ...@@ -922,6 +922,7 @@ static const ulong index_type_flags[]=
*/ */
// HA_KEYREAD_ONLY | // HA_KEYREAD_ONLY |
HA_READ_NEXT | HA_READ_NEXT |
HA_READ_PREV |
HA_READ_RANGE | HA_READ_RANGE |
HA_READ_ORDER, HA_READ_ORDER,
...@@ -930,11 +931,13 @@ static const ulong index_type_flags[]= ...@@ -930,11 +931,13 @@ static const ulong index_type_flags[]=
/* UNIQUE_ORDERED_INDEX */ /* UNIQUE_ORDERED_INDEX */
HA_READ_NEXT | HA_READ_NEXT |
HA_READ_PREV |
HA_READ_RANGE | HA_READ_RANGE |
HA_READ_ORDER, HA_READ_ORDER,
/* ORDERED_INDEX */ /* ORDERED_INDEX */
HA_READ_NEXT | HA_READ_NEXT |
HA_READ_PREV |
HA_READ_RANGE | HA_READ_RANGE |
HA_READ_ORDER HA_READ_ORDER
}; };
...@@ -958,7 +961,7 @@ inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const ...@@ -958,7 +961,7 @@ inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
inline ulong ha_ndbcluster::index_flags(uint idx_no, uint part, inline ulong ha_ndbcluster::index_flags(uint idx_no, uint part,
bool all_parts) const bool all_parts) const
{ {
DBUG_ENTER("index_flags"); DBUG_ENTER("ha_ndbcluster::index_flags");
DBUG_PRINT("info", ("idx_no: %d", idx_no)); DBUG_PRINT("info", ("idx_no: %d", idx_no));
DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size); DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size);
DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]); DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]);
...@@ -1024,7 +1027,7 @@ ha_ndbcluster::set_index_key(NdbOperation *op, ...@@ -1024,7 +1027,7 @@ ha_ndbcluster::set_index_key(NdbOperation *op,
const KEY *key_info, const KEY *key_info,
const byte * key_ptr) const byte * key_ptr)
{ {
DBUG_ENTER("set_index_key"); DBUG_ENTER("ha_ndbcluster::set_index_key");
uint i; uint i;
KEY_PART_INFO* key_part= key_info->key_part; KEY_PART_INFO* key_part= key_info->key_part;
KEY_PART_INFO* end= key_part+key_info->key_parts; KEY_PART_INFO* end= key_part+key_info->key_parts;
...@@ -1196,7 +1199,7 @@ int ha_ndbcluster::unique_index_read(const byte *key, ...@@ -1196,7 +1199,7 @@ int ha_ndbcluster::unique_index_read(const byte *key,
int res; int res;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbIndexOperation *op; NdbIndexOperation *op;
DBUG_ENTER("unique_index_read"); DBUG_ENTER("ha_ndbcluster::unique_index_read");
DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index)); DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
DBUG_DUMP("key", (char*)key, key_len); DBUG_DUMP("key", (char*)key, key_len);
...@@ -1402,6 +1405,7 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, ...@@ -1402,6 +1405,7 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
case HA_READ_KEY_EXACT: case HA_READ_KEY_EXACT:
p.bound_type= NdbIndexScanOperation::BoundEQ; p.bound_type= NdbIndexScanOperation::BoundEQ;
break; break;
// ascending
case HA_READ_KEY_OR_NEXT: case HA_READ_KEY_OR_NEXT:
p.bound_type= NdbIndexScanOperation::BoundLE; p.bound_type= NdbIndexScanOperation::BoundLE;
break; break;
...@@ -1411,6 +1415,19 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, ...@@ -1411,6 +1415,19 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
else else
p.bound_type= NdbIndexScanOperation::BoundLT; p.bound_type= NdbIndexScanOperation::BoundLT;
break; break;
// descending
case HA_READ_PREFIX_LAST: // weird
p.bound_type= NdbIndexScanOperation::BoundEQ;
break;
case HA_READ_PREFIX_LAST_OR_PREV: // weird
p.bound_type= NdbIndexScanOperation::BoundGE;
break;
case HA_READ_BEFORE_KEY:
if (! p.part_last)
p.bound_type= NdbIndexScanOperation::BoundGE;
else
p.bound_type= NdbIndexScanOperation::BoundGT;
break;
default: default:
break; break;
} }
...@@ -1418,6 +1435,7 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, ...@@ -1418,6 +1435,7 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
if (j == 1) { if (j == 1) {
switch (p.key->flag) switch (p.key->flag)
{ {
// ascending
case HA_READ_BEFORE_KEY: case HA_READ_BEFORE_KEY:
if (! p.part_last) if (! p.part_last)
p.bound_type= NdbIndexScanOperation::BoundGE; p.bound_type= NdbIndexScanOperation::BoundGE;
...@@ -1429,6 +1447,7 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op, ...@@ -1429,6 +1447,7 @@ int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
break; break;
default: default:
break; break;
// descending strangely sets no end key
} }
} }
...@@ -1537,15 +1556,16 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) ...@@ -1537,15 +1556,16 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
int ha_ndbcluster::ordered_index_scan(const key_range *start_key, int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
const key_range *end_key, const key_range *end_key,
bool sorted, byte* buf) bool sorted, bool descending, byte* buf)
{ {
int res; int res;
bool restart; bool restart;
NdbConnection *trans= m_active_trans; NdbConnection *trans= m_active_trans;
NdbIndexScanOperation *op; NdbIndexScanOperation *op;
DBUG_ENTER("ordered_index_scan"); DBUG_ENTER("ha_ndbcluster::ordered_index_scan");
DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted)); DBUG_PRINT("enter", ("index: %u, sorted: %d, descending: %d",
active_index, sorted, descending));
DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname)); DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
// Check that sorted seems to be initialised // Check that sorted seems to be initialised
...@@ -1559,7 +1579,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -1559,7 +1579,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *) if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
m_index[active_index].index, m_index[active_index].index,
(const NDBTAB *) m_table)) || (const NDBTAB *) m_table)) ||
op->readTuples(lm, 0, parallelism, sorted)) op->readTuples(lm, 0, parallelism, sorted, descending))
ERR_RETURN(trans->getNdbError()); ERR_RETURN(trans->getNdbError());
m_active_cursor= op; m_active_cursor= op;
} else { } else {
...@@ -2152,18 +2172,21 @@ void ha_ndbcluster::print_results() ...@@ -2152,18 +2172,21 @@ void ha_ndbcluster::print_results()
// Use DBUG_PRINT since DBUG_FILE cannot be filtered out // Use DBUG_PRINT since DBUG_FILE cannot be filtered out
char buf[2000]; char buf[2000];
Field *field; Field *field;
void* ptr;
const NDBCOL *col; const NDBCOL *col;
NdbValue value; NdbValue value;
NdbBlob *ndb_blob; NdbBlob *ndb_blob;
buf[0] = 0; buf[0] = 0;
if (!(value= m_value[f]).ptr) if (!(value= m_value[f]).ptr)
{ {
my_snprintf(buf, sizeof(buf), "not read"); my_snprintf(buf, sizeof(buf), "not read");
goto print_value; goto print_value;
} }
field= table->field[f]; field= table->field[f];
DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length()); ptr= field->ptr;
DBUG_DUMP("field->ptr", (char*)ptr, field->pack_length());
col= tab->getColumn(f); col= tab->getColumn(f);
if (! (field->flags & BLOB_FLAG)) if (! (field->flags & BLOB_FLAG))
...@@ -2188,96 +2211,97 @@ void ha_ndbcluster::print_results() ...@@ -2188,96 +2211,97 @@ void ha_ndbcluster::print_results()
switch (col->getType()) { switch (col->getType()) {
case NdbDictionary::Column::Tinyint: { case NdbDictionary::Column::Tinyint: {
char value= *field->ptr; Int8 value= *(Int8*)ptr;
my_snprintf(buf, sizeof(buf), "Tinyint %d", value); my_snprintf(buf, sizeof(buf), "Tinyint %d", value);
break; break;
} }
case NdbDictionary::Column::Tinyunsigned: { case NdbDictionary::Column::Tinyunsigned: {
unsigned char value= *field->ptr; Uint8 value= *(Uint8*)ptr;
my_snprintf(buf, sizeof(buf), "Tinyunsigned %u", value); my_snprintf(buf, sizeof(buf), "Tinyunsigned %u", value);
break; break;
} }
case NdbDictionary::Column::Smallint: { case NdbDictionary::Column::Smallint: {
short value= *field->ptr; Int16 value= *(Int16*)ptr;
my_snprintf(buf, sizeof(buf), "Smallint %d", value); my_snprintf(buf, sizeof(buf), "Smallint %d", value);
break; break;
} }
case NdbDictionary::Column::Smallunsigned: { case NdbDictionary::Column::Smallunsigned: {
unsigned short value= *field->ptr; Uint16 value= *(Uint16*)ptr;
my_snprintf(buf, sizeof(buf), "Smallunsigned %u", value); my_snprintf(buf, sizeof(buf), "Smallunsigned %u", value);
break; break;
} }
case NdbDictionary::Column::Mediumint: { case NdbDictionary::Column::Mediumint: {
byte value[3]; byte value[3];
memcpy(value, field->ptr, 3); memcpy(value, ptr, 3);
my_snprintf(buf, sizeof(buf), "Mediumint %d,%d,%d", value[0], value[1], value[2]); my_snprintf(buf, sizeof(buf), "Mediumint %d,%d,%d", value[0], value[1], value[2]);
break; break;
} }
case NdbDictionary::Column::Mediumunsigned: { case NdbDictionary::Column::Mediumunsigned: {
byte value[3]; byte value[3];
memcpy(value, field->ptr, 3); memcpy(value, ptr, 3);
my_snprintf(buf, sizeof(buf), "Mediumunsigned %u,%u,%u", value[0], value[1], value[2]); my_snprintf(buf, sizeof(buf), "Mediumunsigned %u,%u,%u", value[0], value[1], value[2]);
break; break;
} }
case NdbDictionary::Column::Int: { case NdbDictionary::Column::Int: {
Int32 value= *(Int32*)ptr;
my_snprintf(buf, sizeof(buf), "Int %d", value); my_snprintf(buf, sizeof(buf), "Int %d", value);
break; break;
} }
case NdbDictionary::Column::Unsigned: { case NdbDictionary::Column::Unsigned: {
Uint32 value= (Uint32) *field->ptr; Uint32 value= *(Uint32*)ptr;
my_snprintf(buf, sizeof(buf), "Unsigned %u", value); my_snprintf(buf, sizeof(buf), "Unsigned %u", value);
break; break;
} }
case NdbDictionary::Column::Bigint: { case NdbDictionary::Column::Bigint: {
Int64 value= (Int64) *field->ptr; Int64 value= *(Int64*)ptr;
my_snprintf(buf, sizeof(buf), "Bigint %lld", value); my_snprintf(buf, sizeof(buf), "Bigint %d", (int)value);
break; break;
} }
case NdbDictionary::Column::Bigunsigned: { case NdbDictionary::Column::Bigunsigned: {
Uint64 value= (Uint64) *field->ptr; Uint64 value= *(Uint64*)ptr;
my_snprintf(buf, sizeof(buf), "Bigunsigned %llu", value); my_snprintf(buf, sizeof(buf), "Bigunsigned %u", (unsigned)value);
break; break;
} }
case NdbDictionary::Column::Float: { case NdbDictionary::Column::Float: {
float value= (float) *field->ptr; float value= *(float*)ptr;
my_snprintf(buf, sizeof(buf), "Float %f", (double)value); my_snprintf(buf, sizeof(buf), "Float %f", (double)value);
break; break;
} }
case NdbDictionary::Column::Double: { case NdbDictionary::Column::Double: {
double value= (double) *field->ptr; double value= *(double*)ptr;
my_snprintf(buf, sizeof(buf), "Double %f", value); my_snprintf(buf, sizeof(buf), "Double %f", value);
break; break;
} }
case NdbDictionary::Column::Decimal: { case NdbDictionary::Column::Decimal: {
char *value= field->ptr; const char *value= (char*)ptr;
my_snprintf(buf, sizeof(buf), "Decimal '%-*s'", field->pack_length(), value); my_snprintf(buf, sizeof(buf), "Decimal '%-*s'", field->pack_length(), value);
break; break;
} }
case NdbDictionary::Column::Char:{ case NdbDictionary::Column::Char:{
const char *value= (char *) field->ptr; const char *value= (char*)ptr;
my_snprintf(buf, sizeof(buf), "Char '%.*s'", field->pack_length(), value); my_snprintf(buf, sizeof(buf), "Char '%.*s'", field->pack_length(), value);
break; break;
} }
case NdbDictionary::Column::Varchar: case NdbDictionary::Column::Varchar:
case NdbDictionary::Column::Binary: case NdbDictionary::Column::Binary:
case NdbDictionary::Column::Varbinary: { case NdbDictionary::Column::Varbinary: {
const char *value= (char *) field->ptr; const char *value= (char*)ptr;
my_snprintf(buf, sizeof(buf), "Var '%.*s'", field->pack_length(), value); my_snprintf(buf, sizeof(buf), "Var '%.*s'", field->pack_length(), value);
break; break;
} }
case NdbDictionary::Column::Bit: { case NdbDictionary::Column::Bit: {
const char *value= (char *) field->ptr; const char *value= (char*)ptr;
my_snprintf(buf, sizeof(buf), "Bit '%.*s'", field->pack_length(), value); my_snprintf(buf, sizeof(buf), "Bit '%.*s'", field->pack_length(), value);
break; break;
} }
case NdbDictionary::Column::Datetime: { case NdbDictionary::Column::Datetime: {
Uint64 value= (Uint64) *field->ptr; // todo
my_snprintf(buf, sizeof(buf), "Datetime %llu", value); my_snprintf(buf, sizeof(buf), "Datetime ?");
break; break;
} }
case NdbDictionary::Column::Timespec: { case NdbDictionary::Column::Timespec: {
Uint64 value= (Uint64) *field->ptr; // todo
my_snprintf(buf, sizeof(buf), "Timespec %llu", value); my_snprintf(buf, sizeof(buf), "Timespec ?");
break; break;
} }
case NdbDictionary::Column::Blob: { case NdbDictionary::Column::Blob: {
...@@ -2307,7 +2331,7 @@ print_value: ...@@ -2307,7 +2331,7 @@ print_value:
int ha_ndbcluster::index_init(uint index) int ha_ndbcluster::index_init(uint index)
{ {
DBUG_ENTER("index_init"); DBUG_ENTER("ha_ndbcluster::index_init");
DBUG_PRINT("enter", ("index: %u", index)); DBUG_PRINT("enter", ("index: %u", index));
DBUG_RETURN(handler::index_init(index)); DBUG_RETURN(handler::index_init(index));
} }
...@@ -2315,7 +2339,7 @@ int ha_ndbcluster::index_init(uint index) ...@@ -2315,7 +2339,7 @@ int ha_ndbcluster::index_init(uint index)
int ha_ndbcluster::index_end() int ha_ndbcluster::index_end()
{ {
DBUG_ENTER("index_end"); DBUG_ENTER("ha_ndbcluster::index_end");
DBUG_RETURN(close_scan()); DBUG_RETURN(close_scan());
} }
...@@ -2346,7 +2370,7 @@ int ha_ndbcluster::index_read(byte *buf, ...@@ -2346,7 +2370,7 @@ int ha_ndbcluster::index_read(byte *buf,
const byte *key, uint key_len, const byte *key, uint key_len,
enum ha_rkey_function find_flag) enum ha_rkey_function find_flag)
{ {
DBUG_ENTER("index_read"); DBUG_ENTER("ha_ndbcluster::index_read");
DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d", DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d",
active_index, key_len, find_flag)); active_index, key_len, find_flag));
...@@ -2394,7 +2418,18 @@ int ha_ndbcluster::index_read(byte *buf, ...@@ -2394,7 +2418,18 @@ int ha_ndbcluster::index_read(byte *buf,
start_key.key= key; start_key.key= key;
start_key.length= key_len; start_key.length= key_len;
start_key.flag= find_flag; start_key.flag= find_flag;
error= ordered_index_scan(&start_key, 0, TRUE, buf); bool descending= FALSE;
switch (find_flag) {
case HA_READ_KEY_OR_PREV:
case HA_READ_BEFORE_KEY:
case HA_READ_PREFIX_LAST:
case HA_READ_PREFIX_LAST_OR_PREV:
descending= TRUE;
break;
default:
break;
}
error= ordered_index_scan(&start_key, 0, TRUE, descending, buf);
DBUG_RETURN(error == HA_ERR_END_OF_FILE ? HA_ERR_KEY_NOT_FOUND : error); DBUG_RETURN(error == HA_ERR_END_OF_FILE ? HA_ERR_KEY_NOT_FOUND : error);
} }
...@@ -2404,7 +2439,7 @@ int ha_ndbcluster::index_read_idx(byte *buf, uint index_no, ...@@ -2404,7 +2439,7 @@ int ha_ndbcluster::index_read_idx(byte *buf, uint index_no,
enum ha_rkey_function find_flag) enum ha_rkey_function find_flag)
{ {
statistic_increment(current_thd->status_var.ha_read_key_count, &LOCK_status); statistic_increment(current_thd->status_var.ha_read_key_count, &LOCK_status);
DBUG_ENTER("index_read_idx"); DBUG_ENTER("ha_ndbcluster::index_read_idx");
DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len)); DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len));
index_init(index_no); index_init(index_no);
DBUG_RETURN(index_read(buf, key, key_len, find_flag)); DBUG_RETURN(index_read(buf, key, key_len, find_flag));
...@@ -2413,9 +2448,7 @@ int ha_ndbcluster::index_read_idx(byte *buf, uint index_no, ...@@ -2413,9 +2448,7 @@ int ha_ndbcluster::index_read_idx(byte *buf, uint index_no,
int ha_ndbcluster::index_next(byte *buf) int ha_ndbcluster::index_next(byte *buf)
{ {
DBUG_ENTER("index_next"); DBUG_ENTER("ha_ndbcluster::index_next");
int error= 1;
statistic_increment(current_thd->status_var.ha_read_next_count, statistic_increment(current_thd->status_var.ha_read_next_count,
&LOCK_status); &LOCK_status);
DBUG_RETURN(next_result(buf)); DBUG_RETURN(next_result(buf));
...@@ -2424,42 +2457,37 @@ int ha_ndbcluster::index_next(byte *buf) ...@@ -2424,42 +2457,37 @@ int ha_ndbcluster::index_next(byte *buf)
int ha_ndbcluster::index_prev(byte *buf) int ha_ndbcluster::index_prev(byte *buf)
{ {
DBUG_ENTER("index_prev"); DBUG_ENTER("ha_ndbcluster::index_prev");
statistic_increment(current_thd->status_var.ha_read_prev_count, statistic_increment(current_thd->status_var.ha_read_prev_count,
&LOCK_status); &LOCK_status);
DBUG_RETURN(1); DBUG_RETURN(next_result(buf));
} }
int ha_ndbcluster::index_first(byte *buf) int ha_ndbcluster::index_first(byte *buf)
{ {
DBUG_ENTER("index_first"); DBUG_ENTER("ha_ndbcluster::index_first");
statistic_increment(current_thd->status_var.ha_read_first_count, statistic_increment(current_thd->status_var.ha_read_first_count,
&LOCK_status); &LOCK_status);
// Start the ordered index scan and fetch the first row // Start the ordered index scan and fetch the first row
// Only HA_READ_ORDER indexes get called by index_first // Only HA_READ_ORDER indexes get called by index_first
DBUG_RETURN(ordered_index_scan(0, 0, TRUE, buf)); DBUG_RETURN(ordered_index_scan(0, 0, TRUE, FALSE, buf));
} }
int ha_ndbcluster::index_last(byte *buf) int ha_ndbcluster::index_last(byte *buf)
{ {
DBUG_ENTER("index_last"); DBUG_ENTER("ha_ndbcluster::index_last");
statistic_increment(current_thd->status_var.ha_read_last_count,&LOCK_status); statistic_increment(current_thd->status_var.ha_read_last_count,&LOCK_status);
int res; DBUG_RETURN(ordered_index_scan(0, 0, TRUE, TRUE, buf));
if((res= ordered_index_scan(0, 0, TRUE, buf)) == 0){
NdbScanOperation *cursor= m_active_cursor;
while((res= cursor->nextResult(TRUE, m_force_send)) == 0);
if(res == 1){
unpack_record(buf);
table->status= 0;
DBUG_RETURN(0);
}
}
DBUG_RETURN(res);
} }
int ha_ndbcluster::index_read_last(byte * buf, const byte * key, uint key_len)
{
DBUG_ENTER("ha_ndbcluster::index_read_last");
DBUG_RETURN(index_read(buf, key, key_len, HA_READ_PREFIX_LAST));
}
inline inline
int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key, int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
...@@ -2504,7 +2532,7 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key, ...@@ -2504,7 +2532,7 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
} }
// Start the ordered index scan and fetch the first row // Start the ordered index scan and fetch the first row
error= ordered_index_scan(start_key, end_key, sorted, buf); error= ordered_index_scan(start_key, end_key, sorted, FALSE, buf);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
...@@ -3610,7 +3638,7 @@ int ha_ndbcluster::create(const char *name, ...@@ -3610,7 +3638,7 @@ int ha_ndbcluster::create(const char *name,
int ha_ndbcluster::create_ordered_index(const char *name, int ha_ndbcluster::create_ordered_index(const char *name,
KEY *key_info) KEY *key_info)
{ {
DBUG_ENTER("create_ordered_index"); DBUG_ENTER("ha_ndbcluster::create_ordered_index");
DBUG_RETURN(create_index(name, key_info, FALSE)); DBUG_RETURN(create_index(name, key_info, FALSE));
} }
...@@ -3618,7 +3646,7 @@ int ha_ndbcluster::create_unique_index(const char *name, ...@@ -3618,7 +3646,7 @@ int ha_ndbcluster::create_unique_index(const char *name,
KEY *key_info) KEY *key_info)
{ {
DBUG_ENTER("create_unique_index"); DBUG_ENTER("ha_ndbcluster::create_unique_index");
DBUG_RETURN(create_index(name, key_info, TRUE)); DBUG_RETURN(create_index(name, key_info, TRUE));
} }
...@@ -3635,7 +3663,7 @@ int ha_ndbcluster::create_index(const char *name, ...@@ -3635,7 +3663,7 @@ int ha_ndbcluster::create_index(const char *name,
KEY_PART_INFO *key_part= key_info->key_part; KEY_PART_INFO *key_part= key_info->key_part;
KEY_PART_INFO *end= key_part + key_info->key_parts; KEY_PART_INFO *end= key_part + key_info->key_parts;
DBUG_ENTER("create_index"); DBUG_ENTER("ha_ndbcluster::create_index");
DBUG_PRINT("enter", ("name: %s ", name)); DBUG_PRINT("enter", ("name: %s ", name));
NdbDictionary::Index ndb_index(name); NdbDictionary::Index ndb_index(name);
......
...@@ -96,6 +96,7 @@ class ha_ndbcluster: public handler ...@@ -96,6 +96,7 @@ class ha_ndbcluster: public handler
int index_prev(byte *buf); int index_prev(byte *buf);
int index_first(byte *buf); int index_first(byte *buf);
int index_last(byte *buf); int index_last(byte *buf);
int index_read_last(byte * buf, const byte * key, uint key_len);
int rnd_init(bool scan); int rnd_init(bool scan);
int rnd_end(); int rnd_end();
int rnd_next(byte *buf); int rnd_next(byte *buf);
...@@ -176,7 +177,7 @@ class ha_ndbcluster: public handler ...@@ -176,7 +177,7 @@ class ha_ndbcluster: public handler
byte *buf); byte *buf);
int ordered_index_scan(const key_range *start_key, int ordered_index_scan(const key_range *start_key,
const key_range *end_key, const key_range *end_key,
bool sorted, byte* buf); bool sorted, bool descending, byte* buf);
int full_table_scan(byte * buf); int full_table_scan(byte * buf);
int fetch_next(NdbScanOperation* op); int fetch_next(NdbScanOperation* op);
int next_result(byte *buf); int next_result(byte *buf);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment