Commit 7b183636 authored by Jérome Perrin's avatar Jérome Perrin

pylint wip

parent 40d4407c
......@@ -9,7 +9,7 @@
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
......@@ -114,17 +114,17 @@ class HTTPCacheCheckerTestSuite(object):
"""Delete and create workgin directory
"""
if os.path.isdir(self.working_directory):
logging.debug('Re-creating folder:%r' % self.working_directory)
logging.debug('Re-creating folder:%r', self.working_directory)
shutil.rmtree(self.working_directory)
else:
logging.debug('Creating folder:%r' % self.working_directory)
logging.debug('Creating folder:%r', self.working_directory)
os.mkdir(self.working_directory)
def _runSpider(self):
"""Run wget in working_directory with headers in result
"""
wget_command_string = '%s %s %s' % (WGET, WGET_ARGS, self.root_url)
logging.debug('wget command:%r' % wget_command_string)
logging.debug('wget command:%r', wget_command_string)
wget_argument_list = shlex.split(wget_command_string)
wget_process = Popen(wget_argument_list, stdin=PIPE,
stdout=PIPE, stderr=STDOUT,
......@@ -132,7 +132,7 @@ class HTTPCacheCheckerTestSuite(object):
# in English
universal_newlines=True,
cwd=self.working_directory) # working directory
stdout, stderr = wget_process.communicate()
stdout, _ = wget_process.communicate()
return stdout
# def _fillCache(self):
......@@ -189,13 +189,13 @@ class HTTPCacheCheckerTestSuite(object):
re.compile('Content-Type:\s%s' % content_type_regex_str,
re.MULTILINE | re.IGNORECASE)
if content_type_regex.search(fetched_data) is not None:
for header, value in self.no_header_dict[section]:
for header, in self.no_header_dict[section]:
no_check_header_list.append(header)
continue
if url_regex_str_match is not None:
url_regex_str = url_regex_str_match.group(1)
if re.compile(url_regex_str).match(url) is not None:
for header, value in self.no_header_dict[section]:
for header, _ in self.no_header_dict[section]:
no_check_header_list.append(header)
return no_check_header_list
......@@ -311,16 +311,16 @@ class HTTPCacheCheckerTestSuite(object):
discarded = True
for line in wget_log_file.splitlines():
logging.debug('wget output:%r' % line)
logging.debug('wget output:%r', line)
code, value = self._parseWgetLine(line)
if code == self.URL_CODE:
# This is the first Line by URL checked in wget stdout
url = value
logging.debug('url:%r' % url)
logging.debug('url:%r', url)
discarded = False
if not first_pass and url in discarded_url_list:
# URL already checked during first pass
logging.debug('%r Discarded' % url)
logging.debug('%r Discarded', url)
discarded = True
elif self._isSameUrl(url):
discarded = True
......@@ -339,7 +339,7 @@ class HTTPCacheCheckerTestSuite(object):
self.report_dict.setdefault(url, []).append(message)
if code == self.FILE_PATH_CODE:
logging.debug('checking:%r' % value)
logging.debug('checking:%r', value)
# Here we check if Response was cached
file_path = os.path.sep.join((self.working_directory, value))
folder_path , filename = os.path.split(file_path)
......@@ -370,8 +370,8 @@ class HTTPCacheCheckerTestSuite(object):
try:
file_object = open(number_file_path, 'r')
except IOError:
logging.info('File not found for url:%r %r' %\
(url, (file_path, index_file_path, number_file_path),))
logging.info('File not found for url:%r %r',
url, (file_path, index_file_path, number_file_path),)
continue
fetched_data = file_object.read()
file_object.close()
......@@ -393,7 +393,7 @@ class HTTPCacheCheckerTestSuite(object):
if cached:
# This is a cached content with a positive hit value
# dot not check this url in second pass
logging.debug('will be discarded:%r' % url)
logging.debug('will be discarded:%r', url)
discarded_url_list.append(url)
else:
message = 'Not a cache hit! (got %s) (expect %s)' % (
......@@ -416,14 +416,13 @@ class HTTPCacheCheckerTestSuite(object):
return errors
"""
logging.info('#' * 52)
logging.info('date:%r' % datetime.now().isoformat())
logging.info('date:%r', datetime.now().isoformat())
logging.info('#' * 52)
self._initFolder()
logging.info('Starting pass:%r' % self.root_url)
logging.info('Starting pass:%r', self.root_url)
wget_log_file = self._runSpider()
x_varnish_reference_list, discarded_url_list =\
self._parseWgetLogs(wget_log_file,
self._parseWgetLogs(wget_log_file,
prohibited_file_name_list=prohibited_file_name_list,
prohibited_folder_name_list=prohibited_folder_name_list)
logging.info('End of second pass\n')
......
......@@ -3,11 +3,10 @@
import sys
import os
import psutil
import argparse
import logging
from six.moves import configparser
from slapos.grid.promise import PromiseLauncher, PromiseQueueResult, PromiseError
from slapos.grid.promise import PromiseLauncher, PromiseError
from slapos.grid.promise.generic import PROMISE_LOG_FOLDER_NAME
from slapos.util import mkdir_p
......
......@@ -94,7 +94,7 @@ def setupLogging(name=__name__, log_path=None):
logging.basicConfig(level=logging.INFO, format=logger_format)
root_logger = logging.getLogger('')
fd, fname = tempfile.mkstemp()
_, fname = tempfile.mkstemp()
file_handler = logging.FileHandler(fname)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
......@@ -146,7 +146,7 @@ class ScalabilityLauncher(object):
'runScalabilityTestSuite.log')
else:
log_path = None
logger, fname = setupLogging('runScalabilityTestSuite', log_path)
logger, _ = setupLogging('runScalabilityTestSuite', log_path)
self.log = logger.info
# Proxy to erp5 master test_result
......@@ -174,11 +174,11 @@ class ScalabilityLauncher(object):
def run(self):
self.log('Resiliency Launcher started, with:')
self.log('Test suite master url: %s' % self._argumentNamespace.test_suite_master_url)
self.log('Test suite: %s' % self._argumentNamespace.test_suite)
self.log('Test result path: %s' % self._argumentNamespace.test_result_path)
self.log('Revision: %s' % self._argumentNamespace.revision)
self.log('Node title: %s' % self._argumentNamespace.node_title)
self.log('Test suite master url: %s', self._argumentNamespace.test_suite_master_url)
self.log('Test suite: %s', self._argumentNamespace.test_suite)
self.log('Test result path: %s', self._argumentNamespace.test_result_path)
self.log('Revision: %s', self._argumentNamespace.revision)
self.log('Node title: %s', self._argumentNamespace.node_title)
while True:
time.sleep(5)
......
......@@ -66,7 +66,7 @@ class ERP5TestSuite(SlaprunnerTestSuite):
resource='getConnectionParameter/slappart0'
)
url = json.loads(json.loads(data)['_'])['family-default-v6']
self.logger.info('Retrieved erp5 url is:\n%s' % url)
self.logger.info('Retrieved erp5 url is:\n%s', url)
return url
def _getERP5Password(self):
......@@ -74,7 +74,7 @@ class ERP5TestSuite(SlaprunnerTestSuite):
resource='getConnectionParameter/slappart0'
)
password = json.loads(json.loads(data)['_'])['inituser-password']
self.logger.info('Retrieved erp5 password is:\n%s' % password)
self.logger.info('Retrieved erp5 password is:\n%s', password)
return password
def _getSlaprunnerServiceInformationList(self):
......@@ -141,7 +141,7 @@ class ERP5TestSuite(SlaprunnerTestSuite):
auth_handler,
HTTPSHandler(context=ssl_context)
)
self.logger.info('Calling ERP5 url %s' % url)
self.logger.info('Calling ERP5 url %s', url)
if data:
result = opener_director.open(url, data=data)
......@@ -170,8 +170,8 @@ class ERP5TestSuite(SlaprunnerTestSuite):
random.SystemRandom().sample(string.ascii_lowercase, 8)
)
self.slaprunner_user = 'slapos'
self.logger.info('Generated slaprunner user is: %s' % self.slaprunner_user)
self.logger.info('Generated slaprunner password is: %s' % self.slaprunner_password)
self.logger.info('Generated slaprunner user is: %s', self.slaprunner_user)
self.logger.info('Generated slaprunner password is: %s', self.slaprunner_password)
def pushDataOnMainInstance(self):
"""
......@@ -185,7 +185,7 @@ class ERP5TestSuite(SlaprunnerTestSuite):
self.logger.debug('Getting the backend URL...')
parameter_dict = self._getPartitionParameterDict()
self.slaprunner_backend_url = parameter_dict['backend-url']
self.logger.info('backend_url is %s.' % self.slaprunner_backend_url)
self.logger.info('backend_url is %s.', self.slaprunner_backend_url)
self.slaprunner_user = parameter_dict['init-user']
self.slaprunner_password = parameter_dict['init-password']
......@@ -209,7 +209,7 @@ class ERP5TestSuite(SlaprunnerTestSuite):
self._connectToSlaprunner('/startAllPartition')
self.logger.info('Waiting 30 seconds so that erp5 can be bootstrapped...')
for i in range(20):
for _ in range(20):
time.sleep(30)
try:
if "erp5" == self._getCreatedERP5SiteId():
......
......@@ -103,7 +103,7 @@ class GitlabTestSuite(SlaprunnerTestSuite):
resource='getConnectionParameter/slappart0'
)
password = json.loads(json.loads(data)['_'])['password']
self.logger.info('Retrieved gitlab root password is:\n%s' % password)
self.logger.info('Retrieved gitlab root password is:\n%s', password)
return password
def generateData(self):
......@@ -111,8 +111,8 @@ class GitlabTestSuite(SlaprunnerTestSuite):
random.SystemRandom().sample(string.ascii_lowercase, 8)
)
self.slaprunner_user = 'slapos'
self.logger.info('Generated slaprunner user is: %s' % self.slaprunner_user)
self.logger.info('Generated slaprunner password is: %s' % self.slaprunner_password)
self.logger.info('Generated slaprunner user is: %s', self.slaprunner_user)
self.logger.info('Generated slaprunner password is: %s', self.slaprunner_password)
def pushDataOnMainInstance(self):
"""
......@@ -127,7 +127,7 @@ class GitlabTestSuite(SlaprunnerTestSuite):
self.logger.debug('Getting the backend URL...')
parameter_dict = self._getPartitionParameterDict()
self.slaprunner_backend_url = parameter_dict['backend-url']
self.logger.info('backend_url is %s.' % self.slaprunner_backend_url)
self.logger.info('backend_url is %s.', self.slaprunner_backend_url)
self.slaprunner_user = parameter_dict['init-user']
self.slaprunner_password = parameter_dict['init-password']
......@@ -155,9 +155,9 @@ class GitlabTestSuite(SlaprunnerTestSuite):
self._deployInstance()
self._setGitlabConnectionParameter()
self.logger.info('Retrieved gitlab url is:\n%s' % self.backend_url)
self.logger.info('Gitlab root password is:\n%s' % self.password)
self.logger.info('Gitlab private token is:\n%s' % self.private_token)
self.logger.info('Retrieved gitlab url is:\n%s', self.backend_url)
self.logger.info('Gitlab root password is:\n%s', self.password)
self.logger.info('Gitlab private token is:\n%s', self.private_token)
self.logger.info('Waiting 90 seconds so that gitlab can be started...')
time.sleep(90)
......@@ -184,8 +184,8 @@ class GitlabTestSuite(SlaprunnerTestSuite):
for project in project_list:
self.default_project_list.append(project['name_with_namespace'])
self.logger.info('Gitlab project list is:\n%s' % self.default_project_list)
self.logger.info('Getting test file at url: %s' % self.file_uri)
self.logger.info('Gitlab project list is:\n%s', self.default_project_list)
self.logger.info('Getting test file at url: %s', self.file_uri)
self.sample_file = self._connectToGitlab(url=self.file_uri)
self.logger.info('Wait 10 minutes for main instance to have backup of gitlab...')
......
......@@ -43,7 +43,7 @@ def fetchKey(ip):
fail after XX (2?) hours.
"""
new_key = None
for i in range(0, 10):
for _ in range(0, 10):
try:
new_key = urlopen('http://%s:10080/get' % ip).read().strip()
break
......@@ -138,12 +138,12 @@ class KVMTestSuite(ResiliencyTestSuite):
Set a random key that will be stored inside of the virtual hard drive.
"""
self.key = ''.join(random.SystemRandom().sample(string.ascii_lowercase, 20))
self.logger.info('Generated key is: %s' % self.key)
self.logger.info('Generated key is: %s', self.key)
def pushDataOnMainInstance(self):
self.logger.info('Getting the KVM IP...')
self.ip = self._getPartitionParameterDict()['ipv6']
logger.info('KVM IP is %s.' % self.ip)
logger.info('KVM IP is %s.', self.ip)
for i in range(0, 60):
failure = False
......@@ -171,7 +171,7 @@ class KVMTestSuite(ResiliencyTestSuite):
)
new_key = fetchKey(self.ip)
logger.info('Key on this new instance is %s' % new_key)
logger.info('Key on this new instance is %s', new_key)
# Compare with original key. If same: success.
if new_key == self.key:
......
......@@ -77,8 +77,8 @@ class ResiliencyTestSuite(object):
Private method.
Make the specified clone instance takeover the main instance.
"""
self.logger.info('Replacing main instance by clone instance %s%s...' % (
self.namebase, target_clone))
self.logger.info('Replacing main instance by clone instance %s%s...',
self.namebase, target_clone)
root_partition_parameter_dict = self._getPartitionParameterDict()
takeover_url = root_partition_parameter_dict['takeover-%s-%s-url' % (namebase, target_clone)]
......@@ -87,7 +87,7 @@ class ResiliencyTestSuite(object):
# Do takeover
takeover_result = urlopen('%s?password=%s' % (takeover_url, takeover_password)).read()
if 'Error' in takeover_result:
raise Exception('Error while doing takeover: %s' % takeover_result)
raise Exception('Error while doing takeover: %s', takeover_result)
self.logger.info('Done.')
......@@ -110,7 +110,7 @@ class ResiliencyTestSuite(object):
"""
raise NotImplementedError('Overload me, I am an abstract method.')
def deleteTimestamp():
def deleteTimestamp(self):
"""
XXX-Nicolas delete .timestamp in test partition to force the full processing
by slapgrid, to force the good parameters to be passed to the instances of the tree
......@@ -144,14 +144,14 @@ class ResiliencyTestSuite(object):
self.logger.info('Waiting for new main instance to be ready...')
new_parameter_value = None
while not new_parameter_value or new_parameter_value == 'None' or new_parameter_value == old_parameter_value:
self.logger.info('Not ready yet. SlapOS says new parameter value is %s' % new_parameter_value)
self.logger.info('Not ready yet. SlapOS says new parameter value is %s', new_parameter_value)
new_parameter_value = self._getPartitionParameterDict().get(parameter_key, None)
time.sleep(30)
self.logger.info('New parameter value of instance is %s' % new_parameter_value)
self.logger.info('New parameter value of instance is %s', new_parameter_value)
return new_parameter_value
def _waitForCloneToBeReadyForTakeover(self, clone):
self.logger.info('Wait for Clone %s to be ready for takeover' % clone)
self.logger.info('Wait for Clone %s to be ready for takeover', clone)
root_partition_parameter_dict = self._getPartitionParameterDict()
takeover_url = root_partition_parameter_dict['takeover-%s-%s-url' % (self.namebase, clone)]
......@@ -170,10 +170,10 @@ class ResiliencyTestSuite(object):
"""
# Wait for XX minutes so that replication is done
self.logger.info(
'Sleeping for %s seconds before testing clone %s.' % (
'Sleeping for %s seconds before testing clone %s.',
self.sleep_time_between_test,
clone
))
)
time.sleep(self.sleep_time_between_test)
self._waitForCloneToBeReadyForTakeover(clone)
......@@ -182,7 +182,7 @@ class ResiliencyTestSuite(object):
if not self._testPromises():
return False
self.logger.info('Testing %s%s instance.' % (self.namebase, clone))
self.logger.info('Testing %s%s instance.', self.namebase, clone)
self._doTakeover(self.namebase, clone)
if self.test_type == UNIT_TEST_ERP5TESTNODE: # Run by classical erp5testnode using slapproxy
......
......@@ -31,7 +31,6 @@ from .resiliencytestsuite import ResiliencyTestSuite
import base64
from six.moves import http_cookiejar as cookielib
import json
from lxml import etree
import ssl
import time
from six.moves.urllib.request import HTTPCookieProcessor, HTTPSHandler, \
......@@ -123,7 +122,7 @@ class SlaprunnerTestSuite(ResiliencyTestSuite):
if json_data['code'] == 0:
raise IOError(json_data['result'])
data = json_data['result']
self.logger.info('Retrieved data are:\n%s' % data)
self.logger.info('Retrieved data are:\n%s', data)
except (ValueError, KeyError):
if data.find('<') != -1:
raise IOError(
......@@ -148,7 +147,7 @@ class SlaprunnerTestSuite(ResiliencyTestSuite):
data="filename=instance_root/../software.log&truncate=%s" % truncate)
try:
data = json.loads(data)['result']
self.logger.info('Tail of software.log:\n%s' % data)
self.logger.info('Tail of software.log:\n%s', data)
except (ValueError, KeyError):
self.logger.info("Fail to get software.log")
......@@ -165,7 +164,7 @@ class SlaprunnerTestSuite(ResiliencyTestSuite):
return self._connectToSlaprunner(resource='isSRReady')
except (NotHttpOkException, HTTPError) as error:
# The nginx frontend might timeout before software release is finished.
self.logger.warning('Problem occured when contacting the server: %s' % error)
self.logger.warning('Problem occured when contacting the server: %s', error)
return -1
status = getSRStatus()
......@@ -177,7 +176,7 @@ class SlaprunnerTestSuite(ResiliencyTestSuite):
else:
self.logger.info('Software release is still building. Sleeping...')
time.sleep(20)
for sleep_wait in range(3):
for _ in range(3):
self._retrieveSoftwareLogFileTail(truncate=100)
time.sleep(10)
......@@ -225,7 +224,7 @@ class SlaprunnerTestSuite(ResiliencyTestSuite):
self.logger.info('git-cloning ongoing, sleeping...')
def _openSoftwareRelease(self, software_release='erp5testnode/testsuite/dummy'):
self.logger.debug('Opening %s software release...' % software_release)
self.logger.debug('Opening %s software release...', software_release)
data = self._connectToSlaprunner(
resource='setCurrentProject',
data='path=workspace/slapos/software/%s/' % software_release
......@@ -249,7 +248,7 @@ class SlaprunnerTestSuite(ResiliencyTestSuite):
self.logger.debug('Getting the backend URL...')
parameter_dict = self._getPartitionParameterDict()
self.slaprunner_backend_url = parameter_dict['backend-url']
self.logger.info('backend_url is %s.' % self.slaprunner_backend_url)
self.logger.info('backend_url is %s.', self.slaprunner_backend_url)
self.slaprunner_user = parameter_dict['init-user']
self.slaprunner_password = parameter_dict['init-password']
......
from __future__ import division, print_function
import argparse
import itertools
import os
import re
import shutil
......@@ -57,7 +56,7 @@ def rsync(rsync_binary, source, destination, exclude_list=None, extra_args=None,
print(subprocess.check_output(arg_list))
except subprocess.CalledProcessError as e:
# All rsync errors are not to be considered as errors
allowed_error_message_list = \
allowed_error_message_regex = \
'^(file has vanished: |rsync warning: some files vanished before they could be transferred)'
if e.returncode != 24 or \
re.search(allowed_error_message_regex, e.output, re.M) is None:
......@@ -126,7 +125,7 @@ def getBackupFilesModifiedDuringExportList(config, export_start_date):
output = process.communicate(modified_files)[0]
retcode = process.poll()
if retcode:
raise CalledProcessError(retcode, rsync_arg_list[0], output=output)
raise subprocess.CalledProcessError(retcode, rsync_arg_list[0], output=output)
important_modified_file_list = output.splitlines()
not_important_modified_file_set = set(modified_files.splitlines()).difference(important_modified_file_list)
......@@ -141,9 +140,6 @@ def runExport():
args = parseArgumentList()
def _rsync(*params):
return rsync(args.rsync_binary, *params, dry=args.dry)
runner_working_path = os.path.join(args.srv_path, 'runner')
backup_runner_path = os.path.join(args.backup_path, 'runner')
......
......@@ -107,7 +107,7 @@ def writeSignatureFile(slappart_signature_method_dict, runner_working_path, sign
special_slappart_list = slappart_signature_method_dict.keys()
signature_list = []
for dirpath, dirname_list, filename_list in os.walk('.'):
for dirpath, _, filename_list in os.walk('.'):
if dirpath == '.' or not filename_list:
continue
......
......@@ -74,7 +74,5 @@ class TestMonitorCollect(unittest.TestCase):
self.assertEqual(2822535483392.0, data[2]['io_rw_counter'])
def tearDown(self):
os.remove("/tmp/collector.db")
if __name__ == '__main__':
unittest.main()
os.remove("/tmp/collector.db")
......@@ -140,8 +140,8 @@ class RunPromise(GenericPromise):
return arg_parser.parse_args(promise_cmd)
def test_promise_generic(self):
promise_file = self.generatePromiseScript('my_promise.py', success=True)
promise_file2 = self.generatePromiseScript('my_second_promise.py', success=True)
self.generatePromiseScript('my_promise.py', success=True)
self.generatePromiseScript('my_second_promise.py', success=True)
parser = self.getPromiseParser()
promise_runner = MonitorPromiseLauncher(parser)
promise_runner.start()
......@@ -179,7 +179,7 @@ class RunPromise(GenericPromise):
self.assertEqual(expected_result, second_result)
def test_promise_generic_failed(self):
promise_file = self.generatePromiseScript('my_promise.py', success=False)
self.generatePromiseScript('my_promise.py', success=False)
parser = self.getPromiseParser()
promise_runner = MonitorPromiseLauncher(parser)
promise_runner.start()
......@@ -201,7 +201,7 @@ class RunPromise(GenericPromise):
self.assertEqual(expected_result, my_result)
def test_promise_generic_status_change(self):
promise_file = self.generatePromiseScript('my_promise.py', success=False)
self.generatePromiseScript('my_promise.py', success=False)
parser = self.getPromiseParser()
promise_runner = MonitorPromiseLauncher(parser)
promise_runner.start()
......@@ -243,7 +243,7 @@ class RunPromise(GenericPromise):
self.assertEqual(expected_result, my_result)
def test_promise_generic_periodicity(self):
promise_file = self.generatePromiseScript('my_promise.py', success=False, periodicity=0.03)
self.generatePromiseScript('my_promise.py', success=False, periodicity=0.03)
parser = self.getPromiseParser()
promise_runner = MonitorPromiseLauncher(parser)
promise_runner.start()
......@@ -263,8 +263,8 @@ class RunPromise(GenericPromise):
self.assertTrue(os.path.exists(result_file))
def test_promise_generic_run_only(self):
promise_file = self.generatePromiseScript('my_promise.py', success=False)
second_promise = self.generatePromiseScript('my_second_promise.py', success=False)
self.generatePromiseScript('my_promise.py', success=False)
self.generatePromiseScript('my_second_promise.py', success=False)
parser = self.getPromiseParser()
promise_runner = MonitorPromiseLauncher(parser)
promise_runner.start()
......@@ -326,7 +326,7 @@ class RunPromise(GenericPromise):
self.assertTrue(os.path.exists(result2_file))
with open(result_file) as f:
result1 = json.load(f)
start_date = datetime.strptime(result1['result'].pop('date'), '%Y-%m-%dT%H:%M:%S+0000')
self.assertTrue(datetime.strptime(result1['result'].pop('date'), '%Y-%m-%dT%H:%M:%S+0000'))
expected_result = {
u'title': u'promise_1', u'name': u'promise_1',
......
......@@ -105,6 +105,3 @@ extra_config_dict = {{
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], True)
self.assertIn("Backup didn't start at correct time", result['result']['message'])
if __name__ == '__main__':
unittest.main()
......@@ -37,7 +37,6 @@ import datetime
import os
import shutil
import tempfile
import unittest
from slapos.util import bytes2str
......@@ -92,16 +91,16 @@ class TestCheckCertificate(TestPromisePluginMixin):
def createKeyCertificate(self, days=30):
key, key_pem = self.createKey()
certificate, certificate_pem = self.createCertificate(key, days)
_, certificate_pem = self.createCertificate(key, days)
with open(self.key_path, 'w') as fh:
fh.write(bytes2str(key_pem))
with open(self.certificate_path, 'a') as fh:
fh.write(bytes2str(certificate_pem))
def createKeyCertificateNotMatch(self):
key, key_pem = self.createKey()
another_key, another_key_pem = self.createKey()
certificate, certificate_pem = self.createCertificate(key)
key, _ = self.createKey()
_, another_key_pem = self.createKey()
_, certificate_pem = self.createCertificate(key)
with open(self.key_path, 'w') as fh:
fh.write(bytes2str(another_key_pem))
with open(self.certificate_path, 'a') as fh:
......@@ -240,7 +239,3 @@ class TestCheckCertificate(TestPromisePluginMixin):
class TestCheckCertificateSameFile(TestCheckCertificate):
same_file = True
if __name__ == '__main__':
unittest.main()
......@@ -199,5 +199,3 @@ extra_config_dict = {
self.assertEqual(result['result']['failed'], True)
self.assertEqual(result['result']['message'], "ERROR=11 (NOROUTE=0, UNREACHABLENET=11, TIMEOUT=0)")
if __name__ == '__main__':
unittest.main()
......@@ -104,6 +104,3 @@ extra_config_dict = {
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['message'], "ERROR: Site has 3 long request")
if __name__ == '__main__':
unittest.main()
......@@ -30,7 +30,6 @@ from slapos.test.promise.plugin import TestPromisePluginMixin
import tempfile
import os
import unittest
import shutil
import six
......@@ -212,6 +211,3 @@ extra_config_dict = {
"https://www.example.com/" % (filename,)
)
if __name__ == '__main__':
unittest.main()
......@@ -121,6 +121,3 @@ extra_config_dict = {
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(result['result']['message'], "Disk usage: OK")
if __name__ == '__main__':
unittest.main()
......@@ -28,7 +28,7 @@
from slapos.test.promise.plugin import TestPromisePluginMixin
from slapos.grid.promise import PromiseError
import os
import os.path
class TestCheckICMPPacketLost(TestPromisePluginMixin):
......@@ -153,6 +153,3 @@ extra_config_dict = {
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertTrue('packet_lost_ratio=100' in result['result']['message'])
if __name__ == '__main__':
unittest.main()
......@@ -89,6 +89,3 @@ BACKINGUP; OUT_OF_DATE=1, UP_TO_DATE=3; ltid=03d17cd516db69db (2019-08-02 13:09:
return
self.assertEqual(message, "No JSON object could be decoded")
if __name__ == '__main__':
unittest.main()
......@@ -69,6 +69,3 @@ from slapos.promise.plugin.check_partition_deployment_state import RunPromise
self.assertEqual(result['result']['message'], 'Buildout failed to process %s.' % self.partition_id)
with open(buildout_output) as f:
self.assertEqual(f.read(), message)
if __name__ == '__main__':
unittest.main()
......@@ -119,6 +119,3 @@ extra_config_dict = {
last_message = result['result']['message'].split('\n')[-1]
self.assertEqual(result['result']['failed'], True)
self.assertEqual(last_message, "FAILED: IPv4 reachable, IPv6 unreachable")
if __name__ == '__main__':
unittest.main()
......@@ -198,7 +198,7 @@ class CheckUrlAvailableMixin(TestPromisePluginMixin):
ip = SLAPOS_TEST_IPV4.decode('utf-8') \
if isinstance(SLAPOS_TEST_IPV4, bytes) \
else SLAPOS_TEST_IPV4
key, key_pem, csr, csr_pem = createCSR(
_, key_pem, csr, _ = createCSR(
u"testserver.example.com", ip)
_, cls.test_server_certificate_pem = cls.test_server_ca.signCSR(csr)
......
......@@ -89,6 +89,3 @@ extra_config_dict = {
self.assertEqual(result['result']['failed'], True)
message = "Monitor bootstrap exited with error.\n ---- Latest monitor-boostrap.log ----\nBuildout running..."
self.assertEqual(result['result']['message'], message)
if __name__ == '__main__':
unittest.main()
......@@ -25,10 +25,10 @@
#
##############################################################################
import os
import re
import unittest
import os.path
import psutil
from slapos.promise.apache_mpm_watchdog import watchServerStatus, \
from slapos.promise.apache_mpm_watchdog import \
loadJSONFile, writeJSONFile, getServerStatus, search_pid_regex
from . import data
......@@ -45,9 +45,7 @@ class TestApacheMPMWatchdog(unittest.TestCase):
self.assertEqual(['12345', '12346'],
re.findall(search_pid_regex, server_status))
def test_loadJSONFile(self):
self.assertEqual({},
loadJSONFile("couscous"))
......@@ -87,7 +85,3 @@ class TestApacheMPMWatchdog(unittest.TestCase):
self.assertNotEqual(None,
getServerStatus("https://www.erp5.com/", None, None))
if __name__ == '__main__':
unittest.main()
......@@ -106,7 +106,3 @@ class TestCheckApacheDigestResult(unittest.TestCase):
self._remove_file(self.today)
self._remove_file(self.yesterday)
shutil.rmtree('/tmp/ap')
if __name__ == '__main__':
unittest.main()
......@@ -103,6 +103,3 @@ class TestCheckSlowQueriesDigestResult(unittest.TestCase):
self._remove_file(self.yesterday)
shutil.rmtree('/tmp/ap')
if __name__ == '__main__':
unittest.main()
......@@ -111,6 +111,3 @@ class TestUserMemory(unittest.TestCase):
def tearDown(self):
if os.path.exists(self.db_file):
os.remove(self.db_file)
if __name__ == '__main__':
unittest.main()
......@@ -57,8 +57,3 @@ class TestLocalTcpPortOpened(unittest.TestCase):
self.assertEqual(isLocalTcpPortOpened("::1",port), True)
finally:
s.close()
if __name__ == '__main__':
unittest.main()
......@@ -7,10 +7,9 @@ import PyRSS2Gen as RSS2
from slapos.checkfeedaspromise import checkFeedAsPromise
class Option(dict):
def __init__(self, **kw):
self.__dict__.update(kw)
def __setitem__(i, y):
self.__dict__[i] = y
def __init__(self, **kw):
self.__dict__.update(kw)
class TestCheckFeedAsPromise(unittest.TestCase):
def getOptionObject(self, **kw):
......
......@@ -11,10 +11,8 @@ import six
from slapos.generatefeed import generateFeed
class Option(dict):
def __init__(self, **kw):
self.__dict__.update(kw)
def __setitem__(i, y):
self.__dict__[i] = y
def __init__(self, **kw):
self.__dict__.update(kw)
class TestGenerateFeed(unittest.TestCase):
def setUp(self):
......
......@@ -12,7 +12,7 @@ import mock
import slapos.runner.utils as runner_utils
import slapos.runner.views
import supervisor
sys.modules['slapos.runner.utils'].sup_process = mock.MagicMock()
......@@ -125,11 +125,11 @@ class TestRunnerBackEnd(unittest.TestCase):
self.sup_process.waitForProcessEnd.assert_called_once_with(config, process_name)
def test_runSoftwareWithLockMakesCorrectCallstoSupervisord(self):
self._runSlapgridWithLockMakesCorrectCallsToSupervisord(
self._runSlapgridWithLockMakesCorrectCallsToSupervisord( # pylint:disable=no-value-for-parameter
runner_utils.runSoftwareWithLock, 'slapgrid-sr')
def test_runInstanceWithLockMakesCorrectCallstoSupervisord(self):
self._runSlapgridWithLockMakesCorrectCallsToSupervisord(
self._runSlapgridWithLockMakesCorrectCallsToSupervisord( # pylint:disable=no-value-for-parameter
runner_utils.runInstanceWithLock, 'slapgrid-cp')
@mock.patch('os.path.exists')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment