Commit fe618de6 authored by Aleksey Midenkov's avatar Aleksey Midenkov

MDEV-31313 SYSTEM VERSIONING and FOREIGN KEY CASCADE create orphan rows on replica

Constraints processing row_ins_check_foreign_constraint() was not
called because row_upd_check_references_constraints() didn't see
update as delete: node->is_delete was false.

Since MDEV-30378 we check for TRG_EVENT_DELETE to detect versioned
delete in ha_innobase::update_row().

Now we can use TRG_EVENT_DELETE to set upd_node->is_delete, so
constraints processing is triggered correctly.
parent add0c01b
......@@ -91,6 +91,18 @@ begin
end~~
delimiter ;~~
delimiter ~~;
eval create or replace function check_row_slave(row_start $sys_datatype_expl, row_end $sys_datatype_expl)
returns varchar(255)
deterministic
begin
if current_row(row_end) then
return "CURRENT ROW";
end if;
return "HISTORICAL ROW";
end~~
delimiter ;~~
delimiter ~~;
eval create or replace function check_row_ts(row_start timestamp(6), row_end timestamp(6))
returns varchar(255)
......
......@@ -4,6 +4,7 @@ drop procedure if exists verify_trt;
drop procedure if exists verify_trt_dummy;
drop function if exists current_row;
drop function if exists check_row;
drop function if exists check_row_slave;
drop function if exists current_row_ts;
drop function if exists check_row_ts;
--enable_warnings
......
......@@ -188,4 +188,53 @@ connection slave;
include/diff_tables.inc [master:test.t1,slave:test.t1]
connection master;
drop table t1;
#
# MDEV-31313 SYSTEM VERSIONING and FOREIGN KEY CASCADE create orphan rows on replica
#
create table parent (
id int(11) not null auto_increment,
processdate datetime default null,
primary key (id)
) engine=innodb with system versioning;
set timestamp= unix_timestamp('2000-01-01 00:00:00');
insert into parent values (1, now());
create table child (
id int(11) not null auto_increment,
ch_name varchar(30),
andreid int(11) default null,
primary key (id),
key andreid (andreid),
constraint fk_andreid foreign key (andreid) references parent (id) on delete cascade
) engine=innodb with system versioning;
set timestamp= unix_timestamp('2000-01-01 00:00:01');
insert into child values (null, 'vimtomar', 1);
set timestamp= unix_timestamp('2000-01-01 00:00:02');
delete from parent where id = 1;
select check_row(row_start, row_end) from parent for system_time all;
check_row(row_start, row_end)
HISTORICAL ROW
select check_row(row_start, row_end) from child for system_time all;
check_row(row_start, row_end)
HISTORICAL ROW
select * from child;
id ch_name andreid
select * from parent;
id processdate
connection slave;
select check_row_slave(row_start, row_end) from parent for system_time all;
check_row_slave(row_start, row_end)
HISTORICAL ROW
select check_row_slave(row_start, row_end) from child for system_time all;
check_row_slave(row_start, row_end)
HISTORICAL ROW
select * from child;
id ch_name andreid
select * from parent;
id processdate
connection master;
set timestamp= default;
drop table child;
drop table parent;
connection slave;
connection master;
include/rpl_end.inc
--source suite/versioning/engines.inc
--source suite/versioning/common.inc
--source include/have_partition.inc
--source include/master-slave.inc
......@@ -6,6 +7,7 @@
#Testing command counters -BEFORE.
#Storing the before counts of Slave
connection slave;
--source suite/versioning/common.inc
let $slave_com_commit_before= query_get_value(SHOW GLOBAL STATUS LIKE 'com_commit', Value, 1);
let $slave_com_insert_before= query_get_value(SHOW GLOBAL STATUS LIKE 'com_insert', Value, 1);
let $slave_com_delete_before= query_get_value(SHOW GLOBAL STATUS LIKE 'com_delete', Value, 1);
......@@ -167,4 +169,55 @@ sync_slave_with_master;
connection master;
drop table t1;
--echo #
--echo # MDEV-31313 SYSTEM VERSIONING and FOREIGN KEY CASCADE create orphan rows on replica
--echo #
create table parent (
id int(11) not null auto_increment,
processdate datetime default null,
primary key (id)
) engine=innodb with system versioning;
set timestamp= unix_timestamp('2000-01-01 00:00:00');
insert into parent values (1, now());
create table child (
id int(11) not null auto_increment,
ch_name varchar(30),
andreid int(11) default null,
primary key (id),
key andreid (andreid),
constraint fk_andreid foreign key (andreid) references parent (id) on delete cascade
) engine=innodb with system versioning;
set timestamp= unix_timestamp('2000-01-01 00:00:01');
insert into child values (null, 'vimtomar', 1);
set timestamp= unix_timestamp('2000-01-01 00:00:02');
delete from parent where id = 1;
select check_row(row_start, row_end) from parent for system_time all;
select check_row(row_start, row_end) from child for system_time all;
select * from child;
select * from parent;
sync_slave_with_master;
# Annoying tweaking of microseconds in slave row_end, so row_end can be <= row_start
select check_row_slave(row_start, row_end) from parent for system_time all;
select check_row_slave(row_start, row_end) from child for system_time all;
select * from child;
select * from parent;
# Cleanup
--source suite/versioning/common_finish.inc
--connection master
set timestamp= default;
drop table child;
drop table parent;
sync_slave_with_master;
connection master;
--source suite/versioning/common_finish.inc
--source include/rpl_end.inc
......@@ -14759,6 +14759,11 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
return error;
}
const bool history_change= m_table->versioned() ?
!m_table->vers_end_field()->is_max() : false;
TABLE_LIST *tl= m_table->pos_in_table_list;
uint8 trg_event_map_save= tl->trg_event_map;
/*
This is the situation after locating BI:
......@@ -14816,9 +14821,17 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
goto err;
}
if (m_vers_from_plain && m_table->versioned(VERS_TIMESTAMP))
m_table->vers_update_fields();
if (m_table->versioned())
{
if (m_vers_from_plain && m_table->versioned(VERS_TIMESTAMP))
m_table->vers_update_fields();
if (!history_change && !m_table->vers_end_field()->is_max())
{
tl->trg_event_map|= trg2bit(TRG_EVENT_DELETE);
}
}
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
tl->trg_event_map= trg_event_map_save;
if (unlikely(error == HA_ERR_RECORD_IS_THE_SAME))
error= 0;
if (m_vers_from_plain && m_table->versioned(VERS_TIMESTAMP))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment