Commit d9ce0e27 authored by Hanno Schlichting's avatar Hanno Schlichting

Removed ZMI export/import feature.

parent 97a13e65
......@@ -28,6 +28,8 @@ Features Added
Restructuring
+++++++++++++
- Removed ZMI export/import feature.
- Drop ZopeUndo dependency and move undo management to the control panel.
- Simplify ZMI control panel and globally available management screens.
......
......@@ -14,7 +14,6 @@
"""
from cgi import escape
from cStringIO import StringIO
from logging import getLogger
import copy
import fnmatch
......@@ -32,7 +31,6 @@ from AccessControl.Permissions import view_management_screens
from AccessControl.Permissions import access_contents_information
from AccessControl.Permissions import delete_objects
from AccessControl.Permissions import ftp_access
from AccessControl.Permissions import import_export_objects
from AccessControl import getSecurityManager
from AccessControl.ZopeSecurityPolicy import getRoles
from Acquisition import aq_base, aq_parent
......@@ -62,9 +60,6 @@ from OFS.event import ObjectWillBeAddedEvent
from OFS.event import ObjectWillBeRemovedEvent
from OFS.Lockable import LockableItem
from OFS.subscribers import compatibilityCall
from OFS.XMLExportImport import importXML
from OFS.XMLExportImport import exportXML
from OFS.XMLExportImport import magic
if bbb.HAS_ZSERVER:
from webdav.Collection import Collection
......@@ -85,10 +80,8 @@ LOG = getLogger('ObjectManager')
# the name BadRequestException is relied upon by 3rd-party code
BadRequestException = BadRequest
customImporters={magic: importXML,
}
bad_id = re.compile(r'[^a-zA-Z0-9-_~,.$\(\)# @]').search
bad_id=re.compile(r'[^a-zA-Z0-9-_~,.$\(\)# @]').search
def checkValidId(self, id, allow_dup=0):
# If allow_dup is false, an error will be raised if an object
......@@ -583,92 +576,6 @@ class ObjectManager(CopyContainer,
r.append(o)
return r
security.declareProtected(import_export_objects, 'manage_exportObject')
def manage_exportObject(self, id='', download=None, toxml=None,
RESPONSE=None,REQUEST=None):
"""Exports an object to a file and returns that file."""
if not id:
# can't use getId() here (breaks on "old" exported objects)
id=self.id
if hasattr(id, 'im_func'): id=id()
ob=self
else: ob=self._getOb(id)
suffix=toxml and 'xml' or 'zexp'
if download:
f=StringIO()
if toxml:
exportXML(ob._p_jar, ob._p_oid, f)
else:
ob._p_jar.exportFile(ob._p_oid, f)
if RESPONSE is not None:
RESPONSE.setHeader('Content-type','application/data')
RESPONSE.setHeader('Content-Disposition',
'inline;filename=%s.%s' % (id, suffix))
return f.getvalue()
cfg = getConfiguration()
f = os.path.join(cfg.clienthome, '%s.%s' % (id, suffix))
if toxml:
exportXML(ob._p_jar, ob._p_oid, f)
else:
ob._p_jar.exportFile(ob._p_oid, f)
if REQUEST is not None:
return self.manage_main(self, REQUEST,
manage_tabs_message=
'<em>%s</em> successfully exported to <em>%s</em>' % (id,f),
title = 'Object exported')
security.declareProtected(import_export_objects, 'manage_importExportForm')
manage_importExportForm=DTMLFile('dtml/importExport',globals())
security.declareProtected(import_export_objects, 'manage_importObject')
def manage_importObject(self, file, REQUEST=None, set_owner=1):
"""Import an object from a file"""
dirname, file=os.path.split(file)
if dirname:
raise BadRequest, 'Invalid file name %s' % escape(file)
for impath in self._getImportPaths():
filepath = os.path.join(impath, 'import', file)
if os.path.exists(filepath):
break
else:
raise BadRequest, 'File does not exist: %s' % escape(file)
self._importObjectFromFile(filepath, verify=not not REQUEST,
set_owner=set_owner)
if REQUEST is not None:
return self.manage_main(
self, REQUEST,
manage_tabs_message='<em>%s</em> successfully imported' % id,
title='Object imported',
update_menu=1)
def _importObjectFromFile(self, filepath, verify=1, set_owner=1):
# locate a valid connection
connection=self._p_jar
obj=self
while connection is None:
obj=obj.aq_parent
connection=obj._p_jar
ob=connection.importFile(
filepath, customImporters=customImporters)
if verify: self._verifyObjectPaste(ob, validate_src=0)
id=ob.id
if hasattr(id, 'im_func'): id=id()
self._setObject(id, ob, set_owner=set_owner)
# try to make ownership implicit if possible in the context
# that the object was imported into.
ob=self._getOb(id)
ob.manage_changeOwnershipType(explicit=0)
def _getImportPaths(self):
cfg = getConfiguration()
paths = []
......
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from base64 import encodestring
from cStringIO import StringIO
from ZODB.serialize import referencesf
from ZODB.ExportImport import TemporaryFile, export_end_marker
from ZODB.utils import p64
from ZODB.utils import u64
from Shared.DC.xml import ppml
magic='<?xm' # importXML(jar, file, clue)}
def XMLrecord(oid, len, p):
q=ppml.ToXMLUnpickler
f=StringIO(p)
u=q(f)
id=u64(oid)
aka=encodestring(oid)[:-1]
u.idprefix=str(id)+'.'
p=u.load().__str__(4)
if f.tell() < len:
p=p+u.load().__str__(4)
String=' <record id="%s" aka="%s">\n%s </record>\n' % (id, aka, p)
return String
def exportXML(jar, oid, file=None):
if file is None: file=TemporaryFile()
elif type(file) is str: file=open(file,'w+b')
write=file.write
write('<?xml version="1.0"?>\012<ZopeData>\012')
ref=referencesf
oids=[oid]
done_oids={}
done=done_oids.has_key
load=jar._storage.load
while oids:
oid=oids[0]
del oids[0]
if done(oid): continue
done_oids[oid]=1
try:
try:
p, serial = load(oid)
except TypeError:
# Some places inside the ZODB 3.9 still want a version
# argument, for example TmpStore from Connection.py
p, serial = load(oid, None)
except:
pass # Ick, a broken reference
else:
ref(p, oids)
write(XMLrecord(oid,len(p),p))
write('</ZopeData>\n')
return file
class zopedata:
def __init__(self, parser, tag, attrs):
self.file=parser.file
write=self.file.write
write('ZEXP')
def append(self, data):
file=self.file
write=file.write
pos=file.tell()
file.seek(pos)
write(data)
def start_zopedata(parser, tag, data):
return zopedata(parser, tag, data)
def save_zopedata(parser, tag, data):
file=parser.file
write=file.write
pos=file.tell()
file.seek(pos)
write(export_end_marker)
def save_record(parser, tag, data):
file=parser.file
write=file.write
pos=file.tell()
file.seek(pos)
a=data[1]
if a.has_key('id'): oid=a['id']
oid=p64(int(oid))
v=''
for x in data[2:]:
v=v+x
l=p64(len(v))
v=oid+l+v
return v
def importXML(jar, file, clue=''):
import xml.parsers.expat
if type(file) is str:
file=open(file, 'rb')
outfile=TemporaryFile()
data=file.read()
F=ppml.xmlPickler()
F.end_handlers['record'] = save_record
F.end_handlers['ZopeData'] = save_zopedata
F.start_handlers['ZopeData'] = start_zopedata
F.binary=1
F.file=outfile
p=xml.parsers.expat.ParserCreate()
p.CharacterDataHandler=F.handle_data
p.StartElementHandler=F.unknown_starttag
p.EndElementHandler=F.unknown_endtag
r=p.Parse(data)
outfile.seek(0)
return jar.importFile(outfile,clue)
<dtml-var manage_page_header>
<dtml-var "manage_tabs(this(), _, )">
<p class="form-help">
You can export Zope objects to a file in order to transfer
them to a different Zope installation. You can either choose
to download the export file to your local machine, or save it
in the &quot;var&quot; directory of your Zope installation
on the server.
<br/>
<br/>
<b>Note:</b>
Zope can export/import objects in two different formats: a binary format (called
ZEXP) and as XML. The ZEXP format is the officially supported export/import
format for moving data between <u>identical</u> Zope installations (it is not a migration tool).
</p>
<form action="manage_exportObject" method="post">
<table cellspacing="2" border="0">
<tr>
<td align="left" valign="top">
<div class="form-label">
Export object id
</div>
</td>
<td align="left" valign="top">
<input type="text" name="id" size="25" value="<dtml-if ids><dtml-var "ids[0]" html_quote></dtml-if>" class="form-element"/>
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Export to
</div>
</td>
<td align="left" valign="top">
<div class="form-text">
<label for="download_local">
<input type="radio" name="download:int" value="1" id="download_local" />
Download to local machine
</label>
<br />
<label for="download_server">
<input type="radio" name="download:int" value="0" id="download_server" checked />
Save to file on server
</label>
</div>
</td>
</tr>
<tr>
<td align="left" valign="top">
&nbsp;
</td>
<td align="left" valign="top">
<div class="form-text">
<label for="toxml">
<input type="checkbox" id="toxml" name="toxml" value="Y" />
XML format?
</label>
<em>(unsupported, see above)</em>
</div>
</td>
</tr>
<tr>
<td></td>
<td align="left" valign="top">
<div class="form-element">
<input class="form-element" type="submit" name="submit" value="Export" />
</div>
</td>
</tr>
</table>
</form>
<hr />
<p class="form-help">
You may import Zope objects which have been previously
exported to a file, by placing the file in the &quot;import&quot;
directory of your Zope installation on the server. You should create
the &quot;import&quot; directory in the root of your Zope installation
if it does not yet exist.
</p>
<p class="form-help">
Note that by default, you will become the owner of the objects
that you are importing. If you wish the imported objects to retain
their existing ownership information, select "retain existing
ownership information".
</p>
<form action="manage_importObject" method="post">
<table cellspacing="2" border="0">
<tr>
<td align="left" valign="top">
<div class="form-label">
Import file name
</div>
</td>
<td align="left" valign="top">
<select name="file">
<dtml-in "list_imports()">
<option value="<dtml-var sequence-item>"><dtml-var sequence-item></option>
</dtml-in>
</select>
</td>
</tr>
<tr>
<td align="left" valign="top">
<div class="form-label">
Ownership
</div>
</td>
<td align="left" valign="top">
<div class="form-text">
<label for="owner_take">
<input type="radio" name="set_owner:int" value="1" id="owner_take" checked />
Take ownership of imported objects
</label>
<br />
<label for="owner_retain">
<input type="radio" name="set_owner:int" value="0" id="owner_retain" />
Retain existing ownership information
</label>
</div>
</td>
</tr>
<tr>
<td></td>
<td align="left" valign="top">
<div class="form-element">
<input class="form-element" type="submit" name="submit" value="Import" />
</div>
</td>
</tr>
</table>
</form>
<dtml-var manage_page_footer>
......@@ -190,11 +190,6 @@ function toggleSelect() {
<input class="form-element" type="submit" name="manage_delObjects:method"
value="Delete" />
</dtml-if>
<dtml-if "_.SecurityCheckPermission('Import/Export objects', this())">
<input class="form-element" type="submit"
name="manage_importExportForm:method"
value="Import/Export" />
</dtml-if>
<script type="text/javascript">
<!--
if (document.forms[0]) {
......@@ -261,10 +256,6 @@ There are currently no items in <em>&dtml-title_or_id;</em>
</div>
</dtml-if>
</dtml-unless>
<dtml-if "_.SecurityCheckPermission('Import/Export objects', this())">
<input class="form-element" type="submit"
name="manage_importExportForm:method" value="Import/Export" />
</dtml-if>
</td>
</tr>
</table>
......
......@@ -682,7 +682,6 @@ class IObjectManager(IZopeObject, ICopyContainer, INavigation, IManageable,
manage_main = Attribute(""" """)
manage_index_main = Attribute(""" """)
manage_addProduct = Attribute(""" """)
manage_importExportForm = Attribute(""" """)
def all_meta_types(interfaces=None):
"""
......@@ -761,17 +760,6 @@ class IObjectManager(IZopeObject, ICopyContainer, INavigation, IManageable,
"""Return a list of subobjects, used by tree tag.
"""
def manage_exportObject(id='', download=None, toxml=None,
RESPONSE=None, REQUEST=None):
"""Exports an object to a file and returns that file."""
def manage_importObject(file, REQUEST=None, set_owner=1):
"""Import an object from a file"""
def _importObjectFromFile(filepath, verify=1, set_owner=1):
"""
"""
class IFindSupport(Interface):
"""Find support for Zope Folders"""
......
# -*- coding: iso8859-1 -*-
##############################################################################
#
# Copyright (c) 2006 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import unittest
import os
import tempfile
import transaction
from StringIO import StringIO
try:
here = os.path.dirname(os.path.abspath(__file__))
except:
here = os.path.dirname(os.path.abspath(sys.argv[0]))
imagedata = os.path.join(here, 'test.gif')
xmldata = os.path.join(here, 'export.xml')
_LONG_DTML = ''.join([('<dtml-var foo%d' % x) for x in xrange(1000)])
class XMLExportImportTests(unittest.TestCase):
def _makeJarAndRoot(self):
import ZODB
from OFS.Folder import Folder
from ZODB.DemoStorage import DemoStorage
CACHESIZE = 5 # something tiny
LOOPCOUNT = CACHESIZE * 10
storage = DemoStorage("Test")
db = ZODB.DB(storage, cache_size=CACHESIZE)
connection = db.open()
root = connection.root()
app = Folder('app')
root['app'] = app
return connection, app
def test_export_import_as_string_idempotent(self):
from OFS.DTMLMethod import DTMLMethod
from OFS.XMLExportImport import exportXML
from OFS.XMLExportImport import importXML
connection, app = self._makeJarAndRoot()
dm = DTMLMethod('test')
dm.munge(_LONG_DTML)
app._setObject('test', dm)
transaction.savepoint(optimistic=True) # need an OID!
oid = dm._p_oid
stream = StringIO()
data = exportXML(connection, oid, stream)
stream.seek(0)
newobj = importXML(connection, stream)
self.assertTrue(isinstance(newobj, DTMLMethod))
self.assertEqual(newobj.read(), dm.read())
def test_export_import_as_file_idempotent(self):
from OFS.DTMLMethod import DTMLMethod
from OFS.XMLExportImport import exportXML
from OFS.XMLExportImport import importXML
connection, app = self._makeJarAndRoot()
dm = DTMLMethod('test')
dm.munge(_LONG_DTML)
app._setObject('test', dm)
transaction.savepoint(optimistic=True) # need an OID!
oid = dm._p_oid
handle, path = tempfile.mkstemp(suffix='.xml')
try:
ostream = os.fdopen(handle,'wb')
data = exportXML(connection, oid, ostream)
ostream.close()
newobj = importXML(connection, path)
self.assertTrue(isinstance(newobj, DTMLMethod))
self.assertEqual(newobj.read(), dm.read())
finally:
# if this operaiton fails with a 'Permission Denied' error,
# then comment it out as it's probably masking a failure in
# the block above.
os.remove(path)
def test_OFS_ObjectManager__importObjectFromFile_xml(self):
from OFS.DTMLMethod import DTMLMethod
from OFS.Folder import Folder
from OFS.XMLExportImport import exportXML
connection, app = self._makeJarAndRoot()
dm = DTMLMethod('test')
dm._setId('test')
dm.munge(_LONG_DTML)
app._setObject('test', dm)
sub = Folder('sub')
app._setObject('sub', sub)
transaction.savepoint(optimistic=True) # need an OID!
oid = dm._p_oid
sub = app._getOb('sub')
handle, path = tempfile.mkstemp(suffix='.xml')
try:
ostream = os.fdopen(handle,'wb')
data = exportXML(connection, oid, ostream)
ostream.close()
sub._importObjectFromFile(path, 0, 0)
finally:
# if this operaiton fails with a 'Permission Denied' error,
# then comment it out as it's probably masking a failure in
# the block above.
os.remove(path)
def test_exportXML(self):
from OFS.Folder import Folder
from OFS.Image import Image
from OFS.XMLExportImport import exportXML
connection, app = self._makeJarAndRoot()
data = open(imagedata, 'rb')
sub = Folder('sub')
app._setObject('sub', sub)
img = Image('image', '', data, 'image/gif')
sub._setObject('image', img)
img._setProperty('prop1', 3.14159265359, 'float')
img._setProperty('prop2', 1, 'int')
img._setProperty('prop3', 2L**31-1, 'long')
img._setProperty('prop4', 'xxx', 'string')
img._setProperty('prop5', ['xxx', 'zzz'], 'lines')
img._setProperty('prop6', u'xxx', 'unicode')
img._setProperty('prop7', [u'xxx', u'zzz'], 'ulines')
img._setProperty('prop8', '<&>', 'string')
img._setProperty('prop9', u'<&>', 'unicode')
img._setProperty('prop10', '<]]>', 'string')
img._setProperty('prop11', u'<]]>', 'unicode')
img._setProperty('prop12', u'', 'unicode')
transaction.savepoint(optimistic=True)
oid = sub._p_oid
handle, path = tempfile.mkstemp(suffix='.xml')
try:
ostream = os.fdopen(handle,'wb')
data = exportXML(connection, oid, ostream)
ostream.close()
finally:
os.remove(path)
def test_importXML(self):
from OFS.XMLExportImport import importXML
connection, app = self._makeJarAndRoot()
newobj = importXML(connection, xmldata)
img = newobj._getOb('image')
data = open(imagedata, 'rb').read()
self.assertEqual(img.data, data)
self.assertEqual(repr(img.getProperty('prop1')),
repr(3.14159265359))
self.assertEqual(repr(img.getProperty('prop2')),
repr(1))
self.assertEqual(repr(img.getProperty('prop3')),
repr(2L**31-1))
self.assertEqual(repr(img.getProperty('prop4')),
repr('xxx'))
self.assertEqual(repr(img.getProperty('prop5')),
repr(('xxx', 'zzz')))
self.assertEqual(repr(img.getProperty('prop6')),
repr(u'xxx'))
self.assertEqual(repr(img.getProperty('prop7')),
repr((u'xxx', u'zzz')))
self.assertEqual(repr(img.getProperty('prop8')),
repr('<&>'))
self.assertEqual(repr(img.getProperty('prop9')),
repr(u'<&>'))
self.assertEqual(repr(img.getProperty('prop10')),
repr('<]]>'))
self.assertEqual(repr(img.getProperty('prop11')),
repr(u'<]]>'))
self.assertEqual(repr(img.getProperty('prop12')),
repr(u''))
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(XMLExportImportTests),
))
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Provide conversion between Python pickles and XML
"""
from pickle import *
import struct
import base64
import re
from marshal import loads as mloads
from xyap import NoBlanks
from xyap import xyap
binary = re.compile('[^\x1f-\x7f]').search
def escape(s, encoding='repr'):
if binary(s) and isinstance(s, str):
s = base64.encodestring(s)[:-1]
encoding = 'base64'
elif '>' in s or '<' in s or '&' in s:
if ']]>' not in s:
s = '<![CDATA[' + s + ']]>'
encoding = 'cdata'
else:
s = s.replace('&', '&amp;')
s = s.replace('>', '&gt;')
s = s.replace('<', '&lt;')
return encoding, s
def unescape(s, encoding):
if encoding == 'base64':
return base64.decodestring(s)
else:
s = s.replace('&lt;', '<')
s = s.replace('&gt;', '>')
return s.replace('&amp;', '&')
class Global:
def __init__(self, module, name):
self.module = module
self.name = name
def __str__(self, indent=0):
if hasattr(self, 'id'):
id = ' id="%s"' % self.id
else:
id = ''
name = self.__class__.__name__.lower()
return '%s<%s%s name="%s" module="%s"/>\n' % (
' ' * indent, name, id, self.name, self.module)
class Scalar:
def __init__(self, v):
self._v = v
def value(self):
return self._v
def __str__(self, indent=0):
if hasattr(self, 'id'):
id = ' id="%s"' % self.id
else:
id = ''
name = self.__class__.__name__.lower()
return '%s<%s%s>%s</%s>\n' % (
' ' * indent, name, id, self.value(), name)
class Long(Scalar):
def value(self):
result = str(self._v)
if result[-1:] == 'L':
return result[:-1]
return result
class String(Scalar):
def __init__(self, v, encoding=''):
encoding, v = escape(v, encoding)
self.encoding = encoding
self._v = v
def __str__(self, indent=0):
if hasattr(self, 'id'):
id = ' id="%s"' % self.id
else:
id = ''
if hasattr(self, 'encoding'):
encoding = ' encoding="%s"' % self.encoding
else:
encoding = ''
name = self.__class__.__name__.lower()
return '%s<%s%s%s>%s</%s>\n' % (
' ' * indent, name, id, encoding, self.value(), name)
class Unicode(String):
def __init__(self, v, encoding):
v = unicode(v, encoding)
String.__init__(self, v)
def value(self):
return self._v.encode('utf-8')
class Wrapper:
def __init__(self, v):
self._v = v
def value(self):
return self._v
def __str__(self, indent=0):
if hasattr(self, 'id'):
id = ' id="%s"' % self.id
else:
id = ''
name = self.__class__.__name__.lower()
v = self._v
i = ' ' * indent
if isinstance(v, Scalar):
return '%s<%s%s>%s</%s>\n' % (i, name, id, str(v)[:-1], name)
else:
try:
v = v.__str__(indent + 2)
except TypeError:
v = v.__str__()
return '%s<%s%s>\n%s%s</%s>\n' % (i, name, id, v, i, name)
class Collection:
def __str__(self, indent=0):
if hasattr(self, 'id'):
id = ' id="%s"' % self.id
else:
id = ''
name = self.__class__.__name__.lower()
i = ' ' * indent
if self:
return '%s<%s%s>\n%s%s</%s>\n' % (
i, name, id, self.value(indent + 2), i, name)
else:
return '%s<%s%s/>\n' % (i, name, id)
class Dictionary(Collection):
def __init__(self):
self._d = []
def __len__(self):
return len(self._d)
def __setitem__(self, k, v):
self._d.append((k, v))
def value(self, indent):
return ''.join(
map(lambda i, ind=' ' * indent, indent=indent + 4:
'%s<item>\n'
'%s'
'%s'
'%s</item>\n'
%
(ind,
Key(i[0]).__str__(indent),
Value(i[1]).__str__(indent),
ind),
self._d
))
class Sequence(Collection):
def __init__(self, v=None):
if not v:
v = []
self._subs = v
def __len__(self):
return len(self._subs)
def append(self, v):
self._subs.append(v)
def extend(self, v):
self._subs.extend(v)
def _stringify(self, v, indent):
try:
return v.__str__(indent + 2)
except TypeError:
return v.__str__()
def value(self, indent):
return ''.join(map(
lambda v, indent=indent: self._stringify(v, indent),
self._subs))
class none:
def __str__(self, indent=0):
return ' ' * indent + '<none/>\n'
none = none()
class Reference(Scalar):
def __init__(self, v):
self._v = v
def __str__(self, indent=0):
v = self._v
name = self.__class__.__name__.lower()
return '%s<%s id="%s"/>\n' % (' ' * indent, name, v)
Get = Reference
class Object(Sequence):
def __init__(self, klass, args):
self._subs = [Klass(klass), args]
def __setstate__(self, v):
self.append(State(v))
class Int(Scalar):
pass
class Float(Scalar):
pass
class List(Sequence):
pass
class Tuple(Sequence):
pass
class Key(Wrapper):
pass
class Value(Wrapper):
pass
class Klass(Wrapper):
pass
class State(Wrapper):
pass
class Pickle(Wrapper):
pass
class Persistent(Wrapper):
pass
class ToXMLUnpickler(Unpickler):
def load(self):
return Pickle(Unpickler.load(self))
dispatch = {}
dispatch.update(Unpickler.dispatch)
def persistent_load(self, v):
return Persistent(v)
def load_persid(self):
pid = self.readline()[:-1]
self.append(self.persistent_load(String(pid)))
dispatch[PERSID] = load_persid
def load_none(self):
self.append(none)
dispatch[NONE] = load_none
def load_int(self):
self.append(Int(int(self.readline()[:-1])))
dispatch[INT] = load_int
def load_binint(self):
self.append(Int(mloads('i' + self.read(4))))
dispatch[BININT] = load_binint
def load_binint1(self):
self.append(Int(ord(self.read(1))))
dispatch[BININT1] = load_binint1
def load_binint2(self):
self.append(Int(mloads('i' + self.read(2) + '\000\000')))
dispatch[BININT2] = load_binint2
def load_long(self):
self.append(Long(long(self.readline()[:-1], 0)))
dispatch[LONG] = load_long
def load_float(self):
self.append(Float(float(self.readline()[:-1])))
dispatch[FLOAT] = load_float
def load_binfloat(self, unpack=struct.unpack):
self.append(Float(unpack('>d', self.read(8))[0]))
dispatch[BINFLOAT] = load_binfloat
def load_string(self):
rep = self.readline()[:-1]
for q in "\"'":
if rep.startswith(q):
if not rep.endswith(q):
raise ValueError('insecure string pickle')
rep = rep[len(q):-len(q)]
break
else:
raise ValueError('insecure string pickle')
self.append(String(rep.decode('string-escape')))
dispatch[STRING] = load_string
def load_binstring(self):
len = mloads('i' + self.read(4))
self.append(String(self.read(len)))
dispatch[BINSTRING] = load_binstring
def load_unicode(self):
self.append(Unicode(self.readline()[:-1], 'raw-unicode-escape'))
dispatch[UNICODE] = load_unicode
def load_binunicode(self):
len = mloads('i' + self.read(4))
self.append(Unicode(self.read(len),'utf-8'))
dispatch[BINUNICODE] = load_binunicode
def load_short_binstring(self):
len = ord(self.read(1))
self.append(String(self.read(len)))
dispatch[SHORT_BINSTRING] = load_short_binstring
def load_tuple(self):
k = self.marker()
self.stack[k:] = [Tuple(self.stack[k + 1:])]
dispatch[TUPLE] = load_tuple
def load_empty_tuple(self):
self.stack.append(Tuple())
dispatch[EMPTY_TUPLE] = load_empty_tuple
def load_empty_list(self):
self.stack.append(List())
dispatch[EMPTY_LIST] = load_empty_list
def load_empty_dictionary(self):
self.stack.append(Dictionary())
dispatch[EMPTY_DICT] = load_empty_dictionary
def load_list(self):
k = self.marker()
self.stack[k:] = [List(self.stack[k + 1:])]
dispatch[LIST] = load_list
def load_dict(self):
k = self.marker()
d = Dictionary()
items = self.stack[k + 1:]
for i in range(0, len(items), 2):
key = items[i]
value = items[i + 1]
d[key] = value
self.stack[k:] = [d]
dispatch[DICT] = load_dict
def load_inst(self):
k = self.marker()
args = Tuple(self.stack[k + 1:])
del self.stack[k:]
module = self.readline()[:-1]
name = self.readline()[:-1]
value = Object(Global(module, name), args)
self.append(value)
dispatch[INST] = load_inst
def load_obj(self):
stack = self.stack
k = self.marker()
klass = stack[k + 1]
del stack[k + 1]
args = Tuple(stack[k + 1:])
del stack[k:]
value = Object(klass, args)
self.append(value)
dispatch[OBJ] = load_obj
def load_global(self):
module = self.readline()[:-1]
name = self.readline()[:-1]
self.append(Global(module, name))
dispatch[GLOBAL] = load_global
def load_reduce(self):
stack = self.stack
callable = stack[-2]
arg_tup = stack[-1]
del stack[-2:]
value = Object(callable, arg_tup)
self.append(value)
dispatch[REDUCE] = load_reduce
idprefix=''
def load_get(self):
self.append(Get(self.idprefix + self.readline()[:-1]))
dispatch[GET] = load_get
def load_binget(self):
i = ord(self.read(1))
self.append(Get(self.idprefix + repr(i)))
dispatch[BINGET] = load_binget
def load_long_binget(self):
i = mloads('i' + self.read(4))
self.append(Get(self.idprefix + repr(i)))
dispatch[LONG_BINGET] = load_long_binget
def load_put(self):
self.stack[-1].id = self.idprefix + self.readline()[:-1]
dispatch[PUT] = load_put
def load_binput(self):
i = ord(self.read(1))
last = self.stack[-1]
if getattr(last, 'id', last) is last:
last.id = self.idprefix + repr(i)
dispatch[BINPUT] = load_binput
def load_long_binput(self):
i = mloads('i' + self.read(4))
last = self.stack[-1]
if getattr(last, 'id', last) is last:
last.id = self.idprefix + repr(i)
dispatch[LONG_BINPUT] = load_long_binput
def ToXMLload(file):
return ToXMLUnpickler(file).load()
def ToXMLloads(str):
from StringIO import StringIO
file = StringIO(str)
return ToXMLUnpickler(file).load()
def name(self, tag, data):
return ''.join(data[2:]).strip()
def start_pickle(self, tag, attrs):
self._pickleids = {}
return [tag, attrs]
def save_int(self, tag, data):
if self.binary:
v = int(name(self, tag, data))
if v >= 0:
if v <= 0xff:
return BININT1 + chr(v)
if v <= 0xffff:
return '%c%c%c' % (BININT2, v & 0xff, v >> 8)
hb = v >> 31
if hb == 0 or hb == -1:
return BININT + struct.pack('<i', v)
return INT + name(self, tag, data) + '\n'
def save_float(self, tag, data):
if self.binary:
return BINFLOAT + struct.pack('>d', float(name(self, tag, data)))
else:
return FLOAT + name(self, tag, data) + '\n'
def save_put(self, v, attrs):
id = attrs.get('id', '')
if id:
prefix = id.rfind('.')
if prefix >= 0:
id = id[prefix + 1:]
elif id[0] == 'i':
id = id[1:]
if self.binary:
id = int(id)
if id < 256:
id = BINPUT + chr(id)
else:
id = LONG_BINPUT + struct.pack('<i', id)
else:
id = PUT + repr(id) + '\n'
return v + id
return v
def save_string(self, tag, data):
a = data[1]
v = ''.join(data[2:])
encoding = a['encoding']
if encoding is not '':
v = unescape(v, encoding)
if self.binary:
l = len(v)
if l < 256:
v = SHORT_BINSTRING + chr(l) + v
else:
v = BINSTRING + struct.pack('<i', l) + v
else:
v = STRING + repr(v) + '\n'
return save_put(self, v, a)
def save_unicode(self, tag, data):
a = data[1]
v = ''.join(data[2:])
encoding = a['encoding']
if encoding is not '':
v = unescape(v, encoding)
if self.binary:
v = v.encode('utf-8')
v = BINUNICODE + struct.pack("<i", len(v)) + v
else:
v = v.replace("\\", "\\u005c")
v = v.replace("\n", "\\u000a")
v.encode('raw-unicode-escape')
v = UNICODE + v + '\n'
return save_put(self, v, a)
def save_tuple(self, tag, data):
T = data[2:]
if not T:
return EMPTY_TUPLE
return save_put(self, MARK + ''.join(T) + TUPLE, data[1])
def save_list(self, tag, data):
L = data[2:]
if self.binary:
v = save_put(self, EMPTY_LIST, data[1])
if L:
v = v + MARK + ''.join(L) + APPENDS
else:
v = save_put(self, MARK + LIST, data[1])
if L:
v = APPEND.join(L) + APPEND
return v
def save_dict(self, tag, data):
D = data[2:]
if self.binary:
v = save_put(self, EMPTY_DICT, data[1])
if D:
v = v + MARK + ''.join(D) + SETITEMS
else:
v = save_put(self, MARK + DICT, data[1])
if D:
v = v + SETITEM.join(D) + SETITEM
return v
def save_reference(self, tag, data):
a = data[1]
id = a['id']
prefix = id.rfind('.')
if prefix >= 0:
id = id[prefix + 1:]
if self.binary:
id = int(id)
if id < 256:
return BINGET + chr(id)
else:
return LONG_BINGET + struct.pack('<i', i)
else:
return GET + repr(id) + '\n'
def save_object(self, tag, data):
v = MARK + data[2]
x = data[3][1:]
stop = x.rfind('t') # This seems
if stop >= 0: # wrong!
x = x[:stop]
v = save_put(self, v + x + OBJ, data[1])
v = v + data[4] + BUILD # state
return v
def save_global(self, tag, data):
a = data[1]
return save_put(self, GLOBAL + a['module'] + '\n' + a['name'] + '\n', a)
def save_persis(self, tag, data):
v = data[2]
if self.binary:
return v + BINPERSID
else:
return PERSID + v
class xmlPickler(NoBlanks, xyap):
start_handlers = {
'pickle': lambda self, tag, attrs: [tag, attrs],
}
end_handlers = {
'pickle': lambda self, tag, data: str(data[2]) + STOP,
'none': lambda self, tag, data: NONE,
'int': save_int,
'long': lambda self, tag, data: LONG + str(data[2]) + LONG + '\n',
'float': save_float,
'string': save_string,
'reference': save_reference,
'tuple': save_tuple,
'list': save_list,
'dictionary': save_dict,
'item': lambda self, tag, data: ''.join(map(str, data[2:])),
'value': lambda self, tag, data: data[2],
'key': lambda self, tag, data: data[2],
'object': save_object,
'klass': lambda self, tag, data: data[2],
'state': lambda self, tag, data: data[2],
'global': save_global,
'persistent': save_persis,
'unicode': save_unicode,
}
"""Yet another XML parser
This is meant to be very simple:
- stack based
- The parser has a table of start handlers and end handlers.
- start tag handlers are called with the parser instance, tag names
and attributes. The result is placed on the stack. The default
handler places a special object on the stack (uh, a list, with the
tag name and attributes as the first two elements. ;)
- end tag handlers are called with the object on the parser, the tag
name, and top of the stack right after it has been removed. The
result is appended to the object on the top of the stack.
Note that namespace attributes should recieve some special handling.
Oh well.
"""
import string
import xml.parsers.expat
class xyap:
start_handlers = {}
end_handlers = {}
def __init__(self):
top = []
self._stack = _stack = [top]
self.push = _stack.append
self.append = top.append
def handle_data(self, data):
self.append(data)
def unknown_starttag(self, tag, attrs):
if isinstance(attrs, list):
attrs = dict(attrs)
start = self.start_handlers
if tag in start:
tag = start[tag](self, tag, attrs)
else:
tag = [tag, attrs]
self.push(tag)
self.append = tag.append
def unknown_endtag(self, tag):
_stack = self._stack
top = _stack.pop()
append = self.append = _stack[-1].append
end = self.end_handlers
if tag in end:
top = end[tag](self, tag, top)
append(top)
class NoBlanks:
def handle_data(self, data):
if data.strip():
self.append(data)
def struct(self, tag, data):
r = {}
for k, v in data[2:]:
r[k] = v
return r
_nulljoin = "".join
def name(self, tag, data):
return _nulljoin(data[2:]).strip()
def tuplef(self, tag, data):
return tuple(data[2:])
class XYap(xyap):
def __init__(self):
self._parser = xml.parsers.expat.ParserCreate()
self._parser.StartElementHandler = self.unknown_starttag
self._parser.EndElementHandler = self.unknown_endtag
self._parser.CharacterDataHandler = self.handle_data
xyap.__init__(self)
class xmlrpc(NoBlanks, XYap):
end_handlers = {
'methodCall': tuplef,
'methodName': name,
'params': tuplef,
'param': lambda self, tag, data: data[2],
'value': lambda self, tag, data: data[2],
'i4':
lambda self, tag, data, atoi=string.atoi, name=name:
atoi(name(self, tag, data)),
'int':
lambda self, tag, data, atoi=string.atoi, name=name:
atoi(name(self, tag, data)),
'boolean':
lambda self, tag, data, atoi=string.atoi, name=name:
atoi(name(self, tag, data)),
'string': lambda self, tag, data, join=string.join:
join(data[2:], ''),
'double':
lambda self, tag, data, atof=string.atof, name=name:
atof(name(self, tag, data)),
'float':
lambda self, tag, data, atof=string.atof, name=name:
atof(name(self, tag, data)),
'struct': struct,
'member': tuplef,
'name': name,
'array': lambda self, tag, data: data[2],
'data': lambda self, tag, data: data[2:],
}
......@@ -16,8 +16,6 @@ Demonstrates that cut/copy/paste/clone/rename and import/export
work if a savepoint is made before performing the respective operation.
"""
import os
from Testing import ZopeTestCase
from Testing.ZopeTestCase import layer
......@@ -27,7 +25,6 @@ from Testing.ZopeTestCase import transaction
from AccessControl.Permissions import add_documents_images_and_files
from AccessControl.Permissions import delete_objects
from OFS.SimpleItem import SimpleItem
import tempfile
folder_name = ZopeTestCase.folder_name
cutpaste_permissions = [add_documents_images_and_files, delete_objects]
......@@ -92,73 +89,6 @@ class TestCopyPaste(ZopeTestCase.ZopeTestCase):
self.assertTrue(hasattr(self.folder, 'new_doc'))
class TestImportExport(ZopeTestCase.ZopeTestCase):
def afterSetUp(self):
self.setupLocalEnvironment()
self.folder.addDTMLMethod('doc', file='foo')
# _p_oids are None until we create a savepoint
self.assertEqual(self.folder._p_oid, None)
transaction.savepoint(optimistic=True)
self.assertNotEqual(self.folder._p_oid, None)
def testExport(self):
self.folder.manage_exportObject('doc')
self.assertTrue(os.path.exists(self.zexp_file))
def testImport(self):
self.folder.manage_exportObject('doc')
self.folder._delObject('doc')
self.folder.manage_importObject('doc.zexp')
self.assertTrue(hasattr(self.folder, 'doc'))
# To make export and import happy, we have to provide a file-
# system 'import' directory and adapt the configuration a bit:
local_home = tempfile.gettempdir()
import_dir = os.path.join(local_home, 'import')
zexp_file = os.path.join(import_dir, 'doc.zexp')
def setupLocalEnvironment(self):
# Create the 'import' directory
os.mkdir(self.import_dir)
import App.config
config = App.config.getConfiguration()
self._ih = config.instancehome
config.instancehome = self.local_home
self._ch = config.clienthome
config.clienthome = self.import_dir
App.config.setConfiguration(config)
def afterClear(self):
# Remove external resources
try:
os.remove(self.zexp_file)
except OSError:
pass
try:
os.rmdir(self.import_dir)
except OSError:
pass
try:
import App.config
except ImportError:
# Restore builtins
builtins = getattr(__builtins__, '__dict__', __builtins__)
if hasattr(self, '_ih'):
builtins['INSTANCE_HOME'] = self._ih
if hasattr(self, '_ch'):
builtins['CLIENT_HOME'] = self._ch
else:
# Zope >= 2.7
config = App.config.getConfiguration()
if hasattr(self, '_ih'):
config.instancehome = self._ih
if hasattr(self, '_ch'):
config.clienthome = self._ch
App.config.setConfiguration(config)
class TestAttributesOfCleanObjects(ZopeTestCase.ZopeTestCase):
'''This testcase shows that _v_ and _p_ attributes are NOT bothered
by transaction boundaries, if the respective object is otherwise
......@@ -312,7 +242,6 @@ def test_suite():
from unittest import TestSuite, makeSuite
suite = TestSuite()
suite.addTest(makeSuite(TestCopyPaste))
suite.addTest(makeSuite(TestImportExport))
suite.addTest(makeSuite(TestAttributesOfCleanObjects))
suite.addTest(makeSuite(TestAttributesOfDirtyObjects))
suite.addTest(makeSuite(TestTransactionAbort))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment