From 9a0ecaa7e42657e191f07dec9431801a20c3f392 Mon Sep 17 00:00:00 2001
From: Yoshinori Okuji <yo@nexedi.com>
Date: Thu, 3 Nov 2005 15:31:37 +0000
Subject: [PATCH] Manage how many times an active object is retried, and tweak
 the next processing date based on the number of retries. Also, when there is
 no active object available, shift the time to go ahead quickly.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@4223 20353a03-c40f-0410-a6d1-a30d3c3de9de
---
 product/CMFActivity/Activity/SQLDict.py | 39 ++++++++++++++++++-------
 1 file changed, 29 insertions(+), 10 deletions(-)

diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py
index 5ea2e9ab84..74672b5fb1 100755
--- a/product/CMFActivity/Activity/SQLDict.py
+++ b/product/CMFActivity/Activity/SQLDict.py
@@ -140,7 +140,7 @@ class SQLDict(RAMDict):
     else:
       return ()
 
-  def validateMessage(self, activity_tool, message, uid_list, priority, next_processing_date):
+  def validateMessage(self, activity_tool, message, uid_list, priority, next_processing_date, retry):
     validation_state = message.validate(self, activity_tool)
     if validation_state is not VALID:
       if validation_state in (EXCEPTION, INVALID_PATH):
@@ -156,13 +156,13 @@ class SQLDict(RAMDict):
           # Lower priority
           if len(uid_list) > 0: # Add some delay before new processing
             activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
-                                              priority = priority + 1)
+                                              priority = priority + 1, retry = retry + 1)
           get_transaction().commit() # Release locks before starting a potentially long calculation
       else:
         # We do not lower priority for INVALID_ORDER errors but we do postpone execution
         if len(uid_list) > 0: # Add some delay before new processing
           activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
-                                            priority = priority)
+                                            priority = priority, retry = retry + 1)
         get_transaction().commit() # Release locks before starting a potentially long calculation
       return 0
     return 1
@@ -171,8 +171,6 @@ class SQLDict(RAMDict):
   def dequeueMessage(self, activity_tool, processing_node):
     if hasattr(activity_tool,'SQLDict_readMessage'):
       now_date = DateTime()
-      # Next processing date in case of error
-      next_processing_date = now_date + VALIDATION_ERROR_DELAY
       priority = random.choice(priority_weight)
       # Try to find a message at given priority level which is scheduled for now
       result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority,
@@ -181,15 +179,26 @@ class SQLDict(RAMDict):
         # If empty, take any message which is scheduled for now
         priority = None
         result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
-      if len(result) > 0:
+      if len(result) == 0:
+        # If the result is still empty, shift the dates so that SQLDict can dispatch pending active
+        # objects quickly.
+        self.timeShift(activity_tool, VALIDATION_ERROR_DELAY)
+      elif len(result) > 0:
         #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
         line = result[0]
         path = line.path
         method_id = line.method_id
+        try:
+          retry = int(line.retry)
+        except TypeError:
+          retry = 1
+        # Next processing date in case of error
+        next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry
         uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date )
         uid_list = [x.uid for x in uid_list]
         uid_list_list = [uid_list]
         priority_list = [line.priority]
+        retry_list = [retry]
         # Make sure message can not be processed anylonger
         if len(uid_list) > 0:
           # Set selected messages to processing
@@ -199,7 +208,7 @@ class SQLDict(RAMDict):
         m = self.loadMessage(line.message, uid = line.uid)
         message_list = [m]
         # Validate message (make sure object exists, priority OK, etc.)
-        if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date):
+        if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date, retry):
           group_method_id = m.activity_kw.get('group_method_id')
           if group_method_id is not None:
             # Count the number of objects to prevent too many objects.
@@ -225,6 +234,12 @@ class SQLDict(RAMDict):
               for line in result:
                 path = line.path
                 method_id = line.method_id
+                try:
+                  retry = int(line.retry)
+                except TypeError:
+                  retry = 1
+                # Next processing date in case of error
+                next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry
                 uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date )
                 uid_list = [x.uid for x in uid_list]
                 if len(uid_list) > 0:
@@ -232,7 +247,7 @@ class SQLDict(RAMDict):
                   activity_tool.SQLDict_processMessage(uid = uid_list)
                 get_transaction().commit() # Release locks before starting a potentially long calculation
                 m = self.loadMessage(line.message, uid = line.uid)
-                if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date):
+                if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date, retry):
                   if m.hasExpandMethod():
                     try:
                       count += len(m.getObjectList(activity_tool))
@@ -247,13 +262,14 @@ class SQLDict(RAMDict):
                   message_list.append(m)
                   uid_list_list.append(uid_list)
                   priority_list.append(line.priority)
+                  retry_list.append(retry)
                   if count >= MAX_GROUPED_OBJECTS:
                     break
                 
           get_transaction().commit() # Release locks before starting a potentially long calculation
           # Try to invoke
           if group_method_id is not None:
-            #LOG('SQLDict', 0, 'invoking a group method %s with %d objects (%d objects in expanded form)' % (group_method_id, len(message_list), count))
+            LOG('SQLDict', 0, 'invoking a group method %s with %d objects (%d objects in expanded form)' % (group_method_id, len(message_list), count))
             #for m in message_list:
             #  LOG('SQLDict', 0, '%r has objects %r' % (m, m.getObjectList(activity_tool)))
             activity_tool.invokeGroup(group_method_id, message_list)
@@ -277,6 +293,9 @@ class SQLDict(RAMDict):
             m = message_list[i]
             uid_list = uid_list_list[i]
             priority = priority_list[i]
+            retry = retry_list[i]
+            # Next processing date in case of error
+            next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry
             if m.is_executed:
               activity_tool.SQLDict_delMessage(uid = uid_list)                # Delete it
               get_transaction().commit()                                        # If successful, commit
@@ -297,7 +316,7 @@ class SQLDict(RAMDict):
                 # Lower priority
                 if len(uid_list) > 0:
                   activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
-                                                    priority = priority + 1)
+                                                    priority = priority + 1, retry = retry + 1)
                   get_transaction().commit() # Release locks before starting a potentially long calculation
                 
         return 0
-- 
2.30.9