Commit b15feea9 authored by Jim Fulton's avatar Jim Fulton

debugged export/import, version commit/abort and mostly undo

parent 13b1a917
......@@ -84,22 +84,22 @@
##############################################################################
"""Database connection support
$Id: Connection.py,v 1.5 1999/05/12 15:55:43 jim Exp $"""
__version__='$Revision: 1.5 $'[11:-2]
$Id: Connection.py,v 1.6 1999/05/18 15:55:09 jim Exp $"""
__version__='$Revision: 1.6 $'[11:-2]
from cPickleCache import PickleCache
from bpthread import allocate_lock
from POSException import ConflictError
from POSException import ConflictError, ExportError
from cStringIO import StringIO
from cPickle import Unpickler, Pickler
from ExtensionClass import Base
import Transaction, string, ExportImport
ExtensionKlass=Base.__class__
class HelperClass: pass
ClassType=type(HelperClass)
class Connection:
class Connection(ExportImport.ExportImport):
"""Object managers for individual object space.
An object space is a version of collection of objects. In a
......@@ -117,10 +117,9 @@ class Connection:
self._version=version
self._cache=cache=PickleCache(self, cache_size, cache_deactivate_after)
self._incrgc=cache.incrgc
self._invalidated={}
lock=allocate_lock()
self._a=lock.acquire
self._r=lock.release
self._invalidated=d={}
self._invalid=d.has_key
self._committed=[]
def _breakcr(self):
try: del self._cache
......@@ -217,11 +216,7 @@ class Connection:
Any objects modified since the last transaction are invalidated.
"""
self._db=odb
cache=self._cache
for oid in self._invalidated.keys():
if cache.has_key(oid):
cache[oid]._p_deactivate()
self._invalidated.clear()
self._cache.invalidate(self._invalidated)
return self
......@@ -232,7 +227,8 @@ class Connection:
def commit(self, object, transaction):
oid=object._p_oid
if self._invalidated.has_key(oid): raise ConflictError, oid
invalid=self._invalid
if invalid(oid) or invalid(None): raise ConflictError, oid
self._invalidating.append(oid)
plan=self._planToStore
stack=[]
......@@ -283,7 +279,7 @@ class Connection:
oid=object._p_oid
try: serial=object._p_serial
except: serial='\0'*8
if self._invalidated.has_key(oid): raise ConflictError, oid
if invalid(oid): raise ConflictError, oid
klass = object.__class__
if klass is ExtensionKlass:
......@@ -320,9 +316,6 @@ class Connection:
return topoid
def commitVersion(self, destination=''):
raise 'Not Implemented Yet!'
def db(self): return self._db
def getVersion(self): return self._version
......@@ -334,9 +327,7 @@ class Connection:
it. The object data will be actually invalidated at certain
transaction boundaries.
"""
self._a()
self._invalidated[oid]=1
self._r()
def modifiedInVersion(self, o):
return self._db.modifiedInVersion(o._p_oid)
......@@ -347,11 +338,8 @@ class Connection:
# Note, we no longer mess with the object's state
# flag, _p_changed. This is the object's job.
oid=object._p_oid
self._a()
if self._invalidated.has_key(oid):
self._r()
raise ConflictError, oid
self._r()
invalid=self._invalid
if invalid(oid) or invalid(None): raise ConflictError, oid
p, serial = self._storage.load(oid, self._version)
file=StringIO(p)
unpickler=Unpickler(file)
......@@ -368,45 +356,25 @@ class Connection:
def tpc_abort(self, transaction):
self._storage.tpc_abort(transaction)
cache=self._cache
invalidated=self._invalidated
for oid in invalidated.keys():
if cache.has_key(oid):
cache[oid]._p_deactivate()
invalidated.clear()
for oid in self._invalidating:
cache[oid]._p_deactivate()
cache.invalidate(self._invalidated)
cache.invalidate(self._invalidating)
def tpc_begin(self, transaction):
if self._invalid(None): # Some nitwit invalidated everything!
raise ConflictError, oid
self._invalidating=[]
self._storage.tpc_begin(transaction)
def tpc_finish(self, transaction):
self._storage.tpc_finish(transaction, self.tpc_finish_)
cache=self._cache
invalidated=self._invalidated
for oid in invalidated.keys():
if cache.has_key(oid):
cache[oid]._p_deactivate()
invalidated.clear()
self._cache.invalidate(self._invalidated)
def tpc_finish_(self):
invalidate=self._db.invalidate
for oid in self._invalidating: invalidate(oid, self)
def exportFile(self, oid, file=None):
pass # Not implemented yet
def importFile(self, file):
pass # Not implemented yet
######################################################################
# BoboPOS 2 compat.
def export_file(self, o, file=None): return self.exportFile(o._p_oid, file)
import_file=importFile
class tConnection(Connection):
def close(self):
self._breakcr()
......@@ -84,13 +84,16 @@
##############################################################################
"""Database objects
$Id: DB.py,v 1.5 1999/05/12 15:55:43 jim Exp $"""
__version__='$Revision: 1.5 $'[11:-2]
$Id: DB.py,v 1.6 1999/05/18 15:55:09 jim Exp $"""
__version__='$Revision: 1.6 $'[11:-2]
import cPickle, cStringIO, sys
import cPickle, cStringIO, sys, POSException
from Connection import Connection
from bpthread import allocate_lock
from Transaction import Transaction
from referencesf import referencesf
StringType=type('')
class DB:
"""The Object Database
......@@ -117,6 +120,7 @@ class DB:
"""
self._storage=storage
storage.registerDB(self, None)
try: storage.load('\0\0\0\0\0\0\0\0','')
except:
import PersistentMapping
......@@ -199,7 +203,7 @@ class DB:
finally: self._r()
def abortVersion(self, version):
raise 'Not Yet Implemented'
AbortVersion(self, version)
def cacheDetail(self):
"""Return information on objects in the various caches
......@@ -271,7 +275,7 @@ class DB:
def close(self): self._storage.close()
def commitVersion(self, source, destination=''):
raise 'Not yet implemented'
CommitVersion(self, source, destination)
def exportFile(self, oid, file=None):
raise 'Not yet implemented'
......@@ -294,7 +298,8 @@ class DB:
def importFile(self, file):
raise 'Not yet implemented'
def invalidate(self, oid, connection=None, rc=sys.getrefcount):
def invalidate(self, oid, connection=None, version='',
rc=sys.getrefcount):
"""Invalidate references to a given oid.
This is used to indicate that one of the connections has committed a
......@@ -303,7 +308,6 @@ class DB:
connection.
"""
if connection is not None: version=connection._version
else: version=''
self._a()
try:
pools,pooll=self._pools
......@@ -327,6 +331,11 @@ class DB:
self._temps=t
finally: self._r()
def invalidateMany(self, oids=None, version=''):
if oids is None: self.invalidate(None, version=version)
else:
for oid in oids: self.invalidate(oid, version=version)
def objectCount(self): return len(self._storage)
def open(self, version='', transaction=None, temporary=0, force=None,
......@@ -344,6 +353,9 @@ class DB:
Note that the connection pool is managed as a stack, to increate the
likelihood that the connection's stack will include useful objects.
"""
if type(version) is not StringType:
raise POSException.Unimplemented, 'temporary versions'
self._a()
try:
......@@ -430,7 +442,7 @@ class DB:
finally: self._r()
def pack(self, t):
self._storage.pack(t,referencesf,-1)
self._storage.pack(t,referencesf)
def setCacheDeactivateAfter(self, v): self._cache_deactivate_after=v
def setCacheSize(self, v): self._cache_size=v
......@@ -449,20 +461,37 @@ class DB:
def versionEmpty(self, version):
return self._storage.versionEmpty(version)
def referencesf(p,rootl,
Unpickler=cPickle.Unpickler,
StringIO=cStringIO.StringIO):
u=Unpickler(StringIO(p))
u.persistent_load=rootl
u.noload()
try: u.noload()
except:
# Hm. We failed to do second load. Maybe there wasn't a
# second pickle. Let's check:
f=StringIO(p)
u=Unpickler(f)
u.persistent_load=[]
u.noload()
if len(p) > f.tell(): raise ValueError, 'Error unpickling, %s' % p
class CommitVersion:
"""An object that will see to version commit
in cooperation with a transaction manager.
"""
def __init__(self, db, version, dest=''):
self._db=db
s=db._storage
self._version=version
self._dest=dest
self.tpc_abort=s.tpc_abort
self.tpc_begin=s.tpc_begin
self.tpc_finish=s.tpc_finish
get_transaction().register(self)
def abort(self, reallyme, t): pass
def commit(self, reallyme, t):
db=self._db
dest=self._dest
for oid in db._storage.commitVersion(self._version, dest, t):
db.invalidate(oid, version=dest)
class AbortVersion(CommitVersion):
"""An object that will see to version abortion
in cooperation with a transaction manager.
"""
def commit(self, reallyme, t):
db=self._db
version=self._version
for oid in db._storage.abortVersion(version, t):
db.invalidate(oid, version=version)
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
import POSException, string
from utils import p64, u64
from referencesf import referencesf
from cStringIO import StringIO
from cPickle import Pickler, Unpickler
class ExportImport:
def exportFile(self, oid, file=None):
if file is None: file=TemporaryFile()
elif type(file) is StringType: file=open(file,'w+b')
write=file.write
write('ZEXP')
version=self._version
ref=referencesf
oids=[oid]
done_oids={}
done=done_oids.has_key
load=self._storage.load
while oids:
oid=oids[0]
del oids[0]
try: p, serial = load(oid, version)
except: pass # Ick, a broken reference
else:
ref(p, oids)
write(oid)
write(p64(len(p)))
write(p)
write(export_end_marker)
return file
def importFile(self, file, clue=''):
# This is tricky, because we need to work in a transaction!
if type(file) is StringType:
file_name=file
file=open(file,'rb')
else:
try: file_name=file.name
except: file_name='(unknown)'
t=get_transaction().sub()
t.note('import into %s from %s' % (self.db().getName(), file_name))
if clue: t.note(clue)
storage=self._storage
new_oid=storage.new_oid
read=file.read
if read(4) != 'ZEXP':
raise POSException.ExportError, 'Invalid export header'
oids={}
wrote_oid=oids.has_key
new_oid=storage.new_oid
store=storage.store
def persistent_load(ooid,
Ghost=Ghost, StringType=StringType,
atoi=string.atoi, TupleType=type(()),
oids=oids, wrote_oid=wrote_oid, new_oid=new_oid):
"Remap a persistent id to a new ID and create a ghost for it."
if type(ooid) is TupleType: ooid, klass = ooid
else: klass=None
if wrote_oid(ooid): oid=oids[ooid]
else:
if klass is None: oid=new_oid()
else: oid=new_oid(), klass
oids[ooid]=oid
Ghost=Ghost()
Ghost.oid=oid
return Ghost
version=self._version
return_oid=None
storage.tpc_begin(t)
try:
while 1:
h=read(16)
if h==export_end_marker: break
if len(h) != 16: raise ExportError, 'Truncated export file'
l=u64(h[8:16])
p=read(l)
if len(p) != l: raise ExportError, 'Truncated export file'
ooid=h[:8]
if oids:
oid=oids[ooid]
if type(oid) is TupleType: oid=oid[0]
else: oids[ooid]=return_oid=oid=new_oid()
pfile=StringIO(p)
unpickler=Unpickler(pfile)
unpickler.persistent_load=persistent_load
newp=StringIO()
pickler=Pickler(newp,1)
pickler.persistent_id=persistent_id
pickler.dump(unpickler.load())
pickler.dump(unpickler.load())
p=newp.getvalue()
plen=len(p)
store(oid, None, p, version, t)
except:
storage.tpc_abort(t)
raise
else:
storage.tpc_finish(t)
if return_oid is not None: return self[return_oid]
######################################################################
# BoboPOS 2 compat.
def export_file(self, o, file=None): return self.exportFile(o._p_oid, file)
import_file=importFile
StringType=type('')
def TemporaryFile():
# This is sneaky suicide
global TemporaryFile
import tempfile
TemporaryFile=tempfile.TemporaryFile
return TemporaryFile()
export_end_marker='\377'*16
class Ghost: pass
def persistent_id(object, Ghost=Ghost):
if hasattr(object, '__class__') and object.__class__ is Ghost:
return object.oid
......@@ -93,9 +93,7 @@ Files are arranged as follows.
A transaction record consists of:
- 8-byte transaction record, which is also a time stamp.
- 8-byte previous-transaction file position.
- 8-byte transaction id, which is also a time stamp.
- 8-byte transaction record length - 8.
......@@ -151,46 +149,19 @@ Also, the object ids time stamps are big-endian, so comparisons
are meaningful.
"""
__version__='$Revision: 1.6 $'[11:-2]
__version__='$Revision: 1.7 $'[11:-2]
import struct, time, os, bpthread
import struct, time, os, bpthread, string, base64
now=time.time
from struct import pack, unpack
from cPickle import dumps
from cPickle import dumps, loads
import POSException
from TimeStamp import TimeStamp
from lock_file import lock_file
t32 = 1L << 32
def p64(v, pack=struct.pack):
if v < t32: h=0
else:
h=v/t32
v=v%t32
return pack(">II", h, v)
def u64(v, unpack=struct.unpack):
h, v = unpack(">ii", v)
if v < 0: v=t32-v
if h:
if h < 0: h=t32-h
v=h*t32+v
return v
from utils import t32, p64, u64, cp
z64='\0'*8
def cp(f1, f2, l):
read=f1.read
write=f2.write
n=8192
while l > 0:
if n > l: n=l
d=read(n)
write(d)
l = l - len(d)
def warn(log, message, *data):
log("%s warn: %s\n" % (packed_version, (message % data)))
......@@ -219,7 +190,7 @@ class CorruptedFileStorageError(FileStorageError,
class CorruptedTransactionError(CorruptedFileStorageError): pass
class CorruptedDataError(CorruptedFileStorageError): pass
packed_version='FS20'
packed_version='FS21'
class FileStorage:
_packt=0
......@@ -237,6 +208,17 @@ class FileStorage:
if stop is None: stop='\377'*8
# Lock the database
if not read_only:
try: f=open(file_name+'.lock', 'r+')
except: f=open(file_name+'.lock', 'w+')
lock_file(f)
try:
f.write(str(os.getpid()))
f.flush()
except: pass
self._lock_file=f # so it stays open
self.__name__=file_name
self._tfile=open(file_name+'.tmp','w+b')
index, vindex, tindex, tvindex = self._newIndexes()
......@@ -264,20 +246,18 @@ class FileStorage:
self._file=file=open(file_name,'w+b')
self._file.write(packed_version)
self._pos=4
self._tpos=0
self._oid='\0\0\0\0\0\0\0\0'
return
if os.path.exists(file_name):
file=open(file_name, read_only and 'rb' or 'r+b')
#if not read_only: lock_file(file)
else:
if read_only:
raise ValueError, "can\'t create a read-only file"
file=open(file_name,'w+b')
self._file=file
self._pos, self._tpos, self._oid, tid = read_index(
self._pos, self._oid, tid = read_index(
file, file_name, index, vindex, tindex, stop, log)
self._ts=tid=TimeStamp(tid)
......@@ -291,44 +271,48 @@ class FileStorage:
def _newIndexes(self): return {}, {}, [], {}
def abortVersion(self, version, transaction):
def abortVersion(self, src, transaction):
# We are going to abort by simply storing back pointers.
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._a()
try:
pos=self._vindex[version]
spos=p64(pos)
file=self._file
seek=file.seek
read=file.read
file=self._tfile
write=file.write
tell=file.tell
tloc=p64(self._pos)
seek=file.seek
tfile=self._tfile
write=tfile.write
tappend=self._tappend
index=self._index
pack=struct.pack
unpack=struct.unpack
serial=self._serial
while pos:
seek(pos)
h=read(58)
srcpos=self._vindex.get(src, 0)
spos=p64(srcpos)
middle=p64(self._pos)+'\0'*10
here=tfile.tell()+self._pos+self._thl
oids=[]
appoids=oids.append
while srcpos:
seek(srcpos)
h=read(58) # oid, serial, prev(oid), tloc, vlen, plen, pnv, pv
oid=h[:8]
if index[oid]==pos:
tappend(oid, tell())
pc=h[-16:-8] # Position of committed (non-version) data
write(pack(
">"
"8s" "8s" "8s" "8s" "H" "8s" "8s",
oid, serial, spos, tloc, 0, z64, pc
))
if index[oid]==srcpos:
tappend((oid,here))
appoids(oid)
# oid,ser prev tl,vl,pl pnv
write(h[:16] + spos + middle + h[-16:-8])
here=here+50
spos=h[-8:]
pos=u64(spos)
srcpos=u64(spos)
self._tvindex[src]=0
del self._vindex[version]
return oids
finally: self._r()
......@@ -337,40 +321,58 @@ class FileStorage:
# Eventuallly, we should save_index
def commitVersion(self, src, dest, transaction):
# We are going to commit by simply storing back pointers.
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._a()
try:
pos=self._vindex[version]
spos=p64(pos)
file=self._file
seek=file.seek
read=file.read
file=self._tfile
write=file.write
tell=file.tell
tloc=p64(self._pos)
seek=file.seek
tfile=self._tfile
write=tfile.write
tappend=self._tappend
index=self._index
pack=struct.pack
unpack=struct.unpack
destlen=len(dest)
while pos:
seek(pos)
h=read(58)
srcpos=self._vindex.get(src, 0)
spos=p64(srcpos)
middle=struct.pack(">8sH8s", p64(self._pos), len(dest), z64)
if dest:
sd=p64(self._vindex.get(dest, 0))
heredelta=66+len(dest)
else:
sd=''
heredelta=50
here=tfile.tell()+self._pos+self._thl
oids=[]
appoids=oids.append
while srcpos:
seek(srcpos)
h=read(58) # oid, serial, prev(oid), tloc, vlen, plen, pnv, pv
oid=h[:8]
if index[oid]==pos:
tappend(oid, tell())
write(pack(">8s" "8s" "8s" "H" "8s" "8s",
oid, spos, tloc,destlen, z64, h[-16:-8]))
write(dest)
write(spos)
if index[oid]==srcpos:
tappend((oid,here))
appoids(oid)
write(h[:16] + spos + middle)
if dest:
write(h[-16:-8]+sd+dest)
sd=p64(here)
write(spos) # data backpointer to src data
here=here+heredelta
spos=h[-8:]
pos=u64(spos)
srcpos=u64(spos)
self._tvindex[src]=0
del self._vindex[version]
return oids
finally: self._r()
......@@ -403,7 +405,10 @@ class FileStorage:
# If we get here, then either this was not a version record,
# or we've already read past the version data!
if plen != z64: return read(u64(plen)), serial
return _loadBack(file, oid, pnv)
pnv=read(8)
# We use the current serial, since that is the one that
# will get checked when we store.
return _loadBack(file, oid, pnv)[0], serial
finally: self._r()
def modifiedInVersion(self, oid):
......@@ -494,6 +499,8 @@ class FileStorage:
finally: self._r()
def registerDB(self, db, limit): pass # we don't care
def supportsUndo(self): return 0 # for now
def supportsVersions(self): return 1
......@@ -531,7 +538,7 @@ class FileStorage:
# Ugh, we have to record the transaction header length
# so that we can get version pointers right.
self._thl=33+len(user)+len(desc)+len(ext)
self._thl=23+len(user)+len(desc)+len(ext)
# And we have to save the data used to compute the
# header length. It's unlikely that this stuff would
......@@ -557,12 +564,11 @@ class FileStorage:
tlen=self._thl
pos=self._pos
file.seek(pos)
stpos=p64(self._tpos)
tl=tlen+dlen
stl=p64(tl)
write(pack(
">8s" "8s" "8s" "c" "H" "H" "I"
,id, stpos, stl, ' ', len(user), len(desc), len(ext),
">8s" "8s" "c" "H" "H" "H"
,id, stl, ' ', len(user), len(desc), len(ext),
))
if user: write(user)
if desc: write(desc)
......@@ -572,7 +578,6 @@ class FileStorage:
write(stl)
file.flush()
self._tpos=pos
self._pos=pos+tl+8
tindex=self._tindex
......@@ -589,15 +594,88 @@ class FileStorage:
finally: self._r()
def undo(self, transaction_id):
# TBD
pass
self._a()
try:
file=self._file
seek=file.seek
read=file.read
indexpos=self._indexpos
unpack=struct.unpack
transaction_id=base64.decodestring(transaction_id+'==\n')
tid, tpos = transaction_id[:8], u64(transaction_id[8:])
seek(tpos)
h=read(23)
if len(h) != 23 or h[:8] != tid:
raise UndoError, 'Invalid undo transaction id'
if h[16] == 'u': return
if h[16] != ' ': raise UndoError, 'Undoable transaction'
tl=u64(h[8:16])
ul,dl,el=unpack(">HHH", h[17:23])
tend=tpos+tl
pos=tpos+23+ul+dl+el
t=[]
tappend=t.append
while pos < tend:
# Read the data records for this transaction
seek(pos)
h=read(42)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
plen=u64(splen)
prev=u64(sprev)
dlen=42+(plen or 8)
if vlen: dlen=dlen+16+vlen
if indexpos(oid,0) != pos: raise UndoError, 'Undoable transaction'
pos=pos+dlen
if pos > tend: raise UndoError, 'Undoable transaction'
tappend((oid,prev))
def undoLog(self, version, first, last, path):
# TBD
return []
seek(tpos+16)
file.write('u')
index=self._index
for oid, pos in t: index[oid]=pos
finally: self._r()
def undoLog(self, first, last, filter=None):
self._a()
try:
pos=self._pos
if pos < 39: return []
file=self._file
seek=file.seek
read=file.read
unpack=struct.unpack
strip=string.strip
encode=base64.encodestring
r=[]
append=r.append
i=0
while i < last and pos > 39:
seek(pos-8)
pos=pos-u64(read(8))-8
if i < first: continue
seek(pos)
h=read(23)
tid, tl, status, ul, dl, el = unpack(">8s8scHHH", h)
if status != ' ': continue
u=ul and read(ul) or ''
d=dl and read(dl) or ''
d={'id': encode(tid+p64(pos))[:22],
'time': TimeStamp(tid).timeTime(),
'user_name': u, 'description': d}
if el:
try:
e=loads(read(el))
d.update(e)
except: pass
if filter is None or filter(d):
append(d)
i=i+1
return r
finally: self._r()
def versionEmpty(self, version):
return not self._vindex.has_key(version)
return not self._vindex.get(version, 0)
def versions(self, max=None):
if max: return self._vindex.keys()[:max]
......@@ -618,7 +696,7 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
if file_size:
if file_size < 4: raise FileStorageFormatError, file.name
if read(4) != packed_version:
raise FileStorageFormatError, file_name
raise FileStorageFormatError, name
else: file.write(packed_version)
pos=4
......@@ -629,22 +707,21 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
while 1:
# Read the transaction record
h=read(33)
h=read(23)
if not h: break
if len(h) != 33:
if len(h) != 23:
warn(log, '%s truncated at %s', name, pos)
seek(pos)
file.truncate()
break
tid, sprev, stl, status, ul, dl, el = unpack(">8s8s8scHHi",h)
tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
if el < 0: el=t32-el
if tid <= ltid:
warn(log, "%s time-stamp reduction at %s", name, pos)
ltid=tid
prev=u64(sprev)
tl=u64(stl)
if tl+pos+8 > file_size:
......@@ -675,7 +752,7 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
if status not in ' up':
warn(log,'%s has invalid status, %s, at %s', name, status, pos)
if prev != tpos or ul > tl or dl > tl or el > tl:
if ul > tl or dl > tl or el > tl:
panic(log,'%s has invalid transaction header at %s', name, pos)
if tid >= stop: break
......@@ -694,7 +771,7 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
pos=pos+8
continue
pos=tpos+33+ul+dl
pos=tpos+23+ul+dl+el
while pos < tend:
# Read the data records for this transaction
......@@ -705,17 +782,18 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
tloc=u64(stloc)
plen=u64(splen)
dlen=42+(plen or 8)+vlen
dlen=42+(plen or 8)
tappend((oid,pos))
if vlen:
dlen=dlen+16
dlen=dlen+16+vlen
seek(8,1)
pv=u64(read(8))
version=read(vlen)
if vndexpos(version, 0) != pv:
panic(log,"%s incorrect previous version pointer at %s",
name, pos)
# Jim says: "It's just not worth the bother."
#if vndexpos(version, 0) != pv:
# panic(log,"%s incorrect previous version pointer at %s",
# name, pos)
vindex[version]=pos
if pos+dlen > tend or tloc != tpos:
......@@ -744,7 +822,9 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
del tindex[:]
return pos, tpos, maxoid, ltid
_checkVindex(file, index, vindex)
return pos, maxoid, ltid
def _loadBack(file, oid, back):
......@@ -757,10 +837,21 @@ def _loadBack(file, oid, back):
seek(old)
h=read(42)
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
if doid != oid or vlen:
if doid != oid:
panic(lambda x: None,
"%s version record back pointer points to "
"invalid record as %s", name, back)
if plen: return read(u64(plen)), serial
if vlen: seek(vlen+16,1)
if plen != z64: return read(u64(plen)), serial
back=read(8) # We got a back pointer!
def _checkVindex(file, index, vindex):
seek=file.seek
read=file.read
get=index.get
for version, pos in vindex.items():
seek(pos)
oid=read(8)
if get(oid, None) != pos:
del vindex[version] # This version is no longer active
......@@ -84,8 +84,8 @@
##############################################################################
'''BoboPOS-defined exceptions
$Id: POSException.py,v 1.2 1999/05/10 23:15:56 jim Exp $'''
__version__='$Revision: 1.2 $'[11:-2]
$Id: POSException.py,v 1.3 1999/05/18 15:55:10 jim Exp $'''
__version__='$Revision: 1.3 $'[11:-2]
class POSError(Exception):
......@@ -129,3 +129,16 @@ class StorageSystemError(StorageError):
"""Panic! Internal storage error!
"""
class ExportError(POSError):
"""An export file doesn't have the right format.
"""
pass
class Unimplemented(POSError):
"""An unimplemented feature was used
"""
pass
class Unsupported(POSError):
"""An feature that is unsupported bt the storage was used.
"""
......@@ -84,8 +84,8 @@
##############################################################################
"""Transaction management
$Id: Transaction.py,v 1.6 1999/05/12 15:55:44 jim Exp $"""
__version__='$Revision: 1.6 $'[11:-2]
$Id: Transaction.py,v 1.7 1999/05/18 15:55:10 jim Exp $"""
__version__='$Revision: 1.7 $'[11:-2]
import time, sys, struct
from struct import pack
......@@ -100,30 +100,50 @@ class Transaction:
_connections=None
_extension=None
def __init__(self,
time=time.time, pack=struct.pack, gmtime=time.gmtime):
def __init__(self, id=None):
self._id=id
self._objects=[]
self._append=self._objects.append
self._note=self._user=self._description=''
def _init(self):
self._objects=[]
self._append=self._objects.append
self.user=self.description=''
if self._connections:
for c in self._connections.values(): c.close()
del self._connections
def __str__(self): return "%.3f\t%s" % (self.time,self._note)
def sub(self):
r=self.__class__()
r.user=self.user
r.description=self.description
r._extention=self._extension
return r
def __str__(self): return "%.3f\t%s" % (self._id, self.user)
def abort(self, freeme=1):
'Abort the transaction.'
'''Abort the transaction.
This is called from the application. This means that we haven\'t
entered two-phase commit yet, so no tpc_ messages are sent.
'''
t=v=tb=None
try:
# Abort the objects
for o in self._objects:
try:
if hasattr(o,'_p_jar'): o=o._p_jar
if hasattr(o,'tpc_abort'): o.tpc_abort(self)
except: t,v,tb=sys.exc_info()
j=getattr(o, '_p_jar', o)
j.abort(o, self)
except:
if t is None:
t,v,tb=sys.exc_info()
if t is not None: raise t,v,tb
finally:
tb=None
if freeme: free_transaction()
if self._id is not None and freeme: free_transaction()
def begin(self, info=None):
'''Begin a new transaction.
......@@ -131,40 +151,61 @@ class Transaction:
This aborts any transaction in progres.
'''
if self._objects: self.abort(0)
self.__init__()
self._init()
if info:
info=split(info,'\t')
self.user=strip(info[0])
self.description=strip(join(info,'\t'))
self.description=strip(join(info[1:],'\t'))
def commit(self):
'Finalize the transaction'
t=v=tb=None
jars={}
objects=self._objects
try:
try:
for o in self._objects:
if hasattr(o,'_p_jar'):
j=o._p_jar
while objects:
o=objects[-1]
j=getattr(o, '_p_jar', o)
i=id(j)
if not jars.has_key(i):
jars[i]=j
j.tpc_begin(self)
j.commit(o,self)
elif hasattr(o,'tpc_begin'):
o.tpc_begin(self)
del objects[-1]
except:
t,v,tb=sys.exc_info()
self.abort()
# Ugh, we got an got an error during commit, so we
# have to clean up.
# First, we have to abort any uncommitted objects.
for o in objects:
try:
j=getattr(o, '_p_jar', o)
j.abort(o, self)
except: pass
# Then, we unwind TPC for the jars that began it.
for j in jars.values():
try: j.tpc_abort(self) # This should never fail
except: pass
raise t,v,tb
for o in self._objects:
for j in jars.values():
try:
if hasattr(o,'_p_jar'): o=o._p_jar
if hasattr(o,'tpc_finish'): o.tpc_finish(self)
except: t,v,tb=sys.exc_info()
j.tpc_finish(self) # This should never fail
except:
if t is None:
t,v,tb=sys.exc_info()
if t is not None: raise t,v,tb
finally:
tb=None
free_transaction()
if self._id is not None: free_transaction()
def register(self,object):
'Register the given object for transaction control.'
......@@ -191,11 +232,18 @@ class Transaction:
try:
import thread
except:
_t=Transaction(None)
def get_transaction(_t=_t): return _t
def free_transaction(_t=_t): _t.__init__()
else:
_t={}
def get_transaction(_id=thread.get_ident, _t=_t):
id=_id()
try: t=_t[id]
except KeyError: _t[id]=t=Transaction()
except KeyError: _t[id]=t=Transaction(id)
return t
def free_transaction(_id=thread.get_ident, _t=_t):
......@@ -205,11 +253,6 @@ try:
del thread
except:
_t=Transaction()
def get_transaction(_t=_t): return _t
def free_transaction(_t=_t): _t.__init__()
del _t
import __main__
......
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""Provide a function that can find object references in pickles
"""
import cPickle, cStringIO
def referencesf(p,rootl,
Unpickler=cPickle.Unpickler,
StringIO=cStringIO.StringIO,
tt=type(()),
type=type):
u=Unpickler(StringIO(p))
l=len(rootl)
u.persistent_load=rootl
u.noload()
try: u.noload()
except:
# Hm. We failed to do second load. Maybe there wasn't a
# second pickle. Let's check:
f=StringIO(p)
u=Unpickler(f)
u.persistent_load=[]
u.noload()
if len(p) > f.tell(): raise ValueError, 'Error unpickling, %s' % p
# References may have class info, so we need to
# check for wrapped references.
for i in range(l, len(rootl)):
v=rootl[i]
if v:
if type(v) is tt: v=v[0]
rootl[i]=v
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
import TimeStamp, time, struct
t32 = 1L << 32
def p64(v, pack=struct.pack):
if v < t32: h=0
else:
h=v/t32
v=v%t32
return pack(">II", h, v)
def u64(v, unpack=struct.unpack):
h, v = unpack(">ii", v)
if v < 0: v=t32-v
if h:
if h < 0: h=t32-h
v=h*t32+v
return v
def cp(f1, f2, l):
read=f1.read
write=f2.write
n=8192
while l > 0:
if n > l: n=l
d=read(n)
write(d)
l = l - len(d)
def newTimeStamp(old=None,
TimeStamp=TimeStamp.TimeStamp,
time=time.time, gmtime=time.gmtime):
t=time()
ts=TimeStamp(gmtime(t)[:5]+(t%60,))
if old is not None: return ts.laterThan(than)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment