Commit e7031926 authored by Hardik Juneja's avatar Hardik Juneja

CMFActivity: revert changes for ActivityBuffer._register, override...

CMFActivity: revert changes for ActivityBuffer._register, override prepareQueueMessageList and remove distrubute in SQLJoblib
parent c9c06dda
......@@ -83,9 +83,6 @@ class Queue(object):
def initialize(self, activity_tool, clear):
pass
def register(self, activity_buffer, activity_tool, message):
pass
def activateResult(self, active_process, result, signature=None):
pass
......
......@@ -75,16 +75,18 @@ class SQLJoblib(SQLDict):
LOG('CMFActivity', INFO, "%r table upgraded\n%s"
% (self.sql_table, src))
def register(self, activity_buffer, activity_tool, message):
def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'),
m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
def registerMessage(self, activity_buffer, activity_tool, message):
"""
Send messages to mysql directly
"""
assert not message.is_registered, message
message.is_registered = True
if not 'signature' in message.activity_kw:
message.activity_kw['signature'] = sqljoblib_hash(message.args)
if activity_buffer.activity_tool is None:
self.activity_tool = activity_tool
super(SQLJoblib, self).registerMessage(activity_buffer, activity_tool, message)
self.prepareMessage(activity_tool, message)
def storeResult(self, message, active_process, result):
......@@ -92,6 +94,9 @@ class SQLJoblib(SQLDict):
result.signature = int(signature)
active_process.setResult(result)
def prepareQueueMessageList(self, activity_tool, message_list):
pass
def prepareMessage(self, activity_tool, m):
portal = activity_tool.getPortalObject()
if m.is_registered:
......@@ -148,84 +153,4 @@ class SQLJoblib(SQLDict):
# We know that original_uid != uid because caller skips lines we returned
# earlier.
return None, original_uid, [uid]
return load
def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'),
m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
def distribute(self, activity_tool, node_count):
offset = 0
assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
if assignMessage is not None:
now_date = self.getNow(activity_tool)
validated_count = 0
while 1:
result = self._getMessageList(activity_tool, processing_node=-1,
to_date=now_date,
offset=offset, count=READ_MESSAGE_LIMIT)
if not result:
return
transaction.commit()
validation_text_dict = {'none': 1}
message_dict = {}
for line in result:
message = Message.load(line.message, uid=line.uid, line=line)
if not hasattr(message, 'order_validation_text'): # BBB
message.order_validation_text = self.getOrderValidationText(message)
self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date)
if message_dict:
message_unique_dict = {}
serialization_tag_dict = {}
distributable_uid_set = set()
deletable_uid_list = []
# remove duplicates
# SQLDict considers object_path, method_id, tag to unify activities,
# but ignores method arguments. They are outside of semantics.
for message in message_dict.itervalues():
message_unique_dict.setdefault(self.generateMessageUID(message),
[]).append(message)
for message_list in message_unique_dict.itervalues():
if len(message_list) > 1:
# Sort list of duplicates to keep the message with highest score
message_list.sort(key=sort_message_key)
deletable_uid_list += [m.uid for m in message_list[1:]]
message = message_list[0]
serialization_tag = message.activity_kw.get('serialization_tag')
if serialization_tag is None:
distributable_uid_set.add(message.uid)
else:
serialization_tag_dict.setdefault(serialization_tag,
[]).append(message)
# Don't let through if there is the same serialization tag in the
# message dict. If there is the same serialization tag, only one can
# be validated and others must wait.
# But messages with group_method_id are exceptions. serialization_tag
# does not stop validating together. Because those messages should
# be processed together at once.
for message_list in serialization_tag_dict.itervalues():
# Sort list of messages to validate the message with highest score
message_list.sort(key=sort_message_key)
distributable_uid_set.add(message_list[0].uid)
group_method_id = message_list[0].line.group_method_id
if group_method_id == '\0':
continue
for message in message_list[1:]:
if group_method_id == message.line.group_method_id:
distributable_uid_set.add(message.uid)
if deletable_uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=deletable_uid_list)
distributable_count = len(distributable_uid_set)
if distributable_count:
assignMessage(table=self.sql_table,
processing_node=0, uid=tuple(distributable_uid_set))
validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT:
return
offset += READ_MESSAGE_LIMIT
return load
\ No newline at end of file
......@@ -48,7 +48,7 @@ class ActivityBuffer(TM):
def getUidSet(self, activity):
return self.uid_set_dict[activity]
def register(self, activity_tool):
def _register(self, activity_tool):
TM._register(self)
if self.activity_tool is None:
self.activity_tool = activity_tool
......@@ -70,7 +70,9 @@ class ActivityBuffer(TM):
raise
def deferredQueueMessage(self, activity_tool, activity, message):
activity.register(self, activity_tool, message)
self._register(activity_tool)
assert not message.is_registered, message
activity.registerMessage(self, activity_tool, message)
def sortKey(self, *ignored):
"""Activities must be finished before databases commit transactions."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment