Commit e5d17126 authored by Jean-Paul Smets's avatar Jean-Paul Smets

New version should fixed doubles

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@802 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 7ce7995e
...@@ -45,14 +45,20 @@ class RAMDict(Queue): ...@@ -45,14 +45,20 @@ class RAMDict(Queue):
Queue.__init__(self) Queue.__init__(self)
self.dict = {} self.dict = {}
def getDict(self, activity_tool):
path = activity_tool.getPhysicalPath()
if not self.queue_dict.has_key(path):
self.queue_dict[path] = {}
return self.queue_dict[path]
def finishQueueMessage(self, activity_tool, m): def finishQueueMessage(self, activity_tool, m):
if m.is_registered: if m.is_registered:
self.dict[(tuple(m.object_path), m.method_id)] = m self.getDict(activity_tool)[(tuple(m.object_path), m.method_id)] = m
def finishDeleteMessage(self, activity_tool, message): def finishDeleteMessage(self, activity_tool, message):
for key, m in self.dict.items(): for key, m in self.getDict(activity_tool).items():
if m.object_path == message.object_path and m.method_id == message.method_id: if m.object_path == message.object_path and m.method_id == message.method_id:
del self.dict[(tuple(m.object_path), m.method_id)] del self.getDict(activity_tool)[(tuple(m.object_path), m.method_id)]
def registerActivityBuffer(self, activity_buffer): def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
...@@ -70,20 +76,24 @@ class RAMDict(Queue): ...@@ -70,20 +76,24 @@ class RAMDict(Queue):
m.is_registered = 1 m.is_registered = 1
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if len(self.dict.keys()) is 0: if len(self.getDict(activity_tool).keys()) is 0:
return 1 # Go to sleep return 1 # Go to sleep
for key, m in self.dict.items(): for key, m in self.getDict(activity_tool).items():
if m.validate(self, activity_tool): if m.validate(self, activity_tool):
activity_tool.invoke(m) activity_tool.invoke(m)
del self.dict[key] if m.is_executed:
del self.getDict(activity_tool)[key]
get_transaction().commit() get_transaction().commit()
return 0 return 0
else:
# Start a new transaction and keep on to next message
get_transaction().commit()
return 1 return 1
def hasActivity(self, activity_tool, object, **kw): def hasActivity(self, activity_tool, object, **kw):
if object is not None: if object is not None:
object_path = object.getPhysicalPath() object_path = object.getPhysicalPath()
for m in self.dict.values(): for m in self.getDict(activity_tool).values():
if m.object_path == object_path: if m.object_path == object_path:
return 1 return 1
else: else:
...@@ -97,8 +107,7 @@ class RAMDict(Queue): ...@@ -97,8 +107,7 @@ class RAMDict(Queue):
# Parse each message in registered # Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self): for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
activity_tool.unregisterMessage(self, m) if not method_dict.has_key(m.method_id):
if not method_dict.has_key(method_id):
if invoke: if invoke:
# First Validate # First Validate
if m.validate(self, activity_tool): if m.validate(self, activity_tool):
...@@ -107,18 +116,41 @@ class RAMDict(Queue): ...@@ -107,18 +116,41 @@ class RAMDict(Queue):
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path)) 'Could not evaluate %s on %s' % (method_id , path))
else:
method_dict[m.method_id] = 1
activity_tool.unregisterMessage(self, m)
else: else:
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
else:
method_dict[m.method_id] = 1
activity_tool.unregisterMessage(self, m)
else:
method_dict[m.method_id] = 1
activity_tool.unregisterMessage(self, m)
# Parse each message in RAM dict # Parse each message in RAM dict
for key, m in self.dict.items(): for key, m in self.getDict(activity_tool).items():
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if not method_dict.has_key(m.method_id):
LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path))
if invoke: activity_tool.invoke(m) if invoke:
activity_tool.invoke(m)
if m.is_executed:
method_dict[m.method_id] = 1
self.deleteMessage(activity_tool, m)
else:
method_dict[m.method_id] = 1
self.deleteMessage(activity_tool, m)
else:
self.deleteMessage(activity_tool, m) self.deleteMessage(activity_tool, m)
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None):
return self.dict.values() new_queue = []
for m in self.getDict(activity_tool).values():
m.processing_node = 1
m.priority = 0
new_queue.append(m)
return new_queue
registerActivity(RAMDict) registerActivity(RAMDict)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment