Commit 4d6ee2d1 authored by unknown's avatar unknown

MDEV-5363: Make parallel replication waits killable

Make wait_for_prior_commit killable, and handle the error if
killed.
parent 4bce09c1
...@@ -6612,12 +6612,13 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, ...@@ -6612,12 +6612,13 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
to commit. If so, we add those to the queue as well, transitively for all to commit. If so, we add those to the queue as well, transitively for all
waiters. waiters.
@retval TRUE If queued as the first entry in the queue (meaning this @retval < 0 Error
@retval > 0 If queued as the first entry in the queue (meaning this
is the leader) is the leader)
@retval FALSE Otherwise @retval 0 Otherwise (queued as participant, leader handles the commit)
*/ */
bool int
MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
{ {
group_commit_entry *entry, *orig_queue; group_commit_entry *entry, *orig_queue;
...@@ -6641,6 +6642,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -6641,6 +6642,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
/* Do an extra check here, this time safely under lock. */ /* Do an extra check here, this time safely under lock. */
if (wfc->waiting_for_commit) if (wfc->waiting_for_commit)
{ {
const char *old_msg;
/* /*
By setting wfc->opaque_pointer to our own entry, we mark that we are By setting wfc->opaque_pointer to our own entry, we mark that we are
ready to commit, but waiting for another transaction to commit before ready to commit, but waiting for another transaction to commit before
...@@ -6651,16 +6653,36 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -6651,16 +6653,36 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
queued_by_other flag is set. queued_by_other flag is set.
*/ */
wfc->opaque_pointer= orig_entry; wfc->opaque_pointer= orig_entry;
old_msg=
orig_entry->thd->enter_cond(&wfc->COND_wait_commit,
&wfc->LOCK_wait_commit,
"Waiting for prior transaction to commit");
DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior"); DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
do while (wfc->waiting_for_commit && !orig_entry->thd->check_killed())
{
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
} while (wfc->waiting_for_commit);
wfc->opaque_pointer= NULL; wfc->opaque_pointer= NULL;
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d", DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
orig_entry->queued_by_other)); orig_entry->queued_by_other));
orig_entry->thd->exit_cond(old_msg);
if (wfc->waiting_for_commit)
{
/* Interrupted by kill. */
wfc->wakeup_error= orig_entry->thd->killed_errno();
if (wfc->wakeup_error)
wfc->wakeup_error= ER_QUERY_INTERRUPTED;
my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0));
DBUG_RETURN(-1);
}
} }
else
mysql_mutex_unlock(&wfc->LOCK_wait_commit); mysql_mutex_unlock(&wfc->LOCK_wait_commit);
if (wfc->wakeup_error)
{
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
DBUG_RETURN(-1);
}
} }
/* /*
...@@ -6669,7 +6691,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -6669,7 +6691,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
then there is nothing else to do. then there is nothing else to do.
*/ */
if (orig_entry->queued_by_other) if (orig_entry->queued_by_other)
DBUG_RETURN(false); DBUG_RETURN(0);
/* Now enqueue ourselves in the group commit queue. */ /* Now enqueue ourselves in the group commit queue. */
DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue"); DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue");
...@@ -6841,13 +6863,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -6841,13 +6863,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
bool bool
MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{ {
bool is_leader= queue_for_group_commit(entry); int is_leader= queue_for_group_commit(entry);
/* /*
The first in the queue handles group commit for all; the others just wait The first in the queue handles group commit for all; the others just wait
to be signalled when group commit is done. to be signalled when group commit is done.
*/ */
if (is_leader) if (is_leader < 0)
return true; /* Error */
else if (is_leader)
trx_group_commit_leader(entry); trx_group_commit_leader(entry);
else if (!entry->queued_by_other) else if (!entry->queued_by_other)
entry->thd->wait_for_wakeup_ready(); entry->thd->wait_for_wakeup_ready();
......
...@@ -540,7 +540,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG ...@@ -540,7 +540,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
void do_checkpoint_request(ulong binlog_id); void do_checkpoint_request(ulong binlog_id);
void purge(); void purge();
int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id); int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id);
bool queue_for_group_commit(group_commit_entry *entry); int queue_for_group_commit(group_commit_entry *entry);
bool write_transaction_to_binlog_events(group_commit_entry *entry); bool write_transaction_to_binlog_events(group_commit_entry *entry);
void trx_group_commit_leader(group_commit_entry *leader); void trx_group_commit_leader(group_commit_entry *leader);
bool is_xidlist_idle_nolock(); bool is_xidlist_idle_nolock();
......
...@@ -142,7 +142,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id, ...@@ -142,7 +142,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
if (err) if (err)
wfc->unregister_wait_for_prior_commit(); wfc->unregister_wait_for_prior_commit();
else else
wfc->wait_for_prior_commit(); err= wfc->wait_for_prior_commit(thd);
thd->wait_for_commit_ptr= NULL; thd->wait_for_commit_ptr= NULL;
/* /*
......
...@@ -5783,13 +5783,28 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) ...@@ -5783,13 +5783,28 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
returns immediately. returns immediately.
*/ */
int int
wait_for_commit::wait_for_prior_commit2() wait_for_commit::wait_for_prior_commit2(THD *thd)
{ {
const char *old_msg;
mysql_mutex_lock(&LOCK_wait_commit); mysql_mutex_lock(&LOCK_wait_commit);
while (waiting_for_commit) old_msg= thd->enter_cond(&COND_wait_commit, &LOCK_wait_commit,
"Waiting for prior transaction to commit");
while (waiting_for_commit && !thd->check_killed())
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
mysql_mutex_unlock(&LOCK_wait_commit); thd->exit_cond(old_msg);
waitee= NULL; waitee= NULL;
if (!waiting_for_commit)
{
if (wakeup_error)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
return wakeup_error;
}
/* Wait was interrupted by kill, so give the error. */
wakeup_error= thd->killed_errno();
if (!wakeup_error)
wakeup_error= ER_QUERY_INTERRUPTED;
my_message(wakeup_error, ER(wakeup_error), MYF(0));
return wakeup_error; return wakeup_error;
} }
......
...@@ -1627,14 +1627,14 @@ struct wait_for_commit ...@@ -1627,14 +1627,14 @@ struct wait_for_commit
bool wakeup_subsequent_commits_running; bool wakeup_subsequent_commits_running;
void register_wait_for_prior_commit(wait_for_commit *waitee); void register_wait_for_prior_commit(wait_for_commit *waitee);
int wait_for_prior_commit() int wait_for_prior_commit(THD *thd)
{ {
/* /*
Quick inline check, to avoid function call and locking in the common case Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled. where no wakeup is registered, or a registered wait was already signalled.
*/ */
if (waiting_for_commit) if (waiting_for_commit)
return wait_for_prior_commit2(); return wait_for_prior_commit2(thd);
else else
return wakeup_error; return wakeup_error;
} }
...@@ -1663,7 +1663,7 @@ struct wait_for_commit ...@@ -1663,7 +1663,7 @@ struct wait_for_commit
void wakeup(int wakeup_error); void wakeup(int wakeup_error);
int wait_for_prior_commit2(); int wait_for_prior_commit2(THD *thd);
void wakeup_subsequent_commits2(int wakeup_error); void wakeup_subsequent_commits2(int wakeup_error);
void unregister_wait_for_prior_commit2(); void unregister_wait_for_prior_commit2();
...@@ -3334,12 +3334,7 @@ class THD :public Statement, ...@@ -3334,12 +3334,7 @@ class THD :public Statement,
int wait_for_prior_commit() int wait_for_prior_commit()
{ {
if (wait_for_commit_ptr) if (wait_for_commit_ptr)
{ return wait_for_commit_ptr->wait_for_prior_commit(this);
int err= wait_for_commit_ptr->wait_for_prior_commit();
if (err)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
return err;
}
return 0; return 0;
} }
void wakeup_subsequent_commits(int wakeup_error) void wakeup_subsequent_commits(int wakeup_error)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment