pax_global_header 0000666 0000000 0000000 00000000064 14601336532 0014515 g ustar 00root root 0000000 0000000 52 comment=8360f221a5d6bdca638d33ecaebeabe025962af5
slapos-master-software-erp5-test/ 0000775 0000000 0000000 00000000000 14601336532 0017331 5 ustar 00root root 0000000 0000000 slapos-master-software-erp5-test/software/ 0000775 0000000 0000000 00000000000 14601336532 0021163 5 ustar 00root root 0000000 0000000 slapos-master-software-erp5-test/software/erp5/ 0000775 0000000 0000000 00000000000 14601336532 0022036 5 ustar 00root root 0000000 0000000 slapos-master-software-erp5-test/software/erp5/test/ 0000775 0000000 0000000 00000000000 14601336532 0023015 5 ustar 00root root 0000000 0000000 slapos-master-software-erp5-test/software/erp5/test/README.md 0000664 0000000 0000000 00000000040 14601336532 0024266 0 ustar 00root root 0000000 0000000 Tests for ERP5 software release
slapos-master-software-erp5-test/software/erp5/test/setup.py 0000664 0000000 0000000 00000004066 14601336532 0024535 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from setuptools import setup, find_packages
version = '0.0.1.dev0'
name = 'slapos.test.erp5'
with open("README.md") as f:
long_description = f.read()
setup(name=name,
version=version,
description="Test for SlapOS' ERP5 software release",
long_description=long_description,
long_description_content_type='text/markdown',
maintainer="Nexedi",
maintainer_email="info@nexedi.com",
url="https://lab.nexedi.com/nexedi/slapos",
packages=find_packages(),
install_requires=[
'slapos.core',
'supervisor',
'slapos.libnetworkcache',
'erp5.util',
'psutil',
'requests',
'mysqlclient',
'cryptography',
'pexpect',
'pyOpenSSL',
],
test_suite='test',
)
slapos-master-software-erp5-test/software/erp5/test/test/ 0000775 0000000 0000000 00000000000 14601336532 0023774 5 ustar 00root root 0000000 0000000 slapos-master-software-erp5-test/software/erp5/test/test/__init__.py 0000664 0000000 0000000 00000032702 14601336532 0026111 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2022 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import hashlib
import itertools
import json
import os
import shutil
import subprocess
import sys
import tempfile
import time
import urllib
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from slapos.testing.testcase import ManagedResource, makeModuleSetUpAndTestCaseClass
from slapos.testing.utils import findFreeTCPPort
_setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass(
os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', '..', 'software.cfg')))
setup_module_executed = False
def setUpModule():
# slapos.testing.testcase's only need to be executed once
global setup_module_executed
if not setup_module_executed:
_setUpModule()
setup_module_executed = True
# Metaclass to parameterize our tests.
# This is a rough adaption of the parameterized package:
# https://github.com/wolever/parameterized
# Consult following note for rationale why we don't use parameterized:
# https://lab.nexedi.com/nexedi/slapos/merge_requests/1306
class ERP5InstanceTestMeta(type):
"""Adjust ERP5InstanceTestCase instances to be run in several flavours (e.g. NEO/ZEO)
Adjustments can be declared via setting the '__test_matrix__' attribute
of a test case.
A test matrix is a dict which maps the flavoured class name suffix to
a tuple of parameters.
A parameter is a function which receives the instance_parameter_dict
and modifies it in place (therefore no return value is needed).
You can use the 'matrix' helper function to construct a test matrix.
If .__test_matrix__ is 'None' the test case is ignored.
If the test case should be run without any adaptions, you can set
.__test_matrix__ to 'matrix((default,))'.
"""
def __new__(cls, name, bases, attrs):
base_class = super().__new__(cls, name, bases, attrs)
if base_class._isParameterized():
cls._parameterize(base_class)
return base_class
# _isParameterized tells whether class is parameterized.
# All classes with 'metaclass=ERP5InstanceTestMeta' are parameterized
# except from a class which has been automatically instantiated from
# such user class. This exception prevents infinite recursion due to
# a parameterized class which tries to parameterize itself again.
def _isParameterized(self):
return not getattr(self, '.created_by_parametrize', False)
# Create multiple test classes from single definition.
@classmethod
def _parameterize(cls, base_class):
mod_dict = sys.modules[base_class.__module__].__dict__
for class_name_suffix, parameter_tuple in (base_class.__test_matrix__ or {}).items():
parameterized_cls_dict = dict(
base_class.__dict__,
**{
# Avoid infinite loop by a parameterized class which
# parameterize itself again and again and..
".created_by_parametrize": True,
# Switch
#
# .getInstanceParameterDict to ._test_getInstanceParameterDict
# ._base_getInstanceParameterDict to .getInstanceParameterDict
#
# so that we could inject base implementation to be called above
# user-defined getInstanceParameterDict.
"_test_getInstanceParameterDict": base_class.getInstanceParameterDict,
"getInstanceParameterDict": cls._getParameterizedInstanceParameterDict(parameter_tuple)
}
)
name = f"{base_class.__name__}_{class_name_suffix}"
mod_dict[name] = type(name, (base_class,), parameterized_cls_dict)
# _getParameterizedInstanceParameterDict returns a modified version of
# a test cases original 'getInstanceParameterDict'. The modified version
# applies parameters on the default instance parameters.
@staticmethod
def _getParameterizedInstanceParameterDict(parameter_tuple):
@classmethod
def getInstanceParameterDict(cls):
instance_parameter_dict = json.loads(
cls._test_getInstanceParameterDict().get("_", r"{}")
)
[p(instance_parameter_dict) for p in parameter_tuple]
return {"_": json.dumps(instance_parameter_dict)}
return getInstanceParameterDict
# Hide tests in unpatched base class: It doesn't make sense to run tests
# in original class, because parameters have not been assigned yet.
#
# We can't simply call 'delattr', because this wouldn't remove
# inherited tests. Overriding dir is sufficient, because this is
# the way how unittest discovers tests:
# https://github.com/python/cpython/blob/3.11/Lib/unittest/loader.py#L237
def __dir__(self):
if self._isParameterized():
return [attr for attr in super().__dir__() if not attr.startswith('test')]
return super().__dir__()
def matrix(*parameter_tuple):
"""matrix creates a mapping of test_name -> parameter_tuple.
Each provided parameter_tuple won't be combined within itself,
but with any other provided parameter_tuple, for instance
>>> parameter_tuple0 = (param0, param1)
>>> parameter_tuple1 = (param2, param3)
>>> matrix(parameter_tuple0, parameter_tuple1)
will return all options of (param0 | param1) & (param2 | param3):
- param0_param2
- param0_param3
- param1_param2
- param1_param3
"""
return {
"_".join([p.__name__ for p in params]): params
for params in itertools.product(*parameter_tuple)
}
# Define parameters (function which receives instance params + modifies them).
#
# default runs tests without any adaption
def default(instance_parameter_dict): ...
def zeo(instance_parameter_dict):
instance_parameter_dict['zodb'] = [{"type": "zeo", "server": {}}]
def neo(instance_parameter_dict):
# We don't provide encryption certificates in test runs for the sake
# of simplicity. By default SSL is turned on, we need to explicitly
# deactivate it:
# https://lab.nexedi.com/nexedi/slapos/blob/a8150a1ac/software/neoppod/instance-neo-input-schema.json#L61-65
instance_parameter_dict['zodb'] = [{"type": "neo", "server": {"ssl": False}}]
class ERP5InstanceTestCase(SlapOSInstanceTestCase, metaclass=ERP5InstanceTestMeta):
"""ERP5 base test case
"""
__test_matrix__ = matrix((zeo, neo)) # switch between NEO and ZEO mode
@classmethod
def isNEO(cls):
return '_neo' in cls.__name__
@classmethod
def getRootPartitionConnectionParameterDict(cls):
"""Return the output parameters from the root partition"""
return json.loads(
cls.computer_partition.getConnectionParameterDict()['_'])
@classmethod
def getComputerPartition(cls, partition_reference):
for computer_partition in cls.slap.computer.getComputerPartitionList():
if partition_reference == computer_partition.getInstanceParameter(
'instance_title'):
return computer_partition
@classmethod
def getComputerPartitionPath(cls, partition_reference):
partition_id = cls.getComputerPartition(partition_reference).getId()
return os.path.join(cls.slap._instance_root, partition_id)
class CaucaseService(ManagedResource):
"""A caucase service.
"""
url: str = None
directory: str = None
_caucased_process: subprocess.Popen = None
def open(self) -> None:
# start a caucased and server certificate.
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
caucased_path = os.path.join(software_release_root_path, 'bin', 'caucased')
self.directory = tempfile.mkdtemp()
caucased_dir = os.path.join(self.directory, 'caucased')
os.mkdir(caucased_dir)
os.mkdir(os.path.join(caucased_dir, 'user'))
os.mkdir(os.path.join(caucased_dir, 'service'))
backend_caucased_netloc = f'{self._cls._ipv4_address}:{findFreeTCPPort(self._cls._ipv4_address)}'
self.url = 'http://' + backend_caucased_netloc
self._caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(caucased_dir, 'server.key.pem'),
'--netloc', backend_caucased_netloc,
'--service-auto-approve-count', '1',
],
# capture subprocess output not to pollute test's own stdout
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(30):
try:
if requests.get(self.url).status_code == 200:
break
except Exception:
pass
time.sleep(1)
else:
raise RuntimeError('caucased failed to start.')
def close(self) -> None:
self._caucased_process.terminate()
self._caucased_process.wait()
self._caucased_process.stdout.close()
shutil.rmtree(self.directory)
@property
def ca_crt_path(self) -> str:
"""Path of the CA certificate from this caucase.
"""
ca_crt_path = os.path.join(self.directory, 'ca.crt.pem')
if not os.path.exists(ca_crt_path):
with open(ca_crt_path, 'w') as f:
f.write(
requests.get(urllib.parse.urljoin(
self.url,
'/cas/crt/ca.crt.pem',
)).text)
return ca_crt_path
class CaucaseCertificate(ManagedResource):
"""A certificate signed by a caucase service.
"""
ca_crt_file: str = None
crl_file: str = None
csr_file: str = None
cert_file: str = None
key_file: str = None
def open(self) -> None:
self.tmpdir = tempfile.mkdtemp()
self.ca_crt_file = os.path.join(self.tmpdir, 'ca-crt.pem')
self.crl_file = os.path.join(self.tmpdir, 'ca-crl.pem')
self.csr_file = os.path.join(self.tmpdir, 'csr.pem')
self.cert_file = os.path.join(self.tmpdir, 'crt.pem')
self.key_file = os.path.join(self.tmpdir, 'key.pem')
def close(self) -> None:
shutil.rmtree(self.tmpdir)
@property
def _caucase_path(self) -> str:
"""path of caucase executable.
"""
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
return os.path.join(software_release_root_path, 'bin', 'caucase')
def request(self, common_name: str, caucase: CaucaseService, san: x509.SubjectAlternativeName=None) -> None:
"""Generate certificate and request signature to the caucase service.
This overwrite any previously requested certificate for this instance.
"""
cas_args = [
self._caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
]
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
with open(self.key_file, 'wb') as f:
f.write(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(
NameOID.COMMON_NAME,
common_name,
),
]))
if san:
csr = csr.add_extension(san, critical=True)
csr = csr.sign(key, hashes.SHA256(), default_backend())
with open(self.csr_file, 'wb') as f:
f.write(csr.public_bytes(serialization.Encoding.PEM))
csr_id = subprocess.check_output(
cas_args + [
'--send-csr', self.csr_file,
],
).split()[0].decode()
assert csr_id
for _ in range(30):
if not subprocess.call(
cas_args + [
'--get-crt', csr_id, self.cert_file,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) == 0:
break
else:
time.sleep(1)
else:
raise RuntimeError('getting service certificate failed.')
with open(self.cert_file) as cert_file:
assert 'BEGIN CERTIFICATE' in cert_file.read()
def revoke(self, caucase: CaucaseService) -> None:
"""Revoke the client certificate on this caucase instance.
"""
subprocess.check_call([
self._caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
'--revoke-crt', self.cert_file, self.key_file,
])
slapos-master-software-erp5-test/software/erp5/test/test/benchmarks.py 0000664 0000000 0000000 00000023100 14601336532 0026457 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2022 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import contextlib
import datetime
import json
import pathlib
import socket
import struct
import subprocess
import typing
import urllib.parse
import psutil
import requests
from . import ERP5InstanceTestCase, default, matrix, setUpModule
from .test_erp5 import ZopeSkinsMixin
class TestOrderBuildPackingListSimulation(
ZopeSkinsMixin,
ERP5InstanceTestCase,
):
"""Create orders and build packing lists.
"""
__partition_reference__ = 's'
__test_matrix__ = matrix((default, ))
_start: datetime.datetime
_previous: datetime.datetime
@classmethod
def getInstanceParameterDict(cls) -> dict:
return {
'_':
json.dumps(
{
"bt5":
" ".join(
[
"erp5_full_text_mroonga_catalog",
"erp5_configurator_standard",
"erp5_scalability_test",
]),
"mariadb": {
# We use a large innodb-buffer-pool-size because the simulation
# select method used for sale packing list does not use index and
# cause slow queries
"innodb-buffer-pool-size": 32 * 1024 * 1024 * 1024, # 32Go
},
"zope-partition-dict": {
"activities": {
"instance-count": 32,
"family": "activities",
"thread-amount": 2,
"port-base": 2300
},
"default": {
"instance-count": 1,
"family": "default",
"port-base": 2200
},
},
})
}
@classmethod
def _setUpClass(cls) -> None:
super()._setUpClass()
cls.zope_base_url = cls._getAuthenticatedZopeUrl('')
cls.create_sale_order_batch_url = urllib.parse.urljoin(
cls.zope_base_url, 'ERP5Site_createScalabilityTestSaleOrderBatch')
def setUp(self) -> None:
super().setUp()
self.measurement_file = open(f'measures{self.id()}.jsonl', 'w')
self.addCleanup(self.measurement_file.close)
# Describe the software used. TODO: use nxd-bom once integrated
self.write_measurement(
{
'type': 'sbom',
# content of runwsgi script, to know which versions of python packages were used
'runwsgi-content':
(pathlib.Path(
self.computer_partition_root_path
) / 'software_release' / 'bin' / 'runwsgi').read_text(),
'mysql-show-variables':
subprocess.check_output((
pathlib.Path(self.getComputerPartitionPath('mariadb')) / 'bin' / 'mysql',
'-e', 'show variables'), text=True),
'erp5-git-describe':
subprocess.check_output(
('git', 'describe', '--long'),
cwd=pathlib.Path(self.computer_partition_root_path) / 'software_release' / 'parts' / 'erp5',
text=True),
'erp5-git-diff':
subprocess.check_output(
('git', 'diff'),
cwd=pathlib.Path(self.computer_partition_root_path) / 'software_release' / 'parts' / 'erp5',
text=True),
'slapos-software-release-git-describe':
subprocess.check_output(
('git', 'describe', '--long'),
cwd=pathlib.Path(self.getSoftwareURL()).parent,
text=True),
'slapos-software-release-git-diff':
subprocess.check_output(
('git', 'diff'),
cwd=pathlib.Path(self.getSoftwareURL()).parent,
text=True),
})
def write_measurement(
self, measurement: dict[str, typing.Union[str, float]]) -> None:
json.dump(
measurement,
self.measurement_file,
)
self.measurement_file.write('\n')
self.measurement_file.flush()
def take_measurements(self, step: str) -> None:
# Time for this iteration
now = datetime.datetime.now()
elapsed = now - self._previous
self._previous = now
# Memory usage of all zopes
with self.slap.instance_supervisor_rpc as supervisor:
zope_memory_info_list = [
psutil.Process(process['pid']).memory_info()
for process in supervisor.getAllProcessInfo()
if process['name'].startswith('zope-') and process['pid']
]
zope_total_rss = sum(mem.rss for mem in zope_memory_info_list)
zope_count = len(zope_memory_info_list)
# Database size
if self.isNEO():
root_fs_size = zeo_root_stats = 'N/A'
else:
root_fs = pathlib.Path(
self.getComputerPartitionPath('zodb')) / 'srv' / 'zodb' / 'root.fs'
root_fs_size = root_fs.stat().st_size
# ZEO stats ( using ruok protocol https://github.com/zopefoundation/ZEO/commit/d5082536 )
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.connect((self._ipv4_address, 2100))
s.sendall(b'\x00\x00\x00\x04ruok')
_ = s.recv(struct.unpack(">I", s.recv(4))[0])
zeo_stats = json.loads(s.recv(struct.unpack(">I", s.recv(4))[0]))
# we are supposed to have only one storage with name "root"
zeo_root_stats = zeo_stats.pop('root')
assert not zeo_stats
self.logger.info(
"Measurements for %s (after %s): "
"elapsed=%s zope_total_rss=%s / %s root_fs_size=%s",
step,
now - self._start,
elapsed,
zope_total_rss,
zope_count,
root_fs_size,
)
self.write_measurement(
{
'step': step,
'step_duration_seconds': elapsed.total_seconds(),
'step_duration': str(elapsed),
'zope_total_rss': zope_total_rss,
'zope_count': zope_count,
'root_fs_size': root_fs_size,
'zeo_stats': zeo_root_stats,
'now': str(now),
})
def test(self) -> None:
self._start = self._previous = datetime.datetime.now()
with requests.Session() as session:
ret = session.get(
urllib.parse.urljoin(
self.zope_base_url, 'ERP5Site_bootstrapScalabilityTest'),
verify=False,
params={'user_quantity:int': 1})
if not ret.ok:
self.logger.error(ret.text)
if self._debug:
breakpoint()
ret.raise_for_status()
self._waitForActivities(
timeout=datetime.timedelta(hours=2).total_seconds())
# XXX default reference generator for sale packing list cause
# many conflict errors, disable it.
self._addPythonScript(
script_id='Delivery_generateReference',
params='*args, **kw',
body='context.setReference("no reference for benchmark")',
)
self.take_measurements("setup")
# XXX now that we have installed business templates,
# restart all zopes to workaround a bug with accessors not
# working after some time (packing_list_line.getStartDate no longer
# acquire from parent's sale packing list)
with self.slap.instance_supervisor_rpc as supervisor:
supervisor.stopAllProcesses()
supervisor.startAllProcesses()
self.slap.waitForInstance()
self.take_measurements("restart")
with requests.Session() as session:
for i in range(100):
for j in range(5):
ret = session.get(
self.create_sale_order_batch_url,
verify=False,
params={
'random_seed': f'{i}.{j}',
'order_count:int': '50',
},
)
if not ret.ok:
self.logger.error(ret.text)
if self._debug:
breakpoint()
ret.raise_for_status()
self._waitForActivities(
timeout=datetime.timedelta(hours=2).total_seconds())
self.take_measurements(f"iteration_{i+1:03}")
# final measurements, take a "zodb analyze" snapshot
if not self.isNEO():
zodb_cmd = pathlib.Path(
self.computer_partition_root_path
) / 'software_release' / 'bin' / 'zodb'
root_fs = pathlib.Path(
self.getComputerPartitionPath('zodb')) / 'srv' / 'zodb' / 'root.fs'
self.write_measurement(
{
'zodb analyze':
subprocess.check_output((zodb_cmd, 'analyze', root_fs), text=True)
})
# and a pt-query-digest for slow log
pt_query_digest = pathlib.Path(
self.getComputerPartitionPath(
'mariadb')) / 'bin' / 'pt-query-digest'
mariadb_slowquery_log = pathlib.Path(
self.getComputerPartitionPath(
'mariadb')) / 'var' / 'log' / 'mariadb_slowquery.log'
self.write_measurement(
{
'pt-query-digest':
subprocess.check_output(
(pt_query_digest, mariadb_slowquery_log), text=True)
})
slapos-master-software-erp5-test/software/erp5/test/test/test_balancer.py 0000664 0000000 0000000 00000121175 14601336532 0027163 0 ustar 00root root 0000000 0000000 import glob
import ipaddress
import json
import logging
import os
import re
import socket
import subprocess
import sqlite3
import tempfile
import time
import urllib.parse
from http.server import BaseHTTPRequestHandler
from unittest import mock
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
import OpenSSL.SSL
import pexpect
import psutil
import requests
from slapos.proxy.db_version import DB_VERSION
from slapos.testing.utils import CrontabMixin, ManagedHTTPServer
from . import CaucaseCertificate, CaucaseService, ERP5InstanceTestCase, default, matrix, setUpModule
setUpModule # pyflakes
class EchoHTTPServer(ManagedHTTPServer):
"""An HTTP Server responding with the request path and incoming headers,
encoded in json.
"""
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self) -> None:
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = json.dumps(
{
'Path': self.path,
'Incoming Headers': dict(self.headers.items()),
},
indent=2,
).encode('utf-8')
self.end_headers()
self.wfile.write(response)
log_message = logging.getLogger(__name__ + '.EchoHTTPServer').info
class EchoHTTP11Server(ManagedHTTPServer):
"""An HTTP/1.1 Server responding with the request path and incoming headers,
encoded in json.
"""
class RequestHandler(BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
def do_GET(self) -> None:
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = json.dumps(
{
'Path': self.path,
'Incoming Headers': dict(self.headers.items()),
},
indent=2,
).encode('utf-8')
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
log_message = logging.getLogger(__name__ + '.EchoHTTP11Server').info
class BalancerTestCase(ERP5InstanceTestCase):
# We explicitly specify 'balancer' as our software type here,
# therefore we don't request ZODB. We therefore don't
# need to run these tests with both NEO and ZEO mode,
# it wouldn't make any difference.
# https://lab.nexedi.com/nexedi/slapos/blob/273037c8/stack/erp5/instance.cfg.in#L216-230
__test_matrix__ = matrix((default,))
@classmethod
def getInstanceSoftwareType(cls):
return 'balancer'
@classmethod
def _getInstanceParameterDict(cls) -> dict:
return {
'tcpv4-port': 8000,
'computer-memory-percent-threshold': 100,
# XXX what is this ? should probably not be needed here
'name': cls.__name__,
'monitor-passwd': 'secret',
'apachedex-configuration': [
'--logformat', '%h %l %u %t "%r" %>s %O "%{Referer}i" "%{User-Agent}i" %{ms}T',
'--erp5-base', '+erp5', '.*/VirtualHostRoot/erp5(/|\\?|$)',
'--base', '+other', '/',
'--skip-user-agent', 'Zabbix',
'--error-detail',
'--js-embed',
'--quiet',
],
'apachedex-promise-threshold': 100,
'haproxy-server-check-path': '/',
'zope-family-dict': {
'default': ['dummy_http_server'],
},
'dummy_http_server': [[cls.getManagedResource("backend_web_server", EchoHTTPServer).netloc, 1, False]],
'ssl-authentication-dict': {'default': False},
'ssl': {},
'timeout-dict': {'default': None},
'frontend-parameter-dict': {
'default': {
'internal-path': '',
'zope-family': 'default',
},
},
'family-path-routing-dict': {},
'path-routing-list': [],
}
@classmethod
def getInstanceParameterDict(cls) -> dict:
return {'_': json.dumps(cls._getInstanceParameterDict())}
def setUp(self) -> None:
self.default_balancer_direct_url = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])['default']
self.default_balancer_zope_url = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])['url-backend-default']
class TestURLRewrite(BalancerTestCase):
__partition_reference__ = 'ur'
def test_direct(self):
self.assertEqual(requests.get(self.default_balancer_direct_url, verify=False).json()['Path'], '/')
self.assertEqual(
requests.get(
urllib.parse.urljoin(
self.default_balancer_direct_url,
'/VirtualHostBase/https/example.com:443/VirtualHostRoot/path'),
verify=False
).json()['Path'],
'/VirtualHostBase/https/example.com:443/VirtualHostRoot/path')
def test_zope(self):
netloc = urllib.parse.urlparse(self.default_balancer_zope_url).netloc
self.assertEqual(
requests.get(self.default_balancer_zope_url, verify=False).json()['Path'],
f'/VirtualHostBase/https/{netloc}/VirtualHostRoot/')
self.assertEqual(
requests.get(urllib.parse.urljoin(
self.default_balancer_zope_url, 'path'), verify=False).json()['Path'],
f'/VirtualHostBase/https/{netloc}/VirtualHostRoot/path')
self.assertEqual(
requests.get(
urllib.parse.urljoin(
self.default_balancer_zope_url,
'/VirtualHostBase/https/example.com:443/VirtualHostRoot/path'),
verify=False
).json()['Path'],
f'/VirtualHostBase/https/{netloc}/VirtualHostRoot/VirtualHostBase/https/example.com:443/VirtualHostRoot/path')
def test_bad_host(self):
self.assertEqual(
requests.get(self.default_balancer_zope_url, headers={'Host': 'a/b'}, verify=False).status_code,
requests.codes.bad_request)
class SlowHTTPServer(ManagedHTTPServer):
"""An HTTP Server which reply after a timeout.
Timeout is 2 seconds by default, and can be specified in the path of the URL:
GET /{timeout}
but because balancer rewrites the URL, the actual URL used by this server is:
GET /VirtualHostBase/https/{host}/VirtualHostRoot/{timeout}
"""
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self) -> None:
timeout = 2
if self.path == '/': # for health checks
timeout = 0
try:
timeout = int(self.path.split('/')[5])
except (ValueError, IndexError):
pass
self.send_response(200)
self.send_header("Content-Type", "text/plain")
time.sleep(timeout)
self.end_headers()
self.wfile.write(b"OK\n")
log_message = logging.getLogger(__name__ + '.SlowHTTPServer').info
class TestTimeout(BalancerTestCase, CrontabMixin):
__partition_reference__ = 't'
@classmethod
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
# use a slow server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("slow_web_server", SlowHTTPServer).netloc, 1, False]]
# and set timeout of 1 second
parameter_dict['timeout-dict'] = {'default': 1}
return parameter_dict
def test_timeout(self) -> None:
self.assertEqual(
requests.get(
urllib.parse.urljoin(self.default_balancer_zope_url, '/1'),
verify=False).status_code,
requests.codes.ok)
self.assertEqual(
requests.get(
urllib.parse.urljoin(self.default_balancer_zope_url, '/5'),
verify=False).status_code,
requests.codes.gateway_timeout)
class TestLog(BalancerTestCase, CrontabMixin):
"""Check logs emitted by balancer
"""
__partition_reference__ = 'l'
@classmethod
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
# use a slow server instead, so that we can test logs with slow requests
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("slow_web_server", SlowHTTPServer).netloc, 1, False]]
return parameter_dict
def test_access_log_format(self) -> None:
requests.get(
urllib.parse.urljoin(self.default_balancer_zope_url, '/url_path'),
verify=False,
)
time.sleep(.5) # wait a bit more until access is logged
with open(os.path.join(self.computer_partition_root_path, 'var', 'log', 'apache-access.log')) as access_log_file:
access_line = access_log_file.read().splitlines()[-1]
self.assertIn('/url_path', access_line)
# last \d is the request time in milli seconds, since this SlowHTTPServer
# sleeps for 2 seconds, it should take between 2 and 3 seconds to process
# the request - but our test machines can be slow sometimes, so we tolerate
# it can take up to 20 seconds.
match = re.match(
r'([(\da-fA-F:\.)]+) - - \[(.*?)\] "(.*?)" (\d+) (\d+) "(.*?)" "(.*?)" (\d+)',
access_line
)
self.assertTrue(match)
assert match
request_time = int(match.groups()[-1])
self.assertGreater(request_time, 2 * 1000)
self.assertLess(request_time, 20 * 1000)
def test_access_log_apachedex_report(self) -> None:
# make a request so that we have something in the logs
requests.get(self.default_balancer_zope_url, verify=False)
# crontab for apachedex is executed
self._executeCrontabAtDate('generate-apachedex-report', '23:59')
# it creates a report for the day
apachedex_report, = glob.glob(
os.path.join(
self.computer_partition_root_path,
'srv',
'monitor',
'private',
'apachedex',
'ApacheDex-*.html',
))
with open(apachedex_report) as f:
report_text = f.read()
self.assertIn('APacheDEX', report_text)
# having this table means that apachedex could parse some lines.
self.assertIn('
Hits per status code
', report_text)
def test_access_log_rotation(self) -> None:
# run logrotate a first time so that it create state files
self._executeCrontabAtDate('logrotate', '2000-01-01')
# make a request so that we have something in the logs
requests.get(self.default_balancer_zope_url, verify=False).raise_for_status()
# slow query crontab depends on crontab for log rotation
# to be executed first.
self._executeCrontabAtDate('logrotate', '2050-01-01')
# this logrotate leaves the log for the day as non compressed
rotated_log_file = os.path.join(
self.computer_partition_root_path,
'srv',
'backup',
'logrotate',
'apache-access.log-20500101',
)
self.assertTrue(os.path.exists(rotated_log_file))
requests.get(self.default_balancer_zope_url, verify=False).raise_for_status()
# on next day execution of logrotate, log files are compressed
self._executeCrontabAtDate('logrotate', '2050-01-02')
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
def test_error_log(self) -> None:
# stop backend server
backend_server = self.getManagedResource("slow_web_server", SlowHTTPServer)
self.addCleanup(backend_server.open)
backend_server.close()
# after a while, balancer should detect and log this event in error log
time.sleep(5)
self.assertEqual(
requests.get(self.default_balancer_zope_url, verify=False).status_code,
requests.codes.service_unavailable)
with open(os.path.join(self.computer_partition_root_path, 'var', 'log', 'apache-error.log')) as error_log_file:
error_line = error_log_file.read().splitlines()[-1]
self.assertIn('backend default has no server available!', error_line)
# this log also include a timestamp
self.assertRegex(error_line, r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}')
class BalancerCookieHTTPServer(ManagedHTTPServer):
"""An HTTP Server which can set balancer cookie.
This server set cookie when requested /set-cookie path (actually
/VirtualHostBase/https/{host}/VirtualHostRoot/set-cookie , which is
added by balancer proxy)
The reply body is the name used when registering this resource
using getManagedResource. This way we can assert which
backend replied.
"""
@property
def RequestHandler(self):
server = self
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self) -> None:
self.send_response(200)
self.send_header("Content-Type", "text/plain")
if self.path != '/' and self.path.split('/')[5] == 'set_cookie':
# the balancer tells the backend what's the name of the balancer cookie with
# the X-Balancer-Current-Cookie header.
self.send_header('Set-Cookie', '%s=anything' % self.headers['X-Balancer-Current-Cookie'])
# The name of this cookie is SERVERID
assert self.headers['X-Balancer-Current-Cookie'] == 'SERVERID'
self.end_headers()
self.wfile.write(server._name.encode('utf-8'))
log_message = logging.getLogger(__name__ + '.BalancerCookieHTTPServer').info
return RequestHandler
class TestBalancer(BalancerTestCase):
"""Check balancing capabilities
"""
__partition_reference__ = 'b'
@classmethod
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
# use two backend servers
parameter_dict['dummy_http_server'] = [
[cls.getManagedResource("backend_web_server1", BalancerCookieHTTPServer).netloc, 1, False],
[cls.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).netloc, 1, False],
]
return parameter_dict
def test_balancer_round_robin(self) -> None:
# requests are by default balanced to both servers
self.assertEqual(
{requests.get(self.default_balancer_zope_url, verify=False).text for _ in range(10)},
{'backend_web_server1', 'backend_web_server2'}
)
def test_balancer_server_down(self) -> None:
# if one backend is down, it is excluded from balancer
self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).close()
self.addCleanup(self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).open)
self.assertEqual(
{requests.get(self.default_balancer_zope_url, verify=False).text for _ in range(10)},
{'backend_web_server1',}
)
def test_balancer_set_cookie(self) -> None:
# if backend provides a "SERVERID" cookie, balancer will overwrite it with the
# backend selected by balancing algorithm
self.assertIn(
requests.get(urllib.parse.urljoin(self.default_balancer_zope_url, '/set_cookie'), verify=False).cookies['SERVERID'],
('default-0', 'default-1'),
)
def test_balancer_respects_sticky_cookie(self) -> None:
# if request is made with the sticky cookie, the client stick on one balancer
cookies = dict(SERVERID='default-1')
self.assertEqual(
{requests.get(self.default_balancer_zope_url, verify=False, cookies=cookies).text for _ in range(10)},
{'backend_web_server2',}
)
# if that backend becomes down, requests are balanced to another server
self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).close()
self.addCleanup(self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).open)
self.assertEqual(
requests.get(self.default_balancer_zope_url, verify=False, cookies=cookies).text,
'backend_web_server1')
def test_balancer_stats_socket(self) -> None:
# real time statistics can be obtained by using the stats socket and there
# is a wrapper which makes this a bit easier.
socat_process = subprocess.Popen(
[self.computer_partition_root_path + '/bin/haproxy-socat-stats'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT
)
try:
output, _ = socat_process.communicate(b"show stat\n")
except:
socat_process.kill()
socat_process.wait()
raise
self.assertEqual(socat_process.poll(), 0)
# output is a csv
self.assertIn(b'\ndefault,BACKEND,', output)
class TestTestRunnerEntryPoints(BalancerTestCase):
"""Check balancer has some entries for test runner.
"""
__partition_reference__ = 't'
@classmethod
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
parameter_dict['dummy_http_server-test-runner-address-list'] = [
[
cls.getManagedResource("backend_0", EchoHTTPServer).hostname,
cls.getManagedResource("backend_0", EchoHTTPServer).port,
],
[
cls.getManagedResource("backend_1", EchoHTTPServer).hostname,
cls.getManagedResource("backend_1", EchoHTTPServer).port,
],
[
cls.getManagedResource("backend_2", EchoHTTPServer).hostname,
cls.getManagedResource("backend_2", EchoHTTPServer).port,
],
]
return parameter_dict
def test_use_proper_backend(self) -> None:
# requests are directed to proper backend based on URL path
test_runner_url_list = self.getRootPartitionConnectionParameterDict(
)['default-test-runner-url-list']
url_0, url_1, url_2 = test_runner_url_list
self.assertEqual(
urllib.parse.urlparse(url_0).netloc,
urllib.parse.urlparse(url_1).netloc)
self.assertEqual(
urllib.parse.urlparse(url_0).netloc,
urllib.parse.urlparse(url_2).netloc)
path_0 = '/VirtualHostBase/https/{netloc}/VirtualHostRoot/_vh_unit_test_0/something'.format(
netloc=urllib.parse.urlparse(url_0).netloc)
path_1 = '/VirtualHostBase/https/{netloc}/VirtualHostRoot/_vh_unit_test_1/something'.format(
netloc=urllib.parse.urlparse(url_0).netloc)
path_2 = '/VirtualHostBase/https/{netloc}/VirtualHostRoot/_vh_unit_test_2/something'.format(
netloc=urllib.parse.urlparse(url_0).netloc)
self.assertEqual(
{
requests.get(url_0 + 'something', verify=False).json()['Path']
for _ in range(10)
}, {path_0})
self.assertEqual(
{
requests.get(url_1 + 'something', verify=False).json()['Path']
for _ in range(10)
}, {path_1})
self.assertEqual(
{
requests.get(url_2 + 'something', verify=False).json()['Path']
for _ in range(10)
}, {path_2})
# If a test runner backend is down, others can be accessed.
self.getManagedResource("backend_0", EchoHTTPServer).close()
self.assertEqual(
{
requests.get(url_0 + 'something', verify=False).status_code
for _ in range(5)
}, {503})
self.assertEqual(
{
requests.get(url_1 + 'something', verify=False).json()['Path']
for _ in range(10)
}, {path_1})
class TestHTTP(BalancerTestCase):
"""Check HTTP protocol with a HTTP/1.1 backend
"""
@classmethod
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
# use a HTTP/1.1 server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("HTTP/1.1 Server", EchoHTTP11Server).netloc, 1, False]]
return parameter_dict
__partition_reference__ = 'h'
def test_http_version(self) -> None:
self.assertEqual(
subprocess.check_output([
'curl',
'--silent',
'--show-error',
'--output',
'/dev/null',
'--insecure',
'--write-out',
'%{http_version}',
self.default_balancer_zope_url,
]),
b'2',
)
def test_keep_alive(self) -> None:
# when doing two requests, connection is established only once
with requests.Session() as session:
session.verify = False
# do a first request, which establish a first connection
session.get(self.default_balancer_zope_url).raise_for_status()
# "break" new connection method and check we can make another request
with mock.patch(
"requests.packages.urllib3.connectionpool.HTTPSConnectionPool._new_conn",
) as new_conn:
session.get(self.default_balancer_zope_url).raise_for_status()
new_conn.assert_not_called()
parsed_url = urllib.parse.urlparse(self.default_balancer_zope_url)
# check that we have an open file for the ip connection
self.assertTrue([
c for c in psutil.Process(os.getpid()).connections()
if c.status == 'ESTABLISHED' and c.raddr.ip == parsed_url.hostname
and c.raddr.port == parsed_url.port
])
class TestServerTLSEmbeddedCaucase(BalancerTestCase):
"""Check Server TLS with embedded caucase
"""
__partition_reference__ = 's'
def _getCaucaseCACertificatePath(self) -> str:
"""Returns the path of the caucase certificate on file system.
"""
ca_cert = tempfile.NamedTemporaryFile(
prefix="ca.crt.pem",
mode="w",
delete=False,
)
ca_cert.write(
requests.get(
urllib.parse.urljoin(
self.getRootPartitionConnectionParameterDict()['caucase-http-url'],
'/cas/crt/ca.crt.pem',
)).text)
ca_cert.flush()
self.addCleanup(os.unlink, ca_cert.name)
return ca_cert.name
def _getServerCertificate(self, hostname: str, port: int) -> x509.base.Certificate:
sock = socket.socket(socket.AF_INET6 if ':' in hostname else socket.AF_INET)
sock.connect((hostname, port))
ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
sock_ssl = OpenSSL.SSL.Connection(ctx, sock)
sock_ssl.set_connect_state()
sock_ssl.set_tlsext_host_name(hostname.encode())
sock_ssl.do_handshake()
cert = sock_ssl.get_peer_certificate()
crypto_cert = cert.to_cryptography()
sock_ssl.close()
sock.close()
return crypto_cert
def test_certificate_validates_with_caucase_ca(self) -> None:
requests.get(self.default_balancer_zope_url, verify=self._getCaucaseCACertificatePath())
def test_certificate_renewal(self) -> None:
balancer_parsed_url = urllib.parse.urlparse(self.default_balancer_zope_url)
certificate_before_renewal = self._getServerCertificate(
balancer_parsed_url.hostname,
balancer_parsed_url.port)
# run caucase updater in the future, so that certificate is renewed
caucase_updater, = glob.glob(
os.path.join(
self.computer_partition_root_path,
'etc',
'service',
'caucase-updater-haproxy-certificate-*',
))
process = pexpect.spawnu("faketime +90days " + caucase_updater)
logger = self.logger
class DebugLogFile:
def write(self, msg):
logger.info("output from caucase_updater: %s", msg)
def flush(self):
pass
process.logfile = DebugLogFile()
process.expect("Renewing .*\nNext wake-up.*")
process.terminate()
process.wait()
# wait for server to use new certificate
for _ in range(30):
certificate_after_renewal = self._getServerCertificate(
balancer_parsed_url.hostname,
balancer_parsed_url.port)
if certificate_after_renewal.not_valid_before > certificate_before_renewal.not_valid_before:
break
time.sleep(.5)
self.assertGreater(
certificate_after_renewal.not_valid_before,
certificate_before_renewal.not_valid_before,
)
# requests are served properly after certificate renewal
self.test_certificate_validates_with_caucase_ca()
class TestServerTLSExternalCaucase(TestServerTLSEmbeddedCaucase):
"""Check Server TLS with external caucase
"""
@classmethod
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
parameter_dict['ssl']['caucase-url'] = cls.getManagedResource(
"caucase", CaucaseService).url
return parameter_dict
def test_published_caucase_http_url_parameter(self) -> None:
self.assertEqual(
self.getRootPartitionConnectionParameterDict()['caucase-http-url'],
self.getManagedResource("caucase", CaucaseService).url,
)
class TestServerTLSCSRTemplateParameter(TestServerTLSExternalCaucase):
"""Check Server TLS with a CSR template passed as parameter
"""
@classmethod
def _getInstanceParameterDict(cls) -> dict:
# use a CSR template with this subject, we'll assert that the
# certificate used by haproxy has same subject.
cls.csr_subject = subject = x509.Name(
[x509.NameAttribute(NameOID.COMMON_NAME, cls.__name__)])
# Add all IPs of the computer in SubjectAlternativeName, we don't
# know what will be the IP of the balancer partition.
with sqlite3.connect(cls.slap._proxy_database) as db:
ip_address_list = [
x509.IPAddress(ipaddress.ip_address(r)) for (r, ) in db.execute(
f"SELECT address FROM partition_network{DB_VERSION}").fetchall()
]
assert ip_address_list
csr = x509.CertificateSigningRequestBuilder().subject_name(
subject).add_extension(
x509.SubjectAlternativeName(ip_address_list),
critical=True,
).sign(
rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend(),
),
hashes.SHA256(),
default_backend(),
)
parameter_dict = super()._getInstanceParameterDict()
parameter_dict['ssl']['csr'] = csr.public_bytes(serialization.Encoding.PEM).decode()
return parameter_dict
def test_certificate_validates_with_caucase_ca(self) -> None:
super().test_certificate_validates_with_caucase_ca()
balancer_parsed_url = urllib.parse.urlparse(self.default_balancer_zope_url)
cert = self._getServerCertificate(
balancer_parsed_url.hostname,
balancer_parsed_url.port,
)
self.assertEqual(
cert.subject.rfc4514_string(),
self.csr_subject.rfc4514_string())
class ContentTypeHTTPServer(ManagedHTTPServer):
"""An HTTP/1.1 Server which reply with content type from path.
For example when requested http://host/text/plain it will reply
with Content-Type: text/plain header.
This actually uses a URL like this to support zope style virtual host:
GET /VirtualHostBase/https/{host}/VirtualHostRoot/text/plain
The body is always "OK"
"""
class RequestHandler(BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
def do_GET(self) -> None:
self.send_response(200)
if self.path == '/':
self.send_header("Content-Length", '0')
return self.end_headers()
content_type = '/'.join(self.path.split('/')[5:])
body = b"OK"
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
log_message = logging.getLogger(__name__ + '.ContentTypeHTTPServer').info
class TestContentEncoding(BalancerTestCase):
"""Test how responses are gzip encoded or not depending on content type header.
"""
__partition_reference__ = 'ce'
@classmethod
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
parameter_dict['dummy_http_server'] = [
[cls.getManagedResource("content_type_server", ContentTypeHTTPServer).netloc, 1, False],
]
return parameter_dict
def test_gzip_encoding(self) -> None:
for content_type in (
'text/cache-manifest',
'text/html',
'text/plain',
'text/css',
'application/hal+json',
'application/json',
'application/x-javascript',
'text/xml',
'application/xml',
'application/rss+xml',
'text/javascript',
'application/javascript',
'image/svg+xml',
'application/x-font-ttf',
'application/font-woff',
'application/font-woff2',
'application/x-font-opentype',
'application/wasm',):
resp = requests.get(urllib.parse.urljoin(self.default_balancer_zope_url, content_type), verify=False)
self.assertEqual(resp.headers['Content-Type'], content_type)
self.assertEqual(
resp.headers.get('Content-Encoding'),
'gzip',
'{} uses wrong encoding: {}'.format(content_type, resp.headers.get('Content-Encoding')))
self.assertEqual(resp.text, 'OK')
def test_no_gzip_encoding(self) -> None:
resp = requests.get(urllib.parse.urljoin(self.default_balancer_zope_url, '/image/png'), verify=False)
self.assertNotIn('Content-Encoding', resp.headers)
self.assertEqual(resp.text, 'OK')
class TestFrontendXForwardedFor(BalancerTestCase):
__partition_reference__ = 'xff'
@classmethod
def _getInstanceParameterDict(cls) -> dict:
frontend_caucase = cls.getManagedResource('frontend_caucase', CaucaseService)
certificate = cls.getManagedResource('client_certificate', CaucaseCertificate)
certificate.request('shared frontend', frontend_caucase)
parameter_dict = super()._getInstanceParameterDict()
# add another "-auth" backend, that will have ssl-authentication enabled
parameter_dict['zope-family-dict']['default-auth'] = ['dummy_http_server']
parameter_dict['ssl-authentication-dict'] = {
'default': False,
'default-auth': True,
}
parameter_dict['timeout-dict']['default-auth'] = None
parameter_dict['ssl']['frontend-caucase-url-list'] = [frontend_caucase.url]
return parameter_dict
def test_x_forwarded_for_added_when_verified_connection(self) -> None:
client_certificate = self.getManagedResource('client_certificate', CaucaseCertificate)
for backend in ('default', 'default-auth'):
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])[backend]
result = requests.get(
balancer_url,
headers={'X-Forwarded-For': '1.2.3.4'},
cert=(client_certificate.cert_file, client_certificate.key_file),
verify=False,
).json()
self.assertEqual(result['Incoming Headers'].get('x-forwarded-for', '').split(', ')[0], '1.2.3.4')
def test_x_forwarded_for_stripped_when_no_certificate(self) -> None:
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default']
result = requests.get(
balancer_url,
headers={'X-Forwarded-For': '1.2.3.4'},
verify=False,
).json()
self.assertNotIn('x-fowarded-for', [k.lower() for k in result['Incoming Headers'].keys()])
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default-auth']
with self.assertRaisesRegex(Exception, "certificate required"):
requests.get(
balancer_url,
headers={'X-Forwarded-For': '1.2.3.4'},
verify=False,
)
def test_x_forwarded_for_stripped_when_not_verified_certificate(self) -> None:
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default']
# certificate from an unknown CA
another_unrelated_caucase = self.getManagedResource('another_unrelated_caucase', CaucaseService)
unknown_client_certificate = self.getManagedResource('unknown_client_certificate', CaucaseCertificate)
unknown_client_certificate.request('unknown client certificate', another_unrelated_caucase)
result = requests.get(
balancer_url,
headers={'X-Forwarded-For': '1.2.3.4'},
cert=(unknown_client_certificate.cert_file, unknown_client_certificate.key_file),
verify=False,
).json()
self.assertNotIn('x-fowarded-for', [k.lower() for k in result['Incoming Headers'].keys()])
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default-auth']
with self.assertRaisesRegex(Exception, "unknown ca"):
requests.get(
balancer_url,
headers={'X-Forwarded-For': '1.2.3.4'},
cert=(unknown_client_certificate.cert_file, unknown_client_certificate.key_file),
verify=False,
)
class TestServerTLSProvidedCertificate(BalancerTestCase):
"""Check that certificate and key can be provided as instance parameters.
"""
__partition_reference__ = 's'
@classmethod
def _getInstanceParameterDict(cls) -> dict:
server_caucase = cls.getManagedResource('server_caucase', CaucaseService)
server_certificate = cls.getManagedResource('server_certificate', CaucaseCertificate)
# Add all IPs of the computer in SubjectAlternativeName, we don't
# know what will be the IP of the balancer partition.
with sqlite3.connect(cls.slap._proxy_database) as db:
ip_address_list = [
x509.IPAddress(ipaddress.ip_address(r)) for (r, ) in db.execute(
f"SELECT address FROM partition_network{DB_VERSION}").fetchall()
]
assert ip_address_list
server_certificate.request(
cls.__name__,
server_caucase,
x509.SubjectAlternativeName(ip_address_list))
parameter_dict = super()._getInstanceParameterDict()
with open(server_certificate.cert_file) as f:
parameter_dict['ssl']['cert'] = f.read()
with open(server_certificate.key_file) as f:
parameter_dict['ssl']['key'] = f.read()
return parameter_dict
def test_certificate_validates_with_provided_ca(self) -> None:
server_certificate = self.getManagedResource("server_certificate", CaucaseCertificate)
requests.get(self.default_balancer_zope_url, verify=server_certificate.ca_crt_file)
class TestClientTLS(BalancerTestCase):
__partition_reference__ = 'c'
@classmethod
def _getInstanceParameterDict(cls) -> dict:
frontend_caucase1 = cls.getManagedResource('frontend_caucase1', CaucaseService)
certificate1 = cls.getManagedResource('client_certificate1', CaucaseCertificate)
certificate1.request('client_certificate1', frontend_caucase1)
frontend_caucase2 = cls.getManagedResource('frontend_caucase2', CaucaseService)
certificate2 = cls.getManagedResource('client_certificate2', CaucaseCertificate)
certificate2.request('client_certificate2', frontend_caucase2)
parameter_dict = super()._getInstanceParameterDict()
parameter_dict['ssl-authentication-dict'] = {
'default': True,
}
parameter_dict['ssl']['frontend-caucase-url-list'] = [
frontend_caucase1.url,
frontend_caucase2.url,
]
return parameter_dict
def test_refresh_crl(self) -> None:
logger = self.logger
class DebugLogFile:
def write(self, msg):
logger.info("output from caucase_updater: %s", msg)
def flush(self):
pass
for client_certificate_name, caucase_name in (
('client_certificate1', 'frontend_caucase1'),
('client_certificate2', 'frontend_caucase2'),
):
client_certificate = self.getManagedResource(client_certificate_name,
CaucaseCertificate)
# when client certificate can be authenticated, backend receive the CN of
# the client certificate in "remote-user" header
def _make_request() -> dict:
return requests.get(
self.default_balancer_zope_url,
cert=(client_certificate.cert_file, client_certificate.key_file),
verify=False,
).json()
self.assertEqual(_make_request()['Incoming Headers'].get('remote-user'),
client_certificate_name)
# when certificate is revoked, updater service should update the CRL
# used by balancer from the caucase service used for client certificates
# (ie. the one used by frontend).
caucase = self.getManagedResource(caucase_name, CaucaseService)
client_certificate.revoke(caucase)
# until the CRL is updated, the client certificate is still accepted.
self.assertEqual(_make_request()['Incoming Headers'].get('remote-user'),
client_certificate_name)
# We have two services in charge of updating CRL and CA certificates for
# each frontend CA, plus the one for the balancer's own certificate
caucase_updater_list = glob.glob(
os.path.join(
self.computer_partition_root_path,
'etc',
'service',
'caucase-updater-*',
))
self.assertEqual(len(caucase_updater_list), 3)
# find the one corresponding to this caucase
for caucase_updater_candidate in caucase_updater_list:
with open(caucase_updater_candidate) as f:
if caucase.url in f.read():
caucase_updater = caucase_updater_candidate
break
else:
self.fail("Could not find caucase updater script for %s" % caucase.url)
# simulate running updater service in the future, to confirm that it fetches
# the new CRL and make sure balancer uses that new CRL.
process = pexpect.spawnu("faketime +1day %s" % caucase_updater)
process.logfile = DebugLogFile()
process.expect("Got new CRL.*Next wake-up at.*")
process.terminate()
process.wait()
with self.assertRaisesRegex(Exception, 'certificate revoked'):
_make_request()
class TestPathBasedRouting(BalancerTestCase):
"""Check path-based routing rewrites URLs as expected.
"""
__partition_reference__ = 'pbr'
@classmethod
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
parameter_dict['zope-family-dict'][
'second'
] = parameter_dict['zope-family-dict'][
'default'
]
parameter_dict['timeout-dict']['second'] = None
parameter_dict['ssl-authentication-dict']['second'] = False
# Routing rules outermost slashes mean nothing. They are internally
# stripped and rebuilt in order to correctly represent the request's URL.
parameter_dict['family-path-routing-dict'] = {
'default': [
['foo/bar', 'erp5/boo/far/faz'], # no outermost slashes
['/foo', '/erp5/somewhere'],
['/foo/shadowed', '/foo_shadowed'], # unreachable
['/next', '/erp5/web_site_module/another_next_website'],
],
}
parameter_dict['path-routing-list'] = [
['/next', '/erp5/web_site_module/the_next_website'],
['/next2', '/erp5/web_site_module/the_next2_website'],
['//', '//erp5/web_site_module/123//'], # extraneous slashes
]
return parameter_dict
def test_routing(self) -> None:
published_dict = json.loads(self.computer_partition.getConnectionParameterDict()['_'])
scheme = 'scheme'
netloc = 'example.com:8080'
prefix = '/VirtualHostBase/' + scheme + '//' + urllib.parse.quote(
netloc,
safe='',
)
# For easier reading of test data, visually separating the virtual host
# base from the virtual host root
vhr = '/VirtualHostRoot'
def assertRoutingEqual(family: str, path: str, expected_path: str) -> None:
# sanity check: unlike the rules, this test is sensitive to outermost
# slashes, and paths must be absolute-ish for code simplicity.
assert path.startswith('/')
# Frontend is expected to provide URLs with the following path structure:
# /VirtualHostBase////VirtualHostRoot
# where:
# - scheme is the user-input scheme
# - netloc is the user-input netloc
# - path is the user-input path
# Someday, frontends will instead propagate scheme and netloc via other
# means (likely: HTTP headers), in which case this test and the SR will
# need to be amended to reconstruct Virtual Host urls itself, and this
# test will need to be updated accordingly.
self.assertEqual(
requests.get(
urllib.parse.urljoin(published_dict[family], prefix + vhr + path),
verify=False,
).json()['Path'],
expected_path,
)
# Trailing slash presence is preserved.
assertRoutingEqual('default', '/foo/bar', prefix + '/erp5/boo/far/faz' + vhr + '/_vh_foo/_vh_bar')
assertRoutingEqual('default', '/foo/bar/', prefix + '/erp5/boo/far/faz' + vhr + '/_vh_foo/_vh_bar/')
# Subpaths are preserved.
assertRoutingEqual('default', '/foo/bar/hey', prefix + '/erp5/boo/far/faz' + vhr + '/_vh_foo/_vh_bar/hey')
# Rule precedence: later less-specific rules are applied.
assertRoutingEqual('default', '/foo', prefix + '/erp5/somewhere' + vhr + '/_vh_foo')
assertRoutingEqual('default', '/foo/', prefix + '/erp5/somewhere' + vhr + '/_vh_foo/')
assertRoutingEqual('default', '/foo/baz', prefix + '/erp5/somewhere' + vhr + '/_vh_foo/baz')
# Rule precedence: later more-specific rules are meaningless.
assertRoutingEqual('default', '/foo/shadowed', prefix + '/erp5/somewhere' + vhr + '/_vh_foo/shadowed')
# Rule precedence: family rules applied before general rules.
assertRoutingEqual('default', '/next', prefix + '/erp5/web_site_module/another_next_website' + vhr + '/_vh_next')
# Fallback on general rules when no family-specific rule matches
# Note: the root is special in that there is always a trailing slash in the
# produced URL.
assertRoutingEqual('default', '/', prefix + '/erp5/web_site_module/123' + vhr + '/')
# Rule-less family reach general rules.
assertRoutingEqual('second', '/foo/bar', prefix + '/erp5/web_site_module/123' + vhr + '/foo/bar') # Rules match whole-elements, so the rule order does not matter to
# elements which share a common prefix.
assertRoutingEqual('second', '/next', prefix + '/erp5/web_site_module/the_next_website' + vhr + '/_vh_next')
assertRoutingEqual('second', '/next2', prefix + '/erp5/web_site_module/the_next2_website' + vhr + '/_vh_next2')
slapos-master-software-erp5-test/software/erp5/test/test/test_erp5.py 0000664 0000000 0000000 00000146026 14601336532 0026271 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2022 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import contextlib
import datetime
import glob
import http.client
import json
import os
import pathlib
import resource
import shutil
import socket
import sqlite3
import ssl
import subprocess
import tempfile
import time
import unittest
import urllib.parse
import xmlrpc.client
import psutil
import requests
import urllib3
from slapos.testing.utils import CrontabMixin
import zc.buildout.configparser
from . import CaucaseService, ERP5InstanceTestCase, default, matrix, neo, setUpModule
setUpModule # pyflakes
class TestPublishedURLIsReachableMixin:
"""Mixin that checks that default page of ERP5 is reachable.
"""
@contextlib.contextmanager
def requestSession(self, base_url):
# What happens is that instantiation just create the services, but does not
# wait for ERP5 to be initialized. When this test run ERP5 instance is
# instantiated, but zope is still busy creating the site and haproxy replies
# with 503 Service Unavailable when zope is not started yet, with 404 when
# erp5 site is not created, with 500 when mysql is not yet reachable, so we
# configure this requests session to retry.
# XXX we should probably add a promise instead
with requests.Session() as session:
session.mount(
base_url,
requests.adapters.HTTPAdapter(
max_retries=urllib3.util.retry.Retry(
total=20,
backoff_factor=.5,
status_forcelist=(404, 500, 503))))
yield session
def _checkERP5IsReachableWithVirtualHost(self, url, verify):
with self.requestSession(urllib.parse.urljoin(url, '/')) as session:
r = session.get(url, verify=verify, allow_redirects=True)
# access on / are redirected to login form
self.assertTrue(r.url.endswith('/login_form'))
self.assertEqual(r.status_code, requests.codes.ok)
self.assertIn("ERP5", r.text)
# host header is used in redirected URL. The URL is always https
r = session.get(url, verify=verify, allow_redirects=False, headers={'Host': 'www.example.com'})
self.assertEqual(r.headers.get('Location'), 'https://www.example.com/login_form')
r = session.get(url, verify=verify, allow_redirects=False, headers={'Host': 'www.example.com:1234'})
self.assertEqual(r.headers.get('Location'), 'https://www.example.com:1234/login_form')
def _checkERP5IsReachableWithoutVirtualHost(self, base_url, site_id, verify):
# We access ERP5 trough a "virtual host", which should make
# ERP5 produce URLs using https://virtual-host-name:1234/virtual_host_root
# as base.
virtual_host_url = urllib.parse.urljoin(
base_url,
'/VirtualHostBase/https/virtual-host-name:1234/{}/VirtualHostRoot/_vh_virtual_host_root/'
.format(site_id))
with self.requestSession(base_url) as session:
r = session.get(virtual_host_url, verify=verify, allow_redirects=False)
self.assertEqual(r.status_code, requests.codes.found)
# access on / are redirected to login form, with virtual host preserved
self.assertEqual(r.headers.get('location'), 'https://virtual-host-name:1234/virtual_host_root/login_form')
# login page can be rendered and contain the text "ERP5"
r = session.get(
urllib.parse.urljoin(base_url, f'{site_id}/login_form'),
verify=verify,
allow_redirects=False,
)
self.assertEqual(r.status_code, requests.codes.ok)
self.assertIn("ERP5", r.text)
def _getCaucaseServiceCACertificate(self):
ca_cert = tempfile.NamedTemporaryFile(
prefix="ca.crt.pem",
mode="w",
delete=False,
)
ca_cert.write(
requests.get(
urllib.parse.urljoin(
self.getRootPartitionConnectionParameterDict()['caucase-http-url'],
'/cas/crt/ca.crt.pem',
)).text)
ca_cert.flush()
self.addCleanup(os.unlink, ca_cert.name)
return ca_cert.name
def test_published_family_default_v6_is_reachable(self):
"""Tests the IPv6 URL published by the root partition is reachable.
"""
param_dict = self.getRootPartitionConnectionParameterDict()
self._checkERP5IsReachableWithoutVirtualHost(
param_dict['family-default-v6'],
param_dict['site-id'],
self._getCaucaseServiceCACertificate(),
)
def test_published_family_default_v4_is_reachable(self):
"""Tests the IPv4 URL published by the root partition is reachable.
"""
param_dict = self.getRootPartitionConnectionParameterDict()
self._checkERP5IsReachableWithoutVirtualHost(
param_dict['family-default'],
param_dict['site-id'],
self._getCaucaseServiceCACertificate(),
)
def test_published_frontend_default_is_reachable(self):
"""Tests the frontend URL published by the root partition is reachable.
"""
param_dict = self.getRootPartitionConnectionParameterDict()
self._checkERP5IsReachableWithVirtualHost(
param_dict['url-frontend-default'],
self._getCaucaseServiceCACertificate(),
)
class TestDefaultParameters(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test ERP5 can be instantiated with no parameters
"""
__partition_reference__ = 'defp'
__test_matrix__ = matrix((default,))
def test_frontend_request(self):
with open(os.path.join(self.computer_partition_root_path,
'.installed-switch-softwaretype.cfg')) as f:
installed = zc.buildout.configparser.parse(f, 'installed')
self.assertEqual(
installed['request-frontend-default']['config-type'], '')
self.assertNotIn('config-path', installed['request-frontend-default'])
self.assertEqual(
installed['request-frontend-default']['config-authenticate-to-backend'], 'true')
self.assertEqual(installed['request-frontend-default']['shared'], 'true')
self.assertEqual(
installed['request-frontend-default']['name'], 'frontend-default')
self.assertEqual(
installed['request-frontend-default']['software-url'],
'http://git.erp5.org/gitweb/slapos.git/blob_plain/HEAD:/software/apache-frontend/software.cfg'
)
self.assertEqual(
installed['request-frontend-default']['connection-secure_access'],
self.getRootPartitionConnectionParameterDict()['url-frontend-default'])
class TestExternalCaucase(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test providing the URL of an external caucase in parameters.
"""
__partition_reference__ = 'ec'
@classmethod
def getInstanceParameterDict(cls) -> dict:
caucase_url = cls.getManagedResource("caucase", CaucaseService).url
return {'_': json.dumps({'caucase': {'url': caucase_url}})}
def test_published_caucase_http_url_parameter(self) -> None:
self.assertEqual(
self.getRootPartitionConnectionParameterDict()['caucase-http-url'],
self.getManagedResource("caucase", CaucaseService).url,
)
class TestReinstantiateWithExternalCaucase(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test providing the URL of an external caucase in parameters after
the initial instantiation.
"""
__partition_reference__ = 'sc'
def test_switch_to_external_caucase(self) -> None:
# this also waits that ERP5 is fully ready
self.test_published_frontend_default_is_reachable()
external_caucase_url = self.getManagedResource("caucase", CaucaseService).url
partition_parameter_kw = {
'_':
json.dumps(
dict(
json.loads(self.getInstanceParameterDict()['_']),
caucase={'url': external_caucase_url}))
}
def rerequest():
return self.slap.request(
software_release=self.getSoftwareURL(),
software_type=self.getInstanceSoftwareType(),
partition_reference=self.default_partition_reference,
partition_parameter_kw=partition_parameter_kw,
state='started')
rerequest()
self.slap.waitForInstance(max_retry=10)
self.assertEqual(
json.loads(rerequest().getConnectionParameterDict()['_'])['caucase-http-url'],
external_caucase_url)
with tempfile.NamedTemporaryFile(mode="w") as ca_cert:
ca_cert.write(
requests.get(
urllib.parse.urljoin(
external_caucase_url,
'/cas/crt/ca.crt.pem',
)).text)
ca_cert.flush()
requests.get(
self.getRootPartitionConnectionParameterDict()['url-frontend-default'],
verify=ca_cert.name).raise_for_status()
class TestJupyter(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test ERP5 Jupyter notebook
"""
__partition_reference__ = 'jupyter'
@classmethod
def getInstanceParameterDict(cls):
return {'_': json.dumps({'jupyter': {'enable': True}})}
def test_jupyter_notebook_is_reachable(self):
param_dict = self.getRootPartitionConnectionParameterDict()
self.assertEqual(
'https://[%s]:8888/tree' % self.getPartitionIPv6(self.getPartitionId("jupyter")),
param_dict['jupyter-url']
)
result = requests.get(
param_dict['jupyter-url'], verify=False, allow_redirects=False)
self.assertEqual(
[requests.codes.found, True, '/login?next=%2Ftree'],
[result.status_code, result.is_redirect, result.headers['Location']]
)
class TestBalancerPorts(ERP5InstanceTestCase):
"""Instantiate with two zope families, this should create for each family:
- a balancer entry point with corresponding haproxy
- a balancer entry point for test runner
and no frontend at all, because more than one family exist.
"""
__partition_reference__ = 'ap'
@classmethod
def getInstanceParameterDict(cls):
return {
'_':
json.dumps({
"zope-partition-dict": {
"family1": {
"instance-count": 3,
"family": "family1"
},
"family2": {
"instance-count": 5,
"family": "family2"
},
},
})
}
def checkValidHTTPSURL(self, url):
parsed = urllib.parse.urlparse(url)
self.assertEqual(parsed.scheme, 'https')
self.assertTrue(parsed.hostname)
self.assertTrue(parsed.port)
def test_published_family_parameters(self):
# when we request two families, we have two published family-{family_name} URLs
param_dict = self.getRootPartitionConnectionParameterDict()
for family_name in ('family1', 'family2'):
self.checkValidHTTPSURL(
param_dict[f'family-{family_name}'])
self.checkValidHTTPSURL(
param_dict[f'family-{family_name}-v6'])
# ports are allocated in alphabetical order and are "stable", ie. is not supposed
# to change after updating software release, because there is typically a rapid-cdn
# frontend pointing to this port.
self.assertEqual(urllib.parse.urlparse(param_dict['family-family1']).port, 2152)
self.assertEqual(urllib.parse.urlparse(param_dict['family-family1-v6']).port, 2152)
self.assertEqual(urllib.parse.urlparse(param_dict['family-family2']).port, 2155)
self.assertEqual(urllib.parse.urlparse(param_dict['family-family2-v6']).port, 2155)
def test_published_test_runner_url(self):
# each family's also a list of test test runner URLs, by default 3 per family
param_dict = self.getRootPartitionConnectionParameterDict()
for family_name in ('family1', 'family2'):
family_test_runner_url_list = param_dict[
f'{family_name}-test-runner-url-list']
self.assertEqual(3, len(family_test_runner_url_list))
self.assertEqual(3, len(set(family_test_runner_url_list)))
for url in family_test_runner_url_list:
self.checkValidHTTPSURL(url)
def test_zope_listen(self):
# we requested 3 zope in family1 and 5 zopes in family2, we should have 8 zope running.
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
self.assertEqual(
3 + 5,
len([p for p in all_process_info if p['name'].startswith('zope-')]))
def test_haproxy_listen(self):
# We have 2 families, haproxy should listen to a total of 3 ports per family
# normal access on ipv4 and ipv6 and test runner access on ipv4 only
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
process_info, = (p for p in all_process_info if p['name'].startswith('haproxy-'))
haproxy_master_process = psutil.Process(process_info['pid'])
haproxy_worker_process, = haproxy_master_process.children()
self.assertEqual(
sorted([socket.AF_INET] * 4 + [socket.AF_INET6] * 2),
sorted(
c.family
for c in haproxy_worker_process.connections()
if c.status == 'LISTEN'
))
def test_no_frontend_request(self):
with open(os.path.join(self.computer_partition_root_path,
'.installed-switch-softwaretype.cfg')) as f:
installed = zc.buildout.configparser.parse(f, 'installed')
self.assertFalse(
[section for section in installed if 'request-frontend' in section])
self.assertFalse(
[
param for param in self.getRootPartitionConnectionParameterDict()
if 'frontend' in param
])
class TestBalancerPortsStable(ERP5InstanceTestCase):
"""Instantiate with two one families and a frontend, then
re-request with one more family and one more frontend, the ports
should not change
"""
__partition_reference__ = 'ap'
@classmethod
def getInstanceParameterDict(cls):
return {
'_':
json.dumps(
{
"frontend": {
"zzz": {
"zope-family": "zzz"
}
},
"zope-partition-dict": {
"zzz": {
"instance-count": 1,
"family": "zzz"
},
},
})
}
def test_same_balancer_ports_when_adding_zopes_or_frontends(self):
param_dict_before = self.getRootPartitionConnectionParameterDict()
balancer_param_dict_before = json.loads(
self.getComputerPartition('balancer').getConnectionParameter('_'))
# re-request with one more frontend and one more backend, that are before
# the existing ones when sorting alphabetically
instance_parameter_dict = json.loads(self.getInstanceParameterDict()['_'])
instance_parameter_dict['frontend']['aaa'] = {"zope-family": "aaa"}
instance_parameter_dict['zope-partition-dict']['aaa'] = {
"instance-count": 2,
"family": "aaa"
}
def rerequest():
return self.slap.request(
software_release=self.getSoftwareURL(),
software_type=self.getInstanceSoftwareType(),
partition_reference=self.default_partition_reference,
partition_parameter_kw={'_': json.dumps(instance_parameter_dict)},
state='started')
rerequest()
self.slap.waitForInstance(max_retry=10)
param_dict_after = json.loads(rerequest().getConnectionParameterDict()['_'])
balancer_param_dict_after = json.loads(
self.getComputerPartition('balancer').getConnectionParameter('_'))
self.assertEqual(param_dict_before['family-zzz-v6'], param_dict_after['family-zzz-v6'])
self.assertEqual(param_dict_before['url-frontend-zzz'], param_dict_after['url-frontend-zzz'])
self.assertEqual(balancer_param_dict_before['url-backend-zzz'], balancer_param_dict_after['url-backend-zzz'])
self.assertNotEqual(param_dict_before['family-zzz-v6'], param_dict_after['family-aaa-v6'])
self.assertNotEqual(param_dict_before['url-frontend-zzz'], param_dict_after['url-frontend-aaa'])
self.assertNotEqual(balancer_param_dict_before['url-backend-zzz'], balancer_param_dict_after['url-backend-aaa'])
class TestSeleniumTestRunner(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test ERP5 can be instantiated with selenium server for test runner.
"""
__partition_reference__ = 'sel'
@classmethod
def getInstanceParameterDict(cls):
return {
'_':
json.dumps({
'test-runner': {
'selenium': {
"target": "selenium-server",
"server-url": "https://example.com",
"verify-server-certificate": False,
"desired-capabilities": {
"browserName": "firefox",
"version": "68.0.2esr",
}
}
}
})
}
def test_test_runner_configuration_json_file(self):
runUnitTest_script, = glob.glob(
self.computer_partition_root_path + "/../*/bin/runUnitTest.real")
config_file = None
with open(runUnitTest_script) as f:
for line in f:
if 'ERP5_TEST_RUNNER_CONFIGURATION' in line:
_, config_file = line.split('=')
assert config_file
with open(config_file.strip()) as f:
self.assertEqual(
f.read(),
json.dumps(json.loads(self.getInstanceParameterDict()['_'])['test-runner'], sort_keys=True))
class TestDisableTestRunner(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test ERP5 can be instantiated without test runner.
"""
__partition_reference__ = 'distr'
@classmethod
def getInstanceParameterDict(cls):
return {'_': json.dumps({'test-runner': {'enabled': False}})}
def test_no_runUnitTestScript(self):
"""No runUnitTest script should be generated in any partition.
"""
# self.computer_partition_root_path is the path of root partition.
# we want to assert that no scripts exist in any partition.
bin_programs = list(map(os.path.basename,
glob.glob(self.computer_partition_root_path + "/../*/bin/*")))
self.assertTrue(bin_programs) # just to check the glob was correct.
self.assertNotIn('runUnitTest', bin_programs)
self.assertNotIn('runTestSuite', bin_programs)
def test_no_haproxy_testrunner_port(self):
# Haproxy only listen on two ports for frontend, two ports for legacy entry points
# and there is no haproxy ports allocated for test runner
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
process_info, = (p for p in all_process_info if p['name'].startswith('haproxy'))
haproxy_master_process = psutil.Process(process_info['pid'])
haproxy_worker_process, = haproxy_master_process.children()
self.assertEqual(
sorted([socket.AF_INET, socket.AF_INET6, socket.AF_INET, socket.AF_INET6]),
sorted(
c.family
for c in haproxy_worker_process.connections()
if c.status == 'LISTEN'
))
class TestZopeNodeParameterOverride(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test override zope node parameters
"""
__partition_reference__ = 'override'
__test_matrix__ = matrix((default,))
@classmethod
def getInstanceParameterDict(cls):
# The following example includes the most commonly used options,
# but not necessarily in a meaningful way.
return {'_': json.dumps({
"zodb": [{
"type": "zeo",
"server": {},
"cache-size-bytes": "20MB",
"cache-size-bytes!": [
("bb-0", 1<<20),
("bb-.*", "500MB"),
],
"pool-timeout": "10m",
"storage-dict": {
"cache-size!": [
("a-.*", "50MB"),
],
},
}],
"zope-partition-dict": {
"a": {
"instance-count": 3,
},
"bb": {
"instance-count": 5,
"port-base": 2300,
},
},
})}
def test_zope_conf(self):
zeo_addr = json.loads(
self.getComputerPartition('zodb').getConnectionParameter('_')
)["storage-dict"]["root"]["server"]
def checkParameter(line, kw):
k, v = line.split()
self.assertFalse(k.endswith('!'), k)
try:
expected = kw.pop(k)
except KeyError:
if k == 'server':
return
self.assertIsNotNone(expected)
self.assertEqual(str(expected), v)
def checkConf(zodb, storage):
zodb["mount-point"] = "/"
zodb["pool-size"] = 4
zodb["pool-timeout"] = "10m"
zodb["%import"] = "ZEO"
storage["storage"] = "root"
storage["server"] = zeo_addr
storage["server-sync"] = "true"
with open(f'{partition}/etc/zope-{zope}.conf') as f:
conf = list(map(str.strip, f.readlines()))
i = conf.index("") + 1
conf = iter(conf[i:conf.index("", i)])
for line in conf:
if line == '':
for line in conf:
if line == '':
break
checkParameter(line, storage)
for k, v in storage.items():
self.assertIsNone(v, k)
del storage
else:
checkParameter(line, zodb)
for k, v in zodb.items():
self.assertIsNone(v, k)
partition = self.getComputerPartitionPath('zope-a')
for zope in range(3):
checkConf({
"cache-size-bytes": "20MB",
}, {
"cache-size": "50MB",
})
partition = self.getComputerPartitionPath('zope-bb')
for zope in range(5):
checkConf({
"cache-size-bytes": "500MB" if zope else 1<<20,
}, {
"cache-size": None,
})
class TestWatchActivities(ERP5InstanceTestCase):
"""Tests for bin/watch_activities scripts in zope partitions.
"""
__partition_reference__ = 'wa'
def test(self):
# "watch_activities" scripts use watch command. We'll fake a watch command
# that executes the actual command only once to check the output.
tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmpdir)
with open(os.path.join(tmpdir, 'watch'), 'w') as f:
f.write("""#!/bin/sh
if [ "$1" != "-n" ] || [ "$2" != "5" ]
then
echo unexpected arguments: "$1" "$2"
exit 1
fi
shift
shift
exec bash -c "$@"
""")
os.fchmod(f.fileno(), 0o700)
try:
output = subprocess.check_output(
[
os.path.join(
self.getComputerPartitionPath('zope-1'),
'bin',
'watch_activities',
)
],
env=dict(os.environ,
PATH=os.pathsep.join([tmpdir, os.environ['PATH']])),
stderr=subprocess.STDOUT,
text=True,
)
except subprocess.CalledProcessError as e:
self.fail(e.output)
self.assertIn(' dict ', output)
class ZopeSkinsMixin:
"""Mixins with utility methods to test zope behaviors.
"""
@classmethod
def _setUpClass(cls):
super()._setUpClass()
cls._waitForActivities()
@classmethod
def _waitForActivities(cls, timeout=datetime.timedelta(minutes=10).total_seconds()):
"""Wait for ERP5 to be ready and have processed all activities.
"""
for _ in range(int(timeout / 5)):
with cls.getXMLRPCClient() as erp5_xmlrpc_client:
try:
if erp5_xmlrpc_client.portal_activities.countMessage() == 0:
break
except (xmlrpc.client.ProtocolError,
xmlrpc.client.Fault,
http.client.HTTPException):
pass
time.sleep(5)
else:
if cls._debug:
breakpoint()
raise AssertionError("Timeout waiting for activities")
@classmethod
def _getAuthenticatedZopeUrl(cls, path, family_name='default'):
"""Returns a URL to access a zope family through balancer,
with credentials in the URL.
path is joined with urllib.parse.urljoin to the URL of the portal.
"""
param_dict = cls.getRootPartitionConnectionParameterDict()
parsed = urllib.parse.urlparse(param_dict['family-' + family_name])
base_url = parsed._replace(
netloc='{}:{}@{}:{}'.format(
param_dict['inituser-login'],
param_dict['inituser-password'],
parsed.hostname,
parsed.port,
),
path=param_dict['site-id'] + '/',
).geturl()
return urllib.parse.urljoin(base_url, path)
@classmethod
@contextlib.contextmanager
def getXMLRPCClient(cls):
# don't verify certificate
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
erp5_xmlrpc_client = xmlrpc.client.ServerProxy(
cls._getAuthenticatedZopeUrl(''),
context=ssl_context,
)
with erp5_xmlrpc_client:
yield erp5_xmlrpc_client
@classmethod
def _addPythonScript(cls, script_id, params, body):
with cls.getXMLRPCClient() as erp5_xmlrpc_client:
custom = erp5_xmlrpc_client.portal_skins.custom
try:
custom.manage_addProduct.PythonScripts.manage_addPythonScript(
script_id)
except xmlrpc.client.ProtocolError as e:
if e.errcode != 302:
raise
getattr(custom, script_id).ZPythonScriptHTML_editAction(
'',
'',
params,
body,
)
class ZopeTestMixin(ZopeSkinsMixin, CrontabMixin):
"""Mixin class for zope features.
"""
__partition_reference__ = 'z'
@classmethod
def getInstanceParameterDict(cls):
return {
'_':
json.dumps({
"zope-partition-dict": {
"default": {
"longrequest-logger-interval": 1,
"longrequest-logger-timeout": 1,
},
"multiple": {
"family": "multiple",
"instance-count": 3,
"port-base": 2210,
},
},
}),
}
@classmethod
def _setUpClass(cls):
super()._setUpClass()
cls.zope_base_url = cls._getAuthenticatedZopeUrl('')
param_dict = cls.getRootPartitionConnectionParameterDict()
cls.zope_deadlock_debugger_url = cls._getAuthenticatedZopeUrl(
'/manage_debug_threads?{deadlock-debugger-password}'.format(
**param_dict))
# a python script to verify activity processing
cls._addPythonScript(
script_id='ERP5Site_verifyActivityProcessing',
params='mode',
body='''if 1:
import json
portal = context.getPortalObject()
if mode == "count":
return json.dumps(dict(count=len(portal.portal_activities.getMessageList())))
if mode == "activate":
for _ in range(10):
portal.portal_templates.activate(activity="SQLQueue").getTitle()
return "activated"
raise ValueError("Unknown mode: %s" % mode)
''',
)
cls.zope_verify_activity_processing_url = urllib.parse.urljoin(
cls.zope_base_url,
'ERP5Site_verifyActivityProcessing',
)
# a python script logging to event log
cls._addPythonScript(
script_id='ERP5Site_logMessage',
params='name',
body='''if 1:
from erp5.component.module.Log import log
return log("hello %s" % name)
''',
)
cls.zope_log_message_url = urllib.parse.urljoin(
cls.zope_base_url,
'ERP5Site_logMessage',
)
# a python script issuing a long request
cls._addPythonScript(
script_id='ERP5Site_executeLongRequest',
params='',
body='''if 1:
import time
for _ in range(5):
time.sleep(1)
return "done"
''',
)
cls.zope_long_request_url = urllib.parse.urljoin(
cls.zope_base_url,
'ERP5Site_executeLongRequest',
)
def setUp(self):
super().setUp()
# run logrotate a first time so that it create state files
self._executeCrontabAtDate('logrotate', '2000-01-01')
def tearDown(self):
super().tearDown()
# reset logrotate status
logrotate_status = os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'logrotate.status',
)
if os.path.exists(logrotate_status):
os.unlink(logrotate_status)
for logfile in glob.glob(
os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'backup',
'logrotate',
'*',
)):
os.unlink(logfile)
for logfile in glob.glob(
os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'monitor',
'private',
'documents',
'*',
)):
os.unlink(logfile)
def _getCrontabCommand(self, crontab_name: str) -> str:
"""Read a crontab and return the command that is executed.
overloaded to use crontab from zope partition
"""
with open(
os.path.join(
self.getComputerPartitionPath('zope-default'),
'etc',
'cron.d',
crontab_name,
)) as f:
crontab_spec, = f.readlines()
self.assertNotEqual(crontab_spec[0], '@', crontab_spec)
return crontab_spec.split(None, 5)[-1]
def test_event_log_rotation(self):
requests.get(
self.zope_log_message_url,
params={
"name": "world"
},
verify=False,
).raise_for_status()
zope_event_log_path = os.path.join(
self.getComputerPartitionPath('zope-default'),
'var',
'log',
'zope-0-event.log',
)
with open(zope_event_log_path) as f:
self.assertIn('hello world', f.read())
self._executeCrontabAtDate('logrotate', '2050-01-01')
# this logrotate leaves the log for the day as non compressed
rotated_log_file = os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'backup',
'logrotate',
'zope-0-event.log-20500101',
)
with open(rotated_log_file) as f:
self.assertIn('hello world', f.read())
requests.get(
self.zope_log_message_url,
params={
"name": "le monde"
},
verify=False,
).raise_for_status()
with open(zope_event_log_path) as f:
self.assertNotIn('hello world', f.read())
with open(zope_event_log_path) as f:
self.assertIn('hello le monde', f.read())
# on next day execution of logrotate, log files are compressed
self._executeCrontabAtDate('logrotate', '2050-01-02')
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
def test_access_log_rotation(self):
requests.get(
self.zope_base_url,
verify=False,
headers={
'User-Agent': 'before rotation'
},
).raise_for_status()
zope_access_log_path = os.path.join(
self.getComputerPartitionPath('zope-default'),
'var',
'log',
'zope-0-Z2.log',
)
with open(zope_access_log_path) as f:
self.assertIn('before rotation', f.read())
self._executeCrontabAtDate('logrotate', '2050-01-01')
# this logrotate leaves the log for the day as non compressed
rotated_log_file = os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'backup',
'logrotate',
'zope-0-Z2.log-20500101',
)
with open(rotated_log_file) as f:
self.assertIn('before rotation', f.read())
requests.get(
self.zope_base_url,
verify=False,
headers={
'User-Agent': 'after rotation'
},
).raise_for_status()
with open(zope_access_log_path) as f:
self.assertNotIn('before rotation', f.read())
with open(zope_access_log_path) as f:
self.assertIn('after rotation', f.read())
# on next day execution of logrotate, log files are compressed
self._executeCrontabAtDate('logrotate', '2050-01-02')
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
def test_long_request_log_rotation(self):
requests.get(self.zope_long_request_url,
verify=False,
params={
'when': 'before rotation'
}).raise_for_status()
zope_long_request_log_path = os.path.join(
self.getComputerPartitionPath('zope-default'),
'var',
'log',
'longrequest_logger_zope-0.log',
)
with open(zope_long_request_log_path) as f:
self.assertIn('before rotation', f.read())
self._executeCrontabAtDate('logrotate', '2050-01-01')
# this logrotate leaves the log for the day as non compressed
rotated_log_file = os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'backup',
'logrotate',
'longrequest_logger_zope-0.log-20500101',
)
with open(rotated_log_file) as f:
self.assertIn('before rotation', f.read())
requests.get(
self.zope_long_request_url,
verify=False,
params={
'when': 'after rotation'
},
).raise_for_status()
with open(zope_long_request_log_path) as f:
self.assertNotIn('before rotation', f.read())
with open(zope_long_request_log_path) as f:
self.assertIn('after rotation', f.read())
# on next day execution of logrotate, log files are compressed
self._executeCrontabAtDate('logrotate', '2050-01-02')
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
def test_neo_root_log_rotation(self):
zope_neo_root_log_path = os.path.join(
self.getComputerPartitionPath('zope-default'),
'var',
'log',
'zope-0-neo-root.log',
)
if not self.isNEO():
self.assertFalse(os.path.exists(zope_neo_root_log_path))
return
def check_sqlite_log(path):
with contextlib.closing(sqlite3.connect(path)) as con:
con.execute('select * from log')
check_sqlite_log(zope_neo_root_log_path)
self._executeCrontabAtDate('logrotate', '2050-01-01')
rotated_log_file = os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'backup',
'logrotate',
'zope-0-neo-root.log-20500101',
)
check_sqlite_log(rotated_log_file)
self._executeCrontabAtDate('logrotate', '2050-01-02')
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
requests.get(self._getAuthenticatedZopeUrl('/'), verify=False).raise_for_status()
check_sqlite_log(zope_neo_root_log_path)
def test_basic_authentication_user_in_access_log(self):
param_dict = self.getRootPartitionConnectionParameterDict()
requests.get(self.zope_base_url,
verify=False,
auth=requests.auth.HTTPBasicAuth(
param_dict['inituser-login'],
param_dict['inituser-password'],
)).raise_for_status()
zope_access_log_path = os.path.join(
self.getComputerPartitionPath('zope-default'),
'var',
'log',
'zope-0-Z2.log',
)
with open(zope_access_log_path) as f:
self.assertIn(param_dict['inituser-login'], f.read())
def test_deadlock_debugger(self):
dump_response = requests.get(
self.zope_deadlock_debugger_url,
verify=False,
)
dump_response.raise_for_status()
self.assertIn('Thread ', dump_response.text)
def test_activity_processing(self):
def wait_for_activities(max_retries):
for retry in range(max_retries):
time.sleep(10)
resp = requests.get(
self.zope_verify_activity_processing_url,
params={
'mode': 'count',
'retry': retry,
},
verify=False,
)
if not resp.ok:
# XXX we start by flushing existing activities from site creation
# and initial upgrader run. During this time it may happen that
# ERP5 replies with site errors, we tolerate these errors and only
# check the final state.
continue
count = resp.json()['count']
if not count:
break
else:
self.assertEqual(count, 0)
wait_for_activities(60)
requests.get(
self.zope_verify_activity_processing_url,
params={
'mode': 'activate'
},
verify=False,
).raise_for_status()
wait_for_activities(10)
def test_multiple_zope_family_log_files(self):
logfiles = [
os.path.basename(p) for p in glob.glob(
os.path.join(
self.getComputerPartitionPath('zope-multiple'), 'var', 'log', '*'))
]
self.assertEqual(
sorted([l for l in logfiles if l.startswith('zope')]), [
'zope-0-Z2.log',
'zope-0-event.log',
'zope-0-neo-root.log',
'zope-1-Z2.log',
'zope-1-event.log',
'zope-1-neo-root.log',
'zope-2-Z2.log',
'zope-2-event.log',
'zope-2-neo-root.log',
] if self.isNEO() else [
'zope-0-Z2.log',
'zope-0-event.log',
'zope-1-Z2.log',
'zope-1-event.log',
'zope-2-Z2.log',
'zope-2-event.log',
])
class TestZopeWSGI(ZopeTestMixin, ERP5InstanceTestCase):
pass
class TestZopePublisherTimeout(ZopeSkinsMixin, ERP5InstanceTestCase):
__partition_reference__ = 't'
@classmethod
def getInstanceParameterDict(cls):
return {
'_':
json.dumps({
# a default timeout of 3
"publisher-timeout": 3,
# and a family without timeout
"family-override": {
"no-timeout": {
"publisher-timeout": None,
},
},
"zope-partition-dict": {
# a family to process activities, so that our test
# does not hit a zope node processing activities
"activity": {
"family": "activity",
},
"default": {
"family": "default",
"port-base": 2210,
},
"no-timeout": {
"family": "no-timeout",
"port-base": 22220,
},
},
})
}
@classmethod
def _setUpClass(cls):
super()._setUpClass()
cls._addPythonScript(
'ERP5Site_doSlowRequest',
'',
'''if 1:
import time
def recurse(o):
time.sleep(0.1)
for sub in o.objectValues():
recurse(sub)
recurse(context.getPortalObject())
'''
)
def test_long_request_interupted_on_default_family(self):
ret = requests.get(self._getAuthenticatedZopeUrl(
'ERP5Site_doSlowRequest', family_name='default'), verify=False)
self.assertIn('TimeoutReachedError', ret.text)
self.assertEqual(ret.status_code, requests.codes.server_error)
def test_long_request_not_interupted_on_no_timeout_family(self):
with self.assertRaises(requests.exceptions.Timeout):
requests.get(
self._getAuthenticatedZopeUrl('ERP5Site_doSlowRequest', family_name='no-timeout'),
verify=False,
timeout=6)
class TestCloudooo(ZopeSkinsMixin, ERP5InstanceTestCase):
"""Test ERP5 can be instantiated with cloudooo parameters
"""
__partition_reference__ = 'c'
@classmethod
def getInstanceParameterDict(cls):
return {
'_':
json.dumps({
'cloudooo-url-list': [
'https://cloudooo1.example.com/',
'https://cloudooo2.example.com/',
],
'cloudooo-retry-count': 123,
})
}
def test_cloudooo_url_list_preference(self):
self.assertEqual(
requests.get(
self._getAuthenticatedZopeUrl(
'portal_preferences/getPreferredDocumentConversionServerUrlList'),
verify=False).text,
"['https://cloudooo1.example.com/', 'https://cloudooo2.example.com/']")
@unittest.expectedFailure # setting "retry" is not implemented
def test_cloudooo_retry_count_preference(self):
self.assertEqual(
requests.get(
self._getAuthenticatedZopeUrl(
'portal_preferences/getPreferredDocumentConversionServerRetry'),
verify=False).text,
"123")
class TestCloudoooDefaultParameter(ZopeSkinsMixin, ERP5InstanceTestCase):
"""Test default ERP5 cloudooo parameters
"""
__partition_reference__ = 'cd'
def test_cloudooo_url_list_preference(self):
self.assertIn(
requests.get(
self._getAuthenticatedZopeUrl(
'portal_preferences/getPreferredDocumentConversionServerUrlList'),
verify=False).text,
[
"['https://cloudooo1.erp5.net/', 'https://cloudooo.erp5.net/']",
"['https://cloudooo.erp5.net/', 'https://cloudooo1.erp5.net/']",
])
@unittest.expectedFailure # default value of "retry" does not match schema
def test_cloudooo_retry_count_preference(self):
self.assertEqual(
requests.get(
self._getAuthenticatedZopeUrl(
'portal_preferences/getPreferredDocumentConversionServerRetry'),
verify=False).text,
"2")
class TestNEO(ZopeSkinsMixin, CrontabMixin, ERP5InstanceTestCase):
"""Tests specific to neo storage
"""
__partition_reference__ = 'n'
__test_matrix__ = matrix((neo,))
def _getCrontabCommand(self, crontab_name: str) -> str:
"""Read a crontab and return the command that is executed.
overloaded to use crontab from neo partition
"""
with open(
os.path.join(
self.getComputerPartitionPath('neo-0'),
'etc',
'cron.d',
crontab_name,
)) as f:
crontab_spec, = f.readlines()
self.assertNotEqual(crontab_spec[0], '@', crontab_spec)
return crontab_spec.split(None, 5)[-1]
def test_log_rotation(self):
# first run to create state files
self._executeCrontabAtDate('logrotate', '2000-01-01')
def check_sqlite_log(path):
with self.subTest(path), contextlib.closing(sqlite3.connect(path)) as con:
con.execute('select * from log')
logfiles = ('neoadmin.log', 'neomaster.log', 'neostorage-0.log')
for f in logfiles:
check_sqlite_log(
os.path.join(
self.getComputerPartitionPath('neo-0'),
'var',
'log',
f))
self._executeCrontabAtDate('logrotate', '2050-01-01')
for f in logfiles:
check_sqlite_log(
os.path.join(
self.getComputerPartitionPath('neo-0'),
'srv',
'backup',
'logrotate',
f'{f}-20500101'))
self._executeCrontabAtDate('logrotate', '2050-01-02')
requests.get(self._getAuthenticatedZopeUrl('/'), verify=False).raise_for_status()
for f in logfiles:
check_sqlite_log(
os.path.join(
self.getComputerPartitionPath('neo-0'),
'var',
'log',
f))
class TestPassword(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
__partition_reference__ = 'p'
def test_no_plain_text_password_in_files(self):
inituser_password = self.getRootPartitionConnectionParameterDict()[
'inituser-password'].encode()
self.assertFalse(
[f for f in pathlib.Path(self.slap._instance_root).glob('**/*')
if f.is_file() and inituser_password in f.read_bytes()])
# the hashed password is present in some files
inituser_password_hashed = self.getRootPartitionConnectionParameterDict()[
'inituser-password-hashed'].encode()
self.assertTrue(
[f for f in pathlib.Path(self.slap._instance_root).glob('**/*')
if f.is_file() and inituser_password_hashed in f.read_bytes()])
class TestWithMaxRlimitNofileParameter(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test setting the with-max-rlimit-nofile parameter sets the open fd soft limit to the hard limit.
"""
__partition_reference__ = 'nf'
@classmethod
def getInstanceParameterDict(cls):
return {'_': json.dumps({'with-max-rlimit-nofile': True})}
def test_with_max_rlimit_nofile(self):
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
_, current_hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
process_info, = (p for p in all_process_info if p['name'].startswith('zope-'))
self.assertEqual(
resource.prlimit(process_info['pid'], resource.RLIMIT_NOFILE),
(current_hard_limit, current_hard_limit))
class TestUnsetWithMaxRlimitNofileParameter(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test not setting the with-max-rlimit-nofile parameter doesn't change the soft limit of erp5
"""
__partition_reference__ = 'nnf'
def test_unset_with_max_rlimit_nofile(self) -> None:
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
limit = resource.getrlimit(resource.RLIMIT_NOFILE)
process_info, = (p for p in all_process_info if p['name'].startswith('zope-'))
self.assertEqual(
resource.prlimit(process_info['pid'], resource.RLIMIT_NOFILE), limit)
class TestFrontend(ERP5InstanceTestCase):
__partition_reference__ = 'f'
@classmethod
def getInstanceParameterDict(cls):
return {
'_':
json.dumps(
{
"zope-partition-dict": {
"backoffice": {
"family": "default",
},
"web": {
"family": "web",
"port-base": 2300,
},
"activities": {
# this family will not have frontend
"family": "activities",
"port-base": 2400,
},
},
"frontend": {
"backoffice": {
"zope-family": "default",
},
"website": {
"zope-family": "web",
"internal-path": "/%(site-id)s/web_site_module/my_website",
"instance-parameters": {
# some extra frontend parameters
"enable_cache": "true",
}
}
},
"sla-dict": {
"computer_guid=COMP-1234": ["frontend-backoffice"]
}
})
}
def test_frontend_url_published(self):
param_dict = self.getRootPartitionConnectionParameterDict()
requests.get(
param_dict['url-frontend-backoffice'],
verify=False,
allow_redirects=False,
)
requests.get(
param_dict['url-frontend-website'],
verify=False,
allow_redirects=False,
)
def test_request_parameters(self):
param_dict = self.getRootPartitionConnectionParameterDict()
balancer_param_dict = json.loads(
self.getComputerPartition('balancer').getConnectionParameter('_'))
with open(os.path.join(self.computer_partition_root_path,
'.installed-switch-softwaretype.cfg')) as f:
installed = zc.buildout.configparser.parse(f, 'installed')
self.assertEqual(
installed['request-frontend-backoffice']['config-type'], '')
self.assertEqual(
installed['request-frontend-backoffice']['shared'], 'true')
self.assertEqual(
installed['request-frontend-backoffice']['config-url'],
balancer_param_dict['url-backend-backoffice'])
self.assertNotIn('config-path', installed['request-frontend-backoffice'])
self.assertEqual(
installed['request-frontend-backoffice']['sla-computer_guid'],
'COMP-1234')
self.assertEqual(
installed['request-frontend-backoffice']['software-url'],
'http://git.erp5.org/gitweb/slapos.git/blob_plain/HEAD:/software/apache-frontend/software.cfg'
)
self.assertEqual(
installed['request-frontend-backoffice']['connection-secure_access'],
param_dict['url-frontend-backoffice'])
self.assertEqual(
installed['request-frontend-website']['config-type'], '')
# no SLA by default
self.assertFalse([k for k in installed['request-frontend-website'] if k.startswith('sla-')])
# instance parameters are propagated
self.assertEqual(
installed['request-frontend-website']['config-enable_cache'], 'true')
self.assertEqual(
installed['request-frontend-website']['config-url'],
balancer_param_dict['url-backend-website'])
self.assertNotIn('config-path', installed['request-frontend-website'])
self.assertEqual(
installed['request-frontend-website']['connection-secure_access'],
param_dict['url-frontend-website'])
# no frontend was requested for activities family
self.assertNotIn('request-frontend-activities', installed)
self.assertNotIn('url-frontend-activities', param_dict)
self.assertNotIn('url-backend-activities', balancer_param_dict)
def test_path_virtualhost(self):
balancer_param_dict = json.loads(
self.getComputerPartition('balancer').getConnectionParameter('_'))
found_line = False
retries = 10
while retries:
requests.get(balancer_param_dict['url-backend-website'], verify=False)
for logfile in glob.glob(os.path.join(self.getComputerPartitionPath('zope-web'), 'var/log/*Z2.log')):
with open(logfile) as f:
for line in f:
if 'GET /VirtualHost' in line:
found_line = True
break
if found_line:
break
time.sleep(1)
retries = retries - 1
self.assertTrue(found_line)
percent_encoded_netloc = urllib.parse.quote(
urllib.parse.urlparse(
balancer_param_dict['url-backend-website']).netloc)
self.assertIn(
f'/VirtualHostBase/https/{percent_encoded_netloc}/erp5/web_site_module/my_website/VirtualHostRoot/ HTTP', line)
class TestDefaultFrontendWithZopePartitionDict(ERP5InstanceTestCase):
"""Default frontend also is requested when only one zope family
is defined, but on multiple partitions
"""
__partition_reference__ = 'fzpd'
@classmethod
def getInstanceParameterDict(cls):
return {
'_':
json.dumps(
{
"zope-partition-dict": {
"backoffice-0": {
"family": "backoffice",
},
"backoffice-1": {
"family": "backoffice",
}
}
}
)
}
def test_frontend_requested(self):
param_dict = self.getRootPartitionConnectionParameterDict()
balancer_param_dict = json.loads(
self.getComputerPartition('balancer').getConnectionParameter('_'))
with open(os.path.join(self.computer_partition_root_path,
'.installed-switch-softwaretype.cfg')) as f:
installed = zc.buildout.configparser.parse(f, 'installed')
self.assertEqual(
installed['request-frontend-default']['config-url'],
balancer_param_dict['url-backend-default'])
requests.get(
param_dict['url-frontend-default'],
verify=False,
allow_redirects=False,
)
slapos-master-software-erp5-test/software/erp5/test/test/test_mariadb.py 0000664 0000000 0000000 00000031771 14601336532 0027015 0 ustar 00root root 0000000 0000000 ##############################################################################
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import contextlib
import datetime
import glob
import gzip
import json
import lzma
import os
import subprocess
import urllib.parse
import MySQLdb
import MySQLdb.connections
from slapos.testing.utils import CrontabMixin, getPromisePluginParameterDict
from . import ERP5InstanceTestCase, default, matrix, setUpModule
setUpModule # pyflakes
class MariaDBTestCase(ERP5InstanceTestCase):
"""Base test case for mariadb tests.
"""
__partition_reference__ = 'm'
# We explicitly specify 'mariadb' as our software type here,
# therefore we don't request ZODB. We therefore don't
# need to run these tests with both NEO and ZEO mode,
# it wouldn't make any difference.
# https://lab.nexedi.com/nexedi/slapos/blob/273037c8/stack/erp5/instance.cfg.in#L216-230
__test_matrix__ = matrix((default,))
@classmethod
def getInstanceSoftwareType(cls):
return "mariadb"
@classmethod
def _getInstanceParameterDict(cls) -> dict:
return {
'tcpv4-port': 3306,
'max-connection-count': 5,
'long-query-time': 3,
'max-slowqueries-threshold': 1,
'slowest-query-threshold': 0.1,
# XXX what is this ? should probably not be needed here
'name': cls.__name__,
'monitor-passwd': 'secret',
# XXX should probably not be needed here
'computer-memory-percent-threshold': 100,
}
@classmethod
def getInstanceParameterDict(cls) -> dict:
return {'_': json.dumps(cls._getInstanceParameterDict())}
def getDatabaseConnection(self) -> MySQLdb.connections.Connection:
connection_parameter_dict = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])
db_url = urllib.parse.urlparse(connection_parameter_dict['database-list'][0])
self.assertEqual('mysql', db_url.scheme)
self.assertTrue(db_url.path.startswith('/'))
database_name = db_url.path[1:]
return MySQLdb.connect(
user=db_url.username,
passwd=db_url.password,
host=db_url.hostname,
port=db_url.port,
db=database_name,
use_unicode=True,
charset='utf8mb4'
)
class TestCrontabs(MariaDBTestCase, CrontabMixin):
_save_instance_file_pattern_list = \
MariaDBTestCase._save_instance_file_pattern_list + (
'*/srv/backup/*',
)
def test_full_backup(self) -> None:
self._executeCrontabAtDate('mariadb-backup', '2050-01-01')
full_backup_file, = glob.glob(
os.path.join(
self.computer_partition_root_path,
'srv',
'backup',
'mariadb-full',
'205001010000??.sql.gz',
))
with gzip.open(full_backup_file, 'rt') as dump:
self.assertIn('CREATE TABLE', dump.read())
def test_logrotate_and_slow_query_digest(self) -> None:
# slow query digest needs to run after logrotate, since it operates on the rotated
# file, so this tests both logrotate and slow query digest.
# run logrotate a first time so that it create state files
self._executeCrontabAtDate('logrotate', '2000-01-01')
# make two slow queries. We are using long-query-time=3, so the queries
# must take more than 3 seconds to be logged.
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("SELECT SLEEP(3.1)")
cnx.store_result()
cnx.query("SELECT SLEEP(3.2)")
# slow query crontab depends on crontab for log rotation
# to be executed first.
self._executeCrontabAtDate('logrotate', '2050-01-01')
# this logrotate leaves the log for the day as non compressed
rotated_log_file = os.path.join(
self.computer_partition_root_path,
'srv',
'backup',
'logrotate',
'mariadb_slowquery.log-20500101',
)
self.assertTrue(os.path.exists(rotated_log_file))
# then crontab to generate slow query report is executed
self._executeCrontabAtDate('generate-mariadb-slow-query-report', '2050-01-01')
# and it creates a report for the day
slow_query_report = os.path.join(
self.computer_partition_root_path,
'srv',
'monitor',
'private',
'slowquery_digest',
'slowquery_digest.txt-2050-01-01.xz',
)
with lzma.open(slow_query_report, 'rt') as f:
# this is the hash for our "select sleep(n)" slow query
self.assertIn("ID 0xF9A57DD5A41825CA", f.read())
# on next day execution of logrotate, log files are compressed
self._executeCrontabAtDate('logrotate', '2050-01-02')
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
# there's a promise checking that the threshold is not exceeded
# and it reports a problem since we set a threshold of 1 slow query
check_slow_query_promise_plugin = getPromisePluginParameterDict(
os.path.join(
self.computer_partition_root_path,
'etc',
'plugin',
'check-slow-query-pt-digest-result.py',
))
with self.assertRaises(subprocess.CalledProcessError) as error_context:
subprocess.check_output(
'faketime 2050-01-01 %s' % check_slow_query_promise_plugin['command'],
text=True,
shell=True)
self.assertEqual(
error_context.exception.output,
"Threshold is lower than expected: \n"
"Expected total queries : 1.0 and current is: 2\n"
"Expected slowest query : 0.1 and current is: 3\n",
)
class TestMariaDB(MariaDBTestCase):
def test_utf8_collation(self) -> None:
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query(
"""
CREATE TABLE test_utf8_collation (
col1 CHAR(10)
)
""")
cnx.store_result()
cnx.query(
"""
insert into test_utf8_collation values ("à"), ("あ")
""")
cnx.store_result()
cnx.query(
"""
select * from test_utf8_collation where col1 = "a"
""")
self.assertEqual((('à',),), cnx.store_result().fetch_row(maxrows=2))
class TestMroonga(MariaDBTestCase):
def test_mroonga_plugin_loaded(self) -> None:
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("show plugins")
plugins = cnx.store_result().fetch_row(maxrows=1000)
self.assertIn(
('Mroonga', 'ACTIVE', 'STORAGE ENGINE', 'ha_mroonga.so', 'GPL'),
plugins)
def test_mroonga_normalize_udf(self) -> None:
# example from https://mroonga.org/docs/reference/udf/mroonga_normalize.html#usage
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query(
"""
SELECT mroonga_normalize("ABCDあぃうぇ㍑")
""")
# XXX this is returned as bytes by mroonga/mariadb (this might be a bug)
self.assertEqual((('abcdあぃうぇリットル'.encode(),),),
cnx.store_result().fetch_row(maxrows=2))
if 0:
# this example fail with:
# OperationalError: (1123, "Can't initialize function 'mroonga_normalize'; mroonga_normalize(): nonexistent normalizer NormalizerMySQLUnicodeCIExceptKanaCI")
# same error on mroonga "official" docker images using mysql
# https://hub.docker.com/layers/groonga/mroonga/latest/images/sha256-e5a979801c95544ca3a1228d2c4d819820850e0162649553f2e94850e5e1c988?context=explore
# so it's probably OK to ignore
cnx.query(
"""
SELECT mroonga_normalize("aBcDあぃウェ㍑", "NormalizerMySQLUnicodeCIExceptKanaCIKanaWithVoicedSoundMark")
""")
self.assertEqual((('ABCDあぃうぇ㍑'.encode(),),),
cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_normalizer(self) -> None:
# example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-normalizer
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("SET NAMES utf8")
cnx.store_result()
cnx.query(
"""
CREATE TABLE diaries (
day DATE PRIMARY KEY,
content VARCHAR(64) NOT NULL,
FULLTEXT INDEX (content) COMMENT 'normalizer "NormalizerAuto"'
) Engine=Mroonga DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
""")
cnx.store_result()
cnx.query(
"""INSERT INTO diaries VALUES ("2013-04-23", "ブラックコーヒーを飲んだ。")""")
cnx.store_result()
cnx.query(
"""
SELECT *
FROM diaries
WHERE MATCH (content) AGAINST ("+ふらつく" IN BOOLEAN MODE)
""")
self.assertEqual((), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
SELECT *
FROM diaries
WHERE MATCH (content) AGAINST ("+ブラック" IN BOOLEAN MODE)
""")
self.assertEqual(
((datetime.date(2013, 4, 23), 'ブラックコーヒーを飲んだ。'),),
cnx.store_result().fetch_row(maxrows=2),
)
def test_mroonga_full_text_normalizer_TokenBigramSplitSymbolAlphaDigit(self) -> None:
# Similar to as ERP5's testI18NSearch with erp5_full_text_mroonga_catalog
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query(
"""
CREATE TABLE `full_text` (
`uid` BIGINT UNSIGNED NOT NULL,
`SearchableText` MEDIUMTEXT,
PRIMARY KEY (`uid`),
FULLTEXT `SearchableText` (`SearchableText`) COMMENT 'parser "TokenBigramSplitSymbolAlphaDigit"'
) ENGINE=mroonga
""")
cnx.store_result()
cnx.query(
"""
INSERT INTO full_text VALUES
(1, "Gabriel Fauré Quick brown fox jumps over the lazy dog"),
(2, "武者小路 実篤 Slow white fox jumps over the diligent dog."),
(3, "( - + )")""")
cnx.store_result()
cnx.query(
"""
SELECT uid
FROM full_text
WHERE MATCH (`full_text`.`SearchableText`) AGAINST ('*D+ Faure' IN BOOLEAN MODE)
""")
self.assertEqual(((1,),), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
SELECT uid
FROM full_text
WHERE MATCH (`full_text`.`SearchableText`) AGAINST ('*D+ 武者' IN BOOLEAN MODE)
""")
self.assertEqual(((2,),), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
SELECT uid
FROM full_text
WHERE MATCH (`full_text`.`SearchableText`) AGAINST ('*D+ +quick +fox +dog' IN BOOLEAN MODE)
""")
self.assertEqual(((1,),), cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_stem(self) -> None:
# example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-token-filters
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("SELECT mroonga_command('register token_filters/stem')")
self.assertEqual(((b'true',),), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
CREATE TABLE memos (
id INT NOT NULL PRIMARY KEY,
content TEXT NOT NULL,
FULLTEXT INDEX (content) COMMENT 'normalizer "NormalizerAuto", token_filters "TokenFilterStem"'
) Engine=Mroonga DEFAULT CHARSET=utf8
""")
cnx.store_result()
cnx.query(
"""INSERT INTO memos VALUES (1, "I develop Groonga"), (2, "I'm developing Groonga"), (3, "I developed Groonga")"""
)
cnx.store_result()
cnx.query(
"""
SELECT *
FROM memos
WHERE MATCH (content) AGAINST ("+develops" IN BOOLEAN MODE)
""")
self.assertEqual([
(1, "I develop Groonga"),
(2, "I'm developing Groonga"),
(3, "I developed Groonga"),
], list(sorted(cnx.store_result().fetch_row(maxrows=4))))
slapos-master-software-erp5-test/software/erp5/test/test/test_wcfs.py 0000664 0000000 0000000 00000006147 14601336532 0026357 0 ustar 00root root 0000000 0000000 # Copyright (C) 2022 Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
import json
import os.path
import unittest
from slapos.grid.utils import md5digest
from . import ERP5InstanceTestCase
from . import setUpModule as _setUpModule
from .test_erp5 import TestPublishedURLIsReachableMixin
# skip tests when software release is built with wendelin.core 1.
def setUpModule():
_setUpModule()
cls = ERP5InstanceTestCase
if not os.path.exists(
os.path.join(
cls.slap.software_directory,
md5digest(cls.getSoftwareURL()),
'bin', 'wcfs')):
raise unittest.SkipTest("built with wendelin.core 1")
class TestWCFS(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test Wendelin Core File System
"""
__partition_reference__ = 'wcfs'
# Only run in ZEO mode; don't run with NEO.
# Current NEO/py and NEO/go versions have interoperability
# issues. Once these issues are fixed the following
# lines have to be removed so that test case runs agains NEO.
# Please see the following MR for more context:
# https://lab.nexedi.com/nexedi/slapos/merge_requests/1283#note_174854
@classmethod
def setUpClass(cls):
if json.loads(cls.getInstanceParameterDict()["_"])['zodb'][0]["type"] == "neo":
raise unittest.SkipTest("Not yet fixed WCFS+NEO interoperability issue.")
super().setUpClass()
@classmethod
def getInstanceParameterDict(cls):
return {'_': json.dumps({'wcfs': {'enable': True}})}
def test_wcfs_accessible(self):
"""Verify that wcfs filesystem is basically accessible.
- we can read .wcfs/zurl
- its content is equal to published `serving-zurl`
"""
zurl = json.loads(
self.getComputerPartition('wcfs').getConnectionParameter('_')
)['serving-zurl']
mntpt = lookupMount(zurl)
zurl_ = readfile("%s/.wcfs/zurl" % mntpt)
self.assertEqual(zurl_, zurl)
# lookupMount returns /proc/mount entry for wcfs mounted to serve zurl.
def lookupMount(zurl):
for line in readfile('/proc/mounts').splitlines():
# fuse.wcfs ...
zurl_, mntpt, typ, _ = line.split(None, 3)
if typ != 'fuse.wcfs':
continue
if zurl_ == zurl:
return mntpt
raise KeyError("lookup mount %s: no /proc/mounts entry" % zurl)
# readfile returns content of file @path.
def readfile(path):
with open(path) as f:
return f.read()