Commit 82692fc0 authored by Vincent Pelletier's avatar Vincent Pelletier

Instead of deleting duplicate messages, reserve them, and delete them when the...

Instead of deleting duplicate messages, reserve them, and delete them when the "original" message has succeeded.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@19192 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 3ec823f9
......@@ -178,20 +178,35 @@ class SQLDict(RAMDict, SQLBase):
if len(uid_list):
activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)
def deleteDuplicatedLineList(self, activity_tool, date, processing_node, line):
def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
"""
Delete all messages matching given one except itself.
Operator Value
!= uid
<= date
= path, method_id, group_method_id, order_validation_text,
processing_node, tag
Reserve unreserved messages matching given line.
Return their uids.
"""
activity_tool.SQLDict_deleteDuplicatedMessageList(
processing_node=processing_node, uid=line.uid,
path=line.path, method_id=line.method_id,
try:
result = activity_tool.SQLDict_selectDuplicatedLineList(
path=line.path,
method_id=line.method_id,
group_method_id=line.group_method_id,
order_validation_text=line.order_validation_text)
order_validation_text=line.order_validation_text
)
uid_list = [x.uid for x in result]
if len(uid_list):
activity_tool.SQLDict_reserveDuplicatedLineList(
processing_node=processing_node,
uid_list=uid_list
)
else:
# Release locks
activity_tool.SQLDict_commit()
except:
# Log
LOG('SQLDict', WARNING, 'getDuplicateMessageUidList got an exception', error=sys.exc_info())
# Release lock
activity_tool.SQLDict_rollback()
# And re-raise
raise
return uid_list
def getProcessableMessageList(self, activity_tool, processing_node):
"""
......@@ -217,13 +232,14 @@ class SQLDict(RAMDict, SQLBase):
unclean state.
Returned values:
3-tuple:
4-tuple:
- list of 3-tuple:
- message uid
- message
- priority
- impacted object count
- group_method_id
- uid_to_duplicate_uid_list_dict
"""
def getReservedMessageList(**kw):
line_list = self.getReservedMessageList(activity_tool=activity_tool,
......@@ -233,9 +249,12 @@ class SQLDict(RAMDict, SQLBase):
if len(line_list):
LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
return line_list
def deleteDuplicatedLineList(line):
self.deleteDuplicatedLineList(activity_tool=activity_tool, date=now_date,
processing_node=processing_node, line=line)
def getDuplicateMessageUidList(line):
uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
line=line, processing_node=processing_node)
if len(uid_list):
LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
return uid_list
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
......@@ -248,14 +267,14 @@ class SQLDict(RAMDict, SQLBase):
group_method_id = None
try:
result = getReservedMessageList(limit=1)
uid_to_duplicate_uid_list_dict = {}
if len(result) > 0:
line = result[0]
m = self.loadMessage(line.message, uid=line.uid)
append(line, m)
group_method_id = line.group_method_id
# Delete all messages matching current one - except current one.
deleteDuplicatedLineList(line)
activity_tool.SQLDict_processMessage(uid=[line.uid])
uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
if group_method_id not in (None, '', '\0'):
# Count the number of objects to prevent too many objects.
count += len(m.getObjectList(activity_tool))
......@@ -272,11 +291,12 @@ class SQLDict(RAMDict, SQLBase):
# So what remains to be filtered on are path, method_id,
# order_validation_text, tag
key = (line.path, line.method_id, line.order_validation_text, line.tag)
if key in path_and_method_id_dict:
LOG('SQLDict', TRACE, 'Duplicate of message %r has been skipped (it should already be deleted anyway): %r' % (path_and_method_id_dict[key], line.uid))
original_uid = path_and_method_id_dict.get(key)
if original_uid is not None:
uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
continue
path_and_method_id_dict[key] = line.uid
deleteDuplicatedLineList(line)
uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
if count < MAX_GROUPED_OBJECTS:
m = self.loadMessage(line.message, uid=line.uid)
count += len(m.getObjectList(activity_tool))
......@@ -286,7 +306,7 @@ class SQLDict(RAMDict, SQLBase):
activity_tool.SQLDict_processMessage(uid=[x[0] for x in message_list])
# Unreserve extra messages as soon as possible.
makeMessageListAvailable(unreserve_uid_list)
return message_list, count, group_method_id
return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
except:
LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
if len(message_list):
......@@ -300,9 +320,9 @@ class SQLDict(RAMDict, SQLBase):
LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
else:
LOG('SQLDict', TRACE, '(no message was reserved)')
return [], 0, None
return [], 0, None, {}
def finalizeMessageExecution(self, activity_tool, message_uid_priority_list):
def finalizeMessageExecution(self, activity_tool, message_uid_priority_list, uid_to_duplicate_uid_list_dict):
"""
If everything was fine, delete all messages.
If anything failed, make successfull messages available (if any), and
......@@ -326,11 +346,18 @@ class SQLDict(RAMDict, SQLBase):
if m.is_executed:
if something_failed:
make_available_uid_list.append(uid)
make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
else:
deletable_uid_list.append(uid)
deletable_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
if m.active_process:
# XXX: Bug here: Even if a duplicate message has an active_process,
# it won't be called on the duplicate.
message_with_active_process_list.append(m)
else:
# Whatever happens, duplicate uids are to be made available. Only
# executed message will get to lower priority or error state.
make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
exc_type = m.exc_info[0]
if type(exc_type) is ClassType and \
issubclass(exc_type, ConflictError):
......@@ -388,9 +415,14 @@ class SQLDict(RAMDict, SQLBase):
# Queue semantic
def dequeueMessage(self, activity_tool, processing_node):
def makeMessageListAvailable(uid_list):
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
message_uid_priority_list, count, group_method_id = \
def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
final_uid_list = []
for uid in uid_list:
final_uid_list.append(uid)
final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
message_uid_priority_list, count, group_method_id, \
uid_to_duplicate_uid_list_dict = \
self.getProcessableMessageList(activity_tool, processing_node)
if len(message_uid_priority_list):
# Remove group_id parameter from group_method_id
......@@ -425,7 +457,7 @@ class SQLDict(RAMDict, SQLBase):
raise
to_free_uid_list = [x[0] for x in message_uid_priority_list]
try:
makeMessageListAvailable(to_free_uid_list)
makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
except:
LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
else:
......@@ -451,12 +483,12 @@ class SQLDict(RAMDict, SQLBase):
x[1].is_executed = 0
failed_message_uid_list = [x[0] for x in message_uid_priority_list]
try:
makeMessageListAvailable(failed_message_uid_list)
makeMessageListAvailable(failed_message_uid_list, uid_to_duplicate_uid_list_dict)
except:
LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (failed_message_uid_list, ), error=sys.exc_info())
else:
LOG('SQLDict', TRACE, 'Freed messages %r' % (failed_message_uid_list, ))
self.finalizeMessageExecution(activity_tool, message_uid_priority_list)
self.finalizeMessageExecution(activity_tool, message_uid_priority_list, uid_to_duplicate_uid_list_dict)
get_transaction().commit()
return not len(message_uid_priority_list)
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
processing_node
uid_list
</params>
UPDATE
message
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
uid IN (<dtml-in uid_list><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>, </dtml-if></dtml-in>)
<dtml-var sql_delimiter>
COMMIT
......@@ -5,24 +5,20 @@ max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
class_
</dtml-comment>
<params>
processing_node
uid
path
method_id
group_method_id
order_validation_text
</params>
DELETE FROM
SELECT uid FROM
message
WHERE
processing_node IN (0, <dtml-sqlvar processing_node type="int">)
AND uid != <dtml-sqlvar uid type="int">
processing_node = 0
AND path = <dtml-sqlvar path type="string">
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
AND order_validation_text = <dtml-sqlvar order_validation_text type="string">
<dtml-var sql_delimiter>
COMMIT
FOR UPDATE
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment