Commit 62c1c3f0 authored by Sergey Petrunya's avatar Sergey Petrunya

MDEV-431: Cassandra storage engine

- Partial support for DELETE ... WHERE.
parent d3625970
...@@ -14,8 +14,11 @@ ERROR HY000: Unable to connect to foreign data source: Default TException. [Keys ...@@ -14,8 +14,11 @@ ERROR HY000: Unable to connect to foreign data source: Default TException. [Keys
create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra create table t1 (rowkey char(10) primary key, column1 char(10)) engine=cassandra
thrift_host='localhost' keyspace='no_such_keyspace'; thrift_host='localhost' keyspace='no_such_keyspace';
ERROR HY000: Unable to connect to foreign data source: thrift_host, keyspace, and column_family table options must be s ERROR HY000: Unable to connect to foreign data source: thrift_host, keyspace, and column_family table options must be s
# Now, create a table for real and insert data
create table t1 (rowkey varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra create table t1 (rowkey varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra
thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1'; thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
# Just in case there were left-overs from previous:
delete from t1;
select * from t1; select * from t1;
rowkey data1 data2 rowkey data1 data2
insert into t1 values ('rowkey10', 'data1-value', 123456); insert into t1 values ('rowkey10', 'data1-value', 123456);
...@@ -33,6 +36,12 @@ id select_type table type possible_keys key key_len ref rows Extra ...@@ -33,6 +36,12 @@ id select_type table type possible_keys key key_len ref rows Extra
select * from t1 where rowkey='rowkey11'; select * from t1 where rowkey='rowkey11';
rowkey data1 data2 rowkey data1 data2
rowkey11 data1-value2 34543 rowkey11 data1-value2 34543
delete from t1 where rowkey='rowkey11';
select * from t1;
rowkey data1 data2
rowkey12 data1-value3 454
rowkey10 data1-value 123456
rowkey11 NULL NULL
delete from t1; delete from t1;
select * from t1; select * from t1;
rowkey data1 data2 rowkey data1 data2
......
...@@ -48,9 +48,12 @@ create columnfamily cf1 ( pk varchar primary key, data1 varchar, data2 bigint); ...@@ -48,9 +48,12 @@ create columnfamily cf1 ( pk varchar primary key, data1 varchar, data2 bigint);
## Cassandra initialization ends ## Cassandra initialization ends
############################################################################ ############################################################################
# Now, create a table for real and insert data --echo # Now, create a table for real and insert data
create table t1 (rowkey varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra create table t1 (rowkey varchar(36) primary key, data1 varchar(60), data2 bigint) engine=cassandra
thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1'; thrift_host='localhost' keyspace='mariadbtest2' column_family='cf1';
--echo # Just in case there were left-overs from previous:
delete from t1;
select * from t1; select * from t1;
insert into t1 values ('rowkey10', 'data1-value', 123456); insert into t1 values ('rowkey10', 'data1-value', 123456);
...@@ -62,7 +65,22 @@ explain ...@@ -62,7 +65,22 @@ explain
select * from t1 where rowkey='rowkey11'; select * from t1 where rowkey='rowkey11';
select * from t1 where rowkey='rowkey11'; select * from t1 where rowkey='rowkey11';
# Check if deletion works # Deletion functions weirdly: it sets all columns to NULL
# but when If I do this in cassandra-cli:
#
# del cf1[ascii('rowkey10')]
#
# Subsequent 'list cf1' command also gives
#
# RowKey: rowkey10
#
# without any columns.
#
# CQL seems to simply ignore all "incomplete" records.
delete from t1 where rowkey='rowkey11';
select * from t1;
delete from t1; delete from t1;
select * from t1; select * from t1;
......
...@@ -101,6 +101,10 @@ class Cassandra_se_impl: public Cassandra_se_interface ...@@ -101,6 +101,10 @@ class Cassandra_se_impl: public Cassandra_se_interface
void add_read_column(const char *name); void add_read_column(const char *name);
bool truncate(); bool truncate();
bool remove_row();
/* Non-inherited utility functions: */
int64_t get_i64_timestamp();
}; };
...@@ -215,6 +219,18 @@ void Cassandra_se_impl::get_rowkey_type(char **name, char **type) ...@@ -215,6 +219,18 @@ void Cassandra_se_impl::get_rowkey_type(char **name, char **type)
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
// Data writes // Data writes
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
int64_t Cassandra_se_impl::get_i64_timestamp()
{
struct timeval td;
gettimeofday(&td, NULL);
int64_t ms = td.tv_sec;
ms = ms * 1000;
int64_t usec = td.tv_usec;
usec = usec / 1000;
ms += usec;
return ms;
}
void Cassandra_se_impl::start_prepare_insert(const char *key, int key_len) void Cassandra_se_impl::start_prepare_insert(const char *key, int key_len)
{ {
...@@ -226,14 +242,7 @@ void Cassandra_se_impl::start_prepare_insert(const char *key, int key_len) ...@@ -226,14 +242,7 @@ void Cassandra_se_impl::start_prepare_insert(const char *key, int key_len)
cf_mut[column_family]= std::vector<Mutation>(); cf_mut[column_family]= std::vector<Mutation>();
insert_list= &cf_mut[column_family]; insert_list= &cf_mut[column_family];
struct timeval td; insert_timestamp= get_i64_timestamp();
gettimeofday(&td, NULL);
int64_t ms = td.tv_sec;
ms = ms * 1000;
int64_t usec = td.tv_usec;
usec = usec / 1000;
ms += usec;
insert_timestamp= ms;
} }
...@@ -448,3 +457,25 @@ bool Cassandra_se_impl::truncate() ...@@ -448,3 +457,25 @@ bool Cassandra_se_impl::truncate()
return res; return res;
} }
bool Cassandra_se_impl::remove_row()
{
bool res= true;
ColumnPath column_path;
column_path.column_family= column_family;
try {
cass->remove(rowkey, column_path, get_i64_timestamp(), cur_consistency_level);
res= false;
} catch (InvalidRequestException ire) {
print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) {
print_error("UnavailableException: %s", ue.what());
} catch (TimedOutException te) {
print_error("TimedOutException: %s", te.what());
}
return res;
}
...@@ -48,6 +48,8 @@ class Cassandra_se_interface ...@@ -48,6 +48,8 @@ class Cassandra_se_interface
virtual void add_read_column(const char *name)=0; virtual void add_read_column(const char *name)=0;
virtual bool truncate()=0; virtual bool truncate()=0;
virtual bool remove_row()=0;
/* Passing error messages up to ha_cassandra */ /* Passing error messages up to ha_cassandra */
char err_buffer[512]; char err_buffer[512];
const char *error_str() { return err_buffer; } const char *error_str() { return err_buffer; }
......
...@@ -625,7 +625,7 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key, ...@@ -625,7 +625,7 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
key_part_map keypart_map, key_part_map keypart_map,
enum ha_rkey_function find_flag) enum ha_rkey_function find_flag)
{ {
int rc; int rc= 0;
DBUG_ENTER("ha_cassandra::index_read_map"); DBUG_ENTER("ha_cassandra::index_read_map");
if (find_flag != HA_READ_KEY_EXACT) if (find_flag != HA_READ_KEY_EXACT)
...@@ -824,9 +824,15 @@ int ha_cassandra::delete_all_rows() ...@@ -824,9 +824,15 @@ int ha_cassandra::delete_all_rows()
int ha_cassandra::delete_row(const uchar *buf) int ha_cassandra::delete_row(const uchar *buf)
{ {
bool bres;
DBUG_ENTER("ha_cassandra::delete_row"); DBUG_ENTER("ha_cassandra::delete_row");
// todo: delete the row we've just read.
DBUG_RETURN(HA_ERR_WRONG_COMMAND); bres= se->remove_row();
if (bres)
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment