Commit 44c8f9ae authored by Jim Fulton's avatar Jim Fulton

- (3.8a1) ZEO's strategoes for avoiding client cache verification were

  improved in the case that servers are restarted.  Before, if
  transactions were committed after the restart, clients that were up
  to date or nearly up to date at the time of the restart and then
  connected had to verify their caches.  Now, it is far more likely
  that a client that reconnects soon after a server restart won't have
  to verify its cache.

- Fixed a serious bug that could cause clients that disconnect from and
  reconnect to a server to get bad invalidation data if the server
  serves multiple storages with active writes.
parent 8bc95951
What's new on ZODB 3.8a1?
=========================
ZEO
---
- (3.8a1) ZEO's strategoes for avoiding client cache verification were
improved in the case that servers are restarted. Before, if
transactions were committed after the restart, clients that were up
to date or nearly up to date at the time of the restart and then
connected had to verify their caches. Now, it is far more likely
that a client that reconnects soon after a server restart won't have
to verify its cache.
- Fixed a serious bug that could cause clients that disconnect from and
reconnect to a server to get bad invalidation data if the server
serves multiple storages with active writes.
Transactions
------------
......
......@@ -283,7 +283,7 @@ class ZEOStorage:
return p, s, v, pv, sv
def getInvalidations(self, tid):
invtid, invlist = self.server.get_invalidations(tid)
invtid, invlist = self.server.get_invalidations(self.storage_id, tid)
if invtid is None:
return None
self.log("Return %d invalidations up to tid %s"
......@@ -787,11 +787,20 @@ class StorageServer:
self.database = None
if auth_protocol:
self._setup_auth(auth_protocol)
# A list of at most invalidation_queue_size invalidations.
# A list, by server, of at most invalidation_queue_size invalidations.
# The list is kept in sorted order with the most recent
# invalidation at the front. The list never has more than
# self.invq_bound elements.
self.invq = []
self.invq = {}
for name, storage in storages.items():
lastInvalidations = getattr(storage, 'lastInvalidations', None)
if lastInvalidations is None:
self.invq[name] = [(storage.lastTransaction(), None)]
else:
self.invq[name] = list(
lastInvalidations(invalidation_queue_size)
)
self.invq[name].reverse()
self.invq_bound = invalidation_queue_size
self.connections = {}
self.dispatcher = self.DispatcherClass(addr,
......@@ -906,17 +915,20 @@ class StorageServer:
the current client.
"""
if invalidated:
if len(self.invq) >= self.invq_bound:
self.invq.pop()
self.invq.insert(0, (tid, invalidated))
invq = self.invq[storage_id]
if len(invq) >= self.invq_bound:
invq.pop()
invq.insert(0, (tid, invalidated))
for p in self.connections.get(storage_id, ()):
if invalidated and p is not conn:
p.client.invalidateTransaction(tid, invalidated)
elif info is not None:
p.client.info(info)
def get_invalidations(self, tid):
def get_invalidations(self, storage_id, tid):
"""Return a tid and list of all objects invalidation since tid.
The tid is the most recent transaction id seen by the client.
......@@ -926,22 +938,23 @@ class StorageServer:
do full cache verification.
"""
if not self.invq:
invq = self.invq[storage_id]
if not invq:
log("invq empty")
return None, []
earliest_tid = self.invq[-1][0]
earliest_tid = invq[-1][0]
if earliest_tid > tid:
log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
return None, []
oids = {}
for _tid, L in self.invq:
for _tid, L in invq:
if _tid <= tid:
break
for key in L:
oids[key] = 1
latest_tid = self.invq[0][0]
latest_tid = invq[0][0]
return latest_tid, oids.keys()
def close_server(self):
......
......@@ -15,6 +15,7 @@
# System imports
import asyncore
import doctest
import logging
import os
import random
......@@ -27,6 +28,7 @@ import shutil
# ZODB test support
import ZODB
import ZODB.tests.util
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
......@@ -46,6 +48,8 @@ from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
import ZEO.tests.ConnectionTests
import ZEO.StorageServer
logger = logging.getLogger('ZEO.tests.testZEO')
class DummyDB:
......@@ -580,11 +584,212 @@ class BlobWritableCacheTests(GenericTests, CommonBlobTests):
super(BlobWritableCacheTests, self).setUp()
class StorageServerClientWrapper:
def __init__(self):
self.serials = []
def serialnos(self, serials):
self.serials.extend(serials)
def info(self, info):
pass
class StorageServerWrapper:
def __init__(self, server, storage_id):
self.storage_id = storage_id
self.server = ZEO.StorageServer.ZEOStorage(server, server.read_only)
self.server.register(storage_id, False)
self.server._thunk = lambda : None
self.server.client = StorageServerClientWrapper()
def sortKey(self):
return self.storage_id
def __getattr__(self, name):
return getattr(self.server, name)
def registerDB(self, *args):
pass
def supportsUndo(self):
return False
def supportsVersions(self):
return False
def new_oid(self):
return self.server.new_oids(1)[0]
def tpc_begin(self, transaction):
self.server.tpc_begin(id(transaction), '', '', {}, None, ' ')
def tpc_vote(self, transaction):
self.server._restart()
self.server.vote(id(transaction))
result = self.server.client.serials[:]
del self.server.client.serials[:]
return result
def store(self, oid, serial, data, version, transaction):
self.server.storea(oid, serial, data, version, id(transaction))
def tpc_finish(self, transaction, func = lambda: None):
self.server.tpc_finish(id(transaction))
def multiple_storages_invalidation_queue_is_not_insane():
"""
>>> from ZEO.StorageServer import StorageServer, ZEOStorage
>>> from ZODB.FileStorage import FileStorage
>>> from ZODB.DB import DB
>>> from persistent.dict import PersistentDict
>>> from transaction import commit
>>> fs1 = FileStorage('t1.fs')
>>> fs2 = FileStorage('t2.fs')
>>> server = StorageServer(('', get_port()), dict(fs1=fs1, fs2=fs2))
>>> s1 = StorageServerWrapper(server, 'fs1')
>>> s2 = StorageServerWrapper(server, 'fs2')
>>> db1 = DB(s1); conn1 = db1.open()
>>> db2 = DB(s2); conn2 = db2.open()
>>> commit()
>>> o1 = conn1.root()
>>> for i in range(10):
... o1.x = PersistentDict(); o1 = o1.x
... commit()
>>> last = fs1.lastTransaction()
>>> for i in range(5):
... o1.x = PersistentDict(); o1 = o1.x
... commit()
>>> o2 = conn2.root()
>>> for i in range(20):
... o2.x = PersistentDict(); o2 = o2.x
... commit()
>>> trans, oids = s1.getInvalidations(last)
>>> from ZODB.utils import u64
>>> sorted([u64(oid) for (oid, v) in oids])
[10L, 11L, 12L, 13L, 14L]
>>> server.close_server()
"""
def getInvalidationsAfterServerRestart():
"""
Clients were often forced to verify their caches after a server
restart even if there weren't many transactions between the server
restart and the client connect.
Let's create a file storage and stuff some data into it:
>>> from ZEO.StorageServer import StorageServer, ZEOStorage
>>> from ZODB.FileStorage import FileStorage
>>> from ZODB.DB import DB
>>> from persistent.dict import PersistentDict
>>> fs = FileStorage('t.fs')
>>> db = DB(fs)
>>> conn = db.open()
>>> from transaction import commit
>>> last = []
>>> for i in range(100):
... conn.root()[i] = PersistentDict()
... commit()
... last.append(fs.lastTransaction())
>>> db.close()
Now we'll open a storage server on the data, simulating a restart:
>>> fs = FileStorage('t.fs')
>>> sv = StorageServer(('', get_port()), dict(fs=fs))
>>> s = ZEOStorage(sv, sv.read_only)
>>> s.register('fs', False)
If we ask for the last transaction, we should get the last transaction
we saved:
>>> s.lastTransaction() == last[-1]
True
If a storage implements the method lastInvalidations, as FileStorage
does, then the stroage server will populate its invalidation data
structure using lastTransactions.
>>> tid, oids = s.getInvalidations(last[-10])
>>> tid == last[-1]
True
>>> from ZODB.utils import u64
>>> sorted([u64(oid) for (oid, version) in oids])
[0L, 92L, 93L, 94L, 95L, 96L, 97L, 98L, 99L, 100L]
(Note that the fact that we get oids for 92-100 is actually an
artifact of the fact that the FileStorage lastInvalidations method
returns all OIDs written by transactions, even if the OIDs were
created and not modified. FileStorages don't record whether objects
were created rather than modified. Objects that are just created don't
need to be invalidated. This means we'll invalidate objects that
dont' need to be invalidated, however, that's better than verifying
caches.)
>>> sv.close_server()
>>> fs.close()
If a storage doesn't implement lastInvalidations, a client can still
avoid verifying its cache if it was up to date when the server
restarted. To illustrate this, we'll create a subclass of FileStorage
without this method:
>>> class FS(FileStorage):
... lastInvalidations = property()
>>> fs = FS('t.fs')
>>> sv = StorageServer(('', get_port()), dict(fs=fs))
>>> st = StorageServerWrapper(sv, 'fs')
>>> s = st.server
Now, if we ask fior the invalidations since the last committed
transaction, we'll get a result:
>>> tid, oids = s.getInvalidations(last[-1])
>>> tid == last[-1]
True
>>> oids
[]
>>> db = DB(st); conn = db.open()
>>> ob = conn.root()
>>> for i in range(5):
... ob.x = PersistentDict(); ob = ob.x
... commit()
... last.append(fs.lastTransaction())
>>> ntid, oids = s.getInvalidations(tid)
>>> ntid == last[-1]
True
>>> sorted([u64(oid) for (oid, version) in oids])
[0L, 101L, 102L, 103L, 104L]
"""
test_classes = [FileStorageTests, MappingStorageTests,
BlobAdaptedFileStorageTests, BlobWritableCacheTests]
def test_suite():
suite = unittest.TestSuite()
suite.addTest(doctest.DocTestSuite(setUp=ZODB.tests.util.setUp,
tearDown=ZODB.tests.util.tearDown))
for klass in test_classes:
sub = unittest.makeSuite(klass, "check")
suite.addTest(sub)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment