Commit eff63642 authored by Ivan Tyagov

Transformation at append of data to Data Stream is triggered by an activity...

Transformation on append of data to a Data Stream is now triggered by an activity rather than at append time, so this test is redundant.
parent 8bde5dac
......@@ -26,10 +26,7 @@
##############################################################################
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript
from wendelin.bigarray.array_zodb import ZBigArray
#from DateTime import DateTime
#from zExceptions import NotFound
import msgpack
import numpy as np
import string
......@@ -116,100 +113,6 @@ class Test(ERP5TypeTestCase):
#request.set('reference', reference + 'not_existing')
#self.assertRaises(NotFound, ingestion_policy.ingest)
def test_01_1_IngestionTail(self):
  """
  Test real time conversion to a numpy array by appending data to a data stream.

  Steps:
    1. Append CSV lines holding the integers 0..10000 to a new Data Stream
       and check that the matching Data Array is still empty.
    2. Install a custom DataStream_transformTail script which schedules
       DataStream_readChunkListAndTransform through an activity.
    3. Append the integers 10001..200000 and check that the Data Array
       equals np.arange(10001, 200001).
    4. Compute per-slice averages of the array through activities and
       compare them with averages calculated directly with numpy.
  """
  portal = self.portal
  reference = getRandomString()

  def buildCSV(first, last):
    # ten comma separated integers per line; the trailing newline makes
    # sure the tail of the data is also a full line
    line_list = [','.join([str(x) for x in row])
                 for row in chunks(range(first, last), 10)]
    return '\n'.join(line_list) + '\n'

  data_stream = portal.data_stream_module.newContent(
                  portal_type = 'Data Stream',
                  reference = reference)
  data_stream.validate()
  data_stream.appendData(buildCSV(0, 10001))

  data_array = portal.data_array_module.newContent(
                 portal_type = 'Data Array',
                 reference = reference)
  data_array.validate()
  self.tic()

  # nothing has been transformed yet
  self.assertEqual(None, data_array.getArray())

  # override DataStream_transformTail to actually do transformation on appendData
  tail_start = data_stream.getSize()
  script_id = 'DataStream_transformTail'
  script_content_list = ["start_offset, end_offset", """
# created by testWendelin.test_01_1_IngestionTail
start = %s
end = %s
context.activate().DataStream_readChunkListAndTransform( \
start, \
end, \
%s, \
transform_script_id = 'DataStream_copyCSVToDataArray', \
data_array_reference=context.getReference())""" %(tail_start, tail_start + 10450, 10450)]
  createZODBPythonScript(
    portal.portal_skins.custom,
    script_id,
    *script_content_list)
  try:
    self.tic()

    # append data to Data Stream and check array which should be fed now.
    data_stream.appendData(buildCSV(10001, 200001))
    self.tic()

    # test that extracted array contains same values as input CSV
    zarray = data_array.getArray()
    self.assertNotEqual(None, data_array.getArray())
    expected_numpy_array = np.arange(10001, 200001)
    self.assertEqual(np.average(zarray), np.average(expected_numpy_array))
    self.assertTrue(np.array_equal(zarray, expected_numpy_array))
  finally:
    # clean up script even when an assertion above failed, otherwise the
    # override stays in portal_skins.custom and can affect other tests
    portal.portal_skins.custom.manage_delObjects([script_id,])
    self.tic()

  # analyze numpy array using activities.
  active_process = portal.portal_activities.newActiveProcess()
  zarray = data_array.getArray()
  max_elements = zarray.shape[0]
  expected_result_list = []
  jobs = 15
  # floor division: slice bounds must stay integers
  offset = max_elements // jobs
  start = 0
  end = start + offset
  for _ in range(jobs):
    # calculate directly expectations
    expected_result_list.append(np.average(expected_numpy_array[start:end]))
    data_array.activate(
      active_process = active_process.getPath(),
      activity='SQLQueue').DataArray_calculateArraySliceAverageAndStore(start, end)
    data_array.log('%s %s' %(start, end))
    start += offset
    end += offset
  self.tic()

  result_list = [x.getResult() for x in active_process.getResultList()]
  self.assertSameSet(result_list, expected_result_list)
  # final reduce job to a number (the value itself is not checked)
  sum(result_list)
def test_01_02_ParallelTransformation(self):
"""
test parallel execution.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment