Commit 283f2654 authored by Sebastien Robin's avatar Sebastien Robin

updated SQLQueue in order to support more validation


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@1790 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 9f6925f4
...@@ -29,7 +29,8 @@ ...@@ -29,7 +29,8 @@
import random import random
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from RAMQueue import RAMQueue from RAMQueue import RAMQueue
from Queue import VALID from DateTime import DateTime
from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY, SECONDS_IN_DAY
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from zLOG import LOG from zLOG import LOG
...@@ -58,7 +59,8 @@ class SQLQueue(RAMQueue): ...@@ -58,7 +59,8 @@ class SQLQueue(RAMQueue):
activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) , activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) ,
method_id = m.method_id, method_id = m.method_id,
priority = m.activity_kw.get('priority', 1), priority = m.activity_kw.get('priority', 1),
message = self.dumpMessage(m)) message = self.dumpMessage(m),
date = m.activity_kw.get('at_date', DateTime()))
def prepareDeleteMessage(self, activity_tool, m): def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction # Erase all messages in a single transaction
...@@ -67,12 +69,16 @@ class SQLQueue(RAMQueue): ...@@ -67,12 +69,16 @@ class SQLQueue(RAMQueue):
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if hasattr(activity_tool,'SQLQueue_readMessageList'): if hasattr(activity_tool,'SQLQueue_readMessageList'):
now_date = DateTime()
# Next processing date in case of error
next_processing_date = now_date + VALIDATION_ERROR_DELAY
priority = random.choice(priority_weight) priority = random.choice(priority_weight)
# Try to find a message at given priority level # Try to find a message at given priority level
result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority) result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority,
to_date=now_date)
if len(result) == 0: if len(result) == 0:
# If empty, take any message # If empty, take any message
result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None) result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None,to_date=now_date)
if len(result) > 0: if len(result) > 0:
line = result[0] line = result[0]
path = line.path path = line.path
...@@ -82,16 +88,23 @@ class SQLQueue(RAMQueue): ...@@ -82,16 +88,23 @@ class SQLQueue(RAMQueue):
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
m = self.loadMessage(line.message) m = self.loadMessage(line.message)
# Make sure object exists # Make sure object exists
if m.validate(self, activity_tool) is not VALID: validation_state = m.validate(self, activity_tool)
if line.priority > MAX_PRIORITY: if validation_state is not VALID:
# This is an error if validation_state in (EXCEPTION, INVALID_PATH):
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE) if line.priority > MAX_PRIORITY:
# Assign message back to 'error' state # This is an error
#m.notifyUser(activity_tool) # Notify Error activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
get_transaction().commit() # and commit # Assign message back to 'error' state
#m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
else: else:
# Lower priority # We do not lower priority for INVALID_ORDER errors but we do postpone execution
activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
priority = line.priority)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
else: else:
# Try to invoke # Try to invoke
...@@ -109,7 +122,8 @@ class SQLQueue(RAMQueue): ...@@ -109,7 +122,8 @@ class SQLQueue(RAMQueue):
get_transaction().commit() # and commit get_transaction().commit() # and commit
else: else:
# Lower priority # Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date,
priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 return 0
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
...@@ -207,4 +221,36 @@ class SQLQueue(RAMQueue): ...@@ -207,4 +221,36 @@ class SQLQueue(RAMQueue):
if processing_node > node_count: if processing_node > node_count:
processing_node = 1 # Round robin processing_node = 1 # Round robin
# Validation private methods
def _validate_after_method_id(self, activity_tool, message, value):
# Count number of occurances of method_id
LOG('SQLQueue._validate_after_method_id, message',0,message)
LOG('SQLQueue._validate_after_method_id, value',0,value)
result = activity_tool.SQLQueue_validateMessageList(method_id=value, message_uid=None, path=None)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_path(self, activity_tool, message, value):
# Count number of occurances of path
result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, path=value)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_message_uid(self, activity_tool, message, value):
# Count number of occurances of message_uid
result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=value, path=None)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
# Required for tests (time shift)
def timeShift(self, activity_tool, delay):
"""
To simulate timeShift, we simply substract delay from
all dates in SQLDict message table
"""
activity_tool.SQLQueue_timeShift(delay = delay * SECONDS_IN_DAY)
registerActivity(SQLQueue) registerActivity(SQLQueue)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment