Commit 0e8427c9 authored by Jeremy Hylton's avatar Jeremy Hylton

Break up transactionalUndo() into possibly understandable chunks.

transactionalUndo() does argument checking and locking.  It calls
_transactional_undo(), which finds the right transaction record.  It calls
_txn_undo_write(), which writes the data records.

Add a summary comment above undoLog() that explains how the
transaction_id is created and used.
......@@ -115,7 +115,7 @@
# may have a back pointer to a version record or to a non-version
# record.
#
__version__='$Revision: 1.87 $'[11:-2]
__version__='$Revision: 1.88 $'[11:-2]
import struct, time, os, string, base64, sys
from struct import pack, unpack
......@@ -971,7 +971,6 @@ class FileStorage(BaseStorage.BaseStorage,
self._file.seek(pos+8)
return self._file.read(8)
def _transactionalUndoRecord(self, oid, pos, serial, pre, version):
"""Get the indo information for a data record
......@@ -979,7 +978,6 @@ class FileStorage(BaseStorage.BaseStorage,
version, packed non-version data pointer, and current
position. If the pickle is true, then the data pointer must
be 0, but the pickle can be empty *and* the pointer 0.
"""
copy=1 # Can we just copy a data pointer
......@@ -1035,6 +1033,32 @@ class FileStorage(BaseStorage.BaseStorage,
raise UndoError('Some data were modified by a later transaction')
# undoLog() returns a description dict that includes an id entry.
# The id is opaque to the client, but encodes information that
# uniquely identifies a transaction in the storage. The id is a
# base64 encoded string, where the components of the string are:
# - the transaction id
# - the packed file position of the transaction record
# - the oid of an object modified by the transaction
# The file position is sufficient in most cases, but doesn't work
# if the id is used after a pack and may not work if used with
# replicated storages. If the file position is incorrect, the oid
# can be used for a relatively efficient search for the
# transaction record. FileStorage keeps an index mapping oids to
# file positions, but do notes have a transaction id to file
# offset index. The oid index maps to the most recent revision of
# the object. Transactional undo must follow back pointers until
# it finds the correct transaction record,
# This approach fails if the transaction record has no data
# records. It's not clear if that is possible, but it may be for
# commitVersion and abortVersion.
# The file offset also supports non-transactional undo, which
# won't work after a pack and isn't supported by replicated
# storages.
def undoLog(self, first=0, last=-20, filter=None):
if last < 0:
last = first - last + 1
......@@ -1073,28 +1097,6 @@ class FileStorage(BaseStorage.BaseStorage,
e = loads(read(el))
except:
pass
# We now need an encoded id that isn't dependent on file
# position, because it will break after a pack, and in the
# face of replication, while the transaction and data records
# may be identical (as viewed from the storage interface),
# file positions may be meaningless across replicas.
#
# We'd love to just give the tid, but FS makes it expensive to
# go from tid to transaction record. :( However, if the txn
# has data records, then we can encode the oid of one of the
# objects affected by the txn. Then we can use the index to
# find the current revision of the object, follow a
# back-pointer to find its most-current txn, and then follow
# the txns back until we find a match. Seems like the best we
# can do w/o a persistent tid->filepos mapping.
#
# Note: if the txn has no data records, we're screwed. Punt
# on that for now.
#
# Note that we're still encoding the transaction position
# in the transaction ID in order to support non-transactional
# undo. This can be removed as soon as non-transactional
# undo is removed.
next = read(8)
# next is either the redundant txn length - 8, or an oid
if next == tl:
......@@ -1134,113 +1136,128 @@ class FileStorage(BaseStorage.BaseStorage,
self._lock_acquire()
try:
# As seen in undoLog() below, transaction_id encodes the tid and
# possibly the oid of the first object in the transaction record.
# transaction_id will be of length 16 if there were objects
# affected by the txn, and 8 if there weren't (e.g. abortVersion()
# and commitVersion()). In the latter case, we could still find
# the transaction through an expensive search of the file, but
# we're punting on that for now.
transaction_id = base64.decodestring(transaction_id + '\n')
tid = transaction_id[:8]
oid = transaction_id[16:]
if oid == '' or not self._index.has_key(oid):
# We can't get the position of the transaction easily.
# Note that there is a position encoded in the
# transaction_id at [8:16], but it can't be used reliably
# across multiple file storages and thus breaks
# transactional integrity.
raise UndoError, 'Undoing a non-object affecting transaction'
# Find the file position for the current revision of this object,
# and search back for the beginning of its transaction record
pos = self._index[oid]
ostloc = p64(self._pos)
here = self._pos + (self._tfile.tell() + self._thl)
while 1:
self._file.seek(pos)
h = self._file.read(DATA_HDR_LEN)
doid,serial,prev,tpos,vlen,plen = unpack('>8s8s8s8sH8s', h)
tpos = U64(tpos)
self._file.seek(tpos)
# Read transaction id to see if we've got a match
thistid = self._file.read(8)
if thistid == tid:
break # Yeee ha!
# Keep looking
pos = U64(prev)
if not pos:
# We never found the right transaction
raise UndoError, 'Invalid undo transaction id'
# We're sitting at the transaction we want to undo, but let's move
# the file pointer back to the start of the txn record.
return self._transactional_undo(transaction_id)
finally:
self._lock_release()
def _transactional_undo(self, transaction_id):
# As seen in undoLog() below, transaction_id encodes the tid and
# possibly the oid of the first object in the transaction record.
# transaction_id will be of length 16 if there were objects
# affected by the txn, and 8 if there weren't (e.g. abortVersion()
# and commitVersion()). In the latter case, we could still find
# the transaction through an expensive search of the file, but
# we're punting on that for now.
transaction_id = base64.decodestring(transaction_id + '\n')
tid = transaction_id[:8]
pos = U64(transaction_id[8:16])
# XXX
oid = transaction_id[16:]
if oid == '' or not self._index.has_key(oid):
# We can't get the position of the transaction easily.
# Note that there is a position encoded in the
# transaction_id at [8:16], but it can't be used reliably
# across multiple file storages and thus breaks
# transactional integrity.
raise UndoError, 'Undoing a non-object affecting transaction'
# Find the file position for the current revision of this object,
# and search back for the beginning of its transaction record
pos = self._index[oid]
ostloc = p64(self._pos)
here = self._pos + (self._tfile.tell() + self._thl)
while 1:
self._file.seek(pos)
h = self._file.read(DATA_HDR_LEN)
doid, serial, prev, tpos, vlen, plen = \
unpack('>8s8s8s8sH8s', h)
tpos = U64(tpos)
self._file.seek(tpos)
h = self._file.read(TRANS_HDR_LEN)
if len(h) != TRANS_HDR_LEN or h[:8] != tid:
raise UndoError, 'Invalid undo transaction id'
if h[16] == 'u':
return
if h[16] != ' ':
raise UndoError, 'non-undoable transaction'
tl = U64(h[8:16])
ul, dl, el = struct.unpack(">HHH", h[17:TRANS_HDR_LEN])
tend = tpos + tl
pos = tpos + (TRANS_HDR_LEN + ul + dl + el)
tindex = {}
failures = {} # keep track of failures, cause we may succeed later
failed = failures.has_key
# Read the data records for this transaction
while pos < tend:
self._file.seek(pos)
h = self._file.read(DATA_HDR_LEN)
oid, serial, sprev, stloc, vlen, splen = \
struct.unpack(">8s8s8s8sH8s", h)
if failed(oid):
del failures[oid] # second chance!
plen = U64(splen)
prev = U64(sprev)
if vlen:
dlen = DATA_VERSION_HDR_LEN + vlen + (plen or 8)
self._file.seek(16, 1)
version = self._file.read(vlen)
else:
dlen = DATA_HDR_LEN + (plen or 8)
version = ''
# Read transaction id to see if we've got a match
thistid = self._file.read(8)
if thistid == tid:
break # Yeee ha!
# Keep looking
pos = U64(prev)
if not pos:
# We never found the right transaction
raise UndoError('Invalid undo transaction id')
# We're sitting at the transaction we want to undo, but let's move
# the file pointer back to the start of the txn record.
tindex = self._txn_undo_write(tpos, tid, ostloc, here)
self._tindex.update(tindex)
return tindex.keys()
def _txn_undo_write(self, tpos, tid, ostloc, here):
# a helper function to write the data records for transactional undo
self._file.seek(tpos)
h = self._file.read(TRANS_HDR_LEN)
# XXX jer: don't think the second test is needed at this point
if len(h) != TRANS_HDR_LEN or h[:8] != tid:
raise UndoError('Invalid undo transaction id')
if h[16] == 'u':
return
if h[16] != ' ':
raise UndoError('non-undoable transaction')
tl = U64(h[8:16])
ul, dl, el = struct.unpack(">HHH", h[17:TRANS_HDR_LEN])
tend = tpos + tl
pos = tpos + (TRANS_HDR_LEN + ul + dl + el)
tindex = {}
failures = {} # keep track of failures, cause we may succeed later
failed = failures.has_key
# Read the data records for this transaction
while pos < tend:
self._file.seek(pos)
h = self._file.read(DATA_HDR_LEN)
oid, serial, sprev, stloc, vlen, splen = \
struct.unpack(">8s8s8s8sH8s", h)
if failed(oid):
del failures[oid] # second chance!
plen = U64(splen)
prev = U64(sprev)
if vlen:
dlen = DATA_VERSION_HDR_LEN + vlen + (plen or 8)
self._file.seek(16, 1)
version = self._file.read(vlen)
else:
dlen = DATA_HDR_LEN + (plen or 8)
version = ''
try:
p, prev, v, snv, ipos = self._transactionalUndoRecord(
oid, pos, serial, prev, version)
except UndoError, v:
# Don't fail right away. We may be redeemed later!
failures[oid] = v
try:
p, prev, v, snv, ipos = self._transactionalUndoRecord(
oid, pos, serial, prev, version)
except UndoError, v:
# Don't fail right away. We may be redeemed later!
failures[oid] = v
else:
plen =len(p)
self._tfile.write(pack(">8s8s8s8sH8s",
oid, self._serial, p64(ipos),
ostloc, len(v), p64(plen)))
if v:
vprev=self._tvindex.get(v, 0) or self._vindex.get(v, 0)
self._tfile.write(snv + p64(vprev) + v)
self._tvindex[v] = here
odlen = DATA_VERSION_HDR_LEN + len(v)+(plen or 8)
else:
plen =len(p)
self._tfile.write(pack(">8s8s8s8sH8s",
oid, self._serial, p64(ipos),
ostloc, len(v), p64(plen)))
if v:
vprev=self._tvindex.get(v, 0) or self._vindex.get(v, 0)
self._tfile.write(snv + p64(vprev) + v)
self._tvindex[v]=here
odlen = DATA_VERSION_HDR_LEN + len(v)+(plen or 8)
else:
odlen = DATA_HDR_LEN+(plen or 8)
odlen = DATA_HDR_LEN+(plen or 8)
if p:
self._tfile.write(p)
else:
self._tfile.write(p64(prev))
tindex[oid]=here
here=here+odlen
if p:
self._tfile.write(p)
else:
self._tfile.write(p64(prev))
tindex[oid] = here
here += odlen
pos=pos+dlen
if pos > tend:
raise UndoError, 'non-undoable transaction'
pos=pos+dlen
if pos > tend:
raise UndoError, 'non-undoable transaction'
if failures: raise UndoError(failures)
self._tindex.update(tindex)
return tindex.keys()
if failures:
raise UndoError(failures)
finally: self._lock_release()
return tindex
def versionEmpty(self, version):
if not version:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment