Commit d0310869 authored by Bryton Lacquement's avatar Bryton Lacquement :door: Committed by Julien Muchembled

Add WSGI support

This first work on WSGI is only to stop using ZServer (Medusa),
which is a required step before moving to Zope 4. This means that
Zope should behave almost exactly the same way as before, notably:

- We don't take advantage yet of what WSGI offers, like IPv6.
- There's extra code to handle errors the same way as before
  (this is something we'll have to change for Zope 4).

The most significant change in behaviour is that the chosen WSGI server
(waitress) does some of the HTTP work in worker threads (Medusa does it
entirely in the IO thread), and the biggest consequence concerns the
deadlock debugger that is now run from the worker thread:
- it does not work if all threads are blocked
- doing better would require to patch waitress in a quite ugly way

About TimerService, we simplify things by removing the egg.
In zope.conf, it's possible to import from the product.

/reviewed-on nexedi/erp5!883
parents 545a0487 5eeb52d5
......@@ -60,12 +60,7 @@ import transaction
from App.config import getConfiguration
from Shared.DC.ZRDB.Results import Results
import Products.Localizer.patches
localizer_lock = Products.Localizer.patches._requests_lock
localizer_contexts = Products.Localizer.patches._requests
LocalizerContext = lambda request: request
from zope.globalrequest import getRequest, setRequest
from Products.MailHost.MailHost import MailHostError
from zLOG import LOG, INFO, WARNING, ERROR
......@@ -139,14 +134,19 @@ def getServerAddress():
global _server_address
if _server_address is None:
ip = port = ''
from asyncore import socket_map
for k, v in socket_map.items():
if hasattr(v, 'addr'):
# see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
type = str(getattr(v, '__class__', 'unknown'))
if type == 'ZServer.HTTPServer.zhttp_server':
ip, port = v.addr
break
try:
zopewsgi = sys.modules['Products.ERP5.bin.zopewsgi']
except KeyError:
from asyncore import socket_map
for k, v in socket_map.items():
if hasattr(v, 'addr'):
# see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
type = str(getattr(v, '__class__', 'unknown'))
if type == 'ZServer.HTTPServer.zhttp_server':
ip, port = v.addr
break
else:
ip, port = zopewsgi.server.addr
if ip == '0.0.0.0':
ip = socket.gethostbyname(socket.gethostname())
_server_address = '%s:%s' %(ip, port)
......@@ -1464,7 +1464,7 @@ class ActivityTool (BaseTool):
def invoke(self, message):
if self.activity_tracking:
activity_tracking_logger.info('invoking message: object_path=%s, method_id=%s, args=%r, kw=%r, activity_kw=%r, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.user_name))
old_localizer_context = False
old_request = None
if getattr(self, 'aq_chain', None) is not None:
# Grab existing acquisition chain and extrach base objects.
base_chain = [aq_base(x) for x in self.aq_chain]
......@@ -1500,16 +1500,8 @@ class ActivityTool (BaseTool):
new_request.other['VirtualRootPhysicalPath'] = request_info['VirtualRootPhysicalPath']
if 'HTTP_ACCEPT_LANGUAGE' in request_info:
new_request.environ['HTTP_ACCEPT_LANGUAGE'] = request_info['HTTP_ACCEPT_LANGUAGE']
# Replace Localizer/iHotfix Context, saving existing one
localizer_context = LocalizerContext(new_request)
id = get_ident()
localizer_lock.acquire()
try:
old_localizer_context = localizer_contexts.get(id)
localizer_contexts[id] = localizer_context
finally:
localizer_lock.release()
# Execute Localizer/iHotfix "patch 2"
old_request = getRequest()
setRequest(new_request)
new_request.processInputs()
new_request_container = request_container.__class__(REQUEST=new_request)
......@@ -1528,17 +1520,7 @@ class ActivityTool (BaseTool):
# Restore default skin selection
skinnable = self.getPortalObject()
skinnable.changeSkin(skinnable.getSkinNameFromRequest(request))
if old_localizer_context is not False:
# Restore Localizer/iHotfix context
id = get_ident()
localizer_lock.acquire()
try:
if old_localizer_context is None:
del localizer_contexts[id]
else:
localizer_contexts[id] = old_localizer_context
finally:
localizer_lock.release()
setRequest(old_request)
if self.activity_tracking:
activity_tracking_logger.info('invoked message')
if my_self is not self: # We rewrapped self
......
import argparse
from io import BytesIO
import logging
import os
import posixpath
import socket
from tempfile import TemporaryFile
import time
from urllib import quote
from waitress.server import create_server
import ZConfig
import Zope2
from Zope2.Startup.run import make_wsgi_app
from Products.ERP5Type.patches.WSGIPublisher import publish_module
# this class licensed under the MIT license (stolen from pyramid_translogger)
class TransLogger(object):
format = ('%(REMOTE_ADDR)s - %(REMOTE_USER)s [%(time)s] '
'"%(REQUEST_METHOD)s %(REQUEST_URI)s %(HTTP_VERSION)s" '
'%(status)s %(bytes)s "%(HTTP_REFERER)s" "%(HTTP_USER_AGENT)s"')
def __init__(self, application, logger):
self.application = application
self.logger = logger
def __call__(self, environ, start_response):
start = time.localtime()
req_uri = quote(environ.get('SCRIPT_NAME', '')
+ environ.get('PATH_INFO', ''))
if environ.get('QUERY_STRING'):
req_uri += '?'+environ['QUERY_STRING']
method = environ['REQUEST_METHOD']
def replacement_start_response(status, headers, exc_info=None):
# @@: Ideally we would count the bytes going by if no
# content-length header was provided; but that does add
# some overhead, so at least for now we'll be lazy.
bytes = None
for name, value in headers:
if name.lower() == 'content-length':
bytes = value
self.write_log(environ, method, req_uri, start, status, bytes)
return start_response(status, headers)
return self.application(environ, replacement_start_response)
def write_log(self, environ, method, req_uri, start, status, bytes):
if bytes is None:
bytes = '-'
if time.daylight:
offset = time.altzone / 60 / 60 * -100
else:
offset = time.timezone / 60 / 60 * -100
if offset >= 0:
offset = "+%0.4d" % (offset)
elif offset < 0:
offset = "%0.4d" % (offset)
d = {
'REMOTE_ADDR': environ.get('REMOTE_ADDR') or '-',
'REMOTE_USER': environ.get('REMOTE_USER') or '-',
'REQUEST_METHOD': method,
'REQUEST_URI': req_uri,
'HTTP_VERSION': environ.get('SERVER_PROTOCOL'),
'time': time.strftime('%d/%b/%Y:%H:%M:%S ', start) + offset,
'status': status.split(None, 1)[0],
'bytes': bytes,
'HTTP_REFERER': environ.get('HTTP_REFERER', '-'),
'HTTP_USER_AGENT': environ.get('HTTP_USER_AGENT', '-'),
}
message = self.format % d
self.logger.warn(message)
def app_wrapper(large_file_threshold, use_webdav):
try:
from product.DeadlockDebugger.dumper import dump_threads, dump_url
except Exception:
dump_url = '\0'
def app(environ, start_response):
path_info = environ['PATH_INFO']
if dump_url.startswith(path_info):
query_string = environ['QUERY_STRING']
if dump_url == (path_info + '?' + query_string if query_string
else path_info):
start_response('200 OK', (('Content-type', 'text/plain'),))
return [dump_threads()]
original_wsgi_input = environ['wsgi.input']
if not hasattr(original_wsgi_input, 'seek'):
# Convert environ['wsgi.input'] to a file-like object.
cl = environ.get('CONTENT_LENGTH')
cl = int(cl) if cl else 0
if cl > large_file_threshold:
new_wsgi_input = environ['wsgi.input'] = TemporaryFile('w+b')
else:
new_wsgi_input = environ['wsgi.input'] = BytesIO()
rest = cl
chunksize = 1<<20
try:
while chunksize < rest:
new_wsgi_input.write(original_wsgi_input.read(chunksize))
rest -= chunksize
if rest:
new_wsgi_input.write(original_wsgi_input.read(rest))
except (socket.error, IOError):
msg = b'Not enough data in request or socket error'
start_response('400 Bad Request', [
('Content-Type', 'text/plain'),
('Content-Length', str(len(msg))),
]
)
return [msg]
new_wsgi_input.seek(0)
if use_webdav:
# Munge the request to ensure that we call manage_FTPGet.
# Set a flag to indicate this request came through the WebDAV source
# port server.
environ['WEBDAV_SOURCE_PORT'] = 1
if environ['REQUEST_METHOD'] == 'GET':
if os.sep != '/':
path_info = path_info.replace(os.sep, '/')
path_info = posixpath.join(path_info, 'manage_DAVget')
path_info = posixpath.normpath(path_info)
environ['PATH_INFO'] = path_info
return publish_module(environ, start_response)
return app
def runwsgi():
global server
parser = argparse.ArgumentParser()
parser.add_argument('-w', '--webdav', action='store_true')
parser.add_argument('address', help='<ip>:<port>')
parser.add_argument('zope_conf', help='path to zope.conf')
args = parser.parse_args()
startup = os.path.dirname(Zope2.Startup.__file__)
schema = ZConfig.loadSchema(os.path.join(startup, 'zopeschema.xml'))
conf, _ = ZConfig.loadConfig(schema, args.zope_conf)
make_wsgi_app({}, zope_conf=args.zope_conf)
server = create_server(
TransLogger(app_wrapper(conf.large_file_threshold, args.webdav),
logger=logging.getLogger("access")),
listen=args.address,
threads=conf.zserver_threads,
trusted_proxy='*',
trusted_proxy_headers=('x-forwarded-for',),
clear_untrusted_proxy_headers=True,
)
server.run()
......@@ -38,19 +38,7 @@ from App.class_init import default__class_init__, ApplicationDefaultPermissions
# Nicer alias for class initializer.
InitializeClass = default__class_init__
##########################################
# Localizer is not always loaded prior to ERP5 products,
# thus, as Localizer is supposed to patch Global to add get_request to it,
# we prefer to redefine get_request inside ERP5Type/Utils,
# to avoid the case when Global wasn't patched and get_request is not available.
# This is specially important on Zope 2.12 where Globals doesn't even exist.
##########################################
try:
import Products.iHotfix
get_request = Products.iHotfix.get_request
except (ImportError, AttributeError):
import Products.Localizer
get_request = Products.Localizer.get_request
from zope.globalrequest import getRequest as get_request
# Persistency stuff also hasn't moved much from Zope 2.8, although the old
# "Persistence" module remains there for ancient backward compatibility.
......
......@@ -21,6 +21,7 @@
##############################################################################
# Load all monkey patches
from Products.ERP5Type.patches import WSGIPublisher
from Products.ERP5Type.patches import HTTPRequest
from Products.ERP5Type.patches import AccessControl_patch
from Products.ERP5Type.patches import Restricted
......
......@@ -30,7 +30,7 @@
ERP5Type is provides a RAD environment for Zope / CMF
All ERP5 classes derive from ERP5Type
"""
from patches import python, pylint
from patches import python, pylint, globalrequest
from zLOG import LOG, INFO
DISPLAY_BOOT_PROCESS = False
......
This diff is collapsed.
try:
import zope.globalrequest
except ImportError:
import sys
sys.modules['zope.globalrequest'] = sys.modules[__name__]
from threading import local
localData = local()
def getRequest():
return getattr(localData, 'request', None)
def setRequest(request):
localData.request = request
def clearRequest():
setRequest(None)
......@@ -31,7 +31,6 @@ import os
import sys
import imp
import re
import thread
from Testing import ZopeTestCase
from Testing.ZopeTestCase import PortalTestCase, user_name
......@@ -97,10 +96,9 @@ class ERP5TypeLiveTestCase(ERP5TypeTestCaseMixin):
registry._conns[-1] = portal
# This is for Localizer patch
from Products.Localizer import patches
from zope.globalrequest import setRequest
request = portal.REQUEST
with patches._requests_lock:
patches._requests[thread.get_ident()] = request
setRequest(request)
# Make live tests run under the same server URL than the host instance.
if _request_server_url:
......
......@@ -725,7 +725,7 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
'''Publishes the object at 'path' returning a response object.'''
from ZPublisher.Response import Response
from ZPublisher.Test import publish_module
from ZPublisher.Publish import publish_module_standard
from AccessControl.SecurityManagement import getSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
......@@ -738,8 +738,6 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
if env is None:
env = {}
if extra is None:
extra = {}
request = self.app.REQUEST
......@@ -780,11 +778,20 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
try:
if user:
PAS._extractUserIds = _extractUserIds
publish_module('Zope2',
# The following `HTTPRequest` object would be created anyway inside
# `publish_module_standard` if no `request` argument was given.
request = request.__class__(stdin, env, response)
# However, we need to inject the content of `extra` inside the
# request.
if extra:
for k, v in extra.items(): request[k] = v
publish_module_standard('Zope2',
request=request,
response=response,
stdin=stdin,
environ=env,
extra=extra,
debug=not handle_errors,
)
finally:
......
......@@ -21,7 +21,6 @@ This is a hotfix, it dynamically applies several patches to Zope.
# Import from the Standard Library
import logging
import os
from thread import allocate_lock, get_ident
# Import from itools
from .itools.i18n import AcceptLanguageType
......@@ -30,7 +29,8 @@ from .itools.i18n import AcceptLanguageType
import Globals
from ZPublisher import Publish
from ZPublisher.HTTPRequest import HTTPRequest
from zope.globalrequest import clearRequest, setRequest
from zope.globalrequest import getRequest as get_request
# Flag
patch = False
......@@ -57,56 +57,21 @@ logger = logging.getLogger('Localizer')
# Also, we keep the get_request method in the Globals module for backwards
# compatibility (with TranslationService for example).
_requests = {}
_requests_lock = allocate_lock()
def get_request():
"""Get a request object"""
return _requests.get(get_ident(), None)
def new_publish(request, module_name, after_list, debug=0,
zope_publish=Publish.publish):
# Get the process id
ident = get_ident()
# Add the request object to the global dictionnary
_requests_lock.acquire()
try:
_requests[ident] = request
finally:
_requests_lock.release()
# Call the old publish
try:
# Publish
x = zope_publish(request, module_name, after_list, debug)
finally:
# Remove the request object.
# When conflicts occur the "publish" method is called again,
# recursively. In this situation the "_requests dictionary would
# be cleaned in the innermost call, hence outer calls find the
# request does not exist anymore. For this reason we check first
# wether the request is there or not.
if ident in _requests:
_requests_lock.acquire()
try:
del _requests[ident]
finally:
_requests_lock.release()
return x
def get_new_publish(zope_publish):
def publish(request, *args, **kwargs):
try:
setRequest(request)
return zope_publish(request, *args, **kwargs)
finally:
clearRequest()
return publish
if patch is False:
logger.info('Install "Globals.get_request".')
# Apply the patch
Publish.publish = new_publish
# First import (it's not a refresh operation).
# We need to apply the patches.
Publish.publish = get_new_publish(Publish.publish)
patch = True
# Add to Globals for backwards compatibility
......
......@@ -8,10 +8,10 @@ import traceback
import thread
import re
import sys, os, errno, time, socket
from functools import partial
from StringIO import StringIO
from zLOG import LOG, INFO
from ZServer.PubCore import handle
from ZPublisher.BaseRequest import BaseRequest
from ZPublisher.BaseResponse import BaseResponse
from ZPublisher.HTTPRequest import HTTPRequest
......@@ -38,21 +38,46 @@ class TimerServer:
'\tInterval: %s seconds.\n'%(time.ctime(time.time()), interval))
def run(self):
# wait until the zhttp_server exist in socket_map
# because TimerService has to be started after the Zope HTTPServer
from asyncore import socket_map
ip = port = ''
while 1:
time.sleep(5)
for k, v in socket_map.items():
if hasattr(v, 'addr'):
try:
zopewsgi = sys.modules['Products.ERP5.bin.zopewsgi']
except KeyError:
# wait until the zhttp_server exist in socket_map
# because TimerService has to be started after the Zope HTTPServer
from asyncore import socket_map
ip = port = ''
while 1:
time.sleep(5)
for k, v in socket_map.items():
if hasattr(v, 'addr'):
# see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
type = str(getattr(v, '__class__', 'unknown'))
if type == 'ZServer.HTTPServer.zhttp_server':
ip, port = v.addr
break
if port:
break
if port:
break
from ZServer.PubCore import handle
else:
while 1:
time.sleep(5)
try:
server = zopewsgi.server
break
except AttributeError:
pass
ip, port = server.addr
start_response = lambda *_: None
class handle(object):
def __init__(self, module_name, request, response):
self.service = partial(zopewsgi.publish_module,
request.environ,
start_response,
_module_name=module_name,
_request=request,
_response=response)
server.add_task(self)
if ip == '0.0.0.0':
ip = socket.gethostbyname(socket.gethostname())
......@@ -99,6 +124,9 @@ class TimerServer:
class TimerResponse(BaseResponse):
after_list = ()
def _finish(self):
pass
......@@ -108,6 +136,9 @@ class TimerResponse(BaseResponse):
def _unauthorized(self):
pass
def finalize(self):
return None, None
# This is taken from ZPublisher.HTTPResponse
# I don't think it's safe to make TimerResponse a subclass of HTTPResponse,
# so I inline here the method . This is required it you want unicode page
......@@ -148,6 +179,7 @@ class TimerRequest(HTTPRequest):
env['SERVER_PORT'] = ''
env['REMOTE_ADDR'] = ''
env['GATEWAY_INTERFACE'] = 'CGI/1.1'
env['SERVER_PROTOCOL'] = 'HTTP/1.0'
env['PATH_INFO']= '/Control_Panel/timer_service/process_timer'
return env
......
#!/usr/bin/env python
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
setup(name='timerserver',
version='2.0.4',
license='GPL',
description='Timer Server for Zope',
long_description='',
author='Nikolay Kim',
author_email='fafhrd@legco.biz',
packages=['timerserver'],
zip_safe=False,
package_data={'timerserver': ['component.xml']},
)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment