Commit 02bcc9a3 authored by Alain Takoudjou's avatar Alain Takoudjou

grid.promise: save and use promise periodicity in cache

parent ababf1f6
......@@ -47,8 +47,7 @@ from slapos.grid.promise.generic import (GenericPromise, PromiseQueueResult,
AnomalyResult, TestResult,
PROMISE_STATE_FOLDER_NAME,
PROMISE_RESULT_FOLDER_NAME,
PROMISE_PARAMETER_NAME,
PROMISE_PERIOD_FILE_NAME)
PROMISE_PARAMETER_NAME)
from slapos.grid.promise.wrapper import WrapPromise
from slapos.version import version
......@@ -72,6 +71,7 @@ class PromiseProcess(Process):
@param promise_name: The name of the promise to run
@param promise_path: path of the promise
@param argument_dict: all promise parameters in a dictionary
@param queue: Queue used to send promise result
@param allow_bang: Bolean saying if bang should be called in case of
anomaly failure.
@param check_anomaly: Bolean saying if promise anomaly should be run.
......@@ -94,78 +94,61 @@ class PromiseProcess(Process):
self._periodicity = None
self.cache_folder = os.path.join(self.partition_folder,
PROMISE_CACHE_FOLDER_NAME)
mkdir_p(self.cache_folder)
self.cache_file = os.path.join(self.cache_folder, self.getPromiseTitle())
self._timestamp_file = os.path.join(partition_folder,
# XXX - remove old files used to store promise timestamp and periodicity
self._cleanupDeprecated()
def _cleanupDeprecated(self):
timestamp_file = os.path.join(self.partition_folder,
PROMISE_STATE_FOLDER_NAME,
'%s.timestamp' % promise_name)
periodicity_file = os.path.join(partition_folder,
'%s.timestamp' % self.name)
periodicity_file = os.path.join(self.partition_folder,
PROMISE_STATE_FOLDER_NAME,
PROMISE_PERIOD_FILE_NAME % promise_name)
if os.path.exists(periodicity_file) and os.stat(periodicity_file).st_size:
with open(periodicity_file) as f:
try:
self._periodicity = float(f.read())
except ValueError:
# set to None, run the promise and regenerate the file
pass
'%s.periodicity' % self.name)
if os.path.exists(timestamp_file) and os.path.isfile(timestamp_file):
os.unlink(timestamp_file)
if os.path.exists(periodicity_file) and os.path.isfile(periodicity_file):
os.unlink(periodicity_file)
def isPeriodicityMatch(self):
def getNextPromiseTime(self, periodicity):
"""
Return True if promise should be run now, considering the promise
periodicity in minutes
Return the next promise execution timestamp from now
"""
if self._periodicity is not None and \
os.path.exists(self._timestamp_file) and \
os.stat(self._timestamp_file).st_size:
with open(self._timestamp_file) as f:
try:
latest_timestamp = float(f.read())
current_timediff = (time.time() - latest_timestamp) / 60.0
if current_timediff >= self._periodicity:
return True
#self.logger.debug("Skip Promise %r. periodicity=%s, time_diff=%s" % (
# self.name, self._periodicity, current_timediff))
except ValueError:
# if the file is broken, run the promise and regenerate it
return True
else:
return False
return True
def setPromiseStartTimestamp(self):
"""
Save the promise execution timestamp
"""
state_directory = os.path.dirname(self._timestamp_file)
mkdir_p(state_directory)
with open(self._timestamp_file, 'w') as f:
f.write(str(time.time()))
return time.time() + (periodicity * 60.0)
def getPromiseTitle(self):
return os.path.splitext(self.name)[0]
def updatePromiseCache(self, promise_class, promise_instance):
def updatePromiseCache(self, promise_class, promise_instance, started=True):
"""
Cache some data from the promise that can be reused
"""
py_file = '%s.py' % os.path.splitext(inspect.getfile(promise_class))[0]
stat = os.stat(py_file)
timestamp = time.time()
cache_dict = dict(
is_tested= not hasattr(promise_instance, 'isTested') or \
promise_instance.isTested(),
is_anomaly_detected=not hasattr(promise_instance, 'isAnomalyDetected') or \
promise_instance.isAnomalyDetected(),
periodicity=promise_instance.getPeriodicity(),
next_run_after=timestamp + (promise_instance.getPeriodicity() * 60.0),
timestamp=timestamp,
module_file=py_file,
module_file_mtime=stat.st_mtime,
module_file_size=stat.st_size,
)
if not os.path.isdir(self.cache_folder):
mkdir_p(self.cache_folder)
if not started:
cache_dict['next_run_after'] = timestamp
with open(self.cache_file, 'w') as f:
f.write(json.dumps(cache_dict))
def loadPromiseCacheDict(self):
"""
Load cached data for this promise.
If saved promise module file is not exists then invalidate cache.
Cache will be updated when promise run
"""
if os.path.exists(self.cache_file):
try:
with open(self.cache_file) as f:
......@@ -173,11 +156,6 @@ class PromiseProcess(Process):
if not os.path.exists(cache_dict['module_file']):
# file not exists mean path was changed
return None
current_stat = os.stat(cache_dict['module_file'])
if current_stat.st_mtime != cache_dict['module_file_mtime'] or \
current_stat.st_size != cache_dict['module_file_size']:
# file was modified, update cache
return None
return cache_dict
except ValueError:
return None
......@@ -190,23 +168,26 @@ class PromiseProcess(Process):
"""
try:
os.chdir(self.partition_folder)
promise_started = False
if self.uid and self.gid:
dropPrivileges(self.uid, self.gid, logger=self.logger)
if self.wrap_promise:
promise_instance = WrapPromise(self.argument_dict)
else:
self._createInitFile()
promise_module = self._loadPromiseModule()
promise_instance = promise_module.RunPromise(self.argument_dict)
self.updatePromiseCache(promise_module.RunPromise, promise_instance)
if not hasattr(promise_instance, 'isAnomalyDetected') or not \
hasattr(promise_instance, 'isTested') or \
(promise_instance.isAnomalyDetected() and self.check_anomaly) or \
(promise_instance.isTested() and not self.check_anomaly):
# if the promise will run, we save execution timestamp
self.setPromiseStartTimestamp()
promise_started = True
self.updatePromiseCache(
WrapPromise if self.wrap_promise else promise_module.RunPromise,
promise_instance,
started=promise_started)
promise_instance.run(self.check_anomaly, self.allow_bang)
except Exception:
self.logger.error(traceback.format_exc())
......@@ -387,7 +368,7 @@ class PromiseLauncher(object):
execution_time=execution_time
)
def _writePromiseResult(self, result):
def _savePromiseResult(self, result):
if not isinstance(result, PromiseQueueResult):
self.logger.error('Bad result: %s is not type of PromiseQueueResult...' % result)
return
......@@ -421,12 +402,10 @@ class PromiseLauncher(object):
))
return result
def _savePromiseResult(self, result_item):
def _writePromiseResult(self, result_item):
if result_item.item.type() == "Empty Result":
# no result collected (sense skipped)
skipped_method = "Anomaly" if self.check_anomaly else "Test"
self.logger.debug("Skipped, %s is disabled in promise %r" % (
skipped_method, result_item.name))
return
elif result_item.item.hasFailed():
self.logger.error(result_item.item.message)
if result_item.execution_time != -1 and \
......@@ -434,7 +413,7 @@ class PromiseLauncher(object):
# stop to bang as it was called
self.bang_called = True
# Send result
self._writePromiseResult(result_item)
self._savePromiseResult(result_item)
def _emptyQueue(self):
"""Remove all entries from queue until it's empty"""
......@@ -451,6 +430,11 @@ class PromiseLauncher(object):
PROMISE_STATE_FOLDER_NAME)
chownDirectory(folder_path, stat_info.st_uid, stat_info.st_gid)
def isPeriodicityMatch(self, next_timestamp):
if next_timestamp:
return time.time() >= next_timestamp
return True
def _launchPromise(self, promise_name, promise_path, argument_dict,
wrap_process=False):
"""
......@@ -460,7 +444,6 @@ class PromiseLauncher(object):
If the promise periodicity doesn't match, the previous promise result is
checked.
"""
self.logger.info("Checking promise %s..." % promise_name)
try:
promise_process = PromiseProcess(
self.partition_folder,
......@@ -480,9 +463,10 @@ class PromiseLauncher(object):
if self.check_anomaly and not promise_cache_dict.get('is_anomaly_detected') \
or not self.check_anomaly and not promise_cache_dict.get('is_tested'):
# promise is skipped, send empty result
self._savePromiseResult(PromiseQueueResult())
self._writePromiseResult(PromiseQueueResult())
return
if not self.force and not promise_process.isPeriodicityMatch():
if not self.force and (promise_cache_dict is not None and not
self.isPeriodicityMatch(promise_cache_dict.get('next_run_after'))):
# we won't start the promise process, just get the latest result
result = self._loadPromiseResult(promise_process.getPromiseTitle())
if result is not None:
......@@ -500,6 +484,7 @@ class PromiseLauncher(object):
self.logger.warning("Promise %s skipped." % promise_name)
return True
self.logger.info("Checking promise %s..." % promise_name)
queue_item = None
sleep_time = 0.1
increment_limit = int(self.promise_timeout / sleep_time)
......@@ -567,7 +552,7 @@ class PromiseLauncher(object):
execution_time=execution_time
)
self._savePromiseResult(queue_item)
self._writePromiseResult(queue_item)
if self.debug:
self.logger.debug("Finished promise %r in %s second(s)." % (
promise_name, execution_time))
......
......@@ -46,7 +46,6 @@ PROMISE_RESULT_FOLDER_NAME = '.slapgrid/promise/result'
PROMISE_LOG_FOLDER_NAME = '.slapgrid/promise/log'
PROMISE_PARAMETER_NAME = 'extra_config_dict'
PROMISE_PERIOD_FILE_NAME = '%s.periodicity'
LOGLINE_RE = r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\-?\s*(\w{4,7})\s+\-?\s+(\d+\-\d{3})\s+\-?\s*(.*)"
matchLogStr = re.compile(LOGLINE_RE).match
......@@ -159,10 +158,6 @@ class GenericPromise(with_metaclass(ABCMeta, object)):
self.__promise_path = self.__config.pop('path', None)
self.__queue = self.__config.pop('queue', None)
self.__logger_buffer = None
self.__periodicity_file = os.path.join(
self.__partition_folder,
PROMISE_STATE_FOLDER_NAME,
PROMISE_PERIOD_FILE_NAME % self.__name)
self.setPeriodicity(self.__config.pop('periodicity', 2))
self.__transaction_id = '%s-%s' % (int(time.time()), random.randint(100, 999))
......@@ -236,8 +231,6 @@ class GenericPromise(with_metaclass(ABCMeta, object)):
if minute <= 0:
raise ValueError("Cannot set promise periodicity to a value less than 1")
self.__periodicity = minute
with open(self.__periodicity_file, 'w') as f:
f.write('%s' % minute)
def getPeriodicity(self):
return self.__periodicity
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment