Commit edd593b4 authored by Nicolas Wavrant's avatar Nicolas Wavrant

Webrunner tests and new tools for resiliency

Adds tests for the Webrunner, and updates its code to make it easier to test. These tests are still incomplete, but they have the merit of existing. I don't really like mocking function calls, but globally it's a better way than spawning a supervisord server and recreating the whole slaprunner's system tree.  

Also adds tools (= scripts) which will be usefull for the monitoring in slapos : 
  * generation of RSS feed from status items, and management of them.
  * script for creating promises based on RSS feeds 

/reviewed-on nexedi/slapos.toolbox!8
parents 92680783 ad11ab67
Pipeline #2126 skipped
...@@ -56,12 +56,16 @@ setup(name=name, ...@@ -56,12 +56,16 @@ setup(name=name,
'networkbench' : ['pycurl'], 'networkbench' : ['pycurl'],
'check_web_page_http_cache_hit' : ['pycurl'], # needed for check_web_page_http_cache_hit module 'check_web_page_http_cache_hit' : ['pycurl'], # needed for check_web_page_http_cache_hit module
}, },
tests_require = [
'mock',
],
zip_safe=False, # proxy depends on Flask, which has issues with zip_safe=False, # proxy depends on Flask, which has issues with
# accessing templates # accessing templates
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'agent = slapos.agent.agent:main', 'agent = slapos.agent.agent:main',
'check-web-page-http-cache-hit = slapos.promise.check_web_page_http_cache_hit:main', 'check-web-page-http-cache-hit = slapos.promise.check_web_page_http_cache_hit:main',
'check-feed-as-promise = slapos.checkfeedaspromise:main',
'clouddestroy = slapos.cloudmgr.destroy:main', 'clouddestroy = slapos.cloudmgr.destroy:main',
'cloudgetprivatekey = slapos.cloudmgr.getprivatekey:main', 'cloudgetprivatekey = slapos.cloudmgr.getprivatekey:main',
'cloudgetpubliciplist = slapos.cloudmgr.getpubliciplist:main', 'cloudgetpubliciplist = slapos.cloudmgr.getpubliciplist:main',
...@@ -70,6 +74,7 @@ setup(name=name, ...@@ -70,6 +74,7 @@ setup(name=name,
'cloudstart = slapos.cloudmgr.start:main', 'cloudstart = slapos.cloudmgr.start:main',
'cloudstop = slapos.cloudmgr.stop:main', 'cloudstop = slapos.cloudmgr.stop:main',
'equeue = slapos.equeue:main', 'equeue = slapos.equeue:main',
'generatefeed = slapos.generatefeed:main',
'htpasswd = slapos.htpasswd:main', 'htpasswd = slapos.htpasswd:main',
'is-local-tcp-port-opened = slapos.promise.is_local_tcp_port_opened:main', 'is-local-tcp-port-opened = slapos.promise.is_local_tcp_port_opened:main',
'is-process-older-than-dependency-set = slapos.promise.is_process_older_than_dependency_set:main', 'is-process-older-than-dependency-set = slapos.promise.is_process_older_than_dependency_set:main',
...@@ -89,6 +94,7 @@ setup(name=name, ...@@ -89,6 +94,7 @@ setup(name=name,
'pubsubnotifier = slapos.pubsub.notifier:main', 'pubsubnotifier = slapos.pubsub.notifier:main',
'pubsubserver = slapos.pubsub:main', 'pubsubserver = slapos.pubsub:main',
'qemu-qmp-client = slapos.qemuqmpclient:main', 'qemu-qmp-client = slapos.qemuqmpclient:main',
'rdiffbackup.genstatrss = slapos.resilient.rdiffBackupStat2RSS:main',
'slapos-kill = slapos.systool:kill', 'slapos-kill = slapos.systool:kill',
'slaprunnertest = slapos.runner.runnertest:main', 'slaprunnertest = slapos.runner.runnertest:main',
'slaprunnerteststandalone = slapos.runner.runnertest:runStandaloneUnitTest', 'slaprunnerteststandalone = slapos.runner.runnertest:runStandaloneUnitTest',
......
# Command line script to test a RSS feed in a promise
# Checks that a given pattern can be found (or not) in the title or the
# description of the latest feed item.
# A time buffer option can be given, to determine if the emitter process is in
# a stalled state, in the case that no OK pattern has been found
import argparse
import datetime
import feedparser
import sys
def parseArguments():
parser = argparse.ArgumentParser()
parser.add_argument('--feed-path', dest='feed_path',
help='Path or Url of the feed to search')
parser.add_argument('--title', dest='title', action='store_true',
help='Patterns should be looked for in feed item\'s title')
parser.add_argument('--description', dest='description', action='store_true',
help='Patterns should be looked for in feed item\'s description')
parser.add_argument('--ok-pattern', dest='ok_pattern_list', action='append',
default=[],
help='If this pattern is found, then promise succeeds')
parser.add_argument('--ko-pattern', dest='ko_pattern_list', action='append',
default=[],
help='If this pattern is found, then promise fails')
parser.add_argument('--time-buffer', dest='time_buffer', type=int,
default=0,
help='Time delta in seconds before the promise really succeeds or fails')
return parser.parse_args()
def containsPattern(string, pattern_list):
for pattern in pattern_list:
if string.find(pattern) >= 0:
return True
return False
def checkFeedAsPromise(feed, option):
feed = feedparser.parse(feed)
if feed.bozo:
return 'Feed malformed'
if len(feed.entries) == 0:
return ''
last_item = feed.entries[-1]
if option.title:
candidate_string = last_item.title
elif option.description:
candidate_string = last_item.description
else:
return 'At least one in [--title|--description] should be provided'
publication_date = datetime.datetime(*last_item.published_parsed[:7])
publication_age = datetime.datetime.now() - publication_date
time_buffer = datetime.timedelta(seconds=option.time_buffer)
ok_pattern_found = containsPattern(candidate_string, option.ok_pattern_list)
ko_pattern_found = containsPattern(candidate_string, option.ko_pattern_list)
if ok_pattern_found and ko_pattern_found:
return 'Both OK and KO patterns found: please check arguments'
# Expectations fulfilled
if ok_pattern_found:
return ''
if ko_pattern_found:
return 'KO pattern found'
if not ok_pattern_found:
if publication_age < time_buffer:
# We have to wait for buffer to expire
return ''
else:
# If time-buffer is out, we are in stalled state
return 'Stalled situation'
# If not ok, and not stalled, what can have possibly happen ?
return 'Something went wrong'
def main():
option = parseArguments()
result = checkFeedAsPromise(option.feed_path, option)
if len(result) > 0:
sys.exit(result)
else:
sys.exit(0)
if __name__ == '__main__':
main()
# Command-line script to generate a RSS feed from a bunch of well-formated
# JSON items.
# This script tries to be the more generic possible. The items used to generate
# the feed must be JSON-formatted (because of simplicity to read/write them),
# and their keys must follow the names of elements of items as described
# in the RSS2 specification :
# http://cyber.law.harvard.edu/rss/rss.html#hrelementsOfLtitemgt
import argparse
import collections
import datetime
import json
import os
import PyRSS2Gen as rss
def parseArguments():
parser = argparse.ArgumentParser()
parser.add_argument('--output', dest='output', type=str, required=True,
help='Path where to save the file')
parser.add_argument('--status-item-path', dest='status_item_path',
type=str, required=True,
help='Path where to find feed items')
parser.add_argument('--max-item', dest='max_item', type=int,
default=50, required=False,
help='Maximum number of items in the feed')
parser.add_argument('--title', dest='feed_title', type=str, required=True,
help='Title of the feed')
parser.add_argument('--link', dest='feed_link', type=str, required=True,
help='Link of the feed')
parser.add_argument('--description', dest='feed_description',
type=str, required=False,
help='Description of the feed')
option = parser.parse_args()
if not hasattr(option, 'feed_description'):
option.feed_description = option.feed_title
return option
def deleteFileList(file_list):
for file in file_list:
try:
os.unlink(file)
except OSError:
pass
def getRSSItemListFromItemDict(item_dict):
rss_item_list = []
for item in item_dict:
item_dict[item]['pubDate'] = datetime.datetime.fromtimestamp(item_dict[item]['pubDate'])
rss_item_list.append(rss.RSSItem(**item_dict[item]))
return rss_item_list
def generateFeed(option):
item_dict = {} # {file: content}
for filename in os.listdir(option.status_item_path):
file_path = os.path.join(option.status_item_path, filename)
with open(file_path, 'r') as fd:
item_dict[file_path] = json.load(fd)
sorted_item_dict = collections.OrderedDict(
sorted(item_dict.items(), key=lambda x: x[1]['pubDate']))
# Reduces feed if number of items exceeds max_item
if len(item_dict) > option.max_item:
outdated_key_list = sorted_item_dict.keys()[option.max_item:]
for outdated_key in outdated_key_list:
del sorted_item_dict[outdated_key]
deleteFileList(outdated_key_list)
# Generate feed
feed = rss.RSS2(
title=option.feed_title,
link=option.feed_link,
description=option.feed_description,
lastBuildDate = datetime.datetime.now(),
items = getRSSItemListFromItemDict(sorted_item_dict)
)
return feed.to_xml()
def main():
option = parseArguments()
feed = generateFeed(option)
open(option.output, 'w').write(feed)
if __name__ == "__main__":
main()
...@@ -3,7 +3,10 @@ ...@@ -3,7 +3,10 @@
import argparse import argparse
import csv import csv
import datetime
import json
import httplib import httplib
import os
import socket import socket
import subprocess import subprocess
import sys import sys
...@@ -13,6 +16,20 @@ import urllib2 ...@@ -13,6 +16,20 @@ import urllib2
import urlparse import urlparse
import uuid import uuid
def createStatusItem(item_directory, instance_name, callback, date, link, status):
global app
callback_short_name = os.path.basename(callback)
content = json.dumps({
'title': '%s-PBS %s : %s' % (instance_name, callback_short_name, status),
'description': '%s run at %s' % (callback_short_name, datetime.datetime.fromtimestamp(date).isoformat()),
'pubDate': date,
'link': link,
})
item_path = os.path.join(item_directory, "status_%s" % time.time())
with open(item_path, 'w') as file:
file.write(content)
def main(): def main():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
...@@ -32,8 +49,31 @@ def main(): ...@@ -32,8 +49,31 @@ def main():
type=int, required=False, type=int, required=False,
help="Additional parameter for notification-url") help="Additional parameter for notification-url")
# Verbose mode
parser.add_argument('--instance-root-name', dest='instance_root_name',
type=str, required=False,
help="Path to config file containing info on instance")
parser.add_argument('--log-url', required=False, dest='log_url',
help="URL where the log file will be accessible")
parser.add_argument('--status-item-directory', dest='status_item_directory',
required=False, default='', type=str,
help="Directory containing PBS status to publish as feed.")
args = parser.parse_args() args = parser.parse_args()
if args.instance_root_name and args.log_url and args.status_item_directory:
# Verbose mode
saveStatus = lambda status: createStatusItem(args.status_item_directory,
args.instance_root_name,
args.executable[0],
time.time(),
args.log_url,
status)
else:
saveStatus = lambda status: None
saveStatus('STARTED')
try: try:
content = subprocess.check_output( content = subprocess.check_output(
args.executable[0], args.executable[0],
...@@ -45,7 +85,9 @@ def main(): ...@@ -45,7 +85,9 @@ def main():
args.executable[0], args.executable[0],
content.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;') content.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
)) ))
saveStatus('FINISHED')
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
saveStatus('ERROR')
content = e.output content = e.output
exit_code = e.returncode exit_code = e.returncode
content = ("FAILURE</br><p>%s Failed with returncode <em>%d</em>.</p>" content = ("FAILURE</br><p>%s Failed with returncode <em>%d</em>.</p>"
...@@ -91,23 +133,30 @@ def main(): ...@@ -91,23 +133,30 @@ def main():
notification_path += str(transaction_id) notification_path += str(transaction_id)
headers = {'Content-Type': feed.info().getheader('Content-Type')} headers = {'Content-Type': feed.info().getheader('Content-Type')}
error_message = ""
try: try:
notification = httplib.HTTPConnection(notification_url.hostname, notification = httplib.HTTPConnection(notification_url.hostname,
notification_port) notification_port)
notification.request('POST', notification_path, body, headers) notification.request('POST', notification_path, body, headers)
response = notification.getresponse() response = notification.getresponse()
if not (200 <= response.status < 300): if not (200 <= response.status < 300):
sys.stderr.write("The remote server at %s didn't send a successful reponse.\n" % notif_url) error_message = ("The remote server at %s didn't send a successful reponse.\n"
sys.stderr.write("Its response was %r\n" % response.reason) "Its response was %r\n") % (notif_url, response.reason)
some_notification_failed = True some_notification_failed = True
except socket.error as exc: except socket.error as exc:
sys.stderr.write("Connection with remote server at %s failed:\n" % notif_url) error_message = "Connection with remote server at %s failed:\n" % notif_url
sys.stderr.write(traceback.format_exc(exc)) error_message.append(traceback.format_exc(exc))
some_notification_failed = True some_notification_failed = True
finally:
if error_message:
sys.stderr.write(error_message)
saveStatus(saveStatus('ERROR ON NOTIFYING : %s') % error_message)
if some_notification_failed: if some_notification_failed:
sys.exit(1) sys.exit(1)
saveStatus('OK')
if __name__ == '__main__': if __name__ == '__main__':
main() main()
import argparse
import datetime
import os
import re
import time
import PyRSS2Gen as RSS2
from collections import OrderedDict
def parseArguments():
"""
Parse arguments for rdiff-backup statistics Rss Generator.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--rdiff_backup_data_folder',
help='Path where to find rdiff-backup statistical files')
parser.add_argument('--output',
help='Path where to save the feed')
parser.add_argument('--feed_url',
help='Url of this feed file.')
return parser.parse_args()
def makeDictFromStatFile(text_content):
m = re.search("([a-zA-Z]*) ([0-9 :.]*) \(([a-zA-Z0-9 :.]*)\)", text_content)
if m:
return {'metric': m.group(1),
'value': m.group(2),
'human_readable_value': m.group(3)}
m = re.search("([a-zA-Z]*) ([0-9]*)", text_content)
if m:
return {'metric': m.group(1),
'value': m.group(2),
'human_readable_value': m.group(2)}
def getRSSItemFromDict(item, option):
description = "Metric;Value;Human Readable Value\n"
for entry in item:
description += "%s;%s;%s\n" % (entry['metric'], entry['value'], entry['human_readable_value'])
if entry['metric'] == "EndTime":
pubDate = datetime.datetime.fromtimestamp(float(entry['value']))
return RSS2.RSSItem(
title="Rdiff-Backup Transfer Statistics",
link=option.feed_url,
pubDate=pubDate,
description=description)
def genRSS(option):
"""
Read statistics file from rdiff-backup and generate a RSS feed entry from it
"""
stat_file_list = sorted([file for file in os.listdir(option.rdiff_backup_data_folder)
if file.startswith('session_statistics')])
item_dict = OrderedDict()
for stat_file in stat_file_list:
with open(os.path.join(option.rdiff_backup_data_folder, stat_file), 'r') as file:
item_dict[stat_file] = [makeDictFromStatFile(line.strip()) for line in file.readlines()]
title="Rdiff-Backup Statistics"
rss_feed = RSS2.RSS2(
title=title,
link=option.feed_url,
description=title,
items = [getRSSItemFromDict(item_dict[item], option) for item in item_dict])
return rss_feed.to_xml()
def main():
option = parseArguments()
feed = genRSS(option)
with open(option.output, 'w') as rss_file:
rss_file.write(feed)
exit(0)
...@@ -44,7 +44,7 @@ def runProcess(config, process): ...@@ -44,7 +44,7 @@ def runProcess(config, process):
Start a process registered by supervisor Start a process registered by supervisor
""" """
server = xmlrpclib.Server(config['supervisord_server']) server = xmlrpclib.Server(config['supervisord_server'])
server.supervisor.startProcess(process) return server.supervisor.startProcess(process)
def runProcesses(config, processes): def runProcesses(config, processes):
...@@ -63,16 +63,21 @@ def stopProcess(config, process): ...@@ -63,16 +63,21 @@ def stopProcess(config, process):
""" """
if isRunning(config, process): if isRunning(config, process):
server = xmlrpclib.Server(config['supervisord_server']) server = xmlrpclib.Server(config['supervisord_server'])
server.supervisor.stopProcess(process) return server.supervisor.stopProcess(process)
def stopProcesses(config, processes): def stopProcesses(config, processes):
""" """
Stop a list of processes Stop a list of processes.
Returns True if all the processes have ended correctly.
Returns False if at least one process didn't stop correctly.
""" """
server = xmlrpclib.Server(config['supervisord_server']) server = xmlrpclib.Server(config['supervisord_server'])
return_status_list = []
for proc in processes: for proc in processes:
server.supervisor.stopProcess(proc) return_status_list.append(server.supervisor.stopProcess(proc))
return len(return_status_list) == sum(return_status_list)
def waitForProcessEnd(config, process): def waitForProcessEnd(config, process):
......
This diff is collapsed.
...@@ -25,7 +25,7 @@ from slapos.runner.utils import (checkSoftwareFolder, configNewSR, checkUserCred ...@@ -25,7 +25,7 @@ from slapos.runner.utils import (checkSoftwareFolder, configNewSR, checkUserCred
isSoftwareRunning, isSoftwareReleaseReady, isText, isSoftwareRunning, isSoftwareReleaseReady, isText,
loadSoftwareRList, md5sum, newSoftware, loadSoftwareRList, md5sum, newSoftware,
readFileFrom, readParameters, realpath, readFileFrom, readParameters, realpath,
removeInstanceRoot, removeProxyDb, removeCurrentInstance,
removeSoftwareByName, runSlapgridUntilSuccess, removeSoftwareByName, runSlapgridUntilSuccess,
saveBuildAndRunParams, saveBuildAndRunParams,
setMiniShellHistory, setMiniShellHistory,
...@@ -216,17 +216,9 @@ def supervisordStatus(): ...@@ -216,17 +216,9 @@ def supervisordStatus():
def removeInstance(): def removeInstance():
if isInstanceRunning(app.config): result = removeCurrentInstance(app.config)
flash('Instantiation in progress, cannot remove') if isinstance(result, str):
else: flash(result)
removeProxyDb(app.config)
stopProxy(app.config)
svcStopAll(app.config) # Stop All instance process
removeInstanceRoot(app.config)
param_path = os.path.join(app.config['etc_dir'], ".parameter.xml")
if os.path.exists(param_path):
os.remove(param_path)
flash('Instance removed')
return redirect(url_for('inspectInstance')) return redirect(url_for('inspectInstance'))
...@@ -375,12 +367,9 @@ def removeFile(): ...@@ -375,12 +367,9 @@ def removeFile():
def removeSoftwareDir(): def removeSoftwareDir():
try: status, message = removeSoftwareByName(app.config, request.form['md5'],
data = removeSoftwareByName(app.config, request.form['md5'],
request.form['title']) request.form['title'])
return jsonify(code=1, result=data) return jsonify(code=status, result=message)
except Exception as e:
return jsonify(code=0, result=str(e))
#read file and return content to ajax #read file and return content to ajax
......
import datetime
import feedparser
import time
import unittest
import PyRSS2Gen as RSS2
from slapos.checkfeedaspromise import checkFeedAsPromise
class Option(dict):
def __init__(self, **kw):
self.__dict__.update(kw)
def __setitem__(i, y):
self.__dict__[i] = y
class TestCheckFeedAsPromise(unittest.TestCase):
def getOptionObject(self, **kw):
"""
Returns an object containing options as properties, to simulate a call
to the tested script
"""
option = {
'title': False,
'description': False,
'time_buffer': 0,
'ok_pattern_list': [],
'ko_pattern_list': [],
}
option.update(kw)
return Option(**option)
def generateFeed(self, item_list):
return RSS2.RSS2(
title="Feed Title",
link="http://exemple.com",
description="Feed Description",
items=[RSS2.RSSItem(**item) for item in item_list]
).to_xml()
def generateOKFeed(self, extra_item_list=None):
item_list = [{
'title': 'Doing Something',
'description': 'work work work',
'pubDate': datetime.datetime.now(),
}, {
'title': 'Something Finished: OK',
'description': 'OK FINISHED DONE BASTA',
'pubDate': datetime.datetime.now(),
}]
if isinstance(extra_item_list, list):
item_list.append(extra_item_list)
return self.generateFeed(item_list)
def generateKOFeed(self, extra_item_list=None):
item_list = [{
'title': 'Doing Something',
'description': 'work work work',
'pubDate': datetime.datetime.now(),
}, {
'title': 'Something Finished: Error',
'description': 'FAILURE oops Arghh',
'pubDate': datetime.datetime.now(),
}]
if isinstance(extra_item_list, list):
item_list.extend(extra_item_list)
return self.generateFeed(item_list)
def test_ifOKFoundNoErrorReturned(self):
option = self.getOptionObject()
option.title = True
feed = self.generateOKFeed()
option.ok_pattern_list = ['OK']
self.assertEquals(checkFeedAsPromise(feed, option), "")
option.title, option.description = False, True
option.ok_pattern_list = ['DONE', 'OK']
self.assertEquals(checkFeedAsPromise(feed, option), "")
def test_ifKOFoundErrorReturned(self):
option = self.getOptionObject()
option.title = True
feed = self.generateKOFeed()
option.ko_pattern_list = ['Error']
self.assertNotEquals(checkFeedAsPromise(feed, option), "")
option.title, option.description = False, True
option.ko_pattern_list = ['FAILURE', 'Error']
self.assertNotEquals(checkFeedAsPromise(feed, option), "")
def test_ifNoOKPatternFoundErrorIsRaised(self):
option = self.getOptionObject()
option.title = True
feed = self.generateKOFeed()
# If no time buffer, then not OK is always wrong
option.ok_pattern_list = ['OK']
self.assertNotEquals(len(checkFeedAsPromise(feed, option)), 0)
# if time buffer, then not OK is wrong only after buffer expires
extra_item = {
'title': 'Something is Starting',
'description': 'Very long operation, but should last less than 1h',
'pubDate': datetime.datetime.now() - datetime.timedelta(seconds=3600),
}
feed = self.generateKOFeed([extra_item,])
option.time_buffer = 4000
# buffer longer than last item's age
self.assertEquals(checkFeedAsPromise(feed, option), "")
# shorter buffer, we want to raise an error
option.time_buffer = 1800
self.assertNotEquals(len(checkFeedAsPromise(feed, option)), 0)
def test_noItemInTheFeedIsNotAnError(self):
option = self.getOptionObject()
option.title = True
feed = self.generateFeed([])
self.assertEquals(checkFeedAsPromise(feed, option), "")
if __name__ == '__main__':
unittest.main()
import collections
import datetime
import feedparser
import json
import os
import shutil
import tempfile
import time
import unittest
from slapos.generatefeed import generateFeed
class Option(dict):
def __init__(self, **kw):
self.__dict__.update(kw)
def __setitem__(i, y):
self.__dict__[i] = y
class TestGenerateFeed(unittest.TestCase):
def setUp(self):
self.item_directory = tempfile.mkdtemp(dir='.')
self.feed_path = os.path.join(self.item_directory, 'path')
def tearDown(self):
shutil.rmtree(self.item_directory)
def getOptionObject(self, **kw):
"""
Returns an object containing options as properties, to simulate a call
to the tested script
"""
option = {
'output': self.feed_path,
'status_item_path': self.item_directory,
'max_item': 50,
'feed_title': 'Feed title',
'feed_link': 'http://example.com',
'feed_description': 'Feed description',
}
option.update(kw)
return Option(**option)
def saveAsStatusItem(self, filename, content):
"""
Save a JSON at filename in self.item_directory as a status item
"""
path = os.path.join(self.item_directory, filename)
with open(path, 'w') as status_file:
status_file.write(json.dumps(content))
def createItemSample(self):
"""
Populate item_directory with a few sample items
"""
item = [
# Last in alphabet, first in pubDate
('zzz.item',
{'description': 'description is OK too',
'link': "http://example.com",
'pubDate': time.mktime(datetime.datetime(2000, 1, 1).timetuple()),
'title': 'everything is OK',
}),
# First in pubDate, last in alphabet
('aaa.item',
{'description': 'what went wrong ?',
'link': "http://example.com",
'pubDate': time.mktime(datetime.datetime(2000, 12, 31).timetuple()),
'title': 'I guess we have an ERROR',
}),
]
for filename, content in item:
self.saveAsStatusItem(filename, content)
def test_feedItemsAreSortedByDate(self):
self.createItemSample()
option = self.getOptionObject()
content_feed = generateFeed(option)
feed = feedparser.parse(content_feed)
self.assertFalse(feed.bozo)
start_date = None
for item in feed.entries:
if start_date is None:
start_date = item.published_parsed
self.assertLessEqual(start_date, item.published_parsed)
def test_generateFeedCleanStatusDirectoryIfTooManyItems(self):
option = self.getOptionObject()
option.max_item = 10
# Creates items more than allowed
item_dummy_content = {
'description': 'dummy description',
'link': "http://example.com",
'pubDate': time.mktime(datetime.datetime.now().timetuple()),
'title': 'dummy title',
}
for i in range(15):
filename = '%s.item' % i
self.saveAsStatusItem(filename, item_dummy_content)
content_feed = generateFeed(option)
feed = feedparser.parse(content_feed)
self.assertFalse(feed.bozo)
# Feed entries number should be limited
self.assertEqual(len(feed.entries), option.max_item)
# Status item directory should have been cleaned
self.assertEqual(len(os.listdir(self.item_directory)), option.max_item)
if __name__ == '__main__':
unittest.main()
import csv
import feedparser
import os
import shutil
import tempfile
import unittest
from slapos.resilient.rdiffBackupStat2RSS import genRSS
class Option(dict):
def __init__(self, **kw):
self.__dict__.update(kw)
def __setitem__(i, y):
self.__dict__[i] = y
class TestRdiffBackupStat2RSS(unittest.TestCase):
def setUp(self):
self.data_directory = tempfile.mkdtemp(dir='.')
self.feed_path = os.path.join(self.data_directory)
def tearDown(self):
shutil.rmtree(self.data_directory)
def getOptionObject(self, **kw):
"""
Returns an object containing options as properties, to simulate a call
to the tested script
"""
option = {
'rdiff_backup_data_folder': self.data_directory,
'output': self.feed_path,
'feed_url': 'http://exemple.com',
}
option.update(kw)
return Option(**option)
def createSample(self):
"""
Writes 2 statistics file in rdiff-backup format
"""
with open(os.path.join(self.data_directory, 'session_statistics_1'), 'w') as stat_file:
stat_file.write("""\
StartTime 1473339659.00 (Thu Sep 8 15:00:59 2016)
EndTime 1473339667.81 (Thu Sep 8 15:01:07 2016)
ElapsedTime 8.81 (8.81 seconds)
SourceFiles 2381
SourceFileSize 142096473 (136 MB)
MirrorFiles 1
MirrorFileSize 0 (0 bytes)
NewFiles 2380
NewFileSize 142096473 (136 MB)
DeletedFiles 0
DeletedFileSize 0 (0 bytes)
ChangedFiles 1
ChangedSourceSize 0 (0 bytes)
ChangedMirrorSize 0 (0 bytes)
IncrementFiles 0
IncrementFileSize 0 (0 bytes)
TotalDestinationSizeChange 142096473 (136 MB)
Errors 0""")
with open(os.path.join(self.data_directory, 'session_statistics_2'), 'w') as stat_file:
stat_file.write("""\
StartTime 1473340154.00 (Thu Sep 8 15:09:14 2016)
EndTime 1473340154.95 (Thu Sep 8 15:09:14 2016)
ElapsedTime 0.95 (0.95 seconds)
SourceFiles 2381
SourceFileSize 142096473 (136 MB)
MirrorFiles 2381
MirrorFileSize 142096473 (136 MB)
NewFiles 0
NewFileSize 0 (0 bytes)
DeletedFiles 0
DeletedFileSize 0 (0 bytes)
ChangedFiles 15
ChangedSourceSize 230112 (225 KB)
ChangedMirrorSize 230112 (225 KB)
IncrementFiles 15
IncrementFileSize 2122 (2.07 KB)
TotalDestinationSizeChange 2122 (2.07 KB)
Errors 0""")
def test_generatedRSSIsCorrect(self):
self.createSample()
option = self.getOptionObject()
feed_content = genRSS(option)
feed = feedparser.parse(feed_content)
self.assertFalse(feed.bozo)
self.assertTrue(len(feed.entries), 2)
self.assertLess(feed.entries[0].published_parsed, feed.entries[1].published_parsed)
if __name__ == '__main__':
unittest.main()
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment