Commit bb525b6a authored by Jim Fulton

Added savepoints!

(And also added interfaces and rearranged some code to hopefully make
it easier to read.)
parent 2d487198
......@@ -17,6 +17,7 @@ $Id$"""
import logging
import sys
import tempfile
import threading
import warnings
from time import time
......@@ -33,13 +34,12 @@ import transaction
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
from ZODB.POSException \
import ConflictError, ReadConflictError, InvalidObjectReference, \
ConnectionStateError
from ZODB.TmpStore import TmpStore
from ZODB import POSException
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
from ZODB.POSException import ConflictError, ReadConflictError
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
from ZODB.utils import u64, oid_repr, z64, positive_id, \
DEPRECATED_ARGUMENT, deprecated36
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
from ZODB.utils import p64, u64, z64, oid_repr, positive_id
global_reset_counter = 0
......@@ -61,17 +61,19 @@ class Connection(ExportImport, object):
implements(IConnection, IDataManager, IPersistentDataManager)
_storage = _normal_storage = _savepoint_storage = None
_tmp = None
_code_timestamp = 0
# ZODB.IConnection
##########################################################################
# Connection methods, ZODB.IConnection
def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True):
"""Create a new Connection."""
self._log = logging.getLogger("ZODB.Connection")
self._storage = None
self._debug_info = ()
self._opened = None # time.time() when DB.open() opened us
......@@ -150,39 +152,6 @@ class Connection(ExportImport, object):
self.connections = None
def get_connection(self, database_name):
    """Return the Connection for the database named 'database_name'.

    A connection that is already present in ``self.connections`` is
    returned as is.  Otherwise a new one is opened through
    ``self._db.databases``, its own connection map is merged into ours,
    and it is made to share our map from then on.
    """
    existing = self.connections.get(database_name)
    if existing is not None:
        return existing

    opened = self._db.databases[database_name].open()
    # Merge first, then rebind, so both connections end up looking at the
    # very same mapping object.
    self.connections.update(opened.connections)
    opened.connections = self.connections
    return opened
def get(self, oid):
    """Return the persistent object with oid 'oid'.

    The object cache is consulted first, then the objects added during
    the current transaction.  On a miss the record is loaded from the
    storage, turned into a ghost by the reader, tagged with its oid, jar
    and serial, and put into the cache.

    Raises ConnectionStateError if the connection has no storage.
    """
    if self._storage is None:
        raise ConnectionStateError("The database connection is closed")

    cached = self._cache.get(oid, None)
    if cached is not None:
        return cached

    added = self._added.get(oid, None)
    if added is not None:
        return added

    record, serial = self._storage.load(oid, self._version)
    ghost = self._reader.getGhost(record)

    ghost._p_oid = oid
    ghost._p_jar = self
    ghost._p_changed = None
    ghost._p_serial = serial

    self._cache[oid] = ghost
    return ghost
def add(self, obj):
"""Add a new object 'obj' to the database and assign it an oid."""
if self._storage is None:
......@@ -207,52 +176,41 @@ class Connection(ExportImport, object):
elif obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar)
def sortKey(self):
    """Return a sort key for this connection.

    The key is the storage's own sort key followed by ``id(self)``.
    """
    storage_key = self._storage.sortKey()
    return "%s:%s" % (storage_key, id(self))
def get(self, oid):
"""Return the persistent object with oid 'oid'."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
def abort(self, transaction):
"""Abort a transaction and forget all changes."""
for obj in self._registered_objects:
oid = obj._p_oid
assert oid is not None
if oid in self._added:
del self._added[oid]
del obj._p_jar
del obj._p_oid
else:
obj = self._cache.get(oid, None)
if obj is not None:
return obj
obj = self._added.get(oid, None)
if obj is not None:
return obj
# Note: If we invalidate a non-ghostifiable object
# (i.e. a persistent class), the object will
# immediately reread its state.  That means that the
# following call could result in a call to
# self.setstate, which, of course, must succeed.
# In general, it would be better if the read could be
# delayed until the start of the next transaction.  If
# we read at the end of a transaction and if the
# object was invalidated during this transaction, then
# we'll read non-current data, which we'll discard
# later in transaction finalization.  Unfortunately, we
# can only delay the read if this abort corresponds to
# a top-level-transaction abort.  We can't tell if
# this is a top-level-transaction abort, so we have to
# go ahead and invalidate now.  Fortunately, it's
# pretty unlikely that the object we are invalidating
# was invalidated by another thread, so the risk of a
# reread is pretty low.
p, serial = self._storage.load(oid, self._version)
obj = self._reader.getGhost(p)
self._cache.invalidate(oid)
obj._p_oid = oid
obj._p_jar = self
obj._p_changed = None
obj._p_serial = serial
self._tpc_cleanup()
self._cache[oid] = obj
return obj
# TODO: we should test what happens when cacheGC is called mid-transaction.
def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
    """Deactivate all unmodified objects in the cache.

    'dt' is accepted only for backward compatibility; passing it emits a
    deprecation warning and the value is otherwise ignored.
    """
    dt_was_passed = dt is not DEPRECATED_ARGUMENT
    if dt_was_passed:
        deprecated36("cacheMinimize() dt= is ignored.")
    self._cache.minimize()
# TODO: we should test what happens when cacheGC is called mid-transaction.
def cacheGC(self):
    """Run one incremental garbage-collection pass on the object cache."""
    cache = self._cache
    cache.incrgc()
__onCloseCallbacks = None
def onCloseCallback(self, f):
"""Register a callable, f, to be called by close()."""
if self.__onCloseCallbacks is None:
......@@ -266,12 +224,6 @@ class Connection(ExportImport, object):
raise ConnectionStateError("Cannot close a connection joined to "
"a transaction")
if self._tmp is not None:
# There are no direct modifications pending, but a subtransaction
# is pending.
raise ConnectionStateError("Cannot close a connection with a "
"pending subtransaction")
if self._cache is not None:
self._cache.incrgc() # This is a good time to do some GC
......@@ -285,7 +237,8 @@ class Connection(ExportImport, object):
self._log.error("Close callback failed for %s", f,
exc_info=sys.exc_info())
self.__onCloseCallbacks = None
self._storage = self._tmp = self.new_oid = None
self._storage = self._savepoint_storage = self._normal_storage = None
self.new_oid = None
self._debug_info = ()
self._opened = None
# Return the connection to the pool.
......@@ -297,156 +250,128 @@ class Connection(ExportImport, object):
# assert that here, because self may have been reused (by
# another thread) by the time we get back here.
# transaction.interfaces.IDataManager
def commit(self, transaction):
"""Commit changes to an object"""
if self._import:
# TODO: This code seems important for Zope, but needs docs
# to explain why.
self._importDuringCommit(transaction, *self._import)
self._import = None
# Just in case an object is added as a side-effect of storing
# a modified object. If, for example, a __getstate__() method
# calls add(), the newly added objects will show up in
# _added_during_commit. This sounds insane, but has actually
# happened.
def db(self):
    """Return the database object this connection belongs to."""
    database = self._db
    return database
self._added_during_commit = []
def isReadOnly(self):
    """Return the storage's answer to isReadOnly().

    Raises ConnectionStateError if the connection has no storage.
    """
    storage = self._storage
    if storage is None:
        raise ConnectionStateError("The database connection is closed")
    return storage.isReadOnly()
for obj in self._registered_objects:
oid = obj._p_oid
assert oid
if oid in self._conflicts:
raise ReadConflictError(object=obj)
def invalidate(self, tid, oids):
    """Record that transaction 'tid' invalidated the objects in 'oids'.

    Only the first tid seen while ``_txn_time`` is unset is remembered;
    the oids are always merged into ``_invalidated``.  Both updates are
    made while holding ``_inv_lock``.
    """
    lock = self._inv_lock
    lock.acquire()
    try:
        first_invalidation = self._txn_time is None
        if first_invalidation:
            self._txn_time = tid
        self._invalidated.update(oids)
    finally:
        lock.release()
if obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar)
elif oid in self._added:
assert obj._p_serial == z64
elif obj._p_changed:
if oid in self._invalidated:
resolve = getattr(obj, "_p_resolveConflict", None)
if resolve is None:
raise ConflictError(object=obj)
self._modified.append(oid)
else:
# Nothing to do. It's been said that it's legal, e.g., for
# an object to set _p_changed to false after it's been
# changed and registered.
continue
def root(self):
    """Return the database root object, i.e. the object stored at oid z64."""
    root_object = self.get(z64)
    return root_object
self._store_objects(ObjectWriter(obj), transaction)
def getVersion(self):
    """Return the version this connection is attached to.

    Raises ConnectionStateError if the connection has no storage.
    """
    is_closed = self._storage is None
    if is_closed:
        raise ConnectionStateError("The database connection is closed")
    return self._version
for obj in self._added_during_commit:
self._store_objects(ObjectWriter(obj), transaction)
self._added_during_commit = None
def get_connection(self, database_name):
    """Return the Connection for the database named 'database_name'.

    A connection that is already present in ``self.connections`` is
    returned as is.  Otherwise a new one is opened through
    ``self._db.databases``, its own connection map is merged into ours,
    and it is made to share our map from then on.
    """
    existing = self.connections.get(database_name)
    if existing is not None:
        return existing

    opened = self._db.databases[database_name].open()
    # Merge first, then rebind, so both connections end up looking at the
    # very same mapping object.
    self.connections.update(opened.connections)
    opened.connections = self.connections
    return opened
def _store_objects(self, writer, transaction):
for obj in writer:
oid = obj._p_oid
serial = getattr(obj, "_p_serial", z64)
def sync(self):
    """Manually update the view on the database.

    Aborts the transaction manager's current transaction, calls the
    storage's sync() method when it has one, and then flushes the
    pending invalidations.
    """
    current_txn = self._txn_mgr.get()
    current_txn.abort()

    # sync() is optional on storages; 0 stands in for "not provided".
    storage_sync = getattr(self._storage, 'sync', 0)
    if storage_sync:
        storage_sync()

    self._flush_invalidations()
if serial == z64:
# obj is a new object
self._creating.append(oid)
# Because obj was added, it is now in _creating, so it can
# be removed from _added.
self._added.pop(oid, None)
else:
if (oid in self._invalidated
and not hasattr(obj, '_p_resolveConflict')):
raise ConflictError(object=obj)
self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj
s = self._storage.store(oid, serial, p, self._version, transaction)
self._store_count += 1
# Put the object in the cache before handling the
# response, just in case the response contains the
# serial number for a newly created object
try:
self._cache[oid] = obj
except:
# Dang, I bet it's wrapped:
# TODO: Deprecate, then remove, this.
if hasattr(obj, 'aq_base'):
self._cache[oid] = obj.aq_base
else:
raise
def getDebugInfo(self):
    """Return the debug information currently attached to this connection."""
    info = self._debug_info
    return info
self._handle_serial(s, oid)
def setDebugInfo(self, *args):
    """Append the given items to this connection's debug information."""
    existing = self._debug_info
    self._debug_info = existing + args
def commit_sub(self, t):
"""Commit all changes made in subtransactions and begin 2-phase commit
"""
if self._tmp is None:
return
src = self._storage
self._storage = self._tmp
self._tmp = None
def getTransferCounts(self, clear=False):
    """Return the pair (objects loaded, objects stored).

    When 'clear' is true both counters are reset to zero after their
    values have been captured.
    """
    counts = (self._load_count, self._store_count)
    if clear:
        self._load_count = self._store_count = 0
    return counts
self._log.debug("Commiting subtransaction of size %s", src.getSize())
oids = src._index.keys()
self._storage.tpc_begin(t)
# Connection methods
##########################################################################
# Copy invalidating and creating info from temporary storage:
self._modified.extend(oids)
self._creating.extend(src._creating)
##########################################################################
# Data manager (IDataManager) methods
for oid in oids:
data, serial = src.load(oid, src)
s = self._storage.store(oid, serial, data, self._version, t)
self._handle_serial(s, oid, change=False)
def abort(self, transaction):
"""Abort a transaction and forget all changes."""
def abort_sub(self, t):
"""Discard all subtransaction data."""
if self._tmp is None:
return
src = self._storage
self._storage = self._tmp
self._tmp = None
if self._savepoint_storage is not None:
self._abort_savepoint()
# Note: If we invalidate a non-ghostifiable object (i.e. a
# persistent class), the object will immediately reread its
# state. That means that the following call could result in a
# call to self.setstate, which, of course, must succeed. In
# general, it would be better if the read could be delayed
# until the start of the next transaction. If we read at the
# end of a transaction and if the object was invalidated
# during this transaction, then we'll read non-current data,
# which we'll discard later in transaction finalization. We
# could, theoretically queue this invalidation by calling
# self.invalidate. Unfortunately, attempts to make that
# change resulted in mysterious test failures. It's pretty
# unlikely that the object we are invalidating was invalidated
# by another thread, so the risk of a reread is pretty low.
# It's really not worth the effort to pursue this.
self._abort()
self._cache.invalidate(src._index.keys())
self._invalidate_creating(src._creating)
self._tpc_cleanup()
def _invalidate_creating(self, creating=None):
"""Disown any objects newly saved in an uncommitted transaction."""
if creating is None:
creating = self._creating
self._creating = []
def _abort(self):
"""Abort a transaction and forget all changes."""
for oid in creating:
o = self._cache.get(oid)
if o is not None:
del self._cache[oid]
del o._p_jar
del o._p_oid
for obj in self._registered_objects:
oid = obj._p_oid
assert oid is not None
if oid in self._added:
del self._added[oid]
del obj._p_jar
del obj._p_oid
else:
# The next two methods are callbacks for transaction synchronization.
# Note: If we invalidate a non-ghostifiable object
# (i.e. a persistent class), the object will
# immediately reread its state.  That means that the
# following call could result in a call to
# self.setstate, which, of course, must succeed.
# In general, it would be better if the read could be
# delayed until the start of the next transaction.  If
# we read at the end of a transaction and if the
# object was invalidated during this transaction, then
# we'll read non-current data, which we'll discard
# later in transaction finalization.  Unfortunately, we
# can only delay the read if this abort corresponds to
# a top-level-transaction abort.  We can't tell if
# this is a top-level-transaction abort, so we have to
# go ahead and invalidate now.  Fortunately, it's
# pretty unlikely that the object we are invalidating
# was invalidated by another thread, so the risk of a
# reread is pretty low.
def beforeCompletion(self, txn):
    """Synchronization hook called before a commit starts; does nothing."""
    return None
self._cache.invalidate(oid)
def afterCompletion(self, txn):
def _tpc_cleanup(self):
"""Performs cleanup operations to support tpc_finish and tpc_abort."""
self._conflicts.clear()
if not self._synch:
self._flush_invalidations()
self._needs_to_join = True
self._registered_objects = []
def _flush_invalidations(self):
self._inv_lock.acquire()
......@@ -491,54 +416,101 @@ class Connection(ExportImport, object):
# Now is a good time to collect some garbage.
self._cache.incrgc()
def root(self):
    """Return the database root object, i.e. the object stored at oid z64."""
    root_object = self.get(z64)
    return root_object
def tpc_begin(self, transaction):
"""Begin commit of a transaction, starting the two-phase commit."""
self._modified = []
def db(self):
    """Return the database object this connection belongs to."""
    database = self._db
    return database
# _creating is a list of oids of new objects, which is used to
# remove them from the cache if a transaction aborts.
self._creating = []
self._normal_storage.tpc_begin(transaction)
def isReadOnly(self):
    """Return the storage's answer to isReadOnly().

    Raises ConnectionStateError if the connection has no storage.
    """
    storage = self._storage
    if storage is None:
        raise ConnectionStateError("The database connection is closed")
    return storage.isReadOnly()
def commit(self, transaction):
"""Commit changes to an object"""
def invalidate(self, tid, oids):
    """Record that transaction 'tid' invalidated the objects in 'oids'.

    Only the first tid seen while ``_txn_time`` is unset is remembered;
    the oids are always merged into ``_invalidated``.  Both updates are
    made while holding ``_inv_lock``.
    """
    lock = self._inv_lock
    lock.acquire()
    try:
        first_invalidation = self._txn_time is None
        if first_invalidation:
            self._txn_time = tid
        self._invalidated.update(oids)
    finally:
        lock.release()
if self._savepoint_storage is not None:
self._commit_savepoint(transaction)
self._commit(transaction)
def _commit(self, transaction):
"""Commit changes to an object"""
if self._import:
# TODO: This code seems important for Zope, but needs docs
# to explain why.
self._importDuringCommit(transaction, *self._import)
self._import = None
# Just in case an object is added as a side-effect of storing
# a modified object. If, for example, a __getstate__() method
# calls add(), the newly added objects will show up in
# _added_during_commit. This sounds insane, but has actually
# happened.
# IDataManager
self._added_during_commit = []
def tpc_begin(self, transaction, sub=False):
"""Begin commit of a transaction, starting the two-phase commit."""
self._modified = []
for obj in self._registered_objects:
oid = obj._p_oid
assert oid
if oid in self._conflicts:
raise ReadConflictError(object=obj)
# _creating is a list of oids of new objects, which is used to
# remove them from the cache if a transaction aborts.
self._creating = []
if sub and self._tmp is None:
# Sub-transaction!
self._tmp = self._storage
self._storage = TmpStore(self._version, self._storage)
if obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar)
elif oid in self._added:
assert obj._p_serial == z64
elif obj._p_changed:
if oid in self._invalidated:
resolve = getattr(obj, "_p_resolveConflict", None)
if resolve is None:
raise ConflictError(object=obj)
self._modified.append(oid)
else:
# Nothing to do. It's been said that it's legal, e.g., for
# an object to set _p_changed to false after it's been
# changed and registered.
continue
self._storage.tpc_begin(transaction)
self._store_objects(ObjectWriter(obj), transaction)
def tpc_vote(self, transaction):
"""Verify that a data manager can commit the transaction."""
for obj in self._added_during_commit:
self._store_objects(ObjectWriter(obj), transaction)
self._added_during_commit = None
def _store_objects(self, writer, transaction):
for obj in writer:
oid = obj._p_oid
serial = getattr(obj, "_p_serial", z64)
if serial == z64:
# obj is a new object
self._creating.append(oid)
# Because obj was added, it is now in _creating, so it can
# be removed from _added.
self._added.pop(oid, None)
else:
if (oid in self._invalidated
and not hasattr(obj, '_p_resolveConflict')):
raise ConflictError(object=obj)
self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj
s = self._storage.store(oid, serial, p, self._version, transaction)
self._store_count += 1
# Put the object in the cache before handling the
# response, just in case the response contains the
# serial number for a newly created object
try:
vote = self._storage.tpc_vote
except AttributeError:
return
s = vote(transaction)
self._handle_serial(s)
self._cache[oid] = obj
except:
# Dang, I bet it's wrapped:
# TODO: Deprecate, then remove, this.
if hasattr(obj, 'aq_base'):
self._cache[oid] = obj.aq_base
else:
raise
self._handle_serial(s, oid)
def _handle_serial(self, store_return, oid=None, change=1):
"""Handle the returns from store() and tpc_vote() calls."""
......@@ -582,26 +554,13 @@ class Connection(ExportImport, object):
obj._p_changed = 0 # transition from changed to up-to-date
obj._p_serial = serial
def tpc_finish(self, transaction):
    """Indicate confirmation that the transaction is done.

    With no subtransaction pending (``_tmp`` is None) the storage is
    finished with a callback that reports every modified oid to the
    database for invalidation.  While a subtransaction is pending nothing
    is invalidated; the oids created in this step are instead moved to
    the front of the temporary storage's ``_creating`` list.
    """
    if self._tmp is None:
        def callback(tid):
            # The database expects a mapping whose keys are the oids.
            modified = dict.fromkeys(self._modified, 1)
            self._db.invalidate(tid, modified, self)
        self._storage.tpc_finish(transaction, callback)
    else:
        # Committing a subtransaction: there is no need to invalidate
        # anything.
        self._storage.tpc_finish(transaction)
        self._storage._creating[:0] = self._creating
        del self._creating[:]
    self._tpc_cleanup()
def tpc_abort(self, transaction):
if self._import:
self._import = None
if self._savepoint_storage is not None:
self._abort_savepoint()
self._storage.tpc_abort(transaction)
# Note: If we invalidate a non-ghostifiable object (i.e. a
......@@ -628,41 +587,59 @@ class Connection(ExportImport, object):
del obj._p_jar
self._tpc_cleanup()
def _tpc_cleanup(self):
    """Reset per-transaction state; shared by tpc_finish and tpc_abort.

    Clears the recorded conflicts, flushes invalidations right away when
    the connection is not synchronized with the transaction manager,
    marks the connection as needing to join the next transaction and
    empties the list of registered objects.
    """
    self._conflicts.clear()
    synchronized = self._synch
    if not synchronized:
        self._flush_invalidations()
    self._needs_to_join = True
    self._registered_objects = []
def _invalidate_creating(self, creating=None):
"""Disown any objects newly saved in an uncommitted transaction."""
if creating is None:
creating = self._creating
self._creating = []
def sync(self):
    """Manually update the view on the database.

    Aborts the transaction manager's current transaction, calls the
    storage's sync() method when it has one, and then flushes the
    pending invalidations.
    """
    current_txn = self._txn_mgr.get()
    current_txn.abort()

    # sync() is optional on storages; 0 stands in for "not provided".
    storage_sync = getattr(self._storage, 'sync', 0)
    if storage_sync:
        storage_sync()

    self._flush_invalidations()
for oid in creating:
o = self._cache.get(oid)
if o is not None:
del self._cache[oid]
del o._p_jar
del o._p_oid
def getDebugInfo(self):
    """Return the debug information currently attached to this connection."""
    info = self._debug_info
    return info
def tpc_vote(self, transaction):
    """Let the storage vote on the transaction, if it supports voting.

    Storages without a tpc_vote attribute are skipped silently.  Whatever
    the vote returns is handed to _handle_serial().
    """
    try:
        vote = self._storage.tpc_vote
    except AttributeError:
        # This storage has no vote phase.
        return
    else:
        vote_result = vote(transaction)
    self._handle_serial(vote_result)
def setDebugInfo(self, *args):
    """Append the given items to this connection's debug information."""
    existing = self._debug_info
    self._debug_info = existing + args
def tpc_finish(self, transaction):
    """Indicate confirmation that the transaction is done.

    The storage is finished with a callback that reports every modified
    oid to the database for invalidation; afterwards the per-transaction
    state is cleaned up.
    """
    def callback(tid):
        # The database expects a mapping whose keys are the oids.
        modified = dict.fromkeys(self._modified, 1)
        self._db.invalidate(tid, modified, self)

    self._storage.tpc_finish(transaction, callback)
    self._tpc_cleanup()
def getTransferCounts(self, clear=False):
    """Return the pair (objects loaded, objects stored).

    When 'clear' is true both counters are reset to zero after their
    values have been captured.
    """
    counts = (self._load_count, self._store_count)
    if clear:
        self._load_count = self._store_count = 0
    return counts
def sortKey(self):
    """Return a sort key for this connection.

    The key is the storage's own sort key followed by ``id(self)``.
    """
    storage_key = self._storage.sortKey()
    return "%s:%s" % (storage_key, id(self))
##############################################
# Data manager (IDataManager) methods
##########################################################################
##########################################################################
# Transaction-manager synchronization -- ISynchronizer
def beforeCompletion(self, txn):
    """Synchronization hook called before a commit starts; does nothing."""
    return None
def afterCompletion(self, txn):
    """Synchronization hook run after a transaction: flush invalidations."""
    flush = self._flush_invalidations
    flush()
# Transaction-manager synchronization -- ISynchronizer
##########################################################################
##########################################################################
# persistent.interfaces.IPersistentDatamanager
def oldstate(self, obj, tid):
......@@ -818,12 +795,24 @@ class Connection(ExportImport, object):
self._register(obj)
def _register(self, obj=None):
if obj is not None:
self._registered_objects.append(obj)
# The order here is important. We need to join before
# registering the object, because joining may take a
# savepoint, and the savepoint should not reflect the change
# to the object.
if self._needs_to_join:
self._txn_mgr.get().join(self)
self._needs_to_join = False
if obj is not None:
self._registered_objects.append(obj)
# persistent.interfaces.IPersistentDatamanager
##########################################################################
##########################################################################
# PROTECTED stuff (used by e.g. ZODB.DB.DB)
def _cache_items(self):
......@@ -862,7 +851,7 @@ class Connection(ExportImport, object):
# and Storage.
self._db = odb
self._storage = odb._storage
self._normal_storage = self._storage = odb._storage
self.new_oid = odb._storage.new_oid
self._opened = time()
if synch is not None:
......@@ -892,6 +881,7 @@ class Connection(ExportImport, object):
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
##########################################################################
# Python protocol
def __repr__(self):
......@@ -901,6 +891,10 @@ class Connection(ExportImport, object):
ver = ''
return '<Connection at %08x%s>' % (positive_id(self), ver)
# Python protocol
##########################################################################
##########################################################################
# DEPRECATION candidates
__getitem__ = get
......@@ -916,33 +910,6 @@ class Connection(ExportImport, object):
except KeyError:
return self.getVersion()
def getVersion(self):
    """Return the version this connection is attached to.

    Raises ConnectionStateError if the connection has no storage.
    """
    is_closed = self._storage is None
    if is_closed:
        raise ConnectionStateError("The database connection is closed")
    return self._version
def setklassstate(self, obj):
    """Reload the state of 'obj' in place from the storage.

    Special-case code, apparently for ZClasses: the reader gives back a
    non-ghost copy (a class cannot be ghosted), whose ``__dict__`` then
    replaces the contents of ``obj.__dict__``.  Any failure is logged and
    re-raised.
    """
    try:
        oid = obj._p_oid
        record, serial = self._storage.load(oid, self._version)

        # getGhost() is what we call, but what comes back is a fully
        # loaded object, because a class cannot be a ghost.
        fresh = self._reader.getGhost(record)

        state = obj.__dict__
        state.clear()
        state.update(fresh.__dict__)

        obj._p_oid = oid
        obj._p_jar = self
        obj._p_changed = 0
        obj._p_serial = serial
    except:
        self._log.error("setklassstate failed", exc_info=sys.exc_info())
        raise
def exchange(self, old, new):
# called by a ZClasses method that isn't executed by the test suite
oid = old._p_oid
......@@ -952,8 +919,20 @@ class Connection(ExportImport, object):
self._register(new)
self._cache[oid] = new
# DEPRECATION candidates
##########################################################################
##########################################################################
# DEPRECATED methods
def cacheFullSweep(self, dt=None):
    """Deprecated: run a full sweep of the cache.

    Always emits a deprecation warning.  'dt' is forwarded to the cache's
    full_sweep() only when it was given.
    """
    deprecated36("cacheFullSweep is deprecated. "
                 "Use cacheMinimize instead.")
    if dt is not None:
        self._cache.full_sweep(dt)
    else:
        self._cache.full_sweep()
def getTransaction(self):
"""Get the current transaction for this connection.
......@@ -986,16 +965,150 @@ class Connection(ExportImport, object):
self._txn_mgr.registerSynch(self)
return self._txn_mgr
def cacheFullSweep(self, dt=None):
    """Deprecated: run a full sweep of the cache.

    Always emits a deprecation warning.  'dt' is forwarded to the cache's
    full_sweep() only when it was given.
    """
    deprecated36("cacheFullSweep is deprecated. "
                 "Use cacheMinimize instead.")
    if dt is not None:
        self._cache.full_sweep(dt)
    else:
        self._cache.full_sweep()
# DEPRECATED methods
##########################################################################
def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
    """Deactivate all unmodified objects in the cache.

    'dt' is accepted only for backward compatibility; passing it emits a
    deprecation warning and the value is otherwise ignored.
    """
    dt_was_passed = dt is not DEPRECATED_ARGUMENT
    if dt_was_passed:
        deprecated36("cacheMinimize() dt= is ignored.")
    self._cache.minimize()
#####################################################################
# Savepoint support
def savepoint(self):
    """Save the current changes into temporary storage; return a Savepoint.

    The first call switches the connection's storage over to a fresh
    TmpStore layered on the normal storage.  The registered objects are
    then written with _commit(None), the oids created while doing so are
    handed to the temporary storage, and the returned Savepoint records
    the store's position together with a copy of its index.
    """
    if self._savepoint_storage is None:
        tmp_store = TmpStore(self._version, self._normal_storage)
        self._savepoint_storage = tmp_store
        self._storage = tmp_store

    # Collect the oids created by this _commit() separately so they can
    # be moved to the temporary storage's own list afterwards.
    self._creating = []
    self._commit(None)
    self._storage.creating.extend(self._creating)
    del self._creating[:]
    self._registered_objects = []

    store = self._storage
    snapshot = (store.position, store.index.copy())
    return Savepoint(self, snapshot)
def _rollback(self, state):
    """Roll the connection back to the savepoint described by 'state'.

    'state' is the (position, index) pair captured by savepoint().
    Pending changes are aborted, every object currently in the temporary
    storage's index is invalidated in the cache, and the temporary
    storage is reset to the saved state.
    """
    self._abort()
    tmp_store = self._storage
    self._cache.invalidate(tmp_store.index)
    position, index = state
    tmp_store.reset(position, index)
def _commit_savepoint(self, transaction):
    """Copy everything held in savepoint storage into the normal storage.

    The connection is switched back to the normal storage first.  The
    oids and creation records from the temporary storage are merged into
    this connection's bookkeeping, every temporary record is stored into
    the normal storage as part of 'transaction', and the temporary
    storage is closed.
    """
    tmp_store = self._savepoint_storage
    self._storage = self._normal_storage
    self._savepoint_storage = None

    self._log.debug("Commiting savepoints of size %s", tmp_store.getSize())
    saved_oids = tmp_store.index.keys()

    # Carry over the invalidation and creation information collected in
    # the temporary storage.
    self._modified.extend(saved_oids)
    self._creating.extend(tmp_store.creating)

    for oid in saved_oids:
        data, serial = tmp_store.load(oid, tmp_store)
        result = self._storage.store(oid, serial, data,
                                     self._version, transaction)
        self._handle_serial(result, oid, change=False)

    tmp_store.close()
def _abort_savepoint(self):
    """Throw away everything held in savepoint storage.

    The connection is switched back to the normal storage, every object
    in the temporary storage's index is invalidated in the cache, the
    objects created under the savepoints are disowned, and the temporary
    storage is closed.
    """
    tmp_store = self._savepoint_storage
    self._storage = self._normal_storage
    self._savepoint_storage = None

    # Note: If we invalidate a non-ghostifiable object (i.e. a
    # persistent class), the object will immediately reread its
    # state.  That means that the following call could result in a
    # call to self.setstate, which, of course, must succeed.  In
    # general, it would be better if the read could be delayed
    # until the start of the next transaction.  If we read at the
    # end of a transaction and if the object was invalidated
    # during this transaction, then we'll read non-current data,
    # which we'll discard later in transaction finalization.  We
    # could, theoretically queue this invalidation by calling
    # self.invalidate.  Unfortunately, attempts to make that
    # change resulted in mysterious test failures.  It's pretty
    # unlikely that the object we are invalidating was invalidated
    # by another thread, so the risk of a reread is pretty low.
    # It's really not worth the effort to pursue this.

    self._cache.invalidate(tmp_store.index)
    self._invalidate_creating(tmp_store.creating)
    tmp_store.close()
# Savepoint support
#####################################################################
class Savepoint:
    """Handle on a savepoint that can later be rolled back to.

    It just remembers the data manager that created it and the opaque
    state that data manager wants back on rollback.
    """

    def __init__(self, datamanager, state):
        self.datamanager, self.state = datamanager, state

    def rollback(self):
        """Ask the data manager to return to the remembered state."""
        manager = self.datamanager
        manager._rollback(self.state)
class TmpStore:
    """A storage-like thing to support savepoints.

    Records passed to store() are appended to an anonymous temporary
    file, and ``index`` maps each oid to the file offset of its most
    recent record.  Oids that are not in the index are loaded from the
    wrapped storage.

    Each record is laid out as: packed oid length (8 bytes), the oid,
    the serial (8 bytes), packed data length (8 bytes), then the data.
    """

    def __init__(self, base_version, storage):
        """Wrap 'storage', whose data lives in version 'base_version'."""
        self._storage = storage
        # These calls are answered by the wrapped storage directly.
        for method in (
            'getName', 'new_oid', 'modifiedInVersion', 'getSize',
            'undoLog', 'versionEmpty', 'sortKey',
            ):
            setattr(self, method, getattr(storage, method))

        self._base_version = base_version
        self._file = tempfile.TemporaryFile()
        # position: offset in the file at which the next record is written.
        # A plain 0 is enough: ints are promoted to long automatically.
        self.position = 0
        # index: map oid to the file offset of its latest record
        self.index = {}
        # creating: oids of objects created under the savepoints
        self.creating = []

    def __len__(self):
        """Return the number of distinct oids held in the temporary file."""
        return len(self.index)

    def close(self):
        """Close (and thereby discard) the temporary file."""
        self._file.close()

    def load(self, oid, version):
        """Return (data, serial) for 'oid'.

        'version' is ignored.  Oids that were never stored here are
        loaded from the wrapped storage using the base version.

        Raises POSException.StorageSystemError if the record found at the
        indexed offset does not belong to 'oid'.
        """
        pos = self.index.get(oid)
        if pos is None:
            return self._storage.load(oid, self._base_version)
        self._file.seek(pos)
        h = self._file.read(8)
        oidlen = u64(h)
        read_oid = self._file.read(oidlen)
        if read_oid != oid:
            raise POSException.StorageSystemError('Bad temporary storage')
        h = self._file.read(16)
        size = u64(h[8:])
        serial = h[:8]
        return self._file.read(size), serial

    def store(self, oid, serial, data, version, transaction):
        """Append a record for 'oid' and return the serial that was written.

        A serial of None is written as z64.  'transaction' is not used.
        """
        # we have this funny signature so we can reuse the normal non-commit
        # commit logic
        assert version == self._base_version
        self._file.seek(self.position)
        l = len(data)
        if serial is None:
            serial = z64
        header = p64(len(oid)) + oid + serial + p64(l)
        self._file.write(header)
        self._file.write(data)
        self.index[oid] = self.position
        self.position += l + len(header)
        return serial

    def reset(self, position, index):
        """Return to an earlier state: truncate the file, restore the index.

        'index' is copied rather than adopted.  The caller is typically a
        savepoint that keeps hold of the very same dict; if we bound
        self.index to it, later store() calls would modify the
        savepoint's saved state and a second rollback to that savepoint
        would restore the wrong index.
        """
        self._file.truncate(position)
        self.position = position
        self.index = index.copy()
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from ZODB import POSException
from ZODB.utils import p64, u64, z64
import tempfile
class TmpStore:
    """A storage to support subtransactions.

    Records are appended to an anonymous temporary file.  Records written
    during the current two-phase commit are tracked in ``_tindex`` and
    only become visible through ``_index`` once tpc_finish() is called.

    Each record is laid out as: packed oid length (8 bytes), the oid,
    the serial (8 bytes), packed data length (8 bytes), then the data.
    """

    _bver = ''

    def __init__(self, base_version, storage):
        """Wrap 'storage'; a true 'base_version' overrides the default ''."""
        self._transaction = None
        self._storage = storage
        if base_version:
            self._bver = base_version
        self._file = tempfile.TemporaryFile()
        # _pos: current file position
        # _tpos: file position at last commit point
        # A plain 0 is enough: ints are promoted to long automatically.
        self._pos = self._tpos = 0
        # _index: map oid to pos of last committed version
        self._index = {}
        # _tindex: map oid to pos for new updates
        self._tindex = {}
        self._creating = []

    def close(self):
        """Close (and thereby discard) the temporary file."""
        self._file.close()

    def getName(self):
        """Return the name of the wrapped storage."""
        return self._storage.getName()

    def getSize(self):
        """Return the current write position in the temporary file."""
        return self._pos

    def load(self, oid, version):
        """Return (data, serial) for 'oid'.

        'version' is ignored.  Oids with no committed record here are
        loaded from the wrapped storage using the base version.

        Raises POSException.StorageSystemError if the record found at the
        indexed offset does not belong to 'oid'.
        """
        pos = self._index.get(oid)
        if pos is None:
            return self._storage.load(oid, self._bver)
        self._file.seek(pos)
        h = self._file.read(8)
        oidlen = u64(h)
        read_oid = self._file.read(oidlen)
        if read_oid != oid:
            raise POSException.StorageSystemError('Bad temporary storage')
        h = self._file.read(16)
        size = u64(h[8:])
        serial = h[:8]
        return self._file.read(size), serial

    def sortKey(self):
        """Return the sort key of the wrapped storage."""
        return self._storage.sortKey()

    # TODO: clarify difference between self._storage & self._db._storage

    def modifiedInVersion(self, oid):
        """Return the base version for oids committed here.

        Other oids are answered by the wrapped storage.
        """
        if oid in self._index:
            return self._bver
        return self._storage.modifiedInVersion(oid)

    def new_oid(self):
        """Return a new oid allocated by the wrapped storage."""
        return self._storage.new_oid()

    def registerDB(self, db, limit):
        """Accept and ignore a database registration."""
        pass

    def store(self, oid, serial, data, version, transaction):
        """Append a record for 'oid' and return the serial that was written.

        A serial of None is written as z64.  'version' is not used.

        Raises POSException.StorageTransactionError if 'transaction' is
        not the one passed to tpc_begin().
        """
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)
        self._file.seek(self._pos)
        l = len(data)
        if serial is None:
            serial = z64
        header = p64(len(oid)) + oid + serial + p64(l)
        self._file.write(header)
        self._file.write(data)
        self._tindex[oid] = self._pos
        self._pos += l + len(header)
        return serial

    def tpc_abort(self, transaction):
        """Forget uncommitted records; rewind to the last commit point.

        Calls for any transaction other than the current one are ignored.
        """
        if transaction is not self._transaction:
            return
        self._tindex.clear()
        self._transaction = None
        self._pos = self._tpos

    def tpc_begin(self, transaction):
        """Start collecting records for 'transaction'.

        Beginning the transaction that is already current is a no-op.
        """
        if self._transaction is transaction:
            return
        self._transaction = transaction
        self._tindex.clear() # Just to be sure!
        self._pos = self._tpos

    def tpc_vote(self, transaction):
        """Voting always succeeds; nothing to do."""
        pass

    def tpc_finish(self, transaction, f=None):
        """Make the records written during 'transaction' visible.

        'f', when given, is called with no arguments before the index is
        updated.  Calls for any transaction other than the current one
        are ignored.
        """
        if transaction is not self._transaction:
            return
        if f is not None:
            f()
        self._index.update(self._tindex)
        self._tindex.clear()
        self._tpos = self._pos

    def undoLog(self, first, last, filter=None):
        """Return an empty undo log."""
        return ()

    def versionEmpty(self, version):
        """Return the number of committed oids for the base version.

        For any other version the result is None.
        """
        # TODO: what is this supposed to do?
        if version == self._bver:
            return len(self._index)
......@@ -294,7 +294,7 @@ class UserMethodTests(unittest.TestCase):
>>> cn.close() # this was succeeding
Traceback (most recent call last):
...
ConnectionStateError: Cannot close a connection with a pending subtransaction
ConnectionStateError: Cannot close a connection joined to a transaction
Again this leaves the connection as it was.
>>> transaction.commit()
......
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of savepoint feature
$Id$
"""
import unittest
from zope.testing import doctest
def test_suite():
    # Build the suite step by step: one doc-file test for the savepoint
    # narrative in testConnectionSavepoint.txt.
    suite = unittest.TestSuite()
    suite.addTest(doctest.DocFileSuite('testConnectionSavepoint.txt'))
    return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
Savepoints
==========
Savepoints provide a way to save to disk intermediate work done during
a transaction allowing:
- partial transaction (subtransaction) rollback (abort)
- state of saved objects to be freed, freeing on-line memory for other
uses
Savepoints make it possible to write atomic subroutines that don't
make top-level transaction commitments.
Applications
------------
To demonstrate how savepoints work with transactions, we'll show an example.
>>> import ZODB.tests.util
>>> db = ZODB.tests.util.DB()
>>> connection = db.open()
>>> root = connection.root()
>>> root['name'] = 'bob'
As with other data managers, we can commit changes:
>>> import transaction
>>> transaction.commit()
>>> root['name']
'bob'
and abort changes:
>>> root['name'] = 'sally'
>>> root['name']
'sally'
>>> transaction.abort()
>>> root['name']
'bob'
Now, let's look at an application that manages funds for people.
It allows deposits and debits to be entered for multiple people.
It accepts a sequence of entries and generates a sequence of status
messages. For each entry, it applies the change and then validates
the user's account. If the user's account is invalid, we roll back
the change for that entry. The success or failure of an entry is
indicated in the output status. First we'll initialize some accounts:
>>> root['bob-balance'] = 0.0
>>> root['bob-credit'] = 0.0
>>> root['sally-balance'] = 0.0
>>> root['sally-credit'] = 100.0
>>> transaction.commit()
Now, we'll define a validation function to validate an account:
>>> def validate_account(name):
... if root[name+'-balance'] + root[name+'-credit'] < 0:
... raise ValueError('Overdrawn', name)
And a function to apply entries. If the function fails in some
unexpected way, it rolls back all of its changes and
prints the error:
>>> def apply_entries(entries):
... savepoint = transaction.savepoint()
... try:
... for name, amount in entries:
... entry_savepoint = transaction.savepoint()
... try:
... root[name+'-balance'] += amount
... validate_account(name)
... except ValueError, error:
... entry_savepoint.rollback()
... print 'Error', str(error)
... else:
... print 'Updated', name
... except Exception, error:
... savepoint.rollback()
... print 'Unexpected exception', error
Now let's try applying some entries:
>>> apply_entries([
... ('bob', 10.0),
... ('sally', 10.0),
... ('bob', 20.0),
... ('sally', 10.0),
... ('bob', -100.0),
... ('sally', -100.0),
... ])
Updated bob
Updated sally
Updated bob
Updated sally
Error ('Overdrawn', 'bob')
Updated sally
>>> root['bob-balance']
30.0
>>> root['sally-balance']
-80.0
If we provide entries that cause an unexpected error:
>>> apply_entries([
... ('bob', 10.0),
... ('sally', 10.0),
... ('bob', '20.0'),
... ('sally', 10.0),
... ])
Updated bob
Updated sally
Unexpected exception unsupported operand type(s) for +=: 'float' and 'str'
Because apply_entries used a savepoint for the entire function,
it was able to rollback the partial changes without rolling back
changes made in the previous call to apply_entries:
>>> root['bob-balance']
30.0
>>> root['sally-balance']
-80.0
If we now abort the outer transaction, the earlier changes will go
away:
>>> transaction.abort()
>>> root['bob-balance']
0.0
>>> root['sally-balance']
0.0
......@@ -550,13 +550,17 @@ class ZODBTests(unittest.TestCase):
self.assertEqual(rt['a'], 1)
rt['b'] = 2
# Subtransactions don't do tpc_vote, so we poison tpc_begin.
poisoned = PoisonedObject(PoisonedJar(break_tpc_begin=True))
transaction.get().register(poisoned)
poisoned = PoisonedJar()
transaction.get().join(poisoned)
poisoned.break_savepoint = True
self.assertRaises(PoisonedError, transaction.get().commit, True)
# Trying to subtxn-commit again fails too.
self.assertRaises(TransactionFailedError, transaction.get().commit, True)
self.assertRaises(TransactionFailedError, transaction.get().commit, True)
self.assertRaises(TransactionFailedError,
transaction.get().commit, True)
self.assertRaises(TransactionFailedError,
transaction.get().commit, True)
# Top-level commit also fails.
self.assertRaises(TransactionFailedError, transaction.get().commit)
......@@ -568,6 +572,7 @@ class ZODBTests(unittest.TestCase):
# also raises TransactionFailedError.
self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
# Clean up via abort(), and try again.
transaction.get().abort()
rt['a'] = 1
......@@ -576,13 +581,18 @@ class ZODBTests(unittest.TestCase):
# Cleaning up via begin() should also work.
rt['a'] = 2
transaction.get().register(poisoned)
poisoned = PoisonedJar()
transaction.get().join(poisoned)
poisoned.break_savepoint = True
self.assertRaises(PoisonedError, transaction.get().commit, True)
self.assertRaises(TransactionFailedError, transaction.get().commit, True)
self.assertRaises(TransactionFailedError,
transaction.get().commit, True)
# The change to rt['a'] is lost.
self.assertEqual(rt['a'], 1)
# Trying to modify an object also fails.
self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
# Clean up via begin(), and try again.
transaction.begin()
rt['a'] = 2
......@@ -603,9 +613,11 @@ class PoisonedError(Exception):
# PoisonedJar arranges to raise exceptions from interesting places.
# For whatever reason, subtransaction commits don't call tpc_vote.
class PoisonedJar:
def __init__(self, break_tpc_begin=False, break_tpc_vote=False):
def __init__(self, break_tpc_begin=False, break_tpc_vote=False,
break_savepoint=False):
self.break_tpc_begin = break_tpc_begin
self.break_tpc_vote = break_tpc_vote
self.break_savepoint = break_savepoint
def sortKey(self):
return str(id(self))
......@@ -620,13 +632,9 @@ class PoisonedJar:
if self.break_tpc_vote:
raise PoisonedError("tpc_vote fails")
# commit_sub is needed else this jar is ignored during subtransaction
# commit.
def commit_sub(*args):
pass
def abort_sub(*args):
pass
def savepoint(self):
if self.break_savepoint:
raise PoisonedError("savepoint fails")
def commit(*args):
pass
......
......@@ -238,569 +238,6 @@ def test_tpc_abort_phase2():
False
"""
def test_commit_w_subtransactions():
"""
So, we have a data manager:
>>> dm = DataManager()
and we do some work that modifies uncommitted state:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 1)
Now we'll commit the changes in a subtransaction. When the data
manager joins a transaction, the transaction will create an
adapter.
>>> dma = DataManagerAdapter(dm)
and register it as a modified object. At commit time, the
transaction will get the "jar" like this:
>>> jar = getattr(dma, '_p_jar', dma)
and, of course, the jar and the adapter will be the same:
>>> jar is dma
True
The transaction will call tpc_begin:
>>> t1 = '1'
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
Then the transaction will call commit on the jar:
>>> jar.commit(t1)
This doesn't actually do anything. :)
>>> dm.state, dm.delta
(0, 1)
The transaction will then call tpc_vote:
>>> jar.tpc_vote(t1)
This doesn't do anything either, because zodb4 data managers don't
actually do two-phase commit for subtransactions.
>>> dm.state, dm.delta
(0, 1)
Finally, we call tpc_finish. This does actually create a savepoint,
but we can't really tell that from outside.
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 1)
We'll do more of the above:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 2)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 2)
>>> dm.inc()
>>> dm.state, dm.delta
(0, 3)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 3)
Note that the above works *because* the same transaction is used
for each subtransaction.
Finally, we'll do a little more work:
>>> dm.inc()
>>> dm.inc()
>>> dm.state, dm.delta
(0, 5)
and then commit the top-level transaction.
The transaction will actually go through the steps for a subtransaction:
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
And then call commit_sub:
>>> jar.commit_sub(t1)
As usual, this doesn't actually do anything. ;)
>>> dm.state, dm.delta
(0, 5)
The transaction manager doesn't call tpc_begin, because commit_sub
implies the start of two-phase commit. Next, it does call commit:
>>> jar.commit(t1)
which doesn't do anything.
Finally, the transaction calls tpc_vote:
>>> jar.tpc_vote(t1)
which actually does something (because this is the top-level txn):
>>> dm.state, dm.delta
(5, 5)
>>> dm.prepared
True
Finally, tpc_finish is called:
>>> jar.tpc_finish(t1)
and the data manager finishes the two-phase commit:
>>> dm.state, dm.delta
(5, 0)
>>> dm.prepared
False
"""
def test_commit_w_subtransactions_featuring_subtransaction_abort():
"""
So, we have a data manager:
>>> dm = DataManager()
and we do some work that modifies uncommitted state:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 1)
Now we'll commit the changes in a subtransaction. When the data
manager joins a transaction, the transaction will create an
adapter.
>>> dma = DataManagerAdapter(dm)
and register it as a modified object. At commit time, the
transaction will get the "jar" like this:
>>> jar = getattr(dma, '_p_jar', dma)
and, of course, the jar and the adapter will be the same:
>>> jar is dma
True
The transaction will call tpc_begin:
>>> t1 = '1'
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
Then the transaction will call commit on the jar:
>>> jar.commit(t1)
This doesn't actually do anything. :)
>>> dm.state, dm.delta
(0, 1)
The transaction will then call tpc_vote:
>>> jar.tpc_vote(t1)
This doesn't do anything either, because zodb4 data managers don't
actually do two-phase commit for subtransactions.
>>> dm.state, dm.delta
(0, 1)
Finally, we call tpc_finish. This does actually create a savepoint,
but we can't really tell that from outside.
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 1)
We'll do more of the above:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 2)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 2)
>>> dm.inc()
>>> dm.state, dm.delta
(0, 3)
But then we'll decide to abort a subtransaction.
The transaction will just call abort as usual:
>>> jar.abort(t1)
This will cause a rollback to the last savepoint:
>>> dm.state, dm.delta
(0, 2)
Then we do more work:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 3)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 3)
Note that the above works *because* the same transaction is used
for each subtransaction.
Finally, we'll do a little more work:
>>> dm.inc()
>>> dm.inc()
>>> dm.state, dm.delta
(0, 5)
and then commit the top-level transaction.
The transaction will actually go through the steps for a subtransaction:
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
And then call commit_sub:
>>> jar.commit_sub(t1)
As usual, this doesn't actually do anything. ;)
>>> dm.state, dm.delta
(0, 5)
The transaction manager doesn't call tpc_begin, because commit_sub
implies the start of two-phase commit. Next, it does call commit:
>>> jar.commit(t1)
which doesn't do anything.
Finally, the transaction calls tpc_vote:
>>> jar.tpc_vote(t1)
which actually does something (because this is the top-level txn):
>>> dm.state, dm.delta
(5, 5)
>>> dm.prepared
True
Finally, tpc_finish is called:
>>> jar.tpc_finish(t1)
and the data manager finishes the two-phase commit:
>>> dm.state, dm.delta
(5, 0)
>>> dm.prepared
False
"""
def test_abort_w_subtransactions():
"""
So, we have a data manager:
>>> dm = DataManager()
and we do some work that modifies uncommitted state:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 1)
Now we'll commit the changes in a subtransaction. When the data
manager joins a transaction, the transaction will create an
adapter.
>>> dma = DataManagerAdapter(dm)
and register it as a modified object. At commit time, the
transaction will get the "jar" like this:
>>> jar = getattr(dma, '_p_jar', dma)
and, of course, the jar and the adapter will be the same:
>>> jar is dma
True
The transaction will call tpc_begin:
>>> t1 = '1'
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
Then the transaction will call commit on the jar:
>>> jar.commit(t1)
This doesn't actually do anything. :)
>>> dm.state, dm.delta
(0, 1)
The transaction will then call tpc_vote:
>>> jar.tpc_vote(t1)
This doesn't do anything either, because zodb4 data managers don't
actually do two-phase commit for subtransactions.
>>> dm.state, dm.delta
(0, 1)
Finally, we call tpc_finish. This does actually create a savepoint,
but we can't really tell that from outside.
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 1)
We'll do more of the above:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 2)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 2)
>>> dm.inc()
>>> dm.state, dm.delta
(0, 3)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 3)
Note that the above works *because* the same transaction is used
for each subtransaction.
Finally, we'll do a little more work:
>>> dm.inc()
>>> dm.inc()
>>> dm.state, dm.delta
(0, 5)
and then abort the top-level transaction.
The transaction first calls abort on the jar:
>>> jar.abort(t1)
This will have the effect of aborting the subtransaction:
>>> dm.state, dm.delta
(0, 3)
Then the transaction will call abort_sub:
>>> jar.abort_sub(t1)
This will abort all of the subtransactions:
>>> dm.state, dm.delta
(0, 0)
"""
def test_tpc_abort_w_subtransactions_featuring_subtransaction_abort():
"""
So, we have a data manager:
>>> dm = DataManager()
and we do some work that modifies uncommitted state:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 1)
Now we'll commit the changes in a subtransaction. When the data
manager joins a transaction, the transaction will create an
adapter.
>>> dma = DataManagerAdapter(dm)
and register it as a modified object. At commit time, the
transaction will get the "jar" like this:
>>> jar = getattr(dma, '_p_jar', dma)
and, of course, the jar and the adapter will be the same:
>>> jar is dma
True
The transaction will call tpc_begin:
>>> t1 = '1'
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
Then the transaction will call commit on the jar:
>>> jar.commit(t1)
This doesn't actually do anything. :)
>>> dm.state, dm.delta
(0, 1)
The transaction will then call tpc_vote:
>>> jar.tpc_vote(t1)
This doesn't do anything either, because zodb4 data managers don't
actually do two-phase commit for subtransactions.
>>> dm.state, dm.delta
(0, 1)
Finally, we call tpc_finish. This does actually create a savepoint,
but we can't really tell that from outside.
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 1)
We'll do more of the above:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 2)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 2)
>>> dm.inc()
>>> dm.state, dm.delta
(0, 3)
But then we'll decide to abort a subtransaction.
The transaction will just call abort as usual:
>>> jar.abort(t1)
This will cause a rollback to the last savepoint:
>>> dm.state, dm.delta
(0, 2)
Then we do more work:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 3)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 3)
Note that the above works *because* the same transaction is used
for each subtransaction.
Finally, we'll do a little more work:
>>> dm.inc()
>>> dm.inc()
>>> dm.state, dm.delta
(0, 5)
and then commit the top-level transaction.
The transaction will actually go through the steps for a subtransaction:
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
And then call commit_sub:
>>> jar.commit_sub(t1)
As usual, this doesn't actually do anything. ;)
>>> dm.state, dm.delta
(0, 5)
The transaction manager doesn't call tpc_begin, because commit_sub
implies the start of two-phase commit. Next, it does call commit:
>>> jar.commit(t1)
which doesn't do anything.
Finally, the transaction calls tpc_vote:
>>> jar.tpc_vote(t1)
which actually does something (because this is the top-level txn):
>>> dm.state, dm.delta
(5, 5)
>>> dm.prepared
True
Now, at the last minute, the transaction is aborted (possibly due
to a "no vote" from another data manager):
>>> jar.tpc_abort(t1)
And the changes are undone:
>>> dm.state, dm.delta
(0, 0)
>>> dm.prepared
False
"""
def test_suite():
    # Return the suite built by DocTestSuite() called with no arguments.
    # NOTE(review): presumably this collects the doctests in this
    # module's docstrings -- confirm which DocTestSuite is imported.
    return DocTestSuite()
......
......@@ -24,6 +24,7 @@ get = manager.get
begin = manager.begin
commit = manager.commit
abort = manager.abort
savepoint = manager.savepoint
def get_transaction():
from ZODB.utils import deprecated36
......
......@@ -67,6 +67,9 @@ class TransactionManager(object):
def abort(self, sub=False):
self.get().abort(sub)
def savepoint(self, optimistic=False):
    """Create a savepoint on the transaction returned by self.get().

    The `optimistic` flag is passed through unchanged, and whatever that
    transaction's savepoint() returns is handed back to the caller.
    """
    return self.get().savepoint(optimistic)
class ThreadTransactionManager(TransactionManager):
"""Thread-aware transaction manager.
......
......@@ -30,6 +30,8 @@ registers its _p_jar attribute. TODO: explain adapter
Subtransactions
---------------
Note: Subtransactions are deprecated!
A subtransaction applies the transaction notion recursively. It
allows a set of modifications within a transaction to be committed or
aborted as a group. A subtransaction is a strictly local activity;
......@@ -82,7 +84,7 @@ calls the following four methods on each resource manager; it calls
tpc_begin() on each resource manager before calling commit() on any of
them.
1. tpc_begin(txn, subtransaction=False)
1. tpc_begin(txn)
2. commit(txn)
3. tpc_vote(txn)
4. tpc_finish(txn)
......@@ -90,6 +92,8 @@ them.
Subtransaction commit
---------------------
Note: Subtransactions are deprecated!
When a subtransaction commits, the protocol is different.
1. tpc_begin() is passed a second argument, which indicates that a
......@@ -128,8 +132,7 @@ uncommitted object, including the object that failed in its commit(),
call abort().
Once uncommitted objects are aborted, tpc_abort() or abort_sub() is
called on each resource manager. abort_sub() is called if the
resource manager was involved in a subtransaction.
called on each resource manager.
Synchronization
---------------
......@@ -213,14 +216,6 @@ class Transaction(object):
self.log = logging.getLogger("txn.%d" % thread.get_ident())
self.log.debug("new transaction")
# _sub contains all of the resource managers involved in
# subtransactions. It maps id(a resource manager) to the resource
# manager.
self._sub = {}
# _nonsub contains all the resource managers that do not support
# subtransactions that were involved in subtransaction commits.
self._nonsub = {}
# If a commit fails, the traceback is saved in _failure_traceback.
# If another attempt is made to commit, TransactionFailedError is
# raised, incorporating this traceback.
......@@ -231,6 +226,9 @@ class Transaction(object):
# inefficient for FIFO access of this kind.
self._before_commit = []
# Keep track of the last savepoint
self._last_savepoint = None
# Raise TransactionFailedError, due to commit()/join()/register()
# getting called when the current transaction has already suffered
# a commit failure.
......@@ -258,6 +256,34 @@ class Transaction(object):
resource = DataManagerAdapter(resource)
self._resources.append(resource)
if self._last_savepoint is not None:
self._last_savepoint.join(resource)
def savepoint(self, optimistic=False):
    """Create, record and return a Savepoint covering current resources.

    Every resource in self._resources is joined to the new savepoint.
    If anything raises while doing so, the resources are cleaned up and
    the failure is recorded via _saveCommitishError(), which re-raises.
    On success the savepoint is linked after the previous one (if any)
    and becomes self._last_savepoint.

    A transaction whose status is COMMITFAILED raises instead of
    creating a savepoint.
    """
    if self.status is Status.COMMITFAILED:
        self._prior_commit_failed() # doesn't return, it raises

    try:
        savepoint = Savepoint(optimistic)
        for resource in self._resources:
            savepoint.join(resource)
    except:
        # Deliberately catches everything: the calls below record the
        # failure and re-raise the original exception.
        self._cleanup(self._resources)
        self._saveCommitishError() # doesn't return, it raises!

    # Maintain the doubly linked chain of savepoints so that they can
    # be invalidated in either direction later.
    if self._last_savepoint is not None:
        savepoint.previous = self._last_savepoint
        self._last_savepoint.next = savepoint
    self._last_savepoint = savepoint

    return savepoint
def _invalidate_last_savepoint(self):
    """Invalidate the whole savepoint chain and forget it.

    Does nothing when no savepoint has been recorded.
    """
    # Invalidate the last savepoint and any previous
    # savepoints. This is done on a commit or abort.
    if self._last_savepoint is not None:
        self._last_savepoint._invalidate_previous()
        # Drop the reference so later savepoints start a fresh chain.
        self._last_savepoint = None
def register(self, obj):
# The old way of registering transaction participants.
#
......@@ -273,9 +299,6 @@ class Transaction(object):
raise ValueError("Register with no manager")
adapter = self._adapters.get(manager)
if adapter is None:
if myhasattr(manager, "commit_sub"):
adapter = MultiObjectResourceAdapterSub(manager)
else:
adapter = MultiObjectResourceAdapter(manager)
adapter.objects.append(obj)
self._adapters[manager] = adapter
......@@ -286,46 +309,46 @@ class Transaction(object):
assert id(obj) not in map(id, adapter.objects)
adapter.objects.append(obj)
# In the presence of subtransactions, an existing adapter
# might be in _adapters but not in _resources.
if adapter not in self._resources:
self._resources.append(adapter)
def begin(self):
from ZODB.utils import deprecated36
deprecated36("Transaction.begin() should no longer be used; use "
"the begin() method of a transaction manager.")
if (self._resources or
self._sub or
self._nonsub or
self._synchronizers):
if (self._resources or self._synchronizers):
self.abort()
# Else aborting wouldn't do anything, except if _manager is non-None,
# in which case it would do nothing besides uselessly free() this
# transaction.
def commit(self, subtransaction=False):
self._invalidate_last_savepoint()
if subtransaction:
# TODO deprecate subtransactions
self.savepoint(1)
return
if self.status is Status.COMMITFAILED:
self._prior_commit_failed() # doesn't return
if not subtransaction:
self._callBeforeCommitHooks()
if not subtransaction and self._sub and self._resources:
# This commit is for a top-level transaction that has
# previously committed subtransactions. Do one last
# subtransaction commit to clear out the current objects,
# then commit all the subjars.
self.commit(True)
if not subtransaction:
self._synchronizers.map(lambda s: s.beforeCompletion(self))
self.status = Status.COMMITTING
try:
self._commitResources(subtransaction)
self._commitResources()
except:
self._saveCommitishError() # This raises!
self.status = Status.COMMITTED
if self._manager:
self._manager.free(self)
self._synchronizers.map(lambda s: s.afterCompletion(self))
self.log.debug("commit")
def _saveCommitishError(self):
self.status = Status.COMMITFAILED
# Save the traceback for TransactionFailedError.
ft = self._failure_traceback = StringIO()
......@@ -338,14 +361,6 @@ class Transaction(object):
ft.writelines(traceback.format_exception_only(t, v))
raise t, v, tb
if subtransaction:
self._resources = []
else:
self.status = Status.COMMITTED
if self._manager:
self._manager.free(self)
self._synchronizers.map(lambda s: s.afterCompletion(self))
self.log.debug("commit")
def beforeCommitHook(self, hook, *args, **kws):
self._before_commit.append((hook, args, kws))
......@@ -357,28 +372,17 @@ class Transaction(object):
hook, args, kws = self._before_commit.pop(0)
hook(*args, **kws)
def _commitResources(self, subtransaction):
def _commitResources(self):
# Execute the two-phase commit protocol.
L = self._getResourceManagers(subtransaction)
L = list(self._resources)
L.sort(rm_cmp)
try:
for rm in L:
# If you pass subtransaction=True to tpc_begin(), it
# will create a temporary storage for the duration of
# the transaction. To signal that the top-level
# transaction is committing, you must then call
# commit_sub().
if not subtransaction and id(rm) in self._sub:
del self._sub[id(rm)]
rm.commit_sub(self)
else:
rm.tpc_begin(self, subtransaction)
rm.tpc_begin(self)
for rm in L:
rm.commit(self)
self.log.debug("commit %r" % rm)
if not subtransaction:
# Not sure why, but it is intentional that you do not
# call tpc_vote() for subtransaction commits.
for rm in L:
rm.tpc_vote(self)
self._voted[id(rm)] = True
......@@ -401,7 +405,6 @@ class Transaction(object):
try:
self._cleanup(L)
finally:
if not subtransaction:
self._synchronizers.map(lambda s: s.afterCompletion(self))
raise t, v, tb
......@@ -415,68 +418,30 @@ class Transaction(object):
self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info())
for rm in L:
if id(rm) in self._sub:
try:
rm.abort_sub(self)
except Exception:
self.log.error("Error in abort_sub() on manager %s",
rm, exc_info=sys.exc_info())
else:
try:
rm.tpc_abort(self)
except Exception:
self.log.error("Error in tpc_abort() on manager %s",
rm, exc_info=sys.exc_info())
def _getResourceManagers(self, subtransaction):
L = []
if subtransaction:
# If we are in a subtransaction, make sure all resource
# managers are placed in either _sub or _nonsub. When
# the top-level transaction commits, we need to merge
# these back into the resource set.
# If a data manager doesn't support sub-transactions, we
# don't do anything with it now. (That's somewhat okay,
# because subtransactions are mostly just an
# optimization.) Save it until the top-level transaction
# commits.
def abort(self, subtransaction=False):
for rm in self._resources:
if myhasattr(rm, "commit_sub"):
self._sub[id(rm)] = rm
L.append(rm)
else:
self._nonsub[id(rm)] = rm
else:
if self._sub or self._nonsub:
# Merge all of _sub, _nonsub, and _resources.
d = dict(self._sub)
d.update(self._nonsub)
# TODO: I think _sub and _nonsub are disjoint, and that
# _resources is empty. If so, we can simplify this code.
assert len(d) == len(self._sub) + len(self._nonsub)
assert not self._resources
for rm in self._resources:
d[id(rm)] = rm
L = d.values()
else:
L = list(self._resources)
if subtransaction:
# TODO deprecate subtransactions
if not self._last_savepoint:
raise interfaces.InvalidSavepointRollbackError
if self._last_savepoint.valid:
# We're supposed to be able to call abort(1) multiple
# times. Sigh.
self._last_savepoint.rollback()
return
L.sort(rm_cmp)
return L
self._invalidate_last_savepoint()
def abort(self, subtransaction=False):
if not subtransaction:
self._synchronizers.map(lambda s: s.beforeCompletion(self))
if subtransaction and self._nonsub:
from ZODB.POSException import TransactionError
raise TransactionError("Resource manager does not support "
"subtransaction abort")
tb = None
for rm in self._resources + self._nonsub.values():
for rm in self._resources:
try:
rm.abort(self)
except:
......@@ -485,20 +450,11 @@ class Transaction(object):
self.log.error("Failed to abort resource manager: %s",
rm, exc_info=sys.exc_info())
if not subtransaction:
for rm in self._sub.values():
try:
rm.abort_sub(self)
except:
if tb is None:
t, v, tb = sys.exc_info()
self.log.error("Failed to abort_sub resource manager: %s",
rm, exc_info=sys.exc_info())
if not subtransaction:
if self._manager:
self._manager.free(self)
self._synchronizers.map(lambda s: s.afterCompletion(self))
self.log.debug("abort")
if tb is not None:
......@@ -539,8 +495,8 @@ class MultiObjectResourceAdapter(object):
def sortKey(self):
return self.manager.sortKey()
def tpc_begin(self, txn, sub=False):
self.manager.tpc_begin(txn, sub)
def tpc_begin(self, txn):
self.manager.tpc_begin(txn)
def tpc_finish(self, txn):
self.manager.tpc_finish(txn)
......@@ -571,25 +527,6 @@ class MultiObjectResourceAdapter(object):
if tb is not None:
raise t, v, tb
class MultiObjectResourceAdapterSub(MultiObjectResourceAdapter):
"""Adapt resource managers that participate in subtransactions."""
def commit_sub(self, txn):
self.manager.commit_sub(txn)
def abort_sub(self, txn):
self.manager.abort_sub(txn)
def tpc_begin(self, txn, sub=False):
self.manager.tpc_begin(txn, sub)
self.sub = sub
def tpc_finish(self, txn):
self.manager.tpc_finish(txn)
if self.sub:
self.objects = []
def rm_cmp(rm1, rm2):
return cmp(rm1.sortKey(), rm2.sortKey())
......@@ -624,50 +561,82 @@ class DataManagerAdapter(object):
def __init__(self, datamanager):
self._datamanager = datamanager
self._rollback = None
# TODO: I'm not sure why commit() doesn't do anything
def commit(self, transaction):
# We don't do anything here because ZODB4-style data managers
# didn't have a separate commit step
pass
def abort(self, transaction):
# We need to discard any changes since the last save point, or all
# changes
if self._rollback is None:
# No previous savepoint, so just abort
self._datamanager.abort(transaction)
else:
self._rollback()
def abort_sub(self, transaction):
self._datamanager.abort(transaction)
def commit_sub(self, transaction):
# Nothing to do wrt data, be we begin 2pc for the top-level
# trans
self._sub = False
def tpc_begin(self, transaction, subtransaction=False):
self._sub = subtransaction
def tpc_begin(self, transaction):
# We don't do anything here because ZODB4-style data managers
# didn't have a separate tpc_begin step
pass
def tpc_abort(self, transaction):
if self._sub:
self.abort(self, transaction)
else:
self._datamanager.abort(transaction)
def tpc_finish(self, transaction):
if self._sub:
self._rollback = self._datamanager.savepoint(transaction).rollback
else:
self._datamanager.commit(transaction)
def tpc_vote(self, transaction):
if not self._sub:
self._datamanager.prepare(transaction)
def sortKey(self):
return self._datamanager.sortKey()
class Savepoint:
    """Transaction savepoint

    Transaction savepoints coordinate savepoints for data managers
    participating in a transaction.

    Savepoints form a doubly linked chain through `previous` and
    `next`.  `valid` is True until the savepoint is rolled back or
    invalidated; an invalid savepoint refuses to roll back.
    """
    interface.implements(interfaces.ISavepoint)

    def __init__(self, optimistic):
        """Create an empty, valid, unlinked savepoint.

        If `optimistic` is true, data managers without savepoint support
        may still join; they only fail if a rollback is attempted.
        """
        self._savepoints = []
        self.valid = True
        self.next = self.previous = None
        self.optimistic = optimistic

    def join(self, datamanager):
        """Take a savepoint of `datamanager` and remember it.

        Raises TypeError if the data manager has no `savepoint`
        attribute and this savepoint is not optimistic.
        """
        try:
            savepoint = datamanager.savepoint
        except AttributeError:
            if not self.optimistic:
                raise TypeError("Savepoints unsupported", datamanager)
            # Optimistic mode: defer the failure until rollback time.
            savepoint = NoRollbackSavepoint(datamanager)
        else:
            savepoint = savepoint()

        self._savepoints.append(savepoint)

    def rollback(self):
        """Roll every joined data manager back to this savepoint.

        This savepoint and all later ones are invalidated first, so a
        savepoint can be rolled back at most once.  Raises
        InvalidSavepointRollbackError if it is already invalid.
        """
        if not self.valid:
            raise interfaces.InvalidSavepointRollbackError
        self._invalidate_next()
        for savepoint in self._savepoints:
            savepoint.rollback()

    def _invalidate_next(self):
        """Mark this savepoint and every later one as invalid."""
        # Walk the chain with a loop rather than recursion so that a
        # transaction with very many savepoints cannot exceed the
        # interpreter's recursion limit.
        savepoint = self
        while savepoint is not None:
            savepoint.valid = False
            savepoint = savepoint.next

    def _invalidate_previous(self):
        """Mark this savepoint and every earlier one as invalid."""
        # Iterative for the same reason as _invalidate_next().
        savepoint = self
        while savepoint is not None:
            savepoint.valid = False
            savepoint = savepoint.previous
class NoRollbackSavepoint:
    """Placeholder savepoint for a data manager without savepoint support.

    It can be created freely, but any attempt to roll it back fails
    with TypeError.
    """

    def __init__(self, datamanager):
        # Kept only so the error raised by rollback() can name the
        # offending data manager.
        self.datamanager = datamanager

    def rollback(self):
        """Always raise TypeError naming the unsupported data manager."""
        unsupported = TypeError("Savepoints unsupported", self.datamanager)
        raise unsupported
......@@ -18,157 +18,46 @@ $Id$
import zope.interface
class ISynchronizer(zope.interface.Interface):
    """Objects that participate in the transaction-boundary notification API.
    """

    def beforeCompletion(transaction):
        """Hook that is called by the transaction at the start of a commit.

        transaction is the transaction that is about to complete.
        """

    def afterCompletion(transaction):
        """Hook that is called by the transaction after completing a commit.

        transaction is the transaction that has just completed.
        """
class ITransactionManager(zope.interface.Interface):
"""An object that manages a sequence of transactions
class IDataManager(zope.interface.Interface):
"""Objects that manage transactional storage.
These objects may manage data for other objects, or they may manage
non-object storages, such as relational databases.
IDataManagerOriginal is the interface currently provided by ZODB
database connections, but the intent is to move to the newer
IDataManager.
Applications use transaction managers to establish transaction boundaries.
"""
def abort_sub(transaction):
"""Discard all subtransaction data.
See subtransaction.txt
This is called when top-level transactions are aborted.
No further subtransactions can be started once abort_sub()
has been called; this is only used when the transaction is
being aborted.
abort_sub also implies the abort of a 2-phase commit.
def begin():
"""Begin a new transaction.
This should never fail.
If an existing transaction is in progress, it will be aborted.
"""
def commit_sub(transaction):
"""Commit all changes made in subtransactions and begin 2-phase commit
Data are saved *as if* they are part of the current transaction.
That is, they will not be persistent unless the current transaction
is committed.
This is called when the current top-level transaction is committed.
No further subtransactions can be started once commit_sub()
has been called; this is only used when the transaction is
being committed.
This call also implies the beginning of 2-phase commit.
def get():
"""Get the current transaction.
"""
# Two-phase commit protocol. These methods are called by the
# ITransaction object associated with the transaction being
# committed.
def tpc_begin(transaction, subtransaction=False):
"""Begin commit of a transaction, starting the two-phase commit.
transaction is the ITransaction instance associated with the
transaction being committed.
subtransaction is a Boolean flag indicating whether the
two-phase commit is being invoked for a subtransaction.
Important note: Subtransactions are modelled in the sense that
when you commit a subtransaction, subsequent commits should be
for subtransactions as well. That is, there must be a
commit_sub() call between a tpc_begin() call with the
subtransaction flag set to true and a tpc_begin() with the
flag set to false.
def commit():
"""Commit the current transaction
"""
def tpc_abort(transaction):
"""Abort a transaction.
This is called by a transaction manager to end a two-phase commit on
the data manager.
This is always called after a tpc_begin call.
transaction is the ITransaction instance associated with the
transaction being committed.
This should never fail.
def abort(self):
"""Abort the current transaction
"""
def tpc_finish(transaction):
"""Indicate confirmation that the transaction is done.
def registerSynch(synch):
"""Register an ISynchronizer.
transaction is the ITransaction instance associated with the
transaction being committed.
Synchronizers are notified at the beginning and end of
transaction completion.
This should never fail. If this raises an exception, the
database is not expected to maintain consistency; it's a
serious error.
It's important that the storage calls the passed function
while it still has its lock. We don't want another thread
to be able to read any updated data until we've had a chance
to send an invalidation message to all of the other
connections!
"""
def tpc_vote(transaction):
"""Verify that a data manager can commit the transaction
This is the last chance for a data manager to vote 'no'. A
data manager votes 'no' by raising an exception.
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def commit(transaction):
"""Commit modifications to registered objects.
Save the object as part of the data to be made persistent if
the transaction commits.
This includes conflict detection and handling. If no conflicts or
errors occur it saves the objects in the storage.
"""
def abort(transaction):
"""Abort a transaction and forget all changes.
Abort must be called outside of a two-phase commit.
Abort is called by the transaction manager to abort transactions
that are not yet in a two-phase commit.
"""
def unregisterSynch(synch):
"""Unregister an ISynchronizer.
def sortKey():
"""Return a key to use for ordering registered DataManagers
Synchronizers are notified at the beginning and end of
transaction completion.
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.
"""
# Alternate version:
#"""Return a consistent sort key for this connection.
#
#This allows ordering multiple connections that use the same storage in
#a consistent manner. This is unique for the lifetime of a connection,
#which is good enough to avoid ZEO deadlocks.
#"""
class ITransaction(zope.interface.Interface):
"""Object representing a running transaction.
......@@ -219,9 +108,17 @@ class ITransaction(zope.interface.Interface):
def join(datamanager):
    """Add a datamanager to the transaction.

    If the data manager supports savepoints, it must call this
    *before* making any changes.  If the transaction has had any
    savepoints, then it will take a savepoint of the data manager
    when join is called and this savepoint must reflect the state
    of the data manager before any changes that caused the data
    manager to join the transaction.

    The datamanager must implement the
    transaction.interfaces.IDataManager interface, and be
    adaptable to ZODB.interfaces.IDataManager.
    """
def note(text):
......@@ -293,3 +190,161 @@ class ITransactionDeprecated(zope.interface.Interface):
# TODO: deprecate this for 3.6.
def register(object):
    """Register the given object for transaction control."""
    # NOTE(review): the parameter name shadows the builtin ``object``; it
    # is part of the declared interface signature, so it is left as is.
class IDataManager(zope.interface.Interface):
    """Objects that manage transactional storage.

    These objects may manage data for other objects, or they may manage
    non-object storages, such as relational databases.

    IDataManagerOriginal is the interface currently provided by ZODB
    database connections, but the intent is to move to the newer
    IDataManager.

    Note that when data are modified, data managers should join a
    transaction so that data can be committed when the user commits
    the transaction.
    """

    # Two-phase commit protocol.  These methods are called by the
    # ITransaction object associated with the transaction being
    # committed.

    def abort(transaction):
        """Abort a transaction and forget all changes.

        Abort must be called outside of a two-phase commit.

        Abort is called by the transaction manager to abort transactions
        that are not yet in a two-phase commit.
        """

    def tpc_begin(transaction):
        """Begin commit of a transaction, starting the two-phase commit.

        transaction is the ITransaction instance associated with the
        transaction being committed.
        """

    def commit(transaction):
        """Commit modifications to registered objects.

        Save the object as part of the data to be made persistent if
        the transaction commits.

        This includes conflict detection and handling. If no conflicts or
        errors occur it saves the objects in the storage.
        """

    def tpc_abort(transaction):
        """Abort a transaction.

        This is called by a transaction manager to end a two-phase commit on
        the data manager.

        This is always called after a tpc_begin call.

        transaction is the ITransaction instance associated with the
        transaction being committed.

        This should never fail.
        """

    def tpc_vote(transaction):
        """Verify that a data manager can commit the transaction

        This is the last chance for a data manager to vote 'no'.  A
        data manager votes 'no' by raising an exception.

        transaction is the ITransaction instance associated with the
        transaction being committed.
        """

    def tpc_finish(transaction):
        """Indicate confirmation that the transaction is done.

        transaction is the ITransaction instance associated with the
        transaction being committed.

        This should never fail. If this raises an exception, the
        database is not expected to maintain consistency; it's a
        serious error.

        It's important that the storage calls the passed function
        while it still has its lock.  We don't want another thread
        to be able to read any updated data until we've had a chance
        to send an invalidation message to all of the other
        connections!
        """
        # NOTE(review): the last paragraph mentions a "passed function",
        # but this signature takes only the transaction -- confirm whether
        # that paragraph is stale.

    def sortKey():
        """Return a key to use for ordering registered DataManagers

        ZODB uses a global sort order to prevent deadlock when it commits
        transactions involving multiple resource managers.  The resource
        manager must define a sortKey() method that provides a global ordering
        for resource managers.
        """
        # Alternate version:
        #"""Return a consistent sort key for this connection.
        #
        #This allows ordering multiple connections that use the same storage in
        #a consistent manner. This is unique for the lifetime of a connection,
        #which is good enough to avoid ZEO deadlocks.
        #"""
class ISavepointDataManager(IDataManager):
    """Data manager that can additionally create savepoints."""

    def savepoint():
        """Return a savepoint (ISavepoint)

        The returned object's rollback() undoes the work this data
        manager has done since the savepoint was taken.
        """
class ISavepoint(zope.interface.Interface):
    """A transaction savepoint

    A savepoint marks a point in a transaction that later work can be
    rolled back to.
    """

    def rollback():
        """Rollback any work done since the savepoint

        An InvalidSavepointRollbackError is raised if the savepoint
        isn't valid.
        """

    # Callers can test this before calling rollback() to avoid the
    # InvalidSavepointRollbackError.
    valid = zope.interface.Attribute(
        "Boolean indicating whether the savepoint is valid")
class InvalidSavepointRollbackError(Exception):
    """Attempt to rollback an invalid savepoint

    A savepoint may be invalid because:

    - The surrounding transaction has committed or aborted

    - An earlier savepoint in the same transaction has been rolled back

    - The savepoint itself has already been rolled back
    """
class ISynchronizer(zope.interface.Interface):
    """Objects that participate in the transaction-boundary notification API.
    """

    def beforeCompletion(transaction):
        """Hook that is called by the transaction at the start of a commit.

        transaction is the transaction that is about to complete.
        """

    def afterCompletion(transaction):
        """Hook that is called by the transaction after completing a commit.

        transaction is the transaction that has just completed.
        """
Savepoints
==========
Savepoints provide a way to save to disk intermediate work done during
a transaction allowing:
- partial transaction (subtransaction) rollback (abort)
- state of saved objects to be freed, freeing on-line memory for other
uses
Savepoints make it possible to write atomic subroutines that don't
make top-level transaction commitments.
Applications
------------
To demonstrate how savepoints work with transactions, we've provided a
sample data manager implementation that provides savepoint support.
The primary purpose of this data manager is to provide code that can
be read to understand how savepoints work. The secondary purpose is to
provide support for demonstrating the correct operation of savepoint
support within the transaction system. This data manager is very
simple. It provides flat storage of named immutable values, like strings
and numbers.
>>> import transaction.tests.savepointsample
>>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
>>> dm['name'] = 'bob'
As with other data managers, we can commit changes:
>>> transaction.commit()
>>> dm['name']
'bob'
and abort changes:
>>> dm['name'] = 'sally'
>>> dm['name']
'sally'
>>> transaction.abort()
>>> dm['name']
'bob'
Now, let's look at an application that manages funds for people.
It allows deposits and debits to be entered for multiple people.
It accepts a sequence of entries and generates a sequence of status
messages. For each entry, it applies the change and then validates
the user's account. If the user's account is invalid, we roll back
the change for that entry. The success or failure of an entry is
indicated in the output status. First we'll initialize some accounts:
>>> dm['bob-balance'] = 0.0
>>> dm['bob-credit'] = 0.0
>>> dm['sally-balance'] = 0.0
>>> dm['sally-credit'] = 100.0
>>> transaction.commit()
Now, we'll define a validation function to validate an account:
>>> def validate_account(name):
... if dm[name+'-balance'] + dm[name+'-credit'] < 0:
... raise ValueError('Overdrawn', name)
And a function to apply entries. If the function fails in some
unexpected way, it rolls back all of its changes and
prints the error:
>>> def apply_entries(entries):
... savepoint = transaction.savepoint()
... try:
... for name, amount in entries:
... entry_savepoint = transaction.savepoint()
... try:
... dm[name+'-balance'] += amount
... validate_account(name)
... except ValueError, error:
... entry_savepoint.rollback()
... print 'Error', str(error)
... else:
... print 'Updated', name
... except Exception, error:
... savepoint.rollback()
... print 'Unexpected exception', error
Now let's try applying some entries:
>>> apply_entries([
... ('bob', 10.0),
... ('sally', 10.0),
... ('bob', 20.0),
... ('sally', 10.0),
... ('bob', -100.0),
... ('sally', -100.0),
... ])
Updated bob
Updated sally
Updated bob
Updated sally
Error ('Overdrawn', 'bob')
Updated sally
>>> dm['bob-balance']
30.0
>>> dm['sally-balance']
-80.0
If we provide entries that cause an unexpected error:
>>> apply_entries([
... ('bob', 10.0),
... ('sally', 10.0),
... ('bob', '20.0'),
... ('sally', 10.0),
... ])
Updated bob
Updated sally
Unexpected exception unsupported operand type(s) for +=: 'float' and 'str'
Because the apply_entries used a savepoint for the entire function,
it was able to rollback the partial changes without rolling back
changes made in the previous call to apply_entries:
>>> dm['bob-balance']
30.0
>>> dm['sally-balance']
-80.0
If we now abort the outer transaction, the earlier changes will go
away:
>>> transaction.abort()
>>> dm['bob-balance']
0.0
>>> dm['sally-balance']
0.0
Savepoint invalidation
----------------------
Once a savepoint has been used, it can't be used again:
>>> savepoint = transaction.savepoint()
>>> dm['bob-balance'] = 100.0
>>> dm['bob-balance']
100.0
>>> savepoint.rollback()
>>> dm['bob-balance']
0.0
>>> savepoint.rollback()
Traceback (most recent call last):
...
InvalidSavepointRollbackError
Using a savepoint also invalidates any savepoints that come after it:
>>> savepoint1 = transaction.savepoint()
>>> dm['bob-balance'] = 100.0
>>> dm['bob-balance']
100.0
>>> savepoint2 = transaction.savepoint()
>>> dm['bob-balance'] = 200.0
>>> dm['bob-balance']
200.0
>>> savepoint1.rollback()
>>> dm['bob-balance']
0.0
>>> savepoint2.rollback()
Traceback (most recent call last):
...
InvalidSavepointRollbackError
>>> transaction.abort()
Databases without savepoint support
-----------------------------------
Normally it's an error to use savepoints with databases that don't
support savepoints:
>>> dm_no_sp = transaction.tests.savepointsample.SampleDataManager()
>>> dm_no_sp['name'] = 'bob'
>>> transaction.commit()
>>> dm_no_sp['name'] = 'sally'
>>> savepoint = transaction.savepoint()
Traceback (most recent call last):
...
TypeError: ('Savepoints unsupported', {'name': 'bob'})
>>> transaction.abort()
However, a flag can be passed to the transaction savepoint method to
indicate that databases without savepoint support should be tolerated
until a savepoint is rolled back. This allows transactions to proceed
if there are no reasons to roll back:
>>> dm_no_sp['name'] = 'sally'
>>> savepoint = transaction.savepoint(1)
>>> dm_no_sp['name'] = 'sue'
>>> transaction.commit()
>>> dm_no_sp['name']
'sue'
>>> savepoint = transaction.savepoint(1)
>>> dm_no_sp['name'] = 'sam'
>>> savepoint.rollback()
Traceback (most recent call last):
...
TypeError: ('Savepoints unsupported', {'name': 'sam'})
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Savepoint data manager implementation example
Sample data manager implementation that illustrates how to implement savepoints
See savepoint.txt in the transaction package.
$Id$
"""
import UserDict
from zope import interface
import transaction.interfaces
class SampleDataManager(UserDict.DictMixin):
    """Sample implementation of data manager that doesn't support savepoints

    This data manager stores named simple values, like strings and numbers.
    """

    # This class has no savepoint() method, so it declares only the plain
    # data-manager interface.  The savepoint-capable subclass below is the
    # one that declares ISavepointDataManager.
    interface.implements(transaction.interfaces.IDataManager)

    def __init__(self, transaction_manager = None):
        """Create an empty data manager.

        transaction_manager is the manager whose current transaction is
        joined on first modification; it defaults to the thread-local
        ``transaction.manager``.
        """
        if transaction_manager is None:
            # Use the thread-local transaction manager if none is provided:
            transaction_manager = transaction.manager
        self.transaction_manager = transaction_manager

        # Our committed and uncommitted data:
        self.committed = {}
        self.uncommitted = self.committed.copy()

        # Our transaction state:
        #
        #   If our uncommitted data is modified, we'll join a transaction
        #   and keep track of the transaction we joined.  Any commit
        #   related messages we get should be for this same transaction
        self.transaction = None

        #   What phase, if any, of two-phase commit we are in:
        self.tpc_phase = None

    #######################################################################
    # Provide a mapping interface to uncommitted data.  We provide
    # a basic subset of the interface. DictMixin does the rest.

    def __getitem__(self, name):
        return self.uncommitted[name]

    def __setitem__(self, name, value):
        self._join() # join the current transaction, if we haven't already
        self.uncommitted[name] = value

    def __delitem__(self, name):
        self._join() # join the current transaction, if we haven't already
        del self.uncommitted[name]

    def keys(self):
        return self.uncommitted.keys()

    #
    #######################################################################

    #######################################################################
    # Transaction methods

    def _join(self):
        # If this is the first change in the transaction, join the transaction
        if self.transaction is None:
            self.transaction = self.transaction_manager.get()
            self.transaction.join(self)

    def _resetTransaction(self):
        # Forget the joined transaction and leave two-phase commit.
        self.transaction = None
        self.tpc_phase = None

    def abort(self, transaction):
        """Throw away changes made before the commit process has started
        """
        assert ((transaction is self.transaction) or (self.transaction is None)
                ), "Must not change transactions"
        assert self.tpc_phase is None, "Must be called outside of tpc"
        self.uncommitted = self.committed.copy()
        self._resetTransaction()

    def tpc_begin(self, transaction):
        """Enter two-phase commit
        """
        assert transaction is self.transaction, "Must not change transactions"
        assert self.tpc_phase is None, "Must be called outside of tpc"
        self.tpc_phase = 1

    def commit(self, transaction):
        """Record data modified during the transaction
        """
        assert transaction is self.transaction, "Must not change transactions"
        assert self.tpc_phase == 1, "Must be called in first phase of tpc"

        # In our simple example, we don't need to do anything.
        # A more complex data manager would typically write to some sort
        # of log.

    def tpc_vote(self, transaction):
        """Vote on the commit and move to the second phase of tpc
        """
        assert transaction is self.transaction, "Must not change transactions"
        assert self.tpc_phase == 1, "Must be called in first phase of tpc"
        # This particular data manager is always ready to vote.
        # Real data managers will usually need to take some steps to
        # make sure that the finish will succeed
        self.tpc_phase = 2

    def tpc_finish(self, transaction):
        """Make the uncommitted data the committed data
        """
        assert transaction is self.transaction, "Must not change transactions"
        assert self.tpc_phase == 2, "Must be called in second phase of tpc"
        self.committed = self.uncommitted.copy()
        self._resetTransaction()

    def tpc_abort(self, transaction):
        """Throw away uncommitted changes from inside two-phase commit
        """
        assert transaction is self.transaction, "Must not change transactions"
        assert self.tpc_phase is not None, "Must be called inside of tpc"
        self.uncommitted = self.committed.copy()
        self._resetTransaction()

    #
    #######################################################################

    #######################################################################
    # Other data manager methods

    def sortKey(self):
        # Commit operations on multiple data managers are performed in
        # sort key order.  This is important to avoid deadlock when data
        # managers are shared among multiple threads or processes and
        # use locks to manage that sharing.  We aren't going to bother
        # with that here.
        return str(id(self))

    #
    #######################################################################


class SampleSavepointDataManager(SampleDataManager):
    """Sample implementation of a savepoint-supporting data manager

    This extends the basic data manager with savepoint support.
    """

    interface.implements(transaction.interfaces.ISavepointDataManager)

    def savepoint(self):
        """Return a savepoint holding a copy of the current uncommitted data
        """
        # When we create the savepoint, we save the existing database state
        return SampleSavepoint(self, self.uncommitted.copy())

    def _rollback_savepoint(self, savepoint):
        # When we roll back the savepoint, we restore the saved data.
        # Restore a copy, mirroring savepoint(): otherwise later changes
        # to the data manager would be written into savepoint.data itself.
        self.uncommitted = savepoint.data.copy()
class SampleSavepoint:
    """Savepoint of a sample data manager.

    Holds the data manager it belongs to and the data captured when the
    savepoint was taken.
    """

    def __init__(self, data_manager, data):
        self.data = data
        self.data_manager = data_manager

    def rollback(self):
        """Ask the owning data manager to restore this savepoint's data."""
        owner = self.data_manager
        owner._rollback_savepoint(self)
......@@ -128,7 +128,7 @@ class TestConnection:
def sortKey(self):
return str(id(self))
def tpc_begin(self, txn, sub):
def tpc_begin(self, txn):
self.calls.append("begin")
def tpc_vote(self, txn):
......
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of savepoint feature
$Id$
"""
import unittest
from zope.testing import doctest
def test_suite():
    """Build the suite that runs the savepoint.txt doctest file."""
    savepoint_doctests = doctest.DocFileSuite('../savepoint.txt')
    return unittest.TestSuite((savepoint_doctests,))
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment