Commit 8a931e4d authored by Jan Lindström's avatar Jan Lindström

MDEV-17571 : Make systemd timeout behavior more compatible with long Galera SSTs

This is 10.4 version.

Idea is to create monitor thread for both donor and joiner that will
periodically if needed extend systemd timeout while SST is being
processed. In 10.4 actual SST is executed by running SST script
and exchanging messages on pipe using blocking fgets. This fix
starts monitoring thread before SST script is started and
we stop monitoring thread when SST has been completed.
parent 6918157e
...@@ -15,7 +15,9 @@ select name from mutex_instances where name like 'wait/synch/mutex/sql/LOCK_wsre ...@@ -15,7 +15,9 @@ select name from mutex_instances where name like 'wait/synch/mutex/sql/LOCK_wsre
name wait/synch/mutex/sql/LOCK_wsrep_cluster_config name wait/synch/mutex/sql/LOCK_wsrep_cluster_config
name wait/synch/mutex/sql/LOCK_wsrep_config_state name wait/synch/mutex/sql/LOCK_wsrep_config_state
name wait/synch/mutex/sql/LOCK_wsrep_desync name wait/synch/mutex/sql/LOCK_wsrep_desync
name wait/synch/mutex/sql/LOCK_wsrep_donor_monitor
name wait/synch/mutex/sql/LOCK_wsrep_group_commit name wait/synch/mutex/sql/LOCK_wsrep_group_commit
name wait/synch/mutex/sql/LOCK_wsrep_joiner_monitor
name wait/synch/mutex/sql/LOCK_wsrep_ready name wait/synch/mutex/sql/LOCK_wsrep_ready
name wait/synch/mutex/sql/LOCK_wsrep_replaying name wait/synch/mutex/sql/LOCK_wsrep_replaying
name wait/synch/mutex/sql/LOCK_wsrep_slave_threads name wait/synch/mutex/sql/LOCK_wsrep_slave_threads
...@@ -24,6 +26,8 @@ name wait/synch/mutex/sql/LOCK_wsrep_SR_store ...@@ -24,6 +26,8 @@ name wait/synch/mutex/sql/LOCK_wsrep_SR_store
name wait/synch/mutex/sql/LOCK_wsrep_sst name wait/synch/mutex/sql/LOCK_wsrep_sst
name wait/synch/mutex/sql/LOCK_wsrep_sst_init name wait/synch/mutex/sql/LOCK_wsrep_sst_init
select name from cond_instances where name like 'wait/synch/cond/sql/COND_wsrep%' order by name; select name from cond_instances where name like 'wait/synch/cond/sql/COND_wsrep%' order by name;
name wait/synch/cond/sql/COND_wsrep_donor_monitor
name wait/synch/cond/sql/COND_wsrep_joiner_monitor
name wait/synch/cond/sql/COND_wsrep_ready name wait/synch/cond/sql/COND_wsrep_ready
name wait/synch/cond/sql/COND_wsrep_replaying name wait/synch/cond/sql/COND_wsrep_replaying
name wait/synch/cond/sql/COND_wsrep_sst name wait/synch/cond/sql/COND_wsrep_sst
......
...@@ -150,6 +150,10 @@ mysql_mutex_t LOCK_wsrep_config_state; ...@@ -150,6 +150,10 @@ mysql_mutex_t LOCK_wsrep_config_state;
mysql_mutex_t LOCK_wsrep_group_commit; mysql_mutex_t LOCK_wsrep_group_commit;
mysql_mutex_t LOCK_wsrep_SR_pool; mysql_mutex_t LOCK_wsrep_SR_pool;
mysql_mutex_t LOCK_wsrep_SR_store; mysql_mutex_t LOCK_wsrep_SR_store;
mysql_mutex_t LOCK_wsrep_joiner_monitor;
mysql_mutex_t LOCK_wsrep_donor_monitor;
mysql_cond_t COND_wsrep_joiner_monitor;
mysql_cond_t COND_wsrep_donor_monitor;
int wsrep_replaying= 0; int wsrep_replaying= 0;
ulong wsrep_running_threads = 0; // # of currently running wsrep ulong wsrep_running_threads = 0; // # of currently running wsrep
...@@ -168,13 +172,15 @@ PSI_mutex_key ...@@ -168,13 +172,15 @@ PSI_mutex_key
key_LOCK_wsrep_group_commit, key_LOCK_wsrep_group_commit,
key_LOCK_wsrep_SR_pool, key_LOCK_wsrep_SR_pool,
key_LOCK_wsrep_SR_store, key_LOCK_wsrep_SR_store,
key_LOCK_wsrep_thd_queue; key_LOCK_wsrep_thd_queue,
key_LOCK_wsrep_joiner_monitor,
key_LOCK_wsrep_donor_monitor;
PSI_cond_key key_COND_wsrep_thd, PSI_cond_key key_COND_wsrep_thd,
key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst, key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread, key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread,
key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads; key_COND_wsrep_thd_queue, key_COND_wsrep_slave_threads,
key_COND_wsrep_joiner_monitor, key_COND_wsrep_donor_monitor;
PSI_file_key key_file_wsrep_gra_log; PSI_file_key key_file_wsrep_gra_log;
...@@ -192,7 +198,9 @@ static PSI_mutex_info wsrep_mutexes[]= ...@@ -192,7 +198,9 @@ static PSI_mutex_info wsrep_mutexes[]=
{ &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_group_commit, "LOCK_wsrep_group_commit", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_group_commit, "LOCK_wsrep_group_commit", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_SR_pool, "LOCK_wsrep_SR_pool", PSI_FLAG_GLOBAL}, { &key_LOCK_wsrep_SR_pool, "LOCK_wsrep_SR_pool", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_SR_store, "LOCK_wsrep_SR_store", PSI_FLAG_GLOBAL} { &key_LOCK_wsrep_SR_store, "LOCK_wsrep_SR_store", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_joiner_monitor, "LOCK_wsrep_joiner_monitor", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_donor_monitor, "LOCK_wsrep_donor_monitor", PSI_FLAG_GLOBAL}
}; };
static PSI_cond_info wsrep_conds[]= static PSI_cond_info wsrep_conds[]=
...@@ -203,7 +211,9 @@ static PSI_cond_info wsrep_conds[]= ...@@ -203,7 +211,9 @@ static PSI_cond_info wsrep_conds[]=
{ &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0}, { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
{ &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0}, { &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0},
{ &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}, { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_slave_threads, "COND_wsrep_wsrep_slave_threads", PSI_FLAG_GLOBAL} { &key_COND_wsrep_slave_threads, "COND_wsrep_wsrep_slave_threads", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_joiner_monitor, "COND_wsrep_joiner_monitor", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_donor_monitor, "COND_wsrep_donor_monitor", PSI_FLAG_GLOBAL}
}; };
static PSI_file_info wsrep_files[]= static PSI_file_info wsrep_files[]=
...@@ -212,14 +222,17 @@ static PSI_file_info wsrep_files[]= ...@@ -212,14 +222,17 @@ static PSI_file_info wsrep_files[]=
}; };
PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor, PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor,
key_wsrep_rollbacker, key_wsrep_applier; key_wsrep_rollbacker, key_wsrep_applier,
key_wsrep_sst_joiner_monitor, key_wsrep_sst_donor_monitor;
static PSI_thread_info wsrep_threads[]= static PSI_thread_info wsrep_threads[]=
{ {
{&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL}, {&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL}, {&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL}, {&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL} {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_sst_joiner_monitor, "wsrep_sst_joiner_monitor", PSI_FLAG_GLOBAL},
{&key_wsrep_sst_donor_monitor, "wsrep_sst_donor_monitor", PSI_FLAG_GLOBAL}
}; };
#endif /* HAVE_PSI_INTERFACE */ #endif /* HAVE_PSI_INTERFACE */
...@@ -788,6 +801,13 @@ void wsrep_thr_init() ...@@ -788,6 +801,13 @@ void wsrep_thr_init()
&LOCK_wsrep_SR_pool, MY_MUTEX_INIT_FAST); &LOCK_wsrep_SR_pool, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_SR_store, mysql_mutex_init(key_LOCK_wsrep_SR_store,
&LOCK_wsrep_SR_store, MY_MUTEX_INIT_FAST); &LOCK_wsrep_SR_store, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_joiner_monitor,
&LOCK_wsrep_joiner_monitor, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_donor_monitor,
&LOCK_wsrep_donor_monitor, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_joiner_monitor, &COND_wsrep_joiner_monitor, NULL);
mysql_cond_init(key_COND_wsrep_donor_monitor, &COND_wsrep_donor_monitor, NULL);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -891,6 +911,10 @@ void wsrep_thr_deinit() ...@@ -891,6 +911,10 @@ void wsrep_thr_deinit()
mysql_mutex_destroy(&LOCK_wsrep_group_commit); mysql_mutex_destroy(&LOCK_wsrep_group_commit);
mysql_mutex_destroy(&LOCK_wsrep_SR_pool); mysql_mutex_destroy(&LOCK_wsrep_SR_pool);
mysql_mutex_destroy(&LOCK_wsrep_SR_store); mysql_mutex_destroy(&LOCK_wsrep_SR_store);
mysql_mutex_destroy(&LOCK_wsrep_joiner_monitor);
mysql_mutex_destroy(&LOCK_wsrep_donor_monitor);
mysql_cond_destroy(&COND_wsrep_joiner_monitor);
mysql_cond_destroy(&COND_wsrep_donor_monitor);
delete wsrep_config_state; delete wsrep_config_state;
wsrep_config_state= 0; // Safety wsrep_config_state= 0; // Safety
......
...@@ -309,6 +309,11 @@ extern mysql_mutex_t LOCK_wsrep_SR_pool; ...@@ -309,6 +309,11 @@ extern mysql_mutex_t LOCK_wsrep_SR_pool;
extern mysql_mutex_t LOCK_wsrep_SR_store; extern mysql_mutex_t LOCK_wsrep_SR_store;
extern mysql_mutex_t LOCK_wsrep_config_state; extern mysql_mutex_t LOCK_wsrep_config_state;
extern mysql_mutex_t LOCK_wsrep_group_commit; extern mysql_mutex_t LOCK_wsrep_group_commit;
extern mysql_mutex_t LOCK_wsrep_joiner_monitor;
extern mysql_mutex_t LOCK_wsrep_donor_monitor;
extern mysql_cond_t COND_wsrep_joiner_monitor;
extern mysql_cond_t COND_wsrep_donor_monitor;
extern my_bool wsrep_emulate_bin_log; extern my_bool wsrep_emulate_bin_log;
extern int wsrep_to_isolation; extern int wsrep_to_isolation;
#ifdef GTID_SUPPORT #ifdef GTID_SUPPORT
...@@ -339,6 +344,8 @@ extern PSI_mutex_key key_LOCK_wsrep_SR_store; ...@@ -339,6 +344,8 @@ extern PSI_mutex_key key_LOCK_wsrep_SR_store;
extern PSI_mutex_key key_LOCK_wsrep_global_seqno; extern PSI_mutex_key key_LOCK_wsrep_global_seqno;
extern PSI_mutex_key key_LOCK_wsrep_thd_queue; extern PSI_mutex_key key_LOCK_wsrep_thd_queue;
extern PSI_cond_key key_COND_wsrep_thd_queue; extern PSI_cond_key key_COND_wsrep_thd_queue;
extern PSI_mutex_key key_LOCK_wsrep_joiner_monitor;
extern PSI_mutex_key key_LOCK_wsrep_donor_monitor;
extern PSI_file_key key_file_wsrep_gra_log; extern PSI_file_key key_file_wsrep_gra_log;
...@@ -346,6 +353,8 @@ extern PSI_thread_key key_wsrep_sst_joiner; ...@@ -346,6 +353,8 @@ extern PSI_thread_key key_wsrep_sst_joiner;
extern PSI_thread_key key_wsrep_sst_donor; extern PSI_thread_key key_wsrep_sst_donor;
extern PSI_thread_key key_wsrep_rollbacker; extern PSI_thread_key key_wsrep_rollbacker;
extern PSI_thread_key key_wsrep_applier; extern PSI_thread_key key_wsrep_applier;
extern PSI_thread_key key_wsrep_sst_joiner_monitor;
extern PSI_thread_key key_wsrep_sst_donor_monitor;
#endif /* HAVE_PSI_INTERFACE */ #endif /* HAVE_PSI_INTERFACE */
......
...@@ -49,6 +49,126 @@ const char* wsrep_sst_auth = NULL; ...@@ -49,6 +49,126 @@ const char* wsrep_sst_auth = NULL;
static const char* sst_auth_real = NULL; static const char* sst_auth_real = NULL;
my_bool wsrep_sst_donor_rejects_queries= FALSE; my_bool wsrep_sst_donor_rejects_queries= FALSE;
#define WSREP_EXTEND_TIMEOUT_INTERVAL 60
#define WSREP_TIMEDWAIT_SECONDS 30
bool sst_joiner_completed = false;
bool sst_donor_completed = false;
struct sst_thread_arg
{
const char* cmd;
char** env;
char* ret_str;
int err;
mysql_mutex_t lock;
mysql_cond_t cond;
sst_thread_arg (const char* c, char** e)
: cmd(c), env(e), ret_str(0), err(-1)
{
mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL);
}
~sst_thread_arg()
{
mysql_cond_destroy (&cond);
mysql_mutex_unlock (&lock);
mysql_mutex_destroy (&lock);
}
};
static void wsrep_donor_monitor_end(void)
{
mysql_mutex_lock(&LOCK_wsrep_donor_monitor);
sst_donor_completed= true;
mysql_cond_signal(&COND_wsrep_donor_monitor);
mysql_mutex_unlock(&LOCK_wsrep_donor_monitor);
}
static void wsrep_joiner_monitor_end(void)
{
mysql_mutex_lock(&LOCK_wsrep_joiner_monitor);
sst_joiner_completed= true;
mysql_cond_signal(&COND_wsrep_joiner_monitor);
mysql_mutex_unlock(&LOCK_wsrep_joiner_monitor);
}
static void* wsrep_sst_donor_monitor_thread(void *arg __attribute__((unused)))
{
int ret= 0;
unsigned long time_waited= 0;
mysql_mutex_lock(&LOCK_wsrep_donor_monitor);
WSREP_INFO("Donor monitor thread started to monitor");
wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
// operate with wsrep_ready == OFF
while (!sst_donor_completed)
{
timespec ts;
set_timespec(ts, WSREP_TIMEDWAIT_SECONDS);
time_t start_time= time(NULL);
ret= mysql_cond_timedwait(&COND_wsrep_donor_monitor, &LOCK_wsrep_donor_monitor, &ts);
time_t end_time= time(NULL);
time_waited+= difftime(end_time, start_time);
if (ret == ETIMEDOUT && !sst_donor_completed)
{
WSREP_DEBUG("Donor waited %lu sec, extending systemd startup timeout as SST"
"is not completed",
time_waited);
service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL,
"WSREP state transfer ongoing...");
}
}
WSREP_INFO("Donor monitor thread ended with total time %lu sec", time_waited);
mysql_mutex_unlock(&LOCK_wsrep_donor_monitor);
return NULL;
}
static void* wsrep_sst_joiner_monitor_thread(void *arg __attribute__((unused)))
{
int ret= 0;
unsigned long time_waited= 0;
mysql_mutex_lock(&LOCK_wsrep_joiner_monitor);
WSREP_INFO("Joiner monitor thread started to monitor");
wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
// operate with wsrep_ready == OFF
while (!sst_joiner_completed)
{
timespec ts;
set_timespec(ts, WSREP_TIMEDWAIT_SECONDS);
time_t start_time= time(NULL);
ret= mysql_cond_timedwait(&COND_wsrep_joiner_monitor, &LOCK_wsrep_joiner_monitor, &ts);
time_t end_time= time(NULL);
time_waited+= difftime(end_time, start_time);
if (ret == ETIMEDOUT && !sst_joiner_completed)
{
WSREP_DEBUG("Joiner waited %lu sec, extending systemd startup timeout as SST"
"is not completed",
time_waited);
service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL,
"WSREP state transfer ongoing...");
}
}
WSREP_INFO("Joiner monitor thread ended with total time %lu sec", time_waited);
mysql_mutex_unlock(&LOCK_wsrep_joiner_monitor);
return NULL;
}
bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var) bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
{ {
if ((! var->save_result.string_value.str) || if ((! var->save_result.string_value.str) ||
...@@ -193,6 +313,7 @@ static void wsrep_sst_complete (THD* thd, ...@@ -193,6 +313,7 @@ static void wsrep_sst_complete (THD* thd,
{ {
Wsrep_client_service client_service(thd, thd->wsrep_cs()); Wsrep_client_service client_service(thd, thd->wsrep_cs());
Wsrep_server_state::instance().sst_received(client_service, rcode); Wsrep_server_state::instance().sst_received(client_service, rcode);
wsrep_joiner_monitor_end();
} }
/* /*
...@@ -253,30 +374,6 @@ void wsrep_sst_received (THD* thd, ...@@ -253,30 +374,6 @@ void wsrep_sst_received (THD* thd,
} }
} }
struct sst_thread_arg
{
const char* cmd;
char** env;
char* ret_str;
int err;
mysql_mutex_t lock;
mysql_cond_t cond;
sst_thread_arg (const char* c, char** e)
: cmd(c), env(e), ret_str(0), err(-1)
{
mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL);
}
~sst_thread_arg()
{
mysql_cond_destroy (&cond);
mysql_mutex_unlock (&lock);
mysql_mutex_destroy (&lock);
}
};
static int sst_scan_uuid_seqno (const char* str, static int sst_scan_uuid_seqno (const char* str,
wsrep_uuid_t* uuid, wsrep_seqno_t* seqno) wsrep_uuid_t* uuid, wsrep_seqno_t* seqno)
{ {
...@@ -442,10 +539,12 @@ static void* sst_joiner_thread (void* a) ...@@ -442,10 +539,12 @@ static void* sst_joiner_thread (void* a)
wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED; wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED;
wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED;
// in case of successfull receiver start, wait for SST completion/end // in case of successfull receiver start, wait for SST
// completion/end
char* tmp= my_fgets (out, out_len, proc.pipe()); char* tmp= my_fgets (out, out_len, proc.pipe());
proc.wait(); proc.wait();
err= EINVAL; err= EINVAL;
if (!tmp) if (!tmp)
...@@ -989,16 +1088,33 @@ static ssize_t sst_prepare_other (const char* method, ...@@ -989,16 +1088,33 @@ static ssize_t sst_prepare_other (const char* method,
} }
} }
pthread_t tmp; pthread_t tmp, monitor;
sst_thread_arg arg(cmd_str(), env()); sst_thread_arg arg(cmd_str(), env());
mysql_mutex_lock (&arg.lock); mysql_mutex_lock (&arg.lock);
ret = mysql_thread_create (key_wsrep_sst_joiner, &tmp, NULL, sst_joiner_thread, &arg);
ret = mysql_thread_create (key_wsrep_sst_joiner_monitor, &monitor, NULL, wsrep_sst_joiner_monitor_thread, NULL);
if (ret)
{
WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)",
ret, strerror(ret));
return -ret;
}
sst_joiner_completed= false;
ret= mysql_thread_create (key_wsrep_sst_joiner, &tmp, NULL, sst_joiner_thread, &arg);
if (ret) if (ret)
{ {
WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)", WSREP_ERROR("sst_prepare_other(): mysql_thread_create() failed: %d (%s)",
ret, strerror(ret)); ret, strerror(ret));
pthread_detach(monitor);
return -ret; return -ret;
} }
mysql_cond_wait (&arg.cond, &arg.lock); mysql_cond_wait (&arg.cond, &arg.lock);
*addr_out= arg.ret_str; *addr_out= arg.ret_str;
...@@ -1012,6 +1128,7 @@ static ssize_t sst_prepare_other (const char* method, ...@@ -1012,6 +1128,7 @@ static ssize_t sst_prepare_other (const char* method,
} }
pthread_detach (tmp); pthread_detach (tmp);
pthread_detach (monitor);
return ret; return ret;
} }
...@@ -1509,6 +1626,7 @@ static void* sst_donor_thread (void* a) ...@@ -1509,6 +1626,7 @@ static void* sst_donor_thread (void* a)
wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can
// operate with wsrep_ready == OFF // operate with wsrep_ready == OFF
wsp::process proc(arg->cmd, "r", arg->env); wsp::process proc(arg->cmd, "r", arg->env);
err= -proc.error(); err= -proc.error();
...@@ -1604,9 +1722,13 @@ static void* sst_donor_thread (void* a) ...@@ -1604,9 +1722,13 @@ static void* sst_donor_thread (void* a)
wsrep::gtid gtid(wsrep::id(ret_uuid.data, sizeof(ret_uuid.data)), wsrep::gtid gtid(wsrep::id(ret_uuid.data, sizeof(ret_uuid.data)),
wsrep::seqno(err ? wsrep::seqno::undefined() : wsrep::seqno(err ? wsrep::seqno::undefined() :
wsrep::seqno(ret_seqno))); wsrep::seqno(ret_seqno)));
Wsrep_server_state::instance().sst_sent(gtid, err); Wsrep_server_state::instance().sst_sent(gtid, err);
proc.wait(); proc.wait();
wsrep_donor_monitor_end();
return NULL; return NULL;
} }
...@@ -1681,14 +1803,18 @@ static int sst_donate_other (const char* method, ...@@ -1681,14 +1803,18 @@ static int sst_donate_other (const char* method,
pthread_t tmp; pthread_t tmp;
sst_thread_arg arg(cmd_str(), env); sst_thread_arg arg(cmd_str(), env);
mysql_mutex_lock (&arg.lock); mysql_mutex_lock (&arg.lock);
ret = mysql_thread_create (key_wsrep_sst_donor, &tmp, NULL, sst_donor_thread, &arg);
ret= mysql_thread_create (key_wsrep_sst_donor, &tmp, NULL, sst_donor_thread, &arg);
if (ret) if (ret)
{ {
WSREP_ERROR("sst_donate_other(): mysql_thread_create() failed: %d (%s)", WSREP_ERROR("sst_donate_other(): mysql_thread_create() failed: %d (%s)",
ret, strerror(ret)); ret, strerror(ret));
return ret; return ret;
} }
mysql_cond_wait (&arg.cond, &arg.lock); mysql_cond_wait (&arg.cond, &arg.lock);
WSREP_INFO("sst_donor_thread signaled with %d", arg.err); WSREP_INFO("sst_donor_thread signaled with %d", arg.err);
...@@ -1732,6 +1858,18 @@ int wsrep_sst_donate(const std::string& msg, ...@@ -1732,6 +1858,18 @@ int wsrep_sst_donate(const std::string& msg,
} }
} }
sst_donor_completed= false;
pthread_t monitor;
ret= mysql_thread_create (key_wsrep_sst_donor_monitor, &monitor, NULL, wsrep_sst_donor_monitor_thread, NULL);
if (ret)
{
WSREP_ERROR("sst_donate: mysql_thread_create() failed: %d (%s)",
ret, strerror(ret));
return WSREP_CB_FAILURE;
}
if (!strcmp (WSREP_SST_MYSQLDUMP, method)) if (!strcmp (WSREP_SST_MYSQLDUMP, method))
{ {
ret= sst_donate_mysqldump(data, current_gtid, bypass, env()); ret= sst_donate_mysqldump(data, current_gtid, bypass, env());
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment