From 025251c6bdf6170741c517094b0b6d9dfa2622d1 Mon Sep 17 00:00:00 2001 From: Yoshinori Okuji <yo@nexedi.com> Date: Fri, 23 Jun 2006 13:06:08 +0000 Subject: [PATCH] Ensure that SQLQueue never leave messages as processed. git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@8158 20353a03-c40f-0410-a6d1-a30d3c3de9de --- product/CMFActivity/Activity/SQLQueue.py | 81 ++++++++++++++---------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index a959ff9fa0..a99f4f7203 100644 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -95,45 +95,56 @@ class SQLQueue(RAMQueue): # Make sure message can not be processed anylonger activity_tool.SQLQueue_processMessage(uid=line.uid) get_transaction().commit() # Release locks before starting a potentially long calculation - m = self.loadMessage(line.message) - # Make sure object exists - validation_state = m.validate(self, activity_tool) - if validation_state is not VALID: - if validation_state in (EXCEPTION, INVALID_PATH): - if line.priority > MAX_PRIORITY: - # This is an error - activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE) - # Assign message back to 'error' state - #m.notifyUser(activity_tool) # Notify Error - get_transaction().commit() # and commit + + # At this point, the message is marked as processed. + try: + m = self.loadMessage(line.message) + # Make sure object exists + validation_state = m.validate(self, activity_tool) + if validation_state is not VALID: + if validation_state in (EXCEPTION, INVALID_PATH): + if line.priority > MAX_PRIORITY: + # This is an error. + # Assign message back to 'error' state. + activity_tool.SQLQueue_assignMessage(uid=line.uid, + processing_node = VALIDATE_ERROR_STATE) + get_transaction().commit() # and commit + else: + # Lower priority + activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) + get_transaction().commit() # Release locks before starting a potentially long calculation else: - # Lower priority - activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) + # We do not lower priority for INVALID_ORDER errors but we do postpone execution + activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date, + priority = line.priority) get_transaction().commit() # Release locks before starting a potentially long calculation - else: - # We do not lower priority for INVALID_ORDER errors but we do postpone execution - activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date, - priority = line.priority) - get_transaction().commit() # Release locks before starting a potentially long calculation + return 0 + except: + # If any exception occurs, catch it and delay the operation. + get_transaction().abort() + activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date, + priority = line.priority) + get_transaction().commit() + return 0 + + # Try to invoke + activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? + if m.is_executed: # Make sure message could be invoked + activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it + get_transaction().commit() # If successful, commit else: - # Try to invoke - activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? - if m.is_executed: # Make sure message could be invoked - activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it - get_transaction().commit() # If successful, commit + get_transaction().abort() # If not, abort transaction and start a new one + if line.priority > MAX_PRIORITY: + # This is an error + activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE) + # Assign message back to 'error' state + m.notifyUser(activity_tool) # Notify Error + get_transaction().commit() # and commit else: - get_transaction().abort() # If not, abort transaction and start a new one - if line.priority > MAX_PRIORITY: - # This is an error - activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE) - # Assign message back to 'error' state - m.notifyUser(activity_tool) # Notify Error - get_transaction().commit() # and commit - else: - # Lower priority - activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date, - priority = line.priority + 1) - get_transaction().commit() # Release locks before starting a potentially long calculation + # Lower priority + activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date, + priority = line.priority + 1) + get_transaction().commit() # Release locks before starting a potentially long calculation return 0 get_transaction().commit() # Release locks before starting a potentially long calculation return 1 -- 2.30.9