Commit d2d2a1b6 authored by Łukasz Nowak's avatar Łukasz Nowak

software/slapos-master: Test SSL authentication

This is critical feature of SlapOS Master instantiation, and very easy to
damage, thus testing it is important.
parent 9a393f73
...@@ -31,6 +31,10 @@ import glob ...@@ -31,6 +31,10 @@ import glob
import urlparse import urlparse
import socket import socket
import time import time
import re
import BaseHTTPServer
import multiprocessing
import subprocess
import psutil import psutil
import requests import requests
...@@ -315,3 +319,166 @@ class TestZopeNodeParameterOverride( ...@@ -315,3 +319,166 @@ class TestZopeNodeParameterOverride(
}, { }, {
"cache-size": None, "cache-size": None,
}) })
def popenCommunicate(command_list, input_=None, **kwargs):
kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
popen = subprocess.Popen(command_list, **kwargs)
result = popen.communicate(input_)[0]
if popen.returncode is None:
popen.kill()
if popen.returncode != 0:
raise ValueError(
'Issue during calling %r, result was:\n%s' % (command_list, result))
return result
class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
response = {
'Path': self.path,
'Incoming Headers': self.headers.dict
}
response = json.dumps(response, indent=2)
self.end_headers()
self.wfile.write(response)
class TestDeploymentScriptInstantiation(ERP5InstanceTestCase):
"""This check deployment script like instantiation
Low level assertions are done here in roder to assure that
https://lab.nexedi.com/nexedi/slapos.package/blob/master/playbook/
slapos-master-standalone.yml
works correctly
"""
__partition_reference__ = 'tdsi'
# a bit more partition is required
partition_count = 20
@classmethod
def getInstanceParameterDict(cls):
# As close as possible configuration to deployment script
parameter_dict = {
"timezone": "UTC",
"site-id": "erp5",
"bt5": "erp5_full_text_myisam_catalog slapos_configurator",
"wsgi": False,
"test-runner": {"enabled": False}, # won't work anyway here
"zope-partition-dict": {
"admin": {
"family": "admin",
"thread-amount": 4,
"port-base": 2220,
"instance-count": 1
},
"activities-node": {
"family": "activities",
"thread-amount": 4,
"instance-count": 1,
"timerserver-interval": 1,
"port-base": 2230
},
"distribution-node": {
"family": "distribution",
"thread-amount": 1,
"instance-count": 1,
"port-base": 2210,
"timerserver-interval": 1
},
"web-node": {
"family": "web",
"thread-amount": 2,
"instance-count": 1,
"port-base": 2240
},
"service-slapos": {
"family": "service",
"thread-amount": 2,
"instance-count": 1,
"port-base": 2250,
"ssl-authentication": True,
"backend-path": "/%(site-id)s/portal_slap"
}
}
}
# put shared-certificate-authority-path in controlled location
cls.ca_path = os.path.join(cls.slap.instance_directory, 'ca_path')
parameter_dict["shared-certificate-authority-path"] = cls.ca_path
return {'_': json.dumps(parameter_dict)}
@classmethod
def callSupervisorMethod(cls, method, *args, **kwargs):
with cls.slap.instance_supervisor_rpc as instance_supervisor:
return getattr(instance_supervisor, method)(*args, **kwargs)
def test_ssl_auth(self):
backend_apache_configuration_list = glob.glob(
os.path.join(
self.slap.instance_directory, '*', 'etc', 'apache', 'apache.conf'))
self.assertEqual(
1,
len(backend_apache_configuration_list)
)
backend_apache_configuration = open(
backend_apache_configuration_list[0]).read()
self.assertIn(
'SSLVerifyClient require',
backend_apache_configuration
)
self.assertIn(
r'RequestHeader set Remote-User %{SSL_CLIENT_S_DN_CN}s',
backend_apache_configuration
)
# stop haproxy, it's going to be hijacked
haproxy_name = ':'.join([
(q['group'], q['name'])
for q in self.callSupervisorMethod('getAllProcessInfo')
if 'haproxy' in q['name']][0])
self.callSupervisorMethod('stopProcess', haproxy_name)
# do similar certificate request like CertificateAuthorityTool
openssl_config = os.path.join(self.ca_path, 'openssl.cnf')
key = os.path.join(self.ca_path, 'private', 'test.key')
csr = os.path.join(self.ca_path, 'text.csr')
cert = os.path.join(self.ca_path, 'certs', 'test.crt')
common_name = 'TEST-SSL-AUTH'
popenCommunicate([
'openssl', 'req', '-utf8', '-nodes', '-config', openssl_config, '-new',
'-keyout', key, '-out', csr, '-days', '3650'], '%s\n' % (common_name,),
stdin=subprocess.PIPE)
popenCommunicate([
'openssl', 'ca', '-utf8', '-days', '3650', '-batch', '-config',
openssl_config, '-out', cert, '-infiles', csr])
# find IP and port on which hijacked process shall listen
portal_slap_line = [
q for q in backend_apache_configuration.splitlines()
if 'portal_slap' in q][0]
ip, port = re.search(
r'.*http:\/\/(.*):(\d*)\/.*', portal_slap_line).groups()
port = int(port)
server = BaseHTTPServer.HTTPServer((ip, port), TestHandler)
server_process = multiprocessing.Process(
target=server.serve_forever, name='HTTPServer')
server_process.start()
try:
# assert that accessing the service endpoint results with certificate
# authentication and proper information extraction
result_json = requests.get(
self.getRootPartitionConnectionParameterDict()['family-service'],
verify=False, cert=(cert, key)).json()
self.assertEqual(
common_name,
result_json['Incoming Headers']['remote-user']
)
self.assertEqual(
'/erp5/portal_slap/',
result_json['Path']
)
finally:
server_process.join(10)
server_process.terminate()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment