Commit a7d9afc7 authored by Chris McDonough's avatar Chris McDonough

New TransientObjectContainer implementation.

Changes:

 - More stable under high usage, especially in the face of
   many ZODB conflict errors.  The previous implementation had
   stability problems when many conflict errors were encountered,
   especially conflicts generated by simultaneous changes to a
   subobject of the TOC (such as in the case of a Zope
   application which makes heavy use of both frames and
   sessions).

 - More conflict-resistant.  Instead of ignoring the likelihood
   that multiple threads will attempt to perform the same actions
   simultaneously in methods of the TOC (which often causes
   conflicts), the new implementation attempts to avoid conflicts
   by employing a chance-based housekeeping model.  In this model,
   one thread is "elected" by chance to do the kinds of tasks that
   cause the most conflicts.

 - Now uses a "timeslice"-based model instead of a "ring"-based
   model.  This also helps cut down on conflicts and makes
   the code slightly less obfuscated (not much, though! ;-)

 - Quite a few more comments in the code.

 - Changes to the sessioning stresstest (which exposed the
   bug that made me reimplement the TOC in the first place).

 - Updates to unit tests.

 - A "HowTransienceWorks.stx" document which attempts to
   explain how the code works.  It's not stellar, but
   it's a start.

 - Changes to the TransientObject class that the TOC
   hands out (typically as a "session data object"), in order
   to make invalidation less Rube-Goldberg-ish.

The structure of the TOC object has changed enough that, in order to
maintain b/w compatibility, "old" instances are upgraded in place the
first time they are used with this code.  "Upgraded" instances remain
backwards-compatible, however, so folks can hopefully move back and
forth between Zope versions without much hassle.
parent 23ae0c58
......@@ -24,7 +24,9 @@ from Products.Sessions.SessionDataManager import \
from Products.Transience.Transience import \
TransientObjectContainer, TransientObject
from Products.TemporaryFolder.TemporaryFolder import MountedTemporaryFolder
from ZODB.POSException import InvalidObjectReference, ConflictError
from Products.TemporaryFolder.LowConflictConnection import LowConflictConnection
from ZODB.Connection import Connection
from ZODB.POSException import InvalidObjectReference, ConflictError, ReadConflictError, BTreesConflictError
from DateTime import DateTime
from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
import time, threading, random
......@@ -32,6 +34,7 @@ from cPickle import UnpickleableError
from ZODB.DemoStorage import DemoStorage
from OFS.Application import Application
import sys
from zLOG import log_time
sys.setcheckinterval(200)
tf_name = 'temp_folder'
......@@ -65,7 +68,7 @@ def _populate(app):
bidmgr = BrowserIdManager(idmgr_name)
tf = MountedTemporaryFolder(tf_name, title="Temporary Folder")
toc = TransientObjectContainer(toc_name, title='Temporary '
'Transient Object Container', timeout_mins=20)
'Transient Object Container', timeout_mins=1)
session_data_manager=SessionDataManager(id='session_data_manager',
path='/'+tf_name+'/'+toc_name, title='Session Data Manager')
......@@ -89,7 +92,7 @@ def _populate(app):
get_transaction().commit()
class TestMultiThread(TestCase):
def testNonOverlappingBrowserIds(self):
def testOverlappingBrowserIds(self):
readers = []
writers = []
readiters = 20
......@@ -115,10 +118,7 @@ class TestMultiThread(TestCase):
while threading.activeCount() > 1:
time.sleep(1)
for thread in readers:
assert thread.out == [], thread.out
def testOverlappingBrowserIds(self):
def testNonOverlappingBrowserIds(self):
readers = []
writers = []
readiters = 20
......@@ -144,9 +144,6 @@ class TestMultiThread(TestCase):
while threading.activeCount() > 1:
time.sleep(1)
for thread in readers:
assert thread.out == [], thread.out
class BaseReaderWriter(threading.Thread):
def __init__(self, db, iters, sdm_name):
self.conn = db.open()
......@@ -166,13 +163,23 @@ class BaseReaderWriter(threading.Thread):
try:
self.run1()
return
except ReadConflictError:
print "read conflict"
except BTreesConflictError:
print "btrees conflict"
except ConflictError:
print "general conflict"
except:
get_transaction().abort()
print log_time()
raise
i = i + 1
#print "conflict %d" % i
if i > 3: raise
get_transaction().abort()
time.sleep(random.randrange(5) * .1)
finally:
self.conn.close()
del self.app
print i
class ReaderThread(BaseReaderWriter):
def run1(self):
......
How Transience Works
The object responsible for managing the expiration of "transient"
objects is the TransientObjectContainer, the class definition for
which is located in
Products.Transience.Transience.TransientObjectContainer. An
instance of this class is found in the default Zope installation at
/temp_folder/session_data.
The TransientObjectContainer (TOC) holds Transient Objects (TOs).
A TO is obtained from its container via a call to
TOC.new_or_existing(key), where "key" is usually the "browser id"
associated with a visitor (see Products.Sessions.BrowserIdManager).
If the TOC has a "current" TO corresponding to "key", it is
returned.
If the TOC does not have a "current" TO corresponding to "key", (due
to the expiration of the TO or because it never existed in the first
place) a "new" TO is manufactured and returned.
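A minimal sketch of this "get or create" contract, assuming a
dictionary-like TOC (illustrative only; the real method also
acquisition-wraps the result and does its work under a lock)::

  def new_or_existing(toc, key):
      # return the existing TO for 'key', or manufacture a new one
      item = toc.get(key)
      if item is None:
          item = toc.new(key)
      return item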
Timeslices
Transience defines the notion of a "timeslice". A "timeslice" is an
integer that represents some "slice" of time, defined by a "period".
For example, if a period is 20 seconds long, three ordered time
slices might be expressed as 0, 20, and 40. The next timeslice
would be 60, and so on. For an absolute time to "belong" to a
timeslice, it would need to be equal to or greater than one
timeslice integer, but less than the subsequent timeslice integer.
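As an illustration, the mapping from an absolute time to its timeslice
can be computed as below (a sketch only, using the module's default
PERIOD of 20 seconds; the container's own _getCurrentTimeslice arrives
at the same integer by a different route)::

  import time

  PERIOD = 20   # length of one timeslice in seconds (module default)

  def timeslice_of(now=None, period=PERIOD):
      # the timeslice is the largest multiple of 'period' that is <= 'now'
      if now is None:
          now = time.time()
      return int(now) - (int(now) % period)

  # with a 20-second period, 40.0 through 59.9... all belong to slice 40
  assert timeslice_of(47.3) == 40
  assert timeslice_of(60.0) == 60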
Data Structures Maintained by a Transient Object Container
The TOC maintains five important kinds of data structures:
- a "_data" structure, which is an IOBTree mapping a "timeslice"
integer to a "bucket" (see next bullet for definition of bucket).
- One or more "buckets", which are OOBTree objects which map a "key"
(usually browser id) to a TransientObject.  Buckets are stored
inside of the "_data" structure.  There is a concept of a
"current" bucket, which is the bucket that is contained within the
_data structure with a key equal to the "current" timeslice.
- An "index" which is an OOBTree mapping transient object "key" to
"timeslice", letting us quickly figure out which element in the _data
mapping contains the transient object related to the key. It is
stored as the attribute "_index" of the TOC. When calling code
wants to obtain a Transient Object, its key is looked up in the
index, which returns a timeslice. We ask the _data structure for the
bucket it has stored under that timeslice. Then the bucket is asked
for the object stored under the key.  This returns the Transient
Object (this lookup path is sketched in code just after this list).
- A "last timeslice" integer, which is equal to the "last" timeslice
under which TOC housekeeping operations were performed.
- A "next to deindex" integer, which is a timeslice
representing the next bucket which requires "deindexing"
(the removal of all the keys of the bucket from the index).
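The lookup path through these structures, sketched with plain Python
dictionaries standing in for the IOBTree/OOBTree objects (illustrative
only; the real code also moves the object and does housekeeping on
access)::

  _data = {}    # timeslice (int) -> bucket (mapping key -> TransientObject)
  _index = {}   # key (string) -> timeslice (int)

  def lookup(key):
      # raises KeyError if the key is unknown (expired or never set)
      timeslice = _index[key]     # which timeslice holds the object?
      bucket = _data[timeslice]   # the bucket stored under that timeslice
      return bucket[key]          # the Transient Object itself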
When a Transient Object is created via new_or_existing, it is added
to the "current" bucket. As time goes by, the bucket to which the
TO was added ceases to be the "current" bucket. If the transient
object is "accessed" (it is called up out of the TOC via the TOC's
'get' method), it is again moved to the "current" bucket defined by
the current time's timeslice.
During the course of normal operations, a TransientObject will move
from an "old" bucket to the "current" bucket many times, as long as
it continues to be accessed. It is possible for a TransientObject
to *never* expire, as long as it is called up out of its TOC often
enough.
If a TransientObject is not accessed in the period of time defined by
the TOC's "timeout", it is deindexed and eventually garbage collected.
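Continuing the plain-dictionary model above, the "move to the current
bucket on access" step looks roughly like this (the real 'get' also
updates the TO's last-accessed time and runs under a lock)::

  def access(key, current_slice):
      old_slice = _index[key]
      value = _data[old_slice][key]
      if old_slice != current_slice:
          # re-file the object under the current bucket so it stays fresh
          del _data[old_slice][key]
          _data.setdefault(current_slice, {})[key] = value
          _index[key] = current_slice
      return value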
How the TransientObjectContainer Determines if a TransientObject is "Current"
A TO is current if it has an entry in the "index". When a TO has an
entry in the index, it implies that the TO resides in a bucket that
is no "older" than the TOC timeout period, based on the bucket's
timeslice.
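In the same model, the currency test is just an index membership check,
because deindexing removes expired keys from the index::

  def is_current(key):
      # a TO is "current" exactly when its key is still in the index
      return key in _index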
Housekeeping: Finalization, Notification, Garbage Collection, and
Bucket Replentishing
The TOC performs "deindexing", "notification", "garbage
collection", and "bucket replentishing". It performs these tasks
"in-band". This means that the TOC does not maintain a separate
thread that wakes up every so often to do these housekeeping tasks.
Instead, during the course of normal operations, the TOC
opportunistically performs them.
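Which thread gets to do the work is decided by a chance-based election,
rendered here as a simplified version of the TOC's _roll method: a
thread wins with probability roughly one over the number of spare
timeslices remaining, and is forced to win when no spares remain::

  import random

  def elected(current_slice, max_slice, period):
      low = current_slice // period
      high = max_slice // period
      if high <= low:
          # no spare buckets left; this thread must do the work
          return True
      # otherwise, one chance in (high - low) of winning the election
      return random.randrange(low, high) == low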
Deindexing is defined as the act of making an "expired" TO
inaccessible (by deleting it from the "index"). After a TO is
deindexed, it may not be used by application code any longer,
although it may "stick around" in a bucket for a while until the
bucket is eventually garbage collected.
Notification is defined as optionally calling a function when a TO is
finalized (destroyed).  The optional function call is user-defined, but
it is managed by the "notifyDestruct" method of the TOC.
Garbage collection is defined as deleting "expired" buckets in the
_data structure (the _data structure maps a timeslice to a bucket).
Bucket replentishing is defined as the action of (opportunistically)
creating more buckets to insert into the _data structure,
replacing ones that are deleted during garbage collection.  The act
of deleting a bucket does not necessarily imply that a new bucket
will be immediately created thereafter.  We create new buckets in
batches to reduce the possibility of conflicts.
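Put together in the plain-dictionary model, garbage collection and
bucket replentishing amount to roughly the following (a sketch; the
parameter names mirror locals in the real _housekeep, which pulls its
work items from queues so only one thread replentishes at a time)::

  def housekeep(delete_end, replentish_start, spares, period):
      # garbage collection: drop every bucket at or before 'delete_end'
      for stale in [s for s in _data.keys() if s <= delete_end]:
          del _data[stale]
      # bucket replentishing: create 'spares' fresh future buckets in a batch
      for i in range(spares):
          _data.setdefault(replentish_start + i * period, {})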
Goals
- A low number of ZODB conflict errors (which reduce performance).
- Stability.
To Do
- Testing under ZEO.
......@@ -11,42 +11,46 @@
#
##############################################################################
"""
Transient Object Container class.
Transient Object Container Class ('timeslice'-based design).
$Id: Transience.py,v 1.24 2002/01/11 14:55:22 chrism Exp $
$Id: Transience.py,v 1.25 2002/06/21 01:51:43 chrism Exp $
"""
__version__='$Revision: 1.24 $'[11:-2]
__version__='$Revision: 1.25 $'[11:-2]
import Globals
from Globals import HTMLFile
from TransienceInterfaces import ItemWithId,\
from TransienceInterfaces import Transient, DictionaryLike, ItemWithId,\
TTWDictionary, ImmutablyValuedMappingOfPickleableObjects,\
StringKeyedHomogeneousItemContainer, TransientItemContainer
from TransientObject import TransientObject
from OFS.SimpleItem import SimpleItem
from Persistence import Persistent
from Acquisition import Implicit
from AccessControl import ClassSecurityInfo, getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.User import nobody
from BTrees import OOBTree
from BTrees.OOBTree import OOBTree, OOBucket, OOSet
from BTrees.IOBTree import IOBTree
from BTrees.Length import Length
from zLOG import LOG, WARNING, BLATHER
import os, math, time, sys, random
import os.path
import os
import math, sys, random
import time
from types import InstanceType
from TransientObject import TransientObject
import thread
import ThreadLock
import Queue
DEBUG = os.environ.get('Z_TOC_DEBUG', '')
_marker = []
def DLOG(*args):
tmp = []
for arg in args:
tmp.append(str(arg))
LOG('Transience DEBUG', BLATHER, ' '.join(tmp))
DEBUG = os.environ.get('Z_TOC_DEBUG', '')
class MaxTransientObjectsExceeded(Exception): pass
_notfound = []
_marker = []
# permissions
MIN_SPARE_BUCKETS = 10 # minimum number of transient buckets to keep spare
PERIOD = 20 # attempt housekeeping every PERIOD seconds
ADD_CONTAINER_PERM = 'Add Transient Object Container'
MGMT_SCREEN_PERM = 'View management screens'
ACCESS_CONTENTS_PERM = 'Access contents information'
......@@ -57,21 +61,63 @@ MANAGE_CONTAINER_PERM = 'Manage Transient Object Container'
constructTransientObjectContainerForm = HTMLFile(
'dtml/addTransientObjectContainer', globals())
def TLOG(*args):
sargs = []
sargs.append(str(thread.get_ident()))
sargs.append(str(time.time()))
for arg in args:
sargs.append(str(arg))
LOG('Transience', BLATHER, ' '.join(sargs))
def constructTransientObjectContainer(self, id, title='', timeout_mins=20,
addNotification=None, delNotification=None, limit=0, REQUEST=None):
""" """
ob = TransientObjectContainer(
id, title, timeout_mins, addNotification, delNotification, limit=limit
)
ob = TransientObjectContainer(id, title, timeout_mins,
addNotification, delNotification, limit=limit)
self._setObject(id, ob)
if REQUEST is not None:
return self.manage_main(self, REQUEST, update_menu=1)
class TransientObjectContainer(SimpleItem):
""" Persists objects for a user-settable time period, after which it
expires them """
""" Object which contains items that are automatically flushed
after a period of inactivity """
meta_type = "Transient Object Container"
icon = "misc_/Transience/datacontainer.gif"
# chrism 6/20/2002
# I was forced to make this a mostly "synchronized" class, using
# a single ThreadLock instance ("lock" below). I realize this
# is paranoid and even a little sloppy. ;-)
#
# Rationale: in high-conflict situations without this lock, the
# index and the "data" (bucket) structure slowly get out of sync with
# one another. I'm only sure about one thing when it comes to this:
# I don't completely understand why. So, I'm not going to worry about
# it (indefinitely) as the locking solves it. "Slow and steady" is better
# than "fast and broken".
lock = ThreadLock.allocate_lock()
# notify_queue is a queue in which deindexed objects are placed
# for later processing by housekeeping, which calls the
# "delete notifier" at appropriate times. As threads pass through
# the housekeeping stage, they pull any unnotified objects from this
# queue and call the delete notifier. We use a queue here in order
# to not "double-notify" when two threads are doing housekeeping
# at the same time. Note that there may be a case where a conflict
# error is raised and the results of a delete notifier are not
# committed, but that is better than calling the delete notifier
# *again* on the retry.
notify_queue = Queue.Queue()
# replentish queue is a size-one queue. It is used as optimization
# to avoid conflicts. If you're running low on buckets, an entry is
# placed in the replentish queue. The next thread that does housekeeping
# to notice the entry will extend the buckets. Because queues are thread-
# safe, more than one thread will not attempt to replentish at the same
# time.
replentish_queue = Queue.Queue(1)
__implements__ = (ItemWithId,
StringKeyedHomogeneousItemContainer,
TransientItemContainer
......@@ -87,90 +133,69 @@ class TransientObjectContainer(SimpleItem):
)
security = ClassSecurityInfo()
ok = {'meta_type':1, 'id':1, 'icon':1, 'bobobase_modification_time':1 }
security.setDefaultAccess(ok)
security.setPermissionDefault(ACCESS_TRANSIENTS_PERM,
security.setPermissionDefault(MANAGE_CONTAINER_PERM,
['Manager',])
security.setPermissionDefault(MGMT_SCREEN_PERM,
['Manager',])
security.setPermissionDefault(ACCESS_CONTENTS_PERM,
['Manager','Anonymous'])
security.setPermissionDefault(MANAGE_CONTAINER_PERM,['Manager',])
security.setPermissionDefault(MGMT_SCREEN_PERM,['Manager',])
security.setPermissionDefault(ACCESS_CONTENTS_PERM,['Manager','Anonymous'])
security.setPermissionDefault(CREATE_TRANSIENTS_PERM,['Manager',])
security.setPermissionDefault(ACCESS_TRANSIENTS_PERM,
['Manager','Anonymous','Sessions'])
security.setPermissionDefault(CREATE_TRANSIENTS_PERM,
['Manager',])
security.declareProtected(MGMT_SCREEN_PERM, 'manage_container')
manage_container = HTMLFile('dtml/manageTransientObjectContainer',
globals())
_limit = 0
_data = None
security.setDefaultAccess('deny')
def __init__(self, id, title='', timeout_mins=20, addNotification=None,
delNotification=None, err_margin=.20, limit=0):
delNotification=None, limit=0):
self.id = id
self.title=title
self._addCallback = None
self._delCallback = None
self._err_margin = err_margin
self._reset()
self._setTimeout(timeout_mins)
self._setLimit(limit)
self._reset()
self._addCallback = None
self._delCallback = None
self.setDelNotificationTarget(delNotification)
self.setAddNotificationTarget(addNotification)
# -----------------------------------------------------------------
# ItemWithID
#
def getId(self):
return self.id
# -----------------------------------------------------------------
# StringKeyedHomogenousItemContainer
#
security.declareProtected(CREATE_TRANSIENTS_PERM, 'new_or_existing')
def new_or_existing(self, k):
item = self.get(k, _notfound)
if item is _notfound: return self.new(k)
else: return item
security.declareProtected(ACCESS_TRANSIENTS_PERM, 'get')
def get(self, k, default=_marker):
# Intentionally uses a different marker than _notfound
def new_or_existing(self, key):
self.lock.acquire()
try:
v = self[k]
except KeyError:
if default is _marker: return None
else: return default
else:
if hasattr(v, 'isValid') and v.isValid():
return v.__of__(self)
elif not hasattr(v, 'isValid'):
return v
else:
del self[k] # item is no longer valid, so we delete it
if default is _marker: return None
else: return default
DEBUG and TLOG('new_or_existing called with %s' % key)
notfound = []
item = self.get(key, notfound)
if item is notfound:
# intentionally dont call "new" here in order to avoid another
# call to "get"
item = TransientObject(key)
self[key] = item
self.notifyAdd(item)
return item.__of__(self)
finally:
self.lock.release()
security.declareProtected(CREATE_TRANSIENTS_PERM, 'new')
def new(self, k):
if type(k) is not type(''):
raise TypeError, (k, "key is not a string type")
if self.get(k, None) is not None:
raise KeyError, "duplicate key %s" % k # Not allowed to dup keys
item = TransientObject(k)
self[k] = item
def new(self, key):
self.lock.acquire()
try:
if type(key) is not type(''):
raise TypeError, (key, "key is not a string type")
if self.has_key(key):
raise KeyError, "cannot duplicate key %s" % key
item = TransientObject(key)
self[key] = item
self.notifyAdd(item)
return item.__of__(self)
security.declareProtected(ACCESS_TRANSIENTS_PERM, 'has_key')
def has_key(self, k):
v = self.get(k, _notfound)
if v is _notfound: return 0
return 1
# -----------------------------------------------------------------
# TransientItemContainer
#
finally:
self.lock.release()
security.declareProtected(MANAGE_CONTAINER_PERM, 'setTimeoutMinutes')
def setTimeoutMinutes(self, timeout_mins):
......@@ -184,38 +209,37 @@ class TransientObjectContainer(SimpleItem):
""" """
return self._timeout_secs / 60
security.declareProtected(MGMT_SCREEN_PERM, 'getSubobjectLimit')
def getSubobjectLimit(self):
""" """
return self._limit
security.declareProtected(MANAGE_CONTAINER_PERM, 'setSubobjectLimit')
def setSubobjectLimit(self, limit):
""" """
if limit != self.getSubobjectLimit():
self._setLimit(limit)
security.declareProtected(MGMT_SCREEN_PERM, 'getSubobjectLimit')
def getSubobjectLimit(self):
""" """
return self._limit
security.declareProtected(MGMT_SCREEN_PERM, 'getAddNotificationTarget')
def getAddNotificationTarget(self):
return self._addCallback or ''
security.declareProtected(MANAGE_CONTAINER_PERM,'setAddNotificationTarget')
def setAddNotificationTarget(self, f):
# We should assert that the callback function 'f' implements
# the TransientNotification interface
self._addCallback = f
security.declareProtected(MGMT_SCREEN_PERM, 'getDelNotificationTarget')
def getDelNotificationTarget(self):
return self._delCallback or ''
security.declareProtected(MANAGE_CONTAINER_PERM,
'setDelNotificationTarget')
security.declareProtected(MANAGE_CONTAINER_PERM,'setDelNotificationTarget')
def setDelNotificationTarget(self, f):
# We should assert that the callback function 'f' implements
# the TransientNotification interface
self._delCallback = f
# ----------------------------------------------
# Supporting methods (not part of the interface)
#
def notifyAdd(self, item):
if self._addCallback:
self._notify(item, 'add')
......@@ -259,7 +283,8 @@ class TransientObjectContainer(SimpleItem):
except:
# dont raise, just log
path = self.getPhysicalPath()
LOG('Transience', WARNING,
LOG('Transience',
WARNING,
'%s failed when calling %s in %s' % (name,callback,
'/'.join(path)),
error=sys.exc_info()
......@@ -275,18 +300,13 @@ class TransientObjectContainer(SimpleItem):
error=sys.exc_info()
)
# -----------------------------------------------------------------
# Management item support (non API)
#
security.declareProtected(MANAGE_CONTAINER_PERM,
'manage_changeTransientObjectContainer')
def manage_changeTransientObjectContainer(self, title='',
timeout_mins=20, addNotification=None, delNotification=None,
limit=0, REQUEST=None):
"""
Change an existing transient object container.
"""
def manage_changeTransientObjectContainer(
self, title='', timeout_mins=20, addNotification=None,
delNotification=None, limit=0, REQUEST=None
):
""" Change an existing transient object container. """
self.title = title
self.setTimeoutMinutes(timeout_mins)
self.setSubobjectLimit(limit)
......@@ -298,169 +318,507 @@ class TransientObjectContainer(SimpleItem):
self.setDelNotificationTarget(delNotification)
if REQUEST is not None:
return self.manage_container(self, REQUEST)
return self.manage_container(
self, REQUEST, manage_tabs_message='Changes saved.'
)
def _setTimeout(self, timeout_mins):
if type(timeout_mins) is not type(1):
raise TypeError, (timeout_mins, "Must be integer")
self._timeout_secs = timeout_mins * 60
self._timeout_secs = t_secs = timeout_mins * 60
# timeout_slices == fewest number of timeslices that's >= t_secs
self._timeout_slices=int(math.ceil(float(t_secs)/self._period))
def _setLimit(self, limit):
if type(limit) is not type(1):
raise TypeError, (limit, "Must be integer")
self._limit = limit
def _setLastAccessed(self, transientObject):
sla = getattr(transientObject, 'setLastAccessed', None)
if sla is not None: sla()
security.declareProtected(MGMT_SCREEN_PERM, 'nudge')
def nudge(self):
""" Used by mgmt interface to maybe turn the ring each time
a screen is shown """
self._getCurrentBucket()
def _reset(self):
if hasattr(self,'_ring'):
for k in self.keys():
try: self.notifyDestruct(self[k])
except KeyError: pass
t_secs = self._timeout_secs
r_secs = self._resolution_secs = int(t_secs * self._err_margin) or 1
numbuckets = int(math.floor(t_secs/r_secs)) or 1
def _getCurrentTimeslice(self):
"""
Return an integer representing the 'current' timeslice.
The current timeslice is guaranteed to be the same integer
within a 'slice' of time based on a divisor of 'period'.
'period' is the number of seconds in a slice.
"""
period = self._period
now = time.time()
low = int(math.floor(now)) - period + 1
high = int(math.ceil(now)) + 1
for x in range(low, high):
if x % period == 0:
return x
def _getTimeslices(self, begin, n):
""" Get a list of future timeslice integers of 'n' size """
l = []
i = 0
now = int(time.time())
for x in range(numbuckets):
dump_after = now + i
c = OOBTree.OOBTree()
l.insert(0, [c, dump_after])
i = i + r_secs
index = OOBTree.OOBTree()
self._ring = Ring(l, index)
for x in range(n):
l.append(begin + (x * self._period))
return l
def _getIndex(self):
""" returns the index, a mapping of TOC key to bucket """
self.lock.acquire()
try:
if self._data is None:
# do in-place upgrade of old instances
self._upgrade()
return self._index
finally:
self.lock.release()
def _upgrade(self):
""" upgrade older ring-based (2.5.X) TOC instances """
self.lock.acquire()
try:
self._reset()
timeout_mins = self._timeout_secs / 60
self._setTimeout(timeout_mins)
# iterate over all the buckets in the ring
for bucket, dump_after in self._ring._data:
# get all TOs in the ring and call our __setitem__
for k, v in bucket.items():
self[k] = v
# we probably should delete the old "_ring" attribute here,
# but leave it around in case folks switch back to 2.5.X
finally:
self.lock.release()
def _reset(self):
""" Reset ourselves to a sane state (deletes all content) """
self.lock.acquire()
try:
# set the period (the timeslice length)
self._period = PERIOD
# set the number of minimum spare buckets
self._min_spare_buckets = MIN_SPARE_BUCKETS
# _data contains a mapping of f-of-time(int) (aka "slice") to
# "bucket". Each bucket will contain a set of transient items.
# Transient items move automatically from bucket-to-bucket inside
# of the _data structure based on last access time (e.g.
# "get" calls), escaping destruction only if they move quickly
# enough.
# We make enough buckets initially to last us a while, and
# we subsequently extend _data with fresh buckets and remove old
# buckets as necessary during normal operations (see
# _housekeep()).
self._data = IOBTree()
# populate _data with some number of buckets, each of which
# is "current" for its timeslice key
for i in self._getTimeslices(self._getCurrentTimeslice(),
self._min_spare_buckets*2):
self._data[i] = OOBTree()
# _index is a mapping of transient item key -> slice, letting
# us quickly figure out which bucket in the _data mapping
# contains the transient object related to the key
self._index = OOBTree()
# our "__len__" is the length of _index.
# we need to maintain the length of the index structure separately
# because getting the length of a BTree is very expensive.
try: self.__len__.set(0)
except AttributeError: self.__len__ = self.getLen = Length()
# set up last_timeslice and deindex_next integer pointers
# we set them to the current timeslice as it's the only sane
# thing to do
self._last_timeslice=Increaser(self._getCurrentTimeslice())
self._deindex_next=Increaser(self._getCurrentTimeslice())
finally:
self.lock.release()
def _getCurrentBucket(self):
# no timeout always returns last bucket
if not self._timeout_secs:
DEBUG and DLOG('no timeout, returning first bucket')
b, dump_after = self._ring._data[0]
return b
index = self._ring._index
now = int(time.time())
i = self._timeout_secs
# expire all buckets in the ring which have a dump_after time that
# is before now, turning the ring as many turns as necessary to
# get to a non-expirable bucket.
to_clean = []
while 1:
l = b, dump_after = self._ring._data[-1]
if now > dump_after:
DEBUG and DLOG('dumping... now is %s' % now)
DEBUG and DLOG('dump_after for %s was %s'%(b, dump_after))
self._ring.turn()
# mutate elements in-place in the ring
new_dump_after = now + i
l[1] = new_dump_after
if b: to_clean.append(b)# only clean non-empty buckets
i = i + self._resolution_secs
"""
Do housekeeping if necessary, then return the 'current' bucket.
"""
self.lock.acquire()
try:
# do in-place upgrade of old "ring-based" instances if
# we've just upgraded from Zope 2.5.X
if self._data is None:
self._upgrade()
# pnow == the current timeslice
pnow = self._getCurrentTimeslice()
# plast == the last timeslice under which we did housekeeping
plast = self._last_timeslice()
plast = pnow - self._period
# data is the mapping from timeslice to bucket
data = self._data
if not data.has_key(pnow):
# we were asleep a little too long, we don't even have a
# current bucket; we create one for ourselves.
# XXX - currently this ignores going back in time.
DEBUG and TLOG('_getCurrentBucket: creating current bucket!')
data[pnow] = OOBTree()
if pnow <= plast:
# If we went "back in time" or if the timeslice hasn't
# changed, dont try to do housekeeping.
# Instead, just return the current bucket.
return pnow
# the current timeslice has changed since the last time we did
# housekeeping, so we're going to see if we need to finalize
# anything.
DEBUG and TLOG('_getCurrentBucket: new timeslice (pnow) %s' % pnow)
# period == number of seconds in a slice
period = self._period
# pmax == the last timeslice integer kept by _data as a key.
pmax = data.maxKey()
# housekeep_elected indicates that this thread was elected to do
# housekeeping. We set it off initially and only set it true if
# we "win the roll". The "roll" is necessary to avoid a conflict
# scenario where more than one thread tries to do housekeeping at
# the same time.
housekeep_elected = 0
# We ask this thread to "roll the dice." If it wins, it gets
# elected to do housekeeping
housekeep_elected = self._roll(pnow, pmax)
housekeep_elected and DEBUG and TLOG('housekeep elected')
# t_slices == this TOC's timeout expressed in slices
# (fewest number of timeslices that's >= t_secs)
t_slices = self._timeout_slices
# pprev = the truly previous timeslice in relation to pnow
pprev = pnow - period
# deindex_next == the timeslice of the bucket we need to start
# deindexing from
deindex_next = self._deindex_next()
# The ordered set implied by data.keys(deindex_next, pprev) is
# a set of all timeslices that may have entries in the index which
# are known about by _data, starting from "deindex_next" up to
# but not including the current timeslice. We iterate over
# these keys, deindexing buckets as necessary when they're older
# than the timeout.
# XXX - fixme! range search doesn't always work (btrees bug)
for k in data.keys(deindex_next, pprev):
if k < deindex_next:
DEBUG and TLOG(
'broken range search: key %s < min %s'
% (k, deindex_next)
)
continue
if k > pprev:
DEBUG and TLOG(
'broken range search: key %s > max %s'
% (k, pprev)
)
continue
# pthen == the number of seconds elapsed since the timeslice
# implied by k
pthen = pnow - k
# slices_since == the number of slices elapsed since the
# timeslice implied by k
slices_since = pthen / period
# if the number of slices since 'k' is less than the number of
# slices that make up the timeout, break out of this loop.
# (remember, this is an ordered set, and following keys are
# bound to be higher, meaning subsequent tests will also fail,
# so we don't even bother checking them)
if slices_since < t_slices:
DEBUG and TLOG(
'_getCurrentBucket: slices_since (%s)<t_slices (%s)' %
(slices_since, t_slices))
break
# if the bucket has keys, deindex them and add them to the
# notify queue (destruction notification happens during
# garbage collection)
bucket = data[k]
keys = bucket.keys()
for key in keys:
self.notify_queue.put((key, bucket[key]))
DEBUG and TLOG(
'_getCurrentBucket: deindexing keys %s' % list(keys))
keys and self._deindex(keys)
# set the "last deindexed" pointer to k + period
deindex_next = k+period
self._deindex_next.set(deindex_next)
# available_spares == the number of "spare" ("clean", "future")
# buckets that exist in "_data"
available_spares = (pmax-pnow) / period
DEBUG and TLOG(
'_getCurrentBucket: available_spares %s' % available_spares
)
# if we were elected to do housekeeping, do it now.
if housekeep_elected:
# delete_end == the last bucket we want to destroy
delete_end = deindex_next - period
# min_spares == minimum number of spare buckets acceptable
# by this TOC
min_spares = self._min_spare_buckets
if available_spares < min_spares:
DEBUG and TLOG(
'_getCurrentBucket: available_spares < min_spares'
)
# the first bucket we want to begin creating
replentish_start = pmax + period
try:
self.replentish_queue.put_nowait(replentish_start)
except Queue.Full:
DEBUG and TLOG(
'_getCurrentBucket: replentish queue full'
)
self._housekeep(delete_end)
# finally, bump the last_timeslice housekeeping counter and return
# the current bucket
self._last_timeslice.set(pnow)
return pnow
finally:
self.lock.release()
def _roll(self, pnow, pmax):
"""
Roll the dice to see if we're the lucky thread that does
housekeeping. This method is guaranteed to return true at
some point as the difference between pnow and pmax naturally
diminishes to zero.
The reason we do the 'random' dance in the last part of this
is to minimize the chance that two threads will attempt to
do housekeeping at the same time (causing conflicts and double-
notifications).
"""
period = self._period
low = pnow/period
high = pmax/period
if high <= low:
# we really need to win this roll because we have no
# spare buckets (and no valid values to provide to randrange), so
# we rig the toss.
DEBUG and TLOG("_roll: rigged toss")
return 1
else:
# we're not in an emergency bucket shortage, so we can take
# our chances during the roll. It's highly unlikely that two
# threads will win the roll simultaneously, so we avoid a certain
# class of conflicts here.
if random.randrange(low, high) == low: # WINNAH!
DEBUG and TLOG("_roll: roll winner")
return 1
DEBUG and TLOG("_roll: roll loser")
return 0
def _housekeep(self, delete_end):
""" do garbage collection, bucket replentishing and notification """
data = self._data
period = self._period
min_spares = self._min_spare_buckets
DEBUG and TLOG(
'_housekeep: current slice %s' % self._getCurrentTimeslice()
)
notify = {}
while 1:
try:
k, v = self.notify_queue.get_nowait()
# duplicates will be ignored
notify[k] = v
except Queue.Empty:
break
if to_clean: self._clean(to_clean, index)
b, dump_after = self._ring._data[0]
return b
def _clean(self, bucket_set, index):
# Build a reverse index. Eventually, I'll keep this in another
# persistent struct but I'm afraid of creating more conflicts right
# now. The reverse index is a mapping from bucketref -> OOSet of string
# keys.
rindex = {}
for k, v in list(index.items()):
# listifying above is dumb, but I think there's a btrees bug
# that causes plain old index.items to always return a sequence
# of even numbers
if rindex.get(v, _marker) is _marker: rindex[v]=OOBTree.OOSet([k])
else: rindex[v].insert(k)
if DEBUG: DLOG("rindex", rindex)
trans_obs = [] # sequence of objects that we will eventually finalize
for bucket_to_expire in bucket_set:
keys = rindex.get(bucket_to_expire, [])
if keys and DEBUG: DLOG("deleting")
for k in keys:
if DEBUG: DLOG(k)
trans_obs.append(bucket_to_expire[k])
del index[k]
try: self.__len__.change(-1)
except AttributeError: pass
bucket_to_expire.clear()
# finalize em
self.notifyDestruct(trans_obs)
to_notify = notify.values()
# if we have transient objects to notify about destruction, notify
# them (only once, that's why we use a queue) ("notification")
if to_notify:
DEBUG and TLOG('_housekeep: notifying: %s' % notify.keys())
self.notifyDestruct(to_notify)
security.declareProtected(MGMT_SCREEN_PERM, 'nudge')
def nudge(self):
""" Used by mgmt interface to turn the bucket set each time
a screen is shown """
self._getCurrentBucket()
# extend _data with extra buckets if necessary ("bucket replentishing")
try:
replentish_start = self.replentish_queue.get_nowait()
DEBUG and TLOG('_housekeep: replentishing')
new_bucket_keys=self._getTimeslices(replentish_start, min_spares)
DEBUG and TLOG('_housekeep: new_bucket_keys = %s '%new_bucket_keys)
for i in new_bucket_keys:
if data.has_key(i):
continue
data[i] = OOBTree()
except Queue.Empty:
DEBUG and TLOG('replentish queue empty')
# gc the stale buckets at the "beginning" of _data ("garbage collect")
# iterate over the keys in data that have no minimum value and
# a maximum value of delete_end (note: ordered set)
# XXX- fixme. range search doesn't always work (btrees bug)
for k in data.keys(None, delete_end):
if k > delete_end:
DEBUG and TLOG(
'_housekeep: broken range search (key %s > max %s)'
% (k, delete_end)
)
continue
bucket = data[k]
# delete the bucket from _data
del data[k]
DEBUG and TLOG('_housekeep: deleted data[%s]' % k)
def _deindex(self, keys):
""" Iterate over 'keys' and remove any that match from our index """
self.lock.acquire()
try:
index = self._getIndex()
for k in keys:
if index.has_key(k):
DEBUG and TLOG('_deindex: deleting %s' % k)
self.__len__.change(-1)
del index[k]
finally:
self.lock.release()
def __setitem__(self, k, v):
self.lock.acquire()
try:
notfound = []
current = self._getCurrentBucket()
index = self._ring._index
b = index.get(k)
if b is None:
# this is a new key
index[k] = current
index = self._getIndex()
b = index.get(k, notfound)
if b is notfound:
# if this is a new item, we do OOM protection before actually
# adding it to ourselves.
li = self._limit
# do OOM protection
if li and len(self) >= li:
LOG('Transience', WARNING,
('Transient object container %s max subobjects '
'reached' % self.id)
)
raise MaxTransientObjectsExceeded, (
"%s exceeds maximum number of subobjects %s" % (len(self), li)
)
"%s exceeds maximum number of subobjects %s" %
(len(self), li))
# do length accounting
try: self.__len__.change(1)
except AttributeError: pass
elif b is not current:
elif b != current:
# this is an old key that isn't in the current bucket.
del b[k] # delete it from the old bucket
index[k] = current
if self._data[b].has_key(k):
del self._data[b][k] # delete it from the old bucket
# change the value
current[k] = v
self._setLastAccessed(v)
DEBUG and TLOG('setitem: setting current[%s]=%s' % (k,v))
self._data[current][k] = v
# change the TO's last accessed time
if hasattr(v, 'setLastAccessed'):
v.setLastAccessed()
# set the index up with the current bucket for this key
index[k] = current
finally:
self.lock.release()
def __getitem__(self, k):
current = self._getCurrentBucket()
index = self._ring._index
self.lock.acquire()
try:
# we dont want to call getCurrentBucket here because we need to
# be able to raise a KeyError. The housekeeping steps
# performed in the getCurrentBucket method would be ignored
# if we raised a KeyError.
index = self._getIndex()
# the next line will raise the proper error if the item has expired
b = index[k]
v = b[k] # grab the value before we potentially time it out.
if b is not current:
# it's not optimal to do writes in getitem, but there's no choice.
# we accessed the object, so it should become current.
index[k] = current # change the index to the current bucket.
current[k] = v # add the value to the current bucket.
self._setLastAccessed(v)
del b[k] # delete the item from the old bucket.
v = self._data[b][k]
if hasattr(v, '__of__'):
return v.__of__(self)
else:
return v
finally:
self.lock.release()
def __delitem__(self, k):
self.lock.acquire()
try:
self._getCurrentBucket()
index = self._ring._index
index = self._getIndex()
b = index[k]
v = self._data[b][k]
del self._data[b][k]
self.__len__.change(-1)
if hasattr(v, '__of__'):
v = v.__of__(self)
del index[k]
del b[k]
finally:
self.lock.release()
self.notifyDestruct(v)
security.declareProtected(ACCESS_TRANSIENTS_PERM, '__len__')
def __len__(self):
""" this won't be called unless we havent run __init__ """
if DEBUG: DLOG('Class __len__ called!')
self._getCurrentBucket()
return len(self._ring._index)
security.declareProtected(ACCESS_TRANSIENTS_PERM, 'getLen')
getLen = __len__
security.declareProtected(ACCESS_TRANSIENTS_PERM, 'get')
def get(self, k, default=_marker):
self.lock.acquire()
try:
DEBUG and TLOG('get: called with k=%s' % k)
notfound = []
current = self._getCurrentBucket()
DEBUG and TLOG('get: current is %s' % current)
if default is _marker: default=None
index = self._getIndex()
b = index.get(k, notfound)
if b is notfound:
# it's not here, this is a genuine miss
DEBUG and TLOG('bucket was notfound for %s' %k)
return default
else:
v = self._data[b].get(k, notfound)
if v is notfound:
DEBUG and TLOG(
'get: %s was not found in index bucket (%s)' % (k, b))
return default
elif b != current:
DEBUG and TLOG('get: b was not current, it was %s' %b)
# we accessed the object, so it becomes current
# by moving it to the current bucket
del self._data[b][k] # delete the item from the old bucket.
self._data[current][k] = v # add the value to the current
self._setLastAccessed(v)
index[k] = current # change the index to the current buck.
if hasattr(v, '__of__'):
v = v.__of__(self)
return v
finally:
self.lock.release()
def _setLastAccessed(self, transientObject):
self.lock.acquire()
try:
sla = getattr(transientObject, 'setLastAccessed', None)
if sla is not None: sla()
finally:
self.lock.release()
security.declareProtected(ACCESS_TRANSIENTS_PERM, 'has_key')
def has_key(self, k):
notfound = []
v = self.get(k, notfound)
if v is notfound: return 0
return 1
def values(self):
return map(lambda k, self=self: self[k], self.keys())
......@@ -468,15 +826,58 @@ class TransientObjectContainer(SimpleItem):
def items(self):
return map(lambda k, self=self: (k, self[k]), self.keys())
def true_items(self):
l = []
for bucket in self._data.values():
items = list(bucket.items())
l.extend(items)
return l
def keys(self):
self._getCurrentBucket()
return list(self._ring._index.keys())
index = self._getIndex()
return list(index.keys())
# proxy security declaration
security.declareProtected(ACCESS_TRANSIENTS_PERM, 'getLen')
class Increaser(Persistent):
"""
A persistent object representing a typically increasing integer whose
conflict resolution uses the greatest integer out of the three
available states
"""
def __init__(self, v):
self.value = v
def set(self, v):
self.value = v
def __getstate__(self):
return self.value
def __setstate__(self, v):
self.value = v
def __call__(self):
return self.value
def _p_resolveConflict(self, old, state1, state2):
DEBUG and TLOG('Resolving conflict in Increaser')
if old <= state1 <= state2: return state2
if old <= state2 <= state1: return state1
return old
def _p_independent(self):
return 1
class Ring(Persistent):
""" ring of buckets """
""" ring of buckets. This class is only kept for backwards-compatibility
purposes (Zope 2.5X). """
def __init__(self, l, index):
if not len(l):
raise "ring must have at least one element"
DEBUG and TLOG('initial _ring buckets: %s' % map(oid, l))
self._data = l
self._index = index
......@@ -497,9 +898,4 @@ class Ring(Persistent):
def _p_independent(self):
return 1
# this should really have a _p_resolveConflict, but
# I've not had time to come up with a reasonable one that
# works in every circumstance.
Globals.InitializeClass(TransientObjectContainer)
......@@ -149,8 +149,8 @@ class DictionaryLike(Interface.Base):
def get(k, default='marker'):
"""
Return value associated with key k. If k does not exist and default
is not marker, return default, else raise KeyError.
Return value associated with key k. Return None or default if k
does not exist.
"""
def has_key(k):
......
......@@ -13,10 +13,10 @@
"""
Simple ZODB-based transient object implementation.
$Id: TransientObject.py,v 1.6 2002/06/11 15:19:38 chrism Exp $
$Id: TransientObject.py,v 1.7 2002/06/21 01:51:43 chrism Exp $
"""
__version__='$Revision: 1.6 $'[11:-2]
__version__='$Revision: 1.7 $'[11:-2]
from Persistence import Persistent
from Acquisition import Implicit
......@@ -74,6 +74,13 @@ class TransientObject(Persistent, Implicit):
#
def invalidate(self):
if hasattr(self, '_invalid'):
# we dont want to invalidate twice
return
trans_ob_container = getattr(self, 'aq_parent', None)
if trans_ob_container is not None:
if trans_ob_container.has_key(self.token):
del trans_ob_container[self.token]
self._invalid = None
def isValid(self):
......
import sys, os, time, unittest
if __name__=='__main__':
sys.path.insert(0, '..')
sys.path.insert(0, '../../..')
import ZODB # in order to get Persistence.Persistent working
from Testing import makerequest
import Acquisition
......@@ -74,7 +78,7 @@ class TestLastAccessed(TestBase):
sdo = self.app.sm.new_or_existing('TempObject')
la1 = sdo.getLastAccessed()
fauxtime.sleep(WRITEGRANULARITY + 1)
sdo = self.app.sm['TempObject']
sdo = self.app.sm.get('TempObject')
assert sdo.getLastAccessed() > la1, (sdo.getLastAccessed(), la1)
class TestNotifications(TestBase):
......@@ -90,13 +94,15 @@ class TestNotifications(TestBase):
self.app.sm.setDelNotificationTarget(delNotificationTarget)
sdo = self.app.sm.new_or_existing('TempObject')
timeout = self.timeout * 60
fauxtime.sleep(timeout + (timeout * .33))
try: sdo1 = self.app.sm['TempObject']
except KeyError: pass
fauxtime.sleep(timeout + (timeout * .75))
sdo1 = self.app.sm.get('TempObject')
for x in range(1, 100):
# force the sdm to do housekeeping
self.app.sm._getCurrentBucket()
now = fauxtime.time()
k = sdo.get('endtime')
assert type(k) == type(now)
assert k <= now
assert (type(k) == type(now)), type(k)
assert k <= now, (k, now)
def addNotificationTarget(item, context):
item['starttime'] = fauxtime.time()
......
......@@ -12,6 +12,9 @@
##############################################################################
import sys, os, time, random, unittest
if __name__ == "__main__":
sys.path.insert(0, '../../..')
import ZODB
from Products.Transience.Transience import TransientObjectContainer,\
MaxTransientObjectsExceeded
......@@ -23,7 +26,7 @@ from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
import time as oldtime
import fauxtime
class TestTransientObjectContainer(TestCase):
class TestBase(TestCase):
def setUp(self):
Products.Transience.Transience.time = fauxtime
Products.Transience.TransientObject.time = fauxtime
......@@ -36,6 +39,7 @@ class TestTransientObjectContainer(TestCase):
Products.Transience.Transience.time = oldtime
Products.Transience.TransientObject.time = oldtime
class TestTransientObjectContainer(TestBase):
def testGetItemFails(self):
self.assertRaises(KeyError, self._getitemfail)
......@@ -280,6 +284,36 @@ class TestTransientObjectContainer(TestCase):
except KeyError:
if self.t.has_key(x): assert 1==2,"failed to delete %s" % x
def testChangingTimeoutWorks(self):
# 1 minute
for x in range(10, 110):
self.t[x] = x
fauxtime.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
# 2 minutes
self.t._setTimeout(self.timeout/60*2)
self.t._reset()
for x in range(10, 110):
self.t[x] = x
fauxtime.sleep(self.timeout)
assert len(self.t.keys()) == 100, len(self.t.keys())
fauxtime.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
# 3 minutes
self.t._setTimeout(self.timeout/60*3)
self.t._reset()
for x in range(10, 110):
self.t[x] = x
fauxtime.sleep(self.timeout)
assert len(self.t.keys()) == 100, len(self.t.keys())
fauxtime.sleep(self.timeout)
assert len(self.t.keys()) == 100, len(self.t.keys())
fauxtime.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
def testItemsGetExpired(self):
for x in range(10, 110):
self.t[x] = x
......@@ -326,14 +360,14 @@ class TestTransientObjectContainer(TestCase):
fauxtime.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
def testGetItemDelaysTimeout(self):
def testGetDelaysTimeout(self):
for x in range(10, 110):
self.t[x] = x
# current bucket will become old after we sleep for a while.
fauxtime.sleep(self.timeout/2)
# these items will be added to the new current bucket by getitem
for x in range(10, 110):
self.t[x]
self.t.get(x)
fauxtime.sleep(self.timeout/2)
assert len(self.t.keys()) == 100, len(self.t.keys())
for x in range(10, 110):
......@@ -344,7 +378,7 @@ class TestTransientObjectContainer(TestCase):
self.t[x] = x
# current bucket will become old after we sleep for a while.
fauxtime.sleep(self.timeout/2)
# these items will be added to the new current bucket by getitem
# these items will be added to the new current bucket by setitem
for x in range(10, 110):
self.t[x] = x + 1
fauxtime.sleep(self.timeout/2)
......@@ -352,19 +386,6 @@ class TestTransientObjectContainer(TestCase):
for x in range(10, 110):
assert self.t[x] == x + 1
def testGetDelaysTimeout(self):
for x in range(10, 110):
self.t[x] = x
# current bucket will become old after we sleep for a while.
fauxtime.sleep(self.timeout/2)
# these items will be added to the new current bucket by getitem
for x in range(10, 110):
self.t.get(x)
fauxtime.sleep(self.timeout/2)
assert len(self.t.keys()) == 100, len(self.t.keys())
for x in range(10, 110):
assert self.t[x] == x
def testLen(self):
added = {}
r = range(10, 1010)
......@@ -428,6 +449,7 @@ def lsubtract(l1, l2):
def test_suite():
testsuite = makeSuite(TestTransientObjectContainer, 'test')
#testsuite = makeSuite(TestBase, 'test')
alltests = TestSuite((testsuite,))
return alltests
......