Commit 341d42e3 authored by Jérome Perrin's avatar Jérome Perrin

software/erp5/test: move caucase helper classes in slapos.core

parent f825e6ef
...@@ -41,7 +41,7 @@ setup(name=name, ...@@ -41,7 +41,7 @@ setup(name=name,
url="https://lab.nexedi.com/nexedi/slapos", url="https://lab.nexedi.com/nexedi/slapos",
packages=find_packages(), packages=find_packages(),
install_requires=[ install_requires=[
'slapos.core', 'slapos.core[testing]',
'supervisor', 'supervisor',
'slapos.libnetworkcache', 'slapos.libnetworkcache',
'erp5.util', 'erp5.util',
......
...@@ -25,26 +25,13 @@ ...@@ -25,26 +25,13 @@
# #
############################################################################## ##############################################################################
import hashlib
import itertools import itertools
import json import json
import os import os
import shutil
import subprocess
import sys import sys
import tempfile
import time
import urllib
import requests from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from slapos.testing.testcase import ManagedResource, makeModuleSetUpAndTestCaseClass
from slapos.testing.utils import findFreeTCPPort
ERP5PY3 = os.environ['SLAPOS_SR_TEST_NAME'] == 'erp5-py3' ERP5PY3 = os.environ['SLAPOS_SR_TEST_NAME'] == 'erp5-py3'
...@@ -221,174 +208,3 @@ class ERP5InstanceTestCase(SlapOSInstanceTestCase, metaclass=ERP5InstanceTestMet ...@@ -221,174 +208,3 @@ class ERP5InstanceTestCase(SlapOSInstanceTestCase, metaclass=ERP5InstanceTestMet
def getComputerPartitionPath(cls, partition_reference): def getComputerPartitionPath(cls, partition_reference):
partition_id = cls.getComputerPartition(partition_reference).getId() partition_id = cls.getComputerPartition(partition_reference).getId()
return os.path.join(cls.slap._instance_root, partition_id) return os.path.join(cls.slap._instance_root, partition_id)
class CaucaseService(ManagedResource):
"""A caucase service.
"""
url: str = None
directory: str = None
_caucased_process: subprocess.Popen = None
def open(self) -> None:
# start a caucased and server certificate.
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
caucased_path = os.path.join(software_release_root_path, 'bin', 'caucased')
self.directory = tempfile.mkdtemp()
caucased_dir = os.path.join(self.directory, 'caucased')
os.mkdir(caucased_dir)
os.mkdir(os.path.join(caucased_dir, 'user'))
os.mkdir(os.path.join(caucased_dir, 'service'))
backend_caucased_netloc = f'{self._cls._ipv4_address}:{findFreeTCPPort(self._cls._ipv4_address)}'
self.url = 'http://' + backend_caucased_netloc
self._caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(caucased_dir, 'server.key.pem'),
'--netloc', backend_caucased_netloc,
'--service-auto-approve-count', '1',
],
# capture subprocess output not to pollute test's own stdout
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(30):
try:
if requests.get(self.url).status_code == 200:
break
except Exception:
pass
time.sleep(1)
else:
raise RuntimeError('caucased failed to start.')
def close(self) -> None:
self._caucased_process.terminate()
self._caucased_process.wait()
self._caucased_process.stdout.close()
shutil.rmtree(self.directory)
@property
def ca_crt_path(self) -> str:
"""Path of the CA certificate from this caucase.
"""
ca_crt_path = os.path.join(self.directory, 'ca.crt.pem')
if not os.path.exists(ca_crt_path):
with open(ca_crt_path, 'w') as f:
f.write(
requests.get(urllib.parse.urljoin(
self.url,
'/cas/crt/ca.crt.pem',
)).text)
return ca_crt_path
class CaucaseCertificate(ManagedResource):
"""A certificate signed by a caucase service.
"""
ca_crt_file: str = None
crl_file: str = None
csr_file: str = None
cert_file: str = None
key_file: str = None
def open(self) -> None:
self.tmpdir = tempfile.mkdtemp()
self.ca_crt_file = os.path.join(self.tmpdir, 'ca-crt.pem')
self.crl_file = os.path.join(self.tmpdir, 'ca-crl.pem')
self.csr_file = os.path.join(self.tmpdir, 'csr.pem')
self.cert_file = os.path.join(self.tmpdir, 'crt.pem')
self.key_file = os.path.join(self.tmpdir, 'key.pem')
def close(self) -> None:
shutil.rmtree(self.tmpdir)
@property
def _caucase_path(self) -> str:
"""path of caucase executable.
"""
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
return os.path.join(software_release_root_path, 'bin', 'caucase')
def request(self, common_name: str, caucase: CaucaseService, san: x509.SubjectAlternativeName=None) -> None:
"""Generate certificate and request signature to the caucase service.
This overwrite any previously requested certificate for this instance.
"""
cas_args = [
self._caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
]
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
with open(self.key_file, 'wb') as f:
f.write(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(
NameOID.COMMON_NAME,
common_name,
),
]))
if san:
csr = csr.add_extension(san, critical=True)
csr = csr.sign(key, hashes.SHA256(), default_backend())
with open(self.csr_file, 'wb') as f:
f.write(csr.public_bytes(serialization.Encoding.PEM))
csr_id = subprocess.check_output(
cas_args + [
'--send-csr', self.csr_file,
],
).split()[0].decode()
assert csr_id
for _ in range(30):
if not subprocess.call(
cas_args + [
'--get-crt', csr_id, self.cert_file,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) == 0:
break
else:
time.sleep(1)
else:
raise RuntimeError('getting service certificate failed.')
with open(self.cert_file) as cert_file:
assert 'BEGIN CERTIFICATE' in cert_file.read()
def revoke(self, caucase: CaucaseService) -> None:
"""Revoke the client certificate on this caucase instance.
"""
subprocess.check_call([
self._caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
'--revoke-crt', self.cert_file, self.key_file,
])
...@@ -23,9 +23,10 @@ import psutil ...@@ -23,9 +23,10 @@ import psutil
import requests import requests
from slapos.proxy.db_version import DB_VERSION from slapos.proxy.db_version import DB_VERSION
from slapos.testing.caucase import CaucaseCertificate, CaucaseService
from slapos.testing.utils import CrontabMixin, ManagedHTTPServer from slapos.testing.utils import CrontabMixin, ManagedHTTPServer
from . import CaucaseCertificate, CaucaseService, ERP5InstanceTestCase, default, matrix, setUpModule from . import ERP5InstanceTestCase, default, matrix, setUpModule
setUpModule # pyflakes setUpModule # pyflakes
......
...@@ -48,10 +48,12 @@ import xmlrpc.client ...@@ -48,10 +48,12 @@ import xmlrpc.client
import psutil import psutil
import requests import requests
import urllib3 import urllib3
from slapos.testing.caucase import CaucaseService
from slapos.testing.utils import CrontabMixin from slapos.testing.utils import CrontabMixin
import zc.buildout.configparser import zc.buildout.configparser
from . import CaucaseService, ERP5InstanceTestCase, default, matrix, neo, setUpModule, ERP5PY3
from . import ERP5InstanceTestCase, default, matrix, neo, setUpModule, ERP5PY3
setUpModule # pyflakes setUpModule # pyflakes
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment