Commit 4df69149 authored by Justin's avatar Justin

promise: Merged check_free_disk_space and monitor_partition_space from...

promise: Merged check_free_disk_space and monitor_partition_space from @lisa.casino & @@xavier_thompson
parent 3b7d673f
......@@ -11,7 +11,8 @@ for f in sorted(glob.glob(os.path.join('slapos', 'README.*.rst'))):
long_description += open("CHANGES.txt").read() + "\n"
test_require = ['mock', 'cryptography',]
prediction_require = ['statsmodels', 'scipy', 'pandas']
test_require = ['mock', 'cryptography',] + prediction_require
setup(name=name,
version=version,
......@@ -61,6 +62,8 @@ setup(name=name,
'lampconfigure': ["mysqlclient"], #needed for MySQL Database access
'zodbpack': ['ZODB3'], # needed to play with ZODB
'flask_auth' : ["Flask-Auth"],
'pandas' : ['pandas'], # needed to monitor_partition_space promise
'prediction' : prediction_require, # needed to use ARIMA in check_free_disk_space
'test': test_require,
},
tests_require=test_require,
......
......@@ -36,23 +36,12 @@ import time
import json
import argparse
import psutil
from time import strftime, gmtime
from time import strftime
from datetime import datetime, timedelta
from slapos.collect.db import Database
from slapos.collect.reporter import ConsumptionReportBase
def get_date_scope():
return strftime('%Y-%m-%d', gmtime())
def get_min_time():
gm = gmtime()
return '{}:{}:00'.format(gm.tm_hour, gm.tm_min - 1)
def get_max_time():
gm = gmtime()
return '{}:{}:59'.format(gm.tm_hour, gm.tm_min - 1)
def parseArguments():
"""
Parse arguments for monitor collector instance.
......@@ -114,11 +103,11 @@ class ResourceCollect:
if where != "":
where = "and %s" % where
if not date_scope:
date_scope = get_date_scope()
date_scope = datetime.now().strftime('%Y-%m-%d')
if not min_time:
min_time = get_min_time()
min_time = (datetime.now() - timedelta(minutes=1)).strftime('%H:%M:00')
if not max_time:
max_time = get_max_time()
max_time = (datetime.now() - timedelta(minutes=1)).strftime('%H:%M:59')
columns = """count(pid), SUM(cpu_percent) as cpu_result, SUM(cpu_time),
MAX(cpu_num_threads), SUM(memory_percent), SUM(memory_rss), pid, SUM(io_rw_counter),
......@@ -159,11 +148,11 @@ class ResourceCollect:
if where != "":
where = " and %s" % where
if not date_scope:
date_scope = get_date_scope()
date_scope = datetime.now().strftime('%Y-%m-%d')
if not min_time:
min_time = get_min_time()
min_time = (datetime.now() - timedelta(minutes=1)).strftime('%H:%M:00')
if not max_time:
max_time = get_max_time()
max_time = (datetime.now() - timedelta(minutes=1)).strftime('%H:%M:59')
colums = """count(pid), SUM(cpu_percent), SUM(cpu_time), SUM(cpu_num_threads), SUM(memory_percent),
SUM(memory_rss), SUM(io_rw_counter), SUM(io_cycles_counter)"""
......
......@@ -11,73 +11,212 @@ import sqlite3
import argparse
import datetime
import psutil
import itertools
import warnings
import pkgutil
from slapos.collect.db import Database
from contextlib import closing
# install pandas, numpy and statsmodels for ARIMA prediction
try:
import pandas as pd
import numpy as np
from statsmodels.tsa.arima_model import ARIMA
except ImportError:
pass
@implementer(interface.IPromise)
class RunPromise(GenericPromise):
def __init__(self, config):
super(RunPromise, self).__init__(config)
# check disk space at least every 3 minutes
self.setPeriodicity(minute=3)
# check disk space at least every hours (heavy in computation)
self.setPeriodicity(float(self.getConfig('frequency', 60)))
def getDiskSize(self, disk_partition, database):
database = Database(database, create=False, timeout=10)
try:
# fetch disk size
database.connect()
where_query = "partition='%s'" % (disk_partition)
order = "datetime(date || ' ' || time) DESC"
query_result = database.select("disk", columns="free+used", where=where_query, order=order, limit=1)
result = query_result.fetchone()
if not result or not result[0]:
return None
disk_size = result[0]
except sqlite3.OperationalError as e:
# if database is still locked after timeout expiration (another process is using it)
# we print warning message and try the promise at next run until max warn count
locked_message = "database is locked"
if locked_message in str(e) and \
not self.raiseOnDatabaseLocked(locked_message):
return None
raise
finally:
# by using contextlib.closing, we don't need to close the database explicitly
with closing(database):
try:
database.close()
except Exception:
pass
# fetch disk size
database.connect()
where_query = "partition='%s'" % (disk_partition)
order = "datetime(date || ' ' || time) DESC"
query_result = database.select("disk", columns="free+used", where=where_query, order=order, limit=1)
result = query_result.fetchone()
if not result or not result[0]:
return None
disk_size = result[0]
except sqlite3.OperationalError as e:
# if database is still locked after timeout expiration (another process is using it)
# we print warning message and try the promise at next run until max warn count
locked_message = "database is locked"
if locked_message in str(e) and \
not self.raiseOnDatabaseLocked(locked_message):
return None
raise
return disk_size
def getFreeSpace(self, disk_partition, database, date, time):
database = Database(database, create=False, timeout=10)
try:
# fetch free disk space
database.connect()
where_query = "time between '%s:00' and '%s:30' and partition='%s'" % (time, time, disk_partition)
query_result = database.select("disk", date, "free", where=where_query)
result = query_result.fetchone()
if not result or not result[0]:
self.logger.info("No result from collector database: disk check skipped")
return 0
disk_free = result[0]
except sqlite3.OperationalError as e:
# if database is still locked after timeout expiration (another process is using it)
# we print warning message and try the promise at next run until max warn count
locked_message = "database is locked"
if locked_message in str(e) and \
not self.raiseOnDatabaseLocked(locked_message):
return 0
raise
finally:
with closing(database):
try:
database.close()
except Exception:
pass
# fetch free disk space
database.connect()
where_query = "time between '%s:00' and '%s:30' and partition='%s'" % (time, time, disk_partition)
query_result = database.select("disk", date, "free", where=where_query)
result = query_result.fetchone()
if not result or not result[0]:
self.logger.info("No result from collector database: disk check skipped")
return 0
disk_free = result[0]
except sqlite3.OperationalError as e:
# if database is still locked after timeout expiration (another process is using it)
# we print warning message and try the promise at next run until max warn count
locked_message = "database is locked"
if locked_message in str(e) and \
not self.raiseOnDatabaseLocked(locked_message):
return 0
raise
return int(disk_free)
def getBiggestPartitions(self, database, date, time):
# displays the 3 biggest partitions thanks to disk usage
limit = 3
database = Database(database, create=False, timeout=10)
with closing(database):
try:
database.connect()
date_time = date + ' ' + time
# gets the data recorded between the current date (date_time) and 24 hours earlier
where_query = "datetime(date || ' ' || time) >= datetime('%s', '-1 days') AND datetime(date || ' ' || time) <= datetime('%s')"
# gets only the most recent data for each partition
result = database.select(
"folder",
columns = "partition, disk_used*1024, max(datetime(date || ' ' || time))",
where = where_query % (date_time, date_time),
group = "partition",
order = "disk_used DESC",
limit = limit).fetchall()
if not result or not result[0]:
self.logger.info("No result from collector database in table folder: skipped")
return None
except sqlite3.OperationalError as e:
# if database is still locked after timeout expiration (another process is using it)
# we print warning message and try the promise at next run until max warn count
locked_message = "database is locked"
if locked_message in str(e) and \
not self.raiseOnDatabaseLocked(locked_message):
return None
raise
return result
def evaluateArimaModel(self, X, arima_order):
"""
Evaluate an ARIMA model for a given order (p,d,q) with the MSE which
measures the average of the squares of the errors.
"""
# take 66% of the data for training and 33% for testing
train_size = int(len(X) * 0.66)
train, test = X[0:train_size], X[train_size:]
history = [x for x in train]
# make predictions
predictions = list()
for t in range(len(test)):
with warnings.catch_warnings():
warnings.simplefilter("ignore")
model = ARIMA(history, order=arima_order)
model_fit = model.fit(disp=-1)
yhat = model_fit.forecast()[0]
predictions.append(yhat)
history.append(test[t])
# calculate out of sample error
rmse = (np.square(np.subtract(test.values, np.hstack(predictions))).mean())**0.5
return rmse
def evaluateModels(self, dataset, p_values, d_values, q_values):
"""
Evaluate combinations of p, d and q values for an ARIMA model
"""
dataset = dataset.astype('float32')
best_score, best_cfg = float("inf"), None
for p in p_values:
for d in d_values:
for q in q_values:
order = (p,d,q)
try:
rmse = self.evaluateArimaModel(dataset, order)
if rmse < best_score:
best_score, best_cfg = rmse, order
except Exception:
pass
return best_cfg
def diskSpacePrediction(self, disk_partition, database, date, time, day_range):
"""
Returns an estimation of free disk space left depending on
the day_range parameter.
It uses Arima in order to predict data thanks to the 15 days before.
"""
database = Database(database, create=False, timeout=10)
with closing(database):
try:
database.connect()
# get one data per day, where each data is at the same time
where_query = "time between '%s:00' and '%s:30' and partition='%s'" % (
time, time, disk_partition)
result = database.select(
"disk",
columns = "free, datetime(date || ' ' || time)",
where = where_query,
order = "datetime(date || ' ' || time) ASC").fetchall()
# checks that there are at least 14 days of data
if (not result) or (len(result) < 14):
self.logger.info("No or not enough results from collector database in table disk: no prediction")
return None
# put the list in pandas dataframe format and set the right types
df = pd.DataFrame(data=result, columns=['free', 'date'])
df.loc[:,'date'] = pd.to_datetime(df.date)
df = df.astype({'free': np.float})
df = df.set_index('date')
# find the best configuration by trying different combinations
p_values = d_values = q_values = range(0, 3)
best_cfg = self.evaluateModels(df.free, p_values, d_values, q_values)
# set the days to be predicted
max_date_predicted = day_range+1
future_index_date = pd.date_range(df.index[-1], freq='24H', periods=max_date_predicted)
try:
# disabling warnings during the ARIMA calculation
with warnings.catch_warnings():
warnings.simplefilter("ignore")
model_arima = ARIMA(df, order=best_cfg)
# disp < 0 means no output about convergence information
model_arima_fit = model_arima.fit(disp=-1)
# save ARIMA predictions
fcast, _, conf = model_arima_fit.forecast(max_date_predicted, alpha=0.05)
# pass the same index as the others
fcast = pd.Series(fcast, index=future_index_date)
if fcast.empty:
self.logger.info("Arima prediction: none. Skipped prediction")
return None
except Exception:
self.logger.info("Arima prediction error: skipped prediction")
return None
# get results with 95% confidence
lower_series = pd.Series(conf[:, 0], index=future_index_date)
upper_series = pd.Series(conf[:, 1], index=future_index_date)
return fcast, lower_series, upper_series
except sqlite3.OperationalError as e:
# if database is still locked after timeout expiration (another process is using it)
# we print warning message and try the promise at next run until max warn count
locked_message = "database is locked"
if locked_message in str(e) and \
not self.raiseOnDatabaseLocked(locked_message):
return None
raise
def raiseOnDatabaseLocked(self, locked_message):
max_warn = 10
latest_result_list = self.getLastPromiseResultList(result_count=max_warn)
......@@ -149,7 +288,7 @@ class RunPromise(GenericPromise):
disk_partition = self.getConfig('test-disk-partition', '/dev/sda1')
else:
# get last minute
now = datetime.datetime.now()
now = datetime.datetime.utcnow()
currentdate = now.strftime('%Y-%m-%d')
currenttime = now - datetime.timedelta(minutes=1)
currenttime = currenttime.time().strftime('%H:%M')
......@@ -158,8 +297,7 @@ class RunPromise(GenericPromise):
default_threshold = None
if disk_size is not None:
default_threshold = round(disk_size/(1024*1024*1024) * 0.05, 2)
threshold = float(self.getConfig('threshold', default_threshold) or 2.0)
threshold_days = float(self.getConfig('threshold-days', '30'))
threshold = float(self.getConfig('threshold', default_threshold) or default_threshold)
free_space = self.getFreeSpace(disk_partition, db_path, currentdate,
currenttime)
......@@ -170,15 +308,60 @@ class RunPromise(GenericPromise):
if inode_usage:
self.logger.error(inode_usage)
else:
self.logger.info("Disk usage: OK")
self.logger.info("Current disk usage: OK")
# if the option is enabled and the current disk size is large enough,
# we check the predicted remaining disk space
display_prediction = bool(int(self.getConfig('display-prediction', 0) or 0))
self.logger.info("Enable to display disk space predictions: %s" % display_prediction)
if display_prediction:
# check that the libraries are installed from the slapos.toolbox extra requires
pandas_found = pkgutil.find_loader("pandas")
numpy_found = pkgutil.find_loader("numpy")
statsmodels_found = pkgutil.find_loader("statsmodels")
# if one module isn't installed
if pandas_found is None or numpy_found is None or statsmodels_found is None:
self.logger.warning("Trying to use statsmodels and pandas " \
"but at least one module is not installed. Prediction skipped.")
return
nb_days_predicted = int(self.getConfig('nb-days-predicted', 10) or 10)
disk_space_prediction_tuple = self.diskSpacePrediction(
disk_partition, db_path, currentdate, currenttime, nb_days_predicted)
if disk_space_prediction_tuple is not None:
fcast, lower_series, upper_series = disk_space_prediction_tuple
space_left_predicted = fcast.iloc[-1]
last_date_predicted = datetime.datetime.strptime(str(fcast.index[-1]),
"%Y-%m-%d %H:%M:%S")
delta_days = (last_date_predicted.date() - \
datetime.datetime.strptime(currentdate, "%Y-%m-%d").date()).days
self.logger.info("Prediction: there will be %.2f G left on %s (%s days)." % (
space_left_predicted/(1024*1024*1024), last_date_predicted, delta_days))
if space_left_predicted <= threshold*1024*1024*1024:
self.logger.warning("The free disk space will be too low. " \
"(disk size: %.2f G, threshold: %s G)" % (
disk_size/(1024*1024*1024), threshold))
return
free_space = round(free_space/(1024*1024*1024), 2)
self.logger.error('Free disk space low: remaining %s G (threshold: %s G)' % (
free_space, threshold))
message = "Free disk space low: remaining %.2f G (disk size: %.0f G, threshold: %.0f G)." % (
free_space/(1024*1024*1024), disk_size/(1024*1024*1024), threshold)
display_partition = bool(int(self.getConfig('display-partition', 0) or 0))
self.logger.info("Enable to display the 3 biggest partitions: %s" % display_partition)
if display_partition:
# display the 3 partitions that have the most storage capacity on the disk
big_partitions = self.getBiggestPartitions(db_path, currentdate, currenttime)
if big_partitions is not None:
for partition in big_partitions:
user_name, size_partition, date_checked = partition
partition_id = self.getConfig('partition-id', 'slappart')
# get the name of each partition by adding the user's number to the general name of the partition
partition_name = ''.join(x for x in partition_id if not x.isdigit()) + ''.join(filter(str.isdigit, user_name))
message += " The partition %s uses %.2f G (date checked: %s)." % (
partition_name, size_partition/(1024*1024*1024), date_checked)
# display the final error message
self.logger.error(message)
def test(self):
return self._test(result_count=1, failure_amount=1)
def anomaly(self):
return self._test(result_count=3, failure_amount=3)
return self._test(result_count=3, failure_amount=3)
\ No newline at end of file
from __future__ import division
from zope.interface import implementer
from slapos.grid.promise import interface
from slapos.grid.promise.generic import GenericPromise
import os
import sys
import pwd
import sqlite3
import argparse
import datetime
import psutil
import math
import pkgutil
# try to install pandas and numpy
try:
import pandas as pd
import numpy as np
except ImportError:
pass
from slapos.collect.db import Database
from contextlib import closing
@implementer(interface.IPromise)
class RunPromise(GenericPromise):
def __init__(self, config):
super(RunPromise, self).__init__(config)
# at least every hours (heavy in computation)
self.setPeriodicity(float(self.getConfig('frequency', 60)))
def getDiskSize(self, disk_partition, db_path):
database = Database(db_path, create=False, timeout=10)
# by using contextlib.closing, we don't need to close the database explicitly
with closing(database):
try:
database.connect()
where_query = "partition='%s'" % (disk_partition)
order = "datetime(date || ' ' || time) DESC"
result = database.select(
"disk",
columns="free+used",
where=where_query,
order=order,
limit=1).fetchone()
if not result or not result[0]:
return None
disk_size = result[0]
except sqlite3.OperationalError as e:
# if database is still locked after timeout expiration (another process is using it)
# we print warning message and try the promise at next run until max warn count
locked_message = "database is locked"
if locked_message in str(e) and \
not self.raiseOnDatabaseLocked(locked_message):
return None
raise
return disk_size
def getPartitionSize(self, disk_partition, db_path):
database = Database(db_path, create=False, timeout=10)
with closing(database):
try:
database.connect()
where_query = "partition='%s'" % (disk_partition)
order = "datetime(date || ' ' || time) DESC"
result = database.select(
"folder",
columns="disk_used*1024",
where=where_query,
order=order,
limit=1).fetchone()
if not result or not result[0]:
return None
partition_size = result[0]
except sqlite3.OperationalError as e:
# if database is still locked after timeout expiration (another process is using it)
# we print warning message and try the promise at next run until max warn count
locked_message = "database is locked"
if locked_message in str(e) and \
not self.raiseOnDatabaseLocked(locked_message):
return None
raise
return partition_size
def getAnomaly(self, disk_partition, db_path, user, date, time):
database = Database(db_path, create=False, timeout=10)
with closing(database):
try:
disk_size = self.getDiskSize(disk_partition, db_path)
if disk_size is None:
return None
database.connect()
result = database.select(
"folder",
columns = "%s-disk_used*1024, disk_used*1024, datetime(date || ' ' || time)" % disk_size,
where = "partition='%s'" % (user),
order = "date ASC, time ASC"
).fetchall()
if not result or not result[0]:
self.logger.info("No result from collector database for the user %s: skipped", user)
return None
datetime_now = datetime.datetime.strptime(date + ' ' + time, "%Y-%m-%d %H:%M:%S")
# check that the last data is less than 24 hours old
last_date = datetime.datetime.strptime(result[-1][2], "%Y-%m-%d %H:%M:%S")
if (datetime_now - last_date) > datetime.timedelta(days=1):
self.logger.info("Not enough recent data to detect anomalies: skipped")
return None
# check that the first data is at least 13 days old
first_date = datetime.datetime.strptime(result[0][2], "%Y-%m-%d %H:%M:%S")
if (datetime_now - first_date) < datetime.timedelta(days=13):
self.logger.info("Not enough data to detect anomalies: skipped")
return None
df = pd.DataFrame(result, columns=["free", "used", "date"])
df.loc[:,'date'] = pd.to_datetime(df.date)
# keep a sample every 5 minutes, set NaN when there is no information
freq = 5
df = df.resample(str(freq)+"min", on='date').mean()
# estimate the missing information
df['free'] = df.free.astype(float).interpolate(method='linear')
df['used'] = df.used.astype(float).interpolate(method='linear')
# calculate the median for the element-wise absolute value
# of the difference between each x and the median of x
df = df.reset_index()
x = df['date']
y = df['free']
mad = lambda x: np.median(np.fabs(x - np.median(x)))
# threshold is set at 8% of the disk size by default
threshold_ratio = float(self.getConfig('threshold-ratio', 0.08) or 0.08)
threshold = threshold_ratio*disk_size
# use a 1-day window
minutes_per_day = 60*24/freq
rolling_window = int(minutes_per_day*1)
rolling_mad = y.rolling(window=rolling_window, center=False).median() + \
y.rolling(window=rolling_window, center=False).apply(mad)
rolling_mad_upper = rolling_mad + threshold
rolling_mad_lower = rolling_mad - threshold
# create Pandas DataFrame and rename columns
data = pd.concat([x, y, df['used'], rolling_mad, rolling_mad_upper, rolling_mad_lower], axis=1)
data.columns = ["date", "free", "used", "mad", "upper_mad", "lower_mad"]
# drop initial values (outside rolling window)
data.dropna(subset=["mad"], inplace=True)
# determine anomalies and display their number
data["is_anomaly"] = ~(data["free"].between(data["lower_mad"], data["upper_mad"]))
data = data.set_index("date")
if (len(data)==0):
self.logger.info("No result from anomaly detection")
return None
self.logger.info("There were %s anomalies in the last 15 days " \
"(1 data every %s minutes, threshold: %s %% of the disk size)" % (
len(data[data['is_anomaly'] == True]), freq, threshold_ratio*100))
return data
except sqlite3.OperationalError as e:
# if database is still locked after timeout expiration (another process is using it)
# we print warning message and try the promise at next run until max warn count
locked_message = "database is locked"
if locked_message in str(e) and \
not self.raiseOnDatabaseLocked(locked_message):
return None
raise
@staticmethod
def _checkInodeUsage(path):
stat = os.statvfs(path)
total_inode = stat.f_files
if total_inode:
usage = 100 * (total_inode - stat.f_ffree) / total_inode
if usage >= 98:
return "Disk Inodes usage is really high: %.4f%%" % usage
def getInodeUsage(self, path):
return (self._checkInodeUsage(path) or
os.path.ismount('/tmp') and self._checkInodeUsage('/tmp') or
"")
def sense(self):
# check that the libraries are installed from the slapos.toolbox extra requires
pandas_found = pkgutil.find_loader("pandas")
numpy_found = pkgutil.find_loader("numpy")
if pandas_found is None or numpy_found is None:
self.logger.warning("Trying to use pandas but the module is not installed. Promise skipped.")
return
# find if a disk is mounted on the path
disk_partition = ""
db_path = self.getConfig('collectordb')
check_date = self.getConfig('test-check-date')
path = os.path.join(self.getPartitionFolder(), "") + "extrafolder"
partitions = psutil.disk_partitions()
while path is not '/':
if not disk_partition:
path = os.path.dirname(path)
else:
break
for p in partitions:
if p.mountpoint == path:
disk_partition = p.device
break
if not disk_partition:
self.logger.error("Couldn't find disk partition")
return
if db_path.endswith("collector.db"):
db_path=db_path[:-len("collector.db")]
if check_date:
# testing mode
currentdate = check_date
currenttime = self.getConfig('test-check-time', '09:30:30')
user = self.getConfig('test-partition', 'slapuser0')
else:
# get the user name of the partition
user = pwd.getpwuid(os.getuid()).pw_name
# get last minute
now = datetime.datetime.utcnow()
currentdate = now.strftime('%Y-%m-%d')
currenttime = now - datetime.timedelta(minutes=1)
currenttime = currenttime.time().strftime('%H:%M:%S')
partition_size = self.getPartitionSize(user, db_path)
data = self.getAnomaly(disk_partition, db_path, user, currentdate, currenttime)
if data is None:
return
last_data = data.iloc[-1]
last_date = data.index[-1]
if last_data.is_anomaly:
self.logger.error("Anomaly detected on %s. Space used by %s: %.2f G." % (
last_date, user, partition_size/(1024*1024*1024)))
else:
self.logger.info("No anomaly detected (last date checked: %s)" % (last_date))
def test(self):
return self._test(result_count=1, failure_amount=1)
def anomaly(self):
return self._test(result_count=3, failure_amount=3)
import itertools
import json
import logging
import os
import textwrap
from dateutil import parser as dateparser
from slapos.grid.promise.generic import GenericPromise
def iter_reverse_lines(f):
"""
Read lines from the end of the file
"""
f.seek(0, os.SEEK_END)
while True:
try:
while f.seek(-2, os.SEEK_CUR) and f.read(1) != b'\n':
pass
except OSError:
return
pos = f.tell()
yield f.readline()
f.seek(pos, os.SEEK_SET)
def iter_logrotate_file_handle(path, mode='r'):
"""
Yield successive file handles for rotated logs
(XX.log, XX.log.1, XX.log.2, ...)
"""
for i in itertools.count():
path_i = path + str(i or '')
  • @Just1 @xavier_thompson @jhuge I don't know what's wrong, but it seems the docstring and the code mismatch here, the doc says it yields (XX.log, XX.log.1, XX.log.2, ...) but I think it yields (XX.log, XX.log1, XX.log2, ...)

Please register or sign in to reply
try:
with open(path_i, mode) as f:
yield f
except OSError:
break
class JSONPromise(GenericPromise):
def __init__(self, config):
self.__name = config.get('name', None)
self.__log_folder = config.get('log-folder', None)
super(JSONPromise, self).__init__(config)
json_log_name = os.path.splitext(self.__name)[0] + '.json.log'
self.__json_log_file = os.path.join(self.__log_folder, json_log_name)
self.json_logger = self.__makeJsonLogger(self.__json_log_file)
def __makeJsonLogger(self, json_log_file):
logger = logging.getLogger('json-logger')
logger.setLevel(logging.INFO)
handler = logging.FileHandler(json_log_file)
formatter = logging.Formatter(
'{"time": "%(asctime)s", "log_level": "%(levelname)s"'
', "message": "%(message)s", "data": %(data)s}'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def getJsonLogDataInterval(self, interval):
"""
Get all data in the last "interval" seconds from JSON log
Reads rotated logs too (XX.log, XX.log.1, XX.log.2, ...)
"""
oldest_timestamp = None
data_list = []
for f in iter_logrotate_file_handle(self.__json_log_file, 'rb'):
for line in iter_reverse_lines(f):
l = json.loads(line.decode().replace("'", '"'))
timestamp = dateparser.parse(l['time'])
data_list.append(l['data'])
oldest_timestamp = oldest_timestamp or timestamp
if (oldest_timestamp - timestamp).total_seconds() > interval:
return data_list
return data_list
def getJsonLogLatestTimestamp(log):
"""
Get latest timestamp from JSON log
Reads rotated logs too (XX.log, XX.log.1, XX.log.2, ...)
"""
for f in iter_logrotate_file_handle(self.__json_log_file, 'rb'):
for line in iter_reverse_lines(f):
l = json.loads(line.decode().replace("'", '"'))
return dateparser.parse(l['time'])
return 0
from dateutil import parser
from slapos.grid.promise.generic import GenericPromise
def tail_file(file_path, line_count=10):
"""
Returns the last lines of file.
"""
line_list = []
with open(file_path) as f:
BUFSIZ = 1024
......@@ -25,5 +116,4 @@ def tail_file(file_path, line_count=10):
size -= line_len
bytes -= BUFSIZ
block -= 1
return '\n'.join(''.join(line_list).splitlines()[-line_count:])
\ No newline at end of file
return '\n'.join(''.join(line_list).splitlines()[-line_count:])
BEGIN TRANSACTION;
CREATE TABLE disk (partition text, used text, free text, mountpoint text, date text, time text, reported integer NULL DEFAULT 0);
CREATE TABLE folder (partition text, disk_used real, date text, time text, reported integer NULL DEFAULT 0);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-18','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-19','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-20','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-21','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-22','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-23','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-24','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-25','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-26','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-27','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-28','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-29','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-09-30','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-10-01','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159220666368','288948396032','/','2017-10-02','09:17:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159237537792','288931524608','/','2017-10-02','09:18:01',1);
INSERT INTO "disk" VALUES('/dev/sda1','159238090752','288930971648','/','2017-10-02','09:19:02',1);
......@@ -14,4 +29,7 @@ INSERT INTO "disk" VALUES('/dev/sda1','159429677056','288739385344','/','2017-10
INSERT INTO "disk" VALUES('/dev/sda1','159444549632','288724512768','/','2017-10-02','09:28:03',1);
INSERT INTO "disk" VALUES('/dev/sda1','159472902144','288696160256','/','2017-10-02','09:29:02',1);
INSERT INTO "disk" VALUES('/dev/sda1','159476805632','288692256768','/','2017-10-02','09:30:03',1);
INSERT INTO "folder" VALUES('slapuser0',87533020.0,'2017-10-02','09:17:00',1);
INSERT INTO "folder" VALUES('slapuser1',21883255.0,'2017-10-02','09:17:00',1);
INSERT INTO "folder" VALUES('slapuser2',43766510.0,'2017-10-02','09:17:00',1);
COMMIT;
BEGIN TRANSACTION;
CREATE TABLE disk (partition text, used text, free text, mountpoint text, date text, time text, reported integer NULL DEFAULT 0);
CREATE TABLE folder (partition text, disk_used real, date text, time text, reported integer NULL DEFAULT 0);
INSERT INTO "disk" VALUES('disk_partition_name','159220666368','288948396032','/','2017-10-02','09:17:01',1);
INSERT INTO "disk" VALUES('disk_partition_name','159237537792','288931524608','/','2017-10-02','09:18:01',1);
INSERT INTO "disk" VALUES('disk_partition_name','159238090752','288930971648','/','2017-10-02','09:19:02',1);
INSERT INTO "disk" VALUES('disk_partition_name','159241568256','288927494144','/','2017-10-02','09:20:02',1);
INSERT INTO "disk" VALUES('disk_partition_name','159242719232','288926343168','/','2017-10-02','09:21:02',1);
INSERT INTO "disk" VALUES('disk_partition_name','159196176384','288972886016','/','2017-10-02','09:22:03',1);
INSERT INTO "disk" VALUES('disk_partition_name','159300747264','288868315136','/','2017-10-02','09:23:03',1);
INSERT INTO "disk" VALUES('disk_partition_name','159294308352','288874754048','/','2017-10-02','09:24:02',1);
INSERT INTO "disk" VALUES('disk_partition_name','159328468992','288840593408','/','2017-10-02','09:25:03',1);
INSERT INTO "disk" VALUES('disk_partition_name','159384883200','288784179200','/','2017-10-02','09:26:03',1);
INSERT INTO "disk" VALUES('disk_partition_name','159429677056','288739385344','/','2017-10-02','09:27:02',1);
INSERT INTO "disk" VALUES('disk_partition_name','159444549632','288724512768','/','2017-10-02','09:28:03',1);
INSERT INTO "disk" VALUES('disk_partition_name','159472902144','288696160256','/','2017-10-02','09:29:02',1);
INSERT INTO "disk" VALUES('disk_partition_name','159476805632','288692256768','/','2017-10-02','09:30:03',1);
INSERT INTO "folder" VALUES('slapuser0',21883255.0,'2017-09-18','09:30:20',1);
INSERT INTO "folder" VALUES('slapuser0',24071580.5,'2017-09-19','09:30:11',1);
INSERT INTO "folder" VALUES('slapuser0',23196250.3,'2017-09-20','09:30:08',1);
INSERT INTO "folder" VALUES('slapuser0',23415082.8,'2017-09-21','09:30:21',1);
INSERT INTO "folder" VALUES('slapuser0',23633915.4,'2017-09-22','09:30:09',1);
INSERT INTO "folder" VALUES('slapuser0',30636557.0,'2017-09-23','09:30:06',1);
INSERT INTO "folder" VALUES('slapuser0',28448231.5,'2017-09-24','09:30:19',1);
INSERT INTO "folder" VALUES('slapuser0',27572901.3,'2017-09-25','09:30:13',1);
INSERT INTO "folder" VALUES('slapuser0',29761226.8,'2017-09-26','09:30:15',1);
INSERT INTO "folder" VALUES('slapuser0',30855389.5,'2017-09-27','09:30:06',1);
INSERT INTO "folder" VALUES('slapuser0',30636557.0,'2017-09-28','09:30:11',1);
INSERT INTO "folder" VALUES('slapuser0',31074222.0,'2017-09-29','09:30:08',1);
INSERT INTO "folder" VALUES('slapuser0',31096105.3,'2017-09-30','09:30:05',1);
INSERT INTO "folder" VALUES('slapuser0',31949552.2,'2017-10-01','09:30:10',1);
INSERT INTO "folder" VALUES('slapuser0',87533020.0,'2017-10-02','09:30:02',1);
COMMIT;
\ No newline at end of file
......@@ -74,18 +74,19 @@ extra_config_dict = {
""" % {'collectordb': self.db_file}
self.writePromise(self.promise_name, content)
self.configureLauncher()
self.configureLauncher(timeout=20)
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(result['result']['message'], "No result from collector database: disk check skipped")
def test_disk_space_ok(self):
self.configureLauncher()
self.configureLauncher(timeout=20)
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(result['result']['message'], "Disk usage: OK")
message = "Current disk usage: OK\nEnable to display disk space predictions: False"
self.assertEqual(result['result']['message'], message)
def test_disk_space_nok(self):
content = """from slapos.promise.plugin.check_free_disk_space import RunPromise
......@@ -98,29 +99,62 @@ extra_config_dict = {
""" % {'collectordb': self.db_file}
self.writePromise(self.promise_name, content)
self.configureLauncher()
self.configureLauncher(timeout=20)
with self.assertRaises(PromiseError):
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], True)
self.assertEqual(result['result']['message'],
"Free disk space low: remaining 269.1 G (threshold: 278.0 G)")
message = "Free disk space low: remaining 269.10 G (disk size: 417 G, threshold: 278 G)."
self.assertIn(message, result['result']['message'])
self.configureLauncher()
def test_display_partition(self):
content = """from slapos.promise.plugin.check_free_disk_space import RunPromise
extra_config_dict = {
'collectordb': '%(collectordb)s',
'test-check-date': '2017-10-02',
'threshold': '278',
'display-partition' : '1',
}
""" % {'collectordb': self.db_file}
self.writePromise(self.promise_name, content)
self.configureLauncher(timeout=20)
with self.assertRaises(PromiseError):
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], True)
self.assertEqual(result['result']['message'], "Free disk space low: remaining 269.1 G (threshold: 278.0 G)")
message = "Free disk space low: remaining 269.10 G (disk size: 417 G, threshold: 278 G). " \
"The partition slappart0 uses 83.48 G (date checked: 2017-10-02 09:17:00). " \
"The partition slappart2 uses 41.74 G (date checked: 2017-10-02 09:17:00). " \
"The partition slappart1 uses 20.87 G (date checked: 2017-10-02 09:17:00)."
self.assertIn(message, result['result']['message'])
def test_display_prediction(self):
content = """from slapos.promise.plugin.check_free_disk_space import RunPromise
extra_config_dict = {
'collectordb': '%(collectordb)s',
'test-check-date': '2017-10-02',
'display-prediction' : '1',
}
""" % {'collectordb': self.db_file}
self.writePromise(self.promise_name, content)
self.configureLauncher(timeout=20)
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertIn("Prediction:", result['result']['message'])
def test_check_free_disk_with_unicode_string_path(self):
# set path unicode
self.partition_dir = u'%s' % self.partition_dir
self.configureLauncher()
self.configureLauncher(timeout=20)
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(result['result']['message'], "Disk usage: OK")
self.assertIn("Current disk usage: OK", result['result']['message'])
if __name__ == '__main__':
unittest.main()
##############################################################################
#
# Copyright (c) 2018 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.test.promise.plugin import TestPromisePluginMixin
from slapos.grid.promise import PromiseError
import os
import sqlite3
import psutil
from slapos.grid.promise import PromiseError
class TestMonitorPartitionSpace(TestPromisePluginMixin):
def setUp(self):
TestPromisePluginMixin.setUp(self)
log_folder = os.path.join(self.partition_dir, 'var/log')
os.makedirs(log_folder)
# get disk partition name
disk_partition = ""
path = os.path.join(self.partition_dir, "") + "extrafolder"
partitions = psutil.disk_partitions()
while path is not '/':
if not disk_partition:
path = os.path.dirname(path)
else:
break
for p in partitions:
if p.mountpoint == path:
disk_partition = p.device
break
self.db_file = '/tmp/collector.db'
# populate db
self.conn = sqlite3.connect(self.db_file)
f = open(self.base_path+"/folder_disk_test.sql")
sql = f.read()
# replace every disk_partition_name string with disk partition name
sql = sql.replace('disk_partition_name', disk_partition)
self.conn.executescript(sql)
self.conn.close()
self.promise_name = "monitor-partition-space.py"
content = """from slapos.promise.plugin.monitor_partition_space import RunPromise
extra_config_dict = {
'collectordb': '%(collectordb)s',
'test-check-date': '2017-10-02',
}
""" % {'collectordb': self.db_file}
self.writePromise(self.promise_name, content)
def tearDown(self):
TestPromisePluginMixin.tearDown(self)
if os.path.exists(self.db_file):
os.remove(self.db_file)
def test_no_data_for_a_partition(self):
content = """from slapos.promise.plugin.monitor_partition_space import RunPromise
extra_config_dict = {
'collectordb': '%(collectordb)s',
'test-check-date': '2017-10-02',
'test-partition': 'slapuser1'
}
""" % {'collectordb': self.db_file}
self.writePromise(self.promise_name, content)
self.configureLauncher(timeout=20)
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(result['result']['message'],
"No result from collector database for the user slapuser1: skipped")
def test_no_recent_data(self):
content = """from slapos.promise.plugin.monitor_partition_space import RunPromise
extra_config_dict = {
'collectordb': '%(collectordb)s',
'test-check-date': '2017-10-04'
}
""" % {'collectordb': self.db_file}
self.writePromise(self.promise_name, content)
self.configureLauncher(force=True, timeout=20)
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(result['result']['message'], "Not enough recent data to detect anomalies: skipped")
def test_no_enough_data(self):
content = """from slapos.promise.plugin.monitor_partition_space import RunPromise
extra_config_dict = {
'collectordb': '%(collectordb)s',
'test-check-date': '2017-09-24'
}
""" % {'collectordb': self.db_file}
self.writePromise(self.promise_name, content)
self.configureLauncher(force=True, timeout=20)
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(result['result']['message'],
"Not enough data to detect anomalies: skipped")
def test_no_anomalies(self):
content = """from slapos.promise.plugin.monitor_partition_space import RunPromise
extra_config_dict = {
'collectordb': '%(collectordb)s',
'test-check-date': '2017-10-02',
'threshold-ratio': '0.70'
}
""" % {'collectordb': self.db_file}
self.writePromise(self.promise_name, content)
self.configureLauncher(force=True, timeout=20)
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertIn("No anomaly detected (last date checked: 2017-10-02 09:30:00)",
result['result']['message'])
def test_presence_of_anomalies(self):
self.configureLauncher(force=True, timeout=20)
with self.assertRaises(PromiseError):
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], True)
msg = "Anomaly detected on 2017-10-02 09:30:00. Space used by slapuser0: %.2f G."
self.assertIn(msg % (87533020.0/(1024*1024)), result['result']['message'])
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment