Commit 74aa0db6 authored by Thomas Gambier's avatar Thomas Gambier 🚴🏼 Committed by Rafael Monnerat

[slapos_pdm]: fix UpgradeDecisionLine_cancel

/reviewed-on nexedi/slapos.core!117
parent 37d8708f
...@@ -5,7 +5,8 @@ software_release = context.getAggregateValue(portal_type="Software Release") ...@@ -5,7 +5,8 @@ software_release = context.getAggregateValue(portal_type="Software Release")
upgrade_decision = context.getParentValue() upgrade_decision = context.getParentValue()
if software_release.getValidationState() == "archived": if software_release.getValidationState() == "archived":
upgrade_decision.cancel(comment="Software Release is archived") upgrade_decision.cancel(comment="Software Release is archived.")
return
if hosting_subscription is not None: if hosting_subscription is not None:
if hosting_subscription.getSlapState() == "destroy_requested": if hosting_subscription.getSlapState() == "destroy_requested":
......
...@@ -1871,4 +1871,107 @@ ${new_software_release_url}""", ...@@ -1871,4 +1871,107 @@ ${new_software_release_url}""",
self.assertEqual(event.getSimulationState(), "delivered") self.assertEqual(event.getSimulationState(), "delivered")
def testUpgradeDecisionLine_cancel_archived_software_release(self):
software_release = self._makeSoftwareRelease()
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision_line = self._makeUpgradeDecisionLine(upgrade_decision)
upgrade_decision_line.setAggregateValueList([software_release])
self.tic()
software_release.archive()
upgrade_decision_line.UpgradeDecisionLine_cancel()
self.assertEqual('cancelled', upgrade_decision.getSimulationState())
workflow_history_list = upgrade_decision.Base_getWorkflowHistoryItemList('upgrade_decision_workflow', display=0)
self.assertEqual("Software Release is archived.", workflow_history_list[-1].comment)
@simulate('NotificationTool_getDocumentValue',
'reference=None',
'assert reference == "slapos-upgrade-delivered-computer.notification"\n' \
'return context.restrictedTraverse(' \
'context.REQUEST["testUpgradeDecisionLine_cancel_destroyed_hosting_subscription"])')
def testUpgradeDecisionLine_cancel_destroyed_hosting_subscription(self):
software_release = self._makeSoftwareRelease()
hosting_subscription = self._makeFullHostingSubscription(software_release.getUrlString())
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision_line = self._makeUpgradeDecisionLine(upgrade_decision)
upgrade_decision_line.setAggregateValueList([software_release, hosting_subscription])
notification_message = self.portal.notification_message_module.newContent(
portal_type="Notification Message",
title='Test NM title %s' % self.new_id,
text_content_substitution_mapping_method_id=
"NotificationMessage_getSubstitutionMappingDictFromArgument",
text_content="""${software_product_title}
${computer_title}
${computer_reference}
${software_release_name}
${software_release_reference}
${new_software_release_url}""",
content_type='text/html',
)
self.portal.REQUEST\
['testUpgradeDecisionLine_cancel_destroyed_hosting_subscription'] = \
notification_message.getRelativeUrl()
self.tic()
kw = dict(
software_release = hosting_subscription.getUrlString(),
software_type = hosting_subscription.getSourceReference(),
instance_xml = hosting_subscription.getTextContent(),
sla_xml = self.generateSafeXml(),
shared = False
)
hosting_subscription.requestDestroy(**kw)
self.tic()
upgrade_decision_line.UpgradeDecisionLine_cancel()
self.assertEqual('cancelled', upgrade_decision.getSimulationState())
workflow_history_list = upgrade_decision.Base_getWorkflowHistoryItemList('upgrade_decision_workflow', display=0)
self.assertEqual("Hosting Subscription is destroyed.", workflow_history_list[-1].comment)
@simulate('NotificationTool_getDocumentValue',
'reference=None',
'assert reference == "slapos-upgrade-delivered-computer.notification"\n' \
'return context.restrictedTraverse(' \
'context.REQUEST["testUpgradeDecisionLine_cancel_destroyed_hs_archived_sr"])')
def testUpgradeDecisionLine_cancel_destroyed_hs_archived_sr(self):
software_release = self._makeSoftwareRelease()
hosting_subscription = self._makeFullHostingSubscription(software_release.getUrlString())
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision_line = self._makeUpgradeDecisionLine(upgrade_decision)
upgrade_decision_line.setAggregateValueList([software_release, hosting_subscription])
notification_message = self.portal.notification_message_module.newContent(
portal_type="Notification Message",
title='Test NM title %s' % self.new_id,
text_content_substitution_mapping_method_id=
"NotificationMessage_getSubstitutionMappingDictFromArgument",
text_content="""${software_product_title}
${computer_title}
${computer_reference}
${software_release_name}
${software_release_reference}
${new_software_release_url}""",
content_type='text/html',
)
self.portal.REQUEST\
['testUpgradeDecisionLine_cancel_destroyed_hs_archived_sr'] = \
notification_message.getRelativeUrl()
self.tic()
kw = dict(
software_release = hosting_subscription.getUrlString(),
software_type = hosting_subscription.getSourceReference(),
instance_xml = hosting_subscription.getTextContent(),
sla_xml = self.generateSafeXml(),
shared = False
)
hosting_subscription.requestDestroy(**kw)
software_release.archive()
self.tic()
upgrade_decision_line.UpgradeDecisionLine_cancel()
self.assertEqual('cancelled', upgrade_decision.getSimulationState())
workflow_history_list = upgrade_decision.Base_getWorkflowHistoryItemList('upgrade_decision_workflow', display=0)
self.assertEqual("Software Release is archived.", workflow_history_list[-1].comment)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment