Commit 7f59134b authored by Jim Fulton's avatar Jim Fulton

Cache sizes can now be changed. (Previously, you couldn't change the

size of an existing cache file.)
parent 1bbc6c15
...@@ -39,7 +39,8 @@ New Features ...@@ -39,7 +39,8 @@ New Features
- The previous (ZODB 3.8) ZEO client-cache format is supported. - The previous (ZODB 3.8) ZEO client-cache format is supported.
The newer cache format introduced in ZODB 3.9.0a1 is no-longer The newer cache format introduced in ZODB 3.9.0a1 is no-longer
supported. Cache files can still be larger than 4G. supported. Cache files can still be larger than 4G. Cache file
sizes can now be changed.
3.9.0a4 (2008-11-06) 3.9.0a4 (2008-11-06)
==================== ====================
......
...@@ -88,6 +88,7 @@ ZEC_HEADER_SIZE = 12 ...@@ -88,6 +88,7 @@ ZEC_HEADER_SIZE = 12
# while opening. # while opening.
max_block_size = (1<<31) - 1 max_block_size = (1<<31) - 1
# After the header, the file contains a contiguous sequence of blocks. All # After the header, the file contains a contiguous sequence of blocks. All
# blocks begin with a one-byte status indicator: # blocks begin with a one-byte status indicator:
# #
...@@ -116,6 +117,8 @@ max_block_size = (1<<31) - 1 ...@@ -116,6 +117,8 @@ max_block_size = (1<<31) - 1
# 2 byte version length must be 0 # 2 byte version length must be 0
# 4 byte data size # 4 byte data size
# data # data
# 8 byte redundant oid for error detection.
allocated_record_overhead = 43
# The cache's currentofs goes around the file, circularly, forever. # The cache's currentofs goes around the file, circularly, forever.
# It's always the starting offset of some block. # It's always the starting offset of some block.
...@@ -187,35 +190,25 @@ class ClientCache(object): ...@@ -187,35 +190,25 @@ class ClientCache(object):
# here -- the scan() method must be called then to open the file # here -- the scan() method must be called then to open the file
# (and it sets self.f). # (and it sets self.f).
fsize = ZEC_HEADER_SIZE
if path: if path:
self._lock_file = zc.lockfile.LockFile(path + '.lock') self._lock_file = zc.lockfile.LockFile(path + '.lock')
if not os.path.exists(path):
if path and os.path.exists(path): # Create a small empty file. We'll make it bigger in _initfile.
# Reuse an existing file. scan() will open & read it.
self.f = None
logger.info("reusing persistent cache file %r", path)
else:
if path:
self.f = open(path, 'wb+') self.f = open(path, 'wb+')
self.f.write(magic+z64)
logger.info("created persistent cache file %r", path) logger.info("created persistent cache file %r", path)
else: else:
self.f = tempfile.TemporaryFile() fsize = os.path.getsize(self.path)
logger.info("created temporary cache file %r", self.f.name) self.f = open(path, 'rb+')
# Make sure the OS really saves enough bytes for the file. logger.info("reusing persistent cache file %r", path)
self.f.seek(self.maxsize - 1) else:
self.f.write('x') # Create a small empty file. We'll make it bigger in _initfile.
self.f.truncate() self.f = tempfile.TemporaryFile()
# Start with one magic header block self.f.write(magic+z64)
self.f.seek(0) logger.info("created temporary cache file %r", self.f.name)
self.f.write(magic)
self.f.write(z64) self._initfile(self.f, fsize)
# add as many free blocks as are needed to fill the space
nfree = self.maxsize - ZEC_HEADER_SIZE
for i in range(0, nfree, max_block_size):
block_size = min(max_block_size, nfree-i)
self.f.write('f' + pack(">I", block_size))
self.f.seek(block_size-5, 1)
sync(self.f)
# Statistics: _n_adds, _n_added_bytes, # Statistics: _n_adds, _n_added_bytes,
# _n_evicts, _n_evicted_bytes, # _n_evicts, _n_evicted_bytes,
...@@ -224,8 +217,6 @@ class ClientCache(object): ...@@ -224,8 +217,6 @@ class ClientCache(object):
self._setup_trace(path) self._setup_trace(path)
self.open()
self._lock = threading.RLock() self._lock = threading.RLock()
# Backward compatibility. Client code used to have to use the fc # Backward compatibility. Client code used to have to use the fc
...@@ -238,20 +229,13 @@ class ClientCache(object): ...@@ -238,20 +229,13 @@ class ClientCache(object):
# Scan the current contents of the cache file, calling `install` # Scan the current contents of the cache file, calling `install`
# for each object found in the cache. This method should only # for each object found in the cache. This method should only
# be called once to initialize the cache from disk. # be called once to initialize the cache from disk.
def open(self): def _initfile(self, f, fsize):
if self.f is not None: # we're not (re)using a pre-existing file maxsize = self.maxsize
return read = f.read
fsize = os.path.getsize(self.path) seek = f.seek
if fsize != self.maxsize: write = f.write
logger.warning("existing cache file %r has size %d; " seek(0)
"requested size %d ignored", self.path, if read(4) != magic:
fsize, self.maxsize)
self.maxsize = fsize
self.f = open(self.path, 'rb+')
read = self.f.read
seek = self.f.seek
_magic = read(4)
if _magic != magic:
raise ValueError("unexpected magic number: %r" % _magic) raise ValueError("unexpected magic number: %r" % _magic)
self.tid = read(8) self.tid = read(8)
if len(self.tid) != 8: if len(self.tid) != 8:
...@@ -264,8 +248,9 @@ class ClientCache(object): ...@@ -264,8 +248,9 @@ class ClientCache(object):
self.current = ZODB.fsIndex.fsIndex() self.current = ZODB.fsIndex.fsIndex()
self.noncurrent = BTrees.LOBTree.LOBTree() self.noncurrent = BTrees.LOBTree.LOBTree()
max_free_size = l = 0 l = 0
ofs = max_free_offset = ZEC_HEADER_SIZE ofs = ZEC_HEADER_SIZE
first_free_offset = 0
current = self.current current = self.current
while ofs < fsize: while ofs < fsize:
seek(ofs) seek(ofs)
...@@ -273,35 +258,77 @@ class ClientCache(object): ...@@ -273,35 +258,77 @@ class ClientCache(object):
if status == 'a': if status == 'a':
size, oid, start_tid, end_tid, lver = unpack( size, oid, start_tid, end_tid, lver = unpack(
">I8s8s8sH", read(30)) ">I8s8s8sH", read(30))
if end_tid == z64: if ofs+size <= maxsize:
assert oid not in current, (ofs, self.f.tell()) if end_tid == z64:
current[oid] = ofs assert oid not in current, (ofs, f.tell())
current[oid] = ofs
else:
assert start_tid < end_tid, (ofs, f.tell())
self._set_noncurrent(oid, start_tid, ofs)
assert lver == 0, "Versions aren't supported"
l += 1
else:
# free block
if first_free_offset == 0:
first_free_offset = ofs
if status == 'f':
size, = unpack(">I", read(4))
if size > max_block_size:
# Oops, we either have an old cache, or a we
# crashed while storing. Split this block into two.
assert size <= max_block_size*2
seek(ofs+max_block_size)
write('f'+pack(">I", size-max_block_size))
seek(ofs)
write('f'+pack(">I", max_block_size))
sync(f)
elif status in '1234':
size = int(status)
else: else:
assert start_tid < end_tid, (ofs, self.f.tell()) raise ValueError("unknown status byte value %s in client "
self._set_noncurrent(oid, start_tid, ofs) "cache file" % 0, hex(ord(status)))
assert lver == 0, "Versions aren't supported"
l += 1 if ofs + size >= maxsize:
elif status == 'f': # Oops, the file was bigger before.
size, = unpack(">I", read(4)) if ofs+size > maxsize:
if size > max_block_size: # The last record is too big. Replace it with a smaller
# Oops, we either have an old cache, or a we # free record
# crashed while storing. Split this block into two. size = maxsize-ofs
assert size <= max_block_size*2
seek(ofs+max_block_size)
self.f.write('f'+pack(">I", size-max_block_size))
seek(ofs) seek(ofs)
self.f.write('f'+pack(">I", max_block_size)) if size > 4:
elif status in '1234': write('f'+pack(">I", size))
size = int(status) else:
else: write("012345"[size])
raise ValueError("unknown status byte value %s in client " sync(f)
"cache file" % 0, hex(ord(status))) ofs += size
break
ofs += size ofs += size
if ofs != fsize: if fsize < maxsize:
raise ValueError("final offset %s != file size %s in client " assert ofs==fsize
"cache file" % (ofs, fsize)) # Make sure the OS really saves enough bytes for the file.
self.currentofs = max_free_offset seek(self.maxsize - 1)
write('x')
# add as many free blocks as are needed to fill the space
seek(ofs)
nfree = maxsize - ZEC_HEADER_SIZE
for i in range(0, nfree, max_block_size):
block_size = min(max_block_size, nfree-i)
write('f' + pack(">I", block_size))
seek(block_size-5, 1)
sync(self.f)
first_free_offset = ofs
else:
assert ofs==maxsize
if maxsize < fsize:
seek(maxsize)
f.truncate()
# We use the first_free_offset because it is most likelyt the
# place where we last wrote.
self.currentofs = first_free_offset or ZEC_HEADER_SIZE
self._len = l self._len = l
def _set_noncurrent(self, oid, tid, ofs): def _set_noncurrent(self, oid, tid, ofs):
...@@ -518,7 +545,7 @@ class ClientCache(object): ...@@ -518,7 +545,7 @@ class ClientCache(object):
if noncurrent_for_oid and (u64(start_tid) in noncurrent_for_oid): if noncurrent_for_oid and (u64(start_tid) in noncurrent_for_oid):
return return
size = 43 + len(data) size = allocated_record_overhead + len(data)
# A number of cache simulation experiments all concluded that the # A number of cache simulation experiments all concluded that the
# 2nd-level ZEO cache got a much higher hit rate if "very large" # 2nd-level ZEO cache got a much higher hit rate if "very large"
......
...@@ -134,7 +134,7 @@ class CacheTests(ZODB.tests.util.TestCase): ...@@ -134,7 +134,7 @@ class CacheTests(ZODB.tests.util.TestCase):
n = p64(i) n = p64(i)
cache.store(n, n, None, data[i]) cache.store(n, n, None, data[i])
self.assertEquals(len(cache), i + 1) self.assertEquals(len(cache), i + 1)
# The cache now uses 3287 bytes. The next insert # The cache is now almost full. The next insert
# should delete some objects. # should delete some objects.
n = p64(50) n = p64(50)
cache.store(n, n, None, data[51]) cache.store(n, n, None, data[51])
...@@ -197,10 +197,10 @@ class CacheTests(ZODB.tests.util.TestCase): ...@@ -197,10 +197,10 @@ class CacheTests(ZODB.tests.util.TestCase):
self.assert_(1 not in cache.noncurrent) self.assert_(1 not in cache.noncurrent)
def testVeryLargeCaches(self): def testVeryLargeCaches(self):
cache = ZEO.cache.ClientCache('cache', size=(1<<33)) cache = ZEO.cache.ClientCache('cache', size=(1<<32)+(1<<20))
cache.store(n1, n2, None, "x") cache.store(n1, n2, None, "x")
cache.close() cache.close()
cache = ZEO.cache.ClientCache('cache', size=(1<<33)) cache = ZEO.cache.ClientCache('cache', size=(1<<33)+(1<<20))
self.assertEquals(cache.load(n1), ('x', n2)) self.assertEquals(cache.load(n1), ('x', n2))
cache.close() cache.close()
...@@ -224,6 +224,77 @@ class CacheTests(ZODB.tests.util.TestCase): ...@@ -224,6 +224,77 @@ class CacheTests(ZODB.tests.util.TestCase):
ZEO.cache.max_block_size) ZEO.cache.max_block_size)
f.close() f.close()
def testChangingCacheSize(self):
# start with a small cache
data = 'x'
recsize = ZEO.cache.allocated_record_overhead+len(data)
for extra in (0, 2, recsize-2):
cache = ZEO.cache.ClientCache(
'cache', size=ZEO.cache.ZEC_HEADER_SIZE+100*recsize+extra)
for i in range(100):
cache.store(p64(i), n1, None, data)
self.assertEquals(len(cache), 100)
self.assertEquals(os.path.getsize(
'cache'), ZEO.cache.ZEC_HEADER_SIZE+100*recsize+extra)
# Now make it smaller
cache.close()
small = 50
cache = ZEO.cache.ClientCache(
'cache', size=ZEO.cache.ZEC_HEADER_SIZE+small*recsize+extra)
self.assertEquals(len(cache), small)
self.assertEquals(os.path.getsize(
'cache'), ZEO.cache.ZEC_HEADER_SIZE+small*recsize+extra)
self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
set(range(small)))
for i in range(100, 110):
cache.store(p64(i), n1, None, data)
self.assertEquals(len(cache), small)
expected_oids = set(range(10, 50)+range(100, 110))
self.assertEquals(
set(u64(oid) for (oid, tid) in cache.contents()),
expected_oids)
# Make sure we can reopen with same size
cache.close()
cache = ZEO.cache.ClientCache(
'cache', size=ZEO.cache.ZEC_HEADER_SIZE+small*recsize+extra)
self.assertEquals(len(cache), small)
self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
expected_oids)
# Now make it bigger
cache.close()
large = 150
cache = ZEO.cache.ClientCache(
'cache', size=ZEO.cache.ZEC_HEADER_SIZE+large*recsize+extra)
self.assertEquals(len(cache), small)
self.assertEquals(os.path.getsize(
'cache'), ZEO.cache.ZEC_HEADER_SIZE+large*recsize+extra)
self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
expected_oids)
for i in range(200, 305):
cache.store(p64(i), n1, None, data)
self.assertEquals(len(cache), large)
expected_oids = set(range(10, 50)+range(105, 110)+range(200, 305))
self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
expected_oids)
# Make sure we can reopen with same size
cache.close()
cache = ZEO.cache.ClientCache(
'cache', size=ZEO.cache.ZEC_HEADER_SIZE+large*recsize+extra)
self.assertEquals(len(cache), large)
self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
expected_oids)
# Cleanup
cache.close()
os.remove('cache')
__test__ = dict( __test__ = dict(
kill_does_not_cause_cache_corruption = kill_does_not_cause_cache_corruption =
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment