Commit 5cec1117 authored by Hardik Juneja's avatar Hardik Juneja

CMFActivity: remove useBtree and let activity type decide when to use Dict instead

parent 493bd0f2
......@@ -86,35 +86,26 @@ class ActiveProcess(Base):
def __init__(self, *args, **kw):
Base.__init__(self, *args, **kw)
self.result_list = ConflictFreeLog()
self.use_btree = False
security.declareProtected(CMFCorePermissions.ManagePortal, 'useBTree')
def useBTree(self):
# Use BTree instead of Linked List
# this is used by joblib Backend to store results in a dictionary with
# signature as key
self.use_btree = True
self.result_list = LOBTree()
self.result_dict = LOBTree()
security.declareProtected(CMFCorePermissions.ManagePortal, 'postResult')
def postResult(self, result):
"""
Alais for appendResult
"""
self.appendResult(result);
security.declareProtected(CMFCorePermissions.ManagePortal, 'appendResult')
def appendResult(self, result):
"""
Append result to the result list
"""
try:
result_list = self.result_list
except AttributeError:
# BBB: self was created before implementation of __init__
if self.use_btree:
self.result_list = result_list = LOBTree()
else:
self.result_list = result_list = ConflictFreeLog()
self.result_list = result_list = ConflictFreeLog()
else:
if self.use_btree:
if not hasattr(result, 'signature'):
result_id = randrange(0, 10000 * (id(result) + 1))
else:
result_id = result.signature
result_list.insert(result_id, result)
return
if type(result_list) is not ConflictFreeLog: # BBB: result_list is IOBTree
# use a random id in order to store result in a way with
# fewer conflict errors
......@@ -124,16 +115,32 @@ class ActiveProcess(Base):
result_list[random_id] = result
self.result_len.change(1)
return
if self.use_btree:
signature = int(result.signature, 16)
result_list.insert(signature, result)
else:
result_list.append(result)
result_list.append(result)
security.declareProtected(CMFCorePermissions.ManagePortal, 'postActiveResult')
def postActiveResult(self, *args, **kw):
return self.postResult(ActiveResult(*args, **kw))
"""
Alais for appendResult
"""
return self.appendResult(ActiveResult(*args, **kw))
security.declareProtected(CMFCorePermissions.ManagePortal, 'setResult')
def setResult(self, result):
"""
Set the result to the result dict
If signature is not present, a random signature is used
"""
result_dict = self.result_dict
if not hasattr(result, 'signature'):
result_id = randrange(0, 1000000)
else:
result_id = int(result.signature)
result_dict.insert(result_id, result)
return
security.declareProtected(CMFCorePermissions.ManagePortal, 'setActiveResult')
def setActiveResult(self, *args, **kw):
return self.setResult(ActiveResult(*args, **kw))
security.declareProtected(CMFCorePermissions.ManagePortal, 'getResultList')
def getResultList(self, **kw):
......@@ -146,8 +153,7 @@ class ActiveProcess(Base):
except AttributeError:
# BBB: self was created before implementation of __init__
return []
if type(result_list) is not ConflictFreeLog:
# BBB: result_list is IOBTree or LOBTree
if type(result_list) is not ConflictFreeLog: # BBB: result_list is IOBTree
return result_list.values()
return list(result_list)
......
......@@ -121,7 +121,10 @@ class SQLBase(Queue):
activity_buffer.register(activity_tool)
assert not message.is_registered, message
self.registerMessage(activity_buffer, activity_tool, message)
def activateResult(self, active_process, result, signature=None):
active_process.appendResult(result)
def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered]
portal = activity_tool.getPortalObject()
......
......@@ -31,6 +31,7 @@ from functools import total_ordering
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from SQLBase import SQLBase, sort_message_key
from Products.CMFActivity.ActivityTool import Message
from sklearn.externals.joblib.hashing import hash as joblib_hash;
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
......@@ -74,10 +75,19 @@ class SQLJoblib(SQLDict):
"""
assert not message.is_registered, message
message.is_registered = True
signature = joblib_hash(message.args)
sigint = int(signature, 16) % (10 ** 16)
message.activity_kw['signature'] = sigint
if activity_buffer.activity_tool is None:
self.activity_tool = activity_tool
self.prepareMessage(activity_tool, message)
def activateResult(self, active_process, result, signature):
result.signature = int(signature)
active_process.setResult(result)
def prepareMessage(self, activity_tool, m):
portal = activity_tool.getPortalObject()
if m.is_registered:
......
......@@ -166,6 +166,7 @@ class Message(BaseMessage):
activity_kw,
method_id,
args, kw,
activity=None,
request=None,
portal_activities=None,
):
......@@ -177,6 +178,7 @@ class Message(BaseMessage):
self.method_id = method_id
self.args = args
self.kw = kw
self.activity = activity
if getattr(portal_activities, 'activity_creation_trace', False):
# Save current traceback, to make it possible to tell where a message
# was generated.
......@@ -289,8 +291,11 @@ class Message(BaseMessage):
if not isinstance(result, ActiveResult):
result = ActiveResult(result=result)
# XXX Allow other method_id in future
if not hasattr(self, 'activity'):
self.activity = DEFAULT_ACTIVITY
result.edit(object_path=object, method_id=self.method_id)
active_process.postResult(result)
signature = getattr(self, 'signature', None)
activity_dict[self.activity].activateResult(active_process, result, signature)
def __call__(self, activity_tool):
try:
......@@ -315,7 +320,6 @@ class Message(BaseMessage):
result = method(*self.args, **self.kw)
finally:
setSecurityManager(old_security_manager)
if method is not None:
if self.active_process and result is not None:
self.activateResult(
......@@ -500,6 +504,7 @@ class Method(object):
method_id=self._method_id,
args=args,
kw=kw,
activity=self._activity,
request=self._request,
portal_activities=portal_activities,
)
......
......@@ -64,6 +64,7 @@ if ENABLE_JOBLIB:
def get(self, timeout=None):
resultDict = self.active_process.getResultDict()
if not resultDict.has_key(self.active_process_sig):
raise ConflictError
result = resultDict[self.active_process_sig].result
......@@ -93,16 +94,14 @@ if ENABLE_JOBLIB:
portal_activities = self.active_process.portal_activities
active_process_id = self.active_process.getId()
joblib_result = None
# create a signature and convert it to integer
sig = joblib_hash(batch.items[0])
sig = joblib_hash((MySafeFunction(batch),))
sigint = int(sig, 16) % (10 ** 16)
resultDict = self.active_process.getResultDict()
if not resultDict.has_key(sigint):
joblib_result = portal_activities.activate(activity='SQLJoblib',
tag="joblib_%s" % active_process_id,
signature=sig,
active_process=self.active_process).Base_callSafeFunction(sigint, MySafeFunction(batch))
active_process=self.active_process).Base_callSafeFunction(MySafeFunction(batch))
if joblib_result is None:
joblib_result = CMFActivityResult(self.active_process, sigint, callback)
return joblib_result
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment