Commit 0569958a authored by Aurel's avatar Aurel

Add a first version of the client part; the handler for storage nodes is still missing

and cache management must be completed.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@42 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 1f2cea9f
from ZODB import BaseStorage, ConflictResolution, POSException
from ZODB.utils import p64, u64, cp, z64
class NEOStorageError(POSException.StorageError):
    """Base class of the errors specific to the NEO client storage."""
class NEOStorageConflictError(NEOStorageError):
    """Raised when a storage node reports a conflict on a stored object."""
class NEOStorageNotFoundError(NEOStorageError):
    """Raised when no storage node returned the requested object."""
class NEOStorage(BaseStorage.BaseStorage,
                 ConflictResolution.ConflictResolvingStorage):
    """Wrapper class for neoclient.

    Every storage operation is forwarded to a neo.client.app.Application
    instance and its result is returned to the caller.  Operations guarded
    by the read-only flag raise POSException.ReadOnlyError when the storage
    was created with read_only=True.
    """

    def __init__(self, master_addr, master_port, read_only=False, **kw):
        """Create the client application connected to the given master."""
        self._is_read_only = read_only
        from neo.client.app import Application # here to prevent recursive import
        self.app = Application(master_addr, master_port)

    def _checkReadOnly(self):
        """Raise ReadOnlyError if the storage was opened read-only."""
        if self._is_read_only:
            raise POSException.ReadOnlyError()

    def load(self, oid, version=None):
        """Return what the application loads for oid.

        Raise POSKeyError if the application cannot find the object.
        """
        try:
            return self.app.load(oid)
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid)

    def close(self):
        return self.app.close()

    def cleanup(self):
        return self.app.cleanup()

    def lastSerial(self):
        return self.app.lastSerial()

    def lastTransaction(self):
        return self.app.lastTransaction()

    def new_oid(self):
        self._checkReadOnly()
        return self.app.new_oid()

    def tpc_begin(self, transaction, tid=None, status=' '):
        self._checkReadOnly()
        return self.app.tpc_begin(transaction, tid, status)

    def tpc_vote(self, transaction):
        self._checkReadOnly()
        return self.app.tpc_vote(transaction)

    def tpc_abort(self, transaction):
        self._checkReadOnly()
        return self.app.tpc_abort(transaction)

    def tpc_finish(self, transaction, f=None):
        return self.app.tpc_finish(transaction, f)

    def store(self, oid, serial, data, version, transaction):
        """Store an object, trying conflict resolution on conflict.

        Raise ConflictError when the conflict cannot be resolved.
        """
        self._checkReadOnly()
        try:
            return self.app.store(oid, serial, data, version, transaction)
        except NEOStorageConflictError:
            # Conflict resolution is provided by ConflictResolvingStorage,
            # which this class inherits from; the application object does
            # not define tryToResolveConflict.
            new_data = self.tryToResolveConflict(oid, self.app.tid,
                                                 serial, data)
            if new_data is None:
                raise POSException.ConflictError(oid=oid,
                        serials=(self.app.tid, serial), data=data)
            # try again after conflict resolution
            return self.store(oid, serial, new_data, version, transaction)

    def _clear_temp(self):
        return self.app._clear_temp()

    # multiple revisions
    def loadSerial(self, oid, serial):
        """Return what the application loads for oid at the given serial."""
        try:
            return self.app.loadSerial(oid, serial)
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid, serial)

    def loadBefore(self, oid, tid):
        """Return what the application loads for oid before tid."""
        try:
            return self.app.loadBefore(oid, tid)
        except NEOStorageNotFoundError:
            raise POSException.POSKeyError(oid, tid)

    def iterator(self, start=None, stop=None):
        return self.app.iterator(start, stop)

    # undo
    def undo(self, transaction_id, txn):
        self._checkReadOnly()
        return self.app.undo(transaction_id, txn)

    def undoInfo(self, first=0, last=-20, specification=None):
        # XXX is it needed ?
        self._checkReadOnly()
        return self.app.undoInfo(first, last, specification)

    def undoLog(self, first, last, filter):
        self._checkReadOnly()
        return self.app.undoLog(first, last, filter)
import logging
import os
from time import time
from threading import Lock, Condition
from cPickle import dumps, loads
from zlib import compress, alder32, decompress
from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection
from neo.protocol import Packet, INVALID_UUID, CLIENT_NODE_TYPE, UP_TO_DATE_STATE
from neo.client.master import MasterEventHandler
from neo.client.NEOStorage import NEOStorageConflictError, NEOStorageKeyError
class ConnectionManager(object):
    """This class manage a pool of connection to storage node.

    Connections are kept in connection_dict, keyed by the UUID of the
    storage node they are connected to.
    """

    def __init__(self, storage, pool_size=25):
        """Create an empty pool.

        storage is the client application owning the event manager (em),
        the node manager (nm) and the client uuid used by the connections.
        pool_size is the number of connections above which unused ones
        should be dropped.
        """
        self.storage = storage
        self.pool_size = 0
        self.max_pool_size = pool_size
        self.connection_dict = {}
        # Polled by _initNodeConnection until it is no longer None.
        # NOTE(review): nothing in the visible code assigns it; presumably
        # the storage event handler is meant to do so - confirm.
        self.storage_node = None
        # define a lock in order to create one connection to
        # a storage node at a time to avoid multiple connection
        # to the same node
        l = Lock()
        self.connection_lock_acquire = l.acquire
        self.connection_lock_release = l.release

    def _initNodeConnection(self, addr):
        """Init a connection to a given storage node."""
        app = self.storage
        handler = StorageEventHandler(app)
        conn = ClientConnection(app.em, handler, addr)
        msg_id = conn.getNextId()
        p = Packet()
        # The event manager and the uuid belong to the application, not to
        # the connection manager.
        p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, app.uuid,
                                    addr[0], addr[1], 'main')
        # Queue the request, as done for master connections; without this
        # the identification request is never sent.
        conn.addPacket(p)
        conn.expectMessage(msg_id)
        self.storage_node = None
        while 1:
            app.em.poll(1)
            if self.storage_node is not None:
                break
        logging.debug('connected to a storage node %s' %(addr,))
        return conn

    def _dropConnection(self,):
        """Drop a connection."""
        pass

    def _createNodeConnection(self, node):
        """Create a connection to a given storage node."""
        self.connection_lock_acquire()
        try:
            uuid = node.getUUID()
            # check dict again, maybe another thread
            # just created the connection
            if uuid in self.connection_dict:
                return self.connection_dict[uuid]
            if self.pool_size >= self.max_pool_size:
                # must drop some unused connections
                self._dropConnection()
            conn = self._initNodeConnection(node.getServer())
            # add node to node manager
            if not self.storage.nm.hasNode(node):
                from neo.node import StorageNode
                n = StorageNode(node.getServer())
                self.storage.nm.add(n)
            self.connection_dict[uuid] = conn
            self.pool_size += 1
            return conn
        finally:
            self.connection_lock_release()

    def getConnForNode(self, node):
        """Return connection object to a given node
        If no connection exists, create a new one"""
        try:
            # Already connected to node
            return self.connection_dict[node.getUUID()]
        except KeyError:
            # Create new connection to node
            return self._createNodeConnection(node)
class Application(object):
    """The client node application.

    It holds the event manager, the node manager, the pool of connections
    to storage nodes and the object cache, and implements the storage
    operations on behalf of the ZODB wrapper.  Answers from the network
    are written to attributes of this object by the event handlers while
    the methods below poll the event manager.
    """

    def __init__(self, master_addr, master_port, **kw):
        """Initialise the state and connect to the primary master node.

        Only returns once the primary master accepted the connection and
        the partition table is filled.
        """
        logging.basicConfig(level = logging.DEBUG)
        logging.debug('master node address is %s, port is %d' %(master_addr, master_port))

        # Internal Attributes
        self.em = EventManager()
        self.nm = NodeManager()
        self.cm = ConnectionManager(self)
        self.pt = None
        # NOTE(review): the number of partitions is needed to locate the
        # storage nodes of an oid/tid, but nothing in the visible code
        # assigns it - confirm who is expected to set it.
        self.num_partitions = None
        self.primary_master_node = None
        self.master_conn = None
        self.node_not_ready = 0
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = [] # List of new oid for ZODB
        self.txn = None # The current transaction
        self.tid = None # The current transaction id
        self.txn_oid_list = [] # List of oid stored in the current transaction
        self.txn_finished = 0 # Flag to know when transaction finished on master
        self.txn_stored = 0 # Flag to know when transaction has well been stored
        self.loaded_object = None # Current data of the object we are loading
        self.undo_tid_list = None # Answer to the last tid list request
        self.undo_txn_info = None # Answer to the last transaction info request
        # object_stored is used to know if storage node
        # accepted the object or raised a conflict
        # 0 : no answer yet
        # 1 : ok
        # 2 : conflict
        self.object_stored = 0

        # Lock definition :
        # _oid_lock is used in order to not call multiple oid
        # generation at the same time
        # _txn_lock lock the entire transaction process, it is acquire
        # at tpc begin and release at tpc_finish or tpc_abort
        # _cache_lock is used for the client cache
        # _load_lock is acquire to protect self.loaded_object used in event
        # handler when retrieving object from storage node
        # _undo_log_lock is used when retrieving undo information
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        self._txn_lock_acquire = lock.acquire
        self._txn_lock_release = lock.release
        lock = Lock()
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        lock = Lock()
        self._undo_log_lock_acquire = lock.acquire
        self._undo_log_lock_release = lock.release

        # XXX Generate an UUID for self. For now, just use a random string.
        # Avoid an invalid UUID.
        uuid = os.urandom(16)
        while uuid == INVALID_UUID:
            uuid = os.urandom(16)
        self.uuid = uuid

        # Connect to primary master node
        from time import sleep
        defined_master_addr = (master_addr, master_port)
        while 1:
            self.node_not_ready = 0
            logging.debug("trying to connect to primary master...")
            self.connectToPrimaryMasterNode(defined_master_addr)
            if not self.node_not_ready and self._isPartitionTableFilled():
                # got a connection and partition table
                break
            # wait a bit before reasking (sleep instead of spinning)
            sleep(1)
        logging.info("connected to primary master node")

    def _waitUntil(self, condition):
        """Poll the event manager until condition() returns a true value.

        The event manager is always polled at least once.
        """
        while 1:
            self.em.poll(1)
            if condition():
                break

    def _isPartitionTableFilled(self):
        """Tell if a partition table exists and is filled."""
        return self.pt is not None and self.pt.filled()

    def _getPartitionId(self, key):
        """Return the partition number of an oid or a tid.

        NOTE(review): assumes key supports the modulo operator - confirm
        the type of the oid/tid received from callers.
        """
        return key % self.num_partitions

    def _checkTransaction(self, transaction):
        """Raise StorageTransactionError unless transaction is the current one."""
        if transaction is not self.txn:
            from ZODB import POSException
            raise POSException.StorageTransactionError(self, transaction)

    def _requestNodeIdentification(self, handler, addr):
        """Open a connection to addr and ask to be identified as a client."""
        conn = ClientConnection(self.em, handler, addr)
        msg_id = conn.getNextId()
        p = Packet()
        p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid,
                                    addr[0], addr[1], 'main')
        conn.addPacket(p)
        conn.expectMessage(msg_id)
        return conn

    def connectToPrimaryMasterNode(self, defined_master_addr):
        """Connect to the primary master node.

        Returns early, leaving node_not_ready set, when the contacted
        master answered that it is not ready.
        """
        handler = MasterEventHandler(self)
        # Register the node only once, this method is retried by __init__
        if self.nm.getNodeByServer(defined_master_addr) is None:
            self.nm.add(MasterNode(server = defined_master_addr))

        conn = None
        # Connect to defined master node and get primary master node
        if self.primary_master_node is None:
            conn = self._requestNodeIdentification(handler, defined_master_addr)
            # Also stop waiting when the node is not ready, otherwise the
            # loop would never end as no primary master will be announced
            self._waitUntil(lambda: self.primary_master_node is not None
                                    or self.node_not_ready)
            if self.node_not_ready:
                # must wait
                return
        primary_addr = self.primary_master_node.server
        logging.debug('primary master node is %s' %(primary_addr,))

        # Close connection if not already connected to primary master node
        if conn is None or primary_addr != defined_master_addr:
            for c in self.em.getConnectionList():
                if not isinstance(c, ListeningConnection):
                    c.close()
            # Connect to primary master node
            conn = self._requestNodeIdentification(handler, primary_addr)
        self.master_conn = conn

        # Wait for primary master node information
        self._waitUntil(lambda: self.node_not_ready
                                or self._isPartitionTableFilled())

    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if not self.new_oid_list:
                # Get new oid list from master node
                # we manage a list of oid here to prevent
                # from asking too many time new oid one by one
                # from master node
                conn = self.master_conn
                msg_id = conn.getNextId()
                p = Packet()
                p.askNewOIDList(msg_id)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                # Wait for answer
                self._waitUntil(lambda: len(self.new_oid_list) > 0)
            return self.new_oid_list.pop()
        finally:
            self._oid_lock_release()

    def _load(self, oid, serial=""):
        """Internal method which manage load and loadSerial.

        Ask the up to date storage nodes of the partition of oid, one after
        the other, until one gives a consistent answer.  Return an
        (unpickled data, serial) tuple, or raise NEOStorageNotFoundError
        when no node provided the object.
        """
        from zlib import adler32
        requested_serial = serial
        partition_id = self._getPartitionId(oid)
        # Only used up to date node for retrieving object
        storage_node_list = [x for x in self.pt.getCellList(partition_id, True)
                             if x.getState() == UP_TO_DATE_STATE]
        found = None
        for storage_node in storage_node_list:
            conn = self.cm.getConnForNode(storage_node)
            # The lock protects self.loaded_object for the duration of one
            # request only, and is always released
            self._load_lock_acquire()
            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askObjectByOID(msg_id, oid, requested_serial)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                # Wait for answer
                self.loaded_object = None
                self._waitUntil(lambda: self.loaded_object is not None)
                # Copy object data here to release lock as soon as possible
                answer = self.loaded_object
            finally:
                self._load_lock_release()
            if answer == -1:
                # OID not found, try with next node
                continue
            noid, nserial, compressed, crc, data = answer
            # Check data here
            if noid != oid:
                # Oops, try with next node
                logging.error('got wrong oid %s instead of %s from node %s' \
                              %(noid, oid, storage_node.getServer()))
                continue
            if compressed and crc != adler32(data):
                # Check crc if we use compression
                logging.error('wrong crc from node %s for oid %s' \
                              %(storage_node.getServer(), oid))
                continue
            found = (nserial, compressed, data)
            break

        if found is None:
            # We didn't got any object from storage node
            from neo.client.NEOStorage import NEOStorageNotFoundError
            raise NEOStorageNotFoundError()
        serial, compressed, data = found
        # Uncompress data
        if compressed:
            data = decompress(data)
        if not requested_serial:
            # Put object into cache; a revision requested by serial is not
            # cached as the cache manages only up-to-date objects
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = serial, data
            finally:
                self._cache_lock_release()
        return loads(data), serial

    def load(self, oid, version=None):
        """Load an object for a given oid.

        Return an (unpickled data, serial) tuple, from the cache when the
        object is there and from a storage node otherwise.
        """
        # First try from cache
        self._cache_lock_acquire()
        try:
            if oid in self.mq_cache:
                serial, data = self.mq_cache[oid]
                return loads(data), serial
        finally:
            self._cache_lock_release()
        # Otherwise get it from storage node
        return self._load(oid)

    def loadSerial(self, oid, serial):
        """Load the data of an object for a given oid and serial."""
        # Do not try in cache as it managed only up-to-date object
        return self._load(oid, serial)[0]

    def lastTransaction(self):
        """Not implemented, always return None."""
        # does not seem to be used
        return

    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        if self.txn is transaction:
            # We have already began the same transaction; this is checked
            # before taking the lock, which is held by this transaction
            return
        # First get a transaction, only one is allowed at a time
        self._txn_lock_acquire()
        begun = False
        try:
            self.txn = transaction
            # Init list of oid used in this transaction
            self.txn_oid_list = []
            # Get a new transaction id if necessary
            if tid is None:
                self.tid = None
                conn = self.master_conn
                msg_id = conn.getNextId()
                p = Packet()
                p.askNewTID(msg_id)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                # Wait for answer
                self._waitUntil(lambda: self.tid is not None)
            else:
                self.tid = tid
            begun = True
        finally:
            if not begun:
                # Do not keep the transaction lock if no tid was obtained
                self._clear_txn()

    def store(self, oid, serial, data, version, transaction):
        """Store object.

        Raise NEOStorageConflictError when a storage node reports a
        conflict.
        """
        self._checkTransaction(transaction)
        from zlib import adler32
        # Find which storage node to use
        partition_id = self._getPartitionId(oid)
        storage_node_list = self.pt.getCellList(partition_id, True)
        # Compress data with zlib, once for all nodes
        compressed_data = compress(dumps(data))
        crc = adler32(compressed_data)
        # Store data on each node
        for storage_node in storage_node_list:
            conn = self.cm.getConnForNode(storage_node)
            msg_id = conn.getNextId()
            p = Packet()
            p.askStoreObject(msg_id, oid, serial, 1, crc, compressed_data, self.tid)
            conn.addPacket(p)
            conn.expectMessage(msg_id)
            # Wait for answer
            self.object_stored = 0
            self._waitUntil(lambda: self.object_stored in (1, 2))
            if self.object_stored == 2:
                # Conflict, removed oid from list
                try:
                    self.txn_oid_list.remove(oid)
                except ValueError:
                    # Oid wasn't already stored in list
                    pass
                raise NEOStorageConflictError()
            if oid not in self.txn_oid_list:
                self.txn_oid_list.append(oid)

    def tpc_vote(self, transaction):
        """Store current transaction."""
        self._checkTransaction(transaction)
        user = transaction.user
        desc = transaction.description
        ext = transaction._extension # XXX Need a dump ?
        partition_id = self._getPartitionId(self.tid)
        storage_node_list = self.pt.getCellList(partition_id, True)
        # Store data on each node
        for storage_node in storage_node_list:
            conn = self.cm.getConnForNode(storage_node)
            msg_id = conn.getNextId()
            p = Packet()
            p.askStoreTransaction(msg_id, self.tid, user, desc, ext,
                                  self.txn_oid_list)
            conn.addPacket(p)
            conn.expectMessage(msg_id)
            self.txn_stored = 0
            self._waitUntil(lambda: self.txn_stored)

    def _clear_txn(self):
        """Clear some transaction parameter and release lock."""
        self.txn = None
        self._txn_lock_release()

    def tpc_abort(self, transaction):
        """Abort current transaction."""
        if transaction is not self.txn:
            return
        try:
            # Abort transaction on each node used for it: the nodes where
            # objects were stored, then the nodes where the transaction
            # itself was stored.  Each node is notified only once.
            aborted_node = {}
            for key in list(self.txn_oid_list) + [self.tid]:
                partition_id = self._getPartitionId(key)
                for storage_node in self.pt.getCellList(partition_id, True):
                    if storage_node in aborted_node:
                        continue
                    conn = self.cm.getConnForNode(storage_node)
                    msg_id = conn.getNextId()
                    p = Packet()
                    p.abortTransaction(msg_id, self.tid)
                    conn.addPacket(p)
                    aborted_node[storage_node] = 1
        finally:
            self._clear_txn()

    def tpc_finish(self, transaction, f=None):
        """Finish current transaction and return its transaction id."""
        if self.txn is not transaction:
            return
        try:
            # Call function given by ZODB
            if f is not None:
                f()
            # Call finish on master
            conn = self.master_conn
            msg_id = conn.getNextId()
            p = Packet()
            p.finishTransaction(msg_id, self.txn_oid_list, self.tid)
            conn.addPacket(p)
            conn.expectMessage(msg_id)
            # Wait for answer
            self.txn_finished = 0
            self._waitUntil(lambda: self.txn_finished)
            # XXX must update cache here...
            return self.tid
        finally:
            # Release transaction
            self._clear_txn()

    def loadBefore(self, oid, tid):
        """Not implemented, always return None."""
        pass

    def undo(self, transaction_id, txn):
        """Undo a transaction (not implemented yet)."""
        self._checkTransaction(txn)
        # First abort on primary master node
        # Second abort on storage node
        # Then invalidate cache
        raise NotImplementedError('undo is not implemented yet')

    def undoLog(self, first, last, filter):
        """Return the list of transaction information between first and last.

        The filter argument is currently not used.
        """
        self._undo_log_lock_acquire()
        try:
            # First get list of transaction from master node
            conn = self.master_conn
            msg_id = conn.getNextId()
            p = Packet()
            p.getTIDList(msg_id, first, last)
            conn.addPacket(p)
            conn.expectMessage(msg_id)
            # Wait for answer
            self.undo_tid_list = None
            # must take care of order here
            self._waitUntil(lambda: self.undo_tid_list is not None)
            # For each transaction, get info
            undo_txn_list = []
            for tid in self.undo_tid_list:
                partition_id = self._getPartitionId(tid)
                for storage_node in self.pt.getCellList(partition_id, True):
                    conn = self.cm.getConnForNode(storage_node)
                    msg_id = conn.getNextId()
                    p = Packet()
                    p.getTransactionInformation(msg_id, tid)
                    conn.addPacket(p)
                    conn.expectMessage(msg_id)
                    # Wait for answer
                    self.undo_txn_info = None
                    self._waitUntil(lambda: self.undo_txn_info is not None)
                    undo_txn_list.append(self.undo_txn_info)
                    # One answer per transaction is enough
                    break
            return undo_txn_list
        finally:
            self._undo_log_lock_release()

    def close(self):
        """Clear all connection."""
        # The event manager is missing when __init__ did not complete
        em = getattr(self, 'em', None)
        if em is None:
            return
        for conn in em.getConnectionList():
            conn.close()

    # Due to bug in ZODB, close is not always called when shutting
    # down zope, so use __del__ to close connections
    __del__ = close
import logging
from neo.handler import EventHandler
from neo.connection import ClientConnection
from neo.protocol import Packet, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID
from neo.node import MasterNode, StorageNode, ClientNode
from neo.pt import PartitionTable
class MasterEventHandler(EventHandler):
    """This class deals with events for a master.

    The answers received from master nodes are recorded on the client
    application given to the constructor.  Most handlers only accept
    packets coming from a ClientConnection, and report anything else as
    an unexpected packet.
    """

    def __init__(self, app):
        self.app = app
        EventHandler.__init__(self)

    def handleNotReady(self, conn, packet, message):
        """Flag the application when the master is not ready."""
        if isinstance(conn, ClientConnection):
            app = self.app
            app.node_not_ready = 1
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleAcceptNodeIdentification(self, conn, packet, node_type,
                                       uuid, ip_address, port,
                                       num_partitions, num_replicas):
        """Check the identity of the peer, then ask it who is the primary."""
        if not isinstance(conn, ClientConnection):
            self.handleUnexpectedPacket(conn, packet)
            return
        app = self.app
        node = app.nm.getNodeByServer(conn.getAddress())
        if node_type != MASTER_NODE_TYPE:
            # The peer is not a master node!
            logging.error('%s:%d is not a master node', ip_address, port)
            app.nm.remove(node)
            conn.close()
            return
        if conn.getAddress() != (ip_address, port):
            # The server address is different! Then why was
            # the connection successful?
            logging.error('%s:%d is waiting for %s:%d',
                          conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
            app.nm.remove(node)
            conn.close()
            return

        conn.setUUID(uuid)
        node.setUUID(uuid)

        # Create partition table if necessary
        if app.pt is None:
            app.pt = PartitionTable(num_partitions, num_replicas)

        # Ask a primary master.
        msg_id = conn.getNextId()
        conn.addPacket(Packet().askPrimaryMaster(msg_id))
        conn.expectMessage(msg_id)

    def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
        """Register the known master nodes and remember the primary one."""
        if not isinstance(conn, ClientConnection):
            self.handleUnexpectedPacket(conn, packet)
            return
        app = self.app
        # Register new master nodes.
        for ip_address, port, uuid in known_master_list:
            addr = (ip_address, port)
            n = app.nm.getNodeByServer(addr)
            if n is None:
                n = MasterNode(server = addr)
                app.nm.add(n)
            if uuid != INVALID_UUID:
                # If I don't know the UUID yet, believe what the peer
                # told me at the moment.
                if n.getUUID() is None:
                    n.setUUID(uuid)

        if primary_uuid != INVALID_UUID:
            # The primary master is defined.
            if app.primary_master_node is not None \
                    and app.primary_master_node.getUUID() != primary_uuid:
                # There are multiple primary master nodes. This is
                # dangerous.
                # NOTE(review): ElectionFailure is not imported by the
                # visible imports of this module - confirm where it is
                # defined.
                raise ElectionFailure('multiple primary master nodes')
            primary_node = app.nm.getNodeByUUID(primary_uuid)
            if primary_node is None:
                # I don't know such a node. Probably this information
                # is old. So ignore it.
                pass
            elif primary_node.getUUID() == primary_uuid:
                # Whatever the situation is, I trust this master.
                app.primary_master_node = primary_node

    def handleSendPartitionTable(self, conn, packet, row_list):
        """Record the received rows in the partition table."""
        if isinstance(conn, ClientConnection):
            app = self.app
            for offset, row in row_list:
                app.pt.setRow(offset, row)
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleNotifyNodeInformation(self, conn, packet, node_list):
        """Register the nodes announced by the master in the node manager."""
        app = self.app
        for node_type, ip_address, port, uuid, state in node_list:
            if uuid != INVALID_UUID and uuid == app.uuid:
                # This is self. The client application has no server
                # address, so it is recognised by its uuid.
                continue
            # Register new nodes.
            addr = (ip_address, port)
            n = app.nm.getNodeByServer(addr)
            if n is None:
                if node_type == MASTER_NODE_TYPE:
                    n = MasterNode(server = addr)
                elif node_type == STORAGE_NODE_TYPE:
                    n = StorageNode(server = addr)
                elif node_type == CLIENT_NODE_TYPE:
                    n = ClientNode(server = addr)
                else:
                    # Unknown type of node, ignore it
                    continue
                app.nm.add(n)
            if uuid != INVALID_UUID:
                # If I don't know the UUID yet, believe what the peer
                # told me at the moment.
                if n.getUUID() is None:
                    n.setUUID(uuid)

    def handleNotifyPartitionChanges(self, conn, packet, cell_list):
        """Add the received cells to the partition table."""
        if isinstance(conn, ClientConnection):
            app = self.app
            for cell in cell_list:
                app.pt.addNode(cell)
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleAnswerNewTID(self, conn, packet, tid):
        """Record the new transaction id on the application."""
        if isinstance(conn, ClientConnection):
            app = self.app
            app.tid = tid
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleNotifyTransactionFinished(self, conn, packet, tid):
        """Flag the current transaction as finished on the master."""
        if isinstance(conn, ClientConnection):
            app = self.app
            if tid != app.tid:
                # what's this ?
                # A bare raise is invalid here as no exception is being
                # handled, so raise an explicit error instead.
                raise RuntimeError('transaction finished notification for '
                                   'an unexpected transaction id')
            app.txn_finished = 1
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleInvalidateObjects(self, conn, packet, oid_list):
        raise NotImplementedError('this method must be overridden')

    def handleAnswerNewOIDList(self, conn, packet, oid_list):
        """Record the new oids, reversed, on the application.

        The application takes oids from the end of its list, so the
        first received oid is the first one used.
        """
        if isinstance(conn, ClientConnection):
            app = self.app
            # Work on a copy to leave the list of the packet untouched
            new_oid_list = list(oid_list)
            new_oid_list.reverse()
            app.new_oid_list = new_oid_list
        else:
            self.handleUnexpectedPacket(conn, packet)
##############################################################################
#
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved.
# Yoshinori Okuji <yo@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
"""
Multi-Queue Cache Algorithm.
"""
from math import log
class Element:
    """
      This class defines an element of a FIFO buffer.
    """
    pass


class FIFO:
    """
      This class implements a FIFO buffer.

      It is a doubly linked list of Element objects; append() creates and
      returns the new element so that the caller can attach data to it.
    """

    def __init__(self):
        self._head = None
        self._tail = None
        self._len = 0

    def __len__(self):
        return self._len

    def append(self):
        """Create a new element at the tail and return it."""
        element = Element()
        element.next = None
        element.prev = self._tail
        if self._tail is not None:
            self._tail.next = element
        self._tail = element
        if self._head is None:
            self._head = element
        self._len += 1
        return element

    def head(self):
        return self._head

    def tail(self):
        return self._tail

    def shift(self):
        """Remove and return the head element, or None if the buffer is empty."""
        element = self._head
        if element is None:
            return None
        del self[element]
        return element

    def __delitem__(self, element):
        """Unlink element, which must currently belong to this buffer."""
        if element.next is None:
            self._tail = element.prev
        else:
            element.next.prev = element.prev
        if element.prev is None:
            self._head = element.next
        else:
            element.prev.next = element.next
        self._len -= 1


class Data:
    """
      Data for each element in a FIFO buffer.
    """
    pass


class MQ:
    """
      This class manages cached data by a variant of Multi-Queue.

      This class caches various sizes of objects. Here are some considerations:

      - Expired objects are not really deleted immediately. But if GC is invoked too often,
        it degrades the performance significantly.

      - If large objects are cached, the number of cached objects decreases. This might affect
        the cache hit ratio. It might be better to tweak a buffer level according to the size of
        an object.

      - Stored values must be strings.

      - The size calculation is not accurate.
    """

    def __init__(self, life_time=10000, buffer_levels=9, max_history_size=100000, max_size=20*1024*1024):
        self._history_buffer = FIFO()
        self._cache_buffers = []
        for level in range(buffer_levels):
            self._cache_buffers.append(FIFO())
        self._data = {}
        self._time = 0
        self._life_time = life_time
        self._buffer_levels = buffer_levels
        self._max_history_size = max_history_size
        self._max_size = max_size
        self._size = 0

    def has_key(self, id):
        """Return 1 if a value is cached for the id, 0 otherwise."""
        if id in self._data:
            data = self._data[id]
            if data.level >= 0:
                return 1
        return 0

    __contains__ = has_key

    def fetch(self, id):
        """
          Fetch a value associated with the id.

          Raise KeyError if no value is cached for the id.
        """
        if id in self._data:
            data = self._data[id]
            if data.level >= 0:
                value = data.value
                # Storing the value again counts the access. store() takes
                # the element out of its buffer and adjusts the size by
                # itself, so neither must be done here.
                self.store(id, value)
                return value
        raise KeyError("%s was not found in the cache" % (id,))

    __getitem__ = fetch

    def get(self, id, d=None):
        """Return the cached value for the id, or d if there is none."""
        try:
            return self.fetch(id)
        except KeyError:
            return d

    def _evict(self, id):
        """
          Evict an element to the history buffer.

          The id must be cached, this method takes the element out of its
          cache buffer by itself.
        """
        data = self._data[id]
        self._size -= len(data.value) # XXX inaccurate
        del self._cache_buffers[data.level][data.element]
        element = self._history_buffer.append()
        data.level = -1
        data.element = element
        delattr(data, 'value')
        delattr(data, 'expire_time')
        element.data = data
        if len(self._history_buffer) > self._max_history_size:
            element = self._history_buffer.shift()
            del self._data[element.data.id]

    def store(self, id, value):
        """Cache a value for the id, then expire old values and limit the size."""
        if id in self._data:
            data = self._data[id]
            level, element, counter = data.level, data.element, data.counter + 1
            if level >= 0:
                del self._cache_buffers[level][element]
                # The previous value is replaced, do not count it anymore
                self._size -= len(data.value) # XXX inaccurate
            else:
                del self._history_buffer[element]
        else:
            counter = 1

        # XXX It might be better to adjust the level according to the object size.
        level = int(log(counter, 2))
        if level >= self._buffer_levels:
            level = self._buffer_levels - 1
        element = self._cache_buffers[level].append()
        data = Data()
        data.id = id
        data.expire_time = self._time + self._life_time
        data.level = level
        data.element = element
        data.value = value
        data.counter = counter
        element.data = data
        self._data[id] = data
        self._size += len(value) # XXX inaccurate

        self._time += 1

        # Expire old elements.
        for level in range(self._buffer_levels):
            cache_buffer = self._cache_buffers[level]
            head = cache_buffer.head()
            if head is not None and head.data.expire_time < self._time:
                data = head.data
                if level > 0:
                    # Move the element to the buffer of the level below
                    del cache_buffer[head]
                    new_level = level - 1
                    element = self._cache_buffers[new_level].append()
                    element.data = data
                    data.expire_time = self._time + self._life_time
                    data.level = new_level
                    data.element = element
                else:
                    # _evict takes the element out of the buffer itself
                    self._evict(data.id)

        # Limit the size.
        size = self._size
        max_size = self._max_size
        if size > max_size:
            for cache_buffer in self._cache_buffers:
                while size > max_size:
                    element = cache_buffer.shift()
                    if element is None:
                        break
                    data = element.data
                    del self._data[data.id]
                    size -= len(data.value) # XXX inaccurate
                if size <= max_size:
                    break
            self._size = size

    __setitem__ = store

    def invalidate(self, id):
        """
          Remove the value cached for the id, keeping the id in the history.

          Raise KeyError if no value is cached for the id.
        """
        if id in self._data:
            data = self._data[id]
            if data.level >= 0:
                # _evict takes the element out of the buffer itself
                self._evict(id)
                return
        raise KeyError("%s was not found in the cache" % (id,))

    __delitem__ = invalidate
# Here is a test: check that stored values can be fetched and invalidated.
if __name__ == '__main__':
    cache = MQ()
    cache[1] = "1"
    cache[2] = "2"
    assert cache.get(1) == "1", 'cannot get 1'
    assert cache.get(2) == "2", 'cannot get 2'
    assert cache.get(3) is None, 'can get 3!'
    del cache[1]
    assert cache.get(1) is None, 'can get 1!'
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment