Commit 706a8232 authored by Aleksey Midenkov's avatar Aleksey Midenkov

MDEV-25477 Auto-create breaks replication when triggering event was not replicated

If UPDATE/DELETE does not change data it is skipped from
replication. We now force replication of such events when they trigger
partition auto-creation.

For ROLLBACK it is as simple as set OPTION_KEEP_LOG
flag. trans_cannot_safely_rollback() does the rest.

For UPDATE/DELETE .. LIMIT 0 we make additional binlog_query() calls
at the early points of return.

As a safety measure we also convert row format into statement if it is
needed. The condition is decided by
binlog_need_stmt_format(). Basically if there are some row events in
cache we don't need that: table open of row event will trigger
auto-creation anyway.

Multi-update/delete works via mysql_select(). There is no early points
of return, so binlogging is always checked by
send_eof()/abort_resultset(). But we must comply with the above
measure of converting into statement.
parent 92bfc0e8
...@@ -192,4 +192,196 @@ PARTITIONS 3 ...@@ -192,4 +192,196 @@ PARTITIONS 3
connection master; connection master;
drop table t1; drop table t1;
set timestamp= default; set timestamp= default;
#
# MDEV-25477 Auto-create breaks replication when triggering event was not replicated
#
set timestamp= unix_timestamp('2001-01-01 01:00:00');
# ROLLBACK
create table t (a int) with system versioning
partition by system_time interval 1 hour auto;
insert into t values (1), (2);
set @@timestamp= @@timestamp + 3601;
start transaction;
delete from t;
rollback;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) DEFAULT NULL
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 01:00:00' AUTO
PARTITIONS 3
connection slave;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) DEFAULT NULL
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 01:00:00' AUTO
PARTITIONS 3
connection master;
alter table t drop partition p0;
connection slave;
# INSERT .. ODKU
connection master;
create or replace table t (a int primary key) with system versioning
partition by system_time interval 1 hour auto;
insert into t values (1), (2);
set @@timestamp= @@timestamp + 3601;
insert into t values (1) on duplicate key update a= a;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) NOT NULL,
PRIMARY KEY (`a`)
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 02:00:00' AUTO
PARTITIONS 3
connection slave;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) NOT NULL,
PRIMARY KEY (`a`)
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 02:00:00' AUTO
PARTITIONS 3
connection master;
alter table t drop partition p0;
connection slave;
# INSERT .. SELECT .. ODKU
connection master;
create or replace table t (a int primary key) with system versioning
partition by system_time interval 1 hour auto;
insert into t values (1), (2);
set @@timestamp= @@timestamp + 3601;
call mtr.add_suppression("Unsafe statement written to the binary log");
insert t select a from t where a = 1 limit 0 on duplicate key update a= 1;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) NOT NULL,
PRIMARY KEY (`a`)
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 03:00:00' AUTO
PARTITIONS 3
connection slave;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) NOT NULL,
PRIMARY KEY (`a`)
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 03:00:00' AUTO
PARTITIONS 3
connection master;
alter table t drop partition p0;
connection slave;
# UPDATE
connection master;
create or replace table t (a int) with system versioning
partition by system_time interval 1 hour auto;
insert into t values (1), (2);
set @@timestamp= @@timestamp + 3601;
update t set a= 3 limit 0;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) DEFAULT NULL
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 04:00:00' AUTO
PARTITIONS 3
connection slave;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) DEFAULT NULL
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 04:00:00' AUTO
PARTITIONS 3
connection master;
alter table t drop partition p0;
connection slave;
# DELETE
connection master;
create or replace table t (a int) with system versioning
partition by system_time interval 1 hour auto;
insert into t values (1), (2);
set @@timestamp= @@timestamp + 3601;
delete from t limit 0;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) DEFAULT NULL
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 05:00:00' AUTO
PARTITIONS 3
connection slave;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) DEFAULT NULL
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 05:00:00' AUTO
PARTITIONS 3
connection master;
alter table t drop partition p0;
connection slave;
# Multi-update
connection master;
create or replace table t (a int) with system versioning
partition by system_time interval 1 hour auto;
create or replace table t2 (b int);
insert into t values (0), (1);
insert into t2 values (10), (20);
set @@timestamp= @@timestamp + 3601;
update t left join t2 on a > b set a= 4;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) DEFAULT NULL
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 06:00:00' AUTO
PARTITIONS 3
connection slave;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) DEFAULT NULL
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 06:00:00' AUTO
PARTITIONS 3
connection master;
alter table t drop partition p0;
connection slave;
# Multi-delete
connection master;
create or replace table t (a int) with system versioning
partition by system_time interval 1 hour auto;
create or replace table t2 (b int);
insert into t values (0), (1);
insert into t2 values (10), (20);
set @@timestamp= @@timestamp + 3601;
delete t, t2 from t join t2 where a > b;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) DEFAULT NULL
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 07:00:00' AUTO
PARTITIONS 3
connection slave;
show create table t;
Table Create Table
t CREATE TABLE `t` (
`a` int(11) DEFAULT NULL
) ENGINE=ENGINE DEFAULT CHARSET=latin1 WITH SYSTEM VERSIONING
PARTITION BY SYSTEM_TIME INTERVAL 1 HOUR STARTS TIMESTAMP'2001-01-01 07:00:00' AUTO
PARTITIONS 3
connection master;
alter table t drop partition p0;
connection slave;
connection master;
drop tables t, t2;
set timestamp= default;
include/rpl_end.inc include/rpl_end.inc
...@@ -152,4 +152,148 @@ show create table t1; ...@@ -152,4 +152,148 @@ show create table t1;
drop table t1; drop table t1;
set timestamp= default; set timestamp= default;
--echo #
--echo # MDEV-25477 Auto-create breaks replication when triggering event was not replicated
--echo #
set timestamp= unix_timestamp('2001-01-01 01:00:00');
--echo # ROLLBACK
create table t (a int) with system versioning
partition by system_time interval 1 hour auto;
insert into t values (1), (2);
set @@timestamp= @@timestamp + 3601;
start transaction;
delete from t;
--disable_warnings
rollback;
# Warning: Some non-transactional changed tables couldn't be rolled back
--enable_warnings
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--sync_slave_with_master
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--connection master
alter table t drop partition p0;
--sync_slave_with_master
--echo # INSERT .. ODKU
--connection master
create or replace table t (a int primary key) with system versioning
partition by system_time interval 1 hour auto;
insert into t values (1), (2);
set @@timestamp= @@timestamp + 3601;
insert into t values (1) on duplicate key update a= a;
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--sync_slave_with_master
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--connection master
alter table t drop partition p0;
--sync_slave_with_master
--echo # INSERT .. SELECT .. ODKU
--connection master
create or replace table t (a int primary key) with system versioning
partition by system_time interval 1 hour auto;
insert into t values (1), (2);
set @@timestamp= @@timestamp + 3601;
--disable_warnings
call mtr.add_suppression("Unsafe statement written to the binary log");
insert t select a from t where a = 1 limit 0 on duplicate key update a= 1;
--enable_warnings
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--sync_slave_with_master
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--connection master
alter table t drop partition p0;
--sync_slave_with_master
--echo # UPDATE
--connection master
create or replace table t (a int) with system versioning
partition by system_time interval 1 hour auto;
insert into t values (1), (2);
set @@timestamp= @@timestamp + 3601;
update t set a= 3 limit 0;
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--sync_slave_with_master
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--connection master
alter table t drop partition p0;
--sync_slave_with_master
--echo # DELETE
--connection master
create or replace table t (a int) with system versioning
partition by system_time interval 1 hour auto;
insert into t values (1), (2);
set @@timestamp= @@timestamp + 3601;
delete from t limit 0;
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--sync_slave_with_master
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--connection master
alter table t drop partition p0;
--sync_slave_with_master
--echo # Multi-update
--connection master
create or replace table t (a int) with system versioning
partition by system_time interval 1 hour auto;
create or replace table t2 (b int);
insert into t values (0), (1);
insert into t2 values (10), (20);
set @@timestamp= @@timestamp + 3601;
# Note: limit 0 is not important for multi-update/delete because they work
# via mysql_select(). OTOH limit 0 makes unwanted "unsafe" warning.
update t left join t2 on a > b set a= 4;
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--sync_slave_with_master
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--connection master
alter table t drop partition p0;
--sync_slave_with_master
--echo # Multi-delete
--connection master
create or replace table t (a int) with system versioning
partition by system_time interval 1 hour auto;
create or replace table t2 (b int);
insert into t values (0), (1);
insert into t2 values (10), (20);
set @@timestamp= @@timestamp + 3601;
delete t, t2 from t join t2 where a > b;
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--sync_slave_with_master
--replace_result InnoDB ENGINE MyISAM ENGINE MEMORY ENGINE
show create table t;
--connection master
alter table t drop partition p0;
--sync_slave_with_master
--connection master
drop tables t, t2;
set timestamp= default;
--source include/rpl_end.inc --source include/rpl_end.inc
...@@ -921,6 +921,8 @@ bool partition_info::vers_set_hist_part(THD *thd, uint *create_count) ...@@ -921,6 +921,8 @@ bool partition_info::vers_set_hist_part(THD *thd, uint *create_count)
/** /**
@brief Run fast_alter_partition_table() to add new history partitions @brief Run fast_alter_partition_table() to add new history partitions
for tables requiring them. for tables requiring them.
@param num_parts Number of partitions to create
*/ */
bool vers_create_partitions(THD *thd, TABLE_LIST* tl, uint num_parts) bool vers_create_partitions(THD *thd, TABLE_LIST* tl, uint num_parts)
{ {
...@@ -937,6 +939,7 @@ bool vers_create_partitions(THD *thd, TABLE_LIST* tl, uint num_parts) ...@@ -937,6 +939,7 @@ bool vers_create_partitions(THD *thd, TABLE_LIST* tl, uint num_parts)
TABLE *table= tl->table; TABLE *table= tl->table;
DBUG_ASSERT(!thd->is_error()); DBUG_ASSERT(!thd->is_error());
DBUG_ASSERT(num_parts);
{ {
DBUG_ASSERT(table->s->get_table_ref_type() == TABLE_REF_BASE_TABLE); DBUG_ASSERT(table->s->get_table_ref_type() == TABLE_REF_BASE_TABLE);
...@@ -1025,6 +1028,7 @@ bool vers_create_partitions(THD *thd, TABLE_LIST* tl, uint num_parts) ...@@ -1025,6 +1028,7 @@ bool vers_create_partitions(THD *thd, TABLE_LIST* tl, uint num_parts)
// NOTE: we have to return DA_EMPTY for new command // NOTE: we have to return DA_EMPTY for new command
DBUG_ASSERT(thd->get_stmt_da()->is_ok()); DBUG_ASSERT(thd->get_stmt_da()->is_ok());
thd->get_stmt_da()->reset_diagnostics_area(); thd->get_stmt_da()->reset_diagnostics_area();
thd->variables.option_bits|= OPTION_BINLOG_THIS;
exit: exit:
thd->work_part_info= save_part_info; thd->work_part_info= save_part_info;
......
...@@ -1674,6 +1674,7 @@ bool TABLE::vers_switch_partition(THD *thd, TABLE_LIST *table_list, ...@@ -1674,6 +1674,7 @@ bool TABLE::vers_switch_partition(THD *thd, TABLE_LIST *table_list,
{ {
switch (thd->lex->sql_command) switch (thd->lex->sql_command)
{ {
case SQLCOM_INSERT_SELECT:
case SQLCOM_INSERT: case SQLCOM_INSERT:
if (thd->lex->duplicates != DUP_UPDATE) if (thd->lex->duplicates != DUP_UPDATE)
return false; return false;
......
...@@ -7327,6 +7327,26 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) ...@@ -7327,6 +7327,26 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
} }
/*
DML that doesn't change the table normally is not logged,
but it needs to be logged if it auto-created a partition as a side effect.
*/
bool THD::binlog_for_noop_dml(bool transactional_table)
{
if (log_current_statement())
{
reset_unsafe_warnings();
if (binlog_query(THD::STMT_QUERY_TYPE, query(), query_length(),
transactional_table, FALSE, FALSE, 0) > 0)
{
my_error(ER_ERROR_ON_WRITE, MYF(0), "binary log", -1);
return true;
}
}
return false;
}
#if !defined(DBUG_OFF) && !defined(_lint) #if !defined(DBUG_OFF) && !defined(_lint)
static const char * static const char *
show_query_type(THD::enum_binlog_query_type qtype) show_query_type(THD::enum_binlog_query_type qtype)
......
...@@ -2918,6 +2918,14 @@ class THD: public THD_count, /* this must be first */ ...@@ -2918,6 +2918,14 @@ class THD: public THD_count, /* this must be first */
int binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional); int binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional);
int binlog_remove_pending_rows_event(bool clear_maps, bool is_transactional); int binlog_remove_pending_rows_event(bool clear_maps, bool is_transactional);
bool binlog_need_stmt_format(bool is_transactional) const
{
return log_current_statement() &&
!binlog_get_pending_rows_event(is_transactional);
}
bool binlog_for_noop_dml(bool transactional_table);
/** /**
Determine the binlog format of the current statement. Determine the binlog format of the current statement.
...@@ -7738,24 +7746,25 @@ class Sp_eval_expr_state ...@@ -7738,24 +7746,25 @@ class Sp_eval_expr_state
void dbug_serve_apcs(THD *thd, int n_calls); void dbug_serve_apcs(THD *thd, int n_calls);
#endif #endif
class ScopedStatementReplication class StatementBinlog
{ {
const enum_binlog_format saved_binlog_format;
THD *const thd;
public: public:
ScopedStatementReplication(THD *thd) : StatementBinlog(THD *thd, bool need_stmt) :
saved_binlog_format(thd saved_binlog_format(thd->get_current_stmt_binlog_format()),
? thd->set_current_stmt_binlog_format_stmt()
: BINLOG_FORMAT_MIXED),
thd(thd) thd(thd)
{}
~ScopedStatementReplication()
{ {
if (thd) if (need_stmt && saved_binlog_format != BINLOG_FORMAT_STMT)
thd->restore_stmt_binlog_format(saved_binlog_format); {
thd->set_current_stmt_binlog_format_stmt();
}
}
~StatementBinlog()
{
thd->set_current_stmt_binlog_format(saved_binlog_format);
} }
private:
const enum_binlog_format saved_binlog_format;
THD *const thd;
}; };
......
...@@ -450,6 +450,7 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, ...@@ -450,6 +450,7 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
*/ */
has_triggers= table->triggers && table->triggers->has_delete_triggers(); has_triggers= table->triggers && table->triggers->has_delete_triggers();
transactional_table= table->file->has_transactions_and_rollback();
if (!returning && !using_limit && const_cond_result && if (!returning && !using_limit && const_cond_result &&
(!thd->is_current_stmt_binlog_format_row() && !has_triggers) (!thd->is_current_stmt_binlog_format_row() && !has_triggers)
...@@ -508,6 +509,9 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, ...@@ -508,6 +509,9 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
if (thd->lex->describe || thd->lex->analyze_stmt) if (thd->lex->describe || thd->lex->analyze_stmt)
goto produce_explain_and_leave; goto produce_explain_and_leave;
if (thd->binlog_for_noop_dml(transactional_table))
DBUG_RETURN(1);
my_ok(thd, 0); my_ok(thd, 0);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -538,6 +542,10 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, ...@@ -538,6 +542,10 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
*/ */
if (unlikely(thd->is_error())) if (unlikely(thd->is_error()))
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
if (thd->binlog_for_noop_dml(transactional_table))
DBUG_RETURN(1);
my_ok(thd, 0); my_ok(thd, 0);
DBUG_RETURN(0); // Nothing to delete DBUG_RETURN(0); // Nothing to delete
} }
...@@ -916,14 +924,14 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, ...@@ -916,14 +924,14 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
deltempfile=NULL; deltempfile=NULL;
delete select; delete select;
select= NULL; select= NULL;
transactional_table= table->file->has_transactions_and_rollback();
if (!transactional_table && deleted > 0) if (!transactional_table && deleted > 0)
thd->transaction->stmt.modified_non_trans_table= thd->transaction->stmt.modified_non_trans_table=
thd->transaction->all.modified_non_trans_table= TRUE; thd->transaction->all.modified_non_trans_table= TRUE;
/* See similar binlogging code in sql_update.cc, for comments */ /* See similar binlogging code in sql_update.cc, for comments */
if (likely((error < 0) || thd->transaction->stmt.modified_non_trans_table)) if (likely((error < 0) || thd->transaction->stmt.modified_non_trans_table
|| thd->log_current_statement()))
{ {
if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
{ {
...@@ -933,8 +941,8 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds, ...@@ -933,8 +941,8 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
else else
errcode= query_error_code(thd, killed_status == NOT_KILLED); errcode= query_error_code(thd, killed_status == NOT_KILLED);
ScopedStatementReplication scoped_stmt_rpl( StatementBinlog stmt_binlog(thd, table->versioned(VERS_TRX_ID) ||
table->versioned(VERS_TRX_ID) ? thd : NULL); thd->binlog_need_stmt_format(transactional_table));
/* /*
[binlog]: If 'handler::delete_all_rows()' was called and the [binlog]: If 'handler::delete_all_rows()' was called and the
storage engine does not inject the rows itself, we replicate storage engine does not inject the rows itself, we replicate
...@@ -1438,13 +1446,15 @@ void multi_delete::abort_result_set() ...@@ -1438,13 +1446,15 @@ void multi_delete::abort_result_set()
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
if (thd->transaction->stmt.modified_non_trans_table) if (thd->transaction->stmt.modified_non_trans_table ||
thd->log_current_statement())
{ {
/* /*
there is only side effects; to binlog with the error there is only side effects; to binlog with the error
*/ */
if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
{ {
StatementBinlog stmt_binlog(thd, thd->binlog_need_stmt_format(transactional_tables));
int errcode= query_error_code(thd, thd->killed == NOT_KILLED); int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
/* possible error of writing binary log is ignored deliberately */ /* possible error of writing binary log is ignored deliberately */
(void) thd->binlog_query(THD::ROW_QUERY_TYPE, (void) thd->binlog_query(THD::ROW_QUERY_TYPE,
...@@ -1618,7 +1628,8 @@ bool multi_delete::send_eof() ...@@ -1618,7 +1628,8 @@ bool multi_delete::send_eof()
query_cache_invalidate3(thd, delete_tables, 1); query_cache_invalidate3(thd, delete_tables, 1);
} }
if (likely((local_error == 0) || if (likely((local_error == 0) ||
thd->transaction->stmt.modified_non_trans_table)) thd->transaction->stmt.modified_non_trans_table) ||
thd->log_current_statement())
{ {
if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
{ {
...@@ -1628,6 +1639,7 @@ bool multi_delete::send_eof() ...@@ -1628,6 +1639,7 @@ bool multi_delete::send_eof()
else else
errcode= query_error_code(thd, killed_status == NOT_KILLED); errcode= query_error_code(thd, killed_status == NOT_KILLED);
thd->thread_specific_used= TRUE; thd->thread_specific_used= TRUE;
StatementBinlog stmt_binlog(thd, thd->binlog_need_stmt_format(transactional_tables));
if (unlikely(thd->binlog_query(THD::ROW_QUERY_TYPE, if (unlikely(thd->binlog_query(THD::ROW_QUERY_TYPE,
thd->query(), thd->query_length(), thd->query(), thd->query_length(),
transactional_tables, FALSE, FALSE, transactional_tables, FALSE, FALSE,
......
...@@ -1207,6 +1207,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, ...@@ -1207,6 +1207,7 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
if (error <= 0 || if (error <= 0 ||
thd->transaction->stmt.modified_non_trans_table || thd->transaction->stmt.modified_non_trans_table ||
thd->log_current_statement() ||
was_insert_delayed) was_insert_delayed)
{ {
if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
...@@ -1229,8 +1230,8 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list, ...@@ -1229,8 +1230,8 @@ bool mysql_insert(THD *thd, TABLE_LIST *table_list,
else else
errcode= query_error_code(thd, thd->killed == NOT_KILLED); errcode= query_error_code(thd, thd->killed == NOT_KILLED);
ScopedStatementReplication scoped_stmt_rpl( StatementBinlog stmt_binlog(thd, table->versioned(VERS_TRX_ID) ||
table->versioned(VERS_TRX_ID) ? thd : NULL); thd->binlog_need_stmt_format(transactional_table));
/* bug#22725: /* bug#22725:
A query which per-row-loop can not be interrupted with A query which per-row-loop can not be interrupted with
...@@ -4223,7 +4224,8 @@ bool select_insert::prepare_eof() ...@@ -4223,7 +4224,8 @@ bool select_insert::prepare_eof()
ha_autocommit_or_rollback() is issued below. ha_autocommit_or_rollback() is issued below.
*/ */
if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) && if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
(likely(!error) || thd->transaction->stmt.modified_non_trans_table)) (likely(!error) || thd->transaction->stmt.modified_non_trans_table ||
thd->log_current_statement()))
{ {
int errcode= 0; int errcode= 0;
int res; int res;
...@@ -4231,6 +4233,8 @@ bool select_insert::prepare_eof() ...@@ -4231,6 +4233,8 @@ bool select_insert::prepare_eof()
thd->clear_error(); thd->clear_error();
else else
errcode= query_error_code(thd, killed_status == NOT_KILLED); errcode= query_error_code(thd, killed_status == NOT_KILLED);
StatementBinlog stmt_binlog(thd, !can_rollback_data() &&
thd->binlog_need_stmt_format(trans_table));
res= thd->binlog_query(THD::ROW_QUERY_TYPE, res= thd->binlog_query(THD::ROW_QUERY_TYPE,
thd->query(), thd->query_length(), thd->query(), thd->query_length(),
trans_table, FALSE, FALSE, errcode); trans_table, FALSE, FALSE, errcode);
...@@ -4351,6 +4355,8 @@ void select_insert::abort_result_set() ...@@ -4351,6 +4355,8 @@ void select_insert::abort_result_set()
if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
{ {
StatementBinlog stmt_binlog(thd, !can_rollback_data() &&
thd->binlog_need_stmt_format(transactional_table));
int errcode= query_error_code(thd, thd->killed == NOT_KILLED); int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
int res; int res;
/* error of writing binary log is ignored */ /* error of writing binary log is ignored */
......
...@@ -552,6 +552,7 @@ int mysql_update(THD *thd, ...@@ -552,6 +552,7 @@ int mysql_update(THD *thd,
// Don't count on usage of 'only index' when calculating which key to use // Don't count on usage of 'only index' when calculating which key to use
table->covering_keys.clear_all(); table->covering_keys.clear_all();
transactional_table= table->file->has_transactions_and_rollback();
#ifdef WITH_PARTITION_STORAGE_ENGINE #ifdef WITH_PARTITION_STORAGE_ENGINE
if (prune_partitions(thd, table, conds)) if (prune_partitions(thd, table, conds))
...@@ -564,6 +565,9 @@ int mysql_update(THD *thd, ...@@ -564,6 +565,9 @@ int mysql_update(THD *thd,
if (thd->is_error()) if (thd->is_error())
DBUG_RETURN(1); DBUG_RETURN(1);
if (thd->binlog_for_noop_dml(transactional_table))
DBUG_RETURN(1);
my_ok(thd); // No matching records my_ok(thd); // No matching records
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -593,6 +597,10 @@ int mysql_update(THD *thd, ...@@ -593,6 +597,10 @@ int mysql_update(THD *thd,
{ {
DBUG_RETURN(1); // Error in where DBUG_RETURN(1); // Error in where
} }
if (thd->binlog_for_noop_dml(transactional_table))
DBUG_RETURN(1);
my_ok(thd); // No matching records my_ok(thd); // No matching records
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -957,7 +965,6 @@ int mysql_update(THD *thd, ...@@ -957,7 +965,6 @@ int mysql_update(THD *thd,
thd->count_cuted_fields= CHECK_FIELD_WARN; thd->count_cuted_fields= CHECK_FIELD_WARN;
thd->cuted_fields=0L; thd->cuted_fields=0L;
transactional_table= table->file->has_transactions_and_rollback();
thd->abort_on_warning= !ignore && thd->is_strict_mode(); thd->abort_on_warning= !ignore && thd->is_strict_mode();
if (do_direct_update) if (do_direct_update)
...@@ -1296,7 +1303,8 @@ int mysql_update(THD *thd, ...@@ -1296,7 +1303,8 @@ int mysql_update(THD *thd,
Sometimes we want to binlog even if we updated no rows, in case user used Sometimes we want to binlog even if we updated no rows, in case user used
it to be sure master and slave are in same state. it to be sure master and slave are in same state.
*/ */
if (likely(error < 0) || thd->transaction->stmt.modified_non_trans_table) if (likely(error < 0) || thd->transaction->stmt.modified_non_trans_table ||
thd->log_current_statement())
{ {
if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
{ {
...@@ -1306,9 +1314,8 @@ int mysql_update(THD *thd, ...@@ -1306,9 +1314,8 @@ int mysql_update(THD *thd,
else else
errcode= query_error_code(thd, killed_status == NOT_KILLED); errcode= query_error_code(thd, killed_status == NOT_KILLED);
ScopedStatementReplication scoped_stmt_rpl( StatementBinlog stmt_binlog(thd, table->versioned(VERS_TRX_ID) ||
table->versioned(VERS_TRX_ID) ? thd : NULL); thd->binlog_need_stmt_format(transactional_table));
if (thd->binlog_query(THD::ROW_QUERY_TYPE, if (thd->binlog_query(THD::ROW_QUERY_TYPE,
thd->query(), thd->query_length(), thd->query(), thd->query_length(),
transactional_table, FALSE, FALSE, errcode) > 0) transactional_table, FALSE, FALSE, errcode) > 0)
...@@ -2722,7 +2729,8 @@ void multi_update::abort_result_set() ...@@ -2722,7 +2729,8 @@ void multi_update::abort_result_set()
(void) do_updates(); (void) do_updates();
} }
} }
if (thd->transaction->stmt.modified_non_trans_table) if (thd->transaction->stmt.modified_non_trans_table ||
thd->log_current_statement())
{ {
/* /*
The query has to binlog because there's a modified non-transactional table The query has to binlog because there's a modified non-transactional table
...@@ -2730,6 +2738,7 @@ void multi_update::abort_result_set() ...@@ -2730,6 +2738,7 @@ void multi_update::abort_result_set()
*/ */
if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
{ {
StatementBinlog stmt_binlog(thd, thd->binlog_need_stmt_format(transactional_tables));
/* /*
THD::killed status might not have been set ON at time of an error THD::killed status might not have been set ON at time of an error
got caught and if happens later the killed error is written got caught and if happens later the killed error is written
...@@ -3058,7 +3067,8 @@ bool multi_update::send_eof() ...@@ -3058,7 +3067,8 @@ bool multi_update::send_eof()
(thd->transaction->stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT); (thd->transaction->stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
if (likely(local_error == 0 || if (likely(local_error == 0 ||
thd->transaction->stmt.modified_non_trans_table)) thd->transaction->stmt.modified_non_trans_table) ||
thd->log_current_statement())
{ {
if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
{ {
...@@ -3068,7 +3078,8 @@ bool multi_update::send_eof() ...@@ -3068,7 +3078,8 @@ bool multi_update::send_eof()
else else
errcode= query_error_code(thd, killed_status == NOT_KILLED); errcode= query_error_code(thd, killed_status == NOT_KILLED);
bool force_stmt= false; bool force_stmt= thd->binlog_need_stmt_format(transactional_tables);
if (!force_stmt)
for (TABLE *table= all_tables->table; table; table= table->next) for (TABLE *table= all_tables->table; table; table= table->next)
{ {
if (table->versioned(VERS_TRX_ID)) if (table->versioned(VERS_TRX_ID))
...@@ -3077,16 +3088,11 @@ bool multi_update::send_eof() ...@@ -3077,16 +3088,11 @@ bool multi_update::send_eof()
break; break;
} }
} }
enum_binlog_format save_binlog_format; StatementBinlog stmt_binlog(thd, force_stmt);
save_binlog_format= thd->get_current_stmt_binlog_format();
if (force_stmt)
thd->set_current_stmt_binlog_format_stmt();
if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(), if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query(),
thd->query_length(), transactional_tables, FALSE, thd->query_length(), transactional_tables, FALSE,
FALSE, errcode) > 0) FALSE, errcode) > 0)
local_error= 1; // Rollback update local_error= 1; // Rollback update
thd->set_current_stmt_binlog_format(save_binlog_format);
} }
} }
DBUG_ASSERT(trans_safe || !updated || DBUG_ASSERT(trans_safe || !updated ||
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment