Commit d02792d6 authored by Hanno Schlichting's avatar Hanno Schlichting

Add support for exception views to WSGIPublisher.

parent af09790a
......@@ -16,6 +16,8 @@ Bugs Fixed
Features Added
++++++++++++++
- Add support for exception views to WSGIPublisher.
- Add support for ConflictError and TransientError retry logic directly
into WSGIPublisher.
......
......@@ -28,6 +28,7 @@ from zExceptions import (
Unauthorized,
)
from ZODB.POSException import ConflictError
from zope.component import queryMultiAdapter
from zope.event import notify
from zope.security.management import newInteraction, endInteraction
from zope.publisher.skinnable import setDefaultSkin
......@@ -266,12 +267,25 @@ def _publish_response(request, response, module_info, _publish=publish):
try:
with transaction_pubevents(request):
response = _publish(request, module_info)
except Unauthorized as exc:
response._unauthorized(exc)
except HTTPRedirection as exc:
# TODO: HTTPOk is only handled by the httpexceptions
# middleware, maybe it should be handled here.
response.redirect(exc)
except Exception as exc:
view = queryMultiAdapter((exc, request), name=u'index.html')
if view is not None:
parents = request.get('PARENTS')
if parents:
view.__parent__ = parents[0]
response.setStatus(exc.__class__)
response.setBody(view())
return response
if isinstance(exc, Unauthorized):
response._unauthorized(exc)
return response
raise
return response
......
......@@ -12,6 +12,12 @@
##############################################################################
import unittest
from zope.component.testing import PlacelessSetup
from zope.interface.common.interfaces import IException
from zope.publisher.interfaces import INotFound
from zope.security.interfaces import IUnauthorized
from zope.security.interfaces import IForbidden
from ZPublisher.WSGIPublisher import get_module_info
......@@ -180,13 +186,15 @@ class TestPublish(unittest.TestCase):
self.assertEqual(response.realm, None)
class TestPublishModule(unittest.TestCase):
class TestPublishModule(unittest.TestCase, PlacelessSetup):
def setUp(self):
from zope.testing.cleanup import cleanUp
cleanUp()
PlacelessSetup.setUp(self)
def tearDown(self):
PlacelessSetup.tearDown(self)
from zope.testing.cleanup import cleanUp
cleanUp()
......@@ -400,6 +408,108 @@ class TestPublishModule(unittest.TestCase):
_request_factory=_request_factory)
self.assertTrue(_request._closed)
def testCustomExceptionViewUnauthorized(self):
from AccessControl import Unauthorized
registerExceptionView(IUnauthorized)
environ = self._makeEnviron()
start_response = DummyCallable()
_publish = DummyCallable()
_publish._raise = Unauthorized('argg')
app_iter = self._callFUT(environ, start_response, _publish)
body = ''.join(app_iter)
self.assertEqual(start_response._called_with[0][0], '401 Unauthorized')
self.assertTrue('Exception View: Unauthorized' in body)
def testCustomExceptionViewForbidden(self):
from zExceptions import Forbidden
registerExceptionView(IForbidden)
environ = self._makeEnviron()
start_response = DummyCallable()
_publish = DummyCallable()
_publish._raise = Forbidden('argh')
app_iter = self._callFUT(environ, start_response, _publish)
body = ''.join(app_iter)
self.assertEqual(start_response._called_with[0][0], '403 Forbidden')
self.assertTrue('Exception View: Forbidden' in body)
def testCustomExceptionViewNotFound(self):
from zExceptions import NotFound
registerExceptionView(INotFound)
environ = self._makeEnviron()
start_response = DummyCallable()
_publish = DummyCallable()
_publish._raise = NotFound('argh')
app_iter = self._callFUT(environ, start_response, _publish)
body = ''.join(app_iter)
self.assertEqual(start_response._called_with[0][0], '404 Not Found')
self.assertTrue('Exception View: NotFound' in body)
def testCustomExceptionViewBadRequest(self):
from zExceptions import BadRequest
registerExceptionView(IException)
environ = self._makeEnviron()
start_response = DummyCallable()
_publish = DummyCallable()
_publish._raise = BadRequest('argh')
app_iter = self._callFUT(environ, start_response, _publish)
body = ''.join(app_iter)
self.assertEqual(start_response._called_with[0][0], '400 Bad Request')
self.assertTrue('Exception View: BadRequest' in body)
def testCustomExceptionViewInternalError(self):
from zExceptions import InternalError
registerExceptionView(IException)
environ = self._makeEnviron()
start_response = DummyCallable()
_publish = DummyCallable()
_publish._raise = InternalError('argh')
app_iter = self._callFUT(environ, start_response, _publish)
body = ''.join(app_iter)
self.assertEqual(
start_response._called_with[0][0], '500 Internal Server Error')
self.assertTrue('Exception View: InternalError' in body)
def testRedirectNoExceptionView(self):
from zExceptions import Redirect
registerExceptionView(IException)
environ = self._makeEnviron()
start_response = DummyCallable()
_publish = DummyCallable()
_publish._raise = Redirect('http://localhost:9/')
app_iter = self._callFUT(environ, start_response, _publish)
body = ''.join(app_iter)
self.assertEqual(body, '')
status, headers = start_response._called_with[0]
self.assertEqual(status, '302 Found')
headers = dict(headers)
self.assertEqual(headers['Location'], 'http://localhost:9/')
class CustomExceptionView(object):
def __init__(self, context, request):
self.context = context
self.__parent__ = None
self.request = request
def __call__(self):
return ('Exception View: %s\nContext: %s' % (
self.context.__class__.__name__,
self.__parent__.__class__.__name__))
def registerExceptionView(for_):
from zope.interface import Interface
from zope.component import getGlobalSiteManager
from zope.publisher.interfaces.browser import IDefaultBrowserLayer
gsm = getGlobalSiteManager()
gsm.registerAdapter(
CustomExceptionView,
required=(for_, IDefaultBrowserLayer),
provided=Interface,
name=u'index.html',
)
class DummyRequest(dict):
_processedInputs = False
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment