Commit 9f455fe5 authored by Roque Porchetto

erp5_wendelin_telecom_ingestion: handle ingestion of modified and deleted files

- corresponding refactoring in reference checks, split handling, etc.
- new permissions for contributor (delete)
parent 713528d0
 <type_roles>
-  <role id='Auditor'>
+  <role id='Auditor; Author; Manager'>
     <property id='title'>admin</property>
     <multi_property id='category'>function/admin</multi_property>
   </role>
-  <role id='Auditor'>
+  <role id='Auditor; Author; Manager'>
     <property id='title'>contributor</property>
     <multi_property id='category'>function/contributor</multi_property>
   </role>
......
 <type_roles>
-  <role id='Auditor; Assignor'>
+  <role id='Auditor; Author; Manager'>
     <property id='title'>admin</property>
     <multi_property id='category'>function/admin</multi_property>
   </role>
-  <role id='Auditor'>
+  <role id='Auditor; Author; Manager'>
     <property id='title'>contributor</property>
     <multi_property id='category'>function/contributor</multi_property>
   </role>
......
 <type_roles>
-  <role id='Auditor'>
+  <role id='Auditor; Author; Manager'>
     <property id='title'>admin</property>
     <multi_property id='category'>function/admin</multi_property>
   </role>
-  <role id='Auditor'>
+  <role id='Auditor; Author; Manager'>
     <property id='title'>contributor</property>
     <multi_property id='category'>function/contributor</multi_property>
   </role>
......
from Products.ERP5Type.Log import log
from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery, ComplexQuery
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
try:
  if success: # full split ingestion successfully appended
    # invalidate old ingestion objects
    context.logEntry("Invalidating old data ingestion objects after split ingestion for reference: " + reference)
    data_stream = portal_catalog.getResultValue(
      portal_type = 'Data Stream',
      reference = reference,
      validation_state = "validated")
    if data_stream != None:
      data_ingestion = portal_catalog.getResultValue(
        portal_type = 'Data Ingestion',
        id = data_stream.getId())
      data_stream.setReference(data_stream.getReference() + "_invalid")
      data_stream.invalidate()
      log("Data Stream invalidated")
      if not data_ingestion.getReference().endswith("_invalid"):
        data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
      log("Data Ingestion invalidated")
      data_an = portal_catalog.getResultValue(
        portal_type = 'Data Analysis',
        id = data_stream.getId())
      if data_an != None:
        data_an.setReference(data_an.getReference() + "_invalid")
        log("Data Analysis invalidated")
      data_array = portal_catalog.getResultValue(
        portal_type = 'Data Array',
        id = data_stream.getId())
      if data_array != None:
        data_array.setReference(data_array.getReference() + "_invalid")
        data_array.invalidate()
        log("Data Array invalidated")
  else: # split ingestion interrupted and restarted
    # invalidate draft data streams and old started data ingestions
    context.logEntry("Invalidating old split data ingestions and data streams for reference: " + reference)
    for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
                                         simulation_state = "started",
                                         reference = reference):
      if not data_ingestion.getReference().endswith("_invalid"):
        data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
      data_ingestion.deliver()
    for data_stream in portal_catalog(portal_type = "Data Stream",
                                      validation_state = "draft",
                                      reference = reference):
      if not data_stream.getReference().endswith("_invalid"):
        data_stream.setReference(data_stream.getReference() + "_invalid")
except Exception as e:
  log("ERROR in ERP5Site_invalidateSplitIngestions: " + str(e))
  pass
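For context, the script takes reference and success as parameters (see the _params entry in the metadata below) and is meant to be called from the other ingestion scripts, roughly as follows; the reference value here is only a hypothetical example:

  portal.ERP5Site_invalidateSplitIngestions("sample/reference", success=True)   # all split parts were appended successfully
  portal.ERP5Site_invalidateSplitIngestions("sample/reference", success=False)  # an interrupted split ingestion is being restarted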
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>reference, success</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_invalidateSplitIngestions</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
@@ -53,21 +53,29 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
   context.logEntry("Started split ingestion found: " + data_ingestion.getId())
   try:
     last_data_stream_id = ""
-    query = Query(portal_type="Data Stream", reference=data_ingestion.getReference())
+    query = Query(portal_type="Data Stream", reference=data_ingestion.getReference(), validation_state="draft")
     result_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
+    full_data_stream = None
     for data_stream in result_list:
+      log("data_stream.getId(): " + data_stream.getId())
       if data_stream.getId() == data_ingestion.getId():
+        log("is base data stream (001)")
         full_data_stream = data_stream
       else:
-        full_data_stream.appendData(data_stream.getData())
-        last_data_stream_id = data_stream.getId()
-        portal.data_stream_module.deleteContent(data_stream.getId())
+        log("is NOT base data stream (!=001)")
+        if full_data_stream != None:
+          log("appending content to base data stream...")
+          full_data_stream.appendData(data_stream.getData())
+          last_data_stream_id = data_stream.getId()
+          portal.data_stream_module.deleteContent(data_stream.getId())
     if last_data_stream_id.endswith("EOF"):
+      portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=True)
       hash = getHash(full_data_stream)
       full_data_stream.setVersion(hash)
       if full_data_stream.getValidationState() != "validated":
         full_data_stream.validate()
       related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
-                                                simulation_state = "started",
                                                 reference = data_ingestion.getReference())
       for ingestion in related_split_ingestions:
         if ingestion.getId() == full_data_stream.getId():
......
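Read together, the right-hand side of this hunk merges the split chunks into the base Data Stream roughly as sketched below (a simplified outline, not the literal script; getHash and the 001/EOF chunk naming come from the surrounding code):

  full_data_stream = None
  last_data_stream_id = ""
  for data_stream in result_list:  # draft Data Streams for this reference, oldest first
    if data_stream.getId() == data_ingestion.getId():
      full_data_stream = data_stream  # the base ("001") stream that receives the appended data
    elif full_data_stream is not None:
      full_data_stream.appendData(data_stream.getData())
      last_data_stream_id = data_stream.getId()
      portal.data_stream_module.deleteContent(data_stream.getId())
  if last_data_stream_id.endswith("EOF"):  # last chunk processed: finalize the ingestion
    portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=True)
    full_data_stream.setVersion(getHash(full_data_stream))
    if full_data_stream.getValidationState() != "validated":
      full_data_stream.validate()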
@@ -7,7 +7,6 @@ now_string = now.strftime('%Y%m%d-%H%M%S-%f')[:-3]
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
 try:
   # remove supplier, eof, size and hash from reference
   reference = '/'.join(reference.split('/')[1:-3])
@@ -37,15 +36,17 @@ try:
       portal_type = 'Data Stream',
       validation_state = 'validated',
       reference = data_ingestion_reference)
-    if size != data_stream.getSize():
-      modified = True
-    elif hash_value != None and hash_value != "" and hash_value != data_stream.getVersion():
-      modified = True
+    if data_stream != None:
+      if size != data_stream.getSize():
+        modified = True
+      elif hash_value != None and hash_value != "" and hash_value != data_stream.getVersion():
+        modified = True
     if not modified:
       context.logEntry("An older ingestion for reference %s was already done." % data_ingestion_reference)
-      raise ValueError("An older ingestion for reference %s was already done." % data_ingestion_reference)
+      context.logEntry("Old file version will be invalidated.")
     else:
       context.logEntry("Ingestion of modified file. Old version will be invalidated.")
+    if eof == "END": # if not split (one single ingestion), invalidate old ingestion
       portal.ERP5Site_invalidateIngestionObjects(data_ingestion_reference)
@@ -101,9 +102,12 @@ try:
     input_line.setAggregateSet(
       input_line.getAggregateList() + operation_line.getAggregateList())
+  if hash_value is None or eof != "END": # do not set hash if split, calculate when append
+    hash_value = ""
   data_stream = portal.data_stream_module.newContent(
     portal_type = "Data Stream",
     id = data_ingestion_id,
+    version = hash_value,
     title = "%s%s" % (data_ingestion.getTitle(), "."+extension if extension != "none" else ""),
     reference = data_ingestion_reference)
......
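In short, the added code treats an ingestion as the re-upload of a modified file when the incoming size or hash differs from the already validated Data Stream; a simplified sketch of that comparison, with names taken from the diff:

  modified = False
  if data_stream != None:  # an earlier validated ingestion exists for this reference
    if size != data_stream.getSize():
      modified = True
    elif hash_value != None and hash_value != "" and hash_value != data_stream.getVersion():
      modified = True
  # in both cases the old version is invalidated; for a non-split upload
  # (eof == "END") this happens immediately via ERP5Site_invalidateIngestionObjects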
@@ -26,19 +26,11 @@ query_dict = {
 data_stream_list = []
 for stream in portal_catalog(**query_dict):
-  query = ComplexQuery(Query(simulation_state='started'),
-                       Query(simulation_state='stopped'),
-                       Query(simulation_state='delivered'),
-                       logical_operator="OR")
-  ing_dict = {
-    "query": query,
-    "portal_type": "Data Ingestion",
-    "reference": stream.getReference()}
-  ingestions = portal_catalog(**ing_dict)
-  if len(ingestions) == 1:
-    data_stream_list.append({ "id": "data_stream_module/"+stream.getId(),
-                              "reference": stream.getReference(),
-                              "size": stream.getSize(),
-                              "hash": stream.getVersion() })
+  if stream.getVersion() == "":
+    return { "status_code": 2, "result": [] }
+  data_stream_list.append({ "id": "data_stream_module/"+stream.getId(),
+                            "reference": stream.getReference(),
+                            "size": stream.getSize(),
+                            "hash": stream.getVersion() })

 return { "status_code": 0, "result": data_stream_list }
@@ -22,14 +22,12 @@ try:
     context.logEntry("[ERROR] Data Ingestion reference is not well formated")
     raise ValueError("Data Ingestion reference is not well formated")

+  # check if there are started ingestions for this reference
   data_ingestion = portal_catalog.getResultValue(
     portal_type = 'Data Ingestion',
+    simulation_state = "started",
     reference = data_ingestion_reference)

-  if data_ingestion is None:
-    return FALSE
-
-  if data_ingestion.getSimulationState() == 'started':
+  if data_ingestion != None:
     try:
       # check if user tries to restart the previous split ingestion
       if (EOF == "" or EOF == "END") or (EOF != "EOF" and int(EOF) == 1):
@@ -45,9 +43,17 @@ try:
         # previous ingestion was interrumped
         context.log("[WARNING] User has restarted an interrumpted ingestion for reference %s." % data_ingestion.getReference())
         context.log("[WARNING] Previous split ingestions for reference %s will be discarted and full ingestion restarted." % data_ingestion.getReference())
-        portal.ERP5Site_invalidateIngestionObjects(data_ingestion.getReference())
+        portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=False)
     except:
       pass
+    # the ingestion attempt corresponds to a split ingestion in progress, accept
+    return FALSE
+
+  data_ingestion = portal_catalog.getResultValue(
+    portal_type = 'Data Ingestion',
+    reference = data_ingestion_reference)
+
+  if data_ingestion is None:
+    return FALSE

   if size != "" and size != None:
......
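After this change the reference check first looks for a started Data Ingestion (a split ingestion in progress) and only afterwards for any ingestion with the same reference; a simplified outline of the new flow (FALSE as used by the original script; the restart handling inside the started branch is omitted here):

  data_ingestion = portal_catalog.getResultValue(
    portal_type = 'Data Ingestion',
    simulation_state = "started",
    reference = data_ingestion_reference)
  if data_ingestion != None:
    # a split ingestion is in progress for this reference: accept the new chunk
    return FALSE
  data_ingestion = portal_catalog.getResultValue(
    portal_type = 'Data Ingestion',
    reference = data_ingestion_reference)
  if data_ingestion is None:
    return FALSE  # no previous ingestion for this reference: nothing to check against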