Commit 51e21d54 authored by Roman Nozdrin's avatar Roman Nozdrin Committed by Sergei Petrunia

CLX-5 Select handler framework.

parent 04fc4ead
......@@ -28,6 +28,7 @@ enum clustrix_commands {
CLUSTRIX_SCAN_STOP,
CLUSTRIX_KEY_READ,
CLUSTRIX_KEY_DELETE,
CLUSTRIX_QUERY_INIT
};
/****************************************************************************
......@@ -322,6 +323,70 @@ int clustrix_connection::scan_init(ulonglong clustrix_table_oid, uint index,
return error_code;
}
/*int clustrix_connection::scan_init_select(String &stmt)
{
int error_code = mysql_real_query(&clustrix_net, stmt.ptr(), stmt.length());
if (error_code)
return mysql_errno(&clustrix_net);
results = mysql_store_result(&clustrix_net);
return error_code;
}*/
/**
* @brief
* Sends a command to initiate query scan.
* @details
* Sends a command over mysql protocol connection to initiate an
* arbitrary query using a query text.
* Uses field types, field metadata and nullability to explicitly
* cast result to expected data type. Exploits RBR TABLE_MAP_EVENT
* format + sends SQL text.
* @args
* stmt& Query text to send
* fieldtype* array of byte wide field types of result projection
* null_bits* fields nullability bitmap of result projection
* field_metadata* Field metadata of result projection
* scan_refid id used to reference this scan later
* Used in pushdowns to initiate query scan.
**/
int clustrix_connection::scan_query_init(String &stmt, uchar *fieldtype,
uint fields, uchar *null_bits,
uint null_bits_size, uchar *field_metadata,
uint field_metadata_size, ulonglong *scan_refid)
{
int error_code;
command_length = 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_QUERY_INIT)))
return error_code;
if ((error_code = add_command_operand_str(fieldtype, fields)))
return error_code;
if ((error_code = add_command_operand_str(field_metadata, field_metadata_size)))
return error_code;
// This variable length string calls for an additional store w/o lcb lenth prefix.
if ((error_code = add_command_operand_vlstr(null_bits, null_bits_size)))
return error_code;
if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length())))
return error_code;
if ((error_code = send_command()))
return error_code;
ulong packet_length = cli_safe_read(&clustrix_net);
if (packet_length == packet_error)
return mysql_errno(&clustrix_net);
unsigned char *pos = clustrix_net.net.read_pos;
*scan_refid = safe_net_field_length_ll(&pos, packet_length);
return error_code;
}
int clustrix_connection::scan_next(ulonglong scan_refid, uchar **rowdata,
ulong *rowdata_length)
{
......@@ -584,6 +649,29 @@ int clustrix_connection::add_command_operand_str(const uchar *str,
return 0;
}
/**
* @brief
* Puts variable length string into the buffer.
* @details
* Puts into the buffer variable length string the size
* of which is send by other means. For details see
* MDB Client/Server Protocol.
* @args
* str - string to send
* str_length - size
**/
int clustrix_connection::add_command_operand_vlstr(const uchar *str,
size_t str_length)
{
int error_code = expand_command_buffer(str_length);
if (error_code)
return error_code;
memcpy(command_buffer + command_length, str, str_length);
command_length += str_length;
return 0;
}
int clustrix_connection::add_command_operand_lex_string(LEX_CSTRING str)
{
return add_command_operand_str((const uchar *)str.str, str.length);
......
......@@ -28,6 +28,7 @@ class clustrix_connection
# define COMMAND_BUFFER_SIZE_INCREMENT_BITS 10
MYSQL clustrix_net;
MYSQL_RES *results;
uchar *command_buffer;
size_t command_buffer_length;
size_t command_length;
......@@ -68,6 +69,7 @@ class clustrix_connection
int delete_table(String &stmt);
int rename_table(String &stmt);
int write_row(ulonglong clustrix_table_oid,
uchar *packed_row, size_t packed_size);
int key_delete(ulonglong clustrix_table_oid,
......@@ -82,6 +84,10 @@ class clustrix_connection
ulonglong *scan_refid);
int scan_next(ulonglong scan_refid, uchar **rowdata, ulong *rowdata_length);
int scan_end(ulonglong scan_refid);
int scan_query_init(String &stmt, uchar *fieldtype,
uint fields, uchar *null_bits,
uint null_bits_size, uchar *field_metadata,
uint field_metadata_size, ulonglong *scan_refid);
int populate_table_list(LEX_CSTRING *db, handlerton::discovered_list *result);
int discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, THD *thd,
......@@ -93,6 +99,7 @@ class clustrix_connection
int add_command_operand_ulonglong(ulonglong value);
int add_command_operand_lcb(ulonglong value);
int add_command_operand_str(const uchar *str, size_t length);
int add_command_operand_vlstr(const uchar *str, size_t length);
int add_command_operand_lex_string(LEX_CSTRING str);
int add_command_operand_bitmap(MY_BITMAP *bitmap);
int send_command();
......
......@@ -5,6 +5,7 @@ Copyright (c) 2019, MariaDB Corporation.
/** @file ha_clustrixdb.cc */
#include "ha_clustrixdb.h"
#include "ha_clustrixdb_pushdown.h"
#include "key.h"
handlerton *clustrixdb_hton = NULL;
......@@ -1052,6 +1053,8 @@ int clustrixdb_discover_table(handlerton *hton, THD *thd, TABLE_SHARE *share)
return error_code;
}
#include "ha_clustrixdb_pushdown.cc"
static int clustrixdb_init(void *p)
{
clustrixdb_hton = (handlerton *) p;
......@@ -1065,6 +1068,7 @@ static int clustrixdb_init(void *p)
clustrixdb_hton->show_status = clustrixdb_show_status;
clustrixdb_hton->discover_table_names = clustrixdb_discover_table_names;
clustrixdb_hton->discover_table = clustrixdb_discover_table;
clustrixdb_hton->create_select = create_clustrixdb_select_handler;
return 0;
}
......
/*****************************************************************************
Copyright (c) 2019, MariaDB Corporation.
*****************************************************************************/
#include "ha_clustrixdb_pushdown.h"
/*@brief Fills up array data types, metadata and nullability*/
/************************************************************
* DESCRIPTION:
* Fills up three arrays with: field binlog data types, field
* metadata and nullability bitmask as in Table_map_log_event
* ctor. Internally creates a temporary table as does
* Pushdown_select.
* More details in server/sql/log_event_server.cc
* PARAMETERS:
* thd - THD*
* sl - SELECT_LEX*
* fieldtype - uchar*
* field_metadata - uchar*
* null_bits - uchar*
* num_null_bytes - null bit size
* RETURN:
* metadata_size int or -1 in case of error
************************************************************/
int get_field_types(THD *thd, SELECT_LEX *sl, uchar *fieldtype,
uchar *field_metadata, uchar *null_bits, const int num_null_bytes)
{
int field_metadata_size = 0;
int metadata_index = 0;
// Construct a tmp table with fields to find out result DTs.
// This should be reconsidered if it worths the effort.
List<Item> types;
TMP_TABLE_PARAM tmp_table_param;
sl->master_unit()->join_union_item_types(thd, types, 1);
tmp_table_param.init();
tmp_table_param.field_count= types.elements;
TABLE *tmp_table = create_tmp_table(thd, &tmp_table_param, types,
(ORDER *) 0, false, 0,
TMP_TABLE_ALL_COLUMNS, 1,
&empty_clex_str, true, false);
if (!tmp_table) {
field_metadata_size = -1;
goto err;
}
for (unsigned int i = 0 ; i < tmp_table_param.field_count; ++i) {
fieldtype[i]= tmp_table->field[i]->binlog_type();
}
bzero(field_metadata, (tmp_table_param.field_count * 2));
for (unsigned int i= 0 ; i < tmp_table_param.field_count ; i++)
{
metadata_index+= tmp_table->field[i]->save_field_metadata(&field_metadata[metadata_index]);
}
if (metadata_index < 251)
field_metadata_size += metadata_index + 1;
else
field_metadata_size += metadata_index + 3;
bzero(null_bits, num_null_bytes);
for (unsigned int i= 0 ; i < tmp_table_param.field_count ; ++i) {
if (tmp_table->field[i]->maybe_null()) {
null_bits[(i / 8)]+= 1 << (i % 8);
}
}
free_tmp_table(thd, tmp_table);
err:
return field_metadata_size;
}
/*@brief create_clustrixdb_select_handler- Creates handler*/
/************************************************************
* DESCRIPTION:
* Creates a select handler
* More details in server/sql/select_handler.h
* PARAMETERS:
* thd - THD pointer.
* sel - SELECT_LEX* that describes the query.
* RETURN:
* select_handler if possible
* NULL otherwise
************************************************************/
static select_handler*
create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex)
{
ha_clustrixdb_select_handler *sh = NULL;
String query;
// Print the query into a string provided
select_lex->print(thd, &query, QT_ORDINARY);
int error_code = 0;
clustrix_connection *clustrix_net = NULL;
int field_metadata_size = 0;
ulonglong scan_refid = 0;
// We presume this number is equal to types.elements in get_field_types
uint items_number = select_lex->get_item_list()->elements;
uint num_null_bytes = (items_number + 7) / 8;
uchar *fieldtype = NULL;
uchar *null_bits = NULL;
uchar *field_metadata = NULL;
uchar *meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME), &fieldtype, items_number,
&null_bits, num_null_bytes, &field_metadata, (items_number * 2),
NULL);
if (!meta_memory) {
// The only way to say something here is to raise warning
// b/c we will fallback to other access methods: derived handler or rowstore.
goto err;
}
if((field_metadata_size =
get_field_types(thd, select_lex, fieldtype, field_metadata, null_bits, num_null_bytes)) < 0) {
goto err;
}
// Use buffers filled by get_field_types here.
// WIP reuse the connections
clustrix_net = new clustrix_connection();
error_code = clustrix_net->connect();
if (error_code)
goto err;
if ((error_code = clustrix_net->scan_query_init(query, fieldtype, items_number,
null_bits, num_null_bytes, field_metadata, field_metadata_size, &scan_refid))) {
goto err;
}
sh = new ha_clustrixdb_select_handler(thd, select_lex, clustrix_net);
err:
// reuse the connection
if (!sh)
delete clustrix_net;
// deallocate buffers
if (meta_memory)
my_free(meta_memory);
return sh;
}
/***********************************************************
* DESCRIPTION:
* select_handler constructor
* PARAMETERS:
* thd - THD pointer.
* select_lex - sematic tree for the query.
**********************************************************/
ha_clustrixdb_select_handler::ha_clustrixdb_select_handler(
THD *thd,
SELECT_LEX* select_lex,
clustrix_connection* clustrix_net)
: select_handler(thd, clustrixdb_hton), clustrix_net(clustrix_net)
{
select = select_lex;
}
/***********************************************************
* DESCRIPTION:
* select_handler constructor
**********************************************************/
ha_clustrixdb_select_handler::~ha_clustrixdb_select_handler()
{
if (clustrix_net)
delete clustrix_net;
}
/*@brief Initiate the query for select_handler */
/***********************************************************
* DESCRIPTION:
* Does nothing ATM.
* * PARAMETERS:
* RETURN:
* rc as int
* ********************************************************/
int ha_clustrixdb_select_handler::init_scan()
{
return 0;
}
/*@brief Fetch next row for select_handler */
/***********************************************************
* DESCRIPTION:
* Fetch next row for select_handler.
* PARAMETERS:
* RETURN:
* rc as int
* ********************************************************/
int ha_clustrixdb_select_handler::next_row()
{
int error_code = 0;
THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
assert(is_scan);
assert(scan_refid);
uchar *rowdata;
ulong rowdata_length;
if ((error_code = trx->clustrix_net->scan_next(scan_refid, &rowdata,
&rowdata_length)))
return error_code;
if (has_hidden_key) {
last_hidden_key = *(ulonglong *)rowdata;
rowdata += 8;
rowdata_length -= 8;
}
uchar const *current_row_end;
ulong master_reclength;
error_code = unpack_row(rgi, table, table->s->fields, rowdata,
&scan_fields, &current_row_end,
&master_reclength, rowdata + rowdata_length);
if (error_code)
return error_code;
return 0;
//return HA_ERR_END_OF_FILE;
}
/*@brief Finishes the scan and clean it up */
/***********************************************************
* DESCRIPTION:
* Finishes the scan for select handler
* ATM this function sets vtable_state and restores it
* afterwards since it reuses existed vtable code internally.
* PARAMETERS:
* RETURN:
* rc as int
***********************************************************/
int ha_clustrixdb_select_handler::end_scan()
{
/*
int error_code = 0;
THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
my_bitmap_free(&scan_fields);
if (scan_refid && (error_code = trx->clustrix_net->scan_end(scan_refid)))
return error_code;
scan_refid = 0;
return 0;
*/
return 0;
}
void ha_clustrixdb_select_handler::print_error(int, unsigned long)
{
}
/*****************************************************************************
Copyright (c) 2019, MariaDB Corporation.
*****************************************************************************/
#ifndef _ha_clustrixdb_pushdown_h
#define _ha_clustrixdb_pushdown_h
#include "select_handler.h"
#include "sql_select.h"
/*@brief select_handler class*/
/***********************************************************
* DESCRIPTION:
* select_handler API methods. Could be used by the server
* tp pushdown the whole query described by SELECT_LEX.
* More details in server/sql/select_handler.h
* sel in the constructor is the semantic tree for the query.
* Methods:
* init_scan - get plan and send it to ExeMgr. Get the execution result.
* next_row - get a row back from sm.
* end_scan - finish and clean the things up.
************************************************************/
class ha_clustrixdb_select_handler: public select_handler
{
public:
ha_clustrixdb_select_handler(THD* thd_arg, SELECT_LEX* sel,
clustrix_connection* clustrix_net);
~ha_clustrixdb_select_handler();
int init_scan();
int next_row();
int end_scan();
void print_error(int, unsigned long);
private:
clustrix_connection *clustrix_net;
rpl_group_info *rgi;
ulonglong scan_refid;
bool has_hidden_key;
};
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment