Commit ebbd5ef6 authored by mkaruza's avatar mkaruza Committed by Jan Lindström

MDEV-27862 Galera should replicate nextval()-related changes in sequences with...

MDEV-27862 Galera should replicate nextval()-related changes in sequences with INCREMENT <> 0, at least NOCACHE ones with engine=InnoDB

Sequence storage engine is not transactionl so cache will be written in
stmt_cache that is not replicated in cluster. To fix this replicate
what is available in both trans_cache and stmt_cache.

Sequences will only work when NOCACHE keyword is used when sequnce is
created. If WSREP is enabled and we don't have this keyword report error
indicting that sequence will not work correctly in cluster.

When binlog is enabled statement cache will be cleared in transaction
before COMMIT so cache generated from sequence will not be replicated.
We need to keep cache until replication.

Tests are re-recorded because of replication changes that were
introducted with this PR.
Reviewed-by: default avatarJan Lindström <jan.lindstrom@mariadb.com>
parent c8fabbed
...@@ -12,3 +12,6 @@ INSERT INTO t1 VALUES (NEXT VALUE FOR Seq1_1); ...@@ -12,3 +12,6 @@ INSERT INTO t1 VALUES (NEXT VALUE FOR Seq1_1);
ERROR 23000: Duplicate entry '1' for key 'PRIMARY' ERROR 23000: Duplicate entry '1' for key 'PRIMARY'
DROP SEQUENCE Seq1_1; DROP SEQUENCE Seq1_1;
DROP TABLE t1; DROP TABLE t1;
CALL mtr.add_suppression("SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
connection node_2;
CALL mtr.add_suppression("SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
connection node_2;
connection node_1;
CREATE SEQUENCE seq_nocache ENGINE=InnoDB;
DROP SEQUENCE seq_nocache;
CALL mtr.add_suppression("SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
connection node_2;
CALL mtr.add_suppression("SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
connection node_1;
CREATE SEQUENCE seq NOCACHE ENGINE=InnoDB;
SELECT NEXTVAL(seq) = 1;
NEXTVAL(seq) = 1
1
connection node_2;
SELECT NEXTVAL(seq) = 2;
NEXTVAL(seq) = 2
1
connection node_1;
SELECT NEXTVAL(seq) = 3;
NEXTVAL(seq) = 3
1
SELECT SETVAL(seq, 100);
SETVAL(seq, 100)
100
connection node_2;
SELECT NEXTVAL(seq) = 101;
NEXTVAL(seq) = 101
1
connection node_1;
SELECT NEXTVAL(seq) = 102;
NEXTVAL(seq) = 102
1
DROP SEQUENCE seq;
CREATE TABLE t1(f1 INT);
CREATE SEQUENCE seq_transaction NOCACHE ENGINE=InnoDB;
START TRANSACTION;
INSERT INTO t1 VALUES (0);
SELECT NEXTVAL(seq_transaction);
NEXTVAL(seq_transaction)
1
INSERT INTO t1 VALUES (NEXTVAL(seq_transaction));
COMMIT;
connection node_2;
SELECT COUNT(*) = 2 FROM t1;
COUNT(*) = 2
1
SELECT NEXTVAL(seq_transaction) = 3;
NEXTVAL(seq_transaction) = 3
1
connection node_1;
SELECT NEXTVAL(seq_transaction) = 4;
NEXTVAL(seq_transaction) = 4
1
DROP SEQUENCE seq_transaction;
DROP TABLE t1;
...@@ -44,6 +44,9 @@ Table Create Table ...@@ -44,6 +44,9 @@ Table Create Table
Seq1_1 CREATE SEQUENCE `Seq1_1` start with 1 minvalue 1 maxvalue 9223372036854775806 increment by 1 cache 1000 nocycle ENGINE=InnoDB Seq1_1 CREATE SEQUENCE `Seq1_1` start with 1 minvalue 1 maxvalue 9223372036854775806 increment by 1 cache 1000 nocycle ENGINE=InnoDB
select NEXT VALUE FOR Seq1_1; select NEXT VALUE FOR Seq1_1;
NEXT VALUE FOR Seq1_1 NEXT VALUE FOR Seq1_1
1 3001
connection node_1; connection node_1;
DROP SEQUENCE Seq1_1; DROP SEQUENCE Seq1_1;
CALL mtr.add_suppression("SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
connection node_2;
CALL mtr.add_suppression("SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
...@@ -13,3 +13,11 @@ CREATE SEQUENCE Seq1_1 START WITH 1 INCREMENT BY 1; ...@@ -13,3 +13,11 @@ CREATE SEQUENCE Seq1_1 START WITH 1 INCREMENT BY 1;
INSERT INTO t1 VALUES (NEXT VALUE FOR Seq1_1); INSERT INTO t1 VALUES (NEXT VALUE FOR Seq1_1);
DROP SEQUENCE Seq1_1; DROP SEQUENCE Seq1_1;
DROP TABLE t1; DROP TABLE t1;
# Supress warning for SEQUENCES that are declared without `NOCACHE` introduced with MDEV-27862
CALL mtr.add_suppression("SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
--connection node_2
CALL mtr.add_suppression("SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
--source include/galera_cluster.inc
--source include/have_innodb.inc
# Report WARNING when SEQUENCE is created without `NOCACHE`
CREATE SEQUENCE seq_nocache ENGINE=InnoDB;
DROP SEQUENCE seq_nocache;
CALL mtr.add_suppression("SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
--connection node_2
CALL mtr.add_suppression("SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
# NEXTVAL
--connection node_1
CREATE SEQUENCE seq NOCACHE ENGINE=InnoDB;
SELECT NEXTVAL(seq) = 1;
--connection node_2
SELECT NEXTVAL(seq) = 2;
--connection node_1
SELECT NEXTVAL(seq) = 3;
# SETVAL
SELECT SETVAL(seq, 100);
--connection node_2
SELECT NEXTVAL(seq) = 101;
--connection node_1
SELECT NEXTVAL(seq) = 102;
DROP SEQUENCE seq;
# TRANSACTIONS
CREATE TABLE t1(f1 INT);
CREATE SEQUENCE seq_transaction NOCACHE ENGINE=InnoDB;
START TRANSACTION;
INSERT INTO t1 VALUES (0);
SELECT NEXTVAL(seq_transaction);
INSERT INTO t1 VALUES (NEXTVAL(seq_transaction));
COMMIT;
--connection node_2
SELECT COUNT(*) = 2 FROM t1;
SELECT NEXTVAL(seq_transaction) = 3;
--connection node_1
SELECT NEXTVAL(seq_transaction) = 4;
DROP SEQUENCE seq_transaction;
DROP TABLE t1;
...@@ -44,3 +44,9 @@ select NEXT VALUE FOR Seq1_1; ...@@ -44,3 +44,9 @@ select NEXT VALUE FOR Seq1_1;
--connection node_1 --connection node_1
DROP SEQUENCE Seq1_1; DROP SEQUENCE Seq1_1;
CALL mtr.add_suppression("SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
--connection node_2
CALL mtr.add_suppression("SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
...@@ -434,6 +434,9 @@ static int sequence_initialize(void *p) ...@@ -434,6 +434,9 @@ static int sequence_initialize(void *p)
HTON_HIDDEN | HTON_HIDDEN |
HTON_TEMPORARY_NOT_SUPPORTED | HTON_TEMPORARY_NOT_SUPPORTED |
HTON_ALTER_NOT_SUPPORTED | HTON_ALTER_NOT_SUPPORTED |
#ifdef WITH_WSREP
HTON_WSREP_REPLICATION |
#endif
HTON_NO_PARTITION); HTON_NO_PARTITION);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
......
...@@ -2013,7 +2013,13 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) ...@@ -2013,7 +2013,13 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
thd->backup_stage(&org_stage); thd->backup_stage(&org_stage);
THD_STAGE_INFO(thd, stage_binlog_write); THD_STAGE_INFO(thd, stage_binlog_write);
#ifdef WITH_WSREP
// DON'T clear stmt cache in case we are in transaction
if (!cache_mngr->stmt_cache.empty() &&
(!wsrep_on(thd) || ending_trans(thd, all)))
#else
if (!cache_mngr->stmt_cache.empty()) if (!cache_mngr->stmt_cache.empty())
#endif
{ {
error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr);
} }
...@@ -10777,13 +10783,13 @@ maria_declare_plugin_end; ...@@ -10777,13 +10783,13 @@ maria_declare_plugin_end;
#ifdef WITH_WSREP #ifdef WITH_WSREP
#include "wsrep_mysqld.h" #include "wsrep_mysqld.h"
IO_CACHE *wsrep_get_trans_cache(THD * thd) IO_CACHE *wsrep_get_cache(THD * thd, bool is_transactional)
{ {
DBUG_ASSERT(binlog_hton->slot != HA_SLOT_UNDEF); DBUG_ASSERT(binlog_hton->slot != HA_SLOT_UNDEF);
binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*) binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*)
thd_get_ha_data(thd, binlog_hton); thd_get_ha_data(thd, binlog_hton);
if (cache_mngr) if (cache_mngr)
return cache_mngr->get_binlog_cache_log(true); return cache_mngr->get_binlog_cache_log(is_transactional);
WSREP_DEBUG("binlog cache not initialized, conn: %llu", WSREP_DEBUG("binlog cache not initialized, conn: %llu",
thd->thread_id); thd->thread_id);
......
...@@ -1239,7 +1239,7 @@ static inline TC_LOG *get_tc_log_implementation() ...@@ -1239,7 +1239,7 @@ static inline TC_LOG *get_tc_log_implementation()
} }
#ifdef WITH_WSREP #ifdef WITH_WSREP
IO_CACHE* wsrep_get_trans_cache(THD *); IO_CACHE* wsrep_get_cache(THD *, bool);
void wsrep_thd_binlog_trx_reset(THD * thd); void wsrep_thd_binlog_trx_reset(THD * thd);
void wsrep_thd_binlog_stmt_rollback(THD * thd); void wsrep_thd_binlog_stmt_rollback(THD * thd);
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
......
...@@ -308,6 +308,11 @@ bool sequence_insert(THD *thd, LEX *lex, TABLE_LIST *org_table_list) ...@@ -308,6 +308,11 @@ bool sequence_insert(THD *thd, LEX *lex, TABLE_LIST *org_table_list)
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
} }
#ifdef WITH_WSREP
if (WSREP_ON && seq->cache != 0)
WSREP_WARN("CREATE SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
#endif
/* If not temporary table */ /* If not temporary table */
if (!temporary_table) if (!temporary_table)
{ {
...@@ -904,12 +909,18 @@ bool Sql_cmd_alter_sequence::execute(THD *thd) ...@@ -904,12 +909,18 @@ bool Sql_cmd_alter_sequence::execute(THD *thd)
No_such_table_error_handler no_such_table_handler; No_such_table_error_handler no_such_table_handler;
DBUG_ENTER("Sql_cmd_alter_sequence::execute"); DBUG_ENTER("Sql_cmd_alter_sequence::execute");
if (check_access(thd, ALTER_ACL, first_table->db.str, if (check_access(thd, ALTER_ACL, first_table->db.str,
&first_table->grant.privilege, &first_table->grant.privilege,
&first_table->grant.m_internal, &first_table->grant.m_internal,
0, 0)) 0, 0))
DBUG_RETURN(TRUE); /* purecov: inspected */ DBUG_RETURN(TRUE); /* purecov: inspected */
#ifdef WITH_WSREP
if (WSREP_ON && new_seq->cache != 0)
WSREP_WARN("ALTER SEQUENCES declared without `NOCACHE` will not behave correctly in galera cluster.");
#endif
if (check_grant(thd, ALTER_ACL, first_table, FALSE, 1, FALSE)) if (check_grant(thd, ALTER_ACL, first_table, FALSE, 1, FALSE))
DBUG_RETURN(TRUE); /* purecov: inspected */ DBUG_RETURN(TRUE); /* purecov: inspected */
......
...@@ -155,11 +155,11 @@ static int wsrep_write_cache_inc(THD* const thd, ...@@ -155,11 +155,11 @@ static int wsrep_write_cache_inc(THD* const thd,
goto cleanup; goto cleanup;
cache->read_pos= cache->read_end; cache->read_pos= cache->read_end;
} while ((cache->file >= 0) && (length= my_b_fill(cache))); } while ((cache->file >= 0) && (length= my_b_fill(cache)));
}
if (ret == 0) if (ret == 0)
{ {
assert(total_length + thd->wsrep_sr().log_position() == saved_pos); assert(total_length + thd->wsrep_sr().log_position() == saved_pos);
} }
}
cleanup: cleanup:
*len= total_length; *len= total_length;
......
...@@ -86,18 +86,37 @@ int Wsrep_client_service::prepare_data_for_replication() ...@@ -86,18 +86,37 @@ int Wsrep_client_service::prepare_data_for_replication()
DBUG_ASSERT(m_thd == current_thd); DBUG_ASSERT(m_thd == current_thd);
DBUG_ENTER("Wsrep_client_service::prepare_data_for_replication"); DBUG_ENTER("Wsrep_client_service::prepare_data_for_replication");
size_t data_len= 0; size_t data_len= 0;
IO_CACHE* cache= wsrep_get_trans_cache(m_thd); IO_CACHE* transactional_cache= wsrep_get_cache(m_thd, true);
IO_CACHE* stmt_cache= wsrep_get_cache(m_thd, false);
if (cache) if (transactional_cache || stmt_cache)
{ {
m_thd->binlog_flush_pending_rows_event(true); m_thd->binlog_flush_pending_rows_event(true);
if (wsrep_write_cache(m_thd, cache, &data_len))
size_t transactional_data_len= 0;
size_t stmt_data_len= 0;
// Write transactional cache
if (transactional_cache &&
wsrep_write_cache(m_thd, transactional_cache, &transactional_data_len))
{ {
WSREP_ERROR("rbr write fail, data_len: %zu", WSREP_ERROR("rbr write fail, data_len: %zu",
data_len); data_len);
// wsrep_override_error(m_thd, ER_ERROR_DURING_COMMIT); // wsrep_override_error(m_thd, ER_ERROR_DURING_COMMIT);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
// Write stmt cache
if (stmt_cache && wsrep_write_cache(m_thd, stmt_cache, &stmt_data_len))
{
WSREP_ERROR("rbr write fail, data_len: %zu",
data_len);
// wsrep_override_error(m_thd, ER_ERROR_DURING_COMMIT);
DBUG_RETURN(1);
}
// Complete data written from both caches
data_len = transactional_data_len + stmt_data_len;
} }
if (data_len == 0) if (data_len == 0)
...@@ -139,7 +158,7 @@ int Wsrep_client_service::prepare_fragment_for_replication( ...@@ -139,7 +158,7 @@ int Wsrep_client_service::prepare_fragment_for_replication(
DBUG_ASSERT(m_thd == current_thd); DBUG_ASSERT(m_thd == current_thd);
THD* thd= m_thd; THD* thd= m_thd;
DBUG_ENTER("Wsrep_client_service::prepare_fragment_for_replication"); DBUG_ENTER("Wsrep_client_service::prepare_fragment_for_replication");
IO_CACHE* cache= wsrep_get_trans_cache(thd); IO_CACHE* cache= wsrep_get_cache(thd, true);
thd->binlog_flush_pending_rows_event(true); thd->binlog_flush_pending_rows_event(true);
if (!cache) if (!cache)
...@@ -221,7 +240,7 @@ bool Wsrep_client_service::statement_allowed_for_streaming() const ...@@ -221,7 +240,7 @@ bool Wsrep_client_service::statement_allowed_for_streaming() const
size_t Wsrep_client_service::bytes_generated() const size_t Wsrep_client_service::bytes_generated() const
{ {
IO_CACHE* cache= wsrep_get_trans_cache(m_thd); IO_CACHE* cache= wsrep_get_cache(m_thd, true);
if (cache) if (cache)
{ {
size_t pending_rows_event_length= 0; size_t pending_rows_event_length= 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment