Commit 3614ee21 authored by Tres Seaver's avatar Tres Seaver

Merge from 2.12 branch.

parent 565bc25a
...@@ -16,6 +16,7 @@ from Products.Sessions.interfaces import BrowserIdManagerErr #BBB ...@@ -16,6 +16,7 @@ from Products.Sessions.interfaces import BrowserIdManagerErr #BBB
from Products.Sessions.interfaces import SessionDataManagerErr #BBB from Products.Sessions.interfaces import SessionDataManagerErr #BBB
def initialize(context): def initialize(context):
import BrowserIdManager import BrowserIdManager
import SessionDataManager import SessionDataManager
...@@ -48,9 +49,11 @@ def initialize(context): ...@@ -48,9 +49,11 @@ def initialize(context):
security = ModuleSecurityInfo('Products') security = ModuleSecurityInfo('Products')
security.declarePublic('Sessions') security.declarePublic('Sessions')
security.declarePublic('Transience') security.declarePublic('Transience')
security = ModuleSecurityInfo('Products.Sessions.interfaces') security = ModuleSecurityInfo('Products.Sessions.interfaces')
security.declareObjectPublic() security.declareObjectPublic()
security.setDefaultAccess('allow') security.setDefaultAccess('allow')
security = ModuleSecurityInfo('Products.Transience') security = ModuleSecurityInfo('Products.Transience')
security.declarePublic('MaxTransientObjectsExceeded') security.declarePublic('MaxTransientObjectsExceeded')
......
...@@ -10,30 +10,7 @@ ...@@ -10,30 +10,7 @@
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
import sys, os, time import unittest
from Testing import makerequest
import ZODB
from ZODB.POSException import InvalidObjectReference, ConflictError
from Persistence import Persistent
from ZODB.DemoStorage import DemoStorage
import transaction
from OFS.DTMLMethod import DTMLMethod
import Acquisition
from Acquisition import aq_base
from Products.Sessions.BrowserIdManager import BrowserIdManager
from Products.Sessions.SessionDataManager import \
SessionDataManager, SessionDataManagerErr, SessionDataManagerTraverser
from Products.Transience.Transience import \
TransientObjectContainer, TransientObject
from Products.TemporaryFolder.TemporaryFolder import MountedTemporaryFolder
from DateTime import DateTime
from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
import time, threading
from cPickle import UnpickleableError
from OFS.Application import Application
from ZPublisher.BeforeTraverse import registerBeforeTraverse, \
unregisterBeforeTraverse
tf_name = 'temp_folder' tf_name = 'temp_folder'
idmgr_name = 'browser_id_manager' idmgr_name = 'browser_id_manager'
...@@ -42,11 +19,16 @@ sdm_name = 'session_data_manager' ...@@ -42,11 +19,16 @@ sdm_name = 'session_data_manager'
stuff = {} stuff = {}
def _getDB(): def _getDB():
from OFS.Application import Application
import transaction
db = stuff.get('db') db = stuff.get('db')
if not db: if not db:
from ZODB import DB
from ZODB.DemoStorage import DemoStorage
ds = DemoStorage() ds = DemoStorage()
db = ZODB.DB(ds, pool_size=60) db = DB(ds, pool_size=60)
conn = db.open() conn = db.open()
root = conn.root() root = conn.root()
app = Application() app = Application()
...@@ -57,17 +39,20 @@ def _getDB(): ...@@ -57,17 +39,20 @@ def _getDB():
conn.close() conn.close()
return db return db
def _delDB(): def _delDB():
import transaction
transaction.abort() transaction.abort()
del stuff['db'] del stuff['db']
class DummyAqImplicit(Acquisition.Implicit):
pass
class DummyPersistent(Persistent):
pass
def _populate(app): def _populate(app):
from OFS.DTMLMethod import DTMLMethod
from Products.Sessions.BrowserIdManager import BrowserIdManager
from Products.Sessions.SessionDataManager import SessionDataManager
from Products.TemporaryFolder.TemporaryFolder import MountedTemporaryFolder
from Products.Transience.Transience import TransientObjectContainer
import transaction
bidmgr = BrowserIdManager(idmgr_name) bidmgr = BrowserIdManager(idmgr_name)
tf = MountedTemporaryFolder(tf_name, title="Temporary Folder") tf = MountedTemporaryFolder(tf_name, title="Temporary Folder")
toc = TransientObjectContainer(toc_name, title='Temporary ' toc = TransientObjectContainer(toc_name, title='Temporary '
...@@ -76,17 +61,25 @@ def _populate(app): ...@@ -76,17 +61,25 @@ def _populate(app):
path='/'+tf_name+'/'+toc_name, title='Session Data Manager', path='/'+tf_name+'/'+toc_name, title='Session Data Manager',
requestName='TESTOFSESSION') requestName='TESTOFSESSION')
try: app._delObject(idmgr_name) try:
except (AttributeError, KeyError): pass app._delObject(idmgr_name)
except (AttributeError, KeyError):
pass
try: app._delObject(tf_name) try:
except (AttributeError, KeyError): pass app._delObject(tf_name)
except (AttributeError, KeyError):
pass
try: app._delObject(sdm_name) try:
except (AttributeError, KeyError): pass app._delObject(sdm_name)
except (AttributeError, KeyError):
pass
try: app._delObject('index_html') try:
except (AttributeError, KeyError): pass app._delObject('index_html')
except (AttributeError, KeyError):
pass
app._setObject(idmgr_name, bidmgr) app._setObject(idmgr_name, bidmgr)
...@@ -102,8 +95,11 @@ def _populate(app): ...@@ -102,8 +95,11 @@ def _populate(app):
app._setObject('index_html', DTMLMethod('', __name__='foo')) app._setObject('index_html', DTMLMethod('', __name__='foo'))
transaction.commit() transaction.commit()
class TestBase(TestCase):
class TestSessionManager(unittest.TestCase):
def setUp(self): def setUp(self):
from Testing import makerequest
db = _getDB() db = _getDB()
conn = db.open() conn = db.open()
root = conn.root() root = conn.root()
...@@ -114,7 +110,6 @@ class TestBase(TestCase): ...@@ -114,7 +110,6 @@ class TestBase(TestCase):
_delDB() _delDB()
del self.app del self.app
class TestSessionManager(TestBase):
def testHasId(self): def testHasId(self):
self.failUnless(self.app.session_data_manager.id == \ self.failUnless(self.app.session_data_manager.id == \
sdm_name) sdm_name)
...@@ -128,6 +123,7 @@ class TestSessionManager(TestBase): ...@@ -128,6 +123,7 @@ class TestSessionManager(TestBase):
self.failUnless(sd is None) self.failUnless(sd is None)
def testGetSessionDataCreate(self): def testGetSessionDataCreate(self):
from Products.Transience.Transience import TransientObject
sd = self.app.session_data_manager.getSessionData(1) sd = self.app.session_data_manager.getSessionData(1)
self.failUnless(sd.__class__ is TransientObject) self.failUnless(sd.__class__ is TransientObject)
...@@ -139,6 +135,7 @@ class TestSessionManager(TestBase): ...@@ -139,6 +135,7 @@ class TestSessionManager(TestBase):
self.failUnless(not self.app.session_data_manager.hasSessionData()) self.failUnless(not self.app.session_data_manager.hasSessionData())
def testSessionDataWrappedInSDMandTOC(self): def testSessionDataWrappedInSDMandTOC(self):
from Acquisition import aq_base
sd = self.app.session_data_manager.getSessionData(1) sd = self.app.session_data_manager.getSessionData(1)
sdm = aq_base(getattr(self.app, sdm_name)) sdm = aq_base(getattr(self.app, sdm_name))
toc = aq_base(getattr(self.app.temp_folder, toc_name)) toc = aq_base(getattr(self.app.temp_folder, toc_name))
...@@ -147,6 +144,8 @@ class TestSessionManager(TestBase): ...@@ -147,6 +144,8 @@ class TestSessionManager(TestBase):
self.failUnless(aq_base(sd.aq_parent.aq_parent) is toc) self.failUnless(aq_base(sd.aq_parent.aq_parent) is toc)
def testNewSessionDataObjectIsValid(self): def testNewSessionDataObjectIsValid(self):
from Acquisition import aq_base
from Products.Transience.Transience import TransientObject
sdType = type(TransientObject(1)) sdType = type(TransientObject(1))
sd = self.app.session_data_manager.getSessionData() sd = self.app.session_data_manager.getSessionData()
self.failUnless(type(aq_base(sd)) is sdType) self.failUnless(type(aq_base(sd)) is sdType)
...@@ -165,6 +164,7 @@ class TestSessionManager(TestBase): ...@@ -165,6 +164,7 @@ class TestSessionManager(TestBase):
self.failUnless(sd == bykeysd) self.failUnless(sd == bykeysd)
def testBadExternalSDCPath(self): def testBadExternalSDCPath(self):
from Products.Sessions.SessionDataManager import SessionDataManagerErr
sdm = self.app.session_data_manager sdm = self.app.session_data_manager
# fake out webdav # fake out webdav
sdm.REQUEST['REQUEST_METHOD'] = 'GET' sdm.REQUEST['REQUEST_METHOD'] = 'GET'
...@@ -182,6 +182,7 @@ class TestSessionManager(TestBase): ...@@ -182,6 +182,7 @@ class TestSessionManager(TestBase):
self.failUnless(not sdm.getSessionData().has_key('test')) self.failUnless(not sdm.getSessionData().has_key('test'))
def testGhostUnghostSessionManager(self): def testGhostUnghostSessionManager(self):
import transaction
sdm = self.app.session_data_manager sdm = self.app.session_data_manager
transaction.commit() transaction.commit()
sd = sdm.getSessionData() sd = sdm.getSessionData()
...@@ -191,6 +192,11 @@ class TestSessionManager(TestBase): ...@@ -191,6 +192,11 @@ class TestSessionManager(TestBase):
self.failUnless(sdm.getSessionData().get('foo') == 'bar') self.failUnless(sdm.getSessionData().get('foo') == 'bar')
def testSubcommitAssignsPJar(self): def testSubcommitAssignsPJar(self):
global DummyPersistent # so pickle can find it
from Persistence import Persistent
import transaction
class DummyPersistent(Persistent):
pass
sd = self.app.session_data_manager.getSessionData() sd = self.app.session_data_manager.getSessionData()
dummy = DummyPersistent() dummy = DummyPersistent()
sd.set('dp', dummy) sd.set('dp', dummy)
...@@ -199,9 +205,11 @@ class TestSessionManager(TestBase): ...@@ -199,9 +205,11 @@ class TestSessionManager(TestBase):
self.failIf(sd['dp']._p_jar is None) self.failIf(sd['dp']._p_jar is None)
def testForeignObject(self): def testForeignObject(self):
from ZODB.POSException import InvalidObjectReference
self.assertRaises(InvalidObjectReference, self._foreignAdd) self.assertRaises(InvalidObjectReference, self._foreignAdd)
def _foreignAdd(self): def _foreignAdd(self):
import transaction
ob = self.app.session_data_manager ob = self.app.session_data_manager
# we don't want to fail due to an acquisition wrapper # we don't want to fail due to an acquisition wrapper
...@@ -213,6 +221,11 @@ class TestSessionManager(TestBase): ...@@ -213,6 +221,11 @@ class TestSessionManager(TestBase):
transaction.commit() transaction.commit()
def testAqWrappedObjectsFail(self): def testAqWrappedObjectsFail(self):
from Acquisition import Implicit
import transaction
class DummyAqImplicit(Implicit):
pass
a = DummyAqImplicit() a = DummyAqImplicit()
b = DummyAqImplicit() b = DummyAqImplicit()
aq_wrapped = a.__of__(b) aq_wrapped = a.__of__(b)
...@@ -227,6 +240,8 @@ class TestSessionManager(TestBase): ...@@ -227,6 +240,8 @@ class TestSessionManager(TestBase):
self.failUnless(self.app.REQUEST.has_key('TESTOFSESSION')) self.failUnless(self.app.REQUEST.has_key('TESTOFSESSION'))
def testUnlazifyAutoPopulated(self): def testUnlazifyAutoPopulated(self):
from Acquisition import aq_base
from Products.Transience.Transience import TransientObject
self.app.REQUEST['PARENTS'] = [self.app] self.app.REQUEST['PARENTS'] = [self.app]
self.app.REQUEST['URL'] = 'a' self.app.REQUEST['URL'] = 'a'
self.app.REQUEST.traverse('/') self.app.REQUEST.traverse('/')
...@@ -234,11 +249,8 @@ class TestSessionManager(TestBase): ...@@ -234,11 +249,8 @@ class TestSessionManager(TestBase):
sdType = type(TransientObject(1)) sdType = type(TransientObject(1))
self.failUnless(type(aq_base(sess)) is sdType) self.failUnless(type(aq_base(sess)) is sdType)
def test_suite():
test_datamgr = makeSuite(TestSessionManager, 'test')
suite = TestSuite((test_datamgr,))
return suite
if __name__ == '__main__': def test_suite():
runner = TextTestRunner(verbosity=9, descriptions=9) return unittest.TestSuite((
runner.run(test_suite()) unittest.makeSuite(TestSessionManager),
))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment