Commit 6e07cd0b authored by Alain Takoudjou's avatar Alain Takoudjou

slapos.promises: set more options, PromiseWrapper terminate children when exiting

parent 9a9d489e
...@@ -32,11 +32,13 @@ import os ...@@ -32,11 +32,13 @@ import os
import sys import sys
import logging import logging
import time import time
import signal
import importlib import importlib
import re import re
import traceback import traceback
import psutil import psutil
import subprocess import subprocess
import functools
import slapos.slap import slapos.slap
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from multiprocessing import Process, Queue as MQueue from multiprocessing import Process, Queue as MQueue
...@@ -46,6 +48,8 @@ from slapos.grid.promise import interface ...@@ -46,6 +48,8 @@ from slapos.grid.promise import interface
from slapos.grid.utils import dropPrivileges from slapos.grid.utils import dropPrivileges
from zope import interface as zope_interface from zope import interface as zope_interface
PROMISE_STATE_FOLDER_NAME = '.promises'
class PromiseError(Exception): class PromiseError(Exception):
pass pass
...@@ -86,10 +90,10 @@ class AnomalyResult(BaseResult): ...@@ -86,10 +90,10 @@ class AnomalyResult(BaseResult):
class PromiseQueueResult(object): class PromiseQueueResult(object):
def __init__(self, path, name, title, result, execution_time=0): def __init__(self, path, name, title, item, execution_time=0):
self.path = path self.path = path
self.name = name self.name = name
self.item = result self.item = item
self.title = title self.title = title
self.execution_time = execution_time self.execution_time = execution_time
...@@ -104,7 +108,6 @@ class GenericPromise(object): ...@@ -104,7 +108,6 @@ class GenericPromise(object):
self.__log_folder = self.__config.pop('log-folder', None) self.__log_folder = self.__config.pop('log-folder', None)
self.__partition_folder = self.__config.pop('partition-folder', None) self.__partition_folder = self.__config.pop('partition-folder', None)
self.__check_anomaly = self.__config.pop('check-anomaly', False) self.__check_anomaly = self.__config.pop('check-anomaly', False)
self.__title = self.__config.pop('title', None)
self.__periodicity = self.__config.pop('periodicity', 2) self.__periodicity = self.__config.pop('periodicity', 2)
self.__debug = self.__config.pop('debug', True) self.__debug = self.__config.pop('debug', True)
self.__name = self.__config.pop('name', None) self.__name = self.__config.pop('name', None)
...@@ -128,7 +131,7 @@ class GenericPromise(object): ...@@ -128,7 +131,7 @@ class GenericPromise(object):
else: else:
self.__log_file = os.path.join( self.__log_file = os.path.join(
self.__log_folder, self.__log_folder,
'%s.log' % os.path.splitext(self.__name)[0] '%s.log' % self.__title
) )
logger_handler = logging.FileHandler(self.__log_file) logger_handler = logging.FileHandler(self.__log_file)
...@@ -144,16 +147,21 @@ class GenericPromise(object): ...@@ -144,16 +147,21 @@ class GenericPromise(object):
raise ValueError("Queue object is not set in configuration") raise ValueError("Queue object is not set in configuration")
if self.__name is None: if self.__name is None:
raise ValueError("Monitor name is not set in configuration") raise ValueError("Monitor name is not set in configuration")
self.__title = os.path.splitext(self.__name)[0]
if self.__promise_path is None: if self.__promise_path is None:
raise ValueError("Promise path is not set in configuration") raise ValueError("Promise path is not set in configuration")
if self.__title is None:
raise ValueError("Monitor title is not set in configuration")
if self.__partition_folder is None: if self.__partition_folder is None:
raise ValueError("Monitor partition folder is not set in configuration") raise ValueError("Monitor partition folder is not set in configuration")
def getConfig(self): def getConfig(self):
return self.__config return self.__config
def getTitle(self):
return self.__title
def getName(self):
return self.__name
def getLogFile(self): def getLogFile(self):
return self.__log_file return self.__log_file
...@@ -166,7 +174,7 @@ class GenericPromise(object): ...@@ -166,7 +174,7 @@ class GenericPromise(object):
def getPromiseFile(self): def getPromiseFile(self):
return self.__promise_path return self.__promise_path
def setPeriodicity(self, minute=0): def setPeriodicity(self, minute):
if minute <= 0: if minute <= 0:
raise ValueError("Cannot set promise periodicity to a value less than 1") raise ValueError("Cannot set promise periodicity to a value less than 1")
self.__periodicity = minute self.__periodicity = minute
...@@ -283,7 +291,7 @@ class GenericPromise(object): ...@@ -283,7 +291,7 @@ class GenericPromise(object):
def defaultTest(self, latest_minute=2, failure_amount=1, exact_match=True, def defaultTest(self, latest_minute=2, failure_amount=1, exact_match=True,
is_anomaly=False): is_anomaly=False):
""" """
Fail if the latest `failure_amount` messages contain failure. Test if the latest messages contain `failure_amount` failures.
@param latest_minute: test the result from now to the latest X minutes in @param latest_minute: test the result from now to the latest X minutes in
the past. the past.
...@@ -374,7 +382,7 @@ class GenericPromise(object): ...@@ -374,7 +382,7 @@ class GenericPromise(object):
path=self.__promise_path, path=self.__promise_path,
name=self.__name, name=self.__name,
title=self.__title, title=self.__title,
result=result item=result
), True) ), True)
...@@ -386,11 +394,18 @@ class PromiseWrapper(GenericPromise): ...@@ -386,11 +394,18 @@ class PromiseWrapper(GenericPromise):
zope_interface.implements(interface.IPromise) zope_interface.implements(interface.IPromise)
def __init__(self, config): def __init__(self, config):
config.update({
"title": config.get('name')
})
GenericPromise.__init__(self, config) GenericPromise.__init__(self, config)
self.setPeriodicity(minute=2) # which periodicity to use ? self.setPeriodicity(minute=2)
@staticmethod
def terminate(name, logger, process, signum, frame):
if signum in [signal.SIGINT, signal.SIGTERM] and process:
logger.info("Terminating promise process %r" % name)
try:
# make sure we kill the process on timeout
process.terminate()
except Exception, e:
logger.error(traceback.format_exc())
def sense(self): def sense(self):
promise_process = subprocess.Popen( promise_process = subprocess.Popen(
...@@ -399,6 +414,11 @@ class PromiseWrapper(GenericPromise): ...@@ -399,6 +414,11 @@ class PromiseWrapper(GenericPromise):
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
cwd=self.getPartitionFolder() cwd=self.getPartitionFolder()
) )
handler = functools.partial(self.terminate, self.getName(), self.logger,
promise_process)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
output, error = promise_process.communicate() output, error = promise_process.communicate()
message = output or "" message = output or ""
if error: if error:
...@@ -434,7 +454,8 @@ class PromiseRunner(Process): ...@@ -434,7 +454,8 @@ class PromiseRunner(Process):
self.uid = uid self.uid = uid
self.gid = gid self.gid = gid
self.cwd = cwd self.cwd = cwd
# self.daemon = False # set this to True, so children process will be killed when exit
self.daemon = True
def run(self): def run(self):
if self.uid and self.gid: if self.uid and self.gid:
...@@ -485,10 +506,14 @@ class PromiseLauncher(object): ...@@ -485,10 +506,14 @@ class PromiseLauncher(object):
User UID User UID
gid gid
User GID User GID
profile debug
If True, show Promise consumption and execution time information If True, show Promise consumption and execution time information, etc
run-only-promise-list: run-only-promise-list
A list of promise from plugins directory that will be executed A list of promise from plugins directory that will be executed
slapgrid-mode
Set to True if promise launcher is executed by slapgrid.
force
Set to True if force run promises without check their periodicity
""" """
self.save_method = save_method self.save_method = save_method
...@@ -498,7 +523,7 @@ class PromiseLauncher(object): ...@@ -498,7 +523,7 @@ class PromiseLauncher(object):
'old-promise-folder': None, 'old-promise-folder': None,
'log-folder': None, 'log-folder': None,
'partition-folder': None, 'partition-folder': None,
'profile': False, 'debug': False,
'uid': None, 'uid': None,
'gid': None, 'gid': None,
'master-url': None, 'master-url': None,
...@@ -506,9 +531,10 @@ class PromiseLauncher(object): ...@@ -506,9 +531,10 @@ class PromiseLauncher(object):
'partition-key': None, 'partition-key': None,
'partition-id': None, 'partition-id': None,
'computer-id': None, 'computer-id': None,
'debug': True,
'check-anomaly': False, 'check-anomaly': False,
'run-only-promise-list': None 'force': False,
'run-only-promise-list': None,
'slapgrid-mode': False
} }
if config is not None: if config is not None:
self.__config.update(config) self.__config.update(config)
...@@ -523,7 +549,7 @@ class PromiseLauncher(object): ...@@ -523,7 +549,7 @@ class PromiseLauncher(object):
if logger is None: if logger is None:
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG) self.logger.setLevel(logging.DEBUG if self.debug else logging.INFO)
handler = logging.StreamHandler() handler = logging.StreamHandler()
handler.setFormatter( handler.setFormatter(
logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
...@@ -534,9 +560,14 @@ class PromiseLauncher(object): ...@@ -534,9 +560,14 @@ class PromiseLauncher(object):
self.queue_result = MQueue() self.queue_result = MQueue()
self.bang_called = False self.bang_called = False
self.run_status_folder = os.path.join(self.partition_folder, '.promises') self.run_status_folder = os.path.join(
self.partition_folder,
PROMISE_STATE_FOLDER_NAME
)
if not os.path.exists(self.run_status_folder): if not os.path.exists(self.run_status_folder):
os.mkdir(self.run_status_folder) os.mkdir(self.run_status_folder)
if self.slapgrid_mode:
self.force = True
def _isPromisePeriodicityMatch(self, promise_name, periodicity): def _isPromisePeriodicityMatch(self, promise_name, periodicity):
""" """
...@@ -555,7 +586,8 @@ class PromiseLauncher(object): ...@@ -555,7 +586,8 @@ class PromiseLauncher(object):
self.logger.debug("Skip Promise %r. periodicity=%s, time_diff=%s" % ( self.logger.debug("Skip Promise %r. periodicity=%s, time_diff=%s" % (
promise_name, periodicity, current_timediff)) promise_name, periodicity, current_timediff))
except ValueError: except ValueError:
return False # if the file is broken, run the promise and regenerated it
return True
else: else:
return False return False
return True return True
...@@ -618,7 +650,7 @@ class PromiseLauncher(object): ...@@ -618,7 +650,7 @@ class PromiseLauncher(object):
self.logger.warning("Promise %s skipped." % promise_name) self.logger.warning("Promise %s skipped." % promise_name)
return return
if not self._isPromisePeriodicityMatch(promise_name, if not self.force and not self._isPromisePeriodicityMatch(promise_name,
promise_instance.getPeriodicity()): promise_instance.getPeriodicity()):
return return
self.logger.info("Checking promise %s..." % promise_name) self.logger.info("Checking promise %s..." % promise_name)
...@@ -633,12 +665,13 @@ class PromiseLauncher(object): ...@@ -633,12 +665,13 @@ class PromiseLauncher(object):
) )
promise_process.start() promise_process.start()
if not self.slapgrid_mode:
self._setPromiseLatestRunTime(promise_name) self._setPromiseLatestRunTime(promise_name)
sleep_time = 0.1 sleep_time = 0.1
increment_limit = int(self.promise_timeout / sleep_time) increment_limit = int(self.promise_timeout / sleep_time)
execution_time = self.promise_timeout execution_time = self.promise_timeout
ps_profile = False ps_profile = False
if self.profile: if self.debug:
try: try:
psutil_process = psutil.Process(promise_process.pid) psutil_process = psutil.Process(promise_process.pid)
ps_profile = True ps_profile = True
...@@ -704,7 +737,7 @@ class PromiseLauncher(object): ...@@ -704,7 +737,7 @@ class PromiseLauncher(object):
message=message, message=message,
execution_time=execution_time execution_time=execution_time
)) ))
if self.profile: if self.debug:
self.logger.debug("Finished promise %r in %s second(s)." % ( self.logger.debug("Finished promise %r in %s second(s)." % (
promise_name, execution_time)) promise_name, execution_time))
...@@ -716,7 +749,8 @@ class PromiseLauncher(object): ...@@ -716,7 +749,8 @@ class PromiseLauncher(object):
base_config = { base_config = {
'log-folder': self.log_folder, 'log-folder': self.log_folder,
'partition-folder': self.partition_folder, 'partition-folder': self.partition_folder,
'debug': True, 'debug': self.debug,
'slapgrid-mode': self.slapgrid_mode,
'master-url': self.master_url, 'master-url': self.master_url,
'partition-cert': self.partition_cert, 'partition-cert': self.partition_cert,
'partition-key': self.partition_key, 'partition-key': self.partition_key,
......
...@@ -633,6 +633,7 @@ stderr_logfile_backups=1 ...@@ -633,6 +633,7 @@ stderr_logfile_backups=1
'promise-dir': promise_dir, 'promise-dir': promise_dir,
'old-promise-dir': old_promise_dir, 'old-promise-dir': old_promise_dir,
'promise-timeout': self.promise_timeout, 'promise-timeout': self.promise_timeout,
'slapgrid-mode': True,
'uid': uid, 'uid': uid,
'gid': gid, 'gid': gid,
'partition-folder': instance_path 'partition-folder': instance_path
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment