Commit 81e78f9d authored by Jeremy Hylton's avatar Jeremy Hylton

Add automatic synchronization for Connection objects.

This changes the behavior slightly for invalidations.  A Connection
will receive invalidations even if it didn't have any modified objects
in the transaction, e.g. db.undo() or a read-only transaction.  For
this to work, it assumes that the thread that called db.open() is the
thread that is going to use the connection.  If this isn't true, the
synch argument to db.open() can be used to disable the new feature.
parent 65df3786
...@@ -469,7 +469,7 @@ class NastyConfict(Base, TestCase): ...@@ -469,7 +469,7 @@ class NastyConfict(Base, TestCase):
r1["t"] = self.t r1["t"] = self.t
get_transaction().commit() get_transaction().commit()
r2 = self.db.open().root() r2 = self.db.open(synch=False).root()
copy = r2["t"] copy = r2["t"]
# Make sure all of copy is loaded. # Make sure all of copy is loaded.
list(copy.values()) list(copy.values())
...@@ -546,7 +546,7 @@ class NastyConfict(Base, TestCase): ...@@ -546,7 +546,7 @@ class NastyConfict(Base, TestCase):
r1["t"] = self.t r1["t"] = self.t
get_transaction().commit() get_transaction().commit()
r2 = self.db.open().root() r2 = self.db.open(synch=False).root()
copy = r2["t"] copy = r2["t"]
# Make sure all of copy is loaded. # Make sure all of copy is loaded.
list(copy.values()) list(copy.values())
...@@ -687,7 +687,7 @@ class NastyConfict(Base, TestCase): ...@@ -687,7 +687,7 @@ class NastyConfict(Base, TestCase):
r1["t"] = self.t r1["t"] = self.t
get_transaction().commit() get_transaction().commit()
r2 = self.db.open().root() r2 = self.db.open(synch=False).root()
copy = r2["t"] copy = r2["t"]
# Make sure all of copy is loaded. # Make sure all of copy is loaded.
list(copy.values()) list(copy.values())
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
############################################################################## ##############################################################################
"""Database connection support """Database connection support
$Id: Connection.py,v 1.146 2004/04/09 11:11:32 gintautasm Exp $""" $Id: Connection.py,v 1.147 2004/04/14 20:45:37 jeremy Exp $"""
import logging import logging
import sys import sys
...@@ -104,6 +104,32 @@ class Connection(ExportImport, object): ...@@ -104,6 +104,32 @@ class Connection(ExportImport, object):
XXX Mention the database pool. XXX Mention the database pool.
A database connection always presents a consistent view of the
objects in the database, although it may not always present the
most current revision of any particular object. Modifications
made by concurrent transactions are not visible until the next
transaction boundary (abort or commit).
Two options affect consistency. By default, the mvcc and synch
options are enabled by default.
If you pass mvcc=True to db.open(), the Connection will never read
non-current revisions of an object. Instead it will raise a
ReadConflictError to indicate that the current revision is
unavailable because it was written after the current transaction
began.
The logic for handling modifications assumes that the thread that
opened a Connection (called db.open()) is the thread that will use
the Connection. If this is not true, you should pass synch=False
to db.open(). When the synch option is disabled, some transaction
boundaries will be missed by the Connection; in particular, if a
transaction does not involve any modifications to objects loaded
from the Connection and synch is disabled, the Connection will
miss the transaction boundary. Two examples of this behavior are
db.undo() and read-only transactions.
:Groups: :Groups:
- `User Methods`: root, get, add, close, db, sync, isReadOnly, - `User Methods`: root, get, add, close, db, sync, isReadOnly,
...@@ -126,11 +152,24 @@ class Connection(ExportImport, object): ...@@ -126,11 +152,24 @@ class Connection(ExportImport, object):
_code_timestamp = 0 _code_timestamp = 0
def __init__(self, version='', cache_size=400, def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None): cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True):
"""Create a new Connection. """Create a new Connection.
A Connection instance should by instantiated by the DB A Connection instance should by instantiated by the DB
instance that it connects to. instance that it is connected to.
:Parameters:
- `version`: the "version" that all changes will be made
in, defaults to no version.
- `cache_size`: the target size of the in-memory object
cache, measured in objects.
- `cache_deactivate_after`: deprecated, ignored
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
""" """
self._log = logging.getLogger("zodb.conn") self._log = logging.getLogger("zodb.conn")
...@@ -157,9 +196,11 @@ class Connection(ExportImport, object): ...@@ -157,9 +196,11 @@ class Connection(ExportImport, object):
# If a transaction manager is passed to the constructor, use # If a transaction manager is passed to the constructor, use
# it instead of the global transaction manager. The instance # it instead of the global transaction manager. The instance
# variable will hold either a TM class or the transaction # variable will hold a TM instance.
# module itself, which implements the same interface. self._txn_mgr = txn_mgr or transaction.manager
self._txn_mgr = txn_mgr or transaction # _synch is a boolean; if True, the Connection will register
# with the TM to receive afterCompletion() calls.
self._synch = synch
# _invalidated queues invalidate messages delivered from the DB # _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while # _inv_lock prevents one thread from modifying the set while
...@@ -194,17 +235,36 @@ class Connection(ExportImport, object): ...@@ -194,17 +235,36 @@ class Connection(ExportImport, object):
self._import = None self._import = None
def getTransaction(self): def getTransaction(self):
# XXX mark this as deprecated? """Get the current transaction for this connection.
:deprecated:
The transaction manager's get method works the same as this
method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses.
"""
warnings.warn("getTransaction() is deprecated.",
DeprecationWarning)
return self._txn_mgr.get() return self._txn_mgr.get()
def setLocalTransaction(self): def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread""" """Use a transaction bound to the connection rather than the thread.
# XXX mark this method as deprecated? :deprecated:
# Note that its signature has changed.
if self._txn_mgr is transaction: Returns the transaction manager used by the connection. You
can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses.
"""
warnings.warn("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.",
DeprecationWarning)
if self._txn_mgr is transaction.manager:
if self._synch:
self._txn_mgr.unregisterSynch(self)
self._txn_mgr = transaction.TransactionManager() self._txn_mgr = transaction.TransactionManager()
if self._synch:
self._txn_mgr.registerSynch(self)
return self._txn_mgr return self._txn_mgr
def _cache_items(self): def _cache_items(self):
...@@ -326,17 +386,23 @@ class Connection(ExportImport, object): ...@@ -326,17 +386,23 @@ class Connection(ExportImport, object):
# lifetime of a connection, which is good enough. # lifetime of a connection, which is good enough.
return "%s:%s" % (self._sortKey(), id(self)) return "%s:%s" % (self._sortKey(), id(self))
def _setDB(self, odb): def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
"""Register odb, the DB that this Connection uses. """Register odb, the DB that this Connection uses.
This method is called by the DB every time a Connection This method is called by the DB every time a Connection
is opened. Any invalidations received while the Connection is opened. Any invalidations received while the Connection
was closed will be processed. was closed will be processed.
If resetCaches() was called, the cache will be cleared. If the global module function resetCaches() was called, the
cache will be cleared.
:Parameters: :Parameters:
- `odb`: that database that owns the Connection - `odb`: database that owns the Connection
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
""" """
# XXX Why do we go to all the trouble of setting _db and # XXX Why do we go to all the trouble of setting _db and
...@@ -348,11 +414,18 @@ class Connection(ExportImport, object): ...@@ -348,11 +414,18 @@ class Connection(ExportImport, object):
self._storage = odb._storage self._storage = odb._storage
self._sortKey = odb._storage.sortKey self._sortKey = odb._storage.sortKey
self.new_oid = odb._storage.new_oid self.new_oid = odb._storage.new_oid
if synch is not None:
self._synch = synch
if mvcc is not None:
self._mvcc = mvcc
self._txn_mgr = txn_mgr or transaction.manager
if self._reset_counter != global_reset_counter: if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache. # New code is in place. Start a new cache.
self._resetCache() self._resetCache()
else: else:
self._flush_invalidations() self._flush_invalidations()
if self._synch:
self._txn_mgr.registerSynch(self)
self._reader = ConnectionObjectReader(self, self._cache, self._reader = ConnectionObjectReader(self, self._cache,
self._db.classFactory) self._db.classFactory)
...@@ -465,6 +538,8 @@ class Connection(ExportImport, object): ...@@ -465,6 +538,8 @@ class Connection(ExportImport, object):
self._debug_info = () self._debug_info = ()
# Return the connection to the pool. # Return the connection to the pool.
if self._db is not None: if self._db is not None:
if self._synch:
self._txn_mgr.unregisterSynch(self)
self._db._closeConnection(self) self._db._closeConnection(self)
# _closeConnection() set self._db to None. However, we can't # _closeConnection() set self._db to None. However, we can't
# assert that here, because self may have been reused (by # assert that here, because self may have been reused (by
...@@ -620,6 +695,15 @@ class Connection(ExportImport, object): ...@@ -620,6 +695,15 @@ class Connection(ExportImport, object):
finally: finally:
self._inv_lock.release() self._inv_lock.release()
# The next two methods are callbacks for transaction synchronization.
def beforeCompletion(self, txn):
# We don't do anything before a commit starts.
pass
def afterCompletion(self, txn):
self._flush_invalidations()
def _flush_invalidations(self): def _flush_invalidations(self):
self._inv_lock.acquire() self._inv_lock.acquire()
try: try:
...@@ -832,8 +916,9 @@ class Connection(ExportImport, object): ...@@ -832,8 +916,9 @@ class Connection(ExportImport, object):
self._import = None self._import = None
self._storage.tpc_abort(transaction) self._storage.tpc_abort(transaction)
self._cache.invalidate(self._modified) self._cache.invalidate(self._modified)
self._flush_invalidations()
self._conflicts.clear() self._conflicts.clear()
if not self._synch:
self._flush_invalidations()
self._invalidate_creating() self._invalidate_creating()
while self._added: while self._added:
oid, obj = self._added.popitem() oid, obj = self._added.popitem()
...@@ -925,6 +1010,7 @@ class Connection(ExportImport, object): ...@@ -925,6 +1010,7 @@ class Connection(ExportImport, object):
self._storage.tpc_finish(transaction, callback) self._storage.tpc_finish(transaction, callback)
self._conflicts.clear() self._conflicts.clear()
if not self._synch:
self._flush_invalidations() self._flush_invalidations()
def sync(self): def sync(self):
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
############################################################################## ##############################################################################
"""Database objects """Database objects
$Id: DB.py,v 1.73 2004/04/08 18:12:25 tim_one Exp $""" $Id: DB.py,v 1.74 2004/04/14 20:45:37 jeremy Exp $"""
import cPickle, cStringIO, sys import cPickle, cStringIO, sys
from thread import allocate_lock from thread import allocate_lock
...@@ -398,7 +398,7 @@ class DB(object): ...@@ -398,7 +398,7 @@ class DB(object):
return len(self._storage) return len(self._storage)
def open(self, version='', transaction=None, temporary=0, force=None, def open(self, version='', transaction=None, temporary=0, force=None,
waitflag=1, mvcc=True, txn_mgr=None): waitflag=1, mvcc=True, txn_mgr=None, synch=True):
"""Return a database Connection for use by application code. """Return a database Connection for use by application code.
The optional version argument can be used to specify that a The optional version argument can be used to specify that a
...@@ -412,6 +412,20 @@ class DB(object): ...@@ -412,6 +412,20 @@ class DB(object):
Note that the connection pool is managed as a stack, to Note that the connection pool is managed as a stack, to
increate the likelihood that the connection's stack will increate the likelihood that the connection's stack will
include useful objects. include useful objects.
:Parameters:
- `version`: the "version" that all changes will be made
in, defaults to no version.
- `transaction`: XXX
- `temporary`: XXX
- `force`: XXX
- `waitflag`: XXX
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
""" """
self._a() self._a()
try: try:
...@@ -431,7 +445,7 @@ class DB(object): ...@@ -431,7 +445,7 @@ class DB(object):
# a one-use connection. # a one-use connection.
c = self.klass(version=version, c = self.klass(version=version,
cache_size=self._version_cache_size, cache_size=self._version_cache_size,
mvcc=mvcc, txn_mgr=txn_mgr) mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
c._setDB(self) c._setDB(self)
self._temps.append(c) self._temps.append(c)
if transaction is not None: if transaction is not None:
...@@ -487,7 +501,7 @@ class DB(object): ...@@ -487,7 +501,7 @@ class DB(object):
elif self._pool_size > len(allocated) or force: elif self._pool_size > len(allocated) or force:
c = self.klass(version=version, c = self.klass(version=version,
cache_size=self._cache_size, cache_size=self._cache_size,
mvcc=mvcc, txn_mgr=txn_mgr) mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
allocated.append(c) allocated.append(c)
pool.append(c) pool.append(c)
...@@ -518,7 +532,7 @@ class DB(object): ...@@ -518,7 +532,7 @@ class DB(object):
c = pool[-1] c = pool[-1]
del pool[-1] del pool[-1]
c._setDB(self) c._setDB(self, mvcc, txn_mgr, synch)
for pool, allocated in pooll: for pool, allocated in pooll:
for cc in pool: for cc in pool:
cc.cacheGC() cc.cacheGC()
......
...@@ -43,7 +43,7 @@ class TransactionManager(object): ...@@ -43,7 +43,7 @@ class TransactionManager(object):
self._txn = None self._txn = None
def registerSynch(self, synch): def registerSynch(self, synch):
self.synchs.append(synch) self._synchs.append(synch)
def unregisterSynch(self, synch): def unregisterSynch(self, synch):
self._synchs.remove(synch) self._synchs.remove(synch)
......
...@@ -246,7 +246,7 @@ class Transaction(object): ...@@ -246,7 +246,7 @@ class Transaction(object):
if not subtransaction: if not subtransaction:
for s in self._synchronizers: for s in self._synchronizers:
s.beforeCompletion() s.beforeCompletion(self)
if not subtransaction: if not subtransaction:
self.status = Status.COMMITTING self.status = Status.COMMITTING
...@@ -260,7 +260,7 @@ class Transaction(object): ...@@ -260,7 +260,7 @@ class Transaction(object):
if self._manager: if self._manager:
self._manager.free(self) self._manager.free(self)
for s in self._synchronizers: for s in self._synchronizers:
s.afterCompletion() s.afterCompletion(self)
self.log.debug("commit") self.log.debug("commit")
def _commitResources(self, subtransaction): def _commitResources(self, subtransaction):
...@@ -313,6 +313,8 @@ class Transaction(object): ...@@ -313,6 +313,8 @@ class Transaction(object):
self.status = Status.FAILED self.status = Status.FAILED
if self._manager: if self._manager:
self._manager.free(self) self._manager.free(self)
for s in self._synchronizers:
s.afterCompletion(self)
raise t, v, tb raise t, v, tb
def _cleanup(self, L): def _cleanup(self, L):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment