Commit dddb45c4 authored by Ivan Tyagov

Changes in data lake ingestion and UI

See merge request nexedi/wendelin!46
parents 5ad4a202 bd164f63
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Alarm" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>active_sense_method_id</string> </key>
<value> <string>Alarm_checkDataStreamStateConsistency</string> </value>
</item>
<item>
<key> <string>automatic_solve</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>enabled</string> </key>
<value> <int>1</int> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>wendelin_check_datastream_consistency</string> </value>
</item>
<item>
<key> <string>periodicity_hour</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>periodicity_minute</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>periodicity_minute_frequency</string> </key>
<value> <int>2</int> </value>
</item>
<item>
<key> <string>periodicity_month</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>periodicity_month_day</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>periodicity_start_date</string> </key>
<value>
<object>
<klass>
<global id="1.1" name="DateTime" module="DateTime.DateTime"/>
</klass>
<tuple>
<none/>
</tuple>
<state>
<tuple>
<float>1420070400.0</float>
<string>GMT</string>
</tuple>
</state>
</object>
</value>
</item>
<item>
<key> <string>periodicity_stop_date</string> </key>
<value>
<object>
<klass> <reference id="1.1"/> </klass>
<tuple>
<none/>
</tuple>
<state>
<tuple>
<float>32503680000.0</float>
<string>GMT</string>
</tuple>
</state>
</object>
</value>
</item>
<item>
<key> <string>periodicity_week</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Alarm</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string>Check Data Stream Consistency</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
@@ -30,7 +30,7 @@ portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]
-#if last chunk of split ingestion -> validate all related data streams and publish the current one:
+#if last chunk of split ingestion -> validate all related data streams
if data_stream.getId().endswith(reference_end_split):
  query = Query(portal_type="Data Stream", reference=data_stream.getReference(), validation_state="draft")
  split_ingestion_data_stream_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
@@ -41,4 +41,3 @@ if data_stream.getId().endswith(reference_end_split):
      chunk_data_stream.validate()
if data_stream.getValidationState() != "validated":
  data_stream.validate()
-data_stream.publish()
# Consistency alarm that checks (and fixes, by changing state) that all
# published Data Sets have their linked Data Stream(s) in a matching state.
for data_set in context.portal_catalog(portal_type = "Data Set",
                                        validation_state = "published"):
  data_ingestion_line_list = context.portal_catalog(
    portal_type = "Data Ingestion Line",
    aggregate_uid = data_set.getUid())
  for data_ingestion_line in data_ingestion_line_list:
    data_stream = data_ingestion_line.getAggregateValue(portal_type = "Data Stream")
    if data_stream.getValidationState() == "validated":
      data_stream.publish()
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Alarm_checkDataStreamStateConsistency</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
@@ -69,8 +69,6 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
      data_stream.setVersion(hash_value)
    if data_stream.getValidationState() != "validated" and data_stream.getValidationState() != "published":
      data_stream.validate()
-    if data_stream.getValidationState() != "published":
-      data_stream.publish()
    if data_ingestion.getSimulationState() == "started":
      data_ingestion.stop()
  except Exception as e:
......
@@ -117,9 +117,8 @@ data_ingestion.start()
data_operation = operation_line.getResourceValue()
data_stream = input_line.getAggregateDataStreamValue()
-# if not split (one single ingestion) validate and publish the data stream
+# if not split (one single ingestion) validate the data stream
if eof == reference_end_single:
  data_stream.validate()
-  data_stream.publish()
return data_operation, {'data_stream': data_stream}
@@ -19,7 +19,7 @@ class TestDataIngestion(SecurityTestCase):
  SIZE_HASH = REFERENCE_SEPARATOR + "fake-size"+ REFERENCE_SEPARATOR + "fake-hash"
  SINGLE_INGESTION_END = REFERENCE_SEPARATOR
  CHUNK_SIZE_CSV = 25
-  REF_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR + "fake-dataset" + REFERENCE_SEPARATOR
+  REF_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR
  REF_SUPPLIER_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR
  INVALID = "_invalid"
@@ -36,10 +36,12 @@ class TestDataIngestion(SecurityTestCase):
    random_string = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(10)])
    return 'UNIT-TEST-' + random_string

-  def getIngestionReference(self, reference, extension, randomize_ingestion_reference=False):
+  def getIngestionReference(self, reference, extension, randomize_ingestion_reference=False, data_set_reference=False):
+    if not data_set_reference:
+      data_set_reference = "fake-dataset"
    if not randomize_ingestion_reference:
      # return hard coded which results in one Data Set and multiple Data Streams (in context of test)
-      return self.REF_PREFIX + reference + extension
+      return self.REF_PREFIX + data_set_reference + self.REFERENCE_SEPARATOR + reference + extension
    else:
      # create random one
      random_string = self.getRandomReference()
@@ -88,15 +90,15 @@ class TestDataIngestion(SecurityTestCase):
    ingestion_policy.ingest()
    self.tic()

-  def ingest(self, data_chunk, reference, extension, eof, randomize_ingestion_reference=False):
-    ingestion_reference = self.getIngestionReference(reference, extension, randomize_ingestion_reference)
+  def ingest(self, data_chunk, reference, extension, eof, randomize_ingestion_reference=False, data_set_reference=False):
+    ingestion_reference = self.getIngestionReference(reference, extension, randomize_ingestion_reference, data_set_reference)
    # use default ebulk policy
    ingestion_policy = self.portal.portal_ingestion_policies.default_ebulk
    self.ingestRequest(ingestion_reference, eof, data_chunk, ingestion_policy)
    _, ingestion_reference = self.sanitizeReference(ingestion_reference)
    return ingestion_reference

-  def stepIngest(self, extension, delimiter, randomize_ingestion_reference=False):
+  def stepIngest(self, extension, delimiter, randomize_ingestion_reference=False, data_set_reference=False):
    file_name = "file_name.csv"
    reference = self.getRandomReference()
    array = [[random.random() for i in range(self.CHUNK_SIZE_CSV + 10)] for j in range(self.CHUNK_SIZE_CSV + 10)]
@@ -111,7 +113,8 @@ class TestDataIngestion(SecurityTestCase):
        chunk.append(line)
      else:
        break
-    ingestion_reference = self.ingest(data_chunk, reference, extension, self.SINGLE_INGESTION_END, randomize_ingestion_reference=randomize_ingestion_reference)
+    ingestion_reference = self.ingest(data_chunk, reference, extension, self.SINGLE_INGESTION_END,
+                                      randomize_ingestion_reference=randomize_ingestion_reference, data_set_reference=data_set_reference)
    if os.path.exists(file_name):
      os.remove(file_name)
@@ -129,17 +132,18 @@ class TestDataIngestion(SecurityTestCase):
    data_stream_data = data_stream.getData()
    self.assertEqual(data_chunk, data_stream_data)

-    # check Data Set is validated and Data Stream is published
-    self.assertEqual('validated', data_set.getValidationState())
-    self.assertEqual('published', data_stream.getValidationState())
    return data_set, [data_stream]
  def test_01_DefaultEbulkIngestion(self):
    """
      Test default ingestion with ebulk too.
    """
-    self.stepIngest(self.CSV, ",")
+    data_set, data_stream_list = self.stepIngest(self.CSV, ",", randomize_ingestion_reference=True)
+    # check Data Set and Data Streams are validated
+    self.assertEqual('validated', data_set.getValidationState())
+    self.assertSameSet(['validated' for x in data_stream_list],
+                       [x.getValidationState() for x in data_stream_list])
  def test_02_DefaultSplitIngestion(self):
    """
@@ -182,12 +186,9 @@ class TestDataIngestion(SecurityTestCase):
    data_stream_list = self.getDataStreamChunkList(ingestion_reference)
    #one data stream per chunk
    self.assertEqual(len(data_stream_list), 4)
-    #last datastream (EOF) published, the rest validated
-    for stream in data_stream_list:
-      if stream.getId().endswith(self.EOF.replace(self.REFERENCE_SEPARATOR, "")):
-        self.assertEqual('published', stream.getValidationState())
-      else:
-        self.assertEqual('validated', stream.getValidationState())
+    #all data streams are validated
+    self.assertSameSet(['validated' for x in data_stream_list],
+                       [x.getValidationState() for x in data_stream_list])
  def test_03_DefaultWendelinConfigurationExistency(self):
    """
@@ -211,6 +212,13 @@ class TestDataIngestion(SecurityTestCase):
    # check data set and all Data Streams states
    self.assertEqual('validated', data_set.getValidationState())
+    self.assertSameSet(['validated' for x in data_stream_list],
+                       [x.getValidationState() for x in data_stream_list])
+
+    # publish data set and have all Data Streams published automatically
+    data_set.publish()
+    self.tic()
+    self.assertEqual('published', data_set.getValidationState())
    self.assertSameSet(['published' for x in data_stream_list],
                       [x.getValidationState() for x in data_stream_list])
@@ -221,4 +229,37 @@ class TestDataIngestion(SecurityTestCase):
    self.assertSameSet(['invalidated' for x in data_stream_list],
                       [x.getValidationState() for x in data_stream_list])
+
+  def test_05_StateConsistencyAlarm(self):
+    """
+      Test the alarm that checks (and fixes) Data Set - Data Stream state consistency.
+    """
+    data_set_reference = "consistency-dataset-" + self.getRandomReference()
+    data_set, data_stream_list = self.stepIngest(self.CSV, ",",
+        randomize_ingestion_reference=False, data_set_reference=data_set_reference)
+    self.tic()
+    first_file_stream = data_stream_list[0]
+    # publish Data Set (all Data Streams are published automatically)
+    data_set.publish()
+    self.tic()
+    # ingest a new file
+    data_set, data_stream_list = self.stepIngest(self.CSV, ",")
+    self.tic()
+    second_file_stream = data_stream_list[0]
+    # the second ingested file's stream is in validated state (inconsistent with the whole Data Set state)
+    self.assertEqual(data_set.getValidationState(), 'published')
+    self.assertEqual(first_file_stream.getValidationState(), 'published')
+    self.assertEqual(second_file_stream.getValidationState(), 'validated')
+    # explicitly call the alarm to fix the state inconsistency
+    self.portal.portal_alarms.wendelin_check_datastream_consistency.Alarm_checkDataStreamStateConsistency()
+    self.tic()
+    # check that states were updated (all published)
+    self.assertEqual(data_set.getValidationState(), 'published')
+    self.assertEqual(first_file_stream.getValidationState(), 'published')
+    self.assertEqual(second_file_stream.getValidationState(), 'published')
+
  # XXX: new test which simulates download / upload of Data Set and increase DS version
\ No newline at end of file
@@ -46,8 +46,8 @@
  <key> <string>text_content_warning_message</string> </key>
  <value>
    <tuple>
-      <string>W:102, 34: Unused variable \'i\' (unused-variable)</string>
-      <string>W:102, 76: Unused variable \'j\' (unused-variable)</string>
+      <string>W:104, 34: Unused variable \'i\' (unused-variable)</string>
+      <string>W:104, 76: Unused variable \'j\' (unused-variable)</string>
    </tuple>
  </value>
</item>
......
@@ -5,6 +5,8 @@ data_product_module/fif_data
data_product_module/fif_descriptor
data_supply_module/embulk
data_supply_module/embulk/**
+portal_alarms/wendelin_check_datastream_consistency
+portal_alarms/wendelin_check_datastream_consistency/**
portal_alarms/wendelin_data_lake_handle_analysis
portal_alarms/wendelin_data_lake_handle_analysis/**
portal_callables/DataIngestionLine_writeEbulkIngestionToDataStream
......
@@ -70,7 +70,7 @@
      <tfoot>
      </tfoot>
    </table>
-    <!-- rpm -->
+    <!-- rpm
    <table class="ui-responsive ui-body-c ui-table-inset">
      <thead class="ui-bar-inherit thead">
        <tr>
@@ -92,7 +92,7 @@
      </tbody>
      <tfoot>
      </tfoot>
-    </table>
+    </table> -->
......
@@ -238,7 +238,7 @@
        </item>
        <item>
          <key> <string>serial</string> </key>
-          <value> <string>977.26100.35373.708</string> </value>
+          <value> <string>983.63603.62266.8260</string> </value>
        </item>
        <item>
          <key> <string>state</string> </key>
@@ -256,7 +256,7 @@
            </tuple>
            <state>
              <tuple>
-                <float>1564400844.41</float>
+                <float>1593095858.94</float>
                <string>UTC</string>
              </tuple>
            </state>
......
@@ -60,7 +60,7 @@
      "key": "field_listbox",
      "lines": 15,
      "list_method": "portal_catalog",
-      "query": "urn:jio:allDocs?query=portal_type%3A%22Data+Set%22+AND+validation_state%3A%22validated%22+AND+NOT+reference%3A%22%25_invalid%22",
+      "query": "urn:jio:allDocs?query=portal_type%3A%22Data+Set%22+AND+validation_state%3A%22published%22+AND+NOT+reference%3A%22%25_invalid%22",
      "portal_type": [],
      "search_column_list": column_list,
      "sort_column_list": column_list,
......
@@ -236,7 +236,7 @@
        </item>
        <item>
          <key> <string>serial</string> </key>
-          <value> <string>983.63603.62266.8260</string> </value>
+          <value> <string>984.959.26850.28842</string> </value>
        </item>
        <item>
          <key> <string>state</string> </key>
@@ -254,7 +254,7 @@
            </tuple>
            <state>
              <tuple>
-                <float>1589984604.57</float>
+                <float>1593095447.25</float>
                <string>UTC</string>
              </tuple>
            </state>
......