Commit f27817c1 authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-7326: Server deadlock in connection with parallel replication

The bug occurs when a transaction does a retry after all transactions have
done mark_start_commit() in a batch of group commit from the master. In this
case, the retrying transaction can unmark_start_commit() after the following
batch has already started running and de-allocated the GCO. Then after retry,
the transaction will re-do mark_start_commit() on a de-allocated GCO, and also
wakeup of later GCOs can be lost.

This was seen "in the wild" by a user, even though it is not known exactly
what circumstances can lead to retry of one transaction after all transactions
in a group have reached the commit phase.

The lifetime around GCO was somewhat clunky anyway. With this patch, a GCO
lives until rpl_parallel_entry::last_committed_sub_id has reached the last
transaction in the GCO. This guarantees that the GCO will still be alive when
a transaction does mark_start_commit(). Also, we now loop over the list of
active GCOs for wakeup, to ensure we do not lose a wakeup even in the
problematic case.
parent 4a325159
...@@ -1023,6 +1023,119 @@ SET GLOBAL slave_parallel_threads=0; ...@@ -1023,6 +1023,119 @@ SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10; SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=slave_pos; CHANGE MASTER TO master_use_gtid=slave_pos;
include/start_slave.inc include/start_slave.inc
*** MDEV-7326 Server deadlock in connection with parallel replication ***
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=3;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_xid";
include/start_slave.inc
SET @old_format= @@SESSION.binlog_format;
SET binlog_format= STATEMENT;
INSERT INTO t1 VALUES (foo(50,
"rpl_parallel_start_waiting_for_prior SIGNAL t3_ready",
"rpl_parallel_end_of_group SIGNAL prep_ready WAIT_FOR prep_cont"));
SET DEBUG_SYNC= "now WAIT_FOR prep_ready";
INSERT INTO t2 VALUES (foo(50,
"rpl_parallel_simulate_temp_err_xid SIGNAL t1_ready1 WAIT_FOR t1_cont1",
"rpl_parallel_retry_after_unmark SIGNAL t1_ready2 WAIT_FOR t1_cont2"));
SET DEBUG_SYNC= "now WAIT_FOR t1_ready1";
INSERT INTO t1 VALUES (foo(51,
"rpl_parallel_before_mark_start_commit SIGNAL t2_ready1 WAIT_FOR t2_cont1",
"rpl_parallel_after_mark_start_commit SIGNAL t2_ready2"));
SET DEBUG_SYNC= "now WAIT_FOR t2_ready1";
SET DEBUG_SYNC= "now SIGNAL t1_cont1";
SET DEBUG_SYNC= "now WAIT_FOR t1_ready2";
INSERT INTO t1 VALUES (52);
SET BINLOG_FORMAT= @old_format;
SELECT * FROM t2 WHERE a>=50 ORDER BY a;
a
50
SELECT * FROM t1 WHERE a>=50 ORDER BY a;
a
50
51
52
SET DEBUG_SYNC= "now SIGNAL prep_cont";
SET DEBUG_SYNC= "now WAIT_FOR t3_ready";
SET DEBUG_SYNC= "now SIGNAL t2_cont1";
SET DEBUG_SYNC= "now WAIT_FOR t2_ready2";
SET DEBUG_SYNC= "now SIGNAL t1_cont2";
SELECT * FROM t2 WHERE a>=50 ORDER BY a;
a
50
SELECT * FROM t1 WHERE a>=50 ORDER BY a;
a
50
51
52
SET DEBUG_SYNC="reset";
include/stop_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
*** MDEV-7326 Server deadlock in connection with parallel replication ***
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=3;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_xid";
include/start_slave.inc
SET @old_format= @@SESSION.binlog_format;
SET binlog_format= STATEMENT;
INSERT INTO t1 VALUES (foo(60,
"rpl_parallel_start_waiting_for_prior SIGNAL t3_ready",
"rpl_parallel_end_of_group SIGNAL prep_ready WAIT_FOR prep_cont"));
SET DEBUG_SYNC= "now WAIT_FOR prep_ready";
INSERT INTO t2 VALUES (foo(60,
"rpl_parallel_simulate_temp_err_xid SIGNAL t1_ready1 WAIT_FOR t1_cont1",
"rpl_parallel_retry_after_unmark SIGNAL t1_ready2 WAIT_FOR t1_cont2"));
SET DEBUG_SYNC= "now WAIT_FOR t1_ready1";
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
SET binlog_format=statement;
INSERT INTO t1 VALUES (foo(61,
"rpl_parallel_before_mark_start_commit SIGNAL t2_ready1 WAIT_FOR t2_cont1",
"rpl_parallel_after_mark_start_commit SIGNAL t2_ready2"));
SET debug_sync='now WAIT_FOR master_queued1';
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
INSERT INTO t6 VALUES (62);
SET debug_sync='now WAIT_FOR master_queued2';
SET debug_sync='now SIGNAL master_cont1';
SET debug_sync='RESET';
SET BINLOG_FORMAT= @old_format;
SELECT * FROM t2 WHERE a>=60 ORDER BY a;
a
60
SELECT * FROM t1 WHERE a>=60 ORDER BY a;
a
60
61
SELECT * FROM t6 WHERE a>=60 ORDER BY a;
a
62
SET DEBUG_SYNC= "now WAIT_FOR t2_ready1";
SET DEBUG_SYNC= "now SIGNAL t1_cont1";
SET DEBUG_SYNC= "now WAIT_FOR t1_ready2";
SET DEBUG_SYNC= "now SIGNAL prep_cont";
SET DEBUG_SYNC= "now WAIT_FOR t3_ready";
SET DEBUG_SYNC= "now SIGNAL t2_cont1";
SET DEBUG_SYNC= "now WAIT_FOR t2_ready2";
SET DEBUG_SYNC= "now SIGNAL t1_cont2";
SELECT * FROM t2 WHERE a>=60 ORDER BY a;
a
60
SELECT * FROM t1 WHERE a>=60 ORDER BY a;
a
60
61
SELECT * FROM t6 WHERE a>=60 ORDER BY a;
a
62
SET DEBUG_SYNC="reset";
include/stop_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
include/stop_slave.inc include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads; SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc include/start_slave.inc
......
...@@ -1636,6 +1636,213 @@ CHANGE MASTER TO master_use_gtid=slave_pos; ...@@ -1636,6 +1636,213 @@ CHANGE MASTER TO master_use_gtid=slave_pos;
--source include/start_slave.inc --source include/start_slave.inc
--echo *** MDEV-7326 Server deadlock in connection with parallel replication ***
# We use three transactions, each in a separate group commit.
# T1 does mark_start_commit(), then gets a deadlock error.
# T2 wakes up and starts running
# T1 does unmark_start_commit()
# T3 goes to wait for T2 to start its commit
# T2 does mark_start_commit()
# The bug was that at this point, T3 got deadlocked. Because T1 has unmarked(),
# T3 did not yet see the count_committing_event_groups reach its target value
# yet. But when T1 later re-did mark_start_commit(), it failed to send a wakeup
# to T3.
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=3;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_xid";
--source include/start_slave.inc
--connection server_1
SET @old_format= @@SESSION.binlog_format;
SET binlog_format= STATEMENT;
# This debug_sync will linger on and be used to control T3 later.
INSERT INTO t1 VALUES (foo(50,
"rpl_parallel_start_waiting_for_prior SIGNAL t3_ready",
"rpl_parallel_end_of_group SIGNAL prep_ready WAIT_FOR prep_cont"));
--save_master_pos
--connection server_2
# Wait for the debug_sync point for T3 to be set. But let the preparation
# transaction remain hanging, so that T1 and T2 will be scheduled for the
# remaining two worker threads.
SET DEBUG_SYNC= "now WAIT_FOR prep_ready";
--connection server_1
INSERT INTO t2 VALUES (foo(50,
"rpl_parallel_simulate_temp_err_xid SIGNAL t1_ready1 WAIT_FOR t1_cont1",
"rpl_parallel_retry_after_unmark SIGNAL t1_ready2 WAIT_FOR t1_cont2"));
--save_master_pos
--connection server_2
SET DEBUG_SYNC= "now WAIT_FOR t1_ready1";
# T1 has now done mark_start_commit(). It will later do a rollback and retry.
--connection server_1
# Use a MyISAM table for T2 and T3, so they do not trigger the
# rpl_parallel_simulate_temp_err_xid DBUG insertion on XID event.
INSERT INTO t1 VALUES (foo(51,
"rpl_parallel_before_mark_start_commit SIGNAL t2_ready1 WAIT_FOR t2_cont1",
"rpl_parallel_after_mark_start_commit SIGNAL t2_ready2"));
--connection server_2
SET DEBUG_SYNC= "now WAIT_FOR t2_ready1";
# T2 has now started running, but has not yet done mark_start_commit()
SET DEBUG_SYNC= "now SIGNAL t1_cont1";
SET DEBUG_SYNC= "now WAIT_FOR t1_ready2";
# T1 has now done unmark_start_commit() in preparation for its retry.
--connection server_1
INSERT INTO t1 VALUES (52);
SET BINLOG_FORMAT= @old_format;
SELECT * FROM t2 WHERE a>=50 ORDER BY a;
SELECT * FROM t1 WHERE a>=50 ORDER BY a;
--connection server_2
# Let the preparation transaction complete, so that the same worker thread
# can continue with the transaction T3.
SET DEBUG_SYNC= "now SIGNAL prep_cont";
SET DEBUG_SYNC= "now WAIT_FOR t3_ready";
# T3 has now gone to wait for T2 to start committing
SET DEBUG_SYNC= "now SIGNAL t2_cont1";
SET DEBUG_SYNC= "now WAIT_FOR t2_ready2";
# T2 has now done mark_start_commit().
# Let things run, and check that T3 does not get deadlocked.
SET DEBUG_SYNC= "now SIGNAL t1_cont2";
--sync_with_master
--connection server_1
--save_master_pos
--connection server_2
--sync_with_master
SELECT * FROM t2 WHERE a>=50 ORDER BY a;
SELECT * FROM t1 WHERE a>=50 ORDER BY a;
SET DEBUG_SYNC="reset";
# Re-spawn the worker threads to remove any DBUG injections or DEBUG_SYNC.
--source include/stop_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
--source include/start_slave.inc
--echo *** MDEV-7326 Server deadlock in connection with parallel replication ***
# Similar to the previous test, but with T2 and T3 in the same GCO.
# We use three transactions, T1 in one group commit and T2/T3 in another.
# T1 does mark_start_commit(), then gets a deadlock error.
# T2 wakes up and starts running
# T1 does unmark_start_commit()
# T3 goes to wait for T1 to start its commit
# T2 does mark_start_commit()
# The bug was that at this point, T3 got deadlocked. T2 increments the
# count_committing_event_groups but does not signal T3, as they are in
# the same GCO. Then later when T1 increments, it would also not signal
# T3, because now the count_committing_event_groups is not equal to the
# wait_count of T3 (it is one larger).
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=3;
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_xid";
--source include/start_slave.inc
--connection server_1
SET @old_format= @@SESSION.binlog_format;
SET binlog_format= STATEMENT;
# This debug_sync will linger on and be used to control T3 later.
INSERT INTO t1 VALUES (foo(60,
"rpl_parallel_start_waiting_for_prior SIGNAL t3_ready",
"rpl_parallel_end_of_group SIGNAL prep_ready WAIT_FOR prep_cont"));
--save_master_pos
--connection server_2
# Wait for the debug_sync point for T3 to be set. But let the preparation
# transaction remain hanging, so that T1 and T2 will be scheduled for the
# remaining two worker threads.
SET DEBUG_SYNC= "now WAIT_FOR prep_ready";
--connection server_1
INSERT INTO t2 VALUES (foo(60,
"rpl_parallel_simulate_temp_err_xid SIGNAL t1_ready1 WAIT_FOR t1_cont1",
"rpl_parallel_retry_after_unmark SIGNAL t1_ready2 WAIT_FOR t1_cont2"));
--save_master_pos
--connection server_2
SET DEBUG_SYNC= "now WAIT_FOR t1_ready1";
# T1 has now done mark_start_commit(). It will later do a rollback and retry.
# Do T2 and T3 in a single group commit.
# Use a MyISAM table for T2 and T3, so they do not trigger the
# rpl_parallel_simulate_temp_err_xid DBUG insertion on XID event.
--connection con_temp3
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
SET binlog_format=statement;
send INSERT INTO t1 VALUES (foo(61,
"rpl_parallel_before_mark_start_commit SIGNAL t2_ready1 WAIT_FOR t2_cont1",
"rpl_parallel_after_mark_start_commit SIGNAL t2_ready2"));
--connection server_1
SET debug_sync='now WAIT_FOR master_queued1';
--connection con_temp4
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
send INSERT INTO t6 VALUES (62);
--connection server_1
SET debug_sync='now WAIT_FOR master_queued2';
SET debug_sync='now SIGNAL master_cont1';
--connection con_temp3
REAP;
--connection con_temp4
REAP;
--connection server_1
SET debug_sync='RESET';
SET BINLOG_FORMAT= @old_format;
SELECT * FROM t2 WHERE a>=60 ORDER BY a;
SELECT * FROM t1 WHERE a>=60 ORDER BY a;
SELECT * FROM t6 WHERE a>=60 ORDER BY a;
--connection server_2
SET DEBUG_SYNC= "now WAIT_FOR t2_ready1";
# T2 has now started running, but has not yet done mark_start_commit()
SET DEBUG_SYNC= "now SIGNAL t1_cont1";
SET DEBUG_SYNC= "now WAIT_FOR t1_ready2";
# T1 has now done unmark_start_commit() in preparation for its retry.
--connection server_2
# Let the preparation transaction complete, so that the same worker thread
# can continue with the transaction T3.
SET DEBUG_SYNC= "now SIGNAL prep_cont";
SET DEBUG_SYNC= "now WAIT_FOR t3_ready";
# T3 has now gone to wait for T2 to start committing
SET DEBUG_SYNC= "now SIGNAL t2_cont1";
SET DEBUG_SYNC= "now WAIT_FOR t2_ready2";
# T2 has now done mark_start_commit().
# Let things run, and check that T3 does not get deadlocked.
SET DEBUG_SYNC= "now SIGNAL t1_cont2";
--sync_with_master
--connection server_1
--save_master_pos
--connection server_2
--sync_with_master
SELECT * FROM t2 WHERE a>=60 ORDER BY a;
SELECT * FROM t1 WHERE a>=60 ORDER BY a;
SELECT * FROM t6 WHERE a>=60 ORDER BY a;
SET DEBUG_SYNC="reset";
# Re-spawn the worker threads to remove any DBUG injections or DEBUG_SYNC.
--source include/stop_slave.inc
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
--source include/start_slave.inc
# Clean up. # Clean up.
--connection server_2 --connection server_2
--source include/stop_slave.inc --source include/stop_slave.inc
......
...@@ -106,9 +106,10 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) ...@@ -106,9 +106,10 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
static void static void
finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
rpl_group_info *rgi) rpl_parallel_entry *entry, rpl_group_info *rgi)
{ {
THD *thd= rpt->thd;
wait_for_commit *wfc= &rgi->commit_orderer; wait_for_commit *wfc= &rgi->commit_orderer;
int err; int err;
...@@ -139,6 +140,16 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, ...@@ -139,6 +140,16 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
signal_error_to_sql_driver_thread(thd, rgi, err); signal_error_to_sql_driver_thread(thd, rgi, err);
thd->wait_for_commit_ptr= NULL; thd->wait_for_commit_ptr= NULL;
mysql_mutex_lock(&entry->LOCK_parallel_entry);
/*
We need to mark that this event group started its commit phase, in case we
missed it before (otherwise we would deadlock the next event group that is
waiting for this). In most cases (normal DML), it will be a no-op.
*/
rgi->mark_start_commit_no_lock();
if (entry->last_committed_sub_id < sub_id)
{
/* /*
Record that this event group has finished (eg. transaction is Record that this event group has finished (eg. transaction is
committed, if transactional), so other event groups will no longer committed, if transactional), so other event groups will no longer
...@@ -154,10 +165,22 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, ...@@ -154,10 +165,22 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
this commit is done, then any prior commits will also have been this commit is done, then any prior commits will also have been
done and also no longer need waiting for. done and also no longer need waiting for.
*/ */
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (entry->last_committed_sub_id < sub_id)
entry->last_committed_sub_id= sub_id; entry->last_committed_sub_id= sub_id;
/* Now free any GCOs in which all transactions have committed. */
group_commit_orderer *tmp_gco= rgi->gco;
while (tmp_gco &&
(!tmp_gco->next_gco || tmp_gco->last_sub_id > sub_id))
tmp_gco= tmp_gco->prev_gco;
while (tmp_gco)
{
group_commit_orderer *prev_gco= tmp_gco->prev_gco;
tmp_gco->next_gco->prev_gco= NULL;
rpt->loc_free_gco(tmp_gco);
tmp_gco= prev_gco;
}
}
/* /*
If this event group got error, then any following event groups that have If this event group got error, then any following event groups that have
not yet started should just skip their group, preparing for stop of the not yet started should just skip their group, preparing for stop of the
...@@ -166,12 +189,6 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, ...@@ -166,12 +189,6 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
if (unlikely(rgi->worker_error) && if (unlikely(rgi->worker_error) &&
entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX) entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
entry->stop_on_error_sub_id= sub_id; entry->stop_on_error_sub_id= sub_id;
/*
We need to mark that this event group started its commit phase, in case we
missed it before (otherwise we would deadlock the next event group that is
waiting for this). In most cases (normal DML), it will be a no-op.
*/
rgi->mark_start_commit_no_lock();
mysql_mutex_unlock(&entry->LOCK_parallel_entry); mysql_mutex_unlock(&entry->LOCK_parallel_entry);
thd->clear_error(); thd->clear_error();
...@@ -329,6 +346,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, ...@@ -329,6 +346,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
until after the unmark. until after the unmark.
*/ */
rgi->unmark_start_commit(); rgi->unmark_start_commit();
DEBUG_SYNC(thd, "rpl_parallel_retry_after_unmark");
/* /*
We might get the deadlock error that causes the retry during commit, while We might get the deadlock error that causes the retry during commit, while
...@@ -517,7 +535,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -517,7 +535,7 @@ handle_rpl_parallel_thread(void *arg)
bool in_event_group= false; bool in_event_group= false;
bool skip_event_group= false; bool skip_event_group= false;
rpl_group_info *group_rgi= NULL; rpl_group_info *group_rgi= NULL;
group_commit_orderer *gco, *tmp_gco; group_commit_orderer *gco;
uint64 event_gtid_sub_id= 0; uint64 event_gtid_sub_id= 0;
rpl_sql_thread_info sql_info(NULL); rpl_sql_thread_info sql_info(NULL);
int err; int err;
...@@ -610,7 +628,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -610,7 +628,7 @@ handle_rpl_parallel_thread(void *arg)
*/ */
group_rgi->cleanup_context(thd, 1); group_rgi->cleanup_context(thd, 1);
in_event_group= false; in_event_group= false;
finish_event_group(thd, group_rgi->gtid_sub_id, finish_event_group(rpt, group_rgi->gtid_sub_id,
qev->entry_for_queued, group_rgi); qev->entry_for_queued, group_rgi);
rpt->loc_free_rgi(group_rgi); rpt->loc_free_rgi(group_rgi);
...@@ -664,8 +682,12 @@ handle_rpl_parallel_thread(void *arg) ...@@ -664,8 +682,12 @@ handle_rpl_parallel_thread(void *arg)
mysql_mutex_lock(&entry->LOCK_parallel_entry); mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (!gco->installed) if (!gco->installed)
{ {
if (gco->prev_gco) group_commit_orderer *prev_gco= gco->prev_gco;
gco->prev_gco->next_gco= gco; if (prev_gco)
{
prev_gco->last_sub_id= gco->prior_sub_id;
prev_gco->next_gco= gco;
}
gco->installed= true; gco->installed= true;
} }
wait_count= gco->wait_count; wait_count= gco->wait_count;
...@@ -682,6 +704,8 @@ handle_rpl_parallel_thread(void *arg) ...@@ -682,6 +704,8 @@ handle_rpl_parallel_thread(void *arg)
if (thd->check_killed() && !rgi->worker_error) if (thd->check_killed() && !rgi->worker_error)
{ {
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
thd->send_kill_message(); thd->send_kill_message();
slave_output_error_info(rgi, thd); slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, 1); signal_error_to_sql_driver_thread(thd, rgi, 1);
...@@ -698,18 +722,6 @@ handle_rpl_parallel_thread(void *arg) ...@@ -698,18 +722,6 @@ handle_rpl_parallel_thread(void *arg)
} while (wait_count > entry->count_committing_event_groups); } while (wait_count > entry->count_committing_event_groups);
} }
if ((tmp_gco= gco->prev_gco))
{
/*
Now all the event groups in the previous batch have entered their
commit phase, and will no longer access their gco. So we can free
it here.
*/
DBUG_ASSERT(!tmp_gco->prev_gco);
gco->prev_gco= NULL;
rpt->loc_free_gco(tmp_gco);
}
if (entry->force_abort && wait_count > entry->stop_count) if (entry->force_abort && wait_count > entry->stop_count)
{ {
/* /*
...@@ -773,6 +785,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -773,6 +785,7 @@ handle_rpl_parallel_thread(void *arg)
{ {
DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit"); DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
rgi->mark_start_commit(); rgi->mark_start_commit();
DEBUG_SYNC(thd, "rpl_parallel_after_mark_start_commit");
} }
/* /*
...@@ -793,6 +806,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -793,6 +806,7 @@ handle_rpl_parallel_thread(void *arg)
thd->get_stmt_da()->reset_diagnostics_area(); thd->get_stmt_da()->reset_diagnostics_area();
my_error(ER_LOCK_DEADLOCK, MYF(0)); my_error(ER_LOCK_DEADLOCK, MYF(0));
err= 1; err= 1;
DEBUG_SYNC(thd, "rpl_parallel_simulate_temp_err_xid");
}); });
if (!err) if (!err)
#endif #endif
...@@ -832,7 +846,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -832,7 +846,7 @@ handle_rpl_parallel_thread(void *arg)
if (end_of_group) if (end_of_group)
{ {
in_event_group= false; in_event_group= false;
finish_event_group(thd, event_gtid_sub_id, entry, rgi); finish_event_group(rpt, event_gtid_sub_id, entry, rgi);
rpt->loc_free_rgi(rgi); rpt->loc_free_rgi(rgi);
thd->rgi_slave= group_rgi= rgi= NULL; thd->rgi_slave= group_rgi= rgi= NULL;
skip_event_group= false; skip_event_group= false;
...@@ -873,7 +887,7 @@ handle_rpl_parallel_thread(void *arg) ...@@ -873,7 +887,7 @@ handle_rpl_parallel_thread(void *arg)
*/ */
mysql_mutex_unlock(&rpt->LOCK_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
signal_error_to_sql_driver_thread(thd, group_rgi, 1); signal_error_to_sql_driver_thread(thd, group_rgi, 1);
finish_event_group(thd, group_rgi->gtid_sub_id, finish_event_group(rpt, group_rgi->gtid_sub_id,
group_rgi->parallel_entry, group_rgi); group_rgi->parallel_entry, group_rgi);
in_event_group= false; in_event_group= false;
mysql_mutex_lock(&rpt->LOCK_rpl_thread); mysql_mutex_lock(&rpt->LOCK_rpl_thread);
...@@ -922,7 +936,6 @@ handle_rpl_parallel_thread(void *arg) ...@@ -922,7 +936,6 @@ handle_rpl_parallel_thread(void *arg)
static void static void
dealloc_gco(group_commit_orderer *gco) dealloc_gco(group_commit_orderer *gco)
{ {
DBUG_ASSERT(!gco->prev_gco /* Must only free after dealloc previous */);
mysql_cond_destroy(&gco->COND_group_commit_orderer); mysql_cond_destroy(&gco->COND_group_commit_orderer);
my_free(gco); my_free(gco);
} }
...@@ -1303,7 +1316,8 @@ rpl_parallel_thread::free_rgi(rpl_group_info *rgi) ...@@ -1303,7 +1316,8 @@ rpl_parallel_thread::free_rgi(rpl_group_info *rgi)
group_commit_orderer * group_commit_orderer *
rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev) rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev,
uint64 prior_sub_id)
{ {
group_commit_orderer *gco; group_commit_orderer *gco;
mysql_mutex_assert_owner(&LOCK_rpl_thread); mysql_mutex_assert_owner(&LOCK_rpl_thread);
...@@ -1319,6 +1333,7 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev) ...@@ -1319,6 +1333,7 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev)
gco->wait_count= wait_count; gco->wait_count= wait_count;
gco->prev_gco= prev; gco->prev_gco= prev;
gco->next_gco= NULL; gco->next_gco= NULL;
gco->prior_sub_id= prior_sub_id;
gco->installed= false; gco->installed= false;
return gco; return gco;
} }
...@@ -1327,7 +1342,6 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev) ...@@ -1327,7 +1342,6 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev)
void void
rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
{ {
DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */);
if (!loc_gco_list) if (!loc_gco_list)
loc_gco_last_ptr_ptr= &gco->next_gco; loc_gco_last_ptr_ptr= &gco->next_gco;
else else
...@@ -1534,8 +1548,12 @@ static void ...@@ -1534,8 +1548,12 @@ static void
free_rpl_parallel_entry(void *element) free_rpl_parallel_entry(void *element)
{ {
rpl_parallel_entry *e= (rpl_parallel_entry *)element; rpl_parallel_entry *e= (rpl_parallel_entry *)element;
if (e->current_gco) while (e->current_gco)
{
group_commit_orderer *prev_gco= e->current_gco->prev_gco;
dealloc_gco(e->current_gco); dealloc_gco(e->current_gco);
e->current_gco= prev_gco;
}
mysql_cond_destroy(&e->COND_parallel_entry); mysql_cond_destroy(&e->COND_parallel_entry);
mysql_mutex_destroy(&e->LOCK_parallel_entry); mysql_mutex_destroy(&e->LOCK_parallel_entry);
my_free(e); my_free(e);
...@@ -2007,7 +2025,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, ...@@ -2007,7 +2025,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
uint64 count= e->count_queued_event_groups; uint64 count= e->count_queued_event_groups;
group_commit_orderer *gco; group_commit_orderer *gco;
if (!(gco= cur_thread->get_gco(count, e->current_gco))) if (!(gco= cur_thread->get_gco(count, e->current_gco, e->current_sub_id)))
{ {
cur_thread->free_rgi(rgi); cur_thread->free_rgi(rgi);
cur_thread->free_qev(qev); cur_thread->free_qev(qev);
......
...@@ -39,9 +39,12 @@ struct inuse_relaylog; ...@@ -39,9 +39,12 @@ struct inuse_relaylog;
rpl_parallel_entry::count_committing_event_groups has reached rpl_parallel_entry::count_committing_event_groups has reached
gco->next_gco->wait_count. gco->next_gco->wait_count.
- When gco->wait_count is reached for a worker and the wait completes, - The gco lives until all its event groups have completed their commit.
the worker frees gco->prev_gco; at this point it is guaranteed not to This is detected by rpl_parallel_entry::last_committed_sub_id being
be needed any longer. greater than or equal gco->last_sub_id. Once this happens, the gco is
freed. Note that since update of last_committed_sub_id can happen
out-of-order, the thread that frees a given gco can be for any later
event group, not necessarily an event group from the gco being freed.
*/ */
struct group_commit_orderer { struct group_commit_orderer {
/* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */ /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */
...@@ -49,6 +52,16 @@ struct group_commit_orderer { ...@@ -49,6 +52,16 @@ struct group_commit_orderer {
uint64 wait_count; uint64 wait_count;
group_commit_orderer *prev_gco; group_commit_orderer *prev_gco;
group_commit_orderer *next_gco; group_commit_orderer *next_gco;
/*
The sub_id of last event group in this the previous GCO.
Only valid if prev_gco != NULL.
*/
uint64 prior_sub_id;
/*
The sub_id of the last event group in this GCO. Only valid when next_gco
is non-NULL.
*/
uint64 last_sub_id;
bool installed; bool installed;
}; };
...@@ -168,7 +181,8 @@ struct rpl_parallel_thread { ...@@ -168,7 +181,8 @@ struct rpl_parallel_thread {
LOCK_rpl_thread mutex. LOCK_rpl_thread mutex.
*/ */
void free_rgi(rpl_group_info *rgi); void free_rgi(rpl_group_info *rgi);
group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev); group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev,
uint64 first_sub_id);
/* /*
Put a gco on the local free list, to be later released to the global free Put a gco on the local free list, to be later released to the global free
list by batch_free(). list by batch_free().
......
...@@ -1849,11 +1849,20 @@ void rpl_group_info::slave_close_thread_tables(THD *thd) ...@@ -1849,11 +1849,20 @@ void rpl_group_info::slave_close_thread_tables(THD *thd)
static void static void
mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco) mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco,
rpl_group_info *rgi)
{ {
group_commit_orderer *tmp;
uint64 count= ++e->count_committing_event_groups; uint64 count= ++e->count_committing_event_groups;
if (gco->next_gco && gco->next_gco->wait_count == count) /* Signal any following GCO whose wait_count has been reached now. */
mysql_cond_broadcast(&gco->next_gco->COND_group_commit_orderer); tmp= gco;
while ((tmp= tmp->next_gco))
{
uint64 wait_count= tmp->wait_count;
if (wait_count > count)
break;
mysql_cond_broadcast(&tmp->COND_group_commit_orderer);
}
} }
...@@ -1862,7 +1871,7 @@ rpl_group_info::mark_start_commit_no_lock() ...@@ -1862,7 +1871,7 @@ rpl_group_info::mark_start_commit_no_lock()
{ {
if (did_mark_start_commit) if (did_mark_start_commit)
return; return;
mark_start_commit_inner(parallel_entry, gco); mark_start_commit_inner(parallel_entry, gco, this);
did_mark_start_commit= true; did_mark_start_commit= true;
} }
...@@ -1877,7 +1886,7 @@ rpl_group_info::mark_start_commit() ...@@ -1877,7 +1886,7 @@ rpl_group_info::mark_start_commit()
e= this->parallel_entry; e= this->parallel_entry;
mysql_mutex_lock(&e->LOCK_parallel_entry); mysql_mutex_lock(&e->LOCK_parallel_entry);
mark_start_commit_inner(e, gco); mark_start_commit_inner(e, gco, this);
mysql_mutex_unlock(&e->LOCK_parallel_entry); mysql_mutex_unlock(&e->LOCK_parallel_entry);
did_mark_start_commit= true; did_mark_start_commit= true;
} }
......
...@@ -563,6 +563,10 @@ struct rpl_group_info ...@@ -563,6 +563,10 @@ struct rpl_group_info
(When we execute in parallel the transactions that group committed (When we execute in parallel the transactions that group committed
together on the master, we still need to wait for any prior transactions together on the master, we still need to wait for any prior transactions
to have reached the commit stage). to have reached the commit stage).
The pointed-to gco is only valid for as long as
gtid_sub_id < parallel_entry->last_committed_sub_id. After that, it can
be freed by another thread.
*/ */
group_commit_orderer *gco; group_commit_orderer *gco;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment