Commit 9ceaa3f2 authored by Jérome Perrin's avatar Jérome Perrin

ERP5TypeTestCase: update .publish() patch with Zope4 version

This changes slightly getOutput, `Status` is not printed as an header
(Status: 200 OK) but only in the status line (HTTP/1.1 200 OK)
parent 3e81bc25
...@@ -2085,7 +2085,7 @@ return 1 ...@@ -2085,7 +2085,7 @@ return 1
for credential in ['ERP5TypeTestCase:', 'zope_user:']: for credential in ['ERP5TypeTestCase:', 'zope_user:']:
response = self.publish('%s/%s' %(document.getPath(), object_url), response = self.publish('%s/%s' %(document.getPath(), object_url),
basic=credential) basic=credential)
self.assertIn('Status: 200 OK', response.getOutput()) self.assertIn('200 OK', response.getOutput())
# OOod produced HTML navigation, test it # OOod produced HTML navigation, test it
self.assertIn('First page', response.getBody()) self.assertIn('First page', response.getBody())
self.assertIn('Back', response.getBody()) self.assertIn('Back', response.getBody())
......
...@@ -21,7 +21,8 @@ import traceback ...@@ -21,7 +21,8 @@ import traceback
import urllib import urllib
import ConfigParser import ConfigParser
from contextlib import contextmanager from contextlib import contextmanager
from six.moves import cStringIO as StringIO from io import BytesIO
from functools import partial
from six.moves.urllib.parse import unquote_to_bytes from six.moves.urllib.parse import unquote_to_bytes
from cPickle import dumps from cPickle import dumps
from glob import glob from glob import glob
...@@ -751,8 +752,10 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase): ...@@ -751,8 +752,10 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
request_method='GET', stdin=None, handle_errors=True): request_method='GET', stdin=None, handle_errors=True):
'''Publishes the object at 'path' returning a response object.''' '''Publishes the object at 'path' returning a response object.'''
from ZPublisher.Response import Response from ZPublisher.HTTPResponse import WSGIResponse
from ZPublisher.Publish import publish_module_standard from ZPublisher.WSGIPublisher import publish_module
from ZPublisher.httpexceptions import HTTPExceptionHandler
from Testing.ZopeTestCase.functional import ResponseWrapper
from AccessControl.SecurityManagement import getSecurityManager from AccessControl.SecurityManagement import getSecurityManager
from AccessControl.SecurityManagement import setSecurityManager from AccessControl.SecurityManagement import setSecurityManager
...@@ -771,6 +774,7 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase): ...@@ -771,6 +774,7 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
env['SERVER_NAME'] = request['SERVER_NAME'] env['SERVER_NAME'] = request['SERVER_NAME']
env['SERVER_PORT'] = request['SERVER_PORT'] env['SERVER_PORT'] = request['SERVER_PORT']
env['HTTP_ACCEPT_CHARSET'] = request['HTTP_ACCEPT_CHARSET'] env['HTTP_ACCEPT_CHARSET'] = request['HTTP_ACCEPT_CHARSET']
env['SERVER_PROTOCOL'] = 'HTTP/1.1'
env['REQUEST_METHOD'] = request_method env['REQUEST_METHOD'] = request_method
query = '' query = ''
...@@ -798,10 +802,10 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase): ...@@ -798,10 +802,10 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
return orig_extractUserIds(pas, request, plugins) return orig_extractUserIds(pas, request, plugins)
if stdin is None: if stdin is None:
stdin = StringIO() stdin = BytesIO()
outstream = StringIO() outstream = BytesIO()
response = Response(stdout=outstream, stderr=sys.stderr) response = WSGIResponse(stdout=outstream, stderr=sys.stderr)
try: try:
if user: if user:
...@@ -815,13 +819,29 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase): ...@@ -815,13 +819,29 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
if extra: if extra:
for k, v in six.iteritems(extra): request[k] = v for k, v in six.iteritems(extra): request[k] = v
publish_module_standard('Zope2', wsgi_headers = BytesIO()
request=request,
response=response, def start_response(status, headers):
stdin=stdin, # Keep the fake response in-sync with the actual values
environ=env, # from the WSGI start_response call.
debug=not handle_errors, response.setStatus(status.split()[0])
) for key, value in headers:
response.setHeader(key, value)
wsgi_headers.write(
b'HTTP/1.1 ' + status.encode('ascii') + b'\r\n')
headers = b'\r\n'.join([
(k + ': ' + v).encode('ascii') for k, v in headers])
wsgi_headers.write(headers)
wsgi_headers.write(b'\r\n\r\n')
publish = partial(publish_module, _request=request, _response=response)
if handle_errors:
publish = HTTPExceptionHandler(publish)
wsgi_result = publish(env, start_response)
finally: finally:
if user: if user:
PAS._extractUserIds = orig_extractUserIds PAS._extractUserIds = orig_extractUserIds
...@@ -831,12 +851,12 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase): ...@@ -831,12 +851,12 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
setSite(self.portal) setSite(self.portal)
# Make sure that the skin cache does not have objects that were # Make sure that the skin cache does not have objects that were
# loaded with the connection used by the requested url. # loaded with the connection used by the requested url.
self.changeSkin(self.portal.getCurrentSkinName()) self.changeSkin(self.portal.getCurrentSkinName())
return ResponseWrapper(response, outstream, path) return ResponseWrapper(response, outstream, path,
wsgi_result, wsgi_headers)
def getConsistencyMessageList(self, obj): def getConsistencyMessageList(self, obj):
return sorted([ str(message.getMessage()) return sorted([ str(message.getMessage())
...@@ -1426,45 +1446,6 @@ ERP5Site.getBootstrapBusinessTemplateUrl = lambda bt_title: \ ...@@ -1426,45 +1446,6 @@ ERP5Site.getBootstrapBusinessTemplateUrl = lambda bt_title: \
ERP5TypeCommandLineTestCase._getBTPathAndIdList((bt_title,))[0][0] ERP5TypeCommandLineTestCase._getBTPathAndIdList((bt_title,))[0][0]
class ResponseWrapper:
'''Decorates a response object with additional introspective methods.'''
_headers_separator_re = re.compile('(?:\r?\n){2}')
def __init__(self, response, outstream, path):
self._response = response
self._outstream = outstream
self._path = path
def __getattr__(self, name):
return getattr(self._response, name)
def getOutput(self):
'''Returns the complete output, headers and all.'''
return self._outstream.getvalue()
def getBody(self):
'''Returns the page body, i.e. the output par headers.'''
output = self.getOutput()
try:
headers, body = self._headers_separator_re.split(output, 1)
return body
except ValueError:
# not enough values to unpack: no body
return None
def getPath(self):
'''Returns the path used by the request.'''
return self._path
def getHeader(self, name):
'''Returns the value of a response header.'''
return self.headers.get(name.lower())
def getCookie(self, name):
'''Returns a response cookie.'''
return self.cookies.get(name)
class ERP5ReportTestCase(ERP5TypeTestCase): class ERP5ReportTestCase(ERP5TypeTestCase):
"""Base class for testing ERP5 Reports """Base class for testing ERP5 Reports
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment