Commit 2e9d01ea authored by Hanno Schlichting's avatar Hanno Schlichting

flake8 Products/Shared/ZTUtils.

parent dad37b5e
......@@ -36,10 +36,10 @@ class ViewPageTemplateFileTests(unittest.TestCase):
def test_pt_getEngine(self):
from zope.tales.expressions import DeferExpr
from zope.tales.expressions import LazyExpr
from zope.tales.expressions import NotExpr
from zope.tales.pythonexpr import PythonExpr
from zope.contentprovider.tales import TALESProviderExpression
from Products.PageTemplates.DeferExpr import LazyExpr
from Products.PageTemplates.Expressions import TrustedZopePathExpr
from Products.PageTemplates.Expressions import SecureModuleImporter
from Products.PageTemplates.Expressions import UnicodeAwareStringExpr
......
......@@ -11,6 +11,13 @@
#
##############################################################################
# BBB
from zope.tales.expressions import DeferWrapper, DeferExpr
from zope.tales.expressions import LazyWrapper, LazyExpr
from zope.deferredimport import deprecated
# BBB Zope 5.0
deprecated(
'Please import from zope.tales.expressions.',
DeferExpr='zope.tales.expressions:DeferExpr',
DeferWrapper='zope.tales.expressions:DeferWrapper',
LazyExpr='zope.tales.expressions:LazyExpr',
LazyWrapper='zope.tales.expressions:LazyWrapper',
)
......@@ -17,6 +17,7 @@ for Python expressions, string literals, and paths.
"""
import logging
import sys
from zope.component import queryUtility
from zope.i18n import translate
......@@ -46,6 +47,10 @@ from zope.contentprovider.tales import TALESProviderExpression
from Products.PageTemplates import ZRPythonExpr
from Products.PageTemplates.interfaces import IUnicodeEncodingConflictResolver
if sys.version_info >= (3, 0):
basestring = str
unicode = str
SecureModuleImporter = ZRPythonExpr._SecureModuleImporter()
LOG = logging.getLogger('Expressions')
......@@ -57,6 +62,7 @@ LOG = logging.getLogger('Expressions')
# Path Expression uses them
ZopeUndefs = Undefs + (NotFound, Unauthorized)
def boboAwareZopeTraverse(object, path_items, econtext):
"""Traverses a sequence of names, first trying attributes then items.
......@@ -77,6 +83,7 @@ def boboAwareZopeTraverse(object, path_items, econtext):
request=request)
return object
def trustedBoboAwareZopeTraverse(object, path_items, econtext):
"""Traverses a sequence of names, first trying attributes then items.
......@@ -97,6 +104,7 @@ def trustedBoboAwareZopeTraverse(object, path_items, econtext):
request=request)
return object
def render(ob, ns):
"""Calls the object, possibly a document template, or just returns
it if not callable. (From DT_Util.py)
......@@ -115,11 +123,12 @@ def render(ob, ns):
ob = ZRPythonExpr.call_with_ns(ob, ns, 2)
else:
ob = ob()
except AttributeError, n:
except AttributeError as n:
if str(n) != '__call__':
raise
return ob
class ZopePathExpr(PathExpr):
_TRAVERSER = staticmethod(boboAwareZopeTraverse)
......@@ -138,7 +147,7 @@ class ZopePathExpr(PathExpr):
# Try all but the last subexpression, skipping undefined ones.
try:
ob = expr(econtext)
except ZopeUndefs: # use Zope 2 expression types
except ZopeUndefs: # use Zope 2 expression types
pass
else:
break
......@@ -160,15 +169,17 @@ class ZopePathExpr(PathExpr):
for expr in self._subexprs:
try:
expr(econtext)
except ZopeUndefs: # use Zope 2 expression types
except ZopeUndefs: # use Zope 2 expression types
pass
else:
return 1
return 0
class TrustedZopePathExpr(ZopePathExpr):
_TRAVERSER = staticmethod(trustedBoboAwareZopeTraverse)
class SafeMapping(MultiMapping):
"""Mapping with security declarations and limited method exposure.
......@@ -183,6 +194,7 @@ class SafeMapping(MultiMapping):
_push = MultiMapping.push
_pop = MultiMapping.pop
class ZopeContext(Context):
def __init__(self, engine, contexts):
......@@ -248,17 +260,16 @@ class ZopeContext(Context):
try:
return resolver.resolve(self.contexts['context'], text, expr)
except UnicodeDecodeError,e:
LOG.error("""UnicodeDecodeError detected for expression "%s"\n"""
"""Resolver class: %s\n"""
"""Exception text: %s\n"""
"""Template: %s\n"""
"""Rendered text: %r""" % \
(expr, resolver.__class__, e,
self.contexts['template'].absolute_url(1), text))
raise
except UnicodeDecodeError as e:
LOG.error("UnicodeDecodeError detected for expression \"%s\"\n"
"Resolver class: %s\n"
"Exception text: %s\n"
"Template: %s\n"
"Rendered text: %r" %
(expr, resolver.__class__, e,
self.contexts['template'].absolute_url(1), text))
raise
else:
# This is a weird culprit ...calling unicode() on non-string
# objects
return unicode(text)
......@@ -276,15 +287,18 @@ class ZopeContext(Context):
"""
raise NotImplementedError
class ErrorInfo(BaseErrorInfo):
"""Information about an exception passed to an on-error handler.
"""
__allow_access_to_unprotected_subobjects__ = True
class ZopeEngine(Z3Engine):
_create_context = ZopeContext
class ZopeIterator(Iterator):
# allow iterator API to be accessed from (restricted) Python TALES
......@@ -338,6 +352,7 @@ class ZopeIterator(Iterator):
self._last_item = self.item
return super(ZopeIterator, self).next()
class PathIterator(ZopeIterator):
"""A TALES Iterator with the ability to use first() and last() on
subpaths of elements."""
......@@ -370,6 +385,7 @@ class PathIterator(ZopeIterator):
return False
return ob1 == ob2
class UnicodeAwareStringExpr(StringExpr):
def __call__(self, econtext):
......@@ -384,6 +400,7 @@ class UnicodeAwareStringExpr(StringExpr):
vvals.append(v)
return self._expr % tuple(vvals)
def createZopeEngine(zpe=ZopePathExpr):
e = ZopeEngine()
e.iteratorFactory = PathIterator
......@@ -398,6 +415,7 @@ def createZopeEngine(zpe=ZopePathExpr):
e.registerBaseName('modules', SecureModuleImporter)
return e
def createTrustedZopeEngine():
# same as createZopeEngine, but use non-restricted Python
# expression evaluator
......@@ -406,5 +424,7 @@ def createTrustedZopeEngine():
return e
_engine = createZopeEngine()
def getEngine():
return _engine
Page Template history
This file contains change information for previous versions of
PageTemplates. Change information for the current release can be found
in the file CHANGES.txt.
Version 1.4.1
Bugs Fixed
- Tracebacks were often truncated.
- __bobo_traverse__ objects, such as the root, triggered
security incorrectly when traversed.
- If a PageTemplate was owned by a missing user, or one with
insufficient permissions, the editing form broke.
- PageTemplateFiles didn't bind 'user'.
- There was no help.
Version 1.4.0
Features Added
- ZPTs are now cache-enabled
- Added property sheet to ZPT
Bugs Fixed
- Expressions with embedded newlines were broken
- History comparison tried to expand macros
- Iterator exceptions weren't converted
- 'Unauthorized' exception couldn't be handled by on-error
Version 1.3.3
Features Added
- Allow any false value in tal:repeat to act as an empty sequence.
- manage_addPageTemplate accepts optional title and text
arguments, and returns the new object if REQUEST is None.
Bugs Fixed
- The various *Python modules failed to import CompilerError.
- Security machinery changes in Zope 2.4 broke ZRPythonExpr
Version 1.3.2
Features Added
- Adopted Zope-style CHANGES.txt and HISTORY.txt
- Improved execution performance
- nocall: paths are back in.
Bugs Fixed
- TALES expressions let any string exception through, not just
Redirect and Unauthorized.
Version 1.3.1
Features Added
- Added error logging to PageTemplateFiles.
- Refactored PythonExpr, and added support for Zope 2.4
Version 1.3.0
Features Added
- New builtin variables 'default', 'user', and 'attrs'.
- Removed path modifiers.
- Added '|' operator for paths.
- Tweaked parameters passed when calling DTML.
- Expression types now have corresponding builtin functions in
Python expressions.
Version 1.2.1
Bug Fixed
- 'repeat' variable access was broken.
Version 1.2.0
Features Added
- Depends on the new ZTUtils package, which adds batching and
tree widget capabilities.
- Path expressions now have optional path modifiers. These
appear in parenthesis before the path, and include 'if',
'exists', and 'nocall'.
- Changed nocall: and exists: expressions types into path modifiers.
- The 'if' path modifier can cancel any TAL action.
Version 1.1.0
Features Added
- Changed tests to match TAL's omitted attributes.
Version 1.0.0
- Various minor bugs fixed
Version 1.0.0b1
- All functionality described in the Project Wiki is implemented
......@@ -54,10 +54,11 @@ class PageTemplate(ExtensionClass.Base,
def pt_macros(self):
self._cook_check()
if self._v_errors:
__traceback_supplement__ = (PageTemplateTracebackSupplement, self, {})
raise PTRuntimeError, (
__traceback_supplement__ = (
PageTemplateTracebackSupplement, self, {})
raise PTRuntimeError(
'Page Template %s has errors: %s' % (
self.id, self._v_errors
self.id, self._v_errors
))
return self._v_macros
......@@ -75,9 +76,9 @@ class PageTemplate(ExtensionClass.Base,
showtal = sourceAnnotations = False
if source:
showtal = True
return super(PageTemplate, self).pt_render(c, source=source, sourceAnnotations=sourceAnnotations,
showtal=showtal)
return super(PageTemplate, self).pt_render(
c, source=source, sourceAnnotations=sourceAnnotations,
showtal=showtal)
def pt_errors(self, namespace={}, check_macro_expansion=None):
# The check_macro_expansion argument is added for
......@@ -93,7 +94,7 @@ class PageTemplate(ExtensionClass.Base,
return ('Macro expansion failed', '%s: %s' % sys.exc_info()[:2])
def __call__(self, *args, **kwargs):
if not kwargs.has_key('args'):
if 'args' not in kwargs:
kwargs['args'] = args
return self.pt_render(extra_context={'options': kwargs})
......@@ -107,14 +108,14 @@ class PageTemplate(ExtensionClass.Base,
except:
return ('%s\n Macro expansion failed\n %s\n-->\n%s' %
(self._error_start, "%s: %s" % sys.exc_info()[:2],
self._text) )
self._text))
return ('%s\n %s\n-->\n%s' % (self._error_start,
'\n '.join(self._v_errors),
self._text))
# convenience method for the ZMI which allows to explicitly
# specify the HTMLness of a template. The old Zope 2
# specify the HTMLness of a template. The old Zope 2
# implementation had this as well, but arguably on the wrong class
# (this should be a ZopePageTemplate thing if at all)
def html(self):
......
......@@ -28,12 +28,15 @@ from Products.PageTemplates.PageTemplate import PageTemplate
from Shared.DC.Scripts.Script import Script
from Shared.DC.Scripts.Signature import FuncCode
from zope.contenttype import guess_content_type
from zope.pagetemplate.pagetemplatefile import sniff_type
from zope.pagetemplate.pagetemplatefile import (
sniff_type,
XML_PREFIX_MAX_LENGTH,
)
LOG = getLogger('PageTemplateFile')
def guess_type(filename, text):
def guess_type(filename, text):
# check for XML ourself since guess_content_type can't
# detect text/xml if 'filename' won't end with .xml
# XXX: fix this in zope.contenttype
......@@ -46,6 +49,7 @@ def guess_type(filename, text):
return content_type
return sniff_type(text) or 'text/html'
class PageTemplateFile(SimpleItem, Script, PageTemplate, Traversable):
"""Zope 2 implementation of a PageTemplate loaded from a file."""
......@@ -61,8 +65,8 @@ class PageTemplateFile(SimpleItem, Script, PageTemplate, Traversable):
_default_bindings = {'name_subpath': 'traverse_subpath'}
security = ClassSecurityInfo()
security.declareProtected('View management screens',
'read', 'document_src')
security.declareProtected(
'View management screens', 'read', 'document_src')
def __init__(self, filename, _prefix=None, **kw):
name = kw.pop('__name__', None)
......@@ -114,7 +118,7 @@ class PageTemplateFile(SimpleItem, Script, PageTemplate, Traversable):
request = aq_get(self, 'REQUEST', None)
if request is not None:
response = request.response
if not response.headers.has_key('content-type'):
if 'content-type' not in response.headers:
response.setHeader('content-type', self.content_type)
# Execute the template in a new security context.
......@@ -199,24 +203,7 @@ class PageTemplateFile(SimpleItem, Script, PageTemplate, Traversable):
def __getstate__(self):
from ZODB.POSException import StorageError
raise StorageError, ("Instance of AntiPersistent class %s "
"cannot be stored." % self.__class__.__name__)
raise StorageError("Instance of AntiPersistent class %s "
"cannot be stored." % self.__class__.__name__)
InitializeClass(PageTemplateFile)
XML_PREFIXES = [
"<?xml", # ascii, utf-8
"\xef\xbb\xbf<?xml", # utf-8 w/ byte order mark
"\0<\0?\0x\0m\0l", # utf-16 big endian
"<\0?\0x\0m\0l\0", # utf-16 little endian
"\xfe\xff\0<\0?\0x\0m\0l", # utf-16 big endian w/ byte order mark
"\xff\xfe<\0?\0x\0m\0l\0", # utf-16 little endian w/ byte order mark
]
XML_PREFIX_MAX_LENGTH = max(map(len, XML_PREFIXES))
def sniff_type(text):
for prefix in XML_PREFIXES:
if text.startswith(prefix):
return "text/xml"
return None
......@@ -15,16 +15,23 @@
Handler for Python expressions that uses the RestrictedPython package.
"""
import sys
from AccessControl import safe_builtins
from AccessControl.ZopeGuards import guarded_getattr, get_safe_globals
from DocumentTemplate.DT_Util import TemplateDict, InstanceDict
from DocumentTemplate.security import RestrictedDTML
from RestrictedPython import compile_restricted_eval
from zope.tales.tales import CompilerError
from zope.tales.pythonexpr import PythonExpr
if sys.version_info >= (3, 0):
unicode = str
class PythonExpr(PythonExpr):
_globals = get_safe_globals()
_globals['_getattr_'] = guarded_getattr
_globals['__debug__' ] = __debug__
_globals['__debug__'] = __debug__
def __init__(self, name, expr, engine):
self.text = self.expr = text = expr.strip().replace('\n', ' ')
......@@ -33,11 +40,13 @@ class PythonExpr(PythonExpr):
# We convert the expression to UTF-8 (ajung)
if isinstance(text, unicode):
text = text.encode('utf-8')
code, err, warn, use = compile_restricted_eval(text,
self.__class__.__name__)
code, err, warn, use = compile_restricted_eval(
text, self.__class__.__name__)
if err:
raise engine.getCompilerError()('Python expression error:\n%s' %
'\n'.join(err))
raise engine.getCompilerError()(
'Python expression error:\n%s' % '\n'.join(err))
self._varnames = use.keys()
self._code = code
......@@ -47,7 +56,8 @@ class PythonExpr(PythonExpr):
vars.update(self._globals)
return eval(self._code, vars, {})
class _SecureModuleImporter:
class _SecureModuleImporter(object):
__allow_access_to_unprotected_subobjects__ = True
def __getitem__(self, module):
......@@ -57,11 +67,11 @@ class _SecureModuleImporter:
mod = getattr(mod, name)
return mod
from DocumentTemplate.DT_Util import TemplateDict, InstanceDict
from DocumentTemplate.security import RestrictedDTML
class Rtd(RestrictedDTML, TemplateDict):
this = None
def call_with_ns(f, ns, arg=1):
td = Rtd()
# prefer 'context' to 'here'; fall back to 'None'
......@@ -74,7 +84,7 @@ def call_with_ns(f, ns, arg=1):
td._push(InstanceDict(td.this, td))
td._push(ns)
try:
if arg==2:
if arg == 2:
return f(None, td)
else:
return f(td)
......
......@@ -14,6 +14,7 @@
"""
import os
import sys
from AccessControl.class_init import InitializeClass
from AccessControl.Permissions import change_page_templates
......@@ -45,8 +46,11 @@ from Products.PageTemplates.utils import encodingFromXMLPreamble
from Products.PageTemplates.utils import charsetFromMetaEquiv
from Products.PageTemplates.utils import convertToUnicode
if sys.version_info >= (3, 0):
unicode = str
preferred_encodings = ['utf-8', 'iso-8859-15']
if os.environ.has_key('ZPT_PREFERRED_ENCODING'):
if 'ZPT_PREFERRED_ENCODING' in os.environ:
preferred_encodings.insert(0, os.environ['ZPT_PREFERRED_ENCODING'])
......@@ -68,6 +72,7 @@ class Src(Explicit):
InitializeClass(Src)
class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
Traversable, PropertyManager):
"Zope wrapper for Page Template using TAL, TALES, and METAL"
......@@ -83,19 +88,19 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
'www', 'default.html')
manage_options = (
{'label':'Edit', 'action':'pt_editForm'},
{'label':'Test', 'action':'ZScriptHTML_tryForm'},
) + PropertyManager.manage_options \
{'label': 'Edit', 'action': 'pt_editForm'},
{'label': 'Test', 'action': 'ZScriptHTML_tryForm'},
) + PropertyManager.manage_options \
+ Historical.manage_options \
+ SimpleItem.manage_options \
+ Cacheable.manage_options
_properties=({'id':'title', 'type': 'ustring', 'mode': 'w'},
{'id':'content_type', 'type':'string', 'mode': 'w'},
{'id':'output_encoding', 'type':'string', 'mode': 'w'},
{'id':'expand', 'type':'boolean', 'mode': 'w'},
)
_properties = (
{'id': 'title', 'type': 'ustring', 'mode': 'w'},
{'id': 'content_type', 'type': 'string', 'mode': 'w'},
{'id': 'output_encoding', 'type': 'string', 'mode': 'w'},
{'id': 'expand', 'type': 'boolean', 'mode': 'w'},
)
security = ClassSecurityInfo()
security.declareObjectProtected(view)
......@@ -120,7 +125,6 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
security.declareProtected(change_page_templates, 'pt_edit')
def pt_edit(self, text, content_type, keep_output_encoding=False):
text = text.strip()
is_unicode = isinstance(text, unicode)
......@@ -194,7 +198,7 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
text = unicode(text, 'utf-8')
self.pt_edit(text, content_type, True)
REQUEST.set('text', self.read()) # May not equal 'text'!
REQUEST.set('text', self.read()) # May not equal 'text'!
REQUEST.set('title', self.title)
message = "Saved changes."
if getattr(self, '_v_warnings', None):
......@@ -202,7 +206,6 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
% '<br>'.join(self._v_warnings))
return self.pt_editForm(manage_tabs_message=message)
security.declareProtected(change_page_templates, 'pt_setTitle')
def pt_setTitle(self, title, encoding='utf-8'):
if not isinstance(title, unicode):
......@@ -231,9 +234,6 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
text = file.read()
content_type = guess_type(filename, text)
# if not content_type in ('text/html', 'text/xml'):
# raise ValueError('Unsupported mimetype: %s' % content_type)
self.pt_edit(text, content_type)
return self.pt_editForm(manage_tabs_message='Saved changes')
......@@ -241,23 +241,24 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
def pt_changePrefs(self, REQUEST, height=None, width=None,
dtpref_cols="100%", dtpref_rows="20"):
"""Change editing preferences."""
dr = {"Taller":5, "Shorter":-5}.get(height, 0)
dc = {"Wider":5, "Narrower":-5}.get(width, 0)
if isinstance(height, int): dtpref_rows = height
dr = {"Taller": 5, "Shorter": -5}.get(height, 0)
dc = {"Wider": 5, "Narrower": -5}.get(width, 0)
if isinstance(height, int):
dtpref_rows = height
if isinstance(width, int) or \
isinstance(width, str) and width.endswith('%'):
dtpref_cols = width
rows = str(max(1, int(dtpref_rows) + dr))
cols = str(dtpref_cols)
if cols.endswith('%'):
cols = str(min(100, max(25, int(cols[:-1]) + dc))) + '%'
cols = str(min(100, max(25, int(cols[:-1]) + dc))) + '%'
else:
cols = str(max(35, int(cols) + dc))
cols = str(max(35, int(cols) + dc))
e = (DateTime("GMT") + 365).rfc822()
setCookie = REQUEST["RESPONSE"].setCookie
setCookie("dtpref_rows", rows, path='/', expires=e)
setCookie("dtpref_cols", cols, path='/', expires=e)
REQUEST.other.update({"dtpref_cols":cols, "dtpref_rows":rows})
REQUEST.other.update({"dtpref_cols": cols, "dtpref_rows": rows})
return self.pt_editForm()
def ZScriptHTML_tryParams(self):
......@@ -269,7 +270,7 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
return ZopePageTemplate.inheritedAttribute(
'manage_historyCompare')(
self, rev1, rev2, REQUEST,
historyComparisonResults=html_diff(rev1._text, rev2._text) )
historyComparisonResults=html_diff(rev1._text, rev2._text))
def pt_getContext(self, *args, **kw):
root = None
......@@ -309,7 +310,7 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
request = aq_get(self, 'REQUEST', None)
if request is not None:
response = request.response
if not response.headers.has_key('content-type'):
if 'content-type' not in response.headers:
response.setHeader('content-type', self.content_type)
security = getSecurityManager()
......@@ -338,9 +339,11 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
finally:
security.removeContext(self)
security.declareProtected(change_page_templates,
'manage_historyCopy',
'manage_beforeHistoryCopy', 'manage_afterHistoryCopy')
security.declareProtected(
change_page_templates,
'manage_historyCopy',
'manage_beforeHistoryCopy',
'manage_afterHistoryCopy')
security.declareProtected(change_page_templates, 'PUT')
def PUT(self, REQUEST, RESPONSE):
......@@ -357,7 +360,7 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
security.declareProtected(change_page_templates, 'manage_FTPput')
manage_FTPput = PUT
security.declareProtected(ftp_access, 'manage_FTPstat','manage_FTPlist')
security.declareProtected(ftp_access, 'manage_FTPstat', 'manage_FTPlist')
security.declareProtected(ftp_access, 'manage_FTPget')
def manage_FTPget(self):
"Get source for FTP download"
......@@ -406,33 +409,33 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
# here?
_text = state.get('_text')
if _text is not None and not isinstance(state['_text'], unicode):
text, encoding = convertToUnicode(state['_text'],
state.get('content_type', 'text/html'),
preferred_encodings)
text, encoding = convertToUnicode(
state['_text'],
state.get('content_type', 'text/html'),
preferred_encodings)
state['_text'] = text
state['output_encoding'] = encoding
self.__dict__.update(state)
def pt_render(self, source=False, extra_context={}):
result = PageTemplate.pt_render(self, source, extra_context)
assert isinstance(result, unicode)
return result
def wl_isLocked(self):
return 0
InitializeClass(ZopePageTemplate)
setattr(ZopePageTemplate, 'source.xml', ZopePageTemplate.source_dot_xml)
setattr(ZopePageTemplate, 'source.xml', ZopePageTemplate.source_dot_xml)
setattr(ZopePageTemplate, 'source.html', ZopePageTemplate.source_dot_xml)
# Product registration and Add support
manage_addPageTemplateForm = PageTemplateFile(
'www/ptAdd', globals(), __name__='manage_addPageTemplateForm')
def manage_addPageTemplate(self, id, title='', text='', encoding='utf-8',
submit=None, REQUEST=None, RESPONSE=None):
"Add a Page Template with optional file content."
......@@ -440,23 +443,21 @@ def manage_addPageTemplate(self, id, title='', text='', encoding='utf-8',
filename = ''
content_type = 'text/html'
if REQUEST and REQUEST.has_key('file'):
if REQUEST and 'file' in REQUEST:
file = REQUEST['file']
filename = file.filename
text = file.read()
headers = getattr(file, 'headers', None)
if headers and headers.has_key('content_type'):
if headers and 'content_type' in headers:
content_type = headers['content_type']
else:
content_type = guess_type(filename, text)
else:
if hasattr(text, 'read'):
filename = getattr(text, 'filename', '')
headers = getattr(text, 'headers', None)
text = text.read()
if headers and headers.has_key('content_type'):
if headers and 'content_type' in headers:
content_type = headers['content_type']
else:
content_type = guess_type(filename, text)
......@@ -487,4 +488,4 @@ def initialize(context):
permission='Add Page Templates',
constructors=(manage_addPageTemplateForm,
manage_addPageTemplate),
)
)
......@@ -22,12 +22,10 @@ misc_ = {}
# import ZTUtils in order to make i importable through
# ZopeGuards.load_module() where an importable modules must be
# available in sys.modules
import ZTUtils
import ZTUtils # NOQA
def initialize(context):
# Import lazily, and defer initialization to the module
import ZopePageTemplate
ZopePageTemplate.initialize(context)
......@@ -13,6 +13,7 @@
from zope.interface import Interface
class IUnicodeEncodingConflictResolver(Interface):
""" A utility that tries to convert a non-unicode string into
a Python unicode by implementing some policy in order
......@@ -21,10 +22,8 @@ class IUnicodeEncodingConflictResolver(Interface):
"""
def resolve(context, text, expression):
""" Returns 'text' as unicode string.
""" Returns 'text' as unicode string.
'context' is the current context object.
'expression' is the original expression (can be used for
'expression' is the original expression (can be used for
logging purposes)
"""
......@@ -16,8 +16,8 @@
import util
__allow_access_to_unprotected_subobjects__={'batch': 1}
__roles__=None
__allow_access_to_unprotected_subobjects__ = {'batch': 1}
__roles__ = None
class batch(util.Base):
......@@ -26,97 +26,112 @@ class batch(util.Base):
def __init__(self, sequence, size, start=0, end=0,
orphan=3, overlap=0):
start=start+1
start = start + 1
start,end,sz=opt(start,end,size,orphan,sequence)
start, end, sz = opt(start, end, size, orphan, sequence)
self._last=end-1
self._first=start-1
self._last = end - 1
self._first = start - 1
self._sequence=sequence
self._size=size
self._start=start
self._end=end
self._orphan=orphan
self._overlap=overlap
self._sequence = sequence
self._size = size
self._start = start
self._end = end
self._orphan = orphan
self._overlap = overlap
def previous_sequence(self): return self._first
def previous_sequence(self):
return self._first
def previous_sequence_end_number(self):
start,end,spam=opt(0, self._start-1+self._overlap,
self._size, self._orphan, self._sequence)
start, end, spam = opt(0, self._start - 1 + self._overlap,
self._size, self._orphan, self._sequence)
return end
def previous_sequence_start_number(self):
start,end,spam=opt(0, self._start-1+self._overlap,
self._size, self._orphan, self._sequence)
start, end, spam = opt(0, self._start - 1 + self._overlap,
self._size, self._orphan, self._sequence)
return start
def previous_sequence_end_item(self):
start,end,spam=opt(0, self._start-1+self._overlap,
self._size, self._orphan, self._sequence)
return self._sequence[end-1]
start, end, spam = opt(0, self._start - 1 + self._overlap,
self._size, self._orphan, self._sequence)
return self._sequence[end - 1]
def previous_sequence_start_item(self):
start,end,spam=opt(0, self._start-1+self._overlap,
self._size, self._orphan, self._sequence)
return self._sequence[start-1]
start, end, spam = opt(0, self._start - 1 + self._overlap,
self._size, self._orphan, self._sequence)
return self._sequence[start - 1]
def next_sequence_end_number(self):
start,end,spam=opt(self._end+1-self._overlap, 0,
self._size, self._orphan, self._sequence)
start, end, spam = opt(self._end + 1 - self._overlap, 0,
self._size, self._orphan, self._sequence)
return end
def next_sequence_start_number(self):
start,end,spam=opt(self._end+1-self._overlap, 0,
self._size, self._orphan, self._sequence)
start, end, spam = opt(self._end + 1 - self._overlap, 0,
self._size, self._orphan, self._sequence)
return start
def next_sequence_end_item(self):
start,end,spam=opt(self._end+1-self._overlap, 0,
self._size, self._orphan, self._sequence)
return self._sequence[end-1]
start, end, spam = opt(self._end + 1 - self._overlap, 0,
self._size, self._orphan, self._sequence)
return self._sequence[end - 1]
def next_sequence_start_item(self):
start,end,spam=opt(self._end+1-self._overlap, 0,
self._size, self._orphan, self._sequence)
return self._sequence[start-1]
start, end, spam = opt(self._end + 1 - self._overlap, 0,
self._size, self._orphan, self._sequence)
return self._sequence[start - 1]
def next_sequence(self):
try: self._sequence[self._end]
except IndexError: return 0
else: return 1
try:
self._sequence[self._end]
except IndexError:
return 0
else:
return 1
def __getitem__(self, index):
if index > self._last: raise IndexError, index
return self._sequence[index+self._first]
if index > self._last:
raise IndexError(index)
return self._sequence[index + self._first]
def opt(start,end,size,orphan,sequence):
def opt(start, end, size, orphan, sequence):
if size < 1:
if start > 0 and end > 0 and end >= start:
size=end+1-start
else: size=7
size = end + 1 - start
else:
size = 7
if start > 0:
try: sequence[start-1]
except: start=len(sequence)
try:
sequence[start - 1]
except Exception:
start = len(sequence)
if end > 0:
if end < start: end=start
if end < start:
end = start
else:
end=start+size-1
try: sequence[end+orphan-1]
except: end=len(sequence)
end = start + size - 1
try:
sequence[end + orphan - 1]
except Exception:
end = len(sequence)
elif end > 0:
try: sequence[end-1]
except: end=len(sequence)
start=end+1-size
if start - 1 < orphan: start=1
try:
sequence[end - 1]
except Exception:
end = len(sequence)
start = end + 1 - size
if start - 1 < orphan:
start = 1
else:
start=1
end=start+size-1
try: sequence[end+orphan-1]
except: end=len(sequence)
return start,end,size
start = 1
end = start + size - 1
try:
sequence[end + orphan - 1]
except Exception:
end = len(sequence)
return start, end, size
......@@ -19,41 +19,45 @@ from zope.traversing.adapters import DefaultTraversable
from Products.PageTemplates.tests import util
from Products.PageTemplates.PageTemplate import PageTemplate
from Products.PageTemplates.interfaces import IUnicodeEncodingConflictResolver
from Products.PageTemplates.unicodeconflictresolver import DefaultUnicodeEncodingConflictResolver
from Products.PageTemplates.unicodeconflictresolver import \
DefaultUnicodeEncodingConflictResolver
from Acquisition import Implicit
from AccessControl import SecurityManager
from AccessControl.SecurityManagement import noSecurityManager
class AqPageTemplate(Implicit, PageTemplate):
pass
class UnitTestSecurityPolicy:
class UnitTestSecurityPolicy(object):
"""
Stub out the existing security policy for unit testing purposes.
"""
#
# Standard SecurityPolicy interface
#
def validate( self
, accessed=None
, container=None
, name=None
, value=None
, context=None
, roles=None
, *args
, **kw):
def validate(self,
accessed=None,
container=None,
name=None,
value=None,
context=None,
roles=None,
*args, **kw):
return 1
def checkPermission( self, permission, object, context) :
def checkPermission(self, permission, object, context):
return 1
class DTMLTests(zope.component.testing.PlacelessSetup, unittest.TestCase):
def setUp(self):
super(DTMLTests, self).setUp()
zope.component.provideAdapter(DefaultTraversable, (None,))
provideUtility(DefaultUnicodeEncodingConflictResolver, IUnicodeEncodingConflictResolver)
provideUtility(DefaultUnicodeEncodingConflictResolver,
IUnicodeEncodingConflictResolver)
self.t = AqPageTemplate()
self.policy = UnitTestSecurityPolicy()
......@@ -65,7 +69,7 @@ class DTMLTests(zope.component.testing.PlacelessSetup, unittest.TestCase):
SecurityManager.setSecurityPolicy(self.oldPolicy)
noSecurityManager() # Reset to old policy.
def check1(self):
def test_1(self):
"""DTML test 1: if, in, and var:
%(comment)[ blah %(comment)]
......@@ -90,18 +94,18 @@ class DTMLTests(zope.component.testing.PlacelessSetup, unittest.TestCase):
tal = util.read_input('DTML1.html')
self.t.write(tal)
aa=util.argv(('one', 'two', 'three', 'cha', 'cha', 'cha'))
o=self.t.__of__(aa)()
aa = util.argv(('one', 'two', 'three', 'cha', 'cha', 'cha'))
o = self.t.__of__(aa)()
expect = util.read_output('DTML1a.html')
util.check_xml(expect, o)
aa=util.argv(())
o=self.t.__of__(aa)()
aa = util.argv(())
o = self.t.__of__(aa)()
expect = util.read_output('DTML1b.html')
util.check_xml(expect, o)
def check3(self):
def test_3(self):
"""DTML test 3: batches and formatting:
<html><head><title>Test of documentation templates</title></head>
......@@ -134,18 +138,19 @@ class DTMLTests(zope.component.testing.PlacelessSetup, unittest.TestCase):
tal = util.read_input('DTML3.html')
self.t.write(tal)
aa=util.argv(('one', 'two', 'three', 'four', 'five',
'six', 'seven', 'eight', 'nine', 'ten',
'eleven', 'twelve', 'thirteen', 'fourteen', 'fifteen',
'sixteen', 'seventeen', 'eighteen', 'nineteen', 'twenty',
))
aa = util.argv(('one', 'two', 'three', 'four', 'five',
'six', 'seven', 'eight', 'nine', 'ten',
'eleven', 'twelve', 'thirteen', 'fourteen',
'fifteen', 'sixteen', 'seventeen', 'eighteen',
'nineteen', 'twenty',
))
from Products.PageTemplates.tests import batch
o=self.t.__of__(aa)(batch=batch.batch(aa.args, 5))
o = self.t.__of__(aa)(batch=batch.batch(aa.args, 5))
expect = util.read_output('DTML3.html')
util.check_xml(expect, o)
def check_on_error_in_slot_filler(self):
def test_on_error_in_slot_filler(self):
# The `here` isn't defined, so the macro definition is
# expected to catch the error that gets raised.
text = '''\
......@@ -166,7 +171,7 @@ class DTMLTests(zope.component.testing.PlacelessSetup, unittest.TestCase):
aa = util.argv(('one', 'two', 'three', 'four', 'five'))
self.t.__of__(aa)()
def check_on_error_in_slot_default(self):
def test_on_error_in_slot_default(self):
# The `here` isn't defined, so the macro definition is
# expected to catch the error that gets raised.
text = '''\
......@@ -184,7 +189,3 @@ class DTMLTests(zope.component.testing.PlacelessSetup, unittest.TestCase):
self.t.write(text)
aa = util.argv(('one', 'two', 'three', 'four', 'five'))
self.t.__of__(aa)()
def test_suite():
return unittest.makeSuite(DTMLTests, 'check')
# *-* coding: iso-8859-1 -*-
import sys
import unittest
from zope.component.testing import PlacelessSetup
if sys.version_info >= (3, 0):
unicode = str
class EngineTestsBase(PlacelessSetup):
def setUp(self):
......@@ -23,17 +28,18 @@ class EngineTestsBase(PlacelessSetup):
class Dummy:
__allow_access_to_unprotected_subobjects__ = 1
management_page_charset = 'utf-8'
def __call__(self):
return 'dummy'
management_page_charset = 'utf-8'
class DummyDocumentTemplate:
__allow_access_to_unprotected_subobjects__ = 1
isDocTemp = True
def __call__(self, client=None, REQUEST={}, RESPONSE=None, **kw):
return 'dummy'
def absolute_url(self, relative=0):
url = 'dummy'
if not relative:
......@@ -41,25 +47,25 @@ class EngineTestsBase(PlacelessSetup):
return url
_DEFAULT_BINDINGS = dict(
one = 1,
d = {'one': 1, 'b': 'b', '': 'blank', '_': 'under'},
blank = '',
dummy = Dummy(),
dummy2 = DummyDocumentTemplate(),
eightbit = '',
one=1,
d={'one': 1, 'b': 'b', '': 'blank', '_': 'under'},
blank='',
dummy=Dummy(),
dummy2=DummyDocumentTemplate(),
eightbit='',
# ZopeContext needs 'context' and 'template' keys for unicode
# conflict resolution, and 'context' needs a
# conflict resolution, and 'context' needs a
# 'management_page_charset'
context = Dummy(),
template = DummyDocumentTemplate(),
)
context=Dummy(),
template=DummyDocumentTemplate(),
)
if bindings is None:
bindings = _DEFAULT_BINDINGS
return self._makeEngine().getContext(bindings)
def test_compile(self):
#Test expression compilation
# Test expression compilation
e = self._makeEngine()
for p in ('x', 'x/y', 'x/y/z'):
e.compile(p)
......@@ -185,9 +191,10 @@ class EngineTestsBase(PlacelessSetup):
from Products.PageTemplates.interfaces \
import IUnicodeEncodingConflictResolver
provideUtility(StrictUnicodeEncodingConflictResolver,
IUnicodeEncodingConflictResolver)
IUnicodeEncodingConflictResolver)
self.assertEqual(ec.evaluate(expr), u'')
class UntrustedEngineTests(EngineTestsBase, unittest.TestCase):
def _makeEngine(self):
......@@ -196,6 +203,7 @@ class UntrustedEngineTests(EngineTestsBase, unittest.TestCase):
# XXX: add tests that show security checks being enforced
class TrustedEngineTests(EngineTestsBase, unittest.TestCase):
def _makeEngine(self):
......@@ -204,6 +212,7 @@ class TrustedEngineTests(EngineTestsBase, unittest.TestCase):
# XXX: add tests that show security checks *not* being enforced
class UnicodeEncodingConflictResolverTests(PlacelessSetup, unittest.TestCase):
def testDefaultResolver(self):
......@@ -213,7 +222,7 @@ class UnicodeEncodingConflictResolverTests(PlacelessSetup, unittest.TestCase):
import IUnicodeEncodingConflictResolver
from Products.PageTemplates.unicodeconflictresolver \
import DefaultUnicodeEncodingConflictResolver
provideUtility(DefaultUnicodeEncodingConflictResolver,
provideUtility(DefaultUnicodeEncodingConflictResolver,
IUnicodeEncodingConflictResolver)
resolver = getUtility(IUnicodeEncodingConflictResolver)
self.assertRaises(UnicodeDecodeError,
......@@ -226,8 +235,8 @@ class UnicodeEncodingConflictResolverTests(PlacelessSetup, unittest.TestCase):
import IUnicodeEncodingConflictResolver
from Products.PageTemplates.unicodeconflictresolver \
import StrictUnicodeEncodingConflictResolver
provideUtility(StrictUnicodeEncodingConflictResolver,
IUnicodeEncodingConflictResolver)
provideUtility(StrictUnicodeEncodingConflictResolver,
IUnicodeEncodingConflictResolver)
resolver = getUtility(IUnicodeEncodingConflictResolver)
text = u'\xe4\xfc\xe4'
self.assertEqual(resolver.resolve(None, text, None), text)
......@@ -239,8 +248,8 @@ class UnicodeEncodingConflictResolverTests(PlacelessSetup, unittest.TestCase):
import IUnicodeEncodingConflictResolver
from Products.PageTemplates.unicodeconflictresolver \
import IgnoringUnicodeEncodingConflictResolver
provideUtility(IgnoringUnicodeEncodingConflictResolver,
IUnicodeEncodingConflictResolver)
provideUtility(IgnoringUnicodeEncodingConflictResolver,
IUnicodeEncodingConflictResolver)
resolver = getUtility(IUnicodeEncodingConflictResolver)
self.assertEqual(resolver.resolve(None, '', None), '')
......@@ -250,13 +259,14 @@ class UnicodeEncodingConflictResolverTests(PlacelessSetup, unittest.TestCase):
from Products.PageTemplates.interfaces \
import IUnicodeEncodingConflictResolver
from Products.PageTemplates.unicodeconflictresolver \
import ReplacingUnicodeEncodingConflictResolver
provideUtility(ReplacingUnicodeEncodingConflictResolver,
IUnicodeEncodingConflictResolver)
import ReplacingUnicodeEncodingConflictResolver
provideUtility(ReplacingUnicodeEncodingConflictResolver,
IUnicodeEncodingConflictResolver)
resolver = getUtility(IUnicodeEncodingConflictResolver)
self.assertEqual(resolver.resolve(None, '', None),
u'\ufffd\ufffd\ufffd')
class ZopeContextTests(unittest.TestCase):
def _getTargetClass(self):
......@@ -291,11 +301,3 @@ class ZopeContextTests(unittest.TestCase):
info = context.createErrorInfo(AttributeError('nonesuch'), (12, 3))
self.assertTrue(info.type is AttributeError)
self.assertEqual(info.__allow_access_to_unprotected_subobjects__, 1)
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(UntrustedEngineTests),
unittest.makeSuite(TrustedEngineTests),
unittest.makeSuite(UnicodeEncodingConflictResolverTests),
unittest.makeSuite(ZopeContextTests),
))
......@@ -11,65 +11,72 @@
#
##############################################################################
import sys
import unittest
from AccessControl import SecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from Acquisition import Implicit
import zope.component.testing
from zope.component import provideUtility
from zope.traversing.adapters import DefaultTraversable
from Products.PageTemplates.tests import util
from Products.PageTemplates.PageTemplate import PageTemplate
from Products.PageTemplates.interfaces import IUnicodeEncodingConflictResolver
from Products.PageTemplates.unicodeconflictresolver import DefaultUnicodeEncodingConflictResolver
from AccessControl import SecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from Products.PageTemplates.unicodeconflictresolver import \
DefaultUnicodeEncodingConflictResolver
if sys.version_info >= (3, 0):
unicode = str
from Acquisition import Implicit
class AqPageTemplate(Implicit, PageTemplate):
pass
class Folder(util.Base):
pass
class UnitTestSecurityPolicy:
class UnitTestSecurityPolicy(object):
"""
Stub out the existing security policy for unit testing purposes.
"""
#
# Standard SecurityPolicy interface
#
def validate( self
, accessed=None
, container=None
, name=None
, value=None
, context=None
, roles=None
, *args
, **kw):
# Standard SecurityPolicy interface
def validate(self,
accessed=None,
container=None,
name=None,
value=None,
context=None,
roles=None,
*args, **kw):
return 1
def checkPermission( self, permission, object, context) :
def checkPermission(self, permission, object, context):
return 1
class HTMLTests(zope.component.testing.PlacelessSetup, unittest.TestCase):
def setUp(self):
super(HTMLTests, self).setUp()
zope.component.provideAdapter(DefaultTraversable, (None,))
provideUtility(DefaultUnicodeEncodingConflictResolver, IUnicodeEncodingConflictResolver)
provideUtility(DefaultUnicodeEncodingConflictResolver,
IUnicodeEncodingConflictResolver)
self.folder = f = Folder()
f.laf = AqPageTemplate()
f.t = AqPageTemplate()
self.policy = UnitTestSecurityPolicy()
self.oldPolicy = SecurityManager.setSecurityPolicy( self.policy )
self.oldPolicy = SecurityManager.setSecurityPolicy(self.policy)
noSecurityManager() # Use the new policy.
def tearDown(self):
super(HTMLTests, self).tearDown()
SecurityManager.setSecurityPolicy( self.oldPolicy )
SecurityManager.setSecurityPolicy(self.oldPolicy)
noSecurityManager() # Reset to old policy.
def assert_expected(self, t, fname, *args, **kwargs):
......@@ -89,78 +96,75 @@ class HTMLTests(zope.component.testing.PlacelessSetup, unittest.TestCase):
def getProducts(self):
return [
{'description': 'This is the tee for those who LOVE Zope. '
'Show your heart on your tee.',
'price': 12.99, 'image': 'smlatee.jpg'
},
{'description': 'This is the tee for Jim Fulton. '
'He\'s the Zope Pope!',
'price': 11.99, 'image': 'smpztee.jpg'
},
]
def check1(self):
{'description': 'This is the tee for those who LOVE Zope. '
'Show your heart on your tee.',
'price': 12.99, 'image': 'smlatee.jpg'
},
{'description': 'This is the tee for Jim Fulton. '
'He\'s the Zope Pope!',
'price': 11.99, 'image': 'smpztee.jpg'
},
]
def test_1(self):
self.assert_expected(self.folder.laf, 'TeeShopLAF.html')
def check2(self):
def test_2(self):
self.folder.laf.write(util.read_input('TeeShopLAF.html'))
self.assert_expected(self.folder.t, 'TeeShop2.html',
getProducts=self.getProducts)
def check3(self):
def test_3(self):
self.folder.laf.write(util.read_input('TeeShopLAF.html'))
self.assert_expected(self.folder.t, 'TeeShop1.html',
getProducts=self.getProducts)
def checkSimpleLoop(self):
def testSimpleLoop(self):
self.assert_expected(self.folder.t, 'Loop1.html')
def checkFancyLoop(self):
def testFancyLoop(self):
self.assert_expected(self.folder.t, 'Loop2.html')
def checkGlobalsShadowLocals(self):
def testGlobalsShadowLocals(self):
self.assert_expected(self.folder.t, 'GlobalsShadowLocals.html')
def checkStringExpressions(self):
def testStringExpressions(self):
self.assert_expected(self.folder.t, 'StringExpression.html')
def checkReplaceWithNothing(self):
def testReplaceWithNothing(self):
self.assert_expected(self.folder.t, 'CheckNothing.html')
def checkWithXMLHeader(self):
def testWithXMLHeader(self):
self.assert_expected(self.folder.t, 'CheckWithXMLHeader.html')
def checkNotExpression(self):
def testNotExpression(self):
self.assert_expected(self.folder.t, 'CheckNotExpression.html')
def checkPathNothing(self):
def testPathNothing(self):
self.assert_expected(self.folder.t, 'CheckPathNothing.html')
def checkPathAlt(self):
def testPathAlt(self):
self.assert_expected(self.folder.t, 'CheckPathAlt.html')
def checkBatchIteration(self):
def testBatchIteration(self):
self.assert_expected(self.folder.t, 'CheckBatchIteration.html')
def checkUnicodeInserts(self):
def testUnicodeInserts(self):
self.assert_expected_unicode(self.folder.t, 'CheckUnicodeInserts.html')
def checkI18nTranslate(self):
def testI18nTranslate(self):
self.assert_expected(self.folder.t, 'CheckI18nTranslate.html')
def checkImportOldStyleClass(self):
def testImportOldStyleClass(self):
self.assert_expected(self.folder.t, 'CheckImportOldStyleClass.html')
def checkRepeatVariable(self):
def testRepeatVariable(self):
self.assert_expected(self.folder.t, 'RepeatVariable.html')
def checkBooleanAttributesAndDefault(self):
def testBooleanAttributesAndDefault(self):
# Zope 2.9 and below support the semantics that an HTML
# "boolean" attribute (e.g. 'selected', 'disabled', etc.) can
# be used together with 'default'.
self.assert_expected(self.folder.t, 'BooleanAttributesAndDefault.html')
def test_suite():
return unittest.makeSuite(HTMLTests, 'check')
......@@ -11,14 +11,14 @@ class MiscTests(unittest.TestCase):
context = ['context']
here = ['here']
request = {'request': 1}
names = {'context' : context, 'here': here, 'request' : request}
names = {'context': context, 'here': here, 'request': request}
result = call_with_ns(lambda td: td.this, names)
self.assertTrue(result is context, result)
def test_call_with_ns_no_context_or_here(self):
from Products.PageTemplates.ZRPythonExpr import call_with_ns
request = {'request': 1}
names = {'request' : request}
names = {'request': request}
result = call_with_ns(lambda td: td.this, names)
self.assertTrue(result is None, result)
......@@ -26,7 +26,7 @@ class MiscTests(unittest.TestCase):
from Products.PageTemplates.ZRPythonExpr import call_with_ns
context = ['context']
here = ['here']
names = {'context' : context, 'here': here}
names = {'context': context, 'here': here}
def _find_request(td):
ns = td._pop() # peel off 'ns'
......@@ -42,15 +42,14 @@ class MiscTests(unittest.TestCase):
def test_call_with_request_preserves_tainting(self):
from Products.PageTemplates.ZRPythonExpr import call_with_ns
class Request(dict):
def taintWrapper(self):
return {'tainted': 'found'}
context = ['context']
here = ['here']
names = {'context' : context, 'here': here, 'request' : Request()}
names = {'context': context, 'here': here, 'request': Request()}
found = call_with_ns(lambda td: td['tainted'], names)
self.assertEqual(found, 'found')
def test_suite():
return unittest.makeSuite(MiscTests)
......@@ -528,7 +528,7 @@ class DummyRequest(dict):
pass
class DummyFileUpload:
class DummyFileUpload(object):
def __init__(self, data='', filename='', content_type=''):
self.data = data
......
"""Tests of PageTemplateFile."""
import os, os.path
import os
import os.path
import tempfile
import unittest
import Zope2
......@@ -17,9 +18,12 @@ class TypeSniffingTestCase(unittest.TestCase):
def setUp(self):
from zope.component import provideUtility
from Products.PageTemplates.interfaces import IUnicodeEncodingConflictResolver
from Products.PageTemplates.unicodeconflictresolver import DefaultUnicodeEncodingConflictResolver
provideUtility(DefaultUnicodeEncodingConflictResolver, IUnicodeEncodingConflictResolver)
from Products.PageTemplates.interfaces import \
IUnicodeEncodingConflictResolver
from Products.PageTemplates.unicodeconflictresolver import \
DefaultUnicodeEncodingConflictResolver
provideUtility(DefaultUnicodeEncodingConflictResolver,
IUnicodeEncodingConflictResolver)
def tearDown(self):
if os.path.exists(self.TEMPFILENAME):
......@@ -131,29 +135,29 @@ class TypeSniffingTestCase(unittest.TestCase):
def test_getId(self):
desired_id = os.path.splitext(os.path.split(self.TEMPFILENAME)[-1])[0]
f = open(self.TEMPFILENAME, 'w')
print >> f, 'Boring'
f.write('Boring')
f.close()
pt = PageTemplateFile(self.TEMPFILENAME)
pt_id = pt.getId()
self.assertEqual(
pt_id, desired_id,
'getId() returned %r. Expecting %r' % (pt_id, desired_id)
)
pt_id, desired_id,
'getId() returned %r. Expecting %r' % (pt_id, desired_id)
)
def test_getPhysicalPath(self):
desired_id = os.path.splitext(os.path.split(self.TEMPFILENAME)[-1])[0]
desired_path = (desired_id,)
f = open(self.TEMPFILENAME, 'w')
print >> f, 'Boring'
f.write('Boring')
f.close()
pt = PageTemplateFile(self.TEMPFILENAME)
pt_path = pt.getPhysicalPath()
self.assertEqual(
pt_path, desired_path,
'getPhysicalPath() returned %r. Expecting %r' % (
desired_path, pt_path,
)
)
pt_path, desired_path,
'getPhysicalPath() returned %r. Expecting %r' % (
desired_path, pt_path,
)
)
class LineEndingsTestCase(unittest.TestCase):
......@@ -203,15 +207,7 @@ class LazyLoadingTestCase(unittest.TestCase):
def test_lazy(self):
f = open(self.TEMPFILENAME, 'w')
print >> f, 'Lazyness'
f.write('Lazyness')
f.close()
pt = PageTemplateFile(self.TEMPFILENAME)
self.assertTrue(not pt._text and not pt._v_program)
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(TypeSniffingTestCase),
unittest.makeSuite(LineEndingsTestCase),
unittest.makeSuite(LazyLoadingTestCase),
))
......@@ -14,89 +14,127 @@
######################################################################
# Utility facilities to aid setting things up.
import os, sys, string, re
import os
import re
import string
import sys
from ExtensionClass import Base
import Products.PageTemplates.tests
class Bruce(Base):
__allow_access_to_unprotected_subobjects__=1
def __str__(self): return 'bruce'
def __int__(self): return 42
def __float__(self): return 42.0
def keys(self): return ['bruce']*7
def values(self): return [self]*7
def items(self): return [('bruce',self)]*7
def __len__(self): return 7
def __getitem__(self,index):
if (type(index) is type(1) and
(index < 0 or index > 6)): raise IndexError, index
__allow_access_to_unprotected_subobjects__ = 1
isDocTemp = 0
def __str__(self):
return 'bruce'
def __int__(self):
return 42
def __float__(self):
return 42.0
def keys(self):
return ['bruce'] * 7
def values(self):
return [self] * 7
def items(self):
return [('bruce', self)] * 7
def __len__(self):
return 7
def __getitem__(self, index):
if (isinstance(index, int) and
(index < 0 or index > 6)):
raise IndexError(index)
return self
isDocTemp=0
def __getattr__(self,name):
if name[:1]=='_': raise AttributeError, name
def __getattr__(self, name):
if name[:1] == '_':
raise AttributeError(name)
return self
bruce=Bruce()
bruce = Bruce()
class arg(Base):
__allow_access_to_unprotected_subobjects__=1
def __init__(self,nn,aa): self.num, self.arg = nn, aa
def __str__(self): return str(self.arg)
__allow_access_to_unprotected_subobjects__ = 1
def __init__(self, nn, aa):
self.num, self.arg = nn, aa
def __str__(self):
return str(self.arg)
class argv(Base):
__allow_access_to_unprotected_subobjects__=1
__allow_access_to_unprotected_subobjects__ = 1
def __init__(self, argv=sys.argv[1:]):
args=self.args=[]
args = self.args = []
for aa in argv:
args.append(arg(len(args)+1,aa))
args.append(arg(len(args) + 1, aa))
def items(self):
return map(lambda a: ('spam%d' % a.num, a), self.args)
def values(self): return self.args
def values(self):
return self.args
def getPhysicalRoot(self):
return self
def nicerange(lo, hi):
if hi <= lo+1:
return str(lo+1)
if hi <= lo + 1:
return str(lo + 1)
else:
return "%d,%d" % (lo+1, hi)
return "%d,%d" % (lo + 1, hi)
def check_html(s1, s2):
s1 = normalize_html(s1)
s2 = normalize_html(s2)
if s1!=s2:
print
if s1 != s2:
from OFS.ndiff import SequenceMatcher, dump, IS_LINE_JUNK
a = string.split(s1, '\n')
b = string.split(s2, '\n')
def add_nl(s):
return s + '\n'
a = map(add_nl, a)
b = map(add_nl, b)
cruncher=SequenceMatcher(isjunk=IS_LINE_JUNK, a=a, b=b)
cruncher = SequenceMatcher(isjunk=IS_LINE_JUNK, a=a, b=b)
for tag, alo, ahi, blo, bhi in cruncher.get_opcodes():
if tag == 'equal':
continue
print nicerange(alo, ahi) + tag[0] + nicerange(blo, bhi)
print(nicerange(alo, ahi) + tag[0] + nicerange(blo, bhi))
dump('<', a, alo, ahi)
if a and b:
print '---'
print('---')
dump('>', b, blo, bhi)
assert s1==s2, "HTML Output Changed"
assert s1 == s2, "HTML Output Changed"
def check_xml(s1, s2):
s1 = normalize_xml(s1)
s2 = normalize_xml(s2)
assert s1==s2, "XML Output Changed"
assert s1 == s2, "XML Output Changed"
def normalize_html(s):
s = re.sub(r"[ \t]+", " ", s)
s = re.sub(r"/>", ">", s)
return s
def normalize_xml(s):
s = re.sub(r"\s+", " ", s)
s = re.sub(r"(?s)\s+<", "<", s)
......@@ -104,15 +142,16 @@ def normalize_xml(s):
return s
import Products.PageTemplates.tests
dir = os.path.dirname( Products.PageTemplates.tests.__file__)
dir = os.path.dirname(Products.PageTemplates.tests.__file__)
input_dir = os.path.join(dir, 'input')
output_dir = os.path.join(dir, 'output')
def read_input(filename):
filename = os.path.join(input_dir, filename)
return open(filename, 'r').read()
def read_output(filename):
filename = os.path.join(output_dir, filename)
return open(filename, 'r').read()
......@@ -20,11 +20,14 @@ from Products.PageTemplates.interfaces import IUnicodeEncodingConflictResolver
from zope.interface import implements
from zope.i18n.interfaces import IUserPreferredCharsets
if sys.version_info >= (3, 0):
unicode = str
default_encoding = sys.getdefaultencoding()
class DefaultUnicodeEncodingConflictResolver:
""" This resolver implements the old-style behavior and will
class DefaultUnicodeEncodingConflictResolver(object):
""" This resolver implements the old-style behavior and will
raise an exception in case of the string 'text' can't be converted
propertly to unicode.
"""
......@@ -34,12 +37,13 @@ class DefaultUnicodeEncodingConflictResolver:
def resolve(self, context, text, expression):
return unicode(text)
DefaultUnicodeEncodingConflictResolver = DefaultUnicodeEncodingConflictResolver()
DefaultUnicodeEncodingConflictResolver = \
DefaultUnicodeEncodingConflictResolver()
class Z2UnicodeEncodingConflictResolver:
""" This resolver tries to lookup the encoding from the
'management_page_charset' property and defaults to
class Z2UnicodeEncodingConflictResolver(object):
""" This resolver tries to lookup the encoding from the
'management_page_charset' property and defaults to
sys.getdefaultencoding().
"""
......@@ -53,7 +57,8 @@ class Z2UnicodeEncodingConflictResolver:
try:
return unicode(text)
except UnicodeDecodeError:
encoding = getattr(context, 'management_page_charset', default_encoding)
encoding = getattr(
context, 'management_page_charset', default_encoding)
try:
return unicode(text, encoding, self.mode)
except UnicodeDecodeError:
......@@ -61,7 +66,7 @@ class Z2UnicodeEncodingConflictResolver:
return unicode(text, 'iso-8859-15', self.mode)
class PreferredCharsetResolver:
class PreferredCharsetResolver(object):
""" A resolver that tries use the encoding information
from the HTTP_ACCEPT_CHARSET header.
"""
......@@ -78,7 +83,8 @@ class PreferredCharsetResolver:
if request is None:
charsets = [default_encoding]
management_charset = getattr(context, 'management_page_charset', None)
management_charset = getattr(
context, 'management_page_charset', None)
if management_charset:
charsets.insert(0, management_charset)
else:
......@@ -86,7 +92,7 @@ class PreferredCharsetResolver:
charsets = getattr(request, '__zpt_available_charsets', None)
# No uncached charsets found: investigate the HTTP_ACCEPT_CHARSET
# header. This code is only called if 'context' has a request
# header. This code is only called if 'context' has a request
# object. The condition is true because otherwise 'charsets' contains
# at least the default encoding of Python.
if charsets is None:
......@@ -98,14 +104,14 @@ class PreferredCharsetResolver:
# include the charsets based on the HTTP_ACCEPT_CHARSET
# header
charsets = IUserPreferredCharsets(request).getPreferredCharsets() +\
charsets
charsets = IUserPreferredCharsets(
request).getPreferredCharsets() + charsets
# cache list of charsets
request.__zpt_available_charsets = charsets
for enc in charsets:
if enc == '*':
if enc == '*':
continue
try:
......@@ -116,7 +122,10 @@ class PreferredCharsetResolver:
return text
StrictUnicodeEncodingConflictResolver = Z2UnicodeEncodingConflictResolver('strict')
ReplacingUnicodeEncodingConflictResolver = Z2UnicodeEncodingConflictResolver('replace')
IgnoringUnicodeEncodingConflictResolver = Z2UnicodeEncodingConflictResolver('ignore')
StrictUnicodeEncodingConflictResolver = \
Z2UnicodeEncodingConflictResolver('strict')
ReplacingUnicodeEncodingConflictResolver = \
Z2UnicodeEncodingConflictResolver('replace')
IgnoringUnicodeEncodingConflictResolver = \
Z2UnicodeEncodingConflictResolver('ignore')
PreferredCharsetResolver = PreferredCharsetResolver()
......@@ -13,11 +13,19 @@
"""Some helper methods
"""
import re
import re
import sys
if sys.version_info >= (3, 0):
unicode = str
xml_preamble_reg = re.compile(
r'^<\?xml.*?encoding="(.*?)".*?\?>', re.M)
http_equiv_reg = re.compile(
r'(<meta\s+[^>]*?http\-equiv[^>]*?content-type.*?>)', re.I | re.M | re.S)
http_equiv_reg2 = re.compile(
r'charset.*?=.*?(?P<charset>[\w\-]*)', re.I | re.M | re.S)
xml_preamble_reg = re.compile(r'^<\?xml.*?encoding="(.*?)".*?\?>', re.M)
http_equiv_reg = re.compile(r'(<meta\s+[^>]*?http\-equiv[^>]*?content-type.*?>)', re.I|re.M|re.S)
http_equiv_reg2 = re.compile(r'charset.*?=.*?(?P<charset>[\w\-]*)', re.I|re.M|re.S)
def encodingFromXMLPreamble(xml):
""" Extract the encoding from a xml preamble.
......@@ -32,10 +40,11 @@ def encodingFromXMLPreamble(xml):
return mo.group(1).lower()
def charsetFromMetaEquiv(html):
""" Return the value of the 'charset' from a html document
containing <meta http-equiv="content-type" content="text/html; charset=utf8>.
Returns None, if not available.
def charsetFromMetaEquiv(html):
"""
Return the value of the 'charset' from a html document containing
<meta http-equiv="content-type" content="text/html; charset=utf8>.
Returns None, if not available.
"""
# first check for the <meta...> tag
......@@ -47,7 +56,7 @@ def charsetFromMetaEquiv(html):
# search for the charset value
mo = http_equiv_reg2.search(meta)
if mo:
# return charset
# return charset
return mo.group(1).lower()
return None
......@@ -60,12 +69,12 @@ def convertToUnicode(source, content_type, preferred_encodings):
if content_type.startswith('text/xml'):
encoding = encodingFromXMLPreamble(source)
return unicode(source, encoding), encoding
return unicode(source, encoding), encoding
elif content_type.startswith('text/html'):
encoding = charsetFromMetaEquiv(source)
if encoding:
return unicode(source, encoding), encoding
return unicode(source, encoding), encoding
# Try to detect the encoding by converting it unicode without raising
# exceptions. There are some smarter Python-based sniffer methods
......
......@@ -17,11 +17,12 @@ from ZPublisher.BeforeTraverse import unregisterBeforeTraverse
from ZPublisher.BaseRequest import quote
from zExceptions import BadRequest
class VirtualHostMonster(Persistent, Item, Implicit):
"""Provide a simple drop-in solution for virtual hosting.
"""
meta_type='Virtual Host Monster'
meta_type = 'Virtual Host Monster'
priority = 25
id = 'VHM'
......@@ -31,8 +32,10 @@ class VirtualHostMonster(Persistent, Item, Implicit):
security = ClassSecurityInfo()
manage_options=({'label':'About', 'action':'manage_main'},
{'label':'Mappings', 'action':'manage_edit'})
manage_options = (
{'label': 'About', 'action': 'manage_main'},
{'label': 'Mappings', 'action': 'manage_edit'},
)
security.declareProtected(View, 'manage_main')
manage_main = DTMLFile('www/VirtualHostMonster', globals(),
......@@ -56,12 +59,11 @@ class VirtualHostMonster(Persistent, Item, Implicit):
# Drop the protocol, if any
line = line.split('://')[-1]
try:
host, path = [x.strip() for x in line.split('/', 1)]
host, path = [x.strip() for x in line.split('/', 1)]
except:
raise ValueError(
'Line needs a slash between host and path: %s'
% line )
pp = filter(None, path.split( '/'))
'Line needs a slash between host and path: %s' % line)
pp = filter(None, path.split('/'))
if pp:
obpath = pp[:]
if obpath[0] == 'VirtualHostBase':
......@@ -76,18 +78,18 @@ class VirtualHostMonster(Persistent, Item, Implicit):
try:
ob = self.unrestrictedTraverse(obpath)
except:
raise ValueError, (
'Path not found: %s' % obpath )
raise ValueError(
'Path not found: %s' % obpath)
if not getattr(ob.aq_base, 'isAnObjectManager', 0):
raise ValueError, (
'Path must lead to an Object Manager: %s'
% obpath)
raise ValueError(
'Path must lead to an Object Manager: %s' %
obpath)
if 'VirtualHostRoot' not in pp:
pp.append('/')
pp.reverse()
try:
int(host.replace('.',''))
raise ValueError, (
int(host.replace('.', ''))
raise ValueError(
'IP addresses are not mappable: %s' % host)
except ValueError:
pass
......@@ -96,15 +98,15 @@ class VirtualHostMonster(Persistent, Item, Implicit):
host = host[2:]
else:
host_map = fixed_map
hostname, port = (host.split( ':', 1) + [None])[:2]
hostname, port = (host.split(':', 1) + [None])[:2]
if hostname not in host_map:
host_map[hostname] = {}
host_map[hostname][port] = pp
except 'LineError', msg:
except 'LineError' as msg:
line = '%s #! %s' % (line, msg)
new_lines.append(line)
self.lines = tuple(new_lines)
self.have_map = not not (fixed_map or sub_map) # booleanize
self.have_map = bool(fixed_map or sub_map) # booleanize
if RESPONSE is not None:
RESPONSE.redirect(
'manage_edit?manage_tabs_message=Changes%20Saved.')
......@@ -115,9 +117,10 @@ class VirtualHostMonster(Persistent, Item, Implicit):
def manage_addToContainer(self, container, nextURL=''):
self.addToContainer(container)
if nextURL:
return MessageDialog(title='Item Added',
message='This object now has a %s' % self.meta_type,
action=nextURL)
return MessageDialog(
title='Item Added',
message='This object now has a %s' % self.meta_type,
action=nextURL)
def manage_beforeDelete(self, item, container):
if item is self:
......@@ -125,19 +128,18 @@ class VirtualHostMonster(Persistent, Item, Implicit):
def manage_afterAdd(self, item, container):
if item is self:
if queryBeforeTraverse(container,
self.meta_type):
raise BadRequest, ('This container already has a %s' %
self.meta_type)
if queryBeforeTraverse(container, self.meta_type):
raise BadRequest('This container already has a %s' %
self.meta_type)
id = self.id
if callable(id): id = id()
if callable(id):
id = id()
# We want the original object, not stuff in between
container = container.this()
hook = NameCaller(id)
registerBeforeTraverse(container, hook,
self.meta_type,
self.priority)
registerBeforeTraverse(
container, hook, self.meta_type, self.priority)
def __call__(self, client, request, response=None):
'''Traversing at home'''
......@@ -210,11 +212,11 @@ class VirtualHostMonster(Persistent, Item, Implicit):
request['ACTUAL_URL'] = request['VIRTUAL_URL'] + add
return
vh_used = 1 # Only retry once.
vh_used = 1 # Only retry once.
# Try to apply the host map if one exists, and if no
# VirtualHost directives were found.
host = request['SERVER_URL'].split('://')[1].lower()
hostname, port = (host.split( ':', 1) + [None])[:2]
hostname, port = (host.split(':', 1) + [None])[:2]
ports = self.fixed_map.get(hostname, 0)
if not ports and self.sub_map:
get = self.sub_map.get
......@@ -243,13 +245,13 @@ class VirtualHostMonster(Persistent, Item, Implicit):
if name[:1] != '/':
return getattr(self, name)
parents = request.PARENTS
parents.pop() # I don't belong there
parents.pop() # I don't belong there
if len(name) > 1:
request.setVirtualRoot(name)
else:
request.setVirtualRoot([])
return parents.pop() # He'll get put back on
return parents.pop() # He'll get put back on
InitializeClass(VirtualHostMonster)
......@@ -266,5 +268,5 @@ def manage_addVirtualHostMonster(self, id=None, REQUEST=None, **ignored):
REQUEST['RESPONSE'].redirect('%s?%s' % (goto, qs))
constructors = (
('manage_addVirtualHostMonster', manage_addVirtualHostMonster),
('manage_addVirtualHostMonster', manage_addVirtualHostMonster),
)
......@@ -3,6 +3,8 @@
def initialize(context):
import VirtualHostMonster
context.registerClass(instance_class=VirtualHostMonster.VirtualHostMonster,
permission='Add Virtual Host Monsters',
constructors=VirtualHostMonster.constructors)
context.registerClass(
instance_class=VirtualHostMonster.VirtualHostMonster,
permission='Add Virtual Host Monsters',
constructors=VirtualHostMonster.constructors,
)
......@@ -4,11 +4,10 @@ These tests mainly verify that OFS.Traversable.absolute_url()
works correctly in a VHM environment.
Also see http://zope.org/Collectors/Zope/809
Note: Tests require Zope >= 2.7
"""
import unittest
class VHMRegressions(unittest.TestCase):
def setUp(self):
......@@ -52,64 +51,64 @@ class VHMRegressions(unittest.TestCase):
self.assertEqual(m(), ('', 'folder', 'doc'))
def test_actual_url_no_VHR_no_doc_w_trailing_slash(self):
ob = self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/')
self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/')
self.assertEqual(self.app.REQUEST['ACTUAL_URL'],
'http://www.mysite.com/folder/')
'http://www.mysite.com/folder/')
def test_actual_url_no_VHR_no_doc_no_trailing_slash(self):
ob = self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder')
self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder')
self.assertEqual(self.app.REQUEST['ACTUAL_URL'],
'http://www.mysite.com/folder')
def test_actual_url_no_VHR_w_doc_w_trailing_slash(self):
ob = self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/doc/')
self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/doc/')
self.assertEqual(self.app.REQUEST['ACTUAL_URL'],
'http://www.mysite.com/folder/doc/')
'http://www.mysite.com/folder/doc/')
def test_actual_url_no_VHR_w_doc_no_trailing_slash(self):
ob = self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/doc')
self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/doc')
self.assertEqual(self.app.REQUEST['ACTUAL_URL'],
'http://www.mysite.com/folder/doc')
def test_actual_url_w_VHR_w_doc_w_trailing_slash(self):
ob = self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/VirtualHostRoot/doc/')
self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/VirtualHostRoot/doc/')
self.assertEqual(self.app.REQUEST['ACTUAL_URL'],
'http://www.mysite.com/doc/')
'http://www.mysite.com/doc/')
def test_actual_url_w_VHR_w_doc_no_trailing_slash(self):
ob = self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/VirtualHostRoot/doc')
self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/VirtualHostRoot/doc')
self.assertEqual(self.app.REQUEST['ACTUAL_URL'],
'http://www.mysite.com/doc')
def test_actual_url_w_VHR_no_doc_w_trailing_slash(self):
ob = self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/VirtualHostRoot/')
self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/VirtualHostRoot/')
self.assertEqual(self.app.REQUEST['ACTUAL_URL'],
'http://www.mysite.com/')
def test_actual_url_w_VHR_no_doc_no_trailing_slash(self):
ob = self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/VirtualHostRoot')
self.traverse('/VirtualHostBase/http/www.mysite.com:80'
'/folder/VirtualHostRoot')
self.assertEqual(self.app.REQUEST['ACTUAL_URL'],
'http://www.mysite.com/')
'http://www.mysite.com/')
def gen_cases():
for vbase, ubase in (
('', 'http://foo'),
('/VirtualHostBase/http/example.com:80', 'http://example.com'),
):
('', 'http://foo'),
('/VirtualHostBase/http/example.com:80', 'http://example.com')):
yield vbase, '', '', 'folder/doc', ubase
for vr, _vh, p in (
('folder', '', 'doc'),
('folder', 'foo', 'doc'),
('', 'foo', 'folder/doc'),
):
('folder', '', 'doc'),
('folder', 'foo', 'doc'),
('', 'foo', 'folder/doc')):
vparts = [vbase, vr, 'VirtualHostRoot']
if not vr:
del vparts[1]
......@@ -117,6 +116,7 @@ def gen_cases():
vparts.append('_vh_' + _vh)
yield '/'.join(vparts), vr, _vh, p, ubase
for i, (vaddr, vr, _vh, p, ubase) in enumerate(gen_cases()):
def test(self, vaddr=vaddr, vr=vr, _vh=_vh, p=p, ubase=ubase):
ob = self.traverse('%s/%s/' % (vaddr, p))
......@@ -160,10 +160,7 @@ class VHMAddingTests(unittest.TestCase):
vhm2 = self._makeOne()
self.assertRaises(BadRequest, vhm2.manage_addToContainer, self.root)
self.assertRaises( BadRequest
, manage_addVirtualHostMonster
, self.root
)
self.assertRaises(BadRequest, manage_addVirtualHostMonster, self.root)
def test_add_id_collision(self):
from OFS.Folder import Folder
......@@ -174,10 +171,7 @@ class VHMAddingTests(unittest.TestCase):
vhm1 = self._makeOne()
self.assertRaises(BadRequest, vhm1.manage_addToContainer, self.root)
self.assertRaises( BadRequest
, manage_addVirtualHostMonster
, self.root
)
self.assertRaises(BadRequest, manage_addVirtualHostMonster, self.root)
def test_add_addToContainer(self):
from ZPublisher.BeforeTraverse import queryBeforeTraverse
......@@ -197,9 +191,3 @@ class VHMAddingTests(unittest.TestCase):
self.assertTrue(VirtualHostMonster.id in self.root.objectIds())
hook = queryBeforeTraverse(self.root, VirtualHostMonster.meta_type)
self.assertTrue(hook)
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(VHMRegressions))
suite.addTest(unittest.makeSuite(VHMAddingTests))
return suite
......@@ -19,6 +19,7 @@ from AccessControl.SecurityInfo import ClassSecurityInfo
from AccessControl.SecurityManagement import getSecurityManager
from AccessControl.Permissions import view_management_screens
from AccessControl.PermissionRole import _what_not_even_god_should_do
from AccessControl.unauthorized import Unauthorized
from AccessControl.ZopeGuards import guarded_getattr
from Acquisition import aq_parent
from Acquisition import aq_inner
......@@ -29,19 +30,20 @@ defaultBindings = {'name_context': 'context',
'name_m_self': 'script',
'name_ns': '',
'name_subpath': 'traverse_subpath',
}
}
_marker = [] # Create a new marker
class NameAssignments:
# Note that instances of this class are intended to be immutable
# and persistent but not inherit from ExtensionClass.
_exprs = (('name_context', 'self._getContext()'),
_exprs = (('name_context', 'self._getContext()'),
('name_container', 'self._getContainer()'),
('name_m_self', 'self'),
('name_ns', 'self._getNamespace(caller_namespace, kw)'),
('name_subpath', 'self._getTraverseSubpath()'),
('name_m_self', 'self'),
('name_ns', 'self._getNamespace(caller_namespace, kw)'),
('name_subpath', 'self._getTraverseSubpath()'),
)
_isLegalName = re.compile(r'_$|[a-zA-Z][a-zA-Z0-9_]*$').match
......@@ -60,8 +62,8 @@ class NameAssignments:
if not assigned_name:
continue
if not _isLegalName(assigned_name):
raise ValueError, ('"%s" is not a valid variable name.'
% assigned_name)
raise ValueError('"%s" is not a valid variable name.' %
assigned_name)
asgns[name] = assigned_name
self._asgns = asgns
......@@ -76,7 +78,7 @@ class NameAssignments:
def getAssignedName(self, name, default=_marker):
val = self._asgns.get(name, default)
if val is _marker:
raise KeyError, name
raise KeyError(name)
return val
def getAssignedNames(self):
......@@ -101,7 +103,7 @@ class NameAssignments:
text = ['bound_data.append(%s)\n' % bindtext]
for assigned_name in assigned_names:
text.append('if kw.has_key("%s"):\n' % assigned_name)
text.append(' del kw["%s"]\n' % assigned_name)
text.append(' del kw["%s"]\n' % assigned_name)
codetext = string.join(text, '')
return (compile(codetext, '<string>', 'exec'), len(assigned_names))
......@@ -148,8 +150,6 @@ class NameAssignments:
return self._generateCodeBlock(text, assigned_names)
from AccessControl.unauthorized import Unauthorized
class UnauthorizedBinding:
"""Explanation: as of Zope 2.6.3 a security hole was closed - no
security check was happening when 'context' and 'container'
......@@ -191,7 +191,6 @@ class UnauthorizedBinding:
self.__you_lose()
return guarded_getattr(self._wrapped, name, default)
#return getattr(self._wrapped, name, default)
def __you_lose(self):
name = self.__dict__['_name']
......@@ -199,6 +198,7 @@ class UnauthorizedBinding:
__str__ = __call__ = index_html = __you_lose
class Bindings:
security = ClassSecurityInfo()
......@@ -237,7 +237,7 @@ class Bindings:
path = request['TraversalRequestNameStack']
names = self.getBindingAssignments()
if (not names.isNameAssigned('name_subpath') or
(path and hasattr(self.aq_base, path[-1])) ):
(path and hasattr(self.aq_base, path[-1]))):
return
subpath = path[:]
path[:] = []
......@@ -278,7 +278,8 @@ class Bindings:
parent = aq_parent(self)
inner = aq_inner(self)
container = aq_parent(inner)
try: getSecurityManager().validate(parent, container, '', self)
try:
getSecurityManager().validate(parent, container, '', self)
except Unauthorized:
return UnauthorizedBinding('context', self)
return self
......@@ -291,7 +292,8 @@ class Bindings:
parent = aq_parent(self)
inner = aq_inner(self)
container = aq_parent(inner)
try: getSecurityManager().validate(parent, container, '', self)
try:
getSecurityManager().validate(parent, container, '', self)
except Unauthorized:
return UnauthorizedBinding('container', self)
return self
......
......@@ -17,14 +17,14 @@ from AccessControl.SecurityInfo import ClassSecurityInfo
from App.special_dtml import DTMLFile
from Shared.DC.Scripts.Bindings import Bindings
class BindingsUI(Bindings):
security = ClassSecurityInfo()
manage_options = (
{'label':'Bindings',
'action':'ZBindingsHTML_editForm'},
)
{'label': 'Bindings', 'action': 'ZBindingsHTML_editForm'},
)
security.declareProtected(view_management_screens,
'ZBindingsHTML_editForm')
......
......@@ -27,9 +27,10 @@ from OFS.SimpleItem import SimpleItem
from zExceptions import Redirect
from Shared.DC.Scripts.BindingsUI import BindingsUI
from Shared.DC.Scripts.Bindings import defaultBindings
# Temporary:
from Shared.DC.Scripts.Signature import FuncCode
from Shared.DC.Scripts.Bindings import defaultBindings # NOQA
from Shared.DC.Scripts.Signature import FuncCode # NOQA
class Script(SimpleItem, BindingsUI):
"""Web-callable script mixin
......@@ -53,7 +54,7 @@ class Script(SimpleItem, BindingsUI):
for argvar in argvars:
if argvar.value:
vv.append("%s=%s" % (quote(argvar.name), quote(argvar.value)))
raise Redirect, "%s?%s" % (REQUEST['URL1'], join(vv, '&'))
raise Redirect("%s?%s" % (REQUEST['URL1'], join(vv, '&')))
from Signature import _setFuncSignature
......
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
......@@ -29,7 +29,7 @@ def escape(s, encoding='repr'):
s = base64.encodestring(s)[:-1]
encoding = 'base64'
elif '>' in s or '<' in s or '&' in s:
if not ']]>' in s:
if ']]>' not in s:
s = '<![CDATA[' + s + ']]>'
encoding = 'cdata'
else:
......@@ -38,6 +38,7 @@ def escape(s, encoding='repr'):
s = s.replace('<', '&lt;')
return encoding, s
def unescape(s, encoding):
if encoding == 'base64':
return base64.decodestring(s)
......@@ -46,6 +47,7 @@ def unescape(s, encoding):
s = s.replace('&gt;', '>')
return s.replace('&amp;', '&')
class Global:
def __init__(self, module, name):
self.module = module
......@@ -60,6 +62,7 @@ class Global:
return '%s<%s%s name="%s" module="%s"/>\n' % (
' ' * indent, name, id, self.name, self.module)
class Scalar:
def __init__(self, v):
self._v = v
......@@ -76,6 +79,7 @@ class Scalar:
return '%s<%s%s>%s</%s>\n' % (
' ' * indent, name, id, self.value(), name)
class Long(Scalar):
def value(self):
result = str(self._v)
......@@ -83,6 +87,7 @@ class Long(Scalar):
return result[:-1]
return result
class String(Scalar):
def __init__(self, v, encoding=''):
encoding, v = escape(v, encoding)
......@@ -90,7 +95,7 @@ class String(Scalar):
self._v = v
def __str__(self, indent=0):
if hasattr(self,'id'):
if hasattr(self, 'id'):
id = ' id="%s"' % self.id
else:
id = ''
......@@ -102,6 +107,7 @@ class String(Scalar):
return '%s<%s%s%s>%s</%s>\n' % (
' ' * indent, name, id, encoding, self.value(), name)
class Unicode(String):
def __init__(self, v, encoding):
v = unicode(v, encoding)
......@@ -110,6 +116,7 @@ class Unicode(String):
def value(self):
return self._v.encode('utf-8')
class Wrapper:
def __init__(self, v):
self._v = v
......@@ -134,6 +141,7 @@ class Wrapper:
v = v.__str__()
return '%s<%s%s>\n%s%s</%s>\n' % (i, name, id, v, i, name)
class Collection:
def __str__(self, indent=0):
if hasattr(self, 'id'):
......@@ -148,6 +156,7 @@ class Collection:
else:
return '%s<%s%s/>\n' % (i, name, id)
class Dictionary(Collection):
def __init__(self):
self._d = []
......@@ -173,6 +182,7 @@ class Dictionary(Collection):
self._d
))
class Sequence(Collection):
def __init__(self, v=None):
if not v:
......@@ -199,11 +209,13 @@ class Sequence(Collection):
lambda v, indent=indent: self._stringify(v, indent),
self._subs))
class none:
def __str__(self, indent=0):
return ' ' * indent + '<none/>\n'
none = none()
class Reference(Scalar):
def __init__(self, v):
self._v = v
......@@ -215,6 +227,7 @@ class Reference(Scalar):
Get = Reference
class Object(Sequence):
def __init__(self, klass, args):
self._subs = [Klass(klass), args]
......@@ -222,16 +235,45 @@ class Object(Sequence):
def __setstate__(self, v):
self.append(State(v))
class Int(Scalar): pass
class Float(Scalar): pass
class List(Sequence): pass
class Tuple(Sequence): pass
class Key(Wrapper): pass
class Value(Wrapper): pass
class Klass(Wrapper): pass
class State(Wrapper): pass
class Pickle(Wrapper): pass
class Persistent(Wrapper): pass
class Int(Scalar):
pass
class Float(Scalar):
pass
class List(Sequence):
pass
class Tuple(Sequence):
pass
class Key(Wrapper):
pass
class Value(Wrapper):
pass
class Klass(Wrapper):
pass
class State(Wrapper):
pass
class Pickle(Wrapper):
pass
class Persistent(Wrapper):
pass
class ToXMLUnpickler(Unpickler):
......@@ -286,11 +328,11 @@ class ToXMLUnpickler(Unpickler):
for q in "\"'":
if rep.startswith(q):
if not rep.endswith(q):
raise ValueError, 'insecure string pickle'
raise ValueError('insecure string pickle')
rep = rep[len(q):-len(q)]
break
else:
raise ValueError, 'insecure string pickle'
raise ValueError('insecure string pickle')
self.append(String(rep.decode('string-escape')))
dispatch[STRING] = load_string
......@@ -300,7 +342,7 @@ class ToXMLUnpickler(Unpickler):
dispatch[BINSTRING] = load_binstring
def load_unicode(self):
self.append(Unicode(self.readline()[:-1],'raw-unicode-escape'))
self.append(Unicode(self.readline()[:-1], 'raw-unicode-escape'))
dispatch[UNICODE] = load_unicode
def load_binunicode(self):
......@@ -422,18 +464,22 @@ class ToXMLUnpickler(Unpickler):
def ToXMLload(file):
return ToXMLUnpickler(file).load()
def ToXMLloads(str):
from StringIO import StringIO
file = StringIO(str)
return ToXMLUnpickler(file).load()
def name(self, tag, data):
return ''.join(data[2:]).strip()
def start_pickle(self, tag, attrs):
self._pickleids = {}
return [tag, attrs]
def save_int(self, tag, data):
if self.binary:
v = int(name(self, tag, data))
......@@ -447,12 +493,14 @@ def save_int(self, tag, data):
return BININT + struct.pack('<i', v)
return INT + name(self, tag, data) + '\n'
def save_float(self, tag, data):
if self.binary:
return BINFLOAT + struct.pack('>d', float(name(self, tag, data)))
else:
return FLOAT + name(self, tag, data) + '\n'
def save_put(self, v, attrs):
id = attrs.get('id', '')
if id:
......@@ -472,6 +520,7 @@ def save_put(self, v, attrs):
return v + id
return v
def save_string(self, tag, data):
a = data[1]
v = ''.join(data[2:])
......@@ -488,6 +537,7 @@ def save_string(self, tag, data):
v = STRING + repr(v) + '\n'
return save_put(self, v, a)
def save_unicode(self, tag, data):
a = data[1]
v = ''.join(data[2:])
......@@ -504,12 +554,14 @@ def save_unicode(self, tag, data):
v = UNICODE + v + '\n'
return save_put(self, v, a)
def save_tuple(self, tag, data):
T = data[2:]
if not T:
return EMPTY_TUPLE
return save_put(self, MARK + ''.join(T) + TUPLE, data[1])
def save_list(self, tag, data):
L = data[2:]
if self.binary:
......@@ -522,6 +574,7 @@ def save_list(self, tag, data):
v = APPEND.join(L) + APPEND
return v
def save_dict(self, tag, data):
D = data[2:]
if self.binary:
......@@ -534,6 +587,7 @@ def save_dict(self, tag, data):
v = v + SETITEM.join(D) + SETITEM
return v
def save_reference(self, tag, data):
a = data[1]
id = a['id']
......@@ -549,6 +603,7 @@ def save_reference(self, tag, data):
else:
return GET + repr(id) + '\n'
def save_object(self, tag, data):
v = MARK + data[2]
x = data[3][1:]
......@@ -559,10 +614,12 @@ def save_object(self, tag, data):
v = v + data[4] + BUILD # state
return v
def save_global(self, tag, data):
a = data[1]
return save_put(self, GLOBAL + a['module'] + '\n' + a['name'] + '\n', a)
def save_persis(self, tag, data):
v = data[2]
if self.binary:
......@@ -570,6 +627,7 @@ def save_persis(self, tag, data):
else:
return PERSID + v
class xmlPickler(NoBlanks, xyap):
start_handlers = {
'pickle': lambda self, tag, attrs: [tag, attrs],
......@@ -587,11 +645,11 @@ class xmlPickler(NoBlanks, xyap):
'dictionary': save_dict,
'item': lambda self, tag, data: ''.join(map(str, data[2:])),
'value': lambda self, tag, data: data[2],
'key' : lambda self, tag, data: data[2],
'key': lambda self, tag, data: data[2],
'object': save_object,
'klass': lambda self, tag, data: data[2],
'state': lambda self, tag, data: data[2],
'global': save_global,
'persistent': save_persis,
'unicode': save_unicode,
}
}
......@@ -56,6 +56,7 @@ class xyap:
top = end[tag](self, tag, top)
append(top)
class NoBlanks:
def handle_data(self, data):
......@@ -71,12 +72,15 @@ def struct(self, tag, data):
_nulljoin = "".join
def name(self, tag, data):
return _nulljoin(data[2:]).strip()
def tuplef(self, tag, data):
return tuple(data[2:])
class XYap(xyap):
def __init__(self):
self._parser = xml.parsers.expat.ParserCreate()
......@@ -85,6 +89,7 @@ class XYap(xyap):
self._parser.CharacterDataHandler = self.handle_data
xyap.__init__(self)
class xmlrpc(NoBlanks, XYap):
end_handlers = {
'methodCall': tuplef,
......@@ -114,4 +119,4 @@ class xmlrpc(NoBlanks, XYap):
'name': name,
'array': lambda self, tag, data: data[2],
'data': lambda self, tag, data: data[2:],
}
}
......@@ -15,25 +15,31 @@
from ExtensionClass import Base
class LazyPrevBatch(Base):
def __of__(self, parent):
return Batch(parent._sequence, parent._size,
parent.first - parent._size + parent.overlap, 0,
parent.orphan, parent.overlap)
class LazyNextBatch(Base):
def __of__(self, parent):
try: parent._sequence[parent.end]
except IndexError: return None
try:
parent._sequence[parent.end]
except IndexError:
return None
return Batch(parent._sequence, parent._size,
parent.end - parent.overlap, 0,
parent.orphan, parent.overlap)
class LazySequenceLength(Base):
def __of__(self, parent):
parent.sequence_length = l = len(parent._sequence)
return l
class Batch(Base):
"""Create a sequence batch"""
__allow_access_to_unprotected_subobjects__ = 1
......@@ -63,7 +69,7 @@ class Batch(Base):
start = start + 1
start,end,sz = opt(start,end,size,orphan,sequence)
start, end, sz = opt(start, end, size, orphan, sequence)
self._sequence = sequence
self.size = sz
......@@ -77,43 +83,56 @@ class Batch(Base):
if self.first == 0:
self.previous = None
def __getitem__(self, index):
if index < 0:
if index + self.end < self.first: raise IndexError, index
if index + self.end < self.first:
raise IndexError(index)
return self._sequence[index + self.end]
if index >= self.length: raise IndexError, index
return self._sequence[index+self.first]
if index >= self.length:
raise IndexError(index)
return self._sequence[index + self.first]
def __len__(self):
return self.length
def opt(start,end,size,orphan,sequence):
def opt(start, end, size, orphan, sequence):
if size < 1:
if start > 0 and end > 0 and end >= start:
size=end+1-start
else: size=7
size = end + 1 - start
else:
size = 7
if start > 0:
try: sequence[start-1]
except IndexError: start=len(sequence)
try:
sequence[start - 1]
except IndexError:
start = len(sequence)
if end > 0:
if end < start: end=start
if end < start:
end = start
else:
end=start+size-1
try: sequence[end+orphan]
except IndexError: end=len(sequence)
end = start + size - 1
try:
sequence[end + orphan]
except IndexError:
end = len(sequence)
elif end > 0:
try: sequence[end-1]
except IndexError: end=len(sequence)
start=end+1-size
if start - 1 < orphan: start=1
try:
sequence[end - 1]
except IndexError:
end = len(sequence)
start = end + 1 - size
if start - 1 < orphan:
start = 1
else:
start=1
end=start+size-1
try: sequence[end+orphan-1]
except IndexError: end=len(sequence)
return start,end,size
start = 1
end = start + size - 1
try:
sequence[end + orphan - 1]
except IndexError:
end = len(sequence)
return start, end, size
......@@ -31,7 +31,7 @@ class SimpleTreeNode(TreeNode):
obid = self.id
pre = self.aq_acquire('tree_pre')
return {'link': '?%s-setstate=%s,%s,%s#%s' % \
return {'link': '?%s-setstate=%s,%s,%s#%s' %
(pre, setst[0], exnum, obid, obid),
'img': ''}
......
......@@ -13,27 +13,36 @@
"""Tree manipulation classes
"""
import base64
from string import translate, maketrans
import zlib
from Acquisition import Explicit
from ComputedAttribute import ComputedAttribute
from types import ListType, TupleType
class TreeNode(Explicit):
__allow_access_to_unprotected_subobjects__ = 1
state = 0 # leaf
state = 0 # leaf
height = 1
size = 1
def __init__(self):
self._child_list = []
def _add_child(self, child):
'Add a child which already has all of its children.'
self._child_list.append(child)
self.height = max(self.height, child.height + 1)
self.size = self.size + child.size
def flat(self):
'Return a flattened preorder list of tree nodes'
items = []
self.walk(items.append)
return items
def walk(self, f, data=None):
'Preorder walk this tree, passing each node to a function'
if data is None:
......@@ -42,18 +51,23 @@ class TreeNode(Explicit):
f(self, data)
for child in self._child_list:
child.__of__(self).walk(f, data)
def _depth(self):
return self.aq_parent.depth + 1
depth = ComputedAttribute(_depth, 1)
def __getitem__(self, index):
return self._child_list[index].__of__(self)
def __len__(self):
return len(self._child_list)
_marker = []
class TreeMaker:
'''Class for mapping a hierachy of objects into a tree of nodes.'''
'''Class for mapping a hierarchy of objects into a tree of nodes.'''
__allow_access_to_unprotected_subobjects__ = 1
......@@ -87,37 +101,35 @@ class TreeMaker:
def setIdAttr(self, id):
"""Set the attribute or method name called to get a unique Id.
The id attribute or method is used to get a unique id for every node in
the tree, so that the state of the tree can be encoded as a string using
Tree.encodeExpansion(). The returned id should be unique and stable
across Zope requests.
If the attribute or method isn't found on an object, either the objects
persistence Id or the result of id() on the object is used instead.
The id attribute or method is used to get a unique id for every
node in the tree, so that the state of the tree can be encoded
as a string using Tree.encodeExpansion(). The returned id should
be unique and stable across Zope requests.
If the attribute or method isn't found on an object, either
the objects persistence Id or the result of id() on the object
is used instead.
"""
self._id = id
def setExpandRoot(self, expand):
"""Set wether or not to expand the root node by default.
When no expanded flag or mapping is passed to .tree(), assume the root
node is expanded, and leave all subnodes closed.
The default is to expand the root node.
"""
self._expand_root = expand and True or False
def setAssumeChildren(self, assume):
"""Set wether or not to assume nodes have children.
When a node is not expanded, when assume children is set, don't
determine if it is a leaf node, but assume it can be opened. Use this
when determining the children for a node is expensive.
The default is to not assume there are children.
"""
self._assume_children = assume and True or False
......@@ -134,7 +146,6 @@ class TreeMaker:
0: Leaf node, cannot be opened or closed, no children are
processed.
1: Node opened. Children will be processed as part of the tree.
"""
self._state_function = function
......@@ -177,9 +188,11 @@ class TreeMaker:
id_attr = self._id
if hasattr(object, id_attr):
obid = getattr(object, id_attr)
if not simple_type(obid): obid = obid()
if not simple_type(obid):
obid = obid()
return obid
if hasattr(object, '_p_oid'): return str(object._p_oid)
if hasattr(object, '_p_oid'):
return str(object._p_oid)
return id(object)
def hasChildren(self, object):
......@@ -196,7 +209,7 @@ class TreeMaker:
self._cached_children = None
if ob is object:
return children
if self._values_function is not None:
return self._values_function(object)
......@@ -213,18 +226,17 @@ class TreeMaker:
return self._values_filter(children)
return children
def simple_type(ob,
is_simple={type(''):1, type(0):1, type(0.0):1,
type(0L):1, type(None):1 }.has_key):
is_simple={type(''): 1, type(0): 1, type(0.0): 1,
type(None): 1}.has_key):
return is_simple(type(ob))
import base64
from string import translate, maketrans
import zlib
a2u_map = maketrans('+/=', '-._')
u2a_map = maketrans('-._', '+/=')
def b2a(s):
'''Encode a value as a cookie- and url-safe string.
......@@ -232,10 +244,12 @@ def b2a(s):
'''
return translate(base64.encodestring(str(s)), a2u_map).replace('\n', '')
def a2b(s):
'''Decode a b2a-encoded string.'''
return base64.decodestring(translate(s, u2a_map))
def encodeExpansion(nodes, compress=1):
'''Encode the expanded node ids of a tree into a string.
......@@ -248,7 +262,8 @@ def encodeExpansion(nodes, compress=1):
last_depth = -1
n = 0
for node in nodes:
if node.state <=0: continue
if node.state <= 0:
continue
dd = last_depth - node.depth + 1
last_depth = node.depth
if dd > 0:
......@@ -258,26 +273,27 @@ def encodeExpansion(nodes, compress=1):
n = n + 1
result = ':'.join(steps)
if compress and len(result) > 2:
zresult = ':' + b2a(zlib.compress(result, 9))
zresult = ':' + b2a(zlib.compress(result, 9))
if len(zresult) < len(result):
result = zresult
return result
def decodeExpansion(s, nth=None, maxsize=8192):
'''Decode an expanded node map from a string.
If nth is an integer, also return the (map, key) pair for the nth entry.
'''
if len(s) > maxsize: # Set limit to avoid DoS attacks.
if len(s) > maxsize: # Set limit to avoid DoS attacks.
raise ValueError('Encoded node map too large')
if s[0] == ':': # Compressed state
if s[0] == ':': # Compressed state
dec = zlib.decompressobj()
s = dec.decompress(a2b(s[1:]), maxsize)
if dec.unconsumed_tail:
raise ValueError('Encoded node map too large')
del dec
map = m = {}
mstack = []
pop = 0
......
......@@ -14,10 +14,11 @@
"""
import cgi
import sys
import urllib
from AccessControl import getSecurityManager
from AccessControl.unauthorized import Unauthorized
from AccessControl.ZopeGuards import guarded_getitem
from DateTime.DateTime import DateTime
from ZTUtils.Batch import Batch
......@@ -27,56 +28,50 @@ from ZTUtils.Tree import decodeExpansion
from ZTUtils.Tree import encodeExpansion
from ZTUtils.Tree import TreeMaker
try:
from AccessControl.ZopeGuards import guarded_getitem
except ImportError:
Unauthorized = 'Unauthorized'
def guarded_getitem(object, index):
v = object[index]
if getSecurityManager().validate(object, object, index, v):
return v
raise Unauthorized, 'unauthorized access to element %s' % index
else:
from AccessControl.unauthorized import Unauthorized
class LazyFilter(Lazy):
# A LazyFilter that checks with the security policy
def __init__(self, seq, test=None, skip=None):
self._seq=seq
self._data=[]
self._eindex=-1
self._test=test
self._seq = seq
self._data = []
self._eindex = -1
self._test = test
if not (skip is None or str(skip) == skip):
raise TypeError, 'Skip must be None or a string'
raise TypeError('Skip must be None or a string')
self._skip = skip
def __getitem__(self,index):
data=self._data
try: s=self._seq
except AttributeError: return data[index]
i=index
if i < 0: i=len(self)+i
if i < 0: raise IndexError, index
ind=len(data)
if i < ind: return data[i]
ind=ind-1
test=self._test
e=self._eindex
def __getitem__(self, index):
data = self._data
try:
s = self._seq
except AttributeError:
return data[index]
i = index
if i < 0:
i = len(self) + i
if i < 0:
raise IndexError(index)
ind = len(data)
if i < ind:
return data[i]
ind = ind - 1
test = self._test
e = self._eindex
skip = self._skip
while i > ind:
e = e + 1
try:
try: v = guarded_getitem(s, e)
except Unauthorized, vv:
try:
v = guarded_getitem(s, e)
except Unauthorized as vv:
if skip is None:
self._eindex = e
msg = '(item %s): %s' % (index, vv)
raise Unauthorized, msg, sys.exc_info()[2]
raise Unauthorized(msg)
skip_this = 1
else:
skip_this = 0
......@@ -84,35 +79,43 @@ class LazyFilter(Lazy):
del self._test
del self._seq
del self._eindex
raise IndexError, index
if skip_this: continue
raise IndexError(index)
if skip_this:
continue
if skip and not getSecurityManager().checkPermission(skip, v):
continue
if test is None or test(v):
data.append(v)
ind=ind+1
self._eindex=e
ind = ind + 1
self._eindex = e
return data[i]
class TreeSkipMixin:
'''Mixin class to make trees test security, and allow
skipping of unauthorized objects. '''
skip = None
def setSkip(self, skip):
self.skip = skip
return self
def getChildren(self, object):
return LazyFilter(self._getChildren(object), skip=self.skip)
def filterChildren(self, children):
if self._values_filter:
return self._values_filter(LazyFilter(children, skip=self.skip))
return children
class TreeMaker(TreeSkipMixin, TreeMaker):
_getChildren = TreeMaker.getChildren
class SimpleTreeMaker(TreeSkipMixin, SimpleTreeMaker):
_getChildren = SimpleTreeMaker.getChildren
def cookieTree(self, root_object, default_state=None):
'''Make a tree with state stored in a cookie.'''
tree_pre = self.tree_pre
......@@ -133,7 +136,7 @@ class SimpleTreeMaker(TreeSkipMixin, SimpleTreeMaker):
m[obid] = {expid: None}
else:
m[obid][expid] = None
elif st == 'c' and m is not state and obid==expid:
elif st == 'c' and m is not state and obid == expid:
del m[obid]
else:
state = decodeExpansion(state)
......@@ -146,6 +149,8 @@ class SimpleTreeMaker(TreeSkipMixin, SimpleTreeMaker):
# Make the Batch class test security, and let it skip unauthorized.
_Batch = Batch
class Batch(Batch):
def __init__(self, sequence, size, start=0, end=0,
orphan=0, overlap=0, skip_unauthorized=None):
......@@ -160,6 +165,7 @@ class Batch(Batch):
# "make_query(bstart=batch.previous.first)" to one and
# "make_query(bstart=batch.end)" to the other.
def make_query(*args, **kwargs):
'''Construct a URL query string, with marshalling markup.
......@@ -187,6 +193,7 @@ def make_query(*args, **kwargs):
return '&'.join(qlist)
def make_hidden_input(*args, **kwargs):
'''Construct a set of hidden input elements, with marshalling markup.
......@@ -205,7 +212,9 @@ def make_hidden_input(*args, **kwargs):
d.update(arg)
d.update(kwargs)
hq = lambda x:cgi.escape(x, quote=True)
def hq(x):
return cgi.escape(x, quote=True)
qlist = complex_marshal(d.items())
for i in range(len(qlist)):
k, m, v = qlist[i]
......@@ -214,6 +223,7 @@ def make_hidden_input(*args, **kwargs):
return '\n'.join(qlist)
def complex_marshal(pairs):
'''Add request marshalling information to a list of name-value pairs.
......@@ -243,11 +253,11 @@ def complex_marshal(pairs):
if isinstance(sv, list):
for ssv in sv:
sm = simple_marshal(ssv)
sublist.append(('%s.%s' % (k, sk),
'%s:list:record' % sm, ssv))
sublist.append(('%s.%s' % (k, sk),
'%s:list:record' % sm, ssv))
else:
sm = simple_marshal(sv)
sublist.append(('%s.%s' % (k, sk), '%s:record' % sm, sv))
sublist.append(('%s.%s' % (k, sk), '%s:record' % sm, sv))
elif isinstance(v, list):
sublist = []
for sv in v:
......@@ -262,6 +272,7 @@ def complex_marshal(pairs):
return pairs
def simple_marshal(v):
if isinstance(v, str):
return ''
......@@ -275,6 +286,7 @@ def simple_marshal(v):
return ':date'
return ''
def url_query(request, req_name="URL", omit=None):
'''Construct a URL with a query string, using the current request.
......
# This file is needed to make this directory a Python package.
from unittest import TestCase, makeSuite
import unittest
from ZTUtils import Batch
class BatchTests(TestCase):
class BatchTests(unittest.TestCase):
def testEmpty(self):
'''Test empty Batch'''
......@@ -19,7 +20,7 @@ class BatchTests(TestCase):
assert b.previous is None
assert b.next is None
assert b.start == 1, b.start
assert len(b) == b.end == bsize
assert len(b) == b.end == bsize
assert b.sequence_length == len(seq)
for i in seq:
assert b[i] == i, (b[i], i)
......@@ -41,9 +42,7 @@ class BatchTests(TestCase):
def testLengthEqualsSizePlusOrphans(self):
'''Test limit case where batch length is equal to size + orphans'''
for bsize, length in ((12,11), (13,12), (14,13), (15,10)):
b = Batch(range(bsize), size=10, start=1, end=0, orphan=3, overlap=0)
for bsize, length in ((12, 11), (13, 12), (14, 13), (15, 10)):
b = Batch(range(bsize),
size=10, start=1, end=0, orphan=3, overlap=0)
assert length == b.length
def test_suite():
return makeSuite(BatchTests)
......@@ -2,6 +2,7 @@ import unittest
from ZTUtils import Tree
class Item:
children = ()
id = ''
......@@ -10,8 +11,11 @@ class Item:
self.id = id
self.children = children
def tpId(self): return self.id
def tpValues(self): return self.children
def tpId(self):
return self.id
def tpValues(self):
return self.children
class TreeTests(unittest.TestCase):
......@@ -66,7 +70,10 @@ class TreeTests(unittest.TestCase):
self.assertEqual([s.object for s in set], expected_set)
set = []
def collect(node, set=set): set.append(node.object)
def collect(node, set=set):
set.append(node.object)
treeroot.walk(collect)
self.assertEqual(len(set), treeroot.size)
self.assertEqual(set, expected_set)
......@@ -82,15 +89,19 @@ class TreeTests(unittest.TestCase):
self.assert_(treeroot.object is self.root)
items = self.items
expected_set = [items['a'], items['b'], items['d'], items['e'],
expected_set = [
items['a'], items['b'], items['d'], items['e'],
items['c'], items['f'], items['h'], items['i'], items['g']]
set = treeroot.flat()
self.assertEqual(len(set), treeroot.size)
self.assertEqual([s.object for s in set], expected_set)
set = []
def collect(node, set=set): set.append(node.object)
def collect(node, set=set):
set.append(node.object)
treeroot.walk(collect)
self.assertEqual(len(set), treeroot.size)
self.assertEqual(set, expected_set)
......@@ -145,21 +156,24 @@ class TreeTests(unittest.TestCase):
self.tm.setChildAccess(filter=filter)
treeroot = self.tm.tree(self.root, 1)
self.assertEqual(treeroot.size, 3)
self.assertEqual(len(treeroot), 1)
self.assertEqual(len(treeroot[0]), 1)
expected_set = [self.items['a'], self.items['b'], self.items['d']]
set = []
def collect(node, set=set): set.append(node.object)
def collect(node, set=set):
set.append(node.object)
treeroot.walk(collect)
self.assertEqual(set, expected_set)
def testChildrenFunction(self):
def childrenFunction(object):
return object.children
self.tm.setChildAccess(function=childrenFunction)
treeroot = self.tm.tree(self.root)
......@@ -172,7 +186,7 @@ class TreeTests(unittest.TestCase):
if object.id == 'd':
return -1
return state
self.tm.setStateFunction(stateFunction)
treeroot = self.tm.tree(self.root)
......@@ -198,9 +212,9 @@ class TreeTests(unittest.TestCase):
def testEncodedExpansionIdWithDot(self):
# Regression test for Collector issue #603
# An encoded node ID with a first character with the first 6 bits set.
item = Item('\xfcberbug!', (Item('b'),)) # 'uberbug!' with u-umlaut.
item = Item('\xfcberbug!', (Item('b'),)) # 'uberbug!' with u-umlaut.
treeroot1 = self.tm.tree(item)
encoded = Tree.encodeExpansion(treeroot1.flat())
decodedmap = Tree.decodeExpansion(encoded)
......@@ -208,17 +222,13 @@ class TreeTests(unittest.TestCase):
self.assertEqual(treeroot1.size, treeroot2.size)
self.assertEqual(len(treeroot1), len(treeroot2))
def testDecodeInputSizeLimit(self):
self.assertRaises(ValueError, Tree.decodeExpansion, 'x' * 10000)
def testDecodeDecompressedSizeLimit(self):
import zlib
from ZTUtils.Tree import b2a
big = b2a(zlib.compress('x' * (1024*1100)))
self.assert_(len(big) < 8192) # Must be under the input size limit
big = b2a(zlib.compress('x' * (1024 * 1100)))
self.assert_(len(big) < 8192) # Must be under the input size limit
self.assertRaises(ValueError, Tree.decodeExpansion, ':' + big)
def test_suite():
return unittest.makeSuite(TreeTests)
from unittest import TestCase, makeSuite
import unittest
import urllib
from ZTUtils.Zope import make_query, complex_marshal
from ZTUtils.Zope import make_hidden_input
from DateTime import DateTime
class QueryTests(TestCase):
class QueryTests(unittest.TestCase):
def testMarshallLists(self):
'''Test marshalling lists'''
test_date = DateTime()
list_ = [1, test_date, 'str']
result = complex_marshal([('list',list_),])
result = complex_marshal([('list', list_), ])
assert result == [('list', ':int:list', 1),
('list', ':date:list', test_date),
('list', ':list', 'str')]
......@@ -20,7 +21,7 @@ class QueryTests(TestCase):
'''Test marshalling records'''
test_date = DateTime()
record = {'arg1': 1, 'arg2': test_date, 'arg3': 'str'}
result = complex_marshal([('record',record),])
result = complex_marshal([('record', record), ])
assert result == [('record.arg1', ':int:record', 1),
('record.arg2', ':date:record', test_date),
('record.arg3', ':record', 'str')]
......@@ -29,7 +30,7 @@ class QueryTests(TestCase):
'''Test marshalling lists inside of records'''
test_date = DateTime()
record = {'arg1': [1, test_date, 'str'], 'arg2': 1}
result = complex_marshal([('record',record),])
result = complex_marshal([('record', record), ])
assert result == [('record.arg1', ':int:list:record', 1),
('record.arg1', ':date:list:record', test_date),
('record.arg1', ':list:record', 'str'),
......@@ -41,12 +42,16 @@ class QueryTests(TestCase):
quote_date = urllib.quote(str(test_date))
record = {'arg1': [1, test_date, 'str'], 'arg2': 1}
list_ = [1, test_date, 'str']
date = test_date
int_ = 1
str_ = 'str'
query = make_query(date=test_date, integer=int_, listing=list_,
record=record, string=str_)
assert query == 'date:date=%s&integer:int=1&listing:int:list=1&listing:date:list=%s&listing:list=str&string=str&record.arg1:int:list:record=1&record.arg1:date:list:record=%s&record.arg1:list:record=str&record.arg2:int:record=1'%(quote_date,quote_date,quote_date)
assert query == (
'date:date=%s&integer:int=1&listing:int:list=1&'
'listing:date:list=%s&listing:list=str&string=str&'
'record.arg1:int:list:record=1&record.arg1:date:list:record=%s&'
'record.arg1:list:record=str&record.arg2:int:record=1' % (
quote_date, quote_date, quote_date))
def testMakeHiddenInput(self):
tag = make_hidden_input(foo='bar')
......@@ -55,11 +60,11 @@ class QueryTests(TestCase):
self.assertEqual(tag, '<input type="hidden" name="foo:int" value="1">')
# Escaping
tag = make_hidden_input(foo='bar & baz')
self.assertEqual(tag, '<input type="hidden" name="foo" value="bar &amp; baz">')
self.assertEqual(
tag, '<input type="hidden" name="foo" value="bar &amp; baz">')
tag = make_hidden_input(foo='<bar>')
self.assertEqual(tag, '<input type="hidden" name="foo" value="&lt;bar&gt;">')
self.assertEqual(
tag, '<input type="hidden" name="foo" value="&lt;bar&gt;">')
tag = make_hidden_input(foo='"bar"')
self.assertEqual(tag, '<input type="hidden" name="foo" value="&quot;bar&quot;">')
def test_suite():
return makeSuite(QueryTests)
self.assertEqual(
tag, '<input type="hidden" name="foo" value="&quot;bar&quot;">')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment