Commit e9c6a989 authored by Will DeVries's avatar Will DeVries Committed by Sergei Petrunia

Implement index_read, index_first, and index_last.

parent 99831d70
......@@ -29,7 +29,8 @@ enum clustrix_commands {
CLUSTRIX_KEY_READ,
CLUSTRIX_KEY_DELETE,
CLUSTRIX_SCAN_QUERY,
CLUSTRIX_KEY_UPDATE
CLUSTRIX_KEY_UPDATE,
CLUSTRIX_SCAN_FROM_KEY
};
/****************************************************************************
......@@ -38,6 +39,7 @@ enum clustrix_commands {
void clustrix_connection::disconnect(bool is_destructor)
{
DBUG_ENTER("clustrix_connection::disconnect");
if (is_destructor)
{
/*
......@@ -48,13 +50,14 @@ void clustrix_connection::disconnect(bool is_destructor)
clustrix_net.net.thd = NULL;
}
mysql_close(&clustrix_net);
DBUG_VOID_RETURN;
}
int clustrix_connection::connect()
{
int error_code = 0;
my_bool my_true = 1;
DBUG_ENTER("connect");
DBUG_ENTER("clustrix_connection::connect");
/* Validate the connection parameters */
if (!strcmp(clustrix_socket, ""))
......@@ -440,6 +443,49 @@ int clustrix_connection::scan_query(String &stmt, uchar *fieldtype, uint fields,
return error_code;
}
int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index,
enum scan_type scan_dir,
bool sorted_scan, MY_BITMAP *read_set,
uchar *packed_key,
ulong packed_key_length,
ulonglong *scan_refid)
{
int error_code;
command_length = 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_FROM_KEY)))
return error_code;
if ((error_code = add_command_operand_ulonglong(clustrix_table_oid)))
return error_code;
if ((error_code = add_command_operand_uint(index)))
return error_code;
if ((error_code = add_command_operand_uchar(scan_dir)))
return error_code;
if ((error_code = add_command_operand_uchar(sorted_scan)))
return error_code;
if ((error_code = add_command_operand_str(packed_key, packed_key_length)))
return error_code;
if ((error_code = add_command_operand_bitmap(read_set)))
return error_code;
if ((error_code = send_command()))
return error_code;
ulong packet_length = cli_safe_read(&clustrix_net);
if (packet_length == packet_error)
return mysql_errno(&clustrix_net);
unsigned char *pos = clustrix_net.net.read_pos;
*scan_refid = safe_net_field_length_ll(&pos, packet_length);
return error_code;
}
int clustrix_connection::scan_next(ulonglong scan_refid, uchar **rowdata,
ulong *rowdata_length)
{
......@@ -693,6 +739,9 @@ int clustrix_connection::add_command_operand_str(const uchar *str,
if (error_code)
return error_code;
if (!str_length)
return 0;
error_code = expand_command_buffer(str_length);
if (error_code)
return error_code;
......
......@@ -44,18 +44,22 @@ class clustrix_connection
clustrix_connection()
: command_buffer(NULL), command_buffer_length(0), command_length(0)
{
DBUG_ENTER("clustrix_connection::clustrix_connection");
memset(&clustrix_net, 0, sizeof(MYSQL));
has_statement_trans = FALSE;
has_transaction = FALSE;
DBUG_VOID_RETURN;
}
~clustrix_connection()
{
DBUG_ENTER("clustrix_connection::~clustrix_connection");
if (is_connected())
disconnect(TRUE);
if (command_buffer)
my_free(command_buffer);
DBUG_VOID_RETURN;
}
inline bool is_connected()
......@@ -97,6 +101,15 @@ class clustrix_connection
uchar **rowdata, ulong *rowdata_length);
enum sort_order {SORT_NONE = 0, SORT_ASC = 1, SORT_DESC = 2};
enum scan_type {
READ_KEY_OR_NEXT, /* rows with key and greater */
READ_KEY_OR_PREV, /* rows with key and less. */
READ_AFTER_KEY, /* rows with keys greater than key */
READ_BEFORE_KEY, /* rows with keys less than key */
READ_FROM_START, /* rows with forwards from first key. */
READ_FROM_LAST, /* rows with backwards from last key. */
};
int scan_table(ulonglong clustrix_table_oid, uint index,
enum sort_order sort, MY_BITMAP *read_set,
ulonglong *scan_refid);
......@@ -105,6 +118,10 @@ class clustrix_connection
int scan_query(String &stmt, uchar *fieldtype, uint fields, uchar *null_bits,
uint null_bits_size, uchar *field_metadata,
uint field_metadata_size, ulonglong *scan_refid);
int scan_from_key(ulonglong clustrix_table_oid, uint index,
enum scan_type scan_dir, bool sorted_scan,
MY_BITMAP *read_set, uchar *packed_key,
ulong packed_key_length, ulonglong *scan_refid);
int populate_table_list(LEX_CSTRING *db, handlerton::discovered_list *result);
int discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, THD *thd,
......
......@@ -528,6 +528,13 @@ int ha_clustrixdb::index_init(uint idx, bool sorted)
add_current_table_to_rpl_table_list();
scan_refid = 0;
/* Return all columns until there is a better understanding of
requirements. */
if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
return ER_OUTOFMEMORY;
bitmap_set_all(&scan_fields);
sorted_scan = sorted;
return 0;
}
......@@ -542,7 +549,7 @@ int ha_clustrixdb::index_read(uchar * buf, const uchar * key, uint key_len,
if (!trx)
DBUG_RETURN(error_code);
is_scan = false;
is_scan = true;
key_restore(table->record[0], key, &table->key_info[active_index], key_len);
// The estimate should consider only key fields widths.
size_t packed_key_len;
......@@ -550,92 +557,82 @@ int ha_clustrixdb::index_read(uchar * buf, const uchar * key, uint key_len,
build_key_packed_row(active_index, table->record[0],
packed_key, &packed_key_len);
uchar *rowdata;
ulong rowdata_length;
if ((error_code = trx->key_read(clustrix_table_oid, active_index,
table->read_set, packed_key, packed_key_len,
&rowdata, &rowdata_length)))
goto err;
uchar const *current_row_end;
ulong master_reclength;
if ((error_code = unpack_row(rgi, table, table->s->fields, rowdata,
table->read_set, &current_row_end,
&master_reclength, rowdata + rowdata_length)))
goto err;
//bool exact = false;
clustrix_connection::scan_type st;
switch (find_flag) {
case HA_READ_KEY_EXACT:
//exact = true;
/* fall through */
//DBUG_RETURN(ER_NOT_SUPPORTED_YET);
case HA_READ_KEY_OR_NEXT:
st = clustrix_connection::READ_KEY_OR_NEXT;
break;
case HA_READ_KEY_OR_PREV:
st = clustrix_connection::READ_KEY_OR_PREV;
break;
case HA_READ_AFTER_KEY:
st = clustrix_connection::READ_AFTER_KEY;
break;
case HA_READ_BEFORE_KEY:
st = clustrix_connection::READ_BEFORE_KEY;
break;
case HA_READ_PREFIX:
case HA_READ_PREFIX_LAST:
case HA_READ_PREFIX_LAST_OR_PREV:
case HA_READ_MBR_CONTAIN:
case HA_READ_MBR_INTERSECT:
case HA_READ_MBR_WITHIN:
case HA_READ_MBR_DISJOINT:
case HA_READ_MBR_EQUAL:
DBUG_RETURN(ER_NOT_SUPPORTED_YET);
}
err:
error_code = trx->scan_from_key(clustrix_table_oid, active_index, st,
sorted_scan, &scan_fields, packed_key,
packed_key_len, &scan_refid);
if (packed_key)
my_afree(packed_key);
DBUG_RETURN(error_code);
if (error_code)
DBUG_RETURN(error_code);
DBUG_RETURN(rnd_next(buf));
}
int ha_clustrixdb::index_first(uchar *buf)
{
DBUG_ENTER("ha_clustrixdb::index_read");
int error_code = 0;
THD *thd = ha_thd();
clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
is_scan = true;
if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
return ER_OUTOFMEMORY;
#if 0
if (table->s->keys)
table->mark_columns_used_by_index(table->s->primary_key, &scan_fields);
else
bitmap_clear_all(&scan_fields);
bitmap_union(&scan_fields, table->read_set);
#else
/* Why is read_set not setup correctly? */
bitmap_set_all(&scan_fields);
#endif
if ((error_code = trx->scan_table(clustrix_table_oid, active_index,
clustrix_connection::SORT_NONE,
&scan_fields, &scan_refid)))
return error_code;
DBUG_RETURN(error_code);
if ((error_code = trx->scan_from_key(clustrix_table_oid, active_index,
clustrix_connection::READ_FROM_START,
sorted_scan, &scan_fields, NULL, 0,
&scan_refid)))
DBUG_RETURN(error_code);
return rnd_next(buf);
DBUG_RETURN(rnd_next(buf));
}
int ha_clustrixdb::index_last(uchar *buf)
{
DBUG_ENTER("ha_clustrixdb::index_read");
int error_code = 0;
THD *thd = ha_thd();
clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
is_scan = true;
if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
return ER_OUTOFMEMORY;
#if 0
if (table->s->keys)
table->mark_columns_used_by_index(table->s->primary_key, &scan_fields);
else
bitmap_clear_all(&scan_fields);
bitmap_union(&scan_fields, table->read_set);
#else
/* Why is read_set not setup correctly? */
bitmap_set_all(&scan_fields);
#endif
if ((error_code = trx->scan_table(clustrix_table_oid, active_index,
clustrix_connection::SORT_NONE,
&scan_fields, &scan_refid)))
return error_code;
DBUG_RETURN(error_code);
return rnd_next(buf);
if ((error_code = trx->scan_from_key(clustrix_table_oid, active_index,
clustrix_connection::READ_FROM_LAST,
sorted_scan, &scan_fields, NULL, 0,
&scan_refid)))
DBUG_RETURN(error_code);
DBUG_RETURN(rnd_next(buf));
}
int ha_clustrixdb::index_next(uchar *buf)
......
......@@ -41,6 +41,7 @@ class ha_clustrixdb : public handler
ulonglong scan_refid;
bool is_scan;
MY_BITMAP scan_fields;
bool sorted_scan;
public:
ha_clustrixdb(handlerton *hton, TABLE_SHARE *table_arg);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment