From 2a48f66fdb19298bf204efa2fb6442ac47e45cbc Mon Sep 17 00:00:00 2001 From: Yoshinori Okuji <yo@nexedi.com> Date: Fri, 23 Jun 2006 11:35:04 +0000 Subject: [PATCH] Do not refer to the activity tool in _finish. Instead, store the path in ActivityBuffer in __init__. Make the parameter activity_tool to __init__ in ActivityBuffer obligatory. Some performance tuning. git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@8153 20353a03-c40f-0410-a6d1-a30d3c3de9de --- product/CMFActivity/Activity/Queue.py | 30 +-- product/CMFActivity/Activity/RAMDict.py | 49 ++--- product/CMFActivity/Activity/RAMQueue.py | 43 ++--- product/CMFActivity/ActivityBuffer.py | 222 +++++++++++------------ 4 files changed, 172 insertions(+), 172 deletions(-) diff --git a/product/CMFActivity/Activity/Queue.py b/product/CMFActivity/Activity/Queue.py index 2dff336b24..c033ebc11f 100644 --- a/product/CMFActivity/Activity/Queue.py +++ b/product/CMFActivity/Activity/Queue.py @@ -188,50 +188,50 @@ class Queue: def getMessageList(self, activity_tool, processing_node=None,**kw): return [] - + # Transaction Management def prepareQueueMessage(self, activity_tool, m): # Called to prepare transaction commit for queued messages pass - - def finishQueueMessage(self, activity_tool, m): + + def finishQueueMessage(self, activity_tool_path, m): # Called to commit queued messages pass def prepareDeleteMessage(self, activity_tool, m): # Called to prepare transaction commit for deleted messages pass - - def finishDeleteMessage(self, activity_tool, m): + + def finishDeleteMessage(self, activity_tool_path, m): # Called to commit deleted messages pass - + # Registration Management def registerActivityBuffer(self, activity_buffer): class_name = self.__class__.__name__ setattr(activity_buffer, '_%s_message_list' % class_name, []) - + def isMessageRegistered(self, activity_buffer, activity_tool, m): class_name = self.__class__.__name__ return m in getattr(activity_buffer, '_%s_message_list' % class_name) - + def registerMessage(self, activity_buffer, activity_tool, m): class_name = self.__class__.__name__ getattr(activity_buffer, '_%s_message_list' % class_name).append(m) m.is_registered = 1 - + def unregisterMessage(self, activity_buffer, activity_tool, m): m.is_registered = 0 - + def getRegisteredMessageList(self, activity_buffer, activity_tool): class_name = self.__class__.__name__ if hasattr(activity_buffer, '_%s_message_list' % class_name): - return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name)) + return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name)) else: - return () - - # Required for tests (time shift) - def timeShift(self, activity_tool, delay): + return () + + # Required for tests (time shift) + def timeShift(self, activity_tool, delay): """ delay is provided in fractions of day """ diff --git a/product/CMFActivity/Activity/RAMDict.py b/product/CMFActivity/Activity/RAMDict.py index aa0f7ecb8a..974b83ef01 100644 --- a/product/CMFActivity/Activity/RAMDict.py +++ b/product/CMFActivity/Activity/RAMDict.py @@ -50,20 +50,17 @@ class RAMDict(Queue): Queue.__init__(self) self.queue_dict = {} - def getDict(self, activity_tool): - path = activity_tool.getPhysicalPath() - if not self.queue_dict.has_key(path): - self.queue_dict[path] = {} - return self.queue_dict[path] - - def finishQueueMessage(self, activity_tool, m): + def getDict(self, activity_tool_path): + return self.queue_dict.setdefault(activity_tool_path, {}) + + def finishQueueMessage(self, activity_tool_path, m): if m.is_registered: - self.getDict(activity_tool)[(tuple(m.object_path), m.method_id)] = m + self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)] = m - def finishDeleteMessage(self, activity_tool, message): - for key, m in self.getDict(activity_tool).items(): + def finishDeleteMessage(self, activity_tool_path, message): + for key, m in self.getDict(activity_tool_path).items(): if m.object_path == message.object_path and m.method_id == message.method_id: - del self.getDict(activity_tool)[(tuple(m.object_path), m.method_id)] + del self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)] def registerActivityBuffer(self, activity_buffer): class_name = self.__class__.__name__ @@ -81,13 +78,14 @@ class RAMDict(Queue): m.is_registered = 1 def dequeueMessage(self, activity_tool, processing_node): - if len(self.getDict(activity_tool).keys()) is 0: + path = activity_tool.getPhysicalPath() + if len(self.getDict(path).keys()) is 0: return 1 # Go to sleep - for key, m in self.getDict(activity_tool).items(): + for key, m in self.getDict(path).items(): if m.validate(self, activity_tool) is VALID: activity_tool.invoke(m) if m.is_executed: - del self.getDict(activity_tool)[key] + del self.getDict(path)[key] get_transaction().commit() return 0 else: @@ -99,9 +97,10 @@ class RAMDict(Queue): if object is not None: object_path = object.getPhysicalPath() else: - object_path = None - active_process = kw.get('active_process', None) - for m in self.getDict(activity_tool).values(): + object_path = None + active_process = kw.get('active_process', None) + path = activity_tool.getPhysicalPath() + for m in self.getDict(path).values(): # Filter active process and path if defined if active_process is None or m.active_process == active_process: if object_path is None or m.object_path == object_path: @@ -133,16 +132,17 @@ class RAMDict(Queue): 'The document %s does not exist' % path) else: method_dict[m.method_id] = 1 - activity_tool.unregisterMessage(self, m) - else: + activity_tool.unregisterMessage(self, m) + else: method_dict[m.method_id] = 1 activity_tool.unregisterMessage(self, m) # Parse each message in RAM dict - for key, m in self.getDict(activity_tool).items(): + path = activity_tool.getPhysicalPath() + for key, m in self.getDict(path).items(): if object_path == m.object_path and (method_id is None or method_id == m.method_id): if not method_dict.has_key(m.method_id): LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) - if invoke: + if invoke: activity_tool.invoke(m) if m.is_executed: method_dict[m.method_id] = 1 @@ -150,15 +150,16 @@ class RAMDict(Queue): else: method_dict[m.method_id] = 1 self.deleteMessage(activity_tool, m) - else: + else: self.deleteMessage(activity_tool, m) def getMessageList(self, activity_tool, processing_node=None,**kw): new_queue = [] - for m in self.getDict(activity_tool).values(): + path = activity_tool.getPhysicalPath() + for m in self.getDict(path).values(): m.processing_node = 1 m.priority = 0 new_queue.append(m) return new_queue - + registerActivity(RAMDict) diff --git a/product/CMFActivity/Activity/RAMQueue.py b/product/CMFActivity/Activity/RAMQueue.py index 29f163f867..cd8d084203 100644 --- a/product/CMFActivity/Activity/RAMQueue.py +++ b/product/CMFActivity/Activity/RAMQueue.py @@ -43,23 +43,20 @@ class RAMQueue(Queue): Queue.__init__(self) self.queue_dict = {} self.last_uid = 0 - - def getQueue(self, activity_tool): - path = activity_tool.getPhysicalPath() - if not self.queue_dict.has_key(path): - self.queue_dict[path] = [] - return self.queue_dict[path] - - def finishQueueMessage(self, activity_tool, m): + + def getQueue(self, activity_tool_path): + return self.queue_dict.setdefault(activity_tool_path, []) + + def finishQueueMessage(self, activity_tool_path, m): if m.is_registered: # XXX - Some lock is required on this section self.last_uid = self.last_uid + 1 m.uid = self.last_uid - self.getQueue(activity_tool).append(m) + self.getQueue(activity_tool_path).append(m) - def finishDeleteMessage(self, activity_tool, m): + def finishDeleteMessage(self, activity_tool_path, m): i = 0 - queue = self.getQueue(activity_tool) + queue = self.getQueue(activity_tool_path) for my_message in queue: if my_message.uid == m.uid: del queue[i] @@ -67,7 +64,8 @@ class RAMQueue(Queue): i = i + 1 def dequeueMessage(self, activity_tool, processing_node): - for m in self.getQueue(activity_tool): + path = activity_tool.getPhysicalPath() + for m in self.getQueue(path): if m.validate(self, activity_tool) is not VALID: self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling) get_transaction().commit() # Start a new transaction @@ -76,7 +74,7 @@ class RAMQueue(Queue): if m.is_executed: self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling) get_transaction().commit() # Start a new transaction - return 0 # Keep on ticking + return 0 # Keep on ticking else: # Start a new transaction and keep on to next message get_transaction().commit() @@ -88,8 +86,9 @@ class RAMQueue(Queue): object_path = object.getPhysicalPath() else: object_path = None - active_process = kw.get('active_process', None) - for m in self.getQueue(activity_tool): + active_process = kw.get('active_process', None) + path = activity_tool.getPhysicalPath() + for m in self.getQueue(path): # Filter active process and path if defined if active_process is None or m.active_process == active_process: if object_path is None or m.object_path == object_path: @@ -102,29 +101,31 @@ class RAMQueue(Queue): if object_path == m.object_path and (method_id is None or method_id == m.method_id): if m.validate(self, activity_tool) is not VALID: activity_tool.unregisterMessage(self, m) # Trash messages which are not validated (no error handling) - else: + else: if invoke: activity_tool.invoke(m) if m.is_executed: activity_tool.unregisterMessage(self, m) - else: + else: activity_tool.unregisterMessage(self, m) # Parse each message in queue - for m in self.getQueue(activity_tool): + path = activity_tool.getPhysicalPath() + for m in self.getQueue(path): if object_path == m.object_path and (method_id is None or method_id == m.method_id): if m.validate(self, activity_tool) is not VALID: self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling) - else: + else: if invoke: activity_tool.invoke(m) if m.is_executed: self.deleteMessage(activity_tool, m) # Only delete if no error happens - else: + else: self.deleteMessage(activity_tool, m) def getMessageList(self, activity_tool, processing_node=None,**kw): new_queue = [] - for m in self.getQueue(activity_tool): + path = activity_tool.getPhysicalPath() + for m in self.getQueue(path): m.processing_node = 1 m.priority = 0 new_queue.append(m) diff --git a/product/CMFActivity/ActivityBuffer.py b/product/CMFActivity/ActivityBuffer.py index d1a7ebbace..acef035e78 100644 --- a/product/CMFActivity/ActivityBuffer.py +++ b/product/CMFActivity/ActivityBuffer.py @@ -26,6 +26,7 @@ from Shared.DC.ZRDB.TM import TM from zLOG import LOG, ERROR, INFO import sys +from thread import allocate_lock, get_ident try: from transaction import get as get_transaction @@ -34,123 +35,120 @@ except ImportError: class ActivityBuffer(TM): - _p_oid=_p_changed=_registered=None + _p_oid=_p_changed=_registered=None - def __init__(self, activity_tool=None): - from thread import allocate_lock - self._use_TM = self._transactions = 1 - if self._use_TM: - self._tlock = allocate_lock() - self._tthread = None - self._lock = allocate_lock() - if activity_tool is not None: - self._activity_tool = activity_tool + def __init__(self, activity_tool=None): + self._use_TM = self._transactions = 1 + if self._use_TM: + self._tlock = allocate_lock() + self._tthread = None + self._lock = allocate_lock() - # Keeps a list of messages to add and remove - # at end of transaction - def _begin(self, *ignored): - from thread import get_ident - from ActivityTool import activity_list - self._tlock.acquire() - self._tthread = get_ident() - self.requires_prepare = 1 - try: - self.queued_activity = [] - self.flushed_activity = [] - for activity in activity_list: # Reset registration for each transaction - activity.registerActivityBuffer(self) - # In Zope 2.8 (ZODB 3.4), use beforeCommitHook instead of - # patching Trasaction. - transaction = get_transaction() - try: - transaction.beforeCommitHook(self.tpc_prepare, transaction) - except AttributeError: - pass - except: - LOG('ActivityBuffer', ERROR, "exception during _begin", - error=sys.exc_info()) - self._tlock.release() - raise + # Directly store the activity tool as an attribute. At the beginning + # the activity tool was stored as a part of the key in queued_activity and + # in flushed_activity, but this is not nice because in that case we must + # use hash on it, and when there is no uid on activity tool, it is + # impossible to generate a new uid because acquisition is not available + # in the dictionary. + assert activity_tool is not None + self._activity_tool = activity_tool - def _finish(self, *ignored): - from thread import get_ident - if not self._tlock.locked() or self._tthread != get_ident(): - LOG('ActivityBuffer', INFO, "ignoring _finish") - return - try: - try: - # Try to push / delete all messages - for (activity, message) in self.flushed_activity: - #LOG('ActivityBuffer finishDeleteMessage', ERROR, str(message.method_id)) - activity.finishDeleteMessage(self._activity_tool, message) - for (activity, message) in self.queued_activity: - #LOG('ActivityBuffer finishQueueMessage', ERROR, str(message.method_id)) - activity.finishQueueMessage(self._activity_tool, message) - except: - LOG('ActivityBuffer', ERROR, "exception during _finish", - error=sys.exc_info()) - raise - finally: - self._tlock.release() + # Referring to a persistent object is dangerous when finishing a transaction, + # so store only the required information. + self._activity_tool_path = activity_tool.getPhysicalPath() - def _abort(self, *ignored): - from thread import get_ident - if not self._tlock.locked() or self._tthread != get_ident(): - LOG('ActivityBuffer', 0, "ignoring _abort") - return - self._tlock.release() + # Keeps a list of messages to add and remove + # at end of transaction + def _begin(self, *ignored): + from ActivityTool import activity_list + self._tlock.acquire() + self._tthread = get_ident() + self.requires_prepare = 1 + try: + self.queued_activity = [] + self.flushed_activity = [] + for activity in activity_list: # Reset registration for each transaction + activity.registerActivityBuffer(self) + # In Zope 2.8 (ZODB 3.4), use beforeCommitHook instead of + # patching Trasaction. + transaction = get_transaction() + try: + transaction.beforeCommitHook(self.tpc_prepare, transaction) + except AttributeError: + pass + except: + LOG('ActivityBuffer', ERROR, "exception during _begin", + error=sys.exc_info()) + self._tlock.release() + raise - def tpc_prepare(self, transaction, sub=None): - if sub is not None: # Do nothing if it is a subtransaction - return - if not self.requires_prepare: return - self.requires_prepare = 0 - from thread import get_ident - if not self._tlock.locked() or self._tthread != get_ident(): - LOG('ActivityBuffer', 0, "ignoring tpc_prepare") - return - try: - # Try to push / delete all messages - for (activity, message) in self.flushed_activity: - #LOG('ActivityBuffer prepareDeleteMessage', ERROR, str(message.method_id)) - activity.prepareDeleteMessage(self._activity_tool, message) - activity_dict = {} - for (activity, message) in self.queued_activity: - key = activity - if key not in activity_dict: - activity_dict[key] = [] - activity_dict[key].append(message) - for key, message_list in activity_dict.items(): - activity = key - if hasattr(activity, 'prepareQueueMessageList'): - activity.prepareQueueMessageList(self._activity_tool, message_list) - else: - for message in message_list: - activity.prepareQueueMessage(self._activity_tool, message) - except: - LOG('ActivityBuffer', ERROR, "exception during tpc_prepare", - error=sys.exc_info()) - raise + def _finish(self, *ignored): + if not self._tlock.locked() or self._tthread != get_ident(): + LOG('ActivityBuffer', INFO, "ignoring _finish") + return + try: + try: + # Try to push / delete all messages + for (activity, message) in self.flushed_activity: + #LOG('ActivityBuffer finishDeleteMessage', ERROR, str(message.method_id)) + activity.finishDeleteMessage(self._activity_tool_path, message) + for (activity, message) in self.queued_activity: + #LOG('ActivityBuffer finishQueueMessage', ERROR, str(message.method_id)) + activity.finishQueueMessage(self._activity_tool_path, message) + except: + LOG('ActivityBuffer', ERROR, "exception during _finish", + error=sys.exc_info()) + raise + finally: + self._tlock.release() - def deferredQueueMessage(self, activity_tool, activity, message): - self._register() - # Directly store the activity tool as an attribute. At the beginning - # the activity tool was stored as a part of the key in queued_activity and - # in flushed_activity, but this is not nice because in that case we must - # use hash on it, and when there is no uid on activity tool, it is - # impossible to generate a new uid because acquisition is not available - # in the dictionnary. - if getattr(self,'_activity_tool',None) is None: - self._activity_tool = activity_tool - # Activity is called to prevent queuing some messages (useful for example - # to prevent reindexing objects multiple times) - if not activity.isMessageRegistered(self, activity_tool, message): - self.queued_activity.append((activity, message)) - # We register queued messages so that we can - # unregister them - activity.registerMessage(self, activity_tool, message) + def _abort(self, *ignored): + if not self._tlock.locked() or self._tthread != get_ident(): + LOG('ActivityBuffer', 0, "ignoring _abort") + return + self._tlock.release() - def deferredDeleteMessage(self, activity_tool, activity, message): - self._register() - self.flushed_activity.append((activity, message)) + def tpc_prepare(self, transaction, sub=None): + if sub is not None: # Do nothing if it is a subtransaction + return + if not self.requires_prepare: return + self.requires_prepare = 0 + if not self._tlock.locked() or self._tthread != get_ident(): + LOG('ActivityBuffer', 0, "ignoring tpc_prepare") + return + try: + # Try to push / delete all messages + for (activity, message) in self.flushed_activity: + #LOG('ActivityBuffer prepareDeleteMessage', ERROR, str(message.method_id)) + activity.prepareDeleteMessage(self._activity_tool, message) + activity_dict = {} + for (activity, message) in self.queued_activity: + key = activity + if key not in activity_dict: + activity_dict[key] = [] + activity_dict[key].append(message) + for key, message_list in activity_dict.items(): + activity = key + if hasattr(activity, 'prepareQueueMessageList'): + activity.prepareQueueMessageList(self._activity_tool, message_list) + else: + for message in message_list: + activity.prepareQueueMessage(self._activity_tool, message) + except: + LOG('ActivityBuffer', ERROR, "exception during tpc_prepare", + error=sys.exc_info()) + raise + def deferredQueueMessage(self, activity_tool, activity, message): + self._register() + # Activity is called to prevent queuing some messages (useful for example + # to prevent reindexing objects multiple times) + if not activity.isMessageRegistered(self, activity_tool, message): + self.queued_activity.append((activity, message)) + # We register queued messages so that we can + # unregister them + activity.registerMessage(self, activity_tool, message) + + def deferredDeleteMessage(self, activity_tool, activity, message): + self._register() + self.flushed_activity.append((activity, message)) -- 2.30.9