diff --git a/product/CMFActivity/ActiveObject.py b/product/CMFActivity/ActiveObject.py index 215dbff6bd19079ebfa6c977fc068f9265a72c75..f2e619b995027c5efb5561f15da60b9526bfea82 100755 --- a/product/CMFActivity/ActiveObject.py +++ b/product/CMFActivity/ActiveObject.py @@ -34,9 +34,13 @@ from Acquisition import aq_base from zLOG import LOG DEFAULT_ACTIVITY = 'SQLDict' -#DEFAULT_ACTIVITY = 'ZODBDict' -#DEFAULT_ACTIVITY = 'RAMDict' +# Processing node are used to store processing state or processing node +DISTRIBUTABLE_STATE = -1 +INVOKE_ERROR_STATE = -2 +VALIDATE_ERROR_STATE = -3 +STOP_STATE = -4 +POSITIVE_NODE_STATE = 'Positive Node State' # Special state which allows to select positive nodes class ActiveObject(ExtensionClass.Base): @@ -95,6 +99,20 @@ class ActiveObject(ExtensionClass.Base): # there can not be any activity return 0 + security.declareProtected( CMFCorePermissions.View, 'hasErrorActivity' ) + def hasErrorActivity(self, **kw): + """ + Tells if an object if active + """ + return self.hasActivity(processing_node = INVOKE_ERROR_STATE) + + security.declareProtected( CMFCorePermissions.View, 'hasInvalidActivity' ) + def hasInvalidActivity(self, **kw): + """ + Tells if an object if active + """ + return self.hasActivity(processing_node = VALIDATE_ERROR_STATE) + security.declareProtected( CMFCorePermissions.View, 'getActiveProcess' ) def getActiveProcess(self): activity_tool = getattr(self, 'portal_activities', None) diff --git a/product/CMFActivity/ActiveProcess.py b/product/CMFActivity/ActiveProcess.py index 561173c8a1e7b5a01e7acf1db7f2af75f5c5c744..2c7962a3288cee589ae4af2a7843abdaa4f4a20d 100755 --- a/product/CMFActivity/ActiveProcess.py +++ b/product/CMFActivity/ActiveProcess.py @@ -33,6 +33,7 @@ from Products.CMFCore import CMFCorePermissions from Products.ERP5Type.Base import Base from Products.ERP5Type import PropertySheet from BTrees.IOBTree import IOBTree +from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from zLOG import LOG @@ -98,7 +99,8 @@ class ActiveProcess(Base): def postResult(self, result): if not hasattr(self, 'result_list'): self.result_list = IOBTree() - self.result_list[self._generateNewId()] = result + result.id = self._generateNewId() + self.result_list[result.id] = result security.declareProtected(CMFCorePermissions.ManagePortal, 'getResultList') def getResultList(self, **kw): @@ -126,4 +128,42 @@ class ActiveProcess(Base): #if callable(result): # return self.activateResult(Result(self, 'activateResult',result()) + security.declareProtected( CMFCorePermissions.View, 'hasActivity' ) + def hasActivity(self, **kw): + """ + Tells if an object if active + """ + activity_tool = getattr(self, 'portal_activities', None) + if activity_tool is None: return 0 # Do nothing if no portal_activities + return activity_tool.hasActivity(None, active_process = self, **kw) + + security.declareProtected( CMFCorePermissions.View, 'hasErrorActivity' ) + def hasErrorActivity(self, **kw): + """ + Tells if an object if active + """ + return self.hasActivity(processing_node = INVOKE_ERROR_STATE) + + security.declareProtected( CMFCorePermissions.View, 'hasInvalidActivity' ) + def hasInvalidActivity(self, **kw): + """ + Tells if an object if active + """ + return self.hasActivity(processing_node = VALIDATE_ERROR_STATE) + + def start(): + # start activities related to this process + pass + + def stop(): + # stop activities related to this process + pass + + def flush(self): + # flush activities related to this process + activity_tool = getattr(self, 'portal_activities', None) + if activity_tool is None: return # Do nothing if no portal_activities + return activity_tool.flush(None, active_process = self, invoke = 0) # FLush + + InitializeClass( ActiveProcess ) diff --git a/product/CMFActivity/Activity/Queue.py b/product/CMFActivity/Activity/Queue.py index eceb343b1d6b0cfff8d82bf8412ccb558e2cc8c3..627bb7fc7f99e8f49d91d60d35a14357228ef84f 100755 --- a/product/CMFActivity/Activity/Queue.py +++ b/product/CMFActivity/Activity/Queue.py @@ -123,12 +123,20 @@ class Queue: def isAwake(self, activity_tool, processing_node): return self.is_awake[processing_node] - def hasActivity(self, activity_tool, object, **kw): + def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw): return 0 def flush(self, activity_tool, object, **kw): pass + def start(self, active_process=None): + # Start queue / activities in queue for given process + pass + + def stop(self, active_process=None): + # Stop queue / activities in queue for given process + pass + def loadMessage(self, s): return pickle.loads(s) @@ -137,3 +145,4 @@ class Queue: def getMessageList(self, activity_tool, processing_node=None): return [] + diff --git a/product/CMFActivity/Activity/RAMDict.py b/product/CMFActivity/Activity/RAMDict.py index a0d893dc435598314b300a9f0fd6715330371887..80ce47b6fd73b3002711aed781043a22ed019b51 100755 --- a/product/CMFActivity/Activity/RAMDict.py +++ b/product/CMFActivity/Activity/RAMDict.py @@ -28,6 +28,7 @@ from Products.CMFActivity.ActivityTool import registerActivity from Queue import Queue +from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from zLOG import LOG @@ -57,7 +58,7 @@ class RAMDict(Queue): return 0 return 1 - def hasActivity(self, object, method_id=None, **kw): + def hasActivity(self, activity_tool, object, **kw): object_path = object.getPhysicalPath() for m in self.dict.values(): if m.object_path == object_path: diff --git a/product/CMFActivity/Activity/RAMQueue.py b/product/CMFActivity/Activity/RAMQueue.py index dc3ea0394810012f7a0ed2cd9c5991b2ee9b804d..3f244b55d8d2222df9643b6002dd090d7c8dd432 100755 --- a/product/CMFActivity/Activity/RAMQueue.py +++ b/product/CMFActivity/Activity/RAMQueue.py @@ -28,6 +28,7 @@ from Products.CMFActivity.ActivityTool import registerActivity from Queue import Queue +from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE class RAMQueue(Queue): """ @@ -49,7 +50,7 @@ class RAMQueue(Queue): del self.queue[0] return 0 # Keep on ticking - def hasActivity(self, object, method_id=None, **kw): + def hasActivity(self, activity_tool, object, **kw): object_path = object.getPhysicalPath() for m in self.queue: if m.object_path == object_path: diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 4868174c879129e8ea4136917481d149a1f9674d..6bf82019cfd0827e358f341f29471f2b499222ef 100755 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -29,15 +29,12 @@ import random from Products.CMFActivity.ActivityTool import registerActivity from RAMDict import RAMDict +from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from zLOG import LOG MAX_PRIORITY = 5 -DISTRIBUTABLE_STATE = -1 -INVOKE_ERROR_STATE = -2 -VALIDATE_ERROR_STATE = -3 - priority_weight = \ [1] * 64 + \ [2] * 20 + \ @@ -60,6 +57,7 @@ class SQLDict(RAMDict): method_id = m.method_id, priority = m.activity_kw.get('priority', 1), message = self.dumpMessage(m)) + # Also store uid of activity def dequeueMessage(self, activity_tool, processing_node): priority = random.choice(priority_weight) @@ -88,6 +86,7 @@ class SQLDict(RAMDict): if len(uid_list) > 0: activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE) # Assign message back to 'error' state + m.notifyUser(activity_tool) # Notify Error get_transaction().commit() # and commit else: # Lower priority @@ -100,8 +99,13 @@ class SQLDict(RAMDict): activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? if m.is_executed: # Make sure message could be invoked if len(uid_list) > 0: - activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it + activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it get_transaction().commit() # If successful, commit + if m.active_process: + active_process = activity_tool.unrestrictedTraverse(m.active_process) + if not active_process.hasActivity(): + # Not more activity + m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ??? else: get_transaction().abort() # If not, abort transaction and start a new one if line.priority > MAX_PRIORITY: @@ -120,9 +124,9 @@ class SQLDict(RAMDict): get_transaction().commit() # Release locks before starting a potentially long calculation return 1 - def hasActivity(self, activity_tool, object, method_id=None, **kw): + def hasActivity(self, activity_tool, object, **kw): my_object_path = '/'.join(object.getPhysicalPath()) - result = activity_tool.SQLDict_hasMessage(path=my_object_path, method_id=method_id) + result = activity_tool.SQLDict_hasMessage(path=my_object_path, method_id=method_id, **kw) if len(result) > 0: return result[0].message_count > 0 return 0 @@ -169,6 +173,14 @@ class SQLDict(RAMDict): if len(uid_list) > 0: activity_tool.SQLDict_delMessage(uid = uid_list) # Delete all "old" messages (not -1 processing) + def start(self, activity_tool, active_process=None): + uid_list = activity_tool.SQLDict_readUidList(path=path, active_process=active_process) + activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = DISTRIBUTABLE_STATE) + + def stop(self, activity_tool, active_process=None): + uid_list = activity_tool.SQLDict_readUidList(path=path, active_process=active_process) + activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = STOP_STATE) + def getMessageList(self, activity_tool, processing_node=None): # YO: reading all lines might cause a deadlock message_list = [] diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index bf8a61087f128431df971ec4cecde9c60035d90b..352b7892c0806e58fc5ccc74d0ff066a76c697e8 100755 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -29,15 +29,12 @@ import random from Products.CMFActivity.ActivityTool import registerActivity from RAMQueue import RAMQueue +from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from zLOG import LOG MAX_PRIORITY = 5 -DISTRIBUTABLE_STATE = -1 -INVOKE_ERROR_STATE = -2 -VALIDATE_ERROR_STATE = -3 - priority_weight = \ [1] * 64 + \ [2] * 20 + \ @@ -99,6 +96,7 @@ class SQLQueue(RAMQueue): # This is an error activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE) # Assign message back to 'error' state + m.notifyUser(activity_tool) # Notify Error get_transaction().commit() # and commit else: # Lower priority @@ -108,9 +106,9 @@ class SQLQueue(RAMQueue): get_transaction().commit() # Release locks before starting a potentially long calculation return 1 - def hasActivity(self, activity_tool, object, method_id=None, **kw): + def hasActivity(self, activity_tool, object, **kw): my_object_path = '/'.join(object.getPhysicalPath()) - result = activity_tool.SQLQueue_hasMessage(path=my_object_path, method_id=method_id) + result = activity_tool.SQLQueue_hasMessage(path=my_object_path, **kw) if len(result) > 0: return result[0].message_count > 0 return 0 @@ -155,6 +153,14 @@ class SQLQueue(RAMQueue): # Erase all messages in a single transaction activity_tool.SQLQueue_delMessage(path=path, method_id=method_id) # Delete all "old" messages (not -1 processing) + def start(self, activity_tool, active_process=None): + uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process) + activity_tool.SQLQueue_assignMessage(uid = uid_list, processing_node = DISTRIBUTABLE_STATE) + + def stop(self, activity_tool, active_process=None): + uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process) + activity_tool.SQLQueue_assignMessage(uid = uid_list, processing_node = STOP_STATE) + def getMessageList(self, activity_tool, processing_node=None): message_list = [] result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None) diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 6c30845dae8c6d611911b832770ce2330381c1bb..09e27982fcf312464606c42106e9e47faf8cf35d 100755 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -34,9 +34,9 @@ from Products.CMFCore.utils import UniqueObject, _checkPermission, _getAuthentic from Globals import InitializeClass, DTMLFile, get_request from Acquisition import aq_base from DateTime.DateTime import DateTime +from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE import threading - from zLOG import LOG # Using a RAM property (not a property of an instance) allows @@ -60,7 +60,7 @@ def registerActivity(activity): class Result: - def __init__(self, object_or_path, method_id, result, title=None, id=None, message=None): + def __init__(self, object_or_path, method_id, result, log_title=None, log_id=None, log_message=None): # Some utility function to do this would be useful since we use it everywhere XXX if type(object_or_path) in (type([]), type(())): url = '/'.join(object_or_path) @@ -71,12 +71,13 @@ class Result: else: path = object_or_path.getPhysicalPath() url = '/'.join(path) - self.path = path - self.url = url - self.result = result # Include arbitrary result - self.title = title # Should follow Zope convention for LOG title - self.id = id # Should follow Zope convention for LOG ids - self.message = message # Should follow Zope convention for LOG message + self.object_path = path + self.object_url = url + self.method_id = method_id + self.result = result # Include arbitrary result + self.log_title = log_title # Should follow Zope convention for LOG title + self.log_id = log_id # Should follow Zope convention for LOG ids + self.log_message = log_message # Should follow Zope convention for LOG message allow_class(Result) @@ -92,12 +93,14 @@ class Message: self.active_process = None else: self.active_process = active_process.getPhysicalPath() + self.active_process_uid = active_process.getUid() self.activity_kw = activity_kw self.method_id = method_id self.args = args self.kw = kw self.is_executed = 0 - # User Info ? REQUEST Info ? + self.user_name = str(_getAuthenticatedUser(self)) + # Store REQUEST Info ? def __call__(self, activity_tool): try: @@ -121,6 +124,20 @@ class Message: def validate(self, activity, activity_tool): return activity.validate(activity_tool, self, **self.activity_kw) + def notifyUser(self, activity_tool, message="Failed Processing Activity"): + user_email = activity_tool.portal_memberdata.getProperty('email') + mail_text = """From: %s +To: %s +Subject: %s + +%s + +Document: %s +Method: %s + """ % (activity_tool.email_from_address, user_email, + message, message, '/'.join(self.object_path), self.method_id) + activity_tool.MailHost.send( mail_text ) + class Method: def __init__(self, passive_self, activity, active_process, kw, method_id): @@ -293,6 +310,20 @@ class ActivityTool (Folder, UniqueObject): LOG('CMFActivity: ', 0, 'flushing activity %s' % activity.__class__.__name__) activity.flush(self, object_path, invoke=invoke, **kw) + def start(self, **kw): + global is_initialized + if not is_initialized: self.initialize() + for activity in activity_list: + LOG('CMFActivity: ', 0, 'starting activity %s' % activity.__class__.__name__) + activity.start(self, **kw) + + def stop(self, **kw): + global is_initialized + if not is_initialized: self.initialize() + for activity in activity_list: + LOG('CMFActivity: ', 0, 'starting activity %s' % activity.__class__.__name__) + activity.stop(self, **kw) + def invoke(self, message): message(self)