diff --git a/product/CMFActivity/Activity/Queue.py b/product/CMFActivity/Activity/Queue.py index 56f092fa63a8a6459afb901aa726d627e891083a..0a5013bb5f3fdcac1e5a4c8fde3c1dc7d55feaf1 100755 --- a/product/CMFActivity/Activity/Queue.py +++ b/product/CMFActivity/Activity/Queue.py @@ -76,7 +76,6 @@ class Queue: # we can set some global variables related # to the ZODB context if not self.is_initialized: - self.activity_tool = activity_tool self.is_initialized = 1 def queueMessage(self, activity_tool, m): @@ -169,17 +168,14 @@ class Queue: # Registration Management def registerActivityBuffer(self, activity_buffer): class_name = self.__class__.__name__ - if not hasattr(activity_buffer, '_%s_message_list' % class_name): - setattr(activity_buffer, '_%s_message_list' % class_name, []) + setattr(activity_buffer, '_%s_message_list' % class_name, []) def isMessageRegistered(self, activity_buffer, activity_tool, m): class_name = self.__class__.__name__ - self.registerActivityBuffer(activity_buffer) return m in getattr(activity_buffer, '_%s_message_list' % class_name) def registerMessage(self, activity_buffer, activity_tool, m): class_name = self.__class__.__name__ - self.registerActivityBuffer(activity_buffer) getattr(activity_buffer, '_%s_message_list' % class_name).append(m) m.is_registered = 1 diff --git a/product/CMFActivity/Activity/RAMDict.py b/product/CMFActivity/Activity/RAMDict.py index dc5a9e6ef55c45ba481c9197a4efc86eb6ec28d5..32e8c6d1df188ab024fa0630c6938ce5e04f0066 100755 --- a/product/CMFActivity/Activity/RAMDict.py +++ b/product/CMFActivity/Activity/RAMDict.py @@ -56,18 +56,15 @@ class RAMDict(Queue): def registerActivityBuffer(self, activity_buffer): class_name = self.__class__.__name__ - if not hasattr(activity_buffer, '_%s_message_list' % class_name): - setattr(activity_buffer, '_%s_message_list' % class_name, []) - setattr(activity_buffer, '_%s_uid_dict' % class_name, {}) + setattr(activity_buffer, '_%s_message_list' % class_name, []) + setattr(activity_buffer, '_%s_uid_dict' % class_name, {}) def isMessageRegistered(self, activity_buffer, activity_tool, m): class_name = self.__class__.__name__ - self.registerActivityBuffer(activity_buffer) return getattr(activity_buffer, '_%s_uid_dict' % class_name).has_key((m.object_path, m.method_id)) def registerMessage(self, activity_buffer, activity_tool, m): class_name = self.__class__.__name__ - self.registerActivityBuffer(activity_buffer) getattr(activity_buffer, '_%s_message_list' % class_name).append(m) getattr(activity_buffer, '_%s_uid_dict' % class_name)[(m.object_path, m.method_id)] = 1 m.is_registered = 1 @@ -96,7 +93,7 @@ class RAMDict(Queue): # Parse each message in registered for m in activity_tool.getRegisteredMessageList(self): if object_path == m.object_path and (method_id is None or method_id == m.method_id): - self.unregisterMessage(m) + activity_tool.unregisterMessage(self, m) if not method_dict.has_key(method_id): if invoke: # First Validate diff --git a/product/CMFActivity/Activity/RAMQueue.py b/product/CMFActivity/Activity/RAMQueue.py index 46e95df4fe19c2d551979d0f88dbd75868d23e79..7130cb602656e062461ab9b7cfa0fadf68e6fc9c 100755 --- a/product/CMFActivity/Activity/RAMQueue.py +++ b/product/CMFActivity/Activity/RAMQueue.py @@ -74,7 +74,7 @@ class RAMQueue(Queue): for m in activity_tool.getRegisteredMessageList(self): if object_path == m.object_path and (method_id is None or method_id == m.method_id): if invoke: activity_tool.invoke(m) - self.unregisterMessage(m) + activity_tool.unregisterMessage(self, m) # Parse each message in queue for m in self.queue: if object_path == m.object_path and (method_id is None or method_id == m.method_id): diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index c87826136584c4048729b3c331e14547e7ff1b93..ae7263a50a0197ad10f50044a402d12adc82fcdd 100755 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -68,22 +68,18 @@ class SQLDict(RAMDict): # Registration management def registerActivityBuffer(self, activity_buffer): - if not hasattr(activity_buffer, '_sqldict_uid_dict'): - activity_buffer._sqldict_uid_dict = {} - activity_buffer._sqldict_message_list = [] - + activity_buffer._sqldict_uid_dict = {} + activity_buffer._sqldict_message_list = [] + def isMessageRegistered(self, activity_buffer, activity_tool, m): - self.registerActivityBuffer(activity_buffer) return activity_buffer._sqldict_uid_dict.has_key((m.object_path, m.method_id)) def registerMessage(self, activity_buffer, activity_tool, m): - self.registerActivityBuffer(activity_buffer) m.is_registered = 1 activity_buffer._sqldict_uid_dict[(m.object_path, m.method_id)] = 1 activity_buffer._sqldict_message_list.append(m) def unregisterMessage(self, activity_buffer, activity_tool, m): - self.registerActivityBuffer(activity_buffer) m.is_registered = 0 # This prevents from inserting deleted messages into the queue if activity_buffer._sqldict_uid_dict.has_key((m.object_path, m.method_id)): del activity_buffer._sqldict_uid_dict[(m.object_path, m.method_id)] @@ -182,7 +178,7 @@ class SQLDict(RAMDict): # Parse each message in registered for m in activity_tool.getRegisteredMessageList(self): if object_path == m.object_path and (method_id is None or method_id == m.method_id): - self.unregisterMessage(m) + activity_tool.unregisterMessage(self, m) if not method_dict.has_key(method_id): if invoke: # First Validate diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index ac14baed9adffb690d6583f6be33206608977540..65cf6ff80322ab42e6e6f58868f8db2cbd1e0a7e 100755 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -136,7 +136,7 @@ class SQLQueue(RAMQueue): for m in activity_tool.getRegisteredMessageList(self): if object_path == m.object_path and (method_id is None or method_id == m.method_id): if invoke: activity_tool.invoke(m) - self.unregisterMessage(m) + activity_tool.unregisterMessage(self, m) # Parse each message in SQL queue # LOG('Flush', 0, str((path, invoke, method_id))) result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) diff --git a/product/CMFActivity/ActivityBuffer.py b/product/CMFActivity/ActivityBuffer.py index 17917de8d64de9af43ec69a563258dc3e2a32e5c..ff2e2b40a0ed51c5e25fc0bf4902695a84481d0e 100755 --- a/product/CMFActivity/ActivityBuffer.py +++ b/product/CMFActivity/ActivityBuffer.py @@ -42,12 +42,15 @@ class ActivityBuffer(TM): # at end of transaction def _begin(self, *ignored): from thread import get_ident + from ActivityTool import activity_list self._tlock.acquire() self._tthread = get_ident() self.requires_prepare = 1 try: self.queued_activity = [] self.flushed_activity = [] + for activity in activity_list: # Reset registration for each transaction + activity.registerActivityBuffer(self) except: LOG('ActivityBuffer', ERROR, "exception during _begin", error=sys.exc_info()) diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 07c702186d7e88ef4efb1916db282a034e903f1e..91e40c9f511286ef2839a5a252c8b3d6a0bb7ba3 100755 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -316,7 +316,10 @@ class ActivityTool (Folder, UniqueObject): self._v_activity_buffer.deferredDeleteMessage(self, activity, message) def getRegisteredMessageList(self, activity): - return activity.getRegisteredMessageList(self, self._v_activity_buffer) + return activity.getRegisteredMessageList(self._v_activity_buffer, self) + + def unregisterMessage(self, activity, message): + return activity.unregisterMessage(self._v_activity_buffer, self, message) def flush(self, object, invoke=0, **kw): global is_initialized