Commit 1a36e9fb authored by Tres Seaver's avatar Tres Seaver

Merge r112823-112876 from the tseaver-fix_wsgi branch.

- Full test coverage for ZPublisher.WSGIPublisher.

- Add 'add_user' script and finder help, borrowed from 'repoze.zope2'.

- Add call to 'setDefaultSkin(request)' to fix view lookups.

- Override the 'write' method in 'WSGIHTTPReponse' to avoid inappropriate
  stringification, allowing things like the Plone resource registry to work
  properly.

- Defer closing the request until the transaction commits, if and only if
  we know that middleware is handling the transaction.

- Make the WSGI publish function deal with three special cases:
  
  - App returns a file-like object as the response body (keep the app from
    stringifying it).
  
  - App raises an Unauthorized exception (just set the response status, to
    let middleware handle issuing a challenge).
  
  - App raises a Redirect exception (just call redirect on the response).

- Adapt test code to the new signature of 'HTTPResponse._cookie_list',
  which now returns a list of two-tuples, rather than rendered strings.

- Get quickstart page rendering under plain paste config.

- Make WSGIResponse.__str__ raise and exception, preventing an
  'attractive nuisance.'

  The real logic is in finalize and listHeaders now, and the publish*
  functions call those directly.

- Move finalization logic out of HTTPResponse.listHeaders.

- Refactor WSGIHTTPResponse to avoid the need to use str() and parse.
  
  o Instead, compute status and headers directly.
  
- Chop out error and transaction handling from the 'publish*' functions:
  the point of doing WSGI is to move that stuff out of the application,
  and out into middleware.
  
- One backward incompatibility:  the special "shutdown" behavior is gone
  here.  It should be replaced by something in view code.

- Factor out computation of the list of response headers from stringifying
  them, allowing WSGIHTTPResponse do reuse them as tuples.

- Chop out copy-and-paste fossils irrelevant to WSGI publication.

- Replace contorted logic for case-normalizing response headers with
  idiomatic Python.

- More PEP8 conformance.
parent 0e977040
......@@ -116,11 +116,15 @@ setup(name='Zope2',
include_package_data=True,
zip_safe=False,
entry_points={
'paste.app_factory': [
'main=Zope2.Startup.run:make_wsgi_app',
],
'console_scripts': [
'mkzopeinstance=Zope2.utilities.mkzopeinstance:main',
'runzope=Zope2.Startup.run:run',
'zopectl=Zope2.Startup.zopectl:run',
'zpasswd=Zope2.utilities.zpasswd:main',
'addzope2user=Zope2.utilities.adduser:main'
],
},
)
......@@ -184,7 +184,7 @@ def http(request_string, handle_errors=True):
)
header_output.setResponseStatus(response.getStatus(), response.errmsg)
header_output.setResponseHeaders(response.headers)
header_output.appendResponseHeaders(response._cookie_list())
header_output.headersl.extend(response._cookie_list())
header_output.appendResponseHeaders(response.accumulated_headers)
sync()
......
......@@ -59,7 +59,7 @@ class PublisherConnection(testing.PublisherConnection):
l = key.find('-', start)
headers.append((key, val))
# get the cookies, breaking them into tuples for sorting
cookies = [(c[:10], c[12:]) for c in real_response._cookie_list()]
cookies = real_response._cookie_list()
headers.extend(cookies)
headers.sort()
headers.insert(0, ('Status', "%s %s" % (status, reason)))
......
......@@ -883,9 +883,9 @@ class HTTPResponse(BaseResponse):
# of name=value pairs may be quoted.
if attrs.get('quoted', True):
cookie = 'Set-Cookie: %s="%s"' % (name, quote(attrs['value']))
cookie = '%s="%s"' % (name, quote(attrs['value']))
else:
cookie = 'Set-Cookie: %s=%s' % (name, quote(attrs['value']))
cookie = '%s=%s' % (name, quote(attrs['value']))
for name, v in attrs.items():
name = name.lower()
if name == 'expires':
......@@ -904,41 +904,60 @@ class HTTPResponse(BaseResponse):
# and block read/write access via JavaScript
elif name == 'http_only' and v:
cookie = '%s; HTTPOnly' % cookie
cookie_list.append(cookie)
cookie_list.append(('Set-Cookie', cookie))
# Should really check size of cookies here!
return cookie_list
def finalize(self):
""" Set headers required by various parts of protocol.
"""
body = self.body
if (not 'content-length' in self.headers and
not 'transfer-encoding' in self.headers):
self.setHeader('content-length', len(body))
return "%d %s" % (self.status, self.errmsg), self.listHeaders()
def listHeaders(self):
""" Return a list of (key, value) pairs for our headers.
o Do appropriate case normalization.
"""
result = [
('X-Powered-By', 'Zope (www.zope.org), Python (www.python.org)')
]
for key, value in self.headers.items():
if key.lower() == key:
# only change non-literal header names
key = '-'.join([x.capitalize() for x in key.split('-')])
result.append((key, value))
result.extend(self._cookie_list())
result.extend(self.accumulated_headers)
return result
def __str__(self,
html_search=re.compile('<html>',re.I).search,
):
if self._wrote:
return '' # Streaming output was used.
headers = self.headers
status, headers = self.finalize()
body = self.body
if not headers.has_key('content-length') and \
not headers.has_key('transfer-encoding'):
self.setHeader('content-length',len(body))
chunks = []
append = chunks.append
# status header must come first.
append("Status: %d %s" % (self.status, self.errmsg))
append("X-Powered-By: Zope (www.zope.org), Python (www.python.org)")
for key, value in headers.items():
if key.lower() == key:
# only change non-literal header names
key = '-'.join([x.capitalize() for x in key.split('-')])
append("%s: %s" % (key, value))
chunks.extend(self._cookie_list())
for key, value in self.accumulated_headers:
append("%s: %s" % (key, value))
append('') # RFC 2616 mandates empty line between headers and payload
append(body)
chunks.append("Status: %s" % status)
for key, value in headers:
chunks.append("%s: %s" % (key, value))
# RFC 2616 mandates empty line between headers and payload
chunks.append('')
chunks.append(body)
return '\r\n'.join(chunks)
def write(self,data):
......
......@@ -13,16 +13,23 @@
""" Python Object Publisher -- Publish Python objects on web servers
"""
from cStringIO import StringIO
import sys
import time
import transaction
from zExceptions import Redirect
from zExceptions import Unauthorized
from zope.event import notify
from zope.publisher.skinnable import setDefaultSkin
from ZServer.medusa.http_date import build_http_date
from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.maybe_lock import allocate_lock
from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.mapply import mapply
from ZPublisher.pubevents import PubBeforeStreaming
from ZPublisher.Publish import call_object
from ZPublisher.Publish import dont_publish_class
from ZPublisher.Publish import get_module_info
from ZPublisher.Publish import missing_name
_NOW = None # overwrite for testing
def _now():
......@@ -38,18 +45,19 @@ class WSGIResponse(HTTPResponse):
Most significantly, streaming is not (yet) supported.
"""
_streaming = 0
_streaming = _chunking = 0
_http_version = None
_server_version = None
_http_connection = None
def __str__(self):
# Set this value to 1 if streaming output in
# HTTP/1.1 should use chunked encoding
http_chunk = 0
if self._wrote:
if self._chunking:
return '0\r\n\r\n'
else:
return ''
# Append any "cleanup" functions to this list.
after_list = ()
def finalize(self):
headers = self.headers
body = self.body
......@@ -67,19 +75,6 @@ class WSGIResponse(HTTPResponse):
if content_length is None and not self._streaming:
self.setHeader('content-length', len(body))
chunks = []
append = chunks.append
# status header must come first.
version = self._http_version or '1.0'
append("HTTP/%s %d %s" % (version, self.status, self.errmsg))
# add zserver headers
if self._server_version is not None:
append('Server: %s' % self._server_version)
append('Date: %s' % build_http_date(_now()))
if self._http_version == '1.0':
if (self._http_connection == 'keep-alive' and
'content-length' in self.headers):
......@@ -89,398 +84,166 @@ class WSGIResponse(HTTPResponse):
# Close the connection if we have been asked to.
# Use chunking if streaming output.
if self._http_version=='1.1':
if self._http_connection=='close':
self.setHeader('Connection','close')
if self._http_version == '1.1':
if self._http_connection == 'close':
self.setHeader('Connection', 'close')
elif not self.headers.has_key('content-length'):
if self.http_chunk and self._streaming:
self.setHeader('Transfer-Encoding','chunked')
self._chunking=1
self.setHeader('Transfer-Encoding', 'chunked')
self._chunking = 1
else:
self.setHeader('Connection','close')
for key, val in headers.items():
if key.lower()==key:
# only change non-literal header names
key="%s%s" % (key[:1].upper(), key[1:])
start=0
l=key.find('-',start)
while l >= start:
key="%s-%s%s" % (key[:l],key[l+1:l+2].upper(),key[l+2:])
start=l+1
l=key.find('-',start)
append("%s: %s" % (key, val))
return '%s %s' % (self.status, self.errmsg), self.listHeaders()
if self.cookies:
chunks.extend(self._cookie_list())
def listHeaders(self):
result = []
if self._server_version:
result.append(('Server', self._server_version))
for key, value in self.accumulated_headers:
append("%s: %s" % (key, value))
append('') # RFC 2616 mandates empty line between headers and payload
append(body)
result.append(('Date', build_http_date(_now())))
result.extend(HTTPResponse.listHeaders(self))
return result
return "\r\n".join(chunks)
def _unauthorized(self):
self.setStatus(401)
def write(self,data):
""" Add data to our output stream.
class Retry(Exception):
"""Raise this to retry a request
HTML data may be returned using a stream-oriented interface.
This allows the browser to display partial results while
computation of a response to proceed.
"""
if not self._streaming:
def __init__(self, t=None, v=None, tb=None):
self._args=t, v, tb
def reraise(self):
t, v, tb = self._args
if t is None: t=Retry
if tb is None: raise t, v
try: raise t, v, tb
finally: tb=None
def call_object(object, args, request):
result=apply(object,args) # Type s<cr> to step into published object.
return result
def missing_name(name, request):
if name=='self': return request['PARENTS'][0]
request.response.badRequestError(name)
notify(PubBeforeStreaming(self))
def dont_publish_class(klass, request):
request.response.forbiddenError("class %s" % klass.__name__)
self._streaming = 1
self.stdout.flush()
_default_debug_mode = False
_default_realm = None
self.stdout.write(data)
def set_default_debug_mode(debug_mode):
global _default_debug_mode
_default_debug_mode = debug_mode
def setBody(self, body, title='', is_error=0):
if isinstance(body, file):
body.seek(0, 2)
length = body.tell()
body.seek(0)
self.setHeader('Content-Length', '%d' % length)
self.body = body
else:
HTTPResponse.setBody(self, body, title, is_error)
def set_default_authentication_realm(realm):
global _default_realm
_default_realm = realm
def __str__(self):
def publish(request, module_name, after_list, debug=0,
# Optimize:
call_object=call_object,
missing_name=missing_name,
dont_publish_class=dont_publish_class,
mapply=mapply,
# XXX Consider how we are to handle the cases this logic was trying
# to cover
#if self._wrote:
# if self._chunking:
# return '0\r\n\r\n'
# else:
# return ''
raise NotImplementedError
def publish(request, module_name,
_get_module_info=get_module_info, # only for testing
):
(bobo_before,
bobo_after,
object,
realm,
debug_mode,
err_hook,
validated_hook,
transactions_manager,
)= _get_module_info(module_name)
(bobo_before, bobo_after, object, realm, debug_mode, err_hook,
validated_hook, transactions_manager)= get_module_info(module_name)
parents=None
response=None
try:
request.processInputs()
response = request.response
request_get=request.get
response=request.response
# First check for "cancel" redirect:
if request_get('SUBMIT','').strip().lower()=='cancel':
cancel=request_get('CANCEL_ACTION','')
if cancel:
raise Redirect, cancel
if bobo_after is not None:
response.after_list += (bobo_after,)
after_list[0]=bobo_after
if debug_mode:
response.debug_mode=debug_mode
if realm and not request.get('REMOTE_USER',None):
response.realm=realm
response.debug_mode = debug_mode
if realm and not request.get('REMOTE_USER', None):
response.realm = realm
if bobo_before is not None:
bobo_before()
# Get the path list.
# According to RFC1738 a trailing space in the path is valid.
path=request_get('PATH_INFO')
request['PARENTS']=parents=[object]
if transactions_manager:
transactions_manager.begin()
path = request.get('PATH_INFO')
object=request.traverse(path, validated_hook=validated_hook)
request['PARENTS'] = parents = [object]
object = request.traverse(path, validated_hook=validated_hook)
if transactions_manager:
transactions_manager.recordMetaData(object, request)
result=mapply(object, request.args, request,
call_object,1,
result = mapply(object,
request.args,
request,
call_object,
1,
missing_name,
dont_publish_class,
request, bind=1)
request,
bind=1,
)
if result is not response:
response.setBody(result)
if transactions_manager:
transactions_manager.commit()
return response
except:
# DM: provide nicer error message for FTP
sm = None
if response is not None:
sm = getattr(response, "setMessage", None)
if sm is not None:
from asyncore import compact_traceback
cl,val= sys.exc_info()[:2]
sm('%s: %s %s' % (
getattr(cl,'__name__',cl), val,
debug_mode and compact_traceback()[-1] or ''))
if err_hook is not None:
if parents:
parents=parents[0]
try:
try:
return err_hook(parents, request,
sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2],
)
except Retry:
if not request.supports_retry():
return err_hook(parents, request,
sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2],
)
finally:
if transactions_manager:
transactions_manager.abort()
# Only reachable if Retry is raised and request supports retry.
newrequest=request.retry()
request.close() # Free resources held by the request.
try:
return publish(newrequest, module_name, after_list, debug)
finally:
newrequest.close()
else:
if transactions_manager:
transactions_manager.abort()
raise
def publish_module_standard(environ, start_response):
must_die=0
status=200
after_list=[None]
def publish_module(environ, start_response,
_publish=publish, # only for testing
_response_factory=WSGIResponse, # only for testing
_request_factory=HTTPRequest, # only for testing
):
status = 200
stdout = StringIO()
stderr = StringIO()
response = WSGIResponse(stdout=stdout, stderr=stderr)
response = _response_factory(stdout=stdout, stderr=stderr)
response._http_version = environ['SERVER_PROTOCOL'].split('/')[1]
response._http_connection = environ.get('CONNECTION_TYPE', 'close')
response._server_version = environ['SERVER_SOFTWARE']
response._server_version = environ.get('SERVER_SOFTWARE')
request = HTTPRequest(environ['wsgi.input'], environ, response)
# Let's support post-mortem debugging
handle_errors = environ.get('wsgi.handleErrors', True)
request = _request_factory(environ['wsgi.input'], environ, response)
setDefaultSkin(request)
try:
response = publish(request, 'Zope2', after_list=[None],
debug=handle_errors)
except SystemExit, v:
must_die=sys.exc_info()
request.response.exception(must_die)
except ImportError, v:
if isinstance(v, tuple) and len(v)==3: must_die=v
elif hasattr(sys, 'exc_info'): must_die=sys.exc_info()
else: must_die = SystemExit, v, sys.exc_info()[2]
request.response.exception(1, v)
except:
request.response.exception()
status=response.getStatus()
if response:
response = _publish(request, 'Zope2')
except Unauthorized, v:
response._unauthorized()
except Redirect, v:
response.redirect(v)
# Start the WSGI server response
status = response.getHeader('status')
# ZServerHTTPResponse calculates all headers and things when you
# call it's __str__, so we need to get it, and then munge out
# the headers from it. It's a bit backwards, and we might optimize
# this by not using ZServerHTTPResponse at all, and making the
# HTTPResponses more WSGI friendly. But this works.
result = str(response)
headers, body = result.split('\r\n\r\n',1)
headers = [tuple(n.split(': ',1)) for n in headers.split('\r\n')[1:]]
status, headers = response.finalize()
start_response(status, headers)
if isinstance(response.body, file):
result = response.body
else:
# If somebody used response.write, that data will be in the
# stdout StringIO, so we put that before the body.
# XXX This still needs verification that it really works.
result=(stdout.getvalue(), body)
request.close()
stdout.close()
result = (stdout.getvalue(), response.body)
if after_list[0] is not None: after_list[0]()
if 'repoze.tm.active' in environ:
txn = transaction.get()
txn.addAfterCommitHook(lambda ok: request.close())
else:
request.close() # this aborts the transation!
if must_die:
# Try to turn exception value into an exit code.
try:
if hasattr(must_die[1], 'code'):
code = must_die[1].code
else: code = int(must_die[1])
except:
code = must_die[1] and 1 or 0
if hasattr(request.response, '_requestShutdown'):
request.response._requestShutdown(code)
stdout.close()
try: raise must_die[0], must_die[1], must_die[2]
finally: must_die=None
for callable in response.after_list:
callable()
# Return the result body iterable.
return result
_l=allocate_lock()
def get_module_info(module_name, modules={},
acquire=_l.acquire,
release=_l.release,
):
if modules.has_key(module_name): return modules[module_name]
if module_name[-4:]=='.cgi': module_name=module_name[:-4]
acquire()
tb=None
g = globals()
try:
try:
module=__import__(module_name, g, g, ('__doc__',))
# Let the app specify a realm
if hasattr(module,'__bobo_realm__'):
realm=module.__bobo_realm__
elif _default_realm is not None:
realm=_default_realm
else:
realm=module_name
# Check for debug mode
debug_mode=None
if hasattr(module,'__bobo_debug_mode__'):
debug_mode=not not module.__bobo_debug_mode__
else:
debug_mode = _default_debug_mode
bobo_before = getattr(module, "__bobo_before__", None)
bobo_after = getattr(module, "__bobo_after__", None)
if hasattr(module,'bobo_application'):
object=module.bobo_application
elif hasattr(module,'web_objects'):
object=module.web_objects
else: object=module
error_hook=getattr(module,'zpublisher_exception_hook', None)
validated_hook=getattr(module,'zpublisher_validated_hook', None)
transactions_manager=getattr(
module,'zpublisher_transactions_manager', None)
if not transactions_manager:
# Create a default transactions manager for use
# by software that uses ZPublisher and ZODB but
# not the rest of Zope.
transactions_manager = DefaultTransactionsManager()
info= (bobo_before, bobo_after, object, realm, debug_mode,
error_hook, validated_hook, transactions_manager)
modules[module_name]=modules[module_name+'.cgi']=info
return info
except:
t,v,tb=sys.exc_info()
v=str(v)
raise ImportError, (t, v), tb
finally:
tb=None
release()
class DefaultTransactionsManager:
def begin(self):
transaction.begin()
def commit(self):
transaction.commit()
def abort(self):
transaction.abort()
def recordMetaData(self, object, request):
# Is this code needed?
request_get = request.get
T= transaction.get()
T.note(request_get('PATH_INFO'))
auth_user=request_get('AUTHENTICATED_USER',None)
if auth_user is not None:
T.setUser(auth_user, request_get('AUTHENTICATION_PATH'))
# profiling support
_pfile = None # profiling filename
_plock=allocate_lock() # profiling lock
_pfunc=publish_module_standard
_pstat=None
def install_profiling(filename):
global _pfile
_pfile = filename
def pm(environ, start_response):
try:
r=_pfunc(environ, start_response)
except: r=None
sys._pr_=r
def publish_module_profiled(environ, start_response):
import profile, pstats
global _pstat
_plock.acquire()
try:
if request is not None:
path_info=request.get('PATH_INFO')
else: path_info=environ.get('PATH_INFO')
if path_info[-14:]=='manage_profile':
return _pfunc(environ, start_response)
pobj=profile.Profile()
pobj.runcall(pm, menviron, start_response)
result=sys._pr_
pobj.create_stats()
if _pstat is None:
_pstat=sys._ps_=pstats.Stats(pobj)
else: _pstat.add(pobj)
finally:
_plock.release()
if result is None:
try:
error=sys.exc_info()
file=open(_pfile, 'w')
file.write(
"See the url "
"http://www.python.org/doc/current/lib/module-profile.html"
"\n for information on interpreting profiler statistics.\n\n"
)
sys.stdout=file
_pstat.strip_dirs().sort_stats('cumulative').print_stats(250)
_pstat.strip_dirs().sort_stats('time').print_stats(250)
file.flush()
file.close()
except: pass
raise error[0], error[1], error[2]
return result
def publish_module(environ, start_response):
""" publish a Python module, with or without profiling enabled """
if _pfile: # profiling is enabled
return publish_module_profiled(environ, start_response)
else:
return publish_module_standard(environ, start_response)
......@@ -208,7 +208,7 @@ class HTTPResponseTests(unittest.TestCase):
self.assertEqual(cookie.get('quoted'), True)
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"')
self.assertEqual(cookies[0], ('Set-Cookie', 'foo="bar"'))
def test_setCookie_w_expires(self):
EXPIRES = 'Wed, 31-Dec-97 23:59:59 GMT'
......@@ -223,7 +223,7 @@ class HTTPResponseTests(unittest.TestCase):
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0],
'Set-Cookie: foo="bar"; Expires=%s' % EXPIRES)
('Set-Cookie', 'foo="bar"; Expires=%s' % EXPIRES))
def test_setCookie_w_domain(self):
response = self._makeOne()
......@@ -237,7 +237,7 @@ class HTTPResponseTests(unittest.TestCase):
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0],
'Set-Cookie: foo="bar"; Domain=example.com')
('Set-Cookie', 'foo="bar"; Domain=example.com'))
def test_setCookie_w_path(self):
response = self._makeOne()
......@@ -250,7 +250,7 @@ class HTTPResponseTests(unittest.TestCase):
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"; Path=/')
self.assertEqual(cookies[0], ('Set-Cookie', 'foo="bar"; Path=/'))
def test_setCookie_w_comment(self):
response = self._makeOne()
......@@ -263,7 +263,8 @@ class HTTPResponseTests(unittest.TestCase):
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"; Comment=COMMENT')
self.assertEqual(cookies[0],
('Set-Cookie', 'foo="bar"; Comment=COMMENT'))
def test_setCookie_w_secure_true_value(self):
response = self._makeOne()
......@@ -276,7 +277,7 @@ class HTTPResponseTests(unittest.TestCase):
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"; Secure')
self.assertEqual(cookies[0], ('Set-Cookie','foo="bar"; Secure'))
def test_setCookie_w_secure_false_value(self):
response = self._makeOne()
......@@ -289,7 +290,7 @@ class HTTPResponseTests(unittest.TestCase):
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"')
self.assertEqual(cookies[0], ('Set-Cookie', 'foo="bar"'))
def test_setCookie_w_httponly_true_value(self):
response = self._makeOne()
......@@ -302,7 +303,7 @@ class HTTPResponseTests(unittest.TestCase):
cookie_list = response._cookie_list()
self.assertEqual(len(cookie_list), 1)
self.assertEqual(cookie_list[0], 'Set-Cookie: foo="bar"; HTTPOnly')
self.assertEqual(cookie_list[0], ('Set-Cookie', 'foo="bar"; HTTPOnly'))
def test_setCookie_w_httponly_false_value(self):
response = self._makeOne()
......@@ -315,7 +316,7 @@ class HTTPResponseTests(unittest.TestCase):
cookie_list = response._cookie_list()
self.assertEqual(len(cookie_list), 1)
self.assertEqual(cookie_list[0], 'Set-Cookie: foo="bar"')
self.assertEqual(cookie_list[0], ('Set-Cookie', 'foo="bar"'))
def test_setCookie_unquoted(self):
response = self._makeOne()
......@@ -327,7 +328,7 @@ class HTTPResponseTests(unittest.TestCase):
cookie_list = response._cookie_list()
self.assertEqual(len(cookie_list), 1)
self.assertEqual(cookie_list[0], 'Set-Cookie: foo=bar')
self.assertEqual(cookie_list[0], ('Set-Cookie', 'foo=bar'))
def test_appendCookie_w_existing(self):
response = self._makeOne()
......@@ -930,6 +931,177 @@ class HTTPResponseTests(unittest.TestCase):
else:
self.fail("Didn't raise Unauthorized")
def test_finalize_empty(self):
response = self._makeOne()
status, headers = response.finalize()
self.assertEqual(status, '200 OK')
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('Content-Length', '0'),
])
def test_finalize_w_body(self):
response = self._makeOne()
response.body = 'TEST'
status, headers = response.finalize()
self.assertEqual(status, '200 OK')
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('Content-Length', '4'),
])
def test_finalize_w_existing_content_length(self):
response = self._makeOne()
response.setHeader('Content-Length', '42')
status, headers = response.finalize()
self.assertEqual(status, '200 OK')
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('Content-Length', '42'),
])
def test_finalize_w_transfer_encoding(self):
response = self._makeOne()
response.setHeader('Transfer-Encoding', 'slurry')
status, headers = response.finalize()
self.assertEqual(status, '200 OK')
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('Transfer-Encoding', 'slurry'),
])
def test_finalize_after_redirect(self):
response = self._makeOne()
response.redirect('http://example.com/')
status, headers = response.finalize()
self.assertEqual(status, '302 Moved Temporarily')
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('Content-Length', '0'),
('Location', 'http://example.com/'),
])
def test_listHeaders_empty(self):
response = self._makeOne()
headers = response.listHeaders()
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
])
def test_listHeaders_already_wrote(self):
# listHeaders doesn't do the short-circuit on _wrote.
response = self._makeOne()
response._wrote = True
headers = response.listHeaders()
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
])
def test_listHeaders_existing_content_length(self):
response = self._makeOne()
response.setHeader('Content-Length', 42)
headers = response.listHeaders()
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('Content-Length', '42'),
])
def test_listHeaders_existing_transfer_encoding(self):
# If 'Transfer-Encoding' is set, don't force 'Content-Length'.
response = self._makeOne()
response.setHeader('Transfer-Encoding', 'slurry')
headers = response.listHeaders()
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('Transfer-Encoding', 'slurry'),
])
def test_listHeaders_after_setHeader(self):
response = self._makeOne()
response.setHeader('x-consistency', 'Foolish')
headers = response.listHeaders()
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('X-Consistency', 'Foolish'),
])
def test_listHeaders_after_setHeader_literal(self):
response = self._makeOne()
response.setHeader('X-consistency', 'Foolish', literal=True)
headers = response.listHeaders()
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('X-consistency', 'Foolish'),
])
def test_listHeaders_after_redirect(self):
response = self._makeOne()
response.redirect('http://example.com/')
headers = response.listHeaders()
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('Location', 'http://example.com/'),
])
def test_listHeaders_after_setCookie_appendCookie(self):
response = self._makeOne()
response.setCookie('foo', 'bar', path='/')
response.appendCookie('foo', 'baz')
headers = response.listHeaders()
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('Set-Cookie', 'foo="bar%3Abaz"; Path=/'),
])
def test_listHeaders_after_expireCookie(self):
response = self._makeOne()
response.expireCookie('qux', path='/')
headers = response.listHeaders()
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('Set-Cookie', 'qux="deleted"; '
'Path=/; '
'Expires=Wed, 31-Dec-97 23:59:59 GMT; '
'Max-Age=0'),
])
def test_listHeaders_after_addHeader(self):
response = self._makeOne()
response.addHeader('X-Consistency', 'Foolish')
response.addHeader('X-Consistency', 'Oatmeal')
headers = response.listHeaders()
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('X-Consistency', 'Foolish'),
('X-Consistency', 'Oatmeal'),
])
def test_listHeaders_w_body(self):
response = self._makeOne()
response.setBody('BLAH')
headers = response.listHeaders()
self.assertEqual(headers,
[('X-Powered-By', 'Zope (www.zope.org), '
'Python (www.python.org)'),
('Content-Length', '4'),
('Content-Type', 'text/plain; charset=iso-8859-15'),
])
def test___str__already_wrote(self):
response = self._makeOne()
response._wrote = True
......
......@@ -31,105 +31,435 @@ class WSGIResponseTests(unittest.TestCase):
from ZPublisher import WSGIPublisher
WSGIPublisher._NOW, self._old_NOW = value, WSGIPublisher._NOW
def test___str__already_wrote_not_chunking(self):
def test_finalize_sets_204_on_empty_not_streaming(self):
response = self._makeOne()
response._wrote = True
response._chunking = False
self.assertEqual(str(response), '')
def test___str__already_wrote_w_chunking(self):
response = self._makeOne()
response._wrote = True
response._chunking = True
self.assertEqual(str(response), '0\r\n\r\n')
def test___str__sets_204_on_empty_not_streaming(self):
response = self._makeOne()
str(response) # not checking value
response.finalize()
self.assertEqual(response.status, 204)
def test___str__sets_204_on_empty_not_streaming_ignores_non_200(self):
def test_finalize_sets_204_on_empty_not_streaming_ignores_non_200(self):
response = self._makeOne()
response.setStatus(302)
str(response) # not checking value
response.finalize()
self.assertEqual(response.status, 302)
def test___str___sets_content_length_if_missing(self):
def test_finalize_sets_content_length_if_missing(self):
response = self._makeOne()
response.setBody('TESTING')
str(response) # not checking value
self.assertEqual(response.getHeader('Content-Length'),
str(len('TESTING')))
response.finalize()
self.assertEqual(response.getHeader('Content-Length'), '7')
def test___str___skips_setting_content_length_if_missing_w_streaming(self):
def test_finalize_skips_setting_content_length_if_missing_w_streaming(self):
response = self._makeOne()
response._streaming = True
response.body = 'TESTING'
str(response) # not checking value
response.finalize()
self.failIf(response.getHeader('Content-Length'))
def test___str___w_default_http_version(self):
def test_finalize_HTTP_1_0_keep_alive_w_content_length(self):
response = self._makeOne()
response._http_version = '1.0'
response._http_connection = 'keep-alive'
response.setBody('TESTING')
result = str(response).splitlines()
self.assertEqual(result[0], 'HTTP/1.0 200 OK')
response.finalize()
self.assertEqual(response.getHeader('Connection'), 'Keep-Alive')
def test_finalize_HTTP_1_0_keep_alive_wo_content_length_streaming(self):
response = self._makeOne()
response._http_version = '1.0'
response._http_connection = 'keep-alive'
response._streaming = True
response.finalize()
self.assertEqual(response.getHeader('Connection'), 'close')
def test___str___w_explicit_http_version(self):
def test_finalize_HTTP_1_0_not_keep_alive_w_content_length(self):
response = self._makeOne()
response._http_version = '1.0'
response.setBody('TESTING')
response.finalize()
self.assertEqual(response.getHeader('Connection'), 'close')
def test_finalize_HTTP_1_1_connection_close(self):
response = self._makeOne()
response._http_version = '1.1'
result = str(response).splitlines()
self.assertEqual(result[0], 'HTTP/1.1 200 OK')
response._http_connection = 'close'
response.finalize()
self.assertEqual(response.getHeader('Connection'), 'close')
def test___str___skips_Server_header_wo_server_version_set(self):
def test_finalize_HTTP_1_1_wo_content_length_streaming_wo_http_chunk(self):
response = self._makeOne()
response._http_version = '1.1'
response._streaming = True
response.http_chunk = 0
response.finalize()
self.assertEqual(response.getHeader('Connection'), 'close')
self.assertEqual(response.getHeader('Transfer-Encoding'), None)
self.failIf(response._chunking)
def test_finalize_HTTP_1_1_wo_content_length_streaming_w_http_chunk(self):
response = self._makeOne()
response._http_version = '1.1'
response._streaming = True
response.http_chunk = 1
response.finalize()
self.assertEqual(response.getHeader('Connection'), None)
def test_finalize_HTTP_1_1_w_content_length_wo_chunk_wo_streaming(self):
response = self._makeOne()
response._http_version = '1.1'
response.setBody('TESTING')
result = str(response).splitlines()
sv = [x for x in result if x.lower().startswith('server-version')]
response.finalize()
self.assertEqual(response.getHeader('Connection'), None)
def test_listHeaders_skips_Server_header_wo_server_version_set(self):
response = self._makeOne()
response.setBody('TESTING')
headers = response.listHeaders()
sv = [x for x in headers if x[0] == 'Server']
self.failIf(sv)
def test___str___includes_Server_header_w_server_version_set(self):
def test_listHeaders_includes_Server_header_w_server_version_set(self):
response = self._makeOne()
response._server_version = 'TESTME'
response.setBody('TESTING')
result = str(response).splitlines()
self.assertEqual(result[1], 'Server: TESTME')
headers = response.listHeaders()
sv = [x for x in headers if x[0] == 'Server']
self.failUnless(('Server', 'TESTME') in sv)
def test___str___includes_Date_header(self):
def test_listHeaders_includes_Date_header(self):
import time
WHEN = time.localtime()
self._setNOW(time.mktime(WHEN))
response = self._makeOne()
response.setBody('TESTING')
result = str(response).splitlines()
self.assertEqual(result[1], 'Date: %s' %
time.strftime('%a, %d %b %Y %H:%M:%S GMT',
time.gmtime(time.mktime(WHEN))))
headers = response.listHeaders()
whenstr = time.strftime('%a, %d %b %Y %H:%M:%S GMT',
time.gmtime(time.mktime(WHEN)))
self.failUnless(('Date', whenstr) in headers)
def test___str___HTTP_1_0_keep_alive_w_content_length(self):
response = self._makeOne()
response._http_version = '1.0'
response._http_connection = 'keep-alive'
response.setBody('TESTING')
str(response) # not checking value
self.assertEqual(response.getHeader('Connection'), 'Keep-Alive')
#def test___str__already_wrote_not_chunking(self):
# response = self._makeOne()
# response._wrote = True
# response._chunking = False
# self.assertEqual(str(response), '')
def test___str___HTTP_1_0_keep_alive_wo_content_length_streaming(self):
response = self._makeOne()
response._http_version = '1.0'
response._http_connection = 'keep-alive'
response._streaming = True
str(response) # not checking value
self.assertEqual(response.getHeader('Connection'), 'close')
#def test___str__already_wrote_w_chunking(self):
# response = self._makeOne()
# response._wrote = True
# response._chunking = True
# self.assertEqual(str(response), '0\r\n\r\n')
def test___str___HTTP_1_0_not_keep_alive_w_content_length(self):
def test___str___raises(self):
response = self._makeOne()
response._http_version = '1.0'
response.setBody('TESTING')
str(response) # not checking value
self.assertEqual(response.getHeader('Connection'), 'close')
self.assertRaises(NotImplementedError, lambda: str(response))
class Test_publish(unittest.TestCase):
def _callFUT(self, request, module_name, _get_module_info=None):
from ZPublisher.WSGIPublisher import publish
if _get_module_info is None:
return publish(request, module_name)
return publish(request, module_name, _get_module_info)
def test_invalid_module_doesnt_catch_error(self):
_gmi = DummyCallable()
_gmi._raise = ImportError('testing')
self.assertRaises(ImportError, self._callFUT, None, 'nonesuch', _gmi)
self.assertEqual(_gmi._called_with, (('nonesuch',), {}))
def test_wo_REMOTE_USER(self):
request = DummyRequest(PATH_INFO='/')
response = request.response = DummyResponse()
_before = DummyCallable()
_after = object()
_object = DummyCallable()
_object._result = 'RESULT'
request._traverse_to = _object
_realm = 'TESTING'
_debug_mode = True
_err_hook = DummyCallable()
_validated_hook = object()
_tm = DummyTM()
_gmi = DummyCallable()
_gmi._result = (_before, _after, _object, _realm, _debug_mode,
_err_hook, _validated_hook, _tm)
returned = self._callFUT(request, 'okmodule', _gmi)
self.failUnless(returned is response)
self.assertEqual(_gmi._called_with, (('okmodule',), {}))
self.failUnless(request._processedInputs)
self.assertEqual(response.after_list, (_after,))
self.failUnless(response.debug_mode)
self.assertEqual(response.realm, 'TESTING')
self.assertEqual(_before._called_with, ((), {}))
self.assertEqual(request['PARENTS'], [_object])
self.assertEqual(request._traversed, ('/', None, _validated_hook))
self.assertEqual(_tm._recorded, (_object, request))
self.assertEqual(_object._called_with, ((), {}))
self.assertEqual(response._body, 'RESULT')
self.assertEqual(_err_hook._called_with, None)
def test_w_REMOTE_USER(self):
request = DummyRequest(PATH_INFO='/', REMOTE_USER='phred')
response = request.response = DummyResponse()
_before = DummyCallable()
_after = object()
_object = DummyCallable()
_object._result = 'RESULT'
request._traverse_to = _object
_realm = 'TESTING'
_debug_mode = True
_err_hook = DummyCallable()
_validated_hook = object()
_tm = DummyTM()
_gmi = DummyCallable()
_gmi._result = (_before, _after, _object, _realm, _debug_mode,
_err_hook, _validated_hook, _tm)
self._callFUT(request, 'okmodule', _gmi)
self.assertEqual(response.realm, None)
class Test_publish_module(unittest.TestCase):
def setUp(self):
from zope.testing.cleanup import cleanUp
cleanUp()
def tearDown(self):
from zope.testing.cleanup import cleanUp
cleanUp()
def _callFUT(self, environ, start_response,
_publish=None, _response_factory=None, _request_factory=None):
from ZPublisher.WSGIPublisher import publish_module
if _publish is not None:
if _response_factory is not None:
if _request_factory is not None:
return publish_module(environ, start_response, _publish,
_response_factory, _request_factory)
return publish_module(environ, start_response, _publish,
_response_factory)
else:
if _request_factory is not None:
return publish_module(environ, start_response, _publish,
_request_factory=_request_factory)
return publish_module(environ, start_response, _publish)
return publish_module(environ, start_response)
def _registerView(self, factory, name, provides=None):
from zope.component import provideAdapter
from zope.interface import Interface
from zope.publisher.browser import IDefaultBrowserLayer
from OFS.interfaces import IApplication
if provides is None:
provides = Interface
requires = (IApplication, IDefaultBrowserLayer)
provideAdapter(factory, requires, provides, name)
def _makeEnviron(self, **kw):
from StringIO import StringIO
environ = {
'SCRIPT_NAME' : '',
'REQUEST_METHOD' : 'GET',
'QUERY_STRING' : '',
'SERVER_NAME' : '127.0.0.1',
'REMOTE_ADDR': '127.0.0.1',
'wsgi.url_scheme': 'http',
'SERVER_PORT': '8080',
'HTTP_HOST': '127.0.0.1:8080',
'SERVER_PROTOCOL' : 'HTTP/1.1',
'wsgi.input' : StringIO(''),
'CONTENT_LENGTH': '0',
'HTTP_CONNECTION': 'keep-alive',
'CONTENT_TYPE': ''
}
environ.update(kw)
return environ
def test_calls_setDefaultSkin(self):
from zope.traversing.interfaces import ITraversable
from zope.traversing.namespace import view
class TestView:
__name__ = 'testing'
def __init__(self, context, request):
pass
def __call__(self):
return 'foobar'
# Define the views
self._registerView(TestView, 'testing')
# Bind the 'view' namespace (for @@ traversal)
self._registerView(view, 'view', ITraversable)
environ = self._makeEnviron(PATH_INFO='/@@testing')
self.assertEqual(self._callFUT(environ, noopStartResponse),
('', 'foobar'))
def test_publish_can_return_new_response(self):
from ZPublisher.HTTPRequest import HTTPRequest
_response = DummyResponse()
_response.body = 'BODY'
_after1 = DummyCallable()
_after2 = DummyCallable()
_response.after_list = (_after1, _after2)
environ = self._makeEnviron()
start_response = DummyCallable()
_publish = DummyCallable()
_publish._result = _response
app_iter = self._callFUT(environ, start_response, _publish)
self.assertEqual(app_iter, ('', 'BODY'))
(status, headers), kw = start_response._called_with
self.assertEqual(status, '204 No Content')
self.assertEqual(headers, [('Content-Length', '0')])
self.assertEqual(kw, {})
(request, module), kw = _publish._called_with
self.failUnless(isinstance(request, HTTPRequest))
self.assertEqual(module, 'Zope2')
self.assertEqual(kw, {})
self.failUnless(_response._finalized)
self.assertEqual(_after1._called_with, ((), {}))
self.assertEqual(_after2._called_with, ((), {}))
def test_swallows_Unauthorized(self):
from zExceptions import Unauthorized
environ = self._makeEnviron()
start_response = DummyCallable()
_publish = DummyCallable()
_publish._raise = Unauthorized('TESTING')
app_iter = self._callFUT(environ, start_response, _publish)
self.assertEqual(app_iter, ('', ''))
(status, headers), kw = start_response._called_with
self.assertEqual(status, '401 Unauthorized')
self.failUnless(('Content-Length', '0') in headers)
self.assertEqual(kw, {})
def test_swallows_Redirect(self):
from zExceptions import Redirect
environ = self._makeEnviron()
start_response = DummyCallable()
_publish = DummyCallable()
_publish._raise = Redirect('/redirect_to')
app_iter = self._callFUT(environ, start_response, _publish)
self.assertEqual(app_iter, ('', ''))
(status, headers), kw = start_response._called_with
self.assertEqual(status, '302 Moved Temporarily')
self.failUnless(('Location', '/redirect_to') in headers)
self.failUnless(('Content-Length', '0') in headers)
self.assertEqual(kw, {})
def test_response_body_is_file(self):
class DummyFile(file):
def __init__(self):
pass
def read(self, *args, **kw):
raise NotImplementedError()
_response = DummyResponse()
_response._status = '200 OK'
_response._headers = [('Content-Length', '4')]
body = _response.body = DummyFile()
environ = self._makeEnviron()
start_response = DummyCallable()
_publish = DummyCallable()
_publish._result = _response
app_iter = self._callFUT(environ, start_response, _publish)
self.failUnless(app_iter is body)
def test_request_closed_when_tm_middleware_not_active(self):
environ = self._makeEnviron()
start_response = DummyCallable()
_request = DummyRequest()
_request._closed = False
def _close():
_request._closed = True
_request.close = _close
def _request_factory(stdin, environ, response):
return _request
_publish = DummyCallable()
_publish._result = DummyResponse()
app_iter = self._callFUT(environ, start_response, _publish,
_request_factory=_request_factory)
self.failUnless(_request._closed)
def test_request_not_closed_when_tm_middleware_active(self):
import transaction
environ = self._makeEnviron()
environ['repoze.tm.active'] = 1
start_response = DummyCallable()
_request = DummyRequest()
_request._closed = False
def _close():
_request._closed = True
_request.close = _close
def _request_factory(stdin, environ, response):
return _request
_publish = DummyCallable()
_publish._result = DummyResponse()
app_iter = self._callFUT(environ, start_response, _publish,
_request_factory=_request_factory)
self.failIf(_request._closed)
txn = transaction.get()
self.failUnless(list(txn.getAfterCommitHooks()))
class DummyRequest(dict):
_processedInputs = False
_traversed = None
_traverse_to = None
args = ()
def processInputs(self):
self._processedInputs = True
def traverse(self, path, response=None, validated_hook=None):
self._traversed = (path, response, validated_hook)
return self._traverse_to
class DummyResponse(object):
debug_mode = False
after_list = ()
realm = None
_body = None
_finalized = False
_status = '204 No Content'
_headers = [('Content-Length', '0')]
def finalize(self):
self._finalized = True
return self._status, self._headers
def setBody(self, body):
self._body = body
body = property(lambda self: self._body, setBody)
class DummyCallable(object):
_called_with = _raise = _result = None
def __call__(self, *args, **kw):
self._called_with = (args, kw)
if self._raise:
raise self._raise
return self._result
class DummyTM(object):
_recorded = _raise = _result = None
def recordMetaData(self, *args):
self._recorded = args
def noopStartResponse(status, headers):
pass
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(WSGIResponseTests))
return suite
return unittest.TestSuite((
unittest.makeSuite(WSGIResponseTests),
unittest.makeSuite(Test_publish),
unittest.makeSuite(Test_publish_module),
))
......@@ -140,7 +140,7 @@ class ZServerHTTPResponse(HTTPResponse):
val = val.replace('\n\t', '\r\n\t')
append("%s: %s" % (key, val))
if self.cookies:
chunks.extend(self._cookie_list())
chunks.extend(['%s: %s' % x for x in self._cookie_list()])
append('')
append(body)
......
......@@ -52,6 +52,22 @@ def _setconfig(configfile=None):
App.config.setConfiguration(opts.configroot)
return opts
def make_wsgi_app(global_config, zope_conf):
from App.config import setConfiguration
from Zope2.Startup import get_starter
from Zope2.Startup.handlers import handleConfig
from Zope2.Startup.options import ZopeOptions
from ZPublisher.WSGIPublisher import publish_module
starter = get_starter()
opts = ZopeOptions()
opts.configfile = zope_conf
opts.realize(args=(), progname='Zope2WSGI', raise_getopt_errs=False)
handleConfig(opts.configroot, opts.confighandlers)
setConfiguration(opts.configroot)
starter.setConfiguration(opts.configroot)
starter.prepare()
return publish_module
if __name__ == '__main__':
run()
##############################################################################
#
# This was yanked out of repoze.zope2
#
##############################################################################
""" Add a Zope management user to the root Zope user folder """
import sys
from Zope2.utilities.finder import ZopeFinder
def adduser(app, user, pwd):
import transaction
result = app.acl_users._doAddUser(user, pwd, ['Manager'], [])
transaction.commit()
return result
def main(argv=sys.argv):
import sys
try:
user, pwd = argv[1], argv[2]
except IndexError:
print "%s <username> <password>" % argv[0]
sys.exit(255)
finder = ZopeFinder(argv)
finder.filter_warnings()
app = finder.get_app()
adduser(app, user, pwd)
if __name__ == '__main__':
main()
##############################################################################
#
# yanked from repoze.zope2
#
##############################################################################
import os
class ZopeFinder:
def __init__(self, argv):
self.cmd = argv[0]
def filter_warnings(self):
import warnings
warnings.simplefilter('ignore', Warning, append=True)
def get_app(self, config_file=None):
# given a config file, return a Zope application object
if config_file is None:
config_file = self.get_zope_conf()
from Zope2.Startup import options, handlers
import App.config
import Zope2
opts = options.ZopeOptions()
opts.configfile = config_file
opts.realize(args=[], doc="", raise_getopt_errs=0)
handlers.handleConfig(opts.configroot, opts.confighandlers)
App.config.setConfiguration(opts.configroot)
app = Zope2.app()
return app
def get_zope_conf(self):
# the default config file path is assumed to live in
# $instance_home/etc/zope.conf, and the console scripts that use this
# are assumed to live in $instance_home/bin; override if the
# environ contains "ZOPE_CONF".
ihome = os.path.dirname(os.path.abspath(os.path.dirname(self.cmd)))
default_config_file = os.path.join(ihome, 'etc', 'zope.conf')
zope_conf = os.environ.get('ZOPE_CONF', default_config_file)
return zope_conf
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment