diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 5949609a136ed074fdbf8be185c0051fb2f8829e..4be44eb12e7623c2e529ac51676928eaef14e4ca 100644 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -507,62 +507,61 @@ class SQLDict(RAMDict, SQLBase): validation_text_dict = {'none': 1} message_dict = {} for line in result: - message = self.loadMessage(line.message, uid = line.uid, - order_validation_text = line.order_validation_text) + message = self.loadMessage(line.message, uid=line.uid, line=line, + order_validation_text=line.order_validation_text) self.getExecutableMessageList(activity_tool, message, message_dict, validation_text_dict, now_date=now_date) if message_dict: - message_unique_set = set() + def sort_message_key(message): + # same sort key as in SQLDict_readMessageList + return message.line.priority, message.line.date, message.uid + message_unique_dict = {} + serialization_tag_dict = {} + distributable_uid_set = set() deletable_uid_list = [] + # remove duplicates # SQLDict considers object_path, method_id, tag to unify activities, # but ignores method arguments. They are outside of semantics. - for key in message_dict.keys(): - # we manipulate message_dict below so that we cannot use - # iterator here. - message = message_dict[key] - unique_key = self.generateMessageUID(message) - if unique_key in message_unique_set: - deletable_uid_list.append(message.uid) - del message_dict[message.uid] - else: - message_unique_set.add(unique_key) + for message in message_dict.itervalues(): + message_unique_dict.setdefault(self.generateMessageUID(message), + []).append(message) + for message_list in message_unique_dict.itervalues(): + if len(message_list) > 1: + # Sort list of duplicates to keep the message with highest score + message_list.sort(key=sort_message_key) + deletable_uid_list += [m.uid for m in message_list[1:]] + message = message_list[0] + distributable_uid_set.add(message.uid) + serialization_tag = message.activity_kw.get('serialization_tag') + if serialization_tag is not None: + serialization_tag_dict.setdefault(serialization_tag, + []).append(message) # Don't let through if there is the same serialization tag in the # message dict. If there is the same serialization tag, only one can # be validated and others must wait. # But messages with group_method_id are exceptions. serialization_tag # does not stop validating together. Because those messages should # be processed together at once. - serialization_tag_set = set() - serialization_tag_group_method_id_dict = {} - for key in message_dict.keys(): - message = message_dict[key] - # serialize messages with serialization_tag. - serialization_tag = message.activity_kw.get('serialization_tag') - group_method_id = message.activity_kw.get('group_method_id') - if serialization_tag is not None: - if serialization_tag in serialization_tag_set: - if group_method_id is not None: - # Only one group_method_id can pass through. - if serialization_tag_group_method_id_dict.get( - serialization_tag,None) != group_method_id: - del message_dict[message.uid] - else: - del message_dict[message.uid] - else: - serialization_tag_set.add(serialization_tag) - if group_method_id is not None: - serialization_tag_group_method_id_dict[serialization_tag] = group_method_id + for message_list in serialization_tag_dict.itervalues(): + if len(message_list) == 1: + continue + # Sort list of messages to validate the message with highest score + message_list.sort(key=sort_message_key) + group_method_id = message_list[0].activity_kw.get('group_method_id') + for message in message_list[1:]: + if group_method_id is None or \ + group_method_id != message.activity_kw.get('group_method_id'): + distributable_uid_set.remove(message.uid) if deletable_uid_list: activity_tool.SQLBase_delMessage(table=self.sql_table, uid=deletable_uid_list) - - distributable_count = len(message_dict) - if distributable_count: - activity_tool.SQLBase_assignMessage(table=self.sql_table, - processing_node=0, uid=[m.uid for m in message_dict.itervalues()]) - validated_count += distributable_count + distributable_count = len(distributable_uid_set) + if distributable_count: + activity_tool.SQLBase_assignMessage(table=self.sql_table, + processing_node=0, uid=tuple(distributable_uid_set)) + validated_count += distributable_count if validated_count < MAX_VALIDATED_LIMIT: offset += READ_MESSAGE_LIMIT result = readMessageList(path=None, method_id=None, processing_node=-1, diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py index 678c047425eb02299fb485bd767595bad82c5787..49e01fc47ea55723711a89e2d418d4e3d5e8231e 100644 --- a/product/CMFActivity/tests/testCMFActivity.py +++ b/product/CMFActivity/tests/testCMFActivity.py @@ -2720,14 +2720,16 @@ class TestCMFActivity(ERP5TypeTestCase): try: Organisation.checkActivityCount = checkActivityCount # Adds two same activities. - organisation.activate(activity='SQLDict', tag='a').checkActivityCount(other_tag='a') + organisation.activate(activity='SQLDict', tag='a', priority=2).checkActivityCount(other_tag='a') get_transaction().commit() - organisation.activate(activity='SQLDict', tag='a').checkActivityCount(other_tag='a') + uid1, = [x.uid for x in activity_tool.getMessageList()] + organisation.activate(activity='SQLDict', tag='a', priority=1).checkActivityCount(other_tag='a') get_transaction().commit() self.assertEqual(len(activity_tool.getMessageList()), 2) activity_tool.distribute() # After distribute, duplicate is deleted. - self.assertEqual(len(activity_tool.getMessageList()), 1) + uid2, = [x.uid for x in activity_tool.getMessageList()] + self.assertNotEqual(uid1, uid2) self.tic() self.assertEqual(len(activity_tool.getMessageList()), 0) self.assertEqual(len(check_result_dict), 1) @@ -2886,9 +2888,9 @@ class TestCMFActivity(ERP5TypeTestCase): self.assertEqual(len(result), 0) # Second scenario: activate, activate, distribute # Both messages must be distributed (this is different from regular tags) - organisation.activate(activity=activity, serialization_tag='1').getTitle() + organisation.activate(activity=activity, serialization_tag='1', priority=2).getTitle() # Use a different method just so that SQLDict doesn't merge both activities prior to insertion. - organisation.activate(activity=activity, serialization_tag='1').getId() + organisation.activate(activity=activity, serialization_tag='1', priority=1).getId() get_transaction().commit() result = activity_tool.getMessageList() self.assertEqual(len(result), 2) @@ -2899,9 +2901,11 @@ class TestCMFActivity(ERP5TypeTestCase): # If activity is SQLQueue, this does not happen. if activity=='SQLDict': # one is validated. - self.assertEqual(len([x for x in result if x.processing_node == 0]), 1) + message, = [x for x in result if x.processing_node == 0] + self.assertEqual(message.method_id, 'getId') # the other one is still waiting for validation. - self.assertEqual(len([x for x in result if x.processing_node == -1]), 1) + message, = [x for x in result if x.processing_node == -1] + self.assertEqual(message.method_id, 'getTitle') else: # both are validated at once. self.assertEqual(len([x for x in result if x.processing_node == 0]), 2)