From 3759047de62b8fb0da1cc2260e6c82ef283792a1 Mon Sep 17 00:00:00 2001
From: Julien Muchembled <jm@nexedi.com>
Date: Thu, 7 May 2015 23:21:06 +0200
Subject: [PATCH] searchAndActivate: use a grouping method instead of
 callMethodOnObjectList

The problem with callMethodOnObjectList is that when an object can't be
processed, all other objects of the same group fail without any chance to
be retried separately.

Grouping is configurable with the usual CMFActivity parameters, passed via the
new 'group_kw' parameter to avoid any conflict with catalog parameters (**kw).
'packet_size' and 'activity_count' are still accepted for backward
compatibility.
---
 .../erp5_upgrader/Alarm_runFullUpgrader.xml   |  2 +-
 .../Alarm_senseFullUpgradeNeed.xml            |  2 +-
 .../test.erp5.testUpgrader.py                 | 33 +++------
 product/ERP5Catalog/CatalogTool.py            | 71 ++++++++++++++-----
 4 files changed, 65 insertions(+), 43 deletions(-)

diff --git a/bt5/erp5_upgrader/SkinTemplateItem/portal_skins/erp5_upgrader/Alarm_runFullUpgrader.xml b/bt5/erp5_upgrader/SkinTemplateItem/portal_skins/erp5_upgrader/Alarm_runFullUpgrader.xml
index 4fef0ff0fb..95badfa06e 100644
--- a/bt5/erp5_upgrader/SkinTemplateItem/portal_skins/erp5_upgrader/Alarm_runFullUpgrader.xml
+++ b/bt5/erp5_upgrader/SkinTemplateItem/portal_skins/erp5_upgrader/Alarm_runFullUpgrader.xml
@@ -77,7 +77,7 @@ previous_method_id = launchUpgraderAlarm(\'upgrader_check_pre_upgrade\')\n
 \n
 previous_method_id.extend([\'recursiveImmediateReindexObject\',\n
                            \'immediateReindexObject\',\n
-                           \'callMethodOnObjectList\'])\n
+                           \'Base_postCheckConsistencyResult\'])\n
 \n
 previous_method_id = launchUpgraderAlarm(\'upgrader_check_upgrader\',\n
                                    after_method_id=previous_method_id)\n
diff --git a/bt5/erp5_upgrader/SkinTemplateItem/portal_skins/erp5_upgrader/Alarm_senseFullUpgradeNeed.xml b/bt5/erp5_upgrader/SkinTemplateItem/portal_skins/erp5_upgrader/Alarm_senseFullUpgradeNeed.xml
index 2b4c6d1927..0e41d3c27f 100644
--- a/bt5/erp5_upgrader/SkinTemplateItem/portal_skins/erp5_upgrader/Alarm_senseFullUpgradeNeed.xml
+++ b/bt5/erp5_upgrader/SkinTemplateItem/portal_skins/erp5_upgrader/Alarm_senseFullUpgradeNeed.xml
@@ -58,7 +58,7 @@
 portal = context.getPortalObject()\n
 portal_alarms = portal.portal_alarms\n
 \n
-after_method_id = \'callMethodOnObjectList\'\n
+after_method_id = \'Base_postCheckConsistencyResult\'\n
 def launchSenseAlarm(alarm_id, after_tag=[]):\n
   """ Get the alarm and use sense"""\n
   upgrader_alarm = getattr(portal_alarms, alarm_id, None)\n
diff --git a/bt5/erp5_upgrader/TestTemplateItem/portal_components/test.erp5.testUpgrader.py b/bt5/erp5_upgrader/TestTemplateItem/portal_components/test.erp5.testUpgrader.py
index 1c81d4d3ee..7b8ab3362a 100644
--- a/bt5/erp5_upgrader/TestTemplateItem/portal_components/test.erp5.testUpgrader.py
+++ b/bt5/erp5_upgrader/TestTemplateItem/portal_components/test.erp5.testUpgrader.py
@@ -27,7 +27,6 @@
 ##############################################################################
 
 import re
-import transaction
 from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
 from Products.ERP5.Tool.TemplateTool import BusinessTemplateUnknownError
 from Products.ERP5Type.tests.Sequence import SequenceList
@@ -312,21 +311,16 @@ class TestUpgrader(ERP5TypeTestCase):
       self.portal.portal_templates.getInstalledBusinessTemplateTitleList())
 
   def stepCheckNoActivitiesCreated(self, sequence=None):
-    transaction.commit()
     portal_activities = self.getActivityTool()
-    message = portal_activities.getMessageList()[0]
+    message, = portal_activities.getMessageList()
     self.assertEqual(message.method_id, "Alarm_runUpgrader")
-    portal_templates = self.getTemplateTool()
-    title_list = portal_templates.getInstalledBusinessTemplateTitleList()
-    self.assertTrue('erp5_web' not in title_list,
-      "%s found in %s" % ('erp5_web', title_list))
+    getTitleList = self.getTemplateTool().getInstalledBusinessTemplateTitleList
+    self.assertNotIn('erp5_web', getTitleList())
     portal_activities.manageInvoke(message.object_path, message.method_id)
-    title_list = portal_templates.getInstalledBusinessTemplateTitleList()
-    self.assertTrue('erp5_web' in title_list,
-      "%s not found in %s" % ('erp5_web', title_list))
-    transaction.commit()
-    message_list = set([i.method_id for i in portal_activities.getMessageList()])
-    self.assertTrue('callMethodOnObjectList' not in message_list)
+    self.assertIn('erp5_web', getTitleList())
+    self.commit()
+    self.assertEqual({'immediateReindexObject', 'unindexObject'},
+      {x.method_id for x in portal_activities.getMessageList()})
 
   def stepCreateBigIncosistentData(self, sequence=None):
     for _ in range(101):
@@ -341,18 +335,13 @@ class TestUpgrader(ERP5TypeTestCase):
         title="org_%s" % self.portal.organisation_module.getLastId())
 
   def stepCheckActivitiesCreated(self, sequence=None):
-    transaction.commit()
     portal_activities = self.getActivityTool()
-    message = portal_activities.getMessageList()[0]
+    message, = portal_activities.getMessageList()
     self.assertEqual(message.method_id, "Alarm_runUpgrader")
     portal_activities.manageInvoke(message.object_path, message.method_id)
-    transaction.commit()
-    message_list = portal_activities.getMessageList()
-    method_id_list = set([i.method_id for i in message_list])
-    self.assertTrue('callMethodOnObjectList' in method_id_list)
-    for message in message_list:
-      if message.method_id == 'callMethodOnObjectList':
-        self.assertEqual(message.args[-1], 'Base_postCheckConsistencyResult')
+    self.commit()
+    self.assertIn('Base_postCheckConsistencyResult',
+      {x.method_id for x in portal_activities.getMessageList()})
 
   def stepUninstallERP5UpgraderTestBT(self, sequence=None):
     bt5 = self.portal.portal_templates.getInstalledBusinessTemplate('erp5_web')
diff --git a/product/ERP5Catalog/CatalogTool.py b/product/ERP5Catalog/CatalogTool.py
index cd097a888e..db516f3bc8 100644
--- a/product/ERP5Catalog/CatalogTool.py
+++ b/product/ERP5Catalog/CatalogTool.py
@@ -29,6 +29,7 @@
 import sys
 from copy import deepcopy
 from collections import defaultdict
+from math import ceil
 from Products.CMFCore.CatalogTool import CatalogTool as CMFCoreCatalogTool
 from Products.ZSQLCatalog.ZSQLCatalog import ZCatalog
 from Products.ZSQLCatalog.SQLCatalog import Query, ComplexQuery, SimpleQuery
@@ -972,22 +973,49 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
       return result
 
     def _searchAndActivate(self, method_id, method_args=(), method_kw={},
-                           activate_kw={}, min_uid=None, **kw):
+                           activate_kw={}, min_uid=None, group_kw={}, **kw):
       """Search the catalog and run a script by activity on all found objects
 
-      This method is configurable (via 'packet_size' & 'activity_count'
-      parameters) so that it can work efficiently with databases of any size.
+      In order to not generate too many activities, this method limits the
+      number of rows to fetch from the catalog, and if the catalog would return
+      more results, it resumes by calling itself by activity.
+
+      'activate_kw' is for common activate parameters between all generated
+      activities and is usually used for priority and dependencies.
+
+      Common usage is to call this method without 'select_method_id'.
+      In this case, found objects are processed via a CMFActivity grouping,
+      and this can be configured via 'group_kw', for additional parameters to
+      pass to CMFActivity (in particular: 'activity' and 'group_method_*').
+      A generic grouping method is used if none is given.
+      group_method_cost defaults to 30 objects per packet.
+
       'select_method_id', if provided, will be called with partial catalog
       results and returned value will be provided to the callable identified by
       'method_id' (which will no longer be invoked in the context of a given
       document returned by catalog) as first positional argument.
+      Use 'packet_size' parameter to limit the size of each group (default: 30).
 
-      'activate_kw' may specify an active process to collect results.
+      'activity_count' parameter is deprecated.
+      Its value should be hardcoded because CMFActivity can now handle many
+      activities efficiently and any tweak should benefit everyone.
+      However, there are still rare cases where one wants to limit the number
+      of processing nodes, to minimize latency of high-priority activities.
       """
-      catalog_kw = dict(kw)
-      packet_size = catalog_kw.pop('packet_size', 30)
-      limit = packet_size * catalog_kw.pop('activity_count', 100)
+      catalog_kw = kw.copy()
       select_method_id = catalog_kw.pop('select_method_id', None)
+      if select_method_id:
+        packet_size = catalog_kw.pop('packet_size', 30)
+        limit = packet_size * catalog_kw.pop('activity_count', 100)
+      elif 'packet_size' in catalog_kw: # BBB
+        assert not group_kw, (kw, group_kw)
+        packet_size = catalog_kw.pop('packet_size')
+        group_method_cost = 1. / packet_size
+        limit = packet_size * catalog_kw.pop('activity_count', 100)
+      else:
+        group_method_cost = group_kw.get('group_method_cost', .034) # 30 objects
+        limit = catalog_kw.pop('activity_count', None) or \
+          100 * int(ceil(1 / group_method_cost))
       if min_uid:
         catalog_kw['min_uid'] = SimpleQuery(uid=min_uid,
                                             comparison_operator='>')
@@ -999,23 +1027,28 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
       result_count = len(r)
       if result_count:
         if result_count == limit:
-          next_kw = dict(activate_kw, priority=1+activate_kw.get('priority', 1))
+          next_kw = activate_kw.copy()
+          next_kw['priority'] = 1 + next_kw.get('priority', 1)
           self.activate(activity='SQLQueue', **next_kw) \
               ._searchAndActivate(method_id,method_args, method_kw,
                                   activate_kw, r[-1].getUid(), **kw)
-        portal_activities = self.getPortalObject().portal_activities
-        active_portal_activities = portal_activities.activate(
-          activity='SQLQueue', **activate_kw)
-        if select_method_id is None:
-          r = [x.getPath() for x in r]
-          r.sort()
-          activate = active_portal_activities.callMethodOnObjectList
-          method_args = (method_id, ) + method_args
-        else:
+        if select_method_id:
+          portal_activities = self.getPortalObject().portal_activities
+          active_portal_activities = portal_activities.activate(
+            activity='SQLQueue', **activate_kw)
           r = getattr(portal_activities, select_method_id)(r)
           activate = getattr(active_portal_activities, method_id)
-        for i in xrange(0, result_count, packet_size):
-          activate(r[i:i+packet_size], *method_args, **method_kw)
+          for i in xrange(0, result_count, packet_size):
+            activate(r[i:i+packet_size], *method_args, **method_kw)
+        else:
+          kw = activate_kw.copy()
+          kw['activity'] = 'SQLQueue'
+          if group_method_cost < 1:
+            kw['group_method_cost'] = group_method_cost
+            kw['group_method_id'] = None
+            kw.update(group_kw)
+          for r in r:
+            getattr(r.activate(**kw), method_id)(*method_args, **method_kw)
 
     security.declarePublic('searchAndActivate')
     def searchAndActivate(self, *args, **kw):
-- 
2.30.9