Commit 9e3602ef authored by Jérome Perrin's avatar Jérome Perrin

software/erp5/test: move Caucase helper classes in __init__

parent 22e601f1
...@@ -25,12 +25,26 @@ ...@@ -25,12 +25,26 @@
# #
############################################################################## ##############################################################################
import hashlib
import itertools import itertools
import json import json
import os import os
import shutil
import subprocess
import sys import sys
import tempfile
import time
import urllib
from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from slapos.testing.testcase import ManagedResource, makeModuleSetUpAndTestCaseClass
from slapos.testing.utils import findFreeTCPPort
_setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass( _setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass(
...@@ -200,3 +214,175 @@ class ERP5InstanceTestCase(SlapOSInstanceTestCase, metaclass=ERP5InstanceTestMet ...@@ -200,3 +214,175 @@ class ERP5InstanceTestCase(SlapOSInstanceTestCase, metaclass=ERP5InstanceTestMet
def getComputerPartitionPath(cls, partition_reference): def getComputerPartitionPath(cls, partition_reference):
partition_id = cls.getComputerPartition(partition_reference).getId() partition_id = cls.getComputerPartition(partition_reference).getId()
return os.path.join(cls.slap._instance_root, partition_id) return os.path.join(cls.slap._instance_root, partition_id)
class CaucaseService(ManagedResource):
"""A caucase service.
"""
url: str = None
directory: str = None
_caucased_process: subprocess.Popen = None
def open(self) -> None:
# start a caucased and server certificate.
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
caucased_path = os.path.join(software_release_root_path, 'bin', 'caucased')
self.directory = tempfile.mkdtemp()
caucased_dir = os.path.join(self.directory, 'caucased')
os.mkdir(caucased_dir)
os.mkdir(os.path.join(caucased_dir, 'user'))
os.mkdir(os.path.join(caucased_dir, 'service'))
backend_caucased_netloc = f'{self._cls._ipv4_address}:{findFreeTCPPort(self._cls._ipv4_address)}'
self.url = 'http://' + backend_caucased_netloc
self._caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(caucased_dir, 'server.key.pem'),
'--netloc', backend_caucased_netloc,
'--service-auto-approve-count', '1',
],
# capture subprocess output not to pollute test's own stdout
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(30):
try:
if requests.get(self.url).status_code == 200:
break
except Exception:
pass
time.sleep(1)
else:
raise RuntimeError('caucased failed to start.')
def close(self) -> None:
self._caucased_process.terminate()
self._caucased_process.wait()
self._caucased_process.stdout.close()
shutil.rmtree(self.directory)
@property
def ca_crt_path(self) -> str:
"""Path of the CA certificate from this caucase.
"""
ca_crt_path = os.path.join(self.directory, 'ca.crt.pem')
if not os.path.exists(ca_crt_path):
with open(ca_crt_path, 'w') as f:
f.write(
requests.get(urllib.parse.urljoin(
self.url,
'/cas/crt/ca.crt.pem',
)).text)
return ca_crt_path
class CaucaseCertificate(ManagedResource):
"""A certificate signed by a caucase service.
"""
ca_crt_file: str = None
crl_file: str = None
csr_file: str = None
cert_file: str = None
key_file: str = None
def open(self) -> None:
self.tmpdir = tempfile.mkdtemp()
self.ca_crt_file = os.path.join(self.tmpdir, 'ca-crt.pem')
self.crl_file = os.path.join(self.tmpdir, 'ca-crl.pem')
self.csr_file = os.path.join(self.tmpdir, 'csr.pem')
self.cert_file = os.path.join(self.tmpdir, 'crt.pem')
self.key_file = os.path.join(self.tmpdir, 'key.pem')
def close(self) -> None:
shutil.rmtree(self.tmpdir)
@property
def _caucase_path(self) -> str:
"""path of caucase executable.
"""
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
return os.path.join(software_release_root_path, 'bin', 'caucase')
def request(self, common_name: str, caucase: CaucaseService) -> None:
"""Generate certificate and request signature to the caucase service.
This overwrite any previously requested certificate for this instance.
"""
cas_args = [
self._caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
]
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
with open(self.key_file, 'wb') as f:
f.write(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(
NameOID.COMMON_NAME,
common_name,
),
])).sign(
key,
hashes.SHA256(),
default_backend(),
)
with open(self.csr_file, 'wb') as f:
f.write(csr.public_bytes(serialization.Encoding.PEM))
csr_id = subprocess.check_output(
cas_args + [
'--send-csr', self.csr_file,
],
).split()[0].decode()
assert csr_id
for _ in range(30):
if not subprocess.call(
cas_args + [
'--get-crt', csr_id, self.cert_file,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) == 0:
break
else:
time.sleep(1)
else:
raise RuntimeError('getting service certificate failed.')
with open(self.cert_file) as cert_file:
assert 'BEGIN CERTIFICATE' in cert_file.read()
def revoke(self, caucase: CaucaseService) -> None:
"""Revoke the client certificate on this caucase instance.
"""
subprocess.check_call([
self._caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
'--revoke-crt', self.cert_file, self.key_file,
])
import glob import glob
import hashlib
import json import json
import logging import logging
import os import os
import re import re
import shutil
import subprocess import subprocess
import tempfile
import time import time
import urllib.parse import urllib.parse
from http.server import BaseHTTPRequestHandler from http.server import BaseHTTPRequestHandler
...@@ -15,16 +12,10 @@ from unittest import mock ...@@ -15,16 +12,10 @@ from unittest import mock
import pexpect import pexpect
import psutil import psutil
import requests import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from slapos.testing.testcase import ManagedResource from slapos.testing.utils import CrontabMixin, ManagedHTTPServer
from slapos.testing.utils import CrontabMixin, ManagedHTTPServer, findFreeTCPPort
from . import ERP5InstanceTestCase, default, matrix, setUpModule from . import CaucaseCertificate, CaucaseService, ERP5InstanceTestCase, default, matrix, setUpModule
setUpModule # pyflakes setUpModule # pyflakes
...@@ -73,59 +64,6 @@ class EchoHTTP11Server(ManagedHTTPServer): ...@@ -73,59 +64,6 @@ class EchoHTTP11Server(ManagedHTTPServer):
log_message = logging.getLogger(__name__ + '.EchoHTTP11Server').info log_message = logging.getLogger(__name__ + '.EchoHTTP11Server').info
class CaucaseService(ManagedResource):
"""A caucase service.
"""
url: str = None
directory: str = None
_caucased_process: subprocess.Popen = None
def open(self) -> None:
# start a caucased and server certificate.
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
caucased_path = os.path.join(software_release_root_path, 'bin', 'caucased')
self.directory = tempfile.mkdtemp()
caucased_dir = os.path.join(self.directory, 'caucased')
os.mkdir(caucased_dir)
os.mkdir(os.path.join(caucased_dir, 'user'))
os.mkdir(os.path.join(caucased_dir, 'service'))
backend_caucased_netloc = f'{self._cls._ipv4_address}:{findFreeTCPPort(self._cls._ipv4_address)}'
self.url = 'http://' + backend_caucased_netloc
self._caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(caucased_dir, 'server.key.pem'),
'--netloc', backend_caucased_netloc,
'--service-auto-approve-count', '1',
],
# capture subprocess output not to pollute test's own stdout
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(30):
try:
if requests.get(self.url).status_code == 200:
break
except Exception:
pass
time.sleep(1)
else:
raise RuntimeError('caucased failed to start.')
def close(self) -> None:
self._caucased_process.terminate()
self._caucased_process.wait()
self._caucased_process.stdout.close()
shutil.rmtree(self.directory)
class BalancerTestCase(ERP5InstanceTestCase): class BalancerTestCase(ERP5InstanceTestCase):
# We explicitly specify 'balancer' as our software type here, # We explicitly specify 'balancer' as our software type here,
# therefore we don't request ZODB. We therefore don't # therefore we don't request ZODB. We therefore don't
...@@ -628,110 +566,6 @@ class TestContentEncoding(BalancerTestCase): ...@@ -628,110 +566,6 @@ class TestContentEncoding(BalancerTestCase):
self.assertEqual(resp.text, 'OK') self.assertEqual(resp.text, 'OK')
class CaucaseCertificate(ManagedResource):
"""A certificate signed by a caucase service.
"""
ca_crt_file: str = None
crl_file: str = None
csr_file: str = None
cert_file: str = None
key_file: str = None
def open(self) -> None:
self.tmpdir = tempfile.mkdtemp()
self.ca_crt_file = os.path.join(self.tmpdir, 'ca-crt.pem')
self.crl_file = os.path.join(self.tmpdir, 'ca-crl.pem')
self.csr_file = os.path.join(self.tmpdir, 'csr.pem')
self.cert_file = os.path.join(self.tmpdir, 'crt.pem')
self.key_file = os.path.join(self.tmpdir, 'key.pem')
def close(self) -> None:
shutil.rmtree(self.tmpdir)
@property
def _caucase_path(self) -> str:
"""path of caucase executable.
"""
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
return os.path.join(software_release_root_path, 'bin', 'caucase')
def request(self, common_name: str, caucase: CaucaseService) -> None:
"""Generate certificate and request signature to the caucase service.
This overwrite any previously requested certificate for this instance.
"""
cas_args = [
self._caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
]
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
with open(self.key_file, 'wb') as f:
f.write(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(
NameOID.COMMON_NAME,
common_name,
),
])).sign(
key,
hashes.SHA256(),
default_backend(),
)
with open(self.csr_file, 'wb') as f:
f.write(csr.public_bytes(serialization.Encoding.PEM))
csr_id = subprocess.check_output(
cas_args + [
'--send-csr', self.csr_file,
],
).split()[0].decode()
assert csr_id
for _ in range(30):
if not subprocess.call(
cas_args + [
'--get-crt', csr_id, self.cert_file,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) == 0:
break
else:
time.sleep(1)
else:
raise RuntimeError('getting service certificate failed.')
with open(self.cert_file) as cert_file:
assert 'BEGIN CERTIFICATE' in cert_file.read()
def revoke(self, caucase: CaucaseService) -> None:
"""Revoke the client certificate on this caucase instance.
"""
subprocess.check_call([
self._caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
'--revoke-crt', self.cert_file, self.key_file,
])
class TestFrontendXForwardedFor(BalancerTestCase): class TestFrontendXForwardedFor(BalancerTestCase):
__partition_reference__ = 'xff' __partition_reference__ = 'xff'
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment