Commit 713528d0 authored by Roque Porchetto

erp5_wendelin_telecom_ingestion: single ingestions (not split) are immediately available on the site

- fix the ingestion-reference-exists check when handling restarted split ingestions
parent 72ec635d
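
In short, the diff below separates single (not split) ingestions, whose Data Ingestion id ends with "END", from split ingestions, whose last chunk ends with "EOF"; single ingestions are now stopped and their Data Stream validated right away instead of going through the split-ingestion path. A standalone sketch of that distinction follows; the function and sample ids are illustrative only and are not part of the ERP5 scripts.

```python
# Illustrative model of the single-vs-split distinction used in this commit.
# The real logic runs as catalog queries on portal_type="Data Ingestion"
# (id LIKE "%END" for a single file, "%EOF" for the last chunk of a split file).

def is_single_ingestion(ingestion_id, related_ingestion_count):
    """True for a single (not split) ingestion: its id carries the END
    marker and it is the only Data Ingestion for its reference."""
    return ingestion_id.endswith("END") and related_ingestion_count == 1

# hypothetical ids, for illustration only
assert is_single_ingestion("sample-reference-END", 1)            # single file: finalized immediately
assert not is_single_ingestion("sample-reference-000003EOF", 3)  # last chunk of a split upload
```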
@@ -4,23 +4,22 @@ from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
-# invalidate old (more than 4 min) pending ingestions (e.g. split ingestions that were canceled/interrupted)
+# invalidate old (more than 5 min) pending ingestions (e.g. split ingestions that were canceled/interrupted and not resumed)
 from DateTime import DateTime
 now = DateTime()
 now_minus_max = now - 1.0/24/60*9999
-now_minus_4 = now - 1.0/24/60*4
+now_minus_5 = now - 1.0/24/60*5
-catalog_kw = {'creation_date': {'query': (now_minus_max, now_minus_4), 'range': 'minmax'}, 'simulation_state': 'started', 'portal_type': 'Data Ingestion'}
+catalog_kw = {'creation_date': {'query': (now_minus_max, now_minus_5), 'range': 'minmax'}, 'simulation_state': 'started', 'portal_type': 'Data Ingestion'}
 for data_ingestion in portal_catalog(**catalog_kw):
-  # search related data ingestions that are not old yet (less than 4 min)
-  catalog_kw = {'creation_date': {'query': (now_minus_4, DateTime()), 'range': 'minmax'},
+  # search related data ingestions that are not old yet (less than 5 min)
+  catalog_kw = {'creation_date': {'query': (now_minus_5, DateTime()), 'range': 'minmax'},
                 'simulation_state': 'started',
                 'portal_type': 'Data Ingestion',
                 'reference': data_ingestion.getReference()}
   invalidate = True
   if len(portal_catalog(**catalog_kw)) > 0:
-    context.logEntry("Data Ingestion %s is old but it has related data ingestions that are not, so ingestion is still in course. It won't be invalidated." % data_ingestion.getId())
+    context.logEntry("Data Ingestion %s is old but it has related data ingestions that are not, so split ingestion is still in course. It won't be invalidated." % data_ingestion.getId())
     invalidate = False
   if invalidate:
...
@@ -25,23 +25,24 @@ def getHash(data_stream):
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
-# start single started ingestion (not split files)
+# stop single started ingestion (not split files)
 for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
                                      simulation_state = "started",
-                                     id = "%EOF"):
+                                     id = "%END"):
   if not data_ingestion.getReference().endswith("_invalid"):
     related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
                                               reference = data_ingestion.getReference())
     if len(related_split_ingestions) == 1:
-      context.logEntry("Started EOF ingestion found: " + data_ingestion.getId())
+      context.logEntry("Started single ingestion (not split) found: " + data_ingestion.getId())
       data_stream = portal_catalog.getResultValue(
         portal_type = 'Data Stream',
         reference = data_ingestion.getReference())
       if data_stream is not None:
         hash_value = getHash(data_stream)
         data_stream.setVersion(hash_value)
-        data_stream.validate()
         data_ingestion.stop()
+        if data_stream.getValidationState() != "validated":
+          data_stream.validate()
         context.logEntry("Data Ingestion %s stopped." % data_ingestion.getId())
 # append split ingestions
@@ -64,7 +65,8 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
       if last_data_stream_id.endswith("EOF"):
         hash = getHash(full_data_stream)
         full_data_stream.setVersion(hash)
-        full_data_stream.validate()
+        if full_data_stream.getValidationState() != "validated":
+          full_data_stream.validate()
         related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
                                                   reference = data_ingestion.getReference())
         for ingestion in related_split_ingestions:
...
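
The validation calls above are now guarded by getValidationState() so that re-running the stop-ingestion alarm on an already-validated Data Stream does not attempt a second workflow transition. A minimal, self-contained illustration of that idempotency guard; the Stream class is a stand-in for the real Data Stream workflow, not ERP5 code.

```python
class Stream:
    """Stand-in for a Data Stream's validation workflow (illustration only)."""
    def __init__(self):
        self.state = "draft"

    def getValidationState(self):
        return self.state

    def validate(self):
        if self.state == "validated":
            # a real workflow would refuse the transition here
            raise RuntimeError("transition not allowed: already validated")
        self.state = "validated"

stream = Stream()
# guarded call, as in the diff: only validate when not yet validated
if stream.getValidationState() != "validated":
    stream.validate()
# safe if the alarm runs again on the same stream
if stream.getValidationState() != "validated":
    stream.validate()
```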
@@ -14,7 +14,7 @@ try:
   context.logEntry("Data Ingestion reference: %s" % reference)
   data_ingestion_reference = reference
-  eof = movement_dict.get('eof', 'EOF')
+  eof = movement_dict.get('eof', 'END') if movement_dict.get('eof', 'END') != "" else 'END'
   resource_reference = movement_dict.get('resource_reference', None)
   supplier = movement_dict.get('supplier', None)
   extension = movement_dict.get('extension', None)
@@ -141,6 +141,9 @@ try:
   data_operation = operation_line.getResourceValue()
   data_stream = input_line.getAggregateDataStreamValue()
+  if eof == "END":
+    data_stream.validate()
   return data_operation, {'data_stream': data_stream}
 except Exception as e:
   context.logEntry("[ERROR] Error during ingestion policy operation: " + str(e))
...
@@ -31,11 +31,21 @@ try:
   if data_ingestion.getSimulationState() == 'started':
     try:
-      if EOF != "EOF" and int(EOF) == 1:
-        # The user has restarted an interrupted split ingestion
-        context.log("[WARNING] User has restarted an interrupted ingestion for reference %s." % data_ingestion.getReference())
-        context.log("[WARNING] Previous split ingestions for reference %s will be discarded and full ingestion restarted." % data_ingestion.getReference())
-        portal.ERP5Site_invalidateIngestionObjects(data_ingestion.getReference())
+      # check if user tries to restart the previous split ingestion
+      if (EOF == "" or EOF == "END") or (EOF != "EOF" and int(EOF) == 1):
+        # check if existing split ingestion is still being processed or if it is interrupted
+        data_ingestion_eof = portal_catalog.getResultValue(
+          portal_type = 'Data Ingestion',
+          reference = data_ingestion_reference,
+          id = "%EOF")
+        if data_ingestion_eof:
+          # reference exists: previous split ingestion is still being processed
+          return TRUE
+        else:
+          # previous ingestion was interrupted
+          context.log("[WARNING] User has restarted an interrupted ingestion for reference %s." % data_ingestion.getReference())
+          context.log("[WARNING] Previous split ingestions for reference %s will be discarded and full ingestion restarted." % data_ingestion.getReference())
+          portal.ERP5Site_invalidateIngestionObjects(data_ingestion.getReference())
     except:
       pass
   return FALSE
...
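
The reworked branch above reads as follows: when a client re-announces a reference whose ingestion is already started, look for a remaining "%EOF" Data Ingestion first; if one exists, the split ingestion is still being processed and the reference is reported as in use, otherwise the previous (interrupted) split ingestion is invalidated so the full ingestion can be restarted. A rough standalone model of that decision, with hypothetical names and a callback standing in for ERP5Site_invalidateIngestionObjects:

```python
# Hypothetical model of the restart check; only the branching mirrors the script.
def reference_still_in_use(eof_tag, eof_ingestion_exists, invalidate_previous):
    """Return True when the reference must be treated as still in use.

    eof_tag: the EOF field sent by the client ("", "END", "EOF" or a chunk index).
    eof_ingestion_exists: whether a Data Ingestion with id like "%EOF" still exists.
    invalidate_previous: callback in place of ERP5Site_invalidateIngestionObjects.
    """
    is_restart_attempt = eof_tag in ("", "END") or (
        eof_tag != "EOF" and eof_tag.isdigit() and int(eof_tag) == 1)
    if not is_restart_attempt:
        return False   # an ordinary chunk of the ongoing split ingestion
    if eof_ingestion_exists:
        return True    # previous split ingestion is still being processed
    invalidate_previous()  # interrupted: discard previous chunks, allow a full restart
    return False
```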
@@ -25,6 +25,7 @@ class TestDataIngestion(SecurityTestCase):
   GZ = "/gz"
   NII = "/nii"
   SIZE_HASH = "/fake-size/fake-hash"
+  SINGLE_INGESTION_END = "/"
   RANDOM = "/" + ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(3)])
   CHUNK_SIZE_TXT = 50000
   CHUNK_SIZE_CSV = 25
@@ -135,7 +136,7 @@ class TestDataIngestion(SecurityTestCase):
     ingestion_reference = self.getIngestionReference(reference, extension)
     now = datetime.now()
-    self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.EOF, data_chunk, ingestion_policy)
+    self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.SINGLE_INGESTION_END, data_chunk, ingestion_policy)
     ingestion_id, ingestion_reference = self.sanitizeReference(ingestion_reference)
     self.simulateIngestionAlarm(ingestion_reference, now)
...
@@ -46,15 +46,15 @@
       <key> <string>text_content_warning_message</string> </key>
       <value>
         <tuple>
-          <string>W:139, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
+          <string>W:140, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
-          <string>W:162, 34: Unused variable \'i\' (unused-variable)</string>
+          <string>W:163, 34: Unused variable \'i\' (unused-variable)</string>
-          <string>W:162, 76: Unused variable \'j\' (unused-variable)</string>
+          <string>W:163, 76: Unused variable \'j\' (unused-variable)</string>
-          <string>W:185, 4: Redefining name \'np\' from outer scope (line 9) (redefined-outer-name)</string>
+          <string>W:186, 4: Redefining name \'np\' from outer scope (line 9) (redefined-outer-name)</string>
-          <string>W:185, 4: Reimport \'numpy\' (imported line 9) (reimported)</string>
+          <string>W:186, 4: Reimport \'numpy\' (imported line 9) (reimported)</string>
-          <string>W:201, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
+          <string>W:202, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
-          <string>W:205, 10: No exception type(s) specified (bare-except)</string>
+          <string>W:206, 10: No exception type(s) specified (bare-except)</string>
-          <string>W:213, 26: Unused variable \'e\' (unused-variable)</string>
+          <string>W:214, 26: Unused variable \'e\' (unused-variable)</string>
-          <string>W:278, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
+          <string>W:279, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
           <string>W: 8, 0: Unused timedelta imported from datetime (unused-import)</string>
           <string>W: 10, 0: Unused import math (unused-import)</string>
           <string>W: 13, 0: Unused log imported from Products.ERP5Type.Log (unused-import)</string>
...