Commit 181da131 authored by Sergei Petrunia's avatar Sergei Petrunia

Merge XPand Storage Engine (rebased)

parents e0e5d8c5 cc5f5481
CREATE DATABASE xpd;
USE xpd;
DROP TABLE IF EXISTS cx1, t1, t2;
CREATE TABLE cx1(i BIGINT)ENGINE=xpand;
CREATE TABLE cx1(i BIGINT)ENGINE=xpand;
ERROR 42S01: Table 'cx1' already exists
INSERT INTO cx1 VALUES (42);
SELECT * FROM cx1;
i
42
DROP TABLE cx1;
SHOW CREATE TABLE cx1;
ERROR 42S02: Table 'xpd.cx1' doesn't exist
DROP TABLE IF EXISTS intandtext;
Warnings:
Note 1051 Unknown table 'xpd.intandtext'
CREATE TABLE intandtext(i bigint, t text)ENGINE=xpand;
INSERT INTO intandtext VALUES(10, 'someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq');
SELECT i,t FROM intandtext;
i t
10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq
EXPLAIN SELECT i,t FROM intandtext;
id select_type table type possible_keys key key_len ref rows Extra
1 PUSHED SELECT NULL NULL NULL NULL NULL NULL NULL NULL
SET @@optimizer_switch='derived_merge=OFF';
SET xpand_select_handler=OFF;
SELECT i,t FROM (SELECT i,t FROM intandtext) t;
i t
10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq
EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t;
id select_type table type possible_keys key key_len ref rows Extra
1 PRIMARY <derived2> ALL NULL NULL NULL NULL 10000
2 PUSHED DERIVED NULL NULL NULL NULL NULL NULL NULL NULL
SET xpand_derived_handler=OFF;
SELECT i,t FROM intandtext;
i t
10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq
SELECT i,t FROM (SELECT i,t FROM intandtext) t;
i t
10 someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq
EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t;
id select_type table type possible_keys key key_len ref rows Extra
1 PRIMARY <derived2> ALL NULL NULL NULL NULL 10000
2 DERIVED intandtext ALL NULL NULL NULL NULL 10000
DROP TABLE intandtext;
set
optimizer_switch=default,
xpand_derived_handler= default,
xpand_select_handler=default;
#
# CLX-77: INSERT ... SELECT returns rows to the client instead of inserting
#
drop table if exists t1,t2;
create table t1 (a int) engine=xpand;
insert into t1 values (1);
select a into @var from t1;
select @var;
@var
1
# This must not emit output to the client:
select a into outfile 'tmpfile1' from t1;
create table t2 (a int) engine=myisam;
insert into t2 select * from t1;
select * from t2;
a
1
drop table t1,t2;
#
# CLX-55: Prepared statement support:
# Implement "Direct Update" by printing the statement
#
create table t1 (a int primary key, b int) engine=xpand;
insert into t1 values (1,1),(2,2),(3,3);
prepare s from 'update t1 set b=b+? where a=?';
execute s using 10000, 2;
select * from t1;
a b
1 1
2 10002
3 3
drop table t1;
#
# CLX-80: ALTER TABLE t ENGINE=CLUSTRIX fails with an error
#
create table t1 (a int) engine=myisam;
insert into t1 values (1),(2),(3);
alter table t1 engine=xpand;
select * from t1;
a
1
2
3
show create table t1;
Table Create Table
t1 CREATE TABLE `t1` (
`a` int(11) DEFAULT NULL
) ENGINE=XPAND DEFAULT CHARSET=utf8
# Try a RENAME TABLE too since the patch touches the code
alter table t1 rename t2;
show create table t2;
Table Create Table
t2 CREATE TABLE `t2` (
`a` int(11) DEFAULT NULL
) ENGINE=XPAND DEFAULT CHARSET=utf8
drop table t2;
USE test;
DROP DATABASE xpd;
CREATE DATABASE xpd;
USE xpd;
--disable_warnings
DROP TABLE IF EXISTS cx1, t1, t2;
--enable_warnings
CREATE TABLE cx1(i BIGINT)ENGINE=xpand;
--error ER_TABLE_EXISTS_ERROR
CREATE TABLE cx1(i BIGINT)ENGINE=xpand;
INSERT INTO cx1 VALUES (42);
SELECT * FROM cx1;
DROP TABLE cx1;
--error ER_NO_SUCH_TABLE
SHOW CREATE TABLE cx1;
DROP TABLE IF EXISTS intandtext;
CREATE TABLE intandtext(i bigint, t text)ENGINE=xpand;
INSERT INTO intandtext VALUES(10, 'someqwqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq');
SELECT i,t FROM intandtext;
EXPLAIN SELECT i,t FROM intandtext;
SET @@optimizer_switch='derived_merge=OFF';
SET xpand_select_handler=OFF;
SELECT i,t FROM (SELECT i,t FROM intandtext) t;
EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t;
SET xpand_derived_handler=OFF;
SELECT i,t FROM intandtext;
SELECT i,t FROM (SELECT i,t FROM intandtext) t;
EXPLAIN SELECT i,t FROM (SELECT i,t FROM intandtext) t;
DROP TABLE intandtext;
set
optimizer_switch=default,
xpand_derived_handler= default,
xpand_select_handler=default;
--echo #
--echo # CLX-77: INSERT ... SELECT returns rows to the client instead of inserting
--echo #
--disable_warnings
drop table if exists t1,t2;
--enable_warnings
create table t1 (a int) engine=xpand;
insert into t1 values (1);
select a into @var from t1;
select @var;
--echo # This must not emit output to the client:
select a into outfile 'tmpfile1' from t1;
let $file=`select concat(@@datadir,'/xpd/tmpfile1')`;
--remove_file $file
create table t2 (a int) engine=myisam;
insert into t2 select * from t1;
select * from t2;
drop table t1,t2;
--echo #
--echo # CLX-55: Prepared statement support:
--echo # Implement "Direct Update" by printing the statement
--echo #
create table t1 (a int primary key, b int) engine=xpand;
insert into t1 values (1,1),(2,2),(3,3);
prepare s from 'update t1 set b=b+? where a=?';
execute s using 10000, 2;
--sorted_result
select * from t1;
drop table t1;
--echo #
--echo # CLX-80: ALTER TABLE t ENGINE=CLUSTRIX fails with an error
--echo #
create table t1 (a int) engine=myisam;
insert into t1 values (1),(2),(3);
alter table t1 engine=xpand;
--sorted_result
select * from t1;
show create table t1;
--echo # Try a RENAME TABLE too since the patch touches the code
alter table t1 rename t2;
show create table t2;
drop table t2;
USE test;
DROP DATABASE xpd;
!include include/default_my.cnf
[mysqld.1]
socket= /tmp/mysqld42.sock
CREATE DATABASE xpd;
USE xpd;
DROP TABLE IF EXISTS cx1;
CREATE TABLE cx1(i BIGINT, i2 BIGINT, t TEXT)ENGINE=xpand;
INSERT INTO cx1 VALUES (41, 43, 'some1'), (42, 42, 'some2'), (43, 41, 'some3');
SELECT * FROM cx1 ORDER BY i;
i i2 t
41 43 some1
42 42 some2
43 41 some3
SET xpand_select_handler=OFF;
SELECT * FROM cx1 WHERE i>41 AND i2>41;
i i2 t
42 42 some2
EXPLAIN SELECT * FROM cx1 WHERE i>41 AND i2>41;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE cx1 ALL NULL NULL NULL NULL 10000 Using where with pushed condition
SELECT * FROM cx1 WHERE i>41 AND i2>41 AND t='some2';
i i2 t
42 42 some2
EXPLAIN SELECT * FROM cx1 WHERE i>41 AND i2>41 AND t='some2';
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE cx1 ALL NULL NULL NULL NULL 10000 Using where with pushed condition
SELECT * FROM cx1 WHERE i>i2+1;
i i2 t
43 41 some3
EXPLAIN SELECT * FROM cx1 WHERE i>i2+1;
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE cx1 ALL NULL NULL NULL NULL 10000 Using where with pushed condition
SET @@optimizer_switch='derived_merge=OFF';
SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
i i2 t
43 41 some3
EXPLAIN SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
id select_type table type possible_keys key key_len ref rows Extra
1 PRIMARY <derived2> ALL NULL NULL NULL NULL 10000 Using filesort
2 PUSHED DERIVED NULL NULL NULL NULL NULL NULL NULL NULL
SET xpand_derived_handler=OFF;
SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
i i2 t
43 41 some3
EXPLAIN SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
id select_type table type possible_keys key key_len ref rows Extra
1 PRIMARY <derived2> ALL NULL NULL NULL NULL 10000 Using filesort
2 DERIVED cx1 ALL NULL NULL NULL NULL 10000 Using where with pushed condition
USE test;
DROP DATABASE xpd;
CREATE DATABASE xpd;
USE xpd;
--disable_warnings
DROP TABLE IF EXISTS cx1;
--enable_warnings
CREATE TABLE cx1(i BIGINT, i2 BIGINT, t TEXT)ENGINE=xpand;
INSERT INTO cx1 VALUES (41, 43, 'some1'), (42, 42, 'some2'), (43, 41, 'some3');
SELECT * FROM cx1 ORDER BY i;
SET xpand_select_handler=OFF;
SELECT * FROM cx1 WHERE i>41 AND i2>41;
EXPLAIN SELECT * FROM cx1 WHERE i>41 AND i2>41;
SELECT * FROM cx1 WHERE i>41 AND i2>41 AND t='some2';
EXPLAIN SELECT * FROM cx1 WHERE i>41 AND i2>41 AND t='some2';
SELECT * FROM cx1 WHERE i>i2+1;
EXPLAIN SELECT * FROM cx1 WHERE i>i2+1;
# The plugin doesn't use pushdown conditions for DH as of 10.5.1
# but it is worth to test memory leaks.
SET @@optimizer_switch='derived_merge=OFF';
SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
EXPLAIN SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
SET xpand_derived_handler=OFF;
SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
EXPLAIN SELECT * FROM (SELECT * FROM cx1 WHERE i>i2+1) a1 ORDER BY i;
# SELECT * FROM (SELECT i FROM cx1 WHERE i=42)a1,(SELECT i FROM cx1 WHERE i =42)a2 WHERE a1.i=a2.i;
# EXPLAIN SELECT * FROM (SELECT i FROM cx1 WHERE i=42)a1,(SELECT i FROM cx1 WHERE i =42)a2 WHERE a1.i=a2.i;
USE test;
DROP DATABASE xpd;
--plugin-load=xpand=ha_xpand.so
--core-file
--xpand_port=3306
--plugin-maturity=unknown
CREATE DATABASE IF NOT EXISTS `db1`;
connect con1,localhost,root,,test;
connection con1;
USE `db1`;
DROP TABLE IF EXISTS `t1`;
CREATE TABLE `t1`(i BIGINT, t TEXT)ENGINE=xpand;
show create table t1;
Table Create Table
t1 CREATE TABLE `t1` (
`i` bigint(20) DEFAULT NULL,
`t` text DEFAULT NULL
) ENGINE=XPAND DEFAULT CHARSET=utf8
set character_set_client=utf8;
set collation_connection=utf8_bin;
set character_set_results=utf8;
INSERT INTO `t1` (i, t) VALUES (42, 'один');
INSERT INTO `t1` (i, t) VALUES (42, 'ноль');
SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC;
i t
42 один
42 ноль
# examine the data on the backend:
# This should show that the SELECT has been pushed to the backend:
explain
select i, hex(t) from t1;
id select_type table type possible_keys key key_len ref rows Extra
1 PUSHED SELECT NULL NULL NULL NULL NULL NULL NULL NULL
select i, hex(t) from t1;
i hex(t)
42 D0BDD0BED0BBD18C
42 D0BED0B4D0B8D0BD
# The above should match:
select hex('один') as row1, hex('ноль') as row2;
row1 row2
D0BED0B4D0B8D0BD D0BDD0BED0BBD18C
UPDATE `t1` SET i=i+1,t='два' WHERE t='один';
SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC;
i t
43 два
42 ноль
USE test;
UPDATE `db1`.`t1` SET i=i+1,t='три' WHERE t='два';
SELECT * FROM `db1`.`t1` ORDER BY `i` DESC, `t` DESC;
i t
44 три
42 ноль
disconnect con1;
connection default;
DROP TABLE `db1`.`t1`;
USE test;
DROP DATABASE `db1`;
CREATE DATABASE IF NOT EXISTS `db1`;
# Do the test in another connection so that we don't have to clean up
connect (con1,localhost,root,,test);
connection con1;
USE `db1`;
--disable_warnings
DROP TABLE IF EXISTS `t1`;
--enable_warnings
CREATE TABLE `t1`(i BIGINT, t TEXT)ENGINE=xpand;
show create table t1;
set character_set_client=utf8;
set collation_connection=utf8_bin;
set character_set_results=utf8;
INSERT INTO `t1` (i, t) VALUES (42, 'один');
INSERT INTO `t1` (i, t) VALUES (42, 'ноль');
SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC;
--echo # examine the data on the backend:
--echo # This should show that the SELECT has been pushed to the backend:
explain
select i, hex(t) from t1;
--sorted_result
select i, hex(t) from t1;
--echo # The above should match:
select hex('один') as row1, hex('ноль') as row2;
UPDATE `t1` SET i=i+1,t='два' WHERE t='один';
SELECT * FROM `t1` ORDER BY `i` DESC, `t` DESC;
USE test;
UPDATE `db1`.`t1` SET i=i+1,t='три' WHERE t='два';
SELECT * FROM `db1`.`t1` ORDER BY `i` DESC, `t` DESC;
disconnect con1;
connection default;
DROP TABLE `db1`.`t1`;
USE test;
DROP DATABASE `db1`;
CREATE DATABASE IF NOT EXISTS `db1`;
USE `db1`;
DROP TABLE IF EXISTS `ins_duplicate`;
Warnings:
Note 1051 Unknown table 'db1.ins_duplicate'
CREATE TABLE `ins_duplicate`(`id` INT PRIMARY KEY, `animal` VARCHAR(30)) ENGINE=xpand;
INSERT INTO `ins_duplicate` VALUES (1,'Aardvark'), (2,'Cheetah'), (3,'Zebra');
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 Aardvark
2 Cheetah
3 Zebra
INSERT INTO ins_duplicate VALUES (1,'Antelope');
ERROR 23000: Can't write; duplicate key in table 'ins_duplicate'
INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Banana';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 Banana
2 Cheetah
3 Zebra
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
ERROR 23000: Can't write; duplicate key in table 'ins_duplicate'
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 hybrid
2 hybrid
3 Zebra
BEGIN;
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 hybrid
2 hybrid
3 Zebra
INSERT INTO ins_duplicate VALUES (1,'Antelope');
ERROR 23000: Can't write; duplicate key in table 'ins_duplicate'
INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 Vegetable
2 hybrid
3 Zebra
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
ERROR 23000: Can't write; duplicate key in table 'ins_duplicate'
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid2';
COMMIT;
BEGIN;
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 hybrid2
2 hybrid2
3 Zebra
INSERT INTO ins_duplicate VALUES (1,'Antelope');
ERROR 23000: Can't write; duplicate key in table 'ins_duplicate'
INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 Vegetable
2 hybrid2
3 Zebra
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
ERROR 23000: Can't write; duplicate key in table 'ins_duplicate'
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid3';
ROLLBACK;
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 hybrid2
2 hybrid2
3 Zebra
DROP TABLE `db1`.`ins_duplicate`;
USE test;
DROP DATABASE `db1`;
CREATE DATABASE IF NOT EXISTS `db1`;
USE `db1`;
DROP TABLE IF EXISTS `ins_duplicate`;
CREATE TABLE `ins_duplicate`(`id` INT PRIMARY KEY, `animal` VARCHAR(30)) ENGINE=xpand;
INSERT INTO `ins_duplicate` VALUES (1,'Aardvark'), (2,'Cheetah'), (3,'Zebra');
SELECT * FROM `ins_duplicate` ORDER BY `id`;
--error ER_DUP_KEY
INSERT INTO ins_duplicate VALUES (1,'Antelope');
INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Banana';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
--error ER_DUP_KEY
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
BEGIN;
SELECT * FROM `ins_duplicate` ORDER BY `id`;
--error ER_DUP_KEY
INSERT INTO ins_duplicate VALUES (1,'Antelope');
INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
--error ER_DUP_KEY
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid2';
COMMIT;
BEGIN;
SELECT * FROM `ins_duplicate` ORDER BY `id`;
--error ER_DUP_KEY
INSERT INTO ins_duplicate VALUES (1,'Antelope');
INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
--error ER_DUP_KEY
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid3';
ROLLBACK;
SELECT * FROM `ins_duplicate` ORDER BY `id`;
DROP TABLE `db1`.`ins_duplicate`;
USE test;
DROP DATABASE `db1`;
......@@ -182,6 +182,16 @@ create_federatedx_select_handler(THD* thd, SELECT_LEX *sel)
return 0;
}
/*
Currently, ha_federatedx_select_handler::init_scan just takes the
thd->query and sends it to the backend.
This obviously won't work if the SELECT uses an "INTO @var" or
"INTO OUTFILE". It is also unlikely to work if the select has some
other kind of side effect.
*/
if (sel->uncacheable & UNCACHEABLE_SIDEEFFECT)
return NULL;
/*
Currently, ha_federatedx_select_handler::init_scan just takes the
thd->query and sends it to the backend.
......
#*****************************************************************************
# Copyright (c) 2019, 2020, MariaDB Corporation.
#****************************************************************************/
SET(XPAND_SOURCES ha_xpand.cc xpand_connection.cc ha_xpand_pushdown.cc)
MYSQL_ADD_PLUGIN(xpand ${XPAND_SOURCES} STORAGE_ENGINE COMPONENT xpand-engine)
/*****************************************************************************
Copyright (c) 2019, 2020, MariaDB Corporation.
*****************************************************************************/
/** @file ha_xpand.cc */
#include "ha_xpand.h"
#include "ha_xpand_pushdown.h"
#include "key.h"
#include <strfunc.h> /* strconvert */
#include "my_pthread.h"
handlerton *xpand_hton = NULL;
int xpand_connect_timeout;
static MYSQL_SYSVAR_INT
(
connect_timeout,
xpand_connect_timeout,
PLUGIN_VAR_OPCMDARG,
"Timeout for connecting to Xpand",
NULL, NULL, -1, -1, 2147483647, 0
);
int xpand_read_timeout;
static MYSQL_SYSVAR_INT
(
read_timeout,
xpand_read_timeout,
PLUGIN_VAR_OPCMDARG,
"Timeout for receiving data from Xpand",
NULL, NULL, -1, -1, 2147483647, 0
);
int xpand_write_timeout;
static MYSQL_SYSVAR_INT
(
write_timeout,
xpand_write_timeout,
PLUGIN_VAR_OPCMDARG,
"Timeout for sending data to Xpand",
NULL, NULL, -1, -1, 2147483647, 0
);
//state for load balancing
int xpand_hosts_cur; //protected by my_atomic's
ulong xpand_balance_algorithm;
const char* balance_algorithm_names[]=
{
"first", "round_robin", NullS
};
TYPELIB balance_algorithms=
{
array_elements(balance_algorithm_names) - 1, "",
balance_algorithm_names, NULL
};
static void update_balance_algorithm(MYSQL_THD thd, struct st_mysql_sys_var *var,
void *var_ptr, const void *save)
{
*static_cast<ulong *>(var_ptr) = *static_cast<const ulong *>(save);
my_atomic_store32(&xpand_hosts_cur, 0);
}
static MYSQL_SYSVAR_ENUM
(
balance_algorithm,
xpand_balance_algorithm,
PLUGIN_VAR_OPCMDARG,
"Method for managing load balancing of Clustrix nodes, can take values FIRST or ROUND_ROBIN",
NULL, update_balance_algorithm, XPAND_BALANCE_ROUND_ROBIN, &balance_algorithms
);
//current list of clx hosts
#ifdef HAVE_PSI_INTERFACE
static PSI_rwlock_key key_xpand_hosts;
#endif
mysql_rwlock_t xpand_hosts_lock;
xpand_host_list *xpand_hosts;
static int check_hosts(MYSQL_THD thd, struct st_mysql_sys_var *var,
void *save, struct st_mysql_value *value)
{
DBUG_ENTER("check_hosts");
char b;
int len = 0;
const char *val = value->val_str(value, &b, &len);
if (!val)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
xpand_host_list list;
memset(&list, 0, sizeof(list));
int error_code = 0;
if ((error_code = list.fill(val)))
DBUG_RETURN(error_code);
list.empty();
*static_cast<const char **>(save) = val;
DBUG_RETURN(0);
}
static void update_hosts(MYSQL_THD thd, struct st_mysql_sys_var *var,
void *var_ptr, const void *save)
{
DBUG_ENTER("update_hosts");
const char *from_save = *static_cast<const char * const *>(save);
mysql_rwlock_wrlock(&xpand_hosts_lock);
xpand_host_list *list = static_cast<xpand_host_list*>(
my_malloc(sizeof(xpand_host_list), MYF(MY_WME | MY_ZEROFILL)));
int error_code = list->fill(from_save);
if (error_code) {
my_free(list);
sql_print_error("Unhandled error %d setting xpand hostlist", error_code);
DBUG_VOID_RETURN;
}
xpand_hosts->empty();
my_free(xpand_hosts);
xpand_hosts = list;
char **display_var = static_cast<char**>(var_ptr);
my_free(*display_var);
*display_var = my_strdup(from_save, MYF(MY_WME));
mysql_rwlock_unlock(&xpand_hosts_lock);
DBUG_VOID_RETURN;
}
static char *xpand_hosts_str;
static MYSQL_SYSVAR_STR
(
hosts,
xpand_hosts_str,
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
"List of xpand hostnames seperated by commas, semicolons or spaces",
check_hosts, update_hosts, "localhost"
);
char *xpand_username;
static MYSQL_SYSVAR_STR
(
username,
xpand_username,
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
"Xpand user name",
NULL, NULL, "root"
);
char *xpand_password;
static MYSQL_SYSVAR_STR
(
password,
xpand_password,
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
"Xpand password",
NULL, NULL, ""
);
uint xpand_port;
static MYSQL_SYSVAR_UINT
(
port,
xpand_port,
PLUGIN_VAR_RQCMDARG,
"Xpand port",
NULL, NULL, MYSQL_PORT_DEFAULT, MYSQL_PORT_DEFAULT, 65535, 0
);
char *xpand_socket;
static MYSQL_SYSVAR_STR
(
socket,
xpand_socket,
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
"Xpand socket",
NULL, NULL, ""
);
static MYSQL_THDVAR_UINT
(
row_buffer,
PLUGIN_VAR_RQCMDARG,
"Xpand rowstore row buffer size",
NULL, NULL, 20, 1, 65535, 0
);
// Per thread select handler knob
static MYSQL_THDVAR_BOOL(
select_handler,
PLUGIN_VAR_NOCMDARG,
"",
NULL,
NULL,
1
);
// Per thread derived handler knob
static MYSQL_THDVAR_BOOL(
derived_handler,
PLUGIN_VAR_NOCMDARG,
"",
NULL,
NULL,
1
);
static MYSQL_THDVAR_BOOL(
enable_direct_update,
PLUGIN_VAR_NOCMDARG,
"",
NULL,
NULL,
1
);
bool select_handler_setting(THD* thd)
{
return ( thd == NULL ) ? false : THDVAR(thd, select_handler);
}
bool derived_handler_setting(THD* thd)
{
return ( thd == NULL ) ? false : THDVAR(thd, derived_handler);
}
uint row_buffer_setting(THD* thd)
{
return THDVAR(thd, row_buffer);
}
/*
Get an Xpand_share object for this object. If it doesn't yet exist, create
it.
*/
Xpand_share *ha_xpand::get_share()
{
Xpand_share *tmp_share;
DBUG_ENTER("ha_xpand::get_share()");
lock_shared_ha_data();
if (!(tmp_share= static_cast<Xpand_share*>(get_ha_share_ptr())))
{
tmp_share= new Xpand_share;
if (!tmp_share)
goto err;
set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
}
err:
unlock_shared_ha_data();
DBUG_RETURN(tmp_share);
}
/****************************************************************************
** Utility functions
****************************************************************************/
// This is a wastefull aproach but better then fixed sized buffer.
size_t estimate_row_size(TABLE *table)
{
size_t row_size = 0;
size_t null_byte_count = (bitmap_bits_set(table->write_set) + 7) / 8;
row_size += null_byte_count;
Field **p_field= table->field, *field;
for ( ; (field= *p_field) ; p_field++) {
row_size += field->max_data_length();
}
return row_size;
}
/*
Try to decode a string from filename encoding, if that fails, return the
original string.
@detail
This is used to get table (or database) name from file (or directory)
name. Names of regular tables/databases are encoded using
my_charset_filename encoding.
Names of temporary tables are not encoded, and they start with '#sql'
which is not a valid character sequence in my_charset_filename encoding.
Our way to talkle this is to
1. Try to convert the name back
2. If that failed, assume it's a temporary object name and just use the
name.
*/
static void decode_object_or_tmp_name(const char *from, uint size,
std::string *out)
{
uint errors, new_size;
out->resize(size+1); // assume the decoded string is not longer
new_size= strconvert(&my_charset_filename, from, size,
system_charset_info, (char*)out->c_str(), size+1,
&errors);
if (errors)
out->assign(from, size);
else
out->resize(new_size);
}
/*
Take a "./db_name/table_name" and extract db_name and table_name from it
@return
0 OK
other Error code
*/
static int normalize_tablename(const char *db_table,
std::string *norm_db, std::string *norm_table)
{
std::string tablename(db_table);
if (tablename.size() < 2 || tablename[0] != '.' ||
(tablename[1] != FN_LIBCHAR && tablename[1] != FN_LIBCHAR2)) {
DBUG_ASSERT(0); // We were not passed table name?
return HA_ERR_INTERNAL_ERROR;
}
size_t pos = tablename.find_first_of(FN_LIBCHAR, 2);
if (pos == std::string::npos) {
pos = tablename.find_first_of(FN_LIBCHAR2, 2);
}
if (pos == std::string::npos) {
DBUG_ASSERT(0); // We were not passed table name?
return HA_ERR_INTERNAL_ERROR;
}
decode_object_or_tmp_name(tablename.c_str() + 2, pos - 2, norm_db);
decode_object_or_tmp_name(tablename.c_str() + pos + 1,
tablename.size() - (pos + 1), norm_table);
return 0;
}
xpand_connection *get_trx(THD *thd, int *error_code)
{
*error_code = 0;
xpand_connection *trx;
if (!(trx = (xpand_connection *)thd_get_ha_data(thd, xpand_hton)))
{
if (!(trx = new xpand_connection())) {
*error_code = HA_ERR_OUT_OF_MEM;
return NULL;
}
*error_code = trx->connect();
if (*error_code) {
delete trx;
return NULL;
}
thd_set_ha_data(thd, xpand_hton, trx);
}
return trx;
}
/****************************************************************************
** Class ha_xpand
****************************************************************************/
ha_xpand::ha_xpand(handlerton *hton, TABLE_SHARE *table_arg)
: handler(hton, table_arg)
{
DBUG_ENTER("ha_xpand::ha_xpand");
rgi = NULL;
scan_cur = NULL;
xpand_table_oid = 0;
upsert_flag = 0;
DBUG_VOID_RETURN;
}
ha_xpand::~ha_xpand()
{
if (rgi)
remove_current_table_from_rpl_table_list(rgi);
}
int ha_xpand::create(const char *name, TABLE *form, HA_CREATE_INFO *info)
{
int error_code;
THD *thd = ha_thd();
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
enum tmp_table_type saved_tmp_table_type = form->s->tmp_table;
Table_specification_st *create_info = &thd->lex->create_info;
const bool is_tmp_table = info->options & HA_LEX_CREATE_TMP_TABLE;
String create_table_stmt;
/* Create a copy of the CREATE TABLE statement */
if (!is_tmp_table)
form->s->tmp_table = NO_TMP_TABLE;
const char *old_dbstr = thd->db.str;
thd->db.str = NULL;
ulong old = create_info->used_fields;
create_info->used_fields &= ~HA_CREATE_USED_ENGINE;
std::string norm_db, norm_table;
if ((error_code= normalize_tablename(name, &norm_db, &norm_table)))
return error_code;
TABLE_LIST table_list;
memset(&table_list, 0, sizeof(table_list));
table_list.table = form;
error_code = show_create_table_ex(thd, &table_list,
norm_db.c_str(), norm_table.c_str(),
&create_table_stmt, create_info, WITH_DB_NAME);
if (!is_tmp_table)
form->s->tmp_table = saved_tmp_table_type;
create_info->used_fields = old;
thd->db.str = old_dbstr;
if (error_code)
return error_code;
// To syncronize the schemas of MDB FE and XPD BE.
if (form->s && form->s->db.length) {
String createdb_stmt;
createdb_stmt.append("CREATE DATABASE IF NOT EXISTS `");
createdb_stmt.append(form->s->db.str, form->s->db.length);
createdb_stmt.append("`");
trx->run_query(createdb_stmt);
}
return trx->run_query(create_table_stmt);
}
int ha_xpand::delete_table(const char *path)
{
int error_code;
THD *thd = ha_thd();
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
std::string decoded_dbname;
std::string decoded_tbname;
if ((error_code= normalize_tablename(path, &decoded_dbname,
&decoded_tbname)))
return error_code;
String delete_cmd;
delete_cmd.append("DROP TABLE `");
delete_cmd.append(decoded_dbname.c_str());
delete_cmd.append("`.`");
delete_cmd.append(decoded_tbname.c_str());
delete_cmd.append("`");
return trx->run_query(delete_cmd);
}
int ha_xpand::rename_table(const char* from, const char* to)
{
int error_code;
THD *thd = ha_thd();
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
std::string decoded_from_dbname;
std::string decoded_from_tbname;
if ((error_code= normalize_tablename(from, &decoded_from_dbname,
&decoded_from_tbname)))
return error_code;
std::string decoded_to_dbname;
std::string decoded_to_tbname;
if ((error_code= normalize_tablename(to, &decoded_to_dbname,
&decoded_to_tbname)))
return error_code;
String rename_cmd;
rename_cmd.append("RENAME TABLE `");
rename_cmd.append(decoded_from_dbname.c_str());
rename_cmd.append("`.`");
rename_cmd.append(decoded_from_tbname.c_str());
rename_cmd.append("` TO `");
rename_cmd.append(decoded_to_dbname.c_str());
rename_cmd.append("`.`");
rename_cmd.append(decoded_to_tbname.c_str());
rename_cmd.append("`;");
return trx->run_query(rename_cmd);
}
static void
xpand_mark_table_for_discovery(TABLE *table)
{
table->m_needs_reopen = true;
Xpand_share *xs;
if ((xs= static_cast<Xpand_share*>(table->s->ha_share)))
xs->rediscover_table = true;
}
void
xpand_mark_tables_for_discovery(LEX *lex)
{
for (TABLE_LIST *tbl= lex->query_tables; tbl; tbl= tbl->next_global)
if (tbl->table && tbl->table->file->ht == xpand_hton)
xpand_mark_table_for_discovery(tbl->table);
}
ulonglong *
xpand_extract_table_oids(THD *thd, LEX *lex)
{
int cnt = 1;
for (TABLE_LIST *tbl = lex->query_tables; tbl; tbl= tbl->next_global)
if (tbl->table && tbl->table->file->ht == xpand_hton)
cnt++;
ulonglong *oids = (ulonglong*)thd_alloc(thd, cnt * sizeof(ulonglong));
ulonglong *ptr = oids;
for (TABLE_LIST *tbl = lex->query_tables; tbl; tbl= tbl->next_global)
{
if (tbl->table && tbl->table->file->ht == xpand_hton)
{
ha_xpand *hndlr = static_cast<ha_xpand *>(tbl->table->file);
*ptr++ = hndlr->get_table_oid();
}
}
*ptr = 0;
return oids;
}
int ha_xpand::open(const char *name, int mode, uint test_if_locked)
{
THD *thd= ha_thd();
DBUG_ENTER("ha_xpand::open");
Xpand_share *share;
if (!(share = get_share()))
DBUG_RETURN(1);
int error_code;
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
DBUG_RETURN(error_code);
if (share->rediscover_table)
DBUG_RETURN(HA_ERR_TABLE_DEF_CHANGED);
if (!share->xpand_table_oid) {
// We may end up with two threads executing this piece concurrently but
// it's ok
std::string norm_table;
std::string norm_db;
if ((error_code= normalize_tablename(name, &norm_db, &norm_table)))
DBUG_RETURN(error_code);
ulonglong oid = 0;
error_code= trx->get_table_oid(norm_db.c_str(), strlen(norm_db.c_str()),
norm_table.c_str(),
strlen(norm_table.c_str()), &oid,
table_share);
if (error_code)
DBUG_RETURN(error_code);
share->xpand_table_oid = oid;
}
xpand_table_oid = share->xpand_table_oid;
// Surrogate key marker
has_hidden_key = table->s->primary_key == MAX_KEY;
if (has_hidden_key) {
ref_length = 8;
} else {
KEY* key_info = table->key_info + table->s->primary_key;
ref_length = key_info->key_length;
}
DBUG_PRINT("open finished",
("oid: %llu, ref_length: %u", xpand_table_oid, ref_length));
DBUG_RETURN(0);
}
int ha_xpand::close(void)
{
return 0;
}
int ha_xpand::reset()
{
upsert_flag &= ~XPAND_BULK_UPSERT;
upsert_flag &= ~XPAND_HAS_UPSERT;
upsert_flag &= ~XPAND_UPSERT_SENT;
xpd_lock_type = XPAND_NO_LOCKS;
pushdown_cond_list.empty();
return 0;
}
int ha_xpand::extra(enum ha_extra_function operation)
{
DBUG_ENTER("ha_xpand::extra");
if (operation == HA_EXTRA_INSERT_WITH_UPDATE)
upsert_flag |= XPAND_HAS_UPSERT;
DBUG_RETURN(0);
}
/*@brief UPSERT State Machine*/
/*************************************************************
* DESCRIPTION:
* Fasttrack for UPSERT sends queries down to a XPD backend.
* UPSERT could be of two kinds: singular and bulk. The plugin
* re-/sets XPAND_BULK_UPSERT in end|start_bulk_insert
* methods. XPAND_UPSERT_SENT is used to avoid multiple
* execution at XPD backend.
* Generic XPAND_HAS_UPSERT is set for bulk UPSERT only b/c
* MDB calls write_row only once.
************************************************************/
int ha_xpand::write_row(const uchar *buf)
{
int error_code = 0;
THD *thd = ha_thd();
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
if (upsert_flag & XPAND_HAS_UPSERT) {
if (!(upsert_flag & XPAND_UPSERT_SENT)) {
ha_rows update_rows;
String update_stmt;
update_stmt.append(thd->query_string.str());
if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trx->auto_commit_next();
ulonglong *oids = xpand_extract_table_oids(thd, thd->lex);
error_code= trx->update_query(update_stmt, table->s->db, oids,
&update_rows);
if (upsert_flag & XPAND_BULK_UPSERT)
upsert_flag |= XPAND_UPSERT_SENT;
else
upsert_flag &= ~XPAND_HAS_UPSERT;
}
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_tables_for_discovery(thd->lex);
return error_code;
}
/* Convert the row format to binlog (packed) format */
uchar *packed_new_row = (uchar*) my_alloca(estimate_row_size(table));
size_t packed_size = pack_row(table, table->write_set, packed_new_row, buf);
/* XXX: Xpand may needs to return HA_ERR_AUTOINC_ERANGE if we hit that
error. */
ulonglong last_insert_id = 0;
if ((error_code = trx->write_row(xpand_table_oid, packed_new_row, packed_size,
&last_insert_id)))
goto err;
if (table->next_number_field)
insert_id_for_cur_row = last_insert_id;
err:
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_table_for_discovery(table);
if (packed_size)
my_afree(packed_new_row);
return error_code;
}
int ha_xpand::update_row(const uchar *old_data, const uchar *new_data)
{
DBUG_ENTER("ha_xpand::update_row");
int error_code;
THD *thd = ha_thd();
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
DBUG_RETURN(error_code);
size_t row_size = estimate_row_size(table);
size_t packed_key_len;
uchar *packed_key = (uchar*) my_alloca(row_size);
build_key_packed_row(table->s->primary_key, old_data,
packed_key, &packed_key_len);
uchar *packed_new_row = (uchar*) my_alloca(row_size);
size_t packed_new_size = pack_row(table, table->write_set, packed_new_row,
new_data);
/* Send the packed rows to Xpand */
error_code = trx->key_update(xpand_table_oid, packed_key, packed_key_len,
table->write_set,
packed_new_row, packed_new_size);
if(packed_key)
my_afree(packed_key);
if(packed_new_row)
my_afree(packed_new_row);
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_table_for_discovery(table);
DBUG_RETURN(error_code);
}
int ha_xpand::direct_update_rows_init(List<Item> *update_fields)
{
DBUG_ENTER("ha_xpand::direct_update_rows_init");
THD *thd= ha_thd();
if (!THDVAR(thd, enable_direct_update))
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
DBUG_RETURN(0);
}
int ha_xpand::direct_update_rows(ha_rows *update_rows, ha_rows *found_rows)
{
DBUG_ENTER("ha_xpand::direct_update_rows");
int error_code= 0;
THD *thd= ha_thd();
xpand_connection *trx= get_trx(thd, &error_code);
if (!trx)
return error_code;
String update_stmt;
// Do the same as create_xpand_select_handler does:
thd->lex->print(&update_stmt, QT_ORDINARY);
if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trx->auto_commit_next();
ulonglong *oids = xpand_extract_table_oids(thd, thd->lex);
error_code = trx->update_query(update_stmt, table->s->db, oids, update_rows);
*found_rows = *update_rows;
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_tables_for_discovery(thd->lex);
DBUG_RETURN(error_code);
}
void ha_xpand::start_bulk_insert(ha_rows rows, uint flags)
{
DBUG_ENTER("ha_xpand::start_bulk_insert");
int error_code= 0;
THD *thd= ha_thd();
xpand_connection *trx= get_trx(thd, &error_code);
if (!trx) {
// TBD log this
DBUG_VOID_RETURN;
}
upsert_flag |= XPAND_BULK_UPSERT;
DBUG_VOID_RETURN;
}
int ha_xpand::end_bulk_insert()
{
DBUG_ENTER("ha_xpand::end_bulk_insert");
upsert_flag &= ~XPAND_BULK_UPSERT;
upsert_flag &= ~XPAND_HAS_UPSERT;
upsert_flag &= ~XPAND_UPSERT_SENT;
DBUG_RETURN(0);
}
int ha_xpand::delete_row(const uchar *buf)
{
int error_code;
THD *thd = ha_thd();
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
// The estimate should consider only key fields widths.
size_t packed_key_len;
uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table));
build_key_packed_row(table->s->primary_key, buf, packed_key, &packed_key_len);
error_code = trx->key_delete(xpand_table_oid, packed_key, packed_key_len);
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_table_for_discovery(table);
if (packed_key)
my_afree(packed_key);
return error_code;
}
ha_xpand::Table_flags ha_xpand::table_flags(void) const
{
Table_flags flags = HA_PARTIAL_COLUMN_READ |
HA_REC_NOT_IN_SEQ |
HA_FAST_KEY_READ |
HA_NULL_IN_KEY |
HA_CAN_INDEX_BLOBS |
HA_AUTO_PART_KEY |
HA_CAN_SQL_HANDLER |
HA_BINLOG_STMT_CAPABLE |
HA_CAN_TABLE_CONDITION_PUSHDOWN |
HA_CAN_DIRECT_UPDATE_AND_DELETE;
return flags;
}
ulong ha_xpand::index_flags(uint idx, uint part, bool all_parts) const
{
ulong flags = HA_READ_NEXT |
HA_READ_PREV |
HA_READ_ORDER |
HA_READ_RANGE;
return flags;
}
ha_rows ha_xpand::records()
{
return 10000;
}
ha_rows ha_xpand::records_in_range(uint inx, key_range *min_key,
key_range *max_key)
{
return 2;
}
int ha_xpand::info(uint flag)
{
//THD *thd = ha_thd();
if (flag & HA_STATUS_TIME)
{
/* Retrieve the time of the most recent update to the table */
// stats.update_time =
}
if (flag & HA_STATUS_AUTO)
{
/* Retrieve the latest auto_increment value */
stats.auto_increment_value = next_insert_id;
}
if (flag & HA_STATUS_VARIABLE)
{
/* Retrieve variable info, such as row counts and file lengths */
stats.records = records();
stats.deleted = 0;
// stats.data_file_length =
// stats.index_file_length =
// stats.delete_length =
stats.check_time = 0;
// stats.mrr_length_per_rec =
if (stats.records == 0)
stats.mean_rec_length = 0;
else
stats.mean_rec_length = (ulong) (stats.data_file_length / stats.records);
}
if (flag & HA_STATUS_CONST)
{
/*
Retrieve constant info, such as file names, max file lengths,
create time, block size
*/
// stats.max_data_file_length =
// stats.create_time =
// stats.block_size =
}
return 0;
}
int ha_xpand::index_init(uint idx, bool sorted)
{
int error_code = 0;
THD *thd = ha_thd();
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
active_index = idx;
add_current_table_to_rpl_table_list(&rgi, thd, table);
scan_cur = NULL;
/* Return all columns until there is a better understanding of
requirements. */
if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
return ER_OUTOFMEMORY;
bitmap_set_all(&scan_fields);
sorted_scan = sorted;
return 0;
}
int ha_xpand::index_read(uchar * buf, const uchar * key, uint key_len,
enum ha_rkey_function find_flag)
{
DBUG_ENTER("ha_xpand::index_read");
int error_code = 0;
THD *thd = ha_thd();
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
DBUG_RETURN(error_code);
key_restore(buf, key, &table->key_info[active_index], key_len);
// The estimate should consider only key fields widths.
size_t packed_key_len;
uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table));
build_key_packed_row(active_index, buf, packed_key, &packed_key_len);
bool exact = false;
xpand_connection::scan_type st;
switch (find_flag) {
case HA_READ_KEY_EXACT:
exact = true;
break;
case HA_READ_KEY_OR_NEXT:
st = xpand_connection::READ_KEY_OR_NEXT;
break;
case HA_READ_KEY_OR_PREV:
st = xpand_connection::READ_KEY_OR_PREV;
break;
case HA_READ_AFTER_KEY:
st = xpand_connection::READ_AFTER_KEY;
break;
case HA_READ_BEFORE_KEY:
st = xpand_connection::READ_BEFORE_KEY;
break;
case HA_READ_PREFIX:
case HA_READ_PREFIX_LAST:
case HA_READ_PREFIX_LAST_OR_PREV:
case HA_READ_MBR_CONTAIN:
case HA_READ_MBR_INTERSECT:
case HA_READ_MBR_WITHIN:
case HA_READ_MBR_DISJOINT:
case HA_READ_MBR_EQUAL:
DBUG_RETURN(ER_NOT_SUPPORTED_YET);
}
uchar *rowdata = NULL;
if (exact) {
is_scan = false;
ulonglong rowdata_length;
error_code = trx->key_read(xpand_table_oid, 0, xpd_lock_type,
table->read_set, packed_key, packed_key_len,
&rowdata, &rowdata_length);
if (!error_code)
error_code = unpack_row_to_buf(rgi, table, buf, rowdata, table->read_set,
rowdata + rowdata_length);
} else {
is_scan = true;
error_code = trx->scan_from_key(xpand_table_oid, active_index,
xpd_lock_type, st, -1, sorted_scan,
&scan_fields, packed_key, packed_key_len,
THDVAR(thd, row_buffer), &scan_cur);
if (!error_code)
error_code = rnd_next(buf);
}
if (rowdata)
my_free(rowdata);
if (packed_key)
my_afree(packed_key);
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_table_for_discovery(table);
DBUG_RETURN(error_code);
}
int ha_xpand::index_first(uchar *buf)
{
DBUG_ENTER("ha_xpand::index_first");
int error_code = 0;
THD *thd = ha_thd();
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
DBUG_RETURN(error_code);
error_code = trx->scan_from_key(xpand_table_oid, active_index, xpd_lock_type,
xpand_connection::READ_FROM_START, -1,
sorted_scan, &scan_fields, NULL, 0,
THDVAR(thd, row_buffer), &scan_cur);
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_table_for_discovery(table);
if (error_code)
DBUG_RETURN(error_code);
DBUG_RETURN(rnd_next(buf));
}
int ha_xpand::index_last(uchar *buf)
{
DBUG_ENTER("ha_xpand::index_last");
int error_code = 0;
THD *thd = ha_thd();
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
DBUG_RETURN(error_code);
error_code = trx->scan_from_key(xpand_table_oid, active_index, xpd_lock_type,
xpand_connection::READ_FROM_LAST, -1,
sorted_scan, &scan_fields, NULL, 0,
THDVAR(thd, row_buffer), &scan_cur);
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_table_for_discovery(table);
if (error_code)
DBUG_RETURN(error_code);
DBUG_RETURN(rnd_next(buf));
}
int ha_xpand::index_next(uchar *buf)
{
DBUG_ENTER("index_next");
DBUG_RETURN(rnd_next(buf));
}
#if 0
int ha_xpand::index_next_same(uchar *buf, const uchar *key, uint keylen)
{
DBUG_ENTER("index_next_same");
DBUG_RETURN(rnd_next(buf));
}
#endif
int ha_xpand::index_prev(uchar *buf)
{
DBUG_ENTER("index_prev");
DBUG_RETURN(rnd_next(buf));
}
int ha_xpand::index_end()
{
DBUG_ENTER("index_prev");
if (scan_cur)
DBUG_RETURN(rnd_end());
else
{
my_bitmap_free(&scan_fields);
DBUG_RETURN(0);
}
}
int ha_xpand::rnd_init(bool scan)
{
DBUG_ENTER("ha_xpand::rnd_init");
int error_code = 0;
THD *thd = ha_thd();
if (thd->lex->sql_command == SQLCOM_UPDATE)
DBUG_RETURN(error_code);
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
DBUG_RETURN(error_code);
add_current_table_to_rpl_table_list(&rgi, thd, table);
is_scan = scan;
scan_cur = NULL;
if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
DBUG_RETURN(ER_OUTOFMEMORY);
#if 0
if (table->s->keys)
table->mark_columns_used_by_index(table->s->primary_key, &scan_fields);
else
bitmap_clear_all(&scan_fields);
bitmap_union(&scan_fields, table->read_set);
#else
/* Why is read_set not setup correctly? */
bitmap_set_all(&scan_fields);
#endif
String* pushdown_cond_sql = nullptr;
if (pushdown_cond_list.elements) {
pushdown_cond_sql = new String();
while (pushdown_cond_list.elements > 0) {
COND* cond = pushdown_cond_list.pop();
String sql_predicate;
cond->print_for_table_def(&sql_predicate);
pushdown_cond_sql->append(sql_predicate);
if ( pushdown_cond_list.elements > 0)
pushdown_cond_sql->append(" AND ");
}
}
error_code = trx->scan_table(xpand_table_oid, xpd_lock_type, &scan_fields,
THDVAR(thd, row_buffer), &scan_cur,
pushdown_cond_sql);
if (pushdown_cond_sql != nullptr)
delete pushdown_cond_sql;
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_table_for_discovery(table);
if (error_code)
DBUG_RETURN(error_code);
DBUG_RETURN(0);
}
int ha_xpand::rnd_next(uchar *buf)
{
int error_code = 0;
THD *thd = ha_thd();
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
assert(is_scan);
assert(scan_cur);
uchar *rowdata;
ulong rowdata_length;
if ((error_code = trx->scan_next(scan_cur, &rowdata, &rowdata_length)))
return error_code;
if (has_hidden_key) {
last_hidden_key = *(ulonglong *)rowdata;
rowdata += 8;
rowdata_length -= 8;
}
error_code = unpack_row_to_buf(rgi, table, buf, rowdata, &scan_fields,
rowdata + rowdata_length);
if (error_code)
return error_code;
return 0;
}
int ha_xpand::rnd_pos(uchar * buf, uchar *pos)
{
DBUG_ENTER("xpd_rnd_pos");
DBUG_DUMP("pos", pos, ref_length);
int error_code = 0;
THD *thd = ha_thd();
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
DBUG_RETURN(error_code);
/* WDD: We need a way to convert key buffers directy to rbr buffers. */
if (has_hidden_key) {
memcpy(&last_hidden_key, pos, sizeof(ulonglong));
} else {
uint keyno = table->s->primary_key;
uint len = calculate_key_len(table, keyno, pos,
table->const_key_parts[keyno]);
key_restore(buf, pos, &table->key_info[keyno], len);
}
// The estimate should consider only key fields widths.
uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table));
size_t packed_key_len;
build_key_packed_row(table->s->primary_key, buf, packed_key, &packed_key_len);
uchar *rowdata = NULL;
ulonglong rowdata_length;
if ((error_code = trx->key_read(xpand_table_oid, 0, xpd_lock_type,
table->read_set, packed_key, packed_key_len,
&rowdata, &rowdata_length)))
goto err;
if ((error_code = unpack_row_to_buf(rgi, table, buf, rowdata, table->read_set,
rowdata + rowdata_length)))
goto err;
err:
if (rowdata)
my_free(rowdata);
if (packed_key)
my_afree(packed_key);
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_table_for_discovery(table);
DBUG_RETURN(error_code);
}
int ha_xpand::rnd_end()
{
DBUG_ENTER("ha_xpand::rnd_end");
int error_code = 0;
THD *thd = ha_thd();
if (thd->lex->sql_command == SQLCOM_UPDATE)
DBUG_RETURN(error_code);
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
DBUG_RETURN(error_code);
my_bitmap_free(&scan_fields);
if (scan_cur && (error_code = trx->scan_end(scan_cur)))
DBUG_RETURN(error_code);
scan_cur = NULL;
DBUG_RETURN(0);
}
void ha_xpand::position(const uchar *record)
{
DBUG_ENTER("xpd_position");
if (has_hidden_key) {
memcpy(ref, &last_hidden_key, sizeof(ulonglong));
} else {
KEY* key_info = table->key_info + table->s->primary_key;
key_copy(ref, record, key_info, key_info->key_length);
}
DBUG_DUMP("key", ref, ref_length);
DBUG_VOID_RETURN;
}
uint ha_xpand::lock_count(void) const
{
/* Hopefully, we don't need to use thread locks */
return 0;
}
THR_LOCK_DATA **ha_xpand::store_lock(THD *thd, THR_LOCK_DATA **to,
enum thr_lock_type lock_type)
{
/* Hopefully, we don't need to use thread locks */
return to;
}
int ha_xpand::external_lock(THD *thd, int lock_type)
{
DBUG_ENTER("ha_xpand::external_lock()");
int error_code;
xpand_connection *trx = get_trx(thd, &error_code);
if (error_code)
DBUG_RETURN(error_code);
if (lock_type == F_WRLCK)
xpd_lock_type = XPAND_EXCLUSIVE;
else if (lock_type == F_RDLCK)
xpd_lock_type = XPAND_SHARED;
else if (lock_type == F_UNLCK)
xpd_lock_type = XPAND_NO_LOCKS;
if (lock_type != F_UNLCK) {
if (!trx->has_open_transaction()) {
error_code = trx->begin_transaction_next();
if (error_code)
DBUG_RETURN(error_code);
}
trans_register_ha(thd, FALSE, xpand_hton);
if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trans_register_ha(thd, TRUE, xpand_hton);
}
DBUG_RETURN(error_code);
}
/****************************************************************************
Engine Condition Pushdown
****************************************************************************/
const COND *ha_xpand::cond_push(const COND *cond)
{
THD *thd= ha_thd();
if (!thd->lex->describe) {
pushdown_cond_list.push_front(const_cast<COND*>(cond));
}
return NULL;
}
void ha_xpand::cond_pop()
{
pushdown_cond_list.pop();
}
int ha_xpand::info_push(uint info_type, void *info)
{
return 0;
}
ulonglong ha_xpand::get_table_oid()
{
return xpand_table_oid;
}
/****************************************************************************
** Row encoding functions
****************************************************************************/
void add_current_table_to_rpl_table_list(rpl_group_info **_rgi, THD *thd,
TABLE *table)
{
if (*_rgi)
return;
Relay_log_info *rli = new Relay_log_info(FALSE);
rli->sql_driver_thd = thd;
rpl_group_info *rgi = new rpl_group_info(rli);
*_rgi = rgi;
rgi->thd = thd;
rgi->tables_to_lock_count = 0;
rgi->tables_to_lock = NULL;
if (rgi->tables_to_lock_count)
return;
rgi->tables_to_lock = (RPL_TABLE_LIST *)my_malloc(sizeof(RPL_TABLE_LIST),
MYF(MY_WME));
rgi->tables_to_lock->init_one_table(&table->s->db, &table->s->table_name, 0,
TL_READ);
rgi->tables_to_lock->table = table;
rgi->tables_to_lock->table_id = table->tablenr;
rgi->tables_to_lock->m_conv_table = NULL;
rgi->tables_to_lock->master_had_triggers = FALSE;
rgi->tables_to_lock->m_tabledef_valid = TRUE;
// We need one byte per column to save a column's binlog type.
uchar *col_type = (uchar*) my_alloca(table->s->fields);
for (uint i = 0 ; i < table->s->fields ; ++i)
col_type[i] = table->field[i]->binlog_type();
table_def *tabledef = &rgi->tables_to_lock->m_tabledef;
new (tabledef) table_def(col_type, table->s->fields, NULL, 0, NULL, 0);
rgi->tables_to_lock_count++;
if (col_type)
my_afree(col_type);
}
void remove_current_table_from_rpl_table_list(rpl_group_info *rgi)
{
if (!rgi->tables_to_lock)
return;
rgi->tables_to_lock->m_tabledef.table_def::~table_def();
rgi->tables_to_lock->m_tabledef_valid = FALSE;
my_free(rgi->tables_to_lock);
rgi->tables_to_lock_count--;
rgi->tables_to_lock = NULL;
delete rgi->rli;
delete rgi;
}
void ha_xpand::build_key_packed_row(uint index, const uchar *buf,
uchar *packed_key, size_t *packed_key_len)
{
if (index == table->s->primary_key && has_hidden_key) {
memcpy(packed_key, &last_hidden_key, sizeof(ulonglong));
*packed_key_len = sizeof(ulonglong);
} else {
// make a row from the table
table->mark_columns_used_by_index(index, &table->tmp_set);
*packed_key_len = pack_row(table, &table->tmp_set, packed_key, buf);
}
}
int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data,
uchar const *const row_data, MY_BITMAP const *cols,
uchar const *const row_end)
{
/* Since unpack_row can only write to record[0], if 'data' does not point to
table->record[0], we must back it up and then restore it afterwards. */
uchar const *current_row_end;
ulong master_reclength;
uchar *backup_row = NULL;
if (data != table->record[0]) {
/* See Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
and the definitions of store_record and restore_record. */
backup_row = (uchar*) my_alloca(table->s->reclength);
memcpy(backup_row, table->record[0], table->s->reclength);
restore_record(table, record[data == table->record[1] ? 1 : 2]);
}
int error_code = unpack_row(rgi, table, table->s->fields, row_data, cols,
&current_row_end, &master_reclength, row_end);
if (backup_row) {
store_record(table, record[data == table->record[1] ? 1 : 2]);
memcpy(table->record[0], backup_row, table->s->reclength);
my_afree(backup_row);
}
return error_code;
}
/****************************************************************************
** Plugin Functions
****************************************************************************/
static int xpand_commit(handlerton *hton, THD *thd, bool all)
{
xpand_connection* trx = (xpand_connection *) thd_get_ha_data(thd, hton);
assert(trx);
int error_code = 0;
if (trx->has_open_transaction()) {
if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
error_code = trx->commit_transaction();
else
error_code = trx->new_statement_next();
}
return error_code;
}
static int xpand_rollback(handlerton *hton, THD *thd, bool all)
{
xpand_connection* trx = (xpand_connection *) thd_get_ha_data(thd, hton);
assert(trx);
int error_code = 0;
if (trx->has_open_transaction()) {
if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
error_code = trx->rollback_transaction();
else
error_code = trx->rollback_statement_next();
}
return error_code;
}
static handler* xpand_create_handler(handlerton *hton, TABLE_SHARE *table,
MEM_ROOT *mem_root)
{
return new (mem_root) ha_xpand(hton, table);
}
static int xpand_close_connection(handlerton* hton, THD* thd)
{
xpand_connection* trx = (xpand_connection *) thd_get_ha_data(thd, hton);
if (!trx)
return 0; /* Transaction is not started */
int error_code = xpand_rollback(xpand_hton, thd, TRUE);
delete trx;
return error_code;
}
static int xpand_panic(handlerton *hton, ha_panic_function type)
{
return 0;
}
static bool xpand_show_status(handlerton *hton, THD *thd,
stat_print_fn *stat_print,
enum ha_stat_type stat_type)
{
return FALSE;
}
static int xpand_discover_table_names(handlerton *hton, LEX_CSTRING *db,
MY_DIR *dir,
handlerton::discovered_list *result)
{
DBUG_ENTER("xpand_discover_table_names");
xpand_connection *xpand_net = new xpand_connection();
int error_code = xpand_net->connect();
if (error_code) {
if (error_code == HA_ERR_NO_CONNECTION)
error_code = 0;
goto err;
}
error_code = xpand_net->populate_table_list(db, result);
err:
delete xpand_net;
DBUG_RETURN(error_code);
}
int xpand_discover_table(handlerton *hton, THD *thd, TABLE_SHARE *share)
{
DBUG_ENTER("xpand_discover_table");
xpand_connection *xpand_net = new xpand_connection();
int error_code = xpand_net->connect();
if (error_code) {
if (error_code == HA_ERR_NO_CONNECTION)
error_code = HA_ERR_NO_SUCH_TABLE;
goto err;
}
error_code = xpand_net->discover_table_details(&share->db, &share->table_name,
thd, share);
err:
delete xpand_net;
DBUG_RETURN(error_code);
}
static int xpand_init(void *p)
{
DBUG_ENTER("xpand_init");
xpand_hton = (handlerton *) p;
xpand_hton->flags = HTON_NO_FLAGS;
xpand_hton->panic = xpand_panic;
xpand_hton->close_connection = xpand_close_connection;
xpand_hton->commit = xpand_commit;
xpand_hton->rollback = xpand_rollback;
xpand_hton->create = xpand_create_handler;
xpand_hton->show_status = xpand_show_status;
xpand_hton->discover_table_names = xpand_discover_table_names;
xpand_hton->discover_table = xpand_discover_table;
xpand_hton->create_select = create_xpand_select_handler;
xpand_hton->create_derived = create_xpand_derived_handler;
mysql_rwlock_init(key_xpand_hosts, &xpand_hosts_lock);
mysql_rwlock_wrlock(&xpand_hosts_lock);
xpand_hosts = static_cast<xpand_host_list*>(
my_malloc(sizeof(xpand_host_list), MYF(MY_WME | MY_ZEROFILL)));
int error_code = xpand_hosts->fill(xpand_hosts_str);
if (error_code) {
my_free(xpand_hosts);
xpand_hosts = NULL;
}
mysql_rwlock_unlock(&xpand_hosts_lock);
DBUG_RETURN(error_code);
}
static int xpand_deinit(void *p)
{
DBUG_ENTER("xpand_deinit");
mysql_rwlock_wrlock(&xpand_hosts_lock);
xpand_hosts->empty();
my_free(xpand_hosts);
xpand_hosts = NULL;
mysql_rwlock_destroy(&xpand_hosts_lock);
DBUG_RETURN(0);
}
struct st_mysql_show_var xpand_status_vars[] =
{
{NullS, NullS, SHOW_LONG}
};
static struct st_mysql_sys_var* xpand_system_variables[] =
{
MYSQL_SYSVAR(connect_timeout),
MYSQL_SYSVAR(read_timeout),
MYSQL_SYSVAR(write_timeout),
MYSQL_SYSVAR(balance_algorithm),
MYSQL_SYSVAR(hosts),
MYSQL_SYSVAR(username),
MYSQL_SYSVAR(password),
MYSQL_SYSVAR(port),
MYSQL_SYSVAR(socket),
MYSQL_SYSVAR(row_buffer),
MYSQL_SYSVAR(select_handler),
MYSQL_SYSVAR(derived_handler),
MYSQL_SYSVAR(enable_direct_update),
NULL
};
static struct st_mysql_storage_engine xpand_storage_engine =
{MYSQL_HANDLERTON_INTERFACE_VERSION};
maria_declare_plugin(xpand)
{
MYSQL_STORAGE_ENGINE_PLUGIN, /* Plugin Type */
&xpand_storage_engine, /* Plugin Descriptor */
"XPAND", /* Plugin Name */
"MariaDB", /* Plugin Author */
"Xpand storage engine", /* Plugin Description */
PLUGIN_LICENSE_GPL, /* Plugin Licence */
xpand_init, /* Plugin Entry Point */
xpand_deinit, /* Plugin Deinitializer */
0x0001, /* Hex Version Number (0.1) */
NULL /* xpand_status_vars */, /* Status Variables */
xpand_system_variables, /* System Variables */
"0.1", /* String Version */
MariaDB_PLUGIN_MATURITY_EXPERIMENTAL /* Maturity Level */
}
maria_declare_plugin_end;
/*****************************************************************************
Copyright (c) 2019, 2020, MariaDB Corporation.
*****************************************************************************/
#ifndef _ha_xpand_h
#define _ha_xpand_h
#ifdef USE_PRAGMA_INTERFACE
#pragma interface /* gcc class implementation */
#endif
#define MYSQL_SERVER 1
#include "xpand_connection.h"
#include "my_bitmap.h"
#include "table.h"
#include "rpl_rli.h"
#include "handler.h"
#include "sql_class.h"
#include "sql_show.h"
#include "mysql.h"
#include "../../sql/rpl_record.h"
size_t estimate_row_size(TABLE *table);
xpand_connection *get_trx(THD *thd, int *error_code);
bool get_enable_sh(THD* thd);
void add_current_table_to_rpl_table_list(rpl_group_info **_rgi, THD *thd,
TABLE *table);
void remove_current_table_from_rpl_table_list(rpl_group_info *rgi);
int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data,
uchar const *const row_data, MY_BITMAP const *cols,
uchar const *const row_end);
void xpand_mark_tables_for_discovery(LEX *lex);
ulonglong *xpand_extract_table_oids(THD *thd, LEX *lex);
class Xpand_share : public Handler_share {
public:
Xpand_share(): xpand_table_oid(0), rediscover_table(false) {}
std::atomic<ulonglong> xpand_table_oid;
std::atomic<bool> rediscover_table;
};
class ha_xpand : public handler
{
private:
// TODO: do we need this here or one in share would be sufficient?
ulonglong xpand_table_oid;
rpl_group_info *rgi;
Field *auto_inc_field;
ulonglong auto_inc_value;
bool has_hidden_key;
ulonglong last_hidden_key;
xpand_connection_cursor *scan_cur;
bool is_scan;
MY_BITMAP scan_fields;
bool sorted_scan;
xpand_lock_mode_t xpd_lock_type;
uint last_dup_errkey;
typedef enum xpand_upsert_flags {
XPAND_HAS_UPSERT= 1,
XPAND_BULK_UPSERT= 2,
XPAND_UPSERT_SENT= 4
} xpd_upsert_flags_t;
int upsert_flag;
List<COND> pushdown_cond_list;
Xpand_share *get_share(); ///< Get the share
public:
ha_xpand(handlerton *hton, TABLE_SHARE *table_arg);
~ha_xpand();
int create(const char *name, TABLE *form, HA_CREATE_INFO *info) override;
int delete_table(const char *name) override;
int rename_table(const char* from, const char* to) override;
int open(const char *name, int mode, uint test_if_locked) override;
int close(void) override;
int reset() override;
int extra(enum ha_extra_function operation) override;
int write_row(const uchar *buf) override;
// start_bulk_update exec_bulk_update
int update_row(const uchar *old_data, const uchar *new_data) override;
// start_bulk_delete exec_bulk_delete
int delete_row(const uchar *buf) override;
int direct_update_rows_init(List<Item> *update_fields) override;
int direct_update_rows(ha_rows *update_rows, ha_rows *found_rows) override;
void start_bulk_insert(ha_rows rows, uint flags = 0) override;
int end_bulk_insert() override;
Table_flags table_flags(void) const override;
ulong index_flags(uint idx, uint part, bool all_parts) const override;
uint max_supported_keys() const override { return MAX_KEY; }
ha_rows records() override;
ha_rows records_in_range(uint inx, key_range *min_key,
key_range *max_key) override;
int info(uint flag) override; // see my_base.h for full description
// multi_read_range
// read_range
int index_init(uint idx, bool sorted) override;
int index_read(uchar * buf, const uchar * key, uint key_len,
enum ha_rkey_function find_flag) override;
int index_first(uchar *buf) override;
int index_prev(uchar *buf) override;
int index_last(uchar *buf) override;
int index_next(uchar *buf) override;
//int index_next_same(uchar *buf, const uchar *key, uint keylen) override;
int index_end() override;
int rnd_init(bool scan) override;
int rnd_next(uchar *buf) override;
int rnd_pos(uchar * buf, uchar *pos) override;
int rnd_end() override;
void position(const uchar *record) override;
uint lock_count(void) const override;
THR_LOCK_DATA **store_lock(THD *thd,
THR_LOCK_DATA **to,
enum thr_lock_type lock_type) override;
int external_lock(THD *thd, int lock_type) override;
uint8 table_cache_type() override
{
return(HA_CACHE_TBL_NOCACHE);
}
const COND *cond_push(const COND *cond) override;
void cond_pop() override;
int info_push(uint info_type, void *info) override;
ulonglong get_table_oid();
private:
void build_key_packed_row(uint index, const uchar *buf,
uchar *packed_key, size_t *packed_key_len);
};
bool select_handler_setting(THD* thd);
bool derived_handler_setting(THD* thd);
uint row_buffer_setting(THD* thd);
#endif // _ha_xpand_h
/*****************************************************************************
Copyright (c) 2019, 2020, MariaDB Corporation.
*****************************************************************************/
#include "ha_xpand.h"
#include "ha_xpand_pushdown.h"
extern handlerton *xpand_hton;
extern uint xpand_row_buffer;
/*@brief Fills up array data types, metadata and nullability*/
/************************************************************
* DESCRIPTION:
* Fills up three arrays with: field binlog data types, field
* metadata and nullability bitmask as in Table_map_log_event
* ctor. Internally creates a temporary table as does
* Pushdown_select. DH uses the actual temp table w/o
* b/c create_DH is called later compared to create_SH.
* More details in server/sql/log_event_server.cc
* PARAMETERS:
* thd - THD*
* table__ - TABLE* temp table for the results
* sl - SELECT_LEX*
* fieldtype - uchar*
* field_metadata - uchar*
* null_bits - uchar*
* num_null_bytes - null bit size
* fields_count - a number of fields
* RETURN:
* metadata_size int or -1 in case of error
************************************************************/
int get_field_types(THD *thd, TABLE *table__, SELECT_LEX *sl, uchar *fieldtype,
uchar *field_metadata, uchar *null_bits,
const int num_null_bytes, const uint fields_count)
{
int field_metadata_size = 0;
int metadata_index = 0;
TABLE *tmp_table= table__;
if (!tmp_table) {
// Construct a tmp table with fields to find out result DTs.
// This should be reconsidered if it worths the effort.
List<Item> types;
TMP_TABLE_PARAM tmp_table_param;
sl->master_unit()->join_union_item_types(thd, types, 1);
tmp_table_param.init();
tmp_table_param.field_count= types.elements;
tmp_table = create_tmp_table(thd, &tmp_table_param, types, (ORDER *) 0,
false, 0, TMP_TABLE_ALL_COLUMNS, 1,
&empty_clex_str, true, false);
if (!tmp_table) {
field_metadata_size = -1;
goto err;
}
}
for (unsigned int i = 0 ; i < fields_count; ++i) {
fieldtype[i]= tmp_table->field[i]->binlog_type();
}
bzero(field_metadata, (fields_count * 2));
for (unsigned int i= 0 ; i < fields_count ; i++)
{
Binlog_type_info bti= tmp_table->field[i]->binlog_type_info();
uchar *ptr = reinterpret_cast<uchar*>(&bti.m_metadata);
memcpy(&field_metadata[metadata_index], ptr, bti.m_metadata_size);
metadata_index+= bti.m_metadata_size;
}
if (metadata_index < 251)
field_metadata_size += metadata_index + 1;
else
field_metadata_size += metadata_index + 3;
bzero(null_bits, num_null_bytes);
for (unsigned int i= 0 ; i < fields_count ; ++i) {
if (tmp_table->field[i]->maybe_null()) {
null_bits[(i / 8)]+= 1 << (i % 8);
}
}
if (!table__)
free_tmp_table(thd, tmp_table);
err:
return field_metadata_size;
}
/*@brief create_xpand_select_handler- Creates handler*/
/************************************************************
* DESCRIPTION:
* Creates a select handler
* More details in server/sql/select_handler.h
* PARAMETERS:
* thd - THD pointer.
* sel - SELECT_LEX* that describes the query.
* RETURN:
* select_handler if possible
* NULL otherwise
************************************************************/
select_handler*
create_xpand_select_handler(THD* thd, SELECT_LEX* select_lex)
{
ulonglong *oids = NULL;
ha_xpand_select_handler *sh = NULL;
if (!select_handler_setting(thd)) {
return sh;
}
// TODO Return early for EXPLAIN before we run the actual scan.
// We can send compile request when we separate compilation
// and execution.
xpand_connection_cursor *scan = NULL;
if (thd->lex->describe) {
sh = new ha_xpand_select_handler(thd, select_lex, scan);
return sh;
}
// Multi-update runs an implicit query to collect constraints.
// SH couldn't be used for this.
if (thd->lex->sql_command == SQLCOM_UPDATE_MULTI) {
return sh;
}
String query;
// Print the query into a string provided
select_lex->print(thd, &query, QT_ORDINARY);
int error_code = 0;
int field_metadata_size = 0;
xpand_connection *trx = NULL;
// We presume this number is equal to types.elements in get_field_types
uint items_number = select_lex->get_item_list()->elements;
uint num_null_bytes = (items_number + 7) / 8;
uchar *fieldtype = NULL;
uchar *null_bits = NULL;
uchar *field_metadata = NULL;
uchar *meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME), &fieldtype, items_number,
&null_bits, num_null_bytes, &field_metadata, (items_number * 2), NULL);
if (!meta_memory) {
// The only way to say something here is to raise warning
// b/c we will fallback to other access methods: derived handler or rowstore.
goto err;
}
if((field_metadata_size =
get_field_types(thd, NULL, select_lex, fieldtype, field_metadata,
null_bits, num_null_bytes, items_number)) < 0) {
goto err;
}
trx = get_trx(thd, &error_code);
if (!trx)
goto err;
if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trx->auto_commit_next();
oids = xpand_extract_table_oids(thd, select_lex->parent_lex);
if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits,
num_null_bytes, field_metadata,
field_metadata_size,
row_buffer_setting(thd), oids, &scan))) {
goto err;
}
sh = new ha_xpand_select_handler(thd, select_lex, scan);
err:
if (meta_memory)
my_free(meta_memory);
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_tables_for_discovery(select_lex->parent_lex);
return sh;
}
/***********************************************************
* DESCRIPTION:
* select_handler constructor
* PARAMETERS:
* thd - THD pointer.
* select_lex - sematic tree for the query.
**********************************************************/
ha_xpand_select_handler::ha_xpand_select_handler(
THD *thd,
SELECT_LEX* select_lex,
xpand_connection_cursor *scan_)
: select_handler(thd, xpand_hton)
{
thd__ = thd;
scan = scan_;
select = select_lex;
rgi = NULL;
}
/***********************************************************
* DESCRIPTION:
* select_handler constructor
* This frees dynamic memory allocated for bitmap
* and disables replication to SH temp table.
**********************************************************/
ha_xpand_select_handler::~ha_xpand_select_handler()
{
int error_code;
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx) {
// TBD Log this
}
if (trx && scan)
trx->scan_end(scan);
// If the ::init_scan has been executed
if (table__)
my_bitmap_free(&scan_fields);
if (rgi)
remove_current_table_from_rpl_table_list(rgi);
}
/*@brief Initiate the query for select_handler */
/***********************************************************
* DESCRIPTION:
* Initializes dynamic structures and sets SH temp table
* as RBR replication destination to unpack rows.
* * PARAMETERS:
* RETURN:
* rc as int
* ********************************************************/
int ha_xpand_select_handler::init_scan()
{
// Save this into the base handler class attribute
table__ = table;
// need this bitmap future in next_row()
if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
return ER_OUTOFMEMORY;
bitmap_set_all(&scan_fields);
add_current_table_to_rpl_table_list(&rgi, thd__, table__);
return 0;
}
/*@brief Fetch next row for select_handler */
/***********************************************************
* DESCRIPTION:
* Fetch next row for select_handler.
* PARAMETERS:
* RETURN:
* rc as int
* ********************************************************/
int ha_xpand_select_handler::next_row()
{
int error_code = 0;
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
assert(scan);
uchar *rowdata;
ulong rowdata_length;
if ((error_code = trx->scan_next(scan, &rowdata, &rowdata_length)))
return error_code;
uchar const *current_row_end;
ulong master_reclength;
error_code = unpack_row(rgi, table, table->s->fields, rowdata,
&scan_fields, &current_row_end,
&master_reclength, rowdata + rowdata_length);
if (error_code)
return error_code;
return 0;
}
/*@brief Finishes the scan and clean it up */
/***********************************************************
* DESCRIPTION:
* Finishes the scan for select handler
* PARAMETERS:
* RETURN:
* rc as int
***********************************************************/
int ha_xpand_select_handler::end_scan()
{
return 0;
}
/*@brief create_xpand_derived_handler- Creates handler*/
/************************************************************
* DESCRIPTION:
* Creates a derived handler
* More details in server/sql/derived_handler.h
* PARAMETERS:
* thd - THD pointer.
* derived - TABLE_LIST* that describes the tables involved
* RETURN:
* derived_handler if possible
* NULL otherwise
************************************************************/
derived_handler*
create_xpand_derived_handler(THD* thd, TABLE_LIST *derived)
{
ha_xpand_derived_handler *dh = NULL;
if (!derived_handler_setting(thd)) {
return dh;
}
SELECT_LEX_UNIT *unit= derived->derived;
SELECT_LEX *select_lex = unit->first_select();
String query;
dh = new ha_xpand_derived_handler(thd, select_lex, NULL);
return dh;
}
/***********************************************************
* DESCRIPTION:
* derived_handler constructor
* PARAMETERS:
* thd - THD pointer.
* select_lex - sematic tree for the query.
**********************************************************/
ha_xpand_derived_handler::ha_xpand_derived_handler(
THD *thd,
SELECT_LEX* select_lex,
xpand_connection_cursor *scan_)
: derived_handler(thd, xpand_hton)
{
thd__ = thd;
scan = scan_;
select = select_lex;
rgi = NULL;
}
/***********************************************************
* DESCRIPTION:
* derived_handler constructor
* This frees dynamic memory allocated for bitmap
* and disables replication to SH temp table.
**********************************************************/
ha_xpand_derived_handler::~ha_xpand_derived_handler()
{
int error_code;
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx) {
// TBD Log this.
}
if (trx && scan)
trx->scan_end(scan);
// If the ::init_scan has been executed
if (table__)
my_bitmap_free(&scan_fields);
if (rgi)
remove_current_table_from_rpl_table_list(rgi);
}
/*@brief Initiate the query for derived_handler */
/***********************************************************
* DESCRIPTION:
* Initializes dynamic structures and sets SH temp table
* as RBR replication destination to unpack rows.
* * PARAMETERS:
* RETURN:
* rc as int
* ********************************************************/
int ha_xpand_derived_handler::init_scan()
{
String query;
// Print the query into a string provided
select->print(thd__, &query, QT_ORDINARY);
int error_code = 0;
int field_metadata_size = 0;
xpand_connection *trx = NULL;
ulonglong *oids = NULL;
// We presume this number is equal to types.elements in get_field_types
uint items_number= select->get_item_list()->elements;
uint num_null_bytes = (items_number + 7) / 8;
uchar *fieldtype = NULL;
uchar *null_bits = NULL;
uchar *field_metadata = NULL;
uchar *meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME), &fieldtype, items_number,
&null_bits, num_null_bytes, &field_metadata, (items_number * 2), NULL);
if (!meta_memory) {
// The only way to say something here is to raise warning
// b/c we will fallback to other access methods: derived handler or rowstore.
goto err;
}
if((field_metadata_size=
get_field_types(thd__, table, select, fieldtype, field_metadata,
null_bits, num_null_bytes, items_number)) < 0) {
goto err;
}
trx = get_trx(thd__, &error_code);
if (!trx)
goto err;
oids = xpand_extract_table_oids(thd__, select->parent_lex);
if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits,
num_null_bytes, field_metadata,
field_metadata_size,
row_buffer_setting(thd), oids, &scan))) {
goto err;
}
// Save this into the base handler class attribute
table__ = table;
// need this bitmap future in next_row()
if (my_bitmap_init(&scan_fields, NULL, table->read_set->n_bits, false))
return ER_OUTOFMEMORY;
bitmap_set_all(&scan_fields);
add_current_table_to_rpl_table_list(&rgi, thd__, table__);
err:
if (meta_memory)
my_free(meta_memory);
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_tables_for_discovery(select->parent_lex);
return error_code;
}
/*@brief Fetch next row for derived_handler */
/***********************************************************
* DESCRIPTION:
* Fetch next row for derived_handler.
* PARAMETERS:
* RETURN:
* rc as int
* ********************************************************/
int ha_xpand_derived_handler::next_row()
{
int error_code = 0;
xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
assert(scan);
uchar *rowdata;
ulong rowdata_length;
if ((error_code = trx->scan_next(scan, &rowdata, &rowdata_length)))
return error_code;
uchar const *current_row_end;
ulong master_reclength;
error_code = unpack_row(rgi, table, table->s->fields, rowdata,
&scan_fields, &current_row_end,
&master_reclength, rowdata + rowdata_length);
if (error_code)
return error_code;
return 0;
}
/*@brief Finishes the scan and clean it up */
/***********************************************************
* DESCRIPTION:
* Finishes the scan for derived handler
* PARAMETERS:
* RETURN:
* rc as int
***********************************************************/
int ha_xpand_derived_handler::end_scan()
{
return 0;
}
/*****************************************************************************
Copyright (c) 2019, 2020, MariaDB Corporation.
*****************************************************************************/
#ifndef _ha_xpand_pushdown_h
#define _ha_xpand_pushdown_h
#include "select_handler.h"
#include "derived_handler.h"
#include "sql_select.h"
/*@brief base_handler class*/
/***********************************************************
* DESCRIPTION:
* To be described
************************************************************/
class ha_xpand_base_handler
{
// To simulate abstract class
protected:
ha_xpand_base_handler(): thd__(0),table__(0) {}
~ha_xpand_base_handler() {}
// Copies of pushdown handlers attributes
// to use them in shared methods.
THD *thd__;
TABLE *table__;
// The bitmap used to sent
MY_BITMAP scan_fields;
// Structures to unpack RBR rows from XPD BE
rpl_group_info *rgi;
// XPD BE scan operation reference
xpand_connection_cursor *scan;
};
/*@brief select_handler class*/
/***********************************************************
* DESCRIPTION:
* select_handler API methods. Could be used by the server
* tp pushdown the whole query described by SELECT_LEX.
* More details in server/sql/select_handler.h
* sel semantic tree for the query in SELECT_LEX.
************************************************************/
class ha_xpand_select_handler:
private ha_xpand_base_handler,
public select_handler
{
public:
ha_xpand_select_handler(THD* thd_arg, SELECT_LEX* sel,
xpand_connection_cursor *scan);
~ha_xpand_select_handler();
int init_scan() override;
int next_row() override;
int end_scan() override;
void print_error(int, unsigned long) override {}
};
/*@brief derived_handler class*/
/***********************************************************
* DESCRIPTION:
* derived_handler API methods. Could be used by the server
* tp pushdown the whole query described by SELECT_LEX.
* More details in server/sql/derived_handler.h
* sel semantic tree for the query in SELECT_LEX.
************************************************************/
class ha_xpand_derived_handler:
private ha_xpand_base_handler,
public derived_handler
{
public:
ha_xpand_derived_handler(THD* thd_arg, SELECT_LEX* sel,
xpand_connection_cursor *scan);
~ha_xpand_derived_handler();
int init_scan() override;
int next_row() override;
int end_scan() override;
void print_error(int, unsigned long) override {}
};
select_handler *create_xpand_select_handler(THD* thd, SELECT_LEX* select_lex);
derived_handler *create_xpand_derived_handler(THD* thd, TABLE_LIST *derived);
#endif
/*****************************************************************************
Copyright (c) 2019, 2020, MariaDB Corporation.
*****************************************************************************/
/** @file xpand_connection.cc */
#include "xpand_connection.h"
#include "ha_xpand.h"
#include <string>
#include "handler.h"
#include "table.h"
#include "sql_class.h"
#include "my_pthread.h"
#include "tztime.h"
#include "errmsg.h"
#ifdef _WIN32
#include <stdlib.h>
#define htobe64 _byteswap_uint64
#define be64toh _byteswap_uint64
#define htobe32 _byteswap_ulong
#define be32toh _byteswap_ulong
#define htobe16 _byteswap_ushort
#define be16toh _byteswap_ushort
#endif
#if defined(__APPLE__)
#include <libkern/OSByteOrder.h>
#define htobe64(x) OSSwapHostToBigInt64(x)
#define be64toh(x) OSSwapBigToHostInt64(x)
#define htobe32(x) OSSwapHostToBigInt32(x)
#define be32toh(x) OSSwapBigToHostInt32(x)
#define htobe16(x) OSSwapHostToBigInt16(x)
#define be16toh(x) OSSwapBigToHostInt16(x)
#endif
extern int xpand_connect_timeout;
extern int xpand_read_timeout;
extern int xpand_write_timeout;
extern char *xpand_username;
extern char *xpand_password;
extern uint xpand_port;
extern char *xpand_socket;
/*
This class implements the commands that can be sent to the cluster by the
Xpand engine. All of these commands return a status to the caller, but some
commands also create open invocations on the cluster, which must be closed by
sending additional commands.
Transactions on the cluster are started using flags attached to commands, and
transactions are committed or rolled back using separate commands.
Methods ending with _next affect the transaction state after the next command
is sent to the cluster. Other transaction commands are sent to the cluster
immediately, and the state is changed before they return.
_____________________ _______________________
| | | | | |
V | | V | |
NONE --> REQUESTED --> STARTED --> NEW_STMT |
| |
`----> ROLLBACK_STMT ---`
The commit and rollback commands will change any other state to NONE. This
includes the REQUESTED state, for which nothing will be sent to the cluster.
The rollback statement command can likewise change the state from NEW_STMT to
STARTED without sending anything to the cluster.
In addition, the XPAND_TRANS_AUTOCOMMIT flag will cause the transactions
for commands that complete without leaving open invocations on the cluster to
be committed if successful or rolled back if there was an error. If
auto-commit is enabled, only one open invocation may be in progress at a
time.
*/
enum xpand_trans_state {
XPAND_TRANS_STARTED = 0,
XPAND_TRANS_REQUESTED = 1,
XPAND_TRANS_NEW_STMT = 2,
XPAND_TRANS_ROLLBACK_STMT = 4,
XPAND_TRANS_NONE = 32,
};
const int XPAND_TRANS_STARTS_STMT = (XPAND_TRANS_NEW_STMT |
XPAND_TRANS_REQUESTED |
XPAND_TRANS_ROLLBACK_STMT);
enum xpand_trans_post_flags {
XPAND_TRANS_AUTOCOMMIT = 8,
XPAND_TRANS_NO_POST_FLAGS = 0,
};
enum xpand_commands {
XPAND_WRITE_ROW = 1,
XPAND_SCAN_TABLE,
XPAND_SCAN_NEXT,
XPAND_SCAN_STOP,
XPAND_KEY_READ,
XPAND_KEY_DELETE,
XPAND_SCAN_QUERY,
XPAND_KEY_UPDATE,
XPAND_SCAN_FROM_KEY,
XPAND_UPDATE_QUERY,
XPAND_COMMIT,
XPAND_ROLLBACK,
XPAND_SCAN_TABLE_COND,
};
/****************************************************************************
** Class xpand_connection
****************************************************************************/
xpand_connection::xpand_connection()
: command_buffer(NULL), command_buffer_length(0), command_length(0),
trans_state(XPAND_TRANS_NONE), trans_flags(XPAND_TRANS_NO_POST_FLAGS)
{
DBUG_ENTER("xpand_connection::xpand_connection");
memset(&xpand_net, 0, sizeof(MYSQL));
DBUG_VOID_RETURN;
}
xpand_connection::~xpand_connection()
{
DBUG_ENTER("xpand_connection::~xpand_connection");
if (is_connected())
disconnect(TRUE);
if (command_buffer)
my_free(command_buffer);
DBUG_VOID_RETURN;
}
void xpand_connection::disconnect(bool is_destructor)
{
DBUG_ENTER("xpand_connection::disconnect");
if (is_destructor)
{
/*
Connection object destruction occurs after the destruction of
the thread used by the network has begun, so usage of that
thread object now is not reliable
*/
xpand_net.net.thd = NULL;
}
mysql_close(&xpand_net);
DBUG_VOID_RETURN;
}
extern int xpand_hosts_cur;
extern ulong xpand_balance_algorithm;
extern mysql_rwlock_t xpand_hosts_lock;
extern xpand_host_list *xpand_hosts;
int xpand_connection::connect()
{
DBUG_ENTER("xpand_connection::connect");
int start = 0;
if (xpand_balance_algorithm == XPAND_BALANCE_ROUND_ROBIN)
start = my_atomic_add32(&xpand_hosts_cur, 1);
mysql_rwlock_rdlock(&xpand_hosts_lock);
//search for available host
int error_code = HA_ERR_NO_CONNECTION;
for (int i = 0; i < xpand_hosts->hosts_len; i++) {
char *host = xpand_hosts->hosts[(start + i) % xpand_hosts->hosts_len];
error_code = connect_direct(host);
if (!error_code)
break;
}
mysql_rwlock_unlock(&xpand_hosts_lock);
DBUG_RETURN(error_code);
}
int xpand_connection::connect_direct(char *host)
{
DBUG_ENTER("xpand_connection::connect_direct");
my_bool my_true = true;
DBUG_PRINT("host", ("%s", host));
if (!mysql_init(&xpand_net))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
uint protocol_tcp = MYSQL_PROTOCOL_TCP;
mysql_options(&xpand_net, MYSQL_OPT_PROTOCOL, &protocol_tcp);
mysql_options(&xpand_net, MYSQL_OPT_READ_TIMEOUT,
&xpand_read_timeout);
mysql_options(&xpand_net, MYSQL_OPT_WRITE_TIMEOUT,
&xpand_write_timeout);
mysql_options(&xpand_net, MYSQL_OPT_CONNECT_TIMEOUT,
&xpand_connect_timeout);
mysql_options(&xpand_net, MYSQL_OPT_USE_REMOTE_CONNECTION,
NULL);
mysql_options(&xpand_net, MYSQL_SET_CHARSET_NAME, "utf8mb4");
mysql_options(&xpand_net, MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY,
(char *) &my_true);
mysql_options(&xpand_net, MYSQL_INIT_COMMAND,"SET autocommit=0");
#ifdef XPAND_CONNECTION_SSL
if (opt_ssl_ca_length | conn->tgt_ssl_capath_length |
conn->tgt_ssl_cert_length | conn->tgt_ssl_key_length)
{
mysql_ssl_set(&xpand_net, conn->tgt_ssl_key, conn->tgt_ssl_cert,
conn->tgt_ssl_ca, conn->tgt_ssl_capath, conn->tgt_ssl_cipher);
if (conn->tgt_ssl_vsc)
{
my_bool verify_flg = TRUE;
mysql_options(&xpand_net, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, &verify_flg);
}
}
#endif
int error_code = 0;
if (!mysql_real_connect(&xpand_net, host, xpand_username, xpand_password,
NULL, xpand_port, xpand_socket,
CLIENT_MULTI_STATEMENTS))
{
sql_print_error("Error connecting to xpand: %s", mysql_error(&xpand_net));
disconnect();
error_code = HA_ERR_NO_CONNECTION;
}
DBUG_RETURN(error_code);
}
int xpand_connection::add_status_vars()
{
DBUG_ENTER("xpand_connection::add_status_vars");
if (!(trans_state & XPAND_TRANS_STARTS_STMT))
DBUG_RETURN(add_command_operand_uchar(0));
int error_code = 0;
system_variables vars = current_thd->variables;
if ((error_code = add_command_operand_uchar(1)))
DBUG_RETURN(error_code);
//sql mode
if ((error_code = add_command_operand_ulonglong(vars.sql_mode)))
DBUG_RETURN(error_code);
//auto increment state
if ((error_code = add_command_operand_ushort(vars.auto_increment_increment)))
DBUG_RETURN(error_code);
if ((error_code = add_command_operand_ushort(vars.auto_increment_offset)))
DBUG_RETURN(error_code);
//character sets and collations
if ((error_code = add_command_operand_ushort(vars.character_set_results->number)))
DBUG_RETURN(error_code);
if ((error_code = add_command_operand_ushort(vars.character_set_client->number)))
DBUG_RETURN(error_code);
if ((error_code = add_command_operand_ushort(vars.collation_connection->number)))
DBUG_RETURN(error_code);
if ((error_code = add_command_operand_ushort(vars.collation_server->number)))
DBUG_RETURN(error_code);
//timezone and time names
String tzone;
vars.time_zone->get_name()->print(&tzone, system_charset_info);
if ((error_code = add_command_operand_str((const uchar*)tzone.ptr(),tzone.length())))
DBUG_RETURN(error_code);
if ((error_code = add_command_operand_ushort(vars.lc_time_names->number)))
DBUG_RETURN(error_code);
//transaction isolation
if ((error_code = add_command_operand_uchar(vars.tx_isolation)))
DBUG_RETURN(error_code);
DBUG_RETURN(0);
}
int xpand_connection::begin_command(uchar command)
{
if (trans_state == XPAND_TRANS_NONE)
return HA_ERR_INTERNAL_ERROR;
command_length = 0;
int error_code = 0;
if ((error_code = add_command_operand_uchar(command)))
return error_code;
if ((error_code = add_command_operand_uchar(trans_state | trans_flags)))
return error_code;
if ((error_code = add_status_vars()))
return error_code;
return error_code;
}
int xpand_connection::send_command()
{
/*
Please note:
* The transaction state is set before the command is sent because rolling
back a nonexistent transaction is better than leaving a tranaction open
on the cluster.
* The state may have alreadly been STARTED.
* Commit and rollback commands update the transaction state after calling
this function.
* If auto-commit is enabled, the state may also updated after the
response has been processed. We do not clear the auto-commit flag here
because it needs to be sent with each command until the transaction is
committed or rolled back.
*/
trans_state = XPAND_TRANS_STARTED;
if (simple_command(&xpand_net,
(enum_server_command)XPAND_SERVER_REQUEST,
command_buffer, command_length, TRUE))
return mysql_errno(&xpand_net);
return 0;
}
int xpand_connection::read_query_response()
{
int error_code = 0;
if (xpand_net.methods->read_query_result(&xpand_net))
error_code = mysql_errno(&xpand_net);
auto_commit_closed();
return error_code;
}
bool xpand_connection::has_open_transaction()
{
return trans_state != XPAND_TRANS_NONE;
}
int xpand_connection::commit_transaction()
{
DBUG_ENTER("xpand_connection::commit_transaction");
if (trans_state == XPAND_TRANS_NONE)
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
if (trans_state == XPAND_TRANS_REQUESTED) {
trans_state = XPAND_TRANS_NONE;
trans_flags = XPAND_TRANS_NO_POST_FLAGS;
DBUG_RETURN(0);
}
int error_code;
if ((error_code = begin_command(XPAND_COMMIT)))
DBUG_RETURN(error_code);
if ((error_code = send_command()))
DBUG_RETURN(error_code);
if ((error_code = read_query_response()))
DBUG_RETURN(error_code);
trans_state = XPAND_TRANS_NONE;
trans_flags = XPAND_TRANS_NO_POST_FLAGS;
DBUG_RETURN(error_code);
}
int xpand_connection::rollback_transaction()
{
DBUG_ENTER("xpand_connection::rollback_transaction");
if (trans_state == XPAND_TRANS_NONE ||
trans_state == XPAND_TRANS_REQUESTED) {
trans_state = XPAND_TRANS_NONE;
DBUG_RETURN(0);
}
int error_code;
if ((error_code = begin_command(XPAND_ROLLBACK)))
DBUG_RETURN(error_code);
if ((error_code = send_command()))
DBUG_RETURN(error_code);
if ((error_code = read_query_response()))
DBUG_RETURN(error_code);
trans_state = XPAND_TRANS_NONE;
trans_flags = XPAND_TRANS_NO_POST_FLAGS;
DBUG_RETURN(error_code);
}
int xpand_connection::begin_transaction_next()
{
DBUG_ENTER("xpand_connection::begin_transaction_next");
if (trans_state != XPAND_TRANS_NONE ||
trans_flags != XPAND_TRANS_NO_POST_FLAGS)
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
trans_state = XPAND_TRANS_REQUESTED;
DBUG_RETURN(0);
}
int xpand_connection::new_statement_next()
{
DBUG_ENTER("xpand_connection::new_statement_next");
if (trans_state != XPAND_TRANS_STARTED ||
trans_flags != XPAND_TRANS_NO_POST_FLAGS)
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
trans_state = XPAND_TRANS_NEW_STMT;
DBUG_RETURN(0);
}
int xpand_connection::rollback_statement_next()
{
DBUG_ENTER("xpand_connection::rollback_statement_next");
if (trans_state != XPAND_TRANS_STARTED ||
trans_flags != XPAND_TRANS_NO_POST_FLAGS)
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
trans_state = XPAND_TRANS_ROLLBACK_STMT;
DBUG_RETURN(0);
}
void xpand_connection::auto_commit_next()
{
trans_flags |= XPAND_TRANS_AUTOCOMMIT;
}
void xpand_connection::auto_commit_closed()
{
if (trans_flags & XPAND_TRANS_AUTOCOMMIT) {
trans_flags &= ~XPAND_TRANS_AUTOCOMMIT;
trans_state = XPAND_TRANS_NONE;
}
}
int xpand_connection::run_query(String &stmt)
{
int error_code = mysql_real_query(&xpand_net, stmt.ptr(), stmt.length());
if (error_code)
return mysql_errno(&xpand_net);
return error_code;
}
int xpand_connection::write_row(ulonglong xpand_table_oid, uchar *packed_row,
size_t packed_size, ulonglong *last_insert_id)
{
int error_code;
command_length = 0;
// row based commands should not be called with auto commit.
if (trans_flags & XPAND_TRANS_AUTOCOMMIT)
return HA_ERR_INTERNAL_ERROR;
if ((error_code = begin_command(XPAND_WRITE_ROW)))
return error_code;
if ((error_code = add_command_operand_ulonglong(xpand_table_oid)))
return error_code;
if ((error_code = add_command_operand_str(packed_row, packed_size)))
return error_code;
if ((error_code = send_command()))
return error_code;
if ((error_code = read_query_response())) {
if (error_code == ER_DUP_ENTRY)
return HA_ERR_FOUND_DUPP_KEY;
return error_code;
}
*last_insert_id = xpand_net.insert_id;
return error_code;
}
int xpand_connection::key_update(ulonglong xpand_table_oid, uchar *packed_key,
size_t packed_key_length,
MY_BITMAP *update_set, uchar *packed_new_data,
size_t packed_new_length)
{
int error_code;
command_length = 0;
// row based commands should not be called with auto commit.
if (trans_flags & XPAND_TRANS_AUTOCOMMIT)
return HA_ERR_INTERNAL_ERROR;
if ((error_code = begin_command(XPAND_KEY_UPDATE)))
return error_code;
if ((error_code = add_command_operand_ulonglong(xpand_table_oid)))
return error_code;
if ((error_code = add_command_operand_str(packed_key, packed_key_length)))
return error_code;
if ((error_code = add_command_operand_bitmap(update_set)))
return error_code;
if ((error_code = add_command_operand_str(packed_new_data,
packed_new_length)))
return error_code;
if ((error_code = send_command()))
return error_code;
if ((error_code = read_query_response()))
return error_code;
return error_code;
}
int xpand_connection::key_delete(ulonglong xpand_table_oid,
uchar *packed_key, size_t packed_key_length)
{
int error_code;
command_length = 0;
// row based commands should not be called with auto commit.
if (trans_flags & XPAND_TRANS_AUTOCOMMIT)
return HA_ERR_INTERNAL_ERROR;
if ((error_code = begin_command(XPAND_KEY_DELETE)))
return error_code;
if ((error_code = add_command_operand_ulonglong(xpand_table_oid)))
return error_code;
if ((error_code = add_command_operand_str(packed_key, packed_key_length)))
return error_code;
if ((error_code = send_command()))
return error_code;
if ((error_code = read_query_response()))
return error_code;
return error_code;
}
int xpand_connection::key_read(ulonglong xpand_table_oid, uint index,
xpand_lock_mode_t lock_mode, MY_BITMAP *read_set,
uchar *packed_key, ulong packed_key_length,
uchar **rowdata, ulonglong *rowdata_length)
{
int error_code;
command_length = 0;
// row based commands should not be called with auto commit.
if (trans_flags & XPAND_TRANS_AUTOCOMMIT)
return HA_ERR_INTERNAL_ERROR;
if ((error_code = begin_command(XPAND_KEY_READ)))
return error_code;
if ((error_code = add_command_operand_ulonglong(xpand_table_oid)))
return error_code;
if ((error_code = add_command_operand_uint(index)))
return error_code;
if ((error_code = add_command_operand_uchar((uchar)lock_mode)))
return error_code;
if ((error_code = add_command_operand_bitmap(read_set)))
return error_code;
if ((error_code = add_command_operand_str(packed_key, packed_key_length)))
return error_code;
if ((error_code = send_command()))
return error_code;
ulong packet_length = cli_safe_read(&xpand_net);
if (packet_length == packet_error)
return mysql_errno(&xpand_net);
uchar *data = xpand_net.net.read_pos;
*rowdata_length = safe_net_field_length_ll(&data, packet_length);
*rowdata = (uchar *)my_malloc(*rowdata_length, MYF(MY_WME));
memcpy(*rowdata, data, *rowdata_length);
packet_length = cli_safe_read(&xpand_net);
if (packet_length == packet_error) {
my_free(*rowdata);
*rowdata = NULL;
*rowdata_length = 0;
return mysql_errno(&xpand_net);
}
return 0;
}
class xpand_connection_cursor {
struct rowdata {
ulong length;
uchar *data;
};
ulong current_row;
ulong last_row;
struct rowdata *rows;
uchar *outstanding_row; // to be freed on next request.
MYSQL *xpand_net;
public:
ulong buffer_size;
ulonglong scan_refid;
bool eof_reached;
private:
int cache_row(uchar *rowdata, ulong rowdata_length)
{
DBUG_ENTER("xpand_connection_cursor::cache_row");
rows[last_row].length = rowdata_length;
rows[last_row].data = (uchar *)my_malloc(rowdata_length, MYF(MY_WME));
if (!rows[last_row].data)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
memcpy(rows[last_row].data, rowdata, rowdata_length);
last_row++;
DBUG_RETURN(0);
}
int load_rows_impl(bool *stmt_completed)
{
DBUG_ENTER("xpand_connection_cursor::load_rows_impl");
int error_code = 0;
ulong packet_length = cli_safe_read(xpand_net);
if (packet_length == packet_error) {
error_code = mysql_errno(xpand_net);
*stmt_completed = TRUE;
if (error_code == HA_ERR_END_OF_FILE) {
// We have read all rows for query.
eof_reached = TRUE;
DBUG_RETURN(0);
}
DBUG_RETURN(error_code);
}
uchar *rowdata = xpand_net->net.read_pos;
ulong rowdata_length = (ulong) safe_net_field_length_ll(&rowdata, packet_length);
if (!rowdata_length) {
// We have read all rows in this batch.
DBUG_RETURN(0);
}
if ((error_code = cache_row(rowdata, rowdata_length)))
DBUG_RETURN(error_code);
DBUG_RETURN(load_rows_impl(stmt_completed));
}
public:
xpand_connection_cursor(MYSQL *xpand_net_, ulong bufsize)
{
DBUG_ENTER("xpand_connection_cursor::xpand_connection_cursor");
xpand_net = xpand_net_;
eof_reached = FALSE;
current_row = 0;
last_row = 0;
outstanding_row = NULL;
buffer_size = bufsize;
rows = NULL;
DBUG_VOID_RETURN;
}
~xpand_connection_cursor()
{
DBUG_ENTER("xpand_connection_cursor::~xpand_connection_cursor");
if (outstanding_row)
my_free(outstanding_row);
if (rows) {
while (current_row < last_row)
my_free(rows[current_row++].data);
my_free(rows);
}
DBUG_VOID_RETURN;
}
int load_rows(bool *stmt_completed)
{
DBUG_ENTER("xpand_connection_cursor::load_rows");
current_row = 0;
last_row = 0;
DBUG_RETURN(load_rows_impl(stmt_completed));
}
int initialize(bool *stmt_completed)
{
DBUG_ENTER("xpand_connection_cursor::initialize");
ulong packet_length = cli_safe_read(xpand_net);
if (packet_length == packet_error) {
*stmt_completed = TRUE;
int error_code = mysql_errno(xpand_net);
my_printf_error(error_code, "Xpand error: %s", MYF(0),
mysql_error(xpand_net));
DBUG_RETURN(error_code);
}
unsigned char *pos = xpand_net->net.read_pos;
scan_refid = safe_net_field_length_ll(&pos, packet_length);
rows = (struct rowdata *)my_malloc(buffer_size * sizeof(struct rowdata),
MYF(MY_WME));
if (!rows)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
DBUG_RETURN(load_rows(stmt_completed));
}
uchar *retrieve_row(ulong *rowdata_length)
{
DBUG_ENTER("xpand_connection_cursor::retrieve_row");
if (outstanding_row) {
my_free(outstanding_row);
outstanding_row = NULL;
}
if (current_row == last_row)
DBUG_RETURN(NULL);
*rowdata_length = rows[current_row].length;
outstanding_row = rows[current_row].data;
current_row++;
DBUG_RETURN(outstanding_row);
}
};
int xpand_connection::allocate_cursor(MYSQL *xpand_net, ulong buffer_size,
xpand_connection_cursor **scan)
{
DBUG_ENTER("xpand_connection::allocate_cursor");
*scan = new xpand_connection_cursor(xpand_net, buffer_size);
if (!*scan)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
bool stmt_completed = FALSE;
int error_code = (*scan)->initialize(&stmt_completed);
if (error_code) {
delete *scan;
*scan = NULL;
}
if (stmt_completed)
auto_commit_closed();
DBUG_RETURN(error_code);
}
int xpand_connection::scan_table(ulonglong xpand_table_oid,
xpand_lock_mode_t lock_mode,
MY_BITMAP *read_set, ushort row_req,
xpand_connection_cursor **scan,
String* pushdown_cond_sql)
{
int error_code;
command_length = 0;
// row based commands should not be called with auto commit.
if (trans_flags & XPAND_TRANS_AUTOCOMMIT)
return HA_ERR_INTERNAL_ERROR;
if (pushdown_cond_sql != nullptr) {
if ((error_code= begin_command(XPAND_SCAN_TABLE_COND)))
return error_code;
} else {
if ((error_code= begin_command(XPAND_SCAN_TABLE)))
return error_code;
}
if ((error_code = add_command_operand_ushort(row_req)))
return error_code;
if ((error_code = add_command_operand_ulonglong(xpand_table_oid)))
return error_code;
if ((error_code = add_command_operand_uchar((uchar)lock_mode)))
return error_code;
if ((error_code = add_command_operand_bitmap(read_set)))
return error_code;
if (pushdown_cond_sql != nullptr) {
if ((error_code= add_command_operand_str(
reinterpret_cast<const uchar*>(pushdown_cond_sql->ptr()),
pushdown_cond_sql->length()))) {
return error_code;
}
}
if ((error_code = send_command()))
return error_code;
return allocate_cursor(&xpand_net, row_req, scan);
}
/**
* @brief
* Sends a command to initiate query scan.
* @details
* Sends a command over mysql protocol connection to initiate an
* arbitrary query using a query text.
* Uses field types, field metadata and nullability to explicitly
* cast result to expected data type. Exploits RBR TABLE_MAP_EVENT
* format + sends SQL text.
* @args
* stmt& Query text to send
* fieldtype* array of byte wide field types of result projection
* null_bits* fields nullability bitmap of result projection
* field_metadata* Field metadata of result projection
* scan_refid id used to reference this scan later
* Used in pushdowns to initiate query scan.
**/
int xpand_connection::scan_query(String &stmt, uchar *fieldtype, uint fields,
uchar *null_bits, uint null_bits_size,
uchar *field_metadata,
uint field_metadata_size, ushort row_req,
ulonglong *oids,
xpand_connection_cursor **scan)
{
int error_code;
command_length = 0;
if ((error_code = begin_command(XPAND_SCAN_QUERY)))
return error_code;
do {
if ((error_code = add_command_operand_ulonglong(*oids)))
return error_code;
}
while (*oids++);
if ((error_code = add_command_operand_ushort(row_req)))
return error_code;
if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length())))
return error_code;
if ((error_code = add_command_operand_str(fieldtype, fields)))
return error_code;
if ((error_code = add_command_operand_str(field_metadata,
field_metadata_size)))
return error_code;
// This variable length string calls for an additional store w/o lcb lenth prefix.
if ((error_code = add_command_operand_vlstr(null_bits, null_bits_size)))
return error_code;
if ((error_code = send_command()))
return error_code;
return allocate_cursor(&xpand_net, row_req, scan);
}
/**
* @brief
* Sends a command to initiate UPDATE.
* @details
* Sends a command over mysql protocol connection to initiate an
* UPDATE query using a query text.
* @args
* stmt& Query text to send
* dbname current working database
* dbname &current database name
**/
int xpand_connection::update_query(String &stmt, LEX_CSTRING &dbname,
ulonglong *oids, ulonglong *affected_rows)
{
int error_code;
command_length = 0;
if ((error_code = begin_command(XPAND_UPDATE_QUERY)))
return error_code;
do {
if ((error_code = add_command_operand_ulonglong(*oids)))
return error_code;
}
while (*oids++);
if ((error_code = add_command_operand_str((uchar*)dbname.str, dbname.length)))
return error_code;
if ((error_code = add_command_operand_str((uchar*)stmt.ptr(), stmt.length())))
return error_code;
if ((error_code = send_command()))
return error_code;
error_code = read_query_response();
if (!error_code)
*affected_rows = xpand_net.affected_rows;
return error_code;
}
int xpand_connection::scan_from_key(ulonglong xpand_table_oid, uint index,
xpand_lock_mode_t lock_mode,
enum scan_type scan_dir,
int no_key_cols, bool sorted_scan,
MY_BITMAP *read_set, uchar *packed_key,
ulong packed_key_length, ushort row_req,
xpand_connection_cursor **scan)
{
int error_code;
command_length = 0;
// row based commands should not be called with auto commit.
if (trans_flags & XPAND_TRANS_AUTOCOMMIT)
return HA_ERR_INTERNAL_ERROR;
if ((error_code = begin_command(XPAND_SCAN_FROM_KEY)))
return error_code;
if ((error_code = add_command_operand_ushort(row_req)))
return error_code;
if ((error_code = add_command_operand_ulonglong(xpand_table_oid)))
return error_code;
if ((error_code = add_command_operand_uint(index)))
return error_code;
if ((error_code = add_command_operand_uchar((uchar)lock_mode)))
return error_code;
if ((error_code = add_command_operand_uchar(scan_dir)))
return error_code;
if ((error_code = add_command_operand_uint(no_key_cols)))
return error_code;
if ((error_code = add_command_operand_uchar(sorted_scan)))
return error_code;
if ((error_code = add_command_operand_str(packed_key, packed_key_length)))
return error_code;
if ((error_code = add_command_operand_bitmap(read_set)))
return error_code;
if ((error_code = send_command()))
return error_code;
return allocate_cursor(&xpand_net, row_req, scan);
}
int xpand_connection::scan_next(xpand_connection_cursor *scan,
uchar **rowdata, ulong *rowdata_length)
{
*rowdata = scan->retrieve_row(rowdata_length);
if (*rowdata)
return 0;
if (scan->eof_reached)
return HA_ERR_END_OF_FILE;
int error_code;
command_length = 0;
if ((error_code = begin_command(XPAND_SCAN_NEXT)))
return error_code;
// This should not happen as @@xpand_row_buffer has this limit.
if (scan->buffer_size > 65535)
return HA_ERR_INTERNAL_ERROR;
if ((error_code = add_command_operand_ushort((ushort)scan->buffer_size)))
return error_code;
if ((error_code = add_command_operand_lcb(scan->scan_refid)))
return error_code;
if ((error_code = send_command()))
return error_code;
bool stmt_completed = FALSE;
error_code = scan->load_rows(&stmt_completed);
if (stmt_completed)
auto_commit_closed();
if (error_code)
return error_code;
*rowdata = scan->retrieve_row(rowdata_length);
if (!*rowdata)
return HA_ERR_END_OF_FILE;
return 0;
}
int xpand_connection::scan_end(xpand_connection_cursor *scan)
{
int error_code;
command_length = 0;
ulonglong scan_refid = scan->scan_refid;
bool eof_reached = scan->eof_reached;
delete scan;
if (eof_reached)
return 0;
if ((error_code = begin_command(XPAND_SCAN_STOP)))
return error_code;
if ((error_code = add_command_operand_lcb(scan_refid)))
return error_code;
if ((error_code = send_command()))
return error_code;
return read_query_response();
}
int xpand_connection::populate_table_list(LEX_CSTRING *db,
handlerton::discovered_list *result)
{
int error_code = 0;
String stmt;
stmt.append("SHOW FULL TABLES FROM ");
stmt.append(db);
stmt.append(" WHERE table_type = 'BASE TABLE'");
if (mysql_real_query(&xpand_net, stmt.c_ptr(), stmt.length())) {
int error_code = mysql_errno(&xpand_net);
if (error_code == ER_BAD_DB_ERROR)
return 0;
else
return error_code;
}
MYSQL_RES *results = mysql_store_result(&xpand_net);
if (mysql_num_fields(results) != 2) {
error_code = HA_ERR_CORRUPT_EVENT;
goto error;
}
MYSQL_ROW row;
while((row = mysql_fetch_row(results)))
result->add_table(row[0], strlen(row[0]));
error:
mysql_free_result(results);
return error_code;
}
/*
Given a table name, find its OID in the Clustrix, and save it in TABLE_SHARE
@param db Database name
@param name Table name
@param oid OUT Return the OID here
@param share INOUT If not NULL and the share has ha_share pointer, also
update Xpand_share::xpand_table_oid.
@return
0 - OK
error code if an error occurred
*/
int xpand_connection::get_table_oid(const char *db, size_t db_len,
const char *name, size_t name_len,
ulonglong *oid, TABLE_SHARE *share)
{
MYSQL_ROW row;
int error_code = 0;
MYSQL_RES *results_oid = NULL;
String get_oid;
DBUG_ENTER("xpand_connection::get_table_oid");
/* get oid */
get_oid.append("select r.table "
"from system.databases d "
" inner join ""system.relations r on d.db = r.db "
"where d.name = '");
get_oid.append(db, db_len);
get_oid.append("' and r.name = '");
get_oid.append(name, name_len);
get_oid.append("'");
if (mysql_real_query(&xpand_net, get_oid.c_ptr(), get_oid.length())) {
if ((error_code = mysql_errno(&xpand_net))) {
DBUG_PRINT("mysql_real_query returns ", ("%d", error_code));
error_code = HA_ERR_NO_SUCH_TABLE;
goto error;
}
}
results_oid = mysql_store_result(&xpand_net);
DBUG_PRINT("oid results",
("rows: %llu, fields: %u", mysql_num_rows(results_oid),
mysql_num_fields(results_oid)));
if (mysql_num_rows(results_oid) != 1) {
error_code = HA_ERR_NO_SUCH_TABLE;
goto error;
}
if ((row = mysql_fetch_row(results_oid))) {
DBUG_PRINT("row", ("%s", row[0]));
*oid = strtoull((const char *)row[0], NULL, 10);
} else {
error_code = HA_ERR_NO_SUCH_TABLE;
goto error;
}
error:
if (results_oid)
mysql_free_result(results_oid);
DBUG_RETURN(error_code);
}
/*
Given a table name, fetch table definition from Clustrix and fill the TABLE_SHARE
object with details about field, indexes, etc.
*/
int xpand_connection::discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name,
THD *thd, TABLE_SHARE *share)
{
DBUG_ENTER("xpand_connection::discover_table_details");
int error_code = 0;
MYSQL_RES *results_create = NULL;
MYSQL_ROW row;
String show;
ulonglong oid = 0;
Xpand_share *cs;
if ((error_code = xpand_connection::get_table_oid(db->str, db->length,
name->str, name->length,
&oid, share)))
goto error;
if (!share->ha_share)
share->ha_share= new Xpand_share;
cs= static_cast<Xpand_share*>(share->ha_share);
cs->xpand_table_oid = oid;
/* get show create statement */
show.append("show simple create table ");
show.append(db);
show.append(".");
show.append("`");
show.append(name);
show.append("`");
if (mysql_real_query(&xpand_net, show.c_ptr(), show.length())) {
if ((error_code = mysql_errno(&xpand_net))) {
DBUG_PRINT("mysql_real_query returns ", ("%d", error_code));
error_code = HA_ERR_NO_SUCH_TABLE;
goto error;
}
}
results_create = mysql_store_result(&xpand_net);
DBUG_PRINT("show table results",
("rows: %llu, fields: %u", mysql_num_rows(results_create),
mysql_num_fields(results_create)));
if (mysql_num_rows(results_create) != 1) {
error_code = HA_ERR_NO_SUCH_TABLE;
goto error;
}
if (mysql_num_fields(results_create) != 2) {
error_code = HA_ERR_CORRUPT_EVENT;
goto error;
}
while((row = mysql_fetch_row(results_create))) {
DBUG_PRINT("row", ("%s - %s", row[0], row[1]));
error_code = share->init_from_sql_statement_string(thd, false, row[1],
strlen(row[1]));
}
cs->rediscover_table = false;
error:
if (results_create)
mysql_free_result(results_create);
DBUG_RETURN(error_code);
}
#define COMMAND_BUFFER_SIZE_INCREMENT 1024
#define COMMAND_BUFFER_SIZE_INCREMENT_BITS 10
int xpand_connection::expand_command_buffer(size_t add_length)
{
size_t expanded_length;
if (command_buffer_length >= command_length + add_length)
return 0;
expanded_length = command_buffer_length +
((add_length >> COMMAND_BUFFER_SIZE_INCREMENT_BITS)
<< COMMAND_BUFFER_SIZE_INCREMENT_BITS) +
COMMAND_BUFFER_SIZE_INCREMENT;
if (!command_buffer_length)
command_buffer = (uchar *) my_malloc(expanded_length, MYF(MY_WME));
else
command_buffer = (uchar *) my_realloc(command_buffer, expanded_length,
MYF(MY_WME));
if (!command_buffer)
return HA_ERR_OUT_OF_MEM;
command_buffer_length = expanded_length;
return 0;
}
int xpand_connection::add_command_operand_uchar(uchar value)
{
int error_code = expand_command_buffer(sizeof(value));
if (error_code)
return error_code;
memcpy(command_buffer + command_length, &value, sizeof(value));
command_length += sizeof(value);
return 0;
}
int xpand_connection::add_command_operand_ushort(ushort value)
{
ushort be_value = htobe16(value);
int error_code = expand_command_buffer(sizeof(be_value));
if (error_code)
return error_code;
memcpy(command_buffer + command_length, &be_value, sizeof(be_value));
command_length += sizeof(be_value);
return 0;
}
int xpand_connection::add_command_operand_uint(uint value)
{
uint be_value = htobe32(value);
int error_code = expand_command_buffer(sizeof(be_value));
if (error_code)
return error_code;
memcpy(command_buffer + command_length, &be_value, sizeof(be_value));
command_length += sizeof(be_value);
return 0;
}
int xpand_connection::add_command_operand_ulonglong(ulonglong value)
{
ulonglong be_value = htobe64(value);
int error_code = expand_command_buffer(sizeof(be_value));
if (error_code)
return error_code;
memcpy(command_buffer + command_length, &be_value, sizeof(be_value));
command_length += sizeof(be_value);
return 0;
}
int xpand_connection::add_command_operand_lcb(ulonglong value)
{
int len = net_length_size(value);
int error_code = expand_command_buffer(len);
if (error_code)
return error_code;
net_store_length(command_buffer + command_length, value);
command_length += len;
return 0;
}
int xpand_connection::add_command_operand_str(const uchar *str,
size_t str_length)
{
int error_code = add_command_operand_lcb(str_length);
if (error_code)
return error_code;
if (!str_length)
return 0;
error_code = expand_command_buffer(str_length);
if (error_code)
return error_code;
memcpy(command_buffer + command_length, str, str_length);
command_length += str_length;
return 0;
}
/**
* @brief
* Puts variable length string into the buffer.
* @details
* Puts into the buffer variable length string the size
* of which is send by other means. For details see
* MDB Client/Server Protocol.
* @args
* str - string to send
* str_length - size
**/
int xpand_connection::add_command_operand_vlstr(const uchar *str,
size_t str_length)
{
int error_code = expand_command_buffer(str_length);
if (error_code)
return error_code;
memcpy(command_buffer + command_length, str, str_length);
command_length += str_length;
return 0;
}
int xpand_connection::add_command_operand_lex_string(LEX_CSTRING str)
{
return add_command_operand_str((const uchar *)str.str, str.length);
}
int xpand_connection::add_command_operand_bitmap(MY_BITMAP *bitmap)
{
int error_code = add_command_operand_lcb(bitmap->n_bits);
if (error_code)
return error_code;
int no_bytes = no_bytes_in_map(bitmap);
error_code = expand_command_buffer(no_bytes);
if (error_code)
return error_code;
memcpy(command_buffer + command_length, bitmap->bitmap, no_bytes);
command_length += no_bytes;
return 0;
}
/****************************************************************************
** Class xpand_host_list
****************************************************************************/
int xpand_host_list::fill(const char *hosts)
{
strtok_buf = my_strdup(hosts, MYF(MY_WME));
if (!strtok_buf) {
return HA_ERR_OUT_OF_MEM;
}
const char *sep = ",; ";
//parse into array
int i = 0;
char *cursor = NULL;
char *token = NULL;
for (token = strtok_r(strtok_buf, sep, &cursor);
token && i < max_host_count;
token = strtok_r(NULL, sep, &cursor)) {
this->hosts[i] = token;
i++;
}
//host count out of range
if (i == 0 || token) {
my_free(strtok_buf);
return ER_BAD_HOST_ERROR;
}
hosts_len = i;
return 0;
}
void xpand_host_list::empty()
{
my_free(strtok_buf);
strtok_buf = NULL;
hosts_len = 0;
}
/*****************************************************************************
Copyright (c) 2019, 2020, MariaDB Corporation.
*****************************************************************************/
#ifndef _xpand_connection_h
#define _xpand_connection_h
#ifdef USE_PRAGMA_INTERFACE
#pragma interface /* gcc class implementation */
#endif
#define MYSQL_SERVER 1
#include "my_global.h"
#include "m_string.h"
#include "mysql.h"
#include "sql_common.h"
#include "my_base.h"
#include "mysqld_error.h"
#include "my_bitmap.h"
#include "handler.h"
#define XPAND_SERVER_REQUEST 30
enum xpand_lock_mode_t {
XPAND_NO_LOCKS,
XPAND_SHARED,
XPAND_EXCLUSIVE,
};
enum xpand_balance_algorithm_enum {
XPAND_BALANCE_FIRST,
XPAND_BALANCE_ROUND_ROBIN
};
class xpand_connection_cursor;
class xpand_connection
{
private:
MYSQL xpand_net;
uchar *command_buffer;
size_t command_buffer_length;
size_t command_length;
int trans_state;
int trans_flags;
int allocate_cursor(MYSQL *xpand_net, ulong buffer_size,
xpand_connection_cursor **scan);
public:
xpand_connection();
~xpand_connection();
inline bool is_connected()
{
return xpand_net.net.vio;
}
int connect();
int connect_direct(char *host);
void disconnect(bool is_destructor = FALSE);
bool has_open_transaction();
int commit_transaction();
int rollback_transaction();
int begin_transaction_next();
int new_statement_next();
int rollback_statement_next(); // also starts new statement
void auto_commit_next();
void auto_commit_closed();
int run_query(String &stmt);
int write_row(ulonglong xpand_table_oid, uchar *packed_row,
size_t packed_size, ulonglong *last_insert_id);
int key_update(ulonglong xpand_table_oid,
uchar *packed_key, size_t packed_key_length,
MY_BITMAP *update_set,
uchar *packed_new_data, size_t packed_new_length);
int key_delete(ulonglong xpand_table_oid,
uchar *packed_key, size_t packed_key_length);
int key_read(ulonglong xpand_table_oid, uint index,
xpand_lock_mode_t lock_mode, MY_BITMAP *read_set,
uchar *packed_key, ulong packed_key_length, uchar **rowdata,
ulonglong *rowdata_length);
enum sort_order {SORT_NONE = 0, SORT_ASC = 1, SORT_DESC = 2};
enum scan_type {
READ_KEY_OR_NEXT, /* rows with key and greater */
READ_KEY_OR_PREV, /* rows with key and less. */
READ_AFTER_KEY, /* rows with keys greater than key */
READ_BEFORE_KEY, /* rows with keys less than key */
READ_FROM_START, /* rows with forwards from first key. */
READ_FROM_LAST, /* rows with backwards from last key. */
};
int scan_table(ulonglong xpand_table_oid,
xpand_lock_mode_t lock_mode,
MY_BITMAP *read_set, ushort row_req,
xpand_connection_cursor **scan, String* pushdown_cond_sql);
int scan_query(String &stmt, uchar *fieldtype, uint fields, uchar *null_bits,
uint null_bits_size, uchar *field_metadata,
uint field_metadata_size, ushort row_req, ulonglong *oids,
xpand_connection_cursor **scan);
int update_query(String &stmt, LEX_CSTRING &dbname, ulonglong *oids,
ulonglong *affected_rows);
int scan_from_key(ulonglong xpand_table_oid, uint index,
xpand_lock_mode_t lock_mode,
enum scan_type scan_dir, int no_key_cols, bool sorted_scan,
MY_BITMAP *read_set, uchar *packed_key,
ulong packed_key_length, ushort row_req,
xpand_connection_cursor **scan);
int scan_next(xpand_connection_cursor *scan, uchar **rowdata,
ulong *rowdata_length);
int scan_end(xpand_connection_cursor *scan);
int populate_table_list(LEX_CSTRING *db, handlerton::discovered_list *result);
int get_table_oid(const char *db, size_t db_len, const char *name,
size_t name_len, ulonglong *oid, TABLE_SHARE *share);
int discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, THD *thd,
TABLE_SHARE *share);
private:
int expand_command_buffer(size_t add_length);
int add_command_operand_uchar(uchar value);
int add_command_operand_ushort(ushort value);
int add_command_operand_uint(uint value);
int add_command_operand_ulonglong(ulonglong value);
int add_command_operand_lcb(ulonglong value);
int add_command_operand_str(const uchar *str, size_t length);
int add_command_operand_vlstr(const uchar *str, size_t length);
int add_command_operand_lex_string(LEX_CSTRING str);
int add_command_operand_bitmap(MY_BITMAP *bitmap);
int add_status_vars();
int begin_command(uchar command);
int send_command();
int read_query_response();
};
static const int max_host_count = 128;
class xpand_host_list {
private:
char *strtok_buf;
public:
int hosts_len;
char *hosts[max_host_count];
int fill(const char *hosts);
void empty();
};
#endif // _xpand_connection_h
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment