Commit 5e6fd05f authored by Jim Fulton's avatar Jim Fulton

Make the cache thread safe again. :/

There are some cases where we want to touch the cache outside of the
I/O thread. Including:

- ClientStorage wants to to invalidata cache entries if it gets a
  conflict error in voting.

- loadBefore can probably be optimized by checking the cache in the
  client thread. (This will be safe for loadBefore, because the intent
  in practice is never to load current data.
parent 062cbecc
...@@ -33,7 +33,7 @@ import time ...@@ -33,7 +33,7 @@ import time
import ZODB.fsIndex import ZODB.fsIndex
import zc.lockfile import zc.lockfile
from ZODB.utils import p64, u64, z64 from ZODB.utils import p64, u64, z64, RLock
import six import six
from ._compat import PYPY from ._compat import PYPY
...@@ -182,6 +182,8 @@ class ClientCache(object): ...@@ -182,6 +182,8 @@ class ClientCache(object):
# currentofs. # currentofs.
self.currentofs = ZEC_HEADER_SIZE self.currentofs = ZEC_HEADER_SIZE
self._lock = RLock()
# self.f is the open file object. # self.f is the open file object.
# When we're not reusing an existing file, self.f is left None # When we're not reusing an existing file, self.f is left None
# here -- the scan() method must be called then to open the file # here -- the scan() method must be called then to open the file
...@@ -239,9 +241,10 @@ class ClientCache(object): ...@@ -239,9 +241,10 @@ class ClientCache(object):
return self return self
def clear(self): def clear(self):
self.f.seek(ZEC_HEADER_SIZE) with self._lock:
self.f.truncate() self.f.seek(ZEC_HEADER_SIZE)
self._initfile(ZEC_HEADER_SIZE) self.f.truncate()
self._initfile(ZEC_HEADER_SIZE)
## ##
# Scan the current contents of the cache file, calling `install` # Scan the current contents of the cache file, calling `install`
...@@ -451,26 +454,28 @@ class ClientCache(object): ...@@ -451,26 +454,28 @@ class ClientCache(object):
# new tid must be strictly greater than our current idea of the most # new tid must be strictly greater than our current idea of the most
# recent tid. # recent tid.
def setLastTid(self, tid): def setLastTid(self, tid):
if (not tid) or (tid == z64): with self._lock:
return if (not tid) or (tid == z64):
if (tid <= self.tid) and self._len: return
if tid == self.tid: if (tid <= self.tid) and self._len:
return # Be a little forgiving if tid == self.tid:
raise ValueError("new last tid (%s) must be greater than " return # Be a little forgiving
"previous one (%s)" raise ValueError("new last tid (%s) must be greater than "
% (u64(tid), u64(self.tid))) "previous one (%s)"
assert isinstance(tid, bytes) and len(tid) == 8, tid % (u64(tid), u64(self.tid)))
self.tid = tid assert isinstance(tid, bytes) and len(tid) == 8, tid
self.f.seek(len(magic)) self.tid = tid
self.f.write(tid) self.f.seek(len(magic))
self.f.flush() self.f.write(tid)
self.f.flush()
## ##
# Return the last transaction seen by the cache. # Return the last transaction seen by the cache.
# @return a transaction id # @return a transaction id
# @defreturn string, or 8 nulls if no transaction is yet known # @defreturn string, or 8 nulls if no transaction is yet known
def getLastTid(self): def getLastTid(self):
return self.tid with self._lock:
return self.tid
## ##
# Return the current data record for oid. # Return the current data record for oid.
...@@ -479,52 +484,54 @@ class ClientCache(object): ...@@ -479,52 +484,54 @@ class ClientCache(object):
# in the cache # in the cache
# @defreturn 3-tuple: (string, string, string) # @defreturn 3-tuple: (string, string, string)
def load(self, oid, before_tid=None): def load(self, oid, before_tid=None):
ofs = self.current.get(oid) with self._lock:
if ofs is None: ofs = self.current.get(oid)
self._trace(0x20, oid) if ofs is None:
return None self._trace(0x20, oid)
self.f.seek(ofs) return None
read = self.f.read
status = read(1)
assert status == b'a', (ofs, self.f.tell(), oid)
size, saved_oid, tid, end_tid, lver, ldata = unpack(
">I8s8s8sHI", read(34))
assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
assert end_tid == z64, (ofs, self.f.tell(), oid, tid, end_tid)
assert lver == 0, "Versions aren't supported"
if before_tid and tid >= before_tid:
return None
data = read(ldata)
assert len(data) == ldata, (ofs, self.f.tell(), oid, len(data), ldata)
# WARNING: The following assert changes the file position.
# We must not depend on this below or we'll fail in optimized mode.
assert read(8) == oid, (ofs, self.f.tell(), oid)
self._n_accesses += 1
self._trace(0x22, oid, tid, end_tid, ldata)
ofsofs = self.currentofs - ofs
if ofsofs < 0:
ofsofs += self.maxsize
if (ofsofs > self.rearrange and
self.maxsize > 10*len(data) and
size > 4):
# The record is far back and might get evicted, but it's
# valuable, so move it forward.
# Remove fromn old loc:
del self.current[oid]
self.f.seek(ofs) self.f.seek(ofs)
self.f.write(b'f'+pack(">I", size)) read = self.f.read
status = read(1)
assert status == b'a', (ofs, self.f.tell(), oid)
size, saved_oid, tid, end_tid, lver, ldata = unpack(
">I8s8s8sHI", read(34))
assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
assert end_tid == z64, (ofs, self.f.tell(), oid, tid, end_tid)
assert lver == 0, "Versions aren't supported"
if before_tid and tid >= before_tid:
return None
data = read(ldata)
assert len(data) == ldata, (
ofs, self.f.tell(), oid, len(data), ldata)
# WARNING: The following assert changes the file position.
# We must not depend on this below or we'll fail in optimized mode.
assert read(8) == oid, (ofs, self.f.tell(), oid)
self._n_accesses += 1
self._trace(0x22, oid, tid, end_tid, ldata)
# Write to new location: ofsofs = self.currentofs - ofs
self._store(oid, tid, None, data, size) if ofsofs < 0:
ofsofs += self.maxsize
return data, tid if (ofsofs > self.rearrange and
self.maxsize > 10*len(data) and
size > 4):
# The record is far back and might get evicted, but it's
# valuable, so move it forward.
# Remove fromn old loc:
del self.current[oid]
self.f.seek(ofs)
self.f.write(b'f'+pack(">I", size))
# Write to new location:
self._store(oid, tid, None, data, size)
return data, tid
## ##
# Return a non-current revision of oid that was current before tid. # Return a non-current revision of oid that was current before tid.
...@@ -533,54 +540,56 @@ class ClientCache(object): ...@@ -533,54 +540,56 @@ class ClientCache(object):
# @return data record, serial number, start tid, and end tid # @return data record, serial number, start tid, and end tid
# @defreturn 4-tuple: (string, string, string, string) # @defreturn 4-tuple: (string, string, string, string)
def loadBefore(self, oid, before_tid): def loadBefore(self, oid, before_tid):
noncurrent_for_oid = self.noncurrent.get(u64(oid)) with self._lock:
if noncurrent_for_oid is None: noncurrent_for_oid = self.noncurrent.get(u64(oid))
result = self.load(oid, before_tid) if noncurrent_for_oid is None:
if result: result = self.load(oid, before_tid)
return result[0], result[1], None if result:
else: return result[0], result[1], None
self._trace(0x24, oid, "", before_tid) else:
return result self._trace(0x24, oid, "", before_tid)
return result
items = noncurrent_for_oid.items(None, u64(before_tid)-1)
if not items: items = noncurrent_for_oid.items(None, u64(before_tid)-1)
result = self.load(oid, before_tid) if not items:
if result: result = self.load(oid, before_tid)
return result[0], result[1], None if result:
else: return result[0], result[1], None
self._trace(0x24, oid, "", before_tid) else:
return result self._trace(0x24, oid, "", before_tid)
return result
tid, ofs = items[-1] tid, ofs = items[-1]
self.f.seek(ofs) self.f.seek(ofs)
read = self.f.read read = self.f.read
status = read(1) status = read(1)
assert status == b'a', (ofs, self.f.tell(), oid, before_tid) assert status == b'a', (ofs, self.f.tell(), oid, before_tid)
size, saved_oid, saved_tid, end_tid, lver, ldata = unpack( size, saved_oid, saved_tid, end_tid, lver, ldata = unpack(
">I8s8s8sHI", read(34)) ">I8s8s8sHI", read(34))
assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid) assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
assert saved_tid == p64(tid), (ofs, self.f.tell(), oid, saved_tid, tid) assert saved_tid == p64(tid), (
assert end_tid != z64, (ofs, self.f.tell(), oid) ofs, self.f.tell(), oid, saved_tid, tid)
assert lver == 0, "Versions aren't supported" assert end_tid != z64, (ofs, self.f.tell(), oid)
data = read(ldata) assert lver == 0, "Versions aren't supported"
assert len(data) == ldata, (ofs, self.f.tell()) data = read(ldata)
assert len(data) == ldata, (ofs, self.f.tell())
# WARNING: The following assert changes the file position.
# We must not depend on this below or we'll fail in optimized mode. # WARNING: The following assert changes the file position.
assert read(8) == oid, (ofs, self.f.tell(), oid) # We must not depend on this below or we'll fail in optimized mode.
assert read(8) == oid, (ofs, self.f.tell(), oid)
if end_tid < before_tid:
result = self.load(oid, before_tid) if end_tid < before_tid:
if result: result = self.load(oid, before_tid)
return result[0], result[1], None if result:
else: return result[0], result[1], None
self._trace(0x24, oid, "", before_tid) else:
return result self._trace(0x24, oid, "", before_tid)
return result
self._n_accesses += 1 self._n_accesses += 1
self._trace(0x26, oid, "", saved_tid) self._trace(0x26, oid, "", saved_tid)
return data, saved_tid, end_tid return data, saved_tid, end_tid
## ##
# Store a new data record in the cache. # Store a new data record in the cache.
...@@ -591,45 +600,48 @@ class ClientCache(object): ...@@ -591,45 +600,48 @@ class ClientCache(object):
# current. # current.
# @param data the actual data # @param data the actual data
def store(self, oid, start_tid, end_tid, data): def store(self, oid, start_tid, end_tid, data):
seek = self.f.seek with self._lock:
if end_tid is None: seek = self.f.seek
ofs = self.current.get(oid) if end_tid is None:
if ofs: ofs = self.current.get(oid)
seek(ofs) if ofs:
read = self.f.read seek(ofs)
status = read(1) read = self.f.read
assert status == b'a', (ofs, self.f.tell(), oid) status = read(1)
size, saved_oid, saved_tid, end_tid = unpack( assert status == b'a', (ofs, self.f.tell(), oid)
">I8s8s8s", read(28)) size, saved_oid, saved_tid, end_tid = unpack(
assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid) ">I8s8s8s", read(28))
assert end_tid == z64, (ofs, self.f.tell(), oid) assert saved_oid == oid, (
if saved_tid == start_tid: ofs, self.f.tell(), oid, saved_oid)
assert end_tid == z64, (ofs, self.f.tell(), oid)
if saved_tid == start_tid:
return
raise ValueError("already have current data for oid")
else:
noncurrent_for_oid = self.noncurrent.get(u64(oid))
if noncurrent_for_oid and (
u64(start_tid) in noncurrent_for_oid):
return return
raise ValueError("already have current data for oid")
else:
noncurrent_for_oid = self.noncurrent.get(u64(oid))
if noncurrent_for_oid and (u64(start_tid) in noncurrent_for_oid):
return
size = allocated_record_overhead + len(data) size = allocated_record_overhead + len(data)
# A number of cache simulation experiments all concluded that the # A number of cache simulation experiments all concluded that the
# 2nd-level ZEO cache got a much higher hit rate if "very large" # 2nd-level ZEO cache got a much higher hit rate if "very large"
# objects simply weren't cached. For now, we ignore the request # objects simply weren't cached. For now, we ignore the request
# only if the entire cache file is too small to hold the object. # only if the entire cache file is too small to hold the object.
if size >= min(max_block_size, self.maxsize - ZEC_HEADER_SIZE): if size >= min(max_block_size, self.maxsize - ZEC_HEADER_SIZE):
return return
self._n_adds += 1 self._n_adds += 1
self._n_added_bytes += size self._n_added_bytes += size
self._len += 1 self._len += 1
self._store(oid, start_tid, end_tid, data, size) self._store(oid, start_tid, end_tid, data, size)
if end_tid: if end_tid:
self._trace(0x54, oid, start_tid, end_tid, dlen=len(data)) self._trace(0x54, oid, start_tid, end_tid, dlen=len(data))
else: else:
self._trace(0x52, oid, start_tid, dlen=len(data)) self._trace(0x52, oid, start_tid, dlen=len(data))
def _store(self, oid, start_tid, end_tid, data, size): def _store(self, oid, start_tid, end_tid, data, size):
# Low-level store used by store and load # Low-level store used by store and load
...@@ -696,35 +708,37 @@ class ClientCache(object): ...@@ -696,35 +708,37 @@ class ClientCache(object):
# - tid the id of the transaction that wrote a new revision of oid, # - tid the id of the transaction that wrote a new revision of oid,
# or None to forget all cached info about oid. # or None to forget all cached info about oid.
def invalidate(self, oid, tid): def invalidate(self, oid, tid):
ofs = self.current.get(oid) with self._lock:
if ofs is None: ofs = self.current.get(oid)
# 0x10 == invalidate (miss) if ofs is None:
self._trace(0x10, oid, tid) # 0x10 == invalidate (miss)
return self._trace(0x10, oid, tid)
return
self.f.seek(ofs)
read = self.f.read
status = read(1)
assert status == b'a', (ofs, self.f.tell(), oid)
size, saved_oid, saved_tid, end_tid = unpack(">I8s8s8s", read(28))
assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
assert end_tid == z64, (ofs, self.f.tell(), oid)
del self.current[oid]
if tid is None:
self.f.seek(ofs) self.f.seek(ofs)
self.f.write(b'f'+pack(">I", size)) read = self.f.read
# 0x1E = invalidate (hit, discarding current or non-current) status = read(1)
self._trace(0x1E, oid, tid) assert status == b'a', (ofs, self.f.tell(), oid)
self._len -= 1 size, saved_oid, saved_tid, end_tid = unpack(">I8s8s8s", read(28))
else: assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
if tid == saved_tid: assert end_tid == z64, (ofs, self.f.tell(), oid)
logger.warning("Ignoring invalidation with same tid as current") del self.current[oid]
return if tid is None:
self.f.seek(ofs+21) self.f.seek(ofs)
self.f.write(tid) self.f.write(b'f'+pack(">I", size))
self._set_noncurrent(oid, saved_tid, ofs) # 0x1E = invalidate (hit, discarding current or non-current)
# 0x1C = invalidate (hit, saving non-current) self._trace(0x1E, oid, tid)
self._trace(0x1C, oid, tid) self._len -= 1
else:
if tid == saved_tid:
logger.warning(
"Ignoring invalidation with same tid as current")
return
self.f.seek(ofs+21)
self.f.write(tid)
self._set_noncurrent(oid, saved_tid, ofs)
# 0x1C = invalidate (hit, saving non-current)
self._trace(0x1C, oid, tid)
## ##
# Generates (oid, serial) oairs for all objects in the # Generates (oid, serial) oairs for all objects in the
......
...@@ -24,8 +24,7 @@ class StaleCache(object): ...@@ -24,8 +24,7 @@ class StaleCache(object):
class IClientCache(zope.interface.Interface): class IClientCache(zope.interface.Interface):
"""Client cache interface. """Client cache interface.
Note that caches need not be thread safe, fpr the most part, Note that caches need to be thread safe.
except for getLastTid, which may be called from multiple threads.
""" """
def close(): def close():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment