Commit dbf34672 authored by Klaus Wölfel's avatar Klaus Wölfel

enable data analysis as causality for other data analysis and adding input and...

Enable a data analysis to act as causality for other data analyses, and pass each input and output parameter as a dict with additional information
parent c1b53f31
portal = context.getPortalObject()
operation = None
use = None
parameter_dict = {}
......@@ -14,21 +15,30 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
operation_analysis_line = analysis_line
operation = analysis_line.getResourceValue()
else:
reference = analysis_line.getReference()
aggregate = analysis_line.getAggregateDataSinkValue() or \
analysis_line.getAggregateDataArrayValue() or \
analysis_line.getAggregateDataDescriptorValue()
parameter_dict[reference] = aggregate
progress_indicator = analysis_line.getAggregateProgressIndicatorValue()
if progress_indicator is not None:
parameter_dict["%s_progress_indicator" %reference] = progress_indicator
parameter = {}
for portal_type in ["Data Array", "Progress Indicator"] + \
list(portal.getPortalDataSinkTypeList()) + \
list(portal.getPortalDataDescriptorTypeList()):
value = analysis_line.getAggregateValue(portal_type=portal_type)
if value is not None:
parameter[portal_type] = value
for base_category in analysis_line.getVariationRangeBaseCategoryList():
parameter_dict["%s_%s" %(reference, base_category)] = \
analysis_line.getVariationCategoryItemList(
base_category_list=(base_category,))[0][0]
parameter[base_category] = analysis_line.getVariationCategoryItemList(
base_category_list=(base_category,))[0][0]
reference = analysis_line.getReference()
# several lines with the same reference will turn the parameter into a list
if reference in parameter_dict:
if not isinstance(parameter_dict[reference], list):
parameter_dict[reference] = [parameter_dict[reference]]
parameter_dict[reference].append(parameter)
else:
parameter_dict[reference] = parameter
script_id = operation.getScriptId()
getattr(operation_analysis_line, script_id)(**parameter_dict)
out = getattr(operation_analysis_line, script_id)(**parameter_dict)
# only stop batch ingestions
if use == "big_data/ingestion/batch":
context.stop()
return out
......@@ -7,7 +7,7 @@ portal_catalog = portal.portal_catalog
now = DateTime()
query = AndQuery(
Query(portal_type = "Data Ingestion Line"),
Query(portal_type = ["Data Ingestion Line", "Data Analysis Line"]),
Query(**{"stock.quantity": "!=0"}),
Query(resource_portal_type = "Data Product"),
# Should be improved to support more than one analysis per ingestion
......@@ -20,6 +20,8 @@ query = AndQuery(
Query(use_relative_url = "use/big_data/ingestion/stream"))))
for movement in portal_catalog(query):
if movement.getQuantity() <= 0:
continue
if movement.DataIngestionLine_hasMissingRequiredItem():
raise ValueError("Transformation requires movement to have " +
"aggregated data ingestion batch")
......@@ -80,7 +82,7 @@ for movement in portal_catalog(query):
else:
# get related movements only from current data ingestion
related_movement_list = movement.getParentValue().searchFolder(
portal_type="Data Ingestion Line",
portal_type=["Data Ingestion Line", "Data Analysis Line"],
resource_relative_url = resource.getRelativeUrl())
for related_movement in related_movement_list:
aggregate_set.update(related_movement.getAggregateSet())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment