Commit 984a06db authored by Sujatha

MDEV-4633: multi_source.simple test fails sporadically

Analysis:
========
Writes to 'rli->log_space_total' need to be synchronized; otherwise both
SQL_THREAD and IO_THREAD can try to modify the variable simultaneously,
resulting in an incorrect 'rli->log_space_total'. In the current test scenario,
SQL_THREAD is trying to decrement 'rli->log_space_total' in 'purge_first_log'
while IO_THREAD is simultaneously trying to increment 'rli->log_space_total'
in 'queue_event'. Hence the test occasionally fails with a result
mismatch.

Fix:
===
Convert the 'rli->log_space_total' variable to an atomic type.
parent dd33a70d
...@@ -4397,7 +4397,9 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) ...@@ -4397,7 +4397,9 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
0, 0, &log_space_reclaimed); 0, 0, &log_space_reclaimed);
mysql_mutex_lock(&rli->log_space_lock); mysql_mutex_lock(&rli->log_space_lock);
rli->log_space_total-= log_space_reclaimed; my_atomic_add64_explicit((volatile int64*)(&rli->log_space_total),
(-(int64)log_space_reclaimed),
MY_MEMORY_ORDER_RELAXED);
mysql_cond_broadcast(&rli->log_space_cond); mysql_cond_broadcast(&rli->log_space_cond);
mysql_mutex_unlock(&rli->log_space_lock); mysql_mutex_unlock(&rli->log_space_lock);
......
...@@ -716,7 +716,9 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -716,7 +716,9 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
char buf1[22],buf2[22]; char buf1[22],buf2[22];
#endif #endif
DBUG_ENTER("harvest_bytes_written"); DBUG_ENTER("harvest_bytes_written");
(*counter)+=bytes_written;
my_atomic_add64_explicit((volatile int64*)(counter), bytes_written,
MY_MEMORY_ORDER_RELAXED);
DBUG_PRINT("info",("counter: %s bytes_written: %s", llstr(*counter,buf1), DBUG_PRINT("info",("counter: %s bytes_written: %s", llstr(*counter,buf1),
llstr(bytes_written,buf2))); llstr(bytes_written,buf2)));
bytes_written=0; bytes_written=0;
......
...@@ -454,7 +454,8 @@ static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo) ...@@ -454,7 +454,8 @@ static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
linfo->log_file_name); linfo->log_file_name);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
rli->log_space_total += s.st_size; my_atomic_add64_explicit((volatile int64*)(&rli->log_space_total),
s.st_size, MY_MEMORY_ORDER_RELAXED);
DBUG_PRINT("info",("log_space_total: %llu", rli->log_space_total)); DBUG_PRINT("info",("log_space_total: %llu", rli->log_space_total));
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -464,7 +465,8 @@ static int count_relay_log_space(Relay_log_info* rli) ...@@ -464,7 +465,8 @@ static int count_relay_log_space(Relay_log_info* rli)
{ {
LOG_INFO linfo; LOG_INFO linfo;
DBUG_ENTER("count_relay_log_space"); DBUG_ENTER("count_relay_log_space");
rli->log_space_total= 0; my_atomic_store64_explicit((volatile int64*)(&rli->log_space_total), 0,
MY_MEMORY_ORDER_RELAXED);
if (rli->relay_log.find_log_pos(&linfo, NullS, 1)) if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
{ {
sql_print_error("Could not find first log while counting relay log space"); sql_print_error("Could not find first log while counting relay log space");
...@@ -1223,8 +1225,8 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, ...@@ -1223,8 +1225,8 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname()); strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname());
strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname()); strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname());
rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
rli->log_space_total= 0; my_atomic_store64_explicit((volatile int64*)(&rli->log_space_total), 0,
MY_MEMORY_ORDER_RELAXED);
if (count_relay_log_space(rli)) if (count_relay_log_space(rli))
{ {
*errmsg= "Error counting relay log space"; *errmsg= "Error counting relay log space";
......
...@@ -2373,7 +2373,10 @@ static bool wait_for_relay_log_space(Relay_log_info* rli) ...@@ -2373,7 +2373,10 @@ static bool wait_for_relay_log_space(Relay_log_info* rli)
&rli->log_space_lock, &rli->log_space_lock,
&stage_waiting_for_relay_log_space, &stage_waiting_for_relay_log_space,
&old_stage); &old_stage);
while (rli->log_space_limit < rli->log_space_total && while (rli->log_space_limit <
(ulonglong)my_atomic_load64_explicit((volatile int64*)
(&rli->log_space_total),
MY_MEMORY_ORDER_RELAXED) &&
!(slave_killed=io_slave_killed(mi)) && !(slave_killed=io_slave_killed(mi)) &&
!rli->ignore_log_space_limit) !rli->ignore_log_space_limit)
mysql_cond_wait(&rli->log_space_cond, &rli->log_space_lock); mysql_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
...@@ -2923,7 +2926,10 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, ...@@ -2923,7 +2926,10 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
protocol->store(mi->rli.last_error().message, &my_charset_bin); protocol->store(mi->rli.last_error().message, &my_charset_bin);
protocol->store((uint32) mi->rli.slave_skip_counter); protocol->store((uint32) mi->rli.slave_skip_counter);
protocol->store((ulonglong) mi->rli.group_master_log_pos); protocol->store((ulonglong) mi->rli.group_master_log_pos);
protocol->store((ulonglong) mi->rli.log_space_total); protocol->store((ulonglong)
my_atomic_load64_explicit((volatile int64*)
(&mi->rli.log_space_total),
MY_MEMORY_ORDER_RELAXED));
protocol->store( protocol->store(
mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None": mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
...@@ -4623,7 +4629,9 @@ Stopping slave I/O thread due to out-of-memory error from master"); ...@@ -4623,7 +4629,9 @@ Stopping slave I/O thread due to out-of-memory error from master");
#endif #endif
if (rli->log_space_limit && rli->log_space_limit < if (rli->log_space_limit && rli->log_space_limit <
rli->log_space_total && (ulonglong) my_atomic_load64_explicit((volatile int64*)
(&rli->log_space_total),
MY_MEMORY_ORDER_RELAXED) &&
!rli->ignore_log_space_limit) !rli->ignore_log_space_limit)
if (wait_for_relay_log_space(rli)) if (wait_for_relay_log_space(rli))
{ {
...@@ -7222,7 +7230,10 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) ...@@ -7222,7 +7230,10 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
is are able to rotate and purge sometime soon. is are able to rotate and purge sometime soon.
*/ */
if (rli->log_space_limit && if (rli->log_space_limit &&
rli->log_space_limit < rli->log_space_total) rli->log_space_limit <
(ulonglong) my_atomic_load64_explicit((volatile int64*)
(&rli->log_space_total),
MY_MEMORY_ORDER_RELAXED))
{ {
/* force rotation if not in an unfinished group */ /* force rotation if not in an unfinished group */
rli->sql_force_rotate_relay= !rli->is_in_group(); rli->sql_force_rotate_relay= !rli->is_in_group();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment