Commit 0e0bb023 authored by Chris Withers's avatar Chris Withers

merge of DA.py caching work from 2.9 branch:

      - Reworking of _cached_result in Shared.DC.ZRDB.DA.DA:

        - fixed KeyError reported in Collector #2212

        - fixed two memory leaks that occurred under high load

        - fixed broken cache keys for people using the obscure 
          Shared.DC.ZRDB.DA.DA.connection_hook

        - fixed incorrect cache ordering resulting in newer results
          being dumped when the cache became too large.
parents c044d8f5 1ed2f06e
......@@ -44,8 +44,7 @@ from AccessControl.DTML import RestrictedDTML
from webdav.Resource import Resource
from webdav.Lockable import ResourceLockedError
from zExceptions import BadRequest
try: from IOBTree import Bucket
except: Bucket=lambda:{}
from BTrees.OOBTree import OOBucket as Bucket
class DatabaseError(BadRequest):
......@@ -357,39 +356,79 @@ class DA(
def _searchable_result_columns(self): return self._col
def _cached_result(self, DB__, query):
pure_query = query
# we need to munge the incoming query key in the cache
# so that the same request to a different db is returned
query = query + ('\nDBConnId: %s' % self.connection_hook, )
# Try to fetch from cache
if hasattr(self,'_v_cache'): cache=self._v_cache
else: cache=self._v_cache={}, Bucket()
cache, tcache = cache
def _cached_result(self, DB__, query, max_rows, conn_id):
# Try to fetch a result from the cache.
# Compute and cache the result otherwise.
# Also maintains the cache and ensures stale entries
# are never returned and that the cache never gets too large.
# NB: Correct cache behavior is predicated on Bucket.keys()
# returning a sequence ordered from smalled number
# (ie: the oldest cache entry) to largest number
# (ie: the newest cache entry). Please be careful if you
# change the class instantied below!
# get hold of a cache
caches = getattr(self,'_v_cache',None)
if caches is None:
caches = self._v_cache = {}, Bucket()
cache, tcache = caches
# the key for caching
cache_key = query,max_rows,conn_id
# the maximum number of result sets to cache
max_cache=self.max_cache_
# the current time
now=time()
# the oldest time which is not stale
t=now-self.cache_time_
if len(cache) > max_cache / 2:
# if the cache is too big, we purge entries from it
if len(cache) >= max_cache:
keys=tcache.keys()
keys.reverse()
while keys and (len(keys) > max_cache or keys[-1] < t):
key=keys[-1]
# We also hoover out any stale entries, as we're
# already doing cache minimisation.
# 'keys' is ordered, so we purge the oldest results
# until the cache is small enough and there are no
# stale entries in it
while keys and (len(keys) >= max_cache or keys[0] < t):
key=keys[0]
q=tcache[key]
del tcache[key]
if int(cache[q][0]) == key:
del cache[q]
del keys[-1]
if cache.has_key(query):
k, r = cache[query]
if k > t: return r
del keys[0]
# okay, now see if we have a cached result
if cache.has_key(cache_key):
k, r = cache[cache_key]
# the result may still be stale, as we only hoover out
# stale results above if the cache gets too large.
if k > t:
# yay! a cached result returned!
return r
else:
# delete stale cache entries
del cache[cache_key]
del tcache[k]
# call the pure query
result=apply(DB__.query, pure_query)
if self.cache_time_ > 0:
tcache[int(now)]=query
cache[query]= now, result
result=DB__.query(query,max_rows)
# When a ZSQL method is handled by one ZPublisher thread twice in
# less time than it takes for time.time() to return a different
# value, the SQL generated is different, then this code will leak
# an entry in 'cache' for each time the ZSQL method generates
# different SQL until time.time() returns a different value.
#
# On Linux, you would need an extremely fast machine under extremely
# high load, making this extremely unlikely. On Windows, this is a
# little more likely, but still unlikely to be a problem.
#
# If it does become a problem, the values of the tcache mapping
# need to be turned into sets of cache keys rather than a single
# cache key.
tcache[now]=cache_key
cache[cache_key]= now, result
return result
......@@ -456,8 +495,9 @@ class DA(
if src__: return query
if self.cache_time_ > 0 and self.max_cache_ > 0:
result=self._cached_result(DB__, (query, self.max_rows_))
else: result=DB__.query(query, self.max_rows_)
result=self._cached_result(DB__, query, self.max_rows_, c)
else:
result=DB__.query(query, self.max_rows_)
if hasattr(self, '_v_brain'): brain=self._v_brain
else:
......
from pprint import pprint
from time import time
from unittest import TestCase,TestSuite,makeSuite
class DummyDB:
conn_num = 1
result = None
def query(self,query,max_rows):
if self.result:
return self.result
return 'result for ' + query
def hook_method(self):
conn_to_use = 'conn'+str(self.conn_num)
self.conn_num += 1
return conn_to_use
class DummyTime:
def __init__(self,t):
self.t = float(t)
def __call__(self):
return self.t
class TestCaching(TestCase):
echo = False
def setUp(self):
from Shared.DC.ZRDB import DA
self.DA = DA
self.da = DA.DA('da','title','conn_id','arg1 arg2','some sql')
# set the da's caching parameters
self.da.cache_time_ = 10
self.da.max_cache_ = 2
def _do_query(self,query,t):
try:
self.DA.time = DummyTime(t)
result = self.da._cached_result(DummyDB(),query,1,'conn_id')
finally:
self.DA.time = time
self.assertEqual(result,'result for '+query)
def _check_mapping(self,expected,actual):
missing = []
extra = []
different = []
for key,value in expected.items():
try:
ai = actual[key]
except KeyError:
missing.append(key)
else:
if ai!=value:
different.append('%r: %r != %r' % (key,value,ai))
for key in actual.keys():
try:
expected[key]
except KeyError:
extra.append(key)
result = []
if different:
different.sort()
result.append("Mismatching, key: (expected != actual):")
for r in different:
result.append(r)
if missing:
missing.sort()
result.append("The following keys were missing from actual:")
for r in missing:
result.append(repr(r))
if extra:
extra.sort()
result.append("The following extra keys were found in actual:")
for r in extra:
result.append(repr(r))
return result
def _check_cache(self,cache,tcache):
if self.echo:
print "cache:"
pprint(self.da._v_cache[0])
print "tcache:"
pprint(self.da._v_cache[1])
print
result = []
r = self._check_mapping(cache,self.da._v_cache[0])
if r:
result.append("cache didn't match expected:")
result.extend(r)
r = self._check_mapping(tcache,self.da._v_cache[1])
if r:
result.append("tcache didn't match expected:")
result.extend(r)
if result:
self.fail('\n\n'+'\n'.join(result))
def test_bad_aquisition(self):
# checks that something called _v_cache isn't acquired from anywhere
from ExtensionClass import Base
class Dummy(Base):
_v_cache = 'muhahaha'
obj = Dummy()
self.da = self.da.__of__(obj)
del self.da._v_cache
self._do_query('query',1)
def test_same_query_different_seconds(self):
# this tests a sequence of requests for the same
# query, but where the item returned is always in the cache
self._check_cache({},{})
for t in range(1,6):
self._do_query('query',t)
self._check_cache(
{('query',1,'conn_id'): (1,'result for query')},
{1: ('query',1,'conn_id')}
)
def test_same_query_same_second(self):
# this tests a sequence set of requests for the same
# query, but where the item returned is always in the cache
# and where the queries all occur in the same second
self._check_cache({},{})
for t in range(11,16,1):
t = float(t)/10
self._do_query('query',t)
self._check_cache(
{('query',1,'conn_id'): (1.1,'result for query')},
{1.1: ('query',1,'conn_id')}
)
def test_different_queries_different_second(self):
# This tests different queries being fired into the cache
# in sufficient volume to excercise the purging code
self._check_cache({},{})
# one
self._do_query('query1',1.1)
self._check_cache(
{('query1',1,'conn_id'): (1.1,'result for query1')},
{1.1: ('query1',1,'conn_id')}
)
# two
self._do_query( 'query2',3.2)
self._check_cache(
{('query1',1,'conn_id'): (1.1,'result for query1'),
('query2',1,'conn_id'): (3.2,'result for query2'),},
{1.1: ('query1',1,'conn_id'),
3.2: ('query2',1,'conn_id'),}
)
# three - now we drop our first cache entry
self._do_query('query3',4.3)
self._check_cache(
{('query2',1,'conn_id'): (3.2,'result for query2'),
('query3',1,'conn_id'): (4.3,'result for query3'),},
{3.2: ('query2',1,'conn_id'),
4.3: ('query3',1,'conn_id'),}
)
# four - now we drop our second cache entry
self._do_query('query4',8.4)
self._check_cache(
{('query3',1,'conn_id'): (4.3,'result for query3'),
('query4',1,'conn_id'): (8.4,'result for query4'),},
{4.3: ('query3',1,'conn_id'),
8.4: ('query4',1,'conn_id'),}
)
def test_different_queries_same_second(self):
# This tests different queries being fired into the cache
# in the same second in sufficient quantities to exercise
# the purging code
self._check_cache({},{})
# one
self._do_query('query1',1.0)
self._check_cache(
{('query1',1,'conn_id'): (1.0,'result for query1')},
{1.0: ('query1',1,'conn_id')}
)
# two
self._do_query( 'query2',1.1)
self._check_cache(
{('query1',1,'conn_id'): (1.0,'result for query1'),
('query2',1,'conn_id'): (1.1,'result for query2'),},
{1.0: ('query1',1,'conn_id'),
1.1: ('query2',1,'conn_id'),}
)
# three - now we drop our first cache entry
self._do_query('query3',1.2)
self._check_cache(
{('query2',1,'conn_id'): (1.1,'result for query2'),
('query3',1,'conn_id'): (1.2,'result for query3'),},
{1.1: ('query2',1,'conn_id'),
1.2: ('query3',1,'conn_id'),}
)
# four - now we drop another cache entry
self._do_query('query4',1.3)
self._check_cache(
{('query3',1,'conn_id'): (1.2,'result for query3'),
('query4',1,'conn_id'): (1.3,'result for query4'),},
{1.2: ('query3',1,'conn_id'),
1.3: ('query4',1,'conn_id'),}
)
def test_time_tcache_expires(self):
# This tests that once the cache purging code is triggered,
# it will actively hoover out all expired cache entries
# the first query gets cached
self._do_query('query1',1)
self._check_cache(
{('query1',1,'conn_id'): (1,'result for query1')},
{1: ('query1',1,'conn_id')}
)
# the 2nd gets cached, the cache is still smaller than max_cache
self._do_query('query2',2)
self._check_cache(
{('query1',1,'conn_id'): (1,'result for query1'),
('query2',1,'conn_id'): (2,'result for query2')},
{1: ('query1',1,'conn_id'),
2:('query2',1,'conn_id')}
)
# the 3rd trips the max_cache trigger, so both our old queries get
# dumped because they are past their expiration time
self._do_query('query',23)
self._check_cache(
{('query',1,'conn_id'): (23,'result for query')},
{23:('query',1,'conn_id')}
)
def test_time_refreshed_cache(self):
# This tests that when a cached query is expired when it comes
# to check for a cached entry for that query, the stale entry is
# removed and replaced with a fresh entry.
# the first query gets cached
self._do_query('query1',1)
self._check_cache(
{('query1',1,'conn_id'): (1,'result for query1')},
{1: ('query1',1,'conn_id')}
)
# do the same query much later, so new one gets cached
self._do_query('query1',12)
self._check_cache(
{('query1',1,'conn_id'): (12,'result for query1')},
{12: ('query1',1,'conn_id')}
)
class DummyDA:
def __call__(self):
conn = DummyDB()
conn.result = ((),())
return conn
sql_quote__ = "I don't know what this is."
class Hook:
conn_num = 1
def __call__(self):
conn_to_use = 'conn'+str(self.conn_num)
self.conn_num += 1
return conn_to_use
class TestCacheKeys(TestCase):
# These tests check that the keys used for caching are unique
# in the right ways.
def _cached_result(self,DB__,query,row_count,conn_id):
self.cache_key = query,row_count,conn_id
# we return something that can be safely turned into an empty Result
return ((),())
def setUp(self):
from Shared.DC.ZRDB.DA import DA
self.da = DA('da','title','conn_id','arg1 arg2','some sql')
self.da._cached_result = self._cached_result
self.da.conn_id = DummyDA()
# These need to be set so DA.__call__ tries for a cached result
self.da.cache_time_ = 1
self.da.max_cache_ = 1
def test_default(self):
self.da()
self.assertEqual(self.cache_key,('some sql',1000,'conn_id'))
def test_different_max_rows(self):
self.da.max_rows_ = 123
self.da()
self.assertEqual(self.cache_key,('some sql',123,'conn_id'))
def test_connection_hook(self):
self.da.connection_hook = 'hook_method'
self.da.hook_method = Hook()
self.da.conn1 = DummyDA()
self.da()
self.assertEqual(self.cache_key,('some sql',1000,'conn1'))
self.da.conn2 = DummyDA()
self.da()
self.assertEqual(self.cache_key,('some sql',1000,'conn2'))
class TestFullChain(TestCase):
# This exercises both DA.__call__ and DA._cached_result.
def setUp(self):
from Shared.DC.ZRDB.DA import DA
self.da = DA('da','title','conn_id','arg1 arg2','some sql')
self.da.conn_id = DummyDA()
def test_args_match(self):
# This checks is that DA._cached_result's call signature
# matches that expected by DA.__call__
# These need to be set so DA.__call__ tries for a cached result
self.da.cache_time_ = 1
self.da.max_cache_ = 1
# the actual test, will throw exceptions if things aren't right
self.da()
def test_cached_result_not_called_for_no_caching(self):
# blow up the _cached_result method on our
# test instance
self.da._cached_result = None
# check we never get there with the default "no cachine"
self.da()
# turn caching on
self.da.cache_time_ = 1
self.da.max_cache_ = 1
# check that we get an exception
self.assertRaises(TypeError,self.da)
def test_suite():
suite = TestSuite()
suite.addTest(makeSuite(TestCaching))
suite.addTest(makeSuite(TestCacheKeys))
suite.addTest(makeSuite(TestFullChain))
return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment