Commit ea940ce8 authored by Rafael Monnerat's avatar Rafael Monnerat

slapos_cloud: Introduce SlapOSComputeNodeMixin

   Move away part of SlapTool code into a Mixin specific to fill and get Computer Information cache
parent 2372fb34
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2010-2022 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from Products.ERP5Type.Cache import DEFAULT_CACHE_SCOPE
from AccessControl import Unauthorized
from AccessControl.Permissions import access_contents_information
from AccessControl import getSecurityManager
from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod
from Products.ERP5Type.tests.utils import DummyMailHostMixin
import time
from lxml import etree
from zLOG import LOG, INFO
try:
from slapos.slap.slap import (
Computer as ComputeNode,
ComputerPartition as SlapComputePartition,
SoftwareRelease)
from slapos.util import xml2dict, dumps
except ImportError:
# Do no prevent instance from starting
# if libs are not installed
class ComputeNode:
def __init__(self):
raise ImportError
class SlapComputePartition:
def __init__(self):
raise ImportError
class SoftwareRelease:
def __init__(self):
raise ImportError
def xml2dict(dictionary):
raise ImportError
def dumps(*args):
raise ImportError
def _assertACI(document):
sm = getSecurityManager()
if sm.checkPermission(access_contents_information,
document):
return document
raise Unauthorized('User %r has no access to %r' % (sm.getUser(), document))
class SlapOSComputeNodeMixin(object):
def _getCachePlugin(self):
return self.getPortalObject().portal_caches\
.getRamCacheRoot().get('compute_node_information_cache_factory')\
.getCachePluginList()[0]
@UnrestrictedMethod
def _getSoftwareReleaseValueList(self):
"""Returns list of Software Releases documents for compute_node"""
portal = self.getPortalObject()
software_release_list = []
for software_installation in portal.portal_catalog.unrestrictedSearchResults(
portal_type='Software Installation',
default_aggregate_uid=self.getUid(),
validation_state='validated',
):
software_installation = _assertACI(software_installation.getObject())
software_release_response = SoftwareRelease(
software_release=software_installation.getUrlString().decode('UTF-8'),
computer_guid=self.getReference().decode('UTF-8'))
if software_installation.getSlapState() == 'destroy_requested':
software_release_response._requested_state = 'destroyed'
else:
software_release_response._requested_state = 'available'
known_state = software_installation.getTextAccessStatus()
if known_state.startswith("#access"):
software_release_response._known_state = 'available'
elif known_state.startswith("#building"):
software_release_response._known_state = 'building'
else:
software_release_response._known_state = 'error'
software_release_list.append(software_release_response)
return software_release_list
def _getCacheComputeNodeInformation(self, user):
self.REQUEST.response.setHeader('Content-Type', 'text/xml; charset=utf-8')
slap_compute_node = ComputeNode(self.getReference().decode("UTF-8"))
slap_compute_node._computer_partition_list = []
slap_compute_node._software_release_list = self._getSoftwareReleaseValueList()
unrestrictedSearchResults = self.getPortalObject().portal_catalog.unrestrictedSearchResults
compute_partition_list = unrestrictedSearchResults(
parent_uid=self.getUid(),
validation_state="validated",
portal_type="Compute Partition"
)
self._calculateSlapComputeNodeInformation(slap_compute_node, compute_partition_list)
return dumps(slap_compute_node)
def _activateFillComputeNodeInformationCache(self, user):
tag = 'compute_node_information_cache_fill_%s_%s' % (self.getReference(), user)
if self.getPortalObject().portal_activities.countMessageWithTag(tag) == 0:
self.activate(activity='SQLQueue', priority=2, tag=tag)._fillComputeNodeInformationCache(user)
def _fillComputeNodeInformationCache(self, user):
key = '%s_%s' % (self.getReference(), user)
try:
self._getCachePlugin().set(key, DEFAULT_CACHE_SCOPE,
dict (
time=time.time(),
refresh_etag=self._calculateRefreshEtag(),
data=self._getCacheComputeNodeInformation(user),
),
cache_duration=self.getPortalObject().portal_caches\
.getRamCacheRoot().get('compute_node_information_cache_factory'\
).cache_duration
)
except (Unauthorized, IndexError):
# XXX: Unauthorized hack. Race condition of not ready setup delivery which provides
# security information shall not make this method fail, as it will be
# called later anyway
# Note: IndexError ignored, as it happend in case if full reindex is
# called on site
pass
def _calculateRefreshEtag(self):
# check max indexation timestamp
# it is unlikely to get an empty catalog
last_indexed_entry = self.getPortalObject().portal_catalog(
select_list=['indexation_timestamp'],
portal_type=['Compute Node', 'Compute Partition',
'Software Instance', 'Slave Instance',
'Software Installation'],
sort_on=[('indexation_timestamp', 'DESC')],
limit=1,
)[0]
return '%s_%s' % (last_indexed_entry.uid,
last_indexed_entry.indexation_timestamp)
def _isTestRun(self):
if self.REQUEST.get('disable_isTestRun', False):
return False
if issubclass(self.getPortalObject().MailHost.__class__, DummyMailHostMixin) \
or self.REQUEST.get('test_list'):
return True
return False
def _getComputeNodeInformation(self, user, refresh_etag):
portal = self.getPortalObject()
user_document = _assertACI(portal.portal_catalog.unrestrictedGetResultValue(
reference=user, portal_type=['Person', 'Compute Node', 'Software Instance']))
user_type = user_document.getPortalType()
self.REQUEST.response.setHeader('Content-Type', 'text/xml; charset=utf-8')
slap_compute_node = ComputeNode(self.getReference().decode("UTF-8"))
slap_compute_node._computer_partition_list = []
if user_type in ('Compute Node', 'Person'):
if not self._isTestRun():
cache_plugin = self._getCachePlugin()
key = '%s_%s' % (self.getReference(), user)
try:
entry = cache_plugin.get(key, DEFAULT_CACHE_SCOPE)
except KeyError:
entry = None
if entry is not None and isinstance(entry.getValue(), dict):
cached_dict = entry.getValue()
cached_etag = cached_dict.get('refresh_etag', None)
if (refresh_etag != cached_etag):
# Do not recalculate the compute_node information
# if nothing changed
self._activateFillComputeNodeInformationCache(user)
return cached_dict['data'], cached_etag
else:
self._activateFillComputeNodeInformationCache(user)
self.REQUEST.response.setStatus(503)
return self.REQUEST.response, None
else:
return self._getCacheComputeNodeInformation(user), None
else:
slap_compute_node._software_release_list = []
if user_type == 'Software Instance':
compute_partition_list = self.contentValues(
portal_type="Compute Partition",
checked_permission="View")
else:
compute_partition_list = self.getPortalObject().portal_catalog.unrestrictedSearchResults(
parent_uid=self.getUid(),
validation_state="validated",
portal_type="Compute Partition")
self._calculateSlapComputeNodeInformation(slap_compute_node, compute_partition_list)
return dumps(slap_compute_node), None
def _calculateSlapComputeNodeInformation(self, slap_compute_node, compute_partition_list):
if len(compute_partition_list) == 0:
return
unrestrictedSearchResults = self.getPortalObject().portal_catalog.unrestrictedSearchResults
compute_partition_uid_list = [x.uid for x in compute_partition_list]
grouped_software_instance_list = unrestrictedSearchResults(
portal_type="Software Instance",
default_aggregate_uid=compute_partition_uid_list,
validation_state="validated",
group_by_list=['default_aggregate_uid'],
select_list=['default_aggregate_uid', 'count(*)']
)
slave_software_instance_list = unrestrictedSearchResults(
default_aggregate_uid=compute_partition_uid_list,
portal_type='Slave Instance',
validation_state="validated",
select_list=['default_aggregate_uid'],
**{"slapos_item.slap_state": "start_requested"}
)
for compute_partition in compute_partition_list:
software_instance_list = [x for x in grouped_software_instance_list if (x.default_aggregate_uid == compute_partition.getUid())]
if (len(software_instance_list) == 1) and (software_instance_list[0]['count(*)'] > 1):
software_instance_list = software_instance_list + software_instance_list
slap_compute_node._computer_partition_list.append(
self._getSlapPartitionByPackingList(
_assertACI(compute_partition.getObject()),
software_instance_list,
[x for x in slave_software_instance_list if (x.default_aggregate_uid == compute_partition.getUid())]
)
)
def _instanceXmlToDict(self, xml):
result_dict = {}
try:
result_dict = xml2dict(xml)
except (etree.XMLSchemaError, etree.XMLSchemaParseError, # pylint: disable=catching-non-exception
etree.XMLSchemaValidateError, etree.XMLSyntaxError): # pylint: disable=catching-non-exception
LOG('SlapOSComputeNodeCacheMixin', INFO, 'Issue during parsing xml:', error=True)
return result_dict
def _getSlapPartitionByPackingList(self, compute_partition_document,
software_instance_list,
shared_instance_sql_list):
compute_node = compute_partition_document
while compute_node.getPortalType() != 'Compute Node':
compute_node = compute_node.getParentValue()
compute_node_id = compute_node.getReference().decode("UTF-8")
slap_partition = SlapComputePartition(compute_node_id,
compute_partition_document.getReference().decode("UTF-8"))
slap_partition._software_release_document = None
slap_partition._requested_state = 'destroyed'
slap_partition._need_modification = 0
software_instance = None
if compute_partition_document.getSlapState() == 'busy':
software_instance_count = len(software_instance_list)
if software_instance_count == 1:
software_instance = _assertACI(software_instance_list[0].getObject())
elif software_instance_count > 1:
# XXX do not prevent the system to work if one partition is broken
raise NotImplementedError, "Too many instances linked to %s" % \
compute_partition_document.getRelativeUrl()
if software_instance is not None:
state = software_instance.getSlapState()
if state == "stop_requested":
slap_partition._requested_state = 'stopped'
if state == "start_requested":
slap_partition._requested_state = 'started'
slap_partition._access_status = software_instance.getTextAccessStatus()
slap_partition._software_release_document = SoftwareRelease(
software_release=software_instance.getUrlString().decode("UTF-8"),
computer_guid=compute_node_id)
slap_partition._need_modification = 1
parameter_dict = software_instance._asParameterDict(
shared_instance_sql_list=shared_instance_sql_list
)
# software instance has to define an xml parameter
slap_partition._parameter_dict = self._instanceXmlToDict(
parameter_dict.pop('xml'))
slap_partition._connection_dict = self._instanceXmlToDict(
parameter_dict.pop('connection_xml'))
slap_partition._filter_dict = self._instanceXmlToDict(
parameter_dict.pop('filter_xml'))
slap_partition._instance_guid = parameter_dict.pop('instance_guid')
for slave_instance_dict in parameter_dict.get("slave_instance_list", []):
if slave_instance_dict.has_key("connection_xml"):
slave_instance_dict.update(self._instanceXmlToDict(
slave_instance_dict.pop("connection_xml")))
if slave_instance_dict.has_key("xml"):
slave_instance_dict.update(self._instanceXmlToDict(
slave_instance_dict.pop("xml")))
slap_partition._parameter_dict.update(parameter_dict)
return slap_partition
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Mixin Component" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>SlapOSComputeNodeMixin</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>mixin.erp5.SlapOSComputeNodeMixin</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Mixin Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.Workflow"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_log</string> </key>
<value>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>error_message</string> </key>
<value>
<list>
<persistent> <string encoding="base64">AAAAAAAAAAU=</string> </persistent>
</list>
</value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="5" aka="AAAAAAAAAAU=">
<pickle>
<global name="Message" module="Products.ERP5Type.Message"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>default</string> </key>
<value> <string>Class ${reference} must be defined</string> </value>
</item>
<item>
<key> <string>domain</string> </key>
<value> <string>erp5_ui</string> </value>
</item>
<item>
<key> <string>mapping</string> </key>
<value>
<dictionary>
<item>
<key> <string>reference</string> </key>
<value> <string>SlapOSComputeNodeCacheMixin</string> </value>
</item>
</dictionary>
</value>
</item>
<item>
<key> <string>message</string> </key>
<value> <string>Class ${reference} must be defined</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<type_mixin>
<portal_type id="Compute Node">
<item>SlapOSCacheMixin</item>
<item>SlapOSComputeNodeMixin</item>
</portal_type>
<portal_type id="Compute Partition">
<item>SlapOSCacheMixin</item>
......
mixin.erp5.SlapOSCacheMixin
mixin.erp5.SlapOSComputeNodeMixin
\ No newline at end of file
......@@ -19,6 +19,8 @@ portal_alarms/slapos_stop_collect_instance
portal_alarms/slapos_update_compute_node_capacity_scope
portal_caches/access_status_data_cache_factory
portal_caches/access_status_data_cache_factory/volatile_cache_plugin
portal_caches/compute_node_information_cache_factory
portal_caches/compute_node_information_cache_factory/persistent_cache_plugin
portal_caches/last_stored_data_cache_factory
portal_caches/last_stored_data_cache_factory/volatile_cache_plugin
product_module/compute_node
......
Compute Node | SlapOSCacheMixin
Compute Node | SlapOSComputeNodeMixin
Compute Partition | SlapOSCacheMixin
Person | SlapOSCacheMixin
\ No newline at end of file
......@@ -17,30 +17,29 @@ def getComputeNodeReferenceAndUserId(item):
compute_node = partition.getParentValue()
if compute_node is not None and compute_node.getValidationState() == 'validated':
return compute_node.getReference(), compute_node.getUserId()
return None, None
return compute_node, compute_node.getReference(), compute_node.getUserId()
return None, None, None
def Item_activateFillComputeNodeInformationCache(state_change):
item = state_change['object']
portal = item.getPortalObject()
compute_node_reference, user_id = getComputeNodeReferenceAndUserId(item)
if compute_node_reference is None:
compute_node, compute_node_reference, user_id = getComputeNodeReferenceAndUserId(item)
if compute_node is None:
return None
if user_id is None:
return None
user = portal.acl_users.getUserById(user_id)
if user is None:
raise ValueError("User %s not found" % user_id)
sm = getSecurityManager()
try:
newSecurityManager(None, user)
portal.portal_slap._activateFillComputeNodeInformationCache(
compute_node_reference, compute_node_reference)
compute_node._activateFillComputeNodeInformationCache(
compute_node_reference)
finally:
setSecurityManager(sm)
......
......@@ -78,16 +78,16 @@ class TestSlapOSSlapToolgetFullComputerInformation(TestSlapOSSlapToolMixin):
self.tic()
self.login(self.compute_node_user_id)
self.portal_slap.getFullComputerInformation(self.compute_node_id)
# First access.
# Cache has been filled by interaction workflow
# (luckily, it seems the cache is filled after everything is indexed)
response = self.portal_slap.getFullComputerInformation(self.compute_node_id)
self.commit()
first_etag = self.portal_slap._calculateRefreshEtag()
first_etag = self.compute_node._calculateRefreshEtag()
first_body_fingerprint = hashData(
self.portal_slap._getCacheComputeNodeInformation(self.compute_node_id,
self.compute_node_id)
self.compute_node._getCacheComputeNodeInformation(self.compute_node_id)
)
self.assertEqual(200, response.status)
self.assertTrue('last-modified' not in response.headers)
......@@ -120,10 +120,9 @@ class TestSlapOSSlapToolgetFullComputerInformation(TestSlapOSSlapToolMixin):
self.commit()
self.assertEqual(200, response.status)
self.assertTrue('last-modified' not in response.headers)
second_etag = self.portal_slap._calculateRefreshEtag()
second_etag = self.compute_node._calculateRefreshEtag()
second_body_fingerprint = hashData(
self.portal_slap._getCacheComputeNodeInformation(self.compute_node_id,
self.compute_node_id)
self.compute_node._getCacheComputeNodeInformation(self.compute_node_id)
)
self.assertNotEqual(first_etag, second_etag)
# The indexation timestamp does not impact the response body
......@@ -154,10 +153,9 @@ class TestSlapOSSlapToolgetFullComputerInformation(TestSlapOSSlapToolMixin):
# Check that the result is stable, as the indexation timestamp is not changed yet
current_activity_count = len(self.portal.portal_activities.getMessageList())
# Edition does not impact the etag
self.assertEqual(second_etag, self.portal_slap._calculateRefreshEtag())
self.assertEqual(second_etag, self.compute_node._calculateRefreshEtag())
third_body_fingerprint = hashData(
self.portal_slap._getCacheComputeNodeInformation(self.compute_node_id,
self.compute_node_id)
self.compute_node._getCacheComputeNodeInformation(self.compute_node_id)
)
# The edition impacts the response body
self.assertNotEqual(first_body_fingerprint, third_body_fingerprint)
......@@ -177,7 +175,7 @@ class TestSlapOSSlapToolgetFullComputerInformation(TestSlapOSSlapToolMixin):
self.commit()
self.assertEqual(200, response.status)
self.assertTrue('last-modified' not in response.headers)
third_etag = self.portal_slap._calculateRefreshEtag()
third_etag = self.compute_node._calculateRefreshEtag()
self.assertNotEqual(second_etag, third_etag)
self.assertEqual(third_etag, response.headers.get('etag'))
self.assertEqual(third_body_fingerprint, hashData(response.body))
......@@ -192,12 +190,11 @@ class TestSlapOSSlapToolgetFullComputerInformation(TestSlapOSSlapToolMixin):
# Check that the result is stable, as the indexation timestamp is not changed yet
current_activity_count = len(self.portal.portal_activities.getMessageList())
# Edition does not impact the etag
self.assertEqual(third_etag, self.portal_slap._calculateRefreshEtag())
self.assertEqual(third_etag, self.compute_node._calculateRefreshEtag())
# The edition does not impact the response body yet, as the aggregate relation
# is not yet unindex
self.assertEqual(third_body_fingerprint, hashData(
self.portal_slap._getCacheComputeNodeInformation(self.compute_node_id,
self.compute_node_id)
self.compute_node._getCacheComputeNodeInformation(self.compute_node_id)
))
response = self.portal_slap.getFullComputerInformation(self.compute_node_id)
self.commit()
......@@ -217,10 +214,9 @@ class TestSlapOSSlapToolgetFullComputerInformation(TestSlapOSSlapToolMixin):
self.commit()
self.assertEqual(200, response.status)
self.assertTrue('last-modified' not in response.headers)
fourth_etag = self.portal_slap._calculateRefreshEtag()
fourth_etag = self.compute_node._calculateRefreshEtag()
fourth_body_fingerprint = hashData(
self.portal_slap._getCacheComputeNodeInformation(self.compute_node_id,
self.compute_node_id)
self.compute_node._getCacheComputeNodeInformation(self.compute_node_id)
)
self.assertNotEqual(third_etag, fourth_etag)
# The indexation timestamp does not impact the response body
......@@ -733,43 +729,9 @@ class TestSlapOSSlapToolComputeNodeAccess(TestSlapOSSlapToolMixin):
response = self.portal_slap.computerBang(self.compute_node_id,
error_log)
self.assertEqual('None', response)
created_at = rfc1123_date(DateTime())
since = created_at
response = self.portal_slap.getComputerStatus(self.compute_node_id)
# check returned XML
xml_fp = StringIO.StringIO()
# We do not assert getComputerStatus on this test, since
# the change of the timestamp is part of reportComputeNodeBang
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response.body),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<dictionary id='i2'>
<string>created_at</string>
<string>%(created_at)s</string>
<string>no_data_since_15_minutes</string>
<int>0</int>
<string>no_data_since_5_minutes</string>
<int>0</int>
<string>since</string>
<string>%(since)s</string>
<string>state</string>
<string/>
<string>text</string>
<string>#error bang</string>
<string>user</string>
<string>%(compute_node_id)s</string>
</dictionary>
</marshal>
""" % dict(
created_at=created_at,
since=since,
compute_node_id=self.compute_node_id,
)
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
self.assertComputeNodeBangSimulator((), {'comment': error_log})
finally:
if os.path.exists(self.compute_node_bang_simulator):
......@@ -2635,43 +2597,9 @@ class TestSlapOSSlapToolPersonAccess(TestSlapOSSlapToolMixin):
response = self.portal_slap.computerBang(self.compute_node_id,
error_log)
self.assertEqual('None', response)
created_at = rfc1123_date(DateTime())
since = created_at
response = self.portal_slap.getComputerStatus(self.compute_node_id)
# check returned XML
xml_fp = StringIO.StringIO()
# We do not assert getComputerStatus on this test, since
# the change of the timestamp is part of reportComputeNodeBang
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response.body),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<dictionary id='i2'>
<string>created_at</string>
<string>%(created_at)s</string>
<string>no_data_since_15_minutes</string>
<int>0</int>
<string>no_data_since_5_minutes</string>
<int>0</int>
<string>since</string>
<string>%(since)s</string>
<string>state</string>
<string/>
<string>text</string>
<string>#error bang</string>
<string>user</string>
<string>%(person_reference)s</string>
</dictionary>
</marshal>
""" % dict(
created_at=created_at,
since=since,
person_reference=self.person_reference,
)
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
self.assertComputeNodeBangSimulator((), {'comment': error_log})
finally:
if os.path.exists(self.compute_node_bang_simulator):
......
......@@ -38,11 +38,8 @@ from Products.DCWorkflow.DCWorkflow import ValidationFailed
from Products.ERP5Type.Globals import InitializeClass
from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type import Permissions
from Products.ERP5Type.Cache import DEFAULT_CACHE_SCOPE
from Products.ERP5Type.Cache import CachingMethod
from lxml import etree
import time
from Products.ERP5Type.tests.utils import DummyMailHostMixin
try:
from slapos.slap.slap import (
Computer as ComputeNode,
......@@ -162,167 +159,6 @@ class SlapTool(BaseTool):
# Public GET methods
####################################################
def _isTestRun(self):
if self.REQUEST.get('disable_isTestRun', False):
return False
if issubclass(self.getPortalObject().MailHost.__class__, DummyMailHostMixin) \
or self.REQUEST.get('test_list'):
return True
return False
def _getCachePlugin(self):
return self.getPortalObject().portal_caches\
.getRamCacheRoot().get('compute_node_information_cache_factory')\
.getCachePluginList()[0]
def _getCacheComputeNodeInformation(self, compute_node_id, user):
self.REQUEST.response.setHeader('Content-Type', 'text/xml; charset=utf-8')
slap_compute_node = ComputeNode(compute_node_id.decode("UTF-8"))
parent_uid = self._getComputeNodeUidByReference(compute_node_id)
slap_compute_node._computer_partition_list = []
slap_compute_node._software_release_list = \
self._getSoftwareReleaseValueListForComputeNode(compute_node_id)
unrestrictedSearchResults = self.getPortalObject().portal_catalog.unrestrictedSearchResults
compute_partition_list = unrestrictedSearchResults(
parent_uid=parent_uid,
validation_state="validated",
portal_type="Compute Partition"
)
self._calculateSlapComputeNodeInformation(slap_compute_node, compute_partition_list)
return dumps(slap_compute_node)
def _fillComputeNodeInformationCache(self, compute_node_id, user):
key = '%s_%s' % (compute_node_id, user)
try:
self._getCachePlugin().set(key, DEFAULT_CACHE_SCOPE,
dict (
time=time.time(),
refresh_etag=self._calculateRefreshEtag(),
data=self._getCacheComputeNodeInformation(compute_node_id, user),
),
cache_duration=self.getPortalObject().portal_caches\
.getRamCacheRoot().get('compute_node_information_cache_factory'\
).cache_duration
)
except (Unauthorized, IndexError):
# XXX: Unauthorized hack. Race condition of not ready setup delivery which provides
# security information shall not make this method fail, as it will be
# called later anyway
# Note: IndexError ignored, as it happend in case if full reindex is
# called on site
pass
def _activateFillComputeNodeInformationCache(self, compute_node_id, user):
tag = 'compute_node_information_cache_fill_%s_%s' % (compute_node_id, user)
if self.getPortalObject().portal_activities.countMessageWithTag(tag) == 0:
self.activate(activity='SQLQueue', tag=tag)._fillComputeNodeInformationCache(
compute_node_id, user)
def _calculateSlapComputeNodeInformation(self, slap_compute_node, compute_partition_list):
if len(compute_partition_list) == 0:
return
unrestrictedSearchResults = self.getPortalObject().portal_catalog.unrestrictedSearchResults
compute_partition_uid_list = [x.uid for x in compute_partition_list]
grouped_software_instance_list = unrestrictedSearchResults(
portal_type="Software Instance",
default_aggregate_uid=compute_partition_uid_list,
validation_state="validated",
group_by_list=['default_aggregate_uid'],
select_list=['default_aggregate_uid', 'count(*)']
)
slave_software_instance_list = unrestrictedSearchResults(
default_aggregate_uid=compute_partition_uid_list,
portal_type='Slave Instance',
validation_state="validated",
select_list=['default_aggregate_uid'],
**{"slapos_item.slap_state": "start_requested"}
)
for compute_partition in compute_partition_list:
software_instance_list = [x for x in grouped_software_instance_list if (x.default_aggregate_uid == compute_partition.getUid())]
if (len(software_instance_list) == 1) and (software_instance_list[0]['count(*)'] > 1):
software_instance_list = software_instance_list + software_instance_list
slap_compute_node._computer_partition_list.append(
self._getSlapPartitionByPackingList(
_assertACI(compute_partition.getObject()),
software_instance_list,
[x for x in slave_software_instance_list if (x.default_aggregate_uid == compute_partition.getUid())]
)
)
def _calculateRefreshEtag(self):
# check max indexation timestamp
# it is unlikely to get an empty catalog
last_indexed_entry = self.getPortalObject().portal_catalog(
select_list=['indexation_timestamp'],
portal_type=['Compute Node', 'Compute Partition',
'Software Instance', 'Slave Instance',
'Software Installation'],
sort_on=[('indexation_timestamp', 'DESC')],
limit=1,
)[0]
return '%s_%s' % (last_indexed_entry.uid,
last_indexed_entry.indexation_timestamp)
def _getComputeNodeInformation(self, compute_node_id, user, refresh_etag):
portal = self.getPortalObject()
user_document = _assertACI(portal.portal_catalog.unrestrictedGetResultValue(
reference=user, portal_type=['Person', 'Compute Node', 'Software Instance']))
user_type = user_document.getPortalType()
self.REQUEST.response.setHeader('Content-Type', 'text/xml; charset=utf-8')
slap_compute_node = ComputeNode(compute_node_id.decode("UTF-8"))
parent_uid = self._getComputeNodeUidByReference(compute_node_id)
slap_compute_node._computer_partition_list = []
if user_type in ('Compute Node', 'Person'):
if not self._isTestRun():
cache_plugin = self._getCachePlugin()
key = '%s_%s' % (compute_node_id, user)
try:
entry = cache_plugin.get(key, DEFAULT_CACHE_SCOPE)
except KeyError:
entry = None
if entry is not None and isinstance(entry.getValue(), dict):
cached_dict = entry.getValue()
cached_etag = cached_dict.get('refresh_etag', None)
if (refresh_etag != cached_etag):
# Do not recalculate the compute_node information
# if nothing changed
self._activateFillComputeNodeInformationCache(compute_node_id, user)
return cached_dict['data'], cached_etag
else:
self._activateFillComputeNodeInformationCache(compute_node_id, user)
self.REQUEST.response.setStatus(503)
return self.REQUEST.response, None
else:
return self._getCacheComputeNodeInformation(compute_node_id, user), None
else:
slap_compute_node._software_release_list = []
if user_type == 'Software Instance':
compute_node = self.getPortalObject().portal_catalog.unrestrictedSearchResults(
portal_type='Compute Node', reference=compute_node_id,
validation_state="validated")[0].getObject()
compute_partition_list = compute_node.contentValues(
portal_type="Compute Partition",
checked_permission="View")
else:
compute_partition_list = self.getPortalObject().portal_catalog.unrestrictedSearchResults(
parent_uid=parent_uid,
validation_state="validated",
portal_type="Compute Partition")
self._calculateSlapComputeNodeInformation(slap_compute_node, compute_partition_list)
return dumps(slap_compute_node), None
@UnrestrictedMethod
def _getInstanceTreeIpList(self, compute_node_id, compute_partition_id):
software_instance = self._getSoftwareInstanceForComputePartition(
......@@ -363,8 +199,11 @@ class SlapTool(BaseTool):
if str(user) == computer_id:
compute_node = self.getPortalObject().portal_membership.getAuthenticatedMember().getUserValue()
compute_node.setAccessStatus('#access %s' % computer_id)
refresh_etag = self._calculateRefreshEtag()
body, etag = self._getComputeNodeInformation(computer_id, user, refresh_etag)
else:
compute_node = self._getComputeNodeDocument(computer_id)
refresh_etag = compute_node._calculateRefreshEtag()
body, etag = compute_node._getComputeNodeInformation(user, refresh_etag)
if self.REQUEST.response.getStatus() == 200:
# Keep in cache server for 7 days
......@@ -1576,37 +1415,6 @@ class SlapTool(BaseTool):
'timestamp': "%i" % timestamp,
}
@UnrestrictedMethod
def _getSoftwareReleaseValueListForComputeNode(self, compute_node_reference):
"""Returns list of Software Releases documentsfor compute_node"""
compute_node_document = self._getComputeNodeDocument(compute_node_reference)
portal = self.getPortalObject()
software_release_list = []
for software_installation in portal.portal_catalog.unrestrictedSearchResults(
portal_type='Software Installation',
default_aggregate_uid=compute_node_document.getUid(),
validation_state='validated',
):
software_installation = _assertACI(software_installation.getObject())
software_release_response = SoftwareRelease(
software_release=software_installation.getUrlString().decode('UTF-8'),
computer_guid=compute_node_reference.decode('UTF-8'))
if software_installation.getSlapState() == 'destroy_requested':
software_release_response._requested_state = 'destroyed'
else:
software_release_response._requested_state = 'available'
known_state = software_installation.getTextAccessStatus()
if known_state.startswith("#access"):
software_release_response._known_state = 'available'
elif known_state.startswith("#building"):
software_release_response._known_state = 'building'
else:
software_release_response._known_state = 'error'
software_release_list.append(software_release_response)
return software_release_list
@convertToREST
def _softwareReleaseError(self, url, compute_node_id, error_log):
"""
......
portal_caches/last_stored_data_cache_factory
portal_caches/last_stored_data_cache_factory/volatile_cache_plugin
portal_caches/compute_node_information_cache_factory
portal_caches/compute_node_information_cache_factory/persistent_cache_plugin
\ No newline at end of file
portal_caches/compute_node_information_cache_factory
portal_caches/compute_node_information_cache_factory/persistent_cache_plugin
portal_caches/slap_cache_factory
portal_caches/slap_cache_factory/persistent_cache_plugin
web_site_module/slapos_hateoas
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment