Commit ab628670 authored by Hanno Schlichting's avatar Hanno Schlichting

Make webdav/ftp methods conditionally available based on ZServer presence.

parent ed87506a
...@@ -20,6 +20,7 @@ import warnings ...@@ -20,6 +20,7 @@ import warnings
from AccessControl.class_init import InitializeClass from AccessControl.class_init import InitializeClass
from AccessControl.SecurityInfo import ClassSecurityInfo from AccessControl.SecurityInfo import ClassSecurityInfo
from Acquisition import Explicit from Acquisition import Explicit
from App import bbb
from App.Common import package_home from App.Common import package_home
from App.Common import rfc1123_date from App.Common import rfc1123_date
from App.config import getConfiguration from App.config import getConfiguration
...@@ -119,12 +120,13 @@ class ImageFile(Explicit): ...@@ -119,12 +120,13 @@ class ImageFile(Explicit):
return filestream_iterator(self.path, mode='rb') return filestream_iterator(self.path, mode='rb')
security.declarePublic('HEAD') if bbb.HAS_ZSERVER:
def HEAD(self, REQUEST, RESPONSE): security.declarePublic('HEAD')
""" """ def HEAD(self, REQUEST, RESPONSE):
RESPONSE.setHeader('Content-Type', self.content_type) """ """
RESPONSE.setHeader('Last-Modified', self.lmh) RESPONSE.setHeader('Content-Type', self.content_type)
return '' RESPONSE.setHeader('Last-Modified', self.lmh)
return ''
def __len__(self): def __len__(self):
# This is bogus and needed because of the way Python tests truth. # This is bogus and needed because of the way Python tests truth.
......
...@@ -10,45 +10,11 @@ ...@@ -10,45 +10,11 @@
# FOR A PARTICULAR PURPOSE. # FOR A PARTICULAR PURPOSE.
# #
############################################################################## ##############################################################################
"""FTP Support for Zope classes.
Preliminary FTP support interface. Note, most FTP functions are import pkg_resources
provided by existing methods such as PUT and manage_delObjects.
All FTP methods should be governed by a single permission: HAS_ZSERVER = True
'FTP access'. try:
""" dist = pkg_resources.get_distribution('ZServer')
except pkg_resources.DistributionNotFound:
from zope.interface import implements HAS_ZSERVER = False
from interfaces import IFTPAccess
class FTPInterface:
"Interface for FTP objects"
implements(IFTPAccess)
# XXX The stat and list marshal format should probably
# be XML, not marshal, maybe Andrew K's xml-marshal.
# This will probably be changed later.
def manage_FTPstat(self, REQUEST):
"""Returns a stat-like tuple. (marshalled to a string) Used by
FTP for directory listings, and MDTM and SIZE"""
def manage_FTPlist(self, REQUEST):
"""Returns a directory listing consisting of a tuple of
(id,stat) tuples, marshaled to a string. Note, the listing it
should include '..' if there is a Folder above the current
one.
In the case of non-foldoid objects it should return a single
tuple (id,stat) representing itself."""
# Optional method to support FTP download.
# Should not be implemented by Foldoid objects.
def manage_FTPget(self):
"""Returns the source content of an object. For example, the
source text of a Document, or the data of a file."""
...@@ -133,15 +133,16 @@ class Application(ApplicationDefaultPermissions, Folder.Folder): ...@@ -133,15 +133,16 @@ class Application(ApplicationDefaultPermissions, Folder.Folder):
"""Utility function to return current date/time""" """Utility function to return current date/time"""
return DateTime(*args) return DateTime(*args)
def DELETE(self, REQUEST, RESPONSE): if bbb.HAS_ZSERVER:
"""Delete a resource object.""" def DELETE(self, REQUEST, RESPONSE):
self.dav__init(REQUEST, RESPONSE) """Delete a resource object."""
raise Forbidden('This resource cannot be deleted.') self.dav__init(REQUEST, RESPONSE)
raise Forbidden('This resource cannot be deleted.')
def MOVE(self, REQUEST, RESPONSE):
"""Move a resource to a new location.""" def MOVE(self, REQUEST, RESPONSE):
self.dav__init(REQUEST, RESPONSE) """Move a resource to a new location."""
raise Forbidden('This resource cannot be moved.') self.dav__init(REQUEST, RESPONSE)
raise Forbidden('This resource cannot be moved.')
def absolute_url(self, relative=0): def absolute_url(self, relative=0):
"""The absolute URL of the root object is BASE1 or "/". """The absolute URL of the root object is BASE1 or "/".
......
...@@ -31,6 +31,7 @@ from AccessControl.requestmethod import requestmethod ...@@ -31,6 +31,7 @@ from AccessControl.requestmethod import requestmethod
from AccessControl.tainted import TaintedString from AccessControl.tainted import TaintedString
from DocumentTemplate.permissions import change_dtml_methods from DocumentTemplate.permissions import change_dtml_methods
from DocumentTemplate.security import RestrictedDTML from DocumentTemplate.security import RestrictedDTML
from OFS import bbb
from OFS.Cache import Cacheable from OFS.Cache import Cacheable
from OFS.History import Historical from OFS.History import Historical
from OFS.History import html_diff from OFS.History import html_diff
...@@ -366,27 +367,28 @@ class DTMLMethod(RestrictedDTML, ...@@ -366,27 +367,28 @@ class DTMLMethod(RestrictedDTML,
RESPONSE.setHeader('Content-Type', 'text/plain') RESPONSE.setHeader('Content-Type', 'text/plain')
return self.read() return self.read()
security.declareProtected(change_dtml_methods, 'PUT') if bbb.HAS_ZSERVER:
def PUT(self, REQUEST, RESPONSE): security.declareProtected(change_dtml_methods, 'PUT')
""" Handle FTP / HTTP PUT requests. def PUT(self, REQUEST, RESPONSE):
""" """ Handle FTP / HTTP PUT requests.
self.dav__init(REQUEST, RESPONSE) """
self.dav__simpleifhandler(REQUEST, RESPONSE, refresh=1) self.dav__init(REQUEST, RESPONSE)
body = REQUEST.get('BODY', '') self.dav__simpleifhandler(REQUEST, RESPONSE, refresh=1)
self._validateProxy(REQUEST) body = REQUEST.get('BODY', '')
self.munge(body) self._validateProxy(REQUEST)
self.ZCacheable_invalidate() self.munge(body)
RESPONSE.setStatus(204) self.ZCacheable_invalidate()
return RESPONSE RESPONSE.setStatus(204)
return RESPONSE
security.declareProtected(ftp_access, 'manage_FTPstat')
security.declareProtected(ftp_access, 'manage_FTPlist') security.declareProtected(ftp_access, 'manage_FTPstat')
security.declareProtected(ftp_access, 'manage_FTPlist')
security.declareProtected(ftp_access, 'manage_FTPget')
def manage_FTPget(self): security.declareProtected(ftp_access, 'manage_FTPget')
""" Get source for FTP download. def manage_FTPget(self):
""" """ Get source for FTP download.
return self.read() """
return self.read()
def manage_historyCompare(self, rev1, rev2, REQUEST, def manage_historyCompare(self, rev1, rev2, REQUEST,
historyComparisonResults=''): historyComparisonResults=''):
......
...@@ -37,6 +37,7 @@ from zope.contenttype import guess_content_type ...@@ -37,6 +37,7 @@ from zope.contenttype import guess_content_type
from zope.interface import implementedBy from zope.interface import implementedBy
from zope.interface import implements from zope.interface import implements
from OFS import bbb
from OFS.Cache import Cacheable from OFS.Cache import Cacheable
from OFS.interfaces import IWriteLock from OFS.interfaces import IWriteLock
from OFS.PropertyManager import PropertyManager from OFS.PropertyManager import PropertyManager
...@@ -595,23 +596,6 @@ class File(Persistent, Implicit, PropertyManager, ...@@ -595,23 +596,6 @@ class File(Persistent, Implicit, PropertyManager,
return next, size return next, size
security.declareProtected(change_images_and_files, 'PUT')
def PUT(self, REQUEST, RESPONSE):
"""Handle HTTP PUT requests"""
self.dav__init(REQUEST, RESPONSE)
self.dav__simpleifhandler(REQUEST, RESPONSE, refresh=1)
type = REQUEST.get_header('content-type', None)
file = REQUEST['BODYFILE']
data, size = self._read_data(file)
content_type = self._get_content_type(file, data, self.__name__,
type or self.content_type)
self.update_data(data, content_type, size)
RESPONSE.setStatus(204)
return RESPONSE
security.declareProtected(View, 'get_size') security.declareProtected(View, 'get_size')
def get_size(self): def get_size(self):
# Get the size of a file or image. # Get the size of a file or image.
...@@ -636,35 +620,53 @@ class File(Persistent, Implicit, PropertyManager, ...@@ -636,35 +620,53 @@ class File(Persistent, Implicit, PropertyManager,
def __len__(self): def __len__(self):
return 1 return 1
security.declareProtected(ftp_access, 'manage_FTPstat') if bbb.HAS_ZSERVER:
security.declareProtected(ftp_access, 'manage_FTPlist') security.declareProtected(change_images_and_files, 'PUT')
def PUT(self, REQUEST, RESPONSE):
security.declareProtected(ftp_access, 'manage_FTPget') """Handle HTTP PUT requests"""
def manage_FTPget(self): self.dav__init(REQUEST, RESPONSE)
"""Return body for ftp.""" self.dav__simpleifhandler(REQUEST, RESPONSE, refresh=1)
RESPONSE = self.REQUEST.RESPONSE type = REQUEST.get_header('content-type', None)
if self.ZCacheable_isCachingEnabled(): file = REQUEST['BODYFILE']
result = self.ZCacheable_get(default=None)
if result is not None: data, size = self._read_data(file)
# We will always get None from RAMCacheManager but we will get content_type = self._get_content_type(file, data, self.__name__,
# something implementing the IStreamIterator interface type or self.content_type)
# from FileCacheManager. self.update_data(data, content_type, size)
# the content-length is required here by HTTPResponse, even
# though FTP doesn't use it. RESPONSE.setStatus(204)
RESPONSE.setHeader('Content-Length', self.size) return RESPONSE
return result
security.declareProtected(ftp_access, 'manage_FTPstat')
security.declareProtected(ftp_access, 'manage_FTPlist')
security.declareProtected(ftp_access, 'manage_FTPget')
def manage_FTPget(self):
"""Return body for ftp."""
RESPONSE = self.REQUEST.RESPONSE
if self.ZCacheable_isCachingEnabled():
result = self.ZCacheable_get(default=None)
if result is not None:
# We will always get None from RAMCacheManager but we will
# get something implementing the IStreamIterator interface
# from FileCacheManager.
# the content-length is required here by HTTPResponse,
# even though FTP doesn't use it.
RESPONSE.setHeader('Content-Length', self.size)
return result
data = self.data data = self.data
if isinstance(data, str): if isinstance(data, str):
RESPONSE.setBase(None) RESPONSE.setBase(None)
return data return data
while data is not None: while data is not None:
RESPONSE.write(data.data) RESPONSE.write(data.data)
data = data.next data = data.next
return '' return ''
InitializeClass(File) InitializeClass(File)
......
...@@ -596,62 +596,6 @@ class ObjectManager(CopyContainer, ...@@ -596,62 +596,6 @@ class ObjectManager(CopyContainer,
listing.sort() listing.sort()
return listing return listing
# FTP support methods
security.declareProtected(ftp_access, 'manage_FTPlist')
def manage_FTPlist(self, REQUEST):
"""Directory listing for FTP.
"""
out = ()
# check to see if we are being acquiring or not
ob = self
while 1:
if is_acquired(ob):
raise ValueError('FTP List not supported on acquired objects')
if not hasattr(ob, '__parent__'):
break
ob = aq_parent(ob)
files = list(self.objectItems())
# recursive ride through all subfolders (ls -R) (ajung)
if REQUEST.environ.get('FTP_RECURSIVE', 0) == 1:
all_files = copy.copy(files)
for f in files:
if (hasattr(aq_base(f[1]), 'isPrincipiaFolderish') and
f[1].isPrincipiaFolderish):
all_files.extend(findChildren(f[1]))
files = all_files
# Perform globbing on list of files (ajung)
globbing = REQUEST.environ.get('GLOBBING', '')
if globbing:
files = [x for x in files if fnmatch.fnmatch(x[0], globbing)]
files.sort()
if not (hasattr(self, 'isTopLevelPrincipiaApplicationObject') and
self.isTopLevelPrincipiaApplicationObject):
files.insert(0, ('..', aq_parent(self)))
files.insert(0, ('.', self))
for k, v in files:
# Note that we have to tolerate failure here, because
# Broken objects won't stat correctly. If an object fails
# to be able to stat itself, we will ignore it, but log
# the error.
try:
stat = marshal.loads(v.manage_FTPstat(REQUEST))
except:
LOG.error("Failed to stat file '%s'" % k,
exc_info=sys.exc_info())
stat = None
if stat is not None:
out = out + ((k, stat),)
return marshal.dumps(out)
security.declareProtected(ftp_access, 'manage_hasId') security.declareProtected(ftp_access, 'manage_hasId')
def manage_hasId(self, REQUEST): def manage_hasId(self, REQUEST):
""" check if the folder has an object with REQUEST['id'] """ """ check if the folder has an object with REQUEST['id'] """
...@@ -659,37 +603,94 @@ class ObjectManager(CopyContainer, ...@@ -659,37 +603,94 @@ class ObjectManager(CopyContainer,
if not REQUEST['id'] in self.objectIds(): if not REQUEST['id'] in self.objectIds():
raise KeyError(REQUEST['id']) raise KeyError(REQUEST['id'])
security.declareProtected(ftp_access, 'manage_FTPstat') if bbb.HAS_ZSERVER:
def manage_FTPstat(self, REQUEST): # FTP support methods
"""Psuedo stat, used by FTP for directory listings. security.declareProtected(ftp_access, 'manage_FTPlist')
""" def manage_FTPlist(self, REQUEST):
mode = 0o0040000 """Directory listing for FTP.
from AccessControl.User import nobody """
# check to see if we are acquiring our objectValues or not out = ()
if not (len(REQUEST.PARENTS) > 1 and
self.objectValues() == REQUEST.PARENTS[1].objectValues()): # check to see if we are being acquiring or not
try: ob = self
if getSecurityManager().validate( while 1:
None, self, 'manage_FTPlist', self.manage_FTPlist): if is_acquired(ob):
mode = mode | 0o0770 raise ValueError(
except Exception: 'FTP List not supported on acquired objects')
pass if not hasattr(ob, '__parent__'):
break
ob = aq_parent(ob)
files = list(self.objectItems())
# recursive ride through all subfolders (ls -R) (ajung)
if REQUEST.environ.get('FTP_RECURSIVE', 0) == 1:
all_files = copy.copy(files)
for f in files:
if (hasattr(aq_base(f[1]), 'isPrincipiaFolderish') and
f[1].isPrincipiaFolderish):
all_files.extend(findChildren(f[1]))
files = all_files
# Perform globbing on list of files (ajung)
globbing = REQUEST.environ.get('GLOBBING', '')
if globbing:
files = [x for x in files if fnmatch.fnmatch(x[0], globbing)]
files.sort()
if not (hasattr(self, 'isTopLevelPrincipiaApplicationObject') and
self.isTopLevelPrincipiaApplicationObject):
files.insert(0, ('..', aq_parent(self)))
files.insert(0, ('.', self))
for k, v in files:
# Note that we have to tolerate failure here, because
# Broken objects won't stat correctly. If an object fails
# to be able to stat itself, we will ignore it, but log
# the error.
try:
stat = marshal.loads(v.manage_FTPstat(REQUEST))
except:
LOG.error("Failed to stat file '%s'" % k,
exc_info=sys.exc_info())
stat = None
if stat is not None:
out = out + ((k, stat),)
return marshal.dumps(out)
security.declareProtected(ftp_access, 'manage_FTPstat')
def manage_FTPstat(self, REQUEST):
"""Psuedo stat, used by FTP for directory listings.
"""
mode = 0o0040000
from AccessControl.User import nobody
# check to see if we are acquiring our objectValues or not
if not (len(REQUEST.PARENTS) > 1 and
self.objectValues() == REQUEST.PARENTS[1].objectValues()):
try:
if getSecurityManager().validate(
None, self, 'manage_FTPlist', self.manage_FTPlist):
mode = mode | 0o0770
except Exception:
pass
if nobody.allowed(self, getRoles( if nobody.allowed(self, getRoles(
self, 'manage_FTPlist', self.manage_FTPlist, ())): self, 'manage_FTPlist', self.manage_FTPlist, ())):
mode = mode | 0o0007 mode = mode | 0o0007
if hasattr(aq_base(self), '_p_mtime'): if hasattr(aq_base(self), '_p_mtime'):
mtime = DateTime(self._p_mtime).timeTime() mtime = DateTime(self._p_mtime).timeTime()
else: else:
mtime = time.time() mtime = time.time()
# get owner and group # get owner and group
owner = group = 'Zope' owner = group = 'Zope'
for user, roles in self.get_local_roles(): for user, roles in self.get_local_roles():
if 'Owner' in roles: if 'Owner' in roles:
owner = user owner = user
break break
return marshal.dumps( return marshal.dumps(
(mode, 0, 0, 1, owner, group, 0, mtime, mtime, mtime)) (mode, 0, 0, 1, owner, group, 0, mtime, mtime, mtime))
def __delitem__(self, name): def __delitem__(self, name):
return self.manage_delObjects(ids=[name]) return self.manage_delObjects(ids=[name])
......
...@@ -270,81 +270,82 @@ class Item(Base, ...@@ -270,81 +270,82 @@ class Item(Base,
return () return ()
objectIds = objectItems = objectValues objectIds = objectItems = objectValues
# FTP support methods if bbb.HAS_ZSERVER:
# FTP support methods
def manage_FTPstat(self, REQUEST): def manage_FTPstat(self, REQUEST):
"""Psuedo stat, used by FTP for directory listings. """Psuedo stat, used by FTP for directory listings.
""" """
from AccessControl.User import nobody from AccessControl.User import nobody
mode = 0o0100000 mode = 0o0100000
if (hasattr(aq_base(self), 'manage_FTPget')): if (hasattr(aq_base(self), 'manage_FTPget')):
try: try:
if getSecurityManager().validate( if getSecurityManager().validate(
None, self, 'manage_FTPget', self.manage_FTPget): None, self, 'manage_FTPget', self.manage_FTPget):
mode = mode | 0o0440 mode = mode | 0o0440
except Unauthorized: except Unauthorized:
pass pass
if nobody.allowed( if nobody.allowed(
self.manage_FTPget, self.manage_FTPget,
getRoles(self, 'manage_FTPget', self.manage_FTPget, ())): getRoles(self, 'manage_FTPget', self.manage_FTPget, ())):
mode = mode | 0o0004 mode = mode | 0o0004
# check write permissions # check write permissions
if hasattr(aq_base(self), 'PUT'): if hasattr(aq_base(self), 'PUT'):
try: try:
if getSecurityManager().validate(None, self, 'PUT', self.PUT): if getSecurityManager().validate(None, self, 'PUT', self.PUT):
mode = mode | 0o0220 mode = mode | 0o0220
except Unauthorized: except Unauthorized:
pass pass
if nobody.allowed( if nobody.allowed(
self.PUT, self.PUT,
getRoles(self, 'PUT', self.PUT, ())): getRoles(self, 'PUT', self.PUT, ())):
mode = mode | 0o0002 mode = mode | 0o0002
# get size # get size
if hasattr(aq_base(self), 'get_size'): if hasattr(aq_base(self), 'get_size'):
size = self.get_size() size = self.get_size()
elif hasattr(aq_base(self), 'manage_FTPget'): elif hasattr(aq_base(self), 'manage_FTPget'):
size = len(self.manage_FTPget()) size = len(self.manage_FTPget())
else: else:
size = 0 size = 0
# get modification time # get modification time
if hasattr(aq_base(self), '_p_mtime'): if hasattr(aq_base(self), '_p_mtime'):
mtime = DateTime(self._p_mtime).timeTime() mtime = DateTime(self._p_mtime).timeTime()
else: else:
mtime = time.time() mtime = time.time()
# get owner and group # get owner and group
owner = group = 'Zope' owner = group = 'Zope'
if hasattr(aq_base(self), 'get_local_roles'): if hasattr(aq_base(self), 'get_local_roles'):
for user, roles in self.get_local_roles(): for user, roles in self.get_local_roles():
if 'Owner' in roles: if 'Owner' in roles:
owner = user owner = user
break
return marshal.dumps(
(mode, 0, 0, 1, owner, group, size, mtime, mtime, mtime))
def manage_FTPlist(self, REQUEST):
"""Directory listing for FTP.
In the case of non-Foldoid objects, the listing should contain one
object, the object itself.
"""
from App.Common import is_acquired
# check to see if we are being acquiring or not
ob = self
while 1:
if is_acquired(ob):
raise ValueError('FTP List not supported on acquired objects')
if not hasattr(ob, '__parent__'):
break break
return marshal.dumps( ob = aq_parent(ob)
(mode, 0, 0, 1, owner, group, size, mtime, mtime, mtime))
def manage_FTPlist(self, REQUEST):
"""Directory listing for FTP.
In the case of non-Foldoid objects, the listing should contain one stat = marshal.loads(self.manage_FTPstat(REQUEST))
object, the object itself. id = self.getId()
""" return marshal.dumps((id, stat))
from App.Common import is_acquired
# check to see if we are being acquiring or not
ob = self
while 1:
if is_acquired(ob):
raise ValueError('FTP List not supported on acquired objects')
if not hasattr(ob, '__parent__'):
break
ob = aq_parent(ob)
stat = marshal.loads(self.manage_FTPstat(REQUEST))
id = self.getId()
return marshal.dumps((id, stat))
def __len__(self): def __len__(self):
return 1 return 1
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
from zope.component.interfaces import IPossibleSite from zope.component.interfaces import IPossibleSite
from zope.container.interfaces import IContainer from zope.container.interfaces import IContainer
from zope.deferredimport import deprecated
from zope.interface import Attribute from zope.interface import Attribute
from zope.interface import Interface from zope.interface import Interface
from zope.interface.interfaces import IObjectEvent from zope.interface.interfaces import IObjectEvent
...@@ -171,27 +172,6 @@ class ICopySource(Interface): ...@@ -171,27 +172,6 @@ class ICopySource(Interface):
""" """
# XXX: might contain non-API methods and outdated comments;
# not synced with ZopeBook API Reference;
# based on OFS.FTPInterface.FTPInterface
class IFTPAccess(Interface):
"""Provide support for FTP access"""
def manage_FTPstat(REQUEST):
"""Returns a stat-like tuple. (marshalled to a string) Used by
FTP for directory listings, and MDTM and SIZE"""
def manage_FTPlist(REQUEST):
"""Returns a directory listing consisting of a tuple of
(id,stat) tuples, marshaled to a string. Note, the listing it
should include '..' if there is a Folder above the current
one.
In the case of non-foldoid objects it should return a single
tuple (id,stat) representing itself."""
# XXX: might contain non-API methods and outdated comments; # XXX: might contain non-API methods and outdated comments;
# not synced with ZopeBook API Reference; # not synced with ZopeBook API Reference;
# based on OFS.Traversable.Traversable # based on OFS.Traversable.Traversable
...@@ -519,7 +499,7 @@ class ILockItem(Interface): ...@@ -519,7 +499,7 @@ class ILockItem(Interface):
# XXX: might contain non-API methods and outdated comments; # XXX: might contain non-API methods and outdated comments;
# not synced with ZopeBook API Reference; # not synced with ZopeBook API Reference;
# based on OFS.SimpleItem.Item # based on OFS.SimpleItem.Item
class IItem(IZopeObject, IManageable, IFTPAccess, class IItem(IZopeObject, IManageable,
ICopySource, ITraversable, IOwned): ICopySource, ITraversable, IOwned):
__name__ = BytesLine(title=u"Name") __name__ = BytesLine(title=u"Name")
...@@ -1057,3 +1037,10 @@ class IObjectClonedEvent(IObjectEvent): ...@@ -1057,3 +1037,10 @@ class IObjectClonedEvent(IObjectEvent):
event.object is the copied object, already added to its container. event.object is the copied object, already added to its container.
Note that this event is dispatched to all sublocations. Note that this event is dispatched to all sublocations.
""" """
# BBB Zope 5.0
deprecated(
'Please import from webdav.interfaces.',
IFTPAccess='webdav.interfaces:IFTPAccess',
)
import unittest
class TestFTPInterface(unittest.TestCase):
def test_interfaces(self):
from OFS.interfaces import IFTPAccess
from OFS.FTPInterface import FTPInterface
from zope.interface.verify import verifyClass
verifyClass(IFTPAccess, FTPInterface)
...@@ -257,27 +257,6 @@ class FileTests(unittest.TestCase): ...@@ -257,27 +257,6 @@ class FileTests(unittest.TestCase):
self.assertEqual(resp.getStatus(), 200) self.assertEqual(resp.getStatus(), 200)
self.assertEqual(data, str(self.file.data)) self.assertEqual(data, str(self.file.data))
def testPUT(self):
s = '# some python\n'
# with content type
data = StringIO(s)
req = aputrequest(data, 'text/x-python')
req.processInputs()
self.file.PUT(req, req.RESPONSE)
self.assertEqual(self.file.content_type, 'text/x-python')
self.assertEqual(str(self.file.data), s)
# without content type
data.seek(0)
req = aputrequest(data, '')
req.processInputs()
self.file.PUT(req, req.RESPONSE)
self.assertEqual(self.file.content_type, 'text/x-python')
self.assertEqual(str(self.file.data), s)
def testIndexHtmlWithPdata(self): def testIndexHtmlWithPdata(self):
self.file.manage_upload('a' * (2 << 16)) # 128K self.file.manage_upload('a' * (2 << 16)) # 128K
self.file.index_html(self.app.REQUEST, self.app.REQUEST.RESPONSE) self.file.index_html(self.app.REQUEST, self.app.REQUEST.RESPONSE)
......
...@@ -37,6 +37,7 @@ from Shared.DC.Scripts.Script import Script ...@@ -37,6 +37,7 @@ from Shared.DC.Scripts.Script import Script
from Shared.DC.Scripts.Signature import FuncCode from Shared.DC.Scripts.Signature import FuncCode
from zExceptions import ResourceLockedError from zExceptions import ResourceLockedError
from Products.PageTemplates import bbb
from Products.PageTemplates.PageTemplate import PageTemplate from Products.PageTemplates.PageTemplate import PageTemplate
from Products.PageTemplates.PageTemplateFile import PageTemplateFile from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PageTemplates.PageTemplateFile import guess_type from Products.PageTemplates.PageTemplateFile import guess_type
...@@ -344,27 +345,29 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable, ...@@ -344,27 +345,29 @@ class ZopePageTemplate(Script, PageTemplate, Historical, Cacheable,
'manage_beforeHistoryCopy', 'manage_beforeHistoryCopy',
'manage_afterHistoryCopy') 'manage_afterHistoryCopy')
security.declareProtected(change_page_templates, 'PUT') if bbb.HAS_ZSERVER:
def PUT(self, REQUEST, RESPONSE): security.declareProtected(change_page_templates, 'PUT')
""" Handle HTTP PUT requests """ def PUT(self, REQUEST, RESPONSE):
""" Handle HTTP PUT requests """
self.dav__init(REQUEST, RESPONSE)
self.dav__simpleifhandler(REQUEST, RESPONSE, refresh=1) self.dav__init(REQUEST, RESPONSE)
text = REQUEST.get('BODY', '') self.dav__simpleifhandler(REQUEST, RESPONSE, refresh=1)
content_type = guess_type('', text) text = REQUEST.get('BODY', '')
self.pt_edit(text, content_type) content_type = guess_type('', text)
RESPONSE.setStatus(204) self.pt_edit(text, content_type)
return RESPONSE RESPONSE.setStatus(204)
return RESPONSE
security.declareProtected(change_page_templates, 'manage_FTPput')
manage_FTPput = PUT security.declareProtected(change_page_templates, 'manage_FTPput')
manage_FTPput = PUT
security.declareProtected(ftp_access, 'manage_FTPstat', 'manage_FTPlist')
security.declareProtected(ftp_access, 'manage_FTPget') security.declareProtected(ftp_access, 'manage_FTPstat')
def manage_FTPget(self): security.declareProtected(ftp_access, 'manage_FTPlist')
"Get source for FTP download" security.declareProtected(ftp_access, 'manage_FTPget')
result = self.read() def manage_FTPget(self):
return result.encode(self.output_encoding) "Get source for FTP download"
result = self.read()
return result.encode(self.output_encoding)
security.declareProtected(view_management_screens, 'html') security.declareProtected(view_management_screens, 'html')
def html(self): def html(self):
......
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import pkg_resources
HAS_ZSERVER = True
try:
dist = pkg_resources.get_distribution('ZServer')
except pkg_resources.DistributionNotFound:
HAS_ZSERVER = False
...@@ -15,6 +15,7 @@ from zope.publisher.http import HTTPCharsets ...@@ -15,6 +15,7 @@ from zope.publisher.http import HTTPCharsets
from Testing.makerequest import makerequest from Testing.makerequest import makerequest
from Testing.ZopeTestCase import ZopeTestCase, installProduct from Testing.ZopeTestCase import ZopeTestCase, installProduct
from Products.PageTemplates.PageTemplateFile import guess_type
from Products.PageTemplates.ZopePageTemplate import ZopePageTemplate from Products.PageTemplates.ZopePageTemplate import ZopePageTemplate
from Products.PageTemplates.ZopePageTemplate import manage_addPageTemplate from Products.PageTemplates.ZopePageTemplate import manage_addPageTemplate
from Products.PageTemplates.utils import encodingFromXMLPreamble from Products.PageTemplates.utils import encodingFromXMLPreamble
...@@ -308,9 +309,8 @@ class ZopePageTemplateFileTests(ZopeTestCase): ...@@ -308,9 +309,8 @@ class ZopePageTemplateFileTests(ZopeTestCase):
def _put(self, text): def _put(self, text):
zpt = self._createZPT() zpt = self._createZPT()
REQUEST = self.app.REQUEST content_type = guess_type('', text)
REQUEST.set('BODY', text) zpt.pt_edit(text, content_type)
zpt.PUT(REQUEST, REQUEST.RESPONSE)
return zpt return zpt
def testPutHTMLIso8859_15WithCharsetInfo(self): def testPutHTMLIso8859_15WithCharsetInfo(self):
...@@ -417,14 +417,6 @@ class ZPTRegressions(unittest.TestCase): ...@@ -417,14 +417,6 @@ class ZPTRegressions(unittest.TestCase):
pt = self.app.pt1 pt = self.app.pt1
self.assertEqual(pt.document_src(), self.text) self.assertEqual(pt.document_src(), self.text)
def testFTPGet(self):
# check for bug #2269
request = self.app.REQUEST
text = '<span tal:content="string:foobar"></span>'
self._addPT('pt1', text=text, REQUEST=request)
result = self.app.pt1.manage_FTPget()
self.assertEqual(result, text)
class ZPTMacros(zope.component.testing.PlacelessSetup, unittest.TestCase): class ZPTMacros(zope.component.testing.PlacelessSetup, unittest.TestCase):
......
...@@ -120,18 +120,6 @@ class TestFunctional(ZopeTestCase.FunctionalTestCase): ...@@ -120,18 +120,6 @@ class TestFunctional(ZopeTestCase.FunctionalTestCase):
self.assertEqual(response.getStatus(), 200) self.assertEqual(response.getStatus(), 200)
self.assertEqual(self.folder.index_html.title_or_id(), 'Foo') self.assertEqual(self.folder.index_html.title_or_id(), 'Foo')
def testPUTExisting(self):
# FTP new data into an existing object
self.setPermissions([change_dtml_documents])
put_data = StringIO('foo')
response = self.publish(self.folder_path + '/index_html',
request_method='PUT', stdin=put_data,
basic=self.basic_auth)
self.assertEqual(response.getStatus(), 204)
self.assertEqual(self.folder.index_html(), 'foo')
def testHEAD(self): def testHEAD(self):
# HEAD should work without passing stdin # HEAD should work without passing stdin
response = self.publish(self.folder_path + '/index_html', response = self.publish(self.folder_path + '/index_html',
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment