Commit 04f3c88a authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: new 'on_error_callback' message parameter

parent 795dc233
...@@ -42,6 +42,7 @@ def sort_message_key(message): ...@@ -42,6 +42,7 @@ def sort_message_key(message):
# same sort key as in SQL{Dict,Queue}_readMessageList # same sort key as in SQL{Dict,Queue}_readMessageList
return message.line.priority, message.line.date, message.uid return message.line.priority, message.line.date, message.uid
_DequeueMessageException = Exception()
class SQLBase: class SQLBase:
""" """
...@@ -288,6 +289,15 @@ class SQLBase: ...@@ -288,6 +289,15 @@ class SQLBase:
self._log(TRACE, '(no message was reserved)') self._log(TRACE, '(no message was reserved)')
return [], 0, None, {} return [], 0, None, {}
def _abort(self):
try:
transaction.abort()
except:
# Unfortunately, database adapters may raise an exception against abort.
self._log(PANIC,
'abort failed, thus some objects may be modified accidentally')
raise
# Queue semantic # Queue semantic
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
message_list, group_method_id, uid_to_duplicate_uid_list_dict = \ message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
...@@ -320,42 +330,36 @@ class SQLBase: ...@@ -320,42 +330,36 @@ class SQLBase:
# Try to invoke # Try to invoke
try: try:
method(*args) method(*args)
# Abort if at least 1 message failed. On next tic, only those that
# succeeded will be selected because their at_date won't have been
# increased.
for m in message_list:
if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
raise _DequeueMessageException
transaction.commit()
except: except:
exc_info = sys.exc_info()
if exc_info[1] is not _DequeueMessageException:
self._log(WARNING, self._log(WARNING,
'Exception raised when invoking messages (uid, path, method_id) %r' 'Exception raised when invoking messages (uid, path, method_id) %r'
% [(m.uid, m.object_path, m.method_id) for m in message_list]) % [(m.uid, m.object_path, m.method_id) for m in message_list])
try:
transaction.abort()
except:
# Unfortunately, database adapters may raise an exception against
# abort.
self._log(PANIC,
'abort failed, thus some objects may be modified accidentally')
raise
# Abort if something failed.
if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
endTransaction = transaction.abort
else:
endTransaction = transaction.commit
try:
endTransaction()
except:
self._log(WARNING,
'Failed to end transaction for messages (uid, path, method_id) %r'
% [(m.uid, m.object_path, m.method_id) for m in message_list])
if endTransaction == transaction.abort:
self._log(PANIC, 'Failed to abort executed messages.'
' Some objects may be modified accidentally.')
else:
try:
transaction.abort()
except:
self._log(PANIC, 'Failed to abort executed messages which also'
' failed to commit. Some objects may be modified accidentally.')
raise
exc_info = sys.exc_info()
for m in message_list: for m in message_list:
m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False) m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
self._abort()
exc_info = message_list[0].exc_info
if exc_info:
try:
# Register it again.
tv['activity_runtime_environment'] = activity_runtime_environment
cancel = message.on_error_callback(*exc_info)
del exc_info, message.exc_info
transaction.commit()
if cancel:
message.setExecutionState(MESSAGE_EXECUTED)
except:
self._log(WARNING, 'Exception raised when processing error callbacks')
message.setExecutionState(MESSAGE_NOT_EXECUTED)
self._abort()
self.finalizeMessageExecution(activity_tool, message_list, self.finalizeMessageExecution(activity_tool, message_list,
uid_to_duplicate_uid_list_dict) uid_to_duplicate_uid_list_dict)
transaction.commit() transaction.commit()
......
...@@ -25,6 +25,12 @@ class BaseMessage: ...@@ -25,6 +25,12 @@ class BaseMessage:
# For errors happening after message invocation (ConflictError), # For errors happening after message invocation (ConflictError),
# should we retry quickly without increasing 'retry' count ? # should we retry quickly without increasing 'retry' count ?
conflict_retry = __property(conflict_retry=True) conflict_retry = __property(conflict_retry=True)
# Called if any error happened, after the transaction is aborted.
# The message is cancelled if a non zero value is returned.
# A transaction commit is done after it is called.
# If the callback fails, the transaction is aborted again and the
# notification contains this failure instead of the original one.
on_error_callback = __property(on_error_callback=None)
class ActivityRuntimeEnvironment(object): class ActivityRuntimeEnvironment(object):
...@@ -36,4 +42,7 @@ class ActivityRuntimeEnvironment(object): ...@@ -36,4 +42,7 @@ class ActivityRuntimeEnvironment(object):
# There is no point allowing to modify other attributes from a message # There is no point allowing to modify other attributes from a message
for k in kw: for k in kw:
getattr(BaseMessage, k) getattr(BaseMessage, k)
if k == 'on_error_callback' and \
self._message.activity_kw.get(k) is not None:
raise RuntimeError("An error callback is already registered")
self._message.activity_kw.update(kw) self._message.activity_kw.update(kw)
...@@ -161,10 +161,10 @@ class Message(BaseMessage): ...@@ -161,10 +161,10 @@ class Message(BaseMessage):
active_process = None active_process = None
active_process_uid = None active_process_uid = None
call_traceback = None call_traceback = None
exc_info = None
is_executed = MESSAGE_NOT_EXECUTED is_executed = MESSAGE_NOT_EXECUTED
processing = None processing = None
traceback = None traceback = None
exc_type = None
def __init__(self, obj, active_process, activity_kw, method_id, args, kw): def __init__(self, obj, active_process, activity_kw, method_id, args, kw):
if isinstance(obj, str): if isinstance(obj, str):
...@@ -394,6 +394,8 @@ Named Parameters: %r ...@@ -394,6 +394,8 @@ Named Parameters: %r
if is_executed != MESSAGE_EXECUTED: if is_executed != MESSAGE_EXECUTED:
if not exc_info: if not exc_info:
exc_info = sys.exc_info() exc_info = sys.exc_info()
if self.on_error_callback is not None:
self.exc_info = exc_info
self.exc_type = exc_info[0] self.exc_type = exc_info[0]
if exc_info[0] is None: if exc_info[0] is None:
# Raise a dummy exception, ignore it, fetch it and use it as if it was the error causing message non-execution. This will help identifyting the cause of this misbehaviour. # Raise a dummy exception, ignore it, fetch it and use it as if it was the error causing message non-execution. This will help identifyting the cause of this misbehaviour.
......
...@@ -48,10 +48,11 @@ from ZODB.POSException import ConflictError ...@@ -48,10 +48,11 @@ from ZODB.POSException import ConflictError
from DateTime import DateTime from DateTime import DateTime
import cPickle as pickle import cPickle as pickle
from Products.CMFActivity.ActivityTool import Message from Products.CMFActivity.ActivityTool import Message
import gc
import random import random
import threading import threading
import sys import sys
import weakref
import transaction import transaction
class CommitFailed(Exception): class CommitFailed(Exception):
...@@ -1585,7 +1586,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -1585,7 +1586,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
if edit_kw: if edit_kw:
self.getActivityRuntimeEnvironment().edit(**edit_kw) self.getActivityRuntimeEnvironment().edit(**edit_kw)
if conflict is not None: if conflict is not None:
raise conflict and ConflictError or Exception raise ConflictError if conflict else Exception
def check(retry_list, **activate_kw): def check(retry_list, **activate_kw):
fail = retry_list[-1][0] is not None and 1 or 0 fail = retry_list[-1][0] is not None and 1 or 0
for activity in 'SQLDict', 'SQLQueue': for activity in 'SQLDict', 'SQLQueue':
...@@ -3819,6 +3820,52 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -3819,6 +3820,52 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
newconn = portal.cmf_activity_sql_connection newconn = portal.cmf_activity_sql_connection
self.assertEquals(newconn.meta_type, 'CMFActivity Database Connection') self.assertEquals(newconn.meta_type, 'CMFActivity Database Connection')
def test_onErrorCallback(self):
activity_tool = self.portal.portal_activities
obj = activity_tool.newActiveProcess()
transaction.commit()
self.tic()
def _raise(exception): # I wish exceptions are callable raising themselves
raise exception
def doSomething(self, conflict_error, cancel):
self.activity_count += 1
error = ConflictError() if conflict_error else Exception()
def onError(exc_type, exc_value, traceback):
assert exc_value is error
env = self.getActivityRuntimeEnvironment()
weakref_list.extend(map(weakref.ref, (env, env._message)))
self.on_error_count += 1
return cancel
self.getActivityRuntimeEnvironment().edit(on_error_callback=onError)
if not self.on_error_count:
if not conflict_error:
raise error
transaction.get().addBeforeCommitHook(_raise, (error,))
obj.__class__.doSomething = doSomething
try:
for activity in 'SQLDict', 'SQLQueue':
for conflict_error in False, True:
weakref_list = []
obj.activity_count = obj.on_error_count = 0
obj.activate(activity=activity).doSomething(conflict_error, True)
transaction.commit()
self.tic()
self.assertEqual(obj.activity_count, 0)
self.assertEqual(obj.on_error_count, 1)
gc.collect()
self.assertEqual([x() for x in weakref_list], [None, None])
weakref_list = []
obj.activate(activity=activity).doSomething(conflict_error, False)
obj.on_error_count = 0
transaction.commit()
self.tic()
self.assertEqual(obj.activity_count, 1)
self.assertEqual(obj.on_error_count, 1)
gc.collect()
self.assertEqual([x() for x in weakref_list], [None, None])
finally:
del obj.__class__.doSomething
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity)) suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment