##############################################################################
#
# Copyright (c) 2004 Nexedi SARL and Contributors. All Rights Reserved.
#          Jerome Perrin <jerome@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

"""Unit Tests for Inventory API.
"""

import sys
import random
import os

if __name__ == '__main__':
  execfile(os.path.join(sys.path[0], 'framework.py'))

from AccessControl.SecurityManagement import newSecurityManager
from DateTime import DateTime
from Testing import ZopeTestCase

from Products.ERP5.Document.OrderRule import OrderRule
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import reindex

# Needed in order to have a log file inside the current folder
os.environ.setdefault('EVENT_LOG_FILE', 'zLOG.log')
os.environ.setdefault('EVENT_LOG_SEVERITY', '-300')

class InventoryAPITestCase(ERP5TypeTestCase):
  """Base class for Inventory API Tests {{{
  """

  GROUP_CATEGORIES = ( 'group/test_group/A1/B1/C1',
                       'group/test_group/A1/B1/C2',
                       'group/test_group/A1/B2/C1',
                       'group/test_group/A1/B2/C2',
                       'group/test_group/A2/B1/C1',
                       'group/test_group/A2/B1/C2',
                       'group/test_group/A2/B2/C1',
                       'group/test_group/A2/B2/C2', )
  
  def getTitle(self):
    """Title of the test."""
    return self.__class__.__doc__

  def afterSetUp(self):
    """set up """
    self.createCategories()
    self.login()
    self.portal = self.getPortal()
    if not hasattr(self.portal, 'testing_folder'):
      self.portal.newContent(portal_type='Folder',
                            id='testing_folder')
    self.folder = self.portal.testing_folder
    
    self.section = self._makeOrganisation(title='Section')
    self.node = self._makeOrganisation(title='Node')
    self.payment_node = self.section.newContent(
                                  title='Payment Node',
                                  portal_type='Bank Account')
    self.mirror_section = self._makeOrganisation(title='Mirror Section')
    self.mirror_node = self._makeOrganisation(title='Mirror Node')
    self.resource = self.getCurrencyModule().newContent(
                                  title='Resource',
                                  portal_type='Currency')
    # create a dummy Rule, to be able to create simulation movements
    rule_tool = self.portal.portal_rules
    if not hasattr(rule_tool, 'default_order_rule'):
      rule_tool._setObject('default_order_rule',
                           OrderRule('default_order_rule'))

  def _safeTic(self):
    """Like tic, but swallowing errors, usefull for teardown"""
    try:
      get_transaction().commit()
      self.tic()
    except RuntimeError:
      pass

  def beforeTearDown(self):
    """Clear everything for next test."""
    self._safeTic()
    for module in [ 'organisation_module',
                    'person_module',
                    'currency_module',
                    'portal_simulation',
                    self.folder.getId() ]:
      folder = getattr(self.getPortal(), module, None)
      if folder:
        [x.unindexObject() for x in folder.objectValues()]
        self._safeTic()
        folder.manage_delObjects([x.getId() for x in folder.objectValues()])
    self._safeTic()
    # cancel remaining messages
    activity_tool = self.getPortal().portal_activities
    for message in activity_tool.getMessageList():
      activity_tool.manageCancel(message.object_path, message.method_id)
      ZopeTestCase._print('\nCancelling active message %s.%s()\n'
                          % (message.object_path, message.method_id) )
    get_transaction().commit()

  def login(self, quiet=0, run=1):
    uf = self.getPortal().acl_users
    uf._doAddUser('alex', '', ['Manager', 'Assignee', 'Assignor',
                               'Associate', 'Auditor', 'Author'], [])
    user = uf.getUserById('alex').__of__(uf)
    newSecurityManager(None, user)
  
  def createCategories(self):
    """Create the categories for our test. """
    # create categories
    for cat_string in self.getNeededCategoryList() :
      base_cat = cat_string.split("/")[0]
      path = self.getPortal().portal_categories[base_cat]
      for cat in cat_string.split("/")[1:] :
        if not cat in path.objectIds() :
          path = path.newContent(
                    portal_type='Category',
                    id=cat,
                    immediate_reindex=1 )
        else:
          path = path[cat]
    # check categories have been created
    for cat_string in self.getNeededCategoryList() :
      self.assertNotEquals(None,
                self.getCategoryTool().restrictedTraverse(cat_string),
                cat_string)
                
  def getNeededCategoryList(self):
    """return a list of categories that should be created."""
    return (  'region/level1/level2',
              'group/level1/level2',
              'group/anotherlevel',
              'product_line/level1/level2',
           # we create a huge group category for consolidation tests
           ) + self.GROUP_CATEGORIES
  
  def getBusinessTemplateList(self):
    """ """
    return ('erp5_base', 'erp5_dummy_movement')

  # TODO: move this to a base class {{{
  @reindex
  def _makeOrganisation(self, **kw):
    """Creates an organisation."""
    org = self.getPortal().organisation_module.newContent(
          portal_type='Organisation',
          **kw)
    return org

  @reindex
  def _makeSalePackingList(self, **kw):
    """Creates a sale packing list."""
    spl = self.getPortal().sale_packing_list_module.newContent(
          portal_type='Sale Packing List',)
    spl.edit(**kw)
    return spl
  
  @reindex
  def _makeSaleInvoice(self, created_by_builder=0, **kw):
    """Creates a sale invoice."""
    sit = self.getPortal().accounting_module.newContent(
          portal_type='Sale Invoice Transaction',
          created_by_builder=created_by_builder)
    sit.edit(**kw)
    return sit

  @reindex
  def _makeCurrency(self, **kw):
    """Creates a currency."""
    currency = self.getCurrencyModule().newContent(
            portal_type = 'Currency', **kw)
    get_transaction().commit()
    self.tic()
    return currency
  _makeResource = _makeCurrency
  # }}}

  @reindex
  def _makeMovement(self, **kw):
    """Creates a movement.
    """
    mvt = self.folder.newContent(portal_type='Dummy Movement')
    kw.setdefault('destination_section_value', self.section)
    kw.setdefault('source_section_value', self.mirror_section)
    kw.setdefault('destination_value', self.node)
    kw.setdefault('source_value', self.mirror_node)
    kw.setdefault('resource_value', self.resource)
    mvt.edit(**kw)
    return mvt
  
  @reindex
  def _makeSimulationMovement(self, **kw):
    """Creates a simulation movement.
    """
    ar = self.getSimulationTool().newContent(portal_type='Applied Rule')
    # we created a default_order_rule in setUp
    ar.setSpecialise('portal_rules/default_order_rule')
    mvt = ar.newContent(portal_type='Simulation Movement')
    kw.setdefault('destination_section_value', self.section)
    kw.setdefault('source_section_value', self.mirror_section)
    kw.setdefault('destination_value', self.node)
    kw.setdefault('source_value', self.mirror_node)
    kw.setdefault('resource_value', self.resource)
    mvt.edit(**kw)
    return mvt

# }}}

class TestInventory(InventoryAPITestCase):
  """Tests getInventory methods.
  """
  def testReturnedTypeIsFloat(self):
    """getInventory returns a float"""
    getInventory = self.getSimulationTool().getInventory
    self.assertEquals(type(getInventory()), type(0.1))
    # default is 0
    self.assertEquals(0, getInventory())

  def test_SimulationMovement(self):
    """Test Simulation Movements works in this testing environnement.
    """
    getInventory = self.getSimulationTool().getInventory
    self._makeSimulationMovement(quantity=100)
    self.assertEquals(100, getInventory(section_uid=self.section.getUid()))
    # mixed with a real movement
    self._makeMovement(quantity=100)
    self.assertEquals(200, getInventory(section_uid=self.section.getUid()))

  def test_SimulationMovementisAccountable(self):
    """Test Simulation Movements are not accountable if related to a delivery.
    """
    getInventory = self.getSimulationTool().getInventory
    sim_mvt = self._makeSimulationMovement(quantity=100)
    mvt = self._makeMovement(quantity=100)
    # simulation movement are accountable,
    self.failUnless(sim_mvt.isAccountable())
    # unless connected to a delivery movement
    sim_mvt.setDeliveryValue(mvt)
    self.failIf(sim_mvt.isAccountable())
    # not accountable movement are not counted by getInventory
    get_transaction().commit(); self.tic() # (after reindexing of course)
    self.assertEquals(100, getInventory(section_uid=self.section.getUid()))
  
  def test_OmitSimulation(self):
    """Test omit_simulation argument to getInventory.
    """
    getInventory = self.getSimulationTool().getInventory
    self._makeSimulationMovement(quantity=100)
    self._makeMovement(quantity=100)
    self.assertEquals(100, getInventory(section_uid=self.section.getUid(),
                                        omit_simulation=1))

  def test_SectionCategory(self):
    """Tests inventory on section category. """
    getInventory = self.getSimulationTool().getInventory
    self.section.setGroup('level1/level2')
    self._makeMovement(quantity=100)
    self.assertEquals(getInventory(
                        section_category='group/level1'), 100)
    self.assertEquals(getInventory(
                        section_category='group/level1/level2'), 100)
    self.assertEquals(getInventory(
                        section_category='group/anotherlevel'), 0)
    
    # section category can be a list
    self.assertEquals(getInventory(
            section_category=['group/anotherlevel', 'group/level1']), 100)

    # strict_section_category only takes movement where section is strict
    # member of the category.
    self.assertEquals(getInventory(
                section_category_strict_membership=['group/level1']), 0)
    self.section.setGroup('level1')
    get_transaction().commit()
    self.tic()
    self.assertEquals(getInventory(
                section_category_strict_membership=['group/level1']), 100)
    
    # non existing values to section_category are not silently ignored, but
    # raises an exception
    self.assertRaises(ValueError,
                      getInventory,
                      section_category='group/notexists')

  def test_MirrorSectionCategory(self):
    """Tests inventory on mirror section category. """
    getInventory = self.getSimulationTool().getInventory
    self.mirror_section.setGroup('level1/level2')
    self._makeMovement(quantity=100)
    self.assertEquals(getInventory(
                        mirror_section_category='group/level1'), 100)
    self.assertEquals(getInventory(
                        mirror_section_category='group/level1/level2'), 100)
    self.assertEquals(getInventory(
                        mirror_section_category='group/anotherlevel'), 0)
    
    # section category can be a list
    self.assertEquals(getInventory(
            mirror_section_category=['group/anotherlevel',
                                     'group/level1']), 100)

    # strict_section_category only takes movement where section is strict
    # member of the category.
    self.assertEquals(getInventory(
              mirror_section_category_strict_membership=['group/level1']), 0)
    self.mirror_section.setGroup('level1')
    get_transaction().commit()
    self.tic()
    self.assertEquals(getInventory(
            mirror_section_category_strict_membership=['group/level1']), 100)
    
    # non existing values to section_category are not silently ignored, but
    # raises an exception
    self.assertRaises(ValueError,
                      getInventory,
                      mirror_section_category='group/notexists')

  def test_NodeCategory(self):
    """Tests inventory on node_category """
    getInventory = self.getSimulationTool().getInventory
    self.node.setGroup('level1/level2')
    self._makeMovement(quantity=100,
                       source_value=None)
    self.assertEquals(getInventory(
                        node_category='group/level1'), 100)
    self.assertEquals(getInventory(
                        node_category='group/level1/level2'), 100)
    self.assertEquals(getInventory(
                node_category_strict_membership=['group/level1']), 0)
    self.node.setGroup('level1')
    get_transaction().commit()
    self.tic()
    self.assertEquals(getInventory(
                node_category_strict_membership=['group/level1']), 100)
  
  def test_ResourceCategory(self):
    """Tests inventory on resource_category """
    getInventory = self.getSimulationTool().getInventory
    self.resource.setProductLine('level1/level2')
    self._makeMovement(quantity=100,
                       source_value=None)
    self.assertEquals(getInventory(
                        resource_category='product_line/level1'), 100)
    self.assertEquals(getInventory(
                        resource_category='product_line/level1/level2'), 100)
    self.assertEquals(getInventory(
                resource_category_strict_membership=['product_line/level1']), 0)
    self.resource.setProductLine('level1')
    get_transaction().commit()
    self.tic()
    self.assertEquals(getInventory(
            resource_category_strict_membership=['product_line/level1']), 100)

  def test_PaymentCategory(self):
    """Tests inventory on payment_category """
    getInventory = self.getSimulationTool().getInventory
    # for now, BankAccount have a product_line category, so we can use this for
    # our category membership tests.
    self.payment_node.setProductLine('level1/level2')
    self._makeMovement(quantity=100,
                       destination_payment_value=self.payment_node,
                       source_value=None)
    self.assertEquals(getInventory(
                        payment_category='product_line/level1'), 100)
    self.assertEquals(getInventory(
                        payment_category='product_line/level1/level2'), 100)
    self.assertEquals(getInventory(
                payment_category_strict_membership=['product_line/level1']), 0)
    self.payment_node.setProductLine('level1')
    get_transaction().commit()
    self.tic()
    self.assertEquals(getInventory(
              payment_category_strict_membership=['product_line/level1']), 100)

  def test_SimulationState(self):
    """Tests inventory on simulation state. """
    getInventory = self.getSimulationTool().getInventory
    self.payment_node.setProductLine('level1/level2')
    self._makeMovement(quantity=100,
                       simulation_state='confirmed',
                       source_value=None)
    self.assertEquals(getInventory(), 100)
    self.assertEquals(getInventory(simulation_state='confirmed'), 100)
    self.assertEquals(getInventory(simulation_state='planned'), 0)

    self.assertEquals(getInventory(simulation_state=['planned',
                                                     'confirmed']), 100)

  def test_MultipleNodes(self):
    """Test section category with many nodes. """
    test_group = self.getCategoryTool().resolveCategory('group/test_group')
    self.assertNotEquals(len(test_group.objectValues()), 0)
    # we first create a section for each group category
    quantity_for_node = {}
    for category in test_group.getCategoryChildValueList():
      # we create a member node for each category
      node = self._makeOrganisation(group_value=category)
      # we create a movement to each node
      quantity = random.randint(100, 1000)
      self._makeMovement(quantity=quantity,
                         destination_section_value=node,
                         destination_value=node)
      # and record for later
      quantity_for_node[node] = quantity

    getInventory = self.getSimulationTool().getInventory
    for category in test_group.getCategoryChildValueList():
      node_list = category.getGroupRelatedValueList(portal_type='Organisation')
      self.assertNotEquals(len(node_list), 0)

      # getInventory on node uid for all member of a category ...
      total_quantity = sum([quantity_for_node[node] for node in node_list])
      self.assertEquals(getInventory(
        node_uid=[node.getUid() for node in node_list]), total_quantity)
      # ... is equivalent to node_category
      self.assertEquals(getInventory(
        node_category=category.getRelativeUrl()), total_quantity)
  
  # FIXME: this test is currently broken
  def TODO_test_DoubleSectionCategory(self):
    """Tests inventory on section category, when the section is twice member\
    of the same category like it happens for group and mapping"""
    getInventory = self.getSimulationTool().getInventory
    self.section.setGroup('level1/level2')
    self.section.setMapping('group/level1/level2')
    self._makeMovement(quantity=100)
    # We are twice member of the section_category, but the quantity should not
    # change.
    self.assertEquals(getInventory(
                        section_category='group/level1'), 100)
    self.assertEquals(getInventory(
                        section_category='group/level1/level2'), 100)
    self.assertEquals(getInventory(
            section_category_strict_membership=['group/level1/level2']), 100)

  def test_NoSection(self):
    """Tests inventory on section category / section uid, when the section is\
    empty."""
    getInventory = self.getSimulationTool().getInventory
    self.section.setGroup('level1/level2')
    self._makeMovement(quantity=100, source_section_value=None)
    self.assertEquals(getInventory(
                        section_category='group/level1/level2'), 100)
    self.assertEquals(getInventory(
            section_category_strict_membership=['group/level1/level2']), 100)
    self.assertEquals(getInventory(
                        section_uid=self.section.getUid()), 100)
  
  def testPrecision(self):
    # getInventory supports a precision= argument to specify the precision to
    # round
    getInventory = self.getSimulationTool().getInventory
    getInventoryAssetPrice = self.getSimulationTool().getInventoryAssetPrice
    self._makeMovement( quantity=0.1234, price=1 )
    self.assertAlmostEquals(0.123,
                getInventory(precision=3, node_uid=self.node.getUid()),
                places=3)
    self.assertAlmostEquals(0.123,
             getInventoryAssetPrice(precision=3, node_uid=self.node.getUid()),
             places=3)
  
  def testPrecisionAndFloatRoundingIssues(self):
    # sum([0.1] * 10) != 1.0 but this is not a problem here
    getInventory = self.getSimulationTool().getInventory
    getInventoryAssetPrice = self.getSimulationTool().getInventoryAssetPrice
    self._makeMovement( quantity=1, price=1 )
    for i in range(10):
      self._makeMovement( quantity=-0.1, price=1 )
    self.assertEquals(0, getInventory(precision=2, node_uid=self.node.getUid()))
    self.assertEquals(0, getInventoryAssetPrice(precision=2,
                                                node_uid=self.node.getUid()))
    

class TestInventoryList(InventoryAPITestCase):
  """Tests getInventoryList methods.
  """

class TestMovementHistoryList(InventoryAPITestCase):
  """Tests Movement history list methods.
  """
  def testReturnedTypeIsList(self):
    """Movement History List returns a sequence object""" 
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    mvt_history_list = getMovementHistoryList()
    self.assertEquals(str(mvt_history_list.__class__),
                    'Shared.DC.ZRDB.Results.Results')
    # default is an empty list
    self.assertEquals(0, len(mvt_history_list))
  
  def testMovementBothSides(self):
    """Movement History List returns movement from both sides""" 
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    self._makeMovement(quantity=100)
    # we don't filter, so we have the same movement from both sides.
    self.assertEquals(2, len(getMovementHistoryList()))

  def testBrainClass(self):
    """Movement History List uses InventoryListBrain for brains""" 
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    self._makeMovement(quantity=100)
    # maybe this check is too low level (Shared/DC/ZRDB//Results.py, class r) 
    r_bases = getMovementHistoryList()._class.__bases__
    brain_class = r_bases[2].__name__
    self.assertEquals('InventoryListBrain', brain_class,
      "unexpected brain class for getMovementHistoryList InventoryListBrain"
      " != %s (bases %s)" % (brain_class, r_bases))
  
  def testSection(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    mvt = self._makeMovement(quantity=100)
    mvt_history_list = getMovementHistoryList(
                            section_uid = self.section.getUid())
    self.assertEquals(1, len(mvt_history_list))
    self.assertEquals(mvt.getUid(), mvt_history_list[0].uid)
    self.assertEquals(100, mvt_history_list[0].total_quantity)
    self.assertEquals(self.section.getRelativeUrl(),
                  mvt_history_list[0].section_relative_url)
  
  def testMirrorSection(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    mvt = self._makeMovement(quantity=100)
    mvt_history_list = getMovementHistoryList(
                            mirror_section_uid = self.section.getUid())
    self.assertEquals(1, len(mvt_history_list))
    self.assertEquals(mvt.getUid(), mvt_history_list[0].uid)
    self.assertEquals(-100, mvt_history_list[0].total_quantity)
    self.assertEquals(self.mirror_section.getRelativeUrl(),
                  mvt_history_list[0].section_relative_url)
    self.assertEquals(self.mirror_node.getRelativeUrl(),
                  mvt_history_list[0].node_relative_url)
    
    # if we look from the other side, everything is reverted
    mvt_history_list = getMovementHistoryList(
                            section_uid = self.section.getUid())
    self.assertEquals(1, len(mvt_history_list))
    self.assertEquals(100, mvt_history_list[0].total_quantity)
    self.assertEquals(self.section.getRelativeUrl(),
                  mvt_history_list[0].section_relative_url)
    self.assertEquals(self.node.getRelativeUrl(),
                  mvt_history_list[0].node_relative_url)
  
  def testDifferentDatesPerSection(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    start_date = DateTime(2001, 1, 1)
    stop_date = DateTime(2002, 2, 2)
    mvt = self._makeMovement(quantity=100,
                             start_date=start_date,
                             stop_date=stop_date)
    # start_date is for source
    self.assertEquals(start_date, getMovementHistoryList(
                            section_uid=self.mirror_section.getUid())[0].date)
    # stop_date is for destination
    self.assertEquals(stop_date, getMovementHistoryList(
                            section_uid=self.section.getUid())[0].date)
    
  def testNode(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    mvt = self._makeMovement(quantity=100)
    mvt_history_list = getMovementHistoryList(
                            node_uid = self.node.getUid())
    self.assertEquals(1, len(mvt_history_list))
    self.assertEquals(mvt.getUid(), mvt_history_list[0].uid)
    self.assertEquals(100, mvt_history_list[0].total_quantity)

  def testMirrorNode(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    mvt = self._makeMovement(quantity=100)
    mvt_history_list = getMovementHistoryList(
                            mirror_node_uid = self.node.getUid())
    self.assertEquals(1, len(mvt_history_list))
    self.assertEquals(mvt.getUid(), mvt_history_list[0].uid)
    self.assertEquals(-100, mvt_history_list[0].total_quantity)

  def testResource(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    mvt = self._makeMovement(quantity=100)
    another_resource = self._makeResource()
    another_mvt = self._makeMovement(quantity=3,
                                     resource_value=another_resource)
    # we can query resource directly by uid
    mvt_history_list = getMovementHistoryList(
                            node_uid=self.node.getUid(),
                            resource_uid=self.resource.getUid())
    self.assertEquals(1, len(mvt_history_list))
    self.assertEquals(100, mvt_history_list[0].total_quantity)
    # getMovementHistoryList should return only movement for
    mvt_history_list = getMovementHistoryList(
                            node_uid=self.node.getUid(),
                            resource_uid=another_resource.getUid())
    self.assertEquals(1, len(mvt_history_list))
    self.assertEquals(3, mvt_history_list[0].total_quantity)

    # wrong value yields an empty list
    self.assertEquals(0, len(getMovementHistoryList(
                            resource_uid = self.node.getUid())))
  
  def testSectionCategory(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    self.section.setGroup('level1/level2')
    mvt = self._makeMovement(quantity=100)

    # for section category, both exact category or any parent category works
    # section_category can also be a list.
    for section_category in [ 'group/level1',
                              'group/level1/level2',
                             ['group/level1', 'group/anotherlevel'],
                             ['group/level1', 'group/level1'],
                             ['group/level1', 'group/level1/level2'], ]:
      movement_history_list = getMovementHistoryList(
                                section_category=section_category)
      self.assertEquals(len(movement_history_list), 1)
      self.assertEquals(movement_history_list[0].total_quantity, 100)
    
    # again, bad category raises an exception
    self.assertRaises(ValueError,
                      getMovementHistoryList,
                      section_category='group/notexists')
    # (but other arguments are ignored)
    self.assertEquals(len(getMovementHistoryList(
                        section_category='group/level1',
                        ignored='argument')), 1)
    
  def testNodeCategoryAndSectionCategory(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    self.section.setGroup('level1/level2')
    self.node.setGroup('level1')
    mvt = self._makeMovement(quantity=100)

    valid_category_list = [ 'group/level1',
                           ['group/level1', 'group/anotherlevel'],
                           ['group/level1', 'group/level1'],
                           ['group/level1', 'group/level1/level2'], ]
    invalid_category_list = ['group/anotherlevel', 'product_line/level1']

    # both valid
    for section_category in valid_category_list:
      for node_category in valid_category_list:
        movement_history_list = getMovementHistoryList(
                                  node_category=node_category,
                                  section_category=section_category)
        self.assertEquals(len(movement_history_list), 1)
        self.assertEquals(movement_history_list[0].total_quantity, 100)

    # if node category OR section category is not matched, no movement are
    # returned.
    for section_category in valid_category_list:
      for node_category in invalid_category_list:
        movement_history_list = getMovementHistoryList(
                                  node_category=node_category,
                                  section_category=section_category)
        self.assertEquals(len(movement_history_list), 0)

    for section_category in invalid_category_list:
      for node_category in valid_category_list:
        movement_history_list = getMovementHistoryList(
                                  node_category=node_category,
                                  section_category=section_category)
        self.assertEquals(len(movement_history_list), 0)


  # Date tests:
  # ===========
  #
  # For all date tests, we create a list of movements with dates:
  #     start_date (date for source)        stop_date(date for destination)
  #              2006/01/01                       2006/01/02
  #              2006/01/02                       2006/01/03
  #              2006/01/03                       2006/01/04
  #              2006/01/04                       2006/01/05
  # in all those tests, we usually look from the destination, so the first
  # movement is at 2006/01/02
  #

  def test_FromDate(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    for date in [DateTime(2006, 01, day) for day in range(1, 4)]:
      self._makeMovement(quantity=100,
                         start_date=date,
                         stop_date=date+1)
    # from_date takes all movements >= 
    self.assertEquals(len(getMovementHistoryList(
                        from_date=DateTime(2006, 01, 03),
                        section_uid=self.section.getUid())), 2)
    self.assertEquals(len(getMovementHistoryList(
                        from_date=DateTime(2006, 01, 02),
                        section_uid=self.mirror_section.getUid())), 2)

  def test_AtDate(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    for date in [DateTime(2006, 01, day) for day in range(1, 4)]:
      self._makeMovement(quantity=100,
                         start_date=date,
                         stop_date=date+1)
    # at_date takes all movements <=
    self.assertEquals(len(getMovementHistoryList(
                        at_date=DateTime(2006, 01, 03),
                        section_uid=self.section.getUid())), 2)
    self.assertEquals(len(getMovementHistoryList(
                        at_date=DateTime(2006, 01, 02),
                        section_uid=self.mirror_section.getUid())), 2)

  def test_ToDate(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    for date in [DateTime(2006, 01, day) for day in range(1, 4)]:
      self._makeMovement(quantity=100,
                         start_date=date,
                         stop_date=date+1)
    # to_date takes all movements <
    self.assertEquals(len(getMovementHistoryList(
                        to_date=DateTime(2006, 01, 03),
                        section_uid=self.section.getUid())), 1)
    self.assertEquals(len(getMovementHistoryList(
                        to_date=DateTime(2006, 01, 02),
                        section_uid=self.mirror_section.getUid())), 1)

  def test_FromDateAtDate(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    for date in [DateTime(2006, 01, day) for day in range(1, 4)]:
      self._makeMovement(quantity=100,
                         start_date=date,
                         stop_date=date+1)
    # both from_date and at_date
    self.assertEquals(len(getMovementHistoryList(
                        from_date=DateTime(2006, 01, 03),
                        at_date=DateTime(2006, 01, 03),
                        section_uid=self.section.getUid())), 1)
    self.assertEquals(len(getMovementHistoryList(
                        from_date=DateTime(2006, 01, 02),
                        at_date=DateTime(2006, 01, 03),
                        section_uid=self.section.getUid())), 2)
    self.assertEquals(len(getMovementHistoryList(
                        from_date=DateTime(2005, 01, 02),
                        at_date=DateTime(2006, 01, 03),
                        section_uid=self.section.getUid())), 2)
    # from other side
    self.assertEquals(len(getMovementHistoryList(
                        from_date=DateTime(2006, 01, 02),
                        at_date=DateTime(2006, 01, 03),
                        section_uid=self.mirror_section.getUid())), 2)

  def test_FromDateToDate(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    for date in [DateTime(2006, 01, day) for day in range(1, 4)]:
      self._makeMovement(quantity=100,
                         start_date=date,
                         stop_date=date+1)
    # both from_date and to_date
    self.assertEquals(len(getMovementHistoryList(
                        from_date=DateTime(2006, 01, 03),
                        to_date=DateTime(2006, 01, 03),
                        section_uid=self.section.getUid())), 0)
    self.assertEquals(len(getMovementHistoryList(
                        from_date=DateTime(2006, 01, 02),
                        to_date=DateTime(2006, 01, 03),
                        section_uid=self.section.getUid())), 1)
    self.assertEquals(len(getMovementHistoryList(
                        from_date=DateTime(2005, 01, 02),
                        to_date=DateTime(2007, 01, 02),
                        section_uid=self.section.getUid())), 3)
    # from other side
    self.assertEquals(len(getMovementHistoryList(
                        from_date=DateTime(2006, 01, 02),
                        to_date=DateTime(2006, 01, 03),
                        section_uid=self.mirror_section.getUid())), 1)

  def test_SortOnDate(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    date_list = [DateTime(2006, 01, day) for day in range(1, 10)]
    reverse_date_list = date_list[:]
    reverse_date_list.reverse()

    # we create movements with a random order on dates, to have an extra change
    # that they are not sorted accidentally.
    random_date_list = date_list[:]
    random.shuffle(random_date_list)
    for date in random_date_list:
      self._makeMovement(quantity=100,
                         start_date=date - 1,
                         stop_date=date)
    
    movement_date_list = [ x.date for x in getMovementHistoryList(
                              section_uid=self.section.getUid(),
                              sort_on=(('stock.date', 'ascending'),)) ]
    self.assertEquals(movement_date_list, date_list)
    movement_date_list = [ x.date for x in getMovementHistoryList(
                              section_uid=self.section.getUid(),
                              sort_on=(('stock.date', 'descending'),)) ]
    self.assertEquals(movement_date_list, reverse_date_list)
    # minimum test for (('stock.date', 'ASC'), ('stock.uid', 'ASC')) which is
    # what you want to make sure that the last line on a page precedes the
    # first line on the previous page.
    movement_date_list = [x.date for x in getMovementHistoryList(
                              section_uid=self.section.getUid(),
                              sort_on=(('stock.date', 'ascending'),
                                       ('stock.uid', 'ascending'),)) ]
    self.assertEquals(movement_date_list, date_list)

  # FIXME: do we want to include it or no ?
  def test_Limit(self):
    return "is it part of this API ?" # XXX
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    for q in range(10):
      self._makeMovement(quantity=1)
    self.assertEquals(3, len(getMovementHistoryList(list_start=2,
                                                    list_lines=3)))
  
  def test_SimulationState(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    self._makeMovement(quantity=2, simulation_state="confirmed")
    self._makeMovement(quantity=3, simulation_state="planned")
    for simulation_state in ['confirmed', ['confirmed', 'stopped']]:
      movement_history_list = getMovementHistoryList(
                                simulation_state=simulation_state,
                                section_uid=self.section.getUid())
      self.assertEquals(len(movement_history_list), 1)
      self.assertEquals(movement_history_list[0].total_quantity, 2)
    
    movement_history_list = getMovementHistoryList(
                              simulation_state=["confirmed", "planned"],
                              section_uid=self.section.getUid())
    self.assertEquals(len(movement_history_list), 2)

  def test_SimulationMovement(self):
    """Test simulation movement are listed in getMovementHistoryList
    """
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    self._makeSimulationMovement(quantity=100)
    self._makeMovement(quantity=100)
    movement_history_list = getMovementHistoryList(
                                    section_uid=self.section.getUid())
    self.assertEquals(2, len(movement_history_list))
  
  def test_OmitSimulation(self):
    """Test omit_simulation argument to getMovementHistoryList.
    """
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    self._makeSimulationMovement(quantity=100)
    self._makeMovement(quantity=100)
    movement_history_list = getMovementHistoryList(
                                    section_uid=self.section.getUid(),
                                    omit_simulation=1)
    self.assertEquals(1, len(movement_history_list))
    self.assertEquals(100, movement_history_list[0].quantity)
  
  def test_RunningTotalQuantity(self):
    """Test that a running_total_quantity attribute is set on brains
    """
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    date_and_qty_list = [(DateTime(2006, 01, day), day) for day in range(1, 10)]
    for date, quantity in date_and_qty_list:
      self._makeMovement(stop_date=date, quantity=quantity)
    movement_history_list = getMovementHistoryList(
                                    section_uid=self.section.getUid(),
                                    sort_on=[('stock.date', 'asc'),
                                             ('stock.uid', 'asc')])
    running_total_quantity=0
    for idx, (date, quantity) in enumerate(date_and_qty_list):
      brain = movement_history_list[idx]
      running_total_quantity += quantity
      self.assertEquals(running_total_quantity, brain.running_total_quantity)
      self.assertEquals(date, brain.date)
      self.assertEquals(quantity, brain.quantity)
  
  def test_RunningTotalPrice(self):
    """Test that a running_total_price attribute is set on brains
    """
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    date_and_price_list = [(DateTime(2006, 01, day), day) for day in range(1, 10)]
    for date, price in date_and_price_list:
      self._makeMovement(stop_date=date, quantity=1, price=price)
    movement_history_list = getMovementHistoryList(
                                    section_uid=self.section.getUid(),
                                    sort_on=[('stock.date', 'asc'),
                                             ('stock.uid', 'asc')])
    running_total_price=0
    for idx, (date, price) in enumerate(date_and_price_list):
      brain = movement_history_list[idx]
      running_total_price += price
      self.assertEquals(running_total_price, brain.running_total_price)
      self.assertEquals(date, brain.date)
      self.assertEquals(price, brain.total_price)

  def test_RunningTotalWithInitialValue(self):
    """Test running_total_price and running_total_quantity with an initial
    value.
    """
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    date_and_qty_list = [(DateTime(2006, 01, day), day) for day in range(1, 10)]
    for date, quantity in date_and_qty_list:
      self._makeMovement(stop_date=date, price=quantity, quantity=quantity)
    initial_running_total_price=100
    initial_running_total_quantity=-10
    movement_history_list = getMovementHistoryList(
                                    initial_running_total_quantity=
                                            initial_running_total_quantity,
                                    initial_running_total_price=
                                            initial_running_total_price,
                                    section_uid=self.section.getUid(),
                                    sort_on=[('stock.date', 'asc'),
                                             ('stock.uid', 'asc')])
    running_total_price=initial_running_total_price
    running_total_quantity=initial_running_total_quantity
    for idx, (date, quantity) in enumerate(date_and_qty_list):
      brain = movement_history_list[idx]
      self.assertEquals(date, brain.date)
      running_total_quantity += quantity
      self.assertEquals(running_total_quantity, brain.running_total_quantity)
      running_total_price += quantity * quantity # we've set price=quantity
      self.assertEquals(running_total_price, brain.running_total_price)
  
  def testRunningQuantityWithQuantity0(self):
    # a 0 quantity should not be a problem for running total price
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    date = DateTime()
    quantity = -1
    for i in range(3):
      self._makeMovement( quantity=quantity+i, price=1, start_date=date+i )
    mvt_history_list = getMovementHistoryList(
                            node_uid=self.node.getUid(),
                            sort_on=[['stock.date', 'ASC']])
    self.assertEquals(3, len(mvt_history_list))
    self.assertEquals(-1, mvt_history_list[0].running_total_quantity)
    self.assertEquals(-1, mvt_history_list[0].running_total_price)
    self.assertEquals(-1, mvt_history_list[1].running_total_quantity)
    self.assertEquals(-1, mvt_history_list[1].running_total_price)
    self.assertEquals(0, mvt_history_list[2].running_total_quantity)
    self.assertEquals(0, mvt_history_list[2].running_total_price)

  # bug #352
  def testSameNodeDifferentDates(self):
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    date = DateTime()
    mvt = self._makeMovement( quantity=2,
                              start_date=date,
                              stop_date=date+1,
                              source_value=self.node,
                              destination_value=self.node )
    
    mvt_history_list = getMovementHistoryList(
                            node_uid=self.node.getUid(),)
    self.assertEquals(2, len(mvt_history_list))
    self.assertEquals(0, sum([r.total_quantity for r in mvt_history_list]))
 
  def testPrecision(self):
    # getMovementHistoryList supports a precision= argument to specify the
    # precision to round
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    self._makeMovement( quantity=0.1234, price=1 )
    mvt_history_list = getMovementHistoryList(
                            precision=2,
                            node_uid=self.node.getUid())
    self.assertEquals(1, len(mvt_history_list))
    self.assertEquals(0.12, mvt_history_list[0].running_total_quantity)
    self.assertEquals(0.12, mvt_history_list[0].running_total_price)
    self.assertEquals(0.12, mvt_history_list[0].total_quantity)
    self.assertEquals(0.12, mvt_history_list[0].total_price)
    
    mvt_history_list = getMovementHistoryList(
                            precision=3,
                            node_uid=self.node.getUid())
    self.assertEquals(1, len(mvt_history_list))
    self.assertEquals(0.123, mvt_history_list[0].running_total_quantity)
    self.assertEquals(0.123, mvt_history_list[0].running_total_price)
    self.assertEquals(0.123, mvt_history_list[0].total_quantity)
    self.assertEquals(0.123, mvt_history_list[0].total_price)

  def testPrecisionAndFloatRoundingIssues(self):
    # sum([0.1] * 10) != 1.0 but this is not a problem here
    getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
    date = DateTime()
    self._makeMovement( quantity=1, price=1, start_date=date )
    for i in range(10):
      self._makeMovement( quantity=-0.1, price=1, start_date=date+i )
    mvt_history_list = getMovementHistoryList(
                            precision=2,
                            node_uid=self.node.getUid(),
                            sort_on=[['stock.date', 'ASC']])
    self.assertEquals(11, len(mvt_history_list))
    self.assertEquals(0, mvt_history_list[-1].running_total_quantity)
    self.assertEquals(0, mvt_history_list[-1].running_total_price)
    
class TestInventoryStat(InventoryAPITestCase):
  """Tests Inventory Stat methods.
  """

if __name__ == '__main__':
  framework()
else:
  import unittest
  def test_suite():
    suite = unittest.TestSuite()
    suite.addTest(unittest.makeSuite(TestInventory))
    suite.addTest(unittest.makeSuite(TestInventoryList))
    suite.addTest(unittest.makeSuite(TestMovementHistoryList))
    suite.addTest(unittest.makeSuite(TestInventoryStat))
    return suite

# vim: foldmethod=marker