Commit a5ffb981 authored by Will DeVries's avatar Will DeVries Committed by Sergei Petrunia

Buffer rows from Clustrix.

parent 17ee32fb
...@@ -355,9 +355,147 @@ int clustrix_connection::key_read(ulonglong clustrix_table_oid, uint index, ...@@ -355,9 +355,147 @@ int clustrix_connection::key_read(ulonglong clustrix_table_oid, uint index,
return 0; return 0;
} }
class clustrix_connection_cursor {
struct rowdata {
ulong length;
uchar *data;
};
ulong current_row;
ulong last_row;
struct rowdata *rows;
uchar *outstanding_row; // to be freed on next request.
MYSQL *clustrix_net;
public:
ulong buffer_size;
ulonglong scan_refid;
bool eof_reached;
private:
int cache_row(uchar *rowdata, ulong rowdata_length)
{
DBUG_ENTER("clustrix_connection_cursor::cache_row");
rows[last_row].length = rowdata_length;
rows[last_row].data = (uchar *)my_malloc(rowdata_length, MYF(MY_WME));
if (!rows[last_row].data)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
memcpy(rows[last_row].data, rowdata, rowdata_length);
last_row++;
DBUG_RETURN(0);
}
int load_rows_impl()
{
DBUG_ENTER("clustrix_connection_cursor::load_rows_impl");
int error_code = 0;
ulong packet_length = cli_safe_read(clustrix_net);
if (packet_length == packet_error) {
error_code = mysql_errno(clustrix_net);
if (error_code == HA_ERR_END_OF_FILE) {
// We have read all rows for query.
eof_reached = TRUE;
DBUG_RETURN(0);
}
DBUG_RETURN(error_code);
}
uchar *rowdata = clustrix_net->net.read_pos;
ulong rowdata_length = safe_net_field_length_ll(&rowdata, packet_length);
if (!rowdata_length) {
// We have read all rows in this batch.
DBUG_RETURN(0);
}
if ((error_code = cache_row(rowdata, rowdata_length)))
DBUG_RETURN(error_code);
DBUG_RETURN(load_rows_impl());
}
public:
clustrix_connection_cursor(MYSQL *clustrix_net_, ulong bufsize)
{
DBUG_ENTER("clustrix_connection_cursor::clustrix_connection_cursor");
clustrix_net = clustrix_net_;
eof_reached = FALSE;
current_row = 0;
last_row = 0;
outstanding_row = NULL;
buffer_size = bufsize;
rows = NULL;
DBUG_VOID_RETURN;
}
~clustrix_connection_cursor()
{
DBUG_ENTER("clustrix_connection_cursor::~clustrix_connection_cursor");
if (outstanding_row)
my_free(outstanding_row);
while (current_row < last_row)
my_free(rows[current_row++].data);
if (rows)
my_free(rows);
DBUG_VOID_RETURN;
}
int load_rows()
{
DBUG_ENTER("clustrix_connection_cursor::load_rows");
current_row = 0;
last_row = 0;
DBUG_RETURN(load_rows_impl());
}
int initialize()
{
DBUG_ENTER("clustrix_connection_cursor::initialize");
ulong packet_length = cli_safe_read(clustrix_net);
if (packet_length == packet_error)
DBUG_RETURN(mysql_errno(clustrix_net));
unsigned char *pos = clustrix_net->net.read_pos;
scan_refid = safe_net_field_length_ll(&pos, packet_length);
rows = (struct rowdata *)my_malloc(buffer_size * sizeof(struct rowdata),
MYF(MY_WME));
if (!rows)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
DBUG_RETURN(load_rows());
}
uchar *retrieve_row(ulong *rowdata_length)
{
DBUG_ENTER("clustrix_connection_cursor::retrieve_row");
if (outstanding_row) {
my_free(outstanding_row);
outstanding_row = NULL;
}
if (current_row == last_row)
DBUG_RETURN(NULL);
*rowdata_length = rows[current_row].length;
outstanding_row = rows[current_row].data;
current_row++;
DBUG_RETURN(outstanding_row);
}
};
int allocate_clustrix_connection_cursor(MYSQL *clustrix_net, ulong buffer_size,
clustrix_connection_cursor **scan)
{
DBUG_ENTER("allocate_clustrix_connection_cursor");
*scan = new clustrix_connection_cursor(clustrix_net, buffer_size);
if (!*scan)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
DBUG_RETURN((*scan)->initialize());
}
int clustrix_connection::scan_table(ulonglong clustrix_table_oid, uint index, int clustrix_connection::scan_table(ulonglong clustrix_table_oid, uint index,
enum sort_order sort, MY_BITMAP *read_set, enum sort_order sort, MY_BITMAP *read_set,
ulonglong *scan_refid) ushort row_req,
clustrix_connection_cursor **scan)
{ {
int error_code; int error_code;
command_length = 0; command_length = 0;
...@@ -365,6 +503,9 @@ int clustrix_connection::scan_table(ulonglong clustrix_table_oid, uint index, ...@@ -365,6 +503,9 @@ int clustrix_connection::scan_table(ulonglong clustrix_table_oid, uint index,
if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_TABLE))) if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_TABLE)))
return error_code; return error_code;
if ((error_code = add_command_operand_ushort(row_req)))
return error_code;
if ((error_code = add_command_operand_ulonglong(clustrix_table_oid))) if ((error_code = add_command_operand_ulonglong(clustrix_table_oid)))
return error_code; return error_code;
...@@ -380,13 +521,7 @@ int clustrix_connection::scan_table(ulonglong clustrix_table_oid, uint index, ...@@ -380,13 +521,7 @@ int clustrix_connection::scan_table(ulonglong clustrix_table_oid, uint index,
if ((error_code = send_command())) if ((error_code = send_command()))
return error_code; return error_code;
ulong packet_length = cli_safe_read(&clustrix_net); return allocate_clustrix_connection_cursor(&clustrix_net, row_req, scan);
if (packet_length == packet_error)
return mysql_errno(&clustrix_net);
unsigned char *pos = clustrix_net.net.read_pos;
*scan_refid = safe_net_field_length_ll(&pos, packet_length);
return error_code;
} }
/** /**
...@@ -410,7 +545,8 @@ int clustrix_connection::scan_query(String &stmt, uchar *fieldtype, uint fields, ...@@ -410,7 +545,8 @@ int clustrix_connection::scan_query(String &stmt, uchar *fieldtype, uint fields,
uchar *null_bits, uint null_bits_size, uchar *null_bits, uint null_bits_size,
uchar *field_metadata, uchar *field_metadata,
uint field_metadata_size, uint field_metadata_size,
ulonglong *scan_refid) ushort row_req,
clustrix_connection_cursor **scan)
{ {
int error_code; int error_code;
command_length = 0; command_length = 0;
...@@ -418,6 +554,9 @@ int clustrix_connection::scan_query(String &stmt, uchar *fieldtype, uint fields, ...@@ -418,6 +554,9 @@ int clustrix_connection::scan_query(String &stmt, uchar *fieldtype, uint fields,
if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_QUERY))) if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_QUERY)))
return error_code; return error_code;
if ((error_code = add_command_operand_ushort(row_req)))
return error_code;
if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length()))) if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length())))
return error_code; return error_code;
...@@ -434,13 +573,7 @@ int clustrix_connection::scan_query(String &stmt, uchar *fieldtype, uint fields, ...@@ -434,13 +573,7 @@ int clustrix_connection::scan_query(String &stmt, uchar *fieldtype, uint fields,
if ((error_code = send_command())) if ((error_code = send_command()))
return error_code; return error_code;
ulong packet_length = cli_safe_read(&clustrix_net); return allocate_clustrix_connection_cursor(&clustrix_net, row_req, scan);
if (packet_length == packet_error)
return mysql_errno(&clustrix_net);
unsigned char *pos = clustrix_net.net.read_pos;
*scan_refid = safe_net_field_length_ll(&pos, packet_length);
return error_code;
} }
int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index, int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index,
...@@ -448,7 +581,8 @@ int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index, ...@@ -448,7 +581,8 @@ int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index,
bool sorted_scan, MY_BITMAP *read_set, bool sorted_scan, MY_BITMAP *read_set,
uchar *packed_key, uchar *packed_key,
ulong packed_key_length, ulong packed_key_length,
ulonglong *scan_refid) ushort row_req,
clustrix_connection_cursor **scan)
{ {
int error_code; int error_code;
command_length = 0; command_length = 0;
...@@ -456,6 +590,9 @@ int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index, ...@@ -456,6 +590,9 @@ int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index,
if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_FROM_KEY))) if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_FROM_KEY)))
return error_code; return error_code;
if ((error_code = add_command_operand_ushort(row_req)))
return error_code;
if ((error_code = add_command_operand_ulonglong(clustrix_table_oid))) if ((error_code = add_command_operand_ulonglong(clustrix_table_oid)))
return error_code; return error_code;
...@@ -477,44 +614,54 @@ int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index, ...@@ -477,44 +614,54 @@ int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index,
if ((error_code = send_command())) if ((error_code = send_command()))
return error_code; return error_code;
ulong packet_length = cli_safe_read(&clustrix_net); return allocate_clustrix_connection_cursor(&clustrix_net, row_req, scan);
if (packet_length == packet_error)
return mysql_errno(&clustrix_net);
unsigned char *pos = clustrix_net.net.read_pos;
*scan_refid = safe_net_field_length_ll(&pos, packet_length);
return error_code;
} }
int clustrix_connection::scan_next(ulonglong scan_refid, uchar **rowdata, int clustrix_connection::scan_next(clustrix_connection_cursor *scan,
ulong *rowdata_length) uchar **rowdata, ulong *rowdata_length)
{ {
*rowdata = scan->retrieve_row(rowdata_length);
if (*rowdata)
return 0;
if (scan->eof_reached)
return HA_ERR_END_OF_FILE;
int error_code; int error_code;
command_length = 0; command_length = 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_NEXT))) if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_NEXT)))
return error_code; return error_code;
if ((error_code = add_command_operand_lcb(scan_refid))) if ((error_code = add_command_operand_ushort(scan->buffer_size)))
return error_code;
if ((error_code = add_command_operand_lcb(scan->scan_refid)))
return error_code; return error_code;
if ((error_code = send_command())) if ((error_code = send_command()))
return error_code; return error_code;
ulong packet_length = cli_safe_read(&clustrix_net); if ((error_code = scan->load_rows()))
if (packet_length == packet_error) return error_code;
return mysql_errno(&clustrix_net);
*rowdata = clustrix_net.net.read_pos; *rowdata = scan->retrieve_row(rowdata_length);
*rowdata_length = safe_net_field_length_ll(rowdata, packet_length); if (!*rowdata)
return HA_ERR_END_OF_FILE;
return 0; return 0;
} }
int clustrix_connection::scan_end(ulonglong scan_refid) int clustrix_connection::scan_end(clustrix_connection_cursor *scan)
{ {
int error_code; int error_code;
command_length = 0; command_length = 0;
ulonglong scan_refid = scan->scan_refid;
bool eof_reached = scan->eof_reached;
delete scan;
if (eof_reached)
return 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_STOP))) if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_STOP)))
return error_code; return error_code;
...@@ -696,6 +843,18 @@ int clustrix_connection::add_command_operand_uchar(uchar value) ...@@ -696,6 +843,18 @@ int clustrix_connection::add_command_operand_uchar(uchar value)
return 0; return 0;
} }
int clustrix_connection::add_command_operand_ushort(ushort value)
{
ushort be_value = htobe16(value);
int error_code = expand_command_buffer(sizeof(be_value));
if (error_code)
return error_code;
memcpy(command_buffer + command_length, &be_value, sizeof(be_value));
command_length += sizeof(be_value);
return 0;
}
int clustrix_connection::add_command_operand_uint(uint value) int clustrix_connection::add_command_operand_uint(uint value)
{ {
uint be_value = htobe32(value); uint be_value = htobe32(value);
......
...@@ -21,6 +21,7 @@ Copyright (c) 2019, MariaDB Corporation. ...@@ -21,6 +21,7 @@ Copyright (c) 2019, MariaDB Corporation.
#define CLUSTRIX_SERVER_REQUEST 30 #define CLUSTRIX_SERVER_REQUEST 30
class clustrix_connection_cursor;
class clustrix_connection class clustrix_connection
{ {
private: private:
...@@ -28,7 +29,6 @@ class clustrix_connection ...@@ -28,7 +29,6 @@ class clustrix_connection
# define COMMAND_BUFFER_SIZE_INCREMENT_BITS 10 # define COMMAND_BUFFER_SIZE_INCREMENT_BITS 10
MYSQL clustrix_net; MYSQL clustrix_net;
MYSQL_RES *results;
uchar *command_buffer; uchar *command_buffer;
size_t command_buffer_length; size_t command_buffer_length;
size_t command_length; size_t command_length;
...@@ -111,24 +111,29 @@ class clustrix_connection ...@@ -111,24 +111,29 @@ class clustrix_connection
}; };
int scan_table(ulonglong clustrix_table_oid, uint index, int scan_table(ulonglong clustrix_table_oid, uint index,
enum sort_order sort, MY_BITMAP *read_set, enum sort_order sort, MY_BITMAP *read_set, ushort row_req,
ulonglong *scan_refid); clustrix_connection_cursor **scan);
int scan_next(ulonglong scan_refid, uchar **rowdata, ulong *rowdata_length);
int scan_end(ulonglong scan_refid);
int scan_query(String &stmt, uchar *fieldtype, uint fields, uchar *null_bits, int scan_query(String &stmt, uchar *fieldtype, uint fields, uchar *null_bits,
uint null_bits_size, uchar *field_metadata, uint null_bits_size, uchar *field_metadata,
uint field_metadata_size, ulonglong *scan_refid); uint field_metadata_size, ushort row_req,
clustrix_connection_cursor **scan);
int scan_from_key(ulonglong clustrix_table_oid, uint index, int scan_from_key(ulonglong clustrix_table_oid, uint index,
enum scan_type scan_dir, bool sorted_scan, enum scan_type scan_dir, bool sorted_scan,
MY_BITMAP *read_set, uchar *packed_key, MY_BITMAP *read_set, uchar *packed_key,
ulong packed_key_length, ulonglong *scan_refid); ulong packed_key_length, ushort row_req,
clustrix_connection_cursor **scan);
int scan_next(clustrix_connection_cursor *scan, uchar **rowdata,
ulong *rowdata_length);
int scan_end(clustrix_connection_cursor *scan);
int populate_table_list(LEX_CSTRING *db, handlerton::discovered_list *result); int populate_table_list(LEX_CSTRING *db, handlerton::discovered_list *result);
int discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, THD *thd, int discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, THD *thd,
TABLE_SHARE *share); TABLE_SHARE *share);
private: private:
int expand_command_buffer(size_t add_length); int expand_command_buffer(size_t add_length);
int add_command_operand_uchar(uchar value); int add_command_operand_uchar(uchar value);
int add_command_operand_ushort(ushort value);
int add_command_operand_uint(uint value); int add_command_operand_uint(uint value);
int add_command_operand_ulonglong(ulonglong value); int add_command_operand_ulonglong(ulonglong value);
int add_command_operand_lcb(ulonglong value); int add_command_operand_lcb(ulonglong value);
......
...@@ -90,6 +90,14 @@ static MYSQL_SYSVAR_STR ...@@ -90,6 +90,14 @@ static MYSQL_SYSVAR_STR
NULL, NULL, "" NULL, NULL, ""
); );
static MYSQL_THDVAR_UINT
(
row_buffer,
PLUGIN_VAR_RQCMDARG,
"Clustrix rowstore row buffer size",
NULL, NULL, 20, 1, 65535, 0
);
// Per thread select handler knob // Per thread select handler knob
static MYSQL_THDVAR_BOOL( static MYSQL_THDVAR_BOOL(
select_handler, select_handler,
...@@ -119,6 +127,11 @@ bool derived_handler_setting(THD* thd) ...@@ -119,6 +127,11 @@ bool derived_handler_setting(THD* thd)
return ( thd == NULL ) ? false : THDVAR(thd, derived_handler); return ( thd == NULL ) ? false : THDVAR(thd, derived_handler);
} }
uint row_buffer_setting(THD* thd)
{
return THDVAR(thd, row_buffer);
}
/**************************************************************************** /****************************************************************************
** Utility functions ** Utility functions
****************************************************************************/ ****************************************************************************/
...@@ -183,7 +196,7 @@ ha_clustrixdb::ha_clustrixdb(handlerton *hton, TABLE_SHARE *table_arg) ...@@ -183,7 +196,7 @@ ha_clustrixdb::ha_clustrixdb(handlerton *hton, TABLE_SHARE *table_arg)
DBUG_ENTER("ha_clustrixdb::ha_clustrixdb"); DBUG_ENTER("ha_clustrixdb::ha_clustrixdb");
rli = NULL; rli = NULL;
rgi = NULL; rgi = NULL;
scan_refid = 0; scan_cur = NULL;
clustrix_table_oid = 0; clustrix_table_oid = 0;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -526,7 +539,7 @@ int ha_clustrixdb::index_init(uint idx, bool sorted) ...@@ -526,7 +539,7 @@ int ha_clustrixdb::index_init(uint idx, bool sorted)
active_index = idx; active_index = idx;
add_current_table_to_rpl_table_list(); add_current_table_to_rpl_table_list();
scan_refid = 0; scan_cur = NULL;
/* Return all columns until there is a better understanding of /* Return all columns until there is a better understanding of
requirements. */ requirements. */
...@@ -589,7 +602,8 @@ int ha_clustrixdb::index_read(uchar * buf, const uchar * key, uint key_len, ...@@ -589,7 +602,8 @@ int ha_clustrixdb::index_read(uchar * buf, const uchar * key, uint key_len,
error_code = trx->scan_from_key(clustrix_table_oid, active_index, st, error_code = trx->scan_from_key(clustrix_table_oid, active_index, st,
sorted_scan, &scan_fields, packed_key, sorted_scan, &scan_fields, packed_key,
packed_key_len, &scan_refid); packed_key_len, THDVAR(thd, row_buffer),
&scan_cur);
if (packed_key) if (packed_key)
my_afree(packed_key); my_afree(packed_key);
...@@ -601,7 +615,7 @@ int ha_clustrixdb::index_read(uchar * buf, const uchar * key, uint key_len, ...@@ -601,7 +615,7 @@ int ha_clustrixdb::index_read(uchar * buf, const uchar * key, uint key_len,
int ha_clustrixdb::index_first(uchar *buf) int ha_clustrixdb::index_first(uchar *buf)
{ {
DBUG_ENTER("ha_clustrixdb::index_read"); DBUG_ENTER("ha_clustrixdb::index_first");
int error_code = 0; int error_code = 0;
THD *thd = ha_thd(); THD *thd = ha_thd();
clustrix_connection *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
...@@ -611,7 +625,7 @@ int ha_clustrixdb::index_first(uchar *buf) ...@@ -611,7 +625,7 @@ int ha_clustrixdb::index_first(uchar *buf)
if ((error_code = trx->scan_from_key(clustrix_table_oid, active_index, if ((error_code = trx->scan_from_key(clustrix_table_oid, active_index,
clustrix_connection::READ_FROM_START, clustrix_connection::READ_FROM_START,
sorted_scan, &scan_fields, NULL, 0, sorted_scan, &scan_fields, NULL, 0,
&scan_refid))) THDVAR(thd, row_buffer), &scan_cur)))
DBUG_RETURN(error_code); DBUG_RETURN(error_code);
DBUG_RETURN(rnd_next(buf)); DBUG_RETURN(rnd_next(buf));
...@@ -619,7 +633,7 @@ int ha_clustrixdb::index_first(uchar *buf) ...@@ -619,7 +633,7 @@ int ha_clustrixdb::index_first(uchar *buf)
int ha_clustrixdb::index_last(uchar *buf) int ha_clustrixdb::index_last(uchar *buf)
{ {
DBUG_ENTER("ha_clustrixdb::index_read"); DBUG_ENTER("ha_clustrixdb::index_last");
int error_code = 0; int error_code = 0;
THD *thd = ha_thd(); THD *thd = ha_thd();
clustrix_connection *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
...@@ -629,7 +643,7 @@ int ha_clustrixdb::index_last(uchar *buf) ...@@ -629,7 +643,7 @@ int ha_clustrixdb::index_last(uchar *buf)
if ((error_code = trx->scan_from_key(clustrix_table_oid, active_index, if ((error_code = trx->scan_from_key(clustrix_table_oid, active_index,
clustrix_connection::READ_FROM_LAST, clustrix_connection::READ_FROM_LAST,
sorted_scan, &scan_fields, NULL, 0, sorted_scan, &scan_fields, NULL, 0,
&scan_refid))) THDVAR(thd, row_buffer), &scan_cur)))
DBUG_RETURN(error_code); DBUG_RETURN(error_code);
DBUG_RETURN(rnd_next(buf)); DBUG_RETURN(rnd_next(buf));
...@@ -658,7 +672,7 @@ int ha_clustrixdb::index_prev(uchar *buf) ...@@ -658,7 +672,7 @@ int ha_clustrixdb::index_prev(uchar *buf)
int ha_clustrixdb::index_end() int ha_clustrixdb::index_end()
{ {
DBUG_ENTER("index_prev"); DBUG_ENTER("index_prev");
if (scan_refid) if (scan_cur)
DBUG_RETURN(rnd_end()); DBUG_RETURN(rnd_end());
else else
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -675,7 +689,7 @@ int ha_clustrixdb::rnd_init(bool scan) ...@@ -675,7 +689,7 @@ int ha_clustrixdb::rnd_init(bool scan)
add_current_table_to_rpl_table_list(); add_current_table_to_rpl_table_list();
is_scan = scan; is_scan = scan;
scan_refid = 0; scan_cur = NULL;
if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false)) if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
return ER_OUTOFMEMORY; return ER_OUTOFMEMORY;
...@@ -694,7 +708,8 @@ int ha_clustrixdb::rnd_init(bool scan) ...@@ -694,7 +708,8 @@ int ha_clustrixdb::rnd_init(bool scan)
if ((error_code = trx->scan_table(clustrix_table_oid, 0, if ((error_code = trx->scan_table(clustrix_table_oid, 0,
clustrix_connection::SORT_NONE, clustrix_connection::SORT_NONE,
&scan_fields, &scan_refid))) &scan_fields, THDVAR(thd, row_buffer),
&scan_cur)))
return error_code; return error_code;
return 0; return 0;
...@@ -709,11 +724,11 @@ int ha_clustrixdb::rnd_next(uchar *buf) ...@@ -709,11 +724,11 @@ int ha_clustrixdb::rnd_next(uchar *buf)
return error_code; return error_code;
assert(is_scan); assert(is_scan);
assert(scan_refid); assert(scan_cur);
uchar *rowdata; uchar *rowdata;
ulong rowdata_length; ulong rowdata_length;
if ((error_code = trx->scan_next(scan_refid, &rowdata, &rowdata_length))) if ((error_code = trx->scan_next(scan_cur, &rowdata, &rowdata_length)))
return error_code; return error_code;
if (has_hidden_key) { if (has_hidden_key) {
...@@ -793,9 +808,9 @@ int ha_clustrixdb::rnd_end() ...@@ -793,9 +808,9 @@ int ha_clustrixdb::rnd_end()
return error_code; return error_code;
my_bitmap_free(&scan_fields); my_bitmap_free(&scan_fields);
if (scan_refid && (error_code = trx->scan_end(scan_refid))) if (scan_cur && (error_code = trx->scan_end(scan_cur)))
return error_code; return error_code;
scan_refid = 0; scan_cur = NULL;
return 0; return 0;
} }
...@@ -1063,6 +1078,7 @@ static struct st_mysql_sys_var* clustrixdb_system_variables[] = ...@@ -1063,6 +1078,7 @@ static struct st_mysql_sys_var* clustrixdb_system_variables[] =
MYSQL_SYSVAR(password), MYSQL_SYSVAR(password),
MYSQL_SYSVAR(port), MYSQL_SYSVAR(port),
MYSQL_SYSVAR(socket), MYSQL_SYSVAR(socket),
MYSQL_SYSVAR(row_buffer),
MYSQL_SYSVAR(select_handler), MYSQL_SYSVAR(select_handler),
MYSQL_SYSVAR(derived_handler), MYSQL_SYSVAR(derived_handler),
NULL NULL
......
...@@ -38,7 +38,7 @@ class ha_clustrixdb : public handler ...@@ -38,7 +38,7 @@ class ha_clustrixdb : public handler
bool has_hidden_key; bool has_hidden_key;
ulonglong last_hidden_key; ulonglong last_hidden_key;
ulonglong scan_refid; clustrix_connection_cursor *scan_cur;
bool is_scan; bool is_scan;
MY_BITMAP scan_fields; MY_BITMAP scan_fields;
bool sorted_scan; bool sorted_scan;
...@@ -110,4 +110,5 @@ class ha_clustrixdb : public handler ...@@ -110,4 +110,5 @@ class ha_clustrixdb : public handler
bool select_handler_setting(THD* thd); bool select_handler_setting(THD* thd);
bool derived_handler_setting(THD* thd); bool derived_handler_setting(THD* thd);
uint row_buffer_setting(THD* thd);
#endif // _ha_clustrixdb_h #endif // _ha_clustrixdb_h
...@@ -6,6 +6,7 @@ Copyright (c) 2019, MariaDB Corporation. ...@@ -6,6 +6,7 @@ Copyright (c) 2019, MariaDB Corporation.
#include "ha_clustrixdb_pushdown.h" #include "ha_clustrixdb_pushdown.h"
extern handlerton *clustrixdb_hton; extern handlerton *clustrixdb_hton;
extern uint clustrix_row_buffer;
/*@brief Fills up array data types, metadata and nullability*/ /*@brief Fills up array data types, metadata and nullability*/
/************************************************************ /************************************************************
...@@ -111,7 +112,7 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex) ...@@ -111,7 +112,7 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex)
select_lex->print(thd, &query, QT_ORDINARY); select_lex->print(thd, &query, QT_ORDINARY);
int error_code = 0; int error_code = 0;
int field_metadata_size = 0; int field_metadata_size = 0;
ulonglong scan_refid = 0; clustrix_connection_cursor *scan = NULL;
clustrix_connection *trx = NULL; clustrix_connection *trx = NULL;
// We presume this number is equal to types.elements in get_field_types // We presume this number is equal to types.elements in get_field_types
...@@ -138,12 +139,14 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex) ...@@ -138,12 +139,14 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex)
if (!trx) if (!trx)
goto err; goto err;
if ((error_code = trx->scan_query(query, fieldtype, items_number, if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits,
null_bits, num_null_bytes, field_metadata, field_metadata_size, &scan_refid))) { num_null_bytes, field_metadata,
field_metadata_size,
row_buffer_setting(thd), &scan))) {
goto err; goto err;
} }
sh = new ha_clustrixdb_select_handler(thd, select_lex, scan_refid); sh = new ha_clustrixdb_select_handler(thd, select_lex, scan);
err: err:
// deallocate buffers // deallocate buffers
...@@ -163,11 +166,11 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex) ...@@ -163,11 +166,11 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex)
ha_clustrixdb_select_handler::ha_clustrixdb_select_handler( ha_clustrixdb_select_handler::ha_clustrixdb_select_handler(
THD *thd, THD *thd,
SELECT_LEX* select_lex, SELECT_LEX* select_lex,
ulonglong scan_refid_) clustrix_connection_cursor *scan_)
: select_handler(thd, clustrixdb_hton) : select_handler(thd, clustrixdb_hton)
{ {
thd__ = thd; thd__ = thd;
scan_refid = scan_refid_; scan = scan_;
select = select_lex; select = select_lex;
rli = NULL; rli = NULL;
rgi = NULL; rgi = NULL;
...@@ -186,8 +189,8 @@ ha_clustrixdb_select_handler::~ha_clustrixdb_select_handler() ...@@ -186,8 +189,8 @@ ha_clustrixdb_select_handler::~ha_clustrixdb_select_handler()
if (!trx) { if (!trx) {
// TBD Log this // TBD Log this
} }
if (trx && scan_refid) if (trx && scan)
trx->scan_end(scan_refid); trx->scan_end(scan);
// If the ::init_scan has been executed // If the ::init_scan has been executed
if (table__) if (table__)
...@@ -234,11 +237,11 @@ int ha_clustrixdb_select_handler::next_row() ...@@ -234,11 +237,11 @@ int ha_clustrixdb_select_handler::next_row()
if (!trx) if (!trx)
return error_code; return error_code;
assert(scan_refid); assert(scan);
uchar *rowdata; uchar *rowdata;
ulong rowdata_length; ulong rowdata_length;
if ((error_code = trx->scan_next(scan_refid, &rowdata, &rowdata_length))) if ((error_code = trx->scan_next(scan, &rowdata, &rowdata_length)))
return error_code; return error_code;
uchar const *current_row_end; uchar const *current_row_end;
...@@ -290,9 +293,8 @@ create_clustrixdb_derived_handler(THD* thd, TABLE_LIST *derived) ...@@ -290,9 +293,8 @@ create_clustrixdb_derived_handler(THD* thd, TABLE_LIST *derived)
SELECT_LEX_UNIT *unit= derived->derived; SELECT_LEX_UNIT *unit= derived->derived;
SELECT_LEX *select_lex = unit->first_select(); SELECT_LEX *select_lex = unit->first_select();
String query; String query;
ulonglong scan_refid = 0;
dh = new ha_clustrixdb_derived_handler(thd, select_lex, scan_refid); dh = new ha_clustrixdb_derived_handler(thd, select_lex, NULL);
return dh; return dh;
} }
...@@ -307,11 +309,11 @@ create_clustrixdb_derived_handler(THD* thd, TABLE_LIST *derived) ...@@ -307,11 +309,11 @@ create_clustrixdb_derived_handler(THD* thd, TABLE_LIST *derived)
ha_clustrixdb_derived_handler::ha_clustrixdb_derived_handler( ha_clustrixdb_derived_handler::ha_clustrixdb_derived_handler(
THD *thd, THD *thd,
SELECT_LEX* select_lex, SELECT_LEX* select_lex,
ulonglong scan_refid_) clustrix_connection_cursor *scan_)
: derived_handler(thd, clustrixdb_hton) : derived_handler(thd, clustrixdb_hton)
{ {
thd__ = thd; thd__ = thd;
scan_refid = scan_refid_; scan = scan_;
select = select_lex; select = select_lex;
rli = NULL; rli = NULL;
rgi = NULL; rgi = NULL;
...@@ -333,8 +335,8 @@ ha_clustrixdb_derived_handler::~ha_clustrixdb_derived_handler() ...@@ -333,8 +335,8 @@ ha_clustrixdb_derived_handler::~ha_clustrixdb_derived_handler()
if (!trx) { if (!trx) {
// TBD Log this. // TBD Log this.
} }
if (trx && scan_refid) if (trx && scan)
trx->scan_end(scan_refid); trx->scan_end(scan);
// If the ::init_scan has been executed // If the ::init_scan has been executed
if (table__) if (table__)
...@@ -387,8 +389,10 @@ int ha_clustrixdb_derived_handler::init_scan() ...@@ -387,8 +389,10 @@ int ha_clustrixdb_derived_handler::init_scan()
if (!trx) if (!trx)
goto err; goto err;
if ((error_code = trx->scan_query(query, fieldtype, items_number, if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits,
null_bits, num_null_bytes, field_metadata, field_metadata_size, &scan_refid))) { num_null_bytes, field_metadata,
field_metadata_size,
row_buffer_setting(thd), &scan))) {
goto err; goto err;
} }
...@@ -422,11 +426,11 @@ int ha_clustrixdb_derived_handler::next_row() ...@@ -422,11 +426,11 @@ int ha_clustrixdb_derived_handler::next_row()
if (!trx) if (!trx)
return error_code; return error_code;
assert(scan_refid); assert(scan);
uchar *rowdata; uchar *rowdata;
ulong rowdata_length; ulong rowdata_length;
if ((error_code = trx->scan_next(scan_refid, &rowdata, &rowdata_length))) if ((error_code = trx->scan_next(scan, &rowdata, &rowdata_length)))
return error_code; return error_code;
uchar const *current_row_end; uchar const *current_row_end;
......
...@@ -30,7 +30,7 @@ class ha_clustrixdb_base_handler ...@@ -30,7 +30,7 @@ class ha_clustrixdb_base_handler
rpl_group_info *rgi; rpl_group_info *rgi;
Relay_log_info *rli; Relay_log_info *rli;
// CLX BE scan operation reference // CLX BE scan operation reference
ulonglong scan_refid; clustrix_connection_cursor *scan;
// To unpack rows from CLX BE // To unpack rows from CLX BE
void add_current_table_to_rpl_table_list(); void add_current_table_to_rpl_table_list();
void remove_current_table_from_rpl_table_list(); void remove_current_table_from_rpl_table_list();
...@@ -50,7 +50,7 @@ class ha_clustrixdb_select_handler: ...@@ -50,7 +50,7 @@ class ha_clustrixdb_select_handler:
{ {
public: public:
ha_clustrixdb_select_handler(THD* thd_arg, SELECT_LEX* sel, ha_clustrixdb_select_handler(THD* thd_arg, SELECT_LEX* sel,
ulonglong scan_refid); clustrix_connection_cursor *scan);
~ha_clustrixdb_select_handler(); ~ha_clustrixdb_select_handler();
int init_scan(); int init_scan();
...@@ -73,7 +73,7 @@ class ha_clustrixdb_derived_handler: ...@@ -73,7 +73,7 @@ class ha_clustrixdb_derived_handler:
{ {
public: public:
ha_clustrixdb_derived_handler(THD* thd_arg, SELECT_LEX* sel, ha_clustrixdb_derived_handler(THD* thd_arg, SELECT_LEX* sel,
ulonglong scan_refid); clustrix_connection_cursor *scan);
~ha_clustrixdb_derived_handler(); ~ha_clustrixdb_derived_handler();
int init_scan(); int init_scan();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment