Commit 318f92e8 authored by Jim Fulton's avatar Jim Fulton

Greatly simplified the internal dance to create, open, and close

connections.   Connections now get their database and related data
when they are constructed.  They no longer throw away this information
when they are closed.  (The old design that threw away the database
and related data did so to avoid memory leaks in Python 1.)

Fixed a multi-database connection-management bug. (The fix
necessitated the simplification.)
parent 8fcb1362
...@@ -66,27 +66,22 @@ class Connection(ExportImport, object): ...@@ -66,27 +66,22 @@ class Connection(ExportImport, object):
IPersistentDataManager, IPersistentDataManager,
ISynchronizer) ISynchronizer)
_storage = _normal_storage = _savepoint_storage = None
_code_timestamp = 0 _code_timestamp = 0
########################################################################## ##########################################################################
# Connection methods, ZODB.IConnection # Connection methods, ZODB.IConnection
def __init__(self, version='', cache_size=400, def __init__(self, db, version='', cache_size=400):
cache_deactivate_after=None, mvcc=True,
txn_mgr=DEPRECATED_ARGUMENT,
transaction_manager=None,
synch=True):
"""Create a new Connection.""" """Create a new Connection."""
if txn_mgr is not DEPRECATED_ARGUMENT: self._db = db
deprecated36("use transaction_manager= instead of txn_mgr=") self._normal_storage = self._storage = db._storage
if transaction_manager is None: self.new_oid = db._storage.new_oid
transaction_manager = txn_mgr self._savepoint_storage = None
else:
raise ValueError("cannot specify both transaction_manager= " self.transaction_manager = self._synch = self._mvcc = None
"and txn_mgr=")
self._log = logging.getLogger("ZODB.Connection") self._log = logging.getLogger("ZODB.Connection")
self._debug_info = () self._debug_info = ()
...@@ -101,6 +96,7 @@ class Connection(ExportImport, object): ...@@ -101,6 +96,7 @@ class Connection(ExportImport, object):
# Unclear: Why do we want version caches to behave this way? # Unclear: Why do we want version caches to behave this way?
self._cache.cache_drain_resistance = 100 self._cache.cache_drain_resistance = 100
self._committed = [] self._committed = []
self._added = {} self._added = {}
self._added_during_commit = None self._added_during_commit = None
...@@ -119,15 +115,6 @@ class Connection(ExportImport, object): ...@@ -119,15 +115,6 @@ class Connection(ExportImport, object):
# Do we need to join a txn manager? # Do we need to join a txn manager?
self._needs_to_join = True self._needs_to_join = True
# If a transaction manager is passed to the constructor, use
# it instead of the global transaction manager. The instance
# variable will hold a TM instance.
self.transaction_manager = transaction_manager or transaction.manager
# _synch is a boolean; if True, the Connection will register
# with the TM to receive afterCompletion() calls.
self._synch = synch
# _invalidated queues invalidate messages delivered from the DB # _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while # _inv_lock prevents one thread from modifying the set while
# another is processing invalidations. All the invalidations # another is processing invalidations. All the invalidations
...@@ -158,7 +145,6 @@ class Connection(ExportImport, object): ...@@ -158,7 +145,6 @@ class Connection(ExportImport, object):
# If it is None, then the current revisions are acceptable. # If it is None, then the current revisions are acceptable.
# If the connection is in a version, mvcc will be disabled, because # If the connection is in a version, mvcc will be disabled, because
# loadBefore() only returns non-version data. # loadBefore() only returns non-version data.
self._mvcc = mvcc and not version
self._txn_time = None self._txn_time = None
# To support importFile(), implemented in the ExportImport base # To support importFile(), implemented in the ExportImport base
...@@ -167,11 +153,16 @@ class Connection(ExportImport, object): ...@@ -167,11 +153,16 @@ class Connection(ExportImport, object):
# to pass to _importDuringCommit(). # to pass to _importDuringCommit().
self._import = None self._import = None
self.connections = None
self._reader = ObjectReader(self, self._cache, self._db.classFactory)
# Multi-database support
self.connections = {self._db.database_name: self}
def add(self, obj): def add(self, obj):
"""Add a new object 'obj' to the database and assign it an oid.""" """Add a new object 'obj' to the database and assign it an oid."""
if self._storage is None: if self._opened is None:
raise ConnectionStateError("The database connection is closed") raise ConnectionStateError("The database connection is closed")
marker = object() marker = object()
...@@ -195,7 +186,7 @@ class Connection(ExportImport, object): ...@@ -195,7 +186,7 @@ class Connection(ExportImport, object):
def get(self, oid): def get(self, oid):
"""Return the persistent object with oid 'oid'.""" """Return the persistent object with oid 'oid'."""
if self._storage is None: if self._opened is None:
raise ConnectionStateError("The database connection is closed") raise ConnectionStateError("The database connection is closed")
obj = self._cache.get(oid, None) obj = self._cache.get(oid, None)
...@@ -234,7 +225,7 @@ class Connection(ExportImport, object): ...@@ -234,7 +225,7 @@ class Connection(ExportImport, object):
self.__onCloseCallbacks = [] self.__onCloseCallbacks = []
self.__onCloseCallbacks.append(f) self.__onCloseCallbacks.append(f)
def close(self): def close(self, primary=True):
"""Close the Connection.""" """Close the Connection."""
if not self._needs_to_join: if not self._needs_to_join:
# We're currently joined to a transaction. # We're currently joined to a transaction.
...@@ -254,18 +245,28 @@ class Connection(ExportImport, object): ...@@ -254,18 +245,28 @@ class Connection(ExportImport, object):
self._log.error("Close callback failed for %s", f, self._log.error("Close callback failed for %s", f,
exc_info=sys.exc_info()) exc_info=sys.exc_info())
self.__onCloseCallbacks = None self.__onCloseCallbacks = None
self._storage = self._savepoint_storage = self._normal_storage = None
self.new_oid = None
self._debug_info = () self._debug_info = ()
self._opened = None
# Return the connection to the pool.
if self._db is not None:
if self._synch: if self._synch:
self.transaction_manager.unregisterSynch(self) self.transaction_manager.unregisterSynch(self)
self._db._closeConnection(self) self._synch = None
# _closeConnection() set self._db to None. However, we can't
# assert that here, because self may have been reused (by if primary:
# another thread) by the time we get back here. for connection in self.connections.values():
if connection is not self:
connection.close(False)
# Return the connection to the pool.
if self._opened is not None:
self._db._returnToPool(self)
# _returnToPool() set self._opened to None.
# However, we can't assert that here, because self may
# have been reused (by another thread) by the time we
# get back here.
else:
self._opened = None
def db(self): def db(self):
"""Returns a handle to the database this connection belongs to.""" """Returns a handle to the database this connection belongs to."""
...@@ -273,7 +274,7 @@ class Connection(ExportImport, object): ...@@ -273,7 +274,7 @@ class Connection(ExportImport, object):
def isReadOnly(self): def isReadOnly(self):
"""Returns True if the storage for this connection is read only.""" """Returns True if the storage for this connection is read only."""
if self._storage is None: if self._opened is None:
raise ConnectionStateError("The database connection is closed") raise ConnectionStateError("The database connection is closed")
return self._storage.isReadOnly() return self._storage.isReadOnly()
...@@ -700,7 +701,7 @@ class Connection(ExportImport, object): ...@@ -700,7 +701,7 @@ class Connection(ExportImport, object):
database.""" database."""
oid = obj._p_oid oid = obj._p_oid
if self._storage is None: if self._opened is None:
msg = ("Shouldn't load state for %s " msg = ("Shouldn't load state for %s "
"when the connection is closed" % oid_repr(oid)) "when the connection is closed" % oid_repr(oid))
self._log.error(msg) self._log.error(msg)
...@@ -873,8 +874,8 @@ class Connection(ExportImport, object): ...@@ -873,8 +874,8 @@ class Connection(ExportImport, object):
# return a list of [ghosts....not recently used.....recently used] # return a list of [ghosts....not recently used.....recently used]
return everything.items() + items return everything.items() + items
def _setDB(self, odb, mvcc=None, txn_mgr=DEPRECATED_ARGUMENT, def open(self, transaction_manager=None, mvcc=True, synch=True,
transaction_manager=None, synch=None): delegate=True):
"""Register odb, the DB that this Connection uses. """Register odb, the DB that this Connection uses.
This method is called by the DB every time a Connection This method is called by the DB every time a Connection
...@@ -893,39 +894,37 @@ class Connection(ExportImport, object): ...@@ -893,39 +894,37 @@ class Connection(ExportImport, object):
register for afterCompletion() calls. register for afterCompletion() calls.
""" """
if txn_mgr is not DEPRECATED_ARGUMENT:
deprecated36("use transaction_manager= instead of txn_mgr=")
if transaction_manager is None:
transaction_manager = txn_mgr
else:
raise ValueError("cannot specify both transaction_manager= "
"and txn_mgr=")
# TODO: Why do we go to all the trouble of setting _db and # TODO: Why do we go to all the trouble of setting _db and
# other attributes on open and clearing them on close? # other attributes on open and clearing them on close?
# A Connection is only ever associated with a single DB # A Connection is only ever associated with a single DB
# and Storage. # and Storage.
self._db = odb
self._normal_storage = self._storage = odb._storage
self.new_oid = odb._storage.new_oid
self._opened = time() self._opened = time()
if synch is not None:
self._synch = synch self._synch = synch
if mvcc is not None: self._mvcc = mvcc and not self._version
self._mvcc = mvcc
self.transaction_manager = transaction_manager or transaction.manager if transaction_manager is None:
transaction_manager = transaction.manager
self.transaction_manager = transaction_manager
if self._reset_counter != global_reset_counter: if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache. # New code is in place. Start a new cache.
self._resetCache() self._resetCache()
else: else:
self._flush_invalidations() self._flush_invalidations()
if self._synch:
self.transaction_manager.registerSynch(self)
self._reader = ObjectReader(self, self._cache, self._db.classFactory)
# Multi-database support if synch:
self.connections = {self._db.database_name: self} transaction_manager.registerSynch(self)
if self._cache is not None:
self._cache.incrgc() # This is a good time to do some GC
if delegate:
# delegate open to secondary connections
for connection in self.connections.values():
if connection is not self:
connection.open(transaction_manager, mvcc, synch, False)
def _resetCache(self): def _resetCache(self):
"""Creates a new cache, discarding the old one. """Creates a new cache, discarding the old one.
......
...@@ -273,7 +273,7 @@ class DB(object): ...@@ -273,7 +273,7 @@ class DB(object):
self.undoInfo = storage.undoInfo self.undoInfo = storage.undoInfo
# This is called by Connection.close(). # This is called by Connection.close().
def _closeConnection(self, connection): def _returnToPool(self, connection):
"""Return a connection to the pool. """Return a connection to the pool.
connection._db must be self on entry. connection._db must be self on entry.
...@@ -282,7 +282,7 @@ class DB(object): ...@@ -282,7 +282,7 @@ class DB(object):
self._a() self._a()
try: try:
assert connection._db is self assert connection._db is self
connection._db = None connection._opened = None
am = self._activity_monitor am = self._activity_monitor
if am is not None: if am is not None:
...@@ -560,15 +560,13 @@ class DB(object): ...@@ -560,15 +560,13 @@ class DB(object):
size = self._version_cache_size size = self._version_cache_size
else: else:
size = self._cache_size size = self._cache_size
c = self.klass(version=version, cache_size=size, mvcc=mvcc, c = self.klass(self, version, size)
transaction_manager=transaction_manager)
pool.push(c) pool.push(c)
result = pool.pop() result = pool.pop()
assert result is not None assert result is not None
# Tell the connection it belongs to self. # Tell the connection it belongs to self.
result._setDB(self, mvcc=mvcc, synch=synch, result.open(transaction_manager, mvcc, synch)
transaction_manager=transaction_manager)
# A good time to do some cache cleanup. # A good time to do some cache cleanup.
self._connectionMap(lambda c: c.cacheGC()) self._connectionMap(lambda c: c.cacheGC())
......
...@@ -112,10 +112,7 @@ class IConnection(Interface): ...@@ -112,10 +112,7 @@ class IConnection(Interface):
getTransferCounts getTransferCounts
""" """
def __init__(version='', cache_size=400, def __init__(database, version='', cache_size=400):
cache_deactivate_after=None, mvcc=True,
transaction_manager=None,
synch=True):
"""Create a new Connection. """Create a new Connection.
A Connection instance should by instantiated by the DB A Connection instance should by instantiated by the DB
......
...@@ -28,9 +28,9 @@ class ConnectionDotAdd(unittest.TestCase): ...@@ -28,9 +28,9 @@ class ConnectionDotAdd(unittest.TestCase):
def setUp(self): def setUp(self):
from ZODB.Connection import Connection from ZODB.Connection import Connection
self.datamgr = Connection()
self.db = StubDatabase() self.db = StubDatabase()
self.datamgr._setDB(self.db) self.datamgr = Connection(self.db)
self.datamgr.open()
self.transaction = StubTransaction() self.transaction = StubTransaction()
def tearDown(self): def tearDown(self):
...@@ -358,7 +358,8 @@ class UserMethodTests(unittest.TestCase): ...@@ -358,7 +358,8 @@ class UserMethodTests(unittest.TestCase):
>>> cn.db() is db >>> cn.db() is db
True True
>>> cn.close() >>> cn.close()
>>> cn.db() >>> cn.db() is db
True
""" """
def test_isReadOnly(self): def test_isReadOnly(self):
......
...@@ -165,7 +165,7 @@ class ZODBTests(unittest.TestCase): ...@@ -165,7 +165,7 @@ class ZODBTests(unittest.TestCase):
ZODB.Connection.resetCaches() ZODB.Connection.resetCaches()
conn.close() conn.close()
self.assert_(len(conn._cache) > 0) # Still not flushed self.assert_(len(conn._cache) > 0) # Still not flushed
conn._setDB(self._db) # simulate the connection being reopened conn.open() # simulate the connection being reopened
self.assertEqual(len(conn._cache), 0) self.assertEqual(len(conn._cache), 0)
def checkExplicitTransactionManager(self): def checkExplicitTransactionManager(self):
......
...@@ -78,6 +78,50 @@ different connections to the same database. ...@@ -78,6 +78,50 @@ different connections to the same database.
""" """
def test_connection_management_doesnt_get_caching_wrong():
"""
If a connection participates in a multidatabase, then it's
connections must remain so that references between it's cached
objects remain sane.
>>> import ZODB.tests.util, transaction, persistent
>>> databases = {}
>>> db1 = ZODB.tests.util.DB(databases=databases, database_name='1')
>>> db2 = ZODB.tests.util.DB(databases=databases, database_name='2')
>>> tm = transaction.TransactionManager()
>>> conn1 = db1.open(transaction_manager=tm)
>>> conn2 = conn1.get_connection('2')
>>> z = MyClass()
>>> conn2.root()['z'] = z
>>> tm.commit()
>>> x = MyClass()
>>> x.z = z
>>> conn1.root()['x'] = x
>>> y = MyClass()
>>> y.z = z
>>> conn1.root()['y'] = y
>>> tm.commit()
>>> conn1.root()['x'].z is conn1.root()['y'].z
True
So, we have 2 objects in conn1 that point to the same object in conn2.
Now, we'll deactivate one, close and repopen the connection, and see
if we get the same objects:
>>> x._p_deactivate()
>>> conn1.close()
>>> conn1 = db1.open(transaction_manager=tm)
>>> conn1.root()['x'].z is conn1.root()['y'].z
True
>>> db1.close()
>>> db2.close()
"""
def tearDownDbs(test): def tearDownDbs(test):
test.globs['db1'].close() test.globs['db1'].close()
test.globs['db2'].close() test.globs['db2'].close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment