Commit de181fc4 authored by Jim Fulton's avatar Jim Fulton

Added support for copying and recovery of blob storages:

- Added a helper function, ZODB.blob.is_blob_record for testing whether
  a data record is for a blob.  This can be used when iterating over a
  storage to detect blob records so that blob data can be copied.

In the future, we may want to build this into a blob-aware
  iteration interface, so that records get blob file attributes
  automatically.

- Added the IBlobStorageRestoreable interface for blob storages
  that support recovery via a restoreBlob method.

- Updated ZODB.blob.BlobStorage to implement
  IBlobStorageRestoreable and to have a copyTransactionsFrom method
  that also copies blob data.

Also removed the version argument from the history method.
parent a9d1c2d2
...@@ -8,6 +8,23 @@ ...@@ -8,6 +8,23 @@
New Features New Features
------------ ------------
- Added support for copying and recovery of blob storages:
- Added a helper function, ZODB.blob.is_blob_record for testing whether
a data record is for a blob. This can be used when iterating over a
storage to detect blob records so that blob data can be copied.
In the future, we may want to build this into a blob-aware
iteration interface, so that records get blob file attributes
automatically.
- Added the IBlobStorageRestoreable interface for blob storages
that support recovery via a restoreBlob method.
- Updated ZODB.blob.BlobStorage to implement
IBlobStorageRestoreable and to have a copyTransactionsFrom method
that also copies blob data.
- New `ClientStorage` configuration option `drop_cache_rather_verify`. - New `ClientStorage` configuration option `drop_cache_rather_verify`.
If this option is true then the ZEO client cache is dropped instead of If this option is true then the ZEO client cache is dropped instead of
the long (unoptimized) verification. For large caches, setting this the long (unoptimized) verification. For large caches, setting this
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
"""Blobs """Blobs
""" """
import cPickle
import base64 import base64
import logging import logging
import os import os
...@@ -443,6 +444,10 @@ class BlobStorage(SpecificationDecoratorBase): ...@@ -443,6 +444,10 @@ class BlobStorage(SpecificationDecoratorBase):
self.__supportsUndo = supportsUndo self.__supportsUndo = supportsUndo
self._blobs_pack_is_in_progress = False self._blobs_pack_is_in_progress = False
if ZODB.interfaces.IStorageRestoreable.providedBy(storage):
zope.interface.alsoProvides(self,
ZODB.interfaces.IBlobStorageRestoreable)
@non_overridable @non_overridable
def temporaryDirectory(self): def temporaryDirectory(self):
return self.fshelper.temp_dir return self.fshelper.temp_dir
...@@ -452,14 +457,9 @@ class BlobStorage(SpecificationDecoratorBase): ...@@ -452,14 +457,9 @@ class BlobStorage(SpecificationDecoratorBase):
normal_storage = getProxiedObject(self) normal_storage = getProxiedObject(self)
return '<BlobStorage proxy for %r at %s>' % (normal_storage, return '<BlobStorage proxy for %r at %s>' % (normal_storage,
hex(id(self))) hex(id(self)))
@non_overridable
def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
"""Stores data that has a BLOB attached."""
serial = self.store(oid, oldserial, data, version, transaction)
assert isinstance(serial, str) # XXX in theory serials could be
# something else
@non_overridable
def _storeblob(self, oid, serial, blobfilename):
self._lock_acquire() self._lock_acquire()
try: try:
targetpath = self.fshelper.getPathForOID(oid) targetpath = self.fshelper.getPathForOID(oid)
...@@ -474,8 +474,52 @@ class BlobStorage(SpecificationDecoratorBase): ...@@ -474,8 +474,52 @@ class BlobStorage(SpecificationDecoratorBase):
self.dirty_oids.append((oid, serial)) self.dirty_oids.append((oid, serial))
finally: finally:
self._lock_release() self._lock_release()
@non_overridable
def storeBlob(self, oid, oldserial, data, blobfilename, version,
              transaction):
    """Store a data record that has an associated blob file.

    The pickle data is handed to the proxied storage's store(); the
    blob file is then linked into place via _storeblob().  Returns
    the tid of the current transaction.
    """
    assert not version, "Versions aren't supported."
    new_serial = self.store(oid, oldserial, data, '', transaction)
    self._storeblob(oid, new_serial, blobfilename)
    return self._tid
@non_overridable
def restoreBlob(self, oid, serial, data, blobfilename, prev_txn,
                transaction):
    """Write blob data already committed in a separate database.

    Restores the pickle record with the given serial via restore()
    and links the blob file into place.  Returns the current tid.
    """
    self.restore(oid, serial, data, '', prev_txn, transaction)
    self._storeblob(oid, serial, blobfilename)
    return self._tid
