diff --git a/product/CMFActivity/ActiveObject.py b/product/CMFActivity/ActiveObject.py index fe27d1d899223e241b486a6994a686c2c4cd1ff0..fe817716f68aa15f5fff0dfc7db4039d18c5dddc 100644 --- a/product/CMFActivity/ActiveObject.py +++ b/product/CMFActivity/ActiveObject.py @@ -41,10 +41,6 @@ DEFAULT_ACTIVITY = 'SQLDict' # Processing node are used to store processing state or processing node DISTRIBUTABLE_STATE = -1 INVOKE_ERROR_STATE = -2 -VALIDATE_ERROR_STATE = -3 -STOP_STATE = -4 -# Special state which allows to select positive nodes -POSITIVE_NODE_STATE = 'Positive Node State' _DEFAULT_ACTIVATE_PARAMETER_KEY = 'default_activate_parameter' @@ -145,12 +141,6 @@ class ActiveObject(ExtensionClass.Base): """ return self.hasActivity(processing_node = INVOKE_ERROR_STATE) - security.declareProtected( permissions.View, 'hasInvalidActivity' ) - def hasInvalidActivity(self, **kw): - """Tells if there is invalied activities for this object. - """ - return self.hasActivity(processing_node = VALIDATE_ERROR_STATE) - def getActiveProcess(self): path = getActivityRuntimeEnvironment()._message.active_process if path: diff --git a/product/CMFActivity/ActiveProcess.py b/product/CMFActivity/ActiveProcess.py index 011579edb223dc5d62370ef76308145d5f4029c1..d15ffd840a789e62b4da6a3eb63a5a2a64c987c9 100644 --- a/product/CMFActivity/ActiveProcess.py +++ b/product/CMFActivity/ActiveProcess.py @@ -33,8 +33,7 @@ from Products.ERP5Type.Base import Base from Products.ERP5Type import PropertySheet from Products.ERP5Type.ConflictFree import ConflictFreeLog from BTrees.Length import Length -from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, \ - VALIDATE_ERROR_STATE +from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE from random import randrange from .ActiveResult import ActiveResult @@ -150,13 +149,6 @@ class ActiveProcess(Base): """ return self.hasActivity(processing_node = INVOKE_ERROR_STATE) - security.declareProtected( CMFCorePermissions.View, 'hasInvalidActivity' ) - def hasInvalidActivity(self, **kw): - """ - Tells if an object if active - """ - return self.hasActivity(processing_node = VALIDATE_ERROR_STATE) - def getCreationDate(self): """ Define a Creation Date for an active process diff --git a/product/CMFActivity/Activity/SQLBase.py b/product/CMFActivity/Activity/SQLBase.py index 765eea486519a18041550cf94c68fb734fb232e8..71096bd218212ecd8c9443f2ee47f975b79e12e1 100644 --- a/product/CMFActivity/Activity/SQLBase.py +++ b/product/CMFActivity/Activity/SQLBase.py @@ -34,8 +34,7 @@ from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC from ZODB.POSException import ConflictError from Products.CMFActivity.ActivityTool import ( MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED) -from Products.CMFActivity.ActiveObject import ( - INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE) +from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE from Products.CMFActivity.ActivityRuntimeEnvironment import ( ActivityRuntimeEnvironment, getTransactionalVariable) from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH @@ -278,7 +277,7 @@ class SQLBase(Queue): # Count the number of objects to prevent too many objects. cost = m.activity_kw.get('group_method_cost', .01) assert 0 < cost <= 1, (self.sql_table, uid) - count = len(m.getObjectList(activity_tool)) + count = m.getObjectCount(activity_tool) # this is heuristic (messages with same group_method_id # are likely to have the same group_method_cost) limit = int(1. / cost + 1 - count) @@ -294,7 +293,7 @@ class SQLBase(Queue): uid_to_duplicate_uid_list_dict[uid] += uid_list continue uid_to_duplicate_uid_list_dict[uid] = uid_list - cost += len(m.getObjectList(activity_tool)) * \ + cost += m.getObjectCount(activity_tool) * \ m.activity_kw.get('group_method_cost', .01) message_list.append(m) if cost >= 1: @@ -415,7 +414,6 @@ class SQLBase(Queue): final_error_uid_list = [] make_available_uid_list = [] notify_user_list = [] - non_executable_message_list = [] executed_uid_list = deletable_uid_list if uid_to_duplicate_uid_list_dict is not None: for m in message_list: @@ -461,12 +459,14 @@ class SQLBase(Queue): except: self._log(WARNING, 'Failed to reactivate %r' % uid) make_available_uid_list.append(uid) - else: - # Internal CMFActivity error: the message can not be executed because - # something is missing (context object cannot be found, method cannot - # be accessed on object). - non_executable_message_list.append(uid) - notify_user_list.append((m, False)) + else: # MESSAGE_NOT_EXECUTABLE + # 'path' does not point to any object. Activities are normally flushed + # (without invoking them) when an object is deleted, but this is only + # an optimisation. There is no efficient and reliable way to do such + # this, because a concurrent and very long transaction may be about to + # activate this object, without conflict. + # So we have to clean up any remaining activity. + deletable_uid_list.append(uid) if deletable_uid_list: try: self._retryOnLockError(activity_tool.SQLBase_delMessage, @@ -490,13 +490,6 @@ class SQLBase(Queue): except: self._log(ERROR, 'Failed to set message to error state for %r' % final_error_uid_list) - if non_executable_message_list: - try: - activity_tool.SQLBase_assignMessage(table=self.sql_table, - uid=non_executable_message_list, processing_node=VALIDATE_ERROR_STATE) - except: - self._log(ERROR, 'Failed to set message to invalid path state for %r' - % non_executable_message_list) if make_available_uid_list: try: self.makeMessageListAvailable(activity_tool=activity_tool, diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index dcf603434b0a4d135c7ae1532b8796701f5a7ba6..8eefbed022cad5d2fcec2b869f32e9e12e5fc36d 100644 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -169,6 +169,7 @@ class Message(BaseMessage): is_executed = MESSAGE_NOT_EXECUTED processing = None traceback = None + oid = None def __init__(self, obj, active_process, activity_kw, method_id, args, kw): if isinstance(obj, str): @@ -177,6 +178,12 @@ class Message(BaseMessage): else: self.object_path = obj.getPhysicalPath() activity_creation_trace = obj.getPortalObject().portal_activities.activity_creation_trace + try: + self.oid = aq_base(obj)._p_oid + # Note that it's too early to get the OID of a newly created object, + # so at this point, self.oid may still be None. + except AttributeError: + pass if active_process is not None: self.active_process = active_process.getPhysicalPath() self.active_process_uid = active_process.getUid() @@ -216,29 +223,38 @@ class Message(BaseMessage): def getObject(self, activity_tool): """return the object referenced in this message.""" - return activity_tool.unrestrictedTraverse(self.object_path) - - def getObjectList(self, activity_tool): - """return the list of object that can be expanded from this message.""" - object_list = [] try: - object_list.append(self.getObject(activity_tool)) + obj = activity_tool.unrestrictedTraverse(self.object_path) except KeyError: - pass + LOG('CMFActivity', WARNING, "Message dropped (no object found at path %r)" + % (self.object_path,), error=sys.exc_info()) + self.setExecutionState(MESSAGE_NOT_EXECUTABLE) else: - if self.hasExpandMethod(): - expand_method_id = self.activity_kw['expand_method_id'] - # FIXME: how to pass parameters? - object_list = getattr(object_list[0], expand_method_id)() - return object_list - - def hasExpandMethod(self): - """return true if the message has an expand method. + if self.oid and self.oid != getattr(aq_base(obj), '_p_oid', None): + raise ValueError("OID mismatch for %r" % obj) + return obj + + def getObjectList(self, activity_tool): + """return the list of object that can be expanded from this message An expand method is used to expand the list of objects and to turn a big recursive transaction affecting many objects into multiple transactions affecting only one object at a time (this can prevent duplicated method calls).""" - return self.activity_kw.has_key('expand_method_id') + obj = self.getObject(activity_tool) + if obj is None: + return () + if 'expand_method_id' in self.activity_kw: + return getattr(obj, self.activity_kw['expand_method_id'])() + return obj, + + def getObjectCount(self, activity_tool): + if 'expand_method_id' in self.activity_kw: + try: + obj = activity_tool.unrestrictedTraverse(self.object_path) + return len(getattr(obj, self.activity_kw['expand_method_id'])()) + except StandardError: + pass + return 1 def changeUser(self, user_name, activity_tool): """restore the security context for the calling user.""" @@ -280,39 +296,21 @@ class Message(BaseMessage): def __call__(self, activity_tool): try: obj = self.getObject(activity_tool) - except KeyError: - exc_info = sys.exc_info() - LOG('CMFActivity', ERROR, - 'Message failed in getting an object from the path %r' - % (self.object_path,), error=exc_info) - self.setExecutionState(MESSAGE_NOT_EXECUTABLE, exc_info, - context=activity_tool) - else: - try: + if obj is not None: old_security_manager = getSecurityManager() try: # Change user if required (TO BE DONE) # We will change the user only in order to execute this method self.changeUser(self.user_name, activity_tool) - try: - # XXX: There is no check to see if user is allowed to access - # that method ! - method = getattr(obj, self.method_id) - except Exception: - exc_info = sys.exc_info() - LOG('CMFActivity', ERROR, - 'Message failed in getting a method %r from an object %r' - % (self.method_id, obj), error=exc_info) - method = None - self.setExecutionState(MESSAGE_NOT_EXECUTABLE, exc_info, - context=activity_tool) + # XXX: There is no check to see if user is allowed to access + # that method ! + method = getattr(obj, self.method_id) + # Store site info + setSite(activity_tool.getParentValue()) + if activity_tool.activity_timing_log: + result = activity_timing_method(method, self.args, self.kw) else: - # Store site info - setSite(activity_tool.getParentValue()) - if activity_tool.activity_timing_log: - result = activity_timing_method(method, self.args, self.kw) - else: - result = method(*self.args, **self.kw) + result = method(*self.args, **self.kw) finally: setSecurityManager(old_security_manager) @@ -322,8 +320,8 @@ class Message(BaseMessage): activity_tool.unrestrictedTraverse(self.active_process), result, obj) self.setExecutionState(MESSAGE_EXECUTED) - except: - self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool) + except: + self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool) def validate(self, activity, activity_tool, check_order_validation=1): return activity.validate(activity_tool, self, @@ -338,9 +336,7 @@ class Message(BaseMessage): email_from_name = portal.getProperty('email_from_name', portal.getProperty('email_from_address')) fail_count = self.line.retry + 1 - if self.getExecutionState() == MESSAGE_NOT_EXECUTABLE: - message = "Not executable activity" - elif retry: + if retry: message = "Pending activity already failed %s times" % fail_count else: message = "Activity failed" @@ -371,7 +367,7 @@ Named Parameters: %r def reactivate(self, activity_tool, activity=DEFAULT_ACTIVITY): # Reactivate the original object. - obj= self.getObject(activity_tool) + obj = activity_tool.unrestrictedTraverse(self.object_path) old_security_manager = getSecurityManager() try: # Change user if required (TO BE DONE) @@ -410,7 +406,7 @@ Named Parameters: %r """ assert is_executed in (MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, MESSAGE_NOT_EXECUTABLE) self.is_executed = is_executed - if is_executed != MESSAGE_EXECUTED: + if is_executed == MESSAGE_NOT_EXECUTED: if not exc_info: exc_info = sys.exc_info() if self.on_error_callback is not None: @@ -1189,21 +1185,11 @@ class ActivityTool (Folder, UniqueObject): # alternate method is used to segregate objects which cannot be grouped. alternate_method_id = m.activity_kw.get('alternate_method_id') try: - obj = m.getObject(self) - except KeyError: - exc_info = sys.exc_info() - LOG('CMFActivity', ERROR, - 'Message failed in getting an object from the path %r' - % (m.object_path,), error=exc_info) - m.setExecutionState(MESSAGE_NOT_EXECUTABLE, exc_info, context=self) - continue - try: - if m.hasExpandMethod(): - subobject_list = m.getObjectList(self) - else: - subobject_list = (obj,) + object_list = m.getObjectList(self) + if object_list is None: + continue message_dict[m] = expanded_object_list = [] - for subobj in subobject_list: + for subobj in object_list: if merge_duplicate: path = subobj.getPath() if path in path_set: diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py index 7c11c3d0037990e1ba9ef06850414b64790dda0a..f9260419b9f9d7eb8e4344c5068f08013ddd8d01 100644 --- a/product/CMFActivity/tests/testCMFActivity.py +++ b/product/CMFActivity/tests/testCMFActivity.py @@ -26,18 +26,16 @@ # ############################################################################## - +import inspect import unittest from Products.ERP5Type.tests.utils import LogInterceptor from Products.ERP5Type.tests.backportUnittest import skip from Testing import ZopeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase -from Products.ERP5Type.tests.utils import DummyMailHost from Products.ERP5Type.TransactionalVariable import getTransactionalVariable from Products.ERP5Type.Base import Base -from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\ - VALIDATE_ERROR_STATE +from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY from Products.CMFActivity.Activity.SQLDict import SQLDict import Products.CMFActivity.ActivityTool @@ -433,60 +431,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): self.tic() self.assertEquals(o.getTitle(), 'acb') - def ExpandedMethodWithDeletedSubObject(self, activity): - """ - Do recursiveReindexObject, then delete a - subobject an see if there is only one activity - in the queue - """ - portal = self.getPortal() - organisation_module = self.getOrganisationModule() - if not(organisation_module.hasContent(self.company_id2)): - o2 = organisation_module.newContent(id=self.company_id2) - o1 = portal.organisation._getOb(self.company_id) - o2 = portal.organisation._getOb(self.company_id2) - for o in (o1,o2): - if not(o.hasContent('1')): - o.newContent(portal_type='Email',id='1') - if not(o.hasContent('2')): - o.newContent(portal_type='Email',id='2') - o1.recursiveReindexObject() - o2.recursiveReindexObject() - o1._delOb('2') - self.commit() - portal.portal_activities.distribute() - portal.portal_activities.tic() - self.commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - - def ExpandedMethodWithDeletedObject(self, activity): - """ - Do recursiveReindexObject, then delete a - subobject an see if there is only one activity - in the queue - """ - portal = self.getPortal() - organisation_module = self.getOrganisationModule() - if not(organisation_module.hasContent(self.company_id2)): - o2 = organisation_module.newContent(id=self.company_id2) - o1 = portal.organisation._getOb(self.company_id) - o2 = portal.organisation._getOb(self.company_id2) - for o in (o1,o2): - if not(o.hasContent('1')): - o.newContent(portal_type='Email',id='1') - if not(o.hasContent('2')): - o.newContent(portal_type='Email',id='2') - o1.recursiveReindexObject() - o2.recursiveReindexObject() - organisation_module._delOb(self.company_id2) - self.commit() - portal.portal_activities.distribute() - portal.portal_activities.tic() - self.commit() - message_list = portal.portal_activities.getMessageList() - self.assertEquals(len(message_list),1) - def TryAfterTag(self, activity): """ Ensure the order of an execution by a tag @@ -1061,24 +1005,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): # Check if what we did was executed as toto self.assertEquals(email.getOwnerInfo()['id'],'toto') - def test_57_ExpandedMethodWithDeletedSubObject(self, quiet=0, run=run_all_test): - # Test if after_method_id can be used - if not run: return - if not quiet: - message = '\nTry Expanded Method With Deleted Sub Object' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) - self.ExpandedMethodWithDeletedSubObject('SQLDict') - - def test_58_ExpandedMethodWithDeletedObject(self, quiet=0, run=run_all_test): - # Test if after_method_id can be used - if not run: return - if not quiet: - message = '\nTry Expanded Method With Deleted Object' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) - self.ExpandedMethodWithDeletedObject('SQLDict') - def test_59_TryAfterTagWithSQLDict(self, quiet=0, run=run_all_test): # Test if after_tag can be used if not run: return @@ -1162,8 +1088,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): finished = 1 for message in activity_tool.getMessageList(): - if message.processing_node not in (INVOKE_ERROR_STATE, - VALIDATE_ERROR_STATE): + if message.processing_node != INVOKE_ERROR_STATE: finished = 0 activity_tool.timeShift(3 * VALIDATION_ERROR_DELAY) @@ -1969,38 +1894,33 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): delattr(Organisation, 'checkMarkerValue') def TryUserNotificationOnActivityFailure(self, activity): + message_list = self.portal.MailHost._message_list + del message_list[:] + obj = self.portal.organisation_module.newContent(portal_type='Organisation') self.tic() - obj = self.getPortal().organisation_module.newContent(portal_type='Organisation') - self.tic() - # Use a mutable variable to be able to modify the same instance from - # monkeypatch method. - notification_done = [] - from Products.CMFActivity.ActivityTool import Message - def fake_notifyUser(self, *args, **kw): - notification_done.append(True) - original_notifyUser = Message.notifyUser - def failingMethod(self): - raise ValueError, 'This method always fail' - Message.notifyUser = fake_notifyUser + def failingMethod(self): raise ValueError('This method always fails') Organisation.failingMethod = failingMethod try: # MESSAGE_NOT_EXECUTED obj.activate(activity=activity).failingMethod() self.commit() - self.assertEqual(len(notification_done), 0) + self.assertFalse(message_list) self.flushAllActivities(silent=1, loop_size=100) - self.assertEqual(len(notification_done), 1) + # Check there is a traceback in the email notification + sender, recipients, mail = message_list.pop() + self.assertTrue("Module %s, line %s, in failingMethod" % ( + __name__, inspect.getsourcelines(failingMethod)[1]) in mail, mail) + self.assertTrue("ValueError:" in mail, mail) # MESSAGE_NOT_EXECUTABLE obj.getParentValue()._delObject(obj.getId()) - obj.activate(activity=activity).getId() + obj.activate(activity=activity).failingMethod() self.commit() - self.assertEqual(len(notification_done), 1) - self.flushAllActivities(silent=1, loop_size=100) - self.assertEqual(len(notification_done), 2) + self.assertTrue(obj.hasActivity()) + self.tic() + self.assertFalse(obj.hasActivity()) + self.assertFalse(message_list) finally: - Message.notifyUser = original_notifyUser - delattr(Organisation, 'failingMethod') - + del Organisation.failingMethod def test_90_userNotificationOnActivityFailureWithSQLDict(self, quiet=0, run=run_all_test): """ @@ -2579,89 +2499,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): finally: delattr(Organisation, 'checkAbsoluteUrl') - def CheckMissingActivityContextObject(self, activity): - """ - Check that a message whose context has ben deleted goes to -3 - processing_node. - This must happen on first message execution, without any delay. - """ - activity_tool = self.getActivityTool() - container = self.getPortal().organisation_module - organisation = container.newContent(portal_type='Organisation') - self.tic() - organisation.activate(activity=activity).getTitle() - self.commit() - self.assertEqual(len(activity_tool.getMessageList()), 1) - # Here, we delete the subobject using most low-level method, to avoid - # pending activity to be removed. - organisation_id = organisation.id - container._delOb(organisation_id) - del organisation # Avoid keeping a reference to a deleted object. - self.commit() - self.assertEqual(getattr(container, organisation_id, None), None) - self.assertEqual(len(activity_tool.getMessageList()), 1) - activity_tool.distribute() - self.assertEqual([], activity_tool.getMessageList(activity=activity, - processing_node=-3)) - activity_tool.tic() - self.assertEqual(1, len(activity_tool.getMessageList(activity=activity, - processing_node=-3))) - - def test_109_checkMissingActivityContextObjectSQLDict(self, quiet=0, - run=run_all_test): - if not run: return - if not quiet: - message = '\nCheck missing activity context object (SQLDict)' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) - self.CheckMissingActivityContextObject('SQLDict') - - def test_110_checkMissingActivityContextObjectSQLQueue(self, quiet=0, - run=run_all_test): - if not run: return - if not quiet: - message = '\nCheck missing activity context object (SQLQueue)' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) - self.CheckMissingActivityContextObject('SQLQueue') - - def test_111_checkMissingActivityContextObjectSQLDict(self, quiet=0, - run=run_all_test): - """ - This is similar to tst 108, but here the object will be missing for an - activity with a group_method_id. - """ - if not run: return - if not quiet: - message = '\nCheck missing activity context object with ' \ - 'group_method_id (SQLDict)' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) - activity_tool = self.getActivityTool() - container = self.getPortalObject().organisation_module - organisation = container.newContent(portal_type='Organisation') - organisation_2 = container.newContent(portal_type='Organisation') - self.tic() - organisation.reindexObject() - organisation_2.reindexObject() - self.commit() - self.assertEqual(len(activity_tool.getMessageList()), 2) - # Here, we delete the subobject using most low-level method, to avoid - # pending activity to be removed. - organisation_id = organisation.id - container._delOb(organisation_id) - del organisation # Avoid keeping a reference to a deleted object. - self.commit() - self.assertEqual(getattr(container, organisation_id, None), None) - self.assertEqual(len(activity_tool.getMessageList()), 2) - activity_tool.distribute() - self.assertEqual([], activity_tool.getMessageList(activity="SQLDict", - processing_node=-3)) - activity_tool.tic() - message, = activity_tool.getMessageList() - # The message excuted on "organisation_2" must have succeeded. - self.assertEqual(message.processing_node, -3) - def CheckLocalizerWorks(self, activity): FROM_STRING = 'Foo' TO_STRING = 'Bar' @@ -2712,69 +2549,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): LOG('Testing... ',0,message) self.CheckLocalizerWorks('SQLDict') - def testMessageContainsFailureTraceback(self, quiet=0, run=run_all_test): - if not run: return - if not quiet: - message = '\nCheck message contains failure traceback' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) - portal = self.getPortalObject() - activity_tool = self.getActivityTool() - def checkMessage(message, exception_type): - self.assertNotEqual(message.getExecutionState(), 1) # 1 == MESSAGE_EXECUTED - self.assertEqual(message.exc_type, exception_type) - self.assertNotEqual(message.traceback, None) - # With Message.__call__ - # 1: activity context does not exist when activity is executed - organisation = portal.organisation_module.newContent(portal_type='Organisation') - self.tic() - organisation.activate().getTitle() # This generates the mssage we want to test. - self.commit() - message_list = activity_tool.getMessageList() - self.assertEqual(len(message_list), 1) - message = message_list[0] - portal.organisation_module._delOb(organisation.id) - message(activity_tool) - checkMessage(message, KeyError) - activity_tool.manageCancel(message.object_path, message.method_id) - # 2: activity method does not exist when activity is executed - portal.organisation_module.activate().this_method_does_not_exist() - self.commit() - message_list = activity_tool.getMessageList() - self.assertEqual(len(message_list), 1) - message = message_list[0] - message(activity_tool) - checkMessage(message, AttributeError) - activity_tool.manageCancel(message.object_path, message.method_id) - - # With ActivityTool.invokeGroup - # 1: activity context does not exist when activity is executed - organisation = portal.organisation_module.newContent(portal_type='Organisation') - self.tic() - organisation.activate().getTitle() # This generates the mssage we want to test. - self.commit() - message_list = activity_tool.getMessageList() - self.assertEqual(len(message_list), 1) - message = message_list[0] - portal.organisation_module._delOb(organisation.id) - activity_tool.invokeGroup('getTitle', [message], 'SQLDict', True) - checkMessage(message, KeyError) - activity_tool.manageCancel(message.object_path, message.method_id) - # 2: activity method does not exist when activity is executed - portal.organisation_module.activate().this_method_does_not_exist() - self.commit() - message_list = activity_tool.getMessageList() - self.assertEqual(len(message_list), 1) - message = message_list[0] - activity_tool.invokeGroup('this_method_does_not_exist', - [message], 'SQLDict', True) - checkMessage(message, KeyError) - activity_tool.manageCancel(message.object_path, message.method_id) - - # Unadressed error paths (in both cases): - # 3: activity commit raises - # 4: activity raises - def test_114_checkSQLQueueActivitySucceedsAfterActivityChangingSkin(self, quiet=0, run=run_all_test): if not run: return @@ -3620,6 +3394,41 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): self.tic() test() + def test_MessageNonExecutable(self): + message_list = self.portal.MailHost._message_list + del message_list[:] + activity_tool = self.portal.portal_activities + kw = {} + self._catch_log_errors(subsystem='CMFActivity') + try: + for kw['activity'] in 'SQLDict', 'SQLQueue': + for kw['group_method_id'] in '', None: + obj = activity_tool.newActiveProcess() + self.tic() + obj.activate(**kw).getId() + activity_tool._delOb(obj.getId()) + obj = activity_tool.newActiveProcess(id=obj.getId(), + is_indexable=False) + self.commit() + self.assertEqual(1, activity_tool.countMessage()) + self.flushAllActivities() + sender, recipients, mail = message_list.pop() + self.assertTrue('OID mismatch' in mail, mail) + m, = activity_tool.getMessageList() + self.assertEqual(m.processing_node, INVOKE_ERROR_STATE) + obj.flushActivity() + obj.activate(**kw).getId() + activity_tool._delOb(obj.getId()) + self.commit() + self.assertEqual(1, activity_tool.countMessage()) + activity_tool.tic() + self.assertTrue('no object found' in self.logged.pop().getMessage()) + finally: + self._ignore_log_errors() + self.assertFalse(self.logged) + self.assertFalse(message_list, message_list) + + def test_suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(TestCMFActivity))