Commit 81817412 authored by Sergey Petrunya's avatar Sergey Petrunya

Read records in batches when doing full table scan.

parent 38bc66fb
...@@ -68,6 +68,7 @@ class Cassandra_se_impl: public Cassandra_se_interface ...@@ -68,6 +68,7 @@ class Cassandra_se_impl: public Cassandra_se_interface
std::string rowkey; /* key of the record we're returning now */ std::string rowkey; /* key of the record we're returning now */
SlicePredicate slice_pred; SlicePredicate slice_pred;
bool get_slices_returned_less;
public: public:
Cassandra_se_impl() : cass(NULL) {} Cassandra_se_impl() : cass(NULL) {}
virtual ~Cassandra_se_impl(){ delete cass; } virtual ~Cassandra_se_impl(){ delete cass; }
...@@ -92,9 +93,9 @@ public: ...@@ -92,9 +93,9 @@ public:
void get_read_rowkey(char **value, int *value_len); void get_read_rowkey(char **value, int *value_len);
/* Reads, multi-row scans */ /* Reads, multi-row scans */
bool get_range_slices(); bool get_range_slices(bool last_key_as_start_key);
void finish_reading_range_slices(); void finish_reading_range_slices();
bool get_next_range_slice_row(); bool get_next_range_slice_row(bool *eof);
/* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */ /* Setup that's necessary before a multi-row read. (todo: use it before point lookups, too) */
void clear_read_columns(); void clear_read_columns();
...@@ -369,7 +370,7 @@ void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len) ...@@ -369,7 +370,7 @@ void Cassandra_se_impl::get_read_rowkey(char **value, int *value_len)
} }
bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as parameters bool Cassandra_se_impl::get_range_slices(bool last_key_as_start_key)
{ {
bool res= true; bool res= true;
...@@ -380,11 +381,16 @@ bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as para ...@@ -380,11 +381,16 @@ bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as para
// Try passing nothing... // Try passing nothing...
KeyRange key_range; // Try passing nothing, too. KeyRange key_range; // Try passing nothing, too.
key_range.__isset.start_key=true; key_range.__isset.start_key= true;
key_range.__isset.end_key=true; key_range.__isset.end_key= true;
key_range.start_key.assign("", 0);
key_range.end_key.assign("", 0); if (last_key_as_start_key)
key_range.start_key= rowkey;
else
key_range.start_key.assign("", 0);
key_range.end_key.assign("", 0);
key_range.count= read_batch_size;
try { try {
cass->get_range_slices(key_slice_vec, cass->get_range_slices(key_slice_vec,
...@@ -392,6 +398,11 @@ bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as para ...@@ -392,6 +398,11 @@ bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as para
cur_consistency_level); cur_consistency_level);
res= false; res= false;
if (key_slice_vec.size() < (uint)read_batch_size)
get_slices_returned_less= true;
else
get_slices_returned_less= false;
} catch (InvalidRequestException ire) { } catch (InvalidRequestException ire) {
print_error("%s [%s]", ire.what(), ire.why.c_str()); print_error("%s [%s]", ire.what(), ire.why.c_str());
} catch (UnavailableException ue) { } catch (UnavailableException ue) {
...@@ -405,11 +416,32 @@ bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as para ...@@ -405,11 +416,32 @@ bool Cassandra_se_impl::get_range_slices() //todo: start_range/end_range as para
} }
bool Cassandra_se_impl::get_next_range_slice_row() /* Switch to next row. This may produce an error */
bool Cassandra_se_impl::get_next_range_slice_row(bool *eof)
{ {
if (key_slice_it == key_slice_vec.end()) if (key_slice_it == key_slice_vec.end())
return true; {
if (get_slices_returned_less)
{
*eof= true;
return false;
}
/*
We have read through all columns in this batch. Try getting the next
batch.
*/
if (get_range_slices(true))
return true;
if (key_slice_vec.empty())
{
*eof= true;
return false;
}
}
*eof= false;
column_data_vec= key_slice_it->columns; column_data_vec= key_slice_it->columns;
rowkey= key_slice_it->key; rowkey= key_slice_it->key;
column_data_it= column_data_vec.begin(); column_data_it= column_data_vec.begin();
......
...@@ -39,9 +39,11 @@ public: ...@@ -39,9 +39,11 @@ public:
virtual void get_read_rowkey(char **value, int *value_len)=0; virtual void get_read_rowkey(char **value, int *value_len)=0;
/* Reads, multi-row scans */ /* Reads, multi-row scans */
virtual bool get_range_slices()=0; int read_batch_size;
virtual bool get_range_slices(bool last_key_as_start_key)=0;
virtual void finish_reading_range_slices()=0; virtual void finish_reading_range_slices()=0;
virtual bool get_next_range_slice_row()=0; virtual bool get_next_range_slice_row(bool *eof)=0;
/* read_set setup */ /* read_set setup */
virtual void clear_read_columns()=0; virtual void clear_read_columns()=0;
......
...@@ -212,7 +212,8 @@ static handler* cassandra_create_handler(handlerton *hton, ...@@ -212,7 +212,8 @@ static handler* cassandra_create_handler(handlerton *hton,
ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg) ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
:handler(hton, table_arg), :handler(hton, table_arg),
se(NULL), field_converters(NULL),rowkey_converter(NULL) se(NULL), field_converters(NULL), rowkey_converter(NULL),
rnd_batch_size(10*1000)
{} {}
...@@ -760,7 +761,8 @@ int ha_cassandra::rnd_init(bool scan) ...@@ -760,7 +761,8 @@ int ha_cassandra::rnd_init(bool scan)
for (uint i= 1; i < table->s->fields; i++) for (uint i= 1; i < table->s->fields; i++)
se->add_read_column(table->field[i]->field_name); se->add_read_column(table->field[i]->field_name);
bres= se->get_range_slices(); se->read_batch_size= rnd_batch_size;
bres= se->get_range_slices(false);
if (bres) if (bres)
my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str()); my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
...@@ -780,17 +782,23 @@ int ha_cassandra::rnd_end() ...@@ -780,17 +782,23 @@ int ha_cassandra::rnd_end()
int ha_cassandra::rnd_next(uchar *buf) int ha_cassandra::rnd_next(uchar *buf)
{ {
int rc; int rc;
bool reached_eof;
DBUG_ENTER("ha_cassandra::rnd_next"); DBUG_ENTER("ha_cassandra::rnd_next");
// Unpack and return the next record. // Unpack and return the next record.
if (se->get_next_range_slice_row()) if (se->get_next_range_slice_row(&reached_eof))
{ {
rc= HA_ERR_END_OF_FILE; rc= HA_ERR_INTERNAL_ERROR;
} }
else else
{ {
read_cassandra_columns(true); if (reached_eof)
rc= 0; rc= HA_ERR_END_OF_FILE;
else
{
read_cassandra_columns(true);
rc= 0;
}
} }
DBUG_RETURN(rc); DBUG_RETURN(rc);
......
...@@ -45,6 +45,8 @@ class ha_cassandra: public handler ...@@ -45,6 +45,8 @@ class ha_cassandra: public handler
void free_field_converters(); void free_field_converters();
void read_cassandra_columns(bool unpack_pk); void read_cassandra_columns(bool unpack_pk);
ha_rows rnd_batch_size;
public: public:
ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg); ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg);
~ha_cassandra() ~ha_cassandra()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment