Commit 6e895eeb authored by Sebastien Robin's avatar Sebastien Robin

check if we have sql methods before calling them

make ramqueue working with several cmf site


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@719 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent cffc1790
...@@ -36,57 +36,61 @@ class RAMQueue(Queue): ...@@ -36,57 +36,61 @@ class RAMQueue(Queue):
""" """
def __init__(self): def __init__(self):
Queue.__init__(self) Queue.__init__(self)
self.queue = [] self.queue_dict = {}
self.last_uid = 0 self.last_uid = 0
def getQueue(self, activity_tool):
path = activity_tool.getPhysicalPath()
if not self.queue_dict.has_key(path):
self.queue_dict[path] = []
return self.queue_dict[path]
def finishQueueMessage(self, activity_tool, m): def finishQueueMessage(self, activity_tool, m):
if m.is_registered: if m.is_registered:
# XXX - Some lock is required on this section # XXX - Some lock is required on this section
self.last_uid = self.last_uid + 1 self.last_uid = self.last_uid + 1
m.uid = self.last_uid m.uid = self.last_uid
self.queue.append(m) self.getQueue(activity_tool).append(m)
def finishDeleteMessage(self, activity_tool, m): def finishDeleteMessage(self, activity_tool, m):
i = 0 i = 0
for my_message in self.queue: queue = self.getQueue(activity_tool)
for my_message in queue:
if my_message.uid == m.uid: if my_message.uid == m.uid:
del self.queue[i] del queue[i]
return return
i = i + 1 i = i + 1
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if len(self.queue) is 0: if len(self.getQueue(activity_tool)) is 0:
return 1 # Go to sleep return 1 # Go to sleep
m = self.queue[0] m = self.getQueue(activity_tool)[0]
activity_tool.invoke(m) activity_tool.invoke(m)
self.deleteMessage(activity_tool, m) self.deleteMessage(activity_tool, m)
return 0 # Keep on ticking return 0 # Keep on ticking
def hasActivity(self, activity_tool, object, **kw): def hasActivity(self, activity_tool, object, **kw):
if object is not None: object_path = object.getPhysicalPath()
object_path = object.getPhysicalPath() for m in self.getQueue(activity_tool):
for m in self.queue: if m.object_path == object_path:
if list(m.object_path) == list(object_path): return 1
return 1
else:
return 1 # Default behaviour if no object specified is to return 1 until active_process implemented
return 0 return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
# Parse each message in registered # Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self): for m in activity_tool.getRegisteredMessageList(self):
if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if invoke: activity_tool.invoke(m) if invoke: activity_tool.invoke(m)
activity_tool.unregisterMessage(self, m) activity_tool.unregisterMessage(self, m)
# Parse each message in queue # Parse each message in queue
for m in self.queue: for m in self.getQueue(activity_tool):
if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if invoke: activity_tool.invoke(m) if invoke: activity_tool.invoke(m)
self.deleteMessage(activity_tool, m) self.deleteMessage(activity_tool, m)
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None):
new_queue = [] new_queue = []
for m in self.queue: for m in self.getQueue(activity_tool):
m.processing_node = 1 m.processing_node = 1
m.priority = 0 m.priority = 0
new_queue.append(m) new_queue.append(m)
......
...@@ -94,79 +94,81 @@ class SQLDict(RAMDict): ...@@ -94,79 +94,81 @@ class SQLDict(RAMDict):
# Queue semantic # Queue semantic
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
priority = random.choice(priority_weight) if hasattr(activity_tool,'SQLDict_readMessageList'):
# Try to find a message at given priority level priority = random.choice(priority_weight)
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority) # Try to find a message at given priority level
if len(result) == 0:
# If empty, take any message
priority = None
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority) result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority)
if len(result) > 0: if len(result) == 0:
line = result[0] # If empty, take any message
path = line.path priority = None
method_id = line.method_id result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority)
uid_list = activity_tool.SQLDict_readUidList( path=path, method_id= method_id, processing_node = None ) if len(result) > 0:
uid_list = map(lambda x:x.uid, uid_list) line = result[0]
# Make sure message can not be processed anylonger path = line.path
if len(uid_list) > 0: method_id = line.method_id
activity_tool.SQLDict_processMessage(uid = uid_list) uid_list = activity_tool.SQLDict_readUidList( path=path, method_id= method_id, processing_node = None )
get_transaction().commit() # Release locks before starting a potentially long calculation uid_list = map(lambda x:x.uid, uid_list)
# This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state # Make sure message can not be processed anylonger
m = self.loadMessage(line.message, uid = line.uid) if len(uid_list) > 0:
# Make sure object exists activity_tool.SQLDict_processMessage(uid = uid_list)
if not m.validate(self, activity_tool): get_transaction().commit() # Release locks before starting a potentially long calculation
if line.priority > MAX_PRIORITY: # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
# This is an error m = self.loadMessage(line.message, uid = line.uid)
if len(uid_list) > 0: # Make sure object exists
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE) if not m.validate(self, activity_tool):
# Assign message back to 'error' state
#m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list,
priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
else:
# Try to invoke
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
if m.is_executed: # Make sure message could be invoked
if len(uid_list) > 0:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
get_transaction().commit() # If successful, commit
if m.active_process:
active_process = activity_tool.unrestrictedTraverse(m.active_process)
if not active_process.hasActivity():
# Not more activity
m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
else:
get_transaction().abort() # If not, abort transaction and start a new one
if line.priority > MAX_PRIORITY: if line.priority > MAX_PRIORITY:
# This is an error # This is an error
if len(uid_list) > 0: if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE) activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state # Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error #m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit get_transaction().commit() # and commit
else: else:
# Lower priority # Lower priority
if len(uid_list) > 0: if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list, activity_tool.SQLDict_setPriority(uid = uid_list,
priority = line.priority + 1) priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 else:
get_transaction().commit() # Release locks before starting a potentially long calculation # Try to invoke
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
if m.is_executed: # Make sure message could be invoked
if len(uid_list) > 0:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
get_transaction().commit() # If successful, commit
if m.active_process:
active_process = activity_tool.unrestrictedTraverse(m.active_process)
if not active_process.hasActivity():
# Not more activity
m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
else:
get_transaction().abort() # If not, abort transaction and start a new one
if line.priority > MAX_PRIORITY:
# This is an error
if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list,
priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
get_transaction().commit() # Release locks before starting a potentially long calculation
return 1 return 1
def hasActivity(self, activity_tool, object, **kw): def hasActivity(self, activity_tool, object, **kw):
if object is not None: if hasattr(activity_tool,'SQLDict_readMessageList'):
my_object_path = '/'.join(object.getPhysicalPath()) if object is not None:
result = activity_tool.SQLDict_hasMessage(path=my_object_path, **kw) my_object_path = '/'.join(object.getPhysicalPath())
if len(result) > 0: result = activity_tool.SQLDict_hasMessage(path=my_object_path, **kw)
return result[0].message_count > 0 if len(result) > 0:
else: return result[0].message_count > 0
return 1 # Default behaviour if no object specified is to return 1 until active_process implemented else:
return 1 # Default behaviour if no object specified is to return 1 until active_process implemented
return 0 return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw): def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
...@@ -184,11 +186,34 @@ class SQLDict(RAMDict): ...@@ -184,11 +186,34 @@ class SQLDict(RAMDict):
path = '/'.join(object_path) path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id))) # LOG('Flush', 0, str((path, invoke, method_id)))
method_dict = {} method_dict = {}
# Parse each message in registered if hasattr(activity_tool,'SQLDict_readMessageList'):
for m in activity_tool.getRegisteredMessageList(self): # Parse each message in registered
if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id): for m in activity_tool.getRegisteredMessageList(self):
activity_tool.unregisterMessage(self, m) if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id):
activity_tool.unregisterMessage(self, m)
if not method_dict.has_key(method_id):
if invoke:
# First Validate
if m.validate(self, activity_tool):
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
else:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
# Parse each message in SQL dict
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None)
for line in result:
path = line.path
method_id = line.method_id
if not method_dict.has_key(method_id): if not method_dict.has_key(method_id):
# Only invoke once (it would be different for a queue)
method_dict[method_id] = 1
m = self.loadMessage(line.message, uid = line.uid)
self.deleteMessage(activity_tool, m)
if invoke: if invoke:
# First Validate # First Validate
if m.validate(self, activity_tool): if m.validate(self, activity_tool):
...@@ -200,29 +225,7 @@ class SQLDict(RAMDict): ...@@ -200,29 +225,7 @@ class SQLDict(RAMDict):
else: else:
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
# Parse each message in SQL dict
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None)
for line in result:
path = line.path
method_id = line.method_id
if not method_dict.has_key(method_id):
# Only invoke once (it would be different for a queue)
method_dict[method_id] = 1
m = self.loadMessage(line.message, uid = line.uid)
self.deleteMessage(activity_tool, m)
if invoke:
# First Validate
if m.validate(self, activity_tool):
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
else:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
# def start(self, activity_tool, active_process=None): # def start(self, activity_tool, active_process=None):
# uid_list = activity_tool.SQLDict_readUidList(path=path, active_process=active_process) # uid_list = activity_tool.SQLDict_readUidList(path=path, active_process=active_process)
...@@ -235,28 +238,30 @@ class SQLDict(RAMDict): ...@@ -235,28 +238,30 @@ class SQLDict(RAMDict):
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None):
# YO: reading all lines might cause a deadlock # YO: reading all lines might cause a deadlock
message_list = [] message_list = []
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None) if hasattr(activity_tool,'SQLDict_readMessageList'):
for line in result: result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None)
m = self.loadMessage(line.message, uid = line.uid) for line in result:
m.processing_node = line.processing_node m = self.loadMessage(line.message, uid = line.uid)
m.priority = line.priority m.processing_node = line.processing_node
message_list.append(m) m.priority = line.priority
message_list.append(m)
return message_list return message_list
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
processing_node = 1 processing_node = 1
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages if hasattr(activity_tool,'SQLDict_readMessageList'):
get_transaction().commit() # Release locks before starting a potentially long calculation result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
path_dict = {} get_transaction().commit() # Release locks before starting a potentially long calculation
for line in result: path_dict = {}
path = line.path for line in result:
if not path_dict.has_key(path): path = line.path
# Only assign once (it would be different for a queue) if not path_dict.has_key(path):
path_dict[path] = 1 # Only assign once (it would be different for a queue)
activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None) path_dict[path] = 1
get_transaction().commit() # Release locks immediately to allow processing of messages activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None)
processing_node = processing_node + 1 get_transaction().commit() # Release locks immediately to allow processing of messages
if processing_node > node_count: processing_node = processing_node + 1
processing_node = 1 # Round robin if processing_node > node_count:
processing_node = 1 # Round robin
registerActivity(SQLDict) registerActivity(SQLDict)
...@@ -65,62 +65,64 @@ class SQLQueue(RAMQueue): ...@@ -65,62 +65,64 @@ class SQLQueue(RAMQueue):
activity_tool.SQLQueue_delMessage(uid = m.uid) activity_tool.SQLQueue_delMessage(uid = m.uid)
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
priority = random.choice(priority_weight) if hasattr(activity_tool,'SQLQueue_readMessageList'):
# Try to find a message at given priority level priority = random.choice(priority_weight)
result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority) # Try to find a message at given priority level
if len(result) == 0: result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=priority)
# If empty, take any message if len(result) == 0:
result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None) # If empty, take any message
if len(result) > 0: result = activity_tool.SQLQueue_readMessage(processing_node=processing_node, priority=None)
line = result[0] if len(result) > 0:
path = line.path line = result[0]
method_id = line.method_id path = line.path
# Make sure message can not be processed anylonger method_id = line.method_id
activity_tool.SQLQueue_processMessage(uid=line.uid) # Make sure message can not be processed anylonger
get_transaction().commit() # Release locks before starting a potentially long calculation activity_tool.SQLQueue_processMessage(uid=line.uid)
m = self.loadMessage(line.message) get_transaction().commit() # Release locks before starting a potentially long calculation
# Make sure object exists m = self.loadMessage(line.message)
if not m.validate(self, activity_tool): # Make sure object exists
if line.priority > MAX_PRIORITY: if not m.validate(self, activity_tool):
# This is an error
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state
#m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
else:
# Try to invoke
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
if m.is_executed: # Make sure message could be invoked
activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it
get_transaction().commit() # If successful, commit
else:
get_transaction().abort() # If not, abort transaction and start a new one
if line.priority > MAX_PRIORITY: if line.priority > MAX_PRIORITY:
# This is an error # This is an error
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE) activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state # Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error #m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit get_transaction().commit() # and commit
else: else:
# Lower priority # Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 else:
get_transaction().commit() # Release locks before starting a potentially long calculation # Try to invoke
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
if m.is_executed: # Make sure message could be invoked
activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it
get_transaction().commit() # If successful, commit
else:
get_transaction().abort() # If not, abort transaction and start a new one
if line.priority > MAX_PRIORITY:
# This is an error
activity_tool.SQLQueue_assignMessage(uid=line.uid, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
get_transaction().commit() # Release locks before starting a potentially long calculation
return 1 return 1
def hasActivity(self, activity_tool, object, **kw): def hasActivity(self, activity_tool, object, **kw):
if object is not None: if hasattr(activity_tool,'SQLQueue_readMessageList'):
my_object_path = '/'.join(object.getPhysicalPath()) if object is not None:
result = activity_tool.SQLQueue_hasMessage(path=my_object_path, **kw) my_object_path = '/'.join(object.getPhysicalPath())
if len(result) > 0: result = activity_tool.SQLQueue_hasMessage(path=my_object_path, **kw)
return result[0].message_count > 0 if len(result) > 0:
else: return result[0].message_count > 0
return 1 # Default behaviour if no object specified is to return 1 until active_process implemented else:
return 1 # Default behaviour if no object specified is to return 1 until active_process implemented
return 0 return 0
def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw): def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
...@@ -135,38 +137,39 @@ class SQLQueue(RAMQueue): ...@@ -135,38 +137,39 @@ class SQLQueue(RAMQueue):
NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
""" """
#return # Do nothing here to precent overlocking if hasattr(activity_tool,'SQLQueue_readMessageList'):
path = '/'.join(object_path) #return # Do nothing here to precent overlocking
# Parse each message in registered path = '/'.join(object_path)
for m in activity_tool.getRegisteredMessageList(self): # Parse each message in registered
if object_path == m.object_path and (method_id is None or method_id == m.method_id): for m in activity_tool.getRegisteredMessageList(self):
if invoke: activity_tool.invoke(m) if object_path == m.object_path and (method_id is None or method_id == m.method_id):
activity_tool.unregisterMessage(self, m) if invoke: activity_tool.invoke(m)
# Parse each message in SQL queue activity_tool.unregisterMessage(self, m)
#LOG('Flush', 0, str((path, invoke, method_id))) # Parse each message in SQL queue
result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) #LOG('Flush', 0, str((path, invoke, method_id)))
#LOG('Flush', 0, str(len(result))) result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None)
method_dict = {} #LOG('Flush', 0, str(len(result)))
for line in result: method_dict = {}
path = line.path for line in result:
method_id = line.method_id path = line.path
if not method_dict.has_key(method_id): method_id = line.method_id
# Only invoke once (it would be different for a queue) if not method_dict.has_key(method_id):
method_dict[method_id] = 1 # Only invoke once (it would be different for a queue)
m = self.loadMessage(line.message, uid = line.uid) method_dict[method_id] = 1
self.deleteMessage(activity_tool, m) m = self.loadMessage(line.message, uid = line.uid)
if invoke: self.deleteMessage(activity_tool, m)
# First Validate if invoke:
if m.validate(self, activity_tool): # First Validate
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? if m.validate(self, activity_tool):
if not m.is_executed: # Make sure message could be invoked activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
else:
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path)) 'The document %s does not exist' % path)
else:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
# def start(self, activity_tool, active_process=None): # def start(self, activity_tool, active_process=None):
# uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process) # uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
...@@ -178,27 +181,29 @@ class SQLQueue(RAMQueue): ...@@ -178,27 +181,29 @@ class SQLQueue(RAMQueue):
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None):
message_list = [] message_list = []
result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None) if hasattr(activity_tool,'SQLQueue_readMessageList'):
for line in result: result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node=None)
m = self.loadMessage(line.message) for line in result:
m.processing_node = line.processing_node m = self.loadMessage(line.message)
m.priority = line.priority m.processing_node = line.processing_node
message_list.append(m) m.priority = line.priority
message_list.append(m)
return message_list return message_list
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
processing_node = 1 processing_node = 1
result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages if hasattr(activity_tool,'SQLQueue_readMessageList'):
#LOG('distribute count',0,str(len(result)) ) result = activity_tool.SQLQueue_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages
#LOG('distribute count',0,str(map(lambda x:x.uid, result))) #LOG('distribute count',0,str(len(result)) )
#get_transaction().commit() # Release locks before starting a potentially long calculation #LOG('distribute count',0,str(map(lambda x:x.uid, result)))
uid_list = map(lambda x:x.uid, result)[0:100] #get_transaction().commit() # Release locks before starting a potentially long calculation
for uid in uid_list: uid_list = map(lambda x:x.uid, result)[0:100]
#LOG("distribute", 0, "assign %s" % uid) for uid in uid_list:
activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node) #LOG("distribute", 0, "assign %s" % uid)
#get_transaction().commit() # Release locks immediately to allow processing of messages activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node)
processing_node = processing_node + 1 #get_transaction().commit() # Release locks immediately to allow processing of messages
if processing_node > node_count: processing_node = processing_node + 1
processing_node = 1 # Round robin if processing_node > node_count:
processing_node = 1 # Round robin
registerActivity(SQLQueue) registerActivity(SQLQueue)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment