Commit 985884a3 authored by Roque Porchetto

erp5_wendelin_telecom_ingestion: remove debug logs

parent bae5c56d
@@ -13,7 +13,6 @@ CHUNK_SIZE_TXT = 50000
 CHUNK_SIZE_CSV = 25
 
 def saveRawFile(data_stream, file_name):
-  log("Writting raw data file...")
   data_stream_chunk = None
   n_chunk = 0
   chunk_size = CHUNK_SIZE
@@ -28,7 +27,6 @@ def saveRawFile(data_stream, file_name):
     with open(file_name, 'a') as fif_file:
       fif_file.write(data_stream_chunk)
     n_chunk += 1
-  log("Done.")
 
 def getJSONfromTextFile(file_name):
   try:
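
saveRawFile streams the Data Stream to disk in fixed-size chunks instead of loading it whole. For illustration only, the same pattern in standalone Python; the read_chunk callable is a hypothetical stand-in for the Data Stream chunk reader, not part of any real ERP5 API:

    # Chunked-write sketch; read_chunk(offset, size) is a stand-in for the
    # Data Stream chunk reader and is purely illustrative.
    CHUNK_SIZE = 1024 * 1024

    def save_raw_file(read_chunk, file_name):
      offset = 0
      with open(file_name, 'ab') as out_file:
        while True:
          chunk = read_chunk(offset, CHUNK_SIZE)
          if not chunk:
            break
          out_file.write(chunk)
          offset += len(chunk)
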
@@ -88,11 +86,9 @@ def processFifData(file_name, data_array, data_descriptor):
   try:
     json_report = getMNEReportJSON(file_name, raw)
     data_descriptor.setTextContent(json_report)
-    log("Data Descriptor content saved")
   except Exception as e:
     log("Error handling Data Descriptor content: " + str(e))
-  log("Saving raw data in Data Array...")
   picks = mne.pick_types(raw.info)
   if len(picks) == 0: raise StandardError("The raw data does not contain any element")
   data, times = raw[picks[:1]] # get data from first pick to get shape
@@ -113,7 +109,6 @@ def processTextData(file_name, data_array, data_descriptor):
   try:
     json_report = getJSONfromTextFile(file_name)
     data_descriptor.setTextContent(json_report)
-    log("Data Descriptor text content saved")
   except Exception as e:
     log("Error handling Data Descriptor content: " + str(e))
@@ -166,11 +161,9 @@ def processCsvData(file_name, data_array, data_descriptor, delimiter=","):
       appendArray(chunk, data_array, columns, initialize)
       # so returning after first chunk
       data_descriptor.setTextContent(json_data)
-      log("Data Descriptor content saved")
       return
     initialize = False
     data_descriptor.setTextContent(json_data)
-    log("Data Descriptor content saved")
   except Exception as e:
     log("Error handling csv Data Descriptor content: " + str(e))
@@ -203,7 +196,6 @@ def processNiiData(file_name, data_array, data_descriptor, gz=False):
       pass # ignore non serializable info
     json_data = json.dumps(data)
     data_descriptor.setTextContent(json_data)
-    log("Data Descriptor nii content saved")
   except Exception as e:
     log("Error handling Data Descriptor nii content: " + str(e))
     raise e
@@ -217,9 +209,6 @@ def processGZData(file_name, data_array, data_descriptor):
   raise KeyError("gz")
 
 def processRawData(data_stream, data_array, data_descriptor, reference_extension):
-  import time
-  start = time.time()
   content = {"File content":"empty"}
   if data_stream.getSize() == 0:
     data_descriptor.setTextContent(json.dumps(content))
@@ -243,7 +232,6 @@ def processRawData(data_stream, data_array, data_descriptor, reference_extension):
     "default" : processTextData,
   }
   try:
-    log("Processing data...")
     if reference_extension in options:
       options[reference_extension](file_name, data_array, data_descriptor)
     else:
@@ -256,7 +244,5 @@ def processRawData(data_stream, data_array, data_descriptor, reference_extension):
   if os.path.exists(file_name):
     os.remove(file_name)
-  elapsed = time.time() - start
-  log("Done. Time elapsed: " + str(elapsed))
-  return "Raw data processed in %s seconds." % str(elapsed)
+  return "Raw data processed."
\ No newline at end of file
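
processRawData picks the handler for a file out of an options dict keyed by the reference extension, falling back to processTextData as the "default" entry. A minimal sketch of that dispatch-table pattern (handler bodies here are placeholders, not the real processors):

    # Dispatch table: extension -> handler, with a default fallback.
    def process_csv(file_name): return "csv: " + file_name
    def process_text(file_name): return "text: " + file_name

    options = {
      "csv": process_csv,
      "default": process_text,
    }

    def process_raw_data(file_name, reference_extension):
      handler = options.get(reference_extension, options["default"])
      return handler(file_name)

    print(process_raw_data("a.csv", "csv"))  # csv: a.csv
    print(process_raw_data("a.xyz", "xyz"))  # text: a.xyz
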
@@ -52,13 +52,13 @@
   <key> <string>text_content_warning_message</string> </key>
   <value>
     <tuple>
-      <string>W: 75, 2: No exception type(s) specified (bare-except)</string>
-      <string>W: 80, 4: No exception type(s) specified (bare-except)</string>
-      <string>W: 98, 8: Unused variable \'times\' (unused-variable)</string>
-      <string>W:135, 6: No exception type(s) specified (bare-except)</string>
-      <string>W:171, 8: Unreachable code (unreachable)</string>
-      <string>W:190, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
-      <string>W:194, 10: No exception type(s) specified (bare-except)</string>
+      <string>W: 73, 2: No exception type(s) specified (bare-except)</string>
+      <string>W: 78, 4: No exception type(s) specified (bare-except)</string>
+      <string>W: 94, 8: Unused variable \'times\' (unused-variable)</string>
+      <string>W:130, 6: No exception type(s) specified (bare-except)</string>
+      <string>W:165, 8: Unreachable code (unreachable)</string>
+      <string>W:183, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
+      <string>W:187, 10: No exception type(s) specified (bare-except)</string>
     </tuple>
   </value>
 </item>
...
-from Products.ERP5Type.Log import log
 import base64
-log("Data_chunk size: %s" % str(len(data_chunk)))
 decoded = base64.b64decode(data_chunk)
-log("Decoded data_chunk size: %s" % str(len(decoded)))
-log("Appending to data stream: %s - reference: %s" % (data_stream, data_stream.getReference()))
 data_stream.appendData(decoded)
-log("Ingested data successfully appended.")
-context.logEntry("Datachunk (size %s) appended to Data Stream." % str(len(decoded)))
-from Products.ERP5Type.Log import log
-log("Processing raw data from Data Stream " + str(input_stream_data.getReference()))
-context.logEntry("Processing raw data from Data Stream %s" % str(input_stream_data.getReference()))
 reference_extension = input_stream_data.getReference().split("/")[-1]
 result = str(context.processRawData(input_stream_data, output_array, output_descriptor, reference_extension))
-log(result)
-context.logEntry("Result: %s" % result)
-from Products.ERP5Type.Log import log
-context.logEntry("[NEW INGESTION]")
-context.logEntry("Reference received: %s" % reference)
 record = reference.rsplit('/')
 length = len(record)
 if (length < 7):
-  context.logEntry("[ERROR] Data Ingestion reference is not well formated")
+  context.logEntry("[ERROR] In HandleFifEmbulkIngestion: Data Ingestion reference is not well formated")
   raise ValueError("Data Ingestion reference is not well formated.")
 invalid_chars = ["&", ";", "#", "%", '"', "+"]
 for char in invalid_chars:
   if char in reference:
-    context.logEntry("[ERROR] Data Ingestion reference contains chars that are not allowed")
+    context.logEntry("[ERROR] In HandleFifEmbulkIngestion: Data Ingestion reference contains chars that are not allowed")
     raise ValueError("Data Ingestion reference contains chars that are not allowed.")
@@ -35,7 +30,4 @@ dict = { 'filename': filename,
          'hash': hash
        }
-log("Returning dictionary: %s." % dict)
-context.logEntry("Parameter dictionary: %s" % dict)
 return dict
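
The reference must split into at least seven '/'-separated fields and contain none of the reserved characters. The same checks as a standalone function; the sample reference layout is an assumption inferred from the supplier/dataset/.../eof/size/hash pattern in the other scripts:

    INVALID_CHARS = ["&", ";", "#", "%", '"', "+"]

    def check_reference(reference):
      # The ingestion scripts expect at least 7 '/'-separated fields.
      if len(reference.rsplit('/')) < 7:
        raise ValueError("Data Ingestion reference is not well formated.")
      for char in INVALID_CHARS:
        if char in reference:
          raise ValueError("Data Ingestion reference contains chars that are not allowed.")

    check_reference("supplier/dataset/file/none/END/0/hash")  # 7 fields, passes
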
-from Products.ERP5Type.Log import log
 portal = context.getPortalObject()
 portal.ERP5Site_stopIngestionList()
 portal.ERP5Site_createDataAnalysisList()
...
-from Products.ERP5Type.Log import log
 from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
+
+return
+# This alarm was deprecated - kept for test
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
@@ -19,7 +21,6 @@ for data_ingestion in portal_catalog(**catalog_kw):
                 'reference': data_ingestion.getReference()}
   invalidate = True
   if len(portal_catalog(**catalog_kw)) > 0:
-    context.logEntry("Data Ingestion %s is old but it has related data ingestions that are not, so split ingestion is still in course. It won't be invalidated." % data_ingestion.getId())
     invalidate = False
   if invalidate:
@@ -33,11 +34,9 @@ for data_ingestion in portal_catalog(**catalog_kw):
       data_stream.invalidate()
     except:
       context.logEntry("[WARNING] Could not invalidate data stream '%s', it was already invalidated or draft" % data_stream.getId())
-    context.logEntry("%s %s (id:%s) invalidated" % (data_stream.getPortalType(), data_stream.getReference(), data_stream.getId()))
     try:
       if not data_ingestion.getReference().endswith("_invalid"):
         data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
       data_ingestion.deliver()
     except:
       context.logEntry("[WARNING] Could not invalidate/deliver data ingestion '%s', it was already invalidated/deliver" % data_ingestion.getId())
-    context.logEntry("%s %s (id:%s) invalidated and delivered" % (data_ingestion.getPortalType(), data_ingestion.getReference(), data_ingestion.getId()))
-from Products.ERP5Type.Log import log
 operation = None
 parameter_dict = {}
@@ -10,7 +8,6 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
   else:
     resource_portal_type = ''
   if resource_portal_type == 'Data Operation':
-    log("Getting Data Operation and Resources from Data Analysis...")
     operation_analysis_line = analysis_line
     operation = analysis_line.getResourceValue()
   else:
@@ -21,6 +18,5 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
     parameter_dict[reference] = aggregate
 script_id = operation.getScriptId()
-log("Calling script: " + str(script_id))
 getattr(operation_analysis_line, script_id)(**parameter_dict)
 context.stop()
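
The operation script is resolved by name at runtime: getScriptId returns a string and getattr looks the script up on the analysis line before calling it with the aggregated parameters. The mechanism in isolation (class and method names below are illustrative, not the real ERP5 ones):

    class AnalysisLine(object):
      def DataOperation_run(self, **parameter_dict):
        return parameter_dict

    line = AnalysisLine()
    script_id = "DataOperation_run"  # as a getScriptId() call would return
    result = getattr(line, script_id)(data_stream="ds")
    assert result == {"data_stream": "ds"}
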
@@ -20,7 +20,6 @@ for line_data_ingestion in portal_catalog(**query_dict):
     # Create Analysis
     try:
       try:
-        context.logEntry("creating Data Analysis for Data Ingestion " + str(data_ingestion.getReference()))
         data_analysis = portal.data_analysis_module.newContent(
           portal_type = "Data Analysis",
           id = data_ingestion.getId(),
@@ -35,14 +34,12 @@ for line_data_ingestion in portal_catalog(**query_dict):
           destination = data_ingestion.getDestination(),
           destination_section = data_ingestion.getDestinationSection(),
           destination_project = data_ingestion.getDestinationProject())
-        context.logEntry("Data Analyisis created for Data Ingestion %s (ID: %s)" % (str(data_ingestion.getReference()), data_ingestion.getId()))
       except Exception as e:
-        context.logEntry("[WARNING] Exception creating Data Analysis (already created?): " + str(e))
+        log(''.join(["[WARNING] Exception creating Data Analysis (already created?): ", str(e)]))
         data_analysis = None
       if data_analysis is not None:
         # create input and output lines
-        context.logEntry("creating input and output lines of data analysis %s (ID: %s)" % (str(data_ingestion.getReference()), data_ingestion.getId()))
         for transformation_line in transformation.objectValues(
               portal_type=["Data Transformation Resource Line",
                            "Data Transformation Operation Line"]):
@@ -70,7 +67,6 @@ for line_data_ingestion in portal_catalog(**query_dict):
             if(related_line.getParentValue().getReference() == data_ingestion.getReference() and related_line.getParentValue().getSimulationState() == "stopped"):
               aggregate_set.update(related_line.getAggregateSet())
               related_line.getParentValue().deliver()
-              context.logEntry("Data Ingestion '%s' delivered. (ID: %s)" % (str(data_ingestion.getReference()), data_ingestion.getId()))
           else:
             # it is an output line
             # create new item based on item_type: data array, stream, descriptor, etc.
@@ -83,7 +79,6 @@ for line_data_ingestion in portal_catalog(**query_dict):
               version = '001')
             if "Data Descriptor" not in item_type:
               item.validate()
-            context.logEntry(str(item_type) + " %s created (ID: %s)" % (str(data_ingestion.getReference()), data_ingestion.getId()))
             aggregate_set = set()
             aggregate_set.add(item)
@@ -98,6 +93,5 @@ for line_data_ingestion in portal_catalog(**query_dict):
               aggregate_value_set = aggregate_set)
       data_analysis.plan()
-      context.logEntry("Data Anaysis '%s' planned. (ID: %s)" % (str(data_ingestion.getReference()), data_ingestion.getId()))
     except Exception as e:
       context.logEntry("[ERROR] Error creating Data Analysis for Data Ingestion '%s' (ID: %s): %s" % (data_ingestion.getReference(), data_ingestion.getId(), str(e)))
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
-from Products.ERP5Type.Log import log
 for data_analysis in portal_catalog(portal_type = "Data Analysis",
                                     simulation_state = "planned"):
   try:
     if data_analysis.getSimulationState() == "planned":
       data_analysis.start()
-      context.logEntry("Data Analysis %s started." % data_analysis.getReference())
       data_analysis.activate(serialization_tag=str(data_analysis.getUid()))\
         .DataAnalysis_executeDataOperation()
   except Exception as e:
...
@@ -5,6 +5,5 @@ data_set = portal.data_set_module.get(reference)
 if data_set is not None:
   version = int(data_set.getVersion()) + 1
   data_set.setVersion("%03d" % (version,))
-  context.logEntry("Dataset %s increased version to %03d" % (data_set.getId(), version))
 else:
   context.logEntry("Fail to increase dataset version. No dataset found for reference '%s'" % (reference))
-from Products.ERP5Type.Log import log
 from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery, ComplexQuery
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
-context.logEntry("Invalidating objects for reference: " + reference)
 portal_type_query = ComplexQuery(Query(portal_type='Data Stream'),
                                  Query(portal_type='Data Array'),
                                  Query(portal_type='Data Descriptor'),
@@ -14,12 +11,13 @@ portal_type_query = ComplexQuery(Query(portal_type='Data Stream'),
                                  logical_operator="OR")
 kw_dict = {"query": portal_type_query,
            "reference": reference}
-for document in portal_catalog(**kw_dict):
-  if not document.getReference().endswith("_invalid"):
-    context.logEntry("%s %s (id:%s) invalidated" % (document.getPortalType(), document.getReference(), document.getId()))
-    document.setReference(document.getReference() + "_invalid")
-    try:
-      document.invalidate()
-    except:
-      pass # fails if it's already invalidated, draft or if it doens't allow invalidation (e.g. DI)
+try:
+  for document in portal_catalog(**kw_dict):
+    if not document.getReference().endswith("_invalid"):
+      document.setReference(document.getReference() + "_invalid")
+      try:
+        document.invalidate()
+      except:
+        pass # fails if it's already invalidated, draft or if it doens't allow invalidation (e.g. DI)
+except Exception as e:
+  context.logEntry("[ERROR] Error invalidating objects for reference '%s': %s" % (reference, str(e)))
-from Products.ERP5Type.Log import log
 from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery, ComplexQuery
 portal = context.getPortalObject()
@@ -7,7 +6,6 @@ portal_catalog = portal.portal_catalog
 try:
   if success: # full split ingestions successfully appendend
     # invalidate old ingestion objects
-    context.logEntry("Invalidating old data ingestion objects after split ingestion for reference: " + reference)
     data_stream = portal_catalog.getResultValue(
       portal_type = 'Data Stream',
       reference = reference,
@@ -18,26 +16,21 @@ try:
       id = data_stream.getId())
     data_stream.setReference(data_stream.getReference() + "_invalid")
     data_stream.invalidate()
-    log("Data Stream invalidated")
     if not data_ingestion.getReference().endswith("_invalid"):
       data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
-    log("Data ingestion invalidated")
     data_an = portal_catalog.getResultValue(
       portal_type = 'Data Analysis',
       id = data_stream.getId())
     if data_an != None:
       data_an.setReference(data_an.getReference() + "_invalid")
-      log("Data Analysis invalidated")
     data_array = portal_catalog.getResultValue(
       portal_type = 'Data Array',
       id = data_stream.getId())
     if data_array != None:
       data_array.setReference(data_array.getReference() + "_invalid")
       data_array.invalidate()
-      log("Data Array invalidated")
   else: # split ingestion interrumped and restarted
     # invalidate draft datastreams and old started data ingestions
-    context.logEntry("Invalidating old split data ingestions and data streams for reference: " + reference)
     for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
                                          simulation_state = "started",
                                          reference = reference):
@@ -50,5 +43,5 @@ try:
     if not data_stream.getReference().endswith("_invalid"):
       data_stream.setReference(data_stream.getReference() + "_invalid")
 except Exception as e:
-  log("ERROR in ERP5Site_invalidateSplitIngestions: " + str(e))
+  context.logEntry("ERROR in ERP5Site_invalidateSplitIngestions: " + str(e))
   pass
-from Products.ERP5Type.Log import log
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
-context.logEntry("Renaming requested: '%s' -> '%s'" % (reference, new_reference))
 # check new reference
 data_ingestions = portal_catalog(portal_type = "Data Ingestion", reference = new_reference)
 if len(data_ingestions) > 0: raise "Error renaming: new reference '%s' already exists." % new_reference
...
@@ -33,7 +33,6 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
   related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
                                             reference = data_ingestion.getReference())
   if len(related_split_ingestions) == 1:
-    context.logEntry("Started single ingestion (not split) found: %s - reference: %s" % (data_ingestion.getId(), data_ingestion.getReference()))
     data_stream = portal_catalog.getResultValue(
       portal_type = 'Data Stream',
       reference = data_ingestion.getReference())
@@ -44,26 +43,24 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
       data_stream.validate()
     if data_ingestion.getSimulationState() == "started":
       data_ingestion.stop()
-      context.logEntry("Data Ingestion %s stopped. Reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
 
 # append split ingestions
 for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
                                      simulation_state = "started",
                                      id = "%001"):
   if not data_ingestion.getReference().endswith("_invalid"):
-    context.logEntry("Started split ingestion found: %s - reference: %s" % (data_ingestion.getId(), data_ingestion.getReference()))
     try:
       last_data_stream_id = ""
       query = Query(portal_type="Data Stream", reference=data_ingestion.getReference(), validation_state="draft")
       result_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
       full_data_stream = None
       for data_stream in result_list:
-        log("data_stream.getId(): " + data_stream.getId())
+        log(''.join(["Data stream for split ingestion: ", data_stream.getId()]))
         if data_stream.getId() == data_ingestion.getId():
-          log("is base data stream (001)")
+          log("It is base data stream (001)")
           full_data_stream = data_stream
         else:
-          log("is NOT base data stream (!=001)")
+          log("It is not base data stream, it is a part (!=001)")
           if full_data_stream != None:
             log("appending content to base data stream...")
             full_data_stream.appendData(data_stream.getData())
@@ -85,7 +82,6 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
       else:
         ingestion.setReference(ingestion.getReference() + "_invalid")
         ingestion.deliver()
-      context.logEntry("Chunks of split ingestion where appended into Data Stream %s. Reference: %s. Corresponding Data Ingestion stopped." % (full_data_stream.getId(), full_data_stream.getReference()))
     except Exception as e:
       context.logEntry("ERROR appending split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
       context.logEntry(e)
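
Split ingestions arrive as one draft Data Stream per chunk, ordered by creation date; the stream whose id matches the ingestion id is the base (001) part, and every later part is appended to it. The reassembly logic in plain Python, with dicts standing in for the catalog results:

    # Parts already sorted by creation date, as the catalog query returns them.
    parts = [
      {"id": "ing_001", "data": b"first"},
      {"id": "ing_002", "data": b"second"},
      {"id": "ing_003", "data": b"third"},
    ]

    def reassemble(parts, ingestion_id):
      full = None
      for part in parts:
        if part["id"] == ingestion_id:
          full = part                    # base data stream (001)
        elif full is not None:
          full["data"] += part["data"]   # append non-base chunk
      return full

    assert reassemble(parts, "ing_001")["data"] == b"firstsecondthird"
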
 from DateTime import DateTime
-from Products.ERP5Type.Log import log
 now = DateTime()
 now_string = now.strftime('%Y%m%d-%H%M%S-%f')[:-3]
@@ -10,7 +9,6 @@ portal_catalog = portal.portal_catalog
 try:
   # remove supplier, eof, size and hash from reference
   reference = '/'.join(reference.split('/')[1:-3])
-  context.logEntry("Data Ingestion reference: %s" % reference)
   data_ingestion_reference = reference
   eof = movement_dict.get('eof', 'END') if movement_dict.get('eof', 'END') != "" else 'END'
@@ -19,7 +17,6 @@ try:
   extension = movement_dict.get('extension', None)
   dataset_reference = movement_dict.get('dataset_reference', None)
   data_ingestion_id = '%s_%s_%s_%s' %(supplier, dataset_reference, now_string, eof)
-  context.logEntry("Data Ingestion ID: %s" % data_ingestion_id)
   size = movement_dict.get('size', None) if movement_dict.get('size', None) != "" else None
   hash_value = movement_dict.get('hash', None) if movement_dict.get('hash', None) != "" else None
@@ -30,22 +27,6 @@ try:
   if data_ingestion is not None:
     if data_ingestion.getSimulationState() != 'started':
-      modified = False
-      if size != None and size != "":
-        data_stream = portal_catalog.getResultValue(
-          portal_type = 'Data Stream',
-          validation_state = 'validated',
-          reference = data_ingestion_reference)
-        if data_stream != None:
-          if size != data_stream.getSize():
-            modified = True
-          elif hash_value != None and hash_value != "" and hash_value != data_stream.getVersion():
-            modified = True
-      if not modified:
-        context.logEntry("An older ingestion for reference %s was already done." % data_ingestion_reference)
-        context.logEntry("Old file version will be invalidated.")
-      else:
-        context.logEntry("Ingestion of modified file. Old version will be invalidated.")
       if eof == "END": # if not split (one single ingestion), invalidate old ingestion
         portal.ERP5Site_invalidateIngestionObjects(data_ingestion_reference)
@@ -62,7 +43,6 @@ try:
     reference = data_ingestion_reference,
     start_date = now,
     specialise_value_list = specialise_value_list)
-  context.logEntry("Data Ingestion created. ID: %s - Reference: %s" % (data_ingestion_id, data_ingestion_reference))
   property_list = ["title",
                    "source",
@@ -111,14 +91,12 @@ try:
     title = "%s%s" % (data_ingestion.getTitle(), "."+extension if extension != "none" else ""),
     reference = data_ingestion_reference)
-  context.logEntry("Data Stream created. ID: %s - Reference: %s" % (data_stream.getId(), data_ingestion_reference))
   input_line.setDefaultAggregateValue(data_stream)
 
   if dataset_reference is not None:
     data_set = portal.data_set_module.get(dataset_reference)
     try:
       if data_set is None:
-        context.logEntry("Data Set created.")
         data_set = portal.data_set_module.newContent(
           portal_type = "Data Set",
           title = "Data set " + dataset_reference,
@@ -128,11 +106,8 @@ try:
           version = "001"
         )
         data_set.validate()
-      else:
-        context.logEntry("Data Set found for dataset reference: " + dataset_reference)
     except:
       data_set = portal.data_set_module.get(dataset_reference)
-      context.logEntry("Data Set found for dataset reference: " + dataset_reference)
     if data_set.getReference().endswith("_invalid"):
       data_set.setReference(data_set.getReference().replace("_invalid", ""))
     if data_set.getValidationState() == "invalidated":
@@ -140,7 +115,6 @@ try:
     input_line.setDefaultAggregateValue(data_set)
 
   data_ingestion.start()
-  context.logEntry("Data Ingestion started.")
   data_operation = operation_line.getResourceValue()
   data_stream = input_line.getAggregateDataStreamValue()
@@ -150,5 +124,5 @@ try:
   return data_operation, {'data_stream': data_stream}
 except Exception as e:
-  context.logEntry("[ERROR] Error during ingestion policy operation: " + str(e))
+  context.logEntry(''.join(["[ERROR] Error during ingestion policy operation: ", str(e)]))
   raise e
@@ -11,8 +11,7 @@ try:
   if data_set is None or data_set.getReference().endswith("_invalid"):
     return { "status_code": 0, "result": [] }
 except Exception as e: # fails because unauthorized access
-  context.logEntry("[ERROR] At script getDataStreamList")
-  context.logEntry("[ERROR] " + str(e))
+  log("Unauthorized access to getDataStreamList.")
   return { "status_code": 1, "error_message": "401 - Unauthorized access. Please check your user credentials and try again." }
 data_set = portal.data_set_module.get(data_set_reference)
...
-from Products.ERP5Type.Log import log
 from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery, ComplexQuery
 portal = context.getPortalObject()
...
@@ -11,15 +11,13 @@ portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
 
 try:
-  context.logEntry("Checking if reference %s already exists" % reference)
   # remove supplier and eof from reference
   data_ingestion_reference = '/'.join(reference.split('/')[1:-3])
   EOF = reference.split('/')[-3]
   size = reference.split('/')[-2]
 
   if data_ingestion_reference is "":
-    context.logEntry("[ERROR] Data Ingestion reference is not well formated")
+    context.logEntry("[ERROR] Data Ingestion reference parameter for ingestionReferenceExists script is not well formated")
     raise ValueError("Data Ingestion reference is not well formated")
 
   # check if there are started ingestions for this reference
@@ -41,8 +39,7 @@ try:
         return TRUE
       else:
         # previous ingestion was interrumped
-        context.log("[WARNING] User has restarted an interrumpted ingestion for reference %s." % data_ingestion.getReference())
-        context.log("[WARNING] Previous split ingestions for reference %s will be discarted and full ingestion restarted." % data_ingestion.getReference())
+        log(''.join(["[WARNING] User has restarted an interrumpted ingestion for reference ", data_ingestion.getReference(), ". Previous split ingestions will be discarted and full ingestion restarted."]))
         portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=False)
   except:
     pass
@@ -60,9 +57,7 @@ try:
   if size != "" and size != None:
     # this is a modified file
     return FALSE
-  context.logEntry("[ERROR] Data Ingestion reference %s already exists" % data_ingestion_reference)
   return TRUE
 except Exception as e:
-  context.logEntry("[ERROR] At script ingestionReferenceExists")
-  context.logEntry("[ERROR] " + str(e))
+  context.logEntry(''.join(["[ERROR] At script ingestionReferenceExists: ", str(e)]))
   raise e
-from Products.ERP5Type.Log import log
 from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
 portal = context.getPortalObject()
@@ -15,8 +14,6 @@ if object.getPortalType() == "Data Set":
     if data_stream.getReference().startswith(data_set.getReference()+'/') and not data_stream.getReference().endswith("_invalid"):
       portal.ERP5Site_invalidateIngestionObjects(data_stream.getReference())
   data_set.setReference(data_set.getReference() + "_invalid")
-  context.logEntry("Data set '%s' invalidated." % data_set.getReference())
 elif object.getPortalType() == "Data Stream":
   data_stream = object
-  context.logEntry("Invalidating data stream: " + data_stream.getReference())
   portal.ERP5Site_invalidateIngestionObjects(data_stream.getReference())