Commit 50f19ca8 authored by Kristian Nielsen's avatar Kristian Nielsen

Remove unnecessary global mutex in parallel replication.

The function apply_event_and_update_pos() is called with the
rli->data_lock mutex held. However, there seems to be nothing in the
function actually needing the mutex to be held. Certainly not in the
parallel replication case, where sql_slave_skip_counter is always 0
since the non-zero case is handled by the SQL driver thread.

So this patch makes parallel replication use a variant of
apply_event_and_update_pos() without the need to take the
rli->data_lock mutex. This avoids one contended global mutex for each
event executed, which might improve performance on CPU-bound workloads
somewhat.
Signed-off-by: default avatarKristian Nielsen <knielsen@knielsen-hq.org>
parent 7e0c9de8
......@@ -47,9 +47,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
if (!(ev->is_artificial_event() || ev->is_relay_log_event() ||
(ev->when == 0)))
rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time;
mysql_mutex_lock(&rli->data_lock);
/* Mutex will be released in apply_event_and_update_pos(). */
err= apply_event_and_update_pos(ev, thd, rgi, rpt);
err= apply_event_and_update_pos_for_parallel(ev, thd, rgi);
thread_safe_increment64(&rli->executed_entries);
/* ToDo: error handling. */
......
......@@ -3377,39 +3377,17 @@ has_temporary_error(THD *thd)
}
/**
Applies the given event and advances the relay log position.
In essence, this function does:
@code
ev->apply_event(rli);
ev->update_pos(rli);
@endcode
But it also does some maintainance, such as skipping events if
needed and reporting errors.
If the @c skip flag is set, then it is tested whether the event
should be skipped, by looking at the slave_skip_counter and the
server id. The skip flag should be set when calling this from a
replication thread but not set when executing an explicit BINLOG
statement.
@retval 0 OK.
@retval 1 Error calling ev->apply_event().
/*
First half of apply_event_and_update_pos(), see below.
Setup some THD variables for applying the event.
@retval 2 No error calling ev->apply_event(), but error calling
ev->update_pos().
Split out so that it can run with rli->data_lock held in non-parallel
replication, but without the mutex held in the parallel case.
*/
int apply_event_and_update_pos(Log_event* ev, THD* thd,
rpl_group_info *rgi,
rpl_parallel_thread *rpt)
static int
apply_event_and_update_pos_setup(Log_event* ev, THD* thd, rpl_group_info *rgi)
{
int exec_res= 0;
Relay_log_info* rli= rgi->rli;
DBUG_ENTER("apply_event_and_update_pos");
DBUG_ENTER("apply_event_and_update_pos_setup");
DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)",
ev->get_type_str(), ev->get_type_code(),
......@@ -3459,13 +3437,23 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0);
ev->thd = thd; // because up to this point, ev->thd == 0
int reason= ev->shall_skip(rgi);
if (reason == Log_event::EVENT_SKIP_COUNT)
{
DBUG_ASSERT(rli->slave_skip_counter > 0);
rli->slave_skip_counter--;
}
mysql_mutex_unlock(&rli->data_lock);
DBUG_RETURN(ev->shall_skip(rgi));
}
/*
Second half of apply_event_and_update_pos(), see below.
Do the actual event apply (or skip), and position update.
*/
static int
apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi,
int reason)
{
int exec_res= 0;
Relay_log_info* rli= rgi->rli;
DBUG_ENTER("apply_event_and_update_pos_apply");
DBUG_EXECUTE_IF("inject_slave_sql_before_apply_event",
{
DBUG_ASSERT(!debug_sync_set_action
......@@ -3553,6 +3541,72 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
}
/**
Applies the given event and advances the relay log position.
In essence, this function does:
@code
ev->apply_event(rli);
ev->update_pos(rli);
@endcode
But it also does some maintainance, such as skipping events if
needed and reporting errors.
If the @c skip flag is set, then it is tested whether the event
should be skipped, by looking at the slave_skip_counter and the
server id. The skip flag should be set when calling this from a
replication thread but not set when executing an explicit BINLOG
statement.
@retval 0 OK.
@retval 1 Error calling ev->apply_event().
@retval 2 No error calling ev->apply_event(), but error calling
ev->update_pos().
This function is only used in non-parallel replication, where it is called
with rli->data_lock held; this lock is released during this function.
*/
int
apply_event_and_update_pos(Log_event* ev, THD* thd, rpl_group_info *rgi)
{
Relay_log_info* rli= rgi->rli;
mysql_mutex_assert_owner(&rli->data_lock);
int reason= apply_event_and_update_pos_setup(ev, thd, rgi);
if (reason == Log_event::EVENT_SKIP_COUNT)
{
DBUG_ASSERT(rli->slave_skip_counter > 0);
rli->slave_skip_counter--;
}
mysql_mutex_unlock(&rli->data_lock);
return apply_event_and_update_pos_apply(ev, thd, rgi, reason);
}
/*
The version of above apply_event_and_update_pos() used in parallel
replication. Unlike the non-parallel case, this function is called without
rli->data_lock held.
*/
int
apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
rpl_group_info *rgi)
{
Relay_log_info* rli= rgi->rli;
mysql_mutex_assert_not_owner(&rli->data_lock);
int reason= apply_event_and_update_pos_setup(ev, thd, rgi);
/*
In parallel replication, sql_slave_skip_counter is handled in the SQL
driver thread, so 23 should never see EVENT_SKIP_COUNT here.
*/
DBUG_ASSERT(reason != Log_event::EVENT_SKIP_COUNT);
return apply_event_and_update_pos_apply(ev, thd, rgi, reason);
}
/**
Keep the relay log transaction state up to date.
......@@ -3803,7 +3857,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos;
serial_rgi->event_relay_log_name= rli->event_relay_log_name;
serial_rgi->event_relay_log_pos= rli->event_relay_log_pos;
exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL);
exec_res= apply_event_and_update_pos(ev, thd, serial_rgi);
#ifdef WITH_WSREP
WSREP_DEBUG("apply_event_and_update_pos() result: %d", exec_res);
......
......@@ -243,8 +243,9 @@ void set_slave_thread_default_charset(THD *thd, rpl_group_info *rgi);
int rotate_relay_log(Master_info* mi);
int has_temporary_error(THD *thd);
int apply_event_and_update_pos(Log_event* ev, THD* thd,
struct rpl_group_info *rgi,
rpl_parallel_thread *rpt);
struct rpl_group_info *rgi);
int apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
struct rpl_group_info *rgi);
pthread_handler_t handle_slave_io(void *arg);
void slave_output_error_info(rpl_group_info *rgi, THD *thd);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment