Commit 6d45b1fd authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: change behaviour of serialization_tag on SQLQueue to match SQLDict

At most 1 message for a given serialization tag can be validated (cf [28706]).
Respect priority/date/uid when validating only 1 message (cf [34632]).

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@34841 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 556a41d9
...@@ -36,6 +36,11 @@ from Products.CMFActivity.ActiveObject import ( ...@@ -36,6 +36,11 @@ from Products.CMFActivity.ActiveObject import (
from Queue import VALIDATION_ERROR_DELAY from Queue import VALIDATION_ERROR_DELAY
def sort_message_key(message):
# same sort key as in SQL{Dict,Queue}_readMessageList
return message.line.priority, message.line.date, message.uid
class SQLBase: class SQLBase:
""" """
Define a set of common methods for SQL-based storage of activities. Define a set of common methods for SQL-based storage of activities.
......
...@@ -35,7 +35,7 @@ from ZODB.POSException import ConflictError ...@@ -35,7 +35,7 @@ from ZODB.POSException import ConflictError
import sys import sys
from types import ClassType from types import ClassType
#from time import time #from time import time
from SQLBase import SQLBase from SQLBase import SQLBase, sort_message_key
from Products.CMFActivity.ActivityRuntimeEnvironment import ( from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable) ActivityRuntimeEnvironment, getTransactionalVariable)
from zExceptions import ExceptionFormatter from zExceptions import ExceptionFormatter
...@@ -516,9 +516,6 @@ class SQLDict(RAMDict, SQLBase): ...@@ -516,9 +516,6 @@ class SQLDict(RAMDict, SQLBase):
validation_text_dict, now_date=now_date) validation_text_dict, now_date=now_date)
if message_dict: if message_dict:
def sort_message_key(message):
# same sort key as in SQLDict_readMessageList
return message.line.priority, message.line.date, message.uid
message_unique_dict = {} message_unique_dict = {}
serialization_tag_dict = {} serialization_tag_dict = {}
distributable_uid_set = set() distributable_uid_set = set()
......
...@@ -35,8 +35,7 @@ from ZODB.POSException import ConflictError ...@@ -35,8 +35,7 @@ from ZODB.POSException import ConflictError
from types import ClassType from types import ClassType
import sys import sys
from time import time from time import time
from sets import ImmutableSet from SQLBase import SQLBase, sort_message_key
from SQLBase import SQLBase
from Products.CMFActivity.ActivityRuntimeEnvironment import ( from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable) ActivityRuntimeEnvironment, getTransactionalVariable)
from zExceptions import ExceptionFormatter from zExceptions import ExceptionFormatter
...@@ -406,13 +405,27 @@ class SQLQueue(RAMQueue, SQLBase): ...@@ -406,13 +405,27 @@ class SQLQueue(RAMQueue, SQLBase):
message.order_validation_text = self.getOrderValidationText(message) message.order_validation_text = self.getOrderValidationText(message)
self.getExecutableMessageList(activity_tool, message, message_dict, self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date) validation_text_dict, now_date=now_date)
distributable_count = len(message_dict) if message_dict:
if distributable_count: distributable_uid_set = set()
activity_tool.SQLBase_assignMessage(table=self.sql_table, serialization_tag_dict = {}
processing_node=0, uid=[m.uid for m in message_dict.itervalues()]) for message in message_dict.itervalues():
validated_count += distributable_count serialization_tag = message.activity_kw.get('serialization_tag')
if validated_count >= MAX_VALIDATED_LIMIT: if serialization_tag is None:
return distributable_uid_set.add(message.uid)
else:
serialization_tag_dict.setdefault(serialization_tag,
[]).append(message)
for message_list in serialization_tag_dict.itervalues():
# Sort list of messages to validate the message with highest score
message_list.sort(key=sort_message_key)
distributable_uid_set.add(message_list[0].uid)
distributable_count = len(distributable_uid_set)
if distributable_count:
activity_tool.SQLBase_assignMessage(table=self.sql_table,
processing_node=0, uid=tuple(distributable_uid_set))
validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT:
return
offset += READ_MESSAGE_LIMIT offset += READ_MESSAGE_LIMIT
# Validation private methods # Validation private methods
......
...@@ -2899,19 +2899,12 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2899,19 +2899,12 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(len(result), 2) self.assertEqual(len(result), 2)
activity_tool.distribute() activity_tool.distribute()
result = activity_tool.getMessageList() result = activity_tool.getMessageList()
# If activity is SQLDict, serialization tag prevents validating the same # at most 1 activity for a given serialization tag can be validated
# serialization tagged messages simultaneously. message, = [x for x in result if x.processing_node == 0]
# If activity is SQLQueue, this does not happen. self.assertEqual(message.method_id, 'getId')
if activity=='SQLDict': # the other one is still waiting for validation
# one is validated. message, = [x for x in result if x.processing_node == -1]
message, = [x for x in result if x.processing_node == 0] self.assertEqual(message.method_id, 'getTitle')
self.assertEqual(message.method_id, 'getId')
# the other one is still waiting for validation.
message, = [x for x in result if x.processing_node == -1]
self.assertEqual(message.method_id, 'getTitle')
else:
# both are validated at once.
self.assertEqual(len([x for x in result if x.processing_node == 0]), 2)
self.tic() self.tic()
result = activity_tool.getMessageList() result = activity_tool.getMessageList()
self.assertEqual(len(result), 0) self.assertEqual(len(result), 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment