Commit 2f444231 authored by Hanno Schlichting's avatar Hanno Schlichting

Move `ZPublisher.Publish` module into ZServer distribution.

Change Testing to use the WSGI publisher for functional and testbrowser
based tests incl. functional doctests. Alternatives are available
in `ZServer.Testing`.
parent c92eb929
......@@ -31,6 +31,12 @@ Features Added
Restructuring
+++++++++++++
- Change Testing to use the WSGI publisher for functional and testbrowser
based tests incl. functional doctests. Alternatives are available
in `ZServer.Testing`.
- Move `ZPublisher.Publish` module into ZServer distribution.
- Remove `Globals` package, opened database are now found in
`Zope2.opened` next to `Zope2.DB`.
......
......@@ -16,14 +16,16 @@ After Marius Gedminas' functional.py module for Zope3.
"""
import base64
import re
from functools import partial
import sys
import transaction
import sandbox
import interfaces
from zope.interface import implements
from Testing.ZopeTestCase import interfaces
from Testing.ZopeTestCase import sandbox
from Zope2.Startup.httpexceptions import HTTPExceptionHandler
def savestate(func):
'''Decorator saving thread local state before executing func
......@@ -61,8 +63,10 @@ class Functional(sandbox.Sandboxed):
from StringIO import StringIO
from ZPublisher.HTTPRequest import HTTPRequest as Request
from ZPublisher.HTTPResponse import HTTPResponse as Response
from ZPublisher.Publish import publish_module
from ZPublisher.WSGIPublisher import (
publish_module,
WSGIResponse,
)
# Commit the sandbox for good measure
transaction.commit()
......@@ -76,6 +80,7 @@ class Functional(sandbox.Sandboxed):
env['SERVER_NAME'] = request['SERVER_NAME']
env['SERVER_PORT'] = request['SERVER_PORT']
env['SERVER_PROTOCOL'] = 'HTTP/1.1'
env['REQUEST_METHOD'] = request_method
p = path.split('?')
......@@ -93,42 +98,54 @@ class Functional(sandbox.Sandboxed):
stdin = StringIO()
outstream = StringIO()
response = Response(stdout=outstream, stderr=sys.stderr)
response = WSGIResponse(stdout=outstream, stderr=sys.stderr)
request = Request(stdin, env, response)
request.retry_max_count = 0
for k, v in extra.items():
request[k] = v
publish_module('Zope2',
debug=not handle_errors,
request=request,
response=response)
wsgi_headers = StringIO()
return ResponseWrapper(response, outstream, path)
def start_response(status, headers):
wsgi_headers.write('HTTP/1.1 %s\r\n' % status)
headers = '\r\n'.join([': '.join(x) for x in headers])
wsgi_headers.write(headers)
wsgi_headers.write('\r\n\r\n')
publish = partial(publish_module, _request=request, _response=response)
if handle_errors:
publish = HTTPExceptionHandler(publish)
class ResponseWrapper:
'''Decorates a response object with additional introspective methods.'''
wsgi_result = publish(env, start_response)
return ResponseWrapper(response, outstream, path,
wsgi_result, wsgi_headers)
_bodyre = re.compile('\r\n\r\n(.*)', re.MULTILINE | re.DOTALL)
def __init__(self, response, outstream, path):
class ResponseWrapper(object):
'''Decorates a response object with additional introspective methods.'''
def __init__(self, response, outstream, path,
wsgi_result=(), wsgi_headers=''):
self._response = response
self._outstream = outstream
self._path = path
self._wsgi_result = wsgi_result
self._wsgi_headers = wsgi_headers
def __getattr__(self, name):
return getattr(self._response, name)
def __str__(self):
return self.getOutput()
def getOutput(self):
'''Returns the complete output, headers and all.'''
return self._outstream.getvalue()
return self._wsgi_headers.getvalue() + self.getBody()
def getBody(self):
'''Returns the page body, i.e. the output par headers.'''
body = self._bodyre.search(self.getOutput())
if body is not None:
body = body.group(1)
return body
return ''.join(self._wsgi_result)
def getPath(self):
'''Returns the path used by the request.'''
......
......@@ -100,7 +100,7 @@ Test Unauthorized
... """, handle_errors=True))
HTTP/1.1 401 Unauthorized
...
Www-Authenticate: basic realm=...
WWW-Authenticate: basic realm=...
Test Basic Authentication
......
......@@ -15,6 +15,7 @@
import base64
import doctest
from functools import partial
import re
import sys
import warnings
......@@ -32,6 +33,7 @@ from Testing.ZopeTestCase import standard_permissions
from Testing.ZopeTestCase.sandbox import AppZapper
from Testing.ZopeTestCase.functional import ResponseWrapper
from Testing.ZopeTestCase.functional import savestate
from Zope2.Startup.httpexceptions import HTTPExceptionHandler
if sys.version_info >= (3, ):
basestring = str
......@@ -82,16 +84,12 @@ class DocResponseWrapper(ResponseWrapper):
"""Response Wrapper for use in doctests
"""
def __init__(self, response, outstream, path, header_output):
ResponseWrapper.__init__(self, response, outstream, path)
def __init__(self, response, outstream, path, header_output,
wsgi_result=(), wsgi_headers=''):
ResponseWrapper.__init__(self, response, outstream, path,
wsgi_result, wsgi_headers)
self.header_output = header_output
def __str__(self):
body = self.getBody()
if body:
return "%s\n\n%s" % (self.header_output, body)
return "%s\n" % (self.header_output)
basicre = re.compile('Basic (.+)?:(.+)?$')
headerre = re.compile('(\S+): (.+)$')
......@@ -131,8 +129,11 @@ def http(request_string, handle_errors=True):
import urllib
import rfc822
from cStringIO import StringIO
from ZPublisher.HTTPResponse import HTTPResponse as Response
from ZPublisher.Publish import publish_module
from ZPublisher.HTTPRequest import HTTPRequest as Request
from ZPublisher.WSGIPublisher import (
publish_module,
WSGIResponse,
)
# Commit work done by previous python code.
transaction.commit()
......@@ -185,13 +186,24 @@ def http(request_string, handle_errors=True):
env['HTTP_AUTHORIZATION'] = auth_header(env['HTTP_AUTHORIZATION'])
outstream = StringIO()
response = Response(stdout=outstream, stderr=sys.stderr)
response = WSGIResponse(stdout=outstream, stderr=sys.stderr)
request = Request(instream, env, response)
request.retry_max_count = 0
env['wsgi.input'] = instream
wsgi_headers = StringIO()
def start_response(status, headers):
wsgi_headers.write('HTTP/1.1 %s\r\n' % status)
headers = '\r\n'.join([': '.join(x) for x in headers])
wsgi_headers.write(headers)
wsgi_headers.write('\r\n\r\n')
publish = partial(publish_module, _request=request, _response=response)
if handle_errors:
publish = HTTPExceptionHandler(publish)
publish_module('Zope2',
response=response,
stdin=instream,
environ=env,
debug=not handle_errors)
wsgi_result = publish(env, start_response)
header_output.setResponseStatus(response.getStatus(), response.errmsg)
header_output.setResponseHeaders(response.headers)
......@@ -200,7 +212,8 @@ def http(request_string, handle_errors=True):
sync()
return DocResponseWrapper(response, outstream, path, header_output)
return DocResponseWrapper(
response, outstream, path, header_output, wsgi_result, wsgi_headers)
class ZopeSuiteFactory:
......
......@@ -32,6 +32,7 @@ class PublisherConnection(object):
def __init__(self, host, timeout=None):
self.caller = functional.http
self.host = host
self.response = None
def set_debuglevel(self, level):
pass
......@@ -79,35 +80,20 @@ class PublisherConnection(object):
def getresponse(self):
"""Return a ``urllib2`` compatible response.
The goal of ths method is to convert the Zope Publisher's reseponse to
The goal of ths method is to convert the Zope Publisher's response to
a ``urllib2`` compatible response, which is also understood by
mechanize.
"""
real_response = self.response._response
status = real_response.getStatus()
reason = status_reasons[real_response.status]
headers = []
# Convert header keys to camel case. This is basically a copy
# paste from ZPublisher.HTTPResponse
for key, val in real_response.headers.items():
if key.lower() == key:
# only change non-literal header names
key = "%s%s" % (key[:1].upper(), key[1:])
start = 0
l = key.find('-', start)
while l >= start:
key = "%s-%s%s" % (
key[:l], key[l + 1:l + 2].upper(), key[l + 2:])
start = l + 1
l = key.find('-', start)
headers.append((key, val))
# get the cookies, breaking them into tuples for sorting
cookies = real_response._cookie_list()
headers.extend(cookies)
headers.sort()
headers.insert(0, ('Status', "%s %s" % (status, reason)))
headers = '\r\n'.join('%s: %s' % h for h in headers)
content = real_response.body
# Replace HTTP/1.1 200 OK with Status: 200 OK line.
headers = ['Status: %s %s' % (status, reason)]
wsgi_headers = self.response._wsgi_headers.getvalue().split('\r\n')
headers += [line for line in wsgi_headers[1:]]
headers = '\r\n'.join(headers)
content = self.response.getBody()
return PublisherResponse(content, headers, status, reason)
......
......@@ -10,400 +10,32 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Python Object Publisher -- Publish Python objects on web servers
"""
import os
import sys
from thread import allocate_lock
import transaction
from urlparse import urlparse
from six import reraise
from zExceptions import (
HTTPOk,
HTTPRedirection,
Redirect,
from zope.deferredimport import deprecated
# BBB Zope 5.0
deprecated(
'Please import from ZServer.ZPublisher.Publish.',
_default_debug_mode='ZServer.ZPublisher.Publish:_default_debug_mode',
_default_realm='ZServer.ZPublisher.Publish:_default_realm',
call_object='ZServer.ZPublisher.Publish:call_object',
DefaultTransactionsManager=(
'ZServer.ZPublisher.Publish:DefaultTransactionsManager'),
dont_publish_class='ZServer.ZPublisher.Publish:dont_publish_class',
get_module_info='ZServer.ZPublisher.Publish:get_module_info',
missing_name='ZServer.ZPublisher.Publish:missing_name',
publish='ZServer.ZPublisher.Publish:publish',
publish_module='ZServer.ZPublisher.Publish:publish_module',
publish_module_standard=(
'ZServer.ZPublisher.Publish:publish_module_standard'),
set_default_debug_mode=(
'ZServer.ZPublisher.Publish:set_default_debug_mode'),
set_default_authentication_realm=(
'ZServer.ZPublisher.Publish:set_default_authentication_realm'),
)
from zope.event import notify
from zope.publisher.interfaces import ISkinnable
from zope.publisher.interfaces.browser import IBrowserPage
from zope.publisher.skinnable import setDefaultSkin
from zope.security.management import newInteraction, endInteraction
from ZPublisher.mapply import mapply
from ZPublisher import pubevents
from ZPublisher import Retry
from ZPublisher.HTTPRequest import HTTPRequest as Request
from ZPublisher.HTTPResponse import HTTPResponse as Response
def call_object(object, args, request):
return object(*args)
def missing_name(name, request):
if name == 'self':
return request['PARENTS'][0]
request.response.badRequestError(name)
def dont_publish_class(klass, request):
request.response.forbiddenError("class %s" % klass.__name__)
_default_debug_mode = False
_default_realm = None
def set_default_debug_mode(debug_mode):
global _default_debug_mode
_default_debug_mode = debug_mode
def set_default_authentication_realm(realm):
global _default_realm
_default_realm = realm
def publish(request, module_name, after_list, debug=0,
# Optimize:
call_object=call_object,
missing_name=missing_name,
dont_publish_class=dont_publish_class,
mapply=mapply,
):
(bobo_before, bobo_after, object, realm, debug_mode, err_hook,
validated_hook, transactions_manager) = get_module_info(module_name)
parents = None
response = None
try:
notify(pubevents.PubStart(request))
# TODO pass request here once BaseRequest implements IParticipation
newInteraction()
request.processInputs()
request_get = request.get
response = request.response
# First check for "cancel" redirect:
if request_get('SUBMIT', '').strip().lower() == 'cancel':
cancel = request_get('CANCEL_ACTION', '')
if cancel:
# Relative URLs aren't part of the spec, but are accepted by
# some browsers.
for part, base in zip(urlparse(cancel)[:3],
urlparse(request['BASE1'])[:3]):
if not part:
continue
if not part.startswith(base):
cancel = ''
break
if cancel:
raise Redirect(cancel)
after_list[0] = bobo_after
if debug_mode:
response.debug_mode = debug_mode
if realm and not request.get('REMOTE_USER', None):
response.realm = realm
if bobo_before is not None:
bobo_before()
# Get the path list.
# According to RFC1738 a trailing space in the path is valid.
path = request_get('PATH_INFO')
request['PARENTS'] = parents = [object]
if transactions_manager:
transactions_manager.begin()
object = request.traverse(path, validated_hook=validated_hook)
if IBrowserPage.providedBy(object):
request.postProcessInputs()
notify(pubevents.PubAfterTraversal(request))
if transactions_manager:
transactions_manager.recordMetaData(object, request)
ok_exception = None
try:
result = mapply(object, request.args, request,
call_object, 1,
missing_name,
dont_publish_class,
request, bind=1)
except (HTTPOk, HTTPRedirection) as exc:
ok_exception = exc
else:
if result is not response:
response.setBody(result)
notify(pubevents.PubBeforeCommit(request))
if transactions_manager:
transactions_manager.commit()
notify(pubevents.PubSuccess(request))
endInteraction()
if ok_exception:
raise ok_exception
return response
except:
# save in order to give 'PubFailure' the original exception info
exc_info = sys.exc_info()
# DM: provide nicer error message for FTP
sm = None
if response is not None:
sm = getattr(response, "setMessage", None)
if sm is not None:
from asyncore import compact_traceback
cl, val = sys.exc_info()[:2]
sm('%s: %s %s' % (
getattr(cl, '__name__', cl), val,
debug_mode and compact_traceback()[-1] or ''))
# debug is just used by tests (has nothing to do with debug_mode!)
if not debug and err_hook is not None:
retry = False
if parents:
parents = parents[0]
try:
try:
return err_hook(parents, request,
sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2],
)
except Retry:
if not request.supports_retry():
return err_hook(parents, request,
sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2],
)
retry = True
finally:
# Note: 'abort's can fail.
# Nevertheless, we want end request handling.
try:
try:
notify(pubevents.PubBeforeAbort(
request, exc_info, retry))
finally:
if transactions_manager:
transactions_manager.abort()
finally:
endInteraction()
notify(pubevents.PubFailure(request, exc_info, retry))
# Only reachable if Retry is raised and request supports retry.
newrequest = request.retry()
request.close() # Free resources held by the request.
# Set the default layer/skin on the newly generated request
if ISkinnable.providedBy(newrequest):
setDefaultSkin(newrequest)
try:
return publish(newrequest, module_name, after_list, debug)
finally:
newrequest.close()
else:
# Note: 'abort's can fail.
# Nevertheless, we want end request handling.
try:
try:
notify(pubevents.PubBeforeAbort(request, exc_info, False))
finally:
if transactions_manager:
transactions_manager.abort()
finally:
endInteraction()
notify(pubevents.PubFailure(request, exc_info, False))
raise
def publish_module_standard(
module_name,
stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr,
environ=os.environ, debug=0, request=None, response=None):
must_die = 0
status = 200
after_list = [None]
try:
try:
if response is None:
response = Response(stdout=stdout, stderr=stderr)
else:
stdout = response.stdout
# debug is just used by tests (has nothing to do with debug_mode!)
response.handle_errors = not debug
if request is None:
request = Request(stdin, environ, response)
# make sure that the request we hand over has the
# default layer/skin set on it; subsequent code that
# wants to look up views will likely depend on it
if ISkinnable.providedBy(request):
setDefaultSkin(request)
response = publish(request, module_name, after_list, debug=debug)
except (SystemExit, ImportError):
# XXX: Rendered ImportErrors were never caught here because they
# were re-raised as string exceptions. Maybe we should handle
# ImportErrors like all other exceptions. Currently they are not
# re-raised at all, so they don't show up here.
must_die = sys.exc_info()
request.response.exception(1)
except:
# debug is just used by tests (has nothing to do with debug_mode!)
if debug:
raise
request.response.exception()
status = response.getStatus()
if response:
outputBody = getattr(response, 'outputBody', None)
if outputBody is not None:
outputBody()
else:
response = str(response)
if response:
stdout.write(response)
# The module defined a post-access function, call it
if after_list[0] is not None:
after_list[0]()
finally:
if request is not None:
request.close()
if must_die:
# Try to turn exception value into an exit code.
try:
if hasattr(must_die[1], 'code'):
code = must_die[1].code
else:
code = int(must_die[1])
except:
code = must_die[1] and 1 or 0
if hasattr(request.response, '_requestShutdown'):
request.response._requestShutdown(code)
try:
reraise(must_die[0], must_die[1], must_die[2])
finally:
must_die = None
return status
_l = allocate_lock()
def get_module_info(module_name, modules={},
acquire=_l.acquire,
release=_l.release):
if module_name in modules:
return modules[module_name]
if module_name[-4:] == '.cgi':
module_name = module_name[:-4]
acquire()
tb = None
g = globals()
try:
try:
module = __import__(module_name, g, g, ('__doc__',))
# Let the app specify a realm
if hasattr(module, '__bobo_realm__'):
realm = module.__bobo_realm__
elif _default_realm is not None:
realm = _default_realm
else:
realm = module_name
# Check for debug mode
debug_mode = None
if hasattr(module, '__bobo_debug_mode__'):
debug_mode = bool(module.__bobo_debug_mode__)
else:
debug_mode = _default_debug_mode
bobo_before = getattr(module, "__bobo_before__", None)
bobo_after = getattr(module, "__bobo_after__", None)
if hasattr(module, 'bobo_application'):
object = module.bobo_application
elif hasattr(module, 'web_objects'):
object = module.web_objects
else:
object = module
error_hook = getattr(module, 'zpublisher_exception_hook', None)
validated_hook = getattr(module, 'zpublisher_validated_hook', None)
transactions_manager = getattr(
module, 'zpublisher_transactions_manager', None)
if not transactions_manager:
# Create a default transactions manager for use
# by software that uses ZPublisher and ZODB but
# not the rest of Zope.
transactions_manager = DefaultTransactionsManager()
info = (bobo_before, bobo_after, object, realm, debug_mode,
error_hook, validated_hook, transactions_manager)
modules[module_name] = modules[module_name + '.cgi'] = info
return info
except Exception:
t, v, tb = sys.exc_info()
reraise(t, str(v), tb)
finally:
tb = None
release()
class DefaultTransactionsManager:
def begin(self):
transaction.begin()
def commit(self):
transaction.commit()
def abort(self):
transaction.abort()
def recordMetaData(self, object, request):
# Is this code needed?
request_get = request.get
T = transaction.get()
T.note(request_get('PATH_INFO'))
auth_user = request_get('AUTHENTICATED_USER', None)
if auth_user is not None:
T.setUser(auth_user, request_get('AUTHENTICATION_PATH'))
def publish_module(module_name,
stdin=sys.stdin, stdout=sys.stdout, stderr=sys.stderr,
environ=os.environ, debug=0, request=None, response=None):
""" publish a Python module """
return publish_module_standard(module_name, stdin, stdout, stderr,
environ, debug, request, response)
# BBB Zope 5.0
deprecated(
'Please import from ZPublisher.',
Retry='ZPublisher:Retry',
)
......@@ -304,21 +304,30 @@ _request_closer_for_repoze_tm = _RequestCloserForTransaction()
def publish_module(environ, start_response,
_publish=publish, # only for testing
_response_factory=WSGIResponse, # only for testing
_request_factory=HTTPRequest, # only for testing
_publish=publish, # only for testing
_response=None,
_response_factory=WSGIResponse,
_request=None,
_request_factory=HTTPRequest,
module_name='Zope2',
):
module_info = get_module_info()
module_info = get_module_info(module_name)
transactions_manager = module_info[7]
status = 200
stdout = StringIO()
stderr = StringIO()
response = _response_factory(stdout=stdout, stderr=stderr)
if _response is None:
response = _response_factory(stdout=stdout, stderr=stderr)
else:
response = _response
response._http_version = environ['SERVER_PROTOCOL'].split('/')[1]
response._server_version = environ.get('SERVER_SOFTWARE')
request = _request_factory(environ['wsgi.input'], environ, response)
if _request is None:
request = _request_factory(environ['wsgi.input'], environ, response)
else:
request = _request
repoze_tm_active = 'repoze.tm.active' in environ
......
Exception handling
------------------
These tests capture the current behavior. Maybe some of that behavior should
be changed. The behavior caused by handleErrors=False shows only up in tests.
Create the browser object we'll be using.
>>> from Testing.testbrowser import Browser
>>> browser = Browser()
>>> # XXX: browser has no API for disabling redirects
>>> browser.mech_browser.set_handle_redirect(False)
Create the objects that are raising exceptions.
>>> dummy = app.test_folder_1_._setObject('foo', ExceptionRaiser1())
>>> dummy = app.test_folder_1_._setObject('bar', ExceptionRaiser2())
>>> dummy = app.test_folder_1_._setObject('baz', ExceptionRaiser3())
Handle AttributeError.
>>> app.test_folder_1_.foo.exception = AttributeError('ERROR VALUE')
>>> browser.handleErrors = True
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
HTTPError: HTTP Error 500: Internal Server Error
>>> browser.handleErrors = False
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
AttributeError: ERROR VALUE
>>> browser.contents
Handle ImportError.
>>> app.test_folder_1_.foo.exception = ImportError('ERROR VALUE')
>>> browser.handleErrors = True
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
HTTPError: HTTP Error 500: Internal Server Error
>>> browser.handleErrors = False
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
ImportError: ERROR VALUE
>>> browser.contents
Handle zope.publisher.interfaces.NotFound.
>>> from zope.publisher.interfaces import NotFound
>>> app.test_folder_1_.foo.exception = NotFound('OBJECT','NAME')
>>> browser.handleErrors = True
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
HTTPError: HTTP Error 404: Not Found
>>> browser.handleErrors = False
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
NotFound: Object: 'OBJECT', name: 'NAME'
>>> browser.contents
Don't handle SystemExit, even if handleErrors is True.
>>> app.test_folder_1_.foo.exception = SystemExit('ERROR VALUE')
>>> browser.handleErrors = True
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
SystemExit: ERROR VALUE
>>> browser.contents
>>> browser.handleErrors = False
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
SystemExit: ERROR VALUE
>>> browser.contents
Handle zExceptions.Redirect.
>>> from zExceptions import Redirect
>>> app.test_folder_1_.foo.exception = Redirect('LOCATION')
>>> browser.handleErrors = True
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
HTTPError: HTTP Error 302: Found
>>> browser.contents
''
>>> browser.headers['Location']
'LOCATION'
>>> browser.handleErrors = False
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
Redirect: LOCATION
>>> browser.contents
Handle zExceptions.Unauthorized raised by the object. We take the
'WWW-Authenticate' header as a sign that HTTPResponse._unauthorized was called.
>>> from zExceptions import Unauthorized
>>> app.test_folder_1_.foo.exception = Unauthorized('ERROR VALUE')
>>> browser.handleErrors = True
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
HTTPError: HTTP Error 401: Unauthorized
>>> browser.headers['WWW-Authenticate']
'basic realm="Zope2"'
>>> browser.handleErrors = False
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
Unauthorized: ERROR VALUE
>>> browser.contents
And the same with unicode error value.
>>> app.test_folder_1_.foo.exception = Unauthorized(u'ERROR VALUE \u03A9')
>>> browser.handleErrors = True
>>> browser.open('http://localhost/test_folder_1_/foo')
Traceback (most recent call last):
...
HTTPError: HTTP Error 401: Unauthorized
>>> browser.headers['WWW-Authenticate']
'basic realm="Zope2"'
>>> browser.handleErrors = False
>>> try:
... browser.open('http://localhost/test_folder_1_/foo')
... except Unauthorized, e:
... e._message == u'ERROR VALUE \u03A9'
... else:
... print "Unauthorized not raised"
True
>>> browser.contents
Handle zExceptions.Unauthorized raised by BaseRequest.traverse. We take the
'WWW-Authenticate' header as a sign that HTTPResponse._unauthorized was called.
>>> browser.handleErrors = True
>>> browser.open('http://localhost/test_folder_1_/bar')
Traceback (most recent call last):
...
HTTPError: HTTP Error 401: Unauthorized
>>> 'You are not authorized to access this resource.' in browser.contents
True
>>> browser.headers['WWW-Authenticate']
'basic realm="Zope2"'
>>> browser.handleErrors = False
>>> browser.open('http://localhost/test_folder_1_/bar')
Traceback (most recent call last):
...
Unauthorized: You are not authorized to access this resource...
>>> browser.contents
Handle zExceptions.Forbidden raised by BaseRequest.traverse. 'traverse'
converts it into zExceptions.NotFound if we are not in debug mode.
>>> browser.handleErrors = True
>>> browser.open('http://localhost/test_folder_1_/baz')
Traceback (most recent call last):
...
HTTPError: HTTP Error 404: Not Found
>>> '<p><strong>Resource not found</strong></p>' in browser.contents
True
>>> '<p><b>Resource:</b> index_html</p>' in browser.contents
True
>>> browser.handleErrors = False
>>> browser.open('http://localhost/test_folder_1_/baz')
Traceback (most recent call last):
...
NotFound: <html>
...<h2>Site Error</h2>
...<p><strong>Resource not found</strong></p>...
...<p><b>Resource:</b> index_html</p>...
>>> browser.contents
import doctest
from zope.interface import implements
from zope.publisher.interfaces.browser import IDefaultBrowserLayer
from zope.publisher.interfaces.browser import IBrowserRequest
from ZPublisher import Retry
from ZODB.POSException import ConflictError
class Tracer:
"""Trace used to record pathway taken through the publisher
machinery. And provide framework for spewing out exceptions at
just the right time.
"""
def __init__(self):
self.reset()
def reset(self):
self.tracedPath = []
self.exceptions = {}
def append(self, arg):
self.tracedPath.append(arg)
def showTracedPath(self):
for arg in self.tracedPath:
print(arg)
def possiblyRaiseException(self, context):
exceptions = tracer.exceptions.get(context, None)
if exceptions:
exception = exceptions[0]
exceptions.remove(exception)
exceptionShortName = exception.__name__ # KISS
exceptionShortName = exceptionShortName.split("'")[0]
self.append('raising %s from %s' % (exceptionShortName, context))
raise exception
tracer = Tracer()
class TransactionsManager:
"""Mock TransactionManager to replace
Zope2.App.startup.TransactionsManager.
"""
def abort(self):
tracer.append('abort')
def begin(self):
tracer.append('begin')
def commit(self):
tracer.append('commit')
tracer.possiblyRaiseException('commit')
def recordMetaData(self, obj, request):
pass
zpublisher_transactions_manager = TransactionsManager()
def zpublisher_exception_hook(published, request, t, v, traceback):
"""Mock zpublisher_exception_hook to replace
Zope2.App.startup.zpublisher_exception_hook
"""
if issubclass(t, ConflictError):
raise Retry(t, v, traceback)
if t is Retry:
v.reraise()
tracer.append('zpublisher_exception_hook')
tracer.possiblyRaiseException('zpublisher_exception_hook')
return 'zpublisher_exception_hook'
class Object:
"""Mock object for traversing to.
"""
def __call__(self):
tracer.append('__call__')
tracer.possiblyRaiseException('__call__')
return '__call__'
class Response:
"""Mock Response to replace ZPublisher.HTTPResponse.HTTPResponse.
"""
def setBody(self, a):
pass
class Request:
"""Mock Request to replace ZPublisher.HTTPRequest.HTTPRequest.
"""
implements(IBrowserRequest)
args = ()
def __init__(self):
self.response = Response()
def processInputs(self):
pass
def get(self, a, b=''):
return ''
def __setitem__(self, name, value):
pass
def traverse(self, path, validated_hook):
return Object()
def close(self):
pass
retry_count = 0
retry_max_count = 3
def supports_retry(self):
return self.retry_count < self.retry_max_count
def retry(self):
self.retry_count += 1
r = self.__class__()
r.retry_count = self.retry_count
return r
class RequestWithSkinCheck(Request):
def traverse(self, path, validated_hook):
if IDefaultBrowserLayer.providedBy(self):
return Object()
else:
tracer.exceptions['__call__'] = [ValueError]
return Object()
module_name = __name__
after_list = [None]
def testPublisher():
"""
Tests to ensure that the ZPublisher correctly manages the ZODB
transaction boundaries.
>>> from ZPublisher.Publish import publish
ZPublisher will commit the transaction after it has made a
rendering of the object.
>>> tracer.reset()
>>> request = Request()
>>> response = publish(request, module_name, after_list)
>>> tracer.showTracedPath()
begin
__call__
commit
If ZPublisher sees an exception when rendering the requested
object then it will try rendering an error message. The
transaction is eventually aborted after rendering the error
message. (Note that this handling of the transaction boundaries is
different to how Zope3 does things. Zope3 aborts the transaction
before rendering the error message.)
>>> tracer.reset()
>>> tracer.exceptions['__call__'] = [ValueError]
>>> request = Request()
>>> response = publish(request, module_name, after_list)
>>> tracer.showTracedPath()
begin
__call__
raising ValueError from __call__
zpublisher_exception_hook
abort
If there is a futher exception raised while trying to render the
error then ZPublisher is still required to abort the
transaction. And the exception propagates out of publish.
>>> tracer.reset()
>>> tracer.exceptions['__call__'] = [ValueError]
>>> tracer.exceptions['zpublisher_exception_hook'] = [ValueError]
>>> request = Request()
>>> response = publish(request, module_name, after_list)
Traceback (most recent call last):
...
ValueError
>>> tracer.showTracedPath()
begin
__call__
raising ValueError from __call__
zpublisher_exception_hook
raising ValueError from zpublisher_exception_hook
abort
ZPublisher can also deal with database ConflictErrors. The original
transaction is aborted and a second is made in which the request
is attempted again. (There is a fair amount of collaboration to
implement the retry functionality. Relies on Request and
zpublisher_exception_hook also doing the right thing.)
>>> tracer.reset()
>>> tracer.exceptions['__call__'] = [ConflictError]
>>> request = Request()
>>> response = publish(request, module_name, after_list)
>>> tracer.showTracedPath()
begin
__call__
raising ConflictError from __call__
abort
begin
__call__
commit
Same behaviour if there is a conflict when attempting to commit
the transaction. (Again this relies on collaboration from
zpublisher_exception_hook.)
>>> tracer.reset()
>>> tracer.exceptions['commit'] = [ConflictError]
>>> request = Request()
>>> response = publish(request, module_name, after_list)
>>> tracer.showTracedPath()
begin
__call__
commit
raising ConflictError from commit
abort
begin
__call__
commit
ZPublisher will retry the request several times. After 3 retries it
gives up and the exception propogates out.
>>> tracer.reset()
>>> tracer.exceptions['__call__'] = [ConflictError, ConflictError,
... ConflictError, ConflictError]
>>> request = Request()
>>> response = publish(request, module_name, after_list)
Traceback (most recent call last):
...
ConflictError: database conflict error
>>> tracer.showTracedPath()
begin
__call__
raising ConflictError from __call__
abort
begin
__call__
raising ConflictError from __call__
abort
begin
__call__
raising ConflictError from __call__
abort
begin
__call__
raising ConflictError from __call__
abort
However ZPublisher does not retry ConflictErrors that are raised
while trying to render an error message.
>>> tracer.reset()
>>> tracer.exceptions['__call__'] = [ValueError]
>>> tracer.exceptions['zpublisher_exception_hook'] = [ConflictError]
>>> request = Request()
>>> response = publish(request, module_name, after_list)
Traceback (most recent call last):
...
ConflictError: database conflict error
>>> tracer.showTracedPath()
begin
__call__
raising ValueError from __call__
zpublisher_exception_hook
raising ConflictError from zpublisher_exception_hook
abort
The request generator applies the default skin layer to the request.
We have a specially crafted request that tests this. If the
request does not have the required interface it raises an
ValueError. Let's see that this works as expected
>>> tracer.reset()
>>> request = RequestWithSkinCheck()
>>> from zope.publisher.skinnable import setDefaultSkin
>>> setDefaultSkin(request)
>>> response = publish(request, module_name, after_list)
>>> tracer.showTracedPath()
begin
__call__
commit
Retries generate new request objects, the publisher needs to
ensure that the skin layer is applied to those as well. If the
skin layer is not applied to subsequent requests, an ValueError
would be raised here.
>>> tracer.reset()
>>> tracer.exceptions['commit'] = [ConflictError, ConflictError,
... ConflictError, ConflictError]
>>> request = RequestWithSkinCheck()
>>> setDefaultSkin(request)
>>> response = publish(request, module_name, after_list)
Traceback (most recent call last):
...
ConflictError: database conflict error
>>> tracer.showTracedPath()
begin
__call__
commit
raising ConflictError from commit
abort
begin
__call__
commit
raising ConflictError from commit
abort
begin
__call__
commit
raising ConflictError from commit
abort
begin
__call__
commit
raising ConflictError from commit
abort
"""
pass
class ObjectNotFound:
"""Mock object for traversing to.
"""
def __call__(self):
tracer.append('ObjectNotFound')
return 'ObjectNotFound'
class PathRequest(Request):
def __init__(self, path):
self.PATH_INFO = path
Request.__init__(self)
def get(self, a, b=''):
if a == 'PATH_INFO':
return self.PATH_INFO
else:
return ''
def traverse(self, path, validated_hook):
if path == self.PATH_INFO:
return Object()
else:
return ObjectNotFound()
def testPublishPath():
"""
Tests to ensure that publish passes paths through to the request without
stripping spaces (as this can lead to google indexing pages with a trailing
space when someone has a typo in an href to you're site). Zope bug #1991.
>>> from ZPublisher.Publish import publish
Without the trailing space, should work normally
>>> tracer.reset()
>>> request = PathRequest('/foo')
>>> response = publish(request, module_name, after_list)
>>> tracer.showTracedPath()
begin
__call__
commit
Now with a trailing space, should also work normally, but in zope 2.9.0
and earlier publish did a strip() on the path so instead of __call__ you
an ObjectNotFound in the trace.
>>> tracer.reset()
>>> request = PathRequest('/foo ')
>>> response = publish(request, module_name, after_list)
>>> tracer.showTracedPath()
begin
__call__
commit
"""
def test_suite():
return doctest.DocTestSuite()
##############################################################################
#
# Copyright (c) 2010 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Functional tests for exception handling.
"""
import unittest
from Testing.ZopeTestCase import FunctionalDocFileSuite
from OFS.SimpleItem import SimpleItem
class ExceptionRaiser1(SimpleItem):
def index_html(self):
"""DOCSTRING
"""
raise self.exception
class ExceptionRaiser2(ExceptionRaiser1):
__roles__ = ()
class ExceptionRaiser3(SimpleItem):
def index_html(self):
return 'NO DOCSTRING'
def test_suite():
return unittest.TestSuite([
FunctionalDocFileSuite(
'exception_handling.txt',
globs={
'ExceptionRaiser1': ExceptionRaiser1,
'ExceptionRaiser2': ExceptionRaiser2,
'ExceptionRaiser3': ExceptionRaiser3,
}),
])
......@@ -6,9 +6,7 @@ from ZODB.POSException import ConflictError
from zope.interface.verify import verifyObject
from zope.event import subscribers
from ZPublisher.Publish import publish, Retry
from ZPublisher.BaseRequest import BaseRequest
from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.pubevents import (
PubStart, PubSuccess, PubFailure,
PubAfterTraversal, PubBeforeCommit, PubBeforeAbort,
......@@ -19,6 +17,10 @@ from ZPublisher.interfaces import (
IPubAfterTraversal, IPubBeforeCommit,
IPubBeforeStreaming,
)
from ZPublisher import Retry
from ZPublisher.WSGIPublisher import publish_module
from ZPublisher.WSGIPublisher import WSGIResponse
PUBMODULE = 'TEST_testpubevents'
......@@ -71,85 +73,77 @@ class TestPubEvents(TestCase):
del modules[PUBMODULE]
subscribers[:] = self._saved_subscribers
def _publish(self, request, module_name):
def start_response(status, headers):
pass
publish_module({
'SERVER_PROTOCOL': 'HTTP/1.1',
'SERVER_NAME': 'localhost',
'SERVER_PORT': 'localhost',
'REQUEST_METHOD': 'GET',
}, start_response, _request=request, module_name=module_name)
def testSuccess(self):
r = self.request
r.action = 'succeed'
publish(r, PUBMODULE, [None])
self._publish(r, PUBMODULE)
events = self.reporter.events
self.assertEqual(len(events), 4)
self.assert_(isinstance(events[0], PubStart))
self.assertEqual(events[0].request, r)
self.assert_(isinstance(events[-1], PubSuccess))
self.assertEqual(events[-1].request, r)
# test AfterTraversal and BeforeCommit as well
self.assert_(isinstance(events[1], PubAfterTraversal))
self.assertEqual(events[1].request, r)
self.assert_(isinstance(events[2], PubBeforeCommit))
self.assertEqual(events[2].request, r)
self.assert_(isinstance(events[3], PubSuccess))
self.assertEqual(events[3].request, r)
def testFailureReturn(self):
r = self.request
r.action = 'fail_return'
publish(r, PUBMODULE, [None])
self.assertRaises(Exception, self._publish, r, PUBMODULE)
events = self.reporter.events
self.assertEqual(len(events), 3)
self.assert_(isinstance(events[0], PubStart))
self.assertEqual(events[0].request, r)
self.assert_(isinstance(events[1], PubBeforeAbort))
self.assertEqual(events[1].request, r)
self.assertEqual(events[1].retry, False)
self.assert_(isinstance(events[2], PubFailure))
self.assertEqual(events[2].request, r)
self.assertEqual(events[2].retry, False)
self.assertEqual(len(events[2].exc_info), 3)
def testFailureException(self):
r = self.request
r.action = 'fail_exception'
self.assertRaises(Exception, publish, r, PUBMODULE, [None])
self.assertRaises(Exception, self._publish, r, PUBMODULE)
events = self.reporter.events
self.assertEqual(len(events), 3)
self.assert_(isinstance(events[0], PubStart))
self.assertEqual(events[0].request, r)
self.assert_(isinstance(events[1], PubBeforeAbort))
self.assertEqual(events[1].request, r)
self.assertEqual(events[1].retry, False)
self.assertEqual(len(events[1].exc_info), 3)
self.assert_(isinstance(events[2], PubFailure))
self.assertEqual(events[2].request, r)
self.assertEqual(events[2].retry, False)
self.assertEqual(len(events[2].exc_info), 3)
def testFailureConflict(self):
r = self.request
r.action = 'conflict'
publish(r, PUBMODULE, [None])
self.assertRaises(ConflictError, self._publish, r, PUBMODULE)
events = self.reporter.events
self.assertEqual(len(events), 7)
self.assert_(isinstance(events[0], PubStart))
self.assertEqual(events[0].request, r)
self.assert_(isinstance(events[1], PubBeforeAbort))
self.assertEqual(events[1].request, r)
self.assertEqual(events[1].retry, True)
self.assertEqual(len(events[1].exc_info), 3)
self.assert_(isinstance(events[1].exc_info[1], ConflictError))
self.assert_(isinstance(events[2], PubFailure))
self.assertEqual(events[2].request, r)
self.assertEqual(events[2].retry, True)
self.assertEqual(len(events[2].exc_info), 3)
self.assert_(isinstance(events[2].exc_info[1], ConflictError))
self.assert_(isinstance(events[3], PubStart))
self.assert_(isinstance(events[4], PubAfterTraversal))
self.assert_(isinstance(events[5], PubBeforeCommit))
self.assert_(isinstance(events[6], PubSuccess))
def testStreaming(self):
out = StringIO()
response = HTTPResponse(stdout=out)
response = WSGIResponse(stdout=out)
response.write('datachunk1')
response.write('datachunk2')
......@@ -184,7 +178,7 @@ class _Response(object):
class _Request(BaseRequest):
response = _Response()
response = WSGIResponse()
_hacked_path = False
args = ()
......@@ -193,14 +187,6 @@ class _Request(BaseRequest):
self['PATH_INFO'] = self['URL'] = ''
self.steps = []
def supports_retry(self):
return True
def retry(self):
r = self.__class__()
r.action = 'succeed'
return r
def traverse(self, *unused, **unused_kw):
action = self.action
if action.startswith('fail'):
......@@ -216,8 +202,28 @@ class _Request(BaseRequest):
# override to get rid of the 'EndRequestEvent' notification
pass
class _TransactionsManager(object):
def __init__(self, *args, **kw):
self.tracer = []
def abort(self):
self.tracer.append('abort')
def begin(self):
self.tracer.append('begin')
def commit(self):
self.tracer.append('commit')
def recordMetaData(self, obj, request):
pass
# define things necessary for publication
bobo_application = _Application()
zpublisher_transactions_manager = _TransactionsManager()
def zpublisher_exception_hook(parent, request, *unused):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment