Commit abb7fa76 authored by Ivan Tyagov's avatar Ivan Tyagov

Add the very minimal ebulk test.

parent d90c9696
from Products.ERP5Type.tests.SecurityTestCase import SecurityTestCase
import string
import random
import csv
import os
import numpy as np
import base64
from datetime import datetime
class TestDataIngestion(SecurityTestCase):
  """Very minimal test of data ingestion through the default ebulk policy.

  A random CSV file is generated on disk, its content is POSTed to the
  ``wendelin_embulk`` ingestion policy, and the test then verifies that the
  resulting Data Ingestion / Data Stream documents hold exactly the bytes
  that were sent.
  """

  # Components used to assemble ebulk-style ingestion references, shaped as
  # <supplier>/<dataset>/<file>/<extension>/<end-or-part>/<size>/<hash>.
  REFERENCE_SEPARATOR = "/"
  PART_1 = REFERENCE_SEPARATOR + "001"
  PART_2 = REFERENCE_SEPARATOR + "002"
  PART_3 = REFERENCE_SEPARATOR + "003"
  EOF = REFERENCE_SEPARATOR + "EOF"
  FIF = REFERENCE_SEPARATOR + "fif"
  TXT = REFERENCE_SEPARATOR + "txt"
  CSV = REFERENCE_SEPARATOR + "csv"
  TSV = REFERENCE_SEPARATOR + "tsv"
  GZ = REFERENCE_SEPARATOR + "gz"
  NII = REFERENCE_SEPARATOR + "nii"
  SIZE_HASH = REFERENCE_SEPARATOR + "fake-size" + REFERENCE_SEPARATOR + "fake-hash"
  SINGLE_INGESTION_END = REFERENCE_SEPARATOR
  RANDOM = REFERENCE_SEPARATOR + ''.join(
    [random.choice(string.ascii_letters + string.digits) for _ in xrange(3)])
  CHUNK_SIZE_TXT = 50000
  CHUNK_SIZE_CSV = 25
  REF_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR + "fake-dataset" + REFERENCE_SEPARATOR
  REF_SUPPLIER_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR
  INGESTION_SCRIPT = 'HandleFifEmbulkIngestion'
  INVALID = "_invalid"
  NEW = "_NEW"
  FALSE = "FALSE"
  TRUE = "TRUE"

  def getTitle(self):
    """Return the title of this test suite."""
    return "DataIngestionTest"

  def getBusinessTemplateList(self):
    """Return the business templates that must be installed for this test."""
    return ('erp5_base', 'erp5_web', 'erp5_ingestion_mysql_innodb_catalog',
            'erp5_ingestion', 'erp5_dms', 'erp5_wendelin', 'erp5_callables',
            'erp5_core')

  def afterSetUp(self):
    """Verify that the class constants agree with the portal's ingestion
    reference dictionary before any test runs."""
    self.context = self.portal  # .UnitTest_getContext()
    # Fetch the dictionary once instead of once per assertion.
    reference_dict = self.portal.getIngestionReferenceDictionary()
    self.assertEqual(self.REFERENCE_SEPARATOR,
                     reference_dict["reference_separator"])
    self.assertEqual(self.INVALID, reference_dict["invalid_suffix"])
    self.assertEqual(self.EOF,
                     self.REFERENCE_SEPARATOR + reference_dict["split_end_suffix"])
    self.assertEqual(self.SINGLE_INGESTION_END, self.REFERENCE_SEPARATOR)
    self.assertEqual(self.PART_1,
                     self.REFERENCE_SEPARATOR + reference_dict["split_first_suffix"])

  def getRandomReference(self):
    """Return a unique 'UNIT-TEST-' prefixed reference for one test run."""
    random_string = ''.join(
      [random.choice(string.ascii_letters + string.digits) for _ in xrange(10)])
    return 'UNIT-TEST-' + random_string

  def getIngestionReference(self, reference, extension):
    """Build the full ingestion reference: supplier/dataset/<reference><extension>."""
    return self.REF_PREFIX + reference + extension

  def sanitizeReference(self, reference):
    """Strip the supplier prefix from *reference* and resolve its Data Stream.

    Returns a (data_stream_id, ingestion_reference) tuple.
    """
    ingestion_reference = self.REFERENCE_SEPARATOR.join(
      reference.split(self.REFERENCE_SEPARATOR)[1:])
    data_stream = self.getDataStream(ingestion_reference)
    ingestion_id = data_stream.getId()
    return ingestion_id, ingestion_reference

  def getFullReference(self, ingestion_reference, size, hash_value):
    """Build a supplier-prefixed reference with empty size/hash slots.

    NOTE(review): the *size* and *hash_value* parameters are currently
    ignored and empty strings are appended instead — presumably deliberate
    for these fake ingestions, but worth confirming.
    """
    return (self.REF_SUPPLIER_PREFIX + ingestion_reference
            + self.REFERENCE_SEPARATOR + self.REFERENCE_SEPARATOR
            + "" + self.REFERENCE_SEPARATOR + "")

  def chunks(self, l, n):
    """Yield successive *n*-sized chunks from sequence *l*."""
    for i in xrange(0, len(l), n):
      yield l[i:i + n]

  def generateRawDataBytesAndArray(self):
    """Run the portal's raw-data generator on the sample MNE Data Stream.

    Returns the (raw_data, array, json_data) triple produced by
    ``generateRawData``.
    """
    url = 'data_stream_module/mne_sample_for_test'
    sample_data_stream = self.context.restrictedTraverse(url)
    raw_data, array, json_data = self.context.generateRawData(sample_data_stream)
    return raw_data, array, json_data

  def getIngestionPolicy(self, reference, ingestion_script):
    """Return the Ingestion Policy with *reference*, creating and
    validating it (with *ingestion_script*) if it does not exist yet."""
    ingestion_policy = self.portal.portal_catalog.getResultValue(
      portal_type='Ingestion Policy',
      reference=reference)
    if ingestion_policy is not None:
      return ingestion_policy
    ingestion_policy = self.portal.portal_ingestion_policies.newContent(
      id=reference,
      portal_type='Ingestion Policy',
      reference=reference,
      version='001',
      script_id=ingestion_script)
    ingestion_policy.validate()
    self.tic()
    return ingestion_policy

  def getDataIngestion(self, reference):
    """Return the Data Ingestion with *reference*, or None if not found."""
    data_ingestion = self.portal.portal_catalog.getResultValue(
      portal_type='Data Ingestion',
      reference=reference)
    return data_ingestion

  def getDataStream(self, reference):
    """Return the Data Stream with *reference*, or None if not found."""
    data_stream = self.portal.portal_catalog.getResultValue(
      portal_type='Data Stream',
      reference=reference)
    return data_stream

  def ingestRequest(self, reference, eof, data_chunk, ingestion):
    """POST one base64-encoded *data_chunk* to *ingestion* policy under
    <reference><eof>/fake-size/fake-hash and wait for activities."""
    encoded_data_chunk = base64.b64encode(data_chunk)
    request = self.portal.REQUEST
    # only POST for Wendelin allowed
    request.environ["REQUEST_METHOD"] = 'POST'
    request.set('reference', reference + eof + self.SIZE_HASH)
    request.set('data_chunk', encoded_data_chunk)
    ingestion.ingest()
    self.tic()
    return

  def ingest(self, data_chunk, reference, extension):
    """Ingest *data_chunk* through the default ebulk policy and return the
    sanitized ingestion reference."""
    # Ensure a validated policy for this reference exists in the catalog;
    # the actual ingestion below goes through the default ebulk policy.
    self.getIngestionPolicy(reference, self.INGESTION_SCRIPT)
    ingestion_reference = self.getIngestionReference(reference, extension)
    # use default ebulk policy
    ingestion_policy = self.portal.portal_ingestion_policies.wendelin_embulk
    self.ingestRequest(ingestion_reference, self.SINGLE_INGESTION_END,
                       data_chunk, ingestion_policy)
    # sanitizeReference also asserts the Data Stream exists (getId call).
    _, ingestion_reference = self.sanitizeReference(ingestion_reference)
    return ingestion_reference

  def stepIngest(self, extension, delimiter):
    """Generate a random CSV file, ingest it, and check that the resulting
    Data Stream contains exactly the bytes of the file."""
    file_name = "file_name.csv"
    reference = self.getRandomReference()
    array = [[random.random() for _ in range(self.CHUNK_SIZE_CSV + 10)]
             for _ in range(self.CHUNK_SIZE_CSV + 10)]
    np.savetxt(file_name, array, delimiter=delimiter)
    try:
      with open(file_name, 'r') as csv_file:
        data_chunk = csv_file.read()
      ingestion_reference = self.ingest(data_chunk, reference, extension)
    finally:
      # Always clean up the temporary file, even if ingestion fails.
      if os.path.exists(file_name):
        os.remove(file_name)
    # test properly ingested
    data_ingestion = self.getDataIngestion(ingestion_reference)
    self.assertNotEqual(None, data_ingestion)
    data_ingestion_line = [x for x in data_ingestion.objectValues()
                           if x.getReference() == 'out_stream'][0]
    data_stream = data_ingestion_line.getAggregateValue(portal_type='Data Stream')
    self.assertNotEqual(None, data_stream)
    data_stream_data = data_stream.getData()
    self.assertEqual(data_chunk, data_stream_data)

  def test_01_DefaultEbulkIngestion(self):
    """
    Test default ingestion with ebulk too.
    """
    delimiter = ","
    extension = self.CSV
    self.stepIngest(extension, delimiter)
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Test Component" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>testDataLakeIngestion</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test.erp5.testDataLakeIngestion</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Test Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W:124, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
<string>W:122, 4: Unused variable \'now\' (unused-variable)</string>
<string>W:131, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:131, 76: Unused variable \'j\' (unused-variable)</string>
</tuple>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.patches.WorkflowTool"/>
</pickle>
<pickle>
<tuple>
<none/>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</tuple>
</pickle>
</record>
</ZopeData>
test.erp5.testDataLakeIngestion
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment