Commit 1daef391 authored by unknown's avatar unknown

WL#2269 Enable query cache for NDB

- Added a thread that fetches commit_count for open tables. This
will mean that NDB will not have to be contacted for every use of a cached query. 


sql/ha_ndbcluster.cc:
  Added a thread that periodically will fetch commit_count 
  for open tables and store that value in share. 
  The commit count value is then used when query cache 
  asks if a cached query can be used. 
  The thread activation interval is regulated by the 
  config variable ndb_cache_check_time, it's default value is 0
  which means that NDB is contacted every time a cached query is reused.
sql/ha_ndbcluster.h:
  Added commit_count to share
  Added ndb_cache_check_time
sql/mysqld.cc:
  Added config variable ndb_cache_check_time
sql/set_var.cc:
  Added config variable ndb_cache_check_time
parent b0786dec
drop table if exists t1;
set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776;
set GLOBAL ndb_cache_check_time=5;
reset query cache;
flush status;
CREATE TABLE t1 ( pk int not null primary key,
a int, b int not null, c varchar(20)) ENGINE=ndbcluster;
insert into t1 value (1, 2, 3, 'First row');
select * from t1;
pk a b c
1 2 3 First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 1
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 1
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
select * from t1;
pk a b c
1 2 3 First row
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 1
update t1 set a=3 where pk=1;
select * from t1;
pk a b c
1 3 3 First row
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 2
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 1
insert into t1 value (2, 7, 8, 'Second row');
insert into t1 value (4, 5, 6, 'Fourth row');
select * from t1;
pk a b c
2 7 8 Second row
4 5 6 Fourth row
1 3 3 First row
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 3
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 1
select * from t1;
pk a b c
2 7 8 Second row
4 5 6 Fourth row
1 3 3 First row
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 2
select * from t1 where b=3;
pk a b c
1 3 3 First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 2
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 2
select * from t1 where b=3;
pk a b c
1 3 3 First row
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 3
delete from t1 where c='Fourth row';
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 0
select * from t1 where b=3;
pk a b c
1 3 3 First row
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 3
use test;
select * from t1;
pk a b c
2 7 8 Second row
1 3 3 First row
select * from t1 where b=3;
pk a b c
1 3 3 First row
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 4
update t1 set a=4 where b=3;
use test;
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 0
select * from t1;
pk a b c
2 7 8 Second row
1 4 3 First row
select * from t1;
pk a b c
2 7 8 Second row
1 4 3 First row
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 7
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 5
select * from t1;
pk a b c
2 7 8 Second row
1 4 3 First row
select * from t1;
pk a b c
2 7 8 Second row
1 4 3 First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 1
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 7
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 7
begin;
update t1 set a=5 where pk=1;
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 0
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 7
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 7
select * from t1;
pk a b c
2 7 8 Second row
1 4 3 First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 1
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 8
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 7
commit;
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 1
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 8
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 7
select * from t1;
pk a b c
2 7 8 Second row
1 5 3 First row
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 9
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 7
select * from t1;
pk a b c
2 7 8 Second row
1 5 3 First row
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 1
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 9
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 8
drop table t1;
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 0
SET GLOBAL query_cache_size=0;
SET GLOBAL ndb_cache_check_time=0;
drop table if exists t1, t2;
set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776;
set GLOBAL ndb_cache_check_time=1;
reset query cache;
flush status;
set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776;
set GLOBAL ndb_cache_check_time=1;
reset query cache;
flush status;
create table t1 (a int) engine=ndbcluster;
create table t2 (a int) engine=ndbcluster;
insert into t1 value (2);
insert into t2 value (3);
select * from t1;
a
2
select * from t2;
a
3
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 2
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 2
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 0
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 0
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
select * from t1;
a
2
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 1
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 1
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
update t1 set a=3 where a=2;
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 2
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 2
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
select * from t1;
a
3
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 2
show status like "Qcache_inserts";
Variable_name Value
Qcache_inserts 3
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
drop table t1, t2;
-- source include/have_query_cache.inc
-- source include/have_ndb.inc
--disable_warnings
drop table if exists t1;
--enable_warnings
# Turn on and reset query cache
set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776;
# Turn on thread that will fetch commit count for open tables
set GLOBAL ndb_cache_check_time=5;
reset query cache;
flush status;
# Wait for thread to wake up and start "working"
sleep 20;
# Create test table in NDB
CREATE TABLE t1 ( pk int not null primary key,
a int, b int not null, c varchar(20)) ENGINE=ndbcluster;
insert into t1 value (1, 2, 3, 'First row');
# Perform one query which should be inerted in query cache
select * from t1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
# Perform the same query and make sure the query cache is hit
select * from t1;
show status like "Qcache_hits";
# Update the table and make sure the correct data is returned
update t1 set a=3 where pk=1;
select * from t1;
show status like "Qcache_inserts";
show status like "Qcache_hits";
# Insert a new record and make sure the correct data is returned
insert into t1 value (2, 7, 8, 'Second row');
insert into t1 value (4, 5, 6, 'Fourth row');
select * from t1;
show status like "Qcache_inserts";
show status like "Qcache_hits";
select * from t1;
show status like "Qcache_hits";
# Perform a "new" query and make sure the query cache is not hit
select * from t1 where b=3;
show status like "Qcache_queries_in_cache";
show status like "Qcache_hits";
# Same query again...
select * from t1 where b=3;
show status like "Qcache_hits";
# Delete from the table
delete from t1 where c='Fourth row';
show status like "Qcache_queries_in_cache";
select * from t1 where b=3;
show status like "Qcache_hits";
# Start another connection and check that the query cache is hit
connect (con1,localhost,root,,);
connection con1;
use test;
select * from t1;
select * from t1 where b=3;
show status like "Qcache_hits";
# Update the table and switch to other connection
update t1 set a=4 where b=3;
connect (con2,localhost,root,,);
connection con2;
use test;
show status like "Qcache_queries_in_cache";
select * from t1;
select * from t1;
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
select * from t1;
select * from t1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
# Use transactions and make sure the query cache is not updated until
# transaction is commited
begin;
update t1 set a=5 where pk=1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con2;
select * from t1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
commit;
# Sleep to let the query cache thread update commit count
sleep 10;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con2;
select * from t1;
show status like "Qcache_inserts";
show status like "Qcache_hits";
connection con1;
select * from t1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
drop table t1;
show status like "Qcache_queries_in_cache";
SET GLOBAL query_cache_size=0;
SET GLOBAL ndb_cache_check_time=0;
-- source include/have_query_cache.inc
-- source include/have_ndb.inc
-- source include/have_multi_ndb.inc
--disable_warnings
drop table if exists t1, t2;
--enable_warnings
# Turn on and reset query cache on server1
connection server1;
set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776;
set GLOBAL ndb_cache_check_time=1;
reset query cache;
flush status;
# Turn on and reset query cache on server2
connection server2;
set GLOBAL query_cache_type=on;
set GLOBAL query_cache_size=1355776;
set GLOBAL ndb_cache_check_time=1;
reset query cache;
flush status;
# Sleep so that the query cache check thread has time to start
sleep 15;
# Create test tables in NDB and load them into cache
# on server1
connection server1;
create table t1 (a int) engine=ndbcluster;
create table t2 (a int) engine=ndbcluster;
insert into t1 value (2);
insert into t2 value (3);
select * from t1;
select * from t2;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
# Connect server2, load table in to cache, then update the table
connection server2;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
select * from t1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
update t1 set a=3 where a=2;
# Sleep so that the query cache check thread has time to run
sleep 5;
# Connect to server1 and check that cache is invalidated
# and correct data is returned
connection server1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
select * from t1;
show status like "Qcache_queries_in_cache";
show status like "Qcache_inserts";
show status like "Qcache_hits";
drop table t1, t2;
...@@ -86,6 +86,12 @@ static int unpackfrm(const void **data, uint *len, ...@@ -86,6 +86,12 @@ static int unpackfrm(const void **data, uint *len,
static int ndb_get_table_statistics(Ndb*, const char *, static int ndb_get_table_statistics(Ndb*, const char *,
Uint64* rows, Uint64* commits); Uint64* rows, Uint64* commits);
// Util thread variables
static pthread_t ndb_util_thread;
pthread_mutex_t LOCK_ndb_util_thread;
pthread_cond_t COND_ndb_util_thread;
extern "C" pthread_handler_decl(ndb_util_thread_func, arg);
ulong ndb_cache_check_time;
/* /*
Dummy buffer to read zero pack_length fields Dummy buffer to read zero pack_length fields
...@@ -3865,6 +3871,7 @@ ha_ndbcluster::~ha_ndbcluster() ...@@ -3865,6 +3871,7 @@ ha_ndbcluster::~ha_ndbcluster()
} }
/* /*
Open a table for further use Open a table for further use
- fetch metadata for this table from NDB - fetch metadata for this table from NDB
...@@ -3963,16 +3970,14 @@ void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb) ...@@ -3963,16 +3970,14 @@ void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
Ndb* check_ndb_in_thd(THD* thd) Ndb* check_ndb_in_thd(THD* thd)
{ {
DBUG_ENTER("check_ndb_in_thd");
Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb; Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
if (!thd_ndb) if (!thd_ndb)
{ {
if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb())) if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
DBUG_RETURN(NULL); return NULL;
thd->transaction.thd_ndb= thd_ndb; thd->transaction.thd_ndb= thd_ndb;
} }
DBUG_RETURN(thd_ndb->ndb); return thd_ndb->ndb;
} }
...@@ -4310,13 +4315,21 @@ bool ndbcluster_init() ...@@ -4310,13 +4315,21 @@ bool ndbcluster_init()
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0, (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
(hash_get_key) ndbcluster_get_key,0,0); (hash_get_key) ndbcluster_get_key,0,0);
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST); pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
pthread_mutex_init(&LOCK_ndb_util_thread,MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND_ndb_util_thread,NULL);
ndbcluster_inited= 1;
#ifdef USE_DISCOVER_ON_STARTUP // Create utility thread
if (ndb_discover_tables() != 0) pthread_t tmp;
if (pthread_create(&tmp,&connection_attrib,ndb_util_thread_func,0))
{
DBUG_PRINT("error", ("Could not create ndb utility thread"));
goto ndbcluster_init_error; goto ndbcluster_init_error;
#endif }
ndbcluster_inited= 1;
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
ndbcluster_init_error: ndbcluster_init_error:
ndbcluster_end(); ndbcluster_end();
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
...@@ -4326,12 +4339,19 @@ bool ndbcluster_init() ...@@ -4326,12 +4339,19 @@ bool ndbcluster_init()
/* /*
End use of the NDB Cluster table handler End use of the NDB Cluster table handler
- free all global variables allocated by - free all global variables allocated by
ndcluster_init() ndbcluster_init()
*/ */
bool ndbcluster_end() bool ndbcluster_end()
{ {
DBUG_ENTER("ndbcluster_end"); DBUG_ENTER("ndbcluster_end");
// Kill ndb utility thread
(void) pthread_mutex_lock(&LOCK_ndb_util_thread);
DBUG_PRINT("exit",("killing ndb util thread: %lx",ndb_util_thread));
(void) pthread_cond_signal(&COND_ndb_util_thread);
(void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
if(g_ndb) if(g_ndb)
delete g_ndb; delete g_ndb;
g_ndb= NULL; g_ndb= NULL;
...@@ -4342,6 +4362,8 @@ bool ndbcluster_end() ...@@ -4342,6 +4362,8 @@ bool ndbcluster_end()
DBUG_RETURN(0); DBUG_RETURN(0);
hash_free(&ndbcluster_open_tables); hash_free(&ndbcluster_open_tables);
pthread_mutex_destroy(&ndbcluster_mutex); pthread_mutex_destroy(&ndbcluster_mutex);
pthread_mutex_destroy(&LOCK_ndb_util_thread);
pthread_cond_destroy(&COND_ndb_util_thread);
ndbcluster_inited= 0; ndbcluster_inited= 0;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -4534,12 +4556,53 @@ const char* ha_ndbcluster::index_type(uint key_number) ...@@ -4534,12 +4556,53 @@ const char* ha_ndbcluster::index_type(uint key_number)
return "HASH"; return "HASH";
} }
} }
uint8 ha_ndbcluster::table_cache_type() uint8 ha_ndbcluster::table_cache_type()
{ {
DBUG_ENTER("ha_ndbcluster::table_cache_type=HA_CACHE_TBL_ASKTRANSACT"); DBUG_ENTER("ha_ndbcluster::table_cache_type=HA_CACHE_TBL_ASKTRANSACT");
DBUG_RETURN(HA_CACHE_TBL_ASKTRANSACT); DBUG_RETURN(HA_CACHE_TBL_ASKTRANSACT);
} }
uint ndb_get_commitcount(THD* thd, char* dbname, char* tabname,
Uint64* commit_count)
{
DBUG_ENTER("ndb_get_commitcount");
if (ndb_cache_check_time > 0)
{
// Use cached commit_count from share
char name[FN_REFLEN];
NDB_SHARE* share;
(void)strxnmov(name, FN_REFLEN,
"./",dbname,"/",tabname,NullS);
DBUG_PRINT("info", ("name: %s", name));
pthread_mutex_lock(&ndbcluster_mutex);
if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
(byte*) name,
strlen(name))))
{
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(1);
}
*commit_count= share->commit_count;
DBUG_PRINT("info", ("commit_count: %d", *commit_count));
pthread_mutex_unlock(&ndbcluster_mutex);
DBUG_RETURN(0);
}
// Get commit_count from NDB
Ndb *ndb;
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(1);
ndb->setDatabaseName(dbname);
if (ndb_get_table_statistics(ndb, tabname, 0, commit_count))
DBUG_RETURN(1);
DBUG_RETURN(0);
}
static static
my_bool my_bool
ndbcluster_cache_retrieval_allowed( ndbcluster_cache_retrieval_allowed(
...@@ -4561,51 +4624,33 @@ ndbcluster_cache_retrieval_allowed( ...@@ -4561,51 +4624,33 @@ ndbcluster_cache_retrieval_allowed(
all cached queries with this table*/ all cached queries with this table*/
{ {
DBUG_ENTER("ndbcluster_cache_retrieval_allowed"); DBUG_ENTER("ndbcluster_cache_retrieval_allowed");
char tabname[128];
char *dbname= full_name; Uint64 commit_count;
my_bool is_autocommit; bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
{ char* dbname= full_name;
int dbname_len= strlen(full_name); char* tabname= dbname+strlen(dbname)+1;
int tabname_len= full_name_len-dbname_len-1;
memcpy(tabname, full_name+dbname_len+1, tabname_len);
tabname[tabname_len]= '\0';
}
if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
is_autocommit = FALSE;
else
is_autocommit = TRUE;
DBUG_PRINT("enter",("dbname=%s, tabname=%s, autocommit=%d", DBUG_PRINT("enter",("dbname=%s, tabname=%s, autocommit=%d",
dbname,tabname,is_autocommit)); dbname, tabname, is_autocommit));
if (!is_autocommit) if (!is_autocommit)
{
DBUG_PRINT("info",("OPTION_NOT_AUTOCOMMIT=%d OPTION_BEGIN=%d",
thd->options & OPTION_NOT_AUTOCOMMIT,
thd->options & OPTION_BEGIN));
// ToDo enable cache inside a transaction
// no need to invalidate though so leave *engine_data
DBUG_RETURN(FALSE);
}
{
Ndb *ndb;
Uint64 commit_count;
if (!(ndb= check_ndb_in_thd(thd)))
{
*engine_data= *engine_data+1; // invalidate
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
}
ndb->setDatabaseName(dbname); if (ndb_get_commitcount(thd, dbname, tabname, &commit_count))
if (ndb_get_table_statistics(ndb, tabname, 0, &commit_count))
{ {
*engine_data= *engine_data+1; // invalidate *engine_data= *engine_data+1; // invalidate
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
DBUG_PRINT("info", ("*engine_data=%llu, commit_count=%llu",
*engine_data, commit_count));
if (*engine_data != commit_count) if (*engine_data != commit_count)
{ {
*engine_data= commit_count; // invalidate *engine_data= commit_count; // invalidate
DBUG_PRINT("exit",("Do not use cache, commit_count has changed"));
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
}
DBUG_PRINT("exit",("*engine_data=%d ok, use cache",*engine_data)); DBUG_PRINT("exit",("OK to use cache, *engine_data=%llu",*engine_data));
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
} }
...@@ -4630,35 +4675,24 @@ ha_ndbcluster::cached_table_registration( ...@@ -4630,35 +4675,24 @@ ha_ndbcluster::cached_table_registration(
invalidate all cached queries with this table*/ invalidate all cached queries with this table*/
{ {
DBUG_ENTER("ha_ndbcluster::cached_table_registration"); DBUG_ENTER("ha_ndbcluster::cached_table_registration");
my_bool is_autocommit;
if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) bool is_autocommit= !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN));
is_autocommit = FALSE;
else
is_autocommit = TRUE;
DBUG_PRINT("enter",("dbname=%s, tabname=%s, is_autocommit=%d", DBUG_PRINT("enter",("dbname=%s, tabname=%s, is_autocommit=%d",
m_dbname,m_tabname,is_autocommit)); m_dbname,m_tabname,is_autocommit));
if (!is_autocommit) if (!is_autocommit)
{
DBUG_PRINT("info",("OPTION_NOT_AUTOCOMMIT=%d OPTION_BEGIN=%d",
thd->options & OPTION_NOT_AUTOCOMMIT,
thd->options & OPTION_BEGIN));
// ToDo enable cache inside a transaction
// no need to invalidate though so leave *engine_data
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
}
{
Uint64 commit_count; Uint64 commit_count;
Ndb *ndb= get_ndb(); if (ndb_get_commitcount(thd, m_dbname, m_tabname, &commit_count))
ndb->setDatabaseName(m_dbname);
if (ndb_get_table_statistics(ndb, m_tabname, 0, &commit_count))
{ {
*engine_data= 0; *engine_data= 0;
DBUG_PRINT("error", ("Could not get commitcount"))
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
*engine_data= commit_count; *engine_data= commit_count;
}
*engine_callback= ndbcluster_cache_retrieval_allowed; *engine_callback= ndbcluster_cache_retrieval_allowed;
DBUG_PRINT("exit",("*engine_data=%d", *engine_data)); DBUG_PRINT("exit",("*engine_data=%llu", *engine_data));
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
} }
...@@ -4700,8 +4734,14 @@ static NDB_SHARE* get_share(const char *table_name) ...@@ -4700,8 +4734,14 @@ static NDB_SHARE* get_share(const char *table_name)
} }
thr_lock_init(&share->lock); thr_lock_init(&share->lock);
pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST); pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
share->commit_count= 0;
} }
} }
DBUG_PRINT("share",
("table_name: %s, length: %d, use_count: %d, commit_count: %d",
share->table_name, share->table_name_length, share->use_count,
share->commit_count));
share->use_count++; share->use_count++;
pthread_mutex_unlock(&ndbcluster_mutex); pthread_mutex_unlock(&ndbcluster_mutex);
return share; return share;
...@@ -4868,10 +4908,10 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -4868,10 +4908,10 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
ndb->closeTransaction(pTrans); ndb->closeTransaction(pTrans);
if(row_count) if(row_count)
* row_count= sum_rows; *row_count= sum_rows;
if(commit_count) if(commit_count)
* commit_count= sum_commits; *commit_count= sum_commits;
DBUG_PRINT("exit", ("records: %u commits: %u", sum_rows, sum_commits)); DBUG_PRINT("exit", ("records: %llu commits: %llu", sum_rows, sum_commits));
DBUG_RETURN(0); DBUG_RETURN(0);
} while(0); } while(0);
...@@ -4906,4 +4946,124 @@ int ha_ndbcluster::write_ndb_file() ...@@ -4906,4 +4946,124 @@ int ha_ndbcluster::write_ndb_file()
DBUG_RETURN(error); DBUG_RETURN(error);
} }
// Utility thread main loop
extern "C" pthread_handler_decl(ndb_util_thread_func,arg __attribute__((unused)))
{
THD *thd; // needs to be first for thread_stack
int error = 0;
struct timespec abstime;
my_thread_init();
DBUG_ENTER("ndb_util_thread");
DBUG_PRINT("enter", ("ndb_cache_check_time: %d", ndb_cache_check_time));
thd= new THD; // note that contructor of THD uses DBUG_ !
THD_CHECK_SENTRY(thd);
pthread_detach_this_thread();
ndb_util_thread = pthread_self();
thd->thread_stack = (char*)&thd; // remember where our stack is
if (thd->store_globals())
{
thd->cleanup();
delete thd;
DBUG_RETURN(NULL);
}
List<NDB_SHARE> util_open_tables;
set_timespec(abstime, ndb_cache_check_time);
for (;;)
{
pthread_mutex_lock(&LOCK_ndb_util_thread);
error= pthread_cond_timedwait(&COND_ndb_util_thread,
&LOCK_ndb_util_thread,
&abstime);
pthread_mutex_unlock(&LOCK_ndb_util_thread);
DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d",
ndb_cache_check_time));
if (abort_loop)
break; // Shutting down server
if (ndb_cache_check_time == 0)
{
set_timespec(abstime, 10);
continue;
}
// Set new time to wake up
set_timespec(abstime, ndb_cache_check_time);
// Lock mutex and fill list with pointers to all open tables
NDB_SHARE *share;
pthread_mutex_lock(&ndbcluster_mutex);
for (uint i= 0; i < ndbcluster_open_tables.records; i++)
{
share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
share->use_count++; // Make sure the table can't be closed
DBUG_PRINT("ndb_util_thread",
("Found open table[%d]: %s, use_count: %d",
i, share->table_name, share->use_count));
// Store pointer to table
util_open_tables.push_back(share);
}
pthread_mutex_unlock(&ndbcluster_mutex);
// Iterate through the open files list
List_iterator_fast<NDB_SHARE> it(util_open_tables);
while (share=it++)
{
// Split tab- and dbname
char buf[FN_REFLEN];
char *tabname, *db;
uint length= dirname_length(share->table_name);
tabname= share->table_name+length;
memcpy(buf, share->table_name, length-1);
buf[length-1]= 0;
db= buf+dirname_length(buf);
DBUG_PRINT("ndb_util_thread",
("Fetching commit count for: %s, db: %s, tab: %s",
share->table_name, db, tabname));
// Contact NDB to get commit count for table
g_ndb->setDatabaseName(db);
Uint64 rows, commit_count;
if(ndb_get_table_statistics(g_ndb, tabname,
&rows, &commit_count) == 0){
DBUG_PRINT("ndb_util_thread",
("Table: %s, rows: %llu, commit_count: %llu",
share->table_name, rows, commit_count));
share->commit_count= commit_count;
}
else
{
DBUG_PRINT("ndb_util_thread",
("Error: Could not get commit count for table %s",
share->table_name));
share->commit_count++; // Invalidate
}
// Decrease the use count and possibly free share
free_share(share);
}
// Clear the list of open tables
util_open_tables.empty();
}
thd->cleanup();
delete thd;
DBUG_PRINT("exit", ("ndb_util_thread"));
my_thread_end();
DBUG_RETURN(NULL);
}
#endif /* HAVE_NDBCLUSTER_DB */ #endif /* HAVE_NDBCLUSTER_DB */
...@@ -38,6 +38,7 @@ class NdbBlob; ...@@ -38,6 +38,7 @@ class NdbBlob;
// connectstring to cluster if given by mysqld // connectstring to cluster if given by mysqld
extern const char *ndbcluster_connectstring; extern const char *ndbcluster_connectstring;
extern ulong ndb_cache_check_time;
typedef enum ndb_index_type { typedef enum ndb_index_type {
UNDEFINED_INDEX = 0, UNDEFINED_INDEX = 0,
...@@ -59,6 +60,7 @@ typedef struct st_ndbcluster_share { ...@@ -59,6 +60,7 @@ typedef struct st_ndbcluster_share {
pthread_mutex_t mutex; pthread_mutex_t mutex;
char *table_name; char *table_name;
uint table_name_length,use_count; uint table_name_length,use_count;
uint commit_count;
} NDB_SHARE; } NDB_SHARE;
/* /*
......
...@@ -284,6 +284,7 @@ my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster; ...@@ -284,6 +284,7 @@ my_bool opt_console= 0, opt_bdb, opt_innodb, opt_isam, opt_ndbcluster;
#ifdef HAVE_NDBCLUSTER_DB #ifdef HAVE_NDBCLUSTER_DB
const char *opt_ndbcluster_connectstring= 0; const char *opt_ndbcluster_connectstring= 0;
my_bool opt_ndb_shm, opt_ndb_optimized_node_selection; my_bool opt_ndb_shm, opt_ndb_optimized_node_selection;
ulong opt_ndb_cache_check_time= 0;
#endif #endif
my_bool opt_readonly, use_temp_pool, relay_log_purge; my_bool opt_readonly, use_temp_pool, relay_log_purge;
my_bool opt_sync_bdb_logs, opt_sync_frm; my_bool opt_sync_bdb_logs, opt_sync_frm;
...@@ -4016,7 +4017,7 @@ enum options_mysqld ...@@ -4016,7 +4017,7 @@ enum options_mysqld
OPT_INNODB, OPT_ISAM, OPT_INNODB, OPT_ISAM,
OPT_NDBCLUSTER, OPT_NDB_CONNECTSTRING, OPT_NDB_USE_EXACT_COUNT, OPT_NDBCLUSTER, OPT_NDB_CONNECTSTRING, OPT_NDB_USE_EXACT_COUNT,
OPT_NDB_FORCE_SEND, OPT_NDB_AUTOINCREMENT_PREFETCH_SZ, OPT_NDB_FORCE_SEND, OPT_NDB_AUTOINCREMENT_PREFETCH_SZ,
OPT_NDB_SHM, OPT_NDB_OPTIMIZED_NODE_SELECTION, OPT_NDB_SHM, OPT_NDB_OPTIMIZED_NODE_SELECTION, OPT_NDB_CACHE_CHECK_TIME,
OPT_SKIP_SAFEMALLOC, OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_TEMP_POOL, OPT_TX_ISOLATION,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS, OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
...@@ -4498,6 +4499,10 @@ Disable with --skip-ndbcluster (will save memory).", ...@@ -4498,6 +4499,10 @@ Disable with --skip-ndbcluster (will save memory).",
(gptr*) &opt_ndb_optimized_node_selection, (gptr*) &opt_ndb_optimized_node_selection,
(gptr*) &opt_ndb_optimized_node_selection, (gptr*) &opt_ndb_optimized_node_selection,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0}, 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
{ "ndb_cache_check_time", OPT_NDB_CACHE_CHECK_TIME,
"A dedicated thread is created to update cached commit count value at the given interval.",
(gptr*) &opt_ndb_cache_check_time, (gptr*) &opt_ndb_cache_check_time, 0, GET_ULONG, REQUIRED_ARG,
0, 0, LONG_TIMEOUT, 0, 1, 0},
#endif #endif
{"new", 'n', "Use very new possible 'unsafe' functions.", {"new", 'n', "Use very new possible 'unsafe' functions.",
(gptr*) &global_system_variables.new_mode, (gptr*) &global_system_variables.new_mode,
......
...@@ -370,6 +370,7 @@ sys_var_thd_bool ...@@ -370,6 +370,7 @@ sys_var_thd_bool
sys_ndb_use_exact_count("ndb_use_exact_count", &SV::ndb_use_exact_count); sys_ndb_use_exact_count("ndb_use_exact_count", &SV::ndb_use_exact_count);
sys_var_thd_bool sys_var_thd_bool
sys_ndb_use_transactions("ndb_use_transactions", &SV::ndb_use_transactions); sys_ndb_use_transactions("ndb_use_transactions", &SV::ndb_use_transactions);
sys_var_long_ptr sys_ndb_cache_check_time("ndb_cache_check_time", &ndb_cache_check_time);
#endif #endif
/* Time/date/datetime formats */ /* Time/date/datetime formats */
...@@ -630,6 +631,7 @@ sys_var *sys_variables[]= ...@@ -630,6 +631,7 @@ sys_var *sys_variables[]=
&sys_ndb_force_send, &sys_ndb_force_send,
&sys_ndb_use_exact_count, &sys_ndb_use_exact_count,
&sys_ndb_use_transactions, &sys_ndb_use_transactions,
&sys_ndb_cache_check_time,
#endif #endif
&sys_unique_checks, &sys_unique_checks,
&sys_warning_count &sys_warning_count
...@@ -797,6 +799,7 @@ struct show_var_st init_vars[]= { ...@@ -797,6 +799,7 @@ struct show_var_st init_vars[]= {
{sys_ndb_force_send.name, (char*) &sys_ndb_force_send, SHOW_SYS}, {sys_ndb_force_send.name, (char*) &sys_ndb_force_send, SHOW_SYS},
{sys_ndb_use_exact_count.name,(char*) &sys_ndb_use_exact_count, SHOW_SYS}, {sys_ndb_use_exact_count.name,(char*) &sys_ndb_use_exact_count, SHOW_SYS},
{sys_ndb_use_transactions.name,(char*) &sys_ndb_use_transactions, SHOW_SYS}, {sys_ndb_use_transactions.name,(char*) &sys_ndb_use_transactions, SHOW_SYS},
{sys_ndb_cache_check_time.name,(char*) &sys_ndb_cache_check_time, SHOW_SYS},
#endif #endif
{sys_net_buffer_length.name,(char*) &sys_net_buffer_length, SHOW_SYS}, {sys_net_buffer_length.name,(char*) &sys_net_buffer_length, SHOW_SYS},
{sys_net_read_timeout.name, (char*) &sys_net_read_timeout, SHOW_SYS}, {sys_net_read_timeout.name, (char*) &sys_net_read_timeout, SHOW_SYS},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment