Commit bd901052 authored by Jim Fulton's avatar Jim Fulton

Major refactoring of cache code to reduce memory consumption, which

was astronomical.  Also greatly simplified the code.
parent a6605d20
Whats new in ZODB 3.8.1
=======================
......@@ -17,6 +19,9 @@ Bugs Fixed:
- A bug in the cache-opening logic led to cache failure in the
unlikely event that a cache has no free blocks.
- The cache used an excessive amount of memory, causing applications
with large caches to exhaust available memory.
- When using ZEO Client Storages, Errors occured when trying to store
objects too big to fit in the ZEO cache file.
......
......@@ -22,51 +22,54 @@ it to map this richer API onto the simple key-based API of the lower-level
FileCache.
"""
from struct import pack, unpack
import bisect
import BTrees.LLBTree
import BTrees.LOBTree
import logging
import os
import struct
import tempfile
import time
import ZODB.fsIndex
import ZODB.lock_file
from ZODB.utils import z64, u64
from ZODB.utils import p64, u64, z64
logger = logging.getLogger("ZEO.cache")
##
# A disk-based cache for ZEO clients.
# <p>
#
# This class provides an interface to a persistent, disk-based cache
# used by ZEO clients to store copies of database records from the
# server.
# <p>
#
# The details of the constructor as unspecified at this point.
# <p>
#
# Each entry in the cache is valid for a particular range of transaction
# ids. The lower bound is the transaction that wrote the data. The
# upper bound is the next transaction that wrote a revision of the
# object. If the data is current, the upper bound is stored as None;
# the data is considered current until an invalidate() call is made.
# <p>
#
# It is an error to call store() twice with the same object without an
# intervening invalidate() to set the upper bound on the first cache
# entry. <em>Perhaps it will be necessary to have a call the removes
# entry. Perhaps it will be necessary to have a call the removes
# something from the cache outright, without keeping a non-current
# entry.</em>
# <h3>Cache verification</h3>
# <p>
# entry.
# Cache verification
#
# When the client is connected to the server, it receives
# invalidations every time an object is modified. When the client is
# disconnected then reconnects, it must perform cache verification to make
# sure its cached data is synchronized with the storage's current state.
# <p>
#
# quick verification
# full verification
# <p>
#
##
# FileCache stores a cache in a single on-disk file.
#
# On-disk cache structure.
......@@ -100,11 +103,13 @@ ZEC3_HEADER_SIZE = 12
#
# 1 byte allocation status ('a').
# 4 bytes block size, >I format.
# 16 bytes oid + tid, string.
# size-OBJECT_HEADER_SIZE bytes, the serialization of an Object (see
# class Object for details).
OBJECT_HEADER_SIZE = 1 + 4 + 16
# 8 byte oid
# 8 byte start_tid
# 8 byte end_tid
# 2 byte version length
# 4 byte data size
# version
# data
# The cache's currentofs goes around the file, circularly, forever.
# It's always the starting offset of some block.
......@@ -120,11 +125,6 @@ OBJECT_HEADER_SIZE = 1 + 4 + 16
class ClientCache(object):
"""A simple in-memory cache."""
##
# Do we put the constructor here?
# @param path path of persistent snapshot of cache state (a file path)
# @param size size of cache file, in bytes
# The default size of 200MB makes a lot more sense than the traditional
# default of 20MB. The default here is misleading, though, since
# ClientStorage is the only user of ClientCache, and it always passes an
......@@ -140,57 +140,22 @@ class ClientCache(object):
# to change the cache size in that case
self.maxsize = size
# The cache stores objects in a dict mapping (oid, tid) pairs
# to Object() records (see below). The tid is the transaction
# id that wrote the object. An object record includes data,
# serialno, and end tid. It has auxillary data structures to
# compute the appropriate tid, given the oid and a transaction id
# representing an arbitrary point in history.
#
# The serialized form of the cache just stores the Object()
# records. The in-memory form can be reconstructed from these
# records.
# Maps oid to current tid. Used to compute key for objects.
self.current = {}
# The number of records in the cache.
self._len = 0
# Maps oid to list of (start_tid, end_tid) pairs in sorted order.
# Used to find matching key for load of non-current data.
self.noncurrent = {}
# {oid -> pos}
self.current = ZODB.fsIndex.fsIndex()
# Map oid to (version, tid) pair. If there is no entry, the object
# is not modified in a version.
self.version = {}
# {oid -> {tid->pos}}
# Note that caches in the wild seem to have very little non-current
# data, so this would seem to have little impact on memory consumption.
# I wonder if we even need to store non-current data in the cache.
self.noncurrent = BTrees.LOBTree.LOBTree()
# tid for the most recent transaction we know about. This is also
# stored near the start of the file.
self.tid = None
# There's one Entry instance, kept in memory, for each currently
# allocated block in the file, and there's one allocated block in the
# file per serialized Object. filemap retrieves the Entry given the
# starting offset of a block, and key2entry retrieves the Entry given
# an object revision's key (an (oid, start_tid) pair). From an
# Entry, we can get the Object's key and file offset.
# Map offset in file to pair (data record size, Entry).
# Entry is None iff the block starting at offset is free.
# filemap always contains a complete account of what's in the
# file -- study method _verify_filemap for executable checking
# of the relevant invariants. An offset is at the start of a
# block iff it's a key in filemap. The data record size is
# stored in the file too, so we could just seek to the offset
# and read it up; keeping it in memory is an optimization.
self.filemap = {}
# Map key to Entry. After
# obj = key2entry[key]
# then
# obj.key == key
# is true. An object is currently stored on disk iff its key is in
# key2entry.
self.key2entry = {}
# Always the offset into the file of the start of a block.
# New and relocated objects are always written starting at
# currentofs.
......@@ -224,11 +189,9 @@ class ClientCache(object):
self.f.write(magic)
self.f.write(z64)
# and one free block.
self.f.write('f' + struct.pack(">I", self.maxsize -
self.f.write('f' + pack(">I", self.maxsize -
ZEC3_HEADER_SIZE))
sync(self.f)
self.filemap[ZEC3_HEADER_SIZE] = (self.maxsize - ZEC3_HEADER_SIZE,
None)
# Statistics: _n_adds, _n_added_bytes,
# _n_evicts, _n_evicted_bytes,
......@@ -237,12 +200,17 @@ class ClientCache(object):
self._setup_trace(path)
# Backward compatibility. Client code used to have to use the fc
# attr to get to the file cache to get cache stats.
@property
def fc(self):
return self
##
# Scan the current contents of the cache file, calling `install`
# for each object found in the cache. This method should only
# be called once to initialize the cache from disk.
def scan(self, install):
def open(self):
if self.f is not None: # we're not (re)using a pre-existing file
return
fsize = os.path.getsize(self.path)
......@@ -252,10 +220,12 @@ class ClientCache(object):
fsize, self.maxsize)
self.maxsize = fsize
self.f = open(self.path, 'rb+')
_magic = self.f.read(4)
read = self.f.read
seek = self.f.seek
_magic = read(4)
if _magic != magic:
raise ValueError("unexpected magic number: %r" % _magic)
self.tid = self.f.read(8)
self.tid = read(8)
if len(self.tid) != 8:
raise ValueError("cache file too small -- no tid at start")
......@@ -263,38 +233,48 @@ class ClientCache(object):
# file, and tell our parent about it too (via the `install` callback).
# Remember the location of the largest free block. That seems a
# decent place to start currentofs.
max_free_size = 0
max_free_size = l = 0
ofs = max_free_offset = ZEC3_HEADER_SIZE
current = self.current
while ofs < fsize:
self.f.seek(ofs)
ent = None
status = self.f.read(1)
seek(ofs)
status = read(1)
if status == 'a':
size, rawkey = struct.unpack(">I16s", self.f.read(20))
key = rawkey[:8], rawkey[8:]
assert key not in self.key2entry
self.key2entry[key] = ent = Entry(key, ofs)
install(self.f, ent)
size, oid, start_tid, end_tid = unpack(">I8s8s8s", read(28))
if end_tid == z64:
current[oid] = ofs
else:
assert start_tid < end_tid
self._set_noncurrent(oid, start_tid, ofs)
l += 1
elif status == 'f':
size, = struct.unpack(">I", self.f.read(4))
size, = unpack(">I", read(4))
elif status in '1234':
size = int(status)
else:
raise ValueError("unknown status byte value %s in client "
"cache file" % 0, hex(ord(status)))
self.filemap[ofs] = size, ent
if ent is None and size > max_free_size:
max_free_size, max_free_offset = size, ofs
ofs += size
if ofs != fsize:
raise ValueError("final offset %s != file size %s in client "
"cache file" % (ofs, fsize))
if __debug__:
self._verify_filemap()
self.currentofs = max_free_offset
self._len = l
def _set_noncurrent(self, oid, tid, ofs):
noncurrent_for_oid = self.noncurrent.get(u64(oid))
if noncurrent_for_oid is None:
noncurrent_for_oid = BTrees.LLBTree.LLBTree()
self.noncurrent[u64(oid)] = noncurrent_for_oid
noncurrent_for_oid[u64(tid)] = ofs
def _del_noncurrent(self, oid, tid):
noncurrent_for_oid = self.noncurrent[u64(oid)]
del noncurrent_for_oid[u64(tid)]
if not noncurrent_for_oid:
del self.noncurrent[u64(oid)]
def clearStats(self):
self._n_adds = self._n_added_bytes = 0
......@@ -310,17 +290,7 @@ class ClientCache(object):
##
# The number of objects currently in the cache.
def __len__(self):
return len(self.key2entry)
##
# Iterate over the objects in the cache, producing an Entry for each.
def __iter__(self):
return self.key2entry.itervalues()
##
# Test whether an (oid, tid) pair is in the cache.
def __contains__(self, key):
return key in self.key2entry
return self._len
##
# Close the underlying file. No methods accessing the cache should be
......@@ -349,159 +319,30 @@ class ClientCache(object):
if self.currentofs + nbytes > self.maxsize:
self.currentofs = ZEC3_HEADER_SIZE
ofs = self.currentofs
seek = self.f.seek
read = self.f.read
current = self.current
while nbytes > 0:
size, e = self.filemap.pop(ofs)
if e is not None:
del self.key2entry[e.key]
self._evictobj(e, size)
seek(ofs)
status = read(1)
if status == 'a':
size, oid, start_tid, end_tid = unpack(">I8s8s8s", read(28))
self._n_evicts += 1
self._n_evicted_bytes += size
if end_tid == z64:
del current[oid]
else:
self._del_noncurrent(oid, start_tid)
self._len -= 1
else:
if status == 'f':
size = unpack(">I", read(4))[0]
else:
size = int(status)
ofs += size
nbytes -= size
return ofs - self.currentofs
##
# Write Object obj, with data, to file starting at currentofs.
# nfreebytes are already available for overwriting, and it's
# guranteed that's enough. obj.offset is changed to reflect the
# new data record position, and filemap and key2entry are updated to
# match.
def _writeobj(self, obj, nfreebytes):
size = OBJECT_HEADER_SIZE + obj.size
assert size <= nfreebytes
excess = nfreebytes - size
# If there's any excess (which is likely), we need to record a
# free block following the end of the data record. That isn't
# expensive -- it's all a contiguous write.
if excess == 0:
extra = ''
elif excess < 5:
extra = "01234"[excess]
else:
extra = 'f' + struct.pack(">I", excess)
self.f.seek(self.currentofs)
# Before writing data, we'll write a free block for the space freed.
# We'll come back with a last atomic write to rewrite the start of the
# allocated-block header.
self.f.write('f'+struct.pack(">I", nfreebytes))
# Now write the rest of the allocation block header and object data.
self.f.write(struct.pack(">8s8s", obj.key[0], obj.key[1]))
obj.serialize(self.f)
self.f.write(extra)
# Now, we'll go back and rewrite the beginning of the
# allocated block header.
self.f.seek(self.currentofs)
self.f.write('a'+struct.pack(">I", size))
# Update index
e = Entry(obj.key, self.currentofs)
self.key2entry[obj.key] = e
self.filemap[self.currentofs] = size, e
self.currentofs += size
if excess:
# We need to record the free block in filemap, but there's
# no need to advance currentofs beyond it. Instead it
# gives some breathing room for the next object to get
# written.
self.filemap[self.currentofs] = excess, None
##
# Add Object object to the cache. This may evict existing objects, to
# make room (and almost certainly will, in steady state once the cache
# is first full). The object must not already be in the cache. If the
# object is too large for the cache, False is returned, otherwise True.
def add(self, object):
size = OBJECT_HEADER_SIZE + object.size
# A number of cache simulation experiments all concluded that the
# 2nd-level ZEO cache got a much higher hit rate if "very large"
# objects simply weren't cached. For now, we ignore the request
# only if the entire cache file is too small to hold the object.
if size > self.maxsize - ZEC3_HEADER_SIZE:
return False
assert object.key not in self.key2entry
assert len(object.key[0]) == 8
assert len(object.key[1]) == 8
self._n_adds += 1
self._n_added_bytes += size
available = self._makeroom(size)
self._writeobj(object, available)
return True
##
# Evict the object represented by Entry `e` from the cache, freeing
# `size` bytes in the file for reuse. `size` is used only for summary
# statistics. This does not alter the file, or self.filemap or
# self.key2entry (those are the caller's responsibilities). It does
# invoke _evicted(Object) on our parent.
def _evictobj(self, e, size):
self._n_evicts += 1
self._n_evicted_bytes += size
# Load the object header into memory so we know how to
# update the parent's in-memory data structures.
self.f.seek(e.offset + OBJECT_HEADER_SIZE)
o = Object.fromFile(self.f, e.key, skip_data=True)
self._evicted(o)
##
# Return Object for key, or None if not in cache.
def access(self, key):
self._n_accesses += 1
e = self.key2entry.get(key)
if e is None:
return None
offset = e.offset
size, e2 = self.filemap[offset]
assert e is e2
self.f.seek(offset + OBJECT_HEADER_SIZE)
return Object.fromFile(self.f, key)
##
# Remove Object for key from cache, if present.
def remove(self, key):
# If an object is being explicitly removed, we need to load
# its header into memory and write a free block marker to the
# disk where the object was stored. We need to load the
# header to update the in-memory data structures held by
# ClientCache.
# We could instead just keep the header in memory at all times.
e = self.key2entry.pop(key, None)
if e is None:
return
offset = e.offset
size, e2 = self.filemap[offset]
assert e is e2
self.filemap[offset] = size, None
self.f.seek(offset + OBJECT_HEADER_SIZE)
o = Object.fromFile(self.f, key, skip_data=True)
assert size >= 5 # only free blocks are tiny
# Because `size` >= 5, we can change an allocated block to a free
# block just by overwriting the 'a' status byte with 'f' -- the
# size field stays the same.
self.f.seek(offset)
self.f.write('f')
self.f.flush()
self._evicted(o)
##
# Update on-disk representation of Object obj.
#
# This method should be called when the object header is modified.
# obj must be in the cache. The only real use for this is during
# invalidation, to set the end_tid field on a revision that was current
# (and so had an end_tid of None, but no longer does).
def update(self, obj):
e = self.key2entry[obj.key]
self.f.seek(e.offset + OBJECT_HEADER_SIZE)
obj.serialize_header(self.f)
##
# Update our idea of the most recent tid. This is stored in the
# instance, and also written out near the start of the cache file. The
......@@ -518,100 +359,6 @@ class ClientCache(object):
self.f.write(tid)
self.f.flush()
##
# This debug method marches over the entire cache file, verifying that
# the current contents match the info in self.filemap and self.key2entry.
def _verify_filemap(self, display=False):
a = ZEC3_HEADER_SIZE
f = self.f
while a < self.maxsize:
f.seek(a)
status = f.read(1)
if status in 'af':
size, = struct.unpack(">I", f.read(4))
else:
size = int(status)
if display:
if a == self.currentofs:
print '*****',
print "%c%d" % (status, size),
size2, obj = self.filemap[a]
assert size == size2
assert (obj is not None) == (status == 'a')
if obj is not None:
assert obj.offset == a
assert self.key2entry[obj.key] is obj
a += size
if display:
print
assert a == self.maxsize
def open(self):
self.scan(self.install)
##
# Callback for FileCache.scan(), when a pre-existing file cache is
# used. For each object in the file, `install()` is invoked. `f`
# is the file object, positioned at the start of the serialized Object.
# `ent` is an Entry giving the object's key ((oid, start_tid) pair).
def install(self, f, ent):
# Called by cache storage layer to insert object.
o = Object.fromFile(f, ent.key, skip_data=True)
if o is None:
return
oid = o.key[0]
if o.version:
self.version[oid] = o.version, o.start_tid
elif o.end_tid is None:
self.current[oid] = o.start_tid
else:
assert o.start_tid < o.end_tid
this_span = o.start_tid, o.end_tid
span_list = self.noncurrent.get(oid)
if span_list:
bisect.insort_left(span_list, this_span)
else:
self.noncurrent[oid] = [this_span]
##
# Set the last transaction seen by the cache.
# @param tid a transaction id
# @exception ValueError attempt to set a new tid less than the current tid
##
# Return the last transaction seen by the cache.
# @return a transaction id
......@@ -631,28 +378,29 @@ class ClientCache(object):
# @defreturn 3-tuple: (string, string, string)
def load(self, oid, version=""):
tid = None
if version:
p = self.version.get(oid)
if p is None:
self._trace(0x20, oid, version)
return None
elif p[0] == version:
tid = p[1]
# Otherwise, we know the cache has version data but not
# for the requested version. Thus, we know it is safe
# to return the non-version data from the cache.
if tid is None:
tid = self.current.get(oid)
if tid is None:
self._trace(0x20, oid, version)
return None
o = self.access((oid, tid))
if o is None:
ofs = self.current.get(oid)
if ofs is None:
self._trace(0x20, oid, version)
return None
self._trace(0x22, oid, version, o.start_tid, o.end_tid, len(o.data))
return o.data, tid, o.version
self.f.seek(ofs)
read = self.f.read
assert read(1) == 'a'
size, saved_oid, tid, end_tid, lver, ldata = unpack(
">I8s8s8shI", read(34))
assert saved_oid == oid
if lver or version:
if lver != len(version) or read(lver) != version:
self._trace(0x20, oid, version)
return None
data = read(ldata)
assert len(data) == ldata
assert read(8) == oid
self._n_accesses += 1
self._trace(0x22, oid, version, tid, end_tid, ldata)
return data, tid, version
##
# Return a non-current revision of oid that was current before tid.
......@@ -661,30 +409,38 @@ class ClientCache(object):
# @return data record, serial number, start tid, and end tid
# @defreturn 4-tuple: (string, string, string, string)
def loadBefore(self, oid, tid):
L = self.noncurrent.get(oid)
if L is None:
self._trace(0x24, oid, "", tid)
def loadBefore(self, oid, before_tid):
noncurrent_for_oid = self.noncurrent.get(u64(oid))
if noncurrent_for_oid is None:
self._trace(0x24, oid, "", before_tid)
return None
# A pair with None as the second element is less than any pair with
# the same first tid. Dubious: this relies on that None is less
# than any comparable non-None object in recent Pythons.
i = bisect.bisect_left(L, (tid, None))
# Now L[i-1] < (tid, None) < L[i], and the start_tid for everything in
# L[:i] is < tid, and the start_tid for everything in L[i:] is >= tid.
# Therefore the largest start_tid < tid must be at L[i-1]. If i is 0,
# there is no start_tid < tid: we don't have any data old enougn.
if i == 0:
self._trace(0x24, oid, "", tid)
return
lo, hi = L[i-1]
assert lo < tid
if tid > hi: # we don't have any data in the right range
self._trace(0x24, oid, "", tid)
items = noncurrent_for_oid.items(None, u64(before_tid)-1)
if not items:
self._trace(0x24, oid, "", before_tid)
return None
tid, ofs = items[-1]
self.f.seek(ofs)
read = self.f.read
assert read(1) == 'a'
size, saved_oid, saved_tid, end_tid, lver, ldata = unpack(
">I8s8s8shI", read(34))
assert saved_oid == oid
assert saved_tid == p64(tid)
assert lver == 0
assert end_tid != z64
data = read(ldata)
assert len(data) == ldata
assert read(8) == oid
if end_tid < before_tid:
self._trace(0x24, oid, "", before_tid)
return None
o = self.access((oid, lo))
self._trace(0x26, oid, "", tid)
return o.data, o.start_tid, o.end_tid
self._n_accesses += 1
self._trace(0x26, oid, "", saved_tid)
return data, saved_tid, end_tid
##
# Return the version an object is modified in, or None for an
......@@ -693,12 +449,20 @@ class ClientCache(object):
# @return name of version in which the object is modified
# @defreturn string or None
# XXX This approac is wrong, but who cares
def modifiedInVersion(self, oid):
p = self.version.get(oid)
if p is None:
ofs = self.current.get(oid)
if ofs is None:
return None
version, tid = p
return version
self.f.seek(ofs)
read = self.f.read
assert self.f.read(1) == 'a'
size, saved_oid, saved_tid, end_tid, lver, ldata = unpack(
">I8s8s8shI", read(34))
assert saved_oid == oid
if lver:
return read(lver)
return None
##
# Store a new data record in the cache.
......@@ -720,62 +484,89 @@ class ClientCache(object):
# the requested version, doesn't find it, then asks the server
# for that data. The server returns the non-version data,
# which may already be in the cache.
if (oid, start_tid) in self:
seek = self.f.seek
if end_tid is None:
ofs = self.current.get(oid)
if ofs:
seek(ofs)
read = self.f.read
assert read(1) == 'a'
size, saved_oid, saved_tid, end_tid = unpack(
">I8s8s8s", read(28))
assert saved_oid == oid
assert end_tid == z64
if saved_tid == start_tid:
return
raise ValueError("already have current data for oid")
else:
noncurrent_for_oid = self.noncurrent.get(u64(oid))
if noncurrent_for_oid and (u64(start_tid) in noncurrent_for_oid):
return
if version:
raise ValueError("cache only stores current version data")
size = 43 + len(version) + len(data)
# A number of cache simulation experiments all concluded that the
# 2nd-level ZEO cache got a much higher hit rate if "very large"
# objects simply weren't cached. For now, we ignore the request
# only if the entire cache file is too small to hold the object.
if size > self.maxsize - ZEC3_HEADER_SIZE:
return
o = Object((oid, start_tid), version, data, start_tid, end_tid)
self._n_adds += 1
self._n_added_bytes += size
self._len += 1
nfreebytes = self._makeroom(size)
assert size <= nfreebytes
excess = nfreebytes - size
# If there's any excess (which is likely), we need to record a
# free block following the end of the data record. That isn't
# expensive -- it's all a contiguous write.
if excess == 0:
extra = ''
elif excess < 5:
extra = "01234"[excess]
else:
extra = 'f' + pack(">I", excess)
ofs = self.currentofs
seek(ofs)
write = self.f.write
# Before writing data, we'll write a free block for the space freed.
# We'll come back with a last atomic write to rewrite the start of the
# allocated-block header.
write('f'+pack(">I", nfreebytes))
# Now write the rest of the allocation block header and object data.
write(pack(">8s8s8shi",
oid, start_tid, end_tid or z64, len(version), len(data),
))
if version:
if end_tid is not None:
raise ValueError("cache only stores current version data")
if oid in self.version:
if self.version[oid] != (version, start_tid):
raise ValueError("data already exists for version %r"
% self.version[oid][0])
if not self.add(o):
return # too large
self.version[oid] = version, start_tid
self._trace(0x50, oid, version, start_tid, dlen=len(data))
write(version)
write(data)
write(oid)
write(extra)
# Now, we'll go back and rewrite the beginning of the
# allocated block header.
seek(ofs)
write('a'+pack(">I", size))
if end_tid:
self._set_noncurrent(oid, start_tid, ofs)
self._trace(0x54, oid, version, start_tid, end_tid, dlen=len(data))
else:
if end_tid is None:
_cur_start = self.current.get(oid)
if _cur_start:
if _cur_start != start_tid:
raise ValueError(
"already have current data for oid")
else:
return
if not self.add(o):
return # too large
self.current[oid] = start_tid
self._trace(0x52, oid, version, start_tid, dlen=len(data))
self.current[oid] = ofs
if version:
self._trace(0x50, oid, version, start_tid, dlen=len(data))
else:
L = self.noncurrent.setdefault(oid, [])
p = start_tid, end_tid
if p in L:
return # duplicate store
if not self.add(o):
return # too large
bisect.insort_left(L, p)
self._trace(0x54, oid, version, start_tid, end_tid,
dlen=len(data))
##
# Remove all knowledge of noncurrent revisions of oid, both in
# self.noncurrent and in our FileCache. `version` and `tid` are used
# only for trace records.
def _remove_noncurrent_revisions(self, oid, version, tid):
noncurrent_list = self.noncurrent.get(oid)
if noncurrent_list:
# Note: must iterate over a copy of noncurrent_list. The
# FileCache remove() calls our _evicted() method, and that
# mutates the list.
for old_tid, dummy in noncurrent_list[:]:
# 0x1E = invalidate (hit, discarding current or non-current)
self._trace(0x1E, oid, version, tid)
self.remove((oid, old_tid))
# fc.remove() calling back to _evicted() should have removed
# the list from noncurrent when the last non-current revision
# was removed.
assert oid not in self.noncurrent
self._trace(0x52, oid, version, start_tid, dlen=len(data))
self.currentofs += size
##
# If `tid` is None, or we have data for `oid` in a (non-empty) version,
......@@ -794,62 +585,36 @@ class ClientCache(object):
if tid > self.tid and tid is not None:
self.setLastTid(tid)
remove_all_knowledge_of_oid = tid is None
if oid in self.version:
# Forget we know about the version data.
# 0x1A = invalidate (hit, version)
self._trace(0x1A, oid, version, tid)
dllversion, dlltid = self.version[oid]
assert not version or version == dllversion, (version, dllversion)
self.remove((oid, dlltid))
assert oid not in self.version # .remove() got rid of it
# And continue: we must also remove any non-version data from
# the cache. Or, at least, I have such a poor understanding of
# versions that anything less drastic would probably be wrong.
remove_all_knowledge_of_oid = True
if remove_all_knowledge_of_oid:
self._remove_noncurrent_revisions(oid, version, tid)
# Only current, non-version data remains to be handled.
cur_tid = self.current.get(oid)
if not cur_tid:
ofs = self.current.get(oid)
if ofs is None:
# 0x10 == invalidate (miss)
self._trace(0x10, oid, version, tid)
return
# We had current data for oid, but no longer.
if remove_all_knowledge_of_oid:
# 0x1E = invalidate (hit, discarding current or non-current)
self._trace(0x1E, oid, version, tid)
self.remove((oid, cur_tid))
assert cur_tid not in self.current # .remove() got rid of it
return
# Add the data we have to the list of non-current data for oid.
assert tid is not None and cur_tid <= tid
# 0x1C = invalidate (hit, saving non-current)
self._trace(0x1C, oid, version, tid)
del self.current[oid] # because we no longer have current data
# Update the end_tid half of oid's validity range on disk.
# TODO: Want to fetch object without marking it as accessed.
o = self.access((oid, cur_tid))
assert o is not None
assert o.end_tid is None # i.e., o was current
if o is None:
# TODO: Since we asserted o is not None above, this block
# should be removed; waiting on time to prove it can't happen.
return
o.end_tid = tid
self.update(o) # record the new end_tid on disk
# Add to oid's list of non-current data.
L = self.noncurrent.setdefault(oid, [])
bisect.insort_left(L, (cur_tid, tid))
self.f.seek(ofs)
read = self.f.read
assert read(1) == 'a'
size, saved_oid, saved_tid, end_tid, lver = unpack(
">I8s8s8sh", read(30))
assert saved_oid == oid
assert end_tid == z64
del self.current[oid]
if tid is None or lver:
self.f.seek(ofs)
self.f.write('f'+pack(">I", size))
if lver:
# 0x1A = invalidate (hit, version)
self._trace(0x1A, oid, version, tid)
else:
# 0x1E = invalidate (hit, discarding current or non-current)
self._trace(0x1E, oid, version, tid)
self._len -= 1
else:
self.f.seek(ofs+21)
self.f.write(tid)
self._set_noncurrent(oid, saved_tid, ofs)
# 0x1C = invalidate (hit, saving non-current)
self._trace(0x1C, oid, version, tid)
##
# Generates (oid, serial, version) triples for all objects in the
......@@ -857,13 +622,19 @@ class ClientCache(object):
def contents(self):
# May need to materialize list instead of iterating;
# depends on whether the caller may change the cache.
for o in self:
oid, tid = o.key
if oid in self.version:
obj = self.access(o.key)
yield oid, tid, obj.version
seek = self.f.seek
read = self.f.read
for oid, ofs in self.current.iteritems():
seek(ofs)
assert read(1) == 'a'
size, saved_oid, tid, end_tid, lver = unpack(">I8s8s8sh", read(30))
assert saved_oid == oid
assert end_tid == z64
if lver:
version = read(lver)
else:
yield oid, tid, ""
version = ''
yield oid, tid, version
def dump(self):
from ZODB.utils import oid_repr
......@@ -880,29 +651,6 @@ class ClientCache(object):
print oid_repr(x.key[0]), oid_repr(x.key[1]), oid_repr(end_tid)
print
def _evicted(self, o):
# Called by the FileCache to signal that Object o has been evicted.
oid, tid = o.key
if o.end_tid is None:
if o.version:
del self.version[oid]
else:
del self.current[oid]
else:
# Although we use bisect to keep the list sorted,
# we never expect the list to be very long. So the
# brute force approach should normally be fine.
L = self.noncurrent[oid]
element = (o.start_tid, o.end_tid)
if len(L) == 1:
# We don't want to leave an empty list in the dict: if
# the oid is never referenced again, it would consume RAM
# forever more for no purpose.
assert L[0] == element
del self.noncurrent[oid]
else:
L.remove(element)
# If `path` isn't None (== we're using a persistent cache file), and
# envar ZEO_CACHE_TRACE is set to a non-empty value, try to open
# path+'.trace' as a trace file, and store the file object in
......@@ -910,204 +658,44 @@ class ClientCache(object):
# tracing by setting self._trace to a dummy function, and set
# self._tracefile to None.
def _setup_trace(self, path):
self._tracefile = None
_tracefile = None
if path and os.environ.get("ZEO_CACHE_TRACE"):
tfn = path + ".trace"
try:
self._tracefile = open(tfn, "ab")
self._trace(0x00)
_tracefile = open(tfn, "ab")
except IOError, msg:
self._tracefile = None
logger.warning("cannot write tracefile %r (%s)", tfn, msg)
else:
logger.info("opened tracefile %r", tfn)
if self._tracefile is None:
def notrace(*args, **kws):
pass
self._trace = notrace
def _trace(self,
code, oid="", version="", tid=z64, end_tid=z64, dlen=0,
# The next two are just speed hacks.
time_time=time.time, struct_pack=struct.pack):
# The code argument is two hex digits; bits 0 and 7 must be zero.
# The first hex digit shows the operation, the second the outcome.
# This method has been carefully tuned to be as fast as possible.
# Note: when tracing is disabled, this method is hidden by a dummy.
if version:
code |= 0x80
encoded = (dlen + 255) & 0x7fffff00 | code
if tid is None:
tid = z64
if end_tid is None:
end_tid = z64
try:
self._tracefile.write(
struct_pack(">iiH8s8s",
time_time(),
encoded,
len(oid),
tid, end_tid) + oid)
except:
print `tid`, `end_tid`
raise
##
# An Object stores the cached data for a single object.
# <p>
# The cached data includes the actual object data, the key, and three
# data fields that describe the validity period and version of the
# object. The key contains the oid and a redundant start_tid. The
# actual size of an object is variable, depending on the size of the
# data and whether it is in a version.
# <p>
# The serialized format does not include the key, because it is stored
# in the header used by the cache file's storage format.
# <p>
# Instances of Object are generally short-lived -- they're really a way to
# package data on the way to or from the disk file.
class Object(object):
__slots__ = (# pair (object id, txn id) -- something usable as a dict key;
# the second part of the pair is equal to start_tid
"key",
# string, tid of txn that wrote the data
"start_tid",
# string, tid of txn that wrote next revision, or None
# if the data is current; if not None, end_tid is strictly
# greater than start_tid
"end_tid",
# string, name of version
"version",
# string, the actual data record for the object
"data",
# total size of serialized object; this includes the
# data, version, and all overhead (header) bytes.
"size",
)
# A serialized Object on disk looks like:
#
# offset # bytes value
# ------ ------- -----
# 0 8 end_tid; string
# 8 2 len(version); 2-byte signed int
# 10 4 len(data); 4-byte signed int
# 14 len(version) version; string
# 14+len(version) len(data) the object pickle; string
# 14+len(version)+
# len(data) 8 oid; string
# The serialization format uses an end tid of "\0"*8 (z64), the least
# 8-byte string, to represent None. It isn't possible for an end_tid
# to be 0, because it must always be strictly greater than the start_tid.
fmt = ">8shi" # end_tid, len(self.version), len(self.data)
FIXED_HEADER_SIZE = struct.calcsize(fmt)
assert FIXED_HEADER_SIZE == 14
TOTAL_FIXED_SIZE = FIXED_HEADER_SIZE + 8 # +8 for the oid at the end
def __init__(self, key, version, data, start_tid, end_tid):
self.key = key
self.version = version
self.data = data
self.start_tid = start_tid
self.end_tid = end_tid
# The size of the serialized object on disk, including the
# 14-byte header, the lengths of data and version, and a
# copy of the 8-byte oid.
if data is not None:
self.size = self.TOTAL_FIXED_SIZE + len(data) + len(version)
##
# Return the fixed-sized serialization header as a string: pack end_tid,
# and the lengths of the .version and .data members.
def get_header(self):
return struct.pack(self.fmt,
self.end_tid or z64,
len(self.version),
len(self.data))
##
# Write the serialized representation of self to file f, at its current
# position.
def serialize(self, f):
f.writelines([self.get_header(),
self.version,
self.data,
self.key[0]])
##
# Write the fixed-size header for self, to file f at its current position.
# The only real use for this is when the current revision of an object
# in cache is invalidated. Then the end_tid field gets set to the tid
# of the transaction that caused the invalidation.
def serialize_header(self, f):
f.write(self.get_header())
##
# fromFile is a class constructor, unserializing an Object from the
# current position in file f. Exclusive access to f for the duration
# is assumed. The key is a (oid, start_tid) pair, and the oid must
# match the serialized oid. If `skip_data` is true, .data is left
# None in the Object returned, but all the other fields are populated.
# Else (`skip_data` is false, the default), all fields including .data
# are populated. .data can be big, so it's prudent to skip it when it
# isn't needed.
def fromFile(cls, f, key, skip_data=False):
s = f.read(cls.FIXED_HEADER_SIZE)
if not s:
return None
oid, start_tid = key
end_tid, vlen, dlen = struct.unpack(cls.fmt, s)
if end_tid == z64:
end_tid = None
version = f.read(vlen)
if vlen != len(version):
raise ValueError("corrupted record, version")
if skip_data:
data = None
f.seek(dlen, 1)
else:
data = f.read(dlen)
if dlen != len(data):
raise ValueError("corrupted record, data")
s = f.read(8)
if s != oid:
raise ValueError("corrupted record, oid")
return cls((oid, start_tid), version, data, start_tid, end_tid)
fromFile = classmethod(fromFile)
# Entry just associates a key with a file offset. It's used by FileCache.
class Entry(object):
__slots__ = (# object key -- something usable as a dict key.
'key',
# Offset from start of file to the object's data
# record; this includes all overhead bytes (status
# byte, size bytes, etc). The size of the data
# record is stored in the file near the start of the
# record, but for efficiency we also keep size in a
# dict (filemap; see later).
'offset',
)
if _tracefile is None:
self._trace = lambda *a, **k: None
return
def __init__(self, key=None, offset=None):
self.key = key
self.offset = offset
now = time.time
def _trace(code, oid="", version="", tid=z64, end_tid=z64, dlen=0):
# The code argument is two hex digits; bits 0 and 7 must be zero.
# The first hex digit shows the operation, the second the outcome.
# This method has been carefully tuned to be as fast as possible.
# Note: when tracing is disabled, this method is hidden by a dummy.
if version:
code |= 0x80
encoded = (dlen + 255) & 0x7fffff00 | code
if tid is None:
tid = z64
if end_tid is None:
end_tid = z64
try:
_tracefile.write(
pack(">iiH8s8s",
now(), encoded, len(oid), tid, end_tid) + oid,
)
except:
print `tid`, `end_tid`
raise
self._trace = _trace
_trace(0x00)
def sync(f):
......
......@@ -20,7 +20,7 @@ import zope.testing.setupstack
from zope.testing import doctest
import ZEO.cache
from ZODB.utils import p64
from ZODB.utils import p64, u64
n1 = p64(1)
n2 = p64(2)
......@@ -45,9 +45,9 @@ class CacheTests(unittest.TestCase):
self.assertEqual(self.cache.getLastTid(), None)
self.cache.setLastTid(n2)
self.assertEqual(self.cache.getLastTid(), n2)
self.cache.invalidate(None, "", n1)
self.cache.invalidate(n1, "", n1)
self.assertEqual(self.cache.getLastTid(), n2)
self.cache.invalidate(None, "", n3)
self.cache.invalidate(n1, "", n3)
self.assertEqual(self.cache.getLastTid(), n3)
self.assertRaises(ValueError, self.cache.setLastTid, n2)
......@@ -99,9 +99,9 @@ class CacheTests(unittest.TestCase):
self.assertEqual(self.cache.loadBefore(n2, n4), None)
def testException(self):
# Not allowed to save non-current version data
self.assertRaises(ValueError,
self.cache.store,
n1, "version", n2, n3, "data")
self.cache.store, n1, "version", n2, n3, "data")
self.cache.store(n1, "", n2, None, "data")
self.assertRaises(ValueError,
self.cache.store,
......@@ -149,9 +149,10 @@ class CacheTests(unittest.TestCase):
eq = self.assertEqual
eq(copy.getLastTid(), self.cache.getLastTid())
eq(len(copy), len(self.cache))
eq(copy.version, self.cache.version)
eq(copy.current, self.cache.current)
eq(copy.noncurrent, self.cache.noncurrent)
eq(dict(copy.current), dict(self.cache.current))
eq(dict([(k, dict(v)) for (k, v) in copy.noncurrent.items()]),
dict([(k, dict(v)) for (k, v) in self.cache.noncurrent.items()]),
)
def testCurrentObjectLargerThanCache(self):
if self.cache.path:
......@@ -181,7 +182,7 @@ class CacheTests(unittest.TestCase):
self.assertEquals(None, self.cache.load(n1))
# If an object cannot be stored in the cache, it must not be
# recorded as non-current.
self.assert_((n2, n3) not in cache.noncurrent[n1])
self.assert_(1 not in cache.noncurrent)
__test__ = dict(
kill_does_not_cause_cache_corruption =
......@@ -233,9 +234,7 @@ __test__ = dict(
>>> import ZEO.cache, ZODB.utils
>>> cache = ZEO.cache.ClientCache('cache', 1000)
>>> data = 'X' * (1000 - ZEO.cache.ZEC3_HEADER_SIZE
... - ZEO.cache.OBJECT_HEADER_SIZE
... - ZEO.cache.Object.TOTAL_FIXED_SIZE)
>>> data = 'X' * (1000 - ZEO.cache.ZEC3_HEADER_SIZE - 43)
>>> cache.store(ZODB.utils.p64(1), '', ZODB.utils.p64(1), None, data)
>>> cache.close()
>>> cache = ZEO.cache.ClientCache('cache', 1000)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment