Commit f416526d authored by Tim Peters's avatar Tim Peters

Finally changed Connection to use the new transaction join API. This

essentially means a connection keep track of which objects from
the connection are modified, instead of transactions keeping track of that.
A good consequence hasn't yet been implemented:  if a connection is
closed with modifications still pending, we can detect that efficiently
now, and complain.
parent dd06e3cf
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
############################################################################## ##############################################################################
"""Database connection support """Database connection support
$Id: Connection.py,v 1.148 2004/04/15 16:22:38 jeremy Exp $""" $Id: Connection.py,v 1.149 2004/04/16 01:08:12 tim_one Exp $"""
import logging import logging
import sys import sys
...@@ -192,8 +192,17 @@ class Connection(ExportImport, object): ...@@ -192,8 +192,17 @@ class Connection(ExportImport, object):
self._reset_counter = global_reset_counter self._reset_counter = global_reset_counter
self._load_count = 0 # Number of objects unghosted self._load_count = 0 # Number of objects unghosted
self._store_count = 0 # Number of objects stored self._store_count = 0 # Number of objects stored
# List of oids of modified objects (to be invalidated on an abort).
self._modified = [] self._modified = []
# List of all objects (not oids) registered as modified by the
# persistence machinery.
self._registered_objects = []
# Do we need to join a txn manager?
self._needs_to_join = True
# If a transaction manager is passed to the constructor, use # If a transaction manager is passed to the constructor, use
# it instead of the global transaction manager. The instance # it instead of the global transaction manager. The instance
# variable will hold a TM instance. # variable will hold a TM instance.
...@@ -373,7 +382,7 @@ class Connection(ExportImport, object): ...@@ -373,7 +382,7 @@ class Connection(ExportImport, object):
obj._p_jar = self obj._p_jar = self
if self._added_during_commit is not None: if self._added_during_commit is not None:
self._added_during_commit.append(obj) self._added_during_commit.append(obj)
self._txn_mgr.get().register(obj) self._register(obj)
# Add to _added after calling register(), so that _added # Add to _added after calling register(), so that _added
# can be used as a test for whether the object has been # can be used as a test for whether the object has been
# registered with the transaction. # registered with the transaction.
...@@ -440,22 +449,23 @@ class Connection(ExportImport, object): ...@@ -440,22 +449,23 @@ class Connection(ExportImport, object):
cache_size = self._cache.cache_size cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size) self._cache = cache = PickleCache(self, cache_size)
def abort(self, object, transaction): def abort(self, transaction):
"""Abort the object in the transaction. """Abort the object in the transaction.
This just deactivates the thing. This just deactivates the thing.
""" """
if object is self:
self._flush_invalidations() for obj in self._registered_objects:
else: oid = obj._p_oid
oid = object._p_oid
assert oid is not None assert oid is not None
if oid in self._added: if oid in self._added:
del self._added[oid] del self._added[oid]
del object._p_jar del obj._p_jar
del object._p_oid del obj._p_oid
else: else:
self._cache.invalidate(object._p_oid) self._cache.invalidate(oid)
self._tpc_cleanup()
# XXX should there be a way to call incrgc directly? # XXX should there be a way to call incrgc directly?
# perhaps "full sweep" should do that? # perhaps "full sweep" should do that?
...@@ -546,24 +556,20 @@ class Connection(ExportImport, object): ...@@ -546,24 +556,20 @@ class Connection(ExportImport, object):
# assert that here, because self may have been reused (by # assert that here, because self may have been reused (by
# another thread) by the time we get back here. # another thread) by the time we get back here.
def commit(self, obj, transaction): def commit(self, transaction):
if obj is self:
# We registered ourself. Execute a commit action, if any.
if self._import: if self._import:
# XXX eh?
self._importDuringCommit(transaction, *self._import) self._importDuringCommit(transaction, *self._import)
self._import = None self._import = None
return
for obj in self._registered_objects:
oid = obj._p_oid oid = obj._p_oid
assert oid
if oid in self._conflicts: if oid in self._conflicts:
raise ReadConflictError(object=obj) raise ReadConflictError(object=obj)
if oid is None or obj._p_jar is not self: if obj._p_jar is not self:
# new object raise InvalidObjectReference(obj, obj._p_jar)
oid = self.new_oid()
obj._p_jar = self
obj._p_oid = oid
assert obj._p_serial == z64
elif oid in self._added: elif oid in self._added:
assert obj._p_serial == z64 assert obj._p_serial == z64
elif obj._p_changed: elif obj._p_changed:
...@@ -573,8 +579,10 @@ class Connection(ExportImport, object): ...@@ -573,8 +579,10 @@ class Connection(ExportImport, object):
raise ConflictError(object=obj) raise ConflictError(object=obj)
self._modified.append(oid) self._modified.append(oid)
else: else:
# Nothing to do # Nothing to do. It's been said that it's legal, e.g., for
return # an object to set _p_changed to false after it's been
# changed and registered.
continue
self._store_objects(ObjectWriter(obj), transaction) self._store_objects(ObjectWriter(obj), transaction)
...@@ -626,8 +634,8 @@ class Connection(ExportImport, object): ...@@ -626,8 +634,8 @@ class Connection(ExportImport, object):
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
# Copy invalidating and creating info from temporary storage: # Copy invalidating and creating info from temporary storage:
self._modified[len(self._modified):] = oids self._modified.extend(oids)
self._creating[len(self._creating):] = src._creating self._creating.extend(src._creating)
for oid in oids: for oid in oids:
data, serial = src.load(oid, src) data, serial = src.load(oid, src)
...@@ -745,7 +753,14 @@ class Connection(ExportImport, object): ...@@ -745,7 +753,14 @@ class Connection(ExportImport, object):
elif obj._p_oid in self._added: elif obj._p_oid in self._added:
# It was registered before it was added to _added. # It was registered before it was added to _added.
return return
self._txn_mgr.get().register(obj) self._register(obj)
def _register(self, obj=None):
if obj is not None:
self._registered_objects.append(obj)
if self._needs_to_join:
self._txn_mgr.get().join(self)
self._needs_to_join = False
def root(self): def root(self):
"""Return the database root object. """Return the database root object.
...@@ -825,7 +840,7 @@ class Connection(ExportImport, object): ...@@ -825,7 +840,7 @@ class Connection(ExportImport, object):
"""Load non-current state for obj or raise ReadConflictError.""" """Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)): if not (self._mvcc and self._setstate_noncurrent(obj)):
self._txn_mgr.get().register(obj) self._register(obj)
self._conflicts[obj._p_oid] = True self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj) raise ReadConflictError(object=obj)
...@@ -867,7 +882,7 @@ class Connection(ExportImport, object): ...@@ -867,7 +882,7 @@ class Connection(ExportImport, object):
finally: finally:
self._inv_lock.release() self._inv_lock.release()
else: else:
self._txn_mgr.get().register(obj) self._register(obj)
raise ReadConflictError(object=obj) raise ReadConflictError(object=obj)
def oldstate(self, obj, tid): def oldstate(self, obj, tid):
...@@ -912,20 +927,6 @@ class Connection(ExportImport, object): ...@@ -912,20 +927,6 @@ class Connection(ExportImport, object):
self._log.error("setklassstate failed", exc_info=sys.exc_info()) self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise raise
def tpc_abort(self, transaction):
if self._import:
self._import = None
self._storage.tpc_abort(transaction)
self._cache.invalidate(self._modified)
self._conflicts.clear()
if not self._synch:
self._flush_invalidations()
self._invalidate_creating()
while self._added:
oid, obj = self._added.popitem()
del obj._p_oid
del obj._p_jar
def tpc_begin(self, transaction, sub=False): def tpc_begin(self, transaction, sub=False):
self._modified = [] self._modified = []
...@@ -1009,10 +1010,28 @@ class Connection(ExportImport, object): ...@@ -1009,10 +1010,28 @@ class Connection(ExportImport, object):
d[oid] = 1 d[oid] = 1
self._db.invalidate(tid, d, self) self._db.invalidate(tid, d, self)
self._storage.tpc_finish(transaction, callback) self._storage.tpc_finish(transaction, callback)
self._tpc_cleanup()
def tpc_abort(self, transaction):
if self._import:
self._import = None
self._storage.tpc_abort(transaction)
self._cache.invalidate(self._modified)
self._invalidate_creating()
while self._added:
oid, obj = self._added.popitem()
del obj._p_oid
del obj._p_jar
self._tpc_cleanup()
# Common cleanup actions after tpc_finish/tpc_abort.
def _tpc_cleanup(self):
self._conflicts.clear() self._conflicts.clear()
if not self._synch: if not self._synch:
self._flush_invalidations() self._flush_invalidations()
self._needs_to_join = True
self._registered_objects = []
def sync(self): def sync(self):
self._txn_mgr.get().abort() self._txn_mgr.get().abort()
...@@ -1044,5 +1063,5 @@ class Connection(ExportImport, object): ...@@ -1044,5 +1063,5 @@ class Connection(ExportImport, object):
new._p_oid = oid new._p_oid = oid
new._p_jar = self new._p_jar = self
new._p_changed = 1 new._p_changed = 1
self._txn_mgr.get().register(new) self._register(new)
self._cache[oid] = new self._cache[oid] = new
...@@ -65,13 +65,13 @@ class ExportImport: ...@@ -65,13 +65,13 @@ class ExportImport:
return customImporters[magic](self, f, clue) return customImporters[magic](self, f, clue)
raise ExportError("Invalid export header") raise ExportError("Invalid export header")
t = self.getTransaction() t = self._txn_mgr.get()
if clue: if clue:
t.note(clue) t.note(clue)
return_oid_list = [] return_oid_list = []
self._import = f, return_oid_list self._import = f, return_oid_list
self.getTransaction().register(self) self._register()
t.commit(1) t.commit(1)
# Return the root imported object. # Return the root imported object.
if return_oid_list: if return_oid_list:
......
...@@ -21,6 +21,7 @@ from persistent import Persistent ...@@ -21,6 +21,7 @@ from persistent import Persistent
from ZODB.config import databaseFromString from ZODB.config import databaseFromString
from ZODB.utils import p64, u64 from ZODB.utils import p64, u64
from ZODB.tests.warnhook import WarningsHook from ZODB.tests.warnhook import WarningsHook
import transaction
class ConnectionDotAdd(unittest.TestCase): class ConnectionDotAdd(unittest.TestCase):
...@@ -31,6 +32,9 @@ class ConnectionDotAdd(unittest.TestCase): ...@@ -31,6 +32,9 @@ class ConnectionDotAdd(unittest.TestCase):
self.datamgr._setDB(self.db) self.datamgr._setDB(self.db)
self.transaction = StubTransaction() self.transaction = StubTransaction()
def tearDown(self):
transaction.abort()
def check_add(self): def check_add(self):
from ZODB.POSException import InvalidObjectReference from ZODB.POSException import InvalidObjectReference
obj = StubObject() obj = StubObject()
...@@ -61,7 +65,7 @@ class ConnectionDotAdd(unittest.TestCase): ...@@ -61,7 +65,7 @@ class ConnectionDotAdd(unittest.TestCase):
obj = StubObject() obj = StubObject()
self.datamgr.add(obj) self.datamgr.add(obj)
oid = obj._p_oid oid = obj._p_oid
self.datamgr.abort(obj, self.transaction) self.datamgr.abort(self.transaction)
self.assert_(obj._p_oid is None) self.assert_(obj._p_oid is None)
self.assert_(obj._p_jar is None) self.assert_(obj._p_jar is None)
self.assertRaises(KeyError, self.datamgr.get, oid) self.assertRaises(KeyError, self.datamgr.get, oid)
...@@ -85,9 +89,8 @@ class ConnectionDotAdd(unittest.TestCase): ...@@ -85,9 +89,8 @@ class ConnectionDotAdd(unittest.TestCase):
obj = StubObject() obj = StubObject()
self.datamgr.add(obj) self.datamgr.add(obj)
oid = obj._p_oid oid = obj._p_oid
self.datamgr.tpc_begin(self.transaction) self.datamgr.tpc_begin(self.transaction)
self.datamgr.commit(obj, self.transaction) self.datamgr.commit(self.transaction)
# Let's pretend something bad happened here. # Let's pretend something bad happened here.
self.datamgr.tpc_abort(self.transaction) self.datamgr.tpc_abort(self.transaction)
self.assert_(obj._p_oid is None) self.assert_(obj._p_oid is None)
...@@ -99,9 +102,8 @@ class ConnectionDotAdd(unittest.TestCase): ...@@ -99,9 +102,8 @@ class ConnectionDotAdd(unittest.TestCase):
obj = StubObject() obj = StubObject()
self.datamgr.add(obj) self.datamgr.add(obj)
oid = obj._p_oid oid = obj._p_oid
self.datamgr.tpc_begin(self.transaction) self.datamgr.tpc_begin(self.transaction)
self.datamgr.commit(obj, self.transaction) self.datamgr.commit(self.transaction)
self.datamgr.tpc_finish(self.transaction) self.datamgr.tpc_finish(self.transaction)
self.assert_(obj._p_oid is oid) self.assert_(obj._p_oid is oid)
self.assert_(obj._p_jar is self.datamgr) self.assert_(obj._p_jar is self.datamgr)
...@@ -115,9 +117,9 @@ class ConnectionDotAdd(unittest.TestCase): ...@@ -115,9 +117,9 @@ class ConnectionDotAdd(unittest.TestCase):
def checkModifyOnGetstate(self): def checkModifyOnGetstate(self):
subobj = StubObject() subobj = StubObject()
obj = ModifyOnGetStateObject(subobj) obj = ModifyOnGetStateObject(subobj)
self.datamgr.add(obj)
self.datamgr.tpc_begin(self.transaction) self.datamgr.tpc_begin(self.transaction)
self.datamgr.commit(obj, self.transaction) self.datamgr.commit(self.transaction)
self.datamgr.tpc_finish(self.transaction) self.datamgr.tpc_finish(self.transaction)
storage = self.db._storage storage = self.db._storage
self.assert_(obj._p_oid in storage._stored, "object was not stored") self.assert_(obj._p_oid in storage._stored, "object was not stored")
...@@ -564,7 +566,6 @@ class StubDatabase: ...@@ -564,7 +566,6 @@ class StubDatabase:
def invalidate(self, transaction, dict_with_oid_keys, connection): def invalidate(self, transaction, dict_with_oid_keys, connection):
pass pass
def test_suite(): def test_suite():
s = unittest.makeSuite(ConnectionDotAdd, 'check') s = unittest.makeSuite(ConnectionDotAdd, 'check')
s.addTest(doctest.DocTestSuite()) s.addTest(doctest.DocTestSuite())
......
...@@ -80,10 +80,10 @@ calls the following four methods on each resource manager; it calls ...@@ -80,10 +80,10 @@ calls the following four methods on each resource manager; it calls
tpc_begin() on each resource manager before calling commit() on any of tpc_begin() on each resource manager before calling commit() on any of
them. them.
1. tpc_begin() 1. tpc_begin(txn, subtransaction=False)
2. commit() 2. commit(txn)
3. tpc_vote() 3. tpc_vote(txn)
4. tpc_finish() 4. tpc_finish(txn)
Subtransaction commit Subtransaction commit
--------------------- ---------------------
...@@ -91,11 +91,11 @@ Subtransaction commit ...@@ -91,11 +91,11 @@ Subtransaction commit
When a subtransaction commits, the protocol is different. When a subtransaction commits, the protocol is different.
1. tpc_begin() is passed a second argument, which indicates that a 1. tpc_begin() is passed a second argument, which indicates that a
subtransaction is begin committed. subtransaction is being committed.
2. tpc_vote() is not called. 2. tpc_vote() is not called.
Once a subtransaction has been committed, the top-level transaction Once a subtransaction has been committed, the top-level transaction
commit will start with a commit_sub() called instead of a tpc_begin() commit will start with a commit_sub() call instead of a tpc_begin()
call. call.
Error handling Error handling
...@@ -321,7 +321,11 @@ class Transaction(object): ...@@ -321,7 +321,11 @@ class Transaction(object):
# Called when an exception occurs during tpc_vote or tpc_finish. # Called when an exception occurs during tpc_vote or tpc_finish.
for rm in L: for rm in L:
if id(rm) not in self._voted: if id(rm) not in self._voted:
rm.cleanup(self) try:
rm.abort(self)
except Exception:
self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info())
for rm in L: for rm in L:
if id(rm) in self._sub: if id(rm) in self._sub:
try: try:
...@@ -465,15 +469,9 @@ class MultiObjectResourceAdapter(object): ...@@ -465,15 +469,9 @@ class MultiObjectResourceAdapter(object):
def tpc_vote(self, txn): def tpc_vote(self, txn):
self.manager.tpc_vote(txn) self.manager.tpc_vote(txn)
def cleanup(self, txn):
self._abort(self.objects[self.ncommitted:], txn)
def abort(self, txn): def abort(self, txn):
self._abort(self.objects, txn)
def _abort(self, objects, txn):
tb = None tb = None
for o in objects: for o in self.objects:
try: try:
self.manager.abort(o, txn) self.manager.abort(o, txn)
except: except:
......
...@@ -67,9 +67,11 @@ abort() methods of Object to cause errors during commit. ...@@ -67,9 +67,11 @@ abort() methods of Object to cause errors during commit.
Note that the implementation uses lists internally, so that objects Note that the implementation uses lists internally, so that objects
are committed in the order they are registered. (In the presence of are committed in the order they are registered. (In the presence of
multiple resource managers, objects from a single resource manager are multiple resource managers, objects from a single resource manager are
committed in order. The order of resource managers depends on committed in order. I'm not sure if this is an accident of the
sortKey().) I'm not sure if this is an accident of the implementation implementation or a feature that should be supported by any
or a feature that should be supported by any implementation. implementation.)
The order of resource managers depends on sortKey().
>>> cn = TestConnection() >>> cn = TestConnection()
>>> cn.register(Object()) >>> cn.register(Object())
...@@ -82,8 +84,7 @@ RuntimeError: commit ...@@ -82,8 +84,7 @@ RuntimeError: commit
>>> len(cn.committed) >>> len(cn.committed)
1 1
>>> len(cn.aborted) >>> len(cn.aborted)
2 3
""" """
import transaction import transaction
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment