diff --git a/product/ERP5/tests/testBPMCore.py b/product/ERP5/tests/testBPMCore.py index 12a243a7e6aedd8c5dcdcccec4166341004cbcb1..435c98c62b8ae8e6df418b8ef006f723142bd2c1 100644 --- a/product/ERP5/tests/testBPMCore.py +++ b/product/ERP5/tests/testBPMCore.py @@ -42,7 +42,8 @@ class TestBPMMixin(ERP5TypeTestCase): def getBusinessTemplateList(self): return ('erp5_base', 'erp5_pdm', 'erp5_trade', 'erp5_accounting', - 'erp5_invoicing', 'erp5_simplified_invoicing') + 'erp5_invoicing', 'erp5_simplified_invoicing', 'erp5_simulation', + 'erp5_simulation_test') business_process_portal_type = 'Business Process' business_link_portal_type = 'Business Link' diff --git a/product/ERP5/tests/testERP5SimulationBPMCore.py b/product/ERP5/tests/testERP5SimulationBPMCore.py deleted file mode 100644 index 135bf8d0b323316d644685324d9f4387d649c0b3..0000000000000000000000000000000000000000 --- a/product/ERP5/tests/testERP5SimulationBPMCore.py +++ /dev/null @@ -1,168 +0,0 @@ -# -*- coding: utf-8 -*- -############################################################################## -# Copyright (c) 2010 Nexedi SA and Contributors. All Rights Reserved. -# -# WARNING: This program as such is intended to be used by professional -# programmers who take the whole responsibility of assessing all potential -# consequences resulting from its eventual inadequacies and bugs -# End users who are looking for a ready-to-use solution with commercial -# guarantees and support are strongly advised to contract a Free Software -# Service Company -# -# This program is Free Software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -# -############################################################################## -import transaction - -from Products.ERP5.tests.testBPMCore import TestBPMMixin, test_suite - -if True: - def getBusinessTemplateList(self): - return ('erp5_base', 'erp5_pdm', 'erp5_trade', 'erp5_accounting', - 'erp5_invoicing', 'erp5_simplified_invoicing', 'erp5_simulation') - - TestBPMMixin.getBusinessTemplateList = getBusinessTemplateList - - def createInvoiceTransactionRule(self): - self.receivable_account = self.createAndValidateAccount('receivable', - 'asset/receivable') - self.payable_account = self.createAndValidateAccount('payable', - 'liability/payable') - self.income_account = self.createAndValidateAccount('income', 'income') - self.expense_account = self.createAndValidateAccount('expense', 'expense') - self.collected_tax_account = self.createAndValidateAccount( - 'collected_tax', 'liability/payable/collected_vat') - self.refundable_tax_account = self.createAndValidateAccount( - 'refundable_tax', - 'asset/receivable/refundable_vat') - - itr = self.portal.portal_rules.newContent( - portal_type='Invoice Transaction Simulation Rule', - reference='default_invoice_transaction_rule', - id='test_invoice_transaction_simulation_rule', - title='Transaction Simulation Rule', - test_method_id= - 'SimulationMovement_testInvoiceTransactionSimulationRule', - version=100) - # matching provider for source and destination - for category in ('resource', 'source', 'destination', - 'destination_total_asset_price', - 'source_total_asset_price'): - itr.newContent( - portal_type='Category Membership Divergence Tester', - title='%s divergence tester' % category, - tested_property=category, - divergence_provider=False, - matching_provider=True) - # non-matching/non-divergence provider quantity divergence tester - # (i.e. only used for expand) - itr.newContent( - portal_type='Net Converted Quantity Divergence Tester', - title='quantity divergence tester', - tested_property='quantity', - quantity=0, - divergence_provider=False, - matching_provider=False) - # divergence provider for date - for property_id in ('start_date', 'stop_date'): - itr.newContent( - portal_type='DateTime Divergence Tester', - title='%s divergence tester' % property_id, - tested_property=property_id, - quantity=0, - divergence_provider=True, - matching_provider=False) - for category in ('source_administration', 'source_decision', 'source_function', 'source_payment', 'source_project', 'source_section', 'destination_administration', 'destination_decision', 'destination_function', 'destination_payment', 'destination_project', 'destination_section'): - itr.newContent( - portal_type='Category Membership Divergence Tester', - title='%s divergence tester' % category, - tested_property=category, - divergence_provider=True, - matching_provider=False) - itr.newContent( - portal_type='Float Divergence Tester', - title='price divergence tester', - tested_property='price', - quantity=0, - divergence_provider=True, - matching_provider=False) - predicate = itr.newContent(portal_type='Predicate',) - predicate.edit( - string_index='use', - title='tax', - int_index=1, - membership_criterion_base_category='resource_use', - membership_criterion_category='resource_use/use/tax') - predicate = itr.newContent(portal_type='Predicate',) - predicate.edit( - string_index='use', - title='discount', - int_index=2, - membership_criterion_base_category='resource_use', - membership_criterion_category='resource_use/use/discount') - predicate = itr.newContent(portal_type='Predicate',) - predicate.edit( - string_index='use', - title='normal', - int_index=3, - membership_criterion_base_category='resource_use', - membership_criterion_category='resource_use/use/normal') - transaction.commit() - self.tic() - accounting_rule_cell_list = itr.contentValues( - portal_type='Accounting Rule Cell') - self.assertEquals(3, len(accounting_rule_cell_list)) - tax_rule_cell = itr._getOb("movement_0") - self.assertEquals(tax_rule_cell.getTitle(), 'tax') - tax_rule_cell.newContent( - portal_type='Accounting Transaction Line', - source_value=self.receivable_account, - destination_value=self.payable_account, - quantity=-1) - tax_rule_cell.newContent( - portal_type='Accounting Transaction Line', - source_value=self.collected_tax_account, - destination_value=self.refundable_tax_account, - quantity=1) - - discount_rule_cell = itr._getOb("movement_1") - self.assertEquals(discount_rule_cell.getTitle(), 'discount') - discount_rule_cell.newContent( - portal_type='Accounting Transaction Line', - source_value=self.receivable_account, - destination_value=self.payable_account, - quantity=-1) - discount_rule_cell.newContent( - portal_type='Accounting Transaction Line', - source_value=self.income_account, - destination_value=self.expense_account, - quantity=1) - - normal_rule_cell = itr._getOb("movement_2") - self.assertEquals(normal_rule_cell.getTitle(), 'normal') - normal_rule_cell.newContent( - portal_type='Accounting Transaction Line', - source_value=self.receivable_account, - destination_value=self.payable_account, - quantity=-1) - normal_rule_cell.newContent( - portal_type='Accounting Transaction Line', - source_value=self.income_account, - destination_value=self.expense_account, - quantity=1) - - itr.validate() - - TestBPMMixin.createInvoiceTransactionRule = createInvoiceTransactionRule diff --git a/product/ERP5/tests/testTradeModelLine.py b/product/ERP5/tests/testTradeModelLine.py index f2bc8b09ef3c3df70ac6f7188ce9debf71c0cbb2..834ececea34c54daf10e38d5ad8c621363a9ce8a 100644 --- a/product/ERP5/tests/testTradeModelLine.py +++ b/product/ERP5/tests/testTradeModelLine.py @@ -30,7 +30,7 @@ import unittest import transaction -from Products.ERP5.tests.testERP5SimulationBPMCore import TestBPMMixin +from Products.ERP5.tests.testBPMCore import TestBPMMixin from Products.ERP5Type.tests.backportUnittest import expectedFailure from Products.ERP5Type.tests.Sequence import SequenceList from DateTime import DateTime diff --git a/product/ERP5Legacy/tests/testLegacyBPMCore.py b/product/ERP5Legacy/tests/testLegacyBPMCore.py new file mode 100644 index 0000000000000000000000000000000000000000..d828a3b819d9fc54a3506f15589705ed926931c9 --- /dev/null +++ b/product/ERP5Legacy/tests/testLegacyBPMCore.py @@ -0,0 +1,1352 @@ +# -*- coding: utf-8 -*- +############################################################################## +# Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved. +# 艁ukasz Nowak <luke@nexedi.com> +# Yusuke Muraoka <yusuke@nexedi.com> +# Fabien Morin <fabien@nexedi.com> +# +# WARNING: This program as such is intended to be used by professional +# programmers who take the whole responsibility of assessing all potential +# consequences resulting from its eventual inadequacies and bugs +# End users who are looking for a ready-to-use solution with commercial +# guarantees and support are strongly adviced to contract a Free Software +# Service Company +# +# This program is Free Software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +############################################################################## + +import unittest +import transaction + +from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase +from DateTime import DateTime + +from Products.CMFCore.utils import getToolByName +from Products.ERP5Type.tests.utils import reindex + +class TestBPMMixin(ERP5TypeTestCase): + """Skeletons for tests which depend on BPM""" + + def getBusinessTemplateList(self): + return ('erp5_base', 'erp5_pdm', 'erp5_trade', 'erp5_accounting', + 'erp5_invoicing', 'erp5_simplified_invoicing') + + business_process_portal_type = 'Business Process' + business_path_portal_type = 'Business Path' + business_state_portal_type = 'Business State' + + normal_resource_use_category_list = ['normal'] + invoicing_resource_use_category_list = ['discount', 'tax'] + + def createCategoriesInCategory(self, category, category_id_list): + for category_id in category_id_list: + if not category.hasObject(category_id): + category.newContent(portal_type='Category', id = category_id, + title = category_id) + + @reindex + def createCategories(self): + category_tool = getToolByName(self.portal, 'portal_categories') + self.createCategoriesInCategory(category_tool.base_amount, ['discount', + 'tax', 'total_tax', 'total_discount', 'total']) + self.createCategoriesInCategory(category_tool.use, + self.normal_resource_use_category_list + \ + self.invoicing_resource_use_category_list) + self.createCategoriesInCategory(category_tool.trade_phase, ['default',]) + self.createCategoriesInCategory(category_tool.trade_phase.default, + ['accounting', 'delivery', 'invoicing', 'discount', 'tax', 'payment']) + + @reindex + def createBusinessProcess(self, **kw): + module = self.portal.getDefaultModule( + portal_type=self.business_process_portal_type) + return module.newContent(portal_type=self.business_process_portal_type, + **kw) + + @reindex + def createBusinessPath(self, business_process=None, **kw): + if business_process is None: + business_process = self.createBusinessProcess() + kw['destination_method_id'] = kw.pop('destination_method_id', + 'BusinessPath_getDefaultDestinationList') + kw['source_method_id'] = kw.pop('source_method_id', + 'BusinessPath_getDefaultSourceList') + business_path = business_process.newContent( + portal_type=self.business_path_portal_type, **kw) + return business_path + + @reindex + def createBusinessState(self, business_process=None, **kw): + if business_process is None: + business_process = self.createBusinessProcess() + business_path = business_process.newContent( + portal_type=self.business_state_portal_type, **kw) + return business_path + + def createMovement(self): + # returns a movement for testing + applied_rule = self.portal.portal_simulation.newContent( + portal_type='Applied Rule') + return applied_rule.newContent(portal_type='Simulation Movement') + + @reindex + def createAndValidateAccount(self, account_id, account_type): + account_module = self.portal.account_module + account = account_module.newContent(portal_type='Account', + title=account_id, + account_type=account_type) + self.assertNotEqual(None, account.getAccountTypeValue()) + account.validate() + return account + + def createInvoiceTransactionRule(self): + self.receivable_account = self.createAndValidateAccount('receivable', + 'asset/receivable') + self.payable_account = self.createAndValidateAccount('payable', + 'liability/payable') + self.income_account = self.createAndValidateAccount('income', 'income') + self.expense_account = self.createAndValidateAccount('expense', 'expense') + self.collected_tax_account = self.createAndValidateAccount( + 'collected_tax', 'liability/payable/collected_vat') + self.refundable_tax_account = self.createAndValidateAccount( + 'refundable_tax', + 'asset/receivable/refundable_vat') + + itr = self.portal.portal_rules.newContent( + portal_type='Invoice Transaction Simulation Rule', + reference='default_invoice_transaction_rule', + id='test_invoice_transaction_simulation_rule', + title='Transaction Simulation Rule', + test_method_id= + 'SimulationMovement_testInvoiceTransactionSimulationRule', + version=100) + predicate = itr.newContent(portal_type='Predicate',) + predicate.edit( + string_index='use', + title='tax', + int_index=1, + membership_criterion_base_category='resource_use', + membership_criterion_category='resource_use/use/tax') + predicate = itr.newContent(portal_type='Predicate',) + predicate.edit( + string_index='use', + title='discount', + int_index=2, + membership_criterion_base_category='resource_use', + membership_criterion_category='resource_use/use/discount') + predicate = itr.newContent(portal_type='Predicate',) + predicate.edit( + string_index='use', + title='normal', + int_index=3, + membership_criterion_base_category='resource_use', + membership_criterion_category='resource_use/use/normal') + transaction.commit() + self.tic() + accounting_rule_cell_list = itr.contentValues( + portal_type='Accounting Rule Cell') + self.assertEquals(3, len(accounting_rule_cell_list)) + tax_rule_cell = itr._getOb("movement_0") + self.assertEquals(tax_rule_cell.getTitle(), 'tax') + tax_rule_cell.newContent( + portal_type='Accounting Transaction Line', + source_value=self.receivable_account, + destination_value=self.payable_account, + quantity=-1) + tax_rule_cell.newContent( + portal_type='Accounting Transaction Line', + source_value=self.collected_tax_account, + destination_value=self.refundable_tax_account, + quantity=1) + + discount_rule_cell = itr._getOb("movement_1") + self.assertEquals(discount_rule_cell.getTitle(), 'discount') + discount_rule_cell.newContent( + portal_type='Accounting Transaction Line', + source_value=self.receivable_account, + destination_value=self.payable_account, + quantity=-1) + discount_rule_cell.newContent( + portal_type='Accounting Transaction Line', + source_value=self.income_account, + destination_value=self.expense_account, + quantity=1) + + normal_rule_cell = itr._getOb("movement_2") + self.assertEquals(normal_rule_cell.getTitle(), 'normal') + normal_rule_cell.newContent( + portal_type='Accounting Transaction Line', + source_value=self.receivable_account, + destination_value=self.payable_account, + quantity=-1) + normal_rule_cell.newContent( + portal_type='Accounting Transaction Line', + source_value=self.income_account, + destination_value=self.expense_account, + quantity=1) + + itr.validate() + + def afterSetUp(self): + self.validateRules() + self.createCategories() + self.createInvoiceTransactionRule() + self.stepTic() + + def beforeTearDown(self): + # abort any transaction + transaction.abort() + # put non finished activities into ignored state + activity_connection = self.portal.cmf_activity_sql_connection + for table in 'message', 'message_queue': + activity_connection.manage_test( + 'delete from %s where processing_node=-2' % table) + # remove not needed rules + self.portal.portal_rules.manage_delObjects( + ids=['test_invoice_transaction_simulation_rule']) + self.stepTic() + +class TestBPMImplementation(TestBPMMixin): + """Business Process implementation tests""" + def test_BusinessProcess_getPathValueList(self): + business_process = self.createBusinessProcess() + + accounting_business_path = business_process.newContent( + portal_type=self.business_path_portal_type, + trade_phase='default/accounting') + + delivery_business_path = business_process.newContent( + portal_type=self.business_path_portal_type, + trade_phase='default/delivery') + + accounting_delivery_business_path = business_process.newContent( + portal_type=self.business_path_portal_type, + trade_phase=('default/accounting', 'default/delivery')) + + self.stepTic() + + self.assertSameSet( + (accounting_business_path, accounting_delivery_business_path), + business_process.getPathValueList(trade_phase='default/accounting') + ) + + self.assertSameSet( + (delivery_business_path, accounting_delivery_business_path), + business_process.getPathValueList(trade_phase='default/delivery') + ) + + self.assertSameSet( + (accounting_delivery_business_path, delivery_business_path, + accounting_business_path), + business_process.getPathValueList(trade_phase=('default/delivery', + 'default/accounting')) + ) + + def test_BusinessPathStandardCategoryAccessProvider(self): + source_node = self.portal.organisation_module.newContent( + portal_type='Organisation') + source_section_node = self.portal.organisation_module.newContent( + portal_type='Organisation') + business_path = self.createBusinessPath() + business_path.setSourceValue(source_node) + business_path.setSourceSectionValue(source_section_node) + self.assertEquals([source_node], business_path.getSourceValueList()) + self.assertEquals([source_node.getRelativeUrl()], business_path.getSourceList()) + self.assertEquals(source_node.getRelativeUrl(), + business_path.getSource(default='something')) + + def test_EmptyBusinessPathStandardCategoryAccessProvider(self): + business_path = self.createBusinessPath() + self.assertEquals(None, business_path.getSourceValue()) + self.assertEquals(None, business_path.getSource()) + self.assertEquals('something', + business_path.getSource(default='something')) + + def test_BuinessPathDynamicCategoryAccessProvider(self): + source_node = self.portal.organisation_module.newContent( + portal_type='Organisation') + source_section_node = self.portal.organisation_module.newContent( + portal_type='Organisation') + business_path = self.createBusinessPath() + business_path.setSourceMethodId('BusinessPath_getDefaultSourceList') + + context_movement = self.createMovement() + context_movement.setSourceValue(source_node) + context_movement.setSourceSectionValue(source_section_node) + self.assertEquals(None, business_path.getSourceValue()) + self.assertEquals([source_node], + business_path.getSourceValueList(context=context_movement)) + self.assertEquals([source_node.getRelativeUrl()], + business_path.getSourceList(context=context_movement)) + self.assertEquals(source_node.getRelativeUrl(), + business_path.getSource(context=context_movement, default='something')) + + def test_BuinessPathDynamicCategoryAccessProviderBusinessPathPrecedence(self): + movement_node = self.portal.organisation_module.newContent( + portal_type='Organisation') + path_node = self.portal.organisation_module.newContent( + portal_type='Organisation') + business_path = self.createBusinessPath() + business_path.setSourceMethodId('BusinessPath_getDefaultSourceList') + business_path.setSourceValue(path_node) + + context_movement = self.createMovement() + context_movement.setSourceValue(movement_node) + self.assertEquals(path_node, business_path.getSourceValue()) + self.assertEquals(path_node, + business_path.getSourceValue(context=context_movement)) + self.assertEquals([path_node], + business_path.getSourceValueList(context=context_movement)) + + def test_BuinessPathDynamicCategoryAccessProviderEmptyMovement(self): + business_path = self.createBusinessPath() + business_path.setSourceMethodId('BusinessPath_getDefaultSourceList') + + context_movement = self.createMovement() + self.assertEquals(None, business_path.getSourceValue()) + self.assertEquals(None, + business_path.getSourceValue(context=context_movement)) + self.assertEquals(None, + business_path.getSource(context=context_movement)) + self.assertEquals('something', + business_path.getSource(context=context_movement, default='something')) + + def test_BusinessState_getRemainingTradePhaseList(self): + """ + This test case is described for what trade_phase is remaining after the state. + In this case, root explanation is path of between "b" and "d", and + path of between "a" and "b" has a condition which simulation state of + explanation must be "ordered" to pass the path. (*1) + But this test case will be passed the condition. + + (root explanation) + default/discount default/invoicing default/accounting + a ------------------ b ------------------- d -------------------- e + (cond="ordered") \ / + \ / + default/delivery \ / default/payment + \ / + \ / + \ / + \ / + \ / + \ / + \ / + c + """ + # define business process + business_process = self.createBusinessProcess() + business_path_a_b = self.createBusinessPath(business_process) + business_path_b_c = self.createBusinessPath(business_process) + business_path_b_d = self.createBusinessPath(business_process) + business_path_c_d = self.createBusinessPath(business_process) + business_path_d_e = self.createBusinessPath(business_process) + business_state_a = self.createBusinessState(business_process) + business_state_b = self.createBusinessState(business_process) + business_state_c = self.createBusinessState(business_process) + business_state_d = self.createBusinessState(business_process) + business_state_e = self.createBusinessState(business_process) + business_path_a_b.setPredecessorValue(business_state_a) + business_path_b_c.setPredecessorValue(business_state_b) + business_path_b_d.setPredecessorValue(business_state_b) + business_path_c_d.setPredecessorValue(business_state_c) + business_path_d_e.setPredecessorValue(business_state_d) + business_path_a_b.setSuccessorValue(business_state_b) + business_path_b_c.setSuccessorValue(business_state_c) + business_path_b_d.setSuccessorValue(business_state_d) + business_path_c_d.setSuccessorValue(business_state_d) + business_path_d_e.setSuccessorValue(business_state_e) + + # set title for debug + business_path_a_b.edit(title="a_b") + business_path_b_c.edit(title="b_c") + business_path_b_d.edit(title="b_d") + business_path_c_d.edit(title="c_d") + business_path_d_e.edit(title="d_e") + business_state_a.edit(title="a") + business_state_b.edit(title="b") + business_state_c.edit(title="c") + business_state_d.edit(title="d") + business_state_e.edit(title="e") + + # set trade_phase + business_path_a_b.edit(trade_phase=['default/discount'], + completed_state=['ordered']) # (*1) + business_path_b_c.edit(trade_phase=['default/delivery']) + business_path_b_d.edit(trade_phase=['default/invoicing']) + business_path_c_d.edit(trade_phase=['default/payment']) + business_path_d_e.edit(trade_phase=['default/accounting']) + + # mock order + order = self.portal.sale_order_module.newContent(portal_type="Sale Order") + order_line = order.newContent(portal_type="Sale Order Line") + + # make simulation + order.order() + + self.stepTic() + + applied_rule = order.getCausalityRelatedValue() + sm = applied_rule.contentValues(portal_type="Simulation Movement")[0] + sm.edit(causality_value=business_path_a_b) + + # make other movements for each business path + applied_rule.newContent(portal_type="Simulation Movement", + causality_value=business_path_b_c, + order_value=order_line) + applied_rule.newContent(portal_type="Simulation Movement", + causality_value=business_path_b_d, + order_value=order_line) + applied_rule.newContent(portal_type="Simulation Movement", + causality_value=business_path_c_d, + order_value=order_line) + applied_rule.newContent(portal_type="Simulation Movement", + causality_value=business_path_d_e, + order_value=order_line) + + self.stepTic() + + trade_phase = self.portal.portal_categories.trade_phase.default + + # assertion which getRemainingTradePhaseList must return category which will be passed + # discount is passed, business_path_a_b is already completed, because simulation state is "ordered" + self.assertEquals(set([trade_phase.delivery, + trade_phase.invoicing, + trade_phase.payment, + trade_phase.accounting]), + set(business_state_a.getRemainingTradePhaseList(order))) + self.assertEquals(set([trade_phase.delivery, + trade_phase.invoicing, + trade_phase.payment, + trade_phase.accounting]), + set(business_state_b.getRemainingTradePhaseList(order))) + self.assertEquals(set([trade_phase.payment, + trade_phase.accounting]), + set(business_state_c.getRemainingTradePhaseList(order))) + self.assertEquals(set([trade_phase.accounting]), + set(business_state_d.getRemainingTradePhaseList(order))) + + # when trade_phase_list is defined in arguments, the result is filtered by base category. + self.assertEquals(set([trade_phase.delivery, + trade_phase.accounting]), + set(business_state_a\ + .getRemainingTradePhaseList(order, + trade_phase_list=['default/delivery', + 'default/accounting']))) + + def test_BusinessPath_calculateExpectedDate(self): + """ + This test case is described for what start/stop date is expected on + each path by explanation. + In this case, root explanation is path of between "b" and "d", and + lead time and wait time is set on each path. + ("l" is lead time, "w" is wait_time) + + Each path must calculate most early day from getting most longest + path in the simulation. + + "referential_date" represents for which date have to get of explanation from reality. + + (root_explanation) + l:2, w:1 l:3, w:1 l:4, w:2 + a ------------ b -------------- d -------------- e + \ / + \ / + l:2, w:1 \ / l:3, w:0 + \ / + \ / + \ / + \ / + c + """ + # define business process + business_process = self.createBusinessProcess() + business_path_a_b = self.createBusinessPath(business_process) + business_path_b_c = self.createBusinessPath(business_process) + business_path_b_d = self.createBusinessPath(business_process) + business_path_c_d = self.createBusinessPath(business_process) + business_path_d_e = self.createBusinessPath(business_process) + business_state_a = self.createBusinessState(business_process) + business_state_b = self.createBusinessState(business_process) + business_state_c = self.createBusinessState(business_process) + business_state_d = self.createBusinessState(business_process) + business_state_e = self.createBusinessState(business_process) + business_path_a_b.setPredecessorValue(business_state_a) + business_path_b_c.setPredecessorValue(business_state_b) + business_path_b_d.setPredecessorValue(business_state_b) + business_path_c_d.setPredecessorValue(business_state_c) + business_path_d_e.setPredecessorValue(business_state_d) + business_path_a_b.setSuccessorValue(business_state_b) + business_path_b_c.setSuccessorValue(business_state_c) + business_path_b_d.setSuccessorValue(business_state_d) + business_path_c_d.setSuccessorValue(business_state_d) + business_path_d_e.setSuccessorValue(business_state_e) + + business_process.edit(referential_date='stop_date') + business_state_a.edit(title='a') + business_state_b.edit(title='b') + business_state_c.edit(title='c') + business_state_d.edit(title='d') + business_state_e.edit(title='e') + business_path_a_b.edit(title='a_b', lead_time=2, wait_time=1) + business_path_b_c.edit(title='b_c', lead_time=2, wait_time=1) + business_path_b_d.edit(title='b_d', lead_time=3, wait_time=1) + business_path_c_d.edit(title='c_d', lead_time=3, wait_time=0) + business_path_d_e.edit(title='d_e', lead_time=4, wait_time=2) + + # root explanation + business_path_b_d.edit(deliverable=True) + self.stepTic() + + """ + Basic test, lead time of reality and simulation are consistent. + """ + class Mock: + def __init__(self, date): + self.date = date + def getStartDate(self): + return self.date + def getStopDate(self): + return self.date + 3 # lead time of reality + + base_date = DateTime('2009/04/01 GMT+9') + mock = Mock(base_date) + + # root explanation. + self.assertEquals(business_path_b_d.getExpectedStartDate(mock), DateTime('2009/04/01 GMT+9')) + self.assertEquals(business_path_b_d.getExpectedStopDate(mock), DateTime('2009/04/04 GMT+9')) + + # assertion for each path without root explanation. + self.assertEquals(business_path_a_b.getExpectedStartDate(mock), DateTime('2009/03/27 GMT+9')) + self.assertEquals(business_path_a_b.getExpectedStopDate(mock), DateTime('2009/03/29 GMT+9')) + self.assertEquals(business_path_b_c.getExpectedStartDate(mock), DateTime('2009/03/30 GMT+9')) + self.assertEquals(business_path_b_c.getExpectedStopDate(mock), DateTime('2009/04/01 GMT+9')) + self.assertEquals(business_path_c_d.getExpectedStartDate(mock), DateTime('2009/04/01 GMT+9')) + self.assertEquals(business_path_c_d.getExpectedStopDate(mock), DateTime('2009/04/04 GMT+9')) + self.assertEquals(business_path_d_e.getExpectedStartDate(mock), DateTime('2009/04/06 GMT+9')) + self.assertEquals(business_path_d_e.getExpectedStopDate(mock), DateTime('2009/04/10 GMT+9')) + + """ + Test of illegal case, lead time of reality and simulation are inconsistent, + always reality is taken, but it depends on which date(e.g. start_date and stop_date) is referential. + + How we know which is referential, currently implementation of it can be known by + BusinessProcess.isStartDateReferential and BusinessProcess.isStopDateReferential. + + In this test case, stop_date on business_path_b_d is referential, because business_path_b_d is + root explanation and business_process refer to stop_date as referential. + + calculation example(when referential date is 2009/04/06 GMT+9): + start_date of business_path_b_d = referential_date - 3(lead_time of business_path_b_d) + = 2009/04/06 GMT+9 - 3 + = 2009/04/03 GMT+9 + """ + class Mock: + def __init__(self, date): + self.date = date + def getStartDate(self): + return self.date + def getStopDate(self): + return self.date + 5 # changed + + base_date = DateTime('2009/04/01 GMT+9') + mock = Mock(base_date) + + self.assertEquals(business_path_b_d.getExpectedStartDate(mock), DateTime('2009/04/03 GMT+9')) + # This is base in this context, because referential_date is 'stop_date' + self.assertEquals(business_path_b_d.getExpectedStopDate(mock), DateTime('2009/04/06 GMT+9')) + + # assertion for each path without root explanation. + self.assertEquals(business_path_a_b.getExpectedStartDate(mock), DateTime('2009/03/29 GMT+9')) + self.assertEquals(business_path_a_b.getExpectedStopDate(mock), DateTime('2009/03/31 GMT+9')) + self.assertEquals(business_path_b_c.getExpectedStartDate(mock), DateTime('2009/04/01 GMT+9')) + self.assertEquals(business_path_b_c.getExpectedStopDate(mock), DateTime('2009/04/03 GMT+9')) + self.assertEquals(business_path_c_d.getExpectedStartDate(mock), DateTime('2009/04/03 GMT+9')) + self.assertEquals(business_path_c_d.getExpectedStopDate(mock), DateTime('2009/04/06 GMT+9')) + self.assertEquals(business_path_d_e.getExpectedStartDate(mock), DateTime('2009/04/08 GMT+9')) + self.assertEquals(business_path_d_e.getExpectedStopDate(mock), DateTime('2009/04/12 GMT+9')) + + def testBPMCopyAndPaste(self): + business_process = self.createBusinessProcess() + state = business_process.newContent( + portal_type=self.business_state_portal_type) + path = business_process.newContent( + portal_type=self.business_path_portal_type, predecessor_value=state, + successor_value=state) + transaction.commit() + self.tic() + + pasted_business_process = business_process.Base_createCloneDocument( + batch_mode=1) + transaction.commit() + self.tic() + + pasted_path = pasted_business_process.contentValues( + portal_type=self.business_path_portal_type)[0] + pasted_state = pasted_business_process.contentValues( + portal_type=self.business_state_portal_type)[0] + + self.assertEqual(pasted_state, pasted_path.getSuccessorValue()) + self.assertEqual(pasted_state, pasted_path.getPredecessorValue()) + +class TestBPMDummyDeliveryMovementMixin(TestBPMMixin): + def _createDelivery(self, **kw): + return self.folder.newContent(portal_type='Dummy Delivery', **kw) + + def _createMovement(self, delivery, **kw): + return delivery.newContent(portal_type='Dummy Movement', **kw) + + def getBusinessTemplateList(self): + return TestBPMMixin.getBusinessTemplateList(self) \ + + ('erp5_dummy_movement', ) + + def afterSetUp(self): + TestBPMMixin.afterSetUp(self) + if not hasattr(self.portal, 'testing_folder'): + self.portal.newContent(portal_type='Folder', + id='testing_folder') + self.folder = self.portal.testing_folder + self.stepTic() + + def beforeTearDown(self): + TestBPMMixin.beforeTearDown(self) + self.portal.deleteContent(id='testing_folder') + self.stepTic() + + completed_state = 'delivered' + frozen_state = 'confirmed' + + completed_state_list = [completed_state, frozen_state] + frozen_state_list = [frozen_state] + + def _createOrderedDeliveredInvoicedBusinessProcess(self): + # simple business process preparation + business_process = self.createBusinessProcess() + ordered = self.createBusinessState(business_process) + delivered = self.createBusinessState(business_process) + invoiced = self.createBusinessState(business_process) + + # path which is completed, as soon as related simulation movements are in + # proper state + self.order_path = self.createBusinessPath(business_process, + successor_value = ordered, + trade_phase='default/order', + completed_state_list = self.completed_state_list, + frozen_state_list = self.frozen_state_list) + + self.delivery_path = self.createBusinessPath(business_process, + predecessor_value = ordered, successor_value = delivered, + trade_phase='default/delivery', + completed_state_list = self.completed_state_list, + frozen_state_list = self.frozen_state_list) + + self.invoice_path = self.createBusinessPath(business_process, + predecessor_value = delivered, successor_value = invoiced, + trade_phase='default/invoicing') + self.stepTic() + + def _createOrderedInvoicedDeliveredBusinessProcess(self): + business_process = self.createBusinessProcess() + ordered = self.createBusinessState(business_process) + delivered = self.createBusinessState(business_process) + invoiced = self.createBusinessState(business_process) + + self.order_path = self.createBusinessPath(business_process, + successor_value = ordered, + trade_phase='default/order', + completed_state_list = self.completed_state_list, + frozen_state_list = self.frozen_state_list) + + self.invoice_path = self.createBusinessPath(business_process, + predecessor_value = ordered, successor_value = invoiced, + trade_phase='default/invoicing', + completed_state_list = self.completed_state_list, + frozen_state_list = self.frozen_state_list) + + self.delivery_path = self.createBusinessPath(business_process, + predecessor_value = invoiced, successor_value = delivered, + trade_phase='default/delivery') + self.stepTic() + +class TestBPMisBuildableImplementation(TestBPMDummyDeliveryMovementMixin): + def test_isBuildable_OrderedDeliveredInvoiced(self): + """Test isBuildable for ordered, delivered and invoiced sequence + + Here Business Process sequence corresponds simulation tree. + + delivery_path is related to root applied rule, and invoice_path is related + to rule below, and invoice_path is after delivery_path + """ + self._createOrderedDeliveredInvoicedBusinessProcess() + # create order and order line to have starting point for business process + order = self._createDelivery() + order_line = self._createMovement(order) + + # first level rule with simulation movement + applied_rule = self.portal.portal_simulation.newContent( + portal_type='Applied Rule', causality_value=order) + + simulation_movement = applied_rule.newContent( + portal_type = 'Simulation Movement', + delivery_value = order_line, + causality_value = self.order_path + ) + + # second level rule with simulation movement + delivery_rule = simulation_movement.newContent( + portal_type='Applied Rule') + delivery_simulation_movement = delivery_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.delivery_path) + + # third level rule with simulation movement + invoicing_rule = delivery_simulation_movement.newContent( + portal_type='Applied Rule') + invoicing_simulation_movement = invoicing_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.invoice_path) + + # split simulation movement for first level applied rule + split_simulation_movement = applied_rule.newContent( + portal_type = 'Simulation Movement', delivery_value = order_line, + causality_value = self.order_path) + + # second level rule with simulation movement for split parent movement + split_delivery_rule = split_simulation_movement.newContent( + portal_type='Applied Rule') + split_delivery_simulation_movement = split_delivery_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.delivery_path) + + # third level rule with simulation movement for split parent movement + split_invoicing_rule = split_delivery_simulation_movement.newContent( + portal_type='Applied Rule') + split_invoicing_simulation_movement = split_invoicing_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.invoice_path) + + order.setSimulationState(self.completed_state) + self.stepTic() + + # in the beginning only order related movements shall be buildable + self.assertEquals(self.delivery_path.isBuildable(order), True) + self.assertEquals(delivery_simulation_movement.isBuildable(), True) + self.assertEquals(split_delivery_simulation_movement.isBuildable(), True) + + self.assertEquals(self.invoice_path.isBuildable(order), False) + self.assertEquals(invoicing_simulation_movement.isBuildable(), False) + self.assertEquals(split_invoicing_simulation_movement.isBuildable(), + False) + + # add delivery + delivery = self._createDelivery(causality_value = order) + delivery_line = self._createMovement(delivery) + + # relate not split movement with delivery (deliver it) + delivery_simulation_movement.edit(delivery_value = delivery_line) + + self.stepTic() + + # delivery_path (for order) is still buildable, as split movement is not + # delivered yet + # + # invoice_path is not yet buildable, delivery is in inproper simulation + # state + # + # delivery_path (for delivery) is not buildable - delivery is already + # built for those movements + self.assertEquals(self.delivery_path.isBuildable(order), True) + self.assertEquals(split_delivery_simulation_movement.isBuildable(), True) + + self.assertEquals(self.delivery_path.isBuildable(delivery), False) + self.assertEquals(self.invoice_path.isBuildable(delivery), False) + self.assertEquals(delivery_simulation_movement.isBuildable(), False) + self.assertEquals(invoicing_simulation_movement.isBuildable(), False) + self.assertEquals(self.invoice_path.isBuildable(order), False) + self.assertEquals(split_invoicing_simulation_movement.isBuildable(), + False) + + # put delivery in simulation state configured on path (and this state is + # available directly on movements) + + delivery.setSimulationState(self.completed_state) + + self.assertEqual(self.completed_state, delivery.getSimulationState()) + + self.stepTic() + + # delivery_path (for order) is still buildable, as split movement is not + # delivered yet + # + # invoicing_path (for delivery and order) is buildable - in case of order, + # because part of tree is buildable + # + # split movement for invoicing is not buildable - no proper delivery + # related for previous path + self.assertEquals(self.delivery_path.isBuildable(order), True) + self.assertEquals(invoicing_simulation_movement.isBuildable(), True) + self.assertEquals(self.invoice_path.isBuildable(delivery), True) + + # XXX look at comments in BusinessPath.isBuildable + self.assertEquals(self.invoice_path.isBuildable(order), True) + + self.assertEquals(self.delivery_path.isBuildable(delivery), False) + self.assertEquals(delivery_simulation_movement.isBuildable(), False) + self.assertEquals(split_invoicing_simulation_movement.isBuildable(), + False) + + def test_isBuildable_OrderedInvoicedDelivered(self): + """Test isBuildable for ordered, invoiced and delivered sequence + + Here Business Process sequence do not corresponds simulation tree. + + delivery_path is related to root applied rule, and invoice_path is related + to rule below, but invoice_path is before delivery_path in seuqence. + """ + self._createOrderedInvoicedDeliveredBusinessProcess() + + order = self._createDelivery() + order_line = self._createMovement(order) + + applied_rule = self.portal.portal_simulation.newContent( + portal_type='Applied Rule', causality_value=order) + + simulation_movement = applied_rule.newContent( + portal_type = 'Simulation Movement', + delivery_value = order_line, + causality_value = self.order_path + ) + + delivery_rule = simulation_movement.newContent( + portal_type='Applied Rule') + delivery_simulation_movement = delivery_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.delivery_path) + + invoicing_rule = delivery_simulation_movement.newContent( + portal_type='Applied Rule') + invoicing_simulation_movement = invoicing_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.invoice_path) + + order.setSimulationState(self.completed_state) + self.stepTic() + + self.assertEquals(self.delivery_path.isBuildable(order), False) + self.assertEquals(delivery_simulation_movement.isBuildable(), False) + + self.assertEquals(self.invoice_path.isBuildable(order), True) + self.assertEquals(invoicing_simulation_movement.isBuildable(), True) + + delivery = self._createDelivery(causality_value = order) + delivery_line = self._createMovement(delivery) + + invoicing_simulation_movement.edit(delivery_value = delivery_line) + + self.stepTic() + + self.assertEquals(self.delivery_path.isBuildable(order), False) + + self.assertEquals(self.delivery_path.isBuildable(delivery), False) + self.assertEquals(self.invoice_path.isBuildable(delivery), False) + self.assertEquals(delivery_simulation_movement.isBuildable(), False) + self.assertEquals(invoicing_simulation_movement.isBuildable(), False) + self.assertEquals(self.invoice_path.isBuildable(order), False) + + # put delivery in simulation state configured on path (and this state is + # available directly on movements) + + delivery.setSimulationState(self.completed_state) + + self.assertEqual(self.completed_state, delivery.getSimulationState()) + + self.stepTic() + + self.assertEquals(self.delivery_path.isBuildable(order), True) + self.assertEquals(self.delivery_path.isBuildable(delivery), True) + self.assertEquals(invoicing_simulation_movement.isBuildable(), False) + self.assertEquals(self.invoice_path.isBuildable(delivery), False) + self.assertEquals(self.invoice_path.isBuildable(order), False) + self.assertEquals(delivery_simulation_movement.isBuildable(), True) + + # now simulate compensation + + compensated_simulation_movement = delivery_rule.newContent( + portal_type = 'Simulation Movement', + delivery_value = order_line, + causality_value = self.delivery_path + ) + + compensated_invoicing_rule = compensated_simulation_movement.newContent( + portal_type='Applied Rule') + + compensated_invoicing_simulation_movement = compensated_invoicing_rule \ + .newContent(portal_type='Simulation Movement', + causality_value = self.invoice_path) + + # and delivery some part of tree + + another_delivery = self._createDelivery(causality_value = delivery) + another_delivery_line = self._createMovement(another_delivery) + + delivery_simulation_movement.edit(delivery_value=another_delivery_line) + + self.stepTic() + + self.assertEquals(self.delivery_path.isBuildable(order), False) + + self.assertEquals(delivery_simulation_movement.isBuildable(), False) + self.assertEquals(invoicing_simulation_movement.isBuildable(), False) + + self.assertEquals(self.invoice_path.isBuildable(order), True) + self.assertEquals(compensated_invoicing_simulation_movement.isBuildable(), + True) + + self.assertEquals(compensated_simulation_movement.isBuildable(), False) + +class TestBPMisCompletedImplementation(TestBPMDummyDeliveryMovementMixin): + def test_isCompleted_OrderedDeliveredInvoiced(self): + """Test isCompleted for ordered, delivered and invoiced sequence""" + self._createOrderedDeliveredInvoicedBusinessProcess() + + # create order and order line to have starting point for business process + order = self._createDelivery() + order_line = self._createMovement(order) + + # first level rule with simulation movement + applied_rule = self.portal.portal_simulation.newContent( + portal_type='Applied Rule', causality_value=order) + + simulation_movement = applied_rule.newContent( + portal_type = 'Simulation Movement', + delivery_value = order_line, + causality_value = self.order_path + ) + + # second level rule with simulation movement + delivery_rule = simulation_movement.newContent( + portal_type='Applied Rule') + delivery_simulation_movement = delivery_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.delivery_path) + + # third level rule with simulation movement + invoicing_rule = delivery_simulation_movement.newContent( + portal_type='Applied Rule') + invoicing_simulation_movement = invoicing_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.invoice_path) + + # split simulation movement for first level applied rule + split_simulation_movement = applied_rule.newContent( + portal_type = 'Simulation Movement', delivery_value = order_line, + causality_value = self.order_path) + + # second level rule with simulation movement for split parent movement + split_delivery_rule = split_simulation_movement.newContent( + portal_type='Applied Rule') + split_delivery_simulation_movement = split_delivery_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.delivery_path) + + # third level rule with simulation movement for split parent movement + split_invoicing_rule = split_delivery_simulation_movement.newContent( + portal_type='Applied Rule') + split_invoicing_simulation_movement = split_invoicing_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.invoice_path) + + self.stepTic() + + self.assertEqual(self.delivery_path.isCompleted(order), False) + self.assertEqual(self.delivery_path.isPartiallyCompleted(order), False) + + self.assertEqual(self.invoice_path.isCompleted(order), False) + self.assertEqual(self.invoice_path.isPartiallyCompleted(order), False) + + # add delivery + delivery = self._createDelivery(causality_value = order) + delivery_line = self._createMovement(delivery) + + # relate not split movement with delivery (deliver it) + delivery_simulation_movement.edit(delivery_value = delivery_line) + + self.stepTic() + + # nothing changes + self.assertEqual(self.delivery_path.isCompleted(order), False) + self.assertEqual(self.delivery_path.isPartiallyCompleted(order), False) + + self.assertEqual(self.invoice_path.isCompleted(order), False) + self.assertEqual(self.invoice_path.isPartiallyCompleted(order), False) + + # from delivery point of view everything is same + self.assertEqual(self.delivery_path.isCompleted(delivery), False) + self.assertEqual(self.delivery_path.isPartiallyCompleted(delivery), False) + + self.assertEqual(self.invoice_path.isCompleted(delivery), False) + self.assertEqual(self.invoice_path.isPartiallyCompleted(delivery), False) + + # put delivery in simulation state configured on path (and this state is + # available directly on movements) + + delivery.setSimulationState(self.completed_state) + + self.assertEqual(self.completed_state, delivery.getSimulationState()) + + self.stepTic() + + self.assertEqual(self.delivery_path.isCompleted(order), False) + self.assertEqual(self.delivery_path.isPartiallyCompleted(order), True) + + self.assertEqual(self.invoice_path.isCompleted(order), False) + self.assertEqual(self.invoice_path.isPartiallyCompleted(order), False) + + self.assertEqual(self.delivery_path.isCompleted(delivery), True) + self.assertEqual(self.delivery_path.isPartiallyCompleted(delivery), True) + + self.assertEqual(self.invoice_path.isCompleted(delivery), False) + self.assertEqual(self.invoice_path.isPartiallyCompleted(delivery), False) + + def test_isCompleted_OrderedInvoicedDelivered(self): + """Test isCompleted for ordered, invoiced and invoiced sequence""" + self._createOrderedInvoicedDeliveredBusinessProcess() + + order = self._createDelivery() + order_line = self._createMovement(order) + + applied_rule = self.portal.portal_simulation.newContent( + portal_type='Applied Rule', causality_value=order) + + simulation_movement = applied_rule.newContent( + portal_type = 'Simulation Movement', + delivery_value = order_line, + causality_value = self.delivery_path + ) + + delivery_rule = simulation_movement.newContent( + portal_type='Applied Rule') + delivery_simulation_movement = delivery_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.delivery_path) + + invoicing_rule = delivery_simulation_movement.newContent( + portal_type='Applied Rule') + invoicing_simulation_movement = invoicing_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.invoice_path) + + self.stepTic() + + self.assertEqual(self.delivery_path.isCompleted(order), False) + self.assertEqual(self.delivery_path.isPartiallyCompleted(order), False) + + self.assertEqual(self.invoice_path.isCompleted(order), False) + self.assertEqual(self.invoice_path.isPartiallyCompleted(order), False) + + delivery = self._createDelivery(causality_value = order) + delivery_line = self._createMovement(delivery) + + invoicing_simulation_movement.edit(delivery_value = delivery_line) + + self.stepTic() + + self.assertEqual(self.delivery_path.isCompleted(order), False) + self.assertEqual(self.delivery_path.isPartiallyCompleted(order), False) + + self.assertEqual(self.invoice_path.isCompleted(order), False) + self.assertEqual(self.invoice_path.isPartiallyCompleted(order), False) + + self.assertEqual(self.delivery_path.isCompleted(delivery), False) + self.assertEqual(self.delivery_path.isPartiallyCompleted(delivery), False) + + self.assertEqual(self.invoice_path.isCompleted(delivery), False) + self.assertEqual(self.invoice_path.isPartiallyCompleted(delivery), False) + + # put delivery in simulation state configured on path (and this state is + # available directly on movements) + + delivery.setSimulationState(self.completed_state) + + self.assertEqual(self.completed_state, delivery.getSimulationState()) + + self.stepTic() + + self.assertEqual(self.delivery_path.isCompleted(order), False) + self.assertEqual(self.delivery_path.isPartiallyCompleted(order), False) + + self.assertEqual(self.invoice_path.isCompleted(order), True) + self.assertEqual(self.invoice_path.isPartiallyCompleted(order), True) + + self.assertEqual(self.delivery_path.isCompleted(delivery), False) + self.assertEqual(self.delivery_path.isPartiallyCompleted(delivery), False) + + self.assertEqual(self.invoice_path.isCompleted(delivery), True) + self.assertEqual(self.invoice_path.isPartiallyCompleted(delivery), True) + + # now simulate compensation + + compensated_simulation_movement = delivery_rule.newContent( + portal_type = 'Simulation Movement', + delivery_value = order_line, + causality_value = self.delivery_path + ) + + compensated_invoicing_rule = compensated_simulation_movement.newContent( + portal_type='Applied Rule') + + compensated_invoicing_simulation_movement = compensated_invoicing_rule \ + .newContent(portal_type='Simulation Movement', + causality_value = self.invoice_path) + + # and delivery some part of tree + + another_delivery = self._createDelivery(causality_value = delivery) + another_delivery_line = self._createMovement(another_delivery) + + delivery_simulation_movement.edit(delivery_value=another_delivery_line) + + self.stepTic() + + self.assertEqual(self.delivery_path.isCompleted(order), False) + self.assertEqual(self.delivery_path.isPartiallyCompleted(order), False) + + self.assertEqual(self.invoice_path.isCompleted(order), False) + self.assertEqual(self.invoice_path.isPartiallyCompleted(order), True) + + self.assertEqual(self.delivery_path.isCompleted(delivery), False) + self.assertEqual(self.delivery_path.isPartiallyCompleted(delivery), False) + + self.assertEqual(self.invoice_path.isCompleted(delivery), True) + self.assertEqual(self.invoice_path.isPartiallyCompleted(delivery), True) + +class TestBPMisFrozenImplementation(TestBPMDummyDeliveryMovementMixin): + def test_isFrozen_OrderedDeliveredInvoiced(self): + """Test isFrozen for ordered, delivered and invoiced sequence""" + self._createOrderedDeliveredInvoicedBusinessProcess() + + # create order and order line to have starting point for business process + order = self._createDelivery() + order_line = self._createMovement(order) + + # first level rule with simulation movement + applied_rule = self.portal.portal_simulation.newContent( + portal_type='Applied Rule', causality_value=order) + + simulation_movement = applied_rule.newContent( + portal_type = 'Simulation Movement', + delivery_value = order_line, + causality_value = self.delivery_path + ) + + # second level rule with simulation movement + delivery_rule = simulation_movement.newContent( + portal_type='Applied Rule') + delivery_simulation_movement = delivery_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.delivery_path) + + # third level rule with simulation movement + invoicing_rule = delivery_simulation_movement.newContent( + portal_type='Applied Rule') + invoicing_simulation_movement = invoicing_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.invoice_path) + + # split simulation movement for first level applied rule + split_simulation_movement = applied_rule.newContent( + portal_type = 'Simulation Movement', delivery_value = order_line, + causality_value = self.order_path) + + # second level rule with simulation movement for split parent movement + split_delivery_rule = split_simulation_movement.newContent( + portal_type='Applied Rule') + split_delivery_simulation_movement = split_delivery_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.delivery_path) + + # third level rule with simulation movement for split parent movement + split_invoicing_rule = split_delivery_simulation_movement.newContent( + portal_type='Applied Rule') + split_invoicing_simulation_movement = split_invoicing_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.invoice_path) + + self.stepTic() + + self.assertEqual(self.delivery_path.isFrozen(order), False) + self.assertEqual(self.invoice_path.isFrozen(order), False) + + self.assertEqual(simulation_movement.isFrozen(), False) + self.assertEqual(invoicing_simulation_movement.isFrozen(), False) + self.assertEqual(split_simulation_movement.isFrozen(), False) + self.assertEqual(split_invoicing_simulation_movement.isFrozen(), False) + + # add delivery + delivery = self._createDelivery(causality_value = order) + delivery_line = self._createMovement(delivery) + + # relate not split movement with delivery (deliver it) + delivery_simulation_movement.edit(delivery_value = delivery_line) + + self.stepTic() + + # nothing changes + self.assertEqual(self.delivery_path.isFrozen(order), False) + self.assertEqual(self.invoice_path.isFrozen(order), False) + + # from delivery point of view everything is same + self.assertEqual(self.delivery_path.isFrozen(delivery), False) + self.assertEqual(self.invoice_path.isFrozen(delivery), False) + + self.assertEqual(simulation_movement.isFrozen(), False) + self.assertEqual(invoicing_simulation_movement.isFrozen(), False) + self.assertEqual(split_simulation_movement.isFrozen(), False) + self.assertEqual(split_invoicing_simulation_movement.isFrozen(), False) + + # put delivery in simulation state configured on path (and this state is + # available directly on movements) + + delivery.setSimulationState(self.frozen_state) + + self.assertEqual(self.frozen_state, delivery.getSimulationState()) + + self.stepTic() + + self.assertEqual(self.delivery_path.isFrozen(order), False) + self.assertEqual(self.invoice_path.isFrozen(order), False) + self.assertEqual(self.delivery_path.isFrozen(delivery), False) + self.assertEqual(self.invoice_path.isFrozen(delivery), False) + + self.assertEqual(delivery_simulation_movement.isFrozen(), True) + self.assertEqual(invoicing_simulation_movement.isFrozen(), False) + self.assertEqual(split_simulation_movement.isFrozen(), False) + self.assertEqual(split_invoicing_simulation_movement.isFrozen(), False) + + def test_isFrozen_OrderedInvoicedDelivered(self): + """Test isFrozen for ordered, invoiced and invoiced sequence""" + self._createOrderedInvoicedDeliveredBusinessProcess() + + order = self._createDelivery() + order_line = self._createMovement(order) + + applied_rule = self.portal.portal_simulation.newContent( + portal_type='Applied Rule', causality_value=order) + + simulation_movement = applied_rule.newContent( + portal_type = 'Simulation Movement', + delivery_value = order_line, + causality_value = self.delivery_path + ) + + delivery_rule = simulation_movement.newContent( + portal_type='Applied Rule') + delivery_simulation_movement = delivery_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.delivery_path) + + invoicing_rule = delivery_simulation_movement.newContent( + portal_type='Applied Rule') + invoicing_simulation_movement = invoicing_rule.newContent( + portal_type='Simulation Movement', + causality_value = self.invoice_path) + + self.stepTic() + + self.assertEqual(self.delivery_path.isFrozen(order), False) + self.assertEqual(self.invoice_path.isFrozen(order), False) + + self.assertEqual(simulation_movement.isFrozen(), False) + self.assertEqual(invoicing_simulation_movement.isFrozen(), False) + + delivery = self._createDelivery(causality_value = order) + delivery_line = self._createMovement(delivery) + + invoicing_simulation_movement.edit(delivery_value = delivery_line) + + self.stepTic() + + self.assertEqual(self.delivery_path.isFrozen(order), False) + self.assertEqual(self.invoice_path.isFrozen(order), False) + self.assertEqual(self.delivery_path.isFrozen(delivery), False) + self.assertEqual(self.invoice_path.isFrozen(delivery), False) + + self.assertEqual(simulation_movement.isFrozen(), False) + self.assertEqual(invoicing_simulation_movement.isFrozen(), False) + + # put delivery in simulation state configured on path (and this state is + # available directly on movements) + + delivery.setSimulationState(self.frozen_state) + + self.assertEqual(self.frozen_state, delivery.getSimulationState()) + + self.stepTic() + + self.assertEqual(self.delivery_path.isFrozen(order), False) + self.assertEqual(self.invoice_path.isFrozen(order), True) + self.assertEqual(self.delivery_path.isFrozen(delivery), False) + self.assertEqual(self.invoice_path.isFrozen(delivery), True) + + self.assertEqual(simulation_movement.isFrozen(), False) + self.assertEqual(invoicing_simulation_movement.isFrozen(), True) + + # now simulate compensation + + compensated_simulation_movement = delivery_rule.newContent( + portal_type = 'Simulation Movement', + delivery_value = order_line, + causality_value = self.delivery_path + ) + + compensated_invoicing_rule = compensated_simulation_movement.newContent( + portal_type='Applied Rule') + + compensated_invoicing_simulation_movement = compensated_invoicing_rule \ + .newContent(portal_type='Simulation Movement', + causality_value = self.invoice_path) + + # and delivery some part of tree + + another_delivery = self._createDelivery(causality_value = delivery) + another_delivery_line = self._createMovement(another_delivery) + + delivery_simulation_movement.edit(delivery_value=another_delivery_line) + + self.stepTic() + + self.assertEqual(self.delivery_path.isFrozen(order), False) + + self.assertEqual(self.invoice_path.isFrozen(order), False) + + self.assertEqual(self.delivery_path.isFrozen(delivery), False) + + self.assertEqual(self.invoice_path.isFrozen(delivery), True) + + self.assertEqual(simulation_movement.isFrozen(), False) + self.assertEqual(invoicing_simulation_movement.isFrozen(), True) + + self.assertEqual(compensated_simulation_movement.isFrozen(), False) + self.assertEqual(compensated_invoicing_simulation_movement.isFrozen(), + False) + +def test_suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(TestBPMImplementation)) + suite.addTest(unittest.makeSuite(TestBPMisBuildableImplementation)) + suite.addTest(unittest.makeSuite(TestBPMisCompletedImplementation)) + suite.addTest(unittest.makeSuite(TestBPMisFrozenImplementation)) + return suite diff --git a/product/ERP5Legacy/tests/testLegacyBPMEvaluation.py b/product/ERP5Legacy/tests/testLegacyBPMEvaluation.py new file mode 100644 index 0000000000000000000000000000000000000000..21d60277e029a142a2b3df6cb7cc5a235447445f --- /dev/null +++ b/product/ERP5Legacy/tests/testLegacyBPMEvaluation.py @@ -0,0 +1,889 @@ +# -*- coding: utf-8 -*- +############################################################################## +# Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved. +# 艁ukasz Nowak <luke@nexedi.com> +# +# WARNING: This program as such is intended to be used by professional +# programmers who take the whole responsibility of assessing all potential +# consequences resulting from its eventual inadequacies and bugs +# End users who are looking for a ready-to-use solution with commercial +# guarantees and support are strongly advised to contract a Free Software +# Service Company +# +# This program is Free Software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +############################################################################## +""" +This is BPM Evaluation Test class using erp5_bpm development Business Template + +Generally it tries to use two Business Processes - one with sequence very +similar to normal ERP5 - TestBPMEvaluationDefaultProcessMixin, second one +inverted - TestBPMEvaluationDifferentProcessMixin. + +It uses only Sale path to demonstrate BPM. + +It is advised to *NOT* remove erp5_administration. +""" +import unittest +import transaction + +from Products.ERP5Legacy.tests.testLegacyBPMCore import TestBPMMixin + +from DateTime import DateTime + +class TestBPMEvaluationMixin(TestBPMMixin): + node_portal_type = 'Organisation' + order_portal_type = 'Sale Order' + order_line_portal_type = 'Sale Order Line' + packing_list_portal_type = 'Sale Packing List' + packing_list_line_portal_type = 'Sale Packing List Line' + trade_condition_portal_type = 'Sale Trade Condition' + invoice_portal_type = 'Sale Invoice Transaction' + product_portal_type = 'Product' + order_start_date = DateTime() + order_stop_date = order_start_date + 10 + + def getBusinessTemplateList(self): + return TestBPMMixin.getBusinessTemplateList(self) + ('erp5_bpm', + 'erp5_administration', 'erp5_simulation',) + + def afterSetUp(self): + TestBPMMixin.afterSetUp(self) + self._createNodes() + self._createBusinessProcess() + self._createTradeCondition() + self._createRootDocument() + self._setUpRules() + self.stepTic() + + def _setUpRules(self): + """Setups rules + + Rules are part of configuration, so anything provided by Business + Templates or previous test runs is ignored - all old rules are invalidated + between tests and new rules are created, configured and validated. + """ + self.rule_tool = self.portal.portal_rules + for rule in self.rule_tool.contentValues(): + if rule.getValidationState() == 'validated': + rule.invalidate() + transaction.commit() + self._createOrderRootSimulationRule() + self._createDeliveryRootSimulationRule() + self._createDeliverySimulationRule() + self._createInvoiceSimulationRule() + self._createInvoiceRootSimulationRule() + self._createTradeModelSimulationRule() + + def _createTradeRootSimulationRule(self, **kw): + edit_dict = {} + edit_dict.update( + trade_phase = 'default/delivery', + ) + edit_dict.update(**kw) + rule = self.rule_tool.newContent(**edit_dict) + + # matching providers + for category in ('delivery',): + rule.newContent( + portal_type='Category Membership Divergence Tester', + title='%s divergence tester' % category, + tested_property=category, + divergence_provider=False, + matching_provider=True) + + # divergence providers + for category in ('source_section', + 'resource', + 'destination_section', + 'source', + 'aggregate'): + rule.newContent( + portal_type='Category Membership Divergence Tester', + title='%s divergence tester' % category, + tested_property=category, + divergence_provider=True, + matching_provider=False) + rule.newContent( + portal_type='Net Converted Quantity Divergence Tester', + title='quantity divergence tester', + tested_property='quantity', + quantity=0, + divergence_provider=True, + matching_provider=False) + for property_id in ('start_date', 'stop_date'): + rule.newContent( + portal_type='DateTime Divergence Tester', + title='%s divergence tester' % property_id, + tested_property=property_id, + quantity=0, + divergence_provider=True, + matching_provider=False) + rule.newContent( + portal_type='Float Divergence Tester', + title='price divergence tester', + tested_property='price', + quantity=0, + divergence_provider=True, + matching_provider=False) + + return rule + + def _createOrderRootSimulationRule(self): + rule = self._createTradeRootSimulationRule(portal_type='Order Root Simulation Rule', + trade_phase='default/order', + reference='default_order_rule') + rule.validate() + transaction.commit() + + def _createDeliveryRootSimulationRule(self): + rule = self._createTradeRootSimulationRule(portal_type='Delivery Root Simulation Rule', + reference='default_delivery_rule') + rule.validate() + transaction.commit() + + def _createDeliverySimulationRule(self): + rule = self.rule_tool.newContent(portal_type='Delivery Simulation Rule', + reference='default_delivering_rule', + trade_phase='default/delivery', + test_method_id = ('SimulationMovement_testDeliverySimulationRule',) + ) + # matching providers + for category in ('resource',): + rule.newContent( + portal_type='Category Membership Divergence Tester', + title='%s divergence tester' % category, + tested_property=category, + divergence_provider=False, + matching_provider=True) + rule.newContent( + portal_type='Variation Divergence Tester', + title='variation divergence tester', + tested_property='variation_property_dict', + divergence_provider=False, + matching_provider=True) + + # divergence providers + for category in ('resource', + 'source_section', + 'destination_section', + 'source', + 'source_function', + 'destination', + 'destination_function', + 'source_project', + 'destination_project', + 'aggregate', + 'price_currency', + 'base_contribution', + 'base_application', + 'source_account', + 'destination_account', + ): + rule.newContent( + portal_type='Category Membership Divergence Tester', + title='%s divergence tester' % category, + tested_property=category, + divergence_provider=True, + matching_provider=False) + rule.newContent( + portal_type='Net Converted Quantity Divergence Tester', + title='quantity divergence tester', + tested_property='quantity', + quantity=0, + divergence_provider=True, + matching_provider=False) + for property_id in ('start_date', 'stop_date'): + rule.newContent( + portal_type='DateTime Divergence Tester', + title='%s divergence tester' % property_id, + tested_property=property_id, + quantity=0, + divergence_provider=True, + matching_provider=False) + rule.newContent( + portal_type='Float Divergence Tester', + title='price divergence tester', + tested_property='price', + quantity=0, + divergence_provider=True, + matching_provider=False) + + rule.validate() + transaction.commit() + + def _createTradeModelSimulationRule(self): + rule = self.rule_tool.newContent(portal_type='Trade Model Simulation Rule', + reference='default_trade_model_rule', + test_method_id = ('SimulationMovement_testTradeModelSimulationRule',) + ) + # matching providers + for category in ('resource',): + rule.newContent( + portal_type='Category Membership Divergence Tester', + title='%s divergence tester' % category, + tested_property=category, + divergence_provider=False, + matching_provider=True) + rule.newContent( + portal_type='Variation Divergence Tester', + title='variation divergence tester', + tested_property='variation_property_dict', + divergence_provider=False, + matching_provider=True) + + # divergence providers + for category in ('resource', + 'source_section', + 'destination_section', + 'source', + 'source_function', + 'destination_function', + 'source_project', + 'destination_project', + 'aggregate', + 'price_currency', + 'base_contribution', + 'base_application', + 'source_account', + 'destination_account', + ): + rule.newContent( + portal_type='Category Membership Divergence Tester', + title='%s divergence tester' % category, + tested_property=category, + divergence_provider=True, + matching_provider=False) + rule.newContent( + portal_type='Net Converted Quantity Divergence Tester', + title='quantity divergence tester', + tested_property='quantity', + quantity=0, + divergence_provider=True, + matching_provider=False) + for property_id in ('start_date', 'stop_date'): + rule.newContent( + portal_type='DateTime Divergence Tester', + title='%s divergence tester' % property_id, + tested_property=property_id, + quantity=0, + divergence_provider=True, + matching_provider=False) + rule.newContent( + portal_type='Float Divergence Tester', + title='price divergence tester', + tested_property='price', + quantity=0, + divergence_provider=True, + matching_provider=False) + + rule.validate() + transaction.commit() + + def _createInvoiceRootSimulationRule(self): + # Note: This is not used, but invoices, even if built from simulation, + # need those rule to create empty one applied rule + rule_tool = self.portal.portal_rules + + clipboard = rule_tool.manage_copyObjects(ids = ['new_invoice_root_simulation_rule']) + pasted = rule_tool.manage_pasteObjects(clipboard) + new_rule = getattr(rule_tool, pasted[0]['new_id']) + new_rule.validate() + transaction.commit() + + def _createInvoiceSimulationRule(self): + edit_dict = {} + edit_dict.update( + ) + rule = self.rule_tool.newContent(portal_type='Invoice Simulation Rule', + reference='default_invoicing_rule', + trade_phase = 'default/invoicing', + test_method_id = ('SimulationMovement_testInvoiceSimulationRule',) + ) + # matching providers + for category in ('resource',): + rule.newContent( + portal_type='Category Membership Divergence Tester', + title='%s divergence tester' % category, + tested_property=category, + divergence_provider=False, + matching_provider=True) + rule.newContent( + portal_type='Variation Divergence Tester', + title='variation divergence tester', + tested_property='variation_property_dict', + divergence_provider=False, + matching_provider=True) + + # divergence providers + for category in ('resource', + 'source_section', + 'destination_section', + 'source', + 'source_function', + 'destination_function', + 'source_project', + 'destination_project', + 'aggregate', + 'price_currency', + 'base_contribution', + 'base_application', + 'source_account', + 'destination_account', + ): + rule.newContent( + portal_type='Category Membership Divergence Tester', + title='%s divergence tester' % category, + tested_property=category, + divergence_provider=True, + matching_provider=False) + rule.newContent( + portal_type='Net Converted Quantity Divergence Tester', + title='quantity divergence tester', + tested_property='quantity', + quantity=0, + divergence_provider=True, + matching_provider=False) + for property_id in ('start_date', 'stop_date'): + rule.newContent( + portal_type='DateTime Divergence Tester', + title='%s divergence tester' % property_id, + tested_property=property_id, + quantity=0, + divergence_provider=True, + matching_provider=False) + rule.newContent( + portal_type='Float Divergence Tester', + title='price divergence tester', + tested_property='price', + quantity=0, + divergence_provider=True, + matching_provider=False) + + rule.validate() + transaction.commit() + + def _createDocument(self, portal_type, **kw): + module = self.portal.getDefaultModule(portal_type=portal_type) + return module.newContent(portal_type=portal_type, **kw) + + def _createProduct(self, **kw): + return self._createDocument(self.product_portal_type, **kw) + + def _createNode(self, **kw): + return self._createDocument(self.node_portal_type, **kw) + + def _createTradeCondition(self, **kw): + self.trade_condition = self._createDocument( + self.trade_condition_portal_type, + title = self.id(), + specialise_value=self.business_process, **kw) + + def _createRootDocumentLine(self, **kw): + return self.root_document.newContent( + portal_type=self.root_document_line_portal_type, **kw) + + def _createNodes(self): + self.source, self.source_section = self._createNode(), self._createNode() + self.destination, self.destination_section = self._createNode() \ + , self._createNode() + + def _createBusinessStateList(self): + """Creates list of defaults states, set them on self as name_state property""" + for state_name in ('ordered', 'delivered', 'invoiced', 'accounted', + 'paid'): + state_document = self.createBusinessState(self.business_process, + title=state_name) + setattr(self,'%s_state' % state_name, state_document) + + def _createRootDocument(self): + self.root_document = self._createDocument(self.root_document_portal_type, + source_value = self.source, + source_section_value = self.source_section, + destination_value = self.destination, + destination_section_value = self.destination_section, + start_date = self.order_start_date, + stop_date = self.order_stop_date, + specialise_value = self.trade_condition) + + def _checkBPMSimulation(self): + """Checks BPMised related simumation. + + Note: Simulation tree is the same, it is totally independent from + BPM sequence""" + # TODO: + # - gather errors into one list + bpm_root_rule = self.root_document.getCausalityRelatedValue( + portal_type='Applied Rule') + # check that correct root rule applied + self.assertEqual(bpm_root_rule.getSpecialiseValue().getPortalType(), + self.root_rule_portal_type) + root_simulation_movement_list = bpm_root_rule.contentValues() + for root_simulation_movement in root_simulation_movement_list: + self.assertEqual(root_simulation_movement.getPortalType(), + 'Simulation Movement') + movement = root_simulation_movement.getDeliveryValue() + property_problem_list = [] + # check some properties equality between delivery line and simulation + # movement, gather errors + for property in 'resource', 'price', 'start_date', 'stop_date', \ + 'source', 'destination', 'source_section', \ + 'destination_section': + if movement.getProperty(property) != root_simulation_movement \ + .getProperty(property): + property_problem_list.append('property %s movement %s ' + 'simulation %s' % (property, movement.getProperty(property), + root_simulation_movement.getProperty(property))) + if len(property_problem_list) > 0: + self.fail('\n'.join(property_problem_list)) + self.assertEqual( + movement.getQuantity() * root_simulation_movement.getDeliveryRatio(), + root_simulation_movement.getQuantity()) + # root rule is order or delivery - so below each movement invoicing one + # is expected + self.assertEquals(len(root_simulation_movement.contentValues()), 1) + if self.root_rule_portal_type == 'Order Root Simulation Rule': + delivery_rule = root_simulation_movement.contentValues()[0] + delivery_simulation_movement_list = delivery_rule.contentValues() + self.assertEqual(1, len(delivery_simulation_movement_list)) + delivery_simulation_movement = delivery_simulation_movement_list[0] + else: + delivery_simulation_movement = root_simulation_movement + for bpm_invoicing_rule in delivery_simulation_movement.contentValues(): + self.assertEqual(bpm_invoicing_rule.getPortalType(), 'Applied Rule') + self.assertEqual(bpm_invoicing_rule.getSpecialiseValue() \ + .getPortalType(), 'Invoice Simulation Rule') + # only one movement inside invoicing rule + self.assertEquals(len(bpm_invoicing_rule.contentValues()), 1) + for invoicing_simulation_movement in bpm_invoicing_rule \ + .contentValues(): + self.assertEqual(invoicing_simulation_movement.getPortalType(), + 'Simulation Movement') + self.assertEqual(invoicing_simulation_movement.getCausalityValue(), + self.invoice_path) + property_problem_list = [] + # check equality of some properties, gather them + for property in 'resource', 'price', 'start_date', \ + 'stop_date', 'source', 'destination', 'source_section', \ + 'destination_section': + if movement.getProperty(property) != \ + invoicing_simulation_movement.getProperty(property): + property_problem_list.append('property %s movement %s ' + 'simulation %s' % (property, movement.getProperty(property), + invoicing_simulation_movement.getProperty(property))) + if len(property_problem_list) > 0: + self.fail('\n'.join(property_problem_list)) + self.assertEqual( + movement.getQuantity() * root_simulation_movement.getDeliveryRatio(), + invoicing_simulation_movement.getQuantity()) + # simple check for trade model rule existence, without movements, + # as no trade condition configured + self.assertEquals( + len(invoicing_simulation_movement.contentValues()), 1) + for trade_model_rule in invoicing_simulation_movement \ + .contentValues(): + self.assertEqual(trade_model_rule.getPortalType(), 'Applied Rule') + self.assertEqual(trade_model_rule.getSpecialiseValue() \ + .getPortalType(), 'Trade Model Simulation Rule') + self.assertSameSet(trade_model_rule.contentValues( + portal_type='Simulation Movement'), []) + +class TestBPMEvaluationDefaultProcessMixin: + def _createBusinessProcess(self): + self.business_process = self.createBusinessProcess(title=self.id(), + referential_date='start_date') + self._createBusinessStateList() + + self.order_path = self.createBusinessPath(self.business_process, + successor_value=self.ordered_state, + trade_phase='default/order', + deliverable=1, + completed_state_list=['confirmed'], + frozen_state_list=['confirmed'], + ) + + self.delivery_path = self.createBusinessPath(self.business_process, + predecessor_value=self.ordered_state, + successor_value=self.delivered_state, + trade_phase='default/delivery', + deliverable=1, + completed_state_list=['started', 'stopped', 'delivered'], + frozen_state_list=['started', 'stopped', 'delivered'], + delivery_builder='portal_deliveries/bpm_sale_packing_list_builder', + ) + + self.invoice_path = self.createBusinessPath(self.business_process, + predecessor_value=self.delivered_state, + successor_value=self.invoiced_state, + completed_state_list=['delivered'], + frozen_state_list=['stopped', 'delivered'], + delivery_builder='portal_deliveries/bpm_sale_invoice_builder', + trade_phase='default/invoicing') + + self.account_path = self.createBusinessPath(self.business_process, + predecessor_value=self.invoiced_state, + successor_value=self.accounted_state, + completed_state_list=['delivered'], + frozen_state_list=['stopped', 'delivered'], + trade_phase='default/accounting') + + self.pay_path = self.createBusinessPath(self.business_process, + predecessor_value=self.invoiced_state, + successor_value=self.accounted_state, + completed_state_list=['delivered'], + frozen_state_list=['stopped', 'delivered'], + trade_phase='default/payment') + + self.stepTic() + +class TestBPMEvaluationDifferentProcessMixin: + def _createBusinessProcess(self): + self.business_process = self.createBusinessProcess(title=self.id(), + referential_date='start_date') + self._createBusinessStateList() + + self.order_path = self.createBusinessPath(self.business_process, + successor_value=self.ordered_state, + trade_phase='default/order', + deliverable=1, + completed_state_list=['confirmed'], + frozen_state_list=['confirmed'], + ) + + self.invoice_path = self.createBusinessPath(self.business_process, + predecessor_value=self.ordered_state, + successor_value=self.invoiced_state, + completed_state_list=['delivered'], + frozen_state_list=['stopped', 'delivered'], + trade_phase='default/invoicing') + + self.account_path = self.createBusinessPath(self.business_process, + predecessor_value=self.invoiced_state, + successor_value=self.accounted_state, + completed_state_list=['delivered'], + frozen_state_list=['stopped', 'delivered'], + trade_phase='default/accounting') + + self.pay_path = self.createBusinessPath(self.business_process, + predecessor_value=self.accounted_state, + successor_value=self.paid_state, + completed_state_list=['delivered'], + frozen_state_list=['stopped', 'delivered'], + trade_phase='default/payment') + + self.delivery_path = self.createBusinessPath(self.business_process, + predecessor_value=self.paid_state, + successor_value=self.delivered_state, + trade_phase='default/delivery', + deliverable=1, + completed_state_list=['delivered'], + frozen_state_list=['stopped', 'delivered']) + + self.stepTic() + +class GenericRuleTestsMixin: + """Tests which are generic for BPMised Order, Delivery and Invoice Rule""" + def test_transition(self): + self.order_line = self._createRootDocumentLine( + resource_value = self._createProduct(), quantity = 10, price = 5) + self.stepTic() + + self._doFirstTransition(self.root_document) + self.stepTic() + self._checkBPMSimulation() + + def _split(self): + """Invoke manual splitting""" + ratio = .5 # hardcoded value, hopefully float friendly + applied_rule = self.root_document.getCausalityRelatedValue( + portal_type='Applied Rule') + for movement in applied_rule.contentValues( + portal_type='Simulation Movement'): + new_movement = movement.Base_createCloneDocument(batch_mode=1) + old_quantity = movement.getQuantity() + movement.edit( + quantity = old_quantity * ratio + ) + + new_movement.edit( + quantity = old_quantity * (1 - ratio) + ) + + self.stepTic() + + # recalculate order ratio + for movement in self.root_document.getMovementList(): + movement_quantity = movement.getQuantity() + for simulation_movement in movement.getDeliveryRelatedValueList(): + new_ratio = simulation_movement.getQuantity() / movement_quantity + simulation_movement.edit(order_ratio = new_ratio) + if simulation_movement.getDelivery() is not None: + simulation_movement.edit(delivery_ratio = new_ratio) + + # reexpand + applied_rule.expand() + self.stepTic() + + self._checkBPMSimulation() + + def test_transition_split(self): + self.order_line = self._createRootDocumentLine( + resource_value = self._createProduct(), quantity = 10, price = 5) + self.stepTic() + + self._doFirstTransition(self.root_document) + self.stepTic() + self._checkBPMSimulation() + + self._split() + + # expand + self.root_document.edit(title = self.root_document.getTitle() + 'a') + + self.stepTic() + self._checkBPMSimulation() + + def test_transition_split_line_add(self): + self.test_transition_split() + self.order_line_2 = self._createRootDocumentLine( + resource_value = self._createProduct(), quantity = 4, price = 2) + self.stepTic() + self._checkBPMSimulation() + + def test_transition_split_line_add_split(self): + self.test_transition_split_line_add() + + # second split + self._split() + + # expand + self.root_document.edit(title = self.root_document.getTitle() + 'a') + + self.stepTic() + self._checkBPMSimulation() + + def test_transition_line_edit(self): + self.test_transition() + self.order_line.edit(quantity = 8, price = 6) + self.stepTic() + self._checkBPMSimulation() + + def test_transition_line_edit_add(self): + self.test_transition_line_edit() + self.order_line_2 = self._createRootDocumentLine( + resource_value = self._createProduct(), quantity = 4, price = 2) + self.stepTic() + self._checkBPMSimulation() + + def test_transition_line_edit_add_many_transactions(self): + self.test_transition_line_edit() + self.order_line_9 = self._createRootDocumentLine() + self.stepTic() + self._checkBPMSimulation() + + self.order_line_9.edit(resource_value = self._createProduct()) + self.stepTic() + self._checkBPMSimulation() + + self.order_line_9.edit(quantity = 1) + self.stepTic() + self._checkBPMSimulation() + + self.order_line_9.edit(price = 33) + self.stepTic() + self._checkBPMSimulation() + + self.order_line_9.edit(resource_value = self._createProduct()) + self.stepTic() + self._checkBPMSimulation() + + def test_transition_line_edit_add_same_resource(self): + self.test_transition_line_edit() + resource = self.order_line.getResourceValue() + self.order_line_10 = self._createRootDocumentLine( + resource_value = resource, quantity = 9, price = 2) + self.stepTic() + self._checkBPMSimulation() + + def test_transition_line_edit_add_same_resource_edit_again(self): + self.test_transition_line_edit_add_same_resource() + + self.root_document.edit(title = self.root_document.getTitle() + 'a' ) + self.stepTic() + self._checkBPMSimulation() + +class TestOrder(TestBPMEvaluationMixin, GenericRuleTestsMixin): + """Check BPMised Order Rule behaviour""" + root_document_portal_type = 'Sale Order' + root_document_line_portal_type = 'Sale Order Line' + root_rule_portal_type = 'Order Root Simulation Rule' + + def _doFirstTransition(self, document): + document.plan() + + def test_confirming(self): + self.order_line = self._createRootDocumentLine( + resource_value = self._createProduct(), quantity = 10, price = 5) + self.stepTic() + + self.root_document.confirm() + self.stepTic() + self._checkBPMSimulation() + self.assertEqual( + 2, + len(self.root_document.getCausalityRelatedList()) + ) + self.assertEqual( + 'Applied Rule', + self.root_document.getCausalityRelatedValue( + portal_type='Applied Rule').getPortalType() + ) + + self.assertEqual( + self.packing_list_portal_type, + self.root_document.getCausalityRelatedValue( + portal_type=self.packing_list_portal_type).getPortalType() + ) + +class TestPackingList(TestBPMEvaluationMixin, GenericRuleTestsMixin): + """Check BPM Delivery Rule behaviour""" + root_document_portal_type = 'Sale Packing List' + root_document_line_portal_type = 'Sale Packing List Line' + root_rule_portal_type = 'Delivery Root Simulation Rule' + + def _packDelivery(self): + """Packs delivery fully, removes possible containers before""" + self.root_document.deleteContent(self.root_document.contentIds( + filter={'portal_type':'Container'})) + cont = self.root_document.newContent(portal_type='Container') + for movement in self.root_document.getMovementList(): + cont.newContent(portal_type='Container Line', + resource = movement.getResource(), quantity = movement.getQuantity()) + self.stepTic() + self._checkBPMSimulation() + + def _doFirstTransition(self, document): + document.confirm() + + def test_starting(self): + self.delivery_line = self._createRootDocumentLine( + resource_value = self._createProduct(), quantity = 10, price = 5) + self.stepTic() + + self.root_document.confirm() + self.stepTic() + self._checkBPMSimulation() + + self._packDelivery() + + self.root_document.start() + self.stepTic() + self._checkBPMSimulation() + + self.assertEqual( + 2, + len(self.root_document.getCausalityRelatedList()) + ) + self.assertEqual( + 'Applied Rule', + self.root_document.getCausalityRelatedValue( + portal_type='Applied Rule').getPortalType() + ) + + self.assertEqual( + self.invoice_portal_type, + self.root_document.getCausalityRelatedValue( + portal_type=self.invoice_portal_type).getPortalType() + ) + +class TestInvoice(TestBPMEvaluationMixin, GenericRuleTestsMixin): + """Check BPM Invoice Rule behaviour""" + # not implemented yet + pass + +class TestOrderDefaultProcess(TestOrder, + TestBPMEvaluationDefaultProcessMixin): + pass + +class TestPackingListDefaultProcess(TestPackingList, + TestBPMEvaluationDefaultProcessMixin): + pass + +class TestInvoiceDefaultProcess(TestInvoice, + TestBPMEvaluationDefaultProcessMixin): + pass + +class TestOrderDifferentProcess(TestOrder, + TestBPMEvaluationDifferentProcessMixin): + def test_confirming(self): + # in current BPM configuration nothing shall be built + # as soon as test business process will be finished, it shall built proper + # delivery + self.order_line = self._createRootDocumentLine( + resource_value = self._createProduct(), quantity = 10, price = 5) + self.stepTic() + + self.root_document.confirm() + self.stepTic() + self._checkBPMSimulation() + self.assertEqual( + 1, + len(self.root_document.getCausalityRelatedList()) + ) + self.assertEqual( + 'Applied Rule', + self.root_document.getCausalityRelatedValue().getPortalType() + ) + +class TestPackingListDifferentProcess(TestPackingList, + TestBPMEvaluationDifferentProcessMixin): + def test_starting(self): + self.delivery_line = self._createRootDocumentLine( + resource_value = self._createProduct(), quantity = 10, price = 5) + self.stepTic() + + self.root_document.confirm() + self.stepTic() + self._checkBPMSimulation() + + self._packDelivery() + self.root_document.start() + self.stepTic() + self._checkBPMSimulation() + + self.assertEqual( + 1, + len(self.root_document.getCausalityRelatedList()) + ) + self.assertEqual( + 'Applied Rule', + self.root_document.getCausalityRelatedValue( + portal_type='Applied Rule').getPortalType() + ) + +class TestInvoiceDifferentProcess(TestInvoice, + TestBPMEvaluationDifferentProcessMixin): + pass + +def test_suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(TestOrderDefaultProcess)) + suite.addTest(unittest.makeSuite(TestPackingListDefaultProcess)) +# suite.addTest(unittest.makeSuite(TestInvoiceDefaultProcess)) + + suite.addTest(unittest.makeSuite(TestOrderDifferentProcess)) + suite.addTest(unittest.makeSuite(TestPackingListDifferentProcess)) +# suite.addTest(unittest.makeSuite(TestInvoiceDifferentProcess)) + return suite diff --git a/product/ERP5Legacy/tests/testLegacyMRP.py b/product/ERP5Legacy/tests/testLegacyMRP.py new file mode 100644 index 0000000000000000000000000000000000000000..683b8c889e58ecad4151af3d6595942bd544f8b7 --- /dev/null +++ b/product/ERP5Legacy/tests/testLegacyMRP.py @@ -0,0 +1,536 @@ +# -*- coding: utf-8 -*- +############################################################################## +# Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved. +# Yusuke Muraoka <yusuke@nexedi.com> +# +# WARNING: This program as such is intended to be used by professional +# programmers who take the whole responsibility of assessing all potential +# consequences resulting from its eventual inadequacies and bugs +# End users who are looking for a ready-to-use solution with commercial +# guarantees and support are strongly adviced to contract a Free Software +# Service Company +# +# This program is Free Software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +############################################################################## + +import unittest +import transaction + +from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase +from DateTime import DateTime + +from Products.CMFCore.utils import getToolByName +from Products.ERP5Type.tests.utils import reindex + +from Products.ERP5Legacy.tests.testLegacyBPMCore import TestBPMMixin +from Products.ERP5Type.tests.backportUnittest import skip + +class TestMRPMixin(TestBPMMixin): + transformation_portal_type = 'Transformation' + transformed_resource_portal_type = 'Transformation Transformed Resource' + product_portal_type = 'Product' + organisation_portal_type = 'Organisation' + order_portal_type = 'Production Order' + order_line_portal_type = 'Production Order Line' + + def getBusinessTemplateList(self): + return TestBPMMixin.getBusinessTemplateList(self) + ('erp5_mrp', ) + + def invalidateRules(self): + """ + do reversely of validateRules + """ + rule_tool = self.getRuleTool() + for rule in rule_tool.contentValues( + portal_type=rule_tool.getPortalRuleTypeList()): + if rule.getValidationState() == 'validated': + rule.invalidate() + + def _createDocument(self, portal_type, **kw): + module = self.portal.getDefaultModule( + portal_type=portal_type) + return self._createObject(module, portal_type, **kw) + + def _createObject(self, parent, portal_type, id=None, **kw): + o = None + if id is not None: + o = parent.get(str(id), None) + if o is None: + o = parent.newContent(portal_type=portal_type) + o.edit(**kw) + return o + + def createTransformation(self, **kw): + return self._createDocument(self.transformation_portal_type, **kw) + + def createProduct(self, **kw): + return self._createDocument(self.product_portal_type, **kw) + + def createOrganisation(self, **kw): + return self._createDocument(self.organisation_portal_type, **kw) + + def createOrder(self, **kw): + return self._createDocument(self.order_portal_type, **kw) + + def createOrderLine(self, order, **kw): + return self._createObject(order, self.order_line_portal_type, **kw) + + def createTransformedResource(self, transformation, **kw): + return self._createObject(transformation, self.transformed_resource_portal_type, **kw) + + @reindex + def createCategories(self): + category_tool = getToolByName(self.portal, 'portal_categories') + self.createCategoriesInCategory(category_tool.base_amount, ['weight']) + self.createCategoriesInCategory(category_tool.base_amount.weight, ['kg']) + self.createCategoriesInCategory(category_tool.trade_phase, ['mrp',]) + self.createCategoriesInCategory(category_tool.trade_phase.mrp, + ['p' + str(i) for i in range(5)]) # phase0 ~ 4 + + @reindex + def createDefaultOrder(self, transformation=None, business_process=None): + if transformation is None: + transformation = self.createDefaultTransformation() + if business_process is None: + business_process = self.createSimpleBusinessProcess() + + base_date = DateTime() + + order = self.createOrder(specialise_value=business_process, + start_date=base_date, + stop_date=base_date+3) + order_line = self.createOrderLine(order, + quantity=10, + resource=transformation.getResource(), + specialise_value=transformation) + # XXX in some case, specialise_value is not related to order_line by edit, + # but by setSpecialise() is ok, Why? + order_line.setSpecialiseValue(transformation) + return order + + @reindex + def createDefaultTransformation(self): + resource1 = self.createProduct(id='1', quantity_unit_list=['weight/kg']) + resource2 = self.createProduct(id='2', quantity_unit_list=['weight/kg']) + resource3 = self.createProduct(id='3', quantity_unit_list=['weight/kg']) + resource4 = self.createProduct(id='4', quantity_unit_list=['weight/kg']) + resource5 = self.createProduct(id='5', quantity_unit_list=['weight/kg']) + + transformation = self.createTransformation(resource_value=resource5) + self.createTransformedResource(transformation=transformation, + resource_value=resource1, + quantity=3, + quantity_unit_list=['weight/kg'], + trade_phase='mrp/p2') + self.createTransformedResource(transformation=transformation, + resource_value=resource2, + quantity=1, + quantity_unit_list=['weight/kg'], + trade_phase='mrp/p2') + self.createTransformedResource(transformation=transformation, + resource_value=resource3, + quantity=4, + quantity_unit_list=['weight/kg'], + trade_phase='mrp/p3') + self.createTransformedResource(transformation=transformation, + resource_value=resource4, + quantity=1, + quantity_unit_list=['weight/kg'], + trade_phase='mrp/p3') + return transformation + + @reindex + def createSimpleBusinessProcess(self): + """ mrp/p2 mrp/3 + ready -------- partial_produced ------- done + """ + business_process = self.createBusinessProcess() + business_path_p2 = self.createBusinessPath(business_process) + business_path_p3 = self.createBusinessPath(business_process) + business_state_ready = self.createBusinessState(business_process) + business_state_partial = self.createBusinessState(business_process) + business_state_done = self.createBusinessState(business_process) + + # organisations + source_section = self.createOrganisation(title='source_section') + source = self.createOrganisation(title='source') + destination_section = self.createOrganisation(title='destination_section') + destination = self.createOrganisation(title='destination') + + business_process.edit(referential_date='stop_date') + business_path_p2.edit(id='p2', + predecessor_value=business_state_ready, + successor_value=business_state_partial, + quantity=1, + trade_phase=['mrp/p2'], + source_section_value=source_section, + source_value=source, + destination_section_value=destination_section, + destination_value=destination, + ) + business_path_p3.edit(id='p3', + predecessor_value=business_state_partial, + successor_value=business_state_done, + quantity=1, + deliverable=1, # root explanation + trade_phase=['mrp/p3'], + source_section_value=source_section, + source_value=source, + destination_section_value=destination_section, + destination_value=destination, + ) + return business_process + + @reindex + def createConcurrentBusinessProcess(self): + """ mrp/p2 + ready ======== partial_produced + mrp/p3 + """ + business_process = self.createBusinessProcess() + business_path_p2 = self.createBusinessPath(business_process) + business_path_p3 = self.createBusinessPath(business_process) + business_state_ready = self.createBusinessState(business_process) + business_state_partial = self.createBusinessState(business_process) + + # organisations + source_section = self.createOrganisation(title='source_section') + source = self.createOrganisation(title='source') + destination_section = self.createOrganisation(title='destination_section') + destination = self.createOrganisation(title='destination') + + business_process.edit(referential_date='stop_date') + business_path_p2.edit(id='p2', + predecessor_value=business_state_ready, + successor_value=business_state_partial, + quantity=1, + trade_phase=['mrp/p2'], + source_section_value=source_section, + source_value=source, + destination_section_value=destination_section, + destination_value=destination, + ) + business_path_p3.edit(id='p3', + predecessor_value=business_state_ready, + successor_value=business_state_partial, + quantity=1, + deliverable=1, # root explanation + trade_phase=['mrp/p3'], + source_section_value=source_section, + source_value=source, + destination_section_value=destination_section, + destination_value=destination, + ) + return business_process + + @reindex + def beforeTearDown(self): + super(TestMRPMixin, self).beforeTearDown() + transaction.abort() + for module in ( + self.portal.organisation_module, + self.portal.production_order_module, + self.portal.transformation_module, + self.portal.business_process_module, + # don't remove document because reuse it for testing of id + # self.portal.product_module, + self.portal.portal_simulation,): + module.manage_delObjects(list(module.objectIds())) + transaction.commit() + +class TestMRPImplementation(TestMRPMixin, ERP5TypeTestCase): + """the test for implementation""" + @skip('Unfinished experimental feature') + def test_TransformationRule_getHeadProductionPathList(self): + rule = self.portal.portal_rules.default_transformation_model_rule + + transformation = self.createDefaultTransformation() + + business_process = self.createSimpleBusinessProcess() + self.assertEquals([business_process.p2], + rule.getHeadProductionPathList(transformation, business_process)) + + business_process = self.createConcurrentBusinessProcess() + self.assertEquals(set([business_process.p2, business_process.p3]), + set(rule.getHeadProductionPathList(transformation, business_process))) + + def test_TransformationRule_expand(self): + # mock order + order = self.createDefaultOrder() + order_line = order.objectValues()[0] + + business_process = order.getSpecialiseValue() + + # paths + path_p2 = '%s/p2' % business_process.getRelativeUrl() + path_p3 = '%s/p3' % business_process.getRelativeUrl() + + # organisations + path = business_process.objectValues( + portal_type=self.portal.getPortalBusinessPathTypeList())[0] + source_section = path.getSourceSection() + source = path.getSource() + destination_section = path.getDestinationSection() + destination = path.getDestination() + consumed_organisations = (source_section, source, destination_section, None) + produced_organisations = (source_section, None, destination_section, destination) + + # don't need another rules, just need TransformationRule for test + self.invalidateRules() + + self.stepTic() + + # alter simulations of the order + # root + applied_rule = self.portal.portal_simulation.newContent(portal_type='Applied Rule') + movement = applied_rule.newContent(portal_type='Simulation Movement') + applied_rule.edit(causality_value=order) + movement.edit(order_value=order_line, + quantity=order_line.getQuantity(), + resource=order_line.getResource()) + # test mock + applied_rule = movement.newContent(potal_type='Applied Rule') + + rule = self.portal.portal_rules.default_transformation_model_rule + rule.expand(applied_rule) + + # assertion + expected_value_set = set([ + ((path_p2,), 'product_module/5', produced_organisations, 'mrp/p3', -10), + ((path_p2,), 'product_module/1', consumed_organisations, 'mrp/p2', 30), + ((path_p2,), 'product_module/2', consumed_organisations, 'mrp/p2', 10), + ((path_p3,), 'product_module/5', consumed_organisations, 'mrp/p3', 10), + ((path_p3,), 'product_module/3', consumed_organisations, 'mrp/p3', 40), + ((path_p3,), 'product_module/4', consumed_organisations, 'mrp/p3', 10), + ((path_p3,), 'product_module/5', produced_organisations, None, -10)]) + movement_list = applied_rule.objectValues() + self.assertEquals(len(expected_value_set), len(movement_list)) + movement_value_set = set([]) + for movement in movement_list: + movement_value_set |= set([(tuple(movement.getCausalityList()), + movement.getResource(), + (movement.getSourceSection(), + movement.getSource(), + movement.getDestinationSection(), + movement.getDestination(),), # organisations + movement.getTradePhase(), + movement.getQuantity())]) + self.assertEquals(expected_value_set, movement_value_set) + + @skip('Unfinished experimental feature') + def test_TransformationRule_expand_concurrent(self): + business_process = self.createConcurrentBusinessProcess() + + # mock order + order = self.createDefaultOrder(business_process=business_process) + order_line = order.objectValues()[0] + + # phases + phase_p2 = '%s/p2' % business_process.getRelativeUrl() + phase_p3 = '%s/p3' % business_process.getRelativeUrl() + + # organisations + path = business_process.objectValues( + portal_type=self.portal.getPortalBusinessPathTypeList())[0] + source_section = path.getSourceSection() + source = path.getSource() + destination_section = path.getDestinationSection() + destination = path.getDestination() + organisations = (source_section, source, destination_section, destination) + consumed_organisations = (source_section, source, destination_section, None) + produced_organisations = (source_section, None, destination_section, destination) + + # don't need another rules, just need TransformationRule for test + self.invalidateRules() + + self.stepTic() + + # alter simulations of the order + # root + applied_rule = self.portal.portal_simulation.newContent(portal_type='Applied Rule') + movement = applied_rule.newContent(portal_type='Simulation Movement') + applied_rule.edit(causality_value=order) + movement.edit(order_value=order_line, + quantity=order_line.getQuantity(), + resource=order_line.getResource()) + # test mock + applied_rule = movement.newContent(potal_type='Applied Rule') + + rule = self.portal.portal_rules.default_transformation_model_rule + rule.expand(applied_rule) + + # assertion + expected_value_set = set([ + ((phase_p2,), 'product_module/1', consumed_organisations, 'mrp/p2', 30), + ((phase_p2,), 'product_module/2', consumed_organisations, 'mrp/p2', 10), + ((phase_p3,), 'product_module/3', consumed_organisations, 'mrp/p3', 40), + ((phase_p3,), 'product_module/4', consumed_organisations, 'mrp/p3', 10), + ((phase_p2, phase_p3), 'product_module/5', produced_organisations, None, -10)]) + movement_list = applied_rule.objectValues() + self.assertEquals(len(expected_value_set), len(movement_list)) + movement_value_set = set([]) + for movement in movement_list: + movement_value_set |= set([(tuple(movement.getCausalityList()), + movement.getResource(), + (movement.getSourceSection(), + movement.getSource(), + movement.getDestinationSection(), + movement.getDestination(),), # organisations + movement.getTradePhase(), + movement.getQuantity())]) + self.assertEquals(expected_value_set, movement_value_set) + + @skip('Unfinished experimental feature') + def test_TransformationRule_expand_reexpand(self): + """ + test case of difference when any movement are frozen + by using above result + """ + self.test_TransformationRule_expand_concurrent() + + self.stepTic() + + applied_rule = self.portal.portal_simulation.objectValues()[0] + + business_process = applied_rule.getCausalityValue().getSpecialiseValue() + + # phases + phase_p2 = '%s/p2' % business_process.getRelativeUrl() + phase_p3 = '%s/p3' % business_process.getRelativeUrl() + + # organisations + path = business_process.objectValues( + portal_type=self.portal.getPortalBusinessPathTypeList())[0] + source_section = path.getSourceSection() + source = path.getSource() + destination_section = path.getDestinationSection() + destination = path.getDestination() + consumed_organisations = (source_section, source, destination_section, None) + produced_organisations = (source_section, None, destination_section, destination) + + movement = applied_rule.objectValues()[0] + applied_rule = movement.objectValues()[0] + + # these movements are made by transformation + for movement in applied_rule.objectValues(): + movement.edit(quantity=1) + # set the state value of isFrozen to 1, + movement._baseSetFrozen(1) + + # re-expand + rule = self.portal.portal_rules.default_transformation_model_rule + rule.expand(applied_rule) + + # assertion + expected_value_set = set([ + ((phase_p2,), 'product_module/1', consumed_organisations, 'mrp/p2', 1), # Frozen + ((phase_p2,), 'product_module/1', consumed_organisations, 'mrp/p2', 29), + ((phase_p2,), 'product_module/2', consumed_organisations, 'mrp/p2', 1), # Frozen + ((phase_p2,), 'product_module/2', consumed_organisations, 'mrp/p2', 9), + ((phase_p3,), 'product_module/3', consumed_organisations, 'mrp/p3', 1), # Frozen + ((phase_p3,), 'product_module/3', consumed_organisations, 'mrp/p3', 39), + ((phase_p3,), 'product_module/4', consumed_organisations, 'mrp/p3', 1), # Frozen + ((phase_p3,), 'product_module/4', consumed_organisations, 'mrp/p3', 9), + ((phase_p2, phase_p3), 'product_module/5', produced_organisations, None, 1), # Frozen + ((phase_p2, phase_p3), 'product_module/5', produced_organisations, None, -11)]) + movement_list = applied_rule.objectValues() + self.assertEquals(len(expected_value_set), len(movement_list)) + movement_value_set = set([]) + for movement in movement_list: + movement_value_set |= set([(tuple(movement.getCausalityList()), + movement.getResource(), + (movement.getSourceSection(), + movement.getSource(), + movement.getDestinationSection(), + movement.getDestination(),), # organisations + movement.getTradePhase(), + movement.getQuantity())]) + self.assertEquals(expected_value_set, movement_value_set) + + @skip('Unfinished experimental feature') + def test_TransformationSourcingRule_expand(self): + # mock order + order = self.createDefaultOrder() + order_line = order.objectValues()[0] + + # don't need another rules, just need TransformationSourcingRule for test + self.invalidateRules() + + self.stepTic() + + business_process = order.getSpecialiseValue() + + # get last path of a business process + # in simple business path, the last is between "partial_produced" and "done" + causality_path = None + for state in business_process.objectValues( + portal_type=self.portal.getPortalBusinessStateTypeList()): + if len(state.getRemainingTradePhaseList(self.portal)) == 0: + causality_path = state.getSuccessorRelatedValue() + + # phases + phase_p2 = '%s/p2' % business_process.getRelativeUrl() + + # organisations + source_section = causality_path.getSourceSection() + source = causality_path.getSource() + destination_section = causality_path.getDestinationSection() + destination = causality_path.getDestination() + organisations = (source_section, source, destination_section, destination) + + # sourcing resource + sourcing_resource = order_line.getResource() + + # alter simulations of the order + # root + applied_rule = self.portal.portal_simulation.newContent(portal_type='Applied Rule') + movement = applied_rule.newContent(portal_type='Simulation Movement') + applied_rule.edit(causality_value=order) + movement.edit(order_value=order_line, + causality_value=causality_path, + quantity=order_line.getQuantity(), + resource=sourcing_resource, + ) + + self.stepTic() + + # test mock + applied_rule = movement.newContent(potal_type='Applied Rule') + + rule = self.portal.portal_rules.default_transformation_sourcing_model_rule + rule.expand(applied_rule) + + # assertion + expected_value_set = set([ + ((phase_p2,), sourcing_resource, organisations, 10)]) + movement_list = applied_rule.objectValues() + self.assertEquals(len(expected_value_set), len(movement_list)) + movement_value_set = set([]) + for movement in movement_list: + movement_value_set |= set([(tuple(movement.getCausalityList()), + movement.getResource(), + (movement.getSourceSection(), + movement.getSource(), + movement.getDestinationSection(), + movement.getDestination(),), # organisations + movement.getQuantity())]) + self.assertEquals(expected_value_set, movement_value_set) + + +def test_suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(TestMRPImplementation)) + return suite