Commit c2ca9428 authored by Jérome Perrin's avatar Jérome Perrin

WIP: Inventory api: interpolation + group by time sequence

jerome/erp5!6 rebased and in a big
commit so that we can start using that
SimulationTool: implement "linear" flow API

Conflicts:
	product/ERP5/bootstrap/erp5_core/SkinTemplateItem/portal_skins/erp5_core/Resource_zGetInventoryList.xml

testInventoryAPI: silent pyflakes warnings

more interpolation implementation

inventory_list interpolation flow: support at_date & to_date

more and more interpolation API

interpolation

interpolation

group_by_time_sequence_list

testInventoryAPI: move inventory valuation methods in a dedicated test class

This should not be in test inventory list.

test: move Tracking API to a dedicated test module

inventory api: dirty way of disabling cache if keys such as node_category are used.

interpolation: security on SimulationTool_zGetInterpolationMethod
parent f86a73bc
......@@ -560,6 +560,8 @@ class SimulationTool(BaseTool):
omit_output=0,
omit_asset_increase=0,
omit_asset_decrease=0,
# interpolation_method
interpolation_method='default',
# group by
group_by_node=0,
group_by_node_category=0,
......@@ -596,6 +598,7 @@ class SimulationTool(BaseTool):
group_by_function_category=0,
group_by_function_category_strict_membership=0,
group_by_date=0,
group_by_time_sequence_list=None,
# sort_on
sort_on=None,
group_by=None,
......@@ -649,6 +652,43 @@ class SimulationTool(BaseTool):
date_dict['range'] = 'ngt'
if date_dict:
column_value_dict['date'] = date_dict
if interpolation_method != 'default':
if not group_by_time_sequence_list:
if not (from_date and (to_date or at_date)):
raise ValueError("date_range is required to use interpolation_method")
# if we consider flow, we also select movement whose mirror date is
# in the from_date/to_date range and movement whose
# start_date/stop_date contains the report range.
# The selected range is wider, but the selected movements will have an
# "interpolation_ratio" applied to their quantity and prices.
if to_date:
column_value_dict['date'] = ComplexQuery(
Query(date=(from_date, to_date), range='minmax'),
Query(mirror_date=(from_date, to_date), range='minmax'),
ComplexQuery(
Query(mirror_date=from_date, range='min'),
Query(date=to_date, range='max'),
operator="AND"),
ComplexQuery(
Query(date=from_date, range='min'),
Query(mirror_date=to_date, range='max'),
operator="AND"),
operator="OR"
)
else:
column_value_dict['date'] = ComplexQuery(
Query(date=(from_date, at_date), range='minngt'),
Query(mirror_date=(from_date, at_date), range='minngt'),
ComplexQuery(
Query(mirror_date=from_date, range='min'),
Query(date=at_date, range='ngt'),
operator="AND"),
ComplexQuery(
Query(date=from_date, range='min'),
Query(mirror_date=at_date, range='ngt'),
operator="AND"),
operator="OR"
)
else:
column_value_dict['date'] = {'query': [to_date], 'range': 'ngt'}
column_value_dict['mirror_date'] = {'query': [from_date], 'range': 'nlt'}
......@@ -948,6 +988,8 @@ class SimulationTool(BaseTool):
new_kw['related_key_select_expression_list'] =\
related_key_select_expression_list
# XXX
sql_kw['group_by_time_sequence_list'] = group_by_time_sequence_list
return sql_kw, new_kw
#######################################################
......@@ -1032,6 +1074,17 @@ class SimulationTool(BaseTool):
output_simulation_state - only take rows with specified simulation_state
and quantity < 0
interpolation_method - Method to consider movements when calculating flows.
* (default): Consider the movement decreases 100% of source stock on
start date and increase 100% of the destination node stock on stop
date.
* linear: consider the movement decreases source stock and increase
destination stock linearly between start date and stop date.
* all_or_nothing: consider only movement who starts after the beginning of
the query period and finishes after the end of the query period.
* one_for_all: consider the movement fully as long as it is partially
contained in the query period.
ignore_variation - do not take into account variation in inventory
calculation (useless on getInventory, but useful on
getInventoryList)
......@@ -1158,6 +1211,7 @@ class SimulationTool(BaseTool):
group_by_section_category=0,
group_by_section_category_strict_membership=0,
group_by_resource=None,
group_by_time_sequence_list=(),
group_by=None,
**ignored):
"""
......@@ -1177,7 +1231,8 @@ class SimulationTool(BaseTool):
group_by_function or group_by_mirror_section or group_by_payment or \
group_by_sub_variation or group_by_variation or \
group_by_movement or group_by_date or group_by_section_category or\
group_by_section_category_strict_membership:
group_by_section_category_strict_membership or \
group_by_time_sequence_list:
if group_by_resource is None:
group_by_resource = 1
new_group_by_dict['group_by_resource'] = group_by_resource
......@@ -1194,6 +1249,7 @@ class SimulationTool(BaseTool):
omit_simulation=0,
only_accountable=True,
default_stock_table='stock',
interpolation_method='default',
selection_domain=None, selection_report=None,
statistic=0, inventory_list=1,
precision=None, connection_id=None,
......@@ -1272,6 +1328,7 @@ class SimulationTool(BaseTool):
'stock_table_id': default_stock_table,
'src__': src__,
'ignore_variation': ignore_variation,
'interpolation_method': interpolation_method,
'standardise': standardise,
'omit_simulation': omit_simulation,
'only_accountable': only_accountable,
......@@ -1287,7 +1344,9 @@ class SimulationTool(BaseTool):
# Get cached data
if getattr(self, "Resource_zGetInventoryCacheResult", None) is not None and \
optimisation__ and (not kw.get('from_date')) and \
'transformed_resource' not in kw:
'transformed_resource' not in kw \
and "category" not in str(kw) \
and "group_by_time_sequence_list" not in kw:
# Here is the different kind of date
# from_date : >=
# to_date : <
......@@ -1317,7 +1376,7 @@ class SimulationTool(BaseTool):
kw['from_date'] = cached_date
else:
cached_result = []
sql_kw, new_kw = self._generateKeywordDict(**kw)
sql_kw, new_kw = self._generateKeywordDict(interpolation_method=interpolation_method, **kw)
# Copy kw content as _generateSQLKeywordDictFromKeywordDict
# remove some values from it
try:
......@@ -1330,6 +1389,25 @@ class SimulationTool(BaseTool):
stock_sql_kw = self._generateSQLKeywordDictFromKeywordDict(
table=default_stock_table, sql_kw=sql_kw, new_kw=new_kw_copy)
stock_sql_kw.update(base_inventory_kw)
# TODO: move in _generateSQLKeywordDictFromKeywordDict
if interpolation_method in ('linear', 'all_or_nothing', 'one_for_all'):
# XXX only DateTime instance are supported
from_date = kw.get('from_date')
if from_date:
from_date = from_date.toZone("UTC")
to_date = kw.get('to_date')
if to_date:
to_date = to_date.toZone("UTC")
at_date = kw.get('at_date')
if at_date:
at_date = at_date.toZone("UTC")
stock_sql_kw['interpolation_method_from_date'] = from_date
stock_sql_kw['interpolation_method_to_date'] = to_date
stock_sql_kw['interpolation_method_at_date'] = at_date
elif interpolation_method != 'default':
raise ValueError("Unsupported interpolation_method %r" % (interpolation_method,))
delta_result = self.Resource_zGetInventoryList(
**stock_sql_kw)
if src__:
......
<dtml-let interpolation_ratio="SimulationTool_zGetInterpolationMethod(
stock_table_id=stock_table_id,
interpolation_method=interpolation_method,
interpolation_method_from_date=interpolation_method_from_date,
interpolation_method_to_date=interpolation_method_to_date,
interpolation_method_at_date=interpolation_method_at_date,
group_by_time_sequence_list=group_by_time_sequence_list,
src__=1)">
SELECT
<dtml-if expr="precision is not None">
SUM(ROUND(<dtml-var stock_table_id>.quantity
<dtml-if transformed_uid> * transformation.quantity</dtml-if>, <dtml-var precision>)) AS inventory,
SUM(ROUND(<dtml-var stock_table_id>.quantity
<dtml-if transformed_uid> * transformation.quantity</dtml-if>, <dtml-var precision>)) AS total_quantity,
SUM(ROUND(
<dtml-var stock_table_id>.quantity
<dtml-if transformed_uid> * transformation.quantity</dtml-if>
* <dtml-var interpolation_ratio>, <dtml-var precision>)) AS inventory,
SUM(ROUND(
<dtml-var stock_table_id>.quantity
<dtml-if transformed_uid> * transformation.quantity</dtml-if>
* <dtml-var interpolation_ratio>, <dtml-var precision>)) AS total_quantity,
<dtml-if convert_quantity_result>
SUM(ROUND(<dtml-var stock_table_id>.quantity * measure.quantity
<dtml-if quantity_unit_uid> / quantity_unit_conversion.quantity</dtml-if>
<dtml-if transformed_uid> * transformation.quantity</dtml-if>, <dtml-var precision>))
* <dtml-var interpolation_ratio>, <dtml-var precision>))
AS converted_quantity,
</dtml-if>
IFNULL(SUM(ROUND(<dtml-var stock_table_id>.total_price, <dtml-var precision>)), 0) AS total_price
IFNULL(SUM(ROUND(
<dtml-var stock_table_id>.total_price * <dtml-var interpolation_ratio>, <dtml-var precision>)), 0) AS total_price
<dtml-else>
SUM(<dtml-var stock_table_id>.quantity <dtml-if transformed_uid> * transformation.quantity</dtml-if>) AS inventory,
SUM(<dtml-var stock_table_id>.quantity <dtml-if transformed_uid> * transformation.quantity</dtml-if>) AS total_quantity,
SUM(<dtml-var stock_table_id>.quantity
<dtml-if transformed_uid> * transformation.quantity</dtml-if>
* <dtml-var interpolation_ratio>
) AS inventory,
SUM(<dtml-var stock_table_id>.quantity
<dtml-if transformed_uid> * transformation.quantity</dtml-if>
* <dtml-var interpolation_ratio>
) AS total_quantity,
<dtml-if convert_quantity_result>
ROUND(SUM(<dtml-var stock_table_id>.quantity * measure.quantity
<dtml-if quantity_unit_uid> / quantity_unit_conversion.quantity</dtml-if>
<dtml-if transformed_uid> * transformation.quantity</dtml-if>), 12)
<dtml-if transformed_uid> * transformation.quantity</dtml-if> * <dtml-var interpolation_ratio>), 12)
AS converted_quantity,
</dtml-if>
IFNULL(SUM(<dtml-var stock_table_id>.total_price), 0) AS total_price
IFNULL(SUM(<dtml-var stock_table_id>.total_price * <dtml-var interpolation_ratio>), 0) AS total_price
</dtml-if>
<dtml-if inventory_list>
,
......@@ -56,6 +77,8 @@ SELECT
COUNT(DISTINCT <dtml-var stock_table_id>.uid) AS stock_uid,
MAX(<dtml-var stock_table_id>.date) AS date
</dtml-if>
<dtml-if group_by_time_sequence_list>, slot_index </dtml-if> <dtml-comment>XXX is this really needed? are empty slots returned ? </dtml-comment>
<dtml-if select_expression>, <dtml-var select_expression></dtml-if>
FROM
......@@ -69,6 +92,55 @@ FROM
</dtml-if>
</dtml-in>
, <dtml-var stock_table_id>
<dtml-if group_by_time_sequence_list>
RIGHT JOIN
( <dtml-in prefix="time_slot" expr="_.list(_.enumerate(group_by_time_sequence_list))">
SELECT
<dtml-sqlvar expr="time_slot_key" type="int"> slot_index,
<dtml-sqlvar expr="time_slot_item.get('from_date')" type="datetime" optional> slot_from_date,
<dtml-sqlvar expr="time_slot_item.get('at_date')" type="datetime" optional> slot_at_date,
<dtml-sqlvar expr="time_slot_item.get('to_date')" type="datetime" optional> slot_to_date
<dtml-unless time_slot_end>UNION ALL</dtml-unless>
</dtml-in> ) slots
ON
<dtml-if group_by_time_sequence_list>
(
( slot_from_date is not null AND
( slot_at_date is not null AND
GREATEST(`stock`.`date`, `stock`.`mirror_date`) >= slot_from_date AND
LEAST(`stock`.`date`, `stock`.`mirror_date`) <= slot_at_date
) OR (
(
slot_to_date is not null AND
GREATEST(`stock`.`date`, `stock`.`mirror_date`) >= slot_from_date AND
LEAST(`stock`.`date`, `stock`.`mirror_date`) < slot_to_date
) OR (
GREATEST(`stock`.`date`, `stock`.`mirror_date`) >= slot_from_date AND
slot_at_date is null AND slot_to_date is null
)
)
) OR (
slot_from_date is null AND (
( slot_at_date is not null AND
( LEAST(`stock`.`date`, `stock`.`mirror_date`) <= slot_at_date )
) OR LEAST(`stock`.`date`, `stock`.`mirror_date`) < slot_to_date
)
)
)
<dtml-else>
(
( slot_from_date is null OR stock.date >= slot_from_date )
AND ( slot_at_date is null OR stock.date <= slot_at_date )
AND ( slot_to_date is null OR stock.date < slot_to_date )
)
</dtml-if>
</dtml-if>
</dtml-if>
<dtml-if quantity_unit_uid> <dtml-comment>XXX quantity unit conversion will not work when using implict_join=False</dtml-comment>
LEFT JOIN quantity_unit_conversion ON
......@@ -116,9 +188,16 @@ WHERE
<dtml-if group_by_expression>
GROUP BY
<dtml-if transformed_uid>transformation.transformed_uid,</dtml-if>
<dtml-if group_by_time_sequence_list>slot_index,</dtml-if>
<dtml-var group_by_expression>
</dtml-if>
<dtml-if order_by_expression>
ORDER BY
<dtml-var order_by_expression>
<dtml-else>
<dtml-if group_by_time_sequence_list>
ORDER BY slot_index
</dtml-if>
</dtml-if>
</dtml-let>
......@@ -45,7 +45,12 @@ convert_quantity_result\r\n
quantity_unit_uid\r\n
stock_table_id=stock\r\n
transformed_uid\r\n
transformed_variation_text</string> </value>
transformed_variation_text\r\n
group_by_time_sequence_list:list\r\n
interpolation_method\r\n
interpolation_method_from_date=not_applicable\r\n
interpolation_method_to_date=not_applicable\r\n
interpolation_method_at_date=not_applicable</string> </value>
</item>
<item>
<key> <string>cache_time_</string> </key>
......
<dtml-if expr="interpolation_method == 'linear'">
<dtml-if group_by_time_sequence_list>
CASE
WHEN <dtml-var stock_table_id>.mirror_date = <dtml-var stock_table_id>.date THEN 1
ELSE (
UNIX_TIMESTAMP(
IFNULL(
LEAST(
IFNULL(slot_at_date, slot_to_date),
GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
),
GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
)
)
- UNIX_TIMESTAMP(
GREATEST(
IFNULL(
slot_from_date, TIMESTAMP(0)
),
LEAST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
)
)
)
/ (
UNIX_TIMESTAMP(GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)) -
UNIX_TIMESTAMP(LEAST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)) ) END
<dtml-else>
CASE
WHEN <dtml-var stock_table_id>.mirror_date = <dtml-var stock_table_id>.date THEN 1
ELSE (
UNIX_TIMESTAMP(LEAST(
<dtml-if interpolation_method_at_date>
<dtml-sqlvar interpolation_method_at_date type="datetime">
<dtml-else>
<dtml-sqlvar interpolation_method_to_date type="datetime">
</dtml-if>,
GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date) ))
- UNIX_TIMESTAMP(GREATEST(<dtml-sqlvar interpolation_method_from_date type="datetime">,
LEAST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date))))
/ ( UNIX_TIMESTAMP(GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)) -
UNIX_TIMESTAMP(LEAST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)) ) END
</dtml-if>
<dtml-elif expr="interpolation_method == 'all_or_nothing'">
CASE
WHEN (
-- movement contained in time frame
LEAST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
>= <dtml-sqlvar interpolation_method_from_date type="datetime"> AND
GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
<dtml-if interpolation_method_at_date>
<= <dtml-sqlvar interpolation_method_at_date type="datetime">
<dtml-else>
< <dtml-sqlvar interpolation_method_to_date type="datetime">
</dtml-if>
) THEN 1
ELSE 0
END
<dtml-elif expr="interpolation_method == 'one_for_all'">
CASE
WHEN (
-- movement overlaps with time frame
GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
<= <dtml-sqlvar interpolation_method_from_date type="datetime"> OR
LEAST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
<dtml-if interpolation_method_at_date>
>= <dtml-sqlvar interpolation_method_at_date type="datetime">
<dtml-else>
> <dtml-sqlvar interpolation_method_to_date type="datetime">
</dtml-if>
) THEN 0
ELSE 1
END
<dtml-else>
1
</dtml-if>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="SQL" module="Products.ZSQLMethods.SQL"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_Use_Database_Methods_Permission</string> </key>
<value>
<list>
<string>Member</string>
</list>
</value>
</item>
<item>
<key> <string>arguments_src</string> </key>
<value> <string>stock_table_id\r\n
interpolation_method\r\n
interpolation_method_from_date\r\n
interpolation_method_to_date\r\n
interpolation_method_at_date\r\n
group_by_time_sequence_list:list</string> </value>
</item>
<item>
<key> <string>connection_id</string> </key>
<value> <string>cmf_activity_sql_connection</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>SimulationTool_zGetInterpolationMethod</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -691,9 +691,9 @@ class TestInventory(InventoryAPITestCase):
self.assertNotEquals(len(node_list), 0)
# getInventory on node uid for all member of a category ...
total_quantity = sum([quantity_for_node[node] for node in node_list])
total_quantity = sum([quantity_for_node[x] for x in node_list])
self.assertInventoryEquals(total_quantity,
node_uid=[node.getUid() for node in node_list])
node_uid=[x.getUid() for x in node_list])
# ... is equivalent to node_category
self.assertInventoryEquals(total_quantity,
node_category=category.getRelativeUrl())
......@@ -882,6 +882,463 @@ class TestInventory(InventoryAPITestCase):
resource=self.resource.getRelativeUrl(),
at_date=date_gmt_1)
def test_interpolation_method_linear_to_date(self):
self._makeMovement(
quantity=10,
start_date=DateTime("2016/01/01 01:00:00"),
stop_date=DateTime("2016/01/01 11:00:00"),
)
# With a time frame that does not contain the movement, we have 0%
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/02/01 00:00:00"),
to_date=DateTime("2016/02/02 00:00:00"),
interpolation_method='linear')
# With a time frame that contains the full movement, we have 100% of the quantity
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 00:00:00"),
to_date=DateTime("2016/01/02 00:00:00"),
interpolation_method='linear')
# corner case: exact same time, we also have 100%
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 01:00:00"),
to_date=DateTime("2016/01/01 11:00:00"),
interpolation_method='linear')
# With a time frame containing the 50% of the movement, we have 50% of the quantity
# time frame start before movement
self.assertInventoryEquals(
5,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 00:00:00"),
to_date=DateTime("2016/01/01 06:00:00"),
interpolation_method='linear')
# time frame start at exact same time as movement
self.assertInventoryEquals(
5,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 01:00:00"),
to_date=DateTime("2016/01/01 06:00:00"),
interpolation_method='linear')
# Time frame is contained inside the movement
self.assertInventoryEquals(
5,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 02:00:00"),
to_date=DateTime("2016/01/01 07:00:00"),
interpolation_method='linear')
# Time frame finishes after movement end
self.assertInventoryEquals(
5,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 06:00:00"),
to_date=DateTime("2016/01/01 12:00:00"),
interpolation_method='linear')
# Time frame finishes at exact same time that movement end
self.assertInventoryEquals(
5,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 06:00:00"),
to_date=DateTime("2016/01/01 11:00:00"),
interpolation_method='linear')
def test_interpolation_method_linear_at_date(self):
self._makeMovement(
quantity=10,
start_date=DateTime("2016/01/01 01:00:00"),
stop_date=DateTime("2016/01/01 11:00:00"),
)
# With a time frame that does not contain the movement, we have 0%
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/02/01 00:00:00"),
at_date=DateTime("2016/02/02 00:00:00"),
interpolation_method='linear')
# With a time frame that contains the full movement, we have 100% of the quantity
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 00:00:00"),
at_date=DateTime("2016/01/02 00:00:00"),
interpolation_method='linear')
# corner case: exact same time, we also have 100%
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 01:00:00"),
at_date=DateTime("2016/01/01 11:00:00"),
interpolation_method='linear')
# With a time frame containing the 50% of the movement, we have 50% of the quantity
# time frame start before movement
self.assertInventoryEquals(
5,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 00:00:00"),
at_date=DateTime("2016/01/01 06:00:00"),
interpolation_method='linear')
# time frame start at exact same time as movement
self.assertInventoryEquals(
5,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 01:00:00"),
at_date=DateTime("2016/01/01 06:00:00"),
interpolation_method='linear')
# Time frame is contained inside the movement
self.assertInventoryEquals(
5,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 02:00:00"),
at_date=DateTime("2016/01/01 07:00:00"),
interpolation_method='linear')
# Time frame finishes after movement end
self.assertInventoryEquals(
5,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 06:00:00"),
at_date=DateTime("2016/01/01 12:00:00"),
interpolation_method='linear')
# Time frame finishes at exact same time that movement end
self.assertInventoryEquals(
5,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 06:00:00"),
at_date=DateTime("2016/01/01 11:00:00"),
interpolation_method='linear')
def test_interpolation_method_XXX_one_for_all_to_date(self):
self._makeMovement(
quantity=10,
start_date=DateTime("2016/01/01 01:00:00"),
stop_date=DateTime("2016/01/01 11:00:00"),
)
# With a time frame that does not contain the movement, we have 0%
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/02/01 00:00:00"),
to_date=DateTime("2016/02/02 00:00:00"),
interpolation_method='one_for_all')
# With a time frame that contains the full movement, we have 100% of the quantity
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 00:00:00"),
to_date=DateTime("2016/01/02 00:00:00"),
interpolation_method='one_for_all')
# corner case: exact same time, we also have 100%
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 01:00:00"),
to_date=DateTime("2016/01/01 11:00:00"),
interpolation_method='one_for_all')
# With a time frame containing the 50% of the movement, we have 100% of the quantity
# this is "one_for_all" XXX naming
# time frame start before movement
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 00:00:00"),
to_date=DateTime("2016/01/01 06:00:00"),
interpolation_method='one_for_all')
# time frame start at exact same time as movement
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 01:00:00"),
to_date=DateTime("2016/01/01 06:00:00"),
interpolation_method='one_for_all')
# Time frame is contained inside the movement
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 02:00:00"),
to_date=DateTime("2016/01/01 07:00:00"),
interpolation_method='one_for_all')
# Time frame finishes after movement end
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 06:00:00"),
to_date=DateTime("2016/01/01 12:00:00"),
interpolation_method='one_for_all')
# Time frame finishes at exact same time that movement end
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 06:00:00"),
to_date=DateTime("2016/01/01 11:00:00"),
interpolation_method='one_for_all')
def test_interpolation_method_XXX_one_for_all_at_date(self):
self._makeMovement(
quantity=10,
start_date=DateTime("2016/01/01 01:00:00"),
stop_date=DateTime("2016/01/01 11:00:00"),
)
# With a time frame that does not contain the movement, we have 0%
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/02/01 00:00:00"),
at_date=DateTime("2016/02/02 00:00:00"),
interpolation_method='one_for_all')
# With a time frame that contains the full movement, we have 100% of the quantity
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 00:00:00"),
at_date=DateTime("2016/01/02 00:00:00"),
interpolation_method='one_for_all')
# corner case: exact same time, we also have 100%
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 01:00:00"),
at_date=DateTime("2016/01/01 11:00:00"),
interpolation_method='one_for_all')
# With a time frame containing the 50% of the movement, we have 100% of the quantity
# this is "one_for_all" XXX naming
# time frame start before movement
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 00:00:00"),
at_date=DateTime("2016/01/01 06:00:00"),
interpolation_method='one_for_all')
# time frame start at exact same time as movement
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 01:00:00"),
at_date=DateTime("2016/01/01 06:00:00"),
interpolation_method='one_for_all')
# Time frame is contained inside the movement
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 02:00:00"),
at_date=DateTime("2016/01/01 07:00:00"),
interpolation_method='one_for_all')
# Time frame finishes after movement end
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 06:00:00"),
at_date=DateTime("2016/01/01 12:00:00"),
interpolation_method='one_for_all')
# Time frame finishes at exact same time that movement end
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 06:00:00"),
at_date=DateTime("2016/01/01 11:00:00"),
interpolation_method='one_for_all')
def test_interpolation_method_XXX_all_or_nothing_to_date(self):
self._makeMovement(
quantity=10,
start_date=DateTime("2016/01/01 01:00:00"),
stop_date=DateTime("2016/01/01 11:00:00"),
)
# With a time frame that does not contain the movement, we have 0%
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/02/01 00:00:00"),
to_date=DateTime("2016/02/02 00:00:00"),
interpolation_method='all_or_nothing')
# With a time frame that contains the full movement, we have 100% of the quantity
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 00:00:00"),
to_date=DateTime("2016/01/02 00:00:00"),
interpolation_method='all_or_nothing')
# corner case: exact same time, we have 0%, because to_date will discard the movement
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 01:00:00"),
to_date=DateTime("2016/01/01 11:00:00"),
interpolation_method='all_or_nothing')
# With a time frame containing the 50% of the movement, we have 0% of the quantity
# this is "all or nothing" XXX naming
# time frame start before movement
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 00:00:00"),
to_date=DateTime("2016/01/01 06:00:00"),
interpolation_method='all_or_nothing')
# time frame start at exact same time as movement
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 01:00:00"),
to_date=DateTime("2016/01/01 06:00:00"),
interpolation_method='all_or_nothing')
# Time frame is contained inside the movement
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 02:00:00"),
to_date=DateTime("2016/01/01 07:00:00"),
interpolation_method='all_or_nothing')
# Time frame finishes after movement end
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 06:00:00"),
to_date=DateTime("2016/01/01 12:00:00"),
interpolation_method='all_or_nothing')
# Time frame finishes at exact same time that movement end
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 06:00:00"),
to_date=DateTime("2016/01/01 11:00:00"),
interpolation_method='all_or_nothing')
def test_interpolation_method_XXX_all_or_nothing_at_date(self):
self._makeMovement(
quantity=10,
start_date=DateTime("2016/01/01 01:00:00"),
stop_date=DateTime("2016/01/01 11:00:00"),
)
# With a time frame that does not contain the movement, we have 0%
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/02/01 00:00:00"),
at_date=DateTime("2016/02/02 00:00:00"),
interpolation_method='all_or_nothing')
# With a time frame that contains the full movement, we have 100% of the quantity
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 00:00:00"),
at_date=DateTime("2016/01/02 00:00:00"),
interpolation_method='all_or_nothing')
# corner case: exact same time, we also have 100%, because at_date include the movement
self.assertInventoryEquals(
10,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 01:00:00"),
at_date=DateTime("2016/01/01 11:00:00"),
interpolation_method='all_or_nothing')
# With a time frame containing the 50% of the movement, we have 0% of the quantity
# this is "all or nothing" XXX naming
# time frame start before movement
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 00:00:00"),
at_date=DateTime("2016/01/01 06:00:00"),
interpolation_method='all_or_nothing')
# time frame start at exact same time as movement
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 01:00:00"),
at_date=DateTime("2016/01/01 06:00:00"),
interpolation_method='all_or_nothing')
# Time frame is contained inside the movement
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 02:00:00"),
at_date=DateTime("2016/01/01 07:00:00"),
interpolation_method='all_or_nothing')
# Time frame finishes after movement end
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 06:00:00"),
at_date=DateTime("2016/01/01 12:00:00"),
interpolation_method='all_or_nothing')
# Time frame finishes at exact same time that movement end
self.assertInventoryEquals(
0,
node_uid=self.node.getUid(),
from_date=DateTime("2016/01/01 06:00:00"),
at_date=DateTime("2016/01/01 11:00:00"),
interpolation_method='all_or_nothing')
def test_interpolation_method_invalid_input(self):
# with interpolation method at least one of from_date, at_date or to_date is required.
with self.assertRaises(ValueError):
self.portal.portal_simulation.getInventoryList(
interpolation_method='linear')
with self.assertRaises(ValueError):
self.portal.portal_simulation.getInventoryList(
interpolation_method='all_or_nothing') # XXX
with self.assertRaises(ValueError):
self.portal.portal_simulation.getInventoryList(
interpolation_method='one_for_all') # XXX
class TestInventoryList(InventoryAPITestCase):
"""Tests getInventoryList methods.
"""
......@@ -1009,8 +1466,8 @@ class TestInventoryList(InventoryAPITestCase):
getInventoryList = self.getSimulationTool().getInventoryList
self.section.setGroup('level1')
self.other_section.setGroup('level1')
m1 = self._makeMovement(quantity=2)
m2 = self._makeMovement(destination_section_value=self.other_section, quantity=3)
self._makeMovement(quantity=2)
self._makeMovement(destination_section_value=self.other_section, quantity=3)
inventory_list = getInventoryList(node_uid=self.node.getUid(),
section_category='group/level1',
......@@ -1026,8 +1483,8 @@ class TestInventoryList(InventoryAPITestCase):
getInventoryList = self.getSimulationTool().getInventoryList
self.section.setGroup('level1')
self.other_section.setGroup('level1')
m1 = self._makeMovement(quantity=2)
m2 = self._makeMovement(destination_section_value=self.other_section, quantity=3)
self._makeMovement(quantity=2)
self._makeMovement(destination_section_value=self.other_section, quantity=3)
inventory_list = getInventoryList(node_uid=self.node.getUid(),
section_category='group/level1',
......@@ -1265,6 +1722,216 @@ class TestInventoryList(InventoryAPITestCase):
self.assertEqual([r.inventory for r in inventory_list
if r.strict_use_uid == use.use1.use12.getUid()], [11])
def test_group_by_time_sequence(self):
getInventoryList = self.getSimulationTool().getInventoryList
# Create 3 groups of movements:
self._makeMovement(quantity=1, start_date=DateTime('2016/01/01'))
self._makeMovement(quantity=3, start_date=DateTime('2016/02/01'))
self._makeMovement(quantity=5, start_date=DateTime('2016/02/02'))
self._makeMovement(quantity=7, start_date=DateTime('2016/03/01'))
# Create "noise" movement that we should not select
self._makeMovement(
quantity=10,
start_date=DateTime('2016/02/01'),
destination_value=self.portal.organisation_module.newContent())
inventory_list = getInventoryList(
node_uid=self.node.getUid(),
group_by_time_sequence_list=(
{'at_date': DateTime('2016/01/01').latestTime()},
{'from_date': DateTime('2016/02/01'), 'to_date': DateTime('2016/03/01')},
{'from_date': DateTime('2016/03/01')},
)
)
self.assertEqual(3, len(inventory_list))
self.assertEqual(1, inventory_list[0].total_quantity)
self.assertEqual(0, inventory_list[0].slot_index)
self.assertEqual(3 + 5, inventory_list[1].total_quantity)
self.assertEqual(1, inventory_list[1].slot_index)
self.assertEqual(7, inventory_list[2].total_quantity)
self.assertEqual(2, inventory_list[2].slot_index)
# now using all combinasion of from_date, at_date & to_date
inventory_list = getInventoryList(
node_uid=self.node.getUid(),
group_by_time_sequence_list=(
{'at_date': DateTime('2016/01/01').latestTime()},
{'to_date': DateTime('2016/01/02')}, # equivalent to above
{'from_date': DateTime('2016/02/01'), 'at_date': DateTime('2016/02/29').latestTime()},
{'from_date': DateTime('2016/02/01'), 'to_date': DateTime('2016/03/01')},
{'from_date': DateTime('2016/03/01')},
)
)
self.assertEqual([1, 1, 3+5, 3+5, 7], [x.inventory for x in inventory_list])
def test_group_by_time_sequence_empty_slots_are_returned(self):
getInventoryList = self.getSimulationTool().getInventoryList
self._makeMovement(title="M1", quantity=3, start_date=DateTime('2016/01/01'))
self._makeMovement(title="M2", quantity=5, start_date=DateTime('2016/02/01'))
inventory_list = getInventoryList(
node_uid=self.node.getUid(),
group_by_time_sequence_list=(
# before M1 -> empty
{'at_date': DateTime('2001/01/01').latestTime()},
{'to_date': DateTime('2001/01/01')},
{'from_date': DateTime('1999/01/01'), 'to_date': DateTime('2001/01/01')},
{'from_date': DateTime('1999/01/01'), 'at_date': DateTime('2001/01/01')},
# selecting M1
{'from_date': DateTime('2016/01/01'), 'to_date': DateTime('2016/01/02')},
# between M1 & M2 -> empty
{'from_date': DateTime('2016/01/02'), 'at_date': DateTime('2001/01/03')},
{'from_date': DateTime('2016/01/02'), 'to_date': DateTime('2001/01/03')},
# selecting M2
{'from_date': DateTime('2016/02/01'), 'to_date': DateTime('2016/02/03')},
# after M2 -> empty
{'from_date': DateTime('2016/02/03'), 'to_date': DateTime('2016/02/04')},
{'from_date': DateTime('2016/02/03'), 'at_date': DateTime('2001/02/04')},
{'from_date': DateTime('2016/02/03')},
)
)
self.assertEqual(
[
0, 0, 0, 0,
3,
0, 0,
5,
0, 0, 0
], [x.inventory for x in inventory_list])
def test_group_by_time_sequence_invalid_inputs(self):
getInventoryList = self.getSimulationTool().getInventoryList
self._makeMovement(title="M1", quantity=3, start_date=DateTime('2016/01/02'))
# no from_date, at_date or to_date on a slot raise a ValueError
with self.assertRaises(ValueError):
getInventoryList(
node_uid=self.node.getUid(),
group_by_time_sequence_list=(
{},
)
)
# slots where start_date > stop_date are valid, but select nothing
self.assertEqual(
[0],
[x.inventory for x in
getInventoryList(
node_uid=self.node.getUid(),
group_by_time_sequence_list=(
{ 'from_date': DateTime('2016/01/03'),
'at_date': DateTime('2016/01/01') }
)
)
]
)
self.assertEqual(
[0],
[x.inventory for x in
getInventoryList(
node_uid=self.node.getUid(),
group_by_time_sequence_list=(
{ 'from_date': DateTime('2016/01/03'),
'to_date': DateTime('2016/01/01') }
)
)
]
)
def test_group_by_time_sequence_with_interpolation_method(self):
getInventoryList = self.getSimulationTool().getInventoryList
self._makeMovement(
quantity=10,
title="M1",
start_date=DateTime('2016/01/01 01:00:00'),
stop_date=DateTime('2016/01/01 11:00:00'),
)
self._makeMovement(
quantity=5,
title="M2",
start_date=DateTime('2016/01/01 10:00:00'),
stop_date=DateTime('2016/01/01 15:00:00'),
)
self._makeMovement(
title="M3",
quantity=10,
start_date=DateTime("2016/01/01 05:00:00"),
stop_date=DateTime("2016/01/01 15:00:00"),
)
self._makeMovement(
title="M4",
quantity=5,
start_date=DateTime("2016/01/01 18:00:00"),
stop_date=DateTime("2016/01/01 23:00:00"),
)
self._makeMovement(
title="M5",
destination_value=self.portal.organisation_module.newContent(),
quantity=10,
start_date=DateTime("2016/01/01 08:00:00"),
stop_date=DateTime("2016/01/01 17:00:00"),
)
# We have created these movements:
# 00:00 10:00 20:00
# | | |
# M1 XXXXXXXXXX
# M2 XXXXX
# M3 XXXXXXXXXX
# M4 XXXXX
# M5 YYYYYYYYYY (will not be counted because on another node)
# | | |
# We will query with this time sequence:
# 00:00 10:00 20:00
# | | | expected quantity:
# ... ] 1 ( M1 )
# [ ] 3 ( M1 )
# [] 5 ( M1 + M2 + M3)
# [ ... 2 ( M4 )
# M1 XXXXXXXXXX
# M2 XXXXX
# M3 XXXXXXXXXX
# M4 XXXXX
# M5 YYYYYYYYYY
# | | |
inventory_list = getInventoryList(
node_uid=self.node.getUid(),
interpolation_method='linear',
group_by_time_sequence_list=(
{'at_date': DateTime('2016/01/01 02:00:00')},
{'from_date': DateTime('2016/01/01 01:00:00'), 'to_date': DateTime('2016/01/01 04:00:00')},
{'from_date': DateTime('2016/01/01 09:00:00'), 'to_date': DateTime('2016/01/01 11:00:00')},
{'from_date': DateTime('2016/01/01 21:00:00'), },
)
)
self.assertEqual(
[1, 3, 5, 2],
[x.inventory for x in inventory_list])
def test_OmitInputOmitOutput(self):
getInventoryList = self.getSimulationTool().getInventoryList
self._makeMovement(quantity=1, price=1)
......@@ -1443,6 +2110,98 @@ class TestInventoryList(InventoryAPITestCase):
checkInventory(line=3, type='Future', source=1, quantity=-9)
checkInventory(line=3, type='Future', destination=1, quantity=9)
def test_interpolation_method_linear(self):
self._makeMovement(
title="M1",
quantity=10,
price=2,
destination_value=self.node,
start_date=DateTime("2016/01/01 01:00:00"),
stop_date=DateTime("2016/01/01 11:00:00"),
)
self._makeMovement(
title="M2",
quantity=5,
price=3,
destination_value=self.node,
start_date=DateTime("2016/01/01 10:00:00"),
stop_date=DateTime("2016/01/01 15:00:00"),
)
self._makeMovement(
title="M3",
quantity=10,
price=5,
destination_value=self.other_node,
start_date=DateTime("2016/01/01 05:00:00"),
stop_date=DateTime("2016/01/01 15:00:00"),
)
self._makeMovement(
title="M4",
quantity=5,
price=7,
destination_value=self.other_node,
start_date=DateTime("2016/01/01 18:00:00"),
stop_date=DateTime("2016/01/01 23:00:00"),
)
# We have created these movements:
# 00:00 10:00 20:00
# | | |
# M1 XXXXXXXXXX
# M2 XXXXX
# M3 XXXXXXXXXX
# M4 XXXXX
# | | |
inventory_list = self.getSimulationTool().getInventoryList(
group_by_node=True,
node_uid=(self.node.getUid(), self.other_node.getUid()),
from_date=DateTime("2016/01/01 08:00:00"),
at_date=DateTime("2016/01/01 12:00:00"),
interpolation_method="linear",
)
# We only query for the part between 8:00 to 12:00, so we select:
# 00:00 10:00 20:00
# | [ | [ |
# M1 XXXXXXXXXX
# ^^^ 3
# M2 XXXXX
# ^^ 2 -> total for `node`: 5
# M3 XXXXXXXXXX
# ^^^^ 4 -> total for `other_node`: 4
# M4 XXXXX
# | [ | [ |
self.assertEqual(2, len(inventory_list))
node_inventory, = [x for x in inventory_list if x.node_uid == self.node.getUid()]
self.assertEqual(3+2, node_inventory.inventory)
self.assertEqual(3*2 + 2*3, node_inventory.total_price)
other_node_inventory, = [x for x in inventory_list if x.node_uid == self.other_node.getUid()]
self.assertEqual(4, other_node_inventory.inventory)
self.assertEqual(4*5, other_node_inventory.total_price)
# this is also true if we use a precision
inventory_list = self.getSimulationTool().getInventoryList(
group_by_node=True,
precision=2,
node_uid=(self.node.getUid(), self.other_node.getUid()),
from_date=DateTime("2016/01/01 08:00:00"),
at_date=DateTime("2016/01/01 12:00:00"),
interpolation_method="linear",
)
self.assertEqual(2, len(inventory_list))
node_inventory, = [x for x in inventory_list if x.node_uid == self.node.getUid()]
self.assertEqual(3+2, node_inventory.inventory)
self.assertEqual(3*2 + 2*3, node_inventory.total_price)
other_node_inventory, = [x for x in inventory_list if x.node_uid == self.other_node.getUid()]
self.assertEqual(4, other_node_inventory.inventory)
self.assertEqual(4*5, other_node_inventory.total_price)
class TestInventoryAssetPriceValuationMethod(InventoryAPITestCase):
def test_inventory_asset_price(self):
# examples from http://accountinginfo.com/study/inventory/inventory-120.htm
movement_list = [
......@@ -1710,7 +2469,7 @@ class TestMovementHistoryList(InventoryAPITestCase):
getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
start_date = DateTime(2001, 1, 1)
stop_date = DateTime(2002, 2, 2)
mvt = self._makeMovement(quantity=100,
self._makeMovement(quantity=100,
start_date=start_date,
stop_date=stop_date)
# start_date is for source
......@@ -1740,9 +2499,9 @@ class TestMovementHistoryList(InventoryAPITestCase):
def testResource(self):
getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
mvt = self._makeMovement(quantity=100)
self._makeMovement(quantity=100)
another_resource = self._makeResource()
another_mvt = self._makeMovement(quantity=3,
self._makeMovement(quantity=3,
resource_value=another_resource)
# we can query resource directly by uid
mvt_history_list = getMovementHistoryList(
......@@ -1764,7 +2523,7 @@ class TestMovementHistoryList(InventoryAPITestCase):
def testSectionCategory(self):
getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
self.section.setGroup('level1/level2')
mvt = self._makeMovement(quantity=100)
self._makeMovement(quantity=100)
# for section category, both exact category or any parent category works
# section_category can also be a list.
......@@ -1788,7 +2547,7 @@ class TestMovementHistoryList(InventoryAPITestCase):
# it is currently invalid to pass the same category twice to inventory API
getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
self.section.setGroup('level1/level2')
mvt = self._makeMovement(quantity=100)
self._makeMovement(quantity=100)
movement_history_list = getMovementHistoryList(
section_category=['group/level1',
'group/level1/level2'])
......@@ -1800,7 +2559,7 @@ class TestMovementHistoryList(InventoryAPITestCase):
getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
self.section.setGroup('level1/level2')
self.node.setGroup('level1')
mvt = self._makeMovement(quantity=100)
self._makeMovement(quantity=100)
valid_category_list = [ 'group/level1',
['group/level1', 'group/anotherlevel'],
......@@ -2191,7 +2950,7 @@ class TestMovementHistoryList(InventoryAPITestCase):
def testSameNodeDifferentDates(self):
getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
date = DateTime()
mvt = self._makeMovement( quantity=2,
self._makeMovement( quantity=2,
start_date=date,
stop_date=date+1,
source_value=self.node,
......@@ -2204,7 +2963,7 @@ class TestMovementHistoryList(InventoryAPITestCase):
def testSameNodeSameDates(self):
getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
mvt = self._makeMovement( quantity=2,
self._makeMovement( quantity=2,
start_date=DateTime(),
source_value=self.node,
destination_value=self.node )
......@@ -2215,7 +2974,7 @@ class TestMovementHistoryList(InventoryAPITestCase):
def testSameNodeSameDatesSameSections(self):
getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
mvt = self._makeMovement( quantity=2,
self._makeMovement( quantity=2,
start_date=DateTime(),
source_value=self.node,
destination_value=self.node,
......@@ -2294,8 +3053,8 @@ class TestMovementHistoryList(InventoryAPITestCase):
def test_OmitAssetIncreaseDecrease(self):
getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
m1 = self._makeMovement(quantity=1, price=-1)
m2 = self._makeMovement(quantity=-1, price=-1)
self._makeMovement(quantity=1, price=-1)
self._makeMovement(quantity=-1, price=-1)
mvt_history_list = getMovementHistoryList(node_uid=self.node.getUid(),
omit_asset_increase=1)
self.assertEqual(1, len(mvt_history_list))
......@@ -2432,7 +3191,7 @@ class TestMovementHistoryList(InventoryAPITestCase):
m2 = delivery.newContent(portal_type='Dummy Movement', quantity=1,
price=2, resource_value=self.resource,
start_date=DateTime(2010, 1, 1))
m3 = delivery.newContent(portal_type='Dummy Movement', quantity=1,
delivery.newContent(portal_type='Dummy Movement', quantity=1,
price=7, resource_value=self.other_resource,
start_date=DateTime(2010, 1, 2))
self.commit();
......@@ -2530,208 +3289,6 @@ class TestInventoryStat(InventoryAPITestCase):
makeMovement(quantity=5)
self.assertEqual(getInventoryStat(node_uid=node_uid)[0].stock_uid, 3)
class TestTrackingList(InventoryAPITestCase):
"""Tests Item Tracking
"""
def testNodeUid(self):
getTrackingList = self.getSimulationTool().getTrackingList
start_date = DateTime()
def makeMovement(aggregate=None):
self._makeMovement(quantity=1, price=1,
aggregate_value=aggregate,
resource_value=self.resource,
start_date = start_date,
source_value=self.other_node,
destination_value=self.node)
item_uid = self.item.getUid()
other_item_uid = self.other_item.getUid()
node_uid = self.node.getUid()
self.assertEqual(len(getTrackingList(node_uid=node_uid,
at_date=start_date)),0)
makeMovement(aggregate=self.item)
result = getTrackingList(node_uid=node_uid,at_date=start_date)
self.assertEqual(len(result),1)
self.failIfDifferentSet([x.uid for x in result], [item_uid])
makeMovement(aggregate=self.other_item)
result = getTrackingList(node_uid=node_uid,at_date=start_date)
self.assertEqual(len(result),2)
self.failIfDifferentSet([x.uid for x in result], [item_uid, other_item_uid])
def testSeveralAggregateOnMovement(self):
getTrackingList = self.getSimulationTool().getTrackingList
start_date = DateTime()
def makeMovement(aggregate_list=None):
self._makeMovement(quantity=1, price=1,
aggregate_list=aggregate_list,
resource_value=self.resource,
start_date = start_date,
source_value=self.other_node,
destination_value=self.node)
item_uid = self.item.getUid()
other_item_uid = self.other_item.getUid()
node_uid = self.node.getUid()
self.assertEqual(len(getTrackingList(node_uid=node_uid,
at_date=start_date)),0)
makeMovement(aggregate_list=[self.item.getRelativeUrl(),
self.other_item.getRelativeUrl()])
result = getTrackingList(node_uid=node_uid,at_date=start_date)
self.assertEqual(len(result),2)
self.failIfDifferentSet([x.uid for x in result], [item_uid, other_item_uid])
def testDates(self):
"""
Test different dates parameters of getTrackingList.
"""
getTrackingList = self.getSimulationTool().getTrackingList
now = DateTime()
node_1 = self._makeOrganisation(title='Node 1')
node_2 = self._makeOrganisation(title='Node 2')
date_0 = now - 4 # Before first movement
date_1 = now - 3 # First movement
date_2 = now - 2 # Between both movements
date_3 = now - 1 # Second movement
date_4 = now # After last movement
self._makeMovement(quantity=1, price=1,
aggregate_value=self.item,
resource_value=self.resource,
start_date=date_1,
source_value=None,
destination_value=node_1)
self._makeMovement(quantity=1, price=1,
aggregate_value=self.item,
resource_value=self.resource,
start_date=date_3,
source_value=node_1,
destination_value=node_2)
node_1_uid = node_1.getUid()
node_2_uid = node_2.getUid()
date_location_dict = {
date_0: {'at_date': None, 'to_date': None},
date_1: {'at_date': node_1_uid, 'to_date': None},
date_2: {'at_date': node_1_uid, 'to_date': node_1_uid},
date_3: {'at_date': node_2_uid, 'to_date': node_1_uid},
date_4: {'at_date': node_2_uid, 'to_date': node_2_uid}
}
node_uid_to_node_number = {
node_1_uid: 1,
node_2_uid: 2
}
for date, location_dict in date_location_dict.iteritems():
for param_id, location_uid in location_dict.iteritems():
param_dict = {param_id: date}
uid_list = [x.node_uid for x in getTrackingList(
aggregate_uid=self.item.getUid(), **param_dict)]
if location_uid is None:
self.assertEqual(len(uid_list), 0)
else:
self.assertEqual(len(uid_list), 1)
self.assertEqual(uid_list[0], location_uid,
'%s=now - %i, aggregate should be at node %i but is at node %i' % \
(param_id, now - date, node_uid_to_node_number[location_uid], node_uid_to_node_number[uid_list[0]]))
def testFutureTrackingList(self):
movement = self._makeMovement(quantity=1, aggregate_value=self.item,)
getFutureTrackingList = self.portal.portal_simulation.getFutureTrackingList
node_uid = self.node.getUid()
for state in ('planned', 'ordered', 'confirmed', 'ready', 'started',
'stopped', 'delivered'):
movement.simulation_state = state
movement.reindexObject()
self.tic()
tracking_node_uid_list = [brain.node_uid for brain in
getFutureTrackingList(item=self.item.getRelativeUrl())]
self.assertEqual([node_uid], tracking_node_uid_list,
"%s != %s (state:%s)" % ([node_uid], tracking_node_uid_list, state))
for state in ('draft', 'cancelled', 'deleted'):
movement.simulation_state = state
movement.reindexObject()
self.tic()
tracking_node_uid_list = [brain.node_uid for brain in
getFutureTrackingList(item=self.item.getRelativeUrl())]
self.assertEqual([], tracking_node_uid_list,
"%s != %s (state:%s)" % ([], tracking_node_uid_list, state))
def _createScenarioToTestTrackingListMethod(self,
state_a="delivered", state_b="delivered", state_c="delivered"):
"""
Scenario:
Item 1 => A -> B -> C
Item 2 => A -> B
"""
now = DateTime()
item_a = self.getItemModule().newContent(title="Item 1")
item_b = self.getItemModule().newContent(title="Item 2")
node_a = self._makeOrganisation(title='Node A')
node_b = self._makeOrganisation(title='Node B')
node_c = self._makeOrganisation(title='Node C')
movement_a = self._makeMovement(source_value=node_a,
destination_value=node_b, resource=self.resource,
quantity=1, aggregate_value=item_a, start_date=now,
simulation_state=state_a)
movement_b = self._makeMovement(source_value=node_b,
destination_value=node_c, resource=self.resource,
quantity=1, aggregate_value=item_a, start_date=now+1,
simulation_state=state_b)
movement_c = self._makeMovement(source_value=node_a,
destination_value=node_b, resource=self.resource,
quantity=1, aggregate_value=item_b, start_date=now+1,
simulation_state=state_c)
self.tic()
return {"item_a": item_a, "item_b": item_b,
"node_a": node_a, "node_b": node_b,
"movement_a": movement_a, "movement_b": movement_b,
"movement_c": movement_c}
def testTrackingListWithOutputParameter(self):
"""
Add test to check if getTrackingList with output=1, returns only Item 2
if you search for items in B
"""
data_dict = self._createScenarioToTestTrackingListMethod()
item_a = data_dict['item_a']
item_b = data_dict['item_b']
node_b = data_dict['node_b']
movement_a = data_dict['movement_a']
movement_b = data_dict['movement_b']
movement_c = data_dict['movement_c']
getTrackingList = self.portal.portal_simulation.getTrackingList
path_list = [i.path for i in getTrackingList(
node_uid=node_b.getUid())]
self.assertTrue(item_a.getPath() in path_list,
"Is expected %s in B" % item_a.getPath())
self.assertTrue(item_b.getPath() in path_list,
"Is expected %s in B" % item_b.getPath())
path_list = [i.path for i in getTrackingList(
node_uid=node_b.getUid(), output=1)]
self.assertTrue(item_a.getPath() not in path_list,
"%s should not be in B" % item_a.getTitle())
self.assertTrue(item_b.getPath() in path_list,
"%s should be in B" % item_b.getTitle())
def testCurrentTrackingListWithOutputParameter(self):
"""
Add test to check if getCurrentTrackingList with B -> C started and not delivered,
returns only Item 2 if you search for items in B
"""
data_dict = self._createScenarioToTestTrackingListMethod(state_b="started")
item_a = data_dict['item_a']
item_b = data_dict['item_b']
node_b = data_dict['node_b']
getCurrentTrackingList = self.portal.portal_simulation.getCurrentTrackingList
path_list = [i.path for i in getCurrentTrackingList(
node_uid=node_b.getUid(), output=1)]
self.assertTrue(item_a.getPath() not in path_list,
"%s should not be in B" % item_a.getTitle())
self.assertTrue(item_b.getPath() in path_list,
"%s should be in B" % item_a.getTitle())
# TODO: missing tests for input=1
class TestInventoryCacheTable(InventoryAPITestCase):
""" Test impact of creating cache entries into inventory_cache table
......@@ -3376,7 +3933,7 @@ class TestInventoryCacheTable(InventoryAPITestCase):
# Create an old movement
INVENTORY_QUANTITY_4 = 100
INVENTORY_DATE_4 = self.NOW - 3 * self.CACHE_LAG
movement = self._makeMovement(quantity=INVENTORY_QUANTITY_4,
self._makeMovement(quantity=INVENTORY_QUANTITY_4,
start_date=INVENTORY_DATE_4,
simulation_state='delivered')
# Get inventory in past so that cache is filled
......@@ -3442,7 +3999,7 @@ class TestInventoryCacheTable(InventoryAPITestCase):
# Create a new movement, indexation should not fail
INVENTORY_QUANTITY_4 = 5000
INVENTORY_DATE_4 = self.CACHE_DATE
movement = self._makeMovement(
self._makeMovement(
quantity=INVENTORY_QUANTITY_4,
start_date=INVENTORY_DATE_4,
simulation_state='delivered',
......@@ -4004,14 +4561,12 @@ def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestInventory))
suite.addTest(unittest.makeSuite(TestInventoryList))
suite.addTest(unittest.makeSuite(TestInventoryAssetPriceValuationMethod))
suite.addTest(unittest.makeSuite(TestMovementHistoryList))
suite.addTest(unittest.makeSuite(TestInventoryStat))
suite.addTest(unittest.makeSuite(TestNextNegativeInventoryDate))
suite.addTest(unittest.makeSuite(TestTrackingList))
suite.addTest(unittest.makeSuite(TestInventoryCacheTable))
suite.addTest(unittest.makeSuite(TestUnitConversion))
suite.addTest(unittest.makeSuite(TestUnitConversionDefinition))
suite.addTest(unittest.makeSuite(TestUnitConversionBackwardCompatibility))
return suite
##############################################################################
#
# Copyright (c) 2004 Nexedi SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
"""Unit Tests for Tracking API.
"""
import os
import unittest
from DateTime import DateTime
from Products.ERP5.tests.testInventoryAPI import InventoryAPITestCase
class TestTrackingList(InventoryAPITestCase):
"""Tests Item Tracking
"""
def testNodeUid(self):
getTrackingList = self.getSimulationTool().getTrackingList
start_date = DateTime()
def makeMovement(aggregate=None):
self._makeMovement(quantity=1, price=1,
aggregate_value=aggregate,
resource_value=self.resource,
start_date = start_date,
source_value=self.other_node,
destination_value=self.node)
item_uid = self.item.getUid()
other_item_uid = self.other_item.getUid()
node_uid = self.node.getUid()
self.assertEqual(len(getTrackingList(node_uid=node_uid,
at_date=start_date)),0)
makeMovement(aggregate=self.item)
result = getTrackingList(node_uid=node_uid,at_date=start_date)
self.assertEqual(len(result),1)
self.failIfDifferentSet([x.uid for x in result], [item_uid])
makeMovement(aggregate=self.other_item)
result = getTrackingList(node_uid=node_uid,at_date=start_date)
self.assertEqual(len(result),2)
self.failIfDifferentSet([x.uid for x in result], [item_uid, other_item_uid])
def testSeveralAggregateOnMovement(self):
getTrackingList = self.getSimulationTool().getTrackingList
start_date = DateTime()
def makeMovement(aggregate_list=None):
self._makeMovement(quantity=1, price=1,
aggregate_list=aggregate_list,
resource_value=self.resource,
start_date = start_date,
source_value=self.other_node,
destination_value=self.node)
item_uid = self.item.getUid()
other_item_uid = self.other_item.getUid()
node_uid = self.node.getUid()
self.assertEqual(len(getTrackingList(node_uid=node_uid,
at_date=start_date)),0)
makeMovement(aggregate_list=[self.item.getRelativeUrl(),
self.other_item.getRelativeUrl()])
result = getTrackingList(node_uid=node_uid,at_date=start_date)
self.assertEqual(len(result),2)
self.failIfDifferentSet([x.uid for x in result], [item_uid, other_item_uid])
def testDates(self):
"""
Test different dates parameters of getTrackingList.
"""
getTrackingList = self.getSimulationTool().getTrackingList
now = DateTime()
node_1 = self._makeOrganisation(title='Node 1')
node_2 = self._makeOrganisation(title='Node 2')
date_0 = now - 4 # Before first movement
date_1 = now - 3 # First movement
date_2 = now - 2 # Between both movements
date_3 = now - 1 # Second movement
date_4 = now # After last movement
self._makeMovement(quantity=1, price=1,
aggregate_value=self.item,
resource_value=self.resource,
start_date=date_1,
source_value=None,
destination_value=node_1)
self._makeMovement(quantity=1, price=1,
aggregate_value=self.item,
resource_value=self.resource,
start_date=date_3,
source_value=node_1,
destination_value=node_2)
node_1_uid = node_1.getUid()
node_2_uid = node_2.getUid()
date_location_dict = {
date_0: {'at_date': None, 'to_date': None},
date_1: {'at_date': node_1_uid, 'to_date': None},
date_2: {'at_date': node_1_uid, 'to_date': node_1_uid},
date_3: {'at_date': node_2_uid, 'to_date': node_1_uid},
date_4: {'at_date': node_2_uid, 'to_date': node_2_uid}
}
node_uid_to_node_number = {
node_1_uid: 1,
node_2_uid: 2
}
for date, location_dict in date_location_dict.iteritems():
for param_id, location_uid in location_dict.iteritems():
param_dict = {param_id: date}
uid_list = [x.node_uid for x in getTrackingList(
aggregate_uid=self.item.getUid(), **param_dict)]
if location_uid is None:
self.assertEqual(len(uid_list), 0)
else:
self.assertEqual(len(uid_list), 1)
self.assertEqual(uid_list[0], location_uid,
'%s=now - %i, aggregate should be at node %i but is at node %i' % \
(param_id, now - date, node_uid_to_node_number[location_uid], node_uid_to_node_number[uid_list[0]]))
def testFutureTrackingList(self):
movement = self._makeMovement(quantity=1, aggregate_value=self.item,)
getFutureTrackingList = self.portal.portal_simulation.getFutureTrackingList
node_uid = self.node.getUid()
for state in ('planned', 'ordered', 'confirmed', 'ready', 'started',
'stopped', 'delivered'):
movement.simulation_state = state
movement.reindexObject()
self.tic()
tracking_node_uid_list = [brain.node_uid for brain in
getFutureTrackingList(item=self.item.getRelativeUrl())]
self.assertEqual([node_uid], tracking_node_uid_list,
"%s != %s (state:%s)" % ([node_uid], tracking_node_uid_list, state))
for state in ('draft', 'cancelled', 'deleted'):
movement.simulation_state = state
movement.reindexObject()
self.tic()
tracking_node_uid_list = [brain.node_uid for brain in
getFutureTrackingList(item=self.item.getRelativeUrl())]
self.assertEqual([], tracking_node_uid_list,
"%s != %s (state:%s)" % ([], tracking_node_uid_list, state))
def _createScenarioToTestTrackingListMethod(self,
state_a="delivered", state_b="delivered", state_c="delivered"):
"""
Scenario:
Item 1 => A -> B -> C
Item 2 => A -> B
"""
now = DateTime()
item_a = self.getItemModule().newContent(title="Item 1")
item_b = self.getItemModule().newContent(title="Item 2")
node_a = self._makeOrganisation(title='Node A')
node_b = self._makeOrganisation(title='Node B')
node_c = self._makeOrganisation(title='Node C')
movement_a = self._makeMovement(source_value=node_a,
destination_value=node_b, resource=self.resource,
quantity=1, aggregate_value=item_a, start_date=now,
simulation_state=state_a)
movement_b = self._makeMovement(source_value=node_b,
destination_value=node_c, resource=self.resource,
quantity=1, aggregate_value=item_a, start_date=now+1,
simulation_state=state_b)
movement_c = self._makeMovement(source_value=node_a,
destination_value=node_b, resource=self.resource,
quantity=1, aggregate_value=item_b, start_date=now+1,
simulation_state=state_c)
self.tic()
return {"item_a": item_a, "item_b": item_b,
"node_a": node_a, "node_b": node_b,
"movement_a": movement_a, "movement_b": movement_b,
"movement_c": movement_c}
def testTrackingListWithOutputParameter(self):
"""
Add test to check if getTrackingList with output=1, returns only Item 2
if you search for items in B
"""
data_dict = self._createScenarioToTestTrackingListMethod()
item_a = data_dict['item_a']
item_b = data_dict['item_b']
node_b = data_dict['node_b']
getTrackingList = self.portal.portal_simulation.getTrackingList
path_list = [i.path for i in getTrackingList(
node_uid=node_b.getUid())]
self.assertTrue(item_a.getPath() in path_list,
"Is expected %s in B" % item_a.getPath())
self.assertTrue(item_b.getPath() in path_list,
"Is expected %s in B" % item_b.getPath())
path_list = [i.path for i in getTrackingList(
node_uid=node_b.getUid(), output=1)]
self.assertTrue(item_a.getPath() not in path_list,
"%s should not be in B" % item_a.getTitle())
self.assertTrue(item_b.getPath() in path_list,
"%s should be in B" % item_b.getTitle())
def testCurrentTrackingListWithOutputParameter(self):
"""
Add test to check if getCurrentTrackingList with B -> C started and not delivered,
returns only Item 2 if you search for items in B
"""
data_dict = self._createScenarioToTestTrackingListMethod(state_b="started")
item_a = data_dict['item_a']
item_b = data_dict['item_b']
node_b = data_dict['node_b']
getCurrentTrackingList = self.portal.portal_simulation.getCurrentTrackingList
path_list = [i.path for i in getCurrentTrackingList(
node_uid=node_b.getUid(), output=1)]
self.assertTrue(item_a.getPath() not in path_list,
"%s should not be in B" % item_a.getTitle())
self.assertTrue(item_b.getPath() in path_list,
"%s should be in B" % item_a.getTitle())
# TODO: missing tests for input=1
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestTrackingList))
return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment