diff --git a/product/ERP5/tests/testERP5Simulation.py b/product/ERP5/tests/testERP5Simulation.py index 342ce15fd8b3a1d27d4d86f6994614315b43ba17..34a2138578ea3c97eb9b58f414c83b0312ec022e 100644 --- a/product/ERP5/tests/testERP5Simulation.py +++ b/product/ERP5/tests/testERP5Simulation.py @@ -36,203 +36,138 @@ from zLOG import LOG from Products.CMFCore.utils import getToolByName from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.Sequence import SequenceList -from testPackingList import TestPackingList, TestPackingListMixin +from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod +from testPackingList import TestPackingList +from testInvoice import TestSaleInvoice, TestInvoiceMixin -class TestERP5SimulationMixin(TestPackingListMixin): +class TestERP5SimulationMixin(TestInvoiceMixin): def getBusinessTemplateList(self): - return list(TestPackingListMixin.getBusinessTemplateList(self)) + \ - ['erp5_simulation',] + return list(TestInvoiceMixin.getBusinessTemplateList(self)) + \ + ['erp5_administration', 'erp5_simulation',] def afterSetUp(self, quiet=1, run=1): - TestPackingListMixin.afterSetUp(self, quiet, run) + TestInvoiceMixin.afterSetUp(self) portal_rules = self.portal.portal_rules for rule in portal_rules.objectValues(portal_type='Order Rule'): if rule.getValidationState() == 'validated': rule.invalidate() self.validateNewRules() -class TestERP5Simulation(TestERP5SimulationMixin, ERP5TypeTestCase): - run_all_test = 1 - quiet = 0 + @UnrestrictedMethod + def createInvoiceTransactionRule(self, resource=None): + """Create a sale invoice transaction rule with only one cell for + product_line/apparel and default_region + The accounting rule cell will have the provided resource, but this his more + or less optional (as long as price currency is set correctly on order) + """ + portal = self.portal + account_module = portal.account_module + for account_id, account_gap, account_type \ + in self.account_definition_list: + if not account_id in account_module.objectIds(): + account = account_module.newContent(id=account_id) + account.setGap(account_gap) + account.setAccountType(account_type) + portal.portal_workflow.doActionFor(account, 'validate_action') + + invoice_rule = portal.portal_rules.default_invoice_transaction_rule + if invoice_rule.getValidationState() == 'validated': + invoice_rule.invalidate() + invoice_rule.deleteContent(list(invoice_rule.objectIds())) + transaction.commit() + self.tic() + region_predicate = invoice_rule.newContent(portal_type = 'Predicate') + product_line_predicate = invoice_rule.newContent(portal_type = 'Predicate') + region_predicate.edit( + membership_criterion_base_category_list = ['destination_region'], + membership_criterion_category_list = + ['destination_region/region/%s' % self.default_region ], + int_index = 1, + string_index = 'region' + ) + product_line_predicate.edit( + membership_criterion_base_category_list = ['product_line'], + membership_criterion_category_list = + ['product_line/apparel'], + int_index = 1, + string_index = 'product' + ) + product_line_predicate.immediateReindexObject() + region_predicate.immediateReindexObject() + + invoice_rule.updateMatrix() + cell_list = invoice_rule.getCellValueList(base_id='movement') + self.assertEquals(len(cell_list),1) + cell = cell_list[0] + + for line_id, line_source_id, line_destination_id, line_ratio in \ + self.transaction_line_definition_list: + line = cell.newContent(id=line_id, + portal_type='Accounting Transaction Line', quantity=line_ratio, + resource_value=resource, + source_value=account_module[line_source_id], + destination_value=account_module[line_destination_id]) + + # matching provider for source and destination + for category in ('source', 'destination',): + invoice_rule.newContent( + portal_type='Category Membership Divergence Tester', + title='%s divergence tester' % category, + tested_property=category, + divergence_provider=False, + matching_provider=True) + # matching provider for quantity (i.e. only used for expand) + invoice_rule.newContent( + portal_type='Net Converted Quantity Divergence Tester', + title='%s divergence tester' % category, + tested_property='quantity', + quantity=0, + divergence_provider=False, + matching_provider=True) + invoice_rule.validate() + transaction.commit() + self.tic() def validateNewRules(self): # create an Order Rule document. portal_rules = self.portal.portal_rules new_order_rule = filter( - lambda x:x.title == 'New Simple Order Rule', + lambda x:x.title == 'New Default Order Rule', portal_rules.objectValues(portal_type='Order Rule'))[0] if new_order_rule.getValidationState() != 'validated': new_order_rule.validate() - def _modifyPackingListLineQuantity(self, sequence=None, - sequence_list=None, delta=0.0): - """ - Set a increased quantity on packing list lines - """ - packing_list = sequence.get('packing_list') - quantity = self.default_quantity + delta - sequence.edit(line_quantity=quantity) - for packing_list_line in packing_list.objectValues( - portal_type=self.packing_list_line_portal_type): - packing_list_line.edit(quantity=quantity) - sequence.edit(last_delta=delta) - - def stepIncreasePackingListLineQuantity2(self, sequence=None, - sequence_list=None, **kw): - return self._modifyPackingListLineQuantity(sequence, sequence_list, 2.0) - - def stepDecreasePackingListLineQuantity1(self, sequence=None, - sequence_list=None, **kw): - return self._modifyPackingListLineQuantity(sequence, sequence_list, -1.0) - - def stepDecreasePackingListLineQuantity10(self, sequence=None, - sequence_list=None, **kw): - return self._modifyPackingListLineQuantity(sequence, sequence_list, -10.0) - - def stepSplitAndDeferPackingList(self, sequence=None, sequence_list=None, **kw): - """ - Do the split and defer action - """ - packing_list = sequence.get('packing_list') + def _acceptDecisionQuantity(self, document): solver_tool = self.portal.portal_solvers - solver_process = solver_tool.newSolverProcess(packing_list) - sequence.edit(solver_process=solver_process) + solver_process = solver_tool.newSolverProcess(document) quantity_solver_decision = filter( lambda x:x.getCausalityValue().getTestedProperty()=='quantity', solver_process.contentValues())[0] - # use Quantity Split Solver. - quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Split Solver']) - # configure for Quantity Split Solver. - kw = {'delivery_solver':'FIFO', - 'start_date':packing_list.getStartDate() + 10} - quantity_solver_decision.updateConfiguration(**kw) + # use Quantity Accept Solver. + quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Accept Solver']) solver_process.buildTargetSolverList() solver_process.solve() - # build split deliveries manually. XXX ad-hoc - previous_tag = None - for delivery_builder in packing_list.getBuilderList(): - this_builder_tag = '%s_split_%s' % (packing_list.getPath(), - delivery_builder.getId()) - after_tag = [] - if previous_tag: - after_tag.append(previous_tag) - delivery_builder.activate( - after_method_id=('solve', - 'immediateReindexObject', - 'recursiveImmediateReindexObject',), # XXX too brutal. - after_tag=after_tag, - ).build(explanation_uid=packing_list.getCausalityValue().getUid()) - def stepCheckPackingListSplitted(self, sequence=None, sequence_list=None, **kw): - """ - Test if packing list was splitted - """ - order = sequence.get('order') - packing_list_list = order.getCausalityRelatedValueList( - portal_type=self.packing_list_portal_type) - self.assertEquals(2,len(packing_list_list)) - packing_list1 = None - packing_list2 = None - for packing_list in packing_list_list: - if packing_list.getUid() == sequence.get('packing_list').getUid(): - packing_list1 = packing_list - else: - packing_list2 = packing_list - sequence.edit(new_packing_list=packing_list2) - for line in packing_list1.objectValues( - portal_type= self.packing_list_line_portal_type): - self.assertEquals(self.default_quantity-10,line.getQuantity()) - for line in packing_list2.objectValues( - portal_type= self.packing_list_line_portal_type): - self.assertEquals(10,line.getQuantity()) + def _acceptDivergenceOnInvoice(self, invoice, divergence_list): + print invoice, divergence_list + return self._acceptDecisionQuantity(invoice) - def _checkSolverState(self, sequence=None, sequence_list=None, - state='solved'): - """ - Check if target solvers' state. + def stepAcceptDecisionQuantity(self,sequence=None, sequence_list=None): """ - solver_process = sequence.get('solver_process') - for solver in solver_process.objectValues( - portal_type=self.portal.getPortalTargetSolverTypeList()): - self.assertEquals(state, solver.getSolverState()) - - def stepCheckSolverIsSolving(self, sequence=None, sequence_list=None, **kw): - """ - Check if all target solvers have 'solving' state. - """ - self._checkSolverState(sequence, sequence_list, 'solving') - - def stepCheckSolverIsSolved(self, sequence=None, sequence_list=None, **kw): - """ - Check if all target solvers have 'solved' state. - """ - self._checkSolverState(sequence, sequence_list, 'solved') - - def test_01_splitAndDefer(self, quiet=quiet, run=run_all_test): - """ - Change the quantity on an delivery line, then - see if the packing list is divergent and then - split and defer the packing list + Solve quantity divergence by using solver tool. """ - if not run: return - sequence_list = SequenceList() - - # Test with a simply order without cell - sequence_string = self.default_sequence + '\ - stepIncreasePackingListLineQuantity2 \ - stepCheckPackingListIsCalculating \ - stepTic \ - stepCheckPackingListIsNotDivergent \ - stepCheckPackingListIsSolved \ - stepDecreasePackingListLineQuantity1 \ - stepCheckPackingListIsCalculating \ - stepTic \ - stepCheckPackingListIsNotDivergent \ - stepCheckPackingListIsSolved \ - stepDecreasePackingListLineQuantity10 \ - stepCheckPackingListIsCalculating \ - stepTic \ - stepCheckPackingListIsDiverged \ - stepSplitAndDeferPackingList \ - stepCheckSolverIsSolving \ - stepTic \ - stepCheckPackingListSplitted \ - stepCheckPackingListIsSolved \ - stepCheckSolverIsSolved \ - ' - sequence_list.addSequenceString(sequence_string) - - sequence_list.play(self, quiet=quiet) - -class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): - def validateNewRules(self): - # create an Order Rule document. - portal_rules = self.portal.portal_rules - new_order_rule = filter( - lambda x:x.title == 'New Default Order Rule', - portal_rules.objectValues(portal_type='Order Rule'))[0] - if new_order_rule.getValidationState() != 'validated': - new_order_rule.validate() + packing_list = sequence.get('packing_list') + self._acceptDecisionQuantity(packing_list) - def stepAcceptDecisionQuantity(self,sequence=None, sequence_list=None, **kw): + def stepAcceptDecisionQuantityInvoice(self, sequence=None, + sequence_list=None): """ Solve quantity divergence by using solver tool. """ - packing_list = sequence.get('packing_list') - solver_tool = self.portal.portal_solvers - solver_process = solver_tool.newSolverProcess(packing_list) - quantity_solver_decision = filter( - lambda x:x.getCausalityValue().getTestedProperty()=='quantity', - solver_process.contentValues())[0] - # use Quantity Accept Solver. - quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Accept Solver']) - solver_process.buildTargetSolverList() - solver_process.solve() - # XXX-JPS We do not need the divergence message anymore. - # since the divergence message == the divergence tester itself - # with its title, description, tested property, etc. + invoice = sequence.get('invoice') + self._acceptDecisionQuantity(invoice) - def stepAcceptDecisionResource(self,sequence=None, sequence_list=None, **kw): + def stepAcceptDecisionResource(self,sequence=None, sequence_list=None): """ Solve quantity divergence by using solver tool. """ @@ -247,7 +182,7 @@ class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): solver_process.buildTargetSolverList() solver_process.solve() - def stepSplitAndDeferPackingList(self, sequence=None, sequence_list=None, **kw): + def stepSplitAndDeferPackingList(self, sequence=None, sequence_list=None): """ Do the split and defer action """ @@ -295,7 +230,15 @@ class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): solver_process.buildTargetSolverList() solver_process.solve() - def stepAdoptPrevisionQuantity(self,sequence=None, sequence_list=None, **kw): + def _adoptDivergenceOnInvoice(self, invoice, divergence_list): + print invoice, divergence_list + return self._adoptPrevisionQuantity(invoice) + + def _adoptDivergenceOnPackingList(self, packing_list, divergence_list): + print packing_list, divergence_list + return self._adoptPrevisionQuantity(packing_list) + + def stepAdoptPrevisionQuantity(self,sequence=None, sequence_list=None): """ Solve quantity divergence by using solver tool. """ @@ -303,14 +246,14 @@ class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): self._adoptPrevisionQuantity(packing_list) def stepNewPackingListAdoptPrevisionQuantity(self, sequence=None, - sequence_list=None, **kw): + sequence_list=None): """ Solve quantity divergence by using solver tool. """ packing_list = sequence.get('new_packing_list') self._adoptPrevisionQuantity(packing_list) - def stepAdoptPrevisionResource(self,sequence=None, sequence_list=None, **kw): + def stepAdoptPrevisionResource(self,sequence=None, sequence_list=None): """ Solve resource divergence by using solver tool. """ @@ -325,7 +268,7 @@ class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): solver_process.buildTargetSolverList() solver_process.solve() - def stepCheckPackingListLineWithSameResource(self,sequence=None, sequence_list=None, **kw): + def stepCheckPackingListLineWithSameResource(self,sequence=None, sequence_list=None): """ Look if the packing list has new previsions """ @@ -339,7 +282,7 @@ class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): [x.getOrder() for x in \ line.getDeliveryRelatedValueList()]) - def stepUnifyDestinationWithDecision(self,sequence=None, sequence_list=None, **kw): + def stepUnifyDestinationWithDecision(self,sequence=None, sequence_list=None): """ Check if simulation movement are disconnected """ @@ -354,25 +297,29 @@ class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): solver_process.buildTargetSolverList() solver_process.solve() - def stepUnifyStartDateWithDecision(self,sequence=None, sequence_list=None, **kw): - """ - Check if simulation movement are disconnected - """ - packing_list = sequence.get('packing_list') + def _unifyStartDateWithDecision(self, document): solver_tool = self.portal.portal_solvers - solver_process = solver_tool.newSolverProcess(packing_list) + solver_process = solver_tool.newSolverProcess(document) for start_date_solver_decision in filter( lambda x:x.getCausalityValue().getTestedProperty()=='start_date', solver_process.contentValues()): # use StartDate Replacement Solver. start_date_solver_decision.setSolverValue(self.portal.portal_types['Start Date Replacement Solver']) # configure for Quantity Split Solver. - kw = {'value':packing_list.getStartDate()} + kw = {'value':document.getStartDate()} start_date_solver_decision.updateConfiguration(**kw) solver_process.buildTargetSolverList() solver_process.solve() - def stepUnifyStartDateWithPrevision(self,sequence=None, sequence_list=None, **kw): + def stepUnifyStartDateWithDecision(self,sequence=None, sequence_list=None): + packing_list = sequence.get('packing_list') + self._unifyStartDateWithDecision(packing_list) + + def stepUnifyStartDateWithDecisionInvoice(self,sequence=None, sequence_list=None): + invoice = sequence.get('invoice') + self._unifyStartDateWithDecision(invoice) + + def stepUnifyStartDateWithPrevision(self,sequence=None, sequence_list=None): """ Check if simulation movement are disconnected """ @@ -393,7 +340,7 @@ class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): solver_process.buildTargetSolverList() solver_process.solve() - def checkOrderRuleSimulation(self, rule_reference, sequence=None, sequence_list=None, **kw): + def checkOrderRuleSimulation(self, rule_reference, sequence=None, sequence_list=None): """ Test if simulation is matching order, be sure that rule_reference is used to expand simulation for order @@ -459,7 +406,7 @@ class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): # XXX Test acquisition self.checkAcquisition(simulation_movement, order_movement) - def stepModifySimulationLineQuantityForMergedLine(self,sequence=None, sequence_list=None, **kw): + def stepModifySimulationLineQuantityForMergedLine(self,sequence=None, sequence_list=None): """ Check if simulation movement are disconnected """ @@ -470,7 +417,7 @@ class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): simulation_line.edit(quantity=self.default_quantity-2) simulation_line.getOrderValue().edit(quantity=self.default_quantity-2) - def stepCheckSimulationQuantityUpdatedForMergedLine(self,sequence=None, sequence_list=None, **kw): + def stepCheckSimulationQuantityUpdatedForMergedLine(self,sequence=None, sequence_list=None): """ Test if the quantity of the simulation movement was changed """ @@ -482,7 +429,7 @@ class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): simulation_line.getDeliveryError(), self.default_quantity * 2) - def stepCheckPackingListLineWithDifferentResource(self,sequence=None, sequence_list=None, **kw): + def stepCheckPackingListLineWithDifferentResource(self,sequence=None, sequence_list=None): """ Look if the packing list has new previsions """ @@ -495,8 +442,168 @@ class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): self.assertEquals(sorted(packing_list_line.getCausalityList()), sorted(order_line_list)) +class TestERP5Simulation(TestERP5SimulationMixin, ERP5TypeTestCase): + run_all_test = 1 + quiet = 0 + + def validateNewRules(self): + # create an Order Rule document. + portal_rules = self.portal.portal_rules + new_order_rule = filter( + lambda x:x.title == 'New Simple Order Rule', + portal_rules.objectValues(portal_type='Order Rule'))[0] + if new_order_rule.getValidationState() != 'validated': + new_order_rule.validate() + + def _modifyPackingListLineQuantity(self, sequence=None, + sequence_list=None, delta=0.0): + """ + Set a increased quantity on packing list lines + """ + packing_list = sequence.get('packing_list') + quantity = self.default_quantity + delta + sequence.edit(line_quantity=quantity) + for packing_list_line in packing_list.objectValues( + portal_type=self.packing_list_line_portal_type): + packing_list_line.edit(quantity=quantity) + sequence.edit(last_delta=delta) + + def stepIncreasePackingListLineQuantity2(self, sequence=None, + sequence_list=None, **kw): + return self._modifyPackingListLineQuantity(sequence, sequence_list, 2.0) + + def stepDecreasePackingListLineQuantity1(self, sequence=None, + sequence_list=None, **kw): + return self._modifyPackingListLineQuantity(sequence, sequence_list, -1.0) + + def stepDecreasePackingListLineQuantity10(self, sequence=None, + sequence_list=None, **kw): + return self._modifyPackingListLineQuantity(sequence, sequence_list, -10.0) + + def stepSplitAndDeferPackingList(self, sequence=None, sequence_list=None, **kw): + """ + Do the split and defer action + """ + packing_list = sequence.get('packing_list') + solver_tool = self.portal.portal_solvers + solver_process = solver_tool.newSolverProcess(packing_list) + sequence.edit(solver_process=solver_process) + quantity_solver_decision = filter( + lambda x:x.getCausalityValue().getTestedProperty()=='quantity', + solver_process.contentValues())[0] + # use Quantity Split Solver. + quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Split Solver']) + # configure for Quantity Split Solver. + kw = {'delivery_solver':'FIFO', + 'start_date':packing_list.getStartDate() + 10} + quantity_solver_decision.updateConfiguration(**kw) + solver_process.buildTargetSolverList() + solver_process.solve() + # build split deliveries manually. XXX ad-hoc + previous_tag = None + for delivery_builder in packing_list.getBuilderList(): + this_builder_tag = '%s_split_%s' % (packing_list.getPath(), + delivery_builder.getId()) + after_tag = [] + if previous_tag: + after_tag.append(previous_tag) + delivery_builder.activate( + after_method_id=('solve', + 'immediateReindexObject', + 'recursiveImmediateReindexObject',), # XXX too brutal. + after_tag=after_tag, + ).build(explanation_uid=packing_list.getCausalityValue().getUid()) + + def stepCheckPackingListSplitted(self, sequence=None, sequence_list=None, **kw): + """ + Test if packing list was splitted + """ + order = sequence.get('order') + packing_list_list = order.getCausalityRelatedValueList( + portal_type=self.packing_list_portal_type) + self.assertEquals(2,len(packing_list_list)) + packing_list1 = None + packing_list2 = None + for packing_list in packing_list_list: + if packing_list.getUid() == sequence.get('packing_list').getUid(): + packing_list1 = packing_list + else: + packing_list2 = packing_list + sequence.edit(new_packing_list=packing_list2) + for line in packing_list1.objectValues( + portal_type= self.packing_list_line_portal_type): + self.assertEquals(self.default_quantity-10,line.getQuantity()) + for line in packing_list2.objectValues( + portal_type= self.packing_list_line_portal_type): + self.assertEquals(10,line.getQuantity()) + + def _checkSolverState(self, sequence=None, sequence_list=None, + state='solved'): + """ + Check if target solvers' state. + """ + solver_process = sequence.get('solver_process') + for solver in solver_process.objectValues( + portal_type=self.portal.getPortalTargetSolverTypeList()): + self.assertEquals(state, solver.getSolverState()) + + def stepCheckSolverIsSolving(self, sequence=None, sequence_list=None, **kw): + """ + Check if all target solvers have 'solving' state. + """ + self._checkSolverState(sequence, sequence_list, 'solving') + + def stepCheckSolverIsSolved(self, sequence=None, sequence_list=None, **kw): + """ + Check if all target solvers have 'solved' state. + """ + self._checkSolverState(sequence, sequence_list, 'solved') + + def test_01_splitAndDefer(self, quiet=quiet, run=run_all_test): + """ + Change the quantity on an delivery line, then + see if the packing list is divergent and then + split and defer the packing list + """ + if not run: return + sequence_list = SequenceList() + + # Test with a simply order without cell + sequence_string = self.default_sequence + '\ + stepIncreasePackingListLineQuantity2 \ + stepCheckPackingListIsCalculating \ + stepTic \ + stepCheckPackingListIsNotDivergent \ + stepCheckPackingListIsSolved \ + stepDecreasePackingListLineQuantity1 \ + stepCheckPackingListIsCalculating \ + stepTic \ + stepCheckPackingListIsNotDivergent \ + stepCheckPackingListIsSolved \ + stepDecreasePackingListLineQuantity10 \ + stepCheckPackingListIsCalculating \ + stepTic \ + stepCheckPackingListIsDiverged \ + stepSplitAndDeferPackingList \ + stepCheckSolverIsSolving \ + stepTic \ + stepCheckPackingListSplitted \ + stepCheckPackingListIsSolved \ + stepCheckSolverIsSolved \ + ' + sequence_list.addSequenceString(sequence_string) + + sequence_list.play(self, quiet=quiet) + +class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList): + pass + +class TestERP5SimulationInvoice(TestERP5SimulationMixin, TestSaleInvoice): + pass + def test_suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(TestERP5Simulation)) suite.addTest(unittest.makeSuite(TestERP5SimulationPackingList)) + suite.addTest(unittest.makeSuite(TestERP5SimulationInvoice)) return suite