Commit bdaffefd authored by Roman Nozdrin's avatar Roman Nozdrin Committed by Sergei Petrunia

CLX-43 An initial version of fast statement-based UPSERT that handles

both single row/value UPSERT and bulk UPSERT.
parent aca476f5
......@@ -6,18 +6,18 @@ Note 1051 Unknown table 'db1.t1'
CREATE TABLE `t1`(i BIGINT, t TEXT)ENGINE=clustrixdb;
INSERT INTO `t1` (i, t) VALUES (42, 'один');
INSERT INTO `t1` (i, t) VALUES (42, 'ноль');
SELECT * FROM `t1`;
SELECT * FROM `t1` ORDER BY `i`;
i t
42 один
42 ноль
UPDATE `t1` SET i=i+1,t='два' WHERE t='один';
SELECT * FROM `t1`;
SELECT * FROM `t1` ORDER BY `i`;
i t
42 один
42 ноль
USE test;
UPDATE `db1`.`t1` SET i=i+1,t='три' WHERE t='два';
SELECT * FROM `db1`.`t1`;
SELECT * FROM `db1`.`t1` ORDER BY `i`;
i t
42 один
42 ноль
......
......@@ -4,14 +4,14 @@ DROP TABLE IF EXISTS `t1`;
CREATE TABLE `t1`(i BIGINT, t TEXT)ENGINE=clustrixdb;
INSERT INTO `t1` (i, t) VALUES (42, 'один');
INSERT INTO `t1` (i, t) VALUES (42, 'ноль');
SELECT * FROM `t1`;
SELECT * FROM `t1` ORDER BY `i`;
UPDATE `t1` SET i=i+1,t='два' WHERE t='один';
SELECT * FROM `t1`;
SELECT * FROM `t1` ORDER BY `i`;
USE test;
UPDATE `db1`.`t1` SET i=i+1,t='три' WHERE t='два';
SELECT * FROM `db1`.`t1`;
SELECT * FROM `db1`.`t1` ORDER BY `i`;
DROP TABLE `db1`.`t1`;
......
CREATE DATABASE IF NOT EXISTS `db1`;
USE `db1`;
DROP TABLE IF EXISTS `ins_duplicate`;
Warnings:
Note 1051 Unknown table 'db1.ins_duplicate'
CREATE TABLE `ins_duplicate`(`id` INT PRIMARY KEY, `animal` VARCHAR(30)) ENGINE=clustrixdb;
INSERT INTO `ins_duplicate` VALUES (1,'Aardvark'), (2,'Cheetah'), (3,'Zebra');
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 Aardvark
2 Cheetah
3 Zebra
INSERT INTO ins_duplicate VALUES (1,'Antelope');
ERROR 23000: Clustrix error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1)
INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Banana';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 Banana
2 Cheetah
3 Zebra
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
ERROR 23000: Clustrix error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1)
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 hybrid
2 hybrid
3 Zebra
BEGIN;
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 hybrid
2 hybrid
3 Zebra
INSERT INTO ins_duplicate VALUES (1,'Antelope');
ERROR 23000: Clustrix error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1)
INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 Vegetable
2 hybrid
3 Zebra
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
ERROR 23000: Clustrix error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1)
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid2';
COMMIT;
BEGIN;
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 hybrid2
2 hybrid2
3 Zebra
INSERT INTO ins_duplicate VALUES (1,'Antelope');
ERROR 23000: Clustrix error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1)
INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 Vegetable
2 hybrid2
3 Zebra
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
ERROR 23000: Clustrix error: [5120] Duplicate key in container: `db1`.`ins_duplicate` Primary key: (1)
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid3';
ROLLBACK;
SELECT * FROM `ins_duplicate` ORDER BY `id`;
id animal
1 hybrid2
2 hybrid2
3 Zebra
DROP TABLE `db1`.`ins_duplicate`;
USE test;
DROP DATABASE `db1`;
CREATE DATABASE IF NOT EXISTS `db1`;
USE `db1`;
DROP TABLE IF EXISTS `ins_duplicate`;
CREATE TABLE `ins_duplicate`(`id` INT PRIMARY KEY, `animal` VARCHAR(30)) ENGINE=clustrixdb;
INSERT INTO `ins_duplicate` VALUES (1,'Aardvark'), (2,'Cheetah'), (3,'Zebra');
SELECT * FROM `ins_duplicate` ORDER BY `id`;
--error ER_DUP_ENTRY
INSERT INTO ins_duplicate VALUES (1,'Antelope');
INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Banana';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
--error ER_DUP_ENTRY
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
BEGIN;
SELECT * FROM `ins_duplicate` ORDER BY `id`;
--error ER_DUP_ENTRY
INSERT INTO ins_duplicate VALUES (1,'Antelope');
INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
--error ER_DUP_ENTRY
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid2';
COMMIT;
BEGIN;
SELECT * FROM `ins_duplicate` ORDER BY `id`;
--error ER_DUP_ENTRY
INSERT INTO ins_duplicate VALUES (1,'Antelope');
INSERT INTO ins_duplicate VALUES (1,'Antelope') ON DUPLICATE KEY UPDATE animal='Vegetable';
SELECT * FROM `ins_duplicate` ORDER BY `id`;
--error ER_DUP_ENTRY
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah');
INSERT INTO ins_duplicate VALUES (1,'Antelope'),(2,'Cheetah') ON DUPLICATE KEY UPDATE animal='hybrid3';
ROLLBACK;
SELECT * FROM `ins_duplicate` ORDER BY `id`;
DROP TABLE `db1`.`ins_duplicate`;
USE test;
DROP DATABASE `db1`;
......@@ -21,6 +21,12 @@ Copyright (c) 2019, MariaDB Corporation.
#define CLUSTRIX_SERVER_REQUEST 30
typedef enum clustrix_upsert_flags {
CLUSTRIX_HAS_UPSERT= 1,
CLUSTRIX_BULK_UPSERT= 2,
CLUSTRIX_UPSERT_SENT= 4
} clx_upsert_flags_t;
class clustrix_connection_cursor;
class clustrix_connection
{
......@@ -37,6 +43,7 @@ class clustrix_connection
size_t reply_length;
bool has_transaction;
uint upsert_flag;
bool has_anonymous_savepoint;
int commit_flag_next;
......@@ -48,6 +55,7 @@ class clustrix_connection
memset(&clustrix_net, 0, sizeof(MYSQL));
has_anonymous_savepoint = FALSE;
has_transaction = FALSE;
upsert_flag = 0;
commit_flag_next = 0;
DBUG_VOID_RETURN;
}
......@@ -82,6 +90,21 @@ class clustrix_connection
return has_transaction;
}
inline void unset_upsert(clx_upsert_flags_t state)
{
upsert_flag &= ~state;
}
inline void set_upsert(clx_upsert_flags_t state)
{
upsert_flag |= state;
}
inline bool check_upsert(clx_upsert_flags_t state)
{
return upsert_flag & state;
}
bool set_anonymous_savepoint();
bool release_anonymous_savepoint();
bool rollback_to_anonymous_savepoint();
......
......@@ -417,6 +417,33 @@ int ha_clustrixdb::reset()
return 0;
}
int ha_clustrixdb::extra(enum ha_extra_function operation)
{
DBUG_ENTER("ha_clustrixdb::extra");
int error_code = 0;
THD *thd = ha_thd();
clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx)
return error_code;
if (operation == HA_EXTRA_INSERT_WITH_UPDATE) {
trx->set_upsert(CLUSTRIX_HAS_UPSERT);
}
DBUG_RETURN(0);
}
/*@brief UPSERT State Machine*/
/*************************************************************
* DESCRIPTION:
* Fasttrack for UPSERT sends queries down to a CLX backend.
* UPSERT could be of two kinds: singular and bulk. The plugin
* re-/sets CLUSTRIX_BULK_UPSERT in end|start_bulk_insert
* methods. CLUSTRIX_UPSERT_SENT is used to avoid multiple
* execution at CLX backend.
* Generic CLUSTRIX_HAS_UPSERT is set for bulk UPSERT only b/c
* MDB calls write_row only once.
************************************************************/
int ha_clustrixdb::write_row(const uchar *buf)
{
int error_code = 0;
......@@ -425,6 +452,26 @@ int ha_clustrixdb::write_row(const uchar *buf)
if (!trx)
return error_code;
if (trx->check_upsert(CLUSTRIX_HAS_UPSERT)) {
if (!trx->check_upsert(CLUSTRIX_UPSERT_SENT)) {
ha_rows update_rows;
String update_stmt;
update_stmt.append(thd->query_string.str());
if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trx->auto_commit_next();
error_code= trx->update_query(update_stmt, table->s->db, &update_rows);
thd->get_stmt_da()->set_overwrite_status(true);
if (trx->check_upsert(CLUSTRIX_BULK_UPSERT))
trx->set_upsert(CLUSTRIX_UPSERT_SENT);
else
trx->unset_upsert(CLUSTRIX_HAS_UPSERT);
}
return error_code;
}
/* Convert the row format to binlog (packed) format */
uchar *packed_new_row = (uchar*) my_alloca(estimate_row_size(table));
size_t packed_size = pack_row(table, table->write_set, packed_new_row, buf);
......@@ -509,6 +556,39 @@ int ha_clustrixdb::direct_update_rows(ha_rows *update_rows)
DBUG_RETURN(error_code);
}
void ha_clustrixdb::start_bulk_insert(ha_rows rows, uint flags)
{
DBUG_ENTER("ha_clustrixdb::start_bulk_insert");
int error_code= 0;
THD *thd= ha_thd();
clustrix_connection *trx= get_trx(thd, &error_code);
if (!trx) {
// TBD log this
DBUG_VOID_RETURN;
}
trx->set_upsert(CLUSTRIX_BULK_UPSERT);
DBUG_VOID_RETURN;
}
int ha_clustrixdb::end_bulk_insert()
{
DBUG_ENTER("ha_clustrixdb::end_bulk_insert");
int error_code= 0;
THD *thd= ha_thd();
clustrix_connection *trx= get_trx(thd, &error_code);
if (!trx) {
DBUG_RETURN(error_code);
}
trx->unset_upsert(CLUSTRIX_BULK_UPSERT);
trx->unset_upsert(CLUSTRIX_HAS_UPSERT);
trx->unset_upsert(CLUSTRIX_UPSERT_SENT);
DBUG_RETURN(0);
}
int ha_clustrixdb::delete_row(const uchar *buf)
{
int error_code;
......@@ -942,6 +1022,7 @@ THR_LOCK_DATA **ha_clustrixdb::store_lock(THD *thd,
int ha_clustrixdb::external_lock(THD *thd, int lock_type)
{
DBUG_ENTER("ha_clustrixdb::external_lock()");
int error_code;
clustrix_connection *trx = get_trx(thd, &error_code);
if (lock_type != F_UNLCK) {
......@@ -956,7 +1037,7 @@ int ha_clustrixdb::external_lock(THD *thd, int lock_type)
}
}
return 0;
DBUG_RETURN(0);
}
/****************************************************************************
......
......@@ -57,6 +57,8 @@ class ha_clustrixdb : public handler
int delete_row(const uchar *buf);
int direct_update_rows_init(List<Item> *update_fields);
int direct_update_rows(ha_rows *update_rows);
void start_bulk_insert(ha_rows rows, uint flags = 0);
int end_bulk_insert();
Table_flags table_flags(void) const;
ulong index_flags(uint idx, uint part, bool all_parts) const;
......@@ -67,6 +69,7 @@ class ha_clustrixdb : public handler
key_range *max_key);
int info(uint flag); // see my_base.h for full description
int extra(enum ha_extra_function operation);
// multi_read_range
// read_range
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment