Commit a5ef083a authored by Alain Takoudjou's avatar Alain Takoudjou

check promise periodicity

parent 7dbd8742
...@@ -105,7 +105,7 @@ class GenericPromise(object): ...@@ -105,7 +105,7 @@ class GenericPromise(object):
self.__partition_folder = self.__config.pop('partition-folder', None) self.__partition_folder = self.__config.pop('partition-folder', None)
self.__check_anomaly = self.__config.pop('check-anomaly', False) self.__check_anomaly = self.__config.pop('check-anomaly', False)
self.__title = self.__config.pop('title', None) self.__title = self.__config.pop('title', None)
self.__periodicity = self.__config.pop('periodicity', None) self.__periodicity = self.__config.pop('periodicity', 2)
self.__debug = self.__config.pop('debug', True) self.__debug = self.__config.pop('debug', True)
self.__name = self.__config.pop('name', None) self.__name = self.__config.pop('name', None)
self.__promise_path = self.__config.pop('path', None) self.__promise_path = self.__config.pop('path', None)
...@@ -151,19 +151,6 @@ class GenericPromise(object): ...@@ -151,19 +151,6 @@ class GenericPromise(object):
if self.__partition_folder is None: if self.__partition_folder is None:
raise ValueError("Monitor partition folder is not set in configuration") raise ValueError("Monitor partition folder is not set in configuration")
validate_crontab_time_format_regex = re.compile(\
"{0}\s+{1}\s+{2}\s+{3}\s+{4}".format(\
"(?P<minute>\*|[0-5]?\d)",\
"(?P<hour>\*|[01]?\d|2[0-3])",\
"(?P<day>\*|0?[1-9]|[12]\d|3[01])",\
"(?P<month>\*|0?[1-9]|1[012])",\
"(?P<day_of_week>\*|[0-6](\-[0-6])?)"\
)
)
#if validate_crontab_time_format_regex.match(self.__periodicity) is None:
# raise ValueError("Promise periodicity %r is not a valid Cron time " \
# "format." % self.__periodicity)
def getConfig(self): def getConfig(self):
return self.__config return self.__config
...@@ -179,6 +166,14 @@ class GenericPromise(object): ...@@ -179,6 +166,14 @@ class GenericPromise(object):
def getPromiseFile(self): def getPromiseFile(self):
return self.__promise_path return self.__promise_path
def setPeriodicity(self, minute=0):
if minute <= 0:
raise ValueError("Cannot set promise periodicity to a value less than 1")
self.__periodicity = minute
def getPeriodicity(self):
return self.__periodicity
def __bang(self, message): def __bang(self, message):
""" """
Call bang if requested Call bang if requested
...@@ -285,7 +280,7 @@ class GenericPromise(object): ...@@ -285,7 +280,7 @@ class GenericPromise(object):
line_list.reverse() line_list.reverse()
return line_list return line_list
def defaultTest(self, latest_minute=3, failure_amount=2, exact_match=False, def defaultTest(self, latest_minute=2, failure_amount=1, exact_match=True,
is_anomaly=False): is_anomaly=False):
""" """
Fail if the latest `failure_amount` messages contain failure. Fail if the latest `failure_amount` messages contain failure.
...@@ -293,9 +288,13 @@ class GenericPromise(object): ...@@ -293,9 +288,13 @@ class GenericPromise(object):
@param latest_minute: test the result from now to the latest X minutes in @param latest_minute: test the result from now to the latest X minutes in
the past. the past.
@param failure_amount: fail is this amount of failure is found in result @param failure_amount: fail is this amount of failure is found in result
@param exact_match: bool (True|False). If True, only fail if the number @param exact_match: bool (True|False).
of failure found is equal to `failure_amount`. Else, fail if at least True:
one failure was found. only fail if the number of failure found is equal to
`failure_amount`, exactly `failure_amount` promise result are tested
starting from the most recent .
False:
fail if at least one failure is found.
@param is_anomaly: Say if the result is an AnomalyResult of TestResult @param is_anomaly: Say if the result is an AnomalyResult of TestResult
""" """
...@@ -388,10 +387,10 @@ class PromiseWrapper(GenericPromise): ...@@ -388,10 +387,10 @@ class PromiseWrapper(GenericPromise):
def __init__(self, config): def __init__(self, config):
config.update({ config.update({
"title": config.get('name'), "title": config.get('name')
"periodicity": "*/1 * * * *" # which periodicity to use ?
}) })
GenericPromise.__init__(self, config) GenericPromise.__init__(self, config)
self.setPeriodicity(minute=2) # which periodicity to use ?
def sense(self): def sense(self):
promise_process = subprocess.Popen( promise_process = subprocess.Popen(
...@@ -410,7 +409,7 @@ class PromiseWrapper(GenericPromise): ...@@ -410,7 +409,7 @@ class PromiseWrapper(GenericPromise):
self.logger.info(message.strip()) self.logger.info(message.strip())
def test(self): def test(self):
return self.defaultTest(latest_minute=3, failure_amount=2, is_anomaly=False) return self.defaultTest(latest_minute=4, failure_amount=1, is_anomaly=False)
class PromiseRunner(Process): class PromiseRunner(Process):
...@@ -519,6 +518,8 @@ class PromiseLauncher(object): ...@@ -519,6 +518,8 @@ class PromiseLauncher(object):
if self.promise_folder is None: if self.promise_folder is None:
raise ValueError("Promise folder is missing in configuration!") raise ValueError("Promise folder is missing in configuration!")
if self.partition_folder is None:
raise ValueError("Partition folder is missing in configuration!")
if logger is None: if logger is None:
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
...@@ -533,6 +534,39 @@ class PromiseLauncher(object): ...@@ -533,6 +534,39 @@ class PromiseLauncher(object):
self.queue_result = MQueue() self.queue_result = MQueue()
self.bang_called = False self.bang_called = False
self.run_status_folder = os.path.join(self.partition_folder, '.promises')
if not os.path.exists(self.run_status_folder):
os.mkdir(self.run_status_folder)
def _isPromisePeriodicityMatch(self, promise_name, periodicity):
"""
Return True if promise should be run now, considering given the execution
periodicity in minutes
"""
state_file = os.path.join(self.run_status_folder, promise_name)
if os.path.exists(state_file) and os.stat(state_file).st_size:
with open(state_file) as f:
try:
latest_timestamp = float(f.read())
current_timediff = (time.time() - latest_timestamp) / 60.0
margin_error = 0.15 # 0.15 seconds less is accepted
if current_timediff + margin_error >= periodicity:
return True
self.logger.debug("Skip Promise %r. periodicity=%s, time_diff=%s" % (
promise_name, periodicity, current_timediff))
except ValueError:
return False
else:
return False
return True
def _setPromiseLatestRunTime(self, promise_name):
"""
Save the promise execution timestamp
"""
state_file = os.path.join(self.run_status_folder, promise_name)
with open(state_file, 'w') as f:
f.write(str(time.time()))
def _loadPromiseModule(self, promise_name): def _loadPromiseModule(self, promise_name):
"""Load a promise from promises directory.""" """Load a promise from promises directory."""
...@@ -573,18 +607,22 @@ class PromiseLauncher(object): ...@@ -573,18 +607,22 @@ class PromiseLauncher(object):
Launch the promise and save the result if `self.save_method` is not None Launch the promise and save the result if `self.save_method` is not None
If no save method is set, raise PromiseError in case of failure If no save method is set, raise PromiseError in case of failure
""" """
self.logger.info("Checking promise %s..." % promise_name)
try: try:
if promise_module is None: if promise_module is None:
promise_instance = PromiseWrapper(argument_dict) promise_instance = PromiseWrapper(argument_dict)
else: else:
promise_instance = promise_module.RunPromise(argument_dict) promise_instance = promise_module.RunPromise(argument_dict)
except Exception: except Exception:
# to not prevent run other promises # only print traceback to not prevent run other promises
self.logger.error(traceback.format_exc()) self.logger.error(traceback.format_exc())
self.logger.warning("Promise %s skipped." % promise_name) self.logger.warning("Promise %s skipped." % promise_name)
return return
if not self._isPromisePeriodicityMatch(promise_name,
promise_instance.getPeriodicity()):
return
self.logger.info("Checking promise %s..." % promise_name)
promise_process = PromiseRunner( promise_process = PromiseRunner(
promise_instance, promise_instance,
logger=self.logger, logger=self.logger,
...@@ -595,6 +633,7 @@ class PromiseLauncher(object): ...@@ -595,6 +633,7 @@ class PromiseLauncher(object):
) )
promise_process.start() promise_process.start()
self._setPromiseLatestRunTime(promise_name)
sleep_time = 0.1 sleep_time = 0.1
increment_limit = int(self.promise_timeout / sleep_time) increment_limit = int(self.promise_timeout / sleep_time)
execution_time = self.promise_timeout execution_time = self.promise_timeout
...@@ -666,7 +705,7 @@ class PromiseLauncher(object): ...@@ -666,7 +705,7 @@ class PromiseLauncher(object):
execution_time=execution_time execution_time=execution_time
)) ))
if self.profile: if self.profile:
self.logger.info("Finished promise %r in %s second(s)." % ( self.logger.debug("Finished promise %r in %s second(s)." % (
promise_name, execution_time)) promise_name, execution_time))
def run(self): def run(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment