Commit d1536a57 authored by Hanno Schlichting's avatar Hanno Schlichting

Move webdav's EtagSupport, Lockable and LockItem into OFS.

parent 32e8dea3
......@@ -22,11 +22,14 @@ Features Added
- AccessControl = 4.0a3
- AuthEncoding = 4.0.0
- Products.PythonScripts = 4.0
- zExceptions = 3.3
Restructuring
+++++++++++++
- Move webdav's EtagSupport, Lockable and LockItem into OFS.
- Split `Products.TemporaryFolder` and `Products.ZODBMountPoint` into
one new project called `Products.TemporaryFolder`.
......
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import time
from zope.interface import implements
from zope.interface import Interface
from zExceptions import HTTPPreconditionFailed
class EtagBaseInterface(Interface):
"""\
Basic Etag support interface, meaning the object supports generating
an Etag that can be used by certain HTTP and WebDAV Requests.
"""
def http__etag():
"""\
Entity tags are used for comparing two or more entities from
the same requested resource. Predominantly used for Caching,
Etags can also be used to deal with the 'Lost Updates Problem'.
An HTTP Client such as Amaya that supports PUT for editing can
use the Etag value returned in the head of a GET response in the
'if-match' header submitted with a PUT request. If the Etag
for the requested resource in the PUT request's 'if-match' header
is different from the current Etag value returned by this method,
the PUT will fail (it means that the state of the resource has
changed since the last copy the Client recieved) because the
precondition (the 'if-match') fails (the submitted Etag does not
match the current Etag).
"""
def http__refreshEtag():
"""\
While it may make sense to use the ZODB Object Id or the
database mtime to generate an Etag, this could
fail on certain REQUESTS because:
o The object is not stored in the ZODB, or
o A Request such as PUT changes the oid or database mtime
*AFTER* the Response has been written out, but the Etag needs
to be updated and returned with the Response of the PUT request.
Thus, Etags need to be refreshed manually when an object changes.
"""
class EtagSupport(object):
"""
This class is the basis for supporting Etags in Zope. It's main
function right now is to support the *Lost Updates Problem* by
allowing Etags and If-Match headers to be checked on PUT calls to
provide a *Seatbelt* style functionality. The Etags is based on
the databaes mtime, and thus is updated whenever the
object is updated. If a PUT request, or other HTTP or Dav request
comes in with an Etag different than the current one, that request
can be rejected according to the type of header (If-Match,
If-None-Match).
"""
implements(EtagBaseInterface)
def http__etag(self, readonly=0):
try:
etag = self.__etag
except AttributeError:
if readonly: # Don't refresh the etag on reads
return
self.http__refreshEtag()
etag = self.__etag
return etag
def http__refreshEtag(self):
self.__etag = 'ts%s' % str(time.time())[2:]
def http__parseMatchList(self, REQUEST, header="if-match"):
# Return a sequence of strings found in the header specified
# (should be one of {'if-match' or 'if-none-match'}). If the
# header is not in the request, returns None. Otherwise,
# returns a tuple of Etags.
matchlist = REQUEST.get_header(header)
if matchlist is None:
matchlist = REQUEST.get_header(header.title())
if matchlist is None:
return None
matchlist = [x.strip() for x in matchlist.split(',')]
r = []
for match in matchlist:
if match == '*':
r.insert(0, match)
elif (match[0] + match[-1] == '""') and (len(match) > 2):
r.append(match[1:-1])
return tuple(r)
def http__processMatchHeaders(self, REQUEST=None):
# Process if-match and if-none-match headers
if REQUEST is None:
REQUEST = self.aq_acquire('REQUEST')
matchlist = self.http__parseMatchList(REQUEST, 'if-match')
nonematch = self.http__parseMatchList(REQUEST, 'if-none-match')
if matchlist is None:
# There's no Matchlist, but 'if-none-match' might need processing
pass
elif ('*' in matchlist):
return 1 # * matches everything
elif self.http__etag() not in matchlist:
# The resource etag is not in the list of etags required
# to match, as specified in the 'if-match' header. The
# condition fails and the HTTP Method may *not* execute.
raise HTTPPreconditionFailed()
elif self.http__etag() in matchlist:
return 1
if nonematch is None:
# There's no 'if-none-match' header either, so there's no
# problem continuing with the request
return 1
elif ('*' in nonematch):
# if-none-match: * means that the operation should not
# be performed if the specified resource exists
raise HTTPPreconditionFailed()
elif self.http__etag() in nonematch:
# The opposite of if-match, the condition fails
# IF the resources Etag is in the if-none-match list
raise HTTPPreconditionFailed()
elif self.http__etag() not in nonematch:
return 1
......@@ -21,6 +21,7 @@ from zope.interface import implements
from OFS.FindSupport import FindSupport
from OFS.interfaces import IFolder
from OFS.Lockable import LockableItem
from OFS.ObjectManager import ObjectManager
from OFS.PropertyManager import PropertyManager
from OFS.role import RoleManager
......@@ -30,10 +31,19 @@ try:
from webdav.Collection import Collection
except ImportError:
class Collection(object):
pass
def dav__init(self, request, response):
pass
def dav__validate(self, object, methodname, REQUEST):
pass
def dav__simpleifhandler(self, request, response, method='PUT',
col=0, url=None, refresh=0):
pass
manage_addFolderForm = DTMLFile('dtml/folderAdd', globals())
manage_addFolderForm=DTMLFile('dtml/folderAdd', globals())
def manage_addFolder(self, id, title='',
createPublic=0,
......@@ -54,6 +64,7 @@ class Folder(
PropertyManager,
RoleManager,
Collection,
LockableItem,
Item,
FindSupport,
):
......
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import random
import time
from AccessControl.class_init import InitializeClass
from AccessControl.owner import ownerInfo
from AccessControl.SecurityInfo import ClassSecurityInfo
from Persistence import Persistent
from zope.interface import implements
from OFS.interfaces import ILockItem
_randGen = random.Random(time.time())
MAXTIMEOUT = (2**32) - 1 # Maximum timeout time
DEFAULTTIMEOUT = 12 * 60 # Default timeout
def generateLockToken():
# Generate a lock token
return '%s-%s-00105A989226:%.03f' % \
(_randGen.random(), _randGen.random(), time.time())
def validateTimeout(timeout):
# Timeout *should* be in the form "Seconds-XXX" or "Infinite"
errors = []
try:
t = str(timeout).split('-')[-1]
if t.lower() == 'infinite':
# Default to 1800 seconds for infinite requests
timeout = DEFAULTTIMEOUT
else:
timeout = int(t)
except ValueError:
errors.append("Bad timeout value")
if timeout > MAXTIMEOUT:
errors.append("Timeout request is greater than %s" % MAXTIMEOUT)
return timeout, errors
class LockItem(Persistent):
implements(ILockItem)
security = ClassSecurityInfo()
security.declarePublic('getOwner', 'getLockToken', 'getDepth',
'getTimeout', 'getTimeoutString',
'getModifiedTime', 'isValid', 'getLockScope',
'getLockType')
security.declareProtected('Change Lock Information',
'setTimeout', 'refresh')
security.declareProtected('Access contents information',
'getCreator', 'getCreatorPath')
def __init__(self, creator, owner='', depth=0, timeout='Infinite',
locktype='write', lockscope='exclusive', token=None):
errors = []
# First check the values and raise value errors if outside of contract
if not getattr(creator, 'getUserName', None):
errors.append("Creator not a user object")
if str(depth).lower() not in ('0', 'infinity'):
errors.append("Depth must be 0 or infinity")
if locktype.lower() != 'write':
errors.append("Lock type '%s' not supported" % locktype)
if lockscope.lower() != 'exclusive':
errors.append("Lock scope '%s' not supported" % lockscope)
timeout, e = validateTimeout(timeout)
errors = errors + e
# Finally, if there were errors, report them ALL to on high
if errors:
raise ValueError(errors)
# AccessControl.owner.ownerInfo returns the id of the creator
# and the path to the UserFolder they're defined in
self._creator = ownerInfo(creator)
self._owner = owner
self._depth = depth
self._timeout = timeout
self._locktype = locktype
self._lockscope = lockscope
self._modifiedtime = time.time()
if token is None:
self._token = generateLockToken()
else:
self._token = token
def getCreator(self):
return self._creator
def getCreatorPath(self):
db, name = self._creator
path = '/'.join(db)
return "/%s/%s" % (path, name)
def getOwner(self):
return self._owner
def getLockToken(self):
return self._token
def getDepth(self):
return self._depth
def getTimeout(self):
return self._timeout
def getTimeoutString(self):
t = str(self._timeout)
if t[-1] == 'L':
t = t[:-1] # lob off Long signifier
return "Second-%s" % t
def setTimeout(self, newtimeout):
timeout, errors = validateTimeout(newtimeout)
if errors:
raise ValueError(errors)
else:
self._timeout = timeout
self._modifiedtime = time.time() # reset modified
def getModifiedTime(self):
return self._modifiedtime
def refresh(self):
self._modifiedtime = time.time()
def isValid(self):
now = time.time()
modified = self._modifiedtime
timeout = self._timeout
return (modified + timeout) > now
def getLockType(self):
return self._locktype
def getLockScope(self):
return self._lockscope
def asLockDiscoveryProperty(self, ns='d', fake=0):
if fake:
token = 'this-is-a-faked-no-permission-token'
else:
token = self._token
s = (' <%(ns)s:activelock>\n'
' <%(ns)s:locktype><%(ns)s:%(locktype)s/></%(ns)s:locktype>\n'
' <%(ns)s:lockscope><%(ns)s:%(lockscope)s/></%(ns)s:lockscope>\n'
' <%(ns)s:depth>%(depth)s</%(ns)s:depth>\n'
' <%(ns)s:owner>%(owner)s</%(ns)s:owner>\n'
' <%(ns)s:timeout>%(timeout)s</%(ns)s:timeout>\n'
' <%(ns)s:locktoken>\n'
' <%(ns)s:href>opaquelocktoken:%(locktoken)s</%(ns)s:href>\n'
' </%(ns)s:locktoken>\n'
' </%(ns)s:activelock>\n'
) % {
'ns': ns,
'locktype': self._locktype,
'lockscope': self._lockscope,
'depth': self._depth,
'owner': self._owner,
'timeout': self.getTimeoutString(),
'locktoken': token}
return s
def asXML(self):
s = """<?xml version="1.0" encoding="utf-8" ?>
<d:prop xmlns:d="DAV:">
<d:lockdiscovery>
%s
</d:lockdiscovery>
</d:prop>""" % self.asLockDiscoveryProperty(ns="d")
return s
InitializeClass(LockItem)
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from AccessControl.class_init import InitializeClass
from AccessControl.SecurityInfo import ClassSecurityInfo
from Acquisition import aq_base
from Persistence import PersistentMapping
from zope.interface import implements
from OFS.EtagSupport import EtagSupport
from OFS.interfaces import ILockItem, IWriteLock
class LockableItem(EtagSupport):
"""Implements the WriteLock interface.
"""
implements(IWriteLock)
# Protect methods using declarative security
security = ClassSecurityInfo()
security.declarePrivate('wl_lockmapping')
security.declarePublic('wl_isLocked', 'wl_getLock', 'wl_isLockedByUser',
'wl_lockItems', 'wl_lockValues', 'wl_lockTokens',)
security.declareProtected('WebDAV Lock items', 'wl_setLock')
security.declareProtected('WebDAV Unlock items', 'wl_delLock')
security.declareProtected('Manage WebDAV Locks', 'wl_clearLocks')
# Setting default roles for permissions - we want owners of conent
# to be able to lock.
security.setPermissionDefault('WebDAV Lock items', ('Manager', 'Owner',))
security.setPermissionDefault('WebDAV Unlock items', ('Manager', 'Owner',))
def wl_lockmapping(self, killinvalids=0, create=0):
""" if 'killinvalids' is 1, locks who are no longer valid
will be deleted """
try:
locks = getattr(self, '_dav_writelocks', None)
except Exception:
locks = None
if locks is None:
if create:
locks = self._dav_writelocks = PersistentMapping()
else:
# Don't generate a side effect transaction.
locks = {}
return locks
elif killinvalids:
# Delete invalid locks
for token, lock in locks.items():
if not lock.isValid():
del locks[token]
if (not locks) and hasattr(aq_base(self),
'__no_valid_write_locks__'):
self.__no_valid_write_locks__()
return locks
else:
return locks
def wl_lockItems(self, killinvalids=0):
return self.wl_lockmapping(killinvalids).items()
def wl_lockValues(self, killinvalids=0):
return self.wl_lockmapping(killinvalids).values()
def wl_lockTokens(self, killinvalids=0):
return self.wl_lockmapping(killinvalids).keys()
def wl_hasLock(self, token, killinvalids=0):
if not token:
return 0
return token in self.wl_lockmapping(killinvalids).keys()
def wl_isLocked(self):
# returns true if 'self' is locked at all
# We set 'killinvalids' to 1 to delete all locks who are no longer
# valid (timeout has been exceeded)
locks = self.wl_lockmapping(killinvalids=1)
if locks.keys():
return 1
else:
return 0
def wl_setLock(self, locktoken, lock):
locks = self.wl_lockmapping(create=1)
if ILockItem.providedBy(lock):
if locktoken == lock.getLockToken():
locks[locktoken] = lock
else:
raise ValueError('Lock tokens do not match')
else:
raise ValueError('Lock does not implement the LockItem Interface')
def wl_getLock(self, locktoken):
locks = self.wl_lockmapping(killinvalids=1)
return locks.get(locktoken, None)
def wl_delLock(self, locktoken):
locks = self.wl_lockmapping()
if locktoken in locks:
del locks[locktoken]
def wl_clearLocks(self):
# Called by lock management machinery to quickly and effectively
# destroy all locks.
try:
locks = self.wl_lockmapping()
locks.clear()
except:
# The locks may be totally messed up, so we'll just delete
# and replace.
if hasattr(self, '_dav_writelocks'):
del self._dav_writelocks
if IWriteLock.providedBy(self):
self._dav_writelocks = PersistentMapping()
# Call into a special hook used by LockNullResources to delete
# themselves. Could be used by other objects who want to deal
# with the state of empty locks.
if hasattr(aq_base(self), '__no_valid_write_locks__'):
self.__no_valid_write_locks__()
InitializeClass(LockableItem)
def wl_isLocked(ob):
""" Returns true if the object is locked, returns 0 if the object
is not locked or does not implement the WriteLockInterface """
return wl_isLockable(ob) and ob.wl_isLocked()
def wl_isLockable(ob):
return IWriteLock.providedBy(ob)
......@@ -59,6 +59,7 @@ from OFS.interfaces import IObjectManager
from OFS.Traversable import Traversable
from OFS.event import ObjectWillBeAddedEvent
from OFS.event import ObjectWillBeRemovedEvent
from OFS.Lockable import LockableItem
from OFS.subscribers import compatibilityCall
from OFS.XMLExportImport import importXML
from OFS.XMLExportImport import exportXML
......@@ -68,7 +69,15 @@ try:
from webdav.Collection import Collection
except ImportError:
class Collection(object):
pass
def dav__init(self, request, response):
pass
def dav__validate(self, object, methodname, REQUEST):
pass
def dav__simpleifhandler(self, request, response, method='PUT',
col=0, url=None, refresh=0):
pass
# Constants: __replaceable__ flags:
NOT_REPLACEABLE = 0
......@@ -147,6 +156,7 @@ class ObjectManager(CopyContainer,
Implicit,
Persistent,
Collection,
LockableItem,
Traversable,
):
......
......@@ -56,6 +56,7 @@ from OFS.interfaces import IItemWithName
from OFS.interfaces import ISimpleItem
from OFS.owner import Owned
from OFS.CopySupport import CopySource
from OFS.Lockable import LockableItem
from OFS.role import RoleManager
from OFS.Traversable import Traversable
......@@ -63,13 +64,22 @@ try:
from webdav.Resource import Resource
except ImportError:
class Resource(object):
pass
def dav__init(self, request, response):
pass
def dav__validate(self, object, methodname, REQUEST):
pass
def dav__simpleifhandler(self, request, response, method='PUT',
col=0, url=None, refresh=0):
pass
logger = logging.getLogger()
class Item(Base,
Resource,
LockableItem,
CopySource,
Tabs,
Traversable,
......
......@@ -391,6 +391,133 @@ class IWriteLock(Interface):
by lock management machinery. """
class ILockItem(Interface):
"""A LockItem contains information about a lock.
This includes:
o The locktoken uri (used to identify the lock by WebDAV)
o The lock owner (The string passed in the 'owner' property by WebDAV)
o The lock creator (the Zope user who physically owns the lock)
o Depth
o Timeout information
o Modified time (for calculating timeouts)
o LockType (only EXCLUSIVE is supported right now)
"""
# XXX: WAAAA! What is a ctor doing in the interface?
def __init__(creator, owner, depth=0, timeout='Infinity',
locktype='write', lockscope='exclusive', token=None):
"""\
If any of the following are untrue, a **ValueError** exception
will be raised.
- **creator** MUST be a Zope user object or string to find a
valid user object.
- **owner** MUST be a nonempty string, or type that can be converted
to a nonempty string.
- **depth** MUST be in the set {0,'infinity'}
- **timeout** MUST either be an integer, or a string in the form
of 'Seconds-nnn' where nnn is an integer. The timeout value
MUST be less than (2^32)-1. *IF* timeout is the string value
'Infinite', the timeout value will be set to 1800 (30 minutes).
(Timeout is the value in seconds from creation\modification
time until the lock MAY time out).
- **locktype** not in set {'write'} *this may expand later*
- **lockscope** not in set {'exclusive'} *this may expand later*
If the value passed in to 'token' is 'None', the a new locktoken
will be generated during the construction process.
__init__ must generate the opaquelocktoken uri used to identify the
lock (if 'token' is 'None')and set all of the above attributes on
the object.
"""
def getCreator():
""" Returns the Zope user who created the lock. This is returned
in a tuple containing the Users ID and the path to the user folder
they came from."""
def getCreatorPath():
""" Returns a string of the path to the user object in the user
folder they were found in. """
def getOwner():
""" Returns the string value of the 'owner' property sent
in by WebDAV """
def getLockToken():
""" returns the opaque lock token """
def getDepth():
""" returns the depth of the lock """
def getTimeout():
""" returns an integer value of the timeout setting """
def getTimeoutString():
""" returns the timeout value in a form acceptable by
WebDAV (ie - 'Seconds-40800') """
def setTimeout(newtimeout):
""" refreshes the timeout information """
def getModifiedTime():
""" returns a time.time value of the last time the Lock was
modified. From RFC 2518:
The timeout counter SHOULD be restarted any time an owner of the
lock sends a method to any member of the lock, including unsupported
methods or methods which are unsucscessful. The lock MUST be
refreshed if a refresh LOCK method is successfully received.
The modified time is used to calculate the refreshed value """
def refresh():
""" Tickles the locks modified time by setting it to the current
time.time() value. (As stated in the RFC, the timeout counter
SHOULD be restarted for any HTTP method called by the lock owner
on the locked object). """
def isValid():
""" Returns true if (self.getModifiedTime() + self.getTimeout())
is greater than the current time.time() value. """
# now = time.time()
# modified = self.getModifiedTime()
# timeout = self.getTimeout()
#
# return (modified + timeout > now) # there's time remaining
def getLockType():
""" returns the lock type ('write') """
def getLockScope():
""" returns the lock scope ('exclusive') """
def asLockDiscoveryProperty(ns='d'):
""" Return the lock rendered as an XML representation of a
WebDAV 'lockdiscovery' property. 'ns' is the namespace identifier
used on the XML elements."""
def asXML():
""" Render a full XML representation of a lock for WebDAV,
used when returning the value of a newly created lock. """
# XXX: might contain non-API methods and outdated comments;
# not synced with ZopeBook API Reference;
# based on OFS.SimpleItem.Item
......
import unittest
import Testing
import Zope2
Zope2.startup()
import transaction
......@@ -17,14 +15,19 @@ from OFS.SimpleItem import SimpleItem
from Testing.makerequest import makerequest
from Zope2.App import zcml
Zope2.startup()
class EventLogger(object):
def __init__(self):
self.reset()
def reset(self):
self._called = []
def trace(self, ob, event):
self._called.append((ob.getId(), event.__class__.__name__))
def called(self):
return self._called
......@@ -37,6 +40,7 @@ class ITestItem(interface.Interface):
class TestItem(SimpleItem):
interface.implements(ITestItem)
def __init__(self, id):
self.id = id
......@@ -47,10 +51,12 @@ class ITestFolder(interface.Interface):
class TestFolder(Folder):
interface.implements(ITestFolder)
def __init__(self, id):
self.id = id
def _verifyObjectPaste(self, object, validate_src=1):
pass # Always allow
pass # Always allow
class EventLayer:
......@@ -109,7 +115,8 @@ class TestCopySupport(EventTest):
def test_1_Clone(self):
# Test clone
self.subfolder.manage_clone(self.folder.mydoc, 'mydoc')
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('mydoc', 'ObjectCopiedEvent'),
('mydoc', 'ObjectWillBeAddedEvent'),
('mydoc', 'ObjectAddedEvent'),
......@@ -121,7 +128,8 @@ class TestCopySupport(EventTest):
# Test copy/paste
cb = self.folder.manage_copyObjects(['mydoc'])
self.subfolder.manage_pasteObjects(cb)
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('mydoc', 'ObjectCopiedEvent'),
('mydoc', 'ObjectWillBeAddedEvent'),
('mydoc', 'ObjectAddedEvent'),
......@@ -133,7 +141,8 @@ class TestCopySupport(EventTest):
# Test cut/paste
cb = self.folder.manage_cutObjects(['mydoc'])
self.subfolder.manage_pasteObjects(cb)
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('mydoc', 'ObjectWillBeMovedEvent'),
('mydoc', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent'),
......@@ -143,52 +152,13 @@ class TestCopySupport(EventTest):
def test_4_Rename(self):
# Test rename
self.folder.manage_renameObject('mydoc', 'yourdoc')
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('mydoc', 'ObjectWillBeMovedEvent'),
('yourdoc', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent')]
)
def test_5_COPY(self):
# Test COPY
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = ('%s/subfolder/mydoc'
% self.folder.absolute_url())
self.folder.mydoc.COPY(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectCopiedEvent'),
('mydoc', 'ObjectWillBeAddedEvent'),
('mydoc', 'ObjectAddedEvent'),
('subfolder', 'ContainerModifiedEvent'),
('mydoc', 'ObjectClonedEvent')]
)
def test_6_MOVE(self):
# Test MOVE
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = ('%s/subfolder/mydoc'
% self.folder.absolute_url())
self.folder.mydoc.MOVE(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectWillBeMovedEvent'),
('mydoc', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent'),
('subfolder', 'ContainerModifiedEvent')]
)
def test_7_DELETE(self):
# Test DELETE
req = self.app.REQUEST
req['URL'] = '%s/mydoc' % self.folder.absolute_url()
self.folder.mydoc.DELETE(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectWillBeRemovedEvent'),
('mydoc', 'ObjectRemovedEvent'),
('folder', 'ContainerModifiedEvent')]
)
class TestCopySupportSublocation(EventTest):
'''Tests the order in which events are fired'''
......@@ -215,13 +185,14 @@ class TestCopySupportSublocation(EventTest):
# XXX: Compare sets as the order of event handlers cannot be
# relied on between objects.
if not set(first) == set(second):
raise self.failureException, \
(msg or '%r != %r' % (first, second))
raise self.failureException(
(msg or '%r != %r' % (first, second)))
def test_1_Clone(self):
# Test clone
self.subfolder.manage_clone(self.folder.myfolder, 'myfolder')
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('myfolder', 'ObjectCopiedEvent'),
('mydoc', 'ObjectCopiedEvent'),
('myfolder', 'ObjectWillBeAddedEvent'),
......@@ -237,7 +208,8 @@ class TestCopySupportSublocation(EventTest):
# Test copy/paste
cb = self.folder.manage_copyObjects(['myfolder'])
self.subfolder.manage_pasteObjects(cb)
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('myfolder', 'ObjectCopiedEvent'),
('mydoc', 'ObjectCopiedEvent'),
('myfolder', 'ObjectWillBeAddedEvent'),
......@@ -253,7 +225,8 @@ class TestCopySupportSublocation(EventTest):
# Test cut/paste
cb = self.folder.manage_cutObjects(['myfolder'])
self.subfolder.manage_pasteObjects(cb)
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('myfolder', 'ObjectWillBeMovedEvent'),
('mydoc', 'ObjectWillBeMovedEvent'),
('myfolder', 'ObjectMovedEvent'),
......@@ -265,66 +238,11 @@ class TestCopySupportSublocation(EventTest):
def test_4_Rename(self):
# Test rename
self.folder.manage_renameObject('myfolder', 'yourfolder')
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('myfolder', 'ObjectWillBeMovedEvent'),
('mydoc', 'ObjectWillBeMovedEvent'),
('yourfolder', 'ObjectMovedEvent'),
('mydoc', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent')]
)
def test_5_COPY(self):
# Test COPY
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = ('%s/subfolder/myfolder'
% self.folder.absolute_url())
self.folder.myfolder.COPY(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('myfolder', 'ObjectCopiedEvent'),
('mydoc', 'ObjectCopiedEvent'),
('myfolder', 'ObjectWillBeAddedEvent'),
('mydoc', 'ObjectWillBeAddedEvent'),
('myfolder', 'ObjectAddedEvent'),
('mydoc', 'ObjectAddedEvent'),
('subfolder', 'ContainerModifiedEvent'),
('myfolder', 'ObjectClonedEvent'),
('mydoc', 'ObjectClonedEvent')]
)
def test_6_MOVE(self):
# Test MOVE
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = ('%s/subfolder/myfolder'
% self.folder.absolute_url())
self.folder.myfolder.MOVE(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('myfolder', 'ObjectWillBeMovedEvent'),
('mydoc', 'ObjectWillBeMovedEvent'),
('myfolder', 'ObjectMovedEvent'),
('mydoc', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent'),
('subfolder', 'ContainerModifiedEvent')]
)
def test_7_DELETE(self):
# Test DELETE
req = self.app.REQUEST
req['URL'] = '%s/myfolder' % self.folder.absolute_url()
self.folder.myfolder.DELETE(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('myfolder', 'ObjectWillBeRemovedEvent'),
('mydoc', 'ObjectWillBeRemovedEvent'),
('myfolder', 'ObjectRemovedEvent'),
('mydoc', 'ObjectRemovedEvent'),
('folder', 'ContainerModifiedEvent')]
)
def test_suite():
from unittest import TestSuite, makeSuite
suite = TestSuite()
suite.addTest(makeSuite(TestCopySupport))
suite.addTest(makeSuite(TestCopySupportSublocation))
return suite
import unittest
import Testing
import Zope2
Zope2.startup()
import transaction
......@@ -12,19 +10,25 @@ from Testing.makerequest import makerequest
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from OFS.metaconfigure import setDeprecatedManageAddDelete
from OFS.SimpleItem import SimpleItem
from OFS.Folder import Folder
from Zope2.App import zcml
Zope2.startup()
class EventLogger(object):
def __init__(self):
self.reset()
def reset(self):
self._called = []
def trace(self, ob, event):
self._called.append((ob.getId(), event))
def called(self):
return self._called
......@@ -34,10 +38,13 @@ eventlog = EventLogger()
class TestItem(SimpleItem):
def __init__(self, id):
self.id = id
def manage_afterAdd(self, item, container):
eventlog.trace(self, 'manage_afterAdd')
def manage_afterClone(self, item):
eventlog.trace(self, 'manage_afterClone')
def manage_beforeDelete(self, item, container):
eventlog.trace(self, 'manage_beforeDelete')
......@@ -45,21 +52,23 @@ class TestItem(SimpleItem):
class TestFolder(Folder):
def __init__(self, id):
self.id = id
def _verifyObjectPaste(self, object, validate_src=1):
pass # Always allow
pass # Always allow
def manage_afterAdd(self, item, container):
eventlog.trace(self, 'manage_afterAdd')
Folder.manage_afterAdd(self, item, container)
def manage_afterClone(self, item):
eventlog.trace(self, 'manage_afterClone')
Folder.manage_afterClone(self, item)
def manage_beforeDelete(self, item, container):
eventlog.trace(self, 'manage_beforeDelete')
Folder.manage_beforeDelete(self, item, container)
from OFS.metaconfigure import setDeprecatedManageAddDelete
class HookLayer:
@classmethod
......@@ -116,7 +125,8 @@ class TestCopySupport(HookTest):
def test_1_Clone(self):
# Test clone
self.subfolder.manage_clone(self.folder.mydoc, 'mydoc')
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('mydoc', 'manage_afterAdd'),
('mydoc', 'manage_afterClone')]
)
......@@ -125,7 +135,8 @@ class TestCopySupport(HookTest):
# Test copy/paste
cb = self.folder.manage_copyObjects(['mydoc'])
self.subfolder.manage_pasteObjects(cb)
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('mydoc', 'manage_afterAdd'),
('mydoc', 'manage_afterClone')]
)
......@@ -134,7 +145,8 @@ class TestCopySupport(HookTest):
# Test cut/paste
cb = self.folder.manage_cutObjects(['mydoc'])
self.subfolder.manage_pasteObjects(cb)
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('mydoc', 'manage_beforeDelete'),
('mydoc', 'manage_afterAdd')]
)
......@@ -142,42 +154,12 @@ class TestCopySupport(HookTest):
def test_4_Rename(self):
# Test rename
self.folder.manage_renameObject('mydoc', 'yourdoc')
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('mydoc', 'manage_beforeDelete'),
('yourdoc', 'manage_afterAdd')]
)
def test_5_COPY(self):
# Test COPY
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = '%s/subfolder/mydoc' % self.folder.absolute_url()
self.folder.mydoc.COPY(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'manage_afterAdd'),
('mydoc', 'manage_afterClone')]
)
def test_6_MOVE(self):
# Test MOVE
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = '%s/subfolder/mydoc' % self.folder.absolute_url()
self.folder.mydoc.MOVE(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'manage_beforeDelete'),
('mydoc', 'manage_afterAdd')]
)
def test_7_DELETE(self):
# Test DELETE
req = self.app.REQUEST
req['URL'] = '%s/mydoc' % self.folder.absolute_url()
self.folder.mydoc.DELETE(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'manage_beforeDelete')]
)
class TestCopySupportSublocation(HookTest):
'''Tests the order in which add/clone/del hooks are called'''
......@@ -203,7 +185,8 @@ class TestCopySupportSublocation(HookTest):
def test_1_Clone(self):
# Test clone
self.subfolder.manage_clone(self.folder.myfolder, 'myfolder')
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('myfolder', 'manage_afterAdd'),
('mydoc', 'manage_afterAdd'),
('myfolder', 'manage_afterClone'),
......@@ -214,7 +197,8 @@ class TestCopySupportSublocation(HookTest):
# Test copy/paste
cb = self.folder.manage_copyObjects(['myfolder'])
self.subfolder.manage_pasteObjects(cb)
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('myfolder', 'manage_afterAdd'),
('mydoc', 'manage_afterAdd'),
('myfolder', 'manage_afterClone'),
......@@ -225,7 +209,8 @@ class TestCopySupportSublocation(HookTest):
# Test cut/paste
cb = self.folder.manage_cutObjects(['myfolder'])
self.subfolder.manage_pasteObjects(cb)
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('mydoc', 'manage_beforeDelete'),
('myfolder', 'manage_beforeDelete'),
('myfolder', 'manage_afterAdd'),
......@@ -235,54 +220,10 @@ class TestCopySupportSublocation(HookTest):
def test_4_Rename(self):
# Test rename
self.folder.manage_renameObject('myfolder', 'yourfolder')
self.assertEqual(eventlog.called(),
self.assertEqual(
eventlog.called(),
[('mydoc', 'manage_beforeDelete'),
('myfolder', 'manage_beforeDelete'),
('yourfolder', 'manage_afterAdd'),
('mydoc', 'manage_afterAdd')]
)
def test_5_COPY(self):
# Test COPY
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = '%s/subfolder/myfolder' % self.folder.absolute_url()
self.folder.myfolder.COPY(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('myfolder', 'manage_afterAdd'),
('mydoc', 'manage_afterAdd'),
('myfolder', 'manage_afterClone'),
('mydoc', 'manage_afterClone')]
)
def test_6_MOVE(self):
# Test MOVE
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = '%s/subfolder/myfolder' % self.folder.absolute_url()
self.folder.myfolder.MOVE(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'manage_beforeDelete'),
('myfolder', 'manage_beforeDelete'),
('myfolder', 'manage_afterAdd'),
('mydoc', 'manage_afterAdd')]
)
def test_7_DELETE(self):
# Test DELETE
req = self.app.REQUEST
req['URL'] = '%s/myfolder' % self.folder.absolute_url()
self.folder.myfolder.DELETE(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'manage_beforeDelete'),
('myfolder', 'manage_beforeDelete')]
)
def test_suite():
from unittest import TestSuite, makeSuite
suite = TestSuite()
suite.addTest(makeSuite(TestCopySupport))
suite.addTest(makeSuite(TestCopySupportSublocation))
return suite
......@@ -5,13 +5,7 @@ class TestEtagSupport(unittest.TestCase):
def test_interfaces(self):
from zope.interface.verify import verifyClass
from webdav.EtagSupport import EtagBaseInterface
from webdav.EtagSupport import EtagSupport
from OFS.EtagSupport import EtagBaseInterface
from OFS.EtagSupport import EtagSupport
verifyClass(EtagBaseInterface, EtagSupport)
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(TestEtagSupport),
))
......@@ -4,13 +4,8 @@ import unittest
class TestLockItem(unittest.TestCase):
def test_interfaces(self):
from webdav.interfaces import ILockItem
from webdav.LockItem import LockItem
from OFS.interfaces import ILockItem
from OFS.LockItem import LockItem
from zope.interface.verify import verifyClass
verifyClass(ILockItem, LockItem)
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(TestLockItem),
))
import unittest
from zope.interface import implements
from OFS.interfaces import IWriteLock
class LockableResource(object):
implements(IWriteLock)
def __init__(self, locked):
self.locked = locked
def wl_isLocked(self):
return self.locked
class UnlockableResource(object):
pass
class TestUtilFunctions(unittest.TestCase):
def test_wl_isLocked(self):
from webdav.Lockable import wl_isLocked
from OFS.Lockable import wl_isLocked
unlockable = UnlockableResource()
self.assertFalse(wl_isLocked(unlockable))
lockable_unlocked = LockableResource(locked=False)
......@@ -11,26 +31,8 @@ class TestUtilFunctions(unittest.TestCase):
self.assertTrue(wl_isLocked(lockable_locked))
def test_wl_isLockable(self):
from webdav.Lockable import wl_isLockable
from OFS.Lockable import wl_isLockable
unlockable = UnlockableResource()
self.assertFalse(wl_isLockable(unlockable))
lockable = LockableResource(locked=False)
self.assertTrue(wl_isLockable(lockable))
from OFS.interfaces import IWriteLock
from zope.interface import implements
class LockableResource:
implements(IWriteLock)
def __init__(self, locked):
self.locked = locked
def wl_isLocked(self):
return self.locked
class UnlockableResource:
pass
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(TestUtilFunctions),
))
......@@ -39,10 +39,11 @@ SET_COOKIE_DTML = '''\
CHANGE_TITLE_DTML = '''\
<dtml-call "manage_changeProperties(title=REQUEST.get('title'))">'''
class TestFunctional(ZopeTestCase.FunctionalTestCase):
def afterSetUp(self):
self.folder_path = '/'+self.folder.absolute_url(1)
self.folder_path = '/' + self.folder.absolute_url(1)
self.basic_auth = '%s:%s' % (user_name, user_password)
# A simple document
......@@ -68,28 +69,28 @@ class TestFunctional(ZopeTestCase.FunctionalTestCase):
self.assertEqual(response.getBody(), 'index')
def testPublishDocument(self):
response = self.publish(self.folder_path+'/index_html')
response = self.publish(self.folder_path + '/index_html')
self.assertEqual(response.getStatus(), 200)
self.assertEqual(response.getBody(), 'index')
def testUnauthorized(self):
response = self.publish(self.folder_path+'/secret_html')
response = self.publish(self.folder_path + '/secret_html')
self.assertEqual(response.getStatus(), 401)
def testBasicAuth(self):
response = self.publish(self.folder_path+'/secret_html',
response = self.publish(self.folder_path + '/secret_html',
self.basic_auth)
self.assertEqual(response.getStatus(), 200)
self.assertEqual(response.getBody(), 'secret')
def testRedirect(self):
response = self.publish(self.folder_path+'/redirect')
response = self.publish(self.folder_path + '/redirect')
self.assertEqual(response.getStatus(), 302)
self.assertEqual(response.getHeader('Location'),
self.app.absolute_url())
def testCookie(self):
response = self.publish(self.folder_path+'/set_cookie')
response = self.publish(self.folder_path + '/set_cookie')
self.assertEqual(response.getStatus(), 200)
self.assertEqual(response.getCookie('foo').get('value'), 'Bar')
self.assertEqual(response.getCookie('foo').get('path'), '/')
......@@ -112,7 +113,7 @@ class TestFunctional(ZopeTestCase.FunctionalTestCase):
form = {'title': 'Foo'}
post_data = StringIO(urlencode(form))
response = self.publish(self.folder_path+'/index_html/change_title',
response = self.publish(self.folder_path + '/index_html/change_title',
request_method='POST', stdin=post_data,
basic=self.basic_auth)
......@@ -124,50 +125,16 @@ class TestFunctional(ZopeTestCase.FunctionalTestCase):
self.setPermissions([change_dtml_documents])
put_data = StringIO('foo')
response = self.publish(self.folder_path+'/index_html',
response = self.publish(self.folder_path + '/index_html',
request_method='PUT', stdin=put_data,
basic=self.basic_auth)
self.assertEqual(response.getStatus(), 204)
self.assertEqual(self.folder.index_html(), 'foo')
def testPUTNew(self):
# Create a new object via PUT
self.setPermissions([add_documents_images_and_files])
put_data = StringIO('foo')
response = self.publish(self.folder_path+'/new_document',
env={'CONTENT_TYPE': 'text/html'},
request_method='PUT', stdin=put_data,
basic=self.basic_auth)
self.assertEqual(response.getStatus(), 201)
self.assertTrue('new_document' in self.folder.objectIds())
self.assertEqual(self.folder.new_document.meta_type, 'DTML Document')
self.assertEqual(self.folder.new_document(), 'foo')
def testPUTEmpty(self):
# PUT operation without passing stdin should result in empty content
self.setPermissions([change_dtml_documents])
response = self.publish(self.folder_path+'/index_html',
request_method='PUT',
basic=self.basic_auth)
self.assertEqual(response.getStatus(), 204)
self.assertEqual(self.folder.index_html(), '')
def testPROPFIND(self):
# PROPFIND should work without passing stdin
response = self.publish(self.folder_path+'/index_html',
request_method='PROPFIND',
basic=self.basic_auth)
self.assertEqual(response.getStatus(), 207)
def testHEAD(self):
# HEAD should work without passing stdin
response = self.publish(self.folder_path+'/index_html',
response = self.publish(self.folder_path + '/index_html',
request_method='HEAD')
self.assertEqual(response.getStatus(), 200)
......
......@@ -26,13 +26,12 @@ from Testing.ZopeTestCase import transaction
from AccessControl.Permissions import add_documents_images_and_files
from AccessControl.Permissions import delete_objects
from OFS.SimpleItem import SimpleItem
import tempfile
folder_name = ZopeTestCase.folder_name
cutpaste_permissions = [add_documents_images_and_files, delete_objects]
# Dummy object
from OFS.SimpleItem import SimpleItem
class DummyObject(SimpleItem):
id = 'dummy'
......@@ -41,7 +40,6 @@ class DummyObject(SimpleItem):
_p_foo = None
class ZODBCompatLayer(layer.ZopeLite):
@classmethod
......@@ -93,24 +91,6 @@ class TestCopyPaste(ZopeTestCase.ZopeTestCase):
self.assertFalse(hasattr(self.folder, 'doc'))
self.assertTrue(hasattr(self.folder, 'new_doc'))
def testCOPY(self):
# WebDAV COPY
request = self.app.REQUEST
request.environ['HTTP_DEPTH'] = 'infinity'
request.environ['HTTP_DESTINATION'] = 'http://foo.com/%s/new_doc' % folder_name
self.folder.doc.COPY(request, request.RESPONSE)
self.assertTrue(hasattr(self.folder, 'doc'))
self.assertTrue(hasattr(self.folder, 'new_doc'))
def testMOVE(self):
# WebDAV MOVE
request = self.app.REQUEST
request.environ['HTTP_DEPTH'] = 'infinity'
request.environ['HTTP_DESTINATION'] = 'http://foo.com/%s/new_doc' % folder_name
self.folder.doc.MOVE(request, request.RESPONSE)
self.assertFalse(hasattr(self.folder, 'doc'))
self.assertTrue(hasattr(self.folder, 'new_doc'))
class TestImportExport(ZopeTestCase.ZopeTestCase):
......@@ -137,7 +117,7 @@ class TestImportExport(ZopeTestCase.ZopeTestCase):
local_home = tempfile.gettempdir()
import_dir = os.path.join(local_home, 'import')
zexp_file = os.path.join(import_dir, 'doc.zexp')
zexp_file = os.path.join(import_dir, 'doc.zexp')
def setupLocalEnvironment(self):
# Create the 'import' directory
......@@ -152,10 +132,14 @@ class TestImportExport(ZopeTestCase.ZopeTestCase):
def afterClear(self):
# Remove external resources
try: os.remove(self.zexp_file)
except OSError: pass
try: os.rmdir(self.import_dir)
except OSError: pass
try:
os.remove(self.zexp_file)
except OSError:
pass
try:
os.rmdir(self.import_dir)
except OSError:
pass
try:
import App.config
except ImportError:
......@@ -182,12 +166,12 @@ class TestAttributesOfCleanObjects(ZopeTestCase.ZopeTestCase):
their values across tests.
The only use case yet encountered in the wild is portal_memberdata's
_v_temps attribute. Test authors are cautioned to watch out for
_v_temps attribute. Test authors are cautioned to watch out for
occurrences of _v_ and _p_ attributes of objects that are not recreated
for every test method execution, but preexist in the test ZODB.
It is therefore deemed essential to initialize any _v_ and _p_
attributes of such objects in afterSetup(), as otherwise test results
It is therefore deemed essential to initialize any _v_ and _p_
attributes of such objects in afterSetup(), as otherwise test results
will be distorted!
Note that _v_ attributes used to be transactional in Zope < 2.6.
......@@ -198,7 +182,7 @@ class TestAttributesOfCleanObjects(ZopeTestCase.ZopeTestCase):
layer = ZODBCompatLayer
def afterSetUp(self):
self.dummy = self.app.dummy1 # See above
self.dummy = self.app.dummy1 # See above
def testNormal_01(self):
# foo is always None
......@@ -252,7 +236,7 @@ class TestAttributesOfCleanObjects(ZopeTestCase.ZopeTestCase):
class TestAttributesOfDirtyObjects(ZopeTestCase.ZopeTestCase):
'''This testcase shows that _v_ and _p_ attributes of dirty objects
'''This testcase shows that _v_ and _p_ attributes of dirty objects
ARE removed on abort.
This testcase exploits the fact that test methods are sorted by name.
......@@ -261,8 +245,8 @@ class TestAttributesOfDirtyObjects(ZopeTestCase.ZopeTestCase):
layer = ZODBCompatLayer
def afterSetUp(self):
self.dummy = self.app.dummy2 # See above
self.dummy.touchme = 1 # Tag, you're dirty
self.dummy = self.app.dummy2 # See above
self.dummy.touchme = 1 # Tag, you're dirty
def testDirtyNormal_01(self):
# foo is always None
......@@ -333,4 +317,3 @@ def test_suite():
suite.addTest(makeSuite(TestAttributesOfDirtyObjects))
suite.addTest(makeSuite(TestTransactionAbort))
return suite
......@@ -17,15 +17,15 @@ from urllib import unquote
from AccessControl.class_init import InitializeClass
from AccessControl.SecurityManagement import getSecurityManager
from App.Common import rfc1123_date
from OFS.Lockable import wl_isLocked
from zExceptions import MethodNotAllowed
from zExceptions import NotFound
from zope.interface import implements
from webdav.common import Locked
from webdav.common import PreconditionFailed
from webdav.common import rfc1123_date
from webdav.common import urlfix
from webdav.Lockable import wl_isLocked
from webdav.interfaces import IDAVCollection
from webdav.Resource import Resource
......
......@@ -10,133 +10,11 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Etag support.
"""
import time
from zope.deferredimport import deprecated
from zope.interface import implements
from zope.interface import Interface
from webdav.common import PreconditionFailed
class EtagBaseInterface(Interface):
"""\
Basic Etag support interface, meaning the object supports generating
an Etag that can be used by certain HTTP and WebDAV Requests.
"""
def http__etag():
"""\
Entity tags are used for comparing two or more entities from
the same requested resource. Predominantly used for Caching,
Etags can also be used to deal with the 'Lost Updates Problem'.
An HTTP Client such as Amaya that supports PUT for editing can
use the Etag value returned in the head of a GET response in the
'if-match' header submitted with a PUT request. If the Etag
for the requested resource in the PUT request's 'if-match' header
is different from the current Etag value returned by this method,
the PUT will fail (it means that the state of the resource has
changed since the last copy the Client recieved) because the
precondition (the 'if-match') fails (the submitted Etag does not
match the current Etag).
"""
def http__refreshEtag():
"""\
While it may make sense to use the ZODB Object Id or the
database mtime to generate an Etag, this could
fail on certain REQUESTS because:
o The object is not stored in the ZODB, or
o A Request such as PUT changes the oid or database mtime
*AFTER* the Response has been written out, but the Etag needs
to be updated and returned with the Response of the PUT request.
Thus, Etags need to be refreshed manually when an object changes.
"""
class EtagSupport:
"""\
This class is the basis for supporting Etags in Zope. It's main
function right now is to support the *Lost Updates Problem* by
allowing Etags and If-Match headers to be checked on PUT calls to
provide a *Seatbelt* style functionality. The Etags is based on
the databaes mtime, and thus is updated whenever the
object is updated. If a PUT request, or other HTTP or Dav request
comes in with an Etag different than the current one, that request
can be rejected according to the type of header (If-Match,
If-None-Match).
"""
implements(EtagBaseInterface)
def http__etag(self, readonly=0):
try: etag = self.__etag
except AttributeError:
if readonly: # Don't refresh the etag on reads
return
self.http__refreshEtag()
etag = self.__etag
return etag
def http__refreshEtag(self):
self.__etag = 'ts%s' % str(time.time())[2:]
def http__parseMatchList(self, REQUEST, header="if-match"):
# Return a sequence of strings found in the header specified
# (should be one of {'if-match' or 'if-none-match'}). If the
# header is not in the request, returns None. Otherwise,
# returns a tuple of Etags.
matchlist = REQUEST.get_header(header)
if matchlist is None:
matchlist = REQUEST.get_header(header.title())
if matchlist is None:
return None
matchlist = [ x.strip() for x in matchlist.split(',')]
r = []
for match in matchlist:
if match == '*': r.insert(0, match)
elif (match[0] + match[-1] == '""') and (len(match) > 2):
r.append(match[1:-1])
return tuple(r)
def http__processMatchHeaders(self, REQUEST=None):
# Process if-match and if-none-match headers
if REQUEST is None: REQUEST = self.aq_acquire('REQUEST')
matchlist = self.http__parseMatchList(REQUEST, 'if-match')
nonematch = self.http__parseMatchList(REQUEST, 'if-none-match')
if matchlist is None:
# There's no Matchlist, but 'if-none-match' might need processing
pass
elif ('*' in matchlist):
return 1 # * matches everything
elif self.http__etag() not in matchlist:
# The resource etag is not in the list of etags required
# to match, as specified in the 'if-match' header. The
# condition fails and the HTTP Method may *not* execute.
raise PreconditionFailed()
elif self.http__etag() in matchlist:
return 1
if nonematch is None:
# There's no 'if-none-match' header either, so there's no
# problem continuing with the request
return 1
elif ('*' in nonematch):
# if-none-match: * means that the operation should not
# be performed if the specified resource exists
raise PreconditionFailed()
elif self.http__etag() in nonematch:
# The opposite of if-match, the condition fails
# IF the resources Etag is in the if-none-match list
raise PreconditionFailed()
elif self.http__etag() not in nonematch:
return 1
deprecated(
'Please import from OFS.EtagSupport.',
EtagBaseInterface='OFS.EtagSupport:EtagBaseInterface',
wl_isLocked='OFS.EtagSupport:EtagSupport',
)
......@@ -10,174 +10,13 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""WebDAV support - lock item.
"""
import time
from zope.deferredimport import deprecated
from AccessControl.class_init import InitializeClass
from AccessControl.owner import ownerInfo
from AccessControl.SecurityInfo import ClassSecurityInfo
from Persistence import Persistent
from zope.interface import implements
from webdav.common import generateLockToken
from webdav.interfaces import ILockItem
MAXTIMEOUT = (2L**32)-1 # Maximum timeout time
DEFAULTTIMEOUT = 12 * 60L # Default timeout
def validateTimeout(timeout):
# Timeout *should* be in the form "Seconds-XXX" or "Infinite"
errors = []
try:
t =str(timeout).split('-')[-1]
if t.lower() == 'infinite':
timeout = DEFAULTTIMEOUT # Default to 1800 secods for infinite
else: # requests
timeout = long(t)
except ValueError:
errors.append("Bad timeout value")
if timeout > MAXTIMEOUT:
errors.append("Timeout request is greater than %s" % MAXTIMEOUT)
return timeout, errors
class LockItem(Persistent):
implements(ILockItem)
# Use the Zope 2.3 declarative security to manage access
security = ClassSecurityInfo()
security.declarePublic('getOwner', 'getLockToken', 'getDepth',
'getTimeout', 'getTimeoutString',
'getModifiedTime', 'isValid', 'getLockScope',
'getLockType')
security.declareProtected('Change Lock Information',
'setTimeout', 'refresh')
security.declareProtected('Access contents information',
'getCreator', 'getCreatorPath')
def __init__(self, creator, owner='', depth=0, timeout='Infinite',
locktype='write', lockscope='exclusive', token=None):
errors = []
# First check the values and raise value errors if outside of contract
if not getattr(creator, 'getUserName', None):
errors.append("Creator not a user object")
if str(depth).lower() not in ('0', 'infinity'):
errors.append("Depth must be 0 or infinity")
if locktype.lower() != 'write':
errors.append("Lock type '%s' not supported" % locktype)
if lockscope.lower() != 'exclusive':
errors.append("Lock scope '%s' not supported" % lockscope)
timeout, e = validateTimeout(timeout)
errors = errors + e
# Finally, if there were errors, report them ALL to on high
if errors:
raise ValueError, errors
# AccessControl.owner.ownerInfo returns the id of the creator
# and the path to the UserFolder they're defined in
self._creator = ownerInfo(creator)
self._owner = owner
self._depth = depth
self._timeout = timeout
self._locktype = locktype
self._lockscope = lockscope
self._modifiedtime = time.time()
if token is None:
self._token = generateLockToken()
else:
self._token = token
def getCreator(self):
return self._creator
def getCreatorPath(self):
db, name = self._creator
path = '/'.join(db)
return "/%s/%s" % (path, name)
def getOwner(self):
return self._owner
def getLockToken(self):
return self._token
def getDepth(self):
return self._depth
def getTimeout(self):
return self._timeout
def getTimeoutString(self):
t = str(self._timeout)
if t[-1] == 'L': t = t[:-1] # lob off Long signifier
return "Second-%s" % t
def setTimeout(self, newtimeout):
timeout, errors = validateTimeout(newtimeout)
if errors:
raise ValueError, errors
else:
self._timeout = timeout
self._modifiedtime = time.time() # reset modified
def getModifiedTime(self):
return self._modifiedtime
def refresh(self):
self._modifiedtime = time.time()
def isValid(self):
now = time.time()
modified = self._modifiedtime
timeout = self._timeout
return (modified + timeout) > now
def getLockType(self):
return self._locktype
def getLockScope(self):
return self._lockscope
def asLockDiscoveryProperty(self, ns='d',fake=0):
if fake: token = 'this-is-a-faked-no-permission-token'
else: token = self._token
s = (' <%(ns)s:activelock>\n'
' <%(ns)s:locktype><%(ns)s:%(locktype)s/></%(ns)s:locktype>\n'
' <%(ns)s:lockscope><%(ns)s:%(lockscope)s/></%(ns)s:lockscope>\n'
' <%(ns)s:depth>%(depth)s</%(ns)s:depth>\n'
' <%(ns)s:owner>%(owner)s</%(ns)s:owner>\n'
' <%(ns)s:timeout>%(timeout)s</%(ns)s:timeout>\n'
' <%(ns)s:locktoken>\n'
' <%(ns)s:href>opaquelocktoken:%(locktoken)s</%(ns)s:href>\n'
' </%(ns)s:locktoken>\n'
' </%(ns)s:activelock>\n'
) % {
'ns': ns,
'locktype': self._locktype,
'lockscope': self._lockscope,
'depth': self._depth,
'owner': self._owner,
'timeout': self.getTimeoutString(),
'locktoken': token,
}
return s
def asXML(self):
s = """<?xml version="1.0" encoding="utf-8" ?>
<d:prop xmlns:d="DAV:">
<d:lockdiscovery>
%s
</d:lockdiscovery>
</d:prop>""" % self.asLockDiscoveryProperty(ns="d")
return s
InitializeClass(LockItem)
deprecated(
'Please import from OFS.LockItem.',
LockItem='OFS.LockItem:LockItem',
MAXTIMEOUT='OFS.LockItem:MAXTIMEOUT',
DEFAULTTIMEOUT='OFS.LockItem:DEFAULTTIMEOUT',
validateTimeout='OFS.LockItem:validateTimeout',
)
......@@ -7,150 +7,20 @@
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""WebDAV support - lockable item.
"""
from AccessControl.class_init import InitializeClass
from AccessControl.SecurityInfo import ClassSecurityInfo
from Acquisition import aq_base
from Persistence import PersistentMapping
from zope.interface import implements
from zope.deferredimport import deprecated
from webdav.EtagSupport import EtagSupport
from webdav.interfaces import ILockItem
from webdav.interfaces import IWriteLock
deprecated(
'Please import from OFS.Lockable.',
LockableItem='OFS.Lockable:LockableItem',
wl_isLocked='OFS.Lockable:wl_isLocked',
wl_isLockable='OFS.Lockable:wl_isLockable',
)
# BBB
from zExceptions import ResourceLockedError # NOQA
class LockableItem(EtagSupport):
"""Implements the WriteLock interface.
This class is inherited by Resource which is then inherited by the
majority of Zope objects.
"""
implements(IWriteLock)
# Protect methods using declarative security
security = ClassSecurityInfo()
security.declarePrivate('wl_lockmapping')
security.declarePublic('wl_isLocked', 'wl_getLock', 'wl_isLockedByUser',
'wl_lockItems', 'wl_lockValues', 'wl_lockTokens',)
security.declareProtected('WebDAV Lock items', 'wl_setLock')
security.declareProtected('WebDAV Unlock items', 'wl_delLock')
security.declareProtected('Manage WebDAV Locks', 'wl_clearLocks')
# Setting default roles for permissions - we want owners of conent
# to be able to lock.
security.setPermissionDefault('WebDAV Lock items', ('Manager', 'Owner',))
security.setPermissionDefault('WebDAV Unlock items', ('Manager', 'Owner',))
def wl_lockmapping(self, killinvalids=0, create=0):
""" if 'killinvalids' is 1, locks who are no longer valid
will be deleted """
try:
locks = getattr(self, '_dav_writelocks', None)
except Exception:
locks = None
if locks is None:
if create:
locks = self._dav_writelocks = PersistentMapping()
else:
# Don't generate a side effect transaction.
locks = {}
return locks
elif killinvalids:
# Delete invalid locks
for token, lock in locks.items():
if not lock.isValid():
del locks[token]
if (not locks) and hasattr(aq_base(self),
'__no_valid_write_locks__'):
self.__no_valid_write_locks__()
return locks
else:
return locks
def wl_lockItems(self, killinvalids=0):
return self.wl_lockmapping(killinvalids).items()
def wl_lockValues(self, killinvalids=0):
return self.wl_lockmapping(killinvalids).values()
def wl_lockTokens(self, killinvalids=0):
return self.wl_lockmapping(killinvalids).keys()
def wl_hasLock(self, token, killinvalids=0):
if not token:
return 0
return token in self.wl_lockmapping(killinvalids).keys()
def wl_isLocked(self):
# returns true if 'self' is locked at all
# We set 'killinvalids' to 1 to delete all locks who are no longer
# valid (timeout has been exceeded)
locks = self.wl_lockmapping(killinvalids=1)
if locks.keys():
return 1
else:
return 0
def wl_setLock(self, locktoken, lock):
locks = self.wl_lockmapping(create=1)
if ILockItem.providedBy(lock):
if locktoken == lock.getLockToken():
locks[locktoken] = lock
else:
raise ValueError('Lock tokens do not match')
else:
raise ValueError('Lock does not implement the LockItem Interface')
def wl_getLock(self, locktoken):
locks = self.wl_lockmapping(killinvalids=1)
return locks.get(locktoken, None)
def wl_delLock(self, locktoken):
locks = self.wl_lockmapping()
if locktoken in locks:
del locks[locktoken]
def wl_clearLocks(self):
# Called by lock management machinery to quickly and effectively
# destroy all locks.
try:
locks = self.wl_lockmapping()
locks.clear()
except:
# The locks may be totally messed up, so we'll just delete
# and replace.
if hasattr(self, '_dav_writelocks'):
del self._dav_writelocks
if IWriteLock.providedBy(self):
self._dav_writelocks = PersistentMapping()
# Call into a special hook used by LockNullResources to delete
# themselves. Could be used by other objects who want to deal
# with the state of empty locks.
if hasattr(aq_base(self), '__no_valid_write_locks__'):
self.__no_valid_write_locks__()
InitializeClass(LockableItem)
def wl_isLocked(ob):
""" Returns true if the object is locked, returns 0 if the object
is not locked or does not implement the WriteLockInterface """
return wl_isLockable(ob) and ob.wl_isLocked()
def wl_isLockable(ob):
return IWriteLock.providedBy(ob)
deprecated(
'Please import from zExceptions.',
ResourceLockedError='zExceptions:ResourceLockedError',
)
......@@ -31,10 +31,14 @@ from AccessControl.Permissions import webdav_access
from Acquisition import aq_base
from Acquisition import aq_inner
from Acquisition import aq_parent
from App.Common import rfc1123_date
from ExtensionClass import Base
from OFS.event import ObjectClonedEvent
from OFS.event import ObjectWillBeMovedEvent
from OFS.interfaces import IWriteLock
from OFS.Lockable import LockableItem
from OFS.Lockable import wl_isLockable
from OFS.Lockable import wl_isLocked
from OFS.subscribers import compatibilityCall
from zExceptions import BadRequest
from zExceptions import Forbidden
......@@ -50,16 +54,12 @@ from zope.lifecycleevent import ObjectCopiedEvent
from zope.lifecycleevent import ObjectMovedEvent
from zope.container.contained import notifyContainerModified
from webdav.Lockable import LockableItem
from webdav.Lockable import wl_isLockable
from webdav.Lockable import wl_isLocked
from webdav.common import absattr
from webdav.common import Conflict
from webdav.common import IfParser
from webdav.common import isDavCollection
from webdav.common import Locked
from webdav.common import PreconditionFailed
from webdav.common import rfc1123_date
from webdav.common import tokenFinder
from webdav.common import urlbase
from webdav.common import urlfix
......
......@@ -5,7 +5,7 @@ import httplib, mimetools
from types import FileType
from mimetypes import guess_type
from base64 import encodestring
from common import rfc1123_date
from App.Common import rfc1123_date
from cStringIO import StringIO
from random import random
from urllib import quote
......
......@@ -12,50 +12,73 @@
##############################################################################
"""Commonly used functions for WebDAV support modules."""
import random
import re
import time
import urllib
from Acquisition import aq_base
from App.Common import iso8601_date
from App.Common import rfc1123_date
from App.Common import rfc850_date
from zExceptions import (
HTTPConflict,
HTTPLocked,
HTTPPreconditionFailed,
HTTPUnsupportedMediaType,
)
from zope.deferredimport import deprecated
deprecated(
'Please import from App.Common.',
iso8601_date='App.Common:iso8601_date',
rfc1123_date='App.Common:rfc1123_date',
rfc850_date='App.Common:rfc850_date',
)
deprecated(
'Please import from OFS.LockItem.',
_randGen='OFS.Locked:_randGen',
generateLockToken='OFS.LockItem:generateLockToken',
)
_randGen = random.Random(time.time())
class WebDAVException(Exception):
pass
class Locked(WebDAVException):
class Locked(WebDAVException, HTTPLocked):
pass
class PreconditionFailed(WebDAVException):
class PreconditionFailed(WebDAVException, HTTPPreconditionFailed):
pass
class Conflict(WebDAVException):
class Conflict(WebDAVException, HTTPConflict):
pass
class UnsupportedMediaType(WebDAVException):
class UnsupportedMediaType(WebDAVException, HTTPUnsupportedMediaType):
pass
def absattr(attr):
if callable(attr):
return attr()
return attr
def urljoin(url, s):
url = url.rstrip('/')
s = s.lstrip('/')
return '/'.join((url, s))
def urlfix(url, s):
n=len(s)
if url[-n:]==s: url=url[:-n]
if len(url) > 1 and url[-1]=='/':
url=url[:-1]
n = len(s)
if url[-n:] == s:
url = url[:-n]
if len(url) > 1 and url[-1] == '/':
url = url[:-1]
return url
def is_acquired(ob):
# Return true if this object is not a direct
# subobject of its aq_parent object.
......@@ -68,56 +91,61 @@ def is_acquired(ob):
return 0
return 1
def urlbase(url, ftype=urllib.splittype, fhost=urllib.splithost):
# Return a '/' based url such as '/foo/bar', removing
# type, host and port information if necessary.
if url[0]=='/': return url
type, uri=ftype(url)
host, uri=fhost(uri)
if url[0] == '/':
return url
type, uri = ftype(url)
host, uri = fhost(uri)
return uri or '/'
def generateLockToken():
# Generate a lock token
return '%s-%s-00105A989226:%.03f' % \
(_randGen.random(),_randGen.random(),time.time())
def isDavCollection(object):
"""Return true if object is a DAV collection."""
return getattr(object, '__dav_collection__', 0)
def tokenFinder(token):
# takes a string like '<opaquelocktoken:afsdfadfadf> and returns the token
# part.
if not token: return None # An empty string was passed in
if token[0] == '[': return None # An Etag was passed in
if token[0] == '<': token = token[1:-1]
return token[token.find(':')+1:]
### If: header handling support. IfParser returns a sequence of
### TagList objects in the order they were parsed which can then
### be used in WebDAV methods to decide whether an operation can
### proceed or to raise HTTP Error 412 (Precondition failed)
if not token:
return None # An empty string was passed in
if token[0] == '[':
return None # An Etag was passed in
if token[0] == '<':
token = token[1:-1]
return token[token.find(':') + 1:]
# If: header handling support. IfParser returns a sequence of
# TagList objects in the order they were parsed which can then
# be used in WebDAV methods to decide whether an operation can
# proceed or to raise HTTP Error 412 (Precondition failed)
IfHdr = re.compile(
r"(?P<resource><.+?>)?\s*\((?P<listitem>[^)]+)\)"
)
)
ListItem = re.compile(
r"(?P<not>not)?\s*(?P<listitem><[a-zA-Z]+:[^>]*>|\[.*?\])",
re.I)
class TagList:
def __init__(self):
self.resource = None
self.list = []
self.NOTTED = 0
def IfParser(hdr):
out = []
i = 0
while 1:
m = IfHdr.search(hdr[i:])
if not m: break
if not m:
break
i = i + m.end()
tag = TagList()
......@@ -130,16 +158,19 @@ def IfParser(hdr):
return out
def ListParser(listitem):
out = []
NOTTED = 0
i = 0
while 1:
m = ListItem.search(listitem[i:])
if not m: break
if not m:
break
i = i + m.end()
out.append(m.group('listitem'))
if m.group('not'): NOTTED = 1
if m.group('not'):
NOTTED = 1
return NOTTED, out
......@@ -23,6 +23,7 @@ from AccessControl.SecurityManagement import getSecurityManager
from Acquisition import aq_base
from Acquisition import aq_parent
from OFS.interfaces import IWriteLock
from OFS.LockItem import LockItem
from zExceptions import BadRequest
from zExceptions import Forbidden
......@@ -33,7 +34,6 @@ from webdav.common import PreconditionFailed
from webdav.common import urlbase
from webdav.common import urlfix
from webdav.common import urljoin
from webdav.LockItem import LockItem
from webdav.PropertySheets import DAVProperties
from webdav.xmltools import XmlParser
......
......@@ -13,137 +13,15 @@
"""webdav interfaces.
"""
from zope.interface import Interface
from zope.deferredimport import deprecated
from zope.schema import Bool, Tuple
from OFS.interfaces import IWriteLock
class ILockItem(Interface):
"""A LockItem contains information about a lock.
This includes:
o The locktoken uri (used to identify the lock by WebDAV)
o The lock owner (The string passed in the 'owner' property by WebDAV)
o The lock creator (the Zope user who physically owns the lock)
o Depth
o Timeout information
o Modified time (for calculating timeouts)
o LockType (only EXCLUSIVE is supported right now)
"""
# XXX: WAAAA! What is a ctor doing in the interface?
def __init__(creator, owner, depth=0, timeout='Infinity',
locktype='write', lockscope='exclusive', token=None):
"""\
If any of the following are untrue, a **ValueError** exception
will be raised.
- **creator** MUST be a Zope user object or string to find a
valid user object.
- **owner** MUST be a nonempty string, or type that can be converted
to a nonempty string.
- **depth** MUST be in the set {0,'infinity'}
- **timeout** MUST either be an integer, or a string in the form
of 'Seconds-nnn' where nnn is an integer. The timeout value
MUST be less than (2^32)-1. *IF* timeout is the string value
'Infinite', the timeout value will be set to 1800 (30 minutes).
(Timeout is the value in seconds from creation\modification
time until the lock MAY time out).
- **locktype** not in set {'write'} *this may expand later*
- **lockscope** not in set {'exclusive'} *this may expand later*
If the value passed in to 'token' is 'None', the a new locktoken
will be generated during the construction process.
__init__ must generate the opaquelocktoken uri used to identify the
lock (if 'token' is 'None')and set all of the above attributes on
the object.
"""
def getCreator():
""" Returns the Zope user who created the lock. This is returned
in a tuple containing the Users ID and the path to the user folder
they came from."""
def getCreatorPath():
""" Returns a string of the path to the user object in the user
folder they were found in. """
def getOwner():
""" Returns the string value of the 'owner' property sent
in by WebDAV """
def getLockToken():
""" returns the opaque lock token """
def getDepth():
""" returns the depth of the lock """
def getTimeout():
""" returns an integer value of the timeout setting """
def getTimeoutString():
""" returns the timeout value in a form acceptable by
WebDAV (ie - 'Seconds-40800') """
def setTimeout(newtimeout):
""" refreshes the timeout information """
def getModifiedTime():
""" returns a time.time value of the last time the Lock was
modified. From RFC 2518:
The timeout counter SHOULD be restarted any time an owner of the
lock sends a method to any member of the lock, including unsupported
methods or methods which are unsucscessful. The lock MUST be
refreshed if a refresh LOCK method is successfully received.
The modified time is used to calculate the refreshed value """
def refresh():
""" Tickles the locks modified time by setting it to the current
time.time() value. (As stated in the RFC, the timeout counter
SHOULD be restarted for any HTTP method called by the lock owner
on the locked object). """
def isValid():
""" Returns true if (self.getModifiedTime() + self.getTimeout())
is greater than the current time.time() value. """
# now = time.time()
# modified = self.getModifiedTime()
# timeout = self.getTimeout()
#
# return (modified + timeout > now) # there's time remaining
def getLockType():
""" returns the lock type ('write') """
def getLockScope():
""" returns the lock scope ('exclusive') """
def asLockDiscoveryProperty(ns='d'):
""" Return the lock rendered as an XML representation of a
WebDAV 'lockdiscovery' property. 'ns' is the namespace identifier
used on the XML elements."""
def asXML():
""" Render a full XML representation of a lock for WebDAV,
used when returning the value of a newly created lock. """
deprecated(
'Please import from OFS.interfaces.',
iso8601_date='OFS.interfaces:ILockItem',
)
# XXX: might contain non-API methods and outdated comments;
......
......@@ -67,10 +67,9 @@ Command line used: litmus -k http://localhost:8080/ admin admin
19. complex_cond_put...... SKIPPED
20. fail_complex_cond_put. SKIPPED
Zope's webdav package has an webdav.EtagSupport.EtagSupport
class which is inherited by the webdav.Lockable.LockableItem
class, which is in turn inherited by the
webdav.Resource.Resource class, which is in turn inherited by
Zope's OFS package has an OFS.EtagSupport.EtagSupport
class which is inherited by the OFS.Lockable.LockableItem
class, which is in turn inherited by
OFS.SimpleItem.SimpleItem (upon which almost all Zope content is
based), so potentially all Zope content can reasonably easily
generate meaningful ETags in responses. Finding out why it's
......
import unittest
import Zope2
import transaction
from zope import component
from zope import interface
from zope.interface.interfaces import IObjectEvent
from zope.testing import cleanup
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from OFS.Folder import Folder
from OFS.SimpleItem import SimpleItem
from Testing.makerequest import makerequest
from Zope2.App import zcml
Zope2.startup()
class EventLogger(object):
def __init__(self):
self.reset()
def reset(self):
self._called = []
def trace(self, ob, event):
self._called.append((ob.getId(), event.__class__.__name__))
def called(self):
return self._called
eventlog = EventLogger()
class ITestItem(interface.Interface):
pass
class TestItem(SimpleItem):
interface.implements(ITestItem)
def __init__(self, id):
self.id = id
class ITestFolder(interface.Interface):
pass
class TestFolder(Folder):
interface.implements(ITestFolder)
def __init__(self, id):
self.id = id
def _verifyObjectPaste(self, object, validate_src=1):
pass # Always allow
class EventLayer:
@classmethod
def setUp(cls):
cleanup.cleanUp()
zcml.load_site(force=True)
component.provideHandler(eventlog.trace, (ITestItem, IObjectEvent))
component.provideHandler(eventlog.trace, (ITestFolder, IObjectEvent))
@classmethod
def tearDown(cls):
cleanup.cleanUp()
class EventTest(unittest.TestCase):
layer = EventLayer
def setUp(self):
self.app = makerequest(Zope2.app())
try:
uf = self.app.acl_users
uf._doAddUser('manager', 'secret', ['Manager'], [])
user = uf.getUserById('manager').__of__(uf)
newSecurityManager(None, user)
except:
self.tearDown()
raise
def tearDown(self):
noSecurityManager()
transaction.abort()
self.app._p_jar.close()
class TestCopySupport(EventTest):
'''Tests the order in which events are fired'''
def setUp(self):
EventTest.setUp(self)
# A folder that does not verify pastes
self.app._setObject('folder', TestFolder('folder'))
self.folder = getattr(self.app, 'folder')
# The subfolder we are going to copy/move to
self.folder._setObject('subfolder', TestFolder('subfolder'))
self.subfolder = getattr(self.folder, 'subfolder')
# The document we are going to copy/move
self.folder._setObject('mydoc', TestItem('mydoc'))
# Need _p_jars
transaction.savepoint(1)
# Reset event log
eventlog.reset()
def test_5_COPY(self):
# Test COPY
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = (
'%s/subfolder/mydoc' % self.folder.absolute_url())
self.folder.mydoc.COPY(req, req.RESPONSE)
self.assertEqual(
eventlog.called(),
[('mydoc', 'ObjectCopiedEvent'),
('mydoc', 'ObjectWillBeAddedEvent'),
('mydoc', 'ObjectAddedEvent'),
('subfolder', 'ContainerModifiedEvent'),
('mydoc', 'ObjectClonedEvent')]
)
def test_6_MOVE(self):
# Test MOVE
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = (
'%s/subfolder/mydoc' % self.folder.absolute_url())
self.folder.mydoc.MOVE(req, req.RESPONSE)
self.assertEqual(
eventlog.called(),
[('mydoc', 'ObjectWillBeMovedEvent'),
('mydoc', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent'),
('subfolder', 'ContainerModifiedEvent')]
)
def test_7_DELETE(self):
# Test DELETE
req = self.app.REQUEST
req['URL'] = '%s/mydoc' % self.folder.absolute_url()
self.folder.mydoc.DELETE(req, req.RESPONSE)
self.assertEqual(
eventlog.called(),
[('mydoc', 'ObjectWillBeRemovedEvent'),
('mydoc', 'ObjectRemovedEvent'),
('folder', 'ContainerModifiedEvent')]
)
class TestCopySupportSublocation(EventTest):
'''Tests the order in which events are fired'''
def setUp(self):
EventTest.setUp(self)
# A folder that does not verify pastes
self.app._setObject('folder', TestFolder('folder'))
self.folder = getattr(self.app, 'folder')
# The subfolder we are going to copy/move to
self.folder._setObject('subfolder', TestFolder('subfolder'))
self.subfolder = getattr(self.folder, 'subfolder')
# The folder we are going to copy/move
self.folder._setObject('myfolder', TestFolder('myfolder'))
self.myfolder = getattr(self.folder, 'myfolder')
# The "sublocation" inside our folder we are going to watch
self.myfolder._setObject('mydoc', TestItem('mydoc'))
# Need _p_jars
transaction.savepoint(1)
# Reset event log
eventlog.reset()
def assertEqual(self, first, second, msg=None):
# XXX: Compare sets as the order of event handlers cannot be
# relied on between objects.
if not set(first) == set(second):
raise self.failureException(
(msg or '%r != %r' % (first, second)))
def test_5_COPY(self):
# Test COPY
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = (
'%s/subfolder/myfolder' % self.folder.absolute_url())
self.folder.myfolder.COPY(req, req.RESPONSE)
self.assertEqual(
eventlog.called(),
[('myfolder', 'ObjectCopiedEvent'),
('mydoc', 'ObjectCopiedEvent'),
('myfolder', 'ObjectWillBeAddedEvent'),
('mydoc', 'ObjectWillBeAddedEvent'),
('myfolder', 'ObjectAddedEvent'),
('mydoc', 'ObjectAddedEvent'),
('subfolder', 'ContainerModifiedEvent'),
('myfolder', 'ObjectClonedEvent'),
('mydoc', 'ObjectClonedEvent')]
)
def test_6_MOVE(self):
# Test MOVE
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = (
'%s/subfolder/myfolder' % self.folder.absolute_url())
self.folder.myfolder.MOVE(req, req.RESPONSE)
self.assertEqual(
eventlog.called(),
[('myfolder', 'ObjectWillBeMovedEvent'),
('mydoc', 'ObjectWillBeMovedEvent'),
('myfolder', 'ObjectMovedEvent'),
('mydoc', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent'),
('subfolder', 'ContainerModifiedEvent')]
)
def test_7_DELETE(self):
# Test DELETE
req = self.app.REQUEST
req['URL'] = '%s/myfolder' % self.folder.absolute_url()
self.folder.myfolder.DELETE(req, req.RESPONSE)
self.assertEqual(
eventlog.called(),
[('myfolder', 'ObjectWillBeRemovedEvent'),
('mydoc', 'ObjectWillBeRemovedEvent'),
('myfolder', 'ObjectRemovedEvent'),
('mydoc', 'ObjectRemovedEvent'),
('folder', 'ContainerModifiedEvent')]
)
import unittest
import Zope2
import transaction
from zope.testing import cleanup
from Testing.makerequest import makerequest
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from OFS.metaconfigure import setDeprecatedManageAddDelete
from OFS.SimpleItem import SimpleItem
from OFS.Folder import Folder
from Zope2.App import zcml
Zope2.startup()
class EventLogger(object):
def __init__(self):
self.reset()
def reset(self):
self._called = []
def trace(self, ob, event):
self._called.append((ob.getId(), event))
def called(self):
return self._called
eventlog = EventLogger()
class TestItem(SimpleItem):
def __init__(self, id):
self.id = id
def manage_afterAdd(self, item, container):
eventlog.trace(self, 'manage_afterAdd')
def manage_afterClone(self, item):
eventlog.trace(self, 'manage_afterClone')
def manage_beforeDelete(self, item, container):
eventlog.trace(self, 'manage_beforeDelete')
class TestFolder(Folder):
def __init__(self, id):
self.id = id
def _verifyObjectPaste(self, object, validate_src=1):
pass # Always allow
def manage_afterAdd(self, item, container):
eventlog.trace(self, 'manage_afterAdd')
Folder.manage_afterAdd(self, item, container)
def manage_afterClone(self, item):
eventlog.trace(self, 'manage_afterClone')
Folder.manage_afterClone(self, item)
def manage_beforeDelete(self, item, container):
eventlog.trace(self, 'manage_beforeDelete')
Folder.manage_beforeDelete(self, item, container)
class HookLayer:
@classmethod
def setUp(cls):
cleanup.cleanUp()
zcml.load_site(force=True)
setDeprecatedManageAddDelete(TestItem)
setDeprecatedManageAddDelete(TestFolder)
@classmethod
def tearDown(cls):
cleanup.cleanUp()
class HookTest(unittest.TestCase):
layer = HookLayer
def setUp(self):
self.app = makerequest(Zope2.app())
try:
uf = self.app.acl_users
uf._doAddUser('manager', 'secret', ['Manager'], [])
user = uf.getUserById('manager').__of__(uf)
newSecurityManager(None, user)
except:
self.tearDown()
raise
def tearDown(self):
noSecurityManager()
transaction.abort()
self.app._p_jar.close()
class TestCopySupport(HookTest):
'''Tests the order in which add/clone/del hooks are called'''
def setUp(self):
HookTest.setUp(self)
# A folder that does not verify pastes
self.app._setObject('folder', TestFolder('folder'))
self.folder = getattr(self.app, 'folder')
# The subfolder we are going to copy/move to
self.folder._setObject('subfolder', TestFolder('subfolder'))
self.subfolder = getattr(self.folder, 'subfolder')
# The document we are going to copy/move
self.folder._setObject('mydoc', TestItem('mydoc'))
# Need _p_jars
transaction.savepoint(1)
# Reset event log
eventlog.reset()
def test_5_COPY(self):
# Test COPY
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = (
'%s/subfolder/mydoc' % self.folder.absolute_url())
self.folder.mydoc.COPY(req, req.RESPONSE)
self.assertEqual(
eventlog.called(),
[('mydoc', 'manage_afterAdd'),
('mydoc', 'manage_afterClone')]
)
def test_6_MOVE(self):
# Test MOVE
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = (
'%s/subfolder/mydoc' % self.folder.absolute_url())
self.folder.mydoc.MOVE(req, req.RESPONSE)
self.assertEqual(
eventlog.called(),
[('mydoc', 'manage_beforeDelete'),
('mydoc', 'manage_afterAdd')]
)
def test_7_DELETE(self):
# Test DELETE
req = self.app.REQUEST
req['URL'] = '%s/mydoc' % self.folder.absolute_url()
self.folder.mydoc.DELETE(req, req.RESPONSE)
self.assertEqual(
eventlog.called(),
[('mydoc', 'manage_beforeDelete')]
)
class TestCopySupportSublocation(HookTest):
'''Tests the order in which add/clone/del hooks are called'''
def setUp(self):
HookTest.setUp(self)
# A folder that does not verify pastes
self.app._setObject('folder', TestFolder('folder'))
self.folder = getattr(self.app, 'folder')
# The subfolder we are going to copy/move to
self.folder._setObject('subfolder', TestFolder('subfolder'))
self.subfolder = getattr(self.folder, 'subfolder')
# The folder we are going to copy/move
self.folder._setObject('myfolder', TestFolder('myfolder'))
self.myfolder = getattr(self.folder, 'myfolder')
# The "sublocation" inside our folder we are going to watch
self.myfolder._setObject('mydoc', TestItem('mydoc'))
# Need _p_jars
transaction.savepoint(1)
# Reset event log
eventlog.reset()
def test_5_COPY(self):
# Test COPY
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = (
'%s/subfolder/myfolder' % self.folder.absolute_url())
self.folder.myfolder.COPY(req, req.RESPONSE)
self.assertEqual(
eventlog.called(),
[('myfolder', 'manage_afterAdd'),
('mydoc', 'manage_afterAdd'),
('myfolder', 'manage_afterClone'),
('mydoc', 'manage_afterClone')]
)
def test_6_MOVE(self):
# Test MOVE
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = (
'%s/subfolder/myfolder' % self.folder.absolute_url())
self.folder.myfolder.MOVE(req, req.RESPONSE)
self.assertEqual(
eventlog.called(),
[('mydoc', 'manage_beforeDelete'),
('myfolder', 'manage_beforeDelete'),
('myfolder', 'manage_afterAdd'),
('mydoc', 'manage_afterAdd')]
)
def test_7_DELETE(self):
# Test DELETE
req = self.app.REQUEST
req['URL'] = '%s/myfolder' % self.folder.absolute_url()
self.folder.myfolder.DELETE(req, req.RESPONSE)
self.assertEqual(
eventlog.called(),
[('mydoc', 'manage_beforeDelete'),
('myfolder', 'manage_beforeDelete')]
)
......@@ -20,7 +20,7 @@ persistent = 4.2.1
Products.BTreeFolder2 = 3.0
Products.ExternalMethod = 3.0
Products.MailHost = 3.0
Products.PythonScripts = 3.0
Products.PythonScripts = 4.0
Products.Sessions = 4.0
Products.SiteErrorLog = 4.0
Products.StandardCacheManagers = 3.0
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment