Commit addec006 authored by Jim Fulton's avatar Jim Fulton

Merge branch 'load-calls-loadBefore' of github.com:zopefoundation/ZODB into load-calls-loadBefore

Conflicts:
	src/ZODB/utils.py
parents e8a853a3 7ab793d2
...@@ -2,7 +2,12 @@ ...@@ -2,7 +2,12 @@
Change History Change History
================ ================
4.3.0 (unreleased) 4.3.1 (2016-06-06)
==================
- Fixed: FileStorage loadBefore didn't handle deleted/undone data correctly.
4.3.0 (2016-05-31)
================== ==================
- Drop support for Python 2.6 and 3.2. - Drop support for Python 2.6 and 3.2.
...@@ -29,7 +34,7 @@ ...@@ -29,7 +34,7 @@
- DemoStorage: add support for conflict resolution and fix history() - DemoStorage: add support for conflict resolution and fix history()
https://github.com/zopefoundation/ZODB/pull/58 https://github.com/zopefoundation/ZODB/pull/58
- Fixed: FileStorage loadBefore didn't handle deleted/undone data correctly. - Fixed a test that depended on implementation-specific behavior in tpc_finish
4.2.0 (2015-06-02) 4.2.0 (2015-06-02)
================== ==================
......
...@@ -20,13 +20,12 @@ to application logic. ZODB includes features such as a plugable storage ...@@ -20,13 +20,12 @@ to application logic. ZODB includes features such as a plugable storage
interface, rich transaction support, and undo. interface, rich transaction support, and undo.
""" """
version = "4.3.0.dev0" version = "5.0.dev0"
import os import os
from setuptools import setup, find_packages from setuptools import setup, find_packages
classifiers = """\ classifiers = """\
Development Status :: 4 - Beta
Intended Audience :: Developers Intended Audience :: Developers
License :: OSI Approved :: Zope Public License License :: OSI Approved :: Zope Public License
Programming Language :: Python Programming Language :: Python
...@@ -131,7 +130,7 @@ setup(name="ZODB", ...@@ -131,7 +130,7 @@ setup(name="ZODB",
license = "ZPL 2.1", license = "ZPL 2.1",
platforms = ["any"], platforms = ["any"],
description = doclines[0], description = doclines[0],
classifiers = filter(None, classifiers.split("\n")), classifiers = list(filter(None, classifiers.split("\n"))),
long_description = long_description, long_description = long_description,
test_suite="__main__.alltests", # to support "setup.py test" test_suite="__main__.alltests", # to support "setup.py test"
tests_require = tests_require, tests_require = tests_require,
...@@ -142,7 +141,7 @@ setup(name="ZODB", ...@@ -142,7 +141,7 @@ setup(name="ZODB",
'persistent >= 4.2.0', 'persistent >= 4.2.0',
'BTrees >= 4.2.0', 'BTrees >= 4.2.0',
'ZConfig', 'ZConfig',
'transaction >= 1.5.0', 'transaction >= 1.6.1',
'six', 'six',
'zc.lockfile', 'zc.lockfile',
'zope.interface', 'zope.interface',
......
...@@ -27,7 +27,6 @@ from persistent import PickleCache ...@@ -27,7 +27,6 @@ from persistent import PickleCache
from persistent.interfaces import IPersistentDataManager from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection from ZODB.interfaces import IConnection
from ZODB.interfaces import IBlobStorage from ZODB.interfaces import IBlobStorage
from ZODB.interfaces import IMVCCStorage
from ZODB.blob import Blob, rename_or_copy_blob, remove_committed_dir from ZODB.blob import Blob, rename_or_copy_blob, remove_committed_dir
from transaction.interfaces import ISavepointDataManager from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint from transaction.interfaces import IDataManagerSavepoint
...@@ -50,6 +49,8 @@ from ZODB.utils import p64, u64, z64, oid_repr, positive_id ...@@ -50,6 +49,8 @@ from ZODB.utils import p64, u64, z64, oid_repr, positive_id
from ZODB import utils from ZODB import utils
import six import six
from .mvccadapter import HistoricalStorageAdapter
global_reset_counter = 0 global_reset_counter = 0
noop = lambda : None noop = lambda : None
...@@ -102,13 +103,17 @@ class Connection(ExportImport, object): ...@@ -102,13 +103,17 @@ class Connection(ExportImport, object):
# Multi-database support # Multi-database support
self.connections = {self._db.database_name: self} self.connections = {self._db.database_name: self}
storage = db.storage storage = db._mvcc_storage
if IMVCCStorage.providedBy(storage): if before:
# Use a connection-specific storage instance. try:
self._mvcc_storage = True before_instance = storage.before_instance
storage = storage.new_instance() except AttributeError:
def before_instance(before):
return HistoricalStorageAdapter(
storage.new_instance(), before)
storage = before_instance(before)
else: else:
self._mvcc_storage = False storage = storage.new_instance()
self._normal_storage = self._storage = storage self._normal_storage = self._storage = storage
self.new_oid = db.new_oid self.new_oid = db.new_oid
...@@ -138,17 +143,17 @@ class Connection(ExportImport, object): ...@@ -138,17 +143,17 @@ class Connection(ExportImport, object):
# ReadConflictError (just to be able to clean them up from the # ReadConflictError (just to be able to clean them up from the
# cache on abort with the other modified objects). All objects # cache on abort with the other modified objects). All objects
# of this list are either in _cache or in _added. # of this list are either in _cache or in _added.
self._registered_objects = [] self._registered_objects = [] # [object]
# ids and serials of objects for which readCurrent was called # ids and serials of objects for which readCurrent was called
# in a transaction. # in a transaction.
self._readCurrent = {} self._readCurrent = {} # {oid ->serial}
# Dict of oid->obj added explicitly through add(). Used as a # Dict of oid->obj added explicitly through add(). Used as a
# preliminary cache until commit time when objects are all moved # preliminary cache until commit time when objects are all moved
# to the real _cache. The objects are moved to _creating at # to the real _cache. The objects are moved to _creating at
# commit time. # commit time.
self._added = {} self._added = {} # {oid -> object}
# During commit this is turned into a list, which receives # During commit this is turned into a list, which receives
# objects added as a side-effect of storing a modified object. # objects added as a side-effect of storing a modified object.
...@@ -162,32 +167,11 @@ class Connection(ExportImport, object): ...@@ -162,32 +167,11 @@ class Connection(ExportImport, object):
# adding. Used during abort to remove created objects from the # adding. Used during abort to remove created objects from the
# _cache, and by persistent_id to check that a new object isn't # _cache, and by persistent_id to check that a new object isn't
# reachable from multiple databases. # reachable from multiple databases.
self._creating = {} self._creating = {} # {oid -> implicitly_added_flag}
# List of oids of modified objects, which have to be invalidated # List of oids of modified objects, which have to be invalidated
# in the cache on abort and in other connections on finish. # in the cache on abort and in other connections on finish.
self._modified = [] self._modified = [] # [oid]
# _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while
# another is processing invalidations. All the invalidations
# from a single transaction should be applied atomically, so
# the lock must be held when reading _invalidated.
# It sucks that we have to hold the lock to read _invalidated.
# Normally, _invalidated is written by calling dict.update, which
# will execute atomically by virtue of the GIL. But some storage
# might generate oids where hash or compare invokes Python code. In
# that case, the GIL can't save us.
# Note: since that was written, it was officially declared that the
# type of an oid is str. TODO: remove the related now-unnecessary
# critical sections (if any -- this needs careful thought).
self._inv_lock = utils.Lock()
self._invalidated = set()
# Flag indicating whether the cache has been invalidated:
self._invalidatedCache = False
# We intend to prevent committing a transaction in which # We intend to prevent committing a transaction in which
# ReadConflictError occurs. _conflicts is the set of oids that # ReadConflictError occurs. _conflicts is the set of oids that
...@@ -198,12 +182,6 @@ class Connection(ExportImport, object): ...@@ -198,12 +182,6 @@ class Connection(ExportImport, object):
# _conflicts). # _conflicts).
self._conflicts = {} self._conflicts = {}
# _txn_time stores the upper bound on transactions visible to
# this connection. That is, all object revisions must be
# written before _txn_time. If it is None, then the current
# revisions are acceptable.
self._txn_time = None
# To support importFile(), implemented in the ExportImport base # To support importFile(), implemented in the ExportImport base
# class, we need to run _importDuringCommit() from our commit() # class, we need to run _importDuringCommit() from our commit()
# method. If _import is not None, it is a two-tuple of arguments # method. If _import is not None, it is a two-tuple of arguments
...@@ -252,17 +230,7 @@ class Connection(ExportImport, object): ...@@ -252,17 +230,7 @@ class Connection(ExportImport, object):
if obj is not None: if obj is not None:
return obj return obj
before = self.before p, _ = self._storage.load(oid, '')
if before is None:
# Normal case
before = self._txn_time
data = self._storage.loadBefore(oid, before)
if data is None:
# see the comment in setstate
raise ReadConflictError()
p, _, _ = data
obj = self._reader.getGhost(p) obj = self._reader.getGhost(p)
# Avoid infiniate loop if obj tries to load its state before # Avoid infiniate loop if obj tries to load its state before
...@@ -320,9 +288,6 @@ class Connection(ExportImport, object): ...@@ -320,9 +288,6 @@ class Connection(ExportImport, object):
if self.opened: if self.opened:
self.transaction_manager.unregisterSynch(self) self.transaction_manager.unregisterSynch(self)
if self._mvcc_storage:
self._storage.sync(force=False)
if primary: if primary:
for connection in self.connections.values(): for connection in self.connections.values():
if connection is not self: if connection is not self:
...@@ -351,23 +316,7 @@ class Connection(ExportImport, object): ...@@ -351,23 +316,7 @@ class Connection(ExportImport, object):
"""Returns True if this connection is read only.""" """Returns True if this connection is read only."""
if self.opened is None: if self.opened is None:
raise ConnectionStateError("The database connection is closed") raise ConnectionStateError("The database connection is closed")
return self.before is not None or self._storage.isReadOnly() return self._storage.isReadOnly()
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
if self.before is not None:
# This is a historical connection. Invalidations are irrelevant.
return
with self._inv_lock:
self._invalidated.update(oids)
def invalidateCache(self):
self._inv_lock.acquire()
try:
self._invalidatedCache = True
finally:
self._inv_lock.release()
@property @property
def root(self): def root(self):
...@@ -405,7 +354,7 @@ class Connection(ExportImport, object): ...@@ -405,7 +354,7 @@ class Connection(ExportImport, object):
def sync(self): def sync(self):
"""Manually update the view on the database.""" """Manually update the view on the database."""
self.transaction_manager.abort() self.transaction_manager.begin()
def getDebugInfo(self): def getDebugInfo(self):
"""Returns a tuple with different items for debugging the """Returns a tuple with different items for debugging the
...@@ -545,9 +494,6 @@ class Connection(ExportImport, object): ...@@ -545,9 +494,6 @@ class Connection(ExportImport, object):
self._added_during_commit = [] self._added_during_commit = []
if self._invalidatedCache:
raise ConflictError()
for obj in self._registered_objects: for obj in self._registered_objects:
oid = obj._p_oid oid = obj._p_oid
assert oid assert oid
...@@ -559,10 +505,6 @@ class Connection(ExportImport, object): ...@@ -559,10 +505,6 @@ class Connection(ExportImport, object):
elif oid in self._added: elif oid in self._added:
assert obj._p_serial == z64 assert obj._p_serial == z64
elif obj._p_changed: elif obj._p_changed:
if oid in self._invalidated:
resolve = getattr(obj, "_p_resolveConflict", None)
if resolve is None:
raise ConflictError(object=obj)
self._modified.append(oid) self._modified.append(oid)
else: else:
# Nothing to do. It's been said that it's legal, e.g., for # Nothing to do. It's been said that it's legal, e.g., for
...@@ -600,9 +542,6 @@ class Connection(ExportImport, object): ...@@ -600,9 +542,6 @@ class Connection(ExportImport, object):
self._creating[oid] = implicitly_adding self._creating[oid] = implicitly_adding
else: else:
if (oid in self._invalidated
and not hasattr(obj, '_p_resolveConflict')):
raise ConflictError(object=obj)
self._modified.append(oid) self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj p = writer.serialize(obj) # This calls __getstate__ of obj
...@@ -739,21 +678,11 @@ class Connection(ExportImport, object): ...@@ -739,21 +678,11 @@ class Connection(ExportImport, object):
self._handle_serial(oid, serial) self._handle_serial(oid, serial)
def tpc_finish(self, transaction): def tpc_finish(self, transaction):
"""Indicate confirmation that the transaction is done.""" """Indicate confirmation that the transaction is done.
"""
def callback(tid): # XXX someday, we'll care about the *real* tid we get back via
if self._mvcc_storage: # the callback (that we're no longer calling).
# Inter-connection invalidation is not needed when the self._storage.tpc_finish(transaction)
# storage provides MVCC.
return
d = dict.fromkeys(self._modified)
self._db.invalidate(tid, d, self)
# It's important that the storage calls the passed function
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
self._storage.tpc_finish(transaction, callback)
self._tpc_cleanup() self._tpc_cleanup()
def sortKey(self): def sortKey(self):
...@@ -770,68 +699,33 @@ class Connection(ExportImport, object): ...@@ -770,68 +699,33 @@ class Connection(ExportImport, object):
# We don't do anything before a commit starts. # We don't do anything before a commit starts.
pass pass
def newTransaction(self, transaction=None): def newTransaction(self, transaction, sync=True):
self._readCurrent.clear() self._readCurrent.clear()
if self._mvcc_storage: try:
self._storage.sync(True) self._storage.sync(sync)
# Poll the storage for invalidations. invalidated = self._storage.poll_invalidations()
mvc_invalidated = self._storage.poll_invalidations() if invalidated is None:
if mvc_invalidated is None:
# special value: the transaction is so old that # special value: the transaction is so old that
# we need to flush the whole cache. # we need to flush the whole cache.
self._invalidatedCache = True
else:
getattr(self._storage, 'sync', noop)()
mvc_invalidated = None
with self._inv_lock:
# Non-ghostifiable objects may need to read when they are
# invalidated, so we'll quickly just replace the
# invalidating dict with a new one. We'll then process
# the invalidations after freeing the lock *and* after
# resetting the time. This means that invalidations will
# happen after the start of the transactions. They are
# subject to conflict errors and to reading old data.
# TODO: There is a potential problem lurking for persistent
# classes. Suppose we have an invalidation of a persistent
# class and of an instance. If the instance is
# invalidated first and if the invalidation logic uses
# data read from the class, then the invalidation could
# be performed with stale data. Or, suppose that there
# are instances of the class that are freed as a result of
# invalidating some object. Perhaps code in their __del__
# uses class data. Really, the only way to properly fix
# this is to, in fact, make classes ghostifiable. Then
# we'd have to reimplement attribute lookup to check the
# class state and, if necessary, activate the class. It's
# much worse than that though, because we'd also need to
# deal with slots. When a class is ghostified, we'd need
# to replace all of the slot operations with versions that
# reloaded the object when called. It's hard to say which
# is better or worse. For now, it seems the risk of
# using a class while objects are being invalidated seems
# small enough to be acceptable.
if self._invalidatedCache:
self._invalidatedCache = False
invalidated = self._cache.cache_data.copy() invalidated = self._cache.cache_data.copy()
else: self._cache.invalidate(invalidated)
invalidated = dict.fromkeys(self._invalidated) except AttributeError:
self._invalidated = set() assert self._storage is None
if self.opened:
self._txn_time = p64(u64(self._storage.lastTransaction()) + 1)
if mvc_invalidated:
self._cache.invalidate(mvc_invalidated)
self._cache.invalidate(invalidated)
# Now is a good time to collect some garbage. # Now is a good time to collect some garbage.
self._cache.incrgc() self._cache.incrgc()
afterCompletion = newTransaction def afterCompletion(self, transaction):
# Note that we we call newTransaction here for 2 reasons:
# a) Applying invalidations early frees up resources
# early. This is especially useful if the connection isn't
# going to be used in a while.
# b) Non-hygienic applications might start new transactions by
# finalizing previous ones without calling begin. We pass
# False to avoid possiblyt expensive sync calls to not
# penalize well-behaved applications that call begin.
self.newTransaction(transaction, False)
# Transaction-manager synchronization -- ISynchronizer # Transaction-manager synchronization -- ISynchronizer
########################################################################## ##########################################################################
...@@ -861,24 +755,8 @@ class Connection(ExportImport, object): ...@@ -861,24 +755,8 @@ class Connection(ExportImport, object):
raise raise
try: try:
p, serial = self._storage.load(oid, '')
before = self.before
if before is None:
# Normal case
if self._invalidatedCache:
raise ReadConflictError()
before = self._txn_time
data = self._storage.loadBefore(oid, before)
if data is None:
# We had data (by definition, since we have a
# reference to it), but it's gone. It must have
# updated since this transaction, and been packed
# away, cuz the tests are mean. The best we can do is
# raise a ReadConflictError and try again
raise ReadConflictError()
p, serial, _ = data
self._load_count += 1 self._load_count += 1
self._reader.setGhostState(obj, p) self._reader.setGhostState(obj, p)
...@@ -977,15 +855,30 @@ class Connection(ExportImport, object): ...@@ -977,15 +855,30 @@ class Connection(ExportImport, object):
self.transaction_manager = transaction_manager self.transaction_manager = transaction_manager
transaction_manager.registerSynch(self)
self.opened = time.time() self.opened = time.time()
if self._reset_counter != global_reset_counter: if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache. # New code is in place. Start a new cache.
self._resetCache() self._resetCache()
self.newTransaction() # This newTransaction is to deal with some pathalogical cases:
#
# a) Someone opens a connection when a transaction isn't
# active and proceeeds without calling begin on a
# transaction manager. We initialize the transaction for
# the connection, but we don't do a storage sync, since
# this will be done if a well-nehaved application calls
# begin, and we don't want to penalize well-behaved
# transactions by syncing twice, as storage syncs might be
# expensive.
# b) Lots of tests assume that connection transaction
# information is set on open.
#
# Fortunately, this is a cheap operation. It doesn't really
# cost much, if anything.
self.newTransaction(None, False)
transaction_manager.registerSynch(self)
if self._cache is not None: if self._cache is not None:
self._cache.incrgc() # This is a good time to do some GC self._cache.incrgc() # This is a good time to do some GC
...@@ -1002,8 +895,6 @@ class Connection(ExportImport, object): ...@@ -1002,8 +895,6 @@ class Connection(ExportImport, object):
See the docstring for the resetCaches() function. See the docstring for the resetCaches() function.
""" """
self._reset_counter = global_reset_counter self._reset_counter = global_reset_counter
self._invalidated.clear()
self._invalidatedCache = False
cache_size = self._cache.cache_size cache_size = self._cache.cache_size
cache_size_bytes = self._cache.cache_size_bytes cache_size_bytes = self._cache.cache_size_bytes
self._cache = cache = PickleCache(self, cache_size, cache_size_bytes) self._cache = cache = PickleCache(self, cache_size, cache_size_bytes)
...@@ -1011,11 +902,9 @@ class Connection(ExportImport, object): ...@@ -1011,11 +902,9 @@ class Connection(ExportImport, object):
self._reader._cache = cache self._reader._cache = cache
def _release_resources(self): def _release_resources(self):
assert not self.opened
for c in six.itervalues(self.connections): for c in six.itervalues(self.connections):
if c._mvcc_storage: if c._storage is not None:
if c._storage is not None: c._storage.release()
c._storage.release()
c._storage = c._normal_storage = None c._storage = c._normal_storage = None
c._cache = PickleCache(self, 0, 0) c._cache = PickleCache(self, 0, 0)
...@@ -1058,7 +947,7 @@ class Connection(ExportImport, object): ...@@ -1058,7 +947,7 @@ class Connection(ExportImport, object):
def savepoint(self): def savepoint(self):
if self._savepoint_storage is None: if self._savepoint_storage is None:
tmpstore = TmpStore(self._normal_storage, self._txn_time) tmpstore = TmpStore(self._normal_storage)
self._savepoint_storage = tmpstore self._savepoint_storage = tmpstore
self._storage = self._savepoint_storage self._storage = self._savepoint_storage
...@@ -1080,7 +969,7 @@ class Connection(ExportImport, object): ...@@ -1080,7 +969,7 @@ class Connection(ExportImport, object):
self.cacheGC() self.cacheGC()
return result return result
def _rollback(self, state): def _rollback_savepoint(self, state):
self._abort() self._abort()
self._registered_objects = [] self._registered_objects = []
src = self._storage src = self._storage
...@@ -1107,7 +996,7 @@ class Connection(ExportImport, object): ...@@ -1107,7 +996,7 @@ class Connection(ExportImport, object):
self._creating.update(src.creating) self._creating.update(src.creating)
for oid in oids: for oid in oids:
data, serial, _ = src.loadBefore(oid, self._txn_time) data, serial = src.load(oid)
obj = self._cache.get(oid, None) obj = self._cache.get(oid, None)
if obj is not None: if obj is not None:
self._cache.update_object_size_estimation( self._cache.update_object_size_estimation(
...@@ -1122,7 +1011,7 @@ class Connection(ExportImport, object): ...@@ -1122,7 +1011,7 @@ class Connection(ExportImport, object):
# that that the next attribute access of its name # that that the next attribute access of its name
# unghostify it, which will cause its blob data # unghostify it, which will cause its blob data
# to be reattached "cleanly" # to be reattached "cleanly"
self.invalidate(None, (oid, )) self._cache.invalidate(oid)
else: else:
s = self._storage.store(oid, serial, data, s = self._storage.store(oid, serial, data,
'', transaction) '', transaction)
...@@ -1164,26 +1053,29 @@ class Connection(ExportImport, object): ...@@ -1164,26 +1053,29 @@ class Connection(ExportImport, object):
# Savepoint support # Savepoint support
##################################################################### #####################################################################
@implementer(IDataManagerSavepoint) @implementer(IDataManagerSavepoint)
class Savepoint: class Savepoint:
def __init__(self, datamanager, state): def __init__(self, datamanager, state):
self.datamanager = datamanager self.datamanager = datamanager
self.state = state self.state = state
def rollback(self): def rollback(self):
self.datamanager._rollback(self.state) self.datamanager._rollback_savepoint(self.state)
@implementer(IBlobStorage) @implementer(IBlobStorage)
class TmpStore: class TmpStore:
"""A storage-like thing to support savepoints.""" """A storage-like thing to support savepoints."""
def __init__(self, storage, before): def __init__(self, storage):
self._storage = storage self._storage = storage
self._before = before for method in (
for method in 'getName', 'new_oid', 'getSize', 'sortKey', 'isReadOnly': 'getName', 'new_oid', 'getSize', 'sortKey',
'isReadOnly'
):
setattr(self, method, getattr(storage, method)) setattr(self, method, getattr(storage, method))
self._file = tempfile.TemporaryFile(prefix='TmpStore') self._file = tempfile.TemporaryFile(prefix='TmpStore')
...@@ -1205,11 +1097,10 @@ class TmpStore: ...@@ -1205,11 +1097,10 @@ class TmpStore:
remove_committed_dir(self._blob_dir) remove_committed_dir(self._blob_dir)
self._blob_dir = None self._blob_dir = None
def loadBefore(self, oid, before): def load(self, oid, version=''):
assert before == self._before
pos = self.index.get(oid) pos = self.index.get(oid)
if pos is None: if pos is None:
return self._storage.loadBefore(oid, before) return self._storage.load(oid, '')
self._file.seek(pos) self._file.seek(pos)
h = self._file.read(8) h = self._file.read(8)
oidlen = u64(h) oidlen = u64(h)
...@@ -1219,7 +1110,7 @@ class TmpStore: ...@@ -1219,7 +1110,7 @@ class TmpStore:
h = self._file.read(16) h = self._file.read(16)
size = u64(h[8:]) size = u64(h[8:])
serial = h[:8] serial = h[:8]
return self._file.read(size), serial, None return self._file.read(size), serial
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
# we have this funny signature so we can reuse the normal non-commit # we have this funny signature so we can reuse the normal non-commit
...@@ -1300,6 +1191,7 @@ class TmpStore: ...@@ -1300,6 +1191,7 @@ class TmpStore:
self.index = index.copy() self.index = index.copy()
self.creating = creating self.creating = creating
class RootConvenience(object): class RootConvenience(object):
def __init__(self, root): def __init__(self, root):
......
...@@ -401,12 +401,6 @@ class DB(object): ...@@ -401,12 +401,6 @@ class DB(object):
- `xrefs` - Boolian flag indicating whether implicit cross-database - `xrefs` - Boolian flag indicating whether implicit cross-database
references are allowed references are allowed
""" """
if isinstance(storage, six.string_types):
from ZODB import FileStorage
storage = ZODB.FileStorage.FileStorage(storage, **storage_args)
elif storage is None:
from ZODB import MappingStorage
storage = ZODB.MappingStorage.MappingStorage(**storage_args)
# Allocate lock. # Allocate lock.
x = utils.RLock() x = utils.RLock()
...@@ -423,12 +417,24 @@ class DB(object): ...@@ -423,12 +417,24 @@ class DB(object):
self._historical_cache_size_bytes = historical_cache_size_bytes self._historical_cache_size_bytes = historical_cache_size_bytes
# Setup storage # Setup storage
if isinstance(storage, six.string_types):
from ZODB import FileStorage
storage = ZODB.FileStorage.FileStorage(storage, **storage_args)
elif storage is None:
from ZODB import MappingStorage
storage = ZODB.MappingStorage.MappingStorage(**storage_args)
else:
assert not storage_args
self.storage = storage self.storage = storage
if IMVCCStorage.providedBy(storage):
self._mvcc_storage = storage
else:
from .mvccadapter import MVCCAdapter
self._mvcc_storage = MVCCAdapter(storage)
self.references = ZODB.serialize.referencesf self.references = ZODB.serialize.referencesf
try:
storage.registerDB(self)
except TypeError:
storage.registerDB(self, None) # Backward compat
if (not hasattr(storage, 'tpc_vote')) and not storage.isReadOnly(): if (not hasattr(storage, 'tpc_vote')) and not storage.isReadOnly():
warnings.warn( warnings.warn(
...@@ -438,12 +444,10 @@ class DB(object): ...@@ -438,12 +444,10 @@ class DB(object):
DeprecationWarning, 2) DeprecationWarning, 2)
storage.tpc_vote = lambda *args: None storage.tpc_vote = lambda *args: None
if IMVCCStorage.providedBy(storage): temp_storage = self._mvcc_storage.new_instance()
temp_storage = storage.new_instance()
else:
temp_storage = storage
try: try:
try: try:
temp_storage.poll_invalidations()
temp_storage.load(z64, '') temp_storage.load(z64, '')
except KeyError: except KeyError:
# Create the database's root in the storage if it doesn't exist # Create the database's root in the storage if it doesn't exist
...@@ -462,8 +466,7 @@ class DB(object): ...@@ -462,8 +466,7 @@ class DB(object):
temp_storage.tpc_vote(t) temp_storage.tpc_vote(t)
temp_storage.tpc_finish(t) temp_storage.tpc_finish(t)
finally: finally:
if IMVCCStorage.providedBy(temp_storage): temp_storage.release()
temp_storage.release()
# Multi-database setup. # Multi-database setup.
if databases is None: if databases is None:
...@@ -634,16 +637,13 @@ class DB(object): ...@@ -634,16 +637,13 @@ class DB(object):
@self._connectionMap @self._connectionMap
def _(c): def _(c):
if c.opened: c.transaction_manager.abort()
c.transaction_manager.abort()
# Note that this will modify our pool, but this is safe, because
# _connectionMap makes a list of the pool to iterate over
c.close()
c.afterCompletion = c.newTransaction = c.close = noop c.afterCompletion = c.newTransaction = c.close = noop
c._release_resources() c._release_resources()
self.storage.close() self._mvcc_storage.close()
del self.storage del self.storage
del self._mvcc_storage
def getCacheSize(self): def getCacheSize(self):
return self._cache_size return self._cache_size
...@@ -675,27 +675,6 @@ class DB(object): ...@@ -675,27 +675,6 @@ class DB(object):
def getHistoricalTimeout(self): def getHistoricalTimeout(self):
return self.historical_pool.timeout return self.historical_pool.timeout
def invalidate(self, tid, oids, connection=None, version=''):
"""Invalidate references to a given oid.
This is used to indicate that one of the connections has committed a
change to the object. The connection commiting the change should be
passed in to prevent useless (but harmless) messages to the
connection.
"""
# Storages, esp. ZEO tests, need the version argument still. :-/
assert version==''
# Notify connections.
def inval(c):
if c is not connection:
c.invalidate(tid, oids)
self._connectionMap(inval)
def invalidateCache(self):
"""Invalidate each of the connection caches
"""
self._connectionMap(lambda c: c.invalidateCache())
transform_record_data = untransform_record_data = lambda self, data: data transform_record_data = untransform_record_data = lambda self, data: data
def objectCount(self): def objectCount(self):
...@@ -762,8 +741,6 @@ class DB(object): ...@@ -762,8 +741,6 @@ class DB(object):
result = self.pool.pop() result = self.pool.pop()
assert result is not None assert result is not None
# open the connection.
# A good time to do some cache cleanup. # A good time to do some cache cleanup.
# (note we already have the lock) # (note we already have the lock)
self.pool.availableGC() self.pool.availableGC()
...@@ -1003,9 +980,9 @@ class TransactionalUndo(object): ...@@ -1003,9 +980,9 @@ class TransactionalUndo(object):
def __init__(self, db, tids): def __init__(self, db, tids):
self._db = db self._db = db
self._storage = db.storage self._storage = getattr(
db._mvcc_storage, 'undo_instance', db._mvcc_storage.new_instance)()
self._tids = tids self._tids = tids
self._oids = set()
def abort(self, transaction): def abort(self, transaction):
pass pass
...@@ -1015,19 +992,13 @@ class TransactionalUndo(object): ...@@ -1015,19 +992,13 @@ class TransactionalUndo(object):
def commit(self, transaction): def commit(self, transaction):
for tid in self._tids: for tid in self._tids:
result = self._storage.undo(tid, transaction) self._storage.undo(tid, transaction)
if result:
self._oids.update(result[1])
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
for oid, _ in self._storage.tpc_vote(transaction) or (): self._storage.tpc_vote(transaction)
self._oids.add(oid)
def tpc_finish(self, transaction): def tpc_finish(self, transaction):
self._storage.tpc_finish( self._storage.tpc_finish(transaction)
transaction,
lambda tid: self._db.invalidate(tid, self._oids)
)
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
self._storage.tpc_abort(transaction) self._storage.tpc_abort(transaction)
......
...@@ -273,8 +273,6 @@ test. ...@@ -273,8 +273,6 @@ test.
>>> conn.root()['first']['count'] += 1 >>> conn.root()['first']['count'] += 1
>>> conn.root()['third'] = persistent.mapping.PersistentMapping() >>> conn.root()['third'] = persistent.mapping.PersistentMapping()
>>> transaction.commit() >>> transaction.commit()
>>> len(historical_conn._invalidated)
0
>>> historical_conn.close() >>> historical_conn.close()
Note that if you try to open an historical connection to a time in the future, Note that if you try to open an historical connection to a time in the future,
......
...@@ -201,21 +201,6 @@ class IConnection(Interface): ...@@ -201,21 +201,6 @@ class IConnection(Interface):
def isReadOnly(): def isReadOnly():
"""Returns True if the storage for this connection is read only.""" """Returns True if the storage for this connection is read only."""
def invalidate(tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids.
When the next transaction boundary is reached, objects will be
invalidated. If any of the invalidated objects are accessed by the
current transaction, the revision written before Connection.tid will be
used.
The DB calls this method, even when the Connection is closed.
Parameters:
tid: the storage-level id of the transaction that committed
oids: oids is an iterable of oids.
"""
def root(): def root():
"""Return the database root object. """Return the database root object.
...@@ -278,14 +263,6 @@ class IConnection(Interface): ...@@ -278,14 +263,6 @@ class IConnection(Interface):
If clear is True, reset the counters. If clear is True, reset the counters.
""" """
def invalidateCache():
"""Invalidate the connection cache
This invalidates *all* objects in the cache. If the connection
is open, subsequent reads will fail until a new transaction
begins or until the connection os reopned.
"""
def readCurrent(obj): def readCurrent(obj):
"""Make sure an object being read is current """Make sure an object being read is current
...@@ -625,19 +602,6 @@ class IStorage(Interface): ...@@ -625,19 +602,6 @@ class IStorage(Interface):
otherwise, POSKeyError is raised. otherwise, POSKeyError is raised.
""" """
# The following two methods are effectively part of the interface,
# as they are generally needed when one storage wraps
# another. This deserves some thought, at probably debate, before
# adding them.
#
# def _lock_acquire():
# """Acquire the storage lock
# """
# def _lock_release():
# """Release the storage lock
# """
def new_oid(): def new_oid():
"""Allocate a new object id. """Allocate a new object id.
...@@ -675,11 +639,7 @@ class IStorage(Interface): ...@@ -675,11 +639,7 @@ class IStorage(Interface):
The passed object is a wrapper object that provides an upcall The passed object is a wrapper object that provides an upcall
interface to support composition. interface to support composition.
Note that, for historical reasons, an implementation may Note that, for historical reasons, this is called registerDB rather
require a second argument, however, if required, the None will
be passed as the second argument.
Also, for historical reasons, this is called registerDB rather
than register_wrapper. than register_wrapper.
""" """
...@@ -818,7 +778,6 @@ class IStorage(Interface): ...@@ -818,7 +778,6 @@ class IStorage(Interface):
""" """
class IStorageRestoreable(IStorage): class IStorageRestoreable(IStorage):
"""Copying Transactions """Copying Transactions
...@@ -1110,11 +1069,9 @@ class IMVCCStorage(IStorage): ...@@ -1110,11 +1069,9 @@ class IMVCCStorage(IStorage):
""" """
def release(): def release():
"""Release all persistent sessions used by this storage instance. """Release resources held by the storage instance.
After this call, the storage instance can still be used; The storage instance won't be used again after this call.
calling methods that use persistent sessions will cause the
persistent sessions to be reopened.
""" """
def poll_invalidations(): def poll_invalidations():
......
"""Adapt IStorage objects to IMVCCStorage
This is a largely internal implementation of ZODB, especially DB and
Connection. It takes the MVCC implementation involving invalidations
and start time and moves it into a storage adapter. This allows ZODB
to treat Relstoage and other storages in pretty much the same way and
also simplifies the implementation of the DB and Connection classes.
"""
import zope.interface
from . import interfaces, serialize, POSException
from .utils import p64, u64, Lock
class Base(object):
_copy_methods = (
'getName', 'getSize', 'history', 'lastTransaction', 'sortKey',
'loadBlob', 'openCommittedBlobFile',
'isReadOnly', 'supportsUndo', 'undoLog', 'undoInfo',
'temporaryDirectory',
)
def __init__(self, storage):
self._storage = storage
if interfaces.IBlobStorage.providedBy(storage):
zope.interface.alsoProvides(self, interfaces.IBlobStorage)
def __getattr__(self, name):
if name in self._copy_methods:
if hasattr(self._storage, name):
m = getattr(self._storage, name)
setattr(self, name, m)
return m
raise AttributeError(name)
def __len__(self):
return len(self._storage)
class MVCCAdapter(Base):
def __init__(self, storage):
Base.__init__(self, storage)
self._instances = set()
self._lock = Lock()
if hasattr(storage, 'registerDB'):
storage.registerDB(self)
def new_instance(self):
instance = MVCCAdapterInstance(self)
with self._lock:
self._instances.add(instance)
return instance
def before_instance(self, before=None):
return HistoricalStorageAdapter(self._storage, before)
def undo_instance(self):
return UndoAdapterInstance(self)
def _release(self, instance):
with self._lock:
self._instances.remove(instance)
closed = False
def close(self):
if not self.closed:
self.closed = True
self._storage.close()
del self._instances
del self._storage
def invalidateCache(self):
with self._lock:
for instance in self._instances:
instance._invalidateCache()
def invalidate(self, transaction_id, oids, version=''):
with self._lock:
for instance in self._instances:
instance._invalidate(oids)
def _invalidate_finish(self, oids, committing_instance):
with self._lock:
for instance in self._instances:
if instance is not committing_instance:
instance._invalidate(oids)
references = serialize.referencesf
transform_record_data = untransform_record_data = lambda self, data: data
def pack(self, pack_time, referencesf):
return self._storage.pack(pack_time, referencesf)
class MVCCAdapterInstance(Base):
_copy_methods = Base._copy_methods + (
'loadSerial', 'new_oid', 'tpc_vote',
'checkCurrentSerialInTransaction', 'tpc_abort',
)
def __init__(self, base):
self._base = base
Base.__init__(self, base._storage)
self._lock = Lock()
self._invalidations = set()
self._start = None # Transaction start time
self._sync = getattr(self._storage, 'sync', lambda : None)
def release(self):
self._base._release(self)
close = release
def _invalidateCache(self):
with self._lock:
self._invalidations = None
def _invalidate(self, oids):
with self._lock:
try:
self._invalidations.update(oids)
except AttributeError:
if self._invalidations is not None:
raise
def sync(self, force=True):
if force:
self._sync()
def poll_invalidations(self):
self._start = p64(u64(self._storage.lastTransaction()) + 1)
with self._lock:
if self._invalidations is None:
self._invalidations = set()
return None
else:
result = list(self._invalidations)
self._invalidations.clear()
return result
def load(self, oid, version=''):
assert self._start is not None
r = self._storage.loadBefore(oid, self._start)
if r is None:
raise POSException.ReadConflictError(repr(oid))
return r[:2]
_modified = None # Used to keep track of oids modified within a
# transaction, so we can invalidate them later.
def tpc_begin(self, transaction):
self._storage.tpc_begin(transaction)
self._modified = set()
def store(self, oid, serial, data, version, transaction):
s = self._storage.store(oid, serial, data, version, transaction)
self._modified.add(oid)
return s
def storeBlob(self, oid, serial, data, blobfilename, version, transaction):
s = self._storage.storeBlob(
oid, serial, data, blobfilename, '', transaction)
self._modified.add(oid)
return s
def tpc_finish(self, transaction, func = lambda tid: None):
modified = self._modified
self._modified = None
def invalidate_finish(tid):
self._base._invalidate_finish(modified, self)
func(tid)
self._storage.tpc_finish(transaction, invalidate_finish)
def read_only_writer(self, *a, **kw):
raise POSException.ReadOnlyError
class HistoricalStorageAdapter(Base):
"""Adapt a storage to a historical storage
"""
_copy_methods = Base._copy_methods + (
'loadSerial', 'tpc_begin', 'tpc_finish', 'tpc_abort', 'tpc_vote',
'checkCurrentSerialInTransaction',
)
def __init__(self, storage, before=None):
Base.__init__(self, storage)
self._before = before
def isReadOnly(self):
return True
def supportsUndo(self):
return False
def release(self):
pass
close = release
def sync(self, force=True):
pass
def poll_invalidations(self):
return []
new_oid = pack = store = read_only_writer
def load(self, oid, version=''):
r = self._storage.loadBefore(oid, self._before)
if r is None:
raise POSException.POSKeyError(oid)
return r[:2]
class UndoAdapterInstance(Base):
_copy_methods = Base._copy_methods + (
'tpc_abort',
)
def __init__(self, base):
self._base = base
Base.__init__(self, base._storage)
def release(self):
pass
close = release
def tpc_begin(self, transaction):
self._storage.tpc_begin(transaction)
self._undone = set()
def undo(self, transaction_id, transaction):
result = self._storage.undo(transaction_id, transaction)
if result:
self._undone.update(result[1])
return result
def tpc_vote(self, transaction):
result = self._storage.tpc_vote(transaction)
if result:
for oid, serial in result:
self._undone.add(oid)
def tpc_finish(self, transaction, func = lambda tid: None):
def invalidate_finish(tid):
self._base._invalidate_finish(self._undone, None)
func(tid)
self._storage.tpc_finish(transaction, invalidate_finish)
...@@ -17,7 +17,8 @@ Any storage that supports the history() method should be able to pass ...@@ -17,7 +17,8 @@ Any storage that supports the history() method should be able to pass
all these tests. all these tests.
""" """
from time import time import sys
from time import time, sleep
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
class HistoryStorage: class HistoryStorage:
...@@ -31,6 +32,9 @@ class HistoryStorage: ...@@ -31,6 +32,9 @@ class HistoryStorage:
self.assertRaises(KeyError,self._storage.history,oid) self.assertRaises(KeyError,self._storage.history,oid)
revids = [None] revids = [None]
for data in data: for data in data:
if sys.platform == 'win32':
# time.time() has a precision of 1ms on Windows.
sleep(0.002)
revids.append(self._dostore(oid, revids[-1], MinPO(data))) revids.append(self._dostore(oid, revids[-1], MinPO(data)))
revids.reverse() revids.reverse()
del revids[-1] del revids[-1]
......
...@@ -74,7 +74,7 @@ You can't put blobs into a database that has uses a Non-Blob-Storage, though: ...@@ -74,7 +74,7 @@ You can't put blobs into a database that has uses a Non-Blob-Storage, though:
>>> transaction2.commit() # doctest: +ELLIPSIS >>> transaction2.commit() # doctest: +ELLIPSIS
Traceback (most recent call last): Traceback (most recent call last):
... ...
Unsupported: Storing Blobs in <ZODB.MappingStorage.MappingStorage object at ...> is not supported. Unsupported: Storing Blobs in ...
>>> transaction2.abort() >>> transaction2.abort()
>>> connection2.close() >>> connection2.close()
......
...@@ -25,27 +25,43 @@ Make a change locally: ...@@ -25,27 +25,43 @@ Make a change locally:
>>> rt = cn.root() >>> rt = cn.root()
>>> rt['a'] = 1 >>> rt['a'] = 1
Sync is called when a connection is open, as that starts a new transaction: Sync isn't called when a connectiin is opened, even though that
implicitly starts a new transaction:
>>> st.sync_called
False
Sync is only called when we explicitly start a new transaction:
>>> _ = transaction.begin()
>>> st.sync_called >>> st.sync_called
True True
>>> st.sync_called = False >>> st.sync_called = False
BTW, calling ``sync()`` on a connectin starts a new transaction, which
caused ``sync()`` to be called on the storage:
``sync()`` is called by the Connection's ``afterCompletion()`` hook after the >>> cn.sync()
commit completes. >>> st.sync_called
True
>>> st.sync_called = False
``sync()`` is not called by the Connection's ``afterCompletion()``
hook after the commit completes, because we'll sunc when a new
transaction begins:
>>> transaction.commit() >>> transaction.commit()
>>> st.sync_called # False before 3.4 >>> st.sync_called # False before 3.4
True False
``sync()`` is also called by the ``afterCompletion()`` hook after an abort. ``sync()`` is also not called by the ``afterCompletion()`` hook after an abort.
>>> st.sync_called = False >>> st.sync_called = False
>>> rt['b'] = 2 >>> rt['b'] = 2
>>> transaction.abort() >>> transaction.abort()
>>> st.sync_called # False before 3.4 >>> st.sync_called # False before 3.4
True False
And ``sync()`` is called whenever we explicitly start a new transaction, via And ``sync()`` is called whenever we explicitly start a new transaction, via
the ``newTransaction()`` hook. the ``newTransaction()`` hook.
...@@ -63,45 +79,14 @@ traceback then ;-) ...@@ -63,45 +79,14 @@ traceback then ;-)
>>> cn.close() >>> cn.close()
One more, very obscure. It was the case that if the first action a new As a special case, if a synchronizer registers while a transaction is
threaded transaction manager saw was a ``begin()`` call, then synchronizers in flight, then newTransaction and this the storage sync method is
registered after that in the same transaction weren't communicated to the called:
`Transaction` object, and so the synchronizers' ``afterCompletion()`` hooks
weren't called when the transaction commited. None of the test suites
(ZODB's, Zope 2.8's, or Zope3's) caught that, but apparently Zope 3 takes this
path at some point when serving pages.
>>> tm = transaction.ThreadTransactionManager()
>>> st.sync_called = False
>>> dummy = tm.begin() # we're doing this _before_ opening a connection
>>> cn = db.open(transaction_manager=tm)
>>> rt = cn.root() # make a change
>>> rt['c'] = 3
>>> st.sync_called
True
>>> st.sync_called = False
Now ensure that ``cn.afterCompletion() -> st.sync()`` gets called by commit
despite that the `Connection` registered after the transaction began:
>>> tm.commit()
>>> st.sync_called
True
And try the same thing with a non-threaded transaction manager:
>>> cn.close()
>>> tm = transaction.TransactionManager() >>> tm = transaction.TransactionManager()
>>> st.sync_called = False >>> st.sync_called = False
>>> dummy = tm.begin() # we're doing this _before_ opening a connection >>> _ = tm.begin() # we're doing this _before_ opening a connection
>>> cn = db.open(transaction_manager=tm) >>> cn = db.open(transaction_manager=tm)
>>> rt = cn.root() # make a change
>>> rt['d'] = 4
>>> st.sync_called
True
>>> st.sync_called = False
>>> tm.commit()
>>> st.sync_called >>> st.sync_called
True True
......
...@@ -27,6 +27,8 @@ from persistent import Persistent ...@@ -27,6 +27,8 @@ from persistent import Persistent
from zope.interface.verify import verifyObject from zope.interface.verify import verifyObject
from zope.testing import loggingsupport, renormalizing from zope.testing import loggingsupport, renormalizing
from .. import mvccadapter
checker = renormalizing.RENormalizing([ checker = renormalizing.RENormalizing([
# Python 3 bytes add a "b". # Python 3 bytes add a "b".
(re.compile("b('.*?')"), r"\1"), (re.compile("b('.*?')"), r"\1"),
...@@ -154,7 +156,8 @@ class ConnectionDotAdd(ZODB.tests.util.TestCase): ...@@ -154,7 +156,8 @@ class ConnectionDotAdd(ZODB.tests.util.TestCase):
self.datamgr.add(obj) self.datamgr.add(obj)
self.datamgr.tpc_begin(self.transaction) self.datamgr.tpc_begin(self.transaction)
self.datamgr.tpc_finish(self.transaction) self.datamgr.tpc_finish(self.transaction)
self.assertTrue(obj._p_oid not in self.datamgr._storage._stored) self.assertTrue(obj._p_oid not in
self.datamgr._storage._storage._stored)
def test__resetCacheResetsReader(self): def test__resetCacheResetsReader(self):
# https://bugs.launchpad.net/zodb/+bug/142667 # https://bugs.launchpad.net/zodb/+bug/142667
...@@ -435,8 +438,11 @@ class UserMethodTests(unittest.TestCase): ...@@ -435,8 +438,11 @@ class UserMethodTests(unittest.TestCase):
... ...
ConnectionStateError: The database connection is closed ConnectionStateError: The database connection is closed
>>> db.close()
An expedient way to create a read-only storage: An expedient way to create a read-only storage:
>>> db = databaseFromString("<zodb>\n<mappingstorage/>\n</zodb>")
>>> db.storage.isReadOnly = lambda: True >>> db.storage.isReadOnly = lambda: True
>>> cn = db.open() >>> cn = db.open()
>>> cn.isReadOnly() >>> cn.isReadOnly()
...@@ -510,7 +516,9 @@ class InvalidationTests(unittest.TestCase): ...@@ -510,7 +516,9 @@ class InvalidationTests(unittest.TestCase):
they have the expected effect. they have the expected effect.
>>> db = databaseFromString("<zodb>\n<mappingstorage/>\n</zodb>") >>> db = databaseFromString("<zodb>\n<mappingstorage/>\n</zodb>")
>>> mvcc_storage = db._mvcc_storage
>>> cn = db.open() >>> cn = db.open()
>>> mvcc_instance = cn._storage
>>> p1 = Persistent() >>> p1 = Persistent()
>>> p2 = Persistent() >>> p2 = Persistent()
>>> p3 = Persistent() >>> p3 = Persistent()
...@@ -521,29 +529,18 @@ class InvalidationTests(unittest.TestCase): ...@@ -521,29 +529,18 @@ class InvalidationTests(unittest.TestCase):
Transaction ids are 8-byte strings, just like oids; p64() will Transaction ids are 8-byte strings, just like oids; p64() will
create one from an int. create one from an int.
>>> cn.invalidate(p64(1), {p1._p_oid: 1}) >>> mvcc_storage.invalidate(p64(1), {p1._p_oid: 1})
Transaction start times are based on storage's last Transaction start times are based on storage's last
transaction. (Previousely, they were based on the first transaction. (Previousely, they were based on the first
invalidation seen in a transaction.) invalidation seen in a transaction.)
>>> cn._txn_time == p64(u64(db.storage.lastTransaction()) + 1) >>> mvcc_instance.poll_invalidations() == [p1._p_oid]
True True
>>> mvcc_instance._start == p64(u64(db.storage.lastTransaction()) + 1)
>>> p1._p_oid in cn._invalidated
True
>>> p2._p_oid in cn._invalidated
False
>>> cn.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1})
>>> cn._txn_time == p64(u64(db.storage.lastTransaction()) + 1)
True True
>>> p1._p_oid in cn._invalidated >>> mvcc_storage.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1})
True
>>> p2._p_oid in cn._invalidated
True
Calling invalidate() doesn't affect the object state until Calling invalidate() doesn't affect the object state until
a transaction boundary. a transaction boundary.
...@@ -560,24 +557,24 @@ class InvalidationTests(unittest.TestCase): ...@@ -560,24 +557,24 @@ class InvalidationTests(unittest.TestCase):
>>> cn.sync() >>> cn.sync()
>>> p1._p_state >>> p1._p_state
-1 0
>>> p2._p_state >>> p2._p_state
-1 -1
>>> p3._p_state >>> p3._p_state
0 0
>>> cn._invalidated
set([])
>>> db.close() >>> db.close()
""" """
def doctest_invalidateCache(): def doctest_invalidateCache():
"""The invalidateCache method invalidates a connection's cache. It also """The invalidateCache method invalidates a connection's cache.
prevents reads until the end of a transaction::
It also prevents reads until the end of a transaction::
>>> from ZODB.tests.util import DB >>> from ZODB.tests.util import DB
>>> import transaction >>> import transaction
>>> db = DB() >>> db = DB()
>>> mvcc_storage = db._mvcc_storage
>>> tm = transaction.TransactionManager() >>> tm = transaction.TransactionManager()
>>> connection = db.open(transaction_manager=tm) >>> connection = db.open(transaction_manager=tm)
>>> connection.root()['a'] = StubObject() >>> connection.root()['a'] = StubObject()
...@@ -593,53 +590,33 @@ def doctest_invalidateCache(): ...@@ -593,53 +590,33 @@ def doctest_invalidateCache():
So we have a connection and an active transaction with some modifications. So we have a connection and an active transaction with some modifications.
Lets call invalidateCache: Lets call invalidateCache:
>>> connection.invalidateCache() >>> mvcc_storage.invalidateCache()
Now, if we try to load an object, we'll get a read conflict:
>>> connection.root()['b'].x
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
If we try to commit the transaction, we'll get a conflict error:
>>> tm.commit() This won't have any effect until the next transaction:
Traceback (most recent call last):
...
ConflictError: database conflict error
and the cache will have been cleared: >>> connection.root()['a']._p_changed
0
>>> connection.root()['b']._p_changed
>>> connection.root()['c']._p_changed
1
>>> print(connection.root()['a']._p_changed) But if we sync():
None
>>> print(connection.root()['b']._p_changed)
None
>>> print(connection.root()['c']._p_changed)
None
But we'll be able to access data again: >>> connection.sync()
>>> connection.root()['b'].x All of our data was invalidated:
1
Aborting a transaction after a read conflict also lets us read data and go >>> connection.root()['a']._p_changed
on about our business: >>> connection.root()['b']._p_changed
>>> connection.root()['c']._p_changed
>>> connection.invalidateCache() But we can load data as usual:
>>> connection.root()['c'].x Now, if we try to load an object, we'll get a read conflict:
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> tm.abort() >>> connection.root()['b'].x
>>> connection.root()['c'].x
1 1
>>> connection.root()['c'].x = 2
>>> tm.commit()
>>> db.close() >>> db.close()
""" """
...@@ -1333,6 +1310,7 @@ class StubDatabase: ...@@ -1333,6 +1310,7 @@ class StubDatabase:
def __init__(self): def __init__(self):
self.storage = StubStorage() self.storage = StubStorage()
self._mvcc_storage = mvccadapter.MVCCAdapter(self.storage)
self.new_oid = self.storage.new_oid self.new_oid = self.storage.new_oid
classFactory = None classFactory = None
......
...@@ -83,34 +83,40 @@ def test_invalidateCache(): ...@@ -83,34 +83,40 @@ def test_invalidateCache():
>>> from ZODB.tests.util import DB >>> from ZODB.tests.util import DB
>>> import transaction >>> import transaction
>>> db = DB() >>> db = DB()
>>> mvcc_storage = db._mvcc_storage
>>> tm1 = transaction.TransactionManager() >>> tm1 = transaction.TransactionManager()
>>> c1 = db.open(transaction_manager=tm1) >>> c1 = db.open(transaction_manager=tm1)
>>> c1.root()['a'] = MinPO(1) >>> c1.root()['a'] = MinPO(1)
>>> tm1.commit() >>> tm1.commit()
>>> tm2 = transaction.TransactionManager() >>> tm2 = transaction.TransactionManager()
>>> c2 = db.open(transaction_manager=tm2) >>> c2 = db.open(transaction_manager=tm2)
>>> c1.root()['a']._p_deactivate() >>> c2.root()['a'].value
1
>>> tm3 = transaction.TransactionManager() >>> tm3 = transaction.TransactionManager()
>>> c3 = db.open(transaction_manager=tm3) >>> c3 = db.open(transaction_manager=tm3)
>>> c3.root()['a'].value >>> c3.root()['a'].value
1 1
>>> c3.close() >>> c3.close()
>>> db.invalidateCache()
>>> c1.root()['a'].value
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> c2.root()['a'].value
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> mvcc_storage.invalidateCache()
>>> c1.root.a._p_changed
0
>>> c1.sync()
>>> c1.root.a._p_changed
>>> c2.root.a._p_changed
0
>>> c2.sync()
>>> c2.root.a._p_changed
>>> c3 is db.open(transaction_manager=tm3) >>> c3 is db.open(transaction_manager=tm3)
True True
>>> print(c3.root()['a']._p_changed) >>> c3.root.a._p_changed
None
>>> c1.root()['a'].value
1
>>> c2.root()['a'].value
1
>>> c3.root()['a'].value
1
>>> db.close() >>> db.close()
""" """
......
...@@ -15,6 +15,7 @@ import doctest ...@@ -15,6 +15,7 @@ import doctest
import os import os
if os.environ.get('USE_ZOPE_TESTING_DOCTEST'): if os.environ.get('USE_ZOPE_TESTING_DOCTEST'):
from zope.testing import doctest from zope.testing import doctest
import sys
import unittest import unittest
import transaction import transaction
import ZODB.FileStorage import ZODB.FileStorage
...@@ -303,9 +304,18 @@ class FileStorageTests( ...@@ -303,9 +304,18 @@ class FileStorageTests(
# is based on what was cached during the first load. # is based on what was cached during the first load.
self.assertEqual(storage.load(z64)[0], b'foo' if fail else b'bar') self.assertEqual(storage.load(z64)[0], b'foo' if fail else b'bar')
def checkFlushNeededAfterTruncate(self): # We want to be sure that the above test detects any regression
self._storage._files.flush = lambda: None # in the code it checks, because any bug here is like a time bomb: not
self.checkFlushAfterTruncate(True) # obvious, hard to reproduce, with possible data corruption.
# It's even more important that FilePool.flush() is quite aggressive and
# we'd like to optimize it when Python gets an API to flush read buffers.
# Therefore, 'checkFlushAfterTruncate' is tested in turn by another unit
# test.
# On Windows, flushing explicitely is not (always?) necessary.
if sys.platform != 'win32':
def checkFlushNeededAfterTruncate(self):
self._storage._files.flush = lambda: None
self.checkFlushAfterTruncate(True)
class FileStorageHexTests(FileStorageTests): class FileStorageHexTests(FileStorageTests):
...@@ -390,7 +400,7 @@ class AnalyzeDotPyTest(StorageTestBase.StorageTestBase): ...@@ -390,7 +400,7 @@ class AnalyzeDotPyTest(StorageTestBase.StorageTestBase):
self._storage = ZODB.FileStorage.FileStorage("Source.fs", create=True) self._storage = ZODB.FileStorage.FileStorage("Source.fs", create=True)
def checkanalyze(self): def checkanalyze(self):
import types, sys import types
from BTrees.OOBTree import OOBTree from BTrees.OOBTree import OOBTree
from ZODB.scripts import analyze from ZODB.scripts import analyze
......
...@@ -547,14 +547,13 @@ def loadblob_tmpstore(): ...@@ -547,14 +547,13 @@ def loadblob_tmpstore():
>>> transaction.commit() >>> transaction.commit()
>>> blob_oid = root['blob']._p_oid >>> blob_oid = root['blob']._p_oid
>>> tid = connection._storage.lastTransaction() >>> tid = connection._storage.lastTransaction()
>>> _txn_time = connection._txn_time
Now we open a database with a TmpStore in front: Now we open a database with a TmpStore in front:
>>> database.close() >>> database.close()
>>> from ZODB.Connection import TmpStore >>> from ZODB.Connection import TmpStore
>>> tmpstore = TmpStore(blob_storage, _txn_time) >>> tmpstore = TmpStore(blob_storage)
We can access the blob correctly: We can access the blob correctly:
......
...@@ -38,7 +38,7 @@ originally written. The new approach is much simpler because we no ...@@ -38,7 +38,7 @@ originally written. The new approach is much simpler because we no
longer call load to get the current state of an object. We call longer call load to get the current state of an object. We call
loadBefore instead, having gotten a transaction time at the start of a loadBefore instead, having gotten a transaction time at the start of a
transaction. As a result, the rhythm of the tests is a little odd, transaction. As a result, the rhythm of the tests is a little odd,
because the probe a complex dance that doesn't exist any more. because we no longer need to probe a complex dance that doesn't exist any more.
>>> from ZODB.tests.test_storage import MinimalMemoryStorage >>> from ZODB.tests.test_storage import MinimalMemoryStorage
>>> from ZODB import DB >>> from ZODB import DB
...@@ -68,9 +68,9 @@ Now open a second connection. ...@@ -68,9 +68,9 @@ Now open a second connection.
>>> tm2 = transaction.TransactionManager() >>> tm2 = transaction.TransactionManager()
>>> cn2 = db.open(transaction_manager=tm2) >>> cn2 = db.open(transaction_manager=tm2)
>>> from ZODB.utils import p64, u64 >>> from ZODB.utils import p64, u64
>>> cn2._txn_time == p64(u64(st.lastTransaction()) + 1) >>> cn2._storage._start == p64(u64(st.lastTransaction()) + 1)
True True
>>> txn_time2 = cn2._txn_time >>> txn_time2 = cn2._storage._start
Connection high-water mark Connection high-water mark
-------------------------- --------------------------
...@@ -85,13 +85,13 @@ storage has seen. ...@@ -85,13 +85,13 @@ storage has seen.
>>> cn = db.open() >>> cn = db.open()
>>> cn._txn_time == p64(u64(st.lastTransaction()) + 1) >>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True True
>>> cn.invalidate(100, dict.fromkeys([1, 2])) >>> cn.db()._mvcc_storage.invalidate(100, dict.fromkeys([1, 2]))
>>> cn._txn_time == p64(u64(st.lastTransaction()) + 1) >>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True True
>>> cn.invalidate(200, dict.fromkeys([1, 2])) >>> cn.db()._mvcc_storage.invalidate(200, dict.fromkeys([1, 2]))
>>> cn._txn_time == p64(u64(st.lastTransaction()) + 1) >>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True True
A connection's high-water mark is set to the transaction id taken from A connection's high-water mark is set to the transaction id taken from
...@@ -105,7 +105,7 @@ but that doesn't work unless an object is modified. sync() will abort ...@@ -105,7 +105,7 @@ but that doesn't work unless an object is modified. sync() will abort
a transaction and process invalidations. a transaction and process invalidations.
>>> cn.sync() >>> cn.sync()
>>> cn._txn_time == p64(u64(st.lastTransaction()) + 1) >>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True True
Basic functionality Basic functionality
...@@ -121,14 +121,14 @@ will modify "a." The other transaction will then modify "b" and commit. ...@@ -121,14 +121,14 @@ will modify "a." The other transaction will then modify "b" and commit.
The second connection already has its high-water mark set. The second connection already has its high-water mark set.
>>> cn2._txn_time == txn_time2 >>> cn2._storage._start == txn_time2
True True
It is safe to read "b," because it was not modified by the concurrent It is safe to read "b," because it was not modified by the concurrent
transaction. transaction.
>>> r2 = cn2.root() >>> r2 = cn2.root()
>>> r2["b"]._p_serial < cn2._txn_time >>> r2["b"]._p_serial < cn2._storage._start
True True
>>> r2["b"].value >>> r2["b"].value
1 1
...@@ -140,7 +140,7 @@ non-current version. ...@@ -140,7 +140,7 @@ non-current version.
>>> r2["a"].value >>> r2["a"].value
1 1
>>> r2["a"]._p_serial < cn2._txn_time >>> r2["a"]._p_serial < cn2._storage._start
True True
We can confirm that we have a non-current revision by asking the We can confirm that we have a non-current revision by asking the
...@@ -153,32 +153,32 @@ It's possible to modify "a", but we get a conflict error when we ...@@ -153,32 +153,32 @@ It's possible to modify "a", but we get a conflict error when we
commit the transaction. commit the transaction.
>>> r2["a"].value = 3 >>> r2["a"].value = 3
>>> tm2.get().commit() >>> tm2.get().commit() # doctest: +ELLIPSIS
Traceback (most recent call last): Traceback (most recent call last):
... ...
ConflictError: database conflict error (oid 0x01, class ZODB.tests.MinPO.MinPO) ConflictError: database conflict error (oid 0x01, class ZODB.tests.MinPO...
>>> tm2.get().abort() >>> tm2.get().abort()
This example will demonstrate that we can commit a transaction if we only This example will demonstrate that we can commit a transaction if we only
modify current revisions. modify current revisions.
>>> cn2._txn_time == p64(u64(st.lastTransaction()) + 1) >>> cn2._storage._start == p64(u64(st.lastTransaction()) + 1)
True True
>>> txn_time2 = cn2._txn_time >>> txn_time2 = cn2._storage._start
>>> r1 = cn1.root() >>> r1 = cn1.root()
>>> r1["a"].value = 3 >>> r1["a"].value = 3
>>> tm1.get().commit() >>> tm1.get().commit()
>>> txn = db.lastTransaction() >>> txn = db.lastTransaction()
>>> cn2._txn_time == txn_time2 >>> cn2._storage._start == txn_time2
True True
>>> r2["b"].value = r2["a"].value + 1 >>> r2["b"].value = r2["a"].value + 1
>>> r2["b"].value >>> r2["b"].value
3 3
>>> tm2.get().commit() >>> tm2.get().commit()
>>> cn2._txn_time == p64(u64(st.lastTransaction()) + 1) >>> cn2._storage._start == p64(u64(st.lastTransaction()) + 1)
True True
Object cache Object cache
...@@ -362,18 +362,18 @@ This test is kinda screwy because it depends on an old approach that ...@@ -362,18 +362,18 @@ This test is kinda screwy because it depends on an old approach that
has changed. We'll hack the _txn_time to get the original expected has changed. We'll hack the _txn_time to get the original expected
result, even though what's going on now is much simpler. result, even though what's going on now is much simpler.
>>> cn1._txn_time = ts.lastTransaction() >>> cn1._storage._start = ts.lastTransaction()
Once the oid is hooked, an invalidation will be delivered the next Once the oid is hooked, an invalidation will be delivered the next
time it is activated. The code below activates the object, then time it is activated. The code below activates the object, then
confirms that the hook worked and that the old state was retrieved. confirms that the hook worked and that the old state was retrieved.
>>> oid in cn1._invalidated >>> oid in cn1._storage._invalidations
False False
>>> r1["b"]._p_state >>> r1["b"]._p_state
-1 -1
>>> r1["b"]._p_activate() >>> r1["b"]._p_activate()
>>> oid in cn1._invalidated >>> oid in cn1._storage._invalidations
True True
>>> ts.count >>> ts.count
1 1
...@@ -406,15 +406,15 @@ Again, once the oid is hooked, an invalidation will be delivered the next ...@@ -406,15 +406,15 @@ Again, once the oid is hooked, an invalidation will be delivered the next
time it is activated. The code below activates the object, but unlike the time it is activated. The code below activates the object, but unlike the
section above, this is no older state to retrieve. section above, this is no older state to retrieve.
>>> oid in cn1._invalidated >>> oid in cn1._storage._invalidations
False False
>>> r1["b"]._p_state >>> r1["b"]._p_state
-1 -1
>>> cn1._txn_time = ts.lastTransaction() >>> cn1._storage._start = ts.lastTransaction()
>>> r1["b"]._p_activate() >>> r1["b"]._p_activate() # doctest: +ELLIPSIS
Traceback (most recent call last): Traceback (most recent call last):
... ...
ReadConflictError: database read conflict error ReadConflictError: ...
>>> db.close() >>> db.close()
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment