diff --git a/src/ZEO/ClientStorage.py b/src/ZEO/ClientStorage.py index 77413feebce4e8d7ad7dc7d00dfc1a31cf82be79..a51afb2522e3e5356b55936210004a70daffd528 100644 --- a/src/ZEO/ClientStorage.py +++ b/src/ZEO/ClientStorage.py @@ -903,25 +903,32 @@ class ClientStorage: return self._pickler.dump(args) + def _process_invalidations(self, invs): + # Invalidations are sent by the ZEO server as a sequence of + # oid, version pairs. The DB's invalidate() method expects a + # dictionary of oids. + + # versions maps version names to dictionary of invalidations + versions = {} + for oid, version in invs: + d = versions.setdefault(version, {}) + self._cache.invalidate(oid, version=version) + d[oid] = 1 + if self._db is not None: + for v, d in versions.items(): + self._db.invalidate(d, version=v) + def endVerify(self): """Server callback to signal end of cache validation.""" if self._pickler is None: return - self._pickler.dump((0,0)) + # write end-of-data marker + self._pickler.dump((None, None)) self._pickler = None self._tfile.seek(0) - unpick = cPickle.Unpickler(self._tfile) f = self._tfile self._tfile = None - - while 1: - oid, version = unpick.load() - log2(INFO, "verify invalidate %r" % oid) - if not oid: - break - self._cache.invalidate(oid, version=version) - if self._db is not None: - self._db.invalidate(oid, version=version) + self._process_invalidations(InvalidationLogIterator(f)) f.close() log2(INFO, "endVerify finishing") @@ -939,11 +946,7 @@ class ClientStorage: for t in args: self.self._pickler.dump(t) return - db = self._db - for oid, version in args: - self._cache.invalidate(oid, version=version) - if db is not None: - db.invalidate(oid, version=version) + self._process_invalidations(args) # The following are for compatibility with protocol version 2.0.0 @@ -954,4 +957,18 @@ class ClientStorage: end = endVerify Invalidate = invalidateTrans +class InvalidationLogIterator: + """Helper class for reading invalidations in endVerify.""" + # XXX will require extra work to backport to Python 2.1 + + def __init__(self, fileobj): + self._unpickler = cPickle.Unpickler(fileobj) + + def __iter__(self): + return self + def next(self): + oid, version = self._unpickler.load() + if oid is None: + raise StopIteration + return oid, version diff --git a/src/ZEO/tests/ConnectionTests.py b/src/ZEO/tests/ConnectionTests.py index d4a4e4d95f94c788a441e3bf52deee857191aea2..3e678e1fda687194aaf56501145afcb4cc606a34 100644 --- a/src/ZEO/tests/ConnectionTests.py +++ b/src/ZEO/tests/ConnectionTests.py @@ -31,6 +31,7 @@ from ZEO.Exceptions import ClientDisconnected from ZEO.zrpc.marshal import Marshaller from ZEO.tests import forker +from ZODB.DB import DB from ZODB.Transaction import get_transaction, Transaction from ZODB.POSException import ReadOnlyError from ZODB.tests.StorageTestBase import StorageTestBase @@ -61,6 +62,7 @@ class CommonSetupTearDown(StorageTestBase): invq = None timeout = None monitor = 0 + db_class = DummyDB def setUp(self): """Test setup for connection tests. @@ -472,6 +474,36 @@ class ConnectionTests(CommonSetupTearDown): for t in threads: t.closeclients() + def checkCrossDBInvalidations(self): + db1 = DB(self.openClientStorage()) + c1 = db1.open() + r1 = c1.root() + + r1["a"] = MinPO("a") + get_transaction().commit() + + db2 = DB(self.openClientStorage()) + r2 = db2.open().root() + + self.assertEqual(r2["a"].value, "a") + + r2["b"] = MinPO("b") + get_transaction().commit() + + # make sure the invalidation is received in the other client + c1._storage.sync() + self.assert_(r1._p_oid in c1._invalidated) + + # force the invalidations to be applied... + c1.setLocalTransaction() + c1.getTransaction().register(c1) + c1.getTransaction().abort() + r1.keys() # unghostify + self.assertEqual(r1._p_serial, r2._p_serial) + + db2.close() + db1.close() + class ReconnectionTests(CommonSetupTearDown): keep = 1 invq = 2