Commit 14f00911 authored by Stefan H. Holek's avatar Stefan H. Holek

Collector #2298: webdav.Resource.COPY and webdav.Resource.MOVE did

not send the expected copy/move events.

Note: Tests live in OFS.tests.
parent 0a6e6f19
...@@ -88,6 +88,9 @@ Zope Changes ...@@ -88,6 +88,9 @@ Zope Changes
Bugs Fixed Bugs Fixed
- Collector #2298: webdav.Resource.COPY and webdav.Resource.MOVE did
not send the expected copy/move events.
- Collector #2296: Fixed import of ZClass products, broken by removal - Collector #2296: Fixed import of ZClass products, broken by removal
of BBB support for pasting objects whose meta_type info was of BBB support for pasting objects whose meta_type info was
permission-free. permission-free.
......
<configure
xmlns="http://namespaces.zope.org/zope"
i18n_domain="extfile">
<!-- Item -->
<subscriber
handler=".testCopySupportEvents.objectAddedEvent"
for=".testCopySupportEvents.ITestItem
zope.app.container.interfaces.IObjectAddedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectCopiedEvent"
for=".testCopySupportEvents.ITestItem
zope.lifecycleevent.interfaces.IObjectCopiedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectMovedEvent"
for=".testCopySupportEvents.ITestItem
zope.app.container.interfaces.IObjectMovedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectRemovedEvent"
for=".testCopySupportEvents.ITestItem
zope.app.container.interfaces.IObjectRemovedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectWillBeAddedEvent"
for=".testCopySupportEvents.ITestItem
OFS.interfaces.IObjectWillBeAddedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectWillBeMovedEvent"
for=".testCopySupportEvents.ITestItem
OFS.interfaces.IObjectWillBeMovedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectWillBeRemovedEvent"
for=".testCopySupportEvents.ITestItem
OFS.interfaces.IObjectWillBeRemovedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectClonedEvent"
for=".testCopySupportEvents.ITestItem
OFS.interfaces.IObjectClonedEvent"
/>
<!-- Folder -->
<subscriber
handler=".testCopySupportEvents.objectAddedEvent"
for=".testCopySupportEvents.ITestFolder
zope.app.container.interfaces.IObjectAddedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectCopiedEvent"
for=".testCopySupportEvents.ITestFolder
zope.lifecycleevent.interfaces.IObjectCopiedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectMovedEvent"
for=".testCopySupportEvents.ITestFolder
zope.app.container.interfaces.IObjectMovedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectRemovedEvent"
for=".testCopySupportEvents.ITestFolder
zope.app.container.interfaces.IObjectRemovedEvent"
/>
<subscriber
handler=".testCopySupportEvents.containerModifiedEvent"
for=".testCopySupportEvents.ITestFolder
zope.app.container.interfaces.IContainerModifiedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectWillBeAddedEvent"
for=".testCopySupportEvents.ITestFolder
OFS.interfaces.IObjectWillBeAddedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectWillBeMovedEvent"
for=".testCopySupportEvents.ITestFolder
OFS.interfaces.IObjectWillBeMovedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectWillBeRemovedEvent"
for=".testCopySupportEvents.ITestFolder
OFS.interfaces.IObjectWillBeRemovedEvent"
/>
<subscriber
handler=".testCopySupportEvents.objectClonedEvent"
for=".testCopySupportEvents.ITestFolder
OFS.interfaces.IObjectClonedEvent"
/>
</configure>
import unittest
import Testing
import Zope2
Zope2.startup()
import os
import transaction
from Testing.makerequest import makerequest
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from OFS.SimpleItem import SimpleItem
from OFS.Folder import Folder
from zope import interface
from zope.app.container.interfaces import IObjectAddedEvent
from zope.app.container.interfaces import IObjectRemovedEvent
from OFS.interfaces import IObjectWillBeAddedEvent
from OFS.interfaces import IObjectWillBeRemovedEvent
from zope.testing import cleanup
from Products.Five import zcml
from Globals import package_home
class EventLogger(object):
def __init__(self):
self.reset()
def reset(self):
self._called = []
def trace(self, ob, event):
self._called.append((ob.getId(), event))
def called(self):
return self._called
eventlog = EventLogger()
class ITestItem(interface.Interface):
pass
class TestItem(SimpleItem):
interface.implements(ITestItem)
def __init__(self, id):
self.id = id
class ITestFolder(interface.Interface):
pass
class TestFolder(Folder):
interface.implements(ITestFolder)
def __init__(self, id):
self.id = id
def _verifyObjectPaste(self, object, validate_src=1):
pass # Always allow
# See events.zcml
def objectAddedEvent(ob, event):
eventlog.trace(ob, 'ObjectAddedEvent')
def objectCopiedEvent(ob, event):
eventlog.trace(ob, 'ObjectCopiedEvent')
def objectMovedEvent(ob, event):
if IObjectAddedEvent.providedBy(event):
return
if IObjectRemovedEvent.providedBy(event):
return
eventlog.trace(ob, 'ObjectMovedEvent')
def objectRemovedEvent(ob, event):
eventlog.trace(ob, 'ObjectRemovedEvent')
def containerModifiedEvent(ob, event):
eventlog.trace(ob, 'ContainerModifiedEvent')
def objectWillBeAddedEvent(ob, event):
eventlog.trace(ob, 'ObjectWillBeAddedEvent')
def objectWillBeMovedEvent(ob, event):
if IObjectWillBeAddedEvent.providedBy(event):
return
if IObjectWillBeRemovedEvent.providedBy(event):
return
eventlog.trace(ob, 'ObjectWillBeMovedEvent')
def objectWillBeRemovedEvent(ob, event):
eventlog.trace(ob, 'ObjectWillBeRemovedEvent')
def objectClonedEvent(ob, event):
eventlog.trace(ob, 'ObjectClonedEvent')
class EventLayer:
@classmethod
def setUp(cls):
cleanup.cleanUp()
zcml._initialized = 0
zcml.load_site()
import OFS.tests
file = os.path.join(package_home(globals()), 'events.zcml')
zcml.load_config(file, package=OFS.tests)
@classmethod
def tearDown(cls):
cleanup.cleanUp()
zcml._initialized = 0
class EventTest(unittest.TestCase):
layer = EventLayer
def setUp(self):
self.app = makerequest(Zope2.app())
try:
uf = self.app.acl_users
uf._doAddUser('manager', 'secret', ['Manager'], [])
user = uf.getUserById('manager').__of__(uf)
newSecurityManager(None, user)
except:
self.tearDown()
raise
def tearDown(self):
noSecurityManager()
transaction.abort()
self.app._p_jar.close()
class TestCopySupport(EventTest):
'''Tests the order in which events are fired'''
def setUp(self):
EventTest.setUp(self)
# A folder that does not verify pastes
self.app._setObject('folder', TestFolder('folder'))
self.folder = getattr(self.app, 'folder')
# The subfolder we are going to copy/move to
self.folder._setObject('subfolder', TestFolder('subfolder'))
self.subfolder = getattr(self.folder, 'subfolder')
# The document we are going to copy/move
self.folder._setObject('mydoc', TestItem('mydoc'))
# Need _p_jars
transaction.savepoint(1)
# Reset event log
eventlog.reset()
def test_1_Clone(self):
# Test clone
self.subfolder.manage_clone(self.folder.mydoc, 'mydoc')
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectCopiedEvent'),
('mydoc', 'ObjectWillBeAddedEvent'),
('mydoc', 'ObjectAddedEvent'),
('subfolder', 'ContainerModifiedEvent'),
('mydoc', 'ObjectClonedEvent')]
)
def test_2_CopyPaste(self):
# Test copy/paste
cb = self.folder.manage_copyObjects(['mydoc'])
self.subfolder.manage_pasteObjects(cb)
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectCopiedEvent'),
('mydoc', 'ObjectWillBeAddedEvent'),
('mydoc', 'ObjectAddedEvent'),
('subfolder', 'ContainerModifiedEvent'),
('mydoc', 'ObjectClonedEvent')]
)
def test_3_CutPaste(self):
# Test cut/paste
cb = self.folder.manage_cutObjects(['mydoc'])
self.subfolder.manage_pasteObjects(cb)
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectWillBeMovedEvent'),
('mydoc', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent'),
('subfolder', 'ContainerModifiedEvent')]
)
def test_4_Rename(self):
# Test rename
self.folder.manage_renameObject('mydoc', 'yourdoc')
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectWillBeMovedEvent'),
('yourdoc', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent')]
)
def test_5_COPY(self):
# Test webdav COPY
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = '%s/subfolder/mydoc' % self.folder.absolute_url()
self.folder.mydoc.COPY(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectCopiedEvent'),
('mydoc', 'ObjectWillBeAddedEvent'),
('mydoc', 'ObjectAddedEvent'),
('subfolder', 'ContainerModifiedEvent'),
('mydoc', 'ObjectClonedEvent')]
)
def test_6_MOVE(self):
# Test webdav MOVE
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = '%s/subfolder/mydoc' % self.folder.absolute_url()
self.folder.mydoc.MOVE(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectWillBeMovedEvent'),
('mydoc', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent'),
('subfolder', 'ContainerModifiedEvent')]
)
def test_7_DELETE(self):
# Test webdav DELETE
req = self.app.REQUEST
req['URL'] = '%s/mydoc' % self.folder.absolute_url()
self.folder.mydoc.DELETE(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectWillBeRemovedEvent'),
('mydoc', 'ObjectRemovedEvent'),
('folder', 'ContainerModifiedEvent')]
)
class TestCopySupportSublocation(EventTest):
'''Tests the order in which events are fired'''
def setUp(self):
EventTest.setUp(self)
# A folder that does not verify pastes
self.app._setObject('folder', TestFolder('folder'))
self.folder = getattr(self.app, 'folder')
# The subfolder we are going to copy/move to
self.folder._setObject('subfolder', TestFolder('subfolder'))
self.subfolder = getattr(self.folder, 'subfolder')
# The folder we are going to copy/move
self.folder._setObject('myfolder', TestFolder('myfolder'))
self.myfolder = getattr(self.folder, 'myfolder')
# The "sublocation" inside our folder we are going to watch
self.myfolder._setObject('mydoc', TestItem('mydoc'))
# Need _p_jars
transaction.savepoint(1)
# Reset event log
eventlog.reset()
def test_1_Clone(self):
# Test clone
self.subfolder.manage_clone(self.folder.myfolder, 'myfolder')
self.assertEqual(eventlog.called(),
[#('mydoc', 'ObjectCopiedEvent'),
('myfolder', 'ObjectCopiedEvent'),
('mydoc', 'ObjectWillBeAddedEvent'),
('myfolder', 'ObjectWillBeAddedEvent'),
('mydoc', 'ObjectAddedEvent'),
('myfolder', 'ObjectAddedEvent'),
('subfolder', 'ContainerModifiedEvent'),
('mydoc', 'ObjectClonedEvent'),
('myfolder', 'ObjectClonedEvent')]
)
def test_2_CopyPaste(self):
# Test copy/paste
cb = self.folder.manage_copyObjects(['myfolder'])
self.subfolder.manage_pasteObjects(cb)
self.assertEqual(eventlog.called(),
[#('mydoc', 'ObjectCopiedEvent'),
('myfolder', 'ObjectCopiedEvent'),
('mydoc', 'ObjectWillBeAddedEvent'),
('myfolder', 'ObjectWillBeAddedEvent'),
('mydoc', 'ObjectAddedEvent'),
('myfolder', 'ObjectAddedEvent'),
('subfolder', 'ContainerModifiedEvent'),
('mydoc', 'ObjectClonedEvent'),
('myfolder', 'ObjectClonedEvent')]
)
def test_3_CutPaste(self):
# Test cut/paste
cb = self.folder.manage_cutObjects(['myfolder'])
self.subfolder.manage_pasteObjects(cb)
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectWillBeMovedEvent'),
('myfolder', 'ObjectWillBeMovedEvent'),
('mydoc', 'ObjectMovedEvent'),
('myfolder', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent'),
('subfolder', 'ContainerModifiedEvent')]
)
def test_4_Rename(self):
# Test rename
self.folder.manage_renameObject('myfolder', 'yourfolder')
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectWillBeMovedEvent'),
('myfolder', 'ObjectWillBeMovedEvent'),
('mydoc', 'ObjectMovedEvent'),
('yourfolder', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent')]
)
def test_5_COPY(self):
# Test webdav COPY
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = '%s/subfolder/myfolder' % self.folder.absolute_url()
self.folder.myfolder.COPY(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[#('mydoc', 'ObjectCopiedEvent'),
('myfolder', 'ObjectCopiedEvent'),
('mydoc', 'ObjectWillBeAddedEvent'),
('myfolder', 'ObjectWillBeAddedEvent'),
('mydoc', 'ObjectAddedEvent'),
('myfolder', 'ObjectAddedEvent'),
('subfolder', 'ContainerModifiedEvent'),
('mydoc', 'ObjectClonedEvent'),
('myfolder', 'ObjectClonedEvent')]
)
def test_6_MOVE(self):
# Test webdav MOVE
req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = '%s/subfolder/myfolder' % self.folder.absolute_url()
self.folder.myfolder.MOVE(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectWillBeMovedEvent'),
('myfolder', 'ObjectWillBeMovedEvent'),
('mydoc', 'ObjectMovedEvent'),
('myfolder', 'ObjectMovedEvent'),
('folder', 'ContainerModifiedEvent'),
('subfolder', 'ContainerModifiedEvent')]
)
def test_7_DELETE(self):
# Test webdav DELETE
req = self.app.REQUEST
req['URL'] = '%s/myfolder' % self.folder.absolute_url()
self.folder.myfolder.DELETE(req, req.RESPONSE)
self.assertEqual(eventlog.called(),
[('mydoc', 'ObjectWillBeRemovedEvent'),
('myfolder', 'ObjectWillBeRemovedEvent'),
('mydoc', 'ObjectRemovedEvent'),
('myfolder', 'ObjectRemovedEvent'),
('folder', 'ContainerModifiedEvent')]
)
def test_suite():
from unittest import TestSuite, makeSuite
suite = TestSuite()
suite.addTest(makeSuite(TestCopySupport))
suite.addTest(makeSuite(TestCopySupportSublocation))
return suite
...@@ -14,70 +14,49 @@ from OFS.SimpleItem import SimpleItem ...@@ -14,70 +14,49 @@ from OFS.SimpleItem import SimpleItem
from OFS.Folder import Folder from OFS.Folder import Folder
class HookCounter: class EventLogger(object):
'''Logs calls to old-school hooks'''
def __init__(self): def __init__(self):
self.reset() self.reset()
def reset(self): def reset(self):
self.count = 0 self._called = []
self.afterAdd = [0] def trace(self, ob, event):
self.afterClone = [0] self._called.append((ob.getId(), event))
self.beforeDelete = [0] def called(self):
return self._called
def manage_afterAdd(self, item, container):
self.count = self.count + 1
self.afterAdd.append(self.count)
def manage_afterClone(self, item):
self.count = self.count + 1
self.afterClone.append(self.count)
def manage_beforeDelete(self, item, container):
self.count = self.count + 1
self.beforeDelete.append(self.count)
def order(self):
return self.afterAdd[-1], self.afterClone[-1], self.beforeDelete[-1]
eventlog = EventLogger()
class TestItem(HookCounter, SimpleItem):
class TestItem(SimpleItem):
def __init__(self, id): def __init__(self, id):
HookCounter.__init__(self)
self.id = id self.id = id
def manage_afterAdd(self, item, container):
eventlog.trace(self, 'manage_afterAdd')
def manage_afterClone(self, item):
eventlog.trace(self, 'manage_afterClone')
def manage_beforeDelete(self, item, container):
eventlog.trace(self, 'manage_beforeDelete')
class TestFolder(HookCounter, Folder): class TestFolder(Folder):
def __init__(self, id): def __init__(self, id):
HookCounter.__init__(self)
self.id = id self.id = id
def _verifyObjectPaste(self, object, validate_src=1): def _verifyObjectPaste(self, object, validate_src=1):
# Don't verify pastes as our test objects don't have pass # Always allow
# factory methods registered.
pass
def manage_afterAdd(self, item, container): def manage_afterAdd(self, item, container):
HookCounter.manage_afterAdd(self, item, container) eventlog.trace(self, 'manage_afterAdd')
Folder.manage_afterAdd(self, item, container) Folder.manage_afterAdd(self, item, container)
def manage_afterClone(self, item): def manage_afterClone(self, item):
HookCounter.manage_afterClone(self, item) eventlog.trace(self, 'manage_afterClone')
Folder.manage_afterClone(self, item) Folder.manage_afterClone(self, item)
def manage_beforeDelete(self, item, container): def manage_beforeDelete(self, item, container):
HookCounter.manage_beforeDelete(self, item, container) eventlog.trace(self, 'manage_beforeDelete')
Folder.manage_beforeDelete(self, item, container) Folder.manage_beforeDelete(self, item, container)
try: from Products.Five.eventconfigure import setDeprecatedManageAddDelete
from Products.Five.eventconfigure import setDeprecatedManageAddDelete setDeprecatedManageAddDelete(TestItem)
setDeprecatedManageAddDelete(HookCounter) setDeprecatedManageAddDelete(TestFolder)
except ImportError:
pass
class HookTest(unittest.TestCase): class HookTest(unittest.TestCase):
...@@ -100,44 +79,56 @@ class HookTest(unittest.TestCase): ...@@ -100,44 +79,56 @@ class HookTest(unittest.TestCase):
class TestCopySupport(HookTest): class TestCopySupport(HookTest):
'''Tests the order in which the add/clone/del hooks are called''' '''Tests the order in which add/clone/del hooks are called'''
def setUp(self): def setUp(self):
HookTest.setUp(self) HookTest.setUp(self)
# A folder # A folder that does not verify pastes
self.app._setObject('folder', TestFolder('folder')) self.app._setObject('folder', TestFolder('folder'))
self.folder = self.app['folder'] self.folder = getattr(self.app, 'folder')
# A subfolder we are going to copy/move to # The subfolder we are going to copy/move to
self.folder._setObject('subfolder', TestFolder('subfolder')) self.folder._setObject('subfolder', TestFolder('subfolder'))
self.subfolder = self.folder['subfolder'] self.subfolder = getattr(self.folder, 'subfolder')
# A document we are going to copy/move # The document we are going to copy/move
self.folder._setObject('mydoc', TestItem('mydoc')) self.folder._setObject('mydoc', TestItem('mydoc'))
# Must have _p_jars # Need _p_jars
transaction.savepoint(1) transaction.savepoint(1)
# Reset counters # Reset event log
self.folder.mydoc.reset() eventlog.reset()
def test_1_Clone(self): def test_1_Clone(self):
# Test clone # Test clone
self.subfolder.manage_clone(self.folder.mydoc, 'yourdoc') self.subfolder.manage_clone(self.folder.mydoc, 'mydoc')
self.assertEqual(self.subfolder.yourdoc.order(), (1, 2, 0)) # add, clone self.assertEqual(eventlog.called(),
[('mydoc', 'manage_afterAdd'),
('mydoc', 'manage_afterClone')]
)
def test_2_CopyPaste(self): def test_2_CopyPaste(self):
# Test copy/paste # Test copy/paste
cb = self.folder.manage_copyObjects(['mydoc']) cb = self.folder.manage_copyObjects(['mydoc'])
self.subfolder.manage_pasteObjects(cb) self.subfolder.manage_pasteObjects(cb)
self.assertEqual(self.subfolder.mydoc.order(), (1, 2, 0)) # add, clone self.assertEqual(eventlog.called(),
[('mydoc', 'manage_afterAdd'),
('mydoc', 'manage_afterClone')]
)
def test_3_CutPaste(self): def test_3_CutPaste(self):
# Test cut/paste # Test cut/paste
cb = self.folder.manage_cutObjects(['mydoc']) cb = self.folder.manage_cutObjects(['mydoc'])
self.subfolder.manage_pasteObjects(cb) self.subfolder.manage_pasteObjects(cb)
self.assertEqual(self.subfolder.mydoc.order(), (2, 0, 1)) # del, add self.assertEqual(eventlog.called(),
[('mydoc', 'manage_beforeDelete'),
('mydoc', 'manage_afterAdd')]
)
def test_4_Rename(self): def test_4_Rename(self):
# Test rename # Test rename
self.folder.manage_renameObject('mydoc', 'yourdoc') self.folder.manage_renameObject('mydoc', 'yourdoc')
self.assertEqual(self.folder.yourdoc.order(), (2, 0, 1)) # del, add self.assertEqual(eventlog.called(),
[('mydoc', 'manage_beforeDelete'),
('yourdoc', 'manage_afterAdd')]
)
def test_5_COPY(self): def test_5_COPY(self):
# Test webdav COPY # Test webdav COPY
...@@ -145,123 +136,136 @@ class TestCopySupport(HookTest): ...@@ -145,123 +136,136 @@ class TestCopySupport(HookTest):
req.environ['HTTP_DEPTH'] = 'infinity' req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = '%s/subfolder/mydoc' % self.folder.absolute_url() req.environ['HTTP_DESTINATION'] = '%s/subfolder/mydoc' % self.folder.absolute_url()
self.folder.mydoc.COPY(req, req.RESPONSE) self.folder.mydoc.COPY(req, req.RESPONSE)
self.assertEqual(req.RESPONSE.getStatus(), 201) self.assertEqual(eventlog.called(),
self.assertEqual(self.subfolder.mydoc.order(), (1, 2, 0)) # add, clone [('mydoc', 'manage_afterAdd'),
('mydoc', 'manage_afterClone')]
)
def test_6_MOVE(self): def test_6_MOVE(self):
# Test webdav MOVE # Test webdav MOVE
req = self.app.REQUEST req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity' req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = '%s/subfolder/mydoc' % self.folder.absolute_url() req.environ['HTTP_DESTINATION'] = '%s/subfolder/mydoc' % self.folder.absolute_url()
old = self.folder.mydoc
self.folder.mydoc.MOVE(req, req.RESPONSE) self.folder.mydoc.MOVE(req, req.RESPONSE)
self.assertEqual(req.RESPONSE.getStatus(), 201) self.assertEqual(eventlog.called(),
self.assertEqual(old.order(), (0, 0, 1)) # del [('mydoc', 'manage_beforeDelete'),
self.assertEqual(self.subfolder.mydoc.order(), (1, 0, 0)) # add ('mydoc', 'manage_afterAdd')]
)
def test_7_DELETE(self): def test_7_DELETE(self):
# Test webdav DELETE # Test webdav DELETE
req = self.app.REQUEST req = self.app.REQUEST
req['URL'] = '%s/mydoc' % self.folder.absolute_url() req['URL'] = '%s/mydoc' % self.folder.absolute_url()
old = self.folder.mydoc
self.folder.mydoc.DELETE(req, req.RESPONSE) self.folder.mydoc.DELETE(req, req.RESPONSE)
self.assertEqual(req.RESPONSE.getStatus(), 204) self.assertEqual(eventlog.called(),
self.assertEqual(old.order(), (0, 0, 1)) # del [('mydoc', 'manage_beforeDelete')]
)
class TestCopySupportSublocation(HookTest): class TestCopySupportSublocation(HookTest):
'''Tests the order in which the add/clone/del hooks are called''' '''Tests the order in which add/clone/del hooks are called'''
def setUp(self): def setUp(self):
HookTest.setUp(self) HookTest.setUp(self)
# A folder # A folder that does not verify pastes
self.app._setObject('folder', TestFolder('folder')) self.app._setObject('folder', TestFolder('folder'))
self.folder = self.app['folder'] self.folder = getattr(self.app, 'folder')
# A subfolder we are going to copy/move to # The subfolder we are going to copy/move to
self.folder._setObject('subfolder', TestFolder('subfolder')) self.folder._setObject('subfolder', TestFolder('subfolder'))
self.subfolder = self.folder['subfolder'] self.subfolder = getattr(self.folder, 'subfolder')
# A folderish object we are going to copy/move # The folder we are going to copy/move
self.folder._setObject('myfolder', TestFolder('myfolder')) self.folder._setObject('myfolder', TestFolder('myfolder'))
self.myfolder = self.folder['myfolder'] self.myfolder = getattr(self.folder, 'myfolder')
# A "sublocation" inside myfolder we are going to watch # The "sublocation" inside our folder we are going to watch
self.myfolder._setObject('mydoc', TestItem('mydoc')) self.myfolder._setObject('mydoc', TestItem('mydoc'))
# Must have _p_jars # Need _p_jars
transaction.savepoint(1) transaction.savepoint(1)
# Reset counters # Reset event log
self.myfolder.reset() eventlog.reset()
self.myfolder.mydoc.reset()
def test_1_Clone(self): def test_1_Clone(self):
# Test clone # Test clone
self.subfolder.manage_clone(self.folder.myfolder, 'yourfolder') self.subfolder.manage_clone(self.folder.myfolder, 'myfolder')
self.assertEqual(self.subfolder.yourfolder.order(), (1, 2, 0)) # add, clone self.assertEqual(eventlog.called(),
self.assertEqual(self.subfolder.yourfolder.mydoc.order(), (1, 2, 0)) # add, clone [('myfolder', 'manage_afterAdd'),
('mydoc', 'manage_afterAdd'),
('myfolder', 'manage_afterClone'),
('mydoc', 'manage_afterClone')]
)
def test_2_CopyPaste(self): def test_2_CopyPaste(self):
# Test copy/paste # Test copy/paste
cb = self.folder.manage_copyObjects(['myfolder']) cb = self.folder.manage_copyObjects(['myfolder'])
self.subfolder.manage_pasteObjects(cb) self.subfolder.manage_pasteObjects(cb)
self.assertEqual(self.subfolder.myfolder.order(), (1, 2, 0)) # add, clone self.assertEqual(eventlog.called(),
self.assertEqual(self.subfolder.myfolder.mydoc.order(), (1, 2, 0)) # add, clone [('myfolder', 'manage_afterAdd'),
('mydoc', 'manage_afterAdd'),
('myfolder', 'manage_afterClone'),
('mydoc', 'manage_afterClone')]
)
def test_3_CutPaste(self): def test_3_CutPaste(self):
# Test cut/paste # Test cut/paste
cb = self.folder.manage_cutObjects(['myfolder']) cb = self.folder.manage_cutObjects(['myfolder'])
self.subfolder.manage_pasteObjects(cb) self.subfolder.manage_pasteObjects(cb)
self.assertEqual(self.subfolder.myfolder.order(), (2, 0, 1)) # del, add self.assertEqual(eventlog.called(),
self.assertEqual(self.subfolder.myfolder.mydoc.order(), (2, 0, 1)) # del, add [('mydoc', 'manage_beforeDelete'),
('myfolder', 'manage_beforeDelete'),
('myfolder', 'manage_afterAdd'),
('mydoc', 'manage_afterAdd')]
)
def test_4_Rename(self): def test_4_Rename(self):
# Test rename # Test rename
self.folder.manage_renameObject('myfolder', 'yourfolder') self.folder.manage_renameObject('myfolder', 'yourfolder')
self.assertEqual(self.folder.yourfolder.order(), (2, 0, 1)) # del, add self.assertEqual(eventlog.called(),
self.assertEqual(self.folder.yourfolder.mydoc.order(), (2, 0, 1)) # del, add [('mydoc', 'manage_beforeDelete'),
('myfolder', 'manage_beforeDelete'),
('yourfolder', 'manage_afterAdd'),
('mydoc', 'manage_afterAdd')]
)
def test_5_COPY(self): def test_5_COPY(self):
# Test webdav COPY # Test webdav COPY
#
# See http://www.zope.org/Collectors/Zope/2169
#
req = self.app.REQUEST req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity' req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = '%s/subfolder/yourfolder' % self.folder.absolute_url() req.environ['HTTP_DESTINATION'] = '%s/subfolder/myfolder' % self.folder.absolute_url()
self.folder.myfolder.COPY(req, req.RESPONSE) self.folder.myfolder.COPY(req, req.RESPONSE)
self.assertEqual(req.RESPONSE.getStatus(), 201) self.assertEqual(eventlog.called(),
self.assertEqual(self.subfolder.yourfolder.order(), (1, 2, 0)) # add, clone [('myfolder', 'manage_afterAdd'),
self.assertEqual(self.subfolder.yourfolder.mydoc.order(), (1, 2, 0)) # add, clone ('mydoc', 'manage_afterAdd'),
('myfolder', 'manage_afterClone'),
('mydoc', 'manage_afterClone')]
)
def test_6_MOVE(self): def test_6_MOVE(self):
# Test webdav MOVE # Test webdav MOVE
req = self.app.REQUEST req = self.app.REQUEST
req.environ['HTTP_DEPTH'] = 'infinity' req.environ['HTTP_DEPTH'] = 'infinity'
req.environ['HTTP_DESTINATION'] = '%s/subfolder/yourfolder' % self.folder.absolute_url() req.environ['HTTP_DESTINATION'] = '%s/subfolder/myfolder' % self.folder.absolute_url()
oldfolder = self.folder.myfolder
olddoc = self.folder.myfolder.mydoc
self.folder.myfolder.MOVE(req, req.RESPONSE) self.folder.myfolder.MOVE(req, req.RESPONSE)
self.assertEqual(req.RESPONSE.getStatus(), 201) self.assertEqual(eventlog.called(),
self.assertEqual(oldfolder.order(), (0, 0, 1)) # del [('mydoc', 'manage_beforeDelete'),
self.assertEqual(self.subfolder.yourfolder.order(), (1, 0, 0)) # add ('myfolder', 'manage_beforeDelete'),
self.assertEqual(olddoc.order(), (0, 0, 1)) # del ('myfolder', 'manage_afterAdd'),
self.assertEqual(self.subfolder.yourfolder.mydoc.order(), (1, 0, 0)) # add ('mydoc', 'manage_afterAdd')]
)
def test_7_DELETE(self): def test_7_DELETE(self):
# Test webdav DELETE # Test webdav DELETE
req = self.app.REQUEST req = self.app.REQUEST
req['URL'] = '%s/myfolder' % self.folder.absolute_url() req['URL'] = '%s/myfolder' % self.folder.absolute_url()
oldfolder = self.folder.myfolder
olddoc = self.folder.myfolder.mydoc
self.folder.myfolder.DELETE(req, req.RESPONSE) self.folder.myfolder.DELETE(req, req.RESPONSE)
self.assertEqual(req.RESPONSE.getStatus(), 204) self.assertEqual(eventlog.called(),
self.assertEqual(oldfolder.order(), (0, 0, 1)) # del [('mydoc', 'manage_beforeDelete'),
self.assertEqual(olddoc.order(), (0, 0, 1)) # del ('myfolder', 'manage_beforeDelete')]
)
def test_suite(): def test_suite():
suite = unittest.TestSuite() from unittest import TestSuite, makeSuite
suite.addTest(unittest.makeSuite(TestCopySupport)) suite = TestSuite()
suite.addTest(unittest.makeSuite(TestCopySupportSublocation)) suite.addTest(makeSuite(TestCopySupport))
suite.addTest(makeSuite(TestCopySupportSublocation))
return suite return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
...@@ -17,6 +17,7 @@ $Id$ ...@@ -17,6 +17,7 @@ $Id$
import mimetypes import mimetypes
import sys import sys
import warnings
from urllib import unquote from urllib import unquote
import ExtensionClass import ExtensionClass
...@@ -29,7 +30,7 @@ from AccessControl.Permissions import view as View ...@@ -29,7 +30,7 @@ from AccessControl.Permissions import view as View
from AccessControl.Permissions import webdav_lock_items from AccessControl.Permissions import webdav_lock_items
from AccessControl.Permissions import webdav_unlock_items from AccessControl.Permissions import webdav_unlock_items
from AccessControl.Permissions import webdav_access from AccessControl.Permissions import webdav_access
from Acquisition import aq_base from Acquisition import aq_base, aq_inner, aq_parent
from zExceptions import BadRequest, MethodNotAllowed from zExceptions import BadRequest, MethodNotAllowed
from zExceptions import Unauthorized, Forbidden, NotFound from zExceptions import Unauthorized, Forbidden, NotFound
from zope.interface import implements from zope.interface import implements
...@@ -46,7 +47,11 @@ from interfaces import IWriteLock ...@@ -46,7 +47,11 @@ from interfaces import IWriteLock
from WriteLockInterface import WriteLockInterface from WriteLockInterface import WriteLockInterface
from zope.event import notify from zope.event import notify
from zope.lifecycleevent import ObjectCopiedEvent
from zope.app.container.contained import ObjectMovedEvent
from zope.app.container.contained import notifyContainerModified
from OFS.event import ObjectClonedEvent from OFS.event import ObjectClonedEvent
from OFS.event import ObjectWillBeMovedEvent
import OFS.subscribers import OFS.subscribers
...@@ -230,7 +235,7 @@ class Resource(ExtensionClass.Base, Lockable.LockableItem): ...@@ -230,7 +235,7 @@ class Resource(ExtensionClass.Base, Lockable.LockableItem):
ifhdr = REQUEST.get_header('If', '') ifhdr = REQUEST.get_header('If', '')
url = urlfix(REQUEST['URL'], 'DELETE') url = urlfix(REQUEST['URL'], 'DELETE')
name = unquote(filter(None, url.split( '/')[-1])) name = unquote(filter(None, url.split( '/')[-1]))
parent = self.aq_parent parent = aq_parent(aq_inner(self))
# Lock checking # Lock checking
if Lockable.wl_isLocked(self): if Lockable.wl_isLocked(self):
if ifhdr: if ifhdr:
...@@ -396,10 +401,14 @@ class Resource(ExtensionClass.Base, Lockable.LockableItem): ...@@ -396,10 +401,14 @@ class Resource(ExtensionClass.Base, Lockable.LockableItem):
if depth=='0' and isDavCollection(ob): if depth=='0' and isDavCollection(ob):
for id in ob.objectIds(): for id in ob.objectIds():
ob._delObject(id) ob._delObject(id)
notify(ObjectCopiedEvent(ob, self))
if existing: if existing:
object=getattr(parent, name) object=getattr(parent, name)
self.dav__validate(object, 'DELETE', REQUEST) self.dav__validate(object, 'DELETE', REQUEST)
parent._delObject(name) parent._delObject(name)
parent._setObject(name, ob) parent._setObject(name, ob)
ob = parent._getOb(name) ob = parent._getOb(name)
ob._postCopy(parent, op=0) ob._postCopy(parent, op=0)
...@@ -505,20 +514,52 @@ class Resource(ExtensionClass.Base, Lockable.LockableItem): ...@@ -505,20 +514,52 @@ class Resource(ExtensionClass.Base, Lockable.LockableItem):
raise PreconditionFailed, 'Source is locked and no '\ raise PreconditionFailed, 'Source is locked and no '\
'condition was passed in.' 'condition was passed in.'
orig_container = aq_parent(aq_inner(self))
orig_id = self.getId()
self._notifyOfCopyTo(parent, op=1)
notify(ObjectWillBeMovedEvent(self, orig_container, orig_id,
parent, name))
# try to make ownership explicit so that it gets carried # try to make ownership explicit so that it gets carried
# along to the new location if needed. # along to the new location if needed.
self.manage_changeOwnershipType(explicit=1) self.manage_changeOwnershipType(explicit=1)
self._notifyOfCopyTo(parent, op=1) ob = self._getCopy(parent)
ob = aq_base(self._getCopy(parent))
self.aq_parent._delObject(absattr(self.id))
ob._setId(name) ob._setId(name)
try:
orig_container._delObject(orig_id, suppress_events=True)
except TypeError:
# BBB: removed in Zope 2.11
orig_container._delObject(orig_id)
warnings.warn(
"%s._delObject without suppress_events is deprecated "
"and will be removed in Zope 2.11." %
orig_container.__class__.__name__, DeprecationWarning)
if existing: if existing:
object=getattr(parent, name) object=getattr(parent, name)
self.dav__validate(object, 'DELETE', REQUEST) self.dav__validate(object, 'DELETE', REQUEST)
parent._delObject(name) parent._delObject(name)
parent._setObject(name, ob)
try:
parent._setObject(name, ob, set_owner=0, suppress_events=True)
except TypeError:
# BBB: removed in Zope 2.11
parent._setObject(name, ob, set_owner=0)
warnings.warn(
"%s._setObject without suppress_events is deprecated "
"and will be removed in Zope 2.11." %
parent.__class__.__name__, DeprecationWarning)
ob = parent._getOb(name) ob = parent._getOb(name)
notify(ObjectMovedEvent(ob, orig_container, orig_id, parent, name))
notifyContainerModified(orig_container)
if aq_base(orig_container) is not aq_base(parent):
notifyContainerModified(parent)
ob._postCopy(parent, op=1) ob._postCopy(parent, op=1)
# try to make ownership implicit if possible # try to make ownership implicit if possible
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment