Commit 247ad2ab authored by Jean-Paul Smets's avatar Jean-Paul Smets

make sure only one thread on one message


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@321 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent e5de1f1b
...@@ -47,11 +47,13 @@ class SQLDict(RAMDict): ...@@ -47,11 +47,13 @@ class SQLDict(RAMDict):
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
result = activity_tool.SQLDict_readMessage(processing_node=processing_node) result = activity_tool.SQLDict_readMessage(processing_node=processing_node)
get_transaction().commit() # Release locks before starting a potentially long calculation
if len(result) > 0: if len(result) > 0:
line = result[0] line = result[0]
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
# Make sure message can not be processed anylonger
activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = INVOKE_ERROR_STATE)
get_transaction().commit() # Release locks before starting a potentially long calculation
m = self.loadMessage(line.message) m = self.loadMessage(line.message)
if m.validate(self, activity_tool): if m.validate(self, activity_tool):
activity_tool.invoke(m) # Try to invoke the message activity_tool.invoke(m) # Try to invoke the message
...@@ -60,7 +62,7 @@ class SQLDict(RAMDict): ...@@ -60,7 +62,7 @@ class SQLDict(RAMDict):
get_transaction().commit() # If successful, commit get_transaction().commit() # If successful, commit
else: else:
get_transaction().abort() # If not, abort transaction and start a new one get_transaction().abort() # If not, abort transaction and start a new one
activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = INVOKE_ERROR_STATE) #activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state # Assign message back to 'error' state
get_transaction().commit() # and commit get_transaction().commit() # and commit
else: else:
...@@ -68,6 +70,7 @@ class SQLDict(RAMDict): ...@@ -68,6 +70,7 @@ class SQLDict(RAMDict):
# Assign message back to 'error' state # Assign message back to 'error' state
get_transaction().commit() # and commit get_transaction().commit() # and commit
return 0 return 0
get_transaction().commit() # Release locks before starting a potentially long calculation
return 1 return 1
def hasActivity(self, activity_tool, object, method_id=None, **kw): def hasActivity(self, activity_tool, object, method_id=None, **kw):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment