Commit 3014e80e authored by Roque Porchetto

erp5_wendelin_telecom_ingestion: minor fixes after 10k-files-dataset ingestion

parent 09639815
...@@ -34,6 +34,10 @@ ...@@ -34,6 +34,10 @@
<tuple/> <tuple/>
</value> </value>
</item> </item>
<item>
<key> <string>periodicity_hour_frequency</string> </key>
<value> <int>1</int> </value>
</item>
<item> <item>
<key> <string>periodicity_minute</string> </key> <key> <string>periodicity_minute</string> </key>
<value> <value>
...@@ -44,7 +48,9 @@ ...@@ -44,7 +48,9 @@
</item> </item>
<item> <item>
<key> <string>periodicity_minute_frequency</string> </key> <key> <string>periodicity_minute_frequency</string> </key>
<value> <int>5</int> </value> <value>
<none/>
</value>
</item> </item>
<item> <item>
<key> <string>periodicity_month</string> </key> <key> <string>periodicity_month</string> </key>
......
...@@ -4,15 +4,15 @@ from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery ...@@ -4,15 +4,15 @@ from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
portal = context.getPortalObject() portal = context.getPortalObject()
portal_catalog = portal.portal_catalog portal_catalog = portal.portal_catalog
# invalidate old (more than 5 min) pending ingestions (e.g. split ingestions that were canceled/interrupted and not resumed) # invalidate old (more than 5 hours) pending ingestions (e.g. split ingestions that were canceled/interrupted and not resumed)
from DateTime import DateTime from DateTime import DateTime
now = DateTime() now = DateTime()
now_minus_max = now - 1.0/24/60*9999 now_minus_max = now - 1.0/24/60*9999
now_minus_5 = now - 1.0/24/60*5 now_minus_5 = now - 1.0/24/60*60*5
catalog_kw = {'creation_date': {'query': (now_minus_max, now_minus_5), 'range': 'minmax'}, 'simulation_state': 'started', 'portal_type': 'Data Ingestion'} catalog_kw = {'creation_date': {'query': (now_minus_max, now_minus_5), 'range': 'minmax'}, 'simulation_state': 'started', 'portal_type': 'Data Ingestion'}
for data_ingestion in portal_catalog(**catalog_kw): for data_ingestion in portal_catalog(**catalog_kw):
# search related data ingestions that are not old yet (less than 5 min) # search related data ingestions that are not old yet (less than 5 hours)
catalog_kw = {'creation_date': {'query': (now_minus_5, DateTime()), 'range': 'minmax'}, catalog_kw = {'creation_date': {'query': (now_minus_5, DateTime()), 'range': 'minmax'},
'simulation_state': 'started', 'simulation_state': 'started',
'portal_type': 'Data Ingestion', 'portal_type': 'Data Ingestion',
......
...@@ -19,22 +19,26 @@ for line_data_ingestion in portal_catalog(**query_dict): ...@@ -19,22 +19,26 @@ for line_data_ingestion in portal_catalog(**query_dict):
resource_relative_url = line_data_ingestion.getResource()): resource_relative_url = line_data_ingestion.getResource()):
# Create Analysis # Create Analysis
try: try:
log("creating Data Analysis for Data Ingestion " + str(data_ingestion.getReference())) try:
data_analysis = portal.data_analysis_module.newContent( log("creating Data Analysis for Data Ingestion " + str(data_ingestion.getReference()))
portal_type = "Data Analysis", data_analysis = portal.data_analysis_module.newContent(
id = data_ingestion.getId(), portal_type = "Data Analysis",
title = "%s - %s" %(transformation.getTitle(),data_ingestion.getTitle()), id = data_ingestion.getId(),
reference = data_ingestion.getReference(), title = "%s - %s" %(transformation.getTitle(),data_ingestion.getTitle()),
start_date = now, reference = data_ingestion.getReference(),
specialise_value = transformation, start_date = now,
causality_value = data_ingestion, specialise_value = transformation,
source = data_ingestion.getSource(), causality_value = data_ingestion,
source_section = data_ingestion.getSourceSection(), source = data_ingestion.getSource(),
source_project = data_ingestion.getSourceProject(), source_section = data_ingestion.getSourceSection(),
destination = data_ingestion.getDestination(), source_project = data_ingestion.getSourceProject(),
destination_section = data_ingestion.getDestinationSection(), destination = data_ingestion.getDestination(),
destination_project = data_ingestion.getDestinationProject()) destination_section = data_ingestion.getDestinationSection(),
context.logEntry("Data Analyisis created for Data Ingestion %s (ID: %s)" % (str(data_ingestion.getReference()), data_analysis.getId())) destination_project = data_ingestion.getDestinationProject())
context.logEntry("Data Analyisis created for Data Ingestion %s (ID: %s)" % (str(data_ingestion.getReference()), data_analysis.getId()))
except:
context.logEntry("[ERROR] Error creating Data Analysis for Data Ingestion '%s'. Script returned" % data_ingestion.getId())
return # Data Analysis was already created
# create input and output lines # create input and output lines
log("creating input and output lines") log("creating input and output lines")
......
...@@ -40,10 +40,11 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion", ...@@ -40,10 +40,11 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
if data_stream is not None: if data_stream is not None:
hash_value = getHash(data_stream) hash_value = getHash(data_stream)
data_stream.setVersion(hash_value) data_stream.setVersion(hash_value)
data_ingestion.stop()
if data_stream.getValidationState() != "validated": if data_stream.getValidationState() != "validated":
data_stream.validate() data_stream.validate()
context.logEntry("Data Ingestion %s stopped." % data_ingestion.getId()) if data_ingestion.getSimulationState() == "started":
data_ingestion.stop()
context.logEntry("Data Ingestion %s stopped." % data_ingestion.getId())
# append split ingestions # append split ingestions
for data_ingestion in portal_catalog(portal_type = "Data Ingestion", for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
...@@ -79,7 +80,8 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion", ...@@ -79,7 +80,8 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
reference = data_ingestion.getReference()) reference = data_ingestion.getReference())
for ingestion in related_split_ingestions: for ingestion in related_split_ingestions:
if ingestion.getId() == full_data_stream.getId(): if ingestion.getId() == full_data_stream.getId():
ingestion.stop() if ingestion.getSimulationState() == "started":
ingestion.stop()
else: else:
ingestion.setReference(ingestion.getReference() + "_invalid") ingestion.setReference(ingestion.getReference() + "_invalid")
ingestion.deliver() ingestion.deliver()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment