Commit 964303ab authored by Jim Fulton's avatar Jim Fulton

Added support for wrapper storages that transform pickle data. (I

thought this support already existed, but found out I was wrong when I
tried to make it work. :/) Applications for this include compression
and encryption.  An example wrapper storage implementation,
ZODB.tests.hexstorage, was included for testing.

It is important that storage implementations not assume that
storages contain pickles.  Renamed IStorageDB to IStorageWrapper and
expanded it to provide methods for transforming and untransforming
data records.  Storages implementations should use these methods to
get pickle data from stored records.
parent 6ee0e254
...@@ -778,6 +778,7 @@ class StorageServerDB: ...@@ -778,6 +778,7 @@ class StorageServerDB:
def invalidateCache(self): def invalidateCache(self):
self.server._invalidateCache(self.storage_id) self.server._invalidateCache(self.storage_id)
transform_record_data = untransform_record_data = lambda self, data: data
class StorageServer: class StorageServer:
......
...@@ -35,7 +35,12 @@ class IterationTests: ...@@ -35,7 +35,12 @@ class IterationTests:
def checkIteratorExhaustionStorage(self): def checkIteratorExhaustionStorage(self):
# Test the storage's garbage collection mechanism. # Test the storage's garbage collection mechanism.
self._dostore()
iterator = self._storage.iterator() iterator = self._storage.iterator()
# At this point, a wrapping iterator might not have called the CS
# iterator yet. We'll consume one item to make sure this happens.
iterator.next()
self.assertEquals(1, len(self._storage._iterator_ids)) self.assertEquals(1, len(self._storage._iterator_ids))
iid = list(self._storage._iterator_ids)[0] iid = list(self._storage._iterator_ids)[0]
self.assertEquals([], list(iterator)) self.assertEquals([], list(iterator))
...@@ -47,18 +52,25 @@ class IterationTests: ...@@ -47,18 +52,25 @@ class IterationTests:
def checkIteratorGCSpanTransactions(self): def checkIteratorGCSpanTransactions(self):
# Keep a hard reference to the iterator so it won't be automatically # Keep a hard reference to the iterator so it won't be automatically
# garbage collected at the transaction boundary. # garbage collected at the transaction boundary.
self._dostore()
iterator = self._storage.iterator() iterator = self._storage.iterator()
self._dostore() self._dostore()
# As the iterator was not garbage collected, we can still use it. (We # As the iterator was not garbage collected, we can still use it. (We
# don't see the transaction we just wrote being picked up, because # don't see the transaction we just wrote being picked up, because
# iterators only see the state from the point in time when they were # iterators only see the state from the point in time when they were
# created.) # created.)
self.assertEquals([], list(iterator)) self.assert_(list(iterator))
def checkIteratorGCStorageCommitting(self): def checkIteratorGCStorageCommitting(self):
# We want the iterator to be garbage-collected, so we don't keep any # We want the iterator to be garbage-collected, so we don't keep any
# hard references to it. The storage tracks its ID, though. # hard references to it. The storage tracks its ID, though.
self._storage.iterator()
# The odd little jig we do below arises from the fact that the
# CS iterator may not be constructed right away if the CS is wrapped.
# We need to actually do some iteration to get the iterator created.
# We do a store to make sure the iterator isn't exhausted right away.
self._dostore()
self._storage.iterator().next()
self.assertEquals(1, len(self._storage._iterator_ids)) self.assertEquals(1, len(self._storage._iterator_ids))
iid = list(self._storage._iterator_ids)[0] iid = list(self._storage._iterator_ids)[0]
...@@ -70,8 +82,15 @@ class IterationTests: ...@@ -70,8 +82,15 @@ class IterationTests:
self.assertRaises(KeyError, self._storage._server.iterator_next, iid) self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
def checkIteratorGCStorageTPCAborting(self): def checkIteratorGCStorageTPCAborting(self):
self._storage.iterator() # The odd little jig we do below arises from the fact that the
# CS iterator may not be constructed right away if the CS is wrapped.
# We need to actually do some iteration to get the iterator created.
# We do a store to make sure the iterator isn't exhausted right away.
self._dostore()
self._storage.iterator().next()
iid = list(self._storage._iterator_ids)[0] iid = list(self._storage._iterator_ids)[0]
t = transaction.Transaction() t = transaction.Transaction()
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
self._storage.tpc_abort(t) self._storage.tpc_abort(t)
...@@ -79,7 +98,14 @@ class IterationTests: ...@@ -79,7 +98,14 @@ class IterationTests:
self.assertRaises(KeyError, self._storage._server.iterator_next, iid) self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
def checkIteratorGCStorageDisconnect(self): def checkIteratorGCStorageDisconnect(self):
self._storage.iterator()
# The odd little jig we do below arises from the fact that the
# CS iterator may not be constructed right away if the CS is wrapped.
# We need to actually do some iteration to get the iterator created.
# We do a store to make sure the iterator isn't exhausted right away.
self._dostore()
self._storage.iterator().next()
iid = list(self._storage._iterator_ids)[0] iid = list(self._storage._iterator_ids)[0]
t = transaction.Transaction() t = transaction.Transaction()
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
......
...@@ -47,6 +47,7 @@ import ZEO.tests.ConnectionTests ...@@ -47,6 +47,7 @@ import ZEO.tests.ConnectionTests
import ZEO.zrpc.connection import ZEO.zrpc.connection
import ZODB import ZODB
import ZODB.blob import ZODB.blob
import ZODB.tests.hexstorage
import ZODB.tests.testblob import ZODB.tests.testblob
import ZODB.tests.util import ZODB.tests.util
import ZODB.utils import ZODB.utils
...@@ -59,6 +60,7 @@ class DummyDB: ...@@ -59,6 +60,7 @@ class DummyDB:
pass pass
def invalidateCache(*unused): def invalidateCache(*unused):
pass pass
transform_record_data = untransform_record_data = lambda self, v: v
class CreativeGetState(persistent.Persistent): class CreativeGetState(persistent.Persistent):
...@@ -94,7 +96,8 @@ class MiscZEOTests: ...@@ -94,7 +96,8 @@ class MiscZEOTests:
def checkZEOInvalidation(self): def checkZEOInvalidation(self):
addr = self._storage._addr addr = self._storage._addr
storage2 = ClientStorage(addr, wait=1, min_disconnect_poll=0.1) storage2 = self._wrap_client(
ClientStorage(addr, wait=1, min_disconnect_poll=0.1))
try: try:
oid = self._storage.new_oid() oid = self._storage.new_oid()
ob = MinPO('first') ob = MinPO('first')
...@@ -196,13 +199,16 @@ class GenericTests( ...@@ -196,13 +199,16 @@ class GenericTests(
self.blob_cache_dir = tempfile.mkdtemp( self.blob_cache_dir = tempfile.mkdtemp(
'blob_cache', 'blob_cache',
dir=os.path.abspath(os.getcwd())) dir=os.path.abspath(os.getcwd()))
self._storage = ClientStorage( self._storage = self._wrap_client(ClientStorage(
zport, '1', cache_size=20000000, zport, '1', cache_size=20000000,
min_disconnect_poll=0.5, wait=1, min_disconnect_poll=0.5, wait=1,
wait_timeout=60, blob_dir=self.blob_cache_dir, wait_timeout=60, blob_dir=self.blob_cache_dir,
shared_blob_dir=self.shared_blob_dir) shared_blob_dir=self.shared_blob_dir))
self._storage.registerDB(DummyDB()) self._storage.registerDB(DummyDB())
def _wrap_client(self, client):
return client
def tearDown(self): def tearDown(self):
self._storage.close() self._storage.close()
for server in self._servers: for server in self._servers:
...@@ -317,6 +323,16 @@ class FileStorageTests(FullGenericTests): ...@@ -317,6 +323,16 @@ class FileStorageTests(FullGenericTests):
</filestorage> </filestorage>
""" """
_expected_interfaces = (
('ZODB.interfaces', 'IStorageRestoreable'),
('ZODB.interfaces', 'IStorageIteration'),
('ZODB.interfaces', 'IStorageUndoable'),
('ZODB.interfaces', 'IStorageCurrentRecordIteration'),
('ZODB.interfaces', 'IExternalGC'),
('ZODB.interfaces', 'IStorage'),
('zope.interface', 'Interface'),
)
def checkInterfaceFromRemoteStorage(self): def checkInterfaceFromRemoteStorage(self):
# ClientStorage itself doesn't implement IStorageIteration, but the # ClientStorage itself doesn't implement IStorageIteration, but the
# FileStorage on the other end does, and thus the ClientStorage # FileStorage on the other end does, and thus the ClientStorage
...@@ -326,18 +342,48 @@ class FileStorageTests(FullGenericTests): ...@@ -326,18 +342,48 @@ class FileStorageTests(FullGenericTests):
self.failUnless(ZODB.interfaces.IStorageIteration.providedBy( self.failUnless(ZODB.interfaces.IStorageIteration.providedBy(
self._storage)) self._storage))
# This is communicated using ClientStorage's _info object: # This is communicated using ClientStorage's _info object:
self.assertEquals( self.assertEquals(self._expected_interfaces,
(('ZODB.interfaces', 'IStorageRestoreable'),
('ZODB.interfaces', 'IStorageIteration'),
('ZODB.interfaces', 'IStorageUndoable'),
('ZODB.interfaces', 'IStorageCurrentRecordIteration'),
('ZODB.interfaces', 'IExternalGC'),
('ZODB.interfaces', 'IStorage'),
('zope.interface', 'Interface'),
),
self._storage._info['interfaces'] self._storage._info['interfaces']
) )
class FileStorageHexTests(FileStorageTests):
_expected_interfaces = (
('ZODB.interfaces', 'IStorageRestoreable'),
('ZODB.interfaces', 'IStorageIteration'),
('ZODB.interfaces', 'IStorageUndoable'),
('ZODB.interfaces', 'IStorageCurrentRecordIteration'),
('ZODB.interfaces', 'IExternalGC'),
('ZODB.interfaces', 'IStorage'),
('ZODB.interfaces', 'IStorageWrapper'),
('zope.interface', 'Interface'),
)
def getConfig(self):
return """\
%import ZODB.tests
<hexstorage>
<filestorage 1>
path Data.fs
</filestorage>
</hexstorage>
"""
class FileStorageClientHexTests(FileStorageHexTests):
def getConfig(self):
return """\
%import ZODB.tests
<serverhexstorage>
<filestorage 1>
path Data.fs
</filestorage>
</serverhexstorage>
"""
def _wrap_client(self, client):
return ZODB.tests.hexstorage.HexStorage(client)
class MappingStorageTests(GenericTests): class MappingStorageTests(GenericTests):
"""ZEO backed by a Mapping storage.""" """ZEO backed by a Mapping storage."""
...@@ -1405,7 +1451,8 @@ def quick_close_doesnt_kill_server(): ...@@ -1405,7 +1451,8 @@ def quick_close_doesnt_kill_server():
slow_test_classes = [ slow_test_classes = [
BlobAdaptedFileStorageTests, BlobWritableCacheTests, BlobAdaptedFileStorageTests, BlobWritableCacheTests,
DemoStorageTests, FileStorageTests, MappingStorageTests, MappingStorageTests, DemoStorageTests,
FileStorageTests, FileStorageHexTests, FileStorageClientHexTests,
] ]
quick_test_classes = [ quick_test_classes = [
......
...@@ -53,6 +53,7 @@ def find_global(*args): ...@@ -53,6 +53,7 @@ def find_global(*args):
def state(self, oid, serial, prfactory, p=''): def state(self, oid, serial, prfactory, p=''):
p = p or self.loadSerial(oid, serial) p = p or self.loadSerial(oid, serial)
p = self._crs_untransform_record_data(p)
file = StringIO(p) file = StringIO(p)
unpickler = Unpickler(file) unpickler = Unpickler(file)
unpickler.find_global = find_global unpickler.find_global = find_global
...@@ -80,13 +81,13 @@ class IPersistentReference(zope.interface.Interface): ...@@ -80,13 +81,13 @@ class IPersistentReference(zope.interface.Interface):
def __cmp__(other): def __cmp__(other):
'''if other is equivalent reference, return 0; else raise ValueError. '''if other is equivalent reference, return 0; else raise ValueError.
Equivalent in this case means that oid and database_name are the same. Equivalent in this case means that oid and database_name are the same.
If either is a weak reference, we only support `is` equivalence, and If either is a weak reference, we only support `is` equivalence, and
otherwise raise a ValueError even if the datbase_names and oids are otherwise raise a ValueError even if the datbase_names and oids are
the same, rather than guess at the correct semantics. the same, rather than guess at the correct semantics.
It is impossible to sort reliably, since the actual persistent It is impossible to sort reliably, since the actual persistent
class may have its own comparison, and we have no idea what it is. class may have its own comparison, and we have no idea what it is.
We assert that it is reasonably safe to assume that an object is We assert that it is reasonably safe to assume that an object is
...@@ -135,7 +136,7 @@ class PersistentReference(object): ...@@ -135,7 +136,7 @@ class PersistentReference(object):
def __cmp__(self, other): def __cmp__(self, other):
if self is other or ( if self is other or (
isinstance(other, PersistentReference) and isinstance(other, PersistentReference) and
self.oid == other.oid and self.oid == other.oid and
self.database_name == other.database_name and self.database_name == other.database_name and
not self.weak and not self.weak and
...@@ -179,6 +180,7 @@ def tryToResolveConflict(self, oid, committedSerial, oldSerial, newpickle, ...@@ -179,6 +180,7 @@ def tryToResolveConflict(self, oid, committedSerial, oldSerial, newpickle,
# class_tuple, old, committed, newstate = ('',''), 0, 0, 0 # class_tuple, old, committed, newstate = ('',''), 0, 0, 0
try: try:
prfactory = PersistentReferenceFactory() prfactory = PersistentReferenceFactory()
newpickle = self._crs_untransform_record_data(newpickle)
file = StringIO(newpickle) file = StringIO(newpickle)
unpickler = Unpickler(file) unpickler = Unpickler(file)
unpickler.find_global = find_global unpickler.find_global = find_global
...@@ -194,7 +196,7 @@ def tryToResolveConflict(self, oid, committedSerial, oldSerial, newpickle, ...@@ -194,7 +196,7 @@ def tryToResolveConflict(self, oid, committedSerial, oldSerial, newpickle,
newargs = () newargs = ()
if klass in _unresolvable: if klass in _unresolvable:
return None raise ConflictError
newstate = unpickler.load() newstate = unpickler.load()
inst = klass.__new__(klass, *newargs) inst = klass.__new__(klass, *newargs)
...@@ -203,7 +205,7 @@ def tryToResolveConflict(self, oid, committedSerial, oldSerial, newpickle, ...@@ -203,7 +205,7 @@ def tryToResolveConflict(self, oid, committedSerial, oldSerial, newpickle,
resolve = inst._p_resolveConflict resolve = inst._p_resolveConflict
except AttributeError: except AttributeError:
_unresolvable[klass] = 1 _unresolvable[klass] = 1
return None raise ConflictError
old = state(self, oid, oldSerial, prfactory) old = state(self, oid, oldSerial, prfactory)
committed = state(self, oid, committedSerial, prfactory, committedData) committed = state(self, oid, committedSerial, prfactory, committedData)
...@@ -215,9 +217,9 @@ def tryToResolveConflict(self, oid, committedSerial, oldSerial, newpickle, ...@@ -215,9 +217,9 @@ def tryToResolveConflict(self, oid, committedSerial, oldSerial, newpickle,
pickler.inst_persistent_id = persistent_id pickler.inst_persistent_id = persistent_id
pickler.dump(meta) pickler.dump(meta)
pickler.dump(resolved) pickler.dump(resolved)
return file.getvalue(1) return self._crs_transform_record_data(file.getvalue(1))
except (ConflictError, BadClassName): except (ConflictError, BadClassName):
return None pass
except: except:
# If anything else went wrong, catch it here and avoid passing an # If anything else went wrong, catch it here and avoid passing an
# arbitrary exception back to the client. The error here will mask # arbitrary exception back to the client. The error here will mask
...@@ -225,9 +227,19 @@ def tryToResolveConflict(self, oid, committedSerial, oldSerial, newpickle, ...@@ -225,9 +227,19 @@ def tryToResolveConflict(self, oid, committedSerial, oldSerial, newpickle,
# ConflictError, but not necessarily from other errors. But log # ConflictError, but not necessarily from other errors. But log
# the error so that any problems can be fixed. # the error so that any problems can be fixed.
logger.error("Unexpected error", exc_info=True) logger.error("Unexpected error", exc_info=True)
return None
class ConflictResolvingStorage: raise ConflictError(oid=oid, serials=(committedSerial, oldSerial),
data=newpickle)
class ConflictResolvingStorage(object):
"Mix-in class that provides conflict resolution handling for storages" "Mix-in class that provides conflict resolution handling for storages"
tryToResolveConflict = tryToResolveConflict tryToResolveConflict = tryToResolveConflict
_crs_transform_record_data = _crs_untransform_record_data = (
lambda self, o: o)
def registerDB(self, wrapper):
self._crs_untransform_record_data = wrapper.untransform_record_data
self._crs_transform_record_data = wrapper.transform_record_data
super(ConflictResolvingStorage, self).registerDB(wrapper)
...@@ -707,6 +707,8 @@ class DB(object): ...@@ -707,6 +707,8 @@ class DB(object):
""" """
self._connectionMap(lambda c: c.invalidateCache()) self._connectionMap(lambda c: c.invalidateCache())
transform_record_data = untransform_record_data = lambda self, data: data
def objectCount(self): def objectCount(self):
return len(self.storage) return len(self.storage)
......
...@@ -476,14 +476,8 @@ class FileStorage( ...@@ -476,14 +476,8 @@ class FileStorage(
committed_tid = h.tid committed_tid = h.tid
if oldserial != committed_tid: if oldserial != committed_tid:
rdata = self.tryToResolveConflict(oid, committed_tid, data = self.tryToResolveConflict(oid, committed_tid,
oldserial, data) oldserial, data)
if rdata is None:
raise POSException.ConflictError(
oid=oid, serials=(committed_tid, oldserial),
data=data)
else:
data = rdata
pos = self._pos pos = self._pos
here = pos + self._tfile.tell() + self._thl here = pos + self._tfile.tell() + self._thl
...@@ -843,10 +837,11 @@ class FileStorage( ...@@ -843,10 +837,11 @@ class FileStorage(
except KeyError: except KeyError:
# couldn't find oid; what's the real explanation for this? # couldn't find oid; what's the real explanation for this?
raise UndoError("_loadBack() failed for %s", oid) raise UndoError("_loadBack() failed for %s", oid)
data = self.tryToResolveConflict(oid, ctid, tid, bdata, cdata) try:
data = self.tryToResolveConflict(oid, ctid, tid, bdata, cdata)
if data:
return data, 0, ipos return data, 0, ipos
except POSException.ConflictError:
pass
raise UndoError("Some data were modified by a later transaction", oid) raise UndoError("Some data were modified by a later transaction", oid)
...@@ -961,7 +956,7 @@ class FileStorage( ...@@ -961,7 +956,7 @@ class FileStorage(
except ZODB.POSException.POSKeyError: except ZODB.POSException.POSKeyError:
pass # It was removed, so no need to copy data pass # It was removed, so no need to copy data
else: else:
if ZODB.blob.is_blob_record(up): if self.is_blob_record(up):
# We're undoing a blob modification operation. # We're undoing a blob modification operation.
# We have to copy the blob data # We have to copy the blob data
tmp = ZODB.utils.mktemp(dir=self.fshelper.temp_dir) tmp = ZODB.utils.mktemp(dir=self.fshelper.temp_dir)
...@@ -1768,7 +1763,7 @@ class FileIterator(FileStorageFormatter): ...@@ -1768,7 +1763,7 @@ class FileIterator(FileStorageFormatter):
def next(self): def next(self):
if self._file is None: if self._file is None:
raise ZODB.interfaces.StorageStopIteration() raise StopIteration()
pos = self._pos pos = self._pos
while True: while True:
...@@ -1857,7 +1852,7 @@ class FileIterator(FileStorageFormatter): ...@@ -1857,7 +1852,7 @@ class FileIterator(FileStorageFormatter):
return result return result
self.close() self.close()
raise ZODB.interfaces.StorageStopIteration() raise StopIteration()
class TransactionRecord(BaseStorage.TransactionRecord): class TransactionRecord(BaseStorage.TransactionRecord):
...@@ -1916,7 +1911,7 @@ class TransactionRecordIterator(FileStorageFormatter): ...@@ -1916,7 +1911,7 @@ class TransactionRecordIterator(FileStorageFormatter):
return Record(h.oid, h.tid, data, prev_txn, pos) return Record(h.oid, h.tid, data, prev_txn, pos)
raise ZODB.interfaces.StorageStopIteration() raise StopIteration()
class Record(BaseStorage.DataRecord): class Record(BaseStorage.DataRecord):
......
...@@ -30,7 +30,6 @@ from ZODB.utils import p64, u64, z64 ...@@ -30,7 +30,6 @@ from ZODB.utils import p64, u64, z64
import logging import logging
import os import os
import ZODB.blob
import ZODB.fsIndex import ZODB.fsIndex
import ZODB.POSException import ZODB.POSException
...@@ -502,7 +501,7 @@ class FileStoragePacker(FileStorageFormatter): ...@@ -502,7 +501,7 @@ class FileStoragePacker(FileStorageFormatter):
data = self._file.read(h.plen) data = self._file.read(h.plen)
else: else:
data = self.fetchDataViaBackpointer(h.oid, h.back) data = self.fetchDataViaBackpointer(h.oid, h.back)
if data and ZODB.blob.is_blob_record(data): if data and self._storage.is_blob_record(data):
# We need to remove the blob record. Maybe we # We need to remove the blob record. Maybe we
# need to remove oid: # need to remove oid:
......
...@@ -617,29 +617,24 @@ class BlobStorageMixin(object): ...@@ -617,29 +617,24 @@ class BlobStorageMixin(object):
""" """
self.dirty_oids = [] self.dirty_oids = []
def copyTransactionsFrom(self, other): def registerDB(self, db):
for trans in other.iterator(): self.__untransform_record_data = db.untransform_record_data
self.tpc_begin(trans, trans.tid, trans.status) try:
for record in trans: m = super(BlobStorageMixin, self).registerDB
blobfilename = None except AttributeError:
if is_blob_record(record.data): pass
try: else:
blobfilename = other.loadBlob(record.oid, record.tid) m(db)
except POSKeyError:
pass def __untransform_record_data(self, record):
if blobfilename is not None: return record
fd, name = tempfile.mkstemp(
suffix='.tmp', dir=self.fshelper.temp_dir)
os.close(fd)
utils.cp(open(blobfilename, 'rb'), open(name, 'wb'))
self.restoreBlob(record.oid, record.tid, record.data,
name, record.data_txn, trans)
else:
self.restore(record.oid, record.tid, record.data,
'', record.data_txn, trans)
self.tpc_vote(trans) def is_blob_record(self, record):
self.tpc_finish(trans) if record:
return is_blob_record(self.__untransform_record_data(record))
def copyTransactionsFrom(self, other):
copyTransactionsFromTo(other, self)
def loadBlob(self, oid, serial): def loadBlob(self, oid, serial):
"""Return the filename where the blob file can be found. """Return the filename where the blob file can be found.
...@@ -943,3 +938,27 @@ def is_blob_record(record): ...@@ -943,3 +938,27 @@ def is_blob_record(record):
pass pass
return False return False
def copyTransactionsFromTo(source, destination):
for trans in source.iterator():
destination.tpc_begin(trans, trans.tid, trans.status)
for record in trans:
blobfilename = None
if is_blob_record(record.data):
try:
blobfilename = source.loadBlob(record.oid, record.tid)
except POSKeyError:
pass
if blobfilename is not None:
fd, name = tempfile.mkstemp(
suffix='.tmp', dir=destination.fshelper.temp_dir)
os.close(fd)
utils.cp(open(blobfilename, 'rb'), open(name, 'wb'))
destination.restoreBlob(record.oid, record.tid, record.data,
name, record.data_txn, trans)
else:
destination.restore(record.oid, record.tid, record.data,
'', record.data_txn, trans)
destination.tpc_vote(trans)
destination.tpc_finish(trans)
...@@ -287,23 +287,32 @@ class IConnection(Interface): ...@@ -287,23 +287,32 @@ class IConnection(Interface):
""" """
class IStorageDB(Interface): class IStorageWrapper(Interface):
"""Database interface exposed to storages """Storage wrapper interface
This interface provides 2 facilities: This interface provides 3 facilities:
- Out-of-band invalidation support - Out-of-band invalidation support
A storage can notify it's database of object invalidations that A storage can notify it's wrapper of object invalidations that
don't occur due to direct operations on the storage. Currently don't occur due to direct operations on the storage. Currently
this is only used by ZEO client storages to pass invalidation this is only used by ZEO client storages to pass invalidation
messages sent from a server. messages sent from a server.
- Record-reference extraction. - Record-reference extraction
The references method can be used to extract referenced object The references method can be used to extract referenced object
IDs from a database record. This can be used by storages to IDs from a database record. This can be used by storages to
provide more advanced garbage collection. provide more advanced garbage collection. A wrapper storage
that transforms data will provide a references method that
untransforms data passed to it and then pass the data to the
layer above it.
- Record transformation
A storage wrapper may transform data, for example for
compression or encryption. Methods are provided to transform or
untransform data.
This interface may be implemented by storage adapters or other This interface may be implemented by storage adapters or other
intermediaries. For example, a storage adapter that provides intermediaries. For example, a storage adapter that provides
...@@ -337,6 +346,16 @@ class IStorageDB(Interface): ...@@ -337,6 +346,16 @@ class IStorageDB(Interface):
be created and returned. be created and returned.
""" """
def transform_record_data(data):
"""Return transformed data
"""
def untransform_record_data(data):
"""Return untransformed data
"""
IStorageDB = IStorageWrapper # for backward compatibility
class IDatabase(IStorageDB): class IDatabase(IStorageDB):
"""ZODB DB. """ZODB DB.
...@@ -595,12 +614,18 @@ class IStorage(Interface): ...@@ -595,12 +614,18 @@ class IStorage(Interface):
revisions. revisions.
""" """
def registerDB(db): def registerDB(wrapper):
"""Register an IStorageDB. """Register a storage wrapper IStorageWrapper.
The passed object is a wrapper object that provides an upcall
interface to support composition.
Note that, for historical reasons, an implementation may Note that, for historical reasons, an implementation may
require a second argument, however, if required, the None will require a second argument, however, if required, the None will
be passed as the second argument. be passed as the second argument.
Also, for historical reasons, this is called registerDB rather
than register_wrapper.
""" """
def sortKey(): def sortKey():
......
...@@ -49,7 +49,7 @@ class BasicStorage: ...@@ -49,7 +49,7 @@ class BasicStorage:
self.assertRaises( self.assertRaises(
POSException.StorageTransactionError, POSException.StorageTransactionError,
self._storage.store, self._storage.store,
ZERO, 1, 2, '', transaction.Transaction()) ZERO, 1, '2', '', transaction.Transaction())
self._storage.tpc_abort(t) self._storage.tpc_abort(t)
def checkSerialIsNoneForInitialRevision(self): def checkSerialIsNoneForInitialRevision(self):
......
<component>
<sectiontype
name="hexstorage"
datatype="ZODB.tests.hexstorage.ZConfigHex"
implements="ZODB.storage"
>
<section type="ZODB.storage" name="*" attribute="base" required="yes" />
</sectiontype>
<sectiontype
name="serverhexstorage"
datatype="ZODB.tests.hexstorage.ZConfigServerHex"
implements="ZODB.storage"
>
<section type="ZODB.storage" name="*" attribute="base" required="yes" />
</sectiontype>
</component>
##############################################################################
#
# Copyright (c) 2010 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import ZODB.blob
import ZODB.interfaces
import zope.interface
class HexStorage(object):
zope.interface.implements(ZODB.interfaces.IStorageWrapper)
copied_methods = (
'close', 'getName', 'getSize', 'history', 'isReadOnly',
'lastTransaction', 'new_oid', 'sortKey',
'tpc_abort', 'tpc_begin', 'tpc_finish', 'tpc_vote',
'loadBlob', 'openCommittedBlobFile', 'temporaryDirectory',
'supportsUndo', 'undo', 'undoLog', 'undoInfo',
)
def __init__(self, base):
self.base = base
base.registerDB(self)
for name in self.copied_methods:
v = getattr(base, name, None)
if v is not None:
setattr(self, name, v)
zope.interface.directlyProvides(self, zope.interface.providedBy(base))
def __getattr__(self, name):
return getattr(self.base, name)
def __len__(self):
return len(self.base)
def load(self, oid, version=''):
data, serial = self.base.load(oid, version)
return data[2:].decode('hex'), serial
def loadBefore(self, oid, tid):
r = self.base.loadBefore(oid, tid)
if r is not None:
data, serial, after = r
return data[2:].decode('hex'), serial, after
else:
return r
def loadSerial(self, oid, serial):
return self.base.loadSerial(oid, serial)[2:].decode('hex')
def pack(self, pack_time, referencesf, gc=True):
def refs(p, oids=None):
return referencesf(p[2:].decode('hex'), oids)
return self.base.pack(pack_time, refs, gc)
def registerDB(self, db):
self.db = db
self._db_transform = db.transform_record_data
self._db_untransform = db.untransform_record_data
_db_transform = _db_untransform = lambda self, data: data
def store(self, oid, serial, data, version, transaction):
return self.base.store(
oid, serial, '.h'+data.encode('hex'), version, transaction)
def restore(self, oid, serial, data, version, prev_txn, transaction):
return self.base.restore(
oid, serial, data and ('.h'+data.encode('hex')), version, prev_txn,
transaction)
def iterator(self, start=None, stop=None):
for t in self.base.iterator(start, stop):
yield Transaction(self, t)
def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
return self.base.storeBlob(oid, oldserial, '.h'+data.encode('hex'),
blobfilename, version, transaction)
def restoreBlob(self, oid, serial, data, blobfilename, prev_txn,
transaction):
return self.base.restoreBlob(oid, serial,
data and ('.h'+data.encode('hex')),
blobfilename, prev_txn, transaction)
def invalidateCache(self):
return self.db.invalidateCache()
def invalidate(self, transaction_id, oids, version=''):
return self.db.invalidate(transaction_id, oids, version)
def references(self, record, oids=None):
return self.db.references(record[2:].decode('hex'), oids)
def transform_record_data(self, data):
return '.h'+self._db_transform(data).encode('hex')
def untransform_record_data(self, data):
return self._db_untransform(data[2:].decode('hex'))
def record_iternext(self, next=None):
oid, tid, data, next = self.base.record_iternext(next)
return oid, tid, data[2:].decode('hex'), next
def copyTransactionsFrom(self, other):
ZODB.blob.copyTransactionsFromTo(other, self)
class ServerHexStorage(HexStorage):
"""Use on ZEO storage server when Hex is used on client
Don't do conversion as part of load/store, but provide
pickle decoding.
"""
copied_methods = HexStorage.copied_methods + (
'load', 'loadBefore', 'loadSerial', 'store', 'restore',
'iterator', 'storeBlob', 'restoreBlob', 'record_iternext',
)
class Transaction(object):
def __init__(self, store, trans):
self.__store = store
self.__trans = trans
def __iter__(self):
for r in self.__trans:
if r.data:
r.data = self.__store.untransform_record_data(r.data)
yield r
def __getattr__(self, name):
return getattr(self.__trans, name)
class ZConfigHex:
_factory = HexStorage
def __init__(self, config):
self.config = config
self.name = config.getSectionName()
def open(self):
base = self.config.base.open()
return self._factory(base)
class ZConfigServerHex(ZConfigHex):
_factory = ServerHexStorage
...@@ -16,6 +16,7 @@ import random ...@@ -16,6 +16,7 @@ import random
import transaction import transaction
from ZODB.DB import DB from ZODB.DB import DB
from zope.testing import doctest from zope.testing import doctest
import ZODB.tests.hexstorage
import ZODB.tests.util import ZODB.tests.util
import ZODB.utils import ZODB.utils
import ZODB.DemoStorage import ZODB.DemoStorage
...@@ -78,6 +79,13 @@ class DemoStorageTests( ...@@ -78,6 +79,13 @@ class DemoStorageTests(
checkUndoZombie = checkLoadBeforeUndo checkUndoZombie = checkLoadBeforeUndo
class DemoStorageHexTests(DemoStorageTests):
def setUp(self):
StorageTestBase.StorageTestBase.setUp(self)
self._storage = ZODB.tests.hexstorage.HexStorage(
ZODB.DemoStorage.DemoStorage())
class DemoStorageWrappedBase(DemoStorageTests): class DemoStorageWrappedBase(DemoStorageTests):
def setUp(self): def setUp(self):
...@@ -111,6 +119,11 @@ class DemoStorageWrappedAroundFileStorage(DemoStorageWrappedBase): ...@@ -111,6 +119,11 @@ class DemoStorageWrappedAroundFileStorage(DemoStorageWrappedBase):
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
return FileStorage('FileStorageTests.fs') return FileStorage('FileStorageTests.fs')
class DemoStorageWrappedAroundHexMappingStorage(DemoStorageWrappedBase):
def _makeBaseStorage(self):
from ZODB.MappingStorage import MappingStorage
return ZODB.tests.hexstorage.HexStorage(MappingStorage())
def setUp(test): def setUp(test):
...@@ -232,8 +245,11 @@ def test_suite(): ...@@ -232,8 +245,11 @@ def test_suite():
), ),
)) ))
suite.addTest(unittest.makeSuite(DemoStorageTests, 'check')) suite.addTest(unittest.makeSuite(DemoStorageTests, 'check'))
suite.addTest(unittest.makeSuite(DemoStorageHexTests, 'check'))
suite.addTest(unittest.makeSuite(DemoStorageWrappedAroundFileStorage, suite.addTest(unittest.makeSuite(DemoStorageWrappedAroundFileStorage,
'check')) 'check'))
suite.addTest(unittest.makeSuite(DemoStorageWrappedAroundMappingStorage, suite.addTest(unittest.makeSuite(DemoStorageWrappedAroundMappingStorage,
'check')) 'check'))
suite.addTest(unittest.makeSuite(DemoStorageWrappedAroundHexMappingStorage,
'check'))
return suite return suite
...@@ -15,6 +15,7 @@ import cPickle ...@@ -15,6 +15,7 @@ import cPickle
import os, unittest import os, unittest
import transaction import transaction
import ZODB.FileStorage import ZODB.FileStorage
import ZODB.tests.hexstorage
import ZODB.tests.testblob import ZODB.tests.testblob
import ZODB.tests.util import ZODB.tests.util
import zope.testing.setupstack import zope.testing.setupstack
...@@ -279,13 +280,29 @@ class FileStorageTests( ...@@ -279,13 +280,29 @@ class FileStorageTests(
else: else:
self.assertNotEqual(next_oid, None) self.assertNotEqual(next_oid, None)
class FileStorageHexTests(FileStorageTests):
def open(self, **kwargs):
self._storage = ZODB.tests.hexstorage.HexStorage(
ZODB.FileStorage.FileStorage('FileStorageTests.fs',**kwargs))
class FileStorageTestsWithBlobsEnabled(FileStorageTests): class FileStorageTestsWithBlobsEnabled(FileStorageTests):
def open(self, **kwargs): def open(self, **kwargs):
if 'blob_dir' not in kwargs: if 'blob_dir' not in kwargs:
kwargs = kwargs.copy() kwargs = kwargs.copy()
kwargs['blob_dir'] = 'blobs' kwargs['blob_dir'] = 'blobs'
return FileStorageTests.open(self, **kwargs) FileStorageTests.open(self, **kwargs)
class FileStorageHexTestsWithBlobsEnabled(FileStorageTests):
def open(self, **kwargs):
if 'blob_dir' not in kwargs:
kwargs = kwargs.copy()
kwargs['blob_dir'] = 'blobs'
FileStorageTests.open(self, **kwargs)
self._storage = ZODB.tests.hexstorage.HexStorage(self._storage)
class FileStorageRecoveryTest( class FileStorageRecoveryTest(
StorageTestBase.StorageTestBase, StorageTestBase.StorageTestBase,
...@@ -304,6 +321,15 @@ class FileStorageRecoveryTest( ...@@ -304,6 +321,15 @@ class FileStorageRecoveryTest(
def new_dest(self): def new_dest(self):
return ZODB.FileStorage.FileStorage('Dest.fs') return ZODB.FileStorage.FileStorage('Dest.fs')
class FileStorageHexRecoveryTest(FileStorageRecoveryTest):
def setUp(self):
StorageTestBase.StorageTestBase.setUp(self)
self._storage = ZODB.tests.hexstorage.HexStorage(
ZODB.FileStorage.FileStorage("Source.fs", create=True))
self._dst = ZODB.tests.hexstorage.HexStorage(
ZODB.FileStorage.FileStorage("Dest.fs", create=True))
class FileStorageNoRestore(ZODB.FileStorage.FileStorage): class FileStorageNoRestore(ZODB.FileStorage.FileStorage):
...@@ -634,10 +660,14 @@ def test_suite(): ...@@ -634,10 +660,14 @@ def test_suite():
from zope.testing import doctest from zope.testing import doctest
suite = unittest.TestSuite() suite = unittest.TestSuite()
for klass in [FileStorageTests, Corruption.FileStorageCorruptTests, for klass in [
FileStorageRecoveryTest, FileStorageNoRestoreRecoveryTest, FileStorageTests, FileStorageHexTests,
FileStorageTestsWithBlobsEnabled, AnalyzeDotPyTest, Corruption.FileStorageCorruptTests,
]: FileStorageRecoveryTest, FileStorageHexRecoveryTest,
FileStorageNoRestoreRecoveryTest,
FileStorageTestsWithBlobsEnabled, FileStorageHexTestsWithBlobsEnabled,
AnalyzeDotPyTest,
]:
suite.addTest(unittest.makeSuite(klass, "check")) suite.addTest(unittest.makeSuite(klass, "check"))
suite.addTest(doctest.DocTestSuite( suite.addTest(doctest.DocTestSuite(
setUp=zope.testing.setupstack.setUpDirectory, setUp=zope.testing.setupstack.setUpDirectory,
...@@ -649,9 +679,18 @@ def test_suite(): ...@@ -649,9 +679,18 @@ def test_suite():
test_blob_storage_recovery=True, test_blob_storage_recovery=True,
test_packing=True, test_packing=True,
)) ))
suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
'BlobFileHexStorage',
lambda name, blob_dir:
ZODB.tests.hexstorage.HexStorage(
ZODB.FileStorage.FileStorage('%s.fs' % name, blob_dir=blob_dir)),
test_blob_storage_recovery=True,
test_packing=True,
))
suite.addTest(PackableStorage.IExternalGC_suite( suite.addTest(PackableStorage.IExternalGC_suite(
lambda : ZODB.FileStorage.FileStorage( lambda : ZODB.FileStorage.FileStorage(
'data.fs', blob_dir='blobs', pack_gc=False))) 'data.fs', blob_dir='blobs', pack_gc=False)))
suite.layer = ZODB.tests.util.MininalTestLayer('testFileStorage')
return suite return suite
if __name__=='__main__': if __name__=='__main__':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment