Commit ff0325f1 authored by anozdrin/alik@booka's avatar anozdrin/alik@booka

Polishing:

1) add support for joinable threads to Thread class;
2) move checking of thread model to Manager from mysqlmanager.cc,
because it is needed only for IM-main process.
parent 1efc8620
...@@ -105,7 +105,7 @@ static int wait_process(My_process_info *pi) ...@@ -105,7 +105,7 @@ static int wait_process(My_process_info *pi)
couldn't use wait(), because it could return in any wait() in the program. couldn't use wait(), because it could return in any wait() in the program.
*/ */
if (linuxthreads) if (Manager::is_linux_threads())
wait(NULL); /* LinuxThreads were detected */ wait(NULL); /* LinuxThreads were detected */
else else
waitpid(*pi, NULL, 0); waitpid(*pi, NULL, 0);
...@@ -564,7 +564,7 @@ int Instance::start() ...@@ -564,7 +564,7 @@ int Instance::start()
instance_monitor= new Instance_monitor(this); instance_monitor= new Instance_monitor(this);
if (instance_monitor == NULL || instance_monitor->start_detached()) if (instance_monitor == NULL || instance_monitor->start(Thread::DETACHED))
{ {
delete instance_monitor; delete instance_monitor;
log_error("Instance::start(): failed to create the monitoring thread" log_error("Instance::start(): failed to create the monitoring thread"
......
...@@ -323,7 +323,7 @@ void Listener::handle_new_mysql_connection(struct st_vio *vio) ...@@ -323,7 +323,7 @@ void Listener::handle_new_mysql_connection(struct st_vio *vio)
Mysql_connection *mysql_connection= Mysql_connection *mysql_connection=
new Mysql_connection(thread_registry, user_map, new Mysql_connection(thread_registry, user_map,
vio, ++total_connection_count); vio, ++total_connection_count);
if (mysql_connection == NULL || mysql_connection->start_detached()) if (mysql_connection == NULL || mysql_connection->start(Thread::DETACHED))
{ {
log_error("handle_one_mysql_connection() failed"); log_error("handle_one_mysql_connection() failed");
delete mysql_connection; delete mysql_connection;
......
...@@ -93,6 +93,65 @@ int my_sigwait(const sigset_t *set, int *sig) ...@@ -93,6 +93,65 @@ int my_sigwait(const sigset_t *set, int *sig)
#endif #endif
/**********************************************************************
Implementation of checking the actual thread model.
***********************************************************************/
namespace { /* no-indent */
class ThreadModelChecker: public Thread
{
public:
ThreadModelChecker()
:main_pid(getpid())
{ }
public:
inline bool is_linux_threads() const
{
return linux_threads;
}
protected:
virtual void run()
{
linux_threads= main_pid != getpid();
}
private:
pid_t main_pid;
bool linux_threads;
};
bool check_if_linux_threads(bool *linux_threads)
{
ThreadModelChecker checker;
if (checker.start() || checker.join())
return TRUE;
*linux_threads= checker.is_linux_threads();
return FALSE;
}
}
/**********************************************************************
Manager implementation
***********************************************************************/
Guardian *Manager::p_guardian;
Instance_map *Manager::p_instance_map;
Thread_registry *Manager::p_thread_registry;
User_map *Manager::p_user_map;
#ifndef __WIN__
bool Manager::linux_threads;
#endif // __WIN__
void Manager::stop_all_threads() void Manager::stop_all_threads()
{ {
/* /*
...@@ -106,14 +165,6 @@ void Manager::stop_all_threads() ...@@ -106,14 +165,6 @@ void Manager::stop_all_threads()
p_thread_registry->deliver_shutdown(); p_thread_registry->deliver_shutdown();
} }
/**********************************************************************
Manager implementation
***********************************************************************/
Guardian *Manager::p_guardian;
Instance_map *Manager::p_instance_map;
Thread_registry *Manager::p_thread_registry;
User_map *Manager::p_user_map;
/* /*
manager - entry point to the main instance manager process: start manager - entry point to the main instance manager process: start
...@@ -132,6 +183,15 @@ int Manager::main() ...@@ -132,6 +183,15 @@ int Manager::main()
bool shutdown_complete= FALSE; bool shutdown_complete= FALSE;
pid_t manager_pid= getpid(); pid_t manager_pid= getpid();
if (check_if_linux_threads(&linux_threads))
{
log_error("Error: can not check if Linux Threads are used.");
return 1;
}
log_info("Detected threads model: %s.",
(const char *) (linux_threads ? "LINUX threads" : "POSIX threads"));
Thread_registry thread_registry; Thread_registry thread_registry;
/* /*
All objects created in the manager() function live as long as All objects created in the manager() function live as long as
...@@ -228,7 +288,7 @@ int Manager::main() ...@@ -228,7 +288,7 @@ int Manager::main()
permitted to process instances. And before flush_instances() has permitted to process instances. And before flush_instances() has
completed, there are no instances to guard. completed, there are no instances to guard.
*/ */
if (guardian.start_detached()) if (guardian.start(Thread::DETACHED))
{ {
log_error("Error: can not start Guardian thread."); log_error("Error: can not start Guardian thread.");
goto err; goto err;
...@@ -255,7 +315,7 @@ int Manager::main() ...@@ -255,7 +315,7 @@ int Manager::main()
/* Initialize the Listener. */ /* Initialize the Listener. */
if (listener.start_detached()) if (listener.start(Thread::DETACHED))
{ {
log_error("Error: can not start Listener thread."); log_error("Error: can not start Listener thread.");
stop_all_threads(); stop_all_threads();
......
...@@ -39,6 +39,10 @@ class Manager ...@@ -39,6 +39,10 @@ class Manager
static Thread_registry *get_thread_registry() { return p_thread_registry; } static Thread_registry *get_thread_registry() { return p_thread_registry; }
static User_map *get_user_map() { return p_user_map; } static User_map *get_user_map() { return p_user_map; }
#ifndef __WIN__
static bool is_linux_threads() { return linux_threads; }
#endif // __WIN__
private: private:
static void stop_all_threads(); static void stop_all_threads();
...@@ -47,6 +51,14 @@ class Manager ...@@ -47,6 +51,14 @@ class Manager
static Instance_map *p_instance_map; static Instance_map *p_instance_map;
static Thread_registry *p_thread_registry; static Thread_registry *p_thread_registry;
static User_map *p_user_map; static User_map *p_user_map;
#ifndef __WIN__
/*
This flag is set if Instance Manager is running on the system using
LinuxThreads.
*/
static bool linux_threads;
#endif // __WIN__
}; };
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_MANAGER_H #endif // INCLUDES_MYSQL_INSTANCE_MANAGER_MANAGER_H
...@@ -71,7 +71,6 @@ static void daemonize(const char *log_file_name); ...@@ -71,7 +71,6 @@ static void daemonize(const char *log_file_name);
static void angel(); static void angel();
static struct passwd *check_user(const char *user); static struct passwd *check_user(const char *user);
static int set_user(const char *user, struct passwd *user_info); static int set_user(const char *user, struct passwd *user_info);
static bool check_if_linuxthreads();
#endif #endif
...@@ -111,9 +110,6 @@ int main(int argc, char *argv[]) ...@@ -111,9 +110,6 @@ int main(int argc, char *argv[])
} }
} }
if (check_if_linuxthreads())
goto main_end; /* out of resources */
if (Options::Daemon::run_as_service) if (Options::Daemon::run_as_service)
{ {
/* forks, and returns only in child */ /* forks, and returns only in child */
...@@ -395,28 +391,4 @@ static void angel() ...@@ -395,28 +391,4 @@ static void angel()
exit(0); exit(0);
} }
} }
extern "C" {
static void *check_if_linuxthreads_thread_func(void *arg)
{
pid_t main_pid= *(pid_t*) arg;
linuxthreads= getpid() != main_pid;
return NULL;
}
} /* extern "C" */
static bool check_if_linuxthreads()
{
pid_t pid= getpid();
pthread_t thread_id;
int rc;
rc= pthread_create(&thread_id, NULL, check_if_linuxthreads_thread_func,
(void*) &pid);
if (rc == 0)
rc= pthread_join(thread_id, NULL);
return test(rc);
}
#endif #endif
...@@ -22,14 +22,6 @@ ...@@ -22,14 +22,6 @@
#include "log.h" #include "log.h"
#ifndef __WIN__
/*
This flag is set if mysqlmanager has detected that it is running on the
system using LinuxThreads
*/
bool linuxthreads;
#endif
/* /*
The following string must be less then 80 characters, as The following string must be less then 80 characters, as
mysql_connection.cc relies on it mysql_connection.cc relies on it
......
...@@ -50,14 +50,6 @@ const int MAX_VERSION_LENGTH= 160; ...@@ -50,14 +50,6 @@ const int MAX_VERSION_LENGTH= 160;
const int MAX_INSTANCE_NAME_SIZE= FN_REFLEN; const int MAX_INSTANCE_NAME_SIZE= FN_REFLEN;
#ifndef __WIN__
/*
This flag is set if mysqlmanager has detected that it is running on the
system using LinuxThreads
*/
extern bool linuxthreads;
#endif
extern const LEX_STRING mysqlmanager_version; extern const LEX_STRING mysqlmanager_version;
/* MySQL client-server protocol version: substituted from configure */ /* MySQL client-server protocol version: substituted from configure */
......
...@@ -25,8 +25,6 @@ ...@@ -25,8 +25,6 @@
#include <signal.h> #include <signal.h>
#include "log.h"
#ifndef __WIN__ #ifndef __WIN__
/* Kick-off signal handler */ /* Kick-off signal handler */
...@@ -87,8 +85,8 @@ Thread_registry::~Thread_registry() ...@@ -87,8 +85,8 @@ Thread_registry::~Thread_registry()
void Thread_registry::register_thread(Thread_info *info, void Thread_registry::register_thread(Thread_info *info,
bool send_signal_on_shutdown) bool send_signal_on_shutdown)
{ {
log_info("Thread_registry: registering thread %d...", DBUG_PRINT("info", ("Thread_registry: registering thread %d...",
(int) info->thread_id); (int) info->thread_id));
info->init(send_signal_on_shutdown); info->init(send_signal_on_shutdown);
...@@ -118,8 +116,8 @@ void Thread_registry::register_thread(Thread_info *info, ...@@ -118,8 +116,8 @@ void Thread_registry::register_thread(Thread_info *info,
void Thread_registry::unregister_thread(Thread_info *info) void Thread_registry::unregister_thread(Thread_info *info)
{ {
log_info("Thread_registry: unregistering thread %d...", DBUG_PRINT("info", ("Thread_registry: unregistering thread %d...",
(int) info->thread_id); (int) info->thread_id));
pthread_mutex_lock(&LOCK_thread_registry); pthread_mutex_lock(&LOCK_thread_registry);
info->prev->next= info->next; info->prev->next= info->next;
...@@ -127,7 +125,7 @@ void Thread_registry::unregister_thread(Thread_info *info) ...@@ -127,7 +125,7 @@ void Thread_registry::unregister_thread(Thread_info *info)
if (head.next == &head) if (head.next == &head)
{ {
log_info("Thread_registry: thread registry is empty!"); DBUG_PRINT("info", ("Thread_registry: thread registry is empty!"));
pthread_cond_signal(&COND_thread_registry_is_empty); pthread_cond_signal(&COND_thread_registry_is_empty);
} }
...@@ -231,6 +229,7 @@ void Thread_registry::deliver_shutdown() ...@@ -231,6 +229,7 @@ void Thread_registry::deliver_shutdown()
wait_for_threads_to_unregister(); wait_for_threads_to_unregister();
#ifndef DBUG_OFF
/* /*
Print out threads, that didn't stopped. Thread_registry destructor will Print out threads, that didn't stopped. Thread_registry destructor will
probably abort the program if there is still any alive thread. probably abort the program if there is still any alive thread.
...@@ -238,15 +237,16 @@ void Thread_registry::deliver_shutdown() ...@@ -238,15 +237,16 @@ void Thread_registry::deliver_shutdown()
if (head.next != &head) if (head.next != &head)
{ {
log_info("Thread_registry: non-stopped threads:"); DBUG_PRINT("info", ("Thread_registry: non-stopped threads:"));
for (Thread_info *info= head.next; info != &head; info= info->next) for (Thread_info *info= head.next; info != &head; info= info->next)
log_info(" - %ld", (long int) info->thread_id); DBUG_PRINT("info", (" - %lu", (unsigned long) info->thread_id));
} }
else else
{ {
log_info("Thread_registry: all threads stopped."); DBUG_PRINT("info", ("Thread_registry: all threads stopped."));
} }
#endif // DBUG_OFF
pthread_mutex_unlock(&LOCK_thread_registry); pthread_mutex_unlock(&LOCK_thread_registry);
} }
...@@ -278,13 +278,13 @@ void Thread_registry::wait_for_threads_to_unregister() ...@@ -278,13 +278,13 @@ void Thread_registry::wait_for_threads_to_unregister()
set_timespec(shutdown_time, 1); set_timespec(shutdown_time, 1);
log_info("Thread_registry: joining threads..."); DBUG_PRINT("info", ("Thread_registry: joining threads..."));
while (true) while (true)
{ {
if (head.next == &head) if (head.next == &head)
{ {
log_info("Thread_registry: emptied."); DBUG_PRINT("info", ("Thread_registry: emptied."));
return; return;
} }
...@@ -294,7 +294,7 @@ void Thread_registry::wait_for_threads_to_unregister() ...@@ -294,7 +294,7 @@ void Thread_registry::wait_for_threads_to_unregister()
if (error == ETIMEDOUT || error == ETIME) if (error == ETIMEDOUT || error == ETIME)
{ {
log_info("Thread_registry: threads shutdown timed out."); DBUG_PRINT("info", ("Thread_registry: threads shutdown timed out."));
return; return;
} }
} }
...@@ -362,17 +362,33 @@ void *Thread::thread_func(void *arg) ...@@ -362,17 +362,33 @@ void *Thread::thread_func(void *arg)
} }
bool Thread::start_detached() bool Thread::start(enum_thread_type thread_type)
{ {
pthread_t thd_id;
pthread_attr_t attr; pthread_attr_t attr;
int rc; int rc;
pthread_attr_init(&attr); pthread_attr_init(&attr);
if (thread_type == DETACHED)
{
detached = TRUE;
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
rc= set_stacksize_and_create_thread(&thd_id, &attr, }
Thread::thread_func, this); else
{
detached = FALSE;
}
rc= set_stacksize_and_create_thread(&id, &attr, Thread::thread_func, this);
pthread_attr_destroy(&attr); pthread_attr_destroy(&attr);
return rc != 0; return rc != 0;
} }
bool Thread::join()
{
DBUG_ASSERT(!detached);
return pthread_join(id, NULL) != 0;
}
...@@ -86,17 +86,42 @@ class Thread_info ...@@ -86,17 +86,42 @@ class Thread_info
class Thread class Thread
{ {
public: public:
Thread() {} enum enum_thread_type
bool start_detached(); {
DETACHED,
JOINABLE
};
public:
Thread()
{ }
public:
inline bool is_detached() const;
bool start(enum_thread_type thread_type = JOINABLE);
bool join();
protected: protected:
virtual void run()= 0; virtual void run()= 0;
virtual ~Thread(); virtual ~Thread();
private:
pthread_t id;
bool detached;
private: private:
static void *thread_func(void *arg); static void *thread_func(void *arg);
private:
Thread(const Thread & /* rhs */); /* not implemented */ Thread(const Thread & /* rhs */); /* not implemented */
Thread &operator=(const Thread & /* rhs */); /* not implemented */ Thread &operator=(const Thread & /* rhs */); /* not implemented */
}; };
inline bool Thread::is_detached() const
{
return detached;
}
/** /**
Thread_registry - contains handles for each worker thread to deliver Thread_registry - contains handles for each worker thread to deliver
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment