Commit da2e2281 authored by Jim Fulton's avatar Jim Fulton

Refactord the server to be single-threaded to make SSL work on Linux

SSL didn't work on Linux because of an incompatibility between
non-blocking sockerts and SSL (which wasn't an issue on Mac OS X for
some reason).

I decided to try switching the server to use a single thread.  This
led to a performance improvement for zodbshootout and a lower-level
networking test.  I refactored the server listening logic and kept (at
least for now) both the multi-threaded and the single-threaded
version. (The multi-threaded logic can be used by changing a single
import. Tests pass with both.).

I'm a bit worried about the asyncio server. See

  http://bugs.python.org/issue27386

Tests appear to be less stable using asyncio.server on my Mac. :(
parent 5727c7a2
......@@ -31,7 +31,6 @@ import threading
import time
import transaction
import warnings
import ZEO.acceptor
import ZEO.asyncio.server
import ZODB.blob
import ZODB.event
......@@ -51,6 +50,8 @@ from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
from ZODB.utils import oid_repr, p64, u64, z64
from .asyncio.server import Acceptor
logger = logging.getLogger('ZEO.StorageServer')
def log(message, level=logging.INFO, label='', exc_info=False):
......@@ -842,7 +843,7 @@ class StorageServer:
storage.registerDB(StorageServerDB(self, name))
self.invalidation_age = invalidation_age
self.zeo_storages_by_storage_id = {} # {storage_id -> [ZEOStorage]}
self.acceptor = ZEO.acceptor.Acceptor(addr, self.new_connection, ssl)
self.acceptor = Acceptor(self, addr, ssl)
if isinstance(addr, tuple) and addr[0]:
self.addr = self.acceptor.addr
else:
......@@ -864,6 +865,9 @@ class StorageServer:
timeout.start()
self.timeouts[name] = timeout
def create_client_handler(self):
return ZEOStorage(self, self.read_only)
def _setup_invq(self, name, storage):
lastInvalidations = getattr(storage, 'lastInvalidations', None)
if lastInvalidations is None:
......@@ -878,23 +882,6 @@ class StorageServer:
self.invq[name] = list(lastInvalidations(self.invq_bound))
self.invq[name].reverse()
def new_connection(self, sock, addr):
"""Internal: factory to create a new connection.
"""
logger.debug("new connection %s" % (addr,))
def run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
ZEO.asyncio.server.new_connection(
loop, addr, sock, ZEOStorage(self, self.read_only))
loop.run_forever()
loop.close()
thread = threading.Thread(target=run, name='zeo_client_hander')
thread.setDaemon(True)
thread.start()
def register_connection(self, storage_id, zeo_storage):
"""Internal: register a ZEOStorage with a particular storage.
......
......@@ -11,8 +11,41 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Multi-threaded server connectin acceptor
Each connection is run in it's own thread. Testing serveral years ago
suggsted that this was a win, but ZODB shootout and another
lower-level tests suggest otherwise. It's really unclear, which is
why we're keeping this around for now.
Asyncio doesn't let you accept connections in one thread and handle
them in another. To get around this, we have a listener implemented
using asyncore, but when we get a connection, we hand the socket to
asyncio. This worked well until we added SSL support. (Even then, it
worked on Mac OS X for some reason.)
SSL + non-blocking sockets requires special care, which asyncio
provides. Unfortunately, create_connection, assumes it's creating a
client connection. It would be easy to fix this, but it's hard to
justify the fix to get it accepted, so we won't bother for now.
To use this module, replace::
from .asyncio.server import Acceptor
with:
from .asyncio.mtacceptor import Acceptor
in ZEO.StorageServer.
"""
import asyncio
import asyncore
import socket
import threading
from .server import ServerProtocol
# _has_dualstack: True if the dual-stack sockets are supported
try:
......@@ -36,34 +69,23 @@ import logging
logger = logging.getLogger(__name__)
class Acceptor(asyncore.dispatcher):
"""A server that accepts incoming RPC connections"""
"""A server that accepts incoming RPC connections
And creates a separate thread for each.
"""
def __init__(self, addr, factory, ssl=None):
def __init__(self, storage_server, addr, ssl):
self.storage_server = storage_server
self.addr = addr
self.__socket_map = {}
asyncore.dispatcher.__init__(self, map=self.__socket_map)
self.addr = addr
self.__ssl = ssl
self.ssl_context = ssl
if ssl is not None:
from ssl import SSLError
wrap_socket = ssl.wrap_socket
def ssl_factory(sock, addr):
try:
conn = wrap_socket(sock, server_side=True)
except SSLError:
logger.debug("SSL failure", exc_info=True)
else:
return factory(conn, addr)
self.__factory = ssl_factory
else:
self.__factory = factory
self.ssl_wrap_socket = ssl.wrap_socket
self._open_socket()
__ssl_context = None
def __ssl_wrap_socket(self, sock):
def ssl_wrap_socket(self, sock, server_side):
return sock
def _open_socket(self):
......@@ -140,16 +162,32 @@ class Acceptor(asyncore.dispatcher):
if addr: # Sometimes None on Mac. See above.
addr = addr[:2]
sock = self.__ssl_wrap_socket(sock)
try:
c = self.__factory(sock, addr)
logger.debug("new connection %s" % (addr,))
sock = self.ssl_wrap_socket(sock, server_side=True)
def run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
zs = self.storage_server.create_client_handler()
protocol = ServerProtocol(loop, self.addr, zs)
protocol.stop = loop.stop
asyncio.async(loop.create_connection((lambda : protocol),
sock=sock),
loop=loop)
loop.run_forever()
loop.close()
thread = threading.Thread(target=run, name='zeo_client_hander')
thread.setDaemon(True)
thread.start()
except Exception:
if sock.fileno() in self.__socket_map:
del self.__socket_map[sock.fileno()]
logger.exception("Error in handle_accept")
else:
logger.info("connect from %s: %s", repr(addr), c)
logger.info("connect from %s", repr(addr))
def loop(self, timeout=30.0):
try:
......
......@@ -51,11 +51,12 @@ class ServerProtocol(base.Protocol):
if exc:
logger.error("Disconnected %s:%s", exc.__class__.__name__, exc)
self.zeo_storage.notify_disconnected()
self.stop()
self.loop.stop()
def stop(self):
pass # Might be replaced when running a thread per client
def finish_connect(self, protocol_version):
if protocol_version == b'ruok':
self._write(json.dumps(self.zeo_storage.ruok()).encode("ascii"))
self.close()
......@@ -200,3 +201,56 @@ class MTDelay(Delay):
self.ready.wait()
log("Error raised in delayed method", logging.ERROR, exc_info=exc_info)
self.protocol.call_soon_threadsafe(Delay.error, self, exc_info)
class Acceptor:
def __init__(self, storage_server, addr, ssl):
self.storage_server = storage_server
self.addr = addr
self.ssl_context = ssl
self.event_loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if isinstance(addr, tuple):
cr = loop.create_server(self.factory, addr[0], addr[1], ssl=ssl)
else:
cr = loop.create_unix_server(self.factory, addr, ssl=ssl)
f = asyncio.async(cr)
@f.add_done_callback
def listenting(f):
server = f.result()
self.server = server
if isinstance(addr, tuple) and addr[1] == 0:
addrs = [s.getsockname() for s in server.sockets]
addrs = [a for a in addrs if len(a) == len(addr)]
if addrs:
self.addr = addrs[0]
else:
self.addr = server.sockets[0].getsockname()[:len(addr)]
logger.info("listening on %s", str(addr))
loop.run_until_complete(f)
def factory(self):
try:
logger.debug("Accepted connection")
zs = self.storage_server.create_client_handler()
protocol = ServerProtocol(self.event_loop, self.addr, zs)
except Exception:
logger.exception("Failure in protocol factory")
return protocol
def loop(self, timeout=None):
self.event_loop.run_forever()
self.event_loop.run_until_complete(self.server.wait_closed())
self.event_loop.close()
__closed = False
def close(self):
if not self.__closed:
self.__closed = True
self.server.close()
self.event_loop.call_soon_threadsafe(self.event_loop.stop)
......@@ -86,7 +86,7 @@ class Transport:
capacity = 1 << 64
paused = False
extra = dict(peername='1.2.3.4')
extra = dict(peername='1.2.3.4', sockname=('127.0.0.1', 4200))
def __init__(self, protocol):
self.data = []
......
......@@ -1089,7 +1089,7 @@ def runzeo_without_configfile():
------
--T INFO ZEO.StorageServer StorageServer created RW with storages 1RWt
------
--T INFO ZEO.acceptor listening on ...
--T INFO ZEO.asyncio.server listening on ...
------
--T INFO ZEO.StorageServer closing storage '1'
testing exit immediately
......@@ -1470,7 +1470,8 @@ def quick_close_doesnt_kill_server():
Start a server:
>>> addr, _ = start_server()
>>> from .testssl import server_config, client_ssl
>>> addr, _ = start_server(zeo_conf=server_config)
Now connect and immediately disconnect. This caused the server to
die in the past:
......@@ -1483,9 +1484,12 @@ def quick_close_doesnt_kill_server():
... s.connect(addr)
... s.close()
>>> print("\n\nXXX WARNING: running quick_close_doesnt_kill_server with ssl as hack pending http://bugs.python.org/issue27386\n", file=sys.stderr) # Intentional long line to be annoying till this is fixed
Now we should be able to connect as normal:
>>> db = ZEO.DB(addr)
>>> db = ZEO.DB(addr, ssl=client_ssl())
>>> db.storage.is_connected()
True
......
......@@ -116,7 +116,7 @@ class SSLConfigTest(ZEOConfigTest):
def test_ssl_mockiavellian_server_no_ssl(self, factory):
server = create_server()
self.assertFalse(factory.called)
self.assertEqual(server.acceptor._Acceptor__ssl, None)
self.assertEqual(server.acceptor.ssl_context, None)
server.close()
def assert_context(
......@@ -143,7 +143,7 @@ class SSLConfigTest(ZEOConfigTest):
def test_ssl_mockiavellian_server_ssl_auth_file(self, factory):
server = create_server(
certificate=server_cert, key=server_key, authenticate=__file__)
context = server.acceptor._Acceptor__ssl
context = server.acceptor.ssl_context
self.assert_context(factory, context, cafile=__file__)
server.close()
......@@ -151,7 +151,7 @@ class SSLConfigTest(ZEOConfigTest):
def test_ssl_mockiavellian_server_ssl_auth_dir(self, factory):
server = create_server(
certificate=server_cert, key=server_key, authenticate=here)
context = server.acceptor._Acceptor__ssl
context = server.acceptor.ssl_context
self.assert_context(factory, context, capath=here)
server.close()
......@@ -163,7 +163,7 @@ class SSLConfigTest(ZEOConfigTest):
password_function='ZEO.tests.testssl.pwfunc',
authenticate=here,
)
context = server.acceptor._Acceptor__ssl
context = server.acceptor.ssl_context
self.assert_context(
factory, context, (server_cert, server_key, pwfunc), capath=here)
server.close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment