Commit 0e5dee20 authored by Guido van Rossum's avatar Guido van Rossum

StorageConfig and StorageTypes are dead. Long Live Schema.

parent 6afaf1fa
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Higher-level support for configuring storages.
Storages are configured a la DBTab.
A storage section has the form
<Storage Name (dependent)>
# For example
type FileStorage
file_name var/Data.fs
read_only 1
</Storage>
where Name and (dependent) are optional. Once you have retrieved the
section object (probably with getSection("Storage", name), the
function creatStorage() in this module will create the storage object
for you.
"""
from StorageTypes import storage_types
def createStorage(section):
"""Create a storage specified by a configuration section."""
klass, args = getStorageInfo(section)
return klass(**args)
def getStorageInfo(section):
"""Extract a storage description from a configuration section.
Return a tuple (klass, args) where klass is the storage class and
args is a dictionary of keyword arguments. To create the storage,
call klass(**args).
Adapted from DatabaseFactory.setStorageParams() in DBTab.py.
"""
type = section.get("type")
if not type:
raise RuntimeError, "A storage type is required"
module = None
pos = type.rfind(".")
if pos >= 0:
# Specified the module
module, type = type[:pos], type[pos+1:]
converter = None
if not module:
# Use a default module and argument converter.
info = storage_types.get(type)
if not info:
raise RuntimeError, "Unknown storage type: %s" % type
module, converter = info
m = __import__(module, {}, {}, [type])
klass = getattr(m, type)
args = {}
if section.name:
args["name"] = section.name
for key in section.keys():
if key.lower() != "type":
args[key] = section.get(key)
if converter is not None:
args = converter(**args)
return (klass, args)
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Default storage types.
Adapted from DBTab/StorageTypes.py.
"""
import re
from ZConfig.Config import asBoolean
def convertFileStorageArgs(quota=None, stop=None, **kw):
if kw.has_key('name'):
# FileStorage doesn't accept a 'name' arg
del kw['name']
if quota is not None:
kw['quota'] = long(quota) or None
if stop is not None:
stop = long(stop)
if not stop:
stop = None
else:
from ZODB.utils import p64
stop = p64(stop)
kw['stop'] = stop
# Boolean args
for name in (
'create', 'read_only'
):
if kw.has_key(name):
kw[name] = asBoolean(kw[name])
return kw
# Match URLs of the form 'zeo://zope.example.com:1234'
zeo_url_re = re.compile('zeo:/*(?P<host>[A-Za-z0-9\.-]+):(?P<port>[0-9]+)')
def convertAddresses(s):
# Allow multiple addresses using semicolons as a split character.
res = []
for a in s.split(';'):
a = a.strip()
if a:
mo = zeo_url_re.match(a)
if mo is not None:
# ZEO URL
host, port = mo.groups()
res.append((host, int(port)))
else:
# Socket file
res.append(a)
return res
def convertClientStorageArgs(addr=None, **kw):
if addr is None:
raise RuntimeError, 'An addr parameter is required for ClientStorage.'
kw['addr'] = convertAddresses(addr)
# Integer args
for name in (
'cache_size', 'min_disconnect_poll', 'max_disconnect_poll',
):
if kw.has_key(name):
kw[name] = int(kw[name])
# Boolean args
for name in (
'wait', 'read_only', 'read_only_fallback',
):
if kw.has_key(name):
kw[name] = asBoolean(kw[name])
# The 'client' parameter must be None to be false. Yuck.
if kw.has_key('client') and not kw['client']:
kw['client'] = None
return kw
def convertBDBStorageArgs(**kw):
from BDBStorage.BerkeleyBase import BerkeleyConfig
config = BerkeleyConfig()
for name in dir(BerkeleyConfig):
if name.startswith('_'):
continue
val = kw.get(name)
if val is not None:
if name != 'logdir':
val = int(val)
setattr(config, name, val)
del kw[name]
# XXX: Nobody ever passes in env
assert not kw.has_key('env')
kw['config'] = config
return kw
storage_types = {
'FileStorage': ('ZODB.FileStorage', convertFileStorageArgs),
'DemoStorage': ('ZODB.DemoStorage', None),
'MappingStorage': ('ZODB.MappingStorage', None),
'TemporaryStorage': ('Products.TemporaryFolder.TemporaryStorage', None),
'ClientStorage': ('ZEO.ClientStorage', convertClientStorageArgs),
'BDBFullStorage': ('BDBStorage.BDBFullStorage', convertBDBStorageArgs),
'BDBMinimalStorage': ('BDBStorage.BDBMinimalStorage',
convertBDBStorageArgs),
}
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import os
import shutil
import tempfile
import unittest
from StringIO import StringIO
import ZConfig.Context
from ZODB import StorageConfig
class StorageTestBase(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
self.tmpfn = tempfile.mktemp()
self.storage = None
def tearDown(self):
unittest.TestCase.tearDown(self)
storage = self.storage
self.storage = None
try:
if storage is not None:
storage.close()
except:
pass
try:
# BDBFullStorage storage creates a directory
if os.path.isdir(self.tmpfn):
shutil.rmtree(self.tmpfn)
else:
os.remove(self.tmpfn)
except os.error:
pass
def loadConfigText(self, text):
context = ZConfig.Context.Context()
io = StringIO(text)
return context.loadFile(io)
class StorageTestCase(StorageTestBase):
def testFileStorage(self):
from ZODB.FileStorage import FileStorage
sample = """
<Storage>
type FileStorage
file_name %s
create yes
</Storage>
""" % self.tmpfn
rootconf = self.loadConfigText(sample)
storageconf = rootconf.getSection("Storage")
cls, args = StorageConfig.getStorageInfo(storageconf)
self.assertEqual(cls, FileStorage)
self.assertEqual(args, {"file_name": self.tmpfn, "create": 1})
self.storage = StorageConfig.createStorage(storageconf)
self.assert_(isinstance(self.storage, FileStorage))
def testZEOStorage(self):
from ZEO.ClientStorage import ClientStorage
sample = """
<Storage>
type ClientStorage
addr zeo://www.python.org:9001
wait no
</Storage>
"""
rootconf = self.loadConfigText(sample)
storageconf = rootconf.getSection("Storage")
cls, args = StorageConfig.getStorageInfo(storageconf)
self.assertEqual(cls, ClientStorage)
self.assertEqual(args, {"addr": [("www.python.org", 9001)], "wait": 0})
self.storage = StorageConfig.createStorage(storageconf)
self.assert_(isinstance(self.storage, ClientStorage))
def testDemoStorage(self):
from ZODB.DemoStorage import DemoStorage
sample = """
<Storage>
type DemoStorage
</Storage>
"""
rootconf = self.loadConfigText(sample)
storageconf = rootconf.getSection("Storage")
cls, args = StorageConfig.getStorageInfo(storageconf)
self.assertEqual(cls, DemoStorage)
self.assertEqual(args, {})
self.storage = StorageConfig.createStorage(storageconf)
self.assert_(isinstance(self.storage, DemoStorage))
def testModuleStorage(self):
# Test explicit module+class
from ZODB.DemoStorage import DemoStorage
sample = """
<Storage>
type ZODB.DemoStorage.DemoStorage
</Storage>
"""
rootconf = self.loadConfigText(sample)
storageconf = rootconf.getSection("Storage")
cls, args = StorageConfig.getStorageInfo(storageconf)
self.assertEqual(cls, DemoStorage)
self.assertEqual(args, {})
self.storage = StorageConfig.createStorage(storageconf)
self.assert_(isinstance(self.storage, DemoStorage))
class BDBStorageTests(StorageTestBase):
def testFullStorage(self):
try:
from BDBStorage.BDBFullStorage import BDBFullStorage
except ImportError:
return
sample = """
<Storage>
type BDBFullStorage
name %s
cachesize 1000
</Storage>
""" % self.tmpfn
os.mkdir(self.tmpfn)
rootconf = self.loadConfigText(sample)
storageconf = rootconf.getSection("Storage")
cls, args = StorageConfig.getStorageInfo(storageconf)
self.assertEqual(cls, BDBFullStorage)
# It's too hard to test the config instance equality
args = args.copy()
del args['config']
self.assertEqual(args, {"name": self.tmpfn})
self.storage = StorageConfig.createStorage(storageconf)
self.assert_(isinstance(self.storage, BDBFullStorage))
# XXX _config isn't public
self.assert_(self.storage._config.cachesize, 1000)
def testMinimalStorage(self):
try:
from BDBStorage.BDBMinimalStorage import BDBMinimalStorage
except ImportError:
return
sample = """
<Storage>
type BDBMinimalStorage
name %s
cachesize 1000
</Storage>
""" % self.tmpfn
os.mkdir(self.tmpfn)
rootconf = self.loadConfigText(sample)
storageconf = rootconf.getSection("Storage")
cls, args = StorageConfig.getStorageInfo(storageconf)
self.assertEqual(cls, BDBMinimalStorage)
# It's too hard to test the config instance equality
args = args.copy()
del args['config']
self.assertEqual(args, {"name": self.tmpfn})
self.storage = StorageConfig.createStorage(storageconf)
self.assert_(isinstance(self.storage, BDBMinimalStorage))
# XXX _config isn't public
self.assert_(self.storage._config.cachesize, 1000)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(StorageTestCase))
import BDBStorage
if BDBStorage.is_available:
suite.addTest(unittest.makeSuite(BDBStorageTests))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment