Commit b2ad5dad authored by Nicolas Delaby

Cleanup test

  * harmonize indentation
  * import transaction (see the before/after sketch below, ahead of the diff)
  * remove quiet and run_all_test variables
  * cosmetic changes


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@26973 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 1fc28dcf
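
As a quick illustration of two of the cleanups named in the message above (switching to a plain transaction import and dropping the quiet/run_all_test test guards), here is a minimal before/after sketch. It is not part of the commit; SampleTestBefore, SampleTestAfter and test_something are illustrative names only, and the sketch assumes the transaction package is importable, as it is in a Zope/ERP5 environment.

# Before the cleanup: conditional import and per-test flags.
try:
  from transaction import get as get_transaction
except ImportError:
  pass

class SampleTestBefore:
  run_all_test = 1

  def test_something(self, quiet=0, run=run_all_test):
    if not run:
      return
    # ... actual test body ...
    get_transaction().commit()

# After the cleanup: import the transaction module directly, no flags.
import transaction

class SampleTestAfter:
  def test_something(self):
    # ... actual test body ...
    transaction.commit()

In the diff below this is exactly the shape of the change: the try/except import and the quiet/run boilerplate disappear, and every get_transaction().commit() becomes transaction.commit().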
+# -*- coding: utf-8 -*-
 ##############################################################################
 #
 # Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
@@ -34,18 +35,14 @@ from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
 from Products.ERP5Type.CachePlugins.DummyCache import DummyCache
 from AccessControl.SecurityManagement import newSecurityManager
 from zLOG import LOG
-try:
-  from transaction import get as get_transaction
-except ImportError:
-  pass
+import transaction
 class TestingCache(DummyCache):
   """A dummy cache that mark cache miss, so that you can later count access
   using getCacheMisses() """
   def __init__(self, params):
     DummyCache.__init__(self, params)
   def __call__(self, callable_object, cache_id, scope, cache_duration=None,
                *args, **kwd):
     self.markCacheMiss(1)
@@ -53,131 +50,102 @@ class TestingCache(DummyCache):
 class TestCacheTool(ERP5TypeTestCase):
-  run_all_test = 1
   def getTitle(self):
     return "Cache Tool"
   def afterSetUp(self):
     self.login()
-  def login(self, quiet=0, run=run_all_test):
+  def login(self):
     uf = self.getPortal().acl_users
-    uf._doAddUser('seb', '', ['Manager'], [])
+    uf._doAddUser('admin', '', ['Manager'], [])
     uf._doAddUser('ERP5TypeTestCase', '', ['Manager'], [])
-    user = uf.getUserById('seb').__of__(uf)
+    user = uf.getUserById('admin').__of__(uf)
     newSecurityManager(None, user)
-  def test_01_CheckCacheTool(self, quiet=0, run=run_all_test):
-    if not run:
-      return
-    if not quiet:
-      message = '\nCheck CacheTool '
-      ZopeTestCase._print(message)
-      LOG('Testing... ',0,message)
+  def test_01_CheckCacheTool(self):
     portal = self.getPortal()
     self.assertNotEqual(None,getattr(portal,'portal_caches',None))
-    get_transaction().commit()
+    transaction.commit()
-  def test_02_CheckPortalTypes(self, quiet=0, run=run_all_test):
-    if not run:
-      return
-    if not quiet:
-      message = '\nCheck Portal Types'
-      ZopeTestCase._print(message)
-      LOG('Testing... ',0,message)
+  def test_02_CheckPortalTypes(self):
     portal = self.getPortal()
     portal_types = portal.portal_types
     typeinfo_names = ("Cache Factory",
                       "Ram Cache",
                       "Distributed Ram Cache",
                       "SQL Cache",
                       )
     for typeinfo_name in typeinfo_names:
       portal_type = getattr(portal_types,typeinfo_name,None)
       self.assertNotEqual(None,portal_type)
-    get_transaction().commit()
+    transaction.commit()
-  def test_03_CreateCacheFactories(self, quiet=0, run=run_all_test):
-    if not run:
-      return
-    if not quiet:
-      message = '\nCreate Cache Tool Factories'
-      ZopeTestCase._print(message)
-      LOG('Testing... ',0,message)
+  def test_03_CreateCacheFactories(self):
     portal = self.getPortal()
     portal_caches = portal.portal_caches
     # Cache plugins are organised into 'Cache factories' so we create
     # factories first ram_cache_factory (to test Ram Cache Plugin)
     ram_cache_factory = portal_caches.newContent(portal_type="Cache Factory",
                                                  id = 'ram_cache_factory',
                                                  container=portal_caches)
     ram_cache_plugin = ram_cache_factory.newContent(portal_type="Ram Cache",
                                                     container=ram_cache_factory)
     ram_cache_plugin.setIntIndex(0)
     ## distributed_ram_cache_factory (to test Distributed Ram Cache Plugin)
     dram_cache_factory = portal_caches.newContent(portal_type="Cache Factory",
                                                   id = 'distributed_ram_cache_factory',
                                                   container=portal_caches)
     dram_cache_plugin = dram_cache_factory.newContent(
         portal_type="Distributed Ram Cache", container=dram_cache_factory)
     dram_cache_plugin.setIntIndex(0)
     ## sql_cache_factory (to test SQL Cache Plugin)
     sql_cache_factory = portal_caches.newContent(portal_type="Cache Factory",
                                                  id = 'sql_cache_factory',
                                                  container=portal_caches)
     sql_cache_plugin = sql_cache_factory.newContent(
         portal_type="SQL Cache", container=sql_cache_factory)
     sql_cache_plugin.setIntIndex(0)
     ## erp5_user_factory (to test a combination of all cache plugins)
     erp5_user_factory = portal_caches.newContent(portal_type="Cache Factory",
                                                  id = "erp5_user_factory",
                                                  container=portal_caches)
     ram_cache_plugin = erp5_user_factory.newContent(
         portal_type="Ram Cache", container=erp5_user_factory)
     ram_cache_plugin.setIntIndex(0)
     dram_cache_plugin = erp5_user_factory.newContent(
         portal_type="Distributed Ram Cache", container=erp5_user_factory)
     dram_cache_plugin.setIntIndex(1)
     sql_cache_plugin = erp5_user_factory.newContent(
         portal_type="SQL Cache", container=erp5_user_factory)
     sql_cache_plugin.setIntIndex(2)
     ##
-    get_transaction().commit()
+    transaction.commit()
     ## update Ram Cache structure
     portal_caches.updateCache()
     from Products.ERP5Type.Cache import CachingMethod
     ## do we have the same structure we created above?
     self.assert_('ram_cache_factory' in CachingMethod.factories)
     self.assert_('distributed_ram_cache_factory' in CachingMethod.factories)
     self.assert_('sql_cache_factory' in CachingMethod.factories)
     self.assert_('erp5_user_factory' in CachingMethod.factories)
-  def test_04_CreateCachedMethod(self, quiet=0, run=run_all_test):
-    if not run:
-      return
-    if not quiet:
-      message = '\nCreate Cache Method (Python Script)'
-      ZopeTestCase._print(message)
-      LOG('Testing... ',0,message)
+  def test_04_CreateCachedMethod(self):
     portal = self.getPortal()
     ## add test cached method
     py_script_id = "testCachedMethod"
     py_script_params = "value=10000, portal_path=('','erp5')"
     py_script_body = """
 def veryExpensiveMethod(value):
   ## do something expensive for some time
   ## no 'time.sleep()' available in Zope
@@ -190,96 +158,83 @@ def veryExpensiveMethod(value):
 result = veryExpensiveMethod(value)
 return result
 """
     portal.manage_addProduct['PythonScripts'].manage_addPythonScript(
                           id=py_script_id)
     py_script_obj = getattr(portal, py_script_id)
     py_script_obj.ZPythonScript_edit(py_script_params, py_script_body)
-    get_transaction().commit()
+    transaction.commit()
-  def test_05_CacheFactoryOnePlugin(self, quiet=0, run=run_all_test):
+  def test_05_CacheFactoryOnePlugin(self):
     """ Test cache factory containing only one cache plugin. """
-    if not run:
-      return
-    if not quiet:
-      message = '\nTest each type of cache plugin individually.'
-      ZopeTestCase._print(message)
-      LOG('Testing... ',0,message)
     portal = self.getPortal()
     from Products.ERP5Type.Cache import CachingMethod
     py_script_id = "testCachedMethod"
     py_script_obj = getattr(portal, py_script_id)
     for cf_name in ('ram_cache_factory',
                     'distributed_ram_cache_factory',
                     'sql_cache_factory'):
       my_cache = CachingMethod(py_script_obj,
                                'py_script_obj',
                                cache_factory=cf_name)
       self._cacheFactoryInstanceTest(my_cache, cf_name)
-  def test_06_CacheFactoryMultiPlugins(self, quiet=0, run=run_all_test):
+  def test_06_CacheFactoryMultiPlugins(self):
     """ Test a cache factory containing multiple cache plugins. """
-    if not run:
-      return
-    if not quiet:
-      message = '\nTest combination of available cache plugins under a cache'\
-                ' factory'
-      ZopeTestCase._print(message)
-      LOG('Testing... ',0,message)
     portal = self.getPortal()
     from Products.ERP5Type.Cache import CachingMethod
     py_script_id = "testCachedMethod"
     py_script_obj = getattr(portal, py_script_id)
     cf_name = 'erp5_user_factory'
     my_cache = CachingMethod(py_script_obj,
                              'py_script_obj',
                              cache_factory=cf_name)
     self._cacheFactoryInstanceTest(my_cache, cf_name)
   def _cacheFactoryInstanceTest(self, my_cache, cf_name):
     portal = self.getPortal()
     print
     print "="*40
     print "TESTING:", cf_name
     # if the test fails because your machine is too fast, increase this value.
     nb_iterations = 30000
     portal.portal_caches.clearCacheFactory(cf_name)
     ## 1st call
     start = time.time()
     original = my_cache(nb_iterations, portal_path=('', portal.getId()))
     end = time.time()
     calculation_time = end-start
     print "\n\tCalculation time (1st call)", calculation_time
     ## 2nd call - should be cached now
     start = time.time()
     cached = my_cache(nb_iterations, portal_path=('', portal.getId()))
     end = time.time()
     calculation_time = end-start
     print "\n\tCalculation time (2nd call)", calculation_time
     # check if cache works by getting calculation_time for last cache
     # operation even remote cache must have access time less than a second.
     # if it's greater than method wasn't previously cached and was calculated
     # instead
     self.assert_(1.0 > calculation_time)
     ## check if equal.
     self.assertEquals(original, cached)
     ## OK so far let's clear cache
     portal.portal_caches.clearCacheFactory(cf_name)
     ## 1st call
     start = time.time()
     original = my_cache(nb_iterations, portal_path=('', portal.getId()))
     end = time.time()
     calculation_time = end-start
     print "\n\tCalculation time (after cache clear)", calculation_time
     ## Cache cleared shouldn't be previously cached
     self.assert_(1.0 < calculation_time)
   def test_CachePersistentObjects(self):
     # storing persistent objects in cache is not allowed, but this check is
@@ -296,7 +251,6 @@ return result
       return self.portal.getTitle
     cached_func = CachingMethod(func, 'cache_bound_method')
     self.assertRaises(TypeError, cached_func)
 def test_suite():
   suite = unittest.TestSuite()
...