From d0472bc2ea9f6ebc2b31cf7ca27ee8a5e6dd1480 Mon Sep 17 00:00:00 2001
From: Vincent Pelletier <vincent@nexedi.com>
Date: Fri, 19 Jan 2018 18:15:09 +0900
Subject: [PATCH] CMFActivity: Stop deleting duplicates during
 SQLDict.distribute

Duplicate message detection is not good enough: different messages with
the same unicity value may bear different serialization_tags. This code
does not take this into account, which can lead to deleting such a tagged
message and validating an untagged one, which breaks the serialization_tag
contract of preventing any further activity validation until execution
of all such-tagged validated activities is successful.
Also, it is not the validation node's job to deduplicate: deduplication
can happen during message execution without slowing down this crucial
(performance-wise) activity node.
As a result, distribute methods of SQLDict and SQLQueue can be factorised.
---
 product/CMFActivity/Activity/SQLBase.py      |  60 +++++++++++
 product/CMFActivity/Activity/SQLDict.py      |  75 --------------
 product/CMFActivity/Activity/SQLQueue.py     |  63 +-----------
 product/CMFActivity/tests/testCMFActivity.py | 100 +++++++++----------
 4 files changed, 107 insertions(+), 191 deletions(-)

diff --git a/product/CMFActivity/Activity/SQLBase.py b/product/CMFActivity/Activity/SQLBase.py
index 1b279d4124..6740279e3d 100644
--- a/product/CMFActivity/Activity/SQLBase.py
+++ b/product/CMFActivity/Activity/SQLBase.py
@@ -40,6 +40,10 @@ from Products.CMFActivity.ActivityRuntimeEnvironment import (
 from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH
 from Products.CMFActivity.Errors import ActivityFlushError
 
+# Stop validating more messages when this limit is reached
+MAX_VALIDATED_LIMIT = 1000
+# Read this many messages to validate.
+READ_MESSAGE_LIMIT = 1000
 # TODO: Limit by size in bytes instead of number of rows.
 MAX_MESSAGE_LIST_SIZE = 100
 INVOKE_ERROR_STATE = -2
@@ -315,6 +319,62 @@ class SQLBase(Queue):
     LOG(self.__class__.__name__, severity, summary,
         error=severity>INFO and sys.exc_info() or None)
 
+  def distribute(self, activity_tool, node_count):
+    assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
+    if assignMessage is None:
+      return
+    offset = 0
+    now_date = self.getNow(activity_tool)
+    validated_count = 0
+    while 1:
+      result = self._getMessageList(
+        activity_tool,
+        processing_node=-1,
+        to_date=now_date,
+        offset=offset,
+        count=READ_MESSAGE_LIMIT,
+      )
+      if not result:
+        return
+      transaction.commit()
+
+      validation_text_dict = {'none': 1}
+      message_dict = {}
+      for line in result:
+        message = Message.load(line.message, uid=line.uid, line=line)
+        if not hasattr(message, 'order_validation_text'): # BBB
+          message.order_validation_text = self.getOrderValidationText(message)
+        self.getExecutableMessageList(activity_tool, message, message_dict,
+                                      validation_text_dict, now_date=now_date)
+      if message_dict:
+        distributable_uid_set = set()
+        serialization_tag_dict = {}
+        for message in message_dict.itervalues():
+          serialization_tag = message.activity_kw.get('serialization_tag')
+          if serialization_tag is None:
+            distributable_uid_set.add(message.uid)
+          else:
+            serialization_tag_dict.setdefault(serialization_tag,
+                                              []).append(message)
+        for message_list in serialization_tag_dict.itervalues():
+          # Sort list of messages to validate the message with highest score
+          message_list.sort(key=sort_message_key)
+          distributable_uid_set.add(message_list[0].uid)
+          group_method_id = message_list[0].line.group_method_id
+          if group_method_id == '\0':
+            continue
+          for message in message_list[1:]:
+            if group_method_id == message.line.group_method_id:
+              distributable_uid_set.add(message.uid)
+        distributable_count = len(distributable_uid_set)
+        if distributable_count:
+          assignMessage(table=self.sql_table,
+            processing_node=0, uid=tuple(distributable_uid_set))
+          validated_count += distributable_count
+          if validated_count >= MAX_VALIDATED_LIMIT:
+            return
+      offset += READ_MESSAGE_LIMIT
+
   def getReservedMessageList(self, activity_tool, date, processing_node,
                              limit=None, group_method_id=None):
     """
diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py
index 316c57a44e..bcd8cb9794 100644
--- a/product/CMFActivity/Activity/SQLDict.py
+++ b/product/CMFActivity/Activity/SQLDict.py
@@ -147,78 +147,3 @@ class SQLDict(SQLBase):
       # earlier.
       return None, original_uid, [uid]
     return load
-
-  def distribute(self, activity_tool, node_count):
-    offset = 0
-    assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
-    if assignMessage is not None:
-      now_date = self.getNow(activity_tool)
-      validated_count = 0
-      while 1:
-        result = self._getMessageList(activity_tool, processing_node=-1,
-                                      to_date=now_date,
-                                      offset=offset, count=READ_MESSAGE_LIMIT)
-        if not result:
-          return
-        transaction.commit()
-
-        validation_text_dict = {'none': 1}
-        message_dict = {}
-        for line in result:
-          message = Message.load(line.message, uid=line.uid, line=line)
-          if not hasattr(message, 'order_validation_text'): # BBB
-            message.order_validation_text = self.getOrderValidationText(message)
-          self.getExecutableMessageList(activity_tool, message, message_dict,
-                                        validation_text_dict, now_date=now_date)
-
-        if message_dict:
-          message_unique_dict = {}
-          serialization_tag_dict = {}
-          distributable_uid_set = set()
-          deletable_uid_list = []
-
-          # remove duplicates
-          # SQLDict considers object_path, method_id, tag to unify activities,
-          # but ignores method arguments. They are outside of semantics.
-          for message in message_dict.itervalues():
-            message_unique_dict.setdefault(self.generateMessageUID(message),
-                                           []).append(message)
-          for message_list in message_unique_dict.itervalues():
-            if len(message_list) > 1:
-              # Sort list of duplicates to keep the message with highest score
-              message_list.sort(key=sort_message_key)
-              deletable_uid_list += [m.uid for m in message_list[1:]]
-            message = message_list[0]
-            serialization_tag = message.activity_kw.get('serialization_tag')
-            if serialization_tag is None:
-              distributable_uid_set.add(message.uid)
-            else:
-              serialization_tag_dict.setdefault(serialization_tag,
-                                                []).append(message)
-          # Don't let through if there is the same serialization tag in the
-          # message dict. If there is the same serialization tag, only one can
-          # be validated and others must wait.
-          # But messages with group_method_id are exceptions. serialization_tag
-          # does not stop validating together. Because those messages should
-          # be processed together at once.
-          for message_list in serialization_tag_dict.itervalues():
-            # Sort list of messages to validate the message with highest score
-            message_list.sort(key=sort_message_key)
-            distributable_uid_set.add(message_list[0].uid)
-            group_method_id = message_list[0].line.group_method_id
-            if group_method_id == '\0':
-              continue
-            for message in message_list[1:]:
-              if group_method_id == message.line.group_method_id:
-                distributable_uid_set.add(message.uid)
-          if deletable_uid_list:
-            activity_tool.SQLBase_delMessage(table=self.sql_table,
-                                             uid=deletable_uid_list)
-          distributable_count = len(distributable_uid_set)
-          if distributable_count:
-            assignMessage(table=self.sql_table,
-              processing_node=0, uid=tuple(distributable_uid_set))
-            validated_count += distributable_count
-            if validated_count >= MAX_VALIDATED_LIMIT:
-              return
-        offset += READ_MESSAGE_LIMIT
diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py
index 5f02c263ba..63d25d04b0 100644
--- a/product/CMFActivity/Activity/SQLQueue.py
+++ b/product/CMFActivity/Activity/SQLQueue.py
@@ -26,17 +26,7 @@
 #
 ##############################################################################
 
-from Products.CMFActivity.ActivityTool import Message
-from ZODB.POSException import ConflictError
-from SQLBase import SQLBase, sort_message_key
-from zExceptions import ExceptionFormatter
-
-import transaction
-
-# Stop validating more messages when this limit is reached
-MAX_VALIDATED_LIMIT = 1000
-# Read this many messages to validate.
-READ_MESSAGE_LIMIT = 1000
+from SQLBase import SQLBase
 
 class SQLQueue(SQLBase):
   """
@@ -46,54 +36,3 @@ class SQLQueue(SQLBase):
   """
   sql_table = 'message_queue'
   uid_group = 'portal_activity_queue'
-
-  def distribute(self, activity_tool, node_count):
-    offset = 0
-    assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
-    if assignMessage is not None:
-      now_date = self.getNow(activity_tool)
-      validated_count = 0
-      while 1:
-        result = self._getMessageList(activity_tool, processing_node=-1,
-                                      to_date=now_date, processing=0,
-                                      offset=offset, count=READ_MESSAGE_LIMIT)
-        if not result:
-          return
-        transaction.commit()
-
-        validation_text_dict = {'none': 1}
-        message_dict = {}
-        for line in result:
-          message = Message.load(line.message, uid=line.uid, line=line)
-          if not hasattr(message, 'order_validation_text'): # BBB
-            message.order_validation_text = self.getOrderValidationText(message)
-          self.getExecutableMessageList(activity_tool, message, message_dict,
-                                        validation_text_dict, now_date=now_date)
-        if message_dict:
-          distributable_uid_set = set()
-          serialization_tag_dict = {}
-          for message in message_dict.itervalues():
-            serialization_tag = message.activity_kw.get('serialization_tag')
-            if serialization_tag is None:
-              distributable_uid_set.add(message.uid)
-            else:
-              serialization_tag_dict.setdefault(serialization_tag,
-                                                []).append(message)
-          for message_list in serialization_tag_dict.itervalues():
-            # Sort list of messages to validate the message with highest score
-            message_list.sort(key=sort_message_key)
-            distributable_uid_set.add(message_list[0].uid)
-            group_method_id = message_list[0].line.group_method_id
-            if group_method_id == '\0':
-              continue
-            for message in message_list[1:]:
-              if group_method_id == message.line.group_method_id:
-                distributable_uid_set.add(message.uid)
-          distributable_count = len(distributable_uid_set)
-          if distributable_count:
-            assignMessage(table=self.sql_table,
-              processing_node=0, uid=tuple(distributable_uid_set))
-            validated_count += distributable_count
-            if validated_count >= MAX_VALIDATED_LIMIT:
-              return
-        offset += READ_MESSAGE_LIMIT
diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py
index 9b548b20c5..dfaf53bb4c 100644
--- a/product/CMFActivity/tests/testCMFActivity.py
+++ b/product/CMFActivity/tests/testCMFActivity.py
@@ -1632,65 +1632,57 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
     finally:
       del activity_tool.__class__.doSomething
 
-  def test_103_2_CheckSQLDictDeleteDuplicatesBeforeExecution(self):
-    """
-      Test that SQLDict delete the same messages before execution if messages
-      has the same method_id and path and tag.
+  def test_103_2_CheckSQLDictDoesNotDeleteDuplicatesBeforeExecution(self):
+    """
+      Test that SQLDict does not delete messages before execution
+      even if messages have the same method_id and path and tag.
+      There could be other things which differ (ex: serialization_tag) and may
+      not all be cheap to check during validation. Validation node is the only
+      non-parallelisable Zope-side task around activities, so it should be kept
+      simple.
+      Deduplication is cheap:
+      - inside the transaction which spawned duplicate activities, because it
+        has to have created activities around anyway, and can keep track
+      - inside the CMFActivity-level processing surrounding activity execution
+        because it has to load the activities to process them anyway
     """
     activity_tool = self.getActivityTool()
-    marker = []
-    def doSomething(self, other_tag):
-      marker.append(self.countMessage(tag=other_tag))
-    activity_tool.__class__.doSomething = doSomething
-    try:
-      # Adds two same activities.
-      activity_tool.activate(activity='SQLDict', after_tag='foo', priority=2,
-        tag='a').doSomething(other_tag='a')
-      self.commit()
-      uid1, = [x.uid for x in activity_tool.getMessageList()]
-      activity_tool.activate(activity='SQLDict', after_tag='bar', priority=1,
-        tag='a').doSomething(other_tag='a')
-      self.commit()
-      self.assertEqual(len(activity_tool.getMessageList()), 2)
-      activity_tool.distribute()
-      # After distribute, duplicate is deleted.
-      uid2, = [x.uid for x in activity_tool.getMessageList()]
-      self.assertNotEqual(uid1, uid2)
-      activity_tool.tic()
-      self.assertEqual(len(activity_tool.getMessageList()), 0)
-      self.assertEqual(marker, [1])
-    finally:
-      del activity_tool.__class__.doSomething
+    # Adds two same activities.
+    activity_tool.activate(activity='SQLDict', after_tag='foo', priority=2,
+      tag='a').getId()
+    self.commit()
+    uid1, = [x.uid for x in activity_tool.getMessageList()]
+    activity_tool.activate(activity='SQLDict', after_tag='bar', priority=1,
+      tag='a').getId()
+    self.commit()
+    uid2, = [x.uid for x in activity_tool.getMessageList() if x.uid != uid1]
+    self.assertEqual(len(activity_tool.getMessageList()), 2)
+    activity_tool.distribute()
+    # After distribute, duplicate is still present.
+    self.assertItemsEqual([uid1, uid2], [x.uid for x in activity_tool.getMessageList()])
+    activity_tool.tic()
+    self.assertEqual(len(activity_tool.getMessageList()), 0)
 
-  def test_103_3_CheckSQLJoblibDeleteDuplicatesBeforeExecution(self):
+  def test_103_3_CheckSQLJoblibDoesNotDeleteDuplicatesBeforeExecution(self):
     """
-      Test that SQLJoblib delete the same messages before execution if messages
-      has the same method_id and path and tag and signature.
+    (see test_103_2_CheckSQLDictDoesNotDeleteDuplicatesBeforeExecution)
     """
     activity_tool = self.getActivityTool()
-    marker = []
-    def doSomething(self, other_tag):
-      marker.append(self.countMessage(tag=other_tag))
-    activity_tool.__class__.doSomething = doSomething
-    try:
-      # Adds two same activities.
-      activity_tool.activate(activity='SQLJoblib', after_tag='foo', priority=2,
-        tag='a').doSomething(other_tag='a')
-      self.commit()
-      uid1, = [x.uid for x in activity_tool.getMessageList()]
-      activity_tool.activate(activity='SQLJoblib', after_tag='bar', priority=1,
-        tag='a').doSomething(other_tag='a')
-      self.commit()
-      self.assertEqual(len(activity_tool.getMessageList()), 2)
-      activity_tool.distribute()
-      # After distribute, duplicate is deleted.
-      uid2, = [x.uid for x in activity_tool.getMessageList()]
-      self.assertNotEqual(uid1, uid2)
-      activity_tool.tic()
-      self.assertEqual(len(activity_tool.getMessageList()), 0)
-      self.assertEqual(marker, [1])
-    finally:
-      del activity_tool.__class__.doSomething
+    # Adds two same activities.
+    activity_tool.activate(activity='SQLJoblib', after_tag='foo', priority=2,
+      tag='a').getId()
+    self.commit()
+    uid1, = [x.uid for x in activity_tool.getMessageList()]
+    activity_tool.activate(activity='SQLJoblib', after_tag='bar', priority=1,
+      tag='a').getId()
+    self.commit()
+    uid2, = [x.uid for x in activity_tool.getMessageList() if x.uid != uid1]
+    self.assertEqual(len(activity_tool.getMessageList()), 2)
+    activity_tool.distribute()
+    # After distribute, duplicate is still present.
+    self.assertItemsEqual([uid1, uid2], [x.uid for x in activity_tool.getMessageList()])
+    activity_tool.tic()
+    self.assertEqual(len(activity_tool.getMessageList()), 0)
 
   def test_103_4_CheckSQLDictDistributeWithSerializationTagAndGroupMethodId(
       self):
@@ -2279,7 +2271,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
                             for message in result
                             if (message.processing_node==-1 and
                                 message.serialization_tag=='test_115')]),
-                       3)
+                       4)
 
       self.assertEqual(len([message
                             for message in result
-- 
2.30.9