diff --git a/product/CMFActivity/Activity/Queue.py b/product/CMFActivity/Activity/Queue.py index 627bb7fc7f99e8f49d91d60d35a14357228ef84f..72071b0c7a86cb8b306750cf8c613736ef26ff82 100755 --- a/product/CMFActivity/Activity/Queue.py +++ b/product/CMFActivity/Activity/Queue.py @@ -80,8 +80,19 @@ class Queue: self.is_initialized = 1 def queueMessage(self, activity_tool, m): - pass - + activity_tool.deferredQueueMessage(self, m) + m.is_queued = 1 + + def deleteMessage(self, activity_tool, m): + activity_tool.deferredDeleteMessage(self, m) + m.is_deleted = 1 + + def isDeleted(self, m): + return m.is_deleted + + def isQueued(self, m): + return m.is_queued + def dequeueMessage(self, activity_tool, processing_node): pass @@ -137,8 +148,10 @@ class Queue: # Stop queue / activities in queue for given process pass - def loadMessage(self, s): - return pickle.loads(s) + def loadMessage(self, s, **kw): + m = pickle.loads(s) + m.__dict__.update(kw) + return m def dumpMessage(self, m): return pickle.dumps(m) @@ -146,3 +159,16 @@ class Queue: def getMessageList(self, activity_tool, processing_node=None): return [] + # Transaction Management + def prepareQueueMessage(self, activity_tool, m): + pass + + def finishQueueMessage(self, activity_tool, m): + pass + + def prepareDequeueMessage(self, activity_tool, m): + pass + + def finishDequeueMessage(self, activity_tool, m): + pass + \ No newline at end of file diff --git a/product/CMFActivity/Activity/RAMDict.py b/product/CMFActivity/Activity/RAMDict.py index 80ce47b6fd73b3002711aed781043a22ed019b51..9b630540942803e3956316790e6ba4d29e23087e 100755 --- a/product/CMFActivity/Activity/RAMDict.py +++ b/product/CMFActivity/Activity/RAMDict.py @@ -45,9 +45,14 @@ class RAMDict(Queue): Queue.__init__(self) self.dict = {} - def queueMessage(self, activity_tool, m): + def finishQueueMessage(self, activity_tool, m): self.dict[(m.object_path, m.method_id)] = m + def finishDeleteMessage(self, activity_tool, message): + for key, m in self.dict.items(): + if m.object_path == message.object_path and m.method_id == message.method_id: + del self.dict[(m.object_path, m.method_id)] + def dequeueMessage(self, activity_tool, processing_node): if len(self.dict.keys()) is 0: return 1 # Go to sleep @@ -67,13 +72,14 @@ class RAMDict(Queue): def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): for key, m in self.dict.items(): - if m.object_path == object_path: - LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) - if invoke: activity_tool.invoke(m) - del self.dict[key] - else: - pass - #LOG('CMFActivity RAMDict: ', 0, 'not flushing object %s' % '/'.join(m.object_path)) + if not m.is_deleted: + if m.object_path == object_path: + LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) + if invoke: activity_tool.invoke(m) + self.deleteMessage(m) + else: + pass + #LOG('CMFActivity RAMDict: ', 0, 'not flushing object %s' % '/'.join(m.object_path)) def getMessageList(self, activity_tool, processing_node=None): return self.dict.values() diff --git a/product/CMFActivity/Activity/RAMQueue.py b/product/CMFActivity/Activity/RAMQueue.py index b85af140511b46498f8205ae6ec890af7e19431d..a63d622628e9c4f26d60036d511927c27e4878c0 100755 --- a/product/CMFActivity/Activity/RAMQueue.py +++ b/product/CMFActivity/Activity/RAMQueue.py @@ -34,20 +34,31 @@ class RAMQueue(Queue): """ A simple RAM based queue """ - + message_queue_id = 0 + def __init__(self): Queue.__init__(self) self.queue = [] - def queueMessage(self, activity_tool, m): + def finishQueueMessage(self, activity_tool, m): + self.message_queue_id = self.message_queue_id + 1 + m.message_queue_id = self.message_queue_id self.queue.append(m) + def finishDeleteMessage(self, activity_tool, m): + i = 0 + for my_message in self.queue: + if my_message.message_queue_id == m.message_queue_id: + del self.queue[i] + return + i = i + 1 + def dequeueMessage(self, activity_tool, processing_node): if len(self.queue) is 0: return 1 # Go to sleep m = self.queue[0] activity_tool.invoke(m) - del self.queue[0] + self.deleteMessage(m) return 0 # Keep on ticking def hasActivity(self, activity_tool, object, **kw): @@ -58,14 +69,11 @@ class RAMQueue(Queue): return 0 def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): - new_queue = [] for m in self.queue: - if m.object_path == object_path: - if invoke: - activity_tool.invoke(m) - else: - new_queue.append(m) - self.queue = new_queue + if not m.is_deleted: + if m.object_path == object_path: + if invoke: activity_tool.invoke(m) + self.deleteMessage(m) def getMessageList(self, activity_tool, processing_node=None): new_queue = [] diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 6bf82019cfd0827e358f341f29471f2b499222ef..787f6c72ba18bd211b25bd1dbf62540fbd458aa8 100755 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -52,13 +52,19 @@ class SQLDict(RAMDict): because use of OOBTree. """ - def queueMessage(self, activity_tool, m): + def prepareQueueMessage(self, activity_tool, m): activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , method_id = m.method_id, priority = m.activity_kw.get('priority', 1), message = self.dumpMessage(m)) # Also store uid of activity + def prepareDeleteMessage(self, activity_tool, m): + # Erase all messages in a single transaction + uid_list = activity_tool.SQLDict_readUidList(path=m.object_path, method_id=m.method_id,processing_node=None) + uid_list = map(lambda x:x.uid, uid_list) + activity_tool.SQLDict_delMessage(uid = uid_list) + def dequeueMessage(self, activity_tool, processing_node): priority = random.choice(priority_weight) # Try to find a message at given priority level @@ -144,20 +150,19 @@ class SQLDict(RAMDict): NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible """ path = '/'.join(object_path) - uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,processing_node=None) - uid_list = map(lambda x:x.uid, uid_list) # LOG('Flush', 0, str((path, invoke, method_id))) - if invoke: - result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) - method_dict = {} - # Parse each message - for line in result: - path = line.path - method_id = line.method_id - if not method_dict.has_key(method_id): - # Only invoke once (it would be different for a queue) - method_dict[method_id] = 1 - m = self.loadMessage(line.message) + result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) + method_dict = {} + # Parse each message + for line in result: + path = line.path + method_id = line.method_id + if not method_dict.has_key(method_id): + # Only invoke once (it would be different for a queue) + method_dict[method_id] = 1 + m = self.loadMessage(line.message) + self.deleteMessage(m) + if invoke: # First Validate if m.validate(self, activity_tool): activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? @@ -169,9 +174,6 @@ class SQLDict(RAMDict): # The message no longer exists raise ActivityFlushError, ( 'The document %s does not exist' % path) - # Erase all messages in a single transaction - if len(uid_list) > 0: - activity_tool.SQLDict_delMessage(uid = uid_list) # Delete all "old" messages (not -1 processing) def start(self, activity_tool, active_process=None): uid_list = activity_tool.SQLDict_readUidList(path=path, active_process=active_process) diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index 352b7892c0806e58fc5ccc74d0ff066a76c697e8..b4b12edc8d156dde75ea31c6e099a923b5758072 100755 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -52,12 +52,16 @@ class SQLQueue(RAMQueue): because use of OOBTree. """ - def queueMessage(self, activity_tool, m): + def prepareQueueMessage(self, activity_tool, m): activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) , method_id = m.method_id, priority = m.activity_kw.get('priority', 1), message = self.dumpMessage(m)) + def prepareDeleteMessage(self, activity_tool, m): + # Erase all messages in a single transaction + activity_tool.SQLQueue_delMessage(uid = m.uid) + def dequeueMessage(self, activity_tool, processing_node): priority = random.choice(priority_weight) # Try to find a message at given priority level @@ -128,17 +132,18 @@ class SQLQueue(RAMQueue): return # Do nothing here to precent overlocking path = '/'.join(object_path) # LOG('Flush', 0, str((path, invoke, method_id))) - if invoke: - result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) - method_dict = {} - # Parse each message - for line in result: - path = line.path - method_id = line.method_id - if not method_dict.has_key(method_id): - # Only invoke once (it would be different for a queue) - method_dict[method_id] = 1 - m = self.loadMessage(line.message) + result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) + method_dict = {} + # Parse each message + for line in result: + path = line.path + method_id = line.method_id + if not method_dict.has_key(method_id): + # Only invoke once (it would be different for a queue) + method_dict[method_id] = 1 + m = self.loadMessage(line.message, uid = line.uid) + self.deleteMessage(m) + if invoke: # First Validate if m.validate(self, activity_tool): activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? @@ -150,8 +155,6 @@ class SQLQueue(RAMQueue): # The message no longer exists raise ActivityFlushError, ( 'The document %s does not exist' % path) - # Erase all messages in a single transaction - activity_tool.SQLQueue_delMessage(path=path, method_id=method_id) # Delete all "old" messages (not -1 processing) def start(self, activity_tool, active_process=None): uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process) diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 7b8fc0fb39d528f1e7295e6f9c18123687e3743a..8189ba84c8e7e844cf15d71d9853298bf1fad41b 100755 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -35,6 +35,7 @@ from Globals import InitializeClass, DTMLFile, get_request from Acquisition import aq_base from DateTime.DateTime import DateTime from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE +from ActivityBuffer import ActivityBuffer import threading from zLOG import LOG @@ -82,6 +83,10 @@ class Result: allow_class(Result) class Message: + + is_deleted = 0 + is_queued = 0 + def __init__(self, object, active_process, activity_kw, method_id, args, kw): if type(object) is type('a'): self.object_path = object.split('/') @@ -203,7 +208,7 @@ class ActivityTool (Folder, UniqueObject): def __init__(self): return Folder.__init__(self, ActivityTool.id) - + # Filter content (ZMI)) def filtered_meta_types(self, user=None): # Filters the list of available meta types. @@ -304,8 +309,15 @@ class ActivityTool (Folder, UniqueObject): def activate(self, object, activity, active_process, **kw): global is_initialized if not is_initialized: self.initialize() + if not hasattr(self, '_v_activity_buffer'): self._v_activity_buffer = ActivityBuffer() return ActiveWrapper(object, activity, active_process, **kw) + def deferredQueueMessage(self, activity, message): + self._v_activity_buffer.deferredQueueMessage(self, activity, message) + + def deferredDeleteMessage(self, activity, message): + self._v_activity_buffer.deferredDeleteMessage(self, activity, message) + def flush(self, object, invoke=0, **kw): global is_initialized if not is_initialized: self.initialize() @@ -400,4 +412,4 @@ class ActivityTool (Folder, UniqueObject): return None -InitializeClass(ActivityTool) +InitializeClass(ActivityTool) \ No newline at end of file