Commit 7d81f21a authored by Jim Fulton's avatar Jim Fulton Committed by GitHub

Merge pull request #56 from zopefoundation/no-more-load

Simplify MVCC by determining transaction start time using lastTransac…
parents 4905bb85 7ed1a8f9
......@@ -20,13 +20,12 @@ to application logic. ZODB includes features such as a plugable storage
interface, rich transaction support, and undo.
"""
version = "4.3.1"
version = "5.0.dev0"
import os
from setuptools import setup, find_packages
classifiers = """\
Development Status :: 4 - Beta
Intended Audience :: Developers
License :: OSI Approved :: Zope Public License
Programming Language :: Python
......@@ -142,7 +141,7 @@ setup(name="ZODB",
'persistent >= 4.2.0',
'BTrees >= 4.2.0',
'ZConfig',
'transaction >= 1.5.0',
'transaction >= 1.6.1',
'six',
'zc.lockfile',
'zope.interface',
......
......@@ -12,12 +12,12 @@
#
##############################################################################
"""ZODB transfer activity monitoring
"""
$Id$"""
import threading
import time
from . import utils
class ActivityMonitor:
"""ZODB load/store activity monitor
......@@ -31,7 +31,7 @@ class ActivityMonitor:
def __init__(self, history_length=3600):
self.history_length = history_length # Number of seconds
self.log = [] # [(time, loads, stores)]
self.trim_lock = threading.Lock()
self.trim_lock = utils.Lock()
def closedConnection(self, conn):
log = self.log
......@@ -42,7 +42,7 @@ class ActivityMonitor:
def trim(self, now):
self.trim_lock.acquire()
log = self.log
cutoff = now - self.history_length
n = 0
......@@ -51,7 +51,7 @@ class ActivityMonitor:
n = n + 1
if n:
del log[:n]
self.trim_lock.release()
def setHistoryLength(self, history_length):
......
......@@ -18,7 +18,6 @@ its use is not recommended. It's still here for historical reasons.
"""
from __future__ import print_function
import threading
import time
import logging
import sys
......@@ -28,10 +27,10 @@ import zope.interface
from persistent.TimeStamp import TimeStamp
import ZODB.interfaces
from ZODB import POSException
from ZODB.utils import z64, oid_repr, byte_ord, byte_chr
from ZODB.UndoLogCompatible import UndoLogCompatible
from ZODB._compat import dumps, _protocol, py2_hasattr
from . import POSException, utils
from .utils import z64, oid_repr, byte_ord, byte_chr
from .UndoLogCompatible import UndoLogCompatible
from ._compat import dumps, _protocol, py2_hasattr
log = logging.getLogger("ZODB.BaseStorage")
......@@ -85,8 +84,8 @@ class BaseStorage(UndoLogCompatible):
log.debug("create storage %s", self.__name__)
# Allocate locks:
self._lock = threading.RLock()
self.__commit_lock = threading.Lock()
self._lock = utils.RLock()
self.__commit_lock = utils.Lock()
# Comment out the following 4 lines to debug locking:
self._lock_acquire = self._lock.acquire
......@@ -108,45 +107,6 @@ class BaseStorage(UndoLogCompatible):
else:
self._oid = oid
########################################################################
# The following methods are normally overridden on instances,
# except when debugging:
def _lock_acquire(self, *args):
f = sys._getframe(1)
sys.stdout.write("[la(%s:%s)\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
self._lock.acquire(*args)
sys.stdout.write("la(%s:%s)]\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
def _lock_release(self, *args):
f = sys._getframe(1)
sys.stdout.write("[lr(%s:%s)\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
self._lock.release(*args)
sys.stdout.write("lr(%s:%s)]\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
def _commit_lock_acquire(self, *args):
f = sys._getframe(1)
sys.stdout.write("[ca(%s:%s)\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
self.__commit_lock.acquire(*args)
sys.stdout.write("ca(%s:%s)]\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
def _commit_lock_release(self, *args):
f = sys._getframe(1)
sys.stdout.write("[cr(%s:%s)\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
self.__commit_lock.release(*args)
sys.stdout.write("cr(%s:%s)]\n" % (f.f_code.co_filename, f.f_lineno))
sys.stdout.flush()
#
########################################################################
def sortKey(self):
"""Return a string that can be used to sort storage instances.
......
......@@ -12,13 +12,11 @@
#
##############################################################################
"""Database connection support
$Id$"""
"""
from __future__ import print_function
import logging
import sys
import tempfile
import threading
import warnings
import os
import time
......@@ -29,7 +27,6 @@ from persistent import PickleCache
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from ZODB.interfaces import IBlobStorage
from ZODB.interfaces import IMVCCStorage
from ZODB.blob import Blob, rename_or_copy_blob, remove_committed_dir
from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint
......@@ -52,8 +49,11 @@ from ZODB.utils import p64, u64, z64, oid_repr, positive_id
from ZODB import utils
import six
from .mvccadapter import HistoricalStorageAdapter
global_reset_counter = 0
noop = lambda : None
def resetCaches():
"""Causes all connection caches to be reset as connections are reopened.
......@@ -103,13 +103,17 @@ class Connection(ExportImport, object):
# Multi-database support
self.connections = {self._db.database_name: self}
storage = db.storage
if IMVCCStorage.providedBy(storage):
# Use a connection-specific storage instance.
self._mvcc_storage = True
storage = storage.new_instance()
storage = db._mvcc_storage
if before:
try:
before_instance = storage.before_instance
except AttributeError:
def before_instance(before):
return HistoricalStorageAdapter(
storage.new_instance(), before)
storage = before_instance(before)
else:
self._mvcc_storage = False
storage = storage.new_instance()
self._normal_storage = self._storage = storage
self.new_oid = db.new_oid
......@@ -139,17 +143,17 @@ class Connection(ExportImport, object):
# ReadConflictError (just to be able to clean them up from the
# cache on abort with the other modified objects). All objects
# of this list are either in _cache or in _added.
self._registered_objects = []
self._registered_objects = [] # [object]
# ids and serials of objects for which readCurrent was called
# in a transaction.
self._readCurrent = {}
self._readCurrent = {} # {oid ->serial}
# Dict of oid->obj added explicitly through add(). Used as a
# preliminary cache until commit time when objects are all moved
# to the real _cache. The objects are moved to _creating at
# commit time.
self._added = {}
self._added = {} # {oid -> object}
# During commit this is turned into a list, which receives
# objects added as a side-effect of storing a modified object.
......@@ -163,32 +167,11 @@ class Connection(ExportImport, object):
# adding. Used during abort to remove created objects from the
# _cache, and by persistent_id to check that a new object isn't
# reachable from multiple databases.
self._creating = {}
self._creating = {} # {oid -> implicitly_added_flag}
# List of oids of modified objects, which have to be invalidated
# in the cache on abort and in other connections on finish.
self._modified = []
# _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while
# another is processing invalidations. All the invalidations
# from a single transaction should be applied atomically, so
# the lock must be held when reading _invalidated.
# It sucks that we have to hold the lock to read _invalidated.
# Normally, _invalidated is written by calling dict.update, which
# will execute atomically by virtue of the GIL. But some storage
# might generate oids where hash or compare invokes Python code. In
# that case, the GIL can't save us.
# Note: since that was written, it was officially declared that the
# type of an oid is str. TODO: remove the related now-unnecessary
# critical sections (if any -- this needs careful thought).
self._inv_lock = threading.Lock()
self._invalidated = set()
# Flag indicating whether the cache has been invalidated:
self._invalidatedCache = False
self._modified = [] # [oid]
# We intend to prevent committing a transaction in which
# ReadConflictError occurs. _conflicts is the set of oids that
......@@ -199,12 +182,6 @@ class Connection(ExportImport, object):
# _conflicts).
self._conflicts = {}
# _txn_time stores the upper bound on transactions visible to
# this connection. That is, all object revisions must be
# written before _txn_time. If it is None, then the current
# revisions are acceptable.
self._txn_time = None
# To support importFile(), implemented in the ExportImport base
# class, we need to run _importDuringCommit() from our commit()
# method. If _import is not None, it is a two-tuple of arguments
......@@ -253,7 +230,7 @@ class Connection(ExportImport, object):
if obj is not None:
return obj
p, serial = self._storage.load(oid, '')
p, _ = self._storage.load(oid, '')
obj = self._reader.getGhost(p)
# Avoid infiniate loop if obj tries to load its state before
......@@ -297,22 +274,20 @@ class Connection(ExportImport, object):
# Call the close callbacks.
if self.__onCloseCallbacks is not None:
for f in self.__onCloseCallbacks:
callbacks = self.__onCloseCallbacks
self.__onCloseCallbacks = None
for f in callbacks:
try:
f()
except: # except what?
f = getattr(f, 'im_self', f)
self._log.exception("Close callback failed for %s", f)
self.__onCloseCallbacks = None
self._debug_info = ()
if self.opened:
self.transaction_manager.unregisterSynch(self)
if self._mvcc_storage:
self._storage.sync(force=False)
if primary:
for connection in self.connections.values():
if connection is not self:
......@@ -341,31 +316,7 @@ class Connection(ExportImport, object):
"""Returns True if this connection is read only."""
if self.opened is None:
raise ConnectionStateError("The database connection is closed")
return self.before is not None or self._storage.isReadOnly()
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
if self.before is not None:
# This is a historical connection. Invalidations are irrelevant.
return
self._inv_lock.acquire()
try:
if self._txn_time is None:
self._txn_time = tid
elif (tid is not None) and (tid < self._txn_time):
raise AssertionError("invalidations out of order, %r < %r"
% (tid, self._txn_time))
self._invalidated.update(oids)
finally:
self._inv_lock.release()
def invalidateCache(self):
self._inv_lock.acquire()
try:
self._invalidatedCache = True
finally:
self._inv_lock.release()
return self._storage.isReadOnly()
@property
def root(self):
......@@ -403,8 +354,7 @@ class Connection(ExportImport, object):
def sync(self):
"""Manually update the view on the database."""
self.transaction_manager.abort()
self._storage_sync()
self.transaction_manager.begin()
def getDebugInfo(self):
"""Returns a tuple with different items for debugging the
......@@ -490,62 +440,6 @@ class Connection(ExportImport, object):
self._registered_objects = []
self._creating.clear()
# Process pending invalidations.
def _flush_invalidations(self):
if self._mvcc_storage:
# Poll the storage for invalidations.
invalidated = self._storage.poll_invalidations()
if invalidated is None:
# special value: the transaction is so old that
# we need to flush the whole cache.
self._cache.invalidate(list(self._cache.cache_data.keys()))
elif invalidated:
self._cache.invalidate(invalidated)
self._inv_lock.acquire()
try:
# Non-ghostifiable objects may need to read when they are
# invalidated, so we'll quickly just replace the
# invalidating dict with a new one. We'll then process
# the invalidations after freeing the lock *and* after
# resetting the time. This means that invalidations will
# happen after the start of the transactions. They are
# subject to conflict errors and to reading old data.
# TODO: There is a potential problem lurking for persistent
# classes. Suppose we have an invalidation of a persistent
# class and of an instance. If the instance is
# invalidated first and if the invalidation logic uses
# data read from the class, then the invalidation could
# be performed with stale data. Or, suppose that there
# are instances of the class that are freed as a result of
# invalidating some object. Perhaps code in their __del__
# uses class data. Really, the only way to properly fix
# this is to, in fact, make classes ghostifiable. Then
# we'd have to reimplement attribute lookup to check the
# class state and, if necessary, activate the class. It's
# much worse than that though, because we'd also need to
# deal with slots. When a class is ghostified, we'd need
# to replace all of the slot operations with versions that
# reloaded the object when called. It's hard to say which
# is better or worse. For now, it seems the risk of
# using a class while objects are being invalidated seems
# small enough to be acceptable.
invalidated = dict.fromkeys(self._invalidated)
self._invalidated = set()
self._txn_time = None
if self._invalidatedCache:
self._invalidatedCache = False
invalidated = self._cache.cache_data.copy()
finally:
self._inv_lock.release()
self._cache.invalidate(invalidated)
# Now is a good time to collect some garbage.
self._cache.incrgc()
def tpc_begin(self, transaction):
"""Begin commit of a transaction, starting the two-phase commit."""
self._modified = []
......@@ -600,9 +494,6 @@ class Connection(ExportImport, object):
self._added_during_commit = []
if self._invalidatedCache:
raise ConflictError()
for obj in self._registered_objects:
oid = obj._p_oid
assert oid
......@@ -614,10 +505,6 @@ class Connection(ExportImport, object):
elif oid in self._added:
assert obj._p_serial == z64
elif obj._p_changed:
if oid in self._invalidated:
resolve = getattr(obj, "_p_resolveConflict", None)
if resolve is None:
raise ConflictError(object=obj)
self._modified.append(oid)
else:
# Nothing to do. It's been said that it's legal, e.g., for
......@@ -655,9 +542,6 @@ class Connection(ExportImport, object):
self._creating[oid] = implicitly_adding
else:
if (oid in self._invalidated
and not hasattr(obj, '_p_resolveConflict')):
raise ConflictError(object=obj)
self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj
......@@ -794,21 +678,11 @@ class Connection(ExportImport, object):
self._handle_serial(oid, serial)
def tpc_finish(self, transaction):
"""Indicate confirmation that the transaction is done."""
def callback(tid):
if self._mvcc_storage:
# Inter-connection invalidation is not needed when the
# storage provides MVCC.
return
d = dict.fromkeys(self._modified)
self._db.invalidate(tid, d, self)
# It's important that the storage calls the passed function
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
self._storage.tpc_finish(transaction, callback)
"""Indicate confirmation that the transaction is done.
"""
# XXX someday, we'll care about the *real* tid we get back via
# the callback (that we're no longer calling).
self._storage.tpc_finish(transaction)
self._tpc_cleanup()
def sortKey(self):
......@@ -825,20 +699,35 @@ class Connection(ExportImport, object):
# We don't do anything before a commit starts.
pass
# Call the underlying storage's sync() method (if any), and process
# pending invalidations regardless. Of course this should only be
# called at transaction boundaries.
def _storage_sync(self, *ignored):
def newTransaction(self, transaction, sync=True):
self._readCurrent.clear()
sync = getattr(self._storage, 'sync', 0)
if sync:
sync()
self._flush_invalidations()
afterCompletion = _storage_sync
newTransaction = _storage_sync
try:
self._storage.sync(sync)
invalidated = self._storage.poll_invalidations()
if invalidated is None:
# special value: the transaction is so old that
# we need to flush the whole cache.
invalidated = self._cache.cache_data.copy()
self._cache.invalidate(invalidated)
except AttributeError:
assert self._storage is None
# Now is a good time to collect some garbage.
self._cache.incrgc()
def afterCompletion(self, transaction):
# Note that we we call newTransaction here for 2 reasons:
# a) Applying invalidations early frees up resources
# early. This is especially useful if the connection isn't
# going to be used in a while.
# b) Non-hygienic applications might start new transactions by
# finalizing previous ones without calling begin. We pass
# False to avoid possiblyt expensive sync calls to not
# penalize well-behaved applications that call begin.
self.newTransaction(transaction, False)
# Transaction-manager synchronization -- ISynchronizer
# Transaction-manager synchronization -- ISynchronizer
##########################################################################
##########################################################################
......@@ -866,7 +755,20 @@ class Connection(ExportImport, object):
raise
try:
self._setstate(obj, oid)
p, serial = self._storage.load(oid, '')
self._load_count += 1
self._reader.setGhostState(obj, p)
obj._p_serial = serial
self._cache.update_object_size_estimation(oid, len(p))
obj._p_estimated_size = len(p)
# Blob support
if isinstance(obj, Blob):
obj._p_blob_uncommitted = None
obj._p_blob_committed = self._storage.loadBlob(oid, serial)
except ConflictError:
raise
except:
......@@ -874,102 +776,6 @@ class Connection(ExportImport, object):
className(obj), oid_repr(oid))
raise
def _setstate(self, obj, oid):
# Helper for setstate(), which provides logging of failures.
# We accept the oid param, which must be the same as obj._p_oid,
# as a performance optimization for the pure-Python persistent implementation
# where accessing an attribute involves __getattribute__ calls
# The control flow is complicated here to avoid loading an
# object revision that we are sure we aren't going to use. As
# a result, invalidation tests occur before and after the
# load. We can only be sure about invalidations after the
# load.
# If an object has been invalidated, among the cases to consider:
# - Try MVCC
# - Raise ConflictError.
if self.before is not None:
# Load data that was current before the time we have.
before = self.before
t = self._storage.loadBefore(oid, before)
if t is None:
raise POSKeyError() # historical connection!
p, serial, end = t
else:
# There is a harmless data race with self._invalidated. A
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if self._invalidatedCache:
raise ReadConflictError()
if (oid in self._invalidated):
self._load_before_or_conflict(obj)
return
p, serial = self._storage.load(oid, '')
self._load_count += 1
self._inv_lock.acquire()
try:
invalid = oid in self._invalidated
finally:
self._inv_lock.release()
if invalid:
self._load_before_or_conflict(obj)
return
self._reader.setGhostState(obj, p)
obj._p_serial = serial
self._cache.update_object_size_estimation(oid, len(p))
obj._p_estimated_size = len(p)
# Blob support
if isinstance(obj, Blob):
obj._p_blob_uncommitted = None
obj._p_blob_committed = self._storage.loadBlob(oid, serial)
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not self._setstate_noncurrent(obj):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
def _setstate_noncurrent(self, obj):
"""Set state using non-current data.
Return True if state was available, False if not.
"""
try:
# Load data that was current before the commit at txn_time.
t = self._storage.loadBefore(obj._p_oid, self._txn_time)
except KeyError:
return False
if t is None:
return False
data, start, end = t
# The non-current transaction must have been written before
# txn_time. It must be current at txn_time, but could have
# been modified at txn_time.
assert start < self._txn_time, (u64(start), u64(self._txn_time))
assert end is not None
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
self._reader.setGhostState(obj, data)
obj._p_serial = start
# MVCC Blob support
if isinstance(obj, Blob):
obj._p_blob_uncommitted = None
obj._p_blob_committed = self._storage.loadBlob(obj._p_oid, start)
return True
def register(self, obj):
"""Register obj with the current transaction manager.
......@@ -1044,18 +850,33 @@ class Connection(ExportImport, object):
register for afterCompletion() calls.
"""
self.opened = time.time()
if transaction_manager is None:
transaction_manager = transaction.manager
self.transaction_manager = transaction_manager
self.opened = time.time()
if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache.
self._resetCache()
else:
self._flush_invalidations()
# This newTransaction is to deal with some pathalogical cases:
#
# a) Someone opens a connection when a transaction isn't
# active and proceeeds without calling begin on a
# transaction manager. We initialize the transaction for
# the connection, but we don't do a storage sync, since
# this will be done if a well-nehaved application calls
# begin, and we don't want to penalize well-behaved
# transactions by syncing twice, as storage syncs might be
# expensive.
# b) Lots of tests assume that connection transaction
# information is set on open.
#
# Fortunately, this is a cheap operation. It doesn't really
# cost much, if anything.
self.newTransaction(None, False)
transaction_manager.registerSynch(self)
......@@ -1074,8 +895,6 @@ class Connection(ExportImport, object):
See the docstring for the resetCaches() function.
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
self._invalidatedCache = False
cache_size = self._cache.cache_size
cache_size_bytes = self._cache.cache_size_bytes
self._cache = cache = PickleCache(self, cache_size, cache_size_bytes)
......@@ -1084,9 +903,8 @@ class Connection(ExportImport, object):
def _release_resources(self):
for c in six.itervalues(self.connections):
if c._mvcc_storage:
if c._storage is not None:
c._storage.release()
if c._storage is not None:
c._storage.release()
c._storage = c._normal_storage = None
c._cache = PickleCache(self, 0, 0)
......@@ -1151,7 +969,7 @@ class Connection(ExportImport, object):
self.cacheGC()
return result
def _rollback(self, state):
def _rollback_savepoint(self, state):
self._abort()
self._registered_objects = []
src = self._storage
......@@ -1178,7 +996,7 @@ class Connection(ExportImport, object):
self._creating.update(src.creating)
for oid in oids:
data, serial = src.load(oid, src)
data, serial = src.load(oid)
obj = self._cache.get(oid, None)
if obj is not None:
self._cache.update_object_size_estimation(
......@@ -1193,7 +1011,7 @@ class Connection(ExportImport, object):
# that that the next attribute access of its name
# unghostify it, which will cause its blob data
# to be reattached "cleanly"
self.invalidate(None, (oid, ))
self._cache.invalidate(oid)
else:
s = self._storage.store(oid, serial, data,
'', transaction)
......@@ -1235,16 +1053,17 @@ class Connection(ExportImport, object):
# Savepoint support
#####################################################################
@implementer(IDataManagerSavepoint)
class Savepoint:
def __init__(self, datamanager, state):
self.datamanager = datamanager
self.state = state
def rollback(self):
self.datamanager._rollback(self.state)
self.datamanager._rollback_savepoint(self.state)
@implementer(IBlobStorage)
class TmpStore:
......@@ -1254,7 +1073,7 @@ class TmpStore:
def __init__(self, storage):
self._storage = storage
for method in (
'getName', 'new_oid', 'getSize', 'sortKey', 'loadBefore',
'getName', 'new_oid', 'getSize', 'sortKey',
'isReadOnly'
):
setattr(self, method, getattr(storage, method))
......@@ -1278,7 +1097,7 @@ class TmpStore:
remove_committed_dir(self._blob_dir)
self._blob_dir = None
def load(self, oid, version):
def load(self, oid, version=''):
pos = self.index.get(oid)
if pos is None:
return self._storage.load(oid, '')
......@@ -1372,6 +1191,7 @@ class TmpStore:
self.index = index.copy()
self.creating = creating
class RootConvenience(object):
def __init__(self, root):
......
......@@ -13,13 +13,15 @@
##############################################################################
"""Database objects
"""
from __future__ import print_function
import sys
import threading
import logging
import datetime
import time
import warnings
from . import utils
from ZODB.broken import find_global
from ZODB.utils import z64
from ZODB.Connection import Connection
......@@ -179,6 +181,7 @@ class ConnectionPool(AbstractConnectionPool):
(available and available[0][0] < threshhold)
):
t, c = available.pop(0)
assert not c.opened
self.all.remove(c)
c._release_resources()
......@@ -213,6 +216,7 @@ class ConnectionPool(AbstractConnectionPool):
to_remove = ()
for (t, c) in self.available:
assert not c.opened
if t < threshhold:
to_remove += (c,)
self.all.remove(c)
......@@ -397,15 +401,9 @@ class DB(object):
- `xrefs` - Boolian flag indicating whether implicit cross-database
references are allowed
"""
if isinstance(storage, six.string_types):
from ZODB import FileStorage
storage = ZODB.FileStorage.FileStorage(storage, **storage_args)
elif storage is None:
from ZODB import MappingStorage
storage = ZODB.MappingStorage.MappingStorage(**storage_args)
# Allocate lock.
x = threading.RLock()
x = utils.RLock()
self._a = x.acquire
self._r = x.release
......@@ -419,12 +417,24 @@ class DB(object):
self._historical_cache_size_bytes = historical_cache_size_bytes
# Setup storage
if isinstance(storage, six.string_types):
from ZODB import FileStorage
storage = ZODB.FileStorage.FileStorage(storage, **storage_args)
elif storage is None:
from ZODB import MappingStorage
storage = ZODB.MappingStorage.MappingStorage(**storage_args)
else:
assert not storage_args
self.storage = storage
if IMVCCStorage.providedBy(storage):
self._mvcc_storage = storage
else:
from .mvccadapter import MVCCAdapter
self._mvcc_storage = MVCCAdapter(storage)
self.references = ZODB.serialize.referencesf
try:
storage.registerDB(self)
except TypeError:
storage.registerDB(self, None) # Backward compat
if (not hasattr(storage, 'tpc_vote')) and not storage.isReadOnly():
warnings.warn(
......@@ -434,12 +444,10 @@ class DB(object):
DeprecationWarning, 2)
storage.tpc_vote = lambda *args: None
if IMVCCStorage.providedBy(storage):
temp_storage = storage.new_instance()
else:
temp_storage = storage
temp_storage = self._mvcc_storage.new_instance()
try:
try:
temp_storage.poll_invalidations()
temp_storage.load(z64, '')
except KeyError:
# Create the database's root in the storage if it doesn't exist
......@@ -458,8 +466,7 @@ class DB(object):
temp_storage.tpc_vote(t)
temp_storage.tpc_finish(t)
finally:
if IMVCCStorage.providedBy(temp_storage):
temp_storage.release()
temp_storage.release()
# Multi-database setup.
if databases is None:
......@@ -559,15 +566,17 @@ class DB(object):
# sys.getrefcount(ob) returns. But, in addition to that,
# the cache holds an extra reference on non-ghost objects,
# and we also want to pretend that doesn't exist.
# If we have no way to get a refcount, we return False to symbolize
# that. As opposed to None, this has the advantage of being usable
# as a number (0) in case clients depended on that.
# If we have no way to get a refcount, we return False
# to symbolize that. As opposed to None, this has the
# advantage of being usable as a number (0) in case
# clients depended on that.
detail.append({
'conn_no': cn,
'oid': oid,
'id': id,
'klass': "%s%s" % (module, ob.__class__.__name__),
'rc': rc(ob) - 3 - (ob._p_changed is not None) if rc else False,
'rc': (rc(ob) - 3 - (ob._p_changed is not None)
if rc else False),
'state': ob._p_changed,
#'references': con.references(oid),
})
......@@ -632,8 +641,9 @@ class DB(object):
c.afterCompletion = c.newTransaction = c.close = noop
c._release_resources()
self.storage.close()
self._mvcc_storage.close()
del self.storage
del self._mvcc_storage
def getCacheSize(self):
return self._cache_size
......@@ -665,27 +675,6 @@ class DB(object):
def getHistoricalTimeout(self):
return self.historical_pool.timeout
def invalidate(self, tid, oids, connection=None, version=''):
"""Invalidate references to a given oid.
This is used to indicate that one of the connections has committed a
change to the object. The connection commiting the change should be
passed in to prevent useless (but harmless) messages to the
connection.
"""
# Storages, esp. ZEO tests, need the version argument still. :-/
assert version==''
# Notify connections.
def inval(c):
if c is not connection:
c.invalidate(tid, oids)
self._connectionMap(inval)
def invalidateCache(self):
"""Invalidate each of the connection caches
"""
self._connectionMap(lambda c: c.invalidateCache())
transform_record_data = untransform_record_data = lambda self, data: data
def objectCount(self):
......@@ -752,19 +741,17 @@ class DB(object):
result = self.pool.pop()
assert result is not None
# open the connection.
result.open(transaction_manager)
# A good time to do some cache cleanup.
# (note we already have the lock)
self.pool.availableGC()
self.historical_pool.availableGC()
return result
finally:
self._r()
result.open(transaction_manager)
return result
def connectionDebugInfo(self):
result = []
t = time.time()
......@@ -986,16 +973,16 @@ class ContextManager:
self.tm.abort()
self.conn.close()
resource_counter_lock = threading.Lock()
resource_counter_lock = utils.Lock()
resource_counter = 0
class TransactionalUndo(object):
def __init__(self, db, tids):
self._db = db
self._storage = db.storage
self._storage = getattr(
db._mvcc_storage, 'undo_instance', db._mvcc_storage.new_instance)()
self._tids = tids
self._oids = set()
def abort(self, transaction):
pass
......@@ -1005,19 +992,13 @@ class TransactionalUndo(object):
def commit(self, transaction):
for tid in self._tids:
result = self._storage.undo(tid, transaction)
if result:
self._oids.update(result[1])
self._storage.undo(tid, transaction)
def tpc_vote(self, transaction):
for oid, _ in self._storage.tpc_vote(transaction) or ():
self._oids.add(oid)
self._storage.tpc_vote(transaction)
def tpc_finish(self, transaction):
self._storage.tpc_finish(
transaction,
lambda tid: self._db.invalidate(tid, self._oids)
)
self._storage.tpc_finish(transaction)
def tpc_abort(self, transaction):
self._storage.tpc_abort(transaction)
......
......@@ -19,11 +19,11 @@ to be layered over a base database.
The base storage must not change.
"""
from __future__ import print_function
import os
import random
import weakref
import tempfile
import threading
import ZODB.BaseStorage
import ZODB.blob
import ZODB.interfaces
......@@ -72,7 +72,7 @@ class DemoStorage(ConflictResolvingStorage):
self._issued_oids = set()
self._stored_oids = set()
self._commit_lock = threading.Lock()
self._commit_lock = ZODB.utils.Lock()
self._transaction = None
if name is None:
......
......@@ -20,7 +20,6 @@ import contextlib
import errno
import logging
import os
import threading
import time
from struct import pack
from struct import unpack
......@@ -31,6 +30,8 @@ from zc.lockfile import LockFile
from zope.interface import alsoProvides
from zope.interface import implementer
from .. import utils
from ZODB.blob import BlobStorageMixin
from ZODB.blob import link_or_copy
from ZODB.blob import remove_committed
......@@ -2047,7 +2048,7 @@ class FilePool:
self.name = file_name
self._files = []
self._out = []
self._cond = threading.Condition()
self._cond = utils.Condition()
@contextlib.contextmanager
def write_lock(self):
......
......@@ -19,7 +19,6 @@ storage without distracting storage details.
import BTrees
import time
import threading
import ZODB.BaseStorage
import ZODB.interfaces
import ZODB.POSException
......@@ -40,10 +39,10 @@ class MappingStorage(object):
self._transactions = BTrees.OOBTree.OOBTree() # {tid->TransactionRecord}
self._ltid = ZODB.utils.z64
self._last_pack = None
_lock = threading.RLock()
_lock = ZODB.utils.RLock()
self._lock_acquire = _lock.acquire
self._lock_release = _lock.release
self._commit_lock = threading.Lock()
self._commit_lock = ZODB.utils.Lock()
self._opened = True
self._transaction = None
self._oid = 0
......
......@@ -273,8 +273,6 @@ test.
>>> conn.root()['first']['count'] += 1
>>> conn.root()['third'] = persistent.mapping.PersistentMapping()
>>> transaction.commit()
>>> len(historical_conn._invalidated)
0
>>> historical_conn.close()
Note that if you try to open an historical connection to a time in the future,
......
......@@ -201,21 +201,6 @@ class IConnection(Interface):
def isReadOnly():
"""Returns True if the storage for this connection is read only."""
def invalidate(tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids.
When the next transaction boundary is reached, objects will be
invalidated. If any of the invalidated objects are accessed by the
current transaction, the revision written before Connection.tid will be
used.
The DB calls this method, even when the Connection is closed.
Parameters:
tid: the storage-level id of the transaction that committed
oids: oids is an iterable of oids.
"""
def root():
"""Return the database root object.
......@@ -278,14 +263,6 @@ class IConnection(Interface):
If clear is True, reset the counters.
"""
def invalidateCache():
"""Invalidate the connection cache
This invalidates *all* objects in the cache. If the connection
is open, subsequent reads will fail until a new transaction
begins or until the connection os reopned.
"""
def readCurrent(obj):
"""Make sure an object being read is current
......@@ -577,6 +554,20 @@ class IStorage(Interface):
def load(oid, version):
"""Load data for an object id
NOTE: This method is deprecated and may be removed in the
future. It is no longer used by ZODB, although it may still
be used in some tests or scripts. Previously, there was a
requirement that load results be properly ordered with
invalidations so that at any point in time, clients have a
consistent view of what version of an object is current. This
restriction has been relaxed and some storages will be
simplified as a result of the removal of this requirement.
An alternative to calling load is calling loadBefore passing
ZODB.utils.maxtid::
store.loadBefore(oid, ZODB.utils.maxtid)
The version argumement should always be an empty string. It
exists soley for backward compatibility with older storage
implementations.
......@@ -611,19 +602,6 @@ class IStorage(Interface):
otherwise, POSKeyError is raised.
"""
# The following two methods are effectively part of the interface,
# as they are generally needed when one storage wraps
# another. This deserves some thought, at probably debate, before
# adding them.
#
# def _lock_acquire():
# """Acquire the storage lock
# """
# def _lock_release():
# """Release the storage lock
# """
def new_oid():
"""Allocate a new object id.
......@@ -661,11 +639,7 @@ class IStorage(Interface):
The passed object is a wrapper object that provides an upcall
interface to support composition.
Note that, for historical reasons, an implementation may
require a second argument, however, if required, the None will
be passed as the second argument.
Also, for historical reasons, this is called registerDB rather
Note that, for historical reasons, this is called registerDB rather
than register_wrapper.
"""
......@@ -804,7 +778,6 @@ class IStorage(Interface):
"""
class IStorageRestoreable(IStorage):
"""Copying Transactions
......@@ -1096,11 +1069,9 @@ class IMVCCStorage(IStorage):
"""
def release():
"""Release all persistent sessions used by this storage instance.
"""Release resources held by the storage instance.
After this call, the storage instance can still be used;
calling methods that use persistent sessions will cause the
persistent sessions to be reopened.
The storage instance won't be used again after this call.
"""
def poll_invalidations():
......
"""Adapt IStorage objects to IMVCCStorage
This is a largely internal implementation of ZODB, especially DB and
Connection. It takes the MVCC implementation involving invalidations
and start time and moves it into a storage adapter. This allows ZODB
to treat Relstoage and other storages in pretty much the same way and
also simplifies the implementation of the DB and Connection classes.
"""
import zope.interface
from . import interfaces, serialize, POSException
from .utils import p64, u64, Lock
class Base(object):
_copy_methods = (
'getName', 'getSize', 'history', 'lastTransaction', 'sortKey',
'loadBlob', 'openCommittedBlobFile',
'isReadOnly', 'supportsUndo', 'undoLog', 'undoInfo',
'temporaryDirectory',
)
def __init__(self, storage):
self._storage = storage
if interfaces.IBlobStorage.providedBy(storage):
zope.interface.alsoProvides(self, interfaces.IBlobStorage)
def __getattr__(self, name):
if name in self._copy_methods:
if hasattr(self._storage, name):
m = getattr(self._storage, name)
setattr(self, name, m)
return m
raise AttributeError(name)
def __len__(self):
return len(self._storage)
class MVCCAdapter(Base):
def __init__(self, storage):
Base.__init__(self, storage)
self._instances = set()
self._lock = Lock()
if hasattr(storage, 'registerDB'):
storage.registerDB(self)
def new_instance(self):
instance = MVCCAdapterInstance(self)
with self._lock:
self._instances.add(instance)
return instance
def before_instance(self, before=None):
return HistoricalStorageAdapter(self._storage, before)
def undo_instance(self):
return UndoAdapterInstance(self)
def _release(self, instance):
with self._lock:
self._instances.remove(instance)
closed = False
def close(self):
if not self.closed:
self.closed = True
self._storage.close()
del self._instances
del self._storage
def invalidateCache(self):
with self._lock:
for instance in self._instances:
instance._invalidateCache()
def invalidate(self, transaction_id, oids, version=''):
with self._lock:
for instance in self._instances:
instance._invalidate(oids)
def _invalidate_finish(self, oids, committing_instance):
with self._lock:
for instance in self._instances:
if instance is not committing_instance:
instance._invalidate(oids)
references = serialize.referencesf
transform_record_data = untransform_record_data = lambda self, data: data
def pack(self, pack_time, referencesf):
return self._storage.pack(pack_time, referencesf)
class MVCCAdapterInstance(Base):
_copy_methods = Base._copy_methods + (
'loadSerial', 'new_oid', 'tpc_vote',
'checkCurrentSerialInTransaction', 'tpc_abort',
)
def __init__(self, base):
self._base = base
Base.__init__(self, base._storage)
self._lock = Lock()
self._invalidations = set()
self._start = None # Transaction start time
self._sync = getattr(self._storage, 'sync', lambda : None)
def release(self):
self._base._release(self)
close = release
def _invalidateCache(self):
with self._lock:
self._invalidations = None
def _invalidate(self, oids):
with self._lock:
try:
self._invalidations.update(oids)
except AttributeError:
if self._invalidations is not None:
raise
def sync(self, force=True):
if force:
self._sync()
def poll_invalidations(self):
self._start = p64(u64(self._storage.lastTransaction()) + 1)
with self._lock:
if self._invalidations is None:
self._invalidations = set()
return None
else:
result = list(self._invalidations)
self._invalidations.clear()
return result
def load(self, oid, version=''):
assert self._start is not None
r = self._storage.loadBefore(oid, self._start)
if r is None:
raise POSException.ReadConflictError(repr(oid))
return r[:2]
_modified = None # Used to keep track of oids modified within a
# transaction, so we can invalidate them later.
def tpc_begin(self, transaction):
self._storage.tpc_begin(transaction)
self._modified = set()
def store(self, oid, serial, data, version, transaction):
s = self._storage.store(oid, serial, data, version, transaction)
self._modified.add(oid)
return s
def storeBlob(self, oid, serial, data, blobfilename, version, transaction):
s = self._storage.storeBlob(
oid, serial, data, blobfilename, '', transaction)
self._modified.add(oid)
return s
def tpc_finish(self, transaction, func = lambda tid: None):
modified = self._modified
self._modified = None
def invalidate_finish(tid):
self._base._invalidate_finish(modified, self)
func(tid)
self._storage.tpc_finish(transaction, invalidate_finish)
def read_only_writer(self, *a, **kw):
raise POSException.ReadOnlyError
class HistoricalStorageAdapter(Base):
"""Adapt a storage to a historical storage
"""
_copy_methods = Base._copy_methods + (
'loadSerial', 'tpc_begin', 'tpc_finish', 'tpc_abort', 'tpc_vote',
'checkCurrentSerialInTransaction',
)
def __init__(self, storage, before=None):
Base.__init__(self, storage)
self._before = before
def isReadOnly(self):
return True
def supportsUndo(self):
return False
def release(self):
pass
close = release
def sync(self, force=True):
pass
def poll_invalidations(self):
return []
new_oid = pack = store = read_only_writer
def load(self, oid, version=''):
r = self._storage.loadBefore(oid, self._before)
if r is None:
raise POSException.POSKeyError(oid)
return r[:2]
class UndoAdapterInstance(Base):
_copy_methods = Base._copy_methods + (
'tpc_abort',
)
def __init__(self, base):
self._base = base
Base.__init__(self, base._storage)
def release(self):
pass
close = release
def tpc_begin(self, transaction):
self._storage.tpc_begin(transaction)
self._undone = set()
def undo(self, transaction_id, transaction):
result = self._storage.undo(transaction_id, transaction)
if result:
self._undone.update(result[1])
return result
def tpc_vote(self, transaction):
result = self._storage.tpc_vote(transaction)
if result:
for oid, serial in result:
self._undone.add(oid)
def tpc_finish(self, transaction, func = lambda tid: None):
def invalidate_finish(tid):
self._base._invalidate_finish(self._undone, None)
func(tid)
self._storage.tpc_finish(transaction, invalidate_finish)
......@@ -29,6 +29,8 @@ import transaction
import zope.interface
import zope.interface.verify
from .. import utils
ZERO = b'\0'*8
class BasicStorage:
......@@ -345,7 +347,7 @@ class BasicStorage:
results = {}
started.wait()
attempts = []
attempts_cond = threading.Condition()
attempts_cond = utils.Condition()
def update_attempts():
with attempts_cond:
......
......@@ -65,6 +65,7 @@ class ZODBClientThread(TestThread):
for i in range(self.commits):
self.commit(d, i)
self.test.assertEqual(sorted(d.keys()), list(range(self.commits)))
conn.close()
def commit(self, d, num):
d[num] = time.time()
......
......@@ -46,6 +46,8 @@ class MVCCMappingStorage(MappingStorage):
inst._commit_lock = self._commit_lock
inst.new_oid = self.new_oid
inst.pack = self.pack
inst.loadBefore = self.loadBefore
inst._ltid = self._ltid
inst._main_lock_acquire = self._lock_acquire
inst._main_lock_release = self._lock_release
return inst
......@@ -73,11 +75,10 @@ class MVCCMappingStorage(MappingStorage):
# prevent changes to _transactions and _data during analysis
self._main_lock_acquire()
try:
if self._transactions:
new_tid = self._transactions.maxKey()
else:
new_tid = b''
new_tid = ZODB.utils.z64
# Copy the current data into a snapshot. This is obviously
# very inefficient for large storages, but it's good for
......@@ -112,7 +113,7 @@ class MVCCMappingStorage(MappingStorage):
finally:
self._main_lock_release()
self._polled_tid = new_tid
self._polled_tid = self._ltid = new_tid
return list(changed_oids)
def tpc_finish(self, transaction, func = lambda tid: None):
......
......@@ -25,7 +25,8 @@ from ZODB.serialize import referencesf
from ZODB.tests.MinPO import MinPO
from ZODB.tests.MTStorage import TestThread
from ZODB.tests.StorageTestBase import snooze
from ZODB._compat import loads, PersistentPickler, Pickler, Unpickler, BytesIO, _protocol
from ZODB._compat import (loads, PersistentPickler, Pickler, Unpickler,
BytesIO, _protocol)
import transaction
import ZODB.interfaces
import ZODB.tests.util
......@@ -270,6 +271,8 @@ class PackableStorage(PackableStorageBase):
self._sanity_check()
db.close()
def checkPackWhileWriting(self):
self._PackWhileWriting(pack_now=False)
......@@ -312,6 +315,8 @@ class PackableStorage(PackableStorageBase):
self._sanity_check()
db.close()
def checkPackWithMultiDatabaseReferences(self):
databases = {}
db = DB(self._storage, databases=databases, database_name='')
......@@ -327,6 +332,9 @@ class PackableStorage(PackableStorageBase):
db.pack(time.time()+1)
# some valid storages always return 0 for len()
self.assertTrue(len(self._storage) in (0, 1))
conn.close()
otherdb.close()
db.close()
def checkPackAllRevisions(self):
self._initroot()
......@@ -718,7 +726,7 @@ class ClientThread(TestThread):
def __init__(self, db, choices, loop_trip, timer, thread_id):
TestThread.__init__(self)
self.root = db.open().root()
self.db = db
self.choices = choices
self.loop_trip = loop_trip
self.millis = timer.elapsed_millis
......@@ -737,6 +745,7 @@ class ClientThread(TestThread):
def runtest(self):
from random import choice
conn = self.db.open()
for j in range(self.loop_trip):
assign_worked = False
......@@ -745,7 +754,7 @@ class ClientThread(TestThread):
try:
index = choice(self.choices)
alist.extend([self.millis(), index])
self.root[index].value = MinPO(j)
conn.root()[index].value = MinPO(j)
assign_worked = True
transaction.commit()
alist.append(self.millis())
......@@ -756,6 +765,8 @@ class ClientThread(TestThread):
transaction.abort()
alist.append(assign_worked)
conn.close()
class ElapsedTimer:
def __init__(self, start_time):
self.start_time = start_time
......@@ -776,5 +787,5 @@ def IExternalGC_suite(factory):
return doctest.DocFileSuite(
'IExternalGC.test',
setUp=setup, tearDown=zope.testing.setupstack.tearDown,
setUp=setup, tearDown=ZODB.tests.util.tearDown,
checker=ZODB.tests.util.checker)
......@@ -23,7 +23,7 @@ import sys
import time
import transaction
from ZODB.utils import u64
from ZODB.utils import u64, z64
from ZODB.tests.MinPO import MinPO
from ZODB._compat import PersistentPickler, Unpickler, BytesIO, _protocol
import ZODB.tests.util
......@@ -153,8 +153,8 @@ class StorageTestBase(ZODB.tests.util.TestCase):
self._storage.close()
def tearDown(self):
self._close()
ZODB.tests.util.TestCase.tearDown(self)
self._close()
def _dostore(self, oid=None, revid=None, data=None,
already_pickled=0, user=None, description=None):
......
......@@ -74,7 +74,7 @@ You can't put blobs into a database that has uses a Non-Blob-Storage, though:
>>> transaction2.commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
Unsupported: Storing Blobs in <ZODB.MappingStorage.MappingStorage object at ...> is not supported.
Unsupported: Storing Blobs in ...
>>> transaction2.abort()
>>> connection2.close()
......
......@@ -130,4 +130,4 @@ revision as well as the entire directory:
Clean up our blob directory and database:
>>> blob_storage.close()
>>> database.close()
......@@ -49,3 +49,5 @@ writing and expect the file to be in the blob temporary directory::
True
>>> w.close()
>>> database.close()
......@@ -160,3 +160,5 @@ knowledge that the underlying storage's pack method is also called:
>>> blob_storage._blobs_pack_is_in_progress
False
>>> base_storage.pack = base_pack
>>> database.close()
......@@ -61,7 +61,7 @@ While it's boring, it's important to verify that the same relationships
hold if the default pool size is overridden.
>>> handler.clear()
>>> st.close()
>>> db.close()
>>> st = Storage()
>>> PS = 2 # smaller pool size
>>> db = DB(st, pool_size=PS)
......@@ -117,7 +117,7 @@ We can change the pool size on the fly:
Enough of that.
>>> handler.clear()
>>> st.close()
>>> db.close()
More interesting is the stack-like nature of connection reuse. So long as
we keep opening new connections, and keep them alive, all connections
......@@ -256,7 +256,7 @@ Nothing in that last block should have logged any msgs:
If "too many" connections are open, then closing one may kick an older
closed one out of the available connection stack.
>>> st.close()
>>> db.close()
>>> st = Storage()
>>> db = DB(st, pool_size=3)
>>> conns = [db.open() for dummy in range(6)]
......@@ -324,7 +324,7 @@ gc to reclaim the Connection and its cache eventually works, but that can
take "a long time" and caches can hold on to many objects, and limited
resources (like RDB connections), for the duration.
>>> st.close()
>>> db.close()
>>> st = Storage()
>>> db = DB(st, pool_size=2)
>>> conn0 = db.open()
......
......@@ -25,26 +25,43 @@ Make a change locally:
>>> rt = cn.root()
>>> rt['a'] = 1
Sync should not have been called yet.
Sync isn't called when a connectiin is opened, even though that
implicitly starts a new transaction:
>>> st.sync_called # False before 3.4
>>> st.sync_called
False
Sync is only called when we explicitly start a new transaction:
>>> _ = transaction.begin()
>>> st.sync_called
True
>>> st.sync_called = False
BTW, calling ``sync()`` on a connectin starts a new transaction, which
caused ``sync()`` to be called on the storage:
``sync()`` is called by the Connection's ``afterCompletion()`` hook after the
commit completes.
>>> cn.sync()
>>> st.sync_called
True
>>> st.sync_called = False
``sync()`` is not called by the Connection's ``afterCompletion()``
hook after the commit completes, because we'll sunc when a new
transaction begins:
>>> transaction.commit()
>>> st.sync_called # False before 3.4
True
False
``sync()`` is also called by the ``afterCompletion()`` hook after an abort.
``sync()`` is also not called by the ``afterCompletion()`` hook after an abort.
>>> st.sync_called = False
>>> rt['b'] = 2
>>> transaction.abort()
>>> st.sync_called # False before 3.4
True
False
And ``sync()`` is called whenever we explicitly start a new transaction, via
the ``newTransaction()`` hook.
......@@ -62,48 +79,14 @@ traceback then ;-)
>>> cn.close()
One more, very obscure. It was the case that if the first action a new
threaded transaction manager saw was a ``begin()`` call, then synchronizers
registered after that in the same transaction weren't communicated to the
`Transaction` object, and so the synchronizers' ``afterCompletion()`` hooks
weren't called when the transaction commited. None of the test suites
(ZODB's, Zope 2.8's, or Zope3's) caught that, but apparently Zope 3 takes this
path at some point when serving pages.
As a special case, if a synchronizer registers while a transaction is
in flight, then newTransaction and this the storage sync method is
called:
UPDATE: transaction 1.6.1 introduced a change that causes
newTransaction to be called when a synchronizer registeres with a
transaction manager with an active transaction.
>>> tm = transaction.ThreadTransactionManager()
>>> st.sync_called = False
>>> dummy = tm.begin() # we're doing this _before_ opening a connection
>>> cn = db.open(transaction_manager=tm)
>>> rt = cn.root() # make a change
>>> rt['c'] = 3
>>> st.sync_called
True
>>> st.sync_called = False
Now ensure that ``cn.afterCompletion() -> st.sync()`` gets called by commit
despite that the `Connection` registered after the transaction began:
>>> tm.commit()
>>> st.sync_called
True
And try the same thing with a non-threaded transaction manager:
>>> cn.close()
>>> tm = transaction.TransactionManager()
>>> st.sync_called = False
>>> dummy = tm.begin() # we're doing this _before_ opening a connection
>>> _ = tm.begin() # we're doing this _before_ opening a connection
>>> cn = db.open(transaction_manager=tm)
>>> rt = cn.root() # make a change
>>> rt['d'] = 4
>>> st.sync_called
True
>>> st.sync_called = False
>>> tm.commit()
>>> st.sync_called
True
......
......@@ -22,11 +22,13 @@ import unittest
import transaction
import ZODB.tests.util
from ZODB.config import databaseFromString
from ZODB.utils import p64
from ZODB.utils import p64, u64, z64
from persistent import Persistent
from zope.interface.verify import verifyObject
from zope.testing import loggingsupport, renormalizing
from .. import mvccadapter
checker = renormalizing.RENormalizing([
# Python 3 bytes add a "b".
(re.compile("b('.*?')"), r"\1"),
......@@ -154,7 +156,8 @@ class ConnectionDotAdd(ZODB.tests.util.TestCase):
self.datamgr.add(obj)
self.datamgr.tpc_begin(self.transaction)
self.datamgr.tpc_finish(self.transaction)
self.assertTrue(obj._p_oid not in self.datamgr._storage._stored)
self.assertTrue(obj._p_oid not in
self.datamgr._storage._storage._stored)
def test__resetCacheResetsReader(self):
# https://bugs.launchpad.net/zodb/+bug/142667
......@@ -435,8 +438,11 @@ class UserMethodTests(unittest.TestCase):
...
ConnectionStateError: The database connection is closed
>>> db.close()
An expedient way to create a read-only storage:
>>> db = databaseFromString("<zodb>\n<mappingstorage/>\n</zodb>")
>>> db.storage.isReadOnly = lambda: True
>>> cn = db.open()
>>> cn.isReadOnly()
......@@ -510,7 +516,9 @@ class InvalidationTests(unittest.TestCase):
they have the expected effect.
>>> db = databaseFromString("<zodb>\n<mappingstorage/>\n</zodb>")
>>> mvcc_storage = db._mvcc_storage
>>> cn = db.open()
>>> mvcc_instance = cn._storage
>>> p1 = Persistent()
>>> p2 = Persistent()
>>> p3 = Persistent()
......@@ -521,22 +529,19 @@ class InvalidationTests(unittest.TestCase):
Transaction ids are 8-byte strings, just like oids; p64() will
create one from an int.
>>> cn.invalidate(p64(1), {p1._p_oid: 1})
>>> cn._txn_time
'\x00\x00\x00\x00\x00\x00\x00\x01'
>>> p1._p_oid in cn._invalidated
True
>>> p2._p_oid in cn._invalidated
False
>>> mvcc_storage.invalidate(p64(1), {p1._p_oid: 1})
>>> cn.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1})
>>> cn._txn_time
'\x00\x00\x00\x00\x00\x00\x00\x01'
>>> p1._p_oid in cn._invalidated
Transaction start times are based on storage's last
transaction. (Previousely, they were based on the first
invalidation seen in a transaction.)
>>> mvcc_instance.poll_invalidations() == [p1._p_oid]
True
>>> p2._p_oid in cn._invalidated
>>> mvcc_instance._start == p64(u64(db.storage.lastTransaction()) + 1)
True
>>> mvcc_storage.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1})
Calling invalidate() doesn't affect the object state until
a transaction boundary.
......@@ -552,23 +557,24 @@ class InvalidationTests(unittest.TestCase):
>>> cn.sync()
>>> p1._p_state
-1
0
>>> p2._p_state
-1
>>> p3._p_state
0
>>> cn._invalidated
set([])
>>> db.close()
"""
def doctest_invalidateCache():
"""The invalidateCache method invalidates a connection's cache. It also
prevents reads until the end of a transaction::
"""The invalidateCache method invalidates a connection's cache.
It also prevents reads until the end of a transaction::
>>> from ZODB.tests.util import DB
>>> import transaction
>>> db = DB()
>>> mvcc_storage = db._mvcc_storage
>>> tm = transaction.TransactionManager()
>>> connection = db.open(transaction_manager=tm)
>>> connection.root()['a'] = StubObject()
......@@ -584,53 +590,33 @@ def doctest_invalidateCache():
So we have a connection and an active transaction with some modifications.
Lets call invalidateCache:
>>> connection.invalidateCache()
Now, if we try to load an object, we'll get a read conflict:
>>> connection.root()['b'].x
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
If we try to commit the transaction, we'll get a conflict error:
>>> mvcc_storage.invalidateCache()
>>> tm.commit()
Traceback (most recent call last):
...
ConflictError: database conflict error
This won't have any effect until the next transaction:
and the cache will have been cleared:
>>> connection.root()['a']._p_changed
0
>>> connection.root()['b']._p_changed
>>> connection.root()['c']._p_changed
1
>>> print(connection.root()['a']._p_changed)
None
>>> print(connection.root()['b']._p_changed)
None
>>> print(connection.root()['c']._p_changed)
None
But if we sync():
But we'll be able to access data again:
>>> connection.sync()
>>> connection.root()['b'].x
1
All of our data was invalidated:
Aborting a transaction after a read conflict also lets us read data and go
on about our business:
>>> connection.root()['a']._p_changed
>>> connection.root()['b']._p_changed
>>> connection.root()['c']._p_changed
>>> connection.invalidateCache()
But we can load data as usual:
>>> connection.root()['c'].x
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
Now, if we try to load an object, we'll get a read conflict:
>>> tm.abort()
>>> connection.root()['c'].x
>>> connection.root()['b'].x
1
>>> connection.root()['c'].x = 2
>>> tm.commit()
>>> db.close()
"""
......@@ -1289,6 +1275,9 @@ class StubStorage:
raise TypeError('StubStorage does not support versions.')
return self._data[oid]
def loadBefore(self, oid, tid):
return self._data[oid] + (None, )
def store(self, oid, serial, p, version, transaction):
if version != '':
raise TypeError('StubStorage does not support versions.')
......@@ -1304,6 +1293,9 @@ class StubStorage:
# storage
return None
def lastTransaction(self):
return z64
class TestConnectionInterface(unittest.TestCase):
......@@ -1318,6 +1310,7 @@ class StubDatabase:
def __init__(self):
self.storage = StubStorage()
self._mvcc_storage = mvccadapter.MVCCAdapter(self.storage)
self.new_oid = self.storage.new_oid
classFactory = None
......
......@@ -83,34 +83,40 @@ def test_invalidateCache():
>>> from ZODB.tests.util import DB
>>> import transaction
>>> db = DB()
>>> mvcc_storage = db._mvcc_storage
>>> tm1 = transaction.TransactionManager()
>>> c1 = db.open(transaction_manager=tm1)
>>> c1.root()['a'] = MinPO(1)
>>> tm1.commit()
>>> tm2 = transaction.TransactionManager()
>>> c2 = db.open(transaction_manager=tm2)
>>> c1.root()['a']._p_deactivate()
>>> c2.root()['a'].value
1
>>> tm3 = transaction.TransactionManager()
>>> c3 = db.open(transaction_manager=tm3)
>>> c3.root()['a'].value
1
>>> c3.close()
>>> db.invalidateCache()
>>> c1.root()['a'].value
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> c2.root()['a'].value
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> mvcc_storage.invalidateCache()
>>> c1.root.a._p_changed
0
>>> c1.sync()
>>> c1.root.a._p_changed
>>> c2.root.a._p_changed
0
>>> c2.sync()
>>> c2.root.a._p_changed
>>> c3 is db.open(transaction_manager=tm3)
True
>>> print(c3.root()['a']._p_changed)
None
>>> c3.root.a._p_changed
>>> c1.root()['a'].value
1
>>> c2.root()['a'].value
1
>>> c3.root()['a'].value
1
>>> db.close()
"""
......@@ -125,7 +131,7 @@ def connectionDebugInfo():
... now += .1
... return now
>>> real_time = time.time
>>> if isinstance(time,type):
>>> if isinstance(time, type):
... time.time = staticmethod(faux_time) # Jython
... else:
... time.time = faux_time
......@@ -151,7 +157,7 @@ def connectionDebugInfo():
>>> before
[None, '\x03zY\xd8\xc0m9\xdd', None]
>>> opened
['2008-12-04T20:40:44Z (1.40s)', '2008-12-04T20:40:45Z (0.30s)', None]
['2008-12-04T20:40:44Z (1.30s)', '2008-12-04T20:40:46Z (0.10s)', None]
>>> infos
['test info (2)', ' (0)', ' (0)']
......
......@@ -76,12 +76,12 @@ class DemoStorageTests(
db = DB(self._storage) # creates object 0. :)
self.assertEqual(len(self._storage), 1)
self.assertTrue(self._storage)
conn = db.open()
for i in range(10):
conn.root()[i] = conn.root().__class__()
transaction.commit()
with db.transaction() as conn:
for i in range(10):
conn.root()[i] = conn.root().__class__()
self.assertEqual(len(self._storage), 11)
self.assertTrue(self._storage)
db.close()
def checkLoadBeforeUndo(self):
pass # we don't support undo yet
......
......@@ -36,6 +36,7 @@ from ZODB.tests import ReadOnlyStorage, RecoveryStorage
from ZODB.tests.StorageTestBase import MinPO, zodb_pickle
from ZODB._compat import dump, dumps, _protocol
from . import util
class FileStorageTests(
StorageTestBase.StorageTestBase,
......@@ -706,7 +707,7 @@ def test_suite():
suite.addTest(unittest.makeSuite(klass, "check"))
suite.addTest(doctest.DocTestSuite(
setUp=zope.testing.setupstack.setUpDirectory,
tearDown=zope.testing.setupstack.tearDown,
tearDown=util.tearDown,
checker=ZODB.tests.util.checker))
suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
'BlobFileStorage',
......
......@@ -33,6 +33,7 @@ from ZODB.tests import (
)
class MVCCTests:
def checkClosingNestedDatabasesWorks(self):
# This tests for the error described in
# https://github.com/zopefoundation/ZODB/issues/45
......@@ -42,7 +43,6 @@ class MVCCTests:
db1.close()
db2.close()
def checkCrossConnectionInvalidation(self):
# Verify connections see updated state at txn boundaries.
# This will fail if the Connection doesn't poll for changes.
......
......@@ -38,6 +38,8 @@ __test__ = dict(
>>> list(conn2.root()[0].keys())
[]
>>> db2.close()
>>> db1.close()
""",
)
......
......@@ -64,7 +64,7 @@ Now we see two transactions and two changed objects.
Clean up.
>>> st.close()
>>> db.close()
"""
import re
......@@ -87,6 +87,6 @@ checker = renormalizing.RENormalizing([
def test_suite():
return doctest.DocTestSuite(
setUp=zope.testing.setupstack.setUpDirectory,
tearDown=zope.testing.setupstack.tearDown,
tearDown=ZODB.tests.util.tearDown,
optionflags=doctest.REPORT_NDIFF,
checker=ZODB.tests.util.checker + checker)
......@@ -99,10 +99,13 @@ class MinimalMemoryStorage(BaseStorage, object):
del self._txn
def _finish(self, tid, u, d, e):
with self._lock:
self._lock_acquire()
try:
self._index.update(self._txn.index)
self._cur.update(self._txn.cur())
self._ltid = self._tid
finally:
self._lock_release()
def loadBefore(self, the_oid, the_tid):
# It's okay if loadBefore() is really expensive, because this
......@@ -121,6 +124,9 @@ class MinimalMemoryStorage(BaseStorage, object):
end_tid = None
else:
end_tid = tids[j]
self.hook(the_oid, self._cur[the_oid], '')
return self._index[(the_oid, tid)], tid, end_tid
def loadSerial(self, oid, serial):
......
......@@ -54,6 +54,8 @@ except NameError:
import io
file_type = io.BufferedReader
from . import util
def new_time():
"""Create a _new_ time stamp.
......@@ -334,6 +336,7 @@ class RecoveryBlobStorage(BlobTestBase,
transaction.commit()
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
db.close()
def gc_blob_removes_uncommitted_data():
......@@ -446,7 +449,6 @@ def packing_with_uncommitted_data_non_undoing():
Clean up:
>>> database.close()
"""
def packing_with_uncommitted_data_undoing():
......@@ -609,7 +611,7 @@ def do_not_depend_on_cwd():
>>> with conn.root()['blob'].open() as fp: fp.read()
'data'
>>> bs.close()
>>> db.close()
"""
def savepoint_isolation():
......@@ -700,9 +702,11 @@ def savepoint_cleanup():
>>> db.close()
"""
def lp440234_Setting__p_changed_of_a_Blob_w_no_uncomitted_changes_is_noop():
r"""
>>> conn = ZODB.connection('data.fs', blob_dir='blobs')
>>> db = ZODB.DB('data.fs', blob_dir='blobs')
>>> conn = db.open()
>>> blob = ZODB.blob.Blob(b'blah')
>>> conn.add(blob)
>>> transaction.commit()
......@@ -714,7 +718,7 @@ def lp440234_Setting__p_changed_of_a_Blob_w_no_uncomitted_changes_is_noop():
>>> old_serial == blob._p_serial
True
>>> conn.close()
>>> db.close()
"""
def setUp(test):
......@@ -757,7 +761,7 @@ def storage_reusable_suite(prefix, factory,
"blob_connection.txt",
"blob_importexport.txt",
"blob_transaction.txt",
setUp=setup, tearDown=zope.testing.setupstack.tearDown,
setUp=setup, tearDown=util.tearDown,
checker=zope.testing.renormalizing.RENormalizing([
# Py3k renders bytes where Python2 used native strings...
(re.compile(r"^b'"), "'"),
......@@ -780,15 +784,16 @@ def storage_reusable_suite(prefix, factory,
if test_packing:
suite.addTest(doctest.DocFileSuite(
"blob_packing.txt",
setUp=setup, tearDown=zope.testing.setupstack.tearDown,
setUp=setup, tearDown=util.tearDown,
))
suite.addTest(doctest.DocTestSuite(
setUp=setup, tearDown=zope.testing.setupstack.tearDown,
checker = ZODB.tests.util.checker + \
setUp=setup, tearDown=util.tearDown,
checker = (
ZODB.tests.util.checker +
zope.testing.renormalizing.RENormalizing([
(re.compile(r'\%(sep)s\%(sep)s' % dict(sep=os.path.sep)), '/'),
(re.compile(r'\%(sep)s' % dict(sep=os.path.sep)), '/'),
]),
(re.compile(r'\%(sep)s\%(sep)s' % dict(sep=os.path.sep)), '/'),
(re.compile(r'\%(sep)s' % dict(sep=os.path.sep)), '/'),
])),
))
def create_storage(self, name='data', blob_dir=None):
......@@ -823,7 +828,7 @@ def test_suite():
"blob_tempdir.txt",
"blobstorage_packing.txt",
setUp=setUp,
tearDown=zope.testing.setupstack.tearDown,
tearDown=util.tearDown,
optionflags=doctest.ELLIPSIS,
checker=ZODB.tests.util.checker,
))
......@@ -831,7 +836,7 @@ def test_suite():
"blob_layout.txt",
optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE,
setUp=setUp,
tearDown=zope.testing.setupstack.tearDown,
tearDown=util.tearDown,
checker=ZODB.tests.util.checker +
zope.testing.renormalizing.RENormalizing([
(re.compile(r'\%(sep)s\%(sep)s' % dict(sep=os.path.sep)), '/'),
......
......@@ -33,9 +33,17 @@ This note includes doctests that explain how MVCC is implemented (and
test that the implementation is correct). The tests use a
MinimalMemoryStorage that implements MVCC support, but not much else.
***IMPORTANT***: The MVCC approach has changed since these tests were
originally written. The new approach is much simpler because we no
longer call load to get the current state of an object. We call
loadBefore instead, having gotten a transaction time at the start of a
transaction. As a result, the rhythm of the tests is a little odd,
because we no longer need to probe a complex dance that doesn't exist any more.
>>> from ZODB.tests.test_storage import MinimalMemoryStorage
>>> from ZODB import DB
>>> db = DB(MinimalMemoryStorage())
>>> st = MinimalMemoryStorage()
>>> db = DB(st)
We will use two different connections with different transaction managers
to make sure that the connections act independently, even though they'll
......@@ -59,6 +67,10 @@ Now open a second connection.
>>> tm2 = transaction.TransactionManager()
>>> cn2 = db.open(transaction_manager=tm2)
>>> from ZODB.utils import p64, u64
>>> cn2._storage._start == p64(u64(st.lastTransaction()) + 1)
True
>>> txn_time2 = cn2._storage._start
Connection high-water mark
--------------------------
......@@ -67,22 +79,20 @@ The ZODB Connection tracks a transaction high-water mark, which
bounds the latest transaction id that can be read by the current
transaction and still present a consistent view of the database.
Transactions with ids up to but not including the high-water mark
are OK to read. When a transaction commits, the database sends
invalidations to all the other connections; the invalidation contains
the transaction id and the oids of modified objects. The Connection
stores the high-water mark in _txn_time, which is set to None until
an invalidation arrives.
are OK to read. At the beginning of a transaction, a connection
sets the high-water mark to just over the last transaction time the
storage has seen.
>>> cn = db.open()
>>> print(cn._txn_time)
None
>>> cn.invalidate(100, dict.fromkeys([1, 2]))
>>> cn._txn_time
100
>>> cn.invalidate(200, dict.fromkeys([1, 2]))
>>> cn._txn_time
100
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True
>>> cn.db()._mvcc_storage.invalidate(100, dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True
>>> cn.db()._mvcc_storage.invalidate(200, dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True
A connection's high-water mark is set to the transaction id taken from
the first invalidation processed by the connection. Transaction ids are
......@@ -95,8 +105,8 @@ but that doesn't work unless an object is modified. sync() will abort
a transaction and process invalidations.
>>> cn.sync()
>>> print(cn._txn_time) # the high-water mark got reset to None
None
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True
Basic functionality
-------------------
......@@ -109,16 +119,16 @@ will modify "a." The other transaction will then modify "b" and commit.
>>> tm1.get().commit()
>>> txn = db.lastTransaction()
The second connection has its high-water mark set now.
The second connection already has its high-water mark set.
>>> cn2._txn_time == txn
>>> cn2._storage._start == txn_time2
True
It is safe to read "b," because it was not modified by the concurrent
transaction.
>>> r2 = cn2.root()
>>> r2["b"]._p_serial < cn2._txn_time
>>> r2["b"]._p_serial < cn2._storage._start
True
>>> r2["b"].value
1
......@@ -130,7 +140,7 @@ non-current version.
>>> r2["a"].value
1
>>> r2["a"]._p_serial < cn2._txn_time
>>> r2["a"]._p_serial < cn2._storage._start
True
We can confirm that we have a non-current revision by asking the
......@@ -143,32 +153,33 @@ It's possible to modify "a", but we get a conflict error when we
commit the transaction.
>>> r2["a"].value = 3
>>> tm2.get().commit()
>>> tm2.get().commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
ConflictError: database conflict error (oid 0x01, class ZODB.tests.MinPO.MinPO)
ConflictError: database conflict error (oid 0x01, class ZODB.tests.MinPO...
>>> tm2.get().abort()
This example will demonstrate that we can commit a transaction if we only
modify current revisions.
>>> print(cn2._txn_time)
None
>>> cn2._storage._start == p64(u64(st.lastTransaction()) + 1)
True
>>> txn_time2 = cn2._storage._start
>>> r1 = cn1.root()
>>> r1["a"].value = 3
>>> tm1.get().commit()
>>> txn = db.lastTransaction()
>>> cn2._txn_time == txn
>>> cn2._storage._start == txn_time2
True
>>> r2["b"].value = r2["a"].value + 1
>>> r2["b"].value
3
>>> tm2.get().commit()
>>> print(cn2._txn_time)
None
>>> cn2._storage._start == p64(u64(st.lastTransaction()) + 1)
True
Object cache
------------
......@@ -302,22 +313,18 @@ same things now.
>>> r2["a"].value, r2["b"].value
(42, 43)
>>> db.close()
Late invalidation
-----------------
The combination of ZEO and MVCC adds more complexity. Since
invalidations are delivered asynchronously by ZEO, it is possible for
an invalidation to arrive just after a request to load the invalidated
object is sent. The connection can't use the just-loaded data,
because the invalidation arrived first. The complexity for MVCC is
that it must check for invalidated objects after it has loaded them,
just in case.
The combination of ZEO and MVCC used to add more complexity. That's
why ZODB no-longer calls load. :)
Rather than add all the complexity of ZEO to these tests, the
MinimalMemoryStorage has a hook. We'll write a subclass that will
deliver an invalidation when it loads an object. The hook allows us
to test the Connection code.
deliver an invalidation when it loads (or loadBefore's) an object.
The hook allows us to test the Connection code.
>>> class TestStorage(MinimalMemoryStorage):
... def __init__(self):
......@@ -351,22 +358,30 @@ non-current revision to load.
>>> oid = r1["b"]._p_oid
>>> ts.hooked[oid] = 1
This test is kinda screwy because it depends on an old approach that
has changed. We'll hack the _txn_time to get the original expected
result, even though what's going on now is much simpler.
>>> cn1._storage._start = ts.lastTransaction()
Once the oid is hooked, an invalidation will be delivered the next
time it is activated. The code below activates the object, then
confirms that the hook worked and that the old state was retrieved.
>>> oid in cn1._invalidated
>>> oid in cn1._storage._invalidations
False
>>> r1["b"]._p_state
-1
>>> r1["b"]._p_activate()
>>> oid in cn1._invalidated
>>> oid in cn1._storage._invalidations
True
>>> ts.count
1
>>> r1["b"].value
0
>>> db.close()
No earlier revision available
-----------------------------
......@@ -391,18 +406,17 @@ Again, once the oid is hooked, an invalidation will be delivered the next
time it is activated. The code below activates the object, but unlike the
section above, this is no older state to retrieve.
>>> oid in cn1._invalidated
>>> oid in cn1._storage._invalidations
False
>>> r1["b"]._p_state
-1
>>> r1["b"]._p_activate()
>>> cn1._storage._start = ts.lastTransaction()
>>> r1["b"]._p_activate() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
ReadConflictError: database read conflict error (oid 0x02, class ZODB.tests.MinPO.MinPO)
>>> oid in cn1._invalidated
True
>>> ts.count
1
ReadConflictError: ...
>>> db.close()
"""
import doctest
import re
......
......@@ -61,6 +61,7 @@ checker = renormalizing.RENormalizing([
])
def setUp(test, name='test'):
clear_transaction_syncs()
transaction.abort()
d = tempfile.mkdtemp(prefix=name)
zope.testing.setupstack.register(test, zope.testing.setupstack.rmtree, d)
......@@ -71,7 +72,9 @@ def setUp(test, name='test'):
os.chdir(d)
zope.testing.setupstack.register(test, transaction.abort)
tearDown = zope.testing.setupstack.tearDown
def tearDown(test):
clear_transaction_syncs()
zope.testing.setupstack.tearDown(test)
class TestCase(unittest.TestCase):
......@@ -186,3 +189,18 @@ def mess_with_time(test=None, globs=None, now=1278864701.5):
time.time = staticmethod(faux_time) # jython
else:
time.time = faux_time
def clear_transaction_syncs():
"""Clear data managers registered with the global transaction manager
Many tests don't clean up synchronizer's registered with the
global transaction managers, which can wreak havoc with following
tests, now that connections interact with their storages at
transaction boundaries. We need to make sure that we clear any
registered data managers.
For now, we'll use the transaction manager's
underware. Eventually, an transaction managers need to grow an API
for this.
"""
transaction.manager.clearSynchs()
......@@ -11,10 +11,12 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from __future__ import print_function
import os
import struct
import sys
import time
import threading
import warnings
from binascii import hexlify, unhexlify
from struct import pack, unpack
......@@ -32,6 +34,7 @@ __all__ = ['z64',
'u64',
'U64',
'cp',
'maxtid',
'newTid',
'oid_repr',
'serial_repr',
......@@ -100,6 +103,8 @@ else:
z64 = b'\0' * 8
maxtid = b'\x7f\xff\xff\xff\xff\xff\xff\xff'
assert sys.hexversion >= 0x02030000
# The distinction between ints and longs is blurred in Python 2.2,
......@@ -308,3 +313,65 @@ class locked(object):
def __call__(self, func):
return Locked(func, preconditions=self.preconditions)
if os.environ.get('DEBUG_LOCKING'):
class Lock:
lock_class = threading.Lock
def __init__(self):
self._lock = self.lock_class()
def pr(self, name, a=None, kw=None):
f = sys._getframe(2)
if f.f_code.co_filename.endswith('ZODB/utils.py'):
f = sys._getframe(3)
f = '%s:%s' % (f.f_code.co_filename, f.f_lineno)
print(id(self), self._lock, threading.get_ident(), f, name,
a if a else '', kw if kw else '')
def acquire(self, *a, **kw):
self.pr('acquire', a, kw)
return self._lock.acquire(*a, **kw)
def release(self):
self.pr('release')
return self._lock.release()
def __enter__(self):
self.pr('acquire')
return self._lock.acquire()
def __exit__(self, *ignored):
self.pr('release')
return self._lock.release()
class RLock(Lock):
lock_class = threading.RLock
class Condition(Lock):
lock_class = threading.Condition
def wait(self, *a, **kw):
self.pr('wait', a, kw)
return self._lock.wait(*a, **kw)
def wait_for(self, *a, **kw):
self.pr('wait_for', a, kw)
return self._lock.wait_for(*a, **kw)
def notify(self, *a, **kw):
self.pr('notify', a, kw)
return self._lock.notify(*a, **kw)
def notify_all(self):
self.pr('notify_all')
return self._lock.notify_all()
notifyAll = notify_all
else:
from threading import Condition, Lock, RLock
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment