Commit 4252df83 authored by Sergei Petrunia's avatar Sergei Petrunia Committed by Sergei Petrunia

CLX-80: ALTER TABLE t ENGINE=CLUSTRIX fails with an error

Fix a number of issues that were preventing row-copying ALTER TABLE
from working:

- ha_xpand::create() and ha_xpand::open() should take the table name
  from the arguments, not from TABLE_SHARE object.

- ha_xpand::rename_table() should not use decode_file_path() as that
  fails with an error for temporary table names (#sql_nnnn)

- Maintenance of clustrix_table_oid through TABLE_SHARE::tabledef_version
  didn't cover the temporary work tables created by ALTER TABLE code.
  Switch to storing it in the Xpand_share instead, and update it in
  ::create() and ::open.

- On as-needed-basis, add quoting of table names to pieces of code that
  build SQL statements to be ran on the backend. This is a stop-gap
  measure until a real solution is implemented.
parent d882f13b
CREATE DATABASE xpd; CREATE DATABASE xpd;
USE xpd; USE xpd;
DROP TABLE IF EXISTS cx1; DROP TABLE IF EXISTS cx1, t1, t2;
Warnings:
Note 1051 Unknown table 'xpd.cx1'
CREATE TABLE cx1(i BIGINT)ENGINE=xpand; CREATE TABLE cx1(i BIGINT)ENGINE=xpand;
CREATE TABLE cx1(i BIGINT)ENGINE=xpand; CREATE TABLE cx1(i BIGINT)ENGINE=xpand;
ERROR 42S01: Table 'cx1' already exists ERROR 42S01: Table 'cx1' already exists
...@@ -80,5 +78,30 @@ a b ...@@ -80,5 +78,30 @@ a b
1 1 1 1
2 10002 2 10002
3 3 3 3
drop table t1;
#
# CLX-80: ALTER TABLE t ENGINE=CLUSTRIX fails with an error
#
create table t1 (a int) engine=myisam;
insert into t1 values (1),(2),(3);
alter table t1 engine=xpand;
select * from t1;
a
1
2
3
show create table t1;
Table Create Table
t1 CREATE TABLE `t1` (
`a` int(11) DEFAULT NULL
) ENGINE=XPAND DEFAULT CHARSET=utf8
# Try a RENAME TABLE too since the patch touches the code
alter table t1 rename t2;
show create table t2;
Table Create Table
t2 CREATE TABLE `t2` (
`a` int(11) DEFAULT NULL
) ENGINE=XPAND DEFAULT CHARSET=utf8
drop table t2;
USE test; USE test;
DROP DATABASE xpd; DROP DATABASE xpd;
CREATE DATABASE xpd; CREATE DATABASE xpd;
USE xpd; USE xpd;
DROP TABLE IF EXISTS cx1; --disable_warnings
DROP TABLE IF EXISTS cx1, t1, t2;
--enable_warnings
CREATE TABLE cx1(i BIGINT)ENGINE=xpand; CREATE TABLE cx1(i BIGINT)ENGINE=xpand;
--error ER_TABLE_EXISTS_ERROR --error ER_TABLE_EXISTS_ERROR
CREATE TABLE cx1(i BIGINT)ENGINE=xpand; CREATE TABLE cx1(i BIGINT)ENGINE=xpand;
...@@ -75,5 +78,23 @@ prepare s from 'update t1 set b=b+? where a=?'; ...@@ -75,5 +78,23 @@ prepare s from 'update t1 set b=b+? where a=?';
execute s using 10000, 2; execute s using 10000, 2;
--sorted_result --sorted_result
select * from t1; select * from t1;
drop table t1;
--echo #
--echo # CLX-80: ALTER TABLE t ENGINE=CLUSTRIX fails with an error
--echo #
create table t1 (a int) engine=myisam;
insert into t1 values (1),(2),(3);
alter table t1 engine=xpand;
--sorted_result
select * from t1;
show create table t1;
--echo # Try a RENAME TABLE too since the patch touches the code
alter table t1 rename t2;
show create table t2;
drop table t2;
USE test; USE test;
DROP DATABASE xpd; DROP DATABASE xpd;
...@@ -7,6 +7,7 @@ Copyright (c) 2019, MariaDB Corporation. ...@@ -7,6 +7,7 @@ Copyright (c) 2019, MariaDB Corporation.
#include "ha_xpand.h" #include "ha_xpand.h"
#include "ha_xpand_pushdown.h" #include "ha_xpand_pushdown.h"
#include "key.h" #include "key.h"
#include <strfunc.h> /* strconvert */
handlerton *xpand_hton = NULL; handlerton *xpand_hton = NULL;
...@@ -195,6 +196,33 @@ uint row_buffer_setting(THD* thd) ...@@ -195,6 +196,33 @@ uint row_buffer_setting(THD* thd)
return THDVAR(thd, row_buffer); return THDVAR(thd, row_buffer);
} }
/*
Get an Xpand_share object for this object. If it doesn't yet exist, create
it.
*/
Xpand_share *ha_xpand::get_share()
{
Xpand_share *tmp_share;
DBUG_ENTER("ha_xpand::get_share()");
lock_shared_ha_data();
if (!(tmp_share= static_cast<Xpand_share*>(get_ha_share_ptr())))
{
tmp_share= new Xpand_share;
if (!tmp_share)
goto err;
set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
}
err:
unlock_shared_ha_data();
DBUG_RETURN(tmp_share);
}
/**************************************************************************** /****************************************************************************
** Utility functions ** Utility functions
****************************************************************************/ ****************************************************************************/
...@@ -211,42 +239,71 @@ size_t estimate_row_size(TABLE *table) ...@@ -211,42 +239,71 @@ size_t estimate_row_size(TABLE *table)
return row_size; return row_size;
} }
/**
* @brief /*
* Decodes object name. Try to decode a string from filename encoding, if that fails, return the
* original string.
* @details
* Replaces the encoded object name in the path with a decoded variant, @detail
* e.g if path contains ./test/d@0024. This f() makes it ./test/d$ This is used to get table (or database) name from file (or directory)
* name. Names of regular tables/databases are encoded using
* Used in delete and rename DDL processing. my_charset_filename encoding.
**/ Names of temporary tables are not encoded, and they start with '#sql'
static void decode_objectname(char *buf, const char *path, size_t buf_size) which is not a valid character sequence in my_charset_filename encoding.
{ Our way to talkle this is to
size_t new_path_len = filename_to_tablename(path, buf, buf_size); 1. Try to convert the name back
buf[new_path_len] = '\0'; 2. If that failed, assume it's a temporary object name and just use the
name.
*/
static void decode_object_or_tmp_name(const char *from, uint size,
std::string *out)
{
uint errors, new_size;
out->resize(size+1); // assume the decoded string is not longer
new_size= strconvert(&my_charset_filename, from, size,
system_charset_info, (char*)out->c_str(), size+1,
&errors);
if (errors)
out->assign(from, size);
else
out->resize(new_size);
} }
static void decode_file_path(const char *path, char *decoded_dbname, /*
char *decoded_tbname) Take a "./db_name/table_name" and extract db_name and table_name from it
@return
0 OK
other Error code
*/
static int normalize_tablename(const char *db_table,
std::string *norm_db, std::string *norm_table)
{ {
// The format cont ains './' in the beginning of a path. std::string tablename(db_table);
char *dbname_start = (char*) path + 2; if (tablename.size() < 2 || tablename[0] != '.' ||
char *dbname_end = dbname_start; (tablename[1] != FN_LIBCHAR && tablename[1] != FN_LIBCHAR2)) {
while (*dbname_end != '/') DBUG_ASSERT(0); // We were not passed table name?
dbname_end++; return HA_ERR_INTERNAL_ERROR;
}
int cnt = dbname_end - dbname_start; size_t pos = tablename.find_first_of(FN_LIBCHAR, 2);
char *dbname = (char *)my_alloca(cnt + 1); if (pos == std::string::npos) {
memcpy(dbname, dbname_start, cnt); pos = tablename.find_first_of(FN_LIBCHAR2, 2);
dbname[cnt] = '\0'; }
decode_objectname(decoded_dbname, dbname, FN_REFLEN);
my_afree(dbname); if (pos == std::string::npos) {
DBUG_ASSERT(0); // We were not passed table name?
return HA_ERR_INTERNAL_ERROR;
}
char *tbname_start = dbname_end + 1; decode_object_or_tmp_name(tablename.c_str() + 2, pos - 2, norm_db);
decode_objectname(decoded_tbname, tbname_start, FN_REFLEN); decode_object_or_tmp_name(tablename.c_str() + pos + 1,
tablename.size() - (pos + 1), norm_table);
return 0;
} }
xpand_connection *get_trx(THD *thd, int *error_code) xpand_connection *get_trx(THD *thd, int *error_code)
{ {
*error_code = 0; *error_code = 0;
...@@ -290,6 +347,7 @@ ha_xpand::~ha_xpand() ...@@ -290,6 +347,7 @@ ha_xpand::~ha_xpand()
remove_current_table_from_rpl_table_list(rgi); remove_current_table_from_rpl_table_list(rgi);
} }
int ha_xpand::create(const char *name, TABLE *form, HA_CREATE_INFO *info) int ha_xpand::create(const char *name, TABLE *form, HA_CREATE_INFO *info)
{ {
int error_code; int error_code;
...@@ -311,12 +369,16 @@ int ha_xpand::create(const char *name, TABLE *form, HA_CREATE_INFO *info) ...@@ -311,12 +369,16 @@ int ha_xpand::create(const char *name, TABLE *form, HA_CREATE_INFO *info)
ulong old = create_info->used_fields; ulong old = create_info->used_fields;
create_info->used_fields &= ~HA_CREATE_USED_ENGINE; create_info->used_fields &= ~HA_CREATE_USED_ENGINE;
std::string norm_db, norm_table;
if ((error_code= normalize_tablename(name, &norm_db, &norm_table)))
return error_code;
TABLE_LIST table_list; TABLE_LIST table_list;
memset(&table_list, 0, sizeof(table_list)); memset(&table_list, 0, sizeof(table_list));
table_list.table = form; table_list.table = form;
error_code = show_create_table(thd, &table_list, &create_table_stmt, error_code = show_create_table_ex(thd, &table_list,
create_info, WITH_DB_NAME); norm_db.c_str(), norm_table.c_str(),
&create_table_stmt, create_info, WITH_DB_NAME);
if (!is_tmp_table) if (!is_tmp_table)
form->s->tmp_table = saved_tmp_table_type; form->s->tmp_table = saved_tmp_table_type;
create_info->used_fields = old; create_info->used_fields = old;
...@@ -334,6 +396,13 @@ int ha_xpand::create(const char *name, TABLE *form, HA_CREATE_INFO *info) ...@@ -334,6 +396,13 @@ int ha_xpand::create(const char *name, TABLE *form, HA_CREATE_INFO *info)
} }
error_code = trx->run_query(create_table_stmt); error_code = trx->run_query(create_table_stmt);
if (error_code)
return error_code;
// Load the oid of the created table
error_code= trx->get_table_oid(norm_db, norm_table, &xpand_table_oid,
table_share);
return error_code; return error_code;
} }
...@@ -345,15 +414,17 @@ int ha_xpand::delete_table(const char *path) ...@@ -345,15 +414,17 @@ int ha_xpand::delete_table(const char *path)
if (!trx) if (!trx)
return error_code; return error_code;
char decoded_dbname[FN_REFLEN]; std::string decoded_dbname;
char decoded_tbname[FN_REFLEN]; std::string decoded_tbname;
decode_file_path(path, decoded_dbname, decoded_tbname); if ((error_code= normalize_tablename(path, &decoded_dbname,
&decoded_tbname)))
return error_code;
String delete_cmd; String delete_cmd;
delete_cmd.append("DROP TABLE `"); delete_cmd.append("DROP TABLE `");
delete_cmd.append(decoded_dbname); delete_cmd.append(decoded_dbname.c_str());
delete_cmd.append("`.`"); delete_cmd.append("`.`");
delete_cmd.append(decoded_tbname); delete_cmd.append(decoded_tbname.c_str());
delete_cmd.append("`"); delete_cmd.append("`");
return trx->run_query(delete_cmd); return trx->run_query(delete_cmd);
...@@ -367,23 +438,27 @@ int ha_xpand::rename_table(const char* from, const char* to) ...@@ -367,23 +438,27 @@ int ha_xpand::rename_table(const char* from, const char* to)
if (!trx) if (!trx)
return error_code; return error_code;
char decoded_from_dbname[FN_REFLEN]; std::string decoded_from_dbname;
char decoded_from_tbname[FN_REFLEN]; std::string decoded_from_tbname;
decode_file_path(from, decoded_from_dbname, decoded_from_tbname); if ((error_code= normalize_tablename(from, &decoded_from_dbname,
&decoded_from_tbname)))
return error_code;
char decoded_to_dbname[FN_REFLEN]; std::string decoded_to_dbname;
char decoded_to_tbname[FN_REFLEN]; std::string decoded_to_tbname;
decode_file_path(to, decoded_to_dbname, decoded_to_tbname); if ((error_code= normalize_tablename(to, &decoded_to_dbname,
&decoded_to_tbname)))
return error_code;
String rename_cmd; String rename_cmd;
rename_cmd.append("RENAME TABLE `"); rename_cmd.append("RENAME TABLE `");
rename_cmd.append(decoded_from_dbname); rename_cmd.append(decoded_from_dbname.c_str());
rename_cmd.append("`.`"); rename_cmd.append("`.`");
rename_cmd.append(decoded_from_tbname); rename_cmd.append(decoded_from_tbname.c_str());
rename_cmd.append("` TO `"); rename_cmd.append("` TO `");
rename_cmd.append(decoded_to_dbname); rename_cmd.append(decoded_to_dbname.c_str());
rename_cmd.append("`.`"); rename_cmd.append("`.`");
rename_cmd.append(decoded_to_tbname); rename_cmd.append(decoded_to_tbname.c_str());
rename_cmd.append("`;"); rename_cmd.append("`;");
return trx->run_query(rename_cmd); return trx->run_query(rename_cmd);
...@@ -392,21 +467,37 @@ int ha_xpand::rename_table(const char* from, const char* to) ...@@ -392,21 +467,37 @@ int ha_xpand::rename_table(const char* from, const char* to)
static void static void
xpand_mark_table_for_discovery(TABLE *table) xpand_mark_table_for_discovery(TABLE *table)
{ {
table->s->tabledef_version.str = NULL;
table->s->tabledef_version.length = 0;
table->m_needs_reopen = TRUE; table->m_needs_reopen = TRUE;
Xpand_share *xs;
if ((xs= (Xpand_share*)table->s->ha_share))
xs->xpand_table_oid= 0;
} }
int ha_xpand::open(const char *name, int mode, uint test_if_locked) int ha_xpand::open(const char *name, int mode, uint test_if_locked)
{ {
THD *thd= ha_thd();
DBUG_ENTER("ha_xpand::open"); DBUG_ENTER("ha_xpand::open");
DBUG_PRINT("oid",
("%s", table->s->tabledef_version.str));
if (!table->s->tabledef_version.str) if (!(share = get_share()))
DBUG_RETURN(HA_ERR_TABLE_DEF_CHANGED); DBUG_RETURN(1);
if (!xpand_table_oid) int error_code;
xpand_table_oid = atoll((const char *)table->s->tabledef_version.str); xpand_connection *trx = get_trx(thd, &error_code);
if (!trx)
DBUG_RETURN(error_code);
if (!share->xpand_table_oid) {
// We may end up with two threads executing this piece concurrently but
// it's ok
std::string norm_table;
std::string norm_db;
if ((error_code= normalize_tablename(name, &norm_db, &norm_table)))
DBUG_RETURN(error_code);
error_code= trx->get_table_oid(norm_db, norm_table, &xpand_table_oid,
table_share);
if (error_code)
DBUG_RETURN(error_code);
}
// Surrogate key marker // Surrogate key marker
has_hidden_key = table->s->primary_key == MAX_KEY; has_hidden_key = table->s->primary_key == MAX_KEY;
...@@ -1298,6 +1389,7 @@ int xpand_discover_table(handlerton *hton, THD *thd, TABLE_SHARE *share) ...@@ -1298,6 +1389,7 @@ int xpand_discover_table(handlerton *hton, THD *thd, TABLE_SHARE *share)
static int xpand_init(void *p) static int xpand_init(void *p)
{ {
DBUG_ENTER("xpand_init"); DBUG_ENTER("xpand_init");
xpand_hton = (handlerton *) p; xpand_hton = (handlerton *) p;
xpand_hton->flags = HTON_NO_FLAGS; xpand_hton->flags = HTON_NO_FLAGS;
xpand_hton->panic = xpand_panic; xpand_hton->panic = xpand_panic;
......
...@@ -30,9 +30,18 @@ int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data, ...@@ -30,9 +30,18 @@ int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data,
uchar const *const row_data, MY_BITMAP const *cols, uchar const *const row_data, MY_BITMAP const *cols,
uchar const *const row_end); uchar const *const row_end);
class Xpand_share : public Handler_share {
public:
Xpand_share(): xpand_table_oid(0) {}
std::atomic<ulonglong> xpand_table_oid;
};
class ha_xpand : public handler class ha_xpand : public handler
{ {
private: private:
// TODO: do we need this here or one in share would be sufficient?
ulonglong xpand_table_oid; ulonglong xpand_table_oid;
rpl_group_info *rgi; rpl_group_info *rgi;
...@@ -56,6 +65,9 @@ class ha_xpand : public handler ...@@ -56,6 +65,9 @@ class ha_xpand : public handler
} xpd_upsert_flags_t; } xpd_upsert_flags_t;
int upsert_flag; int upsert_flag;
Xpand_share *share;
Xpand_share *get_share(); ///< Get the share
public: public:
ha_xpand(handlerton *hton, TABLE_SHARE *table_arg); ha_xpand(handlerton *hton, TABLE_SHARE *table_arg);
~ha_xpand(); ~ha_xpand();
......
...@@ -5,6 +5,7 @@ Copyright (c) 2019, MariaDB Corporation. ...@@ -5,6 +5,7 @@ Copyright (c) 2019, MariaDB Corporation.
/** @file xpand_connection.cc */ /** @file xpand_connection.cc */
#include "xpand_connection.h" #include "xpand_connection.h"
#include "ha_xpand.h"
#include <string> #include <string>
#include "handler.h" #include "handler.h"
#include "table.h" #include "table.h"
...@@ -988,24 +989,40 @@ int xpand_connection::populate_table_list(LEX_CSTRING *db, ...@@ -988,24 +989,40 @@ int xpand_connection::populate_table_list(LEX_CSTRING *db,
return error_code; return error_code;
} }
int xpand_connection::discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name,
THD *thd, TABLE_SHARE *share) /*
Given a table name, find its OID in the Clustrix, and save it in TABLE_SHARE
@param db Database name
@param name Table name
@param oid OUT Return the OID here
@param share INOUT If not NULL and the share has ha_share pointer, also
update Xpand_share::xpand_table_oid.
@return
0 - OK
error code if an error occurred
*/
int xpand_connection::get_table_oid(const std::string &db,
const std::string &name,
ulonglong *oid,
TABLE_SHARE *share)
{ {
DBUG_ENTER("xpand_connection::discover_table_details"); MYSQL_ROW row;
int error_code = 0; int error_code = 0;
MYSQL_RES *results_oid = NULL; MYSQL_RES *results_oid = NULL;
MYSQL_RES *results_create = NULL; String get_oid;
MYSQL_ROW row; DBUG_ENTER("xpand_connection::get_table_oid");
String get_oid, show;
/* get oid */ /* get oid */
get_oid.append("select r.table " get_oid.append("select r.table "
"from system.databases d " "from system.databases d "
" inner join ""system.relations r on d.db = r.db " " inner join ""system.relations r on d.db = r.db "
"where d.name = '"); "where d.name = '");
get_oid.append(db); get_oid.append(db.c_str());
get_oid.append("' and r.name = '"); get_oid.append("' and r.name = '");
get_oid.append(name); get_oid.append(name.c_str());
get_oid.append("'"); get_oid.append("'");
if (mysql_real_query(&xpand_net, get_oid.c_ptr(), get_oid.length())) { if (mysql_real_query(&xpand_net, get_oid.c_ptr(), get_oid.length())) {
...@@ -1026,24 +1043,47 @@ int xpand_connection::discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, ...@@ -1026,24 +1043,47 @@ int xpand_connection::discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name,
goto error; goto error;
} }
while((row = mysql_fetch_row(results_oid))) { if ((row = mysql_fetch_row(results_oid))) {
DBUG_PRINT("row", ("%s", row[0])); DBUG_PRINT("row", ("%s", row[0]));
uchar *to = (uchar*)alloc_root(&share->mem_root, strlen(row[0]) + 1);
if (!to) { *oid = strtoull((const char *)row[0], NULL, 10);
error_code = HA_ERR_OUT_OF_MEM; if (share->ha_share) {
Xpand_share *cs= (Xpand_share*)share->ha_share;
cs->xpand_table_oid = *oid;
}
} else {
error_code = HA_ERR_NO_SUCH_TABLE;
goto error; goto error;
} }
strcpy((char *)to, (char *)row[0]); error:
share->tabledef_version.str = to; if (results_oid)
share->tabledef_version.length = strlen(row[0]); mysql_free_result(results_oid);
}
DBUG_RETURN(error_code);
}
/*
Given a table name, fetch table definition from Clustrix and fill the TABLE_SHARE
object with details about field, indexes, etc.
*/
int xpand_connection::discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name,
THD *thd, TABLE_SHARE *share)
{
DBUG_ENTER("xpand_connection::discover_table_details");
int error_code = 0;
MYSQL_RES *results_create = NULL;
MYSQL_ROW row;
String show;
/* get show create statement */ /* get show create statement */
show.append("show simple create table "); show.append("show simple create table ");
show.append(db); show.append(db);
show.append("."); show.append(".");
show.append("`");
show.append(name); show.append(name);
show.append("`");
if (mysql_real_query(&xpand_net, show.c_ptr(), show.length())) { if (mysql_real_query(&xpand_net, show.c_ptr(), show.length())) {
if ((error_code = mysql_errno(&xpand_net))) { if ((error_code = mysql_errno(&xpand_net))) {
DBUG_PRINT("mysql_real_query returns ", ("%d", error_code)); DBUG_PRINT("mysql_real_query returns ", ("%d", error_code));
...@@ -1074,9 +1114,6 @@ int xpand_connection::discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, ...@@ -1074,9 +1114,6 @@ int xpand_connection::discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name,
} }
error: error:
if (results_oid)
mysql_free_result(results_oid);
if (results_create) if (results_create)
mysql_free_result(results_create); mysql_free_result(results_create);
DBUG_RETURN(error_code); DBUG_RETURN(error_code);
......
...@@ -103,6 +103,8 @@ class xpand_connection ...@@ -103,6 +103,8 @@ class xpand_connection
int scan_end(xpand_connection_cursor *scan); int scan_end(xpand_connection_cursor *scan);
int populate_table_list(LEX_CSTRING *db, handlerton::discovered_list *result); int populate_table_list(LEX_CSTRING *db, handlerton::discovered_list *result);
int get_table_oid(const std::string& db, const std::string &name,
ulonglong *oid, TABLE_SHARE *share);
int discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, THD *thd, int discover_table_details(LEX_CSTRING *db, LEX_CSTRING *name, THD *thd,
TABLE_SHARE *share); TABLE_SHARE *share);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment