From 728f5faad42062ae86df4f60cbe0c4f81457e4f7 Mon Sep 17 00:00:00 2001 From: Tim Peters <tim.one@comcast.net> Date: Mon, 21 Mar 2005 17:20:13 +0000 Subject: [PATCH] Merge pycon-multidb branch (-r 29573:29605). This introduces a "multi-database" concept (a simplification of Jim's Wiki proposal), and adds many interface definitions. Work done during the PyCon 2005 ZODB sprint, by Christian Theune, Jim Fulton and Tim Peters. --- src/ZODB/BaseStorage.py | 6 + src/ZODB/Connection.py | 980 +++++++++++---------------- src/ZODB/DB.py | 18 +- src/ZODB/FileStorage/FileStorage.py | 1 + src/ZODB/FileStorage/format.py | 6 +- src/ZODB/TmpStore.py | 3 + src/ZODB/interfaces.py | 442 +++++++++++- src/ZODB/tests/multidb.txt | 146 ++++ src/ZODB/tests/testConnection.py | 2 + src/ZODB/tests/test_doctest_files.py | 4 +- src/persistent/interfaces.py | 23 +- src/transaction/interfaces.py | 262 +------ 12 files changed, 1072 insertions(+), 821 deletions(-) create mode 100644 src/ZODB/tests/multidb.txt diff --git a/src/ZODB/BaseStorage.py b/src/ZODB/BaseStorage.py index bf55bfbc..e090e219 100644 --- a/src/ZODB/BaseStorage.py +++ b/src/ZODB/BaseStorage.py @@ -252,6 +252,12 @@ class BaseStorage(UndoLogCompatible): pass def tpc_finish(self, transaction, f=None): + # It's important that the storage calls the function we pass + # while it still has its lock. We don't want another thread + # to be able to read any updated data until we've had a chance + # to send an invalidation message to all of the other + # connections! + self._lock_acquire() try: if transaction is not self._transaction: diff --git a/src/ZODB/Connection.py b/src/ZODB/Connection.py index f74da69e..b7c00537 100644 --- a/src/ZODB/Connection.py +++ b/src/ZODB/Connection.py @@ -23,6 +23,12 @@ from time import time from persistent import PickleCache +# interfaces +from persistent.interfaces import IPersistentDataManager +from ZODB.interfaces import IConnection +from transaction.interfaces import IDataManager +from zope.interface import implements + import transaction from ZODB.ConflictResolution import ResolvedSerial @@ -31,12 +37,9 @@ from ZODB.POSException \ import ConflictError, ReadConflictError, InvalidObjectReference, \ ConnectionStateError from ZODB.TmpStore import TmpStore -from ZODB.utils import u64, oid_repr, z64, positive_id from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr -from ZODB.interfaces import IConnection -from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36 - -from zope.interface import implements +from ZODB.utils import u64, oid_repr, z64, positive_id, \ + DEPRECATED_ARGUMENT, deprecated36 global_reset_counter = 0 @@ -54,127 +57,19 @@ def resetCaches(): global_reset_counter += 1 class Connection(ExportImport, object): - """Connection to ZODB for loading and storing objects. - - The Connection object serves as a data manager. The root() method - on a Connection returns the root object for the database. This - object and all objects reachable from it are associated with the - Connection that loaded them. When a transaction commits, it uses - the Connection to store modified objects. - - Typical use of ZODB is for each thread to have its own - Connection and that no thread should have more than one Connection - to the same database. A thread is associated with a Connection by - loading objects from that Connection. Objects loaded by one - thread should not be used by another thread. - - A Connection can be associated with a single version when it is - created. 
By default, a Connection is not associated with a - version; it uses non-version data. - - Each Connection provides an isolated, consistent view of the - database, by managing independent copies of objects in the - database. At transaction boundaries, these copies are updated to - reflect the current state of the database. - - You should not instantiate this class directly; instead call the - open() method of a DB instance. - - In many applications, root() is the only method of the Connection - that you will need to use. - - Synchronization - --------------- - - A Connection instance is not thread-safe. It is designed to - support a thread model where each thread has its own transaction. - If an application has more than one thread that uses the - connection or the transaction the connection is registered with, - the application should provide locking. - - The Connection manages movement of objects in and out of object - storage. - - TODO: We should document an intended API for using a Connection via - multiple threads. - - TODO: We should explain that the Connection has a cache and that - multiple calls to get() will return a reference to the same - object, provided that one of the earlier objects is still - referenced. Object identity is preserved within a connection, but - not across connections. - - TODO: Mention the database pool. - - A database connection always presents a consistent view of the - objects in the database, although it may not always present the - most current revision of any particular object. Modifications - made by concurrent transactions are not visible until the next - transaction boundary (abort or commit). - - Two options affect consistency. By default, the mvcc and synch - options are enabled by default. - - If you pass mvcc=True to db.open(), the Connection will never read - non-current revisions of an object. Instead it will raise a - ReadConflictError to indicate that the current revision is - unavailable because it was written after the current transaction - began. - - The logic for handling modifications assumes that the thread that - opened a Connection (called db.open()) is the thread that will use - the Connection. If this is not true, you should pass synch=False - to db.open(). When the synch option is disabled, some transaction - boundaries will be missed by the Connection; in particular, if a - transaction does not involve any modifications to objects loaded - from the Connection and synch is disabled, the Connection will - miss the transaction boundary. Two examples of this behavior are - db.undo() and read-only transactions. 
- - - :Groups: - - - `User Methods`: root, get, add, close, db, sync, isReadOnly, - cacheGC, cacheFullSweep, cacheMinimize, getVersion, - modifiedInVersion - - `Experimental Methods`: setLocalTransaction, getTransaction, - onCloseCallbacks - - `Transaction Data Manager Methods`: tpc_begin, tpc_vote, - tpc_finish, tpc_abort, sortKey, abort, commit, commit_sub, - abort_sub - - `Database Invalidation Methods`: invalidate, _setDB - - `IPersistentDataManager Methods`: setstate, register, - setklassstate - - `Other Methods`: oldstate, exchange, getDebugInfo, setDebugInfo, - getTransferCounts + """Connection to ZODB for loading and storing objects.""" - """ - implements(IConnection) + implements(IConnection, IDataManager, IPersistentDataManager) _tmp = None _code_timestamp = 0 + # ZODB.IConnection + def __init__(self, version='', cache_size=400, cache_deactivate_after=None, mvcc=True, txn_mgr=None, synch=True): - """Create a new Connection. - - A Connection instance should by instantiated by the DB - instance that it is connected to. - - :Parameters: - - `version`: the "version" that all changes will be made - in, defaults to no version. - - `cache_size`: the target size of the in-memory object - cache, measured in objects. - - `cache_deactivate_after`: deprecated, ignored - - `mvcc`: boolean indicating whether MVCC is enabled - - `txn_mgr`: transaction manager to use. None means - used the default transaction manager. - - `synch`: boolean indicating whether Connection should - register for afterCompletion() calls. - """ - + """Create a new Connection.""" self._log = logging.getLogger("ZODB.Connection") self._storage = None self._debug_info = () @@ -220,7 +115,7 @@ class Connection(ExportImport, object): # from a single transaction should be applied atomically, so # the lock must be held when reading _invalidated. - # It sucks that we have to hold the lock to read _invalidated. + # It sucks that we have to hold the lock to read _invalidated. # Normally, _invalidated is written by calling dict.update, which # will execute atomically by virtue of the GIL. But some storage # might generate oids where hash or compare invokes Python code. In @@ -253,79 +148,20 @@ class Connection(ExportImport, object): # to pass to _importDuringCommit(). self._import = None - def getTransaction(self): - """Get the current transaction for this connection. - - :deprecated: - - The transaction manager's get method works the same as this - method. You can pass a transaction manager (TM) to DB.open() - to control which TM the Connection uses. - """ - deprecated36("getTransaction() is deprecated. " - "Use the txn_mgr argument to DB.open() instead.") - return self._txn_mgr.get() - - def setLocalTransaction(self): - """Use a transaction bound to the connection rather than the thread. - - :deprecated: - - Returns the transaction manager used by the connection. You - can pass a transaction manager (TM) to DB.open() to control - which TM the Connection uses. - """ - deprecated36("setLocalTransaction() is deprecated. " - "Use the txn_mgr argument to DB.open() instead.") - if self._txn_mgr is transaction.manager: - if self._synch: - self._txn_mgr.unregisterSynch(self) - self._txn_mgr = transaction.TransactionManager() - if self._synch: - self._txn_mgr.registerSynch(self) - return self._txn_mgr - - def _cache_items(self): - # find all items on the lru list - items = self._cache.lru_items() - # fine everything. 
some on the lru list, some not - everything = self._cache.cache_data - # remove those items that are on the lru list - for k,v in items: - del everything[k] - # return a list of [ghosts....not recently used.....recently used] - return everything.items() + items + self.connections = None - def __repr__(self): - if self._version: - ver = ' (in version %s)' % `self._version` - else: - ver = '' - return '<Connection at %08x%s>' % (positive_id(self), ver) + def get_connection(self, database_name): + """Return a Connection for the named database.""" + connection = self.connections.get(database_name) + if connection is None: + new_con = self._db.databases[database_name].open() + self.connections.update(new_con.connections) + new_con.connections = self.connections + connection = new_con + return connection def get(self, oid): - """Return the persistent object with oid 'oid'. - - If the object was not in the cache and the object's class is - ghostable, then a ghost will be returned. If the object is - already in the cache, a reference to the cached object will be - returned. - - Applications seldom need to call this method, because objects - are loaded transparently during attribute lookup. - - :return: persistent object corresponding to `oid` - - :Parameters: - - `oid`: an object id - - :Exceptions: - - `KeyError`: if oid does not exist. It is possible that an - object does not exist as of the current transaction, but - existed in the past. It may even exist again in the - future, if the transaction that removed it is undone. - - `ConnectionStateError`: if the connection is closed. - """ + """Return the persistent object with oid 'oid'.""" if self._storage is None: raise ConnectionStateError("The database connection is closed") @@ -347,33 +183,8 @@ class Connection(ExportImport, object): self._cache[oid] = obj return obj - # deprecate this method? - __getitem__ = get - def add(self, obj): - """Add a new object 'obj' to the database and assign it an oid. - - A persistent object is normally added to the database and - assigned an oid when it becomes reachable to an object already in - the database. In some cases, it is useful to create a new - object and use its oid (_p_oid) in a single transaction. - - This method assigns a new oid regardless of whether the object - is reachable. - - The object is added when the transaction commits. The object - must implement the IPersistent interface and must not - already be associated with a Connection. - - :Parameters: - - `obj`: a Persistent object - - :Exceptions: - - `TypeError`: if obj is not a persistent object. - - `InvalidObjectReference`: if obj is already associated - with another connection. - - `ConnectionStateError`: if the connection is closed. - """ + """Add a new object 'obj' to the database and assign it an oid.""" if self._storage is None: raise ConnectionStateError("The database connection is closed") @@ -397,72 +208,11 @@ class Connection(ExportImport, object): raise InvalidObjectReference(obj, obj._p_jar) def sortKey(self): - # If two connections use the same storage, give them a - # consistent order using id(). This is unique for the - # lifetime of a connection, which is good enough. - return "%s:%s" % (self._sortKey(), id(self)) - - def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None): - """Register odb, the DB that this Connection uses. - - This method is called by the DB every time a Connection - is opened. Any invalidations received while the Connection - was closed will be processed. 
- - If the global module function resetCaches() was called, the - cache will be cleared. - - :Parameters: - - `odb`: database that owns the Connection - - `mvcc`: boolean indicating whether MVCC is enabled - - `txn_mgr`: transaction manager to use. None means - used the default transaction manager. - - `synch`: boolean indicating whether Connection should - register for afterCompletion() calls. - """ - - # TODO: Why do we go to all the trouble of setting _db and - # other attributes on open and clearing them on close? - # A Connection is only ever associated with a single DB - # and Storage. - - self._db = odb - self._storage = odb._storage - self._sortKey = odb._storage.sortKey - self.new_oid = odb._storage.new_oid - self._opened = time() - if synch is not None: - self._synch = synch - if mvcc is not None: - self._mvcc = mvcc - self._txn_mgr = txn_mgr or transaction.manager - if self._reset_counter != global_reset_counter: - # New code is in place. Start a new cache. - self._resetCache() - else: - self._flush_invalidations() - if self._synch: - self._txn_mgr.registerSynch(self) - self._reader = ConnectionObjectReader(self, self._cache, - self._db.classFactory) - - def _resetCache(self): - """Creates a new cache, discarding the old one. - - See the docstring for the resetCaches() function. - """ - self._reset_counter = global_reset_counter - self._invalidated.clear() - cache_size = self._cache.cache_size - self._cache = cache = PickleCache(self, cache_size) + """Return a consistent sort key for this connection.""" + return "%s:%s" % (self._storage.sortKey(), id(self)) def abort(self, transaction): - """Abort modifications to registered objects. - - This tells the cache to invalidate changed objects. _p_jar - and _p_oid are deleted from new objects. - """ - + """Abort a transaction and forget all changes.""" for obj in self._registered_objects: oid = obj._p_oid assert oid is not None @@ -475,70 +225,22 @@ class Connection(ExportImport, object): self._tpc_cleanup() - # Should there be a way to call incrgc directly? - # Perhaps "full sweep" should do that? - - # TODO: we should test what happens when these methods are called - # mid-transaction. - - def cacheFullSweep(self, dt=None): - deprecated36("cacheFullSweep is deprecated. " - "Use cacheMinimize instead.") - if dt is None: - self._cache.full_sweep() - else: - self._cache.full_sweep(dt) - - def cacheMinimize(self, dt=DEPRECATED_ARGUMENT): - """Deactivate all unmodified objects in the cache. - - Call _p_deactivate() on each cached object, attempting to turn - it into a ghost. It is possible for individual objects to - remain active. - - :Parameters: - - `dt`: ignored. It is provided only for backwards compatibility. - """ - if dt is not DEPRECATED_ARGUMENT: - deprecated36("cacheMinimize() dt= is ignored.") - self._cache.minimize() + # TODO: we should test what happens when cacheGC is called mid-transaction. def cacheGC(self): - """Reduce cache size to target size. - - Call _p_deactivate() on cached objects until the cache size - falls under the target size. - """ + """Reduce cache size to target size.""" self._cache.incrgc() __onCloseCallbacks = None def onCloseCallback(self, f): - """Register a callable, f, to be called by close(). - - The callable, f, will be called at most once, the next time - the Connection is closed. 
- - :Parameters: - - `f`: object that will be called on `close` - """ + """Register a callable, f, to be called by close().""" if self.__onCloseCallbacks is None: self.__onCloseCallbacks = [] self.__onCloseCallbacks.append(f) def close(self): - """Close the Connection. - - A closed Connection should not be used by client code. It - can't load or store objects. Objects in the cache are not - freed, because Connections are re-used and the cache are - expected to be useful to the next client. - - When the Connection is closed, all callbacks registered by - onCloseCallback() are invoked and the cache is scanned for - old objects. - """ - + """Close the Connection.""" if not self._needs_to_join: # We're currently joined to a transaction. raise ConnectionStateError("Cannot close a connection joined to " @@ -575,7 +277,10 @@ class Connection(ExportImport, object): # assert that here, because self may have been reused (by # another thread) by the time we get back here. + # transaction.interfaces.IDataManager + def commit(self, transaction): + """Commit changes to an object""" if self._import: # TODO: This code seems important for Zope, but needs docs # to explain why. @@ -653,7 +358,8 @@ class Connection(ExportImport, object): self._handle_serial(s, oid) def commit_sub(self, t): - """Commit all work done in all subtransactions for this transaction.""" + """Commit all changes made in subtransactions and begin 2-phase commit + """ if self._tmp is None: return src = self._storage @@ -674,7 +380,7 @@ class Connection(ExportImport, object): self._handle_serial(s, oid, change=False) def abort_sub(self, t): - """Abort work done in all subtransactions for this transaction.""" + """Discard all subtransaction data.""" if self._tmp is None: return src = self._storage @@ -685,7 +391,7 @@ class Connection(ExportImport, object): self._invalidate_creating(src._creating) def _invalidate_creating(self, creating=None): - """Dissown any objects newly saved in an uncommitted transaction.""" + """Disown any objects newly saved in an uncommitted transaction.""" if creating is None: creating = self._creating self._creating = [] @@ -697,42 +403,6 @@ class Connection(ExportImport, object): del o._p_jar del o._p_oid - def db(self): - return self._db - - def getVersion(self): - if self._storage is None: - raise ConnectionStateError("The database connection is closed") - return self._version - - def isReadOnly(self): - if self._storage is None: - raise ConnectionStateError("The database connection is closed") - return self._storage.isReadOnly() - - def invalidate(self, tid, oids): - """Notify the Connection that transaction 'tid' invalidated oids. - - When the next transaction boundary is reached, objects will be - invalidated. If any of the invalidated objects is accessed by - the current transaction, the revision written before C{tid} - will be used. - - The DB calls this method, even when the Connection is closed. - - :Parameters: - - `tid`: the storage-level id of the transaction that committed - - `oids`: oids is a set of oids, represented as a dict with oids - as keys. - """ - self._inv_lock.acquire() - try: - if self._txn_time is None: - self._txn_time = tid - self._invalidated.update(oids) - finally: - self._inv_lock.release() - # The next two methods are callbacks for transaction synchronization. 
def beforeCompletion(self, txn): @@ -753,213 +423,35 @@ class Connection(ExportImport, object): # Now is a good time to collect some garbage self._cache.incrgc() - def modifiedInVersion(self, oid): - try: - return self._db.modifiedInVersion(oid) - except KeyError: - return self._version - - def register(self, obj): - """Register obj with the current transaction manager. - - A subclass could override this method to customize the default - policy of one transaction manager for each thread. - - obj must be an object loaded from this Connection. - """ - assert obj._p_jar is self - if obj._p_oid is None: - # There is some old Zope code that assigns _p_jar - # directly. That is no longer allowed, but we need to - # provide support for old code that still does it. - - # The actual complaint here is that an object without - # an oid is being registered. I can't think of any way to - # achieve that without assignment to _p_jar. If there is - # a way, this will be a very confusing warning. - deprecated36("Assigning to _p_jar is deprecated, and will be " - "changed to raise an exception.") - elif obj._p_oid in self._added: - # It was registered before it was added to _added. - return - self._register(obj) - - def _register(self, obj=None): - if obj is not None: - self._registered_objects.append(obj) - if self._needs_to_join: - self._txn_mgr.get().join(self) - self._needs_to_join = False - def root(self): - """Return the database root object. - - The root is a persistent.mapping.PersistentMapping. - """ + """Return the database root object.""" return self.get(z64) - def setstate(self, obj): - oid = obj._p_oid + def db(self): + """Returns a handle to the database this connection belongs to.""" + return self._db + def isReadOnly(self): + """Returns True if the storage for this connection is read only.""" if self._storage is None: - msg = ("Shouldn't load state for %s " - "when the connection is closed" % oid_repr(oid)) - self._log.error(msg) - raise ConnectionStateError(msg) + raise ConnectionStateError("The database connection is closed") + return self._storage.isReadOnly() + def invalidate(self, tid, oids): + """Notify the Connection that transaction 'tid' invalidated oids.""" + self._inv_lock.acquire() try: - self._setstate(obj) - except ConflictError: - raise - except: - self._log.error("Couldn't load state for %s", oid_repr(oid), - exc_info=sys.exc_info()) - raise - - def _setstate(self, obj): - # Helper for setstate(), which provides logging of failures. - - # The control flow is complicated here to avoid loading an - # object revision that we are sure we aren't going to use. As - # a result, invalidation tests occur before and after the - # load. We can only be sure about invalidations after the - # load. + if self._txn_time is None: + self._txn_time = tid + self._invalidated.update(oids) + finally: + self._inv_lock.release() - # If an object has been invalidated, there are several cases - # to consider: - # 1. Check _p_independent() - # 2. Try MVCC - # 3. Raise ConflictError. + # IDataManager - # Does anything actually use _p_independent()? It would simplify - # the code if we could drop support for it. - - # There is a harmless data race with self._invalidated. A - # dict update could go on in another thread, but we don't care - # because we have to check again after the load anyway. - - if (obj._p_oid in self._invalidated - and not myhasattr(obj, "_p_independent")): - # If the object has _p_independent(), we will handle it below. 
- self._load_before_or_conflict(obj) - return - - p, serial = self._storage.load(obj._p_oid, self._version) - self._load_count += 1 - - self._inv_lock.acquire() - try: - invalid = obj._p_oid in self._invalidated - finally: - self._inv_lock.release() - - if invalid: - if myhasattr(obj, "_p_independent"): - # This call will raise a ReadConflictError if something - # goes wrong - self._handle_independent(obj) - else: - self._load_before_or_conflict(obj) - return - - self._reader.setGhostState(obj, p) - obj._p_serial = serial - - def _load_before_or_conflict(self, obj): - """Load non-current state for obj or raise ReadConflictError.""" - - if not (self._mvcc and self._setstate_noncurrent(obj)): - self._register(obj) - self._conflicts[obj._p_oid] = True - raise ReadConflictError(object=obj) - - def _setstate_noncurrent(self, obj): - """Set state using non-current data. - - Return True if state was available, False if not. - """ - try: - # Load data that was current before the commit at txn_time. - t = self._storage.loadBefore(obj._p_oid, self._txn_time) - except KeyError: - return False - if t is None: - return False - data, start, end = t - # The non-current transaction must have been written before - # txn_time. It must be current at txn_time, but could have - # been modified at txn_time. - - assert start < self._txn_time, (u64(start), u64(self._txn_time)) - assert end is not None - assert self._txn_time <= end, (u64(self._txn_time), u64(end)) - self._reader.setGhostState(obj, data) - obj._p_serial = start - return True - - def _handle_independent(self, obj): - # Helper method for setstate() handles possibly independent objects - # Call _p_independent(), if it returns True, setstate() wins. - # Otherwise, raise a ConflictError. - - if obj._p_independent(): - self._inv_lock.acquire() - try: - try: - del self._invalidated[obj._p_oid] - except KeyError: - pass - finally: - self._inv_lock.release() - else: - self._conflicts[obj._p_oid] = 1 - self._register(obj) - raise ReadConflictError(object=obj) - - def oldstate(self, obj, tid): - """Return copy of obj that was written by tid. - - The returned object does not have the typical metadata - (_p_jar, _p_oid, _p_serial) set. I'm not sure how references - to other peristent objects are handled. - - :return: a persistent object - - :Parameters: - - `obj`: a persistent object from this Connection. - - `tid`: id of a transaction that wrote an earlier revision. - - :Exceptions: - - `KeyError`: if tid does not exist or if tid deleted a revision - of obj. - """ - assert obj._p_jar is self - p = self._storage.loadSerial(obj._p_oid, tid) - return self._reader.getState(p) - - def setklassstate(self, obj): - # Special case code to handle ZClasses, I think. - # Called the cache when an object of type type is invalidated. - try: - oid = obj._p_oid - p, serial = self._storage.load(oid, self._version) - - # We call getGhost(), but we actually get a non-ghost back. - # The object is a class, which can't actually be ghosted. 
- copy = self._reader.getGhost(p) - obj.__dict__.clear() - obj.__dict__.update(copy.__dict__) - - obj._p_oid = oid - obj._p_jar = self - obj._p_changed = 0 - obj._p_serial = serial - except: - self._log.error("setklassstate failed", exc_info=sys.exc_info()) - raise - - def tpc_begin(self, transaction, sub=False): - self._modified = [] + def tpc_begin(self, transaction, sub=False): + """Begin commit of a transaction, starting the two-phase commit.""" + self._modified = [] # _creating is a list of oids of new objects, which is used to # remove them from the cache if a transaction aborts. @@ -972,6 +464,7 @@ class Connection(ExportImport, object): self._storage.tpc_begin(transaction) def tpc_vote(self, transaction): + """Verify that a data manager can commit the transaction.""" try: vote = self._storage.tpc_vote except AttributeError: @@ -1022,12 +515,7 @@ class Connection(ExportImport, object): obj._p_serial = serial def tpc_finish(self, transaction): - # It's important that the storage calls the function we pass - # while it still has its lock. We don't want another thread - # to be able to read any updated data until we've had a chance - # to send an invalidation message to all of the other - # connections! - + """Indicate confirmation that the transaction is done.""" if self._tmp is not None: # Commiting a subtransaction! # There is no need to invalidate anything. @@ -1044,6 +532,7 @@ class Connection(ExportImport, object): self._tpc_cleanup() def tpc_abort(self, transaction): + """Abort a transaction.""" if self._import: self._import = None self._storage.tpc_abort(transaction) @@ -1055,16 +544,16 @@ class Connection(ExportImport, object): del obj._p_jar self._tpc_cleanup() - # Common cleanup actions after tpc_finish/tpc_abort. def _tpc_cleanup(self): + """Performs cleanup operations to support tpc_finish and tpc_abort.""" self._conflicts.clear() if not self._synch: self._flush_invalidations() self._needs_to_join = True self._registered_objects = [] - def sync(self): + """Manually update the view on the database.""" self._txn_mgr.get().abort() sync = getattr(self._storage, 'sync', 0) if sync: @@ -1072,22 +561,304 @@ class Connection(ExportImport, object): self._flush_invalidations() def getDebugInfo(self): + """Returns a tuple with different items for debugging the + connection. + """ return self._debug_info def setDebugInfo(self, *args): + """Add the given items to the debug information of this connection.""" self._debug_info = self._debug_info + args def getTransferCounts(self, clear=False): - """Returns the number of objects loaded and stored. - - If clear is True, reset the counters. 
- """ + """Returns the number of objects loaded and stored.""" res = self._load_count, self._store_count if clear: self._load_count = 0 self._store_count = 0 return res + ############################################## + # persistent.interfaces.IPersistentDatamanager + + def oldstate(self, obj, tid): + """Return copy of 'obj' that was written by transaction 'tid'.""" + assert obj._p_jar is self + p = self._storage.loadSerial(obj._p_oid, tid) + return self._reader.getState(p) + + def setstate(self, obj): + """Turns the ghost 'obj' into a real object by loading it's from the + database.""" + oid = obj._p_oid + + if self._storage is None: + msg = ("Shouldn't load state for %s " + "when the connection is closed" % oid_repr(oid)) + self._log.error(msg) + raise ConnectionStateError(msg) + + try: + self._setstate(obj) + except ConflictError: + raise + except: + self._log.error("Couldn't load state for %s", oid_repr(oid), + exc_info=sys.exc_info()) + raise + + def _setstate(self, obj): + # Helper for setstate(), which provides logging of failures. + + # The control flow is complicated here to avoid loading an + # object revision that we are sure we aren't going to use. As + # a result, invalidation tests occur before and after the + # load. We can only be sure about invalidations after the + # load. + + # If an object has been invalidated, there are several cases + # to consider: + # 1. Check _p_independent() + # 2. Try MVCC + # 3. Raise ConflictError. + + # Does anything actually use _p_independent()? It would simplify + # the code if we could drop support for it. + + # There is a harmless data race with self._invalidated. A + # dict update could go on in another thread, but we don't care + # because we have to check again after the load anyway. + + if (obj._p_oid in self._invalidated + and not myhasattr(obj, "_p_independent")): + # If the object has _p_independent(), we will handle it below. + self._load_before_or_conflict(obj) + return + + p, serial = self._storage.load(obj._p_oid, self._version) + self._load_count += 1 + + self._inv_lock.acquire() + try: + invalid = obj._p_oid in self._invalidated + finally: + self._inv_lock.release() + + if invalid: + if myhasattr(obj, "_p_independent"): + # This call will raise a ReadConflictError if something + # goes wrong + self._handle_independent(obj) + else: + self._load_before_or_conflict(obj) + return + + self._reader.setGhostState(obj, p) + obj._p_serial = serial + + def _load_before_or_conflict(self, obj): + """Load non-current state for obj or raise ReadConflictError.""" + if not (self._mvcc and self._setstate_noncurrent(obj)): + self._register(obj) + self._conflicts[obj._p_oid] = True + raise ReadConflictError(object=obj) + + def _setstate_noncurrent(self, obj): + """Set state using non-current data. + + Return True if state was available, False if not. + """ + try: + # Load data that was current before the commit at txn_time. + t = self._storage.loadBefore(obj._p_oid, self._txn_time) + except KeyError: + return False + if t is None: + return False + data, start, end = t + # The non-current transaction must have been written before + # txn_time. It must be current at txn_time, but could have + # been modified at txn_time. 
+
+        assert start < self._txn_time, (u64(start), u64(self._txn_time))
+        assert end is not None
+        assert self._txn_time <= end, (u64(self._txn_time), u64(end))
+        self._reader.setGhostState(obj, data)
+        obj._p_serial = start
+        return True
+
+    def _handle_independent(self, obj):
+        # Helper method for setstate(); handles possibly independent objects.
+        # Call _p_independent(); if it returns True, setstate() wins.
+        # Otherwise, raise a ConflictError.
+
+        if obj._p_independent():
+            self._inv_lock.acquire()
+            try:
+                try:
+                    del self._invalidated[obj._p_oid]
+                except KeyError:
+                    pass
+            finally:
+                self._inv_lock.release()
+        else:
+            self._conflicts[obj._p_oid] = 1
+            self._register(obj)
+            raise ReadConflictError(object=obj)
+
+    def register(self, obj):
+        """Register obj with the current transaction manager.
+
+        A subclass could override this method to customize the default
+        policy of one transaction manager for each thread.
+
+        obj must be an object loaded from this Connection.
+        """
+        assert obj._p_jar is self
+        if obj._p_oid is None:
+            # There is some old Zope code that assigns _p_jar
+            # directly. That is no longer allowed, but we need to
+            # provide support for old code that still does it.
+
+            # The actual complaint here is that an object without
+            # an oid is being registered. I can't think of any way to
+            # achieve that without assignment to _p_jar. If there is
+            # a way, this will be a very confusing warning.
+            deprecated36("Assigning to _p_jar is deprecated, and will be "
+                         "changed to raise an exception.")
+        elif obj._p_oid in self._added:
+            # It was registered before it was added to _added.
+            return
+        self._register(obj)
+
+    def _register(self, obj=None):
+        if obj is not None:
+            self._registered_objects.append(obj)
+        if self._needs_to_join:
+            self._txn_mgr.get().join(self)
+            self._needs_to_join = False
+
+    # PROTECTED stuff (used by e.g. ZODB.DB.DB)
+
+    def _cache_items(self):
+        # find all items on the lru list
+        items = self._cache.lru_items()
+        # find everything: some on the lru list, some not
+        everything = self._cache.cache_data
+        # remove those items that are on the lru list
+        for k,v in items:
+            del everything[k]
+        # return a list of [ghosts....not recently used.....recently used]
+        return everything.items() + items
+
+    def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
+        """Register odb, the DB that this Connection uses.
+
+        This method is called by the DB every time a Connection
+        is opened. Any invalidations received while the Connection
+        was closed will be processed.
+
+        If the global module function resetCaches() was called, the
+        cache will be cleared.
+
+        Parameters:
+          odb: database that owns the Connection
+          mvcc: boolean indicating whether MVCC is enabled
+          txn_mgr: transaction manager to use. None means
+            use the default transaction manager.
+          synch: boolean indicating whether Connection should
+            register for afterCompletion() calls.
+        """
+
+        # TODO: Why do we go to all the trouble of setting _db and
+        # other attributes on open and clearing them on close?
+        # A Connection is only ever associated with a single DB
+        # and Storage.
+
+        self._db = odb
+        self._storage = odb._storage
+        self.new_oid = odb._storage.new_oid
+        self._opened = time()
+        if synch is not None:
+            self._synch = synch
+        if mvcc is not None:
+            self._mvcc = mvcc
+        self._txn_mgr = txn_mgr or transaction.manager
+        if self._reset_counter != global_reset_counter:
+            # New code is in place. Start a new cache.
+ self._resetCache() + else: + self._flush_invalidations() + if self._synch: + self._txn_mgr.registerSynch(self) + self._reader = ConnectionObjectReader(self, self._cache, + self._db.classFactory) + + # Multi-database support + self.connections = {self._db.database_name: self} + + def _resetCache(self): + """Creates a new cache, discarding the old one. + + See the docstring for the resetCaches() function. + """ + self._reset_counter = global_reset_counter + self._invalidated.clear() + cache_size = self._cache.cache_size + self._cache = cache = PickleCache(self, cache_size) + + # Python protocol + + def __repr__(self): + if self._version: + ver = ' (in version %s)' % `self._version` + else: + ver = '' + return '<Connection at %08x%s>' % (positive_id(self), ver) + + # DEPRECATION candidates + + __getitem__ = get + + def modifiedInVersion(self, oid): + """Returns the version the object with the given oid was modified in. + + If it wasn't modified in a version, the current version of this + connection is returned. + """ + try: + return self._db.modifiedInVersion(oid) + except KeyError: + import pdb; pdb.set_trace() + return self.getVersion() + + def getVersion(self): + """Returns the version this connection is attached to.""" + if self._storage is None: + raise ConnectionStateError("The database connection is closed") + return self._version + + def setklassstate(self, obj): + # Special case code to handle ZClasses, I think. + # Called the cache when an object of type type is invalidated. + try: + oid = obj._p_oid + p, serial = self._storage.load(oid, self._version) + + # We call getGhost(), but we actually get a non-ghost back. + # The object is a class, which can't actually be ghosted. + copy = self._reader.getGhost(p) + obj.__dict__.clear() + obj.__dict__.update(copy.__dict__) + + obj._p_oid = oid + obj._p_jar = self + obj._p_changed = 0 + obj._p_serial = serial + except: + self._log.error("setklassstate failed", exc_info=sys.exc_info()) + raise + def exchange(self, old, new): # called by a ZClasses method that isn't executed by the test suite oid = old._p_oid @@ -1096,3 +867,52 @@ class Connection(ExportImport, object): new._p_changed = 1 self._register(new) self._cache[oid] = new + + # DEPRECATED methods + + def getTransaction(self): + """Get the current transaction for this connection. + + :deprecated: + + The transaction manager's get method works the same as this + method. You can pass a transaction manager (TM) to DB.open() + to control which TM the Connection uses. + """ + deprecated36("getTransaction() is deprecated. " + "Use the txn_mgr argument to DB.open() instead.") + return self._txn_mgr.get() + + def setLocalTransaction(self): + """Use a transaction bound to the connection rather than the thread. + + :deprecated: + + Returns the transaction manager used by the connection. You + can pass a transaction manager (TM) to DB.open() to control + which TM the Connection uses. + """ + deprecated36("setLocalTransaction() is deprecated. " + "Use the txn_mgr argument to DB.open() instead.") + if self._txn_mgr is transaction.manager: + if self._synch: + self._txn_mgr.unregisterSynch(self) + self._txn_mgr = transaction.TransactionManager() + if self._synch: + self._txn_mgr.registerSynch(self) + return self._txn_mgr + + def cacheFullSweep(self, dt=None): + deprecated36("cacheFullSweep is deprecated. 
" + "Use cacheMinimize instead.") + if dt is None: + self._cache.full_sweep() + else: + self._cache.full_sweep(dt) + + def cacheMinimize(self, dt=DEPRECATED_ARGUMENT): + """Deactivate all unmodified objects in the cache.""" + if dt is not DEPRECATED_ARGUMENT: + deprecated36("cacheMinimize() dt= is ignored.") + self._cache.minimize() + diff --git a/src/ZODB/DB.py b/src/ZODB/DB.py index 7a2eb9f8..b5fb360b 100644 --- a/src/ZODB/DB.py +++ b/src/ZODB/DB.py @@ -27,6 +27,9 @@ from ZODB.serialize import referencesf from ZODB.utils import WeakSet from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36 +from zope.interface import implements +from ZODB.interfaces import IDatabase + import transaction logger = logging.getLogger('ZODB.DB') @@ -178,6 +181,7 @@ class DB(object): setCacheDeactivateAfter, getVersionCacheDeactivateAfter, setVersionCacheDeactivateAfter """ + implements(IDatabase) klass = Connection # Class to use for connections _activity_monitor = None @@ -188,6 +192,8 @@ class DB(object): cache_deactivate_after=DEPRECATED_ARGUMENT, version_pool_size=3, version_cache_size=100, + database_name='unnamed', + databases=None, version_cache_deactivate_after=DEPRECATED_ARGUMENT, ): """Create an object database. @@ -248,6 +254,16 @@ class DB(object): storage.tpc_vote(t) storage.tpc_finish(t) + # Multi-database setup. + if databases is None: + databases = {} + self.databases = databases + self.database_name = database_name + if database_name in databases: + raise ValueError("database_name %r already in databases" % + database_name) + databases[database_name] = self + # Pass through methods: for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog', 'versionEmpty', 'versions']: @@ -565,7 +581,7 @@ class DB(object): def get_info(c): # `result`, `time` and `version` are lexically inherited. o = c._opened - d = c._debug_info + d = c.getDebugInfo() if d: if len(d) == 1: d = d[0] diff --git a/src/ZODB/FileStorage/FileStorage.py b/src/ZODB/FileStorage/FileStorage.py index 4e592734..89a3585e 100644 --- a/src/ZODB/FileStorage/FileStorage.py +++ b/src/ZODB/FileStorage/FileStorage.py @@ -547,6 +547,7 @@ class FileStorage(BaseStorage.BaseStorage, self._lock_release() def load(self, oid, version): + """Return pickle data and serial number.""" self._lock_acquire() try: pos = self._lookup_pos(oid) diff --git a/src/ZODB/FileStorage/format.py b/src/ZODB/FileStorage/format.py index 9c124be9..5f65d983 100644 --- a/src/ZODB/FileStorage/format.py +++ b/src/ZODB/FileStorage/format.py @@ -68,16 +68,16 @@ # # - 8-byte data length # -# ? 8-byte position of non-version data +# ? 8-byte position of non-version data record # (if version length > 0) # # ? 8-byte position of previous record in this version # (if version length > 0) # -# ? version string +# ? version string # (if version length > 0) # -# ? data +# ? data # (data length > 0) # # ? 
8-byte position of data record containing data diff --git a/src/ZODB/TmpStore.py b/src/ZODB/TmpStore.py index d2ea2612..3e0f1633 100644 --- a/src/ZODB/TmpStore.py +++ b/src/ZODB/TmpStore.py @@ -61,6 +61,9 @@ class TmpStore: serial = h[:8] return self._file.read(size), serial + def sortKey(self): + return self._storage.sortKey() + # TODO: clarify difference between self._storage & self._db._storage def modifiedInVersion(self, oid): diff --git a/src/ZODB/interfaces.py b/src/ZODB/interfaces.py index e6495fbe..eb8290d3 100644 --- a/src/ZODB/interfaces.py +++ b/src/ZODB/interfaces.py @@ -16,14 +16,122 @@ $Id$ """ -import zope.interface +from zope.interface import Interface, Attribute -class IConnection(zope.interface.Interface): - """ZODB connection. +class IConnection(Interface): + """Connection to ZODB for loading and storing objects. - TODO: This interface is incomplete. + The Connection object serves as a data manager. The root() method + on a Connection returns the root object for the database. This + object and all objects reachable from it are associated with the + Connection that loaded them. When a transaction commits, it uses + the Connection to store modified objects. + + Typical use of ZODB is for each thread to have its own + Connection and that no thread should have more than one Connection + to the same database. A thread is associated with a Connection by + loading objects from that Connection. Objects loaded by one + thread should not be used by another thread. + + A Connection can be associated with a single version when it is + created. By default, a Connection is not associated with a + version; it uses non-version data. + + Each Connection provides an isolated, consistent view of the + database, by managing independent copies of objects in the + database. At transaction boundaries, these copies are updated to + reflect the current state of the database. + + You should not instantiate this class directly; instead call the + open() method of a DB instance. + + In many applications, root() is the only method of the Connection + that you will need to use. + + Synchronization + --------------- + + A Connection instance is not thread-safe. It is designed to + support a thread model where each thread has its own transaction. + If an application has more than one thread that uses the + connection or the transaction the connection is registered with, + the application should provide locking. + + The Connection manages movement of objects in and out of object + storage. + + TODO: We should document an intended API for using a Connection via + multiple threads. + + TODO: We should explain that the Connection has a cache and that + multiple calls to get() will return a reference to the same + object, provided that one of the earlier objects is still + referenced. Object identity is preserved within a connection, but + not across connections. + + TODO: Mention the database pool. + + A database connection always presents a consistent view of the + objects in the database, although it may not always present the + most current revision of any particular object. Modifications + made by concurrent transactions are not visible until the next + transaction boundary (abort or commit). + + Two options affect consistency. By default, the mvcc and synch + options are enabled by default. + + If you pass mvcc=True to db.open(), the Connection will never read + non-current revisions of an object. 
Instead it will raise a
+    ReadConflictError to indicate that the current revision is
+    unavailable because it was written after the current transaction
+    began.
+
+    The logic for handling modifications assumes that the thread that
+    opened a Connection (called db.open()) is the thread that will use
+    the Connection. If this is not true, you should pass synch=False
+    to db.open(). When the synch option is disabled, some transaction
+    boundaries will be missed by the Connection; in particular, if a
+    transaction does not involve any modifications to objects loaded
+    from the Connection and synch is disabled, the Connection will
+    miss the transaction boundary. Two examples of this behavior are
+    db.undo() and read-only transactions.
+
+    Groups of methods:
+
+    User Methods:
+        root, get, add, close, db, sync, isReadOnly, cacheGC, cacheFullSweep,
+        cacheMinimize, getVersion, modifiedInVersion
+
+    Experimental Methods:
+        onCloseCallbacks
+
+    Database Invalidation Methods:
+        invalidate
+
+    Other Methods: exchange, getDebugInfo, setDebugInfo,
+        getTransferCounts
     """

+    def __init__(version='', cache_size=400,
+                 cache_deactivate_after=None, mvcc=True, txn_mgr=None,
+                 synch=True):
+        """Create a new Connection.
+
+        A Connection instance should be instantiated by the DB
+        instance that it is connected to.
+
+        Parameters:
+          version: the "version" that all changes will be made in, defaults
+            to no version.
+          cache_size: the target size of the in-memory object cache, measured
+            in objects.
+          mvcc: boolean indicating whether MVCC is enabled
+          txn_mgr: transaction manager to use. None means use the default
+            transaction manager.
+          synch: boolean indicating whether Connection should register for
+            afterCompletion() calls.
+        """
+
     def add(ob):
         """Add a new object 'obj' to the database and assign it an oid.

@@ -38,4 +146,330 @@ class IConnection(zope.interface.Interface):
         The object is added when the transaction commits. The object
         must implement the IPersistent interface and must not
         already be associated with a Connection.
+
+        Parameters:
+          obj: a Persistent object
+
+        Raises TypeError if obj is not a persistent object.
+
+        Raises InvalidObjectReference if obj is already associated with another
+        connection.
+
+        Raises ConnectionStateError if the connection is closed.
+        """
+
+    def get(oid):
+        """Return the persistent object with oid 'oid'.
+
+        If the object was not in the cache and the object's class is
+        ghostable, then a ghost will be returned. If the object is
+        already in the cache, a reference to the cached object will be
+        returned.
+
+        Applications seldom need to call this method, because objects
+        are loaded transparently during attribute lookup.
+
+        Parameters:
+          oid: an object id
+
+        Raises KeyError if oid does not exist.
+
+          It is possible that an object does not exist as of the current
+          transaction, but existed in the past. It may even exist again in
+          the future, if the transaction that removed it is undone.
+
+        Raises ConnectionStateError if the connection is closed.
+        """
+
+    def cacheMinimize():
+        """Deactivate all unmodified objects in the cache.
+
+        Call _p_deactivate() on each cached object, attempting to turn
+        it into a ghost. It is possible for individual objects to
+        remain active.
+        """
+
+    def cacheGC():
+        """Reduce cache size to target size.
+
+        Call _p_deactivate() on cached objects until the cache size
+        falls under the target size.
+        """
+
+    def onCloseCallback(f):
+        """Register a callable, f, to be called by close().
+ + f will be called with no arguments before the Connection is closed. + + Parameters: + f: method that will be called on `close` + """ + + def close(): + """Close the Connection. + + When the Connection is closed, all callbacks registered by + onCloseCallback() are invoked and the cache is garbage collected. + + A closed Connection should not be used by client code. It can't load + or store objects. Objects in the cache are not freed, because + Connections are re-used and the cache is expected to be useful to the + next client. + """ + + def db(): + """Returns a handle to the database this connection belongs to.""" + + def isReadOnly(): + """Returns True if the storage for this connection is read only.""" + + def invalidate(tid, oids): + """Notify the Connection that transaction 'tid' invalidated oids. + + When the next transaction boundary is reached, objects will be + invalidated. If any of the invalidated objects are accessed by the + current transaction, the revision written before Connection.tid will be + used. + + The DB calls this method, even when the Connection is closed. + + Parameters: + tid: the storage-level id of the transaction that committed + oids: oids is a set of oids, represented as a dict with oids as keys. """ + + def root(): + """Return the database root object. + + The root is a persistent.mapping.PersistentMapping. + """ + + def getVersion(): + """Returns the version this connection is attached to.""" + + # Multi-database support. + + connections = Attribute("""\ + A mapping from database name to a Connection to that database. + + In multi-database use, the Connections of all members of a database + collection share the same .connections object. + + In single-database use, of course this mapping contains a single + entry. + """) + + # TODO: should this accept all the arguments one may pass to DB.open()? + def get_connection(database_name): + """Return a Connection for the named database. + + This is intended to be called from an open Connection associated with + a multi-database. In that case, database_name must be the name of a + database within the database collection (probably the name of a + different database than is associated with the calling Connection + instance, but it's fine to use the name of the calling Connection + object's database). A Connection for the named database is + returned. If no connection to that database is already open, a new + Connection is opened. So long as the multi-database remains open, + passing the same name to get_connection() multiple times returns the + same Connection object each time. + """ + + def sync(): + """Manually update the view on the database. + + This includes aborting the current transaction, getting a fresh and + consistent view of the data (synchronizing with the storage if possible) + and call cacheGC() for this connection. + + This method was especially useful in ZODB 3.2 to better support + read-only connections that were affected by a couple of problems. + """ + + # Debug information + + def getDebugInfo(): + """Returns a tuple with different items for debugging the connection. + + Debug information can be added to a connection by using setDebugInfo. + """ + + def setDebugInfo(*items): + """Add the given items to the debug information of this connection.""" + + def getTransferCounts(clear=False): + """Returns the number of objects loaded and stored. + + If clear is True, reset the counters. + """ + +class IDatabase(Interface): + """ZODB DB. + + TODO: This interface is incomplete. 
+ """ + + def __init__(storage, + pool_size=7, + cache_size=400, + version_pool_size=3, + version_cache_size=100, + database_name='unnamed', + databases=None, + ): + """Create an object database. + + storage: the storage used by the database, e.g. FileStorage + pool_size: expected maximum number of open connections + cache_size: target size of Connection object cache, in number of + objects + version_pool_size: expected maximum number of connections (per + version) + version_cache_size: target size of Connection object cache for + version connections, in number of objects + database_name: when using a multi-database, the name of this DB + within the database group. It's a (detected) error if databases + is specified too and database_name is already a key in it. + This becomes the value of the DB's database_name attribute. + databases: when using a multi-database, a mapping to use as the + binding of this DB's .databases attribute. It's intended + that the second and following DB's added to a multi-database + pass the .databases attribute set on the first DB added to the + collection. + """ + + databases = Attribute("""\ + A mapping from database name to DB (database) object. + + In multi-database use, all DB members of a database collection share + the same .databases object. + + In single-database use, of course this mapping contains a single + entry. + """) + +class IStorage(Interface): + """A storage is responsible for storing and retrieving data of objects. + """ + + def load(oid, version): + """XXX""" + + def close(): + """XXX""" + + def cleanup(): + """XXX""" + + def lastSerial(): + """XXX""" + + def lastTransaction(): + """XXX""" + + def lastTid(oid): + """Return last serialno committed for object oid.""" + + def loadSerial(oid, serial): + """XXX""" + + def loadBefore(oid, tid): + """XXX""" + + def iterator(start=None, stop=None): + """XXX""" + + def sortKey(): + """XXX""" + + def getName(): + """XXX""" + + def getSize(): + """XXX""" + + def history(oid, version, length=1, filter=None): + """XXX""" + + def new_oid(last=None): + """XXX""" + + def set_max_oid(possible_new_max_oid): + """XXX""" + + def registerDB(db, limit): + """XXX""" + + def isReadOnly(): + """XXX""" + + def supportsUndo(): + """XXX""" + + def supportsVersions(): + """XXX""" + + def tpc_abort(transaction): + """XXX""" + + def tpc_begin(transaction): + """XXX""" + + def tpc_vote(transaction): + """XXX""" + + def tpc_finish(transaction, f=None): + """XXX""" + + def getSerial(oid): + """XXX""" + + def loadSerial(oid, serial): + """XXX""" + + def loadBefore(oid, tid): + """XXX""" + + def getExtensionMethods(): + """XXX""" + + def copyTransactionsFrom(): + """XXX""" + + def store(oid, oldserial, data, version, transaction): + """ + + may return the new serial or not + """ + +class IUndoableStorage(IStorage): + + def undo(transaction_id, txn): + """XXX""" + + def undoInfo(): + """XXX""" + + def undoLog(first, last, filter=None): + """XXX""" + + def pack(t, referencesf): + """XXX""" + +class IVersioningStorage(IStorage): + + def abortVersion(src, transaction): + """XXX""" + + def commitVersion(src, dest, transaction): + """XXX""" + + def modifiedInVersion(oid): + """XXX""" + + def versionEmpty(version): + """XXX""" + + def versions(max=None): + """XXX""" + diff --git a/src/ZODB/tests/multidb.txt b/src/ZODB/tests/multidb.txt new file mode 100644 index 00000000..47a9dcdf --- /dev/null +++ b/src/ZODB/tests/multidb.txt @@ -0,0 +1,146 @@ +############################################################################## +# +# 
Copyright (c) 2005 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## + +Multi-database tests +==================== + +Multi-database support adds the ability to tie multiple databases into a +collection. The original proposal is in the fishbowl: + + http://www.zope.org/Wikis/ZODB/MultiDatabases/ + +It was implemented during the PyCon 2005 sprints, but in a simpler form, +by Jim Fulton, Christian Theune,and Tim Peters. Overview: + +No private attributes were added, and one new method was introduced. + +DB: + +- a new .database_name attribute holds the name of this database + +- a new .databases attribute maps from database name to DB object; all DBs + in a multi-database collection share the same .databases object + +- the DB constructor has new optional arguments with the same names + (database_name= and databases=). + +Connection: + +- a new .connections attribute maps from database name to a Connection for + the database with that name; the .connections mapping object is also + shared among databases in a collection + +- a new .get_connection(database_name) method returns a Connection for a + database in the collection; if a connection is already open, it's returned + (this is the value .connections[database_name]), else a new connection is + opened (and stored as .connections[database_name]) + + +Creating a multi-database starts with creating a named DB: + + >>> from ZODB.tests.test_storage import MinimalMemoryStorage + >>> from ZODB import DB + >>> dbmap = {} + >>> db = DB(MinimalMemoryStorage(), database_name='root', databases=dbmap) + +The database name is accessible afterwards and in a newly created collection: + + >>> db.database_name + 'root' + >>> db.databases # doctest: +ELLIPSIS + {'root': <ZODB.DB.DB object at ...>} + >>> db.databases is dbmap + True + +Adding another database to the collection works like this: + + >>> db2 = DB(MinimalMemoryStorage(), + ... database_name='notroot', + ... databases=dbmap) + +The new db2 now shares the 'databases' dictionary with db and has two entries: + + >>> db2.databases is db.databases is dbmap + True + >>> len(db2.databases) + 2 + >>> names = dbmap.keys(); names.sort(); print names + ['notroot', 'root'] + +It's an error to try to insert a database with a name already in use: + + >>> db3 = DB(MinimalMemoryStorage(), + ... database_name='root', + ... databases=dbmap) + Traceback (most recent call last): + ... 
+    ValueError: database_name 'root' already in databases
+
+Because that failed, db.databases wasn't changed:
+
+    >>> len(db.databases) # still 2
+    2
+
+You can (still) get a connection to a database this way:
+
+    >>> cn = db.open()
+    >>> cn # doctest: +ELLIPSIS
+    <Connection at ...>
+
+This is the only connection in this collection right now:
+
+    >>> cn.connections # doctest: +ELLIPSIS
+    {'root': <Connection at ...>}
+
+You can get a connection to a different database from an existing connection
+in the same database collection (this enables 'connection binding' within a
+given thread/transaction/context ...):
+
+    >>> cn2 = cn.get_connection('notroot')
+    >>> cn2 # doctest: +ELLIPSIS
+    <Connection at ...>
+
+Now there are two connections in that collection:
+
+    >>> cn2.connections is cn.connections
+    True
+    >>> len(cn2.connections)
+    2
+    >>> names = cn.connections.keys(); names.sort(); print names
+    ['notroot', 'root']
+
+So long as this database group remains open, the same Connection objects
+are returned:
+
+    >>> cn.get_connection('root') is cn
+    True
+    >>> cn.get_connection('notroot') is cn2
+    True
+    >>> cn2.get_connection('root') is cn
+    True
+    >>> cn2.get_connection('notroot') is cn2
+    True
+
+Of course trying to get a connection for a database not in the group raises
+an exception:
+
+    >>> cn.get_connection('no way')
+    Traceback (most recent call last):
+      ...
+    KeyError: 'no way'
+
+Clean up:
+
+    >>> for a_db in dbmap.values():
+    ...     a_db.close()
diff --git a/src/ZODB/tests/testConnection.py b/src/ZODB/tests/testConnection.py
index b8f9ba88..0a392303 100644
--- a/src/ZODB/tests/testConnection.py
+++ b/src/ZODB/tests/testConnection.py
@@ -647,6 +647,8 @@ class StubDatabase:
         self._storage = StubStorage()

     classFactory = None
+    database_name = 'stubdatabase'
+    databases = {'stubdatabase': database_name}

     def invalidate(self, transaction, dict_with_oid_keys, connection):
         pass
diff --git a/src/ZODB/tests/test_doctest_files.py b/src/ZODB/tests/test_doctest_files.py
index 069c1e2a..94a150b8 100644
--- a/src/ZODB/tests/test_doctest_files.py
+++ b/src/ZODB/tests/test_doctest_files.py
@@ -15,4 +15,6 @@ from zope.testing.doctestunit import DocFileSuite


 def test_suite():
-    return DocFileSuite("dbopen.txt")
+    return DocFileSuite("dbopen.txt",
+                        "multidb.txt",
+                        )
diff --git a/src/persistent/interfaces.py b/src/persistent/interfaces.py
index cd5f2fae..3504eefb 100644
--- a/src/persistent/interfaces.py
+++ b/src/persistent/interfaces.py
@@ -257,18 +257,35 @@ class IPersistentDataManager(Interface):
     def setstate(object):
         """Load the state for the given object.

-        The object should be in the ghost state.
-        The object's state will be set and the object will end up
-        in the saved state.
+        The object should be in the ghost state. The object's state will be
+        set and the object will end up in the saved state.

         The object must provide the IPersistent interface.
         """

+    def oldstate(obj, tid):
+        """Return a copy of 'obj' that was written by transaction 'tid'.
+
+        The returned object does not have the typical metadata (_p_jar, _p_oid,
+        _p_serial) set. I'm not sure how references to other persistent objects
+        are handled.
+
+        Parameters
+        obj: a persistent object from this Connection.
+        tid: id of a transaction that wrote an earlier revision.
+
+        Raises KeyError if tid does not exist or if tid deleted a revision of
+        obj.
+        """
+
     def register(object):
         """Register an IPersistent with the current transaction.

         This method must be called when the object transitions to the
         changed state.
+ + A subclass could override this method to customize the default + policy of one transaction manager for each thread. """ def mtime(object): diff --git a/src/transaction/interfaces.py b/src/transaction/interfaces.py index 7f17ff8b..303722c9 100644 --- a/src/transaction/interfaces.py +++ b/src/transaction/interfaces.py @@ -18,104 +18,7 @@ $Id$ import zope.interface -class IResourceManager(zope.interface.Interface): - """Objects that manage resources transactionally. - - These objects may manage data for other objects, or they may manage - non-object storages, such as relational databases. - - IDataManagerOriginal is the interface currently provided by ZODB - database connections, but the intent is to move to the newer - IDataManager. - """ - - # Two-phase commit protocol. These methods are called by the - # ITransaction object associated with the transaction being - # committed. - - def tpc_begin(transaction): - """Begin two-phase commit, to save data changes. - - An implementation should do as much work as possible without - making changes permanent. Changes should be made permanent - when tpc_finish is called (or aborted if tpc_abort is called). - The work can be divided between tpc_begin() and tpc_vote(), and - the intent is that tpc_vote() be as fast as possible (to minimize - the period of uncertainty). - - transaction is the ITransaction instance associated with the - transaction being committed. - """ - - def tpc_vote(transaction): - """Verify that a resource manager can commit the transaction. - - This is the last chance for a resource manager to vote 'no'. A - resource manager votes 'no' by raising an exception. - - transaction is the ITransaction instance associated with the - transaction being committed. - """ - - def tpc_finish(transaction): - """Indicate confirmation that the transaction is done. - - transaction is the ITransaction instance associated with the - transaction being committed. - - This should never fail. If this raises an exception, the - database is not expected to maintain consistency; it's a - serious error. - """ - - def tpc_abort(transaction): - """Abort a transaction. - - transaction is the ITransaction instance associated with the - transaction being committed. - - All changes made by the current transaction are aborted. Note - that this includes all changes stored in any savepoints that may - be associated with the current transaction. - - tpc_abort() can be called at any time, either in or out of the - two-phase commit. - - This should never fail. - """ - - # The savepoint/rollback API. - - def savepoint(transaction): - """Save partial transaction changes. - - There are two purposes: - - 1) To allow discarding partial changes without discarding all - dhanges. - - 2) To checkpoint changes to disk that would otherwise live in - memory for the duration of the transaction. - - Returns an object implementing ISavePoint2 that can be used - to discard changes made since the savepoint was captured. - - An implementation that doesn't support savepoints should implement - this method by returning a savepoint object that raises an - exception when its rollback method is called. The savepoint method - shouldn't raise an error. This way, transactions that create - savepoints can proceed as long as an attempt is never made to roll - back a savepoint. - """ - - def discard(transaction): - """Discard changes within the transaction since the last savepoint. - - That means changes made since the last savepoint if one exists, or - since the start of the transaction. 
- """ - -class IDataManagerOriginal(zope.interface.Interface): +class IDataManager(zope.interface.Interface): """Objects that manage transactional storage. These objects may manage data for other objects, or they may manage @@ -155,7 +58,7 @@ class IDataManagerOriginal(zope.interface.Interface): has been called; this is only used when the transaction is being committed. - This call also implied the beginning of 2-phase commit. + This call also implies the beginning of 2-phase commit. """ # Two-phase commit protocol. These methods are called by the @@ -180,10 +83,12 @@ class IDataManagerOriginal(zope.interface.Interface): """ - def tpc_abort(transaction): """Abort a transaction. + This is called by a transaction manager to end a two-phase commit on + the data manager. + This is always called after a tpc_begin call. transaction is the ITransaction instance associated with the @@ -202,6 +107,11 @@ class IDataManagerOriginal(zope.interface.Interface): database is not expected to maintain consistency; it's a serious error. + It's important that the storage calls the passed function + while it still has its lock. We don't want another thread + to be able to read any updated data until we've had a chance + to send an invalidation message to all of the other + connections! """ def tpc_vote(transaction): @@ -214,125 +124,46 @@ class IDataManagerOriginal(zope.interface.Interface): transaction being committed. """ - def commit(object, transaction): - """CCCommit changes to an object + def commit(transaction): + """Commit modifications to registered objects. Save the object as part of the data to be made persistent if the transaction commits. - """ - - def abort(object, transaction): - """Abort changes to an object - - Only changes made since the last transaction or - sub-transaction boundary are discarded. - - This method may be called either: - - o Outside of two-phase commit, or - - o In the first phase of two-phase commit - - """ - - def sortKey(): - """ - Return a key to use for ordering registered DataManagers - - ZODB uses a global sort order to prevent deadlock when it commits - transactions involving multiple resource managers. The resource - manager must define a sortKey() method that provides a global ordering - for resource managers. - """ - -class IDataManager(zope.interface.Interface): - """Data management interface for storing objects transactionally. - - ZODB database connections currently provides the older - IDataManagerOriginal interface, but the intent is to move to this newer - IDataManager interface. - - Our hope is that this interface will evolve and become the standard - interface. There are some issues to be resolved first, like: - - - Probably want separate abort methods for use in and out of - two-phase commit. - - - The savepoint api may need some more thought. - - """ - - def prepare(transaction): - """Perform the first phase of a 2-phase commit - - The data manager prepares for commit any changes to be made - persistent. A normal return from this method indicated that - the data manager is ready to commit the changes. - - The data manager must raise an exception if it is not prepared - to commit the transaction after executing prepare(). - The transaction must match that used for preceeding - savepoints, if any. + This includes conflict detection and handling. If no conflicts or + errors occur it saves the objects in the storage. """ - # This is equivalent to zodb3's tpc_begin, commit, and - # tpc_vote combined. 
- def abort(transaction): - """Abort changes made by transaction - - This may be called before two-phase commit or in the second - phase of two-phase commit. - - The transaction must match that used for preceeding - savepoints, if any. - - """ - - # This is equivalent to *both* zodb3's abort and tpc_abort - # calls. This should probably be split into 2 methods. - - def commit(transaction): - """Finish two-phase commit - - The prepare method must be called, with the same transaction, - before calling commit. - - """ - - # This is equivalent to zodb3's tpc_finish - - def savepoint(transaction): - """Do tentative commit of changes to this point. - - Should return an object implementing IRollback that can be used - to rollback to the savepoint. - - Note that (unlike zodb3) this doesn't use a 2-phase commit - protocol. If this call fails, or if a rollback call on the - result fails, the (containing) transaction should be - aborted. Aborting the containing transaction is *not* the - responsibility of the data manager, however. + """Abort a transaction and forget all changes. - An implementation that doesn't support savepoints should - implement this method by returning a rollback implementation - that always raises an error when it's rollback method is - called. The savepoing method shouldn't raise an error. This - way, transactions that create savepoints can proceed as long - as an attempt is never made to roll back a savepoint. + Abort must be called outside of a two-phase commit. + Abort is called by the transaction manager to abort transactions + that are not yet in a two-phase commit. """ def sortKey(): - """ - Return a key to use for ordering registered DataManagers + """Return a key to use for ordering registered DataManagers ZODB uses a global sort order to prevent deadlock when it commits transactions involving multiple resource managers. The resource manager must define a sortKey() method that provides a global ordering for resource managers. """ + # XXX: Alternate version: + #"""Return a consistent sort key for this connection. + # + #This allows ordering multiple connections that use the same storage in + #a consistent manner. This is unique for the lifetime of a connection, + #which is good enough to avoid ZEO deadlocks. + #""" + + def beforeCompletion(transaction): + """Hook that is called by the transaction before completing a commit""" + + def afterCompletion(transaction): + """Hook that is called by the transaction after completing a commit""" class ITransaction(zope.interface.Interface): """Object representing a running transaction. @@ -414,34 +245,6 @@ class ITransaction(zope.interface.Interface): # Unsure: is this allowed to cause an exception here, during # the two-phase commit, or can it toss data silently? -class ISavePoint(zope.interface.Interface): - """ISavePoint objects represent partial transaction changes. - - Sequences of savepoint objects are associated with transactions, - and with IResourceManagers. - """ - - def rollback(): - """Discard changes made after this savepoint. - - This includes discarding (call the discard method on) all - subsequent savepoints. - """ - - def discard(): - """Discard changes saved by this savepoint. - - That means changes made since the immediately preceding - savepoint if one exists, or since the start of the transaction, - until this savepoint. - - Once a savepoint has been discarded, it's an error to attempt - to rollback or discard it again. 
- """ - - next_savepoint = zope.interface.Attribute( - """The next savepoint (later in time), or None if self is the - most recent savepoint.""") class IRollback(zope.interface.Interface): @@ -457,3 +260,4 @@ class IRollback(zope.interface.Interface): - The transaction has ended. """ + -- 2.30.9
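
multidb.txt above exercises the new machinery with MinimalMemoryStorage. The
sketch below is a supplementary illustration only, not code from this change:
it shows how the same wiring might look with FileStorage. The file names and
sample keys are made up, and it assumes both connections end up registered
with the default (thread) transaction manager, so that a single
transaction.commit() covers both databases.

    # Illustrative sketch; file names and keys are hypothetical.
    import transaction
    from ZODB import DB
    from ZODB.FileStorage import FileStorage

    databases = {}
    root_db = DB(FileStorage('root.fs'), database_name='root',
                 databases=databases)
    cfg_db = DB(FileStorage('config.fs'), database_name='config',
                databases=databases)

    # Open the 'root' database, then reach its sibling by name.
    conn = root_db.open()
    cfg_conn = conn.get_connection('config')

    # Each connection has its own root object in its own storage.
    conn.root()['answer'] = 42
    cfg_conn.root()['colour'] = 'blue'

    # Assumes both connections joined the thread's default transaction,
    # so one commit spans both databases.
    transaction.commit()

    for db in databases.values():
        db.close()

The point of the shared 'databases' mapping is that every DB, and every
Connection, can find the other members of the group by name; nothing about
single-database use changes.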
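
The renamed IDataManager interface is easier to follow next to a concrete toy.
The class below is an illustrative sketch, not part of this change, and much
simpler than a real data manager such as a ZODB Connection: it buffers changes
in plain dictionaries and walks through the two-phase commit methods in the
order a transaction would call them (tpc_begin, commit, tpc_vote, then
tpc_finish or tpc_abort). The buffering scheme and the class name are invented
for the example.

    # Toy resource manager using the IDataManager method names; the storage
    # model (two plain dicts) is invented purely for illustration.
    class DictDataManager:

        def __init__(self, committed, sort_key):
            self.committed = committed   # durable state (a plain dict)
            self.pending = {}            # changes made in the current txn
            self._sort_key = sort_key

        def abort(self, transaction):
            # Called outside two-phase commit: forget uncommitted changes.
            self.pending.clear()

        def tpc_begin(self, transaction):
            # Prepare a scratch copy; nothing is durable yet.
            self._prepared = dict(self.committed)

        def commit(self, transaction):
            # "Save" modified data as part of what will become persistent.
            self._prepared.update(self.pending)

        def tpc_vote(self, transaction):
            # Last chance to veto the commit by raising an exception.
            pass

        def tpc_finish(self, transaction):
            # Must not fail: expose the prepared state as the real state.
            self.committed.clear()
            self.committed.update(self._prepared)
            self.pending.clear()

        def tpc_abort(self, transaction):
            # Discard everything from this transaction; durable state is
            # left untouched.
            self.pending.clear()

        def sortKey(self):
            # A global ordering of resource managers avoids commit deadlocks.
            return self._sort_key

        def beforeCompletion(self, transaction):
            pass   # hook called before the transaction completes

        def afterCompletion(self, transaction):
            pass   # hook called after the transaction completes

Note that tpc_finish is the only step that makes changes visible, which is why
the interface insists it must never fail.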