Commit e01df29a authored by Hardik Juneja's avatar Hardik Juneja

CMFActivity: Add a function to create hashes

parent 5cec1117
......@@ -42,9 +42,15 @@ _DequeueMessageException = Exception()
from SQLDict import SQLDict
def sqljoblib_hash(args):
signature = joblib_hash(args)
sigint = int(signature, 16) % (10 ** 16)
return sigint
class SQLJoblib(SQLDict):
"""
A simple OOBT based queue, It is non transatactional and follow always-excute paradigm.
An extention of SQLDict, It is non transatactional and follow always-excute paradigm.
It uses a dictionary to store results and with hash of arguments as keys
"""
sql_table = 'message_job'
uid_group = 'portal_activity_job'
......@@ -75,11 +81,7 @@ class SQLJoblib(SQLDict):
"""
assert not message.is_registered, message
message.is_registered = True
signature = joblib_hash(message.args)
sigint = int(signature, 16) % (10 ** 16)
message.activity_kw['signature'] = sigint
message.activity_kw['signature'] = sqljoblib_hash(message.args)
if activity_buffer.activity_tool is None:
self.activity_tool = activity_tool
self.prepareMessage(activity_tool, message)
......
......@@ -31,12 +31,12 @@ from ZODB.POSException import ConflictError
try:
from sklearn.externals.joblib import register_parallel_backend
from sklearn.externals.joblib.hashing import hash as joblib_hash
from sklearn.externals.joblib.parallel import ParallelBackendBase, parallel_backend
from sklearn.externals.joblib.parallel import FallbackToBackend, SequentialBackend
from sklearn.externals.joblib._parallel_backends import SafeFunction
from sklearn.externals.joblib.my_exceptions import TransportableException, WorkerInterrupt
from sklearn.externals.joblib.format_stack import format_exc
from Products.CMFActivity.Activity.SQLJoblib import sqljoblib_hash
except ImportError:
LOG("CMFActivityBackend", WARNING, "CLASS NOT LOADED!!!")
ENABLE_JOBLIB = False
......@@ -95,15 +95,14 @@ if ENABLE_JOBLIB:
active_process_id = self.active_process.getId()
joblib_result = None
# create a signature and convert it to integer
sig = joblib_hash((MySafeFunction(batch),))
sigint = int(sig, 16) % (10 ** 16)
sig = sqljoblib_hash((MySafeFunction(batch),))
resultDict = self.active_process.getResultDict()
if not resultDict.has_key(sigint):
if not resultDict.has_key(sig):
joblib_result = portal_activities.activate(activity='SQLJoblib',
tag="joblib_%s" % active_process_id,
active_process=self.active_process).Base_callSafeFunction(MySafeFunction(batch))
if joblib_result is None:
joblib_result = CMFActivityResult(self.active_process, sigint, callback)
joblib_result = CMFActivityResult(self.active_process, sig, callback)
return joblib_result
def configure(self, n_jobs=1, parallel=None, **backend_args):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment