Commit f1fcc1fc authored by Kristian Nielsen's avatar Kristian Nielsen

Back-port Master_info::using_parallel() to 10.0.

This has no functional changes, but it helps avoid merge problems from 10.0
to 10.1. In 10.0, code that checks for parallel replication uses
opt_slave_parallel_threads > 0, but this check needs to be
mi->using_parallel() in 10.1. By using the same check in 10.0 (with
unchanged semantics), merge problems to 10.1 are avoided.
parent 9a090728
...@@ -12829,7 +12829,7 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos, ...@@ -12829,7 +12829,7 @@ bool rpl_get_position_info(const char **log_file_name, ulonglong *log_pos,
return FALSE; return FALSE;
#else #else
const Relay_log_info *rli= &(active_mi->rli); const Relay_log_info *rli= &(active_mi->rli);
if (opt_slave_parallel_threads == 0) if (!rli->mi->using_parallel())
{ {
*log_file_name= rli->group_master_log_name; *log_file_name= rli->group_master_log_name;
*log_pos= rli->group_master_log_pos + *log_pos= rli->group_master_log_pos +
......
...@@ -75,6 +75,10 @@ class Master_info : public Slave_reporting_capability ...@@ -75,6 +75,10 @@ class Master_info : public Slave_reporting_capability
return connection_name.str == 0; return connection_name.str == 0;
} }
static const char *using_gtid_astext(enum enum_using_gtid arg); static const char *using_gtid_astext(enum enum_using_gtid arg);
bool using_parallel()
{
return opt_slave_parallel_threads > 0;
}
/* the variables below are needed because we can change masters on the fly */ /* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN+6]; /* Room for multi-*/ char master_log_name[FN_REFLEN+6]; /* Room for multi-*/
......
...@@ -1231,7 +1231,7 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev) ...@@ -1231,7 +1231,7 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
if (ev && ev->server_id == (uint32) global_system_variables.server_id && if (ev && ev->server_id == (uint32) global_system_variables.server_id &&
!replicate_same_server_id) !replicate_same_server_id)
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
log_name= (opt_slave_parallel_threads > 0 ? log_name= (mi->using_parallel() ?
future_event_master_log_name : group_master_log_name); future_event_master_log_name : group_master_log_name);
log_pos= ((!ev)? group_master_log_pos : log_pos= ((!ev)? group_master_log_pos :
(get_flag(IN_TRANSACTION) || !ev->log_pos) ? (get_flag(IN_TRANSACTION) || !ev->log_pos) ?
......
...@@ -620,8 +620,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) ...@@ -620,8 +620,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
{ {
DBUG_PRINT("info",("Terminating SQL thread")); DBUG_PRINT("info",("Terminating SQL thread"));
if (opt_slave_parallel_threads > 0 && if (mi->using_parallel() && mi->rli.abort_slave && mi->rli.stop_for_until)
mi->rli.abort_slave && mi->rli.stop_for_until)
{ {
mi->rli.stop_for_until= false; mi->rli.stop_for_until= false;
mi->rli.parallel.stop_during_until(); mi->rli.parallel.stop_during_until();
...@@ -2726,8 +2725,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, ...@@ -2726,8 +2725,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
else else
{ {
idle= mi->rli.sql_thread_caught_up; idle= mi->rli.sql_thread_caught_up;
if (opt_slave_parallel_threads > 0 && idle && if (mi->using_parallel() && idle && !mi->rli.parallel.workers_idle())
!mi->rli.parallel.workers_idle())
idle= false; idle= false;
} }
if (idle) if (idle)
...@@ -3517,7 +3515,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, ...@@ -3517,7 +3515,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
the user might be surprised to see a claim that the slave is up to date the user might be surprised to see a claim that the slave is up to date
long before those queued events are actually executed. long before those queued events are actually executed.
*/ */
if (opt_slave_parallel_threads == 0 && if (!rli->mi->using_parallel() &&
!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0))) !(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0)))
{ {
rli->last_master_timestamp= ev->when + (time_t) ev->exec_time; rli->last_master_timestamp= ev->when + (time_t) ev->exec_time;
...@@ -3568,7 +3566,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, ...@@ -3568,7 +3566,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
update_state_of_relay_log(rli, ev); update_state_of_relay_log(rli, ev);
if (opt_slave_parallel_threads > 0) if (rli->mi->using_parallel())
{ {
int res= rli->parallel.do_event(serial_rgi, ev, event_size); int res= rli->parallel.do_event(serial_rgi, ev, event_size);
if (res >= 0) if (res >= 0)
...@@ -4546,8 +4544,7 @@ pthread_handler_t handle_slave_sql(void *arg) ...@@ -4546,8 +4544,7 @@ pthread_handler_t handle_slave_sql(void *arg)
serial_rgi->gtid_sub_id= 0; serial_rgi->gtid_sub_id= 0;
serial_rgi->gtid_pending= false; serial_rgi->gtid_pending= false;
if (mi->using_gtid != Master_info::USE_GTID_NO && if (mi->using_gtid != Master_info::USE_GTID_NO && mi->using_parallel() &&
opt_slave_parallel_threads > 0 &&
rli->restart_gtid_pos.count() > 0) rli->restart_gtid_pos.count() > 0)
{ {
/* /*
...@@ -4734,7 +4731,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, ...@@ -4734,7 +4731,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
} }
} }
if (opt_slave_parallel_threads > 0) if (mi->using_parallel())
rli->parallel.wait_for_done(thd, rli); rli->parallel.wait_for_done(thd, rli);
/* Thread stopped. Print the current replication position to the log */ /* Thread stopped. Print the current replication position to the log */
...@@ -4760,7 +4757,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, ...@@ -4760,7 +4757,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
(We want the first one to be before the printout of stop position to (We want the first one to be before the printout of stop position to
get the correct position printed.) get the correct position printed.)
*/ */
if (opt_slave_parallel_threads > 0) if (mi->using_parallel())
rli->parallel.wait_for_done(thd, rli); rli->parallel.wait_for_done(thd, rli);
/* /*
...@@ -4784,7 +4781,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, ...@@ -4784,7 +4781,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
ulong domain_count; ulong domain_count;
flush_relay_log_info(rli); flush_relay_log_info(rli);
if (opt_slave_parallel_threads > 0) if (mi->using_parallel())
{ {
/* /*
In parallel replication GTID mode, we may stop with different domains In parallel replication GTID mode, we may stop with different domains
...@@ -6475,7 +6472,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) ...@@ -6475,7 +6472,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
llstr(my_b_tell(cur_log),llbuf1), llstr(my_b_tell(cur_log),llbuf1),
llstr(rli->event_relay_log_pos,llbuf2))); llstr(rli->event_relay_log_pos,llbuf2)));
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(opt_slave_parallel_threads > 0 || DBUG_ASSERT(rli->mi->using_parallel() ||
my_b_tell(cur_log) == rli->event_relay_log_pos); my_b_tell(cur_log) == rli->event_relay_log_pos);
} }
#endif #endif
......
...@@ -4351,8 +4351,7 @@ static bool update_slave_skip_counter(sys_var *self, THD *thd, Master_info *mi) ...@@ -4351,8 +4351,7 @@ static bool update_slave_skip_counter(sys_var *self, THD *thd, Master_info *mi)
mi->connection_name.str); mi->connection_name.str);
return true; return true;
} }
if (mi->using_gtid != Master_info::USE_GTID_NO && if (mi->using_gtid != Master_info::USE_GTID_NO && mi->using_parallel())
opt_slave_parallel_threads > 0)
{ {
ulong domain_count; ulong domain_count;
mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state); mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment