Commit a577d8f0 authored by Hardik Juneja

Remove remaining transaction.commit and move BTree to ActiveProcess

parent 2da064a8
from sklearn.externals.joblib.parallel import parallel_backend, Parallel, delayed
from Products.ERP5Type.Log import log
from Products.CMFActivity.ActiveResult import ActiveResult
import time
from math import sqrt

def abc(num):
  time.sleep(5)
  return sqrt(num)

def test(self, active_process_path):
  active_process = self.portal_activities.unrestrictedTraverse(active_process_path)
  with parallel_backend('CMFActivity', active_process=active_process):
    result = Parallel(n_jobs=2, timeout=30, verbose=30)(delayed(sqrt)(i**2) for i in range(2))
    result = Parallel(n_jobs=2, timeout=30, verbose=30)(delayed(abc)(i**2) for i in range(4))
  log("I am here", result)
  return result
\ No newline at end of file
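For orientation, a minimal standalone sketch of the same parallel_backend/Parallel pattern, assuming a plain joblib installation outside ERP5 and the stock threading backend instead of the CMFActivity backend wired up here:

from math import sqrt
from joblib import Parallel, delayed, parallel_backend  # assumption: plain joblib, not sklearn.externals

with parallel_backend('threading', n_jobs=2):
  # delayed() wraps each call; Parallel dispatches the generator of calls to the
  # active backend and returns the results in submission order.
  results = Parallel(n_jobs=2, verbose=10)(delayed(sqrt)(i ** 2) for i in range(4))
print(results)  # [0.0, 1.0, 2.0, 3.0]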
......@@ -6,6 +6,12 @@
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>joblibFunction</string> </value>
......@@ -51,13 +57,28 @@
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
......@@ -70,7 +91,7 @@
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
......@@ -79,7 +100,7 @@
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.patches.WorkflowTool"/>
</pickle>
......
......@@ -20,7 +20,8 @@ def test(self, active_process_path):
'gamma': np.logspace(-10, 10, 3),
'tol': [1e-4]
}
X = np.ascontiguousarray(X)
y = np.ascontiguousarray(y)
clf = GridSearchCV(SVC(), param_grid=param_grid, verbose=10)
active_process = self.portal_activities.unrestrictedTraverse(active_process_path)
with parallel_backend('CMFActivity', n_jobs=2, active_process=active_process):
......
......@@ -6,6 +6,12 @@
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>joblibGridSearch</string> </value>
......@@ -49,13 +55,28 @@
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
......@@ -68,7 +89,7 @@
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
......@@ -77,7 +98,7 @@
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.patches.WorkflowTool"/>
</pickle>
......
......@@ -2,6 +2,7 @@ if REQUEST is not None:
raise Unauthorized
from Products.ERP5Type.Log import log
from Products.CMFActivity.ActiveResult import ActiveResult
log("Executing batch_function on %s" % context.getRelativeUrl())
......@@ -9,4 +10,4 @@ result = batch_function()
log("Result of batch_function on %s: %s" % (context.getRelativeUrl(), result))
return result
return ActiveResult(result=result, sig=hash)
......@@ -50,7 +50,7 @@
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>batch_function, REQUEST=None</string> </value>
<value> <string>hash, batch_function, REQUEST=None</string> </value>
</item>
<item>
<key> <string>id</string> </key>
......
......@@ -4,7 +4,8 @@ from Products.ERP5Type.Log import log
timeout = 10
active_process = context.portal_activities.newActiveProcess()
active_process.useBTree()
active_process_id = active_process.getId()
path = active_process.getPhysicalPath()
context.portal_activities.activate(activity="SQLQueue", after_method_id="Base_callSafeFunction", active_process=active_process, tag='abc').Base_joblibFunction(path)
context.portal_activities.activate(activity="SQLQueue", after_method_id="Base_callSafeFunction", active_process=active_process, tag='abc').Base_joblibGridSearchFunction(path)
return path
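Hypothetical follow-up, not part of this commit: once both activities have executed, a caller could traverse back to the active process using the returned path and read the posted results (assuming getResultList() hands back the stored ActiveResult objects):

from Products.ERP5Type.Log import log
active_process = context.portal_activities.unrestrictedTraverse(path)
for active_result in active_process.getResultList():
  log("joblib result", active_result.result)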
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>driverScript</string> </value>
<value> <string>Base_driverScript</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -12,7 +12,7 @@
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>joblibGridSearch</string> </value>
<value> <string>joblibFunction</string> </value>
</item>
<item>
<key> <string>id</string> </key>
......
......@@ -12,7 +12,7 @@
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>joblibFunction</string> </value>
<value> <string>joblibGridSearch</string> </value>
</item>
<item>
<key> <string>id</string> </key>
......
......@@ -32,6 +32,7 @@ from Products.CMFCore import permissions as CMFCorePermissions
from Products.ERP5Type.Base import Base
from Products.ERP5Type import PropertySheet
from Products.ERP5Type.ConflictFree import ConflictFreeLog
from BTrees.LOBTree import LOBTree
from BTrees.Length import Length
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE
from random import randrange
......@@ -87,14 +88,31 @@ class ActiveProcess(Base):
Base.__init__(self, *args, **kw)
self.result_list = ConflictFreeLog()
security.declareProtected(CMFCorePermissions.ManagePortal, 'useBTree')
def useBTree(self):
# Use BTree instead of Linked List
# this is used by joblib Backend to store results in a dictionary with
# signature as key
self.use_btree = True
self.result_list = LOBTree()
security.declareProtected(CMFCorePermissions.ManagePortal, 'postResult')
def postResult(self, result):
try:
result_list = self.result_list
except AttributeError:
# BBB: self was created before implementation of __init__
self.result_list = result_list = ConflictFreeLog()
if self.use_btree:
self.result_list = result_list = LOBTree()
else:
self.result_list = result_list = ConflictFreeLog()
else:
if self.use_btree:
if not hasattr(result, 'sig'):
result.sig = randrange(0, 10000 * (id(result) + 1))
result_list.insert(result.sig, result)
return
if type(result_list) is not ConflictFreeLog: # BBB: result_list is IOBTree
# use a random id in order to store result in a way with
# fewer conflict errors
......@@ -104,7 +122,11 @@ class ActiveProcess(Base):
result_list[random_id] = result
self.result_len.change(1)
return
result_list.append(result)
if self.use_btree:
result_list.insert(result.sig, result)
else:
result_list.append(result)
security.declareProtected(CMFCorePermissions.ManagePortal, 'postActiveResult')
def postActiveResult(self, *args, **kw):
......@@ -125,6 +147,18 @@ class ActiveProcess(Base):
return result_list.values()
return list(result_list)
security.declareProtected(CMFCorePermissions.ManagePortal, 'getResult')
def getResult(self, key, **kw):
"""
Returns the result with requested key else None
"""
try:
result_list = self.result_list
result = result_list[key]
except:
return None
return result
security.declareProtected(CMFCorePermissions.ManagePortal, 'activateResult')
def activateResult(self, result):
if result not in (None, 0, '', (), []):
......
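Side note: a minimal sketch of the keyed storage that useBTree(), postResult() and getResult() above rely on, assuming plain BTrees outside Zope and an integer signature as key:

from BTrees.LOBTree import LOBTree

result_map = LOBTree()                     # 64-bit integer keys, arbitrary object values
result_map.insert(12345, 'first result')   # insert() returns 1 when the key is new
result_map.insert(12345, 'ignored')        # returns 0 and keeps the existing value
print(result_map.get(12345))               # 'first result'; get() returns None for a missing key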
......@@ -542,7 +542,8 @@ class SQLBase(Queue):
return not message_list
def finalizeMessageExecution(self, activity_tool, message_list,
uid_to_duplicate_uid_list_dict=None):
uid_to_duplicate_uid_list_dict=None,
deletable_uid_list = []):
"""
If everything was fine, delete all messages.
If anything failed, make successful messages available (if any), and
......@@ -553,7 +554,6 @@ class SQLBase(Queue):
be put in a permanent-error state.
- In all other cases, retry count is increased and message is delayed.
"""
deletable_uid_list = []
delay_uid_list = []
final_error_uid_list = []
make_available_uid_list = []
......
This diff is collapsed.
......@@ -26,11 +26,11 @@
##############################################################################
ENABLE_JOBLIB = True
import copy
import hashlib
import sys
import time
import transaction
from BTrees.OOBTree import OOBTree
from zLOG import LOG, INFO, WARNING
from ZODB.POSException import ConflictError
......@@ -45,9 +45,33 @@ except ImportError:
LOG("CMFActivityBackend", WARNING, "CLASS NOT LOADED!!!")
ENABLE_JOBLIB = False
from Activity.SQLJoblib import MyBatchedSignature
if ENABLE_JOBLIB:
# this is adapted from
# http://stackoverflow.com/questions/5884066/hashing-a-python-dictionary/8714242#8714242
def make_hash(o):
"""
Makes a hash from a dictionary, list, tuple or set to any level, that contains
only other hashable types (including any lists, tuples, sets, and
dictionaries).
"""
if (callable(o) and o.__name__):
return hash(o.__name__)
if isinstance(o, (set, tuple, list)):
return hash(tuple([make_hash(e) for e in o]))
elif not isinstance(o, dict):
try:
return hash(o)
except TypeError:
return hash(int(hashlib.md5(o).hexdigest(), 16))
new_o = copy.deepcopy(o)
for k, v in new_o.items():
new_o[k] = make_hash(v)
return hash(tuple(frozenset(sorted(new_o.items()))))
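As an aside (not part of the diff), the helper above can be exercised on its own; a small sketch, assuming Python 2 as in the rest of the code base:

from math import sqrt

param_grid = {'kernel': ['rbf'], 'C': [1.0, 10.0], 'tol': [1e-4]}
sig = make_hash(param_grid)             # stable int for a nested dict of hashable values
print(sig == make_hash(param_grid))     # True within one process: same structure, same hash
print(make_hash(sqrt))                  # callables are hashed by their __name__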
class MySafeFunction(SafeFunction):
"""Wrapper around a SafeFunction that catches any exception
......@@ -67,21 +91,11 @@ if ENABLE_JOBLIB:
self.active_process = active_process
self.active_process_sig = active_process_sig
self.callback = callback
def get(self, timeout=None):
'''
while not self.active_process.getResultList():
time.sleep(1)
if timeout is not None:
timeout -= 1
if timeout < 0:
raise RuntimeError('Timeout reached')
transaction.commit()
'''
if self.active_process.process_result_map[self.active_process_sig] is None:
if self.active_process.getResult(self.active_process_sig) is None:
raise ConflictError
result = self.active_process.process_result_map[self.active_process_sig]
result = self.active_process.getResult(self.active_process_sig).result
# TODO raise before or after the callback?
if isinstance(result, Exception):
......@@ -94,10 +108,7 @@ if ENABLE_JOBLIB:
def __init__(self, *args, **kwargs):
self.count = 1
self.active_process = kwargs['active_process']
if not hasattr(self.active_process, 'process_result_map'):
self.active_process.process_result_map = OOBTree()
transaction.commit()
def effective_n_jobs(self, n_jobs):
"""Dummy implementation to prevent n_jobs <=0
......@@ -112,13 +123,13 @@ if ENABLE_JOBLIB:
portal_activities = self.active_process.portal_activities
active_process_id = self.active_process.getId()
joblib_result = None
sig = make_hash(batch.items[0])
sig = MyBatchedSignature(batch)
if not self.active_process.process_result_map.has_key(sig):
self.active_process.process_result_map.insert(sig, None)
if not self.active_process.getResult(sig):
joblib_result = portal_activities.activate(activity='SQLJoblib',
tag="joblib_%s" % active_process_id,
active_process=self.active_process).Base_callSafeFunction(MySafeFunction(batch))
signature=sig,
active_process=self.active_process).Base_callSafeFunction(sig, MySafeFunction(batch))
if joblib_result is None:
joblib_result = CMFActivityResult(self.active_process, sig, callback)
return joblib_result
......@@ -138,11 +149,6 @@ if ENABLE_JOBLIB:
def abort_everything(self, ensure_ready=True):
# All jobs will be aborted here while they are still processing our backend
# remove job with no results
#self.active_process.process_result_map = dict((k, v)
# for k, v in self.active_process.process_result_map.iteritems() if v)
transaction.commit()
if ensure_ready:
self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel,
**self.parallel._backend_args)
......
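For context, a rough sketch of the contract apply_async() has to honour (assumption: based on joblib's backend interface, not code from this commit): whatever apply_async() returns must expose get(), which is what CMFActivityResult provides on top of ActiveProcess.getResult():

class EagerResultSketch(object):
  """Illustrative stand-in that runs the batch immediately instead of through CMFActivity."""
  def __init__(self, batch, callback=None):
    self._value = batch()        # a joblib batch is a callable returning a list of results
    if callback is not None:
      callback(self._value)      # the callback lets Parallel schedule the next batch
  def get(self, timeout=None):
    return self._value           # Parallel collects each batch's results through get()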
......@@ -177,7 +177,6 @@ class Message(BaseMessage):
self.method_id = method_id
self.args = args
self.kw = kw
self.result = None
if getattr(portal_activities, 'activity_creation_trace', False):
# Save current traceback, to make it possible to tell where a message
# was generated.
......@@ -316,12 +315,12 @@ class Message(BaseMessage):
result = method(*self.args, **self.kw)
finally:
setSecurityManager(old_security_manager)
if method is not None:
if self.active_process and result is not None:
self.activateResult(
activity_tool.unrestrictedTraverse(self.active_process),
result, obj)
self.result = result
self.setExecutionState(MESSAGE_EXECUTED)
except:
self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
......@@ -504,7 +503,6 @@ class Method(object):
request=self._request,
portal_activities=portal_activities,
)
if portal_activities.activity_tracking:
activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (self._activity, '/'.join(m.object_path), m.method_id, m.args, m.kw, m.activity_kw, m.user_name))
portal_activities.getActivityBuffer().deferredQueueMessage(
......
......@@ -20,6 +20,7 @@ CREATE TABLE <dtml-var table> (
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`signature` BIGINT NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
processing_node
uid
</params>
UPDATE
message_job
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
signature
</params>
SELECT uid FROM
message_job
WHERE
processing_node = 0
AND path = <dtml-sqlvar path type="string">
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
AND signature = <dtml-sqlvar signature type="int">
FOR UPDATE
......@@ -18,10 +18,11 @@ processing_node
date
group_method_id
tag
signature
serialization_tag
</params>
INSERT INTO <dtml-var table>
(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, serialization_tag, message)
(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, signature, serialization_tag, message)
VALUES
(
<dtml-sqlvar expr="uid" type="int">,
......@@ -34,6 +35,7 @@ VALUES
<dtml-sqlvar expr="priority" type="int">,
<dtml-sqlvar expr="group_method_id" type="string">,
<dtml-sqlvar expr="tag" type="string">,
<dtml-sqlvar expr="signature" type="int">,
<dtml-sqlvar expr="serialization_tag" type="string">,
<dtml-sqlvar expr="message" type="string">
)