Commit 5329b3a6 authored by unknown's avatar unknown

WL#2269 Enable query cache for NDB part 2

-This is mostly fixes for correct behaviour when using query cache + transactions + the thread that
fetches commit count from NDB at regular intervals. The major fix is to add a
list in thd_ndb, that keeps a list of NDB_SHARE's that were modified by
transaction and then "clearing" them in ndbcluster_commit.


mysql-test/r/ndb_cache2.result:
  Updated test cases for the ndb_util thread, more simultaneous tables and more tesst
mysql-test/t/ndb_cache2.test:
  Updated test cases for the ndb_util thread, more simultaneous tables and more advanced tesst
sql/ha_ndbcluster.cc:
  Add table changed during transaction to list of changed tables in Thd_ndb, this list is then used in ndbcluster_commit to invalidate the cached commit_count in share
  Fix so that ndb_util_thread uses milliseconds "sleeps"
  Changed so that ndb_commit_count uses the commit_count from share if available
sql/ha_ndbcluster.h:
  Add commit_count_lock to NBD_SHARE, use for detecting simultaneous attempts to update commit_count
  Add list of tables changed by transaction to Thd_ndb
  Change check_ndb_connection to take thd as argument, use current_thd as default
  Added m_rows_changed variable to keep track of if this handler has modified any records within the transaction
sql/set_var.cc:
  Change format of code
  Sort sys__ variables in aplha order
parent 82095bc4
drop table if exists t1; drop table if exists t1, t2, t3, t4, t5;
set GLOBAL query_cache_type=on; set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776; set GLOBAL query_cache_size=1355776;
set GLOBAL ndb_cache_check_time=1; set GLOBAL ndb_cache_check_time=100;
reset query cache; reset query cache;
flush status; flush status;
CREATE TABLE t1 ( pk int not null primary key, CREATE TABLE t1 (
a int, b int not null, c varchar(20)) ENGINE=ndbcluster; pk int not null primary key,
a1 int,
b1 int not null,
c1 varchar(20)
) ENGINE=ndb;
CREATE TABLE t2 (
pk int not null primary key,
a2 int,
b2 int not null
) ENGINE=ndb;
CREATE TABLE t3 (
pk int not null primary key,
a3 int,
b3 int not null,
c3 int not null,
d3 varchar(20)
) ENGINE=ndb;
CREATE TABLE t4 (
a4 int,
b4 int not null,
c4 char(20)
) ENGINE=ndbcluster;
CREATE TABLE t5 (
pk int not null primary key,
a5 int,
b5 int not null,
c5 varchar(255)
) ENGINE=ndbcluster;
insert into t1 value (1, 2, 3, 'First row'); insert into t1 value (1, 2, 3, 'First row');
insert into t2 value (1, 2, 3);
insert into t3 value (1, 2, 3, 4, '3 - First row');
insert into t4 value (2, 3, '4 - First row');
insert into t5 value (1, 2, 3, '5 - First row');
select * from t1; select * from t1;
pk a b c pk a1 b1 c1
1 2 3 First row 1 2 3 First row
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
Variable_name Value Variable_name Value
...@@ -20,14 +51,14 @@ show status like "Qcache_hits"; ...@@ -20,14 +51,14 @@ show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 0 Qcache_hits 0
select * from t1; select * from t1;
pk a b c pk a1 b1 c1
1 2 3 First row 1 2 3 First row
show status like "Qcache_hits"; show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 1 Qcache_hits 1
update t1 set a=3 where pk=1; update t1 set a1=3 where pk=1;
select * from t1; select * from t1;
pk a b c pk a1 b1 c1
1 3 3 First row 1 3 3 First row
show status like "Qcache_inserts"; show status like "Qcache_inserts";
Variable_name Value Variable_name Value
...@@ -38,7 +69,7 @@ Qcache_hits 1 ...@@ -38,7 +69,7 @@ Qcache_hits 1
insert into t1 value (2, 7, 8, 'Second row'); insert into t1 value (2, 7, 8, 'Second row');
insert into t1 value (4, 5, 6, 'Fourth row'); insert into t1 value (4, 5, 6, 'Fourth row');
select * from t1 order by pk desc; select * from t1 order by pk desc;
pk a b c pk a1 b1 c1
4 5 6 Fourth row 4 5 6 Fourth row
2 7 8 Second row 2 7 8 Second row
1 3 3 First row 1 3 3 First row
...@@ -49,15 +80,15 @@ show status like "Qcache_hits"; ...@@ -49,15 +80,15 @@ show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 1 Qcache_hits 1
select * from t1 order by pk desc; select * from t1 order by pk desc;
pk a b c pk a1 b1 c1
4 5 6 Fourth row 4 5 6 Fourth row
2 7 8 Second row 2 7 8 Second row
1 3 3 First row 1 3 3 First row
show status like "Qcache_hits"; show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 2 Qcache_hits 2
select * from t1 where b=3; select * from t1 where b1=3;
pk a b c pk a1 b1 c1
1 3 3 First row 1 3 3 First row
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
Variable_name Value Variable_name Value
...@@ -65,44 +96,44 @@ Qcache_queries_in_cache 2 ...@@ -65,44 +96,44 @@ Qcache_queries_in_cache 2
show status like "Qcache_hits"; show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 2 Qcache_hits 2
select * from t1 where b=3; select * from t1 where b1=3;
pk a b c pk a1 b1 c1
1 3 3 First row 1 3 3 First row
show status like "Qcache_hits"; show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 3 Qcache_hits 3
delete from t1 where c='Fourth row'; delete from t1 where c1='Fourth row';
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
Variable_name Value Variable_name Value
Qcache_queries_in_cache 0 Qcache_queries_in_cache 0
select * from t1 where b=3; select * from t1 where b1=3;
pk a b c pk a1 b1 c1
1 3 3 First row 1 3 3 First row
show status like "Qcache_hits"; show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 3 Qcache_hits 3
use test; use test;
select * from t1 order by pk desc; select * from t1 order by pk desc;
pk a b c pk a1 b1 c1
2 7 8 Second row 2 7 8 Second row
1 3 3 First row 1 3 3 First row
select * from t1 where b=3; select * from t1 where b1=3;
pk a b c pk a1 b1 c1
1 3 3 First row 1 3 3 First row
show status like "Qcache_hits"; show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 4 Qcache_hits 4
update t1 set a=4 where b=3; update t1 set a1=4 where b1=3;
use test; use test;
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
Variable_name Value Variable_name Value
Qcache_queries_in_cache 0 Qcache_queries_in_cache 0
select * from t1 order by pk desc; select * from t1 order by pk desc;
pk a b c pk a1 b1 c1
2 7 8 Second row 2 7 8 Second row
1 4 3 First row 1 4 3 First row
select * from t1 order by pk desc; select * from t1 order by pk desc;
pk a b c pk a1 b1 c1
2 7 8 Second row 2 7 8 Second row
1 4 3 First row 1 4 3 First row
show status like "Qcache_inserts"; show status like "Qcache_inserts";
...@@ -112,11 +143,11 @@ show status like "Qcache_hits"; ...@@ -112,11 +143,11 @@ show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 5 Qcache_hits 5
select * from t1 order by pk desc; select * from t1 order by pk desc;
pk a b c pk a1 b1 c1
2 7 8 Second row 2 7 8 Second row
1 4 3 First row 1 4 3 First row
select * from t1 order by pk desc; select * from t1 order by pk desc;
pk a b c pk a1 b1 c1
2 7 8 Second row 2 7 8 Second row
1 4 3 First row 1 4 3 First row
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
...@@ -128,64 +159,463 @@ Qcache_inserts 7 ...@@ -128,64 +159,463 @@ Qcache_inserts 7
show status like "Qcache_hits"; show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 7 Qcache_hits 7
select * from t2;
pk a2 b2
1 2 3
select * from t3;
pk a3 b3 c3 d3
1 2 3 4 3 - First row
select * from t4;
a4 b4 c4
2 3 4 - First row
select * from t5;
pk a5 b5 c5
1 2 3 5 - First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 5
flush status;
begin; begin;
update t1 set a=5 where pk=1; update t1 set a1=5 where pk=1;
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
Variable_name Value Variable_name Value
Qcache_queries_in_cache 0 Qcache_queries_in_cache 4
show status like "Qcache_inserts"; show status like "Qcache_inserts";
Variable_name Value Variable_name Value
Qcache_inserts 7 Qcache_inserts 0
show status like "Qcache_hits"; show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 7 Qcache_hits 0
select * from t1 order by pk desc; select * from t1 order by pk desc;
pk a b c pk a1 b1 c1
2 7 8 Second row 2 7 8 Second row
1 4 3 First row 1 4 3 First row
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
Variable_name Value Variable_name Value
Qcache_queries_in_cache 1 Qcache_queries_in_cache 5
show status like "Qcache_inserts"; show status like "Qcache_inserts";
Variable_name Value Variable_name Value
Qcache_inserts 8 Qcache_inserts 1
show status like "Qcache_hits"; show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 7 Qcache_hits 0
commit; commit;
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
Variable_name Value Variable_name Value
Qcache_queries_in_cache 1 Qcache_queries_in_cache 5
show status like "Qcache_inserts"; show status like "Qcache_inserts";
Variable_name Value Variable_name Value
Qcache_inserts 8 Qcache_inserts 1
show status like "Qcache_hits"; show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 7 Qcache_hits 0
select * from t1 order by pk desc; select * from t1 order by pk desc;
pk a b c pk a1 b1 c1
2 7 8 Second row 2 7 8 Second row
1 5 3 First row 1 5 3 First row
show status like "Qcache_inserts"; show status like "Qcache_inserts";
Variable_name Value Variable_name Value
Qcache_inserts 9 Qcache_inserts 2
show status like "Qcache_hits"; show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 7 Qcache_hits 0
select * from t1 order by pk desc; select * from t1 order by pk desc;
pk a b c pk a1 b1 c1
2 7 8 Second row 2 7 8 Second row
1 5 3 First row 1 5 3 First row
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
Variable_name Value Variable_name Value
Qcache_queries_in_cache 1 Qcache_queries_in_cache 5
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 2
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 1
flush status;
begin;
update t1 set a1=6 where pk=1;
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 4
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 0
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
select * from t1 order by pk desc;
pk a1 b1 c1
2 7 8 Second row
1 5 3 First row
select * from t1 order by pk desc;
pk a1 b1 c1
2 7 8 Second row
1 5 3 First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 5
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 1
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 1
select * from t1 order by pk desc;
pk a1 b1 c1
2 7 8 Second row
1 6 3 First row
select * from t1 order by pk desc;
pk a1 b1 c1
2 7 8 Second row
1 6 3 First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 5
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 1
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 1
commit;
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 5
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 1
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 1
select * from t1 order by pk desc;
pk a1 b1 c1
2 7 8 Second row
1 6 3 First row
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 2
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 1
select * from t1 order by pk desc;
pk a1 b1 c1
2 7 8 Second row
1 6 3 First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 5
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 2
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 2
flush status;
begin;
insert into t1 set pk=5, a1=6, b1=3, c1="New row";
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 4
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 0
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
select * from t1 where pk=5;
pk a1 b1 c1
select * from t1 order by pk desc;
pk a1 b1 c1
2 7 8 Second row
1 6 3 First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 6
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 2
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
select * from t1 where pk=5;
pk a1 b1 c1
5 6 3 New row
select * from t1 where pk=5;
pk a1 b1 c1
5 6 3 New row
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
2 7 8 Second row
1 6 3 First row
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
2 7 8 Second row
1 6 3 First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 6
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 2
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
commit;
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
2 7 8 Second row
1 6 3 First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 5
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 3
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
flush status;
begin;
delete from t1 where pk=2;
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 4
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 0
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
select * from t1 where pk=2;
pk a1 b1 c1
2 7 8 Second row
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
2 7 8 Second row
1 6 3 First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 6
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 2
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
select * from t1 where pk=2;
pk a1 b1 c1
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
1 6 3 First row
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
1 6 3 First row
select * from t1 where pk=2;
pk a1 b1 c1
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 6
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 2
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
commit;
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
1 6 3 First row
select * from t1 where pk=2;
pk a1 b1 c1
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 6
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 4
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
flush status;
begin;
update t1 set a1=9 where pk=1;
update t2 set a2=9 where pk=1;
update t3 set a3=9 where pk=1;
update t4 set a4=9 where a4=2;
update t5 set a5=9 where pk=1;
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 0
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 0
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
1 6 3 First row
select * from t2;
pk a2 b2
1 2 3
select * from t3;
pk a3 b3 c3 d3
1 2 3 4 3 - First row
select * from t4;
a4 b4 c4
2 3 4 - First row
select * from t5;
pk a5 b5 c5
1 2 3 5 - First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 5
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 5
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
1 9 3 First row
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
1 9 3 First row
select * from t2;
pk a2 b2
1 9 3
select * from t3;
pk a3 b3 c3 d3
1 9 3 4 3 - First row
select * from t4;
a4 b4 c4
9 3 4 - First row
select * from t5;
pk a5 b5 c5
1 9 3 5 - First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 5
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 5
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
commit;
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
1 9 3 First row
select * from t2;
pk a2 b2
1 9 3
select * from t3;
pk a3 b3 c3 d3
1 9 3 4 3 - First row
select * from t4;
a4 b4 c4
9 3 4 - First row
select * from t5;
pk a5 b5 c5
1 9 3 5 - First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 5
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 10
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
1 9 3 First row
select * from t2;
pk a2 b2
1 9 3
select * from t3;
pk a3 b3 c3 d3
1 9 3 4 3 - First row
select * from t4;
a4 b4 c4
9 3 4 - First row
select * from t5;
pk a5 b5 c5
1 9 3 5 - First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 5
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 10
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 5
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
1 9 3 First row
select * from t2;
pk a2 b2
1 9 3
select * from t3;
pk a3 b3 c3 d3
1 9 3 4 3 - First row
select * from t4;
a4 b4 c4
9 3 4 - First row
select * from t5;
pk a5 b5 c5
1 9 3 5 - First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 5
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 10
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 10
select * from t1 order by pk desc;
pk a1 b1 c1
5 6 3 New row
1 9 3 First row
select * from t2;
pk a2 b2
1 9 3
select * from t3;
pk a3 b3 c3 d3
1 9 3 4 3 - First row
select * from t4;
a4 b4 c4
9 3 4 - First row
select * from t5;
pk a5 b5 c5
1 9 3 5 - First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 5
show status like "Qcache_inserts"; show status like "Qcache_inserts";
Variable_name Value Variable_name Value
Qcache_inserts 9 Qcache_inserts 10
show status like "Qcache_hits"; show status like "Qcache_hits";
Variable_name Value Variable_name Value
Qcache_hits 8 Qcache_hits 15
drop table t1; drop table t1, t2, t3, t4, t5;
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
Variable_name Value Variable_name Value
Qcache_queries_in_cache 0 Qcache_queries_in_cache 0
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
-- source include/have_ndb.inc -- source include/have_ndb.inc
--disable_warnings --disable_warnings
drop table if exists t1; drop table if exists t1, t2, t3, t4, t5;
--enable_warnings --enable_warnings
...@@ -10,19 +10,47 @@ drop table if exists t1; ...@@ -10,19 +10,47 @@ drop table if exists t1;
set GLOBAL query_cache_type=on; set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776; set GLOBAL query_cache_size=1355776;
# Turn on thread that will fetch commit count for open tables # Turn on thread that will fetch commit count for open tables
set GLOBAL ndb_cache_check_time=1; set GLOBAL ndb_cache_check_time=100;
reset query cache; reset query cache;
flush status; flush status;
# Wait for thread to wake up and start "working" # Create test tables in NDB
sleep 20; CREATE TABLE t1 (
pk int not null primary key,
# Create test table in NDB a1 int,
CREATE TABLE t1 ( pk int not null primary key, b1 int not null,
a int, b int not null, c varchar(20)) ENGINE=ndbcluster; c1 varchar(20)
) ENGINE=ndb;
CREATE TABLE t2 (
pk int not null primary key,
a2 int,
b2 int not null
) ENGINE=ndb;
CREATE TABLE t3 (
pk int not null primary key,
a3 int,
b3 int not null,
c3 int not null,
d3 varchar(20)
) ENGINE=ndb;
CREATE TABLE t4 (
a4 int,
b4 int not null,
c4 char(20)
) ENGINE=ndbcluster;
CREATE TABLE t5 (
pk int not null primary key,
a5 int,
b5 int not null,
c5 varchar(255)
) ENGINE=ndbcluster;
insert into t1 value (1, 2, 3, 'First row'); insert into t1 value (1, 2, 3, 'First row');
insert into t2 value (1, 2, 3);
insert into t3 value (1, 2, 3, 4, '3 - First row');
insert into t4 value (2, 3, '4 - First row');
insert into t5 value (1, 2, 3, '5 - First row');
# Perform one query which should be inerted in query cache # Perform one query which should be inserted in query cache
select * from t1; select * from t1;
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts"; show status like "Qcache_inserts";
...@@ -33,7 +61,7 @@ select * from t1; ...@@ -33,7 +61,7 @@ select * from t1;
show status like "Qcache_hits"; show status like "Qcache_hits";
# Update the table and make sure the correct data is returned # Update the table and make sure the correct data is returned
update t1 set a=3 where pk=1; update t1 set a1=3 where pk=1;
select * from t1; select * from t1;
show status like "Qcache_inserts"; show status like "Qcache_inserts";
show status like "Qcache_hits"; show status like "Qcache_hits";
...@@ -48,18 +76,18 @@ select * from t1 order by pk desc; ...@@ -48,18 +76,18 @@ select * from t1 order by pk desc;
show status like "Qcache_hits"; show status like "Qcache_hits";
# Perform a "new" query and make sure the query cache is not hit # Perform a "new" query and make sure the query cache is not hit
select * from t1 where b=3; select * from t1 where b1=3;
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
show status like "Qcache_hits"; show status like "Qcache_hits";
# Same query again... # Same query again...
select * from t1 where b=3; select * from t1 where b1=3;
show status like "Qcache_hits"; show status like "Qcache_hits";
# Delete from the table # Delete from the table
delete from t1 where c='Fourth row'; delete from t1 where c1='Fourth row';
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
select * from t1 where b=3; select * from t1 where b1=3;
show status like "Qcache_hits"; show status like "Qcache_hits";
# Start another connection and check that the query cache is hit # Start another connection and check that the query cache is hit
...@@ -67,11 +95,11 @@ connect (con1,localhost,root,,); ...@@ -67,11 +95,11 @@ connect (con1,localhost,root,,);
connection con1; connection con1;
use test; use test;
select * from t1 order by pk desc; select * from t1 order by pk desc;
select * from t1 where b=3; select * from t1 where b1=3;
show status like "Qcache_hits"; show status like "Qcache_hits";
# Update the table and switch to other connection # Update the table and switch to other connection
update t1 set a=4 where b=3; update t1 set a1=4 where b1=3;
connect (con2,localhost,root,,); connect (con2,localhost,root,,);
connection con2; connection con2;
use test; use test;
...@@ -87,37 +115,243 @@ show status like "Qcache_queries_in_cache"; ...@@ -87,37 +115,243 @@ show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts"; show status like "Qcache_inserts";
show status like "Qcache_hits"; show status like "Qcache_hits";
# Use transactions and make sure the query cache is not updated until # Load all tables into cache
# transaction is commited select * from t2;
select * from t3;
select * from t4;
select * from t5;
show status like "Qcache_queries_in_cache";
#####################################################################
# Start transaction and perform update
# Switch to other transaction and check that update does not show up
# Switch back and commit transaction
# Switch to other transaction and check that update shows up
#####################################################################
connection con1;
flush status;
begin;
update t1 set a1=5 where pk=1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con2;
select * from t1 order by pk desc;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
commit;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con2;
select * from t1 order by pk desc;
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
select * from t1 order by pk desc;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
#####################################################################
# Start transaction and perform update
# Switch to other transaction and check that update does not show up
# Switch back, perform selects and commit transaction
# Switch to other transaction and check that update shows up
#####################################################################
connection con1;
flush status;
begin;
update t1 set a1=6 where pk=1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con2;
select * from t1 order by pk desc;
select * from t1 order by pk desc;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
# The two queries below will not hit cache since transaction is ongoing
select * from t1 order by pk desc;
select * from t1 order by pk desc;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
commit;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con2;
select * from t1 order by pk desc;
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
select * from t1 order by pk desc;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
#####################################################################
# Start transaction and perform insert
# Switch to other transaction and check that insert does not show up
# Switch back, perform selects and commit transaction
# Switch to other transaction and check that update shows up
#####################################################################
connection con1;
flush status;
begin;
insert into t1 set pk=5, a1=6, b1=3, c1="New row";
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con2;
select * from t1 where pk=5;
select * from t1 order by pk desc;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
# The below four queries will not be cached, trans is ongoing
select * from t1 where pk=5;
select * from t1 where pk=5;
select * from t1 order by pk desc;
select * from t1 order by pk desc;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
commit;
connection con2;
select * from t1 order by pk desc;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
#####################################################################
# Start transaction and perform delete
# Switch to other transaction and check that delete does not show up
# Switch back, perform selects and commit transaction
# Switch to other transaction and check that update shows up
#####################################################################
connection con1;
flush status;
begin; begin;
update t1 set a=5 where pk=1; delete from t1 where pk=2;
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts"; show status like "Qcache_inserts";
show status like "Qcache_hits"; show status like "Qcache_hits";
connection con2; connection con2;
select * from t1 where pk=2;
select * from t1 order by pk desc; select * from t1 order by pk desc;
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts"; show status like "Qcache_inserts";
show status like "Qcache_hits"; show status like "Qcache_hits";
connection con1; connection con1;
# The below four queries will not be cached, trans is ongoing
select * from t1 where pk=2;
select * from t1 order by pk desc;
select * from t1 order by pk desc;
select * from t1 where pk=2;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
commit; commit;
# Sleep to let the query cache thread update commit count
sleep 10; connection con2;
select * from t1 order by pk desc;
select * from t1 where pk=2;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
#####################################################################
# Start a transaction which updates all tables
# Switch to other transaction and check updates does not show up
# Switch back, perform selects and commit transaction
# Switch to other transaction and check that update shows up
#####################################################################
flush status;
begin;
update t1 set a1=9 where pk=1;
update t2 set a2=9 where pk=1;
update t3 set a3=9 where pk=1;
update t4 set a4=9 where a4=2;
update t5 set a5=9 where pk=1;
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts"; show status like "Qcache_inserts";
show status like "Qcache_hits"; show status like "Qcache_hits";
connection con2; connection con2;
select * from t1 order by pk desc; select * from t1 order by pk desc;
select * from t2;
select * from t3;
select * from t4;
select * from t5;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts"; show status like "Qcache_inserts";
show status like "Qcache_hits"; show status like "Qcache_hits";
connection con1; connection con1;
# The below five queries will not be cached, trans is ongoing
select * from t1 order by pk desc;
select * from t1 order by pk desc;
select * from t2;
select * from t3;
select * from t4;
select * from t5;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
commit;
connection con2;
select * from t1 order by pk desc;
select * from t2;
select * from t3;
select * from t4;
select * from t5;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
select * from t1 order by pk desc;
select * from t2;
select * from t3;
select * from t4;
select * from t5;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
select * from t1 order by pk desc;
select * from t2;
select * from t3;
select * from t4;
select * from t5;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con2;
select * from t1 order by pk desc; select * from t1 order by pk desc;
select * from t2;
select * from t3;
select * from t4;
select * from t5;
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts"; show status like "Qcache_inserts";
show status like "Qcache_hits"; show status like "Qcache_hits";
drop table t1; drop table t1, t2, t3, t4, t5;
# There should be no queries in cache, when tables have been dropped
show status like "Qcache_queries_in_cache"; show status like "Qcache_queries_in_cache";
SET GLOBAL query_cache_size=0; SET GLOBAL query_cache_size=0;
......
...@@ -286,7 +286,8 @@ Thd_ndb::~Thd_ndb() ...@@ -286,7 +286,8 @@ Thd_ndb::~Thd_ndb()
{ {
if (ndb) if (ndb)
delete ndb; delete ndb;
ndb= 0; ndb= NULL;
changed_tables.empty();
} }
inline inline
...@@ -1954,7 +1955,7 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1954,7 +1955,7 @@ int ha_ndbcluster::write_row(byte *record)
if (peek_res != HA_ERR_KEY_NOT_FOUND) if (peek_res != HA_ERR_KEY_NOT_FOUND)
DBUG_RETURN(peek_res); DBUG_RETURN(peek_res);
} }
statistic_increment(thd->status_var.ha_write_count, &LOCK_status); statistic_increment(thd->status_var.ha_write_count, &LOCK_status);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
table->timestamp_field->set_time(); table->timestamp_field->set_time();
...@@ -2003,6 +2004,8 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -2003,6 +2004,8 @@ int ha_ndbcluster::write_row(byte *record)
} }
} }
m_rows_changed++;
/* /*
Execute write operation Execute write operation
NOTE When doing inserts with many values in NOTE When doing inserts with many values in
...@@ -2196,6 +2199,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -2196,6 +2199,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
} }
} }
m_rows_changed++;
// Set non-key attribute(s) // Set non-key attribute(s)
for (i= 0; i < table->s->fields; i++) for (i= 0; i < table->s->fields; i++)
{ {
...@@ -2278,7 +2283,9 @@ int ha_ndbcluster::delete_row(const byte *record) ...@@ -2278,7 +2283,9 @@ int ha_ndbcluster::delete_row(const byte *record)
return res; return res;
} }
} }
m_rows_changed++;
// Execute delete operation // Execute delete operation
if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans) != 0) {
no_uncommitted_rows_execute_failure(); no_uncommitted_rows_execute_failure();
...@@ -3181,14 +3188,14 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3181,14 +3188,14 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
Check that this handler instance has a connection Check that this handler instance has a connection
set up to the Ndb object of thd set up to the Ndb object of thd
*/ */
if (check_ndb_connection()) if (check_ndb_connection(thd))
DBUG_RETURN(1); DBUG_RETURN(1);
Thd_ndb *thd_ndb= get_thd_ndb(thd); Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb; Ndb *ndb= thd_ndb->ndb;
DBUG_PRINT("enter", ("transaction.thd_ndb->lock_count: %d", DBUG_PRINT("enter", ("thd: %x, thd_ndb: %x, thd_ndb->lock_count: %d",
thd_ndb->lock_count)); thd, thd_ndb, thd_ndb->lock_count));
if (lock_type != F_UNLCK) if (lock_type != F_UNLCK)
{ {
...@@ -3196,7 +3203,6 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3196,7 +3203,6 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
if (!thd_ndb->lock_count++) if (!thd_ndb->lock_count++)
{ {
PRINT_OPTION_FLAGS(thd); PRINT_OPTION_FLAGS(thd);
if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN | OPTION_TABLE_LOCK))) if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN | OPTION_TABLE_LOCK)))
{ {
// Autocommit transaction // Autocommit transaction
...@@ -3264,9 +3270,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3264,9 +3270,10 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
m_active_trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt; m_active_trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt;
DBUG_ASSERT(m_active_trans); DBUG_ASSERT(m_active_trans);
// Start of transaction // Start of transaction
m_rows_changed= 0;
m_retrieve_all_fields= FALSE; m_retrieve_all_fields= FALSE;
m_retrieve_primary_key= FALSE; m_retrieve_primary_key= FALSE;
m_ops_pending= 0; m_ops_pending= 0;
{ {
NDBDICT *dict= ndb->getDictionary(); NDBDICT *dict= ndb->getDictionary();
const NDBTAB *tab; const NDBTAB *tab;
...@@ -3278,10 +3285,28 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3278,10 +3285,28 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
m_table_info= tab_info; m_table_info= tab_info;
} }
no_uncommitted_rows_init(thd); no_uncommitted_rows_init(thd);
} }
else else
{ {
DBUG_PRINT("info", ("lock_type == F_UNLCK")); DBUG_PRINT("info", ("lock_type == F_UNLCK"));
if (ndb_cache_check_time && m_rows_changed)
{
DBUG_PRINT("info", ("Rows has changed and util thread is running"));
if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
{
DBUG_PRINT("info", ("Add share to list of tables to be invalidated"));
/* NOTE push_back allocates memory using transactions mem_root! */
thd_ndb->changed_tables.push_back(m_share, &thd->transaction.mem_root);
}
pthread_mutex_lock(&m_share->mutex);
DBUG_PRINT("info", ("Invalidating commit_count"));
m_share->commit_count= 0;
m_share->commit_count_lock++;
pthread_mutex_unlock(&m_share->mutex);
}
if (!--thd_ndb->lock_count) if (!--thd_ndb->lock_count)
{ {
DBUG_PRINT("trans", ("Last external_lock")); DBUG_PRINT("trans", ("Last external_lock"));
...@@ -3301,6 +3326,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3301,6 +3326,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
} }
m_table= NULL; m_table= NULL;
m_table_info= NULL; m_table_info= NULL;
/* /*
This is the place to make sure this handler instance This is the place to make sure this handler instance
no longer are connected to the active transaction. no longer are connected to the active transaction.
...@@ -3374,7 +3400,7 @@ int ha_ndbcluster::start_stmt(THD *thd) ...@@ -3374,7 +3400,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
/* /*
Commit a transaction started in NDB Commit a transaction started in NDB
*/ */
int ndbcluster_commit(THD *thd, bool all) int ndbcluster_commit(THD *thd, bool all)
...@@ -3386,7 +3412,7 @@ int ndbcluster_commit(THD *thd, bool all) ...@@ -3386,7 +3412,7 @@ int ndbcluster_commit(THD *thd, bool all)
DBUG_ENTER("ndbcluster_commit"); DBUG_ENTER("ndbcluster_commit");
DBUG_PRINT("transaction",("%s", DBUG_PRINT("transaction",("%s",
trans == thd_ndb->stmt ? trans == thd_ndb->stmt ?
"stmt" : "all")); "stmt" : "all"));
DBUG_ASSERT(ndb && trans); DBUG_ASSERT(ndb && trans);
...@@ -3394,18 +3420,31 @@ int ndbcluster_commit(THD *thd, bool all) ...@@ -3394,18 +3420,31 @@ int ndbcluster_commit(THD *thd, bool all)
{ {
const NdbError err= trans->getNdbError(); const NdbError err= trans->getNdbError();
const NdbOperation *error_op= trans->getNdbErrorOperation(); const NdbOperation *error_op= trans->getNdbErrorOperation();
ERR_PRINT(err); ERR_PRINT(err);
res= ndb_to_mysql_error(&err); res= ndb_to_mysql_error(&err);
if (res != -1) if (res != -1)
ndbcluster_print_error(res, error_op); ndbcluster_print_error(res, error_op);
} }
ndb->closeTransaction(trans); ndb->closeTransaction(trans);
if(all) if(all)
thd_ndb->all= NULL; thd_ndb->all= NULL;
else else
thd_ndb->stmt= NULL; thd_ndb->stmt= NULL;
/* Clear commit_count for tables changed by transaction */
NDB_SHARE* share;
List_iterator_fast<NDB_SHARE> it(thd_ndb->changed_tables);
while ((share= it++))
{
pthread_mutex_lock(&share->mutex);
DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %d ", share->table_name, share->commit_count));
share->commit_count= 0;
share->commit_count_lock++;
pthread_mutex_unlock(&share->mutex);
}
thd_ndb->changed_tables.empty();
DBUG_RETURN(res); DBUG_RETURN(res);
} }
...@@ -3443,6 +3482,9 @@ int ndbcluster_rollback(THD *thd, bool all) ...@@ -3443,6 +3482,9 @@ int ndbcluster_rollback(THD *thd, bool all)
else else
thd_ndb->stmt= NULL; thd_ndb->stmt= NULL;
/* Clear list of tables changed by transaction */
thd_ndb->changed_tables.empty();
DBUG_RETURN(res); DBUG_RETURN(res);
} }
...@@ -4135,6 +4177,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): ...@@ -4135,6 +4177,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_rows_to_insert(1), m_rows_to_insert(1),
m_rows_inserted(0), m_rows_inserted(0),
m_bulk_insert_rows(1024), m_bulk_insert_rows(1024),
m_rows_changed(0),
m_bulk_insert_not_flushed(FALSE), m_bulk_insert_not_flushed(FALSE),
m_ops_pending(0), m_ops_pending(0),
m_skip_auto_increment(TRUE), m_skip_auto_increment(TRUE),
...@@ -4147,9 +4190,9 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): ...@@ -4147,9 +4190,9 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_autoincrement_prefetch(32), m_autoincrement_prefetch(32),
m_transaction_on(TRUE), m_transaction_on(TRUE),
m_multi_cursor(NULL) m_multi_cursor(NULL)
{ {
int i; int i;
DBUG_ENTER("ha_ndbcluster"); DBUG_ENTER("ha_ndbcluster");
m_tabname[0]= '\0'; m_tabname[0]= '\0';
...@@ -4309,9 +4352,8 @@ Ndb* check_ndb_in_thd(THD* thd) ...@@ -4309,9 +4352,8 @@ Ndb* check_ndb_in_thd(THD* thd)
int ha_ndbcluster::check_ndb_connection() int ha_ndbcluster::check_ndb_connection(THD* thd)
{ {
THD* thd= current_thd;
Ndb *ndb; Ndb *ndb;
DBUG_ENTER("check_ndb_connection"); DBUG_ENTER("check_ndb_connection");
...@@ -4385,33 +4427,31 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name, ...@@ -4385,33 +4427,31 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name,
/* /*
Check if a table exists in NDB Check if a table exists in NDB
*/ */
int ndbcluster_table_exists(THD* thd, const char *db, const char *name) int ndbcluster_table_exists(THD* thd, const char *db, const char *name)
{ {
uint len;
const void* data;
const NDBTAB* tab; const NDBTAB* tab;
Ndb* ndb; Ndb* ndb;
DBUG_ENTER("ndbcluster_table_exists"); DBUG_ENTER("ndbcluster_table_exists");
DBUG_PRINT("enter", ("db: %s, name: %s", db, name)); DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
if (!(ndb= check_ndb_in_thd(thd))) if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION); DBUG_RETURN(HA_ERR_NO_CONNECTION);
ndb->setDatabaseName(db); ndb->setDatabaseName(db);
NDBDICT* dict= ndb->getDictionary(); NDBDICT* dict= ndb->getDictionary();
dict->set_local_table_data_size(sizeof(Ndb_table_local_info)); dict->set_local_table_data_size(sizeof(Ndb_table_local_info));
dict->invalidateTable(name); dict->invalidateTable(name);
if (!(tab= dict->getTable(name))) if (!(tab= dict->getTable(name)))
{ {
const NdbError err= dict->getNdbError(); const NdbError err= dict->getNdbError();
if (err.code == 709) if (err.code == 709)
DBUG_RETURN(0); DBUG_RETURN(0);
ERR_RETURN(err); ERR_RETURN(err);
} }
DBUG_PRINT("info", ("Found table %s", tab->getName())); DBUG_PRINT("info", ("Found table %s", tab->getName()));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -4929,38 +4969,65 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname, ...@@ -4929,38 +4969,65 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
{ {
DBUG_ENTER("ndb_get_commitcount"); DBUG_ENTER("ndb_get_commitcount");
char name[FN_REFLEN];
NDB_SHARE *share;
(void)strxnmov(name, FN_REFLEN, "./",dbname,"/",tabname,NullS);
DBUG_PRINT("enter", ("name: %s", name));
pthread_mutex_lock(&ndbcluster_mutex);
if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
(byte*) name,
strlen(name))))
{
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables",
name));
DBUG_RETURN(1);
}
share->use_count++;
pthread_mutex_unlock(&ndbcluster_mutex);
pthread_mutex_lock(&share->mutex);
if (ndb_cache_check_time > 0) if (ndb_cache_check_time > 0)
{ {
/* Use cached commit_count from share */ if (share->commit_count != 0)
char name[FN_REFLEN];
NDB_SHARE *share;
(void)strxnmov(name, FN_REFLEN,
"./",dbname,"/",tabname,NullS);
DBUG_PRINT("info", ("name: %s", name));
pthread_mutex_lock(&ndbcluster_mutex);
if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
(byte*) name,
strlen(name))))
{ {
pthread_mutex_unlock(&ndbcluster_mutex); *commit_count= share->commit_count;
DBUG_RETURN(1); DBUG_PRINT("info", ("Getting commit_count: %llu from share",
share->commit_count));
pthread_mutex_unlock(&share->mutex);
free_share(share);
DBUG_RETURN(0);
} }
*commit_count= share->commit_count;
DBUG_PRINT("info", ("commit_count: %d", *commit_count));
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
} }
DBUG_PRINT("info", ("Get commit_count from NDB"));
/* Get commit_count from NDB */
Ndb *ndb; Ndb *ndb;
if (!(ndb= check_ndb_in_thd(thd))) if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(1); DBUG_RETURN(1);
ndb->setDatabaseName(dbname); ndb->setDatabaseName(dbname);
uint lock= share->commit_count_lock;
pthread_mutex_unlock(&share->mutex);
struct Ndb_statistics stat; struct Ndb_statistics stat;
if (ndb_get_table_statistics(ndb, tabname, &stat)) if (ndb_get_table_statistics(ndb, tabname, &stat))
{
free_share(share);
DBUG_RETURN(1); DBUG_RETURN(1);
*commit_count= stat.commit_count; }
pthread_mutex_lock(&share->mutex);
if(share->commit_count_lock == lock)
{
DBUG_PRINT("info", ("Setting commit_count to %llu", stat.commit_count));
share->commit_count= stat.commit_count;
*commit_count= stat.commit_count;
}
else
{
DBUG_PRINT("info", ("Discarding commit_count, comit_count_lock changed"));
*commit_count= 0;
}
pthread_mutex_unlock(&share->mutex);
free_share(share);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -5007,27 +5074,37 @@ ndbcluster_cache_retrieval_allowed(THD *thd, ...@@ -5007,27 +5074,37 @@ ndbcluster_cache_retrieval_allowed(THD *thd,
char *dbname= full_name; char *dbname= full_name;
char *tabname= dbname+strlen(dbname)+1; char *tabname= dbname+strlen(dbname)+1;
DBUG_PRINT("enter",("dbname=%s, tabname=%s, autocommit=%d", DBUG_PRINT("enter", ("dbname: %s, tabname: %s, is_autocommit: %d",
dbname, tabname, is_autocommit)); dbname, tabname, is_autocommit));
if (!is_autocommit) if (!is_autocommit)
{
DBUG_PRINT("exit", ("No, don't use cache in transaction"));
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
}
if (ndb_get_commitcount(thd, dbname, tabname, &commit_count)) if (ndb_get_commitcount(thd, dbname, tabname, &commit_count))
{ {
*engine_data+= 1; /* invalidate */ *engine_data= 0; /* invalidate */
DBUG_PRINT("exit", ("No, could not retrieve commit_count"));
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
DBUG_PRINT("info", ("*engine_data=%llu, commit_count=%llu", DBUG_PRINT("info", ("*engine_data: %llu, commit_count: %llu",
*engine_data, commit_count)); *engine_data, commit_count));
if (*engine_data != commit_count) if (commit_count == 0)
{ {
*engine_data= commit_count; /* invalidate */ *engine_data= 0; /* invalidate */
DBUG_PRINT("exit",("Do not use cache, commit_count has changed")); DBUG_PRINT("exit", ("No, local commit has been performed"));
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
else if (*engine_data != commit_count)
{
*engine_data= commit_count; /* invalidate */
DBUG_PRINT("exit", ("No, commit_count has changed"));
DBUG_RETURN(FALSE);
}
DBUG_PRINT("exit",("OK to use cache, *engine_data=%llu",*engine_data)); DBUG_PRINT("exit", ("OK to use cache, engine_data: %llu", *engine_data));
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
} }
...@@ -5063,22 +5140,27 @@ ha_ndbcluster::register_query_cache_table(THD *thd, ...@@ -5063,22 +5140,27 @@ ha_ndbcluster::register_query_cache_table(THD *thd,
DBUG_ENTER("ha_ndbcluster::register_query_cache_table"); DBUG_ENTER("ha_ndbcluster::register_query_cache_table");
bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)); bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
DBUG_PRINT("enter",("dbname=%s, tabname=%s, is_autocommit=%d",
m_dbname,m_tabname,is_autocommit)); DBUG_PRINT("enter",("dbname: %s, tabname: %s, is_autocommit: %d",
m_dbname, m_tabname, is_autocommit));
if (!is_autocommit) if (!is_autocommit)
{
DBUG_PRINT("exit", ("Can't register table during transaction"))
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
}
Uint64 commit_count; Uint64 commit_count;
if (ndb_get_commitcount(thd, m_dbname, m_tabname, &commit_count)) if (ndb_get_commitcount(thd, m_dbname, m_tabname, &commit_count))
{ {
*engine_data= 0; *engine_data= 0;
DBUG_PRINT("error", ("Could not get commitcount")) DBUG_PRINT("exit", ("Error, could not get commitcount"))
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
*engine_data= commit_count; *engine_data= commit_count;
*engine_callback= ndbcluster_cache_retrieval_allowed; *engine_callback= ndbcluster_cache_retrieval_allowed;
DBUG_PRINT("exit",("*engine_data=%llu", *engine_data)); DBUG_PRINT("exit", ("commit_count: %llu", commit_count));
DBUG_RETURN(TRUE); DBUG_RETURN(commit_count > 0);
} }
...@@ -5121,14 +5203,21 @@ static NDB_SHARE* get_share(const char *table_name) ...@@ -5121,14 +5203,21 @@ static NDB_SHARE* get_share(const char *table_name)
thr_lock_init(&share->lock); thr_lock_init(&share->lock);
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST); pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
share->commit_count= 0; share->commit_count= 0;
share->commit_count_lock= 0;
}
else
{
DBUG_PRINT("error", ("Failed to alloc share"));
pthread_mutex_unlock(&ndbcluster_mutex);
return 0;
} }
} }
DBUG_PRINT("share",
("table_name: %s, length: %d, use_count: %d, commit_count: %d",
share->table_name, share->table_name_length, share->use_count,
share->commit_count));
share->use_count++; share->use_count++;
DBUG_PRINT("share",
("table_name: %s, length: %d, use_count: %d, commit_count: %d",
share->table_name, share->table_name_length, share->use_count,
share->commit_count));
pthread_mutex_unlock(&ndbcluster_mutex); pthread_mutex_unlock(&ndbcluster_mutex);
return share; return share;
} }
...@@ -5139,7 +5228,7 @@ static void free_share(NDB_SHARE *share) ...@@ -5139,7 +5228,7 @@ static void free_share(NDB_SHARE *share)
pthread_mutex_lock(&ndbcluster_mutex); pthread_mutex_lock(&ndbcluster_mutex);
if (!--share->use_count) if (!--share->use_count)
{ {
hash_delete(&ndbcluster_open_tables, (byte*) share); hash_delete(&ndbcluster_open_tables, (byte*) share);
thr_lock_delete(&share->lock); thr_lock_delete(&share->lock);
pthread_mutex_destroy(&share->mutex); pthread_mutex_destroy(&share->mutex);
my_free((gptr) share, MYF(0)); my_free((gptr) share, MYF(0));
...@@ -5283,6 +5372,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -5283,6 +5372,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
if (check == -1) if (check == -1)
break; break;
Uint32 count= 0;
Uint64 sum_rows= 0; Uint64 sum_rows= 0;
Uint64 sum_commits= 0; Uint64 sum_commits= 0;
Uint64 sum_row_size= 0; Uint64 sum_row_size= 0;
...@@ -5294,6 +5384,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -5294,6 +5384,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
if (sum_row_size < size) if (sum_row_size < size)
sum_row_size= size; sum_row_size= size;
sum_mem+= mem; sum_mem+= mem;
count++;
} }
if (check == -1) if (check == -1)
...@@ -5308,8 +5399,11 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -5308,8 +5399,11 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
ndbstat->row_size= sum_row_size; ndbstat->row_size= sum_row_size;
ndbstat->fragment_memory= sum_mem; ndbstat->fragment_memory= sum_mem;
DBUG_PRINT("exit", ("records: %u commits: %u row_size: %d mem: %d", DBUG_PRINT("exit", ("records: %llu commits: %llu "
sum_rows, sum_commits, sum_row_size, sum_mem)); "row_size: %llu mem: %llu count: %u",
sum_rows, sum_commits, sum_row_size,
sum_mem, count));
DBUG_RETURN(0); DBUG_RETURN(0);
} while(0); } while(0);
...@@ -5739,6 +5833,7 @@ extern "C" pthread_handler_decl(ndb_util_thread_func, ...@@ -5739,6 +5833,7 @@ extern "C" pthread_handler_decl(ndb_util_thread_func,
arg __attribute__((unused))) arg __attribute__((unused)))
{ {
THD *thd; /* needs to be first for thread_stack */ THD *thd; /* needs to be first for thread_stack */
Ndb* ndb;
int error= 0; int error= 0;
struct timespec abstime; struct timespec abstime;
...@@ -5748,12 +5843,13 @@ extern "C" pthread_handler_decl(ndb_util_thread_func, ...@@ -5748,12 +5843,13 @@ extern "C" pthread_handler_decl(ndb_util_thread_func,
thd= new THD; /* note that contructor of THD uses DBUG_ */ thd= new THD; /* note that contructor of THD uses DBUG_ */
THD_CHECK_SENTRY(thd); THD_CHECK_SENTRY(thd);
ndb= new Ndb(g_ndb_cluster_connection, "");
pthread_detach_this_thread(); pthread_detach_this_thread();
ndb_util_thread= pthread_self(); ndb_util_thread= pthread_self();
thd->thread_stack= (char*)&thd; /* remember where our stack is */ thd->thread_stack= (char*)&thd; /* remember where our stack is */
if (thd->store_globals()) if (thd->store_globals() && (ndb->init() != -1))
{ {
thd->cleanup(); thd->cleanup();
delete thd; delete thd;
...@@ -5779,22 +5875,11 @@ extern "C" pthread_handler_decl(ndb_util_thread_func, ...@@ -5779,22 +5875,11 @@ extern "C" pthread_handler_decl(ndb_util_thread_func,
if (ndb_cache_check_time == 0) if (ndb_cache_check_time == 0)
{ {
/* Wake up in 10 seconds to check if value has changed */ /* Wake up in 1 second to check if value has changed */
set_timespec(abstime, 10); set_timespec(abstime, 1);
continue; continue;
} }
/* Set new time to wake up */
struct timeval tv;
gettimeofday(&tv,0);
abstime.tv_sec= tv.tv_sec + (ndb_cache_check_time / 1000);
abstime.tv_nsec= tv.tv_usec * 1000 + (ndb_cache_check_time % 1000);
if (abstime.tv_nsec >= 1000000000)
{
abstime.tv_sec += 1;
abstime.tv_nsec -= 1000000000;
}
/* Lock mutex and fill list with pointers to all open tables */ /* Lock mutex and fill list with pointers to all open tables */
NDB_SHARE *share; NDB_SHARE *share;
pthread_mutex_lock(&ndbcluster_mutex); pthread_mutex_lock(&ndbcluster_mutex);
...@@ -5814,7 +5899,7 @@ extern "C" pthread_handler_decl(ndb_util_thread_func, ...@@ -5814,7 +5899,7 @@ extern "C" pthread_handler_decl(ndb_util_thread_func,
/* Iterate through the open files list */ /* Iterate through the open files list */
List_iterator_fast<NDB_SHARE> it(util_open_tables); List_iterator_fast<NDB_SHARE> it(util_open_tables);
while (share= it++) while ((share= it++))
{ {
/* Split tab- and dbname */ /* Split tab- and dbname */
char buf[FN_REFLEN]; char buf[FN_REFLEN];
...@@ -5825,26 +5910,37 @@ extern "C" pthread_handler_decl(ndb_util_thread_func, ...@@ -5825,26 +5910,37 @@ extern "C" pthread_handler_decl(ndb_util_thread_func,
buf[length-1]= 0; buf[length-1]= 0;
db= buf+dirname_length(buf); db= buf+dirname_length(buf);
DBUG_PRINT("ndb_util_thread", DBUG_PRINT("ndb_util_thread",
("Fetching commit count for: %s, db: %s, tab: %s", ("Fetching commit count for: %s",
share->table_name, db, tabname)); share->table_name));
/* Contact NDB to get commit count for table */ /* Contact NDB to get commit count for table */
g_ndb->setDatabaseName(db); ndb->setDatabaseName(db);
struct Ndb_statistics stat;; struct Ndb_statistics stat;
if(ndb_get_table_statistics(g_ndb, tabname, &stat) == 0)
uint lock;
pthread_mutex_lock(&share->mutex);
lock= share->commit_count_lock;
pthread_mutex_unlock(&share->mutex);
if(ndb_get_table_statistics(ndb, tabname, &stat) == 0)
{ {
DBUG_PRINT("ndb_util_thread", DBUG_PRINT("ndb_util_thread",
("Table: %s, rows: %llu, commit_count: %llu", ("Table: %s, commit_count: %llu, rows: %llu",
share->table_name, stat.row_count, stat.commit_count)); share->table_name, stat.commit_count, stat.row_count));
share->commit_count= stat.commit_count;
} }
else else
{ {
DBUG_PRINT("ndb_util_thread", DBUG_PRINT("ndb_util_thread",
("Error: Could not get commit count for table %s", ("Error: Could not get commit count for table %s",
share->table_name)); share->table_name));
share->commit_count++; /* Invalidate */ stat.commit_count= 0;
} }
pthread_mutex_lock(&share->mutex);
if (share->commit_count_lock == lock)
share->commit_count= stat.commit_count;
pthread_mutex_unlock(&share->mutex);
/* Decrease the use count and possibly free share */ /* Decrease the use count and possibly free share */
free_share(share); free_share(share);
} }
...@@ -5852,6 +5948,26 @@ extern "C" pthread_handler_decl(ndb_util_thread_func, ...@@ -5852,6 +5948,26 @@ extern "C" pthread_handler_decl(ndb_util_thread_func,
/* Clear the list of open tables */ /* Clear the list of open tables */
util_open_tables.empty(); util_open_tables.empty();
/* Calculate new time to wake up */
int secs= 0;
int msecs= ndb_cache_check_time;
struct timeval tick_time;
gettimeofday(&tick_time, 0);
abstime.tv_sec= tick_time.tv_sec;
abstime.tv_nsec= tick_time.tv_usec * 1000;
if(msecs >= 1000){
secs= msecs / 1000;
msecs= msecs % 1000;
}
abstime.tv_sec+= secs;
abstime.tv_nsec+= msecs * 1000000;
if (abstime.tv_nsec >= 1000000000) {
abstime.tv_sec+= 1;
abstime.tv_nsec-= 1000000000;
}
} }
thd->cleanup(); thd->cleanup();
......
...@@ -60,6 +60,7 @@ typedef struct st_ndbcluster_share { ...@@ -60,6 +60,7 @@ typedef struct st_ndbcluster_share {
pthread_mutex_t mutex; pthread_mutex_t mutex;
char *table_name; char *table_name;
uint table_name_length,use_count; uint table_name_length,use_count;
uint commit_count_lock;
ulonglong commit_count; ulonglong commit_count;
} NDB_SHARE; } NDB_SHARE;
...@@ -77,6 +78,7 @@ class Thd_ndb { ...@@ -77,6 +78,7 @@ class Thd_ndb {
NdbTransaction *all; NdbTransaction *all;
NdbTransaction *stmt; NdbTransaction *stmt;
int error; int error;
List<NDB_SHARE> changed_tables;
}; };
class ha_ndbcluster: public handler class ha_ndbcluster: public handler
...@@ -226,7 +228,7 @@ class ha_ndbcluster: public handler ...@@ -226,7 +228,7 @@ class ha_ndbcluster: public handler
char *update_table_comment(const char * comment); char *update_table_comment(const char * comment);
private: private:
int check_ndb_connection(); int check_ndb_connection(THD* thd= current_thd);
NdbTransaction *m_active_trans; NdbTransaction *m_active_trans;
NdbScanOperation *m_active_cursor; NdbScanOperation *m_active_cursor;
...@@ -250,6 +252,7 @@ class ha_ndbcluster: public handler ...@@ -250,6 +252,7 @@ class ha_ndbcluster: public handler
ha_rows m_rows_to_insert; ha_rows m_rows_to_insert;
ha_rows m_rows_inserted; ha_rows m_rows_inserted;
ha_rows m_bulk_insert_rows; ha_rows m_bulk_insert_rows;
ha_rows m_rows_changed;
bool m_bulk_insert_not_flushed; bool m_bulk_insert_not_flushed;
ha_rows m_ops_pending; ha_rows m_ops_pending;
bool m_skip_auto_increment; bool m_skip_auto_increment;
......
...@@ -404,7 +404,7 @@ sys_var_long_ptr sys_innodb_thread_concurrency("innodb_thread_concurrency", ...@@ -404,7 +404,7 @@ sys_var_long_ptr sys_innodb_thread_concurrency("innodb_thread_concurrency",
#ifdef HAVE_NDBCLUSTER_DB #ifdef HAVE_NDBCLUSTER_DB
/* ndb thread specific variable settings */ /* ndb thread specific variable settings */
sys_var_thd_ulong sys_var_thd_ulong
sys_ndb_autoincrement_prefetch_sz("ndb_autoincrement_prefetch_sz", sys_ndb_autoincrement_prefetch_sz("ndb_autoincrement_prefetch_sz",
&SV::ndb_autoincrement_prefetch_sz); &SV::ndb_autoincrement_prefetch_sz);
sys_var_thd_bool sys_var_thd_bool
...@@ -413,7 +413,8 @@ sys_var_thd_bool ...@@ -413,7 +413,8 @@ sys_var_thd_bool
sys_ndb_use_exact_count("ndb_use_exact_count", &SV::ndb_use_exact_count); sys_ndb_use_exact_count("ndb_use_exact_count", &SV::ndb_use_exact_count);
sys_var_thd_bool sys_var_thd_bool
sys_ndb_use_transactions("ndb_use_transactions", &SV::ndb_use_transactions); sys_ndb_use_transactions("ndb_use_transactions", &SV::ndb_use_transactions);
sys_var_long_ptr sys_ndb_cache_check_time("ndb_cache_check_time", &ndb_cache_check_time); sys_var_long_ptr
sys_ndb_cache_check_time("ndb_cache_check_time", &ndb_cache_check_time);
#endif #endif
/* Time/date/datetime formats */ /* Time/date/datetime formats */
...@@ -686,10 +687,10 @@ sys_var *sys_variables[]= ...@@ -686,10 +687,10 @@ sys_var *sys_variables[]=
#endif #endif
#ifdef HAVE_NDBCLUSTER_DB #ifdef HAVE_NDBCLUSTER_DB
&sys_ndb_autoincrement_prefetch_sz, &sys_ndb_autoincrement_prefetch_sz,
&sys_ndb_cache_check_time,
&sys_ndb_force_send, &sys_ndb_force_send,
&sys_ndb_use_exact_count, &sys_ndb_use_exact_count,
&sys_ndb_use_transactions, &sys_ndb_use_transactions,
&sys_ndb_cache_check_time,
#endif #endif
&sys_unique_checks, &sys_unique_checks,
&sys_updatable_views_with_limit, &sys_updatable_views_with_limit,
...@@ -1276,7 +1277,6 @@ static int check_max_delayed_threads(THD *thd, set_var *var) ...@@ -1276,7 +1277,6 @@ static int check_max_delayed_threads(THD *thd, set_var *var)
return 0; return 0;
} }
static void fix_max_connections(THD *thd, enum_var_type type) static void fix_max_connections(THD *thd, enum_var_type type)
{ {
#ifndef EMBEDDED_LIBRARY #ifndef EMBEDDED_LIBRARY
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment