Commit 6c7d0c42 authored by Guido van Rossum's avatar Guido van Rossum

Commit changes from the short-lived "Recovery" branch to the trunk.

Highlights:

BaseStorage.py:

Fix copyTransactionsFrom() when commitVersion(), abortVersion() or
transactionalUndo() is used.

FileStorage.py:

Add restore() method, which can store data records corresponding to
undo or version manipulations; add close() method to FileIterator
class; raise POSKeyError instead of KeyError for bad keys, to ensure
safe formatting of transaction ids (binary strings) in tracebacks.

POSException.py:

Add POSKeyError.

fsdump.py:

Deal with records indicating the undo or abort of a version doing
object creation.

tests/IteratorStorage.py:

New unittests for the iterator() method and interface of the
storage API; new unit tests for extended file iterators; new class,
IteratorDeepCompare; test of the iterator .close() method.

tests/testFileStorage.py:

Add class FileStorageRecoveryTest, which adds two simple tests for
copyTransactionsFrom().  This indirectly tests the new restore()
method.
parent 2470e3fa
......@@ -14,7 +14,7 @@
"""
# Do this portably in the face of checking out with -kv
import string
__version__ = string.split('$Revision: 1.17 $')[-2:][0]
__version__ = string.split('$Revision: 1.18 $')[-2:][0]
import ThreadLock, bpthread
import time, UndoLogCompatible
......@@ -227,7 +227,24 @@ class BaseStorage(UndoLogCompatible.UndoLogCompatible):
"""
_ts=None
ok=1
preindex={}; preget=preindex.get # waaaa
preindex={};
preget=preindex.get # waaaa
# restore() is a new storage API method which has an identical
# signature to store() except that it does not return anything.
# Semantically, restore() is also identical to store() except that it
# doesn't do the ConflictError or VersionLockError consistency
# checks. The reason to use restore() over store() in this method is
# that store() cannot be used to copy transactions spanning a version
# commit or abort, or over transactional undos.
#
# We'll use restore() if it's available, otherwise we'll fall back to
# using store(). However, if we use store, then
# copyTransactionsFrom() may fail with VersionLockError or
# ConflictError.
if hasattr(self, 'restore'):
restoring = 1
else:
restoring = 0
for transaction in other.iterator():
tid=transaction.tid
......@@ -252,9 +269,12 @@ class BaseStorage(UndoLogCompatible.UndoLogCompatible):
for r in transaction:
oid=r.oid
if verbose: print `oid`, r.version, len(r.data)
pre=preget(oid, None)
s=self.store(oid, pre, r.data, r.version, transaction)
preindex[oid]=s
if restoring:
self.restore(oid, r.serial, r.data, r.version, transaction)
else:
pre=preget(oid, None)
s=self.store(oid, pre, r.data, r.version, transaction)
preindex[oid]=s
self.tpc_vote(transaction)
self.tpc_finish(transaction)
......
......@@ -114,12 +114,12 @@
# may have a back pointer to a version record or to a non-version
# record.
#
__version__='$Revision: 1.78 $'[11:-2]
__version__='$Revision: 1.79 $'[11:-2]
import struct, time, os, string, base64, sys
from struct import pack, unpack
import POSException
from POSException import UndoError
from POSException import UndoError, POSKeyError
from TimeStamp import TimeStamp
from lock_file import lock_file
from utils import t32, p64, U64, cp
......@@ -574,7 +574,10 @@ class FileStorage(BaseStorage.BaseStorage,
file=self._file
seek=file.seek
read=file.read
pos=_index[oid]
try:
pos=_index[oid]
except KeyError:
raise POSKeyError(oid)
while 1:
seek(pos)
h=read(42)
......@@ -583,7 +586,8 @@ class FileStorage(BaseStorage.BaseStorage,
if dserial == serial: break # Yeee ha!
# Keep looking for serial
pos=U64(prev)
if not pos: raise KeyError, serial
if not pos:
raise POSKeyError(serial)
continue
if vlen:
......@@ -682,6 +686,80 @@ class FileStorage(BaseStorage.BaseStorage,
finally:
self._lock_release()
def restore(self, oid, serial, data, version, transaction):
# A lot like store() but without all the consistency checks. This
# should only be used when we /know/ the data is good, hence the
# method name. While the signature looks like store() there are some
# differences:
#
# - serial is the serial number of /this/ revision, not of the
# previous revision. It is used instead of self._serial, which is
# ignored.
#
# - Nothing is returned
#
# - data can be None, which indicates a George Bailey object (i.e. one
# who's creation has been transactionally undone).
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try:
# Position of the non-version data
pnv = None
# We need to get some information about previous revisions of the
# object. Specifically, we need the position of the non-version
# data if this update is in a version. We also need the position
# of the previous record in this version.
old = self._index_get(oid, 0)
if old:
self._file.seek(old)
# Read the previous revision record
h = self._file.read(42)
doid,oserial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
if doid != oid:
raise CorruptedDataError, h
# Calculate the file position in the temporary file
here = self._pos + self._tfile.tell() + self._thl
# And update the temp file index
self._tindex[oid] = here
# Write the recovery data record
if data is None:
dlen = 0
else:
dlen = len(data)
self._tfile.write(pack('>8s8s8s8sH8s',
oid, serial, p64(old), p64(self._pos),
len(version), p64(dlen)))
# We need to write some version information if this revision is
# happening in a version.
if version:
# If there's a previous revision in this version, write the
# position, otherwise write the position of the previous
# non-version revision.
if pnv:
self._tfile.write(pnv)
else:
self._tfile.write(p64(old))
# Link to the last record for this version
pv = self._tvindex.get(version, 0)
if not pv:
self._vindex_get(version, 0)
self._tfile.write(p64(pv))
self._tvindex[version] = here
self._tfile.write(version)
# And finally, write the data
if data is None:
# Write a zero backpointer, which is indication used to
# represent an un-creation transaction.
self._tfile.write(z64)
else:
self._tfile.write(data)
finally:
self._lock_release()
def supportsUndo(self):
return 1
......@@ -942,7 +1020,7 @@ class FileStorage(BaseStorage.BaseStorage,
def transactionalUndo(self, transaction_id, transaction):
"""Undo a transaction, given by transaction_id.
Do so by writing new data that reverses tyhe action taken by
Do so by writing new data that reverses the action taken by
the transaction."""
# Usually, we can get by with just copying a data pointer, by
# writing a file position rather than a pickle. Sometimes, we
......@@ -1944,7 +2022,8 @@ def _loadBack(file, oid, back):
while 1:
old=U64(back)
if not old: raise KeyError, oid
if not old:
raise POSKeyError(oid)
seek(old)
h=read(42)
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
......@@ -1961,7 +2040,8 @@ def _loadBackPOS(file, oid, back):
while 1:
old=U64(back)
if not old: raise KeyError, oid
if not old:
raise POSKeyError(oid)
seek(old)
h=read(42)
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
......@@ -2014,6 +2094,8 @@ class FileIterator(Iterator):
"""Iterate over the transactions in a FileStorage file.
"""
_ltid=z64
_file = None
def __init__(self, file, start=None, stop=None):
if isinstance(file, StringType):
......@@ -2030,6 +2112,12 @@ class FileIterator(Iterator):
self._skip_to_start(start)
self._stop = stop
def close(self):
file = self._file
if file is not None:
self._file = None
file.close()
def _skip_to_start(self, start):
# Scan through the transaction records doing almost no sanity
# checks.
......@@ -2057,6 +2145,10 @@ class FileIterator(Iterator):
self._file.name, pos, U64(rtl), U64(stl))
def next(self, index=0):
if self._file is None:
# A closed iterator. XXX: Is IOError the best we can do? For
# now, mimic a read on a closed file.
raise IOError, 'iterator is closed'
file=self._file
seek=file.seek
read=file.read
......
......@@ -12,8 +12,8 @@
##############################################################################
"""BoboPOS-defined exceptions
$Id: POSException.py,v 1.9 2002/01/17 17:34:33 jeremy Exp $"""
__version__ = '$Revision: 1.9 $'.split()[-2:][0]
$Id: POSException.py,v 1.10 2002/01/25 02:15:07 gvanrossum Exp $"""
__version__ = '$Revision: 1.10 $'.split()[-2:][0]
from string import join
from types import StringType, DictType
......@@ -23,6 +23,13 @@ class POSError(Exception):
"""Persistent object system error
"""
class POSKeyError(KeyError, POSError):
"""Key not found in database
"""
def __str__(self):
return "%016x" % utils.U64(self.args[0])
class TransactionError(POSError):
"""An error occured due to normal transaction processing
"""
......
......@@ -49,9 +49,12 @@ def fsdump(path, file=None, with_offset=1):
(`trans.status`, trans.user, trans.description)
j = 0
for rec in trans:
modname, classname = get_pickle_metadata(rec.data)
dig = md5.new(rec.data).hexdigest()
fullclass = "%s.%s" % (modname, classname)
if rec.data is None:
fullclass = "undo or abort of object creation"
else:
modname, classname = get_pickle_metadata(rec.data)
dig = md5.new(rec.data).hexdigest()
fullclass = "%s.%s" % (modname, classname)
# special case for testing purposes
if fullclass == "ZODB.tests.MinPO.MinPO":
obj = zodb_unpickle(rec.data)
......
......@@ -38,6 +38,13 @@ class IteratorStorage(IteratorCompare):
txniter = self._storage.iterator()
self.iter_verify(txniter, [revid1, revid2, revid3], 11)
def checkClose(self):
self._oid = oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=MinPO(11))
txniter = self._storage.iterator()
txniter.close()
self.assertRaises(IOError, txniter.__getitem__, 0)
def checkVersionIterator(self):
if not self._storage.supportsVersions():
return
......@@ -60,53 +67,41 @@ class IteratorStorage(IteratorCompare):
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# XXX extend these checks. right now, just iterating with CVS
# FS or Berkeley will fail here, but once fixed we should
# check that the right data is returned.
txniter = self._storage.iterator()
for trans in txniter:
for data in trans:
pass
def checkTransactionalUndoIterator(self):
def checkUndoZombieNonVersion(self):
if not hasattr(self._storage, 'supportsTransactionalUndo'):
return
if not self._storage.supportsTransactionalUndo():
return
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(23))
revid = self._dostore(oid, revid=revid, data=MinPO(24))
revid = self._dostore(oid, revid=revid, data=MinPO(25))
self.undoTrans(0)
self.undoTrans(2)
self.undoTrans(4)
# XXX extend these checks. right now, just iterating with CVS
# FS or Berkeley will fail here, but once fixed we should
# check that the right data is returned.
txniter = self._storage.iterator()
for trans in txniter:
for data in trans:
pass
# The last transaction performed an undo of the transaction
# that created object oid. (As Barry points out, the object
# is now in the George Bailey state.) Assert that the final
# data record contains None in the data attribute.
self.assertEqual(data.oid, oid)
self.assertEqual(data.data, None)
def undoTrans(self, i):
revid = self._dostore(oid, data=MinPO(94))
# Get the undo information
info = self._storage.undoInfo()
tid = info[i]['id']
tid = info[0]['id']
# Undo the creation of the object, rendering it a zombie
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# Now attempt to iterator over the storage
iter = self._storage.iterator()
for txn in iter:
for rec in txn:
pass
# The last transaction performed an undo of the transaction that
# created object oid. (As Barry points out, the object is now in the
# George Bailey state.) Assert that the final data record contains
# None in the data attribute.
self.assertEqual(rec.oid, oid)
self.assertEqual(rec.data, None)
class ExtendedIteratorStorage(IteratorCompare):
......@@ -145,3 +140,27 @@ class ExtendedIteratorStorage(IteratorCompare):
txniter = self._storage.iterator(revid3, revid3)
self.iter_verify(txniter, [revid3], 13)
class IteratorDeepCompare:
def compare(self, storage1, storage2):
eq = self.assertEqual
iter1 = storage1.iterator()
iter2 = storage2.iterator()
for txn1, txn2 in zip(iter1, iter2):
eq(txn1.tid, txn2.tid)
eq(txn1.status, txn2.status)
eq(txn1.user, txn2.user)
eq(txn1.description, txn2.description)
eq(txn1._extension, txn2._extension)
for rec1, rec2 in zip(txn1, txn2):
eq(rec1.oid, rec2.oid)
eq(rec1.serial, rec2.serial)
eq(rec1.version, rec2.version)
eq(rec1.data, rec2.data)
# Make sure there are no more records left in rec1 and rec2,
# meaning they were the same length.
self.assertRaises(IndexError, txn1.next)
self.assertRaises(IndexError, txn2.next)
# Make sure ther are no more records left in txn1 and txn2, meaning
# they were the same length
self.assertRaises(IndexError, iter1.next)
self.assertRaises(IndexError, iter2.next)
from __future__ import nested_scopes
import ZODB.FileStorage
import sys, os, unittest
import errno
from ZODB.Transaction import Transaction
from ZODB.tests import StorageTestBase, BasicStorage, \
TransactionalUndoStorage, VersionStorage, \
......@@ -45,10 +49,83 @@ class FileStorageTests(
if os.path.exists(path):
os.remove(path)
class FileStorageRecoveryTest(
StorageTestBase.StorageTestBase,
IteratorStorage.IteratorDeepCompare,
):
def setUp(self):
StorageTestBase.StorageTestBase.setUp(self)
self._storage = ZODB.FileStorage.FileStorage('Source.fs')
self._dst = ZODB.FileStorage.FileStorage('Dest.fs')
def tearDown(self):
StorageTestBase.StorageTestBase.tearDown(self)
self._dst.close()
for ext in '', '.old', '.tmp', '.lock', '.index':
for fs in 'Source', 'Dest':
path = fs + '.fs' + ext
try:
os.remove(path)
except OSError, e:
if e.errno <> errno.ENOENT: raise
def checkSimpleRecovery(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=11)
revid = self._dostore(oid, revid=revid, data=12)
revid = self._dostore(oid, revid=revid, data=13)
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
def checkRecoveryAcrossVersions(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=21)
revid = self._dostore(oid, revid=revid, data=22)
revid = self._dostore(oid, revid=revid, data=23, version='one')
revid = self._dostore(oid, revid=revid, data=34, version='one')
# Now commit the version
t = Transaction()
self._storage.tpc_begin(t)
self._storage.commitVersion('one', '', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
def checkRecoverAbortVersion(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=21, version="one")
revid = self._dostore(oid, revid=revid, data=23, version='one')
revid = self._dostore(oid, revid=revid, data=34, version='one')
# Now abort the version and the creation
t = Transaction()
self._storage.tpc_begin(t)
oids = self._storage.abortVersion('one', t)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
self.assertEqual(oids, [oid])
self._dst.copyTransactionsFrom(self._storage)
self.compare(self._storage, self._dst)
# Also make sure the the last transaction has a data record
# with None for its data attribute, because we've undone the
# object.
for s in self._storage, self._dst:
iter = s.iterator()
for trans in iter:
pass # iterate until we get the last one
data = trans[0]
self.assertRaises(IndexError, lambda i:trans[i], 1)
self.assertEqual(data.oid, oid)
self.assertEqual(data.data, None)
def test_suite():
suite = unittest.makeSuite(FileStorageTests, 'check')
suite2 = unittest.makeSuite(Corruption.FileStorageCorruptTests, 'check')
suite3 = unittest.makeSuite(FileStorageRecoveryTest, 'check')
suite.addTest(suite2)
suite.addTest(suite3)
return suite
def main():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment