Commit 774134a0 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge ZODB3-fast-restart-branch to the trunk

parent 83699c6e
......@@ -34,8 +34,16 @@ temporary directory as determined by the tempfile module.
The ClientStorage overrides the client name default to the value of
the environment variable ZEO_CLIENT, if it exists.
Each cache file has a 4-byte magic number followed by a sequence of
records of the form:
Each cache file has a 12-byte header followed by a sequence of
records. The header format is as follows:
offset in header: name -- description
0: magic -- 4-byte magic number, identifying this as a ZEO cache file
4: lasttid -- 8-byte last transaction id
Each record has the following form:
offset in record: name -- description
......@@ -111,7 +119,8 @@ from ZODB.utils import U64
import zLOG
from ZEO.ICache import ICache
magic='ZEC0'
magic = 'ZEC1'
headersize = 12
class ClientCache:
......@@ -126,6 +135,8 @@ class ClientCache:
self._storage = storage
self._limit = size / 2
self._client = client
self._ltid = None # For getLastTid()
# Allocate locks:
L = allocate_lock()
......@@ -154,9 +165,9 @@ class ClientCache:
fi = open(p[i],'r+b')
if fi.read(4) == magic: # Minimal sanity
fi.seek(0, 2)
if fi.tell() > 30:
# First serial is at offset 19 + 4 for magic
fi.seek(23)
if fi.tell() > headersize:
# Read serial at offset 19 of first record
fi.seek(headersize + 19)
s[i] = fi.read(8)
# If we found a non-zero serial, then use the file
if s[i] != '\0\0\0\0\0\0\0\0':
......@@ -172,14 +183,14 @@ class ClientCache:
if f[0] is None:
# We started, open the first cache file
f[0] = open(p[0], 'w+b')
f[0].write(magic)
f[0].write(magic + '\0' * (headersize - len(magic)))
current = 0
f[1] = None
else:
self._f = f = [tempfile.TemporaryFile(suffix='.zec'), None]
# self._p file name 'None' signifies an unnamed temp file.
self._p = p = [None, None]
f[0].write(magic)
f[0].write(magic + '\0' * (headersize - len(magic)))
current = 0
self.log("%s: storage=%r, size=%r; file[%r]=%r" %
......@@ -219,6 +230,57 @@ class ClientCache:
except OSError:
pass
def getLastTid(self):
"""Get the last transaction id stored by setLastTid().
If the cache is persistent, it is read from the current
cache file; otherwise it's an instance variable.
"""
if self._client is None:
return self._ltid
else:
self._acquire()
try:
return self._getLastTid()
finally:
self._release()
def _getLastTid(self):
f = self._f[self._current]
f.seek(4)
tid = f.read(8)
if len(tid) < 8 or tid == '\0\0\0\0\0\0\0\0':
return None
else:
return tid
def setLastTid(self, tid):
"""Store the last transaction id.
If the cache is persistent, it is written to the current
cache file; otherwise it's an instance variable.
"""
if self._client is None:
if tid == '\0\0\0\0\0\0\0\0':
tid = None
self._ltid = tid
else:
self._acquire()
try:
self._setLastTid(tid)
finally:
self._release()
def _setLastTid(self, tid):
if tid is None:
tid = '\0\0\0\0\0\0\0\0'
else:
tid = str(tid)
assert len(tid) == 8
f = self._f[self._current]
f.seek(4)
f.write(tid)
def verify(self, verifyFunc):
"""Call the verifyFunc on every object in the cache.
......@@ -477,6 +539,7 @@ class ClientCache:
self._acquire()
try:
if self._pos + size > self._limit:
ltid = self._getLastTid()
current = not self._current
self._current = current
self._trace(0x70)
......@@ -500,8 +563,12 @@ class ClientCache:
else:
# Temporary cache file:
self._f[current] = tempfile.TemporaryFile(suffix='.zec')
self._f[current].write(magic)
self._pos = 4
header = magic
if ltid:
header += ltid
self._f[current].write(header +
'\0' * (headersize - len(header)))
self._pos = headersize
finally:
self._release()
......@@ -593,7 +660,7 @@ class ClientCache:
f = self._f[fileindex]
seek = f.seek
read = f.read
pos = 4
pos = headersize
count = 0
while 1:
......@@ -652,7 +719,6 @@ class ClientCache:
del serial[oid]
del index[oid]
pos = pos + tlen
count += 1
......
......@@ -22,7 +22,6 @@ ClientDisconnected -- exception raised by ClientStorage
"""
# XXX TO DO
# get rid of beginVerify, set up _tfile in verify_cache
# set self._storage = stub later, in endVerify
# if wait is given, wait until verify is complete
......@@ -60,6 +59,9 @@ class UnrecognizedResult(ClientStorageError):
class ClientDisconnected(ClientStorageError, Disconnected):
"""The database storage is disconnected from the storage."""
def tid2time(tid):
return str(TimeStamp(tid))
def get_timestamp(prev_ts=None):
"""Internal helper to return a unique TimeStamp instance.
......@@ -208,6 +210,8 @@ class ClientStorage:
self._connection = None
# _server_addr is used by sortKey()
self._server_addr = None
self._tfile = None
self._pickler = None
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0,
......@@ -337,12 +341,14 @@ class ClientStorage:
This is called by ConnectionManager after it has decided which
connection should be used.
"""
# XXX would like to report whether we get a read-only connection
if self._connection is not None:
log2(INFO, "Reconnected to storage")
reconnect = 1
else:
log2(INFO, "Connected to storage")
reconnect = 0
self.set_server_addr(conn.get_addr())
stub = self.StorageServerStubClass(conn)
stub = self.StorageServerStubClass(conn)
self._oids = []
self._info.update(stub.get_info())
self.verify_cache(stub)
......@@ -353,6 +359,11 @@ class ClientStorage:
self._connection = conn
self._server = stub
if reconnect:
log2(INFO, "Reconnected to storage: %s" % self._server_addr)
else:
log2(INFO, "Connected to storage: %s" % self._server_addr)
def set_server_addr(self, addr):
# Normalize server address and convert to string
if isinstance(addr, types.StringType):
......@@ -381,12 +392,42 @@ class ClientStorage:
return self._server_addr
def verify_cache(self, server):
"""Internal routine called to verify the cache."""
# XXX beginZeoVerify ends up calling back to beginVerify() below.
# That whole exchange is rather unnecessary.
server.beginZeoVerify()
"""Internal routine called to verify the cache.
The return value (indicating which path we took) is used by
the test suite.
"""
last_inval_tid = self._cache.getLastTid()
if last_inval_tid is not None:
ltid = server.lastTransaction()
if ltid == last_inval_tid:
log2(INFO, "No verification necessary "
"(last_inval_tid up-to-date)")
self._cache.open()
return "no verification"
# log some hints about last transaction
log2(INFO, "last inval tid: %r %s"
% (last_inval_tid, tid2time(last_inval_tid)))
log2(INFO, "last transaction: %r %s" %
(ltid, ltid and tid2time(ltid)))
pair = server.getInvalidations(last_inval_tid)
if pair is not None:
log2(INFO, "Recovering %d invalidations" % len(pair[1]))
self._cache.open()
self.invalidateTransaction(*pair)
return "quick verification"
log2(INFO, "Verifying cache")
# setup tempfile to hold zeoVerify results
self._tfile = tempfile.TemporaryFile(suffix=".inv")
self._pickler = cPickle.Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo
self._cache.verify(server.zeoVerify)
server.endZeoVerify()
return "full verification"
### Is there a race condition between notifyConnected and
### notifyDisconnected? In Particular, what if we get
......@@ -402,7 +443,8 @@ class ClientStorage:
This is called by ConnectionManager when the connection is
closed or when certain problems with the connection occur.
"""
log2(PROBLEM, "Disconnected from storage")
log2(PROBLEM, "Disconnected from storage: %s"
% repr(self._server_addr))
self._connection = None
self._server = disconnected_stub
......@@ -644,6 +686,7 @@ class ClientStorage:
self._serial = id
self._seriald.clear()
del self._serials[:]
self._tbuf.clear()
def end_transaction(self):
"""Internal helper to end a transaction."""
......@@ -678,12 +721,13 @@ class ClientStorage:
if f is not None:
f()
self._server.tpc_finish(self._serial)
tid = self._server.tpc_finish(self._serial)
r = self._check_serials()
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
self._update_cache()
self._cache.setLastTid(tid)
finally:
self.end_transaction()
......@@ -779,12 +823,6 @@ class ClientStorage:
"""Server callback to update the info dictionary."""
self._info.update(dict)
def beginVerify(self):
"""Server callback to signal start of cache validation."""
self._tfile = tempfile.TemporaryFile(suffix=".inv")
self._pickler = cPickle.Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo
def invalidateVerify(self, args):
"""Server callback to invalidate an (oid, version) pair.
......@@ -802,6 +840,7 @@ class ClientStorage:
if self._pickler is None:
return
self._pickler.dump((0,0))
self._pickler = None
self._tfile.seek(0)
unpick = cPickle.Unpickler(self._tfile)
f = self._tfile
......@@ -815,29 +854,26 @@ class ClientStorage:
self._db.invalidate(oid, version=version)
f.close()
def invalidateTrans(self, args):
"""Server callback to invalidate a list of (oid, version) pairs.
This is called as the result of a transaction.
"""
def invalidateTransaction(self, tid, args):
"""Invalidate objects modified by tid."""
self._cache.setLastTid(tid)
if self._pickler is not None:
self.log("Transactional invalidation during cache verification",
level=zLOG.BLATHER)
for t in args:
self.self._pickler.dump(t)
return
db = self._db
for oid, version in args:
self._cache.invalidate(oid, version=version)
try:
self._db.invalidate(oid, version=version)
except AttributeError, msg:
log2(PROBLEM,
"Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
repr(version),
msg))
# Unfortunately, the ZEO 2 wire protocol uses different names for
# several of the callback methods invoked by the StorageServer.
# We can't change the wire protocol at this point because that
# would require synchronized updates of clients and servers and we
# don't want that. So here we alias the old names to their new
# implementations.
begin = beginVerify
if db is not None:
db.invalidate(oid, version=version)
# The following are for compatibility with protocol version 2.0.0
def invalidateTrans(self, args):
return self.invalidateTransaction(None, args)
invalidate = invalidateVerify
end = endVerify
Invalidate = invalidateTrans
......
......@@ -44,16 +44,16 @@ class ClientStorage:
self.rpc = rpc
def beginVerify(self):
self.rpc.callAsync('begin')
self.rpc.callAsync('beginVerify')
def invalidateVerify(self, args):
self.rpc.callAsync('invalidate', args)
self.rpc.callAsync('invalidateVerify', args)
def endVerify(self):
self.rpc.callAsync('end')
self.rpc.callAsync('endVerify')
def invalidateTrans(self, args):
self.rpc.callAsync('Invalidate', args)
def invalidateTransaction(self, tid, args):
self.rpc.callAsync('invalidateTransaction', tid, args)
def serialnos(self, arg):
self.rpc.callAsync('serialnos', arg)
......
......@@ -32,6 +32,9 @@ class StorageServer:
zrpc.connection.Connection class.
"""
self.rpc = rpc
if self.rpc.peer_protocol_version == 'Z200':
self.lastTransaction = lambda: None
self.getInvalidations = lambda tid: None
def extensionMethod(self, name):
return ExtensionMethodWrapper(self.rpc, name).call
......@@ -51,8 +54,13 @@ class StorageServer:
def get_info(self):
return self.rpc.call('get_info')
def beginZeoVerify(self):
self.rpc.callAsync('beginZeoVerify')
def lastTransaction(self):
# Not in protocol version 2.0.0; see __init__()
return self.rpc.call('lastTransaction')
def getInvalidations(self, tid):
# Not in protocol version 2.0.0; see __init__()
return self.rpc.call('getInvalidations', tid)
def zeoVerify(self, oid, s, sv):
self.rpc.callAsync('zeoVerify', oid, s, sv)
......
This diff is collapsed.
......@@ -117,18 +117,12 @@ def main():
# Must be a misaligned record caused by a crash
##print "Skipping 8 bytes at offset", offset-8
continue
oid = f_read(8)
if len(oid) < 8:
r = f_read(16)
if len(r) < 16:
break
if heuristic and oid[:4] != '\0\0\0\0':
f.seek(-8, 1)
continue
offset += 8
serial = f_read(8)
if len(serial) < 8:
break
offset += 8
offset += 16
records += 1
oid, serial = struct_unpack(">8s8s", r)
# Decode the code
dlen, version, code, current = (code & 0x7fffff00,
code & 0x80,
......
......@@ -153,24 +153,14 @@ def main():
if ts == 0:
# Must be a misaligned record caused by a crash
if not quiet:
print "Skipping 8 bytes at offset", offset-8,
print repr(r)
print "Skipping 8 bytes at offset", offset-8
continue
oid = f_read(8)
if len(oid) < 8:
r = f_read(16)
if len(r) < 16:
break
if heuristic and oid[:4] != '\0\0\0\0':
# Heuristic for severe data corruption
print "Seeking back over bad oid at offset", offset,
print repr(r)
f.seek(-8, 1)
continue
offset += 8
serial = f_read(8)
if len(serial) < 8:
break
offset += 8
offset += 16
records += 1
oid, serial = struct_unpack(">8s8s", r)
if t0 is None:
t0 = ts
thisinterval = t0 / interval
......
......@@ -20,7 +20,9 @@ import select
import socket
import asyncore
import tempfile
import thread # XXX do we really need to catch thread.error
import threading
import time
import zLOG
......@@ -36,9 +38,18 @@ from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
from ZODB.tests.StorageTestBase import handle_all_serials, ZERO
class TestClientStorage(ClientStorage):
def verify_cache(self, stub):
self.end_verify = threading.Event()
self.verify_result = ClientStorage.verify_cache(self, stub)
def endVerify(self):
ClientStorage.endVerify(self)
self.end_verify.set()
class DummyDB:
def invalidate(self, *args, **kws):
def invalidate(self, *args, **kwargs):
pass
......@@ -48,6 +59,7 @@ class CommonSetupTearDown(StorageTestBase):
__super_setUp = StorageTestBase.setUp
__super_tearDown = StorageTestBase.tearDown
keep = 0
invq = None
def setUp(self):
"""Test setup for connection tests.
......@@ -99,17 +111,15 @@ class CommonSetupTearDown(StorageTestBase):
raise NotImplementedError
def openClientStorage(self, cache='', cache_size=200000, wait=1,
read_only=0, read_only_fallback=0,
addr=None):
if addr is None:
addr = self.addr
storage = ClientStorage(addr,
client=cache,
cache_size=cache_size,
wait=wait,
min_disconnect_poll=0.1,
read_only=read_only,
read_only_fallback=read_only_fallback)
read_only=0, read_only_fallback=0):
base = TestClientStorage(self.addr,
client=cache,
cache_size=cache_size,
wait=wait,
min_disconnect_poll=0.1,
read_only=read_only,
read_only_fallback=read_only_fallback)
storage = base
storage.registerDB(DummyDB(), None)
return storage
......@@ -121,7 +131,7 @@ class CommonSetupTearDown(StorageTestBase):
path = "%s.%d" % (self.file, index)
conf = self.getConfig(path, create, read_only)
zeoport, adminaddr, pid = forker.start_zeo_server(
conf, addr, ro_svr, self.keep)
conf, addr, ro_svr, self.keep, self.invq)
self._pids.append(pid)
self._servers.append(adminaddr)
......@@ -420,9 +430,9 @@ class ConnectionTests(CommonSetupTearDown):
for t in threads:
t.closeclients()
class ReconnectionTests(CommonSetupTearDown):
keep = 1
invq = 2
def checkReadOnlyStorage(self):
# Open a read-only client to a read-only *storage*; stores fail
......@@ -557,6 +567,113 @@ class ReconnectionTests(CommonSetupTearDown):
else:
self.fail("Couldn't store after starting a read-write server")
def checkNoVerificationOnServerRestart(self):
self._storage = self.openClientStorage()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
self._dostore()
self.shutdownServer()
self.pollDown()
self._storage.verify_result = None
self.startServer(create=0)
self.pollUp()
# There were no transactions committed, so no verification
# should be needed.
self.assertEqual(self._storage.verify_result, "no verification")
def checkNoVerificationOnServerRestartWith2Clients(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
self._dostore(oid, revid)
perstorage.load(oid, '')
self.shutdownServer()
self.pollDown()
self._storage.verify_result = None
perstorage.verify_result = None
self.startServer(create=0)
self.pollUp()
# There were no transactions committed, so no verification
# should be needed.
self.assertEqual(self._storage.verify_result, "no verification")
perstorage.close()
self.assertEqual(perstorage.verify_result, "no verification")
def checkQuickVerificationWith2Clients(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "quick verification")
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
def checkVerificationWith2ClientsInvqOverflow(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
self._storage = self.openClientStorage()
oid = self._storage.new_oid()
# When we create a new storage, it should always do a full
# verification
self.assertEqual(self._storage.verify_result, "full verification")
# do two storages of the object to make sure an invalidation
# message is generated
revid = self._dostore(oid)
revid = self._dostore(oid, revid)
perstorage.load(oid, '')
perstorage.close()
# the test code sets invq bound to 2
for i in range(5):
revid = self._dostore(oid, revid)
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
t = time.time() + 30
while not perstorage.end_verify.isSet():
perstorage.sync()
if time.time() > t:
self.fail("timed out waiting for endVerify")
self.assertEqual(self._storage.load(oid, '')[1], revid)
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
perstorage.close()
class MSTThread(threading.Thread):
......
......@@ -51,7 +51,7 @@ def get_port():
raise RuntimeError, "Can't find port"
def start_zeo_server(conf, addr=None, ro_svr=0, keep=0):
def start_zeo_server(conf, addr=None, ro_svr=0, keep=0, invq=None):
"""Start a ZEO server in a separate process.
Returns the ZEO port, the test server port, and the pid.
......@@ -77,6 +77,8 @@ def start_zeo_server(conf, addr=None, ro_svr=0, keep=0):
args.append('-r')
if keep:
args.append('-k')
if invq:
args += ['-Q', str(invq)]
args.append(str(port))
d = os.environ.copy()
d['PYTHONPATH'] = os.pathsep.join(sys.path)
......
......@@ -261,6 +261,19 @@ class ClientCacheTests(unittest.TestCase):
self.assert_(None is not cache._index.get(oid1) < 0)
self.assert_(None is not cache._index.get(oid2) < 0)
def testLastTid(self):
cache = self.cache
self.failUnless(cache.getLastTid() is None)
ltid = 'pqrstuvw'
cache.setLastTid(ltid)
self.assertEqual(cache.getLastTid(), ltid)
cache.checkSize(10*self.cachesize) # Force a file flip
self.assertEqual(cache.getLastTid(), ltid)
cache.setLastTid(None)
self.failUnless(cache.getLastTid() is None)
cache.checkSize(10*self.cachesize) # Force a file flip
self.failUnless(cache.getLastTid() is None)
class PersistentClientCacheTests(unittest.TestCase):
def setUp(self):
......@@ -348,6 +361,26 @@ class PersistentClientCacheTests(unittest.TestCase):
self.fail("invalidated data resurrected, size %d, was %d" %
(len(loaded[0]), len(data)))
def testPersistentLastTid(self):
cache = self.cache
self.failUnless(cache.getLastTid() is None)
ltid = 'pqrstuvw'
cache.setLastTid(ltid)
self.assertEqual(cache.getLastTid(), ltid)
oid = 'abcdefgh'
data = '1234'
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
self.assertEqual(cache.getLastTid(), ltid)
cache.checkSize(10*self.cachesize) # Force a file flip
self.assertEqual(cache.getLastTid(), ltid)
cache = self.reopenCache()
self.assertEqual(cache.getLastTid(), ltid)
cache.setLastTid(None)
self.failUnless(cache.getLastTid() is None)
cache.checkSize(10*self.cachesize) # Force a file flip
self.failUnless(cache.getLastTid() is None)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ClientCacheTests))
......
......@@ -116,8 +116,9 @@ def main():
ro_svr = 0
keep = 0
configfile = None
invalidation_queue_size = 100
# Parse the arguments and let getopt.error percolate
opts, args = getopt.getopt(sys.argv[1:], 'rkC:')
opts, args = getopt.getopt(sys.argv[1:], 'rkC:Q:')
for opt, arg in opts:
if opt == '-r':
ro_svr = 1
......@@ -125,6 +126,8 @@ def main():
keep = 1
elif opt == '-C':
configfile = arg
elif opt == '-Q':
invalidation_queue_size = int(arg)
# Open the config file and let ZConfig parse the data there. Then remove
# the config file, otherwise we'll leave turds.
fp = open(configfile, 'r')
......@@ -145,7 +148,9 @@ def main():
sys.exit(2)
addr = ('', zeo_port)
log(label, 'creating the storage server')
serv = ZEO.StorageServer.StorageServer(addr, {'1': storage}, ro_svr)
serv = ZEO.StorageServer.StorageServer(
addr, {'1': storage}, ro_svr,
invalidation_queue_size=invalidation_queue_size)
log(label, 'entering ThreadedAsync loop')
ThreadedAsync.LoopCallback.loop()
......
......@@ -119,7 +119,7 @@ class ConnectionManager:
# XXX need each connection started with async==0 to have a
# callback
log("CM.set_async(%s)" % repr(map))
log("CM.set_async(%s)" % repr(map), level=zLOG.DEBUG)
if not self.closed and self.trigger is None:
log("CM.set_async(): first call")
self.trigger = trigger()
......@@ -294,6 +294,9 @@ class ConnectThread(threading.Thread):
if success > 0:
break
time.sleep(delay)
if self.mgr.is_connected():
log("CT: still trying to replace fallback connection",
level=zLOG.INFO)
delay = min(delay*2, self.tmax)
log("CT: exiting thread: %s" % self.getName())
......
This diff is collapsed.
......@@ -16,6 +16,7 @@ import asyncore
import os
import socket
import thread
import errno
if os.name == 'posix':
......
......@@ -12,11 +12,9 @@
#
##############################################################################
"""Handy standard storage machinery
"""
# Do this portably in the face of checking out with -kv
import string
__version__ = string.split('$Revision: 1.30 $')[-2:][0]
$Id: BaseStorage.py,v 1.31 2003/01/03 22:07:43 jeremy Exp $
"""
import cPickle
import ThreadLock, bpthread
import time, UndoLogCompatible
......@@ -277,8 +275,8 @@ class BaseStorage(UndoLogCompatible.UndoLogCompatible):
restoring = 1
else:
restoring = 0
for transaction in other.iterator():
fiter = other.iterator()
for transaction in fiter:
tid=transaction.tid
if _ts is None:
_ts=TimeStamp(tid)
......@@ -313,6 +311,8 @@ class BaseStorage(UndoLogCompatible.UndoLogCompatible):
self.tpc_vote(transaction)
self.tpc_finish(transaction)
fiter.close()
class TransactionRecord:
"""Abstract base class for iterator protocol"""
......
This diff is collapsed.
......@@ -38,6 +38,7 @@ class IteratorCompare:
eq(zodb_unpickle(rec.data), MinPO(val))
val = val + 1
eq(val, val0 + len(revids))
txniter.close()
class IteratorStorage(IteratorCompare):
......@@ -191,3 +192,5 @@ class IteratorDeepCompare:
# they were the same length
self.assertRaises(IndexError, iter1.next)
self.assertRaises(IndexError, iter2.next)
iter1.close()
iter2.close()
......@@ -72,6 +72,101 @@ class FileStorageTests(
else:
self.fail("expect long user field to raise error")
def check_use_fsIndex(self):
from ZODB.fsIndex import fsIndex
self.assertEqual(self._storage._index.__class__, fsIndex)
# XXX We could really use some tests for sanity checking
def check_conversion_to_fsIndex_not_if_readonly(self):
self.tearDown()
class OldFileStorage(ZODB.FileStorage.FileStorage):
def _newIndexes(self):
return {}, {}, {}, {}
from ZODB.fsIndex import fsIndex
# Hack FileStorage to create dictionary indexes
self._storage = OldFileStorage('FileStorageTests.fs')
self.assertEqual(type(self._storage._index), type({}))
for i in range(10):
self._dostore()
# Should save the index
self._storage.close()
self._storage = ZODB.FileStorage.FileStorage(
'FileStorageTests.fs', read_only=1)
self.assertEqual(type(self._storage._index), type({}))
def check_conversion_to_fsIndex(self):
self.tearDown()
class OldFileStorage(ZODB.FileStorage.FileStorage):
def _newIndexes(self):
return {}, {}, {}, {}
from ZODB.fsIndex import fsIndex
# Hack FileStorage to create dictionary indexes
self._storage = OldFileStorage('FileStorageTests.fs')
self.assertEqual(type(self._storage._index), type({}))
for i in range(10):
self._dostore()
oldindex = self._storage._index.copy()
# Should save the index
self._storage.close()
self._storage = ZODB.FileStorage.FileStorage('FileStorageTests.fs')
self.assertEqual(self._storage._index.__class__, fsIndex)
self.failUnless(self._storage._used_index)
index = {}
for k, v in self._storage._index.items():
index[k] = v
self.assertEqual(index, oldindex)
def check_save_after_load_with_no_index(self):
for i in range(10):
self._dostore()
self._storage.close()
os.remove('FileStorageTests.fs.index')
self.open()
self.assertEqual(self._storage._saved, 1)
# This would make the unit tests too slow
# check_save_after_load_that_worked_hard(self)
def check_periodic_save_index(self):
# Check the basic algorithm
oldsaved = self._storage._saved
self._storage._records_before_save = 10
for i in range(4):
self._dostore()
self.assertEqual(self._storage._saved, oldsaved)
self._dostore()
self.assertEqual(self._storage._saved, oldsaved+1)
# Now make sure the parameter changes as we get bigger
for i in range(20):
self._dostore()
self.failUnless(self._storage._records_before_save > 20)
class FileStorageRecoveryTest(
StorageTestBase.StorageTestBase,
RecoveryStorage.RecoveryStorage,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment