From 2fa6bfeb078115a971fa7a64c3c5c78c59e9d72b Mon Sep 17 00:00:00 2001 From: Vincent Pelletier <vincent@nexedi.com> Date: Sat, 8 Jun 2013 23:31:53 +0200 Subject: [PATCH] Make registerMessage check for duplicate registration. isMessageRegistered duplicates work done in registerMessage, so it wastes time when creating an activity (in the likely event the activity is not a duplicate). --- product/CMFActivity/Activity/Queue.py | 7 +++++-- product/CMFActivity/Activity/SQLDict.py | 12 ++++++++---- product/CMFActivity/ActivityBuffer.py | 9 +++------ product/CMFActivity/ActivityTool.py | 1 + 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/product/CMFActivity/Activity/Queue.py b/product/CMFActivity/Activity/Queue.py index 0e282556c3..1c9609872c 100644 --- a/product/CMFActivity/Activity/Queue.py +++ b/product/CMFActivity/Activity/Queue.py @@ -239,16 +239,19 @@ class Queue(object): pass def isMessageRegistered(self, activity_buffer, activity_tool, m): + # BBB: deprecated message_list = activity_buffer.getMessageList(self) return m in message_list def registerMessage(self, activity_buffer, activity_tool, m): message_list = activity_buffer.getMessageList(self) + if m in message_list: + return message_list.append(m) - m.is_registered = 1 + m.is_registered = True def unregisterMessage(self, activity_buffer, activity_tool, m): - m.is_registered = 0 + m.is_registered = False def getRegisteredMessageList(self, activity_buffer, activity_tool): message_list = activity_buffer.getMessageList(self) diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 14c69fc748..888651f235 100644 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -96,17 +96,21 @@ class SQLDict(SQLBase): return (tuple(m.object_path), m.method_id, m.activity_kw.get('tag'), m.activity_kw.get('group_id')) def isMessageRegistered(self, activity_buffer, activity_tool, m): + # BBB: deprecated return self.generateMessageUID(m) in activity_buffer.getUidSet(self) def registerMessage(self, activity_buffer, activity_tool, m): + message_id = self.generateMessageUID(m) + uid_set = activity_buffer.getUidSet(self) + if message_id in uid_set: + return + uid_set.add(message_id) message_list = activity_buffer.getMessageList(self) message_list.append(m) - uid_set = activity_buffer.getUidSet(self) - uid_set.add(self.generateMessageUID(m)) - m.is_registered = 1 + m.is_registered = True def unregisterMessage(self, activity_buffer, activity_tool, m): - m.is_registered = 0 # This prevents from inserting deleted messages into the queue + m.is_registered = False # This prevents from inserting deleted messages into the queue class_name = self.__class__.__name__ uid_set = activity_buffer.getUidSet(self) uid_set.discard(self.generateMessageUID(m)) diff --git a/product/CMFActivity/ActivityBuffer.py b/product/CMFActivity/ActivityBuffer.py index b9cd684696..93e58cf7bf 100644 --- a/product/CMFActivity/ActivityBuffer.py +++ b/product/CMFActivity/ActivityBuffer.py @@ -93,13 +93,10 @@ class ActivityBuffer(TM): def deferredQueueMessage(self, activity_tool, activity, message): self._register(activity_tool) - # Activity is called to prevent queuing some messages (useful for example - # to prevent reindexing objects multiple times) - if not activity.isMessageRegistered(self, activity_tool, message): + assert not message.is_registered, message + activity.registerMessage(self, activity_tool, message) + if message.is_registered: self.queued_activity.append((activity, message)) - # We register queued messages so that we can - # unregister them - activity.registerMessage(self, activity_tool, message) def sortKey(self, *ignored): """Activities must be finished before databases commit transactions.""" diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index e1726a6430..9e2cd75f73 100644 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -170,6 +170,7 @@ class Message(BaseMessage): processing = None traceback = None oid = None + is_registered = False def __init__(self, obj, active_process, activity_kw, method_id, args, kw, request=None, portal_activities=None): if isinstance(obj, str): -- 2.30.9