Commit d5d7ef22 authored by Christian Theune's avatar Christian Theune

Merge gocept-iteration branch.

parent 9070f8e2
......@@ -8,6 +8,9 @@ Change History
New Features
------------
- Cleaned-up the storage iteration API and provided an iterator implementation
for ZEO.
- Versions are no-longer supported.
- ZEO cache files can be larger than 4G. Note that older ZEO cache
......
......@@ -29,8 +29,9 @@ import threading
import time
import types
import logging
import weakref
from zope.interface import implements
import zope.interface
from ZEO import ServerStub
from ZEO.cache import ClientCache
from ZEO.TransactionBuffer import TransactionBuffer
......@@ -38,11 +39,12 @@ from ZEO.Exceptions import ClientStorageError, ClientDisconnected, AuthError
from ZEO.auth import get_module
from ZEO.zrpc.client import ConnectionManager
import ZODB.interfaces
import ZODB.lock_file
import ZODB.BaseStorage
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
from ZODB.interfaces import IBlobStorage
from ZODB.blob import rename_or_copy_blob
from persistent.TimeStamp import TimeStamp
......@@ -103,7 +105,10 @@ class ClientStorage(object):
"""
implements(IBlobStorage)
# ClientStorage does not declare any interfaces here. Interfaces are
# declared according to the server's storage once a connection is
# established.
# Classes we instantiate. A subclass might override.
TransactionBufferClass = TransactionBuffer
......@@ -260,6 +265,9 @@ class ClientStorage(object):
self._password = password
self._realm = realm
self._iterators = weakref.WeakValueDictionary()
self._iterator_ids = set()
# Flag tracking disconnections in the middle of a transaction. This
# is reset in tpc_begin() and set in notifyDisconnected().
self._midtxn_disconnect = 0
......@@ -270,7 +278,7 @@ class ClientStorage(object):
self._verification_invalidations = None
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0}
'supportsUndo': 0, 'interfaces': ()}
self._tbuf = self.TransactionBufferClass()
self._db = None
......@@ -526,6 +534,15 @@ class ClientStorage(object):
self._handle_extensions()
# Decorate ClientStorage with all interfaces that the backend storage
# supports.
remote_interfaces = []
for module_name, interface_name in self._info['interfaces']:
module = __import__(module_name, globals(), locals(), [interface_name])
interface = getattr(module, interface_name)
remote_interfaces.append(interface)
zope.interface.directlyProvides(self, remote_interfaces)
def _handle_extensions(self):
for name in self.getExtensionMethods().keys():
if not hasattr(self, name):
......@@ -633,6 +650,7 @@ class ClientStorage(object):
self._ready.clear()
self._server = disconnected_stub
self._midtxn_disconnect = 1
self._iterator_gc()
def __len__(self):
"""Return the size of the storage."""
......@@ -933,14 +951,7 @@ class ClientStorage(object):
raise POSException.POSKeyError("No blob file", oid, serial)
# First, we'll create the directory for this oid, if it doesn't exist.
targetpath = self.fshelper.getPathForOID(oid)
if not os.path.exists(targetpath):
try:
os.makedirs(targetpath, 0700)
except OSError:
# We might have lost a race. If so, the directory
# must exist now
assert os.path.exists(targetpath)
self.fshelper.createPathForOID(oid)
# OK, it's not here and we (or someone) needs to get it. We
# want to avoid getting it multiple times. We want to avoid
......@@ -1075,6 +1086,7 @@ class ClientStorage(object):
self._tbuf.clear()
self._seriald.clear()
del self._serials[:]
self._iterator_gc()
self.end_transaction()
def tpc_finish(self, txn, f=None):
......@@ -1104,6 +1116,7 @@ class ClientStorage(object):
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
finally:
self._load_lock.release()
self._iterator_gc()
self.end_transaction()
def _update_cache(self, tid):
......@@ -1176,6 +1189,25 @@ class ClientStorage(object):
return []
return self._server.undoLog(first, last)
# Recovery support
def copyTransactionsFrom(self, other, verbose=0):
"""Copy transactions from another storage.
This is typically used for converting data from one storage to
another. `other` must have an .iterator() method.
"""
ZODB.BaseStorage.copy(other, self, verbose)
def restore(self, oid, serial, data, version, prev_txn, transaction):
"""Write data already committed in a separate database."""
assert not version
self._check_trans(transaction)
self._server.restorea(oid, serial, data, prev_txn, id(transaction))
# Don't update the transaction buffer, because current data are
# unaffected.
return self._check_serials()
# Below are methods invoked by the StorageServer
def serialnos(self, args):
......@@ -1249,3 +1281,103 @@ class ClientStorage(object):
invalidate = invalidateVerify
end = endVerify
Invalidate = invalidateTrans
# IStorageIteration
def iterator(self, start=None, stop=None):
"""Return an IStorageTransactionInformation iterator."""
# iids are "iterator IDs" that can be used to query an iterator whose
# status is held on the server.
iid = self._server.iterator_start(start, stop)
return self._setup_iterator(TransactionIterator, iid)
def _setup_iterator(self, factory, iid, *args):
self._iterators[iid] = iterator = factory(self, iid, *args)
self._iterator_ids.add(iid)
return iterator
def _forget_iterator(self, iid):
self._iterators.pop(iid, None)
self._iterator_ids.remove(iid)
def _iterator_gc(self):
iids = self._iterator_ids - set(self._iterators)
try:
self._server.iterator_gc(list(iids))
except ClientDisconnected:
# We could not successfully garbage-collect iterators.
# The server might have been restarted, so the IIDs might mean
# something different now. We simply forget our unused IIDs to
# avoid gc'ing foreign iterators.
# In the case that the server was not restarted, we accept the
# risk of leaking resources on the ZEO server.
pass
self._iterator_ids -= iids
class TransactionIterator(object):
def __init__(self, storage, iid, *args):
self._storage = storage
self._iid = iid
self._ended = False
def __iter__(self):
return self
def next(self):
if self._ended:
raise ZODB.interfaces.StorageStopIteration()
tx_data = self._storage._server.iterator_next(self._iid)
if tx_data is None:
# The iterator is exhausted, and the server has already
# disposed it.
self._ended = True
self._storage._forget_iterator(self._iid)
raise ZODB.interfaces.StorageStopIteration()
return ClientStorageTransactionInformation(self._storage, self, *tx_data)
class ClientStorageTransactionInformation(ZODB.BaseStorage.TransactionRecord):
def __init__(self, storage, txiter, tid, status, user, description, extension):
self._storage = storage
self._txiter = txiter
self._completed = False
self._riid = None
self.tid = tid
self.status = status
self.user = user
self.description = description
self.extension = extension
def __iter__(self):
riid = self._storage._server.iterator_record_start(self._txiter._iid, self.tid)
return self._storage._setup_iterator(RecordIterator, riid)
class RecordIterator(object):
def __init__(self, storage, riid):
self._riid = riid
self._completed = False
self._storage = storage
def __iter__(self):
return self
def next(self):
if self._completed:
# We finished iteration once already and the server can't know
# about the iteration anymore.
raise ZODB.interfaces.StorageStopIteration()
item = self._storage._server.iterator_record_next(self._riid)
if item is None:
# The iterator is exhausted, and the server has already
# disposed it.
self._completed = True
raise ZODB.interfaces.StorageStopIteration()
return ZODB.BaseStorage.DataRecord(*item)
......@@ -18,10 +18,13 @@ must serialize them as the actually execute at the server. The
concurrent commits are achieved by logging actions up until the
tpc_vote(). At that point, the entire transaction is committed on the
real storage.
"""
import cPickle
import tempfile
class CommitLog:
def __init__(self):
......@@ -35,7 +38,11 @@ class CommitLog:
return self.file.tell()
def store(self, oid, serial, data):
self.pickler.dump((oid, serial, data))
self.pickler.dump(('s', oid, serial, data))
self.stores += 1
def restore(self, oid, serial, data, prev_txn):
self.pickler.dump(('r', oid, serial, data, prev_txn))
self.stores += 1
def get_loader(self):
......
......@@ -209,6 +209,10 @@ class StorageServer:
def storea(self, oid, serial, data, id):
self.rpc.callAsync('storea', oid, serial, data, '', id)
def restorea(self, oid, serial, data, prev_txn, id):
self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)
def storeBlob(self, oid, serial, data, blobfilename, txn):
# Store a blob to the server. We don't want to real all of
......@@ -292,6 +296,22 @@ class StorageServer:
def undoInfo(self, first, last, spec):
return self.rpc.call('undoInfo', first, last, spec)
def iterator_start(self, start, stop):
return self.rpc.call('iterator_start', start, stop)
def iterator_next(self, iid):
return self.rpc.call('iterator_next', iid)
def iterator_record_start(self, txn_iid, tid):
return self.rpc.call('iterator_record_start', txn_iid, tid)
def iterator_record_next(self, iid):
return self.rpc.call('iterator_record_next', iid)
def iterator_gc(self, iids):
return self.rpc.call('iterator_gc', iids)
class ExtensionMethodWrapper:
def __init__(self, rpc, name):
self.rpc = rpc
......
......@@ -29,12 +29,14 @@ import tempfile
import threading
import time
import warnings
import itertools
import transaction
import ZODB.serialize
import ZEO.zrpc.error
import zope.interface
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
from ZEO.monitor import StorageStats, StatsServer
......@@ -53,7 +55,6 @@ from ZODB.loglevels import BLATHER
logger = logging.getLogger('ZEO.StorageServer')
# TODO: This used to say "ZSS", which is now implied in the logger name.
# Can this be either set to str(os.getpid()) (if that makes sense) or removed?
_label = "" # default label used for logging.
......@@ -110,6 +111,11 @@ class ZEOStorage:
self._extensions = {}
for func in self.extensions:
self._extensions[func.func_name] = None
self._iterators = {}
self._iterator_ids = itertools.count()
# Stores the last item that was handed out for a
# transaction iterator.
self._txn_iterators_last = {}
def finish_auth(self, authenticated):
if not self.auth_realm:
......@@ -272,6 +278,12 @@ class ZEOStorage:
else:
supportsUndo = supportsUndo()
# Communicate the backend storage interfaces to the client
storage_provides = zope.interface.providedBy(storage)
interfaces = []
for candidate in storage_provides.__iro__:
interfaces.append((candidate.__module__, candidate.__name__))
return {'length': len(storage),
'size': storage.getSize(),
'name': storage.getName(),
......@@ -279,6 +291,7 @@ class ZEOStorage:
'supportsVersions': False,
'extensionMethods': self.getExtensionMethods(),
'supports_record_iternext': hasattr(self, 'record_iternext'),
'interfaces': tuple(interfaces),
}
def get_size_info(self):
......@@ -477,6 +490,11 @@ class ZEOStorage:
self.stats.stores += 1
self.txnlog.store(oid, serial, data)
def restorea(self, oid, serial, data, prev_txn, id):
self._check_tid(id, exc=StorageTransactionError)
self.stats.stores += 1
self.txnlog.restore(oid, serial, data, prev_txn)
def storeBlobStart(self):
assert self.blob_tempfile is None
self.blob_tempfile = tempfile.mkstemp(
......@@ -544,16 +562,7 @@ class ZEOStorage:
# Unexpected errors are logged and passed to the client
self.log("store error: %s, %s" % sys.exc_info()[:2],
logging.ERROR, exc_info=True)
# Try to pickle the exception. If it can't be pickled,
# the RPC response would fail, so use something else.
pickler = cPickle.Pickler()
pickler.fast = 1
try:
pickler.dump(err, 1)
except:
msg = "Couldn't pickle storage exception: %s" % repr(err)
self.log(msg, logging.ERROR)
err = StorageServerError(msg)
err = self._marshal_error(err)
# The exception is reported back as newserial for this oid
newserial = [(oid, err)]
else:
......@@ -575,6 +584,37 @@ class ZEOStorage:
return err is None
def _restore(self, oid, serial, data, prev_txn):
err = None
try:
self.storage.restore(oid, serial, data, '', prev_txn, self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception, err:
self.store_failed = 1
if not isinstance(err, TransactionError):
# Unexpected errors are logged and passed to the client
self.log("store error: %s, %s" % sys.exc_info()[:2],
logging.ERROR, exc_info=True)
err = self._marshal_error(err)
# The exception is reported back as newserial for this oid
self.serials.append((oid, err))
return err is None
def _marshal_error(self, error):
# Try to pickle the exception. If it can't be pickled,
# the RPC response would fail, so use something that can be pickled.
pickler = cPickle.Pickler()
pickler.fast = 1
try:
pickler.dump(error, 1)
except:
msg = "Couldn't pickle storage exception: %s" % repr(error)
self.log(msg, logging.ERROR)
error = StorageServerError(msg)
return error
def _vote(self):
if not self.store_failed:
# Only call tpc_vote of no store call failed, otherwise
......@@ -627,8 +667,18 @@ class ZEOStorage:
self._tpc_begin(self.transaction, self.tid, self.status)
loads, loader = self.txnlog.get_loader()
for i in range(loads):
# load oid, serial, data, version
if not self._store(*loader.load()):
store = loader.load()
store_type = store[0]
store_args = store[1:]
if store_type == 's':
do_store = self._store
elif store_type == 'r':
do_store = self._restore
else:
raise ValueError('Invalid store type: %r' % store_type)
if not do_store(*store_args):
break
# Blob support
......@@ -683,6 +733,62 @@ class ZEOStorage:
abortVersion = commitVersion
# IStorageIteration support
def iterator_start(self, start, stop):
iid = self._iterator_ids.next()
self._iterators[iid] = iter(self.storage.iterator(start, stop))
return iid
def iterator_next(self, iid):
iterator = self._iterators[iid]
try:
info = iterator.next()
except StopIteration:
del self._iterators[iid]
item = None
if iid in self._txn_iterators_last:
del self._txn_iterators_last[iid]
else:
item = (info.tid,
info.status,
info.user,
info.description,
info.extension)
# Keep a reference to the last iterator result to allow starting a
# record iterator off it.
self._txn_iterators_last[iid] = info
return item
def iterator_record_start(self, txn_iid, tid):
record_iid = self._iterator_ids.next()
txn_info = self._txn_iterators_last[txn_iid]
if txn_info.tid != tid:
raise Exception(
'Out-of-order request for record iterator for transaction %r' % tid)
self._iterators[record_iid] = iter(txn_info)
return record_iid
def iterator_record_next(self, iid):
iterator = self._iterators[iid]
try:
info = iterator.next()
except StopIteration:
del self._iterators[iid]
item = None
else:
item = (info.oid,
info.tid,
info.data,
info.version,
info.data_txn)
return item
def iterator_gc(self, iids):
for iid in iids:
self._iterators.pop(iid, None)
class StorageServerDB:
def __init__(self, server, storage_id):
......
......@@ -102,7 +102,8 @@ class CommonSetupTearDown(StorageTestBase):
"""
self.__super_setUp()
logging.info("setUp() %s", self.id())
self.file = tempfile.mktemp()
fd, self.file = tempfile.mkstemp()
os.close(fd)
self.addr = []
self._pids = []
self._servers = []
......
##############################################################################
#
# Copyright (c) 2008 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""ZEO iterator protocol tests."""
import transaction
class IterationTests:
def checkIteratorGCProtocol(self):
# Test garbage collection on protocol level.
server = self._storage._server
iid = server.iterator_start(None, None)
# None signals the end of iteration.
self.assertEquals(None, server.iterator_next(iid))
# The server has disposed the iterator already.
self.assertRaises(KeyError, server.iterator_next, iid)
iid = server.iterator_start(None, None)
# This time, we tell the server to throw the iterator away.
server.iterator_gc([iid])
self.assertRaises(KeyError, server.iterator_next, iid)
def checkIteratorExhaustionStorage(self):
# Test the storage's garbage collection mechanism.
iterator = self._storage.iterator()
self.assertEquals(1, len(self._storage._iterator_ids))
iid = list(self._storage._iterator_ids)[0]
self.assertEquals([], list(iterator))
self.assertEquals(0, len(self._storage._iterator_ids))
# The iterator has run through, so the server has already disposed it.
self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
def checkIteratorGCSpanTransactions(self):
iterator = self._storage.iterator()
t = transaction.Transaction()
self._storage.tpc_begin(t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEquals([], list(iterator))
def checkIteratorGCStorageCommitting(self):
# We want the iterator to be garbage-collected, so we don't keep any
# hard references to it. The storage tracks its ID, though.
self._storage.iterator()
self.assertEquals(1, len(self._storage._iterator_ids))
iid = list(self._storage._iterator_ids)[0]
# GC happens at the transaction boundary. After that, both the storage
# and the server have forgotten the iterator.
self._dostore()
self.assertEquals(0, len(self._storage._iterator_ids))
self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
def checkIteratorGCStorageTPCAborting(self):
self._storage.iterator()
iid = list(self._storage._iterator_ids)[0]
t = transaction.Transaction()
self._storage.tpc_begin(t)
self._storage.tpc_abort(t)
self.assertEquals(0, len(self._storage._iterator_ids))
self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
def checkIteratorGCStorageDisconnect(self):
self._storage.iterator()
iid = list(self._storage._iterator_ids)[0]
t = transaction.Transaction()
self._storage.tpc_begin(t)
# Show that after disconnecting, the client side GCs the iterators
# as well. I'm calling this directly to avoid accidentally
# calling tpc_abort implicitly.
self._storage.notifyDisconnected()
self.assertEquals(0, len(self._storage._iterator_ids))
def checkIteratorParallel(self):
self._dostore()
self._dostore()
iter1 = self._storage.iterator()
iter2 = self._storage.iterator()
txn_info1 = iter1.next()
txn_info2 = iter2.next()
self.assertEquals(txn_info1.tid, txn_info2.tid)
txn_info1 = iter1.next()
txn_info2 = iter2.next()
self.assertEquals(txn_info1.tid, txn_info2.tid)
self.assertRaises(StopIteration, iter1.next)
self.assertRaises(StopIteration, iter2.next)
......@@ -39,7 +39,7 @@ import transaction
from ZODB.tests import StorageTestBase, BasicStorage, \
TransactionalUndoStorage, \
PackableStorage, Synchronization, ConflictResolution, RevisionStorage, \
MTStorage, ReadOnlyStorage
MTStorage, ReadOnlyStorage, IteratorStorage, RecoveryStorage
from ZODB.tests.testDemoStorage import DemoStorageWrappedBase
......@@ -47,7 +47,8 @@ from ZEO.ClientStorage import ClientStorage
import ZEO.zrpc.connection
from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests, \
IterationTests
from ZEO.tests.forker import get_port
import ZEO.tests.ConnectionTests
......@@ -56,7 +57,6 @@ import ZEO.StorageServer
logger = logging.getLogger('ZEO.tests.testZEO')
class DummyDB:
def invalidate(self, *args):
pass
......@@ -158,7 +158,7 @@ class GenericTests(
CommitLockTests.CommitLockVoteTests,
ThreadTests.ThreadTests,
# Locally defined (see above)
MiscZEOTests
MiscZEOTests,
):
"""Combine tests from various origins in one class."""
......@@ -196,6 +196,15 @@ class GenericTests(
for pid in self._pids:
os.waitpid(pid, 0)
def runTest(self):
try:
super(GenericTests, self).runTest()
except:
self._failed = True
raise
else:
self._failed = False
def open(self, read_only=0):
# Needed to support ReadOnlyStorage tests. Ought to be a
# cleaner way.
......@@ -226,9 +235,75 @@ class FullGenericTests(
PackableStorage.PackableUndoStorage,
RevisionStorage.RevisionStorage,
TransactionalUndoStorage.TransactionalUndoStorage,
IteratorStorage.IteratorStorage,
IterationTests.IterationTests,
):
"""Extend GenericTests with tests that MappingStorage can't pass."""
class FileStorageRecoveryTests(StorageTestBase.StorageTestBase,
RecoveryStorage.RecoveryStorage):
level = 2
def setUp(self):
self._storage = ZODB.FileStorage.FileStorage("Source.fs", create=True)
self._dst = ZODB.FileStorage.FileStorage("Dest.fs", create=True)
def getConfig(self):
filename = self.__fs_base = tempfile.mktemp()
return """\
<filestorage 1>
path %s
</filestorage>
""" % filename
def _new_storage(self):
port = get_port()
zconf = forker.ZEOConfig(('', port))
zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
zconf, port)
blob_cache_dir = tempfile.mkdtemp()
self._pids.append(pid)
self._servers.append(adminaddr)
self._conf_paths.append(path)
self.blob_cache_dirs.append(blob_cache_dir)
storage = ClientStorage(
zport, '1', cache_size=20000000,
min_disconnect_poll=0.5, wait=1,
wait_timeout=60, blob_dir=blob_cache_dir)
storage.registerDB(DummyDB())
return storage
def setUp(self):
self._pids = []
self._servers = []
self._conf_paths = []
self.blob_cache_dirs = []
self._storage = self._new_storage()
self._dst = self._new_storage()
def tearDown(self):
self._storage.close()
self._dst.close()
for p in self._conf_paths:
os.remove(p)
for p in self.blob_cache_dirs:
ZODB.blob.remove_committed_dir(p)
for server in self._servers:
forker.shutdown_zeo_server(server)
if hasattr(os, 'waitpid'):
# Not in Windows Python until 2.3
for pid in self._pids:
os.waitpid(pid, 0)
def new_dest(self):
return self._new_storage()
class FileStorageTests(FullGenericTests):
"""Test ZEO backed by a FileStorage."""
level = 2
......@@ -241,12 +316,36 @@ class FileStorageTests(FullGenericTests):
</filestorage>
""" % filename
def checkInterfaceFromRemoteStorage(self):
# ClientStorage itself doesn't implement IStorageIteration, but the
# FileStorage on the other end does, and thus the ClientStorage
# instance that is connected to it reflects this.
self.failIf(ZODB.interfaces.IStorageIteration.implementedBy(
ZEO.ClientStorage.ClientStorage))
self.failUnless(ZODB.interfaces.IStorageIteration.providedBy(
self._storage))
# This is communicated using ClientStorage's _info object:
self.assertEquals((('ZODB.interfaces', 'IStorageIteration'),
('zope.interface', 'Interface')),
self._storage._info['interfaces'])
class MappingStorageTests(GenericTests):
"""ZEO backed by a Mapping storage."""
def getConfig(self):
return """<mappingstorage 1/>"""
def checkSimpleIteration(self):
# The test base class IteratorStorage assumes that we keep undo data
# to construct our iterator, which we don't, so we disable this test.
pass
def checkUndoZombie(self):
# The test base class IteratorStorage assumes that we keep undo data
# to construct our iterator, which we don't, so we disable this test.
pass
class DemoStorageTests(
GenericTests,
):
......@@ -260,6 +359,11 @@ class DemoStorageTests(
</demostorage>
""" % tempfile.mktemp()
def checkUndoZombie(self):
# The test base class IteratorStorage assumes that we keep undo data
# to construct our iterator, which we don't, so we disable this test.
pass
class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
"""Make sure a heartbeat is being sent and that it does no harm
......@@ -554,7 +658,7 @@ class CommonBlobTests:
self._storage.close()
class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
"""ZEO backed by a BlobStorage-adapted FileStorage."""
def setUp(self):
......@@ -645,7 +749,7 @@ class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
check_data(filename)
class BlobWritableCacheTests(GenericTests, CommonBlobTests):
class BlobWritableCacheTests(FullGenericTests, CommonBlobTests):
def setUp(self):
self.blobdir = self.blob_cache_dir = tempfile.mkdtemp()
......@@ -823,7 +927,7 @@ without this method:
>>> st = StorageServerWrapper(sv, 'fs')
>>> s = st.server
Now, if we ask fior the invalidations since the last committed
Now, if we ask for the invalidations since the last committed
transaction, we'll get a result:
>>> tid, oids = s.getInvalidations(last[-1])
......@@ -849,7 +953,8 @@ transaction, we'll get a result:
"""
test_classes = [FileStorageTests, MappingStorageTests, DemoStorageTests,
test_classes = [FileStorageTests, FileStorageRecoveryTests,
MappingStorageTests, DemoStorageTests,
BlobAdaptedFileStorageTests, BlobWritableCacheTests]
def test_suite():
......
......@@ -21,14 +21,18 @@ import time
import logging
from struct import pack as _structpack, unpack as _structunpack
import zope.interface
from persistent.TimeStamp import TimeStamp
import ZODB.interfaces
from ZODB import POSException
from ZODB.utils import z64, oid_repr
from ZODB.UndoLogCompatible import UndoLogCompatible
log = logging.getLogger("ZODB.BaseStorage")
class BaseStorage(UndoLogCompatible):
"""Base class that supports storage implementations.
......@@ -188,6 +192,7 @@ class BaseStorage(UndoLogCompatible):
ext = cPickle.dumps(ext, 1)
else:
ext = ""
self._ude = user, desc, ext
if tid is None:
......@@ -338,7 +343,7 @@ def copy(source, dest, verbose=0):
if verbose:
print oid_repr(oid), r.version, len(r.data)
if restoring:
dest.restore(oid, r.tid, r.data, '',
dest.restore(oid, r.tid, r.data, r.version,
r.data_txn, transaction)
else:
pre = preget(oid, None)
......@@ -348,10 +353,36 @@ def copy(source, dest, verbose=0):
dest.tpc_vote(transaction)
dest.tpc_finish(transaction)
fiter.close()
class TransactionRecord:
class TransactionRecord(object):
"""Abstract base class for iterator protocol"""
class DataRecord:
zope.interface.implements(ZODB.interfaces.IStorageTransactionInformation)
def __init__(self, tid, status, user, description, extension):
self.tid = tid
self.status = status
self.user = user
self.description = description
self.extension = extension
# XXX This is a workaround to make the TransactionRecord compatible with a
# transaction object because it is passed to tpc_begin().
def _ext_set(self, value):
self.extension = value
def _ext_get(self):
return self.extension
_extension = property(fset=_ext_set, fget=_ext_get)
class DataRecord(object):
"""Abstract base class for iterator protocol"""
zope.interface.implements(ZODB.interfaces.IStorageRecordInformation)
def __init__(self, oid, tid, data, version, prev):
self.oid = oid
self.tid = tid
self.data = data
self.version = version
self.data_txn = prev
......@@ -80,15 +80,19 @@ and call it to monitor the storage.
"""
import cPickle
import base64, time
import ZODB.BaseStorage
import ZODB.interfaces
import zope.interface
from ZODB import POSException
from ZODB.utils import z64, oid_repr
from ZODB.BaseStorage import BaseStorage
from persistent.TimeStamp import TimeStamp
from cPickle import loads
from BTrees import OOBTree
class DemoStorage(BaseStorage):
class DemoStorage(ZODB.BaseStorage.BaseStorage):
"""Demo storage
Demo storages provide useful storages for writing tests because
......@@ -104,9 +108,10 @@ class DemoStorage(BaseStorage):
"""
zope.interface.implements(ZODB.interfaces.IStorageIteration)
def __init__(self, name='Demo Storage', base=None, quota=None):
BaseStorage.__init__(self, name, base)
ZODB.BaseStorage.BaseStorage.__init__(self, name, base)
# We use a BTree because the items are sorted!
self._data = OOBTree.OOBTree()
......@@ -133,7 +138,7 @@ class DemoStorage(BaseStorage):
# by the base storage, leading to a variety of "impossible" problems.
def new_oid(self):
if self._base is None:
return BaseStorage.new_oid(self)
return ZODB.BaseStorage.BaseStorage.new_oid(self)
else:
return self._base.new_oid()
......@@ -317,6 +322,10 @@ class DemoStorage(BaseStorage):
self._tsize = self._size + 120 + len(u) + len(d) + len(e)
def _finish(self, tid, user, desc, ext):
if not self._tindex:
# No data, so we don't update anything.
return
self._size = self._tsize
self._data[tid] = None, user, desc, ext, tuple(self._tindex)
......@@ -364,7 +373,7 @@ class DemoStorage(BaseStorage):
'time': TimeStamp(tid).timeTime(),
'user_name': u, 'description': d}
if e:
d.update(loads(e))
d.update(cPickle.loads(e))
if filter is None or filter(d):
if i >= first:
r.append(d)
......@@ -569,3 +578,27 @@ class DemoStorage(BaseStorage):
def close(self):
if self._base is not None:
self._base.close()
def iterator(self, start=None, end=None):
# First iterate over the base storage
if self._base is not None:
for transaction in self._base.iterator(start, end):
yield transaction
# Then iterate over our local transactions
for tid, transaction in self._data.items():
if tid >= start and tid <= end:
yield TransactionRecord(tid, transaction)
class TransactionRecord(ZODB.BaseStorage.TransactionRecord):
def __init__(self, tid, transaction):
packed, user, description, extension, records = transaction
super(TransactionRecord, self).__init__(
tid, packed, user, description, extension)
self.records = transaction
def __iter__(self):
for record in self.records:
oid, prev, version, data, tid = record
yield ZODB.BaseStorage.DataRecord(oid, tid, data, version, prev)
......@@ -29,6 +29,8 @@ from struct import pack, unpack
# Not all platforms have fsync
fsync = getattr(os, "fsync", None)
import zope.interface
import ZODB.interfaces
from ZODB import BaseStorage, ConflictResolution, POSException
from ZODB.POSException import UndoError, POSKeyError, MultipleUndoErrors
from persistent.TimeStamp import TimeStamp
......@@ -88,6 +90,8 @@ class FileStorage(BaseStorage.BaseStorage,
ConflictResolution.ConflictResolvingStorage,
FileStorageFormatter):
zope.interface.implements(ZODB.interfaces.IStorageIteration)
# Set True while a pack is in progress; undo is blocked for the duration.
_pack_is_in_progress = False
......@@ -1127,7 +1131,7 @@ class FileStorage(BaseStorage.BaseStorage,
seek(0)
return [(trans.tid, [(r.oid, '') for r in trans])
for trans in FileIterator(self._file, pos=pos)]
for trans in FileIterator(self._file_name, pos=pos)]
finally:
self._lock_release()
......@@ -1516,31 +1520,16 @@ def _truncate(file, name, pos):
file.seek(pos)
file.truncate()
class Iterator:
"""A General simple iterator that uses the Python for-loop index protocol
"""
__index=-1
__current=None
def __getitem__(self, i):
__index=self.__index
while i > __index:
__index=__index+1
self.__current=self.next(__index)
self.__index=__index
return self.__current
class FileIterator(Iterator, FileStorageFormatter):
class FileIterator(FileStorageFormatter):
"""Iterate over the transactions in a FileStorage file.
"""
_ltid = z64
_file = None
def __init__(self, file, start=None, stop=None, pos=4L):
if isinstance(file, str):
file = open(file, 'rb')
def __init__(self, filename, start=None, stop=None, pos=4L):
assert isinstance(filename, str)
file = open(filename, 'rb')
self._file = file
if file.read(4) != packed_version:
raise FileStorageFormatError(file.name)
......@@ -1602,14 +1591,17 @@ class FileIterator(Iterator, FileStorageFormatter):
panic("%s has inconsistent transaction length at %s "
"(%s != %s)", file.name, pos, u64(rtl), u64(stl))
def next(self, index=0):
# Iterator protocol
def __iter__(self):
return self
def next(self):
if self._file is None:
# A closed iterator. Is IOError the best we can do? For
# now, mimic a read on a closed file.
raise IOError('iterator is closed')
raise ZODB.interfaces.StorageStopIteration()
pos = self._pos
while 1:
while True:
# Read the transaction record
try:
h = self._read_txn_header(pos)
......@@ -1625,11 +1617,11 @@ class FileIterator(Iterator, FileStorageFormatter):
self._ltid = h.tid
if self._stop is not None and h.tid > self._stop:
raise IndexError(index)
break
if h.status == "c":
# Assume we've hit the last, in-progress transaction
raise IndexError(index)
break
if pos + h.tlen + 8 > self._file_size:
# Hm, the data were truncated or the checkpoint flag wasn't
......@@ -1679,7 +1671,7 @@ class FileIterator(Iterator, FileStorageFormatter):
except:
pass
result = RecordIterator(h.tid, h.status, h.user, h.descr,
result = TransactionRecord(h.tid, h.status, h.user, h.descr,
e, pos, tend, self._file, tpos)
# Read the (intentionally redundant) transaction length
......@@ -1693,23 +1685,25 @@ class FileIterator(Iterator, FileStorageFormatter):
return result
raise IndexError(index)
self.close()
raise ZODB.interfaces.StorageStopIteration()
class RecordIterator(Iterator, BaseStorage.TransactionRecord,
FileStorageFormatter):
class TransactionRecord(BaseStorage.TransactionRecord, FileStorageFormatter):
"""Iterate over the transactions in a FileStorage file."""
def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
self.tid = tid
self.status = status
self.user = user
self.description = desc
self._extension = ext
BaseStorage.TransactionRecord.__init__(
self, tid, status, user, desc, ext)
self._pos = pos
self._tend = tend
self._file = file
self._tpos = tpos
def next(self, index=0):
def __iter__(self):
return self
def next(self):
pos = self._pos
while pos < self._tend:
# Read the data records for this transaction
......@@ -1738,20 +1732,18 @@ class RecordIterator(Iterator, BaseStorage.TransactionRecord,
# Should it go to the original data like BDBFullStorage?
prev_txn = self.getTxnFromData(h.oid, h.back)
r = Record(h.oid, h.tid, data, prev_txn, pos)
return r
return Record(h.oid, h.tid, data, prev_txn, pos)
raise ZODB.interfaces.StorageStopIteration()
raise IndexError(index)
class Record(BaseStorage.DataRecord):
"""An abstract database record."""
def __init__(self, oid, tid, data, prev, pos):
self.oid = oid
self.tid = tid
self.data = data
self.data_txn = prev
super(Record, self).__init__(oid, tid, data, '', prev)
self.pos = pos
class UndoSearch:
def __init__(self, file, pos, first, last, filter=None):
......
# this is a package
from ZODB.FileStorage.FileStorage import FileStorage, RecordIterator
from ZODB.FileStorage.FileStorage import FileStorage, TransactionRecord
from ZODB.FileStorage.FileStorage import FileIterator, Record, packed_version
# BBB Alias for compatibility
RecordIterator = TransactionRecord
......@@ -21,16 +21,16 @@ It is meant to illustrate the simplest possible storage.
The Mapping storage uses a single data structure to map object ids to data.
"""
import ZODB.BaseStorage
from ZODB.utils import u64, z64
from ZODB.BaseStorage import BaseStorage
from ZODB import POSException
from persistent.TimeStamp import TimeStamp
class MappingStorage(BaseStorage):
class MappingStorage(ZODB.BaseStorage.BaseStorage):
def __init__(self, name='Mapping Storage'):
BaseStorage.__init__(self, name)
ZODB.BaseStorage.BaseStorage.__init__(self, name)
# ._index maps an oid to a string s. s[:8] is the tid of the
# transaction that created oid's current state, and s[8:] is oid's
# current state.
......
......@@ -324,6 +324,21 @@ class FilesystemHelper:
"""
return os.path.join(self.base_dir, utils.oid_repr(oid))
def createPathForOID(self, oid):
"""Given an OID, creates a directory on the filesystem where
the blob data relating to that OID is stored, if it doesn't exist.
"""
path = self.getPathForOID(oid)
if os.path.exists(path):
return
try:
os.makedirs(path, 0700)
except OSError:
# We might have lost a race. If so, the directory
# must exist now
assert os.path.exists(path)
def getBlobFilename(self, oid, tid):
"""Given an oid and a tid, return the full filename of the
'committed' blob file related to that oid and tid.
......
......@@ -82,7 +82,7 @@ except ImportError:
import ZODB.FileStorage
from ZODB.utils import u64
from ZODB.FileStorage import RecordIterator
from ZODB.FileStorage import TransactionRecord
from persistent.TimeStamp import TimeStamp
......@@ -146,7 +146,7 @@ def read_txn_header(f, pos, file_size, outp, ltid):
except: e={}
else: e={}
result = RecordIterator(tid, status, user, description, e, pos, tend,
result = TransactionRecord(tid, status, user, description, e, pos, tend,
f, tpos)
pos = tend
......
......@@ -286,6 +286,7 @@ class IConnection(Interface):
begins or until the connection os reopned.
"""
class IStorageDB(Interface):
"""Database interface exposed to storages
......@@ -418,6 +419,7 @@ class IDatabase(IStorageDB):
should also close all the Connections.
"""
class IStorage(Interface):
"""A storage is responsible for storing and retrieving data of objects.
"""
......@@ -710,6 +712,7 @@ class IStorage(Interface):
"""
class IStorageRestoreable(IStorage):
"""Copying Transactions
......@@ -744,7 +747,7 @@ class IStorageRestoreable(IStorage):
# failed to take into account records after the pack time.
def restore(oid, serial, data, prev_txn, transaction):
def restore(oid, serial, data, version, prev_txn, transaction):
"""Write data already committed in a separate database
The restore method is used when copying data from one database
......@@ -775,41 +778,44 @@ class IStorageRestoreable(IStorage):
Nothing is returned.
"""
class IStorageRecordInformation(Interface):
"""Provide information about a single storage record
"""
oid = Attribute("The object id")
tid = Attribute("The transaction id")
data = Attribute("The data record")
version = Attribute("The version id")
data_txn = Attribute("The previous transaction id")
class IStorageTransactionInformation(Interface):
"""Provide information about a storage transaction
"""Provide information about a storage transaction.
Can be iterated over to retrieve the records modified in the transaction.
"""
tid = Attribute("Transaction id")
status = Attribute("Transaction Status") # XXX what are valid values?
user = Attribute("Transaction user")
description = Attribute("Transaction Description")
extension = Attribute("Transaction extension data")
extension = Attribute("A dictionary carrying the transaction's extension data")
def __iter__():
"""Return an iterable of IStorageRecordInformation
"""
"""Iterate over the transaction's records given as
IStorageRecordInformation objects.
class IStorageIteration(Interface):
"""API for iterating over the contents of a storage
"""
Note that this is a future API. Some storages now provide an
approximation of this.
"""
class IStorageIteration(Interface):
"""API for iterating over the contents of a storage."""
def iterator(start=None, stop=None):
"""Return an IStorageTransactionInformation iterator.
An IStorageTransactionInformation iterator is returned for
iterating over the transactions in the storage.
If the start argument is not None, then iteration will start
with the first transaction whose identifier is greater than or
equal to start.
......@@ -818,8 +824,12 @@ class IStorageIteration(Interface):
the last transaction whose identifier is less than or equal to
stop.
The iterator provides access to the data as available at the time when
the iterator was retrieved.
"""
class IStorageUndoable(IStorage):
"""A storage supporting transactional undo.
"""
......@@ -932,6 +942,7 @@ class IStorageCurrentRecordIteration(IStorage):
"""
class IBlob(Interface):
"""A BLOB supports efficient handling of large data within ZODB."""
......@@ -986,5 +997,12 @@ class IBlobStorage(Interface):
If Blobs use this, then commits can be performed with a simple rename.
"""
class BlobError(Exception):
pass
class StorageStopIteration(IndexError, StopIteration):
"""A combination of StopIteration and IndexError to provide a
backwards-compatible exception.
"""
......@@ -15,6 +15,7 @@
Any storage that supports the iterator() method should be able to pass
all these tests.
"""
from ZODB.tests.MinPO import MinPO
......@@ -23,13 +24,16 @@ from ZODB.utils import U64, p64
from transaction import Transaction
import itertools
class IteratorCompare:
def iter_verify(self, txniter, revids, val0):
eq = self.assertEqual
oid = self._oid
val = val0
for reciter, revid in zip(txniter, revids + [None]):
for reciter, revid in itertools.izip(txniter, revids + [None]):
eq(reciter.tid, revid)
for rec in reciter:
eq(rec.oid, oid)
......@@ -37,7 +41,7 @@ class IteratorCompare:
eq(zodb_unpickle(rec.data), MinPO(val))
val = val + 1
eq(val, val0 + len(revids))
txniter.close()
class IteratorStorage(IteratorCompare):
......@@ -51,13 +55,6 @@ class IteratorStorage(IteratorCompare):
txniter = self._storage.iterator()
self.iter_verify(txniter, [revid1, revid2, revid3], 11)
def checkClose(self):
self._oid = oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
txniter = self._storage.iterator()
txniter.close()
self.assertRaises(IOError, txniter.__getitem__, 0)
def checkUndoZombie(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(94))
......@@ -89,7 +86,7 @@ class IteratorStorage(IteratorCompare):
iter = self._storage.iterator()
count = 0
for txn in iter:
self.assertEqual(txn._extension, {})
self.assertEqual(txn.extension, {})
count +=1
self.assertEqual(count, 1)
......@@ -130,6 +127,31 @@ class IteratorStorage(IteratorCompare):
if not match:
self.fail("Could not find transaction with matching id")
def checkIterateRepeatedly(self):
self._dostore()
transactions = self._storage.iterator()
self.assertEquals(1, len(list(transactions)))
# The iterator can only be consumed once:
self.assertEquals(0, len(list(transactions)))
def checkIterateRecordsRepeatedly(self):
self._dostore()
tinfo = self._storage.iterator().next()
self.assertEquals(1, len(list(tinfo)))
# The iterator can only be consumed once:
self.assertEquals(0, len(list(tinfo)))
def checkIterateWhileWriting(self):
self._dostore()
iterator = self._storage.iterator()
# We have one transaction with 1 modified object.
txn_1 = iterator.next()
self.assertEquals(1, len(list(txn_1)))
# We store another transaction with 1 object, the already running
# iterator does not pick this up.
self._dostore()
self.assertRaises(StopIteration, iterator.next)
class ExtendedIteratorStorage(IteratorCompare):
......@@ -173,28 +195,36 @@ class ExtendedIteratorStorage(IteratorCompare):
txniter = self._storage.iterator(revid3, revid3)
self.iter_verify(txniter, [revid3], 13)
class IteratorDeepCompare:
def compare(self, storage1, storage2):
eq = self.assertEqual
iter1 = storage1.iterator()
iter2 = storage2.iterator()
for txn1, txn2 in zip(iter1, iter2):
for txn1, txn2 in itertools.izip(iter1, iter2):
eq(txn1.tid, txn2.tid)
eq(txn1.status, txn2.status)
eq(txn1.user, txn2.user)
eq(txn1.description, txn2.description)
eq(txn1._extension, txn2._extension)
for rec1, rec2 in zip(txn1, txn2):
eq(txn1.extension, txn2.extension)
itxn1 = iter(txn1)
itxn2 = iter(txn2)
for rec1, rec2 in itertools.izip(itxn1, itxn2):
eq(rec1.oid, rec2.oid)
eq(rec1.tid, rec2.tid)
eq(rec1.data, rec2.data)
# Make sure there are no more records left in rec1 and rec2,
# meaning they were the same length.
self.assertRaises(IndexError, txn1.next)
self.assertRaises(IndexError, txn2.next)
# Additionally, check that we're backwards compatible to the
# IndexError we used to raise before.
self.assertRaises(IndexError, itxn1.next)
self.assertRaises(IndexError, itxn2.next)
self.assertRaises(StopIteration, itxn1.next)
self.assertRaises(StopIteration, itxn2.next)
# Make sure ther are no more records left in txn1 and txn2, meaning
# they were the same length
self.assertRaises(IndexError, iter1.next)
self.assertRaises(IndexError, iter2.next)
iter1.close()
iter2.close()
self.assertRaises(StopIteration, iter1.next)
self.assertRaises(StopIteration, iter2.next)
......@@ -30,6 +30,7 @@ import time
from persistent import Persistent
from persistent.mapping import PersistentMapping
import transaction
import ZODB.interfaces
from ZODB import DB
from ZODB.serialize import referencesf
from ZODB.tests.MinPO import MinPO
......@@ -149,6 +150,16 @@ class PackableStorageBase:
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
def _sanity_check(self):
# Iterate over the storage to make sure it's sane.
if not ZODB.interfaces.IStorageIteration.providedBy(self._storage):
return
it = self._storage.iterator()
for txn in it:
for data in txn:
pass
class PackableStorage(PackableStorageBase):
def checkPackEmptyStorage(self):
......@@ -253,16 +264,7 @@ class PackableStorage(PackableStorageBase):
self.fail('a thread is still alive')
# Iterate over the storage to make sure it's sane, but not every
# storage supports iterators.
if not hasattr(self._storage, "iterator"):
return
it = self._storage.iterator()
for txn in it:
for data in txn:
pass
it.close()
self._sanity_check()
def checkPackWhileWriting(self):
self._PackWhileWriting(pack_now=False)
......@@ -304,14 +306,7 @@ class PackableStorage(PackableStorageBase):
packt = time.time()
thread.join()
# Iterate over the storage to make sure it's sane.
if not hasattr(self._storage, "iterator"):
return
it = self._storage.iterator()
for txn in it:
for data in txn:
pass
it.close()
self._sanity_check()
def checkPackWithMultiDatabaseReferences(self):
databases = {}
......
......@@ -22,6 +22,7 @@ from ZODB.serialize import referencesf
import time
class RecoveryStorage(IteratorDeepCompare):
# Requires a setUp() that creates a self._dst destination storage
def checkSimpleRecovery(self):
......@@ -49,12 +50,16 @@ class RecoveryStorage(IteratorDeepCompare):
# copy the final transaction manually. even though there
# was a pack, the restore() ought to succeed.
it = self._storage.iterator()
final = list(it)[-1]
# Get the last transaction and its record iterator. Record iterators
# can't be accessed out-of-order, so we need to do this in a bit
# complicated way:
for final in it:
records = list(final)
self._dst.tpc_begin(final, final.tid, final.status)
for r in final:
for r in records:
self._dst.restore(r.oid, r.tid, r.data, '', r.data_txn,
final)
it.close()
self._dst.tpc_vote(final)
self._dst.tpc_finish(final)
......
......@@ -636,11 +636,13 @@ class TransactionalUndoStorage:
for j in range(OBJECTS):
oid = s.new_oid()
obj = MinPO(i * OBJECTS + j)
revid = s.store(oid, None, zodb_pickle(obj), '', t)
orig.append((tid, oid, revid))
s.store(oid, None, zodb_pickle(obj), '', t)
orig.append((tid, oid))
s.tpc_vote(t)
s.tpc_finish(t)
orig = [(tid, oid, s.getTid(oid)) for tid, oid in orig]
i = 0
for tid, oid, revid in orig:
self._dostore(oid, revid=revid, data=MinPO(revid),
......@@ -668,14 +670,11 @@ class TransactionalUndoStorage:
# OBJECTS * BATCHES modifications, followed by
# BATCHES undos
iter = s.iterator()
offset = 0
transactions = s.iterator()
eq = self.assertEqual
for i in range(BATCHES):
txn = iter[offset]
offset += 1
txn = transactions.next()
tid = p64(i + 1)
eq(txn.tid, tid)
......@@ -687,13 +686,11 @@ class TransactionalUndoStorage:
eq(L1, L2)
for i in range(BATCHES * OBJECTS):
txn = iter[offset]
offset += 1
txn = transactions.next()
eq(len([rec for rec in txn if rec.data_txn is None]), 1)
for i in range(BATCHES):
txn = iter[offset]
offset += 1
txn = transactions.next()
# The undos are performed in reverse order.
otid = p64(BATCHES - i)
......@@ -704,7 +701,7 @@ class TransactionalUndoStorage:
L2.sort()
eq(L1, L2)
self.assertRaises(IndexError, iter.__getitem__, offset)
self.assertRaises(StopIteration, transactions.next)
def checkUndoLogMetadata(self):
# test that the metadata is correct in the undo log
......
......@@ -308,6 +308,7 @@ class FileStorageTests(
else:
self.assertNotEqual(next_oid, None)
class FileStorageRecoveryTest(
StorageTestBase.StorageTestBase,
RecoveryStorage.RecoveryStorage,
......@@ -326,6 +327,40 @@ class FileStorageRecoveryTest(
def new_dest(self):
return ZODB.FileStorage.FileStorage('Dest.fs')
class FileStorageNoRestore(ZODB.FileStorage.FileStorage):
@property
def restore(self):
raise Exception
class FileStorageNoRestoreRecoveryTest(
StorageTestBase.StorageTestBase,
RecoveryStorage.RecoveryStorage,
):
# This test actually verifies a code path of
# BaseStorage.copyTransactionsFrom. For simplicity of implementation, we
# use a FileStorage deprived of its restore method.
def setUp(self):
self._storage = FileStorageNoRestore("Source.fs", create=True)
self._dst = FileStorageNoRestore("Dest.fs", create=True)
def tearDown(self):
self._storage.close()
self._dst.close()
self._storage.cleanup()
self._dst.cleanup()
def new_dest(self):
return FileStorageNoRestore('Dest.fs')
def checkRestoreAcrossPack(self):
# Skip this check as it calls restore directly.
pass
class SlowFileStorageTest(BaseFileStorageTests):
level = 2
......@@ -492,7 +527,8 @@ def test_suite():
suite = unittest.TestSuite()
for klass in [FileStorageTests, Corruption.FileStorageCorruptTests,
FileStorageRecoveryTest, SlowFileStorageTest]:
FileStorageRecoveryTest, FileStorageNoRestoreRecoveryTest,
SlowFileStorageTest]:
suite.addTest(unittest.makeSuite(klass, "check"))
suite.addTest(doctest.DocTestSuite(setUp=ZODB.tests.util.setUp,
tearDown=ZODB.tests.util.tearDown))
......
......@@ -37,6 +37,7 @@ class MappingStorageTests(StorageTestBase.StorageTestBase,
# have this limit, so we inhibit this test here.
pass
def test_suite():
suite = unittest.makeSuite(MappingStorageTests, 'check')
return suite
......
......@@ -295,5 +295,3 @@ def mktemp(dir=None):
handle, filename = mkstemp(dir=dir)
os.close(handle)
return filename
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment