Commit 44f0fccc authored by Vincent Pelletier's avatar Vincent Pelletier

CMFActivity: Stop deleting duplicates during SQLDict.distribute

Duplicate message detection is not good enough: different messages with
the same unicity value may bear different serialization_tags. This code
does not takes this into account, which can lead to deleting such tagged
message and validate an untagged one, which breaks serialization_tag
contract of preventing any further activity validation until execution
of all such-tagged validated activities is successful.
Also, it is not validation's node job to deduplicate: it can happen during
message execution without slowing down this crucial (performance-wise)
activity node.
As a result, distribute methods of SQLDict and SQLQueue can be factorised.
parent fd398269
......@@ -40,6 +40,10 @@ from Products.CMFActivity.ActivityRuntimeEnvironment import (
from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH
from Products.CMFActivity.Errors import ActivityFlushError
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000
# TODO: Limit by size in bytes instead of number of rows.
MAX_MESSAGE_LIST_SIZE = 100
INVOKE_ERROR_STATE = -2
......@@ -315,6 +319,62 @@ class SQLBase(Queue):
LOG(self.__class__.__name__, severity, summary,
error=severity>INFO and sys.exc_info() or None)
def distribute(self, activity_tool, node_count):
assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
if assignMessage is None:
return
offset = 0
now_date = self.getNow(activity_tool)
validated_count = 0
while 1:
result = self._getMessageList(
activity_tool,
processing_node=-1,
to_date=now_date,
offset=offset,
count=READ_MESSAGE_LIMIT,
)
if not result:
return
transaction.commit()
validation_text_dict = {'none': 1}
message_dict = {}
for line in result:
message = Message.load(line.message, uid=line.uid, line=line)
if not hasattr(message, 'order_validation_text'): # BBB
message.order_validation_text = self.getOrderValidationText(message)
self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date)
if message_dict:
distributable_uid_set = set()
serialization_tag_dict = {}
for message in message_dict.itervalues():
serialization_tag = message.activity_kw.get('serialization_tag')
if serialization_tag is None:
distributable_uid_set.add(message.uid)
else:
serialization_tag_dict.setdefault(serialization_tag,
[]).append(message)
for message_list in serialization_tag_dict.itervalues():
# Sort list of messages to validate the message with highest score
message_list.sort(key=sort_message_key)
distributable_uid_set.add(message_list[0].uid)
group_method_id = message_list[0].line.group_method_id
if group_method_id == '\0':
continue
for message in message_list[1:]:
if group_method_id == message.line.group_method_id:
distributable_uid_set.add(message.uid)
distributable_count = len(distributable_uid_set)
if distributable_count:
assignMessage(table=self.sql_table,
processing_node=0, uid=tuple(distributable_uid_set))
validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT:
return
offset += READ_MESSAGE_LIMIT
def getReservedMessageList(self, activity_tool, date, processing_node,
limit=None, group_method_id=None):
"""
......
......@@ -147,78 +147,3 @@ class SQLDict(SQLBase):
# earlier.
return None, original_uid, [uid]
return load
def distribute(self, activity_tool, node_count):
offset = 0
assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
if assignMessage is not None:
now_date = self.getNow(activity_tool)
validated_count = 0
while 1:
result = self._getMessageList(activity_tool, processing_node=-1,
to_date=now_date,
offset=offset, count=READ_MESSAGE_LIMIT)
if not result:
return
transaction.commit()
validation_text_dict = {'none': 1}
message_dict = {}
for line in result:
message = Message.load(line.message, uid=line.uid, line=line)
if not hasattr(message, 'order_validation_text'): # BBB
message.order_validation_text = self.getOrderValidationText(message)
self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date)
if message_dict:
message_unique_dict = {}
serialization_tag_dict = {}
distributable_uid_set = set()
deletable_uid_list = []
# remove duplicates
# SQLDict considers object_path, method_id, tag to unify activities,
# but ignores method arguments. They are outside of semantics.
for message in message_dict.itervalues():
message_unique_dict.setdefault(self.generateMessageUID(message),
[]).append(message)
for message_list in message_unique_dict.itervalues():
if len(message_list) > 1:
# Sort list of duplicates to keep the message with highest score
message_list.sort(key=sort_message_key)
deletable_uid_list += [m.uid for m in message_list[1:]]
message = message_list[0]
serialization_tag = message.activity_kw.get('serialization_tag')
if serialization_tag is None:
distributable_uid_set.add(message.uid)
else:
serialization_tag_dict.setdefault(serialization_tag,
[]).append(message)
# Don't let through if there is the same serialization tag in the
# message dict. If there is the same serialization tag, only one can
# be validated and others must wait.
# But messages with group_method_id are exceptions. serialization_tag
# does not stop validating together. Because those messages should
# be processed together at once.
for message_list in serialization_tag_dict.itervalues():
# Sort list of messages to validate the message with highest score
message_list.sort(key=sort_message_key)
distributable_uid_set.add(message_list[0].uid)
group_method_id = message_list[0].line.group_method_id
if group_method_id == '\0':
continue
for message in message_list[1:]:
if group_method_id == message.line.group_method_id:
distributable_uid_set.add(message.uid)
if deletable_uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=deletable_uid_list)
distributable_count = len(distributable_uid_set)
if distributable_count:
assignMessage(table=self.sql_table,
processing_node=0, uid=tuple(distributable_uid_set))
validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT:
return
offset += READ_MESSAGE_LIMIT
......@@ -26,17 +26,7 @@
#
##############################################################################
from Products.CMFActivity.ActivityTool import Message
from ZODB.POSException import ConflictError
from SQLBase import SQLBase, sort_message_key
from zExceptions import ExceptionFormatter
import transaction
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000
from SQLBase import SQLBase
class SQLQueue(SQLBase):
"""
......@@ -46,54 +36,3 @@ class SQLQueue(SQLBase):
"""
sql_table = 'message_queue'
uid_group = 'portal_activity_queue'
def distribute(self, activity_tool, node_count):
offset = 0
assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
if assignMessage is not None:
now_date = self.getNow(activity_tool)
validated_count = 0
while 1:
result = self._getMessageList(activity_tool, processing_node=-1,
to_date=now_date, processing=0,
offset=offset, count=READ_MESSAGE_LIMIT)
if not result:
return
transaction.commit()
validation_text_dict = {'none': 1}
message_dict = {}
for line in result:
message = Message.load(line.message, uid=line.uid, line=line)
if not hasattr(message, 'order_validation_text'): # BBB
message.order_validation_text = self.getOrderValidationText(message)
self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date)
if message_dict:
distributable_uid_set = set()
serialization_tag_dict = {}
for message in message_dict.itervalues():
serialization_tag = message.activity_kw.get('serialization_tag')
if serialization_tag is None:
distributable_uid_set.add(message.uid)
else:
serialization_tag_dict.setdefault(serialization_tag,
[]).append(message)
for message_list in serialization_tag_dict.itervalues():
# Sort list of messages to validate the message with highest score
message_list.sort(key=sort_message_key)
distributable_uid_set.add(message_list[0].uid)
group_method_id = message_list[0].line.group_method_id
if group_method_id == '\0':
continue
for message in message_list[1:]:
if group_method_id == message.line.group_method_id:
distributable_uid_set.add(message.uid)
distributable_count = len(distributable_uid_set)
if distributable_count:
assignMessage(table=self.sql_table,
processing_node=0, uid=tuple(distributable_uid_set))
validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT:
return
offset += READ_MESSAGE_LIMIT
......@@ -1632,65 +1632,57 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally:
del activity_tool.__class__.doSomething
def test_103_2_CheckSQLDictDeleteDuplicatesBeforeExecution(self):
"""
Test that SQLDict delete the same messages before execution if messages
has the same method_id and path and tag.
def test_103_2_CheckSQLDictDoesNotDeleteDuplicatesBeforeExecution(self):
"""
Test that SQLDict does not delete messages before execution
even if messages have the same method_id and path and tag.
There could be other things which differ (ex: serialization_tag) and may
not all be cheap to check during validation. Validation node is the only
non-paralelisable Zope-side task around activities, so it should be kept
simple.
Deduplication is cheap:
- inside the transaction which spawned duplicate activities, because it
has to have created activities around anyway, and can keep track
- inside the CMFActvitiy-level processing surrounding activity execution
because it has to load the activities to process them anyway
"""
activity_tool = self.getActivityTool()
marker = []
def doSomething(self, other_tag):
marker.append(self.countMessage(tag=other_tag))
activity_tool.__class__.doSomething = doSomething
try:
# Adds two same activities.
activity_tool.activate(activity='SQLDict', after_tag='foo', priority=2,
tag='a').doSomething(other_tag='a')
tag='a').getId()
self.commit()
uid1, = [x.uid for x in activity_tool.getMessageList()]
activity_tool.activate(activity='SQLDict', after_tag='bar', priority=1,
tag='a').doSomething(other_tag='a')
tag='a').getId()
self.commit()
uid2, = [x.uid for x in activity_tool.getMessageList() if x.uid != uid1]
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.distribute()
# After distribute, duplicate is deleted.
uid2, = [x.uid for x in activity_tool.getMessageList()]
self.assertNotEqual(uid1, uid2)
# After distribute, duplicate is still present.
self.assertItemsEqual([uid1, uid2], [x.uid for x in activity_tool.getMessageList()])
activity_tool.tic()
self.assertEqual(len(activity_tool.getMessageList()), 0)
self.assertEqual(marker, [1])
finally:
del activity_tool.__class__.doSomething
def test_103_3_CheckSQLJoblibDeleteDuplicatesBeforeExecution(self):
def test_103_3_CheckSQLJoblibDoesNotDeleteDuplicatesBeforeExecution(self):
"""
Test that SQLJoblib delete the same messages before execution if messages
has the same method_id and path and tag and signature.
(see test_103_2_CheckSQLDictDoesNotDeleteDuplicatesBeforeExecution)
"""
activity_tool = self.getActivityTool()
marker = []
def doSomething(self, other_tag):
marker.append(self.countMessage(tag=other_tag))
activity_tool.__class__.doSomething = doSomething
try:
# Adds two same activities.
activity_tool.activate(activity='SQLJoblib', after_tag='foo', priority=2,
tag='a').doSomething(other_tag='a')
tag='a').getId()
self.commit()
uid1, = [x.uid for x in activity_tool.getMessageList()]
activity_tool.activate(activity='SQLJoblib', after_tag='bar', priority=1,
tag='a').doSomething(other_tag='a')
tag='a').getId()
self.commit()
uid2, = [x.uid for x in activity_tool.getMessageList() if x.uid != uid1]
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.distribute()
# After distribute, duplicate is deleted.
uid2, = [x.uid for x in activity_tool.getMessageList()]
self.assertNotEqual(uid1, uid2)
# After distribute, duplicate is still present.
self.assertItemsEqual([uid1, uid2], [x.uid for x in activity_tool.getMessageList()])
activity_tool.tic()
self.assertEqual(len(activity_tool.getMessageList()), 0)
self.assertEqual(marker, [1])
finally:
del activity_tool.__class__.doSomething
def test_103_4_CheckSQLDictDistributeWithSerializationTagAndGroupMethodId(
self):
......@@ -2279,7 +2271,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
for message in result
if (message.processing_node==-1 and
message.serialization_tag=='test_115')]),
3)
4)
self.assertEqual(len([message
for message in result
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment