Commit 495e1551 authored by Jim Fulton's avatar Jim Fulton

Cleaned up the Z309 ZEO protocol, removing versions from arguments and

return values.  This in turn simplified the client and server
software.

Added code to select different client and server stubs and input
handlers depening on whether the Z309 or earlier protocols are used.

ZODB 3.8 clients can now talk to ZODB 3.9 servers and the other way
around.
parent 8e35276f
...@@ -109,7 +109,7 @@ class ClientStorage(object): ...@@ -109,7 +109,7 @@ class ClientStorage(object):
TransactionBufferClass = TransactionBuffer TransactionBufferClass = TransactionBuffer
ClientCacheClass = ClientCache ClientCacheClass = ClientCache
ConnectionManagerClass = ConnectionManager ConnectionManagerClass = ConnectionManager
StorageServerStubClass = ServerStub.StorageServer StorageServerStubClass = ServerStub.stub
def __init__(self, addr, storage='1', cache_size=20 * MB, def __init__(self, addr, storage='1', cache_size=20 * MB,
name='', client=None, debug=0, var=None, name='', client=None, debug=0, var=None,
...@@ -566,7 +566,8 @@ class ClientStorage(object): ...@@ -566,7 +566,8 @@ class ClientStorage(object):
ZODB.interfaces.IStorageCurrentRecordIteration, ZODB.interfaces.IStorageCurrentRecordIteration,
ZODB.interfaces.IBlobStorage, ZODB.interfaces.IBlobStorage,
): ):
if (iface.__module__, iface.__name__) in self._info.get('interfaces', ()): if (iface.__module__, iface.__name__) in self._info.get(
'interfaces', ()):
zope.interface.alsoProvides(self, iface) zope.interface.alsoProvides(self, iface)
def _handle_extensions(self): def _handle_extensions(self):
...@@ -1208,8 +1209,13 @@ class ClientStorage(object): ...@@ -1208,8 +1209,13 @@ class ClientStorage(object):
self._pickler = cPickle.Pickler(self._tfile, 1) self._pickler = cPickle.Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo self._pickler.fast = 1 # Don't use the memo
if self._connection.peer_protocol_version < 'Z309':
client = ClientStorage308Adapter(self)
else:
client = self
# allow incoming invalidations: # allow incoming invalidations:
self._connection.register_object(self) self._connection.register_object(client)
# If verify_cache() finishes the cache verification process, # If verify_cache() finishes the cache verification process,
# it should set self._server. If it goes through full cache # it should set self._server. If it goes through full cache
...@@ -1279,8 +1285,8 @@ class ClientStorage(object): ...@@ -1279,8 +1285,8 @@ class ClientStorage(object):
server.endZeoVerify() server.endZeoVerify()
return "full verification" return "full verification"
def invalidateVerify(self, args): def invalidateVerify(self, oid):
"""Server callback to invalidate an (oid, '') pair. """Server callback to invalidate an oid pair.
This is called as part of cache validation. This is called as part of cache validation.
""" """
...@@ -1290,7 +1296,7 @@ class ClientStorage(object): ...@@ -1290,7 +1296,7 @@ class ClientStorage(object):
# This should never happen. # This should never happen.
logger.error("%s invalidateVerify with no _pickler", self.__name__) logger.error("%s invalidateVerify with no _pickler", self.__name__)
return return
self._pickler.dump((None, [args[0]])) self._pickler.dump((None, [oid]))
def endVerify(self): def endVerify(self):
"""Server callback to signal end of cache validation.""" """Server callback to signal end of cache validation."""
...@@ -1304,10 +1310,7 @@ class ClientStorage(object): ...@@ -1304,10 +1310,7 @@ class ClientStorage(object):
try: try:
if catch_up: if catch_up:
# process catch-up invalidations # process catch-up invalidations
tid, invalidations = catch_up self._process_invalidations(*catch_up)
self._process_invalidations(
tid, (arg[0] for arg in invalidations)
)
if self._pickler is None: if self._pickler is None:
return return
...@@ -1337,7 +1340,7 @@ class ClientStorage(object): ...@@ -1337,7 +1340,7 @@ class ClientStorage(object):
self._pending_server = None self._pending_server = None
def invalidateTransaction(self, tid, args): def invalidateTransaction(self, tid, oids):
"""Server callback: Invalidate objects modified by tid.""" """Server callback: Invalidate objects modified by tid."""
self._lock.acquire() self._lock.acquire()
try: try:
...@@ -1345,14 +1348,13 @@ class ClientStorage(object): ...@@ -1345,14 +1348,13 @@ class ClientStorage(object):
logger.debug( logger.debug(
"%s Transactional invalidation during cache verification", "%s Transactional invalidation during cache verification",
self.__name__) self.__name__)
self._pickler.dump((tid, [arg[0] for arg in args])) self._pickler.dump((tid, oids))
else: else:
self._process_invalidations(tid, (arg[0] for arg in args)) self._process_invalidations(tid, oids)
finally: finally:
self._lock.release() self._lock.release()
def _process_invalidations(self, tid, oids): def _process_invalidations(self, tid, oids):
oids = list(oids)
for oid in oids: for oid in oids:
if oid == self._load_oid: if oid == self._load_oid:
self._load_status = 0 self._load_status = 0
...@@ -1363,8 +1365,8 @@ class ClientStorage(object): ...@@ -1363,8 +1365,8 @@ class ClientStorage(object):
# The following are for compatibility with protocol version 2.0.0 # The following are for compatibility with protocol version 2.0.0
def invalidateTrans(self, args): def invalidateTrans(self, oids):
return self.invalidateTransaction(None, args) return self.invalidateTransaction(None, oids)
invalidate = invalidateVerify invalidate = invalidateVerify
end = endVerify end = endVerify
...@@ -1482,3 +1484,17 @@ class RecordIterator(object): ...@@ -1482,3 +1484,17 @@ class RecordIterator(object):
self._completed = True self._completed = True
raise ZODB.interfaces.StorageStopIteration() raise ZODB.interfaces.StorageStopIteration()
return ZODB.BaseStorage.DataRecord(*item) return ZODB.BaseStorage.DataRecord(*item)
class ClientStorage308Adapter:
def __init__(self, client):
self.client = client
def invalidateTransaction(self, tid, args):
self.client.invalidateTransaction(tid, [arg[0] for arg in args])
def invalidateVerify(self, arg):
self.client.invalidateVerify(arg[0])
def __getattr__(self, name):
return getattr(self.client, name)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""RPC stubs for interface exported by ClientStorage."""
class ClientStorage:
"""An RPC stub class for the interface exported by ClientStorage.
This is the interface presented by ClientStorage to the
StorageServer; i.e. the StorageServer calls these methods and they
are executed in the ClientStorage.
See the ClientStorage class for documentation on these methods.
It is currently important that all methods here are asynchronous
(meaning they don't have a return value and the caller doesn't
wait for them to complete), *and* that none of them cause any
calls from the client to the storage. This is due to limitations
in the zrpc subpackage.
The on-the-wire names of some of the methods don't match the
Python method names. That's because the on-the-wire protocol was
fixed for ZEO 2 and we don't want to change it. There are some
aliases in ClientStorage.py to make up for this.
"""
def __init__(self, rpc):
"""Constructor.
The argument is a connection: an instance of the
zrpc.connection.Connection class.
"""
self.rpc = rpc
def beginVerify(self):
self.rpc.callAsync('beginVerify')
def invalidateVerify(self, args):
self.rpc.callAsync('invalidateVerify', args)
def endVerify(self):
self.rpc.callAsync('endVerify')
def invalidateTransaction(self, tid, args):
self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg)
def info(self, arg):
self.rpc.callAsync('info', arg)
def storeBlob(self, oid, serial, blobfilename):
def store():
yield ('receiveBlobStart', (oid, serial))
f = open(blobfilename, 'rb')
while 1:
chunk = f.read(59000)
if not chunk:
break
yield ('receiveBlobChunk', (oid, serial, chunk, ))
f.close()
yield ('receiveBlobStop', (oid, serial))
self.rpc.callAsyncIterator(store())
...@@ -48,15 +48,6 @@ class StorageServer: ...@@ -48,15 +48,6 @@ class StorageServer:
""" """
self.rpc = rpc self.rpc = rpc
# Wait until we know what version the other side is using.
while rpc.peer_protocol_version is None:
time.sleep(0.1)
if rpc.peer_protocol_version == 'Z200':
self.lastTransaction = lambda: None
self.getInvalidations = lambda tid: None
self.getAuthProtocol = lambda: None
def extensionMethod(self, name): def extensionMethod(self, name):
return ExtensionMethodWrapper(self.rpc, name).call return ExtensionMethodWrapper(self.rpc, name).call
...@@ -111,15 +102,15 @@ class StorageServer: ...@@ -111,15 +102,15 @@ class StorageServer:
return self.rpc.call('getInvalidations', tid) return self.rpc.call('getInvalidations', tid)
## ##
# Check whether serial numbers s and sv are current for oid. # Check whether a serial number is current for oid.
# If one or both of the serial numbers are not current, the # If the serial number is not current, the
# server will make an asynchronous invalidateVerify() call. # server will make an asynchronous invalidateVerify() call.
# @param oid object id # @param oid object id
# @param s serial number on non-version data # @param s serial number
# @defreturn async # @defreturn async
def zeoVerify(self, oid, s): def zeoVerify(self, oid, s):
self.rpc.callAsync('zeoVerify', oid, s, None) self.rpc.callAsync('zeoVerify', oid, s)
## ##
# Check whether current serial number is valid for oid. # Check whether current serial number is valid for oid.
...@@ -130,7 +121,7 @@ class StorageServer: ...@@ -130,7 +121,7 @@ class StorageServer:
# @defreturn async # @defreturn async
def verify(self, oid, serial): def verify(self, oid, serial):
self.rpc.callAsync('verify', oid, '', serial) self.rpc.callAsync('verify', oid, serial)
## ##
# Signal to the server that cache verification is done. # Signal to the server that cache verification is done.
...@@ -183,7 +174,7 @@ class StorageServer: ...@@ -183,7 +174,7 @@ class StorageServer:
# @exception KeyError if oid is not found # @exception KeyError if oid is not found
def loadEx(self, oid): def loadEx(self, oid):
return self.rpc.call("loadEx", oid, '')[:2] return self.rpc.call("loadEx", oid)
## ##
# Return non-current data along with transaction ids that identify # Return non-current data along with transaction ids that identify
...@@ -207,12 +198,11 @@ class StorageServer: ...@@ -207,12 +198,11 @@ class StorageServer:
# @defreturn async # @defreturn async
def storea(self, oid, serial, data, id): def storea(self, oid, serial, data, id):
self.rpc.callAsync('storea', oid, serial, data, '', id) self.rpc.callAsync('storea', oid, serial, data, id)
def restorea(self, oid, serial, data, prev_txn, id): def restorea(self, oid, serial, data, prev_txn, id):
self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id) self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)
def storeBlob(self, oid, serial, data, blobfilename, txn): def storeBlob(self, oid, serial, data, blobfilename, txn):
# Store a blob to the server. We don't want to real all of # Store a blob to the server. We don't want to real all of
...@@ -228,13 +218,12 @@ class StorageServer: ...@@ -228,13 +218,12 @@ class StorageServer:
break break
yield ('storeBlobChunk', (chunk, )) yield ('storeBlobChunk', (chunk, ))
f.close() f.close()
yield ('storeBlobEnd', (oid, serial, data, '', id(txn))) yield ('storeBlobEnd', (oid, serial, data, id(txn)))
self.rpc.callAsyncIterator(store()) self.rpc.callAsyncIterator(store())
def storeBlobShared(self, oid, serial, data, filename, id): def storeBlobShared(self, oid, serial, data, filename, id):
self.rpc.callAsync('storeBlobShared', oid, serial, data, filename, self.rpc.callAsync('storeBlobShared', oid, serial, data, filename, id)
'', id)
## ##
# Start two-phase commit for a transaction # Start two-phase commit for a transaction
...@@ -262,16 +251,13 @@ class StorageServer: ...@@ -262,16 +251,13 @@ class StorageServer:
def history(self, oid, length=None): def history(self, oid, length=None):
if length is None: if length is None:
return self.rpc.call('history', oid, '') return self.rpc.call('history', oid)
else: else:
return self.rpc.call('history', oid, '', length) return self.rpc.call('history', oid, length)
def record_iternext(self, next): def record_iternext(self, next):
return self.rpc.call('record_iternext', next) return self.rpc.call('record_iternext', next)
def load(self, oid):
return self.rpc.call('load', oid, '')
def sendBlob(self, oid, serial): def sendBlob(self, oid, serial):
return self.rpc.call('sendBlob', oid, serial) return self.rpc.call('sendBlob', oid, serial)
...@@ -284,9 +270,6 @@ class StorageServer: ...@@ -284,9 +270,6 @@ class StorageServer:
def new_oid(self): def new_oid(self):
return self.rpc.call('new_oid') return self.rpc.call('new_oid')
def store(self, oid, serial, data, trans):
return self.rpc.call('store', oid, serial, data, '', trans)
def undo(self, trans_id, trans): def undo(self, trans_id, trans):
return self.rpc.call('undo', trans_id, trans) return self.rpc.call('undo', trans_id, trans)
...@@ -311,6 +294,90 @@ class StorageServer: ...@@ -311,6 +294,90 @@ class StorageServer:
def iterator_gc(self, iids): def iterator_gc(self, iids):
return self.rpc.callAsync('iterator_gc', iids) return self.rpc.callAsync('iterator_gc', iids)
class StorageServer308(StorageServer):
def __init__(self, rpc):
if rpc.peer_protocol_version == 'Z200':
self.lastTransaction = lambda: None
self.getInvalidations = lambda tid: None
self.getAuthProtocol = lambda: None
StorageServer.__init__(self, rpc)
def history(self, oid, length=None):
if length is None:
return self.rpc.call('history', oid, '')
else:
return self.rpc.call('history', oid, '', length)
def getInvalidations(self, tid):
# Not in protocol version 2.0.0; see __init__()
result = self.rpc.call('getInvalidations', tid)
if result is not None:
result = result[0], [oid for (oid, version) in result[1]]
return result
def verify(self, oid, serial):
self.rpc.callAsync('verify', oid, '', serial)
def loadEx(self, oid):
return self.rpc.call("loadEx", oid, '')[:2]
def storea(self, oid, serial, data, id):
self.rpc.callAsync('storea', oid, serial, data, '', id)
def storeBlob(self, oid, serial, data, blobfilename, txn):
# Store a blob to the server. We don't want to real all of
# the data into memory, so we use a message iterator. This
# allows us to read the blob data as needed.
def store():
yield ('storeBlobStart', ())
f = open(blobfilename, 'rb')
while 1:
chunk = f.read(59000)
if not chunk:
break
yield ('storeBlobChunk', (chunk, ))
f.close()
yield ('storeBlobEnd', (oid, serial, data, '', id(txn)))
self.rpc.callAsyncIterator(store())
def storeBlobShared(self, oid, serial, data, filename, id):
self.rpc.callAsync('storeBlobShared', oid, serial, data, filename,
'', id)
def zeoVerify(self, oid, s):
self.rpc.callAsync('zeoVerify', oid, s, None)
def iterator_start(self, start, stop):
raise NotImplementedError
def iterator_next(self, iid):
raise NotImplementedError
def iterator_record_start(self, txn_iid, tid):
raise NotImplementedError
def iterator_record_next(self, iid):
raise NotImplementedError
def iterator_gc(self, iids):
raise NotImplementedError
def stub(client, connection):
# Wait until we know what version the other side is using.
while connection.peer_protocol_version is None:
time.sleep(0.1)
if connection.peer_protocol_version < 'Z309':
return StorageServer308(connection)
return StorageServer(connection)
class ExtensionMethodWrapper: class ExtensionMethodWrapper:
def __init__(self, rpc, name): def __init__(self, rpc, name):
......
...@@ -37,7 +37,6 @@ import ZODB.serialize ...@@ -37,7 +37,6 @@ import ZODB.serialize
import ZEO.zrpc.error import ZEO.zrpc.error
import zope.interface import zope.interface
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog from ZEO.CommitLog import CommitLog
from ZEO.monitor import StorageStats, StatsServer from ZEO.monitor import StorageStats, StatsServer
from ZEO.zrpc.server import Dispatcher from ZEO.zrpc.server import Dispatcher
...@@ -81,9 +80,6 @@ class StorageServerError(StorageError): ...@@ -81,9 +80,6 @@ class StorageServerError(StorageError):
class ZEOStorage: class ZEOStorage:
"""Proxy to underlying storage for a single remote client.""" """Proxy to underlying storage for a single remote client."""
# Classes we instantiate. A subclass might override.
ClientStorageStubClass = ClientStub.ClientStorage
# A list of extension methods. A subclass with extra methods # A list of extension methods. A subclass with extra methods
# should override. # should override.
extensions = [] extensions = []
...@@ -128,7 +124,12 @@ class ZEOStorage: ...@@ -128,7 +124,12 @@ class ZEOStorage:
def notifyConnected(self, conn): def notifyConnected(self, conn):
self.connection = conn # For restart_other() below self.connection = conn # For restart_other() below
self.client = self.ClientStorageStubClass(conn) assert conn.peer_protocol_version is not None
if conn.peer_protocol_version < 'Z309':
self.client = ClientStub308(conn)
conn.register_object(ZEOStorage308Adapter(self))
else:
self.client = ClientStub(conn)
addr = conn.addr addr = conn.addr
if isinstance(addr, type("")): if isinstance(addr, type("")):
label = addr label = addr
...@@ -208,7 +209,6 @@ class ZEOStorage: ...@@ -208,7 +209,6 @@ class ZEOStorage:
else: else:
raise raise
def _check_tid(self, tid, exc=None): def _check_tid(self, tid, exc=None):
if self.read_only: if self.read_only:
raise ReadOnlyError() raise ReadOnlyError()
...@@ -288,7 +288,6 @@ class ZEOStorage: ...@@ -288,7 +288,6 @@ class ZEOStorage:
'size': storage.getSize(), 'size': storage.getSize(),
'name': storage.getName(), 'name': storage.getName(),
'supportsUndo': supportsUndo, 'supportsUndo': supportsUndo,
'supportsVersions': False,
'extensionMethods': self.getExtensionMethods(), 'extensionMethods': self.getExtensionMethods(),
'supports_record_iternext': hasattr(self, 'record_iternext'), 'supports_record_iternext': hasattr(self, 'record_iternext'),
'interfaces': tuple(interfaces), 'interfaces': tuple(interfaces),
...@@ -302,23 +301,14 @@ class ZEOStorage: ...@@ -302,23 +301,14 @@ class ZEOStorage:
def getExtensionMethods(self): def getExtensionMethods(self):
return self._extensions return self._extensions
def loadEx(self, oid, version=''): def loadEx(self, oid):
self.stats.loads += 1 self.stats.loads += 1
if version: return self.storage.load(oid, '')
raise StorageServerError("Versions aren't supported.")
data, serial = self.storage.load(oid, '')
return data, serial, ''
def loadBefore(self, oid, tid): def loadBefore(self, oid, tid):
self.stats.loads += 1 self.stats.loads += 1
return self.storage.loadBefore(oid, tid) return self.storage.loadBefore(oid, tid)
def zeoLoad(self, oid):
self.stats.loads += 1
p, s = self.storage.load(oid, '')
return p, s, '', None, None
def getInvalidations(self, tid): def getInvalidations(self, tid):
invtid, invlist = self.server.get_invalidations(self.storage_id, tid) invtid, invlist = self.server.get_invalidations(self.storage_id, tid)
if invtid is None: if invtid is None:
...@@ -327,18 +317,16 @@ class ZEOStorage: ...@@ -327,18 +317,16 @@ class ZEOStorage:
% (len(invlist), u64(invtid))) % (len(invlist), u64(invtid)))
return invtid, invlist return invtid, invlist
def verify(self, oid, version, tid): def verify(self, oid, tid):
if version:
raise StorageServerError("Versions aren't supported.")
try: try:
t = self.getTid(oid) t = self.getTid(oid)
except KeyError: except KeyError:
self.client.invalidateVerify((oid, "")) self.client.invalidateVerify(oid)
else: else:
if tid != t: if tid != t:
self.client.invalidateVerify((oid, '')) self.client.invalidateVerify(oid)
def zeoVerify(self, oid, s, sv=None): def zeoVerify(self, oid, s):
if not self.verifying: if not self.verifying:
self.verifying = 1 self.verifying = 1
self.stats.verifying_clients += 1 self.stats.verifying_clients += 1
...@@ -350,9 +338,6 @@ class ZEOStorage: ...@@ -350,9 +338,6 @@ class ZEOStorage:
# could be caused by an object uncreation, in which case # could be caused by an object uncreation, in which case
# invalidation is right. It could be an application bug # invalidation is right. It could be an application bug
# that left a dangling reference, in which case it's bad. # that left a dangling reference, in which case it's bad.
else:
if sv:
raise StorageServerError("Versions aren't supported.")
else: else:
if s != os: if s != os:
self.client.invalidateVerify((oid, '')) self.client.invalidateVerify((oid, ''))
...@@ -483,9 +468,7 @@ class ZEOStorage: ...@@ -483,9 +468,7 @@ class ZEOStorage:
# Most of the real implementations are in methods beginning with # Most of the real implementations are in methods beginning with
# an _. # an _.
def storea(self, oid, serial, data, version, id): def storea(self, oid, serial, data, id):
if version:
raise StorageServerError("Versions aren't supported.")
self._check_tid(id, exc=StorageTransactionError) self._check_tid(id, exc=StorageTransactionError)
self.stats.stores += 1 self.stats.stores += 1
self.txnlog.store(oid, serial, data) self.txnlog.store(oid, serial, data)
...@@ -503,17 +486,13 @@ class ZEOStorage: ...@@ -503,17 +486,13 @@ class ZEOStorage:
def storeBlobChunk(self, chunk): def storeBlobChunk(self, chunk):
os.write(self.blob_tempfile[0], chunk) os.write(self.blob_tempfile[0], chunk)
def storeBlobEnd(self, oid, serial, data, version, id): def storeBlobEnd(self, oid, serial, data, id):
if version:
raise StorageServerError("Versions aren't supported.")
fd, tempname = self.blob_tempfile fd, tempname = self.blob_tempfile
self.blob_tempfile = None self.blob_tempfile = None
os.close(fd) os.close(fd)
self.blob_log.append((oid, serial, data, tempname)) self.blob_log.append((oid, serial, data, tempname))
def storeBlobShared(self, oid, serial, data, filename, version, id): def storeBlobShared(self, oid, serial, data, filename, id):
if version:
raise StorageServerError("Versions aren't supported.")
# Reconstruct the full path from the filename in the OID directory # Reconstruct the full path from the filename in the OID directory
filename = os.path.join(self.storage.fshelper.getPathForOID(oid), filename = os.path.join(self.storage.fshelper.getPathForOID(oid),
filename) filename)
...@@ -570,7 +549,7 @@ class ZEOStorage: ...@@ -570,7 +549,7 @@ class ZEOStorage:
newserial = [(oid, err)] newserial = [(oid, err)]
else: else:
if serial != "\0\0\0\0\0\0\0\0": if serial != "\0\0\0\0\0\0\0\0":
self.invalidated.append((oid, '')) self.invalidated.append(oid)
if isinstance(newserial, str): if isinstance(newserial, str):
newserial = [(oid, newserial)] newserial = [(oid, newserial)]
...@@ -633,8 +612,7 @@ class ZEOStorage: ...@@ -633,8 +612,7 @@ class ZEOStorage:
def _undo(self, trans_id): def _undo(self, trans_id):
tid, oids = self.storage.undo(trans_id, self.transaction) tid, oids = self.storage.undo(trans_id, self.transaction)
inv = [(oid, None) for oid in oids] self.invalidated.extend(oids)
self.invalidated.extend(inv)
return tid, oids return tid, oids
# When a delayed transaction is restarted, the dance is # When a delayed transaction is restarted, the dance is
...@@ -723,20 +701,6 @@ class ZEOStorage: ...@@ -723,20 +701,6 @@ class ZEOStorage:
else: else:
return 1 return 1
def modifiedInVersion(self, oid):
return ''
def versions(self):
return ()
def versionEmpty(self, version):
return True
def commitVersion(self, *a, **k):
raise NotImplementedError
abortVersion = commitVersion
# IStorageIteration support # IStorageIteration support
def iterator_start(self, start, stop): def iterator_start(self, start, stop):
...@@ -785,7 +749,6 @@ class ZEOStorage: ...@@ -785,7 +749,6 @@ class ZEOStorage:
item = (info.oid, item = (info.oid,
info.tid, info.tid,
info.data, info.data,
info.version,
info.data_txn) info.data_txn)
return item return item
...@@ -805,10 +768,7 @@ class StorageServerDB: ...@@ -805,10 +768,7 @@ class StorageServerDB:
if version: if version:
raise StorageServerError("Versions aren't supported.") raise StorageServerError("Versions aren't supported.")
storage_id = self.storage_id storage_id = self.storage_id
self.server.invalidate( self.server.invalidate(None, storage_id, tid, oids)
None, storage_id, tid,
[(oid, '') for oid in oids],
)
for zeo_server in self.server.connections.get(storage_id, ())[:]: for zeo_server in self.server.connections.get(storage_id, ())[:]:
try: try:
zeo_server.connection.poll() zeo_server.connection.poll()
...@@ -1081,7 +1041,7 @@ class StorageServer: ...@@ -1081,7 +1041,7 @@ class StorageServer:
This is called from several ZEOStorage methods. This is called from several ZEOStorage methods.
invalidated is a sequence of oid, empty-string pairs. invalidated is a sequence of oids.
This can do three different things: This can do three different things:
...@@ -1306,3 +1266,129 @@ class SlowMethodThread(threading.Thread): ...@@ -1306,3 +1266,129 @@ class SlowMethodThread(threading.Thread):
self.delay.error(sys.exc_info()) self.delay.error(sys.exc_info())
else: else:
self.delay.reply(result) self.delay.reply(result)
class ClientStub:
def __init__(self, rpc):
self.rpc = rpc
def beginVerify(self):
self.rpc.callAsync('beginVerify')
def invalidateVerify(self, args):
self.rpc.callAsync('invalidateVerify', args)
def endVerify(self):
self.rpc.callAsync('endVerify')
def invalidateTransaction(self, tid, args):
self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg)
def info(self, arg):
self.rpc.callAsync('info', arg)
def storeBlob(self, oid, serial, blobfilename):
def store():
yield ('receiveBlobStart', (oid, serial))
f = open(blobfilename, 'rb')
while 1:
chunk = f.read(59000)
if not chunk:
break
yield ('receiveBlobChunk', (oid, serial, chunk, ))
f.close()
yield ('receiveBlobStop', (oid, serial))
self.rpc.callAsyncIterator(store())
class ClientStub308(ClientStub):
def invalidateTransaction(self, tid, args):
self.rpc.callAsyncNoPoll(
'invalidateTransaction', tid, [(arg, '') for arg in args])
def invalidateVerify(self, oid):
self.rpc.callAsync('invalidateVerify', (oid, ''))
class ZEOStorage308Adapter:
def __init__(self, storage):
self.storage = storage
def getSerial(self, oid):
return self.storage.loadEx(oid)[1] # Z200
def history(self, oid, version, size=1):
if version:
raise ValueError("Versions aren't supported.")
return self.storage.history(oid, size)
def getInvalidations(self, tid):
result = self.storage.getInvalidations(tid)
if result is not None:
result = result[0], [(oid, '') for oid in result[1]]
return result
def verify(self, oid, version, tid):
if version:
raise StorageServerError("Versions aren't supported.")
return self.storage.verify(oid, tid)
def loadEx(self, oid, version=''):
if version:
raise StorageServerError("Versions aren't supported.")
data, serial = self.storage.loadEx(oid)
return data, serial, ''
def storea(self, oid, serial, data, version, id):
if version:
raise StorageServerError("Versions aren't supported.")
self.storage.storea(oid, serial, data, id)
def storeBlobEnd(self, oid, serial, data, version, id):
if version:
raise StorageServerError("Versions aren't supported.")
self.storage.storeBlobEnd(oid, serial, data, id)
def storeBlobShared(self, oid, serial, data, filename, version, id):
if version:
raise StorageServerError("Versions aren't supported.")
self.storage.storeBlobShared(oid, serial, data, filename, id)
def getInfo(self):
result = self.storage.getInfo()
result['supportsVersions'] = False
return result
def zeoVerify(self, oid, s, sv=None):
if sv:
raise StorageServerError("Versions aren't supported.")
self.storage.zeoVerify(oid, s)
def modifiedInVersion(self, oid):
return ''
def versions(self):
return ()
def versionEmpty(self, version):
return True
def commitVersion(self, *a, **k):
raise NotImplementedError
abortVersion = commitVersion
def zeoLoad(self, oid): # Z200
p, s = self.storage,loadEx(oid)
return p, s, '', None, None
def __getattr__(self, name):
return getattr(self.storage, name)
...@@ -620,7 +620,7 @@ class InvqTests(CommonSetupTearDown): ...@@ -620,7 +620,7 @@ class InvqTests(CommonSetupTearDown):
perstorage = self.openClientStorage(cache="test") perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "quick verification") self.assertEqual(perstorage.verify_result, "quick verification")
self.assertEqual(perstorage._server._last_invals, self.assertEqual(perstorage._server._last_invals,
(revid, [(oid, '')])) (revid, [oid]))
self.assertEqual(perstorage.load(oid, ''), self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, '')) self._storage.load(oid, ''))
......
...@@ -86,7 +86,7 @@ def encode_format(fmt): ...@@ -86,7 +86,7 @@ def encode_format(fmt):
def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False, def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
path='Data.fs'): path='Data.fs', protocol=None):
"""Start a ZEO server in a separate process. """Start a ZEO server in a separate process.
Takes two positional arguments a string containing the storage conf Takes two positional arguments a string containing the storage conf
...@@ -126,6 +126,9 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False, ...@@ -126,6 +126,9 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
args = [qa(sys.executable), qa(script), '-C', qa(tmpfile)] args = [qa(sys.executable), qa(script), '-C', qa(tmpfile)]
if keep: if keep:
args.append("-k") args.append("-k")
if protocol:
args.extend(["-v", protocol])
d = os.environ.copy() d = os.environ.copy()
d['PYTHONPATH'] = os.pathsep.join(sys.path) d['PYTHONPATH'] = os.pathsep.join(sys.path)
...@@ -276,7 +279,7 @@ def setUp(test): ...@@ -276,7 +279,7 @@ def setUp(test):
servers = {} servers = {}
def start_server(storage_conf=None, zeo_conf=None, port=None, keep=False, def start_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
addr=None): addr=None, path='Data.fs', protocol=None):
"""Start a ZEO server. """Start a ZEO server.
Return the server and admin addresses. Return the server and admin addresses.
...@@ -289,7 +292,7 @@ def setUp(test): ...@@ -289,7 +292,7 @@ def setUp(test):
elif addr is not None: elif addr is not None:
raise TypeError("Can't specify port and addr") raise TypeError("Can't specify port and addr")
addr, adminaddr, pid, config_path = start_zeo_server( addr, adminaddr, pid, config_path = start_zeo_server(
storage_conf, zeo_conf, port, keep) storage_conf, zeo_conf, port, keep, path, protocol)
os.remove(config_path) os.remove(config_path)
servers[adminaddr] = pid servers[adminaddr] = pid
return addr, adminaddr return addr, adminaddr
......
Test that multiple protocols are supported
==========================================
A full test of all protocols isn't practical. But we'll do a limited
test that at least the current and previous protocols are supported in
both directions.
Let's start a Z308 server
>>> storage_conf = '''
... <blobstorage>
... blob-dir server-blobs
... <filestorage>
... path Data.fs
... </filestorage>
... </blobstorage>
... '''
>>> addr, admin = start_server(
... storage_conf, dict(invalidation_queue_size=5), protocol='Z308')
A current client should be able to connect to a old server:
>>> import ZEO, ZODB.blob, transaction
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> wait_connected(db.storage)
>>> db.storage._connection.peer_protocol_version
'Z308'
>>> conn = db.open()
>>> conn.root().x = 0
>>> transaction.commit()
>>> len(db.history(conn.root()._p_oid, 99))
2
>>> conn.root()['blob1'] = ZODB.blob.Blob()
>>> conn.root()['blob1'].open('w').write('blob data 1')
>>> transaction.commit()
>>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True)
>>> wait_connected(db2.storage)
>>> conn2 = db2.open()
>>> for i in range(5):
... conn2.root().x += 1
... transaction.commit()
>>> conn2.root()['blob2'] = ZODB.blob.Blob()
>>> conn2.root()['blob2'].open('w').write('blob data 2')
>>> transaction.commit()
>>> conn.sync()
>>> conn.root().x
5
>>> db.close()
>>> for i in range(2):
... conn2.root().x += 1
... transaction.commit()
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x
7
>>> db.close()
>>> for i in range(10):
... conn2.root().x += 1
... transaction.commit()
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x
17
>>> conn.root()['blob1'].open().read()
'blob data 1'
>>> conn.root()['blob2'].open().read()
'blob data 2'
Note that when taking to a 3.8 server, iteration won't work:
>>> db.storage.iterator()
Traceback (most recent call last):
...
NotImplementedError
>>> db2.close()
>>> db.close()
>>> stop_server(admin)
>>> import os, zope.testing.setupstack
>>> os.remove('client-1.zec')
>>> zope.testing.setupstack.rmtree('blobs')
>>> zope.testing.setupstack.rmtree('server-blobs')
And the other way around:
>>> addr, _ = start_server(storage_conf, dict(invalidation_queue_size=5))
Note that we'll have to pull some hijinks:
>>> import ZEO.zrpc.connection
>>> old_current_protocol = ZEO.zrpc.connection.Connection.current_protocol
>>> ZEO.zrpc.connection.Connection.current_protocol = 'Z308'
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> db.storage._connection.peer_protocol_version
'Z308'
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x = 0
>>> transaction.commit()
>>> len(db.history(conn.root()._p_oid, 99))
2
>>> conn.root()['blob1'] = ZODB.blob.Blob()
>>> conn.root()['blob1'].open('w').write('blob data 1')
>>> transaction.commit()
>>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True)
>>> wait_connected(db2.storage)
>>> conn2 = db2.open()
>>> for i in range(5):
... conn2.root().x += 1
... transaction.commit()
>>> conn2.root()['blob2'] = ZODB.blob.Blob()
>>> conn2.root()['blob2'].open('w').write('blob data 2')
>>> transaction.commit()
>>> conn.sync()
>>> conn.root().x
5
>>> db.close()
>>> for i in range(2):
... conn2.root().x += 1
... transaction.commit()
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x
7
>>> db.close()
>>> for i in range(10):
... conn2.root().x += 1
... transaction.commit()
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x
17
>>> conn.root()['blob1'].open().read()
'blob data 1'
>>> conn.root()['blob2'].open().read()
'blob data 2'
>>> db2.close()
>>> db.close()
Undo the hijinks:
>>> ZEO.zrpc.connection.Connection.current_protocol = old_current_protocol
...@@ -9,8 +9,8 @@ multiple storage servers. ...@@ -9,8 +9,8 @@ multiple storage servers.
We'll create a Faux storage that has a registerDB method. We'll create a Faux storage that has a registerDB method.
>>> class FauxStorage: >>> class FauxStorage:
... invalidations = [('trans0', [('ob0', '')]), ... invalidations = [('trans0', ['ob0']),
... ('trans1', [('ob0', ''), ('ob1', '')]), ... ('trans1', ['ob0', 'ob1']),
... ] ... ]
... def registerDB(self, db): ... def registerDB(self, db):
... self.db = db ... self.db = db
...@@ -85,24 +85,24 @@ Now, if we call invalidate, we'll see it propigate to the client: ...@@ -85,24 +85,24 @@ Now, if we call invalidate, we'll see it propigate to the client:
>>> storage.db.invalidate('trans2', ['ob1', 'ob2']) >>> storage.db.invalidate('trans2', ['ob1', 'ob2'])
invalidateTransaction trans2 1 invalidateTransaction trans2 1
[('ob1', ''), ('ob2', '')] ['ob1', 'ob2']
invalidateTransaction trans2 2 invalidateTransaction trans2 2
[('ob1', ''), ('ob2', '')] ['ob1', 'ob2']
>>> storage.db.invalidate('trans3', ['ob1', 'ob2']) >>> storage.db.invalidate('trans3', ['ob1', 'ob2'])
invalidateTransaction trans3 1 invalidateTransaction trans3 1
[('ob1', ''), ('ob2', '')] ['ob1', 'ob2']
invalidateTransaction trans3 2 invalidateTransaction trans3 2
[('ob1', ''), ('ob2', '')] ['ob1', 'ob2']
The storage servers queue will reflect the invalidations: The storage servers queue will reflect the invalidations:
>>> for tid, invalidated in server.invq['t']: >>> for tid, invalidated in server.invq['t']:
... print repr(tid), invalidated ... print repr(tid), invalidated
'trans3' [('ob1', ''), ('ob2', '')] 'trans3' ['ob1', 'ob2']
'trans2' [('ob1', ''), ('ob2', '')] 'trans2' ['ob1', 'ob2']
'trans1' [('ob0', ''), ('ob1', '')] 'trans1' ['ob0', 'ob1']
'trans0' [('ob0', '')] 'trans0' ['ob0']
If we call invalidateCache, the storage server will close each of it's If we call invalidateCache, the storage server will close each of it's
connections: connections:
...@@ -117,5 +117,5 @@ The servers's invalidation queue will get reset ...@@ -117,5 +117,5 @@ The servers's invalidation queue will get reset
>>> for tid, invalidated in server.invq['t']: >>> for tid, invalidated in server.invq['t']:
... print repr(tid), invalidated ... print repr(tid), invalidated
'trans1' [('ob0', ''), ('ob1', '')] 'trans1' ['ob0', 'ob1']
'trans0' [('ob0', '')] 'trans0' ['ob0']
...@@ -818,7 +818,7 @@ class StorageServerWrapper: ...@@ -818,7 +818,7 @@ class StorageServerWrapper:
return result return result
def store(self, oid, serial, data, version_ignored, transaction): def store(self, oid, serial, data, version_ignored, transaction):
self.server.storea(oid, serial, data, '', id(transaction)) self.server.storea(oid, serial, data, id(transaction))
def tpc_finish(self, transaction, func = lambda: None): def tpc_finish(self, transaction, func = lambda: None):
self.server.tpc_finish(id(transaction)) self.server.tpc_finish(id(transaction))
...@@ -859,7 +859,7 @@ def multiple_storages_invalidation_queue_is_not_insane(): ...@@ -859,7 +859,7 @@ def multiple_storages_invalidation_queue_is_not_insane():
>>> trans, oids = s1.getInvalidations(last) >>> trans, oids = s1.getInvalidations(last)
>>> from ZODB.utils import u64 >>> from ZODB.utils import u64
>>> sorted([int(u64(oid)) for (oid, v) in oids]) >>> sorted([int(u64(oid)) for oid in oids])
[10, 11, 12, 13, 14] [10, 11, 12, 13, 14]
>>> server.close_server() >>> server.close_server()
...@@ -913,7 +913,7 @@ structure using lastTransactions. ...@@ -913,7 +913,7 @@ structure using lastTransactions.
>>> from ZODB.utils import u64 >>> from ZODB.utils import u64
>>> sorted([int(u64(oid)) for (oid, _) in oids]) >>> sorted([int(u64(oid)) for oid in oids])
[0, 92, 93, 94, 95, 96, 97, 98, 99, 100] [0, 92, 93, 94, 95, 96, 97, 98, 99, 100]
(Note that the fact that we get oids for 92-100 is actually an (Note that the fact that we get oids for 92-100 is actually an
...@@ -961,7 +961,7 @@ transaction, we'll get a result: ...@@ -961,7 +961,7 @@ transaction, we'll get a result:
>>> ntid == last[-1] >>> ntid == last[-1]
True True
>>> sorted([int(u64(oid)) for (oid, _) in oids]) >>> sorted([int(u64(oid)) for oid in oids])
[0, 101, 102, 103, 104] [0, 101, 102, 103, 104]
>>> fs.close() >>> fs.close()
...@@ -970,9 +970,11 @@ transaction, we'll get a result: ...@@ -970,9 +970,11 @@ transaction, we'll get a result:
def tpc_finish_error(): def tpc_finish_error():
r"""Server errors in tpc_finish weren't handled properly. r"""Server errors in tpc_finish weren't handled properly.
>>> import ZEO.ClientStorage >>> import ZEO.ClientStorage, ZEO.zrpc.connection
>>> class Connection: >>> class Connection:
... peer_protocol_version = (
... ZEO.zrpc.connection.Connection.current_protocol)
... def __init__(self, client): ... def __init__(self, client):
... self.client = client ... self.client = client
... def get_addr(self): ... def get_addr(self):
...@@ -1127,6 +1129,22 @@ def client_has_newer_data_than_server(): ...@@ -1127,6 +1129,22 @@ def client_has_newer_data_than_server():
""" """
def history_over_zeo():
"""
>>> addr, _ = start_server()
>>> import ZEO, ZODB.blob, transaction
>>> db = ZEO.DB(addr)
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x = 0
>>> transaction.commit()
>>> len(db.history(conn.root()._p_oid, 99))
2
>>> db.close()
"""
slow_test_classes = [ slow_test_classes = [
BlobAdaptedFileStorageTests, BlobWritableCacheTests, BlobAdaptedFileStorageTests, BlobWritableCacheTests,
DemoStorageTests, FileStorageTests, MappingStorageTests, DemoStorageTests, FileStorageTests, MappingStorageTests,
...@@ -1150,6 +1168,7 @@ def test_suite(): ...@@ -1150,6 +1168,7 @@ def test_suite():
doctest.DocFileSuite( doctest.DocFileSuite(
'zeo-fan-out.test', 'zdoptions.test', 'zeo-fan-out.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt', 'drop_cache_rather_than_verify.txt',
'protocols.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown, setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
), ),
) )
......
...@@ -13,6 +13,9 @@ ...@@ -13,6 +13,9 @@
############################################################################## ##############################################################################
"""Helper file used to launch a ZEO server cross platform""" """Helper file used to launch a ZEO server cross platform"""
from ZEO.StorageServer import StorageServer
from ZEO.runzeo import ZEOOptions
import asyncore import asyncore
import os import os
import sys import sys
...@@ -25,10 +28,6 @@ import asyncore ...@@ -25,10 +28,6 @@ import asyncore
import threading import threading
import logging import logging
from ZEO.StorageServer import StorageServer
from ZEO.runzeo import ZEOOptions
def cleanup(storage): def cleanup(storage):
# FileStorage and the Berkeley storages have this method, which deletes # FileStorage and the Berkeley storages have this method, which deletes
# all files and directories used by the storage. This prevents @-files # all files and directories used by the storage. This prevents @-files
...@@ -155,12 +154,15 @@ def main(): ...@@ -155,12 +154,15 @@ def main():
keep = 0 keep = 0
configfile = None configfile = None
# Parse the arguments and let getopt.error percolate # Parse the arguments and let getopt.error percolate
opts, args = getopt.getopt(sys.argv[1:], 'kC:') opts, args = getopt.getopt(sys.argv[1:], 'kC:v:')
for opt, arg in opts: for opt, arg in opts:
if opt == '-k': if opt == '-k':
keep = 1 keep = 1
elif opt == '-C': elif opt == '-C':
configfile = arg configfile = arg
elif opt == '-v':
import ZEO.zrpc.connection
ZEO.zrpc.connection.Connection.current_protocol = arg
zo = ZEOOptions() zo = ZEOOptions()
zo.realize(["-C", configfile]) zo.realize(["-C", configfile])
......
...@@ -350,11 +350,11 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -350,11 +350,11 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# If we're a client, an exhaustive list of the server protocols we # If we're a client, an exhaustive list of the server protocols we
# can accept. # can accept.
servers_we_can_talk_to = [current_protocol] servers_we_can_talk_to = ["Z308", current_protocol]
# If we're a server, an exhaustive list of the client protocols we # If we're a server, an exhaustive list of the client protocols we
# can accept. # can accept.
clients_we_can_talk_to = ["Z200", "Z201", "Z303", current_protocol] clients_we_can_talk_to = ["Z200", "Z201", "Z303", "Z308", current_protocol]
# This is pretty excruciating. Details: # This is pretty excruciating. Details:
# #
...@@ -779,24 +779,25 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -779,24 +779,25 @@ class Connection(smac.SizedMessageAsyncConnection, object):
class ManagedServerConnection(Connection): class ManagedServerConnection(Connection):
"""Server-side Connection subclass.""" """Server-side Connection subclass."""
__super_init = Connection.__init__
__super_close = Connection.close
# Servers use a shared server trigger that uses the asyncore socket map # Servers use a shared server trigger that uses the asyncore socket map
trigger = trigger() trigger = trigger()
def __init__(self, sock, addr, obj, mgr): def __init__(self, sock, addr, obj, mgr):
self.mgr = mgr self.mgr = mgr
self.__super_init(sock, addr, obj, 'S') Connection.__init__(self, sock, addr, obj, 'S')
self.obj.notifyConnected(self)
def handshake(self): def handshake(self):
# Send the server's preferred protocol to the client. # Send the server's preferred protocol to the client.
self.message_output(self.current_protocol) self.message_output(self.current_protocol)
def recv_handshake(self, proto):
Connection.recv_handshake(self, proto)
self.obj.notifyConnected(self)
def close(self): def close(self):
self.obj.notifyDisconnected() self.obj.notifyDisconnected()
self.__super_close() Connection.close(self)
class ManagedClientConnection(Connection): class ManagedClientConnection(Connection):
"""Client-side Connection subclass.""" """Client-side Connection subclass."""
......
...@@ -425,8 +425,7 @@ class DataRecord(object): ...@@ -425,8 +425,7 @@ class DataRecord(object):
version = '' version = ''
def __init__(self, oid, tid, data, version, prev): def __init__(self, oid, tid, data, prev):
assert not version, "versions are no-longer supported"
self.oid = oid self.oid = oid
self.tid = tid self.tid = tid
self.data = data self.data = data
......
...@@ -1090,7 +1090,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -1090,7 +1090,7 @@ class FileStorage(BaseStorage.BaseStorage,
pos = pos - 8 - u64(read(8)) pos = pos - 8 - u64(read(8))
seek(0) seek(0)
return [(trans.tid, [(r.oid, '') for r in trans]) return [(trans.tid, [r.oid for r in trans])
for trans in FileIterator(self._file_name, pos=pos)] for trans in FileIterator(self._file_name, pos=pos)]
finally: finally:
self._lock_release() self._lock_release()
...@@ -1711,7 +1711,7 @@ class TransactionRecordIterator(FileStorageFormatter): ...@@ -1711,7 +1711,7 @@ class TransactionRecordIterator(FileStorageFormatter):
class Record(BaseStorage.DataRecord): class Record(BaseStorage.DataRecord):
def __init__(self, oid, tid, data, prev, pos): def __init__(self, oid, tid, data, prev, pos):
super(Record, self).__init__(oid, tid, data, '', prev) super(Record, self).__init__(oid, tid, data, prev)
self.pos = pos self.pos = pos
......
...@@ -332,7 +332,7 @@ class TransactionRecord: ...@@ -332,7 +332,7 @@ class TransactionRecord:
def __iter__(self): def __iter__(self):
for oid, data in self.data.items(): for oid, data in self.data.items():
yield DataRecord(oid, self.tid, data, None) yield DataRecord(oid, self.tid, data)
def pack(self, oid): def pack(self, oid):
self.status = 'p' self.status = 'p'
...@@ -345,12 +345,12 @@ class DataRecord(object): ...@@ -345,12 +345,12 @@ class DataRecord(object):
zope.interface.implements(ZODB.interfaces.IStorageRecordInformation) zope.interface.implements(ZODB.interfaces.IStorageRecordInformation)
version = '' version = ''
data_txn = None
def __init__(self, oid, tid, data, prev): def __init__(self, oid, tid, data):
self.oid = oid self.oid = oid
self.tid = tid self.tid = tid
self.data = data self.data = data
self.data_txn = prev
def DB(*args, **kw): def DB(*args, **kw):
return ZODB.DB(MappingStorage(), *args, **kw) return ZODB.DB(MappingStorage(), *args, **kw)
...@@ -471,7 +471,7 @@ Now, we can call lastInvalidations on it: ...@@ -471,7 +471,7 @@ Now, we can call lastInvalidations on it:
True True
>>> from ZODB.utils import u64 >>> from ZODB.utils import u64
>>> [[int(u64(oid)) for (oid, version) in oids] >>> [[int(u64(oid)) for oid in oids]
... for (i, oids) in invalidations] ... for (i, oids) in invalidations]
... # doctest: +NORMALIZE_WHITESPACE ... # doctest: +NORMALIZE_WHITESPACE
[[0, 91], [0, 92], [0, 93], [0, 94], [0, 95], [[0, 91], [0, 92], [0, 93], [0, 94], [0, 95],
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment