Commit 8f1d2a95 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: merge SQLDict.flush & SQLQueue.flush into SQLBase

parent faadbbfa
......@@ -38,7 +38,8 @@ from Products.CMFActivity.ActiveObject import (
INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable)
from Queue import Queue, VALIDATION_ERROR_DELAY
from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH
from Products.CMFActivity.Errors import ActivityFlushError
def sort_message_key(message):
# same sort key as in SQLBase.getMessageList
......@@ -334,7 +335,7 @@ class SQLBase(Queue):
if group_method_id not in (None, ""):
method = activity_tool.invokeGroup
args = (group_method_id, message_list, self.__class__.__name__,
self.merge_duplicate)
hasattr(self, 'generateMessageUID'))
activity_runtime_environment = ActivityRuntimeEnvironment(None)
else:
method = activity_tool.invoke
......@@ -504,3 +505,47 @@ class SQLBase(Queue):
# Notification failures must not cause this method to raise.
self._log(WARNING,
'Exception during notification phase of finalizeMessageExecution')
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
"""
object_path is a tuple
"""
path = '/'.join(object_path)
if invoke:
invoked = set()
def invoke(message):
try:
key = self.generateMessageUID(message)
if key in invoked:
return
invoked.add(key)
except AttributeError:
pass
line = getattr(message, 'line', None)
validate_value = VALID if line and line.processing_node != -1 else \
message.validate(self, activity_tool)
if validate_value == VALID:
# Try to invoke the message - what happens if invoke calls flushActivity ??
activity_tool.invoke(message)
if message.getExecutionState() != MESSAGE_EXECUTED:
raise ActivityFlushError('Could not invoke %s on %s'
% (message.method_id, path))
elif validate_value is INVALID_PATH:
raise ActivityFlushError('The document %s does not exist' % path)
else:
raise ActivityFlushError('Could not validate %s on %s'
% (message.method_id, path))
for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (
method_id is None or method_id == m.method_id):
if invoke:
invoke(m)
activity_tool.unregisterMessage(self, m)
uid_list = []
for line in self._getMessageList(activity_tool, path=path, processing=0,
**({'method_id': method_id} if method_id else {})):
uid_list.append(line.uid)
if invoke:
invoke(self.loadMessage(line.message, uid=line.uid, line=line))
if uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list)
......@@ -27,8 +27,6 @@
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from Queue import VALID, INVALID_PATH
from Products.CMFActivity.Errors import ActivityFlushError
import sys
#from time import time
from SQLBase import SQLBase, sort_message_key
......@@ -51,7 +49,6 @@ class SQLDict(SQLBase):
because use of OOBTree.
"""
sql_table = 'message'
merge_duplicate = True
# Transaction commit methods
def prepareQueueMessageList(self, activity_tool, message_list):
......@@ -204,77 +201,6 @@ class SQLDict(SQLBase):
return result[0].message_count > 0
return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
"""
object_path is a tuple
"""
path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id)))
method_dict = {}
readUidList = getattr(activity_tool, 'SQLDict_readUidList', None)
if readUidList is not None:
# Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self):
if m.object_path == object_path and (method_id is None or method_id == m.method_id):
#if not method_dict.has_key(method_id or m.method_id):
if not method_dict.has_key(m.method_id):
method_dict[m.method_id] = 1 # Prevents calling invoke twice
if invoke:
# First Validate
validate_value = m.validate(self, activity_tool)
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if m.getExecutionState() != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path))
elif validate_value is INVALID_PATH:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
else:
raise ActivityFlushError, (
'Could not validate %s on %s' % (m.method_id , path))
activity_tool.unregisterMessage(self, m)
# Parse each message in SQL dict
result = self._getMessageList(activity_tool, processing=0, path=path,
**({'method_id': method_id} if method_id else {}))
for line in result:
path = line.path
line_method_id = line.method_id
if not method_dict.has_key(line_method_id):
# Only invoke once (it would be different for a queue)
# This is optimisation with the goal to process objects on the same
# node and minimize network traffic with ZEO server
method_dict[line_method_id] = 1
m = self.loadMessage(line.message, uid=line.uid, line=line)
if invoke:
# First Validate (only if message is marked as new)
if line.processing_node == -1:
validate_value = m.validate(self, activity_tool)
else:
validate_value = VALID
# LOG('SQLDict.flush validate_value',0,validate_value)
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if m.getExecutionState() != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path))
elif validate_value is INVALID_PATH:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
else:
raise ActivityFlushError, (
'Could not validate %s on %s' % (m.method_id , path))
if result:
uid_list = readUidList(path=path, method_id=method_id)
if uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=[x.uid for x in uid_list])
def dumpMessageList(self, activity_tool):
# Dump all messages in the table.
message_list = []
......
......@@ -27,8 +27,6 @@
##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
from Queue import VALID, INVALID_PATH
from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError
from SQLBase import SQLBase, sort_message_key
from zExceptions import ExceptionFormatter
......@@ -51,7 +49,6 @@ class SQLQueue(SQLBase):
because use of OOBTree.
"""
sql_table = 'message_queue'
merge_duplicate = False
def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered]
......@@ -100,63 +97,6 @@ class SQLQueue(SQLBase):
return result[0].message_count > 0
return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
"""
object_path is a tuple
"""
delMessage = getattr(activity_tool, 'SQLBase_delMessage', None)
if delMessage is not None:
#return # Do nothing here to precent overlocking
path = '/'.join(object_path)
# Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if invoke:
# First Validate
validate_value = m.validate(self, activity_tool)
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if m.getExecutionState() != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path))
elif validate_value is INVALID_PATH:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
else:
raise ActivityFlushError, (
'Could not validate %s on %s' % (m.method_id , path))
activity_tool.unregisterMessage(self, m)
# Parse each message in SQL queue
result = self._getMessageList(activity_tool, processing=0, path=path,
**({'method_id': method_id} if method_id else {}))
for line in result:
path = line.path
method_id = line.method_id
m = self.loadMessage(line.message, uid=line.uid, line=line)
if invoke:
# First Validate (only if message is marked as new)
if line.processing_node == -1:
validate_value = m.validate(self, activity_tool)
else:
validate_value = VALID
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if m.getExecutionState() != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
elif validate_value is INVALID_PATH:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
else:
raise ActivityFlushError, (
'Could not validate %s on %s' % (m.method_id , path))
if result:
delMessage(table=self.sql_table, uid=[line.uid for line in result])
def countMessage(self, activity_tool, tag=None, path=None,
method_id=None, message_uid=None, **kw):
"""Return the number of messages which match the given parameters.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment