Commit 4f4b64fc authored by Jim Fulton

Added direct blob support to FileStorage. This is to provide more
efficient packing (and undo).
parent ce274a89
@@ -22,6 +22,14 @@ New Features
XXX There are known issues with this implementation that need to be
sorted out before it is "released".
3.9.0a9 (2008-12-??)
====================
New Features
------------
- FileStorage now supports blobs directly.
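  For example (a minimal sketch; the 'data.fs' and 'blobs' paths are
  hypothetical), a FileStorage created with a blob_dir now handles
  blob data itself, with no BlobStorage wrapper:

    >>> import transaction, ZODB, ZODB.blob, ZODB.FileStorage
    >>> fs = ZODB.FileStorage.FileStorage('data.fs', blob_dir='blobs')
    >>> db = ZODB.DB(fs)
    >>> conn = db.open()
    >>> conn.root()['b'] = ZODB.blob.Blob()
    >>> conn.root()['b'].open('w').write('blob data')
    >>> transaction.commit()
    >>> db.close()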
3.9.0a8 (2008-12-15)
====================
......
(This diff is collapsed; its contents are not shown.)
@@ -24,16 +24,15 @@ from the revision of the root at that time or if it is reachable from
a backpointer after that time.
"""
import os
from ZODB.serialize import referencesf
from ZODB.FileStorage.format import DataHeader, TRANS_HDR_LEN
from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
from ZODB.utils import p64, u64, z64
from ZODB.fsIndex import fsIndex
from ZODB.FileStorage.format import FileStorageFormatter, CorruptedDataError
from ZODB.FileStorage.format import DataHeader, TRANS_HDR_LEN
import ZODB.POSException
import logging
import os
import ZODB.blob
import ZODB.fsIndex
import ZODB.POSException
logger = logging.getLogger(__name__)
@@ -147,7 +146,7 @@ class PackCopier(FileStorageFormatter):
class GC(FileStorageFormatter):
def __init__(self, file, eof, packtime, gc):
def __init__(self, file, eof, packtime, gc, referencesf):
self._file = file
self._name = file.name
self.eof = eof
@@ -155,7 +154,9 @@ class GC(FileStorageFormatter):
self.gc = gc
# packpos: position of first txn header after pack time
self.packpos = None
self.oid2curpos = fsIndex() # maps oid to current data record position
# {oid -> current data record position}:
self.oid2curpos = ZODB.fsIndex.fsIndex()
# The set of reachable revisions of each object.
#
@@ -166,12 +167,14 @@
# second is a dictionary mapping objects to lists of
# positions; it is used to handle the same number of objects
# for which we must keep multiple revisions.
self.reachable = fsIndex()
self.reachable = ZODB.fsIndex.fsIndex()
self.reach_ex = {}
# keep ltid for consistency checks during initial scan
self.ltid = z64
self.referencesf = referencesf
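# A sketch of the reachability test these two structures support (the
# actual isReachable body is elided from this hunk): the one "primary"
# kept position for an oid lives in self.reachable, and any extra kept
# positions live in self.reach_ex, so a check might read:
#
#     rpos = self.reachable.get(oid)
#     if rpos is None:
#         return 0        # no revision of oid is reachable
#     if rpos == pos:
#         return 1        # pos is the primary kept revision
#     return pos in self.reach_ex.get(oid, [])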
def isReachable(self, oid, pos):
"""Return 1 if revision of `oid` at `pos` is reachable."""
@@ -319,7 +322,7 @@ class GC(FileStorageFormatter):
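# Follow the chain of backpointers to the record that actually holds
# the pickle, then extract the object references from it: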
while dh.back:
dh = self._read_data_header(dh.back)
if dh.plen:
return referencesf(self._file.read(dh.plen))
return self.referencesf(self._file.read(dh.plen))
else:
return []
@@ -332,7 +335,16 @@ class FileStoragePacker(FileStorageFormatter):
# current_size is the storage's _pos. All valid data at the start
# lives before that offset (there may be a checkpoint transaction in
# progress after it).
def __init__(self, path, stop, la, lr, cla, clr, current_size, gc=True):
def __init__(self, storage, referencesf, stop, gc=True):
self._storage = storage
if storage.blob_dir:
self.pack_blobs = True
self.blob_removed = open(
os.path.join(storage.blob_dir, '.removed'), 'w')
else:
self.pack_blobs = False
path = storage._file.name
self._name = path
# We open our own handle on the storage so that much of pack can
# proceed in parallel. It's important to close this file at every
@@ -342,24 +354,24 @@ class FileStoragePacker(FileStorageFormatter):
self._path = path
self._stop = stop
self.locked = False
self.file_end = current_size
self.file_end = storage.getSize()
self.gc = GC(self._file, self.file_end, self._stop, gc)
self.gc = GC(self._file, self.file_end, self._stop, gc, referencesf)
# The packer needs to acquire the parent's commit lock
# during the copying stage, so the two sets of lock acquire
# and release methods are passed to the constructor.
self._lock_acquire = la
self._lock_release = lr
self._commit_lock_acquire = cla
self._commit_lock_release = clr
self._lock_acquire = storage._lock_acquire
self._lock_release = storage._lock_release
self._commit_lock_acquire = storage._commit_lock_acquire
self._commit_lock_release = storage._commit_lock_release
# The packer will use several indexes.
# index: oid -> pos
# tindex: oid -> pos, for current txn
# oid2tid: not used by the packer
self.index = fsIndex()
self.index = ZODB.fsIndex.fsIndex()
self.tindex = {}
self.oid2tid = {}
self.toid2tid = {}
@@ -465,18 +477,6 @@ class FileStoragePacker(FileStorageFormatter):
return pos, new_pos
def fetchBackpointer(self, oid, back):
"""Return data and refs backpointer `back` to object `oid.
If `back` is 0 or ultimately resolves to 0, return None
and None. In this case, the transaction undoes the object
creation.
"""
if back == 0:
return None
data, tid = self._loadBackTxn(oid, back, 0)
return data
def copyDataRecords(self, pos, th):
"""Copy any current data records between pos and tend.
@@ -492,8 +492,24 @@ class FileStoragePacker(FileStorageFormatter):
while pos < tend:
h = self._read_data_header(pos)
if not self.gc.isReachable(h.oid, pos):
if self.pack_blobs:
# We need to find out if this is a blob, so get the data:
if h.plen:
data = self._file.read(h.plen)
else:
data = self.fetchDataViaBackpointer(h.oid, h.back)
if data and ZODB.blob.is_blob_record(data):
# This blob record is garbage. If no revision of the
# oid is reachable at all, record the bare oid so the
# oid's whole blob directory can be removed; otherwise
# record oid+tid to remove just this revision's blob:
if h.oid not in self.gc.reachable:
self.blob_removed.write(h.oid.encode('hex')+'\n')
else:
self.blob_removed.write(
(h.oid+h.tid).encode('hex')+'\n')
pos += h.recordlen()
continue
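# The record is reachable: advance past it; it is copied below.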
pos += h.recordlen()
# If we are going to copy any data, we need to copy
@@ -510,16 +526,25 @@ class FileStoragePacker(FileStorageFormatter):
if h.plen:
data = self._file.read(h.plen)
else:
# If a current record has a backpointer, fetch
# the data via the backpointer. We need to
# write the data in the new record.
data = self.fetchBackpointer(h.oid, h.back)
data = self.fetchDataViaBackpointer(h.oid, h.back)
self.writePackedDataRecord(h, data, new_tpos)
new_pos = self._tfile.tell()
return new_tpos, pos
def fetchDataViaBackpointer(self, oid, back):
"""Return the data for oid via backpointer back
If `back` is 0 or ultimately resolves to 0, return None.
In this case, the transaction undoes the object
creation.
"""
if back == 0:
return None
data, tid = self._loadBackTxn(oid, back, 0)
return data
def writePackedDataRecord(self, h, data, new_tpos):
# Update the header to reflect current information, then write
# it to the output file.
@@ -575,7 +600,7 @@ class FileStoragePacker(FileStorageFormatter):
if h.plen:
data = self._file.read(h.plen)
else:
data = self.fetchBackpointer(h.oid, h.back)
data = self.fetchDataViaBackpointer(h.oid, h.back)
if h.back:
prev_txn = self.getTxnFromData(h.oid, h.back)
......
@@ -20,17 +20,34 @@ class IFileStoragePacker(zope.interface.Interface):
The new file will have the same name as the old file with
'.pack' appended. (The packer can get the old file name via
storage._file.name.)
storage._file.name.) If blobs are supported, that is, if the
storage's blob_dir attribute is neither None nor empty, then a
.removed file must be created in the blob directory. This file
contains records of the form:
(oid+serial).encode('hex')+'\n'
or records of the form:
oid.encode('hex')+'\n'
If packing is unnecessary, or would not change the file, then
None is returned, otherwise a tuple is returned with:
no pack or removed files are created, None is returned,
otherwise a tuple is returned with:
- the size of the packed file, and
- the packed index
If and only if packing was necessary (non-None) and there was
no error, then the commit lock must be acquired.
no error, then the commit lock must be acquired. In addition,
it is up to FileStorage to:
- Rename the .pack file, and
- process the blob_dir/.removed file by removing the blobs
corresponding to the records in the file.
"""
class IFileStorage(zope.interface.Interface):
......
@@ -483,6 +483,14 @@ class FilesystemHelper:
continue
yield oid, path
class NoBlobsFileSystemHelper:
@property
def temp_dir(self):
raise TypeError("Blobs are not supported")
getPathForOID = getBlobFilename = temp_dir
class BlobStorageError(Exception):
"""The blob storage encountered an invalid state."""
@@ -575,53 +583,31 @@ LAYOUTS['lawn'] = LawnLayout()
class BlobStorageMixin(object):
"""A mix-in to help storages support blobssupport blobs."""
zope.interface.implements(ZODB.interfaces.IBlobStorage)
def __init__(self, blob_dir, layout='automatic'):
def _blob_init(self, blob_dir, layout='automatic'):
# XXX Log warning if storage is ClientStorage
self.fshelper = FilesystemHelper(blob_dir, layout)
self.fshelper.create()
self.fshelper.checkSecure()
self.dirty_oids = []
def temporaryDirectory(self):
return self.fshelper.temp_dir
@non_overridable
def _storeblob(self, oid, serial, blobfilename):
self._lock_acquire()
try:
self.fshelper.getPathForOID(oid, create=True)
targetname = self.fshelper.getBlobFilename(oid, serial)
rename_or_copy_blob(blobfilename, targetname)
# if oid already in there, something is really hosed.
# The underlying storage should have complained anyway
self.dirty_oids.append((oid, serial))
finally:
self._lock_release()
@non_overridable
def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
"""Stores data that has a BLOB attached."""
assert not version, "Versions aren't supported."
serial = self.store(oid, oldserial, data, '', transaction)
self._storeblob(oid, serial, blobfilename)
return self._tid
def _blob_init_no_blobs(self):
self.fshelper = NoBlobsFileSystemHelper()
self.dirty_oids = []
@non_overridable
def restoreBlob(self, oid, serial, data, blobfilename, prev_txn,
transaction):
"""Write blob data already committed in a separate database
def _blob_tpc_abort(self):
"""Blob cleanup to be called from subclass tpc_abort
"""
self.restore(oid, serial, data, '', prev_txn, transaction)
self._storeblob(oid, serial, blobfilename)
while self.dirty_oids:
oid, serial = self.dirty_oids.pop()
clean = self.fshelper.getBlobFilename(oid, serial)
if os.path.exists(clean):
remove_committed(clean)
return self._tid
def _blob_tpc_finish(self):
"""Blob cleanup to be called from subclass tpc_finish
"""
self.dirty_oids = []
@non_overridable
def copyTransactionsFrom(self, other):
for trans in other.iterator():
self.tpc_begin(trans, trans.tid, trans.status)
@@ -646,23 +632,6 @@ class BlobStorageMixin(object):
self.tpc_vote(trans)
self.tpc_finish(trans)
@non_overridable
def blob_tpc_finish(self):
"""Blob cleanup to be called from subclass tpc_finish
"""
self.dirty_oids = []
@non_overridable
def blob_tpc_abort(self):
"""Blob cleanup to be called from subclass tpc_abort
"""
while self.dirty_oids:
oid, serial = self.dirty_oids.pop()
clean = self.fshelper.getBlobFilename(oid, serial)
if os.path.exists(clean):
remove_committed(clean)
@non_overridable
def loadBlob(self, oid, serial):
"""Return the filename where the blob file can be found.
"""
@@ -671,7 +640,6 @@ class BlobStorageMixin(object):
raise POSKeyError("No blob file", oid, serial)
return filename
@non_overridable
def openCommittedBlobFile(self, oid, serial, blob=None):
blob_filename = self.loadBlob(oid, serial)
if blob is None:
@@ -679,8 +647,42 @@ class BlobStorageMixin(object):
else:
return BlobFile(blob_filename, 'r', blob)
def restoreBlob(self, oid, serial, data, blobfilename, prev_txn,
transaction):
"""Write blob data already committed in a separate database
"""
self.restore(oid, serial, data, '', prev_txn, transaction)
self._blob_storeblob(oid, serial, blobfilename)
return self._tid
def _blob_storeblob(self, oid, serial, blobfilename):
self._lock_acquire()
try:
self.fshelper.getPathForOID(oid, create=True)
targetname = self.fshelper.getBlobFilename(oid, serial)
rename_or_copy_blob(blobfilename, targetname)
# if oid already in there, something is really hosed.
# The underlying storage should have complained anyway
self.dirty_oids.append((oid, serial))
finally:
self._lock_release()
def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
"""Stores data that has a BLOB attached."""
assert not version, "Versions aren't supported."
serial = self.store(oid, oldserial, data, '', transaction)
self._blob_storeblob(oid, serial, blobfilename)
return self._tid
def temporaryDirectory(self):
return self.fshelper.temp_dir
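A minimal sketch of how a concrete storage is expected to use this
mix-in (MyStorage and BaseStorage are hypothetical stand-ins; the base
must provide store/restore, the locking attributes, and _tid):

    class MyStorage(BaseStorage, BlobStorageMixin):

        def __init__(self, path, blob_dir=None):
            BaseStorage.__init__(self, path)
            if blob_dir:
                self._blob_init(blob_dir)    # real blob support
            else:
                self._blob_init_no_blobs()   # helpers that raise TypeError

        def tpc_finish(self, transaction):
            BaseStorage.tpc_finish(self, transaction)
            self._blob_tpc_finish()    # forget the dirty (oid, serial)s

        def tpc_abort(self, transaction):
            BaseStorage.tpc_abort(self, transaction)
            self._blob_tpc_abort()     # remove blob files stored this txn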
class BlobStorage(SpecificationDecoratorBase, BlobStorageMixin):
class BlobStorage(SpecificationDecoratorBase):
"""A storage to support blobs."""
zope.interface.implements(ZODB.interfaces.IBlobStorage)
@@ -690,13 +692,14 @@ class BlobStorage(SpecificationDecoratorBase, BlobStorageMixin):
__slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo',
'_blobs_pack_is_in_progress', )
def __new__(self, base_directory, storage, layout='automatic'):
return SpecificationDecoratorBase.__new__(self, storage)
def __init__(self, base_directory, storage, layout='automatic'):
# XXX Log warning if storage is ClientStorage
SpecificationDecoratorBase.__init__(self, storage)
BlobStorageMixin.__init__(self, base_directory, layout)
self._blob_init(base_directory, layout)
try:
supportsUndo = storage.supportsUndo
except AttributeError:
@@ -722,7 +725,7 @@ class BlobStorage(SpecificationDecoratorBase, BlobStorageMixin):
# providing a _finish method because methods found on the proxied
# object aren't rebound to the proxy
getProxiedObject(self).tpc_finish(*arg, **kw)
self.blob_tpc_finish()
self._blob_tpc_finish()
@non_overridable
def tpc_abort(self, *arg, **kw):
@@ -730,7 +733,7 @@ class BlobStorage(SpecificationDecoratorBase, BlobStorageMixin):
# providing an _abort method because methods found on the proxied object
# aren't rebound to the proxy
getProxiedObject(self).tpc_abort(*arg, **kw)
self.blob_tpc_abort()
self._blob_tpc_abort()
@non_overridable
def _packUndoing(self, packtime, referencesf):
@@ -856,6 +859,12 @@ class BlobStorage(SpecificationDecoratorBase, BlobStorageMixin):
return undo_serial, keys
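# BlobStorage is a proxy (SpecificationDecoratorBase), so methods it
# merely inherited from BlobStorageMixin would not be rebound to the
# proxy; instead copy each mix-in method onto the class directly and
# mark it non-overridable: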
for name, v in BlobStorageMixin.__dict__.items():
if isinstance(v, type(BlobStorageMixin.__dict__['storeBlob'])):
assert name not in BlobStorage.__dict__
setattr(BlobStorage, name, non_overridable(v))
del name, v
copied = logging.getLogger('ZODB.blob.copied').debug
def rename_or_copy_blob(f1, f2, chmod=True):
"""Try to rename f1 to f2, fallback to copy.
@@ -894,9 +903,12 @@ if sys.platform == 'win32':
filename = os.path.join(dirpath, filename)
remove_committed(filename)
shutil.rmtree(path)
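    # Windows (on Python 2) has no os.link, so the closest available
    # behavior for link_or_copy is a real copy: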
link_or_copy = shutil.copy
else:
remove_committed = os.remove
remove_committed_dir = shutil.rmtree
link_or_copy = os.link
def is_blob_record(record):
......
@@ -14,6 +14,7 @@
import os, unittest
import transaction
import ZODB.FileStorage
import ZODB.tests.testblob
import ZODB.tests.util
import zope.testing.setupstack
from ZODB import POSException
@@ -547,6 +548,40 @@ def deal_with_finish_failures():
>>> db.close()
"""
def pack_with_open_blob_files():
"""
Make sure packing works while there are open blob files.
>>> fs = ZODB.FileStorage.FileStorage('data.fs', blob_dir='blobs')
>>> db = ZODB.DB(fs)
>>> tm1 = transaction.TransactionManager()
>>> conn1 = db.open(tm1)
>>> import ZODB.blob
>>> conn1.root()[1] = ZODB.blob.Blob()
>>> conn1.add(conn1.root()[1])
>>> conn1.root()[1].open('w').write('some data')
>>> tm1.commit()
>>> tm2 = transaction.TransactionManager()
>>> conn2 = db.open(tm2)
>>> f = conn1.root()[1].open()
>>> conn1.root()[2] = ZODB.blob.Blob()
>>> conn1.add(conn1.root()[2])
>>> conn1.root()[2].open('w').write('some more data')
>>> db.pack()
>>> f.read()
'some data'
>>> tm1.commit()
>>> conn2.sync()
>>> conn2.root()[2].open().read()
'some more data'
>>> db.close()
"""
def test_suite():
from zope.testing import doctest
@@ -558,6 +593,13 @@ def test_suite():
suite.addTest(doctest.DocTestSuite(
setUp=zope.testing.setupstack.setUpDirectory,
tearDown=zope.testing.setupstack.tearDown))
suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
'BlobFileStorage',
lambda name, blob_dir:
ZODB.FileStorage.FileStorage('%s.fs' % name, blob_dir=blob_dir),
test_blob_storage_recovery=True,
test_packing=True,
))
return suite
if __name__=='__main__':
......