Commit 7e089655 authored by Guido van Rossum's avatar Guido van Rossum

Add docstrings to every function, class and method in this module.

Also remove unused optional argument 'last' from new_oid() and
'_stuff' from load().
parent ac0c6476
...@@ -11,9 +11,16 @@ ...@@ -11,9 +11,16 @@
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
"""Network ZODB storage client """The ClientStorage class and the exceptions that it may raise.
$Id: ClientStorage.py,v 1.68 2002/09/25 22:02:44 bwarsaw Exp $ Public contents of this module:
ClientStorage -- the main class, implementing the Storage API
ClientStorageError -- exception raised by ClientStorage
UnrecognizedResult -- exception raised by ClientStorage
ClientDisconnected -- exception raised by ClientStorage
$Id: ClientStorage.py,v 1.69 2002/10/01 16:37:03 gvanrossum Exp $
""" """
# XXX TO DO # XXX TO DO
...@@ -45,15 +52,21 @@ except ImportError: ...@@ -45,15 +52,21 @@ except ImportError:
ResolvedSerial = 'rs' ResolvedSerial = 'rs'
class ClientStorageError(POSException.StorageError): class ClientStorageError(POSException.StorageError):
"""An error occured in the ZEO Client Storage""" """An error occured in the ZEO Client Storage."""
class UnrecognizedResult(ClientStorageError): class UnrecognizedResult(ClientStorageError):
"""A server call returned an unrecognized result""" """A server call returned an unrecognized result."""
class ClientDisconnected(ClientStorageError, Disconnected): class ClientDisconnected(ClientStorageError, Disconnected):
"""The database storage is disconnected from the storage.""" """The database storage is disconnected from the storage."""
def get_timestamp(prev_ts=None): def get_timestamp(prev_ts=None):
"""Internal helper to return a unique TimeStamp instance.
If the optional argument is not None, it must be a TimeStamp; the
return value is then guaranteed to be at least 1 microsecond later
the argument.
"""
t = time.time() t = time.time()
t = apply(TimeStamp, (time.gmtime(t)[:5] + (t % 60,))) t = apply(TimeStamp, (time.gmtime(t)[:5] + (t % 60,)))
if prev_ts is not None: if prev_ts is not None:
...@@ -61,15 +74,30 @@ def get_timestamp(prev_ts=None): ...@@ -61,15 +74,30 @@ def get_timestamp(prev_ts=None):
return t return t
class DisconnectedServerStub: class DisconnectedServerStub:
"""Raise ClientDisconnected on all attribute access.""" """Internal helper class used as a faux RPC stub when disconnected.
This raises ClientDisconnected on all attribute accesses.
This is a singleton class -- there should be only one instance,
the global disconnected_stub, os it can be tested by identity.
"""
def __getattr__(self, attr): def __getattr__(self, attr):
raise ClientDisconnected() raise ClientDisconnected()
# Singleton instance of DisconnectedServerStub
disconnected_stub = DisconnectedServerStub() disconnected_stub = DisconnectedServerStub()
class ClientStorage: class ClientStorage:
"""A Storage class that is a network client to a remote storage.
This is a faithful implementation of the Storage API.
This class is thread-safe; transactions are serialized in
tpc_begin().
"""
def __init__(self, addr, storage='1', cache_size=20000000, def __init__(self, addr, storage='1', cache_size=20000000,
name='', client=None, debug=0, var=None, name='', client=None, debug=0, var=None,
min_disconnect_poll=5, max_disconnect_poll=300, min_disconnect_poll=5, max_disconnect_poll=300,
...@@ -77,6 +105,67 @@ class ClientStorage: ...@@ -77,6 +105,67 @@ class ClientStorage:
wait=None, # defaults to 1 wait=None, # defaults to 1
read_only=0, read_only_fallback=0): read_only=0, read_only_fallback=0):
"""ClientStorage constructor.
This is typically invoked from a custom_zodb.py file.
All arguments except addr should be keyword arguments.
Arguments:
addr -- The server address(es). This is either a list of
addresses, or a single address. Each address can be a
(hostname, port) tuple to signify a TCP/IP connection, or
a pathname string to signify a Unix domain socket
connection. A hostname may be a DNS name or a dotted IP
address. Required.
storage -- The storage name, defaulting to '1'. This must
match one of the storage names supported by the server(s)
specified by the addr argument.
cache_size -- The disk cache size, defaulting to 20 megabytes.
This is passed to the ClientCache constructor.
name -- The storage name, defaulting to ''. If this is false,
str(addr) is used as the storage name.
client -- The client name, defaulting to None. If this is
false, the environment value ZEO_CLIENT is used. if the
effective value is true, the client cache is persistent.
See ClientCache for more info.
debug -- Ignored. This is present only for backwards
compatibility with ZEO 1.
var -- The 'var' directory, defaulting to None. This is
passed to the ClientCache constructor, which picks a
default if the value is None.
min_disconnect_poll -- The minimum delay in seconds between
attempts to connect to the server, in seconds. Defaults
to 5 seconds.
max_disconnect_poll -- The maximum delay in seconds between
attempts to connect to the server, in seconds. Defaults
to 300 seconds.
wait_for_server_on_startup -- A backwards compatible alias for
the wait argument.
wait -- A flag indicating whether to wait until a connection
with a server is made, defaulting to true.
read_only -- A flag indicating whether this should be a
read-only storage, defaulting to false (i.e. writing is
allowed by default).
read_only_fallback -- A flag indicating whether a read-only
remote storage should be acceptable as a fallback when no
writable storages are available. Defaults to false. At
most one of read_only and read_only_fallback should be
true.
"""
log2(INFO, "ClientStorage (pid=%d) created %s/%s for storage: %r" % log2(INFO, "ClientStorage (pid=%d) created %s/%s for storage: %r" %
(os.getpid(), (os.getpid(),
read_only and "RO" or "RW", read_only and "RO" or "RW",
...@@ -164,6 +253,7 @@ class ClientStorage: ...@@ -164,6 +253,7 @@ class ClientStorage:
self._cache.open() self._cache.open()
def close(self): def close(self):
"""Storage API: finalize the storage, releasing external resources."""
if self._tbuf is not None: if self._tbuf is not None:
self._tbuf.close() self._tbuf.close()
self._tbuf = None self._tbuf = None
...@@ -175,32 +265,49 @@ class ClientStorage: ...@@ -175,32 +265,49 @@ class ClientStorage:
self._rpc_mgr = None self._rpc_mgr = None
def registerDB(self, db, limit): def registerDB(self, db, limit):
"""Register that the storage is controlled by the given DB.""" """Storage API: register a database for invalidation messages.
# This is called by ZODB.DB (and by some tests).
# The storage isn't really ready to use until after this call. This is called by ZODB.DB (and by some tests).
The storage isn't really ready to use until after this call.
"""
log2(INFO, "registerDB(%s, %s)" % (repr(db), repr(limit))) log2(INFO, "registerDB(%s, %s)" % (repr(db), repr(limit)))
self._db = db self._db = db
def is_connected(self): def is_connected(self):
"""Return whether the storage is currently connected to a server."""
if self._server is disconnected_stub: if self._server is disconnected_stub:
return 0 return 0
else: else:
return 1 return 1
def sync(self): def sync(self):
"""Handle any pending invalidation messages.""" """Handle any pending invalidation messages.
This is called by the sync method in ZODB.Connection.
"""
self._server._update() self._server._update()
def testConnection(self, conn): def testConnection(self, conn):
"""Test a connection. """Internal: test the given connection.
This returns: 1 if the connection is an optimal match, This returns: 1 if the connection is an optimal match,
0 if it is a suboptimal but acceptable match 0 if it is a suboptimal but acceptable match.
It can also raise DisconnectedError or ReadOnlyError. It can also raise DisconnectedError or ReadOnlyError.
This is called by ConnectionManager to decide which connection This is called by ZEO.zrpc.ConnectionManager to decide which
to use in case there are multiple, and some are read-only and connection to use in case there are multiple, and some are
others are read-write. read-only and others are read-write.
This works by calling register() on the server. In read-only
mode, register() is called with the read_only flag set. In
writable mode and in read-only fallback mode, register() is
called with the read_only flag cleared. In read-only fallback
mode only, if the register() call raises ReadOnlyError, it is
retried with the read-only flag set, and if this succeeds,
this is deemed a suboptimal match. In all other cases, a
succeeding register() call is deemed an optimal match, and any
exception raised by register() is passed through.
""" """
log2(INFO, "Testing connection %r" % conn) log2(INFO, "Testing connection %r" % conn)
# XXX Check the protocol version here? # XXX Check the protocol version here?
...@@ -216,7 +323,7 @@ class ClientStorage: ...@@ -216,7 +323,7 @@ class ClientStorage:
return 0 return 0
def notifyConnected(self, conn): def notifyConnected(self, conn):
"""Start using the given connection. """Internal: start using the given connection.
This is called by ConnectionManager after it has decided which This is called by ConnectionManager after it has decided which
connection should be used. connection should be used.
...@@ -237,6 +344,7 @@ class ClientStorage: ...@@ -237,6 +344,7 @@ class ClientStorage:
self._server = stub self._server = stub
def verify_cache(self, server): def verify_cache(self, server):
"""Internal routine called to verify the cache."""
# XXX beginZeoVerify ends up calling back to beginVerify() below. # XXX beginZeoVerify ends up calling back to beginVerify() below.
# That whole exchange is rather unnecessary. # That whole exchange is rather unnecessary.
server.beginZeoVerify() server.beginZeoVerify()
...@@ -252,41 +360,69 @@ class ClientStorage: ...@@ -252,41 +360,69 @@ class ClientStorage:
### nothing to gain by getting the instance lock. ### nothing to gain by getting the instance lock.
def notifyDisconnected(self): def notifyDisconnected(self):
"""Internal: notify that the server connection was terminated.
This is called by ConnectionManager when the connection is
closed or when certain problems with the connection occur.
"""
log2(PROBLEM, "Disconnected from storage") log2(PROBLEM, "Disconnected from storage")
self._connection = None self._connection = None
self._server = disconnected_stub self._server = disconnected_stub
def __len__(self): def __len__(self):
"""Return the size of the storage."""
# XXX Where is this used?
return self._info['length'] return self._info['length']
def getName(self): def getName(self):
return "%s (%s)" % (self.__name__, "XXX") """Storage API: return the storage name as a string.
The return value consists of two parts: the name as determined
by the name and addr argments to the ClientStorage
constructor, and the string 'connected' or 'disconnected' in
parentheses indicating whether the storage is (currently)
connected.
"""
return "%s (%s)" % (
self.__name__,
self.is_connected() and "connected" or "disconnected")
def getSize(self): def getSize(self):
"""Storage API: an approximate size of the database, in bytes."""
return self._info['size'] return self._info['size']
def supportsUndo(self): def supportsUndo(self):
"""Storage API: return whether we support undo."""
return self._info['supportsUndo'] return self._info['supportsUndo']
def supportsVersions(self): def supportsVersions(self):
"""Storage API: return whether we support versions."""
return self._info['supportsVersions'] return self._info['supportsVersions']
def supportsTransactionalUndo(self): def supportsTransactionalUndo(self):
"""Storage API: return whether we support transactional undo."""
return self._info['supportsTransactionalUndo'] return self._info['supportsTransactionalUndo']
def isReadOnly(self): def isReadOnly(self):
"""Storage API: return whether we are in read-only mode.
XXX In read-only fallback mode, this returns false, even if we
are currently connected to a read-only server.
"""
return self._is_read_only return self._is_read_only
def _check_trans(self, trans): def _check_trans(self, trans):
"""Internal helper to check a transaction argument for sanity."""
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
if self._transaction is not trans: if self._transaction is not trans:
raise POSException.StorageTransactionError(self._transaction, raise POSException.StorageTransactionError(self._transaction,
trans) trans)
def abortVersion(self, src, transaction): def abortVersion(self, version, transaction):
"""Storage API: clear any changes made by the given version."""
self._check_trans(transaction) self._check_trans(transaction)
oids = self._server.abortVersion(src, self._serial) oids = self._server.abortVersion(version, self._serial)
# When a version aborts, invalidate the version and # When a version aborts, invalidate the version and
# non-version data. The non-version data should still be # non-version data. The non-version data should still be
# valid, but older versions of ZODB will change the # valid, but older versions of ZODB will change the
...@@ -300,26 +436,39 @@ class ClientStorage: ...@@ -300,26 +436,39 @@ class ClientStorage:
self._tbuf.invalidate(oid, '') self._tbuf.invalidate(oid, '')
return oids return oids
def commitVersion(self, src, dest, transaction): def commitVersion(self, source, destination, transaction):
"""Storage API: commit the source version in the destination."""
self._check_trans(transaction) self._check_trans(transaction)
oids = self._server.commitVersion(src, dest, self._serial) oids = self._server.commitVersion(source, destination, self._serial)
if dest: if destination:
# just invalidate our version data # just invalidate our version data
for oid in oids: for oid in oids:
self._tbuf.invalidate(oid, src) self._tbuf.invalidate(oid, source)
else: else:
# dest is '', so invalidate version and non-version # destination is '', so invalidate version and non-version
for oid in oids: for oid in oids:
self._tbuf.invalidate(oid, dest) self._tbuf.invalidate(oid, destination)
return oids return oids
def history(self, oid, version, length=1): def history(self, oid, version, length=1):
"""Storage API: return a sequence of HistoryEntry objects.
This does not support the optional filter argument defined by
the Storage API.
"""
return self._server.history(oid, version, length) return self._server.history(oid, version, length)
def loadSerial(self, oid, serial): def loadSerial(self, oid, serial):
"""Storage API: load a historical revision of an object."""
return self._server.loadSerial(oid, serial) return self._server.loadSerial(oid, serial)
def load(self, oid, version, _stuff=None): def load(self, oid, version):
"""Storage API: return the data for a given object.
This returns the pickle data and serial number for the object
specified by the given object id and version, if they exist;
otherwise a KeyError is raised.
"""
p = self._cache.load(oid, version) p = self._cache.load(oid, version)
if p: if p:
return p return p
...@@ -336,12 +485,17 @@ class ClientStorage: ...@@ -336,12 +485,17 @@ class ClientStorage:
raise KeyError, oid # no non-version data for this raise KeyError, oid # no non-version data for this
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
"""Storage API: return the version, if any, that modfied an object.
If no version modified the object, return an empty string.
"""
v = self._cache.modifiedInVersion(oid) v = self._cache.modifiedInVersion(oid)
if v is not None: if v is not None:
return v return v
return self._server.modifiedInVersion(oid) return self._server.modifiedInVersion(oid)
def new_oid(self, last=None): def new_oid(self):
"""Storage API: return a new object identifier."""
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
# avoid multiple oid requests to server at the same time # avoid multiple oid requests to server at the same time
...@@ -354,7 +508,19 @@ class ClientStorage: ...@@ -354,7 +508,19 @@ class ClientStorage:
finally: finally:
self._oid_lock.release() self._oid_lock.release()
def pack(self, t=None, rf=None, wait=1, days=0): def pack(self, t=None, referencesf=None, wait=1, days=0):
"""Storage API: pack the storage.
Deviations from the Storage API: the referencesf argument is
ignored; two additional optional arguments wait and days are
provided:
wait -- a flag indicating whether to wait for the pack to
complete; defaults to true.
days -- a number of days to subtract from the pack time;
defaults to zero.
"""
# XXX Is it okay that read-only connections allow pack()? # XXX Is it okay that read-only connections allow pack()?
# rf argument ignored; server will provide it's own implementation # rf argument ignored; server will provide it's own implementation
if t is None: if t is None:
...@@ -363,6 +529,7 @@ class ClientStorage: ...@@ -363,6 +529,7 @@ class ClientStorage:
return self._server.pack(t, wait) return self._server.pack(t, wait)
def _check_serials(self): def _check_serials(self):
"""Internal helper to move data from _serials to _seriald."""
if self._serials: if self._serials:
l = len(self._serials) l = len(self._serials)
r = self._serials[:l] r = self._serials[:l]
...@@ -374,18 +541,21 @@ class ClientStorage: ...@@ -374,18 +541,21 @@ class ClientStorage:
return r return r
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
"""Storage API: store data for an object."""
self._check_trans(transaction) self._check_trans(transaction)
self._server.storea(oid, serial, data, version, self._serial) self._server.storea(oid, serial, data, version, self._serial)
self._tbuf.store(oid, version, data) self._tbuf.store(oid, version, data)
return self._check_serials() return self._check_serials()
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
"""Storage API: vote on a transaction."""
if transaction is not self._transaction: if transaction is not self._transaction:
return return
self._server.vote(self._serial) self._server.vote(self._serial)
return self._check_serials() return self._check_serials()
def tpc_begin(self, transaction, tid=None, status=' '): def tpc_begin(self, transaction, tid=None, status=' '):
"""Storage API: begin a transaction."""
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
self._tpc_cond.acquire() self._tpc_cond.acquire()
...@@ -424,6 +594,7 @@ class ClientStorage: ...@@ -424,6 +594,7 @@ class ClientStorage:
del self._serials[:] del self._serials[:]
def end_transaction(self): def end_transaction(self):
"""Internal helper to end a transaction."""
# the right way to set self._transaction to None # the right way to set self._transaction to None
# calls notify() on _tpc_cond in case there are waiting threads # calls notify() on _tpc_cond in case there are waiting threads
self._tpc_cond.acquire() self._tpc_cond.acquire()
...@@ -432,6 +603,7 @@ class ClientStorage: ...@@ -432,6 +603,7 @@ class ClientStorage:
self._tpc_cond.release() self._tpc_cond.release()
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
"""Storage API: abort a transaction."""
if transaction is not self._transaction: if transaction is not self._transaction:
return return
try: try:
...@@ -443,6 +615,7 @@ class ClientStorage: ...@@ -443,6 +615,7 @@ class ClientStorage:
self.end_transaction() self.end_transaction()
def tpc_finish(self, transaction, f=None): def tpc_finish(self, transaction, f=None):
"""Storage API: finish a transaction."""
if transaction is not self._transaction: if transaction is not self._transaction:
return return
try: try:
...@@ -459,8 +632,11 @@ class ClientStorage: ...@@ -459,8 +632,11 @@ class ClientStorage:
self.end_transaction() self.end_transaction()
def _update_cache(self): def _update_cache(self):
# Iterate over the objects in the transaction buffer and """Internal helper to handle objects modified by a transaction.
# update or invalidate the cache.
This iterates over the objects in the transaction buffer and
update or invalidate the cache.
"""
self._cache.checkSize(self._tbuf.get_size()) self._cache.checkSize(self._tbuf.get_size())
try: try:
self._tbuf.begin_iterate() self._tbuf.begin_iterate()
...@@ -489,6 +665,14 @@ class ClientStorage: ...@@ -489,6 +665,14 @@ class ClientStorage:
self._tbuf.clear() self._tbuf.clear()
def transactionalUndo(self, trans_id, trans): def transactionalUndo(self, trans_id, trans):
"""Storage API: undo a transaction.
This is executed in a transactional context. It has no effect
until the transaction is committed. It can be undone itself.
Zope uses this to implement undo unless it is not supported by
a storage.
"""
self._check_trans(trans) self._check_trans(trans)
oids = self._server.transactionalUndo(trans_id, self._serial) oids = self._server.transactionalUndo(trans_id, self._serial)
for oid in oids: for oid in oids:
...@@ -496,6 +680,7 @@ class ClientStorage: ...@@ -496,6 +680,7 @@ class ClientStorage:
return oids return oids
def undo(self, transaction_id): def undo(self, transaction_id):
"""Storage API: undo a transaction, writing directly to the storage."""
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
# XXX what are the sync issues here? # XXX what are the sync issues here?
...@@ -505,34 +690,45 @@ class ClientStorage: ...@@ -505,34 +690,45 @@ class ClientStorage:
return oids return oids
def undoInfo(self, first=0, last=-20, specification=None): def undoInfo(self, first=0, last=-20, specification=None):
"""Storage API: return undo information."""
return self._server.undoInfo(first, last, specification) return self._server.undoInfo(first, last, specification)
def undoLog(self, first=0, last=-20, filter=None): def undoLog(self, first=0, last=-20, filter=None):
"""Storage API: return a sequence of TransactionDescription objects."""
if filter is not None: if filter is not None:
return () # can't pass a filter to server return () # can't pass a filter to server
return self._server.undoLog(first, last) # Eek! return self._server.undoLog(first, last) # Eek!
def versionEmpty(self, version): def versionEmpty(self, version):
"""Storage API: return whether the version has no transactions."""
return self._server.versionEmpty(version) return self._server.versionEmpty(version)
def versions(self, max=None): def versions(self, max=None):
"""Storage API: return a sequence of versions in the storage."""
return self._server.versions(max) return self._server.versions(max)
# below are methods invoked by the StorageServer # Below are methods invoked by the StorageServer
def serialnos(self, args): def serialnos(self, args):
"""Server callback to pass a list of changed (oid, serial) pairs."""
self._serials.extend(args) self._serials.extend(args)
def info(self, dict): def info(self, dict):
"""Server callback to update the info dictionary."""
self._info.update(dict) self._info.update(dict)
def beginVerify(self): def beginVerify(self):
"""Server callback to signal start of cache validation."""
self._tfile = tempfile.TemporaryFile(suffix=".inv") self._tfile = tempfile.TemporaryFile(suffix=".inv")
self._pickler = cPickle.Pickler(self._tfile, 1) self._pickler = cPickle.Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo self._pickler.fast = 1 # Don't use the memo
def invalidateVerify(self, args): def invalidateVerify(self, args):
"""Server callback to invalidate an (oid, version) pair.
This is called as part of cache validation.
"""
# Invalidation as result of verify_cache(). # Invalidation as result of verify_cache().
# Queue an invalidate for the end the verification procedure. # Queue an invalidate for the end the verification procedure.
if self._pickler is None: if self._pickler is None:
...@@ -541,6 +737,7 @@ class ClientStorage: ...@@ -541,6 +737,7 @@ class ClientStorage:
self._pickler.dump(args) self._pickler.dump(args)
def endVerify(self): def endVerify(self):
"""Server callback to signal end of cache validation."""
if self._pickler is None: if self._pickler is None:
return return
self._pickler.dump((0,0)) self._pickler.dump((0,0))
...@@ -558,7 +755,10 @@ class ClientStorage: ...@@ -558,7 +755,10 @@ class ClientStorage:
f.close() f.close()
def invalidateTrans(self, args): def invalidateTrans(self, args):
# Invalidation as a result of a transaction. """Server callback to invalidate a list of (oid, version) pairs.
This is called as the result of a transaction.
"""
for oid, version in args: for oid, version in args:
self._cache.invalidate(oid, version=version) self._cache.invalidate(oid, version=version)
try: try:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment