Commit b95634cc authored by Klaus Wölfel's avatar Klaus Wölfel

Add required scripts for handling data ingestion and data analysis

parent b5931057
portal = context.getPortalObject()
portal.ERP5Site_stopIngestionList()
portal.ERP5Site_createDataAnalysisList()
portal.ERP5Site_executeDataAnalysisList()
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Alarm_handleAnalysis</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
from DateTime import DateTime
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
now = DateTime()
query_dict = {
"portal_type": "Data Ingestion Line",
"stock.quantity": '!=0',
"resource_portal_type": "Data Product",
"simulation_state": "stopped"}
for movement in portal_catalog(**query_dict):
batch_relative_url = movement.getAggregateDataIngestionBatch()
if batch_relative_url is None:
raise ValueError("Transformation requires movement to have " +
"aggregated data ingestion batch")
data_ingestion = movement.getParentValue()
# Get applicable transformation
for transformation in portal_catalog(
portal_type = "Data Transformation",
validation_state = "validated",
resource_relative_url = movement.getResource()):
# Create Analysis
data_analysis = portal.data_analysis_module.newContent(
portal_type = "Data Analysis",
title = transformation.getTitle(),
reference = data_ingestion.getReference(),
start_date = now,
specialise_value = transformation,
causality_value = data_ingestion,
source = data_ingestion.getSource(),
source_section = data_ingestion.getSourceSection(),
source_project = data_ingestion.getSourceProject(),
destination = data_ingestion.getDestination(),
destination_section = data_ingestion.getDestinationSection(),
destination_project = data_ingestion.getDestinationProject())
# create input and output lines
for transformation_line in transformation.objectValues(
portal_type=["Data Transformation Resource Line",
"Data Transformation Operation Line"]):
resource = transformation_line.getResourceValue()
quantity = transformation_line.getQuantity()
if isinstance(quantity, tuple):
quantity = quantity[0]
aggregate_set = set()
# manually add device and device configuration to every line
aggregate_set.add(movement.getAggregateDevice())
aggregate_set.add(movement.getAggregateDeviceConfiguration())
if transformation_line.getPortalType() == \
"Data Transformation Resource Line":
# at the moment, we only check for positive or negative quantity
if quantity < 0:
# it is an input line
# aggregate transformed item from data ingestion batch related to our
# movement. If it is an input resource line, then we search for an
# ingestion line with the same resource. If it is an operation line
# then we search for an ingestion line with resource portal type
# Data Product
related_movement_list = portal_catalog(
portal_type="Data Ingestion Line",
aggregate_relative_url=batch_relative_url,
resource_relative_url = resource.getRelativeUrl())
for related_movement in related_movement_list:
aggregate_set.update(related_movement.getAggregateSet())
related_movement.getParentValue().deliver()
else:
# it is an output line
# create new item based on item_type
item_type = resource.getAggregatedPortalType()
module = portal.getDefaultModule(item_type)
item = module.newContent(portal_type = item_type,
title = transformation.getTitle(),
reference = "%s-%s" %(transformation.getTitle(),
data_ingestion.getReference()),
version = '001')
item.validate()
aggregate_set.add(item)
data_analysis.newContent(
portal_type = "Data Analysis Line",
title = transformation_line.getTitle(),
reference = transformation_line.getReference(),
int_index = transformation_line.getIntIndex(),
resource_value = resource,
quantity = quantity,
quantity_unit = transformation_line.getQuantityUnit(),
aggregate_value_set = aggregate_set)
data_analysis.plan()
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_createDataAnalysisList</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
for data_analysis in portal_catalog(portal_type = "Data Analysis",
simulation_state = "planned"):
data_analysis.start()
data_analysis.activate(serialization_tag=str(data_analysis.getUid()))\
.DataAnalysis_executeDataOperation()
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_executeDataAnalysisList</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
from DateTime import DateTime
from Products.ERP5Type.DateUtils import addToDate
from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
portal_catalog = context.getPortalObject().portal_catalog
# search for dates older than one minute ago
old_start_date = addToDate(DateTime(), {'minute' : -1})
start_date_query = Query(**{'delivery.start_date': old_start_date, 'range': 'ngt'})
kw_dict = {"query": start_date_query,
"portal_type": "Data Ingestion",
"simulation_state": "started"}
parent_uid_list = [x.getUid() for x in portal_catalog(**kw_dict)]
if len(parent_uid_list) != 0:
kw_dict = {"portal_type": "Data Ingestion Line",
"stock.quantity": '!=0',
"resource_portal_type": "Data Product",
"parent_uid": parent_uid_list}
for data_ingestion_line in portal_catalog(**kw_dict):
data_ingestion = data_ingestion_line.getParentValue()
# make sure that each movement has aggregate batch
# this relies on reference. It cannot be done on
# ingestion because the possibility of parallel ingestion
#
# should be probably done by consistency constraint
# probably it can be done in generic way using required item type
# on resource.
if data_ingestion_line.DataIngestionLine_needRequiredItem():
batch = data_ingestion_line.getAggregateDataIngestionBatchValue()
if data_ingestion_line.DataIngestionLine_hasMissingRequiredItem():
# make sure that each movement has aggregate batch
# this relies on reference. It cannot be done on
# ingestion because the possibility of parallel ingestion
# TODO: make it generic, use Constraint
if batch is None:
reference_tuple = data_ingestion.getReference().split('.')
data_ingestion_batch_reference = '.'.join(reference_tuple[:-1])
batch = portal_catalog.getResultValue(
portal_type = "Data Ingestion Batch",
reference = data_ingestion_batch_reference)
if batch is not None:
data_ingestion_line.setDefaultAggregateValue(batch)
else:
# we need to wait for existing batch before we can stop the data ingestion
continue
# -> only stop when stop date is older than current date
# -> set top date using information from data operation script
if len(batch.getAggregateRelatedList()) < 2:
# we need to wait until there are 2 batches until we can stop it
# TODO: this should be implemented in transformation, not here
continue
# Only stop data ingestion of related resource is configured for batch
# ingestion
# TODO: probably this should be done in another way
resource = data_ingestion_line.getResourceValue()
if "big_data/ingestion/batch_ingestion" in resource.getUseList():
data_ingestion.setStopDate(DateTime())
data_ingestion.stop()
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_stopIngestionList</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
from DateTime import DateTime
now = DateTime()
today_string = now.strftime('%Y%m%d')
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
data_ingestion_reference = movement_dict.get('reference', reference)
data_ingestion_id = "%s-%s" %(today_string, data_ingestion_reference)
resource_reference = movement_dict.get('resource_reference', None)
specialise_reference = movement_dict.get('specialise_reference', None)
# first search for applicable data ingestion
data_ingestion = portal_catalog.getResultValue(
portal_type = 'Data Ingestion',
simulation_state = 'started',
reference =data_ingestion_reference)
if data_ingestion is None:
data_ingestion = portal.data_ingestion_module.get(data_ingestion_id)
if data_ingestion is None:
specialise_value_list = [x.getObject() for x in portal_catalog.searchResults(
portal_type = 'Data Supply',
reference = specialise_reference,
validation_state = 'validated')]
# if we do not find a validated data supply, we look for a default data supply
if not specialise_value_list:
specialise_value_list = [x.getObject() for x in portal_catalog.searchResults(
portal_type = 'Data Supply',
reference = specialise_reference,
validation_state = 'default')]
context.log(specialise_value_list)
# create a new data ingestion
data_ingestion = portal.data_ingestion_module.newContent(
id = data_ingestion_id,
portal_type = "Data Ingestion",
reference = data_ingestion_reference,
start_date = now,
specialise_value_list = specialise_value_list)
property_list = ["title",
"source",
"source_section",
"source_project",
"destination",
"destination_section",
"destination_project",
"specialise"]
composed = data_ingestion.asComposedDocument()
data_ingestion.edit(**{p: composed.getProperty(p) for p in property_list})
# create ingestion lines from specialise lines and assign input line
# and operation line
input_line = None
operation_line = None
for supply_line in composed.objectValues(
portal_type = 'Data Supply Line'):
current_line = data_ingestion.newContent(
portal_type = "Data Ingestion Line",
title = supply_line.getTitle(),
aggregate = supply_line.getAggregateList(),
int_index = supply_line.getIntIndex(),
quantity = supply_line.getQuantity(),
reference = supply_line.getReference(),
resource = supply_line.getResource(),
)
if current_line.getResourceReference() == resource_reference:
input_line = current_line
elif current_line.getResourceValue().getPortalType() == "Data Operation":
operation_line = current_line
else:
# we set quantity=0 for the empty line
current_line.setQuantity(0)
# copy device and configuration from operation line to input line
input_line.setAggregateSet(
input_line.getAggregateList() + operation_line.getAggregateList())
# Check if we have a batch referece
data_ingestion_batch_reference = movement_dict.get(
'aggregate_data_ingestion_batch_reference',
None)
data_ingestion_batch_id = "%s-%s" %(today_string,
data_ingestion_batch_reference)
data_stream = None
if data_ingestion_batch_reference is not None:
data_ingestion_batch = portal_catalog.getResultValue(
portal_type = "Data Ingestion Batch",
reference = data_ingestion_batch_reference)
if data_ingestion_batch is None:
data_ingestion_batch = portal.data_ingestion_batch_module.get(
data_ingestion_batch_id)
if data_ingestion_batch is None:
data_ingestion_batch = portal.data_ingestion_batch_module.newContent(
id = data_ingestion_batch_id,
portal_type = "Data Ingestion Batch",
reference = data_ingestion_batch_reference)
else:
previous_data_ingestion_line = portal_catalog.getResultValue(
portal_type = "Data Ingestion Line",
resource_reference = resource_reference,
aggregate_uid = data_ingestion_batch.getUid())
if previous_data_ingestion_line is not None:
data_stream = previous_data_ingestion_line\
.getAggregateDataStreamValue()
input_line.setDefaultAggregateValue(data_ingestion_batch)
if data_stream is None:
data_stream = portal.data_stream_module.newContent(
portal_type = "Data Stream",
reference = data_ingestion_reference)
data_stream.validate()
input_line.setDefaultAggregateValue(data_stream)
data_ingestion.start()
else:
# find ingestion line for current resource
for line in data_ingestion.objectValues(portal_type="Data Ingestion Line"):
if line.getResourceReference() == resource_reference:
input_line = line
elif line.getResourceValue().getPortalType() == "Data Operation":
operation_line = line
data_operation = operation_line.getResourceValue()
data_stream = input_line.getAggregateDataStreamValue()
return data_operation, {'data_stream': data_stream}
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>movement_dict, reference</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>IngestionPolicy_getIngestionOperationAndParameterDict</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment