diff --git a/product/CMFActivity/Activity/SQLBase.py b/product/CMFActivity/Activity/SQLBase.py index afb3eeee0cd6ac7ec180eafebb6eb02b97b8779c..7e9ea825ad0dbf3bd73f23eef7bbdbd732e8b287 100644 --- a/product/CMFActivity/Activity/SQLBase.py +++ b/product/CMFActivity/Activity/SQLBase.py @@ -35,8 +35,6 @@ from Products.CMFActivity.ActiveObject import ( INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE) from Queue import VALIDATION_ERROR_DELAY -MAX_RETRY = 5 - class SQLBase: """ @@ -166,16 +164,24 @@ class SQLBase: # please, remove the "type(m.exc_type) is type(ConflictError)" check # and leave only the "issubclass(m.exc_type, ConflictError)" check. if type(m.exc_type) is type(ConflictError) and \ - issubclass(m.exc_type, ConflictError): + m.conflict_retry and issubclass(m.exc_type, ConflictError): delay_uid_list.append(uid) else: + max_retry = m.max_retry retry = m.line.retry - if retry >= MAX_RETRY: + if max_retry is not None and retry >= max_retry: + # Always notify when we stop retrying. notify_user_list.append(m) final_error_uid_list.append(uid) continue - # By default, make delay quadratic to the number of retries. - delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) / 2 + # In case of infinite retry, notify the user + # when the default limit is reached. + if max_retry is None and retry == m.__class__.max_retry: + notify_user_list.append(m) + delay = m.delay + if delay is None: + # By default, make delay quadratic to the number of retries. + delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) / 2 try: # Immediately update, because values different for every message activity_tool.SQLBase_reactivate(table=self.sql_table, diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index b0e98e9da03a756c2688803a24764d31cd0b8959..1093f47e5f2eb9850fdf04a44744acb267390fd8 100644 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -36,7 +36,8 @@ import sys from types import ClassType #from time import time from SQLBase import SQLBase -from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, updateActivityRuntimeValue, clearActivityRuntimeEnvironment +from Products.CMFActivity.ActivityRuntimeEnvironment import ( + ActivityRuntimeEnvironment, getTransactionalVariable) from zExceptions import ExceptionFormatter try: @@ -315,19 +316,15 @@ class SQLDict(RAMDict, SQLBase): # Remove group_id parameter from group_method_id if group_method_id is not None: group_method_id = group_method_id.split('\0')[0] - clearActivityRuntimeEnvironment() if group_method_id not in (None, ""): - setActivityRuntimeValue('group_method_id', group_method_id) method = activity_tool.invokeGroup args = (group_method_id, message_list) + activity_runtime_environment = ActivityRuntimeEnvironment(None) else: method = activity_tool.invoke message = message_list[0] args = (message, ) - updateActivityRuntimeValue({'activity_kw': message.activity_kw, - 'priority': message.line.priority, - 'uid': message.uid}) - setActivityRuntimeValue('processing_node', processing_node) + activity_runtime_environment = ActivityRuntimeEnvironment(message) # Commit right before executing messages. # As MySQL transaction does not start exactly at the same time as ZODB # transactions but a bit later, messages available might be called @@ -336,6 +333,8 @@ class SQLDict(RAMDict, SQLBase): # So all connectors must be committed now that we have selected # everything needed from MySQL to get a fresh view of ZODB objects. get_transaction().commit() + tv = getTransactionalVariable(None) + tv['activity_runtime_environment'] = activity_runtime_environment # Try to invoke try: method(*args) diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index 5965b771004e77781261c9f7668ee307ffe100c3..f49a5e29e3c41718b1e2773a8cd830ad671bb75c 100644 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -37,7 +37,8 @@ import sys from time import time from sets import ImmutableSet from SQLBase import SQLBase -from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, updateActivityRuntimeValue, clearActivityRuntimeEnvironment +from Products.CMFActivity.ActivityRuntimeEnvironment import ( + ActivityRuntimeEnvironment, getTransactionalVariable) from zExceptions import ExceptionFormatter try: @@ -214,13 +215,10 @@ class SQLQueue(RAMQueue, SQLBase): # So all connectors must be committed now that we have selected # everything needed from MySQL to get a fresh view of ZODB objects. get_transaction().commit() + tv = getTransactionalVariable(None) for m in message_list: + tv['activity_runtime_environment'] = ActivityRuntimeEnvironment(m) processed_count += 1 - clearActivityRuntimeEnvironment() - updateActivityRuntimeValue({'processing_node': processing_node, - 'activity_kw': m.activity_kw, - 'priority': m.line.priority, - 'uid': m.uid}) # Try to invoke try: activity_tool.invoke(m) diff --git a/product/CMFActivity/ActivityRuntimeEnvironment.py b/product/CMFActivity/ActivityRuntimeEnvironment.py index 13922e5847e7e509dbec7f0e5d4650ef60f55792..93a60a825ad2521bdb1c93f9566918b3a0544972 100644 --- a/product/CMFActivity/ActivityRuntimeEnvironment.py +++ b/product/CMFActivity/ActivityRuntimeEnvironment.py @@ -1,33 +1,35 @@ -import threading -import copy - -activity_runtime_environment_container = threading.local() +from Products.ERP5Type.TransactionalVariable import getTransactionalVariable def getActivityRuntimeEnvironment(): """ - Raises AttributeError if called outside activity. + Raises KeyError if called outside activity. """ - return copy.deepcopy(activity_runtime_environment_container.current) + return getTransactionalVariable(None)['activity_runtime_environment'] def _getActivityRuntimeEnvironment(): - current = getattr(activity_runtime_environment_container, 'current', None) - if current is None: - current = activity_runtime_environment_container.current = {} - return current + try: + return getActivityRuntimeEnvironment() + except KeyError: + return -def setActivityRuntimeValue(key, value): - """ - TODO: protect against unauthorized use ? - """ - _getActivityRuntimeEnvironment()[key] = value -def updateActivityRuntimeValue(new_dict): - """ - TODO: protect against unauthorized use ? - """ - _getActivityRuntimeEnvironment().update(new_dict) +class BaseMessage: + + delay = None + # None means infinite retry + max_retry = 5 + # For errors happening after message invocation (ConflictError), + # should we retry quickly without increasing 'retry' count ? + conflict_retry = True + + +class ActivityRuntimeEnvironment(object): -def clearActivityRuntimeEnvironment(): - if getattr(activity_runtime_environment_container, 'current', None) is not None: - delattr(activity_runtime_environment_container, 'current') + def __init__(self, message): + self._message = message + def edit(self, **kw): + # There is no point allowing to modify other attributes from a message + for k in kw: + getattr(BaseMessage, k) + self._message.__dict__.update(kw) diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index ae9f5e6d4d2627163b7ebd0e5043c45c0fad4db9..cdb7c8ede4b93fe4486e6bf246be99500c9cc4c4 100644 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -48,6 +48,7 @@ from Products.ERP5Type.Globals import InitializeClass, DTMLFile from Acquisition import aq_base from Acquisition import aq_inner from ActivityBuffer import ActivityBuffer +from ActivityRuntimeEnvironment import BaseMessage from zExceptions import ExceptionFormatter from BTrees.OIBTree import OIBTree @@ -153,7 +154,8 @@ MESSAGE_NOT_EXECUTED = 0 MESSAGE_EXECUTED = 1 MESSAGE_NOT_EXECUTABLE = 2 -class Message: + +class Message(BaseMessage): """Activity Message Class. Message instances are stored in an activity queue, inside the Activity Tool. diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py index a3eea7f7dcf221f8bd2d77887c5947c07675f442..7d6b05b5b01e9fc22dfd100791d3ffc1d22ae1f5 100644 --- a/product/CMFActivity/tests/testCMFActivity.py +++ b/product/CMFActivity/tests/testCMFActivity.py @@ -32,6 +32,7 @@ import unittest from Testing import ZopeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.utils import DummyMailHost +from Products.ERP5Type.TransactionalVariable import getTransactionalVariable from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE,\ VALIDATE_ERROR_STATE from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY @@ -49,7 +50,6 @@ from DateTime import DateTime import cPickle as pickle from Products.CMFActivity.ActivityTool import Message import random -from Products.CMFActivity.ActivityRuntimeEnvironment import setActivityRuntimeValue, clearActivityRuntimeEnvironment import threading try: @@ -2802,44 +2802,32 @@ class TestCMFActivity(ERP5TypeTestCase): delattr(Organisation, 'mustRunAfter') def CheckActivityRuntimeEnvironment(self, activity): - organisation = self.getPortal().organisation_module.newContent(portal_type='Organisation') - get_transaction().commit() - self.tic() - activity_tool = self.getActivityTool() - check_result_dict = {} - initial_list_check_value = [1, 2] + document = self.portal.organisation_module + activity_result = [] def extractActivityRuntimeEnvironment(self): - setActivityRuntimeValue('list_check', initial_list_check_value) - environment = self.getActivityRuntimeEnvironment() - check_result_dict['environment'] = environment - def runAndCheck(): - check_result_dict.clear() - self.assertFalse('environment' in check_result_dict) - get_transaction().commit() - self.tic() - self.assertTrue('environment' in check_result_dict) - Organisation.extractActivityRuntimeEnvironment = extractActivityRuntimeEnvironment + activity_result.append(self.getActivityRuntimeEnvironment()) + document.__class__.doSomething = extractActivityRuntimeEnvironment try: - # Check that organisation.getActivityRuntimeEnvironment raises outside - # of activities. - clearActivityRuntimeEnvironment() - #organisation.getActivityRuntimeEnvironment() - self.assertRaises(AttributeError, organisation.getActivityRuntimeEnvironment) + document.activate(activity=activity).doSomething() + get_transaction().commit() + # Check that getActivityRuntimeEnvironment raises outside of activities + self.assertRaises(KeyError, document.getActivityRuntimeEnvironment) # Check Runtime isolation - setActivityRuntimeValue('blah', True) - organisation.activate(activity=activity).extractActivityRuntimeEnvironment() - runAndCheck() - self.assertEqual(check_result_dict['environment'].get('blah'), None) - # Check Runtime presence - self.assertTrue(len(check_result_dict['environment']) > 0) - self.assertTrue('processing_node' in check_result_dict['environment']) - # Check Runtime does a deepcopy - self.assertTrue('list_check' in check_result_dict['environment']) - check_result_dict['environment']['list_check'].append(3) - self.assertTrue(check_result_dict['environment']['list_check'] != \ - initial_list_check_value) + self.tic() + # Check that it still raises outside of activities + self.assertRaises(KeyError, document.getActivityRuntimeEnvironment) + # Check activity runtime environment instance + env = activity_result.pop() + self.assertFalse(activity_result) + message = env._message + self.assertEqual(message.line.priority, 1) + self.assertEqual(message.object_path, document.getPhysicalPath()) + self.assertTrue(message.conflict_retry) # default value + env.edit(max_retry=0, conflict_retry=False) + self.assertFalse(message.conflict_retry) # edited value + self.assertRaises(AttributeError, env.edit, foo='bar') finally: - delattr(Organisation, 'extractActivityRuntimeEnvironment') + del document.__class__.doSomething def test_104_activityRuntimeEnvironmentSQLDict(self, quiet=0, run=run_all_test): if not run: return