@non_overridable
def copyTransactionsFrom(self, other):
    """Copy all transactions — including blob data — from another storage.

    Iterates over ``other``'s transactions and replays each record via
    restore()/restoreBlob() so that tids and serials are preserved.
    A record's blob file (if any) is first copied into our temporary
    directory so restoreBlob() can move it into place.
    """
    for trans in other.iterator():
        self.tpc_begin(trans, trans.tid, trans.status)
        for record in trans:
            blobfilename = None
            if is_blob_record(record.data):
                try:
                    blobfilename = other.loadBlob(record.oid, record.tid)
                except POSKeyError:
                    # A blob record without blob data (e.g. an undone
                    # creation): fall through and restore the pickle only.
                    pass
            if blobfilename is not None:
                fd, name = tempfile.mkstemp(
                    suffix='.tmp', dir=self.fshelper.temp_dir)
                os.close(fd)
                # Copy in binary mode and close both handles promptly:
                # text mode would corrupt blob data on Windows, and
                # leaving the files open leaks descriptors until GC.
                src = open(blobfilename, 'rb')
                try:
                    dst = open(name, 'wb')
                    try:
                        utils.cp(src, dst)
                    finally:
                        dst.close()
                finally:
                    src.close()
                self.restoreBlob(record.oid, record.tid, record.data,
                                 name, record.data_txn, trans)
            else:
                self.restore(record.oid, record.tid, record.data,
                             '', record.data_txn, trans)
        self.tpc_vote(trans)
        self.tpc_finish(trans)
@non_overridable @non_overridable
def tpc_finish(self, *arg, **kw): def tpc_finish(self, *arg, **kw):
# We need to override the base storage's tpc_finish instead of # We need to override the base storage's tpc_finish instead of
...@@ -692,3 +736,13 @@ if sys.platform == 'win32': ...@@ -692,3 +736,13 @@ if sys.platform == 'win32':
else: else:
remove_committed = os.remove remove_committed = os.remove
remove_committed_dir = shutil.rmtree remove_committed_dir = shutil.rmtree
def is_blob_record(record):
    """Check whether a database record is a blob record.

    This is primarily intended to be used when copying data from one
    storage to another.  Returns False rather than raising when the
    record is empty or cannot be unpickled (e.g. its class is no
    longer importable) — such records certainly aren't blob records.
    """
    if record:
        try:
            return cPickle.loads(record) is ZODB.blob.Blob
        except Exception:
            # Unpicklable class record (missing/renamed class, or data
            # that isn't a class pickle at all): not a blob record.
            pass
    return False
...@@ -446,7 +446,7 @@ class IStorage(Interface): ...@@ -446,7 +446,7 @@ class IStorage(Interface):
This is used solely for informational purposes. This is used solely for informational purposes.
""" """
def history(oid, version='', size=1): def history(oid, size=1):
"""Return a sequence of history information dictionaries. """Return a sequence of history information dictionaries.
Up to size objects (including no objects) may be returned. Up to size objects (including no objects) may be returned.
...@@ -1009,6 +1009,14 @@ class IBlobStorage(Interface): ...@@ -1009,6 +1009,14 @@ class IBlobStorage(Interface):
If Blobs use this, then commits can be performed with a simple rename. If Blobs use this, then commits can be performed with a simple rename.
""" """
class IBlobStorageRestoreable(IBlobStorage, IStorageRestoreable):
    """Blob storages that support recovery via a restoreBlob method."""

    # Named restoreBlob (not storeBlob): IBlobStorage already declares
    # storeBlob with a different signature, and BlobStorage implements
    # this recovery method as restoreBlob.
    def restoreBlob(oid, serial, data, blobfilename, prev_txn, transaction):
        """Write blob data already committed in a separate database.

        See the restore and storeBlob methods.
        """
class BlobError(Exception): class BlobError(Exception):
pass pass
......
...@@ -25,7 +25,7 @@ from ZODB.utils import U64, p64 ...@@ -25,7 +25,7 @@ from ZODB.utils import U64, p64
from transaction import Transaction from transaction import Transaction
import itertools import itertools
import ZODB.blob
class IteratorCompare: class IteratorCompare:
...@@ -214,6 +214,18 @@ class IteratorDeepCompare: ...@@ -214,6 +214,18 @@ class IteratorDeepCompare:
eq(rec1.oid, rec2.oid) eq(rec1.oid, rec2.oid)
eq(rec1.tid, rec2.tid) eq(rec1.tid, rec2.tid)
eq(rec1.data, rec2.data) eq(rec1.data, rec2.data)
if ZODB.blob.is_blob_record(rec1.data):
try:
fn1 = storage1.loadBlob(rec1.oid, rec1.tid)
except ZODB.POSException.POSKeyError:
self.assertRaises(
ZODB.POSException.POSKeyError,
storage2.loadBlob, rec1.oid, rec1.tid)
else:
fn2 = storage2.loadBlob(rec1.oid, rec1.tid)
self.assert_(fn1 != fn2)
eq(open(fn1).read(), open(fn2).read())
# Make sure there are no more records left in rec1 and rec2, # Make sure there are no more records left in rec1 and rec2,
# meaning they were the same length. # meaning they were the same length.
# Additionally, check that we're backwards compatible to the # Additionally, check that we're backwards compatible to the
......
...@@ -12,10 +12,15 @@ ...@@ -12,10 +12,15 @@
# #
############################################################################## ##############################################################################
import base64, os, re, shutil, stat, sys, tempfile, unittest import base64, os, re, shutil, stat, sys, tempfile, unittest, random, struct
import time import time
import ZODB.tests.IteratorStorage
from zope.testing import doctest, renormalizing from zope.testing import doctest, renormalizing
import zope.testing.setupstack
import ZODB.tests.util import ZODB.tests.util
import ZODB.interfaces
from StringIO import StringIO from StringIO import StringIO
from pickle import Pickler from pickle import Pickler
...@@ -275,6 +280,49 @@ class BlobUndoTests(BlobTests): ...@@ -275,6 +280,49 @@ class BlobUndoTests(BlobTests):
database.close() database.close()
class RecoveryBlobStorage(unittest.TestCase,
                          ZODB.tests.IteratorStorage.IteratorDeepCompare):
    # End-to-end test that copyTransactionsFrom() faithfully copies a
    # blob storage — records and blob files — to a destination storage.

    def setUp(self):
        # setupstack creates (and later removes) a scratch directory
        # holding both FileStorages and their blob directories.
        self.globs = {}
        zope.testing.setupstack.setUpDirectory(self)
        self._storage = BlobStorage(
            'src_blobs', ZODB.FileStorage.FileStorage("Source.fs", create=True))
        self._dst = BlobStorage(
            'dest_blobs', ZODB.FileStorage.FileStorage("Dest.fs", create=True))

    def tearDown(self):
        self._storage.close()
        self._dst.close()
        zope.testing.setupstack.tearDown(self)

    # Requires a setUp() that creates a self._dst destination storage
    def testSimpleBlobRecovery(self):
        # The source must advertise restore support for blobs.
        self.assert_(
            ZODB.interfaces.IBlobStorageRestoreable.providedBy(self._storage)
            )
        db = DB(self._storage)
        conn = db.open()
        # Commit a mix of transactions: an empty blob, a small blob, a
        # large randomly-sized blob, and an overwrite of an earlier one.
        conn.root()[1] = ZODB.blob.Blob()
        transaction.commit()
        conn.root()[2] = ZODB.blob.Blob()
        conn.root()[2].open('w').write('some data')
        transaction.commit()
        conn.root()[3] = ZODB.blob.Blob()
        # Random payload of ~40-80KB, truncated by 1-4 bytes so its
        # length isn't a multiple of the 4-byte words it's built from.
        conn.root()[3].open('w').write(
            (''.join(struct.pack(">I", random.randint(0, (1<<32)-1))
                     for i in range(random.randint(10000,20000)))
             )[:-random.randint(1,4)]
            )
        transaction.commit()
        conn.root()[2] = ZODB.blob.Blob()
        conn.root()[2].open('w').write('some other data')
        transaction.commit()
        # Copy everything across, then deep-compare the two storages
        # (IteratorDeepCompare.compare also checks blob contents).
        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)
def gc_blob_removes_uncommitted_data(): def gc_blob_removes_uncommitted_data():
""" """
>>> from ZODB.blob import Blob >>> from ZODB.blob import Blob
...@@ -540,6 +588,22 @@ def loadblob_tmpstore(): ...@@ -540,6 +588,22 @@ def loadblob_tmpstore():
>>> os.unlink(storagefile+".tmp") >>> os.unlink(storagefile+".tmp")
""" """
def is_blob_record():
    """Demonstrate ZODB.blob.is_blob_record on real data records.

    A freshly committed (non-blob) root object's record is not a blob
    record, while a committed Blob's record is:

    >>> fs = FileStorage('Data.fs')
    >>> bs = ZODB.blob.BlobStorage('blobs', fs)
    >>> db = DB(bs)
    >>> conn = db.open()
    >>> conn.root()['blob'] = ZODB.blob.Blob()
    >>> transaction.commit()
    >>> ZODB.blob.is_blob_record(fs.load(ZODB.utils.p64(0), '')[0])
    False
    >>> ZODB.blob.is_blob_record(fs.load(ZODB.utils.p64(1), '')[0])
    True
    >>> db.close()
    """
def setUp(test): def setUp(test):
ZODB.tests.util.setUp(test) ZODB.tests.util.setUp(test)
def rmtree(path): def rmtree(path):
...@@ -575,6 +639,7 @@ def test_suite(): ...@@ -575,6 +639,7 @@ def test_suite():
)) ))
suite.addTest(unittest.makeSuite(BlobCloneTests)) suite.addTest(unittest.makeSuite(BlobCloneTests))
suite.addTest(unittest.makeSuite(BlobUndoTests)) suite.addTest(unittest.makeSuite(BlobUndoTests))
suite.addTest(unittest.makeSuite(RecoveryBlobStorage))
return suite return suite
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment