Commit 9a0ecaa7 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Manage how many times an active object is retried, and tweak the next...

Manage how many times an active object is retried, and tweak the next processing date based on the number of retries. Also, when there is no active object available, shift the time to go ahead quickly.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@4223 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 1661bb6a
No related merge requests found
......@@ -140,7 +140,7 @@ class SQLDict(RAMDict):
else:
return ()
def validateMessage(self, activity_tool, message, uid_list, priority, next_processing_date):
def validateMessage(self, activity_tool, message, uid_list, priority, next_processing_date, retry):
validation_state = message.validate(self, activity_tool)
if validation_state is not VALID:
if validation_state in (EXCEPTION, INVALID_PATH):
......@@ -156,13 +156,13 @@ class SQLDict(RAMDict):
# Lower priority
if len(uid_list) > 0: # Add some delay before new processing
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
priority = priority + 1)
priority = priority + 1, retry = retry + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
else:
# We do not lower priority for INVALID_ORDER errors but we do postpone execution
if len(uid_list) > 0: # Add some delay before new processing
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
priority = priority)
priority = priority, retry = retry + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
return 1
......@@ -171,8 +171,6 @@ class SQLDict(RAMDict):
def dequeueMessage(self, activity_tool, processing_node):
if hasattr(activity_tool,'SQLDict_readMessage'):
now_date = DateTime()
# Next processing date in case of error
next_processing_date = now_date + VALIDATION_ERROR_DELAY
priority = random.choice(priority_weight)
# Try to find a message at given priority level which is scheduled for now
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority,
......@@ -181,15 +179,26 @@ class SQLDict(RAMDict):
# If empty, take any message which is scheduled for now
priority = None
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
if len(result) > 0:
if len(result) == 0:
# If the result is still empty, shift the dates so that SQLDict can dispatch pending active
# objects quickly.
self.timeShift(activity_tool, VALIDATION_ERROR_DELAY)
elif len(result) > 0:
#LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
line = result[0]
path = line.path
method_id = line.method_id
try:
retry = int(line.retry)
except TypeError:
retry = 1
# Next processing date in case of error
next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry
uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date )
uid_list = [x.uid for x in uid_list]
uid_list_list = [uid_list]
priority_list = [line.priority]
retry_list = [retry]
# Make sure message can not be processed anylonger
if len(uid_list) > 0:
# Set selected messages to processing
......@@ -199,7 +208,7 @@ class SQLDict(RAMDict):
m = self.loadMessage(line.message, uid = line.uid)
message_list = [m]
# Validate message (make sure object exists, priority OK, etc.)
if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date):
if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date, retry):
group_method_id = m.activity_kw.get('group_method_id')
if group_method_id is not None:
# Count the number of objects to prevent too many objects.
......@@ -225,6 +234,12 @@ class SQLDict(RAMDict):
for line in result:
path = line.path
method_id = line.method_id
try:
retry = int(line.retry)
except TypeError:
retry = 1
# Next processing date in case of error
next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry
uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date )
uid_list = [x.uid for x in uid_list]
if len(uid_list) > 0:
......@@ -232,7 +247,7 @@ class SQLDict(RAMDict):
activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation
m = self.loadMessage(line.message, uid = line.uid)
if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date):
if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date, retry):
if m.hasExpandMethod():
try:
count += len(m.getObjectList(activity_tool))
......@@ -247,13 +262,14 @@ class SQLDict(RAMDict):
message_list.append(m)
uid_list_list.append(uid_list)
priority_list.append(line.priority)
retry_list.append(retry)
if count >= MAX_GROUPED_OBJECTS:
break
get_transaction().commit() # Release locks before starting a potentially long calculation
# Try to invoke
if group_method_id is not None:
#LOG('SQLDict', 0, 'invoking a group method %s with %d objects (%d objects in expanded form)' % (group_method_id, len(message_list), count))
LOG('SQLDict', 0, 'invoking a group method %s with %d objects (%d objects in expanded form)' % (group_method_id, len(message_list), count))
#for m in message_list:
# LOG('SQLDict', 0, '%r has objects %r' % (m, m.getObjectList(activity_tool)))
activity_tool.invokeGroup(group_method_id, message_list)
......@@ -277,6 +293,9 @@ class SQLDict(RAMDict):
m = message_list[i]
uid_list = uid_list_list[i]
priority = priority_list[i]
retry = retry_list[i]
# Next processing date in case of error
next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry
if m.is_executed:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
get_transaction().commit() # If successful, commit
......@@ -297,7 +316,7 @@ class SQLDict(RAMDict):
# Lower priority
if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date,
priority = priority + 1)
priority = priority + 1, retry = retry + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment