Commit 932ec586 authored by Aleksey Midenkov's avatar Aleksey Midenkov

MDEV-23644 Assertion on evaluating foreign referential action for...

MDEV-23644 Assertion on evaluating foreign referential action for self-reference in system versioned table

First part of the fix (row0mysql.cc) addresses external columns when adding history
row on referential action. The full data must be retrieved before the
row is inserted.

Second part of the fix (the rest) avoids duplicate primary key error between
the history row generated on referential action and the history row
generated by SQL command. Both command and referential action can
happen on same table since foreign key can be self-reference (parent
and child tables are same). Moreover, the self-reference can refer
multiple rows when the key is non-unique. In such case history is
generated by referential action occured on first row but processed all
rows by a matched key. The second round is when the next row is
processed by a command but history already exists. In such case we
check TRX_ID of existing history row and if it is the same we assume
the above situation and skip adding one more history row or failing
the command.
parent 7410ff43
...@@ -70,6 +70,11 @@ returns int ...@@ -70,6 +70,11 @@ returns int
deterministic deterministic
return sys_trx_end = $sys_datatype_max; return sys_trx_end = $sys_datatype_max;
eval create or replace function current_row_ts(sys_trx_end timestamp(6))
returns int
deterministic
return convert_tz(sys_trx_end, '+00:00', @@time_zone) = TIMESTAMP'2038-01-19 03:14:07.999999';
delimiter ~~; delimiter ~~;
eval create or replace function check_row(row_start $sys_datatype_expl, row_end $sys_datatype_expl) eval create or replace function check_row(row_start $sys_datatype_expl, row_end $sys_datatype_expl)
returns varchar(255) returns varchar(255)
...@@ -86,4 +91,20 @@ begin ...@@ -86,4 +91,20 @@ begin
end~~ end~~
delimiter ;~~ delimiter ;~~
delimiter ~~;
eval create or replace function check_row_ts(row_start timestamp(6), row_end timestamp(6))
returns varchar(255)
deterministic
begin
if row_end < row_start then
return "ERROR: row_end < row_start";
elseif row_end = row_start then
return "ERROR: row_end == row_start";
elseif current_row_ts(row_end) then
return "CURRENT ROW";
end if;
return "HISTORICAL ROW";
end~~
delimiter ;~~
--enable_query_log --enable_query_log
...@@ -4,5 +4,7 @@ drop procedure if exists verify_trt; ...@@ -4,5 +4,7 @@ drop procedure if exists verify_trt;
drop procedure if exists verify_trt_dummy; drop procedure if exists verify_trt_dummy;
drop function if exists current_row; drop function if exists current_row;
drop function if exists check_row; drop function if exists check_row;
drop function if exists current_row_ts;
drop function if exists check_row_ts;
--enable_warnings --enable_warnings
--enable_query_log --enable_query_log
...@@ -138,13 +138,13 @@ f1 int, f2 text, f3 int, fulltext (f2), key(f1), key(f3), ...@@ -138,13 +138,13 @@ f1 int, f2 text, f3 int, fulltext (f2), key(f1), key(f3),
foreign key r (f3) references t1 (f1) on delete set null) foreign key r (f3) references t1 (f1) on delete set null)
with system versioning engine innodb; with system versioning engine innodb;
insert into t1 values (1, repeat('a', 8193), 1), (1, repeat('b', 8193), 1); insert into t1 values (1, repeat('a', 8193), 1), (1, repeat('b', 8193), 1);
select f1, f3, check_row(row_start, row_end) from t1; select f1, f3, check_row_ts(row_start, row_end) from t1;
f1 f3 check_row(row_start, row_end) f1 f3 check_row_ts(row_start, row_end)
1 1 CURRENT ROW 1 1 CURRENT ROW
1 1 CURRENT ROW 1 1 CURRENT ROW
delete from t1; delete from t1;
select f1, f3, check_row(row_start, row_end) from t1 for system_time all; select f1, f3, check_row_ts(row_start, row_end) from t1 for system_time all;
f1 f3 check_row(row_start, row_end) f1 f3 check_row_ts(row_start, row_end)
1 1 HISTORICAL ROW 1 1 HISTORICAL ROW
1 NULL ERROR: row_end == row_start 1 NULL ERROR: row_end == row_start
1 1 HISTORICAL ROW 1 1 HISTORICAL ROW
......
...@@ -398,6 +398,8 @@ Warning 1265 Data truncated for column 'f12' at row 7 ...@@ -398,6 +398,8 @@ Warning 1265 Data truncated for column 'f12' at row 7
SET timestamp = 9; SET timestamp = 9;
REPLACE INTO t2 SELECT * FROM t2; REPLACE INTO t2 SELECT * FROM t2;
DROP TABLE t1, t2; DROP TABLE t1, t2;
set timestamp= default;
set time_zone='+00:00';
# #
# MDEV-16210 FK constraints on versioned tables use historical rows, which may cause constraint violation # MDEV-16210 FK constraints on versioned tables use historical rows, which may cause constraint violation
# #
...@@ -427,3 +429,17 @@ insert into t2 values (1), (1); ...@@ -427,3 +429,17 @@ insert into t2 values (1), (1);
# DELETE from foreign table is allowed # DELETE from foreign table is allowed
delete from t2; delete from t2;
drop tables t2, t1; drop tables t2, t1;
#
# MDEV-23644 Assertion on evaluating foreign referential action for self-reference in system versioned table
#
create table t1 (pk int primary key, f1 int,f2 int, f3 text,
key(f1), fulltext(f3), key(f3(10)),
foreign key (f2) references t1 (f1) on delete set null
) engine=innodb with system versioning;
insert into t1 values (1, 8, 8, 'SHORT'), (2, 8, 8, repeat('LONG', 8071));
delete from t1;
select pk, f1, f2, left(f3, 4), check_row_ts(row_start, row_end) from t1 for system_time all order by pk;
pk f1 f2 left(f3, 4) check_row_ts(row_start, row_end)
1 8 8 SHOR HISTORICAL ROW
2 8 8 LONG HISTORICAL ROW
drop table t1;
...@@ -5,7 +5,15 @@ sys_trx_start bigint(20) unsigned as row start invisible, ...@@ -5,7 +5,15 @@ sys_trx_start bigint(20) unsigned as row start invisible,
sys_trx_end bigint(20) unsigned as row end invisible, sys_trx_end bigint(20) unsigned as row end invisible,
period for system_time (sys_trx_start, sys_trx_end) period for system_time (sys_trx_start, sys_trx_end)
) with system versioning; ) with system versioning;
# No history inside the transaction
start transaction;
insert into t1 (x) values (1); insert into t1 (x) values (1);
update t1 set x= x + 1;
update t1 set x= x + 1;
commit;
select *, sys_trx_start > 1, sys_trx_end from t1 for system_time all;
x sys_trx_start > 1 sys_trx_end
3 1 18446744073709551615
# ALTER ADD SYSTEM VERSIONING should write to mysql.transaction_registry # ALTER ADD SYSTEM VERSIONING should write to mysql.transaction_registry
set @@system_versioning_alter_history=keep; set @@system_versioning_alter_history=keep;
create or replace table t1 (x int); create or replace table t1 (x int);
......
...@@ -102,9 +102,9 @@ create table t1 ( ...@@ -102,9 +102,9 @@ create table t1 (
foreign key r (f3) references t1 (f1) on delete set null) foreign key r (f3) references t1 (f1) on delete set null)
with system versioning engine innodb; with system versioning engine innodb;
insert into t1 values (1, repeat('a', 8193), 1), (1, repeat('b', 8193), 1); insert into t1 values (1, repeat('a', 8193), 1), (1, repeat('b', 8193), 1);
select f1, f3, check_row(row_start, row_end) from t1; select f1, f3, check_row_ts(row_start, row_end) from t1;
delete from t1; delete from t1;
select f1, f3, check_row(row_start, row_end) from t1 for system_time all; select f1, f3, check_row_ts(row_start, row_end) from t1 for system_time all;
# cleanup # cleanup
drop table t1; drop table t1;
......
...@@ -421,6 +421,8 @@ REPLACE INTO t2 SELECT * FROM t2; ...@@ -421,6 +421,8 @@ REPLACE INTO t2 SELECT * FROM t2;
# Cleanup # Cleanup
DROP TABLE t1, t2; DROP TABLE t1, t2;
set timestamp= default;
set time_zone='+00:00';
--let $datadir= `select @@datadir` --let $datadir= `select @@datadir`
--remove_file $datadir/test/t1.data --remove_file $datadir/test/t1.data
--remove_file $datadir/test/t1.data.2 --remove_file $datadir/test/t1.data.2
...@@ -458,4 +460,20 @@ insert into t2 values (1), (1); ...@@ -458,4 +460,20 @@ insert into t2 values (1), (1);
delete from t2; delete from t2;
drop tables t2, t1; drop tables t2, t1;
--echo #
--echo # MDEV-23644 Assertion on evaluating foreign referential action for self-reference in system versioned table
--echo #
create table t1 (pk int primary key, f1 int,f2 int, f3 text,
key(f1), fulltext(f3), key(f3(10)),
foreign key (f2) references t1 (f1) on delete set null
) engine=innodb with system versioning;
insert into t1 values (1, 8, 8, 'SHORT'), (2, 8, 8, repeat('LONG', 8071));
delete from t1;
select pk, f1, f2, left(f3, 4), check_row_ts(row_start, row_end) from t1 for system_time all order by pk;
# cleanup
drop table t1;
--source suite/versioning/common_finish.inc --source suite/versioning/common_finish.inc
...@@ -14,7 +14,13 @@ create or replace table t1 ( ...@@ -14,7 +14,13 @@ create or replace table t1 (
period for system_time (sys_trx_start, sys_trx_end) period for system_time (sys_trx_start, sys_trx_end)
) with system versioning; ) with system versioning;
--echo # No history inside the transaction
start transaction;
insert into t1 (x) values (1); insert into t1 (x) values (1);
update t1 set x= x + 1;
update t1 set x= x + 1;
commit;
select *, sys_trx_start > 1, sys_trx_end from t1 for system_time all;
--echo # ALTER ADD SYSTEM VERSIONING should write to mysql.transaction_registry --echo # ALTER ADD SYSTEM VERSIONING should write to mysql.transaction_registry
set @@system_versioning_alter_history=keep; set @@system_versioning_alter_history=keep;
......
...@@ -246,7 +246,15 @@ int TABLE::delete_row() ...@@ -246,7 +246,15 @@ int TABLE::delete_row()
store_record(this, record[1]); store_record(this, record[1]);
vers_update_end(); vers_update_end();
return file->ha_update_row(record[1], record[0]); int err= file->ha_update_row(record[1], record[0]);
/*
MDEV-23644: we get HA_ERR_FOREIGN_DUPLICATE_KEY iff we already got history
row with same trx_id which is the result of foreign key action, so we
don't need one more history row.
*/
if (err == HA_ERR_FOREIGN_DUPLICATE_KEY)
return file->ha_delete_row(record[0]);
return err;
} }
......
...@@ -738,7 +738,7 @@ void ...@@ -738,7 +738,7 @@ void
dtuple_convert_back_big_rec( dtuple_convert_back_big_rec(
/*========================*/ /*========================*/
dict_index_t* index MY_ATTRIBUTE((unused)), /*!< in: index */ dict_index_t* index MY_ATTRIBUTE((unused)), /*!< in: index */
dtuple_t* entry, /*!< in: entry whose data was put to vector */ dtuple_t* entry, /*!< in/out: entry whose data was put to vector */
big_rec_t* vector) /*!< in, own: big rec vector; it is big_rec_t* vector) /*!< in, own: big rec vector; it is
freed in this function */ freed in this function */
{ {
......
...@@ -543,6 +543,16 @@ struct dtuple_t { ...@@ -543,6 +543,16 @@ struct dtuple_t {
inserted or updated. inserted or updated.
@param[in] index index possibly with instantly added columns */ @param[in] index index possibly with instantly added columns */
void trim(const dict_index_t& index); void trim(const dict_index_t& index);
bool vers_history_row() const
{
for (ulint i = 0; i < n_fields; i++) {
const dfield_t* field = &fields[i];
if (field->type.vers_sys_end()) {
return field->vers_history_row();
}
}
return false;
}
}; };
inline ulint dtuple_get_n_fields(const dtuple_t* tuple) inline ulint dtuple_get_n_fields(const dtuple_t* tuple)
......
...@@ -2385,6 +2385,18 @@ row_ins_duplicate_error_in_clust( ...@@ -2385,6 +2385,18 @@ row_ins_duplicate_error_in_clust(
duplicate: duplicate:
trx->error_info = cursor->index; trx->error_info = cursor->index;
err = DB_DUPLICATE_KEY; err = DB_DUPLICATE_KEY;
if (cursor->index->table->versioned()
&& entry->vers_history_row())
{
ulint trx_id_len;
byte *trx_id = rec_get_nth_field(
rec, offsets, n_unique,
&trx_id_len);
ut_ad(trx_id_len == DATA_TRX_ID_LEN);
if (trx->id == trx_read_trx_id(trx_id)) {
err = DB_FOREIGN_DUPLICATE_KEY;
}
}
goto func_exit; goto func_exit;
} }
} }
......
...@@ -2125,6 +2125,7 @@ static dberr_t row_update_vers_insert(que_thr_t* thr, upd_node_t* node) ...@@ -2125,6 +2125,7 @@ static dberr_t row_update_vers_insert(que_thr_t* thr, upd_node_t* node)
dfield_t* row_end; dfield_t* row_end;
char row_end_data[8]; char row_end_data[8];
dict_table_t* table = node->table; dict_table_t* table = node->table;
page_size_t page_size= dict_table_page_size(table);
ut_ad(table->versioned()); ut_ad(table->versioned());
dtuple_t* row; dtuple_t* row;
...@@ -2152,9 +2153,27 @@ static dberr_t row_update_vers_insert(que_thr_t* thr, upd_node_t* node) ...@@ -2152,9 +2153,27 @@ static dberr_t row_update_vers_insert(que_thr_t* thr, upd_node_t* node)
ut_ad(n_cols > DATA_N_SYS_COLS); ut_ad(n_cols > DATA_N_SYS_COLS);
// Exclude DB_ROW_ID, DB_TRX_ID, DB_ROLL_PTR // Exclude DB_ROW_ID, DB_TRX_ID, DB_ROLL_PTR
for (ulint i = 0; i < n_cols - DATA_N_SYS_COLS; i++) { for (ulint i = 0; i < n_cols - DATA_N_SYS_COLS; i++) {
dfield_t *dst= dtuple_get_nth_field(row, i);
dfield_t *src= dtuple_get_nth_field(node->historical_row, i); dfield_t *src= dtuple_get_nth_field(node->historical_row, i);
dfield_t *dst= dtuple_get_nth_field(row, i);
dfield_copy(dst, src); dfield_copy(dst, src);
if (dfield_is_ext(src)) {
byte *field_data
= static_cast<byte*>(dfield_get_data(src));
ulint ext_len;
ulint field_len = dfield_get_len(src);
ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
ut_a(memcmp(field_data + field_len
- BTR_EXTERN_FIELD_REF_SIZE,
field_ref_zero,
BTR_EXTERN_FIELD_REF_SIZE));
byte *data = btr_copy_externally_stored_field(
&ext_len, field_data, page_size, field_len,
node->historical_heap);
dfield_set_data(dst, data, ext_len);
}
} }
for (ulint i = 0; i < n_v_cols; i++) { for (ulint i = 0; i < n_v_cols; i++) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment