Commit 68538ec9 authored by Paul Graydon

ors_wendelin: Update ORS ingestion process

parent 8d7fe6ae
@@ -72,7 +72,7 @@
 </item>
 <item>
 <key> <string>script_id</string> </key>
-<value> <string>DataAnalysisLine_calculateKPI</string> </value>
+<value> <string>DataAnalysisLine_calculateOrsKpi</string> </value>
 </item>
 <item>
 <key> <string>title</string> </key>
@@ -152,7 +152,7 @@
 </item>
 <item>
 <key> <string>serial</string> </key>
-<value> <string>1007.25652.35251.43929</string> </value>
+<value> <string>1017.25284.41439.37888</string> </value>
 </item>
 <item>
 <key> <string>state</string> </key>
@@ -172,7 +172,7 @@
 </tuple>
 <state>
 <tuple>
-<float>1680177876.53</float>
+<float>1718889733.66</float>
 <string>UTC</string>
 </tuple>
 </state>
...
@@ -8,22 +8,25 @@ total_size = in_data_stream.getSize()
 if start >= total_size:
   return
 
-chunk_size = 1024*1024
-if start + chunk_size > total_size:
-  end = total_size
-else:
-  end = start + chunk_size
-chunk_data = "".join(in_data_stream.readChunkList(start, end))
-# last one maybe a not valide json
-split_chunk_data = chunk_data.splitlines()
-if len(split_chunk_data) < 2:
+chunk_size = 1024 * 1024
+end = min(start + chunk_size, total_size)
+log_data = ''.join(in_data_stream.readChunkList(start, end))
+
+# Some lines may contain several contiguous JSON objects
+# Normalize to one JSON object per line
+newlines_offset = log_data.count('}{')
+log_data = log_data.replace('}{', '}\n{')
+
+# Last chunk may be an incomplete JSON object: leave it for next iteration
+log_data_lines = log_data.splitlines()
+if len(log_data_lines) < 2:
   return
-chunk_data = '\n'.join(split_chunk_data[:-1])
-end = start + len(chunk_data) + 1
+log_data = '\n'.join(log_data_lines[:-1])
+end = start + len(log_data) - newlines_offset + 1
 
-chunk_data = chunk_data.decode('utf8')
+log_data = log_data.decode('utf8')
 
 e_rab_data_array = None
 e_utran_data_array = None
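The offset bookkeeping in this hunk is subtle: inserting a newline for every '}{' pair lengthens the buffer, so the next read offset must subtract newlines_offset to stay aligned with the underlying stream, while the final +1 skips the separator dropped when the last (possibly truncated) line is held back. A standalone sketch of the same logic, with a plain string standing in for the Wendelin data stream (next_complete_lines is a hypothetical helper, not part of the commit):

def next_complete_lines(stream, start, chunk_size=1024 * 1024):
    end = min(start + chunk_size, len(stream))
    log_data = stream[start:end]

    # Split contiguous JSON objects onto separate lines, remembering how
    # many separators we inserted so stream offsets stay consistent.
    newlines_offset = log_data.count('}{')
    log_data = log_data.replace('}{', '}\n{')

    lines = log_data.splitlines()
    if len(lines) < 2:
        return None, start  # not enough data yet; retry from the same offset
    complete = '\n'.join(lines[:-1])  # last line may be truncated mid-object

    # Advance past consumed stream bytes only: inserted newlines do not
    # exist in the stream, and +1 skips the dropped trailing separator.
    next_start = start + len(complete) - newlines_offset + 1
    return complete, next_start

stream = '{"a": 1}{"b": 2}\n{"c": 3}{"d": 4'
chunk, pos = next_complete_lines(stream, 0, chunk_size=32)
print(chunk)  # {"a": 1}\n{"b": 2}\n{"c": 3}
print(pos)    # 25 -- stream[25:] is the incomplete '{"d": 4'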
@@ -34,7 +37,12 @@ for array in out_array:
   if array['variation'] == 'e_utran':
     e_utran_data_array = array['Data Array']
 
-vt, vInititialEPSBEstabSR, vAddedEPSBEstabSR, evt, vIPThp_qci = context.Base_getORSKPIValue(chunk_data)
+try:
+  vt, vInititialEPSBEstabSR, vAddedEPSBEstabSR, evt, vIPThp_qci = context.Base_getORSKPIValue(log_data)
+except AssertionError:
+  # If some data is invalid: ignore data chunk and move on rather than crashing the data analysis activity
+  progress_indicator.setIntOffsetIndex(end)
+  return
 
 e_rab_dtype = np.dtype([
   ('vt', 'float'),
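The try/except above encodes a deliberate policy: on malformed input, record the chunk as consumed and return, rather than letting the activity fail and retry the same bytes forever. A schematic of that pattern (compute_kpi and process_chunk are stand-ins; only setIntOffsetIndex comes from the commit):

def process_chunk(log_data, end, progress_indicator, compute_kpi):
    try:
        kpi = compute_kpi(log_data)  # stands in for Base_getORSKPIValue
    except AssertionError:
        # Mark the bad chunk as consumed so the next run starts after it,
        # instead of re-reading (and re-failing on) the same bytes.
        progress_indicator.setIntOffsetIndex(end)
        return None
    return kpi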
@@ -44,7 +52,7 @@ e_rab_dtype = np.dtype([
   ('vAddedEPSBEstabSR_hi', 'float64'),
 ])
 
-e_utran_dtype =np.dtype([
+e_utran_dtype = np.dtype([
   ('evt', 'float'),
   ('dl_lo', 'float64'),
   ('dl_hi', 'float64'),
@@ -57,7 +65,6 @@ if not e_rab_array:
   e_rab_array = e_rab_data_array.initArray(shape=(0,), dtype=e_rab_dtype)
 e_rab_array_data = []
 
-e_utran_array = e_utran_data_array.getArray()
 if not e_utran_array:
   e_utran_array = e_utran_data_array.initArray(shape=(0,), dtype=e_utran_dtype )
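For reference, e_rab_dtype and e_utran_dtype above are NumPy structured dtypes: each KPI sample is a record of named float fields, and the Data Array starts empty and grows by appending such records. A minimal illustration in plain NumPy (field names shortened, not the commit's exact columns):

import numpy as np

# Structured dtype in the style of e_rab_dtype above
kpi_dtype = np.dtype([
    ('vt', 'float'),    # measurement timestamp
    ('lo', 'float64'),  # lower confidence bound
    ('hi', 'float64'),  # upper confidence bound
])

# Start from an empty array, as initArray(shape=(0,), ...) does in Wendelin
rows = np.zeros((0,), dtype=kpi_dtype)

# Append one record per computed KPI sample
new_rows = np.array([(1718889733.66, 97.5, 99.1)], dtype=kpi_dtype)
rows = np.concatenate([rows, new_rows])

print(rows['vt'], rows['hi'])  # fields are accessed by name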
...
@@ -77,15 +77,15 @@
 </item>
 <item>
 <key> <string>id</string> </key>
-<value> <string>DataAnalysisLine_calculateKPI</string> </value>
+<value> <string>DataAnalysisLine_calculateOrsKpi</string> </value>
 </item>
 <item>
 <key> <string>reference</string> </key>
-<value> <string>DataAnalysisLine_calculateKPI</string> </value>
+<value> <string>DataAnalysisLine_calculateOrsKpi</string> </value>
 </item>
 <item>
 <key> <string>title</string> </key>
-<value> <string>DataAnalysisLine_calculateKPI</string> </value>
+<value> <string>DataAnalysisLine_calculateOrsKpi</string> </value>
 </item>
 </dictionary>
 </pickle>
...
"""
This script is used during the ingestion of ORS log files through fluentd.
It assumes data comes encoded in the following format: msgpack(timestamp, data).
It will first unpack the msgpack, then remove the first item of the tuple (timestamp),
get the actual data stored under the 'log' key and append the corresponding string to "Data Stream".
"""
out_stream["Data Stream"].appendData('\n'.join([c[1]['log'] for c in context.unpack(data_chunk)]))
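Here context.unpack is Wendelin's helper for iterating over the MessagePack-framed records in the chunk; each record is a (timestamp, data) tuple with the log line under the 'log' key. An equivalent standalone sketch using the msgpack library directly (the sample records are made up for illustration):

import msgpack

# Simulate what fluentd forwards: a chunk of msgpack-encoded
# (timestamp, record) tuples, where the log line sits under 'log'.
records = [
    (1718889733, {'log': '{"event": "x.connect"}'}),
    (1718889734, {'log': '{"event": "x.disconnect"}'}),
]
data_chunk = b''.join(msgpack.packb(r) for r in records)

# Unpack the stream and keep only the 'log' payloads, one per line,
# mirroring the appendData() call in the script above.
unpacker = msgpack.Unpacker(raw=False)
unpacker.feed(data_chunk)
payload = '\n'.join(item[1]['log'] for item in unpacker)
print(payload)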
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PyData Script" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="_reconstructor" module="copy_reg"/>
</klass>
<tuple>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
<global name="object" module="__builtin__"/>
<none/>
</tuple>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_local_properties</string> </key>
<value>
<tuple>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>reference</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>comment</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
</tuple>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>data_chunk, out_stream={}</string> </value>
</item>
<item>
<key> <string>comment</string> </key>
<value> <string>This script is used with default simple Wendelin model defined in erp5_wendelin_data.\n
For other models it will need adjustments.</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value> <string>Ingestion operation script to be used for ingestion of ORS logs through fluentd: unpack the data from MsgPack format and append it to the data stream.</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>DataIngestionLine_writeOrsLogFluentdIngestionToDataStream</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>PyData Script</string> </value>
</item>
<item>
<key> <string>reference</string> </key>
<value> <string>DataIngestionLine_writeOrsLogFluentdIngestionToDataStream</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string>DataIngestionLine_writeOrsLogFluentdIngestionToDataStream</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
@@ -52,15 +52,17 @@
 </tuple>
 </value>
 </item>
+<item>
+<key> <string>data_operation_script_id</string> </key>
+<value> <string>IngestionPolicy_getIngestionOperationAndParameterDict</string> </value>
+</item>
 <item>
 <key> <string>default_reference</string> </key>
 <value> <string>ors_ingestion</string> </value>
 </item>
 <item>
 <key> <string>description</string> </key>
-<value>
-<none/>
-</value>
+<value> <string>Handles ingestion of packets of ORS enb.xlog data forwarded through fluentd in MessagePack format. </string> </value>
 </item>
 <item>
 <key> <string>id</string> </key>
@@ -148,7 +150,7 @@
 </item>
 <item>
 <key> <string>serial</string> </key>
-<value> <string>1007.25441.42822.58419</string> </value>
+<value> <string>1017.20946.52185.63385</string> </value>
 </item>
 <item>
 <key> <string>state</string> </key>
@@ -168,7 +170,7 @@
 </tuple>
 <state>
 <tuple>
-<float>1680168713.51</float>
+<float>1718612396.27</float>
 <string>UTC</string>
 </tuple>
 </state>
...
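The ingestion policy's new description documents the transport: ORS enb.xlog packets arrive through fluentd as MessagePack events, and IngestionPolicy_parseORSTag derives routing information from the fluentd tag. For local testing, one way to emit compatible events from Python is the fluent-logger package; the tag, host, and payload below are made-up placeholders, and the real ORS deployment ships logs with its own fluentd configuration:

from fluent import sender

# Hypothetical tag; the Wendelin side parses it with
# IngestionPolicy_parseORSTag to route the data.
logger = sender.FluentSender('ors.example', host='localhost', port=24224)

# Each emit() becomes a msgpack-encoded event; the log line travels
# under the 'log' key, as the ingestion script above expects.
logger.emit('enb_xlog', {'log': '{"meta": {"event": "start"}}'})
logger.close()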
@@ -33,12 +33,12 @@ data_supply.newContent(
 data_supply.newContent(
   portal_type='Data Supply Line',
-  title='Ingest Data',
+  title='Ingest ORS Log Data',
   reference='ingestion_operation',
   quantity=1,
   int_index=1,
   aggregate_value=context,
-  resource='data_operation_module/wendelin_ingest_data'
+  resource='data_operation_module/ors_wendelin_ingest_log_data'
 ).validate()
 data_supply.validate()
...
@@ -4,7 +4,8 @@ data_product_module/ors_kpi/**
 data_transformation_module/ors_data_transformation
 data_transformation_module/ors_data_transformation/**
 organisation_module/ors_*
-portal_callables/DataAnalysisLine_calculateKPI
+portal_callables/DataAnalysisLine_calculateOrsKpi
+portal_callables/DataIngestionLine_writeOrsLogFluentdIngestionToDataStream
 portal_callables/IngestionPolicy_parseORSTag
 portal_ingestion_policies/ors_ingestion
 web_page_module/ndarray_bundle.js
...