Commit 8eb16159 authored by Sergey Petrunya's avatar Sergey Petrunya

Cassandra storage engine: BKA support

- We use HA_MRR_NO_ASSOC ("optimizer_switch=join_cache_hashed") mode
- Not able to use BKA's buffers yet.
- There is a variable to control batch size
- There are status counters.
- Nedeed to make some fixes in BKA code (to be checked with Igor)
parent fdab0300
...@@ -79,6 +79,7 @@ Cassandra_row_inserts 8 ...@@ -79,6 +79,7 @@ Cassandra_row_inserts 8
Cassandra_row_insert_batches 7 Cassandra_row_insert_batches 7
CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2'; thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
delete from t1;
INSERT INTO t1 VALUES (1,1),(2,2); INSERT INTO t1 VALUES (1,1),(2,2);
DELETE FROM t1 ORDER BY a LIMIT 1; DELETE FROM t1 ORDER BY a LIMIT 1;
DROP TABLE t1; DROP TABLE t1;
...@@ -92,3 +93,66 @@ show status like 'cassandra_row_insert%'; ...@@ -92,3 +93,66 @@ show status like 'cassandra_row_insert%';
Variable_name Value Variable_name Value
Cassandra_row_inserts 10 Cassandra_row_inserts 10
Cassandra_row_insert_batches 8 Cassandra_row_insert_batches 8
#
# Batched Key Access
#
# Control variable (we are not yet able to make use of MRR's buffer)
show variables like 'cassandra_multi%';
Variable_name Value
cassandra_multiget_batch_size 100
# MRR-related status variables:
show status like 'cassandra_multi%';
Variable_name Value
Cassandra_multiget_reads 0
Cassandra_multiget_keys_scanned 0
Cassandra_multiget_rows_read 0
CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
delete from t1;
INSERT INTO t1 VALUES (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9);
set @tmp_jcl=@@join_cache_level;
set join_cache_level=8;
explain select * from t1 A, t1 B where B.rowkey=A.a;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE A ALL NULL NULL NULL NULL 1000 Using where
1 SIMPLE B eq_ref PRIMARY PRIMARY 8 test.A.a 1 Using join buffer (flat, BKAH join); multiget_slice
select * from t1 A, t1 B where B.rowkey=A.a;
rowkey a rowkey a
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
4 4 4 4
5 5 5 5
6 6 6 6
7 7 7 7
8 8 8 8
9 9 9 9
show status like 'cassandra_multi%';
Variable_name Value
Cassandra_multiget_reads 1
Cassandra_multiget_keys_scanned 10
Cassandra_multiget_rows_read 10
insert into t1 values(1, 8);
insert into t1 values(3, 8);
insert into t1 values(5, 8);
insert into t1 values(7, 8);
select * from t1 A, t1 B where B.rowkey=A.a;
rowkey a rowkey a
0 0 0 0
2 2 2 2
4 4 4 4
6 6 6 6
1 8 8 8
7 8 8 8
8 8 8 8
5 8 8 8
3 8 8 8
9 9 9 9
show status like 'cassandra_multi%';
Variable_name Value
Cassandra_multiget_reads 2
Cassandra_multiget_keys_scanned 16
Cassandra_multiget_rows_read 16
delete from t1;
drop table t1;
...@@ -117,6 +117,7 @@ show status like 'cassandra_row_insert%'; ...@@ -117,6 +117,7 @@ show status like 'cassandra_row_insert%';
CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2'; thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
delete from t1;
INSERT INTO t1 VALUES (1,1),(2,2); INSERT INTO t1 VALUES (1,1),(2,2);
DELETE FROM t1 ORDER BY a LIMIT 1; DELETE FROM t1 ORDER BY a LIMIT 1;
...@@ -127,6 +128,40 @@ show status like 'cassandra_row_insert%'; ...@@ -127,6 +128,40 @@ show status like 'cassandra_row_insert%';
flush status; flush status;
show status like 'cassandra_row_insert%'; show status like 'cassandra_row_insert%';
--echo #
--echo # Batched Key Access
--echo #
--echo # Control variable (we are not yet able to make use of MRR's buffer)
show variables like 'cassandra_multi%';
--echo # MRR-related status variables:
show status like 'cassandra_multi%';
CREATE TABLE t1 (rowkey BIGINT PRIMARY KEY, a BIGINT) ENGINE=CASSANDRA
thrift_host='localhost' keyspace='mariadbtest2' column_family = 'cf2';
delete from t1;
INSERT INTO t1 VALUES (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9);
set @tmp_jcl=@@join_cache_level;
set join_cache_level=8;
explain select * from t1 A, t1 B where B.rowkey=A.a;
select * from t1 A, t1 B where B.rowkey=A.a;
show status like 'cassandra_multi%';
# The following INSERTs are really UPDATEs
insert into t1 values(1, 8);
insert into t1 values(3, 8);
insert into t1 values(5, 8);
insert into t1 values(7, 8);
select * from t1 A, t1 B where B.rowkey=A.a;
show status like 'cassandra_multi%';
delete from t1;
drop table t1;
############################################################################ ############################################################################
## Cassandra cleanup ## Cassandra cleanup
############################################################################ ############################################################################
......
...@@ -3876,8 +3876,11 @@ int JOIN_TAB_SCAN_MRR::next() ...@@ -3876,8 +3876,11 @@ int JOIN_TAB_SCAN_MRR::next()
If a record in in an incremental cache contains no fields then the If a record in in an incremental cache contains no fields then the
association for the last record in cache will be equal to cache->end_pos association for the last record in cache will be equal to cache->end_pos
*/ */
/*
psergey: this makes no sense where HA_MRR_NO_ASSOC is used.
DBUG_ASSERT(cache->buff <= (uchar *) (*ptr) && DBUG_ASSERT(cache->buff <= (uchar *) (*ptr) &&
(uchar *) (*ptr) <= cache->end_pos); (uchar *) (*ptr) <= cache->end_pos);
*/
if (join_tab->table->vfield) if (join_tab->table->vfield)
update_virtual_fields(join->thd, join_tab->table); update_virtual_fields(join->thd, join_tab->table);
} }
...@@ -4543,7 +4546,7 @@ bool JOIN_CACHE_BKAH::prepare_look_for_matches(bool skip_last) ...@@ -4543,7 +4546,7 @@ bool JOIN_CACHE_BKAH::prepare_look_for_matches(bool skip_last)
{ {
last_matching_rec_ref_ptr= next_matching_rec_ref_ptr= 0; last_matching_rec_ref_ptr= next_matching_rec_ref_ptr= 0;
if (no_association && if (no_association &&
(curr_matching_chain= get_matching_chain_by_join_key())) !(curr_matching_chain= get_matching_chain_by_join_key())) //psergey: added '!'
return 1; return 1;
last_matching_rec_ref_ptr= get_next_rec_ref(curr_matching_chain); last_matching_rec_ref_ptr= get_next_rec_ref(curr_matching_chain);
return 0; return 0;
......
...@@ -100,7 +100,18 @@ public: ...@@ -100,7 +100,18 @@ public:
/* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */ /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
void clear_read_columns(); void clear_read_columns();
void add_read_column(const char *name); void add_read_column(const char *name);
/* Reads, MRR scans */
void new_lookup_keys();
int add_lookup_key(const char *key, size_t key_len);
bool multiget_slice();
std::vector<std::string> mrr_keys; /* TODO: can we use allocator to put them onto MRR buffer? */
std::map<std::string, std::vector<ColumnOrSuperColumn> > mrr_result;
std::map<std::string, std::vector<ColumnOrSuperColumn> >::iterator mrr_result_it;
bool get_next_multiget_row();
bool truncate(); bool truncate();
bool remove_row(); bool remove_row();
...@@ -522,3 +533,72 @@ bool Cassandra_se_impl::remove_row() ...@@ -522,3 +533,72 @@ bool Cassandra_se_impl::remove_row()
return res; return res;
} }
/////////////////////////////////////////////////////////////////////////////
// MRR reads
/////////////////////////////////////////////////////////////////////////////
void Cassandra_se_impl::new_lookup_keys()
{
mrr_keys.clear();
}
int Cassandra_se_impl::add_lookup_key(const char *key, size_t key_len)
{
mrr_keys.push_back(std::string(key, key_len));
return mrr_keys.size();
}
bool Cassandra_se_impl::multiget_slice()
{
ColumnParent cparent;
cparent.column_family= column_family;
SlicePredicate slice_pred;
SliceRange sr;
sr.start = "";
sr.finish = "";
slice_pred.__set_slice_range(sr);
bool res= true;
try {
cassandra_counters.multiget_reads++;
cassandra_counters.multiget_keys_scanned += mrr_keys.size();
cass->multiget_slice(mrr_result, mrr_keys, cparent, slice_pred,
cur_consistency_level);
cassandra_counters.multiget_rows_read += mrr_result.size();
res= false;
mrr_result_it= mrr_result.begin();
} catch (InvalidRequestException ire) {
print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) {
print_error("UnavailableException: %s", ue.what());
} catch (TimedOutException te) {
print_error("TimedOutException: %s", te.what());
}
return res;
}
bool Cassandra_se_impl::get_next_multiget_row()
{
if (mrr_result_it == mrr_result.end())
return true; /* EOF */
column_data_vec= mrr_result_it->second;
rowkey= mrr_result_it->first;
column_data_it= column_data_vec.begin();
mrr_result_it++;
return false;
}
...@@ -41,11 +41,16 @@ public: ...@@ -41,11 +41,16 @@ public:
/* Reads, multi-row scans */ /* Reads, multi-row scans */
int read_batch_size; int read_batch_size;
virtual bool get_range_slices(bool last_key_as_start_key)=0; virtual bool get_range_slices(bool last_key_as_start_key)=0;
virtual void finish_reading_range_slices()=0; virtual void finish_reading_range_slices()=0;
virtual bool get_next_range_slice_row(bool *eof)=0; virtual bool get_next_range_slice_row(bool *eof)=0;
/* Reads, MRR scans */
virtual void new_lookup_keys()=0;
virtual int add_lookup_key(const char *key, size_t key_len)=0;
virtual bool multiget_slice()=0;
virtual bool get_next_multiget_row()=0;
/* read_set setup */ /* read_set setup */
virtual void clear_read_columns()=0; virtual void clear_read_columns()=0;
virtual void add_read_column(const char *name)=0; virtual void add_read_column(const char *name)=0;
...@@ -59,13 +64,20 @@ public: ...@@ -59,13 +64,20 @@ public:
void print_error(const char *format, ...); void print_error(const char *format, ...);
}; };
/* A structure with global counters */ /* A structure with global counters */
class Cassandra_status_vars class Cassandra_status_vars
{ {
public: public:
ulong row_inserts; ulong row_inserts;
ulong row_insert_batches; ulong row_insert_batches;
ulong multiget_reads;
ulong multiget_keys_scanned;
ulong multiget_rows_read;
}; };
extern Cassandra_status_vars cassandra_counters; extern Cassandra_status_vars cassandra_counters;
......
...@@ -60,15 +60,35 @@ static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG, ...@@ -60,15 +60,35 @@ static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
"Number of rows in an INSERT batch", "Number of rows in an INSERT batch",
NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0); NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG,
"Number of rows in a multiget(MRR) batch",
NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
static struct st_mysql_sys_var* cassandra_system_variables[]= { static struct st_mysql_sys_var* cassandra_system_variables[]= {
MYSQL_SYSVAR(insert_batch_size), MYSQL_SYSVAR(insert_batch_size),
MYSQL_SYSVAR(multiget_batch_size),
// MYSQL_SYSVAR(enum_var), // MYSQL_SYSVAR(enum_var),
// MYSQL_SYSVAR(ulong_var), // MYSQL_SYSVAR(ulong_var),
NULL NULL
}; };
static SHOW_VAR cassandra_status_variables[]= {
{"row_inserts",
(char*) &cassandra_counters.row_inserts, SHOW_LONG},
{"row_insert_batches",
(char*) &cassandra_counters.row_insert_batches, SHOW_LONG},
{"multiget_reads",
(char*) &cassandra_counters.multiget_reads, SHOW_LONG},
{"multiget_keys_scanned",
(char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
{"multiget_rows_read",
(char*) &cassandra_counters.multiget_rows_read, SHOW_LONG},
{NullS, NullS, SHOW_LONG}
};
Cassandra_status_vars cassandra_counters; Cassandra_status_vars cassandra_counters;
Cassandra_status_vars cassandra_counters_copy; Cassandra_status_vars cassandra_counters_copy;
...@@ -772,8 +792,7 @@ int ha_cassandra::write_row(uchar *buf) ...@@ -772,8 +792,7 @@ int ha_cassandra::write_row(uchar *buf)
if (doing_insert_batch) if (doing_insert_batch)
{ {
res= 0; res= 0;
if (++insert_rows_batched >= /*insert_batch_size*/ if (++insert_rows_batched >= THDVAR(table->in_use, insert_batch_size))
THDVAR(table->in_use, insert_batch_size))
{ {
res= se->do_insert(); res= se->do_insert();
insert_rows_batched= 0; insert_rows_batched= 0;
...@@ -955,6 +974,135 @@ int ha_cassandra::reset() ...@@ -955,6 +974,135 @@ int ha_cassandra::reset()
return 0; return 0;
} }
/////////////////////////////////////////////////////////////////////////////
// MRR implementation
/////////////////////////////////////////////////////////////////////////////
/*
- The key can be only primary key
- allow equality-ranges only.
- anything else?
*/
ha_rows ha_cassandra::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
void *seq_init_param,
uint n_ranges, uint *bufsz,
uint *flags, COST_VECT *cost)
{
/* No support for const ranges so far */
return HA_POS_ERROR;
}
ha_rows ha_cassandra::multi_range_read_info(uint keyno, uint n_ranges, uint keys,
uint key_parts, uint *bufsz,
uint *flags, COST_VECT *cost)
{
/* Can only be equality lookups on the primary key... */
// TODO anything else?
*flags &= ~HA_MRR_USE_DEFAULT_IMPL;
*flags |= HA_MRR_NO_ASSOCIATION;
return 10;
}
int ha_cassandra::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
uint n_ranges, uint mode, HANDLER_BUFFER *buf)
{
int res;
mrr_iter= seq->init(seq_init_param, n_ranges, mode);
mrr_funcs= *seq;
res= mrr_start_read();
return (res? HA_ERR_INTERNAL_ERROR: 0);
}
bool ha_cassandra::mrr_start_read()
{
uint key_len;
my_bitmap_map *old_map;
old_map= dbug_tmp_use_all_columns(table, table->read_set);
se->new_lookup_keys();
while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
{
char *cass_key;
int cass_key_len;
DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE);
uchar *key= (uchar*)mrr_cur_range.start_key.key;
key_len= mrr_cur_range.start_key.length;
//key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS??
store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
// Primitive buffer control
if (se->add_lookup_key(cass_key, cass_key_len) >
THDVAR(table->in_use, multiget_batch_size))
break;
}
dbug_tmp_restore_column_map(table->read_set, old_map);
return se->multiget_slice();
}
int ha_cassandra::multi_range_read_next(range_id_t *range_info)
{
int res;
while(1)
{
if (!se->get_next_multiget_row())
{
read_cassandra_columns(true);
res= 0;
break;
}
else
{
if (source_exhausted)
{
res= HA_ERR_END_OF_FILE;
break;
}
else
{
if (mrr_start_read())
{
res= HA_ERR_INTERNAL_ERROR;
break;
}
}
}
/*
We get here if we've refilled the buffer and done another read. Try
reading from results again
*/
}
return res;
}
int ha_cassandra::multi_range_read_explain_info(uint mrr_mode, char *str, size_t size)
{
const char *mrr_str= "multiget_slice";
if (!(mrr_mode & HA_MRR_USE_DEFAULT_IMPL))
{
uint mrr_str_len= strlen(mrr_str);
uint copy_len= min(mrr_str_len, size);
memcpy(str, mrr_str, size);
return copy_len;
}
return 0;
}
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
// Dummy implementations start // Dummy implementations start
...@@ -1073,15 +1221,6 @@ bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info, ...@@ -1073,15 +1221,6 @@ bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
// Dummy implementations end // Dummy implementations end
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
static SHOW_VAR cassandra_status_variables[]= {
{"row_inserts",
(char*) &cassandra_counters.row_inserts, SHOW_LONG},
{"row_insert_batches",
(char*) &cassandra_counters.row_insert_batches, SHOW_LONG},
{NullS, NullS, SHOW_LONG}
};
static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff) static int show_cassandra_vars(THD *thd, SHOW_VAR *var, char *buff)
{ {
//innodb_export_status(); //innodb_export_status();
......
...@@ -154,6 +154,24 @@ public: ...@@ -154,6 +154,24 @@ public:
virtual int reset(); virtual int reset();
int multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
uint n_ranges, uint mode, HANDLER_BUFFER *buf);
int multi_range_read_next(range_id_t *range_info);
ha_rows multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
void *seq_init_param,
uint n_ranges, uint *bufsz,
uint *flags, COST_VECT *cost);
ha_rows multi_range_read_info(uint keyno, uint n_ranges, uint keys,
uint key_parts, uint *bufsz,
uint *flags, COST_VECT *cost);
int multi_range_read_explain_info(uint mrr_mode, char *str, size_t size);
private:
bool source_exhausted;
bool mrr_start_read();
public:
/* /*
Everything below are methods that we implement in ha_example.cc. Everything below are methods that we implement in ha_example.cc.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment