Commit f99abb45 authored by Aleksey Midenkov's avatar Aleksey Midenkov

MDEV-17573 Assertion in federatedx on multi-update

Cause: shared federatedx_io cannot store table-specific data.

Fix: move current row reference `federatedx_io_mysql::current` to
ha_federatedx.

FederatedX connection (represented by federatedx_io) is stored into
federatedx_txn::txn_list of per-server connections (see
federatedx_txn::acquire()). federatedx_txn object is stored into THD
(see ha_federatedx::external_lock()). When multiple handlers acquire
FederatedX connection they get single federatedx_io instance. Multiple
handlers do their operation via federatedx_io_mysql::mark_position()
and federatedx_io_mysql::fetch_row() in arbitrarty manner. They access
the same federatedx_io_mysql instance and same MYSQL_ROWS *current
pointer, so one handler disrupts the work of the other.

Related to "MDEV-14551 Can't find record in table on multi-table update
with ORDER BY".
parent a3f7f233
...@@ -2283,6 +2283,32 @@ connection default; ...@@ -2283,6 +2283,32 @@ connection default;
connection master; connection master;
CREATE TABLE t1 (a INT) ENGINE=FEDERATED CONNECTION='mysql://@127.0.0.1:SLAVE_PORT/federated/t1'; CREATE TABLE t1 (a INT) ENGINE=FEDERATED CONNECTION='mysql://@127.0.0.1:SLAVE_PORT/federated/t1';
ERROR HY000: Can't create federated table. Foreign data src error: database: 'federated' username: '' hostname: '127.0.0.1' ERROR HY000: Can't create federated table. Foreign data src error: database: 'federated' username: '' hostname: '127.0.0.1'
#
# MDEV-17573 Assertion in federatedx on multi-update
#
create table t1 (
x int,
d datetime);
create table t1f engine=FEDERATED connection='mysql://root@127.0.0.1:MASTER_MYPORT/test/t1';
create table t2 (
x int, y int,
d datetime);
create table t2f engine=FEDERATED connection='mysql://root@127.0.0.1:MASTER_MYPORT/test/t2';
create table t3 (
x int, y int, z int,
d datetime);
create table t3f engine=FEDERATED connection='mysql://root@127.0.0.1:MASTER_MYPORT/test/t3';
insert into t1 values (1, "1990-01-01 00:00");
insert into t1 values (1, "1991-01-01 11:11");
insert into t2 values (2, 2, "1992-02-02 22:22");
insert into t3 values (3, 3, 3, "1993-03-03 23:33");
update t1f, t2f, t3f set t1f.x= 11, t2f.y= 22, t3f.z= 33;
drop table t1f;
drop table t2f;
drop table t3f;
drop table t1;
drop table t2;
drop table t3;
connection master; connection master;
DROP TABLE IF EXISTS federated.t1; DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated; DROP DATABASE IF EXISTS federated;
......
...@@ -2010,4 +2010,33 @@ connection master; ...@@ -2010,4 +2010,33 @@ connection master;
--error ER_CANT_CREATE_FEDERATED_TABLE --error ER_CANT_CREATE_FEDERATED_TABLE
eval CREATE TABLE t1 (a INT) ENGINE=FEDERATED CONNECTION='mysql://@127.0.0.1:$SLAVE_MYPORT/federated/t1'; eval CREATE TABLE t1 (a INT) ENGINE=FEDERATED CONNECTION='mysql://@127.0.0.1:$SLAVE_MYPORT/federated/t1';
--echo #
--echo # MDEV-17573 Assertion in federatedx on multi-update
--echo #
create table t1 (
x int,
d datetime);
--replace_result $MASTER_MYPORT MASTER_MYPORT
eval create table t1f engine=FEDERATED connection='mysql://root@127.0.0.1:$MASTER_MYPORT/test/t1';
create table t2 (
x int, y int,
d datetime);
--replace_result $MASTER_MYPORT MASTER_MYPORT
eval create table t2f engine=FEDERATED connection='mysql://root@127.0.0.1:$MASTER_MYPORT/test/t2';
create table t3 (
x int, y int, z int,
d datetime);
--replace_result $MASTER_MYPORT MASTER_MYPORT
eval create table t3f engine=FEDERATED connection='mysql://root@127.0.0.1:$MASTER_MYPORT/test/t3';
insert into t1 values (1, "1990-01-01 00:00");
insert into t1 values (1, "1991-01-01 11:11");
insert into t2 values (2, 2, "1992-02-02 22:22");
insert into t3 values (3, 3, 3, "1993-03-03 23:33");
update t1f, t2f, t3f set t1f.x= 11, t2f.y= 22, t3f.z= 33;
drop table t1f; drop table t2f; drop table t3f; drop table t1; drop table t2; drop table t3;
source include/federated_cleanup.inc; source include/federated_cleanup.inc;
...@@ -64,7 +64,6 @@ struct mysql_position ...@@ -64,7 +64,6 @@ struct mysql_position
class federatedx_io_mysql :public federatedx_io class federatedx_io_mysql :public federatedx_io
{ {
MYSQL mysql; /* MySQL connection */ MYSQL mysql; /* MySQL connection */
MYSQL_ROWS *current;
DYNAMIC_ARRAY savepoints; DYNAMIC_ARRAY savepoints;
bool requested_autocommit; bool requested_autocommit;
bool actual_autocommit; bool actual_autocommit;
...@@ -108,7 +107,8 @@ class federatedx_io_mysql :public federatedx_io ...@@ -108,7 +107,8 @@ class federatedx_io_mysql :public federatedx_io
virtual void free_result(FEDERATEDX_IO_RESULT *io_result); virtual void free_result(FEDERATEDX_IO_RESULT *io_result);
virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result); virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result);
virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result); virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result);
virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result); virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result,
FEDERATEDX_IO_ROWS **current= NULL);
virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result); virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result);
virtual const char *get_column_data(FEDERATEDX_IO_ROW *row, virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
unsigned int column); unsigned int column);
...@@ -117,7 +117,7 @@ class federatedx_io_mysql :public federatedx_io ...@@ -117,7 +117,7 @@ class federatedx_io_mysql :public federatedx_io
virtual size_t get_ref_length() const; virtual size_t get_ref_length() const;
virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
void *ref); void *ref, FEDERATEDX_IO_ROWS *current);
virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
const void *ref); const void *ref);
virtual void set_thd(void *thd); virtual void set_thd(void *thd);
...@@ -515,10 +515,12 @@ my_ulonglong federatedx_io_mysql::get_num_rows(FEDERATEDX_IO_RESULT *io_result) ...@@ -515,10 +515,12 @@ my_ulonglong federatedx_io_mysql::get_num_rows(FEDERATEDX_IO_RESULT *io_result)
} }
FEDERATEDX_IO_ROW *federatedx_io_mysql::fetch_row(FEDERATEDX_IO_RESULT *io_result) FEDERATEDX_IO_ROW *federatedx_io_mysql::fetch_row(FEDERATEDX_IO_RESULT *io_result,
FEDERATEDX_IO_ROWS **current)
{ {
MYSQL_RES *result= (MYSQL_RES*)io_result; MYSQL_RES *result= (MYSQL_RES*)io_result;
current= result->data_cursor; if (current)
*current= (FEDERATEDX_IO_ROWS *) result->data_cursor;
return (FEDERATEDX_IO_ROW *) mysql_fetch_row(result); return (FEDERATEDX_IO_ROW *) mysql_fetch_row(result);
} }
...@@ -626,11 +628,11 @@ size_t federatedx_io_mysql::get_ref_length() const ...@@ -626,11 +628,11 @@ size_t federatedx_io_mysql::get_ref_length() const
void federatedx_io_mysql::mark_position(FEDERATEDX_IO_RESULT *io_result, void federatedx_io_mysql::mark_position(FEDERATEDX_IO_RESULT *io_result,
void *ref) void *ref, FEDERATEDX_IO_ROWS *current)
{ {
mysql_position& pos= *reinterpret_cast<mysql_position*>(ref); mysql_position& pos= *reinterpret_cast<mysql_position*>(ref);
pos.result= (MYSQL_RES *) io_result; pos.result= (MYSQL_RES *) io_result;
pos.offset= current; pos.offset= (MYSQL_ROW_OFFSET) current;
} }
int federatedx_io_mysql::seek_position(FEDERATEDX_IO_RESULT **io_result, int federatedx_io_mysql::seek_position(FEDERATEDX_IO_RESULT **io_result,
......
...@@ -90,7 +90,8 @@ class federatedx_io_null :public federatedx_io ...@@ -90,7 +90,8 @@ class federatedx_io_null :public federatedx_io
virtual void free_result(FEDERATEDX_IO_RESULT *io_result); virtual void free_result(FEDERATEDX_IO_RESULT *io_result);
virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result); virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result);
virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result); virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result);
virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result); virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result,
FEDERATEDX_IO_ROWS **current= NULL);
virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result); virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result);
virtual const char *get_column_data(FEDERATEDX_IO_ROW *row, virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
unsigned int column); unsigned int column);
...@@ -98,7 +99,7 @@ class federatedx_io_null :public federatedx_io ...@@ -98,7 +99,7 @@ class federatedx_io_null :public federatedx_io
unsigned int column) const; unsigned int column) const;
virtual size_t get_ref_length() const; virtual size_t get_ref_length() const;
virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
void *ref); void *ref, FEDERATEDX_IO_ROWS *current);
virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
const void *ref); const void *ref);
}; };
...@@ -242,7 +243,8 @@ my_ulonglong federatedx_io_null::get_num_rows(FEDERATEDX_IO_RESULT *) ...@@ -242,7 +243,8 @@ my_ulonglong federatedx_io_null::get_num_rows(FEDERATEDX_IO_RESULT *)
} }
FEDERATEDX_IO_ROW *federatedx_io_null::fetch_row(FEDERATEDX_IO_RESULT *) FEDERATEDX_IO_ROW *federatedx_io_null::fetch_row(FEDERATEDX_IO_RESULT *,
FEDERATEDX_IO_ROWS **current)
{ {
return NULL; return NULL;
} }
...@@ -288,7 +290,7 @@ size_t federatedx_io_null::get_ref_length() const ...@@ -288,7 +290,7 @@ size_t federatedx_io_null::get_ref_length() const
void federatedx_io_null::mark_position(FEDERATEDX_IO_RESULT *io_result, void federatedx_io_null::mark_position(FEDERATEDX_IO_RESULT *io_result,
void *ref) void *ref, FEDERATEDX_IO_ROWS *current)
{ {
} }
......
...@@ -2908,7 +2908,7 @@ int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result) ...@@ -2908,7 +2908,7 @@ int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result)
DBUG_RETURN(retval); DBUG_RETURN(retval);
/* Fetch a row, insert it back in a row format. */ /* Fetch a row, insert it back in a row format. */
if (!(row= io->fetch_row(result))) if (!(row= io->fetch_row(result, &current)))
DBUG_RETURN(HA_ERR_END_OF_FILE); DBUG_RETURN(HA_ERR_END_OF_FILE);
if (!(retval= convert_row_to_internal_format(buf, row, result))) if (!(retval= convert_row_to_internal_format(buf, row, result)))
...@@ -2952,7 +2952,7 @@ void ha_federatedx::position(const uchar *record __attribute__ ((unused))) ...@@ -2952,7 +2952,7 @@ void ha_federatedx::position(const uchar *record __attribute__ ((unused)))
if (txn->acquire(share, ha_thd(), TRUE, &io)) if (txn->acquire(share, ha_thd(), TRUE, &io))
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
io->mark_position(stored_result, ref); io->mark_position(stored_result, ref, current);
position_called= TRUE; position_called= TRUE;
......
...@@ -129,6 +129,7 @@ typedef struct st_federatedx_share { ...@@ -129,6 +129,7 @@ typedef struct st_federatedx_share {
typedef struct st_federatedx_result FEDERATEDX_IO_RESULT; typedef struct st_federatedx_result FEDERATEDX_IO_RESULT;
typedef struct st_federatedx_row FEDERATEDX_IO_ROW; typedef struct st_federatedx_row FEDERATEDX_IO_ROW;
typedef struct st_federatedx_rows FEDERATEDX_IO_ROWS;
typedef ptrdiff_t FEDERATEDX_IO_OFFSET; typedef ptrdiff_t FEDERATEDX_IO_OFFSET;
class federatedx_io class federatedx_io
...@@ -203,7 +204,8 @@ class federatedx_io ...@@ -203,7 +204,8 @@ class federatedx_io
virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0; virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0;
virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0; virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0;
virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0; virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0;
virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result)=0; virtual FEDERATEDX_IO_ROW *fetch_row(FEDERATEDX_IO_RESULT *io_result,
FEDERATEDX_IO_ROWS **current= NULL)=0;
virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0; virtual ulong *fetch_lengths(FEDERATEDX_IO_RESULT *io_result)=0;
virtual const char *get_column_data(FEDERATEDX_IO_ROW *row, virtual const char *get_column_data(FEDERATEDX_IO_ROW *row,
unsigned int column)=0; unsigned int column)=0;
...@@ -212,7 +214,7 @@ class federatedx_io ...@@ -212,7 +214,7 @@ class federatedx_io
virtual size_t get_ref_length() const=0; virtual size_t get_ref_length() const=0;
virtual void mark_position(FEDERATEDX_IO_RESULT *io_result, virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
void *ref)=0; void *ref, FEDERATEDX_IO_ROWS *current)=0;
virtual int seek_position(FEDERATEDX_IO_RESULT **io_result, virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
const void *ref)=0; const void *ref)=0;
virtual void set_thd(void *thd) { } virtual void set_thd(void *thd) { }
...@@ -265,6 +267,7 @@ class ha_federatedx: public handler ...@@ -265,6 +267,7 @@ class ha_federatedx: public handler
federatedx_txn *txn; federatedx_txn *txn;
federatedx_io *io; federatedx_io *io;
FEDERATEDX_IO_RESULT *stored_result; FEDERATEDX_IO_RESULT *stored_result;
FEDERATEDX_IO_ROWS *current;
/** /**
Array of all stored results we get during a query execution. Array of all stored results we get during a query execution.
*/ */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment