Commit dd887d99 authored by Jérome Perrin's avatar Jérome Perrin

test_result: make a TaskDistributionTestCase

To be able to reuse utility methods and split tests in test classes for better
readability
parent 3438bb87
......@@ -7,9 +7,9 @@ import responses
import httplib
class TestTaskDistribution(ERP5TypeTestCase):
class TaskDistributionTestCase(ERP5TypeTestCase):
def afterSetUp(self):
self.portal = portal = self.getPortalObject()
portal = self.portal
self.test_node_module = self.portal.getDefaultModule(portal_type = 'Test Node Module')
self.test_suite_module = self.portal.getDefaultModule(portal_type = 'Test Suite Module')
self.test_result_module = self.portal.getDefaultModule(portal_type = 'Test Result Module')
......@@ -83,8 +83,6 @@ class TestTaskDistribution(ERP5TypeTestCase):
original_performance_class._getTestNodeModule = self._original_getTestNodeModule
original_performance_class._getTestSuiteModule = self._original_getTestSuiteModule
def _createTestNode(self, quantity=1, reference_correction=0,
specialise_value=None):
if specialise_value is None:
......@@ -133,7 +131,76 @@ class TestTaskDistribution(ERP5TypeTestCase):
)
test_suite.validate()
test_suite_list.append(test_suite)
return test_suite_list
return test_suite_list
def _callOptimizeAlarm(self):
self.portal.portal_alarms.task_distributor_alarm_optimize.activeSense()
self.tic()
def _callRestartStuckTestResultAlarm(self):
self.portal.portal_alarms.test_result_alarm_restarted_stuck_test_result.activeSense()
self.tic()
def processTest(self, test_title, revision, start_count=2, stop_count=2,
node_title='Node0'):
"""start_count: number of test line to start
stop_count: number of test line to stop
"""
status_dict = {}
test_result_path, revision = self._createTestResult(revision=revision,
test_list=['testFoo', 'testBar'], test_title=test_title, node_title=node_title)
if start_count:
line_url, test = self.distributor.startUnitTest(test_result_path,
node_title=node_title)
if start_count == 2:
next_line_url, next_test = self.distributor.startUnitTest(test_result_path,
node_title=node_title)
self.assertEqual(set(['testFoo', 'testBar']), set([test, next_test]))
if stop_count:
self.distributor.stopUnitTest(line_url, status_dict, node_title=node_title)
if stop_count == 2:
self.tool.stopUnitTest(next_line_url, status_dict, node_title=node_title)
test_result = self.portal.restrictedTraverse(test_result_path)
self.assertEqual(test_result.getSimulationState(), "started")
self.tic()
if stop_count == 2:
self.assertEquals(test_result.getSimulationState(), "stopped")
else:
self.assertEquals(test_result.getSimulationState(), "started")
def _cleanupTestResult(self):
self.tic()
cleanup_state_list = ['started', 'stopped']
test_list = self.test_result_module.searchFolder(title='"TEST FOO" OR "test suite %" OR "Default Test Suite"',
simulation_state=cleanup_state_list)
for test_result in test_list:
if test_result.getSimulationState() in cleanup_state_list:
test_result.cancel()
test_result.setTitle('previous test')
self.tic()
def _createTestResult(self, revision="r0=a,r1=a", node_title='Node0',
test_list=None, tic=1, allow_restart=False,
test_title=None, distributor=None):
if test_title is None:
test_title = self.default_test_title
if distributor is None:
distributor = self.distributor
result = distributor.createTestResult(
"", revision, test_list or [], allow_restart,
test_title=test_title, node_title=node_title)
# we commit, since usually we have a remote call only doing this
(self.tic if tic else self.commit)()
return result
def checkTestResultLine(self, test_result, expected):
line_list = test_result.objectValues(portal_type="Test Result Line")
found_list = [(x.getTitle(), x.getSimulationState()) for x in line_list]
found_list.sort(key=lambda x: x[0])
self.assertEqual(expected, found_list)
class TestTaskDistribution(TaskDistributionTestCase):
def test_01_createTestNode(self):
test_node = self._createTestNode()[0]
......@@ -198,14 +265,6 @@ class TestTaskDistribution(ERP5TypeTestCase):
self.portal.portal_workflow.doActionFor(test_suite_clone, 'validate_action')
self.assertEqual('validated', test_suite_clone.getValidationState())
def _callOptimizeAlarm(self):
self.portal.portal_alarms.task_distributor_alarm_optimize.activeSense()
self.tic()
def _callRestartStuckTestResultAlarm(self):
self.portal.portal_alarms.test_result_alarm_restarted_stuck_test_result.activeSense()
self.tic()
def test_subscribeNode_ReturnValue(self):
config = self.distributor.subscribeNode('COMP-X', 'QPK')
config = json.loads(config)
......@@ -262,33 +321,6 @@ class TestTaskDistribution(ERP5TypeTestCase):
[('COMP32-Node1',set([u'B1'])), ('COMP32-Node2',set([u'B0']))]],
"%r" % ([config1, config2],))
def processTest(self, test_title, revision, start_count=2, stop_count=2,
node_title='Node0'):
"""start_count: number of test line to start
stop_count: number of test line to stop
"""
status_dict = {}
test_result_path, revision = self._createTestResult(revision=revision,
test_list=['testFoo', 'testBar'], test_title=test_title, node_title=node_title)
if start_count:
line_url, test = self.distributor.startUnitTest(test_result_path,
node_title=node_title)
if start_count == 2:
next_line_url, next_test = self.distributor.startUnitTest(test_result_path,
node_title=node_title)
self.assertEqual(set(['testFoo', 'testBar']), set([test, next_test]))
if stop_count:
self.distributor.stopUnitTest(line_url, status_dict, node_title=node_title)
if stop_count == 2:
self.tool.stopUnitTest(next_line_url, status_dict, node_title=node_title)
test_result = self.portal.restrictedTraverse(test_result_path)
self.assertEqual(test_result.getSimulationState(), "started")
self.tic()
if stop_count == 2:
self.assertEquals(test_result.getSimulationState(), "stopped")
else:
self.assertEquals(test_result.getSimulationState(), "started")
def test_04b_startTestSuiteOrder(self):
"""
When we have many test suites associated to one test nodes, the method
......@@ -498,31 +530,6 @@ class TestTaskDistribution(ERP5TypeTestCase):
finally:
self.unpinDateTime()
def _cleanupTestResult(self):
self.tic()
cleanup_state_list = ['started', 'stopped']
test_list = self.test_result_module.searchFolder(title='"TEST FOO" OR "test suite %" OR "Default Test Suite"',
simulation_state=cleanup_state_list)
for test_result in test_list:
if test_result.getSimulationState() in cleanup_state_list:
test_result.cancel()
test_result.setTitle('previous test')
self.tic()
def _createTestResult(self, revision="r0=a,r1=a", node_title='Node0',
test_list=None, tic=1, allow_restart=False,
test_title=None, distributor=None):
if test_title is None:
test_title = self.default_test_title
if distributor is None:
distributor = self.distributor
result = distributor.createTestResult(
"", revision, test_list or [], allow_restart,
test_title=test_title, node_title=node_title)
# we commit, since usually we have a remote call only doing this
(self.tic if tic else self.commit)()
return result
def test_05_createTestResult(self):
"""
We will check the method createTestResult of distributor
......@@ -675,12 +682,6 @@ class TestTaskDistribution(ERP5TypeTestCase):
['testFailing', 'testSlow', 'testFast'],
[self.tool.startUnitTest(test_result_path)[1] for _ in range(3)])
def checkTestResultLine(self, test_result, expected):
line_list = test_result.objectValues(portal_type="Test Result Line")
found_list = [(x.getTitle(), x.getSimulationState()) for x in line_list]
found_list.sort(key=lambda x: x[0])
self.assertEqual(expected, found_list)
def test_06b_restartStuckTest(self):
"""
Check if a test result line is not stuck in 'started', if so, redraft
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment