diff --git a/product/CMFActivity/Activity/RAMDict.py b/product/CMFActivity/Activity/RAMDict.py index 87addc6a4396c92e1448eff67e501fd8049f2332..30514923792e1b203ae8e239a934561c4b0fa951 100755 --- a/product/CMFActivity/Activity/RAMDict.py +++ b/product/CMFActivity/Activity/RAMDict.py @@ -56,19 +56,19 @@ class RAMDict(Queue): def registerActivityBuffer(self, activity_buffer): class_name = self.__class__.__name__ - setattr(activity_buffer, '_%s_message_list' % class_name, []) - setattr(activity_buffer, '_%s_uid_dict' % class_name, {}) - + setattr(activity_buffer, '_%s_message_list' % class_name, []) + setattr(activity_buffer, '_%s_uid_dict' % class_name, {}) + def isMessageRegistered(self, activity_buffer, activity_tool, m): class_name = self.__class__.__name__ return getattr(activity_buffer, '_%s_uid_dict' % class_name).has_key((tuple(m.object_path), m.method_id)) - + def registerMessage(self, activity_buffer, activity_tool, m): class_name = self.__class__.__name__ getattr(activity_buffer, '_%s_message_list' % class_name).append(m) getattr(activity_buffer, '_%s_uid_dict' % class_name)[(tuple(m.object_path), m.method_id)] = 1 m.is_registered = 1 - + def dequeueMessage(self, activity_tool, processing_node): if len(self.dict.keys()) is 0: return 1 # Go to sleep @@ -76,6 +76,7 @@ class RAMDict(Queue): if m.validate(self, activity_tool): activity_tool.invoke(m) del self.dict[key] + get_transaction().commit() return 0 return 1 @@ -89,7 +90,7 @@ class RAMDict(Queue): return 1 # Default behaviour if no object specified is to return 1 until active_process implemented return 0 - def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): + def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): path = '/'.join(object_path) # LOG('Flush', 0, str((path, invoke, method_id))) method_dict = {} diff --git a/product/CMFActivity/Activity/RAMQueue.py b/product/CMFActivity/Activity/RAMQueue.py index 2640adca3d85b57a1dcc6ac87a0f802eb3dc4db4..fe33e421bd6971c39d2af104c4ea7ffc177dfe60 100755 --- a/product/CMFActivity/Activity/RAMQueue.py +++ b/product/CMFActivity/Activity/RAMQueue.py @@ -60,13 +60,14 @@ class RAMQueue(Queue): del queue[i] return i = i + 1 - + def dequeueMessage(self, activity_tool, processing_node): if len(self.getQueue(activity_tool)) is 0: return 1 # Go to sleep m = self.getQueue(activity_tool)[0] activity_tool.invoke(m) self.deleteMessage(activity_tool, m) + get_transaction().commit() return 0 # Keep on ticking def hasActivity(self, activity_tool, object, **kw):