Commit e3f26c7c authored by Vincent Pelletier's avatar Vincent Pelletier

Update all activity queues to use accessors instead of is_executed.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@24099 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 97ef2d88
...@@ -86,7 +86,7 @@ class RAMDict(Queue): ...@@ -86,7 +86,7 @@ class RAMDict(Queue):
for key, m in self.getDict(path).items(): for key, m in self.getDict(path).items():
if m.validate(self, activity_tool) is VALID: if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m) activity_tool.invoke(m)
if m.is_executed == MESSAGE_EXECUTED: if m.getExecutionState() == MESSAGE_EXECUTED:
del self.getDict(path)[key] del self.getDict(path)[key]
get_transaction().commit() get_transaction().commit()
return 0 return 0
...@@ -136,7 +136,7 @@ class RAMDict(Queue): ...@@ -136,7 +136,7 @@ class RAMDict(Queue):
# First Validate # First Validate
if m.validate(self, activity_tool) is VALID: if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked if m.getExecutionState() != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path)) 'Could not evaluate %s on %s' % (method_id , path))
...@@ -161,7 +161,7 @@ class RAMDict(Queue): ...@@ -161,7 +161,7 @@ class RAMDict(Queue):
LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path))
if invoke: if invoke:
activity_tool.invoke(m) activity_tool.invoke(m)
if m.is_executed == MESSAGE_EXECUTED: if m.getExecutionState() == MESSAGE_EXECUTED:
method_dict[m.method_id] = 1 method_dict[m.method_id] = 1
self.deleteMessage(activity_tool, m) self.deleteMessage(activity_tool, m)
else: else:
......
...@@ -70,7 +70,7 @@ class RAMQueue(Queue): ...@@ -70,7 +70,7 @@ class RAMQueue(Queue):
get_transaction().commit() # Start a new transaction get_transaction().commit() # Start a new transaction
return 0 # Keep on ticking return 0 # Keep on ticking
activity_tool.invoke(m) activity_tool.invoke(m)
if m.is_executed == MESSAGE_EXECUTED: if m.getExecutionState() == MESSAGE_EXECUTED:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling) self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
get_transaction().commit() # Start a new transaction get_transaction().commit() # Start a new transaction
return 0 # Keep on ticking return 0 # Keep on ticking
...@@ -117,7 +117,7 @@ class RAMQueue(Queue): ...@@ -117,7 +117,7 @@ class RAMQueue(Queue):
else: else:
if invoke: if invoke:
activity_tool.invoke(m) activity_tool.invoke(m)
if m.is_executed == MESSAGE_EXECUTED: if m.getExecutionState() == MESSAGE_EXECUTED:
activity_tool.unregisterMessage(self, m) activity_tool.unregisterMessage(self, m)
else: else:
activity_tool.unregisterMessage(self, m) activity_tool.unregisterMessage(self, m)
...@@ -130,7 +130,7 @@ class RAMQueue(Queue): ...@@ -130,7 +130,7 @@ class RAMQueue(Queue):
else: else:
if invoke: if invoke:
activity_tool.invoke(m) activity_tool.invoke(m)
if m.is_executed == MESSAGE_EXECUTED: if m.getExecutionState() == MESSAGE_EXECUTED:
self.deleteMessage(activity_tool, m) # Only delete if no error happens self.deleteMessage(activity_tool, m) # Only delete if no error happens
else: else:
self.deleteMessage(activity_tool, m) self.deleteMessage(activity_tool, m)
......
...@@ -338,9 +338,9 @@ class SQLDict(RAMDict, SQLBase): ...@@ -338,9 +338,9 @@ class SQLDict(RAMDict, SQLBase):
message_with_active_process_list = [] message_with_active_process_list = []
notify_user_list = [] notify_user_list = []
non_executable_message_list = [] non_executable_message_list = []
something_failed = (len([x for x in message_uid_priority_list if x[1].is_executed == MESSAGE_NOT_EXECUTED]) != 0) something_failed = (len([x for x in message_uid_priority_list if x[1].getExecutionState() == MESSAGE_NOT_EXECUTED]) != 0)
for uid, m, priority in message_uid_priority_list: for uid, m, priority in message_uid_priority_list:
if m.is_executed == MESSAGE_EXECUTED: if m.getExecutionState() == MESSAGE_EXECUTED:
if something_failed: if something_failed:
make_available_uid_list.append(uid) make_available_uid_list.append(uid)
make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, [])) make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
...@@ -351,7 +351,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -351,7 +351,7 @@ class SQLDict(RAMDict, SQLBase):
# XXX: Bug here: Even if a duplicate message has an active_process, # XXX: Bug here: Even if a duplicate message has an active_process,
# it won't be called on the duplicate. # it won't be called on the duplicate.
message_with_active_process_list.append(m) message_with_active_process_list.append(m)
elif m.is_executed == MESSAGE_NOT_EXECUTED: elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
# Should duplicate messages follow strictly the original message, or # Should duplicate messages follow strictly the original message, or
# should they be just made available again ? # should they be just made available again ?
make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, [])) make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
...@@ -476,7 +476,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -476,7 +476,7 @@ class SQLDict(RAMDict, SQLBase):
else: else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list)) LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list))
# Abort if something failed. # Abort if something failed.
if len([x for x in message_uid_priority_list if x[1].is_executed == MESSAGE_NOT_EXECUTED]) != 0: if len([x for x in message_uid_priority_list if x[1].getExecutionState() == MESSAGE_NOT_EXECUTED]) != 0:
endTransaction = abortTransactionSynchronously endTransaction = abortTransactionSynchronously
else: else:
endTransaction = get_transaction().commit endTransaction = get_transaction().commit
...@@ -493,15 +493,8 @@ class SQLDict(RAMDict, SQLBase): ...@@ -493,15 +493,8 @@ class SQLDict(RAMDict, SQLBase):
LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.') LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
raise raise
exc_info = sys.exc_info() exc_info = sys.exc_info()
exc_type = exc_info[0]
exc_value = str(exc_info[1])
traceback = ''.join(ExceptionFormatter.format_exception(
*exc_info))
for x in message_uid_priority_list: for x in message_uid_priority_list:
x[1].is_executed = MESSAGE_NOT_EXECUTED x[1].setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
x[1].exc_type = exc_type
x[1].exc_value = exc_value
x[1].traceback = traceback
failed_message_uid_list = [x[0] for x in message_uid_priority_list] failed_message_uid_list = [x[0] for x in message_uid_priority_list]
try: try:
makeMessageListAvailable(failed_message_uid_list, uid_to_duplicate_uid_list_dict) makeMessageListAvailable(failed_message_uid_list, uid_to_duplicate_uid_list_dict)
...@@ -553,7 +546,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -553,7 +546,7 @@ class SQLDict(RAMDict, SQLBase):
validate_value = m.validate(self, activity_tool) validate_value = m.validate(self, activity_tool)
if validate_value is VALID: if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked if m.getExecutionState() != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path)) 'Could not evaluate %s on %s' % (m.method_id , path))
...@@ -586,8 +579,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -586,8 +579,7 @@ class SQLDict(RAMDict, SQLBase):
# LOG('SQLDict.flush validate_value',0,validate_value) # LOG('SQLDict.flush validate_value',0,validate_value)
if validate_value is VALID: if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
# LOG('SQLDict.flush m.is_executed',0,m.is_executed) if m.getExecutionState() != MESSAGE_EXECUTED: # Make sure message could be invoked
if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path)) 'Could not evaluate %s on %s' % (m.method_id , path))
......
...@@ -200,11 +200,11 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -200,11 +200,11 @@ class SQLQueue(RAMQueue, SQLBase):
notify_user_list = [] notify_user_list = []
non_executable_message_list = [] non_executable_message_list = []
for uid, m, priority in message_uid_priority_list: for uid, m, priority in message_uid_priority_list:
if m.is_executed == MESSAGE_EXECUTED: if m.getExecutionState() == MESSAGE_EXECUTED:
deletable_uid_list.append(uid) deletable_uid_list.append(uid)
if m.active_process: if m.active_process:
message_with_active_process_list.append(m) message_with_active_process_list.append(m)
elif m.is_executed == MESSAGE_NOT_EXECUTED: elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
if type(m.exc_type) is ClassType and \ if type(m.exc_type) is ClassType and \
issubclass(m.exc_type, ConflictError): issubclass(m.exc_type, ConflictError):
delay_uid_list.append(uid) delay_uid_list.append(uid)
...@@ -300,7 +300,7 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -300,7 +300,7 @@ class SQLQueue(RAMQueue, SQLBase):
# Try to invoke # Try to invoke
try: try:
activity_tool.invoke(value[1]) activity_tool.invoke(value[1])
if value[1].is_executed != MESSAGE_NOT_EXECUTED: if value[1].getExecutionState() != MESSAGE_NOT_EXECUTED:
# Commit so that if a message raises it doesn't causes previous # Commit so that if a message raises it doesn't causes previous
# successfull messages to be rolled back. This commit might fail, # successfull messages to be rolled back. This commit might fail,
# so it is protected the same way as activity execution by the # so it is protected the same way as activity execution by the
...@@ -320,12 +320,7 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -320,12 +320,7 @@ class SQLQueue(RAMQueue, SQLBase):
# We must make sure that the message is not set as executed. # We must make sure that the message is not set as executed.
# It is possible that the message is executed but the commit # It is possible that the message is executed but the commit
# of the transaction fails # of the transaction fails
value[1].is_executed = MESSAGE_NOT_EXECUTED value[1].setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
exc_info = sys.exc_info()
value[1].exc_type = exc_info[0]
value[1].exc_value = str(exc_info[1])
value[1].traceback = ''.join(ExceptionFormatter.format_exception(
*exc_info))
try: try:
makeMessageListAvailable([value[0]]) makeMessageListAvailable([value[0]])
except: except:
...@@ -386,7 +381,7 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -386,7 +381,7 @@ class SQLQueue(RAMQueue, SQLBase):
validate_value = m.validate(self, activity_tool) validate_value = m.validate(self, activity_tool)
if validate_value is VALID: if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked if m.getExecutionState() != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path)) 'Could not evaluate %s on %s' % (m.method_id , path))
...@@ -412,7 +407,7 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -412,7 +407,7 @@ class SQLQueue(RAMQueue, SQLBase):
validate_value = VALID validate_value = VALID
if validate_value is VALID: if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked if m.getExecutionState() != MESSAGE_EXECUTED: # Make sure message could be invoked
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path)) 'Could not evaluate %s on %s' % (method_id , path))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment