Commit 9d8ba356 authored by Paul Graydon

wendelin_telecom_test: Update and improve tests

parent ebef772d
@@ -34,10 +34,14 @@ import string
from Products.ERP5Type.tests.SecurityTestCase import SecurityTestCase

def generateRandomString(length=24, only_digits=False, hexadecimal=False):
  character_list = string.digits
  if not only_digits:
    if hexadecimal:
      character_list += 'ABCDEF'
    else:
      character_list += string.ascii_letters
  return ''.join([random.choice(character_list) for _ in xrange(length)])
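# Illustration only (hypothetical outputs): generateRandomString(length=4, only_digits=True)
# could return '0482', and generateRandomString(length=5, hexadecimal=True) could return '0B7F3'.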

class WendelinTelecomTest(SecurityTestCase):
  """
@@ -50,8 +54,16 @@ class WendelinTelecomTest(SecurityTestCase):
  def afterSetUp(self):
    # Set up variables for ORS ingestion testing
    self.ors_enb_log_ingestion = self.portal.portal_ingestion_policies.ors_enb_log_ingestion
    self.test_ors_example_log_valid = {
      'log': self.portal.web_page_module.test_example_ors_enb_log_valid.getTextContent()
    }
    self.test_ors_example_log_invalid = {
      'log': self.portal.web_page_module.test_example_ors_enb_log_invalid.getTextContent()
    }
    self.test_ors_example_log_empty = {'log': ""}
    # Set up ingestor user for performing ingestions
    self.ingestor_user = self.createWendelinTelecomUser('test_ingestor_%s' % generateRandomString(), None, 'ingestor')

  def beforeTearDown(self):
    self.abort()
@@ -67,16 +79,19 @@ class WendelinTelecomTest(SecurityTestCase):
      (self.portal.data_analysis_module, 'Data Analysis'),
      (self.portal.data_array_module, 'Data Array'),
    ):
      object_list = module.objectValues(portal_type=portal_type)
      if object_list:
        test_object_id_list = [
          obj.getId() for obj in object_list \
          if ('test' in obj.getReference().lower() and 'default' not in obj.getId())
        ]
        if test_object_id_list:
          module.manage_delObjects(ids=test_object_id_list)
    self.tic()

  def createWendelinTelecomUser(self, reference, project, function):
    # Create and validate a new Person with an assignment linked to the provided project and function
    # Also generate and validate an ERP5 login for the Person
    user = self.portal.person_module.newContent(
      portal_type='Person',
      reference=reference
@@ -87,103 +102,126 @@ class WendelinTelecomTest(SecurityTestCase):
    self.tic()
    return user

  def registerOrs(self, tag_hostname_seed=None, tag_comp_id_seed=None, tag_enb_id_seed=None):
    # Create a Data Acquisition Unit and related Data Supply with a tag constructed from the provided seeds.
    # If any seed is NOT defined, it is generated at random.
    if tag_hostname_seed is None:
      tag_hostname_seed = generateRandomString(length=3, only_digits=True)
    if tag_comp_id_seed is None:
      tag_comp_id_seed = generateRandomString(length=4, only_digits=True)
    if tag_enb_id_seed is None:
      tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
    ors_tag = 'ors%s_COMP-%s_e0x%sTest' % (tag_hostname_seed, tag_comp_id_seed, tag_enb_id_seed)
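    # For illustration, hypothetical seeds '012', '3456' and '0ABCD'
    # would yield the tag 'ors012_COMP-3456_e0x0ABCDTest'.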

    response = self.portal.ERP5Site_registerOrs(ors_tag)
    self.tic()

    # Fetch created items from the catalog
    data_acquisition_unit = self.portal.portal_catalog.getResultValue(
      portal_type='Data Acquisition Unit',
      reference=ors_tag,
      validation_state='validated'
    )
    data_supply = None
    if data_acquisition_unit is not None:
      data_supply = data_acquisition_unit.DataAcquisitionUnit_createOrsDataSupply(batch=1)

    # Return all associated items
    return {
      'response': response,
      'data_acquisition_unit': data_acquisition_unit,
      'data_supply': data_supply
    }

  def registerOrsClientProject(self, reference_seed=None, client_user_reference_seed=None):
    # Create a client project with the provided reference seed,
    # as well as a related ERP5 Person with the same reference seed, and an ERP5 login.
    # If NOT defined, reference_seed is generated at random.
    # If defined, client_user_reference_seed overrides reference_seed for the client user.
    if reference_seed is None:
      reference_seed = generateRandomString()
    project_reference = 'test_project_%s' % reference_seed
    project_title = 'Test Project %s' % reference_seed
    client_email = 'test_user_%s@test.wendelin-tele.com' % (client_user_reference_seed or reference_seed)
    client_user_reference = client_email.split('@')[0]

    # Call the script responsible for creating the project and the associated user, and store the JSON response
    response = self.portal.ProjectModule_registerOrsClientProject(
      project_reference,
      project_title,
      client_email,
      form_id='testing'
    )
    self.tic()

    # Fetch created items from the catalog
    project = self.portal.portal_catalog.getResultValue(
      portal_type='Project',
      reference=project_reference,
      title=project_title,
      validation_state='validated'
    )
    client_user = self.portal.portal_catalog.getResultValue(
      portal_type='Person',
      reference=client_user_reference,
      default_email_text=client_email,
      validation_state='validated'
    )

    # Return all associated items
    return {
      'response': response,
      'project': project,
      'client_user': client_user
    }

  def ingestOrsLogDataFromFluentd(self, log_data, ors_tag):
    # Simulate a fluentd instance sending the provided log data to Wendelin for ingestion
    reference = 'ors.%s' % ors_tag
    body = msgpack.packb([0, log_data], use_bin_type=True)
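    # The packed payload mimics fluentd's (timestamp, record) entries;
    # a timestamp of 0 is assumed to be a sufficient placeholder for this test.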
    env = {'CONTENT_TYPE': 'application/octet-stream'}
    path = self.ors_enb_log_ingestion.getPath() + '/ingest?reference=' + reference
    publish_kw = dict(
      env=env,
      user=self.ingestor_user.Person_getUserId(),
      request_method='POST',
      stdin=StringIO(body)
    )
    return self.publish(path, **publish_kw)

  def getDataStream(self, data_acquisition_unit):
    # Retrieve a Data Stream linked to the provided Data Acquisition Unit
    for line in data_acquisition_unit.getAggregateRelatedValueList(portal_type='Data Ingestion Line'):
      data_stream = line.getAggregateValue(portal_type='Data Stream')
      if data_stream:
        return data_stream

  def getDataAnalysis(self, data_supply):
    # Retrieve a Data Analysis linked to the provided Data Supply
    for data_analysis in data_supply.getSpecialiseRelatedValueList(portal_type='Data Analysis'):
      if data_analysis:
        return data_analysis

  def getDataArrays(self, data_analysis):
    # Retrieve the Data Arrays linked to the provided Data Analysis
    data_array_list = []
    for line in data_analysis.contentValues(portal_type='Data Analysis Line'):
      data_array = line.getAggregateValue(portal_type='Data Array')
      if data_array:
        data_array_list.append(data_array)
    return data_array_list

  def getOrsLogIngestionItems(self, log_data, reference, stop_data_analysis=False):
    # Simulate an ingestion of the provided log data
    response = self.ingestOrsLogDataFromFluentd(log_data, reference)
    self.tic()
    # Retrieve all items linked to the ingestion
    data_acquisition_unit = self.portal.portal_catalog.getResultValue(
      portal_type='Data Acquisition Unit',
      reference=reference
    )
    data_supply = data_acquisition_unit.DataAcquisitionUnit_createOrsDataSupply(batch=1)
    self.tic()
@@ -200,7 +238,7 @@ class WendelinTelecomTest(SecurityTestCase):
      data_analysis.stop()
      self.tic()
    data_array_list = self.getDataArrays(data_analysis)

    # Return all associated items
    return {
@@ -209,9 +247,9 @@ class WendelinTelecomTest(SecurityTestCase):
      'data_supply': data_supply,
      'data_stream': data_stream,
      'data_analysis': data_analysis,
      'data_array_list': data_array_list
    }

  def checkDocumentPermissions(self, user, document, user_can_view, user_can_modify, user_can_add):
    user_id = user.Person_getUserId()
@@ -234,62 +272,141 @@ class WendelinTelecomTest(SecurityTestCase):
      self.assertUserCanAddDocument(user_id, document)
    else:
      self.failIfUserCanAddDocument(user_id, document)

  def checkIngestionDocumentsPermissions(self, user, ingestion_item_dict):
    user_destination_project = None
    user_function_list = None
    for assignment in user.contentValues(portal_type='Assignment'):
      if assignment.getValidationState() == 'open':
        user_destination_project = assignment.getDestinationProject()
        user_function_list = assignment.getFunctionList()
    ors_destination_project = ingestion_item_dict['data_supply'].getDestinationProject()
    same_project = (user_destination_project is not None) \
      and (ors_destination_project is not None) \
      and (user_destination_project == ors_destination_project)
    user_is_admin = 'administrator' in user_function_list
    user_is_ingestor = 'ingestor' in user_function_list
    # A client can only view a Data Acquisition Unit (ORS) if they are related to the same project
    # An administrator can view and edit all of them, as well as add one
    # An ingestor can view any of them
    self.checkDocumentPermissions(
      user,
      ingestion_item_dict['data_acquisition_unit'],
      same_project or user_is_admin or user_is_ingestor,
      user_is_admin,
      user_is_admin
    )
    # Same as above for a Data Supply (required for computing security roles on users)
    self.checkDocumentPermissions(
      user,
      ingestion_item_dict['data_supply'],
      same_project or user_is_admin or user_is_ingestor,
      user_is_admin,
      user_is_admin
    )
    # An administrator can view a Data Stream
    # An ingestor has all rights to a Data Stream (in order to append new log data to it)
    self.checkDocumentPermissions(user, ingestion_item_dict['data_stream'], user_is_admin or user_is_ingestor, user_is_ingestor, user_is_ingestor)
    # A client can only view a Data Analysis if they are related to the same project (required for KPI graphing)
    # An administrator can view all of them
    self.checkDocumentPermissions(user, ingestion_item_dict['data_analysis'], user_is_admin or same_project, False, False)
    # A client can only view a Data Array if they are related to the same project
    # An administrator can view all of them
    for data_array in ingestion_item_dict['data_array_list']:
      self.checkDocumentPermissions(user, data_array, user_is_admin or same_project, False, False)

  def checkModulePermissions(self, user):
    user_function_list = None
    for assignment in user.contentValues(portal_type='Assignment'):
      if assignment.getValidationState() == 'open':
        user_function_list = assignment.getFunctionList()
    user_is_client = 'user' in user_function_list
    user_is_admin = 'administrator' in user_function_list
    user_is_ingestor = 'ingestor' in user_function_list
    # Everyone can view the Data Product module (required for KPI graphing)
    # Everyone can also view the two data products used in the KPI calculation process (required for KPI graphing)
    self.checkDocumentPermissions(user, self.portal.data_product_module, True, False, False)
    ors_kpi = self.portal.portal_catalog.getResultValue(
      portal_type='Data Product',
      reference='ors_kpi',
      validation_state='validated'
    )
    self.checkDocumentPermissions(user, ors_kpi, True, False, False)
    ors_enb_log_data = self.portal.portal_catalog.getResultValue(
      portal_type='Data Product',
      reference='ors_enb_log_data',
      validation_state='validated'
    )
    self.checkDocumentPermissions(user, ors_enb_log_data, True, False, False)
    # Only ingestors can view the Data Operation module, as well as the two Data Operations required
    # for ORS eNB log ingestion
    self.checkDocumentPermissions(user, self.portal.data_operation_module, user_is_ingestor, False, False)
    ingest_ors_enb_log_data = self.portal.portal_catalog.getResultValue(
      portal_type='Data Operation',
      reference='ingest_ors_enb_log_data',
      validation_state='validated'
    )
    self.checkDocumentPermissions(user, ingest_ors_enb_log_data, user_is_ingestor, False, False)
    calculate_ors_kpi = self.portal.portal_catalog.getResultValue(
      portal_type='Data Operation',
      reference='calculate_ors_kpi',
      validation_state='validated'
    )
    self.checkDocumentPermissions(user, calculate_ors_kpi, user_is_ingestor, False, False)
    # Everyone can view the Data Acquisition Unit and Data Supply modules
    # Only administrators can add items to them
    self.checkDocumentPermissions(user, self.portal.data_acquisition_unit_module, True, False, user_is_admin)
    self.checkDocumentPermissions(user, self.portal.data_supply_module, True, False, user_is_admin)
    # Only clients and administrators can view the Data Transformation module (required for KPI graphing)
    # Only they can view the data transformation used to produce the KPIs (required for KPI graphing)
    self.checkDocumentPermissions(
      user,
      self.portal.data_transformation_module,
      user_is_client or user_is_admin,
      False,
      False
    )
    data_transformation = self.portal.portal_catalog.getResultValue(
      portal_type='Data Transformation',
      reference='ors_enb_log_data_transformation',
      validation_state='validated'
    )
    self.checkDocumentPermissions(user, data_transformation, user_is_client or user_is_admin, False, False)
    # Only ingestors and administrators can view the Data Ingestion and Data Stream modules
    # Only ingestors can add new Data Ingestions and Data Streams
    self.checkDocumentPermissions(
      user,
      self.portal.data_ingestion_module,
      user_is_ingestor or user_is_admin,
      False,
      user_is_ingestor
    )
    self.checkDocumentPermissions(
      user,
      self.portal.data_stream_module,
      user_is_ingestor or user_is_admin,
      False,
      user_is_ingestor
    )
    # Only administrators can view the Data Analysis module
    self.checkDocumentPermissions(user, self.portal.data_analysis_module, user_is_admin, False, False)
    # Only clients and administrators can view the Data Array module (required for KPI graphing)
    self.checkDocumentPermissions(user, self.portal.data_array_module, user_is_client or user_is_admin, False, False)
    # Only administrators have access to the Person and Project modules and can add items to them for client management purposes
    self.checkDocumentPermissions(user, self.portal.project_module, user_is_admin, False, user_is_admin)
@@ -300,289 +417,340 @@ class WendelinTelecomTest(SecurityTestCase):
    Test the action which creates an ORS Data Supply from a Data Acquisition Unit.
    Check that the Data Supply is indeed created and validated.
    '''
    reference = 'test_%s' % generateRandomString()
    # Create and validate a Data Acquisition Unit
    data_acquisition_unit = self.portal.data_acquisition_unit_module.newContent(
      portal_type='Data Acquisition Unit',
      reference=reference
    )
    data_acquisition_unit.validate()
    self.tic()
    # Call the script which creates a related Data Supply
    created_data_supply = data_acquisition_unit.DataAcquisitionUnit_createOrsDataSupply(batch=1)
    self.tic()
    # Check that the Data Supply exists and is validated
    self.assertTrue(created_data_supply is not None)
    self.assertTrue(created_data_supply.getValidationState() == 'validated')
    # Call the script again to retrieve the same Data Supply
    retrieved_data_supply = data_acquisition_unit.DataAcquisitionUnit_createOrsDataSupply(batch=1)
    # Check that both Data Supplies are identical
    self.assertTrue(created_data_supply == retrieved_data_supply)

  def test_02_registerOrsClientProject(self):
    '''
    Test the action performed by Administrator users in the Project module to register a new client project.
    Check the successful case as well as all error cases.
    '''
    # Generate a random reference seed, call the script and retrieve the associated items
    # This first call should succeed
    reference_seed = generateRandomString()
    project_item_dict = self.registerOrsClientProject(reference_seed=reference_seed)
    # Check that both the project and the client user have been created
    self.assertTrue(project_item_dict['project'] is not None)
    self.assertTrue(project_item_dict['client_user'] is not None)
    # Call the script a second time with the same reference
    # This should not do anything as the project already exists
    repeated_project_item_dict = self.registerOrsClientProject(reference_seed=reference_seed)
    # Check that both the project and the client user are identical to the previous ones
    self.assertTrue(repeated_project_item_dict['project'] == project_item_dict['project'])
    self.assertTrue(repeated_project_item_dict['client_user'] == project_item_dict['client_user'])
    # Create a new reference seed for the project, but reuse the previous reference for the client user account
    new_project_reference_seed = generateRandomString()
    while new_project_reference_seed == reference_seed:
      new_project_reference_seed = generateRandomString()
    # Call the script a third time, keeping the same reference seed as before ONLY for the client user account
    # This should also error out as the client user account already exists
    new_project_item_dict = self.registerOrsClientProject(reference_seed=new_project_reference_seed, client_user_reference_seed=reference_seed)
    # Check that the new project is NOT created and that the client user is the same as previously
    self.assertTrue(new_project_item_dict['project'] is None)
    self.assertTrue(new_project_item_dict['client_user'] == project_item_dict['client_user'])

  def test_03_registerOrs(self):
    '''
    Test the script called during slave instantiation in SlapOS by an ORS to automatically register itself.
    Check all detected cases.
    '''
    tag_hostname_seed = generateRandomString(length=3, only_digits=True)
    tag_comp_id_seed = generateRandomString(length=4, only_digits=True)
    tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
    ors_item_dict = self.registerOrs(
      tag_hostname_seed=tag_hostname_seed,
      tag_comp_id_seed=tag_comp_id_seed,
      tag_enb_id_seed=tag_enb_id_seed
    )

    # Parse the JSON response and check that it is empty, indicating a success
    response_dict = json.loads(ors_item_dict['response'])
    self.assertTrue(response_dict == {})

    # Check that the Data Acquisition Unit and Data Supply have been created
    self.assertTrue(ors_item_dict['data_acquisition_unit'] is not None)
    self.assertTrue(ors_item_dict['data_supply'] is not None)

    # Call the script a second time with the same seeds
    # This should not do anything as the items already exist
    repeated_ors_item_dict = self.registerOrs(
      tag_hostname_seed=tag_hostname_seed,
      tag_comp_id_seed=tag_comp_id_seed,
      tag_enb_id_seed=tag_enb_id_seed
    )

    # Parse the JSON response and check the error message
    response_dict = json.loads(repeated_ors_item_dict['response'])
    self.assertTrue('error_msg' in response_dict)
    self.assertTrue(response_dict['error_msg'] == "ORS with tag %s already exists." % ors_item_dict['data_acquisition_unit'].getReference())

    # Generate a new seed that will cause the tag to be invalid
    invalid_tag_hostname_seed = 'invalid_hostname'
    # Call the script a third time with the new seed
    # This should error out as the tag is invalid
    invalid_ors_item_dict = self.registerOrs(
      tag_hostname_seed=invalid_tag_hostname_seed,
      tag_comp_id_seed=tag_comp_id_seed,
      tag_enb_id_seed=tag_enb_id_seed
    )

    # Parse the JSON response and check the error message
    response_dict = json.loads(invalid_ors_item_dict['response'])
    self.assertTrue('error_msg' in response_dict)
    self.assertTrue(response_dict['error_msg'] == "Invalid ORS tag ors%s_COMP-%s_e0x%sTest found" % (invalid_tag_hostname_seed, tag_comp_id_seed, tag_enb_id_seed))
    # Check that the Data Acquisition Unit and Data Supply have NOT been created
    self.assertTrue(invalid_ors_item_dict['data_acquisition_unit'] is None)
    self.assertTrue(invalid_ors_item_dict['data_supply'] is None)

    # Now, link the original Data Supply to a client project
    project_a_item_dict = self.registerOrsClientProject()
    project_a_url = project_a_item_dict['project'].getRelativeUrl()
    ors_item_dict['data_supply'].setDestinationProject(project_a_url)
    # Generate a new valid enb_id seed
    new_tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
    # Call the script to simulate an ORS re-registering with another eNB identifier
    new_enb_id_ors_item_dict = self.registerOrs(
      tag_hostname_seed=tag_hostname_seed,
      tag_comp_id_seed=tag_comp_id_seed,
      tag_enb_id_seed=new_tag_enb_id_seed
    )

    # Check that the Data Acquisition Unit and Data Supply have been created
    self.assertTrue(new_enb_id_ors_item_dict['data_acquisition_unit'] is not None)
    self.assertTrue(new_enb_id_ors_item_dict['data_supply'] is not None)
    self.assertTrue(new_enb_id_ors_item_dict['data_supply'].getDestinationProject() == project_a_url)

    # Now, link the above Data Supply to a second project
    project_b_item_dict = self.registerOrsClientProject()
    new_enb_id_ors_item_dict['data_supply'].setDestinationProject(project_b_item_dict['project'].getRelativeUrl())
    # Generate another valid enb_id seed
    another_tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
    while another_tag_enb_id_seed == new_tag_enb_id_seed:
      another_tag_enb_id_seed = generateRandomString(length=5, hexadecimal=True)
    # Call the script to simulate the same ORS registering a third time with another eNB identifier
    another_enb_id_ors_item_dict = self.registerOrs(
      tag_hostname_seed=tag_hostname_seed,
      tag_comp_id_seed=tag_comp_id_seed,
      tag_enb_id_seed=another_tag_enb_id_seed
    )

    # Check that the Data Acquisition Unit and Data Supply have been created
    self.assertTrue(another_enb_id_ors_item_dict['data_acquisition_unit'] is not None)
    self.assertTrue(another_enb_id_ors_item_dict['data_supply'] is not None)
    # As the ORS has already been linked to two different projects,
    # it cannot be automatically decided to which project this version should be assigned
    self.assertTrue(another_enb_id_ors_item_dict['data_supply'].getDestinationProject() is None)

  def test_04_1_ingestValidOrsLogDataFromFluentd(self, data_key="valid"):
    '''
    Test a simple valid ORS log ingestion: simulate a fluentd gateway forwarding valid ORS logs to the platform,
    and check that all items related to the ingestion are valid.
    '''
    # Register the ORS
    ors_item_dict = self.registerOrs()
    ors_tag = ors_item_dict['data_acquisition_unit'].getReference()
    # Get the correct data logs according to what is being tested
    test_ors_example_log = None
    if data_key == "valid":
      test_ors_example_log = self.test_ors_example_log_valid
    elif data_key == "invalid":
      test_ors_example_log = self.test_ors_example_log_invalid
    elif data_key == "empty":
      test_ors_example_log = self.test_ors_example_log_empty
    # Perform ingestion
    ingestion_item_dict = self.getOrsLogIngestionItems(test_ors_example_log, ors_tag)

    # In all cases, check that all items related to the ingestion exist
    self.assertEqual(NO_CONTENT, ingestion_item_dict['response'].getStatus())
    self.assertTrue(ingestion_item_dict['data_acquisition_unit'] is not None)
    self.assertTrue(ingestion_item_dict['data_supply'] is not None)
    self.assertTrue(ingestion_item_dict['data_stream'].getData() is not None)
    self.assertTrue(ingestion_item_dict['data_analysis'] is not None)
    self.assertTrue(all(data_array is not None for data_array in ingestion_item_dict['data_array_list']))
    # Check that the data arrays containing the KPI data have correctly been initialized
    self.assertTrue(ingestion_item_dict['data_acquisition_unit'].DataAcquisitionUnit_getERabDataArrayKey() is not None)
    self.assertTrue(ingestion_item_dict['data_acquisition_unit'].DataAcquisitionUnit_getEUtranDataArrayKey() is not None)
    e_rab_shape = (0,)
    e_rab_dtype = [
      ('vt', '<f8'),
      ('vInitialEPSBEstabSR_lo', '<f8'),
      ('vInitialEPSBEstabSR_hi', '<f8'),
      ('vAddedEPSBEstabSR_lo', '<f8'),
      ('vAddedEPSBEstabSR_hi', '<f8')
    ]
    e_utran_shape = (0,)
    e_utran_dtype = [('evt', '<f8'), ('qci', '<f8'), ('dl_lo', '<f8'), ('dl_hi', '<f8'), ('ul_lo', '<f8'), ('ul_hi', '<f8')]
    if data_key == "valid":
      e_rab_shape = (24,)
    elif data_key == "invalid":
      e_rab_shape = (30,)
    elif data_key == "empty":
      e_rab_shape = None
      e_rab_dtype = None
      e_utran_shape = None
      e_utran_dtype = None
    for data_array in ingestion_item_dict['data_array_list']:
      if 'e_rab' in data_array.getReference():
        self.assertTrue(data_array.getArrayDtype() == e_rab_dtype)
        self.assertTrue(data_array.getArrayShape() == e_rab_shape)
      elif 'e_utran' in data_array.getReference():
        self.assertTrue(data_array.getArrayDtype() == e_utran_dtype)
        self.assertTrue(data_array.getArrayShape() == e_utran_shape)

  def test_04_2_ingestInvalidOrsLogDataFromFluentd(self):
    '''
    Test an invalid ORS log ingestion: simulate a fluentd gateway forwarding invalid ORS logs to the platform.
    Check that all items are still valid, as only the invalid measurements are ignored.
    '''
    # Call the above test, but test with invalid data
    self.test_04_1_ingestValidOrsLogDataFromFluentd(data_key="invalid")

  def test_04_3_ingestEmptyOrsLogDataFromFluentd(self):
    '''
    Test an empty ORS log ingestion: simulate a fluentd gateway forwarding empty ORS logs to the platform.
    Check that all items are valid, but the data arrays remain uninitialized.
    '''
    # Call the above test, but test with empty data
    self.test_04_1_ingestValidOrsLogDataFromFluentd(data_key="empty")

  def test_04_4_ingestOrsLogDataWithoutPrefix(self):
    '''
    Simulate an entity trying to send data to the platform for ingestion
    without using the 'ors.' prefix added by fluentd.
    Check that the ingestion is refused.
    '''
    # No need to register an ORS here
    entity_tag = generateRandomString()
    # Call the script that parses the ingestion tag
    # Check that it raises the expected error
    self.assertRaises(
      ValueError,
      self.portal.IngestionPolicy_parseOrsFluentdTag,
      entity_tag
    )

  def test_05_wendelinTelecomSecurityModel(self):
    '''
    Test Wendelin Telecom's custom security model:
    check that different users have the correct permissions according to their function and project.
    '''
    # Set up two distinct projects with one linked ORS each
    project_a_item_dict = self.registerOrsClientProject()
    ors_a_item_dict = self.registerOrs()
    ors_a_tag = ors_a_item_dict['data_acquisition_unit'].getReference()
    ors_a_item_dict['data_supply'].setDestinationProject(project_a_item_dict['project'].getRelativeUrl())
    project_b_item_dict = self.registerOrsClientProject()
    ors_b_item_dict = self.registerOrs()
    ors_b_tag = ors_b_item_dict['data_acquisition_unit'].getReference()
    ors_b_item_dict['data_supply'].setDestinationProject(project_b_item_dict['project'].getRelativeUrl())
    # Register a third ORS without linking it to a project
    ors_n_item_dict = self.registerOrs()
    ors_n_tag = ors_n_item_dict['data_acquisition_unit'].getReference()

    # Perform ingestions for all three ORSs
    ingestion_a_item_dict = self.getOrsLogIngestionItems(
      self.test_ors_example_log_valid,
      ors_a_tag,
      stop_data_analysis=True
    )
    ingestion_b_item_dict = self.getOrsLogIngestionItems(
      self.test_ors_example_log_valid,
      ors_b_tag,
      stop_data_analysis=True
    )
    ingestion_n_item_dict = self.getOrsLogIngestionItems(
      self.test_ors_example_log_valid,
      ors_n_tag,
      stop_data_analysis=True
    )

    # Create a client user not associated to a project
    client_user_n = self.createWendelinTelecomUser('test_user_%s' % generateRandomString(), None, 'user')
    # Create two administrator users: one associated to Project A and the second not associated to a project
    admin_user_a = self.createWendelinTelecomUser(
      'test_user_%s' % generateRandomString(),
      project_a_item_dict['project'].getRelativeUrl(),
      'administrator'
    )
    admin_user_n = self.createWendelinTelecomUser('test_administrator_%s' % generateRandomString(), None, 'administrator')

    # Check that the client of Project A only has access to Project A documents
    client_user_a = project_a_item_dict['client_user']
    self.checkModulePermissions(client_user_a)
    self.checkIngestionDocumentsPermissions(client_user_a, ingestion_a_item_dict)
    self.checkIngestionDocumentsPermissions(client_user_a, ingestion_b_item_dict)
    self.checkIngestionDocumentsPermissions(client_user_a, ingestion_n_item_dict)
    # Check that the client of Project B only has access to Project B documents
    client_user_b = project_b_item_dict['client_user']
    self.checkModulePermissions(client_user_b)
    self.checkIngestionDocumentsPermissions(client_user_b, ingestion_a_item_dict)
    self.checkIngestionDocumentsPermissions(client_user_b, ingestion_b_item_dict)
    self.checkIngestionDocumentsPermissions(client_user_b, ingestion_n_item_dict)
    # Check that the client without a project does not have access to any document
    self.checkModulePermissions(client_user_n)
    self.checkIngestionDocumentsPermissions(client_user_n, ingestion_a_item_dict)
    self.checkIngestionDocumentsPermissions(client_user_n, ingestion_b_item_dict)
    self.checkIngestionDocumentsPermissions(client_user_n, ingestion_n_item_dict)
    # Check that both administrators, whether or not assigned to a project, have access to all documents
    self.checkModulePermissions(admin_user_a)
    self.checkIngestionDocumentsPermissions(admin_user_a, ingestion_a_item_dict)
    self.checkIngestionDocumentsPermissions(admin_user_a, ingestion_b_item_dict)
    self.checkIngestionDocumentsPermissions(admin_user_a, ingestion_n_item_dict)
    self.checkModulePermissions(admin_user_n)
    self.checkIngestionDocumentsPermissions(admin_user_n, ingestion_a_item_dict)
    self.checkIngestionDocumentsPermissions(admin_user_n, ingestion_b_item_dict)
    self.checkIngestionDocumentsPermissions(admin_user_n, ingestion_n_item_dict)
    # Check that the ingestor user only has access to documents needed for ingestion
    self.checkModulePermissions(self.ingestor_user)
    self.checkIngestionDocumentsPermissions(self.ingestor_user, ingestion_a_item_dict)
    self.checkIngestionDocumentsPermissions(self.ingestor_user, ingestion_b_item_dict)
    self.checkIngestionDocumentsPermissions(self.ingestor_user, ingestion_n_item_dict)
\ No newline at end of file