Commit 32703c43 authored by Ivan Tyagov's avatar Ivan Tyagov

Improve SeesionTool to work as well in a distributed environment (memcached, MySQL).

Make use of already present cache plugins used as a storage.
Extend test to cover functionality for all kind of storage (i.e. cache) plugins.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@24702 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent e44242e8
......@@ -28,76 +28,147 @@
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type.Cache import CachingMethod
from Products.ERP5Type import Permissions
from Acquisition import aq_base
from time import time
from UserDict import UserDict
# the ERP5 cache factory used as a storage
SESSION_CACHE_FACTORY = 'erp5_session_cache'
SESSION_SCOPE = 'SESSION'
# =24 hours, really last default duration value set if not defined in storage
DEFAULT_SESSION_DURATION = 86400
_marker=[]
def wrapper():
""" Simple cache call wrapper."""
s = Session()
return s
# global storage plugin
storage_plugin = None
class Session(UserDict):
""" Session object acts as a plain python dictionary but can wrap/unwarp ZODB
RAM object (newTempOrder, newTempDocument..) in order to safely store it inside.
Please be AWARE that there's no security checks applied! """
""" Session acts as a plain python dictionary stored in respecitve Cache Factory/Cache Plugin.
Depending on cache plugin used as a storage some restrictions may apply.
Please be AWARE that there's no security checks applied. """
## we have our own security policy and do not want Zope's
# we have our own security policy and do not want Zope's
_guarded_writes = 1
__allow_access_to_unprotected_subobjects__ = 1
# XXX (dirty hack): we shouldn't need to explicitly set uid here
uid = 'NULL'
## a handle to current aquisition context
# used to set duration of session
session_duration = None
def _updatecontext(self, aq_context):
""" Update current aquisition context.
This makes only sense for local RAM Session."""
pass
def _updateSessionDuration(self, session_duration):
self.session_duration = int(session_duration)
def _updateSessionId(self, session_id):
self.session_id = session_id
def __str__(self):
return self.__repr__()
def edit(self, **kw):
""" Edit session object. """
for key, item in kw.items():
self.__setitem__(key, item)
class RamSession(Session):
""" Local RAM Session dictionary """
# a handle to current aquisition context
_aq_context = None
def _updatecontext(self, aq_context):
""" Update current aquisition context. """
self._aq_context = aq_context
def __getattr__(self, key, default=_marker):
if key in self.data:
return self.__getitem__(key)
if default is not _marker:
return default
raise AttributeError, key
# disabled as session should be dictionary like
# def __getattr__(self, key, default=_marker):
# if key in self.data:
# return self.__getitem__(key)
# if default is not _marker:
# return default
# raise AttributeError, key
def __getitem__(self, key):
if key in self.data:
value = self.data[key]
if hasattr(value, '__of__'):
# returned it wrapped in aquisition context
value = value.__of__(self._aq_context)
return value
raise KeyError, key
def __setitem__(self, key, item):
self.data[key] = aq_base(item)
# save value without its acquisition context
Session.__setitem__(self, key, aq_base(item))
def __str__(self):
return self.__repr__()
class DistributedSession(Session):
""" Distributed Session dictionary.
It's use together with DistributedRamCache or SQLCache cache (storage) plugins."""
# session_id used to get respective dictionary from memcached
session_id = None
def _updateStorage(self):
""" Update backend storage. """
global storage_plugin
storage_plugin.set(self.session_id, \
SESSION_SCOPE, \
value = self, \
cache_duration = getattr(self, 'session_duration', DEFAULT_SESSION_DURATION))
# need to override methods that change session so changes are transparently sent to backend storage
def __setitem__(self, key, item):
Session.__setitem__(self, key, aq_base(item))
self._updateStorage()
def __delitem__(self, key):
Session.__delitem__(self, key)
self._updateStorage()
def clear(self):
Session.clear(self)
self._updateStorage()
def update(self, dict=None, **kwargs):
Session.update(self, dict, **kwargs)
self._updateStorage()
def setdefault(self, key, failobj=None):
Session.setdefault(self, key, failobj)
self._updateStorage()
def pop(self, key, *args):
Session.pop(self, key, *args)
self._updateStorage()
def popitem(self):
Session.popitem(self)
self._updateStorage()
def edit(self, **kw):
""" Edit session object. """
for key, item in kw.items():
self.__setitem__(key, item)
class SessionTool(BaseTool):
""" Using this tool you can get a RAM based session object by providing
""" Using this tool you can get a Session object by providing
your own generated session_id.
This session object can be used anywhere in Zope/ERP5 environment.
Because it uses lazy Cache as a storage backend you do not need to initialize it
(Cache will take care for that first time it's called).
This session object can be used anywhere in Zope / ERP5 environment.
It can be local RAM based or Distributed (memcached or SQL (MySQL)).
Its type depends on the type of cache plugin used under Cache Factory defined
as string in SESSION_CACHE_FACTORY and its first (and only) Cache Plugin.
You do not need to initialize it as this tool will initialize it as a plain dictionary for you.
Example:
session_id = '1234567'
session = context.portal_sessions[session_id]
session['shopping_cart'] = newTempOrder(context, '987654321')
session['shopping_cart'] = newTempOrder(context, '987654321') # will work only for local RAM sessions
(you can also use 'session.edit(shopping_cart= newTempOrder(context, '987654321'))' )
(later in another script you can acquire shopping_cart):
......@@ -107,11 +178,13 @@ class SessionTool(BaseTool):
shopping_cart = session['shopping_cart']
Please note that:
- developer is responsible for handling an uniform sessiond_id (using cookies for example).
- developer is responsible for handling an unique sessiond_id (using cookies for example).
- it's not recommended to store in portal_sessions ZODB persistent objects because in order
to store them in Cache(RAM) portal_sessions tool will remove aquisition wrapper. At "get"
to store them in Local RAM portal_sessions tool will remove aquisition wrapper. At "get"
request they'll be returend wrapped.
- it's recommended that developer store temporary RAM based objects like 'TempOrder'
- developer can store temporary RAM based objects like 'TempOrder' but ONLY
when using Local RAM type of sessions. In a distributed environment one can use only
pickable types ue to the nature of memcached server and MySQL storage.
"""
id = 'portal_sessions'
......@@ -120,41 +193,53 @@ class SessionTool(BaseTool):
allowed_types = ()
security = ClassSecurityInfo()
## the ERP5 cache factory used as a storage
_cache_factory = 'erp5_session_cache'
def __getitem__(self, key):
session = self._getSessionObject(key)
session = self.getSession(key)
session._updatecontext(self)
return session
def __setitem__(self, key, item):
session = self._getSessionObject(key)
session._updatecontext(self)
def getSession(self, session_id, session_duration=None):
""" Return session object. """
storage_plugin = self._getStoragePlugin()
# expire explicitly as each session can have a different life duration
storage_plugin.expireOldCacheEntries(forceCheck=1)
session = storage_plugin.get(session_id, SESSION_SCOPE, None)
if session is None:
# init it in cache and use different Session types based on cache plugin type used as a storage
storage_plugin_type = storage_plugin.__class__.__name__
if storage_plugin_type in ("RamCache",):
session = RamSession()
elif storage_plugin_type in ("DistributedRamCache", "SQLCache",):
session = DistributedSession()
session._updateSessionId(session_id)
if session_duration is None:
# set session duration (this is used from backend storage machinery for expire purposes)
session_duration = self.portal_caches[SESSION_CACHE_FACTORY].objectValues()[0].cache_duration
session._updateSessionDuration(session_duration)
storage_plugin.set(session_id, SESSION_SCOPE, session, session_duration)
else:
# cache plugin returns wrapper (CacheEntry instance)
session = session.value
return session
def newContent(self, id, **kw):
""" Create new session object. """
session = self._getSessionObject(id)
session = self.getSession(id)
session._updatecontext(self)
session.edit(**kw)
session.update(**kw)
return session
security.declareProtected(Permissions.ModifyPortalContent, 'manage_delObjects')
security.declareProtected(Permissions.AccessContentsInformation, 'manage_delObjects')
def manage_delObjects(self, ids=[], REQUEST=None):
""" Delete session object. """
storage_plugin = self._getStoragePlugin()
if not isinstance(ids, list) or isinstance(ids, list):
ids = [ids]
for session_id in ids:
cache_method = self._getCacheMethod(session_id)
cache_method.delete(session_id)
storage_plugin.delete(session_id, SESSION_SCOPE)
def _getCacheMethod(self, session_id):
""" Get caching method used to interact with Cache system. """
cache_method = CachingMethod(callable_object = wrapper, \
id = session_id, \
cache_factory = self._cache_factory)
return cache_method
def _getSessionObject(self, session_id):
""" Return session object. """
return self._getCacheMethod(session_id)()
def _getStoragePlugin(self):
""" Get cache storage plugin."""
global storage_plugin
storage_plugin = self.portal_caches.getRamCacheRoot()[SESSION_CACHE_FACTORY].getCachePluginList()[0]
return storage_plugin
......@@ -34,12 +34,23 @@ from Products.ERP5Type.Document import newTempOrder
from AccessControl.SecurityManagement import newSecurityManager
from zLOG import LOG
from Products.ERP5Type.tests.Sequence import SequenceList
from Products.ERP5Type.Tool.SessionTool import SESSION_CACHE_FACTORY
from string import letters as LETTERS
from random import choice
import time
try:
from transaction import get as get_transaction
except ImportError:
pass
primitives_kw = dict(attr_1 = ['list_item'], \
attr_2 = ('tuple1','tuple2',), \
attr_3 = 1, \
attr_4 = 0.1, \
attr_5 = {'some_key': 'some_value'}, \
attr_6 = 'string', )
class TestSessionTool(ERP5TypeTestCase):
run_all_test = 1
......@@ -58,17 +69,41 @@ class TestSessionTool(ERP5TypeTestCase):
user = uf.getUserById('ivan').__of__(uf)
newSecurityManager(None, user)
def stepTestAcquisitionRamSessionStorage(self, sequence=None,
def _changeCachePlugin(self, portal_type, storage_duration = 86400):
""" Change current cache plugin with new one. """
portal_caches = self.portal.portal_caches
session_cache_factory = getattr(portal_caches, SESSION_CACHE_FACTORY)
# remove current cache plugin
session_cache_factory.manage_delObjects(list(session_cache_factory.objectIds()))
cache_plugin = session_cache_factory.newContent(portal_type=portal_type)
cache_plugin.setCacheDuration(storage_duration)
cache_plugin.setIntIndex(0)
get_transaction().commit()
portal_caches.updateCache()
def stepTestSetGet(self, sequence=None,
sequence_list=None, **kw):
session = self.portal.portal_sessions[self.session_id]
session.clear()
session.update(primitives_kw)
session = self.portal.portal_sessions[self.session_id]
self.assertEquals(primitives_kw, session)
# API check
self.assert_(self.portal.portal_sessions[self.session_id] == \
self.portal.portal_sessions.getSession(self.session_id))
session.clear()
session.edit(**primitives_kw)
session = self.portal.portal_sessions[self.session_id]
self.assertEquals(primitives_kw, session)
def stepTestAcquisitionRamSessionStorage(self, sequence=None, \
sequence_list=None, **kw):
portal_sessions = self.getPortal().portal_sessions
session = portal_sessions.newContent(
self.session_id, \
attr_1 = newTempOrder(portal_sessions, '1'), \
attr_2 = newTempOrder(portal_sessions, '2'), \
attr_3 = 1, \
attr_4 = 0.1, \
attr_5 = {}, \
attr_6 = 'string',)
attr_2 = newTempOrder(portal_sessions, '2'),)
## check temp (RAM based) attributes stored in session
for i in range (1, 3):
attr_name = 'attr_%s' %i
......@@ -76,50 +111,70 @@ class TestSessionTool(ERP5TypeTestCase):
attr = session[attr_name]
self.assert_(str(i), attr.getId())
self.assert_(0 == len(attr.objectIds()))
## check primitive stype storage
self.assert_(1 == session['attr_3'])
self.assert_(0.1 == session['attr_4'])
self.assert_({} == session['attr_5'])
self.assert_('string' == session['attr_6'])
def stepDeleteSessionObjectAttributes(self, sequence=None,
def stepModifySession(self, sequence=None, \
sequence_list=None, **kw):
""" Delete session keys."""
""" Modify session and check that modifications are updated in storage backend."""
portal_sessions = self.getPortal().portal_sessions
session = portal_sessions.newContent(
self.session_id, \
attr_1 = newTempOrder(portal_sessions, '1'), \
attr_2 = newTempOrder(portal_sessions, '2'), \
attr_3 = 1, \
attr_4 = 0.1, \
attr_5 = {}, \
attr_6 = 'string',)
session = portal_sessions.newContent(self.session_id, \
**primitives_kw)
session = portal_sessions[self.session_id]
session.pop('attr_1')
session.pop('attr_2')
del session['attr_2']
# get again session object again and check that session value is updated
# (this makes sense for memcached/SQL)
session = portal_sessions[self.session_id]
self.assert_(not 'attr_1' in session.keys())
self.assert_(not 'attr_2' in session.keys())
def stepDeleteSessionObject(self, sequence=None,
session.update(**{'key_1':'value_1',
'key_2':'value_2',})
session = portal_sessions[self.session_id]
self.assert_('key_1' in session.keys())
self.assert_(session['key_1'] == 'value_1')
self.assert_('key_2' in session.keys())
self.assert_(session['key_2'] == 'value_2')
session.clear()
session = portal_sessions[self.session_id]
self.assert_(session == {})
session['pop_key'] = 'pop_value'
session = portal_sessions[self.session_id]
self.assert_(session['pop_key'] == 'pop_value')
session.popitem()
session = portal_sessions[self.session_id]
self.assert_(session == {})
session.setdefault('default', 'value')
session = portal_sessions[self.session_id]
self.assert_(session['default'] == 'value')
def stepDeleteClearSession(self, sequence=None, \
sequence_list=None, **kw):
""" Get session object and check keys stored in previous test. """
portal_sessions = self.getPortal().portal_sessions
session = portal_sessions.newContent(self.session_id, \
**primitives_kw)
# delete it
portal_sessions.manage_delObjects(self.session_id)
session = portal_sessions[self.session_id]
self.assert_({} == session)
# clear it
session = portal_sessions.newContent(
self.session_id, \
attr_1 = newTempOrder(portal_sessions, '1'), \
attr_2 = newTempOrder(portal_sessions, '2'), \
attr_3 = 1,
attr_4 = 0.1,
attr_5 = {},
attr_6 = 'string',)
## delete it
portal_sessions.manage_delObjects(self.session_id)
**primitives_kw)
session = portal_sessions[self.session_id]
self.assert_(primitives_kw == session)
session.clear()
session = portal_sessions[self.session_id]
self.assert_(0 == len(session.keys()))
self.assert_(session == {})
def stepTestSessionDictInterface(self, sequence=None,
def stepTestSessionDictInterface(self, sequence=None, \
sequence_list=None, **kw):
session = self.portal.portal_sessions[self.session_id]
session.clear()
session['foo'] = 'Bar'
self.assertTrue('foo' in session)
self.assertEquals('Bar', session['foo'])
......@@ -128,14 +183,52 @@ class TestSessionTool(ERP5TypeTestCase):
self.assertEquals('Default', session.get('bar', 'Default'))
self.assertRaises(KeyError, session.__getitem__, 'bar')
def stepTestSessionGetattr(self, sequence=None,
def stepTestSessionGetattr(self, sequence=None, \
sequence_list=None, **kw):
session = self.portal.portal_sessions[self.session_id]
session.clear()
session['foo'] = 'Bar'
self.assertEquals('Bar', session.foo)
#self.assertEquals('Bar', session.foo)
self.assertEquals('Default', getattr(session, 'bar', 'Default'))
self.assertRaises(AttributeError, getattr, session, 'bar')
def stepTestSessionBulkStorage(self, sequence=None, \
sequence_list=None, **kw):
""" Test massive session sets which uses different cache plugin. """
kw = {}
session = self.portal.portal_sessions[self.session_id]
session.clear()
session = self.portal.portal_sessions[self.session_id]
# test many sets
for i in range(0, 500):
v = ''.join([choice(LETTERS) for x in range(10)])
session[i] = v
kw[i] = v
session = self.portal.portal_sessions[self.session_id]
self.assertEquals(kw, session)
# test big session
session.clear()
for key, item in kw.items():
kw[key] = ''.join([choice(LETTERS) for x in range(1000)])
session.update(kw)
session = self.portal.portal_sessions[self.session_id]
self.assertEquals(kw, session)
def stepTestSessionExpire(self, sequence=None, \
sequence_list=None, **kw):
""" Test expire session which uses different cache plugin. """
interval = 3
portal_sessions = self.getPortal().portal_sessions
portal_sessions.manage_delObjects(self.session_id)
session = portal_sessions.getSession(self.session_id, session_duration = interval)
session['key'] = 'value'
time.sleep(interval+1)
session = self.getPortal().portal_sessions.getSession(self.session_id)
# session should be an emty dic as it expired
self.assert_(session == {})
def test_01_CheckSessionTool(self, quiet=0, run=run_all_test):
""" Create portal_sessions tool and needed cache factory. """
if not run:
......@@ -157,11 +250,14 @@ class TestSessionTool(ERP5TypeTestCase):
LOG('Testing... ', 0, message)
sequence_list = SequenceList()
sequence_string = 'stepTestAcquisitionRamSessionStorage \
stepDeleteSessionObjectAttributes \
stepDeleteSessionObject \
sequence_string = 'stepTestSetGet \
stepTestAcquisitionRamSessionStorage \
stepModifySession \
stepDeleteClearSession \
stepTestSessionDictInterface \
stepTestSessionGetattr \
stepTestSessionBulkStorage \
stepTestSessionExpire \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
......@@ -171,13 +267,48 @@ class TestSessionTool(ERP5TypeTestCase):
if not run:
return
if not quiet:
message = '\nTest Distributed Session.'
message = '\nTest Distributed Session (memcached).'
ZopeTestCase._print(message)
LOG('Testing... ', 0, message)
# create memcached plugin and test
self._changeCachePlugin('Distributed Ram Cache')
sequence_list = SequenceList()
sequence_string = 'stepTestSetGet \
stepModifySession \
stepDeleteClearSession \
stepTestSessionDictInterface \
stepTestSessionGetattr \
stepTestSessionBulkStorage \
stepTestSessionExpire \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def test_04_SQLDistributedSession(self, quiet=0, run=run_all_test):
""" Test DistributedSession which uses SQL based cache plugin. """
if not run:
return
if not quiet:
message = '\nTest Distributed Session (SQL).'
ZopeTestCase._print(message)
LOG('Testing... ', 0, message)
# XXX: create memcached plugin and test
self.portal.portal_caches.clearAllCache()
# create memcached plugin and test
self._changeCachePlugin('SQL Cache')
sequence_list = SequenceList()
sequence_string = 'stepTestSetGet \
stepModifySession \
stepDeleteClearSession \
stepTestSessionDictInterface \
stepTestSessionGetattr \
stepTestSessionBulkStorage \
stepTestSessionExpire \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestSessionTool))
return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment