From 24511e089f92f861e1c78b1e83464136db3bf48c Mon Sep 17 00:00:00 2001
From: Roque Porchetto <roque.porchetto@nexedi.com>
Date: Thu, 16 Jul 2020 19:30:21 +0000
Subject: [PATCH] erp5_wendelin_data_lake_ingestion: use linked data streams
 for split ingestions - test updated

---
 .../ERP5Site_stopIngestionList.py             | 10 ++++++++++
 .../test.erp5.testDataLakeIngestion.py        | 20 ++++++++++++++++++-
 .../test.erp5.testDataLakeIngestion.xml       |  4 ++--
 3 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/ERP5Site_stopIngestionList.py b/bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/ERP5Site_stopIngestionList.py
index 592901c..dd3ef72 100644
--- a/bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/ERP5Site_stopIngestionList.py
+++ b/bt5/erp5_wendelin_data_lake_ingestion/SkinTemplateItem/portal_skins/erp5_wendelin_data_lake/ERP5Site_stopIngestionList.py
@@ -93,6 +93,16 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
               ingestion.stop()
           else:
             ingestion.deliver()
+        #link split datastreams
+        related_split_streams = portal_catalog(portal_type = "Data Stream",
+                                               reference = data_ingestion.getReference(),
+                                               sort_on=[('creation_date', 'ascending')])
+        predecessor = None
+        for stream in related_split_streams:
+          if predecessor:
+            predecessor.setSuccessorValue(stream)
+            stream.setPredecessorValue(predecessor)
+          predecessor = stream
       except Exception as e:
         context.log("ERROR handling split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
         context.log(e)
diff --git a/bt5/erp5_wendelin_data_lake_ingestion/TestTemplateItem/portal_components/test.erp5.testDataLakeIngestion.py b/bt5/erp5_wendelin_data_lake_ingestion/TestTemplateItem/portal_components/test.erp5.testDataLakeIngestion.py
index 6e16281..e55cbc4 100644
--- a/bt5/erp5_wendelin_data_lake_ingestion/TestTemplateItem/portal_components/test.erp5.testDataLakeIngestion.py
+++ b/bt5/erp5_wendelin_data_lake_ingestion/TestTemplateItem/portal_components/test.erp5.testDataLakeIngestion.py
@@ -75,7 +75,8 @@ class TestDataIngestion(SecurityTestCase):
   def getDataStreamChunkList(self, reference):
     data_stream_list = self.portal.portal_catalog(
                         portal_type = 'Data Stream',
-                        reference = reference)
+                        reference = reference,
+                        sort_on=[('creation_date', 'ascending')])
     return data_stream_list
 
   def ingestRequest(self, reference, eof, data_chunk, ingestion_policy):
@@ -189,6 +190,23 @@ class TestDataIngestion(SecurityTestCase):
     #all data streams are validated
     self.assertSameSet(['validated' for x in data_stream_list],
                        [x.getValidationState() for x in data_stream_list])
+    #data streams are linked
+    data_stream_1 = data_stream_list[0].getObject()
+    data_stream_2 = data_stream_list[1].getObject()
+    data_stream_3 = data_stream_list[2].getObject()
+    data_stream_4 = data_stream_list[3].getObject()
+    # test successor
+    self.assertSameSet(data_stream_2.getRecursiveSuccessorValueList(), \
+                       [data_stream_3, data_stream_4])
+    self.assertSameSet(data_stream_4.getRecursiveSuccessorValueList(), \
+                       [])
+    # test predecessor
+    self.assertSameSet(data_stream_1.getRecursivePredecessorValueList(), \
+                       [])
+    self.assertSameSet(data_stream_2.getRecursivePredecessorValueList(), \
+                       [data_stream_1])
+    self.assertSameSet(data_stream_4.getRecursivePredecessorValueList(), \
+                       [data_stream_3, data_stream_2, data_stream_1])
 
   def test_03_DefaultWendelinConfigurationExistency(self):
     """
diff --git a/bt5/erp5_wendelin_data_lake_ingestion/TestTemplateItem/portal_components/test.erp5.testDataLakeIngestion.xml b/bt5/erp5_wendelin_data_lake_ingestion/TestTemplateItem/portal_components/test.erp5.testDataLakeIngestion.xml
index 6803a55..d0a5d89 100644
--- a/bt5/erp5_wendelin_data_lake_ingestion/TestTemplateItem/portal_components/test.erp5.testDataLakeIngestion.xml
+++ b/bt5/erp5_wendelin_data_lake_ingestion/TestTemplateItem/portal_components/test.erp5.testDataLakeIngestion.xml
@@ -46,8 +46,8 @@
             <key> <string>text_content_warning_message</string> </key>
             <value>
               <tuple>
-                <string>W:104, 34: Unused variable \'i\' (unused-variable)</string>
-                <string>W:104, 76: Unused variable \'j\' (unused-variable)</string>
+                <string>W:105, 34: Unused variable \'i\' (unused-variable)</string>
+                <string>W:105, 76: Unused variable \'j\' (unused-variable)</string>
               </tuple>
             </value>
         </item>
-- 
2.30.9