Commit 25be6dfc authored by Jim Fulton's avatar Jim Fulton

Added (the rest of) transactional undo. Woo hoo!

To do this, I had to change the temprary index to be a mapping
object (dictionary) rather than a sequence (list).
parent 0937d2a5
...@@ -184,12 +184,13 @@ ...@@ -184,12 +184,13 @@
# may have a back pointer to a version record or to a non-version # may have a back pointer to a version record or to a non-version
# record. # record.
# #
__version__='$Revision: 1.52 $'[11:-2] __version__='$Revision: 1.53 $'[11:-2]
import struct, time, os, bpthread, string, base64, sys import struct, time, os, bpthread, string, base64, sys
from struct import pack, unpack from struct import pack, unpack
from cPickle import loads from cPickle import loads
import POSException import POSException
UndoError = POSException.UndoError
from TimeStamp import TimeStamp from TimeStamp import TimeStamp
from lock_file import lock_file from lock_file import lock_file
from utils import t32, p64, U64, cp from utils import t32, p64, U64, cp
...@@ -327,11 +328,10 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -327,11 +328,10 @@ class FileStorage(BaseStorage.BaseStorage,
self._tvindex=tvindex self._tvindex=tvindex
self._index_get=index.get self._index_get=index.get
self._vindex_get=vindex.get self._vindex_get=vindex.get
self._tappend=tindex.append
def __len__(self): return len(self._index) def __len__(self): return len(self._index)
def _newIndexes(self): return {}, {}, [], {} def _newIndexes(self): return {}, {}, {}, {}
def abortVersion(self, src, transaction): def abortVersion(self, src, transaction):
return self.commitVersion(src, '', transaction, abort=1) return self.commitVersion(src, '', transaction, abort=1)
...@@ -464,7 +464,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -464,7 +464,7 @@ class FileStorage(BaseStorage.BaseStorage,
seek=file.seek seek=file.seek
tfile=self._tfile tfile=self._tfile
write=tfile.write write=tfile.write
tappend=self._tappend tindex=self._tindex
index=self._index index=self._index
index_get=index.get index_get=index.get
...@@ -495,7 +495,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -495,7 +495,7 @@ class FileStorage(BaseStorage.BaseStorage,
pnv=h[-16:-8] pnv=h[-16:-8]
if index_get(oid, None) == srcpos: if index_get(oid, None) == srcpos:
# This is a current record! # This is a current record!
tappend((oid,here)) tindex[oid]=here
appoids(oid) appoids(oid)
write(h[:16] + spos + middle) write(h[:16] + spos + middle)
if dest: if dest:
...@@ -664,7 +664,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -664,7 +664,7 @@ class FileStorage(BaseStorage.BaseStorage,
write=tfile.write write=tfile.write
pos=self._pos pos=self._pos
here=pos+(tfile.tell()+self._thl) here=pos+(tfile.tell()+self._thl)
self._tappend((oid, here)) self._tindex[oid]=here
newserial=self._serial newserial=self._serial
write(pack(">8s8s8s8sH8s", write(pack(">8s8s8s8sH8s",
oid, newserial, p64(old), p64(pos), oid, newserial, p64(old), p64(pos),
...@@ -699,7 +699,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -699,7 +699,7 @@ class FileStorage(BaseStorage.BaseStorage,
def supportsVersions(self): return 1 def supportsVersions(self): return 1
def _clear_temp(self): def _clear_temp(self):
del self._tindex[:] self._tindex.clear()
self._tvindex.clear() self._tvindex.clear()
self._tfile.seek(0) self._tfile.seek(0)
...@@ -708,11 +708,9 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -708,11 +708,9 @@ class FileStorage(BaseStorage.BaseStorage,
self._nextpos=0 self._nextpos=0
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire() self._lock_acquire()
try: try:
if transaction is not self._transaction: return
tfile=self._tfile tfile=self._tfile
dlen=tfile.tell() dlen=tfile.tell()
if not dlen: return # No data in this trans if not dlen: return # No data in this trans
...@@ -777,9 +775,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -777,9 +775,7 @@ class FileStorage(BaseStorage.BaseStorage,
self._pos=nextpos self._pos=nextpos
index=self._index self._index.update(self._tindex)
for oid, pos in self._tindex: index[oid]=pos
self._vindex.update(self._tvindex) self._vindex.update(self._tvindex)
def _abort(self): def _abort(self):
...@@ -793,7 +789,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -793,7 +789,7 @@ class FileStorage(BaseStorage.BaseStorage,
tid, tpos = transaction_id[:8], U64(transaction_id[8:]) tid, tpos = transaction_id[:8], U64(transaction_id[8:])
packt=self._packt packt=self._packt
if packt is None or packt > tid: if packt is None or packt > tid:
raise POSException.UndoError, ( raise UndoError, (
'Undo is currently disabled for database maintenance.<p>') 'Undo is currently disabled for database maintenance.<p>')
file=self._file file=self._file
...@@ -804,10 +800,9 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -804,10 +800,9 @@ class FileStorage(BaseStorage.BaseStorage,
seek(tpos) seek(tpos)
h=read(23) h=read(23)
if len(h) != 23 or h[:8] != tid: if len(h) != 23 or h[:8] != tid:
raise POSException.UndoError, 'Invalid undo transaction id' raise UndoError('Invalid undo transaction id')
if h[16] == 'u': return if h[16] == 'u': return
if h[16] != ' ': if h[16] != ' ': raise UndoError
raise POSException.UndoError, 'non-undoable transaction'
tl=U64(h[8:16]) tl=U64(h[8:16])
ul,dl,el=unpack(">HHH", h[17:23]) ul,dl,el=unpack(">HHH", h[17:23])
tend=tpos+tl tend=tpos+tl
...@@ -822,11 +817,9 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -822,11 +817,9 @@ class FileStorage(BaseStorage.BaseStorage,
prev=U64(sprev) prev=U64(sprev)
dlen=42+(plen or 8) dlen=42+(plen or 8)
if vlen: dlen=dlen+(16+vlen) if vlen: dlen=dlen+(16+vlen)
if index_get(oid,0) != pos: if index_get(oid,0) != pos: raise UndoError
raise POSException.UndoError, 'non-undoable transaction'
pos=pos+dlen pos=pos+dlen
if pos > tend: if pos > tend: raise UndoError
raise POSException.UndoError, 'non-undoable transaction'
t[oid]=prev t[oid]=prev
seek(tpos+16) seek(tpos+16)
...@@ -837,29 +830,40 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -837,29 +830,40 @@ class FileStorage(BaseStorage.BaseStorage,
return t.keys() return t.keys()
finally: self._lock_release() finally: self._lock_release()
def supportsTransactionalUndo(self): return 0 def supportsTransactionalUndo(self): return 1
def _dataInfo(self, oid, pos): def _undoDataInfo(self, oid, pos, tpos):
"""Return the serial, version and data pointer for the oid """Return the serial, data pointer, data, and version for the oid
record at pos""" record at pos"""
file=self._file if tpos:
file=self._tfile
pos = tpos - self._pos - self._thl
tpos=file.tell()
else:
file=self._file
read=file.read read=file.read
file.seek(pos) file.seek(pos)
h=read(42) h=read(42)
roid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h) roid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
if roid != oid: if roid != oid: raise UndoError('Invalid undo transaction id')
raise POSException.UndoError, 'Invalid undo transaction id'
if vlen: if vlen:
read(16) # skip nv pointer and version previous pointer read(16) # skip nv pointer and version previous pointer
version=read(vlen) version=read(vlen)
else: else:
version='' version=''
if U64(splen): plen = U64(splen)
return serial, pos, version if plen:
data = read(plen)
else: else:
return serial, U64(read(8)), version data=''
pos=U64(read(8))
if tpos: file.seek(tpos) # Restore temp file to end
return serial, pos, data, version
def _getVersion(self, oid, pos): def _getVersion(self, oid, pos):
self._file.seek(pos) self._file.seek(pos)
read=self._file.read read=self._file.read
...@@ -887,30 +891,39 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -887,30 +891,39 @@ class FileStorage(BaseStorage.BaseStorage,
""" """
copy=1 # Can we just copy a data pointer copy=1 # Can we just copy a data pointer
tpos=self._tindex.get(oid, 0)
ipos=self._index.get(oid,0) ipos=self._index.get(oid,0)
if ipos != pos: tipos=tpos or ipos
if tipos != pos:
# Eek, a later transaction modified the data, but, # Eek, a later transaction modified the data, but,
# maybe it is pointing at the same data we are. # maybe it is pointing at the same data we are.
cserial, cdata, cver = self._dataInfo(oid, ipos) cserial, cdataptr, cver, cdata = self._undoDataInfo(
oid, ipos, tpos)
# Versions of undone record and current record *must* match! # Versions of undone record and current record *must* match!
if cver != version: if cver != version:
raise POSException.UndoError( raise UndoError('Current and undone versions differ')
'non-undoable transaction')
if cdata != pos: if cdataptr != pos:
# We aren't sure if we are talking about the same data # We aren't sure if we are talking about the same data
if ( try:
cdata == ipos # The current record wrote a new pickle if (
or # The current record wrote a new pickle
# Backpointers are different cdataptr == tipos
_loadBackPOS(self._file, oid, p64(pos)) != or
_loadBackPOS(self._file, oid, p64(cdata)) # Backpointers are different
): _loadBackPOS(self._file, oid, p64(pos)) !=
if pre: _loadBackPOS(self._file, oid, p64(cdataptr))
copy=0 # we'll try to do conflict resolution ):
else: if pre and not tpos:
raise POSException.UndoError( copy=0 # we'll try to do conflict resolution
'non-undoable transaction') else:
# We bail if:
# - We don't have a previous record, which should
# be impossible.
raise UndoError
except KeyError:
# LoadBack gave us a key error. Bail.
raise UndoError
version, snv = self._getVersion(oid, pre) version, snv = self._getVersion(oid, pre)
if copy: if copy:
...@@ -918,13 +931,12 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -918,13 +931,12 @@ class FileStorage(BaseStorage.BaseStorage,
return '', pre, version, snv, ipos return '', pre, version, snv, ipos
data=self.tryToResolveConflict( data=self.tryToResolveConflict(
oid, cserial, serial, _loadBack(self._file, oid, p64(pre))) oid, cserial, serial, _loadBack(self._file, oid, p64(pre)), cdata)
if data: if data:
return data, 0, version, snv, ipos return data, 0, version, snv, ipos
raise POSException.UndoError( raise UndoError('Some data were modified by a later transaction')
'non-undoable transaction')
def transactionalUndo(self, transaction_id, transaction): def transactionalUndo(self, transaction_id, transaction):
"""Undo a transaction, given by transaction_id. """Undo a transaction, given by transaction_id.
...@@ -956,20 +968,23 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -956,20 +968,23 @@ class FileStorage(BaseStorage.BaseStorage,
seek(tpos) seek(tpos)
h=read(23) h=read(23)
if len(h) != 23 or h[:8] != tid: if len(h) != 23 or h[:8] != tid:
raise POSException.UndoError, 'Invalid undo transaction id' raise UndoError, 'Invalid undo transaction id'
if h[16] == 'u': return if h[16] == 'u': return
if h[16] != ' ': if h[16] != ' ':
raise POSException.UndoError, 'non-undoable transaction' raise UndoError, 'non-undoable transaction'
tl=U64(h[8:16]) tl=U64(h[8:16])
ul,dl,el=unpack(">HHH", h[17:23]) ul,dl,el=unpack(">HHH", h[17:23])
tend=tpos+tl tend=tpos+tl
pos=tpos+(23+ul+dl+el) pos=tpos+(23+ul+dl+el)
tindex={} tindex={}
failures={} # keep track of failures, cause we may succeed later
failed=failures.has_key
# Read the data records for this transaction # Read the data records for this transaction
while pos < tend: while pos < tend:
seek(pos) seek(pos)
h=read(42) h=read(42)
oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h) oid,serial,sprev,stloc,vlen,splen = unpack(">8s8s8s8sH8s", h)
if failed(oid): del failures[oid] # second chance!
plen=U64(splen) plen=U64(splen)
prev=U64(sprev) prev=U64(sprev)
if vlen: if vlen:
...@@ -980,31 +995,36 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -980,31 +995,36 @@ class FileStorage(BaseStorage.BaseStorage,
dlen=42+(plen or 8) dlen=42+(plen or 8)
version='' version=''
p, prev, v, snv, ipos = self._transactionalUndoRecord( try:
oid, pos, serial, prev, version) p, prev, v, snv, ipos = self._transactionalUndoRecord(
oid, pos, serial, prev, version)
plen=len(p) except UndoError, v:
write(pack(">8s8s8s8sH8s", # Don't fail right away. We may be redeemed later!
oid, newserial, p64(ipos), ostloc, failures[oid]=v
len(v), p64(plen)))
if v:
vprev=self._tvindex.get(v, self._vindex.get(v, 0))
write(snv+p64(vprev)+v)
self._tvindex[v]=here
odlen = 58+len(v)+(plen or 8)
else: else:
odlen = 42+(plen or 8) plen=len(p)
write(pack(">8s8s8s8sH8s",
oid, newserial, p64(ipos), ostloc,
len(v), p64(plen)))
if v:
vprev=self._tvindex.get(v, 0) or self._vindex.get(v, 0)
write(snv+p64(vprev)+v)
self._tvindex[v]=here
odlen = 58+len(v)+(plen or 8)
else:
odlen = 42+(plen or 8)
if p: write(p) if p: write(p)
else: write(p64(prev)) else: write(p64(prev))
tindex[oid]=here
here=here+odlen
pos=pos+dlen pos=pos+dlen
if pos > tend: if pos > tend:
raise POSException.UndoError, 'non-undoable transaction' raise UndoError, 'non-undoable transaction'
tindex[oid]=here
here=here+odlen
self._tindex[len(self._tindex):] = tindex.items() if failures: raise UndoError(failures)
self._tindex.update(tindex)
return tindex.keys() return tindex.keys()
finally: self._lock_release() finally: self._lock_release()
...@@ -1015,7 +1035,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1015,7 +1035,7 @@ class FileStorage(BaseStorage.BaseStorage,
try: try:
packt=self._packt packt=self._packt
if packt is None: if packt is None:
raise POSException.UndoError, ( raise UndoError(
'Undo is currently disabled for database maintenance.<p>') 'Undo is currently disabled for database maintenance.<p>')
pos=self._pos pos=self._pos
if pos < 39: return [] if pos < 39: return []
...@@ -1242,7 +1262,6 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1242,7 +1262,6 @@ class FileStorage(BaseStorage.BaseStorage,
oseek=ofile.seek oseek=ofile.seek
write=ofile.write write=ofile.write
tappend=tindex.append
index_get=index.get index_get=index.get
vindex_get=vindex.get vindex_get=vindex.get
pindex_get=pindex.get pindex_get=pindex.get
...@@ -1390,7 +1409,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1390,7 +1409,7 @@ class FileStorage(BaseStorage.BaseStorage,
nvindex[oid]=opos nvindex[oid]=opos
tappend((oid,opos)) tindex[oid]=opos
opos=opos+dlen opos=opos+dlen
pos=pos+dlen pos=pos+dlen
...@@ -1466,10 +1485,8 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1466,10 +1485,8 @@ class FileStorage(BaseStorage.BaseStorage,
_lock_release() _lock_release()
locked=0 locked=0
for oid, p in tindex: index.update(tindex) # Record the position
index[oid]=p # Record the position tindex.clear()
del tindex[:]
# Now, maybe we need to hack or delete the transaction # Now, maybe we need to hack or delete the transaction
otl=opos-otpos otl=opos-otpos
...@@ -1554,7 +1571,6 @@ def shift_transactions_forward(index, vindex, tindex, file, pos, opos): ...@@ -1554,7 +1571,6 @@ def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
read=file.read read=file.read
write=file.write write=file.write
tappend=tindex.append
index_get=index.get index_get=index.get
vindex_get=vindex.get vindex_get=vindex.get
...@@ -1618,7 +1634,7 @@ def shift_transactions_forward(index, vindex, tindex, file, pos, opos): ...@@ -1618,7 +1634,7 @@ def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
pv=p64(vindex_get(version, 0)) pv=p64(vindex_get(version, 0))
if status != 'u': vindex[version]=opos if status != 'u': vindex[version]=opos
tappend((oid,opos)) tindex[oid]=opos
if plen: p=read(plen) if plen: p=read(plen)
else: else:
...@@ -1655,11 +1671,10 @@ def shift_transactions_forward(index, vindex, tindex, file, pos, opos): ...@@ -1655,11 +1671,10 @@ def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
# skip the (intentionally redundant) transaction length # skip the (intentionally redundant) transaction length
pos=pos+8 pos=pos+8
if status != 'u': if status != 'u':
for oid, p in tindex: index.update(tindex) # Record the position
index[oid]=p # Record the position
del tindex[:] tindex.clear()
write(stl) write(stl)
opos=opos+8 opos=opos+8
...@@ -1683,7 +1698,7 @@ def recover(file_name): ...@@ -1683,7 +1698,7 @@ def recover(file_name):
file=open(file_name, 'r+b') file=open(file_name, 'r+b')
index={} index={}
vindex={} vindex={}
tindex=[] tindex={}
pos, oid, tid = read_index( pos, oid, tid = read_index(
file, file_name, index, vindex, tindex, recover=1) file, file_name, index, vindex, tindex, recover=1)
...@@ -1723,7 +1738,6 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8, ...@@ -1723,7 +1738,6 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
index_get=index.get index_get=index.get
vndexpos=vindex.get vndexpos=vindex.get
tappend=tindex.append
pos=start pos=start
seek(start) seek(start)
...@@ -1814,7 +1828,7 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8, ...@@ -1814,7 +1828,7 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
plen=U64(splen) plen=U64(splen)
dlen=42+(plen or 8) dlen=42+(plen or 8)
tappend((oid,pos)) tindex[oid]=pos
if vlen: if vlen:
dlen=dlen+(16+vlen) dlen=dlen+(16+vlen)
...@@ -1854,11 +1868,11 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8, ...@@ -1854,11 +1868,11 @@ def read_index(file, name, index, vindex, tindex, stop='\377'*8,
name, pos) name, pos)
pos=pos+8 pos=pos+8
for oid, p in tindex: for oid, p in tindex.items():
maxoid=max(maxoid,oid) maxoid=max(maxoid,oid)
index[oid]=p # Record the position index[oid]=p # Record the position
del tindex[:] tindex.clear()
return pos, maxoid, ltid return pos, maxoid, ltid
...@@ -2099,7 +2113,7 @@ class RecordIterator(Iterator): ...@@ -2099,7 +2113,7 @@ class RecordIterator(Iterator):
return r return r
raise IndexError, index raise IndexError, index
class Record: class Record:
"""An abstract database record """An abstract database record
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment