Commit 29fd0f2d authored by Jeremy Hylton's avatar Jeremy Hylton

Add automatic synchronization for Connection objects.

This changes the behavior slightly for invalidations.  A Connection
will receive invalidations even if it didn't have any modified objects
in the transaction, e.g. db.undo() or a read-only transaction.  For
this to work, it assumes that the thread that called db.open() is the
thread that is going to use the connection.  If this isn't true, the
synch argument to db.open() can be used to disable the new feature.
parent 05b9e5df
......@@ -469,7 +469,7 @@ class NastyConfict(Base, TestCase):
r1["t"] = self.t
get_transaction().commit()
r2 = self.db.open().root()
r2 = self.db.open(synch=False).root()
copy = r2["t"]
# Make sure all of copy is loaded.
list(copy.values())
......@@ -546,7 +546,7 @@ class NastyConfict(Base, TestCase):
r1["t"] = self.t
get_transaction().commit()
r2 = self.db.open().root()
r2 = self.db.open(synch=False).root()
copy = r2["t"]
# Make sure all of copy is loaded.
list(copy.values())
......@@ -687,7 +687,7 @@ class NastyConfict(Base, TestCase):
r1["t"] = self.t
get_transaction().commit()
r2 = self.db.open().root()
r2 = self.db.open(synch=False).root()
copy = r2["t"]
# Make sure all of copy is loaded.
list(copy.values())
......
......@@ -13,7 +13,7 @@
##############################################################################
"""Database connection support
$Id: Connection.py,v 1.146 2004/04/09 11:11:32 gintautasm Exp $"""
$Id: Connection.py,v 1.147 2004/04/14 20:45:37 jeremy Exp $"""
import logging
import sys
......@@ -104,6 +104,32 @@ class Connection(ExportImport, object):
XXX Mention the database pool.
A database connection always presents a consistent view of the
objects in the database, although it may not always present the
most current revision of any particular object. Modifications
made by concurrent transactions are not visible until the next
transaction boundary (abort or commit).
Two options affect consistency. By default, the mvcc and synch
options are enabled by default.
If you pass mvcc=True to db.open(), the Connection will never read
non-current revisions of an object. Instead it will raise a
ReadConflictError to indicate that the current revision is
unavailable because it was written after the current transaction
began.
The logic for handling modifications assumes that the thread that
opened a Connection (called db.open()) is the thread that will use
the Connection. If this is not true, you should pass synch=False
to db.open(). When the synch option is disabled, some transaction
boundaries will be missed by the Connection; in particular, if a
transaction does not involve any modifications to objects loaded
from the Connection and synch is disabled, the Connection will
miss the transaction boundary. Two examples of this behavior are
db.undo() and read-only transactions.
:Groups:
- `User Methods`: root, get, add, close, db, sync, isReadOnly,
......@@ -126,11 +152,24 @@ class Connection(ExportImport, object):
_code_timestamp = 0
def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None):
cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True):
"""Create a new Connection.
A Connection instance should by instantiated by the DB
instance that it connects to.
instance that it is connected to.
:Parameters:
- `version`: the "version" that all changes will be made
in, defaults to no version.
- `cache_size`: the target size of the in-memory object
cache, measured in objects.
- `cache_deactivate_after`: deprecated, ignored
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
self._log = logging.getLogger("zodb.conn")
......@@ -157,9 +196,11 @@ class Connection(ExportImport, object):
# If a transaction manager is passed to the constructor, use
# it instead of the global transaction manager. The instance
# variable will hold either a TM class or the transaction
# module itself, which implements the same interface.
self._txn_mgr = txn_mgr or transaction
# variable will hold a TM instance.
self._txn_mgr = txn_mgr or transaction.manager
# _synch is a boolean; if True, the Connection will register
# with the TM to receive afterCompletion() calls.
self._synch = synch
# _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while
......@@ -194,17 +235,36 @@ class Connection(ExportImport, object):
self._import = None
def getTransaction(self):
# XXX mark this as deprecated?
"""Get the current transaction for this connection.
:deprecated:
The transaction manager's get method works the same as this
method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses.
"""
warnings.warn("getTransaction() is deprecated.",
DeprecationWarning)
return self._txn_mgr.get()
def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread"""
"""Use a transaction bound to the connection rather than the thread.
# XXX mark this method as deprecated?
# Note that its signature has changed.
:deprecated:
if self._txn_mgr is transaction:
Returns the transaction manager used by the connection. You
can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses.
"""
warnings.warn("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.",
DeprecationWarning)
if self._txn_mgr is transaction.manager:
if self._synch:
self._txn_mgr.unregisterSynch(self)
self._txn_mgr = transaction.TransactionManager()
if self._synch:
self._txn_mgr.registerSynch(self)
return self._txn_mgr
def _cache_items(self):
......@@ -326,17 +386,23 @@ class Connection(ExportImport, object):
# lifetime of a connection, which is good enough.
return "%s:%s" % (self._sortKey(), id(self))
def _setDB(self, odb):
def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
"""Register odb, the DB that this Connection uses.
This method is called by the DB every time a Connection
is opened. Any invalidations received while the Connection
was closed will be processed.
If resetCaches() was called, the cache will be cleared.
If the global module function resetCaches() was called, the
cache will be cleared.
:Parameters:
- `odb`: that database that owns the Connection
- `odb`: database that owns the Connection
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
# XXX Why do we go to all the trouble of setting _db and
......@@ -348,11 +414,18 @@ class Connection(ExportImport, object):
self._storage = odb._storage
self._sortKey = odb._storage.sortKey
self.new_oid = odb._storage.new_oid
if synch is not None:
self._synch = synch
if mvcc is not None:
self._mvcc = mvcc
self._txn_mgr = txn_mgr or transaction.manager
if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache.
self._resetCache()
else:
self._flush_invalidations()
if self._synch:
self._txn_mgr.registerSynch(self)
self._reader = ConnectionObjectReader(self, self._cache,
self._db.classFactory)
......@@ -465,6 +538,8 @@ class Connection(ExportImport, object):
self._debug_info = ()
# Return the connection to the pool.
if self._db is not None:
if self._synch:
self._txn_mgr.unregisterSynch(self)
self._db._closeConnection(self)
# _closeConnection() set self._db to None. However, we can't
# assert that here, because self may have been reused (by
......@@ -620,6 +695,15 @@ class Connection(ExportImport, object):
finally:
self._inv_lock.release()
# The next two methods are callbacks for transaction synchronization.
def beforeCompletion(self, txn):
# We don't do anything before a commit starts.
pass
def afterCompletion(self, txn):
self._flush_invalidations()
def _flush_invalidations(self):
self._inv_lock.acquire()
try:
......@@ -832,8 +916,9 @@ class Connection(ExportImport, object):
self._import = None
self._storage.tpc_abort(transaction)
self._cache.invalidate(self._modified)
self._flush_invalidations()
self._conflicts.clear()
if not self._synch:
self._flush_invalidations()
self._invalidate_creating()
while self._added:
oid, obj = self._added.popitem()
......@@ -925,7 +1010,8 @@ class Connection(ExportImport, object):
self._storage.tpc_finish(transaction, callback)
self._conflicts.clear()
self._flush_invalidations()
if not self._synch:
self._flush_invalidations()
def sync(self):
self._txn_mgr.get().abort()
......
......@@ -13,7 +13,7 @@
##############################################################################
"""Database objects
$Id: DB.py,v 1.73 2004/04/08 18:12:25 tim_one Exp $"""
$Id: DB.py,v 1.74 2004/04/14 20:45:37 jeremy Exp $"""
import cPickle, cStringIO, sys
from thread import allocate_lock
......@@ -398,7 +398,7 @@ class DB(object):
return len(self._storage)
def open(self, version='', transaction=None, temporary=0, force=None,
waitflag=1, mvcc=True, txn_mgr=None):
waitflag=1, mvcc=True, txn_mgr=None, synch=True):
"""Return a database Connection for use by application code.
The optional version argument can be used to specify that a
......@@ -412,6 +412,20 @@ class DB(object):
Note that the connection pool is managed as a stack, to
increate the likelihood that the connection's stack will
include useful objects.
:Parameters:
- `version`: the "version" that all changes will be made
in, defaults to no version.
- `transaction`: XXX
- `temporary`: XXX
- `force`: XXX
- `waitflag`: XXX
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
self._a()
try:
......@@ -431,7 +445,7 @@ class DB(object):
# a one-use connection.
c = self.klass(version=version,
cache_size=self._version_cache_size,
mvcc=mvcc, txn_mgr=txn_mgr)
mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
c._setDB(self)
self._temps.append(c)
if transaction is not None:
......@@ -487,7 +501,7 @@ class DB(object):
elif self._pool_size > len(allocated) or force:
c = self.klass(version=version,
cache_size=self._cache_size,
mvcc=mvcc, txn_mgr=txn_mgr)
mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
allocated.append(c)
pool.append(c)
......@@ -518,7 +532,7 @@ class DB(object):
c = pool[-1]
del pool[-1]
c._setDB(self)
c._setDB(self, mvcc, txn_mgr, synch)
for pool, allocated in pooll:
for cc in pool:
cc.cacheGC()
......
......@@ -43,7 +43,7 @@ class TransactionManager(object):
self._txn = None
def registerSynch(self, synch):
self.synchs.append(synch)
self._synchs.append(synch)
def unregisterSynch(self, synch):
self._synchs.remove(synch)
......
......@@ -246,7 +246,7 @@ class Transaction(object):
if not subtransaction:
for s in self._synchronizers:
s.beforeCompletion()
s.beforeCompletion(self)
if not subtransaction:
self.status = Status.COMMITTING
......@@ -260,7 +260,7 @@ class Transaction(object):
if self._manager:
self._manager.free(self)
for s in self._synchronizers:
s.afterCompletion()
s.afterCompletion(self)
self.log.debug("commit")
def _commitResources(self, subtransaction):
......@@ -313,6 +313,8 @@ class Transaction(object):
self.status = Status.FAILED
if self._manager:
self._manager.free(self)
for s in self._synchronizers:
s.afterCompletion(self)
raise t, v, tb
def _cleanup(self, L):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment