Commit 84a277f6 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: review saving of activity result on active process

- reduce probability and cost of ConflictError by posting the
  activity result by activity
- do not post None (to force it, it's still possible to forge an
  ActiveResult object containing the None value)
- allow grouped messages to post results (previous implementation was not
  working, and this required to change the API of grouping methods)
parent 3b17be37
...@@ -255,19 +255,18 @@ class Message(BaseMessage): ...@@ -255,19 +255,18 @@ class Message(BaseMessage):
noSecurityManager() noSecurityManager()
return user return user
def activateResult(self, activity_tool, result, object): def activateResult(self, active_process, result, object):
if self.active_process is not None: if not isinstance(result, ActiveResult):
active_process = activity_tool.unrestrictedTraverse(self.active_process) result = ActiveResult(result=result)
if isinstance(result, ActiveResult): # XXX Allow other method_id in future
result.edit(object_path=object) result.edit(object_path=object, method_id=self.method_id)
result.edit(method_id=self.method_id) kw = self.activity_kw
# XXX Allow other method_id in future kw = dict((k, kw[k]) for k in ('priority', 'tag') if k in kw)
active_process.activateResult(result) # Save result in a separate activity to reduce
else: # probability and cost of conflict error.
active_process.activateResult( active_process.activate(activity='SQLQueue',
ActiveResult(object_path=object, group_method_id=None, # dummy group method
method_id=self.method_id, **kw).activateResult(result)
result=result)) # XXX Allow other method_id in future
def __call__(self, activity_tool): def __call__(self, activity_tool):
try: try:
...@@ -305,7 +304,10 @@ class Message(BaseMessage): ...@@ -305,7 +304,10 @@ class Message(BaseMessage):
setSecurityManager(old_security_manager) setSecurityManager(old_security_manager)
if method is not None: if method is not None:
self.activateResult(activity_tool, result, obj) if self.active_process and result is not None:
self.activateResult(
activity_tool.unrestrictedTraverse(self.active_process),
result, obj)
self.setExecutionState(MESSAGE_EXECUTED) self.setExecutionState(MESSAGE_EXECUTED)
except: except:
self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool) self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
...@@ -1204,8 +1206,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1204,8 +1206,7 @@ class ActivityTool (Folder, UniqueObject):
'invoking group messages: method_id=%s, paths=%s' 'invoking group messages: method_id=%s, paths=%s'
% (method_id, ['/'.join(m.object_path) for m in message_list])) % (method_id, ['/'.join(m.object_path) for m in message_list]))
# Invoke a group method. # Invoke a group method.
expanded_object_list = [] message_dict = {}
new_message_list = []
path_set = set() path_set = set()
# Filter the list of messages. If an object is not available, mark its # Filter the list of messages. If an object is not available, mark its
# message as non-executable. In addition, expand an object if necessary, # message as non-executable. In addition, expand an object if necessary,
...@@ -1227,6 +1228,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1227,6 +1228,7 @@ class ActivityTool (Folder, UniqueObject):
subobject_list = m.getObjectList(self) subobject_list = m.getObjectList(self)
else: else:
subobject_list = (obj,) subobject_list = (obj,)
message_dict[m] = expanded_object_list = []
for subobj in subobject_list: for subobj in subobject_list:
if merge_duplicate: if merge_duplicate:
path = subobj.getPath() path = subobj.getPath()
...@@ -1243,25 +1245,23 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1243,25 +1245,23 @@ class ActivityTool (Folder, UniqueObject):
active_obj = subobj.activate(activity=activity, **activity_kw) active_obj = subobj.activate(activity=activity, **activity_kw)
getattr(active_obj, alternate_method_id)(*m.args, **m.kw) getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
else: else:
expanded_object_list.append((subobj, m.args, m.kw)) expanded_object_list.append([subobj, m.args, m.kw, None])
new_message_list.append((m, obj))
except: except:
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self) m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
expanded_object_list = sum(message_dict.itervalues(), [])
try: try:
if len(expanded_object_list) > 0: if len(expanded_object_list) > 0:
method = self.getPortalObject().unrestrictedTraverse(method_id) traverse = self.getPortalObject().unrestrictedTraverse
# FIXME: how to apply security here? # FIXME: how to apply security here?
# NOTE: expanded_object_list must be set to failed objects by the # NOTE: expanded_object_list[*][3] must be updated by the callee:
# callee. If it fully succeeds, expanded_object_list must be # it must be deleted in case of failure, or updated with the
# empty when returning. # result to post on the active process otherwise.
result = method(expanded_object_list) traverse(method_id)(expanded_object_list)
else:
result = None
except: except:
# In this case, the group method completely failed. # In this case, the group method completely failed.
exc_info = sys.exc_info() exc_info = sys.exc_info()
for m, obj in new_message_list: for m in message_dict:
m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False) m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
LOG('WARNING ActivityTool', 0, LOG('WARNING ActivityTool', 0,
'Could not call method %s on objects %s' % 'Could not call method %s on objects %s' %
...@@ -1270,22 +1270,26 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1270,22 +1270,26 @@ class ActivityTool (Folder, UniqueObject):
if error_log is not None: if error_log is not None:
error_log.raising(exc_info) error_log.raising(exc_info)
else: else:
# Obtain all indices of failed messages. # Note there can be partial failures.
# Note that this can be a partial failure. for m, expanded_object_list in message_dict.iteritems():
failed_message_set = set(id(x[2]) for x in expanded_object_list) result_list = []
# Only for succeeded messages, an activity process is invoked (if any). for result in expanded_object_list:
for m, obj in new_message_list: if len(result) != 4:
# We use id of kw dict (persistent object) to know if there is a break # message marked as failed by the group_method_id
# failed 3-tuple corresponding to Message m. elif result[3] is not None:
if id(m.kw) in failed_message_set: result_list.append(result)
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
else: else:
try: try:
m.activateResult(self, result, obj) if result_list and m.active_process:
active_process = traverse(m.active_process)
for result in result_list:
m.activateResult(active_process, result[3], result[0])
except: except:
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self) pass
else: else:
m.setExecutionState(MESSAGE_EXECUTED, context=self) m.setExecutionState(MESSAGE_EXECUTED, context=self)
continue
m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
if self.activity_tracking: if self.activity_tracking:
activity_tracking_logger.info('invoked group messages') activity_tracking_logger.info('invoked group messages')
...@@ -1294,8 +1298,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -1294,8 +1298,7 @@ class ActivityTool (Folder, UniqueObject):
def __bobo_traverse__(self, REQUEST, method_id): def __bobo_traverse__(self, REQUEST, method_id):
def group_method(message_list): def group_method(message_list):
for m in message_list: for m in message_list:
getattr(m[0], method_id)(*m[1], **m[2]) m[3] = getattr(m[0], method_id)(*m[1], **m[2])
del message_list[:]
return group_method return group_method
dummyGroupMethod = dummyGroupMethod() dummyGroupMethod = dummyGroupMethod()
......
...@@ -412,38 +412,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -412,38 +412,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
transaction.commit() transaction.commit()
portal.portal_activities.distribute() portal.portal_activities.distribute()
portal.portal_activities.tic() portal.portal_activities.tic()
self.assertEquals(self.title1,organisation.getTitle())
result = active_process.getResultList()[0]
self.assertEquals(result.method_id , 'getTitle')
self.assertEquals(result.result , self.title1)
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
def TryActiveProcessInsideActivity(self, activity):
"""
Try two levels with active_process, we create one first
activity with an active process, then this new activity
uses another active process
"""
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation._setTitle(self.title1)
def Organisation_test(self):
active_process = self.portal_activities.newActiveProcess()
self.activate(active_process=active_process).getTitle()
return active_process
Organisation.Organisation_test = Organisation_test
active_process = portal.portal_activities.newActiveProcess()
organisation.activate(activity=activity,active_process=active_process).Organisation_test()
# Needed so that the message are commited into the queue
transaction.commit()
portal.portal_activities.distribute()
portal.portal_activities.tic()
portal.portal_activities.distribute() portal.portal_activities.distribute()
portal.portal_activities.tic() portal.portal_activities.tic()
sub_active_process = active_process.getResultList()[0].result self.assertEquals(self.title1,organisation.getTitle())
LOG('TryActiveProcessInsideActivity, sub_active_process',0,sub_active_process) result = active_process.getResultList()[0]
result = sub_active_process.getResultList()[0]
self.assertEquals(result.method_id , 'getTitle') self.assertEquals(result.method_id , 'getTitle')
self.assertEquals(result.result , self.title1) self.assertEquals(result.result , self.title1)
message_list = portal.portal_activities.getMessageList() message_list = portal.portal_activities.getMessageList()
...@@ -1287,42 +1259,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1287,42 +1259,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
LOG('Testing... ',0,message) LOG('Testing... ',0,message)
self.TryActiveProcess('RAMQueue') self.TryActiveProcess('RAMQueue')
def test_50_TryActiveProcessInsideActivityWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Active Process Inside Activity With SQL Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActiveProcessInsideActivity('SQLDict')
def test_51_TryActiveProcessInsideActivityWithSQLQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Active Process Inside Activity With SQL Queue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActiveProcessInsideActivity('SQLQueue')
def test_52_TryActiveProcessInsideActivityWithRAMDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Active Process Inside Activity With RAM Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActiveProcessInsideActivity('RAMDict')
def test_53_TryActiveProcessInsideActivityWithRAMQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Active Process Inside Activity With RAM Queue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActiveProcessInsideActivity('RAMQueue')
def test_54_TryAfterMethodIdWithSQLDict(self, quiet=0, run=run_all_test): def test_54_TryAfterMethodIdWithSQLDict(self, quiet=0, run=run_all_test):
# Test if after_method_id can be used # Test if after_method_id can be used
if not run: return if not run: return
...@@ -1876,13 +1812,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1876,13 +1812,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
foobar_list = [] foobar_list = []
def setFoobar(self, object_list): def setFoobar(self, object_list):
foobar_list.append(len(object_list)) foobar_list.append(len(object_list))
for obj, args, kw in object_list: for obj, args, kw, _ in object_list:
number = kw.get('number', 1) obj.foobar = getattr(obj.aq_base, 'foobar', 0) + kw.get('number', 1)
if getattr(obj,'foobar', None) is not None:
obj.foobar = obj.foobar + number
else:
obj.foobar = number
del object_list[:]
from Products.ERP5Type.Core.Folder import Folder from Products.ERP5Type.Core.Folder import Folder
Folder.setFoobar = setFoobar Folder.setFoobar = setFoobar
...@@ -2043,10 +1974,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2043,10 +1974,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
order_validation_text_list=[''], order_validation_text_list=[''],
serialization_tag_list=[''], serialization_tag_list=[''],
) )
if len(object_list) == 2: # Mark first entry as failed
# Remove one entry from object list: this is understood by caller as a del object_list[0][3]
# success for this entry.
object_list.pop()
def dummy(self): def dummy(self):
pass pass
try: try:
...@@ -2470,7 +2399,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2470,7 +2399,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
order_validation_text_list=[''], order_validation_text_list=[''],
) )
transaction.get().__class__.commit = fake_commit transaction.get().__class__.commit = fake_commit
object_list[:] = []
commit = transaction.get().__class__.commit commit = transaction.get().__class__.commit
def fake_commit(*args, **kw): def fake_commit(*args, **kw):
transaction.get().__class__.commit = commit transaction.get().__class__.commit = commit
...@@ -2523,7 +2451,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2523,7 +2451,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message_list=[pickled_message], message_list=[pickled_message],
priority_list=[1], priority_list=[1],
processing_node_list=[-2], processing_node_list=[-2],
group_method_id_list=[''], group_method_id_list=['\0'],
tag_list=[''], tag_list=[''],
order_validation_text_list=[''], order_validation_text_list=[''],
) )
...@@ -3798,9 +3726,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3798,9 +3726,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.tic() self.tic()
group_method_call_list = [] group_method_call_list = []
def doSomething(self, message_list): def doSomething(self, message_list):
group_method_call_list.append(sorted((obj.getPath(), args, kw) group_method_call_list.append(sorted((ob.getPath(), args, kw)
for obj, args, kw in message_list)) for ob, args, kw, _ in message_list))
del message_list[:]
activity_tool.__class__.doSomething = doSomething activity_tool.__class__.doSomething = doSomething
try: try:
for activity in 'SQLDict', 'SQLQueue': for activity in 'SQLDict', 'SQLQueue':
......
...@@ -799,11 +799,14 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): ...@@ -799,11 +799,14 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
def catalogObjectList(self, object_list, *args, **kw): def catalogObjectList(self, object_list, *args, **kw):
"""Catalog a list of objects""" """Catalog a list of objects"""
if type(object_list[0]) is tuple: m = object_list[0]
if type(m) is list:
tmp_object_list = [x[0] for x in object_list] tmp_object_list = [x[0] for x in object_list]
super(CatalogTool, self).catalogObjectList(tmp_object_list, **x[2]) super(CatalogTool, self).catalogObjectList(tmp_object_list, **m[2])
# keep failed objects in 'object_list' if tmp_object_list:
object_list[:] = [x for x in object_list if x[0] in tmp_object_list] for x in object_list:
if x[0] in tmp_object_list:
del object_list[3] # no result means failed
else: else:
super(CatalogTool, self).catalogObjectList(object_list, *args, **kw) super(CatalogTool, self).catalogObjectList(object_list, *args, **kw)
...@@ -812,9 +815,8 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): ...@@ -812,9 +815,8 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
"""Uncatalog a list of objects""" """Uncatalog a list of objects"""
# XXX: this is currently only a placeholder for further optimization # XXX: this is currently only a placeholder for further optimization
# (for the moment, it's not faster than the dummy group method) # (for the moment, it's not faster than the dummy group method)
for obj, args, kw in message_list: for m in message_list:
self.unindexObject(*args, **kw) self.unindexObject(*m[1], **m[2])
del message_list[:]
security.declarePrivate('unindexObject') security.declarePrivate('unindexObject')
def unindexObject(self, object=None, path=None, uid=None,sql_catalog_id=None): def unindexObject(self, object=None, path=None, uid=None,sql_catalog_id=None):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment