Commit ec47beab authored by Kristian Nielsen's avatar Kristian Nielsen

Merge parallel replication async deadlock kill into 10.2.

Conflicts:
	sql/mysqld.cc
	sql/slave.cc
parents 06b7fce9 7e0c9de8
...@@ -44,6 +44,16 @@ processlist_info NULL ...@@ -44,6 +44,16 @@ processlist_info NULL
unified_parent_thread_id unified parent_thread_id unified_parent_thread_id unified parent_thread_id
role NULL role NULL
instrumented YES instrumented YES
name thread/sql/slave_background
type BACKGROUND
processlist_user NULL
processlist_host NULL
processlist_db NULL
processlist_command NULL
processlist_info NULL
unified_parent_thread_id unified parent_thread_id
role NULL
instrumented YES
CREATE TEMPORARY TABLE t1 AS CREATE TEMPORARY TABLE t1 AS
SELECT thread_id FROM performance_schema.threads SELECT thread_id FROM performance_schema.threads
WHERE name LIKE 'thread/sql%'; WHERE name LIKE 'thread/sql%';
...@@ -105,4 +115,5 @@ parent_thread_name child_thread_name ...@@ -105,4 +115,5 @@ parent_thread_name child_thread_name
thread/sql/event_scheduler thread/sql/event_worker thread/sql/event_scheduler thread/sql/event_worker
thread/sql/main thread/sql/one_connection thread/sql/main thread/sql/one_connection
thread/sql/main thread/sql/signal_handler thread/sql/main thread/sql/signal_handler
thread/sql/main thread/sql/slave_background
thread/sql/one_connection thread/sql/event_scheduler thread/sql/one_connection thread/sql/event_scheduler
...@@ -383,7 +383,7 @@ static bool binlog_format_used= false; ...@@ -383,7 +383,7 @@ static bool binlog_format_used= false;
LEX_STRING opt_init_connect, opt_init_slave; LEX_STRING opt_init_connect, opt_init_slave;
mysql_cond_t COND_thread_cache; mysql_cond_t COND_thread_cache;
static mysql_cond_t COND_flush_thread_cache; static mysql_cond_t COND_flush_thread_cache;
mysql_cond_t COND_slave_init; mysql_cond_t COND_slave_background;
static DYNAMIC_ARRAY all_options; static DYNAMIC_ARRAY all_options;
/* Global variables */ /* Global variables */
...@@ -754,7 +754,7 @@ mysql_mutex_t ...@@ -754,7 +754,7 @@ mysql_mutex_t
LOCK_crypt, LOCK_crypt,
LOCK_global_system_variables, LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list, LOCK_active_mi, LOCK_user_conn, LOCK_slave_list, LOCK_active_mi,
LOCK_connection_count, LOCK_error_messages, LOCK_slave_init; LOCK_connection_count, LOCK_error_messages, LOCK_slave_background;
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats, mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats; LOCK_global_table_stats, LOCK_global_index_stats;
...@@ -937,7 +937,7 @@ PSI_mutex_key key_LOCK_gtid_waiting; ...@@ -937,7 +937,7 @@ PSI_mutex_key key_LOCK_gtid_waiting;
PSI_mutex_key key_LOCK_after_binlog_sync; PSI_mutex_key key_LOCK_after_binlog_sync;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered, PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered,
key_LOCK_slave_init; key_LOCK_slave_background;
PSI_mutex_key key_TABLE_SHARE_LOCK_share; PSI_mutex_key key_TABLE_SHARE_LOCK_share;
static PSI_mutex_info all_server_mutexes[]= static PSI_mutex_info all_server_mutexes[]=
...@@ -1003,7 +1003,7 @@ static PSI_mutex_info all_server_mutexes[]= ...@@ -1003,7 +1003,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_prepare_ordered, "LOCK_prepare_ordered", PSI_FLAG_GLOBAL}, { &key_LOCK_prepare_ordered, "LOCK_prepare_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_after_binlog_sync, "LOCK_after_binlog_sync", PSI_FLAG_GLOBAL}, { &key_LOCK_after_binlog_sync, "LOCK_after_binlog_sync", PSI_FLAG_GLOBAL},
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL}, { &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_init, "LOCK_slave_init", PSI_FLAG_GLOBAL}, { &key_LOCK_slave_background, "LOCK_slave_background", PSI_FLAG_GLOBAL},
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0}, { &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL}, { &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL}, { &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
...@@ -1060,7 +1060,7 @@ PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; ...@@ -1060,7 +1060,7 @@ PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread, PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_rpl_thread_stop, key_COND_rpl_thread_pool, key_COND_rpl_thread_stop, key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_group_commit_orderer, key_COND_parallel_entry, key_COND_group_commit_orderer,
key_COND_prepare_ordered, key_COND_slave_init; key_COND_prepare_ordered, key_COND_slave_background;
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates; PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
static PSI_cond_info all_server_conds[]= static PSI_cond_info all_server_conds[]=
...@@ -1110,7 +1110,7 @@ static PSI_cond_info all_server_conds[]= ...@@ -1110,7 +1110,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_parallel_entry, "COND_parallel_entry", 0}, { &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0}, { &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0}, { &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
{ &key_COND_slave_init, "COND_slave_init", 0}, { &key_COND_slave_background, "COND_slave_background", 0},
{ &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL}, { &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0}, { &key_COND_wait_gtid, "COND_wait_gtid", 0},
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0} { &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}
...@@ -1119,7 +1119,7 @@ static PSI_cond_info all_server_conds[]= ...@@ -1119,7 +1119,7 @@ static PSI_cond_info all_server_conds[]=
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_main, key_thread_handle_manager, key_thread_main,
key_thread_one_connection, key_thread_signal_hand, key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_init, key_rpl_parallel_thread; key_thread_slave_background, key_rpl_parallel_thread;
static PSI_thread_info all_server_threads[]= static PSI_thread_info all_server_threads[]=
{ {
...@@ -1145,7 +1145,7 @@ static PSI_thread_info all_server_threads[]= ...@@ -1145,7 +1145,7 @@ static PSI_thread_info all_server_threads[]=
{ &key_thread_main, "main", PSI_FLAG_GLOBAL}, { &key_thread_main, "main", PSI_FLAG_GLOBAL},
{ &key_thread_one_connection, "one_connection", 0}, { &key_thread_one_connection, "one_connection", 0},
{ &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL}, { &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL},
{ &key_thread_slave_init, "slave_init", PSI_FLAG_GLOBAL}, { &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL},
{ &key_rpl_parallel_thread, "rpl_parallel_thread", 0} { &key_rpl_parallel_thread, "rpl_parallel_thread", 0}
}; };
...@@ -2351,8 +2351,8 @@ static void clean_up_mutexes() ...@@ -2351,8 +2351,8 @@ static void clean_up_mutexes()
mysql_cond_destroy(&COND_prepare_ordered); mysql_cond_destroy(&COND_prepare_ordered);
mysql_mutex_destroy(&LOCK_after_binlog_sync); mysql_mutex_destroy(&LOCK_after_binlog_sync);
mysql_mutex_destroy(&LOCK_commit_ordered); mysql_mutex_destroy(&LOCK_commit_ordered);
mysql_mutex_destroy(&LOCK_slave_init); mysql_mutex_destroy(&LOCK_slave_background);
mysql_cond_destroy(&COND_slave_init); mysql_cond_destroy(&COND_slave_background);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -4738,9 +4738,9 @@ static int init_thread_environment() ...@@ -4738,9 +4738,9 @@ static int init_thread_environment()
MY_MUTEX_INIT_SLOW); MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered, mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
MY_MUTEX_INIT_SLOW); MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_slave_init, &LOCK_slave_init, mysql_mutex_init(key_LOCK_slave_background, &LOCK_slave_background,
MY_MUTEX_INIT_SLOW); MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_slave_init, &COND_slave_init, NULL); mysql_cond_init(key_COND_slave_background, &COND_slave_background, NULL);
#ifdef HAVE_OPENSSL #ifdef HAVE_OPENSSL
mysql_mutex_init(key_LOCK_des_key_file, mysql_mutex_init(key_LOCK_des_key_file,
...@@ -10172,6 +10172,9 @@ PSI_stage_info stage_waiting_for_rpl_thread_pool= { 0, "Waiting while replicatio ...@@ -10172,6 +10172,9 @@ PSI_stage_info stage_waiting_for_rpl_thread_pool= { 0, "Waiting while replicatio
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0}; PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0}; PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0}; PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
PSI_stage_info stage_slave_background_process_request= { 0, "Processing requests", 0};
PSI_stage_info stage_slave_background_wait_request= { 0, "Waiting for requests", 0};
PSI_stage_info stage_waiting_for_deadlock_kill= { 0, "Waiting for parallel replication deadlock handling to complete", 0};
#ifdef HAVE_PSI_INTERFACE #ifdef HAVE_PSI_INTERFACE
...@@ -10296,7 +10299,9 @@ PSI_stage_info *all_server_stages[]= ...@@ -10296,7 +10299,9 @@ PSI_stage_info *all_server_stages[]=
& stage_waiting_to_get_readlock, & stage_waiting_to_get_readlock,
& stage_master_gtid_wait_primary, & stage_master_gtid_wait_primary,
& stage_master_gtid_wait, & stage_master_gtid_wait,
& stage_gtid_wait_other_connection & stage_gtid_wait_other_connection,
& stage_slave_background_process_request,
& stage_slave_background_wait_request
}; };
PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection; PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection;
......
...@@ -342,8 +342,8 @@ extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates; ...@@ -342,8 +342,8 @@ extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main, key_thread_handle_manager, key_thread_kill_server, key_thread_main,
key_thread_one_connection, key_thread_signal_hand, key_thread_slave_init, key_thread_one_connection, key_thread_signal_hand,
key_rpl_parallel_thread; key_thread_slave_background, key_rpl_parallel_thread;
extern PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest, extern PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest,
key_file_dbopt, key_file_des_key_file, key_file_ERRMSG, key_select_to_file, key_file_dbopt, key_file_des_key_file, key_file_ERRMSG, key_select_to_file,
...@@ -489,6 +489,9 @@ extern PSI_stage_info stage_waiting_for_rpl_thread_pool; ...@@ -489,6 +489,9 @@ extern PSI_stage_info stage_waiting_for_rpl_thread_pool;
extern PSI_stage_info stage_master_gtid_wait_primary; extern PSI_stage_info stage_master_gtid_wait_primary;
extern PSI_stage_info stage_master_gtid_wait; extern PSI_stage_info stage_master_gtid_wait;
extern PSI_stage_info stage_gtid_wait_other_connection; extern PSI_stage_info stage_gtid_wait_other_connection;
extern PSI_stage_info stage_slave_background_process_request;
extern PSI_stage_info stage_slave_background_wait_request;
extern PSI_stage_info stage_waiting_for_deadlock_kill;
#ifdef HAVE_PSI_STATEMENT_INTERFACE #ifdef HAVE_PSI_STATEMENT_INTERFACE
/** /**
...@@ -558,7 +561,7 @@ extern mysql_mutex_t ...@@ -558,7 +561,7 @@ extern mysql_mutex_t
LOCK_slave_list, LOCK_active_mi, LOCK_manager, LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables, LOCK_user_conn, LOCK_global_system_variables, LOCK_user_conn,
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count, LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count,
LOCK_slave_init; LOCK_slave_background;
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count; extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count;
extern mysql_mutex_t LOCK_start_thread; extern mysql_mutex_t LOCK_start_thread;
#ifdef HAVE_OPENSSL #ifdef HAVE_OPENSSL
...@@ -571,7 +574,7 @@ extern mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave; ...@@ -571,7 +574,7 @@ extern mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
extern mysql_rwlock_t LOCK_system_variables_hash; extern mysql_rwlock_t LOCK_system_variables_hash;
extern mysql_cond_t COND_thread_count, COND_start_thread; extern mysql_cond_t COND_thread_count, COND_start_thread;
extern mysql_cond_t COND_manager; extern mysql_cond_t COND_manager;
extern mysql_cond_t COND_slave_init; extern mysql_cond_t COND_slave_background;
extern int32 thread_running; extern int32 thread_running;
extern int32 thread_count, service_thread_count; extern int32 thread_count, service_thread_count;
......
...@@ -107,6 +107,25 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) ...@@ -107,6 +107,25 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
} }
/*
Wait for any pending deadlock kills. Since deadlock kills happen
asynchronously, we need to be sure they will be completed before starting a
new transaction. Otherwise the new transaction might suffer a spurious kill.
*/
static void
wait_for_pending_deadlock_kill(THD *thd, rpl_group_info *rgi)
{
PSI_stage_info old_stage;
mysql_mutex_lock(&thd->LOCK_wakeup_ready);
thd->ENTER_COND(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready,
&stage_waiting_for_deadlock_kill, &old_stage);
while (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
mysql_cond_wait(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready);
thd->EXIT_COND(&old_stage);
}
static void static void
finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id, finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
rpl_parallel_entry *entry, rpl_group_info *rgi) rpl_parallel_entry *entry, rpl_group_info *rgi)
...@@ -212,6 +231,8 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id, ...@@ -212,6 +231,8 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
entry->stop_on_error_sub_id= sub_id; entry->stop_on_error_sub_id= sub_id;
mysql_mutex_unlock(&entry->LOCK_parallel_entry); mysql_mutex_unlock(&entry->LOCK_parallel_entry);
if (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
wait_for_pending_deadlock_kill(thd, rgi);
thd->clear_error(); thd->clear_error();
thd->reset_killed(); thd->reset_killed();
/* /*
...@@ -610,7 +631,6 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi) ...@@ -610,7 +631,6 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
{ {
thd->clear_error(); thd->clear_error();
my_error(ER_LOCK_DEADLOCK, MYF(0)); my_error(ER_LOCK_DEADLOCK, MYF(0));
rgi->killed_for_retry= false;
thd->reset_killed(); thd->reset_killed();
} }
} }
...@@ -701,14 +721,16 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, ...@@ -701,14 +721,16 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit(); thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
DBUG_EXECUTE_IF("inject_mdev8031", { DBUG_EXECUTE_IF("inject_mdev8031", {
/* Simulate that we get deadlock killed at this exact point. */ /* Simulate that we get deadlock killed at this exact point. */
rgi->killed_for_retry= true; rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
mysql_mutex_lock(&thd->LOCK_thd_data); mysql_mutex_lock(&thd->LOCK_thd_data);
thd->killed= KILL_CONNECTION; thd->killed= KILL_CONNECTION;
mysql_mutex_unlock(&thd->LOCK_thd_data); mysql_mutex_unlock(&thd->LOCK_thd_data);
}); });
rgi->cleanup_context(thd, 1); rgi->cleanup_context(thd, 1);
wait_for_pending_deadlock_kill(thd, rgi);
thd->reset_killed(); thd->reset_killed();
thd->clear_error(); thd->clear_error();
rgi->killed_for_retry = rpl_group_info::RETRY_KILL_NONE;
/* /*
If we retry due to a deadlock kill that occurred during the commit step, we If we retry due to a deadlock kill that occurred during the commit step, we
...@@ -847,7 +869,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, ...@@ -847,7 +869,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
{ {
/* Simulate that we get deadlock killed during open_binlog(). */ /* Simulate that we get deadlock killed during open_binlog(). */
thd->reset_for_next_command(); thd->reset_for_next_command();
rgi->killed_for_retry= true; rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
mysql_mutex_lock(&thd->LOCK_thd_data); mysql_mutex_lock(&thd->LOCK_thd_data);
thd->killed= KILL_CONNECTION; thd->killed= KILL_CONNECTION;
mysql_mutex_unlock(&thd->LOCK_thd_data); mysql_mutex_unlock(&thd->LOCK_thd_data);
...@@ -1747,7 +1769,7 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, ...@@ -1747,7 +1769,7 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rgi->relay_log= rli->last_inuse_relaylog; rgi->relay_log= rli->last_inuse_relaylog;
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size; rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0; rgi->retry_event_count= 0;
rgi->killed_for_retry= false; rgi->killed_for_retry= rpl_group_info::RETRY_KILL_NONE;
return rgi; return rgi;
} }
......
...@@ -713,7 +713,12 @@ struct rpl_group_info ...@@ -713,7 +713,12 @@ struct rpl_group_info
*/ */
SPECULATE_WAIT SPECULATE_WAIT
} speculation; } speculation;
bool killed_for_retry; enum enum_retry_killed {
RETRY_KILL_NONE = 0,
RETRY_KILL_PENDING,
RETRY_KILL_KILLED
};
uchar killed_for_retry;
rpl_group_info(Relay_log_info *rli_); rpl_group_info(Relay_log_info *rli_);
~rpl_group_info(); ~rpl_group_info();
......
...@@ -283,18 +283,27 @@ static void init_slave_psi_keys(void) ...@@ -283,18 +283,27 @@ static void init_slave_psi_keys(void)
#endif /* HAVE_PSI_INTERFACE */ #endif /* HAVE_PSI_INTERFACE */
static bool slave_init_thread_running; static bool slave_background_thread_running;
static bool slave_background_thread_stop;
static bool slave_background_thread_gtid_loaded;
struct slave_background_kill_t {
slave_background_kill_t *next;
THD *to_kill;
} *slave_background_kill_list;
pthread_handler_t pthread_handler_t
handle_slave_init(void *arg __attribute__((unused))) handle_slave_background(void *arg __attribute__((unused)))
{ {
THD *thd; THD *thd;
PSI_stage_info old_stage;
bool stop;
my_thread_init(); my_thread_init();
thd= new THD(next_thread_id()); thd= new THD(next_thread_id());
thd->thread_stack= (char*) &thd; /* Set approximate stack start */ thd->thread_stack= (char*) &thd; /* Set approximate stack start */
thd->system_thread = SYSTEM_THREAD_SLAVE_INIT; thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
thread_safe_increment32(&service_thread_count); thread_safe_increment32(&service_thread_count);
thd->store_globals(); thd->store_globals();
thd->security_ctx->skip_grants(); thd->security_ctx->skip_grants();
...@@ -307,49 +316,136 @@ handle_slave_init(void *arg __attribute__((unused))) ...@@ -307,49 +316,136 @@ handle_slave_init(void *arg __attribute__((unused)))
rpl_gtid_slave_state_table_name.str, rpl_gtid_slave_state_table_name.str,
thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message()); thd->get_stmt_da()->message());
delete thd;
thread_safe_decrement32(&service_thread_count);
/* Signal run_slave_init_thread() that we are done */ mysql_mutex_lock(&LOCK_slave_background);
slave_background_thread_gtid_loaded= true;
mysql_cond_broadcast(&COND_slave_background);
mysql_mutex_lock(&LOCK_start_thread); THD_STAGE_INFO(thd, stage_slave_background_process_request);
slave_init_thread_running= false; do
mysql_cond_broadcast(&COND_start_thread); {
mysql_mutex_unlock(&LOCK_start_thread); slave_background_kill_t *kill_list;
thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
&stage_slave_background_wait_request,
&old_stage);
for (;;)
{
stop= abort_loop || thd->killed || slave_background_thread_stop;
kill_list= slave_background_kill_list;
if (stop || kill_list)
break;
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
}
slave_background_kill_list= NULL;
thd->EXIT_COND(&old_stage);
while (kill_list)
{
slave_background_kill_t *p = kill_list;
THD *to_kill= p->to_kill;
kill_list= p->next;
mysql_mutex_lock(&to_kill->LOCK_thd_data);
to_kill->awake(KILL_CONNECTION);
mysql_mutex_unlock(&to_kill->LOCK_thd_data);
mysql_mutex_lock(&to_kill->LOCK_wakeup_ready);
to_kill->rgi_slave->killed_for_retry=
rpl_group_info::RETRY_KILL_KILLED;
mysql_cond_broadcast(&to_kill->COND_wakeup_ready);
mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready);
my_free(p);
}
mysql_mutex_lock(&LOCK_slave_background);
} while (!stop);
slave_background_thread_running= false;
mysql_cond_broadcast(&COND_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
delete thd;
thread_safe_decrement32(&service_thread_count);
my_thread_end(); my_thread_end();
return 0; return 0;
} }
void
slave_background_kill_request(THD *to_kill)
{
if (to_kill->rgi_slave->killed_for_retry)
return; // Already deadlock killed.
slave_background_kill_t *p=
(slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME));
if (p)
{
p->to_kill= to_kill;
to_kill->rgi_slave->killed_for_retry=
rpl_group_info::RETRY_KILL_PENDING;
mysql_mutex_lock(&LOCK_slave_background);
p->next= slave_background_kill_list;
slave_background_kill_list= p;
mysql_cond_signal(&COND_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
}
}
/* /*
Start the slave init thread. Start the slave background thread.
This thread is currently used for two purposes:
1. To load the GTID state from mysql.gtid_slave_pos at server start; reading
from table requires valid THD, which is otherwise not available during
server init.
This thread is used to load the GTID state from mysql.gtid_slave_pos at 2. To kill worker thread transactions during parallel replication, when a
server start; reading from table requires valid THD, which is otherwise not storage engine attempts to take an errorneous conflicting lock that would
available during server init. cause a deadlock. Killing is done asynchroneously, as the kill may not
be safe within the context of a callback from inside storage engine
locking code.
*/ */
static int static int
run_slave_init_thread() start_slave_background_thread()
{ {
pthread_t th; pthread_t th;
slave_init_thread_running= true; slave_background_thread_running= true;
if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib, slave_background_thread_stop= false;
handle_slave_init, NULL)) slave_background_thread_gtid_loaded= false;
if (mysql_thread_create(key_thread_slave_background,
&th, &connection_attrib, handle_slave_background,
NULL))
{ {
sql_print_error("Failed to create thread while initialising slave"); sql_print_error("Failed to create thread while initialising slave");
return 1; return 1;
} }
mysql_mutex_lock(&LOCK_start_thread); mysql_mutex_lock(&LOCK_slave_background);
while (slave_init_thread_running) while (!slave_background_thread_gtid_loaded)
mysql_cond_wait(&COND_start_thread, &LOCK_start_thread); mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
mysql_mutex_unlock(&LOCK_start_thread); mysql_mutex_unlock(&LOCK_slave_background);
return 0; return 0;
} }
static void
stop_slave_background_thread()
{
mysql_mutex_lock(&LOCK_slave_background);
slave_background_thread_stop= true;
mysql_cond_broadcast(&COND_slave_background);
while (slave_background_thread_running)
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
}
/* Initialize slave structures */ /* Initialize slave structures */
int init_slave() int init_slave()
...@@ -361,7 +457,7 @@ int init_slave() ...@@ -361,7 +457,7 @@ int init_slave()
init_slave_psi_keys(); init_slave_psi_keys();
#endif #endif
if (run_slave_init_thread()) if (start_slave_background_thread())
return 1; return 1;
if (global_rpl_thread_pool.init(opt_slave_parallel_threads)) if (global_rpl_thread_pool.init(opt_slave_parallel_threads))
...@@ -1001,6 +1097,9 @@ void end_slave() ...@@ -1001,6 +1097,9 @@ void end_slave()
master_info_index= 0; master_info_index= 0;
active_mi= 0; active_mi= 0;
mysql_mutex_unlock(&LOCK_active_mi); mysql_mutex_unlock(&LOCK_active_mi);
stop_slave_background_thread();
global_rpl_thread_pool.destroy(); global_rpl_thread_pool.destroy();
free_all_rpl_filters(); free_all_rpl_filters();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
......
...@@ -250,6 +250,7 @@ pthread_handler_t handle_slave_io(void *arg); ...@@ -250,6 +250,7 @@ pthread_handler_t handle_slave_io(void *arg);
void slave_output_error_info(rpl_group_info *rgi, THD *thd); void slave_output_error_info(rpl_group_info *rgi, THD *thd);
pthread_handler_t handle_slave_sql(void *arg); pthread_handler_t handle_slave_sql(void *arg);
bool net_request_file(NET* net, const char* fname); bool net_request_file(NET* net, const char* fname);
void slave_background_kill_request(THD *to_kill);
extern bool volatile abort_loop; extern bool volatile abort_loop;
extern Master_info *active_mi; /* active_mi for multi-master */ extern Master_info *active_mi; /* active_mi for multi-master */
......
...@@ -4684,12 +4684,87 @@ thd_report_wait_for(MYSQL_THD thd, MYSQL_THD other_thd) ...@@ -4684,12 +4684,87 @@ thd_report_wait_for(MYSQL_THD thd, MYSQL_THD other_thd)
cause replication to rollback (and later re-try) the other transaction, cause replication to rollback (and later re-try) the other transaction,
releasing the lock for this transaction so replication can proceed. releasing the lock for this transaction so replication can proceed.
*/ */
other_rgi->killed_for_retry= true; other_rgi->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
mysql_mutex_lock(&other_thd->LOCK_thd_data); mysql_mutex_lock(&other_thd->LOCK_thd_data);
other_thd->awake(KILL_CONNECTION); other_thd->awake(KILL_CONNECTION);
mysql_mutex_unlock(&other_thd->LOCK_thd_data); mysql_mutex_unlock(&other_thd->LOCK_thd_data);
} }
/*
Used by storage engines (currently TokuDB) to report that one transaction
THD is about to go to wait for a transactional lock held by another
transactions OTHER_THD.
This is used for parallel replication, where transactions are required to
commit in the same order on the slave as they did on the master. If the
transactions on the slave encounter lock conflicts on the slave that did not
exist on the master, this can cause deadlocks. This is primarily used in
optimistic (and aggressive) modes.
Normally, such conflicts will not occur in conservative mode, because the
same conflict would have prevented the two transactions from committing in
parallel on the master, thus preventing them from running in parallel on the
slave in the first place. However, it is possible in case when the optimizer
chooses a different plan on the slave than on the master (eg. table scan
instead of index scan).
InnoDB/XtraDB reports lock waits using this call. If a lock wait causes a
deadlock with the pre-determined commit order, we kill the later transaction,
and later re-try it, to resolve the deadlock.
This call need only receive reports about waits for locks that will remain
until the holding transaction commits. InnoDB/XtraDB auto-increment locks,
for example, are released earlier, and so need not be reported. (Such false
positives are not harmful, but could lead to unnecessary kill and retry, so
best avoided).
Returns 1 if the OTHER_THD will be killed to resolve deadlock, 0 if not. The
actual kill will happen later, asynchronously from another thread. The
caller does not need to take any actions on the return value if the
handlerton kill_query method is implemented to abort the to-be-killed
transaction.
*/
extern "C" int
thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd)
{
rpl_group_info *rgi;
rpl_group_info *other_rgi;
if (!thd)
return 0;
DEBUG_SYNC(thd, "thd_report_wait_for");
thd->transaction.stmt.mark_trans_did_wait();
if (!other_thd)
return 0;
binlog_report_wait_for(thd, other_thd);
rgi= thd->rgi_slave;
other_rgi= other_thd->rgi_slave;
if (!rgi || !other_rgi)
return 0;
if (!rgi->is_parallel_exec)
return 0;
if (rgi->rli != other_rgi->rli)
return 0;
if (!rgi->gtid_sub_id || !other_rgi->gtid_sub_id)
return 0;
if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
return 0;
if (rgi->gtid_sub_id > other_rgi->gtid_sub_id)
return 0;
/*
This transaction is about to wait for another transaction that is required
by replication binlog order to commit after. This would cause a deadlock.
So send a kill to the other transaction, with a temporary error; this will
cause replication to rollback (and later re-try) the other transaction,
releasing the lock for this transaction so replication can proceed.
*/
#ifdef HAVE_REPLICATION
slave_background_kill_request(other_thd);
#endif
return 1;
}
/* /*
This function is called from InnoDB/XtraDB to check if the commit order of This function is called from InnoDB/XtraDB to check if the commit order of
two transactions has already been decided by the upper layer. This happens two transactions has already been decided by the upper layer. This happens
......
...@@ -1529,7 +1529,8 @@ enum enum_thread_type ...@@ -1529,7 +1529,8 @@ enum enum_thread_type
SYSTEM_THREAD_EVENT_SCHEDULER= 8, SYSTEM_THREAD_EVENT_SCHEDULER= 8,
SYSTEM_THREAD_EVENT_WORKER= 16, SYSTEM_THREAD_EVENT_WORKER= 16,
SYSTEM_THREAD_BINLOG_BACKGROUND= 32, SYSTEM_THREAD_BINLOG_BACKGROUND= 32,
SYSTEM_THREAD_SLAVE_INIT= 64 SYSTEM_THREAD_SLAVE_INIT= 64,
SYSTEM_THREAD_SLAVE_BACKGROUND= 128
}; };
inline char const * inline char const *
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment