Commit fedd0901 authored by Michael Widenius's avatar Michael Widenius

Applied patch for lp:585688 "maridb crashes in federatedx code" from lp:~atcurtis/maria/federatedx:

- Fixed Partition engine to store CONNECTION string for partitions.
  Removed HA_NO_PARTITION flag from FederatedX.
  Added test 'federated_partition' to suite.
- lp:#585688 - maridb crashes in federatedx code
  FederatedX handler instances, created on one thread and used on
  another thread (via table cache) when "show table status" is executed
  crashed because txn member was not initialized for current thread.
  Added test 'federated_bug_585688' to suite.

Author for the patch is Antony Curtis

mysql-test/suite/federated/federated_bug_585688.result:
  Test for lp:585688
mysql-test/suite/federated/federated_bug_585688.test:
  Test for lp:585688
mysql-test/suite/federated/federated_partition-slave.opt:
  Test for partition support in federatedx
mysql-test/suite/federated/federated_partition.result:
  Test for partition support in federatedx
mysql-test/suite/federated/federated_partition.test:
  Test for partition support in federatedx
mysql-test/t/partition_federated.test:
  Updated error message
sql/ha_partition.cc:
  Added support for connection strings to partitions for federatedx
sql/ha_partition.h:
  Added support for connection strings to partitions for federatedx
sql/partition_element.h:
  Added support for connection strings to partitions for federatedx
sql/sql_yacc.yy:
  Added support for connection strings to partitions for federatedx
storage/federatedx/ha_federatedx.cc:
  Added support for partitions.
  FederatedX handler instances, created on one thread and used on another thread (via table cache) when "show table status"
  is executed crashed because txn member was not initialized for current thread.
parent 4f6e375b
CREATE DATABASE federated;
CREATE DATABASE federated;
#
# Bug #585688: maridb crashes in federatedx code
#
CREATE TABLE federated.t1(a TEXT);
INSERT INTO federated.t1 VALUES('abc'), ('gh'), ('f'), ('ijk'), ('de');
flush tables;
CREATE TABLE federated.t1(a TEXT) ENGINE=FEDERATED
CONNECTION='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1';
flush tables;
describe federated.t1;
Field Type Null Key Default Extra
a text YES NULL
show table status from federated;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 FEDERATED 10 Dynamic 5 20 X X X X X X X X latin1_swedish_ci NULL
show table status from federated;
Name Engine Version Row_format Rows Avg_row_length Data_length Max_data_length Index_length Data_free Auto_increment Create_time Update_time Check_time Collation Checksum Create_options Comment
t1 FEDERATED 10 Dynamic 5 20 X X X X X X X X latin1_swedish_ci NULL
DROP TABLE federated.t1;
DROP TABLE federated.t1;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
source federated.inc;
--echo #
--echo # Bug #585688: maridb crashes in federatedx code
--echo #
connection slave;
CREATE TABLE federated.t1(a TEXT);
INSERT INTO federated.t1 VALUES('abc'), ('gh'), ('f'), ('ijk'), ('de');
connect (conn_1,127.0.0.1,root,,,$MASTER_MYPORT);
connection master;
flush tables;
connection conn_1;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval CREATE TABLE federated.t1(a TEXT) ENGINE=FEDERATED
CONNECTION='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1';
disconnect conn_1;
connection master;
flush tables;
connect (conn_2,127.0.0.1,root,,,$MASTER_MYPORT);
connect (conn_3,127.0.0.1,root,,,$MASTER_MYPORT);
connection conn_2;
describe federated.t1;
connection conn_3;
--replace_column 7 X 8 X 9 X 10 X 11 X 12 X 13 X 14 X
show table status from federated;
disconnect conn_2;
connect (conn_4,127.0.0.1,root,,,$MASTER_MYPORT);
connection conn_4;
--replace_column 7 X 8 X 9 X 10 X 11 X 12 X 13 X 14 X
show table status from federated;
disconnect conn_3;
disconnect conn_4;
connection master;
DROP TABLE federated.t1;
connection slave;
DROP TABLE federated.t1;
connection default;
source federated_cleanup.inc;
CREATE DATABASE federated;
CREATE DATABASE federated;
drop table if exists t1;
create table federated.t1_1 (s1 int primary key) engine=myisam;
create table federated.t1_2 (s1 int primary key) engine=innodb;
create table t1 (s1 int primary key) engine=federated
partition by list (s1)
(partition p1 values in (1,3)
connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1_1',
partition p2 values in (2,4)
connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1_2');
insert into t1 values (1), (2), (3), (4);
select * from t1;
s1
1
3
2
4
select * from federated.t1_1;
s1
1
3
select * from federated.t1_2;
s1
2
4
drop table t1;
drop table federated.t1_1;
drop table federated.t1_2;
End of 5.1 tests
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated;
#
# Tests for partitioned FEDERATED
#
source include/have_partition.inc;
source include/have_innodb.inc;
source federated.inc;
disable_warnings;
drop table if exists t1;
enable_warnings;
#
# Federated + partition
#
# Create 2 tables on the Slave, we can use different storage engines.
# Then create a Federated table on the Master, using different connect
# string to specify the two different target partitions we want to use.
#
connection slave;
create table federated.t1_1 (s1 int primary key) engine=myisam;
create table federated.t1_2 (s1 int primary key) engine=innodb;
connection master;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval create table t1 (s1 int primary key) engine=federated
partition by list (s1)
(partition p1 values in (1,3)
connection='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1_1',
partition p2 values in (2,4)
connection='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1_2');
insert into t1 values (1), (2), (3), (4);
select * from t1;
connection slave;
select * from federated.t1_1;
select * from federated.t1_2;
connection master;
drop table t1;
connection slave;
drop table federated.t1_1;
drop table federated.t1_2;
--echo End of 5.1 tests
source federated_cleanup.inc;
......@@ -13,7 +13,7 @@ drop table if exists t1;
# Bug #22451 Partitions: duplicate results with engine=federated
#
--error ER_PARTITION_MERGE_ERROR
--error ER_CONNECT_TO_FOREIGN_DATA_SOURCE
create table t1 (s1 int) engine=federated
connection='mysql://root@localhost/federated/t1' partition by list (s1)
(partition p1 values in (1), partition p2 values in (2));
......
......@@ -167,6 +167,7 @@ ha_partition::ha_partition(handlerton *hton, TABLE_SHARE *share)
m_is_sub_partitioned(0)
{
DBUG_ENTER("ha_partition::ha_partition(table)");
init_alloc_root(&m_mem_root, 512, 512);
init_handler_variables();
DBUG_VOID_RETURN;
}
......@@ -188,6 +189,7 @@ ha_partition::ha_partition(handlerton *hton, partition_info *part_info)
m_is_sub_partitioned(m_part_info->is_sub_partitioned())
{
DBUG_ENTER("ha_partition::ha_partition(part_info)");
init_alloc_root(&m_mem_root, 512, 512);
init_handler_variables();
DBUG_ASSERT(m_part_info);
DBUG_VOID_RETURN;
......@@ -212,6 +214,7 @@ void ha_partition::init_handler_variables()
m_file_buffer= NULL;
m_name_buffer_ptr= NULL;
m_engine_array= NULL;
m_connect_string= NULL;
m_file= NULL;
m_file_tot_parts= 0;
m_reorged_file= NULL;
......@@ -286,9 +289,11 @@ ha_partition::~ha_partition()
for (i= 0; i < m_tot_parts; i++)
delete m_file[i];
}
my_free((char*) m_ordered_rec_buffer, MYF(MY_ALLOW_ZERO_PTR));
clear_handler_file();
free_root(&m_mem_root, MYF(0));
DBUG_VOID_RETURN;
}
......@@ -555,6 +560,13 @@ int ha_partition::create(const char *name, TABLE *table_arg,
char t_name[FN_REFLEN];
DBUG_ENTER("ha_partition::create");
if (create_info->used_fields & HA_CREATE_USED_CONNECTION)
{
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
"CONNECTION not valid for partition");
DBUG_RETURN(1);
}
strmov(t_name, name);
DBUG_ASSERT(*fn_rext((char*)name) == '\0');
if (del_ren_cre_table(t_name, NULL, table_arg, create_info))
......@@ -1227,6 +1239,7 @@ int ha_partition::prepare_new_partition(TABLE *tbl,
if ((error= set_up_table_before_create(tbl, part_name, create_info,
0, p_elem)))
goto error_create;
tbl->s->connect_string = p_elem->connect_string;
if ((error= file->ha_create(part_name, tbl, create_info)))
{
/*
......@@ -1747,6 +1760,8 @@ void ha_partition::update_create_info(HA_CREATE_INFO *create_info)
create_info->auto_increment_value= stats.auto_increment_value;
create_info->data_file_name= create_info->index_file_name = NULL;
create_info->connect_string.str= NULL;
create_info->connect_string.length= 0;
return;
}
......@@ -2034,6 +2049,10 @@ int ha_partition::set_up_table_before_create(TABLE *tbl,
}
info->index_file_name= part_elem->index_file_name;
info->data_file_name= part_elem->data_file_name;
info->connect_string= part_elem->connect_string;
if (info->connect_string.length)
info->used_fields|= HA_CREATE_USED_CONNECTION;
tbl->s->connect_string= part_elem->connect_string;
DBUG_RETURN(0);
}
......@@ -2148,8 +2167,10 @@ bool ha_partition::create_handler_file(const char *name)
tot_name_words= (tot_name_len + 3) / 4;
tot_len_words= 4 + tot_partition_words + tot_name_words;
tot_len_byte= 4 * tot_len_words;
if (!(file_buffer= (uchar *) my_malloc(tot_len_byte, MYF(MY_ZEROFILL))))
file_buffer= (uchar *) my_alloca(tot_len_byte);
if (!file_buffer)
DBUG_RETURN(TRUE);
bzero(file_buffer, tot_len_byte);
engine_array= (file_buffer + 12);
name_buffer_ptr= (char*) (file_buffer + ((4 + tot_partition_words) * 4));
part_it.rewind();
......@@ -2205,11 +2226,23 @@ bool ha_partition::create_handler_file(const char *name)
{
result= my_write(file, (uchar *) file_buffer, tot_len_byte,
MYF(MY_WME | MY_NABP)) != 0;
part_it.rewind();
for (i= 0; i < no_parts; i++)
{
uchar buffer[4];
part_elem= part_it++;
uint length = part_elem->connect_string.length;
int4store(buffer, length);
my_write(file, buffer, 4, MYF(MY_WME | MY_NABP));
my_write(file, (uchar *) part_elem->connect_string.str, length,
MYF(MY_WME | MY_NABP));
}
VOID(my_close(file, MYF(0)));
}
else
result= TRUE;
my_free((char*) file_buffer, MYF(0));
my_afree((char*) file_buffer);
DBUG_RETURN(result);
}
......@@ -2227,10 +2260,11 @@ void ha_partition::clear_handler_file()
{
if (m_engine_array)
plugin_unlock_list(NULL, m_engine_array, m_tot_parts);
my_free((char*) m_file_buffer, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*) m_engine_array, MYF(MY_ALLOW_ZERO_PTR));
free_root(&m_mem_root, MYF(MY_KEEP_PREALLOC));
m_file_buffer= NULL;
m_engine_array= NULL;
m_connect_string= NULL;
m_ordered_rec_buffer= NULL;
}
/*
......@@ -2389,7 +2423,7 @@ bool ha_partition::get_from_handler_file(const char *name, MEM_ROOT *mem_root)
goto err1;
len_words= uint4korr(buff);
len_bytes= 4 * len_words;
if (!(file_buffer= (char*) my_malloc(len_bytes, MYF(0))))
if (!(file_buffer= (char*) alloc_root(&m_mem_root, len_bytes)))
goto err1;
VOID(my_seek(file, 0, MY_SEEK_SET, MYF(0)));
if (my_read(file, (uchar *) file_buffer, len_bytes, MYF(MY_NABP)))
......@@ -2418,12 +2452,33 @@ bool ha_partition::get_from_handler_file(const char *name, MEM_ROOT *mem_root)
if (len_words != (tot_partition_words + tot_name_words + 4))
goto err3;
name_buffer_ptr= file_buffer + 16 + 4 * tot_partition_words;
if (!(m_connect_string= (LEX_STRING*)
alloc_root(&m_mem_root, m_tot_parts * sizeof(LEX_STRING))))
goto err3;
bzero(m_connect_string, m_tot_parts * sizeof(LEX_STRING));
for (i= 0; i < m_tot_parts; i++)
{
LEX_STRING connect_string;
uchar buffer[4];
if (my_read(file, buffer, 4, MYF(MY_NABP)))
break;
connect_string.length= uint4korr(buffer);
connect_string.str= (char*) alloc_root(&m_mem_root, connect_string.length+1);
if (my_read(file, (uchar*) connect_string.str, connect_string.length,
MYF(MY_NABP)))
break;
connect_string.str[connect_string.length]= 0;
m_connect_string[i]= connect_string;
}
VOID(my_close(file, MYF(0)));
m_file_buffer= file_buffer; // Will be freed in clear_handler_file()
m_name_buffer_ptr= name_buffer_ptr;
if (!(m_engine_array= (plugin_ref*)
my_malloc(m_tot_parts * sizeof(plugin_ref), MYF(MY_WME))))
alloc_root(&m_mem_root, m_tot_parts * sizeof(plugin_ref))))
goto err3;
for (i= 0; i < m_tot_parts; i++)
......@@ -2441,7 +2496,6 @@ bool ha_partition::get_from_handler_file(const char *name, MEM_ROOT *mem_root)
err3:
my_afree((gptr) engine_array);
err2:
my_free(file_buffer, MYF(0));
err1:
VOID(my_close(file, MYF(0)));
DBUG_RETURN(TRUE);
......@@ -2514,7 +2568,7 @@ int ha_partition::open(const char *name, int mode, uint test_if_locked)
alloc_len+= table_share->max_key_length;
if (!m_ordered_rec_buffer)
{
if (!(m_ordered_rec_buffer= (uchar*)my_malloc(alloc_len, MYF(MY_WME))))
if (!(m_ordered_rec_buffer= (uchar*) alloc_root(&m_mem_root, alloc_len)))
{
DBUG_RETURN(1);
}
......@@ -2557,9 +2611,11 @@ int ha_partition::open(const char *name, int mode, uint test_if_locked)
{
create_partition_name(name_buff, name, name_buffer_ptr, NORMAL_PART_NAME,
FALSE);
table->s->connect_string = m_connect_string[(uint)(file-m_file)];
if ((error= (*file)->ha_open(table, (const char*) name_buff, mode,
test_if_locked)))
goto err_handler;
bzero(&table->s->connect_string, sizeof(LEX_STRING));
m_no_locks+= (*file)->lock_count();
name_buffer_ptr+= strlen(name_buffer_ptr) + 1;
set_if_bigger(ref_length, ((*file)->ref_length));
......
......@@ -73,12 +73,14 @@ class ha_partition :public handler
uint m_open_test_lock; // Open test_if_locked
char *m_file_buffer; // Buffer with names
char *m_name_buffer_ptr; // Pointer to first partition name
MEM_ROOT m_mem_root;
plugin_ref *m_engine_array; // Array of types of the handlers
handler **m_file; // Array of references to handler inst.
uint m_file_tot_parts; // Debug
handler **m_new_file; // Array of references to new handlers
handler **m_reorged_file; // Reorganised partitions
handler **m_added_file; // Added parts kept for errors
LEX_STRING *m_connect_string;
partition_info *m_part_info; // local reference to partition
Field **m_part_field_array; // Part field array locally to save acc
uchar *m_ordered_rec_buffer; // Row and key buffer for ord. idx scan
......
......@@ -65,6 +65,7 @@ class partition_element :public Sql_alloc {
char* data_file_name;
char* index_file_name;
handlerton *engine_type;
LEX_STRING connect_string;
enum partition_state part_state;
uint16 nodegroup_id;
bool has_null_value;
......
......@@ -4501,6 +4501,12 @@ opt_part_option:
lex->part_info->curr_part_elem->engine_type= $4;
lex->part_info->default_engine_type= $4;
}
| CONNECTION_SYM opt_equal TEXT_STRING_sys
{
LEX *lex= Lex;
lex->part_info->curr_part_elem->connect_string.str= $3.str;
lex->part_info->curr_part_elem->connect_string.length= $3.length;
}
| NODEGROUP_SYM opt_equal real_ulong_num
{ Lex->part_info->curr_part_elem->nodegroup_id= (uint16) $3; }
| MAX_ROWS opt_equal real_ulonglong_num
......
......@@ -400,7 +400,7 @@ int federatedx_db_init(void *p)
federatedx_hton->commit= ha_federatedx::commit;
federatedx_hton->rollback= ha_federatedx::rollback;
federatedx_hton->create= federatedx_create_handler;
federatedx_hton->flags= HTON_ALTER_NOT_SUPPORTED | HTON_NO_PARTITION;
federatedx_hton->flags= HTON_ALTER_NOT_SUPPORTED;
if (pthread_mutex_init(&federatedx_mutex, MY_MUTEX_INIT_FAST))
goto error;
......@@ -1583,6 +1583,7 @@ static FEDERATEDX_SHARE *get_share(const char *table_name, TABLE *table)
tmp_share.table_name_length, ident_quote_char);
if (!(share= (FEDERATEDX_SHARE *) memdup_root(&mem_root, (char*)&tmp_share, sizeof(*share))) ||
!(share->share_key= (char*) memdup_root(&mem_root, tmp_share.share_key, tmp_share.share_key_length+1)) ||
!(share->select_query= (char*) strmake_root(&mem_root, query.ptr(), query.length() + 1)))
goto error;
......@@ -1722,6 +1723,7 @@ int ha_federatedx::disconnect(handlerton *hton, MYSQL_THD thd)
{
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
delete txn;
*((federatedx_txn **) thd_ha_data(thd, hton))= 0;
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment