Commit 330f26f7 authored by Arnaud Fontaine's avatar Arnaud Fontaine

WIP: ZODB Components: Migrate MemcachedTool (portal_memcached) from filesystem.

All {Unit,CodingStyle} Tests pass but SelectionTool may used portal_memcached
so this commit depends on migrating SelectionTool too.
parent 75309541
...@@ -2094,7 +2094,6 @@ class ERP5Generator(PortalGenerator): ...@@ -2094,7 +2094,6 @@ class ERP5Generator(PortalGenerator):
# Add ERP5Type Tool # Add ERP5Type Tool
addERP5Tool(p, 'portal_caches', 'Cache Tool') addERP5Tool(p, 'portal_caches', 'Cache Tool')
addERP5Tool(p, 'portal_memcached', 'Memcached Tool')
# Add erp5 catalog tool # Add erp5 catalog tool
addERP5Tool(p, 'portal_catalog', 'Catalog Tool') addERP5Tool(p, 'portal_catalog', 'Catalog Tool')
......
...@@ -31,7 +31,7 @@ from AccessControl import ClassSecurityInfo ...@@ -31,7 +31,7 @@ from AccessControl import ClassSecurityInfo
from Products.ERP5Type.XMLObject import XMLObject from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type import PropertySheet from Products.ERP5Type import PropertySheet
from Products.ERP5Type import Permissions from Products.ERP5Type import Permissions
from Products.ERP5Type.Tool.MemcachedTool import MemcachedDict from erp5.component.tool.MemcachedTool import MemcachedDict
from Products.ERP5Type.Globals import InitializeClass from Products.ERP5Type.Globals import InitializeClass
class MemcachedPlugin(XMLObject): class MemcachedPlugin(XMLObject):
...@@ -61,7 +61,7 @@ class MemcachedPlugin(XMLObject): ...@@ -61,7 +61,7 @@ class MemcachedPlugin(XMLObject):
security.declarePublic('getConnection') security.declarePublic('getConnection')
def getConnection(self): def getConnection(self):
try: try:
key, connection = self._v_connection key, connection = self._v_connection # pylint: disable=access-member-before-definition
except AttributeError: except AttributeError:
key = None key = None
url = self.getUrlString() url = self.getUrlString()
...@@ -79,4 +79,4 @@ class MemcachedPlugin(XMLObject): ...@@ -79,4 +79,4 @@ class MemcachedPlugin(XMLObject):
self._v_connection = my_key, connection self._v_connection = my_key, connection
return connection return connection
InitializeClass(MemcachedPlugin) InitializeClass(MemcachedPlugin)
\ No newline at end of file
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Document Component" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>default_reference</string> </key>
<value> <string>MemcachedPlugin</string> </value>
</item>
<item>
<key> <string>default_source_reference</string> </key>
<value> <string>Products.ERP5Type.Core.MemcachedPlugin</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>document.erp5.MemcachedPlugin</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Document Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.Workflow"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_log</string> </key>
<value>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2006 Nexedi SARL and Contributors. All Rights Reserved.
# Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import time
from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type import Permissions, _dtmldir
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Globals import DTMLFile, InitializeClass
from quopri import encodestring
MEMCACHED_TOOL_MODIFIED_FLAG_PROPERTY_ID = '_v_memcached_edited'
class _MemcacheTool(BaseTool):
id = "portal_memcached"
meta_type = "ERP5 Memcached Tool"
portal_type = "Memcached Tool"
manage_options = (
{
'label': 'Configure',
'action': 'memcached_tool_configure',
},
) + BaseTool.manage_options
try:
import memcache
except ImportError:
memcache = None
def encodeKey(key):
"""
Encode the key like 'Quoted Printable'.
"""
# According to the memcached's protocol.txt, the key cannot contain
# control characters and white spaces.
return encodestring(key, True).replace('\n', '').replace('\r', '')
if memcache is not None:
# Real memcache tool
from Shared.DC.ZRDB.TM import TM
from Products.PythonScripts.Utility import allow_class
from zLOG import LOG, INFO
MARKER = object()
DELETE_ACTION = 0
UPDATE_ACTION = 1
_client_pool = {}
def getClient(server_list, server_max_key_length, server_max_value_length):
"""
Pool memcache.Client instances.
This is possible as there is no such thing as a database snapshot on
memcached connections (unlike, for example, mysql).
Also, memcached.Client instance are thread-safe (by inheriting from
threading.local), so we only need one instance per parameter set (and
we use few enough parameter variants to make this manageable).
"""
key = (
tuple(sorted(server_list)),
server_max_key_length,
server_max_value_length,
)
try:
return _client_pool[key]
except KeyError:
client = _client_pool[key] = memcache.Client(
server_list,
pickleProtocol=-1, # use the highest available version
server_max_key_length=server_max_key_length,
server_max_value_length=server_max_value_length,
)
return client
class MemcachedDict(TM):
"""
Present memcached similarly to a dictionary (not all method are
available).
Uses transactions to only update memcached at commit time.
No conflict generation/resolution : last edit wins.
"""
def __init__(self, server_list, expiration_time=0,
server_max_key_length=memcache.SERVER_MAX_KEY_LENGTH,
server_max_value_length=memcache.SERVER_MAX_VALUE_LENGTH,
):
"""
server_list (tuple of strings)
Servers to connect to, in 'host:port' format.
expiration_time (int)
Entry expiration time. See "Expiration times" in memcache protocol
spec. Summary:
0 = never
less than 60*60*24*30 = time starting at entry creation/update
more = absolute unix timestamp
server_max_key_length (int)
Maximum key length. Storing larger keys will cause an exception to be
raised.
server_max_value_length (int)
Maximum value length. Storing larger values will cause an exception to
be raised.
"""
# connection cache with duration limited to transaction length.
self.local_cache = {}
# Each key in scheduled_action_dict must be handled at commit.
# UPDATE_ACTION: send local_cache value to server
# DELETE_ACTION: delete on server
self.scheduled_action_dict = {}
self.server_list = server_list
# see "Expiration times" from memcached protocol docs
# (this simulates relative expiration time greater than 30 days)
self.expiration_time_since_epoch = expiration_time >= 2592000
self.expiration_time = expiration_time
self.server_max_key_length = server_max_key_length
self.server_max_value_length = server_max_value_length
self.memcached_connection = getClient(
server_list,
server_max_key_length=server_max_key_length,
server_max_value_length=server_max_value_length,
)
def _finish(self, *ignored):
"""
Actually modifies the values in memcached.
This avoids multiple accesses to memcached during the transaction.
Invalidate all local cache to make sure changes donc by other zopes
would not be ignored.
"""
try:
expiration_time = self.expiration_time
if self.expiration_time_since_epoch:
expiration_time += time.time()
for key, value in self.local_cache.iteritems():
if getattr(value, MEMCACHED_TOOL_MODIFIED_FLAG_PROPERTY_ID, None):
delattr(value, MEMCACHED_TOOL_MODIFIED_FLAG_PROPERTY_ID)
self.scheduled_action_dict[key] = UPDATE_ACTION
for key, action in self.scheduled_action_dict.iteritems():
encoded_key = encodeKey(key)
if action is UPDATE_ACTION:
self.memcached_connection.set(
encoded_key,
self.local_cache[key],
expiration_time,
)
elif action is DELETE_ACTION:
self.memcached_connection.delete(encoded_key, 0)
except Exception:
# This is a cache. Failing to push data to server must be fine, as long as
# cleanup succeeds.
LOG('MemcachedDict', INFO, 'An exception occured during _finish', error=True)
self.__cleanup()
def _abort(self, *ignored):
self.__cleanup()
def __cleanup(self):
self.local_cache.clear()
self.scheduled_action_dict.clear()
def __getitem__(self, key):
"""
Get an item from local cache, otherwise from memcached.
"""
# We need to register in this function too to be able to flush cache at
# transaction end.
self._register()
if self.scheduled_action_dict.get(key) == DELETE_ACTION:
raise KeyError
result = self.local_cache.get(key, MARKER)
if result is MARKER:
encoded_key = encodeKey(key)
try:
result = self.memcached_connection.get(encoded_key)
except memcache.Client.MemcachedConnectionError:
LOG('MemcacheTool', INFO, 'get command to memcached server (%r) failed' % (self.server_list,), error=True)
raise KeyError
self.local_cache[key] = result
return result
def __setitem__(self, key, value):
"""
Set an item to local cache and schedule update of memcached.
"""
self._register()
self.scheduled_action_dict[key] = UPDATE_ACTION
self.local_cache[key] = value
def __delitem__(self, key):
"""
Schedule key for deletion in memcached.
Set the value to None in local cache to avoid gathering the value
from memcached.
Never raises KeyError because action is delayed.
"""
self._register()
self.scheduled_action_dict[key] = DELETE_ACTION
self.local_cache[key] = None
def set(self, key, value):
return self.__setitem__(key, value)
def get(self, key, default=None):
try:
return self.__getitem__(key)
except KeyError:
return default
class SharedDict(object):
"""
Class to make possible for multiple "users" to store data in the same
dictionary without risking to overwrite other's data.
Each "user" of the dictionary must get an instance of this class.
"""
def __init__(self, dictionary, prefix):
"""
dictionary
Instance of dictionary to share.
prefix
Prefix used by the "user" owning an instance of this class.
"""
self._dictionary = dictionary
self.prefix = prefix
def _prefixKey(self, key):
if not isinstance(key, basestring):
raise TypeError, 'Key %s is not a string. Only strings are supported as key in SharedDict' % (repr(key), )
return '%s_%s' % (self.prefix, key)
def __getitem__(self, key):
return self._dictionary.__getitem__(self._prefixKey(key))
def __setitem__(self, key, value):
self._dictionary.__setitem__(self._prefixKey(key), value)
def __delitem__(self, key):
self._dictionary.__delitem__(self._prefixKey(key))
# These are the method names called by zope
__guarded_setitem__ = __setitem__
__guarded_getitem__ = __getitem__
__guarded_delitem__ = __delitem__
def get(self, key, default=None):
return self._dictionary.get(self._prefixKey(key), default)
def set(self, key, value):
self._dictionary.set(self._prefixKey(key), value)
allow_class(SharedDict)
class MemcachedTool(_MemcacheTool):
"""
Memcached interface available as a tool.
"""
security = ClassSecurityInfo()
memcached_tool_configure = DTMLFile('memcached_tool_configure', _dtmldir)
erp5_site_global_id = ''
security.declareProtected(Permissions.AccessContentsInformation, 'getMemcachedDict')
def getMemcachedDict(self, key_prefix, plugin_path):
"""
Returns an object which can be used as a dict and which gets from/stores
to memcached server.
key_prefix
Mandatory argument allowing different tool users to share the same
dictionary key namespace.
plugin_path
relative_url of dedicated Memcached Plugin
"""
memcached_plugin = self.restrictedTraverse(plugin_path, None)
if memcached_plugin is None:
raise ValueError('Memcached Plugin does not exists: %r' % (
plugin_path, ))
global_prefix = self.erp5_site_global_id
if global_prefix:
key_prefix = global_prefix + '_' + key_prefix
return SharedDict(memcached_plugin.getConnection(), prefix=key_prefix)
InitializeClass(MemcachedTool)
else:
# Placeholder memcache tool
class MemcachedTool(_MemcacheTool):
"""
Dummy MemcachedTool placeholder.
"""
title = "DISABLED"
security = ClassSecurityInfo()
def failingMethod(self, *args, **kw):
"""
if this function is called and memcachedtool is disabled, fail loudly
with a meaningfull message.
"""
raise RuntimeError, 'MemcachedTool is disabled. You should ask the'\
' server administrator to enable it by installing python-memcached.'
memcached_tool_configure = failingMethod
getMemcachedDict = failingMethod
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Tool Component" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>default_reference</string> </key>
<value> <string>MemcachedTool</string> </value>
</item>
<item>
<key> <string>default_source_reference</string> </key>
<value> <string>Products.ERP5Type.Tool.MemcachedTool</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>tool.erp5.MemcachedTool</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Tool Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.Workflow"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_log</string> </key>
<value>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Memcached Tool" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_count</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>_mt_index</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
<item>
<key> <string>_tree</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>portal_memcached</string> </value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="Length" module="BTrees.Length"/>
</pickle>
<pickle> <int>0</int> </pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="OOBTree" module="BTrees.OOBTree"/>
</pickle>
<pickle>
<none/>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="OOBTree" module="BTrees.OOBTree"/>
</pickle>
<pickle>
<none/>
</pickle>
</record>
</ZopeData>
...@@ -24,6 +24,7 @@ document.erp5.InvoiceLine ...@@ -24,6 +24,7 @@ document.erp5.InvoiceLine
document.erp5.Item document.erp5.Item
document.erp5.MailMessage document.erp5.MailMessage
document.erp5.MappedValue document.erp5.MappedValue
document.erp5.MemcachedPlugin
document.erp5.Movement document.erp5.Movement
document.erp5.Order document.erp5.Order
document.erp5.PackingList document.erp5.PackingList
......
...@@ -7,6 +7,7 @@ tool.erp5.DeliveryTool ...@@ -7,6 +7,7 @@ tool.erp5.DeliveryTool
tool.erp5.DiffTool tool.erp5.DiffTool
tool.erp5.DomainTool tool.erp5.DomainTool
tool.erp5.IntrospectionTool tool.erp5.IntrospectionTool
tool.erp5.MemcachedTool
tool.erp5.NotificationTool tool.erp5.NotificationTool
tool.erp5.OrderTool tool.erp5.OrderTool
tool.erp5.PasswordTool tool.erp5.PasswordTool
......
...@@ -7,6 +7,7 @@ portal_diff ...@@ -7,6 +7,7 @@ portal_diff
portal_domains portal_domains
portal_ids portal_ids
portal_introspections portal_introspections
portal_memcached
portal_notifications portal_notifications
portal_orders portal_orders
portal_password portal_password
......
...@@ -38,11 +38,6 @@ from Products.ERP5Type import interfaces ...@@ -38,11 +38,6 @@ from Products.ERP5Type import interfaces
import zope.interface import zope.interface
from base64 import encodestring from base64 import encodestring
try:
from Products.ERP5Type.Tool.MemcachedTool import MemcachedDict, SharedDict
except ImportError:
LOG('DistributedRamCache', 0, 'unable to import memcache')
## global dictionary containing connection objects ## global dictionary containing connection objects
connection_pool = local() connection_pool = local()
...@@ -83,6 +78,7 @@ class DistributedRamCache(BaseCache): ...@@ -83,6 +78,7 @@ class DistributedRamCache(BaseCache):
try: try:
dictionary = local_dict[configuration_key] dictionary = local_dict[configuration_key]
except KeyError: except KeyError:
from erp5.component.tool.MemcachedTool import MemcachedDict
dictionary = MemcachedDict(self._servers.split('\n'), dictionary = MemcachedDict(self._servers.split('\n'),
expiration_time=self._expiration_time, expiration_time=self._expiration_time,
server_max_key_length=self._server_max_key_length, server_max_key_length=self._server_max_key_length,
...@@ -93,6 +89,7 @@ class DistributedRamCache(BaseCache): ...@@ -93,6 +89,7 @@ class DistributedRamCache(BaseCache):
def getCacheStorage(self, **kw): def getCacheStorage(self, **kw):
"""Follow MemcachedTool.getMemcachedDict implementation """Follow MemcachedTool.getMemcachedDict implementation
""" """
from erp5.component.tool.MemcachedTool import SharedDict
return SharedDict(self._getMemcachedDict(), prefix=self._key_prefix) return SharedDict(self._getMemcachedDict(), prefix=self._key_prefix)
def _getCacheId(self, cache_id, scope): def _getCacheId(self, cache_id, scope):
......
...@@ -26,299 +26,8 @@ ...@@ -26,299 +26,8 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# #
############################################################################## ##############################################################################
import time
from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type import Permissions, _dtmldir
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Globals import DTMLFile, InitializeClass
from quopri import encodestring
# XXX: This file has been kept only for backward compatibility as
# ERP5Form.Selection imports it and cannot be migrated to ZODB Component
# as this is a non-ERP5 object
MEMCACHED_TOOL_MODIFIED_FLAG_PROPERTY_ID = '_v_memcached_edited' MEMCACHED_TOOL_MODIFIED_FLAG_PROPERTY_ID = '_v_memcached_edited'
class _MemcacheTool(BaseTool):
id = "portal_memcached"
meta_type = "ERP5 Memcached Tool"
portal_type = "Memcached Tool"
manage_options = (
{
'label': 'Configure',
'action': 'memcached_tool_configure',
},
) + BaseTool.manage_options
try:
import memcache
except ImportError:
memcache = None
def encodeKey(key):
"""
Encode the key like 'Quoted Printable'.
"""
# According to the memcached's protocol.txt, the key cannot contain
# control characters and white spaces.
return encodestring(key, True).replace('\n', '').replace('\r', '')
if memcache is not None:
# Real memcache tool
from Shared.DC.ZRDB.TM import TM
from Products.PythonScripts.Utility import allow_class
from zLOG import LOG, INFO
MARKER = object()
DELETE_ACTION = 0
UPDATE_ACTION = 1
_client_pool = {}
def getClient(server_list, server_max_key_length, server_max_value_length):
"""
Pool memcache.Client instances.
This is possible as there is no such thing as a database snapshot on
memcached connections (unlike, for example, mysql).
Also, memcached.Client instance are thread-safe (by inheriting from
threading.local), so we only need one instance per parameter set (and
we use few enough parameter variants to make this manageable).
"""
key = (
tuple(sorted(server_list)),
server_max_key_length,
server_max_value_length,
)
try:
return _client_pool[key]
except KeyError:
client = _client_pool[key] = memcache.Client(
server_list,
pickleProtocol=-1, # use the highest available version
server_max_key_length=server_max_key_length,
server_max_value_length=server_max_value_length,
)
return client
class MemcachedDict(TM):
"""
Present memcached similarly to a dictionary (not all method are
available).
Uses transactions to only update memcached at commit time.
No conflict generation/resolution : last edit wins.
"""
def __init__(self, server_list, expiration_time=0,
server_max_key_length=memcache.SERVER_MAX_KEY_LENGTH,
server_max_value_length=memcache.SERVER_MAX_VALUE_LENGTH,
):
"""
server_list (tuple of strings)
Servers to connect to, in 'host:port' format.
expiration_time (int)
Entry expiration time. See "Expiration times" in memcache protocol
spec. Summary:
0 = never
less than 60*60*24*30 = time starting at entry creation/update
more = absolute unix timestamp
server_max_key_length (int)
Maximum key length. Storing larger keys will cause an exception to be
raised.
server_max_value_length (int)
Maximum value length. Storing larger values will cause an exception to
be raised.
"""
# connection cache with duration limited to transaction length.
self.local_cache = {}
# Each key in scheduled_action_dict must be handled at commit.
# UPDATE_ACTION: send local_cache value to server
# DELETE_ACTION: delete on server
self.scheduled_action_dict = {}
self.server_list = server_list
# see "Expiration times" from memcached protocol docs
# (this simulates relative expiration time greater than 30 days)
self.expiration_time_since_epoch = expiration_time >= 2592000
self.expiration_time = expiration_time
self.server_max_key_length = server_max_key_length
self.server_max_value_length = server_max_value_length
self.memcached_connection = getClient(
server_list,
server_max_key_length=server_max_key_length,
server_max_value_length=server_max_value_length,
)
def _finish(self, *ignored):
"""
Actually modifies the values in memcached.
This avoids multiple accesses to memcached during the transaction.
Invalidate all local cache to make sure changes donc by other zopes
would not be ignored.
"""
try:
expiration_time = self.expiration_time
if self.expiration_time_since_epoch:
expiration_time += time.time()
for key, value in self.local_cache.iteritems():
if getattr(value, MEMCACHED_TOOL_MODIFIED_FLAG_PROPERTY_ID, None):
delattr(value, MEMCACHED_TOOL_MODIFIED_FLAG_PROPERTY_ID)
self.scheduled_action_dict[key] = UPDATE_ACTION
for key, action in self.scheduled_action_dict.iteritems():
encoded_key = encodeKey(key)
if action is UPDATE_ACTION:
self.memcached_connection.set(
encoded_key,
self.local_cache[key],
expiration_time,
)
elif action is DELETE_ACTION:
self.memcached_connection.delete(encoded_key, 0)
except Exception:
# This is a cache. Failing to push data to server must be fine, as long as
# cleanup succeeds.
LOG('MemcachedDict', INFO, 'An exception occured during _finish', error=True)
self.__cleanup()
def _abort(self, *ignored):
self.__cleanup()
def __cleanup(self):
self.local_cache.clear()
self.scheduled_action_dict.clear()
def __getitem__(self, key):
"""
Get an item from local cache, otherwise from memcached.
"""
# We need to register in this function too to be able to flush cache at
# transaction end.
self._register()
if self.scheduled_action_dict.get(key) == DELETE_ACTION:
raise KeyError
result = self.local_cache.get(key, MARKER)
if result is MARKER:
encoded_key = encodeKey(key)
try:
result = self.memcached_connection.get(encoded_key)
except memcache.Client.MemcachedConnectionError:
LOG('MemcacheTool', INFO, 'get command to memcached server (%r) failed' % (self.server_list,), error=True)
raise KeyError
self.local_cache[key] = result
return result
def __setitem__(self, key, value):
"""
Set an item to local cache and schedule update of memcached.
"""
self._register()
self.scheduled_action_dict[key] = UPDATE_ACTION
self.local_cache[key] = value
def __delitem__(self, key):
"""
Schedule key for deletion in memcached.
Set the value to None in local cache to avoid gathering the value
from memcached.
Never raises KeyError because action is delayed.
"""
self._register()
self.scheduled_action_dict[key] = DELETE_ACTION
self.local_cache[key] = None
def set(self, key, value):
return self.__setitem__(key, value)
def get(self, key, default=None):
try:
return self.__getitem__(key)
except KeyError:
return default
class SharedDict(object):
"""
Class to make possible for multiple "users" to store data in the same
dictionary without risking to overwrite other's data.
Each "user" of the dictionary must get an instance of this class.
"""
def __init__(self, dictionary, prefix):
"""
dictionary
Instance of dictionary to share.
prefix
Prefix used by the "user" owning an instance of this class.
"""
self._dictionary = dictionary
self.prefix = prefix
def _prefixKey(self, key):
if not isinstance(key, basestring):
raise TypeError, 'Key %s is not a string. Only strings are supported as key in SharedDict' % (repr(key), )
return '%s_%s' % (self.prefix, key)
def __getitem__(self, key):
return self._dictionary.__getitem__(self._prefixKey(key))
def __setitem__(self, key, value):
self._dictionary.__setitem__(self._prefixKey(key), value)
def __delitem__(self, key):
self._dictionary.__delitem__(self._prefixKey(key))
# These are the method names called by zope
__guarded_setitem__ = __setitem__
__guarded_getitem__ = __getitem__
__guarded_delitem__ = __delitem__
def get(self, key, default=None):
return self._dictionary.get(self._prefixKey(key), default)
def set(self, key, value):
self._dictionary.set(self._prefixKey(key), value)
allow_class(SharedDict)
class MemcachedTool(_MemcacheTool):
"""
Memcached interface available as a tool.
"""
security = ClassSecurityInfo()
memcached_tool_configure = DTMLFile('memcached_tool_configure', _dtmldir)
erp5_site_global_id = ''
security.declareProtected(Permissions.AccessContentsInformation, 'getMemcachedDict')
def getMemcachedDict(self, key_prefix, plugin_path):
"""
Returns an object which can be used as a dict and which gets from/stores
to memcached server.
key_prefix
Mandatory argument allowing different tool users to share the same
dictionary key namespace.
plugin_path
relative_url of dedicated Memcached Plugin
"""
memcached_plugin = self.restrictedTraverse(plugin_path, None)
if memcached_plugin is None:
raise ValueError('Memcached Plugin does not exists: %r' % (
plugin_path, ))
global_prefix = self.erp5_site_global_id
if global_prefix:
key_prefix = global_prefix + '_' + key_prefix
return SharedDict(memcached_plugin.getConnection(), prefix=key_prefix)
InitializeClass(MemcachedTool)
else:
# Placeholder memcache tool
class MemcachedTool(_MemcachedTool):
"""
Dummy MemcachedTool placeholder.
"""
title = "DISABLED"
security = ClassSecurityInfo()
def failingMethod(self, *args, **kw):
"""
if this function is called and memcachedtool is disabled, fail loudly
with a meaningfull message.
"""
raise RuntimeError, 'MemcachedTool is disabled. You should ask the'\
' server administrator to enable it by installing python-memcached.'
memcached_tool_configure = failingMethod
getMemcachedDict = failingMethod
...@@ -88,7 +88,7 @@ import Products.ERP5Type.Workflow ...@@ -88,7 +88,7 @@ import Products.ERP5Type.Workflow
def initialize( context ): def initialize( context ):
# Import Product Components # Import Product Components
from Tool import (CacheTool, MemcachedTool, from Tool import (CacheTool,
TypesTool, PropertySheetTool, TypesTool, PropertySheetTool,
ComponentTool) ComponentTool)
import Document import Document
...@@ -103,7 +103,6 @@ def initialize( context ): ...@@ -103,7 +103,6 @@ def initialize( context ):
XMLObject.XMLObject, XMLObject.XMLObject,
ERP5TypeInformation, ) ERP5TypeInformation, )
portal_tools = ( CacheTool.CacheTool, portal_tools = ( CacheTool.CacheTool,
MemcachedTool.MemcachedTool,
TypesTool.TypesTool, TypesTool.TypesTool,
PropertySheetTool.PropertySheetTool, PropertySheetTool.PropertySheetTool,
ComponentTool.ComponentTool ComponentTool.ComponentTool
......
...@@ -1000,11 +1000,17 @@ class ERP5TypeCommandLineTestCase(ERP5TypeTestCaseMixin): ...@@ -1000,11 +1000,17 @@ class ERP5TypeCommandLineTestCase(ERP5TypeTestCaseMixin):
def _updateMemcachedConfiguration(self): def _updateMemcachedConfiguration(self):
"""Update default memcached plugin configuration """Update default memcached plugin configuration
""" """
portal_memcached = self.portal.portal_memcached try:
connection_dict = _getVolatileMemcachedServerDict() default_memcached_plugin = self.portal.portal_memcached.default_memcached_plugin
url_string = '%(hostname)s:%(port)s' % connection_dict # May not be present after upgrading from filesystem to ZODB Components
if portal_memcached.default_memcached_plugin.getUrlString() != url_string: # (testUpgradeInstanceWithOldDataFs)
portal_memcached.default_memcached_plugin.setUrlString(url_string) except AttributeError:
pass
else:
connection_dict = _getVolatileMemcachedServerDict()
url_string = '%(hostname)s:%(port)s' % connection_dict
if default_memcached_plugin.getUrlString() != url_string:
default_memcached_plugin.setUrlString(url_string)
def _clearActivity(self, quiet=0): def _clearActivity(self, quiet=0):
"""Clear activities if `erp5_tests_recreate_catalog` environment variable is """Clear activities if `erp5_tests_recreate_catalog` environment variable is
......
...@@ -257,8 +257,7 @@ def installRealMemcachedTool(portal): ...@@ -257,8 +257,7 @@ def installRealMemcachedTool(portal):
def _recreateMemcachedTool(portal): def _recreateMemcachedTool(portal):
"""Recreate the memcached tool for this portal. """Recreate the memcached tool for this portal.
""" """
from Products.ERP5Type.Tool import MemcachedTool portal.portal_components.reset(force=True)
reload(MemcachedTool)
portal.manage_delObjects(['portal_memcached']) portal.manage_delObjects(['portal_memcached'])
portal.newContent(id='portal_memcached', portal_type="Memcached Tool") portal.newContent(id='portal_memcached', portal_type="Memcached Tool")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment