Commit 40fd88e7 authored by Hardik Juneja's avatar Hardik Juneja

CMFActivity: remove useBtree and let activity type decide when to use Dict instead

parent 187ae307
...@@ -86,35 +86,20 @@ class ActiveProcess(Base): ...@@ -86,35 +86,20 @@ class ActiveProcess(Base):
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
Base.__init__(self, *args, **kw) Base.__init__(self, *args, **kw)
self.result_list = ConflictFreeLog() self.result_list = ConflictFreeLog()
self.use_btree = False self.result_dict = LOBTree()
security.declareProtected(CMFCorePermissions.ManagePortal, 'useBTree')
def useBTree(self):
# Use BTree instead of Linked List
# this is used by joblib Backend to store results in a dictionary with
# signature as key
self.use_btree = True
self.result_list = LOBTree()
security.declareProtected(CMFCorePermissions.ManagePortal, 'postResult') security.declareProtected(CMFCorePermissions.ManagePortal, 'postResult')
def postResult(self, result): def postResult(self, result):
self.appendResult(result);
security.declareProtected(CMFCorePermissions.ManagePortal, 'appendResult')
def appendResult(self, result):
try: try:
result_list = self.result_list result_list = self.result_list
except AttributeError: except AttributeError:
# BBB: self was created before implementation of __init__ # BBB: self was created before implementation of __init__
if self.use_btree: self.result_list = result_list = ConflictFreeLog()
self.result_list = result_list = LOBTree()
else:
self.result_list = result_list = ConflictFreeLog()
else: else:
if self.use_btree:
if not hasattr(result, 'signature'):
result_id = randrange(0, 10000 * (id(result) + 1))
else:
result_id = result.signature
result_list.insert(result_id, result)
return
if type(result_list) is not ConflictFreeLog: # BBB: result_list is IOBTree if type(result_list) is not ConflictFreeLog: # BBB: result_list is IOBTree
# use a random id in order to store result in a way with # use a random id in order to store result in a way with
# fewer conflict errors # fewer conflict errors
...@@ -124,16 +109,32 @@ class ActiveProcess(Base): ...@@ -124,16 +109,32 @@ class ActiveProcess(Base):
result_list[random_id] = result result_list[random_id] = result
self.result_len.change(1) self.result_len.change(1)
return return
result_list.append(result)
security.declareProtected(CMFCorePermissions.ManagePortal, 'postActiveResult')
def postActiveResult(self, *args, **kw):
return self.appendResult(ActiveResult(*args, **kw))
if self.use_btree: security.declareProtected(CMFCorePermissions.ManagePortal, 'setResult')
signature = int(result.signature, 16) def setResult(self, result):
result_list.insert(signature, result) try:
result_dict = self.result_dict
except AttributeError:
# BBB: self was created before implementation of __init__
self.result_dict = result_dict = LOBTree()
else: else:
result_list.append(result) if not hasattr(result, 'signature'):
result_id = randrange(0, 10000 * (id(result) + 1))
else:
result_id = result.signature
result_dict.insert(result_id, result)
return
signature = int(result.signature)
result_dict.insert(signature, result)
security.declareProtected(CMFCorePermissions.ManagePortal, 'postActiveResult') security.declareProtected(CMFCorePermissions.ManagePortal, 'setActiveResult')
def postActiveResult(self, *args, **kw): def postActiveResult(self, *args, **kw):
return self.postResult(ActiveResult(*args, **kw)) return self.setResult(ActiveResult(*args, **kw))
security.declareProtected(CMFCorePermissions.ManagePortal, 'getResultList') security.declareProtected(CMFCorePermissions.ManagePortal, 'getResultList')
def getResultList(self, **kw): def getResultList(self, **kw):
...@@ -157,11 +158,11 @@ class ActiveProcess(Base): ...@@ -157,11 +158,11 @@ class ActiveProcess(Base):
Returns the result Dict Returns the result Dict
""" """
try: try:
result_list = self.result_list result_dict = self.result_dict
except AttributeError: except AttributeError:
return {} return {}
if type(result_list) is not ConflictFreeLog: if type(result_dict) is not ConflictFreeLog:
return result_list return result_dict
else: else:
return {} return {}
......
...@@ -121,7 +121,10 @@ class SQLBase(Queue): ...@@ -121,7 +121,10 @@ class SQLBase(Queue):
activity_buffer.register(activity_tool) activity_buffer.register(activity_tool)
assert not message.is_registered, message assert not message.is_registered, message
self.registerMessage(activity_buffer, activity_tool, message) self.registerMessage(activity_buffer, activity_tool, message)
def activateResult(self, active_process, result, signature=None):
active_process.appendResult(result)
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered] registered_message_list = [m for m in message_list if m.is_registered]
portal = activity_tool.getPortalObject() portal = activity_tool.getPortalObject()
......
...@@ -31,6 +31,7 @@ from functools import total_ordering ...@@ -31,6 +31,7 @@ from functools import total_ordering
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from SQLBase import SQLBase, sort_message_key from SQLBase import SQLBase, sort_message_key
from Products.CMFActivity.ActivityTool import Message from Products.CMFActivity.ActivityTool import Message
from sklearn.externals.joblib.hashing import hash as joblib_hash;
# Stop validating more messages when this limit is reached # Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000 MAX_VALIDATED_LIMIT = 1000
...@@ -78,10 +79,19 @@ class SQLJoblib(SQLDict): ...@@ -78,10 +79,19 @@ class SQLJoblib(SQLDict):
""" """
assert not message.is_registered, message assert not message.is_registered, message
message.is_registered = True message.is_registered = True
signature = joblib_hash(message.args)
sigint = int(signature, 16) % (10 ** 16)
message.activity_kw['signature'] = sigint
if activity_buffer.activity_tool is None: if activity_buffer.activity_tool is None:
self.activity_tool = activity_tool self.activity_tool = activity_tool
self.prepareMessage(activity_tool, message) self.prepareMessage(activity_tool, message)
def activateResult(self, active_process, result, signature):
result.signature = int(signature)
active_process.setResult(result)
def prepareMessage(self, activity_tool, m): def prepareMessage(self, activity_tool, m):
portal = activity_tool.getPortalObject() portal = activity_tool.getPortalObject()
if m.is_registered: if m.is_registered:
......
...@@ -166,6 +166,7 @@ class Message(BaseMessage): ...@@ -166,6 +166,7 @@ class Message(BaseMessage):
activity_kw, activity_kw,
method_id, method_id,
args, kw, args, kw,
activity=None,
request=None, request=None,
portal_activities=None, portal_activities=None,
): ):
...@@ -177,6 +178,7 @@ class Message(BaseMessage): ...@@ -177,6 +178,7 @@ class Message(BaseMessage):
self.method_id = method_id self.method_id = method_id
self.args = args self.args = args
self.kw = kw self.kw = kw
self.activity = activity
if getattr(portal_activities, 'activity_creation_trace', False): if getattr(portal_activities, 'activity_creation_trace', False):
# Save current traceback, to make it possible to tell where a message # Save current traceback, to make it possible to tell where a message
# was generated. # was generated.
...@@ -290,7 +292,8 @@ class Message(BaseMessage): ...@@ -290,7 +292,8 @@ class Message(BaseMessage):
result = ActiveResult(result=result) result = ActiveResult(result=result)
# XXX Allow other method_id in future # XXX Allow other method_id in future
result.edit(object_path=object, method_id=self.method_id) result.edit(object_path=object, method_id=self.method_id)
active_process.postResult(result) signature = getattr(self, 'signature', None)
activity_dict[self.activity].activateResult(active_process, result, signature)
def __call__(self, activity_tool): def __call__(self, activity_tool):
try: try:
...@@ -315,7 +318,6 @@ class Message(BaseMessage): ...@@ -315,7 +318,6 @@ class Message(BaseMessage):
result = method(*self.args, **self.kw) result = method(*self.args, **self.kw)
finally: finally:
setSecurityManager(old_security_manager) setSecurityManager(old_security_manager)
if method is not None: if method is not None:
if self.active_process and result is not None: if self.active_process and result is not None:
self.activateResult( self.activateResult(
...@@ -500,6 +502,7 @@ class Method(object): ...@@ -500,6 +502,7 @@ class Method(object):
method_id=self._method_id, method_id=self._method_id,
args=args, args=args,
kw=kw, kw=kw,
activity=self._activity,
request=self._request, request=self._request,
portal_activities=portal_activities, portal_activities=portal_activities,
) )
......
...@@ -64,6 +64,7 @@ if ENABLE_JOBLIB: ...@@ -64,6 +64,7 @@ if ENABLE_JOBLIB:
def get(self, timeout=None): def get(self, timeout=None):
resultDict = self.active_process.getResultDict() resultDict = self.active_process.getResultDict()
if not resultDict.has_key(self.active_process_sig): if not resultDict.has_key(self.active_process_sig):
raise ConflictError raise ConflictError
result = resultDict[self.active_process_sig].result result = resultDict[self.active_process_sig].result
...@@ -93,16 +94,14 @@ if ENABLE_JOBLIB: ...@@ -93,16 +94,14 @@ if ENABLE_JOBLIB:
portal_activities = self.active_process.portal_activities portal_activities = self.active_process.portal_activities
active_process_id = self.active_process.getId() active_process_id = self.active_process.getId()
joblib_result = None joblib_result = None
# create a signature and convert it to integer # create a signature and convert it to integer
sig = joblib_hash(batch.items[0]) sig = joblib_hash((MySafeFunction(batch),))
sigint = int(sig, 16) % (10 ** 16) sigint = int(sig, 16) % (10 ** 16)
resultDict = self.active_process.getResultDict() resultDict = self.active_process.getResultDict()
if not resultDict.has_key(sigint): if not resultDict.has_key(sigint):
joblib_result = portal_activities.activate(activity='SQLJoblib', joblib_result = portal_activities.activate(activity='SQLJoblib',
tag="joblib_%s" % active_process_id, tag="joblib_%s" % active_process_id,
signature=sig, active_process=self.active_process).Base_callSafeFunction(MySafeFunction(batch))
active_process=self.active_process).Base_callSafeFunction(sigint, MySafeFunction(batch))
if joblib_result is None: if joblib_result is None:
joblib_result = CMFActivityResult(self.active_process, sigint, callback) joblib_result = CMFActivityResult(self.active_process, sigint, callback)
return joblib_result return joblib_result
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment