Commit ddfe57eb authored by Kirill Smelkov

MVCCAdapter.load: Raise ReadConflictError only if pack is running simultaneously

Currently when load(oid) finds that the object was deleted, it raises
ReadConflictError - not POSKeyError - because a pack could be running
simultaneously and the deletion could result from the pack. In that case
we want the corresponding transaction to be retried - not failed - which,
for backward-compatibility reasons, is done by raising a ConflictError
subclass. However, from a semantic point of view, it is more correct to
raise POSKeyError when an object is found to be deleted or
not-yet-created, and to raise ReadConflictError only if a pack was actually
running simultaneously and the deletion could result from that pack.

-> Fix MVCCAdapter.load to do this - now it raises ReadConflictError
only if MVCCAdapterInstance view appears before storage packtime, which
indicates that there could indeed be conflict in between read access and
pack removing the object.

To detect whether a pack was running and went beyond the
MVCCAdapterInstance view, we need to teach storage drivers to provide a
way to know what the last pack time/transaction was. Add an optional
IStorageLastPack interface with a .lastPack() method to do so. If a storage
does not implement lastPack, we take the conservative approach and raise
ReadConflictError unconditionally as before.

Add/adapt corresponding tests.

Teach FileStorage, MappingStorage and DemoStorage to implement the new
interface.

NOTE: storages that implement IMVCCStorage natively already raise
POSKeyError - not ReadConflictError - when load(oid) finds deleted
object. This is so because IMVCCStorages natively provide isolation, via
e.g. RDBMS in case of RelStorage. The isolation property provided by
RDBMS guarantees that connection view of the database is not affected by
other changes - e.g. pack - until connection's transaction is complete.

/cc @jimfulton
parent 3a493b01
...@@ -104,14 +104,20 @@ class DemoStorage(ConflictResolvingStorage): ...@@ -104,14 +104,20 @@ class DemoStorage(ConflictResolvingStorage):
self._temporary_changes = True self._temporary_changes = True
changes = ZODB.MappingStorage.MappingStorage() changes = ZODB.MappingStorage.MappingStorage()
zope.interface.alsoProvides(self, ZODB.interfaces.IBlobStorage) zope.interface.alsoProvides(self, ZODB.interfaces.IBlobStorage)
zope.interface.alsoProvides(self, ZODB.interfaces.IStorageLastPack)
if close_changes_on_close is None: if close_changes_on_close is None:
close_changes_on_close = False close_changes_on_close = False
else: else:
if ZODB.interfaces.IBlobStorage.providedBy(changes): if ZODB.interfaces.IBlobStorage.providedBy(changes):
zope.interface.alsoProvides(self, ZODB.interfaces.IBlobStorage) zope.interface.alsoProvides(self, ZODB.interfaces.IBlobStorage)
if ZODB.interfaces.IStorageLastPack.providedBy(changes):
zope.interface.alsoProvides(self, ZODB.interfaces.IStorageLastPack)
if close_changes_on_close is None: if close_changes_on_close is None:
close_changes_on_close = True close_changes_on_close = True
if ZODB.interfaces.IStorageLastPack.providedBy(self):
self.lastPack = changes.lastPack
self.changes = changes self.changes = changes
self.close_changes_on_close = close_changes_on_close self.close_changes_on_close = close_changes_on_close
......
...@@ -57,6 +57,7 @@ from ZODB.interfaces import IStorageCurrentRecordIteration ...@@ -57,6 +57,7 @@ from ZODB.interfaces import IStorageCurrentRecordIteration
from ZODB.interfaces import IStorageIteration from ZODB.interfaces import IStorageIteration
from ZODB.interfaces import IStorageRestoreable from ZODB.interfaces import IStorageRestoreable
from ZODB.interfaces import IStorageUndoable from ZODB.interfaces import IStorageUndoable
from ZODB.interfaces import IStorageLastPack
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
from ZODB.POSException import MultipleUndoErrors from ZODB.POSException import MultipleUndoErrors
from ZODB.POSException import POSKeyError from ZODB.POSException import POSKeyError
...@@ -133,6 +134,7 @@ class TempFormatter(FileStorageFormatter): ...@@ -133,6 +134,7 @@ class TempFormatter(FileStorageFormatter):
IStorageCurrentRecordIteration, IStorageCurrentRecordIteration,
IExternalGC, IExternalGC,
IStorage, IStorage,
IStorageLastPack,
) )
class FileStorage( class FileStorage(
FileStorageFormatter, FileStorageFormatter,
...@@ -145,6 +147,11 @@ class FileStorage( ...@@ -145,6 +147,11 @@ class FileStorage(
# Set True while a pack is in progress; undo is blocked for the duration. # Set True while a pack is in progress; undo is blocked for the duration.
_pack_is_in_progress = False _pack_is_in_progress = False
# last tid used as pack cut-point
# TODO save/load _lastPack with index - for it not to go to z64 on next
# file reopen. Currently lastPack is used only to detect race of load
# wrt simultaneous pack, so not persisting lastPack is practically ok.
_lastPack = z64
def __init__(self, file_name, create=False, read_only=False, stop=None, def __init__(self, file_name, create=False, read_only=False, stop=None,
quota=None, pack_gc=True, pack_keep_old=True, packer=None, quota=None, pack_gc=True, pack_keep_old=True, packer=None,
...@@ -1183,6 +1190,11 @@ class FileStorage( ...@@ -1183,6 +1190,11 @@ class FileStorage(
finally: finally:
p.close() p.close()
def lastPack(self):
    """Return the tid most recently used as a pack cut-point.

    Implements IStorageLastPack. The value is read while holding the
    storage lock and returned after the lock has been released.
    """
    with self._lock:
        packed = self._lastPack
    return packed
def pack(self, t, referencesf, gc=None): def pack(self, t, referencesf, gc=None):
"""Copy data from the current database file to a packed file """Copy data from the current database file to a packed file
...@@ -1207,6 +1219,7 @@ class FileStorage( ...@@ -1207,6 +1219,7 @@ class FileStorage(
if self._pack_is_in_progress: if self._pack_is_in_progress:
raise FileStorageError('Already packing') raise FileStorageError('Already packing')
self._pack_is_in_progress = True self._pack_is_in_progress = True
stop = min(stop, self._ltid)
if gc is None: if gc is None:
gc = self._pack_gc gc = self._pack_gc
...@@ -1245,6 +1258,8 @@ class FileStorage( ...@@ -1245,6 +1258,8 @@ class FileStorage(
self._file = open(self._file_name, 'r+b') self._file = open(self._file_name, 'r+b')
self._initIndex(index, self._tindex) self._initIndex(index, self._tindex)
self._pos = opos self._pos = opos
if stop > self._lastPack:
self._lastPack = stop
# We're basically done. Now we need to deal with removed # We're basically done. Now we need to deal with removed
# blobs and removing the .old file (see further down). # blobs and removing the .old file (see further down).
......
...@@ -30,6 +30,7 @@ import zope.interface ...@@ -30,6 +30,7 @@ import zope.interface
@zope.interface.implementer( @zope.interface.implementer(
ZODB.interfaces.IStorage, ZODB.interfaces.IStorage,
ZODB.interfaces.IStorageIteration, ZODB.interfaces.IStorageIteration,
ZODB.interfaces.IStorageLastPack,
) )
class MappingStorage(object): class MappingStorage(object):
"""In-memory storage implementation """In-memory storage implementation
...@@ -186,6 +187,15 @@ class MappingStorage(object): ...@@ -186,6 +187,15 @@ class MappingStorage(object):
self._oid += 1 self._oid += 1
return ZODB.utils.p64(self._oid) return ZODB.utils.p64(self._oid)
# ZODB.interfaces.IStorageLastPack
@ZODB.utils.locked(opened)
def lastPack(self):
    """Return the last pack cut-point tid.

    Falls back to ``ZODB.utils.z64`` when no cut-point has been recorded
    yet (``self._last_pack`` is None).
    """
    packed = self._last_pack
    return ZODB.utils.z64 if packed is None else packed
# ZODB.interfaces.IStorage # ZODB.interfaces.IStorage
@ZODB.utils.locked(opened) @ZODB.utils.locked(opened)
def pack(self, t, referencesf, gc=True): def pack(self, t, referencesf, gc=True):
...@@ -193,6 +203,14 @@ class MappingStorage(object): ...@@ -193,6 +203,14 @@ class MappingStorage(object):
return return
stop = ZODB.TimeStamp.TimeStamp(*time.gmtime(t)[:5]+(t%60,)).raw() stop = ZODB.TimeStamp.TimeStamp(*time.gmtime(t)[:5]+(t%60,)).raw()
# clip stop to last committed transaction.
# don't use ._ltid as head - for MVCCMappingStorage ._ltid is specific
# to current storage instance, not whole storage history.
head = ZODB.utils.z64
if self._transactions:
head = self._transactions.maxKey()
stop = min(stop, head)
if self._last_pack is not None and self._last_pack >= stop: if self._last_pack is not None and self._last_pack >= stop:
if self._last_pack == stop: if self._last_pack == stop:
return return
......
# -*- coding: utf-8 -*-
############################################################################## ##############################################################################
# #
# Copyright (c) Zope Corporation and Contributors. # Copyright (c) Zope Corporation and Contributors.
...@@ -749,6 +750,8 @@ class IStorage(Interface): ...@@ -749,6 +750,8 @@ class IStorage(Interface):
extract object references from database records. This is extract object references from database records. This is
needed to determine which objects are referenced from object needed to determine which objects are referenced from object
revisions. revisions.
See also: IStorageLastPack.
""" """
def registerDB(wrapper): def registerDB(wrapper):
...@@ -876,6 +879,34 @@ class IPrefetchStorage(IStorage): ...@@ -876,6 +879,34 @@ class IPrefetchStorage(IStorage):
more than once. more than once.
""" """
class IStorageLastPack(Interface):
    """Optional storage interface that reports the last pack cut-point."""

    def lastPack(): # -> pack-cut-point (tid)
        """lastPack returns ID of the last transaction used as pack cut point.

        For a database view with at ≥ lastPack, the storage guarantees to
        persist all object revisions needed to represent such a view. For a
        database view with at < lastPack, the storage does not provide such a
        guarantee. In particular, pack can remove object revisions that were
        non-current as of the lastPack database view at pack time.

        Similarly, the storage guarantees to persist all transactions in the
        [lastPack, lastTransaction] range, while for the [0, lastPack) range
        there is no such guarantee. In particular, pack can remove
        transactions that contain only object revisions that are non-current
        as of the lastPack database view.

        lastPack is non-decreasing - it can only grow, or stay equal over time.

        lastPack is always ≤ IStorage.lastTransaction.

        lastPack is related to pack_time passed to IStorage.pack - internally
        that time is converted to transaction ID format after clipping into
        valid range and looking up nearby transaction.

        lastPack value cannot be cached - for client/storage case the call has
        to perform round-trip and synchronize with the server.
        """
class IMultiCommitStorage(IStorage): class IMultiCommitStorage(IStorage):
"""A multi-commit storage can commit multiple transactions at once. """A multi-commit storage can commit multiple transactions at once.
......
...@@ -154,11 +154,12 @@ class MVCCAdapterInstance(Base): ...@@ -154,11 +154,12 @@ class MVCCAdapterInstance(Base):
r = self._storage.loadBefore(oid, self._start) r = self._storage.loadBefore(oid, self._start)
if r is None: if r is None:
# object was deleted or not-yet-created. # object was deleted or not-yet-created.
# raise ReadConflictError - not - POSKeyError due to backward # raise POSKeyError, or ReadConflictError, if the deletion is
# compatibility: a pack(t+δ) could be running simultaneously to our # potentially due to simultaneous pack: a pack(t+δ) could be
# transaction that observes database as of t state. Such pack, # running simultaneously to our transaction that observes database
# because it packs the storage from a "future-to-us" point of view, # as of t state. Such pack, because it packs the storage from a
# can remove object revisions that we can try to load, for example: # "future-to-us" point of view, can remove object revisions that we
# can try to load, for example:
# #
# txn1 <-- t # txn1 <-- t
# obj.revA # obj.revA
...@@ -168,15 +169,22 @@ class MVCCAdapterInstance(Base): ...@@ -168,15 +169,22 @@ class MVCCAdapterInstance(Base):
# #
# for such case we want user transaction to be restarted - not # for such case we want user transaction to be restarted - not
# failed - by raising ConflictError subclass. # failed - by raising ConflictError subclass.
# if hasattr(self._storage, 'pack'):
# XXX we don't detect for pack to be actually running - just assume lastPack = getattr(self._storage, 'lastPack', None)
# the worst. It would be good if storage could provide information if lastPack is not None:
# whether pack is/was actually running and its details, take that packed = lastPack()
# into account, and raise ReadConflictError only in the presence of packConflict = (p64(u64(self._start)-1) < packed)
# database being simultaneously updated from back of its log. else:
raise POSException.ReadConflictError( packConflict = True # no lastPack information - assume the worst
"load %s @%s: object deleted, likely by simultaneous pack" %
(oid_repr(oid), tid_repr(p64(u64(self._start) - 1)))) if packConflict:
# database simultaneously packed from back of its log over our view of it
raise POSException.ReadConflictError(
"load %s @%s: object deleted, likely by simultaneous pack" %
(oid_repr(oid), tid_repr(p64(u64(self._start) - 1))))
# no simultaneous pack detected, or lastPack was before our view of the database
raise POSException.POSKeyError(oid)
return r[:2] return r[:2]
......
...@@ -45,6 +45,7 @@ class MVCCMappingStorage(MappingStorage): ...@@ -45,6 +45,7 @@ class MVCCMappingStorage(MappingStorage):
inst._commit_lock = self._commit_lock inst._commit_lock = self._commit_lock
inst.new_oid = self.new_oid inst.new_oid = self.new_oid
inst.pack = self.pack inst.pack = self.pack
inst.lastPack = self.lastPack
inst.loadBefore = self.loadBefore inst.loadBefore = self.loadBefore
inst._ltid = self._ltid inst._ltid = self._ltid
inst._main_lock = self._lock inst._main_lock = self._lock
......
...@@ -16,11 +16,14 @@ from __future__ import print_function ...@@ -16,11 +16,14 @@ from __future__ import print_function
import doctest import doctest
import time import time
import warnings
import gc
from persistent import Persistent from persistent import Persistent
from persistent.mapping import PersistentMapping from persistent.mapping import PersistentMapping
from ZODB import DB from ZODB import DB
from ZODB.POSException import ConflictError, StorageError from ZODB.POSException import ConflictError, StorageError, POSKeyError, \
ReadConflictError
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.MTStorage import TestThread from ZODB.tests.MTStorage import TestThread
...@@ -528,6 +531,109 @@ class PackableStorage(PackableStorageBase): ...@@ -528,6 +531,109 @@ class PackableStorage(PackableStorageBase):
eq(pobj.getoid(), oid2) eq(pobj.getoid(), oid2)
eq(pobj.value, 11) eq(pobj.value, 11)
def checkPackVSConnectionGet(self):
    # Verify behaviour of Connection.get vs simultaneous pack:
    #
    # For deleted objects, in normal circumstances, get should raise
    # POSKeyError. However when a pack was run simultaneously with packtime
    # going after the connection's view of the database, for storages that
    # do not implement IMVCCStorage natively, get should raise
    # ReadConflictError instead.
    #
    # IMVCCStorage storages are not affected, since they natively provide
    # isolation, via e.g. RDBMS in case of RelStorage. The isolation
    # property provided by RDBMS guarantees that connection view of the
    # database is not affected by other changes - e.g. pack - until
    # connection's transaction is complete.
    #
    # NOTE: at0/at1/at2 below are not read again in this test; they name
    # the database states that the "@atN" comments refer to.
    db = DB(self._storage)
    eq = self.assertEqual
    raises = self.assertRaises

    # connA is main connection through which database changes are made
    # @at0 - start
    tmA = transaction.TransactionManager()
    connA = db.open(transaction_manager=tmA)
    rootA = connA.root()
    rootA[0] = None
    tmA.commit()
    at0 = rootA._p_serial

    # conn0 is "current" db connection that observes database as of @at0 state
    tm0 = transaction.TransactionManager()
    conn0 = db.open(transaction_manager=tm0)

    # @at1 - new object is added to database and linked to root
    rootA[0] = objA = MinPO(1)
    tmA.commit()
    oid = objA._p_oid
    at1 = objA._p_serial

    # conn1 is "current" db connection that observes database as of @at1 state
    tm1 = transaction.TransactionManager()
    conn1 = db.open(transaction_manager=tm1)

    # @at2 - object value is modified
    objA.value = 2
    tmA.commit()
    at2 = objA._p_serial

    # ---- before pack ----

    # conn0.get(oid) -> POSKeyError (as of @at0 object was not yet created)
    # A storage that is neither IMVCCStorage nor IStorageLastPack cannot
    # tell a pack race from a missing object, so ReadConflictError is
    # expected there instead, and a warning flags the missing lastPack.
    errGetNoObject = POSKeyError
    if (not ZODB.interfaces.IMVCCStorage.providedBy(self._storage)) and \
       (not ZODB.interfaces.IStorageLastPack.providedBy(self._storage)):
        warnings.warn("FIXME %s does not implement lastPack" %
                      type(self._storage), DeprecationWarning)
        errGetNoObject = ReadConflictError
    raises(errGetNoObject, conn0.get, oid)

    # conn1.get(oid) -> obj(1)
    obj1 = conn1.get(oid)
    eq(obj1._p_oid, oid)
    eq(obj1.value, 1)

    # --- after pack to latest db head ----
    db.pack(time.time()+1)

    # IMVCCStorage - as before
    #
    # !IMVCCStorage:
    # conn0.get(oid) -> ReadConflictError
    # conn1.get(oid) -> ReadConflictError
    #
    # ( conn1: the pack removes obj@at1 revision, which results in conn1
    #   finding the object in non-existent/deleted state, traditionally this
    #   is reported as ReadConflictError for conn1's transaction to be
    #   restarted.
    #
    #   conn0: obj@at0 never existed, but after pack@at2 it is
    #   indistinguishable whether obj@at0 revision was removed, or it never
    #   existed -> ReadConflictError too, similarly to conn1 )

    # NOTE(review): presumably this drops cached object state (and the
    # obj1 reference) so that the get() calls below go back to the
    # storage - confirm.
    conn0.cacheMinimize()
    conn1.cacheMinimize()
    del obj1
    gc.collect()
    if ZODB.interfaces.IMVCCStorage.providedBy(self._storage):
        raises(POSKeyError, conn0.get, oid)
        obj1 = conn1.get(oid)
        eq(obj1._p_oid, oid)
        eq(obj1.value, 1)
    else:
        # !IMVCCStorage
        raises(ReadConflictError, conn0.get, oid)
        raises(ReadConflictError, conn1.get, oid)

    # connA stays ok
    connA.cacheMinimize()
    objA_ = connA.get(oid)
    self.assertIs(objA_, objA)
    eq(objA_.value, 2)

    # end
    self._sanity_check()
    db.close()
class PackableStorageWithOptionalGC(PackableStorage): class PackableStorageWithOptionalGC(PackableStorage):
def checkPackAllRevisionsNoGC(self): def checkPackAllRevisionsNoGC(self):
......
...@@ -282,6 +282,20 @@ class UserMethodTests(unittest.TestCase): ...@@ -282,6 +282,20 @@ class UserMethodTests(unittest.TestCase):
Traceback (most recent call last): Traceback (most recent call last):
... ...
POSKeyError: 0x01 POSKeyError: 0x01
A request for an object that doesn't exist yet as of connection view of
the database will raise a POSKeyError too.
>>> tm2 = transaction.TransactionManager()
>>> cn2 = db.open(transaction_manager=tm2)
>>> root2 = cn2.root()
>>> obj2 = Persistent()
>>> root2[2] = obj2
>>> tm2.commit()
>>> cn.get(obj2._p_oid) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
""" """
def doctest_close(self): def doctest_close(self):
......
...@@ -386,7 +386,7 @@ No earlier revision available ...@@ -386,7 +386,7 @@ No earlier revision available
We'll reuse the code from the example above, except that there will We'll reuse the code from the example above, except that there will
only be a single revision of "b." As a result, the attempt to only be a single revision of "b." As a result, the attempt to
activate "b" will result in a ReadConflictError. activate "b" will result in a POSKeyError.
>>> ts = TestStorage() >>> ts = TestStorage()
>>> db = DB(ts) >>> db = DB(ts)
...@@ -413,7 +413,7 @@ False ...@@ -413,7 +413,7 @@ False
>>> r1["b"]._p_activate() # doctest: +ELLIPSIS >>> r1["b"]._p_activate() # doctest: +ELLIPSIS
Traceback (most recent call last): Traceback (most recent call last):
... ...
ReadConflictError: ... POSKeyError: ...
>>> db.close() >>> db.close()
""" """
...@@ -427,7 +427,7 @@ checker = renormalizing.RENormalizing([ ...@@ -427,7 +427,7 @@ checker = renormalizing.RENormalizing([
(re.compile("b('.*?')"), r"\1"), (re.compile("b('.*?')"), r"\1"),
# Python 3 adds module name to exceptions. # Python 3 adds module name to exceptions.
(re.compile("ZODB.POSException.ConflictError"), r"ConflictError"), (re.compile("ZODB.POSException.ConflictError"), r"ConflictError"),
(re.compile("ZODB.POSException.ReadConflictError"), r"ReadConflictError"), (re.compile("ZODB.POSException.POSKeyError"), r"POSKeyError"),
]) ])
def test_suite(): def test_suite():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment