From 05c3a1d1e77b757e37985a4c53b206630911e336 Mon Sep 17 00:00:00 2001
From: Sebastien Robin <seb@nexedi.com>
Date: Fri, 2 Mar 2007 11:57:20 +0000
Subject: [PATCH] - Fixed horrible bug : flush (called in manageInvoke) was  
 deleting messages wich were not VALID, without execution !! - Wrote a test to
 show that the bug is currently fixed.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@13180 20353a03-c40f-0410-a6d1-a30d3c3de9de
---
 product/CMFActivity/Activity/Queue.py        |  1 +
 product/CMFActivity/Activity/SQLDict.py      | 12 +++--
 product/CMFActivity/Activity/SQLQueue.py     |  2 +-
 product/CMFActivity/tests/testCMFActivity.py | 51 +++++++++++++++++++-
 4 files changed, 61 insertions(+), 5 deletions(-)

diff --git a/product/CMFActivity/Activity/Queue.py b/product/CMFActivity/Activity/Queue.py
index d4cc730263..9600d6f880 100644
--- a/product/CMFActivity/Activity/Queue.py
+++ b/product/CMFActivity/Activity/Queue.py
@@ -159,6 +159,7 @@ class Queue:
           error=sys.exc_info())
       # Do not try to call methods on objects which cause errors
       return EXCEPTION
+    LOG('Queue.validate, return',0,'VALID')
     return VALID
 
   def isAwake(self, activity_tool, processing_node):
diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py
index 3e28fa550d..ac992938ad 100644
--- a/product/CMFActivity/Activity/SQLDict.py
+++ b/product/CMFActivity/Activity/SQLDict.py
@@ -406,7 +406,6 @@ class SQLDict(RAMDict):
       # Parse each message in registered
       for m in activity_tool.getRegisteredMessageList(self):
         if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id):
-          activity_tool.unregisterMessage(self, m)
           #if not method_dict.has_key(method_id or m.method_id):
           if not method_dict.has_key(m.method_id):
             method_dict[m.method_id] = 1 # Prevents calling invoke twice
@@ -423,6 +422,10 @@ class SQLDict(RAMDict):
                 # The message no longer exists
                 raise ActivityFlushError, (
                     'The document %s does not exist' % path)
+              else:
+                raise ActivityFlushError, (
+                    'Could not validate %s on %s' % (m.method_id , path))
+          activity_tool.unregisterMessage(self, m)
       # Parse each message in SQL dict
       result = readMessageList(path=path, method_id=method_id,
                                processing_node=None,include_processing=0)
@@ -435,7 +438,6 @@ class SQLDict(RAMDict):
           # node and minimize network traffic with ZEO server
           method_dict[method_id] = 1
           m = self.loadMessage(line.message, uid = line.uid)
-          self.deleteMessage(activity_tool, m)
           if invoke:
             # First Validate
             validate_value = m.validate(self, activity_tool)
@@ -447,10 +449,14 @@ class SQLDict(RAMDict):
                 # The message no longer exists
                 raise ActivityFlushError, (
                     'Could not evaluate %s on %s' % (m.method_id , path))
-            if validate_value is INVALID_PATH:
+            elif validate_value is INVALID_PATH:
               # The message no longer exists
               raise ActivityFlushError, (
                   'The document %s does not exist' % path)
+            else:
+              raise ActivityFlushError, (
+                  'Could not validate %s on %s' % (m.method_id , path))
+          self.deleteMessage(activity_tool, m)
 
   def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
     # YO: reading all lines might cause a deadlock
diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py
index 1c71ca5359..35ab4a731d 100644
--- a/product/CMFActivity/Activity/SQLQueue.py
+++ b/product/CMFActivity/Activity/SQLQueue.py
@@ -222,7 +222,6 @@ class SQLQueue(RAMQueue):
           # Only invoke once (it would be different for a queue)
           method_dict[method_id] = 1
           m = self.loadMessage(line.message, uid = line.uid)
-          self.deleteMessage(activity_tool, m)
           if invoke:
             # First Validate
             if m.validate(self, activity_tool) is VALID:
@@ -235,6 +234,7 @@ class SQLQueue(RAMQueue):
               # The message no longer exists
               raise ActivityFlushError, (
                   'The document %s does not exist' % path)
+          self.deleteMessage(activity_tool, m)
 
   # def start(self, activity_tool, active_process=None):
   #   uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py
index 488fea7542..4127ec43df 100644
--- a/product/CMFActivity/tests/testCMFActivity.py
+++ b/product/CMFActivity/tests/testCMFActivity.py
@@ -40,7 +40,7 @@ from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
 from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\
                                               VALIDATE_ERROR_STATE
 from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
-from Products.CMFActivity.Errors import ActivityPendingError
+from Products.CMFActivity.Errors import ActivityPendingError, ActivityFlushError
 from Products.ERP5Type.Document.Organisation import Organisation
 from AccessControl.SecurityManagement import newSecurityManager
 from zLOG import LOG
@@ -552,6 +552,37 @@ class TestCMFActivity(ERP5TypeTestCase):
     self.tic()
     self.assertEquals(o.getCorporateName(), 'cd')
 
+  def TryFlushActivityWithAfterTag(self, activity):
+    """
+      Ensure the order of an execution by a tag
+    """
+    portal = self.getPortal()
+    organisation_module = self.getOrganisationModule()
+    if not organisation_module.hasContent(self.company_id):
+      organisation_module.newContent(id=self.company_id)
+    o = portal.organisation._getOb(self.company_id)
+
+    o.setTitle('?')
+    o.setDescription('?')
+    self.assertEquals(o.getTitle(), '?')
+    self.assertEquals(o.getDescription(), '?')
+    get_transaction().commit()
+    self.tic()
+
+    o.activate(after_tag = 'toto', activity = activity).setDescription('b')
+    o.activate(tag = 'toto', activity = activity).setTitle('a')
+    get_transaction().commit()
+    tool = self.getActivityTool()
+    self.assertRaises(ActivityFlushError,tool.manageInvoke,o.getPath(),'setDescription')
+    tool.manageInvoke(o.getPath(),'setTitle')
+    get_transaction().commit()
+    self.assertEquals(o.getTitle(), 'a')
+    self.assertEquals(o.getDescription(), '?')
+    self.tic()
+    self.assertEquals(len(tool.getMessageList()),0)
+    self.assertEquals(o.getTitle(), 'a')
+    self.assertEquals(o.getDescription(), 'b')
+
   def CheckScheduling(self, activity):
     """
       Check if active objects with different after parameters are executed in a correct order
@@ -1583,6 +1614,24 @@ class TestCMFActivity(ERP5TypeTestCase):
       LOG('Testing... ', 0, message)
     self.TryErrorsWhileFinishingCommitDB('SQLQueue')
 
+  def test_74_TryFlushActivityWithAfterTagSQLDict(self, quiet=0, run=run_all_test):
+    # Test if after_tag can be used
+    if not run: return
+    if not quiet:
+      message = '\nTry Flus Activity With After Tag With SQL Dict'
+      ZopeTestCase._print(message)
+      LOG('Testing... ',0,message)
+    self.TryFlushActivityWithAfterTag('SQLDict')
+
+  def test_75_TryFlushActivityWithAfterTagWithSQLQueue(self, quiet=0, run=run_all_test):
+    # Test if after_tag can be used
+    if not run: return
+    if not quiet:
+      message = '\nTry Flush Activity With After Tag With SQL Queue'
+      ZopeTestCase._print(message)
+      LOG('Testing... ',0,message)
+    self.TryFlushActivityWithAfterTag('SQLQueue')
+
 
 if __name__ == '__main__':
     framework()
-- 
2.30.9