Commit cf9f715d authored by Jim Fulton's avatar Jim Fulton

The FileStorage iterator now handles large files better. When

iterating from a starting transaction near the end of the file, the
iterator will scan backward from the end of the file to find the
starting point.
parent 66a5e06a
...@@ -42,6 +42,11 @@ New Features ...@@ -42,6 +42,11 @@ New Features
- As a small convenience (mainly for tests), you can now specify - As a small convenience (mainly for tests), you can now specify
initial data as a string argument to the Blob constructor. initial data as a string argument to the Blob constructor.
- The FileStorage iterator now handles large files better. When
iterating from a starting transaction near the end of the file, the
iterator will scan backward from the end of the file to find the
starting point.
3.9.0a8 (2008-12-15) 3.9.0a8 (2008-12-15)
==================== ====================
...@@ -54,7 +59,7 @@ New Features ...@@ -54,7 +59,7 @@ New Features
blob-cache-size * (100 - blob-cache-size-check) / 100 blob-cache-size * (100 - blob-cache-size-check) / 100
This makes it far more likely (but doesn't guarantee) that the blob This makes it far more likely (but doesn't guarantee) that the blob
cache size will remain under the maximum. cache size will remain under the maximum.
The blob-cache-size check was reduced to 10%. The blob-cache-size check was reduced to 10%.
...@@ -79,7 +84,7 @@ New Features ...@@ -79,7 +84,7 @@ New Features
cache will periodically be reduced to the target size. cache will periodically be reduced to the target size.
The client blob directory layout has changed. If you have existing The client blob directory layout has changed. If you have existing
non-shared blob directories, you will have to remove them. non-shared blob directories, you will have to remove them.
Bugs Fixed Bugs Fixed
---------- ----------
...@@ -119,7 +124,7 @@ New Features ...@@ -119,7 +124,7 @@ New Features
you would otherwise pass to ZEO.ClientStorage.ClientStorage:: you would otherwise pass to ZEO.ClientStorage.ClientStorage::
import ZEO import ZEO
db = ZEO.DB(('some_host', 8200)) db = ZEO.DB(('some_host', 8200))
- Object saves are a little faster - Object saves are a little faster
...@@ -297,7 +302,7 @@ Bugs Fixed ...@@ -297,7 +302,7 @@ Bugs Fixed
- Fixed bug 153316: persistent and BTrees were using `int` - Fixed bug 153316: persistent and BTrees were using `int`
for memory sizes which caused errors on x86_64 Intel Xeon machines for memory sizes which caused errors on x86_64 Intel Xeon machines
(using 64-bit Linux). (using 64-bit Linux).
- Fixed small bug that the Connection.isReadOnly method didn't - Fixed small bug that the Connection.isReadOnly method didn't
work after a savepoint. work after a savepoint.
...@@ -390,7 +395,7 @@ Bugs Fixed: ...@@ -390,7 +395,7 @@ Bugs Fixed:
Bugs Fixed: Bugs Fixed:
- Fixed several bugs that caused ZEO cache corruption when connecting - Fixed several bugs that caused ZEO cache corruption when connecting
to servers. These bugs affected both persistent and non-persistent caches. to servers. These bugs affected both persistent and non-persistent caches.
- Improved the ZEO client shutdown support to try to - Improved the ZEO client shutdown support to try to
avoid spurious errors on exit, especially for scripts, such as zeopack. avoid spurious errors on exit, especially for scripts, such as zeopack.
...@@ -416,7 +421,7 @@ Bugs Fixed: ...@@ -416,7 +421,7 @@ Bugs Fixed:
Bugs Fixed: Bugs Fixed:
- The cache used an excessive amount of memory, causing applications - The cache used an excessive amount of memory, causing applications
with large caches to exhaust available memory. with large caches to exhaust available memory.
3.8.1b1 (2008-05-08) 3.8.1b1 (2008-05-08)
==================== ====================
......
...@@ -21,7 +21,7 @@ from persistent.TimeStamp import TimeStamp ...@@ -21,7 +21,7 @@ from persistent.TimeStamp import TimeStamp
from struct import pack, unpack from struct import pack, unpack
from types import StringType from types import StringType
from zc.lockfile import LockFile from zc.lockfile import LockFile
from ZODB.FileStorage.format import CorruptedDataError from ZODB.FileStorage.format import CorruptedError, CorruptedDataError
from ZODB.FileStorage.format import FileStorageFormatter, DataHeader from ZODB.FileStorage.format import FileStorageFormatter, DataHeader
from ZODB.FileStorage.format import TRANS_HDR, TRANS_HDR_LEN from ZODB.FileStorage.format import TRANS_HDR, TRANS_HDR_LEN
from ZODB.FileStorage.format import TxnHeader, DATA_HDR, DATA_HDR_LEN from ZODB.FileStorage.format import TxnHeader, DATA_HDR, DATA_HDR_LEN
...@@ -50,7 +50,6 @@ packed_version = "FS21" ...@@ -50,7 +50,6 @@ packed_version = "FS21"
logger = logging.getLogger('ZODB.FileStorage') logger = logging.getLogger('ZODB.FileStorage')
def panic(message, *data): def panic(message, *data):
logger.critical(message, *data) logger.critical(message, *data)
raise CorruptedTransactionError(message) raise CorruptedTransactionError(message)
...@@ -210,7 +209,7 @@ class FileStorage( ...@@ -210,7 +209,7 @@ class FileStorage(
self.blob_dir = os.path.abspath(blob_dir) self.blob_dir = os.path.abspath(blob_dir)
if create and os.path.exists(self.blob_dir): if create and os.path.exists(self.blob_dir):
ZODB.blob.remove_committed_dir(self.blob_dir) ZODB.blob.remove_committed_dir(self.blob_dir)
self._blob_init(blob_dir) self._blob_init(blob_dir)
zope.interface.alsoProvides(self, zope.interface.alsoProvides(self,
ZODB.interfaces.IBlobStorageRestoreable) ZODB.interfaces.IBlobStorageRestoreable)
...@@ -484,7 +483,7 @@ class FileStorage( ...@@ -484,7 +483,7 @@ class FileStorage(
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
assert not version assert not version
self._lock_acquire() self._lock_acquire()
try: try:
if oid > self._oid: if oid > self._oid:
...@@ -532,7 +531,7 @@ class FileStorage( ...@@ -532,7 +531,7 @@ class FileStorage(
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire() self._lock_acquire()
try: try:
old = self._index_get(oid, 0) old = self._index_get(oid, 0)
...@@ -544,7 +543,7 @@ class FileStorage( ...@@ -544,7 +543,7 @@ class FileStorage(
if oldserial != committed_tid: if oldserial != committed_tid:
raise POSException.ConflictError( raise POSException.ConflictError(
oid=oid, serials=(committed_tid, oldserial)) oid=oid, serials=(committed_tid, oldserial))
pos = self._pos pos = self._pos
here = pos + self._tfile.tell() + self._thl here = pos + self._tfile.tell() + self._thl
self._tindex[oid] = here self._tindex[oid] = here
...@@ -748,7 +747,7 @@ class FileStorage( ...@@ -748,7 +747,7 @@ class FileStorage(
def _finish_finish(self, tid): def _finish_finish(self, tid):
# This is a separate method to allow tests to replace it with # This is a separate method to allow tests to replace it with
# something broken. :) # something broken. :)
self._file.flush() self._file.flush()
if fsync is not None: if fsync is not None:
fsync(self._file.fileno()) fsync(self._file.fileno())
...@@ -825,7 +824,7 @@ class FileStorage( ...@@ -825,7 +824,7 @@ class FileStorage(
# Eek, a later transaction modified the data, but, # Eek, a later transaction modified the data, but,
# maybe it is pointing at the same data we are. # maybe it is pointing at the same data we are.
ctid, cdataptr, cdata = self._undoDataInfo(oid, ipos, tpos) ctid, cdataptr, cdata = self._undoDataInfo(oid, ipos, tpos)
if cdataptr != pos: if cdataptr != pos:
# We aren't sure if we are talking about the same data # We aren't sure if we are talking about the same data
try: try:
...@@ -994,7 +993,7 @@ class FileStorage( ...@@ -994,7 +993,7 @@ class FileStorage(
self.openCommittedBlobFile(h.oid, userial), self.openCommittedBlobFile(h.oid, userial),
open(tmp, 'wb')) open(tmp, 'wb'))
self._blob_storeblob(h.oid, self._tid, tmp) self._blob_storeblob(h.oid, self._tid, tmp)
new = DataHeader(h.oid, self._tid, ipos, otloc, 0, len(p)) new = DataHeader(h.oid, self._tid, ipos, otloc, 0, len(p))
# TODO: This seek shouldn't be necessary, but some other # TODO: This seek shouldn't be necessary, but some other
...@@ -1177,7 +1176,7 @@ class FileStorage( ...@@ -1177,7 +1176,7 @@ class FileStorage(
# Helpers that remove an oid dir or revision file. # Helpers that remove an oid dir or revision file.
handle_file = ZODB.blob.remove_committed handle_file = ZODB.blob.remove_committed
handle_dir = ZODB.blob.remove_committed_dir handle_dir = ZODB.blob.remove_committed_dir
# First step: move or remove oids or revisions # First step: move or remove oids or revisions
for line in open(os.path.join(self.blob_dir, '.removed')): for line in open(os.path.join(self.blob_dir, '.removed')):
line = line.strip().decode('hex') line = line.strip().decode('hex')
...@@ -1191,10 +1190,10 @@ class FileStorage( ...@@ -1191,10 +1190,10 @@ class FileStorage(
handle_dir(path) handle_dir(path)
maybe_remove_empty_dir_containing(path) maybe_remove_empty_dir_containing(path)
continue continue
if len(line) != 16: if len(line) != 16:
raise ValueError("Bad record in ", self.blob_dir, '.removed') raise ValueError("Bad record in ", self.blob_dir, '.removed')
oid, tid = line[:8], line[8:] oid, tid = line[:8], line[8:]
path = fshelper.getBlobFilename(oid, tid) path = fshelper.getBlobFilename(oid, tid)
if not os.path.exists(path): if not os.path.exists(path):
...@@ -1208,7 +1207,7 @@ class FileStorage( ...@@ -1208,7 +1207,7 @@ class FileStorage(
if not self.pack_keep_old: if not self.pack_keep_old:
return return
# Second step, copy remaining files. # Second step, copy remaining files.
for path, dir_names, file_names in os.walk(self.blob_dir): for path, dir_names, file_names in os.walk(self.blob_dir):
for file_name in file_names: for file_name in file_names:
...@@ -1219,7 +1218,7 @@ class FileStorage( ...@@ -1219,7 +1218,7 @@ class FileStorage(
if not os.path.exists(dest): if not os.path.exists(dest):
os.makedirs(dest, 0700) os.makedirs(dest, 0700)
link_or_copy(file_path, old+file_path[lblob_dir:]) link_or_copy(file_path, old+file_path[lblob_dir:])
def iterator(self, start=None, stop=None): def iterator(self, start=None, stop=None):
return FileIterator(self._file_name, start, stop) return FileIterator(self._file_name, start, stop)
...@@ -1244,7 +1243,7 @@ class FileStorage( ...@@ -1244,7 +1243,7 @@ class FileStorage(
for trans in FileIterator(self._file_name, pos=pos)] for trans in FileIterator(self._file_name, pos=pos)]
finally: finally:
self._lock_release() self._lock_release()
def lastTid(self, oid): def lastTid(self, oid):
"""Return last serialno committed for object oid. """Return last serialno committed for object oid.
...@@ -1641,16 +1640,23 @@ class FileIterator(FileStorageFormatter): ...@@ -1641,16 +1640,23 @@ class FileIterator(FileStorageFormatter):
assert isinstance(filename, str) assert isinstance(filename, str)
file = open(filename, 'rb') file = open(filename, 'rb')
self._file = file self._file = file
self._file_name = filename
if file.read(4) != packed_version: if file.read(4) != packed_version:
raise FileStorageFormatError(file.name) raise FileStorageFormatError(file.name)
file.seek(0,2) file.seek(0,2)
self._file_size = file.tell() self._file_size = file.tell()
if (pos < 4) or pos > self._file_size:
raise ValueError("Given position is greater than the file size",
pos, self._file_size)
self._pos = pos self._pos = pos
assert start is None or isinstance(start, str) assert start is None or isinstance(start, str)
assert stop is None or isinstance(stop, str) assert stop is None or isinstance(stop, str)
self._start = start
self._stop = stop
if start: if start:
if self._file_size <= 4:
return
self._skip_to_start(start) self._skip_to_start(start)
self._stop = stop
def __len__(self): def __len__(self):
# Define a bogus __len__() to make the iterator work # Define a bogus __len__() to make the iterator work
...@@ -1674,32 +1680,87 @@ class FileIterator(FileStorageFormatter): ...@@ -1674,32 +1680,87 @@ class FileIterator(FileStorageFormatter):
file.close() file.close()
def _skip_to_start(self, start): def _skip_to_start(self, start):
# Scan through the transaction records doing almost no sanity
# checks.
file = self._file file = self._file
read = file.read pos1 = self._pos
seek = file.seek file.seek(pos1)
tid1 = file.read(8)
if len(tid1) < 8:
raise CorruptedError("Couldn't read tid.")
if start < tid1:
pos2 = pos1
tid2 = tid1
file.seek(4)
tid1 = file.read(8)
if start <= tid1:
self._pos = 4
return
pos1 = 4
else:
if start == tid1:
return
# Try to read the last transaction. We could be unlucky and
# opened the file while committing a transaction. In that
# case, we'll just scan from the beginning if the file is
# small enough, otherwise we'll fail.
file.seek(self._file_size-8)
l = u64(file.read(8))
if not (l + 12 <= self._file_size and
self._read_num(self._file_size-l) == l):
if self._file_size < (1<<20):
return self._scan_forward(start)
raise ValueError("Can't find last transaction in large file")
pos2 = self._file_size-l-8
file.seek(pos2)
tid2 = file.read(8)
if tid2 < tid1:
raise CorruptedError("Tids out of order.")
if tid2 <= start:
if tid2 == start:
self._pos = pos2
else:
self._pos = self._file_size
return
t1 = ZODB.TimeStamp.TimeStamp(tid1).timeTime()
t2 = ZODB.TimeStamp.TimeStamp(tid2).timeTime()
ts = ZODB.TimeStamp.TimeStamp(start).timeTime()
if (ts - t1) < (t2 - ts):
return self._scan_forward(pos1, start)
else:
return self._scan_backward(pos2, start)
def _scan_forward(self, pos, start):
logger.debug("Scan forward %s:%s looking for %r",
self._file_name, pos, start)
file = self._file
while 1: while 1:
seek(self._pos) # Read the transaction record
h = read(16) h = self._read_txn_header(pos)
if len(h) < 16: if h.tid >= start:
self._pos = pos
return return
tid, stl = unpack(">8s8s", h)
if tid >= start: pos += h.tlen + 8
def _scan_backward(self, pos, start):
logger.debug("Scan backward %s:%s looking for %r",
self._file_name, pos, start)
file = self._file
seek = file.seek
read = file.read
while 1:
pos -= 8
seek(pos)
tlen = ZODB.utils.u64(read(8))
pos -= tlen
h = self._read_txn_header(pos)
if h.tid <= start:
if h.tid == start:
self._pos = pos
else:
self._pos = pos + tlen + 8
return return
tl = u64(stl)
try:
self._pos += tl + 8
except OverflowError:
self._pos = long(self._pos) + tl + 8
if __debug__:
# Sanity check
seek(self._pos - 8, 0)
rtl = read(8)
if rtl != stl:
pos = file.tell() - 8
panic("%s has inconsistent transaction length at %s "
"(%s != %s)", file.name, pos, u64(rtl), u64(stl))
# Iterator protocol # Iterator protocol
def __iter__(self): def __iter__(self):
......
FileStorage-specific iterator tests
===================================
The FileStorage iterator has some special features that deserve some
special tests.
We'll make some assertions about time, so we'll take it over:
>>> now = 1229959248
>>> def faux_time():
... global now
... now += 0.1
... return now
>>> import time
>>> time_time = time.time
>>> time.time = faux_time
Commit a bunch of transactions:
>>> import ZODB.FileStorage, transaction
>>> db = ZODB.DB('data.fs')
>>> tids = [db.storage.lastTransaction()]
>>> poss = [db.storage._pos]
>>> conn = db.open()
>>> for i in range(100):
... conn.root()[i] = conn.root().__class__()
... transaction.commit()
... tids.append(db.storage.lastTransaction())
... poss.append(db.storage._pos)
Deciding where to start
-----------------------
By default, we start at the beginning:
>>> it = ZODB.FileStorage.FileIterator('data.fs')
>>> it.next().tid == tids[0]
True
The file iterator has an optimization to deal with large files. It
can search from either the front or the back of the file, depending
on the starting transaction given. To see this, we'll turn on debug
logging:
>>> import logging, sys
>>> old_log_level = logging.getLogger().getEffectiveLevel()
>>> logging.getLogger().setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler(sys.stdout)
>>> logging.getLogger().addHandler(handler)
If we specify a start transaction, we'll scan forward or backward, as
seems best and set the next record to that:
>>> it = ZODB.FileStorage.FileIterator('data.fs', tids[0])
>>> it.next().tid == tids[0]
True
>>> it = ZODB.FileStorage.FileIterator('data.fs', tids[1])
Scan forward data.fs:4 looking for '\x03z\xbd\xd8\xd06\x9c\xcc'
>>> it.next().tid == tids[1]
True
>>> it = ZODB.FileStorage.FileIterator('data.fs', tids[30])
Scan forward data.fs:4 looking for '\x03z\xbd\xd8\xdc\x96.\xcc'
>>> it.next().tid == tids[30]
True
>>> it = ZODB.FileStorage.FileIterator('data.fs', tids[70])
Scan backward data.fs:118274 looking for '\x03z\xbd\xd8\xed\xa7>\xcc'
>>> it.next().tid == tids[70]
True
>>> it = ZODB.FileStorage.FileIterator('data.fs', tids[-2])
Scan backward data.fs:118274 looking for '\x03z\xbd\xd8\xfa\x06\xd0\xcc'
>>> it.next().tid == tids[-2]
True
>>> it = ZODB.FileStorage.FileIterator('data.fs', tids[-1])
>>> it.next().tid == tids[-1]
True
We can also supply a file position. This can speed up finding the
starting point, or just pick up where another iterator left off:
>>> it = ZODB.FileStorage.FileIterator('data.fs', pos=poss[50])
>>> it.next().tid == tids[51]
True
>>> it = ZODB.FileStorage.FileIterator('data.fs', tids[0], pos=4)
>>> it.next().tid == tids[0]
True
>>> it = ZODB.FileStorage.FileIterator('data.fs', tids[-1], pos=poss[-2])
>>> it.next().tid == tids[-1]
True
>>> it = ZODB.FileStorage.FileIterator('data.fs', tids[50], pos=poss[50])
Scan backward data.fs:36542 looking for '\x03z\xbd\xd8\xe5\x1e\xb6\xcc'
>>> it.next().tid == tids[50]
True
>>> it = ZODB.FileStorage.FileIterator('data.fs', tids[49], pos=poss[50])
Scan backward data.fs:36542 looking for '\x03z\xbd\xd8\xe4\xb1|\xcc'
>>> it.next().tid == tids[49]
True
>>> it = ZODB.FileStorage.FileIterator('data.fs', tids[51], pos=poss[50])
>>> it.next().tid == tids[51]
True
>>> logging.getLogger().setLevel(old_log_level)
>>> logging.getLogger().removeHandler(handler)
If a starting transaction is before the first transaction in the file,
then the first transaction is returned.
>>> from ZODB.utils import p64, u64
>>> it = ZODB.FileStorage.FileIterator('data.fs', p64(u64(tids[0])-1))
>>> it.next().tid == tids[0]
True
If it is after the last transaction, then iteration will be empty:
>>> it = ZODB.FileStorage.FileIterator('data.fs', p64(u64(tids[-1])+1))
>>> list(it)
[]
Even if we write more transactions:
>>> it = ZODB.FileStorage.FileIterator('data.fs', p64(u64(tids[-1])+1))
>>> for i in range(10):
... conn.root()[i] = conn.root().__class__()
... transaction.commit()
>>> list(it)
[]
.. Cleanup
>>> time.time = time_time
>>> it.close()
>>> db.close()
...@@ -93,7 +93,7 @@ The pack_keep_old constructor argument controls whether a .old file (and .old di ...@@ -93,7 +93,7 @@ The pack_keep_old constructor argument controls whether a .old file (and .old di
def test_suite(): def test_suite():
return unittest.TestSuite(( return unittest.TestSuite((
doctest.DocFileSuite( doctest.DocFileSuite(
'zconfig.txt', 'zconfig.txt', 'iterator.test',
setUp=ZODB.tests.util.setUp, tearDown=ZODB.tests.util.tearDown, setUp=ZODB.tests.util.setUp, tearDown=ZODB.tests.util.tearDown,
), ),
doctest.DocTestSuite( doctest.DocTestSuite(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment