From 2357549239b8364b10bfd4eb0d3cf5b0592b5a54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Calonne?= <aurel@nexedi.com> Date: Thu, 12 Aug 2010 14:27:01 +0000 Subject: [PATCH] add former BusinessProcess class git-svn-id: https://svn.erp5.org/repos/public/erp5/sandbox/amount_generator@37774 20353a03-c40f-0410-a6d1-a30d3c3de9de --- .../ERP5Legacy/Document/BusinessProcess.py | 267 ++++++++++++++++++ 1 file changed, 267 insertions(+) create mode 100644 product/ERP5Legacy/Document/BusinessProcess.py diff --git a/product/ERP5Legacy/Document/BusinessProcess.py b/product/ERP5Legacy/Document/BusinessProcess.py new file mode 100644 index 0000000000..96168bfda8 --- /dev/null +++ b/product/ERP5Legacy/Document/BusinessProcess.py @@ -0,0 +1,267 @@ +# -*- coding: utf-8 -*- +############################################################################## +# +# Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved. +# Jean-Paul Smets-Solanes <jp@nexedi.com> +# Yusuke Muraoka <yusuke@nexedi.com> +# +# WARNING: This program as such is intended to be used by professional +# programmers who take the whole responsability of assessing all potential +# consequences resulting from its eventual inadequacies and bugs +# End users who are looking for a ready-to-use solution with commercial +# garantees and support are strongly adviced to contract a Free Software +# Service Company +# +# This program is Free Software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +############################################################################## + +from AccessControl import ClassSecurityInfo +from Products.ERP5Type import Permissions, PropertySheet, interfaces +from Products.ERP5Type.XMLObject import XMLObject +from Products.ERP5.Document.Path import Path + +import zope.interface + +class BusinessProcess(Path, XMLObject): + """ + The BusinessProcess class is a container class which is used + to describe business processes in the area of trade, payroll + and production. + + TODO: + - finish interface implementation + """ + meta_type = 'ERP5 Business Process' + portal_type = 'Business Process' + + # Declarative security + security = ClassSecurityInfo() + security.declareObjectProtected(Permissions.AccessContentsInformation) + + # Declarative properties + property_sheets = ( PropertySheet.Base + , PropertySheet.XMLObject + , PropertySheet.CategoryCore + , PropertySheet.DublinCore + , PropertySheet.Folder + , PropertySheet.Comment + , PropertySheet.Arrow + , PropertySheet.BusinessProcess + ) + + # Declarative interfaces + zope.interface.implements(interfaces.IBusinessProcess, + interfaces.IArrowBase) + + # Access to path and states of the business process + security.declareProtected(Permissions.AccessContentsInformation, 'getPathValueList') + def getPathValueList(self, trade_phase=None, context=None, **kw): + """ + Returns all Path of the current BusinessProcess which + are matching the given trade_phase and the optional context. + + trade_phase -- a single trade phase category or a list of + trade phases + + context -- the context to search matching predicates for + + **kw -- same parameters as for searchValues / contentValues + """ + if trade_phase is None: + trade_phase = set() + elif not isinstance(trade_phase, (list, tuple)): + trade_phase = set((trade_phase,)) + else: + trade_phase = set(trade_phase) + result = [] + if len(trade_phase) == 0: + return result + # Separate the selection of business paths into twp steps + # for easier debugging. + # First, collect business paths which can be applicable to a given context. + business_path_list = [] + for business_path in self.objectValues(portal_type='Business Path', + sort_on='int_index'): + if trade_phase.intersection(business_path.getTradePhaseList()): + business_path_list.append(business_path) + # Then, filter business paths by Predicate API. + # FIXME: Ideally, we should use the Domain Tool to search business paths, + # and avoid using the low level Predicate API. But the Domain Tool does + # support the condition above without scripting? + for business_path in business_path_list: + if business_path.test(context): + result.append(business_path) + return result + + security.declareProtected(Permissions.AccessContentsInformation, 'getStateValueList') + def getStateValueList(self, *args, **kw): + """ + Returns all states of the business process. The method + **kw parameters follows the API of searchValues / contentValues + """ + # Naive implementation to redo XXX + kw['portal_type'] = "Business State" + return self.contentValues(*args, **kw) + + # Access to path and states of the business process + def isCompleted(self, explanation): + """ + True if all states are completed + """ + for state in self.getStateValueList(): + if not state.isCompleted(explanation): + return False + return True + + def isBuildable(self, explanation): + """ + True if all any path is buildable + """ + return len(self.getBuildablePathValueList(explanation)) != 0 + + def getBuildablePathValueList(self, explanation): + """ + Returns the list of Business Path which are ready to + be built + """ + return filter(lambda x:x.isBuildable(explanation), + self.objectValues(portal_type='Business Path')) + + def getCompletedStateValueList(self, explanation): + """ + Returns the list of Business States which are finished + """ + return filter(lambda x:x.isCompleted(explanation), self.getStateValueList()) + + def getPartiallyCompletedStateValueList(self, explanation): + """ + Returns the list of Business States which are finished + """ + return filter(lambda x:x.isPartiallyCompleted(explanation), self.getStateValueList()) + + def getLatestCompletedStateValue(self, explanation): + """ + Returns the most advanced completed state + """ + for state in self.getCompletedStateValueList(explanation): + for path in state.getPredecessorRelatedValueList(): + if not path.isCompleted(explanation): + return state + return None + + def getLatestPartiallyCompletedStateValue(self, explanation): + """ + Returns the most advanced completed state + """ + for state in self.getCompletedStateValueList(explanation): + for path in state.getPredecessorRelatedValueList(): + if not path.isPartiallyCompleted(explanation): + return state + return None + + def getLatestCompletedStateValueList(self, explanation): + """ + Returns the most advanced completed state + """ + result = [] + for state in self.getCompletedStateValueList(explanation): + for path in state.getPredecessorRelatedValueList(): + if not path.isCompleted(explanation): + result.append(state) + return result + + def getLatestPartiallyCompletedStateValueList(self, explanation): + """ + Returns the most advanced completed state + """ + result = [] + for state in self.getCompletedStateValueList(explanation): + for path in state.getPredecessorRelatedValueList(): + if not path.isPartiallyCompleted(explanation): + result.append(state) + return result + + def build(self, explanation_relative_url): + """ + Build whatever is buildable + """ + explanation = self.restrictedTraverse(explanation_relative_url) + for path in self.getBuildablePathValueList(explanation): + path.build(explanation) + + def isStartDateReferential(self): # XXX - not in interface + return self.getReferentialDate() == 'start_date' + + def isStopDateReferential(self): # XXX - not in interface + return self.getReferentialDate() == 'stop_date' + + def getTradePhaseList(self): + """ + Returns all trade_phase of this business process + """ + path_list = self.objectValues(portal_type=self.getPortalBusinessPathTypeList()) + return filter(None, [path.getTradePhase() + for path in path_list]) + + def getRootExplanationPathValue(self): + """ + Returns a root path of this business process + """ + path_list = self.objectValues(portal_type=self.getPortalBusinessPathTypeList()) + path_list = filter(lambda x: x.isDeliverable(), path_list) + + if len(path_list) > 1: + raise Exception, "this business process has multi root paths" + + if len(path_list) == 1: + return path_list[0] + + def getHeadPathValueList(self, trade_phase_list=None): + """ + Returns a list of head path(s) of this business process + + trade_phase_list -- used to filtering, means that discovering + a list of head path with the trade_phase_list + """ + head_path_list = list() + for state in self.getStateValueList(): + if len(state.getSuccessorRelatedValueList()) == 0: + head_path_list += state.getPredecessorRelatedValueList() + + if trade_phase_list is not None: + _set = set(trade_phase_list) + _list = list() + # start to discover a head path with the trade_phase_list from head path(s) of whole + for path in head_path_list: + _list += self._getHeadPathValueList(path, _set) + head_path_list = map(lambda t: t[0], filter(lambda t: t != (None, None), _list)) + + return head_path_list + + def _getHeadPathValueList(self, path, trade_phase_set): + # if the path has target trade_phase, it is a head path. + _set = set(path.getTradePhaseList()) + if _set & trade_phase_set: + return [(path, _set & trade_phase_set)] + + node = path.getSuccessorValue() + if node is None: + return [(None, None)] + + _list = list() + for next_path in node.getPredecessorRelatedValueList(): + _list += self._getHeadPathValueList(next_path, trade_phase_set) + return _list -- 2.30.9