thread_repository.cc 5.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifdef __GNUC__
#pragma implementation 
#endif

#include "thread_repository.h"
#include <assert.h>
#include <signal.h>
#include "log.h"


/* Kick-off signal handler */
  
/*
  Signal number used to kick threads: register_thread() installs
  handle_signal() for it, and deliver_shutdown() sends it with pthread_kill()
  to every registered thread.
*/
enum { THREAD_KICK_OFF_SIGNAL= SIGUSR2 };

static void handle_signal(int)
{
  /*
    Deliberately empty: the handler performs no work and ignores the signal
    number. NOTE(review): presumably the only purpose is to have a handler
    installed so that delivery of the kick-off signal interrupts a blocking
    call in the receiving thread -- confirm against the callers.
  */
}


/*
  TODO: think about moving signal information (now it's shutdown_in_progress)
  to Thread_info. It will reduce contention and allow signal delivery to
  a particular thread, not to the whole worker crew.
*/

Thread_repository::Thread_repository()
  :shutdown_in_progress(false)
{
  /*
    'head' is a by-value sentinel of a circular doubly-linked list: an empty
    repository is a head node that points to itself in both directions,
    so inserting a node never needs a special case.
  */
  head.prev= &head;
  head.next= &head;

  /* Default attributes for both synchronization objects. */
  pthread_mutex_init(&LOCK_thread_repository, 0);
  pthread_cond_init(&COND_thread_repository_is_empty, 0);
}


Thread_repository::~Thread_repository()
{
  /*
    Inspect the list under the lock: by now every registered thread must
    have unregistered, i.e. the sentinel must point back to itself.
  */
  pthread_mutex_lock(&LOCK_thread_repository);
  DBUG_ASSERT(&head == head.next);
  pthread_mutex_unlock(&LOCK_thread_repository);

  /* Nobody uses the repository any more; release its primitives. */
  pthread_cond_destroy(&COND_thread_repository_is_empty);
  pthread_mutex_destroy(&LOCK_thread_repository);
}


/*
  Set the handler for the kick-off signal, and insert a thread info into the
  repository. The new node is appended to the end of the list; head.prev
  always points to the last node.
*/

void Thread_repository::register_thread(Thread_info *info)
{
  /*
    Install the no-op handler for the kick-off signal. sa_flags is 0 and
    the mask is empty, so no extra signals are blocked while it runs.
  */
  struct sigaction kick_off_action;
  sigemptyset(&kick_off_action.sa_mask);
  kick_off_action.sa_flags= 0;
  kick_off_action.sa_handler= handle_signal;
  sigaction(THREAD_KICK_OFF_SIGNAL, &kick_off_action, 0);

  /* A freshly registered thread is not waiting on any condition. */
  info->current_cond= 0;

  /* Link the node in right before the sentinel, i.e. at the tail. */
  pthread_mutex_lock(&LOCK_thread_repository);
  info->prev= head.prev;
  info->next= &head;
  info->prev->next= info;
  head.prev= info;
  pthread_mutex_unlock(&LOCK_thread_repository);
}


/*
  Unregister a thread from the repository. The Thread_info structure is only
  unlinked from the list here; this function does not free it.
  Every registered thread must unregister. Unregistering should be the last
  thing a thread does, otherwise it may not be given time to finalize.
*/

void Thread_repository::unregister_thread(Thread_info *info)
{
  pthread_mutex_lock(&LOCK_thread_repository);

  /* Splice the node out by joining its two neighbours to each other. */
  info->next->prev= info->prev;
  info->prev->next= info->next;

  /*
    If that was the last node, wake up whoever waits on
    COND_thread_repository_is_empty (deliver_shutdown does).
  */
  const bool repository_is_empty= (head.next == &head);
  if (repository_is_empty)
    pthread_cond_signal(&COND_thread_repository_is_empty);

  pthread_mutex_unlock(&LOCK_thread_repository);
}


/*
  Check whether shutdown is in progress, and if yes, return immediately.
  Else set info->current_cond and call pthread_cond_wait. When
  pthread_cond_wait returns, unregister current cond and check the shutdown
  status again.
  RETURN VALUE
    0 if shutdown was already in progress (no wait is performed), otherwise
    the return value from pthread_cond_wait
*/

int Thread_repository::cond_wait(Thread_info *info, pthread_cond_t *cond,
                                  pthread_mutex_t *mutex, bool *is_shutdown)
{
  /*
    Under the repository lock: report the shutdown status to the caller and,
    unless shutdown has started, publish the condition we are about to sleep
    on so that deliver_shutdown can signal it.
  */
  pthread_mutex_lock(&LOCK_thread_repository);
  const bool shutdown_seen= shutdown_in_progress;
  *is_shutdown= shutdown_seen;
  if (!shutdown_seen)
    info->current_cond= cond;
  pthread_mutex_unlock(&LOCK_thread_repository);

  /* Shutdown already under way: do not wait at all. */
  if (shutdown_seen)
    return 0;

  /*
    sic: race condition here. The repository lock is already released, so
    deliver_shutdown may signal 'cond' before this thread starts waiting.
  */
  const int wait_result= pthread_cond_wait(cond, mutex);

  /* Awake again: withdraw the condition and re-read the shutdown status. */
  pthread_mutex_lock(&LOCK_thread_repository);
  info->current_cond= 0;
  *is_shutdown= shutdown_in_progress;
  pthread_mutex_unlock(&LOCK_thread_repository);

  return wait_result;
}


/*
  Deliver the shutdown message to the worker crew.
  As it's impossible to avoid all race conditions, we signal latecomers
  again.
*/

void Thread_repository::deliver_shutdown()
{
  struct timespec shutdown_time;
  set_timespec(shutdown_time, 1);
151
  Thread_info *info;
152 153 154

  pthread_mutex_lock(&LOCK_thread_repository);
  shutdown_in_progress= true;
155
  for (info= head.next; info != &head; info= info->next)
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
  {
    pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL);
    /*
      sic: race condition here, the thread may not yet fall into
      pthread_cond_wait.
    */
    if (info->current_cond)
      pthread_cond_signal(info->current_cond);
  }
  while (pthread_cond_timedwait(&COND_thread_repository_is_empty,
                                &LOCK_thread_repository,
                                &shutdown_time) != ETIMEDOUT &&
         head.next != &head)
    ;
  /*
    If previous signals did not reach some threads, they must be sleeping 
    in pthread_cond_wait or a blocking syscall. Wake them up: 
    every thread shall check signal variables after each syscall/cond_wait,
    so this time everybody should be informed (presumably each worker can
    get CPU during shutdown_time.)
  */
177
  for (info= head.next; info != &head; info= info->next)
178 179 180 181 182 183 184 185
  {
    pthread_kill(info->thread_id, THREAD_KICK_OFF_SIGNAL);
    if (info->current_cond)
      pthread_cond_signal(info->current_cond);
  }
  pthread_mutex_unlock(&LOCK_thread_repository);
}