Commit 4db207d5 authored by Sergey Petrunya's avatar Sergey Petrunya

Cassandra SE: lazy connections

- Don't connect right away in ha_cassandra::open. If we do this, it becomes 
  impossible to do SHOW CREATE TABLE when the server is not present. 
- Note: CREATE TABLE still requires that connection is present, as it needs 
  to check whether the specified DDL can be used with Cassandra.  We could
  delay that check also, but then one would not be able to find out about 
  errors in table DDL until they do a SELECT.
parent 2d88c4be
...@@ -159,7 +159,7 @@ class Cassandra_se_impl: public Cassandra_se_interface ...@@ -159,7 +159,7 @@ class Cassandra_se_impl: public Cassandra_se_interface
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
// Connection and setup // Connection and setup
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
Cassandra_se_interface *get_cassandra_se() Cassandra_se_interface *create_cassandra_se()
{ {
return new Cassandra_se_impl; return new Cassandra_se_impl;
} }
......
...@@ -110,5 +110,5 @@ class Cassandra_status_vars ...@@ -110,5 +110,5 @@ class Cassandra_status_vars
extern Cassandra_status_vars cassandra_counters; extern Cassandra_status_vars cassandra_counters;
Cassandra_se_interface *get_cassandra_se(); Cassandra_se_interface *create_cassandra_se();
...@@ -375,21 +375,16 @@ const char **ha_cassandra::bas_ext() const ...@@ -375,21 +375,16 @@ const char **ha_cassandra::bas_ext() const
} }
int ha_cassandra::open(const char *name, int mode, uint test_if_locked) int ha_cassandra::connect_and_check_options(TABLE *table_arg)
{ {
ha_table_option_struct *options= table->s->option_struct; ha_table_option_struct *options= table_arg->s->option_struct;
int res; int res;
DBUG_ENTER("ha_cassandra::open"); DBUG_ENTER("ha_cassandra::connect_and_check_options");
if (!(share = get_share(name, table)))
DBUG_RETURN(1);
thr_lock_data_init(&share->lock,&lock,NULL);
DBUG_ASSERT(!se);
if ((res= check_table_options(options))) if ((res= check_table_options(options)))
DBUG_RETURN(res); DBUG_RETURN(res);
se= get_cassandra_se(); se= create_cassandra_se();
se->set_column_family(options->column_family); se->set_column_family(options->column_family);
const char *thrift_host= options->thrift_host? options->thrift_host: const char *thrift_host= options->thrift_host? options->thrift_host:
cassandra_default_thrift_host; cassandra_default_thrift_host;
...@@ -399,11 +394,36 @@ int ha_cassandra::open(const char *name, int mode, uint test_if_locked) ...@@ -399,11 +394,36 @@ int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
DBUG_RETURN(HA_ERR_NO_CONNECTION); DBUG_RETURN(HA_ERR_NO_CONNECTION);
} }
if (setup_field_converters(table->field, table->s->fields)) if (setup_field_converters(table_arg->field, table_arg->s->fields))
{ {
DBUG_RETURN(HA_ERR_NO_CONNECTION); DBUG_RETURN(HA_ERR_NO_CONNECTION);
} }
DBUG_RETURN(0);
}
int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
{
DBUG_ENTER("ha_cassandra::open");
if (!(share = get_share(name, table)))
DBUG_RETURN(1);
thr_lock_data_init(&share->lock,&lock,NULL);
DBUG_ASSERT(!se);
/*
Don't do the following on open: it prevents SHOW CREATE TABLE when the server
has gone away.
*/
/*
int res;
if ((res= connect_and_check_options(table)))
{
DBUG_RETURN(res);
}
*/
info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST); info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
insert_lineno= 0; insert_lineno= 0;
...@@ -444,7 +464,7 @@ int ha_cassandra::check_table_options(ha_table_option_struct *options) ...@@ -444,7 +464,7 @@ int ha_cassandra::check_table_options(ha_table_option_struct *options)
/** /**
@brief @brief
create() is called to create a database. The variable name will have the name create() is called to create a table. The variable name will have the name
of the table. of the table.
@details @details
...@@ -485,45 +505,10 @@ int ha_cassandra::create(const char *name, TABLE *table_arg, ...@@ -485,45 +505,10 @@ int ha_cassandra::create(const char *name, TABLE *table_arg,
DBUG_RETURN(HA_WRONG_CREATE_OPTION); DBUG_RETURN(HA_WRONG_CREATE_OPTION);
} }
#ifndef DBUG_OFF
/*
DBUG_PRINT("info", ("strparam: '%-.64s' ullparam: %llu enumparam: %u "\
"boolparam: %u",
(options->strparam ? options->strparam : "<NULL>"),
options->ullparam, options->enumparam, options->boolparam));
psergey-todo: check table definition!
for (Field **field= table_arg->s->field; *field; field++)
{
ha_field_option_struct *field_options= (*field)->option_struct;
DBUG_ASSERT(field_options);
DBUG_PRINT("info", ("field: %s complex: '%-.64s'",
(*field)->field_name,
(field_options->complex_param_to_parse_it_in_engine ?
field_options->complex_param_to_parse_it_in_engine :
"<NULL>")));
}
*/
#endif
DBUG_ASSERT(!se); DBUG_ASSERT(!se);
if ((res= check_table_options(options))) if ((res= connect_and_check_options(table_arg)))
DBUG_RETURN(res); DBUG_RETURN(res);
se= get_cassandra_se();
se->set_column_family(options->column_family);
const char *thrift_host= options->thrift_host? options->thrift_host:
cassandra_default_thrift_host;
if (se->connect(thrift_host, options->thrift_port, options->keyspace))
{
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
if (setup_field_converters(table_arg->s->field, table_arg->s->fields))
{
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), "setup_field_converters");
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
insert_lineno= 0; insert_lineno= 0;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1064,6 +1049,14 @@ void ha_cassandra::free_field_converters() ...@@ -1064,6 +1049,14 @@ void ha_cassandra::free_field_converters()
} }
int ha_cassandra::index_init(uint idx, bool sorted)
{
int ires;
if (!se && (ires= connect_and_check_options(table)))
return ires;
return 0;
}
void store_key_image_to_rec(Field *field, uchar *ptr, uint len); void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
int ha_cassandra::index_read_map(uchar *buf, const uchar *key, int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
...@@ -1198,8 +1191,12 @@ int ha_cassandra::read_cassandra_columns(bool unpack_pk) ...@@ -1198,8 +1191,12 @@ int ha_cassandra::read_cassandra_columns(bool unpack_pk)
int ha_cassandra::write_row(uchar *buf) int ha_cassandra::write_row(uchar *buf)
{ {
my_bitmap_map *old_map; my_bitmap_map *old_map;
int ires;
DBUG_ENTER("ha_cassandra::write_row"); DBUG_ENTER("ha_cassandra::write_row");
if (!se && (ires= connect_and_check_options(table)))
DBUG_RETURN(ires);
if (!doing_insert_batch) if (!doing_insert_batch)
se->clear_insert_buffer(); se->clear_insert_buffer();
...@@ -1260,6 +1257,10 @@ int ha_cassandra::write_row(uchar *buf) ...@@ -1260,6 +1257,10 @@ int ha_cassandra::write_row(uchar *buf)
void ha_cassandra::start_bulk_insert(ha_rows rows) void ha_cassandra::start_bulk_insert(ha_rows rows)
{ {
int ires;
if (!se && (ires= connect_and_check_options(table)))
return;
doing_insert_batch= true; doing_insert_batch= true;
insert_rows_batched= 0; insert_rows_batched= 0;
...@@ -1283,7 +1284,12 @@ int ha_cassandra::end_bulk_insert() ...@@ -1283,7 +1284,12 @@ int ha_cassandra::end_bulk_insert()
int ha_cassandra::rnd_init(bool scan) int ha_cassandra::rnd_init(bool scan)
{ {
bool bres; bool bres;
int ires;
DBUG_ENTER("ha_cassandra::rnd_init"); DBUG_ENTER("ha_cassandra::rnd_init");
if (!se && (ires= connect_and_check_options(table)))
DBUG_RETURN(ires);
if (!scan) if (!scan)
{ {
/* Prepare for rnd_pos() calls. We don't need to anything. */ /* Prepare for rnd_pos() calls. We don't need to anything. */
...@@ -1338,8 +1344,12 @@ int ha_cassandra::rnd_next(uchar *buf) ...@@ -1338,8 +1344,12 @@ int ha_cassandra::rnd_next(uchar *buf)
int ha_cassandra::delete_all_rows() int ha_cassandra::delete_all_rows()
{ {
bool bres; bool bres;
int ires;
DBUG_ENTER("ha_cassandra::delete_all_rows"); DBUG_ENTER("ha_cassandra::delete_all_rows");
if (!se && (ires= connect_and_check_options(table)))
DBUG_RETURN(ires);
bres= se->truncate(); bres= se->truncate();
if (bres) if (bres)
......
...@@ -69,6 +69,7 @@ class ha_cassandra: public handler ...@@ -69,6 +69,7 @@ class ha_cassandra: public handler
ha_rows insert_lineno; ha_rows insert_lineno;
void print_conversion_error(const char *field_name, void print_conversion_error(const char *field_name,
char *cass_value, int cass_value_len); char *cass_value, int cass_value_len);
int connect_and_check_options(TABLE *table_arg);
public: public:
ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg); ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
~ha_cassandra() ~ha_cassandra()
...@@ -152,7 +153,8 @@ class ha_cassandra: public handler ...@@ -152,7 +153,8 @@ class ha_cassandra: public handler
*/ */
uint max_supported_key_length() const { return 16*1024; /* just to return something*/ } uint max_supported_key_length() const { return 16*1024; /* just to return something*/ }
/* At the moment, we're ok with default handler::index_init() implementation. */ int index_init(uint idx, bool sorted);
int index_read_map(uchar * buf, const uchar * key, int index_read_map(uchar * buf, const uchar * key,
key_part_map keypart_map, key_part_map keypart_map,
enum ha_rkey_function find_flag); enum ha_rkey_function find_flag);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment