Commit c7d58894 authored by Jim Fulton

Removed the Berkeley DB storage from the Zope (and ZODB 3) repository.

The storage was added to these repositories in the hope that doing so
would encourage people to test it. The storage is not (yet) usable in
production due to Berkeley DB log handling problems.  It is a
constant source of irritation, both because of errors in the implicit
logic that decides whether to build the BDB extensions when building
Python, and because of the implicit logic used to decide whether the
BDB extension is suitable for use by the storage when running tests.
The BDB storage tests are also rather time consuming.

The final straw was the recent ZODB change that caused many BDB
storage tests to fail. :(
parent e376b855
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Berkeley storage with full undo and versioning support.
$Revision: 1.79 $
"""
import time
import cPickle as pickle
from struct import pack, unpack
from ZODB import POSException
from ZODB.utils import p64, U64
from ZODB.referencesf import referencesf
from persistent.TimeStamp import TimeStamp
from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
from BDBStorage import db, ZERO
from BDBStorage.BerkeleyBase import BerkeleyBase, PackStop, _WorkThread
ABORT = 'A'
COMMIT = 'C'
PRESENT = 'X'
# Special flag for uncreated objects (i.e. Does Not Exist)
DNE = '\377'*8
# DEBUGGING
#DNE = 'nonexist'
from _helper import incr
BDBFULL_SCHEMA_VERSION = 'BF01'
class BDBFullStorage(BerkeleyBase, ConflictResolvingStorage):
def _setupDBs(self):
# Data Type Assumptions:
#
# - Object ids (oid) are 8-bytes
# - Objects have revisions, with each revision being identified by a
# unique serial number. We sometimes refer to 16-byte strings of
# oid+serial as a revision id.
# - Transaction ids (tid) are 8-bytes
# - Version ids (vid) are 8-bytes
# - Data pickles are of arbitrary length
#
# The Full storage uses the following tables:
#
# serials -- {oid -> [tid]}
# Maps oids to txn ids, to make it easy to look up the
# txn id for the current revision of the object. The value
# combined with the oid provides a revision id (revid) which is
# used to point into the other tables.
#
# metadata -- {oid+tid -> vid+nvrevid+lrevid+prevrevid}
# Maps object revisions to object metadata. This mapping is used
# to find other information about a particular concrete object
# revision. Essentially it stitches all the other pieces
# together. The object revision is identified by the tid of the
# transaction in which the object was modified. Normally this
# will be the serial number (IOW, the serial number and tid will
# be the same value), except in the case of abortVersion(). See
# above for details.
#
# vid is the id of the version this object revision was modified
# in. It will be zero if the object was modified in the
# non-version.
#
# nvrevid is the tid pointing to the most current non-version
# object revision. So, if the object is living in a version and
# that version is aborted, the nvrevid points to the object
# revision that will soon be restored. nvrevid will be zero if
# the object was never modified in a version.
#
# lrevid is the tid pointing to object revision's pickle state (I
# think of it as the "live revision id" since it's the state that
# gives life to the object described by this metadata record).
#
# prevrevid is the tid pointing to the previous state of the
# object. This is used for undo. (A worked example of this record
# layout appears just after the end of this table listing.)
#
# txnMetadata -- {tid -> userlen+desclen+user+desc+ext}
# Maps tids to metadata about a transaction.
#
# userlen is the length in bytes of the `user' field, stored as a
# 4-byte unsigned integer (the '>II' format used by _dobegin())
# desclen is the length in bytes of the `desc' field, stored as a
# 4-byte unsigned integer
# user is the user information passed to tpc_finish()
# desc is the description info passed to tpc_finish()
# ext is the extra info passed to tpc_finish(). It is a
# dictionary that we get already pickled by BaseStorage.
#
# pickles -- {oid+serial -> pickle}
# Maps the object revisions to the revision's pickle data.
#
# refcounts -- {oid -> count}
# Maps the oid to the reference count for the object. This
# reference count is updated during the _finish() call. In the
# Full storage the refcounts include all the revisions of the
# object, so it is never decremented except at pack time. When it
# goes to zero, the object is automatically deleted.
#
# txnoids -- {tid -> [oid]}
# Maps transaction ids to the oids of the objects modified by the
# transaction.
#
# pickleRefcounts -- {oid+tid -> count}
# Maps an object revision to the reference count of that
# revision's pickle. In the face of transactional undo, multiple
# revisions can point to a single pickle so that pickle can't be
# garbage collected until there are no revisions pointing to it.
#
# vids -- {version_string -> vid}
# Maps version strings (which are arbitrary) to vids.
#
# versions -- {vid -> version_string}
# Maps vids to version strings.
#
# currentVersions -- {vid -> [oid + tid]}
# Maps vids to the revids of the objects modified in that version
# for all current versions (except the 0th version, which is the
# non-version).
#
# oids -- [oid]
# This is a list of oids of objects that are modified in the
# current uncommitted transaction.
#
# pvids -- [vid]
# This is a list of all the version ids that have been created in
# the current uncommitted transaction.
#
# prevrevids -- {oid -> tid}
# This is a list of previous revision ids for objects which are
# modified by transactionalUndo in the current uncommitted
# transaction. It's necessary to properly handle multiple
# transactionalUndo()'s in a single ZODB transaction.
#
# pending -- {tid -> 'A' | 'C'}
# This is an optional flag which says what to do when the database
# is recovering from a crash. The flag is normally 'A' which
# means any pending data should be aborted. At the start of the
# tpc_finish() this flag will be changed to 'C' which means, upon
# recovery/restart, all pending data should be committed. Outside
# of any transaction (e.g. before the tpc_begin()), there will be
# no pending entry. It is a database invariant that if the
# pending table is empty, the oids, pvids, and prevrevids tables
# must also be empty.
#
# info -- {key -> value}
# This table contains storage metadata information. The keys and
# values are simple strings of variable length. Here are the
# valid keys:
#
# packtime - time of the last pack. It is illegal to undo to
# before the last pack time.
#
# dbversion - the version of the database serialization
# protocol (reserved for ZODB4)
#
# version - the underlying Berkeley database schema version
#
# objrevs -- {newserial+oid -> oldserial}
# This table collects object revision information for packing
# purposes. Every time a new object revision is committed, we
# write an entry to this table. When we run pack, we iterate from
# the start of this table until newserial > packtime, deleting old
# revisions of objects. Note that when a new revision of an
# object is first written to a version, no entry is written here.
# We do write an entry when we commit or abort the version.
#
# delqueue -- [oid]
# This is a Queue (like oidqueue below), not a BTree. It is used during pack to
# list objects for which no more references exist, such that the
# objects can be completely packed away.
#
# packmark -- [oid]
# Every object reachable from the root during a classic pack
# operation will have its oid present in this table.
#
# oidqueue -- [oid]
# This table is a Queue, not a BTree. It is used during the mark
# phase of pack() and contains a list of oids for work to be done.
#
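#
# As a worked example of the fixed-width record layouts described in
# the table above (the values below are hypothetical, for illustration
# only): a metadata record is simply the concatenation of four 8-byte
# strings, so it can be built and taken apart with struct:
#
#     >>> from struct import pack, unpack
#     >>> vid, nvrevid, lrevid, prevrevid = '\0'*8, '\0'*8, 'A'*8, 'B'*8
#     >>> record = vid + nvrevid + lrevid + prevrevid
#     >>> len(record)
#     32
#     >>> unpack('>8s8s8s8s', record)[2] == lrevid
#     True
#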
self._packing = False
self._info = self._setupDB('info')
self._serials = self._setupDB('serials', db.DB_DUP)
self._pickles = self._setupDB('pickles')
self._refcounts = self._setupDB('refcounts')
# Temporary tables which keep information during ZODB transactions
self._oids = self._setupDB('oids')
self._pvids = self._setupDB('pvids')
self._prevrevids = self._setupDB('prevrevids')
self._pending = self._setupDB('pending')
# Other tables
self._vids = self._setupDB('vids')
self._versions = self._setupDB('versions')
self._currentVersions = self._setupDB('currentVersions', db.DB_DUP)
self._metadata = self._setupDB('metadata')
self._txnMetadata = self._setupDB('txnMetadata')
self._txnoids = self._setupDB('txnoids', db.DB_DUP)
self._pickleRefcounts = self._setupDB('pickleRefcounts')
# Tables to support packing.
self._objrevs = self._setupDB('objrevs', db.DB_DUP)
self._packmark = self._setupDB('packmark')
self._oidqueue = self._setupDB('oidqueue', 0, db.DB_QUEUE, 16)
self._delqueue = self._setupDB('delqueue', 0, db.DB_QUEUE, 8)
# Do recovery and consistency checks
self._withlock(self._dorecovery)
def _version_check(self, txn):
version = self._info.get('version')
if version is None:
self._info.put('version', BDBFULL_SCHEMA_VERSION, txn=txn)
elif version <> BDBFULL_SCHEMA_VERSION:
raise POSException.StorageSystemError, 'incompatible storage version'
def _dorecovery(self):
# If these tables are non-empty, it means we crashed during a pack
# operation. I think we can safely throw out this data since the next
# pack operation will reproduce it faithfully.
txn = self._env.txn_begin()
try:
self._oidqueue.truncate(txn)
self._packmark.truncate(txn)
except:
txn.abort()
raise
else:
txn.commit()
# The pendings table may have entries if we crashed before we could
# abort or commit the outstanding ZODB transaction.
pendings = self._pending.keys()
assert len(pendings) <= 1
if len(pendings) == 0:
assert len(self._oids) == 0
assert len(self._pvids) == 0
assert len(self._prevrevids) == 0
else:
# Do recovery
tid = pendings[0]
flag = self._pending.get(tid)
assert flag in (ABORT, COMMIT)
if flag == ABORT:
self.log('aborting pending transaction %r', tid)
self._withtxn(self._doabort, tid)
else:
self.log('recovering pending transaction %r', tid)
self._withtxn(self._docommit, tid)
# Initialize our cache of the next available version id.
c = self._versions.cursor()
try:
rec = c.last()
finally:
c.close()
if rec:
# Convert to a Python long integer. Note that cursor.last()
# returns key/value, and we want the key (which for the
# versions table is the vid).
self._nextvid = U64(rec[0])
else:
self._nextvid = 0L
# Initialize the last transaction
c = self._txnoids.cursor()
try:
rec = c.last()
finally:
c.close()
if rec:
self._ltid = rec[0]
else:
self._ltid = ZERO
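# Note on the 8-byte ids used above: vids, tids and refcounts are all
# stored as 8-byte big-endian strings and converted to and from Python
# (long) integers with the p64()/U64() helpers imported from
# ZODB.utils. A tiny hypothetical round trip:
#
#     >>> from ZODB.utils import p64, U64
#     >>> U64(p64(42)) == 42
#     True
#     >>> len(p64(42))
#     8
#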
def _make_autopacker(self, event):
config = self._config
return _Autopack(self, event,
config.frequency, config.packtime, config.gcpack)
def _doabort(self, txn, tid):
# First clean up the oid indexed (or oid+tid indexed) tables.
co = cs = ct = cv = cr = None
try:
co = self._oids.cursor(txn=txn)
cs = self._serials.cursor(txn=txn)
ct = self._txnoids.cursor(txn=txn)
cv = self._currentVersions.cursor(txn=txn)
cr = self._objrevs.cursor(txn=txn)
rec = co.first()
while rec:
oid = rec[0]
rec = co.next()
try:
cs.set_both(oid, tid)
except db.DBNotFoundError:
pass
else:
cs.delete()
try:
ct.set_both(tid, oid)
except db.DBNotFoundError:
pass
else:
ct.delete()
# Now clean up the revision-indexed tables
revid = oid+tid
vid = self._metadata[revid][:8]
self._metadata.delete(revid, txn=txn)
# If the transaction performed an operation that did not
# write a new pickle, e.g. a version operation or undo,
# there will be metadata but no pickle.
try:
self._pickles.delete(revid, txn=txn)
except db.DBNotFoundError:
pass
# Clean up the object revisions table
try:
cr.set(oid+tid)
except db.DBNotFoundError:
pass
else:
cr.delete()
# Now we have to clean up the currentVersions table
if vid <> ZERO:
cv.set_both(vid, revid)
cv.delete()
finally:
# There's a small window of opportunity for leaking cursors here,
# if one of the earlier closes were to fail. In practice this
# shouldn't happen.
if co: co.close()
if cs: cs.close()
if cv: cv.close()
if cr: cr.close()
# Now clean up the vids and versions tables
cpv = self._pvids.cursor(txn=txn)
try:
rec = cpv.first()
while rec:
vid = rec[0]
rec = cpv.next()
version = self._versions[vid]
self._versions.delete(vid, txn=txn)
self._vids.delete(version, txn=txn)
finally:
cpv.close()
# Now clean up the tid indexed table, and the temporary log tables
self._txnMetadata.delete(tid, txn=txn)
self._oids.truncate(txn)
self._pvids.truncate(txn)
self._prevrevids.truncate(txn)
self._pending.truncate(txn)
def _abort(self):
pendings = self._pending.keys()
if len(pendings) == 0:
# Nothing to abort
assert len(self._oids) == 0
assert len(self._pvids) == 0
assert len(self._prevrevids) == 0
return
assert len(pendings) == 1
tid = pendings[0]
flag = self._pending.get(tid)
assert flag == ABORT
self._withtxn(self._doabort, tid)
def _docommit(self, txn, tid):
self._pending.put(self._tid, COMMIT, txn)
# Almost all the data's already written by now so we don't need to do
# much more than update reference counts. Even there, our work is
# easy because we're not going to decref anything here.
deltas = {}
co = cs = None
try:
co = self._oids.cursor(txn=txn)
cs = self._serials.cursor(txn=txn)
rec = co.first()
while rec:
oid = rec[0]
rec = co.next()
# Get the pointer to the live pickle data for this revision
metadata = self._metadata[oid + self._tid]
lrevid = unpack('>8s', metadata[16:24])[0]
# Incref all objects referenced by this pickle, but watch out
# for the George Bailey Event, which has no pickle.
if lrevid <> DNE:
revid = oid + lrevid
data = self._pickles[revid]
self._update(deltas, data, 1)
# Incref this pickle; there's a new revision pointing to it
refcount = self._pickleRefcounts.get(revid, ZERO, txn=txn)
self._pickleRefcounts.put(revid, incr(refcount, 1),
txn=txn)
# Now delete all entries from the serials table where the
# stored tid is not equal to the committing tid.
srec = cs.set(oid)
while srec:
soid, data = srec
if soid <> oid:
break
if len(data) == 8:
stid = data
else:
# In the face of abortVersion, the first half is the
# serial number and the second half is the tid.
stid = data[8:]
if stid <> tid:
cs.delete()
srec = cs.next_dup()
finally:
# There's a small window of opportunity for leaking a cursor here,
# if co.close() were to fail. In practice this shouldn't happen.
if co: co.close()
if cs: cs.close()
# Now incref all references
for oid, delta in deltas.items():
refcount = self._refcounts.get(oid, ZERO, txn=txn)
self._refcounts.put(oid, incr(refcount, delta), txn=txn)
# Now clean up the temporary log tables
self._pvids.truncate(txn)
self._prevrevids.truncate(txn)
self._pending.truncate(txn)
self._oids.truncate(txn)
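# Two helpers used above are defined elsewhere and deserve a sketch.
# _update() is inherited from BerkeleyBase and incr() comes from the
# BDBStorage._helper extension module; neither is shown in this file,
# so the pure-Python equivalents below are assumptions for
# illustration only:
#
#     def _update(self, deltas, data, incdec):
#         # Accumulate per-oid refcount deltas for every object the
#         # pickle `data' references.
#         refoids = []
#         referencesf(data, refoids)
#         for roid in refoids:
#             deltas[roid] = deltas.get(roid, 0) + incdec
#
#     def incr(refcount, delta):
#         # Add delta to an 8-byte big-endian counter string.
#         return p64(U64(refcount) + delta)
#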
def _dobegin(self, txn, tid, u, d, e):
# It's more convenient to store the transaction metadata now, rather
# than in the _finish() call. Doesn't matter because if the ZODB
# transaction were to abort, we'd clean this up anyway.
userlen = len(u)
desclen = len(d)
lengths = pack('>II', userlen, desclen)
data = lengths + u + d + e
# When a transaction begins, we set the pending flag to ABORT,
# meaning, if we crash between now and the time we vote, all changes
# will be aborted.
self._pending.put(tid, ABORT, txn=txn)
self._txnMetadata.put(tid, data, txn=txn)
def _begin(self, tid, u, d, e):
self._withtxn(self._dobegin, self._tid, u, d, e)
def _finish(self, tid, u, d, e):
self._withtxn(self._docommit, self._tid)
self._ltid = tid
#
# Storing an object revision in a transaction
#
def _dostore(self, txn, oid, serial, data, version):
conflictresolved = False
vid = nvrevid = ovid = ZERO
# Check for conflict errors. JF says: under some circumstances,
# it is possible that we'll get two stores for the same object in
# a single transaction. It's not clear though under what
# situations that can occur or what the semantics ought to be.
# For now, we'll assume this doesn't happen.
prev = tid = self._getTidMissingOk(oid)
if tid is None:
# There's never been a previous revision of this object.
prev = ZERO
elif serial != tid:
# The object exists in the database, but the serial number
# given in the call is not the same as the last stored serial
# number. First, attempt application level conflict
# resolution, and if that fails, raise a ConflictError.
rdata = self.tryToResolveConflict(oid, tid, serial, data)
if rdata:
conflictresolved = True
data = rdata
else:
raise POSException.ConflictError(
oid=oid, serials=(tid, serial), data=data)
# Do we already know about this version? If not, we need to record
# the fact that a new version is being created. version will be the
# empty string when the transaction is storing on the non-version
# revision of the object.
if version:
vid = self._findcreatevid(version, txn)
# Now get some information and do some checks on the old revision of
# the object. We need to get the tid of the previous transaction to
# modify this object. If that transaction is in a version, it must
# be the same version as we're making this change in now.
if tid:
rec = self._metadata[oid + prev]
ovid, onvrevid = unpack('>8s8s', rec[:16])
if ovid == ZERO:
# The last revision of this object was made on the
# non-version, so we don't care where the current change is
# made. But if we're storing this change on a version then
# the non-version revid will be the previous revid
if version:
nvrevid = prev
elif ovid != vid:
# We're trying to make a change on a version that's different
# than the version the current revision is on. Nuh uh.
raise POSException.VersionLockError(
'version mismatch for object %s (was: %s, got: %s)' %
tuple(map(U64, (oid, ovid, vid))))
else:
# We're making another change to this object on this version.
# The non-version revid is the same as for the previous
# revision of the object.
nvrevid = onvrevid
# Now optimistically store data to all the tables
newserial = self._tid
revid = oid + newserial
self._serials.put(oid, newserial, txn=txn)
self._pickles.put(revid, data, txn=txn)
self._metadata.put(revid, vid+nvrevid+newserial+prev, txn=txn)
self._txnoids.put(newserial, oid, txn=txn)
# Update the object revisions table, but only if this store isn't
# the first one of this object in a new version.
if not version or ovid <> ZERO:
self._objrevs.put(newserial+oid, prev, txn=txn)
# Update the log tables
self._oids.put(oid, PRESENT, txn=txn)
if vid <> ZERO:
self._currentVersions.put(vid, revid, txn=txn)
self._pvids.put(vid, PRESENT, txn=txn)
# And return the new serial number
if conflictresolved:
return ResolvedSerial
return newserial
def store(self, oid, serial, data, version, transaction):
# Lock and transaction wrapper
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try:
return self._withtxn(self._dostore, oid, serial, data, version)
finally:
self._lock_release()
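# For reference, store() is normally driven by a ZODB Connection
# through the standard two-phase commit protocol inherited from
# BaseStorage. A minimal hypothetical sequence (`t' is the transaction
# object, `oldserial' the serial previously loaded for oid):
#
#     storage.tpc_begin(t)
#     newserial = storage.store(oid, oldserial, data, '', t)
#     storage.tpc_vote(t)
#     storage.tpc_finish(t)
#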
def _dorestore(self, txn, oid, serial, data, version, prev_txn):
tid = self._tid
vid = nvrevid = ovid = ZERO
prevrevid = prev_txn
# self._tid contains the transaction id as set by
# BaseStorage.tpc_begin().
revid = oid + tid
# Calculate and write the entries for version ids
if version:
vid = self._findcreatevid(version, txn)
# Calculate the previous revision id for this object, but only if we
# weren't told what to believe, via prev_txn
if prevrevid is None:
# Get the metadata for the current revision of the object
crevid = self._getTidMissingOk(oid)
if crevid is None:
# There's never been a previous revision of this object
prevrevid = ZERO
else:
prevrevid = crevid
# Get the metadata for the previous revision, so that we can dig out
# the non-version revid, but only if there /is/ a previous revision
if prevrevid <> ZERO:
try:
ovid, onvrevid = unpack(
'>8s8s', self._metadata[oid+prevrevid][:16])
except KeyError:
# prev_txn is just a hint. If the transaction it points to
# does not exist, perhaps because it's been packed away, just
# ignore it. Also, check to see if the data matches. If
# not...
prevrevid = ZERO
else:
if ovid == ZERO:
# The last revision of this object was made on the
# non-version, so we don't care where the current change is
# made. But if we're storing this change on a version
# then the non-version revid will be the previous revid
if version:
nvrevid = prevrevid
else:
# We're making another change to this object on this
# version. The non-version revid is the same as for the
# previous revision of the object.
nvrevid = onvrevid
# Check for George Bailey Events
if data is None:
lrevid = DNE
else:
# Store the pickle record. Remember that the reference counts are
# updated in _docommit().
self._pickles.put(revid, data, txn=txn)
lrevid = tid
self._serials.put(oid, serial, txn=txn)
# Update the rest of the tables
self._metadata.put(revid, vid+nvrevid+lrevid+prevrevid, txn=txn)
self._txnoids.put(tid, oid, txn=txn)
self._oids.put(oid, PRESENT, txn=txn)
if vid <> ZERO:
self._currentVersions.put(vid, revid, txn=txn)
# Update the object revisions table, but only if this store isn't
# the first one of this object in a new version.
if not version or ovid <> ZERO:
self._objrevs.put(tid+oid, prevrevid, txn=txn)
def restore(self, oid, serial, data, version, prev_txn, transaction):
# A lot like store() but without all the consistency checks. This
# should only be used when we /know/ the data is good, hence the
# method name. While the signature looks like store() there are some
# differences:
#
# - serial is the serial number of /this/ revision, not of the
# previous revision. It is used instead of self._tid, which is
# ignored.
#
# - Nothing is returned
#
# - data can be None, which indicates a George Bailey object
# (i.e. one whose creation has been transactionally undone).
#
# prev_txn is a backpointer. In the original database, it's possible
# that the data was actually living in a previous transaction. This
# can happen for transactional undo and other operations, and is used
# as a space saving optimization. Under some circumstances the
# prev_txn may not actually exist in the target database (i.e. self)
# for example, if it's been packed away. In that case, the prev_txn
# should be considered just a hint, and is ignored if the transaction
# doesn't exist.
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try:
self._withtxn(
self._dorestore, oid, serial, data, version, prev_txn)
finally:
self._lock_release()
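# restore() is normally driven by BaseStorage.copyTransactionsFrom()
# or by replication tools rather than called directly. A minimal
# hypothetical sequence mirroring the store() example above, where
# `serial' is the tid of the revision being copied and prev_txn is the
# optional backpointer:
#
#     storage.tpc_begin(t, tid)
#     storage.restore(oid, serial, data, '', None, t)
#     storage.tpc_vote(t)
#     storage.tpc_finish(t)
#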
#
# Things we can do in and to a version
#
def _findcreatevid(self, version, txn):
# Get the vid associated with a version string, or create one if there
# is no vid for the version. If we're creating a new version entry,
# we need to update the pvids table in case the transaction currently in
# progress gets aborted.
vid = self._vids.get(version)
if vid is None:
self._nextvid += 1
# Convert the version id into an 8-byte string
vid = p64(self._nextvid)
# Now update the vids/versions tables, along with the log table
self._vids.put(version, vid, txn=txn)
self._versions.put(vid, version, txn=txn)
self._pvids.put(vid, PRESENT, txn=txn)
return vid
def _doAbortVersion(self, txn, version):
vid = self._vids.get(version)
if vid is None:
raise POSException.VersionError, 'not a version: %s' % version
# We need to keep track of the oids that are affected by the abort so
# that we can return them to the connection, which must invalidate the
# objects so they can be reloaded.
rtnoids = {}
c = self._currentVersions.cursor(txn)
try:
try:
rec = c.set_range(vid)
except db.DBNotFoundError:
rec = None
while rec:
cvid, revid = rec
if cvid <> vid:
# No more objects modified in this version
break
oid = revid[:8]
if rtnoids.has_key(oid):
# We've already dealt with this oid
c.delete()
rec = c.next()
continue
# This object was modified
rtnoids[oid] = True
# Calculate the values for the new transaction metadata
tid = self._getTid(oid)
meta = self._metadata[oid+tid]
curvid, nvrevid = unpack('>8s8s', meta[:16])
assert curvid == vid
if nvrevid <> ZERO:
# Get the non-version data for the object. We're mostly
# interested in the lrevid, i.e. the pointer to the pickle
# data in the non-version
nvmeta = self._metadata[oid+nvrevid]
xcurvid, xnvrevid, lrevid = unpack('>8s8s8s', nvmeta[:24])
assert xcurvid == ZERO
assert xnvrevid == ZERO
else:
# This object was created in the version, so there's no
# non-version data that might have an lrevid.
lrevid = DNE
# Write all the new data to the serials and metadata tables.
newserial = self._tid
self._serials.put(oid, newserial, txn=txn)
self._metadata.put(oid+newserial, ZERO+ZERO+lrevid+tid,
txn=txn)
self._txnoids.put(newserial, oid, txn=txn)
self._oids.put(oid, PRESENT, txn=txn)
# Now we need to write two records to the object revisions
# table. First, it's the record containing the previous
# serial number, and then it's a record containing the
# non-version serial number (but make sure the object wasn't
# created in the version).
self._objrevs.put(newserial+oid, tid, txn=txn)
self._objrevs.put(newserial+oid, nvrevid, txn=txn)
c.delete()
rec = c.next()
# XXX Should we garbage collect vids and versions? Doing so might
# interact badly with transactional undo because say we delete the
# record of the version here, and then we undo the txn with the
# abortVersion? We'll be left with metadata records that contain
# vids that we know nothing about. So for now, no, we never
# remove stuff from the vids or version tables. I think this is
# fine in practice since the number of versions should be quite
# small over the lifetime of the database. Maybe we can figure
# out a way to do this in the pack operations.
return self._tid, rtnoids.keys()
finally:
c.close()
def abortVersion(self, version, transaction):
# Abort the version, but retain enough information to make the abort
# undoable.
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
# We can't abort the empty version, because it's not a version!
if not version:
raise POSException.VersionError
self._lock_acquire()
try:
return self._withtxn(self._doAbortVersion, version)
finally:
self._lock_release()
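# Both abortVersion() and commitVersion() return a (tid, oids) pair,
# where oids is the list of objects the connection must invalidate.
# A hypothetical call, inside a regular two-phase commit:
#
#     storage.tpc_begin(t)
#     tid, oids = storage.abortVersion('my-version', t)
#     storage.tpc_vote(t)
#     storage.tpc_finish(t)
#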
def _doCommitVersion(self, txn, src, dest):
# Keep track of the oids affected by this commit. See abortVersion()
rtnoids = {}
# Get the version ids associated with the src and dest version strings
svid = self._vids[src]
if not dest:
dvid = ZERO
else:
# Find the vid for the dest version, or create one if necessary.
dvid = self._findcreatevid(dest, txn)
c = self._currentVersions.cursor(txn)
try:
try:
rec = c.set_range(svid)
except db.DBNotFoundError:
rec = None
while rec:
cvid, revid = rec
if cvid <> svid:
# No more objects modified in this version
break
oid = revid[:8]
if rtnoids.has_key(oid):
# We've already dealt with this oid
c.delete()
rec = c.next()
continue
# This object was modified
rtnoids[oid] = True
# Calculate the values for the new transaction metadata
tid = self._getTid(oid)
meta = self._metadata[oid+tid]
curvid, nvrevid, lrevid = unpack('>8s8s8s', meta[:24])
assert curvid == svid
# If we're committing to the non-version, then the nvrevid
# ought to be ZERO too, regardless of what it was for the
# source version.
if not dest:
nvrevid = ZERO
newserial = self._tid
self._serials.put(oid, newserial, txn=txn)
self._metadata.put(oid+newserial, dvid+nvrevid+lrevid+tid,
txn=txn)
self._txnoids.put(newserial, oid, txn=txn)
self._oids.put(oid, PRESENT, txn=txn)
# Now we need to write two records to the object revisions
# table. First, it's the record containing the previous
# serial number, and then it's a record containing the
# non-version serial number. However, if we're committing to
# a different version, don't write the second record.
self._objrevs.put(newserial+oid, tid, txn=txn)
if not dest:
self._objrevs.put(newserial+oid, nvrevid, txn=txn)
c.delete()
rec = c.next()
return self._tid, rtnoids.keys()
finally:
c.close()
def commitVersion(self, src, dest, transaction):
# Commit a source version `src' to a destination version `dest'. It's
# perfectly valid to move an object from one version to another. src
# and dest are version strings, and if we're committing to a
# non-version, dest will be empty.
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
# Sanity checks
if not src or src == dest:
raise POSException.VersionCommitError
self._lock_acquire()
try:
return self._withtxn(self._doCommitVersion, src, dest)
finally:
self._lock_release()
def modifiedInVersion(self, oid):
# Return the version string of the version that contains the most
# recent change to the object. The empty string means the change
# isn't in a version.
self._lock_acquire()
try:
# Let KeyErrors percolate up
tid = self._getTid(oid)
vid = self._metadata[oid+tid][:8]
if vid == ZERO:
# Not in a version
return ''
return self._versions[vid]
finally:
self._lock_release()
def versionEmpty(self, version):
# Return true if version is empty.
self._lock_acquire()
try:
# First, check if we're querying the empty (i.e. non) version
if not version:
c = self._serials.cursor()
try:
rec = c.first()
return not rec
finally:
c.close()
# If the named version doesn't exist or there are no objects in
# the version, then return true.
missing = []
vid = self._vids.get(version, missing)
if vid is missing:
return True
return not self._currentVersions.has_key(vid)
finally:
self._lock_release()
def versions(self, max=None):
# Return the list of current versions, as version strings, up to the
# maximum requested.
retval = []
self._lock_acquire()
try:
c = self._currentVersions.cursor()
try:
rec = c.first()
while rec and (max is None or max > 0):
# currentVersions maps vids to revids (oid+tid), so dig the key out
# of the returned record and look the vid up in the
# vids->versions table.
retval.append(self._versions[rec[0]])
# Since currentVersions has duplicates (i.e. multiple vid
# keys with different oids), get the next record that has
# a different key than the current one.
rec = c.next_nodup()
if max is not None:
max -= 1
return retval
finally:
c.close()
finally:
self._lock_release()
#
# Accessor interface
#
def _load(self, oid, tid, version):
# Get the metadata associated with this revision of the object.
# All we really need is the vid, the non-version revid and the
# pickle pointer revid.
rec = self._metadata[oid+tid]
vid, nvrevid, lrevid = unpack('>8s8s8s', rec[:24])
if lrevid == DNE:
raise KeyError, 'Object does not exist: %r' % oid
# If the object isn't living in a version, or if the version the
# object is living in is the one that was requested, we simply
# return the current revision's pickle.
if vid == ZERO:
return self._pickles[oid+lrevid], tid, ""
if self._versions.get(vid) == version:
return self._pickles[oid+lrevid], tid, version
# The object was living in a version, but not the one requested.
# Semantics here are to return the non-version revision. Allow
# KeyErrors to percolate up (meaning there's no non-version rev).
lrevid = self._metadata[oid+nvrevid][16:24]
return self._pickles[oid+lrevid], tid, ""
def loadEx(self, oid, version):
self._lock_acquire()
try:
# Get the current revision information for the object. As per the
# protocol, let KeyErrors percolate up.
tid = self._getTid(oid)
return self._load(oid, tid, version)
finally:
self._lock_release()
def load(self, oid, version):
return self.loadEx(oid, version)[:2]
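# loadEx() returns a (data, tid, version) triple, while load() returns
# just (data, tid). A hypothetical call for the non-version revision:
#
#     data, serial = storage.load(oid, '')
#     data, serial, version = storage.loadEx(oid, '')
#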
def loadBefore(self, oid, tid):
self._lock_acquire()
try:
c = self._metadata.cursor()
try:
# The range search will return the smallest key greater
# than oid + tid. We need to look at the previous record.
try:
p = c.set_range(oid + tid)
except db.DBNotFoundError:
# If tid > cur tid for oid, then we'll get a not-found
# error. Perhaps the current tid is sufficient?
cur_tid = self._getTid(oid)
# if cur_tid >= tid, set_range() would have worked
assert cur_tid < tid
data, tid, ver = self._load(oid, cur_tid, "")
return data, cur_tid, None
# p is the transaction after the one we're looking for.
# Grab its tid to use as the end tid return value.
next_tid = p[0][8:]
return self._search_before(c, oid, tid, next_tid)
finally:
c.close()
finally:
self._lock_release()
def _search_before(self, c, oid, tid, end_tid):
# Operates on the cursor created by loadBefore().
p = c.prev()
if p is None:
return None
key, rec = p
# If the previous record is for a different oid, then
# there is no matching record.
if key[:8] != oid:
return None
vid, nvrevid, lrevid = unpack(">8s8s8s", rec[:24])
if vid == ZERO:
revid = lrevid
start_tid = key[8:]
else:
revid = start_tid = nvrevid
while True:
rec = c.next()
if rec is None or rec[0][:8] <> oid:
end_tid = None
break
vid = rec[1][:8]
if vid == ZERO:
end_tid = rec[0][8:]
break
data = self._pickles[oid+revid]
return data, start_tid, end_tid
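# Taken together, loadBefore() returns (data, start_tid, end_tid): the
# pickle of the revision of oid that was current just before tid, the
# tid that created that revision, and the tid of the following
# revision (None if the returned revision is still current). A
# hypothetical use:
#
#     result = storage.loadBefore(oid, tid)
#     if result is not None:
#         data, start_tid, end_tid = result
#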
def _getTidMissingOk(self, oid):
# For the object, return the current serial number and transaction id
# of the last transaction that modified the object. Usually these
# will be the same, unless the last transaction was an abortVersion.
# Also note that the serials table is written optimistically so we may
# have multiple entries for this oid. We need to collect them in
# order and return the latest one if the pending flag is COMMIT, or
# the second to latest one if the pending flag is ABORT.
#
# BAW: We must have the application level lock here.
c = self._serials.cursor()
try:
# There can be zero, one, or two entries in the serials table for
# this oid. If there are no entries, raise a KeyError (we know
# nothing about this object).
#
# If there is exactly one entry then this has to be the entry for
# the object, regardless of the pending flag.
#
# If there are two entries, then we need to look at the pending
# flag to decide which to return (there /better/ be a pending flag
# set!). If the pending flag is COMMIT then we've already voted
# so the second one is the good one. If the pending flag is ABORT
# then we haven't yet committed to this transaction so the first
# one is the good one.
serials = []
try:
rec = c.set(oid)
except db.DBNotFoundError:
rec = None
while rec:
serials.append(rec[1])
rec = c.next_dup()
if not serials:
return None
if len(serials) == 1:
return serials[0]
else:
pending = self._pending.get(self._tid)
assert pending in (ABORT, COMMIT), 'pending: %s' % pending
if pending == ABORT:
return serials[0]
else:
return serials[1]
finally:
c.close()
def _getTid(self, oid):
# For the object, return the current serial number and transaction id
# of the last transaction that modified the object. Usually these
# will be the same, unless the last transaction was an abortVersion
tid = self._getTidMissingOk(oid)
if tid is None:
raise KeyError, 'Object does not exist: %r' % oid
return tid
def _loadSerialEx(self, oid, serial, want_version=True):
# Just like loadSerial, except that it returns the pickle data, the
# version this object revision is living in, and a backpointer. The
# backpointer is None if the lrevid for this metadata record is the
# same as the tid. If not, we have a pointer to previously existing
# data, so we return that.
self._lock_acquire()
try:
# Get the pointer to the pickle for the given serial number. Let
# KeyErrors percolate up.
metadata = self._metadata[oid+serial]
vid, nrevid, lrevid = unpack('>8s8s8s', metadata[:24])
if vid == ZERO:
version = ''
else:
version = self._versions[vid]
revid = lrevid
if not want_version and vid != ZERO:
revid = nrevid
# Check for a zombification event, possible with transactional
# undo. Use data==None to specify that.
if revid == DNE:
return None, version, None
backpointer = None
if revid != serial:
# This transaction shares its pickle data with a previous
# transaction. We need to let the caller know, esp. when it's
# the iterator code, so that it can pass this information on.
backpointer = revid
return self._pickles[oid+revid], version, backpointer
finally:
self._lock_release()
def loadSerial(self, oid, serial):
return self._loadSerialEx(oid, serial, want_version=False)[0]
def getTid(self, oid):
# Return the serial number for the current revision of this object,
# irrespective of any versions.
self._lock_acquire()
try:
tid = self._getTid(oid)
# XXX Do we really need to check for DNE?
# See if the object has been uncreated
lrevid = unpack('>8s', self._metadata[oid+tid][16:24])[0]
if lrevid == DNE:
raise KeyError
return tid
finally:
self._lock_release()
def _last_packtime(self):
return self._info.get('packtime', ZERO)
def lastTransaction(self):
"""Return transaction id for last committed transaction"""
return self._ltid
#
# Transactional undo
#
def _undo_current_tid(self, oid, ctid):
# Returns (oid, metadata record, None). The last represents the data
# which will always be None because there's no conflict resolution
# necessary.
vid, nvrevid, lrevid, prevrevid = unpack(
'>8s8s8s8s', self._metadata[oid+ctid])
# We can always undo the last transaction. The prevrevid pointer
# doesn't necessarily point to the previous transaction, if the
# revision we're undoing was itself an undo. Use a cursor to find the
# previous revision of this object.
mdc = self._metadata.cursor()
try:
mdc.set(oid+ctid)
mrec = mdc.prev()
if not mrec or mrec[0][:8] <> oid:
# The previous transaction metadata record doesn't point to
# one for this object. This could be caused by two
# conditions: either we're undoing the creation of the object,
# or the object creation transaction has been packed away.
# Checking the current record's prevrevid will tell us.
return oid, vid+nvrevid+DNE+ctid, None
# BAW: If the serial number of this object record is the same as
# the serial we're being asked to undo, then I think we have a
# problem (since the storage invariant is that it retains only one
# metadata record per object revision).
assert mrec[0][8:] <> ctid, 'storage invariant violated'
# All is good, so just restore this metadata record
return oid, mrec[1], None
finally:
mdc.close()
def _undo_to_same_pickle(self, oid, tid, ctid):
# Returns (oid, metadata record, data). Data will always be None unless
# conflict resolution was necessary and succeeded.
#
# We need to compare the lrevid (pickle pointers) of the transaction
# previous to the current one, and the transaction previous to the one
# we want to undo. If their lrevids are the same, it's undoable
# because we're undoing to the same pickle state.
last_prevrevid = self._metadata[oid+ctid][24:]
target_prevrevid = self._metadata[oid+tid][24:]
if target_prevrevid == last_prevrevid == ZERO:
# We're undoing the object's creation, so the only thing to undo
# from there is the zombification of the object, i.e. the last
# transaction for this object.
vid, nvrevid = unpack('>8s8s', self._metadata[oid+tid][:16])
return oid, vid+nvrevid+DNE+ctid, None
elif target_prevrevid == ZERO or last_prevrevid == ZERO:
# The object's revision is in its initial creation state but we're
# asking for an undo of something other than the initial creation
# state. No, no.
raise POSException.UndoError(
'Undoing mismatched zombification', oid)
last_lrevid = self._metadata[oid+last_prevrevid][16:24]
target_metadata = self._metadata[oid+target_prevrevid]
target_lrevid = target_metadata[16:24]
# If the pickle pointers of the object's last revision and the
# undo-target revision are the same, then the transaction can be
# undone. Note that we cannot test for pickle equality here because
# that would allow us to undo to an arbitrary object history. Imagine
# a boolean object -- if undo tested for equality and not identity,
# then half the time we could undo to an arbitrary point in the
# object's history.
if target_lrevid == last_lrevid:
return oid, target_metadata, None
# Check previous transactional undos done in this transaction
elif target_lrevid == self._prevrevids.get(oid):
return oid, target_metadata, None
else:
# Attempt application level conflict resolution
try:
data = self.tryToResolveConflict(
oid, ctid, tid, self._pickles[oid+target_lrevid])
except POSException.ConflictError:
raise POSException.UndoError, 'Cannot undo transaction'
if data:
return oid, target_metadata, data
raise POSException.UndoError('Cannot undo transaction', oid)
def _dotxnundo(self, txn, tid):
# First, make sure the transaction isn't protected by a pack.
packtime = self._last_packtime()
if tid <= packtime:
raise POSException.UndoError('Transaction cannot be undone')
# Calculate all the oids of objects modified in this transaction
newrevs = []
c = self._txnoids.cursor(txn=txn)
try:
rec = c.set(tid)
while rec:
oid = rec[1]
rec = c.next_dup()
# In order to be able to undo this transaction, we must be
# undoing either the current revision of the object, or we
# must be restoring the exact same pickle (identity compared)
# that would be restored if we were undoing the current
# revision. Otherwise, we attempt application level conflict
# resolution. If that fails, we raise an exception.
ctid = self._getTid(oid)
if ctid == tid:
newrevs.append(self._undo_current_tid(oid, ctid))
else:
newrevs.append(self._undo_to_same_pickle(oid, tid, ctid))
finally:
c.close()
# We've checked all the objects affected by the transaction we're
# about to undo, and everything looks good. So now we'll write the
# new metadata records (and potentially new pickle records).
rtnoids = {}
for oid, metadata, data in newrevs:
newserial = self._tid
revid = oid + self._tid
# If the data pickle is None, then this undo is simply
# re-using a pickle stored earlier. All we need to do then is
# bump the pickle refcount to reflect this new reference,
# which will happen during _docommit(). Otherwise we need to
# store the new pickle data and calculate the new lrevid.
vid, nvrevid, ign, prevrevid = unpack('>8s8s8s8s', metadata)
if data is not None:
self._pickles.put(revid, data, txn=txn)
metadata = vid+nvrevid+newserial+prevrevid
# We need to write all the new records for an object changing in
# this transaction. Note that we only write to the serials table
# if prevrevids hasn't already seen this object, otherwise we'll
# end up with multiple entries in the serials table for the same
# object revision.
if not self._prevrevids.has_key(oid):
self._serials.put(oid, newserial, txn=txn)
self._metadata.put(revid, metadata, txn=txn)
# Only add this oid to txnoids once
if not rtnoids.has_key(oid):
self._prevrevids.put(oid, prevrevid, txn=txn)
self._txnoids.put(newserial, oid, txn=txn)
if vid <> ZERO:
self._currentVersions.put(vid, revid, txn=txn)
self._oids.put(oid, PRESENT, txn=txn)
rtnoids[oid] = True
# Add this object revision to the autopack table
self._objrevs.put(newserial+oid, prevrevid, txn=txn)
return self._tid, rtnoids.keys()
def transactionalUndo(self, tid, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try:
return self._withtxn(self._dotxnundo, tid)
finally:
self._lock_release()
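# A hypothetical undo sequence: pick an undoable transaction id from
# undoLog() and undo it inside a regular two-phase commit. The method
# returns (tid, oids) just like abortVersion()/commitVersion():
#
#     undo_tid = storage.undoLog(0, -1)[0]['id']
#     storage.tpc_begin(t)
#     newtid, oids = storage.transactionalUndo(undo_tid, t)
#     storage.tpc_vote(t)
#     storage.tpc_finish(t)
#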
def _unpack_txnmeta(self, txnmeta):
userlen, desclen = unpack('>2I', txnmeta[:8])
user = txnmeta[8:8+userlen]
desc = txnmeta[8+userlen:8+userlen+desclen]
extdata = txnmeta[8+userlen+desclen:]
# ext is a pickled mapping. Any exceptions are ignored, but XXX can
# we (and FileStorage :) do better?
ext = {}
if extdata:
try:
ext = pickle.loads(extdata)
except Exception, e:
self.log('Error unpickling extension data: %s', e)
return user, desc, ext
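# A round-trip sketch of the txnMetadata record format written by
# _dobegin() and parsed here (the field values are hypothetical):
#
#     >>> u, d, e = 'bob', 'initial import', ''
#     >>> rec = pack('>II', len(u), len(d)) + u + d + e
#     >>> userlen, desclen = unpack('>2I', rec[:8])
#     >>> rec[8:8+userlen]
#     'bob'
#     >>> rec[8+userlen:8+userlen+desclen]
#     'initial import'
#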
def _doundolog(self, first, last, filter):
# Get the last packtime
packtime = self._last_packtime()
i = 0 # first <= i < last
txnDescriptions = [] # the return value
c = self._txnMetadata.cursor()
try:
# We start at the last transaction and scan backwards because we
# can stop early if we find a transaction that is earlier than a
# pack. We still have the potential to scan through all the
# transactions.
rec = c.last()
while rec and i < last:
tid, txnmeta = rec
rec = c.prev()
if tid <= packtime:
break
user, desc, ext = self._unpack_txnmeta(txnmeta)
# Create a dictionary for the TransactionDescription
txndesc = {'id' : tid,
'time' : TimeStamp(tid).timeTime(),
'user_name' : user,
'description': desc,
}
txndesc.update(ext)
# Now call the filter to see if this transaction should be
# added to the return list...
if filter is None or filter(txndesc):
# ...and see if this is within the requested ordinals
if i >= first:
txnDescriptions.append(txndesc)
i += 1
return txnDescriptions
finally:
c.close()
def undoLog(self, first=0, last=-20, filter=None):
# Get a list of transaction ids that can be undone, based on the
# determination of the filter. filter is a function which takes a
# transaction description and returns true or false.
#
# Note that this method has been deprecated by undoInfo() which itself
# has some flaws, but is the best we have now. We don't actually need
# to implement undoInfo() because BaseStorage (which we eventually
# inherit from) mixes in the UndoLogCompatible class which provides an
# implementation written in terms of undoLog().
#
# Interface specifies that if last is < 0, its absolute value is the
# maximum number of transactions to return.
if last < 0:
last = abs(last)
return self._withlock(self._doundolog, first, last, filter)
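# The returned list is a sequence of dictionaries with at least 'id',
# 'time', 'user_name' and 'description' keys (plus anything from the
# extension data). A hypothetical report of the twenty most recent
# undoable transactions, newest first:
#
#     for d in storage.undoLog(0, -20):
#         print U64(d['id']), d['time'], d['description']
#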
def history(self, oid, version=None, size=1, filter=None):
self._lock_acquire()
try:
# Jim says:
#
# This documentation is wrong. I think that the version should
# be ignored. It really shouldn't be in the signature. Zope
# never passes the version argument.
#
# so we ignore `version', which makes our lives a bit easier. We
# start with the most recent revision of the object, then search
# the transaction records backwards until we find enough records.
history = []
tid = self._getTid(oid)
# BAW: Again, let KeyErrors percolate up
while len(history) < size:
# Some information comes out of the revision metadata...
vid, nvrevid, lrevid, previd = unpack(
'>8s8s8s8s', self._metadata[oid+tid])
# ...while other information comes out of the transaction
# metadata.
txnmeta = self._txnMetadata[tid]
userlen, desclen = unpack('>II', txnmeta[:8])
user = txnmeta[8:8+userlen]
desc = txnmeta[8+userlen:8+userlen+desclen]
# Now get the pickle size
data = self._pickles[oid+lrevid]
# Create a HistoryEntry structure, which turns out to be a
# dictionary with some specifically named entries (BAW:
# although this is poorly documented).
if vid == ZERO:
retvers = ''
else:
retvers = self._versions[vid]
# The HistoryEntry object
d = {'time' : TimeStamp(tid).timeTime(),
'user_name' : user,
'description': desc,
'tid' : tid,
'version' : retvers,
'size' : len(data),
}
if filter is None or filter(d):
history.append(d)
# Chase the link backwards to the next most historical
# revision, stopping when we've reached the end.
if previd == ZERO:
break
serial = tid = previd
return history
finally:
self._lock_release()
# Packing
#
# There are two types of pack operations, the classic pack and the
# autopack. Autopack's primary job is to periodically delete non-current
# object revisions. It runs in a thread and has an `autopack time' which
# is essentially just a time in the past at which to autopack to. For
# example, you might set up autopack to run once per hour, packing away
# all revisions that are older than 4 hours. Autopack can also be
# configured to periodically do a classic pack.
#
# Classic pack is like autopack -- it packs away old revisions -- but it
# also does a mark and sweep through all the known objects, looking for
# those that are not root reachable as of the pack time. Such objects are
# also packed away even if they have current revisions in the packable
# transactions, because it means that there is no undo operation that can
# restore the object's reachability. Remember that you cannot undo
# previous to the latest pack time.
#
# Both packing strategies do reference counting, and the refcounts are
# sums of the refcounts of all revisions, so if an object's refcount goes
# to zero, all its object revisions can safely be packed away.
#
# We try to interleave BerkeleyDB transactions and non-pack-lock
# acquisition as granularly as possible so that packing doesn't block
# other operations for too long. But remember we don't use Berkeley locks
# so we have to be careful about our application level locks.
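#
# A hypothetical driver for the two flavors described above, packing
# away all non-current revisions older than one hour (pack()
# additionally performs the mark-and-sweep garbage collection pass):
#
#     import time
#     from ZODB.referencesf import referencesf
#     storage.autopack(time.time() - 3600, gc=False)
#     storage.pack(time.time() - 3600, referencesf)
#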
# First, the public API for classic pack
def pack(self, t, zreferencesf):
"""Perform a pack on the storage.
There are two forms of packing: incremental and full gc. In an
incremental pack, only old object revisions are removed. In a full gc
pack, cyclic garbage detection and removal is also performed.
t is the pack time. All non-current object revisions older than t
will be removed in an incremental pack.
pack() always performs an incremental pack. If the gc flag is True,
then pack() will also perform a garbage collection. Some storages
(e.g. FileStorage) always do both phases in a pack() call. Such
storages should simply ignore the gc flag.
"""
# For all intents and purposes, referencesf here is always going to be
# the same as ZODB.referencesf.referencesf. It's too much of a PITA
# to pass that around to the helper methods, so just assert they're
# the same.
assert zreferencesf == referencesf
self.log('classic pack started')
# A simple wrapper around the bulk of packing, but which acquires a
# lock that prevents multiple packs from running at the same time.
self._packlock.acquire()
self._packing = True
try:
# We don't wrap this in _withtxn() because we're going to do the
# operation across several Berkeley transactions, which allows
# other work to happen (stores and reads) while packing is being
# done.
self._dopack(t)
finally:
self._packing = False
self._packlock.release()
self.log('classic pack finished')
def _dopack(self, t, gc=True):
# BAW: should a pack time in the future be a ValueError? When ZEO is
# involved, t could come from a remote machine with a skewed clock.
# Jim wants us to believe t if it's "close", but our algorithm
# requires synchronicity between the calculation of the pack time and
# the timestamps used in serial numbers.
#
# If a transaction is currently in progress, wait for it to finish
# before calculating the pack time, by acquiring the commit lock.
# This guarantees that the next transaction begins after the pack
# time so that any objects added in that transaction will have a
# serial number greater than the pack time. Such objects will be
# completely ignored for packing purposes.
#
# If we don't do this, then it would be possible for some of the
# current transaction's objects to have been stored with a serial
# number earlier than the pack time, but not yet linked to the root.
# Say that thread 1 starts a transaction, and then thread 2 starts a
# pack. Thread 2 then marks the root-reachable objects, but before
# sweeping, object B is stored by thread 1. If the object linking B
# to the root hasn't been stored by the time of the sweep, B will be
# collected as garbage.
#
# t is a TimeTime (i.e. a time float); convert it to a TimeStamp object,
# using an algorithm similar to what's used in FileStorage. We know
# that our transaction ids, a.k.a. revision ids, are timestamps.
self._commit_lock_acquire()
try:
packtime = min(t, time.time())
finally:
self._commit_lock_release()
t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
packtid = `t0`
# Collect all revisions of all objects earlier than the pack time.
self._lock_acquire()
try:
self._withtxn(self._collect_revs, packtid)
finally:
self._lock_release()
# Collect any objects with refcount zero.
self._lock_acquire()
try:
self._withtxn(self._collect_objs)
finally:
self._lock_release()
# If we're not doing a classic pack, we're done.
if not gc:
return
# Do a mark and sweep for garbage collection. Calculate the set of
# objects reachable from the root. Anything else is a candidate for
# having all their revisions packed away. The set of reachable
# objects lives in the _packmark table.
self._lock_acquire()
try:
self._withtxn(self._mark, packtid)
finally:
self._lock_release()
# Now perform a sweep, using oidqueue to hold all object ids for
# objects which are not root reachable as of the pack time.
self._lock_acquire()
try:
self._withtxn(self._sweep, packtid)
finally:
self._lock_release()
# Once again, collect any objects with refcount zero due to the mark
# and sweep garbage collection pass.
self._lock_acquire()
try:
self._withtxn(self._collect_objs)
finally:
self._lock_release()
def autopack(self, t, gc=False):
"""Perform an autopack pass.
Autopacking is different than classic pack() in that it doesn't do
cyclic garbage detection unless the gc flag is True.
"""
self.log('autopack started (packtime: %s, gc? %s)', t,
(gc and 'yes' or 'no'))
# A simple wrapper around the bulk of packing, but which acquires a
# lock that prevents multiple packs from running at the same time.
self._packlock.acquire()
self._packing = True
try:
# We don't wrap this in _withtxn() because we're going to do the
# operation across several Berkeley transactions, which allows
# other work to happen (stores and reads) while packing is being
# done.
self._dopack(t, gc)
finally:
self._packing = False
self._packlock.release()
self.log('autopack finished')
def _collect_revs(self, txn, packtid):
ct = co = None
try:
co = self._objrevs.cursor(txn=txn)
ct = self._txnoids.cursor(txn=txn)
rec = co.first()
while rec:
if self._stop:
raise PackStop, 'stopped in _collect_revs()'
revid, oldserial = rec
newserial = revid[:8]
oid = revid[8:]
if newserial > packtid:
break
# If the oldserial is ZERO, then this is the first revision of
# the object, and thus no old revision to pack away. We can
# delete this record from objrevs so we won't have to deal
# with it again. Otherwise, we can remove the metadata record
# for this revision and decref the corresponding pickle.
if oldserial <> ZERO:
orevid = oid+oldserial
# It's possible this object revision has already been
# deleted, if the oid points to a decref'd away object
if self._metadata.has_key(orevid):
metadata = self._metadata[orevid]
self._metadata.delete(orevid, txn=txn)
# Decref the pickle
self._decrefPickle(oid, metadata[16:24], txn)
try:
# Remove the txnoids entry. We have to use a cursor
# here because of the set_both().
ct.set_both(oldserial, oid)
except db.DBNotFoundError:
pass
else:
ct.delete()
co.delete()
rec = co.next()
finally:
if co: co.close()
if ct: ct.close()
# Note that before we commit this Berkeley transaction, we also need
# to update the last packtime entry, so we can't have the possibility
# of a race condition with undoLog().
self._info.put('packtime', packtid, txn=txn)
def _decrefPickle(self, oid, lrevid, txn):
if lrevid == DNE:
# There is no pickle data
return
revid = oid + lrevid
refcount = U64(self._pickleRefcounts.get(revid, ZERO)) - 1
assert refcount >= 0
if refcount == 0:
# We can collect this pickle
self._pickleRefcounts.delete(revid, txn=txn)
data = self._pickles[revid]
self._pickles.delete(revid, txn=txn)
deltas = {}
self._update(deltas, data, -1)
self._decref(deltas, txn)
else:
self._pickleRefcounts.put(revid, p64(refcount), txn=txn)
def _decref(self, deltas, txn):
for oid, delta in deltas.items():
refcount = U64(self._refcounts.get(oid, ZERO)) + delta
if refcount > 0:
self._refcounts.put(oid, p64(refcount), txn=txn)
else:
# This object is no longer referenced by any other object in
# the system. We can collect all traces of it.
self._delqueue.append(oid, txn)
def _collect_objs(self, txn):
orec = self._delqueue.consume(txn)
while orec:
if self._stop:
raise PackStop, 'stopped in _collect_objs()'
oid = orec[1]
# Delete the object from the serials table
c = self._serials.cursor(txn)
try:
try:
rec = c.set(oid)
except db.DBNotFoundError:
rec = None
while rec and rec[0] == oid:
if self._stop:
raise PackStop, 'stopped in _collect_objs() loop 1'
c.delete()
rec = c.next_dup()
# We don't need the refcounts any more, but note that if the
# object was never referenced from another object, there may
# not be a refcounts entry.
try:
self._refcounts.delete(oid, txn=txn)
except db.DBNotFoundError:
pass
finally:
c.close()
# Collect all metadata records for this object
c = self._metadata.cursor(txn)
try:
try:
rec = c.set_range(oid)
except db.DBNotFoundError:
rec = None
while rec and rec[0][:8] == oid:
if self._stop:
raise PackStop, 'stopped in _collect_objs() loop 2'
revid, metadata = rec
tid = revid[8:]
c.delete()
rec = c.next()
self._decrefPickle(oid, metadata[16:24], txn)
# Delete the txnoid entry for this revision
ct = self._txnoids.cursor(txn=txn)
try:
ct.set_both(tid, oid)
ct.delete()
finally:
ct.close()
# Clean up version related tables
vid = metadata[:8]
if vid <> ZERO:
cv = self._currentVersions.cursor(txn=txn)
try:
try:
cv.set_both(vid, revid)
except db.DBNotFoundError:
pass
else:
cv.delete()
finally:
cv.close()
# BAW: maybe we want to refcount vids and versions table
# entries, but given the rarity of versions, this
# seems like too much work for too little gain.
finally:
c.close()
# We really do want this down here, since _decrefPickle() could
# add more items to the queue.
orec = self._delqueue.consume(txn)
assert len(self._delqueue) == 0
def _findrev(self, oid, packtid, txn):
# BAW: This could probably be more efficient by not doing so
# much searching, but it would also be more complicated, so the
# tradeoff should be measured.
tid = self._getTid(oid)
c = self._metadata.cursor(txn=txn)
try:
rec = c.set_range(oid)
while rec:
revid, metadata = rec
coid = revid[:8]
ctid = revid[8:]
if coid <> oid or ctid > packtid:
# We found the end of the metadata records for this
# object prior to the pack time.
break
tid = ctid
rec = c.next()
finally:
c.close()
return tid
def _rootset(self, packtid, txn):
# Find the root set for reachability purposes. A root set is a tuple
# of oid and tid. First, the current root object as of the pack time
# is always in the root set. Second, any object revision after the
# pack time that has a back pointer (lrevid) to before the pack time
# serves as another root because some future undo could then revive
# any referenced objects. The root set ends up in the oidqueue.
try:
zerorev = self._findrev(ZERO, packtid, txn)
except KeyError:
# There's no root object
return
self._oidqueue.append(ZERO+zerorev, txn)
c = self._txnoids.cursor(txn)
try:
try:
rec = c.set_range(packtid)
except db.DBNotFoundError:
rec = None
while rec:
tid, oid = rec
revid = oid + tid
rec = c.next()
lrevid = self._metadata[revid][16:24]
if lrevid < packtid:
self._oidqueue.append(revid, txn)
finally:
c.close()
# tid is None if all we care about is that any object revision is present.
def _packmark_has(self, oid, tid, txn):
if tid is None:
return self._packmark.has_key(oid)
c = self._packmark.cursor(txn)
try:
try:
c.set_both(oid, tid)
return True
except db.DBNotFoundError:
return False
finally:
c.close()
def _mark(self, txn, packtid):
# Find the oids for all the objects reachable from the root, as of the
# pack time. To reduce the amount of in-core memory we need to do a
# pack operation, we'll save the mark data in the packmark table. The
# oidqueue is a BerkeleyDB Queue that holds the list of object ids to
# look at next, and by using this we don't need to keep an in-memory
# dictionary.
assert len(self._oidqueue) == 0
# Quick exit for empty storages
if not self._serials:
return
# Start with the root set, iterating over all reachable objects until
# we've traversed the entire object tree.
self._rootset(packtid, txn)
rec = self._oidqueue.consume(txn)
while rec:
if self._stop:
raise PackStop, 'stopped in _mark()'
revid = rec[1]
oid = revid[:8]
tid = revid[8:]
# See if this revision is already in the packmark
if not self._packmark_has(oid, tid, txn):
# BAW: We are more conservative than FileStorage here, since
# any reference to an object keeps all the object references
# alive. FileStorage will collect individual object
# revisions. I think our way is fine since we'll eventually
# collect everything incrementally anyway, and for Berkeley,
# all object revisions add to the refcount total.
self._packmark.put(oid, tid, txn=txn)
# Say there's no root object (as is the case in some of the
# unit tests), and we're looking up oid ZERO.  Then the tid
# will be None.
if tid is not None:
lrevid = self._metadata[oid+tid][16:24]
# Now get the oids of all the objects referenced by this
# object revision
data = self._pickles[oid+lrevid]
refdoids = []
referencesf(data, refdoids)
# And append them to the queue for later
for roid in refdoids:
rtid = self._findrev(roid, tid, txn)
self._oidqueue.append(roid+rtid, txn)
# Pop the next oid off the queue and do it all again
rec = self._oidqueue.consume(txn)
assert len(self._oidqueue) == 0
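# A minimal in-memory sketch of the same mark-phase traversal, using a
# Python list and dict instead of the oidqueue/packmark tables; root_oid
# and load_current_pickle() are hypothetical stand-ins:
#
#   from ZODB.referencesf import referencesf
#   seen = {}
#   queue = [root_oid]
#   while queue:
#       oid = queue.pop(0)
#       if seen.has_key(oid):
#           continue
#       seen[oid] = 1
#       refdoids = []
#       referencesf(load_current_pickle(oid), refdoids)
#       queue.extend(refdoids)
#   # `seen' now holds every oid reachable from the root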
def _sweep(self, txn, packtid):
c = self._serials.cursor(txn=txn)
try:
rec = c.first()
while rec:
if self._stop:
raise PackStop, 'stopped in _sweep()'
oid = rec[0]
rec = c.next()
tid = self._getTid(oid)
# If the current revision of this object is newer than the
# packtid, we'll ignore this object since we only care about
# root reachability as of the pack time.
if tid > packtid:
continue
# Otherwise, if packmark (which knows about all the root
# reachable objects) doesn't have a record for this guy, then
# we can zap it.  Do so by appending to the delqueue.
if not self._packmark_has(oid, None, txn):
self._delqueue.append(oid, txn)
finally:
c.close()
# We're done with the mark table
self._packmark.truncate(txn=txn)
#
# Iterator protocol
#
def iterator(self, start=None, stop=None):
"""Get a transactions iterator for the storage."""
return _TransactionsIterator(self, start, stop)
def _nexttxn(self, tid, first=False):
self._lock_acquire()
c = self._txnMetadata.cursor()
try:
# Berkeley raises DBNotFound exceptions (a.k.a. KeyError) to
# signal that it's at the end of records. Turn these into
# IndexError to signal the end of iteration.
try:
if tid is None:
# We want the first transaction
rec = c.first()
else:
# Get the next transaction after the specified one.
rec = c.set_range(tid)
except KeyError:
raise IndexError
# We're now pointing at the tid >= the requested one. For all
# calls but the first one, tid will be the last transaction id we
# returned, so we really want the next one.
if not first:
rec = c.next()
if rec is None:
raise IndexError
tid, txnmeta = rec
# Now unpack the necessary information.  Don't impedance match the
# status flag (that's done by the caller).
packtime = self._last_packtime()
if tid <= packtime:
packedp = True
else:
packedp = False
user, desc, ext = self._unpack_txnmeta(txnmeta)
return tid, packedp, user, desc, ext
finally:
if c:
c.close()
self._lock_release()
# Other interface assertions
def supportsTransactionalUndo(self):
return True
def supportsUndo(self):
return True
def supportsVersions(self):
return True
class _GetItemBase:
def __getitem__(self, i):
# Ignore the index, since we expect .next() will raise the appropriate
# IndexError when the iterator is exhausted.
return self.next()
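# A small sketch of why delegating __getitem__ to next() is enough here:
# under the old iteration protocol a for loop calls __getitem__(0),
# __getitem__(1), ... until IndexError escapes.  Countdown below is a
# hypothetical example, not part of the storage:
#
#   class Countdown(_GetItemBase):
#       def __init__(self, n):
#           self._n = n
#       def next(self):
#           if self._n == 0:
#               raise IndexError
#           self._n = self._n - 1
#           return self._n
#
#   for i in Countdown(3):
#       print i        # prints 2, 1, 0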
class _TransactionsIterator(_GetItemBase):
"""Provide forward iteration through the transactions in a storage.
Transactions *must* be accessed sequentially (e.g. with a for loop).
"""
def __init__(self, storage, start, stop):
self._storage = storage
self._tid = start
self._stop = stop
self._closed = False
self._first = True
self._iters = []
def __len__(self):
# This is a lie. It's here only for Python 2.1 support for
# list()-ifying these objects.
return 0
# This allows us to pass an iterator as the `other' argument to
# copyTransactionsFrom() in BaseStorage. The advantage here is that we
# can create the iterator manually, e.g. setting start and stop, and then
# just let copyTransactionsFrom() do its thing.
def iterator(self):
return self
def next(self):
"""Return the ith item in the sequence of transaction data.
Items must be accessed sequentially, and are instances of
RecordsIterator. An IndexError will be raised after all of the items
have been returned.
"""
if self._closed:
raise IOError, 'iterator is closed'
# Let IndexErrors percolate up.
tid, packedp, user, desc, ext = self._storage._nexttxn(
self._tid, self._first)
self._first = False
# Did we reach the specified end?
if self._stop is not None and tid > self._stop:
raise IndexError
self._tid = tid
it = _RecordsIterator(self._storage, tid, packedp, user, desc, ext)
self._iters.append(it)
return it
def close(self):
for it in self._iters:
it.close()
self._closed = True
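# A hedged usage sketch for the two iterator classes above; `storage' is
# assumed to be an open BDBFullStorage and the start/stop bounds are
# optional tids:
#
#   it = storage.iterator()
#   try:
#       for txn in it:
#           print repr(txn.tid), txn.status, txn.user, txn.description
#           for rec in txn:
#               print '   ', repr(rec.oid), rec.version, len(rec.data)
#   finally:
#       it.close()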
class _RecordsIterator(_GetItemBase):
"""Provide transaction meta-data and forward iteration through the
transactions in a storage.
Items *must* be accessed sequentially (e.g. with a for loop).
"""
# Transaction id as an 8-byte timestamp string
tid = None
# Transaction status code;
# ' ' -- normal
# 'p' -- Transaction has been packed, and contains incomplete data.
#
# Note that undone ('u') and checkpoint transactions ('c') should not be
# included.
status = None
# The standard transaction metadata
user = None
description = None
_extension = None
def __init__(self, storage, tid, packedp, user, desc, ext):
self._storage = storage
self.tid = tid
# Impedance matching
if packedp:
self.status = 'p'
else:
self.status = ' '
self.user = user
self.description = desc
self._extension = ext
# BAW: touching the storage's private parts!
self._table = self._storage._txnoids
self._cursor = None
self._rec = None
def next(self):
"""Return the ith item in the sequence of record data.
Items must be accessed sequentially, and are instances of Record. An
IndexError will be raised after all of the items have been
returned.
"""
if self._table is None:
# We already exhausted this iterator
raise IndexError
# Initialize a txnoids cursor and set it to the start of the oids
# touched by this transaction. We do this here to ensure the cursor
# is closed if there are any problems. A hole in this approach is if
# the client never exhausts the iterator. Then I think we have a
# problem because I don't think the environment can be closed if
# there's an open cursor, but you also cannot close the cursor if the
# environment is already closed (core dumps), so an __del__ doesn't
# help a whole lot.
try:
if self._cursor is None:
self._cursor = self._table.cursor()
try:
self._rec = self._cursor.set(self.tid)
except db.DBNotFoundError:
pass
# Cursor exhausted?
if self._rec is None:
self.close()
raise IndexError
oid = self._rec[1]
self._rec = self._cursor.next_dup()
data, version, lrevid = self._storage._loadSerialEx(oid, self.tid)
return _Record(oid, self.tid, version, data, lrevid)
except:
self.close()
raise
def close(self):
if self._cursor:
self._cursor.close()
self._cursor = None
# _table == None means the iterator has been exhausted
self._table = None
class _Record:
# Object Id
oid = None
# Object serial number (i.e. revision id)
tid = None
# Version string
version = None
# Data pickle
data = None
# The pointer to the transaction containing the pickle data, if not None
data_txn = None
def __init__(self, oid, tid, version, data, data_txn):
self.oid = oid
self.tid = tid
self.version = version
self.data = data
self.data_txn = data_txn
class _Autopack(_WorkThread):
NAME = 'autopacking'
def __init__(self, storage, event, frequency, packtime, gcpack):
_WorkThread.__init__(self, storage, event, frequency)
self._packtime = packtime
self._gcpack = gcpack
# Bookkeeping
self._lastgc = 0
def _dowork(self):
# Should we do a full gc pack this time?
if self._gcpack <= 0:
dofullgc = False
else:
v = (self._lastgc + 1) % self._gcpack
self._lastgc = v
dofullgc = not v
# Run the full gc phase
self._storage.autopack(time.time() - self._packtime, dofullgc)
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Berkeley storage without undo or versioning.
"""
__version__ = '$Revision: 1.34 $'.split()[-2:][0]
from ZODB import POSException
from ZODB.utils import p64, U64
from ZODB.referencesf import referencesf
from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
from BDBStorage import db, ZERO
from BerkeleyBase import BerkeleyBase, PackStop, _WorkThread
ABORT = 'A'
COMMIT = 'C'
PRESENT = 'X'
BDBMINIMAL_SCHEMA_VERSION = 'BM01'
class BDBMinimalStorage(BerkeleyBase, ConflictResolvingStorage):
def _setupDBs(self):
# Data Type Assumptions:
#
# - Object ids (oid) are 8-bytes
# - Objects have revisions, with each revision being identified by a
# unique serial number.
# - Transaction ids (tid) are 8-bytes
# - Data pickles are of arbitrary length
#
# The Minimal storage uses the following tables:
#
# serials -- {oid -> [serial]}
# Maps oids to serial numbers. Each oid can be mapped to 1 or 2
# serial numbers (this is for optimistic writes). If it maps to
# two serial numbers, then the current one is determined by the
# pending flag (see below).
#
# pickles -- {oid+serial -> pickle}
# Maps the object revisions to the revision's pickle data.
#
# refcounts -- {oid -> count}
# Maps the oid to the reference count for the object. This
# reference count is updated during the _finish() call. When it
# goes to zero, the object is automatically deleted.
#
# oids -- [oid]
# This is a list of oids of objects that are modified in the
# current uncommitted transaction.
#
# pending -- tid -> 'A' | 'C'
# This is an optional flag which says what to do when the database
# is recovering from a crash. The flag is normally 'A' which
# means any pending data should be aborted. At the start of the
# tpc_finish() this flag will be changed to 'C' which means, upon
# recovery/restart, all pending data should be committed. Outside
# of any transaction (e.g. before the tpc_begin()), there will be
# no pending entry. It is a database invariant that if the
# pending table is empty, the oids table must also be empty.
#
# info -- {key -> value}
# This table contains storage metadata information. The keys and
# values are simple strings of variable length. Here are the
# valid keys:
#
# version - the version of the database (reserved for ZODB4)
#
# packmark -- [oid]
# Every object reachable from the root during a classic pack
# operation will have its oid present in this table.
#
# oidqueue -- [oid]
# This table is a Queue, not a BTree. It is used during the mark
# phase of pack() and contains a list of oids for work to be done.
# It is also used during pack to list objects for which no more
# references exist, such that the objects can be completely packed
# away.
#
self._packing = False
self._info = self._setupDB('info')
self._serials = self._setupDB('serials', db.DB_DUP)
self._pickles = self._setupDB('pickles')
self._refcounts = self._setupDB('refcounts')
self._oids = self._setupDB('oids')
self._pending = self._setupDB('pending')
# Tables to support packing.
self._packmark = self._setupDB('packmark')
self._oidqueue = self._setupDB('oidqueue', 0, db.DB_QUEUE, 8)
# Do recovery and consistency checks
pendings = self._pending.keys()
assert len(pendings) <= 1
if len(pendings) == 0:
assert len(self._oids) == 0
else:
# Do recovery
tid = pendings[0]
flag = self._pending.get(tid)
assert flag in (ABORT, COMMIT)
self._lock_acquire()
try:
if flag == ABORT:
self._withtxn(self._doabort, tid)
else:
self._withtxn(self._docommit, tid)
finally:
self._lock_release()
def _version_check(self, txn):
version = self._info.get('version')
if version is None:
self._info.put('version', BDBMINIMAL_SCHEMA_VERSION, txn=txn)
elif version <> BDBMINIMAL_SCHEMA_VERSION:
raise POSException.StorageSystemError(
'incompatible storage version')
def _make_autopacker(self, event):
return _Autopack(self, event, self._config.frequency)
def _doabort(self, txn, tid):
co = cs = None
try:
co = self._oids.cursor(txn=txn)
cs = self._serials.cursor(txn=txn)
rec = co.first()
while rec:
oid = rec[0]
rec = co.next()
try:
cs.set_both(oid, tid)
except db.DBNotFoundError:
pass
else:
cs.delete()
# And delete the pickle table entry for this revision.
self._pickles.delete(oid+tid, txn=txn)
finally:
# There's a small window of opportunity for leaking a cursor here,
# if co.close() were to fail. In practice this shouldn't happen.
if co: co.close()
if cs: cs.close()
# We're done with these tables
self._oids.truncate(txn)
self._pending.truncate(txn)
def _abort(self):
self._withtxn(self._doabort, self._tid)
def _docommit(self, txn, tid):
self._pending.put(self._tid, COMMIT, txn)
deltas = {}
co = cs = None
try:
co = self._oids.cursor(txn=txn)
cs = self._serials.cursor(txn=txn)
rec = co.first()
while rec:
oid = rec[0]
rec = co.next()
# Remove from the serials table all entries with key oid where
# the serial is not tid. These are the old revisions of the
# object. At the same time, we want to collect the oids of
# the objects referred to by this revision's pickle, so that
# later we can decref those reference counts.
srec = cs.set(oid)
while srec:
soid, stid = srec
if soid <> oid:
break
if stid <> tid:
# This is the previous revision of the object, so
# decref its referents and clean up its pickles.
cs.delete()
data = self._pickles.get(oid+stid, txn=txn)
assert data is not None
self._update(deltas, data, -1)
self._pickles.delete(oid+stid, txn=txn)
srec = cs.next_dup()
# Now add incref deltas for all objects referenced by the new
# revision of this object.
data = self._pickles.get(oid+tid, txn=txn)
assert data is not None
self._update(deltas, data, 1)
finally:
# There's a small window of opportunity for leaking a cursor here,
# if co.close() were to fail. In practice this shouldn't happen.
if co: co.close()
if cs: cs.close()
# We're done with this table
self._pending.truncate(txn)
# If we're in the middle of a pack, we need to add to the packmark
# table any objects that were modified in this transaction.
# Otherwise, there's a race condition where mark might have happened,
# then the object is added, then sweep runs, deleting the object
# created in the interim.
if self._packing:
for oid in self._oids.keys():
self._packmark.put(oid, PRESENT, txn=txn)
self._oids.truncate(txn)
# Now, to finish up, we need to apply the refcount deltas to the
# refcounts table, and do recursive collection of all refcount == 0
# objects.
while deltas:
deltas = self._update_refcounts(deltas, txn)
def _update_refcounts(self, deltas, txn):
newdeltas = {}
for oid, delta in deltas.items():
refcount = U64(self._refcounts.get(oid, ZERO, txn=txn)) + delta
assert refcount >= 0
if refcount == 0:
# The reference count for this object has just gone to zero,
# so we can safely remove all traces of it from the serials,
# pickles and refcounts table. Note that before we remove its
# pickle, we need to decref all the objects referenced by it.
current = self._getCurrentSerial(oid)
data = self._pickles.get(oid+current, txn=txn)
self._update(newdeltas, data, -1)
# And delete the serials, pickle and refcount entries. At
# this point, I believe we should have just one serial entry.
self._serials.delete(oid, txn=txn)
assert self._serials.get(oid, txn=txn) is None
self._refcounts.delete(oid, txn=txn)
self._pickles.delete(oid+current, txn=txn)
else:
self._refcounts.put(oid, p64(refcount), txn=txn)
# Return the list of objects referenced by pickles just deleted in
# this round, for decref'ing on the next go 'round.
return newdeltas
def _begin(self, tid, u, d, e):
# When a transaction begins, we set the pending flag to ABORT,
# meaning, if we crash between now and the time we vote, all changes
# will be aborted.
txn = self._env.txn_begin()
try:
self._pending.put(self._tid, ABORT, txn)
except:
txn.abort()
raise
else:
txn.commit()
def _dostore(self, txn, oid, serial, data):
conflictresolved = False
oserial = self._getCurrentSerial(oid)
if oserial is not None and serial <> oserial:
# The object exists in the database, but the serial number
# given in the call is not the same as the last stored serial
# number. Raise a ConflictError.
rdata = self.tryToResolveConflict(oid, oserial, serial, data)
if rdata:
conflictresolved = True
data = rdata
else:
raise POSException.ConflictError(
oid=oid, serials=(oserial, serial), data=data)
# Optimistically write to the serials and pickles tables.  Be sure
# to also update the oids table for this object.
newserial = self._tid
self._serials.put(oid, newserial, txn=txn)
self._pickles.put(oid+newserial, data, txn=txn)
self._oids.put(oid, PRESENT, txn=txn)
# If we're in the middle of a pack, we need to add these objects to
# the packmark, so a specific race condition won't collect them.
# E.g. we do a mark, then we do a store, then we sweep. The objects
# stored between the mark and sweep would get collected away.
if self._packing:
self._packmark.put(oid, PRESENT, txn=txn)
# Return the new serial number for the object
if conflictresolved:
return ResolvedSerial
return newserial
def store(self, oid, serial, data, version, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
# We don't support versions
if version <> '':
raise POSException.Unsupported, 'versions are not supported'
# All updates must be done with the application lock acquired
self._lock_acquire()
try:
return self._withtxn(self._dostore, oid, serial, data)
finally:
self._lock_release()
def _finish(self, tid, u, d, e):
# _docommit() twiddles the pending flag to COMMIT now since after the
# vote call, we promise that the changes will be committed, no matter
# what. The recovery process will check this.
self._withtxn(self._docommit, self._tid)
#
# Accessor interface
#
def _getCurrentSerial(self, oid):
# BAW: We must have the application level lock here.
c = self._serials.cursor()
try:
# There can be zero, one, or two entries in the serials table for
# this oid. If there are no entries, raise a KeyError (we know
# nothing about this object).
#
# If there is exactly one entry then this has to be the entry for
# the object, regardless of the pending flag.
#
# If there are two entries, then we need to look at the pending
# flag to decide which to return (there /better/ be a pending flag
# set!). If the pending flag is COMMIT then we've already voted
# so the second one is the good one. If the pending flag is ABORT
# then we haven't yet committed to this transaction so the first
# one is the good one.
serials = []
try:
rec = c.set(oid)
except db.DBNotFoundError:
rec = None
while rec:
serials.append(rec[1])
rec = c.next_dup()
if not serials:
return None
if len(serials) == 1:
return serials[0]
pending = self._pending.get(self._tid)
assert pending in (ABORT, COMMIT)
if pending == ABORT:
return serials[0]
return serials[1]
finally:
c.close()
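# A hedged restatement of the decision above as a standalone helper;
# `serials' is the list collected by the cursor and `pending' is the
# current pending flag (the function name is hypothetical):
#
#   def pick_current(serials, pending):
#       if not serials:
#           return None
#       if len(serials) == 1:
#           return serials[0]
#       # Two entries: the newer one only counts once we've promised
#       # (voted) to commit.
#       if pending == COMMIT:
#           return serials[1]
#       return serials[0]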
def load(self, oid, version):
if version <> '':
raise POSException.Unsupported, 'versions are not supported'
self._lock_acquire()
try:
# Get the current serial number for this object
serial = self._getCurrentSerial(oid)
if serial is None:
raise KeyError, 'Object does not exist: %r' % oid
# Get this revision's pickle data
return self._pickles[oid+serial], serial
finally:
self._lock_release()
def modifiedInVersion(self, oid):
# So BaseStorage.getTid() just works. Note that this storage
# doesn't support versions.
return ''
#
# Packing. In Minimal storage, packing is only required to get rid of
# object cycles, since there are no old object revisions.
#
def pack(self, t, zreferencesf):
# For all intents and purposes, referencesf here is always going to be
# the same as ZODB.referencesf.referencesf. It's too much of a PITA
# to pass that around to the helper methods, so just assert they're
# the same.
assert zreferencesf == referencesf
self.log('classic pack started')
# A simple wrapper around the bulk of packing, but which acquires a
# lock that prevents multiple packs from running at the same time.
self._packlock.acquire()
# Before setting the packing flag to true, acquire the storage lock
# and clear out the packmark table, in case there's any cruft left
# over from the previous pack.
def clear_packmark(txn):
self._packmark.truncate(txn=txn)
self._withlock(self._withtxn, clear_packmark)
self._packing = True
try:
# We don't wrap this in _withtxn() because we're going to do the
# operation across several Berkeley transactions, which allows
# other work to happen (stores and reads) while packing is being
# done.
#
# Also, we don't care about the pack time, since we don't need to
# collect object revisions
self._dopack()
finally:
self._packing = False
self._packlock.release()
self.log('classic pack finished')
def _dopack(self):
# Do a mark and sweep for garbage collection. Calculate the set of
# objects reachable from the root. Anything else is a candidate for
# having all their revisions packed away. The set of reachable
# objects lives in the _packmark table.
self._withlock(self._withtxn, self._mark)
# Now perform a sweep, using oidqueue to hold all object ids for
# objects which are not root reachable.
self._withlock(self._withtxn, self._sweep)
# Once again, collect any objects with refcount zero due to the mark
# and sweep garbage collection pass.
self._withlock(self._withtxn, self._collect_objs)
def _mark(self, txn):
# Find the oids for all the objects reachable from the root. To
# reduce the amount of in-core memory we need to do a pack operation,
# we'll save the mark data in the packmark table. The oidqueue is a
# BerkeleyDB Queue that holds the list of object ids to look at next,
# and by using this we don't need to keep an in-memory dictionary.
assert len(self._oidqueue) == 0
# Quick exit for empty storages
if not self._serials:
return
# The oid of the object we're looking at, starting at the root
oid = ZERO
# Start at the root, find all the objects the current revision of the
# root references, and then for each of those, find all the objects it
# references, and so on until we've traversed the entire object graph.
while oid:
if self._stop:
raise PackStop, 'stopped in _mark()'
if not self._packmark.has_key(oid):
# We haven't yet seen this object
self._packmark.put(oid, PRESENT, txn=txn)
# Get the pickle data for this object
tid = self._getCurrentSerial(oid)
# Say there's no root object (as is the case in some of the
# unit tests), and we're looking up oid ZERO.  Then the tid
# will be None.
if tid is not None:
data = self._pickles[oid+tid]
# Now get the oids of all the objects referenced by this
# pickle
refdoids = []
referencesf(data, refdoids)
# And append them to the queue for later
for oid in refdoids:
self._oidqueue.append(oid, txn)
# Pop the next oid off the queue and do it all again
rec = self._oidqueue.consume(txn)
oid = rec and rec[1]
assert len(self._oidqueue) == 0
def _sweep(self, txn):
c = self._serials.cursor(txn=txn)
try:
rec = c.first()
while rec:
if self._stop:
raise PackStop, 'stopped in _sweep()'
oid = rec[0]
rec = c.next()
# If packmark (which knows about all the root reachable
# objects) doesn't have a record for this guy, then we can zap
# it. Do so by appending to oidqueue.
if not self._packmark.has_key(oid):
self._oidqueue.append(oid, txn)
finally:
c.close()
# We're done with the mark table
self._packmark.truncate(txn)
def _collect_objs(self, txn):
orec = self._oidqueue.consume(txn)
while orec:
if self._stop:
raise PackStop, 'stopped in _collect_objs()'
oid = orec[1]
# Delete the object from the serials table
c = self._serials.cursor(txn)
try:
try:
rec = c.set(oid)
except db.DBNotFoundError:
rec = None
while rec and rec[0] == oid:
if self._stop:
raise PackStop, 'stopped in _collect_objs() loop 1'
c.delete()
rec = c.next_dup()
# We don't need the refcounts any more, but note that if the
# object was never referenced from another object, there may
# not be a refcounts entry.
try:
self._refcounts.delete(oid, txn=txn)
except db.DBNotFoundError:
pass
finally:
c.close()
# Now collect the pickle data and do reference counting
c = self._pickles.cursor(txn)
try:
try:
rec = c.set_range(oid)
except db.DBNotFoundError:
rec = None
while rec and rec[0][:8] == oid:
if self._stop:
raise PackStop, 'stopped in _collect_objs() loop 2'
data = rec[1]
c.delete()
rec = c.next()
deltas = {}
self._update(deltas, data, -1)
for oid, delta in deltas.items():
refcount = U64(self._refcounts.get(oid, ZERO)) + delta
if refcount <= 0:
self._oidqueue.append(oid, txn)
else:
self._refcounts.put(oid, p64(refcount), txn=txn)
finally:
c.close()
# We really do want this down here, since the refcounting loop above
# could add more items to the queue.
orec = self._oidqueue.consume(txn)
assert len(self._oidqueue) == 0
#
# Stuff we don't support
#
def supportsTransactionalUndo(self):
return False
def supportsUndo(self):
return False
def supportsVersions(self):
return False
# Don't implement these
#
# versionEmpty(self, version)
# versions(self, max=None)
# loadSerial(self, oid, serial)
# getTid(self, oid)
# transactionalUndo(self, tid, transaction)
# undoLog(self, first=0, last=-20, filter=None)
# history(self, oid, version=None, size=1, filter=None)
# iterator(self, start=None, stop=None)
class _Autopack(_WorkThread):
NAME = 'autopacking'
def _dowork(self):
# Run the autopack phase
self._storage.pack('ignored', referencesf)
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Base class for BerkeleyDB-based storage implementations.
"""
import os
import sys
import time
import errno
import shutil
import threading
from types import StringType
# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net
from BDBStorage import db, ZERO
# BaseStorage provides primitives for lock acquisition and release, and a host
# of other methods, some of which are overridden here, some of which are not.
from ZODB.lock_file import lock_file
from ZODB.BaseStorage import BaseStorage
from ZODB.referencesf import referencesf
import zLOG
GBYTES = 1024 * 1024 * 1000
# How long should we wait to join one of the background daemon threads? It's
# a good idea to not set this too short, or we could corrupt our database.
# That would be recoverable, but recovery could take a long time too, so it's
# better to shutdown cleanly.
JOIN_TIME = 10
class PackStop(Exception):
"""Escape hatch for pack operations."""
class BerkeleyConfig:
"""Bag of attributes for configuring Berkeley based storages.
Berkeley databases are wildly configurable, and this class exposes some of
that. To customize these options, instantiate one of these classes and
set the attributes below to the desired value. Then pass this instance to
the Berkeley storage constructor, using the `config' keyword argument.
BerkeleyDB stores all its information in an `environment directory'
(modulo log files, which can be in a different directory, see below). By
default, the `name' argument given to the storage constructor names this
directory, but you can set this option to explicitly point to a different
location:
- envdir if not None, names the BerkeleyDB environment directory. The
directory will be created if necessary, but its parent directory must
exist. Additional configuration is available through the BerkeleyDB
DB_CONFIG mechanism.
Berkeley storages need to be checkpointed occasionally, otherwise
automatic recovery can take a huge amount of time. You should set up a
checkpointing policy which trades off the amount of work done periodically
against the recovery time. Note that the Berkeley environment is
automatically, and forcefully, checkpointed twice when it is closed.
The following checkpointing attributes are supported:
- interval indicates how often, in seconds, a Berkeley checkpoint is
performed. If this is non-zero, checkpointing is performed by a
background thread. Otherwise checkpointing will only be done when the
storage is closed. You really want to enable checkpointing. ;)
- kbyte is passed directly to txn_checkpoint()
- min is passed directly to txn_checkpoint()
You can achieve one of the biggest performance wins by moving the Berkeley
log files to a different disk than the data files. We saw between 2.5 and
7 x better performance this way. Here are attributes which control the
log files.
- logdir if not None, is passed to the environment's set_lg_dir() method
before it is opened.
You can also improve performance by tweaking the Berkeley cache size.
Berkeley's default cache size is 256KB which is usually too small. Our
default cache size is 128MB which seems like a useful tradeoff between
resource consumption and improved performance. You might be able to get
slightly better results by turning up the cache size, although be mindful
of your system's limits. See here for more details:
http://www.sleepycat.com/docs/ref/am_conf/cachesize.html
These attributes control cache size settings:
- cachesize should be the size of the cache in bytes.
These attributes control the autopacking thread:
- frequency is the time in seconds after which an autopack phase will be
performed. E.g. if frequency is 3600, an autopack will be done once per
hour. Set frequency to 0 to disable autopacking (the default).
- packtime is the time in seconds marking the moment in the past at which
to autopack to. E.g. if packtime is 14400, autopack will pack to 4
hours in the past. For Minimal storage, this value is ignored.
- gcpack is an integer indicating how often an autopack phase should do a
full garbage collecting pack.  E.g. if gcpack is 24 and frequency is
3600, a gc pack will be performed once per day. Set to zero to never
automatically do gc packs. For Minimal storage, this value is ignored;
all packs are gc packs.
Here are some other miscellaneous configuration variables:
- read_only causes ReadOnlyErrors to be raised whenever any operation
(except pack!) might modify the underlying database.
"""
envdir = None
interval = 120
kbyte = 0
min = 0
logdir = None
cachesize = 128 * 1024 * 1024
frequency = 0
packtime = 4 * 60 * 60
gcpack = 0
read_only = False
def __repr__(self):
d = self.__class__.__dict__.copy()
d.update(self.__dict__)
return """<BerkeleyConfig (read_only=%(read_only)s):
\tenvironment dir: %(envdir)s
\tcheckpoint interval: %(interval)s seconds
\tcheckpoint kbytes: %(kbyte)s
\tcheckpoint minutes: %(min)s
\t----------------------
\tlogdir: %(logdir)s
\tcachesize: %(cachesize)s bytes
\t----------------------
\tautopack frequency: %(frequency)s seconds
\tpack to %(packtime)s seconds in the past
\tclassic pack every %(gcpack)s autopacks
\t>""" % d
class BerkeleyBase(BaseStorage):
"""Base storage for Minimal and Full Berkeley implementations."""
def __init__(self, name, env=None, prefix='zodb_', config=None):
"""Create a new storage.
name is an arbitrary name for this storage. It is returned by the
getName() method.
Optional env, if given, is either a string or a DBEnv object. If it
is a non-empty string, it names the database environment,
i.e. essentially the name of a directory into which BerkeleyDB will
store all its supporting files. It is passed directly to
DBEnv().open(), which in turn is passed to the BerkeleyDB function
DBEnv->open() as the db_home parameter.
Note that if you want to customize the underlying Berkeley DB
parameters, this directory can contain a DB_CONFIG file as per the
Sleepycat documentation.
If env is given and it is not a string, it must be an opened DBEnv
object as returned by bsddb3.db.DBEnv(). In this case, it is your
responsibility to create the object and open it with the proper
flags.
Optional prefix is the string to prepend to name when passed to
DB.open() as the dbname parameter. IOW, prefix+name is passed to the
BerkeleyDB function DB->open() as the database parameter.  It defaults
to "zodb_".
Optional config must be a BerkeleyConfig instance, or None, which
means to use the default configuration options.
"""
# sanity check arguments
if config is None:
config = BerkeleyConfig()
self._config = config
if name == '':
raise TypeError, 'database name is empty'
if env is None:
env = name
self.log('Creating Berkeley environment')
if env == '':
raise TypeError, 'environment name is empty'
elif isinstance(env, StringType):
self._env, self._lockfile = env_from_string(env, self._config)
else:
self._env = env
# Use the absolute path to the environment directory as the name.
# This should be enough of a guarantee that sortKey() -- which via
# BaseStorage uses the name -- is globally unique.
envdir = os.path.abspath(self._env.db_home)
self.log('Berkeley environment dir: %s', envdir)
BaseStorage.__init__(self, envdir)
self._is_read_only = config.read_only
# Instantiate a pack lock
self._packlock = threading.RLock()
self._stop = self._closed = False
# Initialize a few other things
self._prefix = prefix
# Give the subclasses a chance to interpose into the database setup
# procedure
self._tables = []
self._setupDBs()
self._withtxn(self._version_check)
# Initialize the object id counter.
self._init_oid()
# Set up the checkpointing thread
self.log('setting up threads')
if config.interval > 0:
self._checkpointstop = event = threading.Event()
self._checkpointer = _Checkpoint(self, event, config.interval)
self._checkpointer.start()
else:
self._checkpointer = None
# Set up the autopacking thread
if config.frequency > 0:
self._autopackstop = event = threading.Event()
self._autopacker = self._make_autopacker(event)
self._autopacker.start()
else:
self._autopacker = None
self.log('ready')
def _version_check(self, txn):
raise NotImplementedError
def _make_autopacker(self, event):
raise NotImplementedError
def _setupDB(self, name, flags=0, dbtype=None, reclen=None):
"""Open an individual database with the given flags.
flags are passed directly to the underlying DB.set_flags() call.
Optional dbtype specifies the type of BerkeleyDB access method to
use. Optional reclen if not None gives the record length.
"""
if dbtype is None:
dbtype = db.DB_BTREE
d = db.DB(self._env)
if flags:
d.set_flags(flags)
# Our storage is based on the underlying BSDDB btree database type.
if reclen is not None:
d.set_re_len(reclen)
# DB 4.1 requires that operations happening in a transaction must be
# performed on a database that was opened in a transaction. Since we
# do the former, we must do the latter. However, earlier DB versions
# don't transactionally protect database open, so this is the most
# portable way to write the code.
openflags = db.DB_CREATE
try:
openflags |= db.DB_AUTO_COMMIT
except AttributeError:
pass
d.open(self._prefix + name, dbtype, openflags)
self._tables.append(d)
return d
def _setupDBs(self):
"""Set up the storages databases, typically using '_setupDB'.
This must be implemented in a subclass.
"""
raise NotImplementedError, '_setupDBs()'
def _init_oid(self):
"""Initialize the object id counter."""
# If the `serials' database is non-empty, the last object id in the
# database will be returned (as a [key, value] pair). Use it to
# initialize the object id counter.
#
# If the database is empty, just initialize it to zero.
value = self._serials.cursor().last()
if value:
self._oid = value[0]
else:
self._oid = ZERO
# It can be very expensive to calculate the "length" of the database, so
# we cache the length and adjust it as we add and remove objects.
_len = None
def __len__(self):
"""Return the number of objects in the index."""
if self._len is None:
# The cache has never been initialized. Do it once the expensive
# way.
self._len = len(self._serials)
return self._len
def new_oid(self, last=None):
"""Create a new object id.
If last is provided, the new oid will be one greater than that.
"""
# BAW: the last parameter is undocumented in the UML model
newoid = BaseStorage.new_oid(self, last)
if self._len is not None:
# Increment the cached length
self._len += 1
return newoid
def getSize(self):
"""Return the size of the database."""
# Return the size of the pickles table as a rough estimate
filename = os.path.join(self._env.db_home, 'zodb_pickles')
return os.path.getsize(filename)
def _vote(self):
pass
def _finish(self, tid, user, desc, ext):
"""Called from BaseStorage.tpc_finish(), this commits the underlying
BSDDB transaction.
tid is the transaction id
user is the transaction user
desc is the transaction description
ext is the transaction extension
These are all ignored.
"""
self._transaction.commit()
def _abort(self):
"""Called from BaseStorage.tpc_abort(), this aborts the underlying
BSDDB transaction.
"""
self._transaction.abort()
def _clear_temp(self):
"""Called from BaseStorage.tpc_abort(), BaseStorage.tpc_begin(),
BaseStorage.tpc_finish(), this clears out the temporary log file
"""
# BAW: no-op this since the right CommitLog file operations are
# performed by the methods in the derived storage class.
pass
def log(self, msg, *args):
zLOG.LOG(self.__class__.__name__, zLOG.INFO, msg % args)
def close(self):
"""Close the storage.
All background threads are stopped and joined first, then all the
tables are closed, and finally the environment is force checkpointed
and closed too.
"""
# We have to shut down the background threads before we acquire the
# lock, or we could end up closing the environment before the
# autopacking thread exits.
self._stop = True
# Stop the autopacker thread
if self._autopacker:
self.log('stopping autopacking thread')
# Setting the event also toggles the stop flag
self._autopackstop.set()
self._autopacker.join(JOIN_TIME)
if self._checkpointer:
self.log('stopping checkpointing thread')
# Setting the event also toggles the stop flag
self._checkpointstop.set()
self._checkpointer.join(JOIN_TIME)
self._lock_acquire()
try:
if not self._closed:
self._doclose()
self._closed = True
finally:
self._lock_release()
self.log('finished closing the database')
def _doclose(self):
# Close all the tables
for d in self._tables:
d.close()
# As recommended by Keith Bostic @ Sleepycat, we need to do
# two checkpoints just before we close the environment.
# Otherwise, auto-recovery on environment opens can be
# extremely costly. We want to do auto-recovery for ease of
# use, although it isn't strictly necessary if the database
# was shut down gracefully.  The DB_FORCE flag is required for
# the second checkpoint, but we include it in both because it
# can't hurt and is more robust.
self._env.txn_checkpoint(0, 0, db.DB_FORCE)
self._env.txn_checkpoint(0, 0, db.DB_FORCE)
lockfile = os.path.join(self._env.db_home, '.lock')
self._lockfile.close()
self._env.close()
os.unlink(lockfile)
# A couple of convenience methods
def _update(self, deltas, data, incdec):
refdoids = []
referencesf(data, refdoids)
for oid in refdoids:
rc = deltas.get(oid, 0) + incdec
if rc == 0:
# Save space in the dict by zapping zeroes
del deltas[oid]
else:
deltas[oid] = rc
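# A minimal standalone sketch of the delta bookkeeping done by _update(),
# assuming the reference lists have already been extracted with
# referencesf() (the two oid values are hypothetical):
#
#   deltas = {}
#   for oid in ['\0'*7 + '\x01', '\0'*7 + '\x02']:     # incref pass
#       deltas[oid] = deltas.get(oid, 0) + 1
#   for oid in ['\0'*7 + '\x02']:                      # decref pass
#       rc = deltas.get(oid, 0) - 1
#       if rc == 0:
#           del deltas[oid]                            # zap zeroes
#       else:
#           deltas[oid] = rc
#   # deltas now holds only the first oid, with a count of 1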
def _withlock(self, meth, *args):
self._lock_acquire()
try:
return meth(*args)
finally:
self._lock_release()
def _withtxn(self, meth, *args, **kws):
txn = self._env.txn_begin()
try:
ret = meth(txn, *args, **kws)
except PackStop:
# Escape hatch for shutdown during pack. Like the bare except --
# i.e. abort the transaction -- but swallow the exception.
txn.abort()
except:
## import traceback ; traceback.print_exc()
zLOG.LOG(self.__class__.__name__, zLOG.DEBUG,
"unexpected error in _withtxn", error=sys.exc_info())
txn.abort()
raise
else:
txn.commit()
return ret
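# A hedged sketch of how _withlock() and _withtxn() are typically
# composed by the concrete storages; _dostuff() and public_method() are
# hypothetical, with the Berkeley transaction passed as the first
# argument as _withtxn() requires:
#
#   def _dostuff(self, txn, oid, data):
#       self._pickles.put(oid + self._tid, data, txn=txn)
#
#   def public_method(self, oid, data):
#       return self._withlock(self._withtxn, self._dostuff, oid, data)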
def docheckpoint(self):
config = self._config
self._lock_acquire()
try:
if not self._stop:
self._env.txn_checkpoint(config.kbyte, config.min)
finally:
self._lock_release()
def cleanup(self):
"""Remove the entire environment directory for this storage."""
cleanup(self.getName())
def env_from_string(envname, config):
# BSDDB requires that the directory already exists. BAW: do we need to
# adjust umask to ensure filesystem permissions?
try:
os.mkdir(envname)
except OSError, e:
if e.errno <> errno.EEXIST: raise
# already exists
# Create the lock file so no other process can open the environment.
# This is required in order to work around the Berkeley lock
# exhaustion problem (i.e. we do our own application level locks
# rather than rely on Berkeley's finite page locks).
lockpath = os.path.join(envname, '.lock')
try:
lockfile = open(lockpath, 'r+')
except IOError, e:
if e.errno <> errno.ENOENT: raise
lockfile = open(lockpath, 'w+')
lock_file(lockfile)
lockfile.write(str(os.getpid()))
lockfile.flush()
try:
# Create, initialize, and open the environment
env = db.DBEnv()
if config.logdir is not None:
env.set_lg_dir(config.logdir)
gbytes, bytes = divmod(config.cachesize, GBYTES)
env.set_cachesize(gbytes, bytes)
env.open(envname,
db.DB_CREATE # create underlying files as necessary
| db.DB_RECOVER # run normal recovery before opening
| db.DB_INIT_MPOOL # initialize shared memory buffer pool
| db.DB_INIT_TXN # initialize transaction subsystem
| db.DB_THREAD # we use the env from multiple threads
)
except:
lockfile.close()
raise
return env, lockfile
def cleanup(envdir):
"""Remove the entire environment directory for a Berkeley storage."""
try:
shutil.rmtree(envdir)
except OSError, e:
if e.errno <> errno.ENOENT:
raise
class _WorkThread(threading.Thread):
NAME = 'worker'
def __init__(self, storage, event, checkinterval):
threading.Thread.__init__(self)
self._storage = storage
self._event = event
self._interval = checkinterval
# Bookkeeping. _nextcheck is useful as a non-public interface aiding
# testing. See test_autopack.py.
self._stop = False
self._nextcheck = checkinterval
# We don't want these threads to hold up process exit. That could
# lead to corrupt databases, but recovery should ultimately save us.
self.setDaemon(True)
def run(self):
name = self.NAME
self._storage.log('%s thread started', name)
while not self._stop:
now = time.time()
if now >= self._nextcheck:
self._storage.log('running %s', name)
self._dowork()
# Recalculate `now' because _dowork() could have taken a
# while. time.time() can be expensive, but oh well.
self._nextcheck = time.time() + self._interval
# Block w/ timeout on the shutdown event.
self._event.wait(self._interval)
self._stop = self._event.isSet()
self._storage.log('%s thread finished', name)
def _dowork(self):
pass
class _Checkpoint(_WorkThread):
NAME = 'checkpointing'
def _dowork(self):
self._storage.docheckpoint()
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Requirements:
#
# All: BerkeleyDB 4.1.25, available from www.sleepycat.com
# Python 2.2: PyBSDDB 4.1.3 or better, from pybsddb.sf.net
# Python 2.3: nothing extra
try:
from bsddb import db
except ImportError:
try:
from bsddb3 import db
except ImportError:
db = None
# This flag tells other components whether Berkeley storage is available
is_available = bool(db)
# Useful constants
ZERO = '\0'*8
/*****************************************************************************
Copyright (c) 2002 Zope Corporation and Contributors.
All Rights Reserved.
This software is subject to the provisions of the Zope Public License,
Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
FOR A PARTICULAR PURPOSE
****************************************************************************/
#include <Python.h>
/* This helper only works for Python 2.2 and beyond. If we're using an
 * older version of Python, bail out now so we don't leave a broken, but
 * compiled and importable, module lying about.  BDBFullStorage.py has a
* workaround for when this extension isn't available.
*/
#if PY_VERSION_HEX < 0x020200F0
#error "Must be using at least Python 2.2"
#endif
/* Increment an 8-byte unsigned integer (represented as an 8-byte raw string),
* by a Python integer.
* The arguments are an 8-byte Python string, and a Python int or long.
* The result is an 8-byte Python string, representing their sum.
* XXX It's unclear what this intends to do if the sum overflows an 8-byte
* XXX unsigned integer. _PyLong_AsByteArray should raise OverflowError then.
*/
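/* A hedged sketch of the intended Python-level behaviour; the module is
 * built as `_helper' (see init_helper below) and the counters are
 * big-endian, matching p64()/U64() in ZODB.utils:
 *
 *   >>> from _helper import incr
 *   >>> incr('\x00' * 8, 5) == '\x00' * 7 + '\x05'
 *   True
 */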
static PyObject*
helper_incr(PyObject* self, PyObject* args)
{
PyObject *pylong = NULL, *incr, *sum = NULL, *result = NULL;
char *s, x[8];
int len, res;
if (!PyArg_ParseTuple(args, "s#O:incr", &s, &len, &incr))
return NULL;
assert(len == 8);
/* There is no direct route from a byte array to a C integer wide enough,
 * so convert the bytes to a PyLongObject*, do the addition at the Python
 * level, and then convert the sum back to an 8-byte array.
 */
pylong = _PyLong_FromByteArray(s, len,
0 /* big endian */,
0 /* unsigned */);
if (!pylong)
return NULL;
sum = PyNumber_Add(pylong, incr);
if (!sum)
goto err;
res = _PyLong_AsByteArray((PyLongObject*)sum, x, 8,
0 /* big endian */,
0 /* unsigned */);
if (res < 0)
goto err;
result = PyString_FromStringAndSize(x, 8);
err:
Py_XDECREF(pylong);
Py_XDECREF(sum);
return result;
}
static PyMethodDef helper_methods[] = {
{"incr", helper_incr, METH_VARARGS},
{NULL, NULL} /* sentinel */
};
DL_EXPORT(void)
init_helper(void)
{
(void)Py_InitModule("_helper", helper_methods);
}
import os
import pprint
from hotshot.log import LogReader
prevloc = None
byline = {}
log = LogReader('profile.dat')
for what, place, tdelta in log:
byline[prevloc] = tdelta + byline.get(prevloc, 0)
byline.setdefault(place,0)
prevloc = place
# Sort
results = [(v, k) for k, v in byline.items()]
results.sort()
for usecs, place in results:
if not place:
print 'Bad unpack:', usecs, place
continue
filename, line, funcname = place
print '%08d' % usecs, os.path.split(filename)[1], line
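# A hedged sketch of how profile.dat could be produced for the reader
# above, using the standard hotshot module; workload() is a hypothetical
# function exercising the storage (lineevents=1 is needed so the log
# carries per-line timing deltas):
#
#   import hotshot
#   prof = hotshot.Profile('profile.dat', lineevents=1)
#   prof.runcall(workload)
#   prof.close()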
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Basic test framework class for both the BDBFullStorage and BDBMinimalStorage
# Berkeley storages
import os
import shutil
import sys
import unittest
import BDBStorage
from BDBStorage.BerkeleyBase import BerkeleyConfig
from ZODB.tests.StorageTestBase import StorageTestBase
DBHOME = 'test-db'
class BerkeleyTestBase(StorageTestBase):
def _config(self, read_only=False):
# Checkpointing just slows the tests down because we have to wait for
# the thread to properly shutdown. This can take up to 10 seconds, so
# for the purposes of the test suite we shut off this thread.
config = BerkeleyConfig()
config.interval = 0
config.read_only = read_only
return config
def _envdir(self):
return DBHOME
def open(self, read_only=False):
self._storage = self.ConcreteStorage(
self._envdir(), config=self._config(read_only))
def _zap_dbhome(self, dir=None):
if dir is None:
dir = self._envdir()
if os.path.isdir(dir):
shutil.rmtree(dir)
def _mk_dbhome(self, dir=None):
if dir is None:
dir = self._envdir()
os.mkdir(dir)
try:
return self.ConcreteStorage(dir, config=self._config())
except:
self._zap_dbhome()
raise
def setUp(self):
StorageTestBase.setUp(self)
self._zap_dbhome()
self.open()
def tearDown(self):
StorageTestBase.tearDown(self)
self._zap_dbhome()
class MinimalTestBase(BerkeleyTestBase):
from BDBStorage import BDBMinimalStorage
ConcreteStorage = BDBMinimalStorage.BDBMinimalStorage
class FullTestBase(BerkeleyTestBase):
from BDBStorage import BDBFullStorage
ConcreteStorage = BDBFullStorage.BDBFullStorage
def makeSuite(*args,**kw):
prefix = kw.get('prefix','check')
level = kw.get('level')
suite = unittest.TestSuite()
if level:
suite.level = level
if BDBStorage.is_available:
for klass in args:
suite.addTest(unittest.makeSuite(klass, prefix))
else:
sys.stderr.write("BDBStorage not available, tests disabled\n")
return suite
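# A hedged usage sketch for makeSuite(); the test class name is
# hypothetical and `check' is the default method prefix used above:
#
#   class MyFullStorageTests(FullTestBase):
#       def checkSomething(self):
#           self.failUnless(self._storage is not None)
#
#   def test_suite():
#       return makeSuite(MyFullStorageTests)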
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Base class for unit tests at the ZODB layer
import os
from ZODB import DB
from BDBStorage.tests.BerkeleyTestBase import BerkeleyTestBase
DBHOME = 'test-db'
class ZODBTestBase(BerkeleyTestBase):
def setUp(self):
BerkeleyTestBase.setUp(self)
self._db = None
try:
self._db = DB(self._storage)
self._conn = self._db.open()
self._root = self._conn.root()
except:
self.tearDown()
raise
def _close(self):
if self._db is not None:
self._db.close()
self._db = self._storage = self._conn = self._root = None
def tearDown(self):
# If the tests exited with any uncommitted objects, they'll blow up
# subsequent tests because the next transaction commit will try to
# commit those objects.  But they're tied to closed databases, so
# that's broken. Aborting the transaction now saves us the headache.
try:
get_transaction().abort()
self._close()
finally:
BerkeleyTestBase.tearDown(self)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import time
import unittest
import threading
from persistent.TimeStamp import TimeStamp
from ZODB import DB
from ZODB.Transaction import Transaction
from ZODB.referencesf import referencesf
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle
from persistent import Persistent
import BDBStorage
if BDBStorage.is_available:
from BDBStorage.BDBFullStorage import BDBFullStorage
from BDBStorage.BDBMinimalStorage import BDBMinimalStorage
from BDBStorage.BerkeleyBase import BerkeleyConfig
else:
# Sigh
class FakeBaseClass: pass
BDBFullStorage = BDBMinimalStorage = FakeBaseClass
from BDBStorage import ZERO
from BDBStorage.tests.BerkeleyTestBase import BerkeleyTestBase
try:
True, False
except NameError:
True = 1
False = 0
class C(Persistent):
pass
class TestAutopackBase(BerkeleyTestBase):
def _config(self, read_only=False):
config = BerkeleyConfig()
# Autopack every 1 second, 2 seconds into the past, no classic packs
config.frequency = 1
config.packtime = 2
config.gcpack = 0
config.read_only = read_only
return config
def _wait_for_next_autopack(self):
storage = self._storage
# BAW: this uses a non-public interface
packtime = storage._autopacker._nextcheck
while packtime == storage._autopacker._nextcheck:
time.sleep(0.1)
def _mk_dbhome(self, dir):
# Create the storage
os.mkdir(dir)
try:
return self.ConcreteStorage(dir, config=self._config())
except:
self._zap_dbhome(dir)
raise
class TestAutopack(TestAutopackBase):
ConcreteStorage = BDBFullStorage
def testAutopack(self):
unless = self.failUnless
raises = self.assertRaises
storage = self._storage
# Wait for an autopack operation to occur, then make three revisions
# to an object. Wait for the next autopack operation and make sure
# all three revisions still exist. Then wait for two more autopack
# operations and verify that the first two revisions have been packed
# away.
oid = storage.new_oid()
self._wait_for_next_autopack()
revid1 = self._dostore(oid, data=MinPO(2112))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(2113))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(2114))
self._wait_for_next_autopack()
unless(storage.loadSerial(oid, revid1))
unless(storage.loadSerial(oid, revid2))
unless(storage.loadSerial(oid, revid3))
# Two more autopacks ought to be enough to pack away old revisions
self._wait_for_next_autopack()
self._wait_for_next_autopack()
# The first two revisions should now be gone, but the third should
# still exist because it's the current revision, and we haven't done a
# classic pack.
raises(KeyError, self._storage.loadSerial, oid, revid1)
raises(KeyError, self._storage.loadSerial, oid, revid2)
unless(storage.loadSerial(oid, revid3))
class TestAutomaticClassicPack(TestAutopackBase):
ConcreteStorage = BDBFullStorage
def _config(self, read_only=False):
config = BerkeleyConfig()
# Autopack every 1 second, 2 seconds into the past, classic packing
# every time.
config.frequency = 1
config.packtime = 2
config.gcpack = 1
config.read_only = read_only
return config
def testAutomaticClassicPack(self):
unless = self.failUnless
raises = self.assertRaises
storage = self._storage
# Wait for an autopack operation to occur, then make three revisions
# to an object. Wait for the next autopack operation and make sure
# all three revisions still exist. Then wait for two more autopack
# operations and verify that all three revisions have been packed away,
# since this configuration also does a classic (gc) pack on every run.
oid = storage.new_oid()
self._wait_for_next_autopack()
revid1 = self._dostore(oid, data=MinPO(2112))
revid2 = self._dostore(oid, revid=revid1, data=MinPO(2113))
revid3 = self._dostore(oid, revid=revid2, data=MinPO(2114))
self._wait_for_next_autopack()
unless(storage.loadSerial(oid, revid1))
unless(storage.loadSerial(oid, revid2))
unless(storage.loadSerial(oid, revid3))
# Two more autopacks ought to be enough to pack away old revisions
self._wait_for_next_autopack()
self._wait_for_next_autopack()
# All three revisions should now be gone. Unlike TestAutopack, this
# configuration does a classic pack on every autopack run, and the
# object was never linked to the root, so even the current revision
# gets collected.
raises(KeyError, storage.loadSerial, oid, revid1)
raises(KeyError, storage.loadSerial, oid, revid2)
raises(KeyError, storage.loadSerial, oid, revid3)
def testCycleUnreachable(self):
unless = self.failUnless
raises = self.assertRaises
storage = self._storage
db = DB(storage)
conn = db.open()
root = conn.root()
self._wait_for_next_autopack()
# Store a cycle of two objects reachable from the root
obj1 = C()
obj2 = C()
obj1.obj = obj2
obj2.obj = obj1
root.obj = obj1
txn = get_transaction()
txn.note('root -> obj1 <-> obj2')
txn.commit()
oid1 = obj1._p_oid
oid2 = obj2._p_oid
assert oid1 and oid2 and oid1 != oid2
self._wait_for_next_autopack()
unless(storage.load(ZERO, ''))
unless(storage.load(oid1, ''))
unless(storage.load(oid2, ''))
# Now unlink it, which should still leave obj1 and obj2 alive
del root.obj
txn = get_transaction()
txn.note('root -X-> obj1 <-> obj2')
txn.commit()
unless(storage.load(ZERO, ''))
unless(storage.load(oid1, ''))
unless(storage.load(oid2, ''))
# Do an explicit full pack right now to collect all the old
# revisions and the cycle.
storage.pack(time.time(), referencesf)
# And it should be packed away
unless(storage.load(ZERO, ''))
raises(KeyError, storage.load, oid1, '')
raises(KeyError, storage.load, oid2, '')
class TestMinimalPack(TestAutopackBase):
ConcreteStorage = BDBMinimalStorage
def _config(self, read_only=False):
config = BerkeleyConfig()
# Autopack every 3 seconds
config.frequency = 3
config.read_only = read_only
return config
def testRootUnreachable(self):
unless = self.failUnless
raises = self.assertRaises
storage = self._storage
db = DB(storage)
conn = db.open()
root = conn.root()
self._wait_for_next_autopack()
# Store an object that's reachable from the root
obj = C()
obj.value = 999
root.obj = obj
txn = get_transaction()
txn.note('root -> obj')
txn.commit()
oid = obj._p_oid
assert oid
self._wait_for_next_autopack()
unless(storage.load(ZERO, ''))
unless(storage.load(oid, ''))
# Now unlink it
del root.obj
txn = get_transaction()
txn.note('root -X-> obj')
txn.commit()
# The object should be gone due to reference counting
unless(storage.load(ZERO, ''))
raises(KeyError, storage.load, oid, '')
def testCycleUnreachable(self):
unless = self.failUnless
raises = self.assertRaises
storage = self._storage
db = DB(storage)
conn = db.open()
root = conn.root()
self._wait_for_next_autopack()
# Store a cycle of two objects reachable from the root
obj1 = C()
obj2 = C()
obj1.obj = obj2
obj2.obj = obj1
root.obj = obj1
txn = get_transaction()
txn.note('root -> obj1 <-> obj2')
txn.commit()
oid1 = obj1._p_oid
oid2 = obj2._p_oid
assert oid1 and oid2 and oid1 != oid2
self._wait_for_next_autopack()
unless(storage.load(ZERO, ''))
unless(storage.load(oid1, ''))
unless(storage.load(oid2, ''))
# Now unlink it, which should still leave obj1 and obj2 alive
del root.obj
txn = get_transaction()
txn.note('root -X-> obj1 <-> obj2')
txn.commit()
unless(storage.load(ZERO, ''))
unless(storage.load(oid1, ''))
unless(storage.load(oid2, ''))
# But the next autopack should collect both obj1 and obj2
self._wait_for_next_autopack()
# And it should be packed away
unless(storage.load(ZERO, ''))
raises(KeyError, storage.load, oid1, '')
raises(KeyError, storage.load, oid2, '')
def test_suite():
suite = unittest.TestSuite()
suite.level = 2
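# Level 2: these tests spend most of their time sleeping while waiting for
# autopack runs, so only run them when higher-level tests are requested.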
if BDBStorage.is_available:
suite.addTest(unittest.makeSuite(TestAutopack))
suite.addTest(unittest.makeSuite(TestAutomaticClassicPack))
suite.addTest(unittest.makeSuite(TestMinimalPack))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Unit test for database creation
import os
import time
import unittest
import BDBStorage
if BDBStorage.is_available:
from BDBStorage.BerkeleyBase import BerkeleyConfig
from BDBStorage.BDBFullStorage import BDBFullStorage
from BDBStorage.tests import BerkeleyTestBase
class TestMixin:
def testDBHomeExists(self):
self.failUnless(os.path.isdir(BerkeleyTestBase.DBHOME))
class MinimalCreateTest(BerkeleyTestBase.MinimalTestBase, TestMixin):
pass
class FullCreateTest(BerkeleyTestBase.FullTestBase, TestMixin):
pass
class FullOpenExistingTest(BerkeleyTestBase.FullTestBase):
def testOpenWithExistingVersions(self):
version = 'test-version'
oid = self._storage.new_oid()
revid = self._dostore(oid, data=7, version=version)
# Now close the current storage and re-open it
self._storage.close()
self._storage = self.ConcreteStorage(BerkeleyTestBase.DBHOME)
self.assertEqual(self._storage.modifiedInVersion(oid), version)
def testOpenAddVersion(self):
eq = self.assertEqual
version1 = 'test-version'
oid1 = self._storage.new_oid()
revid = self._dostore(oid1, data=7, version=version1)
# Now close the current storage and re-open it
self._storage.close()
self._storage = self.ConcreteStorage(BerkeleyTestBase.DBHOME)
eq(self._storage.modifiedInVersion(oid1), version1)
# Now create a 2nd version string, then close/reopen
version2 = 'new-version'
oid2 = self._storage.new_oid()
revid = self._dostore(oid2, data=8, version=version2)
# Now close the current storage and re-open it
self._storage.close()
self._storage = self.ConcreteStorage(BerkeleyTestBase.DBHOME)
eq(self._storage.modifiedInVersion(oid1), version1)
# And the second version should be there too
eq(self._storage.modifiedInVersion(oid2), version2)
class FullOpenCloseTest(BerkeleyTestBase.FullTestBase):
level = 2
def _mk_dbhome(self, dir):
config = BerkeleyConfig()
config.interval = 10
os.mkdir(dir)
try:
return self.ConcreteStorage(dir, config=config)
except:
self._zap_dbhome(dir)
raise
def testCloseWithCheckpointingThread(self):
# All the interesting stuff happens in the setUp and tearDown
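# Sleeping gives the checkpointing worker thread (started because
# config.interval is set above) a chance to run at least once before
# tearDown closes the storage.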
time.sleep(20)
class OpenRecoveryTest(BerkeleyTestBase.FullTestBase):
def open(self):
self._storage = None
def testOpenWithBogusConfig(self):
class C: pass
c = C()
# This instance won't have the necessary attributes, so creating the
# storage will fail. We want to be sure that everything gets cleaned up
# enough that we can then create a proper storage with a valid config.
dir = self._envdir()
self.assertRaises(AttributeError, BDBFullStorage, dir, config=c)
c = BerkeleyConfig()
s = BDBFullStorage(dir, config=c)
s.close()
def test_suite():
return BDBStorage.tests.BerkeleyTestBase.makeSuite(
MinimalCreateTest,
FullCreateTest,
FullOpenExistingTest,
FullOpenCloseTest,
OpenRecoveryTest,
prefix='test'
)
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Unit tests for basic storage functionality
import unittest
from ZODB import POSException
import BDBStorage
from BDBStorage.tests import BerkeleyTestBase
from ZODB.tests.BasicStorage import BasicStorage
from ZODB.tests.HistoryStorage import HistoryStorage
from ZODB.tests.IteratorStorage import IteratorStorage, ExtendedIteratorStorage
from ZODB.tests.MTStorage import MTStorage
from ZODB.tests.PackableStorage import PackableStorage, PackableUndoStorage
from ZODB.tests.PersistentStorage import PersistentStorage
from ZODB.tests.ReadOnlyStorage import ReadOnlyStorage
from ZODB.tests.RecoveryStorage import RecoveryStorage
from ZODB.tests.RevisionStorage import RevisionStorage
from ZODB.tests.Synchronization import SynchronizedStorage
from ZODB.tests.TransactionalUndoStorage import TransactionalUndoStorage
from ZODB.tests.TransactionalUndoVersionStorage import \
TransactionalUndoVersionStorage
from ZODB.tests.VersionStorage import VersionStorage
from ZODB.tests import ConflictResolution
class MinimalTest(BerkeleyTestBase.MinimalTestBase,
BasicStorage,
MTStorage,
PackableStorage,
ReadOnlyStorage,
SynchronizedStorage,
):
def checkVersionedStoreAndLoad(self):
# This storage doesn't support versions, so we should get an exception
oid = self._storage.new_oid()
self.assertRaises(POSException.Unsupported,
self._dostore,
oid, data=11, version='a version')
class FullTest(BerkeleyTestBase.FullTestBase, BasicStorage,
RevisionStorage, VersionStorage,
TransactionalUndoStorage,
TransactionalUndoVersionStorage,
PackableStorage, PackableUndoStorage,
HistoryStorage,
IteratorStorage, ExtendedIteratorStorage,
ConflictResolution.ConflictResolvingStorage,
ConflictResolution.ConflictResolvingTransUndoStorage,
SynchronizedStorage,
PersistentStorage,
MTStorage,
ReadOnlyStorage):
pass
DST_DBHOME = 'test-dst'
class FullRecoveryTest(BerkeleyTestBase.FullTestBase,
RecoveryStorage):
def setUp(self):
BerkeleyTestBase.FullTestBase.setUp(self)
self._zap_dbhome(DST_DBHOME)
self._dst = self._mk_dbhome(DST_DBHOME)
def tearDown(self):
BerkeleyTestBase.FullTestBase.tearDown(self)
self._dst.close()
self._zap_dbhome(DST_DBHOME)
def new_dest(self):
self._zap_dbhome(DST_DBHOME)
return self._mk_dbhome(DST_DBHOME)
def test_suite():
return BDBStorage.tests.BerkeleyTestBase.makeSuite(
FullTest,
FullRecoveryTest,
MinimalTest,
)
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Test creation of a brand new database, and insertion of root objects.
import unittest
import BDBStorage
from BDBStorage.tests.ZODBTestBase import ZODBTestBase
from persistent.mapping import PersistentMapping
class InsertMixin:
def checkIsEmpty(self):
self.failUnless(not self._root.has_key('names'))
def checkNewInserts(self):
self._root['names'] = names = PersistentMapping()
names['Warsaw'] = 'Barry'
names['Hylton'] = 'Jeremy'
get_transaction().commit()
class FullNewInsertsTest(ZODBTestBase, InsertMixin):
from BDBStorage import BDBFullStorage
ConcreteStorage = BDBFullStorage.BDBFullStorage
class MinimalNewInsertsTest(ZODBTestBase, InsertMixin):
from BDBStorage import BDBMinimalStorage
ConcreteStorage = BDBMinimalStorage.BDBMinimalStorage
def test_suite():
return BDBStorage.tests.BerkeleyTestBase.makeSuite(
MinimalNewInsertsTest,
FullNewInsertsTest,
)
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Whitebox testing of storage implementation details.
import unittest
from ZODB.utils import U64
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
import BDBStorage
if BDBStorage.is_available:
from BDBStorage.BDBMinimalStorage import BDBMinimalStorage
from BDBStorage.BDBFullStorage import BDBFullStorage
else:
# Sigh
class FakeBaseClass: pass
BDBFullStorage = BDBMinimalStorage = FakeBaseClass
from BDBStorage.tests.ZODBTestBase import ZODBTestBase
from BDBStorage.tests.BerkeleyTestBase import BerkeleyTestBase
from persistent import Persistent
ZERO = '\0'*8
class Object(Persistent):
pass
class WhiteboxLowLevelMinimal(BerkeleyTestBase):
ConcreteStorage = BDBMinimalStorage
def checkTableConsistencyAfterCommit(self):
failif = self.failIf
eq = self.assertEqual
oid = self._storage.new_oid()
revid1 = self._dostore(oid, data=11)
revid2 = self._dostore(oid, revid=revid1, data=12)
revid3 = self._dostore(oid, revid=revid2, data=13)
# First off, there should be no entries in the pending table
failif(self._storage._pending.keys())
# Also, there should be no entries in the oids table
failif(self._storage._oids.keys())
# Now, there should be exactly one oid in the serials table, and
# exactly one record for that oid in the table too.
oids = {}
c = self._storage._serials.cursor()
try:
rec = c.first()
while rec:
oid, serial = rec
oids.setdefault(oid, []).append(serial)
rec = c.next()
finally:
c.close()
eq(len(oids), 1)
eq(len(oids[oids.keys()[0]]), 1)
# There should now be exactly one entry in the pickles table.
pickles = self._storage._pickles.items()
eq(len(pickles), 1)
key, data = pickles[0]
poid = key[:8]
pserial = key[8:]
eq(oid, poid)
eq(revid3, pserial)
obj = zodb_unpickle(data)
eq(obj.value, 13)
# Now verify the refcounts table, which should be empty because the
# stored object isn't referenced by any other objects.
eq(len(self._storage._refcounts.keys()), 0)
def checkStorageVersionAfterCreation(self):
from BDBStorage.BDBMinimalStorage import BDBMINIMAL_SCHEMA_VERSION
eq = self.assertEqual
eq(self._storage._info['version'], BDBMINIMAL_SCHEMA_VERSION)
self._storage.close()
self.open()
eq(self._storage._info['version'], BDBMINIMAL_SCHEMA_VERSION)
class WhiteboxHighLevelMinimal(ZODBTestBase):
ConcreteStorage = BDBMinimalStorage
def checkReferenceCounting(self):
eq = self.assertEqual
obj = MinPO(11)
self._root.obj = obj
get_transaction().commit()
obj.value = 12
get_transaction().commit()
obj.value = 13
get_transaction().commit()
# Make sure the databases have what we expect
eq(len(self._storage._serials.items()), 2)
eq(len(self._storage._pickles.items()), 2)
# And now refcount out the object
del self._root.obj
get_transaction().commit()
# Verification stage. Our serials table should have exactly one
# entry, oid == 0
keys = self._storage._serials.keys()
eq(len(keys), 1)
eq(len(self._storage._serials.items()), 1)
eq(keys[0], ZERO)
# The pickles table now should have exactly one revision of the root
# object, and no revisions of the MinPO object, which should have been
# collected away.
pickles = self._storage._pickles.items()
eq(len(pickles), 1)
rec = pickles[0]
key = rec[0]
data = rec[1]
eq(key[:8], ZERO)
# And that pickle should have no 'obj' attribute.
unobj = zodb_unpickle(data)
self.failIf(hasattr(unobj, 'obj'))
# Our refcounts table should have no entries in it, because the root
# object is an island.
eq(len(self._storage._refcounts.keys()), 0)
# And of course, oids and pendings should be empty too
eq(len(self._storage._oids.keys()), 0)
eq(len(self._storage._pending.keys()), 0)
def checkRecursiveReferenceCounting(self):
eq = self.assertEqual
obj1 = Object()
obj2 = Object()
obj3 = Object()
obj4 = Object()
self._root.obj = obj1
obj1.obj = obj2
obj2.obj = obj3
obj3.obj = obj4
get_transaction().commit()
# Make sure the databases have what we expect
eq(len(self._storage._serials.items()), 5)
eq(len(self._storage._pickles.items()), 5)
# And now refcount out the object
del self._root.obj
get_transaction().commit()
# Verification stage. Our serials table should have exactly one
# entry, oid == 0
keys = self._storage._serials.keys()
eq(len(keys), 1)
eq(len(self._storage._serials.items()), 1)
eq(keys[0], ZERO)
# The pickles table now should have exactly one revision of the root
# object, and no revisions of any other objects, which should have
# been collected away.
pickles = self._storage._pickles.items()
eq(len(pickles), 1)
rec = pickles[0]
key = rec[0]
data = rec[1]
eq(key[:8], ZERO)
# And that pickle should have no 'obj' attribute.
unobj = zodb_unpickle(data)
self.failIf(hasattr(unobj, 'obj'))
# Our refcounts table should have no entries in it, because the root
# object is an island.
eq(len(self._storage._refcounts.keys()), 0)
# And of course, oids and pendings should be empty too
eq(len(self._storage._oids.keys()), 0)
eq(len(self._storage._pending.keys()), 0)
class WhiteboxLowLevelFull(BerkeleyTestBase):
ConcreteStorage = BDBFullStorage
def checkStorageVersionAfterCreation(self):
from BDBStorage.BDBFullStorage import BDBFULL_SCHEMA_VERSION
eq = self.assertEqual
eq(self._storage._info['version'], BDBFULL_SCHEMA_VERSION)
self._storage.close()
self.open()
eq(self._storage._info['version'], BDBFULL_SCHEMA_VERSION)
class WhiteboxHighLevelFull(ZODBTestBase):
ConcreteStorage = BDBFullStorage
def checkReferenceCounting(self):
eq = self.assertEqual
# Make sure the databases have what we expect
eq(len(self._storage._serials.items()), 1)
eq(len(self._storage._pickles.items()), 1)
# Now store an object
obj = MinPO(11)
self._root.obj = obj
get_transaction().commit()
# Make sure the databases have what we expect
eq(len(self._storage._serials.items()), 2)
eq(len(self._storage._pickles.items()), 3)
obj.value = 12
get_transaction().commit()
# Make sure the databases have what we expect
eq(len(self._storage._serials.items()), 2)
eq(len(self._storage._pickles.items()), 4)
obj.value = 13
get_transaction().commit()
# Make sure the databases have what we expect
eq(len(self._storage._serials.items()), 2)
eq(len(self._storage._pickles.items()), 5)
# And now refcount out the object
del self._root.obj
get_transaction().commit()
# Verification stage. Our serials table should still have 2 entries,
# one for the root object and one for the now unlinked MinPO obj.
keys = self._storage._serials.keys()
eq(len(keys), 2)
eq(len(self._storage._serials.items()), 2)
eq(keys[0], ZERO)
# The pickles table should now have 6 entries, broken down like so:
# - 3 revisions of the root object: the initial database-open
# revision, the revision that got its obj attribute set, and the
# revision that got its obj attribute deleted.
# - 3 revisions of obj, corresponding to values 11, 12, and 13
pickles = self._storage._pickles.items()
eq(len(pickles), 6)
# Our refcounts table should have one entry in it for the MinPO that's
# referenced in an earlier revision of the root object
eq(len(self._storage._refcounts.keys()), 1)
# And of course, oids and pendings should be empty too
eq(len(self._storage._oids.keys()), 0)
eq(len(self._storage._pending.keys()), 0)
def test_suite():
return BDBStorage.tests.BerkeleyTestBase.makeSuite(
WhiteboxLowLevelMinimal,
WhiteboxHighLevelMinimal,
WhiteboxLowLevelFull,
WhiteboxHighLevelFull,
)
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Test some simple ZODB level stuff common to both the BDBMinimalStorage and
# BDBFullStorage storages, like transaction aborts and commits, changing
# objects, etc. Doesn't test undo, versions, or packing.
import time
import unittest
import BDBStorage
from BDBStorage.tests.ZODBTestBase import ZODBTestBase
from persistent.mapping import PersistentMapping
class CommitAndRead:
def checkCommit(self):
self.failUnless(not self._root)
names = self._root['names'] = PersistentMapping()
names['Warsaw'] = 'Barry'
names['Hylton'] = 'Jeremy'
get_transaction().commit()
def checkReadAfterCommit(self):
eq = self.assertEqual
self.checkCommit()
names = self._root['names']
eq(names['Warsaw'], 'Barry')
eq(names['Hylton'], 'Jeremy')
self.failUnless(names.get('Drake') is None)
def checkAbortAfterRead(self):
self.checkReadAfterCommit()
names = self._root['names']
names['Drake'] = 'Fred'
get_transaction().abort()
def checkReadAfterAbort(self):
self.checkAbortAfterRead()
names = self._root['names']
self.failUnless(names.get('Drake') is None)
def checkChangingCommits(self):
self.checkReadAfterAbort()
now = time.time()
# Make sure the last timestamp was more than 3 seconds ago
timestamp = self._root.get('timestamp')
if timestamp is None:
timestamp = self._root['timestamp'] = 0
get_transaction().commit()
self.failUnless(now > timestamp + 3)
self._root['timestamp'] = now
time.sleep(3)
class MinimalCommitAndRead(ZODBTestBase, CommitAndRead):
from BDBStorage import BDBMinimalStorage
ConcreteStorage = BDBMinimalStorage.BDBMinimalStorage
class FullCommitAndRead(ZODBTestBase, CommitAndRead):
from BDBStorage import BDBFullStorage
ConcreteStorage = BDBFullStorage.BDBFullStorage
def test_suite():
return BDBStorage.tests.BerkeleyTestBase.makeSuite(
MinimalCommitAndRead,
FullCommitAndRead
)
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Framework for running Unit tests
import unittest
MODULES = ('commitlog', 'create', 'virgin', 'zodb_simple', 'storage_api')
def suite():
alltests = unittest.TestSuite()
for modname in MODULES:
mod = __import__('test_'+modname)
alltests.addTest(mod.test_suite())
return alltests
def test_suite():
# Just to silence the top-level test.py
return None
if __name__ == '__main__':
unittest.main(defaultTest='suite')
#! /usr/bin/env python
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Time transaction commits and normalize vs. pickle size and #objects.
Actually just counts the size of pickles in the transaction via the iterator
protocol, so storage overheads aren't counted.
Usage: %(PROGRAM)s [options]
Options:
-h/--help
Print this message and exit.
-s filename
--source=filename
Use database in filename as the source (must be a FileStorage)
-d filename
--dest=filename
Use database in filename as the destination (must be a BDB storage)
-o filename
--output=filename
Print results in filename, otherwise stdout.
-m txncount
--max=txncount
Stop after committing txncount transactions.
-k txncount
--skip=txncount
Skip the first txncount transactions.
-p/--profile
Turn on specialized profiling.
-q/--quiet
Be quiet.
"""
import sys
import os
import getopt
import time
import errno
import profile
import traceback
import marshal
from bsddb3 import db
from ZODB import utils
from persistent.TimeStamp import TimeStamp
from ZODB.FileStorage import FileStorage
from BDBStorage.BDBFullStorage import BDBFullStorage
PROGRAM = sys.argv[0]
ZERO = '\0'*8
def usage(code, msg=''):
print >> sys.stderr, __doc__ % globals()
if msg:
print >> sys.stderr, msg
sys.exit(code)
def main():
try:
opts, args = getopt.getopt(sys.argv[1:], 'hs:d:qo:l:pm:k:',
['help', 'source=', 'dest=', 'quiet',
'output=', 'logfile=', 'profile',
'max=', 'skip='])
except getopt.error, msg:
usage(1, msg)
class Options:
source = None
dest = None
verbose = 1
outfile = None
logfile = None
profilep = 0
maxtxn = -1
skiptxn = -1
options = Options()
for opt, arg in opts:
if opt in ('-h', '--help'):
usage(0)
elif opt in ('-s', '--source'):
options.source = arg
elif opt in ('-d', '--dest'):
options.dest = arg
elif opt in ('-q', '--quiet'):
options.verbose = 0
elif opt in ('-o', '--output'):
options.outfile = arg
elif opt in ('-l', '--logfile'):
options.logfile = arg
elif opt in ('-p', '--profile'):
options.profilep = 1
elif opt in ('-m', '--max'):
options.maxtxn = int(arg)
elif opt in ('-k', '--skip'):
options.skiptxn = int(arg)
if args:
usage(1)
if not options.source or not options.dest:
usage(1, 'Source and destination databases must be provided')
# Open the output file
if options.outfile is None:
options.outfp = sys.stdout
options.outclosep = 0
else:
options.outfp = open(options.outfile, 'w')
options.outclosep = 1
# Open the logfile
if options.logfile is None:
options.logfp = sys.stdout
options.logclosep = 0
else:
options.logfp = open(options.logfile, 'w')
options.logclosep = 1
# Print a comment, this is a hack
print >> options.outfp, '# FS->BDB 3.3.11'
print >> options.outfp, '#', time.ctime()
print >>sys.stderr, 'Opening source FileStorage...'
t0 = time.time()
srcdb = FileStorage(options.source, read_only=1)
t1 = time.time()
print >>sys.stderr, 'Opening source FileStorage done. %s seconds' % (t1-t0)
#
# Uncomment this section to do a FS->BDB migration
#
print >>sys.stderr, 'Opening destination BDB...'
t0 = time.time()
dstdb = BDBFullStorage(options.dest)
t1 = time.time()
print >>sys.stderr, 'Opening destination BDB done. %s seconds' % (t1-t0)
#
# Uncomment this section to do a FS->FS migration
#
## print >>sys.stderr, 'Opening destination FileStorage...'
## t0 = time.time()
## dstdb = FileStorage(dest)
## t1 = time.time()
## print >>sys.stderr, 'Opening destination FileStorage done. %s seconds' % (
## t1-t0)
try:
t0 = time.time()
doit(srcdb, dstdb, options)
t1 = time.time()
print 'Total time:', t1-t0
finally:
# Done
srcdb.close()
dstdb.close()
if options.outclosep:
options.outfp.close()
if options.logclosep:
options.logfp.close()
def doit(srcdb, dstdb, options):
outfp = options.outfp
logfp = options.logfp
profilep = options.profilep
verbose = options.verbose
# some global information
largest_pickle = 0
largest_txn_in_size = 0
largest_txn_in_objects = 0
# Ripped from BaseStorage.copyTransactionsFrom()
ts = None
ok = 1
prevrevids = {}
counter = 0
skipper = 0
for txn in srcdb.iterator():
skipper += 1
if skipper <= options.skiptxn:
continue
counter += 1
if counter > options.maxtxn > 0:
break
tid = txn.tid
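# Keep transaction timestamps monotonically increasing: if the source
# storage yields an out-of-order tid, synthesize one just later than the
# previous timestamp and note when ordering recovers.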
if ts is None:
ts = TimeStamp(tid)
else:
t = TimeStamp(tid)
if t <= ts:
if ok:
print 'Time stamps are out of order %s, %s' % (ts, t)
ok = 0
ts = t.laterThan(ts)
tid = `ts`
else:
ts = t
if not ok:
print 'Time stamps are back in order %s' % t
ok = 1
if verbose:
print ts
prof = None
if profilep and (counter % 100) == 0:
prof = profile.Profile()
objects = 0
size = 0
t0 = time.time()
dstdb.tpc_begin(txn, tid, txn.status)
t1 = time.time()
try:
for r in txn:
oid = r.oid
objects += 1
thissize = len(r.data)
size += thissize
if thissize > largest_pickle:
largest_pickle = thissize
if verbose:
if not r.version:
vstr = 'norev'
else:
vstr = r.version
print utils.U64(oid), vstr, len(r.data)
oldrevid = prevrevids.get(oid, ZERO)
newrevid = dstdb.store(oid, oldrevid, r.data, r.version, txn)
prevrevids[oid] = newrevid
t2 = time.time()
dstdb.tpc_vote(txn)
t3 = time.time()
# Profile every 100 transactions
if prof:
prof.runcall(dstdb.tpc_finish, txn)
else:
dstdb.tpc_finish(txn)
t4 = time.time()
except KeyError, e:
traceback.print_exc(file=logfp)
# record the results
if objects > largest_txn_in_objects:
largest_txn_in_objects = objects
if size > largest_txn_in_size:
largest_txn_in_size = size
print >> outfp, utils.U64(tid), objects, size, t4-t0, \
t1-t0, t2-t1, t3-t2, t4-t3
if prof:
prof.create_stats()
fp = open('profile-%02d.txt' % (counter / 100), 'wb')
marshal.dump(prof.stats, fp)
fp.close()
print >> outfp, largest_pickle, largest_txn_in_size, largest_txn_in_objects
if __name__ == '__main__':
main()
#! /usr/bin/env python
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Time transaction commits and normalize vs. pickle size and #objects.
Actually just counts the size of pickles in the transaction via the iterator
protocol, so storage overheads aren't counted.
Usage: %(PROGRAM)s [options]
Options:
-h/--help
Print this message and exit.
-s filename
--source=filename
Use database in filename as the source (must be a FileStorage)
-d filename
--dest=filename
Use database in filename as the destination (must be a BDB storage)
-o filename
--output=filename
Print results in filename, otherwise stdout.
-m txncount
--max=txncount
Stop after committing txncount transactions.
-k txncount
--skip=txncount
Skip the first txncount transactions.
-p/--profile
Turn on specialized profiling.
-q/--quiet
Be quiet.
"""
import sys
import os
import getopt
import time
import errno
import profile
import traceback
import marshal
from bsddb3 import db
from ZODB import utils
from persistent.TimeStamp import TimeStamp
from ZODB.FileStorage import FileStorage
from BDBStorage.BDBFullStorage import BDBFullStorage
PROGRAM = sys.argv[0]
ZERO = '\0'*8
def usage(code, msg=''):
print >> sys.stderr, __doc__ % globals()
if msg:
print >> sys.stderr, msg
sys.exit(code)
def main():
try:
opts, args = getopt.getopt(sys.argv[1:], 'hs:d:qo:l:pm:k:',
['help', 'source=', 'dest=', 'quiet',
'output=', 'logfile=', 'profile',
'max=', 'skip='])
except getopt.error, msg:
usage(1, msg)
class Options:
source = None
dest = None
verbose = 1
outfile = None
logfile = None
profilep = 0
maxtxn = -1
skiptxn = -1
options = Options()
for opt, arg in opts:
if opt in ('-h', '--help'):
usage(0)
elif opt in ('-s', '--source'):
options.source = arg
elif opt in ('-d', '--dest'):
options.dest = arg
elif opt in ('-q', '--quiet'):
options.verbose = 0
elif opt in ('-o', '--output'):
options.outfile = arg
elif opt in ('-l', '--logfile'):
options.logfile = arg
elif opt in ('-p', '--profile'):
options.profilep = 1
elif opt in ('-m', '--max'):
options.maxtxn = int(arg)
elif opt in ('-k', '--skip'):
options.skiptxn = int(arg)
if args:
usage(1)
if not options.source or not options.dest:
usage(1, 'Source and destination databases must be provided')
# Open the output file
if options.outfile is None:
options.outfp = sys.stdout
options.outclosep = 0
else:
options.outfp = open(options.outfile, 'w')
options.outclosep = 1
# Open the logfile
if options.logfile is None:
options.logfp = sys.stdout
options.logclosep = 0
else:
options.logfp = open(options.logfile, 'w')
options.logclosep = 1
# Print a comment, this is a hack
print >> options.outfp, '# FS->BDB 3.3.11'
print >> options.outfp, '#', time.ctime()
print >>sys.stderr, 'Opening source FileStorage...'
t0 = time.time()
srcdb = FileStorage(options.source, read_only=1)
t1 = time.time()
print >>sys.stderr, 'Opening source FileStorage done. %s seconds' % (t1-t0)
#
# Uncomment this section to do a FS->BDB migration
#
print >>sys.stderr, 'Opening destination BDB...'
t0 = time.time()
## dstdb = BDBFullStorage(options.dest)
dstdb = None
t1 = time.time()
print >>sys.stderr, 'Opening destination BDB done. %s seconds' % (t1-t0)
#
# Uncomment this section to do a FS->FS migration
#
## print >>sys.stderr, 'Opening destination FileStorage...'
## t0 = time.time()
## dstdb = FileStorage(dest)
## t1 = time.time()
## print >>sys.stderr, 'Opening destination FileStorage done. %s seconds' % (
## t1-t0)
try:
t0 = time.time()
doit(srcdb, dstdb, options)
t1 = time.time()
print 'Total time:', t1-t0
finally:
# Done
srcdb.close()
## dstdb.close()
if options.outclosep:
options.outfp.close()
if options.logclosep:
options.logfp.close()
def doit(srcdb, dstdb, options):
outfp = options.outfp
logfp = options.logfp
profilep = options.profilep
verbose = options.verbose
# some global information
largest_pickle = 0
largest_txn_in_size = 0
largest_txn_in_objects = 0
# Ripped from BaseStorage.copyTransactionsFrom()
ts = None
ok = 1
prevrevids = {}
counter = 0
skipper = 0
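# This variant bypasses the BDBFullStorage layer entirely and times raw
# Berkeley DB throughput: each pickle is written straight into a plain
# BTree database, one BDB transaction per source ZODB transaction.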
from bsddb3 import db
env = db.DBEnv()
env.open('BDB',
db.DB_CREATE # create underlying files as necessary
| db.DB_RECOVER # run normal recovery before opening
| db.DB_INIT_MPOOL # initialize shared memory buffer pool
| db.DB_INIT_LOCK # initialize locking subsystem
| db.DB_INIT_TXN # initialize transaction subsystem
| db.DB_THREAD # we use the environment from other threads
)
d = db.DB(env)
d.open('zodb_picklesauce', db.DB_BTREE, db.DB_CREATE)
for txn in srcdb.iterator():
skipper += 1
if skipper <= options.skiptxn:
continue
counter += 1
if counter > options.maxtxn > 0:
break
tid = txn.tid
if ts is None:
ts = TimeStamp(tid)
else:
t = TimeStamp(tid)
if t <= ts:
if ok:
print 'Time stamps are out of order %s, %s' % (ts, t)
ok = 0
ts = t.laterThan(ts)
tid = `ts`
else:
ts = t
if not ok:
print 'Time stamps are back in order %s' % t
ok = 1
if verbose:
print ts
prof = None
if profilep and (counter % 100) == 0:
prof = profile.Profile()
objects = 0
size = 0
t0 = time.time()
t1 = time.time()
try:
dbtxn = env.txn_begin()
for r in txn:
oid = r.oid
objects += 1
thissize = len(r.data)
size += thissize
if thissize > largest_pickle:
largest_pickle = thissize
if verbose:
if not r.version:
vstr = 'norev'
else:
vstr = r.version
print utils.U64(oid), vstr, len(r.data)
key = oid + tid
d.put(key, r.data, txn=dbtxn)
t2 = time.time()
t3 = time.time()
dbtxn.commit()
t4 = time.time()
except KeyError, e:
traceback.print_exc(file=logfp)
# record the results
if objects > largest_txn_in_objects:
largest_txn_in_objects = objects
if size > largest_txn_in_size:
largest_txn_in_size = size
print >> outfp, utils.U64(tid), objects, size, t4-t0, \
t1-t0, t2-t1, t3-t2, t4-t3
if prof:
prof.create_stats()
fp = open('profile-%02d.txt' % (counter / 100), 'wb')
marshal.dump(prof.stats, fp)
fp.close()
d.close()
print >> outfp, largest_pickle, largest_txn_in_size, largest_txn_in_objects
if __name__ == '__main__':
main()
...
@@ -215,18 +215,6 @@ setup(
     ['App/www', ['App/www/*']]],
     )
-# BDBStorage
-setup(
-    name='BDBStorage',
-    author=AUTHOR,
-    packages=['BDBStorage', 'BDBStorage.tests'],
-    ext_modules = [
-        Extension(name='BDBStorage._helper',
-                  sources=['BDBStorage/_helper.c']),
-        ]
-    )
 # BTrees
 setup(
     name='BTrees',
...