Commit 72429cad authored by Nikita Malyavin's avatar Nikita Malyavin

MDEV-30046 wrong row targeted with "insert ... on duplicate" and "replace"

When HA_DUPLICATE_POS is not supported, the row to replace was navigated by
ha_index_read_idx_map, which uses only hash to navigate.

Suchwise, given a hash collision it may choose an incorrect row.

handler::position would be correct and very convenient to use here.

dup_ref is already set by handler independently of the engine
capabilities, when an extra lookup is made (for long unique or something else,
for example WITHOUT OVERLAPS) such error will be indicated by
file->lookup_errkey != -1.
parent 7f161a5c
......@@ -674,6 +674,25 @@ SELECT * FROM t1;
a b c
3 2 2
DROP TABLE t1;
# MDEV-30046 wrong row targeted with "insert ... on duplicate" and
# "replace", leading to data corruption
create table t (s blob, n int, unique (s)) engine=innodb;
insert into t values ('Hrecvx_0004ln-00',1), ('Hrecvx_0004mm-00',1);
replace into t values ('Hrecvx_0004mm-00',2);
select * from t;
s n
Hrecvx_0004ln-00 1
Hrecvx_0004mm-00 2
drop table t;
create table t (s blob, n int, unique (s)) engine=innodb;
insert into t values ('Hrecvx_0004ln-00',1), ('Hrecvx_0004mm-00',1);
insert into t values ('Hrecvx_0004mm-00',2)
on duplicate key update n = values (n);
select * from t;
s n
Hrecvx_0004ln-00 1
Hrecvx_0004mm-00 2
drop table t;
#
# End of 10.5 tests
#
......@@ -655,6 +655,21 @@ REPLACE INTO t1 VALUES (3,2,2);
SELECT * FROM t1;
DROP TABLE t1;
--echo # MDEV-30046 wrong row targeted with "insert ... on duplicate" and
--echo # "replace", leading to data corruption
--source include/have_innodb.inc
create table t (s blob, n int, unique (s)) engine=innodb;
insert into t values ('Hrecvx_0004ln-00',1), ('Hrecvx_0004mm-00',1);
replace into t values ('Hrecvx_0004mm-00',2);
select * from t;
drop table t;
create table t (s blob, n int, unique (s)) engine=innodb;
insert into t values ('Hrecvx_0004ln-00',1), ('Hrecvx_0004mm-00',1);
insert into t values ('Hrecvx_0004mm-00',2)
on duplicate key update n = values (n);
select * from t;
drop table t;
--echo #
--echo # End of 10.5 tests
--echo #
......@@ -4569,6 +4569,12 @@ uint handler::get_dup_key(int error)
DBUG_RETURN(errkey);
}
bool handler::has_dup_ref() const
{
DBUG_ASSERT(lookup_errkey != (uint)-1 || errkey != (uint)-1);
return ha_table_flags() & HA_DUPLICATE_POS || lookup_errkey != (uint)-1;
}
/**
Delete all files with extension from bas_ext().
......@@ -6996,11 +7002,8 @@ int handler::check_duplicate_long_entry_key(const uchar *new_rec, uint key_no)
if (error == HA_ERR_FOUND_DUPP_KEY)
{
table->file->lookup_errkey= key_no;
if (ha_table_flags() & HA_DUPLICATE_POS)
{
lookup_handler->position(table->record[0]);
memcpy(table->file->dup_ref, lookup_handler->ref, ref_length);
}
lookup_handler->position(table->record[0]);
memcpy(table->file->dup_ref, lookup_handler->ref, ref_length);
}
restore_record(table, file->lookup_buffer);
table->restore_blob_values(blob_storage);
......
......@@ -3464,6 +3464,7 @@ class handler :public Sql_alloc
virtual void print_error(int error, myf errflag);
virtual bool get_error_message(int error, String *buf);
uint get_dup_key(int error);
bool has_dup_ref() const;
/**
Retrieves the names of the table and the key for which there was a
duplicate entry in the case of HA_ERR_FOREIGN_DUPLICATE_KEY.
......
......@@ -1884,11 +1884,28 @@ int write_record(THD *thd, TABLE *table, COPY_INFO *info, select_result *sink)
if (info->handle_duplicates == DUP_REPLACE && table->next_number_field &&
key_nr == table->s->next_number_index && insert_id_for_cur_row > 0)
goto err;
if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
if (table->file->has_dup_ref())
{
/*
If engine doesn't support HA_DUPLICATE_POS, the handler may init to
INDEX, but dup_ref could also be set by lookup_handled (and then,
lookup_errkey is set, f.ex. long unique duplicate).
In such case, handler would stay uninitialized, so do it here.
*/
bool init_lookup_handler= table->file->lookup_errkey != (uint)-1 &&
table->file->inited == handler::NONE;
if (init_lookup_handler && table->file->ha_rnd_init_with_error(false))
goto err;
DBUG_ASSERT(table->file->inited == handler::RND);
if (table->file->ha_rnd_pos(table->record[1],table->file->dup_ref))
goto err;
int rnd_pos_err= table->file->ha_rnd_pos(table->record[1],
table->file->dup_ref);
if (init_lookup_handler)
table->file->ha_rnd_end();
if (rnd_pos_err)
goto err;
}
else
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment