Commit 522ed337 authored by Jim Fulton's avatar Jim Fulton

Added iterator facility, which enables scripts that copy data

from one storage to another.
parent 8b83f04f
......@@ -184,7 +184,7 @@
# may have a back pointer to a version record or to a non-version
# record.
#
__version__='$Revision: 1.41 $'[11:-2]
__version__='$Revision: 1.42 $'[11:-2]
import struct, time, os, bpthread, string, base64, sys
from struct import pack, unpack
......@@ -272,6 +272,8 @@ class FileStorage(BaseStorage.BaseStorage):
self._tfile=None
self._file_name=file_name
BaseStorage.BaseStorage.__init__(self, file_name)
index, vindex, tindex, tvindex = self._newIndexes()
......@@ -1312,6 +1314,9 @@ class FileStorage(BaseStorage.BaseStorage):
self._packt=z64
_lock_release()
def iterator(self): return FileIterator(self._file_name)
def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
"""Copy transactions forward in the data file
......@@ -1684,3 +1689,194 @@ def _truncate(file, name, pos):
seek(pos)
file.truncate()
class Iterator:
"""A General simple iterator that uses the Python for-loop index protocol
"""
__index=-1
__current=None
def __getitem__(self, i):
__index=self.__index
while i > __index:
__index=__index+1
self.__current=self.next(__index)
self.__index=__index
return self.__current
class FileIterator(Iterator):
"""Iterate over the transactions in a FileStorage file.
"""
_ltid=z64
def __init__(self, file):
if type(file) is type(''): file=open(file, 'r+b')
self._file=file
if file.read(4) != packed_version: raise FileStorageFormatError, name
file.seek(0,2)
self._file_size=file.tell()
self._pos=4
self._index={}
def next(self, index=0):
file=self._file
seek=file.seek
read=file.read
pos=self._pos
while 1:
# Read the transaction record
seek(pos)
h=read(23)
if len(h) < 23: break
tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
if el < 0: el=t32-el
if tid <= self._ltid:
warn("%s time-stamp reduction at %s", name, pos)
self._ltid=tid
tl=u64(stl)
if tl+pos+8 > self._file_size or status=='c':
# Hm, the data were truncated or the checkpoint flag wasn't
# cleared. They may also be corrupted,
# in which case, we don't want to totally lose the data.
warn("%s truncated, possibly due to damaged records at %s",
name, pos)
break
if status not in ' up':
warn('%s has invalid status, %s, at %s', name, status, pos)
if ul > tl or dl > tl or el > tl or tl < (23+ul+dl+el):
# We're in trouble. Find out if this is bad data in the
# middle of the file, or just a turd that Win 9x dropped
# at the end when the system crashed.
# Skip to the end and read what should be the transaction length
# of the last transaction.
seek(-8, 2)
rtl=u64(read(8))
# Now check to see if the redundant transaction length is
# reasonable:
if self._file_size - rtl < pos or rtl < 23:
nearPanic('%s has invalid transaction header at %s',
name, pos)
warn("It appears that there is invalid data at the end of the "
"file, possibly due to a system crash. %s truncated "
"to recover from bad data at end."
% name)
break
else:
warn('%s has invalid transaction header at %s', name, pos)
break
# if tid >= stop: raise IndexError, index
tpos=pos
tend=tpos+tl
if status=='u':
# Undone transaction, skip it
seek(tend)
h=read(8)
if h != stl:
panic('%s has inconsistent transaction length at %s',
name, pos)
pos=tend+8
continue
pos=tpos+23+ul+dl+el
user=read(ul)
description=read(dl)
d={}
if el:
try:
e=loads(read(el))
d.update(e)
except: pass
result=RecordIterator(
tid, user, description, d,
pos, (tend, file, seek, read,
self._index, self._index.get,
tpos,
)
)
pos=tend
# Read the (intentionally redundant) transaction length
seek(pos)
h=read(8)
if h != stl:
warn("%s redundant transaction length check failed at %s",
name, pos)
break
self._pos=pos+8
return result
raise IndexError, index
class RecordIterator(Iterator):
"""Iterate over the transactions in a FileStorage file.
"""
def __init__(self, tid, user, desc, ext, pos, stuff):
self.tid=tid
self.user=user
self.description=desc
self._extension=ext
self._pos=pos
self._stuff = stuff
def next(self, index=0):
name=''
pos = self._pos
tend, file, seek, read, _index, get, tpos = self._stuff
while pos < tend:
# Read the data records for this transaction
seek(pos)
h=read(42)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
prev=u64(sprev)
tloc=u64(stloc)
plen=u64(splen)
dlen=42+(plen or 8)
if vlen:
dlen=dlen+16+vlen
seek(8,1)
pv=u64(read(8))
version=read(vlen)
else:
version=''
if pos+dlen > tend or tloc != tpos:
warn("%s data record exceeds transaction record at %s",
name, pos)
break
self._pos=pos+dlen
if plen: p=read(plen)
else:
p=read(8)
p=_loadBack(file, oid, p)[0]
r=Record(oid, get(oid, None), serial, version, p)
_index[oid]=serial
return r
raise IndexError, index
class Record:
"""An abstract database record
"""
def __init__(self, *args):
self.oid, self.old, self.serial, self.version, self.p = args
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment