Commit 75c7c6dc authored by Brandon Nesterenko's avatar Brandon Nesterenko

MDEV-33551: Semi-sync Wait Point AFTER_COMMIT Slow on Workloads with Heavy Concurrency

When using semi-sync replication with
rpl_semi_sync_master_wait_point=AFTER_COMMIT, the performance of the
primary can significantly reduce compared to AFTER_SYNC's
performance for workloads with many concurrent users executing
transactions. This is because all connections on the primary share
the same cond_wait variable/mutex pair, so any time an ACK is
received from a replica, all waiting connections are awoken to check
if the ACK was for itself, which is done in mutual exclusion.

This patch changes this such that the waiting THD will use its own
local condition variable, and the ACK receiver thread only signals
connections which have been ACKed for wakeup. That is, the
THD::LOCK_wakeup_ready condition variable is re-used for this
purpose, and the Active_tranx queue nodes are extended to hold the
waiting thread, so it can be signalled once ACKed.

Additionally:

 1)  Removed part of MDEV-11853 additions, which allowed suspended
connection threads awaiting their semi-sync ACKs to live until their
ACKs had been received. This part, however, wasn't needed.  That is,
all that was needed was for the Ack_thread to survive.  So now the
connection threads are killed during phase 1. Thereby
THD::is_awaiting_semisync_ack, and all its related code was removed.

 2) COND_binlog_send is repurposed to signal on the condition when
Active_tranx is emptied during clear_active_tranx_nodes.

 3) At master shutdown (when waiting for slaves), instead of the
main loop individually waiting for each ACK, await_slave_reply()
(renamed await_all_slave_replies()) just waits once for the
repurposed COND_binlog_send to signal it is empty.

 4) Test rpl_semi_sync_shutdown_await_ack is updates as following:
   4.1) Added test case (adapted from Kristian Nielsen) to ensure
that if a thread awaiting its ACK is killed while SHUTDOWN WAIT FOR
ALL SLAVES is issued, the primary will still wait for the ACK from
the killed thread.
   4.2) As connections which by-passed phase 1 of thread killing no
longer are delayed for kill until phase 2, we can no longer query
yes/no tx after receiving an ACK/timeout. The check for these
variables is removed.
   4.3) Comment descriptions are updated which mention that the
connection is alive; and adjusted to be the Ack_thread.

Reviewed By:
============
Kristian Nielsen <knielsen@knielsen-hq.org>
parent b8a67198
include/master-slave.inc
[connection master]
connection master;
call mtr.add_suppression("Got an error reading communication packets");
set @save_bgc_count= @@global.binlog_commit_wait_count;
set @save_bgc_usec= @@global.binlog_commit_wait_usec;
set @save_debug_dbug= @@global.debug_dbug;
set @@global.binlog_commit_wait_count=3;
set @@global.binlog_commit_wait_usec=10000000;
set @@global.debug_dbug="+d,testing_cond_var_per_thd";
# Ensure semi-sync is on
connection slave;
connection master;
# Create three transactions to binlog group commit together
connection master;
create table t1 (a int);
connection server_1;
create table t2 (a int);
connection default;
create table t3 (a int);
connection master;
connection server_1;
connection default;
include/assert_grep.inc [Check that there is no 'Thread awaiting semi-sync ACK was awoken before its ACK' warning in error log.]
#
# Cleanup
connection master;
set @@global.binlog_commit_wait_count=@save_bgc_count;
set @@global.binlog_commit_wait_usec=@save_bgc_usec;
set @@global.debug_dbug=@save_debug_dbug;
drop table t1, t2, t3;
include/rpl_end.inc
......@@ -25,7 +25,7 @@ call mtr.add_suppression("Failed to kill the active semi-sync connection");
set @sav_enabled_server_3= @@GLOBAL.rpl_semi_sync_slave_enabled;
set @sav_server_3_dbug= @@GLOBAL.debug_dbug;
connection server_1;
CREATE TABLE t1 (a int);
CREATE TABLE t1 (a int) engine=innodb;
connection server_2;
connection server_3;
connect server_1_con2, localhost, root,,;
......@@ -34,8 +34,8 @@ connect server_1_con2, localhost, root,,;
#############################
#
# Test Case 1) If both replicas simulate a delay that is within the
# allowed timeout, the primary should delay killing the suspended thread
# until an ACK is received (Rpl_semi_sync_master_yes_tx should be 1).
# allowed timeout, the primary should delay killing the Ack_thread
# until an ACK is received.
#
connection server_1;
#--
......@@ -78,8 +78,6 @@ SET @@GLOBAL.debug_dbug= "+d,simulate_delay_semisync_slave_reply";
#--
#-- Test begins
connection server_1_con2;
#-- Give enough time after timeout/ack received to query yes_tx/no_tx
SET @@GLOBAL.debug_dbug= "+d,delay_shutdown_phase_2_after_semisync_wait";
connection server_1;
#-- Begin semi-sync transaction
INSERT INTO t1 VALUES (1);
......@@ -88,14 +86,7 @@ connection server_1_con2;
#-- Begin master shutdown
SHUTDOWN WAIT FOR ALL SLAVES;
connection server_1;
#-- Ensure either ACK was received (yes_tx=1) or timeout (no_tx=1)
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 1
show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 0
connection server_1_con2;
ERROR HY000: Lost connection to server during query
# Check logs to ensure shutdown was delayed
FOUND 1 /Delaying shutdown to await semi-sync ACK/ in mysqld.1.err
# Validate slave data is in correct state
......@@ -144,8 +135,8 @@ COUNT(*)=0
1
#
# Test Case 2) If both replicas simulate an error before sending an ACK,
# the primary should delay killing the suspended thread until the
# timeout is reached (Rpl_semi_sync_master_no_tx should be 1).
# the primary should delay killing the Ack_thread until the
# timeout is reached.
#
connection server_1;
#--
......@@ -188,8 +179,6 @@ SET @@GLOBAL.debug_dbug= "+d,corrupt_queue_event,delay_semisync_kill_connection_
#--
#-- Test begins
connection server_1_con2;
#-- Give enough time after timeout/ack received to query yes_tx/no_tx
SET @@GLOBAL.debug_dbug= "+d,delay_shutdown_phase_2_after_semisync_wait";
connection server_1;
#-- Begin semi-sync transaction
INSERT INTO t1 VALUES (1);
......@@ -198,14 +187,7 @@ connection server_1_con2;
#-- Begin master shutdown
SHUTDOWN WAIT FOR ALL SLAVES;
connection server_1;
#-- Ensure either ACK was received (yes_tx=1) or timeout (no_tx=1)
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 0
show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 1
connection server_1_con2;
ERROR HY000: Lost connection to server during query
# Check logs to ensure shutdown was delayed
FOUND 2 /Delaying shutdown to await semi-sync ACK/ in mysqld.1.err
# Validate slave data is in correct state
......@@ -267,8 +249,8 @@ COUNT(*)=0
#
# Test Case 3) If one replica simulates a delay within the allowed
# timeout and the other simulates an error before sending an ACK, the
# primary should delay killing the suspended thread until it receives an
# ACK from the delayed slave (Rpl_semi_sync_master_yes_tx should be 1).
# primary should delay killing the Ack_thread until it receives an
# ACK from the delayed slave.
#
connection server_1;
#--
......@@ -311,8 +293,6 @@ SET @@GLOBAL.debug_dbug= "+d,simulate_delay_semisync_slave_reply";
#--
#-- Test begins
connection server_1_con2;
#-- Give enough time after timeout/ack received to query yes_tx/no_tx
SET @@GLOBAL.debug_dbug= "+d,delay_shutdown_phase_2_after_semisync_wait";
connection server_1;
#-- Begin semi-sync transaction
INSERT INTO t1 VALUES (1);
......@@ -321,14 +301,7 @@ connection server_1_con2;
#-- Begin master shutdown
SHUTDOWN WAIT FOR ALL SLAVES;
connection server_1;
#-- Ensure either ACK was received (yes_tx=1) or timeout (no_tx=1)
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 1
show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 0
connection server_1_con2;
ERROR HY000: Lost connection to server during query
# Check logs to ensure shutdown was delayed
FOUND 3 /Delaying shutdown to await semi-sync ACK/ in mysqld.1.err
# Validate slave data is in correct state
......@@ -391,8 +364,7 @@ COUNT(*)=0
# active semi-sync connection in-tact. The slave should notice this, and
# not issue a `QUIT` command to the primary, which would otherwise be
# sent to kill an active connection. This test case validates that the
# slave does not send a `QUIT` in this case (Rpl_semi_sync_master_yes_tx
# should be 1 because server_3 will send the ACK within a valid timeout).
# slave does not send a `QUIT` in this case.
#
connection server_1;
#--
......@@ -435,8 +407,6 @@ SET @@GLOBAL.debug_dbug= "+d,simulate_delay_semisync_slave_reply";
#--
#-- Test begins
connection server_1_con2;
#-- Give enough time after timeout/ack received to query yes_tx/no_tx
SET @@GLOBAL.debug_dbug= "+d,delay_shutdown_phase_2_after_semisync_wait";
connection server_1;
#-- Begin semi-sync transaction
INSERT INTO t1 VALUES (1);
......@@ -445,14 +415,7 @@ connection server_1_con2;
#-- Begin master shutdown
SHUTDOWN WAIT FOR ALL SLAVES;
connection server_1;
#-- Ensure either ACK was received (yes_tx=1) or timeout (no_tx=1)
show status like 'Rpl_semi_sync_master_yes_tx';
Variable_name Value
Rpl_semi_sync_master_yes_tx 1
show status like 'Rpl_semi_sync_master_no_tx';
Variable_name Value
Rpl_semi_sync_master_no_tx 0
connection server_1_con2;
ERROR HY000: Lost connection to server during query
# Check logs to ensure shutdown was delayed
FOUND 4 /Delaying shutdown to await semi-sync ACK/ in mysqld.1.err
# Validate slave data is in correct state
......@@ -506,16 +469,60 @@ Rpl_semi_sync_slave_status OFF
SELECT COUNT(*)=0 from t1;
COUNT(*)=0
1
#
# Test Case 5) If a waiting-for-ACK user thread is killed (disconnected)
# during SHUTDOWN WAIT FOR ALL SLAVES, ensure the primary will still
# await the ACK from the replica before killing the Ack_receiver thread
#
connection server_1;
insert into t1 values (1);
include/save_master_gtid.inc
connection server_2;
include/sync_with_master_gtid.inc
include/stop_slave.inc
SET GLOBAL rpl_semi_sync_slave_enabled= 1;
include/start_slave.inc
connection server_1;
SET GLOBAL rpl_semi_sync_master_enabled= 1;
SET GLOBAL rpl_semi_sync_master_timeout= 2000;
show status like 'Rpl_semi_sync_master_status';
Variable_name Value
Rpl_semi_sync_master_status ON
show status like 'Rpl_semi_sync_master_clients';
Variable_name Value
Rpl_semi_sync_master_clients 1
connection server_2;
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,simulate_delay_semisync_slave_reply";
connect con1, localhost, root,,;
connect con2, localhost, root,,;
connection con1;
insert into t1 values (2);
connection server_1;
# Wait for thd to begin semi-sync wait..
# ..done
disconnect con1;
connection default;
connection con2;
SHUTDOWN WAIT FOR ALL SLAVES;
connection server_2;
include/assert_grep.inc [Ensure the primary waited for the ACK of the killed thread]
connection default;
connection server_1;
connection server_2;
include/stop_slave.inc
connection server_3;
include/stop_slave.inc
connection default;
connection server_1;
#############################
# Cleanup
#############################
connection server_2;
include/stop_slave.inc
SET @@GLOBAL.rpl_semi_sync_slave_enabled = @sav_enabled_server_2;
SET @@GLOBAL.debug_dbug= @sav_server_2_dbug;
include/start_slave.inc
connection server_3;
include/stop_slave.inc
SET @@GLOBAL.rpl_semi_sync_slave_enabled = @sav_enabled_server_3;
SET @@GLOBAL.debug_dbug= @sav_server_3_dbug;
include/start_slave.inc
......
!include ../my.cnf
[mysqld.1]
log-warnings=9
rpl_semi_sync_master_enabled=1
rpl_semi_sync_master_wait_point=AFTER_COMMIT
[mysqld.2]
log-warnings=9
rpl_semi_sync_slave_enabled=1
#
# This test ensures that, when using semi-sync with the wait_point
# AFTER_COMMIT, each thread awaiting an ACK is only woken up when its ACK (or
# an ACK for a later commit in binlog) has been received from the slave.
#
# Prior to MDEV-33551, all threads would be woken up for each ACK received,
# leading to large slowdowns, as each thread would check if the ACK was for it
# in mutual exclusion from the others.
#
# To ensure this, a debug-build-only log warning is added into
# Repl_semi_sync_master::commit_trx() at wakeup time, which will complain if
# the awoken thread's binlog wait coordinates are after the coordinate of the
# last ACK coordinates. Then, we use binlog group commit to commit a series of
# transactions, such that each will await an ACK concurrently. After all
# transactions have been finished (i.e. ACKed and committed), we check the log
# for the expected absence of the added debug warning message.
#
#
# References:
# MDEV-33551: Semi-sync Wait Point AFTER_COMMIT Slow on Workloads with Heavy
# Concurrency
#
--source include/have_binlog_format_row.inc
--source include/have_debug.inc
--source include/master-slave.inc
--connection master
call mtr.add_suppression("Got an error reading communication packets");
set @save_bgc_count= @@global.binlog_commit_wait_count;
set @save_bgc_usec= @@global.binlog_commit_wait_usec;
set @save_debug_dbug= @@global.debug_dbug;
set @@global.binlog_commit_wait_count=3;
set @@global.binlog_commit_wait_usec=10000000;
set @@global.debug_dbug="+d,testing_cond_var_per_thd";
--echo # Ensure semi-sync is on
--connection slave
let $status_var= rpl_semi_sync_slave_status;
let $status_var_value= ON;
source include/wait_for_status_var.inc;
--connection master
let $status_var= rpl_semi_sync_master_status;
let $status_var_value= ON;
source include/wait_for_status_var.inc;
--echo # Create three transactions to binlog group commit together
--connection master
--send create table t1 (a int)
--connection server_1
--send create table t2 (a int)
--connection default
--send create table t3 (a int)
--connection master
--reap
--connection server_1
--reap
--connection default
--reap
--let $assert_text= Check that there is no 'Thread awaiting semi-sync ACK was awoken before its ACK' warning in error log.
--let $assert_select=Thread awaiting semi-sync ACK was awoken before its ACK
--let $assert_file= $MYSQLTEST_VARDIR/log/mysqld.1.err
--let $assert_count= 0
--let $assert_only_after=CURRENT_TEST
--source include/assert_grep.inc
--echo #
--echo # Cleanup
--connection master
set @@global.binlog_commit_wait_count=@save_bgc_count;
set @@global.binlog_commit_wait_usec=@save_bgc_usec;
set @@global.debug_dbug=@save_debug_dbug;
drop table t1, t2, t3;
--source include/rpl_end.inc
......@@ -64,9 +64,6 @@ show status like 'Rpl_semi_sync_master_clients';
--echo #-- Test begins
--connection server_1_con2
--echo #-- Give enough time after timeout/ack received to query yes_tx/no_tx
SET @@GLOBAL.debug_dbug= "+d,delay_shutdown_phase_2_after_semisync_wait";
--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
wait
EOF
......@@ -82,15 +79,11 @@ let $status_var_value= 1;
source include/wait_for_status_var.inc;
--echo #-- Begin master shutdown
--send SHUTDOWN WAIT FOR ALL SLAVES
SHUTDOWN WAIT FOR ALL SLAVES;
--source include/wait_until_disconnected.inc
--connection server_1
--reap
--echo #-- Ensure either ACK was received (yes_tx=1) or timeout (no_tx=1)
show status like 'Rpl_semi_sync_master_yes_tx';
show status like 'Rpl_semi_sync_master_no_tx';
--connection server_1_con2
--error 2013
--reap
--source include/wait_until_disconnected.inc
......
......@@ -5,7 +5,7 @@
# This test validates that data is consistent between a primary and replica
# in semi-sync mode when the primary is issued `SHUTDOWN WAIT FOR SLAVES`
# during an active communication. More specifically, the primary should not
# kill the connection until it is sure a replica has received all binlog
# kill the Ack_thread until it is sure a replica has received all binlog
# data, i.e. once the primary receives the ACK. If a primary is issued a
# shutdown before receiving an ACK, it should wait until either 1) the ACK is
# received, or 2) the configured timeout (rpl_semi_sync_master_timeout) is
......@@ -15,23 +15,18 @@
# Using a topology consisting of one primary with two replicas, all in
# semi-sync mode, we use DEBUG_DBUG to simulate an error or delay on the
# replicas during an active communication while the primary is issued
# `SHUTDOWN WAIT FOR SLAVES`. We create four test cases to ensure the primary
# will correctly wait for the communication to finish, and use the semi-sync
# status variables Rpl_semi_sync_master_yes_tx and Rpl_semi_sync_master_no_tx
# to ensure the connection was not prematurely killed due to the shutdown.
# `SHUTDOWN WAIT FOR SLAVES`. We create four test cases to ensure the
# Ack_thread is not prematurely killed due to the shutdown.
# Test Case 1) If both replicas simulate a delay that is within the allowed
# timeout, the primary should delay killing the suspended thread
# until an ACK is received (Rpl_semi_sync_master_yes_tx should
# be 1).
# timeout, the primary should delay killing the Ack_thread
# until an ACK is received.
# Test Case 2) If both replicas simulate an error before sending an ACK, the
# primary should delay killing the suspended thread until the
# the timeout is reached (Rpl_semi_sync_master_no_tx should be
# 1).
# primary should delay killing the Ack_thread until the
# the timeout is reached.
# Test Case 3) If one replica simulates a delay within the allowed timeout
# and the other simulates an error before sending an ACK, the
# primary should delay killing the suspended thread until it
# receives an ACK from the delayed slave
# (Rpl_semi_sync_master_yes_tx should be 1).
# primary should delay killing the Ack_thread until it
# receives an ACK from the delayed slave.
# Test Case 4) If a replica errors before sending an ACK, it will cause the
# IO thread to stop and handle the error. During error handling,
# if semi-sync is active, the replica will form a new connection
......@@ -41,9 +36,11 @@
# slave should notice this, and not issue a `QUIT` command to
# the primary, which would otherwise be sent to kill an active
# connection. This test case validates that the slave does not
# send a `QUIT` in this case (Rpl_semi_sync_master_yes_tx should
# be 1 because server_3 will send the ACK within a valid
# timeout).
# send a `QUIT` in this case.
# Test Case 5) If a waiting-for-ACK user thread is killed (disconnected)
# during SHUTDOWN WAIT FOR ALL SLAVES, ensure the primary will
# still await the ACK from the replica before killing the
# Ack_thread.
#
# References:
# MDEV-11853: semisync thread can be killed after sync binlog but before ACK
......@@ -58,6 +55,7 @@
--echo # Note: Simulated slave delay is hardcoded to 800 milliseconds
--echo # Note: Simulated master shutdown delay is hardcoded to 500 milliseconds
--source include/have_innodb.inc
--source include/have_debug.inc
--let $rpl_topology=1->2, 1->3
--source include/rpl_init.inc
......@@ -90,7 +88,7 @@ set @sav_enabled_server_3= @@GLOBAL.rpl_semi_sync_slave_enabled;
set @sav_server_3_dbug= @@GLOBAL.debug_dbug;
--connection server_1
CREATE TABLE t1 (a int);
CREATE TABLE t1 (a int) engine=innodb;
--save_master_pos
--let i= 2
......@@ -112,8 +110,8 @@ while (`SELECT $i <= $slave_last`)
--echo #
--echo # Test Case 1) If both replicas simulate a delay that is within the
--echo # allowed timeout, the primary should delay killing the suspended thread
--echo # until an ACK is received (Rpl_semi_sync_master_yes_tx should be 1).
--echo # allowed timeout, the primary should delay killing the Ack_thread
--echo # until an ACK is received.
--echo #
--let server_2_dbug= "+d,simulate_delay_semisync_slave_reply"
--let server_3_dbug= "+d,simulate_delay_semisync_slave_reply"
......@@ -124,8 +122,8 @@ while (`SELECT $i <= $slave_last`)
--echo #
--echo # Test Case 2) If both replicas simulate an error before sending an ACK,
--echo # the primary should delay killing the suspended thread until the
--echo # timeout is reached (Rpl_semi_sync_master_no_tx should be 1).
--echo # the primary should delay killing the Ack_thread until the
--echo # timeout is reached.
--echo #
--let server_2_dbug= "+d,corrupt_queue_event,delay_semisync_kill_connection_for_mdev_28141"
--let server_3_dbug= "+d,corrupt_queue_event,delay_semisync_kill_connection_for_mdev_28141"
......@@ -137,8 +135,8 @@ while (`SELECT $i <= $slave_last`)
--echo #
--echo # Test Case 3) If one replica simulates a delay within the allowed
--echo # timeout and the other simulates an error before sending an ACK, the
--echo # primary should delay killing the suspended thread until it receives an
--echo # ACK from the delayed slave (Rpl_semi_sync_master_yes_tx should be 1).
--echo # primary should delay killing the Ack_thread until it receives an
--echo # ACK from the delayed slave.
--echo #
--let server_2_dbug= "+d,corrupt_queue_event,delay_semisync_kill_connection_for_mdev_28141"
--let server_3_dbug= "+d,simulate_delay_semisync_slave_reply"
......@@ -156,8 +154,7 @@ while (`SELECT $i <= $slave_last`)
--echo # active semi-sync connection in-tact. The slave should notice this, and
--echo # not issue a `QUIT` command to the primary, which would otherwise be
--echo # sent to kill an active connection. This test case validates that the
--echo # slave does not send a `QUIT` in this case (Rpl_semi_sync_master_yes_tx
--echo # should be 1 because server_3 will send the ACK within a valid timeout).
--echo # slave does not send a `QUIT` in this case.
--echo #
--let server_2_dbug= "+d,corrupt_queue_event,delay_semisync_kill_connection_for_mdev_28141"
--let server_3_dbug= "+d,simulate_delay_semisync_slave_reply"
......@@ -166,18 +163,108 @@ while (`SELECT $i <= $slave_last`)
--let server_3_expect_row_count= 1
--source rpl_semi_sync_shutdown_await_ack.inc
#
# Added with MDEV-33551
#
--echo #
--echo # Test Case 5) If a waiting-for-ACK user thread is killed (disconnected)
--echo # during SHUTDOWN WAIT FOR ALL SLAVES, ensure the primary will still
--echo # await the ACK from the replica before killing the Ack_receiver thread
--echo #
--connection server_1
insert into t1 values (1);
--source include/save_master_gtid.inc
--connection server_2
--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc
SET GLOBAL rpl_semi_sync_slave_enabled= 1;
--source include/start_slave.inc
--connection server_1
SET GLOBAL rpl_semi_sync_master_enabled= 1;
SET GLOBAL rpl_semi_sync_master_timeout= 2000;
--let $status_var= Rpl_semi_sync_master_clients
--let $status_var_value= 1
source include/wait_for_status_var.inc;
show status like 'Rpl_semi_sync_master_status';
show status like 'Rpl_semi_sync_master_clients';
--connection server_2
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,simulate_delay_semisync_slave_reply";
--connect(con1, localhost, root,,)
--connect(con2, localhost, root,,)
--connection con1
--send insert into t1 values (2)
--connection server_1
--echo # Wait for thd to begin semi-sync wait..
--let $wait_condition= SELECT COUNT(*) = 1 FROM information_schema.processlist WHERE state = 'Waiting for semi-sync ACK from slave'
--source include/wait_condition.inc
--source include/wait_condition.inc
--echo # ..done
--disconnect con1
--connection default
--write_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
wait
EOF
--connection con2
SHUTDOWN WAIT FOR ALL SLAVES;
--source include/wait_until_disconnected.inc
# Run assert_grep on server_2 as it uses SQL commands for verification, but
# server_1 has gone away
--connection server_2
--let $assert_text= Ensure the primary waited for the ACK of the killed thread
--let $assert_select= Delaying shutdown to await semi-sync ACK
--let $assert_file= $MYSQLTEST_VARDIR/log/mysqld.1.err
--let $assert_count= 5
--let $assert_only_after=CURRENT_TEST
--source include/assert_grep.inc
--connection default
--source include/wait_until_disconnected.inc
--connection server_1
--source include/wait_until_disconnected.inc
--connection server_2
--let $rpl_allow_error= 1
source include/stop_slave.inc;
--connection server_3
source include/stop_slave.inc;
--let $rpl_allow_error=
--connection default
--append_file $MYSQLTEST_VARDIR/tmp/mysqld.1.expect
restart
EOF
--enable_reconnect
--source include/wait_until_connected_again.inc
--connection server_1
--enable_reconnect
--source include/wait_until_connected_again.inc
--echo #############################
--echo # Cleanup
--echo #############################
--connection server_2
source include/stop_slave.inc;
SET @@GLOBAL.rpl_semi_sync_slave_enabled = @sav_enabled_server_2;
SET @@GLOBAL.debug_dbug= @sav_server_2_dbug;
source include/start_slave.inc;
--connection server_3
source include/stop_slave.inc;
SET @@GLOBAL.rpl_semi_sync_slave_enabled = @sav_enabled_server_3;
SET @@GLOBAL.debug_dbug= @sav_server_3_dbug;
source include/start_slave.inc;
......
......@@ -6870,8 +6870,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
#ifdef HAVE_REPLICATION
if (repl_semisync_master.report_binlog_update(thd, log_file_name,
file->pos_in_file))
if (repl_semisync_master.report_binlog_update(
thd, thd, log_file_name, file->pos_in_file))
{
sql_print_error("Failed to run 'after_flush' hooks");
error= 1;
......@@ -8465,9 +8465,19 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
for (current= queue; current != NULL; current= current->next)
{
#ifdef HAVE_REPLICATION
/*
The thread which will await the ACK from the replica can change
depending on the wait-point. If AFTER_COMMIT, then the user thread
will perform the wait. If AFTER_SYNC, the binlog group commit leader
will perform the wait on behalf of the user thread.
*/
THD *waiter_thd= (repl_semisync_master.wait_point() ==
SEMI_SYNC_MASTER_WAIT_POINT_AFTER_STORAGE_COMMIT)
? current->thd
: leader->thd;
if (likely(!current->error) &&
unlikely(repl_semisync_master.
report_binlog_update(current->thd,
report_binlog_update(current->thd, waiter_thd,
current->cache_mngr->
last_commit_pos_file,
current->cache_mngr->
......
......@@ -1538,15 +1538,12 @@ static void kill_thread(THD *thd)
/**
First shutdown everything but slave threads and binlog dump connections
*/
static my_bool kill_thread_phase_1(THD *thd, int *n_threads_awaiting_ack)
static my_bool kill_thread_phase_1(THD *thd, void *)
{
DBUG_PRINT("quit", ("Informing thread %ld that it's time to die",
(ulong) thd->thread_id));
if (thd->slave_thread || thd->is_binlog_dump_thread() ||
(shutdown_wait_for_slaves &&
repl_semisync_master.is_thd_awaiting_semisync_ack(thd) &&
++(*n_threads_awaiting_ack)))
if (thd->slave_thread || thd->is_binlog_dump_thread())
return 0;
if (DBUG_EVALUATE_IF("only_kill_system_threads", !thd->system_thread, 0))
......@@ -1745,29 +1742,18 @@ static void close_connections(void)
This will give the threads some time to gracefully abort their
statements and inform their clients that the server is about to die.
*/
int n_threads_awaiting_ack= 0;
server_threads.iterate(kill_thread_phase_1, &n_threads_awaiting_ack);
server_threads.iterate(kill_thread_phase_1);
/*
If we are waiting on any ACKs, delay killing the thread until either an ACK
is received or the timeout is hit.
Allow at max the number of sessions to await a timeout; however, if all
ACKs have been received in less iterations, then quit early
*/
if (shutdown_wait_for_slaves && repl_semisync_master.get_master_enabled())
{
int waiting_threads= repl_semisync_master.sync_get_master_wait_sessions();
if (waiting_threads)
sql_print_information("Delaying shutdown to await semi-sync ACK");
while (waiting_threads-- > 0)
repl_semisync_master.await_slave_reply();
repl_semisync_master.await_all_slave_replies(
"Delaying shutdown to await semi-sync ACK");
}
DBUG_EXECUTE_IF("delay_shutdown_phase_2_after_semisync_wait",
my_sleep(500000););
Events::deinit();
slave_prepare_for_shutdown();
ack_receiver.stop();
......@@ -1788,7 +1774,7 @@ static void close_connections(void)
*/
DBUG_PRINT("info", ("THD_count: %u", THD_count::value()));
for (int i= 0; THD_count::connection_thd_count() - n_threads_awaiting_ack
for (int i= 0; THD_count::connection_thd_count()
&& i < 1000 &&
DBUG_EVALUATE_IF("only_kill_system_threads_no_loop", 0, 1);
i++)
......@@ -1806,10 +1792,9 @@ static void close_connections(void)
#endif
/* All threads has now been aborted */
DBUG_PRINT("quit", ("Waiting for threads to die (count=%u)",
THD_count::value() - binlog_dump_thread_count -
n_threads_awaiting_ack));
THD_count::value() - binlog_dump_thread_count));
while (THD_count::connection_thd_count() - n_threads_awaiting_ack &&
while (THD_count::connection_thd_count() &&
DBUG_EVALUATE_IF("only_kill_system_threads_no_loop", 0, 1))
{
my_sleep(1000);
......
......@@ -68,6 +68,19 @@ static ulonglong timespec_to_usec(const struct timespec *ts)
return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
}
int signal_waiting_transaction(THD *waiting_thd, const char *binlog_file,
my_off_t binlog_pos)
{
/*
It is possible that the connection thd waiting for an ACK was killed. In
such circumstance, the connection thread will nullify the thd member of its
Active_tranx node. So before we try to signal, ensure the THD exists.
*/
if (waiting_thd)
mysql_cond_signal(&waiting_thd->COND_wakeup_ready);
return 0;
}
/*******************************************************************************
*
* <Active_tranx> class : manage all active transaction nodes
......@@ -75,12 +88,14 @@ static ulonglong timespec_to_usec(const struct timespec *ts)
******************************************************************************/
Active_tranx::Active_tranx(mysql_mutex_t *lock,
mysql_cond_t *cond,
ulong trace_level)
: Trace(trace_level), m_allocator(max_connections),
m_num_entries(max_connections << 1), /* Transaction hash table size
* is set to double the size
* of max_connections */
m_lock(lock)
m_lock(lock),
m_cond_empty(cond)
{
/* No transactions are in the list initially. */
m_trx_front = NULL;
......@@ -142,7 +157,8 @@ int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
return 0;
}
int Active_tranx::insert_tranx_node(const char *log_file_name,
int Active_tranx::insert_tranx_node(THD *thd_to_wait,
const char *log_file_name,
my_off_t log_file_pos)
{
Tranx_node *ins_node;
......@@ -165,6 +181,7 @@ int Active_tranx::insert_tranx_node(const char *log_file_name,
strncpy(ins_node->log_name, log_file_name, FN_REFLEN-1);
ins_node->log_name[FN_REFLEN-1] = 0; /* make sure it ends properly */
ins_node->log_pos = log_file_pos;
ins_node->thd= thd_to_wait;
if (!m_trx_front)
{
......@@ -232,28 +249,22 @@ bool Active_tranx::is_tranx_end_pos(const char *log_file_name,
DBUG_RETURN(entry != NULL);
}
void Active_tranx::clear_active_tranx_nodes(const char *log_file_name,
my_off_t log_file_pos)
void Active_tranx::clear_active_tranx_nodes(
const char *log_file_name, my_off_t log_file_pos,
active_tranx_action pre_delete_hook)
{
Tranx_node *new_front;
DBUG_ENTER("Active_tranx::::clear_active_tranx_nodes");
if (log_file_name != NULL)
{
new_front = m_trx_front;
while (new_front)
{
if (compare(new_front, log_file_name, log_file_pos) > 0)
break;
new_front = new_front->next;
}
}
else
new_front= m_trx_front;
while (new_front)
{
/* If log_file_name is NULL, clear everything. */
new_front = NULL;
if ((log_file_name != NULL) &&
compare(new_front, log_file_name, log_file_pos) > 0)
break;
pre_delete_hook(new_front->thd, new_front->log_name, new_front->log_pos);
new_front = new_front->next;
}
if (new_front == NULL)
......@@ -315,9 +326,66 @@ void Active_tranx::clear_active_tranx_nodes(const char *log_file_name,
m_trx_front->log_name, (ulong)m_trx_front->log_pos));
}
/*
m_cond_empty aliases Repl_semi_sync_master::COND_binlog, which holds the
condition variable to notify that we have cleared all nodes, e.g. used by
SHUTDOWN WAIT FOR ALL SLAVES.
*/
if (is_empty())
mysql_cond_signal(m_cond_empty);
DBUG_VOID_RETURN;
}
void Active_tranx::unlink_thd_as_waiter(const char *log_file_name,
my_off_t log_file_pos)
{
DBUG_ENTER("Active_tranx::unlink_thd_as_waiter");
mysql_mutex_assert_owner(m_lock);
unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
Tranx_node *entry = m_trx_htb[hash_val];
while (entry != NULL)
{
if (compare(entry, log_file_name, log_file_pos) == 0)
break;
entry = entry->hash_next;
}
if (entry)
entry->thd= NULL;
DBUG_VOID_RETURN;
}
#ifndef DBUG_OFF
void Active_tranx::assert_thd_is_waiter(THD *thd_to_check,
const char *log_file_name,
my_off_t log_file_pos)
{
DBUG_ENTER("Active_tranx::assert_thd_is_waiter");
mysql_mutex_assert_owner(m_lock);
unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
Tranx_node *entry = m_trx_htb[hash_val];
while (entry != NULL)
{
if (compare(entry, log_file_name, log_file_pos) == 0)
break;
entry = entry->hash_next;
}
DBUG_ASSERT(entry);
DBUG_ASSERT(entry->thd);
DBUG_ASSERT(entry->thd->thread_id == thd_to_check->thread_id);
DBUG_VOID_RETURN;
}
#endif
/*******************************************************************************
*
......@@ -397,7 +465,8 @@ int Repl_semi_sync_master::enable_master()
if (!get_master_enabled())
{
m_active_tranxs = new Active_tranx(&LOCK_binlog, m_trace_level);
m_active_tranxs=
new Active_tranx(&LOCK_binlog, &COND_binlog_send, m_trace_level);
if (m_active_tranxs != NULL)
{
m_commit_file_name_inited = false;
......@@ -459,15 +528,6 @@ void Repl_semi_sync_master::cleanup()
delete m_active_tranxs;
}
int Repl_semi_sync_master::sync_get_master_wait_sessions()
{
int wait_sessions;
lock();
wait_sessions= rpl_semi_sync_master_wait_sessions;
unlock();
return wait_sessions;
}
void Repl_semi_sync_master::create_timeout(struct timespec *out,
struct timespec *start_arg)
{
......@@ -500,23 +560,6 @@ void Repl_semi_sync_master::unlock()
mysql_mutex_unlock(&LOCK_binlog);
}
void Repl_semi_sync_master::cond_broadcast()
{
mysql_cond_broadcast(&COND_binlog_send);
}
int Repl_semi_sync_master::cond_timewait(struct timespec *wait_time)
{
int wait_res;
DBUG_ENTER("Repl_semi_sync_master::cond_timewait()");
wait_res= mysql_cond_timedwait(&COND_binlog_send,
&LOCK_binlog, wait_time);
DBUG_RETURN(wait_res);
}
void Repl_semi_sync_master::add_slave()
{
lock();
......@@ -533,7 +576,8 @@ void Repl_semi_sync_master::remove_slave()
Signal transactions waiting in commit_trx() that they do not have to
wait anymore.
*/
cond_broadcast();
m_active_tranxs->clear_active_tranx_nodes(NULL, 0,
signal_waiting_transaction);
}
unlock();
}
......@@ -616,7 +660,6 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id,
my_off_t log_file_pos)
{
int cmp;
bool can_release_threads = false;
bool need_copy_send_pos = true;
DBUG_ENTER("Repl_semi_sync_master::report_reply_binlog");
......@@ -668,45 +711,26 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id,
/* Remove all active transaction nodes before this point. */
DBUG_ASSERT(m_active_tranxs != NULL);
m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos);
m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos,
signal_waiting_transaction);
if (m_active_tranxs->is_empty())
m_wait_file_name_inited= false;
DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)",
"Repl_semi_sync_master::report_reply_binlog",
log_file_name, (ulong)log_file_pos));
}
if (rpl_semi_sync_master_wait_sessions > 0)
{
/* Let us check if some of the waiting threads doing a trx
* commit can now proceed.
*/
cmp = Active_tranx::compare(m_reply_file_name, m_reply_file_pos,
m_wait_file_name, m_wait_file_pos);
if (cmp >= 0)
{
/* Yes, at least one waiting thread can now proceed:
* let us release all waiting threads with a broadcast
*/
can_release_threads = true;
m_wait_file_name_inited = false;
}
}
l_end:
unlock();
if (can_release_threads)
{
DBUG_PRINT("semisync", ("%s: signal all waiting threads.",
"Repl_semi_sync_master::report_reply_binlog"));
cond_broadcast();
}
DBUG_RETURN(0);
}
int Repl_semi_sync_master::wait_after_sync(const char *log_file, my_off_t log_pos)
int Repl_semi_sync_master::wait_after_sync(const char *log_file,
my_off_t log_pos)
{
if (!get_master_enabled())
return 0;
......@@ -762,24 +786,27 @@ int Repl_semi_sync_master::wait_after_rollback(THD *thd, bool all)
/**
The method runs after flush to binary log is done.
*/
int Repl_semi_sync_master::report_binlog_update(THD* thd, const char *log_file,
int Repl_semi_sync_master::report_binlog_update(THD *trans_thd,
THD *waiter_thd,
const char *log_file,
my_off_t log_pos)
{
if (get_master_enabled())
{
Trans_binlog_info *log_info;
if (!(log_info= thd->semisync_info))
if (!(log_info= trans_thd->semisync_info))
{
if(!(log_info= (Trans_binlog_info*)my_malloc(PSI_INSTRUMENT_ME,
sizeof(Trans_binlog_info), MYF(0))))
return 1;
thd->semisync_info= log_info;
trans_thd->semisync_info= log_info;
}
strcpy(log_info->log_file, log_file + dirname_length(log_file));
log_info->log_pos = log_pos;
return write_tranx_in_binlog(log_info->log_file, log_pos);
return write_tranx_in_binlog(waiter_thd, log_info->log_file,
log_pos);
}
return 0;
......@@ -825,7 +852,7 @@ void Repl_semi_sync_master::dump_end(THD* thd)
ack_receiver.remove_slave(thd);
}
int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
int Repl_semi_sync_master::commit_trx(const char *trx_wait_binlog_name,
my_off_t trx_wait_binlog_pos)
{
bool success= 0;
......@@ -852,9 +879,8 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
lock();
/* This must be called after acquired the lock */
THD_ENTER_COND(thd, &COND_binlog_send, &LOCK_binlog,
& stage_waiting_for_semi_sync_ack_from_slave,
& old_stage);
THD_ENTER_COND(thd, &thd->COND_wakeup_ready, &LOCK_binlog,
&stage_waiting_for_semi_sync_ack_from_slave, &old_stage);
/* This is the real check inside the mutex. */
if (!get_master_enabled() || !is_on())
......@@ -865,7 +891,7 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
trx_wait_binlog_name, (ulong)trx_wait_binlog_pos,
(int)is_on()));
while (is_on() && !thd_killed(thd))
while (is_on() && !(aborted= thd_killed(thd)))
{
/* We have to check these again as things may have changed */
if (!rpl_semi_sync_master_clients && !rpl_semi_sync_master_wait_no_slave)
......@@ -902,7 +928,7 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
trx_wait_binlog_pos,
m_wait_file_name, m_wait_file_pos);
if (cmp <= 0)
{
{
/* This thd has a lower position, let's update the minimum info. */
strmake_buf(m_wait_file_name, trx_wait_binlog_name);
m_wait_file_pos = trx_wait_binlog_pos;
......@@ -934,20 +960,18 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
*/
rpl_semi_sync_master_wait_sessions++;
/* We keep track of when this thread is awaiting an ack to ensure it is
* not killed while awaiting an ACK if a shutdown is issued.
*/
set_thd_awaiting_semisync_ack(thd, TRUE);
DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)",
"Repl_semi_sync_master::commit_trx",
m_wait_timeout,
m_wait_file_name, (ulong)m_wait_file_pos));
#ifndef DBUG_OFF
m_active_tranxs->assert_thd_is_waiter(thd, trx_wait_binlog_name,
trx_wait_binlog_pos);
#endif
create_timeout(&abstime, &start_ts);
wait_result = cond_timewait(&abstime);
set_thd_awaiting_semisync_ack(thd, FALSE);
wait_result= mysql_cond_timedwait(&thd->COND_wakeup_ready, &LOCK_binlog,
&abstime);
rpl_semi_sync_master_wait_sessions--;
if (wait_result != 0)
......@@ -979,17 +1003,49 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
{
rpl_semi_sync_master_trx_wait_num++;
rpl_semi_sync_master_trx_wait_time += wait_time;
DBUG_EXECUTE_IF("testing_cond_var_per_thd", {
/*
DBUG log warning to ensure we have either recieved our ACK; or
have timed out and are awoken in an off state. Test
rpl.rpl_semi_sync_cond_var_per_thd scans the logs to ensure this
warning is not present.
*/
bool valid_wakeup=
(!get_master_enabled() || !is_on() || thd->is_killed() ||
0 <= Active_tranx::compare(
m_reply_file_name, m_reply_file_pos,
trx_wait_binlog_name, trx_wait_binlog_pos));
if (!valid_wakeup)
{
sql_print_warning(
"Thread awaiting semi-sync ACK was awoken before its "
"ACK. THD (%llu), Wait coord: (%s, %llu), ACK coord: (%s, "
"%llu)",
thd->thread_id, trx_wait_binlog_name, trx_wait_binlog_pos,
m_reply_file_name, m_reply_file_pos);
}
});
}
}
}
/*
If our THD was killed (rather than awoken from an ACK) notify the
Active_tranx cache that we are no longer waiting for the ACK, so nobody
signals our COND var invalidly.
*/
if (aborted)
m_active_tranxs->unlink_thd_as_waiter(trx_wait_binlog_name,
trx_wait_binlog_pos);
/*
At this point, the binlog file and position of this transaction
must have been removed from Active_tranx.
m_active_tranxs may be NULL if someone disabled semi sync during
cond_timewait()
mysql_cond_timedwait
*/
DBUG_ASSERT(thd_killed(thd) || !m_active_tranxs || aborted ||
DBUG_ASSERT(aborted || !m_active_tranxs || m_active_tranxs->is_empty() ||
!m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name,
trx_wait_binlog_pos));
......@@ -1030,20 +1086,21 @@ void Repl_semi_sync_master::switch_off()
{
DBUG_ENTER("Repl_semi_sync_master::switch_off");
/* Clear the active transaction list. */
if (m_active_tranxs)
m_active_tranxs->clear_active_tranx_nodes(NULL, 0,
signal_waiting_transaction);
if (m_state)
{
m_state = false;
/* Clear the active transaction list. */
DBUG_ASSERT(m_active_tranxs != NULL);
m_active_tranxs->clear_active_tranx_nodes(NULL, 0);
rpl_semi_sync_master_off_times++;
m_wait_file_name_inited = false;
m_reply_file_name_inited = false;
sql_print_information("Semi-sync replication switched OFF.");
}
cond_broadcast(); /* wake up all waiting threads */
DBUG_VOID_RETURN;
}
......@@ -1190,7 +1247,8 @@ int Repl_semi_sync_master::update_sync_header(THD* thd, unsigned char *packet,
DBUG_RETURN(0);
}
int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name,
int Repl_semi_sync_master::write_tranx_in_binlog(THD *thd,
const char *log_file_name,
my_off_t log_file_pos)
{
int result = 0;
......@@ -1233,7 +1291,7 @@ int Repl_semi_sync_master::write_tranx_in_binlog(const char* log_file_name,
if (is_on())
{
DBUG_ASSERT(m_active_tranxs != NULL);
if(m_active_tranxs->insert_tranx_node(log_file_name, log_file_pos))
if(m_active_tranxs->insert_tranx_node(thd, log_file_name, log_file_pos))
{
/*
if insert tranx_node failed, print a warning message
......@@ -1362,21 +1420,34 @@ void Repl_semi_sync_master::set_export_stats()
unlock();
}
void Repl_semi_sync_master::await_slave_reply()
void Repl_semi_sync_master::await_all_slave_replies(const char *msg)
{
struct timespec abstime;
struct timespec timeout;
int wait_result= 0;
bool first= true;
DBUG_ENTER("Repl_semi_sync_master::::await_all_slave_replies");
DBUG_ENTER("Repl_semi_sync_master::::await_slave_reply");
lock();
/* Just return if there is nothing to wait for */
if (!rpl_semi_sync_master_wait_sessions)
goto end;
/*
Wait for all transactions that need ACKS to have received them; or timeout.
If it is a timeout, the connection thread should attempt to turn off
semi-sync and broadcast to all other waiting threads to move on.
create_timeout(&abstime, NULL);
cond_timewait(&abstime);
COND_binlog_send is only signalled after the Active_tranx cache has been
emptied.
*/
create_timeout(&timeout, NULL);
lock();
while (get_master_enabled() && is_on() && !m_active_tranxs->is_empty() && !wait_result)
{
if (msg && first)
{
first= false;
sql_print_information(msg);
}
end:
wait_result=
mysql_cond_timedwait(&COND_binlog_send, &LOCK_binlog, &timeout);
}
unlock();
DBUG_VOID_RETURN;
}
......
......@@ -31,6 +31,7 @@ extern PSI_cond_key key_COND_binlog_send;
struct Tranx_node {
char log_name[FN_REFLEN];
my_off_t log_pos;
THD *thd; /* The thread awaiting an ACK */
struct Tranx_node *next; /* the next node in the sorted list */
struct Tranx_node *hash_next; /* the next node during hash collision */
};
......@@ -288,6 +289,18 @@ class Tranx_node_allocator
}
};
/**
Function pointer type to run on the contents of an Active_tranx node.
Return 0 for success, 1 for error.
Note Repl_semi_sync_master::LOCK_binlog is not guaranteed to be held for
its invocation. See the context in which it is called to know.
*/
typedef int (*active_tranx_action)(THD *trx_thd, const char *log_file_name,
my_off_t trx_log_file_pos);
/**
This class manages memory for active transaction list.
......@@ -308,6 +321,7 @@ class Active_tranx
int m_num_entries; /* maximum hash table entries */
mysql_mutex_t *m_lock; /* mutex lock */
mysql_cond_t *m_cond_empty; /* signalled when cleared all Tranx_node */
inline void assert_lock_owner();
......@@ -330,7 +344,8 @@ class Active_tranx
}
public:
Active_tranx(mysql_mutex_t *lock, unsigned long trace_level);
Active_tranx(mysql_mutex_t *lock, mysql_cond_t *cond,
unsigned long trace_level);
~Active_tranx();
/* Insert an active transaction node with the specified position.
......@@ -338,15 +353,38 @@ class Active_tranx
* Return:
* 0: success; non-zero: error
*/
int insert_tranx_node(const char *log_file_name, my_off_t log_file_pos);
int insert_tranx_node(THD *thd_to_wait, const char *log_file_name,
my_off_t log_file_pos);
/* Clear the active transaction nodes until(inclusive) the specified
* position.
* If log_file_name is NULL, everything will be cleared: the sorted
* list and the hash table will be reset to empty.
*
* The pre_delete_hook parameter is a function pointer that will be invoked
* for each Active_tranx node, in order, from m_trx_front to m_trx_rear,
* e.g. to signal their wakeup condition. Repl_semi_sync_binlog::LOCK_binlog
* is held while this is invoked.
*/
void clear_active_tranx_nodes(const char *log_file_name,
my_off_t log_file_pos);
my_off_t log_file_pos,
active_tranx_action pre_delete_hook);
/* Unlinks a thread from a Tranx_node, so it will not be referenced/signalled
* if it is separately killed. Note that this keeps the Tranx_node itself in
* the cache so it can still be awaited by await_all_slave_replies(), e.g.
* as is done by SHUTDOWN WAIT FOR ALL SLAVES.
*/
void unlink_thd_as_waiter(const char *log_file_name, my_off_t log_file_pos);
#ifndef DBUG_OFF
/* Uses DBUG_ASSERT statements to ensure that the argument thd_to_check
* matches the thread of the respective Tranx_node::thd of the passed in
* log_file_name and log_file_pos.
*/
void assert_thd_is_waiter(THD *thd_to_check, const char *log_file_name,
my_off_t log_file_pos);
#endif
/* Given a position, check to see whether the position is an active
* transaction's ending position by probing the hash table.
......@@ -359,6 +397,12 @@ class Active_tranx
static int compare(const char *log_file_name1, my_off_t log_file_pos1,
const char *log_file_name2, my_off_t log_file_pos2);
/* Check if there are no transactions actively awaiting ACKs. Returns true
* if the internal linked list has no entries, false otherwise.
*/
bool is_empty() { return m_trx_front == NULL; }
};
/**
......@@ -433,8 +477,6 @@ class Repl_semi_sync_master
void lock();
void unlock();
void cond_broadcast();
int cond_timewait(struct timespec *wait_time);
/* Is semi-sync replication on? */
bool is_on() {
......@@ -472,8 +514,6 @@ class Repl_semi_sync_master
m_wait_timeout = wait_timeout;
}
int sync_get_master_wait_sessions();
/*
Calculates a timeout that is m_wait_timeout after start_arg and saves it
in out. If start_arg is NULL, the timeout is m_wait_timeout after the
......@@ -482,10 +522,15 @@ class Repl_semi_sync_master
void create_timeout(struct timespec *out, struct timespec *start_arg);
/*
Blocks the calling thread until the ack_receiver either receives an ACK
or times out (from rpl_semi_sync_master_timeout)
Blocks the calling thread until the ack_receiver either receives ACKs for
all transactions awaiting ACKs, or times out (from
rpl_semi_sync_master_timeout).
If info_msg is provided, it will be output via sql_print_information when
there are transactions awaiting ACKs; info_msg is not output if there are
no transasctions to await.
*/
void await_slave_reply();
void await_all_slave_replies(const char *msg);
/*set the ACK point, after binlog sync or after transaction commit*/
void set_wait_point(unsigned long ack_point)
......@@ -561,9 +606,23 @@ class Repl_semi_sync_master
/*Wait after the transaction is rollback*/
int wait_after_rollback(THD *thd, bool all);
/*Store the current binlog position in m_active_tranxs. This position should
* be acked by slave*/
int report_binlog_update(THD *thd, const char *log_file,my_off_t log_pos);
/* Store the current binlog position in m_active_tranxs. This position should
* be acked by slave.
*
* Inputs:
* trans_thd Thread of the transaction which is executing the
* transaction.
* waiter_thd Thread that will wait for the ACK from the replica,
* which depends on the semi-sync wait point. If AFTER_SYNC,
* and also using binlog group commit, this will be the leader
* thread of the binlog commit. Otherwise, it is the thread that
* is executing the transaction, i.e. the same as trans_thd.
* log_file Name of the binlog file that the transaction is written into
* log_pos Offset within the binlog file that the transaction is written
* at
*/
int report_binlog_update(THD *trans_thd, THD *waiter_thd,
const char *log_file, my_off_t log_pos);
int dump_start(THD* thd,
const char *log_file,
......@@ -609,13 +668,19 @@ class Repl_semi_sync_master
* semi-sync is on
*
* Input: (the transaction events' ending binlog position)
* THD - (IN) thread that will wait for an ACK. This can be the
* binlog leader thread when using wait_point
* AFTER_SYNC with binlog group commit. In all other
* cases, this is the user thread executing the
* transaction.
* log_file_name - (IN) transaction ending position's file name
* log_file_pos - (IN) transaction ending position's file offset
*
* Return:
* 0: success; non-zero: error
*/
int write_tranx_in_binlog(const char* log_file_name, my_off_t log_file_pos);
int write_tranx_in_binlog(THD *thd, const char *log_file_name,
my_off_t log_file_pos);
/* Read the slave's reply so that we know how much progress the slave makes
* on receive replication events.
......@@ -633,30 +698,6 @@ class Repl_semi_sync_master
/*called before reset master*/
int before_reset_master();
/*
Determines if the given thread is currently awaiting a semisync_ack. Note
that the thread's value is protected by this class's LOCK_binlog, so this
function (indirectly) provides safe access.
*/
my_bool is_thd_awaiting_semisync_ack(THD *thd)
{
lock();
my_bool ret= thd->is_awaiting_semisync_ack;
unlock();
return ret;
}
/*
Update the thread's value for is_awaiting_semisync_ack. LOCK_binlog (from
this class) should be acquired before calling this function.
*/
void set_thd_awaiting_semisync_ack(THD *thd,
my_bool _is_awaiting_semisync_ack)
{
mysql_mutex_assert_owner(&LOCK_binlog);
thd->is_awaiting_semisync_ack= _is_awaiting_semisync_ack;
}
mysql_mutex_t LOCK_rpl_semi_sync_master_enabled;
};
......
......@@ -681,8 +681,7 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
#ifdef HAVE_REPLICATION
,
current_linfo(0),
slave_info(0),
is_awaiting_semisync_ack(0)
slave_info(0)
#endif
#ifdef WITH_WSREP
,
......
......@@ -5320,8 +5320,18 @@ class THD: public THD_count, /* this must be first */
Flag, mutex and condition for a thread to wait for a signal from another
thread.
Currently used to wait for group commit to complete, can also be used for
other purposes.
Currently used to wait for group commit to complete, and COND_wakeup_ready
is used for threads to wait on semi-sync ACKs (though is protected by
Repl_semi_sync_master::LOCK_binlog). Note the following relationships
between these two use-cases when using
rpl_semi_sync_master_wait_point=AFTER_SYNC during group commit:
1) Non-leader threads use COND_wakeup_ready to wait for the leader thread
to complete binlog commit.
2) The leader thread uses COND_wakeup_ready to await ACKs from the
replica before signalling the non-leader threads to wake up.
With wait_point=AFTER_COMMIT, there is no overlap as binlogging has
finished, so COND_wakeup_ready is safe to re-use.
*/
bool wakeup_ready;
mysql_mutex_t LOCK_wakeup_ready;
......@@ -5449,14 +5459,6 @@ class THD: public THD_count, /* this must be first */
bool is_binlog_dump_thread();
#endif
/*
Indicates if this thread is suspended due to awaiting an ACK from a
replica. True if suspended, false otherwise.
Note that this variable is protected by Repl_semi_sync_master::LOCK_binlog
*/
bool is_awaiting_semisync_ack;
inline ulong wsrep_binlog_format(ulong binlog_format) const
{
#ifdef WITH_WSREP
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment