Commit e33dfbb7 authored by Łukasz Nowak's avatar Łukasz Nowak

- decouple from Rule class to find minimal figure of BPMRule class

 - categorise helpers by overriding or not
 - run expand recursively on children


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@28153 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent c1208103
No related merge requests found
......@@ -31,9 +31,12 @@
import zope.interface
from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions, PropertySheet, interfaces
from Products.ERP5.Document.Rule import Rule
from Products.ERP5.Document.Predicate import Predicate
from Products.ERP5Type.XMLObject import XMLObject
from Acquisition import aq_base
from Products.CMFCore.utils import getToolByName
class BPMRule(Rule):
class BPMRule(Predicate, XMLObject):
"""
DISCLAIMER: Do not use this in any production system.
This is only proof of concept and evaluation of new system
......@@ -72,7 +75,134 @@ class BPMRule(Rule):
, PropertySheet.AppliedRule
)
#### Helpers
movement_type = 'Simulation Movement'
security.declareProtected(Permissions.AccessContentsInformation,
'isAccountable')
def isAccountable(self, movement):
"""Tells wether generated movement needs to be accounted or not.
Only account movements which are not associated to a delivery;
Whenever delivery is there, delivery has priority
"""
return movement.getDeliveryValue() is None
security.declareProtected(Permissions.ModifyPortalContent,
'constructNewAppliedRule')
def constructNewAppliedRule(self, context, id=None, activate_kw=None):
"""
Creates a new applied rule which points to self
"""
portal_types = getToolByName(self, 'portal_types')
if id is None:
id = context.generateNewId()
if getattr(aq_base(context), id, None) is None:
context.newContent(id=id,
portal_type='Applied Rule',
specialise_value=self,
activate_kw=activate_kw)
return context.get(id)
# Simulation workflow
def test(self, *args, **kw):
"""
If no test method is defined, return False, to prevent infinite loop
"""
if not self.getTestMethodId():
return False
return Predicate.test(self, *args, **kw)
# Solvers
security.declareProtected( Permissions.AccessContentsInformation,
'isDivergent')
def isDivergent(self, sim_mvt, ignore_list=[]):
"""
Returns true if the Simulation Movement is divergent comparing to
the delivery value
"""
delivery = sim_mvt.getDeliveryValue()
if delivery is None:
return 0
if self.getDivergenceList(sim_mvt) == []:
return 0
else:
return 1
security.declareProtected(Permissions.View, 'getDivergenceList')
def getDivergenceList(self, sim_mvt):
"""
Return a list of messages that contains the divergences.
"""
result_list = []
for divergence_tester in self.contentValues(
portal_type=self.getPortalDivergenceTesterTypeList()):
result = divergence_tester.explain(sim_mvt)
result_list.extend(result)
return result_list
# Deliverability / orderability
def isOrderable(self, movement):
return 0
def isDeliverable(self, movement):
return 0
def isStable(self, applied_rule, **kw):
"""
- generate a list of previsions
- compare the prevision with existing children
- return 1 if they match, 0 else
"""
list = self._getCompensatedMovementList(applied_rule, **kw)
for e in list:
if len(e) > 0:
return 0
return 1
#### Helpers to overload
def _getExpandablePropertyUpdateDict(self, applied_rule, movement, business_path, **kw):
"""Rule specific dictionary used to update _getExpandablePropertyDict
This method might be overloaded.
"""
return {}
def _getInputMovementList(self, applied_rule):
"""Return list of movements for applied rule.
This method might be overloaded"""
return [applied_rule.getParentValue()]
def _generatePrevisionList(self, applied_rule, **kw):
"""
Generate a list of dictionaries, that contain calculated content of
current Simulation Movements in applied rule.
based on its context (parent movement, delivery, configuration ...)
These previsions are returned as dictionaries.
"""
input_movement_list = self._getInputMovementList(applied_rule)
business_process = applied_rule.getBusinessProcessValue()
input_movement_and_path_list = []
business_path_list = []
for input_movement in input_movement_list:
for business_path in business_process.getPathValueList(
self.getProperty('trade_phase_list'),
input_movement):
input_movement_and_path_list.append((input_movement, business_path))
business_path not in business_path_list and business_path_list \
.append(business_path)
if len(business_path_list) > 1:
raise NotImplementedError
prevision_dict_list = []
for input_movement, business_path in input_movement_and_path_list:
prevision_dict_list.append(self._getExpandablePropertyDict(applied_rule,
input_movement, business_path))
return prevision_dict_list
#### Helpers NOT to overload
def _getCurrentMovementList(self, applied_rule, **kw):
"""
Returns the list of current children of the applied rule, sorted in 3
......@@ -199,17 +329,6 @@ class BPMRule(Rule):
movement.getRelativeUrl())
return (add_list, modify_dict, delete_list)
def _getExpandablePropertyUpdateDict(self, applied_rule, movement, business_path, **kw):
"""Rule specific dictionary used to update _getExpandablePropertyDict
This method might be overloaded.
"""
return {}
def _getInputMovementList(self, applied_rule):
"""Return list of movements for applied rule.
This method might be overloaded"""
return [applied_rule.getParentValue()]
def _getExpandablePropertyDict(self, applied_rule, movement, business_path,
**kw):
"""
......@@ -255,38 +374,10 @@ class BPMRule(Rule):
movement, business_path, **kw))
return property_dict
def _generatePrevisionList(self, applied_rule, **kw):
"""
Generate a list of dictionaries, that contain calculated content of
current Simulation Movements in applied rule.
based on its context (parent movement, delivery, configuration ...)
These previsions are returned as dictionaries.
"""
input_movement_list = self._getInputMovementList(applied_rule)
business_process = applied_rule.getBusinessProcessValue()
input_movement_and_path_list = []
business_path_list = []
for input_movement in input_movement_list:
for business_path in business_process.getPathValueList(
self.getProperty('trade_phase_list'),
input_movement):
input_movement_and_path_list.append((input_movement, business_path))
business_path not in business_path_list and business_path_list \
.append(business_path)
if len(business_path_list) > 1:
raise NotImplementedError
prevision_dict_list = []
for input_movement, business_path in input_movement_and_path_list:
prevision_dict_list.append(self._getExpandablePropertyDict(applied_rule,
input_movement, business_path))
return prevision_dict_list
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, applied_rule, force=0, **kw):
"""Generic expand with helpers.
Do NOT overload, use helpers."""
add_list, modify_dict, \
delete_list = self._getCompensatedMovementList(applied_rule, **kw)
......@@ -304,5 +395,5 @@ class BPMRule(Rule):
new_movement = applied_rule.newContent(id=movement_id,
portal_type=self.movement_type, **movement_dict)
# Pass to base class
Rule.expand(self, applied_rule, force=force, **kw) # Simulation workflow
for o in applied_rule.objectValues():
o.expand(**kw)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment