diff --git a/product/CMFActivity/Activity/RAMQueue.py b/product/CMFActivity/Activity/RAMQueue.py index cd55c36933be29ae5b9579de3529c14e047ac769..2640adca3d85b57a1dcc6ac87a0f802eb3dc4db4 100755 --- a/product/CMFActivity/Activity/RAMQueue.py +++ b/product/CMFActivity/Activity/RAMQueue.py @@ -36,57 +36,61 @@ class RAMQueue(Queue): """ def __init__(self): Queue.__init__(self) - self.queue = [] + self.queue_dict = {} self.last_uid = 0 + def getQueue(self, activity_tool): + path = activity_tool.getPhysicalPath() + if not self.queue_dict.has_key(path): + self.queue_dict[path] = [] + return self.queue_dict[path] + def finishQueueMessage(self, activity_tool, m): if m.is_registered: # XXX - Some lock is required on this section self.last_uid = self.last_uid + 1 m.uid = self.last_uid - self.queue.append(m) + self.getQueue(activity_tool).append(m) def finishDeleteMessage(self, activity_tool, m): i = 0 - for my_message in self.queue: + queue = self.getQueue(activity_tool) + for my_message in queue: if my_message.uid == m.uid: - del self.queue[i] + del queue[i] return i = i + 1 def dequeueMessage(self, activity_tool, processing_node): - if len(self.queue) is 0: + if len(self.getQueue(activity_tool)) is 0: return 1 # Go to sleep - m = self.queue[0] + m = self.getQueue(activity_tool)[0] activity_tool.invoke(m) self.deleteMessage(activity_tool, m) return 0 # Keep on ticking def hasActivity(self, activity_tool, object, **kw): - if object is not None: - object_path = object.getPhysicalPath() - for m in self.queue: - if list(m.object_path) == list(object_path): - return 1 - else: - return 1 # Default behaviour if no object specified is to return 1 until active_process implemented + object_path = object.getPhysicalPath() + for m in self.getQueue(activity_tool): + if m.object_path == object_path: + return 1 return 0 def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): # Parse each message in registered for m in activity_tool.getRegisteredMessageList(self): - if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id): + if object_path == m.object_path and (method_id is None or method_id == m.method_id): if invoke: activity_tool.invoke(m) activity_tool.unregisterMessage(self, m) # Parse each message in queue - for m in self.queue: - if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id): + for m in self.getQueue(activity_tool): + if object_path == m.object_path and (method_id is None or method_id == m.method_id): if invoke: activity_tool.invoke(m) self.deleteMessage(activity_tool, m) def getMessageList(self, activity_tool, processing_node=None): new_queue = [] - for m in self.queue: + for m in self.getQueue(activity_tool): m.processing_node = 1 m.priority = 0 new_queue.append(m) diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 1a363f5cf4e913b329da6c364596de8126f88982..9eefeedebeb195f279a14acb3873cfcb980146f5 100755 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -94,79 +94,81 @@ class SQLDict(RAMDict): # Queue semantic def dequeueMessage(self, activity_tool, processing_node): - priority = random.choice(priority_weight) - # Try to find a message at given priority level - result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority) - if len(result) == 0: - # If empty, take any message - priority = None + if hasattr(activity_tool,'SQLDict_readMessageList'): + priority = random.choice(priority_weight) + # Try to find a message at given priority level result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority) - if len(result) > 0: - line = result[0] - path = line.path - method_id = line.method_id - uid_list = activity_tool.SQLDict_readUidList( path=path, method_id= method_id, processing_node = None ) - uid_list = map(lambda x:x.uid, uid_list) - # Make sure message can not be processed anylonger - if len(uid_list) > 0: - activity_tool.SQLDict_processMessage(uid = uid_list) - get_transaction().commit() # Release locks before starting a potentially long calculation - # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state - m = self.loadMessage(line.message, uid = line.uid) - # Make sure object exists - if not m.validate(self, activity_tool): - if line.priority > MAX_PRIORITY: - # This is an error - if len(uid_list) > 0: - activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE) - # Assign message back to 'error' state - #m.notifyUser(activity_tool) # Notify Error - get_transaction().commit() # and commit - else: - # Lower priority - if len(uid_list) > 0: - activity_tool.SQLDict_setPriority(uid = uid_list, - priority = line.priority + 1) - get_transaction().commit() # Release locks before starting a potentially long calculation - else: - # Try to invoke - activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? - if m.is_executed: # Make sure message could be invoked - if len(uid_list) > 0: - activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it - get_transaction().commit() # If successful, commit - if m.active_process: - active_process = activity_tool.unrestrictedTraverse(m.active_process) - if not active_process.hasActivity(): - # Not more activity - m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ??? - else: - get_transaction().abort() # If not, abort transaction and start a new one + if len(result) == 0: + # If empty, take any message + priority = None + result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority) + if len(result) > 0: + line = result[0] + path = line.path + method_id = line.method_id + uid_list = activity_tool.SQLDict_readUidList( path=path, method_id= method_id, processing_node = None ) + uid_list = map(lambda x:x.uid, uid_list) + # Make sure message can not be processed anylonger + if len(uid_list) > 0: + activity_tool.SQLDict_processMessage(uid = uid_list) + get_transaction().commit() # Release locks before starting a potentially long calculation + # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state + m = self.loadMessage(line.message, uid = line.uid) + # Make sure object exists + if not m.validate(self, activity_tool): if line.priority > MAX_PRIORITY: # This is an error if len(uid_list) > 0: - activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE) + activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE) # Assign message back to 'error' state - m.notifyUser(activity_tool) # Notify Error + #m.notifyUser(activity_tool) # Notify Error get_transaction().commit() # and commit else: # Lower priority if len(uid_list) > 0: activity_tool.SQLDict_setPriority(uid = uid_list, - priority = line.priority + 1) + priority = line.priority + 1) get_transaction().commit() # Release locks before starting a potentially long calculation - return 0 - get_transaction().commit() # Release locks before starting a potentially long calculation + else: + # Try to invoke + activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? + if m.is_executed: # Make sure message could be invoked + if len(uid_list) > 0: + activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it + get_transaction().commit() # If successful, commit + if m.active_process: + active_process = activity_tool.unrestrictedTraverse(m.active_process) + if not active_process.hasActivity(): + # Not more activity + m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ??? + else: + get_transaction().abort() # If not, abort transaction and start a new one + if line.priority > MAX_PRIORITY: + # This is an error + if len(uid_list) > 0: + activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE) + # Assign message back to 'error' state + m.notifyUser(activity_tool) # Notify Error + get_transaction().commit() # and commit + else: + # Lower priority + if len(uid_list) > 0: + activity_tool.SQLDict_setPriority(uid = uid_list, + priority = line.priority + 1) + get_transaction().commit() # Release locks before starting a potentially long calculation + return 0 + get_transaction().commit() # Release locks before starting a potentially long calculation return 1 def hasActivity(self, activity_tool, object, **kw): - if object is not None: - my_object_path = '/'.join(object.getPhysicalPath()) - result = activity_tool.SQLDict_hasMessage(path=my_object_path, **kw) - if len(result) > 0: - return result[0].message_count > 0 - else: - return 1 # Default behaviour if no object specified is to return 1 until active_process implemented + if hasattr(activity_tool,'SQLDict_readMessageList'): + if object is not None: + my_object_path = '/'.join(object.getPhysicalPath()) + result = activity_tool.SQLDict_hasMessage(path=my_object_path, **kw) + if len(result) > 0: + return result[0].message_count > 0 + else: + return 1 # Default behaviour if no object specified is to return 1 until active_process implemented return 0 def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw): @@ -184,11 +186,34 @@ class SQLDict(RAMDict): path = '/'.join(object_path) # LOG('Flush', 0, str((path, invoke, method_id))) method_dict = {} - # Parse each message in registered - for m in activity_tool.getRegisteredMessageList(self): - if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id): - activity_tool.unregisterMessage(self, m) + if hasattr(activity_tool,'SQLDict_readMessageList'): + # Parse each message in registered + for m in activity_tool.getRegisteredMessageList(self): + if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id): + activity_tool.unregisterMessage(self, m) + if not method_dict.has_key(method_id): + if invoke: + # First Validate + if m.validate(self, activity_tool): + activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? + if not m.is_executed: # Make sure message could be invoked + # The message no longer exists + raise ActivityFlushError, ( + 'Could not evaluate %s on %s' % (method_id , path)) + else: + # The message no longer exists + raise ActivityFlushError, ( + 'The document %s does not exist' % path) + # Parse each message in SQL dict + result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) + for line in result: + path = line.path + method_id = line.method_id if not method_dict.has_key(method_id): + # Only invoke once (it would be different for a queue) + method_dict[method_id] = 1 + m = self.loadMessage(line.message, uid = line.uid) + self.deleteMessage(activity_tool, m) if invoke: # First Validate if m.validate(self, activity_tool): @@ -200,29 +225,7 @@ class SQLDict(RAMDict): else: # The message no longer exists raise ActivityFlushError, ( - 'The document %s does not exist' % path) - # Parse each message in SQL dict - result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) - for line in result: - path = line.path - method_id = line.method_id - if not method_dict.has_key(method_id): - # Only invoke once (it would be different for a queue) - method_dict[method_id] = 1 - m = self.loadMessage(line.message, uid = line.uid) - self.deleteMessage(activity_tool, m) - if invoke: - # First Validate - if m.validate(self, activity_tool): - activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? - if not m.is_executed: # Make sure message could be invoked - # The message no longer exists - raise ActivityFlushError, ( - 'Could not evaluate %s on %s' % (method_id , path)) - else: - # The message no longer exists - raise ActivityFlushError, ( - 'The document %s does not exist' % path) + 'The document %s does not exist' % path) # def start(self, activity_tool, active_process=None): # uid_list = activity_tool.SQLDict_readUidList(path=path, active_process=active_process) @@ -235,28 +238,30 @@ class SQLDict(RAMDict): def getMessageList(self, activity_tool, processing_node=None): # YO: reading all lines might cause a deadlock message_list = [] - result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None) - for line in result: - m = self.loadMessage(line.message, uid = line.uid) - m.processing_node = line.processing_node - m.priority = line.priority - message_list.append(m) + if hasattr(activity_tool,'SQLDict_readMessageList'): + result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None) + for line in result: + m = self.loadMessage(line.message, uid = line.uid) + m.processing_node = line.processing_node + m.priority = line.priority + message_list.append(m) return message_list def distribute(self, activity_tool, node_count): processing_node = 1 - result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages - get_transaction().commit() # Release locks before starting a potentially long calculation - path_dict = {} - for line in result: - path = line.path - if not path_dict.has_key(path): - # Only assign once (it would be different for a queue) - path_dict[path] = 1 - activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None) - get_transaction().commit() # Release locks immediately to allow processing of messages - processing_node = processing_node + 1 - if processing_node > node_count: - processing_node = 1 # Round robin + if hasattr(activity_tool,'SQLDict_readMessageList'): + result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages + get_transaction().commit() # Release locks before starting a potentially long calculation + path_dict = {} + for line in result: + path = line.path + if not path_dict.has_key(path): + # Only assign once (it would be different for a queue) + path_dict[path] = 1 + activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None) + get_transaction().commit() # Release locks immediately to allow processing of messages + processing_node = processing_node + 1 + if processing_node > node_count: + processing_node = 1 # Round robin registerActivity(SQLDict) diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index 5c356fe3dc1b2c8639033328e37fbb166642ad63..b3004de404a9c0b1ad6153fdc64d270cf5a7dd25 100755 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -65,62 +65,64 @@ class SQLQueue(RAMQueue): activity_tool.SQLQueue_delMessage(uid = m.uid) def dequeueMessage(self, activity_tool, processing_node): - priority = random.choice(priority_weight) - # Try to find a message at given priority level - result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority) - if len(result) == 0: - # If empty, take any message - result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None) - if len(result) > 0: - line = result[0] - path = line.path - method_id = line.method_id - # Make sure message can not be processed anylonger - activity_tool.SQLQueue_processMessage(uid=line.uid) - get_transaction().commit() # Release locks before starting a potentially long calculation - m = self.loadMessage(line.message) - # Make sure object exists - if not m.validate(self, activity_tool): - if line.priority > MAX_PRIORITY: - # This is an error - activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE) - # Assign message back to 'error' state - #m.notifyUser(activity_tool) # Notify Error - get_transaction().commit() # and commit - else: - # Lower priority - activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) - get_transaction().commit() # Release locks before starting a potentially long calculation - else: - # Try to invoke - activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? - if m.is_executed: # Make sure message could be invoked - activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it - get_transaction().commit() # If successful, commit - else: - get_transaction().abort() # If not, abort transaction and start a new one + if hasattr(activity_tool,'SQLQueue_readMessageList'): + priority = random.choice(priority_weight) + # Try to find a message at given priority level + result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority) + if len(result) == 0: + # If empty, take any message + result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None) + if len(result) > 0: + line = result[0] + path = line.path + method_id = line.method_id + # Make sure message can not be processed anylonger + activity_tool.SQLQueue_processMessage(uid=line.uid) + get_transaction().commit() # Release locks before starting a potentially long calculation + m = self.loadMessage(line.message) + # Make sure object exists + if not m.validate(self, activity_tool): if line.priority > MAX_PRIORITY: # This is an error - activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE) + activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE) # Assign message back to 'error' state - m.notifyUser(activity_tool) # Notify Error + #m.notifyUser(activity_tool) # Notify Error get_transaction().commit() # and commit else: # Lower priority activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) get_transaction().commit() # Release locks before starting a potentially long calculation - return 0 - get_transaction().commit() # Release locks before starting a potentially long calculation + else: + # Try to invoke + activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? + if m.is_executed: # Make sure message could be invoked + activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it + get_transaction().commit() # If successful, commit + else: + get_transaction().abort() # If not, abort transaction and start a new one + if line.priority > MAX_PRIORITY: + # This is an error + activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE) + # Assign message back to 'error' state + m.notifyUser(activity_tool) # Notify Error + get_transaction().commit() # and commit + else: + # Lower priority + activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) + get_transaction().commit() # Release locks before starting a potentially long calculation + return 0 + get_transaction().commit() # Release locks before starting a potentially long calculation return 1 def hasActivity(self, activity_tool, object, **kw): - if object is not None: - my_object_path = '/'.join(object.getPhysicalPath()) - result = activity_tool.SQLQueue_hasMessage(path=my_object_path, **kw) - if len(result) > 0: - return result[0].message_count > 0 - else: - return 1 # Default behaviour if no object specified is to return 1 until active_process implemented + if hasattr(activity_tool,'SQLQueue_readMessageList'): + if object is not None: + my_object_path = '/'.join(object.getPhysicalPath()) + result = activity_tool.SQLQueue_hasMessage(path=my_object_path, **kw) + if len(result) > 0: + return result[0].message_count > 0 + else: + return 1 # Default behaviour if no object specified is to return 1 until active_process implemented return 0 def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw): @@ -135,38 +137,39 @@ class SQLQueue(RAMQueue): NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible """ - #return # Do nothing here to precent overlocking - path = '/'.join(object_path) - # Parse each message in registered - for m in activity_tool.getRegisteredMessageList(self): - if object_path == m.object_path and (method_id is None or method_id == m.method_id): - if invoke: activity_tool.invoke(m) - activity_tool.unregisterMessage(self, m) - # Parse each message in SQL queue - #LOG('Flush', 0, str((path, invoke, method_id))) - result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) - #LOG('Flush', 0, str(len(result))) - method_dict = {} - for line in result: - path = line.path - method_id = line.method_id - if not method_dict.has_key(method_id): - # Only invoke once (it would be different for a queue) - method_dict[method_id] = 1 - m = self.loadMessage(line.message, uid = line.uid) - self.deleteMessage(activity_tool, m) - if invoke: - # First Validate - if m.validate(self, activity_tool): - activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? - if not m.is_executed: # Make sure message could be invoked + if hasattr(activity_tool,'SQLQueue_readMessageList'): + #return # Do nothing here to precent overlocking + path = '/'.join(object_path) + # Parse each message in registered + for m in activity_tool.getRegisteredMessageList(self): + if object_path == m.object_path and (method_id is None or method_id == m.method_id): + if invoke: activity_tool.invoke(m) + activity_tool.unregisterMessage(self, m) + # Parse each message in SQL queue + #LOG('Flush', 0, str((path, invoke, method_id))) + result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) + #LOG('Flush', 0, str(len(result))) + method_dict = {} + for line in result: + path = line.path + method_id = line.method_id + if not method_dict.has_key(method_id): + # Only invoke once (it would be different for a queue) + method_dict[method_id] = 1 + m = self.loadMessage(line.message, uid = line.uid) + self.deleteMessage(activity_tool, m) + if invoke: + # First Validate + if m.validate(self, activity_tool): + activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? + if not m.is_executed: # Make sure message could be invoked + # The message no longer exists + raise ActivityFlushError, ( + 'Could not evaluate %s on %s' % (method_id , path)) + else: # The message no longer exists raise ActivityFlushError, ( - 'Could not evaluate %s on %s' % (method_id , path)) - else: - # The message no longer exists - raise ActivityFlushError, ( - 'The document %s does not exist' % path) + 'The document %s does not exist' % path) # def start(self, activity_tool, active_process=None): # uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process) @@ -178,27 +181,29 @@ class SQLQueue(RAMQueue): def getMessageList(self, activity_tool, processing_node=None): message_list = [] - result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None) - for line in result: - m = self.loadMessage(line.message) - m.processing_node = line.processing_node - m.priority = line.priority - message_list.append(m) + if hasattr(activity_tool,'SQLQueue_readMessageList'): + result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None) + for line in result: + m = self.loadMessage(line.message) + m.processing_node = line.processing_node + m.priority = line.priority + message_list.append(m) return message_list def distribute(self, activity_tool, node_count): processing_node = 1 - result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages - #LOG('distribute count',0,str(len(result)) ) - #LOG('distribute count',0,str(map(lambda x:x.uid, result))) - #get_transaction().commit() # Release locks before starting a potentially long calculation - uid_list = map(lambda x:x.uid, result)[0:100] - for uid in uid_list: - #LOG("distribute", 0, "assign %s" % uid) - activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node) - #get_transaction().commit() # Release locks immediately to allow processing of messages - processing_node = processing_node + 1 - if processing_node > node_count: - processing_node = 1 # Round robin + if hasattr(activity_tool,'SQLQueue_readMessageList'): + result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages + #LOG('distribute count',0,str(len(result)) ) + #LOG('distribute count',0,str(map(lambda x:x.uid, result))) + #get_transaction().commit() # Release locks before starting a potentially long calculation + uid_list = map(lambda x:x.uid, result)[0:100] + for uid in uid_list: + #LOG("distribute", 0, "assign %s" % uid) + activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node) + #get_transaction().commit() # Release locks immediately to allow processing of messages + processing_node = processing_node + 1 + if processing_node > node_count: + processing_node = 1 # Round robin registerActivity(SQLQueue)