Commit b1f1934d authored by Jérome Perrin's avatar Jérome Perrin

software/erp5: tests for publisher-timeout

parent d25ab7bd
Pipeline #22098 failed with stage
in 0 seconds
...@@ -406,7 +406,7 @@ class TestWatchActivities(ERP5InstanceTestCase): ...@@ -406,7 +406,7 @@ class TestWatchActivities(ERP5InstanceTestCase):
__partition_reference__ = 'wa' __partition_reference__ = 'wa'
def test(self): def test(self):
# "watch_activites" scripts use watch command. We'll fake a watch command # "watch_activities" scripts use watch command. We'll fake a watch command
# that executes the actual command only once to check the output. # that executes the actual command only once to check the output.
tmpdir = tempfile.mkdtemp() tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmpdir) self.addCleanup(shutil.rmtree, tmpdir)
...@@ -443,37 +443,35 @@ class TestWatchActivities(ERP5InstanceTestCase): ...@@ -443,37 +443,35 @@ class TestWatchActivities(ERP5InstanceTestCase):
self.assertIn(' dict ', output) self.assertIn(' dict ', output)
class ZopeTestMixin(CrontabMixin): class ZopeSkinsMixin(object):
"""Mixin class for zope features. """Mixins with utility methods to test zope behaviors.
""" """
wsgi = NotImplemented # type: bool
__partition_reference__ = 'z'
@classmethod @classmethod
def getInstanceParameterDict(cls): def _setUpClass(cls):
return { super(ZopeSkinsMixin, cls)._setUpClass()
'_': param_dict = cls.getRootPartitionConnectionParameterDict()
json.dumps({ with cls.getXMLRPCClient() as erp5_xmlrpc_client:
"zope-partition-dict": { # wait for ERP5 to be ready (TODO: this should probably be a promise)
"default": { for _ in range(120):
"longrequest-logger-interval": 1, time.sleep(1)
"longrequest-logger-timeout": 1, try:
}, erp5_xmlrpc_client.getTitle()
}, except (six.moves.xmlrpc_client.ProtocolError,
"wsgi": cls.wsgi, six.moves.xmlrpc_client.Fault):
}) pass
} else:
break
@classmethod @classmethod
def _setUpClass(cls): def _getAuthenticatedZopeUrl(cls, path, family_name='default'):
super(ZopeTestMixin, cls)._setUpClass() """Returns a URL to access a zope family through balancer,
with credentials in the URL.
path is joined with urllib.parse.urljoin to the URL of the portal.
"""
param_dict = cls.getRootPartitionConnectionParameterDict() param_dict = cls.getRootPartitionConnectionParameterDict()
# rebuild an url with user and password parsed = six.moves.urllib.parse.urlparse(param_dict['family-' + family_name])
parsed = six.moves.urllib.parse.urlparse(param_dict['family-default']) base_url = parsed._replace(
cls.zope_base_url = parsed._replace(
netloc='{}:{}@{}:{}'.format( netloc='{}:{}@{}:{}'.format(
param_dict['inituser-login'], param_dict['inituser-login'],
param_dict['inituser-password'], param_dict['inituser-password'],
...@@ -482,22 +480,17 @@ class ZopeTestMixin(CrontabMixin): ...@@ -482,22 +480,17 @@ class ZopeTestMixin(CrontabMixin):
), ),
path=param_dict['site-id'] + '/', path=param_dict['site-id'] + '/',
).geturl() ).geturl()
return six.moves.urllib_parse.urljoin(base_url, path)
cls.zope_deadlock_debugger_url = six.moves.urllib_parse.urljoin( @classmethod
cls.zope_base_url,
'/manage_debug_threads?{deadlock-debugger-password}'.format(
**param_dict),
)
@contextlib.contextmanager @contextlib.contextmanager
def getXMLRPCClient(): def getXMLRPCClient(cls):
# don't verify certificate # don't verify certificate
ssl_context = ssl.create_default_context() ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE ssl_context.verify_mode = ssl.CERT_NONE
erp5_xmlrpc_client = six.moves.xmlrpc_client.ServerProxy( erp5_xmlrpc_client = six.moves.xmlrpc_client.ServerProxy(
cls.zope_base_url, cls._getAuthenticatedZopeUrl(''),
context=ssl_context, context=ssl_context,
) )
# BBB use as a context manager only on python3 # BBB use as a context manager only on python3
...@@ -507,20 +500,9 @@ class ZopeTestMixin(CrontabMixin): ...@@ -507,20 +500,9 @@ class ZopeTestMixin(CrontabMixin):
with erp5_xmlrpc_client: with erp5_xmlrpc_client:
yield erp5_xmlrpc_client yield erp5_xmlrpc_client
with getXMLRPCClient() as erp5_xmlrpc_client: @classmethod
# wait for ERP5 to be ready (TODO: this should probably be a promise) def _addPythonScript(cls, script_id, params, body):
for _ in range(120): with cls.getXMLRPCClient() as erp5_xmlrpc_client:
time.sleep(1)
try:
erp5_xmlrpc_client.getTitle()
except (six.moves.xmlrpc_client.ProtocolError,
six.moves.xmlrpc_client.Fault):
pass
else:
break
def addPythonScript(script_id, params, body):
with getXMLRPCClient() as erp5_xmlrpc_client:
custom = erp5_xmlrpc_client.portal_skins.custom custom = erp5_xmlrpc_client.portal_skins.custom
try: try:
custom.manage_addProduct.PythonScripts.manage_addPythonScript( custom.manage_addProduct.PythonScripts.manage_addPythonScript(
...@@ -535,8 +517,40 @@ class ZopeTestMixin(CrontabMixin): ...@@ -535,8 +517,40 @@ class ZopeTestMixin(CrontabMixin):
body, body,
) )
class ZopeTestMixin(ZopeSkinsMixin, CrontabMixin):
"""Mixin class for zope features.
"""
wsgi = NotImplemented # type: bool
__partition_reference__ = 'z'
@classmethod
def getInstanceParameterDict(cls):
return {
'_':
json.dumps({
"zope-partition-dict": {
"default": {
"longrequest-logger-interval": 1,
"longrequest-logger-timeout": 1,
},
},
"wsgi": cls.wsgi,
})
}
@classmethod
def _setUpClass(cls):
super(ZopeTestMixin, cls)._setUpClass()
cls.zope_base_url = cls._getAuthenticatedZopeUrl('')
param_dict = cls.getRootPartitionConnectionParameterDict()
cls.zope_deadlock_debugger_url = cls._getAuthenticatedZopeUrl(
'/manage_debug_threads?{deadlock-debugger-password}'.format(
**param_dict))
# a python script to verify activity processing # a python script to verify activity processing
addPythonScript( cls._addPythonScript(
script_id='ERP5Site_verifyActivityProcessing', script_id='ERP5Site_verifyActivityProcessing',
params='mode', params='mode',
body='''if 1: body='''if 1:
...@@ -556,7 +570,7 @@ class ZopeTestMixin(CrontabMixin): ...@@ -556,7 +570,7 @@ class ZopeTestMixin(CrontabMixin):
'ERP5Site_verifyActivityProcessing', 'ERP5Site_verifyActivityProcessing',
) )
# a python script logging to event log # a python script logging to event log
addPythonScript( cls._addPythonScript(
script_id='ERP5Site_logMessage', script_id='ERP5Site_logMessage',
params='name', params='name',
body='''if 1: body='''if 1:
...@@ -569,7 +583,7 @@ class ZopeTestMixin(CrontabMixin): ...@@ -569,7 +583,7 @@ class ZopeTestMixin(CrontabMixin):
'ERP5Site_logMessage', 'ERP5Site_logMessage',
) )
# a python script issuing a long request # a python script issuing a long request
addPythonScript( cls._addPythonScript(
script_id='ERP5Site_executeLongRequest', script_id='ERP5Site_executeLongRequest',
params='', params='',
body='''if 1: body='''if 1:
...@@ -850,3 +864,61 @@ class TestZopeWSGI(ZopeTestMixin, ERP5InstanceTestCase): ...@@ -850,3 +864,61 @@ class TestZopeWSGI(ZopeTestMixin, ERP5InstanceTestCase):
@unittest.expectedFailure @unittest.expectedFailure
def test_basic_authentication_user_in_access_log(self): def test_basic_authentication_user_in_access_log(self):
super(TestZopeWSGI, self).test_basic_authentication_user_in_access_log(self) super(TestZopeWSGI, self).test_basic_authentication_user_in_access_log(self)
class TestZopePublisherTimeout(ZopeSkinsMixin, ERP5InstanceTestCase):
__partition_reference__ = 't'
@classmethod
def getInstanceParameterDict(cls):
return {
'_':
json.dumps({
# a default timeout of 3
"publisher-timeout": 3,
# and a family without timeout
"family-override": {
"no-timeout": {
"publisher-timeout": None,
},
},
"zope-partition-dict": {
"default": {
"family": "default",
},
"no-timeout": {
"family": "no-timeout",
"port-base": 2300,
},
},
})
}
@classmethod
def _setUpClass(cls):
super(TestZopePublisherTimeout, cls)._setUpClass()
cls._addPythonScript(
'ERP5Site_doSlowRequest',
'',
'''if 1:
import time
def recurse(o):
time.sleep(0.1)
for sub in o.objectValues():
recurse(sub)
recurse(context.getPortalObject())
'''
)
def test_long_request_interupted_on_default_family(self):
ret = requests.get(self._getAuthenticatedZopeUrl(
'ERP5Site_doSlowRequest', family_name='default'), verify=False)
self.assertIn('TimeoutReachedError', ret.text)
self.assertEqual(ret.status_code, requests.codes.server_error)
def test_long_request_not_interupted_on_no_timeout_family(self):
with self.assertRaises(requests.exceptions.Timeout):
requests.get(
self._getAuthenticatedZopeUrl('ERP5Site_doSlowRequest', family_name='no-timeout'),
verify=False,
timeout=6)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment