Commit e13aed4a authored by Klaus Wölfel

Support streaming case with new framework

parent ad57c970
operation = None
use = None
parameter_dict = {}
initial_product = context.getSpecialiseValue().getResourceValue()
for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
resource = analysis_line.getResourceValue()
if resource == initial_product:
use = analysis_line.getUse()
if resource is not None:
resource_portal_type = resource.getPortalType()
else:
@@ -11,10 +15,13 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
operation = analysis_line.getResourceValue()
else:
reference = analysis_line.getReference()
aggregate = analysis_line.getAggregateDataStreamValue() or \
aggregate = analysis_line.getAggregateDataSinkValue() or \
analysis_line.getAggregateDataArrayValue() or \
analysis_line.getAggregateDataDescriptorValue()
parameter_dict[reference] = aggregate
script_id = operation.getScriptId()
getattr(operation_analysis_line, script_id)(**parameter_dict)
context.stop()
# only stop batch ingestions
if use == "big_data/ingestion/batch":
context.stop()
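The script above collects each Data Analysis Line's aggregated item under the line's reference, invokes the Data Operation's script with those references as keyword arguments, and stops the analysis only for batch ingestions. A minimal stand-alone sketch of that dispatch pattern (the IngestStream class and its method name are made up for illustration; only the getattr(...)(**parameter_dict) call mirrors the script):

class IngestStream:
    # stands in for the document on which the data operation script is looked up
    def append_to_data_array(self, input_stream=None, output_array=None):
        return "appended %s to %s" % (input_stream, output_array)

# what the loop above builds: one keyword per analysis line reference
parameter_dict = {"input_stream": "data-stream-001",
                  "output_array": "data-array-001"}
script_id = "append_to_data_array"  # stands in for operation.getScriptId()
print(getattr(IngestStream(), script_id)(**parameter_dict))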
@@ -82,6 +82,7 @@
<value>
<list>
<string>my_title</string>
<string>my_reference</string>
<string>my_specialise_title</string>
<string>my_causality_title</string>
</list>
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ProxyField" module="Products.ERP5Form.ProxyField"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>delegated_list</string> </key>
<value>
<list/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>my_reference</string> </value>
</item>
<item>
<key> <string>message_values</string> </key>
<value>
<dictionary>
<item>
<key> <string>external_validator_failed</string> </key>
<value> <string>The input failed the external validator.</string> </value>
</item>
</dictionary>
</value>
</item>
<item>
<key> <string>overrides</string> </key>
<value>
<dictionary>
<item>
<key> <string>field_id</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>form_id</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>target</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</value>
</item>
<item>
<key> <string>tales</string> </key>
<value>
<dictionary>
<item>
<key> <string>field_id</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>form_id</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>target</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</value>
</item>
<item>
<key> <string>values</string> </key>
<value>
<dictionary>
<item>
<key> <string>field_id</string> </key>
<value> <string>my_view_mode_reference</string> </value>
</item>
<item>
<key> <string>form_id</string> </key>
<value> <string>Base_viewDMSFieldLibrary</string> </value>
</item>
<item>
<key> <string>target</string> </key>
<value> <string>Click to edit the target</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
from DateTime import DateTime
from Products.ZSQLCatalog.SQLCatalog import AndQuery, OrQuery, Query, SimpleQuery
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
now = DateTime()
query_dict = {
"portal_type": "Data Ingestion Line",
"stock.quantity": '!=0',
"resource_portal_type": "Data Product",
"simulation_state": "stopped"}
query = AndQuery(
Query(portal_type = "Data Ingestion Line"),
Query(**{"stock.quantity": "!=0"}),
Query(resource_portal_type = "Data Product"),
# Should be improved to support more than one analysis per ingestion
SimpleQuery(causality_related_relative_url = None),
OrQuery(
Query(simulation_state = "stopped",
use_relative_url = "use/big_data/ingestion/batch"),
Query(simulation_state = "started",
use_relative_url = "use/big_data/ingestion/stream")))
for movement in portal_catalog(**query_dict):
batch_relative_url = movement.getAggregateDataIngestionBatch()
if batch_relative_url is None:
for movement in portal_catalog(query):
if movement.DataIngestionLine_hasMissingRequiredItem():
raise ValueError("Transformation requires movement to have " +
"aggregated data ingestion batch")
data_ingestion = movement.getParentValue()
@@ -58,12 +65,20 @@ for movement in portal_catalog(**query_dict):
# ingestion line with the same resource. If it is an operation line
# then we search for an ingestion line with resource portal type
# Data Product
batch_relative_url = movement.getAggregateDataIngestionBatch()
if batch_relative_url is not None:
related_movement_list = portal_catalog(
portal_type="Data Ingestion Line",
aggregate_relative_url=batch_relative_url,
resource_relative_url = resource.getRelativeUrl())
else:
# get related movements only from current data ingestion
related_movement_list = movement.getParentValue().searchFolder(
portal_type="Data Ingestion Line",
resource_relative_url = resource.getRelativeUrl())
for related_movement in related_movement_list:
aggregate_set.update(related_movement.getAggregateSet())
if related_movement.getUse() == "big_data/ingestion/batch":
related_movement.getParentValue().deliver()
else:
# it is an output line
@@ -76,7 +91,7 @@ for movement in portal_catalog(**query_dict):
data_ingestion.getReference()),
version = '001')
item.validate()
aggregate_set.add(item)
aggregate_set.add(item.getRelativeUrl())
data_analysis.newContent(
portal_type = "Data Analysis Line",
@@ -84,8 +99,9 @@ for movement in portal_catalog(**query_dict):
reference = transformation_line.getReference(),
int_index = transformation_line.getIntIndex(),
resource_value = resource,
variation_category_list = transformation_line.getVariationCategoryList(),
quantity = quantity,
quantity_unit = transformation_line.getQuantityUnit(),
aggregate_value_set = aggregate_set)
aggregate_set = aggregate_set)
data_analysis.plan()
data_analysis.start()
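The hunk above looks up related ingestion lines either through a shared Data Ingestion Batch aggregate, when the movement has one, or from the movement's own parent Data Ingestion otherwise. A plain-Python sketch of that selection rule, with dicts standing in for catalog results (field names and values are illustrative only):

def related_lines(movement, all_lines):
    # same batch if the movement aggregates one, otherwise same parent ingestion
    batch = movement.get("batch")
    if batch is not None:
        return [line for line in all_lines
                if line.get("batch") == batch
                and line["resource"] == movement["resource"]]
    return [line for line in all_lines
            if line["parent"] == movement["parent"]
            and line["resource"] == movement["resource"]]

lines = [
    {"parent": "ingestion-1", "resource": "raw-data", "batch": "batch-A"},
    {"parent": "ingestion-2", "resource": "raw-data", "batch": "batch-A"},
    {"parent": "ingestion-3", "resource": "raw-data", "batch": None},
]
print(len(related_lines(lines[0], lines)))  # 2: both lines aggregate batch-A
print(len(related_lines(lines[2], lines)))  # 1: falls back to its own ingestion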
@@ -2,7 +2,6 @@ portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
for data_analysis in portal_catalog(portal_type = "Data Analysis",
simulation_state = "planned"):
data_analysis.start()
simulation_state = "started"):
data_analysis.activate(serialization_tag=str(data_analysis.getUid()))\
.DataAnalysis_executeDataOperation()
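The alarm above now defers DataAnalysis_executeDataOperation into an activity tagged with the analysis UID; activities that share a serialization tag are not executed in parallel, so a started analysis is never processed by two workers at once. A stand-alone sketch of that serialization idea using one lock per tag (illustrative only, not how CMFActivity implements it):

import threading
from collections import defaultdict

_tag_locks = defaultdict(threading.Lock)

def run_serialized(tag, job):
    # jobs with the same tag run one at a time; different tags may overlap
    with _tag_locks[tag]:
        job()

def execute_data_operation(uid):
    print("processing data analysis", uid)

run_serialized("42", lambda: execute_data_operation("42"))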
@@ -5,11 +5,14 @@ from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
portal_catalog = context.getPortalObject().portal_catalog
# search for dates older than one minute ago
# Only stop data ingestions if the related resource is configured for batch
# ingestion
old_start_date = addToDate(DateTime(), {'minute' : -1})
start_date_query = Query(**{'delivery.start_date': old_start_date, 'range': 'ngt'})
kw_dict = {"query": start_date_query,
"portal_type": "Data Ingestion",
"simulation_state": "started"}
"simulation_state": "started",
"use_relative_url": "use/big_data/ingestion/batch"}
parent_uid_list = [x.getUid() for x in portal_catalog(**kw_dict)]
@@ -53,10 +56,5 @@ if len(parent_uid_list) != 0:
# TODO: this should be implemented in transformation, not here
continue
# Only stop data ingestions if the related resource is configured for batch
# ingestion
# TODO: probably this should be done in another way
resource = data_ingestion_line.getResourceValue()
if "big_data/ingestion/batch_ingestion" in resource.getUseList():
data_ingestion.setStopDate(DateTime())
data_ingestion.stop()
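With use_relative_url added to the catalog query, the alarm above only stops started batch ingestions whose start date is at least one minute old; stream ingestions stay started regardless of age. A small sketch of that cutoff rule using the standard library (illustrative; the real script filters catalog results with DateTime and addToDate):

from datetime import datetime, timedelta

def should_stop(ingestion, now):
    # stop only started batch ingestions older than one minute
    return (ingestion["state"] == "started"
            and ingestion["use"] == "big_data/ingestion/batch"
            and ingestion["start_date"] <= now - timedelta(minutes=1))

now = datetime.now()
old_batch = {"state": "started", "use": "big_data/ingestion/batch",
             "start_date": now - timedelta(minutes=5)}
old_stream = {"state": "started", "use": "big_data/ingestion/stream",
              "start_date": now - timedelta(minutes=5)}
print(should_stop(old_batch, now), should_stop(old_stream, now))  # True False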