Commit 0dd14078 authored by Monty's avatar Monty

Column_store_cache, stage 1

This is implemented on top of csv engine. Should be moved to ColumnStore
ASAP
parent 6af0bd69
......@@ -126,7 +126,7 @@ typedef struct st_thr_lock {
/* write_lock_count is incremented for write locks and reset on read locks */
ulong write_lock_count;
uint read_no_write_count;
void (*get_status)(void*, my_bool); /* When one gets a lock */
my_bool (*get_status)(void*, my_bool);/* Called when one gets a lock */
void (*copy_status)(void*,void*);
void (*update_status)(void*); /* Before release of write */
void (*restore_status)(void*); /* Before release of read */
......
select * from information_schema.engines where engine="columnstore_cache";
ENGINE SUPPORT COMMENT TRANSACTIONS XA SAVEPOINTS
Columnstore_cache YES Insert cache for ColumnStore NO NO NO
create table t1 (a int, b varchar(10)) engine=columnstore_cache;
ERROR 42000: The storage engine for the table doesn't support nullable columns
create table t1 (a int not null, b varchar(10) not null) engine=columnstore_cache;
select * from t1;
a b
insert into t1 values(1,"hello");
insert into t1 values(2,"world"),(3,"!");
select * from t1;
a b
1 hello
2 world
3 !
insert into t1 values(4,"qqq"), (5,"to_be_deleted");
Warnings:
Warning 1406 Data too long for column 'b' at row 2
update t1 set b="updated" where a=4;
delete from t1 where a=5;
select * from t1;
a b
4 updated
1 hello
2 world
3 !
drop table t1;
--source "have_cache.inc"
#
# Basic test for columnstore_cache engine
#
select * from information_schema.engines where engine="columnstore_cache";
# Table doesn't support nullable columns
--error ER_CHECK_NOT_IMPLEMENTED
create table t1 (a int, b varchar(10)) engine=columnstore_cache;
create table t1 (a int not null, b varchar(10) not null) engine=columnstore_cache;
select * from t1;
insert into t1 values(1,"hello");
insert into t1 values(2,"world"),(3,"!");
select * from t1;
insert into t1 values(4,"qqq"), (5,"to_be_deleted");
update t1 set b="updated" where a=4;
delete from t1 where a=5;
select * from t1;
drop table t1;
if (!`SELECT count(*) FROM information_schema.engines WHERE
(support = 'YES' OR support = 'DEFAULT') AND
engine = 'columnstore_cache'`){
skip Need ColumnStore_Cache engine;
}
......@@ -634,9 +634,10 @@ wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data,
else
{
result= THR_LOCK_SUCCESS;
if (data->lock->get_status)
(*data->lock->get_status)(data->status_param,
data->type == TL_WRITE_CONCURRENT_INSERT);
if (data->lock->get_status &&
(*data->lock->get_status)(data->status_param,
data->type == TL_WRITE_CONCURRENT_INSERT))
result= THR_LOCK_ABORTED;
check_locks(data->lock,"got wait_for_lock", data->type, 0);
}
mysql_mutex_unlock(&data->lock->mutex);
......@@ -811,8 +812,8 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout)
if (lock_type == TL_READ_NO_INSERT)
lock->read_no_write_count++;
check_locks(lock,"read lock with old write lock", lock_type, 0);
if (lock->get_status)
(*lock->get_status)(data->status_param, 0);
if ((lock->get_status) && (*lock->get_status)(data->status_param, 0))
result= THR_LOCK_ABORTED;
statistic_increment(locks_immediate,&THR_LOCK_lock);
goto end;
}
......@@ -835,8 +836,8 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout)
if (lock_type == TL_READ_NO_INSERT)
lock->read_no_write_count++;
check_locks(lock,"read lock with no write locks", lock_type, 0);
if (lock->get_status)
(*lock->get_status)(data->status_param, 0);
if ((lock->get_status) && (*lock->get_status)(data->status_param, 0))
result= THR_LOCK_ABORTED;
statistic_increment(locks_immediate,&THR_LOCK_lock);
goto end;
}
......@@ -951,9 +952,10 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout)
data->prev=lock->write.last;
lock->write.last= &data->next;
check_locks(lock,"second write lock", lock_type, 0);
if (lock->get_status)
(*lock->get_status)(data->status_param,
lock_type == TL_WRITE_CONCURRENT_INSERT);
if ((lock->get_status) &&
(*lock->get_status)(data->status_param,
lock_type == TL_WRITE_CONCURRENT_INSERT))
result= THR_LOCK_ABORTED;
statistic_increment(locks_immediate,&THR_LOCK_lock);
goto end;
}
......@@ -986,8 +988,9 @@ thr_lock(THR_LOCK_DATA *data, THR_LOCK_INFO *owner, ulong lock_wait_timeout)
(*lock->write.last)=data; /* Add as current write lock */
data->prev=lock->write.last;
lock->write.last= &data->next;
if (lock->get_status)
(*lock->get_status)(data->status_param, concurrent_insert);
if ((lock->get_status) &&
(*lock->get_status)(data->status_param, concurrent_insert))
result= THR_LOCK_ABORTED;
check_locks(lock,"only write lock", lock_type, 0);
statistic_increment(locks_immediate,&THR_LOCK_lock);
goto end;
......@@ -1581,6 +1584,7 @@ my_bool thr_upgrade_write_delay_lock(THR_LOCK_DATA *data,
{
if (!lock->read.data) /* No read locks */
{ /* We have the lock */
/* For this function, get_status is not allowed to fail */
if (data->lock->get_status)
(*data->lock->get_status)(data->status_param, 0);
mysql_mutex_unlock(&lock->mutex);
......@@ -1781,9 +1785,10 @@ static ulong sum=0;
/* The following functions is for WRITE_CONCURRENT_INSERT */
static void test_get_status(void* param __attribute__((unused)),
my_bool concurrent_insert __attribute__((unused)))
static my_bool test_get_status(void* param __attribute__((unused)),
my_bool concurrent_insert __attribute__((unused)))
{
return 0;
}
static void test_update_status(void* param __attribute__((unused)))
......
......@@ -4836,9 +4836,9 @@ class handler :public Sql_alloc
DBUG_ASSERT(!(ha_table_flags() & HA_CAN_REPAIR));
return HA_ADMIN_NOT_IMPLEMENTED;
}
protected:
virtual void start_bulk_insert(ha_rows rows, uint flags) {}
virtual int end_bulk_insert() { return 0; }
protected:
virtual int index_read(uchar * buf, const uchar * key, uint key_len,
enum ha_rkey_function find_flag)
{ return HA_ERR_WRONG_COMMAND; }
......
......@@ -15,3 +15,17 @@
SET(CSV_SOURCES ha_tina.cc ha_tina.h transparent_file.cc transparent_file.h)
MYSQL_ADD_PLUGIN(csv ${CSV_SOURCES} STORAGE_ENGINE MANDATORY)
#
# Cache
#
INCLUDE (CheckIncludeFiles)
SET(CACHE_SOURCES ha_cache.cc)
IF(TARGET cache)
ADD_DEFINITIONS(-DWITH_CACHE_STORAGE_ENGINE)
TARGET_LINK_LIBRARIES(csv cache)
ENDIF()
MYSQL_ADD_PLUGIN(cache ${CACHE_SOURCES} STORAGE_ENGINE MANDATORY)
This diff is collapsed.
#ifndef HA_CACHE_INCLUDED
#define HA_CACHE3_INCLUDED
/* Copyright (C) 2020 MariaDB Corppration AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the
Free Software Foundation, Inc.
51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
*/
#include "ha_tina.h"
#include "../maria/ha_maria.h"
#include <thr_lock.h>
class ha_cache :public ha_tina
{
typedef ha_tina parent;
int original_lock_type;
bool insert_command;
public:
THR_LOCK org_lock;
uint lock_counter;
ha_maria *cache_handler;
ha_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root);
~ha_cache();
/*
The following functions duplicates calls to derived handler and
cache handler
*/
int create(const char *name, TABLE *table_arg,
HA_CREATE_INFO *ha_create_info);
int open(const char *name, int mode, uint open_flags);
int delete_table(const char *name);
int rename_table(const char *from, const char *to);
int delete_all_rows(void);
int close(void);
uint lock_count(void) const;
THR_LOCK_DATA **store_lock(THD *thd,
THR_LOCK_DATA **to,
enum thr_lock_type lock_type);
int external_lock(THD *thd, int lock_type);
int repair(THD *thd, HA_CHECK_OPT *check_opt);
bool is_crashed() const;
/*
Write row uses cache_handler, for normal inserts, otherwise derived
handler
*/
int write_row(const uchar *buf);
void start_bulk_insert(ha_rows rows, uint flags);
int end_bulk_insert();
/* Cache functions */
void free_locks();
bool rows_cached();
int flush_insert_cache();
friend my_bool get_status_and_flush_cache(void *param,
my_bool concurrent_insert);
};
#endif /* HA_S3_INCLUDED */
......@@ -883,10 +883,11 @@ int ha_tina::find_current_row(uchar *buf)
for CSV engine. For more details see mysys/thr_lock.c
*/
void tina_get_status(void* param, my_bool concurrent_insert)
my_bool tina_get_status(void* param, my_bool concurrent_insert)
{
ha_tina *tina= (ha_tina*) param;
tina->get_status();
return 0;
}
void tina_update_status(void* param)
......
......@@ -176,5 +176,6 @@ class ha_tina: public handler
int encode_quote(const uchar *buf);
int find_current_row(uchar *buf);
int chain_append();
friend class ha_cache;
};
......@@ -40,7 +40,9 @@ extern ulonglong maria_recover_options;
class ha_maria :public handler
{
public:
MARIA_HA *file;
private:
ulonglong int_table_flags;
MARIA_RECORD_POS remember_pos;
char *data_file_name, *index_file_name;
......@@ -197,6 +199,7 @@ class ha_maria :public handler
friend ICP_RESULT index_cond_func_maria(void *arg);
friend void reset_thd_trn(THD *thd);
friend class ha_s3;
friend class ha_cache;
};
#endif /* HA_MARIA_INCLUDED */
......@@ -306,7 +306,7 @@ my_bool write_hook_for_file_id(enum translog_record_type type,
my_bool write_hook_for_commit(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
void *hook_arg);
void _ma_block_get_status(void *param, my_bool concurrent_insert);
my_bool _ma_block_get_status(void *param, my_bool concurrent_insert);
my_bool _ma_block_start_trans(void* param);
my_bool _ma_block_start_trans_no_versioning(void *param);
void _ma_block_update_status(void *param);
......
......@@ -2367,7 +2367,7 @@ static int initialize_variables_for_repair(HA_CHECK *param,
/* Repair code relies on share->state.state so we have to update it here */
if (share->lock.update_status)
(*share->lock.update_status)(info);
(*share->lock.update_status)(info->lock.status_param);
bzero((char*) sort_info, sizeof(*sort_info));
bzero((char*) sort_param, sizeof(*sort_param));
......
......@@ -282,7 +282,7 @@ void _ma_reset_state(MARIA_HA *info)
(THR_WRITE_CONCURRENT_INSERT was used)
*/
void _ma_get_status(void* param, my_bool concurrent_insert)
my_bool _ma_get_status(void* param, my_bool concurrent_insert)
{
MARIA_HA *info=(MARIA_HA*) param;
DBUG_ENTER("_ma_get_status");
......@@ -301,7 +301,7 @@ void _ma_get_status(void* param, my_bool concurrent_insert)
info->state= &info->state_save;
info->state->changed= 0;
info->append_insert_at_end= concurrent_insert;
DBUG_VOID_RETURN;
DBUG_RETURN(0);
}
......@@ -359,7 +359,7 @@ void _ma_update_status_with_lock(MARIA_HA *info)
locked= 1;
mysql_mutex_lock(&info->s->lock.mutex);
}
(*info->s->lock.update_status)(info);
(*info->s->lock.update_status)(info->lock.status_param);
if (locked)
mysql_mutex_unlock(&info->s->lock.mutex);
}
......@@ -379,11 +379,12 @@ void _ma_copy_status(void* to, void *from)
}
void _ma_reset_update_flag(void *param,
my_bool concurrent_insert __attribute__((unused)))
my_bool _ma_reset_update_flag(void *param,
my_bool concurrent_insert __attribute__((unused)))
{
MARIA_HA *info=(MARIA_HA*) param;
info->state->changed= 0;
return 0;
}
my_bool _ma_start_trans(void* param)
......@@ -627,7 +628,7 @@ void _ma_remove_table_from_trnman(MARIA_HA *info)
(THR_WRITE_CONCURRENT_INSERT was used)
*/
void _ma_block_get_status(void* param, my_bool concurrent_insert)
my_bool _ma_block_get_status(void* param, my_bool concurrent_insert)
{
MARIA_HA *info=(MARIA_HA*) param;
DBUG_ENTER("_ma_block_get_status");
......@@ -645,7 +646,7 @@ void _ma_block_get_status(void* param, my_bool concurrent_insert)
{
DBUG_ASSERT(info->lock.type != TL_WRITE_CONCURRENT_INSERT);
}
DBUG_VOID_RETURN;
DBUG_RETURN(0);
}
......
......@@ -61,15 +61,15 @@ MARIA_STATE_HISTORY *_ma_remove_not_visible_states(MARIA_STATE_HISTORY
my_bool all,
my_bool trman_is_locked);
void _ma_reset_state(MARIA_HA *info);
void _ma_get_status(void* param, my_bool concurrent_insert);
my_bool _ma_get_status(void* param, my_bool concurrent_insert);
void _ma_update_status(void* param);
void _ma_update_status_with_lock(MARIA_HA *info);
void _ma_restore_status(void *param);
void _ma_copy_status(void* to, void *from);
void _ma_reset_update_flag(void *param, my_bool concurrent_insert);
my_bool _ma_reset_update_flag(void *param, my_bool concurrent_insert);
my_bool _ma_start_trans(void* param);
my_bool _ma_check_status(void *param);
void _ma_block_get_status(void* param, my_bool concurrent_insert);
my_bool _ma_block_get_status(void* param, my_bool concurrent_insert);
void _ma_block_update_status(void *param);
void _ma_block_restore_status(void *param);
my_bool _ma_block_check_status(void *param);
......
......@@ -1361,7 +1361,7 @@ my_bool _ma_cmp_dynamic_unique(MARIA_HA *info, MARIA_UNIQUEDEF *def,
const uchar *record, MARIA_RECORD_POS pos);
my_bool _ma_unique_comp(MARIA_UNIQUEDEF *def, const uchar *a, const uchar *b,
my_bool null_are_equal);
void _ma_get_status(void *param, my_bool concurrent_insert);
my_bool _ma_get_status(void *param, my_bool concurrent_insert);
void _ma_update_status(void *param);
void _ma_restore_status(void *param);
void _ma_copy_status(void *to, void *from);
......
......@@ -284,7 +284,7 @@ int mi_lock_database(MI_INFO *info, int lock_type)
(THR_WRITE_CONCURRENT_INSERT was used)
*/
void mi_get_status(void* param, my_bool concurrent_insert)
my_bool mi_get_status(void* param, my_bool concurrent_insert)
{
MI_INFO *info=(MI_INFO*) param;
DBUG_ENTER("mi_get_status");
......@@ -306,7 +306,7 @@ void mi_get_status(void* param, my_bool concurrent_insert)
info->append_insert_at_end= concurrent_insert;
if (concurrent_insert)
info->s->state.state.uncacheable= TRUE;
DBUG_VOID_RETURN;
DBUG_RETURN(0);
}
......
......@@ -726,7 +726,7 @@ int _mi_cmp_dynamic_unique(MI_INFO *info, MI_UNIQUEDEF *def,
const uchar *record, my_off_t pos);
int mi_unique_comp(MI_UNIQUEDEF *def, const uchar *a, const uchar *b,
my_bool null_are_equal);
void mi_get_status(void *param, my_bool concurrent_insert);
my_bool mi_get_status(void *param, my_bool concurrent_insert);
void mi_update_status(void *param);
void mi_restore_status(void *param);
void mi_copy_status(void *to, void *from);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment