Commit cc1407d2 authored by Rafael Monnerat's avatar Rafael Monnerat

slapos-master: Port latest tests for slapos into slapos-master

  This is a partial port, just to have more tests that
  weren't afected by the patches for keep apache as entry-point
  of the backend.
parent c7dc30a6
Pipeline #16728 failed with stage
...@@ -18,7 +18,7 @@ md5sum = 84f099cc9852c4f53a075dccbb3880f0 ...@@ -18,7 +18,7 @@ md5sum = 84f099cc9852c4f53a075dccbb3880f0
[template-balancer] [template-balancer]
filename = instance-balancer.cfg.in filename = instance-balancer.cfg.in
md5sum = 67022177ad0f08511af426888cf2738e md5sum = c7c0bb9abbd0f8cc6c7956d83a61c4b3
[template-apache-backend-conf] [template-apache-backend-conf]
filename = apache-backend.conf.in filename = apache-backend.conf.in
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
{% set part_list = [] -%} {% set part_list = [] -%}
{% macro section(name) %}{% do part_list.append(name) %}{{ name }}{% endmacro -%} {% macro section(name) %}{% do part_list.append(name) %}{{ name }}{% endmacro -%}
{% set ssl_parameter_dict = slapparameter_dict['ssl'] -%} {% set ssl_parameter_dict = slapparameter_dict['ssl'] -%}
{% set frontend_caucase_url_list = ssl_parameter_dict.get('frontend-caucase-url-list', []) -%}
{% set shared_ca_path = slapparameter_dict.get('shared-certificate-authority-path') -%} {% set shared_ca_path = slapparameter_dict.get('shared-certificate-authority-path') -%}
{# {#
XXX: This template only supports exactly one IPv4 and (if ipv6 is used) one IPv6 XXX: This template only supports exactly one IPv4 and (if ipv6 is used) one IPv6
...@@ -38,7 +39,7 @@ mode = 644 ...@@ -38,7 +39,7 @@ mode = 644
{% set haproxy_dict = {} -%} {% set haproxy_dict = {} -%}
{% set apache_dict = {} -%} {% set apache_dict = {} -%}
{% set zope_virtualhost_monster_backend_dict = {} %} {% set zope_virtualhost_monster_backend_dict = {} %}
{% set test_runner_url_dict = {} %} {# family_name => list of apache URLs #} {% set test_runner_url_dict = {} %} {# family_name => list of URLs #}
{% set next_port = itertools.count(slapparameter_dict['tcpv4-port']).next -%} {% set next_port = itertools.count(slapparameter_dict['tcpv4-port']).next -%}
{% for family_name, parameter_id_list in sorted( {% for family_name, parameter_id_list in sorted(
slapparameter_dict['zope-family-dict'].iteritems()) -%} slapparameter_dict['zope-family-dict'].iteritems()) -%}
...@@ -59,19 +60,19 @@ mode = 644 ...@@ -59,19 +60,19 @@ mode = 644
{% set test_runner_address_list = slapparameter_dict.get(parameter_id ~ '-test-runner-address-list', []) %} {% set test_runner_address_list = slapparameter_dict.get(parameter_id ~ '-test-runner-address-list', []) %}
{% if test_runner_address_list -%} {% if test_runner_address_list -%}
{% set test_runner_backend_mapping = {} %} {% set test_runner_backend_mapping = {} %}
{% set test_runner_apache_url_list = [] %} {% set test_runner_balancer_url_list = [] %}
{% set test_runner_external_port = next_port() %} {% set test_runner_external_port = next_port() %}
{% for i, (test_runner_internal_ip, test_runner_internal_port) in enumerate(test_runner_address_list) %} {% for i, (test_runner_internal_ip, test_runner_internal_port) in enumerate(test_runner_address_list) %}
{% do test_runner_backend_mapping.__setitem__( {% do test_runner_backend_mapping.__setitem__(
'unit_test_' ~ i, 'unit_test_' ~ i,
'http://' ~ test_runner_internal_ip ~ ':' ~ test_runner_internal_port ) %} 'http://' ~ test_runner_internal_ip ~ ':' ~ test_runner_internal_port ) %}
{% do test_runner_apache_url_list.append( {% do test_runner_balancer_url_list.append(
'https://' ~ ipv4 ~ ':' ~ test_runner_external_port ~ '/unit_test_' ~ i ~ '/' ) %} 'https://' ~ ipv4 ~ ':' ~ test_runner_external_port ~ '/unit_test_' ~ i ~ '/' ) %}
{% endfor %} {% endfor %}
{% do zope_virtualhost_monster_backend_dict.__setitem__( {% do zope_virtualhost_monster_backend_dict.__setitem__(
(ipv4, test_runner_external_port), (ipv4, test_runner_external_port),
( ssl_authentication, test_runner_backend_mapping ) ) -%} ( ssl_authentication, test_runner_backend_mapping ) ) -%}
{% do test_runner_url_dict.__setitem__(family_name, test_runner_apache_url_list) -%} {% do test_runner_url_dict.__setitem__(family_name, test_runner_balancer_url_list) -%}
{% endif -%} {% endif -%}
{% endfor -%} {% endfor -%}
...@@ -123,6 +124,23 @@ caucase-key = ${directory:apache-conf}/apache-caucase.pem ...@@ -123,6 +124,23 @@ caucase-key = ${directory:apache-conf}/apache-caucase.pem
ca-cert = ${directory:apache-conf}/ca.crt ca-cert = ${directory:apache-conf}/ca.crt
crl = ${directory:apache-conf}/crl.pem crl = ${directory:apache-conf}/crl.pem
[simplefile]
< = jinja2-template-base
template = inline:{{ '{{ content }}' }}
{% macro simplefile(section_name, file_path, content, mode='') -%}
{% set content_section_name = section_name ~ '-content' -%}
[{{ content_section_name }}]
content = {{ dumps(content) }}
[{{ section(section_name) }}]
< = simplefile
rendered = {{ file_path }}
context = key content {{content_section_name}}:content
mode = {{ mode }}
{%- endmacro %}
[apache-ssl] [apache-ssl]
{% if ssl_parameter_dict.get('key') -%} {% if ssl_parameter_dict.get('key') -%}
key = ${apache-ssl-key:rendered} key = ${apache-ssl-key:rendered}
...@@ -273,8 +291,6 @@ template = inline: ...@@ -273,8 +291,6 @@ template = inline:
{% endfor %} {% endfor %}
[apachedex-parameters] [apachedex-parameters]
# XXX - Sample log file with curent date: apache_access.log-%(date)s.gz
# which will be equivalent to apache_access.log-20150112.gz if the date is 2015-01-12
apache-log-list = ${apache-conf-parameter-dict:access-log} apache-log-list = ${apache-conf-parameter-dict:access-log}
configuration = ${monitor-apachedex-report-config:rendered} configuration = ${monitor-apachedex-report-config:rendered}
promise-threshold = {{ slapparameter_dict['apachedex-promise-threshold'] }} promise-threshold = {{ slapparameter_dict['apachedex-promise-threshold'] }}
......
...@@ -30,11 +30,21 @@ import os ...@@ -30,11 +30,21 @@ import os
from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass
setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass(
_setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass(
os.path.abspath( os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', '..', 'software.cfg'))) os.path.join(os.path.dirname(__file__), '..', '..', 'software.cfg')))
setup_module_executed = False
def setUpModule():
# slapos.testing.testcase's only need to be executed once
global setup_module_executed
if not setup_module_executed:
_setUpModule()
setup_module_executed = True
class ERP5InstanceTestCase(SlapOSInstanceTestCase): class ERP5InstanceTestCase(SlapOSInstanceTestCase):
"""ERP5 base test case """ERP5 base test case
""" """
......
import glob
import hashlib
import json
import logging
import os
import re
import shutil
import subprocess
import tempfile
import time
import urllib
import urlparse
from BaseHTTPServer import BaseHTTPRequestHandler
from typing import Dict
import mock
import OpenSSL.SSL
import pexpect
import psutil
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from slapos.testing.testcase import ManagedResource
from slapos.testing.utils import (CrontabMixin, ManagedHTTPServer,
findFreeTCPPort)
from . import ERP5InstanceTestCase, setUpModule
setUpModule # pyflakes
class EchoHTTPServer(ManagedHTTPServer):
"""An HTTP Server responding with the request path and incoming headers,
encoded in json.
"""
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = json.dumps(
{
'Path': self.path,
'Incoming Headers': self.headers.dict
},
indent=2,
)
self.end_headers()
self.wfile.write(response)
log_message = logging.getLogger(__name__ + '.EchoHTTPServer').info
class EchoHTTP11Server(ManagedHTTPServer):
"""An HTTP/1.1 Server responding with the request path and incoming headers,
encoded in json.
"""
class RequestHandler(BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
def do_GET(self):
# type: () -> None
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = json.dumps(
{
'Path': self.path,
'Incoming Headers': self.headers.dict
},
indent=2,
)
self.send_header("Content-Length", len(response))
self.end_headers()
self.wfile.write(response)
log_message = logging.getLogger(__name__ + '.EchoHTTP11Server').info
class CaucaseService(ManagedResource):
"""A caucase service.
"""
url = None # type: str
directory = None # type: str
_caucased_process = None # type: subprocess.Popen
def open(self):
# type: () -> None
# start a caucased and server certificate.
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
caucased_path = os.path.join(software_release_root_path, 'bin', 'caucased')
self.directory = tempfile.mkdtemp()
caucased_dir = os.path.join(self.directory, 'caucased')
os.mkdir(caucased_dir)
os.mkdir(os.path.join(caucased_dir, 'user'))
os.mkdir(os.path.join(caucased_dir, 'service'))
backend_caucased_netloc = '%s:%s' % (self._cls._ipv4_address, findFreeTCPPort(self._cls._ipv4_address))
self.url = 'http://' + backend_caucased_netloc
self._caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(caucased_dir, 'server.key.pem'),
'--netloc', backend_caucased_netloc,
'--service-auto-approve-count', '1',
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(30):
try:
if requests.get(self.url).status_code == 200:
break
except Exception:
pass
time.sleep(1)
else:
raise RuntimeError('caucased failed to start.')
def close(self):
# type: () -> None
self._caucased_process.terminate()
self._caucased_process.wait()
shutil.rmtree(self.directory)
class BalancerTestCase(ERP5InstanceTestCase):
@classmethod
def getInstanceSoftwareType(cls):
return 'balancer'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
return {
'shared-certificate-authority-path': os.path.join(
'~', 'srv', 'ssl'),
'tcpv4-port': 8000,
'computer-memory-percent-threshold': 100,
# XXX what is this ? should probably not be needed here
'name': cls.__name__,
'monitor-passwd': 'secret',
'apachedex-configuration': [
'--logformat', '%h %l %u %t "%r" %>s %O "%{Referer}i" "%{User-Agent}i" %{ms}T',
'--erp5-base', '+erp5', '.*/VirtualHostRoot/erp5(/|\\?|$)',
'--base', '+other', '/',
'--skip-user-agent', 'Zabbix',
'--error-detail',
'--js-embed',
'--quiet',
],
'apachedex-promise-threshold': 100,
'haproxy-server-check-path': '/',
'zope-family-dict': {
'default': ['dummy_http_server'],
},
'dummy_http_server': [[cls.getManagedResource("backend_web_server", EchoHTTPServer).netloc, 1, False]],
'backend-path-dict': {
'default': '',
},
'ssl-authentication-dict': {},
'ssl': {
'caucase-url': cls.getManagedResource("caucase", CaucaseService).url,
},
'family-path-routing-dict': {},
'path-routing-list': [],
}
@classmethod
def getInstanceParameterDict(cls):
# type: () -> Dict
return {'_': json.dumps(cls._getInstanceParameterDict())}
def setUp(self):
self.default_balancer_url = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])['default']
class SlowHTTPServer(ManagedHTTPServer):
"""An HTTP Server which reply after 2 seconds.
"""
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
self.send_response(200)
self.send_header("Content-Type", "text/plain")
time.sleep(2)
self.end_headers()
self.wfile.write("OK\n")
log_message = logging.getLogger(__name__ + '.SlowHandler').info
class TestLog(BalancerTestCase, CrontabMixin):
"""Check logs emitted by balancer
"""
__partition_reference__ = 'l'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(TestLog, cls)._getInstanceParameterDict()
# use a slow server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("slow_web_server", SlowHTTPServer).netloc, 1, False]]
return parameter_dict
def test_access_log_format(self):
# type: () -> None
requests.get(
urlparse.urljoin(self.default_balancer_url, '/url_path'),
verify=False,
)
time.sleep(.5) # wait a bit more until access is logged
with open(os.path.join(self.computer_partition_root_path, 'var', 'log', 'apache-access.log')) as access_log_file:
access_line = access_log_file.read().splitlines()[-1]
self.assertIn('/url_path', access_line)
# last \d is the request time in milli seconds, since this SlowHTTPServer
# sleeps for 2 seconds, it should take between 2 and 3 seconds to process
# the request - but our test machines can be slow sometimes, so we tolerate
# it can take up to 20 seconds.
match = re.match(
r'([(\d\.)]+) - - \[(.*?)\] "(.*?)" (\d+) (\d+) "(.*?)" "(.*?)" (\d+)',
access_line
)
self.assertTrue(match)
assert match
request_time = int(match.groups()[-1])
# XXX For slapos master, timing is in microsecond (not milisecond)
self.assertGreater(request_time, 2 * 1000000)
self.assertLess(request_time, 20 * 1000000)
def test_access_log_apachedex_report(self):
# type: () -> None
# make a request so that we have something in the logs
requests.get(self.default_balancer_url, verify=False)
# crontab for apachedex is executed
self._executeCrontabAtDate('generate-apachedex-report', '23:59')
# it creates a report for the day
apachedex_report, = glob.glob(
os.path.join(
self.computer_partition_root_path,
'srv',
'monitor',
'private',
'apachedex',
'ApacheDex-*.html',
))
with open(apachedex_report, 'r') as f:
report_text = f.read()
self.assertIn('APacheDEX', report_text)
# having this table means that apachedex could parse some lines.
self.assertIn('<h2>Hits per status code</h2>', report_text)
def test_access_log_rotation(self):
# type: () -> None
# run logrotate a first time so that it create state files
self._executeCrontabAtDate('logrotate', '2000-01-01')
# make a request so that we have something in the logs
requests.get(self.default_balancer_url, verify=False).raise_for_status()
# slow query crontab depends on crontab for log rotation
# to be executed first.
self._executeCrontabAtDate('logrotate', '2050-01-01')
# this logrotate leaves the log for the day as non compressed
rotated_log_file = os.path.join(
self.computer_partition_root_path,
'srv',
'backup',
'logrotate',
'apache-access.log-20500101',
)
self.assertTrue(os.path.exists(rotated_log_file))
requests.get(self.default_balancer_url, verify=False).raise_for_status()
# on next day execution of logrotate, log files are compressed
self._executeCrontabAtDate('logrotate', '2050-01-02')
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
def test_error_log(self):
# stop backend server
backend_server = self.getManagedResource("slow_web_server", SlowHTTPServer)
self.addCleanup(backend_server.open)
backend_server.close()
# after a while, balancer should detect and log this event in error log
time.sleep(5)
self.assertEqual(
requests.get(self.default_balancer_url, verify=False).status_code,
requests.codes.service_unavailable)
with open(os.path.join(self.computer_partition_root_path, 'var', 'log', 'apache-error.log')) as error_log_file:
error_line = error_log_file.read().splitlines()[-1]
self.assertIn('apache.conf -D FOREGROUND', error_line)
# this log also include a timestamp
# This regex is for haproxy mostly, so keep it commented for now, until we can
# Merge the slapos-master setup and erp5.
# self.assertRegexpMatches(error_line, r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}')
class BalancerCookieHTTPServer(ManagedHTTPServer):
"""An HTTP Server which can set balancer cookie.
This server set cookie when requested /set-cookie path.
The reply body is the name used when registering this resource
using getManagedResource. This way we can assert which
backend replied.
"""
@property
def RequestHandler(self):
server = self
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
self.send_response(200)
self.send_header("Content-Type", "text/plain")
if self.path == '/set_cookie':
# the balancer tells the backend what's the name of the balancer cookie with
# the X-Balancer-Current-Cookie header.
self.send_header('Set-Cookie', '%s=anything' % self.headers['X-Balancer-Current-Cookie'])
# The name of this cookie is SERVERID
assert self.headers['X-Balancer-Current-Cookie'] == 'SERVERID'
self.end_headers()
self.wfile.write(server._name)
log_message = logging.getLogger(__name__ + '.BalancerCookieHTTPServer').info
return RequestHandler
class TestBalancer(BalancerTestCase):
"""Check balancing capabilities
"""
__partition_reference__ = 'b'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(TestBalancer, cls)._getInstanceParameterDict()
# use two backend servers
parameter_dict['dummy_http_server'] = [
[cls.getManagedResource("backend_web_server1", BalancerCookieHTTPServer).netloc, 1, False],
[cls.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).netloc, 1, False],
]
return parameter_dict
def test_balancer_round_robin(self):
# requests are by default balanced to both servers
self.assertEqual(
{requests.get(self.default_balancer_url, verify=False).text for _ in range(10)},
{'backend_web_server1', 'backend_web_server2'}
)
def test_balancer_server_down(self):
# if one backend is down, it is excluded from balancer
self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).close()
self.addCleanup(self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).open)
self.assertEqual(
{requests.get(self.default_balancer_url, verify=False).text for _ in range(10)},
{'backend_web_server1',}
)
def test_balancer_set_cookie(self):
# if backend provides a "SERVERID" cookie, balancer will overwrite it with the
# backend selected by balancing algorithm
self.assertIn(
requests.get(urlparse.urljoin(self.default_balancer_url, '/set_cookie'), verify=False).cookies['SERVERID'],
('default-0', 'default-1'),
)
def test_balancer_respects_sticky_cookie(self):
# if request is made with the sticky cookie, the client stick on one balancer
cookies = dict(SERVERID='default-1')
self.assertEqual(
{requests.get(self.default_balancer_url, verify=False, cookies=cookies).text for _ in range(10)},
{'backend_web_server2',}
)
# if that backend becomes down, requests are balanced to another server
self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).close()
self.addCleanup(self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).open)
self.assertEqual(
requests.get(self.default_balancer_url, verify=False, cookies=cookies).text,
'backend_web_server1')
class TestTestRunnerEntryPoints(BalancerTestCase):
"""Check balancer has some entries for test runner.
"""
__partition_reference__ = 't'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(
TestTestRunnerEntryPoints,
cls,
)._getInstanceParameterDict()
parameter_dict['dummy_http_server-test-runner-address-list'] = [
[
cls.getManagedResource("backend_0", EchoHTTPServer).hostname,
cls.getManagedResource("backend_0", EchoHTTPServer).port,
],
[
cls.getManagedResource("backend_1", EchoHTTPServer).hostname,
cls.getManagedResource("backend_1", EchoHTTPServer).port,
],
[
cls.getManagedResource("backend_2", EchoHTTPServer).hostname,
cls.getManagedResource("backend_2", EchoHTTPServer).port,
],
]
return parameter_dict
def test_use_proper_backend(self):
# requests are directed to proper backend based on URL path
test_runner_url_list = self.getRootPartitionConnectionParameterDict(
)['default-test-runner-url-list']
url_0, url_1, url_2 = test_runner_url_list
self.assertEqual(
urlparse.urlparse(url_0).netloc,
urlparse.urlparse(url_1).netloc)
self.assertEqual(
urlparse.urlparse(url_0).netloc,
urlparse.urlparse(url_2).netloc)
path_0 = '/VirtualHostBase/https/{netloc}/VirtualHostRoot/_vh_unit_test_0/something'.format(
netloc=urlparse.urlparse(url_0).netloc)
path_1 = '/VirtualHostBase/https/{netloc}/VirtualHostRoot/_vh_unit_test_1/something'.format(
netloc=urlparse.urlparse(url_0).netloc)
path_2 = '/VirtualHostBase/https/{netloc}/VirtualHostRoot/_vh_unit_test_2/something'.format(
netloc=urlparse.urlparse(url_0).netloc)
self.assertEqual(
{
requests.get(url_0 + 'something', verify=False).json()['Path']
for _ in range(10)
}, {path_0})
self.assertEqual(
{
requests.get(url_1 + 'something', verify=False).json()['Path']
for _ in range(10)
}, {path_1})
self.assertEqual(
{
requests.get(url_2 + 'something', verify=False).json()['Path']
for _ in range(10)
}, {path_2})
# If a test runner backend is down, others can be accessed.
self.getManagedResource("backend_0", EchoHTTPServer).close()
self.assertEqual(
{
requests.get(url_0 + 'something', verify=False).status_code
for _ in range(5)
}, {503})
self.assertEqual(
{
requests.get(url_1 + 'something', verify=False).json()['Path']
for _ in range(10)
}, {path_1})
class TestHTTP(BalancerTestCase):
"""Check HTTP protocol with a HTTP/1.1 backend
"""
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(TestHTTP, cls)._getInstanceParameterDict()
# use a HTTP/1.1 server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("HTTP/1.1 Server", EchoHTTP11Server).netloc, 1, False]]
return parameter_dict
__partition_reference__ = 'h'
def test_http_version(self):
# type: () -> None
self.assertEqual(
subprocess.check_output([
'curl',
'--silent',
'--show-error',
'--output',
'/dev/null',
'--insecure',
'--write-out',
'%{http_version}',
self.default_balancer_url,
]),
'1.1',
)
def test_keep_alive(self):
# type: () -> None
# when doing two requests, connection is established only once
session = requests.Session()
session.verify = False
# do a first request, which establish a first connection
session.get(self.default_balancer_url).raise_for_status()
# "break" new connection method and check we can make another request
with mock.patch(
"requests.packages.urllib3.connectionpool.HTTPSConnectionPool._new_conn",
) as new_conn:
session.get(self.default_balancer_url).raise_for_status()
new_conn.assert_not_called()
parsed_url = urlparse.urlparse(self.default_balancer_url)
# check that we have an open file for the ip connection
self.assertTrue([
c for c in psutil.Process(os.getpid()).connections()
if c.status == 'ESTABLISHED' and c.raddr.ip == parsed_url.hostname
and c.raddr.port == parsed_url.port
])
class ContentTypeHTTPServer(ManagedHTTPServer):
"""An HTTP/1.1 Server which reply with content type from path.
For example when requested http://host/text/plain it will reply
with Content-Type: text/plain header.
The body is always "OK"
"""
class RequestHandler(BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
def do_GET(self):
# type: () -> None
self.send_response(200)
if self.path == '/':
self.send_header("Content-Length", 0)
return self.end_headers()
content_type = self.path[1:]
body = "OK"
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", len(body))
self.end_headers()
self.wfile.write(body)
log_message = logging.getLogger(__name__ + '.ContentTypeHTTPServer').info
class TestContentEncoding(BalancerTestCase):
"""Test how responses are gzip encoded or not depending on content type header.
"""
__partition_reference__ = 'ce'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(TestContentEncoding, cls)._getInstanceParameterDict()
parameter_dict['dummy_http_server'] = [
[cls.getManagedResource("content_type_server", ContentTypeHTTPServer).netloc, 1, False],
]
return parameter_dict
# Disabled test until we can rework on it for apache, or drop
# apache on the backend.
def disabled_test_gzip_encoding(self):
# type: () -> None
for content_type in (
'text/cache-manifest',
'text/html',
'text/plain',
'text/css',
'application/hal+json',
'application/json',
'application/x-javascript',
'text/xml',
'application/xml',
'application/rss+xml',
'text/javascript',
'application/javascript',
'image/svg+xml',
'application/x-font-ttf',
'application/font-woff',
'application/font-woff2',
'application/x-font-opentype',
'application/wasm',):
resp = requests.get(
urlparse.urljoin(self.default_balancer_url, content_type),
verify=False,
headers={"Accept-Encoding": "gzip, deflate",})
self.assertEqual(resp.headers['Content-Type'], content_type)
self.assertEqual(
resp.headers.get('Content-Encoding'),
'gzip',
'%s uses wrong encoding: %s' % (content_type, resp.headers.get('Content-Encoding')))
self.assertEqual(resp.text, 'OK')
def test_no_gzip_encoding(self):
# type: () -> None
resp = requests.get(urlparse.urljoin(self.default_balancer_url, '/image/png'), verify=False)
self.assertNotIn('Content-Encoding', resp.headers)
self.assertEqual(resp.text, 'OK')
class CaucaseCertificate(ManagedResource):
"""A certificate signed by a caucase service.
"""
ca_crt_file = None # type: str
crl_file = None # type: str
csr_file = None # type: str
cert_file = None # type: str
key_file = None # type: str
def open(self):
# type: () -> None
self.tmpdir = tempfile.mkdtemp()
self.ca_crt_file = os.path.join(self.tmpdir, 'ca-crt.pem')
self.crl_file = os.path.join(self.tmpdir, 'ca-crl.pem')
self.csr_file = os.path.join(self.tmpdir, 'csr.pem')
self.cert_file = os.path.join(self.tmpdir, 'crt.pem')
self.key_file = os.path.join(self.tmpdir, 'key.pem')
def close(self):
# type: () -> None
shutil.rmtree(self.tmpdir)
@property
def _caucase_path(self):
# type: () -> str
"""path of caucase executable.
"""
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
return os.path.join(software_release_root_path, 'bin', 'caucase')
def request(self, common_name, caucase):
# type: (str, CaucaseService) -> None
"""Generate certificate and request signature to the caucase service.
This overwrite any previously requested certificate for this instance.
"""
cas_args = [
self._caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
]
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
with open(self.key_file, 'wb') as f:
f.write(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(
NameOID.COMMON_NAME,
common_name,
),
])).sign(
key,
hashes.SHA256(),
default_backend(),
)
with open(self.csr_file, 'wb') as f:
f.write(csr.public_bytes(serialization.Encoding.PEM))
csr_id = subprocess.check_output(
cas_args + [
'--send-csr', self.csr_file,
],
).split()[0]
assert csr_id
for _ in range(30):
if not subprocess.call(
cas_args + [
'--get-crt', csr_id, self.cert_file,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) == 0:
break
else:
time.sleep(1)
else:
raise RuntimeError('getting service certificate failed.')
with open(self.cert_file) as f:
assert 'BEGIN CERTIFICATE' in f.read()
def revoke(self, caucase):
# type: (str, CaucaseService) -> None
"""Revoke the client certificate on this caucase instance.
"""
subprocess.check_call([
self._caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
'--revoke-crt', self.cert_file, self.key_file,
])
class TestServerTLSProvidedCertificate(BalancerTestCase):
"""Check that certificate and key can be provided as instance parameters.
"""
__partition_reference__ = 's'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
server_caucase = cls.getManagedResource('server_caucase', CaucaseService)
server_certificate = cls.getManagedResource('server_certificate', CaucaseCertificate)
server_certificate.request(cls._ipv4_address.decode(), server_caucase)
parameter_dict = super(TestServerTLSProvidedCertificate, cls)._getInstanceParameterDict()
with open(server_certificate.cert_file) as f:
parameter_dict['ssl']['cert'] = f.read()
with open(server_certificate.key_file) as f:
parameter_dict['ssl']['key'] = f.read()
return parameter_dict
def test_certificate_validates_with_provided_ca(self):
# type: () -> None
server_certificate = self.getManagedResource("server_certificate", CaucaseCertificate)
requests.get(self.default_balancer_url, verify=server_certificate.ca_crt_file)
...@@ -41,31 +41,50 @@ import requests ...@@ -41,31 +41,50 @@ import requests
from . import ERP5InstanceTestCase from . import ERP5InstanceTestCase
from . import setUpModule from . import setUpModule
setUpModule # pyflakes setUpModule # pyflakes
class TestPublishedURLIsReachableMixin(object): class TestPublishedURLIsReachableMixin(object):
"""Mixin that checks that default page of ERP5 is reachable. """Mixin that checks that default page of ERP5 is reachable.
""" """
def _checkERP5IsReachable(self, url):
def _checkERP5IsReachable(self, base_url, site_id, verify):
# We access ERP5 trough a "virtual host", which should make
# ERP5 produce URLs using https://virtual-host-name:1234/virtual_host_root
# as base.
virtual_host_url = urlparse.urljoin(
base_url,
'/VirtualHostBase/https/virtual-host-name:1234/{}/VirtualHostRoot/_vh_virtual_host_root/'
.format(site_id))
# What happens is that instantiation just create the services, but does not # What happens is that instantiation just create the services, but does not
# wait for ERP5 to be initialized. When this test run ERP5 instance is # wait for ERP5 to be initialized. When this test run ERP5 instance is
# instantiated, but zope is still busy creating the site and haproxy # instantiated, but zope is still busy creating the site and haproxy replies
# replies with 503 Service Unavailable when zope is not started yet, with # with 503 Service Unavailable when zope is not started yet, with 404 when
# 404 when erp5 site is not created, with 500 when mysql is not yet # erp5 site is not created, with 500 when mysql is not yet reachable, so we
# reachable, so we retry in a loop until we get a succesful response. # configure this requests session to retry.
for i in range(1, 60): # XXX we should probably add a promise instead
# XXX can we get CA from caucase already ? session = requests.Session()
r = requests.get(url, verify=False) session.mount(
if r.status_code != requests.codes.ok: base_url,
delay = i * 2 requests.adapters.HTTPAdapter(
self.logger.warn( max_retries=requests.packages.urllib3.util.retry.Retry(
"ERP5 was not available, sleeping for %ds and retrying", delay) total=60,
time.sleep(delay) backoff_factor=.5,
continue status_forcelist=(404, 500, 503))))
r.raise_for_status()
break r = session.get(virtual_host_url, verify=verify, allow_redirects=False)
self.assertEqual(r.status_code, requests.codes.found)
# access on / are redirected to login form, with virtual host preserved
self.assertEqual(r.headers.get('location'), 'https://virtual-host-name:1234/virtual_host_root/login_form')
# login page can be rendered and contain the text "ERP5"
r = session.get(
urlparse.urljoin(base_url, '{}/login_form'.format(site_id)),
verify=verify,
allow_redirects=False,
)
self.assertEqual(r.status_code, requests.codes.ok)
self.assertIn("ERP5", r.text) self.assertIn("ERP5", r.text)
def test_published_family_default_v6_is_reachable(self): def test_published_family_default_v6_is_reachable(self):
...@@ -73,15 +92,18 @@ class TestPublishedURLIsReachableMixin(object): ...@@ -73,15 +92,18 @@ class TestPublishedURLIsReachableMixin(object):
""" """
param_dict = self.getRootPartitionConnectionParameterDict() param_dict = self.getRootPartitionConnectionParameterDict()
self._checkERP5IsReachable( self._checkERP5IsReachable(
urlparse.urljoin(param_dict['family-default-v6'], param_dict['site-id'])) param_dict['family-default-v6'],
param_dict['site-id'],
verify=False)
def test_published_family_default_v4_is_reachable(self): def test_published_family_default_v4_is_reachable(self):
"""Tests the IPv4 URL published by the root partition is reachable. """Tests the IPv4 URL published by the root partition is reachable.
""" """
param_dict = self.getRootPartitionConnectionParameterDict() param_dict = self.getRootPartitionConnectionParameterDict()
self._checkERP5IsReachable( self._checkERP5IsReachable(
urlparse.urljoin(param_dict['family-default'], param_dict['site-id'])) param_dict['family-default'],
param_dict['site-id'],
verify=False)
class TestDefaultParameters( class TestDefaultParameters(
ERP5InstanceTestCase, TestPublishedURLIsReachableMixin): ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
...@@ -99,6 +121,30 @@ class TestMedusa(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin): ...@@ -99,6 +121,30 @@ class TestMedusa(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
def getInstanceParameterDict(cls): def getInstanceParameterDict(cls):
return {'_': json.dumps({'wsgi': False})} return {'_': json.dumps({'wsgi': False})}
class TestJupyter(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test ERP5 Jupyter notebook
"""
__partition_reference__ = 'jupyter'
@classmethod
def getInstanceParameterDict(cls):
return {'_': json.dumps({'jupyter': {'enable': True}})}
def test_jupyter_notebook_is_reachable(self):
param_dict = self.getRootPartitionConnectionParameterDict()
self.assertEqual(
'https://[%s]:8888/tree' % self._ipv6_address,
param_dict['jupyter-url']
)
result = requests.get(
param_dict['jupyter-url'], verify=False, allow_redirects=False)
self.assertEqual(
[requests.codes.found, True, '/login?next=%2Ftree'],
[result.status_code, result.is_redirect, result.headers['Location']]
)
class TestApacheBalancerPorts(ERP5InstanceTestCase): class TestApacheBalancerPorts(ERP5InstanceTestCase):
"""Instantiate with two zope families, this should create for each family: """Instantiate with two zope families, this should create for each family:
...@@ -132,19 +178,16 @@ class TestApacheBalancerPorts(ERP5InstanceTestCase): ...@@ -132,19 +178,16 @@ class TestApacheBalancerPorts(ERP5InstanceTestCase):
self.assertTrue(parsed.port) self.assertTrue(parsed.port)
def test_published_family_parameters(self): def test_published_family_parameters(self):
# when we request two families, we have two published family-{family_name} # when we request two families, we have two published family-{family_name} URLs
# URLs
param_dict = self.getRootPartitionConnectionParameterDict() param_dict = self.getRootPartitionConnectionParameterDict()
for family_name in ('family1', 'family2'): for family_name in ('family1', 'family2'):
self.checkValidHTTPSURL( self.checkValidHTTPSURL(
param_dict['family-{family_name}'.format(family_name=family_name)]) param_dict['family-{family_name}'.format(family_name=family_name)])
self.checkValidHTTPSURL( self.checkValidHTTPSURL(
param_dict['family-{family_name}-v6'.format( param_dict['family-{family_name}-v6'.format(family_name=family_name)])
family_name=family_name)])
def test_published_test_runner_url(self): def test_published_test_runner_url(self):
# each family's also a list of test test runner URLs, by default 3 per # each family's also a list of test test runner URLs, by default 3 per family
# family
param_dict = self.getRootPartitionConnectionParameterDict() param_dict = self.getRootPartitionConnectionParameterDict()
for family_name in ('family1', 'family2'): for family_name in ('family1', 'family2'):
family_test_runner_url_list = param_dict[ family_test_runner_url_list = param_dict[
...@@ -154,8 +197,7 @@ class TestApacheBalancerPorts(ERP5InstanceTestCase): ...@@ -154,8 +197,7 @@ class TestApacheBalancerPorts(ERP5InstanceTestCase):
self.checkValidHTTPSURL(url) self.checkValidHTTPSURL(url)
def test_zope_listen(self): def test_zope_listen(self):
# we requested 3 zope in family1 and 5 zopes in family2, we should have 8 # we requested 3 zope in family1 and 5 zopes in family2, we should have 8 zope running.
# zope running.
with self.slap.instance_supervisor_rpc as supervisor: with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo() all_process_info = supervisor.getAllProcessInfo()
self.assertEqual( self.assertEqual(
...@@ -190,46 +232,7 @@ class TestApacheBalancerPorts(ERP5InstanceTestCase): ...@@ -190,46 +232,7 @@ class TestApacheBalancerPorts(ERP5InstanceTestCase):
]) ])
class TestDisableTestRunner( class TestZopeNodeParameterOverride(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test ERP5 can be instantiated without test runner.
"""
__partition_reference__ = 'distr'
@classmethod
def getInstanceParameterDict(cls):
return {'_': json.dumps({'test-runner': {'enabled': False}})}
def test_no_runUnitTestScript(self):
"""No runUnitTest script should be generated in any partition.
"""
# self.computer_partition_root_path is the path of root partition.
# we want to assert that no scripts exist in any partition.
bin_programs = map(
os.path.basename,
glob.glob(self.computer_partition_root_path + "/../*/bin/*"))
self.assertTrue(bin_programs) # just to check the glob was correct.
self.assertNotIn('runUnitTest', bin_programs)
self.assertNotIn('runTestSuite', bin_programs)
def test_no_apache_testrunner_port(self):
# Apache only listen on two ports, there is no apache ports allocated for
# test runner
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
process_info, = [p for p in all_process_info if p['name'] == 'apache']
apache_process = psutil.Process(process_info['pid'])
self.assertEqual(
sorted([socket.AF_INET, socket.AF_INET6]),
sorted(
c.family
for c in apache_process.connections()
if c.status == 'LISTEN'
))
class TestZopeNodeParameterOverride(
ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test override zope node parameters """Test override zope node parameters
""" """
__partition_reference__ = 'override' __partition_reference__ = 'override'
...@@ -244,7 +247,7 @@ class TestZopeNodeParameterOverride( ...@@ -244,7 +247,7 @@ class TestZopeNodeParameterOverride(
"server": {}, "server": {},
"cache-size-bytes": "20MB", "cache-size-bytes": "20MB",
"cache-size-bytes!": [ "cache-size-bytes!": [
("bb-0", 1 << 20), ("bb-0", 1<<20),
("bb-.*", "500MB"), ("bb-.*", "500MB"),
], ],
"pool-timeout": "10m", "pool-timeout": "10m",
...@@ -315,7 +318,7 @@ class TestZopeNodeParameterOverride( ...@@ -315,7 +318,7 @@ class TestZopeNodeParameterOverride(
partition = self.getComputerPartitionPath('zope-bb') partition = self.getComputerPartitionPath('zope-bb')
for zope in xrange(5): for zope in xrange(5):
checkConf({ checkConf({
"cache-size-bytes": "500MB" if zope else 1 << 20, "cache-size-bytes": "500MB" if zope else 1<<20,
}, { }, {
"cache-size": None, "cache-size": None,
}) })
......
##############################################################################
# coding: utf-8
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import json
import glob
import urlparse
import socket
import sys
import time
import contextlib
import datetime
import subprocess
import gzip
from backports import lzma
import MySQLdb
from slapos.testing.utils import CrontabMixin
from slapos.testing.utils import getPromisePluginParameterDict
from . import ERP5InstanceTestCase
from . import setUpModule
setUpModule # pyflakes
class MariaDBTestCase(ERP5InstanceTestCase):
"""Base test case for mariadb tests.
"""
__partition_reference__ = 'm'
@classmethod
def getInstanceSoftwareType(cls):
return "mariadb"
@classmethod
def _getInstanceParameterDict(cls):
return {
'tcpv4-port': 3306,
'max-connection-count': 5,
'long-query-time': 3,
'max-slowqueries-threshold': 1,
'slowest-query-threshold': 0.1,
# XXX what is this ? should probably not be needed here
'name': cls.__name__,
'monitor-passwd': 'secret',
# XXX should probably not be needed here
'computer-memory-percent-threshold': 100,
}
@classmethod
def getInstanceParameterDict(cls):
return {'_': json.dumps(cls._getInstanceParameterDict())}
def getDatabaseConnection(self):
connection_parameter_dict = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])
db_url = urlparse.urlparse(connection_parameter_dict['database-list'][0])
self.assertEqual('mysql', db_url.scheme)
self.assertTrue(db_url.path.startswith('/'))
database_name = db_url.path[1:]
return MySQLdb.connect(
user=db_url.username,
passwd=db_url.password,
host=db_url.hostname,
port=db_url.port,
db=database_name,
)
class TestCrontabs(MariaDBTestCase, CrontabMixin):
def test_full_backup(self):
self._executeCrontabAtDate('mariadb-backup', '2050-01-01')
with gzip.open(
os.path.join(
self.computer_partition_root_path,
'srv',
'backup',
'mariadb-full',
'20500101000000.sql.gz',
),
'r') as dump:
self.assertIn('CREATE TABLE', dump.read())
def test_logrotate_and_slow_query_digest(self):
# slow query digest needs to run after logrotate, since it operates on the rotated
# file, so this tests both logrotate and slow query digest.
# run logrotate a first time so that it create state files
self._executeCrontabAtDate('logrotate', '2000-01-01')
# make two slow queries. We are using long-query-time=3, so the queries
# must take more than 3 seconds to be logged.
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("SELECT SLEEP(3.1)")
cnx.store_result()
cnx.query("SELECT SLEEP(3.2)")
# slow query crontab depends on crontab for log rotation
# to be executed first.
self._executeCrontabAtDate('logrotate', '2050-01-01')
# this logrotate leaves the log for the day as non compressed
rotated_log_file = os.path.join(
self.computer_partition_root_path,
'srv',
'backup',
'logrotate',
'mariadb_slowquery.log-20500101',
)
self.assertTrue(os.path.exists(rotated_log_file))
# then crontab to generate slow query report is executed
self._executeCrontabAtDate('generate-mariadb-slow-query-report', '2050-01-01')
# and it creates a report for the day
slow_query_report = os.path.join(
self.computer_partition_root_path,
'srv',
'monitor',
'private',
'slowquery_digest',
'slowquery_digest.txt-2050-01-01.xz',
)
with lzma.open(slow_query_report, 'r') as f:
# this is the hash for our "select sleep(n)" slow query
self.assertIn("ID 0xF9A57DD5A41825CA", f.read())
# on next day execution of logrotate, log files are compressed
self._executeCrontabAtDate('logrotate', '2050-01-02')
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
# there's a promise checking that the threshold is not exceeded
# and it reports a problem since we set a threshold of 1 slow query
check_slow_query_promise_plugin = getPromisePluginParameterDict(
os.path.join(
self.computer_partition_root_path,
'etc',
'plugin',
'check-slow-query-pt-digest-result.py',
))
with self.assertRaises(subprocess.CalledProcessError) as error_context:
subprocess.check_output('faketime 2050-01-01 %s' % check_slow_query_promise_plugin['command'], shell=True)
self.assertEqual(
error_context.exception.output,
"""\
Threshold is lower than expected:
Expected total queries : 1.0 and current is: 2
Expected slowest query : 0.1 and current is: 3
""")
class TestMariaDB(MariaDBTestCase):
def test_utf8_collation(self):
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query(
"""
CREATE TABLE test_utf8_collation (
col1 CHAR(10)
)
""")
cnx.store_result()
cnx.query(
"""
insert into test_utf8_collation values ("à"), ("あ")
""")
cnx.store_result()
cnx.query(
"""
select * from test_utf8_collation where col1 = "a"
""")
self.assertEqual((('à',),), cnx.store_result().fetch_row(maxrows=2))
class TestMroonga(MariaDBTestCase):
def test_mroonga_plugin_loaded(self):
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("show plugins")
plugins = cnx.store_result().fetch_row(maxrows=1000)
self.assertIn(
('Mroonga', 'ACTIVE', 'STORAGE ENGINE', 'ha_mroonga.so', 'GPL'),
plugins)
def test_mroonga_normalize_udf(self):
# example from https://mroonga.org/docs/reference/udf/mroonga_normalize.html#usage
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query(
"""
SELECT mroonga_normalize("ABCDあぃうぇ㍑")
""")
self.assertEqual((('abcdあぃうぇリットル',),),
cnx.store_result().fetch_row(maxrows=2))
if 0:
# this example fail with:
# OperationalError: (1123, "Can't initialize function 'mroonga_normalize'; mroonga_normalize(): nonexistent normalizer NormalizerMySQLUnicodeCIExceptKanaCI")
# same error on mroonga "official" docker images using mysql
# https://hub.docker.com/layers/groonga/mroonga/latest/images/sha256-e5a979801c95544ca3a1228d2c4d819820850e0162649553f2e94850e5e1c988?context=explore
# so it's probably OK to ignore
cnx.query(
"""
SELECT mroonga_normalize("aBcDあぃウェ㍑", "NormalizerMySQLUnicodeCIExceptKanaCIKanaWithVoicedSoundMark")
""")
self.assertEqual((('ABCDあぃうぇ㍑',),),
cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_normalizer(self):
# example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-normalizer
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("SET NAMES utf8")
cnx.store_result()
cnx.query(
"""
CREATE TABLE diaries (
day DATE PRIMARY KEY,
content VARCHAR(64) NOT NULL,
FULLTEXT INDEX (content) COMMENT 'normalizer "NormalizerAuto"'
) Engine=Mroonga DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
""")
cnx.store_result()
cnx.query(
"""INSERT INTO diaries VALUES ("2013-04-23", "ブラックコーヒーを飲んだ。")""")
cnx.store_result()
cnx.query(
"""
SELECT *
FROM diaries
WHERE MATCH (content) AGAINST ("+ふらつく" IN BOOLEAN MODE)
""")
self.assertEqual((), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
SELECT *
FROM diaries
WHERE MATCH (content) AGAINST ("+ブラック" IN BOOLEAN MODE)
""")
self.assertEqual(
((datetime.date(2013, 4, 23), 'ブラックコーヒーを飲んだ。'),),
cnx.store_result().fetch_row(maxrows=2),
)
def test_mroonga_full_text_normalizer_TokenBigramSplitSymbolAlphaDigit(self):
# Similar to as ERP5's testI18NSearch with erp5_full_text_mroonga_catalog
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query(
"""
CREATE TABLE `full_text` (
`uid` BIGINT UNSIGNED NOT NULL,
`SearchableText` MEDIUMTEXT,
PRIMARY KEY (`uid`),
FULLTEXT `SearchableText` (`SearchableText`) COMMENT 'parser "TokenBigramSplitSymbolAlphaDigit"'
) ENGINE=mroonga
""")
cnx.store_result()
cnx.query(
"""
INSERT INTO full_text VALUES
(1, "Gabriel Fauré Quick brown fox jumps over the lazy dog"),
(2, "武者小路 実篤 Slow white fox jumps over the diligent dog."),
(3, "( - + )")""")
cnx.store_result()
cnx.query(
"""
SELECT uid
FROM full_text
WHERE MATCH (`full_text`.`SearchableText`) AGAINST ('*D+ Faure' IN BOOLEAN MODE)
""")
self.assertEqual(((1,),), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
SELECT uid
FROM full_text
WHERE MATCH (`full_text`.`SearchableText`) AGAINST ('*D+ 武者' IN BOOLEAN MODE)
""")
self.assertEqual(((2,),), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
SELECT uid
FROM full_text
WHERE MATCH (`full_text`.`SearchableText`) AGAINST ('*D+ +quick +fox +dog' IN BOOLEAN MODE)
""")
self.assertEqual(((1,),), cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_stem(self):
# example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-token-filters
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("SELECT mroonga_command('register token_filters/stem')")
self.assertEqual((('true',),), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
CREATE TABLE memos (
id INT NOT NULL PRIMARY KEY,
content TEXT NOT NULL,
FULLTEXT INDEX (content) COMMENT 'normalizer "NormalizerAuto", token_filters "TokenFilterStem"'
) Engine=Mroonga DEFAULT CHARSET=utf8
""")
cnx.store_result()
cnx.query(
"""INSERT INTO memos VALUES (1, "I develop Groonga"), (2, "I'm developing Groonga"), (3, "I developed Groonga")"""
)
cnx.store_result()
cnx.query(
"""
SELECT *
FROM memos
WHERE MATCH (content) AGAINST ("+develops" IN BOOLEAN MODE)
""")
self.assertEqual([
(1, "I develop Groonga"),
(2, "I'm developing Groonga"),
(3, "I developed Groonga"),
], list(sorted(cnx.store_result().fetch_row(maxrows=4))))
# Copyright (C) 2021 Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
import json
import os.path
import unittest
from slapos.grid.utils import md5digest
from . import ERP5InstanceTestCase, setUpModule as _setUpModule
from .test_erp5 import TestPublishedURLIsReachableMixin
# skip tests when software release is built with wendelin.core 1.
def setUpModule():
_setUpModule()
cls = ERP5InstanceTestCase
if not os.path.exists(
os.path.join(
cls.slap.software_directory,
md5digest(cls.getSoftwareURL()),
'bin', 'wcfs')):
raise unittest.SkipTest("built with wendelin.core 1")
class TestWCFS(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test Wendelin Core File System
"""
__partition_reference__ = 'wcfs'
@classmethod
def getInstanceParameterDict(cls):
return {'_': json.dumps({'wcfs': {'enable': True}})}
def test_wcfs_accessible(self):
"""Verify that wcfs filesystem is basically accessible.
- we can read .wcfs/zurl
- its content is equal to published `serving-zurl`
"""
zurl = json.loads(
self.getComputerPartition('wcfs').getConnectionParameter('_')
)['serving-zurl']
mntpt = lookupMount(zurl)
zurl_ = readfile("%s/.wcfs/zurl" % mntpt)
self.assertEqual(zurl_, zurl)
# lookupMount returns /proc/mount entry for wcfs mounted to serve zurl.
def lookupMount(zurl):
for line in readfile('/proc/mounts').splitlines():
# <zurl> <mountpoint> fuse.wcfs ...
zurl_, mntpt, typ, _ = line.split(None, 3)
if typ != 'fuse.wcfs':
continue
if zurl_ == zurl:
return mntpt
raise KeyError("lookup mount %s: no /proc/mounts entry" % zurl)
# readfile returns content of file @path.
def readfile(path):
with open(path, 'r') as f:
return f.read()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment