Commit 6b696094 authored by Romain Courteaud's avatar Romain Courteaud

Start to implement the new simulation rules.

http://erp5.org/Discussion/SimulationRules

By Alexandre Boeglin and Rafael Monnerat.
Verified by Romain Courteaud.

Currently, the rule 18 is not implemented (we don't copy yet order 
movement's properties to the simulation movement)

The behavior of the simulation should not be changed by this commit.



git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@9783 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent c4b84da0
......@@ -86,7 +86,7 @@ class AmortisationRule(Rule):
'correction': 'correction'
}
def test(self, movement):
def _test(self, movement):
"""
Tests if the rule (still) applies
"""
......
......@@ -28,7 +28,6 @@
from AccessControl import ClassSecurityInfo
from Products.CMFCore.utils import getToolByName
from Products.CMFCore.WorkflowCore import WorkflowMethod
from Products.ERP5Type import Permissions, PropertySheet, Constraint, Interface
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type.PsycoWrapper import psyco
......@@ -37,20 +36,21 @@ from zLOG import LOG
class AppliedRule(XMLObject):
"""
An applied rule holds a list of simulation movements
An applied rule holds a list of simulation movements.
An applied rule points to an instance of Rule
(which defines the actual rule to apply with its parameters)
An applied rule points to an instance of Rule (which defines the actual
rule to apply with its parameters) through the specialise relation.
An applied rule can expand itself (look at its direct parent
and take conclusions on what should be inside). This is similar
to the base_fix_consistency mechanism
An applied rule can expand itself (look at its direct parent and take
conclusions on what should be inside).
An applied rule can "solve" or "backtrack". In this case
it looks at its children, looks at the difference between
target and actual, and takes conclusions on its parent
An applied rule can tell if it is stable (if its children are consistent
with what would be expanded from its direct parent).
All algorithms are implemented by the rule
An applied rule can tell if any of his direct children is divergent (not
consistent with the delivery).
All algorithms are implemented by the rule.
"""
# CMF Type Definition
......@@ -76,21 +76,21 @@ class AppliedRule(XMLObject):
security.declareProtected(Permissions.AccessContentsInformation, 'test')
def test(self):
"""
Tests if the rule (still) applies
Tests if the rule (still) applies
"""
my_parent = self.aq_parent
if my_parent is None: # Should be is portal_simulation
if self.isRootAppliedRule():
return 1
else:
parent_value = self.aq_parent
rule = self.getSpecialiseValue()
return rule.test(my_parent)
return rule.test(parent_value)
security.declareProtected(Permissions.AccessContentsInformation,
'isAccountable')
'isAccountable')
def isAccountable(self, movement):
"""Tells wether generated movement needs to be accounted or not."""
"""Tells whether generated movement needs to be accounted or not."""
return self.getSpecialiseValue().isAccountable(movement)
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, **kw):
"""
......@@ -110,11 +110,9 @@ class AppliedRule(XMLObject):
# XXX This part must be done with a interaction workflow is needed.
# if self.isRootAppliedRule():
# self.activate(
# after_method_id=["immediateReindexObject",
# after_method_id=["immediateReindexObject",
# "recursiveImmediateReindexObject"]).\
# notifySimulationChange(rule._v_notify_dict)
#expand = WorkflowMethod(expand)
security.declareProtected(Permissions.ModifyPortalContent, 'solve')
def solve(self, solution_list):
......@@ -133,8 +131,6 @@ class AppliedRule(XMLObject):
if rule is not None:
rule.solve(self)
#solve = WorkflowMethod(solve)
security.declareProtected(Permissions.ModifyPortalContent, 'diverge')
def diverge(self):
"""
......@@ -147,41 +143,41 @@ class AppliedRule(XMLObject):
if rule is not None:
rule.diverge(self)
#diverge = WorkflowMethod(diverge)
# Solvers
security.declareProtected(Permissions.View, 'isDivergent')
def isDivergent(self):
security.declareProtected(Permissions.AccessContentsInformation,
'isStable')
def isStable(self):
"""
Returns 1 if divergent rule
Tells whether the rule is stable or not.
"""
rule = self.getSpecialiseValue()
if rule is not None:
return rule.isDivergent(self)
return 0
return self.getSpecialiseValue().isStable(self)
security.declareProtected(Permissions.View, 'getDivergenceList')
def getDivergenceList(self):
security.declareProtected(Permissions.AccessContentsInformation,
'isDivergent')
def isDivergent(self, movement):
"""
Returns a list Divergence descriptors
Tells whether generated movement is divergent or not.
"""
rule = self.getSpecialiseValue()
if rule is not None:
return rule.getDivergenceList(self)
return ()
return self.getSpecialiseValue().isDivergent(movement)
security.declareProtected(Permissions.View, 'getSolverList')
def getSolverList(self):
security.declareProtected(Permissions.AccessContentsInformation,
'getDivergenceList')
def getDivergenceList(self, movement):
"""
Returns a list Divergence solvers
Returns a list Divergence descriptors
"""
rule = self.getSpecialiseValue()
if rule is not None:
return rule.getSolverList(self)
return ()
return self.getSpecialiseValue().getDivergenceList(movement)
security.declareProtected(Permissions.AccessContentsInformation,
'getSolverList')
def getSolverList(self, movement):
"""
Returns a list Divergence solvers
"""
return self.getSpecialiseValue().getSolverList(movement)
security.declareProtected(Permissions.AccessContentsInformation,
'isRootAppliedRule')
'isRootAppliedRule')
def isRootAppliedRule(self):
"""
Returns 1 is this is a root applied rule
......@@ -189,7 +185,7 @@ class AppliedRule(XMLObject):
return self.getParentValue().getMetaType() == "ERP5 Simulation Tool"
security.declareProtected(Permissions.AccessContentsInformation,
'getRootAppliedRule')
'getRootAppliedRule')
def getRootAppliedRule(self):
"""Return the root applied rule.
useful if some reindexing is needed from inside
......
......@@ -34,175 +34,187 @@ from Products.ERP5.Document.Rule import Rule
from zLOG import LOG
class DeliveryRule(Rule):
"""
Delivery Rule object make sure orphaned movements in a Delivery
(ie. movements which have no explanation in terms of order)
are part of the simulation process
"""
# CMF Type Definition
meta_type = 'ERP5 Delivery Rule'
portal_type = 'Delivery Rule'
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
__implements__ = ( Interface.Predicate,
Interface.Rule )
# Default Properties
property_sheets = ( PropertySheet.Base
, PropertySheet.XMLObject
, PropertySheet.CategoryCore
, PropertySheet.DublinCore
)
def _test(self, movement):
"""
Delivery Rule object make sure orphaned movements in a Delivery
(ie. movements which have no explanation in terms of order)
are part of the simulation process
Default behaviour of DeliveryRule.test
Tests if the rule (still) applies
"""
# A delivery rule never applies
# since it is always explicitely instanciated
return 0
# CMF Type Definition
meta_type = 'ERP5 Delivery Rule'
portal_type = 'Delivery Rule'
# Simulation workflow
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, applied_rule, **kw):
"""
Expands the additional Delivery movements to a new simulation tree.
Expand is only allowed to create or modify simulation movements for
delivery lines which are not already linked to another simulation
movement.
If the movement is not in current state, has no delivered child, and not
in delivery movements, it can be deleted.
Else if the movement is not in current state, it can be modified.
Else, it cannot be modified.
"""
movement_type = 'Simulation Movement'
existing_movement_list = []
immutable_movement_list = []
delivery = applied_rule.getDefaultCausalityValue()
if delivery is not None:
delivery_movement_list = delivery.getMovementList()
# Check existing movements
for movement in applied_rule.contentValues(portal_type=movement_type):
if movement.getLastExpandSimulationState() not in \
delivery.getPortalCurrentInventoryStateList():
movement_delivery = movement.getDeliveryValue()
if not self._isTreeDelivered([movement], ignore_first=1) and \
movement_delivery not in delivery_movement_list:
applied_rule._delObject(movement.getId())
else:
existing_movement_list.append(movement)
else:
existing_movement_list.append(movement)
immutable_movement_list.append(movement)
# Create or modify movements
for movement in delivery.getMovementList():
related_delivery = movement.getDeliveryRelatedValue()
if related_delivery is None:
# create a new movement
if movement.getParentUid() == movement.getExplanationUid():
# We are on a line
new_id = movement.getId()
else:
# Weare on a cell
new_id = "%s_%s" % (movement.getParentId(), movement.getId())
# Generate the simulation movement
new_sim_mvt = applied_rule.newContent(
portal_type=movement_type,
id=new_id,
order_value=movement,
order_ratio=1,
delivery_value=movement,
delivery_ratio=1,
deliverable=1,
source=movement.getSource(),
source_section=movement.getSourceSection(),
destination=movement.getDestination(),
destination_section=movement.getDestinationSection(),
quantity=movement.getQuantity(),
resource=movement.getResource(),
variation_category_list=movement.getVariationCategoryList(),
variation_property_dict=movement.getVariationPropertyDict(),
start_date=movement.getStartDate(),
stop_date=movement.getStopDate())
elif related_delivery in existing_movement_list:
if related_delivery not in immutable_movement_list:
# modification allowed
related_delivery.edit(
delivery_value=movement,
delivery_ratio=1,
deliverable=1,
source=movement.getSource(),
source_section=movement.getSourceSection(),
destination=movement.getDestination(),
destination_section=movement.getDestinationSection(),
quantity=movement.getQuantity(),
resource=movement.getResource(),
variation_category_list=movement.getVariationCategoryList(),
variation_property_dict=movement.getVariationPropertyDict(),
start_date=movement.getStartDate(),
stop_date=movement.getStopDate(),
force_update=1)
else:
# modification disallowed, must compensate
pass
# Now we can set the last expand simulation state to the current state
applied_rule.setLastExpandSimulationState(delivery.getSimulationState())
# Pass to base class
Rule.expand(self, applied_rule, **kw)
security.declareProtected(Permissions.ModifyPortalContent, 'solve')
def solve(self, applied_rule, solution_list):
"""
Solve inconsistency according to a certain number of solutions
templates. This updates the
-> new status -> solved
This applies a solution to an applied rule. Once
the solution is applied, the parent movement is checked.
If it does not diverge, the rule is reexpanded. If not,
diverge is called on the parent movement.
"""
security.declareProtected(Permissions.ModifyPortalContent, 'diverge')
def diverge(self, applied_rule):
"""
-> new status -> diverged
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
This basically sets the rule to "diverged"
and blocks expansion process
"""
# Solvers
security.declareProtected(Permissions.AccessContentsInformation, 'isStable')
def isStable(self, movement):
"""
Checks that the applied_rule is stable
"""
return 0
security.declareProtected(Permissions.AccessContentsInformation, 'isDivergent')
def isDivergent(self, movement):
"""
Checks that the movement is divergent
"""
return Rule.isDivergent(self, movement)
__implements__ = ( Interface.Predicate,
Interface.Rule )
security.declareProtected(Permissions.AccessContentsInformation, 'getDivergenceList')
def getDivergenceList(self, applied_rule):
"""
Returns a list Divergence descriptors
"""
# Default Properties
property_sheets = ( PropertySheet.Base
, PropertySheet.XMLObject
, PropertySheet.CategoryCore
, PropertySheet.DublinCore
)
security.declareProtected(Permissions.AccessContentsInformation, 'getSolverList')
def getSolverList(self, applied_rule):
"""
Returns a list Divergence solvers
"""
def test(self, movement):
"""
Tests if the rule (still) applies
"""
# A delivery rule never applies
# since it is always explicitely instanciated
# Deliverability / orderability
def isOrderable(self, movement):
return 1
def isDeliverable(self, movement):
if movement.getSimulationState() in movement.getPortalDraftOrderStateList():
return 0
return 1
# Simulation workflow
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, applied_rule,
movement_type_method='getPortalOrderMovementTypeList', **kw):
"""
Expands the current movement downwards.
-> new status -> expanded
An applied rule can be expanded only if its parent movement
is expanded.
"""
delivery_line_type = 'Simulation Movement'
# Get the delivery where we come from
# Causality is a kind of Delivery (ex. Packing List)
my_delivery = applied_rule.getDefaultCausalityValue()
# Only expand if my_delivery is not None
if my_delivery is not None:
#if my_delivery.getSimulationState() not in ('delivered', ):
# Even if delivered, we should always calculate consequences
# First, check each contained movement and make
# a list of delivery uids which do not need to be copied
# eventually delete movement which do not exist anylonger
existing_uid_list = []
existing_uid_list_append = existing_uid_list.append
order_movement_type_list = getattr(applied_rule,
movement_type_method)()
for movement in applied_rule.objectValues() :
delivery_value = movement.getDeliveryValue(
portal_type=order_movement_type_list)
if (delivery_value is None) or\
(delivery_value.hasCellContent()) or\
(len(delivery_value.getDeliveryRelatedValueList()) > 1):
# Our delivery_value is already related
# to another simulation movement
# Delete ourselve
# XXX Make sure this is not deleted if already in delivery
applied_rule._delObject(movement.getId())
else:
existing_uid_list_append(delivery_value.getUid())
# Copy each movement (line or cell) from the delivery is that
for delivery_movement in my_delivery.getMovementList():
simulation_movement_to_update_list = delivery_movement.\
getOrderRelatedValueList(portal_type = 'Simulation Movement')
try:
if len(delivery_movement.getDeliveryRelatedValueList()) == 0:
# Only create if orphaned movement
if delivery_movement.getUid() not in existing_uid_list:
# Generate a nicer ID
if delivery_movement.getParentUid() ==\
delivery_movement.getExplanationUid():
# We are on a line
new_id = delivery_movement.getId()
else:
# On a cell
new_id = "%s_%s" % (delivery_movement.getParentId(),
delivery_movement.getId())
# Generate the simulation movement
new_sim_mvt = applied_rule.newContent(
id = new_id,
portal_type = delivery_line_type,
order_value = delivery_movement)
simulation_movement_to_update_list.append(new_sim_mvt)
for simulation_movement in simulation_movement_to_update_list :
simulation_movement.edit(
delivery_value=delivery_movement,
# XXX Do we need to copy the quantity
# Why not the resource, the variation,...
# force_update is required in order
# to make sure the quantity is stored
# on the movement
quantity=delivery_movement.getQuantity(),
variation_category_list=\
delivery_movement.getVariationCategoryList(),
delivery_ratio=1,
deliverable=1,
force_update=1)
except AttributeError:
LOG('ERP5: WARNING', 0,
'AttributeError during expand on delivery line %s'\
% delivery_movement.absolute_url())
# Pass to base class
Rule.expand(self, applied_rule, **kw)
security.declareProtected(Permissions.ModifyPortalContent, 'solve')
def solve(self, applied_rule, solution_list):
"""
Solve inconsistency according to a certain number of solutions
templates. This updates the
-> new status -> solved
This applies a solution to an applied rule. Once
the solution is applied, the parent movement is checked.
If it does not diverge, the rule is reexpanded. If not,
diverge is called on the parent movement.
"""
security.declareProtected(Permissions.ModifyPortalContent, 'diverge')
def diverge(self, applied_rule):
"""
-> new status -> diverged
This basically sets the rule to "diverged"
and blocks expansion process
"""
# Solvers
security.declareProtected(Permissions.View, 'isDivergent')
def isDivergent(self, applied_rule):
"""
Returns 1 if divergent rule
"""
security.declareProtected(Permissions.View, 'getDivergenceList')
def getDivergenceList(self, applied_rule):
"""
Returns a list Divergence descriptors
"""
security.declareProtected(Permissions.View, 'getSolverList')
def getSolverList(self, applied_rule):
"""
Returns a list Divergence solvers
"""
# Deliverability / orderability
def isOrderable(self, m):
return 1
def isDeliverable(self, m):
if m.getSimulationState() in m.getPortalDraftOrderStateList():
return 0
return 1
......@@ -34,209 +34,313 @@ from Products.ERP5.Document.PredicateMatrix import PredicateMatrix
from zLOG import LOG, BLATHER, INFO, PROBLEM
class InvoiceTransactionRule(Rule, PredicateMatrix):
"""
Invoice Transaction Rule object generates accounting movements
for each invoice movement based on category membership and
other predicated. Template accounting movements are stored
in cells inside an instance of the InvoiceTransactionRule.
"""
# CMF Type Definition
meta_type = 'ERP5 Invoice Transaction Rule'
portal_type = 'Invoice Transaction Rule'
add_permission = Permissions.AddPortalContent
isPortalContent = 1
isRADContent = 1
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
__implements__ = ( Interface.Predicate,
Interface.Rule )
# Default Properties
property_sheets = ( PropertySheet.Base
, PropertySheet.XMLObject
, PropertySheet.CategoryCore
, PropertySheet.DublinCore
)
def _test(self, movement):
"""
Tests if the rule (still) applies
"""
Invoice Transaction Rule object generates accounting movements
for each invoice movement based on category membership and
other predicated. Template accounting movements are stored
in cells inside an instance of the InvoiceTransactionRule.
# An invoice transaction rule applies when the movement's
# parent is an invoice rule
parent = movement.getParentValue()
parent_rule_value = parent.getSpecialiseValue()
if parent_rule_value is None:
return 0
if parent_rule_value.getPortalType() in (
'Invoicing Rule', 'Invoice Rule'):
if self._getMatchingCell(movement) is not None:
return 1
return 0
WARNING: what to do with movement split ?
#### Helper method for expand
def _generatePrevisionList(self, applied_rule, **kw):
"""
Generate a list of movements, that should be children of this rule,
based on its context (parent movement, delivery, configuration ...)
# CMF Type Definition
meta_type = 'ERP5 Invoice Transaction Rule'
portal_type = 'Invoice Transaction Rule'
add_permission = Permissions.AddPortalContent
isPortalContent = 1
isRADContent = 1
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
__implements__ = ( Interface.Predicate,
Interface.Rule )
# Default Properties
property_sheets = ( PropertySheet.Base
, PropertySheet.XMLObject
, PropertySheet.CategoryCore
, PropertySheet.DublinCore
)
def test(self, movement):
"""
Tests if the rule (still) applies
"""
# An invoice transaction rule applies when the movement's
# parent is an invoice rule
parent = movement.getParentValue()
parent_rule_value = parent.getSpecialiseValue()
if parent_rule_value is None:
return 0
if parent_rule_value.getPortalType() in (
'Invoicing Rule', 'Invoice Rule'):
if self._getMatchingCell(movement) is not None:
return 1
return 0
These previsions are acrually returned as dictionaries.
"""
prevision_list = []
context_movement = applied_rule.getParentValue()
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, applied_rule, force=0, **kw):
""" Expands the current movement downward.
"""
invoice_transaction_line_type = 'Simulation Movement'
# First, get the simulation movement we were expanded from
my_invoice_line_simulation = applied_rule.getParentValue()
# Next, we can try to expand the rule
if force or \
(applied_rule.getLastExpandSimulationState()\
not in self.getPortalReservedInventoryStateList() and \
applied_rule.getLastExpandSimulationState()\
not in self.getPortalCurrentInventoryStateList()):
# Find a matching cell
my_cell = self._getMatchingCell(my_invoice_line_simulation)
if my_cell is not None :
my_cell_transaction_id_list = my_cell.contentIds()
else :
my_cell_transaction_id_list = []
if my_cell is not None : # else, we do nothing
# check each contained movement and delete
# those that we don't need
for movement in applied_rule.objectValues():
if movement.getId() not in my_cell_transaction_id_list :
applied_rule.deleteContent(movement.getId())
# Add every movement from the Matrix to the Simulation
for transaction_line in my_cell.objectValues() :
if transaction_line.getId() in applied_rule.objectIds() :
my_simulation_movement = applied_rule[transaction_line.getId()]
else :
my_simulation_movement = applied_rule.newContent(
id = transaction_line.getId()
, portal_type=invoice_transaction_line_type)
# get the resource (in that order):
# * resource from the invoice (using deliveryValue)
# * price_currency from the invoice
# * price_currency from the parents simulation movement's
# deliveryValue
# * price_currency from the top level simulation movement's
# orderValue
resource = None
invoice_line = my_invoice_line_simulation.getDeliveryValue()
if invoice_line is not None :
invoice = invoice_line.getExplanationValue()
resource = invoice.getProperty('resource',
invoice.getProperty('price_currency', None))
if resource is None :
# search the resource on parents simulation movement's deliveries
simulation_movement = applied_rule.getParentValue()
portal_simulation = self.getPortalObject().portal_simulation
while resource is None and \
simulation_movement != portal_simulation :
delivery = simulation_movement.getDeliveryValue()
if delivery is not None:
resource = delivery.getProperty('price_currency', None)
if simulation_movement.getParentValue().getParentValue() \
== portal_simulation :
# we are on the first simulation movement, we'll try
# to get the resource from it's order price currency.
order = simulation_movement.getOrderValue()
resource = order.getProperty('price_currency', None)
simulation_movement = simulation_movement\
.getParentValue().getParentValue()
if resource is None :
# last resort : get the resource from the rule
resource = transaction_line.getResource() or my_cell.getResource()
if resource in (None, '') :
# XXX this happen in many order, so this log is probably useless
LOG("InvoiceTransactionRule", PROBLEM,
"expanding %s: without resource" % applied_rule.getPath())
my_simulation_movement._edit(
source = transaction_line.getSource()
, destination = transaction_line.getDestination()
, source_section = my_invoice_line_simulation.getSourceSection()
, destination_section = my_invoice_line_simulation\
.getDestinationSection()
, resource = resource
# calculate (quantity * price) * cell_quantity
, quantity = (my_invoice_line_simulation.getQuantity()
* my_invoice_line_simulation.getPrice())
* transaction_line.getQuantity()
, start_date = my_invoice_line_simulation.getStartDate()
, stop_date = my_invoice_line_simulation.getStopDate()
, force_update = 1
)
# Find a matching cell
cell = self._getMatchingCell(context_movement)
if cell is not None : # else, we do nothing
for transaction_line in cell.objectValues() :
# get the resource (in that order):
# * resource from the invoice (using deliveryValue)
# * price_currency from the invoice
# * price_currency from the parents simulation movement's
# deliveryValue
# * price_currency from the top level simulation movement's
# orderValue
resource = None
invoice_line = context_movement.getDeliveryValue()
if invoice_line is not None :
invoice = invoice_line.getExplanationValue()
resource = invoice.getProperty('resource',
invoice.getProperty('price_currency', None))
if resource is None :
# search the resource on parents simulation movement's deliveries
simulation_movement = applied_rule.getParentValue()
portal_simulation = self.getPortalObject().portal_simulation
while resource is None and \
simulation_movement != portal_simulation :
delivery = simulation_movement.getDeliveryValue()
if delivery is not None:
resource = delivery.getProperty('price_currency', None)
if simulation_movement.getParentValue().getParentValue() \
== portal_simulation :
# we are on the first simulation movement, we'll try
# to get the resource from it's order price currency.
order = simulation_movement.getOrderValue()
resource = order.getProperty('price_currency', None)
simulation_movement = simulation_movement\
.getParentValue().getParentValue()
if resource is None :
# last resort : get the resource from the rule
resource = transaction_line.getResource() or cell.getResource()
if resource in (None, '') :
# XXX this happen in many order, so this log is probably useless
LOG("InvoiceTransactionRule", PROBLEM,
"expanding %s: without resource" % applied_rule.getPath())
prevision_line = {}
prevision_line.update(
id = transaction_line.getId(),
source = transaction_line.getSource(),
destination = transaction_line.getDestination(),
source_section = context_movement.getSourceSection(),
destination_section = context_movement.getDestinationSection(),
resource = resource,
# calculate (quantity * price) * cell_quantity
quantity = (context_movement.getCorrectedQuantity() *
context_movement.getPrice()) * transaction_line.getQuantity(),
start_date = context_movement.getStartDate(),
stop_date = context_movement.getStopDate(),
force_update = 1)
prevision_list.append(prevision_line)
return prevision_list
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, applied_rule, force=0, **kw):
"""
Expands the rule:
- generate a list of previsions
- compare the prevision with existing children
- get the list of existing movements (immutable, mutable, deletable)
- compute the difference between prevision and existing (add,
modify, remove)
- add/modify/remove child movements to match prevision
"""
add_list, modify_dict, \
delete_list = self._getCompensatedMovementList(applied_rule,
matching_property_list=['resource', 'source', 'destination'], **kw)
if len(add_list) or len(modify_dict):
pass#import pdb; pdb.set_trace()
for movement_id in delete_list:
applied_rule._delObject(movement_id)
# Pass to base class
Rule.expand(self, applied_rule, force=force, **kw)
# Matrix related
security.declareProtected( Permissions.ModifyPortalContent,
'newCellContent' )
def newCellContent(self, id, **kw):
"""
Creates a new Cell.
"""
self.invokeFactory(type_name='Accounting Rule Cell', id=id)
new_cell = self.get(id)
return new_cell
for movement, prop_dict in modify_dict.items():
applied_rule[movement].edit(**prop_dict)
for movement_dict in add_list:
if 'id' in movement_dict.keys():
mvmt_id = applied_rule._get_id(movement_dict.pop('id'))
new_mvmt = applied_rule.newContent(id=mvmt_id,
portal_type=self.movement_type)
else:
new_mvmt = applied_rule.newContent(portal_type=self.movement_type)
new_mvmt.edit(**movement_dict)
# Pass to base class
Rule.expand(self, applied_rule, force=force, **kw)
#### old expand method kept for reference
def old_expand(self, applied_rule, force=0, **kw):
""" Expands the current movement downward.
"""
# First, get the simulation movement we were expanded from
my_invoice_line_simulation = applied_rule.getParentValue()
security.declareProtected(Permissions.ModifyPortalContent, 'solve')
def solve(self, applied_rule, solution_list):
"""
Solve inconsistency according to a certain number of solutions
templates. This updates the
-> new status -> solved
This applies a solution to an applied rule. Once
the solution is applied, the parent movement is checked.
If it does not diverge, the rule is reexpanded. If not,
diverge is called on the parent movement.
"""
security.declareProtected(Permissions.ModifyPortalContent, 'diverge')
def diverge(self, applied_rule):
"""
-> new status -> diverged
This basically sets the rule to "diverged"
and blocks expansion process
"""
# Solvers
security.declareProtected(Permissions.View, 'isDivergent')
def isDivergent(self, applied_rule):
"""
Returns 1 if divergent rule
"""
security.declareProtected(Permissions.View, 'getDivergenceList')
def getDivergenceList(self, applied_rule):
"""
Returns a list Divergence descriptors
"""
security.declareProtected(Permissions.View, 'getSolverList')
def getSolverList(self, applied_rule):
"""
Returns a list Divergence solvers
"""
# Deliverability / orderability
def isOrderable(self, m):
return 1
def isDeliverable(self, m):
if m.getSimulationState() in self.getPortalDraftOrderStateList():
return 0
return 1
# Next, we can try to expand the rule
if force or \
(applied_rule.getLastExpandSimulationState()\
not in self.getPortalReservedInventoryStateList() and \
applied_rule.getLastExpandSimulationState()\
not in self.getPortalCurrentInventoryStateList()):
# Find a matching cell
my_cell = self._getMatchingCell(my_invoice_line_simulation)
if my_cell is not None :
my_cell_transaction_id_list = my_cell.contentIds()
else :
my_cell_transaction_id_list = []
if my_cell is not None : # else, we do nothing
# check each contained movement and delete
# those that we don't need
for movement in applied_rule.objectValues():
if movement.getId() not in my_cell_transaction_id_list :
applied_rule.deleteContent(movement.getId())
# Add every movement from the Matrix to the Simulation
for transaction_line in my_cell.objectValues() :
if transaction_line.getId() in applied_rule.objectIds() :
my_simulation_movement = applied_rule[transaction_line.getId()]
else :
my_simulation_movement = applied_rule.newContent(
id = transaction_line.getId()
, portal_type=self.movement_type)
# get the resource (in that order):
# * resource from the invoice (using deliveryValue)
# * price_currency from the invoice
# * price_currency from the parents simulation movement's
# deliveryValue
# * price_currency from the top level simulation movement's
# orderValue
resource = None
invoice_line = my_invoice_line_simulation.getDeliveryValue()
if invoice_line is not None :
invoice = invoice_line.getExplanationValue()
resource = invoice.getProperty('resource',
invoice.getProperty('price_currency', None))
if resource is None :
# search the resource on parents simulation movement's deliveries
simulation_movement = applied_rule.getParentValue()
portal_simulation = self.getPortalObject().portal_simulation
while resource is None and \
simulation_movement != portal_simulation :
delivery = simulation_movement.getDeliveryValue()
if delivery is not None:
resource = delivery.getProperty('price_currency', None)
if simulation_movement.getParentValue().getParentValue() \
== portal_simulation :
# we are on the first simulation movement, we'll try
# to get the resource from it's order price currency.
order = simulation_movement.getOrderValue()
resource = order.getProperty('price_currency', None)
simulation_movement = simulation_movement\
.getParentValue().getParentValue()
if resource is None :
# last resort : get the resource from the rule
resource = transaction_line.getResource() or my_cell.getResource()
if resource in (None, '') :
# XXX this happen in many order, so this log is probably useless
LOG("InvoiceTransactionRule", PROBLEM,
"expanding %s: without resource" % applied_rule.getPath())
my_simulation_movement._edit(
source = transaction_line.getSource()
, destination = transaction_line.getDestination()
, source_section = my_invoice_line_simulation.getSourceSection()
, destination_section = my_invoice_line_simulation\
.getDestinationSection()
, resource = resource
# calculate (quantity * price) * cell_quantity
, quantity = (my_invoice_line_simulation.getQuantity()
* my_invoice_line_simulation.getPrice())
* transaction_line.getQuantity()
, start_date = my_invoice_line_simulation.getStartDate()
, stop_date = my_invoice_line_simulation.getStopDate()
, force_update = 1
)
# Pass to base class
Rule.expand(self, applied_rule, force=force, **kw)
# Matrix related
security.declareProtected( Permissions.ModifyPortalContent,
'newCellContent' )
def newCellContent(self, id, **kw):
"""
Creates a new Cell.
"""
self.invokeFactory(type_name='Accounting Rule Cell', id=id)
new_cell = self.get(id)
return new_cell
security.declareProtected(Permissions.ModifyPortalContent, 'solve')
def solve(self, applied_rule, solution_list):
"""
Solve inconsistency according to a certain number of solutions
templates. This updates the
-> new status -> solved
This applies a solution to an applied rule. Once
the solution is applied, the parent movement is checked.
If it does not diverge, the rule is reexpanded. If not,
diverge is called on the parent movement.
"""
security.declareProtected(Permissions.ModifyPortalContent, 'diverge')
def diverge(self, applied_rule):
"""
-> new status -> diverged
This basically sets the rule to "diverged"
and blocks expansion process
"""
# Solvers
security.declareProtected(Permissions.View, 'isDivergent')
def isDivergent(self, applied_rule):
"""
Returns 1 if divergent rule
"""
security.declareProtected(Permissions.View, 'getDivergenceList')
def getDivergenceList(self, applied_rule):
"""
Returns a list Divergence descriptors
"""
security.declareProtected(Permissions.View, 'getSolverList')
def getSolverList(self, applied_rule):
"""
Returns a list Divergence solvers
"""
# Deliverability / orderability
def isOrderable(self, m):
return 1
def isDeliverable(self, m):
if m.getSimulationState() in self.getPortalDraftOrderStateList():
return 0
return 1
......@@ -33,119 +33,199 @@ from Products.CMFCore.utils import getToolByName
from Products.ERP5Type import Permissions, PropertySheet, Constraint, Interface
from Products.ERP5.Document.Rule import Rule
#from Products.ERP5Type.Base import TempBase
from zLOG import LOG
class InvoicingRule(Rule):
"""
Invoicing Rule expand simulation created by a order or delivery rule.
"""
# CMF Type Definition
meta_type = 'ERP5 Invoicing Rule'
portal_type = 'Invoicing Rule'
add_permission = Permissions.AddPortalContent
isPortalContent = 1
isRADContent = 1
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
__implements__ = ( Interface.Predicate,
Interface.Rule )
# Default Properties
property_sheets = ( PropertySheet.Base
, PropertySheet.XMLObject
, PropertySheet.CategoryCore
, PropertySheet.DublinCore
)
security.declareProtected(Permissions.AccessContentsInformation,
'isAccountable')
def isAccountable(self, movement):
"""
Invoicing Rule expand simulation created by a order rule.
Tells wether generated movement needs to be accounted or not.
Invoice movement are never accountable, so simulation movement for
invoice movements should not be accountable either.
"""
return 0
# CMF Type Definition
meta_type = 'ERP5 Invoicing Rule'
portal_type = 'Invoicing Rule'
add_permission = Permissions.AddPortalContent
isPortalContent = 1
isRADContent = 1
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
__implements__ = ( Interface.Predicate,
Interface.Rule )
# Default Properties
property_sheets = ( PropertySheet.Base
, PropertySheet.XMLObject
, PropertySheet.CategoryCore
, PropertySheet.DublinCore
)
security.declareProtected(Permissions.AccessContentsInformation,
'isAccountable')
def isAccountable(self, movement):
"""Tells wether generated movement needs to be accounted or not.
Invoice movement are never accountable, so simulation movement for
invoice movements should not be accountable either.
"""
return 0
security.declareProtected(Permissions.AccessContentsInformation, 'test')
def test(self, movement):
"""
Tests if the rule (still) applies
"""
parent = movement.getParentValue()
result = 0
if (parent.getPortalType() == 'Applied Rule') and \
(parent.getSpecialiseId() in ('default_order_rule',
'default_delivery_rule' )):
result = 1
return result
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, applied_rule, **kw):
""" Expands the current movement downward.
"""
delivery_line_type = 'Simulation Movement'
# Source that movement from the next node / stock
my_context_movement = applied_rule.getParentValue()
# Do not invoice within the same entity or whenever entities are
# not all defined
# It is OK to invoice within different entities of the same company
# if we wish to get some internal analytical accounting
# but that requires some processing to produce a balance sheet
source_section = my_context_movement.getSourceSection()
destination_section = my_context_movement.getDestinationSection()
if source_section == destination_section or source_section is None \
or destination_section is None:
return Rule.expand(self, applied_rule, **kw)
if my_context_movement.getSource() is not None:
# XXX Please explain why ? Let us consider for
# example a consumption movement of garbage which we
# want to be invoiced (the cleanup company is working
# within our premises)
#
# We should only expand movements if they have a source
# otherwise, it creates infinite recursion
# This happens for example whenever the source of a movement is
# acquired from an order which is deleted afterwards
new_id = 'inv_mvt'
if new_id in applied_rule.objectIds():
invoice_line = applied_rule[new_id]
else:
invoice_line = applied_rule.newContent(
type_name = delivery_line_type,
id = new_id
)
# Edit movement
invoice_line._edit(
price = my_context_movement.getPrice(),
quantity = my_context_movement.getQuantity(),
quantity_unit = my_context_movement.getQuantityUnit(),
efficiency = my_context_movement.getEfficiency(),
resource = my_context_movement.getResource(),
variation_category_list = my_context_movement.\
getVariationCategoryList(),
variation_property_dict = my_context_movement.\
getVariationPropertyDict(),
start_date = my_context_movement.getStartDate(),
stop_date = my_context_movement.getStopDate(),
source = my_context_movement.getSource(),
source_section = source_section,
destination = my_context_movement.getDestination(),
destination_section = destination_section,
# We do need to collect invoice lines to build invoices
deliverable = 1,
)
def _test(self, movement):
"""
Tests if the rule (still) applies
"""
parent = movement.getParentValue()
result = 0
if (parent.getPortalType() == 'Applied Rule') and \
(parent.getSpecialiseId() in ('default_order_rule',
'default_delivery_rule' )):
result = 1
return result
#### Helper method for expand
def _generatePrevisionList(self, applied_rule, **kw):
"""
Generate a list of movements, that should be children of this rule,
based on its context (parent movement, delivery, configuration ...)
These previsions are acrually returned as dictionaries.
"""
# XXX Isn't it better to share the code with expand method
context_movement = applied_rule.getParentValue()
# Do not invoice within the same entity or whenever entities are not all
# defined.
# It could be OK to invoice within different entities of the same
# company if we wish to get some internal analytical accounting but that
# requires some processing to produce a balance sheet.
source_section = context_movement.getSourceSection()
destination_section = context_movement.getDestinationSection()
if source_section == destination_section or source_section is None \
or destination_section is None:
return []
invoice_line = {}
invoice_line.update(
price=context_movement.getPrice(),
quantity=context_movement.getCorrectedQuantity(),
quantity_unit=context_movement.getQuantityUnit(),
efficiency=context_movement.getEfficiency(),
resource=context_movement.getResource(),
variation_category_list=context_movement.getVariationCategoryList(),
variation_property_dict=context_movement.getVariationPropertyDict(),
start_date=context_movement.getStartDate(),
stop_date=context_movement.getStopDate(),
source=context_movement.getSource(), source_section=source_section,
destination=context_movement.getDestination(),
destination_section=destination_section,
# We do need to collect invoice lines to build invoices
deliverable=1
)
return [invoice_line]
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, applied_rule, force=0, **kw):
"""
Expands the rule:
- generate a list of previsions
- compare the prevision with existing children
- get the list of existing movements (immutable, mutable, deletable)
- compute the difference between prevision and existing (add,
modify, remove)
- add/modify/remove child movements to match prevision
"""
add_list, modify_dict, \
delete_list = self._getCompensatedMovementList(applied_rule, **kw)
for movement_id in delete_list:
applied_rule._delObject(movement_id)
# Create one submovement which sources the transformation
Rule.expand(self, applied_rule, **kw)
for movement, prop_dict in modify_dict.items():
#XXX ignore start_date and stop_date if the difference is smaller than a
# rule defined value
for prop in ('start_date', 'stop_date'):
if prop in prop_dict.keys():
prop_dict.pop(prop)
applied_rule[movement].edit(**prop_dict)
for movement_dict in add_list:
if 'id' in movement_dict.keys():
mvmt_id = applied_rule._get_id(movement_dict.pop('id'))
new_mvmt = applied_rule.newContent(id=mvmt_id,
portal_type=self.movement_type)
else:
new_mvmt = applied_rule.newContent(portal_type=self.movement_type)
new_mvmt.edit(**movement_dict)
def isDeliverable(self, m):
return m.getResource() is not None
# Pass to base class
Rule.expand(self, applied_rule, force=force, **kw)
def isDeliverable(self, movement):
return movement.getResource() is not None
#### old expand method kept for reference
def old_expand(self, applied_rule, **kw):
"""
Expands the current movement downward.
"""
delivery_line_type = 'Simulation Movement'
# Source that movement from the next node / stock
context_movement = applied_rule.getParentValue()
# Do not invoice within the same entity or whenever entities are
# not all defined
# It is OK to invoice within different entities of the same company
# if we wish to get some internal analytical accounting
# but that requires some processing to produce a balance sheet
source_section = context_movement.getSourceSection()
destination_section = context_movement.getDestinationSection()
if source_section == destination_section or source_section is None \
or destination_section is None:
return Rule.expand(self, applied_rule, **kw)
if context_movement.getSource() is not None:
# XXX Please explain why ? Let us consider for
# example a consumption movement of garbage which we
# want to be invoiced (the cleanup company is working
# within our premises)
#
# We should only expand movements if they have a source
# otherwise, it creates infinite recursion
# This happens for example whenever the source of a movement is
# acquired from an order which is deleted afterwards
new_id = 'inv_mvt'
if new_id in applied_rule.objectIds():
invoice_line = applied_rule[new_id]
else:
invoice_line = applied_rule.newContent(
type_name = delivery_line_type,
id = new_id
)
# Edit movement
invoice_line._edit(
price = context_movement.getPrice(),
quantity = context_movement.getQuantity(),
quantity_unit = context_movement.getQuantityUnit(),
efficiency = context_movement.getEfficiency(),
resource = context_movement.getResource(),
variation_category_list = context_movement.\
getVariationCategoryList(),
variation_property_dict = context_movement.\
getVariationPropertyDict(),
start_date = context_movement.getStartDate(),
stop_date = context_movement.getStopDate(),
source = context_movement.getSource(),
source_section = source_section,
destination = context_movement.getDestination(),
destination_section = destination_section,
# We do need to collect invoice lines to build invoices
deliverable = 1,
)
# Create one submovement which sources the transformation
Rule.expand(self, applied_rule, **kw)
......@@ -35,7 +35,7 @@ from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5.Document.Amount import Amount
from zLOG import LOG
from zLOG import LOG, WARNING
class Movement(XMLObject, Amount):
"""
......@@ -394,6 +394,25 @@ class Movement(XMLObject, Amount):
return 1
return 0
security.declareProtected(Permissions.AccessContentsInformation,
'isFrozen')
def isFrozen(self):
"""
Returns the frozen status of this movemnt.
a movement in started, stopped, delivered is automatically frozen.
If frozen is locally set to '0', we must check for a parent set to '1', in
which case, we want the children to be frozen as well.
"""
# XXX Hardcoded
# Maybe, we should use getPortalCurrentInventoryStateList
# and another portal method for cancelled (and deleted)
LOG("Movement, isFrozen", WARNING, "Hardcoded state list")
if self.getSimulationState() in ('stopped', 'delivered', 'cancelled'):
return 1
if self._baseIsFrozen() == 0:
self._baseSetFrozen(None)
return self._baseGetFrozen() or 0
security.declareProtected( Permissions.AccessContentsInformation,
'getExplanation')
def getExplanation(self):
......
......@@ -31,105 +31,141 @@ from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions, PropertySheet, Constraint, Interface
from Products.ERP5.Document.Rule import Rule
from Products.ERP5.Document.DeliveryRule import DeliveryRule
from zLOG import LOG
from zLOG import LOG, WARNING
class OrderRule(DeliveryRule):
"""
Order Rule object make sure an Order in the similation
is consistent with the real order
WARNING: what to do with movement split ?
"""
# CMF Type Definition
meta_type = 'ERP5 Order Rule'
portal_type = 'Order Rule'
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
__implements__ = ( Interface.Predicate,
Interface.Rule )
# Default Properties
property_sheets = ( PropertySheet.Base
, PropertySheet.XMLObject
, PropertySheet.CategoryCore
, PropertySheet.DublinCore
)
# Simulation workflow
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, applied_rule, force=0, **kw):
"""
Order Rule object make sure an Order in the similation
is consistent with the real order
Expands the Order to a new simulation tree.
expand is only allowed to modify a simulation movement if it doesn't
have a delivery relation yet.
WARNING: what to do with movement split ?
If the movement is in ordered or planned state, has no delivered
child, and is not in order, it can be deleted.
Else, if the movement is in ordered or planned state, has no
delivered child, and is in order, it can be modified.
Else, it cannot be modified.
"""
movement_type = 'Simulation Movement'
existing_movement_list = []
immutable_movement_list = []
order = applied_rule.getDefaultCausalityValue()
if order is not None:
order_movement_list = order.getMovementList()
# check existing movements
for movement in applied_rule.contentValues(portal_type=movement_type):
if (not movement.getLastExpandSimulationState() in
order.getPortalReservedInventoryStateList() and
not movement.getLastExpandSimulationState() in
order.getPortalCurrentInventoryStateList()) and \
not self._isTreeDelivered([movement]):
# CMF Type Definition
meta_type = 'ERP5 Order Rule'
portal_type = 'Order Rule'
movement_order = movement.getOrderValue()
if movement_order in order_movement_list:
existing_movement_list.append(movement)
else:
applied_rule._delObject(movement.getId())
else:
existing_movement_list.append(movement)
immutable_movement_list.append(movement)
# Create or modify movements
for movement in order_movement_list:
related_order = movement.getOrderRelatedValue()
if related_order is None:
if movement.getParentUid() == movement.getExplanationUid():
# We are on a line
new_id = movement.getId()
else:
# We are on a cell
new_id = "%s_%s" % (movement.getParentId(), movement.getId())
# Generate a simulation movement
LOG("OrderRule, expand", WARNING, "Hardcoded state list")
applied_rule.newContent(
portal_type=movement_type,
id=new_id,
order_value=movement,
order_ratio=1,
delivery_ratio=1,
deliverable=1,
# source=movement.getSource(),
# source_section=movement.getSourceSection(),
# destination=movement.getDestination(),
# destination_section=movement.getDestinationSection(),
# quantity=movement.getQuantity(),
# resource=movement.getResource(),
# variation_category_list=movement.getVariationCategoryList(),
# variation_property_dict=movement.getVariationPropertyDict(),
# start_date=movement.getStartDate(),
# stop_date=movement.getStopDate(),
**kw)
elif related_order in existing_movement_list:
if related_order not in immutable_movement_list:
# modification allowed
related_order.edit(
order_value=movement,
# source=movement.getSource(),
# source_section=movement.getSourceSection(),
# destination=movement.getDestination(),
# destination_section=movement.getDestinationSection(),
# quantity=movement.getQuantity(),
# resource=movement.getResource(),
# variation_category_list=movement.getVariationCategoryList(),
# variation_property_dict=movement.getVariationPropertyDict(),
# start_date=movement.getStartDate(),
# stop_date=movement.getStopDate(),
**kw)
#related_order.setLastExpandSimulationState(order.getSimulationState())
else:
# modification disallowed, must compensate
pass
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
__implements__ = ( Interface.Predicate,
Interface.Rule )
# Now we can set the last expand simulation state to the current state
applied_rule.setLastExpandSimulationState(order.getSimulationState())
# Pass to base class
Rule.expand(self, applied_rule, force=force, **kw)
# Default Properties
property_sheets = ( PropertySheet.Base
, PropertySheet.XMLObject
, PropertySheet.CategoryCore
, PropertySheet.DublinCore
)
security.declareProtected(Permissions.AccessContentsInformation, 'isStable')
def isStable(self, applied_rule):
"""
Checks that the applied_rule is stable
"""
LOG('OrderRule.isStable', WARNING, 'Not Implemented')
return 1
# Simulation workflow
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, applied_rule, force=0, **kw):
"""
Expands the current movement downward.
-> new status -> expanded
An applied rule can be expanded only if its parent movement
is expanded.
"""
delivery_line_type = 'Simulation Movement'
# Get the order when we come from
my_order = applied_rule.getDefaultCausalityValue()
# Only expand if my_order is not None and state is not 'confirmed'
if my_order is not None:
# Only expand order rule if order not yet confirmed (This is consistent
# with the fact that once simulation is launched, we stick to it)
state = applied_rule.getLastExpandSimulationState()
if force or \
(state not in applied_rule.getPortalReservedInventoryStateList()\
and state not in applied_rule.getPortalCurrentInventoryStateList()):
# First, check each contained movement and make
# a list of order ids which do not need to be copied
# eventually delete movement which do not exist anylonger
existing_uid_list = []
existing_uid_list_append = existing_uid_list.append
movement_type_list = applied_rule.getPortalMovementTypeList()
order_movement_type_list = \
applied_rule.getPortalOrderMovementTypeList()
# Calculate existing simulation movement to delete
for movement in applied_rule.contentValues(
filter={'portal_type': movement_type_list}):
order_value = movement.getOrderValue(\
portal_type=order_movement_type_list)
if (order_value is None) or\
(order_value.hasCellContent()):
# XXX Make sure this is not deleted if already in delivery
applied_rule._delObject(movement.getId())
else:
existing_uid_list_append(order_value.getUid())
# Build simulation movement if necessary
for order_movement in my_order.getMovementList():
try:
if order_movement.getUid() not in existing_uid_list:
# Generate a nicer ID
if order_movement.getParentUid() ==\
order_movement.getExplanationUid():
# We are on a line
new_id = order_movement.getId()
else:
# On a cell
new_id = "%s_%s" % (order_movement.getParentId(),
order_movement.getId())
# Generate the simulation movement
# Source, Destination, Quantity, Date, etc. are
# acquired from the order and need not to be copied.
new_sim_mvt = applied_rule.newContent(
portal_type=delivery_line_type,
id=new_id,
order_value=order_movement,
delivery_ratio=1,
deliverable=1,
**kw)
# No acquisition on variation_category_list
# in this case to prevent user failure
except AttributeError:
LOG('ERP5: WARNING', 0,\
'AttributeError during expand on order movement %s'\
% order_movement.absolute_url())
# Now we can set the last expand simulation state
# to the current state
applied_rule.setLastExpandSimulationState(\
my_order.getSimulationState())
# Pass to base class
Rule.expand(self, applied_rule, force=force, **kw)
security.declareProtected(Permissions.AccessContentsInformation,
'isDivergent')
def isDivergent(self, movement):
"""
Checks that the movement is divergent
"""
return Rule.isDivergent(self, movement)
......@@ -64,8 +64,7 @@ class PaymentRule(Rule):
, PropertySheet.DublinCore
)
security.declareProtected(Permissions.AccessContentsInformation, 'test')
def test(self, movement):
def _test(self, movement):
"""
Tests if the rule (still) applies
"""
......
......@@ -32,147 +32,356 @@ from Products.ERP5Type import Permissions, PropertySheet, Constraint, Interface
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5.Document.Predicate import Predicate
from Acquisition import aq_base, aq_parent, aq_inner, aq_acquire
from zLOG import LOG, WARNING
class Rule(XMLObject, Predicate):
"""
Rule objects implement the simulation algorithm
(expand, solve)
Example of rules
- Stock rule (checks stocks)
- Order rule (copies movements from an order)
- Capacity rule (makes sure stocks / sources are possible)
- Transformation rule (expands transformations)
- Template rule (creates submovements with a template system)
used in Invoice rule, Paysheet rule, etc.
Rules are called one by one at the global level (the rules folder)
and at the local level (applied rules in the simulation folder)
The simulation_tool includes rules which are parametrized by the sysadmin
The simulation_tool does the logics of checking, calling, etc.
simulation_tool is a subclass of Folder & Tool
"""
# CMF Type Definition
meta_type = 'ERP5 Rule'
portal_type = 'Rule'
add_permission = Permissions.AddPortalContent
isPortalContent = 1
isRADContent = 1
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
__implements__ = ( Interface.Predicate,
Interface.Rule )
# Default Properties
property_sheets = ( PropertySheet.Base
, PropertySheet.XMLObject
, PropertySheet.CategoryCore
, PropertySheet.DublinCore
)
# Portal Type of created children
movement_type = 'Simulation Movement'
security.declareProtected(Permissions.AccessContentsInformation,
'isAccountable')
def isAccountable(self, movement):
"""Tells wether generated movement needs to be accounted or not.
Only account movements which are not associated to a delivery;
Whenever delivery is there, delivery has priority
"""
Rule objects implement the simulation algorithm
(expand, solve)
return movement.getDeliveryValue() is None
Example of rules
security.declareProtected(Permissions.ModifyPortalContent,
'constructNewAppliedRule')
def constructNewAppliedRule(self, context, id=None,
activate_kw=None, **kw):
"""
Creates a new applied rule which points to self
"""
# XXX Parameter **kw is useless, so, we should remove it
portal_types = getToolByName(self, 'portal_types')
if id is None:
id = context.generateNewId()
if getattr(aq_base(context), id, None) is None:
context.newContent(id=id,
portal_type='Applied Rule',
specialise_value=self,
activate_kw=activate_kw)
return context.get(id)
# Simulation workflow
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, applied_rule, **kw):
"""
Expands the current movement downward.
- Stock rule (checks stocks)
An applied rule can be expanded only if its parent movement
is expanded.
"""
for o in applied_rule.objectValues():
o.expand(**kw)
security.declareProtected(Permissions.ModifyPortalContent, 'solve')
def solve(self, applied_rule, solution_list):
"""
Solve inconsistency according to a certain number of solutions
templates. This updates the
- Order rule (copies movements from an order)
-> new status -> solved
- Capacity rule (makes sure stocks / sources are possible)
This applies a solution to an applied rule. Once
the solution is applied, the parent movement is checked.
If it does not diverge, the rule is reexpanded. If not,
diverge is called on the parent movement.
"""
- Transformation rule (expands transformations)
def test(self, movement):
"""
Tests if the rule (still) applies
First try to call a python script, then call the _test method defined in
the class
- Template rule (creates submovements with a template system)
used in Invoice rule, Paysheet rule, etc.
This method should not be overriden by Rules.
"""
method = self._getTypeBasedMethod('test')
if method is not None:
return method(movement)
return self._test(movement)
Rules are called one by one at the global level (the rules folder)
and at the local level (applied rules in the simulation folder)
def _test(self, movement):
"""
Default behaviour of Rule.test, used when no test method for the rule
was defined
The simulation_tool includes rules which are parametrized by the sysadmin
The simulation_tool does the logics of checking, calling, etc.
This method should be overriden by Rules if another default behaviour is
wanted.
"""
return 0
simulation_tool is a subclass of Folder & Tool
security.declareProtected(Permissions.ModifyPortalContent, 'diverge')
def diverge(self, applied_rule):
"""
-> new status -> diverged
# CMF Type Definition
meta_type = 'ERP5 Rule'
portal_type = 'Rule'
add_permission = Permissions.AddPortalContent
isPortalContent = 1
isRADContent = 1
This basically sets the rule to "diverged"
and blocks expansion process
"""
pass
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
# Solvers
security.declareProtected( Permissions.AccessContentsInformation,
'isDivergent')
def isDivergent(self, movement, ignore_list=[]):
"""
Returns true if the Simulation Movement is divergent comparing to
the delivery value
"""
delivery = movement.getDeliveryValue()
if delivery is None:
return 0
__implements__ = ( Interface.Predicate,
Interface.Rule )
# Default Properties
property_sheets = ( PropertySheet.Base
, PropertySheet.XMLObject
, PropertySheet.CategoryCore
, PropertySheet.DublinCore
)
default_match_list = (
'source_section', 'destination_section', 'source',
'destination', 'resource', 'variation_category_list',
'aggregate_list', 'start_date', 'stop_date')
match_list = [x for x in default_match_list if x not in ignore_list]
for prop in match_list:
if movement.getProperty(prop) != delivery.getProperty(prop):
return 1
security.declareProtected(Permissions.AccessContentsInformation,
'isAccountable')
def isAccountable(self, movement):
"""Tells wether generated movement needs to be accounted or not.
Only account movements which are not associated to a delivery;
Whenever delivery is there, delivery has priority
"""
return movement.getDeliveryValue() is None
security.declareProtected(Permissions.ModifyPortalContent,
'constructNewAppliedRule')
def constructNewAppliedRule(self, context, id=None,activate_kw=None,**kw):
"""
Creates a new applied rule which points to self
"""
portal_types = getToolByName(self, 'portal_types')
if id is None:
id = context.generateNewId()
if getattr(aq_base(context), id, None) is None:
context.newContent(id=id,
portal_type='Applied Rule',
specialise_value=self,
activate_kw=activate_kw,
)
return context.get(id)
# Simulation workflow
security.declareProtected(Permissions.ModifyPortalContent, 'expand')
def expand(self, applied_rule, **kw):
"""
Expands the current movement downward.
An applied rule can be expanded only if its parent movement
is expanded.
"""
for o in applied_rule.objectValues():
o.expand(**kw)
security.declareProtected(Permissions.ModifyPortalContent, 'solve')
def solve(self, applied_rule, solution_list):
"""
Solve inconsistency according to a certain number of solutions
templates. This updates the
-> new status -> solved
This applies a solution to an applied rule. Once
the solution is applied, the parent movement is checked.
If it does not diverge, the rule is reexpanded. If not,
diverge is called on the parent movement.
"""
def test(self, movement):
"""
Tests if the rule (still) applies to a movement
"""
return 0
d_quantity = delivery.getQuantity()
quantity = movement.getCorrectedQuantity()
d_error = movement.getDeliveryError()
if quantity is None:
if d_quantity is None:
return 0
return 1
if d_quantity is None:
d_quantity = 0
if d_error is None:
d_error = 0
delivery_ratio = movement.getDeliveryRatio()
# if the delivery_ratio is None, make sure that we are
# divergent even if the delivery quantity is 0
if delivery_ratio is not None:
d_quantity *= delivery_ratio
if delivery_ratio == 0 and quantity > 0:
return 1
if d_quantity != quantity + d_error:
return 1
return 0
security.declareProtected(Permissions.ModifyPortalContent, 'diverge')
def diverge(self, applied_rule):
"""
-> new status -> diverged
This basically sets the rule to "diverged"
and blocks expansion process
"""
# Solvers
security.declareProtected(Permissions.View, 'isDivergent')
def isDivergent(self, applied_rule):
"""
Returns 1 if divergent rule
"""
security.declareProtected(Permissions.View, 'getDivergenceList')
def getDivergenceList(self, applied_rule):
"""
Returns a list Divergence descriptors
"""
security.declareProtected(Permissions.View, 'getSolverList')
def getSolverList(self, applied_rule):
"""
Returns a list Divergence solvers
"""
# Deliverability / orderability
def isOrderable(self, m):
return 0
# security.declareProtected(Permissions.View, 'getDivergenceList')
# def getDivergenceList(self, applied_rule):
# """
# Returns a list Divergence descriptors
# """
#
# security.declareProtected(Permissions.View, 'getSolverList')
# def getSolverList(self, applied_rule):
# """
# Returns a list Divergence solvers
# """
# Deliverability / orderability
def isOrderable(self, movement):
return 0
def isDeliverable(self, movement):
return 0
def isDeliverable(self, m):
def isStable(self, applied_rule, **kw):
"""
- generate a list of previsions
- compare the prevision with existing children
- return 1 if they match, 0 else
"""
list = self._getCompensatedMovementList(applied_rule, **kw)
for e in list:
if len(e) > 0:
return 0
return 1
#### Helpers
def _isTreeDelivered(self, movement_list, ignore_first=0):
"""
returns 1 if the movement or any of its child is linked to a delivery
"""
child_movement_list = []
for movement in movement_list:
if not ignore_first and len(movement.getDeliveryList()) > 0:
return 1
else:
for applied_rule in movement.objectValues():
child_movement_list = applied_rule.objectValues()
if len(child_movement_list) == 0:
return 0
return self._isTreeDelivered(child_movement_list)
def _getCurrentMovementList(self, applied_rule, **kw):
"""
Returns the list of current children of the applied rule, sorted in 3
groups : immutables/mutables/deletable
If a movement is not frozen, and has no delivered child, it can be
deleted.
Else, if a movement is not frozen, and has some delivered child, it can
be modified.
Else, it cannot be modified.
- is delivered
- has delivered childs (including self)
- is in reserved or current state
- is frozen
a movement is deletable if it has no delivered child, is not in current
state, and not in delivery movements.
a movement
"""
immutable_movement_list = []
mutable_movement_list = []
deletable_movement_list = []
for movement in applied_rule.contentValues(portal_type=self.movement_type):
if movement.isFrozen():
immutable_movement_list.append(movement)
else:
if self._isTreeDelivered([movement]):
mutable_movement_list.append(movement)
else:
deletable_movement_list.append(movement)
return (immutable_movement_list, mutable_movement_list,
deletable_movement_list)
def _getCompensatedMovementList(self, applied_rule,
matching_property_list=['resource'], **kw):
"""
Compute the difference between prevision and existing movements
immutable movements need compensation, mutables needs to be modified
XXX For now, this implementation is too simple. It could be improved by
using MovementGroups
"""
add_list = [] # list of movements to be added
modify_dict = {} # dict of movements to be modified
delete_list = [] # list of movements to be deleted
prevision_list = self._generatePrevisionList(applied_rule, **kw)
immutable_movement_list, mutable_movement_list, \
deletable_movement_list = self._getCurrentMovementList(applied_rule,
**kw)
movement_list = immutable_movement_list + mutable_movement_list \
+ deletable_movement_list
non_matched_list = movement_list[:] # list of remaining movements
for prevision in prevision_list:
p_matched_list = []
for movement in non_matched_list:
for prop in matching_property_list:
if prevision.get(prop) != movement.getProperty(prop):
break
else:
p_matched_list.append(movement)
# XXX hardcoded ...
LOG("Rule, _getCompensatedMovementList", WARNING,
"Hardcoded properties check")
# Movements exist, we'll try to make them match the prevision
if p_matched_list != []:
# Check the quantity
m_quantity = 0.0
for movement in p_matched_list:
m_quantity += movement.getQuantity()#getCorrectedQuantity()
if m_quantity != prevision.get('quantity'):
q_diff = prevision.get('quantity') - m_quantity
# try to find a movement that can be edited
for movement in p_matched_list:
if movement in (mutable_movement_list \
+ deletable_movement_list):
# mark as requiring modification
prop_dict = modify_dict.setdefault(movement.getId(), {})
#prop_dict['quantity'] = movement.getCorrectedQuantity() + \
prop_dict['quantity'] = movement.getQuantity() + \
q_diff
break
# no modifiable movement was found, need to create one
else:
prevision['quantity'] = q_diff
add_list.append(prevision)
# Check the date
for movement in p_matched_list:
if movement in (mutable_movement_list \
+ deletable_movement_list):
for prop in ('start_date', 'stop_date'):
#XXX should be >= 15
if prevision.get(prop) != movement.getProperty(prop):
prop_dict = modify_dict.setdefault(movement.getId(), {})
prop_dict[prop] = prevision.get(prop)
break
# update movement lists
for movement in p_matched_list:
non_matched_list.remove(movement)
# No movement matched, we need to create one
else:
add_list.append(prevision)
# delete non matched movements
for movement in non_matched_list:
if movement in deletable_movement_list:
# delete movement
delete_list.append(movement.getId())
elif movement in mutable_movement_list:
# set movement quantity to 0 to make it "void"
prop_dict = modify_dict.setdefault(movement.getId(), {})
prop_dict['quantity'] = 0.0
else:
# movement not modifiable, we can decide to create a compensation
# with negative quantity
raise OperationalError, "Not Implemented"
return (add_list, modify_dict, delete_list)
......@@ -29,10 +29,8 @@
from Globals import InitializeClass
from AccessControl import ClassSecurityInfo
from Products.CMFCore.utils import getToolByName
from Products.CMFCore.WorkflowCore import WorkflowMethod
from Products.ERP5Type import Permissions, PropertySheet, Constraint, Interface
from Products.ERP5.Core import MetaNode, MetaResource
from Products.ERP5.Document.Movement import Movement
......@@ -114,11 +112,11 @@ class SimulationMovement(Movement):
, PropertySheet.AppliedRule
, PropertySheet.ItemAggregation
)
def tpValues(self) :
""" show the content in the left pane of the ZMI """
return self.objectValues()
# Price should be acquired
security.declareProtected( Permissions.AccessContentsInformation,
'getPrice')
......@@ -221,9 +219,9 @@ class SimulationMovement(Movement):
# reexpanded each time, because the rule apply only if predicates
# are true, then this kind of rule must always be tested. Currently,
# we know that invoicing rule acts like this, and that it comes after
# invoice or invoicing_rule, so we if we come from invoince rule or
# invoice or invoicing_rule, so we if we come from invoince rule or
# invoicing rule, we always expand regardless of the causality state.
if ((self.getParentValue().getSpecialiseId() not in
if ((self.getParentValue().getSpecialiseId() not in
('default_invoicing_rule', 'default_invoice_rule')
and self.getCausalityState() == 'expanded' ) or \
len(self.objectIds()) != 0):
......@@ -250,8 +248,6 @@ class SimulationMovement(Movement):
"""
self.setCausalityState('diverged')
#diverge = WorkflowMethod(diverge) USELESS NOW
security.declareProtected( Permissions.AccessContentsInformation,
'getExplanation')
def getExplanation(self):
......@@ -271,7 +267,7 @@ class SimulationMovement(Movement):
explanation_value = self.getExplanationValue()
if explanation_value is not None :
return explanation_value.getUid()
security.declareProtected( Permissions.AccessContentsInformation,
'getExplanationValue')
def getExplanationValue(self):
......@@ -295,15 +291,6 @@ class SimulationMovement(Movement):
if explanation_value != self.getPortalObject():
return explanation_value
def isFrozen(self):
"""
A frozen simulation movement can not change its target anylonger
Also, once a movement is frozen, we do not calculate anylonger
its direct consequences. (ex. we do not calculate again a transformation)
"""
return 0
# Deliverability / orderability
security.declareProtected( Permissions.AccessContentsInformation,
'isOrderable')
......@@ -327,14 +314,14 @@ class SimulationMovement(Movement):
getDeliverable = isDeliverable
# Simulation Dates - acquire target dates
# Simulation Dates - acquire target dates
security.declareProtected( Permissions.AccessContentsInformation,
'getOrderStartDate')
def getOrderStartDate(self):
order_value = self.getOrderValue()
if order_value is not None:
return order_value.getStartDate()
security.declareProtected( Permissions.AccessContentsInformation,
'getOrderStopDate')
def getOrderStopDate(self):
......@@ -353,7 +340,7 @@ class SimulationMovement(Movement):
if delivery_movement is not None:
start_date_list.append(delivery_movement.getStartDate())
return start_date_list
security.declareProtected( Permissions.AccessContentsInformation,
'getDeliveryStopDateList')
def getDeliveryStopDateList(self):
......@@ -365,7 +352,7 @@ class SimulationMovement(Movement):
if delivery_movement is not None:
stop_date_list.append(delivery_movement.getStopDate())
return stop_date_list
security.declareProtected( Permissions.AccessContentsInformation,
'getDeliveryQuantity')
def getDeliveryQuantity(self):
......@@ -377,75 +364,41 @@ class SimulationMovement(Movement):
if delivery_movement is not None:
quantity = delivery_movement.getQuantity()
return quantity
security.declareProtected( Permissions.AccessContentsInformation,
'isConvergent')
def isConvergent(self):
"""
Returns true if the Simulation Movement is convergent comparing to
Returns true if the Simulation Movement is convergent with the
the delivery value
"""
return not self.isDivergent()
security.declareProtected( Permissions.AccessContentsInformation,
'isDivergent')
'isDivergent')
def isDivergent(self):
"""
Returns true if the Simulation Movement is divergent comparing to
Returns true if the Simulation Movement is divergent from the
the delivery value
"""
delivery = self.getDeliveryValue()
if delivery is None:
return 0
# XXX Those properties are the same than defined in DeliveryBuilder.
# We need to defined it only 1 time.
#LOG('SimulationMovement.isDivergent',0,delivery.getPath())
#LOG('SimulationMovement.isDivergent self.getStartDate()',0,self.getStartDate())
#LOG('SimulationMovement.isDivergent delivery.getStartDate()',0,delivery.getStartDate())
if self.getSourceSection() != delivery.getSourceSection() or \
self.getDestinationSection() != delivery.getDestinationSection() or \
self.getSource() != delivery.getSource() or \
self.getDestination() != delivery.getDestination() or \
self.getResource() != delivery.getResource() or \
self.getVariationCategoryList() != delivery.getVariationCategoryList()\
or \
self.getAggregateList() != delivery.getAggregateList() or \
self.getStartDate() != delivery.getStartDate() or \
self.getStopDate() != delivery.getStopDate():
# for method in ["getSourceSection",
# "getDestinationSection",
# "getSource",
# "getDestination",
# "getResource",
# "getVariationCategoryList",
# "getStartDate",
# "getStopDate"]:
# LOG("SimulationMovement, isDivergent", 0,
# "method: %s, self: %s , delivery: %s" % \
# tuple([method]+[str(getattr(x,method)()) for x in (self, delivery)]))
return 1
d_quantity = delivery.getQuantity()
quantity = self.getCorrectedQuantity()
d_error = self.getDeliveryError()
if quantity is None:
if d_quantity is None:
return 0
return 1
if d_quantity is None:
d_quantity = 0
if d_error is None:
d_error = 0
delivery_ratio = self.getDeliveryRatio()
# if the delivery_ratio is None, make sure that we are
# divergent even if the delivery quantity is 0
if delivery_ratio is not None:
d_quantity *= delivery_ratio
if delivery_ratio == 0 and quantity >0:
return 1
if d_quantity != quantity + d_error:
return 1
return 0
return self.getParentValue().isDivergent(self)
security.declareProtected( Permissions.AccessContentsInformation,
'getDivergenceList')
def getDivergenceList(self):
"""
Returns detailed information about the divergence
"""
return self.getParentValue().getDivergenceList(self)
security.declareProtected( Permissions.AccessContentsInformation,
'getSolverList')
def getSolverList(self):
"""
Returns solvers that can fix the current divergence
"""
return self.getParentValue().getSolverList(self)
security.declareProtected( Permissions.ModifyPortalContent,
'setDefaultDeliveryProperties')
def setDefaultDeliveryProperties(self):
......@@ -505,10 +458,10 @@ class SimulationMovement(Movement):
"""
root_rule = self.getRootAppliedRule()
return root_rule.getCausalityValueList()
# XXX FIXME Use a interaction workflow instead
# XXX This behavior is now done by simulation_movement_interaction_workflow
# The call to activate() must be done after actual call to
# The call to activate() must be done after actual call to
# setDelivery() on the movement,
# but activate() must be called on the previous delivery...
#def _setDelivery(self, value):
......@@ -519,10 +472,10 @@ class SimulationMovement(Movement):
# if delivery_value is not None:
# LOG('delivery_value = ', 0, repr(delivery_value))
# activity = delivery_value.activate(
# activity='SQLQueue',
# activity='SQLQueue',
# after_path_and_method_id=(
# self.getPath(),
# ['immediateReindexObject',
# self.getPath(),
# ['immediateReindexObject',
# 'recursiveImmediateReindexObject']))
# activity.edit()
......@@ -62,8 +62,7 @@ class TransformationRule(Rule):
# Class variable
simulation_movement_portal_type = "Simulation Movement"
security.declareProtected(Permissions.AccessContentsInformation, 'test')
def test(self, movement):
def _test(self, movement):
"""
Tests if the rule (still) applies
"""
......
......@@ -145,8 +145,7 @@ class TransformationSourcingRule(Rule):
# Class variable
simulation_movement_portal_type = "Simulation Movement"
security.declareProtected(Permissions.AccessContentsInformation, 'test')
def test(self, movement):
def _test(self, movement):
"""
Tests if the rule (still) applies
"""
......
......@@ -39,6 +39,15 @@ class AppliedRule:
'description' : 'Contains the id of the simulation state when the '\
'object was last expanded (in order to avoid '\
'recalculation)',
'acquisition_base_category' : ( 'parent',),
'acquisition_portal_type' : ('Applied Rule', ),
'acquisition_copy_value' : 0,
'acquisition_mask_value' : 1,
'acquisition_accessor_id' : 'getLastExpandSimulationState',
'acquisition_depends' : None,
'alt_accessor_id' : ('getLastExpandSimulationState', ),
'type' : 'string',
'mode' : 'w' },
)
......
......@@ -71,3 +71,4 @@ class Task:
'mode' : 'w' },
)
_categories = ( 'source_project', 'destination_project' )
......@@ -31,9 +31,16 @@ from TargetSolver import TargetSolver
class CopyToTarget(TargetSolver):
"""
Copy values simulation movement as target. This is
only acceptable for root movements. The meaning of
this solver of other movements is far from certain.
This solver calculates the ratio between the new (delivery) and old
(simulation) quantity and applies this ratio to the simulation movement
and to its parent, until a stable one is found
XXX: This solver's name is not good, and it tries too many things.
Once the new isDivergent engine is implemented, this solver can be
splitted in smaller ones (one for profit and loss, one for backtracking)
Backtracking alone is not enough to solve completely, it must be used with
another solver (profit and loss, or creating a compensation branch ...)
"""
def _generateValueDeltaDict(self, simulation_movement):
"""
......@@ -94,10 +101,21 @@ class CopyToTarget(TargetSolver):
"""
Get parent movement, and its value delta dict.
"""
#XXX max_allowed_delta is the maximum number of days we want not to
# account as a divergence. It should be configurable through a Rule
max_allowed_delta = 15
applied_rule = simulation_movement.getParentValue()
parent_movement = applied_rule.getParentValue()
if parent_movement.getPortalType() != "Simulation Movement":
parent_movement = None
for date_delta in ('start_date_delta', 'stop_date_delta'):
if date_delta in value_delta_dict.keys():
if abs(value_delta_dict[date_delta]) <= \
applied_rule.getProperty('max_allowed_delta', max_allowed_delta):
value_delta_dict.pop(date_delta)
return parent_movement, value_delta_dict
def _recursivelySolve(self, simulation_movement, is_last_movement=1, **value_delta_dict):
......@@ -106,12 +124,34 @@ class CopyToTarget(TargetSolver):
his parent movement.
"""
value_dict = self._generateValueDict(simulation_movement, **value_delta_dict)
simulation_movement.edit(**value_dict)
if is_last_movement:
delivery_quantity = simulation_movement.getDeliveryValue().getQuantity()
simulation_movement.setDeliveryError(delivery_quantity - value_dict['quantity'])
parent_movement, parent_value_delta_dict = \
self._getParentParameters(simulation_movement, **value_delta_dict)
if parent_movement is not None:
# Modify the parent movement
self._recursivelySolve(parent_movement, is_last_movement=0, **parent_value_delta_dict)
if parent_movement is not None and parent_movement.isFrozen():
# If backtraxcking is not possible, we have to make sure that the
# divergence is solved locally by using profit and loss
sm_quantity = simulation_movement.getQuantity()
delivery_quantity = \
simulation_movement.getDeliveryValue().getQuantity()
# simulation_movement.edit(
# profit_quantity=sm_quantity - delivery_quantity)
else:
# fix foating point rounding error
if is_last_movement:
delivery_quantity = \
simulation_movement.getDeliveryValue().getQuantity()
simulation_movement.setDeliveryError(delivery_quantity -
value_dict['quantity'])
delivery = simulation_movement.getDeliveryValue()
simulation_movement.setDestination(delivery.getDestination())
simulation_movement.setSource(delivery.getSource())
simulation_movement.setDestinationSection(delivery.getDestinationSection())
simulation_movement.setSourceSection(delivery.getSourceSection())
simulation_movement.edit(**value_dict)
if parent_movement is not None:
# backtrack to the parent movement only if it is not frozen
self._recursivelySolve(parent_movement, is_last_movement=0,
**parent_value_delta_dict)
......@@ -76,6 +76,10 @@ class SplitAndDefer(CopyToTarget):
**self.additional_parameters
)
new_movement.activate(**self.additional_parameters).expand()
# adopt new quantity on original simulation movement
simulation_movement.edit(quantity=new_movement_quantity)
simulation_movement._v_activate_kw = self.activate_kw
simulation_movement.activate(**self.additional_parameters).expand()
CopyToTarget.solve(self, simulation_movement)
# SplitAndDefer solves the divergence at the current level, no need to
# backtrack.
......@@ -1266,6 +1266,7 @@ class TestAccountingRules(TestAccountingRulesMixin, ERP5TypeTestCase):
invoice_transaction_line.getSourceCredit())
self.assertEquals(simulation_movement.getSourceDebit(),
invoice_transaction_line.getSourceDebit())
self.assertEquals(simulation_movement.getDelivery(),
invoice_transaction_line.getRelativeUrl())
self.assert_(len(simulation_movement_found.keys()), 3)
......
......@@ -31,8 +31,8 @@
TODO:
* check empty related Delivery Rule
* check divergence
* check divergence
"""
from random import randint
......@@ -65,7 +65,7 @@ class TestInvoice(TestPackingListMixin,
TestAccountingRulesMixin,
ERP5TypeTestCase):
"""Test invoice are created from orders then packing lists. """
RUN_ALL_TESTS = 1
default_region = "europe/west/france"
......@@ -75,16 +75,20 @@ class TestInvoice(TestPackingListMixin,
customer_gap = 'fr/pcg/4/41/411'
# (account_id, account_gap)
#XXX gap for the last 3 should be set to real values
account_definition_list = (
('receivable_vat', vat_gap),
('sale', sale_gap),
('customer', customer_gap),
('refundable_vat', vat_gap),
('purchase', sale_gap),
('supplier', customer_gap),
)
# (line_id, source_account_id, line_quantity)
# (line_id, source_account_id, destination_account_id, line_quantity)
transaction_line_definition_list = (
('income', 'sale', 1.0),
('receivable', 'customer', -1.0 - vat_rate),
('collected_vat', 'receivable_vat', vat_rate),
('income', 'sale', 'purchase', 1.0),
('receivable', 'customer', 'supplier', -1.0 - vat_rate),
('collected_vat', 'receivable_vat', 'refundable_vat', vat_rate),
)
invoice_portal_type = 'Sale Invoice Transaction'
......@@ -94,14 +98,14 @@ class TestInvoice(TestPackingListMixin,
def getTitle(self):
return "Invoices"
def login(self, quiet=0, run=1):
uf = self.getPortal().acl_users
uf._doAddUser('alex', '', ['Manager', 'Assignee', 'Assignor',
'Associate', 'Auditor', 'Author'], [])
user = uf.getUserById('alex').__of__(uf)
newSecurityManager(None, user)
def createCategories(self):
"""Create the categories for our test. """
TestPackingListMixin.createCategories(self)
......@@ -122,7 +126,7 @@ class TestInvoice(TestPackingListMixin,
self.assertNotEquals(None,
self.getCategoryTool().restrictedTraverse(cat_string),
cat_string)
def getNeededCategoryList(self):
"""return a list of categories that should be created."""
return ('region/%s' % self.default_region,
......@@ -130,7 +134,7 @@ class TestInvoice(TestPackingListMixin,
'gap/%s' % self.sale_gap,
'gap/%s' % self.customer_gap,
)
def getBusinessTemplateList(self):
""" """
return TestPackingListMixin.getBusinessTemplateList(self) + (
......@@ -152,7 +156,7 @@ class TestInvoice(TestPackingListMixin,
client2 = sequence.get('organisation3')
self.assertEquals(client2.getRegionValue(), None)
sequence.edit(client2=client2)
def stepCreateCurrency(self, sequence, **kw) :
"""Create a default currency. """
currency_module = self.getCurrencyModule()
......@@ -162,18 +166,18 @@ class TestInvoice(TestPackingListMixin,
id = "EUR" )
currency = currency_module.objectValues(id='EUR')[0]
sequence.edit(currency = currency)
def stepSetOrderPriceCurrency(self, sequence, **kw) :
"""Set the price currency of the order.
This step is not necessary.
"""Set the price currency of the order.
This step is not necessary.
TODO : - include a test without this step.
- include a test with this step late.
"""
currency = sequence.get('currency')
order = sequence.get('order')
order.setPriceCurrency(currency.getRelativeUrl())
def stepCreateSaleInvoiceTransactionRule(self, sequence, **kw) :
"""Create the rule for accounting. """
......@@ -213,12 +217,13 @@ class TestInvoice(TestPackingListMixin,
self.assertEquals(len(cell_list),1)
cell = cell_list[0]
for line_id, line_source_id, line_ratio in \
for line_id, line_source_id, line_destination_id, line_ratio in \
self.transaction_line_definition_list:
line = cell.newContent(id=line_id,
portal_type=self.sale_invoice_transaction_portal_type)
line.setQuantity(line_ratio)
line.setSourceValue(account_module[line_source_id])
line.setDestinationValue(account_module[line_destination_id])
def modifyPackingListState(self, transition_name,
sequence,packing_list=None):
......@@ -227,7 +232,7 @@ class TestInvoice(TestPackingListMixin,
packing_list = sequence.get('packing_list')
packing_list.portal_workflow.doActionFor(packing_list,
transition_name, wf_id='packing_list_workflow')
def stepSetReadyPackingList(self, sequence=None, sequence_list=None, **kw):
""" set the Packing List as Ready. This must build the invoice. """
self.modifyPackingListState('set_ready_action', sequence=sequence)
......@@ -246,30 +251,103 @@ class TestInvoice(TestPackingListMixin,
self.modifyPackingListState('start_action', sequence=sequence)
packing_list = sequence.get('packing_list')
self.assertEquals(packing_list.getSimulationState(), 'started')
def stepStartNewPackingList(self, sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('new_packing_list')
self.modifyPackingListState('start_action', sequence=sequence,
packing_list=packing_list)
self.assertEquals(packing_list.getSimulationState(), 'started')
def stepStopPackingList(self, sequence=None, sequence_list=None, **kw):
self.modifyPackingListState('stop_action', sequence=sequence)
packing_list = sequence.get('packing_list')
self.assertEquals(packing_list.getSimulationState(), 'stopped')
def stepDeliverPackingList(self, sequence=None, sequence_list=None, **kw):
self.modifyPackingListState('deliver_action', sequence=sequence)
packing_list = sequence.get('packing_list')
self.assertEquals(packing_list.getSimulationState(), 'delivered')
def stepCancelPackingList(self, sequence=None, sequence_list=None, **kw):
self.modifyPackingListState('cancel_action', sequence=sequence)
packing_list = sequence.get('packing_list')
self.assertEquals(packing_list.getSimulationState(), 'cancelled')
def stepPackingListSimulation(self, sequence=None, sequence_list=None, **kw):
def modifyInvoiceState(self, transition_name,
sequence,invoice=None):
""" calls the workflow for the invoice """
if invoice is None:
invoice = sequence.get('invoice')
invoice.portal_workflow.doActionFor(invoice,
transition_name, wf_id='accounting_workflow')
def stepConfirmInvoice(self, sequence=None, sequence_list=None, **kw):
""" set the Packing List as Ready. This must build the invoice. """
self.modifyInvoiceState('confirm_action', sequence=sequence)
invoice = sequence.get('invoice')
self.assertEquals(invoice.getSimulationState(), 'confirmed')
def stepSetReadyInvoice(self, sequence=None, sequence_list=None, **kw):
""" set the Packing List as Ready. This must build the invoice. """
self.modifyInvoiceState('set_ready_action', sequence=sequence)
invoice = sequence.get('invoice')
self.assertEquals(invoice.getSimulationState(), 'ready')
def stepSetReadyNewInvoice(self, sequence=None,
sequence_list=None, **kw):
""" set the Packing List as Ready. This must build the invoice. """
invoice = sequence.get('new_invoice')
self.modifyInvoiceState('set_ready_action', sequence=sequence,
invoice=invoice)
self.assertEquals(invoice.getSimulationState(), 'ready')
def stepStartInvoice(self, sequence=None, sequence_list=None, **kw):
self.modifyInvoiceState('start_action', sequence=sequence)
invoice = sequence.get('invoice')
self.assertEquals(invoice.getSimulationState(), 'started')
def stepStartNewInvoice(self, sequence=None, sequence_list=None, **kw):
invoice = sequence.get('new_invoice')
self.modifyInvoiceState('start_action', sequence=sequence,
invoice=invoice)
self.assertEquals(invoice.getSimulationState(), 'started')
def stepStopInvoice(self, sequence=None, sequence_list=None, **kw):
self.modifyInvoiceState('stop_action', sequence=sequence)
invoice = sequence.get('invoice')
self.assertEquals(invoice.getSimulationState(), 'stopped')
def stepDeliverInvoice(self, sequence=None, sequence_list=None, **kw):
self.modifyInvoiceState('deliver_action', sequence=sequence)
invoice = sequence.get('invoice')
self.assertEquals(invoice.getSimulationState(), 'delivered')
def stepCancelInvoice(self, sequence=None, sequence_list=None, **kw):
self.modifyInvoiceState('cancel_action', sequence=sequence)
invoice = sequence.get('invoice')
self.assertEquals(invoice.getSimulationState(), 'cancelled')
def stepSwitchPackingLists(self, sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('packing_list')
new_packing_list = sequence.get('new_packing_list')
#invoice = new_packing_list.getDefaultCausalityRelatedValue(
#portal_type=self.invoice_portal_type)
sequence.edit(packing_list=new_packing_list,
new_packing_list=packing_list)#, invoice=invoice)
def stepSwitchInvoices(self, sequence=None, sequence_list=None, **kw):
invoice = sequence.get('invoice')
new_invoice = sequence.get('new_invoice')
sequence.edit(invoice=new_invoice, new_invoice=invoice)
def stepCheckPackingListSimulation(self, sequence=None, sequence_list=None, **kw):
""" checks that simulation movements related to the packing list are OK """
packing_list = sequence.get('packing_list')
order = sequence.get('order')
order_root_applied_rule = order.getCausalityRelatedValueList(
portal_type = 'Applied Rule')[0]
# check simulation movements from this packing list
for movement in packing_list.getMovementList() :
simulation_movement_list = movement.getOrderRelatedValueList()
......@@ -282,12 +360,12 @@ class TestInvoice(TestPackingListMixin,
self.assertEquals( simulation_movement.getRootAppliedRule(),
order_root_applied_rule)
self.assertEquals(total_quantity, movement.getQuantity())
def stepCheckInvoiceBuilding(self, sequence=None, sequence_list=None, **kw):
""" checks that the invoice is built with the default_invoice_builder """
"""
checks that the invoice is built with the default_invoice_builder
"""
packing_list = sequence.get('packing_list')
related_applied_rule_list = packing_list.getCausalityRelatedValueList(
portal_type=self.applied_rule_portal_type)
related_invoice_list = packing_list.getCausalityRelatedValueList(
portal_type=self.sale_invoice_transaction_portal_type)
......@@ -302,7 +380,7 @@ class TestInvoice(TestPackingListMixin,
self.failUnless(invoice is not None)
# Invoices created by Delivery Builder are in planned state
self.assertEquals(invoice.getSimulationState(), 'planned')
# Get the list of simulation movements of packing list ...
packing_list_simulation_movement_list = []
for packing_list_movement in packing_list.getMovementList():
......@@ -313,7 +391,7 @@ class TestInvoice(TestPackingListMixin,
for p_l_simulation_movement in packing_list_simulation_movement_list :
for applied_rule in p_l_simulation_movement.objectValues() :
simulation_movement_list.extend(applied_rule.objectValues())
# First, test if each Simulation Movement is related to an
# Invoice Movement
invoice_relative_url = invoice.getRelativeUrl()
......@@ -360,12 +438,12 @@ class TestInvoice(TestPackingListMixin,
self.assertEquals(total_price / quantity, invoice_movement.getPrice())
sequence.edit(invoice = invoice)
# Test causality
self.assertEquals(len(invoice.getCausalityValueList(
portal_type = self.packing_list_portal_type)), 1)
self.assertEquals(invoice.getCausalityValue(), packing_list)
# Finally, test getTotalQuantity and getTotalPrice on Invoice
self.assertEquals(packing_list.getTotalQuantity(),
invoice.getTotalQuantity())
......@@ -385,16 +463,16 @@ class TestInvoice(TestPackingListMixin,
destination_section = sequence.get('destination_section')
destination = sequence.get('destination')
product = sequence.get('product')
order_module = self.getSaleOrderModule()
order = order_module.newContent(portal_type='Sale Order')
order = order_module.newContent(portal_type=self.order_portal_type)
order.setStartDate(DateTime('2004-11-20'))
order.setStopDate(DateTime('2004-11-24'))
order.setDestinationValue(destination)
order.setDestinationSectionValue(destination_section)
order.setSourceValue(source)
order.setSourceSectionValue(source_section)
order_line = order.newContent(portal_type = 'Sale Order Line', id = '1')
order_line = order.newContent(portal_type=order_line_portal_type, id='1')
order_line.setResourceValue(product)
order_line.setQuantity(10)
order_line.setPrice(100)
......@@ -417,10 +495,11 @@ class TestInvoice(TestPackingListMixin,
self.assertEquals(len(order.getMovementList()),
sum([len(rule.objectIds()) for rule in rule_list]))
def stepCheckInvoicingRule(self, sequence=None, sequence_list=None, **kw):
""" Checks that the invoicing rule is applied and its values are
correct. """
"""
Checks that the invoicing rule is applied and its values are correct.
"""
order_rule_list = sequence.get('order_rule_list')
invoicing_rule_list = []
invoice_transaction_rule_list = []
......@@ -449,10 +528,10 @@ class TestInvoice(TestPackingListMixin,
self.assertTrue(simulation_movement.isConvergent())
# TODO: What is the invoice dates supposed to be ?
# is this done through profiles ?
self.assertEquals(simulation_movement.getStartDate(),
sequence.get('order').getStartDate())
self.assertEquals(simulation_movement.getStopDate(),
sequence.get('order').getStopDate())
#self.assertEquals(simulation_movement.getStartDate(),
# sequence.get('order').getStartDate())
#self.assertEquals(simulation_movement.getStopDate(),
# sequence.get('order').getStopDate())
sequence.edit(invoice_transaction_rule_list=invoice_transaction_rule_list)
def stepCheckInvoiceTransactionRule(self, sequence=None, sequence_list=None,
......@@ -466,18 +545,51 @@ class TestInvoice(TestPackingListMixin,
for invoice_transaction_rule in invoice_transaction_rule_list:
parent_movement = aq_parent(invoice_transaction_rule)
self.assertEquals(3, len(invoice_transaction_rule.objectValues()))
for line_id, line_source_id, line_ratio in \
for line_id, line_source_id, line_destination_id, line_ratio in \
self.transaction_line_definition_list:
movement = getattr(invoice_transaction_rule, line_id, None)
self.assertTrue(movement is not None)
self.assertEquals(movement.getQuantity(), parent_movement.getPrice() *
parent_movement.getQuantity() * line_ratio)
self.assertEquals(movement.getCorrectedQuantity(), parent_movement.getPrice() *
parent_movement.getCorrectedQuantity() * line_ratio)
self.assertEquals(movement.getSourceId(), line_source_id)
self.assertEquals(movement.getDestinationId(), line_destination_id)
self.assertEquals(movement.getStartDate(),
parent_movement.getStartDate())
self.assertEquals(movement.getStopDate(),
parent_movement.getStopDate())
def stepCheckInvoicesConsistency(self, sequence=None, sequence_list=None,
**kw):
"""
Checks that all invoices are consistent:
- transaction lines match invoice lines
- no movement is divergent
"""
invoice_list = self.getPortal()['accounting_module'].objectValues()
for invoice in invoice_list:
accounting_state_list = \
list(self.getPortal().getPortalCurrentInventoryStateList())
accounting_state_list.append('cancelled')
if invoice.getSimulationState() in accounting_state_list:
invoice_line_list = invoice.contentValues(
portal_type=self.invoice_line_portal_type)
invoice_transaction_line_list = invoice.contentValues(
portal_type=self.invoice_transaction_line_portal_type)
self.assertEquals(3, len(invoice_transaction_line_list))
expected_price = 0.0
for line in invoice_line_list:
expected_price += line.getTotalPrice()
for line_id, line_source, line_dest, line_ratio in \
self.transaction_line_definition_list:
for line in invoice.contentValues(
portal_type=self.invoice_transaction_line_portal_type):
if line.getSource() == 'account_module/%s' % line_source and \
line.getDestination() == 'account_module/%s' % line_dest:
break
else:
self.fail('No line found that matches %s' % line_id)
self.assertEquals(line.getQuantity(), expected_price * line_ratio)
def stepCheckDeliveryRuleForDeferred(
self, sequence=None, sequence_list=None, **kw):
""" Checks that a delivery rule has been created when we took 'split
......@@ -640,7 +752,7 @@ class TestInvoice(TestPackingListMixin,
new_list = [x for x in previous_list if x != 'DateMovementGroup']
new_list.append('ParentExplanationMovementGroup')
builder.setDeliveryCollectOrderList(new_list)
def stepEditInvoice(self, sequence=None, sequence_list=None, **kw):
"""Edit the current invoice, to trigger updateAppliedRule."""
invoice = sequence.get('invoice')
......@@ -668,7 +780,7 @@ class TestInvoice(TestPackingListMixin,
"""Edit the current packing list, to trigger updateAppliedRule."""
packing_list = sequence.get('packing_list')
packing_list.edit()
# call updateAppliedRule directly, don't rely on edit interactions
rule_id = 'default_delivery_rule'
self.failUnless(rule_id in
......@@ -686,7 +798,7 @@ class TestInvoice(TestPackingListMixin,
for delivery_mvt in packing_list.getMovementList():
self.assertEquals(len(delivery_mvt.getOrderRelatedValueList(
portal_type=self.simulation_movement_portal_type)), 0)
def stepDecreaseInvoiceLineQuantity(self, sequence=None, sequence_list=None,
**kw):
"""
......@@ -694,11 +806,40 @@ class TestInvoice(TestPackingListMixin,
"""
invoice = sequence.get('invoice')
quantity = sequence.get('line_quantity',default=self.default_quantity)
quantity = quantity -1
quantity = quantity - 1
sequence.edit(line_quantity=quantity)
for invoice_line in invoice.objectValues(
portal_type=self.invoice_line_portal_type):
invoice_line.edit(quantity=quantity)
sequence.edit(last_delta = sequence.get('last_delta', 0.0) - 1.0)
def stepIncreaseInvoiceLineQuantity(self, sequence=None, sequence_list=None,
**kw):
"""
Set a Increased quantity on invoice lines
"""
invoice = sequence.get('invoice')
quantity = sequence.get('line_quantity',default=self.default_quantity)
quantity = quantity + 1
sequence.edit(line_quantity=quantity)
for invoice_line in invoice.objectValues(
portal_type=self.invoice_line_portal_type):
invoice_line.edit(quantity=quantity)
sequence.edit(last_delta = sequence.get('last_delta', 0.0) + 1.0)
def stepSetInvoiceLineQuantityToZero(self, sequence=None, sequence_list=None,
**kw):
"""
Set the quantity on invoice lines to zero
"""
invoice = sequence.get('invoice')
#default_quantity = sequence.get('line_quantity',default_quantity)
quantity = 0.0
sequence.edit(line_quantity=quantity)
for invoice_line in invoice.objectValues(
portal_type=self.invoice_line_portal_type):
invoice_line.edit(quantity=quantity)
sequence.edit(last_delta = - self.default_quantity)
def stepChangeInvoiceStartDate(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -747,10 +888,10 @@ class TestInvoice(TestPackingListMixin,
invoice = sequence.get('invoice')
self.assertFalse(invoice.isDivergent())
def stepSplitAndDifferInvoice(self, sequence=None, sequence_list=None,
def stepSplitAndDeferInvoice(self, sequence=None, sequence_list=None,
**kw):
"""
split and differ at the invoice level
split and defer at the invoice level
"""
invoice = sequence.get('invoice')
invoice.portal_workflow.doActionFor(invoice,'split_prevision_action',
......@@ -800,9 +941,11 @@ class TestInvoice(TestPackingListMixin,
for invoice in invoice_list:
if invoice.getUid() == sequence.get('invoice').getUid():
invoice1 = invoice
last_delta = sequence.get('last_delta', 0.0)
for line in invoice1.objectValues(
portal_type=self.invoice_line_portal_type):
self.assertEquals(self.default_quantity, line.getQuantity())
self.assertEquals(self.default_quantity + last_delta,
line.getQuantity())
# default sequence for one line of not varianted resource.
PACKING_LIST_DEFAULT_SEQUENCE = """
......@@ -831,7 +974,7 @@ class TestInvoice(TestPackingListMixin,
stepTic
stepCheckPackingListIsPacked
"""
# default sequence for two lines of not varianted resource.
PACKING_LIST_TWO_LINES_DEFAULT_SEQUENCE = """
stepCreateSaleInvoiceTransactionRule
......@@ -865,7 +1008,7 @@ class TestInvoice(TestPackingListMixin,
stepTic
stepCheckPackingListIsPacked
"""
# default sequence for one line of not varianted resource.
TWO_PACKING_LIST_DEFAULT_SEQUENCE = """
stepCreateSaleInvoiceTransactionRule
......@@ -887,11 +1030,11 @@ class TestInvoice(TestPackingListMixin,
stepCheckOrderRule
stepCheckOrderSimulation
stepCheckDeliveryBuilding
stepDecreasePackingListLineQuantity
stepCheckPackingListIsCalculating
stepSplitAndDeferPackingList
stepTic
stepCheckPackingListIsSolved
stepDecreasePackingListLineQuantity
stepCheckPackingListIsCalculating
stepSplitAndDeferPackingList
stepTic
stepCheckPackingListIsSolved
stepCheckPackingListSplitted
stepAddPackingListContainer
stepAddPackingListContainerLine
......@@ -904,10 +1047,14 @@ class TestInvoice(TestPackingListMixin,
"""
def test_01_SimpleInvoice(self, quiet=0, run=RUN_ALL_TESTS):
"""Checks that a Simple Invoice is created from a Packing List"""
"""
Checks that a Simple Invoice is created from a Packing List
"""
if not run: return
self.logMessage('Simple Invoice')
sequence_list = SequenceList()
for base_sequence in (self.PACKING_LIST_DEFAULT_SEQUENCE, ) :
self.playSequence(
sequence_list.addSequenceString(
base_sequence +
"""
stepSetReadyPackingList
......@@ -917,10 +1064,13 @@ class TestInvoice(TestPackingListMixin,
stepTic
stepCheckInvoiceBuilding
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
""")
sequence_list.play(self)
def test_02_TwoInvoicesFromTwoPackingList(self, quiet=0, run=RUN_ALL_TESTS):
""" This test was created for the following bug:
"""
This test was created for the following bug:
- an order is created and confirmed
- the packing list is split
- the 2 packing list are delivered (at different date)
......@@ -930,8 +1080,10 @@ class TestInvoice(TestPackingListMixin,
and an invoice with no accounting rules. both invoices are wrong
"""
if not run: return
self.logMessage('Two Invoices from Two Packing List')
sequence_list = SequenceList()
for base_sequence in (self.TWO_PACKING_LIST_DEFAULT_SEQUENCE, ) :
self.playSequence(
sequence_list.addSequenceString(
base_sequence +
"""
stepSetReadyPackingList
......@@ -945,10 +1097,13 @@ class TestInvoice(TestPackingListMixin,
stepConfirmTwoInvoices
stepTic
stepCheckTwoInvoicesTransactionLines
stepCheckInvoicesConsistency
""")
sequence_list.play(self)
def test_03_InvoiceEditAndInvoiceRule(self, quiet=0, run=RUN_ALL_TESTS):
"""Invoice Rule should not be applied on invoice lines created from\
"""
Invoice Rule should not be applied on invoice lines created from\
Packing List.
We want to prevent this from happening:
......@@ -959,8 +1114,10 @@ class TestInvoice(TestPackingListMixin,
movements for this invoice are present twice in the simulation.
"""
if not run: return
self.logMessage('Invoice Edit')
sequence_list = SequenceList()
for base_sequence in (self.PACKING_LIST_DEFAULT_SEQUENCE, ) :
self.playSequence(
sequence_list.addSequenceString(
base_sequence +
"""
stepSetReadyPackingList
......@@ -971,27 +1128,38 @@ class TestInvoice(TestPackingListMixin,
stepCheckInvoiceBuilding
stepEditInvoice
stepCheckInvoiceRuleNotAppliedOnInvoiceEdit
stepCheckInvoicesConsistency
""")
sequence_list.play(self)
def test_04_PackingListEditAndInvoiceRule(self, quiet=0, run=RUN_ALL_TESTS):
"""Delivery Rule should not be applied on packing list lines created\
"""
Delivery Rule should not be applied on packing list lines created\
from Order.
"""
if not run: return
self.logMessage('Packing List Edit')
sequence_list = SequenceList()
for base_sequence in (self.PACKING_LIST_DEFAULT_SEQUENCE, ) :
self.playSequence(
sequence_list.addSequenceString(
base_sequence +
"""
stepEditPackingList
stepCheckDeliveryRuleNotAppliedOnPackingListEdit
stepCheckInvoicesConsistency
""")
sequence_list.play(self)
def test_05_InvoiceEditPackingListLine(self, quiet=0, run=RUN_ALL_TESTS):
"""Checks that editing a Packing List Line still creates a correct
Invoice"""
"""
Checks that editing a Packing List Line still creates a correct
Invoice
"""
if not run: return
self.logMessage('Packing List Line Edit')
sequence_list = SequenceList()
for base_sequence in (self.PACKING_LIST_DEFAULT_SEQUENCE, ) :
self.playSequence(
sequence_list.addSequenceString(
base_sequence +
"""
stepEditPackingListLine
......@@ -1002,15 +1170,21 @@ class TestInvoice(TestPackingListMixin,
stepTic
stepCheckInvoiceBuilding
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
""")
sequence_list.play(self)
def test_06_InvoiceDeletePackingListLine(self, quiet=0,
run=RUN_ALL_TESTS):
"""Checks that deleting a Packing List Line still creates a correct
Invoice"""
"""
Checks that deleting a Packing List Line still creates a correct
Invoice
"""
if not run: return
self.logMessage('Packing List Line Delete')
sequence_list = SequenceList()
for base_sequence in (self.PACKING_LIST_TWO_LINES_DEFAULT_SEQUENCE, ) :
self.playSequence(
sequence_list.addSequenceString(
base_sequence +
"""
stepDeletePackingListLine
......@@ -1021,15 +1195,21 @@ class TestInvoice(TestPackingListMixin,
stepTic
stepCheckInvoiceBuilding
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
""")
sequence_list.play(self)
def test_07_InvoiceAddPackingListLine(self, quiet=0, run=RUN_ALL_TESTS):
"""Checks that adding a Packing List Line still creates a correct
Invoice"""
"""
Checks that adding a Packing List Line still creates a correct
Invoice
"""
if not run: return
self.logMessage('Packing List Line Add')
sequence_list = SequenceList()
for base_sequence in (self.PACKING_LIST_DEFAULT_SEQUENCE,
self.PACKING_LIST_TWO_LINES_DEFAULT_SEQUENCE) :
self.playSequence(
sequence_list.addSequenceString(
base_sequence +
"""
stepAddPackingListLine
......@@ -1042,13 +1222,18 @@ class TestInvoice(TestPackingListMixin,
stepTic
stepCheckInvoiceBuilding
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
""")
sequence_list.play(self)
def test_08_InvoiceDecreaseQuantity(self, quiet=0, run=RUN_ALL_TESTS):
"""Change the quantity of a Invoice Line,
check that the packing list is divergent,
then split and differ"""
"""
Change the quantity of a Invoice Line,
check that the invoice is divergent,
then split and defer, and check everything is solved
"""
if not run: return
self.logMessage('Invoice Decrease Qantity')
sequence = self.PACKING_LIST_DEFAULT_SEQUENCE + \
"""
stepSetReadyPackingList
......@@ -1060,24 +1245,32 @@ class TestInvoice(TestPackingListMixin,
stepCheckInvoiceBuilding
stepDecreaseInvoiceLineQuantity
stepCheckInvoiceIsDivergent
stepCheckInvoiceIsCalculating
stepSplitAndDifferInvoice
stepSplitAndDeferInvoice
stepTic
stepCheckInvoiceIsNotDivergent
stepCheckInvoiceIsSolved
stepCheckInvoiceSplitted
stepCheckPackingListIsDivergent
stepCheckPackingListIsDiverged
stepCheckPackingListIsNotDivergent
stepCheckPackingListIsSolved
stepCheckInvoiceTransactionRule
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
"""
self.playSequence(sequence)
def test_09_InvoiceChangeStartDate(self, quiet=0, run=RUN_ALL_TESTS):
"""Change the start_date of a Invoice Line,
check that the packing list is divergent,
then acceptDecision"""
"""
Change the start_date of a Invoice Line,
check that the invoice is divergent,
then accept decision, and check everything is solved
"""
if not run: return
self.logMessage('Invoice Change Sart Date')
sequence = self.PACKING_LIST_DEFAULT_SEQUENCE + \
"""
stepSetReadyPackingList
......@@ -1089,20 +1282,422 @@ class TestInvoice(TestPackingListMixin,
stepCheckInvoiceBuilding
stepChangeInvoiceStartDate
stepCheckInvoiceIsDivergent
stepCheckInvoiceIsCalculating
stepAcceptDecisionInvoice
stepTic
stepCheckInvoiceNotSplitted
stepCheckInvoiceIsNotDivergent
stepCheckInvoiceIsSolved
stepCheckPackingListIsNotDivergent
stepCheckPackingListIsSolved
stepCheckInvoiceTransactionRule
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
"""
self.playSequence(sequence)
def test_10_AcceptDecisionOnPackingList(self, quiet=0, run=RUN_ALL_TESTS):
"""
- Increase or Decrease the quantity of a Packing List line
- Accept Decision on Packing List
- Packing List must not be divergent and use new quantity
- Invoice must not be divergent and use new quantity
"""
if not run: return
self.logMessage('InvoiceAcceptDecisionOnPackingList')
end_sequence = \
"""
stepSetContainerFullQuantity
stepCheckPackingListIsCalculating
stepAcceptDecisionPackingList
stepTic
stepCheckPackingListIsSolved
stepCheckPackingListNotSplitted
stepSetReadyPackingList
stepTic
stepStartPackingList
stepCheckInvoicingRule
stepCheckInvoiceTransactionRule
stepTic
stepCheckInvoiceBuilding
stepStopPackingList
stepTic
stepDeliverPackingList
stepTic
stepCheckPackingListIsNotDivergent
stepCheckPackingListIsSolved
stepCheckInvoiceTransactionRule
stepConfirmInvoice
stepTic
stepStopInvoice
stepTic
stepDeliverInvoice
stepTic
stepCheckInvoiceNotSplitted
stepCheckInvoiceIsNotDivergent
stepCheckInvoiceIsSolved
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
"""
mid_sequence_list = ["""
stepCheckInvoicingRule
stepDecreasePackingListLineQuantity
""", """
stepCheckInvoicingRule
stepIncreasePackingListLineQuantity
"""]
sequence_list = SequenceList()
for seq in mid_sequence_list:
sequence = self.PACKING_LIST_DEFAULT_SEQUENCE + \
seq + end_sequence
sequence_list.addSequenceString(sequence)
sequence_list.play(self)
def test_11_AcceptDecisionOnPackingListAndInvoice(self, quiet=0,
run=RUN_ALL_TESTS):
"""
- Increase or Decrease the quantity of a Packing List line
- Accept Decision on Packing List
- Packing List must not be divergent and use new quantity
- Put old quantity on Invoice
- Accept Decision on Invoice
- Packing List must not be divergent and use new quantity
- Invoice must not be divergent and use old quantity
"""
if not run: return
self.logMessage('InvoiceAcceptDecisionOnPackingListAndInvoice')
mid_sequence = \
"""
stepSetContainerFullQuantity
stepCheckPackingListIsCalculating
stepAcceptDecisionPackingList
stepTic
stepCheckPackingListIsSolved
stepCheckPackingListNotSplitted
stepSetReadyPackingList
stepTic
stepStartPackingList
stepCheckInvoicingRule
stepCheckInvoiceTransactionRule
stepTic
stepCheckInvoiceBuilding
stepStopPackingList
stepTic
stepDeliverPackingList
stepTic
stepCheckPackingListIsNotDivergent
stepCheckPackingListIsSolved
stepCheckInvoiceTransactionRule
"""
end_sequence = \
"""
stepCheckInvoiceIsDiverged
stepAcceptDecisionInvoice
stepTic
stepConfirmInvoice
stepTic
stepStopInvoice
stepTic
stepDeliverInvoice
stepTic
stepCheckPackingListIsNotDivergent
stepCheckPackingListIsSolved
stepCheckInvoiceTransactionRule
stepCheckInvoiceNotSplitted
stepCheckInvoiceIsNotDivergent
stepCheckInvoiceIsSolved
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
"""
mid_sequence_list = [("""
stepCheckInvoicingRule
stepDecreasePackingListLineQuantity
""", """
stepIncreaseInvoiceLineQuantity
stepTic
"""), ("""
stepCheckInvoicingRule
stepIncreasePackingListLineQuantity
""", """
stepDecreaseInvoiceLineQuantity
stepTic
""")]
sequence_list = SequenceList()
for seq1, seq2 in mid_sequence_list:
sequence = self.PACKING_LIST_DEFAULT_SEQUENCE + \
seq1 + mid_sequence + seq2 + end_sequence
sequence_list.addSequenceString(sequence)
sequence_list.play(self)
def test_12_SplitPackingListAndAcceptInvoice(self, quiet=0,
run=RUN_ALL_TESTS):
"""
- Decrease the quantity of a Packing List line
- Split and Defer on Packing List
- Packing List must not be divergent and use new quantity
- splitted Packing List must not be divergent and use old - new quantity
- Put old quantity on Invoice1
- Accept Decision on Invoice1
- Packing List must not be divergent and use new quantity
- splitted Packing List must not be divergent and use old - new quantity
- Invoice1 must not be divergent and use old quantity
- set Invoice2 quantity to 0
- Accept Decision on Invoice2
- Packing List must not be divergent and use new quantity
- splitted Packing List must not be divergent and use old - new quantity
- Invoice1 must not be divergent and use old quantity
- Invoice2 must not be divergent and use 0 as quantity
"""
if not run: return
self.logMessage('InvoiceSplitPackingListAndAcceptInvoice')
sequence = self.PACKING_LIST_DEFAULT_SEQUENCE + \
"""
stepCheckInvoicingRule
stepDecreasePackingListLineQuantity
stepSetContainerFullQuantity
stepCheckPackingListIsCalculating
stepSplitAndDeferPackingList
stepTic
stepCheckPackingListIsSolved
stepCheckPackingListSplitted
stepSetReadyPackingList
stepTic
stepStartPackingList
stepCheckInvoicingRule
stepCheckInvoiceTransactionRule
stepTic
stepCheckInvoiceBuilding
stepStopPackingList
stepTic
stepDeliverPackingList
stepTic
stepCheckPackingListIsSolved
stepCheckPackingListSplitted
stepIncreaseInvoiceLineQuantity
stepCheckInvoiceIsCalculating
stepAcceptDecisionInvoice
stepTic
stepConfirmInvoice
stepTic
stepStopInvoice
stepTic
stepDeliverInvoice
stepTic
stepCheckInvoiceIsSolved
stepCheckInvoiceNotSplitted
stepCheckPackingListIsDivergent
stepCheckPackingListIsDiverged
stepCheckSimulationStartDateUpdated
stepCheckPackingListIsNotDivergent
stepCheckPackingListIsSolved
stepCheckInvoiceTransactionRule
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
stepSwitchPackingLists
stepAddPackingListContainer
stepSetContainerFullQuantity
stepTic
stepCheckPackingListIsSolved
stepSetReadyPackingList
stepTic
stepStartPackingList
stepCheckInvoicingRule
stepCheckInvoiceTransactionRule
stepTic
stepCheckInvoiceBuilding
stepStopPackingList
stepTic
stepDeliverPackingList
stepTic
stepCheckPackingListIsSolved
stepSetInvoiceLineQuantityToZero
stepCheckInvoiceIsCalculating
stepAcceptDecisionInvoice
stepTic
stepConfirmInvoice
stepTic
stepStopInvoice
stepTic
stepDeliverInvoice
stepTic
stepCheckInvoiceIsSolved
stepCheckInvoiceNotSplitted
stepCheckPackingListIsNotDivergent
stepCheckPackingListIsSolved
stepCheckInvoiceTransactionRule
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
"""
self.playSequence(sequence)
def test_13_SplitAndDeferInvoice(self, quiet=0, run=RUN_ALL_TESTS):
"""
- Accept Order, Accept Packing List
- Decrease quantity on Invoice
- Split and defer Invoice
- Accept Invoice
- Accept splitted Invoice
- Packing List must not be divergent and use old quantity
- Invoice must not be divergent and use new quantity
- splitted Invoice must not be divergent and use old - new quantity
"""
if not run: return
self.logMessage('InvoiceSplitAndDeferInvoice')
sequence = self.PACKING_LIST_DEFAULT_SEQUENCE + \
"""
stepSetReadyPackingList
stepTic
stepStartPackingList
stepCheckInvoicingRule
stepCheckInvoiceTransactionRule
stepTic
stepCheckInvoiceBuilding
stepStopPackingList
stepTic
stepDeliverPackingList
stepTic
stepCheckPackingListIsSolved
stepCheckPackingListNotSplitted
stepDecreaseInvoiceLineQuantity
stepCheckInvoiceIsDivergent
stepCheckInvoiceIsCalculating
stepSplitAndDeferInvoice
stepTic
stepConfirmInvoice
stepTic
stepStopInvoice
stepTic
stepDeliverInvoice
stepTic
stepCheckInvoiceIsNotDivergent
stepCheckInvoiceIsSolved
stepCheckInvoiceSplitted
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
stepCheckPackingListIsNotDivergent
stepCheckPackingListIsSolved
stepCheckInvoiceTransactionRule
stepSwitchInvoices
stepConfirmInvoice
stepTic
stepStopInvoice
stepTic
stepDeliverInvoice
stepTic
stepCheckInvoiceIsNotDivergent
stepCheckInvoiceIsSolved
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
"""
self.playSequence(sequence)
def test_14_AcceptDecisionOnInvoice(self, quiet=0, run=RUN_ALL_TESTS):
"""
- Accept Order, Accept Packing List
- Increase or Decrease quantity on Invoice
- Accept Decision on Invoice
- Accept Invoice
- Packing List must not be divergent and use old quantity
- Invoice must not be divergent and use new quantity
"""
if not run: return
self.logMessage('InvoiceAcceptDecisionOnInvoice')
mid_sequence = \
"""
stepSetReadyPackingList
stepTic
stepStartPackingList
stepCheckInvoicingRule
stepCheckInvoiceTransactionRule
stepTic
stepCheckInvoiceBuilding
stepStopPackingList
stepTic
stepDeliverPackingList
stepTic
stepCheckPackingListIsSolved
stepCheckPackingListNotSplitted
"""
end_sequence = \
"""
stepCheckInvoiceIsDivergent
stepCheckInvoiceIsCalculating
stepAcceptDecisionInvoice
stepTic
stepConfirmInvoice
stepTic
stepStopInvoice
stepTic
stepDeliverInvoice
stepTic
stepCheckPackingListIsNotDivergent
stepCheckPackingListIsSolved
stepCheckInvoiceTransactionRule
stepCheckInvoiceNotSplitted
stepCheckInvoiceIsNotDivergent
stepCheckInvoiceIsSolved
stepRebuildAndCheckNothingIsCreated
stepCheckInvoicesConsistency
"""
mid_sequence_list = ["""
stepDecreaseInvoiceLineQuantity
""", """
stepIncreaseInvoiceLineQuantity
"""]
sequence_list = SequenceList()
for seq in mid_sequence_list:
sequence = self.PACKING_LIST_DEFAULT_SEQUENCE + \
mid_sequence + seq + end_sequence
sequence_list.addSequenceString(sequence)
sequence_list.play(self)
#class TestPurchaseInvoice(TestInvoice):
# order_portal_type = 'Purchase Order'
# order_line_portal_type = 'Purchase Order Line'
# order_cell_portal_type = 'Purchase Order Cell'
# packing_list_portal_type = 'Purchase Packing List'
# packing_list_line_portal_type = 'Purchase Packing List Line'
# packing_list_cell_portal_type = 'Purchase Packing List Cell'
# delivery_builder_id = 'purchase_packing_list_builder'
# invoice_portal_type = 'Purchase Invoice Transaction'
# invoice_transaction_line_portal_type = 'Purchase Invoice Transaction Line'
#
# def getTitle(self):
# return "Purchase Invoices"
if __name__ == '__main__':
framework()
else:
......
......@@ -57,7 +57,6 @@ from Products.CMFCore.utils import getToolByName
class TestOrderMixin:
run_all_test = 1
default_quantity = 99
default_price = 555
resource_portal_type = 'Apparel Model'
......@@ -71,7 +70,7 @@ class TestOrderMixin:
packing_list_cell_portal_type = 'Sale Packing List Cell'
delivery_builder_id = 'sale_packing_list_builder'
order_workflow_id='order_workflow'
size_list = ['Baby','Child/32','Child/34','Man','Woman']
size_list = ['Baby','Child/32','Child/34','Man','Woman']
def getBusinessTemplateList(self):
"""
......@@ -93,8 +92,8 @@ class TestOrderMixin:
self.createCategories()
def createCategories(self):
"""
Light install create only base categories, so we create
"""
Light install create only base categories, so we create
some categories for testing them
"""
size_category_list = ['Baby', 'Child', 'Man', 'Woman']
......@@ -126,7 +125,7 @@ class TestOrderMixin:
o = self.category_tool.product_line.newContent(
portal_type='Category',
id=category_id)
def stepTic(self,**kw):
self.tic()
......@@ -162,7 +161,7 @@ class TestOrderMixin:
industrial_phase_list=["phase1", "phase2"],
product_line = 'apparel'
)
resource.setSizeList(self.size_list)
resource.setSizeList(self.size_list)
# Add colour variation
colour_variation_count = 3
for i in range(colour_variation_count):
......@@ -185,7 +184,7 @@ class TestOrderMixin:
resource_list.append(resource)
sequence.edit( resource_list = resource_list )
def stepCreateOrganisation(self, sequence=None, sequence_list=None,
def stepCreateOrganisation(self, sequence=None, sequence_list=None,
title='organisation', **kw):
"""
Create a empty organisation
......@@ -258,10 +257,10 @@ class TestOrderMixin:
# vcl = list(order_line.getVariationCategoryList())
# cell_key_list = order_line.getCellKeyList(base_id=base_id)
cell_list = order_line.objectValues(portal_type=self.order_cell_portal_type)
# self.failIfDifferentSet( vcl , [] )
# self.failIfDifferentSet( vcl , [] )
# self.failIfDifferentSet( cell_key_list , [] )
self.failIfDifferentSet( cell_list , [] )
def stepSetOrderLineResource(self, sequence=None, sequence_list=None, **kw):
"""
Set order line resource with the current resource
......@@ -443,7 +442,7 @@ class TestOrderMixin:
order_line = sequence.get('order_line')
order_line.edit(quantity=self.default_quantity,
price=self.default_price)
def stepCheckOrderLineDefaultValues(self, sequence=None, \
sequence_list=None, **kw):
"""
......@@ -452,17 +451,17 @@ class TestOrderMixin:
order_line = sequence.get('order_line')
self.assertEquals(self.default_quantity, order_line.getQuantity())
self.assertEquals(self.default_price, order_line.getPrice())
def stepCheckOrderLineTotalQuantity(self, sequence=None, \
sequence_list=None, **kw):
"""
Check the method getTotalQuantity on a order line.
"""
# FIXME : order_line needs to be indexed for 'fast' calculation to
# work as expected
self.stepTic()
order_line = sequence.get('order_line')
base_id = 'movement'
cell_key_list = order_line.getCellKeyList(base_id=base_id)
......@@ -478,23 +477,23 @@ class TestOrderMixin:
self.assertEquals(total_quantity, order_line.getTotalQuantity())
self.assertEquals( order_line.getTotalQuantity(fast = 0),
order_line.getTotalQuantity(fast = 1) )
def stepCheckOrderLineTotalPrice(self, sequence=None, \
sequence_list=None, **kw):
"""
Check the method getTotalPrice on a order line.
"""
# FIXME : order_line needs to be indexed for 'fast' calculation to
# work as expected
self.stepTic()
order_line = sequence.get('order_line')
base_id = 'movement'
cell_key_list = order_line.getCellKeyList(base_id=base_id)
if list(cell_key_list) == []:
self.assertEquals(order_line.getProperty('price') *
order_line.getProperty('quantity'),
order_line.getProperty('quantity'),
order_line.getTotalPrice())
else:
total_price = 0
......@@ -506,17 +505,17 @@ class TestOrderMixin:
self.assertEquals(total_price, order_line.getTotalPrice())
self.assertEquals( order_line.getTotalPrice(fast = 0),
order_line.getTotalPrice(fast = 1) )
def stepCheckOrderTotalQuantity(self, sequence=None, sequence_list=None, \
**kw):
"""
Check the method getTotalQuantity on a order .
"""
# FIXME : order needs to be indexed for 'fast' calculation to
# work as expected
self.stepTic()
order = sequence.get('order')
order_line_list = order.objectValues( \
portal_type=self.order_line_portal_type)
......@@ -527,17 +526,17 @@ class TestOrderMixin:
self.assertEquals(total_quantity, order.getTotalQuantity())
self.assertEquals( order.getTotalQuantity(fast = 0),
order.getTotalQuantity(fast = 1) )
def stepCheckOrderTotalPrice(self, sequence=None, sequence_list=None, \
**kw):
"""
Check the method getTotalPrice on a order .
"""
# FIXME : order needs to be indexed for 'fast' calculation to
# work as expected
self.stepTic()
order = sequence.get('order')
order_line_list = order.objectValues( \
portal_type=self.order_line_portal_type)
......@@ -548,7 +547,7 @@ class TestOrderMixin:
self.assertEquals(total_price, order.getTotalPrice())
self.assertEquals( order.getTotalPrice(fast = 0),
order.getTotalPrice(fast = 1) )
def stepCheckOrderInitialState(self, sequence=None, sequence_list=None, \
**kw):
"""
......@@ -556,7 +555,7 @@ class TestOrderMixin:
"""
order = sequence.get('order')
self.assertEquals('draft', order.getSimulationState())
def stepCheckOrderLineState(self, sequence=None, sequence_list=None, \
**kw):
"""
......@@ -565,7 +564,7 @@ class TestOrderMixin:
order = sequence.get('order')
order_line = sequence.get('order_line')
self.assertEquals(order.getSimulationState(), order_line.getSimulationState())
def stepCheckOrderCellState(self, sequence=None, sequence_list=None, \
**kw):
"""
......@@ -580,12 +579,13 @@ class TestOrderMixin:
def stepCheckOrderPlanned(self, sequence=None, sequence_list=None, **kw):
order = sequence.get('order')
self.assertEquals('planned', order.getSimulationState())
def checkAcquisition(self, object, acquired_object):
"""
Check if properties are well acquired
"""
# packing_list_movement, simulation_movement
self.assertEquals(acquired_object.getStartDate(), object.getStartDate())
self.assertEquals(acquired_object.getStopDate(), object.getStopDate())
self.assertEquals(acquired_object.getSourceValue(), \
......@@ -649,19 +649,19 @@ class TestOrderMixin:
self.failUnless(applied_rule is not None)
self.failUnless(order_state, \
applied_rule.getLastExpandSimulationState())
# Test if applied rule has a specialise value with default_order_rule
portal_rules = getToolByName(order, 'portal_rules')
self.assertEquals(portal_rules.default_order_rule, \
applied_rule.getSpecialiseValue())
simulation_movement_list = applied_rule.objectValues()
sequence.edit(simulation_movement_list=simulation_movement_list)
# Count the number of movement in order
order_line_list = order.objectValues( \
portal_type=self.order_line_portal_type)
order_line_list = map(lambda x: x.getObject(), order_line_list)
order_line_list = [x.getObject() for x in order_line_list]
movement_list = []
for order_line in order_line_list:
if not order_line.hasCellContent():
......@@ -669,12 +669,12 @@ class TestOrderMixin:
else:
cell_list = order_line.objectValues( \
portal_type=self.order_cell_portal_type)
movement_list.extend(map(lambda x: x.getObject(), cell_list))
movement_list.extend([x.getObject() for x in cell_list])
# Check if number of movement is equal to number of simulation movement
self.assertEquals(len(movement_list), len(simulation_movement_list))
# Check if each movement has only one simulation movement related
order_movement_list = map(lambda x: x.getOrderValue(), \
simulation_movement_list)
order_movement_list = [x.getOrderValue() for x in \
simulation_movement_list]
self.failIfDifferentSet(movement_list, order_movement_list)
# Check each simulation movement
......@@ -699,7 +699,7 @@ class TestOrderMixin:
# Test other attributes
self.assertEquals(1, simulation_movement.deliverable)
def modifyOrderState(self, transition_name, sequence=None,
def modifyOrderState(self, transition_name, sequence=None,
sequence_list=None):
order = sequence.get('order')
order.portal_workflow.doActionFor(order, transition_name, \
......@@ -784,7 +784,7 @@ class TestOrderMixin:
packing_list = related_packing_list_list[0].getObject()
self.failUnless(packing_list is not None)
sequence.edit(packing_list = packing_list)
applied_rule = related_applied_rule_list[0].getObject()
simulation_movement_list = applied_rule.objectValues()
......@@ -862,17 +862,17 @@ class TestOrderMixin:
"""
order = sequence.get('order')
order.setStartDate(self.datetime + 77)
def stepModifyOrderLineStartDate(self, sequence=None, sequence_list=None, \
**kw):
"""
Modify order line start date
"""
order_line = sequence.get('order_line')
order_line.setStartDate(self.datetime + 88)
order_line.edit(start_date=self.datetime + 88)
def stepModifyOrderCellStartDate(self, sequence=None, sequence_list=None, \
**kw):
**kw):
"""
Modify order cell start date
"""
......@@ -881,22 +881,82 @@ class TestOrderMixin:
if len(cell_list) > 0:
order_cell = cell_list[0].getObject()
order_cell.setStartDate(self.datetime + 99)
def stepModifyOrderLineQuantity(self, sequence=None, sequence_list=None, \
**kw):
"""
Modify order line quantity
"""
order_line = sequence.get('order_line')
order_line.setQuantity(order_line.getQuantity() + 111)
def stepCheckOrderSimulationStable(self, sequence=None, \
sequence_list=None, **kw):
"""
Tests that the simulation related to the order is stable and not
divergent
"""
order = sequence.get('order')
order_movement_list = order.getMovementList()
related_simulation_list = []
for order_movement in order_movement_list:
related_simulation_list.extend(order_movement.getOrderRelatedValueList())
related_applied_rule_list = {}
for simulation_mvt in related_simulation_list:
self.assertFalse(simulation_mvt.isDivergent())
related_applied_rule_list[simulation_mvt.getParent()]=1
for applied_rule in related_applied_rule_list.keys():
self.assertTrue(applied_rule.isStable())
def stepPackingListAdoptPrevision(self,sequence=None, sequence_list=None,
**kw):
"""
Check if simulation movement are disconnected
"""
packing_list = sequence.get('packing_list')
packing_list.portal_workflow.doActionFor(packing_list,
'adopt_prevision_action')
non_variated_order_creation = '\
stepCreateOrder \
stepCreateNotVariatedResource \
stepTic \
stepCreateOrderLine \
stepCheckOrderLineEmptyMatrix \
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepCheckOrderLineDefaultValues \
'
variated_order_line_creation = '\
stepCreateOrder \
stepCreateVariatedResource \
stepTic \
stepCreateOrderLine \
'
variated_line_completion = '\
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepCheckOrderLineDefaultValues \
stepCheckOrderLineTotalQuantity \
stepSetOrderLineFullVCL \
stepCompleteOrderLineMatrix \
'
variated_order_creation = variated_order_line_creation + \
variated_line_completion
class TestOrder(TestOrderMixin, ERP5TypeTestCase):
"""
Test business template erp5_trade
Test business template erp5_trade
"""
run_all_test = 1
def getTitle(self):
return "Order"
def enableLightInstall(self):
"""
You can override this.
You can override this.
Return if we should do a light install (1) or not (0)
"""
return 1
......@@ -934,10 +994,10 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
# new_price = 0.0
# self.assertEquals(new_quantity, cell.getProperty('quantity'))
# self.assertEquals(new_price, cell.getProperty('price'))
#
#
# # XXX test getTotalPrice on OrderLine
def test_01_OrderLine_getVariationRangeCategoryList(self, quiet=0,
def test_01_OrderLine_getVariationRangeCategoryList(self, quiet=0,
run=run_all_test):
"""
Test order line getVariationRangeCategoryList.
......@@ -945,24 +1005,24 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
if not run: return
sequence_list = SequenceList()
# Test when resource has no variation
sequence_string = 'CreateOrder \
CreateOrderLine \
CheckOrderLineVRCL \
CreateNotVariatedResource \
Tic \
SetOrderLineResource \
CheckOrderLineVRCL \
'
sequence_string = 'stepCreateOrder \
stepCreateOrderLine \
stepCheckOrderLineVRCL \
stepCreateNotVariatedResource \
stepTic \
stepSetOrderLineResource \
stepCheckOrderLineVRCL \
'
sequence_list.addSequenceString(sequence_string)
# Test when resource has variation
sequence_string = 'CreateOrder \
CreateOrderLine \
CheckOrderLineVRCL \
CreateVariatedResource \
Tic \
SetOrderLineResource \
CheckOrderLineVRCL \
'
sequence_string = 'stepCreateOrder \
stepCreateOrderLine \
stepCheckOrderLineVRCL \
stepCreateVariatedResource \
stepTic \
stepSetOrderLineResource \
stepCheckOrderLineVRCL \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
......@@ -974,24 +1034,24 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
if not run: return
sequence_list = SequenceList()
# Test when resource has no variation
sequence_string = 'CreateOrder \
CreateOrderLine \
CheckOrderLineVRCIL \
CreateNotVariatedResource \
Tic \
SetOrderLineResource \
CheckOrderLineVRCIL \
'
sequence_string = 'stepCreateOrder \
stepCreateOrderLine \
stepCheckOrderLineVRCIL \
stepCreateNotVariatedResource \
stepTic \
stepSetOrderLineResource \
stepCheckOrderLineVRCIL \
'
sequence_list.addSequenceString(sequence_string)
# Test when resource has variation
sequence_string = 'CreateOrder \
CreateOrderLine \
CheckOrderLineVRCIL \
CreateVariatedResource \
Tic \
SetOrderLineResource \
CheckOrderLineVRCIL \
'
sequence_string = 'stepCreateOrder \
stepCreateOrderLine \
stepCheckOrderLineVRCIL \
stepCreateVariatedResource \
stepTic \
stepSetOrderLineResource \
stepCheckOrderLineVRCIL \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
......@@ -1012,16 +1072,16 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
if not run: return
sequence_list = SequenceList()
# Test when resource has variation
sequence_string = 'CreateOrder \
CreateOrderLine \
CheckOrderLineVCIL \
CreateVariatedResource \
Tic \
SetOrderLineResource \
SetOrderLineHalfVCL \
Tic \
CheckOrderLineVCIL \
'
sequence_string = 'stepCreateOrder \
stepCreateOrderLine \
stepCheckOrderLineVCIL \
stepCreateVariatedResource \
stepTic \
stepSetOrderLineResource \
stepSetOrderLineHalfVCL \
stepTic \
stepCheckOrderLineVCIL \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
......@@ -1031,51 +1091,36 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
"""
if not run: return
sequence_list = SequenceList()
# Test when resource has no variation
sequence_string = 'CreateOrder \
CreateNotVariatedResource \
Tic \
CreateOrderLine \
CheckOrderLineEmptyMatrix \
SetOrderLineResource \
CheckOrderLineEmptyMatrix \
SetOrderLineFullVCL \
CheckOrderLineRange \
CheckOrderLineEmptyMatrix \
CompleteOrderLineMatrix \
Tic \
CheckOrderLineCompleteMatrix \
SetOrderLineHalfVCL \
Tic \
CheckOrderLineRange \
CheckOrderLineCompleteMatrix \
SetOrderLineEmptyVCL \
Tic \
CheckOrderLineEmptyMatrix \
# common part of sequence
base_sequence = 'stepTic \
stepCreateOrderLine \
stepCheckOrderLineEmptyMatrix \
stepSetOrderLineResource \
stepCheckOrderLineEmptyMatrix \
stepSetOrderLineFullVCL \
stepCheckOrderLineRange \
stepCheckOrderLineEmptyMatrix \
stepCompleteOrderLineMatrix \
stepTic \
stepCheckOrderLineCompleteMatrix \
stepSetOrderLineHalfVCL \
stepTic \
stepCheckOrderLineRange \
stepCheckOrderLineCompleteMatrix \
stepSetOrderLineEmptyVCL \
stepTic \
stepCheckOrderLineEmptyMatrix \
'
# Test when resource has no variation
sequence_string = 'stepCreateOrder \
stepCreateNotVariatedResource \
' + base_sequence
sequence_list.addSequenceString(sequence_string)
# Test when resource has variations
sequence_string = 'CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
CheckOrderLineEmptyMatrix \
SetOrderLineResource \
CheckOrderLineEmptyMatrix \
SetOrderLineFullVCL \
CheckOrderLineRange \
CheckOrderLineEmptyMatrix \
CompleteOrderLineMatrix \
Tic \
CheckOrderLineCompleteMatrix \
SetOrderLineHalfVCL \
Tic \
CheckOrderLineRange \
CheckOrderLineCompleteMatrix \
SetOrderLineEmptyVCL \
Tic \
CheckOrderLineRange \
CheckOrderLineEmptyMatrix \
sequence_string = 'stepCreateOrder \
stepCreateVariatedResource \
' + base_sequence + \
'stepCheckOrderLineRange \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
......@@ -1098,7 +1143,7 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
# # Then, store new properties
# cell.edit(price=price)
# self.assertEquals(price, cell.getProperty('price'))
#
#
# # for new_quantity, new_price in [(None, 346), (123, None), (None, None), \
# # (quantity, price)]:
# def stepSetCellPriceToNone(self, sequence=None, sequence_list=None, **kw):
......@@ -1106,7 +1151,7 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
# Set the cell price to None
# """
# self.modifyOrderLineCellPrice(None)
#
#
# def stepSetCellQuantityToNone(self, sequence=None, sequence_list=None, **kw):
# """
# Set the cell price to None
......@@ -1121,30 +1166,13 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
sequence_list = SequenceList()
# Test when resource has no variation
sequence_string = '\
CreateOrder \
CreateNotVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
CheckOrderLineDefaultValues \
CheckOrderLineTotalQuantity \
sequence_string = self.non_variated_order_creation + '\
stepCheckOrderLineTotalQuantity \
'
sequence_list.addSequenceString(sequence_string)
# Test when resource has variations
sequence_string = '\
CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
CheckOrderLineDefaultValues \
CheckOrderLineTotalQuantity \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
CheckOrderLineTotalQuantity \
sequence_string = self.variated_order_creation + '\
stepCheckOrderLineTotalQuantity \
'
sequence_list.addSequenceString(sequence_string)
# XXX test cell modification
......@@ -1159,31 +1187,13 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
sequence_list = SequenceList()
# Test when resource has not variation
sequence_string = '\
CreateOrder \
CreateNotVariatedResource \
Tic \
CreateOrderLine \
CheckOrderLineEmptyMatrix \
SetOrderLineResource \
SetOrderLineDefaultValues \
CheckOrderLineDefaultValues \
CheckOrderLineTotalPrice \
sequence_string = self.non_variated_order_creation + '\
stepCheckOrderLineTotalPrice \
'
sequence_list.addSequenceString(sequence_string)
# Test when resource has variations
sequence_string = '\
CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
CheckOrderLineDefaultValues \
CheckOrderLineTotalQuantity \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
CheckOrderLineTotalPrice \
sequence_string = self.variated_order_creation + '\
stepCheckOrderLineTotalPrice \
'
sequence_list.addSequenceString(sequence_string)
# XXX test cell modification
......@@ -1198,53 +1208,30 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
sequence_list = SequenceList()
# Test with no order line
sequence_string = '\
CreateOrder \
CheckOrderTotalQuantity \
stepCreateOrder \
stepCheckOrderTotalQuantity \
'
sequence_list.addSequenceString(sequence_string)
# Test when resource has not variation
sequence_string = '\
CreateOrder \
CreateNotVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
Tic \
CheckOrderTotalQuantity \
sequence_string = self.non_variated_order_creation + '\
stepCheckOrderTotalQuantity \
'
sequence_list.addSequenceString(sequence_string)
# Test when resource has variations
sequence_string = '\
CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
CheckOrderTotalQuantity \
sequence_string = self.variated_order_creation + '\
stepCheckOrderTotalQuantity \
'
sequence_list.addSequenceString(sequence_string)
# Test whith multiples order line
sequence_string = '\
CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
CheckOrderTotalQuantity \
CreateNotVariatedResource \
CreateOrderLine \
Tic \
SetOrderLineResource \
SetOrderLineDefaultValues \
Tic \
CheckOrderTotalQuantity \
sequence_string = self.variated_order_creation + '\
stepCheckOrderTotalQuantity \
stepCreateNotVariatedResource \
stepCreateOrderLine \
stepTic \
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepTic \
stepCheckOrderTotalQuantity \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
......@@ -1257,52 +1244,30 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
sequence_list = SequenceList()
# Test with no order line
sequence_string = '\
CreateOrder \
CheckOrderTotalPrice\
stepCreateOrder \
stepCheckOrderTotalPrice\
'
sequence_list.addSequenceString(sequence_string)
# Test when resource has not variation
sequence_string = '\
CreateOrder \
CreateNotVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
Tic \
CheckOrderTotalPrice \
sequence_string = self.non_variated_order_creation + '\
stepCheckOrderTotalPrice \
'
sequence_list.addSequenceString(sequence_string)
# Test when resource has variations
sequence_string = '\
CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
CheckOrderTotalPrice \
sequence_string = self.variated_order_creation + '\
stepCheckOrderTotalPrice \
'
sequence_list.addSequenceString(sequence_string)
# Test with multiples order line
sequence_string = '\
CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
CreateNotVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
Tic \
CheckOrderTotalPrice \
sequence_string = self.variated_order_creation + '\
stepCheckOrderTotalQuantity \
stepCreateNotVariatedResource \
stepCreateOrderLine \
stepTic \
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepTic \
stepCheckOrderTotalPrice \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
......@@ -1314,50 +1279,40 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
if not run: return
sequence_list = SequenceList()
sequence_string = '\
CreateOrganisation \
CreateOrder \
CreateVariatedResource \
Tic \
CheckOrderInitialState \
CreateOrderLine \
Tic \
CheckOrderLineState \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
Tic \
CheckOrderCellState \
PlanOrder \
Tic \
CheckOrderPlanned \
CheckOrderLineState \
CheckOrderCellState \
stepCreateOrganisation \
' + self.variated_order_line_creation + '\
stepCheckOrder \
stepCheckOrderInitialState \
stepTic \
stepCheckOrderLineState \
' + self.variated_line_completion + '\
stepTic \
stepCheckOrderCellState \
stepPlanOrder \
stepTic \
stepCheckOrderPlanned \
stepCheckOrderLineState \
stepCheckOrderCellState \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def test_11_testPropertiesAcquisition(self, quiet=0, run=run_all_test):
"""
Test if some properties on order line or order
Test if some properties on order line or order
cell are well acquired.
"""
if not run: return
sequence_list = SequenceList()
sequence_string = '\
CreateOrganisation \
CreateOrder \
CheckOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
CheckOrderLineAcquisition \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
CheckOrderCellAcquisition \
Tic \
stepCreateOrganisation \
' + self.variated_order_line_creation + '\
stepCheckOrder \
stepCheckOrderInitialState \
stepCheckOrderLineAcquisition \
' + self.variated_line_completion + '\
stepCheckOrderCellAcquisition \
stepTic \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
......@@ -1370,117 +1325,90 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
sequence_list = SequenceList()
# Test first some portal method
sequence_string = '\
CreateOrder \
CheckPortalMethod \
stepCreateOrder \
stepCheckPortalMethod \
'
sequence_list.addSequenceString(sequence_string)
# Test when order is cancelled
sequence_string = '\
CreateOrganisation \
CreateOrder \
CreateNotVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
CheckOrderSimulation \
PlanOrder \
Tic \
CheckOrderSimulation \
OrderOrder \
Tic \
CheckOrderSimulation \
CancelOrder \
Tic \
Tic \
CheckOrderSimulation \
stepCreateOrganisation \
' + self.non_variated_order_creation + '\
stepCheckOrderSimulation \
stepPlanOrder \
stepTic \
stepCheckOrderSimulation \
stepOrderOrder \
stepTic \
stepCheckOrderSimulation \
stepCancelOrder \
stepTic \
stepTic \
stepCheckOrderSimulation \
'
sequence_list.addSequenceString(sequence_string)
# Test with a simply order without cell
sequence_string = '\
CreateOrganisation \
CreateOrder \
CreateNotVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
PlanOrder \
Tic \
CheckOrderSimulation \
OrderOrder \
Tic \
CheckOrderSimulation \
ConfirmOrder \
Tic \
CheckOrderSimulation \
stepCreateOrganisation \
' + self.non_variated_order_creation + '\
stepPlanOrder \
stepTic \
stepCheckOrderSimulation \
stepOrderOrder \
stepTic \
stepCheckOrderSimulation \
stepConfirmOrder \
stepTic \
stepCheckOrderSimulation \
'
sequence_list.addSequenceString(sequence_string)
# Test to confirm order without planned or ordered it
sequence_string = '\
CreateOrganisation \
CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
OrderOrder \
Tic \
CheckOrderSimulation \
ConfirmOrder \
Tic \
CheckOrderSimulation \
stepCreateOrganisation \
' + self.variated_order_line_creation + '\
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepOrderOrder \
stepTic \
stepCheckOrderSimulation \
stepConfirmOrder \
stepTic \
stepCheckOrderSimulation \
'
sequence_list.addSequenceString(sequence_string)
# Test to confirm order with variated resource
sequence_string = '\
CreateOrganisation \
CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
Tic \
OrderOrder \
Tic \
CheckOrderSimulation \
ConfirmOrder \
Tic \
CheckOrderSimulation \
stepCreateOrganisation \
' + self.variated_order_creation + '\
stepTic \
stepOrderOrder \
stepTic \
stepCheckOrderSimulation \
stepConfirmOrder \
stepTic \
stepCheckOrderSimulation \
'
sequence_list.addSequenceString(sequence_string)
# Test to confirm order with multiples lines
sequence_string = '\
CreateOrganisation \
CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
CreateNotVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
Tic \
OrderOrder \
Tic \
CheckOrderSimulation \
ConfirmOrder \
Tic \
CheckOrderSimulation \
stepCreateOrganisation \
' + self.variated_order_creation + '\
stepCreateNotVariatedResource \
stepTic \
stepCreateOrderLine \
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepTic \
stepOrderOrder \
stepTic \
stepCheckOrderSimulation \
stepConfirmOrder \
stepTic \
stepCheckOrderSimulation \
'
sequence_list.addSequenceString(sequence_string)
......@@ -1494,53 +1422,35 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
sequence_list = SequenceList()
# Test when order is modified
sequence_string = '\
CreateOrganisation \
CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
OrderOrder \
Tic \
ModifyOrderStartDate \
CheckOrderSimulation \
stepCreateOrganisation \
' + self.variated_order_creation + '\
stepOrderOrder \
stepTic \
stepModifyOrderStartDate \
stepTic \
stepCheckOrderSimulation \
'
sequence_list.addSequenceString(sequence_string)
# Test when order line is modified
sequence_string = '\
CreateOrganisation \
CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
OrderOrder \
Tic \
ModifyOrderLineStartDate \
CheckOrderSimulation \
stepCreateOrganisation \
' + self.variated_order_creation + '\
stepOrderOrder \
stepTic \
stepModifyOrderLineStartDate \
stepTic \
stepCheckOrderSimulation \
'
sequence_list.addSequenceString(sequence_string)
# Test when order cell is modified
sequence_string = '\
CreateOrganisation \
CreateOrder \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
OrderOrder \
Tic \
ModifyOrderCellStartDate \
CheckOrderSimulation \
stepCreateOrganisation \
' + self.variated_order_creation + '\
stepOrderOrder \
stepTic \
stepModifyOrderCellStartDate \
stepTic \
stepCheckOrderSimulation \
'
sequence_list.addSequenceString(sequence_string)
......@@ -1562,129 +1472,117 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
sequence_list = SequenceList()
# First, test if delivery buider exists
sequence_string = '\
CheckDeliveryBuilderPresence \
stepCheckDeliveryBuilderPresence \
'
sequence_list.addSequenceString(sequence_string)
# Test with a simply order without cell
sequence_string = '\
CreateOrganisation1 \
CreateOrganisation2 \
CreateOrder \
SetOrderProfile \
CreateNotVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
OrderOrder \
Tic \
CheckDeliveryBuilding \
ConfirmOrder \
Tic \
CheckDeliveryBuilding \
stepCreateOrganisation1 \
stepCreateOrganisation2 \
stepCreateOrder \
stepSetOrderProfile \
stepCreateNotVariatedResource \
stepTic \
stepCreateOrderLine \
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepOrderOrder \
stepTic \
stepCheckDeliveryBuilding \
stepConfirmOrder \
stepTic \
stepCheckDeliveryBuilding \
'
sequence_list.addSequenceString(sequence_string)
# Test to confirm order with variated resource
sequence_string = '\
CreateOrganisation1 \
CreateOrganisation2 \
CreateOrder \
SetOrderProfile \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
Tic \
OrderOrder \
Tic \
CheckDeliveryBuilding \
ConfirmOrder \
Tic \
CheckDeliveryBuilding \
stepCreateOrganisation1 \
stepCreateOrganisation2 \
stepCreateOrder \
stepSetOrderProfile \
stepCreateVariatedResource \
stepTic \
stepCreateOrderLine \
' + self.variated_line_completion + '\
stepTic \
stepOrderOrder \
stepTic \
stepCheckDeliveryBuilding \
stepConfirmOrder \
stepTic \
stepCheckDeliveryBuilding \
'
sequence_list.addSequenceString(sequence_string)
# Test to confirm order with multiples lines
sequence_string = '\
CreateOrganisation1 \
CreateOrganisation2 \
CreateOrder \
SetOrderProfile \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
CreateNotVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
Tic \
OrderOrder \
Tic \
CheckDeliveryBuilding \
ConfirmOrder \
Tic \
CheckDeliveryBuilding \
stepCreateOrganisation1 \
stepCreateOrganisation2 \
stepCreateOrder \
stepSetOrderProfile \
stepCreateVariatedResource \
stepTic \
stepCreateOrderLine \
' + self.variated_line_completion + '\
stepCreateNotVariatedResource \
stepTic \
stepCreateOrderLine \
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepTic \
stepOrderOrder \
stepTic \
stepCheckDeliveryBuilding \
stepConfirmOrder \
stepTic \
stepCheckDeliveryBuilding \
'
sequence_list.addSequenceString(sequence_string)
# Test with a order with 2 lines and the same not variated resource
sequence_string = '\
CreateOrganisation1 \
CreateOrganisation2 \
CreateOrder \
SetOrderProfile \
CreateNotVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
OrderOrder \
Tic \
CheckDeliveryBuilding \
ConfirmOrder \
Tic \
CheckDeliveryBuilding \
stepCreateOrganisation1 \
stepCreateOrganisation2 \
stepCreateOrder \
stepSetOrderProfile \
stepCreateNotVariatedResource \
stepTic \
stepCreateOrderLine \
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepCreateOrderLine \
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepOrderOrder \
stepTic \
stepCheckDeliveryBuilding \
stepConfirmOrder \
stepTic \
stepCheckDeliveryBuilding \
'
sequence_list.addSequenceString(sequence_string)
# Test with a order with 2 lines and the same variated resource
sequence_string = '\
CreateOrganisation1 \
CreateOrganisation2 \
CreateOrder \
SetOrderProfile \
CreateVariatedResource \
Tic \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
Tic \
OrderOrder \
Tic \
CheckDeliveryBuilding \
ConfirmOrder \
Tic \
CheckDeliveryBuilding \
stepCreateOrganisation1 \
stepCreateOrganisation2 \
stepCreateOrder \
stepSetOrderProfile \
stepCreateVariatedResource \
stepTic \
stepCreateOrderLine \
' + self.variated_line_completion + '\
stepCreateOrderLine \
' + self.variated_line_completion + '\
stepTic \
stepOrderOrder \
stepTic \
stepCheckDeliveryBuilding \
stepConfirmOrder \
stepTic \
stepCheckDeliveryBuilding \
'
sequence_list.addSequenceString(sequence_string)
......@@ -1701,31 +1599,100 @@ class TestOrder(TestOrderMixin, ERP5TypeTestCase):
# XXX Does not work yet
# Test to confirm order without doing any tic
# Except after creating organisations
sequence_string = '\
stepCreateOrganisation1 \
stepCreateOrganisation2 \
stepTic \
stepCreateOrder \
stepSetOrderProfile \
stepCreateVariatedResource \
stepCreateOrderLine \
' + self.variated_line_completion + '\
stepCreateNotVariatedResource \
stepCreateOrderLine \
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepOrderOrder \
stepCheckDeliveryBuilding \
stepConfirmOrder \
stepTic \
stepCheckDeliveryBuilding \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def test_17_orderExpand(self, quiet=0, run=run_all_test):
"""
This tests the behaviour of expand.
First, without delivery:
- create an order, apply a rule
- modify the order line, expand the rule
- check that the simulation is not Divergent and Stable (simulation must
match order value)
Second, with delivery:
- create an order, apply a rule
- build a packing list
- modify the order line, expand the rule
- adopt prevision to fix the delivery
- check that the simulation is not Divergent and Stable (simulation must
match delivery value)
"""
if not run: return
sequence_list = SequenceList()
sequence_string = '\
CreateOrganisation1 \
CreateOrganisation2 \
Tic \
CreateOrder \
SetOrderProfile \
CreateVariatedResource \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
SetOrderLineFullVCL \
CompleteOrderLineMatrix \
CreateNotVariatedResource \
CreateOrderLine \
SetOrderLineResource \
SetOrderLineDefaultValues \
OrderOrder \
CheckDeliveryBuilding \
ConfirmOrder \
Tic \
Tic \
CheckDeliveryBuilding \
stepCreateOrganisation1 \
stepCreateOrganisation2 \
stepCreateOrder \
stepSetOrderProfile \
stepCreateNotVariatedResource \
stepTic \
stepCreateOrderLine \
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepOrderOrder \
stepTic \
stepModifyOrderLineQuantity \
stepTic \
stepCheckDeliveryBuilding \
stepConfirmOrder \
stepTic \
stepCheckOrderSimulation \
stepCheckOrderSimulationStable \
'
sequence_list.addSequenceString(sequence_string)
# XXX XXX FIXME
# The idea of this test is to create a PackingList
# Before the Order is in state confirmed.
# And then, check the behaviour of isDivergent
# when the Order is modified
sequence_string = '\
stepCreateOrganisation1 \
stepCreateOrganisation2 \
stepCreateOrder \
stepSetOrderProfile \
stepCreateNotVariatedResource \
stepTic \
stepCreateOrderLine \
stepSetOrderLineResource \
stepSetOrderLineDefaultValues \
stepOrderOrder \
stepTic \
stepConfirmOrder \
stepTic \
stepCheckDeliveryBuilding \
stepModifyOrderLineQuantity \
stepTic \
stepPackingListAdoptPrevision \
stepTic \
stepCheckOrderSimulation \
stepCheckDeliveryBuilding \
stepCheckOrderSimulationStable \
'
# sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
......
......@@ -59,6 +59,7 @@ class TestPackingListMixin(TestOrderMixin):
"""
Test business template erp5_trade
"""
packable_packing_list_portal_type_list = ['Sale Packing List']
container_portal_type = 'Container'
container_line_portal_type = 'Container Line'
container_cell_portal_type = 'Container Cell'
......@@ -81,7 +82,8 @@ class TestPackingListMixin(TestOrderMixin):
stepCheckDeliveryBuilding \
stepCheckPackingListIsNotDivergent '
default_sequence_with_two_lines = 'stepCreateOrganisation1 \
default_sequence_with_two_lines = '\
stepCreateOrganisation1 \
stepCreateOrganisation2 \
stepCreateOrganisation3 \
stepCreateOrder \
......@@ -104,7 +106,8 @@ class TestPackingListMixin(TestOrderMixin):
stepCheckDeliveryBuilding \
stepCheckPackingListIsNotDivergent '
variated_default_sequence = 'stepCreateOrganisation1 \
variated_default_sequence = '\
stepCreateOrganisation1 \
stepCreateOrganisation2 \
stepCreateOrganisation3 \
stepCreateOrder \
......@@ -178,7 +181,7 @@ class TestPackingListMixin(TestOrderMixin):
Test if packing list is divergent
"""
packing_list = sequence.get('packing_list')
self.assertEquals('diverged',packing_list.getCausalityState())
self.assertEquals('diverged', packing_list.getCausalityState())
def stepCheckPackingListIsNotDivergent(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -205,11 +208,12 @@ class TestPackingListMixin(TestOrderMixin):
"""
packing_list = sequence.get('packing_list')
quantity = sequence.get('line_quantity',default=self.default_quantity)
quantity = quantity -1
quantity = quantity - 1
sequence.edit(line_quantity=quantity)
for packing_list_line in packing_list.objectValues(
portal_type=self.packing_list_line_portal_type):
packing_list_line.edit(quantity=quantity)
sequence.edit(last_delta = sequence.get('last_delta', 0.0) - 1.0)
def stepIncreasePackingListLineQuantity(self, sequence=None,
sequence_list=None, **kw):
......@@ -217,9 +221,13 @@ class TestPackingListMixin(TestOrderMixin):
Set a increased quantity on packing list lines
"""
packing_list = sequence.get('packing_list')
quantity = sequence.get('line_quantity',default=self.default_quantity)
quantity = quantity + 1
sequence.edit(line_quantity=quantity)
for packing_list_line in packing_list.objectValues(
portal_type=self.packing_list_line_portal_type):
packing_list_line.edit(quantity=self.default_quantity+1)
packing_list_line.edit(quantity=quantity)
sequence.edit(last_delta = sequence.get('last_delta', 0.0) + 1.0)
def stepSplitAndDeferPackingList(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -285,14 +293,41 @@ class TestPackingListMixin(TestOrderMixin):
portal_type=self.packing_list_portal_type)
self.assertEquals(1,len(packing_list_list))
packing_list1 = sequence.get('packing_list')
last_delta = sequence.get('last_delta', 0.0)
for line in packing_list1.objectValues(
portal_type= self.packing_list_line_portal_type):
self.assertEquals(self.default_quantity+1,line.getQuantity())
self.assertEquals(self.default_quantity + last_delta,
line.getQuantity())
simulation_list = line.getDeliveryRelatedValueList(
portal_type='Simulation Movement')
self.assertEquals(len(simulation_list),1)
simulation_movement = simulation_list[0]
self.assertEquals(simulation_movement.getQuantity(),self.default_quantity+1)
self.assertEquals(self.default_quantity + last_delta,
simulation_movement.getCorrectedQuantity())
def stepCheckPackingListNotSolved(self, sequence=None, sequence_list=None, **kw):
"""
This step is specific to test_10 : the incorrectly used solver didn't
solve anything.
"""
order = sequence.get('order')
packing_list_list = order.getCausalityRelatedValueList(
portal_type=self.packing_list_portal_type)
self.assertEquals(1,len(packing_list_list))
packing_list1 = sequence.get('packing_list')
last_delta = sequence.get('last_delta', 0.0)
for line in packing_list1.objectValues(
portal_type= self.packing_list_line_portal_type):
self.assertEquals(self.default_quantity + last_delta,
line.getQuantity())
simulation_list = line.getDeliveryRelatedValueList(
portal_type='Simulation Movement')
self.assertEquals(len(simulation_list),1)
simulation_movement = simulation_list[0]
# Here we don't add last_delta, as the solver didn't do its work.
self.assertEquals(self.default_quantity,
simulation_movement.getCorrectedQuantity())
def stepChangePackingListDestination(self, sequence=None,
sequence_list=None, **kw):
......@@ -429,7 +464,7 @@ class TestPackingListMixin(TestOrderMixin):
packing_list = sequence.get('new_packing_list')
self.stepAdoptPrevision(sequence=sequence,packing_list=packing_list)
def stepAcceptDecision(self,sequence=None, sequence_list=None, **kw):
def stepAcceptDecisionPackingList(self,sequence=None, sequence_list=None, **kw):
"""
Check if simulation movement are disconnected
"""
......@@ -561,6 +596,8 @@ class TestPackingListMixin(TestOrderMixin):
not equals to the quantity of the packing list
"""
packing_list = sequence.get('packing_list')
if packing_list.getPortalType() not in \
self.packable_packing_list_portal_type_list: return
self.assertEquals(0,packing_list.isPacked())
self.assertEquals('missing',packing_list.getContainerState())
......@@ -572,6 +609,8 @@ class TestPackingListMixin(TestOrderMixin):
"""
if packing_list is None:
packing_list = sequence.get('packing_list')
if packing_list.getPortalType() not in \
self.packable_packing_list_portal_type_list: return
get_transaction().commit()
self.assertEquals(1,packing_list.isPacked())
self.assertEquals('packed',packing_list.getContainerState())
......@@ -582,6 +621,8 @@ class TestPackingListMixin(TestOrderMixin):
equals to the quantity of the packing list
"""
packing_list = sequence.get('new_packing_list')
if packing_list.getPortalType() not in \
self.packable_packing_list_portal_type_list: return
self.stepCheckPackingListIsPacked(sequence=sequence,
packing_list=packing_list)
......@@ -621,8 +662,11 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
# Test with a simply order without cell
sequence_string = self.default_sequence + '\
stepChangePackingListDestination \
stepCheckPackingListIsDivergent \
stepCheckPackingListIsCalculating \
stepAcceptDecisionPackingList \
stepTic \
stepCheckPackingListIsSolved \
stepCheckPackingListIsNotDivergent \
stepCheckSimulationDestinationUpdated \
'
sequence_list.addSequenceString(sequence_string)
......@@ -640,7 +684,7 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
sequence_string = self.default_sequence + '\
stepChangePackingListStartDate \
stepCheckPackingListIsCalculating \
stepAcceptDecision \
stepAcceptDecisionPackingList \
stepTic \
stepCheckPackingListIsSolved \
stepCheckPackingListIsNotDivergent \
......@@ -783,9 +827,14 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
def test_10_PackingListIncreaseQuantity(self, quiet=0, run=run_all_test):
"""
Change the quantity on an delivery line, then
see if the packing list is divergent and then
split and defer the packing list
- Increase the quantity on an delivery line
- check if the packing list is divergent
- Apply the "split and defer" solver to the packing list
- check that nothing was splitted and the packing list is still divergent
(reset the delta before, as we don't expect a modification)
Basically, when we apply "split and defer" to a packing list, we don't
want it to modify lines which have been increased.
"""
if not run: return
sequence_list = SequenceList()
......@@ -796,8 +845,9 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
stepCheckPackingListIsCalculating \
stepSplitAndDeferPackingList \
stepTic \
stepCheckPackingListIsSolved \
stepCheckPackingListNotSplitted \
stepCheckPackingListIsDiverged \
stepCheckPackingListIsDivergent \
stepCheckPackingListNotSolved \
'
sequence_list.addSequenceString(sequence_string)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment