Commit 6a1cb449 authored by Sergei Golubchik's avatar Sergei Golubchik

cleanup: remove slave background thread, use handle_manager thread instead

parent 990eb093
...@@ -17,6 +17,16 @@ processlist_info NULL ...@@ -17,6 +17,16 @@ processlist_info NULL
unified_parent_thread_id NULL unified_parent_thread_id NULL
role NULL role NULL
instrumented YES instrumented YES
name thread/sql/manager
type BACKGROUND
processlist_user NULL
processlist_host NULL
processlist_db NULL
processlist_command NULL
processlist_info NULL
unified_parent_thread_id unified parent_thread_id
role NULL
instrumented YES
name thread/sql/one_connection name thread/sql/one_connection
type FOREGROUND type FOREGROUND
processlist_user root processlist_user root
...@@ -44,16 +54,6 @@ processlist_info NULL ...@@ -44,16 +54,6 @@ processlist_info NULL
unified_parent_thread_id unified parent_thread_id unified_parent_thread_id unified parent_thread_id
role NULL role NULL
instrumented YES instrumented YES
name thread/sql/slave_background
type BACKGROUND
processlist_user NULL
processlist_host NULL
processlist_db NULL
processlist_command NULL
processlist_info NULL
unified_parent_thread_id unified parent_thread_id
role NULL
instrumented YES
CREATE TEMPORARY TABLE t1 AS CREATE TEMPORARY TABLE t1 AS
SELECT thread_id FROM performance_schema.threads SELECT thread_id FROM performance_schema.threads
WHERE name LIKE 'thread/sql%'; WHERE name LIKE 'thread/sql%';
...@@ -113,7 +113,7 @@ WHERE t1.name LIKE 'thread/sql%' ...@@ -113,7 +113,7 @@ WHERE t1.name LIKE 'thread/sql%'
ORDER BY parent_thread_name, child_thread_name; ORDER BY parent_thread_name, child_thread_name;
parent_thread_name child_thread_name parent_thread_name child_thread_name
thread/sql/event_scheduler thread/sql/event_worker thread/sql/event_scheduler thread/sql/event_worker
thread/sql/main thread/sql/manager
thread/sql/main thread/sql/one_connection thread/sql/main thread/sql/one_connection
thread/sql/main thread/sql/signal_handler thread/sql/main thread/sql/signal_handler
thread/sql/main thread/sql/slave_background
thread/sql/one_connection thread/sql/event_scheduler thread/sql/one_connection thread/sql/event_scheduler
...@@ -385,7 +385,6 @@ static bool binlog_format_used= false; ...@@ -385,7 +385,6 @@ static bool binlog_format_used= false;
LEX_STRING opt_init_connect, opt_init_slave; LEX_STRING opt_init_connect, opt_init_slave;
mysql_cond_t COND_thread_cache; mysql_cond_t COND_thread_cache;
static mysql_cond_t COND_flush_thread_cache; static mysql_cond_t COND_flush_thread_cache;
mysql_cond_t COND_slave_background;
static DYNAMIC_ARRAY all_options; static DYNAMIC_ARRAY all_options;
/* Global variables */ /* Global variables */
...@@ -758,7 +757,7 @@ mysql_mutex_t ...@@ -758,7 +757,7 @@ mysql_mutex_t
LOCK_crypt, LOCK_crypt,
LOCK_global_system_variables, LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list, LOCK_user_conn, LOCK_slave_list,
LOCK_connection_count, LOCK_error_messages, LOCK_slave_background; LOCK_connection_count, LOCK_error_messages;
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats, mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats; LOCK_global_table_stats, LOCK_global_index_stats;
...@@ -947,8 +946,7 @@ PSI_mutex_key key_LOCK_stats, ...@@ -947,8 +946,7 @@ PSI_mutex_key key_LOCK_stats,
PSI_mutex_key key_LOCK_gtid_waiting; PSI_mutex_key key_LOCK_gtid_waiting;
PSI_mutex_key key_LOCK_after_binlog_sync; PSI_mutex_key key_LOCK_after_binlog_sync;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered, PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
key_LOCK_slave_background;
PSI_mutex_key key_TABLE_SHARE_LOCK_share; PSI_mutex_key key_TABLE_SHARE_LOCK_share;
static PSI_mutex_info all_server_mutexes[]= static PSI_mutex_info all_server_mutexes[]=
...@@ -1017,7 +1015,6 @@ static PSI_mutex_info all_server_mutexes[]= ...@@ -1017,7 +1015,6 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_prepare_ordered, "LOCK_prepare_ordered", PSI_FLAG_GLOBAL}, { &key_LOCK_prepare_ordered, "LOCK_prepare_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_after_binlog_sync, "LOCK_after_binlog_sync", PSI_FLAG_GLOBAL}, { &key_LOCK_after_binlog_sync, "LOCK_after_binlog_sync", PSI_FLAG_GLOBAL},
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL}, { &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_background, "LOCK_slave_background", PSI_FLAG_GLOBAL},
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0}, { &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL}, { &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL}, { &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
...@@ -1074,7 +1071,7 @@ PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; ...@@ -1074,7 +1071,7 @@ PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread, PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_rpl_thread_stop, key_COND_rpl_thread_pool, key_COND_rpl_thread_stop, key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_group_commit_orderer, key_COND_parallel_entry, key_COND_group_commit_orderer,
key_COND_prepare_ordered, key_COND_slave_background; key_COND_prepare_ordered;
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates; PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
static PSI_cond_info all_server_conds[]= static PSI_cond_info all_server_conds[]=
...@@ -1124,7 +1121,6 @@ static PSI_cond_info all_server_conds[]= ...@@ -1124,7 +1121,6 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_parallel_entry, "COND_parallel_entry", 0}, { &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0}, { &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0}, { &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
{ &key_COND_slave_background, "COND_slave_background", 0},
{ &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL}, { &key_COND_start_thread, "COND_start_thread", PSI_FLAG_GLOBAL},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0}, { &key_COND_wait_gtid, "COND_wait_gtid", 0},
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0} { &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}
...@@ -2379,8 +2375,6 @@ static void clean_up_mutexes() ...@@ -2379,8 +2375,6 @@ static void clean_up_mutexes()
mysql_cond_destroy(&COND_prepare_ordered); mysql_cond_destroy(&COND_prepare_ordered);
mysql_mutex_destroy(&LOCK_after_binlog_sync); mysql_mutex_destroy(&LOCK_after_binlog_sync);
mysql_mutex_destroy(&LOCK_commit_ordered); mysql_mutex_destroy(&LOCK_commit_ordered);
mysql_mutex_destroy(&LOCK_slave_background);
mysql_cond_destroy(&COND_slave_background);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -4838,9 +4832,6 @@ static int init_thread_environment() ...@@ -4838,9 +4832,6 @@ static int init_thread_environment()
MY_MUTEX_INIT_SLOW); MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered, mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
MY_MUTEX_INIT_SLOW); MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_slave_background, &LOCK_slave_background,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_slave_background, &COND_slave_background, NULL);
#ifdef HAVE_OPENSSL #ifdef HAVE_OPENSSL
mysql_mutex_init(key_LOCK_des_key_file, mysql_mutex_init(key_LOCK_des_key_file,
......
...@@ -568,8 +568,7 @@ extern mysql_mutex_t ...@@ -568,8 +568,7 @@ extern mysql_mutex_t
LOCK_error_log, LOCK_delayed_insert, LOCK_short_uuid_generator, LOCK_error_log, LOCK_delayed_insert, LOCK_short_uuid_generator,
LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone, LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone,
LOCK_slave_list, LOCK_active_mi, LOCK_manager, LOCK_user_conn, LOCK_slave_list, LOCK_active_mi, LOCK_manager, LOCK_user_conn,
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count, LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count;
LOCK_slave_background;
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count, extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count,
LOCK_global_system_variables; LOCK_global_system_variables;
extern mysql_mutex_t LOCK_start_thread; extern mysql_mutex_t LOCK_start_thread;
...@@ -583,7 +582,6 @@ extern mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave; ...@@ -583,7 +582,6 @@ extern mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
extern mysql_prlock_t LOCK_system_variables_hash; extern mysql_prlock_t LOCK_system_variables_hash;
extern mysql_cond_t COND_thread_count, COND_start_thread; extern mysql_cond_t COND_thread_count, COND_start_thread;
extern mysql_cond_t COND_manager; extern mysql_cond_t COND_manager;
extern mysql_cond_t COND_slave_background;
extern int32 thread_running; extern int32 thread_running;
extern int32 thread_count, service_thread_count; extern int32 thread_count, service_thread_count;
......
...@@ -60,6 +60,7 @@ ...@@ -60,6 +60,7 @@
#include "rpl_tblmap.h" #include "rpl_tblmap.h"
#include "debug_sync.h" #include "debug_sync.h"
#include "rpl_parallel.h" #include "rpl_parallel.h"
#include "sql_manager.h"
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"") #define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
...@@ -279,8 +280,6 @@ static void init_slave_psi_keys(void) ...@@ -279,8 +280,6 @@ static void init_slave_psi_keys(void)
#endif /* HAVE_PSI_INTERFACE */ #endif /* HAVE_PSI_INTERFACE */
static bool slave_background_thread_running;
static bool slave_background_thread_stop;
static bool slave_background_thread_gtid_loaded; static bool slave_background_thread_gtid_loaded;
struct slave_background_kill_t { struct slave_background_kill_t {
...@@ -289,24 +288,15 @@ struct slave_background_kill_t { ...@@ -289,24 +288,15 @@ struct slave_background_kill_t {
} *slave_background_kill_list; } *slave_background_kill_list;
pthread_handler_t static void bg_rpl_load_gtid_slave_state(void *)
handle_slave_background(void *arg __attribute__((unused)))
{ {
THD *thd; THD *thd= new THD(next_thread_id());
PSI_stage_info old_stage;
bool stop;
my_thread_init();
thd= new THD(next_thread_id());
thd->thread_stack= (char*) &thd; /* Set approximate stack start */ thd->thread_stack= (char*) &thd; /* Set approximate stack start */
thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND; thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
thread_safe_increment32(&service_thread_count);
thd->store_globals(); thd->store_globals();
thd->security_ctx->skip_grants(); thd->security_ctx->skip_grants();
thd->set_command(COM_DAEMON); thd->set_command(COM_DAEMON);
#ifdef WITH_WSREP
thd->variables.wsrep_on= 0; thd->variables.wsrep_on= 0;
#endif
thd_proc_info(thd, "Loading slave GTID position from table"); thd_proc_info(thd, "Loading slave GTID position from table");
if (rpl_load_gtid_slave_state(thd)) if (rpl_load_gtid_slave_state(thd))
...@@ -316,136 +306,34 @@ handle_slave_background(void *arg __attribute__((unused))) ...@@ -316,136 +306,34 @@ handle_slave_background(void *arg __attribute__((unused)))
thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message()); thd->get_stmt_da()->message());
mysql_mutex_lock(&LOCK_slave_background); // hijacking global_rpl_thread_pool cond here - it's only once on startup
mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
slave_background_thread_gtid_loaded= true; slave_background_thread_gtid_loaded= true;
mysql_cond_broadcast(&COND_slave_background); mysql_cond_signal(&global_rpl_thread_pool.COND_rpl_thread_pool);
mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
THD_STAGE_INFO(thd, stage_slave_background_process_request); delete thd;
do }
{
slave_background_kill_t *kill_list;
thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
&stage_slave_background_wait_request,
&old_stage);
for (;;)
{
stop= abort_loop || thd->killed || slave_background_thread_stop;
kill_list= slave_background_kill_list;
if (stop || kill_list)
break;
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
}
slave_background_kill_list= NULL;
thd->EXIT_COND(&old_stage);
while (kill_list)
{
slave_background_kill_t *p = kill_list;
THD *to_kill= p->to_kill;
kill_list= p->next;
static void bg_slave_kill(void *victim)
{
THD *to_kill= (THD *)victim;
mysql_mutex_lock(&to_kill->LOCK_thd_data); mysql_mutex_lock(&to_kill->LOCK_thd_data);
to_kill->awake(KILL_CONNECTION); to_kill->awake(KILL_CONNECTION);
mysql_mutex_unlock(&to_kill->LOCK_thd_data); mysql_mutex_unlock(&to_kill->LOCK_thd_data);
mysql_mutex_lock(&to_kill->LOCK_wakeup_ready); mysql_mutex_lock(&to_kill->LOCK_wakeup_ready);
to_kill->rgi_slave->killed_for_retry= to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_KILLED;
rpl_group_info::RETRY_KILL_KILLED;
mysql_cond_broadcast(&to_kill->COND_wakeup_ready); mysql_cond_broadcast(&to_kill->COND_wakeup_ready);
mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready); mysql_mutex_unlock(&to_kill->LOCK_wakeup_ready);
my_free(p);
}
mysql_mutex_lock(&LOCK_slave_background);
} while (!stop);
slave_background_thread_running= false;
mysql_cond_broadcast(&COND_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
delete thd;
thread_safe_decrement32(&service_thread_count);
signal_thd_deleted();
my_thread_end();
return 0;
} }
void slave_background_kill_request(THD *to_kill)
void
slave_background_kill_request(THD *to_kill)
{ {
if (to_kill->rgi_slave->killed_for_retry) if (to_kill->rgi_slave->killed_for_retry)
return; // Already deadlock killed. return; // Already deadlock killed.
slave_background_kill_t *p= to_kill->rgi_slave->killed_for_retry= rpl_group_info::RETRY_KILL_PENDING;
(slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME)); mysql_manager_submit(bg_slave_kill, to_kill);
if (p)
{
p->to_kill= to_kill;
to_kill->rgi_slave->killed_for_retry=
rpl_group_info::RETRY_KILL_PENDING;
mysql_mutex_lock(&LOCK_slave_background);
p->next= slave_background_kill_list;
slave_background_kill_list= p;
mysql_cond_signal(&COND_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
}
} }
/*
Start the slave background thread.
This thread is currently used for two purposes:
1. To load the GTID state from mysql.gtid_slave_pos at server start; reading
from table requires valid THD, which is otherwise not available during
server init.
2. To kill worker thread transactions during parallel replication, when a
storage engine attempts to take an errorneous conflicting lock that would
cause a deadlock. Killing is done asynchroneously, as the kill may not
be safe within the context of a callback from inside storage engine
locking code.
*/
static int
start_slave_background_thread()
{
pthread_t th;
slave_background_thread_running= true;
slave_background_thread_stop= false;
slave_background_thread_gtid_loaded= false;
if (mysql_thread_create(key_thread_slave_background,
&th, &connection_attrib, handle_slave_background,
NULL))
{
sql_print_error("Failed to create thread while initialising slave");
return 1;
}
mysql_mutex_lock(&LOCK_slave_background);
while (!slave_background_thread_gtid_loaded)
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
return 0;
}
static void
stop_slave_background_thread()
{
mysql_mutex_lock(&LOCK_slave_background);
slave_background_thread_stop= true;
mysql_cond_broadcast(&COND_slave_background);
while (slave_background_thread_running)
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
}
/* Initialize slave structures */ /* Initialize slave structures */
int init_slave() int init_slave()
...@@ -457,12 +345,19 @@ int init_slave() ...@@ -457,12 +345,19 @@ int init_slave()
init_slave_psi_keys(); init_slave_psi_keys();
#endif #endif
if (start_slave_background_thread())
return 1;
if (global_rpl_thread_pool.init(opt_slave_parallel_threads)) if (global_rpl_thread_pool.init(opt_slave_parallel_threads))
return 1; return 1;
slave_background_thread_gtid_loaded= false;
mysql_manager_submit(bg_rpl_load_gtid_slave_state, NULL);
// hijacking global_rpl_thread_pool cond here - it's only once on startup
mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
while (!slave_background_thread_gtid_loaded)
mysql_cond_wait(&global_rpl_thread_pool.COND_rpl_thread_pool,
&global_rpl_thread_pool.LOCK_rpl_thread_pool);
mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
/* /*
This is called when mysqld starts. Before client connections are This is called when mysqld starts. Before client connections are
accepted. However bootstrap may conflict with us if it does START SLAVE. accepted. However bootstrap may conflict with us if it does START SLAVE.
...@@ -1080,7 +975,6 @@ void slave_prepare_for_shutdown() ...@@ -1080,7 +975,6 @@ void slave_prepare_for_shutdown()
mysql_mutex_lock(&LOCK_active_mi); mysql_mutex_lock(&LOCK_active_mi);
master_info_index->free_connections(); master_info_index->free_connections();
mysql_mutex_unlock(&LOCK_active_mi); mysql_mutex_unlock(&LOCK_active_mi);
stop_slave_background_thread();
} }
/* /*
...@@ -1111,8 +1005,6 @@ void end_slave() ...@@ -1111,8 +1005,6 @@ void end_slave()
active_mi= 0; active_mi= 0;
mysql_mutex_unlock(&LOCK_active_mi); mysql_mutex_unlock(&LOCK_active_mi);
stop_slave_background_thread();
global_rpl_thread_pool.destroy(); global_rpl_thread_pool.destroy();
free_all_rpl_filters(); free_all_rpl_filters();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -4335,9 +4227,7 @@ pthread_handler_t handle_slave_io(void *arg) ...@@ -4335,9 +4227,7 @@ pthread_handler_t handle_slave_io(void *arg)
goto err; goto err;
} }
#ifdef WITH_WSREP
thd->variables.wsrep_on= 0; thd->variables.wsrep_on= 0;
#endif
if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi))) if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
{ {
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment