Commit 0b2d3637 authored by Jeremy Hylton's avatar Jeremy Hylton

Various small cleanups in FileStorage.

Share backpointer implementation between _loadBack() and
_loadBackPOS() to avoid duplicate code.

Simplify interface between FileIterator and RecordIterator.  Don't
pass unused arguments and don't pass nested tuples.

Use global variables to hold trans and data struct formats.

Reformat several docstrings.

Add whitespace to RecordIterator so that I can read it.
parent e8c35106
......@@ -115,7 +115,7 @@
# may have a back pointer to a version record or to a non-version
# record.
#
__version__='$Revision: 1.107 $'[11:-2]
__version__='$Revision: 1.108 $'[11:-2]
import base64
from cPickle import Pickler, Unpickler, loads
......@@ -148,11 +148,17 @@ from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC, register_subsystem
register_subsystem('ZODB FS')
z64='\0'*8
# the struct formats for the headers
TRANS_HDR = ">8s8scHHH"
DATA_HDR = ">8s8s8s8sH8s"
# constants to support various header sizes
TRANS_HDR_LEN = 23
DATA_HDR_LEN = 42
DATA_VERSION_HDR_LEN = 58
assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
def warn(message, *data):
LOG('ZODB FS', WARNING, "%s warn: %s\n" % (packed_version,
(message % data)))
......@@ -175,21 +181,19 @@ class FileStorageError(POSException.StorageError): pass
class FileStorageFormatError(FileStorageError):
"""Invalid file format
The format of the given file is not valid
The format of the given file is not valid.
"""
class CorruptedFileStorageError(FileStorageError,
POSException.StorageSystemError):
"""Corrupted file storage
"""
"""Corrupted file storage."""
class CorruptedTransactionError(CorruptedFileStorageError): pass
class CorruptedDataError(CorruptedFileStorageError): pass
class FileStorageQuotaError(FileStorageError,
POSException.StorageSystemError):
"""File storage quota exceeded
"""
"""File storage quota exceeded."""
packed_version='FS21'
......@@ -365,7 +369,7 @@ class FileStorage(BaseStorage.BaseStorage,
if pos < 4: return 0
seek(pos)
s = read(TRANS_HDR_LEN)
tid, stl, status, ul, dl, el = unpack(">8s8scHHH", s)
tid, stl, status, ul, dl, el = unpack(TRANS_HDR, s)
if not ltid: ltid=tid
if stl != rstl: return 0 # inconsistent lengths
if status == 'u': continue # undone trans, search back
......@@ -379,7 +383,7 @@ class FileStorage(BaseStorage.BaseStorage,
# Read the data records for this transaction
seek(opos)
h=read(DATA_HDR_LEN)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
oid,serial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
tloc=U64(stloc)
plen=U64(splen)
......@@ -549,7 +553,7 @@ class FileStorage(BaseStorage.BaseStorage,
file.seek(pos)
read=file.read
h=read(DATA_HDR_LEN)
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
doid,serial,prev,tloc,vlen,plen = unpack(DATA_HDR, h)
if vlen:
nv = read(8) != z64
file.seek(8,1) # Skip previous version record pointer
......@@ -594,7 +598,7 @@ class FileStorage(BaseStorage.BaseStorage,
file.seek(pos)
read = file.read
h = read(DATA_HDR_LEN)
doid, serial, prev, tloc, vlen, plen = unpack(">8s8s8s8sH8s", h)
doid, serial, prev, tloc, vlen, plen = unpack(DATA_HDR, h)
if doid != oid:
raise CorruptedDataError, h
if vlen:
......@@ -633,7 +637,7 @@ class FileStorage(BaseStorage.BaseStorage,
while 1:
seek(pos)
h=read(DATA_HDR_LEN)
doid,dserial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
doid,dserial,prev,tloc,vlen,plen = unpack(DATA_HDR, h)
if doid != oid: raise CorruptedDataError, h
if dserial == serial: break # Yeee ha!
# Keep looking for serial
......@@ -662,13 +666,12 @@ class FileStorage(BaseStorage.BaseStorage,
except KeyError:
raise POSKeyError(oid)
file=self._file
seek=file.seek
seek(pos)
file.seek(pos)
doid,serial,prev,tloc,vlen = unpack(">8s8s8s8sH", file.read(34))
if doid != oid:
raise CorruptedDataError, pos
if vlen:
seek(24,1) # skip plen, pnv, and pv
file.read(24) # skip plen, pnv, and pv
return file.read(vlen)
return ''
finally: self._lock_release()
......@@ -686,7 +689,7 @@ class FileStorage(BaseStorage.BaseStorage,
if old:
self._file.seek(old)
h=self._file.read(DATA_HDR_LEN)
doid,oserial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
if doid != oid: raise CorruptedDataError, h
if vlen:
pnv=self._file.read(8) # non-version data pointer
......@@ -710,7 +713,7 @@ class FileStorage(BaseStorage.BaseStorage,
here=pos+(tfile.tell()+self._thl)
self._tindex[oid]=here
newserial=self._serial
write(pack(">8s8s8s8sH8s",
write(pack(DATA_HDR,
oid, newserial, p64(old), p64(pos),
len(version), p64(len(data))
)
......@@ -784,7 +787,7 @@ class FileStorage(BaseStorage.BaseStorage,
if old:
self._file.seek(old)
h = self._file.read(42)
doid, x, y, z, vlen, w = unpack(">8s8s8s8sH8s", h)
doid, x, y, z, vlen, w = unpack(DATA_HDR, h)
if doid != oid:
raise CorruptedDataError, h
# XXX assert versions match?
......@@ -940,7 +943,7 @@ class FileStorage(BaseStorage.BaseStorage,
# Read the data records for this transaction
seek(pos)
h=read(DATA_HDR_LEN)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
oid,serial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
plen=U64(splen)
prev=U64(sprev)
dlen=DATA_HDR_LEN+(plen or 8)
......@@ -973,7 +976,7 @@ class FileStorage(BaseStorage.BaseStorage,
read=file.read
file.seek(pos)
h=read(DATA_HDR_LEN)
roid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
roid,serial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
if roid != oid: raise UndoError('Invalid undo transaction id')
if vlen:
read(16) # skip nv pointer and version previous pointer
......@@ -996,7 +999,7 @@ class FileStorage(BaseStorage.BaseStorage,
self._file.seek(pos)
read=self._file.read
h=read(DATA_HDR_LEN)
doid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
doid,serial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
if vlen:
h=read(16)
return read(vlen), h[:8]
......@@ -1171,7 +1174,7 @@ class FileStorage(BaseStorage.BaseStorage,
self._file.seek(pos)
h = self._file.read(DATA_HDR_LEN)
oid, serial, sprev, stloc, vlen, splen = \
struct.unpack(">8s8s8s8sH8s", h)
struct.unpack(DATA_HDR, h)
if failed(oid):
del failures[oid] # second chance!
plen = U64(splen)
......@@ -1192,7 +1195,7 @@ class FileStorage(BaseStorage.BaseStorage,
failures[oid] = v
else:
plen = len(p)
self._tfile.write(pack(">8s8s8s8sH8s",
self._tfile.write(pack(DATA_HDR,
oid, self._serial, p64(ipos),
ostloc, len(v), p64(plen)))
if v:
......@@ -1283,7 +1286,7 @@ class FileStorage(BaseStorage.BaseStorage,
if len(r) >= size: return r
seek(pos)
h=read(DATA_HDR_LEN)
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
doid,serial,prev,tloc,vlen,plen = unpack(DATA_HDR, h)
prev=U64(prev)
if vlen:
......@@ -1302,7 +1305,7 @@ class FileStorage(BaseStorage.BaseStorage,
seek(U64(tloc))
h=read(TRANS_HDR_LEN)
tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
user_name=read(ul)
description=read(dl)
if el: d=loads(read(el))
......@@ -1467,7 +1470,7 @@ class FileStorage(BaseStorage.BaseStorage,
seek(pos)
h=read(TRANS_HDR_LEN)
if len(h) < TRANS_HDR_LEN: break
tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
if status=='c':
# Oops. we found a checkpoint flag.
break
......@@ -1510,7 +1513,7 @@ class FileStorage(BaseStorage.BaseStorage,
seek(pos)
h=read(DATA_HDR_LEN)
oid,serial,sprev,stloc,vlen,splen = unpack(
">8s8s8s8sH8s", h)
DATA_HDR, h)
plen=U64(splen)
dlen=DATA_HDR_LEN+(plen or 8)
......@@ -1545,7 +1548,7 @@ class FileStorage(BaseStorage.BaseStorage,
seek(ppos)
ph=read(DATA_HDR_LEN)
pdoid,ps,pp,pt,pvlen,pplen = unpack(
">8s8s8s8sH8s", ph)
DATA_HDR, ph)
if not pvlen:
# The most current record is committed, so
# we can toss this one
......@@ -1614,7 +1617,7 @@ class FileStorage(BaseStorage.BaseStorage,
p=p64(p)
sprev=p64(index_get(oid, 0))
write(pack(">8s8s8s8sH8s",
write(pack(DATA_HDR,
oid,serial,sprev,p64(otpos),vlen,splen))
if vlen:
if not pnv:
......@@ -1780,7 +1783,7 @@ def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
seek(pos)
h=read(TRANS_HDR_LEN)
if len(h) < TRANS_HDR_LEN: break
tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
if status=='c': break # Oops. we found a checkpoint flag.
tl=U64(stl)
tpos=pos
......@@ -1805,7 +1808,7 @@ def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
# Read the data records for this transaction
seek(pos)
h=read(DATA_HDR_LEN)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
oid,serial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
plen=U64(splen)
dlen=DATA_HDR_LEN+(plen or 8)
......@@ -1834,7 +1837,7 @@ def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
# WRITE
seek(opos)
sprev=p64(index_get(oid, 0))
write(pack(">8s8s8s8sH8s",
write(pack(DATA_HDR,
oid,serial,sprev,p64(otpos),vlen,splen))
if vlen:
if not pnv: write(z64)
......@@ -1960,7 +1963,7 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
file.truncate()
break
tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
if el < 0: el=t32-el
if tid <= ltid:
......@@ -2027,7 +2030,7 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
seek(pos)
h=read(DATA_HDR_LEN)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
oid,serial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
prev=U64(sprev)
tloc=U64(stloc)
plen=U64(splen)
......@@ -2082,37 +2085,30 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
return pos, maxoid, ltid
def _loadBack(file, oid, back):
def _loadBack_impl(file, oid, back):
# shared implementation used by various _loadBack methods
while 1:
old = U64(back)
if not old:
raise POSKeyError(oid)
file.seek(old)
h=file.read(DATA_HDR_LEN)
doid, serial, prev, tloc, vlen, plen = unpack(">8s8s8s8sH8s", h)
h = file.read(DATA_HDR_LEN)
doid, serial, prev, tloc, vlen, plen = unpack(DATA_HDR, h)
if vlen:
file.seek(vlen + 16, 1)
if plen != z64:
return file.read(U64(plen)), serial
return file.read(U64(plen)), serial, old
back = file.read(8) # We got a back pointer!
def _loadBackPOS(file, oid, back):
"""Return the position of the record containing the data used by
the record at the given position (back)."""
seek=file.seek
read=file.read
def _loadBack(file, oid, back):
data, serial, old = _loadBack_impl(file, oid, back)
return data, serial
while 1:
old=U64(back)
if not old:
raise POSKeyError(oid)
seek(old)
h=read(DATA_HDR_LEN)
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
if vlen: seek(vlen+16,1)
if plen != z64: return old
back=read(8) # We got a back pointer!
def _loadBackPOS(file, oid, back):
"""Return position of data record for backpointer."""
data, serial, old = _loadBack_impl(file, oid, back)
return old
def _truncate(file, name, pos):
seek=file.seek
......@@ -2224,7 +2220,7 @@ class FileIterator(Iterator):
h=read(TRANS_HDR_LEN)
if len(h) < TRANS_HDR_LEN: break
tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
if el < 0: el=t32-el
if tid <= self._ltid:
......@@ -2292,23 +2288,18 @@ class FileIterator(Iterator):
except: e={}
else: e={}
result=RecordIterator(
tid, status, user, description, e,
pos, (tend, file, seek, read,
tpos,
)
)
pos=tend
result = RecordIterator(tid, status, user, description, e, pos,
tend, file, tpos)
pos = tend
# Read the (intentionally redundant) transaction length
seek(pos)
h=read(8)
h = read(8)
if h != stl:
warn("%s redundant transaction length check failed at %s",
self._file.name, pos)
break
self._pos=pos+8
self._pos = pos + 8
return result
......@@ -2317,48 +2308,48 @@ class FileIterator(Iterator):
class RecordIterator(Iterator, BaseStorage.TransactionRecord):
"""Iterate over the transactions in a FileStorage file.
"""
def __init__(self, tid, status, user, desc, ext, pos, stuff):
self.tid=tid
self.status=status
self.user=user
self.description=desc
self._extension=ext
self._pos=pos
self._stuff = stuff
def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
self.tid = tid
self.status = status
self.user = user
self.description = desc
self._extension = ext
self._pos = pos
self._tend = tend
self._file = file
self._tpos = tpos
def next(self, index=0):
name=''
pos = self._pos
tend, file, seek, read, tpos = self._stuff
while pos < tend:
while pos < self._tend:
# Read the data records for this transaction
seek(pos)
h=read(DATA_HDR_LEN)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
prev=U64(sprev)
tloc=U64(stloc)
plen=U64(splen)
self._file.seek(pos)
h = self._file.read(DATA_HDR_LEN)
oid, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
prev = U64(sprev)
tloc = U64(stloc)
plen = U64(splen)
dlen=DATA_HDR_LEN+(plen or 8)
dlen = DATA_HDR_LEN + (plen or 8)
if vlen:
dlen=dlen+(16+vlen)
seek(8,1)
pv=U64(read(8))
version=read(vlen)
dlen += (16 + vlen)
tmp = self._file.read(16)
pv = U64(tmp[8:16])
version = self._file.read(vlen)
else:
version=''
version = ''
if pos+dlen > tend or tloc != tpos:
if pos + dlen > self._tend or tloc != self._tpos:
warn("%s data record exceeds transaction record at %s",
name, pos)
file.name, pos)
break
self._pos=pos+dlen
self._pos = pos + dlen
if plen:
p = read(plen)
p = self._file.read(plen)
else:
p = read(8)
p = self._file.read(8)
if p == z64:
# If the backpointer is 0 (encoded as z64), then
# this transaction undoes the object creation. It
......@@ -2368,7 +2359,7 @@ class RecordIterator(Iterator, BaseStorage.TransactionRecord):
# this.
p = None
else:
p = _loadBack(file, oid, p)[0]
p = _loadBack(self._file, oid, p)[0]
r = Record(oid, serial, version, p)
......@@ -2377,8 +2368,7 @@ class RecordIterator(Iterator, BaseStorage.TransactionRecord):
raise IndexError, index
class Record(BaseStorage.DataRecord):
"""An abstract database record
"""
"""An abstract database record."""
def __init__(self, *args):
self.oid, self.serial, self.version, self.data = args
......@@ -2414,7 +2404,7 @@ class UndoSearch:
self.pos -= U64(self.file.read(8)) + 8
self.file.seek(self.pos)
h = self.file.read(TRANS_HDR_LEN)
tid, tl, status, ul, dl, el = struct.unpack(">8s8scHHH", h)
tid, tl, status, ul, dl, el = struct.unpack(TRANS_HDR, h)
if tid < self.packt or status == 'p':
self.stop = 1
return None
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment