Commit 8c714ba8 authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-34857: Implement --slave-abort-blocking-timeout

If a slave replicating an event has waited for more than
@@slave_abort_blocking_timeout for a conflicting metadata lock held by a
non-replication thread, the blocking query is killed to allow replication to
proceed and not be blocked indefinitely by a user query.
Signed-off-by: default avatarKristian Nielsen <knielsen@knielsen-hq.org>
parent 9811d23b
include/master-slave.inc
[connection master]
*** Testcase to show how a long-running SELECT can block replication from proceeding
*** past a DDL. Intention to implement a timeout after which such SELECT can be
*** killed.
connection master;
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 SELECT seq, 100+seq FROM seq_1_to_20;
connection slave;
include/stop_slave.inc
SELECT @@GLOBAL.slave_abort_blocking_timeout;
@@GLOBAL.slave_abort_blocking_timeout
31536000.000000
SET @old_abort_timeout= @@slave_abort_blocking_timeout;
SET GLOBAL slave_abort_blocking_timeout= -1;
Warnings:
Warning 1292 Truncated incorrect slave_abort_blocking_timeout value: '-1'
SELECT @@GLOBAL.slave_abort_blocking_timeout;
@@GLOBAL.slave_abort_blocking_timeout
0.000000
SET GLOBAL slave_abort_blocking_timeout= 1.0;
SELECT @@GLOBAL.slave_abort_blocking_timeout;
@@GLOBAL.slave_abort_blocking_timeout
1.000000
connection server_2;
SELECT X.a, SLEEP(IF((X.b MOD 2)=0, 0.4, 0.6)) FROM t1 X CROSS JOIN t1 Y;
connection slave;
connection master;
UPDATE t1 SET b=b+1000 WHERE a=1;
ALTER TABLE t1 ADD INDEX b_idx(b);
UPDATE t1 SET b=b+1000 WHERE a=20;
connection slave;
include/start_slave.inc
connection server_2;
ERROR 70100: Query execution was interrupted
connection slave;
SHOW CREATE TABLE t1;
Table t1
Create Table CREATE TABLE `t1` (
`a` int(11) NOT NULL,
`b` int(11) DEFAULT NULL,
PRIMARY KEY (`a`),
KEY `b_idx` (`b`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_uca1400_ai_ci
include/stop_slave.inc
SET GLOBAL slave_abort_blocking_timeout= 0;
SELECT @@GLOBAL.slave_abort_blocking_timeout;
@@GLOBAL.slave_abort_blocking_timeout
0.000000
connection server_2;
SELECT X.a, SLEEP(IF((X.b MOD 2)=0, 0.4, 0.6)) FROM t1 X CROSS JOIN t1 Y;
connection slave;
connection master;
UPDATE t1 SET b=b+1000 WHERE a=1;
ALTER TABLE t1 DROP INDEX b_idx;
UPDATE t1 SET b=b+1000 WHERE a=20;
connection slave;
include/start_slave.inc
connection server_2;
ERROR 70100: Query execution was interrupted
connection slave;
SHOW CREATE TABLE t1;
Table t1
Create Table CREATE TABLE `t1` (
`a` int(11) NOT NULL,
`b` int(11) DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_uca1400_ai_ci
include/stop_slave.inc
SET GLOBAL slave_abort_blocking_timeout= @old_abort_timeout;
include/start_slave.inc
connection master;
DROP TABLE t1;
include/rpl_end.inc
--source include/have_innodb.inc
--source include/have_sequence.inc
--source include/have_binlog_format_mixed.inc
--source include/master-slave.inc
--echo *** Testcase to show how a long-running SELECT can block replication from proceeding
--echo *** past a DDL. Intention to implement a timeout after which such SELECT can be
--echo *** killed.
--connection master
CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t1 SELECT seq, 100+seq FROM seq_1_to_20;
--sync_slave_with_master
--source include/stop_slave.inc
SELECT @@GLOBAL.slave_abort_blocking_timeout;
SET @old_abort_timeout= @@slave_abort_blocking_timeout;
SET GLOBAL slave_abort_blocking_timeout= -1;
SELECT @@GLOBAL.slave_abort_blocking_timeout;
SET GLOBAL slave_abort_blocking_timeout= 1.0;
SELECT @@GLOBAL.slave_abort_blocking_timeout;
--connection server_2
# Start a SELECT that will run for long.
send SELECT X.a, SLEEP(IF((X.b MOD 2)=0, 0.4, 0.6)) FROM t1 X CROSS JOIN t1 Y;
--connection slave
# Wait for the SELECT to have started so it will block the coming DDL
# from replicating.
--let $wait_condition= SELECT COUNT(*)=1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE state = 'User sleep'
--source include/wait_condition.inc
--connection master
UPDATE t1 SET b=b+1000 WHERE a=1;
ALTER TABLE t1 ADD INDEX b_idx(b);
UPDATE t1 SET b=b+1000 WHERE a=20;
--save_master_pos
--connection slave
--source include/start_slave.inc
--sync_with_master
--connection server_2
--error ER_QUERY_INTERRUPTED
reap;
--connection slave
query_vertical SHOW CREATE TABLE t1;
# Do it again to test that a timeout of 0 also works to abort user queries.
--source include/stop_slave.inc
SET GLOBAL slave_abort_blocking_timeout= 0;
SELECT @@GLOBAL.slave_abort_blocking_timeout;
--connection server_2
send SELECT X.a, SLEEP(IF((X.b MOD 2)=0, 0.4, 0.6)) FROM t1 X CROSS JOIN t1 Y;
--connection slave
--let $wait_condition= SELECT COUNT(*)=1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE state = 'User sleep'
--source include/wait_condition.inc
--connection master
UPDATE t1 SET b=b+1000 WHERE a=1;
ALTER TABLE t1 DROP INDEX b_idx;
UPDATE t1 SET b=b+1000 WHERE a=20;
--save_master_pos
--connection slave
--source include/start_slave.inc
--sync_with_master
--connection server_2
--error ER_QUERY_INTERRUPTED
reap;
--connection slave
query_vertical SHOW CREATE TABLE t1;
--source include/stop_slave.inc
SET GLOBAL slave_abort_blocking_timeout= @old_abort_timeout;
--source include/start_slave.inc
--connection master
DROP TABLE t1;
--source include/rpl_end.inc
...@@ -613,7 +613,7 @@ class MDL_lock ...@@ -613,7 +613,7 @@ class MDL_lock
bool needs_notification(const MDL_ticket *ticket) const bool needs_notification(const MDL_ticket *ticket) const
{ return m_strategy->needs_notification(ticket); } { return m_strategy->needs_notification(ticket); }
void notify_conflicting_locks(MDL_context *ctx) void notify_conflicting_locks(MDL_context *ctx, bool abort_blocking)
{ {
for (const auto &conflicting_ticket : m_granted) for (const auto &conflicting_ticket : m_granted)
{ {
...@@ -624,7 +624,8 @@ class MDL_lock ...@@ -624,7 +624,8 @@ class MDL_lock
ctx->get_owner()-> ctx->get_owner()->
notify_shared_lock(conflicting_ctx->get_owner(), notify_shared_lock(conflicting_ctx->get_owner(),
conflicting_ctx->get_needs_thr_lock_abort()); conflicting_ctx->get_needs_thr_lock_abort(),
abort_blocking);
} }
} }
} }
...@@ -2361,10 +2362,10 @@ MDL_context::acquire_lock(MDL_request *mdl_request, double lock_wait_timeout) ...@@ -2361,10 +2362,10 @@ MDL_context::acquire_lock(MDL_request *mdl_request, double lock_wait_timeout)
/* /*
Don't break conflicting locks if timeout is 0 as 0 is used Don't break conflicting locks if timeout is 0 as 0 is used
To check if there is any conflicting locks... to check if there is any conflicting locks...
*/ */
if (lock->needs_notification(ticket) && lock_wait_timeout) if (lock->needs_notification(ticket) && lock_wait_timeout)
lock->notify_conflicting_locks(this); lock->notify_conflicting_locks(this, false);
/* /*
Ensure that if we are trying to get an exclusive lock for a slave Ensure that if we are trying to get an exclusive lock for a slave
...@@ -2397,14 +2398,39 @@ MDL_context::acquire_lock(MDL_request *mdl_request, double lock_wait_timeout) ...@@ -2397,14 +2398,39 @@ MDL_context::acquire_lock(MDL_request *mdl_request, double lock_wait_timeout)
find_deadlock(); find_deadlock();
struct timespec abs_timeout, abs_shortwait; struct timespec abs_timeout, abs_shortwait, abs_abort_blocking_timeout;
bool abort_blocking_enabled= false;
double abort_blocking_timeout= slave_abort_blocking_timeout;
if (abort_blocking_timeout < lock_wait_timeout &&
m_owner->get_thd()->rgi_slave)
{
set_timespec_nsec(abs_abort_blocking_timeout,
(ulonglong)(abort_blocking_timeout * 1000000000ULL));
abort_blocking_enabled= true;
}
set_timespec_nsec(abs_timeout, set_timespec_nsec(abs_timeout,
(ulonglong)(lock_wait_timeout * 1000000000ULL)); (ulonglong)(lock_wait_timeout * 1000000000ULL));
set_timespec(abs_shortwait, 1);
wait_status= MDL_wait::EMPTY; wait_status= MDL_wait::EMPTY;
while (cmp_timespec(abs_shortwait, abs_timeout) <= 0) for (;;)
{ {
bool abort_blocking= false;
set_timespec(abs_shortwait, 1);
if (abort_blocking_enabled &&
cmp_timespec(abs_shortwait, abs_abort_blocking_timeout) >= 0)
{
/*
If a slave DDL has waited for --slave-abort-select-timeout, then notify
any blocking SELECT once before continuing to wait until the full
timeout.
*/
abs_shortwait= abs_abort_blocking_timeout;
abort_blocking= true;
abort_blocking_enabled= false;
}
if (cmp_timespec(abs_shortwait, abs_timeout) > 0)
break;
/* abs_timeout is far away. Wait a short while and notify locks. */ /* abs_timeout is far away. Wait a short while and notify locks. */
wait_status= m_wait.timed_wait(m_owner, &abs_shortwait, FALSE, wait_status= m_wait.timed_wait(m_owner, &abs_shortwait, FALSE,
mdl_request->key.get_wait_state_name()); mdl_request->key.get_wait_state_name());
...@@ -2425,9 +2451,8 @@ MDL_context::acquire_lock(MDL_request *mdl_request, double lock_wait_timeout) ...@@ -2425,9 +2451,8 @@ MDL_context::acquire_lock(MDL_request *mdl_request, double lock_wait_timeout)
mysql_prlock_wrlock(&lock->m_rwlock); mysql_prlock_wrlock(&lock->m_rwlock);
if (lock->needs_notification(ticket)) if (lock->needs_notification(ticket))
lock->notify_conflicting_locks(this); lock->notify_conflicting_locks(this, abort_blocking);
mysql_prlock_unlock(&lock->m_rwlock); mysql_prlock_unlock(&lock->m_rwlock);
set_timespec(abs_shortwait, 1);
} }
if (wait_status == MDL_wait::EMPTY) if (wait_status == MDL_wait::EMPTY)
wait_status= m_wait.timed_wait(m_owner, &abs_timeout, TRUE, wait_status= m_wait.timed_wait(m_owner, &abs_timeout, TRUE,
......
...@@ -110,7 +110,8 @@ class MDL_context_owner ...@@ -110,7 +110,8 @@ class MDL_context_owner
@see THD::notify_shared_lock() @see THD::notify_shared_lock()
*/ */
virtual bool notify_shared_lock(MDL_context_owner *in_use, virtual bool notify_shared_lock(MDL_context_owner *in_use,
bool needs_thr_lock_abort) = 0; bool needs_thr_lock_abort,
bool needs_non_slave_abort) = 0;
}; };
/** /**
......
...@@ -494,6 +494,7 @@ uint internal_slave_connections_needed_for_purge; ...@@ -494,6 +494,7 @@ uint internal_slave_connections_needed_for_purge;
ulong slave_max_allowed_packet= 0; ulong slave_max_allowed_packet= 0;
double slave_max_statement_time_double; double slave_max_statement_time_double;
ulonglong slave_max_statement_time; ulonglong slave_max_statement_time;
double slave_abort_blocking_timeout;
ulonglong binlog_stmt_cache_size=0; ulonglong binlog_stmt_cache_size=0;
ulonglong max_binlog_stmt_cache_size=0; ulonglong max_binlog_stmt_cache_size=0;
ulonglong test_flags; ulonglong test_flags;
......
...@@ -242,6 +242,7 @@ extern ulong max_binlog_size; ...@@ -242,6 +242,7 @@ extern ulong max_binlog_size;
extern ulong slave_max_allowed_packet; extern ulong slave_max_allowed_packet;
extern ulonglong slave_max_statement_time; extern ulonglong slave_max_statement_time;
extern double slave_max_statement_time_double; extern double slave_max_statement_time_double;
extern double slave_abort_blocking_timeout;
extern ulong opt_binlog_rows_event_max_size; extern ulong opt_binlog_rows_event_max_size;
extern ulong binlog_row_metadata; extern ulong binlog_row_metadata;
extern my_bool opt_binlog_gtid_index; extern my_bool opt_binlog_gtid_index;
......
...@@ -598,6 +598,8 @@ constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_MAX_ALLOWED_PACKET= ...@@ -598,6 +598,8 @@ constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_MAX_ALLOWED_PACKET=
REPL_SLAVE_ADMIN_ACL; REPL_SLAVE_ADMIN_ACL;
constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_MAX_STATEMENT_TIME= constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_MAX_STATEMENT_TIME=
REPL_SLAVE_ADMIN_ACL; REPL_SLAVE_ADMIN_ACL;
constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_ABORT_BLOCKING_TIMEOUT=
REPL_SLAVE_ADMIN_ACL;
constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_NET_TIMEOUT= constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_NET_TIMEOUT=
REPL_SLAVE_ADMIN_ACL; REPL_SLAVE_ADMIN_ACL;
constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_PARALLEL_MAX_QUEUED= constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_PARALLEL_MAX_QUEUED=
......
...@@ -1025,7 +1025,7 @@ void close_thread_table(THD *thd, TABLE **table_ptr) ...@@ -1025,7 +1025,7 @@ void close_thread_table(THD *thd, TABLE **table_ptr)
thd->handler_stats.add(file->handler_stats); thd->handler_stats.add(file->handler_stats);
} }
/* /*
This look is needed to allow THD::notify_shared_lock() to This lock is needed to allow THD::notify_shared_lock() to
traverse the thd->open_tables list without having to worry that traverse the thd->open_tables list without having to worry that
some of the tables are removed from under it some of the tables are removed from under it
*/ */
......
...@@ -2158,21 +2158,28 @@ void THD::disconnect() ...@@ -2158,21 +2158,28 @@ void THD::disconnect()
bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use,
bool needs_thr_lock_abort) bool needs_thr_lock_abort,
bool needs_non_slave_abort)
{ {
THD *in_use= ctx_in_use->get_thd(); THD *in_use= ctx_in_use->get_thd();
bool signalled= FALSE; bool signalled= FALSE;
DBUG_ENTER("THD::notify_shared_lock"); DBUG_ENTER("THD::notify_shared_lock");
DBUG_PRINT("enter",("needs_thr_lock_abort: %d", needs_thr_lock_abort)); DBUG_PRINT("enter",("needs_thr_lock_abort: %d", needs_thr_lock_abort));
if ((in_use->system_thread & SYSTEM_THREAD_DELAYED_INSERT) && enum killed_state kill_signal;
!in_use->killed) if (in_use->system_thread & SYSTEM_THREAD_DELAYED_INSERT)
kill_signal= KILL_CONNECTION;
else if (needs_non_slave_abort && !in_use->slave_thread)
kill_signal= KILL_QUERY;
else
kill_signal= NOT_KILLED;
if (kill_signal != NOT_KILLED && !in_use->killed)
{ {
/* This code is similar to kill_delayed_threads() */ /* This code is similar to kill_delayed_threads() */
DBUG_PRINT("info", ("kill delayed thread")); DBUG_PRINT("info", ("kill delayed thread"));
mysql_mutex_lock(&in_use->LOCK_thd_kill); mysql_mutex_lock(&in_use->LOCK_thd_kill);
if (in_use->killed < KILL_CONNECTION) if (in_use->killed < kill_signal)
in_use->set_killed_no_mutex(KILL_CONNECTION); in_use->set_killed_no_mutex(kill_signal);
in_use->abort_current_cond_wait(true); in_use->abort_current_cond_wait(true);
mysql_mutex_unlock(&in_use->LOCK_thd_kill); mysql_mutex_unlock(&in_use->LOCK_thd_kill);
signalled= TRUE; signalled= TRUE;
......
...@@ -4288,7 +4288,8 @@ class THD: public THD_count, /* this must be first */ ...@@ -4288,7 +4288,8 @@ class THD: public THD_count, /* this must be first */
@retval FALSE otherwise. @retval FALSE otherwise.
*/ */
bool notify_shared_lock(MDL_context_owner *ctx_in_use, bool notify_shared_lock(MDL_context_owner *ctx_in_use,
bool needs_thr_lock_abort) override; bool needs_thr_lock_abort,
bool needs_non_slave_abort) override;
// End implementation of MDL_context_owner interface. // End implementation of MDL_context_owner interface.
......
...@@ -2591,6 +2591,17 @@ static Sys_var_on_access_global< ...@@ -2591,6 +2591,17 @@ static Sys_var_on_access_global<
GLOBAL_VAR(slave_max_statement_time_double), CMD_LINE(REQUIRED_ARG), GLOBAL_VAR(slave_max_statement_time_double), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, LONG_TIMEOUT), DEFAULT(0), NO_MUTEX_GUARD, VALID_RANGE(0, LONG_TIMEOUT), DEFAULT(0), NO_MUTEX_GUARD,
NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(update_slave_max_statement_time)); NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(update_slave_max_statement_time));
static Sys_var_on_access_global<
Sys_var_double, PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_ABORT_BLOCKING_TIMEOUT>
Sys_slave_abort_blocking_timeout(
"slave_abort_blocking_timeout",
"Maximum time a slave DDL will wait for a blocking SELECT or other "
"user query until that query will be aborted. The argument will be "
"treated as a decimal value with microsecond precision",
GLOBAL_VAR(slave_abort_blocking_timeout), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, LONG_TIMEOUT), DEFAULT(LONG_TIMEOUT), NO_MUTEX_GUARD,
NOT_IN_BINLOG);
#endif #endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment