From 2264ab3a5cb5a29f4bdd580578e2d74b516cf6a2 Mon Sep 17 00:00:00 2001
From: Yoshinori Okuji <yo@nexedi.com>
Date: Mon, 13 Dec 2004 17:54:40 +0000
Subject: [PATCH] Broadcasting messages is supported.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@2003 20353a03-c40f-0410-a6d1-a30d3c3de9de
---
 product/CMFActivity/Activity/SQLDict.py       | 73 ++++++++++++-------
 .../skins/activity/SQLDict_assignMessage.zsql |  8 +-
 .../activity/SQLDict_createMessageTable.zsql  |  1 +
 .../skins/activity/SQLDict_writeMessage.zsql  |  7 +-
 4 files changed, 58 insertions(+), 31 deletions(-)

diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py
index 1aac3ed728..3c48ed0f3a 100755
--- a/product/CMFActivity/Activity/SQLDict.py
+++ b/product/CMFActivity/Activity/SQLDict.py
@@ -43,7 +43,7 @@ priority_weight = \
   [3] * 10 + \
   [4] * 5 + \
   [5] * 1
-   
+
 class ActivityFlushError(Exception):
     """Error during active message flush"""
 
@@ -59,6 +59,7 @@ class SQLDict(RAMDict):
       activity_tool.SQLDict_writeMessage( path = '/'.join(m.object_path) ,
                                           method_id = m.method_id,
                                           priority = m.activity_kw.get('priority', 1),
+                                          broadcast = m.activity_kw.get('broadcast', 0),
                                           message = self.dumpMessage(m),
                                           date = m.activity_kw.get('at_date', DateTime()))
                                           # Also store uid of activity
@@ -69,26 +70,26 @@ class SQLDict(RAMDict):
     uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=m.method_id,processing_node=None)
     uid_list = map(lambda x:x.uid, uid_list)
     if len(uid_list)>0:
-      activity_tool.SQLDict_delMessage(uid = uid_list) 
-    
-  # Registration management    
+      activity_tool.SQLDict_delMessage(uid = uid_list)
+
+  # Registration management
   def registerActivityBuffer(self, activity_buffer):
     class_name = self.__class__.__name__
-    setattr(activity_buffer, '_%s_uid_dict' % class_name, {})  
-    setattr(activity_buffer, '_%s_message_list' % class_name, [])  
-            
+    setattr(activity_buffer, '_%s_uid_dict' % class_name, {})
+    setattr(activity_buffer, '_%s_message_list' % class_name, [])
+
   def isMessageRegistered(self, activity_buffer, activity_tool, m):
     class_name = self.__class__.__name__
     uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
     return uid_dict.has_key((tuple(m.object_path), m.method_id))
-          
+
   def registerMessage(self, activity_buffer, activity_tool, m):
     m.is_registered = 1
     class_name = self.__class__.__name__
     uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name)
     uid_dict[(tuple(m.object_path), m.method_id)] = 1
     getattr(activity_buffer,'_%s_message_list' % class_name).append(m)
-          
+
   def unregisterMessage(self, activity_buffer, activity_tool, m):
     m.is_registered = 0 # This prevents from inserting deleted messages into the queue
     class_name = self.__class__.__name__
@@ -103,7 +104,7 @@ class SQLDict(RAMDict):
       return filter(lambda m: m.is_registered, message_list)
     else:
       return ()
-                
+
   # Queue semantic
   def dequeueMessage(self, activity_tool, processing_node):
     if hasattr(activity_tool,'SQLDict_readMessage'):
@@ -126,7 +127,7 @@ class SQLDict(RAMDict):
         uid_list = map(lambda x:x.uid, uid_list)
         # Make sure message can not be processed anylonger
         if len(uid_list) > 0:
-          # Set selected messages to processing          
+          # Set selected messages to processing
           activity_tool.SQLDict_processMessage(uid = uid_list)
         get_transaction().commit() # Release locks before starting a potentially long calculation
         # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
@@ -134,11 +135,11 @@ class SQLDict(RAMDict):
         # Validate message (make sure object exists, priority OK, etc.)
         validation_state = m.validate(self, activity_tool)
         if validation_state is not VALID:
-          if validation_state in (EXCEPTION, INVALID_PATH):          
+          if validation_state in (EXCEPTION, INVALID_PATH):
             # There is a serious validation error - we must lower priority
             if line.priority > MAX_PRIORITY:
               # This is an error
-              if len(uid_list) > 0: 
+              if len(uid_list) > 0:
                 activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
                                                                                 # Assign message back to 'error' state
               #m.notifyUser(activity_tool)                                      # Notify Error
@@ -231,7 +232,7 @@ class SQLDict(RAMDict):
               else:
                 # The message no longer exists
                 raise ActivityFlushError, (
-                    'The document %s does not exist' % path)               
+                    'The document %s does not exist' % path)
       # Parse each message in SQL dict
       result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None)
       for line in result:
@@ -239,6 +240,8 @@ class SQLDict(RAMDict):
         method_id = line.method_id
         if not method_dict.has_key(method_id):
           # Only invoke once (it would be different for a queue)
+          # This is optimisation with the goal to process objects on the same
+          # node and minimize network traffic with ZEO server
           method_dict[method_id] = 1
           m = self.loadMessage(line.message, uid = line.uid)
           self.deleteMessage(activity_tool, m)
@@ -265,8 +268,8 @@ class SQLDict(RAMDict):
         m.processing_node = line.processing_node
         m.priority = line.priority
         message_list.append(m)
-    return message_list        
-      
+    return message_list
+
   def distribute(self, activity_tool, node_count):
     processing_node = 1
     if hasattr(activity_tool,'SQLDict_readMessageList'):
@@ -276,17 +279,30 @@ class SQLDict(RAMDict):
         max_processing_date = now_date - MAX_PROCESSING_TIME
         self.max_processing_date = now_date
       else:
-        max_processing_date = None     
+        max_processing_date = None
       result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1,
                                                      to_processing_date = max_processing_date) # Only assign non assigned messages
       get_transaction().commit() # Release locks before starting a potentially long calculation
       path_dict = {}
       for line in result:
         path = line.path
-        if not path_dict.has_key(path):
+        broadcast = line.broadcast
+        if broadcast:
+          # Broadcast messages must be distributed into all nodes.
+          uid = line.uid
+          activity_tool.SQLDict_assignMessage(processing_node=1, uid=[uid])
+          for node in range(2, node_count+1):
+            activity_tool.SQLDict_writeMessage( path = path,
+                                                method_id = line.method_id,
+                                                priority = line.priority,
+                                                broadcast = 1,
+                                                processing_node = node,
+                                                message = line.message,
+                                                date = line.date)
+        elif not path_dict.has_key(path):
           # Only assign once (it would be different for a queue)
           path_dict[path] = 1
-          activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None)
+          activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None, broadcast=0)
           get_transaction().commit() # Release locks immediately to allow processing of messages
           processing_node = processing_node + 1
           if processing_node > node_count:
@@ -295,18 +311,21 @@ class SQLDict(RAMDict):
   # Validation private methods
   def _validate_after_method_id(self, activity_tool, message, value):
     # Count number of occurances of method_id
-    result = activity_tool.SQLDict_validateMessageList(method_id=value, message_uid=None, path=None)
-    if result[0].uid_count > 0:
-      return INVALID_ORDER
+    if type(value) == type(''):
+      value = [value]
+    for method_id in value:
+      result = activity_tool.SQLDict_validateMessageList(method_id=method_id, message_uid=None, path=None)
+      if result[0].uid_count > 0:
+        return INVALID_ORDER
     return VALID
-            
+
   def _validate_after_path(self, activity_tool, message, value):
     # Count number of occurances of path
     result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=None, path=value)
     if result[0].uid_count > 0:
       return INVALID_ORDER
     return VALID
-            
+
   def _validate_after_message_uid(self, activity_tool, message, value):
     # Count number of occurances of message_uid
     result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=value, path=None)
@@ -314,12 +333,12 @@ class SQLDict(RAMDict):
       return INVALID_ORDER
     return VALID
 
-  # Required for tests (time shift)        
-  def timeShift(self, activity_tool, delay):    
+  # Required for tests (time shift)
+  def timeShift(self, activity_tool, delay):
     """
       To simulate timeShift, we simply substract delay from
       all dates in SQLDict message table
     """
     activity_tool.SQLDict_timeShift(delay = delay * SECONDS_IN_DAY)
-            
+
 registerActivity(SQLDict)
diff --git a/product/CMFActivity/skins/activity/SQLDict_assignMessage.zsql b/product/CMFActivity/skins/activity/SQLDict_assignMessage.zsql
index ae59918ba6..8b3c15995e 100755
--- a/product/CMFActivity/skins/activity/SQLDict_assignMessage.zsql
+++ b/product/CMFActivity/skins/activity/SQLDict_assignMessage.zsql
@@ -10,7 +10,8 @@ class_file:
 <params>path
 processing_node
 method_id
-uid=None</params>
+uid
+broadcast</params>
 UPDATE message
 SET
   processing_node=<dtml-sqlvar processing_node type="int">,
@@ -22,4 +23,7 @@ WHERE
 <dtml-else>
   path = <dtml-sqlvar path type="string">
 <dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
-</dtml-if>
\ No newline at end of file
+</dtml-if>
+<dtml-if broadcast>
+  AND broadcast = <dtml-sqlvar broadcast type="int">
+</dtml-if>
diff --git a/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql b/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql
index 827896267d..e528ceedad 100755
--- a/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql
+++ b/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql
@@ -17,6 +17,7 @@ CREATE TABLE `message` (
   `processing` INT DEFAULT 0,
   `processing_date` datetime,
   `priority` INT DEFAULT 0,
+  `broadcast` INT DEFAULT 0,
   `message` BLOB,
   PRIMARY KEY  (`uid`),
   KEY `date` (`date`),
diff --git a/product/CMFActivity/skins/activity/SQLDict_writeMessage.zsql b/product/CMFActivity/skins/activity/SQLDict_writeMessage.zsql
index a456f94644..ff78ca8ed1 100755
--- a/product/CMFActivity/skins/activity/SQLDict_writeMessage.zsql
+++ b/product/CMFActivity/skins/activity/SQLDict_writeMessage.zsql
@@ -11,13 +11,16 @@ class_file:
 method_id
 message
 priority
-date</params>
+broadcast
+date
+processing_node=-1</params>
 INSERT INTO message
 SET
   path = <dtml-sqlvar path type="string">,
   <dtml-if date>date = <dtml-sqlvar date type="string">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="string">, </dtml-if> 
   method_id = <dtml-sqlvar method_id type="string">,
-  processing_node = -1,
+  processing_node = <dtml-sqlvar processing_node type="int">,
   processing = -1,
   priority = <dtml-sqlvar priority type="int">,
+  broadcast = <dtml-sqlvar broadcast type="int">,
   message = <dtml-sqlvar message type="string">
-- 
2.30.9