Commit 010971a7 authored by unknown's avatar unknown

MDEV-6156: Parallel replication incorrectly caches charset between worker threads

Replication caches the character sets used in a query, to be able to quickly
reuse them for the next query in the common case of them not having changed.

In parallel replication, this caching needs to be per-worker-thread. The
code was not modified to handle this correctly, so the caching in one worker
could cause another worker to run a query using the wrong character set,
causing replication corruption.
parent f9e5f237
include/rpl_init.inc [topology=1->2]
*** MDEV-6156: Parallel replication incorrectly caches charset between worker threads ***
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=5;
include/start_slave.inc
CREATE TABLE t1 (a INT PRIMARY KEY, b VARCHAR(100) CHARACTER SET utf8);
SHOW CREATE TABLE t1;
Table Create Table
t1 CREATE TABLE `t1` (
`a` int(11) NOT NULL,
`b` varchar(100) CHARACTER SET utf8 DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1
SET character_set_client=latin1;
INSERT INTO t1 VALUES (1, 'Rødgrød med fløde 1');
INSERT INTO t1 VALUES (2, 'Rødgrød med fløde 2');
INSERT INTO t1 VALUES (3, 'Rødgrød med fløde 3');
INSERT INTO t1 VALUES (4, 'Rødgrød med fløde 4');
INSERT INTO t1 VALUES (5, 'Rødgrød med fløde 5');
INSERT INTO t1 VALUES (6, 'Rødgrød med fløde 6');
INSERT INTO t1 VALUES (7, 'Rødgrød med fløde 7');
INSERT INTO t1 VALUES (8, 'Rødgrød med fløde 8');
INSERT INTO t1 VALUES (9, 'Rødgrød med fløde 9');
INSERT INTO t1 VALUES (10, 'Rødgrød med fløde 10');
SET character_set_client=utf8;
INSERT INTO t1 VALUES (11, 'Rødgrød med fløde 1');
INSERT INTO t1 VALUES (12, 'Rødgrød med fløde 2');
INSERT INTO t1 VALUES (13, 'Rødgrød med fløde 3');
INSERT INTO t1 VALUES (14, 'Rødgrød med fløde 4');
INSERT INTO t1 VALUES (15, 'Rødgrød med fløde 5');
INSERT INTO t1 VALUES (16, 'Rødgrød med fløde 6');
INSERT INTO t1 VALUES (17, 'Rødgrød med fløde 7');
INSERT INTO t1 VALUES (18, 'Rødgrød med fløde 8');
INSERT INTO t1 VALUES (19, 'Rødgrød med fløde 9');
INSERT INTO t1 VALUES (20, 'Rødgrød med fløde 10');
SET character_set_results=utf8;
SELECT * FROM t1 ORDER BY a;
a b
1 Rødgrød med fløde 1
2 Rødgrød med fløde 2
3 Rødgrød med fløde 3
4 Rødgrød med fløde 4
5 Rødgrød med fløde 5
6 Rødgrød med fløde 6
7 Rødgrød med fløde 7
8 Rødgrød med fløde 8
9 Rødgrød med fløde 9
10 Rødgrød med fløde 10
11 Rødgrød med fløde 1
12 Rødgrød med fløde 2
13 Rødgrød med fløde 3
14 Rødgrød med fløde 4
15 Rødgrød med fløde 5
16 Rødgrød med fløde 6
17 Rødgrød med fløde 7
18 Rødgrød med fløde 8
19 Rødgrød med fløde 9
20 Rødgrød med fløde 10
SET character_set_results=utf8;
SELECT * FROM t1 ORDER BY a;
a b
1 Rødgrød med fløde 1
2 Rødgrød med fløde 2
3 Rødgrød med fløde 3
4 Rødgrød med fløde 4
5 Rødgrød med fløde 5
6 Rødgrød med fløde 6
7 Rødgrød med fløde 7
8 Rødgrød med fløde 8
9 Rødgrød med fløde 9
10 Rødgrød med fløde 10
11 Rødgrød med fløde 1
12 Rødgrød med fløde 2
13 Rødgrød med fløde 3
14 Rødgrød med fløde 4
15 Rødgrød med fløde 5
16 Rødgrød med fløde 6
17 Rødgrød med fløde 7
18 Rødgrød med fløde 8
19 Rødgrød med fløde 9
20 Rødgrød med fløde 10
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
DROP TABLE t1;
include/rpl_end.inc
--source include/have_binlog_format_statement.inc
--let $rpl_topology=1->2
--source include/rpl_init.inc
--echo *** MDEV-6156: Parallel replication incorrectly caches charset between worker threads ***
--connection server_2
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=5;
--source include/start_slave.inc
--connection server_1
CREATE TABLE t1 (a INT PRIMARY KEY, b VARCHAR(100) CHARACTER SET utf8);
SHOW CREATE TABLE t1;
SET character_set_client=latin1;
INSERT INTO t1 VALUES (1, 'Rdgrd med flde 1');
INSERT INTO t1 VALUES (2, 'Rdgrd med flde 2');
INSERT INTO t1 VALUES (3, 'Rdgrd med flde 3');
INSERT INTO t1 VALUES (4, 'Rdgrd med flde 4');
INSERT INTO t1 VALUES (5, 'Rdgrd med flde 5');
INSERT INTO t1 VALUES (6, 'Rdgrd med flde 6');
INSERT INTO t1 VALUES (7, 'Rdgrd med flde 7');
INSERT INTO t1 VALUES (8, 'Rdgrd med flde 8');
INSERT INTO t1 VALUES (9, 'Rdgrd med flde 9');
INSERT INTO t1 VALUES (10, 'Rdgrd med flde 10');
SET character_set_client=utf8;
INSERT INTO t1 VALUES (11, 'Rødgrød med fløde 1');
INSERT INTO t1 VALUES (12, 'Rødgrød med fløde 2');
INSERT INTO t1 VALUES (13, 'Rødgrød med fløde 3');
INSERT INTO t1 VALUES (14, 'Rødgrød med fløde 4');
INSERT INTO t1 VALUES (15, 'Rødgrød med fløde 5');
INSERT INTO t1 VALUES (16, 'Rødgrød med fløde 6');
INSERT INTO t1 VALUES (17, 'Rødgrød med fløde 7');
INSERT INTO t1 VALUES (18, 'Rødgrød med fløde 8');
INSERT INTO t1 VALUES (19, 'Rødgrød med fløde 9');
INSERT INTO t1 VALUES (20, 'Rødgrød med fløde 10');
SET character_set_results=utf8;
SELECT * FROM t1 ORDER BY a;
--save_master_pos
--connection server_2
--sync_with_master
SET character_set_results=utf8;
SELECT * FROM t1 ORDER BY a;
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
--source include/start_slave.inc
--connection server_1
DROP TABLE t1;
--source include/rpl_end.inc
...@@ -4153,7 +4153,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, ...@@ -4153,7 +4153,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
(sql_mode & ~(ulong) MODE_NO_DIR_IN_CREATE)); (sql_mode & ~(ulong) MODE_NO_DIR_IN_CREATE));
if (charset_inited) if (charset_inited)
{ {
if (rli->cached_charset_compare(charset)) if (rgi->cached_charset_compare(charset))
{ {
/* Verify that we support the charsets found in the event. */ /* Verify that we support the charsets found in the event. */
if (!(thd->variables.character_set_client= if (!(thd->variables.character_set_client=
...@@ -4169,7 +4169,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, ...@@ -4169,7 +4169,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
stop with EE_UNKNOWN_CHARSET in compare_errors (unless set to stop with EE_UNKNOWN_CHARSET in compare_errors (unless set to
ignore this error). ignore this error).
*/ */
set_slave_thread_default_charset(thd, rli); set_slave_thread_default_charset(thd, rgi);
goto compare_errors; goto compare_errors;
} }
thd->update_charset(); // for the charset change to take effect thd->update_charset(); // for the charset change to take effect
...@@ -6211,7 +6211,7 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi) ...@@ -6211,7 +6211,7 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi)
master is 4.0 then the events are in the slave's format (conversion). master is 4.0 then the events are in the slave's format (conversion).
*/ */
set_slave_thread_options(thd); set_slave_thread_options(thd);
set_slave_thread_default_charset(thd, rli); set_slave_thread_default_charset(thd, rgi);
thd->variables.sql_mode= global_system_variables.sql_mode; thd->variables.sql_mode= global_system_variables.sql_mode;
thd->variables.auto_increment_increment= thd->variables.auto_increment_increment=
thd->variables.auto_increment_offset= 1; thd->variables.auto_increment_offset= 1;
......
...@@ -82,7 +82,6 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) ...@@ -82,7 +82,6 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
max_relay_log_size= global_system_variables.max_relay_log_size; max_relay_log_size= global_system_variables.max_relay_log_size;
bzero((char*) &info_file, sizeof(info_file)); bzero((char*) &info_file, sizeof(info_file));
bzero((char*) &cache_buf, sizeof(cache_buf)); bzero((char*) &cache_buf, sizeof(cache_buf));
cached_charset_invalidate();
mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_relay_log_info_data_lock, mysql_mutex_init(key_relay_log_info_data_lock,
&data_lock, MY_MUTEX_INIT_FAST); &data_lock, MY_MUTEX_INIT_FAST);
...@@ -1200,29 +1199,6 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev) ...@@ -1200,29 +1199,6 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
} }
void Relay_log_info::cached_charset_invalidate()
{
DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
/* Full of zeroes means uninitialized. */
bzero(cached_charset, sizeof(cached_charset));
DBUG_VOID_RETURN;
}
bool Relay_log_info::cached_charset_compare(char *charset) const
{
DBUG_ENTER("Relay_log_info::cached_charset_compare");
if (memcmp(cached_charset, charset, sizeof(cached_charset)))
{
memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
DBUG_RETURN(1);
}
DBUG_RETURN(0);
}
void Relay_log_info::stmt_done(my_off_t event_master_log_pos, void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
time_t event_creation_time, THD *thd, time_t event_creation_time, THD *thd,
rpl_group_info *rgi) rpl_group_info *rgi)
...@@ -1503,6 +1479,7 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli) ...@@ -1503,6 +1479,7 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli)
deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
{ {
reinit(rli); reinit(rli);
cached_charset_invalidate();
bzero(&current_gtid, sizeof(current_gtid)); bzero(&current_gtid, sizeof(current_gtid));
mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock, mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
MY_MUTEX_INIT_FAST); MY_MUTEX_INIT_FAST);
...@@ -1585,6 +1562,29 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi, ...@@ -1585,6 +1562,29 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
} }
void rpl_group_info::cached_charset_invalidate()
{
DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
/* Full of zeroes means uninitialized. */
bzero(cached_charset, sizeof(cached_charset));
DBUG_VOID_RETURN;
}
bool rpl_group_info::cached_charset_compare(char *charset) const
{
DBUG_ENTER("rpl_group_info::cached_charset_compare");
if (memcmp(cached_charset, charset, sizeof(cached_charset)))
{
memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
DBUG_RETURN(1);
}
DBUG_RETURN(0);
}
void rpl_group_info::cleanup_context(THD *thd, bool error) void rpl_group_info::cleanup_context(THD *thd, bool error)
{ {
DBUG_ENTER("Relay_log_info::cleanup_context"); DBUG_ENTER("Relay_log_info::cleanup_context");
......
...@@ -295,7 +295,6 @@ class Relay_log_info : public Slave_reporting_capability ...@@ -295,7 +295,6 @@ class Relay_log_info : public Slave_reporting_capability
/* Condition for UNTIL master_gtid_pos. */ /* Condition for UNTIL master_gtid_pos. */
slave_connection_state until_gtid_pos; slave_connection_state until_gtid_pos;
char cached_charset[6];
/* /*
retried_trans is a cumulative counter: how many times the slave retried_trans is a cumulative counter: how many times the slave
has retried a transaction (any) since slave started. has retried a transaction (any) since slave started.
...@@ -371,15 +370,6 @@ class Relay_log_info : public Slave_reporting_capability ...@@ -371,15 +370,6 @@ class Relay_log_info : public Slave_reporting_capability
group_relay_log_pos); group_relay_log_pos);
} }
/*
Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
the thread save 3 get_charset() per Query_log_event if the charset is not
changing from event to event (common situation).
When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
*/
void cached_charset_invalidate();
bool cached_charset_compare(char *charset) const;
/** /**
Helper function to do after statement completion. Helper function to do after statement completion.
...@@ -546,6 +536,8 @@ struct rpl_group_info ...@@ -546,6 +536,8 @@ struct rpl_group_info
mysql_mutex_t sleep_lock; mysql_mutex_t sleep_lock;
mysql_cond_t sleep_cond; mysql_cond_t sleep_cond;
char cached_charset[6];
/* /*
trans_retries varies between 0 to slave_transaction_retries and counts how trans_retries varies between 0 to slave_transaction_retries and counts how
many times the slave has retried the present transaction; gets reset to 0 many times the slave has retried the present transaction; gets reset to 0
...@@ -679,6 +671,15 @@ struct rpl_group_info ...@@ -679,6 +671,15 @@ struct rpl_group_info
return false; return false;
} }
/*
Last charset (6 bytes) seen by slave SQL thread is cached here; it helps
the thread save 3 get_charset() per Query_log_event if the charset is not
changing from event to event (common situation).
When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
*/
void cached_charset_invalidate();
bool cached_charset_compare(char *charset) const;
void clear_tables_to_lock(); void clear_tables_to_lock();
void cleanup_context(THD *, bool); void cleanup_context(THD *, bool);
void slave_close_thread_tables(THD *); void slave_close_thread_tables(THD *);
......
...@@ -2879,7 +2879,7 @@ void set_slave_thread_options(THD* thd) ...@@ -2879,7 +2879,7 @@ void set_slave_thread_options(THD* thd)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli) void set_slave_thread_default_charset(THD* thd, rpl_group_info *rgi)
{ {
DBUG_ENTER("set_slave_thread_default_charset"); DBUG_ENTER("set_slave_thread_default_charset");
...@@ -2891,13 +2891,7 @@ void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli) ...@@ -2891,13 +2891,7 @@ void set_slave_thread_default_charset(THD* thd, Relay_log_info const *rli)
global_system_variables.collation_server; global_system_variables.collation_server;
thd->update_charset(); thd->update_charset();
/* rgi->cached_charset_invalidate();
We use a const cast here since the conceptual (and externally
visible) behavior of the function is to set the default charset of
the thread. That the cache has to be invalidated is a secondary
effect.
*/
const_cast<Relay_log_info*>(rli)->cached_charset_invalidate();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -4682,7 +4676,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, ...@@ -4682,7 +4676,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
mysql_cond_broadcast(&rli->data_cond); mysql_cond_broadcast(&rli->data_cond);
rli->ignore_log_space_limit= 0; /* don't need any lock */ rli->ignore_log_space_limit= 0; /* don't need any lock */
/* we die so won't remember charset - re-update them on next thread start */ /* we die so won't remember charset - re-update them on next thread start */
rli->cached_charset_invalidate(); serial_rgi->cached_charset_invalidate();
/* /*
TODO: see if we can do this conditionally in next_event() instead TODO: see if we can do this conditionally in next_event() instead
......
...@@ -51,6 +51,7 @@ ...@@ -51,6 +51,7 @@
class Relay_log_info; class Relay_log_info;
class Master_info; class Master_info;
class Master_info_index; class Master_info_index;
struct rpl_group_info;
struct rpl_parallel_thread; struct rpl_parallel_thread;
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
...@@ -226,7 +227,7 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,ulonglong pos, ...@@ -226,7 +227,7 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,ulonglong pos,
int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
const char** errmsg); const char** errmsg);
void set_slave_thread_options(THD* thd); void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli); void set_slave_thread_default_charset(THD *thd, rpl_group_info *rgi);
int rotate_relay_log(Master_info* mi); int rotate_relay_log(Master_info* mi);
int apply_event_and_update_pos(Log_event* ev, THD* thd, int apply_event_and_update_pos(Log_event* ev, THD* thd,
struct rpl_group_info *rgi, struct rpl_group_info *rgi,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment