diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 583416fb1f5119a3a4f696791f1e8d0c7382f55a..b31c5ebe10497cba5d4cf73a05e3764c28138c51 100755 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -41,6 +41,7 @@ from ActivityBuffer import ActivityBuffer from AccessControl.SecurityManagement import newSecurityManager import threading import sys +from ZODB.POSException import ConflictError from zLOG import LOG @@ -261,28 +262,33 @@ class ActivityTool (Folder, UniqueObject): active_threads += 1 tic_lock.release() - # Wakeup each queue - for activity in activity_list: - try: - activity.wakeup(self, processing_node) - except: - LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity) - - # Process messages on each queue in round robin - has_awake_activity = 1 - while has_awake_activity: - has_awake_activity = 0 + try: + # Wakeup each queue for activity in activity_list: try: - activity.tic(self, processing_node) # Transaction processing is the responsability of the activity - has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node) + activity.wakeup(self, processing_node) + except ConflictError: + raise except: - LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity, error=sys.exc_info()) - - # decrease the number of active_threads - tic_lock.acquire() - active_threads -= 1 - tic_lock.release() + LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity) + + # Process messages on each queue in round robin + has_awake_activity = 1 + while has_awake_activity: + has_awake_activity = 0 + for activity in activity_list: + try: + activity.tic(self, processing_node) # Transaction processing is the responsability of the activity + has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node) + except ConflictError: + raise + except: + LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity, error=sys.exc_info()) + finally: + # decrease the number of active_threads + tic_lock.acquire() + active_threads -= 1 + tic_lock.release() def hasActivity(self, *args, **kw): # Check in each queue if the object has deferred tasks