Commit 94beb7cd authored by unknown's avatar unknown

Bug#25511

  "Federated INSERT failures"
  Federated does not correctly handle "INSERT...ON DUPLICATE KEY UPDATE"
  However, implementing such support is not reasonably possible without
  increasing complexity of the storage engine: checking that constraints
  on remote server match local server and parsing error messages.
  This patch causes 'ON DUPLICATE KEY' to fail with ER_DUP_KEY message
  if a conflict occurs and not to fail silently.


include/my_base.h:
  bug25511
    new storage engine hint: HA_EXTRA_INSERT_WITH_UPDATE
mysql-test/r/federated.result:
  test for bug25511
mysql-test/t/federated.test:
  test for bug25511
sql/ha_federated.cc:
  bug25511
    implement support for handling HA_EXTRA_INSERT_WITH_UPDATE hint
sql/ha_federated.h:
  bug25511
    new property: insert_dup_update
sql/sql_insert.cc:
  bug25511
    implement support for HA_EXTRA_INSERT_WITH_UPDATE
    When checking duplicates flag, if it is DUP_UPDATE, send hint
    to the storage engine.
parent 0e5e884b
...@@ -168,7 +168,13 @@ enum ha_extra_function { ...@@ -168,7 +168,13 @@ enum ha_extra_function {
These flags are reset by the handler::extra(HA_EXTRA_RESET) call. These flags are reset by the handler::extra(HA_EXTRA_RESET) call.
*/ */
HA_EXTRA_DELETE_CANNOT_BATCH, HA_EXTRA_DELETE_CANNOT_BATCH,
HA_EXTRA_UPDATE_CANNOT_BATCH HA_EXTRA_UPDATE_CANNOT_BATCH,
/*
Inform handler that write_row() should immediately report constraint
violations because a INSERT...ON DUPLICATE KEY UPDATE is in being
performed.
*/
HA_EXTRA_INSERT_WITH_UPDATE
}; };
/* The following is parameter to ha_panic() */ /* The following is parameter to ha_panic() */
......
...@@ -1867,6 +1867,21 @@ a b ...@@ -1867,6 +1867,21 @@ a b
3 Curly 3 Curly
drop table federated.t1; drop table federated.t1;
drop table federated.t1; drop table federated.t1;
create table federated.t1 (a int primary key, b varchar(64))
DEFAULT CHARSET=utf8;
create table federated.t1 (a int primary key, b varchar(64))
ENGINE=FEDERATED
connection='mysql://root@127.0.0.1:SLAVE_PORT/federated/t1'
DEFAULT CHARSET=utf8;
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe")
on duplicate key update a=a+100;
ERROR 23000: Can't write; duplicate key in table 't1'
select * from federated.t1;
a b
1 Larry
2 Curly
drop table federated.t1;
drop table federated.t1;
DROP TABLE IF EXISTS federated.t1; DROP TABLE IF EXISTS federated.t1;
DROP DATABASE IF EXISTS federated; DROP DATABASE IF EXISTS federated;
DROP TABLE IF EXISTS federated.t1; DROP TABLE IF EXISTS federated.t1;
......
...@@ -1603,4 +1603,30 @@ drop table federated.t1; ...@@ -1603,4 +1603,30 @@ drop table federated.t1;
connection slave; connection slave;
drop table federated.t1; drop table federated.t1;
#
# BUG#25511 Federated Insert failures.
#
# When the user performs a INSERT...ON DUPLICATE KEY UPDATE, we want
# it to fail if a duplicate key exists instead of ignoring it.
#
connection slave;
create table federated.t1 (a int primary key, b varchar(64))
DEFAULT CHARSET=utf8;
connection master;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval create table federated.t1 (a int primary key, b varchar(64))
ENGINE=FEDERATED
connection='mysql://root@127.0.0.1:$SLAVE_MYPORT/federated/t1'
DEFAULT CHARSET=utf8;
--error ER_DUP_KEY
insert into federated.t1 values (1,"Larry"), (2,"Curly"), (1,"Moe")
on duplicate key update a=a+100;
select * from federated.t1;
drop table federated.t1;
connection slave;
drop table federated.t1;
source include/federated_cleanup.inc; source include/federated_cleanup.inc;
...@@ -1627,7 +1627,7 @@ int ha_federated::write_row(byte *buf) ...@@ -1627,7 +1627,7 @@ int ha_federated::write_row(byte *buf)
*/ */
if (replace_duplicates) if (replace_duplicates)
insert_string.append(STRING_WITH_LEN("REPLACE INTO ")); insert_string.append(STRING_WITH_LEN("REPLACE INTO "));
else if (ignore_duplicates) else if (ignore_duplicates && !insert_dup_update)
insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO ")); insert_string.append(STRING_WITH_LEN("INSERT IGNORE INTO "));
else else
insert_string.append(STRING_WITH_LEN("INSERT INTO ")); insert_string.append(STRING_WITH_LEN("INSERT INTO "));
...@@ -2548,6 +2548,7 @@ int ha_federated::extra(ha_extra_function operation) ...@@ -2548,6 +2548,7 @@ int ha_federated::extra(ha_extra_function operation)
ignore_duplicates= TRUE; ignore_duplicates= TRUE;
break; break;
case HA_EXTRA_NO_IGNORE_DUP_KEY: case HA_EXTRA_NO_IGNORE_DUP_KEY:
insert_dup_update= FALSE;
ignore_duplicates= FALSE; ignore_duplicates= FALSE;
break; break;
case HA_EXTRA_WRITE_CAN_REPLACE: case HA_EXTRA_WRITE_CAN_REPLACE:
...@@ -2556,7 +2557,11 @@ int ha_federated::extra(ha_extra_function operation) ...@@ -2556,7 +2557,11 @@ int ha_federated::extra(ha_extra_function operation)
case HA_EXTRA_WRITE_CANNOT_REPLACE: case HA_EXTRA_WRITE_CANNOT_REPLACE:
replace_duplicates= FALSE; replace_duplicates= FALSE;
break; break;
case HA_EXTRA_INSERT_WITH_UPDATE:
insert_dup_update= TRUE;
break;
case HA_EXTRA_RESET: case HA_EXTRA_RESET:
insert_dup_update= FALSE;
ignore_duplicates= FALSE; ignore_duplicates= FALSE;
replace_duplicates= FALSE; replace_duplicates= FALSE;
break; break;
...@@ -2699,6 +2704,9 @@ int ha_federated::stash_remote_error() ...@@ -2699,6 +2704,9 @@ int ha_federated::stash_remote_error()
DBUG_ENTER("ha_federated::stash_remote_error()"); DBUG_ENTER("ha_federated::stash_remote_error()");
remote_error_number= mysql_errno(mysql); remote_error_number= mysql_errno(mysql);
strmake(remote_error_buf, mysql_error(mysql), sizeof(remote_error_buf)-1); strmake(remote_error_buf, mysql_error(mysql), sizeof(remote_error_buf)-1);
if (remote_error_number == ER_DUP_ENTRY ||
remote_error_number == ER_DUP_KEY)
DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM); DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM);
} }
......
...@@ -158,6 +158,7 @@ class ha_federated: public handler ...@@ -158,6 +158,7 @@ class ha_federated: public handler
int remote_error_number; int remote_error_number;
char remote_error_buf[FEDERATED_QUERY_BUFFER_SIZE]; char remote_error_buf[FEDERATED_QUERY_BUFFER_SIZE];
bool ignore_duplicates, replace_duplicates; bool ignore_duplicates, replace_duplicates;
bool insert_dup_update;
private: private:
/* /*
......
...@@ -715,6 +715,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, ...@@ -715,6 +715,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
*/ */
table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS); table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS);
} }
if (duplic == DUP_UPDATE)
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
/* /*
let's *try* to start bulk inserts. It won't necessary let's *try* to start bulk inserts. It won't necessary
start them as values_list.elements should be greater than start them as values_list.elements should be greater than
...@@ -2434,6 +2436,8 @@ bool Delayed_insert::handle_inserts(void) ...@@ -2434,6 +2436,8 @@ bool Delayed_insert::handle_inserts(void)
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
using_opt_replace= 1; using_opt_replace= 1;
} }
if (info.handle_duplicates == DUP_UPDATE)
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
thd.clear_error(); // reset error for binlog thd.clear_error(); // reset error for binlog
if (write_record(&thd, table, &info)) if (write_record(&thd, table, &info))
{ {
...@@ -2761,6 +2765,8 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -2761,6 +2765,8 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS); table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS);
} }
if (info.handle_duplicates == DUP_UPDATE)
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
thd->no_trans_update.stmt= FALSE; thd->no_trans_update.stmt= FALSE;
thd->abort_on_warning= (!info.ignore && thd->abort_on_warning= (!info.ignore &&
(thd->variables.sql_mode & (thd->variables.sql_mode &
...@@ -3218,6 +3224,8 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) ...@@ -3218,6 +3224,8 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u)
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS); table->file->extra(HA_EXTRA_RETRIEVE_ALL_COLS);
} }
if (info.handle_duplicates == DUP_UPDATE)
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
if (!thd->prelocked_mode) if (!thd->prelocked_mode)
table->file->start_bulk_insert((ha_rows) 0); table->file->start_bulk_insert((ha_rows) 0);
thd->no_trans_update.stmt= FALSE; thd->no_trans_update.stmt= FALSE;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment