Commit fcacb518 authored by Jean-Paul Smets's avatar Jean-Paul Smets

based on SQLDict - first tests


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@366 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 2be90dd3
...@@ -26,12 +26,29 @@ ...@@ -26,12 +26,29 @@
# #
############################################################################## ##############################################################################
import random
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from SQLDict import SQLDict from RAMQueue import RAMQueue
from zLOG import LOG from zLOG import LOG
class SQLQueue(SQLDict): MAX_PRIORITY = 5
DISTRIBUTABLE_STATE = -1
INVOKE_ERROR_STATE = -2
VALIDATE_ERROR_STATE = -3
priority_weight = \
[1] * 64 + \
[2] * 20 + \
[3] * 10 + \
[4] * 5 + \
[5] * 1
class ActivityFlushError(Exception):
"""Error during active message flush"""
class SQLQueue(RAMQueue):
""" """
A simple OOBTree based queue. It should be compatible with transactions A simple OOBTree based queue. It should be compatible with transactions
and provide sequentiality. Should not create conflict and provide sequentiality. Should not create conflict
...@@ -39,41 +56,83 @@ class SQLQueue(SQLDict): ...@@ -39,41 +56,83 @@ class SQLQueue(SQLDict):
""" """
def queueMessage(self, activity_tool, m): def queueMessage(self, activity_tool, m):
activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , method_id = m.method_id, message = self.dumpMessage(m)) activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) ,
method_id = m.method_id,
priority = m.activity_kw.get('priority', 1),
message = self.dumpMessage(m))
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
#activity_tool.SQLDict_lockMessage() # Too slow... priority = random.choice(priority_weight)
result = activity_tool.SQLDict_readMessage() # Try to find a message at given priority level
result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority)
if len(result) == 0:
# If empty, take any message
result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None)
if len(result) > 0: if len(result) > 0:
line = result[0] line = result[0]
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
activity_tool.SQLDict_processMessage(path=path, method_id=method_id, processing_node=1) # Make sure message can not be processed anylonger
#activity_tool.SQLDict_unlockMessage() # Too slow... activity_tool.SQLQueue_processMessage(uid=line.uid)
get_transaction().commit() # Release locks before starting a potentially long calculation
m = self.loadMessage(line.message) m = self.loadMessage(line.message)
if m.validate(self, activity_tool): # Make sure object exists
activity_tool.invoke(m) if not m.validate(self, activity_tool):
activity_tool.SQLDict_delMessage(message_id = 222) # We will need a message_id if line.priority > MAX_PRIORITY:
# This is an error
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state
get_transaction().commit() # and commit
else:
# Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
else:
# Try to invoke
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
if m.is_executed: # Make sure message could be invoked
activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it
get_transaction().commit() # If successful, commit
else:
get_transaction().abort() # If not, abort transaction and start a new one
if line.priority > MAX_PRIORITY:
# This is an error
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
get_transaction().commit() # and commit
else:
# Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 return 0
#activity_tool.SQLDict_unlockMessage() get_transaction().commit() # Release locks before starting a potentially long calculation
return 1 return 1
def hasActivity(self, activity_tool, object, method_id=None, **kw): def hasActivity(self, activity_tool, object, method_id=None, **kw):
my_object_path = '/'.join(object.getPhysicalPath()) my_object_path = '/'.join(object.getPhysicalPath())
result = activity_tool.SQLDict_hasMessage(path=my_object_path, method_id=method_id) result = activity_tool.SQLQueue_hasMessage(path=my_object_path, method_id=method_id)
if len(result) > 0: if len(result) > 0:
return result[0].message_count > 0 return result[0].message_count > 0
return 0 return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
""" """
object_path is a tuple object_path is a tuple
commit allows to choose mode
- if we commit, then we make sure no locks are taken for too long
- if we do not commit, then we can use flush in a larger transaction
commit should in general not be used
NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
""" """
path = '/'.join(object_path) path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id))) # LOG('Flush', 0, str((path, invoke, method_id)))
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id)
method_dict = {}
if invoke: if invoke:
result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None)
method_dict = {}
# Parse each message
for line in result: for line in result:
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
...@@ -81,16 +140,44 @@ class SQLQueue(SQLDict): ...@@ -81,16 +140,44 @@ class SQLQueue(SQLDict):
# Only invoke once (it would be different for a queue) # Only invoke once (it would be different for a queue)
method_dict[method_id] = 1 method_dict[method_id] = 1
m = self.loadMessage(line.message) m = self.loadMessage(line.message)
# First Validate
if m.validate(self, activity_tool): if m.validate(self, activity_tool):
activity_tool.invoke(m) activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
activity_tool.SQLDict_delMessage(path=path, method_id=method_id) if not m.is_executed: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
else:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
# Erase all messages in a single transaction
activity_tool.SQLQueue_delMessage(path=path, method_id=method_id) # Delete all "old" messages (not -1 processing)
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None):
message_list = [] message_list = []
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None) result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None)
for line in result: for line in result:
m = self.loadMessage(line.message) m = self.loadMessage(line.message)
m.processing_node = line.processing_node
m.priority = line.priority
message_list.append(m) message_list.append(m)
return message_list return message_list
def distribute(self, activity_tool, node_count):
processing_node = 1
result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
get_transaction().commit() # Release locks before starting a potentially long calculation
path_dict = {}
for line in result:
path = line.path
if not path_dict.has_key(path):
# Only assign once (it would be different for a queue)
path_dict[path] = 1
activity_tool.SQLQueue_assignMessage(path=path, processing_node=processing_node)
get_transaction().commit() # Release locks immediately to allow processing of messages
processing_node = processing_node + 1
if processing_node > node_count:
processing_node = 1 # Round robin
registerActivity(SQLQueue) registerActivity(SQLQueue)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment