Commit eec04fb4 authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-6680: Performance of domain_parallel replication is disappointing

The code that handles free lists of various objects passed to worker threads
in parallel replication handles freeing in batches, to avoid taking and
releasing LOCK_rpl_thread too often. However, it was possible for freeing to
be delayed to the point where one thread could stall the SQL driver thread due
to full queue, while other worker threads might be idle. This could
significantly degrade possible parallelism and thus performance.

Clean up the batch freeing code so that it is more robust and now able to
regularly free batches of object, so that normally the queue will not run full
unless the SQL driver thread is really far ahead of the worker threads.
parent 8a3e2f29
......@@ -8,6 +8,15 @@
Code for optional parallel execution of replicated events on the slave.
*/
/*
Maximum number of queued events to accumulate in a local free list, before
moving them to the global free list. There is additional a limit of how much
to accumulate based on opt_slave_parallel_max_queued.
*/
#define QEV_BATCH_FREE 200
struct rpl_parallel_thread_pool global_rpl_thread_pool;
static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
......@@ -510,14 +519,8 @@ handle_rpl_parallel_thread(void *arg)
rpl_group_info *group_rgi= NULL;
group_commit_orderer *gco, *tmp_gco;
uint64 event_gtid_sub_id= 0;
rpl_parallel_thread::queued_event *qevs_to_free;
rpl_group_info *rgis_to_free;
group_commit_orderer *gcos_to_free;
rpl_sql_thread_info sql_info(NULL);
size_t total_event_size;
int err;
inuse_relaylog *last_ir;
uint64 accumulated_ir_count;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
......@@ -559,6 +562,8 @@ handle_rpl_parallel_thread(void *arg)
while (!rpt->stop)
{
rpl_parallel_thread::queued_event *qev, *next_qev;
thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
&stage_waiting_for_work_from_sql_thread, &old_stage);
/*
......@@ -580,28 +585,21 @@ handle_rpl_parallel_thread(void *arg)
thd->EXIT_COND(&old_stage);
more_events:
qevs_to_free= NULL;
rgis_to_free= NULL;
gcos_to_free= NULL;
total_event_size= 0;
while (events)
for (qev= events; qev; qev= next_qev)
{
struct rpl_parallel_thread::queued_event *next= events->next;
Log_event_type event_type;
rpl_group_info *rgi= events->rgi;
rpl_group_info *rgi= qev->rgi;
rpl_parallel_entry *entry= rgi->parallel_entry;
bool end_of_group, group_ending;
total_event_size+= events->event_size;
if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
next_qev= qev->next;
if (qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
{
handle_queued_pos_update(thd, events);
events->next= qevs_to_free;
qevs_to_free= events;
events= next;
handle_queued_pos_update(thd, qev);
rpt->loc_free_qev(qev);
continue;
}
else if (events->typ ==
else if (qev->typ ==
rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART)
{
if (in_event_group)
......@@ -613,24 +611,21 @@ handle_rpl_parallel_thread(void *arg)
group_rgi->cleanup_context(thd, 1);
in_event_group= false;
finish_event_group(thd, group_rgi->gtid_sub_id,
events->entry_for_queued, group_rgi);
qev->entry_for_queued, group_rgi);
group_rgi->next= rgis_to_free;
rgis_to_free= group_rgi;
rpt->loc_free_rgi(group_rgi);
thd->rgi_slave= group_rgi= NULL;
}
events->next= qevs_to_free;
qevs_to_free= events;
events= next;
rpt->loc_free_qev(qev);
continue;
}
DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
thd->rgi_slave= group_rgi= rgi;
gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
{
bool did_enter_cond= false;
PSI_stage_info old_stage;
......@@ -643,7 +638,7 @@ handle_rpl_parallel_thread(void *arg)
similar), without any terminating COMMIT/ROLLBACK/XID.
*/
group_standalone=
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
(0 != (static_cast<Gtid_log_event *>(qev->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
event_gtid_sub_id= rgi->gtid_sub_id;
......@@ -704,8 +699,7 @@ handle_rpl_parallel_thread(void *arg)
*/
DBUG_ASSERT(!tmp_gco->prev_gco);
gco->prev_gco= NULL;
tmp_gco->next_gco= gcos_to_free;
gcos_to_free= tmp_gco;
rpt->loc_free_gco(tmp_gco);
}
if (entry->force_abort && wait_count > entry->stop_count)
......@@ -766,7 +760,7 @@ handle_rpl_parallel_thread(void *arg)
}
}
group_ending= is_group_ending(events->ev, event_type);
group_ending= is_group_ending(qev->ev, event_type);
if (group_ending && likely(!rgi->worker_error))
{
DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
......@@ -782,20 +776,20 @@ handle_rpl_parallel_thread(void *arg)
if (likely(!rgi->worker_error) && !skip_event_group)
{
++rgi->retry_event_count;
err= rpt_handle_event(events, rpt);
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
err= rpt_handle_event(qev, rpt);
delete_or_keep_event_post_apply(rgi, event_type, qev->ev);
DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
err= dbug_simulate_tmp_error(rgi, thd););
if (err)
{
convert_kill_to_deadlock_error(rgi);
if (has_temporary_error(thd) && slave_trans_retries > 0)
err= retry_event_group(rgi, rpt, events);
err= retry_event_group(rgi, rpt, qev);
}
}
else
{
delete events->ev;
delete qev->ev;
err= thd->wait_for_prior_commit();
}
......@@ -804,8 +798,7 @@ handle_rpl_parallel_thread(void *arg)
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
group_ending);
events->next= qevs_to_free;
qevs_to_free= events;
rpt->loc_free_qev(qev);
if (unlikely(err))
{
......@@ -820,61 +813,20 @@ handle_rpl_parallel_thread(void *arg)
{
in_event_group= false;
finish_event_group(thd, event_gtid_sub_id, entry, rgi);
rgi->next= rgis_to_free;
rgis_to_free= rgi;
rpt->loc_free_rgi(rgi);
thd->rgi_slave= group_rgi= rgi= NULL;
skip_event_group= false;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
events= next;
}
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
/* Signal that our queue can now accept more events. */
rpt->dequeue2(total_event_size);
mysql_cond_signal(&rpt->COND_rpl_thread_queue);
/* We need to delay the free here, to when we have the lock. */
while (gcos_to_free)
{
group_commit_orderer *next= gcos_to_free->next_gco;
rpt->free_gco(gcos_to_free);
gcos_to_free= next;
}
while (rgis_to_free)
{
rpl_group_info *next= rgis_to_free->next;
rpt->free_rgi(rgis_to_free);
rgis_to_free= next;
}
last_ir= NULL;
accumulated_ir_count= 0;
while (qevs_to_free)
{
rpl_parallel_thread::queued_event *next= qevs_to_free->next;
inuse_relaylog *ir= qevs_to_free->ir;
/* Batch up refcount update to reduce use of synchronised operations. */
if (last_ir != ir)
{
if (last_ir)
{
my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
accumulated_ir_count= 0;
}
last_ir= ir;
}
++accumulated_ir_count;
rpt->free_qev(qevs_to_free);
qevs_to_free= next;
}
if (last_ir)
{
my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
}
/*
Now that we have the lock, we can move everything from our local free
lists to the real free lists that are also accessible from the SQL
driver thread.
*/
rpt->batch_free();
if ((events= rpt->event_queue) != NULL)
{
......@@ -887,6 +839,7 @@ handle_rpl_parallel_thread(void *arg)
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
goto more_events;
}
rpt->inuse_relaylog_refcount_update();
if (in_event_group && group_rgi->parallel_entry->force_abort)
{
......@@ -1122,6 +1075,51 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
}
void
rpl_parallel_thread::batch_free()
{
mysql_mutex_assert_owner(&LOCK_rpl_thread);
if (loc_qev_list)
{
*loc_qev_last_ptr_ptr= qev_free_list;
qev_free_list= loc_qev_list;
loc_qev_list= NULL;
dequeue2(loc_qev_size);
/* Signal that our queue can now accept more events. */
mysql_cond_signal(&COND_rpl_thread_queue);
loc_qev_size= 0;
qev_free_pending= 0;
}
if (loc_rgi_list)
{
*loc_rgi_last_ptr_ptr= rgi_free_list;
rgi_free_list= loc_rgi_list;
loc_rgi_list= NULL;
}
if (loc_gco_list)
{
*loc_gco_last_ptr_ptr= gco_free_list;
gco_free_list= loc_gco_list;
loc_gco_list= NULL;
}
}
void
rpl_parallel_thread::inuse_relaylog_refcount_update()
{
inuse_relaylog *ir= accumulated_ir_last;
if (ir)
{
my_atomic_rwlock_wrlock(&ir->rli->inuse_relaylog_atomic_lock);
my_atomic_add64(&ir->dequeued_count, accumulated_ir_count);
my_atomic_rwlock_wrunlock(&ir->rli->inuse_relaylog_atomic_lock);
accumulated_ir_count= 0;
accumulated_ir_last= NULL;
}
}
rpl_parallel_thread::queued_event *
rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
{
......@@ -1175,6 +1173,43 @@ rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
}
void
rpl_parallel_thread::loc_free_qev(rpl_parallel_thread::queued_event *qev)
{
inuse_relaylog *ir= qev->ir;
inuse_relaylog *last_ir= accumulated_ir_last;
if (ir != last_ir)
{
if (last_ir)
inuse_relaylog_refcount_update();
accumulated_ir_last= ir;
}
++accumulated_ir_count;
if (!loc_qev_list)
loc_qev_last_ptr_ptr= &qev->next;
else
qev->next= loc_qev_list;
loc_qev_list= qev;
loc_qev_size+= qev->event_size;
/*
We want to release to the global free list only occasionally, to avoid
having to take the LOCK_rpl_thread muted too many times.
However, we do need to release regularly. If we let the unreleased part
grow too large, then the SQL driver thread may go to sleep waiting for
the queue to drop below opt_slave_parallel_max_queued, and this in turn
can stall all other worker threads for more stuff to do.
*/
if (++qev_free_pending >= QEV_BATCH_FREE ||
loc_qev_size >= opt_slave_parallel_max_queued/3)
{
mysql_mutex_lock(&LOCK_rpl_thread);
batch_free();
mysql_mutex_unlock(&LOCK_rpl_thread);
}
}
void
rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
{
......@@ -1223,6 +1258,19 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
}
void
rpl_parallel_thread::loc_free_rgi(rpl_group_info *rgi)
{
DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
rgi->free_annotate_event();
if (!loc_rgi_list)
loc_rgi_last_ptr_ptr= &rgi->next;
else
rgi->next= loc_rgi_list;
loc_rgi_list= rgi;
}
void
rpl_parallel_thread::free_rgi(rpl_group_info *rgi)
{
......@@ -1257,12 +1305,14 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev)
void
rpl_parallel_thread::free_gco(group_commit_orderer *gco)
rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
{
mysql_mutex_assert_owner(&LOCK_rpl_thread);
DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */);
gco->next_gco= gco_free_list;
gco_free_list= gco;
if (!loc_gco_list)
loc_gco_last_ptr_ptr= &gco->next_gco;
else
gco->next_gco= loc_gco_list;
loc_gco_list= gco;
}
......
......@@ -96,9 +96,28 @@ struct rpl_parallel_thread {
size_t event_size;
} *event_queue, *last_in_queue;
uint64 queued_size;
/* These free lists are protected by LOCK_rpl_thread. */
queued_event *qev_free_list;
rpl_group_info *rgi_free_list;
group_commit_orderer *gco_free_list;
/*
These free lists are local to the thread, so need not be protected by any
lock. They are moved to the global free lists in batches in the function
batch_free(), to reduce LOCK_rpl_thread contention.
The lists are not NULL-terminated (as we do not need to traverse them).
Instead, if they are non-NULL, the loc_XXX_last_ptr_ptr points to the
`next' pointer of the last element, which is used to link into the front
of the global freelists.
*/
queued_event *loc_qev_list, **loc_qev_last_ptr_ptr;
size_t loc_qev_size;
uint64 qev_free_pending;
rpl_group_info *loc_rgi_list, **loc_rgi_last_ptr_ptr;
group_commit_orderer *loc_gco_list, **loc_gco_last_ptr_ptr;
/* These keep track of batch update of inuse_relaylog refcounts. */
inuse_relaylog *accumulated_ir_last;
uint64 accumulated_ir_count;
void enqueue(queued_event *qev)
{
......@@ -127,12 +146,41 @@ struct rpl_parallel_thread {
queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev,
const char *relay_log_name,
ulonglong event_pos, ulonglong event_size);
/*
Put a qev on the local free list, to be later released to the global free
list by batch_free().
*/
void loc_free_qev(queued_event *qev);
/*
Release an rgi immediately to the global free list. Requires holding the
LOCK_rpl_thread mutex.
*/
void free_qev(queued_event *qev);
rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rpl_parallel_entry *e, ulonglong event_size);
/*
Put an gco on the local free list, to be later released to the global free
list by batch_free().
*/
void loc_free_rgi(rpl_group_info *rgi);
/*
Release an rgi immediately to the global free list. Requires holding the
LOCK_rpl_thread mutex.
*/
void free_rgi(rpl_group_info *rgi);
group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev);
void free_gco(group_commit_orderer *gco);
/*
Put a gco on the local free list, to be later released to the global free
list by batch_free().
*/
void loc_free_gco(group_commit_orderer *gco);
/*
Move all local free lists to the global ones. Requires holding
LOCK_rpl_thread.
*/
void batch_free();
/* Update inuse_relaylog refcounts with what we have accumulated so far. */
void inuse_relaylog_refcount_update();
};
......
......@@ -1390,6 +1390,7 @@ Relay_log_info::alloc_inuse_relaylog(const char *name)
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
return 1;
}
ir->rli= this;
strmake_buf(ir->name, name);
if (!inuse_relaylog_list)
......
......@@ -496,6 +496,7 @@ class Relay_log_info : public Slave_reporting_capability
*/
struct inuse_relaylog {
inuse_relaylog *next;
Relay_log_info *rli;
/* Number of events in this relay log queued for worker threads. */
int64 queued_count;
/* Number of events completed by worker threads. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment