Commit 0d840d4d authored by Sergey Petrunya's avatar Sergey Petrunya

MDEV-431: Cassandra storage engine

- Introduce type converters (so far rather trivial)
- switch INSERT to using batch_mutate()
parent c15914f7
......@@ -49,7 +49,7 @@ create columnfamily cf1 ( pk varchar primary key, data1 varchar);
############################################################################
# Now, create a table for real and insert data
create table t1 (rowkey char(36) primary key, column1 char(60)) engine=cassandra
create table t1 (rowkey char(36) primary key, data1 varchar(60)) engine=cassandra
thrift_host='localhost' keyspace='mariadbtest' column_family='cf1';
insert into t1 values ('key0', 'data1');
......
......@@ -43,53 +43,56 @@ class Cassandra_se_impl: public Cassandra_se_interface
std::string column_family;
std::string keyspace;
/* DDL checks */
/* DDL data */
KsDef ks_def; /* KeySpace we're using (TODO: put this in table->share) */
CfDef cf_def; /* Column family we're using (TODO: put in table->share)*/
std::vector<ColumnDef>::iterator column_ddl_it;
/* The list that was returned by the last key lookup */
std::vector<ColumnOrSuperColumn> col_supercol_vec;
std::vector<ColumnOrSuperColumn> column_data_vec;
std::vector<ColumnOrSuperColumn>::iterator column_data_it;
public:
/* Insert preparation */
typedef std::map<std::string, std::vector<Mutation> > ColumnFamilyToMutation;
typedef std::map<std::string, ColumnFamilyToMutation> KeyToCfMutationMap;
KeyToCfMutationMap batch_mutation; /* Prepare operation here */
std::string key_to_insert;
int64_t insert_timestamp;
std::vector<Mutation>* insert_list;
public:
Cassandra_se_impl() : cass(NULL) {}
virtual ~Cassandra_se_impl(){ delete cass; }
/* Connection and DDL checks */
bool connect(const char *host, const char *keyspace);
void set_column_family(const char *cfname) { column_family.assign(cfname); }
virtual void set_column_family(const char *cfname)
{
column_family.assign(cfname);
}
virtual bool insert(NameAndValue *fields);
virtual bool get_slice(char *key, size_t key_len, NameAndValue *row, bool *found);
/* Functions to enumerate ColumnFamily's DDL data */
bool setup_ddl_checks();
void first_ddl_column();
bool next_ddl_column(char **name, int *name_len, char **value, int *value_len);
/* Writes */
void start_prepare_insert(const char *key, int key_len);
void add_insert_column(const char *name, const char *value, int value_len);
bool do_insert();
/* Reads */
bool get_slice(char *key, size_t key_len, bool *found);
bool get_next_read_column(char **name, char **value, int *value_len);
};
/////////////////////////////////////////////////////////////////////////////
// Connection and setup
/////////////////////////////////////////////////////////////////////////////
Cassandra_se_interface *get_cassandra_se()
{
return new Cassandra_se_impl;
}
#define CASS_TRY(x) try { \
x; \
}catch(TTransportException te){ \
print_error("%s [%d]", te.what(), te.getType()); \
}catch(InvalidRequestException ire){ \
print_error("%s [%s]", ire.what(), ire.why.c_str()); \
}catch(NotFoundException nfe){ \
print_error("%s", nfe.what()); \
} catch(...) { \
print_error("Unknown Exception"); \
}
bool Cassandra_se_impl::connect(const char *host, const char *keyspace_arg)
{
......@@ -121,10 +124,9 @@ bool Cassandra_se_impl::connect(const char *host, const char *keyspace_arg)
print_error("Unknown Exception");
}
// For now:
cur_consistency_level= ConsistencyLevel::ONE;
if (setup_ddl_checks())
if (!res && setup_ddl_checks())
res= true;
return res;
}
......@@ -176,13 +178,20 @@ bool Cassandra_se_impl::next_ddl_column(char **name, int *name_len,
return false;
}
/////////////////////////////////////////////////////////////////////////////
// Data writes
/////////////////////////////////////////////////////////////////////////////
bool Cassandra_se_impl::insert(NameAndValue *fields)
void Cassandra_se_impl::start_prepare_insert(const char *key, int key_len)
{
ColumnParent cparent;
cparent.column_family= column_family;
key_to_insert.assign(key, key_len);
batch_mutation.clear();
batch_mutation[key_to_insert]= ColumnFamilyToMutation();
ColumnFamilyToMutation& cf_mut= batch_mutation[key_to_insert];
cf_mut[column_family]= std::vector<Mutation>();
insert_list= &cf_mut[column_family];
Column c;
struct timeval td;
gettimeofday(&td, NULL);
int64_t ms = td.tv_sec;
......@@ -190,37 +199,57 @@ bool Cassandra_se_impl::insert(NameAndValue *fields)
int64_t usec = td.tv_usec;
usec = usec / 1000;
ms += usec;
c.timestamp = ms;
c.__isset.timestamp = true;
insert_timestamp= ms;
}
std::string key;
key.assign(fields->value, fields->value_len);
fields++;
void Cassandra_se_impl::add_insert_column(const char *name, const char *value,
int value_len)
{
Mutation mut;
mut.__isset.column_or_supercolumn= true;
mut.column_or_supercolumn.__isset.column= true;
Column& col=mut.column_or_supercolumn.column;
col.name.assign(name);
col.value.assign(value, value_len);
col.timestamp= insert_timestamp;
col.__isset.value= true;
col.__isset.timestamp= true;
insert_list->push_back(mut);
}
bool res= false;
bool Cassandra_se_impl::do_insert()
{
bool res= true;
try {
/* TODO: switch to batch_mutate(). Or, even to CQL? */
// TODO: what should INSERT table (co1, col2) VALUES ('foo', 'bar') mean?
// in SQL, it sets all columns.. what should it mean here? can we have
// it to work only for specified columns? (if yes, what do for
// VALUES()?)
c.__isset.value= true;
for(;fields->name; fields++)
{
c.name.assign(fields->name);
c.value.assign(fields->value, fields->value_len);
cass->insert(key, cparent, c, ConsistencyLevel::ONE);
}
} catch (...) {
res= true;
cass->batch_mutate(batch_mutation, cur_consistency_level);
res= false;
} catch (InvalidRequestException ire) {
print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) {
print_error("UnavailableException: %s", ue.what());
} catch (TimedOutException te) {
print_error("TimedOutException: %s", te.what());
}
return res;
}
bool Cassandra_se_impl::get_slice(char *key, size_t key_len, NameAndValue *row, bool *found)
/////////////////////////////////////////////////////////////////////////////
// Reading data
/////////////////////////////////////////////////////////////////////////////
/*
Make one key lookup. If the record is found, the result is stored locally and
the caller should iterate over it.
*/
bool Cassandra_se_impl::get_slice(char *key, size_t key_len, bool *found)
{
ColumnParent cparent;
cparent.column_family= column_family;
......@@ -235,12 +264,10 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, NameAndValue *row,
slice_pred.__set_slice_range(sr);
try {
std::vector<ColumnOrSuperColumn> &res= col_supercol_vec;
cass->get_slice(res, rowkey_str, cparent, slice_pred, ConsistencyLevel::ONE);
*found= true;
cass->get_slice(column_data_vec, rowkey_str, cparent, slice_pred,
ConsistencyLevel::ONE);
std::vector<ColumnOrSuperColumn>::iterator it;
if (res.size() == 0)
if (column_data_vec.size() == 0)
{
/*
No columns found. Cassandra doesn't allow records without any column =>
......@@ -249,23 +276,45 @@ bool Cassandra_se_impl::get_slice(char *key, size_t key_len, NameAndValue *row,
*found= false;
return false;
}
for (it= res.begin(); it < res.end(); it++)
{
ColumnOrSuperColumn cs= *it;
if (!cs.__isset.column)
return true;
row->name= (char*)cs.column.name.c_str();
row->value= (char*)cs.column.value.c_str();
row->value_len= cs.column.value.length();
row++;
}
row->name= NULL;
*found= true;
} catch (InvalidRequestException ire) {
print_error("%s [%s]", ire.what(), ire.why.c_str());
return true;
} catch (UnavailableException ue) {
print_error("UnavailableException: %s", ue.what());
return true;
} catch (TimedOutException te) {
print_error("TimedOutException: %s", te.what());
return true;
}
column_data_it= column_data_vec.begin();
return false;
}
bool Cassandra_se_impl::get_next_read_column(char **name, char **value,
int *value_len)
{
while (1)
{
if (column_data_it == column_data_vec.end())
return true;
if (((*column_data_it).__isset.column))
break; /* Ok it's a real column. Should be always the case. */
column_data_it++;
}
ColumnOrSuperColumn& cs= *column_data_it;
*name= (char*)cs.column.name.c_str();
*value= (char*)cs.column.value.c_str();
*value_len= cs.column.value.length();
column_data_it++;
return false;
}
......@@ -43,10 +43,14 @@ class Cassandra_se_interface
int *value_len)=0;
/* Writes */
virtual bool insert(NameAndValue *fields)=0;
virtual void start_prepare_insert(const char *key, int key_len)=0;
virtual void add_insert_column(const char *name, const char *value,
int value_len)=0;
virtual bool do_insert()=0;
/* Reads */
virtual bool get_slice(char *key, size_t key_len, NameAndValue *row, bool *found)=0 ;
virtual bool get_slice(char *key, size_t key_len, bool *found)=0 ;
virtual bool get_next_read_column(char **name, char **value, int *value_len)=0;
/* Passing error messages up to ha_cassandra */
char err_buffer[512];
......
......@@ -212,7 +212,8 @@ static handler* cassandra_create_handler(handlerton *hton,
ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
:handler(hton, table_arg),
se(NULL), names_and_vals(NULL)
se(NULL), names_and_vals(NULL),
field_converters(NULL)
{}
......@@ -249,6 +250,12 @@ int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
if (setup_field_converters(table->field, table->s->fields))
{
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "setup_field_converters");
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
DBUG_RETURN(0);
}
......@@ -317,16 +324,6 @@ int ha_cassandra::create(const char *name, TABLE *table_arg,
DBUG_RETURN(HA_WRONG_CREATE_OPTION);
}
/*
pfield++;
if (strcmp((*pfield)->field_name, "data"))
{
my_error(ER_WRONG_COLUMN_NAME, MYF(0), "Second column must be named 'data'");
DBUG_RETURN(HA_WRONG_CREATE_OPTION);
}
*/
#ifndef DBUG_OFF
/*
......@@ -359,28 +356,11 @@ int ha_cassandra::create(const char *name, TABLE *table_arg,
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
/*
TODO: what about mapping the primary key? It has a 'type', too...
see CfDef::key_validation_class ? see also CfDef::key_alias?
*/
se->first_ddl_column();
char *col_name;
int col_name_len;
char *col_type;
int col_type_len;
while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
&col_type_len))
if (setup_field_converters(table_arg->s->field, table_arg->s->fields))
{
/* Mapping for the 1st field is already known */
for (Field **field= table_arg->s->field + 1; *field; field++)
{
if (!strcmp((*field)->field_name, col_name))
{
//map_field_to_type(field, col_type);
}
}
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "setup_field_converters");
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
DBUG_RETURN(0);
}
......@@ -391,37 +371,224 @@ int ha_cassandra::create(const char *name, TABLE *table_arg,
*/
const char * const validator_bigint="org.apache.cassandra.db.marshal.LongType";
const char * const validator_int="org.apache.cassandra.db.marshal.Int32Type";
/* Converter base */
class ColumnDataConverter
{
public:
Field *field;
/* This will save Cassandra's data in the Field */
virtual void cassandra_to_mariadb(const char *cass_data,
int cass_data_len)=0;
/*
This will get data from the Field pointer, store Cassandra's form
in internal buffer, and return pointer/size.
*/
virtual void mariadb_to_cassandra(char **cass_data, int *cass_data_len)=0;
virtual ~ColumnDataConverter() {};
};
class DoubleDataConverter : public ColumnDataConverter
{
double buf;
public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len == sizeof(double));
double *pdata= (double*) cass_data;
field->store(*pdata);
}
void mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{
buf= field->val_real();
*cass_data= (char*)&buf;
*cass_data_len=sizeof(double);
}
~DoubleDataConverter(){}
};
class FloatDataConverter : public ColumnDataConverter
{
float buf;
public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
DBUG_ASSERT(cass_data_len == sizeof(float));
float *pdata= (float*) cass_data;
field->store(*pdata);
}
void mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{
buf= field->val_real();
*cass_data= (char*)&buf;
*cass_data_len=sizeof(float);
}
~FloatDataConverter(){}
};
class BigintDataConverter : public ColumnDataConverter
{
longlong buf;
public:
void flip(const char *from, char* to)
{
to[0]= from[7];
to[1]= from[6];
to[2]= from[5];
to[3]= from[4];
to[4]= from[3];
to[5]= from[2];
to[6]= from[1];
to[7]= from[0];
}
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
longlong tmp;
DBUG_ASSERT(cass_data_len == sizeof(longlong));
flip(cass_data, (char*)&tmp);
field->store(tmp);
}
void mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{
longlong tmp= field->val_int();
flip((const char*)&tmp, (char*)&buf);
*cass_data= (char*)&buf;
*cass_data_len=sizeof(longlong);
}
~BigintDataConverter(){}
};
class StringCopyConverter : public ColumnDataConverter
{
String buf;
public:
void cassandra_to_mariadb(const char *cass_data, int cass_data_len)
{
field->store(cass_data, cass_data_len,field->charset());
}
void mariadb_to_cassandra(char **cass_data, int *cass_data_len)
{
String *pstr= field->val_str(&buf);
*cass_data= (char*)pstr->c_ptr();
*cass_data_len= pstr->length();
}
~StringCopyConverter(){}
};
const char * const validator_bigint= "org.apache.cassandra.db.marshal.LongType";
const char * const validator_int= "org.apache.cassandra.db.marshal.Int32Type";
const char * const validator_counter= "org.apache.cassandra.db.marshal.CounterColumnType";
const char * const validator_float= "org.apache.cassandra.db.marshal.FloatType";
const char * const validator_double= "org.apache.cassandra.db.marshal.DoubleType";
void map_field_to_type(Field *field, const char *validator_name)
const char * const validator_blob= "org.apache.cassandra.db.marshal.BytesType";
const char * const validator_ascii= "org.apache.cassandra.db.marshal.AsciiType";
const char * const validator_text= "org.apache.cassandra.db.marshal.UTF8Type";
ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
{
ColumnDataConverter *res= NULL;
switch(field->type()) {
case MYSQL_TYPE_TINY:
case MYSQL_TYPE_SHORT:
case MYSQL_TYPE_LONG:
case MYSQL_TYPE_LONGLONG:
if (!strcmp(validator_name, validator_bigint))
{
//setup bigint validator
}
res= new BigintDataConverter;
break;
case MYSQL_TYPE_FLOAT:
if (!strcmp(validator_name, validator_float))
res= new FloatDataConverter;
break;
case MYSQL_TYPE_DOUBLE:
if (!strcmp(validator_name, validator_double))
res= new DoubleDataConverter;
break;
default:
DBUG_ASSERT(0);
case MYSQL_TYPE_VAR_STRING:
case MYSQL_TYPE_VARCHAR:
if (!strcmp(validator_name, validator_blob) ||
!strcmp(validator_name, validator_ascii) ||
!strcmp(validator_name, validator_text))
{
res= new StringCopyConverter;
}
break;
default:;
}
return res;
}
bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
{
char *col_name;
int col_name_len;
char *col_type;
int col_type_len;
DBUG_ASSERT(!field_converters);
size_t memsize= sizeof(ColumnDataConverter*) * n_fields;
if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
return true;
bzero(field_converters, memsize);
n_field_converters= n_fields;
/*
TODO: what about mapping the primary key? It has a 'type', too...
see CfDef::key_validation_class ? see also CfDef::key_alias?
*/
se->first_ddl_column();
uint n_mapped= 0;
while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
&col_type_len))
{
/* Mapping for the 1st field is already known */
for (Field **field= field_arg + 1; *field; field++)
{
if (!strcmp((*field)->field_name, col_name))
{
n_mapped++;
ColumnDataConverter **conv= field_converters + (*field)->field_index;
if (!(*conv= map_field_to_validator(*field, col_type)))
return true;
(*conv)->field= *field;
}
}
}
if (n_mapped != n_fields - 1)
return true;
return false;
}
void ha_cassandra::free_field_converters()
{
if (field_converters)
{
for (uint i=0; i < n_field_converters; i++)
delete field_converters[i];
my_free(field_converters);
field_converters= NULL;
}
}
void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
......@@ -445,27 +612,42 @@ int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
str= table->field[0]->val_str(&tmp);
bool found;
if (se->get_slice((char*)str->ptr(), str->length(), get_names_and_vals(), &found))
if (se->get_slice((char*)str->ptr(), str->length(), &found))
rc= HA_ERR_INTERNAL_ERROR;
else
{
if (found)
/* TODO: what if we're not reading all columns?? */
if (!found)
{
//NameAndValue *nv= get_names_and_vals();
// TODO: walk through the (name, value) pairs and return values.
}
else
rc= HA_ERR_KEY_NOT_FOUND;
}
#ifdef NEW_CODE
else
{
char *cass_name;
char *cass_value;
int cass_value_len;
Field **field;
se->get_slice();
/* Start with all fields being NULL */
for (field= table->field + 1; *field; field++)
(*field)->set_null();
for each column
while (!se->get_next_read_column(&cass_name, &cass_value, &cass_value_len))
{
// map to our column. todo: use hash or something..
int idx=1;
for (field= table->field + 1; *field; field++)
{
find column;
idx++;
if (!strcmp((*field)->field_name, cass_name))
{
int fieldnr= (*field)->field_index;
(*field)->set_notnull();
field_converters[fieldnr]->cassandra_to_mariadb(cass_value, cass_value_len);
break;
}
}
}
}
#endif
DBUG_RETURN(rc);
}
......@@ -475,36 +657,35 @@ int ha_cassandra::write_row(uchar *buf)
{
my_bitmap_map *old_map;
char buff[512];
NameAndValue *tuple;
NameAndValue *nv;
DBUG_ENTER("ha_cassandra::write_row");
/* Temporary malloc-happy code just to get INSERTs to work */
nv= tuple= get_names_and_vals();
old_map= dbug_tmp_use_all_columns(table, table->read_set);
for (Field **field= table->field; *field; field++, nv++)
/* Convert the key (todo: unify with the rest of the processing) */
{
Field *pk_col= table->field[0];
String tmp(buff,sizeof(buff), &my_charset_bin);
tmp.length(0);
String *str;
str= (*field)->val_str(&tmp);
nv->name= (char*)(*field)->field_name;
nv->value_len= str->length();
nv->value= (char*)my_malloc(nv->value_len, MYF(0));
memcpy(nv->value, str->ptr(), nv->value_len);
}
nv->name= NULL;
dbug_tmp_restore_column_map(table->read_set, old_map);
tmp.length(0);
str= pk_col->val_str(&tmp);
//invoke!
bool res= se->insert(tuple);
se->start_prepare_insert(str->ptr(), str->length());
}
for (nv= tuple; nv->name; nv++)
/* Convert other fields */
for (uint i= 1; i < table->s->fields; i++)
{
my_free(nv->value);
char *cass_data;
int cass_data_len;
field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len);
se->add_insert_column(field_converters[i]->field->field_name,
cass_data, cass_data_len);
}
dbug_tmp_restore_column_map(table->read_set, old_map);
bool res= se->do_insert();
DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
}
......
......@@ -24,6 +24,7 @@ typedef struct st_cassandra_share {
THR_LOCK lock;
} CASSANDRA_SHARE;
class ColumnDataConverter;
/** @brief
Class definition for the storage engine
......@@ -38,6 +39,12 @@ class ha_cassandra: public handler
/* pre-allocated array of #fields elements */
NameAndValue *names_and_vals;
NameAndValue *get_names_and_vals();
ColumnDataConverter **field_converters;
uint n_field_converters;
bool setup_field_converters(Field **field, uint n_fields);
void free_field_converters();
public:
ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
~ha_cassandra()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment