Commit 66a6857a authored by Jim Fulton's avatar Jim Fulton

Refactored handling of invalidations on ZEO clients to fix

a possible ordering problem for invalidation messages.

This was motivated by code inspecition during merge of an earlier
refactoring to the trunk.  The refactoring also simplifies the code
and probably makes it a tad faster.
parent 0cac86d6
...@@ -4,6 +4,9 @@ Whats new in ZODB 3.8.1 ...@@ -4,6 +4,9 @@ Whats new in ZODB 3.8.1
Bugs Fixed: Bugs Fixed:
- (beta 8) Refactored handling of invalidations on ZEO clients to fix
a possible ordering problem for invalidation messages.
- (beta 8) An ZEO cache internal data structure can get out of sync - (beta 8) An ZEO cache internal data structure can get out of sync
with the data in a way that prevents data from being loaded into the with the data in a way that prevents data from being loaded into the
cache. We don't yet know why, but added an exception handler to cache. We don't yet know why, but added an exception handler to
......
...@@ -1240,11 +1240,9 @@ class ClientStorage(object): ...@@ -1240,11 +1240,9 @@ class ClientStorage(object):
# Invalidation as result of verify_cache(). # Invalidation as result of verify_cache().
# Queue an invalidate for the end the verification procedure. # Queue an invalidate for the end the verification procedure.
if self._pickler is None: if self._pickler is None:
# This should never happen. TODO: assert it doesn't, or log log2("invalidateVerify with no _pickler", level=logging.ERROR)
# if it does.
return return
oid, version = args self._pickler.dump((None, [args]))
self._pickler.dump((oid, version, None))
def endVerify(self): def endVerify(self):
"""Server callback to signal end of cache validation.""" """Server callback to signal end of cache validation."""
...@@ -1258,32 +1256,26 @@ class ClientStorage(object): ...@@ -1258,32 +1256,26 @@ class ClientStorage(object):
try: try:
if catch_up: if catch_up:
# process catch-up invalidations # process catch-up invalidations
tid, invalidations = catch_up self._process_invalidations(*catch_up)
self._process_invalidations(
(oid, version, tid)
for oid, version in invalidations
)
if self._pickler is None: if self._pickler is None:
return return
# write end-of-data marker # write end-of-data marker
self._pickler.dump((None, None, None)) self._pickler.dump((None, None))
self._pickler = None self._pickler = None
self._tfile.seek(0) self._tfile.seek(0)
unpickler = cPickle.Unpickler(self._tfile) unpickler = cPickle.Unpickler(self._tfile)
min_tid = self._cache.getLastTid() min_tid = self._cache.getLastTid()
def InvalidationLogIterator(): while 1:
while 1: tid, invalidations = unpickler.load()
oid, version, tid = unpickler.load() if invalidations is None:
if oid is None: break
break if ((tid is None)
if ((tid is None) or (min_tid is None)
or (min_tid is None) or (tid > min_tid)
or (tid > min_tid) ):
): self._process_invalidations(tid, invalidations)
yield oid, version, tid
self._process_invalidations(InvalidationLogIterator())
self._tfile.close() self._tfile.close()
self._tfile = None self._tfile = None
finally: finally:
...@@ -1301,34 +1293,32 @@ class ClientStorage(object): ...@@ -1301,34 +1293,32 @@ class ClientStorage(object):
if self._pickler is not None: if self._pickler is not None:
log2("Transactional invalidation during cache verification", log2("Transactional invalidation during cache verification",
level=BLATHER) level=BLATHER)
for oid, version in args: self._pickler.dump((tid, args))
self._pickler.dump((oid, version, tid))
return return
self._process_invalidations([(oid, version, tid) self._process_invalidations(tid, args)
for oid, version in args])
finally: finally:
self._lock.release() self._lock.release()
def _process_invalidations(self, invs): def _process_invalidations(self, tid, invs):
# Invalidations are sent by the ZEO server as a sequence of # Invalidations are sent by the ZEO server as a sequence of
# oid, version, tid triples. The DB's invalidate() method expects a # oid, version, tid triples. The DB's invalidate() method expects a
# dictionary of oids. # dictionary of oids.
# versions maps version names to dictionary of invalidations # versions maps version names to dictionary of invalidations
versions = {} versions = {}
for oid, version, tid in invs: for oid, version in invs:
if oid == self._load_oid: if oid == self._load_oid:
self._load_status = 0 self._load_status = 0
self._cache.invalidate(oid, version, tid) self._cache.invalidate(oid, version, tid)
oids = versions.get((version, tid)) oids = versions.get(version)
if not oids: if not oids:
versions[(version, tid)] = [oid] versions[version] = [oid]
else: else:
oids.append(oid) oids.append(oid)
if self._db is not None: if self._db is not None:
for (version, tid), d in versions.items(): for version, oids in versions.items():
self._db.invalidate(tid, d, version=version) self._db.invalidate(tid, oids, version=version)
# The following are for compatibility with protocol version 2.0.0 # The following are for compatibility with protocol version 2.0.0
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment