Commit 237ac09a authored by Ivan Tyagov's avatar Ivan Tyagov

erp5_wendelin: append the original data(msgpack) to data stream - without unpacking

See merge request !43
parents e571a68b fbc97e19
......@@ -30,7 +30,7 @@ from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions, PropertySheet
from erp5.component.document.BigFile import BigFile
from wendelin.bigarray.array_zodb import ZBigArray
from Products.ERP5.Document.File import _MARKER
from erp5.component.document.File import _MARKER
from ZPublisher import HTTPRangeSupport
from webdav.common import rfc1123_date
from DateTime import DateTime
......
......@@ -30,7 +30,7 @@ import hashlib
from BTrees.OOBTree import OOBTree
from BTrees.LOBTree import LOBTree
from AccessControl import ClassSecurityInfo
from Products.ERP5.Document.Document import Document
from erp5.component.document.Document import Document
from Products.ERP5Type import Permissions, PropertySheet
from Products.ERP5Type.BTreeData import PersistentString
from erp5.component.module.Log import log
......
......@@ -26,6 +26,9 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import msgpack
import struct
import numpy as np
from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions, PropertySheet
from erp5.component.document.BigFile import BigFile
......@@ -57,6 +60,33 @@ class DataStream(BigFile):
for chunk in data.iterate(start_offset, end_offset - start_offset):
yield chunk
def readMsgpackChunkList(self, start_offset, end_offset):
"""
Read chunks of msgpack data from a Data Stream and return (unpacked_data list, offset)
"""
unpacker = msgpack.Unpacker()
data = self._baseGetData()
pos = start_offset
data_list = []
for chunk in data.iterate(start_offset, end_offset - start_offset):
unpacker.feed(chunk)
while True:
pos = start_offset + unpacker.tell()
try:
#yield unpacker.unpack()
data_list.append(unpacker.unpack())
except msgpack.exceptions.OutOfData:
break
#raise StopIteration(pos)
return data_list, pos
def extractDateTime(self, date_time_holder):
if isinstance(date_time_holder, int):
return np.datetime64(date_time_holder, 's')
# if it is not in, we Expect msgpack.ExtType
s, ns = struct.unpack(">II", date_time_holder.data)
return np.datetime64(s, 's') + np.timedelta64(ns, 'ns')
def readChunkList(self, start_offset, end_offset):
"""
Read chunks of data from a Data Stream and return them.
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PyData Script" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_local_properties</string> </key>
<value>
<tuple>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>reference</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>comment</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
</tuple>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>data_chunk, out_stream={}</string> </value>
</item>
<item>
<key> <string>comment</string> </key>
<value> <string>This script is used with default simple Wendelin model defined in erp5_wendelin_data.\n
For other models it will need adjustments.</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value> <string>Ingestion operation script to be used with Fluentd Input Bin plugin: Append given data chunk to given data stream</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>DataIngestionLine_writeIngestionToDataStream</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>PyData Script</string> </value>
</item>
<item>
<key> <string>reference</string> </key>
<value> <string>DataIngestionLine_writeIngestionToDataStream</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string>DataIngestionLine_writeIngestionToDataStream</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
sensor_reference = reference.split('.')[0]
data_product_reference = reference.split('.')[1]
return {'resource_reference' : data_product_reference,
'specialise_reference': sensor_reference,
'reference': sensor_reference }
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PyData Script" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_local_properties</string> </key>
<value>
<tuple>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>reference</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>comment</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
</tuple>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>reference</string> </value>
</item>
<item>
<key> <string>comment</string> </key>
<value> <string>tag example: sensor_1.sample-data</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value> <string>tag example: sensor_1.sample-data</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>IngestionPolicy_parseSimpleIngestionTag</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>PyData Script</string> </value>
</item>
<item>
<key> <string>reference</string> </key>
<value> <string>IngestionPolicy_parseSimpleIngestionTag</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string>IngestionPolicy_parseSimpleIngestionTag</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -125,7 +125,7 @@ class Test(ERP5TypeTestCase):
# clean up
data_array.invalidate()
data_stream.setData('')
data_stream.setData(None)
self.tic()
......@@ -341,4 +341,117 @@ class Test(ERP5TypeTestCase):
reg = LinearRegression().fit(X, y)
predicted = reg.predict(np.array([[4, 10]]))
self.assertEqual(predicted.all(),np.array([27.]).all())
\ No newline at end of file
def test_09_IngestionFromFluentdStoreMsgpack(self, old_fluentd=False):
"""
Test ingestion using a POST Request containing a msgpack encoded message
simulating input from fluentd.
"""
from datetime import datetime, timedelta
import time
portal = self.portal
now = datetime.now()
reference="test_sensor.test_product"
title = reference
ingestion_policy = portal.portal_ingestion_policies['default']
# create related data supply and etc.
use_category = portal.restrictedTraverse("portal_categories/use/big_data/ingestion/stream")
data_operation = portal.restrictedTraverse("data_operation_module/wendelin_ingest_data")
# create Data Product
data_product = portal.data_product_module.newContent(
portal_type = "Data Product",
title = "Append to Data Stream",
reference = reference.split('.')[1])
data_product.setUseValue(use_category)
data_product.setAggregatedPortalTypeList(["Data Stream", "Progress Indicator"])
data_product.validate()
# create Data Supply
data_supply_kw = {'title': title,
'reference': reference.split('.')[0],
'version': '001',
'effective_date': now,
'expiration_date': now + timedelta(days=365)}
data_supply = portal.data_supply_module.newContent( \
portal_type='Data Supply', **data_supply_kw)
data_supply.validate()
# add ingestion line
data_supply_line_kw = {'title': 'Ingest Data',
'reference': 'ingestion_operation',
'int_index': 1,
'quantity': 1.0}
data_supply_line = data_supply.newContent(portal_type='Data Supply Line', **data_supply_line_kw)
data_supply_line.setResourceValue(data_operation)
# add append to Data Stream line
data_supply_line_kw = {'title': 'Data Stream',
'reference': 'out_stream',
'int_index': 2,
'quantity': 1.0}
data_supply_line = data_supply.newContent(portal_type='Data Supply Line', \
**data_supply_line_kw)
data_supply_line.setResourceValue(data_product)
data_supply_line.setUseValue(use_category)
self.tic()
data_list = []
int_date = int(time.mktime(now.timetuple()))
real_data = []
# create data for ingestion in [date, value] format
for x in range(0, 10001):
data_list = []
data_list = [int_date, x]
real_data.append(data_list)
int_date = int_date + 1000
# simulate fluentd
body = msgpack.packb(real_data, use_bin_type=True)
env = {'CONTENT_TYPE': 'application/octet-stream'}
path = ingestion_policy.getPath() + '/ingest?reference=' + reference
publish_kw = dict(user='ERP5TypeTestCase', env=env,
request_method='POST', stdin=StringIO(body))
response = self.publish(path, **publish_kw)
self.assertEqual(200, response.getStatus())
self.tic()
# get related Data ingestion
data_ingestion = data_supply.Base_getRelatedObjectList(portal_type='Data Ingestion')[0]
self.assertNotEqual(None, data_ingestion)
data_ingestion_line = [x for x in data_ingestion.objectValues() if x.getReference() == 'out_stream'][0]
data_stream = data_ingestion_line.getAggregateValue()
self.assertEqual('Data Stream', data_stream.getPortalType())
data_stream_data = data_stream.getData()
# body is msgpacked real data.
self.assertEqual(body, data_stream_data)
# unpack data
start = 0
end = len(data_stream_data)
unpacked, end = data_stream.readMsgpackChunkList(start, end)
# compare unpacked data with real data
self.assertEqual([real_data], unpacked)
# extract dates and compare with real dates
f = data_stream.extractDateTime
for i in range(0, len(unpacked[0])):
self.assertEqual(np.datetime64(real_data[i][0], 's'), f(unpacked[0][i][0]))
# clean up
data_stream.setData(None)
self.tic()
portal_callables/DataIngestionLine_writeFluentdIngestionToDataStream
portal_callables/IngestionPolicy_parseSimpleFluentdTag
\ No newline at end of file
portal_callables/IngestionPolicy_parseSimpleFluentdTag
portal_callables/DataIngestionLine_writeIngestionToDataStream
portal_callables/IngestionPolicy_parseSimpleIngestionTag
\ No newline at end of file
......@@ -3,7 +3,9 @@ web_page_module/wendelin_js
portal_gadgets/WendelinInformationGadget
portal_ingestion_policies/default_http_json
portal_ingestion_policies/default_http_json/**
portal_ingestion_policies/default
data_supply_module/default_http_json
data_supply_module/default_http_json/**
data_product_module/default_http_json
data_product_module/default_http_json/**
\ No newline at end of file
data_product_module/default_http_json/**
data_operation_module/wendelin_ingest_data
\ No newline at end of file
data_operation_module/wendelin_ingest_data
data_product_module/default_http_json
data_product_module/default_http_json/**
data_supply_module/default_http_json
data_supply_module/default_http_json/**
portal_alarms/wendelin_handle_analysis
portal_callables/DataIngestionLine_writeFluentdIngestionToDataStream
portal_callables/DataIngestionLine_writeIngestionToDataStream
portal_callables/IngestionPolicy_parseSimpleFluentdTag
portal_callables/IngestionPolicy_parseSimpleIngestionTag
portal_categories/business_application
portal_categories/business_application/**
portal_categories/quantity_unit
......@@ -17,6 +20,7 @@ portal_categories/use
portal_categories/use/**
portal_gadgets/WendelinInformationGadget
portal_gadgets/WendelinInformationGadget/**
portal_ingestion_policies/default
portal_ingestion_policies/default_http_json
portal_ingestion_policies/default_http_json/**
web_page_module/wendelin_information_gadget.html
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment