Commit 63e7ec10 authored by Alain Takoudjou

grid.promise: implement a new promise launcher in slapgrid

parent 4b7a7060
# -*- coding: utf-8 -*-
# vim: set et sts=2:
##############################################################################
#
# Copyright (c) 2018 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import sys
import subprocess
import logging
import time
import importlib
import ConfigParser
import re
import slapos.slap
from multiprocessing import Process, Queue
from datetime import datetime
from threading import Thread
from slapos.grid.promise import interface
class PromiseError(Exception):
  """Raised by the launcher when a promise fails and no save method is set."""
class BaseResult(object):
  """
    Outcome of a promise check.

    @param problem: True when the check detected a problem (it failed).
    @param message: Optional text describing the outcome.
  """

  def __init__(self, problem=False, message=None):
    self.__problem = problem
    self.__message = message

  def hasFailed(self):
    """Return the `problem` flag: True when the check failed."""
    # Callers build results with `problem=True` on failure, so the flag is
    # returned as is; negating it reported every success as a failure.
    return self.__problem

  @property
  def type(self):
    """Name identifying the kind of result."""
    return "BaseResult"

  @property
  def message(self):
    """Message given at construction time (None when not provided)."""
    return self.__message
class TestResult(BaseResult):
  """Result produced when a promise is run in test mode."""

  @property
  def type(self):
    """Name identifying the kind of result."""
    return "TestResult"
class AnomalyResult(BaseResult):
  """Result produced when a promise is run in anomaly-check mode."""

  @property
  def type(self):
    """Name identifying the kind of result."""
    return "AnomalyResult"
class PromiseQueueResult(object):
  """
    Message put in the result queue once a promise has run.

    @param path: Path of the promise file.
    @param name: Name of the promise.
    @param title: Title of the promise.
    @param result: Result object returned by the promise.
    @param execution_time: Time spent running the promise, in seconds.
  """

  def __init__(self, path, name, title, result, execution_time=0):
    self.path = path
    self.name = name
    # `title` used to be accepted and then dropped; keep it with the rest.
    self.title = title
    self.result = result
    self.execution_time = execution_time
class GenericPromise(object):
  """
    Base class of promise plugins.

    `run` calls the `sense` method, then `anomaly` or `test`, which are not
    defined here, and puts a PromiseQueueResult in the configured queue.
  """

  # Five whitespace separated crontab fields; compiled once for all instances.
  _CRONTAB_TIME_REGEX = re.compile(r"\s+".join((
    r"(?P<minute>\*|[0-5]?\d)",
    r"(?P<hour>\*|[01]?\d|2[0-3])",
    r"(?P<day>\*|0?[1-9]|[12]\d|3[01])",
    r"(?P<month>\*|0?[1-9]|1[012])",
    r"(?P<day_of_week>\*|[0-6](\-[0-6])?)",
  )))

  def __init__(self, config):
    """
      @param config: dict of settings. The keys log-folder, partition-folder,
        check-anomaly, title, periodicity, debug, name, path and queue are
        consumed here; every other key is exposed as an instance attribute
        with '-' replaced by '_'.
      @raise ValueError: if a mandatory setting is missing or if the
        periodicity is not a valid cron time format.
    """
    # Work on a copy: the keys are popped below and the caller may reuse
    # the same dict afterwards.
    self.__config = dict(config)
    self.__log_folder = self.__config.pop('log-folder', None)
    self.__partition_folder = self.__config.pop('partition-folder', None)
    self.__check_anomaly = self.__config.pop('check-anomaly', False)
    self.__title = self.__config.pop('title', None)
    self.__periodicity = self.__config.pop('periodicity', None)
    self.__debug = self.__config.pop('debug', True)
    self.__name = self.__config.pop('name', None)
    self.__promise_path = self.__config.pop('path', None)
    self.queue = self.__config.pop('queue', None)

    self._validateConf()

    self.__log_file = os.path.join(
      self.__log_folder,
      '%s.log' % os.path.splitext(self.__name)[0]
    )
    self._configureLogger()
    for key, value in self.__config.items():
      setattr(self, key.replace('-', '_'), value)

  def _configureLogger(self):
    """Set `self.logger` to a logger writing to the promise log file."""
    # One logger per promise: a single module-wide logger would collect a
    # file handler for every promise instantiated and copy each message to
    # all of their log files.
    self.logger = logging.getLogger(
      '%s.%s' % (__name__, os.path.splitext(self.__name)[0]))
    self.logger.propagate = False
    # Drop handlers left by a previous instance of the same promise.
    for stale_handler in list(self.logger.handlers):
      self.logger.removeHandler(stale_handler)
      stale_handler.close()
    self.logger.setLevel(logging.DEBUG if self.__debug else logging.INFO)
    file_handler = logging.FileHandler(self.__log_file)
    file_handler.setFormatter(
      logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    self.logger.addHandler(file_handler)

  def _validateConf(self):
    """
      Check the settings read by the constructor.

      @raise ValueError: if queue, name, path, title, log-folder or
        partition-folder is missing, or if periodicity is set and is not a
        valid cron time format.
    """
    if self.queue is None:
      raise ValueError("Queue object is not set in configuration")
    if self.__name is None:
      raise ValueError("Monitor name is not set in configuration")
    if self.__promise_path is None:
      raise ValueError("Promise path is not set in configuration")
    if self.__title is None:
      raise ValueError("Monitor title is not set in configuration")
    if self.__log_folder is None:
      raise ValueError("Monitor log folder is not set in configuration")
    if self.__partition_folder is None:
      raise ValueError("Monitor partition folder is not set in configuration")
    # Periodicity is optional (None by default): only check a provided value.
    period = self.__periodicity
    if period is not None and self._CRONTAB_TIME_REGEX.match(period) is None:
      raise ValueError("Periodicity %r is not a valid Cron time format." % period)

  def getConfig(self):
    """Return the settings which were not consumed by the constructor."""
    return self.__config

  def getLogFile(self):
    """Return the path of the promise log file."""
    return self.__log_file

  def getLogFolder(self):
    """Return the configured log folder."""
    return self.__log_folder

  def getPartitionFolder(self):
    """Return the configured partition folder."""
    return self.__partition_folder

  def getTitle(self):
    """Return the configured promise title."""
    return self.__title

  def __bang(self, message):
    """
      Call bang if requested

      @param message: Text sent along with the bang.
    """
    # Connection settings left empty are treated like missing ones: there
    # is nothing to connect to.
    required_key_list = ('master-url', 'partition-id', 'computer-id')
    if not all(self.__config.get(key) for key in required_key_list):
      return
    slap = slapos.slap.slap()
    slap.initializeConnection(
      self.__config['master-url'],
      self.__config.get('partition-key'),
      self.__config.get('partition-cert'),
    )
    computer_partition = slap.registerComputerPartition(
      self.__config['computer-id'],
      self.__config['partition-id'],
    )
    computer_partition.bang(message)

  def run(self, can_bang=True):
    """
      Method called to run the Promise
      @param can_bang: Set to True if bang can be called, this parameter should
        be set to False if bang is already called by another promise.
    """
    try:
      self.sense()
    except Exception as e:
      # log the error, the promise is still evaluated below
      self.logger.error(str(e))

    if self.__check_anomaly:
      # run sense, anomaly
      try:
        result = self.anomaly()
        if result is None:
          raise ValueError("Promise anomaly method returned 'None'")
      except Exception as e:
        result = AnomalyResult(problem=True, message=str(e))
      if result.hasFailed() and can_bang:
        self.__bang("Promise %s is failing" % self.__title)
    else:
      # run sense, test
      try:
        result = self.test()
        if result is None:
          raise ValueError("Promise test method returned 'None'")
      except Exception as e:
        result = TestResult(problem=True, message=str(e))

    # send the result of this promise; the put is a blocking one
    self.queue.put(PromiseQueueResult(
      path=self.__promise_path,
      name=self.__name,
      title=self.__title,
      result=result
    ), True)
class PromiseRunner(Process):
  """
    Run a promise in a new Process
  """

  def __init__(self, promise_instance, allow_bang=True):
    """
      Initialise Promise Runner

      @param promise_instance: Promise instance from GenericPromise class
      @param allow_bang: Boolean saying if bang should be called in case of
        anomaly failure.
    """
    # Process.__init__ is an unbound call here: it needs the instance.
    Process.__init__(self)
    self.promise_instance = promise_instance
    self.allow_bang = allow_bang
    self.daemon = False

  def run(self):
    """Run the wrapped promise; executed in the child process."""
    self.promise_instance.run(self.allow_bang)
class PromiseLauncher(object):
  """
    Import the promise plugins found in a folder and run them one after the
    other, each one in its own process.
  """

  def __init__(self, config=None, logger=None, config_file=None,
               save_method=None):
    """
      Promise launcher will run promises

      @param config_file: A file containing configurations
      @param save_method: A method to call for saving promise result. If
        None, the promise execution will raise in case of failure.
      @param logger: Set the logger to use, if None a logger will be configured
        to console.
      @param config: A configuration dict to use. Values send here will
        overwrite configs from `config_file`. Expected values in config are:
        promise-timeout
          Maximum promise execution time before timeout. Default: 20
        partition-folder
          Base path of the partition
        promise-dir
          Promises folder, all promises scripts will be imported from that folder
        check-anomaly
          Ask to check anomaly instead of test. Default: False
        debug
          Configure logging in debug mode. Default: True
        master-url
          SlapOS Master service URL
        partition-cert
          Computer Partition Certificate file
        partition-key
          Computer Partition key file
        partition-id
          Computer Partition ID, ex: slappart13
        computer-id
          Computer ID, ex: COMP-1234
      @raise ValueError: if no promise folder is configured.
    """
    self.__config = {
      'promise-timeout': 20,
      'promise-dir': None,
      'check-anomaly': False,
      'debug': True,
    }
    if config_file is not None:
      self._loadConfigFromFile(config_file)
    if config is not None:
      self.__config.update(config)

    # Values coming from a configuration file are strings: convert the ones
    # used as number or as flag.
    timeout = self.__config['promise-timeout']
    if hasattr(timeout, 'lower'):
      self.__config['promise-timeout'] = float(timeout)
    for flag_name in ('check-anomaly', 'debug'):
      self.__config[flag_name] = self._toBoolean(self.__config[flag_name])

    # Always defined: `_launchPromise` compares it with None.
    self.save_method = save_method

    for key, value in self.__config.items():
      setattr(self, key.replace('-', '_'), value)

    if self.promise_dir is None:
      raise ValueError("Promise folder is missing in configuration!")

    if logger is None:
      logger = logging.getLogger(__name__)
      logger.setLevel(logging.DEBUG)
      # Do not stack one more console handler per launcher instance.
      if not any(type(h) is logging.StreamHandler for h in logger.handlers):
        handler = logging.StreamHandler()
        handler.setFormatter(
          logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(handler)
    self.logger = logger

    # The promise folder is imported from: make sure it has an __init__.py
    init_file = os.path.join(self.promise_dir, '__init__.py')
    if not os.path.exists(init_file):
      with open(init_file, 'w') as f:
        f.write("")
      os.chmod(init_file, 0o644)
    if sys.path[0] != self.promise_dir:
      sys.path[0:0] = [self.promise_dir]

    self.queue_result = Queue()
    self.bang_called = False

  @staticmethod
  def _toBoolean(value):
    """Convert a flag, possibly given as text ('true', '1', ...), to bool."""
    if hasattr(value, 'lower'):
      return value.strip().lower() in ('1', 'true', 'yes', 'on')
    return bool(value)

  def _loadConfigFromFile(self, config_file):
    """Merge the options of the 'promises' section of `config_file`."""
    config = ConfigParser.ConfigParser()
    config.read([config_file])
    if config.has_section('promises'):
      for key, value in config.items('promises'):
        self.__config[key] = value

  def _loadPromiseModule(self, promise_name):
    """
      Load a promise from promises directory.

      @param promise_name: File name of the promise, with or without '.py'.
      @return: the imported module.
      @raise ValueError: if the name does not start with a letter or '_'.
      @raise AttributeError: if the module has no RunPromise class.
      @raise RuntimeError: if RunPromise does not implement IPromise or is
        not a subclass of GenericPromise.
    """
    if re.match(r'[a-zA-Z_]', promise_name) is None:
      message = "Promise plugin name %r is not valid" % promise_name
      self.logger.error(message)
      raise ValueError(message)

    # import_module expects a module name, not a file name
    promise_module = importlib.import_module(
      os.path.splitext(promise_name)[0])
    if not hasattr(promise_module, "RunPromise"):
      raise AttributeError("Class RunPromise not found in %s" % promise_name)
    if not interface.IPromise.implementedBy(promise_module.RunPromise):
      raise RuntimeError("RunPromise class in %s must implements 'IPromise'" \
        " interface. zope_interface.implements(interface.IPromise) is" \
        " missing ?" % promise_name)
    if not issubclass(promise_module.RunPromise, GenericPromise):
      raise RuntimeError("RunPromise class is not a subclass of " \
        "GenericPromise class.")
    return promise_module

  def _getErrorPromiseResult(self, promise_instance, promise_name, message,
                             execution_time=0):
    """Build the failing PromiseQueueResult reported on behalf of a promise."""
    if self.check_anomaly:
      result = AnomalyResult(problem=True, message=message)
    else:
      result = TestResult(problem=True, message=message)
    return PromiseQueueResult(
      result=result,
      path=os.path.join(self.promise_dir, promise_name),
      name=promise_name,
      title=promise_instance.getTitle(),
      execution_time=execution_time
    )

  def _launchPromise(self, promise_module, promise_name, argument_dict):
    """
      Launch the promise and save the result if `self.save_method` is not None
      If no save method is set, raise PromiseError in case of failure

      @param promise_module: Module providing the RunPromise class.
      @param promise_name: File name of the promise.
      @param argument_dict: Configuration given to RunPromise.
    """
    # multiprocessing queues raise the Empty exception of the queue module
    # (named Queue on Python 2); it is not an attribute of the Queue object.
    try:
      from queue import Empty
    except ImportError:
      from Queue import Empty

    self.logger.info("Checking promise %s..." % promise_name)
    promise_instance = promise_module.RunPromise(argument_dict)
    promise_process = PromiseRunner(
      promise_instance,
      allow_bang=not self.bang_called and self.check_anomaly
    )
    promise_process.start()

    sleep_time = 0.1
    increment_limit = int(self.promise_timeout / sleep_time)
    for current_increment in range(0, increment_limit):
      if promise_process.is_alive():
        # still running: wait before polling again, otherwise the loop
        # ends at once and every promise is reported as timed out
        time.sleep(sleep_time)
        continue

      try:
        # blocking get, so that the timeout is honoured
        queue_item = self.queue_result.get(True, 2)
      except Empty:
        # no result found in process result Queue
        if self.save_method is None:
          raise PromiseError("No output returned by the promise")
        queue_item = self._getErrorPromiseResult(
          promise_instance,
          promise_name=promise_name,
          message="No output returned by the promise",
          execution_time=current_increment * sleep_time
        )
        self.save_method(queue_item)
        break

      if queue_item.result.hasFailed():
        if self.save_method is None:
          raise PromiseError(queue_item.result.message)
        elif isinstance(queue_item.result, AnomalyResult):
          # stop to bang as it was called
          self.bang_called = True
      if self.save_method is not None:
        queue_item.execution_time = current_increment * sleep_time
        self.save_method(queue_item)
      break
    else:
      # the promise did not finish in time
      promise_process.terminate()
      message = 'Promise timed out after %s seconds' % self.promise_timeout
      if self.save_method is None:
        raise PromiseError(message)
      else:
        self.save_method(self._getErrorPromiseResult(
          promise_instance,
          promise_name=promise_name,
          message=message,
          execution_time=self.promise_timeout
        ))

  def run(self):
    """Load every promise of the promise folder, then launch them in turn."""
    # Only python files are promises; __init__.py just makes the folder
    # importable.
    promise_name_list = sorted(
      name for name in os.listdir(self.promise_dir)
      if name.endswith('.py') and name != '__init__.py'
    )
    # load all promises so we can catch import errors before launch them
    promise_list = [(name, self._loadPromiseModule(name))
                    for name in promise_name_list]

    base_config = {
      'log-folder': self.__config.get('log-folder', ''),
      'partition-folder': self.__config.get('partition-folder', ''),
      'debug': self.__config.get('debug', True),
      'check-anomaly': self.check_anomaly,
      'master-url': self.__config.get('master-url', ''),
      'partition-cert': self.__config.get('partition-cert', ''),
      'partition-key': self.__config.get('partition-key', ''),
      'partition-id': self.__config.get('partition-id', ''),
      'computer-id': self.__config.get('computer-id', ''),
      'queue': self.queue_result,
    }
    for promise_name, promise_module in promise_list:
      config = {
        'path': os.path.join(self.promise_dir, promise_name),
        'name': promise_name,
        # GenericPromise requires a title: use the file name without '.py'
        'title': os.path.splitext(promise_name)[0],
      }
      config.update(base_config)
      self._launchPromise(promise_module, promise_name, config)
# coding: utf-8
from zope.interface import Interface
class IPromise(Interface):
  """Base Promise interface."""

  # Interface methods are declared without `self`: the signature describes
  # how the method is called on an object providing the interface.

  def __init__(config):
    """
      @param config: Configurations needed to start the promise
    """

  def anomaly():
    """
      Called to detect if there is an anomaly.
      @return AnomalyResult object
    """

  def sense():
    """
      Run the promise code and store the result
        raise error, log error message, ... for failure
    """

  def test():
    """
      Test promise and say if problem is detected or not
      @return TestResult object
    """
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment