From 4fa53cbec24f6ba9b22d09cff6bf1ed3786eb162 Mon Sep 17 00:00:00 2001
From: Vincent Pelletier <vincent@nexedi.com>
Date: Wed, 20 Feb 2008 13:49:25 +0000
Subject: [PATCH] Add support for serialization_tag.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@19431 20353a03-c40f-0410-a6d1-a30d3c3de9de
---
 product/CMFActivity/Activity/SQLDict.py       | 13 ++++-
 product/CMFActivity/Activity/SQLQueue.py      | 15 +++--
 .../activity/SQLDict_createMessageTable.zsql  |  1 +
 .../activity/SQLDict_validateMessageList.zsql |  5 ++
 .../activity/SQLDict_writeMessageList.zsql    |  4 +-
 .../activity/SQLQueue_createMessageTable.zsql |  1 +
 .../SQLQueue_validateMessageList.zsql         |  5 ++
 .../skins/activity/SQLQueue_writeMessage.zsql |  5 +-
 product/CMFActivity/tests/testCMFActivity.py  | 58 +++++++++++++++++++
 9 files changed, 98 insertions(+), 9 deletions(-)

diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py
index 0dbb6b5d47..e55d05bd07 100644
--- a/product/CMFActivity/Activity/SQLDict.py
+++ b/product/CMFActivity/Activity/SQLDict.py
@@ -77,6 +77,7 @@ class SQLDict(RAMDict, SQLBase):
       group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''), message.activity_kw.get('group_id', '')])
                               for message in registered_message_list]
       tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list]
+      serialization_tag_list = [message.activity_kw.get('serialization_tag', '') for message in registered_message_list]
       order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list]
       uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity', 
                    id_count=len(registered_message_list), store=0)
@@ -88,6 +89,7 @@ class SQLDict(RAMDict, SQLBase):
                                               date_list = date_list,
                                               group_method_id_list = group_method_id_list,
                                               tag_list = tag_list,
+                                              serialization_tag_list = serialization_tag_list,
                                               order_validation_text_list = order_validation_text_list)
 
   def prepareDeleteMessage(self, activity_tool, m):
@@ -645,7 +647,8 @@ class SQLDict(RAMDict, SQLBase):
       #LOG('SQLDict.distribute', INFO, '%0.4fs : %i messages => %i distributables' % (TIME_end - TIME_begin, offset - READ_MESSAGE_LIMIT + len(result), validated_count))
 
   # Validation private methods
-  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
+  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
+                serialization_tag=None):
     if isinstance(method_id, str):
       method_id = [method_id]
     if isinstance(path, str):
@@ -653,12 +656,13 @@ class SQLDict(RAMDict, SQLBase):
     if isinstance(tag, str):
       tag = [tag]
 
-    if method_id or message_uid or path or tag:
+    if method_id or message_uid or path or tag or serialization_tag:
       validateMessageList = activity_tool.SQLDict_validateMessageList
       result = validateMessageList(method_id=method_id,
                                    message_uid=message_uid,
                                    path=path,
-                                   tag=tag)
+                                   tag=tag,
+                                   serialization_tag=serialization_tag)
       message_list = []
       for line in result:
         m = self.loadMessage(line.message,
@@ -698,6 +702,9 @@ class SQLDict(RAMDict, SQLBase):
       return []
     return self._validate(activity_tool, tag=value[0], method_id=value[1])
 
+  def _validate_serialization_tag(self, activity_tool, message, value):
+    return self._validate(activity_tool, serialization_tag=value)
+
   def countMessage(self, activity_tool, tag=None, path=None,
                    method_id=None, message_uid=None, **kw):
     """Return the number of messages which match the given parameters.
diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py
index 4a22350f77..30e6df3ffe 100644
--- a/product/CMFActivity/Activity/SQLQueue.py
+++ b/product/CMFActivity/Activity/SQLQueue.py
@@ -72,13 +72,15 @@ class SQLQueue(RAMQueue, SQLBase):
       if date is None:
         date = self.getNow(activity_tool)
       tag = m.activity_kw.get('tag', '')
+      serialization_tag = m.activity_kw.get('serialization_tag', '')
       activity_tool.SQLQueue_writeMessage(uid=uid,
                                           path=path,
                                           method_id=method_id,
                                           priority=priority,
                                           message=self.dumpMessage(m),
                                           date=date,
-                                          tag=tag)
+                                          tag=tag,
+                                          serialization_tag=serialization_tag)
 
   def prepareDeleteMessage(self, activity_tool, m):
     # Erase all messages in a single transaction
@@ -488,7 +490,8 @@ class SQLQueue(RAMQueue, SQLBase):
       #LOG('SQLQueue.distribute', INFO, '%0.4fs : %i messages => %i distributables' % (TIME_end - TIME_begin, offset + len(result), validated_count))
 
   # Validation private methods
-  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
+  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
+                serialization_tag=None):
     if isinstance(method_id, str):
       method_id = [method_id]
     if isinstance(path, str):
@@ -496,12 +499,13 @@ class SQLQueue(RAMQueue, SQLBase):
     if isinstance(tag, str):
       tag = [tag]
 
-    if method_id or message_uid or path or tag:
+    if method_id or message_uid or path or tag or serialization_tag:
       validateMessageList = activity_tool.SQLQueue_validateMessageList
       result = validateMessageList(method_id=method_id,
                                    message_uid=message_uid,
                                    path=path,
-                                   tag=tag)
+                                   tag=tag,
+                                   serialization_tag=serialization_tag)
       message_list = []
       for line in result:
         m = self.loadMessage(line.message,
@@ -540,6 +544,9 @@ class SQLQueue(RAMQueue, SQLBase):
       return []
     return self._validate(activity_tool, tag=value[0], method_id=value[1])
 
+  def _validate_serialization_tag(self, activity_tool, message, value):
+    return self._validate(activity_tool, serialization_tag=value)
+
   # Required for tests (time shift)
   def timeShift(self, activity_tool, delay, processing_node = None):
     """
diff --git a/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql b/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql
index 4d7b79ccfb..259d52c253 100644
--- a/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql
+++ b/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql
@@ -19,6 +19,7 @@ CREATE TABLE `message` (
   `priority` TINYINT NOT NULL DEFAULT 0,
   `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
   `tag` VARCHAR(255) NOT NULL,
+  `serialization_tag` VARCHAR(255) NOT NULL,
   `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
   `order_validation_text` VARCHAR(255) NOT NULL,
   `message` LONGBLOB NOT NULL,
diff --git a/product/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql b/product/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql
index 88e2d57494..eab48ec569 100644
--- a/product/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql
+++ b/product/CMFActivity/skins/activity/SQLDict_validateMessageList.zsql
@@ -12,6 +12,7 @@ message_uid
 path
 tag
 count
+serialization_tag
 </params>
 SELECT
 <dtml-if count>
@@ -39,3 +40,7 @@ WHERE
 <dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
     )
 </dtml-if>
+<dtml-if serialization_tag>
+    AND processing_node > -1
+    AND serialization_tag LIKE <dtml-sqlvar serialization_tag type="string">
+</dtml-if>
diff --git a/product/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql b/product/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql
index c615b389de..a402c7e5d4 100644
--- a/product/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql
+++ b/product/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql
@@ -16,9 +16,10 @@ date_list
 processing_node_list
 group_method_id_list
 tag_list
+serialization_tag_list
 order_validation_text_list</params>
 INSERT INTO message
-(uid, path, date, method_id, processing_node, processing, priority, group_method_id, tag, order_validation_text, message)
+(uid, path, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, order_validation_text, message)
 VALUES
 <dtml-in prefix="loop" expr="_.range(_.len(path_list))">
 <dtml-if sequence-start><dtml-else>,</dtml-if>
@@ -32,6 +33,7 @@ VALUES
   <dtml-sqlvar expr="priority_list[loop_item]" type="int">,
   <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
   <dtml-sqlvar expr="tag_list[loop_item]" type="string">,
+  <dtml-sqlvar expr="serialization_tag_list[loop_item]" type="string">,
   <dtml-sqlvar expr="order_validation_text_list[loop_item]" type="string">,
   <dtml-sqlvar expr="message_list[loop_item]" type="string">
 )
diff --git a/product/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql b/product/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
index a860902ac2..2e30136bdf 100644
--- a/product/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
+++ b/product/CMFActivity/skins/activity/SQLQueue_createMessageTable.zsql
@@ -18,6 +18,7 @@ CREATE TABLE `message_queue` (
   `processing_date` DATETIME,
   `priority` TINYINT NOT NULL DEFAULT 0,
   `tag` VARCHAR(255) NOT NULL,
+  `serialization_tag` VARCHAR(255) NOT NULL,
   `message` LONGBLOB NOT NULL,
   PRIMARY KEY (`uid`),
   KEY (`path`),
diff --git a/product/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql b/product/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql
index 8be7dc4e1c..3ed63d9761 100644
--- a/product/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql
+++ b/product/CMFActivity/skins/activity/SQLQueue_validateMessageList.zsql
@@ -12,6 +12,7 @@ message_uid
 path
 tag
 count
+serialization_tag
 </params>
 SELECT
 <dtml-if count>
@@ -39,3 +40,7 @@ WHERE
 <dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
     )
 </dtml-if>
+<dtml-if serialization_tag>
+    AND processing_node > -1
+    AND serialization_tag LIKE <dtml-sqlvar serialization_tag type="string">
+</dtml-if>
diff --git a/product/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql b/product/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql
index e2db5f1cd7..aa062b861d 100644
--- a/product/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql
+++ b/product/CMFActivity/skins/activity/SQLQueue_writeMessage.zsql
@@ -14,7 +14,9 @@ message
 priority
 processing_node
 date
-tag</params>
+tag
+serialization_tag
+</params>
 INSERT INTO message_queue
 SET
   uid = <dtml-sqlvar uid type="int">,
@@ -27,4 +29,5 @@ SET
   processing = 0,
   priority = <dtml-sqlvar priority type="int">,
   tag = <dtml-sqlvar tag type="string">,
+  serialization_tag = <dtml-sqlvar serialization_tag type="string">,
   message = <dtml-sqlvar message type="string">
diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py
index ecef18b613..d89345eedd 100644
--- a/product/CMFActivity/tests/testCMFActivity.py
+++ b/product/CMFActivity/tests/testCMFActivity.py
@@ -2735,6 +2735,64 @@ class TestCMFActivity(ERP5TypeTestCase):
       LOG('Testing... ',0,message)
     self.CheckActivityRuntimeEnvironment('SQLQueue')
 
+  def CheckSerializationTag(self, activity):
+    organisation = self.getPortal().organisation_module.newContent(portal_type='Organisation')
+    get_transaction().commit()
+    self.tic()
+    activity_tool = self.getActivityTool()
+    result = activity_tool.getMessageList()
+    self.assertEqual(len(result), 0)
+    # First scenario: activate, distribute, activate, distribute
+    # Create first activity and distribute: it must be distributed
+    organisation.activate(activity=activity, serialization_tag='1').getTitle()
+    get_transaction().commit()
+    result = activity_tool.getMessageList()
+    self.assertEqual(len(result), 1)
+    activity_tool.distribute()
+    result = activity_tool.getMessageList()
+    self.assertEqual(len([x for x in result if x.processing_node == 0]), 1)
+    # Create second activity and distribute: it must *NOT* be distributed
+    organisation.activate(activity=activity, serialization_tag='1').getTitle()
+    get_transaction().commit()
+    result = activity_tool.getMessageList()
+    self.assertEqual(len(result), 2)
+    activity_tool.distribute()
+    result = activity_tool.getMessageList()
+    self.assertEqual(len([x for x in result if x.processing_node == 0]), 1) # Distributed message list len is still 1
+    self.tic()
+    result = activity_tool.getMessageList()
+    self.assertEqual(len(result), 0)
+    # Second scenario: activate, activate, distribute
+    # Both messages must be distributed (this is different from regular tags)
+    organisation.activate(activity=activity, serialization_tag='1').getTitle()
+    # Use a different method just so that SQLDict doesn't merge both activities prior to insertion.
+    organisation.activate(activity=activity, serialization_tag='1').getId()
+    get_transaction().commit()
+    result = activity_tool.getMessageList()
+    self.assertEqual(len(result), 2)
+    activity_tool.distribute()
+    result = activity_tool.getMessageList()
+    self.assertEqual(len([x for x in result if x.processing_node == 0]), 2)
+    self.tic()
+    result = activity_tool.getMessageList()
+    self.assertEqual(len(result), 0)
+
+  def test_106_checkSerializationTagSQLDict(self, quiet=0, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = '\nCheck serialization tag (SQLDict)'
+      ZopeTestCase._print(message)
+      LOG('Testing... ',0,message)
+    self.CheckSerializationTag('SQLDict')
+
+  def test_107_checkSerializationTagSQLQueue(self, quiet=0, run=run_all_test):
+    if not run: return
+    if not quiet:
+      message = '\nCheck serialization tag (SQLQueue)'
+      ZopeTestCase._print(message)
+      LOG('Testing... ',0,message)
+    self.CheckSerializationTag('SQLQueue')
+
 def test_suite():
   suite = unittest.TestSuite()
   suite.addTest(unittest.makeSuite(TestCMFActivity))
-- 
2.30.9