Commit bd164f63 authored by Roque

erp5_wendelin_data_lake_ingestion: new test for consistency state alarm

parent a7d7eff7
......@@ -19,7 +19,7 @@ class TestDataIngestion(SecurityTestCase):
SIZE_HASH = REFERENCE_SEPARATOR + "fake-size"+ REFERENCE_SEPARATOR + "fake-hash"
SINGLE_INGESTION_END = REFERENCE_SEPARATOR
CHUNK_SIZE_CSV = 25
REF_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR + "fake-dataset" + REFERENCE_SEPARATOR
REF_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR
REF_SUPPLIER_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR
INVALID = "_invalid"
......@@ -36,10 +36,12 @@ class TestDataIngestion(SecurityTestCase):
random_string = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(10)])
return 'UNIT-TEST-' + random_string
def getIngestionReference(self, reference, extension, randomize_ingestion_reference=False):
def getIngestionReference(self, reference, extension, randomize_ingestion_reference=False, data_set_reference=False):
if not data_set_reference:
data_set_reference = "fake-dataset"
if not randomize_ingestion_reference:
# return a hard-coded reference, which results in one Data Set and multiple Data Streams (in the context of the test)
return self.REF_PREFIX + reference + extension
return self.REF_PREFIX + data_set_reference + self.REFERENCE_SEPARATOR + reference + extension
else:
# create random one
random_string = self.getRandomReference()
......@@ -88,15 +90,15 @@ class TestDataIngestion(SecurityTestCase):
ingestion_policy.ingest()
self.tic()
def ingest(self, data_chunk, reference, extension, eof, randomize_ingestion_reference=False):
ingestion_reference = self.getIngestionReference(reference, extension, randomize_ingestion_reference)
def ingest(self, data_chunk, reference, extension, eof, randomize_ingestion_reference=False, data_set_reference=False):
  """
  Push one data chunk through the default ebulk ingestion policy.

  The ingestion reference is built by getIngestionReference from the
  given reference, extension and the two optional reference switches,
  then submitted together with data_chunk and eof via ingestRequest.

  Returns the second element of what sanitizeReference yields for the
  reference that was submitted.
  """
  raw_reference = self.getIngestionReference(
    reference,
    extension,
    randomize_ingestion_reference,
    data_set_reference,
  )
  # use default ebulk policy
  ebulk_policy = self.portal.portal_ingestion_policies.default_ebulk
  self.ingestRequest(raw_reference, eof, data_chunk, ebulk_policy)
  _, sanitized_reference = self.sanitizeReference(raw_reference)
  return sanitized_reference
def stepIngest(self, extension, delimiter, randomize_ingestion_reference=False):
def stepIngest(self, extension, delimiter, randomize_ingestion_reference=False, data_set_reference=False):
file_name = "file_name.csv"
reference = self.getRandomReference()
array = [[random.random() for i in range(self.CHUNK_SIZE_CSV + 10)] for j in range(self.CHUNK_SIZE_CSV + 10)]
......@@ -111,7 +113,8 @@ class TestDataIngestion(SecurityTestCase):
chunk.append(line)
else:
break
ingestion_reference = self.ingest(data_chunk, reference, extension, self.SINGLE_INGESTION_END, randomize_ingestion_reference=randomize_ingestion_reference)
ingestion_reference = self.ingest(data_chunk, reference, extension, self.SINGLE_INGESTION_END,
randomize_ingestion_reference=randomize_ingestion_reference, data_set_reference=data_set_reference)
if os.path.exists(file_name):
os.remove(file_name)
......@@ -129,17 +132,18 @@ class TestDataIngestion(SecurityTestCase):
data_stream_data = data_stream.getData()
self.assertEqual(data_chunk, data_stream_data)
# check Data Set is validated and Data Stream is published
self.assertEqual('validated', data_set.getValidationState())
self.assertEqual('validated', data_stream.getValidationState())
return data_set, [data_stream]
def test_01_DefaultEbulkIngestion(self):
"""
Test default ingestion with ebulk too.
"""
# ingest a CSV file, using "," as the delimiter
self.stepIngest(self.CSV, ",")
data_set, data_stream_list = self.stepIngest(self.CSV, ",", randomize_ingestion_reference=True)
# check that the Data Set and every returned Data Stream are validated
self.assertEqual('validated', data_set.getValidationState())
self.assertSameSet(['validated' for x in data_stream_list],
[x.getValidationState() for x in data_stream_list])
def test_02_DefaultSplitIngestion(self):
"""
......@@ -225,4 +229,37 @@ class TestDataIngestion(SecurityTestCase):
self.assertSameSet(['invalidated' for x in data_stream_list],
[x.getValidationState() for x in data_stream_list])
def test_05_StateConsistencyAlarm(self):
  """
  Test alarm that checks (and fixes) Data Set - Data Stream state consistency.

  Scenario:
    1. ingest a first file into a dedicated Data Set and publish the Data Set
    2. ingest a second file into the same Data Set
    3. check the second Data Stream is still 'validated' while the Data Set
       and the first Data Stream are 'published'
    4. run the consistency alarm script and check everything is 'published'
  """
  # dedicated (randomized) Data Set reference so this test does not share
  # its Data Set with the other tests of the class
  data_set_reference = "consistency-dataset-" + self.getRandomReference()
  data_set, data_stream_list = self.stepIngest(self.CSV, ",",
    randomize_ingestion_reference=False, data_set_reference=data_set_reference)
  self.tic()
  first_file_stream = data_stream_list[0]
  # publish Data Set (all Data Streams are published automatically)
  data_set.publish()
  self.tic()
  # ingest new file into the SAME Data Set: the reference must be passed
  # again, otherwise getIngestionReference falls back to the default
  # "fake-dataset" and the inconsistency under test is never created
  data_set, data_stream_list = self.stepIngest(self.CSV, ",",
    randomize_ingestion_reference=False, data_set_reference=data_set_reference)
  self.tic()
  second_file_stream = data_stream_list[0]
  # second ingested file stream is in validated state (inconsistent with whole data set state)
  self.assertEqual('published', data_set.getValidationState())
  self.assertEqual('published', first_file_stream.getValidationState())
  self.assertEqual('validated', second_file_stream.getValidationState())
  # call explicitly alarm to fix the state inconsistency
  self.portal.portal_alarms.wendelin_check_datastream_consistency.Alarm_checkDataStreamStateConsistency()
  self.tic()
  # check states were updated (all published)
  self.assertEqual('published', data_set.getValidationState())
  self.assertEqual('published', first_file_stream.getValidationState())
  self.assertEqual('published', second_file_stream.getValidationState())
# XXX: new test which simulates download / upload of Data Set and increase DS version
\ No newline at end of file
......@@ -46,8 +46,8 @@
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W:102, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:102, 76: Unused variable \'j\' (unused-variable)</string>
<string>W:104, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:104, 76: Unused variable \'j\' (unused-variable)</string>
</tuple>
</value>
</item>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment