Commit 634a2284 authored by Sebastien Robin's avatar Sebastien Robin

getMessageList support now the parameter include_processing


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@5768 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 0fec037d
...@@ -186,7 +186,7 @@ class Queue: ...@@ -186,7 +186,7 @@ class Queue:
def dumpMessage(self, m): def dumpMessage(self, m):
return pickle.dumps(m) return pickle.dumps(m)
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None,**kw):
return [] return []
# Transaction Management # Transaction Management
......
...@@ -153,7 +153,7 @@ class RAMDict(Queue): ...@@ -153,7 +153,7 @@ class RAMDict(Queue):
else: else:
self.deleteMessage(activity_tool, m) self.deleteMessage(activity_tool, m)
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None,**kw):
new_queue = [] new_queue = []
for m in self.getDict(activity_tool).values(): for m in self.getDict(activity_tool).values():
m.processing_node = 1 m.processing_node = 1
......
...@@ -122,7 +122,7 @@ class RAMQueue(Queue): ...@@ -122,7 +122,7 @@ class RAMQueue(Queue):
else: else:
self.deleteMessage(activity_tool, m) self.deleteMessage(activity_tool, m)
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None,**kw):
new_queue = [] new_queue = []
for m in self.getQueue(activity_tool): for m in self.getQueue(activity_tool):
m.processing_node = 1 m.processing_node = 1
......
...@@ -390,7 +390,8 @@ class SQLDict(RAMDict): ...@@ -390,7 +390,8 @@ class SQLDict(RAMDict):
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
# Parse each message in SQL dict # Parse each message in SQL dict
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,
processing_node=None,include_processing=0)
for line in result: for line in result:
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
...@@ -417,15 +418,16 @@ class SQLDict(RAMDict): ...@@ -417,15 +418,16 @@ class SQLDict(RAMDict):
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
# YO: reading all lines might cause a deadlock # YO: reading all lines might cause a deadlock
message_list = [] message_list = []
if hasattr(activity_tool,'SQLDict_readMessageList'): if hasattr(activity_tool,'SQLDict_readMessageList'):
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None, to_processing_date=None) result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None, to_processing_date=None,include_processing=include_processing)
for line in result: for line in result:
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
m.processing_node = line.processing_node m.processing_node = line.processing_node
m.priority = line.priority m.priority = line.priority
m.processing = line.processing
message_list.append(m) message_list.append(m)
return message_list return message_list
...@@ -450,7 +452,8 @@ class SQLDict(RAMDict): ...@@ -450,7 +452,8 @@ class SQLDict(RAMDict):
else: else:
max_processing_date = None max_processing_date = None
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1, result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1,
to_processing_date = max_processing_date) # Only assign non assigned messages to_processing_date = max_processing_date,
include_processing=0) # Only assign non assigned messages
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
path_dict = {} path_dict = {}
for line in result: for line in result:
......
...@@ -200,7 +200,7 @@ class SQLQueue(RAMQueue): ...@@ -200,7 +200,7 @@ class SQLQueue(RAMQueue):
# uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process) # uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
# activity_tool.SQLQueue_assignMessage(uid = uid_list, processing_node = STOP_STATE) # activity_tool.SQLQueue_assignMessage(uid = uid_list, processing_node = STOP_STATE)
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None,**kw):
message_list = [] message_list = []
if hasattr(activity_tool,'SQLQueue_readMessageList'): if hasattr(activity_tool,'SQLQueue_readMessageList'):
result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None) result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None)
......
...@@ -97,6 +97,7 @@ class Message: ...@@ -97,6 +97,7 @@ class Message:
self.kw = kw self.kw = kw
self.is_executed = 0 self.is_executed = 0
self.exc_type = None self.exc_type = None
self.processing = None
self.user_name = str(_getAuthenticatedUser(self)) self.user_name = str(_getAuthenticatedUser(self))
# Store REQUEST Info ? # Store REQUEST Info ?
...@@ -782,7 +783,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -782,7 +783,7 @@ class ActivityTool (Folder, UniqueObject):
return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared')) return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared'))
security.declarePublic('getMessageList') security.declarePublic('getMessageList')
def getMessageList(self): def getMessageList(self,**kw):
""" """
List messages waiting in queues List messages waiting in queues
""" """
...@@ -792,7 +793,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -792,7 +793,7 @@ class ActivityTool (Folder, UniqueObject):
message_list = [] message_list = []
for activity in activity_list: for activity in activity_list:
try: try:
message_list += activity.getMessageList(self) message_list += activity.getMessageList(self,**kw)
except AttributeError: except AttributeError:
LOG('getMessageList, could not get message from Activity:',0,activity) LOG('getMessageList, could not get message from Activity:',0,activity)
return message_list return message_list
......
...@@ -11,7 +11,8 @@ class_file: ...@@ -11,7 +11,8 @@ class_file:
method_id method_id
processing_node processing_node
priority priority
to_processing_date</params> to_processing_date
include_processing</params>
<dtml-if to_processing_date>UPDATE message <dtml-if to_processing_date>UPDATE message
SET SET
processing = 0 processing = 0
...@@ -25,7 +26,10 @@ AND ...@@ -25,7 +26,10 @@ AND
</dtml-if>SELECT * FROM </dtml-if>SELECT * FROM
message message
WHERE WHERE
processing <> 1 1 = 1
<dtml-if expr="not(include_processing)">
AND processing <> 1
</dtml-if>
<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if> <dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if>
<dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if> <dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if> <dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment