Commit b2f415d3 authored by unknown's avatar unknown

Bug#26015 valgrind warning PollGuard::unlock_and_signal()/NdbCondition_Destroy

- Add variable "ndbcluster_binlog_terminating" and use that to signal
the cluster binlog thread it's time to shutdown. This allows
exact control of when the thread shutdown, previous implementation 
would start shutdown of the thread as soon as the mysqld started
shutdown. Now we will shutdown cluster binlog thread 
in 'ndbcluster_binlog_end'


sql/ha_ndbcluster_binlog.cc:
  Make binlog thread shutdown controllable by using a separet variable
  to communicate the order to shutdown
parent bc365a8b
...@@ -81,6 +81,7 @@ static Ndb *injector_ndb= 0; ...@@ -81,6 +81,7 @@ static Ndb *injector_ndb= 0;
static Ndb *schema_ndb= 0; static Ndb *schema_ndb= 0;
static int ndbcluster_binlog_inited= 0; static int ndbcluster_binlog_inited= 0;
static int ndbcluster_binlog_terminating= 0;
/* /*
Mutex and condition used for interacting between client sql thread Mutex and condition used for interacting between client sql thread
...@@ -582,61 +583,18 @@ static int ndbcluster_binlog_end(THD *thd) ...@@ -582,61 +583,18 @@ static int ndbcluster_binlog_end(THD *thd)
#ifdef HAVE_NDB_BINLOG #ifdef HAVE_NDB_BINLOG
/* wait for injector thread to finish */ /* wait for injector thread to finish */
ndbcluster_binlog_terminating= 1;
pthread_cond_signal(&injector_cond);
pthread_mutex_lock(&injector_mutex); pthread_mutex_lock(&injector_mutex);
if (ndb_binlog_thread_running > 0) while (ndb_binlog_thread_running > 0)
{ pthread_cond_wait(&injector_cond, &injector_mutex);
pthread_cond_signal(&injector_cond);
pthread_mutex_unlock(&injector_mutex);
pthread_mutex_lock(&injector_mutex);
while (ndb_binlog_thread_running > 0)
{
struct timespec abstime;
set_timespec(abstime, 1);
pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
}
}
pthread_mutex_unlock(&injector_mutex); pthread_mutex_unlock(&injector_mutex);
/* remove all shares */
{
pthread_mutex_lock(&ndbcluster_mutex);
for (uint i= 0; i < ndbcluster_open_tables.records; i++)
{
NDB_SHARE *share=
(NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
if (share->table)
DBUG_PRINT("share",
("table->s->db.table_name: %s.%s",
share->table->s->db.str, share->table->s->table_name.str));
/* ndb_share reference create free */
if (share->state != NSS_DROPPED)
{
DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
share->key, share->use_count));
--share->use_count;
}
if (share->use_count == 0)
ndbcluster_real_free_share(&share);
else
{
DBUG_PRINT("share",
("[%d] 0x%lx key: %s key_length: %d",
i, (long) share, share->key, share->key_length));
DBUG_PRINT("share",
("db.tablename: %s.%s use_count: %d commit_count: %lu",
share->db, share->table_name,
share->use_count, (long) share->commit_count));
}
}
pthread_mutex_unlock(&ndbcluster_mutex);
}
pthread_mutex_destroy(&injector_mutex); pthread_mutex_destroy(&injector_mutex);
pthread_cond_destroy(&injector_cond); pthread_cond_destroy(&injector_cond);
pthread_mutex_destroy(&ndb_schema_share_mutex); pthread_mutex_destroy(&ndb_schema_share_mutex);
#endif #endif
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2385,7 +2343,6 @@ int ndbcluster_binlog_start() ...@@ -2385,7 +2343,6 @@ int ndbcluster_binlog_start()
if (ndb_binlog_thread_running < 0) if (ndb_binlog_thread_running < 0)
DBUG_RETURN(-1); DBUG_RETURN(-1);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -3562,6 +3519,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3562,6 +3519,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
s_ndb->init()) s_ndb->init())
{ {
sql_print_error("NDB Binlog: Getting Schema Ndb object failed"); sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
ndb_binlog_thread_running= -1;
pthread_mutex_unlock(&injector_mutex);
pthread_cond_signal(&injector_cond);
goto err; goto err;
} }
...@@ -3592,7 +3552,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3592,7 +3552,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
p_latest_trans_gci= p_latest_trans_gci=
injector_ndb->get_ndb_cluster_connection().get_latest_trans_gci(); injector_ndb->get_ndb_cluster_connection().get_latest_trans_gci();
schema_ndb= s_ndb; schema_ndb= s_ndb;
ndb_binlog_thread_running= 1;
if (opt_bin_log) if (opt_bin_log)
{ {
if (global_system_variables.binlog_format == BINLOG_FORMAT_ROW || if (global_system_variables.binlog_format == BINLOG_FORMAT_ROW ||
...@@ -3605,10 +3565,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3605,10 +3565,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
sql_print_error("NDB: only row based binary logging is supported"); sql_print_error("NDB: only row based binary logging is supported");
} }
} }
/*
We signal the thread that started us that we've finished /* Thread start up completed */
starting up. ndb_binlog_thread_running= 1;
*/
pthread_mutex_unlock(&injector_mutex); pthread_mutex_unlock(&injector_mutex);
pthread_cond_signal(&injector_cond); pthread_cond_signal(&injector_cond);
...@@ -3627,7 +3586,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3627,7 +3586,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
struct timespec abstime; struct timespec abstime;
set_timespec(abstime, 1); set_timespec(abstime, 1);
pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime); pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
if (abort_loop) if (ndbcluster_binlog_terminating)
{ {
pthread_mutex_unlock(&injector_mutex); pthread_mutex_unlock(&injector_mutex);
goto err; goto err;
...@@ -3652,13 +3611,15 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3652,13 +3611,15 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
{ {
// wait for the first event // wait for the first event
thd->proc_info= "Waiting for first event from ndbcluster"; thd->proc_info= "Waiting for first event from ndbcluster";
DBUG_PRINT("info", ("Waiting for the first event"));
int schema_res, res; int schema_res, res;
Uint64 schema_gci; Uint64 schema_gci;
do do
{ {
if (abort_loop) DBUG_PRINT("info", ("Waiting for the first event"));
if (ndbcluster_binlog_terminating)
goto err; goto err;
schema_res= s_ndb->pollEvents(100, &schema_gci); schema_res= s_ndb->pollEvents(100, &schema_gci);
} while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci); } while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
if (ndb_binlog_running) if (ndb_binlog_running)
...@@ -3666,7 +3627,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3666,7 +3627,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Uint64 gci= i_ndb->getLatestGCI(); Uint64 gci= i_ndb->getLatestGCI();
while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch) while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch)
{ {
if (abort_loop) if (ndbcluster_binlog_terminating)
goto err; goto err;
res= i_ndb->pollEvents(10, &gci); res= i_ndb->pollEvents(10, &gci);
} }
...@@ -3713,7 +3674,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3713,7 +3674,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
thd->db= db; thd->db= db;
} }
do_ndbcluster_binlog_close_connection= BCCC_running; do_ndbcluster_binlog_close_connection= BCCC_running;
for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) && for ( ; !((ndbcluster_binlog_terminating ||
do_ndbcluster_binlog_close_connection) &&
ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci) && ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci) &&
do_ndbcluster_binlog_close_connection != BCCC_restart; ) do_ndbcluster_binlog_close_connection != BCCC_restart; )
{ {
...@@ -3760,7 +3722,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ...@@ -3760,7 +3722,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
schema_res= s_ndb->pollEvents(10, &schema_gci); schema_res= s_ndb->pollEvents(10, &schema_gci);
} }
if ((abort_loop || do_ndbcluster_binlog_close_connection) && if ((ndbcluster_binlog_terminating ||
do_ndbcluster_binlog_close_connection) &&
(ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci || (ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci ||
!ndb_binlog_running)) !ndb_binlog_running))
break; /* Shutting down server */ break; /* Shutting down server */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment