Commit e47f2923 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: remove non-executable message state (-3)

When an object is deleted, higher level code used to flush its messages (without
invoking them). However, a concurrent and very long transaction may be about to
activate such an object, without conflict. We already experienced false -3
errors that could prevent other messages to be validated.

Because there is no efficient and reliable way to flush absolutely all messages,
messages on deleted objects are now ignored and deleted without any email
notification. There's only a WARNING in logs. But for performance reasons,
there's still a flush on object deletion.

To simplify code, messages that went to -3 for other reasons, like a
non-existing method, now go to -2. In fact, this was already the case for
grouped messages.

In case that a path is recycled, it may still be possible for a message to be
executed on a wrong object (the new one), instead of being ignored (because the
activated object was deleted). So in such scenario, developer should make sure
not to delete an object that may be activated in a concurrent transaction.
If the original object has an OID at the moment it is activated, an assertion
will make sure the message is not executed on another object.
parent fcce7b97
No related merge requests found
......@@ -41,10 +41,6 @@ DEFAULT_ACTIVITY = 'SQLDict'
# Processing node are used to store processing state or processing node
DISTRIBUTABLE_STATE = -1
INVOKE_ERROR_STATE = -2
VALIDATE_ERROR_STATE = -3
STOP_STATE = -4
# Special state which allows to select positive nodes
POSITIVE_NODE_STATE = 'Positive Node State'
_DEFAULT_ACTIVATE_PARAMETER_KEY = 'default_activate_parameter'
......@@ -145,12 +141,6 @@ class ActiveObject(ExtensionClass.Base):
"""
return self.hasActivity(processing_node = INVOKE_ERROR_STATE)
security.declareProtected( permissions.View, 'hasInvalidActivity' )
def hasInvalidActivity(self, **kw):
"""Tells if there is invalied activities for this object.
"""
return self.hasActivity(processing_node = VALIDATE_ERROR_STATE)
def getActiveProcess(self):
path = getActivityRuntimeEnvironment()._message.active_process
if path:
......
......@@ -33,8 +33,7 @@ from Products.ERP5Type.Base import Base
from Products.ERP5Type import PropertySheet
from Products.ERP5Type.ConflictFree import ConflictFreeLog
from BTrees.Length import Length
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, \
VALIDATE_ERROR_STATE
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE
from random import randrange
from .ActiveResult import ActiveResult
......@@ -150,13 +149,6 @@ class ActiveProcess(Base):
"""
return self.hasActivity(processing_node = INVOKE_ERROR_STATE)
security.declareProtected( CMFCorePermissions.View, 'hasInvalidActivity' )
def hasInvalidActivity(self, **kw):
"""
Tells if an object if active
"""
return self.hasActivity(processing_node = VALIDATE_ERROR_STATE)
def getCreationDate(self):
"""
Define a Creation Date for an active process
......
......@@ -34,8 +34,7 @@ from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityTool import (
MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
from Products.CMFActivity.ActiveObject import (
INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE
from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable)
from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH
......@@ -278,7 +277,7 @@ class SQLBase(Queue):
# Count the number of objects to prevent too many objects.
cost = m.activity_kw.get('group_method_cost', .01)
assert 0 < cost <= 1, (self.sql_table, uid)
count = len(m.getObjectList(activity_tool))
count = m.getObjectCount(activity_tool)
# this is heuristic (messages with same group_method_id
# are likely to have the same group_method_cost)
limit = int(1. / cost + 1 - count)
......@@ -294,7 +293,7 @@ class SQLBase(Queue):
uid_to_duplicate_uid_list_dict[uid] += uid_list
continue
uid_to_duplicate_uid_list_dict[uid] = uid_list
cost += len(m.getObjectList(activity_tool)) * \
cost += m.getObjectCount(activity_tool) * \
m.activity_kw.get('group_method_cost', .01)
message_list.append(m)
if cost >= 1:
......@@ -415,7 +414,6 @@ class SQLBase(Queue):
final_error_uid_list = []
make_available_uid_list = []
notify_user_list = []
non_executable_message_list = []
executed_uid_list = deletable_uid_list
if uid_to_duplicate_uid_list_dict is not None:
for m in message_list:
......@@ -461,12 +459,14 @@ class SQLBase(Queue):
except:
self._log(WARNING, 'Failed to reactivate %r' % uid)
make_available_uid_list.append(uid)
else:
# Internal CMFActivity error: the message can not be executed because
# something is missing (context object cannot be found, method cannot
# be accessed on object).
non_executable_message_list.append(uid)
notify_user_list.append((m, False))
else: # MESSAGE_NOT_EXECUTABLE
# 'path' does not point to any object. Activities are normally flushed
# (without invoking them) when an object is deleted, but this is only
# an optimisation. There is no efficient and reliable way to do such
# this, because a concurrent and very long transaction may be about to
# activate this object, without conflict.
# So we have to clean up any remaining activity.
deletable_uid_list.append(uid)
if deletable_uid_list:
try:
self._retryOnLockError(activity_tool.SQLBase_delMessage,
......@@ -490,13 +490,6 @@ class SQLBase(Queue):
except:
self._log(ERROR, 'Failed to set message to error state for %r'
% final_error_uid_list)
if non_executable_message_list:
try:
activity_tool.SQLBase_assignMessage(table=self.sql_table,
uid=non_executable_message_list, processing_node=VALIDATE_ERROR_STATE)
except:
self._log(ERROR, 'Failed to set message to invalid path state for %r'
% non_executable_message_list)
if make_available_uid_list:
try:
self.makeMessageListAvailable(activity_tool=activity_tool,
......
......@@ -169,6 +169,7 @@ class Message(BaseMessage):
is_executed = MESSAGE_NOT_EXECUTED
processing = None
traceback = None
oid = None
def __init__(self, obj, active_process, activity_kw, method_id, args, kw):
if isinstance(obj, str):
......@@ -177,6 +178,12 @@ class Message(BaseMessage):
else:
self.object_path = obj.getPhysicalPath()
activity_creation_trace = obj.getPortalObject().portal_activities.activity_creation_trace
try:
self.oid = aq_base(obj)._p_oid
# Note that it's too early to get the OID of a newly created object,
# so at this point, self.oid may still be None.
except AttributeError:
pass
if active_process is not None:
self.active_process = active_process.getPhysicalPath()
self.active_process_uid = active_process.getUid()
......@@ -216,29 +223,38 @@ class Message(BaseMessage):
def getObject(self, activity_tool):
"""return the object referenced in this message."""
return activity_tool.unrestrictedTraverse(self.object_path)
def getObjectList(self, activity_tool):
"""return the list of object that can be expanded from this message."""
object_list = []
try:
object_list.append(self.getObject(activity_tool))
obj = activity_tool.unrestrictedTraverse(self.object_path)
except KeyError:
pass
LOG('CMFActivity', WARNING, "Message dropped (no object found at path %r)"
% (self.object_path,), error=sys.exc_info())
self.setExecutionState(MESSAGE_NOT_EXECUTABLE)
else:
if self.hasExpandMethod():
expand_method_id = self.activity_kw['expand_method_id']
# FIXME: how to pass parameters?
object_list = getattr(object_list[0], expand_method_id)()
return object_list
if self.oid and self.oid != getattr(aq_base(obj), '_p_oid', None):
raise ValueError("OID mismatch for %r" % obj)
return obj
def hasExpandMethod(self):
"""return true if the message has an expand method.
def getObjectList(self, activity_tool):
"""return the list of object that can be expanded from this message
An expand method is used to expand the list of objects and to turn a
big recursive transaction affecting many objects into multiple
transactions affecting only one object at a time (this can prevent
duplicated method calls)."""
return self.activity_kw.has_key('expand_method_id')
obj = self.getObject(activity_tool)
if obj is None:
return ()
if 'expand_method_id' in self.activity_kw:
return getattr(obj, self.activity_kw['expand_method_id'])()
return obj,
def getObjectCount(self, activity_tool):
if 'expand_method_id' in self.activity_kw:
try:
obj = activity_tool.unrestrictedTraverse(self.object_path)
return len(getattr(obj, self.activity_kw['expand_method_id'])())
except StandardError:
pass
return 1
def changeUser(self, user_name, activity_tool):
"""restore the security context for the calling user."""
......@@ -280,33 +296,15 @@ class Message(BaseMessage):
def __call__(self, activity_tool):
try:
obj = self.getObject(activity_tool)
except KeyError:
exc_info = sys.exc_info()
LOG('CMFActivity', ERROR,
'Message failed in getting an object from the path %r'
% (self.object_path,), error=exc_info)
self.setExecutionState(MESSAGE_NOT_EXECUTABLE, exc_info,
context=activity_tool)
else:
try:
if obj is not None:
old_security_manager = getSecurityManager()
try:
# Change user if required (TO BE DONE)
# We will change the user only in order to execute this method
self.changeUser(self.user_name, activity_tool)
try:
# XXX: There is no check to see if user is allowed to access
# that method !
method = getattr(obj, self.method_id)
except Exception:
exc_info = sys.exc_info()
LOG('CMFActivity', ERROR,
'Message failed in getting a method %r from an object %r'
% (self.method_id, obj), error=exc_info)
method = None
self.setExecutionState(MESSAGE_NOT_EXECUTABLE, exc_info,
context=activity_tool)
else:
# Store site info
setSite(activity_tool.getParentValue())
if activity_tool.activity_timing_log:
......@@ -338,9 +336,7 @@ class Message(BaseMessage):
email_from_name = portal.getProperty('email_from_name',
portal.getProperty('email_from_address'))
fail_count = self.line.retry + 1
if self.getExecutionState() == MESSAGE_NOT_EXECUTABLE:
message = "Not executable activity"
elif retry:
if retry:
message = "Pending activity already failed %s times" % fail_count
else:
message = "Activity failed"
......@@ -371,7 +367,7 @@ Named Parameters: %r
def reactivate(self, activity_tool, activity=DEFAULT_ACTIVITY):
# Reactivate the original object.
obj= self.getObject(activity_tool)
obj = activity_tool.unrestrictedTraverse(self.object_path)
old_security_manager = getSecurityManager()
try:
# Change user if required (TO BE DONE)
......@@ -410,7 +406,7 @@ Named Parameters: %r
"""
assert is_executed in (MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, MESSAGE_NOT_EXECUTABLE)
self.is_executed = is_executed
if is_executed != MESSAGE_EXECUTED:
if is_executed == MESSAGE_NOT_EXECUTED:
if not exc_info:
exc_info = sys.exc_info()
if self.on_error_callback is not None:
......@@ -1189,21 +1185,11 @@ class ActivityTool (Folder, UniqueObject):
# alternate method is used to segregate objects which cannot be grouped.
alternate_method_id = m.activity_kw.get('alternate_method_id')
try:
obj = m.getObject(self)
except KeyError:
exc_info = sys.exc_info()
LOG('CMFActivity', ERROR,
'Message failed in getting an object from the path %r'
% (m.object_path,), error=exc_info)
m.setExecutionState(MESSAGE_NOT_EXECUTABLE, exc_info, context=self)
object_list = m.getObjectList(self)
if object_list is None:
continue
try:
if m.hasExpandMethod():
subobject_list = m.getObjectList(self)
else:
subobject_list = (obj,)
message_dict[m] = expanded_object_list = []
for subobj in subobject_list:
for subobj in object_list:
if merge_duplicate:
path = subobj.getPath()
if path in path_set:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment