Commit 8a89e89d authored by Roque Porchetto

erp5_wendelin_telecom_ingestion: all ingestion objects share the same id

- some cleanup in logs
- some try/except blocks for better error logging
parent 88be7c22
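Editor's note: the common thread in the hunks below is that every object produced by one ingestion (Data Ingestion, Data Stream, Data Analysis, aggregated items) is now created with the same document id, so related objects can be looked up directly by id instead of re-deriving a time-dependent id. A minimal sketch of the two patterns follows; it is not part of the commit, it assumes an ERP5 Script (Python) context where `context` is supplied by the framework, and the reference values are made up:

# Sketch only: illustrative ERP5 Script (Python) fragment, not commit code.
from DateTime import DateTime

portal = context.getPortalObject()

# Illustrative values; real ones come from the ingestion request.
specialise_reference = 'sample-supplier'
data_ingestion_reference = 'sample-supplier/sample-file'

# One id per ingestion, derived once from its reference plus a timestamp...
now_string = DateTime().strftime('%Y%m%d-%H%M%S')
data_ingestion_id = '%s_%s_%s' % (specialise_reference,
                                  data_ingestion_reference.replace("/", "_"),
                                  now_string)

# ...and reused as the document id of every related object, so lookups no
# longer reconstruct a clock-dependent id (the old test had to retry on
# KeyError while the clock drifted):
data_stream = portal.data_stream_module.newContent(
  portal_type = "Data Stream",
  id = data_ingestion_id,
  reference = data_ingestion_reference)
same_stream = portal.restrictedTraverse('data_stream_module/' + data_ingestion_id)

# Second theme: transitions that may legitimately fail (e.g. a document that
# is already invalidated) now log a warning instead of passing silently.
try:
  data_stream.setReference(data_stream.getReference() + "_invalid")
  data_stream.invalidate()
except:
  context.logEntry("[WARNING] Could not invalidate data stream '%s'" % data_stream.getId())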
@@ -10,17 +10,15 @@ kw_dict = {"portal_type": "Data Set",
 # data set
 for data_set in portal_catalog(**kw_dict):
   if not data_set.getReference().endswith("_invalid"):
-    context.logEntry("Invalidating data set and dependencies for: " + data_set.getReference())
-    log("Ivalid Dataset: " + str(data_set.getReference()))
+    context.logEntry("Deleted data set found. Invalidating data set '%s' and dependencies." % data_set.getReference())
     reference_query = Query(**{'reference': data_set.getReference()+'/%'})
     kw_dict = {"portal_type": "Data Stream",
                "query": reference_query}
     for data_stream in portal_catalog(**kw_dict):
       if data_stream.getReference().startswith(data_set.getReference()+'/'):
-        log("Related data stream: " + data_stream.getReference())
         portal.ERP5Site_invalidateIngestionObjects(data_stream.getReference())
-    log("Invalidating data set")
     data_set.setReference(data_set.getReference() + "_invalid")
+    context.logEntry("Data set '%s' invalidated." % data_set.getReference())
 # data streams
 kw_dict = {"portal_type": "Data Stream",
@@ -28,8 +26,7 @@ kw_dict = {"portal_type": "Data Stream",
 for data_stream in portal_catalog(**kw_dict):
   if not data_stream.getReference().endswith("_invalid"):
-    log("Ivalid Data Stream: " + str(data_stream.getReference()))
-    context.logEntry("Ivalid Data Stream: " + str(data_stream.getReference()))
+    context.logEntry("Deleted data stream found: " + data_stream.getReference())
     portal.ERP5Site_invalidateIngestionObjects(data_stream.getReference())
@@ -49,12 +46,15 @@ for data_ingestion in portal_catalog(portal_type="Data Ingestion", **catalog_kw):
              "validation_state": "Validated"}
   for data_stream in portal_catalog(**kw_dict):
     if not data_stream.getReference().endswith("_invalid"):
-      log("%s %s (id:%s) invalidated" % (data_stream.getPortalType(), data_stream.getReference(), data_stream.getId()))
       data_stream.setReference(data_stream.getReference() + "_invalid")
       try:
         data_stream.invalidate()
       except:
-        pass # fails if it's already invalidated
-  data_ingestion.cancel()
-  log("%s %s (id:%s) invalidated and canceled" % (data_ingestion.getPortalType(), data_ingestion.getReference(), data_ingestion.getId()))
-  data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
+        context.logEntry("[WARNING] Could not invalidate data stream '%s', it was already invalidated" % data_stream.getId())
+      context.logEntry("%s %s (id:%s) invalidated" % (data_stream.getPortalType(), data_stream.getReference(), data_stream.getId()))
+  try:
+    data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
+    data_ingestion.cancel()
+  except:
+    context.logEntry("[WARNING] Could not invalidate/cancel data ingestion '%s', it was already invalidated/cancelled" % data_ingestion.getId())
+  context.logEntry("%s %s (id:%s) invalidated and cancelled" % (data_ingestion.getPortalType(), data_ingestion.getReference(), data_ingestion.getId()))
@@ -18,79 +18,83 @@ for line_data_ingestion in portal_catalog(**query_dict):
         validation_state = "validated",
         resource_relative_url = line_data_ingestion.getResource()):
       # Create Analysis
+      try:
         log("creating Data Analysis for Data Ingestion " + str(data_ingestion.getReference()))
         data_analysis = portal.data_analysis_module.newContent(
           portal_type = "Data Analysis",
+          id = data_ingestion.getId(),
           title = "%s - %s" %(transformation.getTitle(),data_ingestion.getTitle()),
           reference = data_ingestion.getReference(),
           start_date = now,
           specialise_value = transformation,
           causality_value = data_ingestion,
           source = data_ingestion.getSource(),
           source_section = data_ingestion.getSourceSection(),
           source_project = data_ingestion.getSourceProject(),
           destination = data_ingestion.getDestination(),
           destination_section = data_ingestion.getDestinationSection(),
           destination_project = data_ingestion.getDestinationProject())
         context.logEntry("Data Analysis created for Data Ingestion %s (ID: %s)" % (str(data_ingestion.getReference()), data_analysis.getId()))

         # create input and output lines
         log("creating input and output lines")
         for transformation_line in transformation.objectValues(
             portal_type=["Data Transformation Resource Line",
                          "Data Transformation Operation Line"]):
           resource = transformation_line.getResourceValue()
           quantity = transformation_line.getQuantity()
           if isinstance(quantity, tuple):
             quantity = quantity[0]
           aggregate_set = set()
           # manually add device and device configuration to every line
           if line_data_ingestion.getAggregateDevice() is not None:
             aggregate_set.add(line_data_ingestion.getAggregateDevice())
           if line_data_ingestion.getAggregateDeviceConfiguration() is not None:
             aggregate_set.add(line_data_ingestion.getAggregateDeviceConfiguration())
           if transformation_line.getPortalType() == "Data Transformation Resource Line":
             # at the moment, we only check for positive or negative quantity
             if quantity < 0:
               # it is an input line. If it is an input resource line, then we search for an
               # ingestion line with the same resource. If it is an operation line
               # then we search for an ingestion line with resource portal type Data Product
               related_lines_list = portal_catalog(
                 portal_type="Data Ingestion Line",
                 simulation_state="stopped",
                 resource_relative_url = resource.getRelativeUrl())
               for related_line in related_lines_list:
                 if(related_line.getParentValue().getReference() == data_ingestion.getReference() and related_line.getParentValue().getSimulationState() == "stopped"):
                   aggregate_set.update(related_line.getAggregateSet())
                   related_line.getParentValue().deliver()
                   log("DATA INGESTION DELIVERED")
                   context.logEntry("Data Ingestion '%s' delivered." % data_ingestion.getId())
             else:
               # it is an output line
               # create new item based on item_type: data array, stream, descriptor, etc.
               item_type = resource.getAggregatedPortalType()
               module = portal.getDefaultModule(item_type)
               item = module.newContent(portal_type = item_type,
                                        title = data_ingestion.getTitle(),
                                        id = data_ingestion.getId(),
                                        reference = data_ingestion.getReference(),
                                        version = '001')
               if "Data Descriptor" not in item_type:
                 item.validate()
               log("Creating " + str(item_type))
               context.logEntry(str(item_type) + " created.")
               aggregate_set = set()
               aggregate_set.add(item)
           data_analysis.newContent(
             portal_type = "Data Analysis Line",
             title = transformation_line.getTitle(),
             reference = transformation_line.getReference(),
             int_index = transformation_line.getIntIndex(),
             resource_value = resource,
             quantity = quantity,
             quantity_unit = transformation_line.getQuantityUnit(),
             aggregate_value_set = aggregate_set)
         data_analysis.plan()
         log("DATA ANALYSIS PLANNED")
+      except Exception as e:
+        context.logEntry("[ERROR] Error creating Data Analysis for Data Ingestion '%s': %s" % (data_ingestion.getId(), str(e)))
@@ -4,8 +4,11 @@ from Products.ERP5Type.Log import log
 for data_analysis in portal_catalog(portal_type = "Data Analysis",
                                     simulation_state = "planned"):
+  try:
     if data_analysis.getSimulationState() == "planned":
       data_analysis.start()
       log("DATA ANALYSIS STARTED")
       data_analysis.activate(serialization_tag=str(data_analysis.getUid()))\
         .DataAnalysis_executeDataOperation()
+  except Exception as e:
+    context.logEntry("[ERROR] Error executing Data Analysis for '%s': %s" % (data_analysis.getId(), str(e)))
@@ -4,7 +4,6 @@ from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery, ComplexQuery
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
-log("Invalidating related objects for reference " + reference)
 context.logEntry("Invalidating objects for reference: " + reference)
 portal_type_query = ComplexQuery(Query(portal_type='Data Stream'),
...
@@ -24,14 +24,20 @@ if len(parent_uid_list) != 0:
 parent_uid_list = [x.getUid() for x in portal_catalog(**kw_dict)]
 for data_ingestion_line in portal_catalog(**kw_dict):
+  try:
     data_ingestion = data_ingestion_line.getParentValue()
     resource = data_ingestion_line.getResourceValue()
     log("data_ingestion: " + str(data_ingestion.getReference()))
     log("resource: " + str(resource.getReference()))
     if "big_data/ingestion/batch_ingestion" in resource.getUseList():
+      if data_ingestion.getSimulationState() == 'started':
        data_ingestion.setStopDate(DateTime())
        data_ingestion.stop()
        log("DATA INGESTION STOPPED")
        context.logEntry("Data Ingestion '%s' stopped." % data_ingestion.getId())
+      else:
+        context.logEntry("[WARNING] Could not stop Data Ingestion '%s' because it is already stopped." % data_ingestion.getId())
     else:
       raise ValueError("resource.getUseList must be 'big_data/ingestion/batch_ingestion'")
+  except Exception as e:
+    context.logEntry("[ERROR] Error stopping Data Ingestion '%s': %s." % (data_ingestion.getId(), str(e)))
@@ -2,14 +2,15 @@ from DateTime import DateTime
 from Products.ERP5Type.Log import log

 now = DateTime()
-today_string = now.strftime('%Y%m%d-%H%M%S')
+now_string = now.strftime('%Y%m%d-%H%M%S-%f')[:-3]

 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog

 try:
   # remove supplier and eof from reference
-  reference = '/'.join(reference.split('/')[1:-1]) #'_'.join(reference.split('.')[:-1])
+  reference = '/'.join(reference.split('/')[1:-1])
   log("Reference: " + reference)
   context.logEntry("Data Ingestion reference: %s" % reference)
@@ -19,7 +20,7 @@ try:
   specialise_reference = movement_dict.get('specialise_reference', None)
   extension = movement_dict.get('extension', None)
   dataset_reference = movement_dict.get('aggregate_data_set_reference', None)
-  data_ingestion_id = '%s_%s_%s' %(specialise_reference, data_ingestion_reference.replace("/","_"), today_string)
+  data_ingestion_id = '%s_%s_%s' %(specialise_reference, data_ingestion_reference.replace("/","_"), now_string)
   context.logEntry("Data Ingestion ID: %s" % data_ingestion_id)
 # search for applicable data ingestion
@@ -89,7 +90,8 @@ try:
   data_stream = portal.data_stream_module.newContent(
     portal_type = "Data Stream",
-    title = "%s.%s" % (data_ingestion.getTitle(), extension),
+    id = data_ingestion_id,
+    title = "%s%s" % (data_ingestion.getTitle(), "."+extension if extension != "none" else ""),
     reference = data_ingestion_reference)
   data_stream.validate()
@@ -147,6 +149,5 @@ try:
   return data_operation, {'data_stream': data_stream}
 except Exception as e:
-  context.logEntry("[ERROR] At script IngestionPolicy_getIngestionOperationAndParameterDict")
-  context.logEntry("[ERROR] " + str(e))
+  context.logEntry("[ERROR] Error during ingestion policy operation: " + str(e))
   raise e
@@ -48,7 +48,10 @@ class TestDataIngestion(SecurityTestCase):
     return self.REF_PREFIX + reference + extension

   def sanitizeReference(self, reference):
-    return reference.replace("/", "_"), '/'.join(reference.split('/')[1:])
+    ingestion_reference = '/'.join(reference.split('/')[1:])
+    data_stream = self.getDataStream(ingestion_reference)
+    ingestion_id = data_stream.getId()
+    return ingestion_id, ingestion_reference

   def chunks(self, l, n):
     for i in xrange(0, len(l), n):
@@ -102,26 +105,15 @@ class TestDataIngestion(SecurityTestCase):
     return None
   def manuallyStopIngestionWorkaround(self, reference, now_time):
-    # TODO: replace the while with an aproach similar to:
-    # https://lab.nexedi.com/nexedi/erp5/blob/master/bt5/erp5_scalability_test/SkinTemplateItem/portal_skins/erp5_scalability_test/ERP5Site_getScalabilityTestMetric.py#L6
-    done = False
-    retry = 0
-    while not done and retry < 60:
-      try:
-        now = now_time.strftime('%Y%m%d-%H%M%S')
-        data_ingestion_id = "%s_%s" %(reference, now)
-        url = 'data_ingestion_module/' + data_ingestion_id
-        data_ingestion = self.context.restrictedTraverse(url)
-        if data_ingestion.getSimulationState() == "started":
-          data_ingestion.stop()
-          self.tic()
-          done = True
-      except KeyError: # due id uses time, now_time must be sincronized
-        retry += 1
-        now_time += timedelta(0,1)
-        log("KeyError while manuallyStopIngestionWorkaround with now %s, retrying..." % now)
-    if not done:
-      raise StandardError("Could not find ingestion")
+    try:
+      data_ingestion = self.portal.portal_catalog.getResultValue(
+        portal_type = 'Data Ingestion',
+        reference = reference)
+      if data_ingestion.getSimulationState() == "started":
+        data_ingestion.stop()
+        self.tic()
+    except:
+      raise StandardError("Could not find ingestion with reference %s" % reference)
   def simulateIngestionAlarm(self, reference, now):
     self.portal.ERP5Site_stopIngestionList()
@@ -144,7 +136,7 @@ class TestDataIngestion(SecurityTestCase):
     self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.EOF, data_chunk, ingestion_policy)
     ingestion_id, ingestion_reference = self.sanitizeReference(ingestion_reference)
-    self.simulateIngestionAlarm(ingestion_id, now)
+    self.simulateIngestionAlarm(ingestion_reference, now)
     return ingestion_reference
   def checkDataObjects(self, ingestion_reference, data_chunk, array, json_data):
...
@@ -46,14 +46,17 @@
           <key> <string>text_content_warning_message</string> </key>
           <value>
             <tuple>
-              <string>W:168, 34: Unused variable \'i\' (unused-variable)</string>
-              <string>W:168, 76: Unused variable \'j\' (unused-variable)</string>
-              <string>W:191, 4: Redefining name \'np\' from outer scope (line 9) (redefined-outer-name)</string>
-              <string>W:191, 4: Reimport \'numpy\' (imported line 9) (reimported)</string>
-              <string>W:207, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
-              <string>W:211, 10: No exception type(s) specified (bare-except)</string>
-              <string>W:219, 26: Unused variable \'e\' (unused-variable)</string>
-              <string>W:276, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
+              <string>W:137, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
+              <string>W:160, 34: Unused variable \'i\' (unused-variable)</string>
+              <string>W:160, 76: Unused variable \'j\' (unused-variable)</string>
+              <string>W:183, 4: Redefining name \'np\' from outer scope (line 9) (redefined-outer-name)</string>
+              <string>W:183, 4: Reimport \'numpy\' (imported line 9) (reimported)</string>
+              <string>W:199, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
+              <string>W:203, 10: No exception type(s) specified (bare-except)</string>
+              <string>W:211, 26: Unused variable \'e\' (unused-variable)</string>
+              <string>W:268, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
+              <string>W: 8, 0: Unused timedelta imported from datetime (unused-import)</string>
+              <string>W: 13, 0: Unused log imported from Products.ERP5Type.Log (unused-import)</string>
             </tuple>
           </value>
         </item>
...