Commit 29091374 authored by Jim Fulton's avatar Jim Fulton

Abstract ZODB's MVCC implementation into a storage adapter.

That's applied to storages other than RelStorage.

See: https://groups.google.com/forum/#!topic/zodb/QJYmvF0eUUM
parent 9bcd944e
......@@ -20,7 +20,7 @@ to application logic. ZODB includes features such as a plugable storage
interface, rich transaction support, and undo.
"""
version = "4.3.0.dev0"
version = "5.0.dev0"
import os
from setuptools import setup, find_packages
......
......@@ -27,7 +27,6 @@ from persistent import PickleCache
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from ZODB.interfaces import IBlobStorage
from ZODB.interfaces import IMVCCStorage
from ZODB.blob import Blob, rename_or_copy_blob, remove_committed_dir
from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint
......@@ -102,13 +101,11 @@ class Connection(ExportImport, object):
# Multi-database support
self.connections = {self._db.database_name: self}
storage = db.storage
if IMVCCStorage.providedBy(storage):
# Use a connection-specific storage instance.
self._mvcc_storage = True
storage = storage.new_instance()
storage = db._mvcc_storage
if before:
storage = storage.before_instance(before)
else:
self._mvcc_storage = False
storage = storage.new_instance()
self._normal_storage = self._storage = storage
self.new_oid = db.new_oid
......@@ -138,17 +135,17 @@ class Connection(ExportImport, object):
# ReadConflictError (just to be able to clean them up from the
# cache on abort with the other modified objects). All objects
# of this list are either in _cache or in _added.
self._registered_objects = []
self._registered_objects = [] # [object]
# ids and serials of objects for which readCurrent was called
# in a transaction.
self._readCurrent = {}
self._readCurrent = {} # {oid ->serial}
# Dict of oid->obj added explicitly through add(). Used as a
# preliminary cache until commit time when objects are all moved
# to the real _cache. The objects are moved to _creating at
# commit time.
self._added = {}
self._added = {} # {oid -> object}
# During commit this is turned into a list, which receives
# objects added as a side-effect of storing a modified object.
......@@ -162,32 +159,11 @@ class Connection(ExportImport, object):
# adding. Used during abort to remove created objects from the
# _cache, and by persistent_id to check that a new object isn't
# reachable from multiple databases.
self._creating = {}
self._creating = {} # {oid -> implicitly_added_flag}
# List of oids of modified objects, which have to be invalidated
# in the cache on abort and in other connections on finish.
self._modified = []
# _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while
# another is processing invalidations. All the invalidations
# from a single transaction should be applied atomically, so
# the lock must be held when reading _invalidated.
# It sucks that we have to hold the lock to read _invalidated.
# Normally, _invalidated is written by calling dict.update, which
# will execute atomically by virtue of the GIL. But some storage
# might generate oids where hash or compare invokes Python code. In
# that case, the GIL can't save us.
# Note: since that was written, it was officially declared that the
# type of an oid is str. TODO: remove the related now-unnecessary
# critical sections (if any -- this needs careful thought).
self._inv_lock = utils.Lock()
self._invalidated = set()
# Flag indicating whether the cache has been invalidated:
self._invalidatedCache = False
self._modified = [] # [oid]
# We intend to prevent committing a transaction in which
# ReadConflictError occurs. _conflicts is the set of oids that
......@@ -198,12 +174,6 @@ class Connection(ExportImport, object):
# _conflicts).
self._conflicts = {}
# _txn_time stores the upper bound on transactions visible to
# this connection. That is, all object revisions must be
# written before _txn_time. If it is None, then the current
# revisions are acceptable.
self._txn_time = None
# To support importFile(), implemented in the ExportImport base
# class, we need to run _importDuringCommit() from our commit()
# method. If _import is not None, it is a two-tuple of arguments
......@@ -252,17 +222,7 @@ class Connection(ExportImport, object):
if obj is not None:
return obj
before = self.before
if before is None:
# Normal case
before = self._txn_time
data = self._storage.loadBefore(oid, before)
if data is None:
# see the comment in setstate
raise ReadConflictError()
p, _, _ = data
p, _ = self._storage.load(oid, '')
obj = self._reader.getGhost(p)
# Avoid infiniate loop if obj tries to load its state before
......@@ -320,9 +280,6 @@ class Connection(ExportImport, object):
if self.opened:
self.transaction_manager.unregisterSynch(self)
if self._mvcc_storage:
self._storage.sync(force=False)
if primary:
for connection in self.connections.values():
if connection is not self:
......@@ -353,22 +310,6 @@ class Connection(ExportImport, object):
raise ConnectionStateError("The database connection is closed")
return self.before is not None or self._storage.isReadOnly()
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
if self.before is not None:
# This is a historical connection. Invalidations are irrelevant.
return
with self._inv_lock:
self._invalidated.update(oids)
def invalidateCache(self):
self._inv_lock.acquire()
try:
self._invalidatedCache = True
finally:
self._inv_lock.release()
@property
def root(self):
"""Return the database root object."""
......@@ -545,9 +486,6 @@ class Connection(ExportImport, object):
self._added_during_commit = []
if self._invalidatedCache:
raise ConflictError()
for obj in self._registered_objects:
oid = obj._p_oid
assert oid
......@@ -559,10 +497,6 @@ class Connection(ExportImport, object):
elif oid in self._added:
assert obj._p_serial == z64
elif obj._p_changed:
if oid in self._invalidated:
resolve = getattr(obj, "_p_resolveConflict", None)
if resolve is None:
raise ConflictError(object=obj)
self._modified.append(oid)
else:
# Nothing to do. It's been said that it's legal, e.g., for
......@@ -600,9 +534,6 @@ class Connection(ExportImport, object):
self._creating[oid] = implicitly_adding
else:
if (oid in self._invalidated
and not hasattr(obj, '_p_resolveConflict')):
raise ConflictError(object=obj)
self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj
......@@ -739,21 +670,11 @@ class Connection(ExportImport, object):
self._handle_serial(oid, serial)
def tpc_finish(self, transaction):
"""Indicate confirmation that the transaction is done."""
def callback(tid):
if self._mvcc_storage:
# Inter-connection invalidation is not needed when the
# storage provides MVCC.
return
d = dict.fromkeys(self._modified)
self._db.invalidate(tid, d, self)
# It's important that the storage calls the passed function
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
self._storage.tpc_finish(transaction, callback)
"""Indicate confirmation that the transaction is done.
"""
# XXX someday, we'll care about the *real* tid we get back via
# the callback (that we're no longer calling).
self._storage.tpc_finish(transaction)
self._tpc_cleanup()
def sortKey(self):
......@@ -773,67 +694,16 @@ class Connection(ExportImport, object):
def newTransaction(self, transaction, sync=True):
self._readCurrent.clear()
if self._mvcc_storage:
# XXX It's not clear if we should make calling sync
# dependent on the sync flag here. RelStorage's sync semantics
# are a bit different, plus RelStorage doesn't actually
# work with this code. :/ This will probably be sorted when
# we do:
# https://groups.google.com/forum/#!topic/zodb/QJYmvF0eUUM
self._storage.sync(True)
# Poll the storage for invalidations.
mvc_invalidated = self._storage.poll_invalidations()
if mvc_invalidated is None:
try:
self._storage.sync(sync)
invalidated = self._storage.poll_invalidations()
if invalidated is None:
# special value: the transaction is so old that
# we need to flush the whole cache.
self._invalidatedCache = True
else:
if sync:
getattr(self._storage, 'sync', noop)()
mvc_invalidated = None
if self.opened:
self._txn_time = p64(u64(self._storage.lastTransaction()) + 1)
with self._inv_lock:
# Non-ghostifiable objects may need to read when they are
# invalidated, so we'll quickly just replace the
# invalidating dict with a new one. We'll then process
# the invalidations after freeing the lock *and* after
# resetting the time. This means that invalidations will
# happen after the start of the transactions. They are
# subject to conflict errors and to reading old data.
# TODO: There is a potential problem lurking for persistent
# classes. Suppose we have an invalidation of a persistent
# class and of an instance. If the instance is
# invalidated first and if the invalidation logic uses
# data read from the class, then the invalidation could
# be performed with stale data. Or, suppose that there
# are instances of the class that are freed as a result of
# invalidating some object. Perhaps code in their __del__
# uses class data. Really, the only way to properly fix
# this is to, in fact, make classes ghostifiable. Then
# we'd have to reimplement attribute lookup to check the
# class state and, if necessary, activate the class. It's
# much worse than that though, because we'd also need to
# deal with slots. When a class is ghostified, we'd need
# to replace all of the slot operations with versions that
# reloaded the object when called. It's hard to say which
# is better or worse. For now, it seems the risk of
# using a class while objects are being invalidated seems
# small enough to be acceptable.
if self._invalidatedCache:
self._invalidatedCache = False
invalidated = self._cache.cache_data.copy()
else:
invalidated = dict.fromkeys(self._invalidated)
self._invalidated = set()
if mvc_invalidated:
self._cache.invalidate(mvc_invalidated)
self._cache.invalidate(invalidated)
except AttributeError:
assert self._storage is None
# Now is a good time to collect some garbage.
self._cache.incrgc()
......@@ -877,24 +747,8 @@ class Connection(ExportImport, object):
raise
try:
p, serial = self._storage.load(oid, '')
before = self.before
if before is None:
# Normal case
if self._invalidatedCache:
raise ReadConflictError()
before = self._txn_time
data = self._storage.loadBefore(oid, before)
if data is None:
# We had data (by definition, since we have a
# reference to it), but it's gone. It must have
# updated since this transaction, and been packed
# away, cuz the tests are mean. The best we can do is
# raise a ReadConflictError and try again
raise ReadConflictError()
p, serial, _ = data
self._load_count += 1
self._reader.setGhostState(obj, p)
......@@ -1033,8 +887,6 @@ class Connection(ExportImport, object):
See the docstring for the resetCaches() function.
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
self._invalidatedCache = False
cache_size = self._cache.cache_size
cache_size_bytes = self._cache.cache_size_bytes
self._cache = cache = PickleCache(self, cache_size, cache_size_bytes)
......@@ -1042,9 +894,7 @@ class Connection(ExportImport, object):
self._reader._cache = cache
def _release_resources(self):
assert not self.opened
for c in six.itervalues(self.connections):
if c._mvcc_storage:
if c._storage is not None:
c._storage.release()
c._storage = c._normal_storage = None
......@@ -1089,7 +939,7 @@ class Connection(ExportImport, object):
def savepoint(self):
if self._savepoint_storage is None:
tmpstore = TmpStore(self._normal_storage, self._txn_time)
tmpstore = TmpStore(self._normal_storage)
self._savepoint_storage = tmpstore
self._storage = self._savepoint_storage
......@@ -1111,7 +961,7 @@ class Connection(ExportImport, object):
self.cacheGC()
return result
def _rollback(self, state):
def _rollback_savepoint(self, state):
self._abort()
self._registered_objects = []
src = self._storage
......@@ -1138,7 +988,7 @@ class Connection(ExportImport, object):
self._creating.update(src.creating)
for oid in oids:
data, serial, _ = src.loadBefore(oid, self._txn_time)
data, serial = src.load(oid)
obj = self._cache.get(oid, None)
if obj is not None:
self._cache.update_object_size_estimation(
......@@ -1153,7 +1003,7 @@ class Connection(ExportImport, object):
# that that the next attribute access of its name
# unghostify it, which will cause its blob data
# to be reattached "cleanly"
self.invalidate(None, (oid, ))
self._cache.invalidate(oid)
else:
s = self._storage.store(oid, serial, data,
'', transaction)
......@@ -1195,26 +1045,29 @@ class Connection(ExportImport, object):
# Savepoint support
#####################################################################
@implementer(IDataManagerSavepoint)
class Savepoint:
def __init__(self, datamanager, state):
self.datamanager = datamanager
self.state = state
def rollback(self):
self.datamanager._rollback(self.state)
self.datamanager._rollback_savepoint(self.state)
@implementer(IBlobStorage)
class TmpStore:
"""A storage-like thing to support savepoints."""
def __init__(self, storage, before):
def __init__(self, storage):
self._storage = storage
self._before = before
for method in 'getName', 'new_oid', 'getSize', 'sortKey', 'isReadOnly':
for method in (
'getName', 'new_oid', 'getSize', 'sortKey',
'isReadOnly'
):
setattr(self, method, getattr(storage, method))
self._file = tempfile.TemporaryFile(prefix='TmpStore')
......@@ -1236,11 +1089,10 @@ class TmpStore:
remove_committed_dir(self._blob_dir)
self._blob_dir = None
def loadBefore(self, oid, before):
assert before == self._before
def load(self, oid, version=''):
pos = self.index.get(oid)
if pos is None:
return self._storage.loadBefore(oid, before)
return self._storage.load(oid, '')
self._file.seek(pos)
h = self._file.read(8)
oidlen = u64(h)
......@@ -1250,7 +1102,7 @@ class TmpStore:
h = self._file.read(16)
size = u64(h[8:])
serial = h[:8]
return self._file.read(size), serial, None
return self._file.read(size), serial
def store(self, oid, serial, data, version, transaction):
# we have this funny signature so we can reuse the normal non-commit
......@@ -1331,6 +1183,7 @@ class TmpStore:
self.index = index.copy()
self.creating = creating
class RootConvenience(object):
def __init__(self, root):
......
......@@ -401,12 +401,6 @@ class DB(object):
- `xrefs` - Boolian flag indicating whether implicit cross-database
references are allowed
"""
if isinstance(storage, six.string_types):
from ZODB import FileStorage
storage = ZODB.FileStorage.FileStorage(storage, **storage_args)
elif storage is None:
from ZODB import MappingStorage
storage = ZODB.MappingStorage.MappingStorage(**storage_args)
# Allocate lock.
x = utils.RLock()
......@@ -423,12 +417,24 @@ class DB(object):
self._historical_cache_size_bytes = historical_cache_size_bytes
# Setup storage
if isinstance(storage, six.string_types):
from ZODB import FileStorage
storage = ZODB.FileStorage.FileStorage(storage, **storage_args)
elif storage is None:
from ZODB import MappingStorage
storage = ZODB.MappingStorage.MappingStorage(**storage_args)
else:
assert not storage_args
self.storage = storage
if IMVCCStorage.providedBy(storage):
self._mvcc_storage = storage
else:
from .mvccadapter import MVCCAdapter
self._mvcc_storage = MVCCAdapter(storage)
self.references = ZODB.serialize.referencesf
try:
storage.registerDB(self)
except TypeError:
storage.registerDB(self, None) # Backward compat
if (not hasattr(storage, 'tpc_vote')) and not storage.isReadOnly():
warnings.warn(
......@@ -438,12 +444,10 @@ class DB(object):
DeprecationWarning, 2)
storage.tpc_vote = lambda *args: None
if IMVCCStorage.providedBy(storage):
temp_storage = storage.new_instance()
else:
temp_storage = storage
temp_storage = self._mvcc_storage.new_instance()
try:
try:
temp_storage.poll_invalidations()
temp_storage.load(z64, '')
except KeyError:
# Create the database's root in the storage if it doesn't exist
......@@ -462,7 +466,6 @@ class DB(object):
temp_storage.tpc_vote(t)
temp_storage.tpc_finish(t)
finally:
if IMVCCStorage.providedBy(temp_storage):
temp_storage.release()
# Multi-database setup.
......@@ -634,16 +637,13 @@ class DB(object):
@self._connectionMap
def _(c):
if c.opened:
c.transaction_manager.abort()
# Note that this will modify our pool, but this is safe, because
# _connectionMap makes a list of the pool to iterate over
c.close()
c.afterCompletion = c.newTransaction = c.close = noop
c._release_resources()
self.storage.close()
self._mvcc_storage.close()
del self.storage
del self._mvcc_storage
def getCacheSize(self):
return self._cache_size
......@@ -675,27 +675,6 @@ class DB(object):
def getHistoricalTimeout(self):
return self.historical_pool.timeout
def invalidate(self, tid, oids, connection=None, version=''):
"""Invalidate references to a given oid.
This is used to indicate that one of the connections has committed a
change to the object. The connection commiting the change should be
passed in to prevent useless (but harmless) messages to the
connection.
"""
# Storages, esp. ZEO tests, need the version argument still. :-/
assert version==''
# Notify connections.
def inval(c):
if c is not connection:
c.invalidate(tid, oids)
self._connectionMap(inval)
def invalidateCache(self):
"""Invalidate each of the connection caches
"""
self._connectionMap(lambda c: c.invalidateCache())
transform_record_data = untransform_record_data = lambda self, data: data
def objectCount(self):
......@@ -1001,9 +980,9 @@ class TransactionalUndo(object):
def __init__(self, db, tids):
self._db = db
self._storage = db.storage
self._storage = getattr(
db._mvcc_storage, 'undo_instance', db._mvcc_storage.new_instance)()
self._tids = tids
self._oids = set()
def abort(self, transaction):
pass
......@@ -1013,19 +992,13 @@ class TransactionalUndo(object):
def commit(self, transaction):
for tid in self._tids:
result = self._storage.undo(tid, transaction)
if result:
self._oids.update(result[1])
self._storage.undo(tid, transaction)
def tpc_vote(self, transaction):
for oid, _ in self._storage.tpc_vote(transaction) or ():
self._oids.add(oid)
self._storage.tpc_vote(transaction)
def tpc_finish(self, transaction):
self._storage.tpc_finish(
transaction,
lambda tid: self._db.invalidate(tid, self._oids)
)
self._storage.tpc_finish(transaction)
def tpc_abort(self, transaction):
self._storage.tpc_abort(transaction)
......
......@@ -273,8 +273,6 @@ test.
>>> conn.root()['first']['count'] += 1
>>> conn.root()['third'] = persistent.mapping.PersistentMapping()
>>> transaction.commit()
>>> len(historical_conn._invalidated)
0
>>> historical_conn.close()
Note that if you try to open an historical connection to a time in the future,
......
......@@ -201,21 +201,6 @@ class IConnection(Interface):
def isReadOnly():
"""Returns True if the storage for this connection is read only."""
def invalidate(tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids.
When the next transaction boundary is reached, objects will be
invalidated. If any of the invalidated objects are accessed by the
current transaction, the revision written before Connection.tid will be
used.
The DB calls this method, even when the Connection is closed.
Parameters:
tid: the storage-level id of the transaction that committed
oids: oids is an iterable of oids.
"""
def root():
"""Return the database root object.
......@@ -278,14 +263,6 @@ class IConnection(Interface):
If clear is True, reset the counters.
"""
def invalidateCache():
"""Invalidate the connection cache
This invalidates *all* objects in the cache. If the connection
is open, subsequent reads will fail until a new transaction
begins or until the connection os reopned.
"""
def readCurrent(obj):
"""Make sure an object being read is current
......@@ -456,7 +433,7 @@ class IDatabase(IStorageDB):
"""
class IStorage(Interface):
class IStorageData(Interface):
"""A storage is responsible for storing and retrieving data of objects.
Consistency and locking
......@@ -490,90 +467,6 @@ class IStorage(Interface):
"""
def close():
"""Close the storage.
Finalize the storage, releasing any external resources. The
storage should not be used after this method is called.
"""
def getName():
"""The name of the storage
The format and interpretation of this name is storage
dependent. It could be a file name, a database name, etc..
This is used soley for informational purposes.
"""
def getSize():
"""An approximate size of the database, in bytes.
This is used soley for informational purposes.
"""
def history(oid, size=1):
"""Return a sequence of history information dictionaries.
Up to size objects (including no objects) may be returned.
The information provides a log of the changes made to the
object. Data are reported in reverse chronological order.
Each dictionary has the following keys:
time
UTC seconds since the epoch (as in time.time) that the
object revision was committed.
tid
The transaction identifier of the transaction that
committed the version.
serial
An alias for tid, which expected by older clients.
user_name
The user identifier, if any (or an empty string) of the
user on whos behalf the revision was committed.
description
The transaction description for the transaction that
committed the revision.
size
The size of the revision data record.
If the transaction had extension items, then these items are
also included if they don't conflict with the keys above.
"""
def isReadOnly():
"""Test whether a storage allows committing new transactions
For a given storage instance, this method always returns the
same value. Read-only-ness is a static property of a storage.
"""
# XXX Note that this method doesn't really buy us much,
# especially since we have to account for the fact that a
# ostensibly non-read-only storage may be read-only
# transiently. It would be better to just have read-only errors.
def lastTransaction():
"""Return the id of the last committed transaction.
If no transactions have been committed, return a string of 8
null (0) characters.
"""
def __len__():
"""The approximate number of objects in the storage
This is used soley for informational purposes.
"""
def load(oid, version):
"""Load data for an object id
......@@ -625,19 +518,6 @@ class IStorage(Interface):
otherwise, POSKeyError is raised.
"""
# The following two methods are effectively part of the interface,
# as they are generally needed when one storage wraps
# another. This deserves some thought, at probably debate, before
# adding them.
#
# def _lock_acquire():
# """Acquire the storage lock
# """
# def _lock_release():
# """Release the storage lock
# """
def new_oid():
"""Allocate a new object id.
......@@ -647,54 +527,6 @@ class IStorage(Interface):
The return value is a string.
"""
def pack(pack_time, referencesf):
"""Pack the storage
It is up to the storage to interpret this call, however, the
general idea is that the storage free space by:
- discarding object revisions that were old and not current as of the
given pack time.
- garbage collecting objects that aren't reachable from the
root object via revisions remaining after discarding
revisions that were not current as of the pack time.
The pack time is given as a UTC time in seconds since the
epoch.
The second argument is a function that should be used to
extract object references from database records. This is
needed to determine which objects are referenced from object
revisions.
"""
def registerDB(wrapper):
"""Register a storage wrapper IStorageWrapper.
The passed object is a wrapper object that provides an upcall
interface to support composition.
Note that, for historical reasons, an implementation may
require a second argument, however, if required, the None will
be passed as the second argument.
Also, for historical reasons, this is called registerDB rather
than register_wrapper.
"""
def sortKey():
"""Sort key used to order distributed transactions
When a transaction involved multiple storages, 2-phase commit
operations are applied in sort-key order. This must be unique
among storages used in a transaction. Obviously, the storage
can't assure this, but it should construct the sort key so it
has a reasonable chance of being unique.
The result must be a string.
"""
def store(oid, serial, data, version, transaction):
"""Store data for the object id, oid.
......@@ -818,6 +650,142 @@ class IStorage(Interface):
"""
class IStorageMetaData(Interface):
def close():
"""Close the storage.
Finalize the storage, releasing any external resources. The
storage should not be used after this method is called.
"""
def getName():
"""The name of the storage
The format and interpretation of this name is storage
dependent. It could be a file name, a database name, etc..
This is used soley for informational purposes.
"""
def getSize():
"""An approximate size of the database, in bytes.
This is used soley for informational purposes.
"""
def history(oid, size=1):
"""Return a sequence of history information dictionaries.
Up to size objects (including no objects) may be returned.
The information provides a log of the changes made to the
object. Data are reported in reverse chronological order.
Each dictionary has the following keys:
time
UTC seconds since the epoch (as in time.time) that the
object revision was committed.
tid
The transaction identifier of the transaction that
committed the version.
serial
An alias for tid, which expected by older clients.
user_name
The user identifier, if any (or an empty string) of the
user on whos behalf the revision was committed.
description
The transaction description for the transaction that
committed the revision.
size
The size of the revision data record.
If the transaction had extension items, then these items are
also included if they don't conflict with the keys above.
"""
def isReadOnly():
"""Test whether a storage allows committing new transactions
For a given storage instance, this method always returns the
same value. Read-only-ness is a static property of a storage.
"""
# XXX Note that this method doesn't really buy us much,
# especially since we have to account for the fact that a
# ostensibly non-read-only storage may be read-only
# transiently. It would be better to just have read-only errors.
def lastTransaction():
"""Return the id of the last committed transaction.
If no transactions have been committed, return a string of 8
null (0) characters.
"""
def __len__():
"""The approximate number of objects in the storage
This is used soley for informational purposes.
"""
def pack(pack_time, referencesf):
"""Pack the storage
It is up to the storage to interpret this call, however, the
general idea is that the storage free space by:
- discarding object revisions that were old and not current as of the
given pack time.
- garbage collecting objects that aren't reachable from the
root object via revisions remaining after discarding
revisions that were not current as of the pack time.
The pack time is given as a UTC time in seconds since the
epoch.
The second argument is a function that should be used to
extract object references from database records. This is
needed to determine which objects are referenced from object
revisions.
"""
def registerDB(wrapper):
"""Register a storage wrapper IStorageWrapper.
The passed object is a wrapper object that provides an upcall
interface to support composition.
Note that, for historical reasons, an implementation may
require a second argument, however, if required, the None will
be passed as the second argument.
Also, for historical reasons, this is called registerDB rather
than register_wrapper.
"""
def sortKey():
"""Sort key used to order distributed transactions
When a transaction involved multiple storages, 2-phase commit
operations are applied in sort-key order. This must be unique
among storages used in a transaction. Obviously, the storage
can't assure this, but it should construct the sort key so it
has a reasonable chance of being unique.
The result must be a string.
"""
class IStorage(IStorageMetaData, IStorageData):
pass
class IStorageRestoreable(IStorage):
"""Copying Transactions
......@@ -1110,11 +1078,7 @@ class IMVCCStorage(IStorage):
"""
def release():
"""Release all persistent sessions used by this storage instance.
After this call, the storage instance can still be used;
calling methods that use persistent sessions will cause the
persistent sessions to be reopened.
"""Release resources held by
"""
def poll_invalidations():
......
"""Adapt IStorage objects to IMVCCStorage
This is a largely internal implementation of ZODB, especially DB and
Connection. It takes the MVCC implementation involving invalidations
and start time and moves it into a storage adapter. This allows ZODB
to treat Relstoage and other storages in pretty much the same way and
also simplifies the implementation of the DB and Connection classes.
"""
import threading
import zope.interface
from . import interfaces, serialize, POSException
from .utils import p64, u64
class Base(object):
_copy_methods = (
'getName', 'getSize', 'history', 'lastTransaction', 'sortKey',
'loadBlob', 'openCommittedBlobFile',
)
def __init__(self, storage):
self._storage = storage
for name in self._copy_methods:
if hasattr(self._storage, name):
setattr(self, name, getattr(storage, name))
if interfaces.IBlobStorage.providedBy(storage):
zope.interface.alsoProvides(self, interfaces.IBlobStorage)
def isReadOnly(self):
return self._storage.isReadOnly()
def supportsUndo(self):
try:
return self._storage.supportsUndo()
except AttributeError:
return False
def undoLog(self, first, last, filter=None):
try:
return self._storage.undoLog(first, last, filter)
except AttributeError:
return False
def undoInfo(self, first=0, last=20, specification=None):
try:
return self._storage.undoInfo(first, last, specification)
except AttributeError:
return False
def temporaryDirectory(self):
return self._storage.temporaryDirectory()
def __len__():
return len(self._storage)
class MVCCAdapter(Base):
def __init__(self, storage):
Base.__init__(self, storage)
self._instances = set()
if hasattr(storage, 'registerDB'):
storage.registerDB(self)
def new_instance(self):
instance = MVCCAdapterInstance(self)
self._instances.add(instance)
return instance
def before_instance(self, before=None):
return BeforeAdapterInstance(self, before)
def undo_instance(self):
return UndoAdapterInstance(self)
def _release(self, instance):
self._instances.remove(instance)
closed = False
def close(self):
if not self.closed:
self.closed = True
self._storage.close()
del self._instances
del self._storage
def invalidateCache(self):
for instance in self._instances:
instance._invalidateCache()
def invalidate(self, transaction_id, oids, version=''):
for instance in self._instances:
instance._invalidate(oids)
references = serialize.referencesf
transform_record_data = untransform_record_data = lambda self, data: data
def _invalidate_finish(self, oids, committing_instance):
for instance in self._instances:
if instance is not committing_instance:
instance._invalidate(oids)
def pack(self, pack_time, referencesf):
return self._storage.pack(pack_time, referencesf)
class MVCCAdapterInstance(Base):
_copy_methods = Base._copy_methods + (
'isReadOnly', 'loadSerial', 'new_oid', 'tpc_vote',
'checkCurrentSerialInTransaction', 'tpc_abort',
)
def __init__(self, base):
self._base = base
Base.__init__(self, base._storage)
self._lock = threading.Lock()
self._invalidations = set()
self._start = None # Transaction start time
self._sync = getattr(self._storage, 'sync', lambda : None)
def release(self):
self._base._release(self)
close = release
def _invalidateCache(self):
with self._lock:
self._invalidations = None
def _invalidate(self, oids):
with self._lock:
try:
self._invalidations.update(oids)
except AttributeError:
if self._invalidations is not None:
raise
def sync(self, force=True):
if force:
self._sync()
def poll_invalidations(self):
self._start = p64(u64(self._storage.lastTransaction()) + 1)
with self._lock:
if self._invalidations is None:
self._invalidations = set()
return None
else:
result = list(self._invalidations)
self._invalidations.clear()
return result
def load(self, oid, version=''):
assert self._start is not None
r = self._storage.loadBefore(oid, self._start)
if r is None:
raise POSException.ReadConflictError(repr(oid))
return r[:2]
def tpc_begin(self, transaction):
self._storage.tpc_begin(transaction)
self._modified = set()
def store(self, oid, serial, data, version, transaction):
s = self._storage.store(oid, serial, data, version, transaction)
self._modified.add(oid)
return s
def storeBlob(self, oid, serial, data, blobfilename, version, transaction):
s = self._storage.storeBlob(
oid, serial, data, blobfilename, '', transaction)
self._modified.add(oid)
return s
def tpc_finish(self, transaction, func = lambda tid: None):
def invalidate_finish(tid):
self._base._invalidate_finish(self._modified, self)
func(tid)
del self._modified
self._storage.tpc_finish(transaction, invalidate_finish)
def read_only_writer(self, *a, **kw):
raise POSException.ReadOnlyError
class BeforeAdapterInstance(Base):
_copy_methods = Base._copy_methods + (
'loadSerial', 'tpc_begin', 'tpc_finish', 'tpc_abort', 'tpc_vote',
'checkCurrentSerialInTransaction',
)
def __init__(self, base, before=None):
Base.__init__(self, base._storage)
self._before = before
def isReadOnly(self):
return True
def supportsUndo(self):
return False
def release(self):
pass
close = release
def sync(self, force=True):
pass
def poll_invalidations(self):
return []
new_oid = pack = store = read_only_writer
def load(self, oid, version=''):
r = self._storage.loadBefore(oid, self._before)
if r is None:
raise POSException.POSKeyError(oid)
return r[:2]
class UndoAdapterInstance(Base):
_copy_methods = Base._copy_methods + (
'tpc_abort',
)
def __init__(self, base):
self._base = base
Base.__init__(self, base._storage)
def release(self):
pass
close = release
def tpc_begin(self, transaction):
self._storage.tpc_begin(transaction)
self._undone = set()
def undo(self, transaction_id, transaction):
result = self._storage.undo(transaction_id, transaction)
if result:
self._undone.update(result[1])
return result
def tpc_vote(self, transaction):
result = self._storage.tpc_vote(transaction)
if result:
self._undone.update(result[1])
def tpc_finish(self, transaction, func = lambda tid: None):
def invalidate_finish(tid):
self._base._invalidate_finish(self._undone, None)
func(tid)
self._storage.tpc_finish(transaction, invalidate_finish)
......@@ -74,7 +74,7 @@ You can't put blobs into a database that has uses a Non-Blob-Storage, though:
>>> transaction2.commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
Unsupported: Storing Blobs in <ZODB.MappingStorage.MappingStorage object at ...> is not supported.
Unsupported: Storing Blobs in ...
>>> transaction2.abort()
>>> connection2.close()
......
......@@ -27,6 +27,8 @@ from persistent import Persistent
from zope.interface.verify import verifyObject
from zope.testing import loggingsupport, renormalizing
from .. import mvccadapter
checker = renormalizing.RENormalizing([
# Python 3 bytes add a "b".
(re.compile("b('.*?')"), r"\1"),
......@@ -154,7 +156,8 @@ class ConnectionDotAdd(ZODB.tests.util.TestCase):
self.datamgr.add(obj)
self.datamgr.tpc_begin(self.transaction)
self.datamgr.tpc_finish(self.transaction)
self.assertTrue(obj._p_oid not in self.datamgr._storage._stored)
self.assertTrue(obj._p_oid not in
self.datamgr._storage._storage._stored)
def test__resetCacheResetsReader(self):
# https://bugs.launchpad.net/zodb/+bug/142667
......@@ -435,8 +438,11 @@ class UserMethodTests(unittest.TestCase):
...
ConnectionStateError: The database connection is closed
>>> db.close()
An expedient way to create a read-only storage:
>>> db = databaseFromString("<zodb>\n<mappingstorage/>\n</zodb>")
>>> db.storage.isReadOnly = lambda: True
>>> cn = db.open()
>>> cn.isReadOnly()
......@@ -510,7 +516,9 @@ class InvalidationTests(unittest.TestCase):
they have the expected effect.
>>> db = databaseFromString("<zodb>\n<mappingstorage/>\n</zodb>")
>>> mvcc_storage = db._mvcc_storage
>>> cn = db.open()
>>> mvcc_instance = cn._storage
>>> p1 = Persistent()
>>> p2 = Persistent()
>>> p3 = Persistent()
......@@ -521,29 +529,18 @@ class InvalidationTests(unittest.TestCase):
Transaction ids are 8-byte strings, just like oids; p64() will
create one from an int.
>>> cn.invalidate(p64(1), {p1._p_oid: 1})
>>> mvcc_storage.invalidate(p64(1), {p1._p_oid: 1})
Transaction start times are based on storage's last
transaction. (Previousely, they were based on the first
invalidation seen in a transaction.)
>>> cn._txn_time == p64(u64(db.storage.lastTransaction()) + 1)
>>> mvcc_instance.poll_invalidations() == [p1._p_oid]
True
>>> p1._p_oid in cn._invalidated
True
>>> p2._p_oid in cn._invalidated
False
>>> cn.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1})
>>> cn._txn_time == p64(u64(db.storage.lastTransaction()) + 1)
>>> mvcc_instance._start == p64(u64(db.storage.lastTransaction()) + 1)
True
>>> p1._p_oid in cn._invalidated
True
>>> p2._p_oid in cn._invalidated
True
>>> mvcc_storage.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1})
Calling invalidate() doesn't affect the object state until
a transaction boundary.
......@@ -560,24 +557,24 @@ class InvalidationTests(unittest.TestCase):
>>> cn.sync()
>>> p1._p_state
-1
0
>>> p2._p_state
-1
>>> p3._p_state
0
>>> cn._invalidated
set([])
>>> db.close()
"""
def doctest_invalidateCache():
"""The invalidateCache method invalidates a connection's cache. It also
prevents reads until the end of a transaction::
"""The invalidateCache method invalidates a connection's cache.
It also prevents reads until the end of a transaction::
>>> from ZODB.tests.util import DB
>>> import transaction
>>> db = DB()
>>> mvcc_storage = db._mvcc_storage
>>> tm = transaction.TransactionManager()
>>> connection = db.open(transaction_manager=tm)
>>> connection.root()['a'] = StubObject()
......@@ -593,53 +590,33 @@ def doctest_invalidateCache():
So we have a connection and an active transaction with some modifications.
Lets call invalidateCache:
>>> connection.invalidateCache()
>>> mvcc_storage.invalidateCache()
Now, if we try to load an object, we'll get a read conflict:
This won't have any effect until the next transaction:
>>> connection.root()['b'].x
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
If we try to commit the transaction, we'll get a conflict error:
>>> tm.commit()
Traceback (most recent call last):
...
ConflictError: database conflict error
>>> connection.root()['a']._p_changed
0
>>> connection.root()['b']._p_changed
>>> connection.root()['c']._p_changed
1
and the cache will have been cleared:
But if we sync():
>>> print(connection.root()['a']._p_changed)
None
>>> print(connection.root()['b']._p_changed)
None
>>> print(connection.root()['c']._p_changed)
None
>>> connection.sync()
But we'll be able to access data again:
All of our data was invalidated:
>>> connection.root()['b'].x
1
>>> connection.root()['a']._p_changed
>>> connection.root()['b']._p_changed
>>> connection.root()['c']._p_changed
Aborting a transaction after a read conflict also lets us read data and go
on about our business:
But we can load data as usual:
>>> connection.invalidateCache()
>>> connection.root()['c'].x
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
Now, if we try to load an object, we'll get a read conflict:
>>> tm.abort()
>>> connection.root()['c'].x
>>> connection.root()['b'].x
1
>>> connection.root()['c'].x = 2
>>> tm.commit()
>>> db.close()
"""
......@@ -1333,6 +1310,7 @@ class StubDatabase:
def __init__(self):
self.storage = StubStorage()
self._mvcc_storage = mvccadapter.MVCCAdapter(self.storage)
self.new_oid = self.storage.new_oid
classFactory = None
......
......@@ -83,34 +83,40 @@ def test_invalidateCache():
>>> from ZODB.tests.util import DB
>>> import transaction
>>> db = DB()
>>> mvcc_storage = db._mvcc_storage
>>> tm1 = transaction.TransactionManager()
>>> c1 = db.open(transaction_manager=tm1)
>>> c1.root()['a'] = MinPO(1)
>>> tm1.commit()
>>> tm2 = transaction.TransactionManager()
>>> c2 = db.open(transaction_manager=tm2)
>>> c1.root()['a']._p_deactivate()
>>> c2.root()['a'].value
1
>>> tm3 = transaction.TransactionManager()
>>> c3 = db.open(transaction_manager=tm3)
>>> c3.root()['a'].value
1
>>> c3.close()
>>> db.invalidateCache()
>>> c1.root()['a'].value
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> c2.root()['a'].value
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> mvcc_storage.invalidateCache()
>>> c1.root.a._p_changed
0
>>> c1.sync()
>>> c1.root.a._p_changed
>>> c2.root.a._p_changed
0
>>> c2.sync()
>>> c2.root.a._p_changed
>>> c3 is db.open(transaction_manager=tm3)
True
>>> print(c3.root()['a']._p_changed)
None
>>> c3.root.a._p_changed
>>> c1.root()['a'].value
1
>>> c2.root()['a'].value
1
>>> c3.root()['a'].value
1
>>> db.close()
"""
......
......@@ -547,14 +547,13 @@ def loadblob_tmpstore():
>>> transaction.commit()
>>> blob_oid = root['blob']._p_oid
>>> tid = connection._storage.lastTransaction()
>>> _txn_time = connection._txn_time
Now we open a database with a TmpStore in front:
>>> database.close()
>>> from ZODB.Connection import TmpStore
>>> tmpstore = TmpStore(blob_storage, _txn_time)
>>> tmpstore = TmpStore(blob_storage)
We can access the blob correctly:
......
......@@ -68,9 +68,9 @@ Now open a second connection.
>>> tm2 = transaction.TransactionManager()
>>> cn2 = db.open(transaction_manager=tm2)
>>> from ZODB.utils import p64, u64
>>> cn2._txn_time == p64(u64(st.lastTransaction()) + 1)
>>> cn2._storage._start == p64(u64(st.lastTransaction()) + 1)
True
>>> txn_time2 = cn2._txn_time
>>> txn_time2 = cn2._storage._start
Connection high-water mark
--------------------------
......@@ -85,13 +85,13 @@ storage has seen.
>>> cn = db.open()
>>> cn._txn_time == p64(u64(st.lastTransaction()) + 1)
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True
>>> cn.invalidate(100, dict.fromkeys([1, 2]))
>>> cn._txn_time == p64(u64(st.lastTransaction()) + 1)
>>> cn.db()._mvcc_storage.invalidate(100, dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True
>>> cn.invalidate(200, dict.fromkeys([1, 2]))
>>> cn._txn_time == p64(u64(st.lastTransaction()) + 1)
>>> cn.db()._mvcc_storage.invalidate(200, dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True
A connection's high-water mark is set to the transaction id taken from
......@@ -105,7 +105,7 @@ but that doesn't work unless an object is modified. sync() will abort
a transaction and process invalidations.
>>> cn.sync()
>>> cn._txn_time == p64(u64(st.lastTransaction()) + 1)
>>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True
Basic functionality
......@@ -121,14 +121,14 @@ will modify "a." The other transaction will then modify "b" and commit.
The second connection already has its high-water mark set.
>>> cn2._txn_time == txn_time2
>>> cn2._storage._start == txn_time2
True
It is safe to read "b," because it was not modified by the concurrent
transaction.
>>> r2 = cn2.root()
>>> r2["b"]._p_serial < cn2._txn_time
>>> r2["b"]._p_serial < cn2._storage._start
True
>>> r2["b"].value
1
......@@ -140,7 +140,7 @@ non-current version.
>>> r2["a"].value
1
>>> r2["a"]._p_serial < cn2._txn_time
>>> r2["a"]._p_serial < cn2._storage._start
True
We can confirm that we have a non-current revision by asking the
......@@ -153,32 +153,32 @@ It's possible to modify "a", but we get a conflict error when we
commit the transaction.
>>> r2["a"].value = 3
>>> tm2.get().commit()
>>> tm2.get().commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
ConflictError: database conflict error (oid 0x01, class ZODB.tests.MinPO.MinPO)
ConflictError: database conflict error (oid 0x01, class ZODB.tests.MinPO...
>>> tm2.get().abort()
This example will demonstrate that we can commit a transaction if we only
modify current revisions.
>>> cn2._txn_time == p64(u64(st.lastTransaction()) + 1)
>>> cn2._storage._start == p64(u64(st.lastTransaction()) + 1)
True
>>> txn_time2 = cn2._txn_time
>>> txn_time2 = cn2._storage._start
>>> r1 = cn1.root()
>>> r1["a"].value = 3
>>> tm1.get().commit()
>>> txn = db.lastTransaction()
>>> cn2._txn_time == txn_time2
>>> cn2._storage._start == txn_time2
True
>>> r2["b"].value = r2["a"].value + 1
>>> r2["b"].value
3
>>> tm2.get().commit()
>>> cn2._txn_time == p64(u64(st.lastTransaction()) + 1)
>>> cn2._storage._start == p64(u64(st.lastTransaction()) + 1)
True
Object cache
......@@ -362,18 +362,18 @@ This test is kinda screwy because it depends on an old approach that
has changed. We'll hack the _txn_time to get the original expected
result, even though what's going on now is much simpler.
>>> cn1._txn_time = ts.lastTransaction()
>>> cn1._storage._start = ts.lastTransaction()
Once the oid is hooked, an invalidation will be delivered the next
time it is activated. The code below activates the object, then
confirms that the hook worked and that the old state was retrieved.
>>> oid in cn1._invalidated
>>> oid in cn1._storage._invalidations
False
>>> r1["b"]._p_state
-1
>>> r1["b"]._p_activate()
>>> oid in cn1._invalidated
>>> oid in cn1._storage._invalidations
True
>>> ts.count
1
......@@ -406,15 +406,15 @@ Again, once the oid is hooked, an invalidation will be delivered the next
time it is activated. The code below activates the object, but unlike the
section above, this is no older state to retrieve.
>>> oid in cn1._invalidated
>>> oid in cn1._storage._invalidations
False
>>> r1["b"]._p_state
-1
>>> cn1._txn_time = ts.lastTransaction()
>>> r1["b"]._p_activate()
>>> cn1._storage._start = ts.lastTransaction()
>>> r1["b"]._p_activate() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
ReadConflictError: ...
>>> db.close()
"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment