Commit b5c0c55b authored by Klaus Wölfel's avatar Klaus Wölfel

allow multiple input lines per ingestion

parent caaf87de
...@@ -16,6 +16,61 @@ data_ingestion = portal_catalog.getResultValue( ...@@ -16,6 +16,61 @@ data_ingestion = portal_catalog.getResultValue(
simulation_state = 'started', simulation_state = 'started',
reference =data_ingestion_reference) reference =data_ingestion_reference)
def init_input_line(input_line, operation_line):
# copy device and configuration from operation line to input line
input_line.setAggregateSet(
input_line.getAggregateList() + operation_line.getAggregateList())
# Check if we have a batch referece
data_ingestion_batch_reference = movement_dict.get(
'aggregate_data_ingestion_batch_reference',
None)
data_ingestion_batch_id = "%s-%s" %(today_string,
data_ingestion_batch_reference)
data_sink = None
if data_ingestion_batch_reference is not None:
data_ingestion_batch = portal_catalog.getResultValue(
portal_type = "Data Ingestion Batch",
reference = data_ingestion_batch_reference)
if data_ingestion_batch is None:
data_ingestion_batch = portal.data_ingestion_batch_module.get(
data_ingestion_batch_id)
if data_ingestion_batch is None:
data_ingestion_batch = portal.data_ingestion_batch_module.newContent(
id = data_ingestion_batch_id,
portal_type = "Data Ingestion Batch",
reference = data_ingestion_batch_reference)
else:
previous_data_ingestion_line = portal_catalog.getResultValue(
portal_type = "Data Ingestion Line",
resource_reference = resource_reference,
aggregate_uid = data_ingestion_batch.getUid())
if previous_data_ingestion_line is not None:
data_sink = previous_data_ingestion_line\
.getAggregateDataSinkValue()
input_line.setDefaultAggregateValue(data_ingestion_batch)
data_product = portal.portal_catalog.getResultValue(
portal_type = "Data Product",
reference = resource_reference)
data_sink_type = data_product.getAggregatedPortalType()
if data_sink is None:
data_sink = portal.getDefaultModule(data_sink_type).newContent(
portal_type = data_sink_type,
reference = "%s-%s" %(data_ingestion_reference, resource_reference))
data_sink.validate()
input_line.setDefaultAggregateValue(data_sink)
input_line.setQuantity(1)
if data_ingestion is None: if data_ingestion is None:
document = portal.data_ingestion_module.get(data_ingestion_id) document = portal.data_ingestion_module.get(data_ingestion_id)
if (document is not None) and document.getSimulationState() == 'started': if (document is not None) and document.getSimulationState() == 'started':
...@@ -78,58 +133,7 @@ if data_ingestion is None: ...@@ -78,58 +133,7 @@ if data_ingestion is None:
# we set quantity=0 for the empty line # we set quantity=0 for the empty line
current_line.setQuantity(0) current_line.setQuantity(0)
# copy device and configuration from operation line to input line init_input_line(input_line, operation_line)
input_line.setAggregateSet(
input_line.getAggregateList() + operation_line.getAggregateList())
# Check if we have a batch referece
data_ingestion_batch_reference = movement_dict.get(
'aggregate_data_ingestion_batch_reference',
None)
data_ingestion_batch_id = "%s-%s" %(today_string,
data_ingestion_batch_reference)
data_sink = None
if data_ingestion_batch_reference is not None:
data_ingestion_batch = portal_catalog.getResultValue(
portal_type = "Data Ingestion Batch",
reference = data_ingestion_batch_reference)
if data_ingestion_batch is None:
data_ingestion_batch = portal.data_ingestion_batch_module.get(
data_ingestion_batch_id)
if data_ingestion_batch is None:
data_ingestion_batch = portal.data_ingestion_batch_module.newContent(
id = data_ingestion_batch_id,
portal_type = "Data Ingestion Batch",
reference = data_ingestion_batch_reference)
else:
previous_data_ingestion_line = portal_catalog.getResultValue(
portal_type = "Data Ingestion Line",
resource_reference = resource_reference,
aggregate_uid = data_ingestion_batch.getUid())
if previous_data_ingestion_line is not None:
data_sink = previous_data_ingestion_line\
.getAggregateDataSinkValue()
input_line.setDefaultAggregateValue(data_ingestion_batch)
data_product = portal.portal_catalog.getResultValue(
portal_type = "Data Product",
reference = resource_reference)
data_sink_type = data_product.getAggregatedPortalType()
if data_sink is None:
data_sink = portal.getDefaultModule(data_sink_type).newContent(
portal_type = data_sink_type,
reference = data_ingestion_reference)
data_sink.validate()
input_line.setDefaultAggregateValue(data_sink)
data_ingestion.start() data_ingestion.start()
else: else:
...@@ -139,6 +143,9 @@ else: ...@@ -139,6 +143,9 @@ else:
input_line = line input_line = line
elif line.getResourceValue().getPortalType() == "Data Operation": elif line.getResourceValue().getPortalType() == "Data Operation":
operation_line = line operation_line = line
if input_line.getQuantity() == 0:
init_input_line(input_line, operation_line)
data_operation = operation_line.getResourceValue() data_operation = operation_line.getResourceValue()
parameter_dict = { parameter_dict = {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment