Commit e497ca6c authored by Benjamin Blanc's avatar Benjamin Blanc Committed by Sebastien Robin

testnode: Modify testnode structure to include scalability tests.

Testnode code has been splitted. Runner class has been created for
each test types (unit and scalability).
Communication with SlapOS and ERP5 Master has been added.
parent b723ac57
from unittest import TestCase
from erp5.util.testnode.testnode import TestNode
from erp5.util.testnode.testnode import SlapOSInstance
from erp5.util.testnode.NodeTestSuite import SlapOSInstance, NodeTestSuite
from erp5.util.testnode.ProcessManager import ProcessManager, SubprocessError
from erp5.util.testnode.Updater import Updater
from erp5.util.testnode.SlapOSMasterCommunicator import SlapOSMasterCommunicator
from erp5.util.testnode.SlapOSControler import SlapOSControler
from erp5.util.testnode.UnitTestRunner import UnitTestRunner
from erp5.util.testnode.ScalabilityTestRunner import ScalabilityTestRunner
from erp5.util.testnode.SlapOSControler import createFolder
from erp5.util.taskdistribution import TaskDistributor
from erp5.util.taskdistribution import TaskDistributionTool
from erp5.util.taskdistribution import TestResultProxy
......@@ -25,6 +29,7 @@ class ERP5TestNode(TestCase):
self._temp_dir = tempfile.mkdtemp()
self.working_directory = os.path.join(self._temp_dir, 'testnode')
self.slapos_directory = os.path.join(self._temp_dir, 'slapos')
self.software_directory = os.path.join(self._temp_dir, 'software_directory')
self.test_suite_directory = os.path.join(self._temp_dir,'test_suite')
self.environment = os.path.join(self._temp_dir,'environment')
self.log_directory = os.path.join(self._temp_dir,'var/log/testnode')
......@@ -36,6 +41,7 @@ class ERP5TestNode(TestCase):
os.mkdir(self.working_directory)
os.mkdir(self.slapos_directory)
os.mkdir(self.test_suite_directory)
os.mkdir(self.software_directory)
os.mkdir(self.environment)
os.mkdir(self.system_temp_folder)
os.makedirs(self.log_directory)
......@@ -45,9 +51,17 @@ class ERP5TestNode(TestCase):
os.mkdir(self.remote_repository2)
def log(*args,**kw):
for arg in args:
print "TESTNODE LOG : %r" % (arg,)
print "TESTNODE LOG : %r, %r" % (arg, kw)
self.log = log
def returnGoodClassRunner(self, test_type):
if test_type == 'UnitTest':
return UnitTestRunner
elif test_type == 'ScalabilityTest':
return ScalabilityTestRunner
else:
raise NotImplementedError
def tearDown(self):
shutil.rmtree(self._temp_dir, True)
......@@ -55,15 +69,23 @@ class ERP5TestNode(TestCase):
# XXX how to get property the git path ?
config = {}
config["git_binary"] = "git"
config["slapos_directory"] = config["working_directory"] = self.working_directory
config["slapos_directory"] = self.slapos_directory
config["working_directory"] = self.working_directory
config["software_directory"] = self.software_directory
config["node_quantity"] = 3
config["test_suite_directory"] = self.test_suite_directory
config["environment"] = self.environment
config["log_directory"] = self.log_directory
config["log_file"] = self.log_file
config["test_suite_master_url"] = None
config["hateoas_slapos_master_url"] = None
config["test_node_title"] = "Foo-Test-Node"
config["system_temp_folder"] = self.system_temp_folder
config["computer_id"] = "COMP-TEST"
config["server_url"] = "http://foo.bar"
config["httpd_ip"] = "ff:ff:ff:ff:ff:ff:ff:ff"
config["httpd_software_access_port"] = "9080"
return TestNode(self.log, config)
def getTestSuiteData(self, add_third_repository=False, reference="foo"):
......@@ -91,6 +113,9 @@ class ERP5TestNode(TestCase):
def updateNodeTestSuiteData(self, node_test_suite,
add_third_repository=False):
"""
Update from zero/Regenerate the testsuite
"""
node_test_suite.edit(working_directory=self.working_directory,
**self.getTestSuiteData(add_third_repository=add_third_repository)[0])
......@@ -143,7 +168,7 @@ class ERP5TestNode(TestCase):
# ['4f1d14de1b04b4f878a442ee859791fa337bcf85', 'first_commit']]}
return commit_dict
def test_01_getDelNodeTestSuite(self):
def test_01_getDelNodeTestSuite(self, my_test_type='UnitTest'):
"""
We should be able to get/delete NodeTestSuite objects inside test_node
"""
......@@ -156,7 +181,7 @@ class ERP5TestNode(TestCase):
node_test_suite = test_node.getNodeTestSuite('foo')
self.assertEquals(0, node_test_suite.retry_software_count)
def test_02_NodeTestSuiteWorkingDirectory(self):
def test_02_NodeTestSuiteWorkingDirectory(self, my_test_type='UnitTest'):
"""
Make sure we extend the working path with the node_test_suite reference
"""
......@@ -168,7 +193,7 @@ class ERP5TestNode(TestCase):
self.assertEquals("%s/foo/test_suite" % self.working_directory,
node_test_suite.test_suite_directory)
def test_03_NodeTestSuiteCheckDataAfterEdit(self):
def test_03_NodeTestSuiteCheckDataAfterEdit(self, my_test_type='UnitTest'):
"""
When a NodeTestSuite instance is edited, the method _checkData
analyse properties and add new ones
......@@ -184,17 +209,21 @@ class ERP5TestNode(TestCase):
"%s/rep1" % node_test_suite.working_directory]
self.assertEquals(expected_list, repository_path_list)
def test_04_constructProfile(self):
def test_04_constructProfile(self, my_test_type='UnitTest'):
"""
Check if the software profile is correctly generated
"""
test_node = self.getTestNode()
test_node.test_suite_portal = TaskDistributor
test_node.test_suite_portal.getTestNode = TaskDistributor.getTestType
node_test_suite = test_node.getNodeTestSuite('foo')
self.updateNodeTestSuiteData(node_test_suite, add_third_repository=True)
test_node.constructProfile(node_test_suite)
node_test_suite.revision = 'rep1=1234-azerty,rep2=3456-qwerty'
test_node.constructProfile(node_test_suite,my_test_type)
self.assertEquals("%s/software.cfg" % (node_test_suite.working_directory,),
node_test_suite.custom_profile_path)
profile = open(node_test_suite.custom_profile_path, 'r')
if my_test_type=='UnitTest':
expected_profile = """
[buildout]
extends = %(temp_dir)s/testnode/foo/rep0/software.cfg
......@@ -207,10 +236,27 @@ branch = master
repository = %(temp_dir)s/testnode/foo/rep2
branch = foo
""" % {'temp_dir': self._temp_dir}
else:
revision1 = "azerty"
revision2 = "qwerty"
expected_profile = """
[buildout]
extends = %(temp_dir)s/testnode/foo/rep0/software.cfg
[rep1]
repository = <obfuscated_url>/rep1/rep1.git
revision = %(revision1)s
ignore-ssl-certificate = true
[rep2]
repository = <obfuscated_url>/rep2/rep2.git
revision = %(revision2)s
ignore-ssl-certificate = true
""" % {'temp_dir': self._temp_dir, 'revision1': revision1, 'revision2': revision2}
self.assertEquals(expected_profile, profile.read())
profile.close()
def test_05_getAndUpdateFullRevisionList(self):
def test_05_getAndUpdateFullRevisionList(self, my_test_type='UnitTest'):
"""
Check if we clone correctly repositories and get right revisions
"""
......@@ -234,7 +280,7 @@ branch = foo
for vcs_repository in node_test_suite.vcs_repository_list:
self.assertTrue(os.path.exists(vcs_repository['repository_path']))
def test_05b_changeRepositoryBranch(self):
def test_05b_changeRepositoryBranch(self, my_test_type='UnitTest'):
"""
It could happen that the branch is changed for a repository. Testnode must
be able to reset correctly the branch
......@@ -307,7 +353,7 @@ branch = foo
finally:
Updater.deleteRepository = original_deleteRepository
def test_06_checkRevision(self):
def test_06_checkRevision(self, my_test_type='UnitTest'):
"""
Check if we are able to restore older commit hash if master decide so
"""
......@@ -344,7 +390,7 @@ branch = foo
self.assertEquals([commit_dict['rep0'][0][0],commit_dict['rep1'][1][0]],
getRepInfo(hash=1))
def test_07_checkExistingTestSuite(self):
def test_07_checkExistingTestSuite(self, my_test_type='UnitTest'):
test_node = self.getTestNode()
test_suite_data = self.getTestSuiteData(add_third_repository=True)
self.assertEquals([], os.listdir(self.working_directory))
......@@ -360,7 +406,7 @@ branch = foo
test_node.checkOldTestSuite(test_suite_data)
self.assertEquals(['foo'], os.listdir(self.working_directory))
def test_08_getSupportedParamaterSet(self):
def test_08_getSupportedParamaterSet(self, my_test_type='UnitTest'):
original_spawn = ProcessManager.spawn
try:
def get_help(self, *args, **kw):
......@@ -377,7 +423,7 @@ branch = foo
finally:
ProcessManager.spawn = original_spawn
def test_09_runTestSuite(self):
def test_09_runTestSuite(self, my_test_type='UnitTest'):
"""
Check parameters passed to runTestSuite
Also make sure that --firefox_bin and --xvfb_bin are passed when needed
......@@ -385,36 +431,46 @@ branch = foo
original_getSupportedParameter = ProcessManager.getSupportedParameterSet
original_spawn = ProcessManager.spawn
try:
# Create a file
def _createPath(path_to_create, end_path):
os.makedirs(path_to_create)
return os.close(os.open(os.path.join(path_to_create,
end_path),os.O_CREAT))
def get_parameters(self, *args, **kw):
call_parameter_list.append({'args': [x for x in args], 'kw':kw})
def patch_getSupportedParameterSet(self, run_test_suite_path, parameter_list,):
if '--firefox_bin' and '--xvfb_bin' in parameter_list:
return set(['--firefox_bin','--xvfb_bin'])
else:
return []
test_node = self.getTestNode()
test_node.slapos_controler = SlapOSControler(self.working_directory,
test_node.config, self.log)
RunnerClass = self.returnGoodClassRunner(my_test_type)
runner = RunnerClass(test_node)
# Create and initialise/regenerate a nodetestsuite
node_test_suite = test_node.getNodeTestSuite('foo')
self.updateNodeTestSuiteData(node_test_suite)
node_test_suite.revision = 'dummy'
# Path to the dummy runable
run_test_suite_path = _createPath(
os.path.join(test_node.slapos_controler.instance_root,'a/bin'),'runTestSuite')
os.path.join(runner.slapos_controler.instance_root,'a/bin'),'runTestSuite')
def checkRunTestSuiteParameters(additional_parameter_list=None):
ProcessManager.getSupportedParameterSet = patch_getSupportedParameterSet
ProcessManager.spawn = get_parameters
test_node.runTestSuite(node_test_suite,"http://foo.bar")
RunnerClass = self.returnGoodClassRunner(my_test_type)
runner = RunnerClass(test_node)
runner.runTestSuite(node_test_suite,"http://foo.bar")
expected_parameter_list = ['%s/a/bin/runTestSuite'
%(test_node.slapos_controler.instance_root), '--test_suite', 'Foo', '--revision',
%(runner.slapos_controler.instance_root), '--test_suite', 'Foo', '--revision',
'dummy', '--test_suite_title', 'Foo-Test', '--node_quantity', 3, '--master_url',
'http://foo.bar']
if additional_parameter_list:
expected_parameter_list.extend(additional_parameter_list)
self.assertEqual(call_parameter_list[0]['args'], expected_parameter_list)
call_parameter_list = []
checkRunTestSuiteParameters()
_createPath(os.path.join(test_node.config['slapos_directory'], 'soft/a/parts/firefox'),'firefox-slapos')
......@@ -430,9 +486,11 @@ branch = foo
ProcessManager.getSupportedParameterSet = original_getSupportedParameter
ProcessManager.spawn = original_spawn
def test_10_prepareSlapOS(self):
def test_10_prepareSlapOS(self, my_test_type='UnitTest'):
test_node = self.getTestNode()
test_node_slapos = SlapOSInstance()
RunnerClass = self.returnGoodClassRunner(my_test_type)
runner = RunnerClass(test_node)
node_test_suite = test_node.getNodeTestSuite('foo')
node_test_suite.edit(working_directory=self.working_directory)
status_dict = {"status_code" : 0}
......@@ -448,39 +506,72 @@ branch = foo
"args": [x for x in args],
"kw": kw})
return {"status_code": self.status_code}
SlapOSControler.initializeSlapOSControler = Patch("initializeSlapOSControler")
SlapOSControler.runSoftwareRelease = Patch("runSoftwareRelease")
SlapOSControler.runComputerPartition = Patch("runComputerPartition")
test_node.prepareSlapOSForTestNode(test_node_slapos)
self.assertEquals(["initializeSlapOSControler", "runSoftwareRelease"],
method_list_for_prepareSlapOSForTestNode = ["initializeSlapOSControler",
"runSoftwareRelease"]
method_list_for_prepareSlapOSForTestSuite = ["initializeSlapOSControler",
"runSoftwareRelease", "runComputerPartition"]
runner.prepareSlapOSForTestNode(test_node_slapos)
self.assertEquals(method_list_for_prepareSlapOSForTestNode,
[x["method_name"] for x in call_list])
call_list = []
test_node.prepareSlapOSForTestSuite(node_test_suite)
self.assertEquals(["initializeSlapOSControler", "runSoftwareRelease",
"runComputerPartition"],
runner.prepareSlapOSForTestSuite(node_test_suite)
self.assertEquals(method_list_for_prepareSlapOSForTestSuite,
[x["method_name"] for x in call_list])
call_list = []
SlapOSControler.runSoftwareRelease = Patch("runSoftwareRelease", status_code=1)
self.assertRaises(SubprocessError, test_node.prepareSlapOSForTestSuite,
# TODO : write a test for scalability case
self.assertRaises(SubprocessError, runner.prepareSlapOSForTestSuite,
node_test_suite)
def test_11_run(self):
def test_11_run(self, my_test_type='UnitTest', grade='master'):
def doNothing(self, *args, **kw):
pass
# Used in case of 'ScalabilityTest'
def patch_getTestType(self, *args, **kw):
return my_test_type
def patch_getSlaposAccountKey(self, *args, **kw):
return "key"
def patch_getSlaposAccountCertificate(self, *args, **kw):
return "Certificate"
def patch_getSlaposUrl(self, *args, **kw):
return "http://Foo"
def patch_getSlaposHateoasUrl(self, *args, **kw):
return "http://Foo"
def patch_generateConfiguration(self, *args, **kw):
return json.dumps({"configuration_list": [], "involved_nodes_computer_guid"\
: [], "error_message": "No error.", "launcher_nodes_computer_guid": [], \
"launchable": False, "randomized_path" : "azertyuiop"})
def patch_isMasterTestnode(self, *args, **kw):
return (grade == 'master')
def patch_isHostingSubscriptionReady(self, *args, **kw):
return True
def patch_isRegisteredHostingSubscription(self, *args, **kw):
return True
test_self = self
test_result_path_root = os.path.join(test_self._temp_dir,'test/results')
os.makedirs(test_result_path_root)
global counter
counter = 0
def patch_startTestSuite(self,test_node_title):
def patch_startTestSuite(self,node_title,computer_guid='unknown'):
global counter
config_list = []
# Sclalability slave testnode is not directly in charge of testsuites
if my_test_type == 'ScalabilityTest' and grade == 'slave':
if counter == 5:
raise StopIteration
counter += 1
return json.dumps([])
def _checkExistingTestSuite(reference_set):
test_self.assertEquals(set(reference_set),
set(os.listdir(test_node.config["working_directory"])))
set(os.listdir(test_node.working_directory)))
for x in reference_set:
test_self.assertTrue(os.path.exists(os.path.join(
test_node.config["working_directory"],x)),True)
test_node.working_directory,x)),True)
if counter == 0:
config_list.append(test_self.getTestSuiteData(reference='foo')[0])
config_list.append(test_self.getTestSuiteData(reference='bar')[0])
......@@ -514,28 +605,75 @@ branch = foo
result = TestResultProxy(self._proxy, self._retry_time,
self._logger, test_result_path, node_title, revision)
return result
def patch_runTestSuite(self, *argv, **kw):
return {'status_code':0}
original_sleep = time.sleep
time.sleep = doNothing
self.generateTestRepositoryList()
RunnerClass = self.returnGoodClassRunner(my_test_type)
# Patch
if my_test_type == "ScalabilityTest":
original_getSlaposAccountKey = TaskDistributor.getSlaposAccountKey
original_getSlaposAccountCertificate = TaskDistributor.getSlaposAccountCertificate
original_getSlaposUrl = TaskDistributor.getSlaposUrl
original_getSlaposHateoasUrl = TaskDistributor.getSlaposHateoasUrl
original_generateConfiguration = TaskDistributor.generateConfiguration
original_isMasterTestnode = TaskDistributor.isMasterTestnode
original_updateInstanceXML = RunnerClass._updateInstanceXML
original_isHostingSubscriptionReady = SlapOSMasterCommunicator.isHostingSubscriptionReady
original_isRegisteredHostingSubscription = SlapOSMasterCommunicator.isRegisteredHostingSubscription
original_SlapOSMasterCommunicator__init__ = SlapOSMasterCommunicator.__init__
TaskDistributor.getSlaposAccountKey = patch_getSlaposAccountKey
TaskDistributor.getSlaposAccountCertificate = patch_getSlaposAccountCertificate
TaskDistributor.getSlaposUrl = patch_getSlaposUrl
TaskDistributor.getSlaposHateoasUrl = patch_getSlaposHateoasUrl
TaskDistributor.generateConfiguration = patch_generateConfiguration
TaskDistributor.isMasterTestnode = patch_isMasterTestnode
RunnerClass._updateInstanceXML = doNothing
SlapOSMasterCommunicator.isHostingSubscriptionReady = patch_isHostingSubscriptionReady
SlapOSMasterCommunicator.isRegisteredHostingSubscription = patch_isRegisteredHostingSubscription
SlapOSMasterCommunicator.__init__ = doNothing
original_startTestSuite = TaskDistributor.startTestSuite
TaskDistributor.startTestSuite = patch_startTestSuite
original_subscribeNode = TaskDistributor.subscribeNode
original_getTestType = TaskDistributor.getTestType
original_createTestResult = TaskDistributionTool.createTestResult
TaskDistributor.startTestSuite = patch_startTestSuite
TaskDistributor.subscribeNode = doNothing
TaskDistributor.getTestType = patch_getTestType
TaskDistributionTool.createTestResult = patch_createTestResult
# TestNode
test_node = self.getTestNode()
original_prepareSlapOS = test_node._prepareSlapOS
test_node._prepareSlapOS = doNothing
original_runTestSuite = test_node.runTestSuite
test_node.runTestSuite = doNothing
# Modify class UnitTestRunner(or more after) method
original_prepareSlapOS = RunnerClass._prepareSlapOS
original_runTestSuite = RunnerClass.runTestSuite
RunnerClass._prepareSlapOS = doNothing
RunnerClass.runTestSuite = patch_runTestSuite
SlapOSControler.initializeSlapOSControler = doNothing
# Inside test_node a runner is created using new UnitTestRunner methods
test_node.run()
self.assertEquals(5, counter)
time.sleep = original_sleep
# Restore old class methods
if my_test_type == "ScalabilityTest":
TaskDistributor.getSlaposAccountKey = original_getSlaposAccountKey
TaskDistributor.getSlaposAccountCertificate = original_getSlaposAccountCertificate
TaskDistributor.getSlaposUrl = original_getSlaposUrl
TaskDistributor.getSlaposHateoasUrl = original_getSlaposHateoasUrl
TaskDistributor.generateConfiguration = original_generateConfiguration
TaskDistributor.isMasterTestnode = original_isMasterTestnode
RunnerClass._updateInstanceXML = original_updateInstanceXML
SlapOSMasterCommunicator.isHostingSubscriptionReady = original_isHostingSubscriptionReady
SlapOSMasterCommunicator.isRegisteredHostingSubscription = original_isRegisteredHostingSubscription
SlapOSMasterCommunicator.__init__ = original_SlapOSMasterCommunicator__init__
TaskDistributor.startTestSuite = original_startTestSuite
TaskDistributionTool.createTestResult = original_createTestResult
test_node._prepareSlapOS = original_prepareSlapOS
test_node.runTestSuite = original_runTestSuite
TaskDistributionTool.subscribeNode = original_subscribeNode
TaskDistributionTool.getTestType = original_getTestType
RunnerClass._prepareSlapOS = original_prepareSlapOS
RunnerClass.runTestSuite = original_runTestSuite
def test_12_spawn(self):
def test_12_spawn(self, my_test_type='UnitTest'):
def _checkCorrectStatus(expected_status,*args):
result = process_manager.spawn(*args)
self.assertEqual(result['status_code'], expected_status)
......@@ -545,7 +683,7 @@ branch = foo
# it will be automatically killed
self.assertRaises(SubprocessError, process_manager.spawn, 'sleep','3')
def test_13_SlaposControlerResetSoftware(self):
def test_13_SlaposControlerResetSoftware(self, my_test_type='UnitTest'):
test_node = self.getTestNode()
controler = SlapOSControler(self.working_directory,
test_node.config, self.log)
......@@ -557,7 +695,7 @@ branch = foo
controler._resetSoftware()
self.assertEquals([], os.listdir(controler.software_root))
def test_14_createFolder(self):
def test_14_createFolder(self, my_test_type='UnitTest'):
test_node = self.getTestNode()
node_test_suite = test_node.getNodeTestSuite('foo')
node_test_suite.edit(working_directory=self.working_directory)
......@@ -572,15 +710,37 @@ branch = foo
createFolder(folder, clean=True)
self.assertEquals(False, os.path.exists(to_drop_path))
def test_15_suite_log_directory(self):
def test_15_suite_log_directory(self, my_test_type='UnitTest', grade='master'):
def doNothing(self, *args, **kw):
pass
# Used in case of 'ScalabilityTest'
def patch_getTestType(self, *args, **kw):
return my_test_type
def patch_getSlaposAccountKey(self, *args, **kw):
return "key"
def patch_getSlaposAccountCertificate(self, *args, **kw):
return "Certificate"
def patch_getSlaposUrl(self, *args, **kw):
return "http://Foo"
return "Certificate"
def patch_getSlaposHateoasUrl(self, *args, **kw):
return "http://Foo"
def patch_generateConfiguration(self, *args, **kw):
return json.dumps({"configuration_list": [], "involved_nodes_computer_guid"\
: [], "error_message": "No error.", "launcher_nodes_computer_guid": [], \
"launchable": False, "randomized_path" : "azertyuiop"})
def patch_isMasterTestnode(self, *args, **kw):
return grade == 'master'
def patch_isHostingSubscriptionReady(self, *args, **kw):
return True
def patch_isRegisteredHostingSubscription(self, *args, **kw):
return True
test_self = self
test_result_path_root = os.path.join(test_self._temp_dir,'test/results')
os.makedirs(test_result_path_root)
global counter
counter = 0
def patch_startTestSuite(self,test_node_title):
def patch_startTestSuite(self,node_title,computer_guid='unknown'):
global counter
config_list = [test_self.getTestSuiteData(reference='aa')[0],
test_self.getTestSuiteData(reference='bb')[0]]
......@@ -596,6 +756,8 @@ branch = foo
result = TestResultProxy(self._proxy, self._retry_time,
self._logger, test_result_path, node_title, revision)
return result
def patch_runTestSuite(self,*argv, **kw):
return {'status_code':0}
def checkTestSuite(test_node):
test_node.node_test_suite_dict
rand_part_set = set()
......@@ -616,29 +778,82 @@ branch = foo
self.assertEquals(1, len([x for x in suite_log.readlines() \
if x.find("Activated logfile")>=0]))
RunnerClass = self.returnGoodClassRunner(my_test_type)
original_sleep = time.sleep
time.sleep = doNothing
self.generateTestRepositoryList()
if my_test_type == "ScalabilityTest":
original_getSlaposAccountKey = TaskDistributor.getSlaposAccountKey
original_getSlaposAccountCertificate = TaskDistributor.getSlaposAccountCertificate
original_getSlaposUrl = TaskDistributor.getSlaposUrl
original_getSlaposHateoasUrl = TaskDistributor.getSlaposHateoasUrl
original_generateConfiguration = TaskDistributor.generateConfiguration
original_isMasterTestnode = TaskDistributor.isMasterTestnode
original_supply = SlapOSControler.supply
original_request = SlapOSControler.request
original_updateInstanceXML = RunnerClass._updateInstanceXML
original_isHostingSubscriptionReady = SlapOSMasterCommunicator.isHostingSubscriptionReady
original_isRegisteredHostingSubscription = SlapOSMasterCommunicator.isRegisteredHostingSubscription
original_SlapOSMasterCommunicator__init__ = SlapOSMasterCommunicator.__init__
TaskDistributor.getSlaposAccountKey = patch_getSlaposAccountKey
TaskDistributor.getSlaposAccountCertificate = patch_getSlaposAccountCertificate
TaskDistributor.getSlaposUrl = patch_getSlaposUrl
TaskDistributor.getSlaposHateoasUrl = patch_getSlaposHateoasUrl
TaskDistributor.generateConfiguration = patch_generateConfiguration
TaskDistributor.isMasterTestnode = patch_isMasterTestnode
SlapOSControler.supply = doNothing
SlapOSControler.request = doNothing
RunnerClass._updateInstanceXML = doNothing
SlapOSMasterCommunicator.isHostingSubscriptionReady = patch_isHostingSubscriptionReady
SlapOSMasterCommunicator.isRegisteredHostingSubscription = patch_isRegisteredHostingSubscription
SlapOSMasterCommunicator.__init__ = doNothing
original_startTestSuite = TaskDistributor.startTestSuite
original_subscribeNode = TaskDistributor.subscribeNode
original_getTestType = TaskDistributor.getTestType
TaskDistributor.startTestSuite = patch_startTestSuite
TaskDistributor.subscribeNode = doNothing
TaskDistributor.getTestType = patch_getTestType
original_createTestResult = TaskDistributionTool.createTestResult
TaskDistributionTool.createTestResult = patch_createTestResult
test_node = self.getTestNode()
original_prepareSlapOS = test_node._prepareSlapOS
test_node._prepareSlapOS = doNothing
original_runTestSuite = test_node.runTestSuite
test_node.runTestSuite = doNothing
# Change UnitTestRunner class methods
original_prepareSlapOS = RunnerClass._prepareSlapOS
original_runTestSuite = RunnerClass.runTestSuite
if my_test_type == "ScalabilityTest":
RunnerClass.runTestSuite = patch_runTestSuite
else:
RunnerClass.runTestSuite = doNothing
RunnerClass._prepareSlapOS = doNothing
SlapOSControler.initializeSlapOSControler = doNothing
test_node.run()
self.assertEquals(counter, 3)
checkTestSuite(test_node)
time.sleep = original_sleep
# Restore old class methods
if my_test_type == "ScalabilityTest":
TaskDistributor.getSlaposAccountKey = original_getSlaposAccountKey
TaskDistributor.getSlaposAccountCertificate = original_getSlaposAccountCertificate
TaskDistributor.getSlaposUrl = original_getSlaposUrl
TaskDistributor.getSlaposHateoasUrl = original_getSlaposHateoasUrl
TaskDistributor.generateConfiguration = original_generateConfiguration
TaskDistributor.isMasterTestnode = original_isMasterTestnode
SlapOSControler.supply =original_supply
SlapOSControler.request = original_request
SlapOSControler.updateInstanceXML = original_updateInstanceXML
SlapOSMasterCommunicator.isHostingSubscriptionReady = original_isHostingSubscriptionReady
SlapOSMasterCommunicator.isRegisteredHostingSubscription = original_isRegisteredHostingSubscription
SlapOSMasterCommunicator.__init__ = original_SlapOSMasterCommunicator__init__
TaskDistributor.startTestSuite = original_startTestSuite
TaskDistributionTool.createTestResult = original_createTestResult
test_node._prepareSlapOS = original_prepareSlapOS
test_node.runTestSuite = original_runTestSuite
TaskDistributionTool.subscribeNode = original_subscribeNode
TaskDistributionTool.getTestType = original_getTestType
RunnerClass._prepareSlapOS = original_prepareSlapOS
RunnerClass.runTestSuite = original_runTestSuite
def test_16_cleanupLogDirectory(self):
def test_16_cleanupLogDirectory(self, my_test_type='UnitTest'):
# Make sure that we are able to cleanup old log folders
test_node = self.getTestNode()
def check(file_list):
......@@ -659,7 +874,7 @@ branch = foo
test_node._cleanupLog()
check(set(['a_file']))
def test_17_cleanupTempDirectory(self):
def test_17_cleanupTempDirectory(self, my_test_type='UnitTest'):
# Make sure that we are able to cleanup old temp folders
test_node = self.getTestNode()
temp_directory = self.system_temp_folder
......@@ -681,7 +896,7 @@ branch = foo
test_node._cleanupTemporaryFiles()
check(set(['something']))
def test_18_resetSoftwareAfterManyBuildFailures(self):
def test_18_resetSoftwareAfterManyBuildFailures(self, my_test_type='UnitTest'):
"""
Check that after several building failures that the software is resetted
"""
......@@ -689,6 +904,8 @@ branch = foo
SlapOSControler.initializeSlapOSControler
initial_runSoftwareRelease = SlapOSControler.runSoftwareRelease
test_node = self.getTestNode()
RunnerClass = self.returnGoodClassRunner(my_test_type)
runner = RunnerClass(test_node)
node_test_suite = test_node.getNodeTestSuite('foo')
init_call_kw_list = []
def initializeSlapOSControler(self, **kw):
......@@ -698,10 +915,11 @@ branch = foo
SlapOSControler.initializeSlapOSControler = initializeSlapOSControler
SlapOSControler.runSoftwareRelease = runSoftwareRelease
def callPrepareSlapOS():
test_node._prepareSlapOS(self.working_directory, node_test_suite,
runner._prepareSlapOS(self.working_directory, node_test_suite,
test_node.log, create_partition=0)
def callRaisingPrepareSlapos():
self.assertRaises(SubprocessError, callPrepareSlapOS)
self.assertEquals(node_test_suite.retry_software_count, 0)
for x in xrange(0,11):
callRaisingPrepareSlapos()
......@@ -717,3 +935,169 @@ branch = foo
SlapOSControler.initializeSlapOSControler = \
initial_initializeSlapOSControler
SlapOSControler.runSoftwareRelease = initial_runSoftwareRelease
def test_scalability_01_getDelNodeTestSuite(self, my_test_type='ScalabilityTest'):
self.test_01_getDelNodeTestSuite(my_test_type)
def test_scalability_02_NodeTestSuiteWorkingDirectory(self, my_test_type='ScalabilityTest'):
self.test_02_NodeTestSuiteWorkingDirectory(my_test_type)
def test_scalability_03_NodeTestSuiteCheckDataAfterEdit(self, my_test_type='ScalabilityTest'):
self.test_03_NodeTestSuiteCheckDataAfterEdit(my_test_type)
def test_scalability_04_constructProfile(self, my_test_type='ScalabilityTest'):
self.test_04_constructProfile(my_test_type)
def test_scalability_05_getAndUpdateFullRevisionList(self, my_test_type='ScalabilityTest'):
self.test_05_getAndUpdateFullRevisionList(my_test_type)
def test_scalability_05b_changeRepositoryBranch(self, my_test_type='ScalabilityTest'):
self.test_05b_changeRepositoryBranch(my_test_type)
def test_scalability_06_checkRevision(self, my_test_type='ScalabilityTest'):
self.test_06_checkRevision(my_test_type)
def test_scalability_07_checkExistingTestSuite(self, my_test_type='ScalabilityTest'):
self.test_07_checkExistingTestSuite(my_test_type)
def test_scalability_08_getSupportedParamaterSet(self, my_test_type='ScalabilityTest'):
self.test_08_getSupportedParamaterSet(my_test_type)
def test_scalability_09_runTestSuite(self, my_test_type='ScalabilityTest'):
# TODO : write own scalability test
pass
def test_scalability_10_prepareSlapOS(self, my_test_type='ScalabilityTest'):
# TODO : write own scalability test
# This case test may be dispensable on ScalabilityTest case
# so..
pass
def test_scalability_as_master_11_run(self, my_test_type='ScalabilityTest'):
self.test_11_run(my_test_type, grade='master')
# TODO : add a test with master and a launchable testsuite -> patch a lot of methods
def test_scalability_as_slave_11_run(self, my_test_type='ScalabilityTest'):
self.test_11_run(my_test_type, grade='slave')
def test_scalability_12_spawn(self, my_test_type='ScalabilityTest'):
self.test_12_spawn(my_test_type)
def test_scalability_13_SlaposControlerResetSoftware(self, my_test_type='ScalabilityTest'):
self.test_13_SlaposControlerResetSoftware(my_test_type)
def test_scalability_14_createFolder(self, my_test_type='ScalabilityTest'):
self.test_14_createFolder(my_test_type)
def test_scalability_as_master_15_suite_log_directory(self, my_test_type='ScalabilityTest'):
self.test_15_suite_log_directory(my_test_type, grade='master')
def test_scalability_as_slave_15_suite_log_directory(self, my_test_type='ScalabilityTest'):
self.test_15_suite_log_directory(my_test_type, grade='slave')
def test_scalability_16_cleanupLogDirectory(self, my_test_type='ScalabilityTest'):
self.test_16_cleanupLogDirectory(my_test_type)
def test_scalability_17_cleanupTempDirectory(self, my_test_type='ScalabilityTest'):
self.test_17_cleanupTempDirectory(my_test_type)
def test_scalability_18_resetSoftwareAfterManyBuildFailures(self, my_test_type='ScalabilityTest'):
# TODO : write own scalability test
pass
def test_zzzz_scalability_19_xxxx(self):
# TODO : fill the dummy slapos answer
# by patching isSoftwareReleaseReady method.
def patch_createTestResult(self, revision, test_name_list, node_title,
allow_restart=False, test_title=None, project_title=None):
test_result_path = os.path.join(test_result_path_root, test_title)
result = TestResultProxy(self._proxy, self._retry_time,
self._logger, test_result_path, node_title, revision)
return result
global startTestSuiteDone
startTestSuiteDone = False
def patch_startTestSuite(self,node_title,computer_guid='unknown'):
config_list = []
global startTestSuiteDone
if not startTestSuiteDone:
startTestSuiteDone = True
config_list.append(test_self.getTestSuiteData(reference='foo')[0])
config_list.append(test_self.getTestSuiteData(reference='bar')[0])
else:
raise StopIteration
return json.dumps(config_list)
def patch_isMasterTestnode(self, *args, **kw):
return True
def patch_generateConfiguration(self, *args, **kw):
return json.dumps({"configuration_list": [{"ok":"ok"}], "involved_nodes_computer_guid"\
: ["COMP1", "COMP2", "COMP3"], "error_message": "No error.", "launcher_nodes_computer_guid": ["COMP1"], \
"launchable": True, "randomized_path" : "azertyuiop"})
def doNothing(self, *args, **kw):
pass
def patch_getSlaposAccountKey(self, *args, **kw):
return "key"
def patch_getSlaposAccountCertificate(self, *args, **kw):
return "Certificate"
def patch_getSlaposUrl(self, *args, **kw):
return "http://Foo"
return "Certificate"
def patch_getSlaposHateoasUrl(self, *args, **kw):
return "http://Foo"
def patch_getTestType(self, *args, **kw):
return "ScalabilityTest"
def patch_isHostingSubscriptionReady(self, *args, **kw):
return True
def patch_isRegisteredHostingSubscription(self, *args, **kw):
return True
def patch_runTestSuite(self, *args, **kw):
return {'status_code':0}
test_self = self
test_result_path_root = os.path.join(test_self._temp_dir,'test/results')
os.makedirs(test_result_path_root)
self.generateTestRepositoryList()
# Select the good runner to modify
RunnerClass = self.returnGoodClassRunner('ScalabilityTest')
# Patch methods
original_sleep = time.sleep
original_getSlaposAccountKey = TaskDistributor.getSlaposAccountKey
original_getSlaposAccountCertificate = TaskDistributor.getSlaposAccountCertificate
original_getSlaposUrl = TaskDistributor.getSlaposUrl
original_getSlaposHateoasUrl = TaskDistributor.getSlaposHateoasUrl
original_generateConfiguration = TaskDistributor.generateConfiguration
original_isMasterTestnode = TaskDistributor.isMasterTestnode
original_startTestSuite = TaskDistributor.startTestSuite
original_subscribeNode = TaskDistributor.subscribeNode
original_getTestType = TaskDistributor.getTestType
original_createTestResult = TaskDistributionTool.createTestResult
original_prepareSlapOS = RunnerClass._prepareSlapOS
original_runTestSuite = RunnerClass.runTestSuite
original_supply = SlapOSControler.supply
original_request = SlapOSControler.request
original_updateInstanceXML = SlapOSControler.updateInstanceXML
original_isHostingSubscriptionReady = SlapOSMasterCommunicator.isHostingSubscriptionReady
original_isRegisteredHostingSubscription = SlapOSMasterCommunicator.isRegisteredHostingSubscription
original_SlapOSMasterCommunicator__init__ = SlapOSMasterCommunicator.__init__
#
time.sleep = doNothing
TaskDistributor.getSlaposAccountKey = patch_getSlaposAccountKey
TaskDistributor.getSlaposAccountCertificate = patch_getSlaposAccountCertificate
TaskDistributor.getSlaposUrl = patch_getSlaposUrl
TaskDistributor.getSlaposHateoasUrl = patch_getSlaposHateoasUrl
TaskDistributor.generateConfiguration = patch_generateConfiguration
TaskDistributor.isMasterTestnode = patch_isMasterTestnode
TaskDistributor.startTestSuite = patch_startTestSuite
TaskDistributor.subscribeNode = doNothing
TaskDistributor.getTestType = patch_getTestType
TaskDistributionTool.createTestResult = patch_createTestResult
RunnerClass._prepareSlapOS = doNothing
RunnerClass.runTestSuite = patch_runTestSuite
SlapOSControler.supply = doNothing
SlapOSControler.request = doNothing
SlapOSControler.updateInstanceXML = doNothing
SlapOSMasterCommunicator.isHostingSubscriptionReady = patch_isHostingSubscriptionReady
SlapOSMasterCommunicator.isRegisteredHostingSubscription = patch_isRegisteredHostingSubscription
SlapOSMasterCommunicator.__init__ = doNothing
# Run
test_node = self.getTestNode()
test_node.run()
# Restore methods
TaskDistributor.getSlaposAccountKey = original_getSlaposAccountKey
TaskDistributor.getSlaposAccountCertificate = original_getSlaposAccountCertificate
TaskDistributor.getSlaposUrl = original_getSlaposUrl
TaskDistributor.getSlaposHateoasUrl = original_getSlaposHateoasUrl
TaskDistributor.generateConfiguration = original_generateConfiguration
TaskDistributor.isMasterTestnode = original_isMasterTestnode
TaskDistributor.startTestSuite = original_startTestSuite
TaskDistributionTool.createTestResult = original_createTestResult
TaskDistributionTool.subscribeNode = original_subscribeNode
TaskDistributionTool.getTestType = original_getTestType
RunnerClass._prepareSlapOS = original_prepareSlapOS
RunnerClass.runTestSuite = original_runTestSuite
SlapOSControler.supply = original_supply
SlapOSControler.request = original_request
SlapOSControler.updateInstanceXML = original_updateInstanceXML
SlapOSMasterCommunicator.isHostingSubscriptionReady = original_isHostingSubscriptionReady
SlapOSMasterCommunicator.isRegisteredHostingSubscription = original_isRegisteredHostingSubscription
SlapOSMasterCommunicator.__init__ = original_SlapOSMasterCommunicator__init__
time.sleep =original_sleep
......@@ -138,6 +138,15 @@ class TestResultLineProxy(RPCRetry):
def name(self):
return self._name
def isTestCaseAlive(self):
"""
Tell if test result line is still alive on site.
"""
try:
return bool(self._retryRPC('isTestCaseAlive', [self._test_result_line_path]))
except:
raise ValueError('isTestCaseAlive Failed.')
def stop(self, test_count=None, error_count=None, failure_count=None,
skip_count=None, duration=None, date=None, command=None,
stdout=None, stderr=None, html_test_result=None, **kw):
......@@ -201,6 +210,10 @@ class TestResultProxy(RPCRetry):
return '<%s(%r, %r, %r) at %x>' % (self.__class__.__name__,
self._test_result_path, self._node_title, self._revision, id(self))
@property
def test_result_path(self):
return self._test_result_path
@property
def revision(self):
return self._revision
......@@ -350,6 +363,35 @@ class TestResultProxy(RPCRetry):
if self._watcher_thread is not None:
self._watcher_thread.join()
def stop(self):
"""
"""
return self._retryRPC('stopTest', [self._test_result_path])
class TestResultProxyProxy(TestResultProxy):
"""
A wrapper/proxy to TestResultProxy
"""
def __init__(self, test_suite_master_url, retry_time, logger, test_result_path,
node_title, revision):
try:
proxy = ServerProxy(
test_suite_master_url,
allow_none=True,
).portal_task_distribution
except:
raise ValueError("Cannot instanciate ServerProxy")
TestResultProxy.__init__(self, proxy, retry_time, logger, test_result_path,
node_title, revision)
def getRunningTestCase(self):
"""
A proxy to getNextTestCase
Return the relative path of the test with the running state
"""
return self._retryRPC('getRunningTestCase', [self._test_result_path])
class ServerProxy(xmlrpclib.ServerProxy):
def __init__(self, *args, **kw):
......@@ -427,7 +469,6 @@ class TaskDistributionTool(RPCRetry):
class TaskDistributor(RPCRetry):
def __init__(self,portal_url,retry_time=64,logger=None):
if logger is None:
logger = null_logger
if portal_url is None:
......@@ -440,14 +481,67 @@ class TaskDistributor(RPCRetry):
raise ValueError('Unsupported protocol revision: %r',
protocol_revision)
def startTestSuite(self,node_title):
def startTestSuite(self,node_title,computer_guid='unknown'):
"""
Returns None if no test suite is needed.
therwise, returns a JSON with all the test suite parameters.
"""
result = self._retryRPC('startTestSuite',(node_title,))
result = self._retryRPC('startTestSuite',(node_title,computer_guid,))
return result
def getTestType(self):
"""
Return the Test Type
"""
result = self._retryRPC('getTestType')
return result
def subscribeNode(self, node_title, computer_guid):
"""
Susbscribes node with the node title and the computer guid.
"""
self._retryRPC('subscribeNode', (node_title,computer_guid,))
def generateConfiguration(self, test_suite_title):
"""
Generates a configuration from a test_suite_title
"""
return self._retryRPC('generateConfiguration', (test_suite_title,))
def isMasterTestnode(self, test_node_title):
"""
Returns True or False if the testnode is the master
"""
return self._retryRPC('isMasterTestnode', (test_node_title,))
def getSlaposAccountKey(self):
"""
Returns the slapos account key related to the distributor
"""
return self._retryRPC('getSlaposAccountKey')
def getSlaposAccountCertificate(self):
"""
Returns the slapos account certificate related to the distributor
"""
return self._retryRPC('getSlaposAccountCertificate')
def getSlaposUrl(self):
"""
Returns the url of slapos master related to the distributor
"""
return self._retryRPC('getSlaposUrl')
def getSlaposHateoasUrl(self):
"""
Returns the url of API REST using hateoas of
slapos master related to the distributor
"""
return self._retryRPC('getSlaposHateoasUrl')
class DummyTaskDistributionTool(object):
"""
Fake remote server.
......
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from datetime import datetime,timedelta
import os
import subprocess
import sys
import time
import glob
import SlapOSControler
import json
import time
import shutil
import logging
import string
import random
from ProcessManager import SubprocessError, ProcessManager, CancellationError
from subprocess import CalledProcessError
from Updater import Updater
from erp5.util import taskdistribution
class SlapOSInstance(object):
"""
Base of an software instance,
store variables used during software installation
"""
def __init__(self):
self.retry_software_count = 0
self.retry = False
def edit(self, **kw):
self.__dict__.update(**kw)
self._checkData()
def _checkData(self):
pass
class NodeTestSuite(SlapOSInstance):
"""
"""
def __init__(self, reference):
super(NodeTestSuite, self).__init__()
self.reference = reference
def edit(self, **kw):
super(NodeTestSuite, self).edit(**kw)
def _checkData(self):
if getattr(self, "working_directory", None) is not None:
if not(self.working_directory.endswith(os.path.sep + self.reference)):
self.working_directory = os.path.join(self.working_directory,
self.reference)
SlapOSControler.createFolder(self.working_directory)
self.test_suite_directory = os.path.join(
self.working_directory, "test_suite")
self.custom_profile_path = os.path.join(self.working_directory,
'software.cfg')
if getattr(self, "vcs_repository_list", None) is not None:
for vcs_repository in self.vcs_repository_list:
buildout_section_id = vcs_repository.get('buildout_section_id', None)
repository_id = buildout_section_id or \
vcs_repository.get('url').split('/')[-1].split('.')[0]
repository_path = os.path.join(self.working_directory,repository_id)
vcs_repository['repository_id'] = repository_id
vcs_repository['repository_path'] = repository_path
def createSuiteLog(self):
# /srv/slapgrid/slappartXX/srv/var/log/testnode/az-mlksjfmlk234Sljssdflkj23KSdfslj/suite.log
alphabets = string.digits + string.letters
rand_part = ''.join(random.choice(alphabets) for i in xrange(32))
random_suite_folder_id = '%s-%s' % (self.reference, rand_part)
suite_log_directory = os.path.join(self.log_directory,
random_suite_folder_id)
SlapOSControler.createFolders(suite_log_directory)
self.suite_log_path = os.path.join(suite_log_directory,
'suite.log')
return self.getSuiteLogPath(), random_suite_folder_id
def getSuiteLogPath(self):
return getattr(self,"suite_log_path", None)
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import datetime
import os
import subprocess
import sys
import time
import glob
import SlapOSControler
import SlapOSMasterCommunicator
import json
import time
import shutil
import logging
import string
import random
import Utils
from ProcessManager import SubprocessError, ProcessManager, CancellationError
from subprocess import CalledProcessError
from Updater import Updater
from erp5.util import taskdistribution
# for dummy slapos answer
import signal
# max time to instance changing state: 2 hour
MAX_INSTANCE_TIME = 60*60*2
# max time to register instance to slapOSMaster: 5 minutes
MAX_CREATION_INSTANCE_TIME = 60*10
# max time for a test: 1 hour
MAX_TEST_CASE_TIME = 60*60
class ScalabilityTestRunner():
def __init__(self, testnode):
self.testnode = testnode
self.log = self.testnode.log
self.slapos_controler = SlapOSControler.SlapOSControler(
self.testnode.working_directory,
self.testnode.config,
self.log)
# Create the slapos account configuration file and dir
key = self.testnode.test_suite_portal.getSlaposAccountKey()
certificate = self.testnode.test_suite_portal.getSlaposAccountCertificate()
# Get Slapos Master Url
self.slapos_url = ''
try:
self.slapos_url = self.testnode.test_suite_portal.getSlaposUrl()
if not self.slapos_url:
self.slapos_url = self.testnode.config['server_url']
except:
self.slapos_url = self.testnode.config['server_url']
# Get Slapos Master url used for api rest (using hateoas)
self.slapos_api_rest_url = self.testnode.test_suite_portal.getSlaposHateoasUrl()
self.log("SlapOS Master url is: %s" %self.slapos_url)
self.log("SlapOS Master hateoas url is: %s" %self.slapos_api_rest_url)
self.key_path, self.cert_path, config_path = self.slapos_controler.createSlaposConfigurationFileAccount(
key, certificate, self.slapos_url, self.testnode.config)
self.slapos_communicator = None
# Dict containing used to store which SR is not yet correctly installed.
# looks like: {'comp_id1':'SR_urlA', 'comp_id2':'SR_urlA',..}
self.remaining_software_installation_dict = {}
# Protection to prevent installation of softwares after checking
self.authorize_supply = True
self.authorize_request = False
# Used to simulate SlapOS answer (used as a queue)
self.last_slapos_answer = []
self.last_slapos_answer_request = []
def _prepareSlapOS(self, software_path, computer_guid, create_partition=0):
# create_partition is kept for compatibility
"""
A proxy to supply : Install a software on a specific node
"""
self.log("testnode, supply : %s %s", software_path, computer_guid)
if self.authorize_supply :
self.remaining_software_installation_dict[computer_guid] = software_path
self.slapos_controler.supply(software_path, computer_guid)
# Here make a request via slapos controler ?
return {'status_code' : 0}
else:
raise ValueError("Too late to supply now. ('self.authorize_supply' is False)")
return {'status_code' : 1}
def _generateInstanceTitle(self, test_suite_title):
"""
Generate an instance title using various parameter
TODO : add some verification (to don't use unexisting variables)
"""
instance_title = "Scalability-"
instance_title += "("+test_suite_title+")-"
instance_title += str(self.involved_nodes_computer_guid).replace("'","")
instance_title += "-"+str(datetime.datetime.now().isoformat())+"-"
instance_title += "timestamp="+str(time.time())
return instance_title
def _generateInstanceXML(self, software_configuration,
test_result, test_suite):
"""
Generate a complete scalability instance XML configuration
"""
config_cluster = software_configuration.copy()
config = {'cluster':config_cluster}
config.update({'scalability-launcher-computer-guid':self.launcher_nodes_computer_guid[0]})
config.update({'scalability-launcher-title':'MyTestNodeTitle'})
config.update({'test-result-path':test_result.test_result_path})
config.update({'test-suite-revision':test_result.revision})
config.update({'test-suite':test_suite})
config.update({'test-suite-master-url':self.testnode.config['test_suite_master_url']})
return config
def _createInstance(self, software_path, software_configuration, instance_title,
test_result, test_suite):
"""
Create scalability instance
"""
if self.authorize_request:
config = self._generateInstanceXML(software_configuration,
test_result, test_suite)
self.log("testnode, request : %s", instance_title)
config = json.dumps(config)
self.slapos_controler.request(instance_title, software_path,
"test", {"_" : config},
self.launcher_nodes_computer_guid[0])
self.authorize_request = False
return {'status_code' : 0}
else:
raise ValueError("Softwares release not ready yet to launch instan\
ces or already launched.")
return {'status_code' : 1}
def prepareSlapOSForTestNode(self, test_node_slapos=None):
"""
We will build slapos software needed by the testnode itself,
"""
if self.testnode.test_suite_portal.isMasterTestnode(
self.testnode.config['test_node_title']):
pass
return {'status_code' : 0}
# Dummy slapos answering
def _getSignal(self, signal, frame):
self.log("Dummy SlapOS Master answer received.")
self.last_slapos_answer.append(True)
def _prepareDummySlapOSAnswer(self):
self.log("Dummy slapOS answer enabled, send signal to %s (kill -10 %s) to simu\
late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
signal.signal(signal.SIGUSR1, self._getSignal)
def _comeBackFromDummySlapOS(self):
self.log("Dummy slapOS answer disabled, please don't send more signals.")
# use SIG_USR (kill)
signal.signal(signal.SIGUSR1, signal.SIG_DFL)
def simulateSlapOSAnswer(self):
if len(self.last_slapos_answer)==0:
return False
else:
return self.last_slapos_answer.pop()
# /Dummy slapos answering
def isSoftwareReleaseReady(self, software_url, computer_guid):
"""
Return true if the specified software on the specified node is installed.
This method should communicates with SlapOS Master.
"""
# TODO : implement -> communication with SlapOS master
# this simulate a SlapOS answer
return self.simulateSlapOSAnswer()
def remainSoftwareToInstall(self):
"""
Return True if it remains softwares to install, otherwise return False
"""
# Remove from grid installed software entries
for computer_guid, software_path in self.remaining_software_installation_dict.items():
if self.isSoftwareReleaseReady(software_path, computer_guid):
del self.remaining_software_installation_dict[computer_guid]
# Not empty grid means that all softwares are not installed
return len(self.remaining_software_installation_dict) > 0
def _updateInstanceXML(self, software_configuration, instance_title,
test_result, test_suite):
"""
Just a proxy to SlapOSControler.updateInstanceXML.
"""
config = self._generateInstanceXML(software_configuration,
test_result, test_suite)
config = json.dumps(config)
self.log("testnode, updateInstanceXML : %s", instance_title)
self.slapos_controler.updateInstanceXML(instance_title, {"_" : config})
return {'status_code' : 0}
def _waitInstance(self, instance_title, state, max_time=MAX_INSTANCE_TIME):
"""
Wait for 'max_time' an instance specific state
"""
self.log("Wait for instance state: %s" %state)
start_time = time.time()
while (not self.slapos_communicator.isHostingSubscriptionReady(instance_title, state)
and (max_time > (time.time()-start_time))):
self.log("Instance(s) not in %s state yet." % state)
time.sleep(15)
if (time.time()-start_time) > max_time:
error_message = "Instance '%s' not '%s' after %s seconds" %(instance_title, state, str(time.time()-start_time))
self.log(error_message)
self.log("Do you use instance state propagation in your project?")
self.log("Instance '%s' will be stopped and test avorted." %instance_title)
# What if we wanted to stop ?
self.slapos_controler.stopInstance(instance_title)
# XXX: _waitInstance call here ? recursive call ?
# XXX: sleep 60 seconds.
time.sleep(60)
raise ValueError(error_message)
self.log("Instance correctly '%s' after %s seconds." %(state, str(time.time()-start_time)))
def _waitInstanceCreation(self, instance_title, max_time=MAX_CREATION_INSTANCE_TIME):
"""
Wait for 'max_time' the instance creation
"""
self.log("Wait for instance creation")
start_time = time.time()
while ( not self.slapos_communicator.isRegisteredHostingSubscription(instance_title) \
and (max_time > (time.time()-start_time)) ):
time.sleep(5)
if (time.time()-start_time) > max_time:
raise ValueError("Instance '%s' not found after %s seconds" %(instance_title, max_time))
self.log("Instance found on slapOSMaster")
def prepareSlapOSForTestSuite(self, node_test_suite):
"""
Install testsuite softwares
"""
self.log('prepareSlapOSForTestSuite')
# Define how many time this method can take
max_time = 3600*10*1.0 # 10 hours
interval_time = 60
start_time = time.time()
# Create a communicator with slapos
self.log("creating SlapOs Master communicator...")
self.slapos_communicator = SlapOSMasterCommunicator.SlapOSMasterCommunicator(
self.cert_path,
self.key_path,
self.log,
self.slapos_api_rest_url)
# Only master testnode must order software installation
if self.testnode.test_suite_portal.isMasterTestnode(
self.testnode.config['test_node_title']):
# Get from ERP5 Master the configuration of the cluster for the test
test_configuration = Utils.deunicodeData(
json.loads(self.testnode.test_suite_portal.generateConfiguration(
node_test_suite.test_suite_title)
)
)
self.involved_nodes_computer_guid = test_configuration['involved_nodes_computer_guid']
self.launchable = test_configuration['launchable']
self.error_message = test_configuration['error_message']
self.randomized_path = test_configuration['randomized_path']
# Avoid the test if it is not launchable
if not self.launchable:
self.log("Test suite %s is not actually launchable with \
the current cluster configuration." %(node_test_suite.test_suite_title,))
self.log("ERP5 Master indicates : %s" %(self.error_message,))
# error : wich code to return ?
return {'status_code' : 1}
involved_nodes_computer_guid = test_configuration['involved_nodes_computer_guid']
configuration_list = test_configuration['configuration_list']
node_test_suite.edit(configuration_list=configuration_list)
self.launcher_nodes_computer_guid = test_configuration['launcher_nodes_computer_guid']
# Create an obfuscated link to the testsuite directory
path_to_suite = os.path.join(
self.testnode.config['working_directory'],
node_test_suite.reference)
self.obfuscated_link_path = os.path.join(
self.testnode.config['software_directory'],
self.randomized_path)
if ( not os.path.lexists(self.obfuscated_link_path) and
not os.path.exists(self.obfuscated_link_path) ) :
try :
os.symlink(path_to_suite, self.obfuscated_link_path)
self.log("testnode, Symbolic link (%s->%s) created."
%(self.obfuscated_link_path, path_to_suite))
except :
self.log("testnode, Unable to create symbolic link to the testsuite.")
raise ValueError("testnode, Unable to create symbolic link to the testsuite.")
self.log("Sym link : %s %s" %(path_to_suite, self.obfuscated_link_path))
# Construct the ipv6 obfuscated url of the software profile reachable from outside
self.reachable_address = os.path.join(
"https://","["+self.testnode.config['httpd_ip']+"]"+":"+self.testnode.config['httpd_software_access_port'],
self.randomized_path)
self.reachable_profile = os.path.join(self.reachable_address, "software.cfg")
# Write the reachable address in the software.cfg file,
# by replacing <obfuscated_url> occurences by the current reachable address.
software_file = open(node_test_suite.custom_profile_path, "r")
file_content = software_file.readlines()
new_file_content = []
for line in file_content:
new_file_content.append(line.replace('<obfuscated_url>', self.reachable_address))
software_file.close()
os.remove(node_test_suite.custom_profile_path)
software_file = open(node_test_suite.custom_profile_path, "w")
for line in new_file_content:
software_file.write(line)
software_file.close()
self.log("Software reachable profile path is : %s "
%(self.reachable_profile,))
# Ask for SR installation
for computer_guid in self.involved_nodes_computer_guid:
self._prepareSlapOS(self.reachable_profile, computer_guid)
# From the line below we would not supply any more softwares
self.authorize_supply = False
# TODO : remove the line below wich simulate an answer from slapos master
self._prepareDummySlapOSAnswer()
# Waiting until all softwares are installed
while ( self.remainSoftwareToInstall()
and (max_time > (time.time()-start_time))):
self.log("Master testnode is waiting\
for the end of all software installation (for %ss) PID=%s.",
str(int(time.time()-start_time)), str(os.getpid()))
time.sleep(interval_time)
# TODO : remove the line below wich simulate an answer from slapos master
self._comeBackFromDummySlapOS()
if self.remainSoftwareToInstall() :
# All softwares are not installed, however maxtime is elapsed, that's a failure.
return {'status_code' : 1}
self.authorize_request = True
self.log("Softwares installed")
# Launch instance
self.instance_title = self._generateInstanceTitle(node_test_suite.test_suite_title)
try:
self._createInstance(self.reachable_profile, configuration_list[0],
self.instance_title, node_test_suite.test_result, node_test_suite.test_suite)
self.log("Scalability instance requested.")
except:
self.log("Unable to launch instance")
raise ValueError("Unable to launch instance")
self.log("Waiting for instance creation..")
self._waitInstanceCreation(self.instance_title)
return {'status_code' : 0}
return {'status_code' : 1}
def runTestSuite(self, node_test_suite, portal_url):
if not self.launchable:
self.log("Current test_suite is not actually launchable.")
return {'status_code' : 1} # Unable to continue due to not realizable configuration
configuration_list = node_test_suite.configuration_list
test_list = range(0, len(configuration_list))
# create test_result
test_result_proxy = self.testnode.portal.createTestResult(
node_test_suite.revision, test_list,
self.testnode.config['test_node_title'],
True, node_test_suite.test_suite_title,
node_test_suite.project_title)
count = 0
error_message = None
# Each cluster configuration are tested
for configuration in configuration_list:
# First configuration doesn't need XML configuration update.
if count > 0:
# Stop instance
self.slapos_controler.stopInstance(self.instance_title)
self._waitInstance(self.instance_title, 'stopped')
# Update instance XML configuration
self._updateInstanceXML(configuration, self.instance_title,
node_test_suite.test_result, node_test_suite.test_suite)
self._waitInstance(self.instance_title, 'started')
# Start instance
self.slapos_controler.startInstance(self.instance_title)
# XXX: Dirty hack used to force haproxy to restart in time
# with all zope informations.
self._waitInstance(self.instance_title, 'started')
self.slapos_controler.stopInstance(self.instance_title)
self._waitInstance(self.instance_title, 'stopped')
self.slapos_controler.startInstance(self.instance_title)
##########################################################
self._waitInstance(self.instance_title, 'started')
# Start only the current test
exclude_list=[x for x in test_list if x!=test_list[count]]
count += 1
test_result_line_proxy = test_result_proxy.start(exclude_list)
#
if test_result_line_proxy == None :
error_message = "Test case already tested."
break
self.log("Test for count : %d is in a running state." %count)
# Wait for test case ending
test_case_start_time = time.time()
while test_result_line_proxy.isTestCaseAlive() and \
test_result_proxy.isAlive() and \
time.time() - test_case_start_time < MAX_TEST_CASE_TIME:
time.sleep(15)
# Max time limit reach for current test case: failure.
if test_result_line_proxy.isTestCaseAlive():
error_message = "Test case during for %s seconds, too long. (max: %s seconds). Test failure." \
%(str(time.time() - test_case_start_time), MAX_TEST_CASE_TIME)
break
# Test cancelled, finished or in an undeterminate state.
if not test_result_proxy.isAlive():
# Test finished
if count == len(configuration_list):
break
# Cancelled or in an undeterminate state.
error_message = "Test cancelled or undeterminate state."
break
# Stop current instance
self.slapos_controler.stopInstance(self.instance_title)
self._waitInstance(self.instance_title, 'stopped')
# Delete old instances
self._cleanUpOldInstance()
# If error appears then that's a test failure.
if error_message:
test_result_line_proxy.stop(error_count=1, failure_count=1,
stdout=error_message, stderr=error_message)
test_result_proxy.reportFailure(stdout=error_message)
self.log("Test Failed.")
return {'status_code' : 1, 'error_message': error_message}
# Test is finished.
self.log("Test finished.")
return {'status_code' : 0}
def _cleanUpOldInstance(self):
self.log("_cleanUpOldInstance")
# Get title and link list of all instances
instance_dict = self.slapos_communicator.getHostingSubscriptionDict()
instance_to_delete_list = []
outdated_date = datetime.datetime.fromtimestamp(time.time()) - datetime.timedelta(days=2)
# Select instances to delete
for title,link in instance_dict.items():
# Instances created by testnode contains "Scalability-" and
# "timestamp=" in the title.
if "Scalability-" in title and "timestamp=" in title:
# Get timestamp of the instance creation date
foo, timestamp = title.split("timestamp=")
creation_date = datetime.datetime.fromtimestamp(float(timestamp))
# Test if instance is older than the limit
if creation_date < outdated_date:
instance_to_delete_list.append((title,link))
for title,link in instance_to_delete_list:
# Get instance information
instance_information_dict = self.slapos_communicator.getHostingSubscriptionInformationDict(title)
# Delete instance
if instance_information_dict:
if instance_information_dict['status'] != 'destroyed':
self.slapos_controler.request(
instance_information_dict['title'],
instance_information_dict['software_url'],
software_type=instance_information_dict['software_type'],
computer_guid=instance_information_dict['computer_guid'],
state='destroyed'
)
self.log("Instance '%s' deleted." %instance_information_dict['title'])
def _cleanUpNodesInformation(self):
self.involved_nodes_computer_guid = []
self.launcher_nodes_computer_guid = []
self.remaining_software_installation_dict = {}
self.authorize_supply = True
self.authorize_request = False
def getRelativePathUsage(self):
"""
Used by the method testnode.constructProfile() to know
if the software.cfg have to use relative path or not.
"""
return True
......@@ -33,6 +33,8 @@ import xml_marshaller
import shutil
import sys
import glob
import argparse
from slapos import client
MAX_PARTIONS = 10
MAX_SR_RETRIES = 3
......@@ -47,6 +49,20 @@ def createFolders(folder):
if not(os.path.exists(folder)):
os.makedirs(folder)
def isDir(folder):
return os.path.isdir(folder)
def createFile(path, mode, content):
f = open(path, mode)
if os.path.exists(path):
f.write(content)
f.close()
else:
# error
pass
class SlapOSControler(object):
def __init__(self, working_directory, config, log):
......@@ -54,8 +70,175 @@ class SlapOSControler(object):
self.software_root = os.path.join(working_directory, 'soft')
self.instance_root = os.path.join(working_directory, 'inst')
self.slapos_config = os.path.join(working_directory, 'slapos.cfg')
self.proxy_database = os.path.join(working_directory, 'proxy.db')
self.log = log
self.proxy_database = os.path.join(working_directory, 'proxy.db')
self.instance_config = {}
#TODO: implement a method to get all instance related the slapOS account
# and deleting all old instances (based on creation date or name etc...)
def createSlaposConfigurationFileAccount(self, key, certificate, slapos_url, config):
# Create "slapos_account" directory in the "slapos_directory"
slapos_account_directory = os.path.join(config['slapos_directory'], "slapos_account")
createFolder(slapos_account_directory)
# Create slapos-account files
slapos_account_key_path = os.path.join(slapos_account_directory, "key")
slapos_account_certificate_path = os.path.join(slapos_account_directory, "certificate")
configuration_file_path = os.path.join(slapos_account_directory, "slapos.cfg")
configuration_file_value = "[slapos]\nmaster_url = %s\n\
[slapconsole]\ncert_file = %s\nkey_file = %s" %(
slapos_url,
slapos_account_certificate_path,
slapos_account_key_path)
createFile(slapos_account_key_path, "w", key)
createFile(slapos_account_certificate_path, "w", certificate)
createFile(configuration_file_path, "w", configuration_file_value)
self.configuration_file_path = configuration_file_path
return slapos_account_key_path, slapos_account_certificate_path, configuration_file_path
def supply(self, software_url, computer_id, state="available"):
"""
Request the installation of a software release on a specific node
Ex :
my_controler.supply('kvm.cfg', 'COMP-726')
"""
self.log('SlapOSControler : supply')
parser = argparse.ArgumentParser()
parser.add_argument("configuration_file")
parser.add_argument("software_url")
parser.add_argument("node")
if os.path.exists(self.configuration_file_path):
args = parser.parse_args([self.configuration_file_path, software_url, computer_id])
config = client.Config()
config.setConfig(args, args.configuration_file)
try:
local = client.init(config)
local['supply'](software_url, computer_guid=computer_id, state=state)
self.log('SlapOSControler : supply %s %s %s' %(software_url, computer_id, state))
except:
self.log("SlapOSControler.supply, \
exception in registerOpenOrder", exc_info=sys.exc_info())
raise ValueError("Unable to supply (or remove)")
else:
raise ValueError("Configuration file not found.")
def destroy(self, software_url, computer_id):
"""
Request Deletetion of a software release on a specific node
Ex :
my_controler.destroy('kvm.cfg', 'COMP-726')
"""
self.supply(self, software_url, computer_id, state="destroyed")
def getInstanceRequestedState(self, reference):
try:
return self.instance_config[reference]['requested_state']
except:
raise ValueError("Instance '%s' not exist" %self.instance_config[reference])
def request(self, reference, software_url, software_type=None,
software_configuration=None, computer_guid=None, state='started'):
"""
configuration_file_path (slapos acount)
reference : instance title
software_url : software path/url
software_type : scalability
software_configuration : dict { "_" : "{'toto' : 'titi'}" }
Ex :
my_controler._request('Instance16h34Ben',
'kvm.cfg', 'cluster', { "_" : "{'toto' : 'titi'}" } )
"""
self.log('SlapOSControler : request-->SlapOSMaster')
current_intance_config = {'software_type':software_type,
'software_configuration':software_configuration,
'computer_guid':computer_guid,
'software_url':software_url,
'requested_state':state,
'partition':None
}
self.instance_config[reference] = current_intance_config
filter_kw = None
if computer_guid != None:
filter_kw = { "computer_guid": computer_guid }
if os.path.exists(self.configuration_file_path):
parser = argparse.ArgumentParser()
parser.add_argument("configuration_file")
args = parser.parse_args([self.configuration_file_path])
config = client.Config()
config.setConfig(args, args.configuration_file)
try:
local = client.init(config)
partition = local['request'](
software_release = software_url,
partition_reference = reference,
partition_parameter_kw = software_configuration,
software_type = software_type,
filter_kw = filter_kw,
state = state)
self.instance_config[reference]['partition'] = partition
if state == 'destroyed':
del self.instance_config[reference]
if state == 'started':
self.log('Instance started with configuration: %s' %str(software_configuration))
except:
self.log("SlapOSControler.request, \
exception in registerOpenOrder", exc_info=sys.exc_info())
raise ValueError("Unable to do this request")
else:
raise ValueError("Configuration file not found.")
def _requestSpecificState(self, reference, state):
self.request(reference,
self.instance_config[reference]['software_url'],
self.instance_config[reference]['software_type'],
self.instance_config[reference]['software_configuration'],
self.instance_config[reference]['computer_guid'],
state=state
)
def destroyInstance(self, reference):
self.log('SlapOSControler : delete instance')
try:
self._requestSpecificState(reference, 'destroyed')
except:
raise ValueError("Can't delete instance '%s' (instance may not been created?)" %reference)
def stopInstance(self, reference):
self.log('SlapOSControler : stop instance')
try:
self._requestSpecificState(reference, 'stopped')
except:
raise ValueError("Can't stop instance '%s' (instance may not been created?)" %reference)
def startInstance(self, reference):
self.log('SlapOSControler : start instance')
try:
self._requestSpecificState(reference, 'started')
except:
raise ValueError("Can't start instance '%s' (instance may not been created?)" %reference)
def updateInstanceXML(self, reference, software_configuration):
"""
Update the XML configuration of an instance
# Request same instance with different parameters.
"""
self.log('SlapOSControler : updateInstanceXML')
self.log('SlapOSControler : updateInstanceXML will request same'
'instance with new XML configuration...')
try:
self.request(reference,
self.instance_config[reference]['software_url'],
self.instance_config[reference]['software_type'],
software_configuration,
self.instance_config[reference]['computer_guid'],
state='started'
)
except:
raise ValueError("Can't update instance '%s' (may not exist?)" %reference)
def _resetSoftware(self):
self.log('SlapOSControler : GOING TO RESET ALL SOFTWARE : %r' %
......@@ -65,7 +248,6 @@ class SlapOSControler(object):
os.mkdir(self.software_root)
os.chmod(self.software_root, 0750)
def initializeSlapOSControler(self, slapproxy_log=None, process_manager=None,
reset_software=False, software_path_list=None):
self.process_manager = process_manager
......
import json
import httplib
import urlparse
import time
TIMEOUT = 30
# TODO: News-> look list to get last news... (and not the first of the list)
class SlapOSMasterCommunicator(object):
"""
Communication with slapos Master using Hateoas.
collection: collection of data (hosting_subscription, instance, software_release...)
hosting_subscription: result of a request
instance(s): instance(s) related to an hosting_subscription
usage: ex:
# Try to reuse same communicator, because initilization step may takes a lot of time
# due to listing of all instances (alive or not) related to the specified slapOS account.
communicator = SlapOSMasterCommunicator()
# Print news related to 'TestScalability_21423104630420' all instances
instance_link_list = communicator._getRelatedInstanceLink('TestScalability_21423104630420')
for instance_link in instance_link_list:
news = communicator.getNewsFromInstanceLink(instance_link)
print news['news']
"""
def __init__(self, certificate_path, key_path, log,
url):
# Create connection
api_scheme, api_netloc, api_path, api_query, api_fragment = urlparse.urlsplit(url)
self.log = log
self.certificate_path = certificate_path
self.key_path = key_path
self.url = url
self.connection = self._getConnection(self.certificate_path, self.key_path, self.url)
# Get master
master_link = {'href':api_path,'type':"application/vnd.slapos.org.hal+json; class=slapos.org.master"}
master = self._curl(master_link)
self.person_link = master['_links']['http://slapos.org/reg/me']
# Get person related to specified key/certificate provided
person = self._curl(self.person_link)
self.personnal_collection_link = person['_links']['http://slapos.org/reg/hosting_subscription']
# Get collection (of hosting subscriptions)
collection = self._curl(self.personnal_collection_link)
# XXX: This part may be extremly long (because here no hosting subscriptions
# has been visited)
self.hosting_subcriptions_dict = {}
self.visited_hosting_subcriptions_link_list = []
self.log("SlapOSMasterCommunicator will read all hosting subscriptions entries, "
"it may take several time...")
self._update_hosting_subscription_informations()
def _getConnection(self,certificate_path, key_path, url):
api_scheme, api_netloc, api_path, api_query, api_fragment = urlparse.urlsplit(url)
#self.log("HTTPS Connection with: %s, cert=%s, key=%s" %(api_netloc,key_path,certificate_path))
return httplib.HTTPSConnection(api_netloc, key_file=key_path, cert_file=certificate_path, timeout=TIMEOUT)
def _curl(self, link):
"""
'link' must look like : {'href':url,'type':content_type}
"""
# Set timeout
import socket
socket.setdefaulttimeout(1.0*TIMEOUT)
api_scheme, api_netloc, api_path, api_query, api_fragment = urlparse.urlsplit(link['href'])
max_retry = 10
# Try to use existing conection
try:
self.connection.request(method='GET', url=api_path, headers={'Accept': link['type']}, body="")
response = self.connection.getresponse()
return json.loads(response.read())
# Create and use new connection
except:
retry = 0
# (re)Try several time to use new connection
while retry < max_retry:
try:
self.connection = self._getConnection(self.certificate_path, self.key_path, self.url)
self.connection.request(method='GET', url=api_path, headers={'Accept': link['type']}, body="")
response = self.connection.getresponse()
return json.loads(response.read())
except:
self.log("SlapOSMasterCommunicator: Connection failed..")
retry += 1
time.sleep(10)
self.log("SlapOSMasterCommunicator: All connection attempts failed after %d try.." %max_retry)
raise ValueError("SlapOSMasterCommunicator: Impossible to use connection")
def _update_hosting_subscription_informations(self):
"""
Add all not already visited hosting_subcription
# Visit all hosting subscriptions and fill a dict containing all
# new hosting subscriptions. ( like: {hs1_title:hs1_link, hs2_title:hs2_link, ..} )
# and a list of visited hosting_subsciption ( like: [hs1_link, hs2_link, ..] )
"""
collection = self._curl(self.personnal_collection_link)
# For each hosting_subcription present in the collection
for hosting_subscription_link in collection['_links']['item']:
if hosting_subscription_link not in self.visited_hosting_subcriptions_link_list:
hosting_subscription = self._curl(hosting_subscription_link)
self.hosting_subcriptions_dict.update({hosting_subscription['title']:hosting_subscription_link})
self.visited_hosting_subcriptions_link_list.append(hosting_subscription_link)
def _getRelatedInstanceLink(self, hosting_subscription_title):
"""
Return a list of all related instance_url from an hosting_subscription_title
"""
# Update informations
self._update_hosting_subscription_informations()
# Get specified hosting_subscription
hosting_subscription_link = self.hosting_subcriptions_dict[hosting_subscription_title]
hosting_subscription = self._curl(hosting_subscription_link)
assert(hosting_subscription_title == hosting_subscription['title'])
# Get instance collection related to this hosting_subscription
instance_collection_link = hosting_subscription['_links']['http://slapos.org/reg/instance']
instance_collection = self._curl(instance_collection_link)
related_instance_link_list = []
# For each instance present in the collection
for instance in instance_collection['_links']['item']:
related_instance_link_list.append(instance)
return related_instance_link_list
def getNewsFromInstanceLink(self, instance_link):
instance = self._curl(instance_link)
news_link = instance['_links']['http://slapos.org/reg/news']
return self._curl(news_link)
def isHostingSubsciptionStatusEqualTo(self, hosting_subscription_title, excepted_news_text):
"""
Return True if all related instance state are equal to status,
or False if not or if there is are no related instances.
"""
related_instance_link_list = _getRelatedInstanceLink(hosting_subscription_title)
# For each instance present in the collection
for instance_link in related_instance_link_list:
news = self.getNewsFromInstanceLink(instance_link)
if excepted_news_text != news['news'][0]['text']:
return False
return len(related_instance_link_list) > 0
def isInstanceReady(self, instance_link, status):
"""
Return True if instance status and instance news text ~looks corresponding.
( use the matching of 'correctly' and 'Instance' and status )
"""
# XXX: SlapOS Master doesn't store any "news" about slave instances. Assume true.
if self._curl(instance_link)['slave']:
return True
text = self.getNewsFromInstanceLink(instance_link)['news'][0]['text']
return ('Instance' in text) and ('correctly' in text) and (status in text)
# check if provided 'status' = status
def isHostingSubscriptionReady(self, hosting_subscription_title, status):
"""
Return True if all instance status and instance news text ~looks corresponding.
( use the matching of 'correctly' and 'Instance' and status ).
"""
instance_link_list = self._getRelatedInstanceLink(hosting_subscription_title)
for instance_link in instance_link_list:
if not self.isInstanceReady(instance_link, status):
return False
return len(instance_link_list) > 0
def isRegisteredHostingSubscription(self, hosting_subscription_title):
"""
Return True if the specified hosting_subscription is present on SlapOSMaster
"""
self._update_hosting_subscription_informations()
if self.hosting_subcriptions_dict.get(hosting_subscription_title):
return True
return False
def getHostingSubscriptionDict(self):
"""
Return the dict of hosting subcription.
"""
return self.hosting_subcriptions_dict
def getHostingSubscriptionInformationDict(self, title):
"""
Return a dict with informations about Hosting subscription
"""
related_instance_link_list = self._getRelatedInstanceLink(title)
related_instance_link = None
# Get root instance
for link in related_instance_link_list:
instance = self._curl(link)
if title == instance['title']:
related_instance_link = link
break
# Return information dict
if related_instance_link:
related_instance = self._curl(related_instance_link)
return {
'title': related_instance['title'],
'status': related_instance['status'],
'software_url': related_instance['_links']['http://slapos.org/reg/release'],
'software_type': related_instance['software_type'],
'computer_guid': related_instance['sla']['computer_guid']
}
else:
return None
\ No newline at end of file
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from datetime import datetime,timedelta
import os
import subprocess
import sys
import time
import glob
import SlapOSControler
import json
import time
import shutil
import logging
import string
import random
from ProcessManager import SubprocessError, ProcessManager, CancellationError
from subprocess import CalledProcessError
from NodeTestSuite import SlapOSInstance
from Updater import Updater
from erp5.util import taskdistribution
class UnitTestRunner():
def __init__(self, testnode):
self.testnode = testnode
self.slapos_controler = SlapOSControler.SlapOSControler(
self.testnode.working_directory,
self.testnode.config,
self.testnode.log)
def _prepareSlapOS(self, working_directory, slapos_instance, log,
create_partition=1, software_path_list=None, **kw):
"""
Launch slapos to build software and partitions
"""
slapproxy_log = os.path.join(self.testnode.config['log_directory'],
'slapproxy.log')
log('Configured slapproxy log to %r' % slapproxy_log)
reset_software = slapos_instance.retry_software_count > 10
if reset_software:
slapos_instance.retry_software_count = 0
log('testnode, retry_software_count : %r' % \
slapos_instance.retry_software_count)
self.slapos_controler.initializeSlapOSControler(slapproxy_log=slapproxy_log,
process_manager=self.testnode.process_manager, reset_software=reset_software,
software_path_list=software_path_list)
self.testnode.process_manager.supervisord_pid_file = os.path.join(\
self.slapos_controler.instance_root, 'var', 'run', 'supervisord.pid')
method_list= ["runSoftwareRelease"]
if create_partition:
method_list.append("runComputerPartition")
for method_name in method_list:
slapos_method = getattr(self.slapos_controler, method_name)
log("Before status_dict = slapos_method(...)")
status_dict = slapos_method(self.testnode.config,
environment=self.testnode.config['environment'],
)
log(status_dict)
log("After status_dict = slapos_method(...)")
if status_dict['status_code'] != 0:
slapos_instance.retry = True
slapos_instance.retry_software_count += 1
raise SubprocessError(status_dict)
else:
slapos_instance.retry_software_count = 0
return status_dict
def prepareSlapOSForTestNode(self, test_node_slapos):
"""
We will build slapos software needed by the testnode itself,
like the building of selenium-runner by default
"""
return self._prepareSlapOS(self.testnode.config['slapos_directory'],
test_node_slapos, self.testnode.log, create_partition=0,
software_path_list=self.testnode.config.get("software_list"))
def prepareSlapOSForTestSuite(self, node_test_suite):
"""
Build softwares needed by testsuites
"""
log = self.testnode.log
if log is None:
log = self.testnode.log
return self._prepareSlapOS(node_test_suite.working_directory,
node_test_suite, log,
software_path_list=[node_test_suite.custom_profile_path])
def runTestSuite(self, node_test_suite, portal_url, log=None):
config = self.testnode.config
parameter_list = []
run_test_suite_path_list = glob.glob("%s/*/bin/runTestSuite" % \
self.slapos_controler.instance_root)
if not len(run_test_suite_path_list):
raise ValueError('No runTestSuite provided in installed partitions.')
run_test_suite_path = run_test_suite_path_list[0]
run_test_suite_revision = node_test_suite.revision
# Deal with Shebang size limitation
invocation_list = self.testnode._dealShebang(run_test_suite_path)
invocation_list.extend([run_test_suite_path,
'--test_suite', node_test_suite.test_suite,
'--revision', node_test_suite.revision,
'--test_suite_title', node_test_suite.test_suite_title,
'--node_quantity', config['node_quantity'],
'--master_url', portal_url])
firefox_bin_list = glob.glob("%s/soft/*/parts/firefox/firefox-slapos" % \
config["slapos_directory"])
if len(firefox_bin_list):
parameter_list.append('--firefox_bin')
xvfb_bin_list = glob.glob("%s/soft/*/parts/xserver/bin/Xvfb" % \
config["slapos_directory"])
if len(xvfb_bin_list):
parameter_list.append('--xvfb_bin')
supported_paramater_set = self.testnode.process_manager.getSupportedParameterSet(
run_test_suite_path, parameter_list)
if '--firefox_bin' in supported_paramater_set:
invocation_list.extend(["--firefox_bin", firefox_bin_list[0]])
if '--xvfb_bin' in supported_paramater_set:
invocation_list.extend(["--xvfb_bin", xvfb_bin_list[0]])
# TODO : include testnode correction ( b111682f14890bf )
if hasattr(node_test_suite,'additional_bt5_repository_id'):
additional_bt5_path = os.path.join(
node_test_suite.working_directory,
node_test_suite.additional_bt5_repository_id)
invocation_list.extend(["--bt5_path", additional_bt5_path])
# From this point, test runner becomes responsible for updating test
# result. We only do cleanup if the test runner itself is not able
# to run.
SlapOSControler.createFolder(node_test_suite.test_suite_directory,
clean=True)
self.testnode.process_manager.spawn(*invocation_list,
cwd=node_test_suite.test_suite_directory,
log_prefix='runTestSuite', get_output=False)
def getRelativePathUsage(self):
"""
Used by the method testnode.constructProfile() to know
if the software.cfg have to use relative path or not.
"""
return False
......@@ -32,7 +32,7 @@ import subprocess
import sys
import threading
from testnode import SubprocessError
from ProcessManager import SubprocessError
SVN_UP_REV = re.compile(r'^(?:At|Updated to) revision (\d+).$')
SVN_CHANGED_REV = re.compile(r'^Last Changed Rev.*:\s*(\d+)', re.MULTILINE)
......@@ -103,6 +103,30 @@ class Updater(object):
def _git(self, *args, **kw):
return self.spawn(self.git_binary, *args, **kw)['stdout'].strip()
def git_update_server_info(self):
return self._git('update-server-info', '-f')
def git_create_repository_link(self):
""" Create a link in depository to the ".git" directory.
ex:
for "../erp5/.git"
"../erp5/erp5.git"->"../erp5/.git" will be created.
"""
git_repository_path = os.path.join(self.getRepositoryPath(), '.git')
name = os.path.basename(os.path.normpath(self.getRepositoryPath()))
git_repository_link_path = os.path.join(self.getRepositoryPath(), '%s.git' %name)
self.log("checking link %s -> %s.."
%(git_repository_link_path,git_repository_path))
if ( not os.path.lexists(git_repository_link_path) and \
not os.path.exists(git_repository_link_path) ):
try:
os.symlink(git_repository_path, git_repository_link_path)
self.log("link: %s -> %s created"
%(git_repository_link_path,git_repository_path))
except:
self.log("Cannot create link from %s -> %s"
%(git_repository_link_path,git_repository_path))
def _git_find_rev(self, ref):
try:
return self._git_cache[ref]
......@@ -213,3 +237,4 @@ class Updater(object):
else:
raise NotImplementedError
self._path_list += path_list
self.git_update_server_info()
import sys
import json
import shutil
import string
from random import choice
def deunicodeData(data):
if isinstance(data, list):
new_data = []
for sub_data in data:
new_data.append(deunicodeData(sub_data))
elif isinstance(data, unicode):
new_data = data.encode('utf8')
elif isinstance(data, dict):
new_data = {}
for key, value in data.iteritems():
key = deunicodeData(key)
value = deunicodeData(value)
new_data[key] = value
else:
new_data = data
return new_data
\ No newline at end of file
......@@ -74,15 +74,17 @@ def main(*args):
config.optionxform = str
config.readfp(parsed_argument.configuration_file[0])
for key in ('slapos_directory','working_directory','test_suite_directory',
'log_directory','run_directory','proxy_host','proxy_port',
'git_binary','zip_binary','node_quantity','test_node_title',
'ipv4_address','ipv6_address','test_suite_master_url',
'log_directory','run_directory', 'srv_directory', 'proxy_host',
'software_directory',
'proxy_port', 'git_binary','zip_binary','node_quantity',
'test_node_title', 'ipv4_address','ipv6_address','test_suite_master_url',
'slapgrid_partition_binary','slapgrid_software_binary',
'slapproxy_binary', 'httpd_ip', 'httpd_port'):
'slapproxy_binary', 'httpd_ip', 'httpd_port', 'httpd_software_access_port',
'computer_id', 'server_url'):
CONFIG[key] = config.get('testnode',key)
for key in ('slapos_directory', 'working_directory', 'test_suite_directory',
'log_directory', 'run_directory'):
'log_directory', 'run_directory', 'srv_directory', 'software_directory'):
d = CONFIG[key]
if not os.path.isdir(d):
raise ValueError('Directory %r does not exists.' % d)
......
......@@ -37,11 +37,19 @@ import shutil
import logging
import string
import random
import Utils
import traceback
from ProcessManager import SubprocessError, ProcessManager, CancellationError
from subprocess import CalledProcessError
from Updater import Updater
from NodeTestSuite import NodeTestSuite, SlapOSInstance
from ScalabilityTestRunner import ScalabilityTestRunner
from UnitTestRunner import UnitTestRunner
from erp5.util import taskdistribution
DEFAULT_SLEEP_TIMEOUT = 120 # time in seconds to sleep
MAX_LOG_TIME = 15 # time in days we should keep logs that we can see through
# httd
......@@ -56,77 +64,6 @@ class DummyLogger(object):
'critical', 'fatal'):
setattr(self, name, func)
class SlapOSInstance(object):
def __init__(self):
self.retry_software_count = 0
self.retry = False
def edit(self, **kw):
self.__dict__.update(**kw)
self._checkData()
def _checkData(self):
pass
def deunicodeData(data):
if isinstance(data, list):
new_data = []
for sub_data in data:
new_data.append(deunicodeData(sub_data))
elif isinstance(data, unicode):
new_data = data.encode('utf8')
elif isinstance(data, dict):
new_data = {}
for key, value in data.iteritems():
key = deunicodeData(key)
value = deunicodeData(value)
new_data[key] = value
return new_data
class NodeTestSuite(SlapOSInstance):
def __init__(self, reference):
super(NodeTestSuite, self).__init__()
self.reference = reference
def edit(self, **kw):
super(NodeTestSuite, self).edit(**kw)
def _checkData(self):
if getattr(self, "working_directory", None) is not None:
if not(self.working_directory.endswith(os.path.sep + self.reference)):
self.working_directory = os.path.join(self.working_directory,
self.reference)
SlapOSControler.createFolder(self.working_directory)
self.test_suite_directory = os.path.join(
self.working_directory, "test_suite")
self.custom_profile_path = os.path.join(self.working_directory,
'software.cfg')
if getattr(self, "vcs_repository_list", None) is not None:
for vcs_repository in self.vcs_repository_list:
buildout_section_id = vcs_repository.get('buildout_section_id', None)
repository_id = buildout_section_id or \
vcs_repository.get('url').split('/')[-1].split('.')[0]
repository_path = os.path.join(self.working_directory,repository_id)
vcs_repository['repository_id'] = repository_id
vcs_repository['repository_path'] = repository_path
def createSuiteLog(self):
# /srv/slapgrid/slappartXX/srv/var/log/testnode/az-mlksjfmlk234Sljssdflkj23KSdfslj/suite.log
alphabets = string.digits + string.letters
rand_part = ''.join(random.choice(alphabets) for i in xrange(32))
random_suite_folder_id = '%s-%s' % (self.reference, rand_part)
suite_log_directory = os.path.join(self.log_directory,
random_suite_folder_id)
SlapOSControler.createFolders(suite_log_directory)
self.suite_log_path = os.path.join(suite_log_directory,
'suite.log')
return self.getSuiteLogPath(), random_suite_folder_id
def getSuiteLogPath(self):
return getattr(self,"suite_log_path", None)
class TestNode(object):
def __init__(self, log, config, max_log_time=MAX_LOG_TIME,
......@@ -135,19 +72,22 @@ class TestNode(object):
self.log = log
self.config = config or {}
self.process_manager = ProcessManager(log)
self.working_directory = config['working_directory']
self.node_test_suite_dict = {}
self.file_handler = None
self.max_log_time = max_log_time
self.max_temp_time = max_temp_time
self.file_handler = None
self.url_access = "https://[0::0]:0123" # Ipv6 + port of the node
def checkOldTestSuite(self,test_suite_data):
config = self.config
installed_reference_set = set(os.listdir(config['working_directory']))
installed_reference_set = set(os.listdir(self.working_directory))
wished_reference_set = set([x['test_suite_reference'] for x in test_suite_data])
to_remove_reference_set = installed_reference_set.difference(
wished_reference_set)
for y in to_remove_reference_set:
fpath = os.path.join(config['working_directory'],y)
fpath = os.path.join(self.working_directory,y)
self.delNodeTestSuite(y)
self.log("testnode.checkOldTestSuite, DELETING : %r" % (fpath,))
if os.path.isdir(fpath):
......@@ -166,7 +106,14 @@ class TestNode(object):
if self.node_test_suite_dict.has_key(reference):
self.node_test_suite_dict.pop(reference)
def constructProfile(self, node_test_suite):
def _dealShebang(self,run_test_suite_path):
line = open(run_test_suite_path, 'r').readline()
invocation_list = []
if line[:2] == '#!':
invocation_list = line[2:].split()
return invocation_list
def constructProfile(self, node_test_suite, test_type, use_relative_path=False):
config = self.config
profile_content = ''
assert len(node_test_suite.vcs_repository_list), "we must have at least one repository"
......@@ -184,12 +131,47 @@ class TestNode(object):
profile_path_count += 1
if profile_path_count > 1:
raise ValueError(PROFILE_PATH_KEY + ' defined more than once')
# Absolute path to relative path
software_config_path = os.path.join(repository_path, profile_path)
if use_relative_path :
from_path = os.path.join(self.working_directory,
node_test_suite.reference)
software_config_path = os.path.relpath(software_config_path, from_path)
profile_content_list.append("""
[buildout]
extends = %(software_config_path)s
""" % {'software_config_path': os.path.join(repository_path, profile_path)})
""" % {'software_config_path': software_config_path})
# Construct sections
if not(buildout_section_id is None):
# Absolute path to relative
if use_relative_path:
from_path = os.path.join(self.working_directory,
node_test_suite.reference)
repository_path = os.path.relpath(repository_path, from_path)
if test_type=="ScalabilityTest":
# updater = Updater(repository_path, git_binary=self.config['git_binary'],
# branch = vcs_repository.get('branch','master'), log=self.log, process_manager=self.process_manager)
# updater.checkout()
# revision = updater.getRevision()[1]
all_revision = node_test_suite.revision
# from 'sec1=xx-azer,sec2=yy-qwer,..' to [[sec1,azer],[sec2,qwer],..]
revision_list = [ [x.split('=')[0],x.split('=')[1].split('-')[1]] for x in all_revision.split(',') ]
# from [[sec1,azer],[sec2,qwer],..] to {sec1:azer,sec2:qwer,..}
revision_dict = {branch:revision for branch,revision in revision_list}
# <obfuscated_url> word is modified by in runner.prepareSlapOSForTestSuite()
profile_content_list.append("""
[%(buildout_section_id)s]
repository = <obfuscated_url>/%(buildout_section_id)s/%(buildout_section_id)s.git
revision = %(revision)s
ignore-ssl-certificate = true
""" % {'buildout_section_id': buildout_section_id,
'revision': revision_dict[buildout_section_id]})
else:
profile_content_list.append("""
[%(buildout_section_id)s]
repository = %(repository_path)s
......@@ -199,6 +181,7 @@ branch = %(branch)s
'branch' : vcs_repository.get('branch','master')})
if not profile_path_count:
raise ValueError(PROFILE_PATH_KEY + ' not defined')
# Write file
custom_profile = open(node_test_suite.custom_profile_path, 'w')
# sort to have buildout section first
profile_content_list.sort(key=lambda x: [x, ''][x.startswith('\n[buildout]')])
......@@ -276,113 +259,10 @@ branch = %(branch)s
revision=revision, log=log,
process_manager=self.process_manager)
updater.checkout()
updater.git_update_server_info()
updater.git_create_repository_link()
node_test_suite.revision = test_result.revision
def _prepareSlapOS(self, working_directory, slapos_instance, log,
create_partition=1, software_path_list=None, **kw):
"""
Launch slapos to build software and partitions
"""
slapproxy_log = os.path.join(self.config['log_directory'],
'slapproxy.log')
log('Configured slapproxy log to %r' % slapproxy_log)
reset_software = slapos_instance.retry_software_count > 10
if reset_software:
slapos_instance.retry_software_count = 0
log('testnode, retry_software_count : %r' % \
slapos_instance.retry_software_count)
self.slapos_controler = SlapOSControler.SlapOSControler(
working_directory, self.config, log)
self.slapos_controler.initializeSlapOSControler(slapproxy_log=slapproxy_log,
process_manager=self.process_manager, reset_software=reset_software,
software_path_list=software_path_list)
self.process_manager.supervisord_pid_file = os.path.join(\
self.slapos_controler.instance_root, 'var', 'run', 'supervisord.pid')
method_list= ["runSoftwareRelease"]
if create_partition:
method_list.append("runComputerPartition")
for method_name in method_list:
slapos_method = getattr(self.slapos_controler, method_name)
status_dict = slapos_method(self.config,
environment=self.config['environment'],
)
if status_dict['status_code'] != 0:
slapos_instance.retry = True
slapos_instance.retry_software_count += 1
raise SubprocessError(status_dict)
else:
slapos_instance.retry_software_count = 0
return status_dict
def prepareSlapOSForTestNode(self, test_node_slapos):
"""
We will build slapos software needed by the testnode itself,
like the building of selenium-runner by default
"""
return self._prepareSlapOS(self.config['slapos_directory'],
test_node_slapos, self.log, create_partition=0,
software_path_list=self.config.get("software_list"))
def prepareSlapOSForTestSuite(self, node_test_suite):
log = self.log
if log is None:
log = self.log
return self._prepareSlapOS(node_test_suite.working_directory,
node_test_suite, log,
software_path_list=[node_test_suite.custom_profile_path])
def _dealShebang(self,run_test_suite_path):
line = open(run_test_suite_path, 'r').readline()
invocation_list = []
if line[:2] == '#!':
invocation_list = line[2:].split()
return invocation_list
def runTestSuite(self, node_test_suite, portal_url, log=None):
config = self.config
parameter_list = []
run_test_suite_path_list = glob.glob("%s/*/bin/runTestSuite" % \
self.slapos_controler.instance_root)
if not len(run_test_suite_path_list):
raise ValueError('No runTestSuite provided in installed partitions.')
run_test_suite_path = run_test_suite_path_list[0]
run_test_suite_revision = node_test_suite.revision
# Deal with Shebang size limitation
invocation_list = self._dealShebang(run_test_suite_path)
invocation_list.extend([run_test_suite_path,
'--test_suite', node_test_suite.test_suite,
'--revision', node_test_suite.revision,
'--test_suite_title', node_test_suite.test_suite_title,
'--node_quantity', config['node_quantity'],
'--master_url', portal_url])
firefox_bin_list = glob.glob("%s/soft/*/parts/firefox/firefox-slapos" % \
config["slapos_directory"])
if len(firefox_bin_list):
parameter_list.append('--firefox_bin')
xvfb_bin_list = glob.glob("%s/soft/*/parts/xserver/bin/Xvfb" % \
config["slapos_directory"])
if len(xvfb_bin_list):
parameter_list.append('--xvfb_bin')
supported_paramater_set = self.process_manager.getSupportedParameterSet(
run_test_suite_path, parameter_list)
if '--firefox_bin' in supported_paramater_set:
invocation_list.extend(["--firefox_bin", firefox_bin_list[0]])
if '--xvfb_bin' in supported_paramater_set:
invocation_list.extend(["--xvfb_bin", xvfb_bin_list[0]])
if hasattr(node_test_suite,'additional_bt5_repository_id'):
additional_bt5_path = os.path.join(
node_test_suite.working_directory,
node_test_suite.additional_bt5_repository_id)
invocation_list.extend(["--bt5_path", additional_bt5_path])
# From this point, test runner becomes responsible for updating test
# result. We only do cleanup if the test runner itself is not able
# to run.
SlapOSControler.createFolder(node_test_suite.test_suite_directory,
clean=True)
self.process_manager.spawn(*invocation_list,
cwd=node_test_suite.test_suite_directory,
log_prefix='runTestSuite', get_output=False)
def _cleanupLog(self):
config = self.config
log_directory = self.config['log_directory']
......@@ -442,15 +322,39 @@ branch = %(branch)s
self.cleanUp(None)
remote_test_result_needs_cleanup = False
begin = time.time()
self.prepareSlapOSForTestNode(test_node_slapos)
portal_url = config['test_suite_master_url']
portal = taskdistribution.TaskDistributionTool(portal_url, logger=DummyLogger(log))
test_suite_portal = taskdistribution.TaskDistributor(portal_url, logger=DummyLogger(log))
test_suite_json = test_suite_portal.startTestSuite(config['test_node_title'])
test_suite_data = deunicodeData(json.loads(test_suite_json))
portal = taskdistribution.TaskDistributionTool(portal_url,
logger=DummyLogger(log))
self.portal = portal
self.test_suite_portal = taskdistribution.TaskDistributor(
portal_url,
logger=DummyLogger(log))
self.test_suite_portal.subscribeNode(node_title=config['test_node_title'],
computer_guid=config['computer_id'])
test_suite_json = self.test_suite_portal.startTestSuite(
node_title=config['test_node_title'],
computer_guid=config['computer_id'])
test_suite_data = Utils.deunicodeData(json.loads(test_suite_json))
log("Got following test suite data from master : %r" % \
(test_suite_data,))
#Clean-up test suites
try:
my_test_type = self.test_suite_portal.getTestType()
except:
log("testnode, error during requesting getTestType() method \
from the distributor.")
raise NotImplementedError
# Select runner according to the test type
if my_test_type == 'UnitTest':
runner = UnitTestRunner(self)
elif my_test_type == 'ScalabilityTest':
runner = ScalabilityTestRunner(self)
else:
log("testnode, Runner type not implemented.", my_test_type)
raise NotImplementedError
log("Type of current test is %s" %(my_test_type,))
# master testnode gets test_suites, slaves get nothing
runner.prepareSlapOSForTestNode(test_node_slapos)
# Clean-up test suites
self.checkOldTestSuite(test_suite_data)
for test_suite in test_suite_data:
remote_test_result_needs_cleanup = False
......@@ -460,12 +364,16 @@ branch = %(branch)s
working_directory=self.config['working_directory'],
log_directory=self.config['log_directory'])
node_test_suite.edit(**test_suite)
# XXX: temporary hack to prevent empty test_suite
if not hasattr(node_test_suite, 'test_suite'):
node_test_suite.edit(test_suite='')
run_software = True
# Write our own software.cfg to use the local repository
self.constructProfile(node_test_suite)
# kill processes from previous loop if any
self.process_manager.killPreviousRun()
self.getAndUpdateFullRevisionList(node_test_suite)
# Write our own software.cfg to use the local repository
self.constructProfile(node_test_suite, my_test_type,
runner.getRelativePathUsage())
# Make sure we have local repository
test_result = portal.createTestResult(node_test_suite.revision, [],
config['test_node_title'], False,
......@@ -476,13 +384,36 @@ branch = %(branch)s
if test_result is not None:
self.registerSuiteLog(test_result, node_test_suite)
self.checkRevision(test_result,node_test_suite)
node_test_suite.edit(test_result=test_result)
# Now prepare the installation of SlapOS and create instance
status_dict = self.prepareSlapOSForTestSuite(node_test_suite)
status_dict = runner.prepareSlapOSForTestSuite(node_test_suite)
# Give some time so computer partitions may start
# as partitions can be of any kind we have and likely will never have
# a reliable way to check if they are up or not ...
time.sleep(20)
self.runTestSuite(node_test_suite,portal_url)
if my_test_type == 'UnitTest':
runner.runTestSuite(node_test_suite, portal_url)
elif my_test_type == 'ScalabilityTest':
error_message = None
# A problem is appeared during runTestSuite
if status_dict['status_code'] == 1:
error_message = "Software installation too long or error(s) are present during SR install."
else:
status_dict = runner.runTestSuite(node_test_suite, portal_url)
# A problem is appeared during runTestSuite
if status_dict['status_code'] == 1:
error_message = status_dict['error_message']
# If an error is appeared
if error_message:
test_result.reportFailure(
stdout=error_message
)
self.log(error_message)
raise ValueError(error_message)
else:
raise NotImplementedError
# break the loop to get latest priorities from master
break
self.cleanUp(test_result)
......@@ -507,6 +438,8 @@ branch = %(branch)s
node_test_suite.retry = True
continue
except:
ex_type, ex, tb = sys.exc_info()
traceback.print_tb(tb)
log("erp5testnode exception", exc_info=sys.exc_info())
raise
now = time.time()
......@@ -518,9 +451,11 @@ branch = %(branch)s
except:
log("Exception in error handling", exc_info=sys.exc_info())
finally:
if 'tb' in locals():
del tb
# Nice way to kill *everything* generated by run process -- process
# groups working only in POSIX compilant systems
# Exceptions are swallowed during cleanup phas
# Exceptions are swallowed during cleanup phase
log("GENERAL EXCEPTION, QUITING")
self.cleanUp(test_result)
log("GENERAL EXCEPTION, QUITING, cleanup finished")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment