Commit 454892aa authored by Hardik Juneja's avatar Hardik Juneja

CMFActivity: cleanup CMFActivityParallelBackend.py

parent 98de0fb2
...@@ -46,12 +46,12 @@ else: ...@@ -46,12 +46,12 @@ else:
"""Wrapper around a SafeFunction that catches any exception """Wrapper around a SafeFunction that catches any exception
The exception can be handled in CMFActivityResult.get The exception can be handled in CMFActivityResult.get
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kw):
super(JoblibSafeFunction, self).__init__(*args, **kwargs) super(JoblibSafeFunction, self).__init__(*args, **kw)
def __call__(self, *args, **kwargs): def __call__(self, *args, **kw):
try: try:
return super(JoblibSafeFunction, self).__call__(*args, **kwargs) return super(JoblibSafeFunction, self).__call__(*args, **kw)
except Exception as exc: except Exception as exc:
raise exc raise exc
...@@ -75,9 +75,10 @@ else: ...@@ -75,9 +75,10 @@ else:
return result return result
class CMFActivityBackend(ParallelBackendBase): class CMFActivityBackend(ParallelBackendBase):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kw):
self.count = 1 self.count = 1
self.active_process = kwargs['active_process'] self.active_process = kw['active_process']
#self.portal = kw['portal']
def effective_n_jobs(self, n_jobs): def effective_n_jobs(self, n_jobs):
"""Dummy implementation to prevent n_jobs <=0 """Dummy implementation to prevent n_jobs <=0
...@@ -90,19 +91,17 @@ else: ...@@ -90,19 +91,17 @@ else:
def apply_async(self, batch, callback=None): def apply_async(self, batch, callback=None):
"""Schedule a func to be run""" """Schedule a func to be run"""
portal_activities = self.active_process.portal_activities
active_process_id = self.active_process.getId() active_process_id = self.active_process.getId()
joblib_result = None portal_activities = self.active_process.portal_activities
# active_process = portal_activities.getActiveProcess()
# create a signature and convert it to integer # create a signature and convert it to integer
sig = sqljoblib_hash((JoblibSafeFunction(batch),)) sig = sqljoblib_hash((JoblibSafeFunction(batch),))
resultDict = self.active_process.getResultDict() resultDict = self.active_process.getResultDict()
if not resultDict.has_key(sig): if not resultDict.has_key(sig):
joblib_result = portal_activities.activate(activity='SQLJoblib', portal_activities.activate(activity='SQLJoblib',
tag="joblib_%s" % active_process_id, tag="joblib_" + active_process_id,
active_process=self.active_process)._callSafeFunction(JoblibSafeFunction(batch)) active_process=self.active_process)._callSafeFunction(JoblibSafeFunction(batch))
if joblib_result is None: return CMFActivityResult(self.active_process, sig, callback)
joblib_result = CMFActivityResult(self.active_process, sig, callback)
return joblib_result
def configure(self, n_jobs=1, parallel=None, **backend_args): def configure(self, n_jobs=1, parallel=None, **backend_args):
"""Reconfigure the backend and return the number of workers. This """Reconfigure the backend and return the number of workers. This
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment