Commit 2b0c9aa4 authored by Jeremy Hylton's avatar Jeremy Hylton

Extend iterator() to include hint about backpointers.

The Record() object now has a data_txn attribute that is either None
or the id of the transaction that contains the data used by the
current record.  Example: When transactionalUndo() modifies an object,
it typical creates a new data record that points at the transaction
before the undo.  The new record contains the same logical data as the
record it refers to.  (For consistency purposes, this is a stronger
claim than that the pickles in two different data records are the
same.)

Add a test of the new iterator() feature in TransactionalUndoStorage.
parent 0b2d3637
......@@ -115,7 +115,7 @@
# may have a back pointer to a version record or to a non-version
# record.
#
__version__='$Revision: 1.108 $'[11:-2]
__version__='$Revision: 1.109 $'[11:-2]
import base64
from cPickle import Pickler, Unpickler, loads
......@@ -2098,18 +2098,27 @@ def _loadBack_impl(file, oid, back):
if vlen:
file.seek(vlen + 16, 1)
if plen != z64:
return file.read(U64(plen)), serial, old
return file.read(U64(plen)), serial, old, tloc
back = file.read(8) # We got a back pointer!
def _loadBack(file, oid, back):
data, serial, old = _loadBack_impl(file, oid, back)
data, serial, old, tloc = _loadBack_impl(file, oid, back)
return data, serial
def _loadBackPOS(file, oid, back):
"""Return position of data record for backpointer."""
data, serial, old = _loadBack_impl(file, oid, back)
data, serial, old, tloc = _loadBack_impl(file, oid, back)
return old
def _loadBackTxn(file, oid, back):
"""Return data, serial, and txn id for backpointer."""
data, serial, old, stloc = _loadBack_impl(file, oid, back)
tloc = U64(stloc)
file.seek(tloc)
h = file.read(TRANS_HDR_LEN)
tid = h[:8]
return data, serial, tid
def _truncate(file, name, pos):
seek=file.seek
seek(0,2)
......@@ -2346,6 +2355,7 @@ class RecordIterator(Iterator, BaseStorage.TransactionRecord):
break
self._pos = pos + dlen
tid = None
if plen:
p = self._file.read(plen)
else:
......@@ -2359,9 +2369,9 @@ class RecordIterator(Iterator, BaseStorage.TransactionRecord):
# this.
p = None
else:
p = _loadBack(self._file, oid, p)[0]
p, _s, tid = _loadBackTxn(self._file, oid, p)
r = Record(oid, serial, version, p)
r = Record(oid, serial, version, p, tid)
return r
......@@ -2370,7 +2380,7 @@ class RecordIterator(Iterator, BaseStorage.TransactionRecord):
class Record(BaseStorage.DataRecord):
"""An abstract database record."""
def __init__(self, *args):
self.oid, self.serial, self.version, self.data = args
self.oid, self.serial, self.version, self.data, self.data_txn = args
class UndoSearch:
......
......@@ -2,13 +2,14 @@
Any storage that supports transactionalUndo() must pass these tests.
"""
from __future__ import nested_scopes
import time
import types
from ZODB import POSException
from ZODB.Transaction import Transaction
from ZODB.referencesf import referencesf
from ZODB.utils import u64
from ZODB.utils import u64, p64
from ZODB import DB
from Persistence import Persistent
......@@ -57,6 +58,17 @@ class TransactionalUndoStorage:
newrevs[oid] = self._transaction_newserial(oid)
return newrevs
def _iterate(self):
"""Iterate over the storage in its final state."""
# This is testing that the iterator() code works correctly.
# The hasattr() guards against ZEO, which doesn't support iterator.
if not hasattr(self._storage, "iterator"):
return
iter = self._storage.iterator()
for txn in iter:
for rec in txn:
pass
def checkSimpleTransactionalUndo(self):
eq = self.assertEqual
oid = self._storage.new_oid()
......@@ -117,6 +129,7 @@ class TransactionalUndoStorage:
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(23))
self._iterate()
def checkUndoCreationBranch1(self):
eq = self.assertEqual
......@@ -147,6 +160,7 @@ class TransactionalUndoStorage:
eq(len(oids), 1)
eq(oids[0], oid)
self.assertRaises(KeyError, self._storage.load, oid, '')
self._iterate()
def checkUndoCreationBranch2(self):
eq = self.assertEqual
......@@ -178,6 +192,7 @@ class TransactionalUndoStorage:
eq(oids[0], oid)
data, revid = self._storage.load(oid, '')
eq(zodb_unpickle(data), MinPO(12))
self._iterate()
def checkTwoObjectUndo(self):
eq = self.assertEqual
......@@ -231,6 +246,7 @@ class TransactionalUndoStorage:
eq(zodb_unpickle(data), MinPO(31))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(51))
self._iterate()
def checkTwoObjectUndoAtOnce(self):
# Convenience
......@@ -301,6 +317,7 @@ class TransactionalUndoStorage:
eq(zodb_unpickle(data), MinPO(32))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(52))
self._iterate()
def checkTwoObjectUndoAgain(self):
eq = self.assertEqual
......@@ -371,6 +388,7 @@ class TransactionalUndoStorage:
eq(zodb_unpickle(data), MinPO(33))
data, revid2 = self._storage.load(oid2, '')
eq(zodb_unpickle(data), MinPO(54))
self._iterate()
def checkNotUndoable(self):
......@@ -428,6 +446,7 @@ class TransactionalUndoStorage:
self._storage.transactionalUndo,
tid, t)
self._storage.tpc_abort(t)
self._iterate()
def checkTransactionalUndoAfterPack(self):
eq = self.assertEqual
......@@ -466,6 +485,7 @@ class TransactionalUndoStorage:
data, revid = self._storage.load(oid, '')
# The object must now be at the second state
eq(zodb_unpickle(data), MinPO(52))
self._iterate()
def checkTransactionalUndoAfterPackWithObjectUnlinkFromRoot(self):
eq = self.assertEqual
......@@ -521,3 +541,92 @@ class TransactionalUndoStorage:
eq(o1.obj, o2)
eq(o1.obj.obj, o3)
self._iterate()
def checkTransactionalUndoIterator(self):
# check that data_txn set in iterator makes sense
if not hasattr(self._storage, "iterator"):
return
s = self._storage
BATCHES = 4
OBJECTS = 4
orig = []
for i in range(BATCHES):
t = Transaction()
tid = p64(i)
s.tpc_begin(t, tid)
for j in range(OBJECTS):
oid = s.new_oid()
obj = MinPO(i * OBJECTS + j)
revid = s.store(oid, None, zodb_pickle(obj), '', t)
orig.append((tid, oid, revid))
s.tpc_vote(t)
s.tpc_finish(t)
i = 0
for tid, oid, revid in orig:
self._dostore(oid, revid=revid, data=MinPO(revid),
description="update %s" % i)
# Undo the OBJECTS transactions that modified objects created
# in the ith original transaction.
def undo(i):
info = s.undoInfo()
t = Transaction()
s.tpc_begin(t)
base = i * OBJECTS + i
for j in range(OBJECTS):
tid = info[base + j]['id']
s.transactionalUndo(tid, t)
s.tpc_vote(t)
s.tpc_finish(t)
for i in range(BATCHES):
undo(i)
# There are now (2 + OBJECTS) * BATCHES transactions:
# BATCHES original transactions, followed by
# OBJECTS * BATCHES modifications, followed by
# BATCHES undos
iter = s.iterator()
offset = 0
eq = self.assertEqual
for i in range(BATCHES):
txn = iter[offset]
offset += 1
tid = p64(i)
eq(txn.tid, tid)
L1 = [(rec.oid, rec.serial, rec.data_txn) for rec in txn]
L2 = [(oid, revid, None) for _tid, oid, revid in orig
if _tid == tid]
eq(L1, L2)
for i in range(BATCHES * OBJECTS):
txn = iter[offset]
offset += 1
eq(len([rec for rec in txn if rec.data_txn is None]), 1)
for i in range(BATCHES):
txn = iter[offset]
offset += 1
# The undos are performed in reverse order.
otid = p64(BATCHES - 1 - i)
L1 = [(rec.oid, rec.data_txn) for rec in txn]
L2 = [(oid, otid) for _tid, oid, revid in orig
if _tid == otid]
L1.sort()
L2.sort()
eq(L1, L2)
self.assertRaises(IndexError, iter.__getitem__, offset)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment