Commit a26aaa13 authored by Jérome Perrin's avatar Jérome Perrin

software/erp5/test: use type annotations instead of type comments

using https://github.com/ilevkivskyi/com2ann
parent 3a2f1c1a
...@@ -34,8 +34,7 @@ class EchoHTTPServer(ManagedHTTPServer): ...@@ -34,8 +34,7 @@ class EchoHTTPServer(ManagedHTTPServer):
encoded in json. encoded in json.
""" """
class RequestHandler(BaseHTTPRequestHandler): class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self): def do_GET(self) -> None:
# type: () -> None
self.send_response(200) self.send_response(200)
self.send_header("Content-Type", "application/json") self.send_header("Content-Type", "application/json")
response = json.dumps( response = json.dumps(
...@@ -57,8 +56,7 @@ class EchoHTTP11Server(ManagedHTTPServer): ...@@ -57,8 +56,7 @@ class EchoHTTP11Server(ManagedHTTPServer):
""" """
class RequestHandler(BaseHTTPRequestHandler): class RequestHandler(BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1' protocol_version = 'HTTP/1.1'
def do_GET(self): def do_GET(self) -> None:
# type: () -> None
self.send_response(200) self.send_response(200)
self.send_header("Content-Type", "application/json") self.send_header("Content-Type", "application/json")
response = json.dumps( response = json.dumps(
...@@ -78,12 +76,11 @@ class EchoHTTP11Server(ManagedHTTPServer): ...@@ -78,12 +76,11 @@ class EchoHTTP11Server(ManagedHTTPServer):
class CaucaseService(ManagedResource): class CaucaseService(ManagedResource):
"""A caucase service. """A caucase service.
""" """
url = None # type: str url: str = None
directory = None # type: str directory: str = None
_caucased_process = None # type: subprocess.Popen _caucased_process: subprocess.Popen = None
def open(self): def open(self) -> None:
# type: () -> None
# start a caucased and server certificate. # start a caucased and server certificate.
software_release_root_path = os.path.join( software_release_root_path = os.path.join(
self._cls.slap._software_root, self._cls.slap._software_root,
...@@ -121,8 +118,7 @@ class CaucaseService(ManagedResource): ...@@ -121,8 +118,7 @@ class CaucaseService(ManagedResource):
else: else:
raise RuntimeError('caucased failed to start.') raise RuntimeError('caucased failed to start.')
def close(self): def close(self) -> None:
# type: () -> None
self._caucased_process.terminate() self._caucased_process.terminate()
self._caucased_process.wait() self._caucased_process.wait()
self._caucased_process.stdout.close() self._caucased_process.stdout.close()
...@@ -143,8 +139,7 @@ class BalancerTestCase(ERP5InstanceTestCase): ...@@ -143,8 +139,7 @@ class BalancerTestCase(ERP5InstanceTestCase):
return 'balancer' return 'balancer'
@classmethod @classmethod
def _getInstanceParameterDict(cls): def _getInstanceParameterDict(cls) -> dict:
# type: () -> dict
return { return {
'tcpv4-port': 8000, 'tcpv4-port': 8000,
'computer-memory-percent-threshold': 100, 'computer-memory-percent-threshold': 100,
...@@ -179,12 +174,10 @@ class BalancerTestCase(ERP5InstanceTestCase): ...@@ -179,12 +174,10 @@ class BalancerTestCase(ERP5InstanceTestCase):
} }
@classmethod @classmethod
def getInstanceParameterDict(cls): def getInstanceParameterDict(cls) -> dict:
# type: () -> dict
return {'_': json.dumps(cls._getInstanceParameterDict())} return {'_': json.dumps(cls._getInstanceParameterDict())}
def setUp(self): def setUp(self) -> None:
# type: () -> None
self.default_balancer_url = json.loads( self.default_balancer_url = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])['default'] self.computer_partition.getConnectionParameterDict()['_'])['default']
...@@ -195,8 +188,7 @@ class SlowHTTPServer(ManagedHTTPServer): ...@@ -195,8 +188,7 @@ class SlowHTTPServer(ManagedHTTPServer):
Timeout is 2 seconds by default, and can be specified in the path of the URL Timeout is 2 seconds by default, and can be specified in the path of the URL
""" """
class RequestHandler(BaseHTTPRequestHandler): class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self): def do_GET(self) -> None:
# type: () -> None
self.send_response(200) self.send_response(200)
self.send_header("Content-Type", "text/plain") self.send_header("Content-Type", "text/plain")
timeout = 2 timeout = 2
...@@ -214,8 +206,7 @@ class SlowHTTPServer(ManagedHTTPServer): ...@@ -214,8 +206,7 @@ class SlowHTTPServer(ManagedHTTPServer):
class TestTimeout(BalancerTestCase, CrontabMixin): class TestTimeout(BalancerTestCase, CrontabMixin):
__partition_reference__ = 't' __partition_reference__ = 't'
@classmethod @classmethod
def _getInstanceParameterDict(cls): def _getInstanceParameterDict(cls) -> dict:
# type: () -> dict
parameter_dict = super()._getInstanceParameterDict() parameter_dict = super()._getInstanceParameterDict()
# use a slow server instead # use a slow server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("slow_web_server", SlowHTTPServer).netloc, 1, False]] parameter_dict['dummy_http_server'] = [[cls.getManagedResource("slow_web_server", SlowHTTPServer).netloc, 1, False]]
...@@ -223,8 +214,7 @@ class TestTimeout(BalancerTestCase, CrontabMixin): ...@@ -223,8 +214,7 @@ class TestTimeout(BalancerTestCase, CrontabMixin):
parameter_dict['timeout-dict'] = {'default': 1} parameter_dict['timeout-dict'] = {'default': 1}
return parameter_dict return parameter_dict
def test_timeout(self): def test_timeout(self) -> None:
# type: () -> None
self.assertEqual( self.assertEqual(
requests.get( requests.get(
urllib.parse.urljoin(self.default_balancer_url, '/1'), urllib.parse.urljoin(self.default_balancer_url, '/1'),
...@@ -242,15 +232,13 @@ class TestLog(BalancerTestCase, CrontabMixin): ...@@ -242,15 +232,13 @@ class TestLog(BalancerTestCase, CrontabMixin):
""" """
__partition_reference__ = 'l' __partition_reference__ = 'l'
@classmethod @classmethod
def _getInstanceParameterDict(cls): def _getInstanceParameterDict(cls) -> dict:
# type: () -> dict
parameter_dict = super()._getInstanceParameterDict() parameter_dict = super()._getInstanceParameterDict()
# use a slow server instead # use a slow server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("slow_web_server", SlowHTTPServer).netloc, 1, False]] parameter_dict['dummy_http_server'] = [[cls.getManagedResource("slow_web_server", SlowHTTPServer).netloc, 1, False]]
return parameter_dict return parameter_dict
def test_access_log_format(self): def test_access_log_format(self) -> None:
# type: () -> None
requests.get( requests.get(
urllib.parse.urljoin(self.default_balancer_url, '/url_path'), urllib.parse.urljoin(self.default_balancer_url, '/url_path'),
verify=False, verify=False,
...@@ -274,8 +262,7 @@ class TestLog(BalancerTestCase, CrontabMixin): ...@@ -274,8 +262,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
self.assertGreater(request_time, 2 * 1000) self.assertGreater(request_time, 2 * 1000)
self.assertLess(request_time, 20 * 1000) self.assertLess(request_time, 20 * 1000)
def test_access_log_apachedex_report(self): def test_access_log_apachedex_report(self) -> None:
# type: () -> None
# make a request so that we have something in the logs # make a request so that we have something in the logs
requests.get(self.default_balancer_url, verify=False) requests.get(self.default_balancer_url, verify=False)
...@@ -297,8 +284,7 @@ class TestLog(BalancerTestCase, CrontabMixin): ...@@ -297,8 +284,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
# having this table means that apachedex could parse some lines. # having this table means that apachedex could parse some lines.
self.assertIn('<h2>Hits per status code</h2>', report_text) self.assertIn('<h2>Hits per status code</h2>', report_text)
def test_access_log_rotation(self): def test_access_log_rotation(self) -> None:
# type: () -> None
# run logrotate a first time so that it create state files # run logrotate a first time so that it create state files
self._executeCrontabAtDate('logrotate', '2000-01-01') self._executeCrontabAtDate('logrotate', '2000-01-01')
...@@ -324,8 +310,7 @@ class TestLog(BalancerTestCase, CrontabMixin): ...@@ -324,8 +310,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
self.assertTrue(os.path.exists(rotated_log_file + '.xz')) self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file)) self.assertFalse(os.path.exists(rotated_log_file))
def test_error_log(self): def test_error_log(self) -> None:
# type: () -> None
# stop backend server # stop backend server
backend_server = self.getManagedResource("slow_web_server", SlowHTTPServer) backend_server = self.getManagedResource("slow_web_server", SlowHTTPServer)
self.addCleanup(backend_server.open) self.addCleanup(backend_server.open)
...@@ -356,8 +341,7 @@ class BalancerCookieHTTPServer(ManagedHTTPServer): ...@@ -356,8 +341,7 @@ class BalancerCookieHTTPServer(ManagedHTTPServer):
def RequestHandler(self): def RequestHandler(self):
server = self server = self
class RequestHandler(BaseHTTPRequestHandler): class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self): def do_GET(self) -> None:
# type: () -> None
self.send_response(200) self.send_response(200)
self.send_header("Content-Type", "text/plain") self.send_header("Content-Type", "text/plain")
if self.path == '/set_cookie': if self.path == '/set_cookie':
...@@ -378,8 +362,7 @@ class TestBalancer(BalancerTestCase): ...@@ -378,8 +362,7 @@ class TestBalancer(BalancerTestCase):
""" """
__partition_reference__ = 'b' __partition_reference__ = 'b'
@classmethod @classmethod
def _getInstanceParameterDict(cls): def _getInstanceParameterDict(cls) -> dict:
# type: () -> dict
parameter_dict = super()._getInstanceParameterDict() parameter_dict = super()._getInstanceParameterDict()
# use two backend servers # use two backend servers
...@@ -389,16 +372,14 @@ class TestBalancer(BalancerTestCase): ...@@ -389,16 +372,14 @@ class TestBalancer(BalancerTestCase):
] ]
return parameter_dict return parameter_dict
def test_balancer_round_robin(self): def test_balancer_round_robin(self) -> None:
# type: () -> None
# requests are by default balanced to both servers # requests are by default balanced to both servers
self.assertEqual( self.assertEqual(
{requests.get(self.default_balancer_url, verify=False).text for _ in range(10)}, {requests.get(self.default_balancer_url, verify=False).text for _ in range(10)},
{'backend_web_server1', 'backend_web_server2'} {'backend_web_server1', 'backend_web_server2'}
) )
def test_balancer_server_down(self): def test_balancer_server_down(self) -> None:
# type: () -> None
# if one backend is down, it is excluded from balancer # if one backend is down, it is excluded from balancer
self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).close() self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).close()
self.addCleanup(self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).open) self.addCleanup(self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).open)
...@@ -407,8 +388,7 @@ class TestBalancer(BalancerTestCase): ...@@ -407,8 +388,7 @@ class TestBalancer(BalancerTestCase):
{'backend_web_server1',} {'backend_web_server1',}
) )
def test_balancer_set_cookie(self): def test_balancer_set_cookie(self) -> None:
# type: () -> None
# if backend provides a "SERVERID" cookie, balancer will overwrite it with the # if backend provides a "SERVERID" cookie, balancer will overwrite it with the
# backend selected by balancing algorithm # backend selected by balancing algorithm
self.assertIn( self.assertIn(
...@@ -416,8 +396,7 @@ class TestBalancer(BalancerTestCase): ...@@ -416,8 +396,7 @@ class TestBalancer(BalancerTestCase):
('default-0', 'default-1'), ('default-0', 'default-1'),
) )
def test_balancer_respects_sticky_cookie(self): def test_balancer_respects_sticky_cookie(self) -> None:
# type: () -> None
# if request is made with the sticky cookie, the client stick on one balancer # if request is made with the sticky cookie, the client stick on one balancer
cookies = dict(SERVERID='default-1') cookies = dict(SERVERID='default-1')
self.assertEqual( self.assertEqual(
...@@ -432,8 +411,7 @@ class TestBalancer(BalancerTestCase): ...@@ -432,8 +411,7 @@ class TestBalancer(BalancerTestCase):
requests.get(self.default_balancer_url, verify=False, cookies=cookies).text, requests.get(self.default_balancer_url, verify=False, cookies=cookies).text,
'backend_web_server1') 'backend_web_server1')
def test_balancer_stats_socket(self): def test_balancer_stats_socket(self) -> None:
# type: () -> None
# real time statistics can be obtained by using the stats socket and there # real time statistics can be obtained by using the stats socket and there
# is a wrapper which makes this a bit easier. # is a wrapper which makes this a bit easier.
socat_process = subprocess.Popen( socat_process = subprocess.Popen(
...@@ -458,8 +436,7 @@ class TestTestRunnerEntryPoints(BalancerTestCase): ...@@ -458,8 +436,7 @@ class TestTestRunnerEntryPoints(BalancerTestCase):
""" """
__partition_reference__ = 't' __partition_reference__ = 't'
@classmethod @classmethod
def _getInstanceParameterDict(cls): def _getInstanceParameterDict(cls) -> dict:
# type: () -> dict
parameter_dict = super()._getInstanceParameterDict() parameter_dict = super()._getInstanceParameterDict()
parameter_dict['dummy_http_server-test-runner-address-list'] = [ parameter_dict['dummy_http_server-test-runner-address-list'] = [
...@@ -478,8 +455,7 @@ class TestTestRunnerEntryPoints(BalancerTestCase): ...@@ -478,8 +455,7 @@ class TestTestRunnerEntryPoints(BalancerTestCase):
] ]
return parameter_dict return parameter_dict
def test_use_proper_backend(self): def test_use_proper_backend(self) -> None:
# type: () -> None
# requests are directed to proper backend based on URL path # requests are directed to proper backend based on URL path
test_runner_url_list = self.getRootPartitionConnectionParameterDict( test_runner_url_list = self.getRootPartitionConnectionParameterDict(
)['default-test-runner-url-list'] )['default-test-runner-url-list']
...@@ -532,8 +508,7 @@ class TestHTTP(BalancerTestCase): ...@@ -532,8 +508,7 @@ class TestHTTP(BalancerTestCase):
"""Check HTTP protocol with a HTTP/1.1 backend """Check HTTP protocol with a HTTP/1.1 backend
""" """
@classmethod @classmethod
def _getInstanceParameterDict(cls): def _getInstanceParameterDict(cls) -> dict:
# type: () -> dict
parameter_dict = super()._getInstanceParameterDict() parameter_dict = super()._getInstanceParameterDict()
# use a HTTP/1.1 server instead # use a HTTP/1.1 server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("HTTP/1.1 Server", EchoHTTP11Server).netloc, 1, False]] parameter_dict['dummy_http_server'] = [[cls.getManagedResource("HTTP/1.1 Server", EchoHTTP11Server).netloc, 1, False]]
...@@ -541,8 +516,7 @@ class TestHTTP(BalancerTestCase): ...@@ -541,8 +516,7 @@ class TestHTTP(BalancerTestCase):
__partition_reference__ = 'h' __partition_reference__ = 'h'
def test_http_version(self): def test_http_version(self) -> None:
# type: () -> None
self.assertEqual( self.assertEqual(
subprocess.check_output([ subprocess.check_output([
'curl', 'curl',
...@@ -558,8 +532,7 @@ class TestHTTP(BalancerTestCase): ...@@ -558,8 +532,7 @@ class TestHTTP(BalancerTestCase):
b'2', b'2',
) )
def test_keep_alive(self): def test_keep_alive(self) -> None:
# type: () -> None
# when doing two requests, connection is established only once # when doing two requests, connection is established only once
with requests.Session() as session: with requests.Session() as session:
session.verify = False session.verify = False
...@@ -594,8 +567,7 @@ class ContentTypeHTTPServer(ManagedHTTPServer): ...@@ -594,8 +567,7 @@ class ContentTypeHTTPServer(ManagedHTTPServer):
""" """
class RequestHandler(BaseHTTPRequestHandler): class RequestHandler(BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1' protocol_version = 'HTTP/1.1'
def do_GET(self): def do_GET(self) -> None:
# type: () -> None
self.send_response(200) self.send_response(200)
if self.path == '/': if self.path == '/':
self.send_header("Content-Length", '0') self.send_header("Content-Length", '0')
...@@ -615,16 +587,14 @@ class TestContentEncoding(BalancerTestCase): ...@@ -615,16 +587,14 @@ class TestContentEncoding(BalancerTestCase):
""" """
__partition_reference__ = 'ce' __partition_reference__ = 'ce'
@classmethod @classmethod
def _getInstanceParameterDict(cls): def _getInstanceParameterDict(cls) -> dict:
# type: () -> dict
parameter_dict = super()._getInstanceParameterDict() parameter_dict = super()._getInstanceParameterDict()
parameter_dict['dummy_http_server'] = [ parameter_dict['dummy_http_server'] = [
[cls.getManagedResource("content_type_server", ContentTypeHTTPServer).netloc, 1, False], [cls.getManagedResource("content_type_server", ContentTypeHTTPServer).netloc, 1, False],
] ]
return parameter_dict return parameter_dict
def test_gzip_encoding(self): def test_gzip_encoding(self) -> None:
# type: () -> None
for content_type in ( for content_type in (
'text/cache-manifest', 'text/cache-manifest',
'text/html', 'text/html',
...@@ -652,8 +622,7 @@ class TestContentEncoding(BalancerTestCase): ...@@ -652,8 +622,7 @@ class TestContentEncoding(BalancerTestCase):
'{} uses wrong encoding: {}'.format(content_type, resp.headers.get('Content-Encoding'))) '{} uses wrong encoding: {}'.format(content_type, resp.headers.get('Content-Encoding')))
self.assertEqual(resp.text, 'OK') self.assertEqual(resp.text, 'OK')
def test_no_gzip_encoding(self): def test_no_gzip_encoding(self) -> None:
# type: () -> None
resp = requests.get(urllib.parse.urljoin(self.default_balancer_url, '/image/png'), verify=False) resp = requests.get(urllib.parse.urljoin(self.default_balancer_url, '/image/png'), verify=False)
self.assertNotIn('Content-Encoding', resp.headers) self.assertNotIn('Content-Encoding', resp.headers)
self.assertEqual(resp.text, 'OK') self.assertEqual(resp.text, 'OK')
...@@ -663,14 +632,13 @@ class CaucaseCertificate(ManagedResource): ...@@ -663,14 +632,13 @@ class CaucaseCertificate(ManagedResource):
"""A certificate signed by a caucase service. """A certificate signed by a caucase service.
""" """
ca_crt_file = None # type: str ca_crt_file: str = None
crl_file = None # type: str crl_file: str = None
csr_file = None # type: str csr_file: str = None
cert_file = None # type: str cert_file: str = None
key_file = None # type: str key_file: str = None
def open(self): def open(self) -> None:
# type: () -> None
self.tmpdir = tempfile.mkdtemp() self.tmpdir = tempfile.mkdtemp()
self.ca_crt_file = os.path.join(self.tmpdir, 'ca-crt.pem') self.ca_crt_file = os.path.join(self.tmpdir, 'ca-crt.pem')
self.crl_file = os.path.join(self.tmpdir, 'ca-crl.pem') self.crl_file = os.path.join(self.tmpdir, 'ca-crl.pem')
...@@ -678,13 +646,11 @@ class CaucaseCertificate(ManagedResource): ...@@ -678,13 +646,11 @@ class CaucaseCertificate(ManagedResource):
self.cert_file = os.path.join(self.tmpdir, 'crt.pem') self.cert_file = os.path.join(self.tmpdir, 'crt.pem')
self.key_file = os.path.join(self.tmpdir, 'key.pem') self.key_file = os.path.join(self.tmpdir, 'key.pem')
def close(self): def close(self) -> None:
# type: () -> None
shutil.rmtree(self.tmpdir) shutil.rmtree(self.tmpdir)
@property @property
def _caucase_path(self): def _caucase_path(self) -> str:
# type: () -> str
"""path of caucase executable. """path of caucase executable.
""" """
software_release_root_path = os.path.join( software_release_root_path = os.path.join(
...@@ -693,8 +659,7 @@ class CaucaseCertificate(ManagedResource): ...@@ -693,8 +659,7 @@ class CaucaseCertificate(ManagedResource):
) )
return os.path.join(software_release_root_path, 'bin', 'caucase') return os.path.join(software_release_root_path, 'bin', 'caucase')
def request(self, common_name, caucase): def request(self, common_name: str, caucase: CaucaseService) -> None:
# type: (str, CaucaseService) -> None
"""Generate certificate and request signature to the caucase service. """Generate certificate and request signature to the caucase service.
This overwrite any previously requested certificate for this instance. This overwrite any previously requested certificate for this instance.
...@@ -756,8 +721,7 @@ class CaucaseCertificate(ManagedResource): ...@@ -756,8 +721,7 @@ class CaucaseCertificate(ManagedResource):
with open(self.cert_file) as cert_file: with open(self.cert_file) as cert_file:
assert 'BEGIN CERTIFICATE' in cert_file.read() assert 'BEGIN CERTIFICATE' in cert_file.read()
def revoke(self, caucase): def revoke(self, caucase: CaucaseService) -> None:
# type: (CaucaseService) -> None
"""Revoke the client certificate on this caucase instance. """Revoke the client certificate on this caucase instance.
""" """
subprocess.check_call([ subprocess.check_call([
...@@ -773,8 +737,7 @@ class TestFrontendXForwardedFor(BalancerTestCase): ...@@ -773,8 +737,7 @@ class TestFrontendXForwardedFor(BalancerTestCase):
__partition_reference__ = 'xff' __partition_reference__ = 'xff'
@classmethod @classmethod
def _getInstanceParameterDict(cls): def _getInstanceParameterDict(cls) -> dict:
# type: () -> dict
frontend_caucase = cls.getManagedResource('frontend_caucase', CaucaseService) frontend_caucase = cls.getManagedResource('frontend_caucase', CaucaseService)
certificate = cls.getManagedResource('client_certificate', CaucaseCertificate) certificate = cls.getManagedResource('client_certificate', CaucaseCertificate)
certificate.request('shared frontend', frontend_caucase) certificate.request('shared frontend', frontend_caucase)
...@@ -791,8 +754,7 @@ class TestFrontendXForwardedFor(BalancerTestCase): ...@@ -791,8 +754,7 @@ class TestFrontendXForwardedFor(BalancerTestCase):
parameter_dict['ssl']['frontend-caucase-url-list'] = [frontend_caucase.url] parameter_dict['ssl']['frontend-caucase-url-list'] = [frontend_caucase.url]
return parameter_dict return parameter_dict
def test_x_forwarded_for_added_when_verified_connection(self): def test_x_forwarded_for_added_when_verified_connection(self) -> None:
# type: () -> None
client_certificate = self.getManagedResource('client_certificate', CaucaseCertificate) client_certificate = self.getManagedResource('client_certificate', CaucaseCertificate)
for backend in ('default', 'default-auth'): for backend in ('default', 'default-auth'):
...@@ -805,8 +767,7 @@ class TestFrontendXForwardedFor(BalancerTestCase): ...@@ -805,8 +767,7 @@ class TestFrontendXForwardedFor(BalancerTestCase):
).json() ).json()
self.assertEqual(result['Incoming Headers'].get('x-forwarded-for', '').split(', ')[0], '1.2.3.4') self.assertEqual(result['Incoming Headers'].get('x-forwarded-for', '').split(', ')[0], '1.2.3.4')
def test_x_forwarded_for_stripped_when_no_certificate(self): def test_x_forwarded_for_stripped_when_no_certificate(self) -> None:
# type: () -> None
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default'] balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default']
result = requests.get( result = requests.get(
balancer_url, balancer_url,
...@@ -822,8 +783,7 @@ class TestFrontendXForwardedFor(BalancerTestCase): ...@@ -822,8 +783,7 @@ class TestFrontendXForwardedFor(BalancerTestCase):
verify=False, verify=False,
) )
def test_x_forwarded_for_stripped_when_not_verified_certificate(self): def test_x_forwarded_for_stripped_when_not_verified_certificate(self) -> None:
# type: () -> None
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default'] balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default']
# certificate from an unknown CA # certificate from an unknown CA
...@@ -855,8 +815,7 @@ class TestServerTLSProvidedCertificate(BalancerTestCase): ...@@ -855,8 +815,7 @@ class TestServerTLSProvidedCertificate(BalancerTestCase):
__partition_reference__ = 's' __partition_reference__ = 's'
@classmethod @classmethod
def _getInstanceParameterDict(cls): def _getInstanceParameterDict(cls) -> dict:
# type: () -> dict
server_caucase = cls.getManagedResource('server_caucase', CaucaseService) server_caucase = cls.getManagedResource('server_caucase', CaucaseService)
server_certificate = cls.getManagedResource('server_certificate', CaucaseCertificate) server_certificate = cls.getManagedResource('server_certificate', CaucaseCertificate)
server_certificate.request(cls._ipv4_address, server_caucase) server_certificate.request(cls._ipv4_address, server_caucase)
...@@ -867,8 +826,7 @@ class TestServerTLSProvidedCertificate(BalancerTestCase): ...@@ -867,8 +826,7 @@ class TestServerTLSProvidedCertificate(BalancerTestCase):
parameter_dict['ssl']['key'] = f.read() parameter_dict['ssl']['key'] = f.read()
return parameter_dict return parameter_dict
def test_certificate_validates_with_provided_ca(self): def test_certificate_validates_with_provided_ca(self) -> None:
# type: () -> None
server_certificate = self.getManagedResource("server_certificate", CaucaseCertificate) server_certificate = self.getManagedResource("server_certificate", CaucaseCertificate)
requests.get(self.default_balancer_url, verify=server_certificate.ca_crt_file) requests.get(self.default_balancer_url, verify=server_certificate.ca_crt_file)
...@@ -877,8 +835,7 @@ class TestClientTLS(BalancerTestCase): ...@@ -877,8 +835,7 @@ class TestClientTLS(BalancerTestCase):
__partition_reference__ = 'c' __partition_reference__ = 'c'
@classmethod @classmethod
def _getInstanceParameterDict(cls): def _getInstanceParameterDict(cls) -> dict:
# type: () -> dict
frontend_caucase1 = cls.getManagedResource('frontend_caucase1', CaucaseService) frontend_caucase1 = cls.getManagedResource('frontend_caucase1', CaucaseService)
certificate1 = cls.getManagedResource('client_certificate1', CaucaseCertificate) certificate1 = cls.getManagedResource('client_certificate1', CaucaseCertificate)
certificate1.request('client_certificate1', frontend_caucase1) certificate1.request('client_certificate1', frontend_caucase1)
...@@ -897,8 +854,7 @@ class TestClientTLS(BalancerTestCase): ...@@ -897,8 +854,7 @@ class TestClientTLS(BalancerTestCase):
] ]
return parameter_dict return parameter_dict
def test_refresh_crl(self): def test_refresh_crl(self) -> None:
# type: () -> None
logger = self.logger logger = self.logger
class DebugLogFile: class DebugLogFile:
...@@ -916,8 +872,7 @@ class TestClientTLS(BalancerTestCase): ...@@ -916,8 +872,7 @@ class TestClientTLS(BalancerTestCase):
# when client certificate can be authenticated, backend receive the CN of # when client certificate can be authenticated, backend receive the CN of
# the client certificate in "remote-user" header # the client certificate in "remote-user" header
def _make_request(): def _make_request() -> dict:
# type: () -> dict
return requests.get( return requests.get(
self.default_balancer_url, self.default_balancer_url,
cert=(client_certificate.cert_file, client_certificate.key_file), cert=(client_certificate.cert_file, client_certificate.key_file),
...@@ -976,8 +931,7 @@ class TestPathBasedRouting(BalancerTestCase): ...@@ -976,8 +931,7 @@ class TestPathBasedRouting(BalancerTestCase):
__partition_reference__ = 'pbr' __partition_reference__ = 'pbr'
@classmethod @classmethod
def _getInstanceParameterDict(cls): def _getInstanceParameterDict(cls) -> dict:
# type: () -> dict
parameter_dict = super()._getInstanceParameterDict() parameter_dict = super()._getInstanceParameterDict()
parameter_dict['zope-family-dict'][ parameter_dict['zope-family-dict'][
'second' 'second'
...@@ -1003,8 +957,7 @@ class TestPathBasedRouting(BalancerTestCase): ...@@ -1003,8 +957,7 @@ class TestPathBasedRouting(BalancerTestCase):
] ]
return parameter_dict return parameter_dict
def test_routing(self): def test_routing(self) -> None:
# type: () -> None
published_dict = json.loads(self.computer_partition.getConnectionParameterDict()['_']) published_dict = json.loads(self.computer_partition.getConnectionParameterDict()['_'])
scheme = 'scheme' scheme = 'scheme'
netloc = 'example.com:8080' netloc = 'example.com:8080'
...@@ -1015,8 +968,7 @@ class TestPathBasedRouting(BalancerTestCase): ...@@ -1015,8 +968,7 @@ class TestPathBasedRouting(BalancerTestCase):
# For easier reading of test data, visually separating the virtual host # For easier reading of test data, visually separating the virtual host
# base from the virtual host root # base from the virtual host root
vhr = '/VirtualHostRoot' vhr = '/VirtualHostRoot'
def assertRoutingEqual(family, path, expected_path): def assertRoutingEqual(family: str, path: str, expected_path: str) -> None:
# type: (str, str, str) -> None
# sanity check: unlike the rules, this test is sensitive to outermost # sanity check: unlike the rules, this test is sensitive to outermost
# slashes, and paths must be absolute-ish for code simplicity. # slashes, and paths must be absolute-ish for code simplicity.
assert path.startswith('/') assert path.startswith('/')
......
...@@ -634,8 +634,7 @@ class ZopeTestMixin(ZopeSkinsMixin, CrontabMixin): ...@@ -634,8 +634,7 @@ class ZopeTestMixin(ZopeSkinsMixin, CrontabMixin):
)): )):
os.unlink(logfile) os.unlink(logfile)
def _getCrontabCommand(self, crontab_name): def _getCrontabCommand(self, crontab_name: str) -> str:
# type: (str) -> str
"""Read a crontab and return the command that is executed. """Read a crontab and return the command that is executed.
overloaded to use crontab from zope partition overloaded to use crontab from zope partition
...@@ -1046,8 +1045,7 @@ class TestNEO(ZopeSkinsMixin, CrontabMixin, ERP5InstanceTestCase): ...@@ -1046,8 +1045,7 @@ class TestNEO(ZopeSkinsMixin, CrontabMixin, ERP5InstanceTestCase):
__partition_reference__ = 'n' __partition_reference__ = 'n'
__test_matrix__ = matrix((neo,)) __test_matrix__ = matrix((neo,))
def _getCrontabCommand(self, crontab_name): def _getCrontabCommand(self, crontab_name: str) -> str:
# type: (str) -> str
"""Read a crontab and return the command that is executed. """Read a crontab and return the command that is executed.
overloaded to use crontab from neo partition overloaded to use crontab from neo partition
...@@ -1127,7 +1125,7 @@ class TestUnsetWithMaxRlimitNofileParameter(ERP5InstanceTestCase, TestPublishedU ...@@ -1127,7 +1125,7 @@ class TestUnsetWithMaxRlimitNofileParameter(ERP5InstanceTestCase, TestPublishedU
""" """
__partition_reference__ = 'unset-with-max-rlimit-nofile' __partition_reference__ = 'unset-with-max-rlimit-nofile'
def test_unset_with_max_rlimit_nofile(self): def test_unset_with_max_rlimit_nofile(self) -> None:
with self.slap.instance_supervisor_rpc as supervisor: with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo() all_process_info = supervisor.getAllProcessInfo()
limit = resource.getrlimit(resource.RLIMIT_NOFILE) limit = resource.getrlimit(resource.RLIMIT_NOFILE)
......
...@@ -36,6 +36,7 @@ import subprocess ...@@ -36,6 +36,7 @@ import subprocess
import urllib.parse import urllib.parse
import MySQLdb import MySQLdb
import MySQLdb.connections
from slapos.testing.utils import CrontabMixin, getPromisePluginParameterDict from slapos.testing.utils import CrontabMixin, getPromisePluginParameterDict
...@@ -60,8 +61,7 @@ class MariaDBTestCase(ERP5InstanceTestCase): ...@@ -60,8 +61,7 @@ class MariaDBTestCase(ERP5InstanceTestCase):
return "mariadb" return "mariadb"
@classmethod @classmethod
def _getInstanceParameterDict(cls): def _getInstanceParameterDict(cls) -> dict:
# type: () -> dict
return { return {
'tcpv4-port': 3306, 'tcpv4-port': 3306,
'max-connection-count': 5, 'max-connection-count': 5,
...@@ -76,12 +76,10 @@ class MariaDBTestCase(ERP5InstanceTestCase): ...@@ -76,12 +76,10 @@ class MariaDBTestCase(ERP5InstanceTestCase):
} }
@classmethod @classmethod
def getInstanceParameterDict(cls): def getInstanceParameterDict(cls) -> dict:
# type: () -> dict
return {'_': json.dumps(cls._getInstanceParameterDict())} return {'_': json.dumps(cls._getInstanceParameterDict())}
def getDatabaseConnection(self): def getDatabaseConnection(self) -> MySQLdb.connections.Connection:
# type: () -> MySQLdb.connections.Connection
connection_parameter_dict = json.loads( connection_parameter_dict = json.loads(
self.computer_partition.getConnectionParameterDict()['_']) self.computer_partition.getConnectionParameterDict()['_'])
db_url = urllib.parse.urlparse(connection_parameter_dict['database-list'][0]) db_url = urllib.parse.urlparse(connection_parameter_dict['database-list'][0])
...@@ -106,8 +104,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin): ...@@ -106,8 +104,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin):
'*/srv/backup/*', '*/srv/backup/*',
) )
def test_full_backup(self): def test_full_backup(self) -> None:
# type: () -> None
self._executeCrontabAtDate('mariadb-backup', '2050-01-01') self._executeCrontabAtDate('mariadb-backup', '2050-01-01')
full_backup_file, = glob.glob( full_backup_file, = glob.glob(
os.path.join( os.path.join(
...@@ -121,8 +118,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin): ...@@ -121,8 +118,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin):
with gzip.open(full_backup_file, 'rt') as dump: with gzip.open(full_backup_file, 'rt') as dump:
self.assertIn('CREATE TABLE', dump.read()) self.assertIn('CREATE TABLE', dump.read())
def test_logrotate_and_slow_query_digest(self): def test_logrotate_and_slow_query_digest(self) -> None:
# type: () -> None
# slow query digest needs to run after logrotate, since it operates on the rotated # slow query digest needs to run after logrotate, since it operates on the rotated
# file, so this tests both logrotate and slow query digest. # file, so this tests both logrotate and slow query digest.
...@@ -193,8 +189,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin): ...@@ -193,8 +189,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin):
class TestMariaDB(MariaDBTestCase): class TestMariaDB(MariaDBTestCase):
def test_utf8_collation(self): def test_utf8_collation(self) -> None:
# type: () -> None
cnx = self.getDatabaseConnection() cnx = self.getDatabaseConnection()
with contextlib.closing(cnx): with contextlib.closing(cnx):
cnx.query( cnx.query(
...@@ -219,8 +214,7 @@ class TestMariaDB(MariaDBTestCase): ...@@ -219,8 +214,7 @@ class TestMariaDB(MariaDBTestCase):
class TestMroonga(MariaDBTestCase): class TestMroonga(MariaDBTestCase):
def test_mroonga_plugin_loaded(self): def test_mroonga_plugin_loaded(self) -> None:
# type: () -> None
cnx = self.getDatabaseConnection() cnx = self.getDatabaseConnection()
with contextlib.closing(cnx): with contextlib.closing(cnx):
cnx.query("show plugins") cnx.query("show plugins")
...@@ -229,8 +223,7 @@ class TestMroonga(MariaDBTestCase): ...@@ -229,8 +223,7 @@ class TestMroonga(MariaDBTestCase):
('Mroonga', 'ACTIVE', 'STORAGE ENGINE', 'ha_mroonga.so', 'GPL'), ('Mroonga', 'ACTIVE', 'STORAGE ENGINE', 'ha_mroonga.so', 'GPL'),
plugins) plugins)
def test_mroonga_normalize_udf(self): def test_mroonga_normalize_udf(self) -> None:
# type: () -> None
# example from https://mroonga.org/docs/reference/udf/mroonga_normalize.html#usage # example from https://mroonga.org/docs/reference/udf/mroonga_normalize.html#usage
cnx = self.getDatabaseConnection() cnx = self.getDatabaseConnection()
with contextlib.closing(cnx): with contextlib.closing(cnx):
...@@ -255,8 +248,7 @@ class TestMroonga(MariaDBTestCase): ...@@ -255,8 +248,7 @@ class TestMroonga(MariaDBTestCase):
self.assertEqual((('ABCDあぃうぇ㍑'.encode(),),), self.assertEqual((('ABCDあぃうぇ㍑'.encode(),),),
cnx.store_result().fetch_row(maxrows=2)) cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_normalizer(self): def test_mroonga_full_text_normalizer(self) -> None:
# type: () -> None
# example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-normalizer # example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-normalizer
cnx = self.getDatabaseConnection() cnx = self.getDatabaseConnection()
with contextlib.closing(cnx): with contextlib.closing(cnx):
...@@ -293,8 +285,7 @@ class TestMroonga(MariaDBTestCase): ...@@ -293,8 +285,7 @@ class TestMroonga(MariaDBTestCase):
cnx.store_result().fetch_row(maxrows=2), cnx.store_result().fetch_row(maxrows=2),
) )
def test_mroonga_full_text_normalizer_TokenBigramSplitSymbolAlphaDigit(self): def test_mroonga_full_text_normalizer_TokenBigramSplitSymbolAlphaDigit(self) -> None:
# type: () -> None
# Similar to as ERP5's testI18NSearch with erp5_full_text_mroonga_catalog # Similar to as ERP5's testI18NSearch with erp5_full_text_mroonga_catalog
cnx = self.getDatabaseConnection() cnx = self.getDatabaseConnection()
with contextlib.closing(cnx): with contextlib.closing(cnx):
...@@ -337,8 +328,7 @@ class TestMroonga(MariaDBTestCase): ...@@ -337,8 +328,7 @@ class TestMroonga(MariaDBTestCase):
""") """)
self.assertEqual(((1,),), cnx.store_result().fetch_row(maxrows=2)) self.assertEqual(((1,),), cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_stem(self): def test_mroonga_full_text_stem(self) -> None:
# type: () -> None
# example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-token-filters # example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-token-filters
cnx = self.getDatabaseConnection() cnx = self.getDatabaseConnection()
with contextlib.closing(cnx): with contextlib.closing(cnx):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment