Commit 75a3c99c authored by kostja@oak.local's avatar kostja@oak.local

First implementation for signal handling and multi-threading:

safe shutdown and signal deliverence to all threads in 
the manager process
parent 36ce7d5f
......@@ -15,8 +15,10 @@ liboptions_a_SOURCES= otpions.h options.cc
bin_PROGRAMS = mysqlmanager
#AM_CXXFLAGS= -Wformat=2 -W -Wall -Wformat-security
mysqlmanager_SOURCES= mysqlmanager.cc manager.h manager.cc log.h log.cc
mysqlmanager_SOURCES= mysqlmanager.cc manager.h manager.cc log.h log.cc \
listener.h listener.cc \
thread_repository.h thread_repository.cc
mysqlmanager_LDADD= liboptions.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a \
......
#include "listener.h"
#include "thread_repository.h"
#include "log.h"
C_MODE_START
pthread_handler_decl(listener, arg)
{
Thread_info info(pthread_self());
Thread_repository &thread_repository=
((Listener_thread_args *) arg)->thread_repository;
thread_repository.register_thread(&info);
while (true)
{
log_info("listener is alive");
sleep(2);
if (thread_repository.is_shutdown())
break;
}
log_info("listener(): shutdown requested, exiting...");
thread_repository.unregister_thread(&info);
return 0;
}
C_MODE_END
#if 0
while (true)
{
}
/*
Dummy manager implementation: listens on a UNIX socket and
starts echo server in a dedicated thread for each accepted connection.
Enough to test startup/shutdown/options/logging of the instance manager.
*/
int fd= socket(AF_UNIX, SOCK_STREAM, 0);
if (!fd)
die("socket(): failed");
struct sockaddr_un address;
bzero(&address, sizeof(address));
address.sun_family= AF_UNIX;
strcpy(address.sun_path, socket_path);
int opt= 1;
if (unlink(socket_path) ||
bind(fd, (struct sockaddr *) &address, sizeof(address)) ||
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)))
die("unlink | bind | setsockopt failed");
if (listen(fd, 5))
die("listen() failed");
int client_fd;
while ((client_fd= accept(fd, 0, 0)) != -1);
{
printf("accepted\n");
const char *message= "\n10hel";
send(client_fd, message, strlen(message), 0);
int sleep_seconds= argc > 1 && atoi(argv[1]) ? atoi(argv[1]) : 1;
printf("sleeping %d seconds\n", sleep_seconds);
sleep(sleep_seconds);
close(client_fd);
}
printf("accept(): failed\n");
close(fd);
#endif
#ifndef INCLUDES_MYSQL_INSTANCE_MANAGER_LISTENER_H
#define INCLUDES_MYSQL_INSTANCE_MANAGER_LISTENER_H
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifdef __GNUC__
#pragma interface
#endif
#include <my_global.h>
#include <my_pthread.h>
C_MODE_START
pthread_handler_decl(listener, arg);
C_MODE_END
class Thread_repository;
struct Listener_thread_args
{
Thread_repository &thread_repository;
const char *socket_file_name;
Listener_thread_args(Thread_repository &thread_repository_arg,
const char *socket_file_name_arg) :
thread_repository(thread_repository_arg),
socket_file_name(socket_file_name_arg) {}
};
#endif
......@@ -73,7 +73,7 @@ void print_error(const char *format, ...)
void log_init();
/* initialize logs for daemon application */
/* print information to the error log and eixt(1) */
void die(const char *format, ...)
#ifdef __GNUC__
......
......@@ -14,57 +14,52 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "manager.h"
#include <my_global.h>
#include <signal.h>
#include "thread_repository.h"
#include "listener.h"
#include "log.h"
//#include "mysql_connection.h"
void manager(const char *socket_file_name)
{
while (true)
{
log_info("manager is alive");
sleep(2);
}
#if 0
/*
Dummy manager implementation: listens on a UNIX socket and
starts echo server in a dedicated thread for each accepted connection.
Enough to test startup/shutdown/options/logging of the instance manager.
*/
Thread_repository thread_repository;
Listener_thread_args listener_args(thread_repository, socket_file_name);
int fd= socket(AF_UNIX, SOCK_STREAM, 0);
if (!fd)
die("socket(): failed");
/* write pid file */
struct sockaddr_un address;
bzero(&address, sizeof(address));
address.sun_family= AF_UNIX;
strcpy(address.sun_path, socket_path);
int opt= 1;
/* block signals */
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGINT);
sigaddset(&mask, SIGTERM);
sigaddset(&mask, SIGHUP);
if (unlink(socket_path) ||
bind(fd, (struct sockaddr *) &address, sizeof(address)) ||
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)))
die("unlink | bind | setsockopt failed");
if (listen(fd, 5))
die("listen() failed");
int client_fd;
while ((client_fd= accept(fd, 0, 0)) != -1);
/* all new threads will inherite this signal mask */
pthread_sigmask(SIG_BLOCK, &mask, NULL);
{
printf("accepted\n");
const char *message= "\n10hel";
send(client_fd, message, strlen(message), 0);
/* create the listener */
pthread_t listener_thd_id;
pthread_attr_t listener_thd_attr;
int sleep_seconds= argc > 1 && atoi(argv[1]) ? atoi(argv[1]) : 1;
printf("sleeping %d seconds\n", sleep_seconds);
sleep(sleep_seconds);
close(client_fd);
if (pthread_attr_init(&listener_thd_attr))
die("manager(): pthread_attr_init(listener) failed");
if (pthread_attr_setdetachstate(&listener_thd_attr,
PTHREAD_CREATE_DETACHED))
die("manager(): pthread_attr_setdetachstate(listener) failed");
if (pthread_create(&listener_thd_id, &listener_thd_attr, listener,
&listener_args))
die("manager(): pthread_create(listener) failed");
}
printf("accept(): failed\n");
close(fd);
#endif
/*
To work nicely with LinuxThreads, the signal thread is the first thread
in the process.
*/
int signo;
sigwait(&mask, &signo);
thread_repository.deliver_shutdown();
/* don't pthread_exit to kill all threads who did not shut down in time */
}
#ifndef INCLUDES_MYSQL_INSTANCE_MANAGER_MANAGER_H
#define INCLUDES_MYSQL_INSTANCE_MANAGER_MANAGER_H
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
This program is free software; you can redistribute it and/or modify
......@@ -15,3 +17,5 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
void manager(const char *socket_file_name);
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_MANAGER_H
......@@ -106,13 +106,12 @@ static void daemonize(const char *log_file_name)
case 0: // child, fork ok
int fd;
/*
Become a session leader: setsid should succeed because child is
Become a session leader: setsid must succeed because child is
guaranteed not to be a process group leader (it belongs to the
process group of the parent.)
The goal is not to have a controlling terminal.
*/
if (setsid() == -1)
die("daemonize(): setsid() failed, %s", strerror(errno));
setsid();
/*
As we now don't have a controlling terminal we will not receive
tty-related signals - no need to ignore them.
......@@ -193,19 +192,21 @@ static void angel(const Options &options)
sigset_t zeromask; // to sigsuspend in parent
struct sigaction sa_chld, sa_term;
struct sigaction sa_chld_out, sa_term_out, sa_int_out, sa_hup_out;
if (sigemptyset(&zeromask) ||
sigemptyset(&sa_chld.sa_mask) ||
sigemptyset(&sa_term.sa_mask)) // how can it fail?
die("angel(): sigemptyset() failed, %s", strerror(errno));
sigemptyset(&zeromask);
sigemptyset(&sa_chld.sa_mask);
sigemptyset(&sa_term.sa_mask);
sa_chld.sa_handler= reap_child;
sa_chld.sa_flags= SA_NOCLDSTOP;
sa_term.sa_handler= terminate;
sa_term.sa_flags= 0;
if (sigaction(SIGCHLD, &sa_chld, &sa_chld_out) == -1 ||
sigaction(SIGTERM, &sa_term, &sa_term_out) == -1 ||
sigaction(SIGINT, &sa_term, &sa_int_out) == -1 ||
sigaction(SIGHUP, &sa_term, &sa_hup_out) == -1)
die("angel(): sigaction() failed, %s", strerror(errno));
/* sigaction can fail only on wrong arguments */
sigaction(SIGCHLD, &sa_chld, &sa_chld_out);
sigaction(SIGTERM, &sa_term, &sa_term_out);
sigaction(SIGINT, &sa_term, &sa_int_out);
sigaction(SIGHUP, &sa_term, &sa_hup_out);
/* spawn a child */
spawn:
......@@ -218,22 +219,16 @@ static void angel(const Options &options)
restore default actions for signals to let the manager work with
signals as he wishes
*/
if (sigaction(SIGCHLD, &sa_chld_out, 0) == -1 ||
sigaction(SIGTERM, &sa_term_out, 0) == -1 ||
sigaction(SIGINT, &sa_int_out, 0) == -1 ||
sigaction(SIGHUP, &sa_hup_out, 0) == -1)
die("angel(): child failed to restore old signal actions, %s",
strerror(errno));
sigaction(SIGCHLD, &sa_chld_out, 0);
sigaction(SIGTERM, &sa_term_out, 0);
sigaction(SIGINT, &sa_int_out, 0);
sigaction(SIGHUP, &sa_hup_out, 0);
manager(options.socket_file_name);
default: // parent, success
while (child_status == CHILD_OK && is_terminated == 0)
{
errno= 0;
sigsuspend(&zeromask);
if (errno != EINTR)
die("angel(): sigsuspend failed, %s", strerror(errno));
}
if (is_terminated)
log_info("angel got signal %d (%s), exiting",
is_terminated, sys_siglist[is_terminated]);
......
......@@ -14,16 +14,16 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifdef __GNUC__
#pragma implementation
#endif
#include "options.h"
#include <my_global.h>
#include <my_sys.h>
#include <my_getopt.h>
#ifdef __GNUC__
#pragma implementation
#endif
#define QUOTE2(x) #x
#define QUOTE(x) QUOTE2(x)
......@@ -121,7 +121,3 @@ void Options::load(int argc, char **argv)
exit(rc);
}
void init_mysys_library(const char *progname)
{
MY_INIT((char *) progname);
}
......@@ -35,6 +35,4 @@ struct Options
static void load(int argc, char **argv);
};
void init_mysys_library(const char *progname);
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_OPTIONS_H
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifdef __GNUC__
#pragma implementation
#endif
#include "thread_repository.h"
#include <assert.h>
#include <signal.h>
#include "log.h"
/* Kick-off signal handler */
enum { THREAD_KICK_OFF_SIGNAL= SIGUSR2 };
static void handle_signal(int __attribute__((unused)) sig_no)
{
}
/*
TODO: think about moving signal information (now it's shutdown_in_progress)
to Thread_info. It will reduce contention and allow signal deliverence to
a particular thread, not to the whole worker crew
*/
Thread_repository::Thread_repository() :
shutdown_in_progress(false)
{
pthread_mutex_init(&LOCK_thread_repository, 0);
pthread_cond_init(&COND_thread_repository_is_empty, 0);
/* head is used by-value to simplify nodes inserting */
head.next= head.prev= &head;
}
Thread_repository::~Thread_repository()
{
/* Check that no one uses the repository. */
pthread_mutex_lock(&LOCK_thread_repository);
/* All threads must unregister */
DBUG_ASSERT(head.next == &head);
pthread_mutex_unlock(&LOCK_thread_repository);
pthread_cond_destroy(&COND_thread_repository_is_empty);
pthread_mutex_destroy(&LOCK_thread_repository);
}
/*
Set signal handler for kick-off thread, and insert a thread info to the
repository. New node is appended to the end of the list; head.prev always
points to the last node.
*/
void Thread_repository::register_thread(Thread_info *info)
{
struct sigaction sa;
sa.sa_handler= handle_signal;
sa.sa_flags= 0;
sigemptyset(&sa.sa_mask);
sigaction(THREAD_KICK_OFF_SIGNAL, &sa, 0);
info->current_cond= 0;
pthread_mutex_lock(&LOCK_thread_repository);
info->next= &head;
info->prev= head.prev;
head.prev->next= info;
head.prev= info;
pthread_mutex_unlock(&LOCK_thread_repository);
}
/*
Unregister a thread from the repository and free Thread_info structure.
Every registered thread must unregister. Unregistering should be the last
thing a thread is doing, otherwise it could have no time to finalize.
*/
void Thread_repository::unregister_thread(Thread_info *info)
{
pthread_mutex_lock(&LOCK_thread_repository);
info->prev->next= info->next;
info->next->prev= info->prev;
if (head.next == &head)
pthread_cond_signal(&COND_thread_repository_is_empty);
pthread_mutex_unlock(&LOCK_thread_repository);
}
/*
Check whether shutdown is in progress, and if yes, return immidiately.
Else set info->current_cond and call pthread_cond_wait. When
pthread_cond_wait returns, unregister current cond and check the shutdown
status again.
RETURN VALUE
return value from pthread_cond_wait
*/
int Thread_repository::cond_wait(Thread_info *info, pthread_cond_t *cond,
pthread_mutex_t *mutex, bool *is_shutdown)
{
pthread_mutex_lock(&LOCK_thread_repository);
*is_shutdown= shutdown_in_progress;
if (*is_shutdown)
{
pthread_mutex_unlock(&LOCK_thread_repository);
return 0;
}
info->current_cond= cond;
pthread_mutex_unlock(&LOCK_thread_repository);
/* sic: race condition here, cond can be signaled in deliver_shutdown */
int rc= pthread_cond_wait(cond, mutex);
pthread_mutex_lock(&LOCK_thread_repository);
info->current_cond= 0;
*is_shutdown= shutdown_in_progress;
pthread_mutex_unlock(&LOCK_thread_repository);
return rc;
}
/*
Deliver shutdown message to the workers crew.
As it's impossible to avoid all race conditions, we signal latecomers
again.
*/
void Thread_repository::deliver_shutdown()
{
struct timespec shutdown_time;
set_timespec(shutdown_time, 1);
pthread_mutex_lock(&LOCK_thread_repository);
shutdown_in_progress= true;
for (Thread_info *info= head.next; info != &head; info= info->next)
{
pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL);
/*
sic: race condition here, the thread may not yet fall into
pthread_cond_wait.
*/
if (info->current_cond)
pthread_cond_signal(info->current_cond);
}
while (pthread_cond_timedwait(&COND_thread_repository_is_empty,
&LOCK_thread_repository,
&shutdown_time) != ETIMEDOUT &&
head.next != &head)
;
/*
If previous signals did not reach some threads, they must be sleeping
in pthread_cond_wait or a blocking syscall. Wake them up:
every thread shall check signal variables after each syscall/cond_wait,
so this time everybody should be informed (presumably each worker can
get CPU during shutdown_time.)
*/
for (Thread_info *info= head.next; info != &head; info= info->next)
{
pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL);
if (info->current_cond)
pthread_cond_signal(info->current_cond);
}
pthread_mutex_unlock(&LOCK_thread_repository);
}
#ifndef INCLUDES_MYSQL_INSTANCE_MANAGER_THREAD_REPOSITORY_HH
#define INCLUDES_MYSQL_INSTANCE_MANAGER_THREAD_REPOSITORY_HH
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
A multi-threaded application shall nicely work with signals.
This means it shall, first of all, shut down nicely on ``quit'' signals:
stop all running threads, cleanup and exit.
Note, that a thread can't be shut down nicely if it doesn't want to be.
That's why to perform clean shutdown, all threads consituting a process
must observe certain rules. Here we use the rules, described in Butenhof
book 'Programming with POSIX threads', namely:
- all user signals are handled in 'signal thread' in synchronous manner
(by means of sigwait). To guarantee that the signal thread is the only who
can receive user signals, all threads block them, and signal thread is
the only who calls sigwait() with an apporpriate sigmask.
To propogate a signal to the workers the signal thread sets
a variable, corresponding to the signal. Additionally the signal thread
sends each worker an internal signal (by means of pthread_kill) to kick it
out from possible blocking syscall, and possibly pthread_cond_signal if
some thread is blocked in pthread_cond_[timed]wait.
- a worker handles only internal 'kick' signal (the handler does nothing).
In case when a syscall returns 'EINTR' the worker checks all
signal-related variables and behaves accordingly.
Also these variables shall be checked from time to time in long
CPU-bounded operations, and before/after pthread_cond_wait. (It's supposed
that a worker thread either waits in a syscall/conditional variable, or
computes something.)
- to guarantee signal deliverence, there should be some kind of feedback,
e. g. all workers shall account in the signal thread Thread Repository and
unregister from it on exit.
Configuration reload (on SIGHUP) and thread timeouts/alarms can be handled
in manner, similar to ``quit'' signals.
*/
#ifdef __GNUC__
#pragma interface
#endif
#include <my_global.h>
#include <my_pthread.h>
/*
Thread_info - repository entry for each worker thread
All entries comprise double-linked list like:
0 -- entry -- entry -- entry - 0
Double-linked list is used to unregister threads easy.
*/
class Thread_info
{
pthread_cond_t *current_cond;
Thread_info *prev, *next;
pthread_t thread_id;
Thread_info() {}
friend class Thread_repository;
public:
Thread_info(pthread_t thread_id_arg) : thread_id(thread_id_arg) {}
};
/*
Thread_repository - contains handles for each worker thread to deliver
signal information to workers.
*/
class Thread_repository
{
public:
Thread_repository();
~Thread_repository();
void register_thread(Thread_info *info);
void unregister_thread(Thread_info *info);
void deliver_shutdown();
inline bool is_shutdown();
int cond_wait(Thread_info *info, pthread_cond_t *cond,
pthread_mutex_t *mutex, bool *is_shutdown);
private:
Thread_info head;
bool shutdown_in_progress;
pthread_mutex_t LOCK_thread_repository;
pthread_cond_t COND_thread_repository_is_empty;
};
inline bool Thread_repository::is_shutdown()
{
pthread_mutex_lock(&LOCK_thread_repository);
bool res= shutdown_in_progress;
pthread_mutex_unlock(&LOCK_thread_repository);
return res;
}
#endif // INCLUDES_MYSQL_INSTANCE_MANAGER_THREAD_REPOSITORY_HH
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment