Commit 1e730a8c authored by Tim Peters's avatar Tim Peters

Brrr.

Weak sets have have pragmatic gotchas, explained in the comments
before the new WeakSet.as_weakref_list() method.  In essence, we
just took all the weak sets of connection objects and changed
everything so that a list of live objects is never materialized
anymore.  Also added new map()-like methods so that clients don't
usually need to be aware of the weakrefs under the covers.
parent ce4ac7b4
...@@ -130,10 +130,12 @@ class _ConnectionPool(object): ...@@ -130,10 +130,12 @@ class _ConnectionPool(object):
assert result in self.all assert result in self.all
return result return result
# Return a list of all connections we currently know about. # For every live connection c, invoke f(c).
def all_as_list(self): def map(self, f):
return self.all.as_list() for wr in self.all.as_weakref_list():
c = wr()
if c is not None:
f(c)
class DB(object): class DB(object):
"""The Object Database """The Object Database
...@@ -294,8 +296,7 @@ class DB(object): ...@@ -294,8 +296,7 @@ class DB(object):
self._a() self._a()
try: try:
for pool in self._pools.values(): for pool in self._pools.values():
for c in pool.all_as_list(): pool.map(f)
f(c)
finally: finally:
self._r() self._r()
...@@ -562,22 +563,26 @@ class DB(object): ...@@ -562,22 +563,26 @@ class DB(object):
def connectionDebugInfo(self): def connectionDebugInfo(self):
result = [] result = []
t = time() t = time()
for version, pool in self._pools.items():
for c in pool.all_as_list():
o = c._opened
d = c._debug_info
if d:
if len(d) == 1:
d = d[0]
else:
d = ''
d = "%s (%s)" % (d, len(c._cache))
result.append({ def get_info(c):
'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)), # `result`, `time` and `version` are lexically inherited.
'info': d, o = c._opened
'version': version, d = c._debug_info
}) if d:
if len(d) == 1:
d = d[0]
else:
d = ''
d = "%s (%s)" % (d, len(c._cache))
result.append({
'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
'info': d,
'version': version,
})
for version, pool in self._pools.items():
pool.map(get_info)
return result return result
def getActivityMonitor(self): def getActivityMonitor(self):
...@@ -614,25 +619,27 @@ class DB(object): ...@@ -614,25 +619,27 @@ class DB(object):
# Zope will rebind this method to arbitrary user code at runtime. # Zope will rebind this method to arbitrary user code at runtime.
return find_global(modulename, globalname) return find_global(modulename, globalname)
def setCacheSize(self, v): def setCacheSize(self, size):
self._a() self._a()
try: try:
self._cache_size = v self._cache_size = size
pool = self._pools.get('') pool = self._pools.get('')
if pool is not None: if pool is not None:
for c in pool.all_as_list(): def setsize(c):
c._cache.cache_size = v c._cache.cache_size = size
pool.map(setsize)
finally: finally:
self._r() self._r()
def setVersionCacheSize(self, v): def setVersionCacheSize(self, size):
self._a() self._a()
try: try:
self._version_cache_size = v self._version_cache_size = size
def setsize(c):
c._cache.cache_size = size
for version, pool in self._pools.items(): for version, pool in self._pools.items():
if version: if version:
for c in pool.all_as_list(): pool.map(setsize)
c._cache.cache_size = v
finally: finally:
self._r() self._r()
......
...@@ -221,10 +221,25 @@ class WeakSet(object): ...@@ -221,10 +221,25 @@ class WeakSet(object):
def remove(self, obj): def remove(self, obj):
del self.data[id(obj)] del self.data[id(obj)]
# Return a list of all the objects in the collection. # Return a list of weakrefs to all the objects in the collection.
# Because a weak dict is used internally, iteration # Because a weak dict is used internally, iteration is dicey (the
# is dicey (the underlying dict may change size during # underlying dict may change size during iteration, due to gc or
# iteration, due to gc or activity from other threads). # activity from other threads). as_weakef_list() is safe.
# as_list() attempts to be safe. #
def as_list(self): # Something like this should really be a method of Python's weak dicts.
return self.data.values() # If we invoke self.data.values() instead, we get back a list of live
# objects instead of weakrefs. If gc occurs while this list is alive,
# all the objects move to an older generation (because they're strongly
# referenced by the list!). They can't get collected then, until a
# less frequent collection of the older generation. Before then, if we
# invoke self.data.values() again, they're still alive, and if gc occurs
# while that list is alive they're all moved to yet an older generation.
# And so on. Stress tests showed that it was easy to get into a state
# where a WeakSet grows without bounds, despite that almost all its
# elements are actually trash. By returning a list of weakrefs instead,
# we avoid that, although the decision to use weakrefs is now# very
# visible to our clients.
def as_weakref_list(self):
# We're cheating by breaking into the internals of Python's
# WeakValueDictionary here (accessing its .data attribute).
return self.data.data.values()
...@@ -43,12 +43,12 @@ class TransactionManager(object): ...@@ -43,12 +43,12 @@ class TransactionManager(object):
def begin(self): def begin(self):
if self._txn is not None: if self._txn is not None:
self._txn.abort() self._txn.abort()
self._txn = Transaction(self._synchs.as_list(), self) self._txn = Transaction(self._synchs.as_weakref_list(), self)
return self._txn return self._txn
def get(self): def get(self):
if self._txn is None: if self._txn is None:
self._txn = Transaction(self._synchs.as_list(), self) self._txn = Transaction(self._synchs.as_weakref_list(), self)
return self._txn return self._txn
def free(self, txn): def free(self, txn):
...@@ -82,7 +82,7 @@ class ThreadTransactionManager(object): ...@@ -82,7 +82,7 @@ class ThreadTransactionManager(object):
txn.abort() txn.abort()
synchs = self._synchs.get(tid) synchs = self._synchs.get(tid)
if synchs is not None: if synchs is not None:
synchs = synchs.as_list() synchs = synchs.as_weakref_list()
txn = self._txns[tid] = Transaction(synchs, self) txn = self._txns[tid] = Transaction(synchs, self)
return txn return txn
...@@ -92,7 +92,7 @@ class ThreadTransactionManager(object): ...@@ -92,7 +92,7 @@ class ThreadTransactionManager(object):
if txn is None: if txn is None:
synchs = self._synchs.get(tid) synchs = self._synchs.get(tid)
if synchs is not None: if synchs is not None:
synchs = synchs.as_list() synchs = synchs.as_weakref_list()
txn = self._txns[tid] = Transaction(synchs, self) txn = self._txns[tid] = Transaction(synchs, self)
return txn return txn
......
...@@ -138,6 +138,7 @@ import sys ...@@ -138,6 +138,7 @@ import sys
import thread import thread
import warnings import warnings
import traceback import traceback
import weakref
from cStringIO import StringIO from cStringIO import StringIO
# Sigh. In the maze of __init__.py's, ZODB.__init__.py takes 'get' # Sigh. In the maze of __init__.py's, ZODB.__init__.py takes 'get'
...@@ -203,6 +204,14 @@ class Transaction(object): ...@@ -203,6 +204,14 @@ class Transaction(object):
# raised, incorporating this traceback. # raised, incorporating this traceback.
self._failure_traceback = None self._failure_traceback = None
# Invoke f(synch) for each synch in self._synchronizers.
def _synch_map(self, f):
for wr in self._synchronizers:
assert isinstance(wr, weakref.ref)
synch = wr()
if synch is not None:
f(synch)
# Raise TransactionFailedError, due to commit()/join()/register() # Raise TransactionFailedError, due to commit()/join()/register()
# getting called when the current transaction has already suffered # getting called when the current transaction has already suffered
# a commit failure. # a commit failure.
...@@ -286,8 +295,7 @@ class Transaction(object): ...@@ -286,8 +295,7 @@ class Transaction(object):
self.commit(True) self.commit(True)
if not subtransaction: if not subtransaction:
for s in self._synchronizers: self._synch_map(lambda s: s.beforeCompletion(self))
s.beforeCompletion(self)
self.status = Status.COMMITTING self.status = Status.COMMITTING
try: try:
...@@ -311,8 +319,7 @@ class Transaction(object): ...@@ -311,8 +319,7 @@ class Transaction(object):
self.status = Status.COMMITTED self.status = Status.COMMITTED
if self._manager: if self._manager:
self._manager.free(self) self._manager.free(self)
for s in self._synchronizers: self._synch_map(lambda s: s.afterCompletion(self))
s.afterCompletion(self)
self.log.debug("commit") self.log.debug("commit")
def _commitResources(self, subtransaction): def _commitResources(self, subtransaction):
...@@ -360,8 +367,7 @@ class Transaction(object): ...@@ -360,8 +367,7 @@ class Transaction(object):
self._cleanup(L) self._cleanup(L)
finally: finally:
if not subtransaction: if not subtransaction:
for s in self._synchronizers: self._synch_map(lambda s: s.afterCompletion(self))
s.afterCompletion(self)
raise t, v, tb raise t, v, tb
def _cleanup(self, L): def _cleanup(self, L):
...@@ -427,8 +433,7 @@ class Transaction(object): ...@@ -427,8 +433,7 @@ class Transaction(object):
def abort(self, subtransaction=False): def abort(self, subtransaction=False):
if not subtransaction: if not subtransaction:
for s in self._synchronizers: self._synch_map(lambda s: s.beforeCompletion(self))
s.beforeCompletion(self)
if subtransaction and self._nonsub: if subtransaction and self._nonsub:
from ZODB.POSException import TransactionError from ZODB.POSException import TransactionError
...@@ -458,8 +463,7 @@ class Transaction(object): ...@@ -458,8 +463,7 @@ class Transaction(object):
if not subtransaction: if not subtransaction:
if self._manager: if self._manager:
self._manager.free(self) self._manager.free(self)
for s in self._synchronizers: self._synch_map(lambda s: s.afterCompletion(self))
s.afterCompletion(self)
self.log.debug("abort") self.log.debug("abort")
if tb is not None: if tb is not None:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment