Commit 0463e1f7 authored by Monty's avatar Monty

Fixed bug in cache:

- The THR_LOCK org_lock must be stored in a shared structure so that all
  instances of a table can use it. Fixed by adding a ha_cache_share object
  that keeps track of this one.
- Fixed wrong test in get_status_and_flush_cache to detect in insert command
- Fixed in get_status_and_flush_cache that we always free the insert lock
  if we don't need it.
parent 0dd14078
......@@ -25,3 +25,20 @@ a b
2 world
3 !
drop table t1;
create table t1 (a int not null , b varchar(10) not null) engine=columnstore_cache;
create table t2 (a int not null , b varchar(10) not null) engine=columnstore_cache;
create table t3 (a int, b varchar(10)) engine=Aria;
insert into t1 values (1,"1"),(2,"2"),(3,"3");
insert into t2 values (10,"1"),(20,"2"),(30,"3");
insert into t3 values (100,"1"),(200,"2"),(300,"3");
insert into t1 select * from t2;
insert into t1 select * from t3;
select count(*), sum(a) from t1;
count(*) sum(a)
9 666
insert into t1 select * from t1 as t4;
flush tables;
select count(*), sum(a) from t1;
count(*) sum(a)
18 1332
drop table t1,t2,t3;
......@@ -22,3 +22,22 @@ delete from t1 where a=5;
select * from t1;
drop table t1;
#
# Test insert select
#
create table t1 (a int not null , b varchar(10) not null) engine=columnstore_cache;
create table t2 (a int not null , b varchar(10) not null) engine=columnstore_cache;
create table t3 (a int, b varchar(10)) engine=Aria;
insert into t1 values (1,"1"),(2,"2"),(3,"3");
insert into t2 values (10,"1"),(20,"2"),(30,"3");
insert into t3 values (100,"1"),(200,"2"),(300,"3");
insert into t1 select * from t2;
insert into t1 select * from t3;
select count(*), sum(a) from t1;
insert into t1 select * from t1 as t4;
# This is required by CVS.
flush tables;
select count(*), sum(a) from t1;
drop table t1,t2,t3;
......@@ -101,14 +101,15 @@ my_bool get_status_and_flush_cache(void *param,
ha_cache *cache= (ha_cache*) param;
int error;
enum_sql_command sql_command= cache->table->in_use->lex->sql_command;
cache->insert_command= (sql_command != SQLCOM_INSERT &&
sql_command != SQLCOM_LOAD);
cache->insert_command= (sql_command == SQLCOM_INSERT &&
sql_command == SQLCOM_LOAD);
/*
Call first the original Aria get_status function
All Aria get_status functions takes Maria handler as the parameter
*/
if (original_get_status)
(*original_get_status)(&cache->cache_handler->file, concurrent_insert);
if (cache->share->org_lock.get_status)
(*cache->share->org_lock.get_status)(&cache->cache_handler->file,
concurrent_insert);
/* If first get_status() call for this table, flush cache if needed */
if (!cache->lock_counter++)
......@@ -124,7 +125,7 @@ my_bool get_status_and_flush_cache(void *param,
}
}
}
else if (!cache->insert_command)
if (!cache->insert_command)
cache->free_locks();
return (0);
......@@ -135,34 +136,99 @@ my_bool get_status_and_flush_cache(void *param,
static my_bool cache_start_trans(void* param)
{
ha_cache *cache= (ha_cache*) param;
return (*cache->org_lock.start_trans)(cache->cache_handler->file);
return (*cache->share->org_lock.start_trans)(cache->cache_handler->file);
}
static void cache_copy_status(void* to, void *from)
{
ha_cache *to_cache= (ha_cache*) to, *from_cache= (ha_cache*) from;
(*to_cache->org_lock.copy_status)(to_cache->cache_handler->file,
from_cache->cache_handler->file);
(*to_cache->share->org_lock.copy_status)(to_cache->cache_handler->file,
from_cache->cache_handler->file);
}
static void cache_update_status(void* param)
{
ha_cache *cache= (ha_cache*) param;
(*cache->org_lock.update_status)(cache->cache_handler->file);
(*cache->share->org_lock.update_status)(cache->cache_handler->file);
}
static void cache_restore_status(void *param)
{
ha_cache *cache= (ha_cache*) param;
(*cache->org_lock.restore_status)(cache->cache_handler->file);
(*cache->share->org_lock.restore_status)(cache->cache_handler->file);
}
static my_bool cache_check_status(void *param)
{
ha_cache *cache= (ha_cache*) param;
return (*cache->org_lock.check_status)(cache->cache_handler->file);
return (*cache->share->org_lock.check_status)(cache->cache_handler->file);
}
/*****************************************************************************
ha_cache_share functions (Common storage for an open cache file)
*****************************************************************************/
static ha_cache_share *cache_share_list= 0;
static PSI_mutex_key key_LOCK_cache_share;
static PSI_mutex_info all_mutexes[]=
{
{ &key_LOCK_cache_share, "LOCK_cache_share", PSI_FLAG_GLOBAL},
};
static mysql_mutex_t LOCK_cache_share;
/*
Find or create a share
*/
ha_cache_share *find_cache_share(const char *name)
{
ha_cache_share *pos, *share;
mysql_mutex_lock(&LOCK_cache_share);
for (pos= cache_share_list; pos; pos= pos->next)
{
if (!strcmp(pos->name, name))
{
mysql_mutex_unlock(&LOCK_cache_share);
return(pos);
}
}
if (!(share= (ha_cache_share*) my_malloc(PSI_NOT_INSTRUMENTED,
sizeof(*share) + strlen(name)+1,
MYF(MY_FAE))))
{
mysql_mutex_unlock(&LOCK_cache_share);
return 0;
}
share->name= (char*) (share+1);
share->open_count= 1;
strmov((char*) share->name, name);
share->next= cache_share_list;
cache_share_list= share;
mysql_mutex_unlock(&LOCK_cache_share);
return share;
}
/*
Decrement open counter and free share if there is no more users
*/
void ha_cache_share::close()
{
ha_cache_share *pos;
mysql_mutex_lock(&LOCK_cache_share);
if (!--open_count)
{
ha_cache_share **prev= &cache_share_list;
for ( ; (pos= *prev) != this; prev= &pos->next)
;
*prev= next;
my_free(this);
}
mysql_mutex_unlock(&LOCK_cache_share);
}
/*****************************************************************************
ha_cache handler functions
*****************************************************************************/
......@@ -171,6 +237,7 @@ ha_cache::ha_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root)
:ha_tina(derived_hton, table_arg)
{
cache_handler= new (mem_root) ha_maria(maria_hton, table_arg);
share= 0;
lock_counter= 0;
}
......@@ -230,6 +297,12 @@ int ha_cache::open(const char *name, int mode, uint open_flags)
if ((error= cache_handler->open(cache_name, mode, open_flags)))
DBUG_RETURN(error);
if (!(share= find_cache_share(name)))
{
cache_handler->close();
DBUG_RETURN(ER_OUTOFMEMORY);
}
/* Fix lock so that it goes through get_status_and_flush() */
THR_LOCK *lock= &cache_handler->file->s->lock;
if (lock->get_status != &get_status_and_flush_cache)
......@@ -237,8 +310,8 @@ int ha_cache::open(const char *name, int mode, uint open_flags)
mysql_mutex_lock(&cache_handler->file->s->intern_lock);
if (lock->get_status != &get_status_and_flush_cache)
{
org_lock= lock[0];
lock->get_status= &get_status_and_flush_cache;
/* Remember original lock. Used by the THR_lock cache functions */
share->org_lock= lock[0];
if (lock->start_trans)
lock->start_trans= &cache_start_trans;
if (lock->copy_status)
......@@ -251,10 +324,11 @@ int ha_cache::open(const char *name, int mode, uint open_flags)
lock->check_status= &cache_check_status;
if (lock->restore_status)
lock->restore_status= &cache_restore_status;
lock->get_status= &get_status_and_flush_cache;
}
cache_handler->file->lock.status_param= (void*) this;
mysql_mutex_unlock(&cache_handler->file->s->intern_lock);
}
cache_handler->file->lock.status_param= (void*) this;
if ((error= parent::open(name, mode, open_flags)))
{
......@@ -268,10 +342,13 @@ int ha_cache::open(const char *name, int mode, uint open_flags)
int ha_cache::close()
{
int error, error2;
ha_cache_share *org_share= share;
DBUG_ENTER("ha_cache::close()");
error= cache_handler->close();
if ((error2= parent::close()))
error= error2;
if (org_share)
org_share->close();
DBUG_RETURN(error);
}
......@@ -482,12 +559,17 @@ static int ha_cache_init(void *p)
{
handlerton *cache_hton;
int error;
uint count;
cache_hton= (handlerton *) p;
cache_hton->create= ha_cache_create_handler;
cache_hton->panic= 0;
cache_hton->flags= HTON_NO_PARTITION;
count= sizeof(all_mutexes)/sizeof(all_mutexes[0]);
mysql_mutex_register("ha_cache", all_mutexes, count);
mysql_mutex_init(key_LOCK_cache_share, &LOCK_cache_share, MY_MUTEX_INIT_FAST);
{
LEX_CSTRING name= { STRING_WITH_LEN("CSV") };
plugin= ha_resolve_by_name(0, &name, 0);
......@@ -507,6 +589,7 @@ static int ha_cache_deinit(void *p)
plugin_unlock(0, plugin);
plugin= 0;
}
mysql_mutex_destroy(&LOCK_cache_share);
return 0;
}
......@@ -548,6 +631,7 @@ void ha_cache::free_locks()
{
/* We don't need to lock cache_handler anymore as it's already flushed */
mysql_mutex_unlock(&cache_handler->file->lock.lock->mutex);
thr_unlock(&cache_handler->file->lock, 0);
/* Restart transaction for columnstore table */
......@@ -556,6 +640,9 @@ void ha_cache::free_locks()
parent::external_lock(table->in_use, F_UNLCK);
parent::external_lock(table->in_use, original_lock_type);
}
/* Needed as we are going back to end of thr_lock() */
mysql_mutex_lock(&cache_handler->file->lock.lock->mutex);
}
/**
......
......@@ -21,6 +21,18 @@
#include "../maria/ha_maria.h"
#include <thr_lock.h>
class ha_cache_share
{
ha_cache_share *next; /* Next open share */
const char *name;
uint open_count;
public:
THR_LOCK org_lock;
friend ha_cache_share *find_cache_share(const char *name);
void close();
};
class ha_cache :public ha_tina
{
typedef ha_tina parent;
......@@ -28,9 +40,9 @@ class ha_cache :public ha_tina
bool insert_command;
public:
THR_LOCK org_lock;
uint lock_counter;
ha_maria *cache_handler;
ha_cache_share *share;
ha_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root);
~ha_cache();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment