Commit 1eb086da authored by Jim Fulton's avatar Jim Fulton Committed by GitHub

Merge pull request #32 from zopefoundation/ssl

Add SSL support +

It's sad no reviewer, but I suppose it can be reviewed even after a merge.
parents b8711838 22615fb4
...@@ -6,17 +6,6 @@ matrix: ...@@ -6,17 +6,6 @@ matrix:
python: 3.4 python: 3.4
- os: linux - os: linux
python: 3.5 python: 3.5
- os: osx
language: generic
env: TERRYFY_PYTHON='macpython 3.4'
- os: osx
language: generic
env: TERRYFY_PYTHON='homebrew 3'
before_install:
- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then git clone https://github.com/MacPython/terryfy; fi
- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then source terryfy/travis_tools.sh; fi
- if [[ "$TRAVIS_OS_NAME" == "osx" ]]; then get_python_environment $TERRYFY_PYTHON venv; fi
- if [[ "$TERRYFY_PYTHON" == "homebrew 3" ]]; then alias pip=`which pip3` ; fi
install: install:
- pip install -U setuptools - pip install -U setuptools
- python bootstrap.py - python bootstrap.py
......
This diff is collapsed.
This diff is collapsed.
...@@ -92,7 +92,7 @@ def alltests(): ...@@ -92,7 +92,7 @@ def alltests():
_unittests_only(suite, mod.test_suite()) _unittests_only(suite, mod.test_suite())
return suite return suite
tests_require = ['zope.testing', 'manuel', 'random2'] tests_require = ['zope.testing', 'manuel', 'random2', 'mock']
long_description = ( long_description = (
open('README.rst').read() open('README.rst').read()
......
...@@ -93,6 +93,7 @@ class ClientStorage(object): ...@@ -93,6 +93,7 @@ class ClientStorage(object):
blob_cache_size=None, blob_cache_size_check=10, blob_cache_size=None, blob_cache_size_check=10,
client_label=None, client_label=None,
cache=None, cache=None,
ssl = None, ssl_server_hostname=None,
# Mostly ignored backward-compatability options # Mostly ignored backward-compatability options
client=None, var=None, client=None, var=None,
min_disconnect_poll=1, max_disconnect_poll=None, min_disconnect_poll=1, max_disconnect_poll=None,
...@@ -137,10 +138,6 @@ class ClientStorage(object): ...@@ -137,10 +138,6 @@ class ClientStorage(object):
Defaults to None, in which case the cache is not Defaults to None, in which case the cache is not
persistent. See ClientCache for more info. persistent. See ClientCache for more info.
disconnect_poll
The delay in seconds between attempts to connect to the
server, in seconds. Defaults to 1 second.
wait_timeout wait_timeout
Maximum time to wait for results, including connecting. Maximum time to wait for results, including connecting.
...@@ -264,6 +261,7 @@ class ClientStorage(object): ...@@ -264,6 +261,7 @@ class ClientStorage(object):
addr, self, cache, storage, addr, self, cache, storage,
ZEO.asyncio.client.Fallback if read_only_fallback else read_only, ZEO.asyncio.client.Fallback if read_only_fallback else read_only,
wait_timeout or 30, wait_timeout or 30,
ssl = ssl, ssl_server_hostname=ssl_server_hostname,
) )
self._call = self._server.call self._call = self._server.call
self._async = self._server.async self._async = self._server.async
...@@ -726,6 +724,11 @@ class ClientStorage(object): ...@@ -726,6 +724,11 @@ class ClientStorage(object):
try: try:
for oid in self._call('vote', id(txn)) or (): for oid in self._call('vote', id(txn)) or ():
tbuf.serial(oid, ResolvedSerial) tbuf.serial(oid, ResolvedSerial)
except POSException.StorageTransactionError:
# Hm, we got disconnected and reconnected bwtween
# _check_trans and voting. Let's chack the transaction again:
self._check_trans(txn, 'tpc_vote')
raise
except POSException.ConflictError as err: except POSException.ConflictError as err:
oid = getattr(err, 'oid', None) oid = getattr(err, 'oid', None)
if oid is not None: if oid is not None:
......
...@@ -31,7 +31,6 @@ import threading ...@@ -31,7 +31,6 @@ import threading
import time import time
import transaction import transaction
import warnings import warnings
import ZEO.acceptor
import ZEO.asyncio.server import ZEO.asyncio.server
import ZODB.blob import ZODB.blob
import ZODB.event import ZODB.event
...@@ -51,6 +50,8 @@ from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError ...@@ -51,6 +50,8 @@ from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
from ZODB.utils import oid_repr, p64, u64, z64 from ZODB.utils import oid_repr, p64, u64, z64
from .asyncio.server import Acceptor
logger = logging.getLogger('ZEO.StorageServer') logger = logging.getLogger('ZEO.StorageServer')
def log(message, level=logging.INFO, label='', exc_info=False): def log(message, level=logging.INFO, label='', exc_info=False):
...@@ -695,6 +696,7 @@ class StorageServer: ...@@ -695,6 +696,7 @@ class StorageServer:
invalidation_queue_size=100, invalidation_queue_size=100,
invalidation_age=None, invalidation_age=None,
transaction_timeout=None, transaction_timeout=None,
ssl=None,
): ):
"""StorageServer constructor. """StorageServer constructor.
...@@ -767,7 +769,7 @@ class StorageServer: ...@@ -767,7 +769,7 @@ class StorageServer:
storage.registerDB(StorageServerDB(self, name)) storage.registerDB(StorageServerDB(self, name))
self.invalidation_age = invalidation_age self.invalidation_age = invalidation_age
self.zeo_storages_by_storage_id = {} # {storage_id -> [ZEOStorage]} self.zeo_storages_by_storage_id = {} # {storage_id -> [ZEOStorage]}
self.acceptor = ZEO.acceptor.Acceptor(addr, self.new_connection) self.acceptor = Acceptor(self, addr, ssl)
if isinstance(addr, tuple) and addr[0]: if isinstance(addr, tuple) and addr[0]:
self.addr = self.acceptor.addr self.addr = self.acceptor.addr
else: else:
...@@ -789,6 +791,9 @@ class StorageServer: ...@@ -789,6 +791,9 @@ class StorageServer:
timeout.start() timeout.start()
self.timeouts[name] = timeout self.timeouts[name] = timeout
def create_client_handler(self):
return ZEOStorage(self, self.read_only)
def _setup_invq(self, name, storage): def _setup_invq(self, name, storage):
lastInvalidations = getattr(storage, 'lastInvalidations', None) lastInvalidations = getattr(storage, 'lastInvalidations', None)
if lastInvalidations is None: if lastInvalidations is None:
...@@ -803,23 +808,6 @@ class StorageServer: ...@@ -803,23 +808,6 @@ class StorageServer:
self.invq[name] = list(lastInvalidations(self.invq_bound)) self.invq[name] = list(lastInvalidations(self.invq_bound))
self.invq[name].reverse() self.invq[name].reverse()
def new_connection(self, sock, addr):
"""Internal: factory to create a new connection.
"""
logger.debug("new connection %s" % (addr,))
def run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
ZEO.asyncio.server.new_connection(
loop, addr, sock, ZEOStorage(self, self.read_only))
loop.run_forever()
loop.close()
thread = threading.Thread(target=run, name='zeo_client_hander')
thread.setDaemon(True)
thread.start()
def register_connection(self, storage_id, zeo_storage): def register_connection(self, storage_id, zeo_storage):
"""Internal: register a ZEOStorage with a particular storage. """Internal: register a ZEOStorage with a particular storage.
...@@ -1320,4 +1308,3 @@ class Serving(ServerEvent): ...@@ -1320,4 +1308,3 @@ class Serving(ServerEvent):
class Closed(ServerEvent): class Closed(ServerEvent):
pass pass
...@@ -43,7 +43,7 @@ def connection(*args, **kw): ...@@ -43,7 +43,7 @@ def connection(*args, **kw):
ra ra
def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None, def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None,
port=0, **kw): port=0, threaded=True, **kw):
"""Convenience function to start a server for interactive exploration """Convenience function to start a server for interactive exploration
This fuction starts a ZEO server, given a storage configuration or This fuction starts a ZEO server, given a storage configuration or
...@@ -78,7 +78,7 @@ def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None, ...@@ -78,7 +78,7 @@ def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None,
port port
If no ZEO configuration is supplied, the one will be computed If no ZEO configuration is supplied, the one will be computed
from the port. If no port is supplied, one will be chosedn from the port. If no port is supplied, one will be chosedn
randomly. dynamically.
""" """
import os, ZEO.tests.forker import os, ZEO.tests.forker
...@@ -87,4 +87,4 @@ def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None, ...@@ -87,4 +87,4 @@ def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None,
return ZEO.tests.forker.start_zeo_server( return ZEO.tests.forker.start_zeo_server(
storage_conf, zeo_conf, port, keep=True, path=path, storage_conf, zeo_conf, port, keep=True, path=path,
blob_dir=blob_dir, suicide=False, threaded=True, **kw) blob_dir=blob_dir, suicide=False, threaded=threaded, **kw)
================================
asyncio-based networking for ZEO
================================
This package provides the networking interface for ZEO. It provides a
somewhat RPC-like API.
Notes
=====
Sending data immediately: ayncio vs asyncore
--------------------------------------------
The previous ZEO networking implementation used the ``asyncore`` library.
When writing with asyncore, writes were done only from the event loop.
This meant that when sending data, code would have to "wake up" the
event loop, typically after adding data to some sort of output buffer.
Asyncio takes an entirely different and saner approach. When an
application wants to send data, it writes to a transport. All
interactions with a transport (in a correct application) are from the
same thread, which is also the thread running any event loop.
Transports are always either idle or sending data. When idle, the
transport writes to the outout socket immediately. If not all data
isn't sent, then it buffers it and becomes sending. If a transport is
sending, then we know that the socket isn't ready for more data, so
``write`` can just buffer the data. There's no point in waking up the
event loop, because the socket will do so when it's ready for more
data.
An exception to the paragraph above occurs when operations cross
threads, as occures for most client operations and when a transaction
commits on the server and results have to be sent to other clients. In
these cases, a call_soon_threadsafe method is used which queues an
operation and has to wake up an event loop to process it.
Server threading
----------------
There are currently two server implementations, an implementation that
used a thread per client (and a thread to listen for connections),
``ZEO.asyncio.mtacceptor.Acceptor``, and an implementation that uses a
single networking thread, ``ZEO.asyncio.server.Acceptor``. The
implementation is selected by changing an import in
``ZEO.StorageServer``. The currently-used implementation is
``ZEO.asyncio.server.Acceptor``, although this sentance is likely to
rot, so check the import to be sure. (Maybe this should be configurable.)
ZEO switched to a multi-threaded implementation several years ago
because it was found to improve performance for large databases using
magnetic disks. Because client threads are always working on behalf of
a single client, there's not really an issue with making blocking
calls, such as executing slow I/O operations.
Initially, the asyncio-based implementation used a multi-threaded
server. A simple thread accepted connections and handed accepted
sockets to ``create_connection``. This became a problem when SSL was
added because ``create_connection`` sets up SSL conections as client
connections, and doesn't provide an option to create server
connections.
In response, I created an ``asyncio.Server``-based implementation.
This required using a single thread. This was a pretty trivial
change, however, it led to the tests becoming unstable to the point
that it was impossible to run all tests without some failing. One
test was broken due to a ``asyncio.Server`` `bug
<http://bugs.python.org/issue27386>`_. It's unclear whether the test
instability is due to ``asyncio.Server`` problems or due to latent
test (or ZEO) bugs, but even after beating the tests mostly into
submission, tests failures are more likely when using
``asyncio.Server``. Beatings will continue.
While fighting test failures using ``asyncio.Server``, the
multi-threaded implementation was updated to use a monkey patch to
allow it to create SSL server connections. Aside from the real risk of a
monkey patch, this works very well.
Both implementations seem to perform about the same.
...@@ -36,7 +36,7 @@ class Protocol(base.Protocol): ...@@ -36,7 +36,7 @@ class Protocol(base.Protocol):
def __init__(self, loop, def __init__(self, loop,
addr, client, storage_key, read_only, connect_poll=1, addr, client, storage_key, read_only, connect_poll=1,
heartbeat_interval=60): heartbeat_interval=60, ssl=None, ssl_server_hostname=None):
"""Create a client interface """Create a client interface
addr is either a host,port tuple or a string file name. addr is either a host,port tuple or a string file name.
...@@ -54,6 +54,8 @@ class Protocol(base.Protocol): ...@@ -54,6 +54,8 @@ class Protocol(base.Protocol):
self.connect_poll = connect_poll self.connect_poll = connect_poll
self.heartbeat_interval = heartbeat_interval self.heartbeat_interval = heartbeat_interval
self.futures = {} # { message_id -> future } self.futures = {} # { message_id -> future }
self.ssl = ssl
self.ssl_server_hostname = ssl_server_hostname
self.connect() self.connect()
...@@ -81,10 +83,11 @@ class Protocol(base.Protocol): ...@@ -81,10 +83,11 @@ class Protocol(base.Protocol):
if isinstance(self.addr, tuple): if isinstance(self.addr, tuple):
host, port = self.addr host, port = self.addr
cr = self.loop.create_connection( cr = self.loop.create_connection(
self.protocol_factory, host or 'localhost', port) self.protocol_factory, host or 'localhost', port,
ssl=self.ssl, server_hostname=self.ssl_server_hostname)
else: else:
cr = self.loop.create_unix_connection( cr = self.loop.create_unix_connection(
self.protocol_factory, self.addr) self.protocol_factory, self.addr, ssl=self.ssl)
self._connecting = cr = asyncio.async(cr, loop=self.loop) self._connecting = cr = asyncio.async(cr, loop=self.loop)
...@@ -265,7 +268,8 @@ class Client: ...@@ -265,7 +268,8 @@ class Client:
def __init__(self, loop, def __init__(self, loop,
addrs, client, cache, storage_key, read_only, connect_poll, addrs, client, cache, storage_key, read_only, connect_poll,
register_failed_poll=9): register_failed_poll=9,
ssl=None, ssl_server_hostname=None):
"""Create a client interface """Create a client interface
addr is either a host,port tuple or a string file name. addr is either a host,port tuple or a string file name.
...@@ -281,6 +285,8 @@ class Client: ...@@ -281,6 +285,8 @@ class Client:
self.connect_poll = connect_poll self.connect_poll = connect_poll
self.register_failed_poll = register_failed_poll self.register_failed_poll = register_failed_poll
self.client = client self.client = client
self.ssl = ssl
self.ssl_server_hostname = ssl_server_hostname
for name in Protocol.client_delegated: for name in Protocol.client_delegated:
setattr(self, name, getattr(client, name)) setattr(self, name, getattr(client, name))
self.cache = cache self.cache = cache
...@@ -344,6 +350,8 @@ class Client: ...@@ -344,6 +350,8 @@ class Client:
self.protocols = [ self.protocols = [
Protocol(self.loop, addr, self, Protocol(self.loop, addr, self,
self.storage_key, self.read_only, self.connect_poll, self.storage_key, self.read_only, self.connect_poll,
ssl=self.ssl,
ssl_server_hostname=self.ssl_server_hostname,
) )
for addr in self.addrs for addr in self.addrs
] ]
...@@ -584,14 +592,16 @@ class Client: ...@@ -584,14 +592,16 @@ class Client:
class ClientRunner: class ClientRunner:
def set_options(self, addrs, wrapper, cache, storage_key, read_only, def set_options(self, addrs, wrapper, cache, storage_key, read_only,
timeout=30, disconnect_poll=1): timeout=30, disconnect_poll=1,
**kwargs):
self.__args = (addrs, wrapper, cache, storage_key, read_only, self.__args = (addrs, wrapper, cache, storage_key, read_only,
disconnect_poll) disconnect_poll)
self.__kwargs = kwargs
self.timeout = timeout self.timeout = timeout
def setup_delegation(self, loop): def setup_delegation(self, loop):
self.loop = loop self.loop = loop
self.client = Client(loop, *self.__args) self.client = Client(loop, *self.__args, **self.__kwargs)
self.call_threadsafe = self.client.call_threadsafe self.call_threadsafe = self.client.call_threadsafe
self.call_async_threadsafe = self.client.call_async_threadsafe self.call_async_threadsafe = self.client.call_async_threadsafe
...@@ -685,9 +695,10 @@ class ClientThread(ClientRunner): ...@@ -685,9 +695,10 @@ class ClientThread(ClientRunner):
def __init__(self, addrs, client, cache, def __init__(self, addrs, client, cache,
storage_key='1', read_only=False, timeout=30, storage_key='1', read_only=False, timeout=30,
disconnect_poll=1): disconnect_poll=1, ssl=None, ssl_server_hostname=None):
self.set_options(addrs, client, cache, storage_key, read_only, self.set_options(addrs, client, cache, storage_key, read_only,
timeout, disconnect_poll) timeout, disconnect_poll,
ssl=ssl, ssl_server_hostname=ssl_server_hostname)
self.thread = threading.Thread( self.thread = threading.Thread(
target=self.run, target=self.run,
name="%s zeo client networking thread" % client.__name__, name="%s zeo client networking thread" % client.__name__,
......
...@@ -11,8 +11,43 @@ ...@@ -11,8 +11,43 @@
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
"""Multi-threaded server connectin acceptor
Each connection is run in it's own thread. Testing serveral years ago
suggsted that this was a win, but ZODB shootout and another
lower-level tests suggest otherwise. It's really unclear, which is
why we're keeping this around for now.
Asyncio doesn't let you accept connections in one thread and handle
them in another. To get around this, we have a listener implemented
using asyncore, but when we get a connection, we hand the socket to
asyncio. This worked well until we added SSL support. (Even then, it
worked on Mac OS X for some reason.)
SSL + non-blocking sockets requires special care, which asyncio
provides. Unfortunately, create_connection, assumes it's creating a
client connection. It would be easy to fix this,
http://bugs.python.org/issue27392, but it's hard to justify the fix to
get it accepted, so we won't bother for now. This currently uses a
horrible monley patch to work with SSL.
To use this module, replace::
from .asyncio.server import Acceptor
with:
from .asyncio.mtacceptor import Acceptor
in ZEO.StorageServer.
"""
import asyncio
import asyncore import asyncore
import socket import socket
import threading
from .server import ServerProtocol
# _has_dualstack: True if the dual-stack sockets are supported # _has_dualstack: True if the dual-stack sockets are supported
try: try:
...@@ -36,13 +71,18 @@ import logging ...@@ -36,13 +71,18 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Acceptor(asyncore.dispatcher): class Acceptor(asyncore.dispatcher):
"""A server that accepts incoming RPC connections""" """A server that accepts incoming RPC connections
def __init__(self, addr, factory): And creates a separate thread for each.
self.socket_map = {} """
asyncore.dispatcher.__init__(self, map=self.socket_map)
def __init__(self, storage_server, addr, ssl):
self.storage_server = storage_server
self.addr = addr self.addr = addr
self.factory = factory self.__socket_map = {}
asyncore.dispatcher.__init__(self, map=self.__socket_map)
self.ssl_context = ssl
self._open_socket() self._open_socket()
def _open_socket(self): def _open_socket(self):
...@@ -120,17 +160,50 @@ class Acceptor(asyncore.dispatcher): ...@@ -120,17 +160,50 @@ class Acceptor(asyncore.dispatcher):
addr = addr[:2] addr = addr[:2]
try: try:
c = self.factory(sock, addr) logger.debug("new connection %s" % (addr,))
def run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
zs = self.storage_server.create_client_handler()
protocol = ServerProtocol(loop, self.addr, zs)
protocol.stop = loop.stop
if self.ssl_context is None:
cr = loop.create_connection((lambda : protocol), sock=sock)
else:
#######################################################
# XXX See http://bugs.python.org/issue27392 :(
_make_ssl_transport = loop._make_ssl_transport
def make_ssl_transport(*a, **kw):
kw['server_side'] = True
return _make_ssl_transport(*a, **kw)
loop._make_ssl_transport = make_ssl_transport
#
#######################################################
cr = loop.create_connection(
(lambda : protocol), sock=sock,
ssl=self.ssl_context,
server_hostname='fu' # http://bugs.python.org/issue27391
)
asyncio.async(cr, loop=loop)
loop.run_forever()
loop.close()
thread = threading.Thread(target=run, name='zeo_client_hander')
thread.setDaemon(True)
thread.start()
except Exception: except Exception:
if sock.fileno() in asyncore.socket_map: if sock.fileno() in self.__socket_map:
del asyncore.socket_map[sock.fileno()] del self.__socket_map[sock.fileno()]
logger.exception("Error in handle_accept") logger.exception("Error in handle_accept")
else: else:
logger.info("connect from %s: %s", repr(addr), c) logger.info("connect from %s", repr(addr))
def loop(self, timeout=30.0): def loop(self, timeout=30.0):
try: try:
asyncore.loop(map=self.socket_map, timeout=timeout) asyncore.loop(map=self.__socket_map, timeout=timeout)
except Exception: except Exception:
if not self.__closed: if not self.__closed:
raise # Unexpected exc raise # Unexpected exc
...@@ -142,4 +215,4 @@ class Acceptor(asyncore.dispatcher): ...@@ -142,4 +215,4 @@ class Acceptor(asyncore.dispatcher):
if not self.__closed: if not self.__closed:
self.__closed = True self.__closed = True
asyncore.dispatcher.close(self) asyncore.dispatcher.close(self)
logger.debug("Closed accepter, %s", len(self.socket_map)) logger.debug("Closed accepter, %s", len(self.__socket_map))
...@@ -51,11 +51,12 @@ class ServerProtocol(base.Protocol): ...@@ -51,11 +51,12 @@ class ServerProtocol(base.Protocol):
if exc: if exc:
logger.error("Disconnected %s:%s", exc.__class__.__name__, exc) logger.error("Disconnected %s:%s", exc.__class__.__name__, exc)
self.zeo_storage.notify_disconnected() self.zeo_storage.notify_disconnected()
self.stop()
self.loop.stop() def stop(self):
pass # Might be replaced when running a thread per client
def finish_connect(self, protocol_version): def finish_connect(self, protocol_version):
if protocol_version == b'ruok': if protocol_version == b'ruok':
self._write(json.dumps(self.zeo_storage.ruok()).encode("ascii")) self._write(json.dumps(self.zeo_storage.ruok()).encode("ascii"))
self.close() self.close()
...@@ -199,3 +200,73 @@ class MTDelay(Delay): ...@@ -199,3 +200,73 @@ class MTDelay(Delay):
def error(self, exc_info): def error(self, exc_info):
self.ready.wait() self.ready.wait()
self.protocol.call_soon_threadsafe(Delay.error, self, exc_info) self.protocol.call_soon_threadsafe(Delay.error, self, exc_info)
class Acceptor:
def __init__(self, storage_server, addr, ssl):
self.storage_server = storage_server
self.addr = addr
self.ssl_context = ssl
self.event_loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if isinstance(addr, tuple):
cr = loop.create_server(self.factory, addr[0], addr[1],
reuse_address=True, ssl=ssl)
else:
cr = loop.create_unix_server(self.factory, addr, ssl=ssl)
f = asyncio.async(cr)
@f.add_done_callback
def listenting(f):
server = f.result()
self.server = server
if isinstance(addr, tuple) and addr[1] == 0:
addrs = [s.getsockname() for s in server.sockets]
addrs = [a for a in addrs if len(a) == len(addr)]
if addrs:
self.addr = addrs[0]
else:
self.addr = server.sockets[0].getsockname()[:len(addr)]
logger.info("listening on %s", str(addr))
loop.run_until_complete(f)
def factory(self):
try:
logger.debug("Accepted connection")
zs = self.storage_server.create_client_handler()
protocol = ServerProtocol(self.event_loop, self.addr, zs)
except Exception:
logger.exception("Failure in protocol factory")
return protocol
def loop(self, timeout=None):
self.event_loop.run_forever()
self.event_loop.close()
closed = False
def close(self):
if not self.closed:
self.closed = True
self.event_loop.call_soon_threadsafe(self._close)
def _close(self):
loop = self.event_loop
self.server.close()
f = asyncio.async(self.server.wait_closed(), loop=loop)
@f.add_done_callback
def server_closed(f):
# stop the loop when the server closes:
loop.call_soon(loop.stop)
def timeout():
logger.warning("Timed out closing asyncio.Server")
loop.call_soon(loop.stop)
# But if the server doesn't close in a second, stop the loop anyway.
loop.call_later(1, timeout)
...@@ -31,7 +31,8 @@ class Loop: ...@@ -31,7 +31,8 @@ class Loop:
future.set_exception(ConnectionRefusedError()) future.set_exception(ConnectionRefusedError())
def create_connection( def create_connection(
self, protocol_factory, host=None, port=None, sock=None self, protocol_factory, host=None, port=None, sock=None,
ssl=None, server_hostname=None
): ):
future = asyncio.Future(loop=self) future = asyncio.Future(loop=self)
if sock is None: if sock is None:
...@@ -85,7 +86,7 @@ class Transport: ...@@ -85,7 +86,7 @@ class Transport:
capacity = 1 << 64 capacity = 1 << 64
paused = False paused = False
extra = dict(peername='1.2.3.4') extra = dict(peername='1.2.3.4', sockname=('127.0.0.1', 4200))
def __init__(self, protocol): def __init__(self, protocol):
self.data = [] self.data = []
......
<component> <component>
<sectiontype name="zeo"> <import package="ZODB"/>
<description> <sectiontype name="ssl" datatype="ZEO.zconfig.client_ssl">
The content of a ZEO section describe operational parameters
of a ZEO server except for the storage(s) to be served.
</description>
<key name="address" datatype="socket-binding-address" <key name="certificate" datatype="existing-dirpath" required="yes">
required="yes">
<description> <description>
The address at which the server should listen. This can be in The full path to an SSL certificate file.
the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
</description> </description>
</key> </key>
<key name="read-only" datatype="boolean" <key name="key" datatype="existing-dirpath" required="no">
required="no"
default="false">
<description> <description>
Flag indicating whether the server should operate in read-only The full path to an SSL key file for the client certificate.
mode. Defaults to false. Note that even if the server is
operating in writable mode, individual storages may still be
read-only. But if the server is in read-only mode, no write
operations are allowed, even if the storages are writable. Note
that pack() is considered a read-only operation.
</description> </description>
</key> </key>
<key name="invalidation-queue-size" datatype="integer" <key name="password-function" required="no">
required="no"
default="100">
<description> <description>
The storage server keeps a queue of the objects modified by the Dotted name of importable function for retrieving a password
last N transactions, where N == invalidation_queue_size. This for the client certificate key.
queue is used to speed client cache verification when a client
disconnects for a short period of time.
</description> </description>
</key> </key>
<key name="invalidation-age" datatype="float" required="no"> <key name="authenticate" datatype="existing-dirpath" required="no">
<description> <description>
The maximum age of a client for which quick-verification Path to a file or directory containing server certificates to be
invalidations will be provided by iterating over the served authenticated.
storage. This option should only be used if the served storage
supports efficient iteration from a starting point near the
end of the transaction history (e.g. end of file).
</description> </description>
</key> </key>
<key name="transaction-timeout" datatype="integer" <key name="check-hostname" datatype="boolean" required="no" default="true">
required="no">
<description> <description>
The maximum amount of time to wait for a transaction to commit Verify the host name in the server certificate is as expected.
after acquiring the storage lock, specified in seconds. If the
transaction takes too long, the client connection will be closed
and the transaction aborted.
</description> </description>
</key> </key>
<key name="pid-filename" datatype="existing-dirpath" <key name="server-hostname" required="no">
required="no">
<description> <description>
The full path to the file in which to write the ZEO server's Process ID Host name to use for SSL host name checks.
at startup. If omitted, $INSTANCE/var/ZEO.pid is used.
If ``check-hostname`` is true then use this as the
value to check. If an address is a host/port pair, then this
defaults to the host in the address.
</description> </description>
<metadefault>$INSTANCE/var/ZEO.pid (or $clienthome/ZEO.pid)</metadefault>
</key> </key>
<!-- DM 2006-06-12: added option --> </sectiontype>
<key name="drop-cache-rather-verify" datatype="boolean"
required="no" default="false"> <sectiontype name="clientstorage" datatype="ZEO.zconfig.ClientStorageConfig"
<description> implements="ZODB.storage">
indicates that the cache should be dropped rather than
verified when the verification optimization is not <section type="ssl" name="*" attribute="ssl" />
available (e.g. when the ZEO server restarted).
</description> <multikey name="server" datatype="socket-connection-address" required="yes"
/>
<key name="cache-size" datatype="byte-size" default="20MB">
<description>
The cache size in bytes, KB or MB. This defaults to a 20MB.
Optional ``KB`` or ``MB`` suffixes can (and usually are) used to
specify units other than bytes.
</description>
</key>
<key name="cache-path" required="no">
<description>
The file path of a persistent cache file
</description>
</key>
<key name="blob-dir" required="no">
<description>
Path name to the blob cache directory.
</description>
</key>
<key name="shared-blob-dir" required="no" default="no"
datatype="boolean">
<description>
Tells whether the cache is a shared writable directory
and that the ZEO protocol should not transfer the file
but only the filename when committing.
</description>
</key>
<key name="blob-cache-size" required="no" datatype="byte-size">
<description>
Maximum size of the ZEO blob cache, in bytes. If not set, then
the cache size isn't checked and the blob directory will
grow without bound.
This option is ignored if shared_blob_dir is true.
</description>
</key>
<key name="blob-cache-size-check" required="no" datatype="integer">
<description>
ZEO check size as percent of blob_cache_size. The ZEO
cache size will be checked when this many bytes have been
loaded into the cache. Defaults to 10% of the blob cache
size. This option is ignored if shared_blob_dir is true.
</description>
</key>
<key name="read-only" datatype="boolean" default="off">
<description>
A flag indicating whether this should be a read-only storage,
defaulting to false (i.e. writing is allowed by default).
</description>
</key>
<key name="read-only-fallback" datatype="boolean" default="off">
<description>
A flag indicating whether a read-only remote storage should be
acceptable as a fallback when no writable storages are
available. Defaults to false. At most one of read_only and
read_only_fallback should be true.
</description>
</key>
<key name="wait-timeout" datatype="integer" default="30">
<description>
How long to wait for an initial connection, defaulting to 30
seconds. If an initial connection can't be made within this time
limit, then creation of the client storage will fail with a
``ZEO.Exceptions.ClientDisconnected`` exception.
After the initial connection, if the client is disconnected:
- In-flight server requests will fail with a
``ZEO.Exceptions.ClientDisconnected`` exception.
- New requests will block for up to ``wait_timeout`` waiting for a
connection to be established before failing with a
``ZEO.Exceptions.ClientDisconnected`` exception.
</description>
</key>
<key name="client-label" required="no">
<description>
A label for the client in server logs
</description>
</key>
<!-- The following are undocumented, but not gone. :) -->
<key name="storage" default="1">
<description>
The name of the storage that the client wants to use. If the
ZEO server serves more than one storage, the client selects
the storage it wants to use by name. The default name is '1',
which is also the default name for the ZEO server.
</description>
</key>
<key name="name" default="">
<description>
The storage name. If unspecified, the address of the server
will be used as the name.
</description>
</key> </key>
</sectiontype> </sectiontype>
......
...@@ -105,6 +105,7 @@ class ZEOOptionsMixin: ...@@ -105,6 +105,7 @@ class ZEOOptionsMixin:
"t:", "timeout=", float) "t:", "timeout=", float)
self.add('pid_file', 'zeo.pid_filename', self.add('pid_file', 'zeo.pid_filename',
None, 'pid-file=') None, 'pid-file=')
self.add("ssl", "zeo.ssl")
class ZEOOptions(ZDOptions, ZEOOptionsMixin): class ZEOOptions(ZDOptions, ZEOOptionsMixin):
...@@ -341,6 +342,7 @@ def create_server(storages, options): ...@@ -341,6 +342,7 @@ def create_server(storages, options):
invalidation_queue_size = options.invalidation_queue_size, invalidation_queue_size = options.invalidation_queue_size,
invalidation_age = options.invalidation_age, invalidation_age = options.invalidation_age,
transaction_timeout = options.transaction_timeout, transaction_timeout = options.transaction_timeout,
ssl = options.ssl,
) )
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
<import package="ZODB"/> <import package="ZODB"/>
<!-- Use the ZEO server information structure. --> <!-- Use the ZEO server information structure. -->
<import package="ZEO"/> <import package="ZEO" file="server.xml" />
<import package="ZConfig.components.logger"/> <import package="ZConfig.components.logger"/>
......
<component>
<sectiontype name="ssl" datatype="ZEO.zconfig.server_ssl">
<key name="certificate" datatype="existing-dirpath" required="yes">
<description>
The full path to an SSL certificate file.
</description>
</key>
<key name="key" datatype="existing-dirpath" required="no">
<description>
The full path to an SSL key file for the server certificate.
</description>
</key>
<key name="password-function" required="no">
<description>
Dotted name of importable function for retrieving a password
for the client certificate key.
</description>
</key>
<key name="authenticate" required="yes">
<description>
Path to a file or directory containing client certificates to
be authenticated. This can also be - or SIGNED to require
signed client certificates.
</description>
</key>
</sectiontype>
<sectiontype name="zeo">
<section type="ssl" name="*" attribute="ssl" />
<description>
The content of a ZEO section describe operational parameters
of a ZEO server except for the storage(s) to be served.
</description>
<key name="address" datatype="socket-binding-address"
required="yes">
<description>
The address at which the server should listen. This can be in
the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
</description>
</key>
<key name="read-only" datatype="boolean"
required="no"
default="false">
<description>
Flag indicating whether the server should operate in read-only
mode. Defaults to false. Note that even if the server is
operating in writable mode, individual storages may still be
read-only. But if the server is in read-only mode, no write
operations are allowed, even if the storages are writable. Note
that pack() is considered a read-only operation.
</description>
</key>
<key name="invalidation-queue-size" datatype="integer"
required="no"
default="100">
<description>
The storage server keeps a queue of the objects modified by the
last N transactions, where N == invalidation_queue_size. This
queue is used to speed client cache verification when a client
disconnects for a short period of time.
</description>
</key>
<key name="invalidation-age" datatype="float" required="no">
<description>
The maximum age of a client for which quick-verification
invalidations will be provided by iterating over the served
storage. This option should only be used if the served storage
supports efficient iteration from a starting point near the
end of the transaction history (e.g. end of file).
</description>
</key>
<key name="transaction-timeout" datatype="integer"
required="no">
<description>
The maximum amount of time to wait for a transaction to commit
after acquiring the storage lock, specified in seconds. If the
transaction takes too long, the client connection will be closed
and the transaction aborted.
</description>
</key>
<key name="pid-filename" datatype="existing-dirpath"
required="no">
<description>
The full path to the file in which to write the ZEO server's Process ID
at startup. If omitted, $INSTANCE/var/ZEO.pid is used.
</description>
<metadefault>$INSTANCE/var/ZEO.pid (or $clienthome/ZEO.pid)</metadefault>
</key>
</sectiontype>
</component>
...@@ -127,7 +127,8 @@ class CommitLockTests: ...@@ -127,7 +127,8 @@ class CommitLockTests:
# list is a socket domain (AF_INET, AF_UNIX, etc.) and an # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
# address. # address.
addr = self._storage._addr addr = self._storage._addr
new = ZEO.ClientStorage.ClientStorage(addr, wait=1) new = ZEO.ClientStorage.ClientStorage(
addr, wait=1, **self._client_options())
new.registerDB(DummyDB()) new.registerDB(DummyDB())
return new return new
......
...@@ -16,7 +16,6 @@ import contextlib ...@@ -16,7 +16,6 @@ import contextlib
import os import os
import time import time
import socket import socket
import asyncore
import threading import threading
import logging import logging
...@@ -36,6 +35,8 @@ import ZODB.tests.util ...@@ -36,6 +35,8 @@ import ZODB.tests.util
import transaction import transaction
from transaction import Transaction from transaction import Transaction
from . import testssl
logger = logging.getLogger('ZEO.tests.ConnectionTests') logger = logging.getLogger('ZEO.tests.ConnectionTests')
ZERO = '\0'*8 ZERO = '\0'*8
...@@ -66,7 +67,6 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -66,7 +67,6 @@ class CommonSetupTearDown(StorageTestBase):
keep = 0 keep = 0
invq = None invq = None
timeout = None timeout = None
monitor = 0
db_class = DummyDB db_class = DummyDB
def setUp(self, before=None): def setUp(self, before=None):
...@@ -79,10 +79,9 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -79,10 +79,9 @@ class CommonSetupTearDown(StorageTestBase):
self.__super_setUp() self.__super_setUp()
logging.info("setUp() %s", self.id()) logging.info("setUp() %s", self.id())
self.file = 'storage_conf' self.file = 'storage_conf'
self.addr = []
self._servers = [] self._servers = []
self.caches = [] self.caches = []
self._newAddr() self.addr = [('localhost', 0)]
self.startServer() self.startServer()
def tearDown(self): def tearDown(self):
...@@ -147,18 +146,17 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -147,18 +146,17 @@ class CommonSetupTearDown(StorageTestBase):
min_disconnect_poll=0.1, min_disconnect_poll=0.1,
read_only=read_only, read_only=read_only,
read_only_fallback=read_only_fallback, read_only_fallback=read_only_fallback,
username=username, **self._client_options())
password=password,
realm=realm)
storage.registerDB(DummyDB()) storage.registerDB(DummyDB())
return storage return storage
def _client_options(self):
return {}
def getServerConfig(self, addr, ro_svr): def getServerConfig(self, addr, ro_svr):
zconf = forker.ZEOConfig(addr) zconf = forker.ZEOConfig(addr)
if ro_svr: if ro_svr:
zconf.read_only = 1 zconf.read_only = 1
if self.monitor:
zconf.monitor_address = ("", 42000)
if self.invq: if self.invq:
zconf.invalidation_queue_size = self.invq zconf.invalidation_queue_size = self.invq
if self.timeout: if self.timeout:
...@@ -179,6 +177,8 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -179,6 +177,8 @@ class CommonSetupTearDown(StorageTestBase):
zeoport, stop = forker.start_zeo_server( zeoport, stop = forker.start_zeo_server(
sconf, zconf, addr[1], keep, **kw) sconf, zconf, addr[1], keep, **kw)
self._servers.append(stop) self._servers.append(stop)
if addr[1] == 0:
self.addr[index] = zeoport
def shutdownServer(self, index=0): def shutdownServer(self, index=0):
logging.info("shutdownServer(index=%d) @ %s" % logging.info("shutdownServer(index=%d) @ %s" %
...@@ -191,31 +191,16 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -191,31 +191,16 @@ class CommonSetupTearDown(StorageTestBase):
def pollUp(self, timeout=30.0, storage=None): def pollUp(self, timeout=30.0, storage=None):
if storage is None: if storage is None:
storage = self._storage storage = self._storage
# Poll until we're connected. storage.server_status()
now = time.time()
giveup = now + timeout
while not storage.is_connected():
asyncore.poll(0.1)
now = time.time()
if now > giveup:
self.fail("timed out waiting for storage to connect")
# When the socket map is empty, poll() returns immediately,
# and this is a pure busy-loop then. At least on some Linux
# flavors, that can starve the thread trying to connect,
# leading to grossly increased runtime (typical) or bogus
# "timed out" failures. A little sleep here cures both.
time.sleep(0.1)
def pollDown(self, timeout=30.0): def pollDown(self, timeout=30.0):
# Poll until we're disconnected. # Poll until we're disconnected.
now = time.time() now = time.time()
giveup = now + timeout giveup = now + timeout
while self._storage.is_connected(): while self._storage.is_connected():
asyncore.poll(0.1)
now = time.time() now = time.time()
if now > giveup: if now > giveup:
self.fail("timed out waiting for storage to disconnect") self.fail("timed out waiting for storage to disconnect")
# See pollUp() for why we sleep a little here.
time.sleep(0.1) time.sleep(0.1)
...@@ -564,6 +549,18 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -564,6 +549,18 @@ class ConnectionTests(CommonSetupTearDown):
self.assertRaises(ClientDisconnected, self.assertRaises(ClientDisconnected,
self._storage.load, b'\0'*8, '') self._storage.load, b'\0'*8, '')
class SSLConnectionTests(ConnectionTests):
def getServerConfig(self, addr, ro_svr):
return testssl.server_config.replace(
'127.0.0.1:0',
'{}: {}\nread-only {}'.format(
addr[0], addr[1], 'true' if ro_svr else 'false'))
def _client_options(self):
return {'ssl': testssl.client_ssl()}
class InvqTests(CommonSetupTearDown): class InvqTests(CommonSetupTearDown):
invq = 3 invq = 3
...@@ -966,7 +963,7 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -966,7 +963,7 @@ class TimeoutTests(CommonSetupTearDown):
self.assertRaises(ClientDisconnected, storage.tpc_finish, txn) self.assertRaises(ClientDisconnected, storage.tpc_finish, txn)
# Make sure it's logged as CRITICAL # Make sure it's logged as CRITICAL
for line in open("server-%s.log" % self.addr[0][1]): for line in open("server-0.log"):
if (('Transaction timeout after' in line) and if (('Transaction timeout after' in line) and
('CRITICAL ZEO.StorageServer' in line) ('CRITICAL ZEO.StorageServer' in line)
): ):
......
-----BEGIN CERTIFICATE-----
MIID/TCCAuWgAwIBAgIJAJWOSC4oLyp9MA0GCSqGSIb3DQEBBQUAMFwxCzAJBgNV
BAYTAlVTMQswCQYDVQQIEwJWQTENMAsGA1UEChMEWk9EQjERMA8GA1UEAxMIem9k
Yi5vcmcxHjAcBgkqhkiG9w0BCQEWD2NsaWVudEB6b2RiLm9yZzAeFw0xNjA2MjMx
NTA3MTNaFw0xNzA2MjMxNTA3MTNaMFwxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJW
QTENMAsGA1UEChMEWk9EQjERMA8GA1UEAxMIem9kYi5vcmcxHjAcBgkqhkiG9w0B
CQEWD2NsaWVudEB6b2RiLm9yZzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
ggEBANqtLqtrzwDv0nakKiTX5ZtSHOaTmfwHzHsZJkcNf7kJUgGtE0oe3cIj4iAC
CCpPfUf9OfIA1NpmDsvPMqw80ho1o9g8QZWfW6QOTj2kbnanrRqMDmRvrWlzMQIe
+7EabYxbTjyduk76N2Sa4Tf/my6Yton3zyKtdjSJzoQ/SAKT+Nvjt5s47I4THEFY
gN7Njbg1FyihKwuwR64EUsyBanutKvXVT7gnB1V2cQhn+LW+NwgzsxKnptvyvBGr
Ago+uoxrHlIQu59xznS2vA3Ck2K3hIOnXpXYYGeRYKzplZLZnCfZ2uQheiO3ioEO
UCXICbPMxA7IEpTC75j1n9a3HKcCAwEAAaOBwTCBvjAdBgNVHQ4EFgQUCg1EhkHz
qYFOgbI/D6zR1tcwHBcwgY4GA1UdIwSBhjCBg4AUCg1EhkHzqYFOgbI/D6zR1tcw
HBehYKReMFwxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJWQTENMAsGA1UEChMEWk9E
QjERMA8GA1UEAxMIem9kYi5vcmcxHjAcBgkqhkiG9w0BCQEWD2NsaWVudEB6b2Ri
Lm9yZ4IJAJWOSC4oLyp9MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADggEB
AJk3TviNGmjzCuDzLajeLEB9Iy285Fl3D/3TvThSDECS3cUq+i8fQm0cjOYnE/rK
6Lpg6soeQFetoWQsNT2uvxyv3iZEOtBwNhQKaKTkzIRTmMx/aWM0zG0c3psepC6c
1Fgp0HAts2JKC4Ni7zHFBDb8YZi87IUHYNKuJKcUWKiEgeHu2zCI1Q43FMSKoaG8
XwYi1Mxw6TQQtjZrMnNPSeO7zySBuCw10bCZSMC5xvsqckfREifRT//4A0/COWYK
/p6TZMTaMjrK8fOaPpap314QnLf80P6oLEZ7wkghaNuyq8IzgATuxYVy21132MNB
qZIUS+iblQAZDSHnQJoehQQ=
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEA2q0uq2vPAO/SdqQqJNflm1Ic5pOZ/AfMexkmRw1/uQlSAa0T
Sh7dwiPiIAIIKk99R/058gDU2mYOy88yrDzSGjWj2DxBlZ9bpA5OPaRudqetGowO
ZG+taXMxAh77sRptjFtOPJ26Tvo3ZJrhN/+bLpi2iffPIq12NInOhD9IApP42+O3
mzjsjhMcQViA3s2NuDUXKKErC7BHrgRSzIFqe60q9dVPuCcHVXZxCGf4tb43CDOz
Eqem2/K8EasCCj66jGseUhC7n3HOdLa8DcKTYreEg6deldhgZ5FgrOmVktmcJ9na
5CF6I7eKgQ5QJcgJs8zEDsgSlMLvmPWf1rccpwIDAQABAoIBAQDSju7hIG2x+Tou
AuSRlVEAvZAWdQlQJDJAVXcF83mIMfFEq+Jm/FGLHgIdz9cM5n07VBj3bNWHdb3J
gTjJn8audffNvjdoWoli7mNn92xl1A5aAYHaM65GWyRVZn/zh/7zpvcuZrF+Wm/7
7yXtRbGmrGUXdAV+3odzDz5LGKO91fTuM2nW0j+p7+q2Bzko7+rl9AVxaveco4pt
TtqXX2eOC3wTcNotBfJJD89/+/szg62K4CYCUAaetKMPcVrgQ4v0YHakOl8lJJxW
q7XiqhPyjeZp6h8e9dtVCkeZHa3xacuoftF2w3FslVEX/LOooAKf07PU1xxJezZN
trP11pgBAoGBAPoysqD9OW3C0WyIXm/Lx1Pncr/p/kKYlXNDWQRmizHmfLC4xrBI
n75mYSp7BJ7Gh2W7lW9lnyJ0uaqXdqOOswO/baTeSEvvhL+lLs4F1cQWemjTD4qy
KqCxCCbTf8gZPssiJXsXGmf6yWAdpjfYUxarxB0Ks6wHLBpQHctvVebJAoGBAN+/
W0+yAXssr7AWgKAyiHtqE/MiI8e0bDJWDjHxaPvnSVyDwVOARj/JKvw7jnhg8FKJ
1iWtl8ym2ktcAQygSl3HW4wggD59BbeXrUQr4nZi8wLMTpLxnSfITFrwXpwVShT4
8KJPR46W/Plkphm1fZn6Lr0uGJ12pV+iPAq/F+/vAoGALmRoKuHJXEjbfDxtBl3K
wAwSgvNoagDQ9WZvgxlghggu5rXcYaOVu0BQlAfre2VkhcCanOVC9KigJLmhDgLP
vsooEoIE9c+b1c1TOHBsiseAOx+nqhgPP2yUDl75Oqkzs4bJXGGUS+N8o43b3E8I
WRPQcXIijqtlyhtA6w/h5cECgYEA2M6DnGXQKZrTYr1rRc+xkGTpj9607P5XGS9p
8dsK74zd+VdyLYdOiuBTVrYfB2ZneJM3fqsHPLcxL3SnT6TCarySaOXVXremoo/G
xRgBCNY4w61VNe4JalMcKcJg6r12W3wdMCnCHNkRqFdu29qRKnLSd14DXBFrjY+W
vpMMjuECgYEAtNOm9WTCAlVbGTMplXcUJBWNwdOz+sHMi+PhP/fTNm0XH3fepFeQ
th7b/RYocTwAHGu8mArX2DgXAfDQAYMPFN8vmbh0XQsQV+iuja9MO2TagTyVSe3x
DBOTFCLg8oWtzwySZ6BKR/KsGI2ryRxyE1VojV0zNXRyZqDCm+G4Vs0=
-----END RSA PRIVATE KEY-----
...@@ -14,67 +14,46 @@ ZODB.notify. ...@@ -14,67 +14,46 @@ ZODB.notify.
Now, let's start a server and verify that we get a serving event: Now, let's start a server and verify that we get a serving event:
>>> import ZEO.StorageServer, ZODB.MappingStorage >>> import ZEO
>>> server = ZEO.StorageServer.StorageServer( >>> addr, stop = ZEO.server()
... ('127.0.0.1', 0), {'1': ZODB.MappingStorage.MappingStorage()})
>>> isinstance(last_event, ZEO.StorageServer.Serving) >>> isinstance(last_event, ZEO.StorageServer.Serving)
True True
>>> last_event.server is server
True
>>> last_event.address[0], last_event.address[1] > 0
('127.0.0.1', True)
If the host part pf the address passed to the constructor is not an >>> last_event.address == addr
empty string. then the server addr attribute is the same as the
address attribute of the event:
>>> server.addr == last_event.address
True True
Let's run the server in a thread, to make sure we can connect. >>> server = last_event.server
>>> server.addr == addr
True
>>> server.start_thread() Let's make sure we can connect.
>>> client = ZEO.client(last_event.address) >>> client = ZEO.client(last_event.address).close()
>>> client.is_connected()
True
If we close the server, we'll get a closed event: If we close the server, we'll get a closed event:
>>> server.close() >>> stop()
>>> isinstance(last_event, ZEO.StorageServer.Closed) >>> isinstance(last_event, ZEO.StorageServer.Closed)
True True
>>> last_event.server is server >>> last_event.server is server
True True
>>> wait_until(lambda : not client.is_connected(test=True))
>>> client.close()
If we pass an empty string as the host part of the server address, we If we pass an empty string as the host part of the server address, we
can't really assign a single address, so the server addr attribute is can't really assign a single address, so the server addr attribute is
left alone: left alone:
>>> server = ZEO.StorageServer.StorageServer( >>> addr, stop = ZEO.server(port=('', 0))
... ('', 0), {'1': ZODB.MappingStorage.MappingStorage()})
>>> isinstance(last_event, ZEO.StorageServer.Serving) >>> isinstance(last_event, ZEO.StorageServer.Serving)
True True
>>> last_event.server is server
True
>>> last_event.address[1] > 0 >>> last_event.address[1] > 0
True True
If the host part pf the address passed to the constructor is not an >>> last_event.server.addr
empty string. then the server addr attribute is the same as the
address attribute of the event:
>>> server.addr
('', 0) ('', 0)
>>> server.close() >>> stop()
The runzeo module provides some process support, including getting the The runzeo module provides some process support, including getting the
server configuration via a ZConfig configuration file. To spell a server configuration via a ZConfig configuration file. To spell a
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
############################################################################## ##############################################################################
"""Library for forking storage server and connecting client storage""" """Library for forking storage server and connecting client storage"""
from __future__ import print_function from __future__ import print_function
import gc
import os import os
import random import random
import sys import sys
...@@ -25,7 +26,7 @@ import tempfile ...@@ -25,7 +26,7 @@ import tempfile
import six import six
import ZODB.tests.util import ZODB.tests.util
import zope.testing.setupstack import zope.testing.setupstack
from ZEO._compat import BytesIO from ZEO._compat import StringIO
logger = logging.getLogger('ZEO.tests.forker') logger = logging.getLogger('ZEO.tests.forker')
...@@ -40,9 +41,6 @@ class ZEOConfig: ...@@ -40,9 +41,6 @@ class ZEOConfig:
addr = '%s:%s' % addr addr = '%s:%s' % addr
self.address = addr self.address = addr
self.read_only = None self.read_only = None
self.invalidation_queue_size = None
self.invalidation_age = None
self.transaction_timeout = None
self.loglevel = 'INFO' self.loglevel = 'INFO'
def dump(self, f): def dump(self, f):
...@@ -50,13 +48,16 @@ class ZEOConfig: ...@@ -50,13 +48,16 @@ class ZEOConfig:
print("address " + self.address, file=f) print("address " + self.address, file=f)
if self.read_only is not None: if self.read_only is not None:
print("read-only", self.read_only and "true" or "false", file=f) print("read-only", self.read_only and "true" or "false", file=f)
if self.invalidation_queue_size is not None:
print("invalidation-queue-size", for name in (
self.invalidation_queue_size, file=f) 'invalidation_queue_size', 'invalidation_age',
if self.invalidation_age is not None: 'transaction_timeout', 'pid_filename',
print("invalidation-age", self.invalidation_age, file=f) 'ssl_certificate', 'ssl_key',
if self.transaction_timeout is not None: ):
print("transaction-timeout", self.transaction_timeout, file=f) v = getattr(self, name, None)
if v:
print(name.replace('_', '-'), v, file=f)
print("</zeo>", file=f) print("</zeo>", file=f)
print(""" print("""
...@@ -69,9 +70,9 @@ class ZEOConfig: ...@@ -69,9 +70,9 @@ class ZEOConfig:
""" % (self.loglevel, self.logpath), file=f) """ % (self.loglevel, self.logpath), file=f)
def __str__(self): def __str__(self):
f = BytesIO() f = StringIO()
self.dump(f) self.dump(f)
return f.getvalue().decode() return f.getvalue()
def encode_format(fmt): def encode_format(fmt):
...@@ -83,7 +84,7 @@ def encode_format(fmt): ...@@ -83,7 +84,7 @@ def encode_format(fmt):
return fmt return fmt
def runner(config, qin, qout, timeout=None, def runner(config, qin, qout, timeout=None,
join_timeout=9, debug=False, name=None, debug=False, name=None,
keep=False, protocol=None): keep=False, protocol=None):
if debug: if debug:
...@@ -124,11 +125,7 @@ def runner(config, qin, qout, timeout=None, ...@@ -124,11 +125,7 @@ def runner(config, qin, qout, timeout=None,
except Empty: except Empty:
pass pass
server.server.close() server.server.close()
thread.join(join_timeout) thread.join(3)
if thread.is_alive():
logger.warning("server thread didn't stop")
else:
logger.debug('server thread stopped')
if not keep: if not keep:
# Try to cleanup storage files # Try to cleanup storage files
...@@ -138,10 +135,11 @@ def runner(config, qin, qout, timeout=None, ...@@ -138,10 +135,11 @@ def runner(config, qin, qout, timeout=None,
except AttributeError: except AttributeError:
pass pass
qout.put('stopped') qout.put(thread.is_alive())
qin.get(timeout=11) # ack
if hasattr(qout, 'close'): if hasattr(qout, 'close'):
qout.close() qout.close()
qout.join_thread() qout.cancel_join_thread()
except Exception: except Exception:
logger.exception("In server thread") logger.exception("In server thread")
...@@ -153,12 +151,25 @@ def runner(config, qin, qout, timeout=None, ...@@ -153,12 +151,25 @@ def runner(config, qin, qout, timeout=None,
def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None): def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None):
qin.put('stop') qin.put('stop')
if hasattr(qin, 'close'): dirty = qout.get(timeout=stop_timeout)
qin.close() qin.put('ack')
qin.join_thread() if dirty:
qout.get(timeout=stop_timeout) print("WARNING SERVER DIDN'T STOP CLEANLY", file=sys.stderr)
# The runner thread didn't stop. If it was a process,
# give it some time to exit
if hasattr(thread, 'pid') and thread.pid:
os.waitpid(thread.pid)
else:
# Gaaaa, force gc in hopes of maybe getting the unclosed
# sockets to get GCed
gc.collect()
thread.join(stop_timeout) thread.join(stop_timeout)
os.remove(config) os.remove(config)
if hasattr(qin, 'close'):
qin.close()
qin.cancel_join_thread()
def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False, def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
path='Data.fs', protocol=None, blob_dir=None, path='Data.fs', protocol=None, blob_dir=None,
...@@ -181,25 +192,24 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False, ...@@ -181,25 +192,24 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
storage_conf = '<blobstorage>\nblob-dir %s\n%s\n</blobstorage>' % ( storage_conf = '<blobstorage>\nblob-dir %s\n%s\n</blobstorage>' % (
blob_dir, storage_conf) blob_dir, storage_conf)
if port is None: if zeo_conf is None or isinstance(zeo_conf, dict):
raise AssertionError("The port wasn't specified") if port is None:
raise AssertionError("The port wasn't specified")
if isinstance(port, int): if isinstance(port, int):
addr = 'localhost', port addr = 'localhost', port
else: else:
addr = port addr = port
adminaddr = port+'-test'
if zeo_conf is None or isinstance(zeo_conf, dict):
z = ZEOConfig(addr) z = ZEOConfig(addr)
if zeo_conf: if zeo_conf:
z.__dict__.update(zeo_conf) z.__dict__.update(zeo_conf)
zeo_conf = z zeo_conf = str(z)
# Store the config info in a temp file. # Store the config info in a temp file.
tmpfile = tempfile.mktemp(".conf", dir=os.getcwd()) tmpfile = tempfile.mktemp(".conf", dir=os.getcwd())
fp = open(tmpfile, 'w') fp = open(tmpfile, 'w')
zeo_conf.dump(fp) fp.write(str(zeo_conf) + '\n\n')
fp.write(storage_conf) fp.write(storage_conf)
fp.close() fp.close()
...@@ -222,7 +232,7 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False, ...@@ -222,7 +232,7 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
thread.start() thread.start()
addr = qout.get(timeout=start_timeout) addr = qout.get(timeout=start_timeout)
def stop(stop_timeout=9): def stop(stop_timeout=99):
stop_runner(thread, tmpfile, qin, qout, stop_timeout) stop_runner(thread, tmpfile, qin, qout, stop_timeout)
return addr, stop return addr, stop
......
...@@ -2,12 +2,10 @@ You can change the address(es) of a client storaage. ...@@ -2,12 +2,10 @@ You can change the address(es) of a client storaage.
We'll start by setting up a server and connecting to it: We'll start by setting up a server and connecting to it:
>>> import ZEO, ZEO.StorageServer, ZODB.FileStorage, transaction >>> import ZEO, transaction
>>> server = ZEO.StorageServer.StorageServer(
... ('127.0.0.1', 0), {'1': ZODB.FileStorage.FileStorage('t.fs')})
>>> server.start_thread()
>>> conn = ZEO.connection(server.addr, max_disconnect_poll=0.1) >>> addr, stop = ZEO.server(path='test.fs')
>>> conn = ZEO.connection(addr)
>>> client = conn.db().storage >>> client = conn.db().storage
>>> client.is_connected() >>> client.is_connected()
True True
...@@ -18,30 +16,25 @@ We'll start by setting up a server and connecting to it: ...@@ -18,30 +16,25 @@ We'll start by setting up a server and connecting to it:
Now we'll close the server: Now we'll close the server:
>>> server.close() >>> stop()
And wait for the connectin to notice it's disconnected: And wait for the connectin to notice it's disconnected:
>>> wait_until(lambda : not client.is_connected()) >>> wait_until(lambda : not client.is_connected())
Now, we'll restart the server and update the connection: Now, we'll restart the server:
>>> server = ZEO.StorageServer.StorageServer(
... ('127.0.0.1', 0), {'1': ZODB.FileStorage.FileStorage('t.fs')})
>>> server.start_thread()
>>> client.new_addr(server.addr)
>>> addr, stop = ZEO.server(path='test.fs')
Update with another client: Update with another client:
>>> conn2 = ZEO.connection(server.addr) >>> conn2 = ZEO.connection(addr)
>>> conn2.root.x += 1 >>> conn2.root.x += 1
>>> transaction.commit() >>> transaction.commit()
Wait for connect: Update the connection and wait for connect:
>>> client.new_addr(addr)
>>> wait_until(lambda : client.is_connected()) >>> wait_until(lambda : client.is_connected())
>>> _ = transaction.begin() >>> _ = transaction.begin()
>>> conn.root() >>> conn.root()
...@@ -51,4 +44,4 @@ Wait for connect: ...@@ -51,4 +44,4 @@ Wait for connect:
>>> conn.close() >>> conn.close()
>>> conn2.close() >>> conn2.close()
>>> server.close() >>> stop()
-----BEGIN CERTIFICATE-----
MIID/TCCAuWgAwIBAgIJAKpYWt7G+3R4MA0GCSqGSIb3DQEBBQUAMFwxCzAJBgNV
BAYTAlVTMQswCQYDVQQIEwJWQTENMAsGA1UEChMEWk9EQjERMA8GA1UEAxMIem9k
Yi5vcmcxHjAcBgkqhkiG9w0BCQEWD3NlcnZlckB6b2RiLm9yZzAeFw0xNjA2MjMx
NTA5MTZaFw0xNzA2MjMxNTA5MTZaMFwxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJW
QTENMAsGA1UEChMEWk9EQjERMA8GA1UEAxMIem9kYi5vcmcxHjAcBgkqhkiG9w0B
CQEWD3NlcnZlckB6b2RiLm9yZzCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
ggEBANqyMazB39wFKFh2nIspOw+e/7LPLQZpsd6BnYOxPbdlhb23Q930WqgW5qCA
aJjLqkmvQvZhElXdO2lOxGLR2/Cu71UgmYXkZbMKqPNtoYCPRPCCKm5EczAFXTy4
SUD40wAlndP7J8TjpIaKZjgdfy4GWnGQQAbXUR1eFZszQtU7pUvyjgXsY+BEgq4h
F4iIBarcf8k/6PTldTRLxEbRiTNZ4cdIEtTJL/LzSu5oBw8Z3J05aPg9DcWVl89P
XtemKxU8+Vm847xXJkuODkpSgOCqAPn959b/DGdn1fyx2CR0lSgC4n2rYE/oWxw0
hiyI1jeXTAfOtqvcg4XA5Cs3ivUCAwEAAaOBwTCBvjAdBgNVHQ4EFgQUoWgKmGKI
o2U9vFK48cyXuZrmPdgwgY4GA1UdIwSBhjCBg4AUoWgKmGKIo2U9vFK48cyXuZrm
PdihYKReMFwxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJWQTENMAsGA1UEChMEWk9E
QjERMA8GA1UEAxMIem9kYi5vcmcxHjAcBgkqhkiG9w0BCQEWD3NlcnZlckB6b2Ri
Lm9yZ4IJAKpYWt7G+3R4MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADggEB
ACiGzPx1445EdyO4Em3M6MwPQD8KVLvxsYE5HQMk0A2BTJOs2Vzf0NPL2hn947D/
wFe/ubkJXmzPG2CBCbNA//exWj8x2rR8256qJuWDwFx0WUFpRmEVFUGK+bpvgxQV
DlLxnWV8z6Tq9vINRjKT3mcBX4OpDgbEL4O92pbJ7kZNn4Z2+v6/lsWzg3Eo6LVY
fj902gQD/6hjVenD6J5Dqftj4x9nsKjdMz8n5Ok5E1J02ghiWjlUp1PNUKwc2Elw
oFnjPiacbko0fSnD9Zf6qBbACAYyBkHvBc1ZMebnGepZn3E6V91X5kZl84hGzgsb
C+2aGtAqSnvL4/DlIyss3hc=
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA2rIxrMHf3AUoWHaciyk7D57/ss8tBmmx3oGdg7E9t2WFvbdD
3fRaqBbmoIBomMuqSa9C9mESVd07aU7EYtHb8K7vVSCZheRlswqo822hgI9E8IIq
bkRzMAVdPLhJQPjTACWd0/snxOOkhopmOB1/LgZacZBABtdRHV4VmzNC1TulS/KO
Bexj4ESCriEXiIgFqtx/yT/o9OV1NEvERtGJM1nhx0gS1Mkv8vNK7mgHDxncnTlo
+D0NxZWXz09e16YrFTz5WbzjvFcmS44OSlKA4KoA+f3n1v8MZ2fV/LHYJHSVKALi
fatgT+hbHDSGLIjWN5dMB862q9yDhcDkKzeK9QIDAQABAoIBACtwA0/V/jm8SIQx
ouw9Fz8GDLGeVsoUSkDwq7GRjbmUj5jcAr3eH/eM/OfaOWxH353dEsbPBw5I79j9
zSH3nuDSTjUxUWz3rX9/WYloOBDJ5B6FLBpUvDBIkHlT/TDLe1VnI08Mbpy7vlz+
tkjlCvLATkyKIz14nOLhYhc+ekLRxQZrVRgHIPW13c0F61drkc4uCs/UMbYRzZiZ
nnMeQLghIUP13xMtMMkNx0P1ampvV18kCJWuvHUqOMnCaPnTGCnGfOUtq98sTdui
vBnleePmBzF5VGT7M3e3pr63EKyVPD/bx2dbItcxOahBTgDbMQR4AQ65NvD5+B0+
d9XbfgECgYEA7wLsMvFAmj5IrXYGYnaqy1LsMiZll9zv4QB2tAKLSIy5z0Cns54R
ttI6Ni94CfjBOKmR5IZXjgXZg5ydu+tBLwqNMJISziFSyTxzwNsAEPczUHefFhwq
zhAkIVsISy42AxpuyHlz5EBSJiUULhguUhmIDxCbQnmlSK6X9MxYO3UCgYEA6j2c
dfU4RC2tzjNKoKs52mUoWmwFsM5CKs0hDeRn2KV80nNKn/mRa0cZ36u/x0hN3fBm
wrwNiZWOX+ot5SfnePx0dOiTOxfWYeXtzVqF6KhPUK8K5424zL+N1uw941gvkXYR
Cc79AxDI/S5y/2ZR8aW4H91LdCcvPlE4Dp1JoYECgYAuE2gpYezMT1l/ZxNQBARk
8fVqrZBEOGld/NLlXOAw+kAPvi0WKVDM57YlH/2KHpRRMg9X+LYEQQhvoM+fnHiS
cvxI8sABUNc+yBKgiRd4Lc+MoaLfhkqSMvZkH8J3i88Jxhy5NQCsbeHoTJmZUTwM
w7NBBDiKFh1Q56ePn50ayQKBgQCA8guoT6Z6uZ6dDVU+nyOI2vjc1exICTMZdrSE
fkDAXVEaVMc2y17G7GwM2fIHlQDwdP9ModLd80td937uUAo3atn85W7vL88fM0C2
M+fVTJnk84cQMs8RPz2om4HyHcCJ1bHJcX2Ma3gJD8HUYJIpcS2rtNlthoiWSIWQ
XfuDgQKBgQCYW+x52XSaTjE0SviYmeNuO30bELgjxPyPLge407EAp1h6Sr9Za5ap
2aQsClsuzpHcHDoWmzO+dTr70Qyt/YoY5FN3uZRG9m7X/OH7+ceBEU/BtQ0Whem4
YNE5lpuw4eGsJQGkhRFcqKy66gId7Sw5b6CIaJqbhQCogkWt5JQuWQ==
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIID8DCCAtigAwIBAgIJALop9P9MBfzLMA0GCSqGSIb3DQEBBQUAMFgxCzAJBgNV
BAYTAlVTMQswCQYDVQQIEwJWQTENMAsGA1UEChMEWk9EQjERMA8GA1UEAxMIem9k
Yi5vcmcxGjAYBgkqhkiG9w0BCQEWC3B3QHpvZGIub3JnMB4XDTE2MDYyMzE1MTAz
MVoXDTE3MDYyMzE1MTAzMVowWDELMAkGA1UEBhMCVVMxCzAJBgNVBAgTAlZBMQ0w
CwYDVQQKEwRaT0RCMREwDwYDVQQDEwh6b2RiLm9yZzEaMBgGCSqGSIb3DQEJARYL
cHdAem9kYi5vcmcwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDKw/iw
N1EPddU9QQQ+OnCJv9G3rbTOPt4zEbpfTROIHTME3krFKPALrGF2aK+oBpHx3/TZ
HN5UvWK/jmGtDL9jekKCAaeAaVIKlESUS6DIxZY+FaO3re/1fbmBNRz8Cnn1raAw
/4YZRDPvblooH4Nt5m7uooGAIIDPft3fInhmGboOoIpXc7nMGVGOWXlDN5I9oFmm
4vby4CUMy3A/0wnHgTuMNy7Tpjgz2E/1MRAOyWQ7PZYiASs4ycZfas8058O8DI+o
rSYyum/czecIz52P6jbx5LWvcKDWac8QbJoHPelthYtxcMHee2+Nh6MWW688CBzq
HSeFAdNO3d9kMiFpAgMBAAGjgbwwgbkwHQYDVR0OBBYEFDui1OC2+2z2rHADglk5
tGOndxhoMIGJBgNVHSMEgYEwf4AUO6LU4Lb7bPascAOCWTm0Y6d3GGihXKRaMFgx
CzAJBgNVBAYTAlVTMQswCQYDVQQIEwJWQTENMAsGA1UEChMEWk9EQjERMA8GA1UE
AxMIem9kYi5vcmcxGjAYBgkqhkiG9w0BCQEWC3B3QHpvZGIub3JnggkAuin0/0wF
/MswDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOCAQEAiEYO8MZ3OG8sqy9t
AtUZbv0aTsIUzy/QTUKKDUo8qwKNOylqyqGAZV0tZ5eCoqIGFAwRJBBymIizU3zH
U1k2MnYZMVi7uYwSy+qwg52+X7GLl/kaAfx8kNvpr274CuZQnLojJS+K8HtH5Pom
YD3gTO3OxGS4IS6uf6DD+mf+C9OBnTl47P0HA0/eHBEXVSc2vsv30H/UoW5VbZ6z
6TWkoPwSMVhCNRRRif4/eqCLh24/h5b4uvAC+tsrIPQ9If7EsqVTNMCbAkv3ib6g
OmaCdbrGkqvD3UVn7i5ci96UZoF80EWNZiwhMdvQtMfOAR4jHQ1pTepJni6JwzZP
UMNDpQ==
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
Proc-Type: 4,ENCRYPTED
DEK-Info: DES-EDE3-CBC,769B900D03925712
z5M/XkqEC1+PxJ1T3QrUhGG9fPTBsPxKy8WlwIytbMg0RXS0oJBiFgNYp1ktqzGo
yT+AdTCRR1hNVX8M5HbV3ksUjKxXKCL3+yaaB6JtGbNRr2qTNwosvxD92nKT/hvN
R6rHF6LcO05s8ubs9b9ON/ja7HCx69N5CjBuCbCFHUTlAXkwD9w0ScrxrtfP50EY
FOw6LAqhhzq6/KO7c1SJ7k9LYzakhL+nbw5KM9QgBk4WHlmKLbCZIZ5RWvu0F4s5
n4qk/BcuXIkbYuEv2kH0nDk5eDfA/dj7xZcMMgL5VFymQzaZLYyj4WuQYXu/7JW/
nM/ZWBkZOMaI3vnPTG1hJ9pgjLjQnjfNA/bGWwbLxjCsPmR8yvZS4v2iqdB6X3Vl
yJ9aV9r8KoU0PJk3x4v2Zp+RQxgrKSaQw59sXptaXAY3NCRR6ohvq4P5X6UB8r5S
vYdoMeVXhX1hzXeMguln7zQInwJhPZqk4wMIV3lTsCqh1eJ7NC2TGCwble+B/ClR
KtzuJBzoYPLw44ltnXPEMmVH1Fkh17+QZFgZRJrKGD9PGOAXmnzudsZ1xX9kNnOM
JLIT/mzKcqkd8S1n1Xi1x7zGA0Ng5xmKGFe8oDokPJucJO1Ou+hbLDmC0DZUGzr1
qqPJ3F/DzZZDTmD/rZF6doPJgFAZvgpVeiWS8/v1qbz/nz13uwXDLjRPgLfcKpmQ
4R3V4QlgviDilW61VTZnzV9qAOx4fG6+IwWIGBlrJnfsH/fSCDNlAStc6k12zdun
PIIRJBfbEprGig3vRWUoBASReqow1JCN9DaVCX1P27pDKY5oDe+7/HOrQpwhPoya
2HEwbKeyY0nCcCXbkWGL1bwEUs/PrJv+61rik4KxOWhKpHWkZLzbozELb44jXrJx
e8K8XKz4La2DEjsUYHc31u6T69GBQO9JDEvih15phUWq8ITvDnkHpAg+wYb1JAHD
QcqDtAulMvT/ZGN0h7qdwbHMggEsLgCCVPG4iZ5K4cXsMbePFvQqq+o4FTMF+cM5
2Dq0wir92U9cH+ooy80LIt5Kp5zqgQZzr73o9MEgwqJocCrx9ZrofKRUmTV+ZU0r
w5mfUM47Ctnqia0UNGx6SUs3CHFDPWPbzrAaqGzSvFhzR1MMoL1/rJzP1VSm3Fk3
ESWkPrg0J8dcQP/ch9MhH8eoQYyA+2q1vClUbeZLAs5KoHxgi6pSkGYqFhshrA+t
2AIrUPDPPDf0PgRoXJrzdVOiNNY1rzyql+0JqDH6DjCVcAADWY+48p9U2YFTd7Je
DvnZWihwe0qYGn1AKIkvJ4SR3bQg36etrxhMrMl/8lUn2dnT7GFrhjr9HwCpJwa7
8tv150SrQXt3FXZCHb+RMUgoWZDeksDohPiGzXkPU6kaSviZVnRMslyU4ahWp6vC
8tYUhb7K6N+is1hYkICNt6zLl2vBDuCDWmiIwopHtnH1kz8bYlp4/GBVaMIgZiCM
gM/7+p4YCc++s2sJiQ9+BqPo0zKm3bbSP+fPpeWefQVte9Jx4S36YXU52HsJxBTN
WUdHABC+aS2A45I12xMNzOJR6VfxnG6f3JLpt3MkUCEg+898vJGope+TJUhD+aJC
-----END RSA PRIVATE KEY-----
...@@ -12,53 +12,108 @@ ...@@ -12,53 +12,108 @@
# #
############################################################################## ##############################################################################
import doctest
import tempfile
import unittest import unittest
import ZODB.config
import ZODB.tests.util from zope.testing import setupstack
from ZODB.tests.testConfig import ConfigTestBase from ZODB.config import storageFromString
class ZEOConfigTest(ConfigTestBase): from .forker import start_zeo_server
def test_zeo_config(self): class ZEOConfigTest(setupstack.TestCase):
# We're looking for a port that doesn't exist so a
# connection attempt will fail. Instead of elaborate setUp = setupstack.setUpDirectory
# logic to loop over a port calculation, we'll just pick a
# simple "random", likely to not-exist port number and add def start_server(self, settings='', **kw):
# an elaborate comment explaining this instead. Go ahead,
# grep for 9. for name, value in kw.items():
from ZEO.ClientStorage import ClientDisconnected settings += '\n%s %s\n' % (name.replace('_', '-'), value)
import ZConfig
from ZODB.config import getDbSchema zeo_conf = """
from ZEO._compat import StringIO <zeo>
cfg = """ address 127.0.0.1:0
<zodb> %s
<zeoclient> </zeo>
server localhost:56897 """ % settings
wait false return start_zeo_server("<mappingstorage>\n</mappingstorage>\n",
</zeoclient> zeo_conf, threaded=True)
</zodb>
""" def start_client(self, addr, settings='', **kw):
config, handle = ZConfig.loadConfigFile(getDbSchema(), StringIO(cfg)) settings += '\nserver %s:%s\n' % addr
self.assertEqual(config.database[0].config.storage.config.blob_dir, for name, value in kw.items():
None) settings += '\n%s %s\n' % (name.replace('_', '-'), value)
self.assertRaises(ClientDisconnected, self._test, cfg) return storageFromString(
"""
cfg = """ %import ZEO
<zodb>
<zeoclient> <clientstorage>
blob-dir blobs {}
server localhost:56897 </clientstorage>
wait false """.format(settings))
</zeoclient>
</zodb> def _client_assertions(
""" self, client, addr,
config, handle = ZConfig.loadConfigFile(getDbSchema(), StringIO(cfg)) connected=True,
self.assertEqual(config.database[0].config.storage.config.blob_dir, cache_size=20 * (1<<20),
'blobs') cache_path=None,
self.assertRaises(ClientDisconnected, self._test, cfg) blob_dir=None,
shared_blob_dir=False,
blob_cache_size=None,
blob_cache_size_check=10,
read_only=False,
read_only_fallback=False,
wait_timeout=30,
client_label=None,
storage='1',
name=None,
):
self.assertEqual(client.is_connected(), connected)
self.assertEqual(client._addr, [addr])
self.assertEqual(client._cache.maxsize, cache_size)
self.assertEqual(client._cache.path, cache_path)
self.assertEqual(client.blob_dir, blob_dir)
self.assertEqual(client.shared_blob_dir, shared_blob_dir)
self.assertEqual(client._blob_cache_size, blob_cache_size)
if blob_cache_size:
self.assertEqual(client._blob_cache_size_check,
blob_cache_size * blob_cache_size_check // 100)
self.assertEqual(client._is_read_only, read_only)
self.assertEqual(client._read_only_fallback, read_only_fallback)
self.assertEqual(client._server.timeout, wait_timeout)
self.assertEqual(client._client_label, client_label)
self.assertEqual(client._storage, storage)
self.assertEqual(client.__name__,
name if name is not None else str(client._addr))
def test_default_zeo_config(self, **client_settings):
addr, stop = self.start_server()
client = self.start_client(addr, **client_settings)
self._client_assertions(client, addr, **client_settings)
client.close()
stop()
def test_client_variations(self):
for name, value in dict(
cache_size=4200,
cache_path='test',
blob_dir='blobs',
blob_cache_size=424242,
read_only=True,
read_only_fallback=True,
wait_timeout=33,
client_label='test_client',
name='Test'
).items():
params = {name: value}
self.test_default_zeo_config(**params)
def test_blob_cache_size_check(self):
self.test_default_zeo_config(blob_cache_size=424242,
blob_cache_size_check=50)
def test_suite(): def test_suite():
return unittest.makeSuite(ZEOConfigTest) return unittest.makeSuite(ZEOConfigTest)
...@@ -79,6 +79,12 @@ class MappingStorageConnectionTests( ...@@ -79,6 +79,12 @@ class MappingStorageConnectionTests(
): ):
"""Mapping storage connection tests.""" """Mapping storage connection tests."""
class SSLConnectionTests(
MappingStorageConfig,
ConnectionTests.SSLConnectionTests,
):
pass
# The ReconnectionTests can't work with MappingStorage because it's only an # The ReconnectionTests can't work with MappingStorage because it's only an
# in-memory storage and has no persistent state. # in-memory storage and has no persistent state.
...@@ -88,6 +94,12 @@ class MappingStorageTimeoutTests( ...@@ -88,6 +94,12 @@ class MappingStorageTimeoutTests(
): ):
pass pass
class SSLConnectionTests(
MappingStorageConfig,
ConnectionTests.SSLConnectionTests,
):
pass
test_classes = [FileStorageConnectionTests, test_classes = [FileStorageConnectionTests,
FileStorageReconnectionTests, FileStorageReconnectionTests,
...@@ -95,6 +107,7 @@ test_classes = [FileStorageConnectionTests, ...@@ -95,6 +107,7 @@ test_classes = [FileStorageConnectionTests,
FileStorageTimeoutTests, FileStorageTimeoutTests,
MappingStorageConnectionTests, MappingStorageConnectionTests,
MappingStorageTimeoutTests, MappingStorageTimeoutTests,
SSLConnectionTests,
] ]
def invalidations_while_connecting(): def invalidations_while_connecting():
......
...@@ -39,6 +39,7 @@ import re ...@@ -39,6 +39,7 @@ import re
import shutil import shutil
import signal import signal
import stat import stat
import ssl
import sys import sys
import tempfile import tempfile
import threading import threading
...@@ -55,6 +56,8 @@ import ZODB.tests.util ...@@ -55,6 +56,8 @@ import ZODB.tests.util
import ZODB.utils import ZODB.utils
import zope.testing.setupstack import zope.testing.setupstack
from . import testssl
logger = logging.getLogger('ZEO.tests.testZEO') logger = logging.getLogger('ZEO.tests.testZEO')
class DummyDB: class DummyDB:
...@@ -99,7 +102,7 @@ class MiscZEOTests: ...@@ -99,7 +102,7 @@ class MiscZEOTests:
def checkZEOInvalidation(self): def checkZEOInvalidation(self):
addr = self._storage._addr addr = self._storage._addr
storage2 = self._wrap_client( storage2 = self._wrap_client(
ClientStorage(addr, wait=1, min_disconnect_poll=0.1)) ClientStorage(addr, wait=1, **self._client_options()))
try: try:
oid = self._storage.new_oid() oid = self._storage.new_oid()
ob = MinPO('first') ob = MinPO('first')
...@@ -128,13 +131,13 @@ class MiscZEOTests: ...@@ -128,13 +131,13 @@ class MiscZEOTests:
# Earlier, a ClientStorage would not have the last transaction id # Earlier, a ClientStorage would not have the last transaction id
# available right after successful connection, this is required now. # available right after successful connection, this is required now.
addr = self._storage._addr addr = self._storage._addr
storage2 = ClientStorage(addr) storage2 = ClientStorage(addr, **self._client_options())
self.assert_(storage2.is_connected()) self.assert_(storage2.is_connected())
self.assertEquals(ZODB.utils.z64, storage2.lastTransaction()) self.assertEquals(ZODB.utils.z64, storage2.lastTransaction())
storage2.close() storage2.close()
self._dostore() self._dostore()
storage3 = ClientStorage(addr) storage3 = ClientStorage(addr, **self._client_options())
self.assert_(storage3.is_connected()) self.assert_(storage3.is_connected())
self.assertEquals(8, len(storage3.lastTransaction())) self.assertEquals(8, len(storage3.lastTransaction()))
self.assertNotEquals(ZODB.utils.z64, storage3.lastTransaction()) self.assertNotEquals(ZODB.utils.z64, storage3.lastTransaction())
...@@ -164,26 +167,33 @@ class GenericTests( ...@@ -164,26 +167,33 @@ class GenericTests(
def setUp(self): def setUp(self):
StorageTestBase.StorageTestBase.setUp(self) StorageTestBase.StorageTestBase.setUp(self)
logger.info("setUp() %s", self.id()) logger.info("setUp() %s", self.id())
port = get_port(self)
zconf = forker.ZEOConfig(('', port))
zport, stop = forker.start_zeo_server(self.getConfig(), zport, stop = forker.start_zeo_server(self.getConfig(),
zconf, port) self.getZEOConfig())
self._servers = [stop] self._servers = [stop]
if not self.blob_cache_dir: if not self.blob_cache_dir:
# This is the blob cache for ClientStorage # This is the blob cache for ClientStorage
self.blob_cache_dir = tempfile.mkdtemp( self.blob_cache_dir = tempfile.mkdtemp(
'blob_cache', 'blob_cache',
dir=os.path.abspath(os.getcwd())) dir=os.path.abspath(os.getcwd()))
self._storage = self._wrap_client(ClientStorage( self._storage = self._wrap_client(
zport, '1', cache_size=20000000, ClientStorage(
min_disconnect_poll=0.5, wait=1, zport, '1', cache_size=20000000,
wait_timeout=60, blob_dir=self.blob_cache_dir, min_disconnect_poll=0.5, wait=1,
shared_blob_dir=self.shared_blob_dir)) wait_timeout=60, blob_dir=self.blob_cache_dir,
shared_blob_dir=self.shared_blob_dir,
**self._client_options()),
)
self._storage.registerDB(DummyDB()) self._storage.registerDB(DummyDB())
def getZEOConfig(self):
return forker.ZEOConfig(('', get_port(self)))
def _wrap_client(self, client): def _wrap_client(self, client):
return client return client
def _client_options(self):
return {}
def tearDown(self): def tearDown(self):
self._storage.close() self._storage.close()
for stop in self._servers: for stop in self._servers:
...@@ -204,7 +214,8 @@ class GenericTests( ...@@ -204,7 +214,8 @@ class GenericTests(
# cleaner way. # cleaner way.
addr = self._storage._addr addr = self._storage._addr
self._storage.close() self._storage.close()
self._storage = ClientStorage(addr, read_only=read_only, wait=1) self._storage = ClientStorage(
addr, read_only=read_only, wait=1, **self._client_options())
def checkWriteMethods(self): def checkWriteMethods(self):
# ReadOnlyStorage defines checkWriteMethods. The decision # ReadOnlyStorage defines checkWriteMethods. The decision
...@@ -223,7 +234,8 @@ class GenericTests( ...@@ -223,7 +234,8 @@ class GenericTests(
def _do_store_in_separate_thread(self, oid, revid, voted): def _do_store_in_separate_thread(self, oid, revid, voted):
def do_store(): def do_store():
store = ZEO.ClientStorage.ClientStorage(self._storage._addr) store = ZEO.ClientStorage.ClientStorage(
self._storage._addr, **self._client_options())
try: try:
t = transaction.get() t = transaction.get()
store.tpc_begin(t) store.tpc_begin(t)
...@@ -335,6 +347,16 @@ class FileStorageTests(FullGenericTests): ...@@ -335,6 +347,16 @@ class FileStorageTests(FullGenericTests):
self._storage._info['interfaces'] self._storage._info['interfaces']
) )
class FileStorageSSLTests(FileStorageTests):
def getZEOConfig(self):
return testssl.server_config
def _client_options(self):
return {'ssl': testssl.client_ssl()}
class FileStorageHexTests(FileStorageTests): class FileStorageHexTests(FileStorageTests):
_expected_interfaces = ( _expected_interfaces = (
('ZODB.interfaces', 'IStorageRestoreable'), ('ZODB.interfaces', 'IStorageRestoreable'),
...@@ -1066,7 +1088,7 @@ def runzeo_without_configfile(): ...@@ -1066,7 +1088,7 @@ def runzeo_without_configfile():
------ ------
--T INFO ZEO.StorageServer StorageServer created RW with storages 1RWt --T INFO ZEO.StorageServer StorageServer created RW with storages 1RWt
------ ------
--T INFO ZEO.acceptor listening on ... --T INFO ZEO.asyncio... listening on ...
------ ------
--T INFO ZEO.StorageServer closing storage '1' --T INFO ZEO.StorageServer closing storage '1'
testing exit immediately testing exit immediately
...@@ -1447,7 +1469,8 @@ def quick_close_doesnt_kill_server(): ...@@ -1447,7 +1469,8 @@ def quick_close_doesnt_kill_server():
Start a server: Start a server:
>>> addr, _ = start_server() >>> from .testssl import server_config, client_ssl
>>> addr, _ = start_server(zeo_conf=server_config)
Now connect and immediately disconnect. This caused the server to Now connect and immediately disconnect. This caused the server to
die in the past: die in the past:
...@@ -1460,9 +1483,12 @@ def quick_close_doesnt_kill_server(): ...@@ -1460,9 +1483,12 @@ def quick_close_doesnt_kill_server():
... s.connect(addr) ... s.connect(addr)
... s.close() ... s.close()
>>> print("\n\nXXX WARNING: running quick_close_doesnt_kill_server with ssl as hack pending http://bugs.python.org/issue27386\n", file=sys.stderr) # Intentional long line to be annoying till this is fixed
Now we should be able to connect as normal: Now we should be able to connect as normal:
>>> db = ZEO.DB(addr) >>> db = ZEO.DB(addr, ssl=client_ssl())
>>> db.storage.is_connected() >>> db.storage.is_connected()
True True
...@@ -1485,7 +1511,8 @@ def can_use_empty_string_for_local_host_on_client(): ...@@ -1485,7 +1511,8 @@ def can_use_empty_string_for_local_host_on_client():
slow_test_classes = [ slow_test_classes = [
#BlobAdaptedFileStorageTests, BlobWritableCacheTests, #BlobAdaptedFileStorageTests, BlobWritableCacheTests,
MappingStorageTests, DemoStorageTests, MappingStorageTests, DemoStorageTests,
FileStorageTests, FileStorageHexTests, FileStorageClientHexTests, FileStorageTests, FileStorageSSLTests,
FileStorageHexTests, FileStorageClientHexTests,
] ]
quick_test_classes = [FileStorageRecoveryTests, ZRPCConnectionTests] quick_test_classes = [FileStorageRecoveryTests, ZRPCConnectionTests]
...@@ -1538,8 +1565,6 @@ class ServerManagingClientStorageForIExternalGCTest( ...@@ -1538,8 +1565,6 @@ class ServerManagingClientStorageForIExternalGCTest(
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
# Collect misc tests into their own layer to reduce size of
# unit test layer
zeo = unittest.TestSuite() zeo = unittest.TestSuite()
zeo.addTest(unittest.makeSuite(ZODB.tests.util.AAAA_Test_Runner_Hack)) zeo.addTest(unittest.makeSuite(ZODB.tests.util.AAAA_Test_Runner_Hack))
patterns = [ patterns = [
...@@ -1570,12 +1595,16 @@ def test_suite(): ...@@ -1570,12 +1595,16 @@ def test_suite():
"ClientDisconnected"), "ClientDisconnected"),
)), )),
)) ))
zeo.layer = ZODB.tests.util.MininalTestLayer('testZeo-misc')
suite.addTest(zeo)
zeo = unittest.TestSuite()
zeo.addTest( zeo.addTest(
doctest.DocFileSuite( doctest.DocFileSuite(
'zeo-fan-out.test', 'zdoptions.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt', 'client-config.test', 'drop_cache_rather_than_verify.txt', 'client-config.test',
'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt', 'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt',
'dynamic_server_ports.test', 'new_addr.test', '../nagios.rst', 'dynamic_server_ports.test', '../nagios.rst',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown, setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
checker=renormalizing.RENormalizing(patterns), checker=renormalizing.RENormalizing(patterns),
globs={'print_function': print_function}, globs={'print_function': print_function},
...@@ -1588,9 +1617,23 @@ def test_suite(): ...@@ -1588,9 +1617,23 @@ def test_suite():
)) ))
for klass in quick_test_classes: for klass in quick_test_classes:
zeo.addTest(unittest.makeSuite(klass, "check")) zeo.addTest(unittest.makeSuite(klass, "check"))
zeo.layer = ZODB.tests.util.MininalTestLayer('testZeo-misc') zeo.layer = ZODB.tests.util.MininalTestLayer('testZeo-misc2')
suite.addTest(zeo) suite.addTest(zeo)
# tests that often fail, maybe if they have their own layers
for name in 'zeo-fan-out.test', 'new_addr.test':
zeo = unittest.TestSuite()
zeo.addTest(
doctest.DocFileSuite(
name,
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
checker=renormalizing.RENormalizing(patterns),
globs={'print_function': print_function},
),
)
zeo.layer = ZODB.tests.util.MininalTestLayer('testZeo-' + name)
suite.addTest(zeo)
suite.addTest(unittest.makeSuite(MultiprocessingTests)) suite.addTest(unittest.makeSuite(MultiprocessingTests))
# Put the heavyweights in their own layers # Put the heavyweights in their own layers
......
This diff is collapsed.
"""SSL configuration support
"""
import os
import sys
def ssl_config(section, server):
import ssl
cafile = capath = None
auth = section.authenticate
if auth:
if os.path.isdir(auth):
capath=auth
else:
cafile=auth
context = ssl.create_default_context(
ssl.Purpose.CLIENT_AUTH, cafile=cafile, capath=capath)
if section.certificate:
password = section.password_function
if password:
module, name = password.rsplit('.', 1)
module = __import__(module, globals(), locals(), ['*'], 0)
password = getattr(module, name)
context.load_cert_chain(section.certificate, section.key, password)
context.verify_mode = ssl.CERT_REQUIRED
if sys.version_info >= (3, 4):
context.verify_flags |= ssl.VERIFY_X509_STRICT | (
context.cert_store_stats()['crl'] and ssl.VERIFY_CRL_CHECK_LEAF)
if server:
context.check_hostname = False
return context
context.check_hostname = section.check_hostname
return context, section.server_hostname
def server_ssl(section):
return ssl_config(section, True)
def client_ssl(section):
return ssl_config(section, False)
class ClientStorageConfig:
def __init__(self, config):
self.config = config
self.name = config.getSectionName()
def open(self):
from ZEO.ClientStorage import ClientStorage
# config.server is a multikey of socket-connection-address values
# where the value is a socket family, address tuple.
config = self.config
addresses = [server.address for server in config.server]
options = {}
if config.blob_cache_size is not None:
options['blob_cache_size'] = config.blob_cache_size
if config.blob_cache_size_check is not None:
options['blob_cache_size_check'] = config.blob_cache_size_check
if config.client_label is not None:
options['client_label'] = config.client_label
ssl = config.ssl
if ssl:
options['ssl'] = ssl[0]
options['ssl_server_hostname'] = ssl[1]
return ClientStorage(
addresses,
blob_dir=config.blob_dir,
shared_blob_dir=config.shared_blob_dir,
storage=config.storage,
cache_size=config.cache_size,
cache=config.cache_path,
name=config.name,
read_only=config.read_only,
read_only_fallback=config.read_only_fallback,
wait_timeout=config.wait_timeout,
**options)
...@@ -20,6 +20,7 @@ deps = ...@@ -20,6 +20,7 @@ deps =
zope.interface zope.interface
zope.testing zope.testing
zope.testrunner zope.testrunner
mock
[testenv:simple] [testenv:simple]
# Test that 'setup.py test' works # Test that 'setup.py test' works
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment