Commit 606639c4 authored by Barry Warsaw's avatar Barry Warsaw

Here is a fix for a problem reported against the standby replicas. In

that environment, it is possible that all the data in the replicas
recoverable through the public api are identical, while the actual
on-disc representation differs.  This bug can be boiled down to doing
the following in "standard" ZODB:

    1. undoLog()
    2. pack()
    3. transactionalUndo()

Previously, undoLog() encoded the file position in the 'id' key of the
undo record, but of course the pack() breaks that file position.  We
fix this by encoding the oid of an object touched in the transaction
instead.  That way, transactionalUndo() can start at the current
revision of the object, and scan back until it can find a transaction
with a matching id to the one we're undoing.

This approach breaks when the transaction we're undoing doesn't touch
any objects, e.g. is an abortVersion() or commitVersion().  By edict,
that's a non-requirement for now.  We could fix that by doing a more
expensive binary search for the matching transaction.

WIBNI we had an index from tids to file positions? :)
parent d9c08573
...@@ -115,7 +115,7 @@ ...@@ -115,7 +115,7 @@
# may have a back pointer to a version record or to a non-version # may have a back pointer to a version record or to a non-version
# record. # record.
# #
__version__='$Revision: 1.83 $'[11:-2] __version__='$Revision: 1.84 $'[11:-2]
import struct, time, os, string, base64, sys import struct, time, os, string, base64, sys
from struct import pack, unpack from struct import pack, unpack
...@@ -1054,12 +1054,39 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1054,12 +1054,39 @@ class FileStorage(BaseStorage.BaseStorage,
self._lock_acquire() self._lock_acquire()
try: try:
transaction_id = base64.decodestring(transaction_id + '==\n') # As seen in undoLog() below, transaction_id encodes the tid and
tid, tpos = transaction_id[:8], U64(transaction_id[8:]) # possibly the oid of the first object in the transaction record.
# transaction_id will be of length 16 if there were objects
# affected by the txn, and 8 if there weren't (e.g. abortVersion()
# and commitVersion()). In the latter case, we could still find
# the transaction through an expensive search of the file, but
# we're punting on that for now.
tid = transaction_id[:8]
oid = transaction_id[8:]
if oid == '' or not self._index.has_key(oid):
raise UndoError, 'Undoing a non-object affecting transaction'
# Find the file position for the current revision of this object,
# and search back for the beginning of its transaction record
pos = self._index[oid]
ostloc = p64(self._pos) ostloc = p64(self._pos)
here = self._pos + (self._tfile.tell() + self._thl) here = self._pos + (self._tfile.tell() + self._thl)
while 1:
self._file.seek(pos)
h = self._file.read(DATA_HDR_LEN)
doid,serial,prev,tpos,vlen,plen = unpack('>8s8s8s8sH8s', h)
tpos = U64(tpos)
self._file.seek(tpos)
# Read transaction id to see if we've got a match
thistid = self._file.read(8)
if thistid == tid:
break # Yeee ha!
# Keep looking
pos = U64(prev)
if not pos:
# We never found the right transaction
raise UndoError, 'Invalid undo transaction id'
# We're sitting at the transaction we want to undo, but let's move
# the file pointer back to the start of the txn record.
self._file.seek(tpos) self._file.seek(tpos)
h = self._file.read(TRANS_HDR_LEN) h = self._file.read(TRANS_HDR_LEN)
if len(h) != TRANS_HDR_LEN or h[:8] != tid: if len(h) != TRANS_HDR_LEN or h[:8] != tid:
...@@ -1130,15 +1157,18 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1130,15 +1157,18 @@ class FileStorage(BaseStorage.BaseStorage,
finally: self._lock_release() finally: self._lock_release()
def undoLog(self, first=0, last=-20, filter=None): def undoLog(self, first=0, last=-20, filter=None):
if last < 0: last=first-last+1 if last < 0:
last=first-last+1
self._lock_acquire() self._lock_acquire()
try: try:
packt=self._packt packt = self._packt
if packt is None: if packt is None:
raise UndoError( raise UndoError(
'Undo is currently disabled for database maintenance.<p>') 'Undo is currently disabled for database maintenance.<p>')
pos=self._pos pos=self._pos
if pos < 39: return [] # BAW: Why 39 please? This makes no sense (see also below).
if pos < 39:
return []
file=self._file file=self._file
seek=file.seek seek=file.seek
read=file.read read=file.read
...@@ -1154,24 +1184,54 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1154,24 +1184,54 @@ class FileStorage(BaseStorage.BaseStorage,
seek(pos) seek(pos)
h=read(TRANS_HDR_LEN) h=read(TRANS_HDR_LEN)
tid, tl, status, ul, dl, el = unpack(">8s8scHHH", h) tid, tl, status, ul, dl, el = unpack(">8s8scHHH", h)
if tid < packt: break if tid < packt:
if status != ' ': continue break
if status != ' ':
continue
u=ul and read(ul) or '' u=ul and read(ul) or ''
d=dl and read(dl) or '' d=dl and read(dl) or ''
d={'id': encode(tid+p64(pos))[:22], e = {}
'time': TimeStamp(tid).timeTime(),
'user_name': u, 'description': d}
if el: if el:
try: try:
e=loads(read(el)) e = loads(read(el))
d.update(e) except:
except: pass pass
# We now need an encoded id that isn't dependent on file
# position, because it will break after a pack, and in the
# face of replication, while the transaction and data records
# may be identical (as viewed from the storage interface),
# file positions may be meaningless across replicas.
#
# We'd love to just give the tid, but FS makes it expensive to
# go from tid to transaction record. :( However, if the txn
# has data records, then we can encode the oid of one of the
# objects affected by the txn. Then we can use the index to
# find the current revision of the object, follow a
# back-pointer to find its most-current txn, and then follow
# the txns back until we find a match. Seems like the best we
# can do w/o a persistent tid->filepos mapping.
#
# Note: if the txn has no data records, we're screwed. Punt
# on that for now.
next = read(8)
# next is either the redundant txn length - 8, or an oid
if next == tl:
# There were no objects in this txn
id = tid
else:
id = tid + next
d={'id': id,
'time': TimeStamp(tid).timeTime(),
'user_name': u,
'description': d}
d.update(e)
if filter is None or filter(d): if filter is None or filter(d):
if i >= first: append(d) if i >= first:
append(d)
i=i+1 i=i+1
return r return r
finally: self._lock_release() finally:
self._lock_release()
def versionEmpty(self, version): def versionEmpty(self, version):
if not version: if not version:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment