Commit 423cb856 authored by Kirill Smelkov's avatar Kirill Smelkov

client: Remove support for interoperability with ZEO4 server

As explained in https://github.com/zopefoundation/ZEO/issues/209 there is possibility of data
corruption when ZEO5 client loads data from ZEO4 server.

A fix for this is not trivial and would have to forward-port
load-tracking in client from ZEO4 to ZEO5. However, as discussed in
https://github.com/zopefoundation/ZEO/issues/209, we believe that noone
is actually using ZEO5.client-ZEO4.server configuration. Thus, given
that it was already planned to drop ZEO4 support soon, it was decided to
drop support for ZEO4 server instead of fixing it.

This contains 3 patches:

- patch 1 adds test that catches mentioned data corruption problem
- patch 2 corrects documentation for credentials (ZEO5 feature) and user/password/realm (ZEO4-only basic auth)
- patch 3 actually removes support for interoperability with ZEO4.

Please see individual patches and their descriptions for details.

In particular I believe the following excerpt from patch 3 is important to note here as well:

---- 8< ----
- we do _not_ remove verify_invalidation_queue added by Jim in 2016 via
  5ba506e7 (Fixed a bug handling ZEO4 invalidations during cache
  verification) with the following message:

        ZEO4 servers can send invalidations out of order with
        ``getInvalidations`` results, presumably because ``getInvalidations``
        didn't get the commit lock.  ZEO4 clients worked around this (maybe
        not directly) by queuing invalidations during cache verification.
        ZEO5 servers don't send invalidations out of order with
        ``getInvalidations`` calls and the ZEO5 client didn't need an
        invalidation queue, except they do need one to work correctly with
        ZEO4 servers. :/

  This feature, even-though it is commented as being ZEO4-only, looks
  too risky to be removed in stable branch, especially taking into
  account that in https://github.com/zopefoundation/ZEO/pull/195
  @d-maurer instead of removing, preserved this queue in a similar form:

  https://github.com/zopefoundation/ZEO/blob/30e271bbe5380a1fe65f3ca776bc70b4b29b58d9/src/ZEO/asyncio/client.py#L90
  https://github.com/zopefoundation/ZEO/blob/30e271bbe5380a1fe65f3ca776bc70b4b29b58d9/src/ZEO/asyncio/client.py#L277-L280
  https://github.com/zopefoundation/ZEO/blob/30e271bbe5380a1fe65f3ca776bc70b4b29b58d9/src/ZEO/asyncio/client.py#L630-L641

  I believe it is better be safe than sorry.
---- 8< ----

Thanks beforehand,
Kirill

/reviewed-by @dataflake, @d-maurer
/reviewed-on https://github.com/zopefoundation/ZEO/pull/213
parents a6072ecd 629dd350
......@@ -33,7 +33,6 @@ jobs:
- ["3.9", "docs"]
- ["3.9", "coverage"]
- ["2.7", "py27-msgpack1"]
- ["2.7", "py27-zeo4"]
- ["2.7", "py27-zodbmaster"]
- ["3.7", "py37-msgpack1"]
- ["3.7", "py37-uvloop"]
......
......@@ -28,7 +28,6 @@ testenv-deps = [
testenv-setenv = [
"!py27-!pypy: PYTHONWARNINGS=ignore::ResourceWarning",
"msgpack1: ZEO_MSGPACK=1",
"zeo4: ZEO4_SERVER=1",
]
[coverage]
......@@ -50,7 +49,6 @@ additional-rules = [
[github-actions]
additional-config = [
"- [\"2.7\", \"py27-msgpack1\"]",
"- [\"2.7\", \"py27-zeo4\"]",
"- [\"2.7\", \"py27-zodbmaster\"]",
"- [\"3.7\", \"py37-msgpack1\"]",
"- [\"3.7\", \"py37-uvloop\"]",
......
......@@ -4,6 +4,15 @@ Changelog
5.4.0 (unreleased)
------------------
- Remove support for interoperability with ZEO4 server. It turned out that ZEO5
client, contrary to interoperability with ZEO5 server, implements support for
interoperability with ZEO4 server incorrectly with concurrency bugs that lead
to data corruption. The fix is not trivial and we believe that in 2022 noone
actually uses ZEO5.client-ZEO4.server configuration. That's why support for
ZEO4 server was dropped rather than fixed.
See `issue 209 <https://github.com/zopefoundation/ZEO/issues/209>` for details.
- If the ``zeopack`` script cannot connect to a server it sets exit status 1
See `#214 <https://github.com/zopefoundation/ZEO/issues/214>`_.
......
......@@ -219,12 +219,7 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
Wait for server connection, defaulting to true.
credentials
username
password
realm
[ZEO4 only] Credentials for authentication to server.
In ZEO5 support for credentials has been dropped in favor of SSL.
`credentials` support is scheduled to be removed in `ZEO6`.
[Experimental] Credentials object for authentication to server.
server_sync
Whether sync() should make a server round trip, thus causing client
......@@ -252,6 +247,9 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
Defaults to false.
username
password
realm
disconnect_poll
min_disconnect_poll
max_disconnect_poll
......@@ -263,8 +261,6 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
testConnection() and doAuth() for details).
"""
assert not username or password or realm
if isinstance(addr, int):
addr = ('127.0.0.1', addr)
......@@ -1045,11 +1041,6 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
# Below are methods invoked by the StorageServer
def serialnos(self, args):
"""Server callback to pass a list of changed (oid, serial) pairs.
"""
self._tbuf.serialnos(args)
def info(self, dict):
"""Server callback to update the info dictionary."""
self._info.update(dict)
......
......@@ -91,15 +91,3 @@ class TransactionBuffer(object):
for oid in server_resolved:
if oid not in seen:
yield oid, None, True
# Support ZEO4:
def serialnos(self, args):
for oid in args:
if isinstance(oid, bytes):
self.server_resolved.add(oid)
else:
oid, serial = oid
if isinstance(serial, Exception):
self.exception = serial
elif serial == b'rs':
self.server_resolved.add(oid)
......@@ -27,7 +27,6 @@ from ZEO._compat import StringIO
logger = logging.getLogger('ZEO.tests.forker')
DEBUG = os.environ.get('ZEO_TEST_SERVER_DEBUG')
ZEO4_SERVER = os.environ.get('ZEO4_SERVER')
class ZEOConfig(object):
......@@ -102,12 +101,6 @@ def runner(config, qin, qout, timeout=None,
try:
import threading
if ZEO4_SERVER:
# XXX: test dependency. In practice this is
# probably ok
from ZEO.tests.ZEO4 import runzeo
else:
from . import runzeo
options = runzeo.ZEOOptions()
......@@ -118,9 +111,6 @@ def runner(config, qin, qout, timeout=None,
server.clear_socket()
server.create_server()
logger.debug('SERVER CREATED')
if ZEO4_SERVER:
qout.put(server.server.addr)
else:
qout.put(server.server.acceptor.addr)
logger.debug('ADDRESS SENT')
thread = threading.Thread(
......
......@@ -86,7 +86,7 @@ class Protocol(base.Protocol):
# One place where special care was required was in cache setup on
# connect. See finish connect below.
protocols = b'309', b'310', b'3101', b'4', b'5'
protocols = b'5',
def __init__(self, loop,
addr, client, storage_key, read_only, connect_poll=1,
......@@ -313,11 +313,11 @@ class Protocol(base.Protocol):
# syncronously, as that would lead to DEADLOCK!
client_methods = (
'invalidateTransaction', 'serialnos', 'info',
'invalidateTransaction', 'info',
'receiveBlobStart', 'receiveBlobChunk', 'receiveBlobStop',
# plus: notify_connected, notify_disconnected
)
client_delegated = client_methods[2:]
client_delegated = client_methods[1:]
def heartbeat(self, write=True):
if write:
......@@ -434,10 +434,10 @@ class Client(object):
self.protocols = ()
self.disconnected(None)
# Work around odd behavior of ZEO4 server. It may send
# invalidations for transactions later than the result of
# getInvalidations. While we support ZEO 4 servers, we'll
# need to keep an invalidation queue. :(
# Protection against potentially odd behavior of a ZEO server: if it
# may send invalidations for transactions later than the result of
# getInvalidations, without queueing, client's cache might get out of
# sync wrt data on the server.
self.verify_invalidation_queue = []
def new_addrs(self, addrs):
......@@ -598,7 +598,7 @@ class Client(object):
self.cache.setLastTid(server_tid)
self.ready = True
# Gaaaa, ZEO 4 work around. See comment in __init__. :(
# See comment in __init__. :(
for tid, oids in self.verify_invalidation_queue:
if tid > server_tid:
self.invalidateTransaction(tid, oids)
......@@ -769,24 +769,6 @@ class Client(object):
else:
self.verify_invalidation_queue.append((tid, oids))
def serialnos(self, serials):
# Method called by ZEO4 storage servers.
# Before delegating, check for errors (likely ConflictErrors)
# and invalidate the oids they're associated with. In the
# past, this was done by the client, but now we control the
# cache and this is our last chance, as the client won't call
# back into us when there's an error.
for oid in serials:
if isinstance(oid, bytes):
self.cache.invalidate(oid, None)
else:
oid, serial = oid
if isinstance(serial, Exception) or serial == b'rs':
self.cache.invalidate(oid, None)
self.client.serialnos(serials)
@property
def protocol_version(self):
return self.protocol.protocol_version
......
......@@ -112,8 +112,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
transport = loop.transport
if finish_start:
protocol.data_received(sized(self.enc + b'3101'))
self.assertEqual(self.pop(2, False), self.enc + b'3101')
protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.pop(2, False), self.enc + b'5')
self.respond(1, None)
self.respond(2, 'a'*8)
self.pop(4)
......@@ -323,8 +323,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
# This time we'll send a lower protocol version. The client
# will send it back, because it's lower than the client's
# protocol:
protocol.data_received(sized(self.enc + b'310'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'310')
protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.assertEqual(self.pop(), (1, False, 'register', ('TEST', False)))
self.assertFalse(wrapper.notify_connected.called)
......@@ -359,8 +359,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
cache.store(b'2'*8, b'a'*8, None, '2 data')
self.assertFalse(client.connected.done() or transport.data)
protocol.data_received(sized(self.enc + b'3101'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101')
protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None)
self.respond(2, b'e'*8)
self.pop(4)
......@@ -395,8 +395,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
self.assertTrue(cache)
self.assertFalse(client.connected.done() or transport.data)
protocol.data_received(sized(self.enc + b'3101'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101')
protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None)
self.respond(2, b'e'*8)
self.pop(4)
......@@ -447,8 +447,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
self.assertEqual(sorted(loop.connecting), addrs[:1])
protocol = loop.protocol
transport = loop.transport
protocol.data_received(sized(self.enc + b'3101'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101')
protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None)
# Now, when the first connection fails, it won't be retried,
......@@ -465,8 +465,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
wrapper, cache, loop, client, protocol, transport = self.start()
cache.store(b'4'*8, b'a'*8, None, '4 data')
cache.setLastTid('b'*8)
protocol.data_received(sized(self.enc + b'3101'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101')
protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None)
self.respond(2, 'a'*8)
self.pop()
......@@ -479,8 +479,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
self.assertFalse(transport is loop.transport)
protocol = loop.protocol
transport = loop.transport
protocol.data_received(sized(self.enc + b'3101'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101')
protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None)
self.respond(2, 'b'*8)
self.pop(4)
......@@ -499,8 +499,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
# We'll treat the first address as read-only and we'll let it connect:
loop.connect_connecting(addrs[0])
protocol, transport = loop.protocol, loop.transport
protocol.data_received(sized(self.enc + b'3101'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101')
protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
# We see that the client tried a writable connection:
self.assertEqual(self.pop(),
(1, False, 'register', ('TEST', False)))
......@@ -531,9 +531,9 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
# We connect the second address:
loop.connect_connecting(addrs[1])
loop.protocol.data_received(sized(self.enc + b'3101'))
loop.protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(loop.transport.pop(2)),
self.enc + b'3101')
self.enc + b'5')
self.assertEqual(self.parse(loop.transport.pop()),
(1, False, 'register', ('TEST', False)))
self.assertTrue(self.is_read_only())
......@@ -567,8 +567,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
def test_invalidations_while_verifying(self):
# While we're verifying, invalidations are ignored
wrapper, cache, loop, client, protocol, transport = self.start()
protocol.data_received(sized(self.enc + b'3101'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101')
protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None)
self.pop(4)
self.send('invalidateTransaction', b'b'*8, [b'1'*8], called=False)
......@@ -586,8 +586,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
# Similarly, invalidations aren't processed while reconnecting:
protocol.data_received(sized(self.enc + b'3101'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'3101')
protocol.data_received(sized(self.enc + b'5'))
self.assertEqual(self.unsized(transport.pop(2)), self.enc + b'5')
self.respond(1, None)
self.pop(4)
self.send('invalidateTransaction', b'd'*8, [b'1'*8], called=False)
......
======================
Copy of ZEO 4 server
======================
This copy was made by first converting the ZEO 4 server code to use
relative imports. The code was tested with ZEO 4 before copying. It
was unchanged aside from the relative imports.
The ZEO 4 server is used for tests if the ZEO4_SERVER environment
variable is set to a non-empty value.
This diff is collapsed.
##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
_auth_modules = {}
def get_module(name):
if name == 'sha':
from auth_sha import StorageClass, SHAClient, Database
return StorageClass, SHAClient, Database
elif name == 'digest':
from .auth_digest import StorageClass, DigestClient, DigestDatabase
return StorageClass, DigestClient, DigestDatabase
else:
return _auth_modules.get(name)
def register_module(name, storage_class, client, db):
if name in _auth_modules:
raise TypeError("%s is already registred" % name)
_auth_modules[name] = storage_class, client, db
##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Digest authentication for ZEO
This authentication mechanism follows the design of HTTP digest
authentication (RFC 2069). It is a simple challenge-response protocol
that does not send passwords in the clear, but does not offer strong
security. The RFC discusses many of the limitations of this kind of
protocol.
Guard the password database as if it contained plaintext passwords.
It stores the hash of a username and password. This does not expose
the plaintext password, but it is sensitive nonetheless. An attacker
with the hash can impersonate the real user. This is a limitation of
the simple digest scheme.
HTTP is a stateless protocol, and ZEO is a stateful protocol. The
security requirements are quite different as a result. The HTTP
protocol uses a nonce as a challenge. The ZEO protocol requires a
separate session key that is used for message authentication. We
generate a second nonce for this purpose; the hash of nonce and
user/realm/password is used as the session key.
TODO: I'm not sure if this is a sound approach; SRP would be preferred.
"""
import os
import random
import struct
import time
from .base import Database, Client
from ..StorageServer import ZEOStorage
from ZEO.Exceptions import AuthError
from ..hash import sha1
def get_random_bytes(n=8):
try:
b = os.urandom(n)
except NotImplementedError:
L = [chr(random.randint(0, 255)) for i in range(n)]
b = b"".join(L)
return b
def hexdigest(s):
return sha1(s.encode()).hexdigest()
class DigestDatabase(Database):
def __init__(self, filename, realm=None):
Database.__init__(self, filename, realm)
# Initialize a key used to build the nonce for a challenge.
# We need one key for the lifetime of the server, so it
# is convenient to store in on the database.
self.noncekey = get_random_bytes(8)
def _store_password(self, username, password):
dig = hexdigest("%s:%s:%s" % (username, self.realm, password))
self._users[username] = dig
def session_key(h_up, nonce):
# The hash itself is a bit too short to be a session key.
# HMAC wants a 64-byte key. We don't want to use h_up
# directly because it would never change over time. Instead
# use the hash plus part of h_up.
return (sha1(("%s:%s" % (h_up, nonce)).encode('latin-1')).digest() +
h_up.encode('utf-8')[:44])
class StorageClass(ZEOStorage):
def set_database(self, database):
assert isinstance(database, DigestDatabase)
self.database = database
self.noncekey = database.noncekey
def _get_time(self):
# Return a string representing the current time.
t = int(time.time())
return struct.pack("i", t)
def _get_nonce(self):
# RFC 2069 recommends a nonce of the form
# H(client-IP ":" time-stamp ":" private-key)
dig = sha1()
dig.update(str(self.connection.addr).encode('latin-1'))
dig.update(self._get_time())
dig.update(self.noncekey)
return dig.hexdigest()
def auth_get_challenge(self):
"""Return realm, challenge, and nonce."""
self._challenge = self._get_nonce()
self._key_nonce = self._get_nonce()
return self.auth_realm, self._challenge, self._key_nonce
def auth_response(self, resp):
# verify client response
user, challenge, response = resp
# Since zrpc is a stateful protocol, we just store the nonce
# we sent to the client. It will need to generate a new
# nonce for a new connection anyway.
if self._challenge != challenge:
raise ValueError("invalid challenge")
# lookup user in database
h_up = self.database.get_password(user)
# regeneration resp from user, password, and nonce
check = hexdigest("%s:%s" % (h_up, challenge))
if check == response:
self.connection.setSessionKey(session_key(h_up, self._key_nonce))
return self._finish_auth(check == response)
extensions = [auth_get_challenge, auth_response]
class DigestClient(Client):
extensions = ["auth_get_challenge", "auth_response"]
def start(self, username, realm, password):
_realm, challenge, nonce = self.stub.auth_get_challenge()
if _realm != realm:
raise AuthError("expected realm %r, got realm %r"
% (_realm, realm))
h_up = hexdigest("%s:%s:%s" % (username, realm, password))
resp_dig = hexdigest("%s:%s" % (h_up, challenge))
result = self.stub.auth_response((username, challenge, resp_dig))
if result:
return session_key(h_up, nonce)
else:
return None
##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Base classes for defining an authentication protocol.
Database -- abstract base class for password database
Client -- abstract base class for authentication client
"""
from __future__ import print_function
from __future__ import print_function
import os
from ..hash import sha1
class Client(object):
# Subclass should override to list the names of methods that
# will be called on the server.
extensions = []
def __init__(self, stub):
self.stub = stub
for m in self.extensions:
setattr(self.stub, m, self.stub.extensionMethod(m))
def sort(L):
"""Sort a list in-place and return it."""
L.sort()
return L
class Database(object):
"""Abstracts a password database.
This class is used both in the authentication process (via
get_password()) and by client scripts that manage the password
database file.
The password file is a simple, colon-separated text file mapping
usernames to password hashes. The hashes are SHA hex digests
produced from the password string.
"""
realm = None
def __init__(self, filename, realm=None):
"""Creates a new Database
filename: a string containing the full pathname of
the password database file. Must be readable by the user
running ZEO. Must be writeable by any client script that
accesses the database.
realm: the realm name (a string)
"""
self._users = {}
self.filename = filename
self.load()
if realm:
if self.realm and self.realm != realm:
raise ValueError("Specified realm %r differs from database "
"realm %r" % (realm or '', self.realm))
else:
self.realm = realm
def save(self, fd=None):
filename = self.filename
needs_closed = False
if not fd:
fd = open(filename, 'w')
needs_closed = True
try:
if self.realm:
print("realm", self.realm, file=fd)
for username in sorted(self._users.keys()):
print("%s: %s" % (username, self._users[username]), file=fd)
finally:
if needs_closed:
fd.close()
def load(self):
filename = self.filename
if not filename:
return
if not os.path.exists(filename):
return
with open(filename) as fd:
L = fd.readlines()
if not L:
return
if L[0].startswith("realm "):
line = L.pop(0).strip()
self.realm = line[len("realm "):]
for line in L:
username, hash = line.strip().split(":", 1)
self._users[username] = hash.strip()
def _store_password(self, username, password):
self._users[username] = self.hash(password)
def get_password(self, username):
"""Returns password hash for specified username.
Callers must check for LookupError, which is raised in
the case of a non-existent user specified."""
if username not in self._users:
raise LookupError("No such user: %s" % username)
return self._users[username]
def hash(self, s):
return sha1(s.encode()).hexdigest()
def add_user(self, username, password):
if username in self._users:
raise LookupError("User %s already exists" % username)
self._store_password(username, password)
def del_user(self, username):
if username not in self._users:
raise LookupError("No such user: %s" % username)
del self._users[username]
def change_password(self, username, password):
if username not in self._users:
raise LookupError("No such user: %s" % username)
self._store_password(username, password)
"""HMAC (Keyed-Hashing for Message Authentication) Python module.
Implements the HMAC algorithm as described by RFC 2104.
"""
from six.moves import map
def _strxor(s1, s2):
"""Utility method. XOR the two strings s1 and s2 (must have same length).
"""
return "".join(map(lambda x, y: chr(ord(x) ^ ord(y)), s1, s2))
# The size of the digests returned by HMAC depends on the underlying
# hashing module used.
digest_size = None
class HMAC(object):
"""RFC2104 HMAC class.
This supports the API for Cryptographic Hash Functions (PEP 247).
"""
def __init__(self, key, msg=None, digestmod=None):
"""Create a new HMAC object.
key: key for the keyed hash object.
msg: Initial input for the hash, if provided.
digestmod: A module supporting PEP 247. Defaults to the md5 module.
"""
if digestmod is None:
import md5
digestmod = md5
self.digestmod = digestmod
self.outer = digestmod.new()
self.inner = digestmod.new()
self.digest_size = digestmod.digest_size
blocksize = 64
ipad = "\x36" * blocksize
opad = "\x5C" * blocksize
if len(key) > blocksize:
key = digestmod.new(key).digest()
key = key + chr(0) * (blocksize - len(key))
self.outer.update(_strxor(key, opad))
self.inner.update(_strxor(key, ipad))
if msg is not None:
self.update(msg)
# def clear(self):
# raise NotImplementedError("clear() method not available in HMAC.")
def update(self, msg):
"""Update this hashing object with the string msg.
"""
self.inner.update(msg)
def copy(self):
"""Return a separate copy of this hashing object.
An update to this copy won't affect the original object.
"""
other = HMAC("")
other.digestmod = self.digestmod
other.inner = self.inner.copy()
other.outer = self.outer.copy()
return other
def digest(self):
"""Return the hash value of this hashing object.
This returns a string containing 8-bit data. The object is
not altered in any way by this function; you can continue
updating the object after calling this function.
"""
h = self.outer.copy()
h.update(self.inner.digest())
return h.digest()
def hexdigest(self):
"""Like digest(), but returns a string of hexadecimal digits instead.
"""
return "".join([hex(ord(x))[2:].zfill(2)
for x in tuple(self.digest())])
def new(key, msg=None, digestmod=None):
"""Create a new hashing object and return it.
key: The starting key for the hash.
msg: if available, will immediately be hashed into the object's starting
state.
You can now feed arbitrary strings into the object using its update()
method, and can ask for the hash value at any time by calling its digest()
method.
"""
return HMAC(key, msg, digestmod)
<component>
<sectiontype name="zeo">
<description>
The content of a ZEO section describe operational parameters
of a ZEO server except for the storage(s) to be served.
</description>
<key name="address" datatype="socket-binding-address"
required="yes">
<description>
The address at which the server should listen. This can be in
the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
</description>
</key>
<key name="read-only" datatype="boolean"
required="no"
default="false">
<description>
Flag indicating whether the server should operate in read-only
mode. Defaults to false. Note that even if the server is
operating in writable mode, individual storages may still be
read-only. But if the server is in read-only mode, no write
operations are allowed, even if the storages are writable. Note
that pack() is considered a read-only operation.
</description>
</key>
<key name="invalidation-queue-size" datatype="integer"
required="no"
default="100">
<description>
The storage server keeps a queue of the objects modified by the
last N transactions, where N == invalidation_queue_size. This
queue is used to speed client cache verification when a client
disconnects for a short period of time.
</description>
</key>
<key name="invalidation-age" datatype="float" required="no">
<description>
The maximum age of a client for which quick-verification
invalidations will be provided by iterating over the served
storage. This option should only be used if the served storage
supports efficient iteration from a starting point near the
end of the transaction history (e.g. end of file).
</description>
</key>
<key name="monitor-address" datatype="socket-binding-address"
required="no">
<description>
The address at which the monitor server should listen. If
specified, a monitor server is started. The monitor server
provides server statistics in a simple text format. This can
be in the form 'host:port' to signify a TCP/IP connection or a
pathname string to signify a Unix domain socket connection (at
least one '/' is required). A hostname may be a DNS name or a
dotted IP address. If the hostname is omitted, the platform's
default behavior is used when binding the listening socket (''
is passed to socket.bind() as the hostname portion of the
address).
</description>
</key>
<key name="transaction-timeout" datatype="integer"
required="no">
<description>
The maximum amount of time to wait for a transaction to commit
after acquiring the storage lock, specified in seconds. If the
transaction takes too long, the client connection will be closed
and the transaction aborted.
</description>
</key>
<key name="authentication-protocol" required="no">
<description>
The name of the protocol used for authentication. The
only protocol provided with ZEO is "digest," but extensions
may provide other protocols.
</description>
</key>
<key name="authentication-database" required="no">
<description>
The path of the database containing authentication credentials.
</description>
</key>
<key name="authentication-realm" required="no">
<description>
The authentication realm of the server. Some authentication
schemes use a realm to identify the logical set of usernames
that are accepted by this server.
</description>
</key>
<key name="pid-filename" datatype="existing-dirpath"
required="no">
<description>
The full path to the file in which to write the ZEO server's Process ID
at startup. If omitted, $INSTANCE/var/ZEO.pid is used.
</description>
<metadefault>$INSTANCE/var/ZEO.pid (or $clienthome/ZEO.pid)</metadefault>
</key>
<!-- DM 2006-06-12: added option -->
<key name="drop-cache-rather-verify" datatype="boolean"
required="no" default="false">
<description>
indicates that the cache should be dropped rather than
verified when the verification optimization is not
available (e.g. when the ZEO server restarted).
</description>
</key>
</sectiontype>
</component>
##############################################################################
#
# Copyright (c) 2008 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""In Python 2.6, the "sha" and "md5" modules have been deprecated
in favor of using hashlib for both. This class allows for compatibility
between versions."""
try:
import hashlib
sha1 = hashlib.sha1
new = sha1
except ImportError:
import sha
sha1 = sha.new
new = sha1
digest_size = sha.digest_size
##############################################################################
#
# Copyright (c) 2003 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Monitor behavior of ZEO server and record statistics.
"""
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
from __future__ import print_function
import asyncore
import socket
import time
import logging
zeo_version = 'unknown'
try:
import pkg_resources
except ImportError:
pass
else:
zeo_dist = pkg_resources.working_set.find(
pkg_resources.Requirement.parse('ZODB3')
)
if zeo_dist is not None:
zeo_version = zeo_dist.version
class StorageStats(object):
"""Per-storage usage statistics."""
def __init__(self, connections=None):
self.connections = connections
self.loads = 0
self.stores = 0
self.commits = 0
self.aborts = 0
self.active_txns = 0
self.verifying_clients = 0
self.lock_time = None
self.conflicts = 0
self.conflicts_resolved = 0
self.start = time.ctime()
@property
def clients(self):
return len(self.connections)
def parse(self, s):
# parse the dump format
lines = s.split("\n")
for line in lines:
field, value = line.split(":", 1)
if field == "Server started":
self.start = value
elif field == "Clients":
# Hack because we use this both on the server and on
# the client where there are no connections.
self.connections = [0] * int(value)
elif field == "Clients verifying":
self.verifying_clients = int(value)
elif field == "Active transactions":
self.active_txns = int(value)
elif field == "Commit lock held for":
# This assumes
self.lock_time = time.time() - int(value)
elif field == "Commits":
self.commits = int(value)
elif field == "Aborts":
self.aborts = int(value)
elif field == "Loads":
self.loads = int(value)
elif field == "Stores":
self.stores = int(value)
elif field == "Conflicts":
self.conflicts = int(value)
elif field == "Conflicts resolved":
self.conflicts_resolved = int(value)
def dump(self, f):
print("Server started:", self.start, file=f)
print("Clients:", self.clients, file=f)
print("Clients verifying:", self.verifying_clients, file=f)
print("Active transactions:", self.active_txns, file=f)
if self.lock_time:
howlong = time.time() - self.lock_time
print("Commit lock held for:", int(howlong), file=f)
print("Commits:", self.commits, file=f)
print("Aborts:", self.aborts, file=f)
print("Loads:", self.loads, file=f)
print("Stores:", self.stores, file=f)
print("Conflicts:", self.conflicts, file=f)
print("Conflicts resolved:", self.conflicts_resolved, file=f)
class StatsClient(asyncore.dispatcher):
def __init__(self, sock, addr):
asyncore.dispatcher.__init__(self, sock)
self.buf = []
self.closed = 0
def close(self):
self.closed = 1
# The socket is closed after all the data is written.
# See handle_write().
def write(self, s):
self.buf.append(s)
def writable(self):
return len(self.buf)
def readable(self):
return 0
def handle_write(self):
s = "".join(self.buf)
self.buf = []
n = self.socket.send(s.encode('ascii'))
if n < len(s):
self.buf.append(s[:n])
if self.closed and not self.buf:
asyncore.dispatcher.close(self)
class StatsServer(asyncore.dispatcher):
StatsConnectionClass = StatsClient
def __init__(self, addr, stats):
asyncore.dispatcher.__init__(self)
self.addr = addr
self.stats = stats
if type(self.addr) == tuple:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
logger = logging.getLogger('ZEO.monitor')
logger.info("listening on %s", repr(self.addr))
self.bind(self.addr)
self.listen(5)
def writable(self):
return 0
def readable(self):
return 1
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error:
return
f = self.StatsConnectionClass(sock, addr)
self.dump(f)
f.close()
def dump(self, f):
print("ZEO monitor server version %s" % zeo_version, file=f)
print(time.ctime(), file=f)
print(file=f)
L = sorted(self.stats.keys())
for k in L:
stats = self.stats[k]
print("Storage:", k, file=f)
stats.dump(f)
print(file=f)
This diff is collapsed.
<schema>
<!-- note that zeoctl.xml is a closely related schema which should
match this schema, but should require the "runner" section -->
<description>
This schema describes the configuration of the ZEO storage server
process.
</description>
<!-- Use the storage types defined by ZODB. -->
<import package="ZODB"/>
<!-- Use the ZEO server information structure. -->
<import package="ZEO.tests.ZEO4"/>
<import package="ZConfig.components.logger"/>
<!-- runner control -->
<import package="zdaemon"/>
<section type="zeo" name="*" required="yes" attribute="zeo" />
<section type="runner" name="*" required="no" attribute="runner" />
<multisection name="*" type="ZODB.storage"
attribute="storages"
required="yes">
<description>
One or more storages that are provided by the ZEO server. The
section names are used as the storage names, and must be unique
within each ZEO storage server. Traditionally, these names
represent small integers starting at '1'.
</description>
</multisection>
<section name="*" type="eventlog" attribute="eventlog" required="no" />
</schema>
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# zrpc is a package with the following modules
# client -- manages connection creation to remote server
# connection -- object dispatcher
# log -- logging helper
# error -- exceptions raised by zrpc
# marshal -- internal, handles basic protocol issues
# server -- manages incoming connections from remote clients
# smac -- sized message async connections
# trigger -- medusa's trigger
# zrpc is not an advertised subpackage of ZEO; its interfaces are internal
# This file is a slightly modified copy of Python 2.3's Lib/hmac.py.
# This file is under the Python Software Foundation (PSF) license.
"""HMAC (Keyed-Hashing for Message Authentication) Python module.
Implements the HMAC algorithm as described by RFC 2104.
"""
from six.moves import map
def _strxor(s1, s2):
"""Utility method. XOR the two strings s1 and s2 (must have same length).
"""
return "".join(map(lambda x, y: chr(ord(x) ^ ord(y)), s1, s2))
# The size of the digests returned by HMAC depends on the underlying
# hashing module used.
digest_size = None
class HMAC(object):
"""RFC2104 HMAC class.
This supports the API for Cryptographic Hash Functions (PEP 247).
"""
def __init__(self, key, msg=None, digestmod=None):
"""Create a new HMAC object.
key: key for the keyed hash object.
msg: Initial input for the hash, if provided.
digestmod: A module supporting PEP 247. Defaults to the md5 module.
"""
if digestmod is None:
import md5
digestmod = md5
self.digestmod = digestmod
self.outer = digestmod.new()
self.inner = digestmod.new()
# Python 2.1 and 2.2 differ about the correct spelling
try:
self.digest_size = digestmod.digestsize
except AttributeError:
self.digest_size = digestmod.digest_size
blocksize = 64
ipad = "\x36" * blocksize
opad = "\x5C" * blocksize
if len(key) > blocksize:
key = digestmod.new(key).digest()
key = key + chr(0) * (blocksize - len(key))
self.outer.update(_strxor(key, opad))
self.inner.update(_strxor(key, ipad))
if msg is not None:
self.update(msg)
# def clear(self):
# raise NotImplementedError("clear() method not available in HMAC.")
def update(self, msg):
"""Update this hashing object with the string msg.
"""
self.inner.update(msg)
def copy(self):
"""Return a separate copy of this hashing object.
An update to this copy won't affect the original object.
"""
other = HMAC("")
other.digestmod = self.digestmod
other.inner = self.inner.copy()
other.outer = self.outer.copy()
return other
def digest(self):
"""Return the hash value of this hashing object.
This returns a string containing 8-bit data. The object is
not altered in any way by this function; you can continue
updating the object after calling this function.
"""
h = self.outer.copy()
h.update(self.inner.digest())
return h.digest()
def hexdigest(self):
"""Like digest(), but returns a string of hexadecimal digits instead.
"""
return "".join([hex(ord(x))[2:].zfill(2)
for x in tuple(self.digest())])
def new(key, msg=None, digestmod=None):
"""Create a new hashing object and return it.
key: The starting key for the hash.
msg: if available, will immediately be hashed into the object's starting
state.
You can now feed arbitrary strings into the object using its update()
method, and can ask for the hash value at any time by calling its digest()
method.
"""
return HMAC(key, msg, digestmod)
This diff is collapsed.
This diff is collapsed.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from ZODB import POSException
from ZEO.Exceptions import ClientDisconnected
class ZRPCError(POSException.StorageError):
pass
class DisconnectedError(ZRPCError, ClientDisconnected):
"""The database storage is disconnected from the storage server.
The error occurred because a problem in the low-level RPC connection,
or because the connection was closed.
"""
# This subclass is raised when zrpc catches the error.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import threading
import logging
from ZODB.loglevels import BLATHER
LOG_THREAD_ID = 0 # Set this to 1 during heavy debugging
logger = logging.getLogger('ZEO.zrpc')
_label = "%s" % os.getpid()
def new_label():
global _label
_label = str(os.getpid())
def log(message, level=BLATHER, label=None, exc_info=False):
label = label or _label
if LOG_THREAD_ID:
label = label + ':' + threading.currentThread().getName()
logger.log(level, '(%s) %s' % (label, message), exc_info=exc_info)
REPR_LIMIT = 60
def short_repr(obj):
"Return an object repr limited to REPR_LIMIT bytes."
# Some of the objects being repr'd are large strings. A lot of memory
# would be wasted to repr them and then truncate, so they are treated
# specially in this function.
# Also handle short repr of a tuple containing a long string.
# This strategy works well for arguments to StorageServer methods.
# The oid is usually first and will get included in its entirety.
# The pickle is near the beginning, too, and you can often fit the
# module name in the pickle.
if isinstance(obj, str):
if len(obj) > REPR_LIMIT:
r = repr(obj[:REPR_LIMIT])
else:
r = repr(obj)
if len(r) > REPR_LIMIT:
r = r[:REPR_LIMIT-4] + '...' + r[-1]
return r
elif isinstance(obj, (list, tuple)):
elts = []
size = 0
for elt in obj:
r = short_repr(elt)
elts.append(r)
size += len(r)
if size > REPR_LIMIT:
break
if isinstance(obj, tuple):
r = "(%s)" % (", ".join(elts))
else:
r = "[%s]" % (", ".join(elts))
else:
r = repr(obj)
if len(r) > REPR_LIMIT:
return r[:REPR_LIMIT] + '...'
else:
return r
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
<component>
<import package="ZODB"/>
<sectiontype
name="loaddelayed_storage"
datatype="ZEO.tests.racetest.ZConfigLoadDelayed"
implements="ZODB.storage"
>
<section type="ZODB.storage" name="*" attribute="base" required="yes" />
</sectiontype>
</component>
......@@ -35,15 +35,6 @@ logger = logging.getLogger('ZEO.tests.forker')
DEBUG = _forker.DEBUG
ZEO4_SERVER = _forker.ZEO4_SERVER
skip_if_testing_client_against_zeo4 = (
(lambda func: None)
if ZEO4_SERVER else
(lambda func: func)
)
ZEOConfig = _forker.ZEOConfig
......
This diff is collapsed.
This diff is collapsed.
......@@ -102,9 +102,8 @@ test_classes = [FileStorageConnectionTests,
FileStorageTimeoutTests,
MappingStorageConnectionTests,
MappingStorageTimeoutTests,
SSLConnectionTests,
]
if not forker.ZEO4_SERVER:
test_classes.append(SSLConnectionTests)
def invalidations_while_connecting():
......
......@@ -58,7 +58,7 @@ class FakeServer(object):
class FakeConnection(object):
protocol_version = b'Z4'
protocol_version = b'Z5'
addr = 'test'
def call_soon_threadsafe(f, *a):
......
This diff is collapsed.
......@@ -8,11 +8,9 @@ import unittest
import ZEO.StorageServer
from . import forker
from .threaded import threaded_server_tests
@unittest.skipIf(forker.ZEO4_SERVER, "ZEO4 servers don't support SSL")
class ClientAuthTests(setupstack.TestCase):
def setUp(self):
......
......@@ -12,7 +12,6 @@ from ZODB.broken import find_global
import ZEO
from . import forker
from .utils import StorageServer
......@@ -22,7 +21,6 @@ class Var(object):
return True
@unittest.skipIf(forker.ZEO4_SERVER, "ZEO4 servers don't support SSL")
class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase):
def test_server_side(self):
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment