Commit 208f3974 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge zodb4 conversion code from the zope3-zodb3-devel-branch.

parent 6ca2ed8c
# This directory contains a Python package.
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Nitty-gritty conversion of a ZODB 4 FileStorage to a ZODB 3 FileStorage."""
from cPickle import dumps, Pickler, Unpickler
from cStringIO import StringIO
from ZODB.FileStorage import FileStorage
from ZODB.zodb4 import z4iterator
class Conversion:
def __init__(self, input_path, output_path):
"""Initialize a ZODB4->ZODB3 FileStorage converter."""
self.instore = IterableFileIterator(input_path)
self.outstore = FileStorage(output_path)
def run(self):
self.instore._read_metadata()
self.outstore.copyTransactionsFrom(self.instore)
self.outstore.close()
self.instore.close()
class IterableFileIterator(z4iterator.FileIterator):
def iterator(self):
return self
def __iter__(self):
baseiter = z4iterator.FileIterator.__iter__(self)
for txn in baseiter:
yield DataRecordConvertingTxn(txn)
class DataRecordConvertingTxn(object):
def __init__(self, txn):
self._txn = txn
self.user = str8(txn.user)
self.description = str8(txn.description)
def __getattr__(self, name):
return getattr(self._txn, name)
def __iter__(self):
for record in self._txn:
record.tid = record.serial
# transform the data record format
# (including persistent references)
sio = StringIO(record.data)
up = Unpickler(sio)
up.persistent_load = PersistentIdentifier
classmeta = up.load()
state = up.load()
sio = StringIO()
p = Pickler(sio, 1)
p.persistent_id = get_persistent_id
p.dump(classmeta)
p.dump(state)
record.data = sio.getvalue()
yield record
class PersistentIdentifier:
def __init__(self, ident):
if isinstance(ident, tuple):
self._oid, (self._class, args) = ident
if args:
# we have args from __getnewargs__(), but can just
# lose them since they're an optimization to allow
# ghost construction
self._class = None
else:
assert isinstance(ident, str)
self._oid = ident
self._class = None
def get_persistent_id(ob):
if isinstance(ob, PersistentIdentifier):
if ob._class is None:
return ob._oid
else:
return ob._oid, ob._class
else:
return None
def str8(s):
# convert unicode strings to 8-bit strings
if isinstance(s, unicode):
# Should we use UTF-8 or ASCII? Not sure.
return s.encode("ascii")
else:
return s
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Script to convert a ZODB 4 file storage to a ZODB 3 file storage.
This is needed since Zope 3 is being changed to use ZODB 3 instead of
ZODB 4.
"""
import getopt
import os
import sys
try:
__file__
except NameError:
__file__ = os.path.realpath(sys.argv[0])
here = os.path.dirname(__file__)
topdir = os.path.dirname(os.path.dirname(here))
# Make sure that if we're run as a script, we can import the ZODB
# package and our sibling modules.
try:
import ZODB.zodb4
except ImportError:
sys.path.append(topdir)
import ZODB.zodb4
from ZODB.lock_file import LockFile
from ZODB.zodb4 import conversion
class ConversionApp:
def __init__(self, name=None, args=None):
if name is None:
name = os.path.basename(sys.argv[0])
if args is None:
args = sys.argv[1:]
self.name = name
self.verbosity = 0
self.dbfile = None
self.parse_args(args)
def run(self):
if not os.path.exists(self.dbfile):
self.error("input database does not exist: %s" % self.dbfile)
base, ext = os.path.splitext(self.dbfile)
if ext != ".fs":
base = self.dbfile
self.dbindex = self.dbfile + ".index"
self.bakfile = base + ".fs4"
self.bakindex = self.bakfile + ".index"
if os.path.exists(self.bakfile):
self.error("backup database already exists: %s\n"
"please move aside and try again" % self.bakfile)
if os.path.exists(self.bakindex):
self.error("backup database index already exists: %s\n"
"please move aside and try again" % self.bakindex)
self.convert()
def convert(self):
lock = LockFile(self.bakfile + ".lock")
try:
# move the ZODB 4 database to be the backup
os.rename(self.dbfile, self.bakfile)
if os.path.exists(self.dbindex):
try:
os.rename(self.dbindex, self.bakindex)
except:
# we couldn't rename *both*, so try to make sure we
# don't rename either
os.rename(self.bakfile, self.dbfile)
raise
# go:
converter = conversion.Conversion(self.bakfile, self.dbfile)
converter.run()
finally:
lock.close()
def parse_args(self, args):
opts, args = getopt.getopt(args, "v", ["verbose"])
for opt, arg in opts:
if opt in ("-v", "--verbose"):
self.verbosity += 1
if len(args) == 0:
# use default location for Data.fs
self.dbfile = os.path.join(topdir, "Data.fs")
elif len(args) == 1:
self.dbfile = args[0]
else:
self.error("too many command-line arguments", rc=2)
def error(self, message, rc=1):
print >>sys.stderr, "%s: %s" % (self.name, message)
sys.exit(rc)
def main():
ConversionApp().run()
if __name__ == "__main__":
main()
# This directory contains a Python package.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the routines to convert between long and 64-bit strings"""
# originally zodb.tests.test_utils
import random
import unittest
NUM = 100
from ZODB.zodb4.z4utils import p64, u64
class TestUtils(unittest.TestCase):
small = [random.randrange(1, 1L<<32, int=long)
for i in range(NUM)]
large = [random.randrange(1L<<32, 1L<<64, int=long)
for i in range(NUM)]
all = small + large
def test_LongToStringToLong(self):
for num in self.all:
s = p64(num)
n2 = u64(s)
self.assertEquals(num, n2, "u64() failed")
def test_KnownConstants(self):
self.assertEquals("\000\000\000\000\000\000\000\001", p64(1))
self.assertEquals("\000\000\000\001\000\000\000\000", p64(1L<<32))
self.assertEquals(u64("\000\000\000\000\000\000\000\001"), 1)
self.assertEquals(u64("\000\000\000\001\000\000\000\000"), 1L<<32)
def test_suite():
return unittest.makeSuite(TestUtils)
if __name__ == "__main__":
unittest.main(defaultTest="test_suite")
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Convenience function extracted from ZODB4's zodb.storage.base."""
def splitrefs(refstr, oidlen=8):
# refstr is a packed string of reference oids. Always return a list of
# oid strings. Most storages use fixed oid lengths of 8 bytes, but if
# the oids in refstr are a different size, use oidlen to specify. This
# does /not/ support variable length oids in refstr.
if not refstr:
return []
num, extra = divmod(len(refstr), oidlen)
fmt = '%ds' % oidlen
assert extra == 0, refstr
return list(struct.unpack('>' + (fmt * num), refstr))
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""FileStorage-specific exceptions."""
from ZODB.zodb4.z4interfaces import _fmt_oid, POSError
# originally from zodb.storage.interfaces
class StorageError(POSError):
"""Base class for storage based exceptions."""
class StorageSystemError(StorageError):
"""Panic! Internal storage error!"""
# originally zodb.storage.file.errors
class FileStorageError(StorageError):
pass
class PackError(FileStorageError):
pass
class FileStorageFormatError(FileStorageError):
"""Invalid file format
The format of the given file is not valid.
"""
class CorruptedError(FileStorageError, StorageSystemError):
"""Corrupted file storage."""
class CorruptedDataError(CorruptedError):
def __init__(self, oid=None, buf=None, pos=None):
self.oid = oid
self.buf = buf
self.pos = pos
def __str__(self):
if self.oid:
msg = "Error reading oid %s. Found %r" % (_fmt_oid(self.oid),
self.buf)
else:
msg = "Error reading unknown oid. Found %r" % self.buf
if self.pos:
msg += " at %d" % self.pos
return msg
class FileStorageQuotaError(FileStorageError, StorageSystemError):
"""File storage quota exceeded."""
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
# originally zodb.storage.file.format
"""Tools for working with the low-level FileStorage format.
Files are arranged as follows.
- The first 1024 bytes are a storage metadata section.
The first two bytes are the characters F and S.
The next two bytes are a storage format version id, currently 43.
The next four bytes are the database version string.
The rest of the section is reserved.
A transaction record consists of:
- 8-byte transaction id, which is also a time stamp.
- 8-byte transaction record length - 8.
- 1-byte status code
- 2-byte length of user name
- 2-byte length of description
- 2-byte length of extension attributes
- user name
- description
- extension attributes
* A sequence of data records
- 8-byte redundant transaction length -8
A data record consists of
- 8-byte oid.
- 8-byte serial, which is a type stamp that matches the
transaction timestamp.
- 8-byte previous-record file-position.
- 8-byte beginning of transaction record file position.
- 2-byte version length
- 4-byte number of object references (oids)
- 8-byte data length
? 8-byte position of non-version data
(if version length > 0)
? 8-byte position of previous record in this version
(if version length > 0)
? version string (if version length > 0)
? reference oids (length == # of oids * 8)
? data (if data length > 0)
? 8-byte position of data record containing data
(data length == 0)
Note that the lengths and positions are all big-endian.
Also, the object ids time stamps are big-endian, so comparisons
are meaningful.
Version handling
There isn't a separate store for versions. Each record has a
version field, indicating what version it is in. The records in a
version form a linked list. Each record that has a non-empty
version string has a pointer to the previous record in the version.
Version back pointers are retained *even* when versions are
committed or aborted or when transactions are undone.
There is a notion of 'current' version records, which are the
records in a version that are the current records for their
respective objects. When a version is comitted, the current records
are committed to the destination version. When a version is
aborted, the current records are aborted.
When committing or aborting, we search backward through the linked
list until we find a record for an object that does not have a
current record in the version. If we find a record for which the
non-version pointer is the same as the previous pointer, then we
forget that the corresponding object had a current record in the
version. This strategy allows us to avoid searching backward through
previously committed or aborted version records.
Of course, we ignore records in undone transactions when committing
or aborting.
Backpointers
When we commit or abort a version, we don't copy (or delete)
and data. Instead, we write records with back pointers.
A version record *never* has a back pointer to a non-version
record, because we never abort to a version. A non-version record
may have a back pointer to a version record or to a non-version
record.
"""
import logging
import struct
from ZODB.zodb4.z4base import splitrefs
from ZODB.zodb4.z4interfaces import ZERO, MAXTID, POSKeyError, _fmt_oid
from ZODB.zodb4.z4utils import u64, p64
from ZODB.zodb4.z4errors \
import CorruptedDataError, CorruptedError, FileStorageFormatError
# the struct formats for the headers
TRANS_HDR = ">8sQcHHH"
DATA_HDR = ">8s8sQQHIQ"
# constants to support various header sizes
TRANS_HDR_LEN = 23
DATA_HDR_LEN = 46
DATA_VERSION_HDR_LEN = 62
assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
logger = logging.getLogger("zodb.storage.file")
def panic(message, *data):
logger.critical(message, *data)
raise CorruptedError(message % data)
class FileStorageFormatter:
"""Mixin class that can read and write the low-level format."""
# subclasses must provide _file
def _read_index(self, index, vindex, tindex, stop=MAXTID,
ltid=ZERO, start=None, maxoid=ZERO, recover=0,
read_only=0):
"""Scan the entire file storage and recreate the index.
Returns file position, max oid, and last transaction id. It also
stores index information in the three dictionary arguments.
Arguments:
index -- dictionary, oid -> data record
vindex -- dictionary, oid -> data record for version data
tindex -- dictionary, oid -> data record
XXX tindex is cleared before return, so it will be empty
There are several default arguments that affect the scan or the
return values. XXX should document them.
The file position returned is the position just after the last
valid transaction record. The oid returned is the maximum object
id in the data. The transaction id is the tid of the last
transaction.
"""
self._file.seek(0, 2)
file_size = self._file.tell()
self._file.seek(0)
if start is None:
start = self._metadata_size
if file_size:
if file_size < start:
raise FileStorageFormatError(self._file.name)
self._read_metadata()
else:
if not read_only:
self._write_metadata()
return self._metadata_size, maxoid, ltid
pos = start
self._file.seek(start)
tid = '\0' * 7 + '\1'
while True:
# Read the transaction record
h = self._file.read(TRANS_HDR_LEN)
if not h:
break
if len(h) != TRANS_HDR_LEN:
if not read_only:
logger.warn('%s truncated at %s', self._file.name, pos)
self._file.seek(pos)
self._file.truncate()
break
tid, tl, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
if el < 0:
el = t32 - el
if tid <= ltid:
logger.warn("%s time-stamp reduction at %s",
self._file.name, pos)
ltid = tid
if pos+(tl+8) > file_size or status=='c':
# Hm, the data were truncated or the checkpoint flag
# wasn't cleared. They may also be corrupted, in
# which case, we don't want to totally lose the data.
if not read_only:
logger.warn("%s truncated, possibly due "
"to damaged records at %s",
self._file.name, pos)
_truncate(self._file, self._file.name, pos)
break
if status not in ' up':
logger.warn('%s has invalid status, %s, at %s',
self._file.name, status, pos)
if tl < (TRANS_HDR_LEN+ul+dl+el):
# We're in trouble. Find out if this is bad data in
# the middle of the file, or just a turd that Win 9x
# dropped at the end when the system crashed. Skip to
# the end and read what should be the transaction
# length of the last transaction.
self._file.seek(-8, 2)
rtl = u64(self._file.read(8))
# Now check to see if the redundant transaction length is
# reasonable:
if file_size - rtl < pos or rtl < TRANS_HDR_LEN:
logger.critical('%s has invalid transaction header at %s',
self._file.name, pos)
if not read_only:
logger.warn("It appears that there is invalid data "
"at the end of the file, possibly due "
"to a system crash. %s truncated "
"to recover from bad data at end.",
self._file.name)
_truncate(file, self._file.name, pos)
break
else:
if recover:
return pos, None, None
panic('%s has invalid transaction header at %s',
self._file.name, pos)
if tid >= stop:
break
tpos = pos
tend = tpos + tl
if status == 'u':
# Undone transaction, skip it
self._file.seek(tend)
h = self._file.read(8)
if h != stl:
if recover: return tpos, None, None
panic('%s has inconsistent transaction length at %s',
self._file.name, pos)
pos = tend + 8
continue
pos = tpos + (TRANS_HDR_LEN + ul + dl + el)
while pos < tend:
# Read the data records for this transaction
h = self._read_data_header(pos)
dlen = h.recordlen()
tindex[h.oid] = pos
if h.version:
vindex[h.version] = pos
if pos + dlen > tend or h.tloc != tpos:
if recover:
return tpos, None, None
panic("%s data record exceeds transaction record at %s",
self._file.name, pos)
if index.get(h.oid, 0) != h.prev:
if h.prev:
if recover:
return tpos, None, None
logger.error("%s incorrect previous pointer at %s: "
"index says %r record says %r",
self._file.name, pos, index.get(h.oid),
h.prev)
pos += dlen
if pos != tend:
if recover:
return tpos, None, None
panic("%s data records don't add up at %s",
self._file.name, tpos)
# Read the (intentionally redundant) transaction length
self._file.seek(pos)
l = u64(self._file.read(8))
if l != tl:
if recover:
return tpos, None, None
panic("%s redundant transaction length check failed at %s",
self._file.name, pos)
pos += 8
if tindex: # avoid the pathological empty transaction case
_maxoid = max(tindex.keys()) # in 2.2, just max(tindex)
maxoid = max(_maxoid, maxoid)
index.update(tindex)
tindex.clear()
return pos, maxoid, ltid
_metadata_size = 1024
_format_version = "43"
def _read_metadata(self):
# Read the 1K metadata block at the beginning of the storage.
self._file.seek(0)
fs = self._file.read(2)
if fs != "FS":
raise FileStorageFormatError(self._file.name)
fsver = self._file.read(2)
if fsver != self._format_version:
raise FileStorageFormatError(self._file.name)
ver = self._file.read(4)
if ver != "\0" * 4:
self._version = ver
def _write_metadata(self):
# Write the 1K metadata block at the beginning of the storage.
self._file.seek(0)
self._file.write("FS")
self._file.write(self._format_version)
# If self._version is not yet set, write all zeros.
if self._version is not None:
self._file.write(self._version)
else:
self._file.write("\0" * 4)
# Fill the rest with null bytes
self._file.write("\0" * (self._metadata_size - 8))
def _read_num(self, pos):
"""Read an 8-byte number."""
self._file.seek(pos)
return u64(self._file.read(8))
def _read_data_header(self, pos, oid=None):
"""Return a DataHeader object for data record at pos.
If ois is not None, raise CorruptedDataError if oid passed
does not match oid in file.
If there is version data, reads the version part of the header.
If there is no pickle data, reads the back pointer.
"""
self._file.seek(pos)
s = self._file.read(DATA_HDR_LEN)
if len(s) != DATA_HDR_LEN:
raise CorruptedDataError(oid, s, pos)
h = DataHeader.fromString(s)
if oid is not None and oid != h.oid:
raise CorruptedDataError(oid, s, pos)
if h.vlen:
s = self._file.read(16 + h.vlen)
h.parseVersion(s)
if not h.plen:
h.back = u64(self._file.read(8))
return h
def _write_version_header(self, file, pnv, vprev, version):
s = struct.pack(">QQ", pnv, vprev)
file.write(s + version)
def _read_txn_header(self, pos, tid=None):
self._file.seek(pos)
s = self._file.read(TRANS_HDR_LEN)
if len(s) != TRANS_HDR_LEN:
raise CorruptedDataError(tid, s, pos)
h = TxnHeader.fromString(s)
if tid is not None and tid != h.tid:
raise CorruptedDataError(tid, s, pos)
h.user = self._file.read(h.ulen)
h.descr = self._file.read(h.dlen)
h.ext = self._file.read(h.elen)
return h
def _loadBack_impl(self, oid, back, fail):
# shared implementation used by various _loadBack methods
#
# If the backpointer ultimately resolves to 0:
# If fail is True, raise KeyError for zero backpointer.
# If fail is False, return the empty data from the record
# with no backpointer.
while 1:
if not back:
# If backpointer is 0, object does not currently exist.
raise POSKeyError(oid)
h = self._read_data_header(back)
refs = self._file.read(h.nrefs * 8)
if h.plen:
return self._file.read(h.plen), refs, h.serial, back, h.tloc
if h.back == 0 and not fail:
assert h.nrefs == 0
return None, None, h.serial, back, h.tloc
back = h.back
def _loadBack(self, oid, back, fail=True):
data, refs, serial, old, tloc = self._loadBack_impl(oid, back, fail)
return data, serial
def _loadBackPOS(self, oid, back, fail=True):
"""Return position of data record for backpointer."""
data, refs, serial, old, tloc = self._loadBack_impl(oid, back, fail)
return old
def _loadBackTxn(self, oid, back, fail=True):
"""Return data, serial, and txn id for backpointer."""
data, refs, serial, old, tloc = self._loadBack_impl(oid, back, fail)
self._file.seek(tloc)
h = self._file.read(TRANS_HDR_LEN)
tid = h[:8]
refs = splitrefs(refs)
return data, refs, serial, tid
def getTxnFromData(self, oid, back):
"""Return transaction id for data at back."""
h = self._read_data_header(back, oid)
self._file.seek(h.tloc)
# seek to transaction header, where tid is first 8 bytes
return self._file.read(8)
def fail(self, pos, msg, *args):
s = ("%s:%s:" + msg) % ((self._name, pos) + args)
logger.error(s)
raise CorruptedError(s)
def checkTxn(self, th, pos):
if th.tid <= self.ltid:
self.fail(pos, "time-stamp reduction: %s <= %s",
_fmt_oid(th.tid), _fmt_oid(self.ltid))
self.ltid = th.tid
if th.status == "c":
self.fail(pos, "transaction with checkpoint flag set")
if not (th.status == " " or th.status == "p"):
self.fail(pos, "invalid transaction status: %r", th.status)
if th.tlen < th.headerlen():
self.fail(pos, "invalid transaction header: "
"txnlen (%d) < headerlen(%d)", th.tlen, th.headerlen())
def checkData(self, th, tpos, dh, pos):
tend = tpos + th.tlen
if dh.tloc != tpos:
self.fail(pos, "data record does not point to transaction header"
": %d != %d", dh.tloc, tpos)
if pos + dh.recordlen() > tpos + th.tlen:
self.fail(pos, "data record size exceeds transaction size: "
"%d > %d", pos + dh.recordlen(), tpos + th.tlen)
if dh.prev >= pos:
self.fail(pos, "invalid previous pointer: %d", dh.prev)
if dh.back:
if dh.back >= pos:
self.fail(pos, "invalid back pointer: %d", dh.prev)
if dh.nrefs or dh.plen:
self.fail(pos, "data record has back pointer and data")
class DataHeader:
"""Header for a data record."""
__slots__ = (
"oid", "serial", "prev", "tloc", "vlen", "plen", "nrefs", "back",
# These three attributes are only defined when vlen > 0
"pnv", "vprev", "version")
version = ""
back = 0
def __init__(self, oid, serial, prev, tloc, vlen, nrefs, plen):
self.oid = oid
self.serial = serial
self.prev = prev
self.tloc = tloc
self.vlen = vlen
self.nrefs = nrefs
self.plen = plen
def fromString(cls, s):
return cls(*struct.unpack(DATA_HDR, s))
fromString = classmethod(fromString)
def asString(self):
s = struct.pack(DATA_HDR, self.oid, self.serial, self.prev,
self.tloc, self.vlen, self.nrefs, self.plen)
if self.version:
v = struct.pack(">QQ", self.pnv, self.vprev)
return s + v + self.version
else:
return s
def setVersion(self, version, pnv, vprev):
self.version = version
self.vlen = len(version)
self.pnv = pnv
self.vprev = vprev
def parseVersion(self, buf):
self.pnv, self.vprev = struct.unpack(">QQ", buf[:16])
self.version = buf[16:]
def recordlen(self):
rlen = DATA_HDR_LEN + (self.nrefs * 8) + (self.plen or 8)
if self.version:
rlen += 16 + self.vlen
return rlen
class TxnHeader:
"""Header for a transaction record."""
__slots__ = ("tid", "tlen", "status", "user", "descr", "ext",
"ulen", "dlen", "elen")
def __init__(self, tid, tlen, status, ulen, dlen, elen):
self.tid = tid
self.tlen = tlen
self.status = status
self.ulen = ulen
self.dlen = dlen
self.elen = elen
def fromString(cls, s):
return cls(*struct.unpack(TRANS_HDR, s))
fromString = classmethod(fromString)
def asString(self):
s = struct.pack(TRANS_HDR, self.tid, self.tlen, self.status,
self.ulen, self.dlen, self.elen)
return "".join([s, self.user, self.descr, self.ext])
def headerlen(self):
return TRANS_HDR_LEN + self.ulen + self.dlen + self.elen
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
# originally zodb.interfaces
"""ZODB database interfaces and exceptions
The Zope Object Database (ZODB) manages persistent objects using
pickle-based object serialization. The database has a pluggable
storage backend.
The IAppDatabase, IAppConnection, and ITransaction interfaces describe
the public APIs of the database.
The IDatabase, IConnection, and ITransactionAttrs interfaces describe
private APIs used by the implementation.
$Id: z4interfaces.py,v 1.2 2004/02/20 19:01:07 jeremy Exp $
"""
from ZODB.zodb4 import z4utils
from zope.interface import Interface, Attribute
##from transaction.interfaces import ITransaction as _ITransaction
##from transaction.interfaces \
## import TransactionError, RollbackError, ConflictError as _ConflictError
__all__ = [
# Constants
'ZERO',
'MAXTID',
# Exceptions
'POSError',
'POSKeyError',
## 'ConflictError',
## 'ReadConflictError',
## 'DanglingReferenceError',
## 'VersionError',
## 'VersionCommitError',
## 'VersionLockError',
## 'UndoError',
## 'MultipleUndoErrors',
## 'ExportError',
## 'Unsupported',
## 'InvalidObjectReference',
## # Interfaces
## 'IAppConnection',
## 'IConnection',
## 'ITransaction',
## 'ITransactionAttrs',
]
ZERO = '\0'*8
MAXTID = '\377'*8
def _fmt_oid(oid):
return "%016x" % z4utils.u64(oid)
def _fmt_undo(oid, reason):
s = reason and (": %s" % reason) or ""
return "Undo error %s%s" % (_fmt_oid(oid), s)
class POSError(StandardError):
"""Persistent object system error."""
class POSKeyError(KeyError, POSError):
"""Key not found in database."""
def __str__(self):
return _fmt_oid(self.args[0])
##class ConflictError(_ConflictError):
## """Two transactions tried to modify the same object at once.
## This transaction should be resubmitted.
## Instance attributes:
## oid : string
## the OID (8-byte packed string) of the object in conflict
## class_name : string
## the fully-qualified name of that object's class
## message : string
## a human-readable explanation of the error
## serials : (string, string)
## a pair of 8-byte packed strings; these are the serial numbers
## related to conflict. The first is the revision of object that
## is in conflict, the second is the revision of that the current
## transaction read when it started.
## The caller should pass either object or oid as a keyword argument,
## but not both of them. If object is passed, it should be a
## persistent object with an _p_oid attribute.
## """
## def __init__(self, message=None, object=None, oid=None, serials=None):
## if message is None:
## self.message = "database conflict error"
## else:
## self.message = message
## if object is None:
## self.oid = None
## self.class_name = None
## else:
## self.oid = object._p_oid
## klass = object.__class__
## self.class_name = klass.__module__ + "." + klass.__name__
## if oid is not None:
## assert self.oid is None
## self.oid = oid
## self.serials = serials
## def __str__(self):
## extras = []
## if self.oid:
## extras.append("oid %s" % _fmt_oid(self.oid))
## if self.class_name:
## extras.append("class %s" % self.class_name)
## if self.serials:
## extras.append("serial was %s, now %s" %
## tuple(map(_fmt_oid, self.serials)))
## if extras:
## return "%s (%s)" % (self.message, ", ".join(extras))
## else:
## return self.message
## def get_oid(self):
## return self.oid
## def get_class_name(self):
## return self.class_name
## def get_old_serial(self):
## return self.serials[0]
## def get_new_serial(self):
## return self.serials[1]
## def get_serials(self):
## return self.serials
##class ReadConflictError(ConflictError):
## """Conflict detected when object was loaded.
## An attempt was made to read an object that has changed in another
## transaction (eg. another thread or process).
## """
## def __init__(self, message=None, object=None, serials=None):
## if message is None:
## message = "database read conflict error"
## ConflictError.__init__(self, message=message, object=object,
## serials=serials)
##class DanglingReferenceError(TransactionError):
## """An object has a persistent reference to a missing object.
## If an object is stored and it has a reference to another object
## that does not exist (for example, it was deleted by pack), this
## exception may be raised. Whether a storage supports this feature,
## it a quality of implementation issue.
## Instance attributes:
## referer: oid of the object being written
## missing: referenced oid that does not have a corresponding object
## """
## def __init__(self, Aoid, Boid):
## self.referer = Aoid
## self.missing = Boid
## def __str__(self):
## return "from %s to %s" % (_fmt_oid(self.referer),
## _fmt_oid(self.missing))
##class VersionError(POSError):
## """An error in handling versions occurred."""
##class VersionCommitError(VersionError):
## """An invalid combination of versions was used in a version commit."""
##class VersionLockError(VersionError, TransactionError):
## """Can't modify an object that is modified in unsaved version."""
## def __init__(self, oid, version):
## self.oid = oid
## self.version = version
## def __str__(self):
## return "%s locked in version %r" % (_fmt_oid(self.oid),
## self.version)
##class UndoError(POSError):
## """An attempt was made to undo a non-undoable transaction."""
## def __init__(self, oid, reason=None):
## self._oid = oid
## self._reason = reason
## def __str__(self):
## return _fmt_undo(self._oid, self._reason)
##class MultipleUndoErrors(UndoError):
## """Several undo errors occured during a single transaction."""
## def __init__(self, errs):
## # provide an oid and reason for clients that only look at that
## UndoError.__init__(self, *errs[0])
## self._errs = errs
## def __str__(self):
## return "\n".join([_fmt_undo(*pair) for pair in self._errs])
##class ExportError(POSError):
## """An export file doesn't have the right format."""
##class Unsupported(POSError):
## """An feature that is unsupported bt the storage was used."""
##class InvalidObjectReference(POSError):
## """An object contains an invalid reference to another object.
## A reference is invalid if it refers to an object managed
## by a different database connection.
## Attributes:
## obj is the invalid object
## jar is the manager that attempted to store it.
## obj._p_jar != jar
## """
## def __init__(self, obj, jar):
## self.obj = obj
## self.jar = jar
## def __str__(self):
## return "Invalid reference to object %s." % _fmt_oid(self.obj._p_jar)
##class IAppDatabase(Interface):
## """Interface exported by database to applications.
## The database contains a graph of objects reachable from the
## distinguished root object. The root object is a mapping object
## that can contain arbitrary application data.
## There is only rudimentary support for using more than one database
## in a single application. The persistent state of an object in one
## database can not contain a direct reference to an object in
## another database.
## """
## def open(version="", transaction=None, temporary=False, force=False,
## waitflag=True):
## # XXX Most of these arguments should eventually go away
## """Open a new database connection."""
## def abortVersion(version):
## """Abort the locked database version named version."""
## def commitVersion(source, dest=""):
## """Commit changes from locked database version source to dest.
## The default value of dest means commit the changes to the
## default version.
## """
## def pack(time):
## """Pack database to time."""
## def undo(txnid):
## """Undo changes caused by transaction txnid."""
##class IAppConnection(Interface):
## """Interface exported by database connection to applications.
## Each database connection provides an independent copy of the
## persistent object space. ZODB supports multiple threads by
## providing each thread with a separate connection.
## Connections are synchronized through database commits and explicit
## sync() calls. Changes to the object space are only made visible
## when a transaction commits. When a connection commits its
## changes, they become visible to other connections. Changes made
## by other connections are also become visible at this time.
## """
## def root():
## """Return the root of the database."""
## def sync():
## """Process pending invalidations.
## If there is a current transaction, it will be aborted.
## """
## def get(oid):
## """Return object for `oid`.
## The object may be a ghost.
## """
##class IDatabase(Interface):
## """Interface between the database and its connections."""
## def invalidate(oids, conn=None, version=""):
## pass
## def _closeConnection(conn):
## pass
##class IConnection(Interface):
## """Interface required of Connection by ZODB DB.
## The Connection also implements IPersistentDataManager.
## """
## def invalidate(oids):
## """Invalidate a set of oids modified by a single transaction.
## This marks the oids as invalid, but doesn't actually
## invalidate them. The object data will be actually invalidated
## at certain transaction boundaries.
## """
## def reset(version=""):
## """Reset connection to use specified version."""
## def getVersion():
## """Return the version that connection is using."""
## def close():
## pass
## def cacheGC():
## pass
## def add(obj):
## """Add a persistent object to this connection.
## Essentially, set _p_jar and assign _p_oid on the object.
## Raises a TypeError if obj is not persistent. Does nothing if
## obj is already added to this connection.
## """
##class ITransaction(_ITransaction):
## """Extends base ITransaction with with metadata.
## Client code should use this interface to set attributes.
## """
## def note(text):
## """Add the text to the transaction description
## If there previous description isn't empty, a blank line is
## added before the new text.
## """
## def setUser(user_name):
## """Set the transaction user name."""
## def setExtendedInfo(name, value):
## """Set extended information."""
##class ITransactionAttrs(_ITransaction):
## # XXX The following attributes used by storages, so they are part
## # of the interface. But I'd rather not have user code explicitly
## # use the attributes.
## user = Attribute("The user as set by setUser()")
## description = Attribute("A description as set by note()")
## _extension = Attribute(
## """Extended info as set by setExtendedInfo()
## Should be None or a dictionary.""")
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Iterator support for ZODB 4 databases."""
from cPickle import loads
from struct import unpack
from ZODB.zodb4.z4interfaces import ZERO
from ZODB.zodb4.z4utils import u64, splitrefs
from ZODB.zodb4.z4format import FileStorageFormatter, DataHeader, TxnHeader
from ZODB.zodb4.z4format import TRANS_HDR, TRANS_HDR_LEN, DATA_HDR
from ZODB.zodb4.z4format import DATA_HDR_LEN, DATA_VERSION_HDR_LEN
# originally from zodb.storage.file.main
class FileIterator(FileStorageFormatter):
"""Iterate over the transactions in a FileStorage file."""
_ltid = ZERO
## implements(IStorageIterator)
def __init__(self, file):
# - removed start and stop arguments
if isinstance(file, str):
file = open(file, 'rb')
self._file = file
self._read_metadata()
self._file.seek(0,2)
self._file_size = self._file.tell()
self._pos = self._metadata_size
def close(self):
file = self._file
if file is not None:
self._file = None
file.close()
def __iter__(self):
if self._file is None:
# A closed iterator. XXX: Is IOError the best we can do? For
# now, mimic a read on a closed file.
raise IOError("iterator is closed")
file = self._file
seek = file.seek
read = file.read
pos = self._pos
while True:
# Read the transaction record
seek(pos)
h = read(TRANS_HDR_LEN)
if len(h) < TRANS_HDR_LEN:
break
tid, tl, status, ul, dl, el = unpack(TRANS_HDR,h)
if el < 0:
el = (1L<<32) - el
if tid <= self._ltid:
warn("%s time-stamp reduction at %s", self._file.name, pos)
self._ltid = tid
if pos+(tl+8) > self._file_size or status=='c':
# Hm, the data were truncated or the checkpoint flag wasn't
# cleared. They may also be corrupted,
# in which case, we don't want to totally lose the data.
warn("%s truncated, possibly due to damaged records at %s",
self._file.name, pos)
break
if status not in ' p':
warn('%s has invalid status, %s, at %s', self._file.name,
status, pos)
if tl < (TRANS_HDR_LEN+ul+dl+el):
# We're in trouble. Find out if this is bad data in
# the middle of the file, or just a turd that Win 9x
# dropped at the end when the system crashed. Skip to
# the end and read what should be the transaction
# length of the last transaction.
seek(-8, 2)
rtl = u64(read(8))
# Now check to see if the redundant transaction length is
# reasonable:
if self._file_size - rtl < pos or rtl < TRANS_HDR_LEN:
logger.critical('%s has invalid transaction header at %s',
self._file.name, pos)
warn("It appears that there is invalid data at the end of "
"the file, possibly due to a system crash. %s "
"truncated to recover from bad data at end.",
self._file.name)
break
else:
warn('%s has invalid transaction header at %s',
self._file.name, pos)
break
tpos = pos
tend = tpos+tl
pos = tpos+(TRANS_HDR_LEN+ul+dl+el)
# user and description are utf-8 encoded strings
user = read(ul).decode('utf-8')
description = read(dl).decode('utf-8')
e = {}
if el:
try:
e = loads(read(el))
# XXX can we do better?
except:
pass
result = RecordIterator(tid, status, user, description, e, pos,
tend, file, tpos)
pos = tend
# Read the (intentionally redundant) transaction length
seek(pos)
l = u64(read(8))
if l != tl:
warn("%s redundant transaction length check failed at %s",
self._file.name, pos)
break
pos += 8
yield result
class RecordIterator(FileStorageFormatter):
"""Iterate over data records for a transaction in a FileStorage."""
## implements(ITransactionRecordIterator, ITransactionAttrs)
def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
self.tid = tid
self.status = status
self.user = user
self.description = desc
self._extension = ext
self._pos = pos
self._tend = tend
self._file = file
self._tpos = tpos
def __iter__(self):
pos = self._pos
while pos < self._tend:
# Read the data records for this transaction
h = self._read_data_header(pos)
dlen = h.recordlen()
if pos + dlen > self._tend or h.tloc != self._tpos:
warn("%s data record exceeds transaction record at %s",
file.name, pos)
return
pos += dlen
prev_txn = None
if h.plen:
refsdata = self._file.read(h.nrefs * 8)
refs = splitrefs(refsdata)
data = self._file.read(h.plen)
else:
if not h.back:
# If the backpointer is 0, then this transaction
# undoes the object creation. It either aborts
# the version that created the object or undid the
# transaction that created it. Return None
# for data and refs because the backpointer has
# the real data and refs.
data = None
refs = None
else:
data, refs, _s, tid = self._loadBackTxn(h.oid, h.back)
prev_txn = self.getTxnFromData(h.oid, h.back)
yield Record(h.oid, h.serial, h.version, data, prev_txn, refs)
class Record:
"""An abstract database record."""
## implements(IDataRecord)
def __init__(self, oid, serial, version, data, data_txn, refs):
self.oid = oid
self.serial = serial
self.version = version
self.data = data
self.data_txn = data_txn
self.refs = refs
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
# originally zodb.utils
from persistent.TimeStamp import TimeStamp
import struct
import time
from sets import Set
def p64(v):
"""Pack an integer or long into a 8-byte string"""
return struct.pack(">Q", v)
def u64(v):
"""Unpack an 8-byte string into a 64-bit long integer."""
return struct.unpack(">Q", v)[0]
def cp(f1, f2, l):
read = f1.read
write = f2.write
n = 8192
while l > 0:
if n > l:
n = l
d = read(n)
if not d:
break
write(d)
l = l - len(d)
# originally from zodb.storage.base
def splitrefs(refstr, oidlen=8):
# refstr is a packed string of reference oids. Always return a list of
# oid strings. Most storages use fixed oid lengths of 8 bytes, but if
# the oids in refstr are a different size, use oidlen to specify. This
# does /not/ support variable length oids in refstr.
if not refstr:
return []
num, extra = divmod(len(refstr), oidlen)
fmt = '%ds' % oidlen
assert extra == 0, refstr
return list(struct.unpack('>' + (fmt * num), refstr))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment