Commit 85d8329a authored by Ophélie Gagnard's avatar Ophélie Gagnard

slapos_metadata_transform_agent&test: multi data array

parent e5024fd6
......@@ -4,7 +4,6 @@ portal = context.getPortalObject()
portal.portal_catalog.searchAndActivate(
portal_type="Data Array",
publication_section_relative_url="publication_section/file_system_image/node_image",
simulation_state="converted",
method_id='DataArray_generateDiffND',
activate_kw={'tag': tag},
......
portal = context.getPortalObject()
operation = None
use = None
parameter_dict = {}
context.checkConsistency(fixit=True)
initial_product = context.getSpecialiseValue(portal_type="Data Transformation").getResourceValue()
for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
resource = analysis_line.getResourceValue()
if resource == initial_product:
use = analysis_line.getUse()
if resource is not None:
resource_portal_type = resource.getPortalType()
else:
resource_portal_type = ''
if resource_portal_type == 'Data Operation':
operation_analysis_line = analysis_line
operation = analysis_line.getResourceValue()
else:
parameter = {}
for portal_type in ["Data Array", "Progress Indicator"] + \
list(portal.getPortalDataSinkTypeList()) + \
list(portal.getPortalDataDescriptorTypeList()):
value = analysis_line.getAggregateValue(portal_type=portal_type)
if value is not None:
parameter[portal_type] = value
#data array
for value in analysis_line.getAggregateValueList(portal_type='Data Array'):
if value.getSimulationState() != 'processed':
parameter['Data Array'] = value
for base_category in analysis_line.getVariationRangeBaseCategoryList():
parameter[base_category] = analysis_line.getVariationCategoryItemList(
base_category_list=(base_category,))[0][0]
reference = analysis_line.getReference()
# several lines with same reference wil turn the parameter into a list
if reference in parameter_dict:
if not isinstance(parameter_dict[reference], list):
parameter_dict[reference] = [parameter_dict[reference]]
parameter_dict[reference].append(parameter)
else:
parameter_dict[reference] = parameter
script_id = operation.getScriptId()
out = getattr(operation_analysis_line, script_id)(**parameter_dict)
if out == 1:
context.activate(serialization_tag=str(context.getUid())).DataAnalysis_executeDataOperation()
else:
# only stop batch ingestions
if use == "big_data/ingestion/batch":
context.stop()
# stop refresh
if context.getRefreshState() == "refresh_started":
context.stopRefresh()
return out
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>DataAnalysis_executeDataOperation</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
import numpy as np
from Products.ZSQLCatalog.SQLCatalog import AndQuery, Query
# Do nothing for reference image, just change state
if 'file_system_image/reference_image' in context.getPublicationSectionList():
context.processFile()
return
for publication_section in context.getPublicationSectionList():
if 'distribution' in publication_section:
current_node_distribution = publication_section
......
from DateTime import DateTime
from Products.ZSQLCatalog.SQLCatalog import AndQuery, OrQuery, Query
from Products.ERP5Type.Errors import UnsupportedWorkflowMethod
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
now = DateTime()
if not include_delivered:
batch_simulation_state = "stopped"
stream_simulation_state = "started"
else:
batch_simulation_state = ["stopped", "delivered"]
stream_simulation_state = ["started", "stopped", "delivered"]
query = AndQuery(
Query(portal_type = ["Data Ingestion Line", "Data Analysis Line"]),
Query(**{"stock.quantity": "!=0"}),
Query(resource_portal_type = "Data Product"),
# Should be improved to support mor than one analysis per ingestion
#SimpleQuery(parent_causality_related_relative_url = None),
OrQuery(Query(simulation_state = batch_simulation_state,
use_relative_url = "use/big_data/ingestion/batch"),
Query(simulation_state = stream_simulation_state,
use_relative_url = "use/big_data/ingestion/stream")))
for movement in portal_catalog(query = query):
if movement.getQuantity() <= 0:
continue
if movement.DataIngestionLine_hasMissingRequiredItem():
raise ValueError("Transformation requires movement to have " +
"aggregated data ingestion batch")
delivery = movement.getParentValue()
data_supply = delivery.getSpecialiseValue(portal_type="Data Supply")
data_supply_list = delivery.getSpecialiseValueList(portal_type="Data Supply")
composed_data_supply = data_supply.asComposedDocument()
# Get applicable transformation
transformation_list = []
for transformation in composed_data_supply.getSpecialiseValueList(portal_type="Data Transformation"):
for line in transformation.objectValues():
if line.getResourceValue() == movement.getResourceValue() and line.getQuantity() < 0:
transformation_list.append(transformation)
break
for transformation in portal.portal_catalog(
portal_type = "Data Transformation",
validation_state = "validated",
resource_relative_url = movement.getResource()):
if transformation.getVariationCategoryList() == movement.getVariationCategoryList():
transformation_list.append(transformation)
for transformation in transformation_list:
is_shared_data_analysis = False
# Check if analysis already exists
data_analysis = portal_catalog.getResultValue(
portal_type="Data Analysis",
specialise_relative_url = transformation.getRelativeUrl(),
causality_relative_url = delivery.getRelativeUrl())
if data_analysis is not None:
# Find data array line
for data_analysis_line in data_analysis.objectValues(portal_type='Data Analysis Line'):
data_array_list = data_analysis_line.getAggregateValueList(portal_type='Data Array')
if data_array_list:
has_to_check_array = False
for data_array in data_array_list:
if data_array.getSimulationState() != 'processed':
has_to_check_array = True
break
# all data array is checked, create new one
if not has_to_check_array:
module = portal.getDefaultModule('Data Array')
data_array = module.newContent(portal_type = 'Data Array',
title = transformation.getTitle(),
reference = "%s-%s" %(transformation.getTitle(),delivery.getReference()))
data_analysis_line.setAggregateValueList(data_analysis_line.getAggregateValueList() + [data_array])
break
continue
# for first level analysis check if same kind of data analysis with same project and same source already exists
# If yes, then later add additional input lines to this shared data analysis
if delivery.getPortalType() == "Data Ingestion":
data_analysis = portal_catalog.getResultValue(
portal_type="Data Analysis",
specialise_relative_url = transformation.getRelativeUrl(),
source_relative_url = delivery.getSource(),
destination_project_relative_url = delivery.getDestinationProject())
if data_analysis is not None:
data_analysis.setDefaultCausalityValue(delivery)
data_analysis.setSpecialiseValueSet(data_analysis.getSpecialiseValueList() + data_supply_list)
is_shared_data_analysis = True
else:
# Create Analysis
data_analysis = portal.data_analysis_module.newContent(
portal_type = "Data Analysis",
title = transformation.getTitle(),
reference = delivery.getReference(),
start_date = delivery.getStartDate(),
stop_date = delivery.getStopDate(),
specialise_value_list = [transformation] + data_supply_list,
causality_value = delivery,
source = delivery.getSource(),
source_section = delivery.getSourceSection(),
source_project = delivery.getSourceProject(),
destination = delivery.getDestination(),
destination_section = delivery.getDestinationSection(),
destination_project = delivery.getDestinationProject())
data_analysis.checkConsistency(fixit=True)
# create input and output lines
for transformation_line in transformation.objectValues(
portal_type=["Data Transformation Resource Line",
"Data Transformation Operation Line"]):
resource = transformation_line.getResourceValue()
quantity = transformation_line.getQuantity()
if isinstance(quantity, tuple):
quantity = quantity[0]
# In case of shared data anylsis only add additional input lines
if is_shared_data_analysis and quantity > -1:
continue
aggregate_set = set()
# manually add device to every line
aggregate_set.add(movement.getAggregateDevice())
if transformation_line.getPortalType() == \
"Data Transformation Resource Line":
# at the moment, we only check for positive or negative quantity
if quantity < 0:
# it is an input line
# aggregate transformed item from data ingestion batch related to our
# movement. If it is an input resource line, then we search for an
# ingestion line with the same resource. If it is an operation line
# then we search for an ingestion line with resource portal type
# Data Product
batch_relative_url = movement.getAggregateDataIngestionBatch()
if batch_relative_url is not None:
related_movement_list = portal_catalog(
portal_type="Data Ingestion Line",
aggregate_relative_url=batch_relative_url,
resource_relative_url = resource.getRelativeUrl())
else:
# get related movements only from current data ingestion
related_movement_list = movement.getParentValue().searchFolder(
portal_type=["Data Ingestion Line", "Data Analysis Line"],
resource_relative_url = resource.getRelativeUrl())
for related_movement in related_movement_list:
aggregate_set.update(related_movement.getAggregateSet())
if related_movement.getUse() == "big_data/ingestion/batch":
related_movement.getParentValue().deliver()
# create new item based on item_type if it is not already aggregated
aggregate_type_set = set(
[portal.restrictedTraverse(a).getPortalType() for a in aggregate_set])
for item_type in transformation_line.getAggregatedPortalTypeList():
# create item if it does note exist yet.
# Except if it is a Data Array Line, then it is currently created by
# data operation itself (probably this exception is inconsistent)
if item_type not in aggregate_type_set and item_type != "Data Array Line":
item_query_dict = dict(
portal_type=item_type,
validation_state="validated",
item_variation_text=transformation_line.getVariationText(),
item_device_relative_url=movement.getAggregateDevice(),
item_resource_uid=resource.getUid(),
item_source_relative_url=data_analysis.getSource())
if data_analysis.getDestinationProjectValue() is not None:
item_query_dict["item_project_relative_url"] = data_analysis.getDestinationProject()
item = portal.portal_catalog.getResultValue(**item_query_dict)
if item is None:
module = portal.getDefaultModule(item_type)
item = module.newContent(portal_type = item_type,
title = transformation.getTitle(),
reference = "%s-%s" %(transformation.getTitle(),
delivery.getReference()),
version = '001')
try:
item.validate()
except AttributeError:
pass
aggregate_set.add(item.getRelativeUrl())
# find other items such as device configuration and data configuration
# from data ingestion and data supply
composed = data_analysis.asComposedDocument()
line_list = [l for l in delivery.objectValues(portal_type="Data Ingestion Line")]
line_list += [l for l in composed.objectValues(portal_type="Data Supply Line")]
for line in line_list:
if line.getResourceValue().getPortalType() == "Data Operation":
aggregate_set.update(line.getAggregateList())
data_analysis_line = data_analysis.newContent(
portal_type = "Data Analysis Line",
title = transformation_line.getTitle(),
reference = transformation_line.getReference(),
int_index = transformation_line.getIntIndex(),
resource_value = resource,
variation_category_list = transformation_line.getVariationCategoryList(),
quantity = quantity,
quantity_unit = transformation_line.getQuantityUnit(),
use = transformation_line.getUse(),
aggregate_set = aggregate_set)
# for intput lines of first level analysis set causality and specialise
if quantity < 0 and delivery.getPortalType() == "Data Ingestion":
data_analysis_line.edit(
causality_value = delivery,
specialise_value_list = data_supply_list)
data_analysis.checkConsistency(fixit=True)
try:
data_analysis.start()
except UnsupportedWorkflowMethod:
pass
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>include_delivered=False</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_createDataAnalysisList</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -373,3 +373,54 @@ tail.0:2021-10-15 15:11:02.230745474 +0200 CEST[fluentbit_end]\n'
)
self.assertEqual(len(data_array_list), 1)
self.assertEqual(data_array_list[0].getCausalityValue(), self.data_product)
def test_multi_data_array(self):
request = self.portal.REQUEST
request_dict = self._create_request_dict()
for reference in request_dict:
request.environ["REQUEST_METHOD"] = 'POST'
request.set('reference', reference)
request.set('data_chunk', request_dict[reference])
self.portal.portal_ingestion_policies.metadata_upload.ingest()
self.tic()
data_stream_list = self.portal.portal_catalog(portal_type = 'Data Stream')
self.assertEqual(len(data_stream_list), 3)
self.portal.portal_alarms.wendelin_handle_analysis.activeSense()
self.tic()
self.portal.portal_alarms.wendelin_handle_analysis.activeSense()
self.tic()
# 2 references, 1 node
data_array_list = self.portal.portal_catalog(portal_type='Data Array')
self.assertEqual(len(data_array_list), 3)
# 1 is created
self.portal.portal_alarms.slapos_check_node_status.activeSense()
self.tic()
self.portal.portal_alarms.slapos_check_node_status.activeSense()
self.tic()
data_array_list = self.portal.portal_catalog(portal_type='Data Array')
self.assertEqual(len(data_array_list), 4)
for data_array in data_array_list:
self.assertEqual(data_array.getSimulationState(), 'processed')
request = self.portal.REQUEST
request_dict = self._create_request_dict()
for reference in request_dict:
request.environ["REQUEST_METHOD"] = 'POST'
request.set('reference', reference)
request.set('data_chunk', request_dict[reference])
self.portal.portal_ingestion_policies.metadata_upload.ingest()
self.tic()
self.portal.portal_alarms.wendelin_handle_analysis.activeSense()
self.tic()
self.portal.portal_alarms.wendelin_handle_analysis.activeSense()
self.tic()
new_data_array_list = self.portal.portal_catalog(portal_type='Data Array')
new_data_array_list = [x for x in new_data_array_list if x.getSimulationState() == 'converted']
self.assertEqual(len(new_data_array_list), 3)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment