Commit 3df12bb8 authored by Hardik Juneja

CMFActivity: Remove transaction.commit and sleep from initial implementation and use conflict error instead
parent 5591dea3
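
The gist of the change: instead of polling with `transaction.commit()` and `time.sleep()` in `CMFActivityResult.get`, a missing result now raises `ConflictError`, so Zope's publisher aborts and replays the request until the activity has posted its result. A minimal sketch of the pattern, using names from the diff below (the explicit retry loop stands in for ZPublisher and is illustrative only):

```python
# Sketch of the ConflictError-driven wait; in Zope, ZPublisher catches
# ConflictError and replays the request, so no explicit loop is needed.
from ZODB.POSException import ConflictError

def wait_for_result(active_process, signature, retries=100):
    for _ in range(retries):
        try:
            result = active_process.getResult(signature)
            if result is None:
                # Result not posted yet: abandon this transaction and retry,
                # instead of committing and sleeping.
                raise ConflictError
            return result.result
        except ConflictError:
            continue
    raise RuntimeError('no result after %d retries' % retries)
```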
......@@ -32,6 +32,7 @@ from Products.CMFCore import permissions as CMFCorePermissions
from Products.ERP5Type.Base import Base
from Products.ERP5Type import PropertySheet
from Products.ERP5Type.ConflictFree import ConflictFreeLog
from BTrees.LOBTree import LOBTree
from BTrees.Length import Length
from random import randrange
from .ActiveResult import ActiveResult
......@@ -85,6 +86,15 @@ class ActiveProcess(Base):
def __init__(self, *args, **kw):
Base.__init__(self, *args, **kw)
self.result_list = ConflictFreeLog()
self.use_btree = False
security.declareProtected(CMFCorePermissions.ManagePortal, 'useBTree')
def useBTree(self):
# Use a BTree instead of a linked list. This is used by the joblib
# backend to store results in a dictionary keyed by signature.
self.use_btree = True
self.result_list = LOBTree()
security.declareProtected(CMFCorePermissions.ManagePortal, 'postResult')
def postResult(self, result):
......@@ -92,8 +102,19 @@ class ActiveProcess(Base):
result_list = self.result_list
except AttributeError:
# BBB: self was created before implementation of __init__
self.result_list = result_list = ConflictFreeLog()
if getattr(self, 'use_btree', False): # BBB: old instances may lack use_btree
self.result_list = result_list = LOBTree()
else:
self.result_list = result_list = ConflictFreeLog()
else:
if getattr(self, 'use_btree', False):
if not hasattr(result, 'signature'):
result_id = randrange(0, 10000 * (id(result) + 1))
else:
result_id = result.signature
result_list.insert(result_id, result)
return
if type(result_list) is not ConflictFreeLog: # BBB: result_list is IOBTree
# use a random id to store the result with
# fewer conflict errors
......@@ -103,7 +124,12 @@ class ActiveProcess(Base):
result_list[random_id] = result
self.result_len.change(1)
return
result_list.append(result)
if getattr(self, 'use_btree', False):
signature = int(result.signature, 16)
result_list.insert(signature, result)
else:
result_list.append(result)
security.declareProtected(CMFCorePermissions.ManagePortal, 'postActiveResult')
def postActiveResult(self, *args, **kw):
......@@ -124,6 +150,18 @@ class ActiveProcess(Base):
return result_list.values()
return list(result_list)
security.declareProtected(CMFCorePermissions.ManagePortal, 'getResult')
def getResult(self, key, **kw):
"""
Return the result stored under the requested key, else None
"""
try:
result_list = self.result_list
result = result_list[key]
except KeyError:
return None
return result
security.declareProtected(CMFCorePermissions.ManagePortal, 'activateResult')
def activateResult(self, result):
if result not in (None, 0, '', (), []):
......
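
For background on the switch above: `LOBTree` maps signed 64-bit integer keys to arbitrary objects, which is why hex signatures get converted with `int(sig, 16)` (and, in the backend later in this diff, reduced modulo `10 ** 16` to stay within the key range). A self-contained sketch with an example digest:

```python
from BTrees.LOBTree import LOBTree

tree = LOBTree()
sig = 'd41d8cd98f00b204e9800998ecf8427e'  # example joblib-style hex digest
key = int(sig, 16) % (10 ** 16)  # fold into LOBTree's 64-bit key range
tree.insert(key, 'computed result')  # insert() returns 0 if the key exists
print(tree.get(key))  # -> 'computed result'
```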
......@@ -43,7 +43,7 @@ INVALID_ORDER = 2
# Time global parameters
MAX_PROCESSING_TIME = 900 # in seconds
VALIDATION_ERROR_DELAY = 15 # in seconds
VALIDATION_ERROR_DELAY = 1 # in seconds
class Queue(object):
"""
......
......@@ -117,6 +117,11 @@ class SQLBase(Queue):
" The following added columns could not be initialized: %s"
% (self.sql_table, ", ".join(column_list)))
def register(self, activity_buffer, activity_tool, message):
activity_buffer.register(activity_tool)
assert not message.is_registered, message
self.registerMessage(activity_buffer, activity_tool, message)
def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered]
portal = activity_tool.getPortalObject()
......
......@@ -26,12 +26,193 @@
#
##############################################################################
import transaction
from functools import total_ordering
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from SQLBase import SQLBase, sort_message_key
from Products.CMFActivity.ActivityTool import Message
from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000
_DequeueMessageException = Exception()
from SQLDict import SQLDict
class SQLJoblib(SQLDict):
"""
XXX SQLJoblib
A simple OOBTree-based queue. It is non-transactional and follows an always-execute paradigm.
"""
sql_table = 'message_job'
uid_group = 'portal_activity_job'
def initialize(self, activity_tool, clear):
"""
Initialize the message table using the MyISAM engine
"""
folder = activity_tool.getPortalObject().portal_skins.activity
try:
createMessageTable = folder.SQLJoblib_createMessageTable
except AttributeError:
return
if clear:
folder.SQLBase_dropMessageTable(table=self.sql_table)
createMessageTable(table=self.sql_table)
else:
src = createMessageTable._upgradeSchema(create_if_not_exists=1,
initialize=self._initialize,
table=self.sql_table)
if src:
LOG('CMFActivity', INFO, "%r table upgraded\n%s"
% (self.sql_table, src))
def register(self, activity_buffer, activity_tool, message):
"""
Send the message to MySQL directly
"""
assert not message.is_registered, message
message.is_registered = True
if activity_buffer.activity_tool is None:
self.activity_tool = activity_tool
self.prepareMessage(activity_tool, message)
def prepareMessage(self, activity_tool, m):
portal = activity_tool.getPortalObject()
if m.is_registered:
uid = portal.portal_ids.generateNewIdList(self.uid_group,
id_count=1, id_generator='uid')[0]
m.order_validation_text = x = self.getOrderValidationText(m)
processing_node = (0 if x == 'none' else -1)
portal.SQLJoblib_writeMessage(
table=self.sql_table,
uid=uid,
path='/'.join(m.object_path),
active_process_uid=m.active_process_uid,
method_id=m.method_id,
priority=m.activity_kw.get('priority', 1),
message=Message.dump(m),
group_method_id=m.getGroupId(),
date=m.activity_kw.get('at_date'),
tag=m.activity_kw.get('tag', ''),
signature=m.activity_kw.get('signature', ''),
processing_node=processing_node,
serialization_tag=m.activity_kw.get('serialization_tag', ''))
def getProcessableMessageLoader(self, activity_tool, processing_node):
path_and_method_id_dict = {}
def load(line):
# getProcessableMessageList already fetches messages with the same
# group_method_id, so what remains to be filtered on is path, method_id
# and signature
path = line.path
method_id = line.method_id
key = path, method_id
uid = line.uid
signature = line.signature
original_uid = path_and_method_id_dict.get(key)
if original_uid is None:
m = Message.load(line.message, uid=uid, line=line, signature=signature)
try:
result = activity_tool.SQLJoblib_selectDuplicatedLineList(
path=path,
method_id=method_id,
group_method_id=line.group_method_id,
signature=signature)
reserve_uid_list = uid_list = [x.uid for x in result]
if reserve_uid_list:
activity_tool.SQLJoblib_reserveDuplicatedLineList(
processing_node=processing_node, uid=reserve_uid_list)
except:
self._log(WARNING, 'SQLJoblib_selectDuplicatedLineList got an exception')
raise
if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
path_and_method_id_dict[key] = uid
return m, uid, uid_list
# We know that original_uid != uid because caller skips lines we returned
# earlier.
return None, original_uid, [uid]
return load
def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'),
m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
def distribute(self, activity_tool, node_count):
offset = 0
assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
if assignMessage is not None:
now_date = self.getNow(activity_tool)
validated_count = 0
while 1:
result = self._getMessageList(activity_tool, processing_node=-1,
to_date=now_date,
offset=offset, count=READ_MESSAGE_LIMIT)
if not result:
return
transaction.commit()
validation_text_dict = {'none': 1}
message_dict = {}
for line in result:
message = Message.load(line.message, uid=line.uid, line=line)
if not hasattr(message, 'order_validation_text'): # BBB
message.order_validation_text = self.getOrderValidationText(message)
self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date)
if message_dict:
message_unique_dict = {}
serialization_tag_dict = {}
distributable_uid_set = set()
deletable_uid_list = []
# remove duplicates
# SQLJoblib considers object_path, method_id, signature and tag to unify
# activities, but ignores method arguments: they are not part of the
# deduplication semantics.
for message in message_dict.itervalues():
message_unique_dict.setdefault(self.generateMessageUID(message),
[]).append(message)
for message_list in message_unique_dict.itervalues():
if len(message_list) > 1:
# Sort list of duplicates to keep the message with highest score
message_list.sort(key=sort_message_key)
deletable_uid_list += [m.uid for m in message_list[1:]]
message = message_list[0]
serialization_tag = message.activity_kw.get('serialization_tag')
if serialization_tag is None:
distributable_uid_set.add(message.uid)
else:
serialization_tag_dict.setdefault(serialization_tag,
[]).append(message)
# Messages sharing a serialization tag must not be validated together:
# only one of them can be validated and the others must wait.
# Messages with a group_method_id are the exception: the serialization
# tag does not prevent validating them together, because such messages
# should be processed together at once.
for message_list in serialization_tag_dict.itervalues():
# Sort list of messages to validate the message with highest score
message_list.sort(key=sort_message_key)
distributable_uid_set.add(message_list[0].uid)
group_method_id = message_list[0].line.group_method_id
if group_method_id == '\0':
continue
for message in message_list[1:]:
if group_method_id == message.line.group_method_id:
distributable_uid_set.add(message.uid)
if deletable_uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=deletable_uid_list)
distributable_count = len(distributable_uid_set)
if distributable_count:
assignMessage(table=self.sql_table,
processing_node=0, uid=tuple(distributable_uid_set))
validated_count += distributable_count
if validated_count >= MAX_VALIDATED_LIMIT:
return
offset += READ_MESSAGE_LIMIT
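
A note on the deduplication above: `generateMessageUID` extends SQLDict's key with the activity's `signature`, so identical calls carrying different joblib batches stay apart while true duplicates still collapse. A toy illustration (the `Msg` stand-in is hypothetical and exposes only what `generateMessageUID` reads):

```python
from collections import defaultdict

class Msg(object):
    # Hypothetical stand-in for CMFActivity's Message.
    def __init__(self, path, method_id, signature):
        self.object_path = path
        self.method_id = method_id
        self.activity_kw = {'signature': signature}

def generate_uid(m):
    return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'),
            m.activity_kw.get('tag'), m.activity_kw.get('group_id'))

groups = defaultdict(list)
for m in (Msg(('doc',), 'compute', 'aa'),   # duplicate of the next message
          Msg(('doc',), 'compute', 'aa'),
          Msg(('doc',), 'compute', 'bb')):  # distinct signature, kept apart
    groups[generate_uid(m)].append(m)
print(sorted(len(v) for v in groups.values()))  # -> [1, 2]
```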
......@@ -48,7 +48,7 @@ class ActivityBuffer(TM):
def getUidSet(self, activity):
return self.uid_set_dict[activity]
def _register(self, activity_tool):
def register(self, activity_tool):
TM._register(self)
if self.activity_tool is None:
self.activity_tool = activity_tool
......@@ -70,9 +70,7 @@ class ActivityBuffer(TM):
raise
def deferredQueueMessage(self, activity_tool, activity, message):
self._register(activity_tool)
assert not message.is_registered, message
activity.registerMessage(self, activity_tool, message)
activity.register(self, activity_tool, message)
def sortKey(self, *ignored):
"""Activities must be finished before databases commit transactions."""
......
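
The `_register` to `register` rename above, together with the new `Queue.register` hook in the SQLBase hunk earlier, makes queuing a per-activity decision: SQLDict/SQLQueue keep the buffered, transactional path, while SQLJoblib writes its row immediately. A runnable toy model of that dispatch (all names here are stand-ins, not the real classes):

```python
# Toy model: each activity class decides how register() queues a message.
class Buffer(object):
    def __init__(self):
        self.pending = []
    def register(self, activity_tool):
        pass  # would join the current transaction

class SQLBase(object):
    def register(self, buf, tool, message):
        buf.register(tool)
        buf.pending.append(message)  # written to SQL only at commit time

class SQLJoblib(SQLBase):
    def register(self, buf, tool, message):
        tool['rows'].append(message)  # written to MySQL immediately

tool, buf = {'rows': []}, Buffer()
SQLBase().register(buf, tool, 'm1')    # buffered
SQLJoblib().register(buf, tool, 'm2')  # immediate
print(buf.pending, tool['rows'])       # -> ['m1'] ['m2']
```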
......@@ -40,7 +40,8 @@ document_classes = updateGlobals(this_module, globals(),
# Finish installation
def initialize( context ):
# Define object classes and tools
import ActivityTool, ActiveProcess, ActivityConnection, ActivityJoblibBackend
import ActivityTool, ActiveProcess, ActivityConnection
from joblib import CMFActivityParallelBackend
object_classes = (ActiveProcess.ActiveProcess,
#ActivityConnection.ActivityConnection
)
......
......@@ -63,7 +63,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
</td>
</dtml-if>
<td align="left" valign="top"><dtml-var uid>
<dtml-var expr="{'SQLDict':'(dict)','SQLQueue':'(queue)'}[activity]">
<dtml-var expr="{'SQLDict':'(dict)','SQLQueue':'(queue)','SQLJoblib':'(Joblib)'}[activity]">
</td>
<td align="left" valign="top"><a href="<dtml-var expr="REQUEST.physicalPathToURL(path)">"><dtml-var path></a></td>
<td align="left" valign="top"><dtml-var method_id></td>
......
......@@ -26,14 +26,12 @@
##############################################################################
ENABLE_JOBLIB = True
import sys
import time
import transaction
from zLOG import LOG, INFO, WARNING
from ZODB.POSException import ConflictError
try:
from sklearn.externals.joblib import register_parallel_backend
from sklearn.externals.joblib.hashing import hash as joblib_hash
from sklearn.externals.joblib.parallel import ParallelBackendBase, parallel_backend
from sklearn.externals.joblib.parallel import FallbackToBackend, SequentialBackend
from sklearn.externals.joblib._parallel_backends import SafeFunction
......@@ -44,11 +42,14 @@ except ImportError:
ENABLE_JOBLIB = False
if ENABLE_JOBLIB:
class MySafeFunction(SafeFunction):
"""Wrapper around a SafeFunction that catches any exception
The exception can be handled in CMFActivityResult.get
"""
def __init__(self, *args, **kwargs):
super(MySafeFunction, self).__init__(*args, **kwargs)
self.batch = args[0]
def __call__(self, *args, **kwargs):
try:
return super(MySafeFunction, self).__call__(*args, **kwargs)
......@@ -56,19 +57,16 @@ if ENABLE_JOBLIB:
return exc
class CMFActivityResult(object):
def __init__(self, active_process, callback):
def __init__(self, active_process, active_process_sig, callback):
self.active_process = active_process
self.active_process_sig = active_process_sig
self.callback = callback
def get(self, timeout=None):
while not self.active_process.getResultList():
time.sleep(1)
if timeout is not None:
timeout -= 1
if timeout < 0:
raise RuntimeError('Timeout reached')
transaction.commit()
result = self.active_process.getResultList()[0].result
# TODO raise before or after the callback?
if self.active_process.getResult(self.active_process_sig) is None:
raise ConflictError
result = self.active_process.getResult(self.active_process_sig).result
if isinstance(result, Exception):
raise result
if self.callback is not None:
......@@ -77,33 +75,36 @@ if ENABLE_JOBLIB:
class CMFActivityBackend(ParallelBackendBase):
def __init__(self, *args, **kwargs):
self.zope_context = kwargs['zope_context']
self.count = 1
self.active_process = kwargs['active_process']
def effective_n_jobs(self, n_jobs):
"""Dummy implementation to prevent n_jobs <=0
and allow (sequential) n_jobs=1 and n_jobs != 1 (parallel) behaviour
"""
if n_jobs == 0:
raise ValueError('n_jobs == 0 in Parallel has no meaning')
return abs(n_jobs)
def apply_async(self, batch, callback=None):
"""Schedule a func to be run"""
portal_activities = self.zope_context.portal_activities
# the creation of the active process here might be removed.
active_process = portal_activities.newActiveProcess()
# SQLJoblib == JoblibActivity
joblib_result = portal_activities.activate(activity='SQLQueue',
active_process=active_process).Base_callSafeFunction(MySafeFunction(batch))
# activate() does not return the joblib_result itself
portal_activities = self.active_process.portal_activities
active_process_id = self.active_process.getId()
joblib_result = None
# create a signature and convert it to an integer
sig = joblib_hash(batch.items[0])
sigint = int(sig, 16) % (10 ** 16)
if not self.active_process.getResult(sigint):
joblib_result = portal_activities.activate(activity='SQLJoblib',
tag="joblib_%s" % active_process_id,
signature=sig,
active_process=self.active_process).Base_callSafeFunction(sigint, MySafeFunction(batch))
if joblib_result is None:
# Transaction commit is a code crime.
transaction.commit()
joblib_result = CMFActivityResult(active_process, callback)
joblib_result = CMFActivityResult(self.active_process, sigint, callback)
return joblib_result
def configure(self, n_jobs=1, parallel=None, **backend_args):
"""Reconfigure the backend and return the number of workers. This
makes it possible to reuse an existing backend instance for successive
......@@ -115,9 +116,15 @@ if ENABLE_JOBLIB:
raise FallbackToBackend(SequentialBackend())
self.parallel = parallel
# self.zope_context = backend_args['zope_context']
return self.effective_n_jobs(n_jobs)
def abort_everything(self, ensure_ready=True):
# All jobs will be aborted here while they are still being processed by our backend
if ensure_ready:
self.configure(n_jobs=self.parallel.n_jobs, parallel=self.parallel,
**self.parallel._backend_args)
return
register_parallel_backend('CMFActivity', CMFActivityBackend)
else:
......
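
Once `register_parallel_backend('CMFActivity', CMFActivityBackend)` has run, the backend can be selected through joblib's `parallel_backend` context manager. A hedged usage sketch (`portal` is assumed to be an ERP5 portal object; the keyword arguments mirror what `CMFActivityBackend.__init__` reads above):

```python
from sklearn.externals.joblib import Parallel, delayed
from sklearn.externals.joblib.parallel import parallel_backend

def square(x):
    return x * x

# The active process is where SQLJoblib activities post their results,
# which CMFActivityResult.get later fetches via getResult().
active_process = portal.portal_activities.newActiveProcess()
with parallel_backend('CMFActivity', zope_context=portal,
                      active_process=active_process):
    results = Parallel(n_jobs=2)(delayed(square)(i) for i in range(4))
```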
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table</params>
CREATE TABLE <dtml-var table> (
`uid` INT UNSIGNED NOT NULL,
`date` DATETIME NOT NULL,
`path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`processing` TINYINT NOT NULL DEFAULT 0,
`processing_date` DATETIME,
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`signature` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY `processing_node_processing` (`processing_node`, `processing`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`priority`),
KEY (`tag`)
) ENGINE=MYISAM
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
processing_node
uid
</params>
UPDATE
message_job
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
signature
</params>
SELECT uid FROM
message_job
WHERE
processing_node = 0
AND path = <dtml-sqlvar path type="string">
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
AND signature = <dtml-sqlvar signature type="string">
FOR UPDATE
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
uid
path
active_process_uid
method_id
message
priority
processing_node
date
group_method_id
tag
signature
serialization_tag
</params>
INSERT INTO <dtml-var table>
(uid, path, active_process_uid, date, method_id, processing_node, processing, priority, group_method_id, tag, signature, serialization_tag, message)
VALUES
(
<dtml-sqlvar expr="uid" type="int">,
<dtml-sqlvar expr="path" type="string">,
<dtml-sqlvar expr="active_process_uid" type="int" optional>,
<dtml-if expr="date is not None"><dtml-sqlvar expr="date" type="datetime"><dtml-else>UTC_TIMESTAMP()</dtml-if>,
<dtml-sqlvar expr="method_id" type="string">,
<dtml-sqlvar expr="processing_node" type="int">,
0,
<dtml-sqlvar expr="priority" type="int">,
<dtml-sqlvar expr="group_method_id" type="string">,
<dtml-sqlvar expr="tag" type="string">,
<dtml-sqlvar expr="signature" type="string">,
<dtml-sqlvar expr="serialization_tag" type="string">,
<dtml-sqlvar expr="message" type="string">
)
......@@ -87,7 +87,6 @@ from Products.ERP5Type.patches import ZopePageTemplate
from Products.ERP5Type.patches import ZSQLMethod
from Products.ERP5Type.patches import MimetypesRegistry
from Products.ERP5Type.patches import users
from Products.ERP5Type.patches import JoblibParallelPrint
# These symbols are required for backward compatibility
from Products.ERP5Type.patches.PropertyManager import ERP5PropertyManager
......
ENABLE_PATCH = True
try:
import sklearn
#from sklearn.externals import joblib
#from sklearn.externals.joblib.parallel import Parallel
from joblib.parallel import Parallel
except ImportError:
ENABLE_PATCH = False
if ENABLE_PATCH:
from zLOG import LOG, WARNING
def _print(self, msg, msg_args):
msg = msg % msg_args
LOG('Parallel._print', WARNING, '[%s]: %s\n' % (self, msg))
Parallel._print = _print
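
With this patch applied, joblib's verbose progress output goes through zLOG instead of stdout (where it would be lost under Zope). A quick illustrative check, assuming the patch module has been imported:

```python
from joblib.parallel import Parallel, delayed

# Each progress message (e.g. 'Done   3 out of   3') is now emitted via
# LOG('Parallel._print', WARNING, ...) rather than printed to stdout.
Parallel(n_jobs=1, verbose=10)(delayed(abs)(-i) for i in range(3))
```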