Commit 8c11895b authored by Ivan Tyagov

Add multiple uploads test.

parent 4d9841d7
@@ -3,6 +3,7 @@ import string
 import random
 import csv
 import os
+import time
 import numpy as np
 import base64
 from datetime import datetime
@@ -101,26 +102,26 @@ class TestDataIngestion(SecurityTestCase):
                                            reference = reference)
     return data_stream
 
-  def ingestRequest(self, reference, eof, data_chunk, ingestion):
+  def ingestRequest(self, reference, eof, data_chunk, ingestion_policy):
     encoded_data_chunk = base64.b64encode(data_chunk)
     request = self.portal.REQUEST
     # only POST is allowed for Wendelin
     request.environ["REQUEST_METHOD"] = 'POST'
-    request.set('reference', reference + eof + self.SIZE_HASH)
+    reference = reference + eof + self.SIZE_HASH
+    self.portal.log("Ingest with reference=%s" % reference)
+    request.set('reference', reference)
     request.set('data_chunk', encoded_data_chunk)
-    ingestion.ingest()
+    ingestion_policy.ingest()
     self.tic()
     return
 
-  def ingest(self, data_chunk, reference, extension):
-    ingestion_policy = self.getIngestionPolicy(reference, self.INGESTION_SCRIPT)
+  def ingest(self, data_chunk, reference, extension, eof):
     ingestion_reference = self.getIngestionReference(reference, extension)
     # use the default ebulk policy
     ingestion_policy = self.portal.portal_ingestion_policies.wendelin_embulk
-    now = datetime.now()
-    self.ingestRequest(ingestion_reference, self.SINGLE_INGESTION_END, data_chunk, ingestion_policy)
+    self.ingestRequest(ingestion_reference, eof, data_chunk, ingestion_policy)
     ingestion_id, ingestion_reference = self.sanitizeReference(ingestion_reference)
     return ingestion_reference
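A minimal standalone sketch of the reference and payload handling that the reworked ingestRequest performs: the eof marker and the size/hash suffix are appended to the reference before it is set on the request, and the data chunk travels base64-encoded. The EOF and SIZE_HASH values below are hypothetical stand-ins for the class constants, not the real ones.

  import base64

  EOF = "/EOF"                        # hypothetical stand-in for an eof marker such as SINGLE_INGESTION_END
  SIZE_HASH = "/fake-size/fake-hash"  # hypothetical stand-in for self.SIZE_HASH

  data_chunk = "some raw file bytes"
  reference = "data-set/file-name" + EOF + SIZE_HASH  # full ingestion reference sent with the request
  encoded_data_chunk = base64.b64encode(data_chunk)   # chunk is transported base64-encoded

  # the ingestion policy decodes the chunk before storing it in a Data Stream
  assert base64.b64decode(encoded_data_chunk) == data_chunk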
@@ -141,7 +142,7 @@ class TestDataIngestion(SecurityTestCase):
       else:
         break
 
-    ingestion_reference = self.ingest(data_chunk, reference, extension)
+    ingestion_reference = self.ingest(data_chunk, reference, extension, self.SINGLE_INGESTION_END)
 
     if os.path.exists(file_name):
       os.remove(file_name)
@@ -166,3 +167,45 @@ class TestDataIngestion(SecurityTestCase):
     delimiter = ","
     extension = self.CSV
     self.stepIngest(extension, delimiter)
+
+  def test_02_DefaultSplitIngestion(self):
+    """
+      Test that multiple uploads from ebulk end up concatenated in the same
+      Data Stream (the case of a large file upload, where ebulk by default
+      splits the file into 50MB chunks).
+    """
+    data_chunk_1 = ''.join([random.choice(string.ascii_letters + string.digits) \
+                            for _ in xrange(250)])
+    data_chunk_2 = ''.join([random.choice(string.ascii_letters + string.digits) \
+                            for _ in xrange(250)])
+    data_chunk_3 = ''.join([random.choice(string.ascii_letters + string.digits) \
+                            for _ in xrange(250)])
+    data_chunk_4 = ''.join([random.choice(string.ascii_letters + string.digits) \
+                            for _ in xrange(250)])
+    data_chunk = data_chunk_1 + data_chunk_2 + data_chunk_3 + data_chunk_4
+
+    reference = self.getRandomReference()
+
+    ingestion_reference = self.ingest(data_chunk_1, reference, self.FIF, self.PART_1)
+    time.sleep(1)
+    self.tic()
+
+    ingestion_reference = self.ingest(data_chunk_2, reference, self.FIF, self.PART_2)
+    time.sleep(1)
+    self.tic()
+
+    ingestion_reference = self.ingest(data_chunk_3, reference, self.FIF, self.PART_3)
+    time.sleep(1)
+    self.tic()
+
+    ingestion_reference = self.ingest(data_chunk_4, reference, self.FIF, self.EOF)
+    time.sleep(1)
+    self.tic()
+
+    # explicitly call the alarm so all 4 Data Streams get concatenated into one
+    self.portal.portal_alarms.wendelin_data_lake_handle_analysis.Alarm_dataLakeHandleAnalysis()
+    self.tic()
+
+    # check the resulting Data Stream
+    data_stream = self.getDataStream(ingestion_reference)
+    self.assertEqual(data_chunk, data_stream.getData())
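The scenario this test drives, one logical file uploaded as several fixed-size parts that must reassemble byte for byte, can be sketched independently of the portal. The chunk size and helper name below are illustrative only.

  import random
  import string

  def split_into_chunks(data, chunk_size):
    # mimic ebulk cutting a large upload into fixed-size parts
    return [data[i:i + chunk_size] for i in xrange(0, len(data), chunk_size)]

  payload = ''.join([random.choice(string.ascii_letters + string.digits)
                     for _ in xrange(1000)])
  parts = split_into_chunks(payload, 250)  # four parts of 250 characters, as in the test above

  # concatenating the parts in upload order must reproduce the original payload,
  # which is what the final assertEqual in the test verifies
  assert len(parts) == 4
  assert ''.join(parts) == payload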
@@ -46,10 +46,10 @@
         <key> <string>text_content_warning_message</string> </key>
         <value>
           <tuple>
-            <string>W:124, 4: Unused variable 'ingestion_id' (unused-variable)</string>
-            <string>W:122, 4: Unused variable 'now' (unused-variable)</string>
-            <string>W:131, 34: Unused variable 'i' (unused-variable)</string>
-            <string>W:131, 76: Unused variable 'j' (unused-variable)</string>
+            <string>W:125, 4: Unused variable 'ingestion_id' (unused-variable)</string>
+            <string>W:132, 34: Unused variable 'i' (unused-variable)</string>
+            <string>W:132, 76: Unused variable 'j' (unused-variable)</string>
+            <string>W: 9, 0: Unused datetime imported from datetime (unused-import)</string>
           </tuple>
         </value>
       </item>