Commit aa8516dd authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: use a new 'retry' column to count the number of failures

Now, all messages can be executed 6 times by default (or more in case of
ConflictError), regardless their initial priority.
For compatibility reasons, the 'priority' column is still increased.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@32877 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 0793cd95
No related merge requests found
......@@ -35,7 +35,7 @@ from Products.CMFActivity.ActiveObject import (
INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE)
from Queue import VALIDATION_ERROR_DELAY
MAX_PRIORITY = 5
MAX_RETRY = 5
class SQLBase:
......@@ -67,7 +67,7 @@ class SQLBase:
activity=class_name,
uid=line.uid,
processing_node=line.processing_node,
priority=line.priority,
retry=line.retry,
processing=line.processing)
for line in readMessageList(path=None,
method_id=None,
......@@ -133,10 +133,10 @@ class SQLBase:
If anything failed, make successful messages available (if any), and
the following rules apply to failed messages:
- Failures due to ConflictErrors cause messages to be postponed,
but their priority is *not* increased.
- Failures of messages already above maximum priority cause them to
but their retry count is *not* increased.
- Failures of messages already above maximum retry count cause them to
be put in a permanent-error state.
- In all other cases, priority is increased and message is delayed.
- In all other cases, retry count is increased and message is delayed.
"""
deletable_uid_list = []
delay_uid_list = []
......@@ -161,7 +161,6 @@ class SQLBase:
# should they be just made available again ?
if uid_to_duplicate_uid_list_dict is not None:
make_available_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
priority = m.line.priority
# BACK: Only exceptions can be classes in Python 2.6.
# Once we drop support for Python 2.4,
# please, remove the "type(m.exc_type) is type(ConflictError)" check
......@@ -169,19 +168,23 @@ class SQLBase:
if type(m.exc_type) is type(ConflictError) and \
issubclass(m.exc_type, ConflictError):
delay_uid_list.append(uid)
elif priority > MAX_PRIORITY:
else:
retry = m.line.retry
if retry >= MAX_RETRY:
notify_user_list.append(m)
final_error_uid_list.append(uid)
else:
continue
# XXX: What about making delay quadratic to the number of retries ?
delay = VALIDATION_ERROR_DELAY #* (retry * retry + 1) / 2
try:
# Immediately update, because values different for every message
activity_tool.SQLBase_reactivate(table=self.sql_table,
uid=[uid],
delay=None,
priority=priority + 1)
delay=delay,
retry=1)
except:
self._log(WARNING, 'Failed to increase priority of %r' % uid)
delay_uid_list.append(uid)
self._log(WARNING, 'Failed to reactivate %r' % uid)
make_available_uid_list.append(uid)
else:
# Internal CMFActivity error: the message can not be executed because
# something is missing (context object cannot be found, method cannot
......@@ -198,12 +201,11 @@ class SQLBase:
self._log(TRACE, 'Deleted messages %r' % deletable_uid_list)
if delay_uid_list:
try:
# If this is a conflict error, do not lower the priority but only delay.
# If this is a conflict error, do not increase 'retry' but only delay.
activity_tool.SQLBase_reactivate(table=self.sql_table,
uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, priority=None)
uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, retry=None)
except:
self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
make_available_uid_list += delay_uid_list
if final_error_uid_list:
try:
activity_tool.SQLBase_assignMessage(table=self.sql_table,
......
......@@ -49,7 +49,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
<th align="left" valign="top">Arguments</th>
<th align="left" valign="top">Named Parameters</th>
<th align="left" valign="top">Processing Node</th>
<th align="left" valign="top">Priority</th>
<th align="left" valign="top">Retry</th>
<th align="left" valign="top">Processing</th>
<th align="left" valign="top">Call Traceback</th>
</tr>
......@@ -83,7 +83,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
</dtml-if>
</td>
<td align="left" valign="top"><dtml-var processing_node></td>
<td align="left" valign="top"><dtml-var priority></td>
<td align="left" valign="top"><dtml-var retry></td>
<td align="left" valign="top">
<dtml-if expr="processing is not None">
<dtml-var processing>
......
......@@ -9,18 +9,17 @@ class_file:
</dtml-comment>
<params>table
uid:list
priority
retry
delay
</params>
UPDATE
<dtml-var table>
SET
processing = 0
<dtml-if expr="priority is not None">
, priority = <dtml-sqlvar priority type="int">
</dtml-if>
<dtml-if expr="delay is not None">
, date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL <dtml-sqlvar delay type="int"> SECOND)
date = DATE_ADD(UTC_TIMESTAMP(), INTERVAL
<dtml-sqlvar delay type="int"> SECOND)
<dtml-if expr="retry is not None">
, priority = priority + <dtml-sqlvar retry type="int">
, retry = retry + <dtml-sqlvar retry type="int">
</dtml-if>
WHERE
<dtml-sqltest uid type="int" multiple>
......@@ -20,6 +20,7 @@ CREATE TABLE `message_queue` (
`priority` TINYINT NOT NULL DEFAULT 0,
`tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY (`path`),
......
......@@ -1620,7 +1620,10 @@ class TestCMFActivity(ERP5TypeTestCase):
def test_67_TestCancelFailedActiveObject(self, quiet=0, run=run_all_test):
"""Cancel an active object to make sure that it does not refer to
a persistent object."""
a persistent object.
XXX: this test fails if run first
"""
if not run: return
if not quiet:
message = '\nTest if it is possible to safely cancel an active object'
......@@ -1657,7 +1660,7 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEquals(len(activity_tool.getMessageList()), 1)
# Just wait for the active object to be abandoned.
self.flushAllActivities(silent=1, loop_size=10)
self.flushAllActivities(silent=1, loop_size=100)
self.assertEquals(len(activity_tool.getMessageList()), 1)
self.assertEquals(activity_tool.getMessageList()[0].processing_node,
INVOKE_ERROR_STATE)
......
......@@ -698,7 +698,7 @@ class ERP5TypeTestCase(backportUnittest.TestCase, PortalTestCase):
)
raise RuntimeError,\
'tic is looping forever. These messages are pending: %r %s' % (
[('/'.join(m.object_path), m.method_id, m.processing_node, m.priority)
[('/'.join(m.object_path), m.method_id, m.processing_node, m.retry)
for m in portal_activities.getMessageList()],
error_message
)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment