Commit 487001ac authored by Sebastien Robin

ERP5ProjectDistributor: enhance distribution algorithm to benefit from additional test nodes

With the previous algorithm, work was given to additional test nodes only when:
- we were previously below the needed capacity
- another test node was dying

Now, as soon as a new test node is added, we move work from overloaded test nodes to
idle test nodes. We try to move only test suites that use many test nodes, to avoid
having to wait for build time.

This allows a better distribution of the work, with the aim of getting test results more quickly.
It also avoids cases where several test nodes are assigned no work at all.

Finally, fix the distribution algorithm to avoid some unfair cases where a test suite might
get more test nodes than another while both ask for the same number of test nodes.
parent 62a9d4c8
...@@ -484,10 +484,11 @@ class TestTaskDistribution(ERP5TypeTestCase): ...@@ -484,10 +484,11 @@ class TestTaskDistribution(ERP5TypeTestCase):
self.tic() self.tic()
self._callOptimizeAlarm() self._callOptimizeAlarm()
for test_node, aggregate_list in args: for test_node, aggregate_list in args:
self.assertEqual(set(test_node.getAggregateList()), test_note_aggregate_title_list = [x.split(" ")[-1] for x in test_node.getAggregateTitleList()]
self.assertEqual(set(test_note_aggregate_title_list),
set(aggregate_list), set(aggregate_list),
"incorrect aggregate for %r, got %r instead of %r" % \ "incorrect aggregate for %r, got %r instead of %r" % \
(test_node.getTitle(), test_node.getAggregateList(), aggregate_list)) (test_node.getTitle(), test_note_aggregate_title_list, aggregate_list))
def test_11_checkERP5ProjectOptimizationIsStable(self): def test_11_checkERP5ProjectOptimizationIsStable(self):
""" """
...@@ -497,96 +498,121 @@ class TestTaskDistribution(ERP5TypeTestCase): ...@@ -497,96 +498,121 @@ class TestTaskDistribution(ERP5TypeTestCase):
test_node_one, test_node_two = self._createTestNode(quantity=2) test_node_one, test_node_two = self._createTestNode(quantity=2)
test_suite_one = self._createTestSuite(reference_correction=+0, test_suite_one = self._createTestSuite(reference_correction=+0,
title='one')[0] title='one')[0]
test_suite_one_url = test_suite_one.getRelativeUrl() self._createTestSuite(reference_correction=+1,
test_suite_two_url = self._createTestSuite(reference_correction=+1,
title='two')[0].getRelativeUrl() title='two')[0].getRelativeUrl()
self.tic() self.tic()
self._callOptimizeAlarm() self._callOptimizeAlarm()
check = self._checkTestSuiteAggregateList check = self._checkTestSuiteAggregateList
check([test_node_one, [test_suite_one_url]], check([test_node_one, ["one"]],
[test_node_two, [test_suite_two_url]]) [test_node_two, ["two"]])
# first test suite is invalidated, so it should be removed from nodes, # first test suite is invalidated, so it should be removed from nodes,
# but this should not change assignment of second test suite # but this should not change assignment of second test suite
test_suite_one.invalidate() test_suite_one.invalidate()
check([test_node_one, []], check([test_node_one, []],
[test_node_two, [test_suite_two_url]]) [test_node_two, ["two"]])
# an additional test node is added, with lower title, this should # an additional test node is added, with lower title, this should
# still not change anyting # still not change anyting
test_node_zero = self._createTestNode(quantity=1, reference_correction=-1)[0] test_node_zero = self._createTestNode(quantity=1, reference_correction=-1)[0]
check([test_node_zero, []], check([test_node_zero, []],
[test_node_one, []], [test_node_one, []],
[test_node_two, [test_suite_two_url]]) [test_node_two, ["two"]])
# test suite one is validated again, it is installed on first # test suite one is validated again, it is installed on first
# available test node # available test node
test_suite_one.validate() test_suite_one.validate()
check([test_node_zero, [test_suite_one_url]], check([test_node_zero, ["one"]],
[test_node_one, []], [test_node_one, []],
[test_node_two, [test_suite_two_url]]) [test_node_two, ["two"]])
# for some reasons, test_node two is dead, so the work is distributed # for some reasons, test_node two is dead, so the work is distributed
# to remaining test nodes # to remaining test nodes
test_node_two.invalidate() test_node_two.invalidate()
check([test_node_zero, [test_suite_one_url]], check([test_node_zero, ["one"]],
[test_node_one, [test_suite_two_url]], [test_node_one, ["two"]],
[test_node_two, []]) [test_node_two, []])
# we add another test suite, since all test node already have one # we add another test suite, since all test node already have one
# test suite, the new test suite is given to first available one # test suite, the new test suite is given to first available one
test_suite_three_url = self._createTestSuite(reference_correction=+2, self._createTestSuite(reference_correction=+2,
title='three')[0].getRelativeUrl() title='three')[0].getRelativeUrl()
check([test_node_zero, [test_suite_one_url, test_suite_three_url]], check([test_node_zero, ["one", "three"]],
[test_node_one, [test_suite_two_url]], [test_node_one, ["two"]],
[test_node_two, []]) [test_node_two, []])
# test node two is coming back. However we do not change any assignment # test node two is coming back. To have better repartition of work,
# to avoid uninstalling stuff on nodes # move some work from overloaded test node to less busy test node, while
# still trying to move as less test suite as possible (here only one)
test_node_two.validate() test_node_two.validate()
check([test_node_zero, [test_suite_one_url, test_suite_three_url]], check([test_node_zero, ["three"]],
[test_node_one, [test_suite_two_url]], [test_node_one, ["two"]],
[test_node_two, []]) [test_node_two, ["one"]])
# Now let's create a test suite needing between 1 to 2 test nodes # Now let's create a test suite needing between 1 to 2 test nodes
# We check that nodes with less suites are completed first # Make sure additional work is added without moving other test suites
test_suite_four_url = self._createTestSuite(reference_correction=+5, self._createTestSuite(reference_correction=+3,
priority=4, title='four')[0].getRelativeUrl() priority=4, title='four')[0].getRelativeUrl()
check([test_node_zero, [test_suite_one_url, test_suite_three_url]], check([test_node_zero, ["three", "four"]],
[test_node_one, [test_suite_two_url, test_suite_four_url]], [test_node_one, ["two", "four"]],
[test_node_two, [test_suite_four_url]]) [test_node_two, ["one"]])
# Now let's create a 2 test suite needing between 2 to 3 test nodes # Now let's create a a test suite needing 1 nodes
# to make all test nodes almost satured # to make sure test nodes with less work get the work first
test_suite_five_url = self._createTestSuite(reference_correction=+6, test_suite_five = self._createTestSuite(reference_correction=+4,
priority=7, title='five')[0].getRelativeUrl() title='five')[0]
test_suite_six_url = self._createTestSuite(reference_correction=+7, check([test_node_zero, ["three", "four"]],
priority=7, title='six')[0].getRelativeUrl() [test_node_one, ["two", "four"]],
check([test_node_zero, [test_suite_one_url, test_suite_three_url, [test_node_two, ["one", "five"]])
test_suite_five_url, test_suite_six_url]], # Now let's create another test suite needing between 2 to 3 test nodes
[test_node_one, [test_suite_two_url, test_suite_four_url, # and increase priority of one suite to make all test nodes almost satured
test_suite_five_url, test_suite_six_url]], test_suite_five.setIntIndex(7)
[test_node_two, [test_suite_four_url, self._createTestSuite(reference_correction=+5,
test_suite_five_url, test_suite_six_url]]) priority=7, title='six')
check([test_node_zero, ["three", "four","five", "six"]],
[test_node_one, ["two", "four", "five", "six"]],
[test_node_two, ["one", "five", "six"]])
# Then, check what happens if all nodes are more than saturated # Then, check what happens if all nodes are more than saturated
# with a test suite needing between 3 to 5 test nodes # with a test suite needing between 3 to 5 test nodes
test_suite_seven_url = self._createTestSuite(reference_correction=+4, self._createTestSuite(reference_correction=+6,
priority=9, title='seven')[0].getRelativeUrl() priority=9, title='seven')
check([test_node_zero, [test_suite_one_url, test_suite_three_url, check([test_node_zero, ["three", "four", "five", "six"]],
test_suite_five_url, test_suite_six_url]], [test_node_one, ["two", "four", "five", "six"]],
[test_node_one, [test_suite_two_url, test_suite_four_url, [test_node_two, ["one", "seven", "five", "six"]])
test_suite_five_url, test_suite_six_url]],
[test_node_two, [test_suite_four_url, test_suite_seven_url,
test_suite_five_url, test_suite_six_url]])
# No place any more, adding more test suite has no consequence # No place any more, adding more test suite has no consequence
test_suite_height_url = self._createTestSuite(reference_correction=+8, # we need 5*2 + 3*2 + 2*1 + 1*3 => 21 slots
priority=9, title='height')[0].getRelativeUrl() self._createTestSuite(reference_correction=+7,
check([test_node_zero, [test_suite_one_url, test_suite_three_url, priority=9, title='height')
test_suite_five_url, test_suite_six_url]], check([test_node_zero, ["three", "four", "five", "six"]],
[test_node_one, [test_suite_two_url, test_suite_four_url, [test_node_one, ["two", "four", "five", "six"]],
test_suite_five_url, test_suite_six_url]], [test_node_two, ["one", "seven", "five", "six"]])
[test_node_two, [test_suite_four_url, test_suite_seven_url,
test_suite_five_url, test_suite_six_url]])
# free some place by removing a test suite # free some place by removing a test suite
self.portal.unrestrictedTraverse(test_suite_five_url).invalidate() # make sure free slots are fairly distributed to test suite having
check([test_node_zero, [test_suite_one_url, test_suite_three_url, # less test nodes
test_suite_six_url, test_suite_seven_url]], # We remove 3 slots, so we would need 18 slots
[test_node_one, [test_suite_two_url, test_suite_four_url, test_suite_five.invalidate()
test_suite_six_url, test_suite_seven_url]], check([test_node_zero, ["three", "four", "height", "six"]],
[test_node_two, [test_suite_four_url, test_suite_six_url, [test_node_one, ["two", "four", "seven" , "six"]],
test_suite_seven_url, test_suite_height_url]]) [test_node_two, ["one", "seven", "height" , "six"]])
# Check that additional test node would get work for missing assignments
# No move a test suite is done since in average we miss slots
test_node_three, = self._createTestNode(reference_correction=2)
check([test_node_zero, ["three", "four", "height", "six"]],
[test_node_one, ["two", "four", "seven" , "six"]],
[test_node_two, ["one", "seven", "height" , "six"]],
[test_node_three, ["seven", "height"]])
# With even more test node, check that we move some work to less
# busy test nodes
test_node_four, = self._createTestNode(reference_correction=3)
test_node_five, = self._createTestNode(reference_correction=4)
check([test_node_zero, ["three", "six", "height"]],
[test_node_one, ["two", "six", "seven"]],
[test_node_two, ["one", "seven", "height"]],
[test_node_three, ["four", "seven", "height"]],
[test_node_four, ["four", "seven", "height"]],
[test_node_five, ["six", "seven", "height"]])
test_node_six, = self._createTestNode(reference_correction=5)
test_node_seven, = self._createTestNode(reference_correction=6)
check([test_node_zero, ["three", "height"]],
[test_node_one, ["two", "seven"]],
[test_node_two, ["one", "height"]],
[test_node_three, ["seven", "height"]],
[test_node_four, ["four", "seven", "height"]],
[test_node_five, ["six", "seven", "height"]],
[test_node_six, ["six", "seven"]],
[test_node_seven, ["four", "six"]])
def test_12_checkCloudPerformanceOptimizationIsStable(self): def test_12_checkCloudPerformanceOptimizationIsStable(self):
""" """
...@@ -595,58 +621,51 @@ class TestTaskDistribution(ERP5TypeTestCase): ...@@ -595,58 +621,51 @@ class TestTaskDistribution(ERP5TypeTestCase):
""" """
test_node_one, test_node_two = self._createTestNode(quantity=2, test_node_one, test_node_two = self._createTestNode(quantity=2,
specialise_value=self.performance_distributor) specialise_value=self.performance_distributor)
test_suite_list = self._createTestSuite(quantity=2, test_suite_one, = self._createTestSuite(
title='one', specialise_value=self.performance_distributor)
self._createTestSuite(title='two', reference_correction=+1,
specialise_value=self.performance_distributor) specialise_value=self.performance_distributor)
self.tic() self.tic()
self._callOptimizeAlarm() self._callOptimizeAlarm()
test_suite_one, test_suite_two = test_suite_list
test_suite_one_url, test_suite_two_url = [x.getRelativeUrl() for x in
test_suite_list]
check = self._checkTestSuiteAggregateList check = self._checkTestSuiteAggregateList
check([test_node_one, [test_suite_one_url, test_suite_two_url]], check([test_node_one, ["one", "two"]],
[test_node_two, [test_suite_one_url, test_suite_two_url]]) [test_node_two, ["one", "two"]])
# first test suite is invalidated, so it should be removed from nodes, # first test suite is invalidated, so it should be removed from nodes,
# but this should not change assignment of second test suite # but this should not change assignment of second test suite
test_suite_one.invalidate() test_suite_one.invalidate()
check([test_node_one, [test_suite_two_url]], check([test_node_one, ["two"]],
[test_node_two, [test_suite_two_url]]) [test_node_two, ["two"]])
# an additional test node is added, with lower title, it should # an additional test node is added, with lower title, it should
# get in any case all test suite # get in any case all test suite
test_node_zero = self._createTestNode(quantity=1, reference_correction=-1, test_node_zero = self._createTestNode(quantity=1, reference_correction=-1,
specialise_value=self.performance_distributor)[0] specialise_value=self.performance_distributor)[0]
check([test_node_zero, [test_suite_two_url]], check([test_node_zero, ["two"]],
[test_node_one, [test_suite_two_url]], [test_node_one, ["two"]],
[test_node_two, [test_suite_two_url]]) [test_node_two, ["two"]])
# test suite one is validating again, it is installed on first # test suite one is validating again, it is installed on first
# available test node # available test node
test_suite_one.validate() test_suite_one.validate()
check([test_node_zero, [test_suite_one_url, test_suite_two_url]], check([test_node_zero, ["one", "two"]],
[test_node_one, [test_suite_one_url, test_suite_two_url]], [test_node_one, ["one", "two"]],
[test_node_two, [test_suite_one_url, test_suite_two_url]]) [test_node_two, ["one", "two"]])
# for some reasons, test_node two is dead, this has no consequence # for some reasons, test_node two is dead, this has no consequence
# for others # for others
test_node_two.invalidate() test_node_two.invalidate()
check([test_node_zero, [test_suite_one_url, test_suite_two_url]], check([test_node_zero, ["one", "two"]],
[test_node_one, [test_suite_one_url, test_suite_two_url]], [test_node_one, ["one", "two"]],
[test_node_two, [test_suite_one_url, test_suite_two_url]]) [test_node_two, ["one", "two"]])
# we add another test suite, all test nodes should run it, except # we add another test suite, all test nodes should run it, except
# test_node_two which is dead # test_node_two which is dead
test_suite_three_url = self._createTestSuite(reference_correction=+2, self._createTestSuite(title="three", reference_correction=+2,
specialise_value=self.performance_distributor)[0]\ specialise_value=self.performance_distributor)
.getRelativeUrl() check([test_node_zero, ["one", "two", "three"]],
check([test_node_zero, [test_suite_one_url, test_suite_two_url, [test_node_one, ["one", "two", "three"]],
test_suite_three_url]], [test_node_two, ["one", "two"]])
[test_node_one, [test_suite_one_url, test_suite_two_url,
test_suite_three_url]],
[test_node_two, [test_suite_one_url, test_suite_two_url]])
# test node two is coming back. It should run all test suites # test node two is coming back. It should run all test suites
test_node_two.validate() test_node_two.validate()
check([test_node_zero, [test_suite_one_url, test_suite_two_url, check([test_node_zero, ["one", "two", "three"]],
test_suite_three_url]], [test_node_one, ["one", "two", "three"]],
[test_node_one, [test_suite_one_url, test_suite_two_url, [test_node_two, ["one", "two", "three"]])
test_suite_three_url]],
[test_node_two, [test_suite_one_url, test_suite_two_url,
test_suite_three_url]])
# now we are going to # now we are going to
def test_13_startTestSuiteWithOneTestNodeAndPerformanceDistributor(self): def test_13_startTestSuiteWithOneTestNodeAndPerformanceDistributor(self):
......
...@@ -123,12 +123,16 @@ class ERP5ProjectUnitTestDistributor(XMLObject): ...@@ -123,12 +123,16 @@ class ERP5ProjectUnitTestDistributor(XMLObject):
specialise_uid=self.getUid(), sort_on=[('title','ascending')])] specialise_uid=self.getUid(), sort_on=[('title','ascending')])]
test_node_list_len = len(test_node_list) test_node_list_len = len(test_node_list)
def _optimizeConfiguration(test_suite_list_to_add, level=0): def _optimizeConfiguration(test_suite_list_to_add, level=0,
test_node_list_to_optimize=None,
test_suite_max=TEST_SUITE_MAX):
if test_node_list_to_optimize is None:
test_node_list_to_optimize = [x for x in test_node_list]
if test_suite_list_to_add: if test_suite_list_to_add:
test_node_list_to_remove = [] test_node_list_to_remove = []
for test_node in test_node_list: for test_node in test_node_list_to_optimize:
# We can no longer add more test suite on this test node # We can no longer add more test suite on this test node
if TEST_SUITE_MAX < (level + 1): if test_suite_max < (level + 1):
test_node_list_to_remove.append(test_node) test_node_list_to_remove.append(test_node)
continue continue
test_suite_list = test_node.getAggregateList() test_suite_list = test_node.getAggregateList()
...@@ -141,15 +145,53 @@ class ERP5ProjectUnitTestDistributor(XMLObject): ...@@ -141,15 +145,53 @@ class ERP5ProjectUnitTestDistributor(XMLObject):
if len(test_suite_list_to_add) == 0: if len(test_suite_list_to_add) == 0:
break break
for test_node in test_node_list_to_remove: for test_node in test_node_list_to_remove:
test_node_list.remove(test_node) test_node_list_to_optimize.remove(test_node)
if test_suite_list_to_add and test_node_list: if test_suite_list_to_add and test_node_list_to_optimize:
_optimizeConfiguration(test_suite_list_to_add, level=level+1) _optimizeConfiguration(test_suite_list_to_add, level=level+1,
test_node_list_to_optimize=test_node_list_to_optimize,
test_suite_max=test_suite_max)
test_suite_list_to_add = self._getSortedNodeTestSuiteList() test_suite_score, test_suite_list_to_add = self._getSortedNodeTestSuiteList()
average_quantity = float(len(test_suite_list_to_add)) / (test_node_list_len or 1)
test_suite_list_to_remove = self._checkCurrentConfiguration(test_node_list, test_suite_list_to_remove = self._checkCurrentConfiguration(test_node_list,
test_suite_list_to_add) test_suite_list_to_add)
self._cleanupTestNodeList(test_node_list, test_suite_list_to_remove) self._cleanupTestNodeList(test_node_list, test_suite_list_to_remove)
_optimizeConfiguration(test_suite_list_to_add) _optimizeConfiguration(test_suite_list_to_add)
# once we removed useless test suite and added needed ones,
# we check if we can move some test suites to testnodes that are
# more idle than others. We try to move first test suites using
# more test nodes, this reduce risk of moving a test suite assigned
# on a single test node (to avoid waiting building)
overloaded_test_node_list = []
lazy_test_node_list = []
int_average_quantity = int(average_quantity)
# Find testnode which can accept more work
for test_node in test_node_list:
aggregate_len = len(test_node.getAggregateList())
if aggregate_len <= (average_quantity - 1):
lazy_test_node_list.append(test_node)
# check on most overloaded test nodes first if we can move some work to lazy
# test nodes
for aggregate_quantity in range(TEST_SUITE_MAX, int_average_quantity, -1):
if len(lazy_test_node_list) == 0:
break
overloaded_test_node_list = [x for x in test_node_list if len(x.getAggregateList()) == aggregate_quantity]
for test_node in overloaded_test_node_list:
test_suite_list = test_node.getAggregateList()
test_suite_list.sort(key=lambda x: (-test_suite_score[x][-1],
portal.unrestrictedTraverse(x).getTitle()))
for test_suite in test_suite_list:
test_suite_list_to_move = [test_suite]
_optimizeConfiguration(test_suite_list_to_move,
test_node_list_to_optimize=lazy_test_node_list,
test_suite_max=int_average_quantity)
if len(test_suite_list_to_move) == 0:
# This means we were able to affect the test suite to another test node
test_suite_list.remove(test_suite)
test_node.setAggregateList(test_suite_list)
break
if len(lazy_test_node_list) == 0:
break
def _getSortedNodeTestSuiteList(self): def _getSortedNodeTestSuiteList(self):
""" """
...@@ -157,12 +199,16 @@ class ERP5ProjectUnitTestDistributor(XMLObject): ...@@ -157,12 +199,16 @@ class ERP5ProjectUnitTestDistributor(XMLObject):
can be installed on at most 2 test nodes, it will be twice can be installed on at most 2 test nodes, it will be twice
in the returned list. We give a score for every wished test suites. in the returned list. We give a score for every wished test suites.
The lower score, the better chance it has to be installed. The lower score, the better chance it has to be installed.
A test_suite_score is also returned allowing to quickly identify
which test suite migh be removed on test node with too many test suites
""" """
test_suite_module = self._getTestSuiteModule() test_suite_module = self._getTestSuiteModule()
portal = self.getPortalObject() portal = self.getPortalObject()
test_suite_list = test_suite_module.searchFolder(validation_state="validated", test_suite_list = test_suite_module.searchFolder(validation_state="validated",
specialise_uid=self.getUid()) specialise_uid=self.getUid())
all_test_suite_list = [] all_test_suite_list = []
test_suite_score = {}
for test_suite in test_suite_list: for test_suite in test_suite_list:
test_suite = test_suite.getObject() test_suite = test_suite.getObject()
test_suite_url = test_suite.getRelativeUrl() test_suite_url = test_suite.getRelativeUrl()
...@@ -173,13 +219,17 @@ class ERP5ProjectUnitTestDistributor(XMLObject): ...@@ -173,13 +219,17 @@ class ERP5ProjectUnitTestDistributor(XMLObject):
node_quantity_min = PRIORITY_MAPPING[int_index][0]/3 node_quantity_min = PRIORITY_MAPPING[int_index][0]/3
node_quantity_max = PRIORITY_MAPPING[int_index][1]/3 node_quantity_max = PRIORITY_MAPPING[int_index][1]/3
for x in xrange(0, node_quantity_min): for x in xrange(0, node_quantity_min):
all_test_suite_list.append((x/(x+1),test_suite_url, title)) score = float(x)/(x+1)
all_test_suite_list.append((score, test_suite_url, title))
test_suite_score.setdefault(test_suite_url, []).append(score)
# additional suites, lower score # additional suites, lower score
for x in xrange(0, node_quantity_max - for x in xrange(0, node_quantity_max -
node_quantity_min ): node_quantity_min ):
score = float(1) + x/(x+1)
all_test_suite_list.append((1 + x/(x+1), test_suite_url, title)) all_test_suite_list.append((1 + x/(x+1), test_suite_url, title))
test_suite_score.setdefault(test_suite_url, []).append(score)
all_test_suite_list.sort(key=lambda x: (x[0], x[2])) all_test_suite_list.sort(key=lambda x: (x[0], x[2]))
return [x[1] for x in all_test_suite_list] return test_suite_score, [x[1] for x in all_test_suite_list]
def _getTestNodeModule(self): def _getTestNodeModule(self):
return self.getPortalObject().test_node_module return self.getPortalObject().test_node_module
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment