Commit caa79081 authored by Sujatha's avatar Sujatha

MDEV-20707: Missing memory barrier in parallel replication error handler in wait_for_prior_commit()

revision-id: 673e253724979fd9fe43a4a22bd7e1b2c3a5269e
Author: Kristian Nielsen

Fix missing memory barrier in wait_for_commit.

The function wait_for_commit::wait_for_prior_commit() has a fast path where it
checks without locks if wakeup_subsequent_commits() has already been called.
This check was missing a memory barrier. The waitee thread does two writes to
variables `waitee' and `wakeup_error', and if the waiting thread sees the
first write it _must_ also see the second or incorrect behavior will occur.
This requires memory barriers between both the writes (release semantics) and
the reads (acquire semantics) of those two variables.

Other accesses to these variables are done under lock or where only one thread
will be accessing them, and can be done without barriers (relaxed semantics).
parent 49019dde
...@@ -7481,8 +7481,10 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -7481,8 +7481,10 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
*/ */
wfc= orig_entry->thd->wait_for_commit_ptr; wfc= orig_entry->thd->wait_for_commit_ptr;
orig_entry->queued_by_other= false; orig_entry->queued_by_other= false;
if (wfc && wfc->waitee) if (wfc && wfc->waitee.load(std::memory_order_acquire))
{ {
wait_for_commit *loc_waitee;
mysql_mutex_lock(&wfc->LOCK_wait_commit); mysql_mutex_lock(&wfc->LOCK_wait_commit);
/* /*
Do an extra check here, this time safely under lock. Do an extra check here, this time safely under lock.
...@@ -7494,10 +7496,10 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -7494,10 +7496,10 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
before setting the flag, so there is no risk that we can queue ahead of before setting the flag, so there is no risk that we can queue ahead of
it. it.
*/ */
if (wfc->waitee && !wfc->waitee->commit_started) if ((loc_waitee= wfc->waitee.load(std::memory_order_relaxed)) &&
!loc_waitee->commit_started)
{ {
PSI_stage_info old_stage; PSI_stage_info old_stage;
wait_for_commit *loc_waitee;
/* /*
By setting wfc->opaque_pointer to our own entry, we mark that we are By setting wfc->opaque_pointer to our own entry, we mark that we are
...@@ -7519,7 +7521,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -7519,7 +7521,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
&wfc->LOCK_wait_commit, &wfc->LOCK_wait_commit,
&stage_waiting_for_prior_transaction_to_commit, &stage_waiting_for_prior_transaction_to_commit,
&old_stage); &old_stage);
while ((loc_waitee= wfc->waitee) && !orig_entry->thd->check_killed(1)) while ((loc_waitee= wfc->waitee.load(std::memory_order_relaxed)) &&
!orig_entry->thd->check_killed(1))
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
wfc->opaque_pointer= NULL; wfc->opaque_pointer= NULL;
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d", DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
...@@ -7537,14 +7540,18 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -7537,14 +7540,18 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
do do
{ {
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
} while (wfc->waitee); } while (wfc->waitee.load(std::memory_order_relaxed));
} }
else else
{ {
/* We were killed, so remove us from the list of waitee. */ /* We were killed, so remove us from the list of waitee. */
wfc->remove_from_list(&loc_waitee->subsequent_commits_list); wfc->remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
wfc->waitee= NULL; /*
This is the thread clearing its own status, it is no longer on
the list of waiters. So no memory barriers are needed here.
*/
wfc->waitee.store(NULL, std::memory_order_relaxed);
orig_entry->thd->EXIT_COND(&old_stage); orig_entry->thd->EXIT_COND(&old_stage);
/* Interrupted by kill. */ /* Interrupted by kill. */
......
...@@ -7228,7 +7228,7 @@ wait_for_commit::reinit() ...@@ -7228,7 +7228,7 @@ wait_for_commit::reinit()
{ {
subsequent_commits_list= NULL; subsequent_commits_list= NULL;
next_subsequent_commit= NULL; next_subsequent_commit= NULL;
waitee= NULL; waitee.store(NULL, std::memory_order_relaxed);
opaque_pointer= NULL; opaque_pointer= NULL;
wakeup_error= 0; wakeup_error= 0;
wakeup_subsequent_commits_running= false; wakeup_subsequent_commits_running= false;
...@@ -7306,8 +7306,9 @@ wait_for_commit::wakeup(int wakeup_error) ...@@ -7306,8 +7306,9 @@ wait_for_commit::wakeup(int wakeup_error)
*/ */
mysql_mutex_lock(&LOCK_wait_commit); mysql_mutex_lock(&LOCK_wait_commit);
waitee= NULL;
this->wakeup_error= wakeup_error; this->wakeup_error= wakeup_error;
/* Memory barrier to make wakeup_error visible to the waiter thread. */
waitee.store(NULL, std::memory_order_release);
/* /*
Note that it is critical that the mysql_cond_signal() here is done while Note that it is critical that the mysql_cond_signal() here is done while
still holding the mutex. As soon as we release the mutex, the waiter might still holding the mutex. As soon as we release the mutex, the waiter might
...@@ -7338,9 +7339,10 @@ wait_for_commit::wakeup(int wakeup_error) ...@@ -7338,9 +7339,10 @@ wait_for_commit::wakeup(int wakeup_error)
void void
wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
{ {
DBUG_ASSERT(!this->waitee /* No prior registration allowed */); DBUG_ASSERT(!this->waitee.load(std::memory_order_relaxed)
/* No prior registration allowed */);
wakeup_error= 0; wakeup_error= 0;
this->waitee= waitee; this->waitee.store(waitee, std::memory_order_relaxed);
mysql_mutex_lock(&waitee->LOCK_wait_commit); mysql_mutex_lock(&waitee->LOCK_wait_commit);
/* /*
...@@ -7349,7 +7351,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) ...@@ -7349,7 +7351,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
see comments on wakeup_subsequent_commits2() for details. see comments on wakeup_subsequent_commits2() for details.
*/ */
if (waitee->wakeup_subsequent_commits_running) if (waitee->wakeup_subsequent_commits_running)
this->waitee= NULL; this->waitee.store(NULL, std::memory_order_relaxed);
else else
{ {
/* /*
...@@ -7379,7 +7381,8 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) ...@@ -7379,7 +7381,8 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
thd->ENTER_COND(&COND_wait_commit, &LOCK_wait_commit, thd->ENTER_COND(&COND_wait_commit, &LOCK_wait_commit,
&stage_waiting_for_prior_transaction_to_commit, &stage_waiting_for_prior_transaction_to_commit,
&old_stage); &old_stage);
while ((loc_waitee= this->waitee) && likely(!thd->check_killed(1))) while ((loc_waitee= this->waitee.load(std::memory_order_relaxed)) &&
likely(!thd->check_killed(1)))
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
if (!loc_waitee) if (!loc_waitee)
{ {
...@@ -7402,14 +7405,14 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) ...@@ -7402,14 +7405,14 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
do do
{ {
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
} while (this->waitee); } while (this->waitee.load(std::memory_order_relaxed));
if (wakeup_error) if (wakeup_error)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
goto end; goto end;
} }
remove_from_list(&loc_waitee->subsequent_commits_list); remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
this->waitee= NULL; this->waitee.store(NULL, std::memory_order_relaxed);
wakeup_error= thd->killed_errno(); wakeup_error= thd->killed_errno();
if (!wakeup_error) if (!wakeup_error)
...@@ -7511,7 +7514,7 @@ wait_for_commit::unregister_wait_for_prior_commit2() ...@@ -7511,7 +7514,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
wait_for_commit *loc_waitee; wait_for_commit *loc_waitee;
mysql_mutex_lock(&LOCK_wait_commit); mysql_mutex_lock(&LOCK_wait_commit);
if ((loc_waitee= this->waitee)) if ((loc_waitee= this->waitee.load(std::memory_order_relaxed)))
{ {
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit); mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running) if (loc_waitee->wakeup_subsequent_commits_running)
...@@ -7524,7 +7527,7 @@ wait_for_commit::unregister_wait_for_prior_commit2() ...@@ -7524,7 +7527,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
See comments on wakeup_subsequent_commits2() for more details. See comments on wakeup_subsequent_commits2() for more details.
*/ */
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
while (this->waitee) while (this->waitee.load(std::memory_order_relaxed))
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
} }
else else
...@@ -7532,7 +7535,7 @@ wait_for_commit::unregister_wait_for_prior_commit2() ...@@ -7532,7 +7535,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
/* Remove ourselves from the list in the waitee. */ /* Remove ourselves from the list in the waitee. */
remove_from_list(&loc_waitee->subsequent_commits_list); remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
this->waitee= NULL; this->waitee.store(NULL, std::memory_order_relaxed);
} }
} }
wakeup_error= 0; wakeup_error= 0;
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
/* Classes in mysql */ /* Classes in mysql */
#include <atomic>
#include "dur_prop.h" #include "dur_prop.h"
#include <waiting_threads.h> #include <waiting_threads.h>
#include "sql_const.h" #include "sql_const.h"
...@@ -2011,7 +2012,7 @@ struct wait_for_commit ...@@ -2011,7 +2012,7 @@ struct wait_for_commit
/* /*
The LOCK_wait_commit protects the fields subsequent_commits_list and The LOCK_wait_commit protects the fields subsequent_commits_list and
wakeup_subsequent_commits_running (for a waitee), and the pointer wakeup_subsequent_commits_running (for a waitee), and the pointer
waiterr and associated COND_wait_commit (for a waiter). waitee and associated COND_wait_commit (for a waiter).
*/ */
mysql_mutex_t LOCK_wait_commit; mysql_mutex_t LOCK_wait_commit;
mysql_cond_t COND_wait_commit; mysql_cond_t COND_wait_commit;
...@@ -2025,8 +2026,14 @@ struct wait_for_commit ...@@ -2025,8 +2026,14 @@ struct wait_for_commit
When this is cleared for wakeup, the COND_wait_commit condition is When this is cleared for wakeup, the COND_wait_commit condition is
signalled. signalled.
This pointer is protected by LOCK_wait_commit. But there is also a "fast
path" where the waiter compares this to NULL without holding the lock.
Such read must be done with acquire semantics (and all corresponding
writes done with release semantics). This ensures that a wakeup with error
is reliably detected as (waitee==NULL && wakeup_error != 0).
*/ */
wait_for_commit *waitee; std::atomic<wait_for_commit *> waitee;
/* /*
Generic pointer for use by the transaction coordinator to optimise the Generic pointer for use by the transaction coordinator to optimise the
waiting for improved group commit. waiting for improved group commit.
...@@ -2061,7 +2068,7 @@ struct wait_for_commit ...@@ -2061,7 +2068,7 @@ struct wait_for_commit
Quick inline check, to avoid function call and locking in the common case Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled. where no wakeup is registered, or a registered wait was already signalled.
*/ */
if (waitee) if (waitee.load(std::memory_order_acquire))
return wait_for_prior_commit2(thd); return wait_for_prior_commit2(thd);
else else
{ {
...@@ -2089,7 +2096,7 @@ struct wait_for_commit ...@@ -2089,7 +2096,7 @@ struct wait_for_commit
} }
void unregister_wait_for_prior_commit() void unregister_wait_for_prior_commit()
{ {
if (waitee) if (waitee.load(std::memory_order_relaxed))
unregister_wait_for_prior_commit2(); unregister_wait_for_prior_commit2();
else else
wakeup_error= 0; wakeup_error= 0;
...@@ -2111,7 +2118,7 @@ struct wait_for_commit ...@@ -2111,7 +2118,7 @@ struct wait_for_commit
} }
next_ptr_ptr= &cur->next_subsequent_commit; next_ptr_ptr= &cur->next_subsequent_commit;
} }
waitee= NULL; waitee.store(NULL, std::memory_order_relaxed);
} }
void wakeup(int wakeup_error); void wakeup(int wakeup_error);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment