Commit 150915f9 authored by Klaus Wölfel's avatar Klaus Wölfel Committed by Levin Zimmermann

Data Analysis: Support transient in- and output

This commit is a rebase of the following original commits:

Analysis execution methods supporting transient in- and output

---

Conflicting files:
bt5/erp5_wendelin/SkinTemplateItem/portal_skins/erp5_wendelin/DataAnalysis_executeDataOperation.py
bt5/erp5_wendelin/SkinTemplateItem/portal_skins/erp5_wendelin/DataAnalysis_executeDataOperation.xml
bt5/erp5_wendelin/SkinTemplateItem/portal_skins/erp5_wendelin/ERP5Site_executeDataAnalysisList.py

processing of analysis with transient items

---

Conflicts:
bt5/erp5_wendelin/SkinTemplateItem/portal_skins/erp5_wendelin/DataAnalysis_executeDataOperation.py
bt5/erp5_wendelin/SkinTemplateItem/portal_skins/erp5_wendelin/ERP5Site_createDataAnalysisList.py
parent 1098afbf
portal = context.getPortalObject() portal = context.getPortalObject()
operation = None operation = None
use = None use_list = []
parameter_dict = {} parameter_dict = {}
transient_output_item = None
context.checkConsistency(fixit=True) context.checkConsistency(fixit=True)
initial_product = context.getSpecialiseValue(portal_type="Data Transformation").getResourceValue() initial_product = context.getSpecialiseValue(portal_type="Data Transformation").getResourceValue()
for analysis_line in sorted(context.objectValues(portal_type="Data Analysis Line"), for analysis_line in sorted(context.objectValues(portal_type="Data Analysis Line"),
key=lambda x: x.getIntIndex()): key=lambda x: x.getIntIndex()):
resource = analysis_line.getResourceValue() resource = analysis_line.getResourceValue()
if resource == initial_product: if resource == initial_product:
use = analysis_line.getUse() use_list = analysis_line.getUseList()
if resource is not None: if resource is not None:
resource_portal_type = resource.getPortalType() resource_portal_type = resource.getPortalType()
else: else:
...@@ -19,13 +20,20 @@ for analysis_line in sorted(context.objectValues(portal_type="Data Analysis Line ...@@ -19,13 +20,20 @@ for analysis_line in sorted(context.objectValues(portal_type="Data Analysis Line
operation = analysis_line.getResourceValue() operation = analysis_line.getResourceValue()
else: else:
parameter = {} parameter = {}
for portal_type in ["Data Array", "Progress Indicator"] + \ for portal_type in ["Data Array", "Data Array View", "Progress Indicator"] + \
list(portal.getPortalDataSinkTypeList()) + \ list(portal.getPortalDataSinkTypeList()) + \
list(portal.getPortalDataDescriptorTypeList()): list(portal.getPortalDataDescriptorTypeList()):
value = analysis_line.getAggregateValue(portal_type=portal_type) value = analysis_line.getAggregateValue(portal_type=portal_type)
if value is not None: if value is not None:
parameter[portal_type] = value parameter[portal_type] = value
if analysis_line.getQuantity() < 0 and "big_data/analysis/transient" in analysis_line.getUseList():
# at the moment we only support transient data arrays
parameter['Data Array'] = transient_input_item
if analysis_line.getQuantity() > 0 and "big_data/analysis/transient" in analysis_line.getUseList():
# at the moment we only support transient data arrays
transient_output_item = portal.data_array_module.newContent(portal_type='Data Array',
temp_object=True)
parameter['Data Array'] = transient_output_item
for base_category in analysis_line.getVariationRangeBaseCategoryList(): for base_category in analysis_line.getVariationRangeBaseCategoryList():
parameter[base_category] = analysis_line.getVariationCategoryItemList( parameter[base_category] = analysis_line.getVariationCategoryItemList(
base_category_list=(base_category,))[0][0] base_category_list=(base_category,))[0][0]
...@@ -44,15 +52,20 @@ for analysis_line in sorted(context.objectValues(portal_type="Data Analysis Line ...@@ -44,15 +52,20 @@ for analysis_line in sorted(context.objectValues(portal_type="Data Analysis Line
else: else:
parameter_dict[reference] = parameter parameter_dict[reference] = parameter
script_id = operation.getScriptId() if transient_output_item is not None and not consuming_analysis_list:
return
script_id = operation.getScriptId()
out = getattr(operation_analysis_line, script_id)(**parameter_dict) out = getattr(operation_analysis_line, script_id)(**parameter_dict)
for consuming_analysis in consuming_analysis_list:
portal.restrictedTraverse(consuming_analysis).DataAnalysis_executeDataOperation(transient_input_item = transient_output_item)
if out == 1: if out == 1:
context.activate(serialization_tag=str(context.getUid())).DataAnalysis_executeDataOperation() context.activate(serialization_tag=str(context.getUid())).DataAnalysis_executeDataOperation(consuming_analysis_list)
else: else:
# only stop batch ingestions # only stop batch ingestions
if use == "big_data/ingestion/batch": if "big_data/ingestion/batch" in use_list:
context.stop() context.stop()
# stop refresh # stop refresh
if context.getRefreshState() == "refresh_started": if context.getRefreshState() == "refresh_started":
......
...@@ -50,7 +50,7 @@ ...@@ -50,7 +50,7 @@
</item> </item>
<item> <item>
<key> <string>_params</string> </key> <key> <string>_params</string> </key>
<value> <string></string> </value> <value> <string>consuming_analysis_list=[], transient_input_item=None</string> </value>
</item> </item>
<item> <item>
<key> <string>id</string> </key> <key> <string>id</string> </key>
......
...@@ -121,13 +121,13 @@ for movement in portal_catalog(query = query): ...@@ -121,13 +121,13 @@ for movement in portal_catalog(query = query):
resource_relative_url = resource.getRelativeUrl()) resource_relative_url = resource.getRelativeUrl())
for related_movement in related_movement_list: for related_movement in related_movement_list:
#aggregate_set.update(related_movement.getAggregateSet()) if "big_data/ingestion/batch" in related_movement.getUseList():
related_movement.getParentValue().deliver() related_movement.getParentValue().deliver()
# create new item based on item_type if it is not already aggregated # create new item based on item_type if it is not already aggregated
aggregate_type_set = set( aggregate_type_set = set(
[portal.restrictedTraverse(a).getPortalType() for a in aggregate_set]) [portal.restrictedTraverse(a).getPortalType() for a in aggregate_set])
for item_type in transformation_line.getAggregatedPortalTypeList(): for item_type in transformation_line.getAggregatedPortalTypeList():
# if item is not yet aggregated to this line, search it by related project # if item is not yet aggregated to this line, search it by related project
# and source If the item is a data configuration or a device configuration # and source If the item is a data configuration or a device configuration
...@@ -135,14 +135,18 @@ for movement in portal_catalog(query = query): ...@@ -135,14 +135,18 @@ for movement in portal_catalog(query = query):
# the variation nor the related sensor. Data Array Lines are created # the variation nor the related sensor. Data Array Lines are created
# by Data Operation. # by Data Operation.
if item_type not in aggregate_type_set: if all(
[
if item_type in portal.getPortalDeviceConfigurationTypeList() + portal.getPortalDataConfigurationTypeList(): # Do not create item if it is a Data Array Line, then it is created by data operation itself.
item_type not in aggregate_type_set,
if item_type == "Status Configuration": # Do not create item if it is a transient Data Array.
not (item_type == "Data Array" and "big_data/analysis/transient" in transformation_line.getUseList()),
]
):
item = None item = None
else: if item_type in portal.getPortalDeviceConfigurationTypeList() + portal.getPortalDataConfigurationTypeList():
if item_type != "Status Configuration":
item = portal.portal_catalog.getResultValue( item = portal.portal_catalog.getResultValue(
portal_type=item_type, portal_type=item_type,
#validation_state="validated", #validation_state="validated",
...@@ -150,7 +154,6 @@ for movement in portal_catalog(query = query): ...@@ -150,7 +154,6 @@ for movement in portal_catalog(query = query):
item_source_relative_url=delivery.getSource()) item_source_relative_url=delivery.getSource())
elif item_type != "Data Array Line": elif item_type != "Data Array Line":
item_query_dict = dict( item_query_dict = dict(
portal_type=item_type, portal_type=item_type,
validation_state="validated", validation_state="validated",
...@@ -159,8 +162,10 @@ for movement in portal_catalog(query = query): ...@@ -159,8 +162,10 @@ for movement in portal_catalog(query = query):
item_resource_uid=resource.getUid(), item_resource_uid=resource.getUid(),
item_source_relative_url=data_analysis.getSource()) item_source_relative_url=data_analysis.getSource())
if data_analysis.getDestinationProjectValue() is not None: if data_analysis.getDestinationProjectValue() is not None:
item_query_dict["item_project_relative_url"] = data_analysis.getDestinationProject() item_query_dict["item_project_relative_url"] = data_analysis.getDestinationProject()
item = portal.portal_catalog.getResultValue(**item_query_dict) item = portal.portal_catalog.getResultValue(**item_query_dict)
if item is None: if item is None:
...@@ -176,7 +181,9 @@ for movement in portal_catalog(query = query): ...@@ -176,7 +181,9 @@ for movement in portal_catalog(query = query):
pass pass
aggregate_set.add(item.getRelativeUrl()) aggregate_set.add(item.getRelativeUrl())
tag = "%s-%s" %(data_analysis.getUid(), transformation_line.getUid())
data_analysis_line = data_analysis.newContent( data_analysis_line = data_analysis.newContent(
activate_kw={'tag': tag},
portal_type = "Data Analysis Line", portal_type = "Data Analysis Line",
title = transformation_line.getTitle(), title = transformation_line.getTitle(),
reference = transformation_line.getReference(), reference = transformation_line.getReference(),
...@@ -185,7 +192,7 @@ for movement in portal_catalog(query = query): ...@@ -185,7 +192,7 @@ for movement in portal_catalog(query = query):
variation_category_list = transformation_line.getVariationCategoryList(), variation_category_list = transformation_line.getVariationCategoryList(),
quantity = quantity, quantity = quantity,
quantity_unit = transformation_line.getQuantityUnit(), quantity_unit = transformation_line.getQuantityUnit(),
use = transformation_line.getUse(), use_list = transformation_line.getUseList(),
aggregate_set = aggregate_set) aggregate_set = aggregate_set)
# for intput lines of first level analysis set causality and specialise # for intput lines of first level analysis set causality and specialise
if quantity < 0 and delivery.getPortalType() == "Data Ingestion": if quantity < 0 and delivery.getPortalType() == "Data Ingestion":
...@@ -193,7 +200,14 @@ for movement in portal_catalog(query = query): ...@@ -193,7 +200,14 @@ for movement in portal_catalog(query = query):
causality_value = delivery, causality_value = delivery,
specialise_value_list = data_supply_list) specialise_value_list = data_supply_list)
data_analysis.checkConsistency(fixit=True) # fix consistency of line and all affected items. Do it after reindexing
# activities of newly created Data Analysis Line finished, because check
# consistency script might need to find the newly created Data Analysis
# Line in catalog.
data_analysis_line.checkConsistency(fixit=True)
for item in data_analysis_line.getAggregateValueList():
item.activate(after_tag=tag).checkConsistency(fixit=True)
try: try:
data_analysis.start() data_analysis.start()
except UnsupportedWorkflowMethod: except UnsupportedWorkflowMethod:
......
portal = context.getPortalObject() portal = context.getPortalObject()
#search_kw = { consuming_analysis_list_dict = {}
# 'simulation_state': 'started',
# 'portal_type': 'Data Analysis',
#}
#method_kw = { def add_consuming_analysis(producing_analysis_relative_url, consuming_analysis_relative_url):
# 'active_process': this_portal_type_active_process, consuming_analysis_list = consuming_analysis_list_dict.setdefault(producing_analysis_relative_url, [])
#} consuming_analysis_list.append(consuming_analysis_relative_url)
#activate_kw = {
# 'tag': tag,
# 'priority': priority,
#}
#portal.portal_catalog.searchAndActivate(
# method_id='DataAnalysis_executeDataOperation',
# method_kw=method_kw,
# activate_kw=activate_kw,
# **search_kw)
# First we split all started Data Analysis documents into documents with transient
# inputs and without transient inputs. Documents without transient inputs
# are added to 'data_analysis_list'.
data_analysis_list = []
for data_analysis in portal.portal_catalog(portal_type = "Data Analysis", for data_analysis in portal.portal_catalog(portal_type = "Data Analysis",
simulation_state = "started"): simulation_state = "started"):
has_transient_input = False
for line in data_analysis.objectValues(portal_type="Data Analysis Line"):
if line.getUse() == "big_data/analysis/transient" and line.getQuantity() < 0:
has_transient_input = True
add_consuming_analysis(line.getParentValue().getCausality(), line.getParentRelativeUrl())
if not has_transient_input:
data_analysis_list.append(data_analysis)
# Now we will activate `executeDataOperation` on given Data Analysis documents
for data_analysis in data_analysis_list:
if not data_analysis.hasActivity(): if not data_analysis.hasActivity():
if data_analysis.getRefreshState() == "current": if data_analysis.getRefreshState() == "current":
consuming_analysis_list = consuming_analysis_list_dict.get(data_analysis.getRelativeUrl(), [])
data_analysis.activate(serialization_tag=str(data_analysis.getUid()))\ data_analysis.activate(serialization_tag=str(data_analysis.getUid()))\
.DataAnalysis_executeDataOperation() .DataAnalysis_executeDataOperation(consuming_analysis_list)
# Finally we refresh specified Data Analysis documents
for data_analysis in portal.portal_catalog(portal_type = "Data Analysis", for data_analysis in portal.portal_catalog(portal_type = "Data Analysis",
refresh_state = "refresh_planned"): refresh_state = "refresh_planned"):
if data_analysis.getRefreshState() == "refresh_planned": if data_analysis.getRefreshState() == "refresh_planned":
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment