Commit 757672c2 authored by Yusuke Muraoka's avatar Yusuke Muraoka

- modified TransformationRule to use Business Process

  instead of Supply Chain.
- added/changed some features for above modified.



git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@27271 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 033bc689
...@@ -185,3 +185,11 @@ class BusinessProcess(Path, XMLObject): ...@@ -185,3 +185,11 @@ class BusinessProcess(Path, XMLObject):
def isStopDateReferential(self): def isStopDateReferential(self):
return self.getReferentialDate() == 'stop_date' return self.getReferentialDate() == 'stop_date'
def getTradePhaseList(self):
"""
Returns all trade_phase of this business process
"""
path_list = self.objectValues(portal_type=self.getPortalBusinessPathTypeList())
return filter(None, [path.getTradePhase()
for path in path_list])
...@@ -161,8 +161,10 @@ class BusinessState(XMLObject): ...@@ -161,8 +161,10 @@ class BusinessState(XMLObject):
""" """
remaining_trade_phase_list = [] remaining_trade_phase_list = []
for path in self.getPredecessorRelatedValueList(): for path in self.getPredecessorRelatedValueList():
if not (path.isCompleted(explanation) or # XXX When no simulations related to path, what should path.isCompleted return?
path.isPartiallyCompleted(explanation)): # if True we don't have way to add remaining trade phases to new movement
if not (path._getRelatedSimulationMovementList(explanation) and
path.isCompleted(explanation)):
remaining_trade_phase_list += path.getTradePhaseValueList() remaining_trade_phase_list += path.getTradePhaseValueList()
# collect to successor direction recursively # collect to successor direction recursively
......
...@@ -42,6 +42,8 @@ from Products.ERP5.Document.Predicate import Predicate ...@@ -42,6 +42,8 @@ from Products.ERP5.Document.Predicate import Predicate
from Products.CMFCategory.Renderer import Renderer from Products.CMFCategory.Renderer import Renderer
from Products.ERP5.AggregatedAmountList import AggregatedAmountList from Products.ERP5.AggregatedAmountList import AggregatedAmountList
from zLOG import LOG, WARNING
class Transformation(XMLObject, Predicate, Variated): class Transformation(XMLObject, Predicate, Variated):
""" """
Build of material - contains a list of transformed resources Build of material - contains a list of transformed resources
...@@ -223,7 +225,9 @@ class Transformation(XMLObject, Predicate, Variated): ...@@ -223,7 +225,9 @@ class Transformation(XMLObject, Predicate, Variated):
security.declareProtected(Permissions.AccessContentsInformation, security.declareProtected(Permissions.AccessContentsInformation,
'getAggregatedAmountList') 'getAggregatedAmountList')
def getAggregatedAmountList(self, context=None, REQUEST=None, def getAggregatedAmountList(self, context=None, REQUEST=None,
ind_phase_url_list=None, trade_phase_list=None,
# obsolete, use trade_phase_list instead
ind_phase_url_list=None,
rejected_resource_uid_list=None, rejected_resource_uid_list=None,
context_quantity=0,**kw): context_quantity=0,**kw):
""" """
...@@ -247,8 +251,14 @@ class Transformation(XMLObject, Predicate, Variated): ...@@ -247,8 +251,14 @@ class Transformation(XMLObject, Predicate, Variated):
transformation_line_list = [] transformation_line_list = []
for transformation in ([self]+template_transformation_list): for transformation in ([self]+template_transformation_list):
transformation_line_list.extend(transformation.objectValues()) transformation_line_list.extend(transformation.objectValues())
# Get only lines related to a precise trade_phase
if trade_phase_list is not None:
transformation_line_list = filter(
lambda line: line.getTradePhase() in trade_phase_list,
transformation_line_list)
# Get only lines related to a precise industrial_phase # Get only lines related to a precise industrial_phase
if ind_phase_url_list is not None: if ind_phase_url_list is not None:
LOG("Transformation", WARNING, "ind_phase_list is obsolete")
new_transf_line_list = [] new_transf_line_list = []
for line in transformation_line_list: for line in transformation_line_list:
ind_ph = line.getIndustrialPhaseValue() ind_ph = line.getIndustrialPhaseValue()
......
# -*- coding:utf-8 -*-
############################################################################## ##############################################################################
# #
# Copyright (c) 2002, 2005 Nexedi SARL and Contributors. All Rights Reserved. # Copyright (c) 2002-2009 Nexedi SARL and Contributors. All Rights Reserved.
# Jean-Paul Smets-Solanes <jp@nexedi.com> # Jean-Paul Smets-Solanes <jp@nexedi.com>
# Romain Courteaud <romain@nexedi.com> # Romain Courteaud <romain@nexedi.com>
# #
...@@ -27,298 +28,278 @@ ...@@ -27,298 +28,278 @@
# #
############################################################################## ##############################################################################
from ExtensionClass import Base
from AccessControl import ClassSecurityInfo from AccessControl import ClassSecurityInfo
from Acquisition import aq_base, aq_parent, aq_inner, aq_acquire from Acquisition import aq_base, aq_parent, aq_inner, aq_acquire
from Products.CMFCore.utils import getToolByName from Products.CMFCore.utils import getToolByName
from Products.ERP5Type import Permissions, PropertySheet, Constraint, Interface from Products.ERP5Type import Permissions, PropertySheet, Constraint, Interface
from Products.ERP5.Document.Rule import Rule from Products.ERP5.Document.Rule import Rule
from Products.ERP5.Document.SimulationMovement import SimulationMovement
from Products.ERP5Type.Errors import TransformationRuleError from Products.ERP5Type.Errors import TransformationRuleError
from Products.ERP5.Document.TransformationSourcingRule import\
TransformationSourcingRuleMixin
from zLOG import LOG class TransformationRuleMixin(Base):
security = ClassSecurityInfo()
class TransformationRule(TransformationSourcingRuleMixin, Rule): security.declareProtected(Permissions.View, 'getTransformation')
def getTransformation(self, movement=None, applied_rule=None):
""" """
Order Rule object make sure an Order in the similation Return transformation related to used by the applied rule.
is consistent with the real order
""" """
# CMF Type Definition if movement is None and applied_rule is not None:
meta_type = 'ERP5 Transformation Rule' movement = applied_rule.getParentValue()
portal_type = 'Transformation Rule'
# Declarative security order_movement = movement.getRootSimulationMovement().getOrderValue()
security = ClassSecurityInfo() explanation = self.getExplanation(movement=movement,
security.declareObjectProtected(Permissions.AccessContentsInformation) applied_rule=applied_rule)
__implements__ = ( Interface.Predicate, # find line recursively
Interface.Rule ) order_line = order_movement
# Default Properties while order_line.getParentValue() != explanation:
property_sheets = ( PropertySheet.Base order_line = order_line.getParentValue()
script = order_line._getTypeBasedMethod('_getTransformation')
if script is not None:
transformation = script()
else:
line_transformation = order_line.objectValues(
portal_type=self.getPortalTransformationTypeList())
if len(line_transformation) == 1:
transformation = line_transformation[0]
else:
transformation = order_line.getSpecialiseValue(
portal_type=self.getPortalTransformationTypeList())
if transformation.getResource() == movement.getResource():
return transformation
security.declareProtected(Permissions.View, 'getBusinessProcess')
def getBusinessProcess(self, **kwargs):
"""
Return business process related to root causality.
"""
explanation = self.getExplanation(**kwargs)
if explanation is not None:
specialise = explanation.getSpecialiseValue()
business_process_type_list = self.getPortalBusinessProcessTypeList()
# because trade condition can be specialised
while specialise is not None and \
specialise.getPortalType() not in business_process_type_list:
specialise = specialise.getSpecialiseValue()
return specialise
security.declareProtected(Permissions.View, 'getRootExplanation')
def getRootExplanation(self, business_process):
"""
the method of ProductionOrderRule returns most tail path of business process
"""
if business_process is not None:
for business_path in business_process.contentValues(
portal_type=self.getPortalBusinessPathTypeList()):
if business_path.isDeliverable():
return business_path
security.declareProtected(Permissions.View, 'getExplanation')
def getExplanation(self, movement=None, applied_rule=None):
if applied_rule is not None:
return applied_rule.getRootAppliedRule().getCausalityValue()
else:
return movement.getRootSimulationMovement()\
.getOrderValue().getExplanationValue()
class TransformationRule(TransformationRuleMixin, Rule):
"""
"""
# CMF Type Definition
meta_type = 'ERP5 Transformation Rule'
portal_type = 'Transformation Rule'
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
__implements__ = ( Interface.Predicate,
Interface.Rule )
# Default Properties
property_sheets = ( PropertySheet.Base
, PropertySheet.XMLObject , PropertySheet.XMLObject
, PropertySheet.CategoryCore , PropertySheet.CategoryCore
, PropertySheet.DublinCore , PropertySheet.DublinCore
, PropertySheet.Task , PropertySheet.Task
) )
# Class variable
simulation_movement_portal_type = "Simulation Movement"
# Simulation workflow def getHeadProductionPathList(self, transformation, business_process):
security.declareProtected(Permissions.ModifyPortalContent, 'expand') """
def expand(self, applied_rule, **kw): Return list of path which is head of transformation trade_phases
"""
Expands the current movement downward. this method assumes trade_phase of head paths is only one
-> new status -> expanded """
An applied rule can be expanded only if its parent movement production_trade_phase_set = set([amount.getTradePhase()
is expanded. for amount in transformation\
""" .objectValues(portal_type='Transformation Transformed Resource')])
parent_movement = applied_rule.getParentValue() head_path_list = []
# Get production node and production section for state in business_process.objectValues(
production = parent_movement.getSource() portal_type=self.getPortalBusinessStateTypeList()):
production_section = parent_movement.getSourceSection() if len(state.getSuccessorRelatedValueList()) == 0:
# Get the current supply link used to calculate consumed resource head_path_list.extend(state.getPredecessorRelatedValueList())
# The current supply link is calculated from the parent AppliedRule.
supply_chain = self.getSupplyChain(parent_movement.getParentValue()) result_list = []
parent_supply_link = self.getCurrentSupplyLink(parent_movement) for path in head_path_list:
current_supply_link_list = supply_chain.\ result_list += self._getHeadPathByTradePhaseList(path, production_trade_phase_set)
getPreviousProductionSupplyLinkList(parent_supply_link)
if len(current_supply_link_list) != 1: return map(lambda t: t[0], filter(lambda t: t != (None, None), result_list))
# We shall no pass here.
# The test method returned a wrong value ! def _getHeadPathByTradePhaseList(self, path, trade_phase_set):
_set = set(path.getTradePhaseList())
if _set & trade_phase_set:
return [(path, _set & trade_phase_set)]
successor_node = path.getSuccessorValue()
if successor_node is None:
return [(None, None)]
_list = []
for next_path in successor_node.getPredecessorRelatedValueList():
_list += self._getHeadPathByTradePhaseList(next_path, trade_phase_set)
return _list
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, applied_rule, **kw):
"""
"""
parent_movement = applied_rule.getParentValue()
transformation = self.getTransformation(movement=parent_movement)
business_process = self.getBusinessProcess(movement=parent_movement)
explanation = self.getExplanation(movement=parent_movement)
# get all trade_phase of the Business Process
trade_phase_list = business_process.getTradePhaseList()
# get head of production path from business process with trade_phase_list
head_production_path_list = self.getHeadProductionPathList(transformation,
business_process)
product_resource = transformation.getResource()
product_quantity = parent_movement.getNetQuantity()
product_quantity_unit = parent_movement.getQuantityUnit()
product_variation_category_list = parent_movement.getVariationCategoryList()
product_variation_property_dict = parent_movement.getVariationPropertyDict()
amount_dict = {}
# XXX Transformation.getAggregatedAmountList is useless, it can not have trade_phase, because Amout.
for amount in transformation.objectValues(portal_type='Transformation Transformed Resource'):
phase = amount.getTradePhase()
amount_dict.setdefault(phase, [])
amount_dict[phase].append(amount)
product_destination = None
for (phase, amount_list) in amount_dict.items():
if phase not in trade_phase_list:
raise TransformationRuleError,\ raise TransformationRuleError,\
"Expand must not be called on %r" %\ "Trade phase %r is not part of Business Process %r" % (phase, business_process)
applied_rule.getRelativeUrl()
else: phase_path_list = business_process.getPathValueList(phase)
current_supply_link = current_supply_link_list[0]
# Generate produced movement
movement_dict = self._expandProducedResource(applied_rule,
production,
production_section,
current_supply_link)
# Generate consumed movement
consumed_mvt_dict = self._expandConsumedResource(applied_rule,
production,
production_section,
current_supply_link)
movement_dict.update(consumed_mvt_dict)
# Finally, build movement
self._buildMovementList(applied_rule, movement_dict, **kw)
# Expand each movement created
Rule.expand(self, applied_rule, **kw)
def _expandProducedResource(self, applied_rule, production,
production_section, current_supply_link):
"""
Produced resource.
Create a movement for the resource produced by the transformation.
Only one produced movement can be created.
"""
parent_movement = applied_rule.getParentValue()
stop_date = parent_movement.getStartDate()
produced_movement_dict = {
'pr': {
"resource": parent_movement.getResource(),
# XXX what is lost quantity ?
"quantity": parent_movement.getQuantity(),# + lost_quantity,
"quantity_unit": parent_movement.getQuantityUnit(),
"variation_category_list":\
parent_movement.getVariationCategoryList(),
"variation_property_dict": \
parent_movement.getVariationPropertyDict(),
"source_list": (),
"source_section_list": (),
"destination": production,
"destination_section": production_section,
"deliverable": 1,
'start_date': current_supply_link.calculateStartDate(stop_date),
'stop_date': stop_date,
'causality_value': current_supply_link,
}
}
return produced_movement_dict
def _expandConsumedResource(self, applied_rule, production,
production_section, current_supply_link):
"""
Consumed resource.
Create a movement for each resource consumed by the transformation,
and for the previous variation of the produced resource.
"""
# Calculate all consumed resource
# Store each value in a dictionnary before created them.
# { movement_id: {property_name: property_value,} ,}
consumed_movement_dict = {}
parent_movement = applied_rule.getParentValue()
supply_chain = self.getSupplyChain(parent_movement.getParentValue())
# Consumed previous variation
previous_variation_dict = self._expandConsumedPreviousVariation(
applied_rule,
production,
production_section,
supply_chain,
current_supply_link)
consumed_movement_dict.update(previous_variation_dict)
# Consumed raw materials
raw_material_dict = self._expandConsumedRawMaterials(
applied_rule,
production,
production_section,
supply_chain,
current_supply_link)
consumed_movement_dict.update(raw_material_dict)
return consumed_movement_dict
def _expandConsumedPreviousVariation(self, applied_rule, production,
production_section, supply_chain,
current_supply_link):
"""
Create a movement for the previous variation of the produced resource.
"""
id_count = 1
consumed_movement_dict = {}
parent_movement = applied_rule.getParentValue()
# Calculate the variation category list of parent movement
base_category_list = parent_movement.getVariationBaseCategoryList()
if "industrial_phase" in base_category_list:
# We do not want to get the industrial phase variation
base_category_list.remove("industrial_phase")
category_list = parent_movement.getVariationCategoryList(
base_category_list=base_category_list)
# Calculate the previous variation
for previous_supply_link in supply_chain.\
getPreviousSupplyLinkList(current_supply_link):
previous_ind_phase_list = supply_chain.\
getPreviousProductionIndustrialPhaseList(previous_supply_link,
all=1)
if previous_ind_phase_list != []:
# Industrial phase is a category
ind_phase_list = [x.getRelativeUrl() for x in \
previous_ind_phase_list]
consumed_mvt_id = "%s_%s" % ("mr", id_count)
id_count += 1
stop_date = parent_movement.getStartDate()
consumed_movement_dict[consumed_mvt_id] = {
'start_date': current_supply_link.calculateStartDate(stop_date),
'stop_date': stop_date,
"resource": parent_movement.getResource(),
# XXX Is the quantity value correct ?
"quantity": parent_movement.getNetQuantity(), # getNetQuantity to support efficency from transformation
"quantity_unit": parent_movement.getQuantityUnit(),
"destination_list": (),
"destination_section_list": (),
"source": production,
"source_section": production_section,
"deliverable": 1,
"variation_category_list": category_list+ind_phase_list,
"variation_property_dict": \
parent_movement.getVariationPropertyDict(),
'causality_value': current_supply_link,
}
return consumed_movement_dict
def _expandConsumedRawMaterials(self, applied_rule, production,
production_section, supply_chain,
current_supply_link):
"""
Create a movement for each resource consumed by the transformation,
""" """
parent_movement = applied_rule.getParentValue() XXX: In this context, we assume quantity as ratio,
# Calculate the context for getAggregatedAmountList but this notion is consistent with transformation.
base_category_list = parent_movement.getVariationBaseCategoryList()
if "industrial_phase" in base_category_list:
# We do not want to get the industrial phase variation
base_category_list.remove("industrial_phase")
category_list = parent_movement.getVariationCategoryList(
base_category_list=base_category_list)
# Get the transformation to use
transformation = self.getTransformation(applied_rule)
# Generate the fake context
tmp_context = parent_movement.asContext(
context=parent_movement,
REQUEST={'categories':category_list})
# Calculate the industrial phase list
previous_ind_phase_list = supply_chain.\
getPreviousPackingListIndustrialPhaseList(current_supply_link)
ind_phase_id_list = [x.getRelativeUrl() for x in previous_ind_phase_list]
# Call getAggregatedAmountList
# XXX expand failed if transformation is not defined.
# Do we need to catch the exception ?
amount_list = transformation.getAggregatedAmountList(
tmp_context,
ind_phase_url_list=ind_phase_id_list)
# Add entries in the consumed_movement_dict
consumed_movement_dict = {}
for amount in amount_list:
consumed_mvt_id = "%s_%s" % ("cr", amount.getId())
stop_date = parent_movement.getStartDate()
resource_price = amount.getResourcePrice()
price = None
if resource_price is not None:
price = amount.getNetQuantity() * resource_price # getNetQuantity to support efficency from transformation
consumed_movement_dict[consumed_mvt_id] = {
'start_date': current_supply_link.calculateStartDate(stop_date),
'stop_date': stop_date,
"resource": amount.getResource(),
"variation_category_list":\
amount.getVariationCategoryList(),
"variation_property_dict": \
amount.getVariationPropertyDict(),
"quantity": amount.getNetQuantity() * parent_movement.getQuantity(), # getNetQuantity to support efficency from transformation
"price": price,
"quantity_unit": amount.getQuantityUnit(),
"destination_list": (),
"destination_section_list": (),
"source": production,
"source_section": production_section,
"deliverable": 1,
'causality_value': current_supply_link,
}
return consumed_movement_dict
security.declareProtected(Permissions.ModifyPortalContent, 'solve')
def solve(self, applied_rule, solution_list):
""" """
Solve inconsistency according to a certain number of solutions if sum(map(lambda path: path.getQuantity(), phase_path_list)) != 1:
templates. This updates the raise TransformationRuleError,\
"sum ratio of Trade Phase %r of Business Process %r is not one" % (phase, business_process)
-> new status -> solved for path in phase_path_list:
start_date = path.getExpectedStartDate(explanation)
stop_date = path.getExpectedStopDate(explanation)
predecessor_remaining_phase_list = path.getPredecessorValue()\
.getRemainingTradePhaseList(explanation,
trade_phase_list=trade_phase_list)
successor_remaining_phase_list = path.getSuccessorValue()\
.getRemainingTradePhaseList(explanation,
trade_phase_list=trade_phase_list)
if len(successor_remaining_trade_phase_list) == 0:
"""
Destinations of last paths for transformation must be same,
because paths for transformation must be integrated finally,
This applies a solution to an applied rule. Once valid graph
the solution is applied, the parent movement is checked. a --
If it does not diverge, the rule is reexpanded. If not, \--
diverge is called on the parent movement. X- b
""" /--
c --
security.declareProtected(Permissions.ModifyPortalContent, 'diverge') invalid graph
def diverge(self, applied_rule): a ------- b
"""
-> new status -> diverged
This basically sets the rule to "diverged" c ------- d
and blocks expansion process """
""" if product_destination is None:
product_destination = path.getDestination()
if product_destination != path.getDestination():
raise TransformationRuleError,\
"Transformation %r is not integrated on Business Process %r" % (transformation, business_process)
else:
# partial product movement
movement = applied_rule.newContent(portal_type="Simulation Movement")
movement.edit(causality_value=path,
start_date=path.getExpectedStartDate(explanation),
stop_date=path.getExpectedStopDate(explanation),
resource=product_resource,
quantity=-(product_quantity * path.getQuantity()),
quantity_unit=product_quantity_unit,
variation_category_list=product_variation_category_list,
variation_property_dict=product_variation_property_dict,
destination=path.getDestination(),
#destination_section=???,
trade_phase_value_list=successor_remaining_trade_phase_list)
# # Solvers # when the path is part of production but not first, consume previous partial product
# security.declareProtected(Permissions.View, 'isDivergent') if path not in head_production_path_list:
# def isDivergent(self, applied_rule): # consumed partial product movement
# """ movement = applied_rule.newContent(portal_type="Simulation Movement")
# Returns 1 if divergent rule movement.edit(causality_value=path,
# """ start_date=start_date,
# stop_date=stop_date,
# security.declareProtected(Permissions.View, 'getDivergenceList') resource=product_resource,
# def getDivergenceList(self, applied_rule): quantity=product_quantity * path.getQuantity(),
# """ quantity_unit=product_quantity_unit,
# Returns a list Divergence descriptors variation_category_list=product_variation_category_list,
# """ variation_property_dict=product_variation_property_dict,
# source=path.getSource(),
# security.declareProtected(Permissions.View, 'getSolverList') #source_section=???,
# def getSolverList(self, applied_rule): trade_phase_value_list=predecessor_remaining_trade_phase_list)
# """
# Returns a list Divergence solvers # consumption movement
# """ for amount in amount_list:
consumed_resource = amount.getResource()
# Deliverability / orderability consumed_quantity = product_quantity * amount.getQuantity() / amount.getEfficiency()
def isDeliverable(self, m): consumed_quantity_unit = amount.getQuantityUnit()
return 1
def isOrderable(self, m): # consume resource
return 0 movement = applied_rule.newContent(portal_type="Simulation Movement")
movement.edit(causality_value=path,
start_date=start_date,
stop_date=stop_date,
resource=consumed_resource,
quantity=consumed_quantity * path.getQuantity(),
quantity_unit=consumed_quantity_unit,
source=path.getSource(),
#source_section=???,
trade_phase=path.getTradePhase())
# product movement
movement = applied_rule.newContent(portal_type="Simulation Movement")
movement.edit(start_date=path.getExpectedStartDate(explanation),
stop_date=path.getExpectedStopDate(explanation),
resource=product_resource,
quantity=-(product_quantity),
quantity_unit=product_quantity_unit,
variation_category_list=product_variation_category_list,
variation_property_dict=product_variation_property_dict,
destination=product_destination,
#destination_section=???,
)
Rule.expand(self, applied_rule, **kw)
...@@ -85,3 +85,7 @@ class IBusinessProcess(IBusinessCompletable, IBusinessBuildable): ...@@ -85,3 +85,7 @@ class IBusinessProcess(IBusinessCompletable, IBusinessBuildable):
'explanation' is the Order or Item or Document which is the 'explanation' is the Order or Item or Document which is the
cause of a root applied rule in the simulation cause of a root applied rule in the simulation
""" """
def getTradePhaseList(self):
"""Returns list of all trade_phase of this Business Process
"""
# -*- coding: utf-8 -*-
##############################################################################
# Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved.
# Yusuke Muraoka <yusuke@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
import transaction
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from AccessControl.SecurityManagement import newSecurityManager
from DateTime import DateTime
from Products.ERP5Type.tests.Sequence import SequenceList
from Products.CMFCore.utils import getToolByName
from Products.ERP5Type.tests.utils import reindex
from Products.ERP5.Document.TransformationRule import TransformationRule
from Products.ERP5.tests.testBPMCore import TestBPMMixin
class TestMRPMixin(TestBPMMixin):
transformation_portal_type = 'Transformation'
transformed_resource_portal_type = 'Transformation Transformed Resource'
product_portal_type = 'Product'
def setUpOnce(self):
self.portal = self.getPortalObject()
def invalidateRules(self):
"""
do reversely of validateRules
"""
rule_tool = self.getRuleTool()
for rule in rule_tool.contentValues(
portal_type=rule_tool.getPortalRuleTypeList()):
rule.invalidate()
def createTransformation(self):
module = self.portal.getDefaultModule(
portal_type=self.transformation_portal_type)
return module.newContent(portal_type=self.transformation_portal_type)
def createTransformedResource(self, transformation=None):
if transformation is None:
transformation = self.createTransformation()
return transformation.newContent(
portal_type=self.transformed_resource_portal_type)
@reindex
def createCategories(self):
category_tool = getToolByName(self.portal, 'portal_categories')
self.createCategoriesInCategory(category_tool.base_amount, ['weight'])
self.createCategoriesInCategory(category_tool.base_amount.weight, ['kg'])
self.createCategoriesInCategory(category_tool.trade_phase, ['mrp',])
self.createCategoriesInCategory(category_tool.trade_phase.mrp,
['p' + str(i) for i in range(5)]) # phase0 ~ 4
def createProduct(self):
module = self.portal.getDefaultModule(
portal_type=self.product_portal_type)
return module.newContent(portal_type=self.product_portal_type)
@reindex
def createDefaultTransformation(self):
resource1 = self.createProduct()
resource2 = self.createProduct()
resource3 = self.createProduct()
resource4 = self.createProduct()
resource5 = self.createProduct()
transformation = self.createTransformation()
amount1 = self.createTransformedResource(transformation=transformation)
amount2 = self.createTransformedResource(transformation=transformation)
amount3 = self.createTransformedResource(transformation=transformation)
amount4 = self.createTransformedResource(transformation=transformation)
resource1.edit(title='product', quantity_unit_list=['weight/kg'])
resource2.edit(title='triangle', quantity_unit_list=['weight/kg'])
resource3.edit(title='box', quantity_unit_list=['weight/kg'])
resource4.edit(title='circle', quantity_unit_list=['weight/kg'])
resource5.edit(title='banana', quantity_unit_list=['weight/kg'])
transformation.edit(resource_value=resource1)
amount1.edit(resource_value=resource2, quantity=3,
quantity_unit_list=['weight/kg'], trade_phase='mrp/p2')
amount2.edit(resource_value=resource3, quantity=1,
quantity_unit_list=['weight/kg'], trade_phase='mrp/p2')
amount3.edit(resource_value=resource4, quantity=4,
quantity_unit_list=['weight/kg'], trade_phase='mrp/p3')
amount4.edit(resource_value=resource5, quantity=1,
quantity_unit_list=['weight/kg'], trade_phase='mrp/p3')
return transformation
@reindex
def createSimpleBusinessProcess(self):
""" mrp/p2 mrp/3
ready -------- partial_produced ------- done
"""
business_process = self.createBusinessProcess()
business_path_p2 = self.createBusinessPath(business_process)
business_path_p3 = self.createBusinessPath(business_process)
business_state_ready = self.createBusinessState(business_process)
business_state_partial = self.createBusinessState(business_process)
business_state_done = self.createBusinessState(business_process)
business_process.edit(referential_date='stop_date')
business_path_p2.edit(id='p2',
predecessor_value=business_state_ready,
successor_value=business_state_partial,
quantity=1,
trade_phase=['mrp/p2'])
business_path_p3.edit(id='p3',
predecessor_value=business_state_partial,
successor_value=business_state_done,
quantity=1,
deliverable=1, # root explanation
trade_phase=['mrp/p3'])
return business_process
@reindex
def createConcurrentBusinessProcess(self):
""" mrp/p2
ready ======== partial_produced
mrp/p3
"""
business_process = self.createBusinessProcess()
business_path_p2 = self.createBusinessPath(business_process)
business_path_p3 = self.createBusinessPath(business_process)
business_state_ready = self.createBusinessState(business_process)
business_state_partial = self.createBusinessState(business_process)
business_process.edit(referential_date='stop_date')
business_path_p2.edit(id='p2',
predecessor_value=business_state_ready,
successor_value=business_state_partial,
quantity=1,
trade_phase=['mrp/p2'])
business_path_p3.edit(id='p3',
predecessor_value=business_state_ready,
successor_value=business_state_partial,
quantity=1,
deliverable=1, # root explanation
trade_phase=['mrp/p3'])
return business_process
class TestMRPImplementation(TestMRPMixin, ERP5TypeTestCase):
"""the test for implementation"""
def test_TransformationRule_getHeadProductionPathList(self):
rule = self.portal.portal_rules.default_transformation_rule
transformation = self.createDefaultTransformation()
business_process = self.createSimpleBusinessProcess()
self.assertEquals([business_process.p2],
rule.getHeadProductionPathList(transformation, business_process))
business_process = self.createConcurrentBusinessProcess()
self.assertEquals(set([business_process.p2, business_process.p3]),
set(rule.getHeadProductionPathList(transformation, business_process)))
def test_TransformationRule_expand(self):
transformation = self.createDefaultTransformation()
"""
Simple case
"""
business_process = self.createSimpleBusinessProcess()
# mock order
order = self.portal.production_order_module.newContent(portal_type="Production Order")
order_line = order.newContent(portal_type="Production Order Line")
base_date = DateTime()
order.edit(specialise_value=business_process,
start_date=base_date,
stop_date=base_date+3,
source_section_value=order,
source_value=order)
order_line.edit(quantity=10)
order_line.setSpecialiseValue(transformation) # XXX Why can not define by edit?
# don't need another rules, just need TransformationRule for test
self.invalidateRules()
self.stepTic()
# alter simulations of the order
# root
applied_rule = self.portal.portal_simulation.newContent(portal_type='Applied Rule')
movement = applied_rule.newContent(portal_type='Simulation Movement')
applied_rule.edit(causality_value=order)
movement.edit(order_value=order_line,
quantity=order_line.getQuantity(),
resource=transformation.getResource())
# test mock
applied_rule = movement.newContent(potal_type='Applied Rule')
rule = self.portal.portal_rules.default_transformation_rule
rule.expand(applied_rule)
# assertion
expected_value_set = set([
('business_process_module/1/p2', 'product_module/1', 'mrp/p3', -10),
('business_process_module/1/p2', 'product_module/2', 'mrp/p2', 30),
('business_process_module/1/p2', 'product_module/3', 'mrp/p2', 10),
('business_process_module/1/p3', 'product_module/1', 'mrp/p3', 10),
('business_process_module/1/p3', 'product_module/4', 'mrp/p3', 40),
('business_process_module/1/p3', 'product_module/5', 'mrp/p3', 10),
(None, 'product_module/1', None, -10)])
movement_list = applied_rule.objectValues()
self.assertEquals(len(expected_value_set), len(movement_list))
movement_value_set = set([])
for movement in movement_list:
movement_value_set |= set([(movement.getCausality(),
movement.getResource(),
movement.getTradePhase(),
movement.getQuantity())])
self.assertEquals(expected_value_set, movement_value_set)
"""
Concurrent case
"""
business_process = self.createConcurrentBusinessProcess()
order.edit(specialise_value=business_process)
self.stepTic()
# alter simulations of the order
# root
applied_rule = self.portal.portal_simulation.newContent(portal_type='Applied Rule')
movement = applied_rule.newContent(portal_type='Simulation Movement')
applied_rule.edit(causality_value=order)
movement.edit(order_value=order_line,
quantity=order_line.getQuantity(),
resource=transformation.getResource())
# test mock
applied_rule = movement.newContent(potal_type='Applied Rule')
rule = self.portal.portal_rules.default_transformation_rule
rule.expand(applied_rule)
# assertion
expected_value_set = set([
('business_process_module/2/p2', 'product_module/2', 'mrp/p2', 30),
('business_process_module/2/p2', 'product_module/3', 'mrp/p2', 10),
('business_process_module/2/p3', 'product_module/4', 'mrp/p3', 40),
('business_process_module/2/p3', 'product_module/5', 'mrp/p3', 10),
(None, 'product_module/1', None, -10)])
movement_list = applied_rule.objectValues()
self.assertEquals(len(expected_value_set), len(movement_list))
movement_value_set = set([])
for movement in movement_list:
movement_value_set |= set([(movement.getCausality(),
movement.getResource(),
movement.getTradePhase(),
movement.getQuantity())])
self.assertEquals(expected_value_set, movement_value_set)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestMRPImplementation))
return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment