Commit 1ca48658 authored by Ivan Tyagov's avatar Ivan Tyagov

Use current way of ingesting added by JM.

Conflicts:
	bt5/erp5_wendelin/DocumentTemplateItem/portal_components/document.erp5.IngestionPolicy.py
	bt5/erp5_wendelin/TestTemplateItem/portal_components/test.erp5.testWendelin.py
parents 499d03e1 17800924
......@@ -53,12 +53,23 @@ class IngestionPolicy(Folder):
return self.portal_ingestion_policies.unpack(data)
security.declarePublic('ingest')
def ingest(self, **kw):
def ingest(self, REQUEST, **kw):
"""
Ingest chunk of raw data either from a Sensor or any of DAUs.
"""
if self.REQUEST.method != 'POST':
raise BadRequest('Only POST request is allowed.')
environ = REQUEST.environ
method = environ.pop('REQUEST_METHOD')
try:
if method != 'POST':
raise BadRequest('Only POST request is allowed.')
if REQUEST._file is not None:
assert not REQUEST.form, REQUEST.form # Are cgi and HTTPRequest fixed ?
# Query string was ignored so parse again, faking a GET request.
# Such POST is legit: https://stackoverflow.com/a/14710450
REQUEST.processInputs()
REQUEST.form['data_chunk'] = REQUEST._file.read()
finally:
environ['REQUEST_METHOD'] = method
tag_parsing_script_id = self.getScriptId()
......@@ -105,4 +116,4 @@ class IngestionPolicy(Folder):
if ingestion_script is None:
raise NotFound('No such ingestion script found: %s' %ingestion_script_id)
ingestion_script(data_chunk=data_chunk, **parameter_dict)
ingestion_script(data_chunk=data_chunk, **parameter_dict)
\ No newline at end of file
......@@ -27,10 +27,13 @@
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from wendelin.bigarray.array_zodb import ZBigArray
from DateTime import DateTime
from cStringIO import StringIO
import msgpack
import numpy as np
import string
import random
import urllib
def getRandomString():
return 'test_%s' %''.join([random.choice(string.ascii_letters + string.digits) \
......@@ -49,14 +52,14 @@ class Test(ERP5TypeTestCase):
def getTitle(self):
return "Wendelin Test"
def test_01_IngestionFromFluentd(self):
def test_01_IngestionFromFluentd(self, old_fluentd=False):
"""
Test ingestion using a POST Request containing a msgpack encoded message
simulating input from fluentd.
"""
portal = self.portal
request = portal.REQUEST
ingestion_policy = portal.restrictedTraverse("portal_ingestion_policies/wendelin_1")
data_supply = portal.restrictedTraverse("data_supply_module/wendelin_1")
reference = 'wendelin-default-ingestion'
......@@ -67,19 +70,28 @@ class Test(ERP5TypeTestCase):
# make sure real_data tail is also a full line
real_data += '\n'
# simulate fluentd by setting proper values in REQUEST
request.method = 'POST'
data_chunk = msgpack.packb([0, real_data], use_bin_type=True)
request.set('reference', reference)
request.set('data_chunk', data_chunk)
ingestion_policy.ingest()
self.tic()
# simulate fluentd
body = msgpack.packb([0, real_data], use_bin_type=True)
if old_fluentd:
env = {'CONTENT_TYPE': 'application/x-www-form-urlencoded'}
body = urllib.urlencode({'data_chunk': body})
else:
env = {'CONTENT_TYPE': 'application/octet-stream'}
path = ingestion_policy.getPath() + '/ingest?reference=' + reference
publish_kw = dict(basic='ERP5TypeTestCase:', env=env,
request_method='POST', stdin=StringIO(body))
response = self.publish(path, **publish_kw)
# Due to inconsistencies in the Zope framework,
# a normal instance returns 204. As explained at
# http://blog.ploeh.dk/2013/04/30/rest-lesson-learned-avoid-204-responses/
# turning 200 into 204 automatically when the body is empty is questionable.
self.assertEqual(200, response.getStatus())
# get related Data ingestion
data_ingestion = data_supply.Base_getRelatedObjectList(portal_type='Data Ingestion')[0]
self.assertNotEqual(None, data_ingestion)
data_ingestion_line = [x for x in data_ingestion.objectValues() if x.getReference() == 'out_data_stream'][0]
data_stream = data_ingestion_line.getAggregateValue()
self.assertEqual('Data Stream', data_stream.getPortalType())
......@@ -92,7 +104,7 @@ class Test(ERP5TypeTestCase):
reference = reference)
data_array.validate()
self.tic()
data_stream.DataStream_transform(\
chunk_length = 10450, \
transform_script_id = 'DataStream_copyCSVToDataArray',
......@@ -103,15 +115,14 @@ class Test(ERP5TypeTestCase):
zarray = data_array.getArray()
self.assertEqual(np.average(zarray), np.average(np.arange(100001)))
self.assertTrue(np.array_equal(zarray, np.arange(100001)))
# clean up
data_array.invalidate()
data_stream.setData('')
self.tic()
# test ingesting with bad reference and raise of NotFound
#request.set('reference', reference + 'not_existing')
#self.assertRaises(NotFound, ingestion_policy.ingest)
def test_01_1_IngestionFromOldFluentd(self):
self.test_01_IngestionFromFluentd(True)
def test_01_02_ParallelTransformation(self):
"""
......@@ -243,4 +254,4 @@ class Test(ERP5TypeTestCase):
# test as sequence
bucket = bucket_stream.getBucketItemSequence(start_key=10, count=1)[0]
self.assertEqual(100000, bucket[1].value)
\ No newline at end of file
self.assertEqual(100000, bucket[1].value)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment