Commit b08a718b authored by Jim Fulton's avatar Jim Fulton

Make sure the ZEO 5 client works with the ZEO 4 server

Added an option to test the ZEO 5 client against the ZEO 4
server. This involved making a copy of the ZEO 4 server in
ZEO.tests.ZEO4 and disabling tests of ZEO5-specific features.

Testing revealed bugs in support for ZEO 4 servers, which were fixed.
parent 96659e2c
......@@ -14,6 +14,15 @@ matrix:
- os: linux
python: 3.5
env: ZEO_MTACCEPTOR=1
- os: linux
python: 2.7
env: ZEO4_SERVER=1
- os: linux
python: 3.4
env: ZEO4_SERVER=1
- os: linux
python: 3.5
env: ZEO4_SERVER=1
install:
- pip install -U setuptools
- python bootstrap.py
......
......@@ -949,8 +949,7 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
def serialnos(self, args):
"""Server callback to pass a list of changed (oid, serial) pairs.
"""
for oid, s in args:
self._tbuf.serial(oid, s)
self._tbuf.serialnos(args)
def info(self, dict):
"""Server callback to update the info dictionary."""
......
......@@ -92,3 +92,17 @@ class TransactionBuffer:
for oid in server_resolved:
if oid not in seen:
yield oid, None, True
# Support ZEO4:
def serialnos(self, args):
for oid in args:
if isinstance(oid, bytes):
self.server_resolved.add(oid)
else:
oid, serial = oid
if isinstance(serial, Exception):
self.exception = serial
elif serial == b'rs':
self.server_resolved.add(oid)
......@@ -599,13 +599,19 @@ class Client(object):
self.cache.setLastTid(tid)
def serialnos(self, serials):
# Method called by ZEO4 storage servers.
# Before delegating, check for errors (likely ConflictErrors)
# and invalidate the oids they're associated with. In the
# past, this was done by the client, but now we control the
# cache and this is our last chance, as the client won't call
# back into us when there's an error.
for oid, serial in serials:
if isinstance(serial, Exception):
for oid in serials:
if isinstance(oid, bytes):
self.cache.invalidate(oid, None)
else:
oid, serial = oid
if isinstance(serial, Exception) or serial == b'rs':
self.cache.invalidate(oid, None)
self.client.serialnos(serials)
......
======================
Copy of ZEO 4 server
======================
This copy was made by first converting the ZEO 4 server code to use
relative imports. The code was tested with ZEO 4 before copying. It
was unchanged aside from the relative imports.
The ZEO 4 server is used for tests if the ZEO4_SERVER environment
variable is set to a non-empty value.
This diff is collapsed.
##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
_auth_modules = {}
def get_module(name):
if name == 'sha':
from auth_sha import StorageClass, SHAClient, Database
return StorageClass, SHAClient, Database
elif name == 'digest':
from .auth_digest import StorageClass, DigestClient, DigestDatabase
return StorageClass, DigestClient, DigestDatabase
else:
return _auth_modules.get(name)
def register_module(name, storage_class, client, db):
if name in _auth_modules:
raise TypeError("%s is already registred" % name)
_auth_modules[name] = storage_class, client, db
##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Digest authentication for ZEO
This authentication mechanism follows the design of HTTP digest
authentication (RFC 2069). It is a simple challenge-response protocol
that does not send passwords in the clear, but does not offer strong
security. The RFC discusses many of the limitations of this kind of
protocol.
Guard the password database as if it contained plaintext passwords.
It stores the hash of a username and password. This does not expose
the plaintext password, but it is sensitive nonetheless. An attacker
with the hash can impersonate the real user. This is a limitation of
the simple digest scheme.
HTTP is a stateless protocol, and ZEO is a stateful protocol. The
security requirements are quite different as a result. The HTTP
protocol uses a nonce as a challenge. The ZEO protocol requires a
separate session key that is used for message authentication. We
generate a second nonce for this purpose; the hash of nonce and
user/realm/password is used as the session key.
TODO: I'm not sure if this is a sound approach; SRP would be preferred.
"""
import os
import random
import struct
import time
from .base import Database, Client
from ..StorageServer import ZEOStorage
from ZEO.Exceptions import AuthError
from ..hash import sha1
def get_random_bytes(n=8):
try:
b = os.urandom(n)
except NotImplementedError:
L = [chr(random.randint(0, 255)) for i in range(n)]
b = b"".join(L)
return b
def hexdigest(s):
return sha1(s.encode()).hexdigest()
class DigestDatabase(Database):
def __init__(self, filename, realm=None):
Database.__init__(self, filename, realm)
# Initialize a key used to build the nonce for a challenge.
# We need one key for the lifetime of the server, so it
# is convenient to store in on the database.
self.noncekey = get_random_bytes(8)
def _store_password(self, username, password):
dig = hexdigest("%s:%s:%s" % (username, self.realm, password))
self._users[username] = dig
def session_key(h_up, nonce):
# The hash itself is a bit too short to be a session key.
# HMAC wants a 64-byte key. We don't want to use h_up
# directly because it would never change over time. Instead
# use the hash plus part of h_up.
return (sha1(("%s:%s" % (h_up, nonce)).encode('latin-1')).digest() +
h_up.encode('utf-8')[:44])
class StorageClass(ZEOStorage):
def set_database(self, database):
assert isinstance(database, DigestDatabase)
self.database = database
self.noncekey = database.noncekey
def _get_time(self):
# Return a string representing the current time.
t = int(time.time())
return struct.pack("i", t)
def _get_nonce(self):
# RFC 2069 recommends a nonce of the form
# H(client-IP ":" time-stamp ":" private-key)
dig = sha1()
dig.update(str(self.connection.addr).encode('latin-1'))
dig.update(self._get_time())
dig.update(self.noncekey)
return dig.hexdigest()
def auth_get_challenge(self):
"""Return realm, challenge, and nonce."""
self._challenge = self._get_nonce()
self._key_nonce = self._get_nonce()
return self.auth_realm, self._challenge, self._key_nonce
def auth_response(self, resp):
# verify client response
user, challenge, response = resp
# Since zrpc is a stateful protocol, we just store the nonce
# we sent to the client. It will need to generate a new
# nonce for a new connection anyway.
if self._challenge != challenge:
raise ValueError("invalid challenge")
# lookup user in database
h_up = self.database.get_password(user)
# regeneration resp from user, password, and nonce
check = hexdigest("%s:%s" % (h_up, challenge))
if check == response:
self.connection.setSessionKey(session_key(h_up, self._key_nonce))
return self._finish_auth(check == response)
extensions = [auth_get_challenge, auth_response]
class DigestClient(Client):
extensions = ["auth_get_challenge", "auth_response"]
def start(self, username, realm, password):
_realm, challenge, nonce = self.stub.auth_get_challenge()
if _realm != realm:
raise AuthError("expected realm %r, got realm %r"
% (_realm, realm))
h_up = hexdigest("%s:%s:%s" % (username, realm, password))
resp_dig = hexdigest("%s:%s" % (h_up, challenge))
result = self.stub.auth_response((username, challenge, resp_dig))
if result:
return session_key(h_up, nonce)
else:
return None
##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Base classes for defining an authentication protocol.
Database -- abstract base class for password database
Client -- abstract base class for authentication client
"""
from __future__ import print_function
from __future__ import print_function
import os
from ..hash import sha1
class Client:
# Subclass should override to list the names of methods that
# will be called on the server.
extensions = []
def __init__(self, stub):
self.stub = stub
for m in self.extensions:
setattr(self.stub, m, self.stub.extensionMethod(m))
def sort(L):
"""Sort a list in-place and return it."""
L.sort()
return L
class Database:
"""Abstracts a password database.
This class is used both in the authentication process (via
get_password()) and by client scripts that manage the password
database file.
The password file is a simple, colon-separated text file mapping
usernames to password hashes. The hashes are SHA hex digests
produced from the password string.
"""
realm = None
def __init__(self, filename, realm=None):
"""Creates a new Database
filename: a string containing the full pathname of
the password database file. Must be readable by the user
running ZEO. Must be writeable by any client script that
accesses the database.
realm: the realm name (a string)
"""
self._users = {}
self.filename = filename
self.load()
if realm:
if self.realm and self.realm != realm:
raise ValueError("Specified realm %r differs from database "
"realm %r" % (realm or '', self.realm))
else:
self.realm = realm
def save(self, fd=None):
filename = self.filename
needs_closed = False
if not fd:
fd = open(filename, 'w')
needs_closed = True
try:
if self.realm:
print("realm", self.realm, file=fd)
for username in sorted(self._users.keys()):
print("%s: %s" % (username, self._users[username]), file=fd)
finally:
if needs_closed:
fd.close()
def load(self):
filename = self.filename
if not filename:
return
if not os.path.exists(filename):
return
with open(filename) as fd:
L = fd.readlines()
if not L:
return
if L[0].startswith("realm "):
line = L.pop(0).strip()
self.realm = line[len("realm "):]
for line in L:
username, hash = line.strip().split(":", 1)
self._users[username] = hash.strip()
def _store_password(self, username, password):
self._users[username] = self.hash(password)
def get_password(self, username):
"""Returns password hash for specified username.
Callers must check for LookupError, which is raised in
the case of a non-existent user specified."""
if username not in self._users:
raise LookupError("No such user: %s" % username)
return self._users[username]
def hash(self, s):
return sha1(s.encode()).hexdigest()
def add_user(self, username, password):
if username in self._users:
raise LookupError("User %s already exists" % username)
self._store_password(username, password)
def del_user(self, username):
if username not in self._users:
raise LookupError("No such user: %s" % username)
del self._users[username]
def change_password(self, username, password):
if username not in self._users:
raise LookupError("No such user: %s" % username)
self._store_password(username, password)
"""HMAC (Keyed-Hashing for Message Authentication) Python module.
Implements the HMAC algorithm as described by RFC 2104.
"""
from six.moves import map
from six.moves import zip
def _strxor(s1, s2):
"""Utility method. XOR the two strings s1 and s2 (must have same length).
"""
return "".join(map(lambda x, y: chr(ord(x) ^ ord(y)), s1, s2))
# The size of the digests returned by HMAC depends on the underlying
# hashing module used.
digest_size = None
class HMAC:
"""RFC2104 HMAC class.
This supports the API for Cryptographic Hash Functions (PEP 247).
"""
def __init__(self, key, msg = None, digestmod = None):
"""Create a new HMAC object.
key: key for the keyed hash object.
msg: Initial input for the hash, if provided.
digestmod: A module supporting PEP 247. Defaults to the md5 module.
"""
if digestmod is None:
import md5
digestmod = md5
self.digestmod = digestmod
self.outer = digestmod.new()
self.inner = digestmod.new()
self.digest_size = digestmod.digest_size
blocksize = 64
ipad = "\x36" * blocksize
opad = "\x5C" * blocksize
if len(key) > blocksize:
key = digestmod.new(key).digest()
key = key + chr(0) * (blocksize - len(key))
self.outer.update(_strxor(key, opad))
self.inner.update(_strxor(key, ipad))
if msg is not None:
self.update(msg)
## def clear(self):
## raise NotImplementedError("clear() method not available in HMAC.")
def update(self, msg):
"""Update this hashing object with the string msg.
"""
self.inner.update(msg)
def copy(self):
"""Return a separate copy of this hashing object.
An update to this copy won't affect the original object.
"""
other = HMAC("")
other.digestmod = self.digestmod
other.inner = self.inner.copy()
other.outer = self.outer.copy()
return other
def digest(self):
"""Return the hash value of this hashing object.
This returns a string containing 8-bit data. The object is
not altered in any way by this function; you can continue
updating the object after calling this function.
"""
h = self.outer.copy()
h.update(self.inner.digest())
return h.digest()
def hexdigest(self):
"""Like digest(), but returns a string of hexadecimal digits instead.
"""
return "".join([hex(ord(x))[2:].zfill(2)
for x in tuple(self.digest())])
def new(key, msg = None, digestmod = None):
"""Create a new hashing object and return it.
key: The starting key for the hash.
msg: if available, will immediately be hashed into the object's starting
state.
You can now feed arbitrary strings into the object using its update()
method, and can ask for the hash value at any time by calling its digest()
method.
"""
return HMAC(key, msg, digestmod)
<component>
<sectiontype name="zeo">
<description>
The content of a ZEO section describe operational parameters
of a ZEO server except for the storage(s) to be served.
</description>
<key name="address" datatype="socket-binding-address"
required="yes">
<description>
The address at which the server should listen. This can be in
the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
</description>
</key>
<key name="read-only" datatype="boolean"
required="no"
default="false">
<description>
Flag indicating whether the server should operate in read-only
mode. Defaults to false. Note that even if the server is
operating in writable mode, individual storages may still be
read-only. But if the server is in read-only mode, no write
operations are allowed, even if the storages are writable. Note
that pack() is considered a read-only operation.
</description>
</key>
<key name="invalidation-queue-size" datatype="integer"
required="no"
default="100">
<description>
The storage server keeps a queue of the objects modified by the
last N transactions, where N == invalidation_queue_size. This
queue is used to speed client cache verification when a client
disconnects for a short period of time.
</description>
</key>
<key name="invalidation-age" datatype="float" required="no">
<description>
The maximum age of a client for which quick-verification
invalidations will be provided by iterating over the served
storage. This option should only be used if the served storage
supports efficient iteration from a starting point near the
end of the transaction history (e.g. end of file).
</description>
</key>
<key name="monitor-address" datatype="socket-binding-address"
required="no">
<description>
The address at which the monitor server should listen. If
specified, a monitor server is started. The monitor server
provides server statistics in a simple text format. This can
be in the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
</description>
</key>
<key name="transaction-timeout" datatype="integer"
required="no">
<description>
The maximum amount of time to wait for a transaction to commit
after acquiring the storage lock, specified in seconds. If the
transaction takes too long, the client connection will be closed
and the transaction aborted.
</description>
</key>
<key name="authentication-protocol" required="no">
<description>
The name of the protocol used for authentication. The
only protocol provided with ZEO is "digest," but extensions
may provide other protocols.
</description>
</key>
<key name="authentication-database" required="no">
<description>
The path of the database containing authentication credentials.
</description>
</key>
<key name="authentication-realm" required="no">
<description>
The authentication realm of the server. Some authentication
schemes use a realm to identify the logical set of usernames
that are accepted by this server.
</description>
</key>
<key name="pid-filename" datatype="existing-dirpath"
required="no">
<description>
The full path to the file in which to write the ZEO server's Process ID
at startup. If omitted, $INSTANCE/var/ZEO.pid is used.
</description>
<metadefault>$INSTANCE/var/ZEO.pid (or $clienthome/ZEO.pid)</metadefault>
</key>
<!-- DM 2006-06-12: added option -->
<key name="drop-cache-rather-verify" datatype="boolean"
required="no" default="false">
<description>
indicates that the cache should be dropped rather than
verified when the verification optimization is not
available (e.g. when the ZEO server restarted).
</description>
</key>
</sectiontype>
</component>
##############################################################################
#
# Copyright (c) 2008 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""In Python 2.6, the "sha" and "md5" modules have been deprecated
in favor of using hashlib for both. This class allows for compatibility
between versions."""
try:
import hashlib
sha1 = hashlib.sha1
new = sha1
except ImportError:
import sha
sha1 = sha.new
new = sha1
digest_size = sha.digest_size
##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Monitor behavior of ZEO server and record statistics.
"""
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
import asyncore
import socket
import time
import logging
zeo_version = 'unknown'
try:
import pkg_resources
except ImportError:
pass
else:
zeo_dist = pkg_resources.working_set.find(
pkg_resources.Requirement.parse('ZODB3')
)
if zeo_dist is not None:
zeo_version = zeo_dist.version
class StorageStats:
"""Per-storage usage statistics."""
def __init__(self, connections=None):
self.connections = connections
self.loads = 0
self.stores = 0
self.commits = 0
self.aborts = 0
self.active_txns = 0
self.verifying_clients = 0
self.lock_time = None
self.conflicts = 0
self.conflicts_resolved = 0
self.start = time.ctime()
@property
def clients(self):
return len(self.connections)
def parse(self, s):
# parse the dump format
lines = s.split("\n")
for line in lines:
field, value = line.split(":", 1)
if field == "Server started":
self.start = value
elif field == "Clients":
# Hack because we use this both on the server and on
# the client where there are no connections.
self.connections = [0] * int(value)
elif field == "Clients verifying":
self.verifying_clients = int(value)
elif field == "Active transactions":
self.active_txns = int(value)
elif field == "Commit lock held for":
# This assumes
self.lock_time = time.time() - int(value)
elif field == "Commits":
self.commits = int(value)
elif field == "Aborts":
self.aborts = int(value)
elif field == "Loads":
self.loads = int(value)
elif field == "Stores":
self.stores = int(value)
elif field == "Conflicts":
self.conflicts = int(value)
elif field == "Conflicts resolved":
self.conflicts_resolved = int(value)
def dump(self, f):
print("Server started:", self.start, file=f)
print("Clients:", self.clients, file=f)
print("Clients verifying:", self.verifying_clients, file=f)
print("Active transactions:", self.active_txns, file=f)
if self.lock_time:
howlong = time.time() - self.lock_time
print("Commit lock held for:", int(howlong), file=f)
print("Commits:", self.commits, file=f)
print("Aborts:", self.aborts, file=f)
print("Loads:", self.loads, file=f)
print("Stores:", self.stores, file=f)
print("Conflicts:", self.conflicts, file=f)
print("Conflicts resolved:", self.conflicts_resolved, file=f)
class StatsClient(asyncore.dispatcher):
def __init__(self, sock, addr):
asyncore.dispatcher.__init__(self, sock)
self.buf = []
self.closed = 0
def close(self):
self.closed = 1
# The socket is closed after all the data is written.
# See handle_write().
def write(self, s):
self.buf.append(s)
def writable(self):
return len(self.buf)
def readable(self):
return 0
def handle_write(self):
s = "".join(self.buf)
self.buf = []
n = self.socket.send(s.encode('ascii'))
if n < len(s):
self.buf.append(s[:n])
if self.closed and not self.buf:
asyncore.dispatcher.close(self)
class StatsServer(asyncore.dispatcher):
StatsConnectionClass = StatsClient
def __init__(self, addr, stats):
asyncore.dispatcher.__init__(self)
self.addr = addr
self.stats = stats
if type(self.addr) == tuple:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
logger = logging.getLogger('ZEO.monitor')
logger.info("listening on %s", repr(self.addr))
self.bind(self.addr)
self.listen(5)
def writable(self):
return 0
def readable(self):
return 1
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error:
return
f = self.StatsConnectionClass(sock, addr)
self.dump(f)
f.close()
def dump(self, f):
print("ZEO monitor server version %s" % zeo_version, file=f)
print(time.ctime(), file=f)
print(file=f)
L = sorted(self.stats.keys())
for k in L:
stats = self.stats[k]
print("Storage:", k, file=f)
stats.dump(f)
print(file=f)
This diff is collapsed.
<schema>
<!-- note that zeoctl.xml is a closely related schema which should
match this schema, but should require the "runner" section -->
<description>
This schema describes the configuration of the ZEO storage server
process.
</description>
<!-- Use the storage types defined by ZODB. -->
<import package="ZODB"/>
<!-- Use the ZEO server information structure. -->
<import package="ZEO.tests.ZEO4"/>
<import package="ZConfig.components.logger"/>
<!-- runner control -->
<import package="zdaemon"/>
<section type="zeo" name="*" required="yes" attribute="zeo" />
<section type="runner" name="*" required="no" attribute="runner" />
<multisection name="*" type="ZODB.storage"
attribute="storages"
required="yes">
<description>
One or more storages that are provided by the ZEO server. The
section names are used as the storage names, and must be unique
within each ZEO storage server. Traditionally, these names
represent small integers starting at '1'.
</description>
</multisection>
<section name="*" type="eventlog" attribute="eventlog" required="no" />
</schema>
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# zrpc is a package with the following modules
# client -- manages connection creation to remote server
# connection -- object dispatcher
# log -- logging helper
# error -- exceptions raised by zrpc
# marshal -- internal, handles basic protocol issues
# server -- manages incoming connections from remote clients
# smac -- sized message async connections
# trigger -- medusa's trigger
# zrpc is not an advertised subpackage of ZEO; its interfaces are internal
# This file is a slightly modified copy of Python 2.3's Lib/hmac.py.
# This file is under the Python Software Foundation (PSF) license.
"""HMAC (Keyed-Hashing for Message Authentication) Python module.
Implements the HMAC algorithm as described by RFC 2104.
"""
from six.moves import map
from six.moves import zip
def _strxor(s1, s2):
"""Utility method. XOR the two strings s1 and s2 (must have same length).
"""
return "".join(map(lambda x, y: chr(ord(x) ^ ord(y)), s1, s2))
# The size of the digests returned by HMAC depends on the underlying
# hashing module used.
digest_size = None
class HMAC:
"""RFC2104 HMAC class.
This supports the API for Cryptographic Hash Functions (PEP 247).
"""
def __init__(self, key, msg = None, digestmod = None):
"""Create a new HMAC object.
key: key for the keyed hash object.
msg: Initial input for the hash, if provided.
digestmod: A module supporting PEP 247. Defaults to the md5 module.
"""
if digestmod is None:
import md5
digestmod = md5
self.digestmod = digestmod
self.outer = digestmod.new()
self.inner = digestmod.new()
# Python 2.1 and 2.2 differ about the correct spelling
try:
self.digest_size = digestmod.digestsize
except AttributeError:
self.digest_size = digestmod.digest_size
blocksize = 64
ipad = "\x36" * blocksize
opad = "\x5C" * blocksize
if len(key) > blocksize:
key = digestmod.new(key).digest()
key = key + chr(0) * (blocksize - len(key))
self.outer.update(_strxor(key, opad))
self.inner.update(_strxor(key, ipad))
if msg is not None:
self.update(msg)
## def clear(self):
## raise NotImplementedError("clear() method not available in HMAC.")
def update(self, msg):
"""Update this hashing object with the string msg.
"""
self.inner.update(msg)
def copy(self):
"""Return a separate copy of this hashing object.
An update to this copy won't affect the original object.
"""
other = HMAC("")
other.digestmod = self.digestmod
other.inner = self.inner.copy()
other.outer = self.outer.copy()
return other
def digest(self):
"""Return the hash value of this hashing object.
This returns a string containing 8-bit data. The object is
not altered in any way by this function; you can continue
updating the object after calling this function.
"""
h = self.outer.copy()
h.update(self.inner.digest())
return h.digest()
def hexdigest(self):
"""Like digest(), but returns a string of hexadecimal digits instead.
"""
return "".join([hex(ord(x))[2:].zfill(2)
for x in tuple(self.digest())])
def new(key, msg = None, digestmod = None):
"""Create a new hashing object and return it.
key: The starting key for the hash.
msg: if available, will immediately be hashed into the object's starting
state.
You can now feed arbitrary strings into the object using its update()
method, and can ask for the hash value at any time by calling its digest()
method.
"""
return HMAC(key, msg, digestmod)
This diff is collapsed.
This diff is collapsed.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from ZODB import POSException
from ZEO.Exceptions import ClientDisconnected
class ZRPCError(POSException.StorageError):
pass
class DisconnectedError(ZRPCError, ClientDisconnected):
"""The database storage is disconnected from the storage server.
The error occurred because a problem in the low-level RPC connection,
or because the connection was closed.
"""
# This subclass is raised when zrpc catches the error.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import threading
import logging
from ZODB.loglevels import BLATHER
LOG_THREAD_ID = 0 # Set this to 1 during heavy debugging
logger = logging.getLogger('ZEO.zrpc')
_label = "%s" % os.getpid()
def new_label():
global _label
_label = str(os.getpid())
def log(message, level=BLATHER, label=None, exc_info=False):
label = label or _label
if LOG_THREAD_ID:
label = label + ':' + threading.currentThread().getName()
logger.log(level, '(%s) %s' % (label, message), exc_info=exc_info)
REPR_LIMIT = 60
def short_repr(obj):
"Return an object repr limited to REPR_LIMIT bytes."
# Some of the objects being repr'd are large strings. A lot of memory
# would be wasted to repr them and then truncate, so they are treated
# specially in this function.
# Also handle short repr of a tuple containing a long string.
# This strategy works well for arguments to StorageServer methods.
# The oid is usually first and will get included in its entirety.
# The pickle is near the beginning, too, and you can often fit the
# module name in the pickle.
if isinstance(obj, str):
if len(obj) > REPR_LIMIT:
r = repr(obj[:REPR_LIMIT])
else:
r = repr(obj)
if len(r) > REPR_LIMIT:
r = r[:REPR_LIMIT-4] + '...' + r[-1]
return r
elif isinstance(obj, (list, tuple)):
elts = []
size = 0
for elt in obj:
r = short_repr(elt)
elts.append(r)
size += len(r)
if size > REPR_LIMIT:
break
if isinstance(obj, tuple):
r = "(%s)" % (", ".join(elts))
else:
r = "[%s]" % (", ".join(elts))
else:
r = repr(obj)
if len(r) > REPR_LIMIT:
return r[:REPR_LIMIT] + '...'
else:
return r
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import logging
from ZEO._compat import Unpickler, Pickler, BytesIO, PY3, PYPY
from .error import ZRPCError
from .log import log, short_repr
def encode(*args): # args: (msgid, flags, name, args)
# (We used to have a global pickler, but that's not thread-safe. :-( )
# It's not thread safe if, in the couse of pickling, we call the
# Python interpeter, which releases the GIL.
# Note that args may contain very large binary pickles already; for
# this reason, it's important to use proto 1 (or higher) pickles here
# too. For a long time, this used proto 0 pickles, and that can
# bloat our pickle to 4x the size (due to high-bit and control bytes
# being represented by \xij escapes in proto 0).
# Undocumented: cPickle.Pickler accepts a lone protocol argument;
# pickle.py does not.
if PY3:
# XXX: Py3: Needs optimization.
f = BytesIO()
pickler = Pickler(f, 3)
pickler.fast = 1
pickler.dump(args)
res = f.getvalue()
return res
else:
pickler = Pickler(1)
pickler.fast = 1
# Only CPython's cPickle supports dumping
# and returning in one operation:
# return pickler.dump(args, 1)
# For PyPy we must return the value; fortunately this
# works the same on CPython and is no more expensive
pickler.dump(args)
return pickler.getvalue()
if PY3:
# XXX: Py3: Needs optimization.
fast_encode = encode
elif PYPY:
# can't use the python-2 branch, need a new pickler
# every time, getvalue() only works once
fast_encode = encode
else:
def fast_encode():
# Only use in cases where you *know* the data contains only basic
# Python objects
pickler = Pickler(1)
pickler.fast = 1
dump = pickler.dump
def fast_encode(*args):
return dump(args, 1)
return fast_encode
fast_encode = fast_encode()
def decode(msg):
"""Decodes msg and returns its parts"""
unpickler = Unpickler(BytesIO(msg))
unpickler.find_global = find_global
try:
unpickler.find_class = find_global # PyPy, zodbpickle, the non-c-accelerated version
except AttributeError:
pass
try:
return unpickler.load() # msgid, flags, name, args
except:
log("can't decode message: %s" % short_repr(msg),
level=logging.ERROR)
raise
def server_decode(msg):
"""Decodes msg and returns its parts"""
unpickler = Unpickler(BytesIO(msg))
unpickler.find_global = server_find_global
try:
unpickler.find_class = server_find_global # PyPy, zodbpickle, the non-c-accelerated version
except AttributeError:
pass
try:
return unpickler.load() # msgid, flags, name, args
except:
log("can't decode message: %s" % short_repr(msg),
level=logging.ERROR)
raise
_globals = globals()
_silly = ('__doc__',)
exception_type_type = type(Exception)
def find_global(module, name):
"""Helper for message unpickler"""
try:
m = __import__(module, _globals, _globals, _silly)
except ImportError as msg:
raise ZRPCError("import error %s: %s" % (module, msg))
try:
r = getattr(m, name)
except AttributeError:
raise ZRPCError("module %s has no global %s" % (module, name))
safe = getattr(r, '__no_side_effects__', 0)
if safe:
return r
# TODO: is there a better way to do this?
if type(r) == exception_type_type and issubclass(r, Exception):
return r
raise ZRPCError("Unsafe global: %s.%s" % (module, name))
def server_find_global(module, name):
"""Helper for message unpickler"""
try:
if module != 'ZopeUndo.Prefix':
raise ImportError
m = __import__(module, _globals, _globals, _silly)
except ImportError as msg:
raise ZRPCError("import error %s: %s" % (module, msg))
try:
r = getattr(m, name)
except AttributeError:
raise ZRPCError("module %s has no global %s" % (module, name))
return r
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import socket
# _has_dualstack: True if the dual-stack sockets are supported
try:
# Check whether IPv6 sockets can be created
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
except (socket.error, AttributeError):
_has_dualstack = False
else:
# Check whether enabling dualstack (disabling v6only) works
try:
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
except (socket.error, AttributeError):
_has_dualstack = False
else:
_has_dualstack = True
s.close()
del s
from .connection import Connection
from .log import log
from .log import logger
import logging
# Export the main asyncore loop
loop = asyncore.loop
class Dispatcher(asyncore.dispatcher):
"""A server that accepts incoming RPC connections"""
__super_init = asyncore.dispatcher.__init__
def __init__(self, addr, factory=Connection, map=None):
self.__super_init(map=map)
self.addr = addr
self.factory = factory
self._open_socket()
def _open_socket(self):
if type(self.addr) == tuple:
if self.addr[0] == '' and _has_dualstack:
# Wildcard listen on all interfaces, both IPv4 and
# IPv6 if possible
self.create_socket(socket.AF_INET6, socket.SOCK_STREAM)
self.socket.setsockopt(
socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
elif ':' in self.addr[0]:
self.create_socket(socket.AF_INET6, socket.SOCK_STREAM)
if _has_dualstack:
# On Linux, IPV6_V6ONLY is off by default.
# If the user explicitly asked for IPv6, don't bind to IPv4
self.socket.setsockopt(
socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, True)
else:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
log("listening on %s" % str(self.addr), logging.INFO)
for i in range(25):
try:
self.bind(self.addr)
except Exception as exc:
log("bind failed %s waiting", i)
if i == 24:
raise
else:
time.sleep(5)
else:
break
self.listen(5)
def writable(self):
return 0
def readable(self):
return 1
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error as msg:
log("accepted failed: %s" % msg)
return
# We could short-circuit the attempt below in some edge cases
# and avoid a log message by checking for addr being None.
# Unfortunately, our test for the code below,
# quick_close_doesnt_kill_server, causes addr to be None and
# we'd have to write a test for the non-None case, which is
# *even* harder to provoke. :/ So we'll leave things as they
# are for now.
# It might be better to check whether the socket has been
# closed, but I don't see a way to do that. :(
# Drop flow-info from IPv6 addresses
if addr: # Sometimes None on Mac. See above.
addr = addr[:2]
try:
c = self.factory(sock, addr)
except:
if sock.fileno() in asyncore.socket_map:
del asyncore.socket_map[sock.fileno()]
logger.exception("Error in handle_accept")
else:
log("connect from %s: %s" % (repr(addr), c))
This diff is collapsed.
This diff is collapsed.
......@@ -32,6 +32,13 @@ logger = logging.getLogger('ZEO.tests.forker')
DEBUG = os.environ.get('ZEO_TEST_SERVER_DEBUG')
ZEO4_SERVER = os.environ.get('ZEO4_SERVER')
skip_if_testing_client_against_zeo4 = (
(lambda func: None)
if ZEO4_SERVER else
(lambda func: func)
)
class ZEOConfig:
"""Class to generate ZEO configuration file. """
......@@ -104,21 +111,30 @@ def runner(config, qin, qout, timeout=None,
))
try:
import ZEO.runzeo, threading
import threading
from six.moves.queue import Empty
options = ZEO.runzeo.ZEOOptions()
if ZEO4_SERVER:
from .ZEO4 import runzeo
else:
from .. import runzeo
options = runzeo.ZEOOptions()
options.realize(['-C', config])
server = ZEO.runzeo.ZEOServer(options)
server = runzeo.ZEOServer(options)
globals()[(name if name else 'last') + '_server'] = server
server.open_storages()
server.clear_socket()
server.create_server()
logger.debug('SERVER CREATED')
if ZEO4_SERVER:
qout.put(server.server.addr)
else:
qout.put(server.server.acceptor.addr)
logger.debug('ADDRESS SENT')
thread = threading.Thread(
target=server.server.loop, kwargs=dict(timeout=.2),
target=server.server.loop,
kwargs={} if ZEO4_SERVER else dict(timeout=.2),
name = None if name is None else name + '-server',
)
thread.setDaemon(True)
......
This diff is collapsed.
This diff is collapsed.
......@@ -8,6 +8,9 @@ import unittest
import ZEO.StorageServer
from . import forker
@unittest.skipIf(forker.ZEO4_SERVER, "ZEO4 servers don't support SSL")
class ClientAuthTests(setupstack.TestCase):
def setUp(self):
......@@ -50,7 +53,6 @@ class ClientAuthTests(setupstack.TestCase):
stop()
def test_suite():
return unittest.makeSuite(ClientAuthTests)
......@@ -9,6 +9,7 @@ from ZODB.broken import find_global
import ZEO
from . import forker
from .utils import StorageServer
class Var(object):
......@@ -16,10 +17,10 @@ class Var(object):
self.value = other
return True
@unittest.skipIf(forker.ZEO4_SERVER, "ZEO4 servers don't support SSL")
class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase):
def test_server_side(self):
# First, verify default conflict resolution.
server = StorageServer(self, DemoStorage())
zs = server.zs
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment