Commit f3e578ab authored by Monty's avatar Monty

Fixed MDEV-8428: Mangled DML statements on 2nd level slave when enabling binlog checksums

Fix was to add a test in Query_log_event::Query_log_event() if we are using
CREATE ... SELECT and in this case use trans cache, like we do on the master.
This avoid using (with doesn't have checksum)

Other things:
- Removed dummy call my_checksum(0L, NULL, 0)
- More DBUG_PRINT
- Cleaned up Log_event::need_checksum() to make it more readable (similar as in MySQL 5.6)
- Renamed variable that was hiding another one in create_table_imp()
parent 2ebedfa9
include/rpl_init.inc [topology=1->2->3]
# On server_1
CREATE DATABASE test_8428;
USE test_8428;
CREATE TABLE t1(i INT) ENGINE=INNODB;
INSERT INTO t1 VALUES(1), (2), (3), (4), (5);
CREATE TABLE t2 AS SELECT * FROM t1;
CREATE TABLE t3 ENGINE=MyISAM AS SELECT * FROM t1;
# On server_2
SHOW TABLES IN test_8428;
Tables_in_test_8428
t1
t2
t3
# On server_3
SHOW TABLES IN test_8428;
Tables_in_test_8428
t1
t2
t3
SELECT * from test_8428.t1;
i
1
2
3
4
5
DROP DATABASE test_8428;
include/rpl_end.inc
# End of test
!include suite/rpl/my.cnf
[mysqld.1]
[mysqld.2]
log-slave-updates
binlog-checksum=CRC32
[mysqld.3]
log-slave-updates
binlog-checksum=CRC32
[ENV]
SERVER_MYPORT_3= @mysqld.3.port
SERVER_MYSOCK_3= @mysqld.3.socket
--source include/have_innodb.inc
--let $rpl_topology=1->2->3
--source include/rpl_init.inc
#
# Test of MDEV-8428 Mangled DML statements on 2nd level slave when enabling
# binlog checksums
#
connection server_1;
--echo # On server_1
CREATE DATABASE test_8428;
USE test_8428;
CREATE TABLE t1(i INT) ENGINE=INNODB;
INSERT INTO t1 VALUES(1), (2), (3), (4), (5);
CREATE TABLE t2 AS SELECT * FROM t1;
CREATE TABLE t3 ENGINE=MyISAM AS SELECT * FROM t1;
save_master_pos;
connection server_2;
sync_with_master;
--echo # On server_2
SHOW TABLES IN test_8428;
save_master_pos;
connection server_3;
sync_with_master;
--echo # On server_3
SHOW TABLES IN test_8428;
SELECT * from test_8428.t1;
# Cleanup
connection server_1;
DROP DATABASE test_8428;
--source include/rpl_end.inc
--echo # End of test
...@@ -6384,9 +6384,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) ...@@ -6384,9 +6384,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
long val; long val;
ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t ulong end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
uchar header[LOG_EVENT_HEADER_LEN]; uchar header[LOG_EVENT_HEADER_LEN];
ha_checksum crc= 0, crc_0= 0; // assignments to keep compiler happy ha_checksum crc= 0, crc_0= 0;
my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF); my_bool do_checksum= (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF);
uchar buf[BINLOG_CHECKSUM_LEN]; uchar buf[BINLOG_CHECKSUM_LEN];
DBUG_ENTER("MYSQL_BIN_LOG::write_cache");
// while there is just one alg the following must hold: // while there is just one alg the following must hold:
DBUG_ASSERT(!do_checksum || DBUG_ASSERT(!do_checksum ||
...@@ -6408,9 +6409,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) ...@@ -6408,9 +6409,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
group= (uint)my_b_tell(&log_file); group= (uint)my_b_tell(&log_file);
hdr_offs= carry= 0; hdr_offs= carry= 0;
if (do_checksum)
crc= crc_0= my_checksum(0L, NULL, 0);
do do
{ {
/* /*
...@@ -6439,7 +6438,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) ...@@ -6439,7 +6438,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
/* write the first half of the split header */ /* write the first half of the split header */
if (my_b_write(&log_file, header, carry)) if (my_b_write(&log_file, header, carry))
return ER_ERROR_ON_WRITE; DBUG_RETURN(ER_ERROR_ON_WRITE);
status_var_add(thd->status_var.binlog_bytes_written, carry); status_var_add(thd->status_var.binlog_bytes_written, carry);
/* /*
...@@ -6483,12 +6482,12 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) ...@@ -6483,12 +6482,12 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
crc= my_checksum(crc, cache->read_pos, length); crc= my_checksum(crc, cache->read_pos, length);
remains -= length; remains -= length;
if (my_b_write(&log_file, cache->read_pos, length)) if (my_b_write(&log_file, cache->read_pos, length))
return ER_ERROR_ON_WRITE; DBUG_RETURN(ER_ERROR_ON_WRITE);
if (remains == 0) if (remains == 0)
{ {
int4store(buf, crc); int4store(buf, crc);
if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
return ER_ERROR_ON_WRITE; DBUG_RETURN(ER_ERROR_ON_WRITE);
crc= crc_0; crc= crc_0;
} }
} }
...@@ -6515,7 +6514,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) ...@@ -6515,7 +6514,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
DBUG_ASSERT(remains == 0); DBUG_ASSERT(remains == 0);
if (my_b_write(&log_file, cache->read_pos, hdr_offs) || if (my_b_write(&log_file, cache->read_pos, hdr_offs) ||
my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
return ER_ERROR_ON_WRITE; DBUG_RETURN(ER_ERROR_ON_WRITE);
crc= crc_0; crc= crc_0;
} }
} }
...@@ -6547,12 +6546,12 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) ...@@ -6547,12 +6546,12 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
length, &crc); length, &crc);
if (my_b_write(&log_file, ev, if (my_b_write(&log_file, ev,
remains == 0 ? event_len : length - hdr_offs)) remains == 0 ? event_len : length - hdr_offs))
return ER_ERROR_ON_WRITE; DBUG_RETURN(ER_ERROR_ON_WRITE);
if (remains == 0) if (remains == 0)
{ {
int4store(buf, crc); int4store(buf, crc);
if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN)) if (my_b_write(&log_file, buf, BINLOG_CHECKSUM_LEN))
return ER_ERROR_ON_WRITE; DBUG_RETURN(ER_ERROR_ON_WRITE);
crc= crc_0; // crc is complete crc= crc_0; // crc is complete
} }
} }
...@@ -6577,10 +6576,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) ...@@ -6577,10 +6576,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
/* Write data to the binary log file */ /* Write data to the binary log file */
DBUG_EXECUTE_IF("fail_binlog_write_1", DBUG_EXECUTE_IF("fail_binlog_write_1",
errno= 28; return ER_ERROR_ON_WRITE;); errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE););
if (!do_checksum) if (!do_checksum)
if (my_b_write(&log_file, cache->read_pos, length)) if (my_b_write(&log_file, cache->read_pos, length))
return ER_ERROR_ON_WRITE; DBUG_RETURN(ER_ERROR_ON_WRITE);
status_var_add(thd->status_var.binlog_bytes_written, length); status_var_add(thd->status_var.binlog_bytes_written, length);
cache->read_pos=cache->read_end; // Mark buffer used up cache->read_pos=cache->read_end; // Mark buffer used up
...@@ -6590,7 +6589,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) ...@@ -6590,7 +6589,7 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
DBUG_ASSERT(!do_checksum || remains == 0); DBUG_ASSERT(!do_checksum || remains == 0);
DBUG_ASSERT(!do_checksum || crc == crc_0); DBUG_ASSERT(!do_checksum || crc == crc_0);
return 0; // All OK DBUG_RETURN(0); // All OK
} }
/* /*
......
...@@ -1083,12 +1083,22 @@ my_bool Log_event::need_checksum() ...@@ -1083,12 +1083,22 @@ my_bool Log_event::need_checksum()
and Stop event) and Stop event)
provides their checksum alg preference through Log_event::checksum_alg. provides their checksum alg preference through Log_event::checksum_alg.
*/ */
ret= ((checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ? if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
(checksum_alg != BINLOG_CHECKSUM_ALG_OFF) : ret= (checksum_alg != BINLOG_CHECKSUM_ALG_OFF);
((binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF) && else
(cache_type == Log_event::EVENT_NO_CACHE)) ? {
MY_TEST(binlog_checksum_options) : FALSE); if (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF &&
cache_type == Log_event::EVENT_NO_CACHE)
{
checksum_alg= binlog_checksum_options;
ret= MY_TEST(binlog_checksum_options);
}
else
{
ret= FALSE;
checksum_alg= (uint8) BINLOG_CHECKSUM_ALG_OFF;
}
}
/* /*
FD calls the methods before data_written has been calculated. FD calls the methods before data_written has been calculated.
The following invariant claims if the current is not the first The following invariant claims if the current is not the first
...@@ -1099,10 +1109,6 @@ my_bool Log_event::need_checksum() ...@@ -1099,10 +1109,6 @@ my_bool Log_event::need_checksum()
DBUG_ASSERT(get_type_code() != FORMAT_DESCRIPTION_EVENT || ret || DBUG_ASSERT(get_type_code() != FORMAT_DESCRIPTION_EVENT || ret ||
data_written == 0); data_written == 0);
if (checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF)
checksum_alg= ret ? // calculated value stored
(uint8) binlog_checksum_options : (uint8) BINLOG_CHECKSUM_ALG_OFF;
DBUG_ASSERT(!ret || DBUG_ASSERT(!ret ||
((checksum_alg == binlog_checksum_options || ((checksum_alg == binlog_checksum_options ||
/* /*
...@@ -1142,17 +1148,19 @@ bool Log_event::wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong ...@@ -1142,17 +1148,19 @@ bool Log_event::wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong
bool Log_event::write_footer(IO_CACHE* file) bool Log_event::write_footer(IO_CACHE* file)
{ {
DBUG_ENTER("write_footer");
/* /*
footer contains the checksum-algorithm descriptor footer contains the checksum-algorithm descriptor
followed by the checksum value followed by the checksum value
*/ */
if (need_checksum()) if (need_checksum())
{ {
DBUG_PRINT("info", ("Writing checksum"));
uchar buf[BINLOG_CHECKSUM_LEN]; uchar buf[BINLOG_CHECKSUM_LEN];
int4store(buf, crc); int4store(buf, crc);
return (my_b_safe_write(file, (uchar*) buf, sizeof(buf))); DBUG_RETURN(my_b_safe_write(file, (uchar*) buf, sizeof(buf)));
} }
return 0; DBUG_RETURN(0);
} }
/* /*
...@@ -1174,7 +1182,7 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) ...@@ -1174,7 +1182,7 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
if (need_checksum()) if (need_checksum())
{ {
crc= my_checksum(0L, NULL, 0); crc= 0;
data_written += BINLOG_CHECKSUM_LEN; data_written += BINLOG_CHECKSUM_LEN;
} }
...@@ -3066,6 +3074,7 @@ Query_log_event::Query_log_event() ...@@ -3066,6 +3074,7 @@ Query_log_event::Query_log_event()
query_arg - array of char representing the query query_arg - array of char representing the query
query_length - size of the `query_arg' array query_length - size of the `query_arg' array
using_trans - there is a modified transactional table using_trans - there is a modified transactional table
direct - Don't cache statement
suppress_use - suppress the generation of 'USE' statements suppress_use - suppress the generation of 'USE' statements
errcode - the error code of the query errcode - the error code of the query
...@@ -3184,10 +3193,17 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, ...@@ -3184,10 +3193,17 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
break; break;
case SQLCOM_CREATE_TABLE: case SQLCOM_CREATE_TABLE:
trx_cache= (lex->select_lex.item_list.elements && /*
thd->is_current_stmt_binlog_format_row()); If we are using CREATE ... SELECT or if we are a slave
use_cache= (lex->create_info.tmp_table() && executing BEGIN...COMMIT (generated by CREATE...SELECT) we
thd->in_multi_stmt_transaction_mode()) || trx_cache; have to use the transactional cache to ensure we don't
calculate any checksum for the CREATE part.
*/
trx_cache= ((lex->select_lex.item_list.elements &&
thd->is_current_stmt_binlog_format_row()) ||
(thd->variables.option_bits & OPTION_GTID_BEGIN));
use_cache= ((lex->create_info.tmp_table() &&
thd->in_multi_stmt_transaction_mode()) || trx_cache);
break; break;
case SQLCOM_SET_OPTION: case SQLCOM_SET_OPTION:
if (lex->autocommit) if (lex->autocommit)
...@@ -3218,8 +3234,8 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, ...@@ -3218,8 +3234,8 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
else else
cache_type= Log_event::EVENT_STMT_CACHE; cache_type= Log_event::EVENT_STMT_CACHE;
DBUG_ASSERT(cache_type != Log_event::EVENT_INVALID_CACHE); DBUG_ASSERT(cache_type != Log_event::EVENT_INVALID_CACHE);
DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %llu", DBUG_PRINT("info",("Query_log_event has flags2: %lu sql_mode: %llu cache_tye: %d",
(ulong) flags2, sql_mode)); (ulong) flags2, sql_mode, cache_type));
} }
#endif /* MYSQL_CLIENT */ #endif /* MYSQL_CLIENT */
......
...@@ -1174,7 +1174,7 @@ bool event_checksum_test(uchar *event_buf, ulong event_len, uint8 alg) ...@@ -1174,7 +1174,7 @@ bool event_checksum_test(uchar *event_buf, ulong event_len, uint8 alg)
compile_time_assert(BINLOG_CHECKSUM_ALG_ENUM_END <= 0x80); compile_time_assert(BINLOG_CHECKSUM_ALG_ENUM_END <= 0x80);
} }
incoming= uint4korr(event_buf + event_len - BINLOG_CHECKSUM_LEN); incoming= uint4korr(event_buf + event_len - BINLOG_CHECKSUM_LEN);
computed= my_checksum(0L, NULL, 0); computed= 0;
/* checksum the event content but the checksum part itself */ /* checksum the event content but the checksum part itself */
computed= my_checksum(computed, (const uchar*) event_buf, computed= my_checksum(computed, (const uchar*) event_buf,
event_len - BINLOG_CHECKSUM_LEN); event_len - BINLOG_CHECKSUM_LEN);
......
...@@ -5464,7 +5464,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) ...@@ -5464,7 +5464,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
if (uint4korr(&buf[0]) == 0 && checksum_alg == BINLOG_CHECKSUM_ALG_OFF && if (uint4korr(&buf[0]) == 0 && checksum_alg == BINLOG_CHECKSUM_ALG_OFF &&
mi->rli.relay_log.relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_OFF) mi->rli.relay_log.relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_OFF)
{ {
ha_checksum rot_crc= my_checksum(0L, NULL, 0); ha_checksum rot_crc= 0;
event_len += BINLOG_CHECKSUM_LEN; event_len += BINLOG_CHECKSUM_LEN;
memcpy(rot_buf, buf, event_len - BINLOG_CHECKSUM_LEN); memcpy(rot_buf, buf, event_len - BINLOG_CHECKSUM_LEN);
int4store(&rot_buf[EVENT_LEN_OFFSET], int4store(&rot_buf[EVENT_LEN_OFFSET],
......
...@@ -79,8 +79,7 @@ fake_event_header(String* packet, Log_event_type event_type, ulong extra_len, ...@@ -79,8 +79,7 @@ fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
} }
if (*do_checksum) if (*do_checksum)
{ {
*crc= my_checksum(0L, NULL, 0); *crc= my_checksum(0, (uchar*)header, sizeof(header));
*crc= my_checksum(*crc, (uchar*)header, sizeof(header));
} }
return 0; return 0;
} }
...@@ -743,8 +742,8 @@ static int send_heartbeat_event(NET* net, String* packet, ...@@ -743,8 +742,8 @@ static int send_heartbeat_event(NET* net, String* packet,
if (do_checksum) if (do_checksum)
{ {
char b[BINLOG_CHECKSUM_LEN]; char b[BINLOG_CHECKSUM_LEN];
ha_checksum crc= my_checksum(0L, NULL, 0); ha_checksum crc;
crc= my_checksum(crc, (uchar*) header, sizeof(header)); crc= my_checksum(0, (uchar*) header, sizeof(header));
crc= my_checksum(crc, (uchar*) p, ident_len); crc= my_checksum(crc, (uchar*) p, ident_len);
int4store(b, crc); int4store(b, crc);
packet->append(b, sizeof(b)); packet->append(b, sizeof(b));
......
...@@ -4684,12 +4684,12 @@ int create_table_impl(THD *thd, ...@@ -4684,12 +4684,12 @@ int create_table_impl(THD *thd,
bool table_creation_was_logged= tmp_table->s->table_creation_was_logged; bool table_creation_was_logged= tmp_table->s->table_creation_was_logged;
if (create_info->options & HA_LEX_CREATE_REPLACE) if (create_info->options & HA_LEX_CREATE_REPLACE)
{ {
bool is_trans; bool tmp;
/* /*
We are using CREATE OR REPLACE on an existing temporary table We are using CREATE OR REPLACE on an existing temporary table
Remove the old table so that we can re-create it. Remove the old table so that we can re-create it.
*/ */
if (drop_temporary_table(thd, tmp_table, &is_trans)) if (drop_temporary_table(thd, tmp_table, &tmp))
goto err; goto err;
} }
else if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS) else if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment