Commit d759acc1 authored by Roman Nozdrin's avatar Roman Nozdrin Committed by Sergei Petrunia

CLX-5 Add infrastructure to unpack RBR rows recieved from CX backend.

    Add a knob to disable Select handler.
parent 51e21d54
...@@ -5,7 +5,6 @@ Copyright (c) 2019, MariaDB Corporation. ...@@ -5,7 +5,6 @@ Copyright (c) 2019, MariaDB Corporation.
/** @file ha_clustrixdb.cc */ /** @file ha_clustrixdb.cc */
#include "ha_clustrixdb.h" #include "ha_clustrixdb.h"
#include "ha_clustrixdb_pushdown.h"
#include "key.h" #include "key.h"
handlerton *clustrixdb_hton = NULL; handlerton *clustrixdb_hton = NULL;
...@@ -90,10 +89,26 @@ static MYSQL_SYSVAR_STR ...@@ -90,10 +89,26 @@ static MYSQL_SYSVAR_STR
NULL, NULL, "" NULL, NULL, ""
); );
// Per thread select handler knob
static MYSQL_THDVAR_BOOL(
enable_sh,
PLUGIN_VAR_NOCMDARG,
"",
NULL,
NULL,
1
);
bool get_enable_sh(THD* thd)
{
return ( thd == NULL ) ? false : THDVAR(thd, enable_sh);
}
/**************************************************************************** /****************************************************************************
** Class ha_clustrixdb_trx ** Class ha_clustrixdb_trx
****************************************************************************/ ****************************************************************************/
st_clustrixdb_trx::st_clustrixdb_trx(THD *trx_thd) st_clustrixdb_trx::st_clustrixdb_trx(THD *trx_thd)
{ {
thd = trx_thd; thd = trx_thd;
...@@ -178,6 +193,27 @@ void decode_objectname(char *buf, const char *path, size_t buf_size) ...@@ -178,6 +193,27 @@ void decode_objectname(char *buf, const char *path, size_t buf_size)
buf[new_path_len] = '\0'; buf[new_path_len] = '\0';
} }
st_clustrixdb_trx *get_trx(THD *thd, int *error_code)
{
*error_code = 0;
st_clustrixdb_trx *trx;
if (!(trx = (st_clustrixdb_trx *)thd_get_ha_data(thd, clustrixdb_hton)))
{
if (!(trx = new st_clustrixdb_trx(thd))) {
*error_code = HA_ERR_OUT_OF_MEM;
return NULL;
}
if ((*error_code = trx->net_init())) {
delete trx;
return NULL;
}
thd_set_ha_data(thd, clustrixdb_hton, trx);
}
return trx;
}
/**************************************************************************** /****************************************************************************
** Class ha_clustrixdb ** Class ha_clustrixdb
****************************************************************************/ ****************************************************************************/
...@@ -322,6 +358,7 @@ int ha_clustrixdb::open(const char *name, int mode, uint test_if_locked) ...@@ -322,6 +358,7 @@ int ha_clustrixdb::open(const char *name, int mode, uint test_if_locked)
if (!clustrix_table_oid) if (!clustrix_table_oid)
clustrix_table_oid = atoll((const char *)table->s->tabledef_version.str); clustrix_table_oid = atoll((const char *)table->s->tabledef_version.str);
// Surrogate key marker
has_hidden_key = table->s->primary_key == MAX_KEY; has_hidden_key = table->s->primary_key == MAX_KEY;
if (has_hidden_key) { if (has_hidden_key) {
ref_length = 8; ref_length = 8;
...@@ -915,28 +952,6 @@ void ha_clustrixdb::remove_current_table_from_rpl_table_list() ...@@ -915,28 +952,6 @@ void ha_clustrixdb::remove_current_table_from_rpl_table_list()
delete rgi; delete rgi;
} }
st_clustrixdb_trx *ha_clustrixdb::get_trx(THD *thd, int *error_code)
{
*error_code = 0;
st_clustrixdb_trx *trx;
if (!(trx = (st_clustrixdb_trx *)thd_get_ha_data(thd, clustrixdb_hton)))
{
if (!(trx = new st_clustrixdb_trx(thd))) {
*error_code = HA_ERR_OUT_OF_MEM;
return NULL;
}
if ((*error_code = trx->net_init())) {
delete trx;
return NULL;
}
thd_set_ha_data(thd, clustrixdb_hton, trx);
}
return trx;
}
void ha_clustrixdb::build_key_packed_row(uint index, uchar *packed_key, void ha_clustrixdb::build_key_packed_row(uint index, uchar *packed_key,
size_t *packed_key_len) size_t *packed_key_len)
{ {
...@@ -1088,6 +1103,7 @@ static struct st_mysql_sys_var* clustrixdb_system_variables[] = ...@@ -1088,6 +1103,7 @@ static struct st_mysql_sys_var* clustrixdb_system_variables[] =
MYSQL_SYSVAR(password), MYSQL_SYSVAR(password),
MYSQL_SYSVAR(port), MYSQL_SYSVAR(port),
MYSQL_SYSVAR(socket), MYSQL_SYSVAR(socket),
MYSQL_SYSVAR(enable_sh),
NULL NULL
}; };
......
...@@ -21,6 +21,9 @@ Copyright (c) 2019, MariaDB Corporation. ...@@ -21,6 +21,9 @@ Copyright (c) 2019, MariaDB Corporation.
#include "../../sql/rpl_record.h" #include "../../sql/rpl_record.h"
size_t estimate_row_size(TABLE *table); size_t estimate_row_size(TABLE *table);
class st_clustrixdb_trx;
st_clustrixdb_trx *get_trx(THD *thd, int *error_code);
bool get_enable_sh(THD* thd);
class ha_clustrixdb; class ha_clustrixdb;
...@@ -117,7 +120,6 @@ class ha_clustrixdb : public handler ...@@ -117,7 +120,6 @@ class ha_clustrixdb : public handler
int info_push(uint info_type, void *info); int info_push(uint info_type, void *info);
private: private:
st_clustrixdb_trx *get_trx(THD *thd, int *error_code);
void add_current_table_to_rpl_table_list(); void add_current_table_to_rpl_table_list();
void remove_current_table_from_rpl_table_list(); void remove_current_table_from_rpl_table_list();
void build_key_packed_row(uint index, uchar *packed_key, void build_key_packed_row(uint index, uchar *packed_key,
......
...@@ -89,6 +89,10 @@ static select_handler* ...@@ -89,6 +89,10 @@ static select_handler*
create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex) create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex)
{ {
ha_clustrixdb_select_handler *sh = NULL; ha_clustrixdb_select_handler *sh = NULL;
if (!get_enable_sh(thd)) {
return sh;
}
String query; String query;
// Print the query into a string provided // Print the query into a string provided
select_lex->print(thd, &query, QT_ORDINARY); select_lex->print(thd, &query, QT_ORDINARY);
...@@ -127,10 +131,10 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex) ...@@ -127,10 +131,10 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex)
if ((error_code = clustrix_net->scan_query_init(query, fieldtype, items_number, if ((error_code = clustrix_net->scan_query_init(query, fieldtype, items_number,
null_bits, num_null_bytes, field_metadata, field_metadata_size, &scan_refid))) { null_bits, num_null_bytes, field_metadata, field_metadata_size, &scan_refid))) {
goto err; //goto err;
} }
sh = new ha_clustrixdb_select_handler(thd, select_lex, clustrix_net); sh = new ha_clustrixdb_select_handler(thd, select_lex, clustrix_net, scan_refid);
err: err:
// reuse the connection // reuse the connection
...@@ -153,10 +157,15 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex) ...@@ -153,10 +157,15 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex)
ha_clustrixdb_select_handler::ha_clustrixdb_select_handler( ha_clustrixdb_select_handler::ha_clustrixdb_select_handler(
THD *thd, THD *thd,
SELECT_LEX* select_lex, SELECT_LEX* select_lex,
clustrix_connection* clustrix_net) clustrix_connection* clustrix_net_,
: select_handler(thd, clustrixdb_hton), clustrix_net(clustrix_net) ulonglong scan_refid_)
: select_handler(thd, clustrixdb_hton), clustrix_net(clustrix_net_),
scan_refid(scan_refid_)
{ {
select = select_lex; select = select_lex;
rli = NULL;
rgi = NULL;
scan_refid = 0;
} }
/*********************************************************** /***********************************************************
...@@ -165,6 +174,9 @@ ha_clustrixdb_select_handler::ha_clustrixdb_select_handler( ...@@ -165,6 +174,9 @@ ha_clustrixdb_select_handler::ha_clustrixdb_select_handler(
**********************************************************/ **********************************************************/
ha_clustrixdb_select_handler::~ha_clustrixdb_select_handler() ha_clustrixdb_select_handler::~ha_clustrixdb_select_handler()
{ {
remove_current_table_from_rpl_table_list();
// WIP reuse the connection
if (clustrix_net) if (clustrix_net)
delete clustrix_net; delete clustrix_net;
} }
...@@ -179,6 +191,14 @@ ha_clustrixdb_select_handler::~ha_clustrixdb_select_handler() ...@@ -179,6 +191,14 @@ ha_clustrixdb_select_handler::~ha_clustrixdb_select_handler()
* ********************************************************/ * ********************************************************/
int ha_clustrixdb_select_handler::init_scan() int ha_clustrixdb_select_handler::init_scan()
{ {
// need this bitmap future in next_row()
// WIP look whether table->read_set->n_bits is valid or not
if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
return ER_OUTOFMEMORY;
bitmap_set_all(&scan_fields);
add_current_table_to_rpl_table_list();
return 0; return 0;
} }
...@@ -193,12 +213,10 @@ int ha_clustrixdb_select_handler::init_scan() ...@@ -193,12 +213,10 @@ int ha_clustrixdb_select_handler::init_scan()
int ha_clustrixdb_select_handler::next_row() int ha_clustrixdb_select_handler::next_row()
{ {
int error_code = 0; int error_code = 0;
THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); st_clustrixdb_trx *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
assert(is_scan);
assert(scan_refid); assert(scan_refid);
uchar *rowdata; uchar *rowdata;
...@@ -207,12 +225,6 @@ int ha_clustrixdb_select_handler::next_row() ...@@ -207,12 +225,6 @@ int ha_clustrixdb_select_handler::next_row()
&rowdata_length))) &rowdata_length)))
return error_code; return error_code;
if (has_hidden_key) {
last_hidden_key = *(ulonglong *)rowdata;
rowdata += 8;
rowdata_length -= 8;
}
uchar const *current_row_end; uchar const *current_row_end;
ulong master_reclength; ulong master_reclength;
...@@ -232,17 +244,13 @@ int ha_clustrixdb_select_handler::next_row() ...@@ -232,17 +244,13 @@ int ha_clustrixdb_select_handler::next_row()
/*********************************************************** /***********************************************************
* DESCRIPTION: * DESCRIPTION:
* Finishes the scan for select handler * Finishes the scan for select handler
* ATM this function sets vtable_state and restores it
* afterwards since it reuses existed vtable code internally.
* PARAMETERS: * PARAMETERS:
* RETURN: * RETURN:
* rc as int * rc as int
***********************************************************/ ***********************************************************/
int ha_clustrixdb_select_handler::end_scan() int ha_clustrixdb_select_handler::end_scan()
{ {
/*
int error_code = 0; int error_code = 0;
THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); st_clustrixdb_trx *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
...@@ -250,13 +258,81 @@ int ha_clustrixdb_select_handler::end_scan() ...@@ -250,13 +258,81 @@ int ha_clustrixdb_select_handler::end_scan()
my_bitmap_free(&scan_fields); my_bitmap_free(&scan_fields);
if (scan_refid && (error_code = trx->clustrix_net->scan_end(scan_refid))) if (scan_refid && (error_code = trx->clustrix_net->scan_end(scan_refid)))
return error_code; return error_code;
scan_refid = 0;
return 0; return error_code;
*/
return 0;
} }
void ha_clustrixdb_select_handler::print_error(int, unsigned long) void ha_clustrixdb_select_handler::print_error(int, unsigned long)
{ {
} }
/*@brief clone of ha_clustrixdb method */
/***********************************************************
* DESCRIPTION:
* Creates structures to unpack RBR rows in ::next_row()
* PARAMETERS:
* RETURN:
* rc as int
***********************************************************/
void ha_clustrixdb_select_handler::add_current_table_to_rpl_table_list()
{
if (rli)
return;
rli = new Relay_log_info(FALSE);
rli->sql_driver_thd = thd;
rgi = new rpl_group_info(rli);
rgi->thd = thd;
rgi->tables_to_lock_count = 0;
rgi->tables_to_lock = NULL;
if (rgi->tables_to_lock_count)
return;
rgi->tables_to_lock = (RPL_TABLE_LIST *)my_malloc(sizeof(RPL_TABLE_LIST),
MYF(MY_WME));
rgi->tables_to_lock->init_one_table(&table->s->db, &table->s->table_name, 0,
TL_READ);
rgi->tables_to_lock->table = table;
rgi->tables_to_lock->table_id = table->tablenr;
rgi->tables_to_lock->m_conv_table = NULL;
rgi->tables_to_lock->master_had_triggers = FALSE;
rgi->tables_to_lock->m_tabledef_valid = TRUE;
// We need one byte per column to save a column's binlog type.
uchar *col_type = (uchar*) my_alloca(table->s->fields);
for (uint i = 0 ; i < table->s->fields ; ++i)
col_type[i] = table->field[i]->binlog_type();
table_def *tabledef = &rgi->tables_to_lock->m_tabledef;
new (tabledef) table_def(col_type, table->s->fields, NULL, 0, NULL, 0);
rgi->tables_to_lock_count++;
if (col_type)
my_afree(col_type);
}
/*@brief clone of ha_clustrixdb method */
/***********************************************************
* DESCRIPTION:
* Deletes structures that are used to unpack RBR rows
* in ::next_row(). Called from dtor
* PARAMETERS:
* RETURN:
* rc as int
***********************************************************/
void ha_clustrixdb_select_handler::remove_current_table_from_rpl_table_list()
{
// the 2nd cond might be unnecessary
if (!rgi || !rgi->tables_to_lock)
return;
rgi->tables_to_lock->m_tabledef.table_def::~table_def();
rgi->tables_to_lock->m_tabledef_valid = FALSE;
my_free(rgi->tables_to_lock);
rgi->tables_to_lock_count--;
rgi->tables_to_lock = NULL;
delete rli;
delete rgi;
}
...@@ -13,17 +13,13 @@ Copyright (c) 2019, MariaDB Corporation. ...@@ -13,17 +13,13 @@ Copyright (c) 2019, MariaDB Corporation.
* select_handler API methods. Could be used by the server * select_handler API methods. Could be used by the server
* tp pushdown the whole query described by SELECT_LEX. * tp pushdown the whole query described by SELECT_LEX.
* More details in server/sql/select_handler.h * More details in server/sql/select_handler.h
* sel in the constructor is the semantic tree for the query. * sel semantic tree for the query in SELECT_LEX.
* Methods:
* init_scan - get plan and send it to ExeMgr. Get the execution result.
* next_row - get a row back from sm.
* end_scan - finish and clean the things up.
************************************************************/ ************************************************************/
class ha_clustrixdb_select_handler: public select_handler class ha_clustrixdb_select_handler: public select_handler
{ {
public: public:
ha_clustrixdb_select_handler(THD* thd_arg, SELECT_LEX* sel, ha_clustrixdb_select_handler(THD* thd_arg, SELECT_LEX* sel,
clustrix_connection* clustrix_net); clustrix_connection* clustrix_net, ulonglong scan_refid);
~ha_clustrixdb_select_handler(); ~ha_clustrixdb_select_handler();
int init_scan(); int init_scan();
...@@ -31,12 +27,15 @@ class ha_clustrixdb_select_handler: public select_handler ...@@ -31,12 +27,15 @@ class ha_clustrixdb_select_handler: public select_handler
int end_scan(); int end_scan();
void print_error(int, unsigned long); void print_error(int, unsigned long);
MY_BITMAP scan_fields;
private: private:
clustrix_connection *clustrix_net; clustrix_connection *clustrix_net;
rpl_group_info *rgi; rpl_group_info *rgi;
Relay_log_info *rli;
RPL_TABLE_LIST *rpl_table_list;
ulonglong scan_refid; ulonglong scan_refid;
bool has_hidden_key; void add_current_table_to_rpl_table_list();
void remove_current_table_from_rpl_table_list();
}; };
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment