Commit 41b01fb5 authored by Ivan Tyagov

Changes in 'erp5_wendelin_data_lake_ingestion' ...

See merge request nexedi/wendelin!138
parents a7ed9e96 b8f6c3d0
Pipeline #28304 failed in 0 seconds
from erp5.component.module.Log import log
from Products.ZSQLCatalog.SQLCatalog import Query
import hashlib
CHUNK_SIZE = 200000
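# Compute the MD5 hex digest of a Data Stream by reading it in chunks of
# CHUNK_SIZE, so large streams are not loaded into memory at once.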
def getHash(data_stream):
  hash_md5 = hashlib.md5()
  data_stream_chunk = None
  n_chunk = 0
  chunk_size = CHUNK_SIZE
  while True:
    start_offset = n_chunk*chunk_size
    end_offset = n_chunk*chunk_size+chunk_size
    try:
      data_stream_chunk = ''.join(data_stream.readChunkList(start_offset, end_offset))
    except Exception:
      # data stream is empty
      data_stream_chunk = ""
    hash_md5.update(data_stream_chunk)
    if data_stream_chunk == "":
      break
    n_chunk += 1
  return hash_md5.hexdigest()
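# A split ingestion is considered interrupted/abandoned when every related
# 'started' Data Ingestion for the reference is older than one day.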
def isInterruptedAbandonedSplitIngestion(reference):
  from DateTime import DateTime
  # 24 hours expressed in days (DateTime subtraction returns a number of days)
  day_hours = 1.0/24/60*60*24
  # started split data ingestions for reference
  catalog_kw = {'portal_type': 'Data Ingestion',
                'simulation_state': 'started',
                'reference': reference}
  invalidate = True
  for data_ingestion in portal_catalog(**catalog_kw):
    # check that all related ingestions are old (more than 24 hours)
    if (DateTime() - data_ingestion.getCreationDate()) < day_hours:
      invalidate = False
  return invalidate
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
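# Reference suffixes distinguishing single-file ingestions from the first
# and last parts of split (multi-part) ingestions.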
reference_end_single = portal.ERP5Site_getIngestionReferenceDictionary()["single_end_suffix"]
reference_first_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_first_suffix"]
reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]
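# The "%" prefix in the catalog 'id' parameter below makes the catalog match
# ingestions whose id ends with the given suffix.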
# stop single started ingestion (not split files)
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
                                     simulation_state = "started",
                                     id = "%"+reference_end_single):
  if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
    related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
                                              reference = data_ingestion.getReference())
    if len(related_split_ingestions) == 1:
      data_stream = portal_catalog.getResultValue(
        portal_type = 'Data Stream',
        reference = data_ingestion.getReference())
      if data_stream is not None:
        hash_value = getHash(data_stream)
        data_stream.setVersion(hash_value)
        if data_stream.getValidationState() != "validated":
          data_stream.validate()
        if data_ingestion.getSimulationState() == "started":
          data_ingestion.stop()
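# For split ingestions, the parts are appended (in creation order) to the base
# Data Stream; once the last part (end-split suffix) has been merged, the
# stream is versioned and validated and the related ingestions are stopped.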
# append split ingestions
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
                                     simulation_state = "started",
                                     id = "%"+reference_first_split):
  if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
    if isInterruptedAbandonedSplitIngestion(data_ingestion.getReference()):
      portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=False)
    else:
      try:
        last_data_stream_id = ""
        query = Query(portal_type="Data Stream", reference=data_ingestion.getReference(), validation_state="draft")
        result_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
        full_data_stream = None
        for data_stream in result_list:
          log(''.join(["Data stream for split ingestion: ", data_stream.getId()]))
          if data_stream.getId() == data_ingestion.getId():
            log("It is base data stream")
            full_data_stream = data_stream
          else:
            log("It is not base data stream, it is a part")
            if full_data_stream is not None:
              log("appending content to base data stream...")
              full_data_stream.appendData(data_stream.getData())
              last_data_stream_id = data_stream.getId()
              portal.data_stream_module.deleteContent(data_stream.getId())
        if last_data_stream_id.endswith(reference_end_split):
          portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=True)
          full_data_stream_hash = getHash(full_data_stream)
          full_data_stream.setVersion(full_data_stream_hash)
          if full_data_stream.getValidationState() != "validated":
            full_data_stream.validate()
          related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
                                                    simulation_state = "started",
                                                    reference = data_ingestion.getReference())
          for ingestion in related_split_ingestions:
            if ingestion.getId() == full_data_stream.getId():
              if ingestion.getSimulationState() == "started":
                ingestion.stop()
            else:
              portal.ERP5Site_invalidateReference(ingestion)
              ingestion.deliver()
      except Exception as e:
        context.logEntry("ERROR appending split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
        context.logEntry(e)
<?xml version="1.0"?>
<ZopeData>
  <record id="1" aka="AAAAAAAAAAE=">
    <pickle>
      <global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
    </pickle>
    <pickle>
      <dictionary>
        <item>
          <key> <string>_bind_names</string> </key>
          <value>
            <object>
              <klass>
                <global name="_reconstructor" module="copy_reg"/>
              </klass>
              <tuple>
                <global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
                <global name="object" module="__builtin__"/>
                <none/>
              </tuple>
              <state>
                <dictionary>
                  <item>
                    <key> <string>_asgns</string> </key>
                    <value>
                      <dictionary>
                        <item>
                          <key> <string>name_container</string> </key>
                          <value> <string>container</string> </value>
                        </item>
                        <item>
                          <key> <string>name_context</string> </key>
                          <value> <string>context</string> </value>
                        </item>
                        <item>
                          <key> <string>name_m_self</string> </key>
                          <value> <string>script</string> </value>
                        </item>
                        <item>
                          <key> <string>name_subpath</string> </key>
                          <value> <string>traverse_subpath</string> </value>
                        </item>
                      </dictionary>
                    </value>
                  </item>
                </dictionary>
              </state>
            </object>
          </value>
        </item>
        <item>
          <key> <string>_params</string> </key>
          <value> <string></string> </value>
        </item>
        <item>
          <key> <string>id</string> </key>
          <value> <string>DataLake_stopIngestionList</string> </value>
        </item>
      </dictionary>
    </pickle>
  </record>
</ZopeData>
@@ -54,7 +54,7 @@
   </item>
   <item>
     <key> <string>id</string> </key>
-    <value> <string>ERP5_getDescriptorHTMLContent</string> </value>
+    <value> <string>ERP5Site_getDescriptorHTMLContent</string> </value>
   </item>
 </dictionary>
 </pickle>
...
@@ -119,7 +119,7 @@
       });
     })
     .declareMethod("getDescriptorContent", function (descriptorReference) {
-      var url = "/ERP5_getDescriptorHTMLContent?reference=" + descriptorReference,
+      var url = "/ERP5Site_getDescriptorHTMLContent?reference=" + descriptorReference,
         xmlHttp = new XMLHttpRequest();
       try {
         xmlHttp.open("GET", url, false);
...