Commit c7ff478a authored by Jim Fulton's avatar Jim Fulton

Changed the way delegation of storage methods was done to work with

storages that don't support versions or that don't support undo.

Changed some comments to doc strings.

Deprecated storages without tpc_vote.

Removed the modifiedInVersion cache.

Added an XXX about the broken ResourceManager implementations.
parent c3ce017f
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
$Id$""" $Id$"""
import warnings
import cPickle, cStringIO, sys import cPickle, cStringIO, sys
import threading import threading
from time import time, ctime from time import time, ctime
...@@ -31,6 +33,7 @@ from ZODB.interfaces import IDatabase ...@@ -31,6 +33,7 @@ from ZODB.interfaces import IDatabase
import transaction import transaction
logger = logging.getLogger('ZODB.DB') logger = logging.getLogger('ZODB.DB')
class _ConnectionPool(object): class _ConnectionPool(object):
...@@ -77,23 +80,28 @@ class _ConnectionPool(object): ...@@ -77,23 +80,28 @@ class _ConnectionPool(object):
# a list (we push only "on the right", but may pop from both ends). # a list (we push only "on the right", but may pop from both ends).
self.available = [] self.available = []
# Change our belief about the expected maximum # of live connections.
# If the pool_size is smaller than the current value, this may discard
# the oldest available connections.
def set_pool_size(self, pool_size): def set_pool_size(self, pool_size):
"""Change our belief about the expected maximum # of live connections.
If the pool_size is smaller than the current value, this may discard
the oldest available connections.
"""
self.pool_size = pool_size self.pool_size = pool_size
self._reduce_size() self._reduce_size()
# Register a new available connection. We must not know about c already.
# c will be pushed onto the available stack even if we're over the
# pool size limit.
def push(self, c): def push(self, c):
"""Register a new available connection.
We must not know about c already. c will be pushed onto the available
stack even if we're over the pool size limit.
"""
assert c not in self.all assert c not in self.all
assert c not in self.available assert c not in self.available
self._reduce_size(strictly_less=True) self._reduce_size(strictly_less=True)
self.all.add(c) self.all.add(c)
self.available.append(c) self.available.append(c)
n, limit = len(self.all), self.pool_size n = len(self.all)
limit = self.pool_size
if n > limit: if n > limit:
reporter = logger.warn reporter = logger.warn
if n > 2 * limit: if n > 2 * limit:
...@@ -101,20 +109,25 @@ class _ConnectionPool(object): ...@@ -101,20 +109,25 @@ class _ConnectionPool(object):
reporter("DB.open() has %s open connections with a pool_size " reporter("DB.open() has %s open connections with a pool_size "
"of %s", n, limit) "of %s", n, limit)
# Reregister an available connection formerly obtained via pop(). This
# pushes it on the stack of available connections, and may discard
# older available connections.
def repush(self, c): def repush(self, c):
"""Reregister an available connection formerly obtained via pop().
This pushes it on the stack of available connections, and may discard
older available connections.
"""
assert c in self.all assert c in self.all
assert c not in self.available assert c not in self.available
self._reduce_size(strictly_less=True) self._reduce_size(strictly_less=True)
self.available.append(c) self.available.append(c)
# Throw away the oldest available connections until we're under our
# target size (strictly_less=False) or no more than that (strictly_less=
# True, the default).
def _reduce_size(self, strictly_less=False): def _reduce_size(self, strictly_less=False):
target = self.pool_size - bool(strictly_less) """Throw away the oldest available connections until we're under our
target size (strictly_less=False, the default) or no more than that
(strictly_less=True).
"""
target = self.pool_size
if strictly_less:
target -= 1
while len(self.available) > target: while len(self.available) > target:
c = self.available.pop(0) c = self.available.pop(0)
self.all.remove(c) self.all.remove(c)
...@@ -132,11 +145,13 @@ class _ConnectionPool(object): ...@@ -132,11 +145,13 @@ class _ConnectionPool(object):
# now, and `c` would be left in a user-visible crazy state. # now, and `c` would be left in a user-visible crazy state.
c._resetCache() c._resetCache()
# Pop an available connection and return it, or return None if none are
# available. In the latter case, the caller should create a new
# connection, register it via push(), and call pop() again. The
# caller is responsible for serializing this sequence.
def pop(self): def pop(self):
"""Pop an available connection and return it.
Return None if none are available - in this case, the caller should
create a new connection, register it via push(), and call pop() again.
The caller is responsible for serializing this sequence.
"""
result = None result = None
if self.available: if self.available:
result = self.available.pop() result = self.available.pop()
...@@ -145,8 +160,8 @@ class _ConnectionPool(object): ...@@ -145,8 +160,8 @@ class _ConnectionPool(object):
assert result in self.all assert result in self.all
return result return result
# For every live connection c, invoke f(c).
def map(self, f): def map(self, f):
"""For every live connection c, invoke f(c)."""
self.all.map(f) self.all.map(f)
class DB(object): class DB(object):
...@@ -227,8 +242,6 @@ class DB(object): ...@@ -227,8 +242,6 @@ class DB(object):
self._version_pool_size = version_pool_size self._version_pool_size = version_pool_size
self._version_cache_size = version_cache_size self._version_cache_size = version_cache_size
self._miv_cache = {}
# Setup storage # Setup storage
self._storage=storage self._storage=storage
self.references = ZODB.serialize.referencesf self.references = ZODB.serialize.referencesf
...@@ -238,7 +251,13 @@ class DB(object): ...@@ -238,7 +251,13 @@ class DB(object):
storage.registerDB(self, None) # Backward compat storage.registerDB(self, None) # Backward compat
if not hasattr(storage, 'tpc_vote'): if not hasattr(storage, 'tpc_vote'):
warnings.warn(
"Storage doesn't have a tpc_vote and this violates "
"the stirage API. Violently monkeypatching in a do-nothing "
"tpc_vote.",
DeprecationWarning, 2)
storage.tpc_vote = lambda *args: None storage.tpc_vote = lambda *args: None
try: try:
storage.load(z64, '') storage.load(z64, '')
except KeyError: except KeyError:
...@@ -268,13 +287,45 @@ class DB(object): ...@@ -268,13 +287,45 @@ class DB(object):
database_name) database_name)
databases[database_name] = self databases[database_name] = self
# Pass through methods: self._setupUndoMethods()
for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog', self._setupVersionMethods()
'versionEmpty', 'versions']: self.history = storage.history
setattr(self, m, getattr(storage, m))
if hasattr(storage, 'undoInfo'): def _setupUndoMethods(self):
self.undoInfo = storage.undoInfo storage = self._storage
try:
self.supportsUndo = storage.supportsUndo
except AttributeError:
self.supportsUndo = lambda : False
if self.supportsUndo():
self.undoLog = storage.undoLog
if hasattr(storage, 'undoInfo'):
self.undoInfo = storage.undoInfo
else:
self.undoLog = self.undoInfo = lambda *a,**k: ()
def undo(*a, **k):
raise NotImplementedError
self.undo = undo
def _setupVersionMethods(self):
storage = self._storage
try:
self.supportsVersions = storage.supportsVersions
except AttributeError:
self.supportsVersions = lambda : False
if self.supportsVersions():
self.versionEmpty = storage.versionEmpty
self.versions = storage.versions
self.modifiedInVersion = storage.modifiedInVersion
else:
self.versionEmpty = lambda version: True
self.versions = lambda max=None: ()
self.modifiedInVersion = lambda oid: ''
def commitVersion(*a, **k):
raise NotImplementedError
self.commitVersion = self.abortVersion = commitVersion
# This is called by Connection.close(). # This is called by Connection.close().
def _returnToPool(self, connection): def _returnToPool(self, connection):
...@@ -471,12 +522,6 @@ class DB(object): ...@@ -471,12 +522,6 @@ class DB(object):
""" """
if connection is not None: if connection is not None:
version = connection._version version = connection._version
# Update modified in version cache
for oid in oids:
h = hash(oid) % 131
o = self._miv_cache.get(h, None)
if o is not None and o[0]==oid:
del self._miv_cache[h]
# Notify connections. # Notify connections.
def inval(c): def inval(c):
...@@ -487,20 +532,9 @@ class DB(object): ...@@ -487,20 +532,9 @@ class DB(object):
def invalidateCache(self): def invalidateCache(self):
"""Invalidate each of the connection caches """Invalidate each of the connection caches
""" """
self._miv_cache.clear()
self._connectionMap(lambda c: c.invalidateCache()) self._connectionMap(lambda c: c.invalidateCache())
def modifiedInVersion(self, oid):
h = hash(oid) % 131
cache = self._miv_cache
o = cache.get(h, None)
if o and o[0] == oid:
return o[1]
v = self._storage.modifiedInVersion(oid)
cache[h] = oid, v
return v
def objectCount(self): def objectCount(self):
return len(self._storage) return len(self._storage)
...@@ -687,10 +721,6 @@ class DB(object): ...@@ -687,10 +721,6 @@ class DB(object):
txn = transaction.get() txn = transaction.get()
txn.register(TransactionalUndo(self, id)) txn.register(TransactionalUndo(self, id))
def versionEmpty(self, version):
return self._storage.versionEmpty(version)
resource_counter_lock = threading.Lock() resource_counter_lock = threading.Lock()
resource_counter = 0 resource_counter = 0
...@@ -698,6 +728,11 @@ resource_counter = 0 ...@@ -698,6 +728,11 @@ resource_counter = 0
class ResourceManager(object): class ResourceManager(object):
"""Transaction participation for a version or undo resource.""" """Transaction participation for a version or undo resource."""
# XXX This implementation is broken. Subclasses invalidate oids
# in their commit calls. Invalidations should not be sent until
# tpc_finish is called. In fact, invalidations should be sent to
# the db *while* tpc_finish is being called on the storage.
def __init__(self, db): def __init__(self, db):
self._db = db self._db = db
# Delegate the actual 2PC methods to the storage # Delegate the actual 2PC methods to the storage
...@@ -729,10 +764,10 @@ class ResourceManager(object): ...@@ -729,10 +764,10 @@ class ResourceManager(object):
# argument to the methods below is self. # argument to the methods below is self.
def abort(self, obj, txn): def abort(self, obj, txn):
pass raise NotImplementedError
def commit(self, obj, txn): def commit(self, obj, txn):
pass raise NotImplementedError
class CommitVersion(ResourceManager): class CommitVersion(ResourceManager):
...@@ -742,6 +777,7 @@ class CommitVersion(ResourceManager): ...@@ -742,6 +777,7 @@ class CommitVersion(ResourceManager):
self._dest = dest self._dest = dest
def commit(self, ob, t): def commit(self, ob, t):
# XXX see XXX in ResourceManager
dest = self._dest dest = self._dest
tid, oids = self._db._storage.commitVersion(self._version, tid, oids = self._db._storage.commitVersion(self._version,
self._dest, self._dest,
...@@ -760,6 +796,7 @@ class AbortVersion(ResourceManager): ...@@ -760,6 +796,7 @@ class AbortVersion(ResourceManager):
self._version = version self._version = version
def commit(self, ob, t): def commit(self, ob, t):
# XXX see XXX in ResourceManager
tid, oids = self._db._storage.abortVersion(self._version, t) tid, oids = self._db._storage.abortVersion(self._version, t)
self._db.invalidate(tid, self._db.invalidate(tid,
dict.fromkeys(oids, 1), dict.fromkeys(oids, 1),
...@@ -772,5 +809,6 @@ class TransactionalUndo(ResourceManager): ...@@ -772,5 +809,6 @@ class TransactionalUndo(ResourceManager):
self._tid = tid self._tid = tid
def commit(self, ob, t): def commit(self, ob, t):
# XXX see XXX in ResourceManager
tid, oids = self._db._storage.undo(self._tid, t) tid, oids = self._db._storage.undo(self._tid, t)
self._db.invalidate(tid, dict.fromkeys(oids, 1)) self._db.invalidate(tid, dict.fromkeys(oids, 1))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment