Commit 63b2a551 authored by bell@sanja.is.com.ua's avatar bell@sanja.is.com.ua

processing trunsactional tables in query cache

parent 12f1cba0
......@@ -36229,9 +36229,8 @@ If a table changes (@code{INSERT}, @code{UPDATE}, @code{DELETE},
then all cached queries that used this table (possibly through a
@code{MRG_MyISAM} table!) become invalid and are removed from the cache.
Currently all @code{InnoDB} tables are invalidated on @code{COMMIT},
in the future this will be changed so only tables changed in the
transaction cause the corresponding cache entries to be invalidated.
Changed transactional @code{InnoDB} tables will be invalidated on
@code{COMMIT}.
A query cannot be cached if it contains one of the functions:
@multitable @columnfractions .25 .25 .25 .25
drop table if exists t1;
drop table if exists t1, t2, t3;
flush status;
set autocommit=0;
create table t1 (a int not null) type=innodb;
insert into t1 values (1),(2),(3);
......@@ -13,3 +14,87 @@ Qcache_queries_in_cache 0
drop table t1;
commit;
set autocommit=1;
begin;
create table t1 (a int not null) type=innodb;
insert into t1 values (1),(2),(3);
select * from t1;
a
1
2
3
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 0
drop table t1;
commit;
create table t1 (a int not null) type=innodb;
create table t2 (a int not null) type=innodb;
create table t3 (a int not null) type=innodb;
insert into t1 values (1),(2);
insert into t2 values (1),(2);
insert into t3 values (1),(2);
select * from t1;
a
1
2
select * from t2;
a
1
2
select * from t3;
a
1
2
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 3
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
begin;
select * from t1;
a
1
2
select * from t2;
a
1
2
select * from t3;
a
1
2
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 3
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
insert into t1 values (3);
insert into t2 values (3);
insert into t1 values (4);
select * from t1;
a
1
2
3
4
select * from t2;
a
1
2
3
select * from t3;
a
1
2
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 3
show status like "Qcache_hits";
Variable_name Value
Qcache_hits 0
commit;
show status like "Qcache_queries_in_cache";
Variable_name Value
Qcache_queries_in_cache 1
......@@ -3,7 +3,8 @@
#
# Without auto_commit.
#
drop table if exists t1;
drop table if exists t1, t2, t3;
flush status;
set autocommit=0;
create table t1 (a int not null) type=innodb;
insert into t1 values (1),(2),(3);
......@@ -12,3 +13,37 @@ show status like "Qcache_queries_in_cache";
drop table t1;
commit;
set autocommit=1;
begin;
create table t1 (a int not null) type=innodb;
insert into t1 values (1),(2),(3);
select * from t1;
show status like "Qcache_queries_in_cache";
drop table t1;
commit;
create table t1 (a int not null) type=innodb;
create table t2 (a int not null) type=innodb;
create table t3 (a int not null) type=innodb;
insert into t1 values (1),(2);
insert into t2 values (1),(2);
insert into t3 values (1),(2);
select * from t1;
select * from t2;
select * from t3;
show status like "Qcache_queries_in_cache";
show status like "Qcache_hits";
begin;
select * from t1;
select * from t2;
select * from t3;
show status like "Qcache_queries_in_cache";
show status like "Qcache_hits";
insert into t1 values (3);
insert into t2 values (3);
insert into t1 values (4);
select * from t1;
select * from t2;
select * from t3;
show status like "Qcache_queries_in_cache";
show status like "Qcache_hits";
commit;
show status like "Qcache_queries_in_cache";
\ No newline at end of file
......@@ -271,7 +271,8 @@ int ha_commit_trans(THD *thd, THD_TRANS* trans)
#ifdef USING_TRANSACTIONS
if (opt_using_transactions)
{
bool operation_done=0;
bool operation_done= 0;
bool transaction_commited= 0;
/* Update the binary log if we have cached some queries */
if (trans == &thd->transaction.all && mysql_bin_log.is_open() &&
my_b_tell(&thd->transaction.trans_log))
......@@ -289,6 +290,8 @@ int ha_commit_trans(THD *thd, THD_TRANS* trans)
my_error(ER_ERROR_DURING_COMMIT, MYF(0), error);
error=1;
}
else
transaction_commited= 1;
trans->bdb_tid=0;
}
#endif
......@@ -302,12 +305,12 @@ int ha_commit_trans(THD *thd, THD_TRANS* trans)
}
trans->innodb_active_trans=0;
if (trans == &thd->transaction.all)
{
query_cache.invalidate(Query_cache_table::INNODB);
operation_done=1;
}
operation_done= transaction_commited= 1;
}
#endif
if (transaction_commited)
query_cache.invalidate(thd->transaction.changed_tables);
if (error && trans == &thd->transaction.all && mysql_bin_log.is_open())
sql_print_error("Error: Got error during commit; Binlog is not up to date!");
thd->tx_isolation=thd->session_tx_isolation;
......
This diff is collapsed.
......@@ -148,17 +148,8 @@ struct Query_cache_query
struct Query_cache_table
{
enum query_cache_table_type {OTHER=0, INNODB=1, TYPES_NUMBER=2};
inline static query_cache_table_type type_convertion(db_type type)
{
return (type == DB_TYPE_INNODB ? INNODB : OTHER);
}
char *tbl;
query_cache_table_type tp;
inline query_cache_table_type type() { return tp; }
inline void type(query_cache_table_type t) { tp = t;}
inline char *db() { return (char *) data(); }
inline char *table() { return tbl; }
inline void table(char *table) { tbl = table; }
......@@ -248,7 +239,7 @@ protected:
byte *cache; // cache memory
Query_cache_block *first_block; // physical location block list
Query_cache_block *queries_blocks; // query list (LIFO)
Query_cache_block *tables_blocks[Query_cache_table::TYPES_NUMBER];
Query_cache_block *tables_blocks;
Query_cache_memory_bin *bins; // free block lists
Query_cache_memory_bin_step *steps; // bins spacing info
......@@ -270,7 +261,8 @@ protected:
Query_cache_block *tail_head);
/* Table key generation */
static uint filename_2_table_key (char *key, const char *filename);
static uint filename_2_table_key (char *key, const char *filename,
uint32 *db_langth);
/* The following functions require that structure_guard_mutex is locked */
void flush_cache();
......@@ -282,13 +274,14 @@ protected:
my_bool first_block);
void invalidate_table(TABLE_LIST *table);
void invalidate_table(TABLE *table);
void invalidate_table(byte *key, uint32 key_length);
void invalidate_table(Query_cache_block *table_block);
my_bool register_all_tables(Query_cache_block *block,
TABLE_LIST *tables_used,
TABLE_COUNTER_TYPE tables);
my_bool insert_table(uint key_len, char *key,
Query_cache_block_table *node,
Query_cache_table::query_cache_table_type type);
uint32 db_length);
void unlink_table(Query_cache_block_table *node);
Query_cache_block *get_free_block (ulong len, my_bool not_less,
ulong min);
......@@ -369,11 +362,10 @@ protected:
int send_result_to_client(THD *thd, char *query, uint query_length);
/* Remove all queries that uses any of the listed following tables */
void invalidate(TABLE_LIST *tables_used);
void invalidate(TABLE *table);
/* Remove all queries that uses tables of pointed type*/
void invalidate(Query_cache_table::query_cache_table_type type);
void invalidate(THD* thd, TABLE_LIST *tables_used,
my_bool using_transactions);
void invalidate(CHANGED_TABLE_LIST *tables_used);
void invalidate(THD* thd, TABLE *table, my_bool using_transactions);
/* Remove all queries that uses any of the tables in following database */
void invalidate(char *db);
......
......@@ -34,6 +34,8 @@
#ifdef __WIN__
#include <io.h>
#endif
#include <mysys_err.h>
#include <assert.h>
/*****************************************************************************
** Instansiate templates
......@@ -105,6 +107,7 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
net.vio=0;
ull=0;
system_thread=cleanup_done=0;
transaction.changed_tables = 0;
#ifdef __WIN__
real_id = 0;
#endif
......@@ -136,6 +139,7 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
/* Initialize sub structures */
bzero((char*) &mem_root,sizeof(mem_root));
bzero((char*) &transaction.mem_root,sizeof(transaction.mem_root));
user_connect=(UC *)0;
hash_init(&user_vars, USER_VARS_HASH_SIZE, 0, 0,
(hash_get_key) get_var_key,
......@@ -211,6 +215,7 @@ THD::~THD()
safeFree(db);
safeFree(ip);
free_root(&mem_root,MYF(0));
free_root(&transaction.mem_root,MYF(0));
mysys_var=0; // Safety (shouldn't be needed)
#ifdef SIGNAL_WITH_VIO_CLOSE
pthread_mutex_destroy(&active_vio_lock);
......@@ -255,6 +260,88 @@ bool THD::store_globals()
my_pthread_setspecific_ptr(THR_NET, &net));
}
/* routings to adding tables to list of changed in transaction tables */
inline static void list_include(CHANGED_TABLE_LIST** prev,
CHANGED_TABLE_LIST* curr,
CHANGED_TABLE_LIST* new_table)
{
if (new_table)
{
*prev = new_table;
(*prev)->next = curr;
}
}
/* add table to list of changed in transaction tables */
void THD::add_changed_table(TABLE *table)
{
DBUG_ENTER("THD::add_changed_table (table)");
DBUG_ASSERT((options & (OPTION_NOT_AUTO_COMMIT | OPTION_BEGIN)) &&
table->file->has_transactions());
CHANGED_TABLE_LIST** prev = &transaction.changed_tables;
CHANGED_TABLE_LIST* curr = transaction.changed_tables;
for(; curr; prev = &(curr->next), curr = curr->next)
{
int cmp = (long)curr->key_length - (long)table->key_length;
if (cmp < 0)
{
list_include(prev, curr, changed_table_dup(table));
DBUG_PRINT("info",
("key_length %u %u", table->key_length, (*prev)->key_length));
DBUG_VOID_RETURN;
}
else if (cmp == 0)
{
cmp = memcmp(curr->key ,table->table_cache_key, curr->key_length);
if (cmp < 0)
{
list_include(prev, curr, changed_table_dup(table));
DBUG_PRINT("info",
("key_length %u %u", table->key_length, (*prev)->key_length));
DBUG_VOID_RETURN;
}
else if (cmp == 0)
{
DBUG_PRINT("info", ("already in list"));
DBUG_VOID_RETURN;
}
}
}
*prev = changed_table_dup(table);
DBUG_PRINT("info", ("key_length %u %u", table->key_length, (*prev)->key_length));
DBUG_VOID_RETURN;
}
CHANGED_TABLE_LIST* THD::changed_table_dup(TABLE *table)
{
CHANGED_TABLE_LIST* new_table =
(CHANGED_TABLE_LIST*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TABLE_LIST))+
table->key_length + 1);
if (!new_table)
{
my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
ALIGN_SIZE(sizeof(TABLE_LIST)) + table->key_length + 1);
killed= 1;
return 0;
}
new_table->key = (char *) (((byte*)new_table)+
ALIGN_SIZE(sizeof(CHANGED_TABLE_LIST)));
new_table->next = 0;
new_table->key_length = table->key_length;
uint32 db_len = ((new_table->table_name =
::strmake(new_table->key, table->table_cache_key,
table->key_length) + 1) - new_table->key);
::memcpy(new_table->key + db_len, table->table_cache_key + db_len,
table->key_length - db_len);
return new_table;
}
/*****************************************************************************
** Functions to provide a interface to select results
*****************************************************************************/
......
......@@ -258,7 +258,7 @@ class THD :public ilink {
public:
NET net; // client connection descriptor
LEX lex; // parse tree descriptor
MEM_ROOT mem_root; // memory allocation pool
MEM_ROOT mem_root; // 1 command-life memory allocation pool
HASH user_vars; // hash for user variables
String packet; // dynamic string buffer used for network I/O
struct sockaddr_in remote; // client socket address
......@@ -326,6 +326,19 @@ public:
THD_TRANS all; // Trans since BEGIN WORK
THD_TRANS stmt; // Trans for current statement
uint bdb_lock_count;
/*
Tables changed in transaction (that must be invalidated in query cache).
List contain only transactional tables, that not invalidated in query
cache (instead of full list of changed in transaction tables).
*/
CHANGED_TABLE_LIST* changed_tables;
MEM_ROOT mem_root; // Transaction-life memory allocation pool
void cleanup()
{
changed_tables = 0;
free_root(&mem_root,MYF(MY_KEEP_PREALLOC));
}
} transaction;
Item *free_list, *handler_items;
CONVERT *convert_set;
......@@ -471,6 +484,12 @@ public:
memcpy(ptr,str,size);
return ptr;
}
inline gptr trans_alloc(unsigned int size)
{
return alloc_root(&transaction.mem_root,size);
}
void add_changed_table(TABLE *table);
CHANGED_TABLE_LIST * changed_table_dup(TABLE *table);
};
......
......@@ -182,7 +182,7 @@ cleanup:
thd->lock=0;
}
if (deleted)
query_cache.invalidate(table_list);
query_cache.invalidate(thd, table_list, 1);
delete select;
if (error >= 0) // Fatal error
send_error(&thd->net,thd->killed ? ER_SERVER_SHUTDOWN: 0);
......@@ -470,7 +470,7 @@ bool multi_delete::send_eof()
VOID(ha_autocommit_or_rollback(thd,error > 0));
}
if (deleted)
query_cache.invalidate(delete_tables);
query_cache.invalidate(thd, delete_tables, 1);
::send_ok(&thd->net,deleted);
return 0;
}
......@@ -548,7 +548,7 @@ int mysql_truncate(THD *thd, TABLE_LIST *table_list, bool dont_send_ok)
bzero((char*) &create_info,sizeof(create_info));
*fn_ext(path)=0; // Remove the .frm extension
error= ha_create_table(path,&create_info,1) ? -1 : 0;
query_cache.invalidate(table_list);
query_cache.invalidate(thd, table_list, 0);
if (!dont_send_ok)
{
......
......@@ -311,7 +311,7 @@ int mysql_insert(THD *thd,TABLE_LIST *table_list, List<Item> &fields,
}
thd->proc_info="end";
if (info.copied || info.deleted)
query_cache.invalidate(table_list);
query_cache.invalidate(thd, table_list, 1);
table->time_stamp=save_time_stamp; // Restore auto timestamp ptr
table->next_number_field=0;
thd->count_cuted_fields=0;
......@@ -1217,7 +1217,7 @@ bool delayed_insert::handle_inserts(void)
sql_print_error("%s",thd.net.last_error);
goto err;
}
query_cache.invalidate(table);
query_cache.invalidate(&thd, table, 1);
if (thr_reschedule_write_lock(*thd.lock->locks))
{
/* This should never happen */
......@@ -1242,7 +1242,7 @@ bool delayed_insert::handle_inserts(void)
sql_print_error("%s",thd.net.last_error);
goto err;
}
query_cache.invalidate(table);
query_cache.invalidate(&thd, table, 1);
pthread_mutex_lock(&mutex);
DBUG_RETURN(0);
......@@ -1330,7 +1330,7 @@ void select_insert::send_error(uint errcode,const char *err)
table->file->activate_all_index(thd);
ha_rollback_stmt(thd);
if (info.copied || info.deleted)
query_cache.invalidate(table);
query_cache.invalidate(thd, table, 1);
}
......@@ -1343,7 +1343,7 @@ bool select_insert::send_eof()
if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error)
error=error2;
if (info.copied || info.deleted)
query_cache.invalidate(table);
query_cache.invalidate(thd, table, 1);
if (error)
{
......
......@@ -48,6 +48,10 @@
#endif /* HAVE_OPENSSL */
#define SCRAMBLE_LENGTH 8
#define MEM_ROOT_BLOCK_SIZE 8192
#define MEM_ROOT_PREALLOC 8192
#define TRANS_MEM_ROOT_BLOCK_SIZE 4096
#define TRANS_MEM_ROOT_PREALLOC 4096
extern int yyparse(void);
extern "C" pthread_mutex_t THR_LOCK_keycache;
......@@ -615,7 +619,9 @@ pthread_handler_decl(handle_one_connection,arg)
thd->command=COM_SLEEP;
thd->version=refresh_version;
thd->set_time();
init_sql_alloc(&thd->mem_root,8192,8192);
init_sql_alloc(&thd->mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
init_sql_alloc(&thd->transaction.mem_root,
TRANS_MEM_ROOT_BLOCK_SIZE, TRANS_MEM_ROOT_PREALLOC);
while (!net->error && net->vio != 0 && !thd->killed)
{
if (do_command(thd))
......@@ -688,7 +694,9 @@ pthread_handler_decl(handle_bootstrap,arg)
thd->priv_user=thd->user=(char*)"boot";
buff= (char*) thd->net.buff;
init_sql_alloc(&thd->mem_root,8192,8192);
init_sql_alloc(&thd->mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
init_sql_alloc(&thd->transaction.mem_root,
TRANS_MEM_ROOT_BLOCK_SIZE, TRANS_MEM_ROOT_PREALLOC);
while (fgets(buff, thd->net.max_packet, file))
{
uint length=(uint) strlen(buff);
......@@ -712,6 +720,7 @@ pthread_handler_decl(handle_bootstrap,arg)
if (thd->fatal_error)
break;
free_root(&thd->mem_root,MYF(MY_KEEP_PREALLOC));
free_root(&thd->transaction.mem_root,MYF(MY_KEEP_PREALLOC));
}
thd->priv_user=thd->user=0;
......@@ -1084,6 +1093,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
close_connection(net);
close_thread_tables(thd); // Free before kill
free_root(&thd->mem_root,MYF(0));
free_root(&thd->transaction.mem_root,MYF(0));
kill_mysql();
error=TRUE;
break;
......@@ -1620,7 +1630,7 @@ mysql_execute_command(void)
goto error;
}
}
query_cache.invalidate(tables);
query_cache.invalidate(thd, tables, 0);
if (end_active_trans(thd))
res= -1;
else if (mysql_rename_tables(thd,tables))
......@@ -1659,7 +1669,7 @@ mysql_execute_command(void)
check_table_access(thd,SELECT_ACL | INSERT_ACL, tables))
goto error; /* purecov: inspected */
res = mysql_repair_table(thd, tables, &lex->check_opt);
query_cache.invalidate(tables);
query_cache.invalidate(thd, tables, 0);
break;
}
case SQLCOM_CHECK:
......@@ -1668,7 +1678,7 @@ mysql_execute_command(void)
check_table_access(thd, SELECT_ACL | EXTRA_ACL , tables))
goto error; /* purecov: inspected */
res = mysql_check_table(thd, tables, &lex->check_opt);
query_cache.invalidate(tables);
query_cache.invalidate(thd, tables, 0);
break;
}
case SQLCOM_ANALYZE:
......@@ -2392,6 +2402,7 @@ mysql_execute_command(void)
}
else
res= -1;
thd->transaction.cleanup();
break;
}
case SQLCOM_ROLLBACK:
......@@ -2406,6 +2417,7 @@ mysql_execute_command(void)
else
res= -1;
thd->options&= ~(ulong) (OPTION_BEGIN | OPTION_STATUS_NO_TRANS_UPDATE);
thd->transaction.cleanup();
break;
default: /* Impossible */
send_ok(&thd->net);
......@@ -3108,7 +3120,22 @@ TABLE_LIST *add_table_to_list(Table_ident *table, LEX_STRING *alias,
if (!(ptr = (TABLE_LIST *) thd->calloc(sizeof(TABLE_LIST))))
DBUG_RETURN(0); /* purecov: inspected */
ptr->db= table->db.str ? table->db.str : (thd->db ? thd->db : (char*) "");
if (table->db.str)
{
ptr->db= table->db.str;
ptr->db_length= table->db.length;
}
else if (thd->db)
{
ptr->db= thd->db;
ptr->db_length= thd->db_length;
}
else
{
ptr->db= (char*) "";
ptr->db_length= 0;
}
ptr->name=alias_str;
if (lower_case_table_names)
{
......@@ -3116,6 +3143,7 @@ TABLE_LIST *add_table_to_list(Table_ident *table, LEX_STRING *alias,
casedn_str(table->table.str);
}
ptr->real_name=table->table.str;
ptr->real_name_length=table->table.length;
ptr->lock_type=flags;
ptr->updating=updating;
if (use_index)
......
......@@ -161,7 +161,7 @@ int mysql_rm_table_part2(THD *thd, TABLE_LIST *tables, bool if_exists,
}
if (some_tables_deleted)
{
query_cache.invalidate(tables);
query_cache.invalidate(thd, tables, 0);
if (!dont_log_query)
{
mysql_update_log.write(thd, thd->query,thd->query_length);
......@@ -1774,7 +1774,7 @@ int mysql_alter_table(THD *thd,char *new_db, char *new_name,
VOID(pthread_cond_broadcast(&COND_refresh));
VOID(pthread_mutex_unlock(&LOCK_open));
table_list->table=0; // For query cache
query_cache.invalidate(table_list);
query_cache.invalidate(thd, table_list, 0);
end_temporary:
sprintf(tmp_name,ER(ER_INSERT_INFO),(ulong) (copied+deleted),
......
......@@ -324,7 +324,8 @@ int mysql_update(THD *thd,
thd->lock=0;
}
if (updated)
query_cache.invalidate(table_list);
query_cache.invalidate(thd, table_list, 1);
delete select;
if (error >= 0)
send_error(&thd->net,thd->killed ? ER_SERVER_SHUTDOWN : 0); /* purecov: inspected */
......@@ -787,7 +788,8 @@ bool multi_update::send_eof()
sprintf(buff,ER(ER_UPDATE_INFO), (long) found, (long) updated,
(long) thd->cuted_fields);
if (updated)
query_cache.invalidate(update_tables);
query_cache.invalidate(thd, update_tables, 1);
::send_ok(&thd->net,
(thd->client_capabilities & CLIENT_FOUND_ROWS) ? found : updated,
thd->insert_id_used ? thd->insert_id() : 0L,buff);
......
......@@ -137,6 +137,7 @@ struct st_table {
typedef struct st_table_list {
struct st_table_list *next;
char *db,*name,*real_name;
uint32 db_length, real_name_length;
Item *on_expr; /* Used with outer join */
struct st_table_list *natural_join; /* natural join on this table*/
List<String> *use_index,*ignore_index;
......@@ -149,6 +150,12 @@ typedef struct st_table_list {
bool shared; /* Used twice in union */
} TABLE_LIST;
typedef struct st_changed_table_list {
struct st_changed_table_list *next;
char *key, *table_name;
uint32 key_length;
} CHANGED_TABLE_LIST;
typedef struct st_open_table_list
{
struct st_open_table_list *next;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment