Commit c717e473 authored by Jim Fulton's avatar Jim Fulton

Fixed version undo and added packing.

parent c317c8ca
...@@ -82,85 +82,120 @@ ...@@ -82,85 +82,120 @@
# attributions are listed in the accompanying credits file. # attributions are listed in the accompanying credits file.
# #
############################################################################## ##############################################################################
"""File-based ZODB storage #
# File-based ZODB storage
Files are arranged as follows. #
# Files are arranged as follows.
- The first 4 bytes are a file identifier. #
# - The first 4 bytes are a file identifier.
- The rest of the file consists of a sequence of transaction #
"records". # - The rest of the file consists of a sequence of transaction
# "records".
A transaction record consists of: #
# A transaction record consists of:
- 8-byte transaction id, which is also a time stamp. #
# - 8-byte transaction id, which is also a time stamp.
- 8-byte transaction record length - 8. #
# - 8-byte transaction record length - 8.
- 1-byte status code #
# - 1-byte status code
- 2-byte length of user name #
# - 2-byte length of user name
- 2-byte length of description #
# - 2-byte length of description
- 4-byte length of extension attributes #
# - 4-byte length of extension attributes
- user name #
# - user name
- description #
# - description
* A sequence of data records #
# * A sequence of data records
- 8-byte redundant transaction length -8 #
# - 8-byte redundant transaction length -8
A data record consists of #
# A data record consists of
- 8-byte oid. #
# - 8-byte oid.
- 8-byte serial, which is a type stamp that matches the #
transaction timestamp. # - 8-byte serial, which is a type stamp that matches the
# transaction timestamp.
- 8-byte previous-record file-position. #
# - 8-byte previous-record file-position.
- 8-byte beginning of transaction record file position. #
# - 8-byte beginning of transaction record file position.
- 2-byte version length #
# - 2-byte version length
- 8-byte data length #
# - 8-byte data length
? 8-byte position of non-version data #
(if version length > 0) # ? 8-byte position of non-version data
# (if version length > 0)
? 8-byte position of previous record in this version #
(if version length > 0) # ? 8-byte position of previous record in this version
# (if version length > 0)
? version string #
(if version length > 0) # ? version string
# (if version length > 0)
? data #
(data length > 0) # ? data
# (data length > 0)
? 8-byte position of data record containing data #
(data length > 0) # ? 8-byte position of data record containing data
# (data length == 0)
#
Note that the lengths and positions are all big-endian. # Note that the lengths and positions are all big-endian.
Also, the object ids time stamps are big-endian, so comparisons # Also, the object ids time stamps are big-endian, so comparisons
are meaningful. # are meaningful.
#
""" # Version handling
__version__='$Revision: 1.11 $'[11:-2] #
# There isn't a separate store for versions. Each record has a
# version field, indicating what version it is in. The records in a
# version form a linked list. Each record that has a non-empty
# version string has a pointer to the previous record in the version.
# Version back pointers are retained *even* when versions are
# committed or aborted or when transactions are undone.
#
# There is a notion of "current" version records, which are the
# records in a version that are the current records for their
# respective objects. When a version is comitted, the current records
# are committed to the destination version. When a version is
# aborted, the current records are aborted.
#
# When committing or aborting, we search backward through the linked
# list until we find a record for an object that does not have a
# current record in the version. If we find a record for which the
# non-version pointer is the same as the previous pointer, then we
# forget that the corresponding object had a current record in the
# version. This strategy allows us to avoid searching backward through
# previously committed or aborted version records.
#
# Of course, we ignore records in undone transactions when committing
# or aborting.
#
# Backpointers
#
# When we commit or abort a version, we don't copy (or delete)
# and data. Instead, we write records with back pointers.
#
# A version record *never* has a back pointer to a non-version
# record, because we never abort to a version. A non-version record
# may have a back pointer to a version record or to a non-version
# record.
#
__version__='$Revision: 1.12 $'[11:-2]
import struct, time, os, bpthread, string, base64 import struct, time, os, bpthread, string, base64
now=time.time
from struct import pack, unpack from struct import pack, unpack
from cPickle import dumps, loads from cPickle import loads
import POSException import POSException
from TimeStamp import TimeStamp from TimeStamp import TimeStamp
from lock_file import lock_file from lock_file import lock_file
from utils import t32, p64, u64, cp from utils import t32, p64, u64, cp
from zLOG import LOG, WARNING, ERROR, PANIC, register_subsystem from zLOG import LOG, WARNING, ERROR, PANIC, register_subsystem
register_subsystem('ZODB FS') register_subsystem('ZODB FS')
import BaseStorage
z64='\0'*8 z64='\0'*8
...@@ -174,7 +209,6 @@ def panic(message, *data): ...@@ -174,7 +209,6 @@ def panic(message, *data):
message=message%data message=message%data
LOG('ZODB FS',PANIC,"%s ERROR: %s\n" % (packed_version, message)) LOG('ZODB FS',PANIC,"%s ERROR: %s\n" % (packed_version, message))
raise CorruptedTransactionError, message raise CorruptedTransactionError, message
class FileStorageError: pass class FileStorageError: pass
...@@ -194,12 +228,10 @@ class CorruptedDataError(CorruptedFileStorageError): pass ...@@ -194,12 +228,10 @@ class CorruptedDataError(CorruptedFileStorageError): pass
packed_version='FS21' packed_version='FS21'
class FileStorage: class FileStorage(BaseStorage.BaseStorage):
_packt=0 _packt=z64
_transaction=None
_serial=z64
def __init__(self, file_name, create=0, read_only=0, stop=None): def __init__(self, file_name, create=0, read_only=0, stop=None, base=None):
if read_only: if read_only:
if create: if create:
...@@ -219,26 +251,15 @@ class FileStorage: ...@@ -219,26 +251,15 @@ class FileStorage:
f.flush() f.flush()
except: pass except: pass
self._lock_file=f # so it stays open self._lock_file=f # so it stays open
self.__name__=file_name BaseStorage.BaseStorage.__init__(self, file_name)
self._tfile=open(file_name+'.tmp','w+b') self._tfile=open(file_name+'.tmp','w+b')
index, vindex, tindex, tvindex = self._newIndexes() index, vindex, tindex, tvindex = self._newIndexes()
self._index=index self._initIndex(index, vindex, tindex, tvindex)
self._vindex=vindex
self._tindex=tindex self._base=base
self._tvindex=tvindex
self._indexpos=index.get
self._vindexpos=vindex.get
self._tappend=tindex.append
# Allocate locks:
l=bpthread.allocate_lock()
self._a=l.acquire
self._r=l.release
l=bpthread.allocate_lock()
self._ca=l.acquire
self._cr=l.release
# Now open the file # Now open the file
...@@ -271,66 +292,38 @@ class FileStorage: ...@@ -271,66 +292,38 @@ class FileStorage:
self._ts=t self._ts=t
def _initIndex(self, index, vindex, tindex, tvindex):
self._index=index
self._vindex=vindex
self._tindex=tindex
self._tvindex=tvindex
self._index_get=index.get
self._vindex_get=vindex.get
self._tappend=tindex.append
def __len__(self): return len(self._index) def __len__(self): return len(self._index)
def _newIndexes(self): return {}, {}, [], {} def _newIndexes(self): return {}, {}, [], {}
def abortVersion(self, src, transaction): def abortVersion(self, src, transaction):
# We are going to abort by simply storing back pointers. return self.commitVersion(src, '', transaction, abort=1)
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._a()
try:
file=self._file
read=file.read
seek=file.seek
tfile=self._tfile
write=tfile.write
tappend=self._tappend
index=self._index
srcpos=self._vindex.get(src, 0)
spos=p64(srcpos)
middle=p64(self._pos)+'\0'*10
here=tfile.tell()+self._pos+self._thl
oids=[]
appoids=oids.append
while srcpos:
seek(srcpos)
h=read(58) # oid, serial, prev(oid), tloc, vlen, plen, pnv, pv
oid=h[:8]
if index[oid]==srcpos:
tappend((oid,here))
appoids(oid)
# oid,ser prev tl,vl,pl pnv
write(h[:16] + spos + middle + h[-16:-8])
here=here+50
spos=h[-8:]
srcpos=u64(spos)
self._tvindex[src]=0
return oids
finally: self._r()
def close(self): def close(self):
self._file.close() self._file.close()
# Eventuallly, we should save_index # Eventuallly, we should save_index
def commitVersion(self, src, dest, transaction): def commitVersion(self, src, dest, transaction, abort=None):
# We are going to commit by simply storing back pointers. # We are going to commit by simply storing back pointers.
if dest and abort:
raise 'VersionCommitError', (
'Internal error, can\'t abort to a version')
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
self._a() self._lock_acquire()
try: try:
file=self._file file=self._file
read=file.read read=file.read
...@@ -340,13 +333,12 @@ class FileStorage: ...@@ -340,13 +333,12 @@ class FileStorage:
tappend=self._tappend tappend=self._tappend
index=self._index index=self._index
srcpos=self._vindex.get(src, 0) srcpos=self._vindex_get(src, 0)
spos=p64(srcpos) spos=p64(srcpos)
middle=struct.pack(">8sH8s", p64(self._pos), len(dest), z64) middle=struct.pack(">8sH8s", p64(self._pos), len(dest), z64)
if dest: if dest:
sd=p64(self._vindex.get(dest, 0)) sd=p64(self._vindex_get(dest, 0))
heredelta=66+len(dest) heredelta=66+len(dest)
else: else:
sd='' sd=''
...@@ -356,69 +348,119 @@ class FileStorage: ...@@ -356,69 +348,119 @@ class FileStorage:
oids=[] oids=[]
appoids=oids.append appoids=oids.append
tvindex=self._tvindex tvindex=self._tvindex
current_oids={}
current=current_oids.has_key
t=None
tstatus=' '
while srcpos: while srcpos:
seek(srcpos) seek(srcpos)
h=read(58) # oid, serial, prev(oid), tloc, vlen, plen, pnv, pv h=read(58) # oid, serial, prev(oid), tloc, vlen, plen, pnv, pv
oid=h[:8] oid=h[:8]
pnv=h[-16:-8]
if index[oid]==srcpos: if index[oid]==srcpos:
# This is a current record!
tappend((oid,here)) tappend((oid,here))
appoids(oid) appoids(oid)
write(h[:16] + spos + middle) write(h[:16] + spos + middle)
if dest: if dest:
tvindex[dest]=here tvindex[dest]=here
write(h[-16:-8]+sd+dest) write(pnv+sd+dest)
sd=p64(here) sd=p64(here)
write(spos) # data backpointer to src data write(abort and pnv or spos) # data backpointer to src data
here=here+heredelta here=here+heredelta
if h[16:24] != pnv:
# This is not the first current record, so mark it
current_oids[oid]=1
else:
# Hm. This is a non-current record. Is there a
# current record for this oid?
if not current(oid):
# Nope. We're done *if* this transaction wasn't undone.
tloc=h[24:32]
if t != tloc:
# We haven't checked this transaction before,
# get it's status.
t=tloc
seek(u64(t)+16)
tstatus=read(1)
if tstatus != 'u':
# Yee ha! We can quit
break
elif h[16:24] == pnv:
# This is the first current record, so unmark it.
# Note that we don't need to check if this was
# undone. If it *was* undone, then there must
# be a later record that is the first record, or
# there isn't a current record. In either case,
# we can't be in this branch. :)
del current_oids[oid]
spos=h[-8:] spos=h[-8:]
srcpos=u64(spos) srcpos=u64(spos)
tvindex[src]=0
return oids return oids
finally: self._r() finally: self._lock_release()
def getName(self): return self.__name__
def getSize(self): return self._pos def getSize(self): return self._pos
def history(self, oid, version, length=1): def _loada(self, oid, _index, file):
# TBD "Read any version and return the version"
pass pos=_index[oid]
file.seek(pos)
read=file.read
h=read(42)
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
if vlen:
file.seek(16,1)
version=read(vlen)
else:
version=''
if plen != z64: return read(u64(plen)), version
return _loadBack(file, oid, read(8))[0], version
def _load(self, oid, version, _index, file):
pos=_index[oid]
file.seek(pos)
read=file.read
h=read(42)
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
if doid != oid: raise CorruptedDataError, h
if vlen:
pnv=read(8) # Read location of non-version data
if (not version or len(version) != vlen or
(read(8) # skip past version link
and version != read(vlen))
):
return _loadBack(file, oid, pnv)
# If we get here, then either this was not a version record,
# or we've already read past the version data!
if plen != z64: return read(u64(plen)), serial
pnv=read(8)
# We use the current serial, since that is the one that
# will get checked when we store.
return _loadBack(file, oid, pnv)[0], serial
def load(self, oid, version, _stuff=None): def load(self, oid, version, _stuff=None):
self._a() self._lock_acquire()
try: try:
pos=self._index[oid] try: return self._load(oid, version, self._index, self._file)
file=self._file except:
file.seek(pos) if self._base is not None:
read=file.read return self._base.load(oid, version)
h=read(42) raise
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h) finally: self._lock_release()
if doid != oid: raise CorruptedDataError, h
if vlen:
pnv=read(8) # Read location of non-version data
if (not version or len(version) != vlen or
(read(8) # skip past version link
and version != read(vlen))
):
return _loadBack(file, oid, pnv)
# If we get here, then either this was not a version record,
# or we've already read past the version data!
if plen != z64: return read(u64(plen)), serial
pnv=read(8)
# We use the current serial, since that is the one that
# will get checked when we store.
return _loadBack(file, oid, pnv)[0], serial
finally: self._r()
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
self._a() self._lock_acquire()
try: try:
pos=self._index[oid] pos=self._index[oid]
file=self._file file=self._file
...@@ -431,35 +473,15 @@ class FileStorage: ...@@ -431,35 +473,15 @@ class FileStorage:
seek(24,1) # skip plen, pnv, and pv seek(24,1) # skip plen, pnv, and pv
return file.read(vlen) return file.read(vlen)
return '' return ''
finally: self._r() finally: self._lock_release()
def new_oid(self, last=None):
if last is None:
self._a()
try:
last=self._oid
d=ord(last[-1])
if d < 255: last=last[:-1]+chr(d+1)
else: last=self.new_oid(last[:-1])
self._oid=last
return last
finally: self._r()
else:
d=ord(last[-1])
if d < 255: return last[:-1]+chr(d+1)+'\0'*(8-len(last))
else: return self.new_oid(last[:-1])
def pack(self, t, rf):
# TBD
pass
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
self._a() self._lock_acquire()
try: try:
old=self._indexpos(oid, 0) old=self._index_get(oid, 0)
pnv=None pnv=None
if old: if old:
file=self._file file=self._file
...@@ -494,7 +516,7 @@ class FileStorage: ...@@ -494,7 +516,7 @@ class FileStorage:
else: write(p64(old)) else: write(p64(old))
# Link to last record for this version: # Link to last record for this version:
tvindex=self._tvindex tvindex=self._tvindex
pv=tvindex.get(version, 0) or self._vindexpos(version, 0) pv=tvindex.get(version, 0) or self._vindex_get(version, 0)
write(p64(pv)) write(p64(pv))
tvindex[version]=here tvindex[version]=here
write(version) write(version)
...@@ -503,112 +525,67 @@ class FileStorage: ...@@ -503,112 +525,67 @@ class FileStorage:
return serial return serial
finally: self._r() finally: self._lock_release()
def registerDB(self, db, limit): pass # we don't care
def supportsUndo(self): return 0 # for now def supportsUndo(self): return 1
def supportsVersions(self): return 1 def supportsVersions(self): return 1
def tpc_abort(self, transaction):
self._a()
try:
if transaction is not self._transaction: return
del self._tindex[:]
self._transaction=None
self._cr()
finally: self._r()
def tpc_begin(self, transaction):
self._a()
try:
if self._transaction is transaction: return
self._r()
self._ca()
self._a()
self._transaction=transaction
del self._tindex[:] # Just to be sure!
self._tvindex.clear() # ''
self._tfile.seek(0)
t=time.time()
t=apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
self._ts=t=t.laterThan(self._ts)
self._serial=`t`
user=transaction.user
desc=transaction.description
ext=transaction._extension
if ext: ext=dumps(ext,1)
else: ext=""
# Ugh, we have to record the transaction header length
# so that we can get version pointers right.
self._thl=23+len(user)+len(desc)+len(ext)
# And we have to save the data used to compute the
# header length. It's unlikely that this stuff would
# change, but if it did, it would be a disaster.
self._ude=user, desc, ext
finally: self._r()
def tpc_finish(self, transaction, f=None): def _clear_temp(self):
self._a() del self._tindex[:]
try: self._tvindex.clear()
if transaction is not self._transaction: return self._tfile.seek(0)
if f is not None: f()
file=self._file def _begin(self, tid, u, d, e):
write=file.write self._thl=23+len(u)+len(d)+len(e)
tfile=self._tfile
dlen=tfile.tell() def _finish(self, tid, u, d, e):
tfile.seek(0) file=self._file
id=self._serial write=file.write
user, desc, ext = self._ude tfile=self._tfile
self._ude=None dlen=tfile.tell()
tfile.seek(0)
tlen=self._thl id=self._serial
pos=self._pos user, desc, ext = self._ude
file.seek(pos)
tl=tlen+dlen tlen=self._thl
stl=p64(tl) pos=self._pos
write(pack( file.seek(pos)
">8s" "8s" "c" "H" "H" "H" tl=tlen+dlen
,id, stl, ' ', len(user), len(desc), len(ext), stl=p64(tl)
)) write(pack(
if user: write(user) ">8s" "8s" "c" "H" "H" "H"
if desc: write(desc) ,id, stl, ' ', len(user), len(desc), len(ext),
if ext: write(ext) ))
if user: write(user)
cp(tfile, file, dlen) if desc: write(desc)
if ext: write(ext)
write(stl)
file.flush() cp(tfile, file, dlen)
self._pos=pos+tl+8
write(stl)
tindex=self._tindex file.flush()
index=self._index self._pos=pos+tl+8
for oid, pos in tindex: index[oid]=pos
del tindex[:] index=self._index
for oid, pos in self._tindex: index[oid]=pos
tvindex=self._tvindex
self._vindex.update(tvindex) self._vindex.update(self._tvindex)
tvindex.clear()
self._transaction=None
self._cr()
finally: self._r()
def undo(self, transaction_id): def undo(self, transaction_id):
self._a() self._lock_acquire()
try: try:
transaction_id=base64.decodestring(transaction_id+'==\n')
tid, tpos = transaction_id[:8], u64(transaction_id[8:])
packt=self._packt
if packt is None or packt > tid:
raise POSException.UndoError, (
'Undo is currently disabled for database maintenance.<p>')
file=self._file file=self._file
seek=file.seek seek=file.seek
read=file.read read=file.read
indexpos=self._indexpos index_get=self._index_get
unpack=struct.unpack unpack=struct.unpack
transaction_id=base64.decodestring(transaction_id+'==\n')
tid, tpos = transaction_id[:8], u64(transaction_id[8:])
seek(tpos) seek(tpos)
h=read(23) h=read(23)
if len(h) != 23 or h[:8] != tid: if len(h) != 23 or h[:8] != tid:
...@@ -619,8 +596,7 @@ class FileStorage: ...@@ -619,8 +596,7 @@ class FileStorage:
ul,dl,el=unpack(">HHH", h[17:23]) ul,dl,el=unpack(">HHH", h[17:23])
tend=tpos+tl tend=tpos+tl
pos=tpos+23+ul+dl+el pos=tpos+23+ul+dl+el
t=[] t={}
tappend=t.append
while pos < tend: while pos < tend:
# Read the data records for this transaction # Read the data records for this transaction
seek(pos) seek(pos)
...@@ -630,21 +606,26 @@ class FileStorage: ...@@ -630,21 +606,26 @@ class FileStorage:
prev=u64(sprev) prev=u64(sprev)
dlen=42+(plen or 8) dlen=42+(plen or 8)
if vlen: dlen=dlen+16+vlen if vlen: dlen=dlen+16+vlen
if indexpos(oid,0) != pos: if index_get(oid,0) != pos:
raise UndoError, 'Undoable transaction' raise UndoError, 'Undoable transaction'
pos=pos+dlen pos=pos+dlen
if pos > tend: raise UndoError, 'Undoable transaction' if pos > tend: raise UndoError, 'Undoable transaction'
tappend((oid,prev)) t[oid]=prev
seek(tpos+16) seek(tpos+16)
file.write('u') file.write('u')
index=self._index index=self._index
for oid, pos in t: index[oid]=pos for oid, pos in t.items(): index[oid]=pos
finally: self._r() return t.keys()
finally: self._lock_release()
def undoLog(self, first, last, filter=None): def undoLog(self, first, last, filter=None):
self._a() self._lock_acquire()
try: try:
packt=self._packt
if packt is None:
raise POSException.UndoError, (
'Undo is currently disabled for database maintenance.<p>')
pos=self._pos pos=self._pos
if pos < 39: return [] if pos < 39: return []
file=self._file file=self._file
...@@ -663,6 +644,7 @@ class FileStorage: ...@@ -663,6 +644,7 @@ class FileStorage:
seek(pos) seek(pos)
h=read(23) h=read(23)
tid, tl, status, ul, dl, el = unpack(">8s8scHHH", h) tid, tl, status, ul, dl, el = unpack(">8s8scHHH", h)
if tid < packt: break
if status != ' ': continue if status != ' ': continue
u=ul and read(ul) or '' u=ul and read(ul) or ''
d=dl and read(dl) or '' d=dl and read(dl) or ''
...@@ -679,18 +661,393 @@ class FileStorage: ...@@ -679,18 +661,393 @@ class FileStorage:
i=i+1 i=i+1
return r return r
finally: self._r() finally: self._lock_release()
def versionEmpty(self, version): def versionEmpty(self, version):
return not self._vindex.get(version, 0) self._lock_acquire()
try:
index=self._index
file=self._file
seek=file.seek
read=file.read
srcpos=self._vindex_get(version, 0)
t=tstatus=None
while srcpos:
seek(srcpos)
oid=read(8)
if index[oid]==srcpos: return 0
h=read(50) # serial, prev(oid), tloc, vlen, plen, pnv, pv
tloc=h[16:24]
if t != tloc:
# We haven't checked this transaction before,
# get it's status.
t=tloc
seek(u64(t)+16)
tstatus=read(1)
if tstatus != 'u': return 1
spos=h[-8:]
srcpos=u64(spos)
return 1
finally: self._lock_release()
def versions(self, max=None): def versions(self, max=None):
if max: return self._vindex.keys()[:max] r=[]
return self._vindex.keys() a=r.append
for version in self._vindex.keys()[:max]:
if self.versionEmpty(version): continue
a(version)
if max and len(r) >= max: return r
return r
def pack(self, t, referencesf):
"""Copy data from the current database file to a packed file
Non-current records from transactions with time-stamp strings less
than packtss are ommitted. As are all undone records.
Also, data back pointers that point before packtss are resolved and
the associated data are copied, since the old records are not copied.
"""
# Ugh, this seems long
packing=1 # are we in the packing phase (or the copy phase)
locked=0
_lock_acquire=self._lock_acquire
_lock_release=self._lock_release
index, vindex, tindex, tvindex = self._newIndexes()
name=self.__name__
file=open(name, 'r+b')
stop=`apply(TimeStamp, time.gmtime(t)[:5]+(t%60,))`
try:
##################################################################
# Step 1, get index as of pack time that
# includes only referenced objects.
# Record pack time so we don't undo while packing
_lock_acquire()
self._packt=stop
_lock_release()
packpos, maxoid, ltid = read_index(
file, name, index, vindex, tindex, stop)
rootl=[z64]
pop=rootl.pop
pindex={}
referenced=pindex.has_key
_load=self._load
_loada=self._loada
v=None
while rootl:
oid=pop()
if referenced(oid): continue
try:
p, v = _loada(oid, index, file)
referencesf(p, rootl)
if v:
p, serial = _load(oid, '', index, file)
referencesf(p, rootl)
pindex[oid]=index[oid]
except:
pindex[oid]=0
error('Bad reference to %s', `(oid,v)`)
spackpos=p64(packpos)
##################################################################
# Step 2, copy data and compute new index based on new positions.
index, vindex, tindex, tvindex = self._newIndexes()
ofile=open(name+'.pack', 'w+b')
# Index for non-version data. This is a temporary structure
# to reduce I/O during packing
nvindex={}
# Cache a bunch of methods
seek=file.seek
read=file.read
oseek=ofile.seek
write=ofile.write
tappend=tindex.append
index_get=index.get
vindex_get=vindex.get
pindex_get=pindex.get
# Initialize,
pv=z64
offset=0 # the abound of spaec freed by packing
pos=opos=4
oseek(0)
write(packed_version)
# Copy the data in two stages. In the packing stage,
# we skip records that are non-current or that are for
# unreferenced objects. We also skip undone transactions.
#
# After the packing stage, we copy everything but undone
# transactions, however, we have to update various back pointers.
# We have to have the storage lock in the second phase to keep
# data from being changed while we're copying.
pnv=None
while 1:
# Check for end of packed records
if packing and pos >= packpos:
# OK, we're done with the old stuff, now we have
# to get the lock so we can copy the new stuff!
offset=pos-opos
if offset <= 0:
# we didn't free any space, there's no point in
# continuing
ofile.close()
file.close()
os.remove(name+'.pack')
return
packing=0
_lock_acquire()
locked=1
self._packt=None # Prevent undo until we're done
# Read the transaction record
seek(pos)
h=read(23)
if len(h) < 23: break
tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
if el < 0: el=t32-el
tl=u64(stl)
tpos=pos
tend=tpos+tl
if status=='u':
# Undone transaction, skip it
pos=tend+8
continue
otpos=opos # start pos of output trans
# write out the transaction record
write(h)
thl=ul+dl+el
h=read(thl)
if len(h) != thl:
raise 'Pack Error', opos
write(h)
thl=23+thl
pos=tpos+thl
opos=otpos+thl
status=' '
while pos < tend:
# Read the data records for this transaction
seek(pos)
h=read(42)
oid,serial,sprev,stloc,vlen,splen = unpack(
">8s8s8s8sH8s", h)
plen=u64(splen)
dlen=42+(plen or 8)
# print u64(oid), pos, vlen, plen, pindex.get(oid,'?')
if vlen:
dlen=dlen+16+vlen
if packing and pindex_get(oid,0) != pos:
# This is not the most current record, or
# the oid is no longer referenced so skip it.
pos=pos+dlen
status='p'
continue
pnv=u64(read(8))
# skip pos prev ver rec
seek(8,1)
version=read(vlen)
pv=p64(vindex_get(version, 0))
vindex[version]=opos
else:
if packing:
ppos=pindex_get(oid, 0)
if ppos != pos:
if not ppos:
# This object is no longer referenced
# so skip it.
pos=pos+dlen
status='p'
continue
# This is not the most current record
# But maybe it's the most current committed
# record.
seek(ppos)
ph=read(42)
pdoid,ps,pp,pt,pvlen,pplen = unpack(
">8s8s8s8sH8s", ph)
if not pvlen:
# The most current record is committed, so
# we can toss this one
pos=pos+dlen
status='p'
continue
pnv=read(8)
pnv=_loadBackPOS(file, oid, pnv)
if pnv > pos:
# The current non version data is later,
# so this isn't the current record
pos=pos+dlen
status='p'
continue
nvindex[oid]=opos
tappend((oid,opos))
opos=opos+dlen
pos=pos+dlen
if plen:
p=read(plen)
else:
p=read(8)
if packing:
# When packing we resolve back pointers!
p, serial = _loadBack(file, oid, p)
plen=len(p)
opos=opos+plen-8
splen=p64(plen)
else:
p=u64(p)
if p < packpos:
# We have a backpointer to a
# non-packed record. We have to be
# careful. If we were pointing to a
# current record, then we should still
# point at one, otherwise, we should
# point at the last non-version record.
if pindex[oid]==p:
# we were pointing to the
# current record
p=index[oid]
else:
p=nvindex[oid]
else:
# This points back to a non-packed record.
# Just adjust for the offset
p=p-offset
p=p64(p)
sprev=p64(index_get(oid,0))
write(pack(">8s8s8s8sH8s",
oid,serial,sprev,p64(otpos),vlen,splen))
if vlen:
if not pnv:
write(z64)
else:
if pnv < packpos:
# we need to point to the packed
# non-version rec
pnv=nvindex[oid]
else:
# we just need to adjust the pointer
# with the offset
pnv=pnv-offset
write(p64(pnv))
write(pv)
write(version)
write(p)
# print 'current', opos
# skip the (intentionally redundant) transaction length
pos=pos+8
if locked:
# temporarily release the lock to give other threads
# a chance to do some work!
_lock_release()
locked=0
for oid, p in tindex:
index[oid]=p # Record the position
del tindex[:]
# Now, maybe we need to hack or delete the transaction
otl=opos-otpos
if otl != tl:
# Oops, what came out is not what came in!
# Check for empty:
if otl==thl:
# Empty, slide back over the header:
opos=otpos
oseek(opos)
else:
# Not empty, but we need to adjust transaction length
# and update the status
oseek(otpos+8)
otl=p64(otl)
write(otl+status)
oseek(opos)
write(otl)
opos=opos+8
else:
write(p64(otl))
opos=opos+8
if not packing:
# We are in the copying phase. Lets update the
# pack time and release the lock so others can write.
_lock_acquire()
locked=1
# OK, we've copied everything. Now we need to wrap things
# up.
# Hack the files around.
name=self.__name__
ofile.flush()
ofile.close()
file.close()
self._file.close()
try: os.remove(name)
except:
# Waaa
self._file=open(name,'r+b')
raise
# OK, we're beyond the point of no return
os.rename(name+'.pack', name)
self._file=open(name,'r+b')
self._initIndex(index, vindex, tindex, tvindex)
self._pos=opos
finally:
if locked: _lock_release()
_lock_acquire()
self._packt=z64
_lock_release()
def read_index(file, name, index, vindex, tindex, stop='\377'*8): def read_index(file, name, index, vindex, tindex, stop='\377'*8):
indexpos=index.get index_get=index.get
vndexpos=vindex.get vndexpos=vindex.get
tappend=tindex.append tappend=tindex.append
...@@ -768,13 +1125,12 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8): ...@@ -768,13 +1125,12 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8):
if status=='u': if status=='u':
# Undone transaction, skip it # Undone transaction, skip it
pos=tpos+tl seek(tend)
seek(pos)
h=read(8) h=read(8)
if h != stl: if h != stl:
panic('%s has inconsistent transaction length at %s', panic('%s has inconsistent transaction length at %s',
name, pos) name, pos)
pos=pos+8 pos=tend+8
continue continue
pos=tpos+23+ul+dl+el pos=tpos+23+ul+dl+el
...@@ -790,6 +1146,8 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8): ...@@ -790,6 +1146,8 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8):
dlen=42+(plen or 8) dlen=42+(plen or 8)
tappend((oid,pos)) tappend((oid,pos))
# print u64(oid), pos, vlen, plen, index.get(oid,'?')
if vlen: if vlen:
dlen=dlen+16+vlen dlen=dlen+16+vlen
...@@ -805,7 +1163,7 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8): ...@@ -805,7 +1163,7 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8):
if pos+dlen > tend or tloc != tpos: if pos+dlen > tend or tloc != tpos:
panic("%s data record exceeds transaction record at %s", panic("%s data record exceeds transaction record at %s",
name, pos) name, pos)
if indexpos(oid,0) != prev: if index_get(oid,0) != prev:
panic("%s incorrect previous pointer at %s", panic("%s incorrect previous pointer at %s",
name, pos) name, pos)
...@@ -843,15 +1201,25 @@ def _loadBack(file, oid, back): ...@@ -843,15 +1201,25 @@ def _loadBack(file, oid, back):
seek(old) seek(old)
h=read(42) h=read(42)
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h) doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
#if doid != oid:
# panic(lambda x: None,
# "%s version record back pointer points to "
# "invalid record as %s", name, back)
if vlen: seek(vlen+16,1) if vlen: seek(vlen+16,1)
if plen != z64: return read(u64(plen)), serial if plen != z64: return read(u64(plen)), serial
back=read(8) # We got a back pointer! back=read(8) # We got a back pointer!
def _loadBackPOS(file, oid, back):
seek=file.seek
read=file.read
while 1:
old=u64(back)
if not old: raise KeyError, oid
seek(old)
h=read(42)
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
if vlen: seek(vlen+16,1)
if plen != z64: return old
back=read(8) # We got a back pointer!
def _checkVindex(file, index, vindex): def _checkVindex(file, index, vindex):
seek=file.seek seek=file.seek
read=file.read read=file.read
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment