Commit e9aa3bd3 authored by Jim Fulton's avatar Jim Fulton

Fixed misshandling of files > 2GB in size on systems with 32-bit

integers that could lead to database corruption. Now all file position
computation is done with Python longs. Thanks to Shane Hathaway for
finding this.

Fixed a bug that could cause database corruption if transaction user
names, descriptions, or extension data exceeded 64K bytes. This was
due to the faulty assumption that struct.pack caught overflows, which
it doesn't. :(

Changed to use new tpc_vote protocol. This allows errors that occur
in the last transaction step to be caught during the first phase of
two-phase commit. For example, if we run out of disk space, we now
catch the problem early enough to recover from it.
parent a4a3691f
......@@ -184,7 +184,7 @@
# may have a back pointer to a version record or to a non-version
# record.
#
__version__='$Revision: 1.44 $'[11:-2]
__version__='$Revision: 1.45 $'[11:-2]
import struct, time, os, bpthread, string, base64, sys
from struct import pack, unpack
......@@ -192,7 +192,7 @@ from cPickle import loads
import POSException
from TimeStamp import TimeStamp
from lock_file import lock_file
from utils import t32, p64, u64, cp
from utils import t32, p64, U64, cp
from zLOG import LOG, WARNING, ERROR, PANIC, register_subsystem
register_subsystem('ZODB FS')
import BaseStorage
......@@ -299,7 +299,7 @@ class FileStorage(BaseStorage.BaseStorage):
file, file_name, index, vindex, tindex, stop,
ltid=ltid, start=start, maxoid=maxoid,
)
else:
else:
self._pos, self._oid, tid = read_index(
file, file_name, index, vindex, tindex, stop)
......@@ -379,7 +379,7 @@ class FileStorage(BaseStorage.BaseStorage):
while 1:
seek(pos-8)
rstl=read(8)
tl=u64(rstl)
tl=U64(rstl)
pos=pos-tl-8
if pos < 4: return 0
seek(pos)
......@@ -390,7 +390,7 @@ class FileStorage(BaseStorage.BaseStorage):
if status not in ' p': return 0
if ul > tl or dl > tl or el > tl or tl < (23+ul+dl+el): return 0
tend=pos+tl
opos=pos+23+ul+dl+el
opos=pos+(23+ul+dl+el)
if opos==tend: continue # empty trans
while opos < tend:
......@@ -398,11 +398,11 @@ class FileStorage(BaseStorage.BaseStorage):
seek(opos)
h=read(42)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
tloc=u64(stloc)
plen=u64(splen)
tloc=U64(stloc)
plen=U64(splen)
dlen=42+(plen or 8)
if vlen: dlen=dlen+16+vlen
if vlen: dlen=dlen+(16+vlen)
if opos+dlen > tend or tloc != pos: return 0
......@@ -425,7 +425,7 @@ class FileStorage(BaseStorage.BaseStorage):
info=p.load()
index=info.get('index', None)
pos=info.get('pos', None)
pos=long(info.get('pos', None))
oid=info.get('oid', None)
vindex=info.get('vindex', None)
if index is None or pos is None or oid is None or vindex is None:
......@@ -440,7 +440,8 @@ class FileStorage(BaseStorage.BaseStorage):
self._file.close()
self._lock_file.close()
self._tfile.close()
self._save_index()
try: self._save_index()
except: pass # We don't care if this fails.
def commitVersion(self, src, dest, transaction, abort=None):
# We are going to commit by simply storing back pointers.
......@@ -474,7 +475,7 @@ class FileStorage(BaseStorage.BaseStorage):
sd=''
heredelta=50
here=tfile.tell()+self._pos+self._thl
here=self._pos+(tfile.tell()+self._thl)
oids=[]
appoids=oids.append
tvindex=self._tvindex
......@@ -515,7 +516,7 @@ class FileStorage(BaseStorage.BaseStorage):
# We haven't checked this transaction before,
# get it's status.
t=tloc
seek(u64(t)+16)
seek(U64(t)+16)
tstatus=read(1)
if tstatus != 'u':
......@@ -532,7 +533,7 @@ class FileStorage(BaseStorage.BaseStorage):
del current_oids[oid]
spos=h[-8:]
srcpos=u64(spos)
srcpos=U64(spos)
return oids
......@@ -555,7 +556,7 @@ class FileStorage(BaseStorage.BaseStorage):
version=''
nv=0
if plen != z64: return read(u64(plen)), version, nv
if plen != z64: return read(U64(plen)), version, nv
return _loadBack(file, oid, read(8))[0], version, nv
def _load(self, oid, version, _index, file):
......@@ -575,7 +576,7 @@ class FileStorage(BaseStorage.BaseStorage):
# If we get here, then either this was not a version record,
# or we've already read past the version data!
if plen != z64: return read(u64(plen)), serial
if plen != z64: return read(U64(plen)), serial
pnv=read(8)
# We use the current serial, since that is the one that
# will get checked when we store.
......@@ -601,7 +602,7 @@ class FileStorage(BaseStorage.BaseStorage):
if doid != oid: raise CorruptedDataError, h
if dserial == serial: break # Yeee ha!
# Keep looking for serial
pos=u64(prev)
pos=U64(prev)
if not pos: raise KeyError, serial
continue
......@@ -610,7 +611,7 @@ class FileStorage(BaseStorage.BaseStorage):
read(8) # skip past version link
read(vlen) # skip version
if plen != z64: return read(u64(plen))
if plen != z64: return read(U64(plen))
# We got a backpointer, probably from a commit.
pnv=read(8)
......@@ -662,7 +663,7 @@ class FileStorage(BaseStorage.BaseStorage):
tfile=self._tfile
write=tfile.write
pos=self._pos
here=tfile.tell()+pos+self._thl
here=pos+(tfile.tell()+self._thl)
self._tappend((oid, here))
serial=self._serial
write(pack(">8s8s8s8sH8s",
......@@ -684,7 +685,7 @@ class FileStorage(BaseStorage.BaseStorage):
# Check quota
quota=self._quota
if quota is not None and tfile.tell()+pos+self._thl > quota:
if quota is not None and pos+(tfile.tell()+self._thl) > quota:
raise FileStorageQuotaError, (
'The storage quota has been exceeded.')
......@@ -702,66 +703,90 @@ class FileStorage(BaseStorage.BaseStorage):
def _begin(self, tid, u, d, e):
self._thl=23+len(u)+len(d)+len(e)
self._nextpos=0
def tpc_vote(self, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try:
tfile=self._tfile
dlen=tfile.tell()
if not dlen: return # No data in this trans
file=self._file
write=file.write
tfile.seek(0)
id=self._serial
user, desc, ext = self._ude
luser=len(user)
ldesc=len(desc)
lext=len(ext)
# We have to check lengths here because struct.pack
# doesn't raise an exception on overflow!
if luser > 65535: raise FileStorageError, 'user name too long'
if ldesc > 65535: raise FileStorageError, 'description too long'
if lext > 65535: raise FileStorageError, 'too much extension data'
tlen=self._thl
pos=self._pos
file.seek(pos)
tl=tlen+dlen
stl=p64(tl)
try:
# Note that we use a status of 'c', for checkpoint.
# If this flag isn't cleared, anything after this is
# suspect.
write(pack(
">8s" "8s" "c" "H" "H" "H"
,id, stl, 'c', luser, ldesc, lext,
))
if user: write(user)
if desc: write(desc)
if ext: write(ext)
cp(tfile, file, dlen)
write(stl)
file.flush()
except:
# Hm, an error occured writing out the data. Maybe the
# disk is full. We don't want any turd at the end.
file.truncate(pos)
raise
self._nextpos=pos+(tl+8)
finally: self._lock_release()
def _finish(self, tid, u, d, e):
tfile=self._tfile
dlen=tfile.tell()
if not dlen: return # No data in this trans
file=self._file
write=file.write
tfile.seek(0)
id=self._serial
user, desc, ext = self._ude
tlen=self._thl
pos=self._pos
file.seek(pos)
tl=tlen+dlen
stl=p64(tl)
try:
# Note that we use a status of 'c', for checkpoint.
# If this flag isn't cleared, anything after this is
# suspect.
write(pack(
">8s" "8s" "c" "H" "H" "H"
,id, stl, 'c', len(user), len(desc), len(ext),
))
if user: write(user)
if desc: write(desc)
if ext: write(ext)
cp(tfile, file, dlen)
write(stl)
# OK, not clear the checkpoint flag
file.seek(pos+16)
write(self._tstatus)
file.flush()
# Clear the checkpoint flag
file.seek(self._pos+16)
file.write(self._tstatus)
file.flush()
if fsync is not None:
fsync(file.fileno())
if fsync is not None: fsync(file.fileno())
except:
# Hm, an error occured writing out the data. Maybe the
# disk is full. We don't want any turd at the end.
file.truncate(pos)
self._pos=pos+tl+8
self._pos=self._nextpos
index=self._index
for oid, pos in self._tindex: index[oid]=pos
self._vindex.update(self._tvindex)
def _abort(self):
if self._nextpos: self._file.truncate(self._nextpos)
def undo(self, transaction_id):
self._lock_acquire()
try:
self._clear_index()
transaction_id=base64.decodestring(transaction_id+'==\n')
tid, tpos = transaction_id[:8], u64(transaction_id[8:])
tid, tpos = transaction_id[:8], U64(transaction_id[8:])
packt=self._packt
if packt is None or packt > tid:
raise POSException.UndoError, (
......@@ -779,20 +804,20 @@ class FileStorage(BaseStorage.BaseStorage):
if h[16] == 'u': return
if h[16] != ' ':
raise POSException.UndoError, 'Undoable transaction'
tl=u64(h[8:16])
tl=U64(h[8:16])
ul,dl,el=unpack(">HHH", h[17:23])
tend=tpos+tl
pos=tpos+23+ul+dl+el
pos=tpos+(23+ul+dl+el)
t={}
while pos < tend:
# Read the data records for this transaction
seek(pos)
h=read(42)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
plen=u64(splen)
prev=u64(sprev)
plen=U64(splen)
prev=U64(sprev)
dlen=42+(plen or 8)
if vlen: dlen=dlen+16+vlen
if vlen: dlen=dlen+(16+vlen)
if index_get(oid,0) != pos:
raise POSException.UndoError, 'Undoable transaction'
pos=pos+dlen
......@@ -828,7 +853,7 @@ class FileStorage(BaseStorage.BaseStorage):
i=0
while i < last and pos > 39:
seek(pos-8)
pos=pos-u64(read(8))-8
pos=pos-U64(read(8))-8
seek(pos)
h=read(23)
tid, tl, status, ul, dl, el = unpack(">8s8scHHH", h)
......@@ -870,13 +895,13 @@ class FileStorage(BaseStorage.BaseStorage):
# We haven't checked this transaction before,
# get it's status.
t=tloc
seek(u64(t)+16)
seek(U64(t)+16)
tstatus=read(1)
if tstatus != 'u': return 1
spos=h[-8:]
srcpos=u64(spos)
srcpos=U64(spos)
return 1
finally: self._lock_release()
......@@ -908,7 +933,7 @@ class FileStorage(BaseStorage.BaseStorage):
seek(pos)
h=read(42)
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
prev=u64(prev)
prev=U64(prev)
if vlen:
nv = read(8) != z64
......@@ -924,10 +949,9 @@ class FileStorage(BaseStorage.BaseStorage):
version=''
wantver=None
seek(u64(tloc))
seek(U64(tloc))
h=read(23)
tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
if el < 0: el=t32-el
user_name=read(ul)
description=read(dl)
if el: d=loads(read(el))
......@@ -938,7 +962,7 @@ class FileStorage(BaseStorage.BaseStorage):
d['description']=description
d['serial']=serial
d['version']=version
d['size']=u64(plen)
d['size']=U64(plen)
if filter is None or filter(d):
r.append(d)
......@@ -949,7 +973,7 @@ class FileStorage(BaseStorage.BaseStorage):
def _redundant_pack(self, file, pos):
file.seek(pos-8)
p=u64(file.read(8))
p=U64(file.read(8))
file.seek(pos-p+8)
return file.read(1) not in ' u'
......@@ -1044,8 +1068,8 @@ class FileStorage(BaseStorage.BaseStorage):
# Initialize,
pv=z64
offset=0 # the amount of spaec freed by packing
pos=opos=4
offset=0L # the amount of space freed by packing
pos=opos=4L
oseek(0)
write(packed_version)
......@@ -1060,8 +1084,6 @@ class FileStorage(BaseStorage.BaseStorage):
pnv=None
while 1:
# Check for end of packed records
if packing and pos >= packpos:
# OK, we're done with the old stuff, now we have
......@@ -1088,8 +1110,7 @@ class FileStorage(BaseStorage.BaseStorage):
if status=='c':
# Oops. we found a checkpoint flag.
break
if el < 0: el=t32-el
tl=u64(stl)
tl=U64(stl)
tpos=pos
tend=tpos+tl
......@@ -1119,18 +1140,18 @@ class FileStorage(BaseStorage.BaseStorage):
h=read(42)
oid,serial,sprev,stloc,vlen,splen = unpack(
">8s8s8s8sH8s", h)
plen=u64(splen)
plen=U64(splen)
dlen=42+(plen or 8)
if vlen:
dlen=dlen+16+vlen
dlen=dlen+(16+vlen)
if packing and pindex_get(oid,0) != pos:
# This is not the most current record, or
# the oid is no longer referenced so skip it.
pos=pos+dlen
continue
pnv=u64(read(8))
pnv=U64(read(8))
# skip position of previous version record
seek(8,1)
version=read(vlen)
......@@ -1193,7 +1214,7 @@ class FileStorage(BaseStorage.BaseStorage):
opos=opos+plen-8
splen=p64(plen)
else:
p=u64(p)
p=U64(p)
if p < packpos:
# We have a backpointer to a
# non-packed record. We have to be
......@@ -1357,8 +1378,7 @@ def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
if len(h) < 23: break
tid, stl, status, ul, dl, el = unpack(">8s8scHHH",h)
if status=='c': break # Oops. we found a checkpoint flag.
if el < 0: el=t32-el
tl=u64(stl)
tl=U64(stl)
tpos=pos
tend=tpos+tl
......@@ -1382,12 +1402,12 @@ def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
seek(pos)
h=read(42)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
plen=u64(splen)
plen=U64(splen)
dlen=42+(plen or 8)
if vlen:
dlen=dlen+16+vlen
pnv=u64(read(8))
dlen=dlen+(16+vlen)
pnv=U64(read(8))
# skip position of previous version record
seek(8,1)
version=read(vlen)
......@@ -1399,7 +1419,7 @@ def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
if plen: p=read(plen)
else:
p=read(8)
p=u64(p)
p=U64(p)
if p >= p2: p=p-offset
elif p >= p1:
# Ick, we're in trouble. Let's bail
......@@ -1449,7 +1469,7 @@ def search_back(file, pos):
s=p=file.tell()
while p > pos:
seek(p-8)
l=u64(read(8))
l=U64(read(8))
if l <= 0: break
p=p-l-8
......@@ -1482,7 +1502,7 @@ def recover(file_name):
def read_index(file, name, index, vindex, tindex, stop='\377'*8,
ltid=z64, start=4, maxoid=z64, recover=0):
ltid=z64, start=4L, maxoid=z64, recover=0):
read=file.read
seek=file.seek
......@@ -1495,7 +1515,7 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
if read(4) != packed_version: raise FileStorageFormatError, name
else:
file.write(packed_version)
return 4, maxoid, ltid
return 4L, maxoid, ltid
index_get=index.get
vndexpos=vindex.get
......@@ -1504,7 +1524,6 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
pos=start
seek(start)
unpack=struct.unpack
tpos=0
tid='\0'*7+'\1'
while 1:
......@@ -1524,9 +1543,9 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
warn("%s time-stamp reduction at %s", name, pos)
ltid=tid
tl=u64(stl)
tl=U64(stl)
if tl+pos+8 > file_size or status=='c':
if pos+(tl+8) > file_size or status=='c':
# Hm, the data were truncated or the checkpoint flag wasn't
# cleared. They may also be corrupted,
# in which case, we don't want to totally lose the data.
......@@ -1545,7 +1564,7 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
# Skip to the end and read what should be the transaction length
# of the last transaction.
seek(-8, 2)
rtl=u64(read(8))
rtl=U64(read(8))
# Now check to see if the redundant transaction length is
# reasonable:
if file_size - rtl < pos or rtl < 23:
......@@ -1557,7 +1576,7 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
_truncate(file, name, pos)
break
else:
if recover: return tpos, None, None
if recover: return pos, None, None
panic('%s has invalid transaction header at %s', name, pos)
if tid >= stop: break
......@@ -1576,24 +1595,24 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
pos=tend+8
continue
pos=tpos+23+ul+dl+el
pos=tpos+(23+ul+dl+el)
while pos < tend:
# Read the data records for this transaction
seek(pos)
h=read(42)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
prev=u64(sprev)
tloc=u64(stloc)
plen=u64(splen)
prev=U64(sprev)
tloc=U64(stloc)
plen=U64(splen)
dlen=42+(plen or 8)
tappend((oid,pos))
if vlen:
dlen=dlen+16+vlen
dlen=dlen+(16+vlen)
seek(8,1)
pv=u64(read(8))
pv=U64(read(8))
version=read(vlen)
# Jim says: "It's just not worth the bother."
#if vndexpos(version, 0) != pv:
......@@ -1642,14 +1661,14 @@ def _loadBack(file, oid, back):
read=file.read
while 1:
old=u64(back)
old=U64(back)
if not old: raise KeyError, oid
seek(old)
h=read(42)
doid,serial,prev,tloc,vlen,plen = unpack(">8s8s8s8sH8s", h)
if vlen: seek(vlen+16,1)
if plen != z64: return read(u64(plen)), serial
if plen != z64: return read(U64(plen)), serial
back=read(8) # We got a back pointer!
def _loadBackPOS(file, oid, back):
......@@ -1657,7 +1676,7 @@ def _loadBackPOS(file, oid, back):
read=file.read
while 1:
old=u64(back)
old=U64(back)
if not old: raise KeyError, oid
seek(old)
h=read(42)
......@@ -1718,7 +1737,7 @@ class FileIterator(Iterator):
if file.read(4) != packed_version: raise FileStorageFormatError, name
file.seek(0,2)
self._file_size=file.tell()
self._pos=4
self._pos=4L
def next(self, index=0):
file=self._file
......@@ -1739,9 +1758,9 @@ class FileIterator(Iterator):
warn("%s time-stamp reduction at %s", name, pos)
self._ltid=tid
tl=u64(stl)
tl=U64(stl)
if tl+pos+8 > self._file_size or status=='c':
if pos+(tl+8) > self._file_size or status=='c':
# Hm, the data were truncated or the checkpoint flag wasn't
# cleared. They may also be corrupted,
# in which case, we don't want to totally lose the data.
......@@ -1753,21 +1772,21 @@ class FileIterator(Iterator):
warn('%s has invalid status, %s, at %s', name, status, pos)
if ul > tl or dl > tl or el > tl or tl < (23+ul+dl+el):
# We're in trouble. Find out if this is bad data in the
# middle of the file, or just a turd that Win 9x dropped
# at the end when the system crashed.
# Skip to the end and read what should be the transaction length
# of the last transaction.
# We're in trouble. Find out if this is bad data in
# the middle of the file, or just a turd that Win 9x
# dropped at the end when the system crashed. Skip to
# the end and read what should be the transaction
# length of the last transaction.
seek(-8, 2)
rtl=u64(read(8))
rtl=U64(read(8))
# Now check to see if the redundant transaction length is
# reasonable:
if self._file_size - rtl < pos or rtl < 23:
nearPanic('%s has invalid transaction header at %s',
name, pos)
warn("It appears that there is invalid data at the end of the "
"file, possibly due to a system crash. %s truncated "
"to recover from bad data at end."
warn("It appears that there is invalid data at the end of "
"the file, possibly due to a system crash. %s "
"truncated to recover from bad data at end."
% name)
break
else:
......@@ -1789,7 +1808,7 @@ class FileIterator(Iterator):
pos=tend+8
continue
pos=tpos+23+ul+dl+el
pos=tpos+(23+ul+dl+el)
user=read(ul)
description=read(dl)
if el:
......@@ -1841,16 +1860,16 @@ class RecordIterator(Iterator):
seek(pos)
h=read(42)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
prev=u64(sprev)
tloc=u64(stloc)
plen=u64(splen)
prev=U64(sprev)
tloc=U64(stloc)
plen=U64(splen)
dlen=42+(plen or 8)
if vlen:
dlen=dlen+16+vlen
dlen=dlen+(16+vlen)
seek(8,1)
pv=u64(read(8))
pv=U64(read(8))
version=read(vlen)
else:
version=''
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment