Commit 91e69b30 authored by Chris McDonough's avatar Chris McDonough

Overhaul how the Application object is initialized (replace long function with...

Overhaul how the Application object is initialized (replace long function with a somewhat structured recipe).� Include tests for initializer functionality.
parent e2deb5bb
......@@ -12,8 +12,8 @@
##############################################################################
__doc__='''Application support
$Id: Application.py,v 1.198 2003/12/11 19:50:27 evan Exp $'''
__version__='$Revision: 1.198 $'[11:-2]
$Id: Application.py,v 1.199 2003/12/20 18:56:05 chrism Exp $'''
__version__='$Revision: 1.199 $'[11:-2]
import Globals,Folder,os,sys,App.Product, App.ProductRegistry, misc_
import time, traceback, os, Products
......@@ -33,6 +33,7 @@ from zExceptions import Redirect as RedirectException, Forbidden
from HelpSys.HelpSys import HelpSys
from Acquisition import aq_base
from App.Product import doInstall
from App.config import getConfiguration
class Application(Globals.ApplicationDefaultPermissions,
ZDOM.Root, Folder.Folder,
......@@ -65,6 +66,8 @@ class Application(Globals.ApplicationDefaultPermissions,
# Set the universal default method to index_html
_object_manager_browser_default_id = 'index_html'
_initializer_registry = None
def title_and_id(self): return self.title
def title_or_id(self): return self.title
......@@ -179,7 +182,7 @@ class Application(Globals.ApplicationDefaultPermissions,
if rebuild:
from BTrees.OOBTree import OOBTree
jar.root()['ZGlobals'] = OOBTree()
result=1
result = 1
zglobals =jar.root()['ZGlobals']
reg_has_key=zglobals.has_key
......@@ -240,6 +243,19 @@ class Application(Globals.ApplicationDefaultPermissions,
return 1
return 0
_setInitializerRegistry__roles__ = ()
def _setInitializerFlag(self, flag):
if self._initializer_registry is None:
self._initializer_registry = {}
self._initializer_registry[flag] = 1
_getInitializerRegistry__roles__ = ()
def _getInitializerFlag(self, flag):
reg = self._initializer_registry
if reg is None:
reg = {}
return reg.get(flag)
class Expired(Globals.Persistent):
icon='p_/broken'
......@@ -258,119 +274,162 @@ class Expired(Globals.Persistent):
__inform_commit__=__save__
def initialize(app):
# Initialize the application
initializer = AppInitializer(app)
initializer.initialize()
# The following items marked b/c are backward compatibility hacks
# which make sure that expected system objects are added to the
# bobobase. This is required because the bobobase in use may pre-
# date the introduction of certain system objects such as those
# which provide Lever support.
class AppInitializer:
""" Initialze an Application object (called at startup) """
def __init__(self, app):
self.app = (app,)
def getApp(self):
# this is probably necessary, but avoid acquisition anyway
return self.app[0]
def commit(self, note):
get_transaction().note(note)
get_transaction().commit()
# b/c: Ensure that Control Panel exists.
def initialize(self):
app = self.getApp()
# make sure to preserve relative ordering of calls below.
self.install_cp_and_products()
self.install_tempfolder_and_sdc()
self.install_session_data_manager()
self.install_browser_id_manager()
self.install_required_roles()
self.install_zglobals()
self.install_inituser()
self.install_errorlog()
self.install_products()
self.install_standards()
self.check_zglobals()
def install_cp_and_products(self):
app = self.getApp()
# Ensure that Control Panel exists.
if not hasattr(app, 'Control_Panel'):
cpl=ApplicationManager()
cpl._init()
app._setObject('Control_Panel', cpl)
get_transaction().note('Added Control_Panel')
get_transaction().commit()
self.commit('Added Control_Panel')
# b/c: Ensure that a ProductFolder exists.
if not hasattr(aq_base(app.Control_Panel), 'Products'):
app.Control_Panel.Products=App.Product.ProductFolder()
get_transaction().note('Added Control_Panel.Products')
get_transaction().commit()
self.commit('Added Control_Panel.Products')
def install_tempfolder_and_sdc(self):
app = self.getApp()
from Products.ZODBMountPoint.MountedObject import manage_addMounts,\
MountedObject
from Products.ZODBMountPoint.MountedObject import getConfiguration as \
getDBTabConfiguration
dbtab_config = getDBTabConfiguration()
# Ensure that a temp folder exists
if not hasattr(app, 'temp_folder'):
from Products.ZODBMountPoint.MountedObject import manage_addMounts
tf = getattr(app, 'temp_folder', None)
if getattr(tf, 'meta_type', None) == MountedObject.meta_type:
# tf is a MountPoint object. This means that the temp_folder
# couldn't be mounted properly (the meta_type would have been
# the meta type of the container class otherwise). The
# MountPoint object writes a message to zLOG so we don't
# need to.
return
if tf is None:
# do nothing if we've already installed one
if not app._getInitializerFlag('temp_folder'):
if dbtab_config is None:
# DefaultConfiguration, do nothing
return
mount_paths = [ x[0] for x in dbtab_config.listMountPaths() ]
if not '/temp_folder' in mount_paths:
# we won't be able to create the mount point properly
LOG('Zope Default Object Creation', ERROR,
('Could not initialze a Temporary Folder because '
'a database was not configured to be mounted at '
'the /temp_folder mount point'))
return
try:
manage_addMounts(app, ('/temp_folder',))
get_transaction().note('Added temp_folder')
get_transaction().commit()
app._setInitializerFlag('temp_folder')
self.commit('Added temp_folder')
tf = app.temp_folder
except:
LOG('Zope Default Object Creation', ERROR,
'Could not add a /temp_folder mount point due to an error.',
('Could not add a /temp_folder mount point due to an '
'error.'),
error=sys.exc_info())
return
# Ensure that there is a transient container in the temp folder
tf = getattr(app, 'temp_folder', None)
if tf is not None and not hasattr(aq_base(tf), 'session_data'):
env_has = os.environ.get
# Ensure that there is a transient object container in the temp folder
config = getConfiguration()
if not hasattr(aq_base(tf), 'session_data'):
from Products.Transience.Transience import TransientObjectContainer
addnotify = env_has('ZSESSION_ADD_NOTIFY', None)
delnotify = env_has('ZSESSION_DEL_NOTIFY', None)
addnotify = getattr(config, 'session_add_notify_script_path', None)
delnotify = getattr(config, 'session_delete_notify_script_path',
None)
default_limit = 1000
limit = env_has('ZSESSION_OBJECT_LIMIT', default_limit)
try:
limit=int(limit)
if limit != default_limit:
LOG('Zope Default Object Creation', INFO,
('using ZSESSION_OBJECT_LIMIT-specified max objects '
'value of %s' % limit))
except ValueError:
LOG('Zope Default Object Creation', WARNING,
('Noninteger value %s specified for ZSESSION_OBJECT_LIMIT, '
'defaulting to %s' % (limit, default_limit)))
limit = default_limit
limit = (getattr(config, 'maximum_number_of_session_objects', None)
or default_limit)
timeout_spec = getattr(config, 'session_timeout_minutes', None)
if addnotify and app.unrestrictedTraverse(addnotify, None) is None:
LOG('Zope Default Object Creation', WARNING,
('failed to use nonexistent "%s" script as '
'ZSESSION_ADD_NOTIFY' % addnotify))
'session-add-notify-script-path' % addnotify))
addnotify=None
elif addnotify:
LOG('Zope Default Object Creation', INFO,
'using %s as add notification script' % addnotify)
if delnotify and app.unrestrictedTraverse(delnotify, None) is None:
LOG('Zope Default Object Creation', WARNING,
('failed to use nonexistent "%s" script as '
'ZSESSION_DEL_NOTIFY' % delnotify))
'session-delete-notify-script-path' % delnotify))
delnotify=None
elif delnotify:
LOG('Zope Default Object Creation', INFO,
'using %s as delete notification script' % delnotify)
toc = TransientObjectContainer('session_data',
'Session Data Container', addNotification = addnotify,
delNotification = delnotify, limit=limit)
timeout_spec = env_has('ZSESSION_TIMEOUT_MINS', '')
toc = TransientObjectContainer(
'session_data', 'Session Data Container',
addNotification = addnotify,
delNotification = delnotify,
limit=limit)
if timeout_spec:
try:
timeout_spec = int(timeout_spec)
except ValueError:
LOG('Zope Default Object Creation', WARNING,
('"%s" is an illegal value for ZSESSION_TIMEOUT_MINS, '
'using default timeout instead.' % timeout_spec))
else:
LOG('Zope Default Object Creation', INFO,
('using ZSESSION_TIMEOUT_MINS-specified session timeout '
'value of %s' % timeout_spec))
toc = TransientObjectContainer('session_data',
'Session Data Container', timeout_mins = timeout_spec,
addNotification=addnotify, delNotification = delnotify,
'Session Data Container',
timeout_mins = timeout_spec,
addNotification = addnotify,
delNotification = delnotify,
limit=limit)
tf._setObject('session_data', toc)
tf_reserved = getattr(tf, '_reserved_names', ())
if 'session_data' not in tf_reserved:
tf._reserved_names = tf_reserved + ('session_data',)
get_transaction().note('Added session_data to temp_folder')
get_transaction().commit()
del toc
del addnotify
del delnotify
del timeout_spec
del env_has
del tf
self.commit('Added session_data to temp_folder')
return tf # return the tempfolder object for test purposes
def install_browser_id_manager(self):
app = self.getApp()
if app._getInitializerFlag('browser_id_manager'):
# do nothing if we've already installed one
return
# Ensure that a browser ID manager exists
if not hasattr(app, 'browser_id_manager'):
from Products.Sessions.BrowserIdManager import BrowserIdManager
bid = BrowserIdManager('browser_id_manager', 'Browser Id Manager')
app._setObject('browser_id_manager', bid)
get_transaction().note('Added browser_id_manager')
get_transaction().commit()
del bid
app._setInitializerFlag('browser_id_manager')
self.commit('Added browser_id_manager')
def install_session_data_manager(self):
app = self.getApp()
if app._getInitializerFlag('session_data_manager'):
# do nothing if we've already installed one
return
# Ensure that a session data manager exists
if not hasattr(app, 'session_data_manager'):
from Products.Sessions.SessionDataManager import SessionDataManager
......@@ -379,56 +438,63 @@ def initialize(app):
path='/temp_folder/session_data',
requestName='SESSION')
app._setObject('session_data_manager', sdm)
get_transaction().note('Added session_data_manager')
get_transaction().commit()
del sdm
app._setInitializerFlag('session_data_manager')
self.commit('Added session_data_manager')
def install_required_roles(self):
app = self.getApp()
# b/c: Ensure that Owner role exists.
# Ensure that Owner role exists.
if hasattr(app, '__ac_roles__') and not ('Owner' in app.__ac_roles__):
app.__ac_roles__=app.__ac_roles__ + ('Owner',)
get_transaction().note('Added Owner role')
get_transaction().commit()
self.commit('Added Owner role')
# ensure the Authenticated role exists.
if hasattr(app, '__ac_roles__'):
if not 'Authenticated' in app.__ac_roles__:
app.__ac_roles__=app.__ac_roles__ + ('Authenticated',)
get_transaction().note('Added Authenticated role')
get_transaction().commit()
self.commit('Added Authenticated role')
def install_zglobals(self):
app = self.getApp()
# Make sure we have Globals
# Make sure we have ZGlobals
root=app._p_jar.root()
if not root.has_key('ZGlobals'):
from BTrees.OOBTree import OOBTree
app._p_jar.root()['ZGlobals'] = OOBTree()
get_transaction().note('Added Globals')
get_transaction().commit()
root['ZGlobals'] = OOBTree()
self.commit('Added ZGlobals')
def install_inituser(self):
app = self.getApp()
# Install the initial user.
if hasattr(app, 'acl_users'):
users = app.acl_users
if hasattr(users, '_createInitialUser'):
app.acl_users._createInitialUser()
get_transaction().note('Created initial user')
get_transaction().commit()
self.commit('Created initial user')
def install_errorlog(self):
app = self.getApp()
if app._getInitializerFlag('error_log'):
# do nothing if we've already installed one
return
# Install an error_log
if not hasattr(app, 'error_log'):
from Products.SiteErrorLog.SiteErrorLog import SiteErrorLog
error_log = SiteErrorLog()
app._setObject('error_log', error_log)
get_transaction().note('Added site error_log at /error_log')
get_transaction().commit()
install_products(app)
install_standards(app)
app._setInitializerFlag('error_log')
self.commit('Added site error_log at /error_log')
# Note that the code from here on only runs if we are not a ZEO
# client, or if we are a ZEO client and we've specified by way
# of env variable that we want to force products to load.
def check_zglobals(self):
if not doInstall():
return
app = self.getApp()
# Check for dangling pointers (broken zclass dependencies) in the
# global class registry. If found, rebuild the registry. Note that
# if the check finds problems but fails to successfully rebuild the
......@@ -464,20 +530,62 @@ def initialize(app):
# App.Product.initializeProduct will set this if a disk-based
# product was added or updated and we are not a ZEO client.
if getattr(Globals, '__disk_product_installed__', 0):
if getattr(Globals, '__disk_product_installed__', None):
try:
LOG('Zope', INFO, 'New disk product detected, determining '\
'if we need to fix up any ZClasses.')
LOG('Zope', INFO,
('New disk product detected, determining if we need '
'to fix up any ZClasses.'))
if app.fixupZClassDependencies():
LOG('Zope', INFO, 'Repaired broken ZClass dependencies.')
get_transaction().commit()
LOG('Zope',INFO,
'Repaired broken ZClass dependencies.')
self.commit('Repaired broked ZClass dependencies')
except:
LOG('Zope', ERROR,
'Attempt to fixup ZClass dependencies after detecting ' \
'an updated disk-based product failed.',
('Attempt to fixup ZClass dependencies after '
'detecting an updated disk-based product failed.'),
error=sys.exc_info())
get_transaction().abort()
def install_products(self):
app = self.getApp()
# this defers to a function for b/c reasons
return install_products(app)
def install_standards(self):
app = self.getApp()
# this defers to a function for b/c reasons
return install_standards(app)
def install_products(app):
# Install a list of products into the basic folder class, so
# that all folders know about top-level objects, aka products
folder_permissions = get_folder_permissions()
meta_types=[]
done={}
debug_mode = App.config.getConfiguration().debug_mode
get_transaction().note('Prior to product installs')
get_transaction().commit()
products = get_products()
for priority, product_name, index, product_dir in products:
# For each product, we will import it and try to call the
# intialize() method in the product __init__ module. If
# the method doesnt exist, we put the old-style information
# together and do a default initialization.
if done.has_key(product_name):
continue
done[product_name]=1
install_product(app, product_dir, product_name, meta_types,
folder_permissions, raise_exc=debug_mode)
Products.meta_types=Products.meta_types+tuple(meta_types)
Globals.default__class_init__(Folder.Folder)
def get_products():
""" Return a list of tuples in the form:
[(priority, dir_name, index, base_dir), ...] for each Product directory
......@@ -555,36 +663,6 @@ def import_product(product_dir, product_name, raise_exc=0, log_exc=1):
exc = None
def install_products(app):
# Install a list of products into the basic folder class, so
# that all folders know about top-level objects, aka products
folder_permissions = get_folder_permissions()
meta_types=[]
done={}
debug_mode = App.config.getConfiguration().debug_mode
get_transaction().note('Prior to product installs')
get_transaction().commit()
products = get_products()
for priority, product_name, index, product_dir in products:
# For each product, we will import it and try to call the
# intialize() method in the product __init__ module. If
# the method doesnt exist, we put the old-style information
# together and do a default initialization.
if done.has_key(product_name):
continue
done[product_name]=1
install_product(app, product_dir, product_name, meta_types,
folder_permissions, raise_exc=debug_mode)
Products.meta_types=Products.meta_types+tuple(meta_types)
Globals.default__class_init__(Folder.Folder)
def get_folder_permissions():
folder_permissions={}
for p in Folder.Folder.__ac_permissions__:
......@@ -687,7 +765,7 @@ def install_product(app, product_dir, product_name, meta_types,
for permission, names in new_permissions:
folder_permissions[permission]=names
new_permissions.sort()
Folder.Folder.__ac_permissions__ = tuple(
Folder.Folder.__ac_permissions__=tuple(
list(Folder.Folder.__ac_permissions__)+new_permissions)
if not doInstall():
......
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import os, sys, unittest, tempfile, cStringIO
import ZODB
from OFS.Application import Application, AppInitializer, get_products
import Zope.Startup
import ZConfig
from App.config import getConfiguration, setConfiguration
TEMPNAME = tempfile.mktemp()
TEMPPRODUCTS = os.path.join(TEMPNAME, "Products")
bad_cfg = """
instancehome <<INSTANCE_HOME>>
<zodb_db main>
mount-point /
<mappingstorage>
name mappingstorage
</mappingstorage>
</zodb_db>
"""
good_cfg = bad_cfg + """
<zodb_db temporary>
# Temporary storage database (for sessions)
<temporarystorage>
name temporary storage for sessioning
</temporarystorage>
mount-point /temp_folder
container-class Products.TemporaryFolder.TemporaryContainer
</zodb_db>
"""
def getSchema():
startup = os.path.dirname(os.path.realpath(Zope.Startup.__file__))
schemafile = os.path.join(startup, 'zopeschema.xml')
return ZConfig.loadSchema(schemafile)
def getApp():
from ZODB.ZApplication import ZApplicationWrapper
DB = getConfiguration().dbtab.getDatabase('/')
return ZApplicationWrapper(DB, 'Application', Application, (), 'foo')()
original_config = None
class TestInitialization( unittest.TestCase ):
""" Test the application initializer object """
def setUp(self):
global original_config
if original_config is None:
original_config = getConfiguration()
self.schema = getSchema()
os.makedirs(TEMPNAME)
os.makedirs(TEMPPRODUCTS)
def tearDown(self):
import App.config
del self.schema
App.config.setConfiguration(original_config)
os.rmdir(TEMPPRODUCTS)
os.rmdir(TEMPNAME)
def configure(self, text):
# We have to create a directory of our own since the existence
# of the directory is checked. This handles this in a
# platform-independent way.
schema = self.schema
sio = cStringIO.StringIO(text.replace("<<INSTANCE_HOME>>", TEMPNAME))
conf, handler = ZConfig.loadConfigFile(schema, sio)
self.assertEqual(conf.instancehome, TEMPNAME)
setConfiguration(conf)
def getOne(self):
app = getApp()
return AppInitializer(app)
def test_install_cp_and_products(self):
self.configure(good_cfg)
i = self.getOne()
app = i.getApp()
i.install_cp_and_products()
self.failUnless(hasattr(app, 'Control_Panel'))
self.assertEqual(app.Control_Panel.meta_type, 'Control Panel')
self.failUnless(hasattr(app.Control_Panel, 'Products'))
self.assertEqual(app.Control_Panel.Products.meta_type,
'Product Management')
def test_install_tempfolder_and_sdc(self):
self.configure(good_cfg)
i = self.getOne()
i.install_tempfolder_and_sdc()
app = i.getApp()
self.assertEqual(app.temp_folder.meta_type, 'Temporary Folder')
self.assertEqual(app.temp_folder.session_data.meta_type,
'Transient Object Container')
self.failUnless(app._getInitializerFlag('temp_folder'))
def test_install_tempfolder_and_sdc_status(self):
self.configure(good_cfg)
i = self.getOne()
status = i.install_tempfolder_and_sdc()
self.failUnless(status)
i = self.getOne()
self.configure(bad_cfg)
status = i.install_tempfolder_and_sdc()
self.failIf(status)
def test_install_browser_id_manager(self):
self.configure(good_cfg)
i = self.getOne()
app = i.getApp()
i.install_browser_id_manager()
self.assertEqual(app.browser_id_manager.meta_type,'Browser Id Manager')
self.failUnless(app._getInitializerFlag('browser_id_manager'))
def test_install_session_data_manager(self):
self.configure(good_cfg)
i = self.getOne()
i.install_session_data_manager()
app = i.getApp()
self.assertEqual(app.session_data_manager.meta_type,
'Session Data Manager')
self.failUnless(app._getInitializerFlag('session_data_manager'))
def test_install_required_roles(self):
self.configure(good_cfg)
i = self.getOne()
i.install_required_roles()
app = i.getApp()
self.failUnless('Owner' in app.__ac_roles__)
self.failUnless('Authenticated' in app.__ac_roles__)
def test_install_zglobals(self):
from BTrees.OOBTree import OOBTree
self.configure(good_cfg)
i = self.getOne()
i.install_zglobals()
root = i.getApp()._p_jar.root()
self.failUnless(root.has_key('ZGlobals'))
self.failUnless(isinstance(root['ZGlobals'], OOBTree))
def test_install_inituser(self):
fname = os.path.join(TEMPNAME, 'inituser')
f = open(fname, 'w')
f.write('theuser:password')
f.close()
try:
self.configure(good_cfg)
i = self.getOne()
i.install_inituser()
app = i.getApp()
self.failUnless(app.acl_users.getUser('theuser'))
finally:
if os.path.exists(fname):
os.unlink(fname)
def test_install_errorlog(self):
self.configure(good_cfg)
i = self.getOne()
i.install_errorlog()
app = i.getApp()
self.assertEqual(app.error_log.meta_type, 'Site Error Log')
self.failUnless(app._getInitializerFlag('error_log'))
def test_install_products(self):
self.configure(good_cfg)
i = self.getOne()
i.install_products()
self.failUnless(Application.misc_.__dict__.has_key('OFSP'))
def test_install_standards(self):
self.configure(good_cfg)
i = self.getOne()
i.install_products() # required
i .install_standards()
app = i.getApp()
self.assertEqual(app.index_html.meta_type, 'DTML Method')
self.assertEqual(app.standard_error_message.meta_type, 'DTML Method')
self.assertEqual(app.standard_html_header.meta_type, 'DTML Method')
self.assertEqual(app.standard_html_footer.meta_type, 'DTML Method')
self.assertEqual(getattr(app, 'standard_template.pt').meta_type,
'Page Template')
self.failUnless(hasattr(app, '_standard_objects_have_been_added'))
def test_suite():
suite = unittest.TestSuite()
suite.addTest( unittest.makeSuite( TestInitialization ) )
return suite
def main():
unittest.main(defaultTest='test_suite')
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment