Commit 340acac6 authored by Vincent Pelletier's avatar Vincent Pelletier

Revert "Products.CMFActivity.ActivityTool: Improve behaviour on single-node instances."

Also revert related fixup commits:
  "CMFActivity: fixup do not loop on tic if the node is the distribution node"
  "Products.ERP5Type.tests: Follow-up on ActivityTool.tic signature change."

While the original commit did improve the specific workload it was
designed to improve, it turned out to degrade too much intensive activity
workloads, like initial ERP5 site creation and tests (which, for the
purposes of this change, are the same as a single-zope instance).
Given how easy it is to get a multi-Zope instance, which would solve the
original issue and also provide the better performance necessary anyway
for an instance managing a non-trivial amount of documents, I choose to
revert this change.

I am not reverting several loosely-related changes I applied, which
rather fix real bugs uncovered by the different activity execution
scheme this change provided, especially by letting tests'
"stop_condition" callback being executed a lot more often between
activities, uncovering missing dependencies and unrealistic test
expectations, whose fixes should be beneficial independently from the
reverted code.

This reverts commit 4dfafbc9.
This reverts commit 4eb26017.
This reverts commit 041642d0.
parent 1fdf2fbc
......@@ -1293,9 +1293,8 @@ class ActivityTool (BaseTool):
self.registerNode(currentNode)
processing_node_list = self.getNodeList(role=ROLE_PROCESSING)
is_distributing_node = self.getDistributingNode() == currentNode
# only distribute when we are the distributingNode
if is_distributing_node:
if self.getDistributingNode() == currentNode:
self.distribute(len(processing_node_list))
# SkinsTool uses a REQUEST cache to store skin objects, as
......@@ -1311,20 +1310,7 @@ class ActivityTool (BaseTool):
# the processing_node numbers are the indices of the elements
# in the node tuple +1 because processing_node starts form 1
if currentNode in processing_node_list:
self.tic(
processing_node=processing_node_list.index(currentNode) + 1,
# Tell tic it cannot keep processing activities forever when
# current node is also the distribution node: it must return
# in order for validation to happen. Otherwise, there can be
# an accumulation of activities in processing_node=-1 which
# would otherwise be possible to execute.
# To minimize the impact of this control, this is computed
# here, so if the current node is given the distribution node
# role while already in the "tic" method, it will keep looping.
# Node role changes are rare and several are dangerous on a
# busy cluster, so it should not be a big drawback.
can_loop=not is_distributing_node,
)
self.tic(processing_node_list.index(currentNode) + 1)
except:
# Catch ALL exception to avoid killing timerserver.
LOG('ActivityTool', ERROR, 'process_timer received an exception',
......@@ -1344,7 +1330,7 @@ class ActivityTool (BaseTool):
activity.distribute(aq_inner(self), node_count)
security.declarePublic('tic')
def tic(self, processing_node=1, force=0, can_loop=True):
def tic(self, processing_node=1, force=0):
"""
Starts again an activity
processing_node starts from 1 (there is not node 0)
......@@ -1385,8 +1371,6 @@ class ActivityTool (BaseTool):
break
else:
break
if not can_loop:
break
finally:
is_running_lock.release()
finally:
......@@ -1478,13 +1462,9 @@ class ActivityTool (BaseTool):
elif node != 'same':
kw['node'] = self.getFamilyId(node)
break
processing_node_list = self.getNodeList(role=ROLE_PROCESSING)
if len(processing_node_list) < 2:
# If there is less than 2 processing nodes at the time this activity
# is spawned, then ignore "same" (including implicit "same").
break
try:
kw['node'] = 1 + processing_node_list.index(getCurrentNode())
kw['node'] = 1 + self.getNodeList(
role=ROLE_PROCESSING).index(getCurrentNode())
except ValueError:
pass
break
......
......@@ -85,7 +85,7 @@ def patchActivityTool():
# When there is more than 1 node, prevent the distributing node from
# processing activities.
@patch
def tic(self, processing_node=1, force=0, can_loop=True):
def tic(self, processing_node=1, force=0):
processing_node_list = self.getProcessingNodeList()
if len(processing_node_list) > 1 and \
getCurrentNode() == self.getDistributingNode():
......@@ -94,7 +94,7 @@ def patchActivityTool():
transaction.commit()
transaction.begin()
else:
self._orig_tic(processing_node, force, can_loop=can_loop)
self._orig_tic(processing_node, force)
def Application_resolveConflict(self, old_state, saved_state, new_state):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment