Commit 56fa7bfd authored by pekka@mysql.com's avatar pekka@mysql.com

Merge

parents 0d56d7ff f14d9c67
drop table if exists t1;
set autocommit=0;
create table t1 (
a int not null primary key,
b text not null,
c int not null,
d longblob,
key (c)
) engine=ndbcluster;
set @x0 = '01234567012345670123456701234567';
set @x0 = concat(@x0,@x0,@x0,@x0,@x0,@x0,@x0,@x0);
set @b1 = 'b1';
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@x0);
set @d1 = 'dd1';
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @b2 = 'b2';
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @d2 = 'dd2';
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
select length(@x0),length(@b1),length(@d1) from dual;
length(@x0) length(@b1) length(@d1)
256 2256 3000
select length(@x0),length(@b2),length(@d2) from dual;
length(@x0) length(@b2) length(@d2)
256 20000 30000
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where a = 1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 const PRIMARY PRIMARY 4 const 1
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=1;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=2;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
2 20000 b2 30000 dd2
update t1 set b=@b2,d=@d2 where a=1;
update t1 set b=@b1,d=@d1 where a=2;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=1;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
1 20000 b2 30000 dd2
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=2;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
2 2256 b1 3000 dd1
update t1 set b=concat(b,b),d=concat(d,d) where a=1;
update t1 set b=concat(b,b),d=concat(d,d) where a=2;
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where a=1;
a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
1 40000 b2 60000 dd2
select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3)
from t1 where a=2;
a length(b) substr(b,1+4*900,2) length(d) substr(d,1+6*900,3)
2 4512 b1 6000 dd1
update t1 set d=null where a=1;
commit;
select a from t1 where d is null;
a
1
delete from t1 where a=1;
delete from t1 where a=2;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where c = 111;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ref c c 4 const 10 Using where
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c=111;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where c=222;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
2 20000 b2 30000 dd2
update t1 set b=@b2,d=@d2 where c=111;
update t1 set b=@b1,d=@d1 where c=222;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where c=111;
a length(b) substr(b,1+2*9000,2) length(d) substr(d,1+3*9000,3)
1 20000 b2 30000 dd2
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c=222;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
2 2256 b1 3000 dd1
update t1 set d=null where c=111;
commit;
select a from t1 where d is null;
a
1
delete from t1 where c=111;
delete from t1 where c=222;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ALL NULL NULL NULL NULL 100
select * from t1 order by a;
a b c d
1 b1 111 dd1
2 b2 222 dd2
3 b3 333 dd3
4 b4 444 dd4
5 b5 555 dd5
6 b6 666 dd6
7 b7 777 dd7
8 b8 888 dd8
9 b9 999 dd9
update t1 set b=concat(a,'x',b),d=concat(a,'x',d);
commit;
select * from t1 order by a;
a b c d
1 1xb1 111 1xdd1
2 2xb2 222 2xdd2
3 3xb3 333 3xdd3
4 4xb4 444 4xdd4
5 5xb5 555 5xdd5
6 6xb6 666 6xdd6
7 7xb7 777 7xdd7
8 8xb8 888 8xdd8
9 9xb9 999 9xdd9
delete from t1;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ALL NULL NULL NULL NULL 100
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
2 20000 b2 30000 dd2
update t1 set b=concat(b,b),d=concat(d,d);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 order by a;
a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
1 4512 6000
2 40000 b2 60000 dd2
delete from t1;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1 where c >= 100 order by a;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 range c c 4 NULL 10 Using where; Using filesort
select * from t1 where c >= 100 order by a;
a b c d
1 b1 111 dd1
2 b2 222 dd2
3 b3 333 dd3
4 b4 444 dd4
5 b5 555 dd5
6 b6 666 dd6
7 b7 777 dd7
8 b8 888 dd8
9 b9 999 dd9
update t1 set b=concat(a,'x',b),d=concat(a,'x',d)
where c >= 100;
commit;
select * from t1 where c >= 100 order by a;
a b c d
1 1xb1 111 1xdd1
2 2xb2 222 2xdd2
3 3xb3 333 3xdd3
4 4xb4 444 4xdd4
5 5xb5 555 5xdd5
6 6xb6 666 6xdd6
7 7xb7 777 7xdd7
8 8xb8 888 8xdd8
9 9xb9 999 9xdd9
delete from t1 where c >= 100;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where c >= 100 order by a;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 range c c 4 NULL 10 Using where; Using filesort
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c >= 100 order by a;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
2 20000 b2 30000 dd2
update t1 set b=concat(b,b),d=concat(d,d);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where c >= 100 order by a;
a length(b) substr(b,1+4*9000,2) length(d) substr(d,1+6*9000,3)
1 4512 6000
2 40000 b2 60000 dd2
delete from t1 where c >= 100;
commit;
select count(*) from t1;
count(*)
0
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 0;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 1;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 2;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
2 20000 b2 30000 dd2
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
a length(b) substr(b,1+2*900,2) length(d) substr(d,1+3*900,3)
1 2256 b1 3000 dd1
2 20000 b2 30000 dd2
rollback;
select count(*) from t1;
count(*)
0
--source include/have_ndb.inc
--disable_warnings
drop table if exists t1;
--enable_warnings
#
# Minimal NDB blobs test.
#
# On NDB API level there is an extensive test program "testBlobs".
# A prerequisite for this handler test is that "testBlobs" succeeds.
#
# make test harder with autocommit off
set autocommit=0;
create table t1 (
a int not null primary key,
b text not null,
c int not null,
d longblob,
key (c)
) engine=ndbcluster;
# -- values --
# x0 size 256 (current inline size)
set @x0 = '01234567012345670123456701234567';
set @x0 = concat(@x0,@x0,@x0,@x0,@x0,@x0,@x0,@x0);
# b1 length 2000+256 (blob part aligned)
set @b1 = 'b1';
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1,@b1);
set @b1 = concat(@b1,@x0);
# d1 length 3000
set @d1 = 'dd1';
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
set @d1 = concat(@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1,@d1);
# b2 length 20000
set @b2 = 'b2';
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
set @b2 = concat(@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2,@b2);
# d2 length 30000
set @d2 = 'dd2';
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
set @d2 = concat(@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2,@d2);
select length(@x0),length(@b1),length(@d1) from dual;
select length(@x0),length(@b2),length(@d2) from dual;
# -- pk ops --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where a = 1;
# pk read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=1;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=2;
# pk update
update t1 set b=@b2,d=@d2 where a=1;
update t1 set b=@b1,d=@d1 where a=2;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where a=1;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a=2;
# pk update
update t1 set b=concat(b,b),d=concat(d,d) where a=1;
update t1 set b=concat(b,b),d=concat(d,d) where a=2;
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where a=1;
select a,length(b),substr(b,1+4*900,2),length(d),substr(d,1+6*900,3)
from t1 where a=2;
# pk update to null
update t1 set d=null where a=1;
commit;
select a from t1 where d is null;
# pk delete
delete from t1 where a=1;
delete from t1 where a=2;
commit;
select count(*) from t1;
# -- hash index ops --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where c = 111;
# hash key read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c=111;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where c=222;
# hash key update
update t1 set b=@b2,d=@d2 where c=111;
update t1 set b=@b1,d=@d1 where c=222;
commit;
select a,length(b),substr(b,1+2*9000,2),length(d),substr(d,1+3*9000,3)
from t1 where c=111;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c=222;
# hash key update to null
update t1 set d=null where c=111;
commit;
select a from t1 where d is null;
# hash key delete
delete from t1 where c=111;
delete from t1 where c=222;
commit;
select count(*) from t1;
# -- table scan ops, short values --
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1;
# table scan read
select * from t1 order by a;
# table scan update
update t1 set b=concat(a,'x',b),d=concat(a,'x',d);
commit;
select * from t1 order by a;
# table scan delete
delete from t1;
commit;
select count(*) from t1;
# -- table scan ops, long values --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1;
# table scan read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
# table scan update
update t1 set b=concat(b,b),d=concat(d,d);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 order by a;
# table scan delete
delete from t1;
commit;
select count(*) from t1;
# -- range scan ops, short values --
insert into t1 values(1,'b1',111,'dd1');
insert into t1 values(2,'b2',222,'dd2');
insert into t1 values(3,'b3',333,'dd3');
insert into t1 values(4,'b4',444,'dd4');
insert into t1 values(5,'b5',555,'dd5');
insert into t1 values(6,'b6',666,'dd6');
insert into t1 values(7,'b7',777,'dd7');
insert into t1 values(8,'b8',888,'dd8');
insert into t1 values(9,'b9',999,'dd9');
commit;
explain select * from t1 where c >= 100 order by a;
# range scan read
select * from t1 where c >= 100 order by a;
# range scan update
update t1 set b=concat(a,'x',b),d=concat(a,'x',d)
where c >= 100;
commit;
select * from t1 where c >= 100 order by a;
# range scan delete
delete from t1 where c >= 100;
commit;
select count(*) from t1;
# -- range scan ops, long values --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
commit;
explain select * from t1 where c >= 100 order by a;
# range scan read
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where c >= 100 order by a;
# range scan update
update t1 set b=concat(b,b),d=concat(d,d);
commit;
select a,length(b),substr(b,1+4*9000,2),length(d),substr(d,1+6*9000,3)
from t1 where c >= 100 order by a;
# range scan delete
delete from t1 where c >= 100;
commit;
select count(*) from t1;
# -- rollback --
insert into t1 values(1,@b1,111,@d1);
insert into t1 values(2,@b2,222,@d2);
# 626
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 0;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 1;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 where a = 2;
select a,length(b),substr(b,1+2*900,2),length(d),substr(d,1+3*900,3)
from t1 order by a;
rollback;
select count(*) from t1;
--drop table t1;
......@@ -311,7 +311,7 @@ public:
ExtDatetime = NdbSqlUtil::Type::Datetime,
ExtTimespec = NdbSqlUtil::Type::Timespec,
ExtBlob = NdbSqlUtil::Type::Blob,
ExtClob = NdbSqlUtil::Type::Clob
ExtText = NdbSqlUtil::Type::Text
};
// Attribute data interpretation
......@@ -435,7 +435,7 @@ public:
AttributeArraySize = 12 * AttributeExtLength;
return true;
case DictTabInfo::ExtBlob:
case DictTabInfo::ExtClob:
case DictTabInfo::ExtText:
AttributeType = DictTabInfo::StringType;
AttributeSize = DictTabInfo::an8Bit;
// head + inline part [ attr precision ]
......
......@@ -50,24 +50,33 @@ class NdbColumnImpl;
* - closed: after transaction commit
* - invalid: after rollback or transaction close
*
* NdbBlob supports 2 styles of data access:
* NdbBlob supports 3 styles of data access:
*
* - in prepare phase, NdbBlob methods getValue and setValue are used to
* prepare a read or write of a single blob value of known size
* prepare a read or write of a blob value of known size
*
* - in active phase, NdbBlob methods readData and writeData are used to
* read or write blob data of undetermined size
* - in prepare phase, setActiveHook is used to define a routine which
* is invoked as soon as the handle becomes active
*
* - in active phase, readData and writeData are used to read or write
* blob data of arbitrary size
*
* The styles can be applied in combination (in above order).
*
* Blob operations take effect at next transaction execute. In some
* cases NdbBlob is forced to do implicit executes. To avoid this,
* operate on complete blob parts.
*
* Use NdbConnection::executePendingBlobOps to flush your reads and
* writes. It avoids execute penalty if nothing is pending. It is not
* needed after execute (obviously) or after next scan result.
*
* NdbBlob methods return -1 on error and 0 on success, and use output
* parameters when necessary.
*
* Notes:
* - table and its blob part tables are not created atomically
* - blob data operations take effect at next transaction execute
* - NdbBlob may need to do implicit executes on the transaction
* - read and write of complete parts is much more efficient
* - scan must use the "new" interface NdbScanOperation
* - scan with blobs applies hold-read-lock (at minimum)
* - to update a blob in a read op requires exclusive tuple lock
* - update op in scan must do its own getBlobHandle
* - delete creates implicit, not-accessible blob handles
......@@ -78,12 +87,16 @@ class NdbColumnImpl;
* - scan must use exclusive locking for now
*
* Todo:
* - add scan method hold-read-lock-until-next + return-keyinfo
* - better check of keyinfo length when setting keys
* - better check of allowed blob op vs locking mode
* - add scan method hold-read-lock + return-keyinfo
* - check keyinfo length when setting keys
* - check allowed blob ops vs locking mode
* - overload control (too many pending ops)
*/
class NdbBlob {
public:
/**
* State.
*/
enum State {
Idle = 0,
Prepared = 1,
......@@ -92,9 +105,15 @@ public:
Invalid = 9
};
State getState();
/**
* Inline blob header.
*/
struct Head {
Uint64 length;
};
/**
* Prepare to read blob value. The value is available after execute.
* Use isNull to check for NULL and getLength to get the real length
* Use getNull to check for NULL and getLength to get the real length
* and to check for truncation. Sets current read/write position to
* after the data read.
*/
......@@ -106,6 +125,20 @@ public:
* data to null pointer (0) to create a NULL value.
*/
int setValue(const void* data, Uint32 bytes);
/**
* Callback for setActiveHook. Invoked immediately when the prepared
* operation has been executed (but not committed). Any getValue or
* setValue is done first. The blob handle is active so readData or
* writeData etc can be used to manipulate blob value. A user-defined
* argument is passed along. Returns non-zero on error.
*/
typedef int ActiveHook(NdbBlob* me, void* arg);
/**
* Define callback for blob handle activation. The queue of prepared
* operations will be executed in no commit mode up to this point and
* then the callback is invoked.
*/
int setActiveHook(ActiveHook* activeHook, void* arg);
/**
* Check if blob is null.
*/
......@@ -115,7 +148,7 @@ public:
*/
int setNull();
/**
* Get current length in bytes. Use isNull to distinguish between
* Get current length in bytes. Use getNull to distinguish between
* length 0 blob and NULL blob.
*/
int getLength(Uint64& length);
......@@ -180,6 +213,13 @@ public:
static const int ErrAbort = 4268;
// "Unknown blob error"
static const int ErrUnknown = 4269;
/**
* Return info about all blobs in this operation.
*/
// Get first blob in list
NdbBlob* blobsFirstBlob();
// Get next blob in list after this one
NdbBlob* blobsNextBlob();
private:
friend class Ndb;
......@@ -214,10 +254,11 @@ private:
bool theSetFlag;
const char* theSetBuf;
Uint32 theGetSetBytes;
// head
struct Head {
Uint64 length;
};
// pending ops
Uint8 thePendingBlobOps;
// activation callback
ActiveHook* theActiveHook;
void* theActiveHookArg;
// buffers
struct Buf {
char* data;
......@@ -235,7 +276,6 @@ private:
char* theInlineData;
NdbRecAttr* theHeadInlineRecAttr;
bool theHeadInlineUpdateFlag;
bool theNewPartFlag;
// length and read/write position
int theNullFlag;
Uint64 theLength;
......@@ -276,6 +316,11 @@ private:
int insertParts(const char* buf, Uint32 part, Uint32 count);
int updateParts(const char* buf, Uint32 part, Uint32 count);
int deleteParts(Uint32 part, Uint32 count);
// pending ops
int executePendingBlobReads();
int executePendingBlobWrites();
// callbacks
int invokeActiveHook();
// blob handle maintenance
int atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn);
int preExecute(ExecType anExecType, bool& batch);
......@@ -287,6 +332,7 @@ private:
void setErrorCode(NdbOperation* anOp, bool invalidFlag = true);
void setErrorCode(NdbConnection* aCon, bool invalidFlag = true);
#ifdef VM_TRACE
int getOperationType() const;
friend class NdbOut& operator<<(NdbOut&, const NdbBlob&);
#endif
};
......
......@@ -431,6 +431,15 @@ public:
/** @} *********************************************************************/
/**
* Execute the transaction in NoCommit mode if there are any not-yet
* executed blob part operations of given types. Otherwise do
* nothing. The flags argument is bitwise OR of (1 << optype) where
* optype comes from NdbOperation::OperationType. Only the basic PK
* ops are used (read, insert, update, delete).
*/
int executePendingBlobOps(Uint8 flags = 0xFF);
private:
/**
* Release completed operations
......@@ -642,6 +651,7 @@ private:
Uint32 theBuddyConPtr;
// optim: any blobs
bool theBlobFlag;
Uint8 thePendingBlobOps;
static void sendTC_COMMIT_ACK(NdbApiSignal *,
Uint32 transId1, Uint32 transId2,
......@@ -869,6 +879,21 @@ NdbConnection::OpSent()
theNoOfOpSent++;
}
/******************************************************************************
void executePendingBlobOps();
******************************************************************************/
#include <stdlib.h>
inline
int
NdbConnection::executePendingBlobOps(Uint8 flags)
{
if (thePendingBlobOps & flags) {
// not executeNoBlobs because there can be new ops with blobs
return execute(NoCommit);
}
return 0;
}
inline
Uint32
NdbConnection::ptr2int(){
......@@ -876,5 +901,3 @@ NdbConnection::ptr2int(){
}
#endif
......@@ -183,7 +183,7 @@ public:
Datetime, ///< Precision down to 1 sec (sizeof(Datetime) == 8 bytes )
Timespec, ///< Precision down to 1 nsec(sizeof(Datetime) == 12 bytes )
Blob, ///< Binary large object (see NdbBlob)
Clob ///< Text blob
Text ///< Text blob
};
/**
......@@ -309,7 +309,8 @@ public:
/**
* For blob, set or get "part size" i.e. number of bytes to store in
* each tuple of the "blob table". Must be less than 64k.
* each tuple of the "blob table". Can be set to zero to omit parts
* and to allow only inline bytes ("tinyblob").
*/
void setPartSize(int size) { setScale(size); }
int getPartSize() const { return getScale(); }
......@@ -1060,6 +1061,6 @@ public:
};
};
class NdbOut& operator <<(class NdbOut& ndbout, const NdbDictionary::Column::Type type);
class NdbOut& operator <<(class NdbOut& out, const NdbDictionary::Column& col);
#endif
......@@ -80,7 +80,7 @@ public:
Datetime, // Precision down to 1 sec (size 8 bytes)
Timespec, // Precision down to 1 nsec (size 12 bytes)
Blob, // Blob
Clob // Text blob
Text // Text blob
};
Enum m_typeId;
Cmp* m_cmp; // set to NULL if cmp not implemented
......@@ -125,7 +125,7 @@ private:
static Cmp cmpDatetime;
static Cmp cmpTimespec;
static Cmp cmpBlob;
static Cmp cmpClob;
static Cmp cmpText;
};
inline int
......@@ -344,17 +344,15 @@ NdbSqlUtil::cmp(Uint32 typeId, const Uint32* p1, const Uint32* p2, Uint32 full,
break;
case Type::Blob: // XXX fix
break;
case Type::Clob:
case Type::Text:
{
// skip blob head, the rest is varchar
// skip blob head, the rest is char
const unsigned skip = NDB_BLOB_HEAD_SIZE;
if (size >= skip + 1) {
union { const Uint32* p; const char* v; } u1, u2;
u1.p = p1 + skip;
u2.p = p2 + skip;
// length in first 2 bytes
int k = strncmp(u1.v + 2, u2.v + 2, ((size - skip) << 2) - 2);
return k < 0 ? -1 : k > 0 ? +1 : full == size ? 0 : CmpUnknown;
// TODO
}
return CmpUnknown;
}
......
......@@ -161,8 +161,8 @@ NdbSqlUtil::m_typeList[] = {
NULL // cmpDatetime
},
{
Type::Clob,
cmpClob
Type::Text,
cmpText
}
};
......@@ -299,9 +299,9 @@ NdbSqlUtil::cmpBlob(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size
}
int
NdbSqlUtil::cmpClob(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
NdbSqlUtil::cmpText(const Uint32* p1, const Uint32* p2, Uint32 full, Uint32 size)
{
return cmp(Type::Clob, p1, p2, full, size);
return cmp(Type::Text, p1, p2, full, size);
}
#ifdef NDB_SQL_UTIL_TEST
......
......@@ -28,10 +28,11 @@
do { \
static const char* p = getenv("NDB_BLOB_DEBUG"); \
if (p == 0 || *p == 0 || *p == '0') break; \
const char* cname = theColumn == NULL ? "BLOB" : theColumn->m_name.c_str(); \
ndbout << cname << " " << __LINE__ << " " << x << " " << *this << endl; \
static char* prefix = "BLOB"; \
const char* cname = theColumn == NULL ? "-" : theColumn->m_name.c_str(); \
ndbout << prefix << " " << hex << (void*)this << " " << cname; \
ndbout << " " << dec << __LINE__ << " " << x << " " << *this << endl; \
} while (0)
#define EXE() assert(theNdbCon->executeNoBlobs(NoCommit) == 0)
#else
#define DBG(x)
#endif
......@@ -49,7 +50,7 @@ ndb_blob_debug(const Uint32* data, unsigned size)
/*
* Reading index table directly (as a table) is faster but there are
* bugs or limitations. Keep the code but make possible to choose.
* bugs or limitations. Keep the code and make possible to choose.
*/
static const bool g_ndb_blob_ok_to_read_index_table = false;
......@@ -116,7 +117,7 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnIm
case NdbDictionary::Column::Blob:
bc.setType(NdbDictionary::Column::Binary);
break;
case NdbDictionary::Column::Clob:
case NdbDictionary::Column::Text:
bc.setType(NdbDictionary::Column::Char);
break;
default:
......@@ -155,11 +156,13 @@ NdbBlob::init()
theSetFlag = false;
theSetBuf = NULL;
theGetSetBytes = 0;
thePendingBlobOps = 0;
theActiveHook = NULL;
theActiveHookArg = NULL;
theHead = NULL;
theInlineData = NULL;
theHeadInlineRecAttr = NULL;
theHeadInlineUpdateFlag = false;
theNewPartFlag = false;
theNullFlag = -1;
theLength = 0;
thePos = 0;
......@@ -270,7 +273,7 @@ NdbBlob::isScanOp()
inline Uint32
NdbBlob::getPartNumber(Uint64 pos)
{
assert(pos >= theInlineSize);
assert(thePartSize != 0 && pos >= theInlineSize);
return (pos - theInlineSize) / thePartSize;
}
......@@ -322,10 +325,10 @@ int
NdbBlob::setTableKeyValue(NdbOperation* anOp)
{
const Uint32* data = (const Uint32*)theKeyBuf.data;
DBG("setTableKeyValue key=" << ndb_blob_debug(data, theTable->m_sizeOfKeysInWords));
const unsigned columns = theTable->m_columns.size();
unsigned pos = 0;
const unsigned size = theTable->m_columns.size();
DBG("setTableKeyValue key=" << ndb_blob_debug(data, size));
for (unsigned i = 0; i < size; i++) {
for (unsigned i = 0; i < columns; i++) {
NdbColumnImpl* c = theTable->m_columns[i];
assert(c != NULL);
if (c->m_pk) {
......@@ -345,10 +348,10 @@ int
NdbBlob::setAccessKeyValue(NdbOperation* anOp)
{
const Uint32* data = (const Uint32*)theAccessKeyBuf.data;
DBG("setAccessKeyValue key=" << ndb_blob_debug(data, theAccessTable->m_sizeOfKeysInWords));
const unsigned columns = theAccessTable->m_columns.size();
unsigned pos = 0;
const unsigned size = theAccessTable->m_columns.size();
DBG("setAccessKeyValue key=" << ndb_blob_debug(data, size));
for (unsigned i = 0; i < size; i++) {
for (unsigned i = 0; i < columns; i++) {
NdbColumnImpl* c = theAccessTable->m_columns[i];
assert(c != NULL);
if (c->m_pk) {
......@@ -479,11 +482,27 @@ NdbBlob::setValue(const void* data, Uint32 bytes)
return 0;
}
// activation hook
int
NdbBlob::setActiveHook(ActiveHook activeHook, void* arg)
{
DBG("setActiveHook hook=" << hex << (void*)activeHook << " arg=" << hex << arg);
if (theState != Prepared) {
setErrorCode(ErrState);
return -1;
}
theActiveHook = activeHook;
theActiveHookArg = arg;
return 0;
}
// misc operations
int
NdbBlob::getNull(bool& isNull)
{
DBG("getNull");
if (theState == Prepared && theSetFlag) {
isNull = (theSetBuf == NULL);
return 0;
......@@ -520,6 +539,7 @@ NdbBlob::setNull()
int
NdbBlob::getLength(Uint64& len)
{
DBG("getLength");
if (theState == Prepared && theSetFlag) {
len = theGetSetBytes;
return 0;
......@@ -535,17 +555,17 @@ NdbBlob::getLength(Uint64& len)
int
NdbBlob::truncate(Uint64 length)
{
DBG("truncate kength=" << length);
DBG("truncate length=" << length);
if (theNullFlag == -1) {
setErrorCode(ErrState);
return -1;
}
if (theLength > length) {
if (length >= theInlineSize) {
Uint32 part1 = getPartNumber(length);
if (length > theInlineSize) {
Uint32 part1 = getPartNumber(length - 1);
Uint32 part2 = getPartNumber(theLength - 1);
assert(part2 >= part1);
if (deleteParts(part1, part2 - part1) == -1)
if (part2 > part1 && deleteParts(part1 + 1, part2 - part1) == -1)
return -1;
} else {
if (deleteParts(0, getPartCount()) == -1)
......@@ -560,6 +580,7 @@ NdbBlob::truncate(Uint64 length)
int
NdbBlob::getPos(Uint64& pos)
{
DBG("getPos");
if (theNullFlag == -1) {
setErrorCode(ErrState);
return -1;
......@@ -571,6 +592,7 @@ NdbBlob::getPos(Uint64& pos)
int
NdbBlob::setPos(Uint64 pos)
{
DBG("setPos pos=" << pos);
if (theNullFlag == -1) {
setErrorCode(ErrState);
return -1;
......@@ -629,6 +651,10 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes)
len -= n;
}
}
if (len > 0 && thePartSize == 0) {
setErrorCode(ErrSeek);
return -1;
}
if (len > 0) {
assert(pos >= theInlineSize);
Uint32 off = (pos - theInlineSize) % thePartSize;
......@@ -638,11 +664,10 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes)
Uint32 part = (pos - theInlineSize) / thePartSize;
if (readParts(thePartBuf.data, part, 1) == -1)
return -1;
DBG("force execute");
if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
setErrorCode(theNdbOp);
// need result now
DBG("execute pending part reads");
if (executePendingBlobReads() == -1)
return -1;
}
Uint32 n = thePartSize - off;
if (n > len)
n = len;
......@@ -673,11 +698,10 @@ NdbBlob::readDataPrivate(Uint64 pos, char* buf, Uint32& bytes)
Uint32 part = (pos - theInlineSize) / thePartSize;
if (readParts(thePartBuf.data, part, 1) == -1)
return -1;
DBG("force execute");
if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
setErrorCode(theNdbOp);
// need result now
DBG("execute pending part reads");
if (executePendingBlobReads() == -1)
return -1;
}
memcpy(buf, thePartBuf.data, len);
Uint32 n = len;
pos += n;
......@@ -736,29 +760,27 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes)
len -= n;
}
}
if (len > 0 && thePartSize == 0) {
setErrorCode(ErrSeek);
return -1;
}
if (len > 0) {
assert(pos >= theInlineSize);
Uint32 off = (pos - theInlineSize) % thePartSize;
// partial first block
if (off != 0) {
DBG("partial first block pos=" << pos << " len=" << len);
if (theNewPartFlag) {
// must flush insert to guarantee read
DBG("force execute");
if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
setErrorCode(theNdbOp);
return -1;
}
theNewPartFlag = false;
}
// flush writes to guarantee correct read
DBG("execute pending part writes");
if (executePendingBlobWrites() == -1)
return -1;
Uint32 part = (pos - theInlineSize) / thePartSize;
if (readParts(thePartBuf.data, part, 1) == -1)
return -1;
DBG("force execute");
if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
setErrorCode(theNdbOp);
// need result now
DBG("execute pending part reafs");
if (executePendingBlobReads() == -1)
return -1;
}
Uint32 n = thePartSize - off;
if (n > len) {
memset(thePartBuf.data + off + len, theFillChar, n - len);
......@@ -799,22 +821,16 @@ NdbBlob::writeDataPrivate(Uint64 pos, const char* buf, Uint32 bytes)
assert((pos - theInlineSize) % thePartSize == 0 && len < thePartSize);
Uint32 part = (pos - theInlineSize) / thePartSize;
if (theLength > pos + len) {
if (theNewPartFlag) {
// must flush insert to guarantee read
DBG("force execute");
if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
setErrorCode(theNdbOp);
return -1;
}
theNewPartFlag = false;
}
// flush writes to guarantee correct read
DBG("execute pending part writes");
if (executePendingBlobWrites() == -1)
return -1;
if (readParts(thePartBuf.data, part, 1) == -1)
return -1;
DBG("force execute");
if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
setErrorCode(theNdbOp);
// need result now
DBG("execute pending part reads");
if (executePendingBlobReads() == -1)
return -1;
}
memcpy(thePartBuf.data, buf, len);
if (updateParts(thePartBuf.data, part, 1) == -1)
return -1;
......@@ -859,6 +875,8 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
}
buf += thePartSize;
n++;
thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
}
return 0;
}
......@@ -879,7 +897,8 @@ NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
}
buf += thePartSize;
n++;
theNewPartFlag = true;
thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
}
return 0;
}
......@@ -900,7 +919,8 @@ NdbBlob::updateParts(const char* buf, Uint32 part, Uint32 count)
}
buf += thePartSize;
n++;
theNewPartFlag = true;
thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
}
return 0;
}
......@@ -919,6 +939,52 @@ NdbBlob::deleteParts(Uint32 part, Uint32 count)
return -1;
}
n++;
thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
}
return 0;
}
// pending ops
int
NdbBlob::executePendingBlobReads()
{
Uint8 flags = (1 << NdbOperation::ReadRequest);
if (thePendingBlobOps & flags) {
if (theNdbCon->executeNoBlobs(NoCommit) == -1)
return -1;
thePendingBlobOps = 0;
theNdbCon->thePendingBlobOps = 0;
}
return 0;
}
int
NdbBlob::executePendingBlobWrites()
{
Uint8 flags = 0xFF & ~(1 << NdbOperation::ReadRequest);
if (thePendingBlobOps & flags) {
if (theNdbCon->executeNoBlobs(NoCommit) == -1)
return -1;
thePendingBlobOps = 0;
theNdbCon->thePendingBlobOps = 0;
}
return 0;
}
// callbacks
int
NdbBlob::invokeActiveHook()
{
DBG("invokeActiveHook");
assert(theState == Active && theActiveHook != NULL);
int ret = (*theActiveHook)(this, theActiveHookArg);
DBG("invokeActiveHook ret=" << ret);
if (ret != 0) {
// no error is set on blob level
return -1;
}
return 0;
}
......@@ -948,7 +1014,7 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
partType = NdbDictionary::Column::Binary;
theFillChar = 0x0;
break;
case NdbDictionary::Column::Clob:
case NdbDictionary::Column::Text:
partType = NdbDictionary::Column::Char;
theFillChar = 0x20;
break;
......@@ -960,22 +1026,21 @@ NdbBlob::atPrepare(NdbConnection* aCon, NdbOperation* anOp, const NdbColumnImpl*
theInlineSize = theColumn->getInlineSize();
thePartSize = theColumn->getPartSize();
theStripeSize = theColumn->getStripeSize();
// blob table sanity check
// sanity check
assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
getBlobTableName(theBlobTableName, theTable, theColumn);
const NdbDictionary::Table* bt;
const NdbDictionary::Column* bc;
if (theInlineSize >= (1 << 16) ||
thePartSize == 0 ||
thePartSize >= (1 << 16) ||
theStripeSize == 0 ||
(bt = theNdb->theDictionary->getTable(theBlobTableName)) == NULL ||
(bc = bt->getColumn("DATA")) == NULL ||
bc->getType() != partType ||
bc->getLength() != (int)thePartSize) {
setErrorCode(ErrTable);
return -1;
if (thePartSize > 0) {
if (theStripeSize == 0 ||
(bt = theNdb->theDictionary->getTable(theBlobTableName)) == NULL ||
(bc = bt->getColumn("DATA")) == NULL ||
bc->getType() != partType ||
bc->getLength() != (int)thePartSize) {
setErrorCode(ErrTable);
return -1;
}
}
// buffers
theKeyBuf.alloc(theTable->m_sizeOfKeysInWords << 2);
......@@ -1061,7 +1126,7 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
Uint32 bytes = theGetSetBytes - theInlineSize;
if (writeDataPrivate(pos, buf, bytes) == -1)
return -1;
if (anExecType == Commit && theHeadInlineUpdateFlag) {
if (theHeadInlineUpdateFlag) {
// add an operation to update head+inline
NdbOperation* tOp = theNdbCon->getNdbOperation(theTable);
if (tOp == NULL ||
......@@ -1129,6 +1194,10 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
batch = true;
}
}
if (theActiveHook != NULL) {
// need blob head for callback
batch = true;
}
DBG("preExecute out batch=" << batch);
return 0;
}
......@@ -1145,8 +1214,11 @@ NdbBlob::postExecute(ExecType anExecType)
DBG("postExecute type=" << anExecType);
if (theState == Invalid)
return -1;
if (theState == Active)
if (theState == Active) {
setState(anExecType == NoCommit ? Active : Closed);
DBG("postExecute skip");
return 0;
}
assert(theState == Prepared);
assert(isKeyOp());
if (isIndexOp()) {
......@@ -1200,8 +1272,12 @@ NdbBlob::postExecute(ExecType anExecType)
if (deleteParts(0, getPartCount()) == -1)
return -1;
}
theNewPartFlag = false;
setState(anExecType == NoCommit ? Active : Closed);
// activation callback
if (theActiveHook != NULL) {
if (invokeActiveHook() == -1)
return -1;
}
DBG("postExecute out");
return 0;
}
......@@ -1275,20 +1351,18 @@ NdbBlob::atNextResult()
Uint32 bytes = theGetSetBytes - theInlineSize;
if (readDataPrivate(pos, buf, bytes) == -1)
return -1;
// must also execute them
DBG("force execute");
if (theNdbCon->executeNoBlobs(NoCommit) == -1) {
setErrorCode((NdbOperation*)0);
return -1;
}
}
}
setState(Active);
// activation callback
if (theActiveHook != NULL) {
if (invokeActiveHook() == -1)
return -1;
}
DBG("atNextResult out");
return 0;
}
// misc
const NdbDictionary::Column*
......@@ -1304,6 +1378,9 @@ NdbBlob::setErrorCode(int anErrorCode, bool invalidFlag)
{
DBG("setErrorCode code=" << anErrorCode);
theError.code = anErrorCode;
// conditionally copy error to operation level
if (theNdbOp != NULL && theNdbOp->theError.code == 0)
theNdbOp->setErrorCode(theError.code);
if (invalidFlag)
setState(Invalid);
}
......@@ -1336,11 +1413,34 @@ NdbBlob::setErrorCode(NdbConnection* aCon, bool invalidFlag)
setErrorCode(code, invalidFlag);
}
// info about all blobs in this operation
NdbBlob*
NdbBlob::blobsFirstBlob()
{
return theNdbOp->theBlobList;
}
NdbBlob*
NdbBlob::blobsNextBlob()
{
return theNext;
}
// debug
#ifdef VM_TRACE
inline int
NdbBlob::getOperationType() const
{
return theNdbOp != NULL ? theNdbOp->theOperationType : -1;
}
NdbOut&
operator<<(NdbOut& out, const NdbBlob& blob)
{
ndbout << dec << "s=" << blob.theState;
ndbout << dec << "o=" << blob.getOperationType();
ndbout << dec << " s=" << blob.theState;
ndbout << dec << " n=" << blob.theNullFlag;;
ndbout << dec << " l=" << blob.theLength;
ndbout << dec << " p=" << blob.thePos;
......
......@@ -89,7 +89,8 @@ NdbConnection::NdbConnection( Ndb* aNdb ) :
// Scan operations
theScanningOp(NULL),
theBuddyConPtr(0xFFFFFFFF),
theBlobFlag(false)
theBlobFlag(false),
thePendingBlobOps(0)
{
theListState = NotInList;
theError.code = 0;
......@@ -150,6 +151,7 @@ NdbConnection::init()
theBuddyConPtr = 0xFFFFFFFF;
//
theBlobFlag = false;
thePendingBlobOps = 0;
}//NdbConnection::init()
/*****************************************************************************
......@@ -269,26 +271,34 @@ NdbConnection::execute(ExecType aTypeOfExec,
if (! theBlobFlag)
return executeNoBlobs(aTypeOfExec, abortOption, forceSend);
// execute prepared ops in batches, as requested by blobs
/*
* execute prepared ops in batches, as requested by blobs
* - blob error does not terminate execution
* - blob error sets error on operation
* - if error on operation skip blob calls
*/
ExecType tExecType;
NdbOperation* tPrepOp;
int ret = 0;
do {
tExecType = aTypeOfExec;
tPrepOp = theFirstOpInList;
while (tPrepOp != NULL) {
bool batch = false;
NdbBlob* tBlob = tPrepOp->theBlobList;
while (tBlob != NULL) {
if (tBlob->preExecute(tExecType, batch) == -1)
return -1;
tBlob = tBlob->theNext;
}
if (batch) {
// blob asked to execute all up to here now
tExecType = NoCommit;
break;
if (tPrepOp->theError.code == 0) {
bool batch = false;
NdbBlob* tBlob = tPrepOp->theBlobList;
while (tBlob != NULL) {
if (tBlob->preExecute(tExecType, batch) == -1)
ret = -1;
tBlob = tBlob->theNext;
}
if (batch) {
// blob asked to execute all up to here now
tExecType = NoCommit;
break;
}
}
tPrepOp = tPrepOp->next();
}
......@@ -304,26 +314,30 @@ NdbConnection::execute(ExecType aTypeOfExec,
if (tExecType == Commit) {
NdbOperation* tOp = theCompletedFirstOp;
while (tOp != NULL) {
NdbBlob* tBlob = tOp->theBlobList;
while (tBlob != NULL) {
if (tBlob->preCommit() == -1)
return -1;
tBlob = tBlob->theNext;
if (tOp->theError.code == 0) {
NdbBlob* tBlob = tOp->theBlobList;
while (tBlob != NULL) {
if (tBlob->preCommit() == -1)
ret = -1;
tBlob = tBlob->theNext;
}
}
tOp = tOp->next();
}
}
if (executeNoBlobs(tExecType, abortOption, forceSend) == -1)
return -1;
ret = -1;
{
NdbOperation* tOp = theCompletedFirstOp;
while (tOp != NULL) {
NdbBlob* tBlob = tOp->theBlobList;
while (tBlob != NULL) {
// may add new operations if batch
if (tBlob->postExecute(tExecType) == -1)
return -1;
tBlob = tBlob->theNext;
if (tOp->theError.code == 0) {
NdbBlob* tBlob = tOp->theBlobList;
while (tBlob != NULL) {
// may add new operations if batch
if (tBlob->postExecute(tExecType) == -1)
ret = -1;
tBlob = tBlob->theNext;
}
}
tOp = tOp->next();
}
......@@ -338,7 +352,7 @@ NdbConnection::execute(ExecType aTypeOfExec,
}
} while (theFirstOpInList != NULL || tExecType != aTypeOfExec);
return 0;
return ret;
}
int
......@@ -397,6 +411,7 @@ NdbConnection::executeNoBlobs(ExecType aTypeOfExec,
break;
}
}
thePendingBlobOps = 0;
return 0;
}//NdbConnection::execute()
......
......@@ -806,73 +806,90 @@ NdbDictionary::Dictionary::getNdbError() const {
return m_impl.getNdbError();
}
NdbOut& operator <<(NdbOut& ndbout, const NdbDictionary::Column::Type type)
// printers
NdbOut&
operator<<(NdbOut& out, const NdbDictionary::Column& col)
{
switch(type){
case NdbDictionary::Column::Bigunsigned:
ndbout << "Bigunsigned";
out << col.getName() << " ";
switch (col.getType()) {
case NdbDictionary::Column::Tinyint:
out << "Tinyint";
break;
case NdbDictionary::Column::Unsigned:
ndbout << "Unsigned";
case NdbDictionary::Column::Tinyunsigned:
out << "Tinyunsigned";
break;
case NdbDictionary::Column::Smallint:
out << "Smallint";
break;
case NdbDictionary::Column::Smallunsigned:
ndbout << "Smallunsigned";
out << "Smallunsigned";
break;
case NdbDictionary::Column::Tinyunsigned:
ndbout << "Tinyunsigned";
case NdbDictionary::Column::Mediumint:
out << "Mediumint";
break;
case NdbDictionary::Column::Bigint:
ndbout << "Bigint";
case NdbDictionary::Column::Mediumunsigned:
out << "Mediumunsigned";
break;
case NdbDictionary::Column::Int:
ndbout << "Int";
out << "Int";
break;
case NdbDictionary::Column::Smallint:
ndbout << "Smallint";
break;
case NdbDictionary::Column::Tinyint:
ndbout << "Tinyint";
case NdbDictionary::Column::Unsigned:
out << "Unsigned";
break;
case NdbDictionary::Column::Char:
ndbout << "Char";
case NdbDictionary::Column::Bigint:
out << "Bigint";
break;
case NdbDictionary::Column::Varchar:
ndbout << "Varchar";
case NdbDictionary::Column::Bigunsigned:
out << "Bigunsigned";
break;
case NdbDictionary::Column::Float:
ndbout << "Float";
out << "Float";
break;
case NdbDictionary::Column::Double:
ndbout << "Double";
out << "Double";
break;
case NdbDictionary::Column::Mediumint:
ndbout << "Mediumint";
case NdbDictionary::Column::Decimal:
out << "Decimal(" << col.getScale() << "," << col.getPrecision() << ")";
break;
case NdbDictionary::Column::Mediumunsigned:
ndbout << "Mediumunsigend";
case NdbDictionary::Column::Char:
out << "Char(" << col.getLength() << ")";
break;
case NdbDictionary::Column::Varchar:
out << "Varchar(" << col.getLength() << ")";
break;
case NdbDictionary::Column::Binary:
ndbout << "Binary";
out << "Binary(" << col.getLength() << ")";
break;
case NdbDictionary::Column::Varbinary:
ndbout << "Varbinary";
out << "Varbinary(" << col.getLength() << ")";
break;
case NdbDictionary::Column::Decimal:
ndbout << "Decimal";
case NdbDictionary::Column::Datetime:
out << "Datetime";
break;
case NdbDictionary::Column::Timespec:
ndbout << "Timespec";
out << "Timespec";
break;
case NdbDictionary::Column::Blob:
ndbout << "Blob";
out << "Blob(" << col.getInlineSize() << "," << col.getPartSize()
<< ";" << col.getStripeSize() << ")";
break;
case NdbDictionary::Column::Text:
out << "Text(" << col.getInlineSize() << "," << col.getPartSize()
<< ";" << col.getStripeSize() << ")";
break;
case NdbDictionary::Column::Undefined:
ndbout << "Undefined";
out << "Undefined";
break;
default:
ndbout << "Unknown type=" << (Uint32)type;
out << "Type" << (Uint32)col.getType();
break;
}
return ndbout;
if (col.getPrimaryKey())
out << " PRIMARY KEY";
else if (! col.getNullable())
out << " NOT NULL";
else
out << " NULL";
return out;
}
......@@ -181,7 +181,7 @@ NdbColumnImpl::equal(const NdbColumnImpl& col) const
case NdbDictionary::Column::Timespec:
break;
case NdbDictionary::Column::Blob:
case NdbDictionary::Column::Clob:
case NdbDictionary::Column::Text:
if (m_precision != col.m_precision ||
m_scale != col.m_scale ||
m_length != col.m_length) {
......@@ -1088,7 +1088,7 @@ columnTypeMapping[] = {
{ DictTabInfo::ExtDatetime, NdbDictionary::Column::Datetime },
{ DictTabInfo::ExtTimespec, NdbDictionary::Column::Timespec },
{ DictTabInfo::ExtBlob, NdbDictionary::Column::Blob },
{ DictTabInfo::ExtClob, NdbDictionary::Column::Clob },
{ DictTabInfo::ExtText, NdbDictionary::Column::Text },
{ -1, -1 }
};
......@@ -1253,7 +1253,7 @@ NdbDictionaryImpl::createBlobTables(NdbTableImpl &t)
{
for (unsigned i = 0; i < t.m_columns.size(); i++) {
NdbColumnImpl & c = *t.m_columns[i];
if (! c.getBlobType())
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
NdbTableImpl bt;
NdbBlob::getBlobTable(bt, &t, &c);
......@@ -1622,7 +1622,7 @@ NdbDictionaryImpl::dropBlobTables(NdbTableImpl & t)
{
for (unsigned i = 0; i < t.m_columns.size(); i++) {
NdbColumnImpl & c = *t.m_columns[i];
if (! c.getBlobType())
if (! c.getBlobType() || c.getPartSize() == 0)
continue;
char btname[NdbBlob::BlobTableNameSize];
NdbBlob::getBlobTableName(btname, &t, &c);
......
......@@ -441,7 +441,7 @@ inline
bool
NdbColumnImpl::getBlobType() const {
return (m_type == NdbDictionary::Column::Blob ||
m_type == NdbDictionary::Column::Clob);
m_type == NdbDictionary::Column::Text);
}
inline
......
......@@ -29,6 +29,7 @@ Adjust: 971206 UABRONM First version
#include <ndb_global.h>
#include <NdbOut.hpp>
#include <NdbRecAttr.hpp>
#include <NdbBlob.hpp>
#include "NdbDictionaryImpl.hpp"
#include <NdbTCP.h>
......@@ -147,78 +148,100 @@ NdbRecAttr::receive_data(const Uint32 * data, Uint32 sz){
return false;
}
NdbOut& operator<<(NdbOut& ndbout, const NdbRecAttr &r)
NdbOut& operator<<(NdbOut& out, const NdbRecAttr &r)
{
if (r.isNULL())
{
ndbout << "[NULL]";
return ndbout;
out << "[NULL]";
return out;
}
if (r.arraySize() > 1)
ndbout << "[";
out << "[";
for (Uint32 j = 0; j < r.arraySize(); j++)
{
if (j > 0)
ndbout << " ";
out << " ";
switch(r.getType())
{
case NdbDictionary::Column::Bigunsigned:
ndbout << r.u_64_value();
out << r.u_64_value();
break;
case NdbDictionary::Column::Unsigned:
ndbout << r.u_32_value();
out << r.u_32_value();
break;
case NdbDictionary::Column::Smallunsigned:
ndbout << r.u_short_value();
out << r.u_short_value();
break;
case NdbDictionary::Column::Tinyunsigned:
ndbout << (unsigned) r.u_char_value();
out << (unsigned) r.u_char_value();
break;
case NdbDictionary::Column::Bigint:
ndbout << r.int64_value();
out << r.int64_value();
break;
case NdbDictionary::Column::Int:
ndbout << r.int32_value();
out << r.int32_value();
break;
case NdbDictionary::Column::Smallint:
ndbout << r.short_value();
out << r.short_value();
break;
case NdbDictionary::Column::Tinyint:
ndbout << (int) r.char_value();
out << (int) r.char_value();
break;
case NdbDictionary::Column::Char:
ndbout.print("%.*s", r.arraySize(), r.aRef());
out.print("%.*s", r.arraySize(), r.aRef());
j = r.arraySize();
break;
case NdbDictionary::Column::Varchar:
{
short len = ntohs(r.u_short_value());
ndbout.print("%.*s", len, r.aRef()+2);
out.print("%.*s", len, r.aRef()+2);
}
j = r.arraySize();
break;
case NdbDictionary::Column::Float:
ndbout << r.float_value();
out << r.float_value();
break;
case NdbDictionary::Column::Double:
ndbout << r.double_value();
out << r.double_value();
break;
case NdbDictionary::Column::Blob:
{
const NdbBlob::Head* h = (const NdbBlob::Head*)r.aRef();
out << h->length << ":";
const unsigned char* p = (const unsigned char*)(h + 1);
unsigned n = r.arraySize() - sizeof(*h);
for (unsigned k = 0; k < n && k < h->length; k++)
out.print("%02X", (int)p[k]);
j = r.arraySize();
}
break;
case NdbDictionary::Column::Text:
{
const NdbBlob::Head* h = (const NdbBlob::Head*)r.aRef();
out << h->length << ":";
const unsigned char* p = (const unsigned char*)(h + 1);
unsigned n = r.arraySize() - sizeof(*h);
for (unsigned k = 0; k < n && k < h->length; k++)
out.print("%c", (int)p[k]);
j = r.arraySize();
}
break;
default: /* no print functions for the rest, just print type */
ndbout << r.getType();
out << r.getType();
j = r.arraySize();
if (j > 1)
ndbout << " %u times" << j;
out << " " << j << " times";
break;
}
}
if (r.arraySize() > 1)
{
ndbout << "]";
out << "]";
}
return ndbout;
return out;
}
......@@ -55,6 +55,13 @@ int NdbResultSet::nextResult(bool fetchAllowed)
return -1;
tBlob = tBlob->theNext;
}
/*
* Flush blob part ops on behalf of user because
* - nextResult is analogous to execute(NoCommit)
* - user is likely to want blob value before next execute
*/
if (m_operation->m_transConnection->executePendingBlobOps() == -1)
return -1;
return 0;
}
return res;
......
......@@ -23,7 +23,6 @@
#include <NdbOut.hpp>
class NDBT_Attribute : public NdbDictionary::Column {
friend class NdbOut& operator <<(class NdbOut&, const NDBT_Attribute &);
public:
NDBT_Attribute(const char* _name,
Column::Type _type,
......
......@@ -38,6 +38,7 @@ struct Bcol {
};
struct Opt {
unsigned m_batch;
bool m_core;
bool m_dbg;
bool m_dbgall;
......@@ -46,7 +47,8 @@ struct Opt {
unsigned m_parts;
unsigned m_rows;
unsigned m_seed;
char m_skip[255];
const char* m_skip;
const char* m_style;
// metadata
const char* m_tname;
const char* m_x1name; // hash index
......@@ -60,6 +62,7 @@ struct Opt {
int m_bug;
int (*m_bugtest)();
Opt() :
m_batch(7),
m_core(false),
m_dbg(false),
m_dbgall(false),
......@@ -68,6 +71,8 @@ struct Opt {
m_parts(10),
m_rows(100),
m_seed(0),
m_skip(""),
m_style("012"),
// metadata
m_tname("TBLOB1"),
m_x1name("TBLOB1X1"),
......@@ -80,7 +85,6 @@ struct Opt {
// bugs
m_bug(0),
m_bugtest(0) {
memset(m_skip, false, sizeof(m_skip));
}
};
......@@ -92,6 +96,7 @@ printusage()
Opt d;
ndbout
<< "usage: testBlobs options [default/max]" << endl
<< " -batch N number of pk ops in batch [" << d.m_batch << "]" << endl
<< " -core dump core on error" << endl
<< " -dbg print debug" << endl
<< " -dbgall print also NDB API debug (if compiled in)" << endl
......@@ -101,7 +106,8 @@ printusage()
<< " -parts N max parts in blob value [" << d.m_parts << "]" << endl
<< " -rows N number of rows [" << d.m_rows << "]" << endl
<< " -seed N random seed 0=loop number [" << d.m_seed << "]" << endl
<< " -skip xxx skip these tests (see list)" << endl
<< " -skip xxx skip these tests (see list) [" << d.m_skip << endl
<< " -style xxx access styles to test (see list) [" << d.m_style << "]" << endl
<< "metadata" << endl
<< " -pk2len N length of PK2 [" << d.m_pk2len << "/" << g_max_pk2len <<"]" << endl
<< " -oneblob only 1 blob attribute [default 2]" << endl
......@@ -111,8 +117,10 @@ printusage()
<< " s table scans" << endl
<< " r ordered index scans" << endl
<< " u update blob value" << endl
<< " v getValue / setValue" << endl
<< " w readData / writeData" << endl
<< "access styles for -style" << endl
<< " 0 getValue / setValue" << endl
<< " 1 setActiveHook" << endl
<< " 2 readData / writeData" << endl
<< "bug tests (no blob test)" << endl
<< " -bug 4088 ndb api hang with mixed ops on index table" << endl
<< " -bug 2222 delete + write gives 626" << endl
......@@ -122,11 +130,16 @@ printusage()
static Opt g_opt;
static char&
skip(unsigned x)
static bool
skipcase(int x)
{
assert(x < sizeof(g_opt.m_skip));
return g_opt.m_skip[x];
return strchr(g_opt.m_skip, x) != 0;
}
static bool
skipstyle(int x)
{
return strchr(g_opt.m_style, '0' + x) == 0;
}
static Ndb* g_ndb = 0;
......@@ -138,11 +151,12 @@ static NdbScanOperation* g_ops = 0;
static NdbBlob* g_bh1 = 0;
static NdbBlob* g_bh2 = 0;
static bool g_printerror = true;
static unsigned g_loop = 0;
static void
printerror(int line, const char* msg)
{
ndbout << "line " << line << ": " << msg << " failed" << endl;
ndbout << "line " << line << " FAIL " << msg << endl;
if (! g_printerror) {
return;
}
......@@ -205,6 +219,7 @@ static int
createTable()
{
NdbDictionary::Table tab(g_opt.m_tname);
tab.setLogging(false);
// col PK1 - Uint32
{ NdbDictionary::Column col("PK1");
col.setType(NdbDictionary::Column::Unsigned);
......@@ -228,11 +243,11 @@ createTable()
col.setPrimaryKey(true);
tab.addColumn(col);
}
// col BL2 - Clob nullable
// col BL2 - Text nullable
if (! g_opt.m_oneblob)
{ NdbDictionary::Column col("BL2");
const Bcol& b = g_opt.m_blob2;
col.setType(NdbDictionary::Column::Clob);
col.setType(NdbDictionary::Column::Text);
col.setNullable(true);
col.setInlineSize(b.m_inline);
col.setPartSize(b.m_partsize);
......@@ -245,6 +260,7 @@ createTable()
if (g_opt.m_pk2len != 0)
{ NdbDictionary::Index idx(g_opt.m_x1name);
idx.setType(NdbDictionary::Index::UniqueHashIndex);
idx.setLogging(false);
idx.setTable(g_opt.m_tname);
idx.addColumnName("PK2");
CHK(g_dic->createIndex(idx) == 0);
......@@ -281,7 +297,7 @@ struct Bval {
m_buf = new char [m_buflen];
trash();
}
void copy(const Bval& v) {
void copyfrom(const Bval& v) {
m_len = v.m_len;
delete [] m_val;
if (v.m_val == 0)
......@@ -313,10 +329,10 @@ struct Tup {
m_blob1.alloc(g_opt.m_blob1.m_inline + g_opt.m_blob1.m_partsize * g_opt.m_parts);
m_blob2.alloc(g_opt.m_blob2.m_inline + g_opt.m_blob2.m_partsize * g_opt.m_parts);
}
void copy(const Tup& tup) {
void copyfrom(const Tup& tup) {
assert(m_pk1 == tup.m_pk1);
m_blob1.copy(tup.m_blob1);
m_blob2.copy(tup.m_blob2);
m_blob1.copyfrom(tup.m_blob1);
m_blob2.copyfrom(tup.m_blob2);
}
private:
Tup(const Tup&);
......@@ -357,6 +373,14 @@ calcBval(const Bcol& b, Bval& v, bool keepsize)
v.trash();
}
static void
calcBval(Tup& tup, bool keepsize)
{
calcBval(g_opt.m_blob1, tup.m_blob1, keepsize);
if (! g_opt.m_oneblob)
calcBval(g_opt.m_blob2, tup.m_blob2, keepsize);
}
static void
calcTups(bool keepsize)
{
......@@ -371,14 +395,39 @@ calcTups(bool keepsize)
tup.m_pk2[i] = 'a' + i % 26;
}
}
calcBval(g_opt.m_blob1, tup.m_blob1, keepsize);
if (! g_opt.m_oneblob)
calcBval(g_opt.m_blob2, tup.m_blob2, keepsize);
calcBval(tup, keepsize);
}
}
// blob handle ops
static int
getBlobHandles(NdbOperation* opr)
{
CHK((g_bh1 = opr->getBlobHandle("BL1")) != 0);
if (! g_opt.m_oneblob)
CHK((g_bh2 = opr->getBlobHandle("BL2")) != 0);
return 0;
}
static int
getBlobHandles(NdbIndexOperation* opx)
{
CHK((g_bh1 = opx->getBlobHandle("BL1")) != 0);
if (! g_opt.m_oneblob)
CHK((g_bh2 = opx->getBlobHandle("BL2")) != 0);
return 0;
}
static int
getBlobHandles(NdbScanOperation* ops)
{
CHK((g_bh1 = ops->getBlobHandle("BL1")) != 0);
if (! g_opt.m_oneblob)
CHK((g_bh2 = ops->getBlobHandle("BL2")) != 0);
return 0;
}
static int
getBlobLength(NdbBlob* h, unsigned& len)
{
......@@ -386,16 +435,19 @@ getBlobLength(NdbBlob* h, unsigned& len)
CHK(h->getLength(len2) == 0);
len = (unsigned)len2;
assert(len == len2);
DBG("getBlobLength " << h->getColumn()->getName() << " len=" << len);
return 0;
}
// setValue / getValue
static int
setBlobValue(NdbBlob* h, const Bval& v)
{
bool null = (v.m_val == 0);
bool isNull;
unsigned len;
DBG("set " << h->getColumn()->getName() << " len=" << v.m_len << " null=" << null);
DBG("setValue " << h->getColumn()->getName() << " len=" << v.m_len << " null=" << null);
if (null) {
CHK(h->setNull() == 0);
isNull = false;
......@@ -409,11 +461,20 @@ setBlobValue(NdbBlob* h, const Bval& v)
return 0;
}
static int
setBlobValue(const Tup& tup)
{
CHK(setBlobValue(g_bh1, tup.m_blob1) == 0);
if (! g_opt.m_oneblob)
CHK(setBlobValue(g_bh2, tup.m_blob2) == 0);
return 0;
}
static int
getBlobValue(NdbBlob* h, const Bval& v)
{
bool null = (v.m_val == 0);
DBG("get " << h->getColumn()->getName() << " len=" << v.m_len << " null=" << null);
DBG("getValue " << h->getColumn()->getName() << " buflen=" << v.m_buflen);
CHK(h->getValue(v.m_buf, v.m_buflen) == 0);
return 0;
}
......@@ -456,6 +517,8 @@ verifyBlobValue(const Tup& tup)
return 0;
}
// readData / writeData
static int
writeBlobData(NdbBlob* h, const Bval& v)
{
......@@ -469,6 +532,7 @@ writeBlobData(NdbBlob* h, const Bval& v)
CHK(h->getNull(isNull) == 0 && isNull == true);
CHK(getBlobLength(h, len) == 0 && len == 0);
} else {
CHK(h->truncate(v.m_len) == 0);
unsigned n = 0;
do {
unsigned m = g_opt.m_full ? v.m_len : urandom(v.m_len + 1);
......@@ -486,6 +550,15 @@ writeBlobData(NdbBlob* h, const Bval& v)
return 0;
}
static int
writeBlobData(const Tup& tup)
{
CHK(writeBlobData(g_bh1, tup.m_blob1) == 0);
if (! g_opt.m_oneblob)
CHK(writeBlobData(g_bh2, tup.m_blob2) == 0);
return 0;
}
static int
readBlobData(NdbBlob* h, const Bval& v)
{
......@@ -531,6 +604,71 @@ readBlobData(const Tup& tup)
return 0;
}
// hooks
static NdbBlob::ActiveHook blobWriteHook;
static int
blobWriteHook(NdbBlob* h, void* arg)
{
DBG("blobWriteHook");
Bval& v = *(Bval*)arg;
CHK(writeBlobData(h, v) == 0);
return 0;
}
static int
setBlobWriteHook(NdbBlob* h, Bval& v)
{
DBG("setBlobWriteHook");
CHK(h->setActiveHook(blobWriteHook, &v) == 0);
return 0;
}
static int
setBlobWriteHook(Tup& tup)
{
CHK(setBlobWriteHook(g_bh1, tup.m_blob1) == 0);
if (! g_opt.m_oneblob)
CHK(setBlobWriteHook(g_bh2, tup.m_blob2) == 0);
return 0;
}
static NdbBlob::ActiveHook blobReadHook;
// no PK yet to identify tuple so just read the value
static int
blobReadHook(NdbBlob* h, void* arg)
{
DBG("blobReadHook");
Bval& v = *(Bval*)arg;
unsigned len;
CHK(getBlobLength(h, len) == 0);
v.alloc(len);
Uint32 maxlen = 0xffffffff;
CHK(h->readData(v.m_buf, maxlen) == 0);
DBG("read " << maxlen << " bytes");
CHK(len == maxlen);
return 0;
}
static int
setBlobReadHook(NdbBlob* h, Bval& v)
{
DBG("setBlobReadHook");
CHK(h->setActiveHook(blobReadHook, &v) == 0);
return 0;
}
static int
setBlobReadHook(Tup& tup)
{
CHK(setBlobReadHook(g_bh1, tup.m_blob1) == 0);
if (! g_opt.m_oneblob)
CHK(setBlobReadHook(g_bh2, tup.m_blob2) == 0);
return 0;
}
// verify blob data
static int
......@@ -540,7 +678,11 @@ verifyHeadInline(const Bcol& c, const Bval& v, NdbRecAttr* ra)
CHK(ra->isNULL() == 1);
} else {
CHK(ra->isNULL() == 0);
CHK(ra->u_64_value() == v.m_len);
const NdbBlob::Head* head = (const NdbBlob::Head*)ra->aRef();
CHK(head->length == v.m_len);
const char* data = (const char*)(head + 1);
for (unsigned i = 0; i < head->length && i < c.m_inline; i++)
CHK(data[i] == v.m_val[i]);
}
return 0;
}
......@@ -548,7 +690,7 @@ verifyHeadInline(const Bcol& c, const Bval& v, NdbRecAttr* ra)
static int
verifyHeadInline(const Tup& tup)
{
DBG("verifyHeadInline pk1=" << tup.m_pk1);
DBG("verifyHeadInline pk1=" << hex << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
CHK(g_opr->readTuple() == 0);
......@@ -580,7 +722,7 @@ verifyHeadInline(const Tup& tup)
static int
verifyBlobTable(const Bcol& b, const Bval& v, Uint32 pk1, bool exists)
{
DBG("verify " << b.m_btname << " pk1=" << pk1);
DBG("verify " << b.m_btname << " pk1=" << hex << pk1);
NdbRecAttr* ra_pk;
NdbRecAttr* ra_part;
NdbRecAttr* ra_data;
......@@ -640,7 +782,7 @@ verifyBlob()
{
for (unsigned k = 0; k < g_opt.m_rows; k++) {
const Tup& tup = g_tups[k];
DBG("verifyBlob pk1=" << tup.m_pk1);
DBG("verifyBlob pk1=" << hex << tup.m_pk1);
CHK(verifyHeadInline(tup) == 0);
CHK(verifyBlobTable(tup) == 0);
}
......@@ -649,105 +791,120 @@ verifyBlob()
// operations
static const char* stylename[3] = {
"style=getValue/setValue",
"style=setActiveHook",
"style=readData/writeData"
};
// pk ops
static int
insertPk(bool rw)
insertPk(int style)
{
DBG("--- insertPk ---");
DBG("--- insertPk " << stylename[style] << " ---");
unsigned n = 0;
CHK((g_con = g_ndb->startTransaction()) != 0);
for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k];
DBG("insertPk pk1=" << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0);
DBG("insertPk pk1=" << hex << tup.m_pk1);
CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
CHK(g_opr->insertTuple() == 0);
CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
if (g_opt.m_pk2len != 0)
CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
CHK((g_bh1 = g_opr->getBlobHandle("BL1")) != 0);
if (! g_opt.m_oneblob)
CHK((g_bh2 = g_opr->getBlobHandle("BL2")) != 0);
if (! rw) {
CHK(setBlobValue(g_bh1, tup.m_blob1) == 0);
if (! g_opt.m_oneblob)
CHK(setBlobValue(g_bh2, tup.m_blob2) == 0);
CHK(getBlobHandles(g_opr) == 0);
if (style == 0) {
CHK(setBlobValue(tup) == 0);
} else if (style == 1) {
// non-nullable must be set
CHK(g_bh1->setValue("", 0) == 0);
CHK(setBlobWriteHook(tup) == 0);
} else {
// non-nullable must be set
CHK(g_bh1->setValue("", 0) == 0);
CHK(g_con->execute(NoCommit) == 0);
CHK(writeBlobData(g_bh1, tup.m_blob1) == 0);
if (! g_opt.m_oneblob)
CHK(writeBlobData(g_bh2, tup.m_blob2) == 0);
CHK(writeBlobData(tup) == 0);
}
// just another trap
if (urandom(10) == 0)
CHK(g_con->execute(NoCommit) == 0);
if (++n == g_opt.m_batch) {
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
CHK((g_con = g_ndb->startTransaction()) != 0);
n = 0;
}
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
g_opr = 0;
g_con = 0;
tup.m_exists = true;
}
if (n != 0) {
CHK(g_con->execute(Commit) == 0);
n = 0;
}
g_ndb->closeTransaction(g_con);
g_con = 0;
return 0;
}
static int
updatePk(bool rw)
readPk(int style)
{
DBG("--- updatePk ---");
DBG("--- readPk " << stylename[style] << " ---");
for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k];
DBG("updatePk pk1=" << tup.m_pk1);
DBG("readPk pk1=" << hex << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
CHK(g_opr->updateTuple() == 0);
CHK(g_opr->readTuple() == 0);
CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
if (g_opt.m_pk2len != 0)
CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
CHK((g_bh1 = g_opr->getBlobHandle("BL1")) != 0);
if (! g_opt.m_oneblob)
CHK((g_bh2 = g_opr->getBlobHandle("BL2")) != 0);
if (! rw) {
CHK(setBlobValue(g_bh1, tup.m_blob1) == 0);
if (! g_opt.m_oneblob)
CHK(setBlobValue(g_bh2, tup.m_blob2) == 0);
CHK(getBlobHandles(g_opr) == 0);
if (style == 0) {
CHK(getBlobValue(tup) == 0);
} else if (style == 1) {
CHK(setBlobReadHook(tup) == 0);
} else {
CHK(g_con->execute(NoCommit) == 0);
CHK(writeBlobData(g_bh1, tup.m_blob1) == 0);
if (! g_opt.m_oneblob)
CHK(writeBlobData(g_bh2, tup.m_blob2) == 0);
CHK(readBlobData(tup) == 0);
}
CHK(g_con->execute(Commit) == 0);
if (style == 0 || style == 1) {
CHK(verifyBlobValue(tup) == 0);
}
g_ndb->closeTransaction(g_con);
g_opr = 0;
g_con = 0;
tup.m_exists = true;
}
return 0;
}
static int
updateIdx(bool rw)
updatePk(int style)
{
DBG("--- updateIdx ---");
DBG("--- updatePk " << stylename[style] << " ---");
for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k];
DBG("updateIdx pk1=" << tup.m_pk1);
DBG("updatePk pk1=" << hex << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
CHK(g_opx->updateTuple() == 0);
CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
CHK((g_bh1 = g_opx->getBlobHandle("BL1")) != 0);
if (! g_opt.m_oneblob)
CHK((g_bh2 = g_opx->getBlobHandle("BL2")) != 0);
if (! rw) {
CHK(setBlobValue(g_bh1, tup.m_blob1) == 0);
if (! g_opt.m_oneblob)
CHK(setBlobValue(g_bh2, tup.m_blob2) == 0);
CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
CHK(g_opr->updateTuple() == 0);
CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
if (g_opt.m_pk2len != 0)
CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
CHK(getBlobHandles(g_opr) == 0);
if (style == 0) {
CHK(setBlobValue(tup) == 0);
} else if (style == 1) {
CHK(setBlobWriteHook(tup) == 0);
} else {
CHK(g_con->execute(NoCommit) == 0);
CHK(writeBlobData(g_bh1, tup.m_blob1) == 0);
if (! g_opt.m_oneblob)
CHK(writeBlobData(g_bh2, tup.m_blob2) == 0);
CHK(writeBlobData(tup) == 0);
}
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
g_opx = 0;
g_opr = 0;
g_con = 0;
tup.m_exists = true;
}
......@@ -755,74 +912,115 @@ updateIdx(bool rw)
}
static int
readPk(bool rw)
deletePk()
{
DBG("--- readPk ---");
DBG("--- deletePk ---");
for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k];
DBG("readPk pk1=" << tup.m_pk1);
DBG("deletePk pk1=" << hex << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
CHK(g_opr->readTuple() == 0);
CHK(g_opr->deleteTuple() == 0);
CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
if (g_opt.m_pk2len != 0)
CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
CHK((g_bh1 = g_opr->getBlobHandle("BL1")) != 0);
if (! g_opt.m_oneblob)
CHK((g_bh2 = g_opr->getBlobHandle("BL2")) != 0);
if (! rw) {
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
g_opr = 0;
g_con = 0;
tup.m_exists = false;
}
return 0;
}
// hash index ops
static int
readIdx(int style)
{
DBG("--- readIdx " << stylename[style] << " ---");
for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k];
DBG("readIdx pk1=" << hex << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
CHK(g_opx->readTuple() == 0);
CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
CHK(getBlobHandles(g_opx) == 0);
if (style == 0) {
CHK(getBlobValue(tup) == 0);
} else if (style == 1) {
CHK(setBlobReadHook(tup) == 0);
} else {
CHK(g_con->execute(NoCommit) == 0);
CHK(readBlobData(tup) == 0);
}
CHK(g_con->execute(Commit) == 0);
if (! rw) {
if (style == 0 || style == 1) {
CHK(verifyBlobValue(tup) == 0);
}
g_ndb->closeTransaction(g_con);
g_opr = 0;
g_opx = 0;
g_con = 0;
}
return 0;
}
static int
readIdx(bool rw)
updateIdx(int style)
{
DBG("--- readIdx ---");
DBG("--- updateIdx " << stylename[style] << " ---");
for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k];
DBG("readIdx pk1=" << tup.m_pk1);
DBG("updateIdx pk1=" << hex << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
CHK(g_opx->readTuple() == 0);
CHK(g_opx->updateTuple() == 0);
CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
CHK((g_bh1 = g_opx->getBlobHandle("BL1")) != 0);
if (! g_opt.m_oneblob)
CHK((g_bh2 = g_opx->getBlobHandle("BL2")) != 0);
if (! rw) {
CHK(getBlobValue(tup) == 0);
CHK(getBlobHandles(g_opx) == 0);
if (style == 0) {
CHK(setBlobValue(tup) == 0);
} else if (style == 1) {
CHK(setBlobWriteHook(tup) == 0);
} else {
CHK(g_con->execute(NoCommit) == 0);
CHK(readBlobData(tup) == 0);
CHK(writeBlobData(tup) == 0);
}
CHK(g_con->execute(Commit) == 0);
if (! rw) {
CHK(verifyBlobValue(tup) == 0);
}
g_ndb->closeTransaction(g_con);
g_opx = 0;
g_con = 0;
tup.m_exists = true;
}
return 0;
}
static int
deleteIdx()
{
DBG("--- deleteIdx ---");
for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k];
DBG("deleteIdx pk1=" << hex << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
CHK(g_opx->deleteTuple() == 0);
CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
g_opx = 0;
g_con = 0;
tup.m_exists = false;
}
return 0;
}
// scan ops table and index
static int
readScan(bool rw, bool idx)
readScan(int style, bool idx)
{
const char* func = ! idx ? "scan read table" : "scan read index";
DBG("--- " << func << " ---");
DBG("--- " << "readScan" << (idx ? "Idx" : "") << " " << stylename[style] << " ---");
Tup tup;
tup.alloc(); // allocate buffers
NdbResultSet* rs;
......@@ -836,11 +1034,11 @@ readScan(bool rw, bool idx)
CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
if (g_opt.m_pk2len != 0)
CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
CHK((g_bh1 = g_ops->getBlobHandle("BL1")) != 0);
if (! g_opt.m_oneblob)
CHK((g_bh2 = g_ops->getBlobHandle("BL2")) != 0);
if (! rw) {
CHK(getBlobHandles(g_ops) == 0);
if (style == 0) {
CHK(getBlobValue(tup) == 0);
} else if (style == 1) {
CHK(setBlobReadHook(tup) == 0);
}
CHK(g_con->execute(NoCommit) == 0);
unsigned rows = 0;
......@@ -851,11 +1049,14 @@ readScan(bool rw, bool idx)
CHK((ret = rs->nextResult(true)) == 0 || ret == 1);
if (ret == 1)
break;
DBG(func << " pk1=" << tup.m_pk1);
DBG("readScan" << (idx ? "Idx" : "") << " pk1=" << hex << tup.m_pk1);
Uint32 k = tup.m_pk1 - g_opt.m_pk1off;
CHK(k < g_opt.m_rows && g_tups[k].m_exists);
tup.copy(g_tups[k]);
if (! rw) {
tup.copyfrom(g_tups[k]);
if (style == 0) {
CHK(verifyBlobValue(tup) == 0);
} else if (style == 1) {
// execute ops generated by callbacks, if any
CHK(verifyBlobValue(tup) == 0);
} else {
CHK(readBlobData(tup) == 0);
......@@ -870,52 +1071,63 @@ readScan(bool rw, bool idx)
}
static int
deletePk()
updateScan(int style, bool idx)
{
DBG("--- deletePk ---");
for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k];
DBG("deletePk pk1=" << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
CHK(g_opr->deleteTuple() == 0);
CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
if (g_opt.m_pk2len != 0)
CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
g_opr = 0;
g_con = 0;
tup.m_exists = false;
DBG("--- " << "updateScan" << (idx ? "Idx" : "") << " " << stylename[style] << " ---");
Tup tup;
tup.alloc(); // allocate buffers
NdbResultSet* rs;
CHK((g_con = g_ndb->startTransaction()) != 0);
if (! idx) {
CHK((g_ops = g_con->getNdbScanOperation(g_opt.m_tname)) != 0);
} else {
CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
}
return 0;
}
static int
deleteIdx()
{
DBG("--- deleteIdx ---");
for (unsigned k = 0; k < g_opt.m_rows; k++) {
Tup& tup = g_tups[k];
DBG("deleteIdx pk1=" << tup.m_pk1);
CHK((g_con = g_ndb->startTransaction()) != 0);
CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
CHK(g_opx->deleteTuple() == 0);
CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
g_opx = 0;
g_con = 0;
tup.m_exists = false;
CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Exclusive)) != 0);
CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
if (g_opt.m_pk2len != 0)
CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
CHK(g_con->execute(NoCommit) == 0);
unsigned rows = 0;
while (1) {
int ret;
tup.m_pk1 = (Uint32)-1;
memset(tup.m_pk2, 'x', g_opt.m_pk2len);
CHK((ret = rs->nextResult(true)) == 0 || ret == 1);
if (ret == 1)
break;
DBG("updateScan" << (idx ? "Idx" : "") << " pk1=" << hex << tup.m_pk1);
Uint32 k = tup.m_pk1 - g_opt.m_pk1off;
CHK(k < g_opt.m_rows && g_tups[k].m_exists);
// calculate new blob values
calcBval(g_tups[k], false);
tup.copyfrom(g_tups[k]);
CHK((g_opr = rs->updateTuple()) != 0);
CHK(getBlobHandles(g_opr) == 0);
if (style == 0) {
CHK(setBlobValue(tup) == 0);
} else if (style == 1) {
CHK(setBlobWriteHook(tup) == 0);
} else {
CHK(g_con->execute(NoCommit) == 0);
CHK(writeBlobData(tup) == 0);
}
CHK(g_con->execute(NoCommit) == 0);
g_opr = 0;
rows++;
}
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
g_con = 0;
g_ops = 0;
CHK(g_opt.m_rows == rows);
return 0;
}
static int
deleteScan(bool idx)
{
const char* func = ! idx ? "scan delete table" : "scan delete index";
DBG("--- " << func << " ---");
DBG("--- " << "deleteScan" << (idx ? "Idx" : "") << " ---");
Tup tup;
NdbResultSet* rs;
CHK((g_con = g_ndb->startTransaction()) != 0);
......@@ -937,7 +1149,7 @@ deleteScan(bool idx)
CHK((ret = rs->nextResult()) == 0 || ret == 1);
if (ret == 1)
break;
DBG(func << " pk1=" << tup.m_pk1);
DBG("deleteScan" << (idx ? "Idx" : "") << " pk1=" << hex << tup.m_pk1);
CHK(rs->deleteTuple() == 0);
CHK(g_con->execute(NoCommit) == 0);
Uint32 k = tup.m_pk1 - g_opt.m_pk1off;
......@@ -948,7 +1160,6 @@ deleteScan(bool idx)
CHK(g_con->execute(Commit) == 0);
g_ndb->closeTransaction(g_con);
g_con = 0;
g_opr = 0;
g_ops = 0;
CHK(g_opt.m_rows == rows);
return 0;
......@@ -981,69 +1192,75 @@ testmain()
}
if (g_opt.m_seed != 0)
srandom(g_opt.m_seed);
for (unsigned loop = 0; g_opt.m_loop == 0 || loop < g_opt.m_loop; loop++) {
DBG("=== loop " << loop << " ===");
for (g_loop = 0; g_opt.m_loop == 0 || g_loop < g_opt.m_loop; g_loop++) {
DBG("=== loop " << g_loop << " ===");
if (g_opt.m_seed == 0)
srandom(loop);
bool llim = skip('v') ? true : false;
bool ulim = skip('w') ? false : true;
srandom(g_loop);
// pk
for (int rw = llim; rw <= ulim; rw++) {
if (skip('k'))
for (int style = 0; style <= 2; style++) {
if (skipcase('k') || skipstyle(style))
continue;
DBG("--- pk ops " << (! rw ? "get/set" : "read/write") << " ---");
DBG("--- pk ops " << stylename[style] << " ---");
calcTups(false);
CHK(insertPk(rw) == 0);
CHK(insertPk(style) == 0);
CHK(verifyBlob() == 0);
CHK(readPk(rw) == 0);
if (! skip('u')) {
calcTups(rw);
CHK(updatePk(rw) == 0);
CHK(readPk(style) == 0);
if (! skipcase('u')) {
calcTups(style);
CHK(updatePk(style) == 0);
CHK(verifyBlob() == 0);
}
CHK(readPk(rw) == 0);
CHK(readPk(style) == 0);
CHK(deletePk() == 0);
CHK(verifyBlob() == 0);
}
// hash index
for (int rw = llim; rw <= ulim; rw++) {
if (skip('i'))
for (int style = 0; style <= 2; style++) {
if (skipcase('i') || skipstyle(style))
continue;
DBG("--- idx ops " << (! rw ? "get/set" : "read/write") << " ---");
DBG("--- idx ops " << stylename[style] << " ---");
calcTups(false);
CHK(insertPk(rw) == 0);
CHK(insertPk(style) == 0);
CHK(verifyBlob() == 0);
CHK(readIdx(rw) == 0);
calcTups(rw);
if (! skip('u')) {
CHK(updateIdx(rw) == 0);
CHK(readIdx(style) == 0);
calcTups(style);
if (! skipcase('u')) {
CHK(updateIdx(style) == 0);
CHK(verifyBlob() == 0);
CHK(readIdx(rw) == 0);
CHK(readIdx(style) == 0);
}
CHK(deleteIdx() == 0);
CHK(verifyBlob() == 0);
}
// scan table
for (int rw = llim; rw <= ulim; rw++) {
if (skip('s'))
for (int style = 0; style <= 2; style++) {
if (skipcase('s') || skipstyle(style))
continue;
DBG("--- table scan " << (! rw ? "get/set" : "read/write") << " ---");
DBG("--- table scan " << stylename[style] << " ---");
calcTups(false);
CHK(insertPk(rw) == 0);
CHK(insertPk(style) == 0);
CHK(verifyBlob() == 0);
CHK(readScan(rw, false) == 0);
CHK(readScan(style, false) == 0);
if (! skipcase('u')) {
CHK(updateScan(style, false) == 0);
CHK(verifyBlob() == 0);
}
CHK(deleteScan(false) == 0);
CHK(verifyBlob() == 0);
}
// scan index
for (int rw = llim; rw <= ulim; rw++) {
if (skip('r'))
for (int style = 0; style <= 2; style++) {
if (skipcase('r') || skipstyle(style))
continue;
DBG("--- index scan " << (! rw ? "get/set" : "read/write") << " ---");
DBG("--- index scan " << stylename[style] << " ---");
calcTups(false);
CHK(insertPk(rw) == 0);
CHK(insertPk(style) == 0);
CHK(verifyBlob() == 0);
CHK(readScan(rw, true) == 0);
CHK(readScan(style, true) == 0);
if (! skipcase('u')) {
CHK(updateScan(style, true) == 0);
CHK(verifyBlob() == 0);
}
CHK(deleteScan(true) == 0);
CHK(verifyBlob() == 0);
}
......@@ -1121,6 +1338,12 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
{
while (++argv, --argc > 0) {
const char* arg = argv[0];
if (strcmp(arg, "-batch") == 0) {
if (++argv, --argc > 0) {
g_opt.m_batch = atoi(argv[0]);
continue;
}
}
if (strcmp(arg, "-core") == 0) {
g_opt.m_core = true;
continue;
......@@ -1165,9 +1388,13 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
}
if (strcmp(arg, "-skip") == 0) {
if (++argv, --argc > 0) {
for (const char* p = argv[0]; *p != 0; p++) {
skip(*p) = true;
}
g_opt.m_skip = strdup(argv[0]);
continue;
}
}
if (strcmp(arg, "-style") == 0) {
if (++argv, --argc > 0) {
g_opt.m_style = strdup(argv[0]);
continue;
}
}
......@@ -1175,10 +1402,6 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
if (strcmp(arg, "-pk2len") == 0) {
if (++argv, --argc > 0) {
g_opt.m_pk2len = atoi(argv[0]);
if (g_opt.m_pk2len == 0) {
skip('i') = true;
skip('r') = true;
}
if (g_opt.m_pk2len <= g_max_pk2len)
continue;
}
......@@ -1205,7 +1428,15 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
printusage();
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
if (g_opt.m_pk2len == 0) {
char b[100];
strcpy(b, g_opt.m_skip);
strcat(b, "i");
strcat(b, "r");
g_opt.m_skip = strdup(b);
}
if (testmain() == -1) {
ndbout << "line " << __LINE__ << " FAIL loop=" << g_loop << endl;
return NDBT_ProgramExit(NDBT_FAILED);
}
return NDBT_ProgramExit(NDBT_OK);
......
......@@ -18,35 +18,6 @@
#include <NdbTimer.hpp>
#include <NDBT.hpp>
class NdbOut&
operator <<(class NdbOut& ndbout, const NDBT_Attribute & attr){
NdbDictionary::Column::Type type = attr.getType();
ndbout << attr.getName() << " " << type;
switch(type){
case NdbDictionary::Column::Decimal:
ndbout << "(" << attr.getScale() << ", " << attr.getPrecision() << ")";
break;
default:
break;
}
if(attr.getLength() != 1)
ndbout << "[" << attr.getLength() << "]";
if(attr.getNullable())
ndbout << " NULL";
else
ndbout << " NOT NULL";
if(attr.getPrimaryKey())
ndbout << " PRIMARY KEY";
return ndbout;
}
class NdbOut&
operator <<(class NdbOut& ndbout, const NDBT_Table & tab)
{
......
......@@ -830,7 +830,8 @@ void NDBT_TestSuite::execute(Ndb* ndb, const NdbDictionary::Table* pTab,
if(pTab2 == 0 && pDict->createTable(* pTab) != 0){
numTestsFail++;
numTestsExecuted++;
g_err << "ERROR1: Failed to create table " << pTab->getName() << endl;
g_err << "ERROR1: Failed to create table " << pTab->getName()
<< pDict->getNdbError() << endl;
tests[t]->saveTestResult(pTab, FAILED_TO_CREATE);
continue;
}
......
......@@ -181,6 +181,45 @@ bool ha_ndbcluster::get_error_message(int error,
}
/*
Check if type is supported by NDB.
TODO Use this once, not in every operation
*/
static inline bool ndb_supported_type(enum_field_types type)
{
switch (type) {
case MYSQL_TYPE_DECIMAL:
case MYSQL_TYPE_TINY:
case MYSQL_TYPE_SHORT:
case MYSQL_TYPE_LONG:
case MYSQL_TYPE_INT24:
case MYSQL_TYPE_LONGLONG:
case MYSQL_TYPE_FLOAT:
case MYSQL_TYPE_DOUBLE:
case MYSQL_TYPE_TIMESTAMP:
case MYSQL_TYPE_DATETIME:
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_NEWDATE:
case MYSQL_TYPE_TIME:
case MYSQL_TYPE_YEAR:
case MYSQL_TYPE_STRING:
case MYSQL_TYPE_VAR_STRING:
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_SET:
return true;
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
break;
}
return false;
}
/*
Instruct NDB to set the value of the hidden primary key
*/
......@@ -208,40 +247,15 @@ int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
pack_len));
DBUG_DUMP("key", (char*)field_ptr, pack_len);
switch (field->type()) {
case MYSQL_TYPE_DECIMAL:
case MYSQL_TYPE_TINY:
case MYSQL_TYPE_SHORT:
case MYSQL_TYPE_LONG:
case MYSQL_TYPE_FLOAT:
case MYSQL_TYPE_DOUBLE:
case MYSQL_TYPE_TIMESTAMP:
case MYSQL_TYPE_LONGLONG:
case MYSQL_TYPE_INT24:
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_TIME:
case MYSQL_TYPE_DATETIME:
case MYSQL_TYPE_YEAR:
case MYSQL_TYPE_NEWDATE:
case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_SET:
case MYSQL_TYPE_VAR_STRING:
case MYSQL_TYPE_STRING:
// Common implementation for most field types
DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
default:
// Unhandled field types
DBUG_PRINT("error", ("Field type %d not supported", field->type()));
DBUG_RETURN(2);
if (ndb_supported_type(field->type()))
{
if (! (field->flags & BLOB_FLAG))
// Common implementation for most field types
DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
}
DBUG_RETURN(3);
// Unhandled field types
DBUG_PRINT("error", ("Field type %d not supported", field->type()));
DBUG_RETURN(2);
}
......@@ -259,63 +273,197 @@ int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
fieldnr, field->field_name, field->type(),
pack_len, field->is_null()?"Y":"N"));
DBUG_DUMP("value", (char*) field_ptr, pack_len);
if (field->is_null())
if (ndb_supported_type(field->type()))
{
// Set value to NULL
DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
}
switch (field->type()) {
case MYSQL_TYPE_DECIMAL:
case MYSQL_TYPE_TINY:
case MYSQL_TYPE_SHORT:
case MYSQL_TYPE_LONG:
case MYSQL_TYPE_FLOAT:
case MYSQL_TYPE_DOUBLE:
case MYSQL_TYPE_TIMESTAMP:
case MYSQL_TYPE_LONGLONG:
case MYSQL_TYPE_INT24:
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_TIME:
case MYSQL_TYPE_DATETIME:
case MYSQL_TYPE_YEAR:
case MYSQL_TYPE_NEWDATE:
case MYSQL_TYPE_ENUM:
case MYSQL_TYPE_SET:
case MYSQL_TYPE_VAR_STRING:
case MYSQL_TYPE_STRING:
// Common implementation for most field types
DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
case MYSQL_TYPE_TINY_BLOB:
case MYSQL_TYPE_MEDIUM_BLOB:
case MYSQL_TYPE_LONG_BLOB:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
default:
// Unhandled field types
DBUG_PRINT("error", ("Field type %d not supported", field->type()));
DBUG_RETURN(2);
if (! (field->flags & BLOB_FLAG))
{
if (field->is_null())
// Set value to NULL
DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
// Common implementation for most field types
DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
}
// Blob type
NdbBlob *ndb_blob = ndb_op->getBlobHandle(fieldnr);
if (ndb_blob != NULL)
{
if (field->is_null())
DBUG_RETURN(ndb_blob->setNull() != 0);
Field_blob *field_blob= (Field_blob*)field;
// Get length and pointer to data
uint32 blob_len= field_blob->get_length(field_ptr);
char* blob_ptr= NULL;
field_blob->get_ptr(&blob_ptr);
// Looks like NULL blob can also be signaled in this way
if (blob_ptr == NULL)
DBUG_RETURN(ndb_blob->setNull() != 0);
DBUG_PRINT("value", ("set blob ptr=%x len=%u",
(unsigned)blob_ptr, blob_len));
DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26));
// No callback needed to write value
DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0);
}
DBUG_RETURN(1);
}
// Unhandled field types
DBUG_PRINT("error", ("Field type %d not supported", field->type()));
DBUG_RETURN(2);
}
/*
Callback to read all blob values.
- not done in unpack_record because unpack_record is valid
after execute(Commit) but reading blobs is not
- may only generate read operations; they have to be executed
somewhere before the data is available
- due to single buffer for all blobs, we let the last blob
process all blobs (last so that all are active)
- null bit is still set in unpack_record
- TODO allocate blob part aligned buffers
*/
NdbBlob::ActiveHook get_ndb_blobs_value;
int get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
{
DBUG_ENTER("get_ndb_blobs_value [callback]");
if (ndb_blob->blobsNextBlob() != NULL)
DBUG_RETURN(0);
ha_ndbcluster *ha= (ha_ndbcluster *)arg;
DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob));
}
int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
{
DBUG_ENTER("get_ndb_blobs_value");
// Field has no field number so cannot use TABLE blob_field
// Loop twice, first only counting total buffer size
for (int loop= 0; loop <= 1; loop++)
{
uint32 offset= 0;
for (uint i= 0; i < table->fields; i++)
{
Field *field= table->field[i];
NdbValue value= m_value[i];
if (value.ptr != NULL && (field->flags & BLOB_FLAG))
{
Field_blob *field_blob= (Field_blob *)field;
NdbBlob *ndb_blob= value.blob;
Uint64 blob_len= 0;
if (ndb_blob->getLength(blob_len) != 0)
DBUG_RETURN(-1);
// Align to Uint64
uint32 blob_size= blob_len;
if (blob_size % 8 != 0)
blob_size+= 8 - blob_size % 8;
if (loop == 1)
{
char *buf= blobs_buffer + offset;
uint32 len= 0xffffffff; // Max uint32
DBUG_PRINT("value", ("read blob ptr=%x len=%u",
(uint)buf, (uint)blob_len));
if (ndb_blob->readData(buf, len) != 0)
DBUG_RETURN(-1);
DBUG_ASSERT(len == blob_len);
field_blob->set_ptr(len, buf);
}
offset+= blob_size;
}
}
if (loop == 0 && offset > blobs_buffer_size)
{
my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
blobs_buffer_size= 0;
DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
blobs_buffer= my_malloc(offset, MYF(MY_WME));
if (blobs_buffer == NULL)
DBUG_RETURN(-1);
blobs_buffer_size= offset;
}
}
DBUG_RETURN(3);
DBUG_RETURN(0);
}
/*
Instruct NDB to fetch one field
- data is read directly into buffer provided by field_ptr
if it's NULL, data is read into memory provided by NDBAPI
- data is read directly into buffer provided by field
if field is NULL, data is read into memory provided by NDBAPI
*/
int ha_ndbcluster::get_ndb_value(NdbOperation *op,
uint field_no, byte *field_ptr)
int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
uint fieldnr)
{
DBUG_ENTER("get_ndb_value");
DBUG_PRINT("enter", ("field_no: %d", field_no));
m_value[field_no]= op->getValue(field_no, field_ptr);
DBUG_RETURN(m_value == NULL);
DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr,
(int)(field != NULL ? field->flags : 0)));
if (field != NULL)
{
if (ndb_supported_type(field->type()))
{
DBUG_ASSERT(field->ptr != NULL);
if (! (field->flags & BLOB_FLAG))
{
m_value[fieldnr].rec= ndb_op->getValue(fieldnr, field->ptr);
DBUG_RETURN(m_value[fieldnr].rec == NULL);
}
// Blob type
NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
m_value[fieldnr].blob= ndb_blob;
if (ndb_blob != NULL)
{
// Set callback
void *arg= (void *)this;
DBUG_RETURN(ndb_blob->setActiveHook(::get_ndb_blobs_value, arg) != 0);
}
DBUG_RETURN(1);
}
// Unhandled field types
DBUG_PRINT("error", ("Field type %d not supported", field->type()));
DBUG_RETURN(2);
}
// Used for hidden key only
m_value[fieldnr].rec= ndb_op->getValue(fieldnr, NULL);
DBUG_RETURN(m_value[fieldnr].rec == NULL);
}
/*
Check if any set or get of blob value in current query.
*/
bool ha_ndbcluster::uses_blob_value(bool all_fields)
{
if (table->blob_fields == 0)
return false;
if (all_fields)
return true;
{
uint no_fields= table->fields;
int i;
THD *thd= current_thd;
// They always put blobs at the end..
for (i= no_fields - 1; i >= 0; i--)
{
Field *field= table->field[i];
if (thd->query_id == field->query_id)
{
return true;
}
}
}
return false;
}
......@@ -462,10 +610,19 @@ void ha_ndbcluster::release_metadata()
DBUG_VOID_RETURN;
}
NdbScanOperation::LockMode get_ndb_lock_type(enum thr_lock_type type)
int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{
return (type == TL_WRITE_ALLOW_WRITE) ?
NdbScanOperation::LM_Exclusive : NdbScanOperation::LM_CommittedRead;
int lm;
if (type == TL_WRITE_ALLOW_WRITE)
lm = NdbScanOperation::LM_Exclusive;
else if (uses_blob_value(retrieve_all_fields))
/*
TODO use a new scan mode to read + lock + keyinfo
*/
lm = NdbScanOperation::LM_Exclusive;
else
lm = NdbScanOperation::LM_CommittedRead;
return lm;
}
static const ulong index_type_flags[]=
......@@ -615,7 +772,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
ERR_RETURN(trans->getNdbError());
// Read key at the same time, for future reference
if (get_ndb_value(op, no_fields, NULL))
if (get_ndb_value(op, NULL, no_fields))
ERR_RETURN(trans->getNdbError());
}
else
......@@ -632,13 +789,13 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
if ((thd->query_id == field->query_id) ||
retrieve_all_fields)
{
if (get_ndb_value(op, i, field->ptr))
if (get_ndb_value(op, field, i))
ERR_RETURN(trans->getNdbError());
}
else
{
// Attribute was not to be read
m_value[i]= NULL;
m_value[i].ptr= NULL;
}
}
......@@ -747,13 +904,13 @@ int ha_ndbcluster::unique_index_read(const byte *key,
if ((thd->query_id == field->query_id) ||
(field->flags & PRI_KEY_FLAG))
{
if (get_ndb_value(op, i, field->ptr))
if (get_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError());
}
else
{
// Attribute was not to be read
m_value[i]= NULL;
m_value[i].ptr= NULL;
}
}
......@@ -796,11 +953,22 @@ inline int ha_ndbcluster::next_result(byte *buf)
bool contact_ndb = m_lock.type != TL_WRITE_ALLOW_WRITE;
do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
/*
We can only handle one tuple with blobs at a time.
*/
if (ops_pending && blobs_pending)
{
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
ops_pending= 0;
blobs_pending= false;
}
check= cursor->nextResult(contact_ndb);
if (check == 0)
{
// One more record found
DBUG_PRINT("info", ("One more record found"));
unpack_record(buf);
table->status= 0;
DBUG_RETURN(0);
......@@ -914,8 +1082,10 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
index_name= get_index_name(active_index);
if (!(op= trans->getNdbIndexScanOperation(index_name, m_tabname)))
ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,
parallelism, sorted)))
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism, sorted)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
......@@ -975,7 +1145,9 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
if (!(op= trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism)))
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
......@@ -1044,7 +1216,9 @@ int ha_ndbcluster::full_table_scan(byte *buf)
if (!(op=trans->getNdbScanOperation(m_tabname)))
ERR_RETURN(trans->getNdbError());
if (!(cursor= op->readTuples(get_ndb_lock_type(m_lock.type), 0,parallelism)))
NdbScanOperation::LockMode lm= (NdbScanOperation::LockMode)
get_ndb_lock_type(m_lock.type);
if (!(cursor= op->readTuples(lm, 0, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= cursor;
DBUG_RETURN(define_read_attrs(buf, op));
......@@ -1068,12 +1242,12 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
(field->flags & PRI_KEY_FLAG) ||
retrieve_all_fields)
{
if (get_ndb_value(op, i, field->ptr))
if (get_ndb_value(op, field, i))
ERR_RETURN(op->getNdbError());
}
else
{
m_value[i]= NULL;
m_value[i].ptr= NULL;
}
}
......@@ -1087,7 +1261,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
if (!tab->getColumn(hidden_no))
DBUG_RETURN(1);
#endif
if (get_ndb_value(op, hidden_no, NULL))
if (get_ndb_value(op, NULL, hidden_no))
ERR_RETURN(op->getNdbError());
}
......@@ -1155,12 +1329,13 @@ int ha_ndbcluster::write_row(byte *record)
*/
rows_inserted++;
if ((rows_inserted == rows_to_insert) ||
((rows_inserted % bulk_insert_rows) == 0))
((rows_inserted % bulk_insert_rows) == 0) ||
uses_blob_value(false) != 0)
{
// Send rows to NDB
DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d",
rows_inserted, bulk_insert_rows));
(int)rows_inserted, (int)bulk_insert_rows));
if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans));
}
......@@ -1270,6 +1445,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
if (!(op= cursor->updateTuple()))
ERR_RETURN(trans->getNdbError());
ops_pending++;
if (uses_blob_value(false))
blobs_pending= true;
}
else
{
......@@ -1285,7 +1462,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
// Require that the PK for this record has previously been
// read into m_value
uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields];
NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec);
DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
......@@ -1360,7 +1537,7 @@ int ha_ndbcluster::delete_row(const byte *record)
// This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key"));
uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields];
NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec != NULL);
if (set_hidden_key(op, no_fields, rec->aRef()))
......@@ -1398,7 +1575,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
{
uint row_offset= (uint) (buf - table->record[0]);
Field **field, **end;
NdbRecAttr **value= m_value;
NdbValue *value= m_value;
DBUG_ENTER("unpack_record");
// Set null flag(s)
......@@ -1407,8 +1584,23 @@ void ha_ndbcluster::unpack_record(byte* buf)
field < end;
field++, value++)
{
if (*value && (*value)->isNULL())
(*field)->set_null(row_offset);
if ((*value).ptr)
{
if (! ((*field)->flags & BLOB_FLAG))
{
if ((*value).rec->isNULL())
(*field)->set_null(row_offset);
}
else
{
NdbBlob* ndb_blob= (*value).blob;
bool isNull= true;
int ret= ndb_blob->getNull(isNull);
DBUG_ASSERT(ret == 0);
if (isNull)
(*field)->set_null(row_offset);
}
}
}
#ifndef DBUG_OFF
......@@ -1419,7 +1611,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
int hidden_no= table->fields;
const NDBTAB *tab= (NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
NdbRecAttr* rec= m_value[hidden_no];
NdbRecAttr* rec= m_value[hidden_no].rec;
DBUG_ASSERT(rec);
DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no,
hidden_col->getName(), rec->u_64_value()));
......@@ -1446,9 +1638,9 @@ void ha_ndbcluster::print_results()
{
Field *field;
const NDBCOL *col;
NdbRecAttr *value;
NdbValue value;
if (!(value= m_value[f]))
if (!(value= m_value[f]).ptr)
{
fprintf(DBUG_FILE, "Field %d was not read\n", f);
continue;
......@@ -1457,19 +1649,28 @@ void ha_ndbcluster::print_results()
DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length());
col= tab->getColumn(f);
fprintf(DBUG_FILE, "%d: %s\t", f, col->getName());
if (value->isNULL())
NdbBlob *ndb_blob= NULL;
if (! (field->flags & BLOB_FLAG))
{
fprintf(DBUG_FILE, "NULL\n");
continue;
if (value.rec->isNULL())
{
fprintf(DBUG_FILE, "NULL\n");
continue;
}
}
else
{
ndb_blob= value.blob;
bool isNull= true;
ndb_blob->getNull(isNull);
if (isNull) {
fprintf(DBUG_FILE, "NULL\n");
continue;
}
}
switch (col->getType()) {
case NdbDictionary::Column::Blob:
case NdbDictionary::Column::Clob:
case NdbDictionary::Column::Undefined:
fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
break;
case NdbDictionary::Column::Tinyint: {
char value= *field->ptr;
fprintf(DBUG_FILE, "Tinyint\t%d", value);
......@@ -1561,6 +1762,21 @@ void ha_ndbcluster::print_results()
fprintf(DBUG_FILE, "Timespec\t%llu", value);
break;
}
case NdbDictionary::Column::Blob: {
Uint64 len= 0;
ndb_blob->getLength(len);
fprintf(DBUG_FILE, "Blob\t[len=%u]", (unsigned)len);
break;
}
case NdbDictionary::Column::Text: {
Uint64 len= 0;
ndb_blob->getLength(len);
fprintf(DBUG_FILE, "Text\t[len=%u]", (unsigned)len);
break;
}
case NdbDictionary::Column::Undefined:
fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
break;
}
fprintf(DBUG_FILE, "\n");
......@@ -1806,7 +2022,7 @@ void ha_ndbcluster::position(const byte *record)
// No primary key, get hidden key
DBUG_PRINT("info", ("Getting hidden key"));
int hidden_no= table->fields;
NdbRecAttr* rec= m_value[hidden_no];
NdbRecAttr* rec= m_value[hidden_no].rec;
const NDBTAB *tab= (NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
DBUG_ASSERT(hidden_col->getPrimaryKey() &&
......@@ -1980,7 +2196,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
const NDBTAB *tab= (NDBTAB *) m_table;
DBUG_ENTER("start_bulk_insert");
DBUG_PRINT("enter", ("rows: %d", rows));
DBUG_PRINT("enter", ("rows: %d", (int)rows));
rows_inserted= 0;
rows_to_insert= rows;
......@@ -2017,7 +2233,7 @@ int ha_ndbcluster::end_bulk_insert()
int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
{
DBUG_ENTER("extra_opt");
DBUG_PRINT("enter", ("cache_size: %d", cache_size));
DBUG_PRINT("enter", ("cache_size: %lu", cache_size));
DBUG_RETURN(extra(operation));
}
......@@ -2238,7 +2454,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
NdbConnection *tablock_trans=
(NdbConnection*)thd->transaction.all.ndb_tid;
DBUG_PRINT("info", ("tablock_trans: %x", tablock_trans));
DBUG_PRINT("info", ("tablock_trans: %x", (uint)tablock_trans));
DBUG_ASSERT(tablock_trans); trans= m_ndb->hupp(tablock_trans);
if (trans == NULL)
ERR_RETURN(m_ndb->getNdbError());
......@@ -2315,71 +2531,184 @@ int ndbcluster_rollback(THD *thd, void *ndb_transaction)
/*
Map MySQL type to the corresponding NDB type
Define NDB column based on Field.
Returns 0 or mysql error code.
Not member of ha_ndbcluster because NDBCOL cannot be declared.
*/
inline NdbDictionary::Column::Type
mysql_to_ndb_type(enum enum_field_types mysql_type, bool unsigned_flg)
static int create_ndb_column(NDBCOL &col,
Field *field,
HA_CREATE_INFO *info)
{
switch(mysql_type) {
// Set name
col.setName(field->field_name);
// Set type and sizes
const enum enum_field_types mysql_type= field->real_type();
switch (mysql_type) {
// Numeric types
case MYSQL_TYPE_DECIMAL:
return NdbDictionary::Column::Char;
col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
case MYSQL_TYPE_TINY:
return (unsigned_flg) ?
NdbDictionary::Column::Tinyunsigned :
NdbDictionary::Column::Tinyint;
if (field->flags & UNSIGNED_FLAG)
col.setType(NDBCOL::Tinyunsigned);
else
col.setType(NDBCOL::Tinyint);
col.setLength(1);
break;
case MYSQL_TYPE_SHORT:
return (unsigned_flg) ?
NdbDictionary::Column::Smallunsigned :
NdbDictionary::Column::Smallint;
if (field->flags & UNSIGNED_FLAG)
col.setType(NDBCOL::Smallunsigned);
else
col.setType(NDBCOL::Smallint);
col.setLength(1);
break;
case MYSQL_TYPE_LONG:
return (unsigned_flg) ?
NdbDictionary::Column::Unsigned :
NdbDictionary::Column::Int;
case MYSQL_TYPE_TIMESTAMP:
return NdbDictionary::Column::Unsigned;
case MYSQL_TYPE_LONGLONG:
return (unsigned_flg) ?
NdbDictionary::Column::Bigunsigned :
NdbDictionary::Column::Bigint;
if (field->flags & UNSIGNED_FLAG)
col.setType(NDBCOL::Unsigned);
else
col.setType(NDBCOL::Int);
col.setLength(1);
break;
case MYSQL_TYPE_INT24:
return (unsigned_flg) ?
NdbDictionary::Column::Mediumunsigned :
NdbDictionary::Column::Mediumint;
if (field->flags & UNSIGNED_FLAG)
col.setType(NDBCOL::Mediumunsigned);
else
col.setType(NDBCOL::Mediumint);
col.setLength(1);
break;
case MYSQL_TYPE_LONGLONG:
if (field->flags & UNSIGNED_FLAG)
col.setType(NDBCOL::Bigunsigned);
else
col.setType(NDBCOL::Bigint);
col.setLength(1);
break;
case MYSQL_TYPE_FLOAT:
return NdbDictionary::Column::Float;
col.setType(NDBCOL::Float);
col.setLength(1);
break;
case MYSQL_TYPE_DOUBLE:
return NdbDictionary::Column::Double;
case MYSQL_TYPE_DATETIME :
return NdbDictionary::Column::Datetime;
case MYSQL_TYPE_DATE :
case MYSQL_TYPE_NEWDATE :
case MYSQL_TYPE_TIME :
case MYSQL_TYPE_YEAR :
// Missing NDB data types, mapped to char
return NdbDictionary::Column::Char;
case MYSQL_TYPE_ENUM :
return NdbDictionary::Column::Char;
case MYSQL_TYPE_SET :
return NdbDictionary::Column::Char;
case MYSQL_TYPE_TINY_BLOB :
case MYSQL_TYPE_MEDIUM_BLOB :
case MYSQL_TYPE_LONG_BLOB :
case MYSQL_TYPE_BLOB :
return NdbDictionary::Column::Blob;
case MYSQL_TYPE_VAR_STRING :
return NdbDictionary::Column::Varchar;
case MYSQL_TYPE_STRING :
return NdbDictionary::Column::Char;
case MYSQL_TYPE_NULL :
case MYSQL_TYPE_GEOMETRY :
return NdbDictionary::Column::Undefined;
}
return NdbDictionary::Column::Undefined;
col.setType(NDBCOL::Double);
col.setLength(1);
break;
// Date types
case MYSQL_TYPE_TIMESTAMP:
col.setType(NDBCOL::Unsigned);
col.setLength(1);
break;
case MYSQL_TYPE_DATETIME:
col.setType(NDBCOL::Datetime);
col.setLength(1);
break;
case MYSQL_TYPE_DATE:
case MYSQL_TYPE_NEWDATE:
case MYSQL_TYPE_TIME:
case MYSQL_TYPE_YEAR:
col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
// Char types
case MYSQL_TYPE_STRING:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Binary);
else
col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
case MYSQL_TYPE_VAR_STRING:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Varbinary);
else
col.setType(NDBCOL::Varchar);
col.setLength(field->pack_length());
break;
// Blob types (all come in as MYSQL_TYPE_BLOB)
mysql_type_tiny_blob:
case MYSQL_TYPE_TINY_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
col.setType(NDBCOL::Text);
col.setInlineSize(256);
// No parts
col.setPartSize(0);
col.setStripeSize(0);
break;
mysql_type_blob:
case MYSQL_TYPE_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
col.setType(NDBCOL::Text);
// Use "<=" even if "<" is the exact condition
if (field->max_length() <= (1 << 8))
goto mysql_type_tiny_blob;
else if (field->max_length() <= (1 << 16))
{
col.setInlineSize(256);
col.setPartSize(2000);
col.setStripeSize(16);
}
else if (field->max_length() <= (1 << 24))
goto mysql_type_medium_blob;
else
goto mysql_type_long_blob;
break;
mysql_type_medium_blob:
case MYSQL_TYPE_MEDIUM_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
col.setType(NDBCOL::Text);
col.setInlineSize(256);
col.setPartSize(4000);
col.setStripeSize(8);
break;
mysql_type_long_blob:
case MYSQL_TYPE_LONG_BLOB:
if (field->flags & BINARY_FLAG)
col.setType(NDBCOL::Blob);
else
col.setType(NDBCOL::Text);
col.setInlineSize(256);
col.setPartSize(8000);
col.setStripeSize(4);
break;
// Other types
case MYSQL_TYPE_ENUM:
col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
case MYSQL_TYPE_SET:
col.setType(NDBCOL::Char);
col.setLength(field->pack_length());
break;
case MYSQL_TYPE_NULL:
case MYSQL_TYPE_GEOMETRY:
goto mysql_type_unsupported;
mysql_type_unsupported:
default:
return HA_ERR_UNSUPPORTED;
}
// Set nullable and pk
col.setNullable(field->maybe_null());
col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
// Set autoincrement
if (field->flags & AUTO_INCREMENT_FLAG)
{
col.setAutoIncrement(TRUE);
ulonglong value= info->auto_increment_value ?
info->auto_increment_value -1 : (ulonglong) 0;
DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value));
col.setAutoIncrementInitialValue(value);
}
else
col.setAutoIncrement(false);
return 0;
}
/*
Create a table in NDB Cluster
*/
......@@ -2389,7 +2718,6 @@ int ha_ndbcluster::create(const char *name,
HA_CREATE_INFO *info)
{
NDBTAB tab;
NdbDictionary::Column::Type ndb_type;
NDBCOL col;
uint pack_length, length, i;
const void *data, *pack_data;
......@@ -2420,31 +2748,11 @@ int ha_ndbcluster::create(const char *name,
for (i= 0; i < form->fields; i++)
{
Field *field= form->field[i];
ndb_type= mysql_to_ndb_type(field->real_type(),
field->flags & UNSIGNED_FLAG);
DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
field->field_name, field->real_type(),
field->pack_length()));
col.setName(field->field_name);
col.setType(ndb_type);
if ((ndb_type == NdbDictionary::Column::Char) ||
(ndb_type == NdbDictionary::Column::Varchar))
col.setLength(field->pack_length());
else
col.setLength(1);
col.setNullable(field->maybe_null());
col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
if (field->flags & AUTO_INCREMENT_FLAG)
{
col.setAutoIncrement(TRUE);
ulonglong value= info->auto_increment_value ?
info->auto_increment_value -1 : (ulonglong) 0;
DBUG_PRINT("info", ("Autoincrement key, initial: %d", value));
col.setAutoIncrementInitialValue(value);
}
else
col.setAutoIncrement(false);
if (my_errno= create_ndb_column(col, field, info))
DBUG_RETURN(my_errno);
tab.addColumn(col);
}
......@@ -2712,14 +3020,15 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_table(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ |
HA_NOT_EXACT_COUNT |
HA_NO_PREFIX_CHAR_KEYS |
HA_NO_BLOBS),
HA_NO_PREFIX_CHAR_KEYS),
m_use_write(false),
retrieve_all_fields(FALSE),
rows_to_insert(1),
rows_inserted(0),
bulk_insert_rows(1024),
ops_pending(0)
ops_pending(0),
blobs_buffer(0),
blobs_buffer_size(0)
{
int i;
......@@ -2752,6 +3061,8 @@ ha_ndbcluster::~ha_ndbcluster()
DBUG_ENTER("~ha_ndbcluster");
release_metadata();
my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
blobs_buffer= 0;
// Check for open cursor/transaction
DBUG_ASSERT(m_active_cursor == NULL);
......
......@@ -35,6 +35,7 @@ class NdbRecAttr; // Forward declaration
class NdbResultSet; // Forward declaration
class NdbScanOperation;
class NdbIndexScanOperation;
class NdbBlob;
typedef enum ndb_index_type {
UNDEFINED_INDEX = 0,
......@@ -171,6 +172,7 @@ class ha_ndbcluster: public handler
enum ha_rkey_function find_flag);
int close_scan();
void unpack_record(byte *buf);
int get_ndb_lock_type(enum thr_lock_type type);
void set_dbname(const char *pathname);
void set_tabname(const char *pathname);
......@@ -181,7 +183,9 @@ class ha_ndbcluster: public handler
int set_ndb_key(NdbOperation*, Field *field,
uint fieldnr, const byte* field_ptr);
int set_ndb_value(NdbOperation*, Field *field, uint fieldnr);
int get_ndb_value(NdbOperation*, uint fieldnr, byte *field_ptr);
int get_ndb_value(NdbOperation*, Field *field, uint fieldnr);
friend int ::get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg);
int get_ndb_blobs_value(NdbBlob *last_ndb_blob);
int set_primary_key(NdbOperation *op, const byte *key);
int set_primary_key(NdbOperation *op);
int set_primary_key_from_old_data(NdbOperation *op, const byte *old_data);
......@@ -191,8 +195,8 @@ class ha_ndbcluster: public handler
void print_results();
longlong get_auto_increment();
int ndb_err(NdbConnection*);
bool uses_blob_value(bool all_fields);
private:
int check_ndb_connection();
......@@ -209,13 +213,19 @@ class ha_ndbcluster: public handler
NDB_SHARE *m_share;
NDB_INDEX_TYPE m_indextype[MAX_KEY];
const char* m_unique_index_name[MAX_KEY];
NdbRecAttr *m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
// NdbRecAttr has no reference to blob
typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
bool m_use_write;
bool retrieve_all_fields;
ha_rows rows_to_insert;
ha_rows rows_inserted;
ha_rows bulk_insert_rows;
ha_rows ops_pending;
bool blobs_pending;
// memory for blobs in one tuple
char *blobs_buffer;
uint32 blobs_buffer_size;
};
bool ndbcluster_init(void);
......@@ -231,10 +241,3 @@ int ndbcluster_discover(const char* dbname, const char* name,
int ndbcluster_drop_database(const char* path);
void ndbcluster_print_error(int error, const NdbOperation *error_op);
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment