Commit f9de1eed authored by Jim Fulton's avatar Jim Fulton

Changed the strategy for managing savepoints. The requirements

for savepoint management are:

- All savepoints for a transaction should be invalidated when the
  transaction commits or aborts

- If a savepoint is rolled back, then all savepoints after it within 
  a transaction must be invalidated.

We previously implemented these requirements by organizing transaction
savepoints into a doubly linked list.  This was overkill.  We didn't
have need for such fine-grained ordering.  This strategy had the
disadvantage that it kept all savepoints around until the transaction
ended.  Savepoints could be expensive to keep and it's possible that
some applications could keep a lot of them.

The new stragey is to:

- Keep weak references to savepoints.  We can forget about savepoints
  that the application isn't using.  Any resources used by these
  savepoints can be freed.

(We have to keep a strong reference to the last savepoint used for
  a subtransaction.)

- We assign indexes to savepoints within a transaction.  When a
  savepoint is rolled back, in addition to invalidating that
  savepoint, we also invalidate savepoints with a higher index.

A side effect of this change is that code using the savepoint API
should interfere less with code using subtransactions.  Of course, we
really need to phase out code that uses subtransactions.

It is likely that we can leverage this change in strategy to speed
creation of ZODB connection savepoints.  Creating a ZODB connection
savepoint now requires copying the savepoint storage index.  This
index could become large.  If applications aren't holding on to old
savepoints, then it is possible that we could avoid this copy.
parent 0e8e3c55
...@@ -150,6 +150,7 @@ import logging ...@@ -150,6 +150,7 @@ import logging
import sys import sys
import thread import thread
import warnings import warnings
import weakref
import traceback import traceback
from cStringIO import StringIO from cStringIO import StringIO
...@@ -187,6 +188,21 @@ class Transaction(object): ...@@ -187,6 +188,21 @@ class Transaction(object):
interface.implements(interfaces.ITransaction, interface.implements(interfaces.ITransaction,
interfaces.ITransactionDeprecated) interfaces.ITransactionDeprecated)
# Assign an index to each savepoint so we can invalidate
# later savepoints on rollback
_savepoint_index = 0
# If savepoints are used, keep a weak key dict of them
_savepoints = {}
# Remamber the savepoint for the last subtransaction
_subtransaction_savepoint = None
# Meta data
user = ""
description = ""
def __init__(self, synchronizers=None, manager=None): def __init__(self, synchronizers=None, manager=None):
self.status = Status.ACTIVE self.status = Status.ACTIVE
# List of resource managers, e.g. MultiObjectResourceAdapters. # List of resource managers, e.g. MultiObjectResourceAdapters.
...@@ -209,8 +225,6 @@ class Transaction(object): ...@@ -209,8 +225,6 @@ class Transaction(object):
# The user, description, and _extension attributes are accessed # The user, description, and _extension attributes are accessed
# directly by storages, leading underscore notwithstanding. # directly by storages, leading underscore notwithstanding.
self.user = ""
self.description = ""
self._extension = {} self._extension = {}
self.log = logging.getLogger("txn.%d" % thread.get_ident()) self.log = logging.getLogger("txn.%d" % thread.get_ident())
...@@ -226,9 +240,6 @@ class Transaction(object): ...@@ -226,9 +240,6 @@ class Transaction(object):
# inefficient for FIFO access of this kind. # inefficient for FIFO access of this kind.
self._before_commit = [] self._before_commit = []
# Keep track of the last savepoint
self._last_savepoint = None
# Raise TransactionFailedError, due to commit()/join()/register() # Raise TransactionFailedError, due to commit()/join()/register()
# getting called when the current transaction has already suffered # getting called when the current transaction has already suffered
# a commit/savepoint failure. # a commit/savepoint failure.
...@@ -256,8 +267,26 @@ class Transaction(object): ...@@ -256,8 +267,26 @@ class Transaction(object):
resource = DataManagerAdapter(resource) resource = DataManagerAdapter(resource)
self._resources.append(resource) self._resources.append(resource)
if self._last_savepoint is not None:
self._last_savepoint.join(resource) if self._savepoints:
# A data manager has joined a transaction *after* a savepoint
# was created. A couple of things are different in this case:
# 1. We need to add it's savepoint to all previous savepoints.
# so that if they are rolled back, we roll this was back too.
# 2. We don't actualy need to ask it for a savepoint.
# Because is just joining. We can just abort it to roll
# back to the current state, so we simply use and
# AbortSavepoint.
datamanager_savepoint = AbortSavepoint(resource, self)
for ref in self._savepoints:
transaction_savepoint = ref()
if transaction_savepoint is not None:
transaction_savepoint._savepoints.append(
datamanager_savepoint)
def savepoint(self, optimistic=False): def savepoint(self, optimistic=False):
if self.status is Status.COMMITFAILED: if self.status is Status.COMMITFAILED:
...@@ -269,18 +298,44 @@ class Transaction(object): ...@@ -269,18 +298,44 @@ class Transaction(object):
self._cleanup(self._resources) self._cleanup(self._resources)
self._saveCommitishError() # reraises! self._saveCommitishError() # reraises!
if self._last_savepoint is not None:
savepoint.previous = self._last_savepoint self._savepoint_index += 1
self._last_savepoint.next = savepoint ref = weakref.ref(savepoint, self._remove_savepoint_ref)
self._last_savepoint = savepoint if self._savepoints is None:
self._savepoints = {}
self._savepoints[ref] = self._savepoint_index
return savepoint return savepoint
def _invalidate_last_savepoint(self): def _remove_savepoint_ref(self, ref):
# Invalidate the last savepoint and any previous try:
# savepoints. This is done on a commit or abort. del self._savepoints[ref]
if self._last_savepoint is not None: except KeyError:
self._last_savepoint._invalidate_previous() pass
self._last_savepoint = None
def _invalidate_next(self, savepoint):
savepoints = self._savepoints
ref = weakref.ref(savepoint)
index = savepoints[ref]
del savepoints[ref]
# use items to make copy to avoid mutating while iterating
for ref, i in savepoints.items():
if i > index:
savepoint = ref()
if savepoint is not None:
savepoint.transaction = None # invalidate
del savepoints[ref]
def _invalidate_savepoints(self):
savepoints = self._savepoints
for ref in savepoints:
savepoint = ref()
if savepoint is not None:
savepoint.transaction = None # invalidate
savepoints.clear()
def register(self, obj): def register(self, obj):
# The old way of registering transaction participants. # The old way of registering transaction participants.
...@@ -320,11 +375,12 @@ class Transaction(object): ...@@ -320,11 +375,12 @@ class Transaction(object):
def commit(self, subtransaction=False): def commit(self, subtransaction=False):
self._invalidate_last_savepoint() if self._savepoints:
self._invalidate_savepoints()
if subtransaction: if subtransaction:
# TODO depricate subtransactions # TODO deprecate subtransactions
self.savepoint(1) self._subtransaction_savepoint = self.savepoint(1)
return return
if self.status is Status.COMMITFAILED: if self.status is Status.COMMITFAILED:
...@@ -428,15 +484,16 @@ class Transaction(object): ...@@ -428,15 +484,16 @@ class Transaction(object):
if subtransaction: if subtransaction:
# TODO deprecate subtransactions # TODO deprecate subtransactions
if not self._last_savepoint: if not self._subtransaction_savepoint:
raise interfaces.InvalidSavepointRollbackError raise interfaces.InvalidSavepointRollbackError
if self._last_savepoint.valid: if self._subtransaction_savepoint.valid:
# We're supposed to be able to call abort(1) multiple # We're supposed to be able to call abort(1) multiple
# times. Sigh. # times. Sigh.
self._last_savepoint.rollback() self._subtransaction_savepoint.rollback()
return return
self._invalidate_last_savepoint() if self._savepoints:
self._invalidate_savepoints()
self._synchronizers.map(lambda s: s.beforeCompletion(self)) self._synchronizers.map(lambda s: s.beforeCompletion(self))
...@@ -598,18 +655,17 @@ class Savepoint: ...@@ -598,18 +655,17 @@ class Savepoint:
""" """
interface.implements(interfaces.ISavepoint) interface.implements(interfaces.ISavepoint)
valid = property(lambda self: self.transaction is not None)
def __init__(self, transaction, optimistic, *resources): def __init__(self, transaction, optimistic, *resources):
self.transaction = transaction self.transaction = transaction
self._savepoints = savepoints = [] self._savepoints = savepoints = []
self.valid = True
self.next = self.previous = None
self.optimistic = optimistic
for datamanager in resources: for datamanager in resources:
try: try:
savepoint = datamanager.savepoint savepoint = datamanager.savepoint
except AttributeError: except AttributeError:
if not self.optimistic: if not optimistic:
raise TypeError("Savepoints unsupported", datamanager) raise TypeError("Savepoints unsupported", datamanager)
savepoint = NoRollbackSavepoint(datamanager) savepoint = NoRollbackSavepoint(datamanager)
else: else:
...@@ -617,43 +673,19 @@ class Savepoint: ...@@ -617,43 +673,19 @@ class Savepoint:
savepoints.append(savepoint) savepoints.append(savepoint)
def join(self, datamanager):
# A data manager has joined a transaction *after* a savepoint
# was created. A couple of things are different in this case:
# 1. We need to add it's savepoint to all previous savepoints.
# so that if they are rolled back, we roll this was back too.
# 2. We don't actualy need to ask it for a savepoint. Because
# is just joining, then we can abort it if there is an error,
# so we use an AbortSavepoint.
savepoint = AbortSavepoint(datamanager, self.transaction)
while self is not None:
self._savepoints.append(savepoint)
self = self.previous
def rollback(self): def rollback(self):
if not self.valid: transaction = self.transaction
if transaction is None:
raise interfaces.InvalidSavepointRollbackError raise interfaces.InvalidSavepointRollbackError
self._invalidate_next() self.transaction = None
transaction._invalidate_next(self)
try: try:
for savepoint in self._savepoints: for savepoint in self._savepoints:
savepoint.rollback() savepoint.rollback()
except: except:
# Mark the transaction as failed # Mark the transaction as failed
self.transaction._saveCommitishError() # reraises! transaction._saveCommitishError() # reraises!
def _invalidate_next(self):
self.valid = False
if self.next is not None:
self.next._invalidate_next()
def _invalidate_previous(self):
self.valid = False
if self.previous is not None:
self.previous._invalidate_previous()
class AbortSavepoint: class AbortSavepoint:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment