Commit 466ff9fe authored by Jérome Perrin's avatar Jérome Perrin

one test

parent e655a5b0
...@@ -37,6 +37,7 @@ _setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass( ...@@ -37,6 +37,7 @@ _setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass(
setup_module_executed = False setup_module_executed = False
setup_module_executed = True
def setUpModule(): def setUpModule():
# slapos.testing.testcase's only need to be executed once # slapos.testing.testcase's only need to be executed once
global setup_module_executed global setup_module_executed
......
##############################################################################
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import json
import glob
import urlparse
import socket
import time
import psutil
import requests
from . import ERP5InstanceTestCase
from . import setUpModule
setUpModule # pyflakes
class TestPublishedURLIsReachableMixin(object):
"""Mixin that checks that default page of ERP5 is reachable.
"""
def _checkERP5IsReachable(self, url):
# What happens is that instanciation just create the services, but does not
# wait for ERP5 to be initialized. When this test run ERP5 instance is
# instanciated, but zope is still busy creating the site and haproxy replies
# with 503 Service Unavailable, sometimes the first request is 404, so we
# retry in a loop.
# If we can move the "create site" in slapos node instance, then this retry loop
# would not be necessary.
for i in range(1, 60):
r = requests.get(url, verify=False) # XXX can we get CA from caucase already ?
if r.status_code in (requests.codes.service_unavailable,
requests.codes.not_found):
delay = i * 2
self.logger.warn("ERP5 was not available, sleeping for %ds and retrying", delay)
time.sleep(delay)
continue
if r.status_code != requests.codes.ok:
r.raise_for_status()
break
self.assertIn("ERP5", r.text)
def test_published_family_default_v6_is_reachable(self):
"""Tests the IPv6 URL published by the root partition is reachable.
"""
param_dict = self.getRootPartitionConnectionParameterDict()
self._checkERP5IsReachable(
urlparse.urljoin(param_dict['family-default-v6'], param_dict['site-id']))
def test_published_family_default_v4_is_reachable(self):
"""Tests the IPv4 URL published by the root partition is reachable.
"""
param_dict = self.getRootPartitionConnectionParameterDict()
self._checkERP5IsReachable(
urlparse.urljoin(param_dict['family-default'], param_dict['site-id']))
class TestDefaultParameters(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test ERP5 can be instanciated with no parameters
"""
__partition_reference__ = 'defp'
class TestMedusa(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test ERP5 Medusa server
"""
__partition_reference__ = 'medusa'
@classmethod
def getInstanceParameterDict(cls):
return {'_': json.dumps({'wsgi': False})}
class TestApacheBalancerPorts(ERP5InstanceTestCase):
"""Instanciate with two zope families, this should create for each family:
- a balancer entry point with corresponding haproxy
- a balancer entry point for test runner
"""
__partition_reference__ = 'ap'
@classmethod
def getInstanceParameterDict(cls):
return {
'_':
json.dumps({
"zope-partition-dict": {
"family1": {
"instance-count": 3,
"family": "family1"
},
"family2": {
"instance-count": 5,
"family": "family2"
},
},
})
}
def checkValidHTTPSURL(self, url):
parsed = urlparse.urlparse(url)
self.assertEqual(parsed.scheme, 'https')
self.assertTrue(parsed.hostname)
self.assertTrue(parsed.port)
def test_published_family_parameters(self):
# when we request two families, we have two published family-{family_name} URLs
param_dict = self.getRootPartitionConnectionParameterDict()
for family_name in ('family1', 'family2'):
self.checkValidHTTPSURL(
param_dict['family-{family_name}'.format(family_name=family_name)])
self.checkValidHTTPSURL(
param_dict['family-{family_name}-v6'.format(family_name=family_name)])
def test_published_test_runner_url(self):
# each family's also a list of test test runner URLs, by default 3 per family
param_dict = self.getRootPartitionConnectionParameterDict()
for family_name in ('family1', 'family2'):
family_test_runner_url_list = param_dict[
'{family_name}-test-runner-url-list'.format(family_name=family_name)]
self.assertEqual(3, len(family_test_runner_url_list))
for url in family_test_runner_url_list:
self.checkValidHTTPSURL(url)
def test_zope_listen(self):
# we requested 3 zope in family1 and 5 zopes in family2, we should have 8 zope running.
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
self.assertEqual(
3 + 5,
len([p for p in all_process_info if p['name'].startswith('zope-')]))
def test_apache_listen(self):
# We have 2 families, apache should listen to a total of 3 ports per family
# normal access on ipv4 and ipv6 and test runner access on ipv4 only
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
process_info, = [p for p in all_process_info if p['name'] == 'apache']
apache_process = psutil.Process(process_info['pid'])
self.assertEqual(
sorted([socket.AF_INET] * 4 + [socket.AF_INET6] * 2),
sorted([
c.family
for c in apache_process.connections()
if c.status == 'LISTEN'
]))
def test_haproxy_listen(self):
# There is one haproxy per family
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
process_info, = [
p for p in all_process_info if p['name'].startswith('haproxy-')
]
haproxy_process = psutil.Process(process_info['pid'])
self.assertEqual([socket.AF_INET, socket.AF_INET], [
c.family for c in haproxy_process.connections() if c.status == 'LISTEN'
])
class TestDisableTestRunner(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test ERP5 can be instanciated without test runner.
"""
__partition_reference__ = 'distr'
@classmethod
def getInstanceParameterDict(cls):
return {'_': json.dumps({'test-runner': {'enabled': False}})}
def test_no_runUnitTestScript(self):
"""No runUnitTest script should be generated in any partition.
"""
# self.computer_partition_root_path is the path of root partition.
# we want to assert that no scripts exist in any partition.
bin_programs = map(os.path.basename,
glob.glob(self.computer_partition_root_path + "/../*/bin/*"))
self.assertTrue(bin_programs) # just to check the glob was correct.
self.assertNotIn('runUnitTest', bin_programs)
self.assertNotIn('runTestSuite', bin_programs)
def test_no_apache_testrunner_port(self):
# Apache only listen on two ports, there is no apache ports allocated for test runner
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
process_info, = [p for p in all_process_info if p['name'] == 'apache']
apache_process = psutil.Process(process_info['pid'])
self.assertEqual(
sorted([socket.AF_INET, socket.AF_INET6]),
sorted(
c.family
for c in apache_process.connections()
if c.status == 'LISTEN'
))
class TestZopeNodeParameterOverride(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test override zope node parameters
"""
__partition_reference__ = 'override'
@classmethod
def getInstanceParameterDict(cls):
# The following example includes the most commonly used options,
# but not necessarily in a meaningful way.
return {'_': json.dumps({
"zodb": [{
"type": "zeo",
"server": {},
"cache-size-bytes": "20MB",
"cache-size-bytes!": [
("bb-0", 1<<20),
("bb-.*", "500MB"),
],
"pool-timeout": "10m",
"storage-dict": {
"cache-size!": [
("a-.*", "50MB"),
],
},
}],
"zope-partition-dict": {
"a": {
"instance-count": 3,
},
"bb": {
"instance-count": 5,
"port-base": 2300,
},
},
})}
def test_zope_conf(self):
zeo_addr = json.loads(
self.getComputerPartition('zodb').getConnectionParameter('_')
)["storage-dict"]["root"]["server"]
def checkParameter(line, kw):
k, v = line.split()
self.assertFalse(k.endswith('!'), k)
try:
expected = kw.pop(k)
except KeyError:
if k == 'server':
return
self.assertIsNotNone(expected)
self.assertEqual(str(expected), v)
def checkConf(zodb, storage):
zodb["mount-point"] = "/"
zodb["pool-size"] = 4
zodb["pool-timeout"] = "10m"
storage["storage"] = "root"
storage["server"] = zeo_addr
with open('%s/etc/zope-%s.conf' % (partition, zope)) as f:
conf = map(str.strip, f.readlines())
i = conf.index("<zodb_db root>") + 1
conf = iter(conf[i:conf.index("</zodb_db>", i)])
for line in conf:
if line == '<zeoclient>':
for line in conf:
if line == '</zeoclient>':
break
checkParameter(line, storage)
for k, v in storage.iteritems():
self.assertIsNone(v, k)
del storage
else:
checkParameter(line, zodb)
for k, v in zodb.iteritems():
self.assertIsNone(v, k)
partition = self.getComputerPartitionPath('zope-a')
for zope in xrange(3):
checkConf({
"cache-size-bytes": "20MB",
}, {
"cache-size": "50MB",
})
partition = self.getComputerPartitionPath('zope-bb')
for zope in xrange(5):
checkConf({
"cache-size-bytes": "500MB" if zope else 1<<20,
}, {
"cache-size": None,
})
##############################################################################
# coding: utf-8
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import json
import glob
import urlparse
import socket
import time
import contextlib
import datetime
import MySQLdb
from . import ERP5InstanceTestCase
from . import setUpModule
setUpModule # pyflakes
class MariaDBTestCase(ERP5InstanceTestCase):
"""Base test case for mariadb tests.
"""
__partition_reference__ = 'm'
@classmethod
def getInstanceSoftwareType(cls):
return "mariadb"
@classmethod
def _getInstanceParameterDict(cls):
return {
'tcpv4-port': 3306,
'max-connection-count': 5,
'max-slowqueries-threshold': 5,
'slowest-query-threshold': 10,
# XXX what is this ? should probably not be needed here
'name': cls.__name__,
'monitor-passwd': 'secret',
# XXX should probably not be needed here
'computer-memory-percent-threshold': 100,
}
@classmethod
def getInstanceParameterDict(cls):
return {'_': json.dumps(cls._getInstanceParameterDict())}
def getDatabaseConnection(self):
connection_parameter_dict = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])
db_url = urlparse.urlparse(connection_parameter_dict['database-list'][0])
self.assertEqual('mysql', db_url.scheme)
self.assertTrue(db_url.path.startswith('/'))
database_name = db_url.path[1:]
return MySQLdb.connect(
user=db_url.username,
passwd=db_url.password,
host=db_url.hostname,
port=db_url.port,
db=database_name,
)
class TestMariaDB(MariaDBTestCase):
def test_utf8_collation(self):
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query(
"""
CREATE TABLE test_utf8_collation (
col1 CHAR(10)
)
""")
cnx.store_result()
cnx.query(
"""
insert into test_utf8_collation values ("à"), ("あ")
""")
cnx.store_result()
cnx.query(
"""
select * from test_utf8_collation where col1 = "a"
""")
self.assertEqual((('à',),), cnx.store_result().fetch_row(maxrows=2))
class TestMroonga(MariaDBTestCase):
def test_mroonga_plugin_loaded(self):
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("show plugins")
plugins = cnx.store_result().fetch_row(maxrows=1000)
self.assertIn(
('Mroonga', 'ACTIVE', 'STORAGE ENGINE', 'ha_mroonga.so', 'GPL'),
plugins)
def test_mroonga_normalize_udf(self):
# example from https://mroonga.org/docs/reference/udf/mroonga_normalize.html#usage
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query(
"""
SELECT mroonga_normalize("ABCDあぃうぇ㍑")
""")
self.assertEqual((('abcdあぃうぇリットル',),),
cnx.store_result().fetch_row(maxrows=2))
if 0:
# this example fail with:
# OperationalError: (1123, "Can't initialize function 'mroonga_normalize'; mroonga_normalize(): nonexistent normalizer NormalizerMySQLUnicodeCIExceptKanaCI")
# same error on mroonga "official" docker images using mysql
# https://hub.docker.com/layers/groonga/mroonga/latest/images/sha256-e5a979801c95544ca3a1228d2c4d819820850e0162649553f2e94850e5e1c988?context=explore
# so it's probably OK to ignore
cnx.query(
"""
SELECT mroonga_normalize("aBcDあぃウェ㍑", "NormalizerMySQLUnicodeCIExceptKanaCIKanaWithVoicedSoundMark")
""")
self.assertEqual((('ABCDあぃうぇ㍑',),),
cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_normalizer(self):
# example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-normalizer
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("SET NAMES utf8")
cnx.store_result()
cnx.query(
"""
CREATE TABLE diaries (
day DATE PRIMARY KEY,
content VARCHAR(64) NOT NULL,
FULLTEXT INDEX (content) COMMENT 'normalizer "NormalizerAuto"'
) Engine=Mroonga DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci
""")
cnx.store_result()
cnx.query(
"""INSERT INTO diaries VALUES ("2013-04-23", "ブラックコーヒーを飲んだ。")""")
cnx.store_result()
cnx.query(
"""
SELECT *
FROM diaries
WHERE MATCH (content) AGAINST ("+ふらつく" IN BOOLEAN MODE)
""")
self.assertEqual((), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
SELECT *
FROM diaries
WHERE MATCH (content) AGAINST ("+ブラック" IN BOOLEAN MODE)
""")
self.assertEqual(
((datetime.date(2013, 4, 23), 'ブラックコーヒーを飲んだ。'),),
cnx.store_result().fetch_row(maxrows=2),
)
def test_mroonga_full_text_normalizer_TokenBigramSplitSymbolAlphaDigit(self):
# Similar to as ERP5's testI18NSearch with erp5_full_text_mroonga_catalog
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query(
"""
CREATE TABLE `full_text` (
`uid` BIGINT UNSIGNED NOT NULL,
`SearchableText` MEDIUMTEXT,
PRIMARY KEY (`uid`),
FULLTEXT `SearchableText` (`SearchableText`) COMMENT 'parser "TokenBigramSplitSymbolAlphaDigit"'
) ENGINE=mroonga
""")
cnx.store_result()
cnx.query(
"""
INSERT INTO full_text VALUES
(1, "Gabriel Fauré Quick brown fox jumps over the lazy dog"),
(2, "武者小路 実篤 Slow white fox jumps over the diligent dog."),
(3, "( - + )")""")
cnx.store_result()
cnx.query(
"""
SELECT uid
FROM full_text
WHERE MATCH (`full_text`.`SearchableText`) AGAINST ('*D+ Faure' IN BOOLEAN MODE)
""")
self.assertEqual(((1,),), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
SELECT uid
FROM full_text
WHERE MATCH (`full_text`.`SearchableText`) AGAINST ('*D+ 武者' IN BOOLEAN MODE)
""")
self.assertEqual(((2,),), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
SELECT uid
FROM full_text
WHERE MATCH (`full_text`.`SearchableText`) AGAINST ('*D+ +quick +fox +dog' IN BOOLEAN MODE)
""")
self.assertEqual(((1,),), cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_stem(self):
# example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-token-filters
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("SELECT mroonga_command('register token_filters/stem')")
self.assertEqual((('true',),), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
CREATE TABLE memos (
id INT NOT NULL PRIMARY KEY,
content TEXT NOT NULL,
FULLTEXT INDEX (content) COMMENT 'normalizer "NormalizerAuto", token_filters "TokenFilterStem"'
) Engine=Mroonga DEFAULT CHARSET=utf8
""")
cnx.store_result()
cnx.query(
"""INSERT INTO memos VALUES (1, "I develop Groonga"), (2, "I'm developing Groonga"), (3, "I developed Groonga")"""
)
cnx.store_result()
cnx.query(
"""
SELECT *
FROM memos
WHERE MATCH (content) AGAINST ("+develops" IN BOOLEAN MODE)
""")
self.assertEqual([
(1, "I develop Groonga"),
(2, "I'm developing Groonga"),
(3, "I developed Groonga"),
], list(sorted(cnx.store_result().fetch_row(maxrows=4))))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment