From 84a277f6b324ba5b3a62f63a9e5a60afe2dd497e Mon Sep 17 00:00:00 2001
From: Julien Muchembled <jm@nexedi.com>
Date: Tue, 2 Aug 2011 20:19:51 +0200
Subject: [PATCH] CMFActivity: review saving of activity result on active
 process

- reduce probability and cost of ConflictError by posting the
  activity result by activity
- do not post None (to force it, it's still possible to forge an
  ActiveResult object containing the None value)
- allow grouped messages to post results (previous implementation was not
  working, and this required to change the API of grouping methods)
---
 product/CMFActivity/ActivityTool.py          | 81 ++++++++---------
 product/CMFActivity/tests/testCMFActivity.py | 91 ++------------------
 product/ERP5Catalog/CatalogTool.py           | 16 ++--
 3 files changed, 60 insertions(+), 128 deletions(-)

diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py
index 179f3e4129..f8f8033a93 100644
--- a/product/CMFActivity/ActivityTool.py
+++ b/product/CMFActivity/ActivityTool.py
@@ -255,19 +255,18 @@ class Message(BaseMessage):
       noSecurityManager()
     return user
 
-  def activateResult(self, activity_tool, result, object):
-    if self.active_process is not None:
-      active_process = activity_tool.unrestrictedTraverse(self.active_process)
-      if isinstance(result, ActiveResult):
-        result.edit(object_path=object)
-        result.edit(method_id=self.method_id)
-        # XXX Allow other method_id in future
-        active_process.activateResult(result)
-      else:
-        active_process.activateResult(
-                    ActiveResult(object_path=object,
-                                 method_id=self.method_id,
-                                 result=result)) # XXX Allow other method_id in future
+  def activateResult(self, active_process, result, object):
+    if not isinstance(result, ActiveResult):
+      result = ActiveResult(result=result)
+    # XXX Allow other method_id in future
+    result.edit(object_path=object, method_id=self.method_id)
+    kw = self.activity_kw
+    kw = dict((k, kw[k]) for k in ('priority', 'tag') if k in kw)
+    # Save result in a separate activity to reduce
+    # probability and cost of conflict error.
+    active_process.activate(activity='SQLQueue',
+      group_method_id=None, # dummy group method
+      **kw).activateResult(result)
 
   def __call__(self, activity_tool):
     try:
@@ -305,7 +304,10 @@ class Message(BaseMessage):
           setSecurityManager(old_security_manager)
 
         if method is not None:
-          self.activateResult(activity_tool, result, obj)
+          if self.active_process and result is not None:
+            self.activateResult(
+              activity_tool.unrestrictedTraverse(self.active_process),
+              result, obj)
           self.setExecutionState(MESSAGE_EXECUTED)
       except:
         self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
@@ -1204,8 +1206,7 @@ class ActivityTool (Folder, UniqueObject):
           'invoking group messages: method_id=%s, paths=%s'
           % (method_id, ['/'.join(m.object_path) for m in message_list]))
       # Invoke a group method.
-      expanded_object_list = []
-      new_message_list = []
+      message_dict = {}
       path_set = set()
       # Filter the list of messages. If an object is not available, mark its
       # message as non-executable. In addition, expand an object if necessary,
@@ -1227,6 +1228,7 @@ class ActivityTool (Folder, UniqueObject):
             subobject_list = m.getObjectList(self)
           else:
             subobject_list = (obj,)
+          message_dict[m] = expanded_object_list = []
           for subobj in subobject_list:
             if merge_duplicate:
               path = subobj.getPath()
@@ -1243,25 +1245,23 @@ class ActivityTool (Folder, UniqueObject):
               active_obj = subobj.activate(activity=activity, **activity_kw)
               getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
             else:
-              expanded_object_list.append((subobj, m.args, m.kw))
-          new_message_list.append((m, obj))
+              expanded_object_list.append([subobj, m.args, m.kw, None])
         except:
           m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
 
+      expanded_object_list = sum(message_dict.itervalues(), [])
       try:
         if len(expanded_object_list) > 0:
-          method = self.getPortalObject().unrestrictedTraverse(method_id)
+          traverse = self.getPortalObject().unrestrictedTraverse
           # FIXME: how to apply security here?
-          # NOTE: expanded_object_list must be set to failed objects by the
-          #       callee. If it fully succeeds, expanded_object_list must be
-          #       empty when returning.
-          result = method(expanded_object_list)
-        else:
-          result = None
+          # NOTE: expanded_object_list[*][3] must be updated by the callee:
+          #       it must be deleted in case of failure, or updated with the
+          #       result to post on the active process otherwise.
+          traverse(method_id)(expanded_object_list)
       except:
         # In this case, the group method completely failed.
         exc_info = sys.exc_info()
-        for m, obj in new_message_list:
+        for m in message_dict:
           m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
         LOG('WARNING ActivityTool', 0,
             'Could not call method %s on objects %s' %
@@ -1270,22 +1270,26 @@ class ActivityTool (Folder, UniqueObject):
         if error_log is not None:
           error_log.raising(exc_info)
       else:
-        # Obtain all indices of failed messages.
-        # Note that this can be a partial failure.
-        failed_message_set = set(id(x[2]) for x in expanded_object_list)
-        # Only for succeeded messages, an activity process is invoked (if any).
-        for m, obj in new_message_list:
-          # We use id of kw dict (persistent object) to know if there is a
-          # failed 3-tuple corresponding to Message m.
-          if id(m.kw) in failed_message_set:
-            m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
+        # Note there can be partial failures.
+        for m, expanded_object_list in message_dict.iteritems():
+          result_list = []
+          for result in expanded_object_list:
+            if len(result) != 4:
+              break # message marked as failed by the group_method_id
+            elif result[3] is not None:
+              result_list.append(result)
           else:
             try:
-              m.activateResult(self, result, obj)
+              if result_list and m.active_process:
+                active_process = traverse(m.active_process)
+                for result in result_list:
+                  m.activateResult(active_process, result[3], result[0])
             except:
-              m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
+              pass
             else:
               m.setExecutionState(MESSAGE_EXECUTED, context=self)
+              continue
+          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
       if self.activity_tracking:
         activity_tracking_logger.info('invoked group messages')
 
@@ -1294,8 +1298,7 @@ class ActivityTool (Folder, UniqueObject):
       def __bobo_traverse__(self, REQUEST, method_id):
         def group_method(message_list):
           for m in message_list:
-            getattr(m[0], method_id)(*m[1], **m[2])
-          del message_list[:]
+            m[3] = getattr(m[0], method_id)(*m[1], **m[2])
         return group_method
     dummyGroupMethod = dummyGroupMethod()
 
diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py
index 0cc5cea42c..c391f416c5 100644
--- a/product/CMFActivity/tests/testCMFActivity.py
+++ b/product/CMFActivity/tests/testCMFActivity.py
@@ -412,38 +412,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
     transaction.commit()
     portal.portal_activities.distribute()
     portal.portal_activities.tic()
-    self.assertEquals(self.title1,organisation.getTitle())
-    result = active_process.getResultList()[0]
-    self.assertEquals(result.method_id , 'getTitle')
-    self.assertEquals(result.result , self.title1)
-    message_list = portal.portal_activities.getMessageList()
-    self.assertEquals(len(message_list),0)
-
-  def TryActiveProcessInsideActivity(self, activity):
-    """
-    Try two levels with active_process, we create one first
-    activity with an active process, then this new activity
-    uses another active process
-    """
-    portal = self.getPortal()
-    organisation =  portal.organisation._getOb(self.company_id)
-    organisation._setTitle(self.title1)
-    def Organisation_test(self):
-      active_process = self.portal_activities.newActiveProcess()
-      self.activate(active_process=active_process).getTitle()
-      return active_process
-    Organisation.Organisation_test = Organisation_test
-    active_process = portal.portal_activities.newActiveProcess()
-    organisation.activate(activity=activity,active_process=active_process).Organisation_test()
-    # Needed so that the message are commited into the queue
-    transaction.commit()
-    portal.portal_activities.distribute()
-    portal.portal_activities.tic()
     portal.portal_activities.distribute()
     portal.portal_activities.tic()
-    sub_active_process = active_process.getResultList()[0].result
-    LOG('TryActiveProcessInsideActivity, sub_active_process',0,sub_active_process)
-    result = sub_active_process.getResultList()[0]
+    self.assertEquals(self.title1,organisation.getTitle())
+    result = active_process.getResultList()[0]
     self.assertEquals(result.method_id , 'getTitle')
     self.assertEquals(result.result , self.title1)
     message_list = portal.portal_activities.getMessageList()
@@ -1287,42 +1259,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
       LOG('Testing... ',0,message)
     self.TryActiveProcess('RAMQueue')
 
-  def test_50_TryActiveProcessInsideActivityWithSQLDict(self, quiet=0, run=run_all_test):
-    # Test if we call methods only once
-    if not run: return
-    if not quiet:
-      message = '\nTry Active Process Inside Activity With SQL Dict '
-      ZopeTestCase._print(message)
-      LOG('Testing... ',0,message)
-    self.TryActiveProcessInsideActivity('SQLDict')
-
-  def test_51_TryActiveProcessInsideActivityWithSQLQueue(self, quiet=0, run=run_all_test):
-    # Test if we call methods only once
-    if not run: return
-    if not quiet:
-      message = '\nTry Active Process Inside Activity With SQL Queue '
-      ZopeTestCase._print(message)
-      LOG('Testing... ',0,message)
-    self.TryActiveProcessInsideActivity('SQLQueue')
-
-  def test_52_TryActiveProcessInsideActivityWithRAMDict(self, quiet=0, run=run_all_test):
-    # Test if we call methods only once
-    if not run: return
-    if not quiet:
-      message = '\nTry Active Process Inside Activity With RAM Dict '
-      ZopeTestCase._print(message)
-      LOG('Testing... ',0,message)
-    self.TryActiveProcessInsideActivity('RAMDict')
-
-  def test_53_TryActiveProcessInsideActivityWithRAMQueue(self, quiet=0, run=run_all_test):
-    # Test if we call methods only once
-    if not run: return
-    if not quiet:
-      message = '\nTry Active Process Inside Activity With RAM Queue '
-      ZopeTestCase._print(message)
-      LOG('Testing... ',0,message)
-    self.TryActiveProcessInsideActivity('RAMQueue')
-
   def test_54_TryAfterMethodIdWithSQLDict(self, quiet=0, run=run_all_test):
     # Test if after_method_id can be used
     if not run: return
@@ -1876,13 +1812,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
     foobar_list = []
     def setFoobar(self, object_list):
       foobar_list.append(len(object_list))
-      for obj, args, kw in object_list:
-        number = kw.get('number', 1)
-        if getattr(obj,'foobar', None) is not None:
-          obj.foobar = obj.foobar + number
-        else:
-          obj.foobar = number
-      del object_list[:]
+      for obj, args, kw, _ in object_list:
+        obj.foobar = getattr(obj.aq_base, 'foobar', 0) + kw.get('number', 1)
     from Products.ERP5Type.Core.Folder import Folder
     Folder.setFoobar = setFoobar    
 
@@ -2043,10 +1974,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
           order_validation_text_list=[''],
           serialization_tag_list=[''],
           )
-      if len(object_list) == 2:
-        # Remove one entry from object list: this is understood by caller as a
-        # success for this entry.
-        object_list.pop()
+      # Mark first entry as failed
+      del object_list[0][3]
     def dummy(self):
       pass
     try:
@@ -2470,7 +2399,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
           order_validation_text_list=[''],
           )
       transaction.get().__class__.commit = fake_commit
-      object_list[:] = []
     commit = transaction.get().__class__.commit
     def fake_commit(*args, **kw):
       transaction.get().__class__.commit = commit
@@ -2523,7 +2451,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
           message_list=[pickled_message],
           priority_list=[1],
           processing_node_list=[-2],
-          group_method_id_list=[''],
+          group_method_id_list=['\0'],
           tag_list=[''],
           order_validation_text_list=[''],
          )
@@ -3798,9 +3726,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
     self.tic()
     group_method_call_list = []
     def doSomething(self, message_list):
-      group_method_call_list.append(sorted((obj.getPath(), args, kw)
-                                           for obj, args, kw in message_list))
-      del message_list[:]
+      group_method_call_list.append(sorted((ob.getPath(), args, kw)
+                                           for ob, args, kw, _ in message_list))
     activity_tool.__class__.doSomething = doSomething
     try:
       for activity in 'SQLDict', 'SQLQueue':
diff --git a/product/ERP5Catalog/CatalogTool.py b/product/ERP5Catalog/CatalogTool.py
index 89cfc705ba..762a4e91dd 100644
--- a/product/ERP5Catalog/CatalogTool.py
+++ b/product/ERP5Catalog/CatalogTool.py
@@ -799,11 +799,14 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
 
     def catalogObjectList(self, object_list, *args, **kw):
         """Catalog a list of objects"""
-        if type(object_list[0]) is tuple:
+        m = object_list[0]
+        if type(m) is list:
           tmp_object_list = [x[0] for x in object_list]
-          super(CatalogTool, self).catalogObjectList(tmp_object_list, **x[2])
-          # keep failed objects in 'object_list'
-          object_list[:] = [x for x in object_list if x[0] in tmp_object_list]
+          super(CatalogTool, self).catalogObjectList(tmp_object_list, **m[2])
+          if tmp_object_list:
+            for x in object_list:
+              if x[0] in tmp_object_list:
+                del object_list[3] # no result means failed
         else:
           super(CatalogTool, self).catalogObjectList(object_list, *args, **kw)
 
@@ -812,9 +815,8 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
       """Uncatalog a list of objects"""
       # XXX: this is currently only a placeholder for further optimization
       #      (for the moment, it's not faster than the dummy group method)
-      for obj, args, kw in message_list:
-        self.unindexObject(*args, **kw)
-      del message_list[:]
+      for m in message_list:
+        self.unindexObject(*m[1], **m[2])
 
     security.declarePrivate('unindexObject')
     def unindexObject(self, object=None, path=None, uid=None,sql_catalog_id=None):
-- 
2.30.9