diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 0c42093c7cfce967855ac12aea04cd0fb819df21..19e05ca18a9806c71b925c5e46658fd5e53a7a6e 100644 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -178,20 +178,35 @@ class SQLDict(RAMDict, SQLBase): if len(uid_list): activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list) - def deleteDuplicatedLineList(self, activity_tool, date, processing_node, line): + def getDuplicateMessageUidList(self, activity_tool, line, processing_node): """ - Delete all messages matching given one except itself. - Operator Value - != uid - <= date - = path, method_id, group_method_id, order_validation_text, - processing_node, tag + Reserve unreserved messages matching given line. + Return their uids. """ - activity_tool.SQLDict_deleteDuplicatedMessageList( - processing_node=processing_node, uid=line.uid, - path=line.path, method_id=line.method_id, - group_method_id=line.group_method_id, - order_validation_text=line.order_validation_text) + try: + result = activity_tool.SQLDict_selectDuplicatedLineList( + path=line.path, + method_id=line.method_id, + group_method_id=line.group_method_id, + order_validation_text=line.order_validation_text + ) + uid_list = [x.uid for x in result] + if len(uid_list): + activity_tool.SQLDict_reserveDuplicatedLineList( + processing_node=processing_node, + uid_list=uid_list + ) + else: + # Release locks + activity_tool.SQLDict_commit() + except: + # Log + LOG('SQLDict', WARNING, 'getDuplicateMessageUidList got an exception', error=sys.exc_info()) + # Release lock + activity_tool.SQLDict_rollback() + # And re-raise + raise + return uid_list def getProcessableMessageList(self, activity_tool, processing_node): """ @@ -217,13 +232,14 @@ class SQLDict(RAMDict, SQLBase): unclean state. Returned values: - 3-tuple: + 4-tuple: - list of 3-tuple: - message uid - message - priority - impacted object count - group_method_id + - uid_to_duplicate_uid_list_dict """ def getReservedMessageList(**kw): line_list = self.getReservedMessageList(activity_tool=activity_tool, @@ -233,9 +249,12 @@ class SQLDict(RAMDict, SQLBase): if len(line_list): LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list])) return line_list - def deleteDuplicatedLineList(line): - self.deleteDuplicatedLineList(activity_tool=activity_tool, date=now_date, - processing_node=processing_node, line=line) + def getDuplicateMessageUidList(line): + uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool, + line=line, processing_node=processing_node) + if len(uid_list): + LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, )) + return uid_list def makeMessageListAvailable(uid_list): self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list) BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number @@ -248,14 +267,14 @@ class SQLDict(RAMDict, SQLBase): group_method_id = None try: result = getReservedMessageList(limit=1) + uid_to_duplicate_uid_list_dict = {} if len(result) > 0: line = result[0] m = self.loadMessage(line.message, uid=line.uid) append(line, m) group_method_id = line.group_method_id - # Delete all messages matching current one - except current one. - deleteDuplicatedLineList(line) activity_tool.SQLDict_processMessage(uid=[line.uid]) + uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line)) if group_method_id not in (None, '', '\0'): # Count the number of objects to prevent too many objects. count += len(m.getObjectList(activity_tool)) @@ -272,11 +291,12 @@ class SQLDict(RAMDict, SQLBase): # So what remains to be filtered on are path, method_id, # order_validation_text, tag key = (line.path, line.method_id, line.order_validation_text, line.tag) - if key in path_and_method_id_dict: - LOG('SQLDict', TRACE, 'Duplicate of message %r has been skipped (it should already be deleted anyway): %r' % (path_and_method_id_dict[key], line.uid)) + original_uid = path_and_method_id_dict.get(key) + if original_uid is not None: + uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid) continue path_and_method_id_dict[key] = line.uid - deleteDuplicatedLineList(line) + uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line)) if count < MAX_GROUPED_OBJECTS: m = self.loadMessage(line.message, uid=line.uid) count += len(m.getObjectList(activity_tool)) @@ -286,7 +306,7 @@ class SQLDict(RAMDict, SQLBase): activity_tool.SQLDict_processMessage(uid=[x[0] for x in message_list]) # Unreserve extra messages as soon as possible. makeMessageListAvailable(unreserve_uid_list) - return message_list, count, group_method_id + return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict except: LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info()) if len(message_list): @@ -300,9 +320,9 @@ class SQLDict(RAMDict, SQLBase): LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, )) else: LOG('SQLDict', TRACE, '(no message was reserved)') - return [], 0, None + return [], 0, None, {} - def finalizeMessageExecution(self, activity_tool, message_uid_priority_list): + def finalizeMessageExecution(self, activity_tool, message_uid_priority_list, uid_to_duplicate_uid_list_dict): """ If everything was fine, delete all messages. If anything failed, make successfull messages available (if any), and @@ -326,11 +346,18 @@ class SQLDict(RAMDict, SQLBase): if m.is_executed: if something_failed: make_available_uid_list.append(uid) + make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, [])) else: deletable_uid_list.append(uid) + deletable_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, [])) if m.active_process: + # XXX: Bug here: Even if a duplicate message has an active_process, + # it won't be called on the duplicate. message_with_active_process_list.append(m) else: + # Whatever happens, duplicate uids are to be made available. Only + # executed message will get to lower priority or error state. + make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, [])) exc_type = m.exc_info[0] if type(exc_type) is ClassType and \ issubclass(exc_type, ConflictError): @@ -388,9 +415,14 @@ class SQLDict(RAMDict, SQLBase): # Queue semantic def dequeueMessage(self, activity_tool, processing_node): - def makeMessageListAvailable(uid_list): - self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list) - message_uid_priority_list, count, group_method_id = \ + def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict): + final_uid_list = [] + for uid in uid_list: + final_uid_list.append(uid) + final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, [])) + self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list) + message_uid_priority_list, count, group_method_id, \ + uid_to_duplicate_uid_list_dict = \ self.getProcessableMessageList(activity_tool, processing_node) if len(message_uid_priority_list): # Remove group_id parameter from group_method_id @@ -425,7 +457,7 @@ class SQLDict(RAMDict, SQLBase): raise to_free_uid_list = [x[0] for x in message_uid_priority_list] try: - makeMessageListAvailable(to_free_uid_list) + makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict) except: LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info()) else: @@ -451,12 +483,12 @@ class SQLDict(RAMDict, SQLBase): x[1].is_executed = 0 failed_message_uid_list = [x[0] for x in message_uid_priority_list] try: - makeMessageListAvailable(failed_message_uid_list) + makeMessageListAvailable(failed_message_uid_list, uid_to_duplicate_uid_list_dict) except: LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (failed_message_uid_list, ), error=sys.exc_info()) else: LOG('SQLDict', TRACE, 'Freed messages %r' % (failed_message_uid_list, )) - self.finalizeMessageExecution(activity_tool, message_uid_priority_list) + self.finalizeMessageExecution(activity_tool, message_uid_priority_list, uid_to_duplicate_uid_list_dict) get_transaction().commit() return not len(message_uid_priority_list) diff --git a/product/CMFActivity/skins/activity/SQLDict_commit.zsql b/product/CMFActivity/skins/activity/SQLDict_commit.zsql new file mode 100644 index 0000000000000000000000000000000000000000..bdbf5b302f8e5ef7eac802ebcd6399dd88635ac9 --- /dev/null +++ b/product/CMFActivity/skins/activity/SQLDict_commit.zsql @@ -0,0 +1,11 @@ +<dtml-comment> +title: +connection_id:cmf_activity_sql_connection +max_rows:1000 +max_cache:0 +cache_time:0 +class_name: +class_file: +</dtml-comment> +<params></params> +COMMIT diff --git a/product/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql b/product/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql new file mode 100644 index 0000000000000000000000000000000000000000..a232a0e9ac50e668880444b7cc6e72bb4e8e6750 --- /dev/null +++ b/product/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql @@ -0,0 +1,21 @@ +<dtml-comment> +title: +connection_id:cmf_activity_sql_connection +max_rows:0 +max_cache:0 +cache_time:0 +class_name: +class_file: +</dtml-comment> +<params> +processing_node +uid_list +</params> +UPDATE + message +SET + processing_node=<dtml-sqlvar processing_node type="int"> +WHERE + uid IN (<dtml-in uid_list><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>) +<dtml-var sql_delimiter> +COMMIT diff --git a/product/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql b/product/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql similarity index 71% rename from product/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql rename to product/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql index 435475fa947ff75d808359930a9022044bd0a51f..d3577f6c37881846ad8e08eea624e4a933aecc30 100644 --- a/product/CMFActivity/skins/activity/SQLDict_deleteDuplicatedMessageList.zsql +++ b/product/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql @@ -5,24 +5,20 @@ max_rows:0 max_cache:0 cache_time:0 class_name: -class_file: +class_ </dtml-comment> <params> -processing_node -uid path method_id group_method_id order_validation_text </params> -DELETE FROM +SELECT uid FROM message WHERE - processing_node IN (0, <dtml-sqlvar processing_node type="int">) - AND uid != <dtml-sqlvar uid type="int"> + processing_node = 0 AND path = <dtml-sqlvar path type="string"> AND method_id = <dtml-sqlvar method_id type="string"> AND group_method_id = <dtml-sqlvar group_method_id type="string"> AND order_validation_text = <dtml-sqlvar order_validation_text type="string"> -<dtml-var sql_delimiter> -COMMIT +FOR UPDATE