Commit 4adc8720 authored by Eteri's avatar Eteri Committed by Eteri

erp5_wendelin: test unpacking mgspack data and extracting dates

parent eee332f8
...@@ -349,10 +349,11 @@ class Test(ERP5TypeTestCase): ...@@ -349,10 +349,11 @@ class Test(ERP5TypeTestCase):
Test ingestion using a POST Request containing a msgpack encoded message Test ingestion using a POST Request containing a msgpack encoded message
simulating input from fluentd. simulating input from fluentd.
""" """
from DateTime import DateTime from datetime import datetime, timedelta
import time
portal = self.portal portal = self.portal
now = DateTime() now = datetime.now()
reference="test_sensor.test_product" reference="test_sensor.test_product"
title = reference title = reference
...@@ -378,7 +379,7 @@ class Test(ERP5TypeTestCase): ...@@ -378,7 +379,7 @@ class Test(ERP5TypeTestCase):
'reference': reference.split('.')[0], 'reference': reference.split('.')[0],
'version': '001', 'version': '001',
'effective_date': now, 'effective_date': now,
'expiration_date': now + 365*10} 'expiration_date': now + timedelta(days=365)}
data_supply = portal.data_supply_module.newContent( \ data_supply = portal.data_supply_module.newContent( \
portal_type='Data Supply', **data_supply_kw) portal_type='Data Supply', **data_supply_kw)
data_supply.validate() data_supply.validate()
...@@ -403,18 +404,19 @@ class Test(ERP5TypeTestCase): ...@@ -403,18 +404,19 @@ class Test(ERP5TypeTestCase):
self.tic() self.tic()
portal.log("data_supply = ", data_supply) data_list = []
portal.log("ingestion_policy = ", ingestion_policy) int_date = int(time.mktime(now.timetuple()))
real_data = []
number_string_list = [] # create data for ingestion in [date, value] format
for my_list in list(chunks(range(0, 100001), 10)): for x in range(0, 10001):
number_string_list.append(','.join([str(x) for x in my_list])) data_list = []
real_data = '\n'.join(number_string_list) data_list = [int_date, x]
# make sure real_data tail is also a full line real_data.append(data_list)
real_data += '\n' int_date = int_date + 1000
# simulate fluentd # simulate fluentd
body = msgpack.packb([0, real_data], use_bin_type=True) body = msgpack.packb(real_data, use_bin_type=True)
env = {'CONTENT_TYPE': 'application/octet-stream'} env = {'CONTENT_TYPE': 'application/octet-stream'}
path = ingestion_policy.getPath() + '/ingest?reference=' + reference path = ingestion_policy.getPath() + '/ingest?reference=' + reference
...@@ -438,6 +440,18 @@ class Test(ERP5TypeTestCase): ...@@ -438,6 +440,18 @@ class Test(ERP5TypeTestCase):
# body is msgpacked real data. # body is msgpacked real data.
self.assertEqual(body, data_stream_data) self.assertEqual(body, data_stream_data)
# unpack data
start = 0
end = len(data_stream_data)
unpacked, end = data_stream.readMsgpackChunkList(start, end)
# compare unpacked data with real data
self.assertEqual([real_data], unpacked)
# extract dates and compare with real dates
f = data_stream.extractDateTime
for i in range(0, len(unpacked[0])):
self.assertEqual(np.datetime64(real_data[i][0], 's'), f(unpacked[0][i][0]))
# clean up # clean up
data_stream.setData(None) data_stream.setData(None)
self.tic() self.tic()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment