Commit 9da008b1 authored by Rafael Monnerat's avatar Rafael Monnerat

slapos_erp5: (test) pin Datetime to prevent flakly failures

  After the 25 of the month, the simulation generate extra SPL for discounts,
  pin DateTime to have stable assertions regardless the date the test runs.
parent 032ce4cf
Pipeline #36192 failed with stage
in 0 seconds
......@@ -325,118 +325,120 @@ class TestSlapOSVirtualMasterScenarioMixin(DefaultScenarioMixin):
class TestSlapOSVirtualMasterScenario(TestSlapOSVirtualMasterScenarioMixin):
def test_virtual_master_without_accounting_scenario(self):
currency, _, _, sale_person = self.bootstrapVirtualMasterTest(is_virtual_master_accountable=False)
with PinnedDateTime(self, DateTime('2024/02/17')):
currency, _, _, sale_person = self.bootstrapVirtualMasterTest(is_virtual_master_accountable=False)
self.tic()
self.tic()
self.logout()
# lets join as slapos administrator, which will own few compute_nodes
owner_reference = 'owner-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, owner_reference)
self.logout()
# lets join as slapos administrator, which will own few compute_nodes
owner_reference = 'owner-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, owner_reference)
self.login()
owner_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=owner_reference).getParentValue()
# owner_person.setCareerSubordinationValue(seller_organisation)
self.login()
owner_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=owner_reference).getParentValue()
# owner_person.setCareerSubordinationValue(seller_organisation)
self.tic()
self.tic()
# hooray, now it is time to create compute_nodes
self.logout()
self.login(sale_person.getUserId())
# hooray, now it is time to create compute_nodes
self.logout()
self.login(sale_person.getUserId())
# create a default project
project_relative_url = self.addProject(person=owner_person, currency=currency)
# create a default project
project_relative_url = self.addProject(person=owner_person, currency=currency)
self.logout()
self.login()
project = self.portal.restrictedTraverse(project_relative_url)
preference = self.portal.portal_preferences.slapos_default_system_preference
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % project.getRelativeUrl()
]
)
self.logout()
self.login()
project = self.portal.restrictedTraverse(project_relative_url)
preference = self.portal.portal_preferences.slapos_default_system_preference
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % project.getRelativeUrl()
]
)
self.tic()
self.tic()
self.logout()
self.login(owner_person.getUserId())
self.logout()
self.login(owner_person.getUserId())
public_server_title = 'Public Server for %s' % owner_reference
public_server_id = self.requestComputeNode(public_server_title, project.getReference())
public_server = self.portal.portal_catalog.getResultValue(
portal_type='Compute Node', reference=public_server_id)
self.setAccessToMemcached(public_server)
self.assertNotEqual(None, public_server)
self.setServerOpenPublic(public_server)
public_server.generateCertificate()
public_server_title = 'Public Server for %s' % owner_reference
public_server_id = self.requestComputeNode(public_server_title, project.getReference())
public_server = self.portal.portal_catalog.getResultValue(
portal_type='Compute Node', reference=public_server_id)
self.setAccessToMemcached(public_server)
self.assertNotEqual(None, public_server)
self.setServerOpenPublic(public_server)
public_server.generateCertificate()
# and install some software on them
public_server_software = self.generateNewSoftwareReleaseUrl()
public_instance_type = 'public type'
# and install some software on them
public_server_software = self.generateNewSoftwareReleaseUrl()
public_instance_type = 'public type'
self.supplySoftware(public_server, public_server_software)
self.supplySoftware(public_server, public_server_software)
# format the compute_nodes
self.formatComputeNode(public_server)
# format the compute_nodes
self.formatComputeNode(public_server)
software_product, release_variation, type_variation = self.addSoftwareProduct(
"instance product", project, public_server_software, public_instance_type
)
software_product, release_variation, type_variation = self.addSoftwareProduct(
"instance product", project, public_server_software, public_instance_type
)
self.addAllocationSupply("for compute node", public_server, software_product,
release_variation, type_variation)
self.addAllocationSupply("for compute node", public_server, software_product,
release_variation, type_variation)
self.tic()
self.logout()
self.login()
self.tic()
self.logout()
self.login()
self.checkServiceSubscriptionRequest(public_server)
self.checkServiceSubscriptionRequest(public_server)
# join as the another visitor and request software instance on public
# compute_node
self.logout()
public_reference = 'public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, public_reference)
# join as the another visitor and request software instance on public
# compute_node
self.logout()
public_reference = 'public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, public_reference)
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=public_reference).getParentValue()
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=public_reference).getParentValue()
public_instance_title = 'Public title %s' % self.generateNewId()
self.checkInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
public_server, project.getReference())
with PinnedDateTime(self, DateTime('2024/02/17 01:01')):
public_instance_title = 'Public title %s' % self.generateNewId()
self.checkInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
public_server, project.getReference())
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type='ERP5 Login', reference=public_reference).getParentValue()
self.login(owner_person.getUserId())
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type='ERP5 Login', reference=public_reference).getParentValue()
self.login(owner_person.getUserId())
# and the instances
self.checkInstanceUnallocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type, public_server,
project.getReference())
# and the instances
self.checkInstanceUnallocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type, public_server,
project.getReference())
# and uninstall some software on them
self.logout()
self.login(owner_person.getUserId())
self.supplySoftware(public_server, public_server_software,
state='destroyed')
# and uninstall some software on them
self.logout()
self.login(owner_person.getUserId())
self.supplySoftware(public_server, public_server_software,
state='destroyed')
self.logout()
# Uninstall from compute_node
self.login()
self.simulateSlapgridSR(public_server)
self.logout()
# Uninstall from compute_node
self.login()
self.simulateSlapgridSR(public_server)
self.tic()
self.tic()
self.login()
# Ensure no unexpected object has been created
......@@ -500,13 +502,13 @@ class TestSlapOSVirtualMasterScenario(TestSlapOSVirtualMasterScenarioMixin):
# object. It requires to be created under shadow user for security reasons.
tmp_subscription_request = createTempSubscription(person, source_section, total_price, currency)
return person.Entity_createDepositPaymentTransaction([tmp_subscription_request])
payment_transaction = owner_person.Person_restrictMethodAsShadowUser(
shadow_document=owner_person,
callable_object=wrapWithShadow,
argument_list=[owner_person, seller_organisation.getRelativeUrl(),
total_price, currency.getRelativeUrl()])
self.tic()
self.logout()
self.login()
......@@ -699,9 +701,9 @@ class TestSlapOSVirtualMasterScenario(TestSlapOSVirtualMasterScenarioMixin):
# Pay deposit to validate virtual master + one computer, for the organisation
# For now we cannot rely on user payments
deposit_amount = 42.0 + 99.0
deposit_amount = 42.0 + 99.0
ledger = self.portal.portal_categories.ledger.automated
outstanding_amount_list = customer_section_organisation.Entity_getOutstandingDepositAmountList(
currency.getUid(), ledger_uid=ledger.getUid())
amount = sum([i.total_price for i in outstanding_amount_list])
......@@ -907,14 +909,14 @@ class TestSlapOSVirtualMasterScenario(TestSlapOSVirtualMasterScenarioMixin):
self.login(project_owner_person.getUserId())
# Pay deposit to validate virtual master + one computer
deposit_amount = 42.0 + 99.0
deposit_amount = 42.0 + 99.0
ledger = self.portal.portal_categories.ledger.automated
outstanding_amount_list = project_owner_person.Entity_getOutstandingDepositAmountList(
currency.getUid(), ledger_uid=ledger.getUid())
amount = sum([i.total_price for i in outstanding_amount_list])
self.assertEqual(amount, deposit_amount)
# Ensure to pay from the website
outstanding_amount = self.web_site.restrictedTraverse(outstanding_amount_list[0].getRelativeUrl())
outstanding_amount.Base_createExternalPaymentTransactionFromOutstandingAmountAndRedirect()
......@@ -1036,142 +1038,149 @@ class TestSlapOSVirtualMasterScenario(TestSlapOSVirtualMasterScenarioMixin):
def test_virtual_master_slave_without_accounting_scenario(self):
currency, _, _, sale_person = self.bootstrapVirtualMasterTest(is_virtual_master_accountable=False)
with PinnedDateTime(self, DateTime('2024/02/17')):
currency, _, _, sale_person = self.bootstrapVirtualMasterTest(is_virtual_master_accountable=False)
self.web_site = self.portal.web_site_module.slapos_master_panel
self.web_site = self.portal.web_site_module.slapos_master_panel
# some preparation
self.logout()
# some preparation
self.logout()
# lets join as slapos administrator, which will own few compute_nodes
owner_reference = 'owner-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, owner_reference)
# lets join as slapos administrator, which will own few compute_nodes
owner_reference = 'owner-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, owner_reference)
self.login()
owner_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=owner_reference).getParentValue()
self.tic()
self.login()
owner_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=owner_reference).getParentValue()
self.tic()
self.logout()
self.login(sale_person.getUserId())
# create a default project
project_relative_url = self.addProject(person=owner_person, currency=currency)
self.logout()
self.login(sale_person.getUserId())
# create a default project
project_relative_url = self.addProject(person=owner_person, currency=currency)
self.logout()
self.login()
project = self.portal.restrictedTraverse(project_relative_url)
self.logout()
self.login()
project = self.portal.restrictedTraverse(project_relative_url)
preference = self.portal.portal_preferences.slapos_default_system_preference
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % project.getRelativeUrl()
]
)
self.tic()
preference = self.portal.portal_preferences.slapos_default_system_preference
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % project.getRelativeUrl()
]
)
self.tic()
# hooray, now it is time to create compute_nodes
self.login(owner_person.getUserId())
# hooray, now it is time to create compute_nodes
self.login(owner_person.getUserId())
public_server_title = 'Public Server for %s' % owner_reference
public_server_id = self.requestComputeNode(public_server_title, project.getReference())
public_server = self.portal.portal_catalog.getResultValue(
portal_type='Compute Node', reference=public_server_id)
self.setAccessToMemcached(public_server)
self.assertNotEqual(None, public_server)
self.setServerOpenPublic(public_server)
public_server.generateCertificate()
# and install some software on them
public_server_software = self.generateNewSoftwareReleaseUrl()
self.supplySoftware(public_server, public_server_software)
#software_product, release_variation, type_variation = self.addSoftwareProduct(
public_instance_type = 'public type'
software_product, software_release, software_type = self.addSoftwareProduct(
"instance product", project, public_server_software, public_instance_type
)
public_server_title = 'Public Server for %s' % owner_reference
public_server_id = self.requestComputeNode(public_server_title, project.getReference())
public_server = self.portal.portal_catalog.getResultValue(
portal_type='Compute Node', reference=public_server_id)
self.setAccessToMemcached(public_server)
self.assertNotEqual(None, public_server)
self.setServerOpenPublic(public_server)
public_server.generateCertificate()
self.addAllocationSupply("for compute node", public_server, software_product,
software_release, software_type)
# and install some software on them
public_server_software = self.generateNewSoftwareReleaseUrl()
self.supplySoftware(public_server, public_server_software)
# format the compute_nodes
self.formatComputeNode(public_server)
#software_product, release_variation, type_variation = self.addSoftwareProduct(
public_instance_type = 'public type'
software_product, software_release, software_type = self.addSoftwareProduct(
"instance product", project, public_server_software, public_instance_type
)
# join as the another visitor and request software instance on public
# compute_node
self.logout()
public_reference = 'public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, public_reference)
self.addAllocationSupply("for compute node", public_server, software_product,
software_release, software_type)
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=public_reference).getParentValue()
# format the compute_nodes
self.formatComputeNode(public_server)
public_instance_title = 'Public title %s' % self.generateNewId()
self.checkInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
public_server, project.getReference())
# join as the another visitor and request software instance on public
# compute_node
self.logout()
public_reference = 'public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, public_reference)
# hooray, now it is time to create compute_nodes
self.login(owner_person.getUserId())
instance_node_title = 'Shared Instance for %s' % owner_reference
# Convert the Software Instance into an Instance Node
# to explicitely mark it as accepting Slave Instance
software_instance = self.portal.portal_catalog.getResultValue(
portal_type='Software Instance', title=public_instance_title)
instance_node = self.addInstanceNode(instance_node_title, software_instance)
slave_server_software = self.generateNewSoftwareReleaseUrl()
slave_instance_type = 'slave type'
software_product, software_release, software_type = self.addSoftwareProduct(
'share product', project, slave_server_software, slave_instance_type
)
self.addAllocationSupply("for instance node", instance_node, software_product,
software_release, software_type)
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=public_reference).getParentValue()
slave_instance_title = 'Slave title %s' % self.generateNewId()
self.checkSlaveInstanceAllocation(public_person.getUserId(),
public_reference, slave_instance_title,
slave_server_software, slave_instance_type,
public_server, project.getReference())
with PinnedDateTime(self, DateTime('2024/02/17 00:05')):
public_instance_title = 'Public title %s' % self.generateNewId()
self.checkInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
public_server, project.getReference())
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type='ERP5 Login', reference=public_reference).getParentValue()
self.login(owner_person.getUserId())
# hooray, now it is time to create compute_nodes
self.login(owner_person.getUserId())
instance_node_title = 'Shared Instance for %s' % owner_reference
# Convert the Software Instance into an Instance Node
# to explicitely mark it as accepting Slave Instance
software_instance = self.portal.portal_catalog.getResultValue(
portal_type='Software Instance', title=public_instance_title)
instance_node = self.addInstanceNode(instance_node_title, software_instance)
slave_server_software = self.generateNewSoftwareReleaseUrl()
slave_instance_type = 'slave type'
software_product, software_release, software_type = self.addSoftwareProduct(
'share product', project, slave_server_software, slave_instance_type
)
self.addAllocationSupply("for instance node", instance_node, software_product,
software_release, software_type)
# and the instances
self.checkSlaveInstanceUnallocation(public_person.getUserId(),
public_reference, slave_instance_title,
slave_server_software, slave_instance_type, public_server,
project.getReference())
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=public_reference).getParentValue()
# and uninstall some software on them
self.logout()
self.login(owner_person.getUserId())
self.supplySoftware(public_server, public_server_software,
state='destroyed')
slave_instance_title = 'Slave title %s' % self.generateNewId()
self.checkSlaveInstanceAllocation(public_person.getUserId(),
public_reference, slave_instance_title,
slave_server_software, slave_instance_type,
public_server, project.getReference())
self.logout()
# Uninstall from compute_node
self.login()
self.simulateSlapgridSR(public_server)
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type='ERP5 Login', reference=public_reference).getParentValue()
self.login(owner_person.getUserId())
self.tic()
# and the instances
self.checkSlaveInstanceUnallocation(public_person.getUserId(),
public_reference, slave_instance_title,
slave_server_software, slave_instance_type, public_server,
project.getReference())
self.login()
# Ensure no unexpected object has been created
# 6 allocation supply/line/cell
# 2 compute/instance node
# 1 credential request
# 2 instance tree
# 9 open sale order / line
# 3 assignment
# and uninstall some software on them
self.logout()
self.login(owner_person.getUserId())
self.supplySoftware(public_server, public_server_software,
state='destroyed')
self.logout()
# Uninstall from compute_node
self.login()
self.simulateSlapgridSR(public_server)
self.tic()
self.login()
# Ensure no unexpected object has been created
# 6 allocation supply/line/cell
# 2 compute/instance node
# 1 credential request
# 2 instance tree
# 9 open sale order / line
# 3 assignment
# 4 simulation movement
# 4 sale packing list
# 2 sale trade condition
......@@ -1185,100 +1194,102 @@ class TestSlapOSVirtualMasterScenario(TestSlapOSVirtualMasterScenarioMixin):
def test_virtual_master_slave_on_same_tree_without_accounting_scenario(self):
currency, _, _, sale_person = self.bootstrapVirtualMasterTest(is_virtual_master_accountable=False)
with PinnedDateTime(self, DateTime('2024/02/17')):
currency, _, _, sale_person = self.bootstrapVirtualMasterTest(is_virtual_master_accountable=False)
self.web_site = self.portal.web_site_module.slapos_master_panel
self.web_site = self.portal.web_site_module.slapos_master_panel
# some preparation
self.logout()
# some preparation
self.logout()
# lets join as slapos administrator, which will own few compute_nodes
owner_reference = 'owner-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, owner_reference)
# lets join as slapos administrator, which will own few compute_nodes
owner_reference = 'owner-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, owner_reference)
self.login()
owner_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=owner_reference).getParentValue()
self.tic()
self.login()
owner_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=owner_reference).getParentValue()
self.tic()
self.logout()
self.login(sale_person.getUserId())
# create a default project
project_relative_url = self.addProject(person=owner_person, currency=currency)
self.logout()
self.login(sale_person.getUserId())
# create a default project
project_relative_url = self.addProject(person=owner_person, currency=currency)
self.logout()
self.login()
project = self.portal.restrictedTraverse(project_relative_url)
self.logout()
self.login()
project = self.portal.restrictedTraverse(project_relative_url)
preference = self.portal.portal_preferences.slapos_default_system_preference
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % project.getRelativeUrl()
]
)
self.tic()
preference = self.portal.portal_preferences.slapos_default_system_preference
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % project.getRelativeUrl()
]
)
self.tic()
# hooray, now it is time to create compute_nodes
self.login(owner_person.getUserId())
# hooray, now it is time to create compute_nodes
self.login(owner_person.getUserId())
public_server_title = 'Public Server for %s' % owner_reference
public_server_id = self.requestComputeNode(public_server_title, project.getReference())
public_server = self.portal.portal_catalog.getResultValue(
portal_type='Compute Node', reference=public_server_id)
self.setAccessToMemcached(public_server)
self.assertNotEqual(None, public_server)
self.setServerOpenPublic(public_server)
public_server.generateCertificate()
# and install some software on them
public_server_software = self.generateNewSoftwareReleaseUrl()
self.supplySoftware(public_server, public_server_software)
#software_product, release_variation, type_variation = self.addSoftwareProduct(
public_instance_type = 'public type'
software_product, software_release, software_type = self.addSoftwareProduct(
"instance product", project, public_server_software, public_instance_type
)
public_server_title = 'Public Server for %s' % owner_reference
public_server_id = self.requestComputeNode(public_server_title, project.getReference())
public_server = self.portal.portal_catalog.getResultValue(
portal_type='Compute Node', reference=public_server_id)
self.setAccessToMemcached(public_server)
self.assertNotEqual(None, public_server)
self.setServerOpenPublic(public_server)
public_server.generateCertificate()
self.addAllocationSupply("for compute node", public_server, software_product,
software_release, software_type,
is_slave_on_same_instance_tree_allocable=True)
# and install some software on them
public_server_software = self.generateNewSoftwareReleaseUrl()
self.supplySoftware(public_server, public_server_software)
# format the compute_nodes
self.formatComputeNode(public_server)
#software_product, release_variation, type_variation = self.addSoftwareProduct(
public_instance_type = 'public type'
software_product, software_release, software_type = self.addSoftwareProduct(
"instance product", project, public_server_software, public_instance_type
)
# join as the another visitor and request software instance on public
# compute_node
self.logout()
public_reference = 'public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, public_reference)
self.addAllocationSupply("for compute node", public_server, software_product,
software_release, software_type,
is_slave_on_same_instance_tree_allocable=True)
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=public_reference).getParentValue()
# format the compute_nodes
self.formatComputeNode(public_server)
public_instance_title = 'Public title %s' % self.generateNewId()
self.checkInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
public_server, project.getReference())
# join as the another visitor and request software instance on public
# compute_node
self.logout()
public_reference = 'public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, public_reference)
self.tic()
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=public_reference).getParentValue()
# request slave instance on the same instance tree
self.login(public_person.getUserId())
slave_instance_title = 'Slave title %s' % self.generateNewId()
self.checkInstanceTreeSlaveInstanceAllocation(
public_person.getUserId(),
public_reference, public_instance_title,
slave_instance_title,
public_server_software, public_instance_type,
public_server, project.getReference()
)
with PinnedDateTime(self, DateTime('2024/02/17 00:05')):
public_instance_title = 'Public title %s' % self.generateNewId()
self.checkInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
public_server, project.getReference())
self.tic()
# request slave instance on the same instance tree
self.login(public_person.getUserId())
slave_instance_title = 'Slave title %s' % self.generateNewId()
self.checkInstanceTreeSlaveInstanceAllocation(
public_person.getUserId(),
public_reference, public_instance_title,
slave_instance_title,
public_server_software, public_instance_type,
public_server, project.getReference()
)
self.login()
# Ensure no unexpected object has been created
......@@ -1301,201 +1312,203 @@ class TestSlapOSVirtualMasterScenario(TestSlapOSVirtualMasterScenarioMixin):
def test_virtual_master_on_remote_tree_without_accounting_scenario(self):
currency, _, _, sale_person = self.bootstrapVirtualMasterTest(is_virtual_master_accountable=False)
with PinnedDateTime(self, DateTime('2024/02/17')):
currency, _, _, sale_person = self.bootstrapVirtualMasterTest(is_virtual_master_accountable=False)
self.web_site = self.portal.web_site_module.slapos_master_panel
self.web_site = self.portal.web_site_module.slapos_master_panel
# some preparation
self.logout()
# lets join as slapos administrator, which will own few compute_nodes
remote_owner_reference = 'remote-owner-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, remote_owner_reference)
# some preparation
self.logout()
self.login()
remote_owner_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=remote_owner_reference).getParentValue()
self.tic()
# lets join as slapos administrator, which will own few compute_nodes
remote_owner_reference = 'remote-owner-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, remote_owner_reference)
self.logout()
self.login(sale_person.getUserId())
# create a default project
remote_project_relative_url = self.addProject(person=remote_owner_person, currency=currency)
self.login()
remote_owner_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=remote_owner_reference).getParentValue()
self.tic()
self.logout()
self.login()
remote_project = self.portal.restrictedTraverse(remote_project_relative_url)
self.logout()
self.login(sale_person.getUserId())
# create a default project
remote_project_relative_url = self.addProject(person=remote_owner_person, currency=currency)
preference = self.portal.portal_preferences.slapos_default_system_preference
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % remote_project.getRelativeUrl()
]
)
self.tic()
self.logout()
self.login()
remote_project = self.portal.restrictedTraverse(remote_project_relative_url)
# hooray, now it is time to create compute_nodes
self.login(remote_owner_person.getUserId())
preference = self.portal.portal_preferences.slapos_default_system_preference
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % remote_project.getRelativeUrl()
]
)
self.tic()
remote_server_title = 'Remote Server for %s' % remote_owner_person
remote_server_id = self.requestComputeNode(remote_server_title, remote_project.getReference())
remote_server = self.portal.portal_catalog.getResultValue(
portal_type='Compute Node', reference=remote_server_id)
self.setAccessToMemcached(remote_server)
self.assertNotEqual(None, remote_server)
self.setServerOpenPublic(remote_server)
remote_server.generateCertificate()
# hooray, now it is time to create compute_nodes
self.login(remote_owner_person.getUserId())
# and install some software on them
remote_server_software = self.generateNewSoftwareReleaseUrl()
remote_instance_type = 'public type'
remote_server_title = 'Remote Server for %s' % remote_owner_person
remote_server_id = self.requestComputeNode(remote_server_title, remote_project.getReference())
remote_server = self.portal.portal_catalog.getResultValue(
portal_type='Compute Node', reference=remote_server_id)
self.setAccessToMemcached(remote_server)
self.assertNotEqual(None, remote_server)
self.setServerOpenPublic(remote_server)
remote_server.generateCertificate()
self.supplySoftware(remote_server, remote_server_software)
# and install some software on them
remote_server_software = self.generateNewSoftwareReleaseUrl()
remote_instance_type = 'public type'
# format the compute_nodes
self.formatComputeNode(remote_server)
self.supplySoftware(remote_server, remote_server_software)
remote_software_product, remote_release_variation, remote_type_variation = self.addSoftwareProduct(
"remote product", remote_project, remote_server_software, remote_instance_type
)
# format the compute_nodes
self.formatComputeNode(remote_server)
self.addAllocationSupply("for compute node", remote_server, remote_software_product,
remote_release_variation, remote_type_variation)
remote_software_product, remote_release_variation, remote_type_variation = self.addSoftwareProduct(
"remote product", remote_project, remote_server_software, remote_instance_type
)
# join as the another visitor and request software instance on public
# compute_node
self.logout()
remote_public_reference = 'remote-public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, remote_public_reference)
self.addAllocationSupply("for compute node", remote_server, remote_software_product,
remote_release_variation, remote_type_variation)
self.login()
remote_public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=remote_public_reference).getParentValue()
# join as the another visitor and request software instance on public
# compute_node
self.logout()
remote_public_reference = 'remote-public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, remote_public_reference)
####################################
# Create a local project
####################################
self.logout()
self.login(sale_person.getUserId())
# create a default project
project_relative_url = self.addProject(person=remote_public_person, currency=currency)
self.login()
remote_public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=remote_public_reference).getParentValue()
self.logout()
self.login()
project = self.portal.restrictedTraverse(project_relative_url)
####################################
# Create a local project
####################################
self.logout()
self.login(sale_person.getUserId())
# create a default project
project_relative_url = self.addProject(person=remote_public_person, currency=currency)
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % project.getRelativeUrl()
]
)
self.tic()
self.logout()
self.login()
project = self.portal.restrictedTraverse(project_relative_url)
owner_person = remote_public_person
self.logout()
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % project.getRelativeUrl()
]
)
self.tic()
# hooray, now it is time to create compute_nodes
self.login(owner_person.getUserId())
owner_person = remote_public_person
self.logout()
remote_compute_node = self.requestRemoteNode(project, remote_project,
remote_public_person)
# hooray, now it is time to create compute_nodes
self.login(owner_person.getUserId())
# and install some software on them
public_server_software = remote_server_software
remote_compute_node = self.requestRemoteNode(project, remote_project,
remote_public_person)
#software_product, release_variation, type_variation = self.addSoftwareProduct(
public_instance_type = remote_instance_type
software_product, software_release, software_type = self.addSoftwareProduct(
"instance product", project, public_server_software, public_instance_type
)
# and install some software on them
public_server_software = remote_server_software
self.addAllocationSupply("for remote node", remote_compute_node, software_product,
software_release, software_type)
self.tic()
#software_product, release_variation, type_variation = self.addSoftwareProduct(
public_instance_type = remote_instance_type
software_product, software_release, software_type = self.addSoftwareProduct(
"instance product", project, public_server_software, public_instance_type
)
# join as the another visitor and request software instance on public
# compute_node
self.logout()
public_reference = 'public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, public_reference)
self.addAllocationSupply("for remote node", remote_compute_node, software_product,
software_release, software_type)
self.tic()
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=public_reference).getParentValue()
# join as the another visitor and request software instance on public
# compute_node
self.logout()
public_reference = 'public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, public_reference)
public_instance_title = 'Public title %s' % self.generateNewId()
self.checkRemoteInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
remote_compute_node, project.getReference())
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=public_reference).getParentValue()
# XXX Do this for every scenario tests
self.logout()
self.tic()
# now instantiate it on compute_node and set some nice connection dict
self.simulateSlapgridCP(remote_server)
self.tic()
self.login()
with PinnedDateTime(self, DateTime('2024/02/17 01:01')):
public_instance_title = 'Public title %s' % self.generateNewId()
self.checkRemoteInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
remote_compute_node, project.getReference())
# owner_person should have one Instance Tree created by alarm
owner_instance_tree_list = self.portal.portal_catalog(
portal_type='Instance Tree',
destination_section__uid=owner_person.getUid()
)
self.assertEqual(1, len(owner_instance_tree_list))
owner_software_instance = owner_instance_tree_list[0].getSuccessorValue()
self.assertEqual('Software Instance', owner_software_instance.getPortalType())
self.assertEqual(
remote_server.getRelativeUrl(),
owner_software_instance.getAggregateValue().getParentValue().getRelativeUrl()
)
# XXX Do this for every scenario tests
self.logout()
self.tic()
# now instantiate it on compute_node and set some nice connection dict
self.simulateSlapgridCP(remote_server)
self.tic()
self.login()
# public_person should have one Instance Tree
public_instance_tree_list = self.portal.portal_catalog(
portal_type='Instance Tree',
destination_section__uid=public_person.getUid()
)
self.assertEqual(1, len(public_instance_tree_list))
# owner_person should have one Instance Tree created by alarm
owner_instance_tree_list = self.portal.portal_catalog(
portal_type='Instance Tree',
destination_section__uid=owner_person.getUid()
)
self.assertEqual(1, len(owner_instance_tree_list))
owner_software_instance = owner_instance_tree_list[0].getSuccessorValue()
self.assertEqual('Software Instance', owner_software_instance.getPortalType())
self.assertEqual(
remote_server.getRelativeUrl(),
owner_software_instance.getAggregateValue().getParentValue().getRelativeUrl()
)
self.assertEqual(
'_remote_%s_%s' % (project.getReference(),
public_instance_tree_list[0].getSuccessorReference()),
owner_software_instance.getTitle()
)
connection_dict = owner_software_instance.getConnectionXmlAsDict()
self.assertSameSet(('url_1', 'url_2'), connection_dict.keys())
self.assertSameSet(
['http://%s/' % q.getIpAddress() for q in
owner_software_instance.getAggregateValue().contentValues(portal_type='Internet Protocol Address')],
connection_dict.values())
# public_person should have one Instance Tree
public_instance_tree_list = self.portal.portal_catalog(
portal_type='Instance Tree',
destination_section__uid=public_person.getUid()
)
self.assertEqual(1, len(public_instance_tree_list))
self.checkRemoteInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
remote_compute_node, project.getReference(),
connection_dict_to_check=owner_software_instance.getConnectionXmlAsDict())
self.assertEqual(
'_remote_%s_%s' % (project.getReference(),
public_instance_tree_list[0].getSuccessorReference()),
owner_software_instance.getTitle()
)
connection_dict = owner_software_instance.getConnectionXmlAsDict()
self.assertSameSet(('url_1', 'url_2'), connection_dict.keys())
self.assertSameSet(
['http://%s/' % q.getIpAddress() for q in
owner_software_instance.getAggregateValue().contentValues(portal_type='Internet Protocol Address')],
connection_dict.values())
self.checkRemoteInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
remote_compute_node, project.getReference(),
connection_dict_to_check=owner_software_instance.getConnectionXmlAsDict())
# Destroy the instance, and ensure the remote one is destroyed too
self.checkRemoteInstanceUnallocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
remote_compute_node, project.getReference())
# Destroy the instance, and ensure the remote one is destroyed too
self.checkRemoteInstanceUnallocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
remote_compute_node, project.getReference())
self.login()
self.login()
# Report destruction from compute_node
self.simulateSlapgridUR(remote_server)
self.assertEqual(
"",
owner_software_instance.getAggregate("")
)
# Report destruction from compute_node
self.simulateSlapgridUR(remote_server)
self.assertEqual(
"",
owner_software_instance.getAggregate("")
)
# Ensure no unexpected object has been created
# 3 allocation supply/line/cell
......@@ -1532,241 +1545,243 @@ class TestSlapOSVirtualMasterScenario(TestSlapOSVirtualMasterScenarioMixin):
def test_virtual_master_slave_instance_on_remote_tree_without_accounting_scenario(self):
currency, _, _, sale_person = self.bootstrapVirtualMasterTest(is_virtual_master_accountable=False)
with PinnedDateTime(self, DateTime('2024/02/17')):
currency, _, _, sale_person = self.bootstrapVirtualMasterTest(is_virtual_master_accountable=False)
self.web_site = self.portal.web_site_module.slapos_master_panel
self.web_site = self.portal.web_site_module.slapos_master_panel
# some preparation
self.logout()
# some preparation
self.logout()
# lets join as slapos administrator, which will own few compute_nodes
remote_owner_reference = 'remote-owner-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, remote_owner_reference)
# lets join as slapos administrator, which will own few compute_nodes
remote_owner_reference = 'remote-owner-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, remote_owner_reference)
self.login()
remote_owner_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=remote_owner_reference).getParentValue()
self.tic()
self.login()
remote_owner_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=remote_owner_reference).getParentValue()
self.tic()
self.logout()
self.login(sale_person.getUserId())
# create a default project
remote_project_relative_url = self.addProject(person=remote_owner_person, currency=currency)
self.logout()
self.login(sale_person.getUserId())
# create a default project
remote_project_relative_url = self.addProject(person=remote_owner_person, currency=currency)
self.logout()
self.login()
remote_project = self.portal.restrictedTraverse(remote_project_relative_url)
self.logout()
self.login()
remote_project = self.portal.restrictedTraverse(remote_project_relative_url)
preference = self.portal.portal_preferences.slapos_default_system_preference
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % remote_project.getRelativeUrl()
]
)
self.tic()
preference = self.portal.portal_preferences.slapos_default_system_preference
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % remote_project.getRelativeUrl()
]
)
self.tic()
# hooray, now it is time to create compute_nodes
self.login(remote_owner_person.getUserId())
# hooray, now it is time to create compute_nodes
self.login(remote_owner_person.getUserId())
remote_server_title = 'Remote Server for %s' % remote_owner_person
remote_server_id = self.requestComputeNode(remote_server_title, remote_project.getReference())
remote_server = self.portal.portal_catalog.getResultValue(
portal_type='Compute Node', reference=remote_server_id)
self.setAccessToMemcached(remote_server)
self.assertNotEqual(None, remote_server)
self.setServerOpenPublic(remote_server)
remote_server.generateCertificate()
remote_server_title = 'Remote Server for %s' % remote_owner_person
remote_server_id = self.requestComputeNode(remote_server_title, remote_project.getReference())
remote_server = self.portal.portal_catalog.getResultValue(
portal_type='Compute Node', reference=remote_server_id)
self.setAccessToMemcached(remote_server)
self.assertNotEqual(None, remote_server)
self.setServerOpenPublic(remote_server)
remote_server.generateCertificate()
# and install some software on them
remote_server_software = self.generateNewSoftwareReleaseUrl()
remote_instance_type = 'public type'
# and install some software on them
remote_server_software = self.generateNewSoftwareReleaseUrl()
remote_instance_type = 'public type'
self.supplySoftware(remote_server, remote_server_software)
self.supplySoftware(remote_server, remote_server_software)
# format the compute_nodes
self.formatComputeNode(remote_server)
# format the compute_nodes
self.formatComputeNode(remote_server)
remote_software_product, remote_release_variation, remote_type_variation = self.addSoftwareProduct(
"remote product", remote_project, remote_server_software, remote_instance_type
)
remote_software_product, remote_release_variation, remote_type_variation = self.addSoftwareProduct(
"remote product", remote_project, remote_server_software, remote_instance_type
)
self.addAllocationSupply("for compute node", remote_server, remote_software_product,
remote_release_variation, remote_type_variation,
destination_value=remote_owner_person)
private_instance_title = 'Private title %s' % self.generateNewId()
self.checkInstanceAllocation(remote_owner_person.getUserId(),
remote_owner_reference, private_instance_title,
remote_server_software, remote_instance_type,
remote_server, remote_project.getReference())
# Convert the Software Instance into an Instance Node
# to explicitely mark it as accepting Slave Instance
private_software_instance = self.portal.portal_catalog.getResultValue(
portal_type='Software Instance', title=private_instance_title)
instance_node_title = 'Shared Instance for %s' % remote_owner_reference
instance_node = self.addInstanceNode(instance_node_title, private_software_instance)
self.addAllocationSupply(
"for instance node", instance_node, remote_software_product,
remote_release_variation, remote_type_variation)
# join as the another visitor and request software instance on public
# compute_node
self.logout()
remote_public_reference = 'remote-public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, remote_public_reference)
self.addAllocationSupply("for compute node", remote_server, remote_software_product,
remote_release_variation, remote_type_variation,
destination_value=remote_owner_person)
self.login()
remote_public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=remote_public_reference).getParentValue()
with PinnedDateTime(self, DateTime('2024/02/17 00:05')):
private_instance_title = 'Private title %s' % self.generateNewId()
self.checkInstanceAllocation(remote_owner_person.getUserId(),
remote_owner_reference, private_instance_title,
remote_server_software, remote_instance_type,
remote_server, remote_project.getReference())
####################################
# Create a local project
####################################
self.logout()
self.login(sale_person.getUserId())
# create a default project
project_relative_url = self.addProject(person=remote_public_person, currency=currency)
# Convert the Software Instance into an Instance Node
# to explicitely mark it as accepting Slave Instance
private_software_instance = self.portal.portal_catalog.getResultValue(
portal_type='Software Instance', title=private_instance_title)
instance_node_title = 'Shared Instance for %s' % remote_owner_reference
instance_node = self.addInstanceNode(instance_node_title, private_software_instance)
self.logout()
self.login()
project = self.portal.restrictedTraverse(project_relative_url)
self.addAllocationSupply(
"for instance node", instance_node, remote_software_product,
remote_release_variation, remote_type_variation)
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % project.getRelativeUrl()
]
)
self.tic()
# join as the another visitor and request software instance on public
# compute_node
self.logout()
remote_public_reference = 'remote-public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, remote_public_reference)
owner_person = remote_public_person
self.logout()
self.login()
remote_public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=remote_public_reference).getParentValue()
# hooray, now it is time to create compute_nodes
self.login(owner_person.getUserId())
####################################
# Create a local project
####################################
self.logout()
self.login(sale_person.getUserId())
# create a default project
project_relative_url = self.addProject(person=remote_public_person, currency=currency)
remote_compute_node = self.requestRemoteNode(project, remote_project,
remote_public_person)
self.logout()
self.login()
project = self.portal.restrictedTraverse(project_relative_url)
# and install some software on them
public_server_software = remote_server_software
preference.edit(
preferred_subscription_assignment_category_list=[
'function/customer',
'role/client',
'destination_project/%s' % project.getRelativeUrl()
]
)
self.tic()
#software_product, release_variation, type_variation = self.addSoftwareProduct(
public_instance_type = remote_instance_type
software_product, software_release, software_type = self.addSoftwareProduct(
"instance product", project, public_server_software, public_instance_type
)
owner_person = remote_public_person
self.logout()
self.addAllocationSupply("for remote node", remote_compute_node, software_product,
software_release, software_type)
self.tic()
# hooray, now it is time to create compute_nodes
self.login(owner_person.getUserId())
# join as the another visitor and request software instance on public
# compute_node
self.logout()
public_reference = 'public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, public_reference)
remote_compute_node = self.requestRemoteNode(project, remote_project,
remote_public_person)
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=public_reference).getParentValue()
# and install some software on them
public_server_software = remote_server_software
public_instance_title = 'Public title %s' % self.generateNewId()
self.checkRemoteInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
remote_compute_node, project.getReference(),
slave=True)
#software_product, release_variation, type_variation = self.addSoftwareProduct(
public_instance_type = remote_instance_type
software_product, software_release, software_type = self.addSoftwareProduct(
"instance product", project, public_server_software, public_instance_type
)
# XXX Do this for every scenario tests
self.logout()
self.tic()
# now instantiate it on compute_node and set some nice connection dict
self.simulateSlapgridCP(remote_server)
self.tic()
self.addAllocationSupply("for remote node", remote_compute_node, software_product,
software_release, software_type)
self.tic()
self.login()
# owner_person should have one Instance Tree created by alarm
owner_instance_tree_list = self.portal.portal_catalog(
portal_type='Instance Tree',
destination_section__uid=owner_person.getUid()
)
self.assertEqual(1, len(owner_instance_tree_list))
owner_software_instance = owner_instance_tree_list[0].getSuccessorValue()
self.assertEqual('Slave Instance', owner_software_instance.getPortalType())
self.assertEqual(
remote_server.getRelativeUrl(),
owner_software_instance.getAggregateValue().getParentValue().getRelativeUrl()
)
# join as the another visitor and request software instance on public
# compute_node
self.logout()
public_reference = 'public-%s' % self.generateNewId()
self.joinSlapOS(self.web_site, public_reference)
# public_person should have one Instance Tree
public_instance_tree_list = self.portal.portal_catalog(
portal_type='Instance Tree',
destination_section__uid=public_person.getUid()
)
self.assertEqual(1, len(public_instance_tree_list))
self.login()
public_person = self.portal.portal_catalog.getResultValue(
portal_type="ERP5 Login",
reference=public_reference).getParentValue()
self.assertEqual(
'_remote_%s_%s' % (project.getReference(),
public_instance_tree_list[0].getSuccessorReference()),
owner_software_instance.getTitle()
)
connection_dict = owner_software_instance.getConnectionXmlAsDict()
self.assertSameSet(('url_1', 'url_2'), connection_dict.keys())
self.assertSameSet(
['http://%s/%s' % (q.getIpAddress(), owner_software_instance.getReference()) for q in
owner_software_instance.getAggregateValue().contentValues(portal_type='Internet Protocol Address')],
connection_dict.values())
public_instance_title = 'Public title %s' % self.generateNewId()
self.checkRemoteInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
remote_compute_node, project.getReference(),
slave=True)
self.checkRemoteInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
remote_compute_node, project.getReference(),
connection_dict_to_check=owner_software_instance.getConnectionXmlAsDict(),
slave=True)
# XXX Do this for every scenario tests
self.logout()
self.tic()
# now instantiate it on compute_node and set some nice connection dict
self.simulateSlapgridCP(remote_server)
self.tic()
self.login()
self.login()
# owner_person should have one Instance Tree created by alarm
owner_instance_tree_list = self.portal.portal_catalog(
portal_type='Instance Tree',
destination_section__uid=owner_person.getUid()
)
self.assertEqual(1, len(owner_instance_tree_list))
owner_software_instance = owner_instance_tree_list[0].getSuccessorValue()
self.assertEqual('Slave Instance', owner_software_instance.getPortalType())
self.assertEqual(
remote_server.getRelativeUrl(),
owner_software_instance.getAggregateValue().getParentValue().getRelativeUrl()
)
# Ensure no unexpected object has been created
# 6 allocation supply/line/cell
# 3 compute/remote/instance node
# 1 credential request
# 2 instance tree
# 9 open sale order / line
# 3 assignment
# 4 simulation movements
# 4 sale packing list / line
# 2 sale trade condition
# 1 software installation
# 2 software instance
# 1 software product
# 4 subscription requests
self.assertRelatedObjectCount(remote_project, 42)
# public_person should have one Instance Tree
public_instance_tree_list = self.portal.portal_catalog(
portal_type='Instance Tree',
destination_section__uid=public_person.getUid()
)
self.assertEqual(1, len(public_instance_tree_list))
# Ensure no unexpected object has been created
# 3 allocation supply/line/cell
# 1 compute node
# 1 credential request
# 1 instance tree
# 4 open sale order / line
# 3 assignment
# 2 simulation movements
# 2 sale packing list / line
# 2 sale trade condition
# 1 software instance
# 1 software product
# 2 subscription requests
self.assertRelatedObjectCount(project, 23)
self.assertEqual(
'_remote_%s_%s' % (project.getReference(),
public_instance_tree_list[0].getSuccessorReference()),
owner_software_instance.getTitle()
)
connection_dict = owner_software_instance.getConnectionXmlAsDict()
self.assertSameSet(('url_1', 'url_2'), connection_dict.keys())
self.assertSameSet(
['http://%s/%s' % (q.getIpAddress(), owner_software_instance.getReference()) for q in
owner_software_instance.getAggregateValue().contentValues(portal_type='Internet Protocol Address')],
connection_dict.values())
self.checkRemoteInstanceAllocation(public_person.getUserId(),
public_reference, public_instance_title,
public_server_software, public_instance_type,
remote_compute_node, project.getReference(),
connection_dict_to_check=owner_software_instance.getConnectionXmlAsDict(),
slave=True)
self.checkERP5StateBeforeExit()
self.login()
# Ensure no unexpected object has been created
# 6 allocation supply/line/cell
# 3 compute/remote/instance node
# 1 credential request
# 2 instance tree
# 9 open sale order / line
# 3 assignment
# 4 simulation movements
# 4 sale packing list / line
# 2 sale trade condition
# 1 software installation
# 2 software instance
# 1 software product
# 4 subscription requests
self.assertRelatedObjectCount(remote_project, 42)
# Ensure no unexpected object has been created
# 3 allocation supply/line/cell
# 1 compute node
# 1 credential request
# 1 instance tree
# 4 open sale order / line
# 3 assignment
# 2 simulation movements
# 2 sale packing list / line
# 2 sale trade condition
# 1 software instance
# 1 software product
# 2 subscription requests
self.assertRelatedObjectCount(project, 23)
self.checkERP5StateBeforeExit()
"""
def test_open_order_with_service_scenario(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment