Commit 3f6ca327 authored by Roque Porchetto

erp5_wendelin_telecom_ingestion: fix split file bug

- new script to handle planned ingestions
- unit test refactoring
- improvement of some queries
- better handling of empty ingested files
- more cleanup of logs
parent 052f61d9
@@ -160,7 +160,7 @@ def processCsvData(file_name, data_array, data_descriptor, delimiter=","):
       data["csv"] = chunk
       json_data = json.dumps(data)
     except Exception as e:
-      log("Error while getting JSON from text file: " + str(e))
+      log("Error while getting JSON from csv file: " + str(e))
       return ""
     # super inefficient: takes minutes for a csv file of 5MB (57860 rows)
     appendArray(chunk, data_array, columns, initialize)
@@ -220,6 +220,11 @@ def processRawData(data_stream, data_array, data_descriptor, reference_extension
   import time
   start = time.time()
+  content = {"File content":"empty"}
+  if data_stream.getSize() == 0:
+    data_descriptor.setTextContent(json.dumps(content))
+    return "Empty Data Stream"
   file_name = "temporal_file_%s" % DateTime().strftime('%Y%m%d-%H%M%S')
   try:
     saveRawFile(data_stream, file_name)
...
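Note: the new guard above short-circuits before any temporal file is written. A minimal stand-alone sketch of that control flow, where MockStream and MockDescriptor are hypothetical stand-ins for the ERP5 Data Stream and Data Descriptor documents (not part of this commit):

import json

class MockStream:
  def __init__(self, size):
    self._size = size
  def getSize(self):
    return self._size

class MockDescriptor:
  def setTextContent(self, text):
    self.text = text

def process_raw_data(stream, descriptor):
  # same short-circuit as the guard above: flag empty uploads instead of parsing them
  if stream.getSize() == 0:
    descriptor.setTextContent(json.dumps({"File content": "empty"}))
    return "Empty Data Stream"
  return "processed"

descriptor = MockDescriptor()
assert process_raw_data(MockStream(0), descriptor) == "Empty Data Stream"
assert json.loads(descriptor.text) == {"File content": "empty"}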
 from Products.ERP5Type.Log import log
 portal = context.getPortalObject()
 portal.ERP5Site_stopIngestionList()
+portal.ERP5Site_startIngestionList()
 portal.ERP5Site_createDataAnalysisList()
 portal.ERP5Site_executeDataAnalysisList()
@@ -42,15 +42,14 @@ catalog_kw = {'creation_date': {'query': (now_minus_max, now_minus_10), 'range':
 for data_ingestion in portal_catalog(portal_type="Data Ingestion", **catalog_kw):
   # invalidate related Data Stream
   kw_dict = {"portal_type": "Data Stream",
-             "reference": data_ingestion.getReference(),
-             "validation_state": "Validated"}
+             "reference": data_ingestion.getReference()}
   for data_stream in portal_catalog(**kw_dict):
     if not data_stream.getReference().endswith("_invalid"):
       data_stream.setReference(data_stream.getReference() + "_invalid")
     try:
       data_stream.invalidate()
     except:
-      context.logEntry("[WARNING] Could not invalidate data stream '%s', it was already invalidated" % data_stream.getId())
+      context.logEntry("[WARNING] Could not invalidate data stream '%s', it was already invalidated or draft" % data_stream.getId())
     context.logEntry("%s %s (id:%s) invalidated" % (data_stream.getPortalType(), data_stream.getReference(), data_stream.getId()))
   try:
     data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
...
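The invalidation above relies on an "_invalid" suffix convention on references; the endswith() check keeps the rename idempotent. A tiny illustrative sketch of that convention (pure Python, names hypothetical):

def mark_invalid(reference):
  # append the suffix only once, mirroring the endswith() check above
  if not reference.endswith("_invalid"):
    return reference + "_invalid"
  return reference

assert mark_invalid("ref-001") == "ref-001_invalid"
assert mark_invalid("ref-001_invalid") == "ref-001_invalid"  # idempotent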
@@ -7,7 +7,7 @@ for data_analysis in portal_catalog(portal_type = "Data Analysis",
   try:
     if data_analysis.getSimulationState() == "planned":
       data_analysis.start()
-      log("DATA ANALYSIS STARTED")
+      context.logEntry("Data Analysis %s started." % data_analysis.getReference())
       data_analysis.activate(serialization_tag=str(data_analysis.getUid()))\
         .DataAnalysis_executeDataOperation()
   except Exception as e:
...
from Products.ERP5Type.Log import log
from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery

# sort manually because the catalog query does not sort reliably
def sorted_by_date(results):
  # insertion sort on the 3-character chunk suffix of the document ID
  A = [x for x in results]
  for j in range(1, len(A)):
    key = A[j]
    i = j - 1
    while (i > -1) and key.getId()[-3:] < A[i].getId()[-3:]:
      A[i+1] = A[i]
      i = i - 1
    A[i+1] = key
  return A

portal = context.getPortalObject()
portal_catalog = portal.portal_catalog

for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
                                     simulation_state = "planned",
                                     id = "%EOF"):
  context.logEntry("Planned EOF ingestion found: " + data_ingestion.getId())
  related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
                                            simulation_state = "planned",
                                            reference = data_ingestion.getReference())
  if len(related_split_ingestions) == 1:
    # single-part ingestion: validate its stream and start it directly
    data_stream = portal_catalog.getResultValue(
      portal_type = 'Data Stream',
      reference = data_ingestion.getReference())
    if data_stream is not None:
      data_stream.validate()
      data_ingestion.start()
      context.logEntry("Data Ingestion %s started." % data_ingestion.getId())
  elif len(related_split_ingestions) > 1:
    try:
      query = Query(portal_type="Data Stream", reference=data_ingestion.getReference())
      result_list = portal_catalog(query=query, sort_on=(('creation_date', 'DESC', 'date'),))
      index = 1
      # for some reason, the sort query doesn't work (random order)
      for data_stream in sorted_by_date(result_list):
        if index == 1:
          full_data_stream = data_stream
        else:
          # append chunk to the first stream, then delete the emptied chunk
          full_data_stream.appendData(data_stream.getData())
          #data_stream.update_data("",0)
          portal.data_stream_module.deleteContent(data_stream.getId())
        index += 1
      full_data_stream.validate()
      for ingestion in related_split_ingestions:
        if ingestion.getId() == full_data_stream.getId():
          ingestion.start()
        else:
          ingestion.cancel()
      context.logEntry("Chunks of split ingestion were appended into Data Stream %s. Corresponding Data Ingestion started." % full_data_stream.getId())
    except Exception as e:
      context.logEntry("ERROR appending split data streams for ingestion: %s" % data_ingestion.getReference())
      context.logEntry(e)
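Since the catalog's sort_on is reported unreliable here, the script orders chunks by the 3-character suffix of their IDs with a manual insertion sort. For reference, a minimal runnable sketch showing that Python's built-in stable sort yields the same ordering in O(n log n); FakeStream is a hypothetical stand-in for the catalog result objects:

class FakeStream:
  def __init__(self, id_):
    self._id = id_
  def getId(self):
    return self._id

results = [FakeStream("ing_003"), FakeStream("ing_001"), FakeStream("ing_002")]
# key on the same 3-character suffix used by sorted_by_date()
ordered = sorted(results, key=lambda x: x.getId()[-3:])
assert [s.getId() for s in ordered] == ["ing_001", "ing_002", "ing_003"]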
<?xml version="1.0"?>
<ZopeData>
  <record id="1" aka="AAAAAAAAAAE=">
    <pickle>
      <global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
    </pickle>
    <pickle>
      <dictionary>
        <item>
          <key> <string>Script_magic</string> </key>
          <value> <int>3</int> </value>
        </item>
        <item>
          <key> <string>_bind_names</string> </key>
          <value>
            <object>
              <klass>
                <global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
              </klass>
              <tuple/>
              <state>
                <dictionary>
                  <item>
                    <key> <string>_asgns</string> </key>
                    <value>
                      <dictionary>
                        <item>
                          <key> <string>name_container</string> </key>
                          <value> <string>container</string> </value>
                        </item>
                        <item>
                          <key> <string>name_context</string> </key>
                          <value> <string>context</string> </value>
                        </item>
                        <item>
                          <key> <string>name_m_self</string> </key>
                          <value> <string>script</string> </value>
                        </item>
                        <item>
                          <key> <string>name_subpath</string> </key>
                          <value> <string>traverse_subpath</string> </value>
                        </item>
                      </dictionary>
                    </value>
                  </item>
                </dictionary>
              </state>
            </object>
          </value>
        </item>
        <item>
          <key> <string>_params</string> </key>
          <value> <string></string> </value>
        </item>
        <item>
          <key> <string>id</string> </key>
          <value> <string>ERP5Site_startIngestionList</string> </value>
        </item>
      </dictionary>
    </pickle>
  </record>
</ZopeData>
@@ -20,7 +20,7 @@ try:
   specialise_reference = movement_dict.get('specialise_reference', None)
   extension = movement_dict.get('extension', None)
   dataset_reference = movement_dict.get('aggregate_data_set_reference', None)
-  data_ingestion_id = '%s_%s_%s' %(specialise_reference, data_ingestion_reference.replace("/","_").replace(" ","_"), now_string)
+  data_ingestion_id = '%s_%s_%s_%s' %(specialise_reference, data_ingestion_reference.replace("/","_").replace(" ","_"), now_string, eof)
   context.logEntry("Data Ingestion ID: %s" % reference)
   # search for applicable data ingestion
@@ -33,7 +33,7 @@ try:
     context.logEntry("An older ingestion for reference %s was already done." % data_ingestion_reference)
     raise ValueError("An older ingestion for reference %s was already done." % data_ingestion_reference)
-  if data_ingestion is None:
+  #if data_ingestion is None:
   specialise_value_list = [x.getObject() for x in portal_catalog.searchResults(
     portal_type = 'Data Supply',
     reference = 'embulk',
@@ -93,9 +93,7 @@ try:
     id = data_ingestion_id,
     title = "%s%s" % (data_ingestion.getTitle(), "."+extension if extension != "none" else ""),
     reference = data_ingestion_reference)
-  data_stream.validate()
-  log("Creating new Data Stream. ID: %s" % data_stream.getId())
   context.logEntry("Data Stream created. ID: %s" % data_stream.getId())
   input_line.setDefaultAggregateValue(data_stream)
@@ -103,7 +101,6 @@ try:
     data_set = portal.data_set_module.get(dataset_reference)
     try:
       if data_set is None:
-        log("Creating new Data Set")
         context.logEntry("Data Set created.")
         data_set = portal.data_set_module.newContent(
           portal_type = "Data Set",
@@ -115,10 +112,10 @@ try:
           )
         data_set.validate()
       else:
-        log("Data Set found for dataset reference: " + dataset_reference)
+        context.logEntry("Data Set found for dataset reference: " + dataset_reference)
     except:
       data_set = portal.data_set_module.get(dataset_reference)
-      log("Data Set found for dataset reference: " + dataset_reference)
+      context.logEntry("Data Set found for dataset reference: " + dataset_reference)
     if data_set.getReference().endswith("_invalid"):
       data_set.setReference(data_set.getReference().replace("_invalid", ""))
     if data_set.getValidationState() == "invalidated":
@@ -126,26 +123,25 @@ try:
     input_line.setDefaultAggregateValue(data_set)
   data_ingestion.plan()
-  log("Data Ingestion planned.")
   context.logEntry("Data Ingestion planned.")
-  else:
-    log("Planned Data Ingestion found for reference: " + str(data_ingestion))
-    context.logEntry("Planned Data Ingestion found for reference: " + str(data_ingestion))
-    # find ingestion line for current resource
-    for line in data_ingestion.objectValues(portal_type="Data Ingestion Line"):
-      if line.getResourceReference() == resource_reference:
-        input_line = line
-      elif line.getResourceValue().getPortalType() == "Data Operation":
-        operation_line = line
+  #else:
+  #  log("Planned Data Ingestion found for reference: " + str(data_ingestion))
+  #  context.logEntry("Planned Data Ingestion found for reference: " + str(data_ingestion))
+  #  # find ingestion line for current resource
+  #  for line in data_ingestion.objectValues(portal_type="Data Ingestion Line"):
+  #    if line.getResourceReference() == resource_reference:
+  #      input_line = line
+  #    elif line.getResourceValue().getPortalType() == "Data Operation":
+  #      operation_line = line
   data_operation = operation_line.getResourceValue()
   data_stream = input_line.getAggregateDataStreamValue()
-  if eof == "EOF":
-    data_ingestion.start()
-    log("Data Ingestion started.")
-    context.logEntry("Data Ingestion started.")
+  #if eof == "EOF":
+  #  data_ingestion.start()
+  #  log("Data Ingestion started.")
+  #  context.logEntry("Data Ingestion started.")
   return data_operation, {'data_stream': data_stream}
 except Exception as e:
...
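The key change for the split-file bug is the ID scheme: the EOF/part marker is now appended to the Data Ingestion ID, so every chunk of a split upload gets a distinct ID instead of colliding on the same one. A stand-alone illustration of the new scheme, with hypothetical sample values:

specialise_reference = "embulk"
data_ingestion_reference = "my dataset/file"
now_string = "20180101-120000"
eof = "EOF"
# same construction as the changed line above, with the eof marker as 4th field
data_ingestion_id = '%s_%s_%s_%s' % (specialise_reference,
                                     data_ingestion_reference.replace("/", "_").replace(" ", "_"),
                                     now_string, eof)
assert data_ingestion_id == "embulk_my_dataset_file_20180101-120000_EOF"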
@@ -26,13 +26,16 @@ query_dict = {
 data_stream_list = []
 for stream in portal_catalog(**query_dict):
+  query = ComplexQuery(Query(simulation_state='started'),
+                       Query(simulation_state='stopped'),
+                       Query(simulation_state='delivered'),
+                       logical_operator="OR")
   ing_dict = {
+    "query": query,
     "portal_type": "Data Ingestion",
     "reference": stream.getReference()}
   ingestions = portal_catalog(**ing_dict)
-  if len(ingestions) > 0:
-    ingestion = ingestions[0]
-    if ingestion.getSimulationState() in ['started','stopped','delivered']:
+  if len(ingestions) == 1:
     data_stream_list.append(["data_stream_module/"+stream.getId(), stream.getReference()])
 return { "status_code": 0, "result": data_stream_list }
 from Products.ERP5Type.Log import log
-from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
+from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery, ComplexQuery
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
-data_ingestion = portal_catalog.getResultValue(
-  portal_type = 'Data Ingestion',
-  reference = reference)
+query = ComplexQuery(Query(simulation_state='started'),
+                     Query(simulation_state='stopped'),
+                     Query(simulation_state='delivered'),
+                     logical_operator="OR")
+ing_dict = {
+  "query": query,
+  "portal_type": "Data Ingestion",
+  "reference": reference}
+ingestions = portal_catalog(**ing_dict)
+if len(ingestions) == 1:
+  data_ingestion = ingestions[0]
+else:
+  context.logEntry("ERROR getting Data Ingestion of file %s. The file does not have a unique data ingestion in correct state." % reference)
+  return '{"metadata":"No metadata available for this type of file yet"}'
 try:
   if data_ingestion is None or data_ingestion.getSimulationState() != 'delivered':
@@ -26,7 +37,7 @@ try:
     data_descriptor = context.restrictedTraverse(url)
   except Exception as e:
     # backward compatibility
-    log("ERROR while looking for data descriptor with id %s : %s" % (str(data_ingestion.getId()), str(e)))
+    context.logEntry("ERROR while looking for data descriptor with id %s : %s" % (str(data_ingestion.getId()), str(e)))
     query = Query(portal_type="Data Descriptor")
     data_descriptor = None
     for document in portal_catalog(query=query):
@@ -42,5 +53,5 @@ try:
   return '{"metadata":"No metadata available for this type of file yet"}'
 except Exception as e:
-  log("Error getting data descriptor content: " + str(e))
+  context.logEntry("Error getting data descriptor content: " + str(e))
   return '{"metadata":"No metadata descriptor found for this file"}'
@@ -14,7 +14,9 @@ from Products.ERP5Type.Log import log
 class TestDataIngestion(SecurityTestCase):
-  PART = "/xxx"
+  PART_1 = "/001"
+  PART_2 = "/002"
+  PART_3 = "/003"
   EOF = "/EOF"
   FIF = "/fif"
   TXT = "/txt"
@@ -116,6 +118,8 @@ class TestDataIngestion(SecurityTestCase):
       raise StandardError("Could not find ingestion with reference %s" % reference)

   def simulateIngestionAlarm(self, reference, now):
+    self.portal.ERP5Site_startIngestionList()
+    self.tic()
     self.portal.ERP5Site_stopIngestionList()
     self.tic()
     self.manuallyStopIngestionWorkaround(reference, now)
@@ -258,13 +262,19 @@ class TestDataIngestion(SecurityTestCase):
   def test_data_ingestion_splitted_file(self):
     reference = self.getRandomReference()
     ingestion_policy = self.getIngestionPolicy(reference, self.INGESTION_SCRIPT)
-    data_chunk = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(1000)])
-    data_chunk_1 = data_chunk[:int(math.floor(len(data_chunk)/2))]
-    data_chunk_2 = data_chunk[int(math.floor(len(data_chunk)/2)):]
+    data_chunk_1 = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(250)])
+    data_chunk_2 = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(250)])
+    data_chunk_3 = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(250)])
+    data_chunk_4 = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(250)])
+    data_chunk = data_chunk_1 + data_chunk_2 + data_chunk_3 + data_chunk_4
     ingestion_reference = self.getIngestionReference(reference, self.FIF)
-    self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.PART, data_chunk_1, ingestion_policy)
-    self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.EOF, data_chunk_2, ingestion_policy)
+    self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.PART_1, data_chunk_1, ingestion_policy)
+    self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.PART_2, data_chunk_2, ingestion_policy)
+    self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.PART_3, data_chunk_3, ingestion_policy)
+    self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.EOF, data_chunk_4, ingestion_policy)
+    self.portal.ERP5Site_startIngestionList()
+    self.tic()
     ingestion_id, ingestion_reference = self.sanitizeReference(ingestion_reference)
     data_stream = self.getDataStream(ingestion_reference)
     self.assertEqual(len(data_chunk), len(data_stream.getData()))
...
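The rewritten test exercises a four-way split instead of a two-way one. Its core invariant, runnable stand-alone: four independently generated 250-character chunks, reassembled in order, must reproduce the full 1000-character payload that getData() is compared against:

import random
import string

# four chunks generated the same way as in the test above
chunks = [''.join(random.choice(string.ascii_letters + string.digits)
                  for _ in range(250)) for _ in range(4)]
full = ''.join(chunks)
assert len(full) == 1000
# in-order reassembly reproduces each original chunk at its offset
assert all(full[i*250:(i+1)*250] == chunks[i] for i in range(4))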
@@ -46,16 +46,17 @@
         <key> <string>text_content_warning_message</string> </key>
         <value>
           <tuple>
-            <string>W:137, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
-            <string>W:160, 34: Unused variable \'i\' (unused-variable)</string>
-            <string>W:160, 76: Unused variable \'j\' (unused-variable)</string>
-            <string>W:183, 4: Redefining name \'np\' from outer scope (line 9) (redefined-outer-name)</string>
-            <string>W:183, 4: Reimport \'numpy\' (imported line 9) (reimported)</string>
-            <string>W:199, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
-            <string>W:203, 10: No exception type(s) specified (bare-except)</string>
-            <string>W:211, 26: Unused variable \'e\' (unused-variable)</string>
-            <string>W:268, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
+            <string>W:141, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
+            <string>W:164, 34: Unused variable \'i\' (unused-variable)</string>
+            <string>W:164, 76: Unused variable \'j\' (unused-variable)</string>
+            <string>W:187, 4: Redefining name \'np\' from outer scope (line 9) (redefined-outer-name)</string>
+            <string>W:187, 4: Reimport \'numpy\' (imported line 9) (reimported)</string>
+            <string>W:203, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
+            <string>W:207, 10: No exception type(s) specified (bare-except)</string>
+            <string>W:215, 26: Unused variable \'e\' (unused-variable)</string>
+            <string>W:278, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
             <string>W: 8, 0: Unused timedelta imported from datetime (unused-import)</string>
+            <string>W: 10, 0: Unused import math (unused-import)</string>
             <string>W: 13, 0: Unused log imported from Products.ERP5Type.Log (unused-import)</string>
           </tuple>
         </value>
...