Commit 476e7bd8 authored by Tim Peters's avatar Tim Peters

ISynchronizer grows a newTransaction() method, called

whenever TransactionManager.begin() is called.

Connection implements that, and changes its ISynchronizer
afterCompletion() method, to call sync() on its storage
(if the storage has such a method), and to process
invalidations in any case.

The bottom line is that storage sync() will get done "by
magic" now after top-level commit() and abort(), and after
explicit TransactionManager.begin().  This should make it
possible to deprecate Connection.sync(), although I'm not
doing that yet.  Made a small but meaningful start by
purging many sync() calls from some of the nastiest ZEO
tests -- and they still work fine.
parent e3f4fb77
...@@ -5,9 +5,6 @@ Release date: DD-MMM-2005 ...@@ -5,9 +5,6 @@ Release date: DD-MMM-2005
transaction transaction
----------- -----------
- A ``getBeforeCommitHooks()`` method was added. It returns an iterable
producing the registered beforeCommit hooks.
- Doing a subtransaction commit erroneously processed invalidations, which - Doing a subtransaction commit erroneously processed invalidations, which
could lead to an inconsistent view of the database. For example, let T be could lead to an inconsistent view of the database. For example, let T be
the transaction of which the subtransaction commit was a part. If T read a the transaction of which the subtransaction commit was a part. If T read a
...@@ -24,6 +21,24 @@ transaction ...@@ -24,6 +21,24 @@ transaction
could fail, and despite that T never modifed O. could fail, and despite that T never modifed O.
- A ``getBeforeCommitHooks()`` method was added. It returns an iterable
producing the registered beforeCommit hooks.
- The ``ISynchronizer`` interface has a new ``newTransaction()`` method.
This is invoked whenever a transaction manager's ``begin()`` method is
called. (Note that a transaction object's (as opposed to a transaction
manager's) ``begin()`` method is deprecated, and ``newTransaction()``
is not called when using the deprecated method.)
- Relatedly, ``Connection`` implements ``ISynchronizer``, and ``Connection``'s
``afterCompletion()`` and ``newTransaction()`` methods now call ``sync()``
on the underlying storage (if the underlying storage has such a method),
in addition to processing invalidations. The practical implication is that
storage synchronization will be done automatically now, whenever a
transaction is explicitly started, and after top-level transaction commit
or abort. As a result, ``Connection.sync()`` should virtually never be
needed anymore, and will eventually be deprecated.
What's new in ZODB3 3.4a5? What's new in ZODB3 3.4a5?
========================== ==========================
......
...@@ -640,6 +640,7 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -640,6 +640,7 @@ class ConnectionTests(CommonSetupTearDown):
r1["a"] = MinPO("a") r1["a"] = MinPO("a")
transaction.commit() transaction.commit()
self.assertEqual(r1._p_state, 0) # up-to-date
db2 = DB(self.openClientStorage()) db2 = DB(self.openClientStorage())
r2 = db2.open().root() r2 = db2.open().root()
...@@ -649,18 +650,16 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -649,18 +650,16 @@ class ConnectionTests(CommonSetupTearDown):
r2["b"] = MinPO("b") r2["b"] = MinPO("b")
transaction.commit() transaction.commit()
# make sure the invalidation is received in the other client # Make sure the invalidation is received in the other client.
for i in range(10): for i in range(10):
c1._storage.sync() if r1._p_state == -1:
if c1._invalidated.has_key(r1._p_oid):
break break
time.sleep(0.1) time.sleep(0.1)
self.assert_(c1._invalidated.has_key(r1._p_oid)) self.assertEqual(r1._p_state, -1) # ghost
# force the invalidations to be applied...
c1.sync()
r1.keys() # unghostify r1.keys() # unghostify
self.assertEqual(r1._p_serial, r2._p_serial) self.assertEqual(r1._p_serial, r2._p_serial)
self.assertEqual(r1["b"].value, "b")
db2.close() db2.close()
db1.close() db1.close()
......
...@@ -88,8 +88,7 @@ class StressTask: ...@@ -88,8 +88,7 @@ class StressTask:
try: try:
self.tm.get().commit() self.tm.get().commit()
except ConflictError, msg: except ConflictError, msg:
self.tm.get().abort() self.tm.abort()
cn.sync()
else: else:
if self.sleep: if self.sleep:
time.sleep(self.sleep) time.sleep(self.sleep)
...@@ -152,7 +151,6 @@ class StressThread(FailableThread): ...@@ -152,7 +151,6 @@ class StressThread(FailableThread):
break break
except (ConflictError, KeyError): except (ConflictError, KeyError):
transaction.abort() transaction.abort()
cn.sync()
key = self.startnum key = self.startnum
while not self.stop.isSet(): while not self.stop.isSet():
try: try:
...@@ -164,11 +162,6 @@ class StressThread(FailableThread): ...@@ -164,11 +162,6 @@ class StressThread(FailableThread):
time.sleep(self.sleep) time.sleep(self.sleep)
except (ReadConflictError, ConflictError), msg: except (ReadConflictError, ConflictError), msg:
transaction.abort() transaction.abort()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn.sync()
else: else:
self.added_keys.append(key) self.added_keys.append(key)
key += self.step key += self.step
...@@ -201,7 +194,6 @@ class LargeUpdatesThread(FailableThread): ...@@ -201,7 +194,6 @@ class LargeUpdatesThread(FailableThread):
except (ConflictError, KeyError): except (ConflictError, KeyError):
# print "%d getting tree abort" % self.threadnum # print "%d getting tree abort" % self.threadnum
transaction.abort() transaction.abort()
cn.sync()
keys_added = {} # set of keys we commit keys_added = {} # set of keys we commit
tkeys = [] tkeys = []
...@@ -223,7 +215,6 @@ class LargeUpdatesThread(FailableThread): ...@@ -223,7 +215,6 @@ class LargeUpdatesThread(FailableThread):
except (ReadConflictError, ConflictError), msg: except (ReadConflictError, ConflictError), msg:
# print "%d setting key %s" % (self.threadnum, msg) # print "%d setting key %s" % (self.threadnum, msg)
transaction.abort() transaction.abort()
cn.sync()
break break
else: else:
# print "%d set #%d" % (self.threadnum, len(keys)) # print "%d set #%d" % (self.threadnum, len(keys))
...@@ -236,16 +227,10 @@ class LargeUpdatesThread(FailableThread): ...@@ -236,16 +227,10 @@ class LargeUpdatesThread(FailableThread):
except ConflictError, msg: except ConflictError, msg:
# print "%d commit %s" % (self.threadnum, msg) # print "%d commit %s" % (self.threadnum, msg)
transaction.abort() transaction.abort()
cn.sync()
continue continue
for k in keys: for k in keys:
tkeys.remove(k) tkeys.remove(k)
keys_added[k] = 1 keys_added[k] = 1
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn.sync()
self.added_keys = keys_added.keys() self.added_keys = keys_added.keys()
cn.close() cn.close()
...@@ -287,7 +272,6 @@ class VersionStressThread(FailableThread): ...@@ -287,7 +272,6 @@ class VersionStressThread(FailableThread):
break break
except (ConflictError, KeyError): except (ConflictError, KeyError):
transaction.abort() transaction.abort()
cn.sync()
while not self.stop.isSet(): while not self.stop.isSet():
try: try:
tree[key] = self.threadnum tree[key] = self.threadnum
...@@ -297,11 +281,6 @@ class VersionStressThread(FailableThread): ...@@ -297,11 +281,6 @@ class VersionStressThread(FailableThread):
break break
except (VersionLockError, ReadConflictError, ConflictError), msg: except (VersionLockError, ReadConflictError, ConflictError), msg:
transaction.abort() transaction.abort()
# sync() is necessary here to process invalidations
# if we get a read conflict. In the read conflict case,
# no objects were modified so cn never got registered
# with the transaction.
cn.sync()
if self.sleep: if self.sleep:
time.sleep(self.sleep) time.sleep(self.sleep)
try: try:
...@@ -319,7 +298,6 @@ class VersionStressThread(FailableThread): ...@@ -319,7 +298,6 @@ class VersionStressThread(FailableThread):
return commit return commit
except ConflictError, msg: except ConflictError, msg:
transaction.abort() transaction.abort()
cn.sync()
finally: finally:
cn.close() cn.close()
return 0 return 0
...@@ -351,7 +329,6 @@ class InvalidationTests: ...@@ -351,7 +329,6 @@ class InvalidationTests:
except ReadConflictError: except ReadConflictError:
if retries: if retries:
transaction.abort() transaction.abort()
cn.sync()
else: else:
raise raise
except: except:
......
...@@ -27,7 +27,9 @@ from persistent import PickleCache ...@@ -27,7 +27,9 @@ from persistent import PickleCache
# interfaces # interfaces
from persistent.interfaces import IPersistentDataManager from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection from ZODB.interfaces import IConnection
from transaction.interfaces import ISavepointDataManager, IDataManagerSavepoint from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint
from transaction.interfaces import ISynchronizer
from zope.interface import implements from zope.interface import implements
import transaction import transaction
...@@ -59,7 +61,10 @@ def resetCaches(): ...@@ -59,7 +61,10 @@ def resetCaches():
class Connection(ExportImport, object): class Connection(ExportImport, object):
"""Connection to ZODB for loading and storing objects.""" """Connection to ZODB for loading and storing objects."""
implements(IConnection, ISavepointDataManager, IPersistentDataManager) implements(IConnection,
ISavepointDataManager,
IPersistentDataManager,
ISynchronizer)
_storage = _normal_storage = _savepoint_storage = None _storage = _normal_storage = _savepoint_storage = None
...@@ -291,11 +296,8 @@ class Connection(ExportImport, object): ...@@ -291,11 +296,8 @@ class Connection(ExportImport, object):
def sync(self): def sync(self):
"""Manually update the view on the database.""" """Manually update the view on the database."""
self._txn_mgr.get().abort() self._txn_mgr.abort()
sync = getattr(self._storage, 'sync', 0) self._storage_sync()
if sync:
sync()
self._flush_invalidations()
def getDebugInfo(self): def getDebugInfo(self):
"""Returns a tuple with different items for debugging the """Returns a tuple with different items for debugging the
...@@ -379,6 +381,7 @@ class Connection(ExportImport, object): ...@@ -379,6 +381,7 @@ class Connection(ExportImport, object):
self._needs_to_join = True self._needs_to_join = True
self._registered_objects = [] self._registered_objects = []
# Process pending invalidations.
def _flush_invalidations(self): def _flush_invalidations(self):
self._inv_lock.acquire() self._inv_lock.acquire()
try: try:
...@@ -650,10 +653,19 @@ class Connection(ExportImport, object): ...@@ -650,10 +653,19 @@ class Connection(ExportImport, object):
# We don't do anything before a commit starts. # We don't do anything before a commit starts.
pass pass
def afterCompletion(self, txn): # Call the underlying storage's sync() method (if any), and process
# pending invalidations regardless. Of course this should only be
# called at transaction boundaries.
def _storage_sync(self, *ignored):
sync = getattr(self._storage, 'sync', 0)
if sync:
sync()
self._flush_invalidations() self._flush_invalidations()
# Transaction-manager synchronization -- ISynchronizer afterCompletion = _storage_sync
newTransaction = _storage_sync
# Transaction-manager synchronization -- ISynchronizer
########################################################################## ##########################################################################
########################################################################## ##########################################################################
......
...@@ -279,8 +279,8 @@ class IConnection(Interface): ...@@ -279,8 +279,8 @@ class IConnection(Interface):
"""Manually update the view on the database. """Manually update the view on the database.
This includes aborting the current transaction, getting a fresh and This includes aborting the current transaction, getting a fresh and
consistent view of the data (synchronizing with the storage if possible) consistent view of the data (synchronizing with the storage if
and call cacheGC() for this connection. possible) and calling cacheGC() for this connection.
This method was especially useful in ZODB 3.2 to better support This method was especially useful in ZODB 3.2 to better support
read-only connections that were affected by a couple of problems. read-only connections that were affected by a couple of problems.
......
Here are some tests that storage sync() methods get called at appropriate
times in the life of a transaction. The tested behavior is new in ZODB 3.4.
First define a lightweight storage with a sync() method:
>>> import ZODB
>>> from ZODB.MappingStorage import MappingStorage
>>> import transaction
>>> class SimpleStorage(MappingStorage):
... sync_called = False
...
... def sync(self, *args):
... self.sync_called = True
Make a change locally:
>>> st = SimpleStorage()
>>> db = ZODB.DB(st)
>>> cn = db.open()
>>> rt = cn.root()
>>> rt['a'] = 1
Sync should not have been called yet.
>>> st.sync_called # False before 3.4
False
sync is called by the Connection's afterCompletion() hook after the commit
completes.
>>> transaction.commit()
>>> st.sync_called # False before 3.4
True
sync is also called by the afterCompletion() hook after an abort.
>>> st.sync_called = False
>>> rt['b'] = 2
>>> transaction.abort()
>>> st.sync_called # False before 3.4
True
And sync is called whenever we explicitly start a new txn, via the
newTransaction() hook.
>>> st.sync_called = False
>>> dummy = transaction.begin()
>>> st.sync_called # False before 3.4
True
Clean up. Closing db isn't enough -- closing a DB doesn't close its
Connections. Leaving our Connection open here can cause the
SimpleStorage.sync() method to get called later, during another test, and
our doctest-synthesized module globals no longer exist then. You get
a weird traceback then ;-)
>>> cn.close()
>>> db.close()
...@@ -21,19 +21,18 @@ import persistent.dict, transaction ...@@ -21,19 +21,18 @@ import persistent.dict, transaction
def testAddingThenModifyThenAbort(): def testAddingThenModifyThenAbort():
"""\ """\
We ran into a problem in which abort failed after adding an object in We ran into a problem in which abort failed after adding an object in
a savepoint and then modifying the object. The problem was that, on a savepoint and then modifying the object. The problem was that, on
commit, the savepoint was aborted before the modifications were commit, the savepoint was aborted before the modifications were
aborted. Because the object was added in the savepoint, it's _p_oid aborted. Because the object was added in the savepoint, its _p_oid
and _p_jar were cleared when the savepoint was aborted. The object and _p_jar were cleared when the savepoint was aborted. The object
was in the registered-object list. There's an invariant for this was in the registered-object list. There's an invariant for this
lists that states that all objects in the list should have an oid and list that states that all objects in the list should have an oid and
(correct) jar. (correct) jar.
The fix was to abort work done after he savepoint before aborting the The fix was to abort work done after the savepoint before aborting the
savepoint. savepoint.
>>> import ZODB.tests.util >>> import ZODB.tests.util
>>> db = ZODB.tests.util.DB() >>> db = ZODB.tests.util.DB()
>>> connection = db.open() >>> connection = db.open()
...@@ -44,21 +43,19 @@ savepoint. ...@@ -44,21 +43,19 @@ savepoint.
>>> sp = transaction.savepoint() >>> sp = transaction.savepoint()
>>> ob.x = 1 >>> ob.x = 1
>>> transaction.abort() >>> transaction.abort()
""" """
def testModifyThenSavePointThenModifySomeMoreThenCommit(): def testModifyThenSavePointThenModifySomeMoreThenCommit():
"""\ """\
We got conflict errors when we committed after we modified an object We got conflict errors when we committed after we modified an object
in a savepoint and then modified it some more after the last in a savepoint, and then modified it some more after the last
savepoint. savepoint.
The problem was that we were effectively commiting the object twice -- The problem was that we were effectively commiting the object twice --
when commiting the current data and when committing the savepoint. when commiting the current data and when committing the savepoint.
The fix was to first make a new savepoint to move new changes to the The fix was to first make a new savepoint to move new changes to the
savepoint storage and *then* to commit the savepoint storage. (This is savepoint storage and *then* to commit the savepoint storage. (This is
similar to thr strategy that was used for subtransactions prior to similar to the strategy that was used for subtransactions prior to
savepoints.) savepoints.)
...@@ -71,7 +68,6 @@ savepoints.) ...@@ -71,7 +68,6 @@ savepoints.)
>>> sp = transaction.savepoint() >>> sp = transaction.savepoint()
>>> root['a'] = 2 >>> root['a'] = 2
>>> transaction.commit() >>> transaction.commit()
""" """
def test_suite(): def test_suite():
...@@ -82,4 +78,3 @@ def test_suite(): ...@@ -82,4 +78,3 @@ def test_suite():
if __name__ == '__main__': if __name__ == '__main__':
unittest.main(defaultTest='test_suite') unittest.main(defaultTest='test_suite')
...@@ -44,7 +44,7 @@ It allows deposits and debits to be entered for multiple people. ...@@ -44,7 +44,7 @@ It allows deposits and debits to be entered for multiple people.
It accepts a sequence of entries and generates a sequence of status It accepts a sequence of entries and generates a sequence of status
messages. For each entry, it applies the change and then validates messages. For each entry, it applies the change and then validates
the user's account. If the user's account is invalid, we role back the user's account. If the user's account is invalid, we role back
the change for that entry. The success or failure of an entry is the change for that entry. The success or failure of an entry is
indicated in the output status. First we'll initialize some accounts: indicated in the output status. First we'll initialize some accounts:
>>> root['bob-balance'] = 0.0 >>> root['bob-balance'] = 0.0
...@@ -60,7 +60,7 @@ Now, we'll define a validation function to validate an account: ...@@ -60,7 +60,7 @@ Now, we'll define a validation function to validate an account:
... raise ValueError('Overdrawn', name) ... raise ValueError('Overdrawn', name)
And a function to apply entries. If the function fails in some And a function to apply entries. If the function fails in some
unexpected way, it rolls back all of it's changes and unexpected way, it rolls back all of it's changes and
prints the error: prints the error:
>>> def apply_entries(entries): >>> def apply_entries(entries):
...@@ -102,7 +102,7 @@ Now let's try applying some entries: ...@@ -102,7 +102,7 @@ Now let's try applying some entries:
>>> root['sally-balance'] >>> root['sally-balance']
-80.0 -80.0
If we give provide entries that cause an unexpected error: If we give provide entries that cause an unexpected error:
>>> apply_entries([ >>> apply_entries([
...@@ -115,7 +115,7 @@ If we give provide entries that cause an unexpected error: ...@@ -115,7 +115,7 @@ If we give provide entries that cause an unexpected error:
Updated sally Updated sally
Unexpected exception unsupported operand type(s) for +=: 'float' and 'str' Unexpected exception unsupported operand type(s) for +=: 'float' and 'str'
Because the apply_entries used a savepoint for the entire function, Because the apply_entries used a savepoint for the entire function,
it was able to rollback the partial changes without rolling back it was able to rollback the partial changes without rolling back
changes made in the previous call to apply_entries: changes made in the previous call to apply_entries:
......
...@@ -17,4 +17,5 @@ from zope.testing.doctestunit import DocFileSuite ...@@ -17,4 +17,5 @@ from zope.testing.doctestunit import DocFileSuite
def test_suite(): def test_suite():
return DocFileSuite("dbopen.txt", return DocFileSuite("dbopen.txt",
"multidb.txt", "multidb.txt",
"synchronizers.txt",
) )
...@@ -32,6 +32,16 @@ from transaction._transaction import Transaction ...@@ -32,6 +32,16 @@ from transaction._transaction import Transaction
# Obscure: because of the __init__.py maze, we can't import WeakSet # Obscure: because of the __init__.py maze, we can't import WeakSet
# at top level here. # at top level here.
# Call the ISynchronizer newTransaction() method on every element of
# WeakSet synchs.
# A transaction manager needs to do this whenever begin() is called.
# Since it would be good if tm.get() returned the new transaction while
# newTransaction() is running, calling this has to be delayed until after
# the transaction manager has done whatever it needs to do to make its
# get() return the new txn.
def _new_transaction(txn, synchs):
synchs.map(lambda s: s.newTransaction(txn))
class TransactionManager(object): class TransactionManager(object):
def __init__(self): def __init__(self):
...@@ -43,8 +53,9 @@ class TransactionManager(object): ...@@ -43,8 +53,9 @@ class TransactionManager(object):
def begin(self): def begin(self):
if self._txn is not None: if self._txn is not None:
self._txn.abort() self._txn.abort()
self._txn = Transaction(self._synchs, self) txn = self._txn = Transaction(self._synchs, self)
return self._txn _new_transaction(txn, self._synchs)
return txn
def get(self): def get(self):
if self._txn is None: if self._txn is None:
...@@ -91,6 +102,7 @@ class ThreadTransactionManager(TransactionManager): ...@@ -91,6 +102,7 @@ class ThreadTransactionManager(TransactionManager):
txn.abort() txn.abort()
synchs = self._synchs.get(tid) synchs = self._synchs.get(tid)
txn = self._txns[tid] = Transaction(synchs, self) txn = self._txns[tid] = Transaction(synchs, self)
_new_transaction(txn, synchs)
return txn return txn
def get(self): def get(self):
......
...@@ -28,6 +28,9 @@ class ITransactionManager(zope.interface.Interface): ...@@ -28,6 +28,9 @@ class ITransactionManager(zope.interface.Interface):
"""Begin a new transaction. """Begin a new transaction.
If an existing transaction is in progress, it will be aborted. If an existing transaction is in progress, it will be aborted.
The newTransaction() method of registered synchronizers is called,
passing the new transaction object.
""" """
def get(): def get():
...@@ -55,15 +58,15 @@ class ITransactionManager(zope.interface.Interface): ...@@ -55,15 +58,15 @@ class ITransactionManager(zope.interface.Interface):
def registerSynch(synch): def registerSynch(synch):
"""Register an ISynchronizer. """Register an ISynchronizer.
Synchronizers are notified at the beginning and end of Synchronizers are notified about some major events in a transaction's
transaction completion. life. See ISynchronizer for details.
""" """
def unregisterSynch(synch): def unregisterSynch(synch):
"""Unregister an ISynchronizer. """Unregister an ISynchronizer.
Synchronizers are notified at the beginning and end of Synchronizers are notified about some major events in a transaction's
transaction completion. life. See ISynchronizer for details.
""" """
class ITransaction(zope.interface.Interface): class ITransaction(zope.interface.Interface):
...@@ -365,3 +368,10 @@ class ISynchronizer(zope.interface.Interface): ...@@ -365,3 +368,10 @@ class ISynchronizer(zope.interface.Interface):
def afterCompletion(transaction): def afterCompletion(transaction):
"""Hook that is called by the transaction after completing a commit. """Hook that is called by the transaction after completing a commit.
""" """
def newTransaction(transaction):
"""Hook that is called at the start of a transaction.
This hook is called when, and only when, a transaction manager's
begin() method is called explictly.
"""
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment