Commit bbd2895b authored by Jérome Perrin's avatar Jérome Perrin

erp5 with dependent services

parent cea14a17
from . import ERP5InstanceTestCase from . import ERP5InstanceTestCase
from . import setUpModule from . import setUpModule
from slapos.testing.utils import findFreeTCPPort from slapos.testing.utils import findFreeTCPPort
from slapos.testing.testcase import ManagedService
from slapos.testing.utils import ManagedHTTPServer
from slapos.testing.utils import CrontabMixin
from BaseHTTPServer import HTTPServer
from BaseHTTPServer import BaseHTTPRequestHandler from BaseHTTPServer import BaseHTTPRequestHandler
import OpenSSL.SSL import OpenSSL.SSL
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
...@@ -14,50 +16,189 @@ import hashlib ...@@ -14,50 +16,189 @@ import hashlib
import json import json
import multiprocessing import multiprocessing
import os import os
import re
import requests import requests
import shutil import shutil
import subprocess import subprocess
import tempfile import tempfile
import time import time
import logging
import urlparse
from typing import Dict
setUpModule # pyflakes setUpModule # pyflakes
class TestHandler(BaseHTTPRequestHandler):
def setUpModule():
print("no setupmodule")
# TODO:
# balancer cookie set on POST (only) XXX ? really here
# balancer cookie used used to stick to backend
# gzip responses
class EchoHTTPServer(ManagedHTTPServer):
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self): def do_GET(self):
# type: () -> None
self.send_response(200) self.send_response(200)
self.send_header("Content-Type", "application/json") self.send_header("Content-Type", "application/json")
response = { response = {'Path': self.path, 'Incoming Headers': self.headers.dict}
'Path': self.path,
'Incoming Headers': self.headers.dict
}
response = json.dumps(response, indent=2) response = json.dumps(response, indent=2)
self.end_headers() self.end_headers()
self.wfile.write(response) self.wfile.write(response)
class TestFrontendXForwardedFor(ERP5InstanceTestCase): log_message = logging.getLogger(__name__ + '.HeaderEchoHandler').info
__partition_reference__ = 'xff'
http_server_process = None
frontend_caucase_dir = None class CaucaseService(ManagedService):
frontend_caucased_process = None url = None # type: str
backend_caucase_dir = None caucase_dir = None # type: str
backend_caucased_process = None caucase_process = None # type: subprocess.Popen
def start(self):
# type: () -> None
# start a caucased and server certificate.
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
caucased_path = os.path.join(software_release_root_path, 'bin', 'caucased')
self.caucase_dir = tempfile.mkdtemp()
caucased_dir = os.path.join(self.caucase_dir, 'caucased')
os.mkdir(caucased_dir)
backend_caucased_netloc = '%s:%s' % (self._cls._ipv4_address, findFreeTCPPort(self._cls._ipv4_address))
self.url = 'http://' + backend_caucased_netloc
self.caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(caucased_dir, 'server.key.pem'),
'--netloc', backend_caucased_netloc,
'--service-auto-approve-count', '1',
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(10):
try:
if requests.get(self.url).status_code == 200:
break
except Exception:
pass
time.sleep(1)
else:
raise RuntimeError('caucased failed to start.')
def stop(self):
# type: () -> None
self.caucased_process.terminate()
self.caucased_process.wait()
shutil.rmtree(self.caucase_dir)
class BalancerTestCase(ERP5InstanceTestCase):
instance_max_retry = 2 # XXX debug
@classmethod @classmethod
def getInstanceSoftwareType(cls): def getInstanceSoftwareType(cls):
return 'balancer' return 'balancer'
@classmethod @classmethod
def setUpClass(cls): def _getInstanceParameterDict(cls):
# start a dummy web server echoing headers. # type: () -> Dict
http_server_port = findFreeTCPPort(cls._ipv4_address) return {
server = HTTPServer( 'tcpv4-port': 8000,
(cls._ipv4_address, http_server_port), 'computer-memory-percent-threshold': 100,
TestHandler) # XXX what is this ? should probably not be needed here
cls.http_server_process = multiprocessing.Process( 'name': cls.__name__,
target=server.serve_forever, name='HTTPServer') 'monitor-passwd': 'secret',
cls.http_server_process.start() 'apachedex-configuration': '--erp5-base +erp5 .*/VirtualHostRoot/erp5(/|\\?|$) --base +other / --skip-user-agent Zabbix --error-detail --js-embed --quiet',
cls.http_server_netloc = '%s:%s' % (cls._ipv4_address, http_server_port) 'apachedex-promise-threshold': 100,
'haproxy-server-check-path': '/',
'zope-family-dict': {
'default': ['dummy_http_server'], # TODO: dummy_http_server is bad name
},
'dummy_http_server': [[cls.getManagedService("backend_web_server", EchoHTTPServer).netloc, 1, False]],
'backend-path-dict': {
'default': '/',
},
'ssl-authentication-dict': {},
'ssl': {
'caucase-url': cls.getManagedService("caucase", CaucaseService).url,
}
}
@classmethod
def getInstanceParameterDict(cls):
# type: () -> Dict
return {'_': json.dumps(cls._getInstanceParameterDict())}
class SlowHTTPServer(ManagedHTTPServer):
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
self.send_response(200)
self.send_header("Content-Type", "text/plain")
time.sleep(3)
self.end_headers()
self.wfile.write("OK\n")
log_message = logging.getLogger(__name__ + '.SlowHandler').info
class TestAccessLog(BalancerTestCase, CrontabMixin):
"""Check access logs emitted by balancer
"""
__partition_reference__ = 'l'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(TestAccessLog, cls)._getInstanceParameterDict()
# use a slow server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedService("slow_web_server", SlowHTTPServer).netloc, 1, False]]
return parameter_dict
def test_access_log(self):
# type: () -> None
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default']
requests.get(
urlparse.urljoin(balancer_url, '/slow'),
verify=False,
)
with open(os.path.join(self.computer_partition_root_path, 'var', 'log', 'apache-access.log')) as access_log_file:
access_line = access_log_file.read()
self.assertIn('/slow', access_line)
# last \d is the request time in micro seconds, since this SlowHTTPServer
# sleeps for 3 seconds, it should take between 3 and 4 seconds to process
# the request
match = re.match(
r'([(\d\.)]+) - - \[(.*?)\] "(.*?)" (\d+) (\d+) "(.*?)" "(.*?)" (\d+)',
access_line
)
self.assertTrue(match)
request_time = int(match.groups()[-1])
self.assertGreater(request_time, 3000000)
self.assertLess(request_time, 4000000)
import pdb; pdb.set_trace()
class TestFrontendXForwardedFor(BalancerTestCase):
__partition_reference__ = 'xff'
frontend_caucase_dir = None
frontend_caucased_process = None
# TODO: ManagedService
@classmethod
def setUpClass(cls):
# type: () -> None
# start a caucased and generate a valid client certificate. # start a caucased and generate a valid client certificate.
cls.computer_partition_root_path = os.path.abspath(os.curdir) cls.computer_partition_root_path = os.path.abspath(os.curdir)
cls.frontend_caucase_dir = tempfile.mkdtemp() cls.frontend_caucase_dir = tempfile.mkdtemp()
...@@ -197,83 +338,33 @@ class TestFrontendXForwardedFor(ERP5InstanceTestCase): ...@@ -197,83 +338,33 @@ class TestFrontendXForwardedFor(ERP5InstanceTestCase):
else: else:
raise RuntimeError('getting service certificate failed.') raise RuntimeError('getting service certificate failed.')
# start a caucased and server certificate.
cls.backend_caucase_dir = tempfile.mkdtemp()
backend_caucased_dir = os.path.join(cls.backend_caucase_dir, 'caucased')
os.mkdir(backend_caucased_dir)
backend_caucased_netloc = '%s:%s' % (cls._ipv4_address, findFreeTCPPort(cls._ipv4_address))
cls.backend_caucased_url = 'http://' + backend_caucased_netloc
cls.backend_caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(backend_caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(backend_caucased_dir, 'server.key.pem'),
'--netloc', backend_caucased_netloc,
'--service-auto-approve-count', '1',
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(10):
try:
if requests.get(cls.backend_caucased_url).status_code == 200:
break
except Exception:
pass
time.sleep(1)
else:
raise RuntimeError('caucased failed to start.')
super(TestFrontendXForwardedFor, cls).setUpClass() super(TestFrontendXForwardedFor, cls).setUpClass()
@classmethod @classmethod
def getInstanceParameterDict(cls): def _getInstanceParameterDict(cls):
return { # type: () -> Dict
'_': json.dumps({ parameter_dict = super(TestFrontendXForwardedFor, cls)._getInstanceParameterDict()
'tcpv4-port': 3306, # add another "-auth" backend, that will have ssl-authentication enabled
'computer-memory-percent-threshold': 100, parameter_dict['zope-family-dict']['default-auth'] = ['dummy_http_server']
# XXX what is this ? should probably not be needed here parameter_dict['backend-path-dict']['default-auth'] = '/'
'name': cls.__name__, parameter_dict['ssl-authentication-dict'] = {
'monitor-passwd': 'secret',
'apachedex-configuration': '',
'apachedex-promise-threshold': 100,
'haproxy-server-check-path': '/',
'zope-family-dict': {
'default': ['dummy_http_server'],
'default-auth': ['dummy_http_server'],
},
'dummy_http_server': [[cls.http_server_netloc, 1, False]],
'backend-path-dict': {
'default': '/',
'default-auth': '/',
},
'ssl-authentication-dict': {
'default': False, 'default': False,
'default-auth': True, 'default-auth': True,
},
'ssl': {
'caucase-url': cls.backend_caucased_url,
'frontend-caucase-url-list': [cls.frontend_caucased_url],
},
})
} }
parameter_dict['ssl']['frontend-caucase-url-list'] = [cls.frontend_caucased_url]
return parameter_dict
@classmethod @classmethod
def _cleanup(cls, snapshot_name): def _cleanup(cls, snapshot_name):
if cls.http_server_process:
cls.http_server_process.terminate()
if cls.frontend_caucased_process: if cls.frontend_caucased_process:
cls.frontend_caucased_process.terminate() cls.frontend_caucased_process.terminate()
cls.frontend_caucased_process.wait()
if cls.frontend_caucase_dir: if cls.frontend_caucase_dir:
shutil.rmtree(cls.frontend_caucase_dir) shutil.rmtree(cls.frontend_caucase_dir)
if cls.backend_caucased_process:
cls.backend_caucased_process.terminate()
if cls.backend_caucase_dir:
shutil.rmtree(cls.backend_caucase_dir)
super(TestFrontendXForwardedFor, cls)._cleanup(snapshot_name) super(TestFrontendXForwardedFor, cls)._cleanup(snapshot_name)
def test_x_forwarded_for_added_when_verified_connection(self): def test_x_forwarded_for_added_when_verified_connection(self):
# type: () -> None
for backend in ('default', 'default-auth'): for backend in ('default', 'default-auth'):
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])[backend] balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])[backend]
result = requests.get( result = requests.get(
...@@ -285,6 +376,7 @@ class TestFrontendXForwardedFor(ERP5InstanceTestCase): ...@@ -285,6 +376,7 @@ class TestFrontendXForwardedFor(ERP5InstanceTestCase):
self.assertEqual(result['Incoming Headers'].get('x-forwarded-for').split(', ')[0], '1.2.3.4') self.assertEqual(result['Incoming Headers'].get('x-forwarded-for').split(', ')[0], '1.2.3.4')
def test_x_forwarded_for_stripped_when_not_verified_connection(self): def test_x_forwarded_for_stripped_when_not_verified_connection(self):
# type: () -> None
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default'] balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default']
result = requests.get( result = requests.get(
balancer_url, balancer_url,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment