Commit 61db5159 authored by Tim Peters's avatar Tim Peters

Finally changed Connection to use the new transaction join API. This

essentially means a connection keep track of which objects from
the connection are modified, instead of transactions keeping track of that.
A good consequence hasn't yet been implemented:  if a connection is
closed with modifications still pending, we can detect that efficiently
now, and complain.
parent 552b63d7
......@@ -13,7 +13,7 @@
##############################################################################
"""Database connection support
$Id: Connection.py,v 1.148 2004/04/15 16:22:38 jeremy Exp $"""
$Id: Connection.py,v 1.149 2004/04/16 01:08:12 tim_one Exp $"""
import logging
import sys
......@@ -192,8 +192,17 @@ class Connection(ExportImport, object):
self._reset_counter = global_reset_counter
self._load_count = 0 # Number of objects unghosted
self._store_count = 0 # Number of objects stored
# List of oids of modified objects (to be invalidated on an abort).
self._modified = []
# List of all objects (not oids) registered as modified by the
# persistence machinery.
self._registered_objects = []
# Do we need to join a txn manager?
self._needs_to_join = True
# If a transaction manager is passed to the constructor, use
# it instead of the global transaction manager. The instance
# variable will hold a TM instance.
......@@ -373,7 +382,7 @@ class Connection(ExportImport, object):
obj._p_jar = self
if self._added_during_commit is not None:
self._added_during_commit.append(obj)
self._txn_mgr.get().register(obj)
self._register(obj)
# Add to _added after calling register(), so that _added
# can be used as a test for whether the object has been
# registered with the transaction.
......@@ -440,22 +449,23 @@ class Connection(ExportImport, object):
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
def abort(self, object, transaction):
def abort(self, transaction):
"""Abort the object in the transaction.
This just deactivates the thing.
"""
if object is self:
self._flush_invalidations()
else:
oid = object._p_oid
for obj in self._registered_objects:
oid = obj._p_oid
assert oid is not None
if oid in self._added:
del self._added[oid]
del object._p_jar
del object._p_oid
del obj._p_jar
del obj._p_oid
else:
self._cache.invalidate(object._p_oid)
self._cache.invalidate(oid)
self._tpc_cleanup()
# XXX should there be a way to call incrgc directly?
# perhaps "full sweep" should do that?
......@@ -546,24 +556,20 @@ class Connection(ExportImport, object):
# assert that here, because self may have been reused (by
# another thread) by the time we get back here.
def commit(self, obj, transaction):
if obj is self:
# We registered ourself. Execute a commit action, if any.
def commit(self, transaction):
if self._import:
# XXX eh?
self._importDuringCommit(transaction, *self._import)
self._import = None
return
for obj in self._registered_objects:
oid = obj._p_oid
assert oid
if oid in self._conflicts:
raise ReadConflictError(object=obj)
if oid is None or obj._p_jar is not self:
# new object
oid = self.new_oid()
obj._p_jar = self
obj._p_oid = oid
assert obj._p_serial == z64
if obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar)
elif oid in self._added:
assert obj._p_serial == z64
elif obj._p_changed:
......@@ -573,8 +579,10 @@ class Connection(ExportImport, object):
raise ConflictError(object=obj)
self._modified.append(oid)
else:
# Nothing to do
return
# Nothing to do. It's been said that it's legal, e.g., for
# an object to set _p_changed to false after it's been
# changed and registered.
continue
self._store_objects(ObjectWriter(obj), transaction)
......@@ -626,8 +634,8 @@ class Connection(ExportImport, object):
self._storage.tpc_begin(t)
# Copy invalidating and creating info from temporary storage:
self._modified[len(self._modified):] = oids
self._creating[len(self._creating):] = src._creating
self._modified.extend(oids)
self._creating.extend(src._creating)
for oid in oids:
data, serial = src.load(oid, src)
......@@ -745,7 +753,14 @@ class Connection(ExportImport, object):
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
self._txn_mgr.get().register(obj)
self._register(obj)
def _register(self, obj=None):
if obj is not None:
self._registered_objects.append(obj)
if self._needs_to_join:
self._txn_mgr.get().join(self)
self._needs_to_join = False
def root(self):
"""Return the database root object.
......@@ -825,7 +840,7 @@ class Connection(ExportImport, object):
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
self._txn_mgr.get().register(obj)
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
......@@ -867,7 +882,7 @@ class Connection(ExportImport, object):
finally:
self._inv_lock.release()
else:
self._txn_mgr.get().register(obj)
self._register(obj)
raise ReadConflictError(object=obj)
def oldstate(self, obj, tid):
......@@ -912,20 +927,6 @@ class Connection(ExportImport, object):
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
def tpc_abort(self, transaction):
if self._import:
self._import = None
self._storage.tpc_abort(transaction)
self._cache.invalidate(self._modified)
self._conflicts.clear()
if not self._synch:
self._flush_invalidations()
self._invalidate_creating()
while self._added:
oid, obj = self._added.popitem()
del obj._p_oid
del obj._p_jar
def tpc_begin(self, transaction, sub=False):
self._modified = []
......@@ -1009,10 +1010,28 @@ class Connection(ExportImport, object):
d[oid] = 1
self._db.invalidate(tid, d, self)
self._storage.tpc_finish(transaction, callback)
self._tpc_cleanup()
def tpc_abort(self, transaction):
if self._import:
self._import = None
self._storage.tpc_abort(transaction)
self._cache.invalidate(self._modified)
self._invalidate_creating()
while self._added:
oid, obj = self._added.popitem()
del obj._p_oid
del obj._p_jar
self._tpc_cleanup()
# Common cleanup actions after tpc_finish/tpc_abort.
def _tpc_cleanup(self):
self._conflicts.clear()
if not self._synch:
self._flush_invalidations()
self._needs_to_join = True
self._registered_objects = []
def sync(self):
self._txn_mgr.get().abort()
......@@ -1044,5 +1063,5 @@ class Connection(ExportImport, object):
new._p_oid = oid
new._p_jar = self
new._p_changed = 1
self._txn_mgr.get().register(new)
self._register(new)
self._cache[oid] = new
......@@ -65,13 +65,13 @@ class ExportImport:
return customImporters[magic](self, f, clue)
raise ExportError("Invalid export header")
t = self.getTransaction()
t = self._txn_mgr.get()
if clue:
t.note(clue)
return_oid_list = []
self._import = f, return_oid_list
self.getTransaction().register(self)
self._register()
t.commit(1)
# Return the root imported object.
if return_oid_list:
......
......@@ -21,6 +21,7 @@ from persistent import Persistent
from ZODB.config import databaseFromString
from ZODB.utils import p64, u64
from ZODB.tests.warnhook import WarningsHook
import transaction
class ConnectionDotAdd(unittest.TestCase):
......@@ -31,6 +32,9 @@ class ConnectionDotAdd(unittest.TestCase):
self.datamgr._setDB(self.db)
self.transaction = StubTransaction()
def tearDown(self):
transaction.abort()
def check_add(self):
from ZODB.POSException import InvalidObjectReference
obj = StubObject()
......@@ -61,7 +65,7 @@ class ConnectionDotAdd(unittest.TestCase):
obj = StubObject()
self.datamgr.add(obj)
oid = obj._p_oid
self.datamgr.abort(obj, self.transaction)
self.datamgr.abort(self.transaction)
self.assert_(obj._p_oid is None)
self.assert_(obj._p_jar is None)
self.assertRaises(KeyError, self.datamgr.get, oid)
......@@ -85,9 +89,8 @@ class ConnectionDotAdd(unittest.TestCase):
obj = StubObject()
self.datamgr.add(obj)
oid = obj._p_oid
self.datamgr.tpc_begin(self.transaction)
self.datamgr.commit(obj, self.transaction)
self.datamgr.commit(self.transaction)
# Let's pretend something bad happened here.
self.datamgr.tpc_abort(self.transaction)
self.assert_(obj._p_oid is None)
......@@ -99,9 +102,8 @@ class ConnectionDotAdd(unittest.TestCase):
obj = StubObject()
self.datamgr.add(obj)
oid = obj._p_oid
self.datamgr.tpc_begin(self.transaction)
self.datamgr.commit(obj, self.transaction)
self.datamgr.commit(self.transaction)
self.datamgr.tpc_finish(self.transaction)
self.assert_(obj._p_oid is oid)
self.assert_(obj._p_jar is self.datamgr)
......@@ -115,9 +117,9 @@ class ConnectionDotAdd(unittest.TestCase):
def checkModifyOnGetstate(self):
subobj = StubObject()
obj = ModifyOnGetStateObject(subobj)
self.datamgr.add(obj)
self.datamgr.tpc_begin(self.transaction)
self.datamgr.commit(obj, self.transaction)
self.datamgr.commit(self.transaction)
self.datamgr.tpc_finish(self.transaction)
storage = self.db._storage
self.assert_(obj._p_oid in storage._stored, "object was not stored")
......@@ -564,7 +566,6 @@ class StubDatabase:
def invalidate(self, transaction, dict_with_oid_keys, connection):
pass
def test_suite():
s = unittest.makeSuite(ConnectionDotAdd, 'check')
s.addTest(doctest.DocTestSuite())
......
......@@ -80,10 +80,10 @@ calls the following four methods on each resource manager; it calls
tpc_begin() on each resource manager before calling commit() on any of
them.
1. tpc_begin()
2. commit()
3. tpc_vote()
4. tpc_finish()
1. tpc_begin(txn, subtransaction=False)
2. commit(txn)
3. tpc_vote(txn)
4. tpc_finish(txn)
Subtransaction commit
---------------------
......@@ -91,11 +91,11 @@ Subtransaction commit
When a subtransaction commits, the protocol is different.
1. tpc_begin() is passed a second argument, which indicates that a
subtransaction is begin committed.
subtransaction is being committed.
2. tpc_vote() is not called.
Once a subtransaction has been committed, the top-level transaction
commit will start with a commit_sub() called instead of a tpc_begin()
commit will start with a commit_sub() call instead of a tpc_begin()
call.
Error handling
......@@ -321,7 +321,11 @@ class Transaction(object):
# Called when an exception occurs during tpc_vote or tpc_finish.
for rm in L:
if id(rm) not in self._voted:
rm.cleanup(self)
try:
rm.abort(self)
except Exception:
self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info())
for rm in L:
if id(rm) in self._sub:
try:
......@@ -465,15 +469,9 @@ class MultiObjectResourceAdapter(object):
def tpc_vote(self, txn):
self.manager.tpc_vote(txn)
def cleanup(self, txn):
self._abort(self.objects[self.ncommitted:], txn)
def abort(self, txn):
self._abort(self.objects, txn)
def _abort(self, objects, txn):
tb = None
for o in objects:
for o in self.objects:
try:
self.manager.abort(o, txn)
except:
......
......@@ -67,9 +67,11 @@ abort() methods of Object to cause errors during commit.
Note that the implementation uses lists internally, so that objects
are committed in the order they are registered. (In the presence of
multiple resource managers, objects from a single resource manager are
committed in order. The order of resource managers depends on
sortKey().) I'm not sure if this is an accident of the implementation
or a feature that should be supported by any implementation.
committed in order. I'm not sure if this is an accident of the
implementation or a feature that should be supported by any
implementation.)
The order of resource managers depends on sortKey().
>>> cn = TestConnection()
>>> cn.register(Object())
......@@ -82,8 +84,7 @@ RuntimeError: commit
>>> len(cn.committed)
1
>>> len(cn.aborted)
2
3
"""
import transaction
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment