Commit c1113ce4 authored by Roque Porchetto

Data descriptor for metadata.

parent 389f68d2
@@ -3,27 +3,26 @@
 import mne
 import json
 from Products.ERP5Type.Log import log
 
-def getMetadata(raw_info):
-  dict = { 'filename': raw_info['filename'],
+def getJSONMetadata(raw_info):
+  info = { 'filename': raw_info['filename'],
            'nchan': raw_info['nchan'],
            'ch_names': raw_info['ch_names'],
            'sfreq': raw_info['sfreq']
          }
-  return json.dumps(dict)
+  return json.dumps(info)
 
-def processRawData(data_stream, data_array, data_stream_metadata):
+def processRawData(data_stream, data_array, data_descriptor):
   try:
     data_stream_content = data_stream.getData()
     raw_file = "ernoise_raw.fif"
-    with open(raw_file, 'wb') as file:
-      file.write(data_stream_content)
+    with open(raw_file, 'wb') as fif_file:
+      fif_file.write(data_stream_content)
     raw = mne.io.read_raw_fif(raw_file, preload=True)
+    json_metadata = getJSONMetadata(raw.info)
+    #data_descriptor.setTextContent(json_metadata)
+    #log("Data Descriptor metadata: " + str(data_descriptor.getTextContent()))
     data_array.setArray(raw._data)
     log("Data Array array: " + str(data_array.getArray()))
-    metadata = getMetadata(raw.info)
-    data_stream_metadata.appendData(metadata)
-    log("Data Stream metadata: " + str(data_stream_metadata.getData()))
     os.remove(raw_file)
     return "Raw data processed."
   except Exception, e:
...
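Aside: the renamed helper can be exercised outside ERP5 against a synthetic MNE Raw object. A minimal sketch, assuming mne and numpy are installed; the two fake channels and the 'n/a' fallback are illustrative, not part of the commit:

# Sketch only: exercises getJSONMetadata without an ernoise_raw.fif file.
import json
import numpy as np
import mne

def getJSONMetadata(raw_info):
    # same fields the commit serializes from raw.info
    info = {'filename': raw_info.get('filename', 'n/a'),  # fallback is an assumption
            'nchan': raw_info['nchan'],
            'ch_names': list(raw_info['ch_names']),
            'sfreq': raw_info['sfreq']}
    return json.dumps(info)

# two fake EEG channels at 100 Hz instead of reading a FIF file
info = mne.create_info(ch_names=['EEG 001', 'EEG 002'], sfreq=100.0, ch_types='eeg')
raw = mne.io.RawArray(np.zeros((2, 100)), info)
print(getJSONMetadata(raw.info))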
@@ -52,9 +52,7 @@
         <key> <string>text_content_warning_message</string> </key>
         <value>
           <tuple>
-            <string>W: 19, 0: Bad indentation. Found 8 spaces, expected 6 (bad-indentation)</string>
-            <string>W: 7, 2: Redefining built-in \'dict\' (redefined-builtin)</string>
-            <string>W: 18, 33: Redefining built-in \'file\' (redefined-builtin)</string>
+            <string>W: 21, 4: Unused variable \'json_metadata\' (unused-variable)</string>
           </tuple>
         </value>
       </item>
...
@@ -56,7 +56,7 @@
         <key> <string>aggregated_portal_type</string> </key>
         <value>
           <tuple>
-            <string>Fif Data Descriptor</string>
+            <string>Data Descriptor</string>
           </tuple>
         </value>
       </item>
...
@@ -2,7 +2,7 @@
 from Products.ERP5Type.Log import log
 
 log("Processing raw data from Data Stream " + str(input_stream_data.getReference()) + " to Data Array " + str(output_array.getReference()))
-result = str(context.processRawData(input_stream_data, output_array, output_metadata))
+result = str(context.processRawData(input_stream_data, output_array, output_descriptor))
 log(result)
-log("Metadata stored in Data Stream " + str(output_metadata.getReference()))
+log("Metadata stored in Data Descriptor " + str(output_descriptor.getReference()))
@@ -67,7 +67,7 @@
       </item>
       <item>
         <key> <string>_params</string> </key>
-        <value> <string>input_stream_data, output_array, output_metadata</string> </value>
+        <value> <string>input_stream_data, output_array, output_descriptor</string> </value>
       </item>
       <item>
         <key> <string>description</string> </key>
...
 from Products.ERP5Type.Log import log
 
-pair = reference.rsplit('.', 1)
-ref = pair[0]
-end = pair[1]
-
-supplier = "telecom"
-
-dict = { 'reference': ref,
-         'end': end,
-         'specialise_reference': supplier,
-         'resource_reference': 'fif',
-         'aggregate_data_set_reference': 'MNE_FIF'
-       }
+record = reference.rsplit('.')
+dict = { 'filename': record[2],
+         'extension': record[3],
+         'eof': record[4],
+         'specialise_reference': record[0],
+         'aggregate_data_set_reference': record[1],
+         'resource_reference': 'fif'
+       }
 
 log("From %s: returning dictionary: %s." % (script.getId(), dict))
...
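Aside: read together with the ingestion script further down (which strips the trailing eof token), the new indexing implies a five-part dotted reference of the form supplier.dataset.filename.extension.eof. A hypothetical walk-through; the concrete reference value is invented:

reference = "telecom.MNE_FIF.ernoise_raw.fif.EOF"  # invented example value
record = reference.rsplit('.')  # no maxsplit: same as reference.split('.')
movement_dict = {'filename': record[2],                      # 'ernoise_raw'
                 'extension': record[3],                     # 'fif'
                 'eof': record[4],                           # 'EOF'
                 'specialise_reference': record[0],          # 'telecom'
                 'aggregate_data_set_reference': record[1],  # 'MNE_FIF'
                 'resource_reference': 'fif'}
print(movement_dict)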
 from Products.ERP5Type.Log import log
 
-log("Handle Analysis Alarm Fired!")
+log('Handle Analysis Alarm Fired!')
 
 portal = context.getPortalObject()
 portal.ERP5Site_stopIngestionList()
 portal.ERP5Site_createDataAnalysisList()
...
 from Products.ERP5Type.Log import log
-log("Analisys Operation Script")
 
 operation = None
 parameter_dict = {}
 
 for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
   resource = analysis_line.getResourceValue()
-  log("##### Data Analisis Line. getResourceValue: " + str(resource.getReference))
   if resource is not None:
     resource_portal_type = resource.getPortalType()
   else:
     resource_portal_type = ''
-  log("Resource type: " + str(resource_portal_type))
   if resource_portal_type == 'Data Operation':
-    log("Getting data operation ...")
+    log("Getting Data Operation and Resources from Data Analysis...")
     operation_analysis_line = analysis_line
     operation = analysis_line.getResourceValue()
   else:
@@ -22,7 +19,8 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
       analysis_line.getAggregateDataArrayValue() or \
       analysis_line.getAggregateDataDescriptorValue()
     parameter_dict[reference] = aggregate
-    log("Resource ref: " + str(reference) + " - aggregate: " + str(aggregate))
 
 script_id = operation.getScriptId()
-log("Calling script: " + str(script_id))
 getattr(operation_analysis_line, script_id)(**parameter_dict)
 context.stop()
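Aside: the last three lines are the core dispatch pattern — the Data Operation's script id is looked up as an attribute of the operation line and called with the reference-to-aggregate map as keyword arguments. A self-contained sketch; the class and script name are invented stand-ins for the ERP5 objects:

# Invented stand-ins: a 'line' object exposing a script as a method.
class OperationAnalysisLine(object):
    def DataAnalysisLine_processRawData(self, raw_fif_data=None):
        return "would process %r" % (raw_fif_data,)

operation_analysis_line = OperationAnalysisLine()
script_id = "DataAnalysisLine_processRawData"       # what operation.getScriptId() returns
parameter_dict = {"raw_fif_data": "<Data Stream>"}  # resource reference -> aggregate
print(getattr(operation_analysis_line, script_id)(**parameter_dict))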
@@ -3,37 +3,25 @@ portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
 
 from Products.ERP5Type.Log import log
-log("Create Analisys Script")
 
 now = DateTime()
 
 query_dict = {
   "portal_type": "Data Ingestion Line",
-  #"stock.quantity": '!=0',
   "resource_portal_type": "Data Product",
-  "simulation_state": "stopped"}
+  "simulation_state": "stopped"
+}
 
-for movement in portal_catalog(**query_dict):
-  """batch_relative_url = movement.getAggregateDataIngestionBatch()
-  if batch_relative_url is None:
-    raise ValueError("Transformation requires movement to have " +
-                     "aggregated data ingestion batch")"""
-  data_ingestion = movement.getParentValue()
-  log("####################################### FOR ITERATION ####################################")
-  log("Data ingestion LINE: " + str(movement.getReference()))
-  log("Data ingestion: " + str(data_ingestion.getReference()))
-  log("Data ingestion resource: " + str(movement.getResource()))
+for line_data_ingestion in portal_catalog(**query_dict):
+  data_ingestion = line_data_ingestion.getParentValue()
   # Get applicable transformation
   for transformation in portal_catalog(
       portal_type = "Data Transformation",
       validation_state = "validated",
-      resource_relative_url = movement.getResource()):
-    log("transformation: " + str(transformation) + " - ref: " + str(transformation.getReference()))
+      resource_relative_url = line_data_ingestion.getResource()):
     # Create Analysis
-    log("creating Data Analysis")
+    log("creating Data Analysis for Data Ingestion " + str(data_ingestion.getReference()))
     data_analysis = portal.data_analysis_module.newContent(
       portal_type = "Data Analysis",
-      title = transformation.getTitle(),
+      title = "%s - %s" %(transformation.getTitle(), data_ingestion.getTitle()),
       reference = data_ingestion.getReference(),
       start_date = now,
       specialise_value = transformation,
@@ -56,69 +44,41 @@ for movement in portal_catalog(**query_dict):
       quantity = quantity[0]
     aggregate_set = set()
     # manually add device and device configuration to every line
-    if movement.getAggregateDevice() is not None:
-      aggregate_set.add(movement.getAggregateDevice())
-      log("adding getAggregateDevice to aggregate_set")
-      log("aggregate_set content: " + str(aggregate_set))
-    if movement.getAggregateDeviceConfiguration() is not None:
-      aggregate_set.add(movement.getAggregateDeviceConfiguration())
-      log("adding getAggregateDeviceConfiguration to aggregate_set")
-      log("aggregate_set content: " + str(aggregate_set))
-    #raise ValueError("aggregate_set: " + str(aggregate_set))
-    if transformation_line.getPortalType() == \
-        "Data Transformation Resource Line":
+    if line_data_ingestion.getAggregateDevice() is not None:
+      aggregate_set.add(line_data_ingestion.getAggregateDevice())
+    if line_data_ingestion.getAggregateDeviceConfiguration() is not None:
+      aggregate_set.add(line_data_ingestion.getAggregateDeviceConfiguration())
+    if transformation_line.getPortalType() == "Data Transformation Resource Line":
       # at the moment, we only check for positive or negative quantity
-      log("Transformation line: " + str(transformation_line.getReference()))
-      log("Resource: " + str(resource) + " - relURL: " + str(resource.getRelativeUrl()))
-      log("QUANTITY: " + str(quantity))
       if quantity < 0:
-        log("QUANTITY < 0!!!!!! It is an input line!")
-        # it is an input line
-        # aggregate transformed item from data ingestion batch related to our
-        # movement. If it is an input resource line, then we search for an
+        # it is an input line. If it is an input resource line, then we search for an
         # ingestion line with the same resource. If it is an operation line
-        # then we search for an ingestion line with resource portal type
-        # Data Product
-        related_movement_list = portal_catalog(
+        # then we search for an ingestion line with resource portal type Data Product
+        related_lines_list = portal_catalog(
           portal_type="Data Ingestion Line",
           simulation_state="stopped",
-          #aggregate_relative_url=batch_relative_url,
          resource_relative_url = resource.getRelativeUrl())
-        log("this resource has " + str(len(related_movement_list)) + " related movement/s")
-        for related_movement in related_movement_list:
-          log("One related movement: " + str(related_movement))
-          log("related_movement.getAgregateSet: " + str(related_movement.getAggregateSet()))
-          log("Related movement PARENT: " + str(related_movement.getParentValue()))
-          if(related_movement.getParentValue().getReference() == data_ingestion.getReference()):
-            log("related movement corresponds to current data_ingestion!")
-            log("delivering...")
-            aggregate_set.update(related_movement.getAggregateSet())
-            log("updating related_movement.getAggregateSet to aggregate_set")
-            log("aggregate_set content: " + str(aggregate_set))
-            related_movement.getParentValue().deliver()
+        for related_line in related_lines_list:
+          if(related_line.getParentValue().getReference() == data_ingestion.getReference()):
+            aggregate_set.update(related_line.getAggregateSet())
+            related_line.getParentValue().deliver()
+            log("DATA INGESTION DELIVERED")
       else:
-        log("QUANTITY >= 0!!!!!! It is an output line!!")
         # it is an output line
-        # create new item based on item_type
+        # create new item based on item_type: data array, stream, descriptor, etc.
         item_type = resource.getAggregatedPortalType()
         module = portal.getDefaultModule(item_type)
         item = module.newContent(portal_type = item_type,
-                                 title = transformation.getTitle(),
-                                 reference = "%s-%s" %(transformation.getTitle(),
-                                                       data_ingestion.getReference()),
+                                 title = data_ingestion.getTitle(),
+                                 #title = data_ingestion.getReference(),
+                                 reference = data_ingestion.getReference(),
                                  version = '001')
-        #raise ValueError("item: " + str(item.getReference()))
-        item.validate()
-        log("creating item type: " + str(item_type))
+        if "Data Descriptor" not in item_type:
+          item.validate()
+        log("Creating " + str(item_type))
         aggregate_set = set()
         aggregate_set.add(item)
-        log("adding created item to aggregate_set")
-        log("aggregate_set content: " + str(aggregate_set))
-        #raise ValueError("aggregate_set: " + str(aggregate_set))
-      log("Adding Analysis Line: " + str(transformation_line.getTitle()))
       data_analysis.newContent(
         portal_type = "Data Analysis Line",
         title = transformation_line.getTitle(),
@@ -128,6 +88,6 @@ for movement in portal_catalog(**query_dict):
         quantity = quantity,
         quantity_unit = transformation_line.getQuantityUnit(),
         aggregate_value_set = aggregate_set)
-  log("data_analysis.plan() - ref " + str(data_analysis.getReference()))
   data_analysis.plan()
+  log("DATA ANALYSIS PLANNED")
@@ -2,10 +2,9 @@ portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
 
 from Products.ERP5Type.Log import log
-log("Excecute Analisys Script")
 
 for data_analysis in portal_catalog(portal_type = "Data Analysis",
                                     simulation_state = "planned"):
   data_analysis.start()
+  log("DATA ANALYSIS STARTED")
   data_analysis.activate(serialization_tag=str(data_analysis.getUid()))\
     .DataAnalysis_executeDataOperation()
@@ -3,8 +3,6 @@ from Products.ERP5Type.DateUtils import addToDate
 from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
 from Products.ERP5Type.Log import log
-log("Stop Ingestion Script")
 
 portal_catalog = context.getPortalObject().portal_catalog
 
 # search for dates older than one minute ago
@@ -16,13 +14,9 @@ kw_dict = {"query": start_date_query,
 parent_uid_list = [x.getUid() for x in portal_catalog(**kw_dict)]
-#di = portal_catalog(**kw_dict)[0]
-#log("DI.state = " +str(di.getSimulationState()))
 
 if len(parent_uid_list) != 0:
-  log("Data ingestions found: " + str(len(parent_uid_list)))
+  log("Stoping %s ingestions..." %(str(len(parent_uid_list))))
   kw_dict = {"portal_type": "Data Ingestion Line",
-             #"stock.quantity": '!=0', # this is always FALSE!!!!!!!
              "resource_portal_type": "Data Product",
              "parent_uid": parent_uid_list}
@@ -38,6 +32,7 @@ if len(parent_uid_list) != 0:
     # should be probably done by consistency constraint
     # probably it can be done in generic way using required item type
     # on resource.
+    """
     if data_ingestion_line.DataIngestionLine_needRequiredItem():
       raise ValueError("DataIngestionLine_needRequiredItem: " + "TRUE")
       batch = data_ingestion_line.getAggregateDataIngestionBatchValue()
@@ -65,19 +60,14 @@ if len(parent_uid_list) != 0:
       # we need to wait until there are 2 batches until we can stop it
       # TODO: this should be implemented in transformation, not here
       continue
+    """
-    # Only stop data ingestion of related resource is configured for batch
-    # ingestion
-    # TODO: probably this should be done in another way
     resource = data_ingestion_line.getResourceValue()
     log("data_ingestion: " + str(data_ingestion.getReference()))
     log("resource: " + str(resource.getReference()))
     if "big_data/ingestion/batch_ingestion" in resource.getUseList():
-      #if "big_data/ingestion" in resource.getUseList():
-      #raise ValueError("big_data/ingestion: " + "TRUE")
-      log("resource.getUseList: big_data/ingestion/batch_ingestion")
       data_ingestion.setStopDate(DateTime())
       data_ingestion.stop()
-      log("Ingestion stopped")
+      log("DATA INGESTION STOPPED")
     else:
-      raise ValueError("big_data/ingestion/batch_ingestion: " + "FALSE")
+      raise ValueError("resource.getUseList must be 'big_data/ingestion/batch_ingestion'")
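Aside: the hunk context above states the selection criterion — only ingestions whose start date is older than one minute ago are stopped. In plain datetime terms the cutoff reads roughly as follows (the real script builds a catalog query on start_date instead):

# Plain-Python sketch of the "older than one minute ago" cutoff.
from datetime import datetime, timedelta

cutoff = datetime.now() - timedelta(minutes=1)
start_date = datetime.now() - timedelta(minutes=5)  # invented ingestion start
print("eligible to stop: %s" % (start_date < cutoff))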
@@ -6,65 +6,59 @@ today_string = now.strftime('%Y%m%d')
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
 
-data_ingestion_reference = movement_dict.get('reference', reference)
-data_ingestion_id = "%s-%s" %(today_string, data_ingestion_reference)
-end_part = movement_dict.get('end', "EOF")
+# removing eof from reference
+reference = '_'.join(reference.split('.')[:-1])
+log("Reference: " + reference)
+data_ingestion_reference = reference
+data_ingestion_id = '%s_%s' %(data_ingestion_reference, today_string)
+eof = movement_dict.get('eof', 'EOF')
 resource_reference = movement_dict.get('resource_reference', None)
 specialise_reference = movement_dict.get('specialise_reference', None)
 dataset_reference = movement_dict.get('aggregate_data_set_reference', None)
-log("SCRIPT PARAMETERS: data_ingestion_reference %s; end %s; data_ingestion_id %s; resource_reference %s; specialise_reference %s; dataset_reference %s" % (data_ingestion_reference, end_part, data_ingestion_id, resource_reference, specialise_reference, dataset_reference))
 
 # first search for applicable data ingestion
 data_ingestion = portal_catalog.getResultValue(
   portal_type = 'Data Ingestion',
   simulation_state = 'planned',
-  reference =data_ingestion_reference)
-log("Data ingestion: " + str(data_ingestion))
+  reference = data_ingestion_reference)
+log("Planned Data Ingestion found for reference: " + str(data_ingestion))
 
 if data_ingestion is None:
-  log("Data ingestion is NONE. Searching document by DI ID with date")
   document = portal.data_ingestion_module.get(data_ingestion_id)
-  log("Document: " + str(document))
-  if (document is not None):
-    log("Document is NOT NONE")
-    log("Document.getSimulationState: " + str(document.getSimulationState()))
+  log("Data Ingestion by id-date found: " + str(document))
   if (document is not None):
     if document.getSimulationState() == 'planned':
       data_ingestion = document
     else:
-      raise ValueError("Could not create Data Ingestion for reference %s. An older ingestion for that file was already done. Please review the reference." % data_ingestion_reference)
+      raise ValueError("An older ingestion for reference %s was already done." % data_ingestion_reference)
 
 if data_ingestion is None:
-  log("data_ingestion is NONE! Searching for a validated Data Supply...")
   specialise_value_list = [x.getObject() for x in portal_catalog.searchResults(
     portal_type = 'Data Supply',
     reference = specialise_reference,
     validation_state = 'validated')]
-  log("Supply list: " + str(specialise_value_list))
 
   # if we do not find a validated data supply, we look for a default data supply
   if not specialise_value_list:
-    log("SUPPLY is NONE! Searching for a default Data Supply...")
     specialise_value_list = [x.getObject() for x in portal_catalog.searchResults(
       portal_type = 'Data Supply',
       reference = specialise_reference,
       validation_state = 'default')]
-  context.log(specialise_value_list)
+  log("Data Suppliers list: " + str(specialise_value_list))
 
   # create a new data ingestion
   log("Creating a new Data Ingestion")
   data_ingestion = portal.data_ingestion_module.newContent(
     id = data_ingestion_id,
     portal_type = "Data Ingestion",
-    title = movement_dict.get('filename', data_ingestion_reference),
     reference = data_ingestion_reference,
     start_date = now,
     specialise_value_list = specialise_value_list)
 
   property_list = ["title",
                    "source",
                    "source_section",
@@ -73,17 +67,15 @@ if data_ingestion is None:
                    "destination_section",
                    "destination_project",
                    "specialise"]
 
   composed = data_ingestion.asComposedDocument()
   data_ingestion.edit(**{p: composed.getProperty(p) for p in property_list})
 
   # create ingestion lines from specialise lines and assign input line
   # and operation line
-  log("Creating ingestion lines from supply lines and assigning input line and operation line")
   input_line = None
   operation_line = None
-  for supply_line in composed.objectValues(
-      portal_type = 'Data Supply Line'):
+  for supply_line in composed.objectValues(portal_type = 'Data Supply Line'):
     current_line = data_ingestion.newContent(
       portal_type = "Data Ingestion Line",
       title = supply_line.getTitle(),
@@ -93,102 +85,51 @@ if data_ingestion is None:
       reference = supply_line.getReference(),
       resource = supply_line.getResource(),
       )
-    log("CURRENT LINE: " + str(current_line) + " - Resource: " + str(current_line.getResourceReference()))
+    #log("CURRENT LINE: " + str(current_line) + " - Resource: " + str(current_line.getResourceReference()))
     if current_line.getResourceReference() == resource_reference:
-      log("CURRENT LINE is an INPUT LINE")
+      #log("CURRENT LINE is an INPUT LINE")
       input_line = current_line
     elif current_line.getResourceValue().getPortalType() == "Data Operation":
-      log("CURRENT LINE is an D.OPERATION LINE")
+      #log("CURRENT LINE is an D.OPERATION LINE")
       operation_line = current_line
     else:
-      log("CURRENT LINE nor INPUT or D.OP LINE, SET QUANTITY TO 0")
+      #log("CURRENT LINE nor INPUT or D.OP LINE, SET QUANTITY TO 0")
       # we set quantity=0 for the empty line
       current_line.setQuantity(0)
 
   # copy device and configuration from operation line to input line
-  log("copy device and configuration from operation line to input line")
+  #log("copy device and configuration from operation line to input line")
   input_line.setAggregateSet(
     input_line.getAggregateList() + operation_line.getAggregateList())
-  log("input_line.getAggregateList(): " + str(input_line.getAggregateList()) + " - operation_line.getAggregateList(): " + str(operation_line.getAggregateList()))
+  #log("input_line.getAggregateList(): " + str(input_line.getAggregateList()))
+  #log("operation_line.getAggregateList(): " + str(operation_line.getAggregateList()))
 
-  # Check if we have a batch reference
-  log("Checking for a batch reference")
-  data_ingestion_batch_reference = movement_dict.get(
-    'aggregate_data_ingestion_batch_reference',
-    None)
-  data_ingestion_batch_id = "%s-%s" %(today_string,
-                                      data_ingestion_batch_reference)
-  data_stream = None
-  if data_ingestion_batch_reference is not None:
-    log("data_ingestion_batch_reference IS NOT NONE!")
-    log("data_ingestion_batch_reference: " + str(data_ingestion_batch_reference))
-    data_ingestion_batch = portal_catalog.getResultValue(
-      portal_type = "Data Ingestion Batch",
-      reference = data_ingestion_batch_reference)
-    log("data_ingestion_batch: " + str(data_ingestion_batch))
-    raise ValueError("STOP! data_ingestion_batch found!!!!")
-    if data_ingestion_batch is None:
-      data_ingestion_batch = portal.data_ingestion_batch_module.get(
-        data_ingestion_batch_id)
-      if data_ingestion_batch is None:
-        data_ingestion_batch = portal.data_ingestion_batch_module.newContent(
-          id = data_ingestion_batch_id,
-          portal_type = "Data Ingestion Batch",
-          reference = data_ingestion_batch_reference)
-    else:
-      previous_data_ingestion_line = portal_catalog.getResultValue(
-        portal_type = "Data Ingestion Line",
-        resource_reference = resource_reference,
-        aggregate_uid = data_ingestion_batch.getUid())
-      if previous_data_ingestion_line is not None:
-        data_stream = previous_data_ingestion_line\
-          .getAggregateDataStreamValue()
-    input_line.setDefaultAggregateValue(data_ingestion_batch)
-  log("Checking Data Stream: " + str(data_stream))
-  if data_stream is None:
-    log("Data Stream is NONE")
-    data_stream = portal.data_stream_module.newContent(
-      portal_type = "Data Stream",
-      reference = data_ingestion_reference)
-    data_stream.validate()
-    log("Data Stream CREATED and linked to input_line.")
-    log("Data stream id : " + str(data_stream.getId()))
-    log("Data stream ref : " + str(data_stream.getReference()))
+  log("Creating a new Data Stream")
+  data_stream = portal.data_stream_module.newContent(
+    portal_type = "Data Stream",
+    reference = data_ingestion_reference)
+  data_stream.validate()
   input_line.setDefaultAggregateValue(data_stream)
 
   data_ingestion.plan()
   log("DATA INGESTION PLANNED.")
 
-  data_set = None
   if dataset_reference is not None:
-    log("getting data set by ref: " + dataset_reference)
     data_set = portal_catalog.getResultValue(
       portal_type = "Data Set",
       reference = dataset_reference)
+    log("Data Set found for dataset reference: " + dataset_reference)
     if data_set is None:
-      log("data set None. creating data set..")
+      log("Creating a new Data Set")
       data_set = portal.data_set_module.newContent(
         portal_type = "Data Set",
         reference = dataset_reference)
       data_set.validate()
+    #else: #data_sink stuff
-    log("data set linked to input line: " + input_line.getReference())
     input_line.setDefaultAggregateValue(data_set)
 else:
-  log("Data ingestion is NOT NONE: " + str(data_ingestion))
-  log("Getting input and operation lines from DI")
   # find ingestion line for current resource
   for line in data_ingestion.objectValues(portal_type="Data Ingestion Line"):
     if line.getResourceReference() == resource_reference:
@@ -199,12 +140,8 @@ else:
 
 data_operation = operation_line.getResourceValue()
 data_stream = input_line.getAggregateDataStreamValue()
 
-log("Ending SCRIPT")
-log("data_operation: " + str(data_operation))
-log("data_stream: " + str(data_stream))
-if end_part == "EOF":
+if eof == "EOF":
   data_ingestion.start()
   log("DATA INGESTION STARTED")
 
 return data_operation, {'data_stream': data_stream}
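Aside: the closing condition gates the workflow transition on the eof token parsed earlier — only the movement whose reference ends in EOF starts the Data Ingestion, so partial uploads leave it planned. A toy run; the chunk markers are invented:

# Toy illustration of the EOF gate; chunk eof values are invented.
for movement_dict in [{'eof': '001'}, {'eof': '002'}, {'eof': 'EOF'}]:
    eof = movement_dict.get('eof', 'EOF')
    if eof == "EOF":
        print("final chunk: data_ingestion.start()")
    else:
        print("chunk %s: keep Data Ingestion planned" % eof)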