Commit 1d805f46 authored by Jérome Perrin's avatar Jérome Perrin

XXX all changes to test (product)

Co-authored-by: Carlos Ramos Carreño's avatarCarlos Ramos Carreño <carlos.ramos@nexedi.com>
Co-authored-by: Emmy Vouriot's avatarEmmeline Vouriot <emmeline.vouriot@nexedi.com>
Co-authored-by: Kazuhiko Shiozaki's avatarKazuhiko SHIOZAKI <kazuhiko@nexedi.com>
parent 8392ac0e
......@@ -62,6 +62,7 @@ import weakref
import transaction
from App.config import getConfiguration
import socket
from six.moves import range
class CommitFailed(Exception):
pass
......@@ -617,7 +618,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Monkey patch Queue to induce conflict errors artificially.
def query(self, query_string,*args, **kw):
# Not so nice, this is specific to zsql method
if "REPLACE INTO" in query_string:
if b"REPLACE INTO" in query_string:
raise OperationalError
return self.original_query(query_string,*args, **kw)
......@@ -731,7 +732,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
messages.
"""
activity_tool = self.portal.portal_activities
for _ in xrange(loop_size):
for _ in range(loop_size):
activity_tool.distribute(node_count=1)
activity_tool.tic(processing_node=1)
......@@ -924,7 +925,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
Organisation.updateDesc = updateDesc
# First check dequeue read same message only once
for i in xrange(10):
for i in range(10):
p.activate(activity="SQLDict").updateDesc()
self.commit()
......@@ -933,7 +934,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(p.getDescription(), "a")
# Check if there is pending activity after deleting an object
for i in xrange(10):
for i in range(10):
p.activate(activity="SQLDict").updateDesc()
self.commit()
......@@ -963,7 +964,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(0,organisation.getFoobar())
# Test group_method_id is working without group_id
for x in xrange(5):
for x in range(5):
organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar").reindexObject(number=1)
self.commit()
......@@ -975,7 +976,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Test group_method_id is working with one group_id defined
for x in xrange(5):
for x in range(5):
organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
self.commit()
......@@ -988,7 +989,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
del foobar_list[:]
# Test group_method_id is working with many group_id defined
for x in xrange(5):
for x in range(5):
organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
self.commit()
organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
......@@ -1024,7 +1025,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
"""
activity_tool = self.getActivityTool()
def delete_volatiles():
for property_id in activity_tool.__dict__.keys():
for property_id in list(six.iterkeys(activity_tool.__dict__)):
if property_id.startswith('_v_'):
delattr(activity_tool, property_id)
organisation_module = self.getOrganisationModule()
......@@ -1140,6 +1141,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.flushAllActivities(silent=1, loop_size=100)
# Check there is a traceback in the email notification
sender, recipients, mail = message_list.pop()
mail = mail.decode()
self.assertIn("Module %s, line %s, in failingMethod" % (
__name__, inspect.getsourcelines(failingMethod)[1]), mail)
self.assertIn("ValueError:", mail)
......@@ -1235,7 +1237,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Check that cmf_activity SQL connection still works
connection_da = self.portal.cmf_activity_sql_connection()
self.assertFalse(connection_da._registered)
connection_da.query('select 1')
connection_da.query(b'select 1')
self.assertTrue(connection_da._registered)
self.commit()
self.assertFalse(connection_da._registered)
......@@ -1691,7 +1693,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# This is a one-shot method, revert after execution
SQLDict.dequeueMessage = original_dequeue
result = self.dequeueMessage(activity_tool, processing_node, node_family_id_set)
queue_tic_test_dict['isAlive'] = process_shutdown_thread.isAlive()
queue_tic_test_dict['is_alive'] = process_shutdown_thread.is_alive()
return result
SQLDict.dequeueMessage = dequeueMessage
Organisation.waitingActivity = waitingActivity
......@@ -1715,7 +1717,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.tic()
activity_thread = ActivityThread()
# Do not try to outlive main thread.
activity_thread.setDaemon(True)
activity_thread.daemon = True
# Call process_shutdown in yet another thread because it will wait for
# running activity to complete before returning, and we need to unlock
# activity *after* calling process_shutdown to make sure the next
......@@ -1725,7 +1727,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.process_shutdown(3, 0)
process_shutdown_thread = ProcessShutdownThread()
# Do not try to outlive main thread.
process_shutdown_thread.setDaemon(True)
process_shutdown_thread.daemon = True
activity_thread.start()
# Wait at rendez-vous for activity to arrive.
......@@ -1744,7 +1746,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(len(message_list), 1)
self.assertEqual(message_list[0].method_id, 'getTitle')
# Check that process_shutdown_thread was still runing when Queue_tic returned.
self.assertTrue(queue_tic_test_dict.get('isAlive'), repr(queue_tic_test_dict))
self.assertTrue(queue_tic_test_dict.get('is_alive'), repr(queue_tic_test_dict))
# Call tic in foreground. This must not lead to activity execution.
activity_tool.tic()
self.assertEqual(len(activity_tool.getMessageList()), 1)
......@@ -1892,13 +1894,13 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
"""
original_query = six.get_unbound_function(DB.query)
def query(self, query_string, *args, **kw):
if query_string.startswith('INSERT'):
if query_string.startswith(b'INSERT'):
insert_list.append(len(query_string))
if not n:
raise Skip
return original_query(self, query_string, *args, **kw)
def check():
for i in xrange(1, N):
for i in range(1, N):
activity_tool.activate(activity=activity, group_id=str(i)
).doSomething(arg)
activity_tool.activate(activity=activity, group_id='~'
......@@ -2402,7 +2404,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# / \ |
# c3 c4 c5
c = [category_tool.newContent()]
for i in xrange(5):
for i in range(5):
c.append(c[i//2].newContent())
self.tic()
def activate(i, priority=1, **kw):
......@@ -2473,7 +2475,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
check(1, tag="foo")
check(0, tag="foo", method_id="getUid")
check(1, processing_node=-1)
check(3, processing_node=range(-5,5))
check(3, processing_node=list(range(-5,5)))
test()
self.commit()
test(check)
......@@ -2501,7 +2503,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(1, activity_tool.countMessage())
self.flushAllActivities()
sender, recipients, mail = message_list.pop()
self.assertIn('UID mismatch', mail)
self.assertIn(b'UID mismatch', mail)
m, = activity_tool.getMessageList()
self.assertEqual(m.processing_node, INVOKE_ERROR_STATE)
obj.flushActivity()
......
......@@ -785,31 +785,31 @@ class TestCMFCategory(ERP5TypeTestCase):
base_cat.getCategoryChildIndentedTitleItemList(),
[['', ''],
['The Title', 'the_id'],
['\xc2\xa0\xc2\xa0The Sub Title', 'the_id/the_sub_id']],
[NBSP_UTF8 * 2 + 'The Sub Title', 'the_id/the_sub_id']],
)
self.assertEqual(
base_cat.getCategoryChildTranslatedIndentedTitleItemList(),
[['', ''],
['The Title', 'the_id'],
['\xc2\xa0\xc2\xa0The S\xc3\xbcb T\xc3\xaftle', 'the_id/the_sub_id']],
[NBSP_UTF8 * 2 + 'The Süb Tïtle', 'the_id/the_sub_id']],
)
self.assertEqual(
base_cat.getCategoryChildTranslatedCompactTitleItemList(),
[['', ''],
['The Title', 'the_id'],
['The S\xc3\xbcb T\xc3\xaftle', 'the_id/the_sub_id']],
['The Süb Tïtle', 'the_id/the_sub_id']],
)
self.assertEqual(
base_cat.getCategoryChildTranslatedLogicalPathItemList(),
[['', ''],
['The Title', 'the_id'],
['The Title/The S\xc3\xbcb T\xc3\xaftle', 'the_id/the_sub_id']],
['The Title/The Süb Tïtle', 'the_id/the_sub_id']],
)
self.assertEqual(
base_cat.getCategoryChildTranslatedCompactLogicalPathItemList(),
[['', ''],
['The Title', 'the_id'],
['The Title/The S\xc3\xbcb T\xc3\xaftle', 'the_id/the_sub_id']],
['The Title/The Süb Tïtle', 'the_id/the_sub_id']],
)
def test_CategoryChildTitleItemListFilterNodeFilterLeave(self):
......@@ -1363,9 +1363,9 @@ class TestCMFCategory(ERP5TypeTestCase):
self.assertEqual(get(bc.id), list('aa'))
_set(bc.id, list('baa'))
self.assertEqual(get(bc.id), list('aba'))
_set(bc.id, map(base, 'bb'), 1)
_set(bc.id, [base(e) for e in 'bb'], 1)
self.assertEqual(get(bc.id), list('bb'))
_set(bc.id, map(base, 'abb'), 1)
_set(bc.id, [base(e) for e in 'abb'], 1)
self.assertEqual(get(bc.id), list('bab'))
_set(bc.id, ())
......
......@@ -16,14 +16,14 @@ from __future__ import print_function
from threading import Thread
from time import sleep
from urllib import addinfourl
from urllib import splithost
from urllib import splituser
from urllib import unquote
from urllib import splittype
from six.moves.urllib.parse import splithost
from six.moves.urllib.parse import splituser
from six.moves.urllib.parse import unquote
from six.moves.urllib.parse import splittype
import string
from urllib import FancyURLopener
from Cookie import SimpleCookie
from six.moves.urllib.request import FancyURLopener
from six.moves.http_cookies import SimpleCookie
def main():
max_thread = 7 # The number of thread we want by the same time
......@@ -55,7 +55,7 @@ def main():
print("thread: %i request: %i url: %s" % (i,request_number,url))
else:
for t in range(0,max_thread):
if threads[t].isAlive() == 0:
if threads[t].is_alive() == 0:
url = '//user%i:user%i@localhost:9673%s?__ac_name=user%s&__ac_password=user%s' % \
(t,t,list_url[i][:-1],t,t)
threads[t] = Thread(target=checker[t].CheckUrl,kwargs={'url':url})
......@@ -75,7 +75,7 @@ class URLOpener(FancyURLopener):
def open_http(self, url, data=None):
"""Use HTTP protocol."""
import httplib
import six.moves.http_client
user_passwd = None
if type(url) is type(""):
host, selector = splithost(url)
......@@ -99,10 +99,10 @@ class URLOpener(FancyURLopener):
if not host: raise IOError('http error', 'no host given')
if user_passwd:
import base64
auth = base64.encodestring(user_passwd).strip()
auth = base64.encodebytes(user_passwd).strip()
else:
auth = None
h = httplib.HTTP(host)
h = six.moves.http_client.HTTP(host)
if data is not None:
h.putrequest('POST', selector)
h.putheader('Content-type', 'application/x-www-form-urlencoded')
......@@ -142,7 +142,7 @@ class Checker(URLOpener):
try:
thread = Thread(target=self.SearchUrl,args=(url,))
thread.start()
while thread.isAlive():
while thread.is_alive():
sleep(0.5)
print("Connection to %s went fine" % url)
except IOError as err:
......
......@@ -121,7 +121,7 @@ class TestBusinessTemplateTwoFileExport(ERP5TypeTestCase):
pass
file_document_path = document_path + extension
self.assertEqual([os.path.basename(file_document_path)],
map(os.path.basename, exported))
list(map(os.path.basename, exported)))
with open(file_document_path, 'rb') as test_file:
self.assertEqual(test_file.read(), data)
else:
......@@ -129,7 +129,7 @@ class TestBusinessTemplateTwoFileExport(ERP5TypeTestCase):
with open(xml_document_path, 'rb') as xml_file:
xml_file_content = xml_file.read()
for exported_property in removed_property_list:
self.assertNotIn('<string>'+exported_property+'</string>',
self.assertNotIn(('<string>%s</string>' % exported_property).encode(),
xml_file_content)
import_template = self._importBusinessTemplate()
......@@ -152,7 +152,7 @@ class TestBusinessTemplateTwoFileExport(ERP5TypeTestCase):
import_template = self._exportAndReImport(
test_component_path,
".py",
test_component_kw["text_content"],
test_component_kw["text_content"].encode(),
['text_content'])
self.portal.portal_components.manage_delObjects([test_component_id])
......@@ -242,7 +242,7 @@ class TestBusinessTemplateTwoFileExport(ERP5TypeTestCase):
import_template = self._exportAndReImport(
python_script_path,
".py",
python_script_kw["_body"],
python_script_kw["_body"].encode(),
['_body','_code'])
skin_folder.manage_delObjects(python_script_id)
......@@ -315,7 +315,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""
self._checkTwoFileImportExportForImageInImageModule(dict(
title = "foo",
data = "malformed data",
data = b"malformed data",
portal_type = "Image",
), '.bin')
......@@ -332,7 +332,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
file_id)
try:
args = file_document_kw['data'], ('data',) if extension else ()
args = file_document_kw['data'], (b'data',) if extension else ()
except KeyError:
args = None, ('data',)
import_template = self._exportAndReImport(
......@@ -356,7 +356,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""
self._checkTwoFileImportExportForDocumentInDocumentModule(dict(
title = "foo",
data = "a test file",
data = b"a test file",
content_type = "text/javascript",
portal_type = "File",
), '.js')
......@@ -368,7 +368,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""
self._checkTwoFileImportExportForDocumentInDocumentModule(dict(
title = "foo",
data = "a test file",
data = b"a test file",
content_type = "application/octet-stream",
portal_type = "File",
), '.bin')
......@@ -380,7 +380,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""
self._checkTwoFileImportExportForDocumentInDocumentModule(dict(
title = "foo",
data = "a test file",
data = b"a test file",
content_type = "application/epub+zip",
portal_type = "File",
), '.epub')
......@@ -392,7 +392,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""
self._checkTwoFileImportExportForDocumentInDocumentModule(dict(
title = "foo.js",
data = "a test file",
data = b"a test file",
portal_type = "File",
), '.js')
......@@ -403,7 +403,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""
self._checkTwoFileImportExportForDocumentInDocumentModule(dict(
title = "foo",
data = "<script> ... </script>",
data = b"<script> ... </script>",
default_reference = "foo.js",
portal_type = "File",
), '.js')
......@@ -415,7 +415,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""
self._checkTwoFileImportExportForDocumentInDocumentModule(dict(
title = "foo",
data = "a test file",
data = b"a test file",
content_type = None,
portal_type = "File",
), '.bin')
......@@ -429,7 +429,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""
self._checkTwoFileImportExportForDocumentInDocumentModule(dict(
title = "foo",
data = "a test file",
data = b"a test file",
content_type = "video/wavelet",
portal_type = "File",
), '.bin')
......@@ -443,7 +443,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""
self._checkTwoFileImportExportForDocumentInDocumentModule(dict(
title = "foo",
data = "a test file",
data = b"a test file",
content_type = "text/x-uri",
portal_type = "File",
), '.txt')
......@@ -454,7 +454,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
where extension (.xml, exported as ._xml to avoid conflict with the meta-data file)
is identified by the title
"""
file_content = """<person>
file_content = b"""<person>
<name>John</name>
<surname>Doe</surname>
</person>
......@@ -472,7 +472,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
where extension (.xml, exported as ._xml to avoid conflict with the meta-data file)
is identified by the title
"""
file_content = """<person>
file_content = b"""<person>
<name>John</name>
<surname>Doe</surname>
</person>
......@@ -524,7 +524,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""Test Business Template Import And Export With A PDF Document"""
self._checkTwoFileImportExportForDocumentInDocumentModule(dict(
title = "foo.pdf",
data ="pdf content, maybe should update for base64 sample" ,
data =b"pdf content, maybe should update for base64 sample" ,
portal_type = "PDF",
), '.pdf')
......@@ -563,7 +563,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
import_template = self._exportAndReImport(
method_document_path,
".sql",
'dummy_method_template',
b'dummy_method_template',
['src'])
catalog.manage_delObjects([method_id])
......@@ -611,7 +611,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
import_template = self._exportAndReImport(
method_document_path,
".sql",
'dummy_method_template',
b'dummy_method_template',
['src'])
catalog.manage_delObjects([method_id])
......@@ -657,7 +657,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
import_template = self._exportAndReImport(
method_document_path,
".sql",
'dummy_method_template',
b'dummy_method_template',
['src'])
self.portal.portal_skins[skin_folder_id].manage_delObjects([method_id])
......@@ -696,7 +696,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
import_template = self._exportAndReImport(
page_template_path,
".zpt",
page_template_kw['_text'],
page_template_kw['_text'].encode(),
['_text'])
self.portal.portal_skins[skin_folder_id].manage_delObjects([page_template_id])
......@@ -747,7 +747,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
import_template = self._exportAndReImport(
dtml_method_path,
".js",
dtml_method_kw['raw'],
dtml_method_kw['raw'].encode(),
['raw'])
self.portal.portal_skins[skin_folder_id].manage_delObjects([dtml_method_id])
......@@ -798,7 +798,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
import_template = self._exportAndReImport(
dtml_method_path,
".txt",
dtml_method_kw['raw'],
dtml_method_kw['raw'].encode(),
['raw'])
self.portal.portal_skins[skin_folder_id].manage_delObjects([dtml_method_id])
......@@ -862,7 +862,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""
self._checkTwoFileImportExportForDocumentInDocumentModule(dict(
title = "foo",
data = "", # XXX a dummy string in data leads to 'NotConvertedError'
data = b"", # XXX dummy data in data leads to 'NotConvertedError'
portal_type = "Spreadsheet",
), '.bin')
......@@ -873,7 +873,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""
self._checkTwoFileImportExportForDocumentInDocumentModule(dict(
title = "foo",
data = "", # XXX a dummy string in data leads to 'NotConvertedError'
data = b"", # XXX dummy data in data leads to 'NotConvertedError'
content_type = "application/vnd.oasis.opendocument.spreadsheet",
portal_type = "Spreadsheet",
), '.ods')
......@@ -885,7 +885,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
"""
self._checkTwoFileImportExportForDocumentInDocumentModule(dict(
title = "foo.xlsx",
data = "", # XXX a dummy string in data leads to 'NotConvertedError'
data = b"", # XXX dummy data in data leads to 'NotConvertedError'
portal_type = "Spreadsheet",
), '.xlsx')
......@@ -910,7 +910,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
import_template = self._exportAndReImport(
test_page_document_path,
".html",
test_page_data_kw["text_content"],
test_page_data_kw["text_content"].encode(),
["text_content"])
self.portal.test_page_module.manage_delObjects([test_page_id])
......@@ -954,7 +954,7 @@ AAAFCAYAAACNbyblAAAAHElEQVQI12P4//8/w38GIAXDIBKE0DHxgljNBAAO
import_template = self._exportAndReImport(
python_script_path,
".py",
python_script_kw["_body"],
python_script_kw["_body"].encode(),
['_body','_code'])
self.portal.portal_skins[skin_folder_id].manage_delObjects([python_script_id])
......
......@@ -73,8 +73,8 @@ class TestERP5PythonScript(ERP5TypeTestCase):
basic='%s:%s' % (self.manager_username, self.manager_password),
handle_errors=False,
)
self.assertIn('ERP5 Python Scripts', resp.getBody())
self.assertIn('addPythonScriptThroughZMI', resp.getBody())
self.assertIn(b'ERP5 Python Scripts', resp.getBody())
self.assertIn(b'addPythonScriptThroughZMI', resp.getBody())
def test_call(self):
self.script.setBody('return "Hello"')
......
......@@ -30,12 +30,13 @@
import threading
import unittest
import urllib
from six.moves.urllib.request import urlopen
import transaction
import pkg_resources
from DateTime import DateTime
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript
from six.moves import range
ZEO5 = pkg_resources.parse_version(
pkg_resources.get_distribution('ZEO').version
......@@ -67,13 +68,13 @@ class TestInvalidationBug(ERP5TypeTestCase):
query = connection.factory()('-' + connection.connection_string).query
sql = "rollback\0select * from %s where path='%s'" % (table, path)
test_list.append(lambda query=query, sql=sql: len(query(sql)[1]))
result_list = [map(apply, test_list)]
result_list = [[test() for test in test_list]]
Transaction_commitResources = transaction.Transaction._commitResources
connection = module._p_jar
def _commitResources(self):
def tpc_finish(rm, txn):
rm.__class__.tpc_finish(rm, txn)
result_list.append(None if rm is connection else map(apply, test_list))
result_list.append(None if rm is connection else [test() for test in test_list])
try:
for rm in self._resources:
rm.tpc_finish = lambda txn, rm=rm: tpc_finish(rm, txn)
......@@ -163,7 +164,7 @@ class TestInvalidationBug(ERP5TypeTestCase):
storage._server = None
# ... monkey-patch done
## create object
urllib.urlopen(new_content_url).read()
urlopen(new_content_url).read()
## validate reindex activity
activity_tool.distribute()
self.assertEqual(1, len(activity_tool.getMessageList()))
......@@ -218,7 +219,7 @@ if (count % 500) < 5:
log('creation speed: %s obj/s' % ((count - start[0]) /
(86400 * (DateTime() - start[1]))))
""")
for x in xrange(0,200):
for x in range(0,200):
module.activate(activity='SQLQueue', priority=2).create_script()
self.tic()
......
......@@ -31,7 +31,7 @@
TODO: test variation
test selection_report
"""
from __future__ import division
import os
import random
import unittest
......@@ -47,6 +47,7 @@ from Products.ERP5Type.tests.utils import reindex
from zExceptions import BadRequest
import six
from six.moves import range
class InventoryAPITestCase(ERP5TypeTestCase):
"""Base class for Inventory API Tests {{{
......@@ -893,7 +894,7 @@ class TestInventoryList(InventoryAPITestCase):
getInventoryList = self.getSimulationTool().getInventoryList
inventory_list = getInventoryList()
self.assertEqual(str(inventory_list.__class__),
'Shared.DC.ZRDB.Results.Results')
'Shared.DC.ZRDB.Results.Results' if six.PY2 else "<class 'Shared.DC.ZRDB.Results.Results'>")
# the brain is InventoryListBrain
self.assertIn('InventoryListBrain',
[c.__name__ for c in inventory_list._class.__bases__])
......@@ -1704,7 +1705,7 @@ class TestMovementHistoryList(InventoryAPITestCase):
getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
mvt_history_list = getMovementHistoryList()
self.assertEqual(str(mvt_history_list.__class__),
'Shared.DC.ZRDB.Results.Results')
'Shared.DC.ZRDB.Results.Results' if six.PY2 else "<class 'Shared.DC.ZRDB.Results.Results'>")
# default is an empty list
self.assertEqual(0, len(mvt_history_list))
......@@ -2966,7 +2967,7 @@ class TestInventoryCacheTable(InventoryAPITestCase):
def afterSetUp(self):
InventoryAPITestCase.afterSetUp(self)
self.CACHE_LAG = cache_lag = self.getSimulationTool().getInventoryCacheLag()
min_lag = cache_lag / 2
min_lag = cache_lag // 2
self.NOW = now = DateTime(DateTime().strftime("%Y-%m-%d %H:%M:%S UTC"))
self.CACHE_DATE = cache_date = now - min_lag
from erp5.component.tool.SimulationTool import MYSQL_MIN_DATETIME_RESOLUTION
......@@ -3039,7 +3040,7 @@ class TestInventoryCacheTable(InventoryAPITestCase):
inventory_list = inventory_list[:] # That list is modified in this method
for criterion_dict in criterion_dict_list:
success = False
for inventory_position in xrange(len(inventory_list)):
for inventory_position in range(len(inventory_list)):
if self._doesInventoryLineMatch(criterion_dict,
inventory_list[inventory_position]):
del inventory_list[inventory_position]
......
......@@ -58,7 +58,7 @@ class Aspell(object):
command = 'echo %s | aspell -l %s -a' % (word, language)
subprocess = Popen(command, shell=True, stdin=PIPE,
stdout=PIPE, stderr=PIPE, close_fds=True)
return subprocess.communicate()[0].split('\n')[1:]
return subprocess.communicate()[0].decode('utf-8').split('\n')[1:]
class TestSpellChecking(ERP5TypeTestCase):
......@@ -115,7 +115,7 @@ class TestSpellChecking(ERP5TypeTestCase):
message = '"%s" is misspelled, suggestion are : "%s"'
result_dict = {}
for word, result_list in six.iteritems(self.spellChecker(sentence)):
filtered_result_list = filter(lambda x: x not in ('*', ''), result_list)
filtered_result_list = [x for x in result_list if x not in ('*', '')]
if filtered_result_list:
result_dict[word] = message % (word, \
filtered_result_list[0].split(':')[-1].strip())
......@@ -133,8 +133,8 @@ class TestSpellChecking(ERP5TypeTestCase):
# check some suggestion are given for a small mistake
self.assertNotEqual(self.validate_spell('canceled'), {})
self.assertIn('is misspelled', \
self.validate_spell('canceled').values()[0])
self.assertIn('is misspelled',
list(self.validate_spell('canceled').values())[0])
def test_business_template_list_with_workflow_template_item(self):
"""
......
......@@ -31,7 +31,7 @@ import unittest
import MethodObject
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.Utils import getMessageIdWithContext
from Products.ERP5Type.Utils import getMessageIdWithContext, str2unicode, unicode2str
# dependency order
target_business_templates = (
......@@ -100,7 +100,7 @@ class TestWorkflowStateTitleTranslation(ERP5TypeTestCase):
if (result == msgid) and (self.lang != 'en'):
#result = None
result = self.portal.Localizer.erp5_ui.gettext(msgid)
return result.encode('utf8')
return unicode2str(result)
def logMessage(self, message):
self.message += '%s\n' % message
......@@ -180,7 +180,7 @@ class TestWorkflowStateTitleTranslation(ERP5TypeTestCase):
if error:
for key, item_list in error_dict.items():
if len(item_list) != 0:
self.logMessage("\n'%s'" % key.encode('utf-8'))
self.logMessage("\n'%s'" % unicode2str(key))
self.logMessage('\t### Conflicting workflow with common states (ie, what user can see) ###')
for item in item_list:
# XXX Improve rendering
......@@ -250,7 +250,7 @@ class TestWorkflowStateTitleTranslation(ERP5TypeTestCase):
'item_workflow'),
add=1)
message_catalog.message_edit('Validated', self.lang,
'Validé'.decode('utf8'), '')
str2unicode('Validé'), '')
message_catalog.message_edit(getMessageIdWithContext('Validated',
'state',
'item_workflow'),
......@@ -433,7 +433,7 @@ class TestTranslation(ERP5TypeTestCase):
</tal:ommit>
""" % domain
self.myzpt.pt_edit(zpt_template, 'text/html')
return self.myzpt(words=words).encode('utf-8').split()
return unicode2str(self.myzpt(words=words)).split()
def test_ZPT_translation(self):
results = self.translate_by_zpt('erp5_ui', 'Person', 'Draft')
......
......@@ -29,7 +29,7 @@ import os
import tarfile
import xml.parsers.expat
import xml.dom.minidom
from urllib import url2pathname
from six.moves.urllib.request import url2pathname
from ZODB.DemoStorage import DemoStorage
from ZODB import DB
from Products.ERP5Type.XMLExportImport import importXML
......@@ -165,14 +165,14 @@ class BusinessTemplateInfoDir(BusinessTemplateInfoBase):
def findFileInfosByName(self, startswith='', endswith=''):
allfiles = []
def visit(arg, dirname, names):
if '.svn' in dirname:
return
for i in names:
path = os.path.join(self.target, dirname, i)
for root, dir_list, file_list in os.walk(self.target):
# We can drop this block as we no longer use Subversion.
if '.svn' in root.split(os.path.sep):
continue
for file_ in file_list:
path = os.path.join(self.target, root, file_)
if os.path.isfile(path):
allfiles.append(path)
os.path.walk(self.target, visit, None)
for i in allfiles:
if i.startswith(startswith) and i.endswith(endswith):
yield i
......@@ -186,4 +186,5 @@ class BusinessTemplateInfoDir(BusinessTemplateInfoBase):
return fileinfo
def readFileInfo(self, fileinfo):
return open(fileinfo).read()
with open(fileinfo) as f:
return f.read()
......@@ -32,7 +32,6 @@
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from AccessControl.SecurityManagement import newSecurityManager
from six import StringIO
class TestFormPrintoutMixin(ERP5TypeTestCase):
run_all_test = 1
......@@ -49,7 +48,7 @@ class TestFormPrintoutMixin(ERP5TypeTestCase):
'''return odf document from the printout
'''
document_file = getattr(self.portal, printout_form.template, None)
document_file = StringIO(document_file).read()
document_file = bytes(document_file)
if document_file is not None:
return document_file
raise ValueError ('%s template not found' % printout_form.template)
......
......@@ -36,7 +36,7 @@ from AccessControl.SecurityManagement import newSecurityManager
from Acquisition import aq_base
from Products.ERP5OOo.tests.utils import Validator
from lxml import html
import email, urlparse, httplib
import email, six.moves.urllib.parse, six.moves.http_client
from Products.Formulator.MethodField import Method
......@@ -133,7 +133,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
self.assertNotEqual(last_message, ())
mfrom, mto, message_text = last_message
self.assertEqual('"%s" <%s>' % (self.first_name, self.recipient_email_address), mto[0])
mail_message = email.message_from_string(message_text)
mail_message = email.message_from_string(message_text.decode())
for part in mail_message.walk():
content_type = part.get_content_type()
file_name = part.get_filename()
......@@ -158,9 +158,9 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
self.assertTrue("History%s" % extension or self.attachment_file_extension in content)
tree = html.fromstring(content)
link, = [href for href in tree.xpath('//a/@href') if href]
relative_url =urlparse.urlparse(link)
relative_url =six.moves.urllib.parse.urlparse(link)
report = self.publish(relative_url.path+"?"+relative_url.query, '%s:%s' % (self.username, self.password))
self.assertEqual(httplib.OK, report.getStatus())
self.assertEqual(six.moves.http_client.OK, report.getStatus())
self.assertEqual(report.getHeader('content-type'), content_type or self.content_type)
def _checkDocument(self):
......@@ -198,7 +198,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
self.assertNotEqual(last_message, ())
mfrom, mto, message_text = last_message
self.assertEqual('"%s" <%s>' % (self.first_name, self.recipient_email_address), mto[0])
mail_message = email.message_from_string(message_text)
mail_message = email.message_from_string(message_text.decode())
for part in mail_message.walk():
content_type = part.get_content_type()
if content_type == "text/html":
......@@ -236,7 +236,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
self.assertNotEqual(last_message, ())
mfrom, mto, message_text = last_message
self.assertEqual('"%s" <%s>' % (self.first_name, self.recipient_email_address), mto[0])
mail_message = email.message_from_string(message_text)
mail_message = email.message_from_string(message_text.decode())
for part in mail_message.walk():
content_type = part.get_content_type()
if content_type == "text/html":
......@@ -260,7 +260,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
self.assertNotEqual(last_message, ())
mfrom, mto, message_text = last_message
self.assertEqual('"%s" <%s>' % (self.first_name, self.recipient_email_address), mto[0])
mail_message = email.message_from_string(message_text)
mail_message = email.message_from_string(message_text.decode())
for part in mail_message.walk():
content_type = part.get_content_type()
if content_type == "text/html":
......@@ -285,7 +285,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
self.assertNotEqual(last_message, ())
mfrom, mto, message_text = last_message
self.assertEqual('"%s" <%s>' % (self.first_name, self.recipient_email_address), mto[0])
mail_message = email.message_from_string(message_text)
mail_message = email.message_from_string(message_text.decode())
for part in mail_message.walk():
content_type = part.get_content_type()
if content_type == self.content_type:
......@@ -315,13 +315,13 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
'HTTP_ACCEPT_LANGUAGE': 'fr;q=0.9,en;q=0.8',
})
self.tic()
mail_message = email.message_from_string(self.portal.MailHost._last_message[2])
mail_message = email.message_from_string(self.portal.MailHost._last_message[2].decode())
# mail subject is translated
self.assertEqual('Historique', mail_message['subject'])
# content is translated
part, = [x for x in mail_message.walk() if x.get_content_type() == self.content_type]
self.assertIn(
'Historique',
b'Historique',
self.portal.portal_transforms.convertTo(
'text/plain',
part.get_payload(decode=True),
......@@ -345,14 +345,14 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
'HTTP_COOKIE': 'LOCALIZER_LANGUAGE="fr"',
})
self.tic()
mail_message = email.message_from_string(self.portal.MailHost._last_message[2])
mail_message = email.message_from_string(self.portal.MailHost._last_message[2].decode())
# mail subject is translated
self.assertEqual('Historique', mail_message['subject'])
# content is translated
mail_message = email.message_from_string(self.portal.MailHost._last_message[2])
mail_message = email.message_from_string(self.portal.MailHost._last_message[2].decode())
part, = [x for x in mail_message.walk() if x.get_content_type() == self.content_type]
self.assertIn(
'Historique',
b'Historique',
self.portal.portal_transforms.convertTo(
'text/plain',
part.get_payload(decode=True),
......@@ -386,7 +386,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
# after they are saved to DB and automatically migrated. The getProperty
# above, which is also what ods_style does, only work after the report
# state is updated.
report.__setstate__(aq_base(getattr(skin_folder, report_form_name)).__getstate__())
aq_base(report).__setstate__(aq_base(getattr(skin_folder, report_form_name)).__getstate__())
self.assertEqual(report.getProperty('title'), self.id())
# Report section method
......@@ -450,7 +450,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
# inspect the report as text and check the selection was initialized from
# request parameter.
mail_message = email.message_from_string(self.portal.MailHost._last_message[2])
mail_message = email.message_from_string(self.portal.MailHost._last_message[2].decode())
part, = [x for x in mail_message.walk() if x.get_content_type() == self.content_type]
doc = self.portal.document_module.newContent(
......@@ -487,7 +487,7 @@ class TestODSDeferredStyle(TestDeferredStyleBase):
self.assertNotEqual(last_message, ())
mfrom, mto, message_text = last_message
self.assertEqual('"%s" <%s>' % (self.first_name, self.recipient_email_address), mto[0])
mail_message = email.message_from_string(message_text)
mail_message = email.message_from_string(message_text.decode())
for part in mail_message.walk():
content_type = part.get_content_type()
file_name = part.get_filename()
......
......@@ -152,7 +152,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
# check the style is keept after the odg generation
self.assertEqual(final_document_style_dict, style_dict)
self.assertTrue(content_xml.find("Foo title!") > 0)
self.assertIn(b"Foo title!", content_xml)
self.assertEqual(request.RESPONSE.getHeader('content-type'),
'application/vnd.oasis.opendocument.graphics')
self.assertEqual(request.RESPONSE.getHeader('content-disposition'),
......@@ -161,12 +161,11 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
# 2. Normal case: change the field value and check again the ODF document
test1.setTitle("Changed Title!")
#foo_form.my_title.set_value('default', "Changed Title!")
odf_document = foo_printout.index_html(request)
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Changed Title!") > 0)
self.assertIn(b"Changed Title!", content_xml)
self._validate(odf_document)
# 3. False case: change the field name
......@@ -178,7 +177,8 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertFalse(content_xml.find("you cannot find") > 0)
self.assertNotIn(b"you cannot find!", content_xml)
self._validate(odf_document)
# put back
foo_form.manage_renameObject('xxx_title', 'my_title', REQUEST=request)
......@@ -199,7 +199,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("call!") > 0)
self.assertIn(b"call!", content_xml)
# when just call FormPrintout, it does not change content-type
# Zope4 add charset=utf-8
self.assertTrue('text/html' in request.RESPONSE.getHeader('content-type'))
......@@ -211,7 +211,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Français") > 0)
self.assertIn(u"Français".encode('utf-8'), content_xml)
self._validate(odf_document)
# 6. Normal case: unicode string
......@@ -220,7 +220,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Français test2") > 0)
self.assertIn(u"Français test2".encode('utf-8'), content_xml)
# leave _validate() here not to forget the validation failure
self._validate(odf_document)
......@@ -278,8 +278,8 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertEqual(final_document_style_dict, style_dict)
# check the two lines are prensent in the generated document
self.assertTrue(content_xml.find('A text a bit more longer') > 0)
self.assertTrue(content_xml.find('With a newline !') > 0)
self.assertIn(b'A text a bit more longer', content_xml)
self.assertIn(b'With a newline !', content_xml)
# check there is two line-break in the element my_description
text_xpath = '//draw:frame[@draw:name="my_description"]//text:line-break'
......@@ -403,7 +403,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
# check the style is keept after the odg generation
self.assertEqual(final_document_style_dict, style_dict)
self.assertTrue(content_xml.find("Foo title!") > 0)
self.assertIn(b"Foo title!", content_xml)
self.assertEqual(request.RESPONSE.getHeader('content-type'),
'application/vnd.oasis.opendocument.graphics')
self.assertEqual(request.RESPONSE.getHeader('content-disposition'),
......@@ -412,12 +412,11 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
# 2. Normal case: change the field value and check again the ODF document
test1.setTitle("Changed Title!")
#foo_form.my_title.set_value('default', "Changed Title!")
odf_document = foo_printout.index_html(request)
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Changed Title!") > 0)
self.assertIn(b"Changed Title!", content_xml)
self._validate(odf_document)
# 3. False case: change the field name
......@@ -429,7 +428,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertFalse(content_xml.find("you cannot find") > 0)
self.assertNotIn(b"you cannot find", content_xml)
self._validate(odf_document)
# put back
foo_form.manage_renameObject('xxx_title', 'my_title', REQUEST=request)
......@@ -449,7 +448,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("call!") > 0)
self.assertIn(b"call!", content_xml)
self.assertEqual(request.RESPONSE.getHeader('content-type'),
'application/vnd.oasis.opendocument.graphics')
self._validate(odf_document)
......@@ -460,7 +459,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Français") > 0)
self.assertIn(u"Français".encode('utf-8'), content_xml)
self._validate(odf_document)
# 6. Normal case: unicode string
......@@ -469,7 +468,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Français test2") > 0)
self.assertIn(u"Français test2".encode('utf-8'), content_xml)
self._validate(odf_document)
def test_suite():
......
......@@ -37,9 +37,11 @@ from Products.MimetypesRegistry.mime_types.magic import guessMime
from Products.ERP5OOo.OOoUtils import OOoBuilder
from Products.ERP5OOo.tests.utils import Validator
from Products.ERP5Type.tests.utils import FileUpload
from Products.ERP5Type.Utils import bytes2str
from DateTime import DateTime
from lxml import etree
import os
from six.moves import range
class TestFormPrintoutAsODT(TestFormPrintoutMixin):
......@@ -147,7 +149,7 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Foo title!") > 0)
self.assertTrue(bytes2str(content_xml).find("Foo title!") > 0)
self.assertEqual(request.RESPONSE.getHeader('content-type'),
'application/vnd.oasis.opendocument.text')
self.assertEqual(request.RESPONSE.getHeader('content-disposition'),
......@@ -166,7 +168,7 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Changed Title!") > 0)
self.assertTrue(bytes2str(content_xml).find("Changed Title!") > 0)
self._validate(odf_document)
# 3. False case: change the field name
......@@ -178,7 +180,7 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertFalse(content_xml.find("you cannot find") > 0)
self.assertFalse(bytes2str(content_xml).find("you cannot find") > 0)
self._validate(odf_document)
# put back
foo_form.manage_renameObject('xxx_title', 'my_title', REQUEST=request)
......@@ -198,7 +200,7 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("call!") > 0)
self.assertTrue(bytes2str(content_xml).find("call!") > 0)
# Zope4 add charset=utf-8
self.assertTrue('text/html' in request.RESPONSE.getHeader('content-type'))
self._validate(odf_document)
......@@ -209,7 +211,7 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Français") > 0)
self.assertTrue(bytes2str(content_xml).find("Français") > 0)
self._validate(odf_document)
# 6. Normal case: unicode string
......@@ -218,7 +220,7 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Français test2") > 0)
self.assertTrue(bytes2str(content_xml).find("Français test2") > 0)
self._validate(odf_document)
# 7. Change Filename of downloadable file
......@@ -306,8 +308,8 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
#test_output.write(odf_document)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("ZZZ test here ZZZ") > 0)
self.assertTrue(content_xml.find("test title") < 0)
self.assertTrue(bytes2str(content_xml).find("ZZZ test here ZZZ") > 0)
self.assertTrue(bytes2str(content_xml).find("test title") < 0)
self._validate(odf_document)
def test_02_Table_01_Normal(self):
......@@ -359,7 +361,7 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("foo_title_1") > 0)
self.assertTrue(bytes2str(content_xml).find("foo_title_1") > 0)
self._validate(odf_document)
def test_02_Table_02_SmallerThanListboxColumns(self):
......@@ -400,8 +402,8 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertFalse(content_xml.find("foo_title_1") > 0)
self.assertTrue(content_xml.find("foo_title_2") > 0)
self.assertFalse(bytes2str(content_xml).find("foo_title_1") > 0)
self.assertTrue(bytes2str(content_xml).find("foo_title_2") > 0)
self._validate(odf_document)
def test_02_Table_03_ListboxColumnsLargerThanTable(self):
......@@ -436,8 +438,8 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertFalse(content_xml.find("foo_title_2") > 0)
self.assertTrue(content_xml.find("foo_title_3") > 0)
self.assertFalse(bytes2str(content_xml).find("foo_title_2") > 0)
self.assertTrue(bytes2str(content_xml).find("foo_title_3") > 0)
self._validate(odf_document)
def test_02_Table_04_ListboxHasNotStat(self):
......@@ -472,8 +474,8 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(odf_document is not None)
self.assertFalse(content_xml.find("foo_title_3") > 0)
self.assertTrue(content_xml.find("foo_title_4") > 0)
self.assertFalse(bytes2str(content_xml).find("foo_title_3") > 0)
self.assertTrue(bytes2str(content_xml).find("foo_title_4") > 0)
content = etree.XML(content_xml)
table_row_xpath = '//table:table[@table:name="listbox"]/table:table-row'
......@@ -541,7 +543,7 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
#Check that each listbox values are inside ODT table cells
xpath_result_expression = '//table:table[@table:name="listbox2"]/table:table-row/table:table-cell/text:p/text()'
self.assertEqual(['foo_1', 'foo_title_5', '0.0', 'foo_2', 'foo_2', '0.0', '1234.5'], content_tree.xpath(xpath_result_expression, namespaces=content_tree.nsmap))
self.assertFalse(content_xml.find("foo_title_4") > 0)
self.assertFalse(bytes2str(content_xml).find("foo_title_4") > 0)
self._validate(odf_document)
# put back the field name
......@@ -589,8 +591,8 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertFalse(content_xml.find("foo_title_5") > 0)
self.assertTrue(content_xml.find("foo_title_6") > 0)
self.assertFalse(bytes2str(content_xml).find("foo_title_5") > 0)
self.assertTrue(bytes2str(content_xml).find("foo_title_6") > 0)
self._validate(odf_document)
# put back the field name
......@@ -626,8 +628,8 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertFalse(content_xml.find("foo_title_6") > 0)
self.assertTrue(content_xml.find("foo_title_7") > 0)
self.assertFalse(bytes2str(content_xml).find("foo_title_6") > 0)
self.assertTrue(bytes2str(content_xml).find("foo_title_7") > 0)
content = etree.XML(content_xml)
table_row_xpath = '//table:table[@table:name="listbox"]/table:table-row'
......@@ -689,7 +691,7 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
request = self.app.REQUEST
request['here'] = test1
for i in xrange(3, 7):
for i in range(3, 7):
foo_id = "foo_%s" % i
if test1._getOb(foo_id, None) is None:
test1.newContent(foo_id, portal_type='Foo Line')
......@@ -701,7 +703,7 @@ class TestFormPrintoutAsODT(TestFormPrintoutMixin):
r"""
line_index = kw['list_index']
line_number = line_index + 1
for n in xrange(6, 0, -1):
for n in range(6, 0, -1):
if line_number % n is 0:
return "line" + str(n)
"""
......@@ -728,7 +730,7 @@ for n in xrange(6, 0, -1):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("foo_title_9") > 0)
self.assertTrue(bytes2str(content_xml).find("foo_title_9") > 0)
content = etree.XML(content_xml)
table_row_xpath = '//table:table[@table:name="listbox4"]/table:table-row'
......@@ -837,7 +839,7 @@ return report_section_list
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("foo_04_Iteration_1") > 0)
self.assertTrue(bytes2str(content_xml).find("foo_04_Iteration_1") > 0)
content = etree.XML(content_xml)
frame_xpath = '//draw:frame[@draw:name="FooReport_getReportSectionList"]'
frame_list = content.xpath(frame_xpath, namespaces=content.nsmap)
......@@ -864,7 +866,7 @@ return []
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertFalse(content_xml.find("foo_04_Iteration") > 0)
self.assertFalse(bytes2str(content_xml).find("foo_04_Iteration") > 0)
content = etree.XML(content_xml)
frame_xpath = '//draw:frame[@draw:name="FooReport_getReportSectionList"]'
frame_list = content.xpath(frame_xpath, namespaces=content.nsmap)
......@@ -941,7 +943,7 @@ return report_section_list
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("foo_04_Iteration_1") > 0)
self.assertTrue(bytes2str(content_xml).find("foo_04_Iteration_1") > 0)
content = etree.XML(content_xml)
section_xpath = '//text:section[@text:name="FooReport_getReportSectionList"]'
section_list = content.xpath(section_xpath, namespaces=content.nsmap)
......@@ -968,7 +970,7 @@ return []
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertFalse(content_xml.find("foo_04_Iteration") > 0)
self.assertFalse(bytes2str(content_xml).find("foo_04_Iteration") > 0)
content = etree.XML(content_xml)
section_xpath = '//text:section[@text:name="FooReport_getReportSectionList"]'
section_list = content.xpath(section_xpath, namespaces=content.nsmap)
......@@ -1053,7 +1055,7 @@ return report_section_list
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("foo_04_Iteration_1") > 0)
self.assertTrue(bytes2str(content_xml).find("foo_04_Iteration_1") > 0)
content = etree.XML(content_xml)
section_xpath = '//text:section[@text:name="your_report_box1"]'
section_list = content.xpath(section_xpath, namespaces=content.nsmap)
......@@ -1080,7 +1082,7 @@ return []
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertFalse(content_xml.find("foo_04_Iteration") > 0)
self.assertFalse(bytes2str(content_xml).find("foo_04_Iteration") > 0)
content = etree.XML(content_xml)
section_xpath = '//text:section[@text:name="your_report_box1"]'
section_list = content.xpath(section_xpath, namespaces=content.nsmap)
......@@ -1141,7 +1143,7 @@ return []
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
# confirming the image was removed
self.assertTrue(content_xml.find('<draw:image xlink:href') < 0)
self.assertTrue(bytes2str(content_xml).find('<draw:image xlink:href') < 0)
self._validate(odf_document)
def test_08_OOoConversion(self):
......
......@@ -38,6 +38,7 @@ from AccessControl.SecurityManagement import newSecurityManager
from AccessControl import Unauthorized
from DateTime import DateTime
from Products.ERP5Type.Utils import bytes2str, convertToUpperCase, str2bytes
from Products.ERP5Type.Utils import convertToUpperCase, str2bytes
from Products.ERP5Type.tests.ERP5TypeTestCase import (
ERP5TypeTestCase, _getConversionServerUrlList)
from Products.CMFCore.WorkflowCore import WorkflowException
......@@ -50,10 +51,10 @@ from zExceptions import BadRequest
from zExceptions import Redirect
import ZPublisher.HTTPRequest
from unittest import expectedFailure
import urllib
import urllib2
import httplib
import urlparse
import six.moves.http_client
import six.moves.urllib.parse, six.moves.urllib.request
import six
import base64
import mock
......@@ -63,15 +64,6 @@ FILENAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z&é@{]{3,7})-(?P<language>[a-z]
REFERENCE_REGULAR_EXPRESSION = "(?P<reference>[A-Z&é@{]{3,7})(-(?P<language>[a-z]{2}))?(-(?P<version>[0-9]{3}))?"
def makeFilePath(name):
return os.path.join(TEST_FILES_HOME, name)
def makeFileUpload(name, as_name=None):
if as_name is None:
as_name = name
path = makeFilePath(name)
return FileUpload(path, as_name)
class IngestionTestCase(ERP5TypeTestCase):
def getBusinessTemplateList(self):
......@@ -131,6 +123,16 @@ class IngestionTestCase(ERP5TypeTestCase):
skin_tool.custom._delObject(script_id)
self.commit()
def makeFilePath(self, name):
return os.path.join(TEST_FILES_HOME, name)
def makeFileUpload(self, name, as_name=None):
if as_name is None:
as_name = name
path = self.makeFilePath(name)
fu = FileUpload(path, as_name)
self.addCleanup(fu.close)
return fu
class TestIngestion(IngestionTestCase):
"""
......@@ -278,7 +280,7 @@ class TestIngestion(IngestionTestCase):
"""
for revision, format in enumerate(format_list):
filename = 'TEST-en-002.%s' %format
f = makeFileUpload(filename)
f = self.makeFileUpload(filename)
document.edit(file=f)
self.tic()
self.assertTrue(document.hasFile())
......@@ -298,7 +300,7 @@ class TestIngestion(IngestionTestCase):
can be converted to any of the formats in asserted_target_list
"""
filename = 'TEST-en-002.' + format
f = makeFileUpload(filename)
f = self.makeFileUpload(filename)
document.edit(file=f)
self.tic()
# We call clear cache to be sure that the target list is updated
......@@ -328,7 +330,7 @@ class TestIngestion(IngestionTestCase):
old_portal_type = ''
for extension, portal_type in extension_to_type:
filename = 'TEST-en-002.%s' %extension
file = makeFileUpload(filename)
file = self.makeFileUpload(filename)
# if we change portal type we must change version because
# mergeRevision would fail
if portal_type != old_portal_type:
......@@ -525,7 +527,7 @@ class TestIngestion(IngestionTestCase):
document = self.portal.restrictedTraverse(sequence.get('document_path'))
# First revision is 1 (like web pages)
self.assertEqual(document.getRevision(), '1')
f = makeFileUpload(filename)
f = self.makeFileUpload(filename)
document.edit(file=f)
self.assertTrue(document.hasFile())
self.assertEqual(document.getFilename(), filename)
......@@ -539,7 +541,7 @@ class TestIngestion(IngestionTestCase):
Upload a file from view form and make sure this increases the revision
"""
document = self.portal.restrictedTraverse(sequence.get('document_path'))
f = makeFileUpload('TEST-en-002.doc')
f = self.makeFileUpload('TEST-en-002.doc')
revision = document.getRevision()
document.edit(file=f)
self.assertEqual(document.getRevision(), str(int(revision) + 1))
......@@ -550,7 +552,7 @@ class TestIngestion(IngestionTestCase):
"""
Upload a file from contribution.
"""
f = makeFileUpload('TEST-en-002.doc')
f = self.makeFileUpload('TEST-en-002.doc')
document = self.portal.portal_contributions.newContent(file=f)
sequence.edit(document_path=document.getPath())
self.commit()
......@@ -565,7 +567,7 @@ class TestIngestion(IngestionTestCase):
number_of_document = len(self.portal.document_module.objectIds())
self.assertNotIn('This document is modified.', document.asText())
f = makeFileUpload('TEST-en-002-modified.doc')
f = self.makeFileUpload('TEST-en-002-modified.doc')
f.filename = 'TEST-en-002.doc'
self.portal.portal_contributions.newContent(file=f)
......@@ -581,7 +583,7 @@ class TestIngestion(IngestionTestCase):
"""
Upload another file from contribution.
"""
f = makeFileUpload('ANOTHE-en-001.doc')
f = self.makeFileUpload('ANOTHE-en-001.doc')
document = self.portal.portal_contributions.newContent(id='two', file=f)
sequence.edit(document_path=document.getPath())
self.tic()
......@@ -610,7 +612,7 @@ class TestIngestion(IngestionTestCase):
self.assertEqual(property_dict['description'], 'comments')
self.assertEqual(property_dict['subject_list'], ['keywords'])
# Then make sure metadata discovery works
f = makeFileUpload(filename)
f = self.makeFileUpload(filename)
document.edit(file=f)
self.assertEqual(document.getReference(), 'TEST')
self.assertEqual(document.getLanguage(), 'en')
......@@ -644,7 +646,7 @@ class TestIngestion(IngestionTestCase):
Upload with custom getPropertyDict methods
check that all metadata are correct
"""
f = makeFileUpload('TEST-en-002.doc')
f = self.makeFileUpload('TEST-en-002.doc')
document = self.portal.portal_contributions.newContent(file=f)
self.tic()
# Then make sure content discover works
......@@ -765,54 +767,71 @@ class TestIngestion(IngestionTestCase):
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.checkDocumentExportList(document, 'doc',
['pdf', 'doc', 'rtf', 'txt', 'odt'])
if six.PY2:
# legacy format will be replaced
expectedFailure(self.checkDocumentExportList)(document, 'doc',
['writer.html'])
else:
self.assertRaises(AssertionError, self.checkDocumentExportList,
document, 'doc', ['writer.html'])
def stepCheckSpreadsheetDocumentExportList(self, sequence=None,
sequence_list=None, **kw):
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.checkDocumentExportList(document, 'xls', ['csv', 'xls', 'ods', 'pdf'])
if six.PY2:
# legacy format will be replaced
expectedFailure(self.checkDocumentExportList)(document, 'xls',
['calc.html', 'calc.pdf'])
else:
self.assertRaises(AssertionError, self.checkDocumentExportList,
document, 'xls', ['calc.html', 'calc.pdf'])
def stepCheckPresentationDocumentExportList(self, sequence=None,
sequence_list=None, **kw):
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.checkDocumentExportList(document, 'ppt', ['ppt', 'odp', 'pdf'])
if six.PY2:
# legacy format will be replaced
expectedFailure(self.checkDocumentExportList)(document,
'ppt', ['impr.pdf'])
else:
self.assertRaises(AssertionError, self.checkDocumentExportList,
document, 'ppt', ['impr.pdf'])
def stepCheckDrawingDocumentExportList(self, sequence=None,
sequence_list=None, **kw):
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.checkDocumentExportList(document, 'sxd', ['jpg', 'svg', 'pdf', 'odg'])
if six.PY2:
# legacy format will be replaced
expectedFailure(self.checkDocumentExportList)(document,
'sxd', ['draw.pdf'])
else:
self.assertRaises(AssertionError, self.checkDocumentExportList,
document, 'sxd', ['draw.pdf'])
def stepExportPDF(self, sequence=None, sequence_list=None, **kw):
"""
Try to export PDF to text and HTML
"""
document = self.portal.restrictedTraverse(sequence.get('document_path'))
f = makeFileUpload('TEST-en-002.pdf')
f = self.makeFileUpload('TEST-en-002.pdf')
document.edit(file=f)
mime, text = document.convert('text')
self.assertIn('magic', text)
self.assertTrue(mime == 'text/plain')
self.assertEqual(mime, 'text/plain')
mime, html = document.convert('html')
self.assertIn('magic', html)
self.assertTrue(mime == 'text/html')
self.assertEqual(mime, 'text/html')
def stepExportImage(self, sequence=None, sequence_list=None, **kw):
"""
Check we are able to resize images
"""
image = self.portal.restrictedTraverse(sequence.get('document_path'))
f = makeFileUpload('TEST-en-002.jpg')
f = self.makeFileUpload('TEST-en-002.jpg')
image.edit(file=f)
self.tic()
mime, data = image.convert(None)
......@@ -941,8 +960,8 @@ class TestIngestion(IngestionTestCase):
"""
Email was sent in by someone to ERP5.
"""
f = open(makeFilePath('email_from.txt'))
document = self.receiveEmail(f.read())
with open(self.makeFilePath('email_from.txt'), "rb") as f:
self.receiveEmail(f.read())
self.tic()
def stepReceiveMultipleAttachmentsEmail(self, sequence=None,
......@@ -950,8 +969,8 @@ class TestIngestion(IngestionTestCase):
"""
Email was sent in by someone to ERP5.
"""
f = open(makeFilePath('email_multiple_attachments.eml'))
document = self.receiveEmail(f.read())
with open(self.makeFilePath('email_multiple_attachments.eml'), "rb") as f:
self.receiveEmail(f.read())
self.tic()
def stepVerifyEmailedMultipleDocumentsInitialContribution(self, sequence=None, sequence_list=None, **kw):
......@@ -1378,7 +1397,7 @@ class TestIngestion(IngestionTestCase):
"""
Upload a file from contribution.
"""
f = makeFileUpload('TEST-en-002.doc', 'T&é@{T-en-002.doc')
f = self.makeFileUpload('TEST-en-002.doc', 'T&é@{T-en-002.doc')
document = self.portal.portal_contributions.newContent(file=f)
sequence.edit(document_path=document.getPath())
self.commit()
......@@ -1450,7 +1469,7 @@ class TestIngestion(IngestionTestCase):
"""
portal = self.portal
contribution_tool = getToolByName(portal, 'portal_contributions')
file_object = makeFileUpload('TEST-en-002.doc')
file_object = self.makeFileUpload('TEST-en-002.doc')
document = contribution_tool.newContent(file=file_object)
self.assertEqual(document.getFilename(), 'TEST-en-002.doc')
my_filename = 'Something.doc'
......@@ -1473,7 +1492,7 @@ class TestIngestion(IngestionTestCase):
site='arctic/spitsbergen'))
portal.document_module.manage_setLocalRoles(user.Person_getUserId(), ['Assignor',])
self.tic()
file_object = makeFileUpload('TEST-en-002.doc')
file_object = self.makeFileUpload('TEST-en-002.doc')
document = contribution_tool.newContent(file=file_object)
document.discoverMetadata(document.getFilename(), user.Person_getUserId())
self.tic()
......@@ -1497,7 +1516,7 @@ class TestIngestion(IngestionTestCase):
portal.document_module.manage_setLocalRoles(other_user.Person_getUserId(), ['Assignor',])
self.tic()
file_object = makeFileUpload('TEST-en-002.doc')
file_object = self.makeFileUpload('TEST-en-002.doc')
document = contribution_tool.newContent(file=file_object)
# We only consider the higher group of assignments
......@@ -1516,6 +1535,7 @@ class TestIngestion(IngestionTestCase):
"""
input_script_id = 'Document_getPropertyDictFromContent'
python_code = """from Products.CMFCore.utils import getToolByName
import six
portal = context.getPortalObject()
information = context.getContentInformation()
......@@ -1524,7 +1544,7 @@ property_id_list = context.propertyIds()
for k, v in information.items():
key = k.lower()
if v:
if isinstance(v, unicode):
if six.PY2 and isinstance(v, unicode):
v = v.encode('utf-8')
if key in property_id_list:
if key == 'reference':
......@@ -1549,7 +1569,7 @@ return result
document_to_ingest = self.portal.portal_contributions.newContent(
portal_type='File',
filename='toto.txt',
data='Hello World!')
data=b'Hello World!')
document_to_ingest.publish()
self.tic()
url = document_to_ingest.absolute_url() + '/getData'
......@@ -1571,7 +1591,7 @@ return result
document_to_ingest2 = self.portal.portal_contributions.newContent(
portal_type='File',
filename='toto.txt',
data='Hello World!')
data=b'Hello World!')
document_to_ingest2.publish()
self.tic()
url2 = document_to_ingest2.absolute_url() + '/getData'
......@@ -1597,6 +1617,7 @@ return result
"""
input_script_id = 'Document_getPropertyDictFromContent'
python_code = """from Products.CMFCore.utils import getToolByName
import six
portal = context.getPortalObject()
information = context.getContentInformation()
......@@ -1605,7 +1626,7 @@ property_id_list = context.propertyIds()
for k, v in information.items():
key = k.lower()
if v:
if isinstance(v, unicode):
if six.PY2 and isinstance(v, unicode):
v = v.encode('utf-8')
if key in property_id_list:
if key == 'reference':
......@@ -1627,7 +1648,7 @@ return result
document_to_ingest = self.portal.portal_contributions.newContent(
portal_type='File',
filename='toto.txt',
data='Hello World!')
data=b'Hello World!')
document_to_ingest.publish()
self.tic()
url = document_to_ingest.absolute_url() + '/getData'
......@@ -1649,7 +1670,7 @@ return result
document_to_ingest2 = self.portal.portal_contributions.newContent(
portal_type='File',
filename='toto.txt',
data='Hello World!')
data=b'Hello World!')
document_to_ingest2.publish()
self.tic()
url2 = document_to_ingest2.absolute_url() + '/getData'
......@@ -1685,7 +1706,7 @@ context.setReference(reference)
document_to_ingest = self.portal.portal_contributions.newContent(
portal_type='File',
filename='toto.txt',
data='Hello World!')
data=b'Hello World!')
document_to_ingest.publish()
self.tic()
url = document_to_ingest.absolute_url() + '/getData'
......@@ -1707,7 +1728,7 @@ context.setReference(reference)
document_to_ingest2 = self.portal.portal_contributions.newContent(
portal_type='File',
filename='toto.txt',
data='Hello World!')
data=b'Hello World!')
document_to_ingest2.publish()
self.tic()
self.assertEqual(document_to_ingest2.getReference(),
......@@ -1737,6 +1758,7 @@ context.setReference(reference)
"""
input_script_id = 'Document_getPropertyDictFromContent'
python_code = """from Products.CMFCore.utils import getToolByName
import six
portal = context.getPortalObject()
information = context.getContentInformation()
......@@ -1745,7 +1767,7 @@ property_id_list = context.propertyIds()
for k, v in information.items():
key = k.lower()
if v:
if isinstance(v, unicode):
if six.PY2 and isinstance(v, unicode):
v = v.encode('utf-8')
if key in property_id_list:
if key == 'reference':
......@@ -1776,7 +1798,7 @@ return result
document_to_ingest = self.portal.portal_contributions.newContent(
portal_type='File',
filename='toto.txt',
data='Hello World!')
data=b'Hello World!')
document_to_ingest.publish()
self.tic()
url = document_to_ingest.absolute_url() + '/getData'
......@@ -1798,7 +1820,7 @@ return result
document_to_ingest2 = self.portal.portal_contributions.newContent(
portal_type='File',
filename='toto.txt',
data='Hello World!')
data=b'Hello World!')
document_to_ingest2.publish()
self.tic()
self.assertEqual(document_to_ingest2.getReference(),
......@@ -1828,6 +1850,7 @@ return result
"""
input_script_id = 'Document_getPropertyDictFromContent'
python_code = """from Products.CMFCore.utils import getToolByName
import six
portal = context.getPortalObject()
information = context.getContentInformation()
......@@ -1836,7 +1859,7 @@ property_id_list = context.propertyIds()
for k, v in information.items():
key = k.lower()
if v:
if isinstance(v, unicode):
if six.PY2 and isinstance(v, unicode):
v = v.encode('utf-8')
if key in property_id_list:
if key == 'reference':
......@@ -1865,7 +1888,7 @@ return result
document_to_ingest = self.portal.portal_contributions.newContent(
portal_type='File',
filename='toto.txt',
data='Hello World!')
data=b'Hello World!')
document_to_ingest.publish()
self.tic()
......@@ -1888,7 +1911,7 @@ return result
document_to_ingest2 = self.portal.portal_contributions.newContent(
portal_type='File',
filename='toto.txt',
data='Hello World!')
data=b'Hello World!')
document_to_ingest2.publish()
self.tic()
self.assertEqual(document_to_ingest2.getReference(),
......@@ -1915,8 +1938,8 @@ return result
as a application/octet-stream without explicit extension, become
a Spreadsheet ?
"""
path = makeFilePath('import_region_category.ods')
data = open(path, 'r').read()
with open(self.makeFilePath('import_region_category.ods'), 'rb') as f:
data = f.read()
document = self.portal.portal_contributions.newContent(filename='toto',
data=data,
......@@ -1935,7 +1958,7 @@ return result
module = self.portal.document_module
document = module.newContent(portal_type='File',
property_which_doesnot_exists='Foo',
data='Hello World!',
data=b'Hello World!',
filename='toto.txt')
document.publish()
self.tic()
......@@ -1950,7 +1973,7 @@ return result
self.assertEqual(new_doc.getTitle(), 'One title')
self.assertEqual(new_doc.getReference(), 'EFAA')
self.assertEqual(new_doc.getValidationState(), 'published')
self.assertEqual(new_doc.getData(), 'Hello World!')
self.assertEqual(new_doc.getData(), b'Hello World!')
# Migrate a document with url property
url = new_doc.absolute_url() + '/getData'
......@@ -1962,7 +1985,7 @@ return result
new_doc = document.migratePortalType('File')
self.assertEqual(new_doc.getPortalType(), 'File')
self.assertEqual(new_doc.asURL(), url)
self.assertEqual(new_doc.getData(), 'Hello World!')
self.assertEqual(new_doc.getData(), b'Hello World!')
self.assertEqual(new_doc.getValidationState(), 'submitted')
def test_ContributionTool_isURLIngestionPermitted(self):
......@@ -1996,8 +2019,8 @@ return result
def test_User_Portal_Type_parameter_is_honoured(self):
"""Check that given portal_type is always honoured
"""
path = makeFilePath('import_region_category.xls')
data = open(path, 'r').read()
with open(self.makeFilePath('import_region_category.xls'), 'rb') as f:
data = f.read()
document = self.portal.portal_contributions.newContent(
filename='import_region_category.xls',
......@@ -2014,8 +2037,8 @@ return result
def test_User_ID_parameter_is_honoured(self):
"""Check that given id is always honoured
"""
path = makeFilePath('import_region_category.xls')
data = open(path, 'r').read()
with open(self.makeFilePath('import_region_category.xls'), 'rb') as f:
data = f.read()
document = self.portal.portal_contributions.newContent(
id='this_id',
......@@ -2039,30 +2062,30 @@ return result
def test_newContent_trough_http(self):
filename = 'import_region_category.xls'
path = makeFilePath(filename)
data = open(path, 'r').read()
with open(self.makeFilePath(filename), 'rb') as f:
data = f.read()
reference = 'ITISAREFERENCE'
portal_url = self.portal.absolute_url()
url_split = urlparse.urlsplit(portal_url)
url_split = six.moves.urllib.parse.urlsplit(portal_url)
url_dict = dict(protocol=url_split[0],
hostname=url_split[1])
uri = '%(protocol)s://%(hostname)s' % url_dict
push_url = '%s%s/newContent' % (uri, self.portal.portal_contributions.getPath(),)
request = urllib2.Request(push_url, urllib.urlencode(
{'data': data,
request = six.moves.urllib.request.Request(push_url, str2bytes(six.moves.urllib.parse.urlencode(
{'data:bytes': data,
'filename': filename,
'reference': reference,
'disable_cookie_login__': 1,
}), headers={
})), headers={
'Authorization': 'Basic %s' %
bytes2str(base64.b64encode(str2bytes('%s:%s' % (self.manager_username, self.manager_password))))
})
# disable_cookie_login__ is required to force zope to raise Unauthorized (401)
# then HTTPDigestAuthHandler can perform HTTP Authentication
response = urllib2.urlopen(request)
self.assertEqual(response.getcode(), httplib.OK)
response = six.moves.urllib.request.urlopen(request)
self.assertEqual(response.getcode(), six.moves.http_client.OK)
self.tic()
document = self.portal.portal_catalog.getResultValue(portal_type='Spreadsheet',
reference=reference)
......@@ -2134,7 +2157,7 @@ class Base_contributeMixin:
version=None,
description=None,
attach_document_to_context=True,
file=makeFileUpload('TEST-en-002.odt'))
file=self.makeFileUpload('TEST-en-002.odt'))
self.assertEqual('Text', contributed_document.getPortalType())
self.tic()
document_list = person.getFollowUpRelatedValueList()
......@@ -2150,11 +2173,10 @@ class Base_contributeMixin:
Test contributing an empty file and attaching it to context.
"""
person = self.portal.person_module.newContent(portal_type='Person')
empty_file_upload = ZPublisher.HTTPRequest.FileUpload(FieldStorage(
fp=io.BytesIO(),
environ=dict(REQUEST_METHOD='PUT'),
headers={"content-disposition":
"attachment; filename=empty;"}))
class FileUpload(io.BytesIO):
filename = "empty"
headers = {}
empty_file_upload = FileUpload(b"")
contributed_document = person.Base_contribute(
portal_type=None,
......@@ -2179,7 +2201,7 @@ class Base_contributeMixin:
person = self.portal.person_module.newContent(portal_type='Person')
contributed_document = person.Base_contribute(
portal_type='PDF',
file=makeFileUpload('TEST-en-002.odt'))
file=self.makeFileUpload('TEST-en-002.odt'))
self.assertEqual('PDF', contributed_document.getPortalType())
def test_Base_contribute_input_parameter_dict(self):
......@@ -2188,7 +2210,7 @@ class Base_contributeMixin:
person = self.portal.person_module.newContent(portal_type='Person')
contributed_document = person.Base_contribute(
title='user supplied title',
file=makeFileUpload('TEST-en-002.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf'))
self.tic()
self.assertEqual('user supplied title', contributed_document.getTitle())
......@@ -2201,7 +2223,7 @@ class Base_contributeMixin:
# we use as_name, to prevent regular expression from detecting a
# reference during ingestion, so that we can upload multiple documents
# in one test.
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'draft')
contributed_document.setReference(None)
......@@ -2210,7 +2232,7 @@ class Base_contributeMixin:
contributed_document = person.Base_contribute(
publication_state='shared',
synchronous_metadata_discovery=False,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'shared')
contributed_document.setReference(None)
......@@ -2219,7 +2241,7 @@ class Base_contributeMixin:
contributed_document = person.Base_contribute(
publication_state='shared',
synchronous_metadata_discovery=True,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'shared')
contributed_document.setReference(None)
......@@ -2228,7 +2250,7 @@ class Base_contributeMixin:
contributed_document = person.Base_contribute(
publication_state='released',
synchronous_metadata_discovery=False,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'released')
contributed_document.setReference(None)
......@@ -2237,7 +2259,7 @@ class Base_contributeMixin:
contributed_document = person.Base_contribute(
publication_state='released',
synchronous_metadata_discovery=True,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'released')
contributed_document.setReference(None)
......@@ -2246,7 +2268,7 @@ class Base_contributeMixin:
contributed_document = person.Base_contribute(
synchronous_metadata_discovery=False,
publication_state='published',
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'published')
contributed_document.setReference(None)
......@@ -2255,7 +2277,7 @@ class Base_contributeMixin:
contributed_document = person.Base_contribute(
synchronous_metadata_discovery=True,
publication_state='published',
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'published')
......@@ -2275,7 +2297,7 @@ class Base_contributeMixin:
contributed_document = person.Base_contribute(
publication_state='shared',
synchronous_metadata_discovery=True,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'shared')
contributed_document.setReference(None)
......@@ -2284,14 +2306,14 @@ class Base_contributeMixin:
contributed_document = person.Base_contribute(
publication_state='shared',
synchronous_metadata_discovery=False,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'shared')
contributed_document.setReference(None)
contributed_document = person.Base_contribute(
publication_state=None,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'published')
......@@ -2315,10 +2337,10 @@ class TestBase_contributeWithSecurity(IngestionTestCase, Base_contributeMixin):
ret = person.Base_contribute(
redirect_to_context=True,
synchronous_metadata_discovery=True,
file=makeFileUpload('TEST-en-002.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf'))
self.assertIn(
('portal_status_message', 'PDF created successfully.'),
urlparse.parse_qsl(urlparse.urlparse(ret).query))
six.moves.urllib.parse.parse_qsl(six.moves.urllib.parse.urlparse(ret).query))
document, = self.portal.document_module.contentValues()
self.assertEqual(
......@@ -2333,10 +2355,10 @@ class TestBase_contributeWithSecurity(IngestionTestCase, Base_contributeMixin):
ret = person.Base_contribute(
redirect_to_context=True,
synchronous_metadata_discovery=True,
file=makeFileUpload('TEST-en-002.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf'))
self.assertIn(
('portal_status_message', 'PDF updated successfully.'),
urlparse.parse_qsl(urlparse.urlparse(ret).query))
six.moves.urllib.parse.parse_qsl(six.moves.urllib.parse.urlparse(ret).query))
document, = self.portal.document_module.contentValues()
self.assertEqual(
......@@ -2356,14 +2378,14 @@ class TestBase_contributeWithSecurity(IngestionTestCase, Base_contributeMixin):
person.Base_contribute(
redirect_to_context=True,
synchronous_metadata_discovery=synchronous_metadata_discovery,
file=makeFileUpload('TEST-en-002.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf'))
self.assertIn(
('portal_status_message',
'You are not allowed to update the existing document which has the same coordinates.'),
urlparse.parse_qsl(urlparse.urlparse(str(ctx.exception)).query))
six.moves.urllib.parse.parse_qsl(six.moves.urllib.parse.urlparse(str(ctx.exception)).query))
self.assertIn(
('portal_status_level', 'error'),
urlparse.parse_qsl(urlparse.urlparse(str(ctx.exception)).query))
six.moves.urllib.parse.parse_qsl(six.moves.urllib.parse.urlparse(str(ctx.exception)).query))
# document is not updated
self.assertEqual(document.getData(), b'')
......@@ -2374,7 +2396,7 @@ class TestBase_contributeWithSecurity(IngestionTestCase, Base_contributeMixin):
"You are not allowed to update the existing document which has the same coordinates"):
person.Base_contribute(
synchronous_metadata_discovery=synchronous_metadata_discovery,
file=makeFileUpload('TEST-en-002.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf'))
self.assertEqual(document.getData(), b'')
def test_Base_contribute_publication_state_unauthorized(self):
......@@ -2394,13 +2416,13 @@ class TestBase_contributeWithSecurity(IngestionTestCase, Base_contributeMixin):
publication_state='published',
redirect_to_context=True,
synchronous_metadata_discovery=True,
file=makeFileUpload('TEST-en-002.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf'))
self.assertIn(
('portal_status_message', 'You are not allowed to contribute document in that state.'),
urlparse.parse_qsl(urlparse.urlparse(str(ctx.exception)).query))
six.moves.urllib.parse.parse_qsl(six.moves.urllib.parse.urlparse(str(ctx.exception)).query))
self.assertIn(
('portal_status_level', 'error'),
urlparse.parse_qsl(urlparse.urlparse(str(ctx.exception)).query))
six.moves.urllib.parse.parse_qsl(six.moves.urllib.parse.urlparse(str(ctx.exception)).query))
# when using the script directly it's an error
with self.assertRaisesRegex(
......@@ -2409,7 +2431,7 @@ class TestBase_contributeWithSecurity(IngestionTestCase, Base_contributeMixin):
person.Base_contribute(
publication_state='published',
synchronous_metadata_discovery=True,
file=makeFileUpload('TEST-en-002.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf'))
# when using asynchronous metadata discovery, an error occurs in activity,
# but not document is published
......@@ -2417,7 +2439,7 @@ class TestBase_contributeWithSecurity(IngestionTestCase, Base_contributeMixin):
publication_state='published',
redirect_to_context=True,
synchronous_metadata_discovery=False,
file=makeFileUpload('TEST-en-002.pdf'))
file=self.makeFileUpload('TEST-en-002.pdf'))
with self.assertRaisesRegex(
Exception,
"Transition document_publication_workflow/publish unsupported"):
......
......@@ -29,11 +29,12 @@
import os
import unittest
from six.moves import cStringIO as StringIO
from io import BytesIO
from zipfile import ZipFile
from Products.ERP5Type.tests.utils import FileUpload
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import DummyLocalizer
from Products.ERP5Type.Utils import bytes2str
from Products.ERP5OOo.tests.utils import Validator
from Products.ERP5OOo.OOoUtils import OOoBuilder
......@@ -124,7 +125,7 @@ return getattr(context, "%s_%s" % (parameter, current_language))
self.assertEqual(200, response.getStatus())
ooo_builder = OOoBuilder(response.getBody())
styles_xml_body = ooo_builder.extract('styles.xml')
styles_xml_body = bytes2str(ooo_builder.extract('styles.xml'))
self.assertTrue(len(styles_xml_body) > 0)
# 'Style sheet ja' text is in the odt document header,
# and the header is in the 'styles.xml'.
......@@ -137,7 +138,7 @@ return getattr(context, "%s_%s" % (parameter, current_language))
response = self.publish('/' + self.getPortal().Dynamic_viewAsOdt.absolute_url(1))
self._validate(response.getBody())
ooo_builder = OOoBuilder(response.getBody())
styles_xml_body = ooo_builder.extract('styles.xml')
styles_xml_body = bytes2str(ooo_builder.extract('styles.xml'))
self.assertTrue(styles_xml_body.find('Style sheet en') > 0)
# 3. test a fail case, reset a not existed stylesheet
......@@ -150,7 +151,7 @@ return getattr(context, "%s_%s" % (parameter, current_language))
self.getPortal().Localizer.changeLanguage('en')
response = self.publish('/' + self.getPortal().Dynamic_viewAsOdt.absolute_url(1))
# then, it is not a zip stream
self.assertFalse(response.getBody().startswith('PK'))
self.assertFalse(response.getBody().startswith(b'PK'))
self.assertEqual(500, response.getStatus())
......@@ -177,7 +178,7 @@ return getattr(context, "%s_%s" % (parameter, current_language))
response.getHeader('content-disposition'))
self._validate(response.getBody())
ooo_builder = OOoBuilder(response.getBody())
styles_xml_body = ooo_builder.extract('styles.xml')
styles_xml_body = bytes2str(ooo_builder.extract('styles.xml'))
self.assertTrue(len(styles_xml_body) > 0)
self.assertTrue(styles_xml_body.find('Style sheet ja') > 0)
......@@ -189,7 +190,7 @@ return getattr(context, "%s_%s" % (parameter, current_language))
self.assertEqual(200, response.getStatus())
self._validate(response.getBody())
ooo_builder = OOoBuilder(response.getBody())
styles_xml_body = ooo_builder.extract('styles.xml')
styles_xml_body = bytes2str(ooo_builder.extract('styles.xml'))
self.assertTrue(len(styles_xml_body) > 0)
self.assertTrue(styles_xml_body.find('Style sheet en') > 0)
......@@ -198,7 +199,7 @@ return getattr(context, "%s_%s" % (parameter, current_language))
Static_viewAsOdt.doSettings(request, title='', xml_file_id='content.xml',
ooo_stylesheet='NotFound_getODTStyleSheet', script_name='')
response = self.publish('/' + self.getPortal().Static_viewAsOdt.absolute_url(1))
self.assertFalse(response.getBody().startswith('PK'))
self.assertFalse(response.getBody().startswith(b'PK'))
self.assertEqual(500, response.getStatus())
def test_include_img(self):
......@@ -235,14 +236,14 @@ return getattr(context, "%s_%s" % (parameter, current_language))
response.getHeader('content-type').split(';')[0])
self.assertEqual('attachment; filename="Base_viewIncludeImageAsOdt.odt"',
response.getHeader('content-disposition'))
cs = StringIO()
cs = BytesIO()
cs.write(body)
zip_document = ZipFile(cs)
picture_list = filter(lambda x: "Pictures" in x.filename,
zip_document.infolist())
self.assertNotEqual([], picture_list)
manifest = zip_document.read('META-INF/manifest.xml')
content = zip_document.read('content.xml')
manifest = bytes2str(zip_document.read('META-INF/manifest.xml'))
content = bytes2str(zip_document.read('content.xml'))
for picture in picture_list:
self.assertIn(picture.filename, manifest)
self.assertIn(picture.filename, content)
......
......@@ -38,17 +38,21 @@ from Products.ERP5OOo.OOoUtils import OOoParser
from DateTime import DateTime
import six
def makeFilePath(name):
return os.path.join(os.path.dirname(__file__), 'test_document', name)
def makeFileUpload(name):
path = makeFilePath(name)
return FileUpload(path, name)
class TestOOoImportMixin(ERP5TypeTestCase):
gender_base_cat_id = 'gender'
function_base_cat_id = 'function'
def makeFileUpload(self, name):
path = makeFilePath(name)
fu = FileUpload(path, name)
self.addCleanup(fu.close)
return fu
def afterSetUp(self):
"""
Initialize the ERP5 site.
......@@ -91,6 +95,7 @@ class TestOOoImportMixin(ERP5TypeTestCase):
self.portal.portal_categories.gender,
self.portal.portal_categories.region,
]:
parent.setLastId('0')
parent.deleteContent(list(parent.objectIds()))
self.tic()
......@@ -119,7 +124,7 @@ class TestOOoImport(TestOOoImportMixin):
## Basic steps
##################################
def stepImportRawDataFile(self, sequence=None, sequence_list=None, **kw):
f = makeFileUpload('import_data_list.ods')
f = self.makeFileUpload('import_data_list.ods')
person_module = self.getPortal().person_module
listbox=(
{ 'listbox_key': '001',
......@@ -285,7 +290,7 @@ class TestOOoImport(TestOOoImportMixin):
sorted([organisation_list[i].getEmailText() for i in range(num)]))
def stepImportFileNoMapping(self, sequence=None, sequence_list=None, **kw):
f = makeFileUpload('import_data_list.ods')
f = self.makeFileUpload('import_data_list.ods')
person_module = self.getPortal().person_module
listbox = (
......@@ -302,7 +307,7 @@ class TestOOoImport(TestOOoImportMixin):
'portal_status_message=Please%20Define%20a%20mapping.'))
def stepImportFileWithBlankLine(self, sequence=None, sequence_list=None, **kw):
f = makeFileUpload('import_data_list_blank_line.ods')
f = self.makeFileUpload('import_data_list_blank_line.ods')
person_module = self.getPortal().person_module
listbox=(
{ 'listbox_key': '001',
......@@ -317,7 +322,7 @@ class TestOOoImport(TestOOoImportMixin):
person_module.Base_importFile(import_file=f, listbox=listbox)
def stepImportFileWithCategory(self, sequence=None, sequence_list=None, **kw):
f = makeFileUpload('import_data_with_categories.ods')
f = self.makeFileUpload('import_data_with_categories.ods')
# create some regions
region = self.portal.portal_categories.region
europe = region.newContent(portal_type='Category',
......@@ -343,7 +348,7 @@ class TestOOoImport(TestOOoImportMixin):
person_module.Base_importFile(import_file=f, listbox=listbox)
def stepImportFileWithDates(self, sequence=None, sequence_list=None, **kw):
f = makeFileUpload('import_data_with_dates.ods')
f = self.makeFileUpload('import_data_with_dates.ods')
person_module = self.getPortal().person_module
listbox=(
{ 'listbox_key': '001',
......@@ -362,7 +367,7 @@ class TestOOoImport(TestOOoImportMixin):
This test make sure that either floats (1000,9), sientific numbers (1,00E+003)
or percentage (19%) are correctly imported .
"""
f = makeFileUpload('import_float_and_percentage.ods')
f = self.makeFileUpload('import_float_and_percentage.ods')
currency_module = self.getPortal().currency_module
listbox=(
{ 'listbox_key': '001',
......@@ -373,7 +378,7 @@ class TestOOoImport(TestOOoImportMixin):
currency_module.Base_importFile(import_file=f, listbox=listbox)
def stepImportOrganisation(self, sequence=None, sequence_list=None, **kw):
f = makeFileUpload('import_organisation_list.ods')
f = self.makeFileUpload('import_organisation_list.ods')
organisation_module = self.getPortal().organisation_module
listbox=(
{ 'listbox_key': '001',
......@@ -403,7 +408,7 @@ class TestOOoImport(TestOOoImportMixin):
user = user_folder.getUserById(user_name).__of__(user_folder)
newSecurityManager(None, user)
f = makeFileUpload('import_data_with_categories.ods')
f = self.makeFileUpload('import_data_with_categories.ods')
person_module = self.getPortal().person_module
listbox=(
{ 'listbox_key': '001',
......@@ -420,7 +425,7 @@ class TestOOoImport(TestOOoImportMixin):
person_module.Base_importFile(import_file=f, listbox=listbox)
def stepImportFileWithFreeText(self, sequence=None, sequence_list=None, **kw):
f = makeFileUpload('import_data_with_categories.ods')
f = self.makeFileUpload('import_data_with_categories.ods')
person_module = self.getPortal().person_module
listbox=(
{ 'listbox_key': '001',
......@@ -435,7 +440,7 @@ class TestOOoImport(TestOOoImportMixin):
person_module.Base_importFile(import_file=f, listbox=listbox)
def stepImportFileWithAccentuatedText(self, sequence=None, sequence_list=None, **kw):
f = makeFileUpload('import_data_accentuated_text.ods')
f = self.makeFileUpload('import_data_accentuated_text.ods')
person_module = self.getPortal().person_module
listbox=(
{ 'listbox_key': '001',
......@@ -450,7 +455,7 @@ class TestOOoImport(TestOOoImportMixin):
person_module.Base_importFile(import_file=f, listbox=listbox)
def stepImportXLSFile(self, sequence=None, sequence_list=None, **kw):
f = makeFileUpload('import_data_list.xls')
f = self.makeFileUpload('import_data_list.xls')
person_module = self.getPortal().person_module
listbox=(
{ 'listbox_key': '001',
......@@ -465,7 +470,7 @@ class TestOOoImport(TestOOoImportMixin):
person_module.Base_importFile(import_file=f, listbox=listbox)
def stepImportBigFile_1(self, sequence=None, sequence_list=None, **kw):
f = makeFileUpload('import_data_big_file_1.ods')
f = self.makeFileUpload('import_data_big_file_1.ods')
person_module = self.getPortal().person_module
listbox=(
{ 'listbox_key': '001',
......@@ -480,7 +485,7 @@ class TestOOoImport(TestOOoImportMixin):
person_module.Base_importFile(import_file=f, listbox=listbox)
def stepImportBigFile_2(self, sequence=None, sequence_list=None, **kw):
f = makeFileUpload('import_data_big_file_2.ods')
f = self.makeFileUpload('import_data_big_file_2.ods')
person_module = self.getPortal().person_module
listbox=(
{ 'listbox_key': '001',
......@@ -631,7 +636,7 @@ class TestOOoImport(TestOOoImportMixin):
def test_CategoryTool_importCategoryFile(self):
# tests simple use of CategoryTool_importCategoryFile script
self.portal.portal_categories.CategoryTool_importCategoryFile(
import_file=makeFileUpload('import_region_category.sxc'))
import_file=self.makeFileUpload('import_region_category.sxc'))
self.tic()
region = self.portal.portal_categories.region
self.assertEqual(2, len(region))
......@@ -651,7 +656,7 @@ class TestOOoImport(TestOOoImportMixin):
region.newContent(id='dummy_region')
self.tic()
self.portal.portal_categories.CategoryTool_importCategoryFile(
import_file=makeFileUpload('import_region_category.sxc'),
import_file=self.makeFileUpload('import_region_category.sxc'),
existing_category_list='delete')
self.tic()
self.assertEqual(2, len(region))
......@@ -674,7 +679,7 @@ class TestOOoImport(TestOOoImportMixin):
)
self.tic()
self.portal.portal_categories.CategoryTool_importCategoryFile(
import_file=makeFileUpload('import_region_category.sxc'),
import_file=self.makeFileUpload('import_region_category.sxc'),
existing_category_list='delete')
self.tic()
self.assertEqual(3, len(region))
......@@ -690,7 +695,7 @@ class TestOOoImport(TestOOoImportMixin):
)
self.tic()
self.portal.portal_categories.CategoryTool_importCategoryFile(
import_file=makeFileUpload('import_region_category.sxc'),
import_file=self.makeFileUpload('import_region_category.sxc'),
existing_category_list='force_delete')
self.tic()
self.assertEqual(2, len(region))
......@@ -702,7 +707,7 @@ class TestOOoImport(TestOOoImportMixin):
region.newContent(id='dummy_region')
self.tic()
self.portal.portal_categories.CategoryTool_importCategoryFile(
import_file=makeFileUpload('import_region_category.sxc'),
import_file=self.makeFileUpload('import_region_category.sxc'),
existing_category_list='expire')
self.tic()
self.assertEqual(3, len(region))
......@@ -720,7 +725,7 @@ class TestOOoImport(TestOOoImportMixin):
def test_CategoryTool_importCategoryFileXLS(self):
# tests that CategoryTool_importCategoryFile supports .xls files
self.portal.portal_categories.CategoryTool_importCategoryFile(
import_file=makeFileUpload('import_region_category.xls'))
import_file=self.makeFileUpload('import_region_category.xls'))
self.tic()
region = self.portal.portal_categories.region
self.assertEqual(2, len(region))
......@@ -736,7 +741,7 @@ class TestOOoImport(TestOOoImportMixin):
def test_CategoryTool_importCategoryFile_PathStars(self):
# tests CategoryTool_importCategoryFile with * in the paths columns
self.portal.portal_categories.CategoryTool_importCategoryFile(
import_file=makeFileUpload('import_region_category_path_stars.sxc'))
import_file=self.makeFileUpload('import_region_category_path_stars.sxc'))
self.tic()
region = self.portal.portal_categories.region
self.assertEqual(2, len(region))
......@@ -753,7 +758,7 @@ class TestOOoImport(TestOOoImportMixin):
# tests CategoryTool_importCategoryFile with * in the paths columns, and no
# ID column, and non ascii titles
self.portal.portal_categories.CategoryTool_importCategoryFile(
import_file=makeFileUpload(
import_file=self.makeFileUpload(
'import_region_category_path_stars_non_ascii.sxc'))
self.tic()
region = self.portal.portal_categories.region
......@@ -772,7 +777,7 @@ class TestOOoImport(TestOOoImportMixin):
# categories ID at different level (a good candidate for an acquisition
# bug)
self.portal.portal_categories.CategoryTool_importCategoryFile(
import_file=makeFileUpload('import_region_category_duplicate_ids.sxc'))
import_file=self.makeFileUpload('import_region_category_duplicate_ids.sxc'))
self.tic()
region = self.portal.portal_categories.region
self.assertEqual(1, len(region))
......@@ -786,7 +791,7 @@ class TestOOoImport(TestOOoImportMixin):
def test_Base_getCategoriesSpreadSheetMapping(self):
# test structure returned by Base_getCategoriesSpreadSheetMapping
mapping = self.portal.Base_getCategoriesSpreadSheetMapping(
import_file=makeFileUpload('import_region_category.sxc'))
import_file=self.makeFileUpload('import_region_category.sxc'))
self.assertTrue(isinstance(mapping, dict))
self.assertEqual(['region'], list(mapping.keys()))
region = mapping['region']
......@@ -816,7 +821,7 @@ class TestOOoImport(TestOOoImportMixin):
def test_Base_getCategoriesSpreadSheetMapping_DuplicateIdsAtSameLevel(self):
# tests Base_getCategoriesSpreadSheetMapping when a document contain same
# categories ID at the same level, in that case, a ValueError is raised
import_file = makeFileUpload(
import_file = self.makeFileUpload(
'import_region_category_duplicate_ids_same_level.sxc')
try:
self.portal.portal_categories.Base_getCategoriesSpreadSheetMapping(
......@@ -834,7 +839,7 @@ class TestOOoImport(TestOOoImportMixin):
def on_invalid_spreadsheet(message):
message_list.append(message)
import_file = makeFileUpload(
import_file = self.makeFileUpload(
'import_region_category_duplicate_ids_same_level.sxc')
self.portal.portal_categories.Base_getCategoriesSpreadSheetMapping(import_file,
invalid_spreadsheet_error_handler=on_invalid_spreadsheet)
......@@ -845,7 +850,7 @@ class TestOOoImport(TestOOoImportMixin):
def test_Base_getCategoriesSpreadSheetMapping_WrongHierarchy(self):
# tests Base_getCategoriesSpreadSheetMapping when the spreadsheet has an
# invalid hierarchy (#788)
import_file = makeFileUpload(
import_file = self.makeFileUpload(
'import_region_category_wrong_hierarchy.sxc')
try:
self.portal.portal_categories.Base_getCategoriesSpreadSheetMapping(
......@@ -859,7 +864,7 @@ class TestOOoImport(TestOOoImportMixin):
def test_Base_getCategoriesSpreadSheetMapping_MultiplePaths(self):
# If multiple paths is defined (for instance more than one * in paths
# columns), then it should be an error and the error must be reported
import_file = makeFileUpload(
import_file = self.makeFileUpload(
'import_region_category_multiple_paths.ods')
try:
self.portal.portal_categories.Base_getCategoriesSpreadSheetMapping(
......@@ -871,7 +876,7 @@ class TestOOoImport(TestOOoImportMixin):
def test_Base_getCategoriesSpreadSheetMapping_Id_is_reserved_property_name(self):
# tests Base_getCategoriesSpreadSheetMapping reserved property name are only test for path column, not all.
import_file = makeFileUpload(
import_file = self.makeFileUpload(
'import_region_category_with_reserved_id_in_title.sxc')
mapping = self.portal.portal_categories.Base_getCategoriesSpreadSheetMapping(
import_file=import_file)
......@@ -897,7 +902,7 @@ class TestOOoImport(TestOOoImportMixin):
return True
self.portal.portal_categories.Base_getCategoriesSpreadSheetMapping(
import_file=makeFileUpload('import_category_with_reserved_id_in_id.sxc'),
import_file=self.makeFileUpload('import_category_with_reserved_id_in_id.sxc'),
invalid_spreadsheet_error_handler=on_invalid_spreadsheet)
self.assertEqual(message_set, {
......@@ -916,7 +921,8 @@ class TestOOoImport(TestOOoImportMixin):
"""Test than OOoimport can parse a file with more than 40000 lines
"""
parser = OOoParser()
parser.openFile(open(makeFilePath('import_big_spreadsheet.ods'), 'rb'))
with open(makeFilePath('import_big_spreadsheet.ods'), 'rb') as f:
parser.openFile(f)
mapping = parser.getSpreadsheetsMapping()
not_ok = 1
for spread, values in six.iteritems(mapping):
......@@ -947,7 +953,7 @@ class TestOOoImportWeb(TestOOoImportMixin):
dummy_expired_region.expire()
self.tic()
self.portal.portal_categories.CategoryTool_importCategoryFile(
import_file=makeFileUpload('import_region_category.sxc'),
import_file=self.makeFileUpload('import_region_category.sxc'),
existing_category_list='expire')
self.tic()
self.assertEqual(4, len(region))
......
......@@ -43,7 +43,7 @@ class TestOOoParser(unittest.TestCase):
parser = OOoParser()
parser.openFile(open(makeFilePath('import_data_list.ods'), 'rb'))
mapping = parser.getSpreadsheetsMapping()
self.assertEqual(['Person'], mapping.keys())
self.assertEqual(['Person'], list(mapping.keys()))
person_mapping = mapping['Person']
self.assertTrue(isinstance(person_mapping, list))
self.assertTrue(102, len(person_mapping))
......@@ -57,14 +57,14 @@ class TestOOoParser(unittest.TestCase):
with open(makeFilePath('import_data_list.ods'), 'rb') as f:
parser.openFromBytes(f.read())
mapping = parser.getSpreadsheetsMapping()
self.assertEqual(['Person'], mapping.keys())
self.assertEqual(['Person'], list(mapping.keys()))
def test_getSpreadSheetMappingStyle(self):
parser = OOoParser()
with open(makeFilePath('import_data_list_with_style.ods'), 'rb') as f:
parser.openFile(f)
mapping = parser.getSpreadsheetsMapping()
self.assertEqual(['Feuille1'], mapping.keys())
self.assertEqual(['Feuille1'], list(mapping.keys()))
self.assertEqual(mapping['Feuille1'][1],
['a line with style'])
self.assertEqual(mapping['Feuille1'][2],
......@@ -79,7 +79,7 @@ class TestOOoParser(unittest.TestCase):
with open(makeFilePath('import_data_list_data_type.ods'), 'rb') as f:
parser.openFile(f)
mapping = parser.getSpreadsheetsMapping()
self.assertEqual(['Feuille1'], mapping.keys())
self.assertEqual(['Feuille1'], list(mapping.keys()))
self.assertEqual(mapping['Feuille1'][0],
['1234.5678'])
self.assertEqual(mapping['Feuille1'][1],
......@@ -112,7 +112,7 @@ class TestOOoParser(unittest.TestCase):
parser = OOoParser()
parser.openFile(open(makeFilePath('complex_text.ods'), 'rb'))
mapping = parser.getSpreadsheetsMapping()
self.assertEqual(['Feuille1'], mapping.keys())
self.assertEqual(['Feuille1'], list(mapping.keys()))
self.assertEqual(mapping['Feuille1'][0], [' leading space'])
self.assertEqual(mapping['Feuille1'][1], [' leading space'])
self.assertEqual(mapping['Feuille1'][2], ['tab\t'])
......@@ -122,7 +122,7 @@ class TestOOoParser(unittest.TestCase):
parser = OOoParser()
parser.openFile(open(makeFilePath('empty_cells.ods'), 'rb'))
mapping = parser.getSpreadsheetsMapping()
self.assertEqual(['Feuille1'], mapping.keys())
self.assertEqual(['Feuille1'], list(mapping.keys()))
self.assertEqual(mapping['Feuille1'],
[
['A1', None, 'C1'],
......
......@@ -34,12 +34,12 @@ from Products.ERP5Form.Selection import Selection
from Testing import ZopeTestCase
from DateTime import DateTime
from Products.ERP5OOo.tests.utils import Validator
import httplib
import six.moves.http_client
import lxml.html
import mock
import PyPDF2
HTTP_OK = httplib.OK
HTTP_OK = six.moves.http_client.OK
# setting this to True allows the .publish() calls to provide tracebacks
debug = False
......@@ -700,7 +700,7 @@ class TestOOoStyle(ERP5TypeTestCase, ZopeTestCase.Functional):
self.assertEqual('text/html;charset=utf-8', content_type.lower())
self.assertFalse(response.getHeader('content-disposition'))
# Simplistic assertion that we are viewing the ODF XML source
self.assertIn('office:document-content', response.getBody())
self.assertIn(b'office:document-content', response.getBody())
def test_form_list_ZMI(self):
"""We can edit form_list in the ZMI."""
......@@ -710,7 +710,7 @@ class TestOOoStyle(ERP5TypeTestCase, ZopeTestCase.Functional):
content_type = response.getHeader('content-type')
self.assertEqual('text/html;charset=utf-8', content_type.lower())
self.assertFalse(response.getHeader('content-disposition'))
self.assertIn('office:document-content', response.getBody())
self.assertIn(b'office:document-content', response.getBody())
def test_report_view_ZMI(self):
"""We can edit report_view in the ZMI."""
......@@ -720,7 +720,8 @@ class TestOOoStyle(ERP5TypeTestCase, ZopeTestCase.Functional):
content_type = response.getHeader('content-type')
self.assertEqual('text/html;charset=utf-8', content_type.lower())
self.assertFalse(response.getHeader('content-disposition'))
self.assertIn('office:document-content', response.getBody())
self.assertIn(b'office:document-content', response.getBody())
class TestODTStyle(TestOOoStyle):
skin = 'ODT'
......
......@@ -41,6 +41,7 @@ import zipfile
import subprocess
from six.moves import urllib
from six.moves import cStringIO as StringIO
from io import BytesIO
try:
import lxml
......@@ -65,14 +66,14 @@ if lxml:
def validate(self, odf_file_content):
error_list = []
odf_file = StringIO(odf_file_content)
odf_file = BytesIO(odf_file_content)
for f in ('content.xml', 'meta.xml', 'styles.xml', 'settings.xml'):
error_list.extend(self._validateXML(odf_file, f))
return error_list
def _validateXML(self, odf_file, content_file_name):
zfd = zipfile.ZipFile(odf_file)
doc = lxml.etree.parse(StringIO(zfd.read(content_file_name)))
doc = lxml.etree.parse(BytesIO(zfd.read(content_file_name)))
return []
# The following is the past implementation that validates with
# RelaxNG schema. But recent LibreOffice uses extended odf
......
......@@ -34,9 +34,11 @@ import mock
import itertools
import transaction
import unittest
import urlparse
import six
from six.moves.urllib.parse import urlparse, parse_qs
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript
from Products.ERP5Type.Utils import bytes2str
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
from AccessControl import SpecialUsers
......@@ -57,8 +59,10 @@ class UserManagementTestCase(ERP5TypeTestCase):
"""TestCase for user manement, with utilities to create users and helpers
assertion methods.
"""
if six.PY2:
_login_generator = itertools.count().next
else:
_login_generator = itertools.count().__next__
def getBusinessTemplateList(self):
"""List of BT to install. """
......@@ -742,12 +746,12 @@ class TestPreferences(UserManagementTestCase):
current_password='bad' + password,
new_password=new_password,
)
parsed_url = urlparse.urlparse(result)
parsed_url = urlparse(result)
self.assertEqual(
parsed_url.path.split('/')[-2:],
['portal_preferences', 'PreferenceTool_viewChangePasswordDialog'])
self.assertEqual(
urlparse.parse_qs(parsed_url.query),
parse_qs(parsed_url.query),
{'portal_status_message': ['Current password is wrong.'], 'portal_status_level': ['error']})
self.login()
......@@ -1202,8 +1206,8 @@ class TestUserManagementExternalAuthentication(TestUserManagement):
# view front page we should be logged in if we use authentication key
response = self.publish(base_url, env={self.user_id_key.replace('-', '_').upper(): login})
self.assertEqual(response.getStatus(), 200)
self.assertIn('Logged In', response.getBody())
self.assertIn(login, response.getBody())
self.assertIn('Logged In', bytes2str(response.getBody()))
self.assertIn(login, bytes2str(response.getBody()))
class _TestLocalRoleManagementMixIn(object):
......@@ -1543,7 +1547,7 @@ class _TestKeyAuthenticationMixIn(object):
# view front page we should be logged in if we use authentication key
response = self.publish('%s?__ac_key=%s' %(base_url, key))
self.assertEqual(response.getStatus(), 200)
self.assertIn(reference, response.getBody())
self.assertIn(reference, bytes2str(response.getBody()))
# check if key authentication works other page than front page
person_module = portal.person_module
......@@ -1554,7 +1558,7 @@ class _TestKeyAuthenticationMixIn(object):
self.assertTrue('%s/login_form?came_from=' % portal.getId(), response.headers['location'])
response = self.publish('%s?__ac_key=%s' %(base_url, key))
self.assertEqual(response.getStatus(), 200)
self.assertIn(reference, response.getBody())
self.assertIn(reference, bytes2str(response.getBody()))
# check if key authentication works with web_mode too
web_site = portal.web_site_module.newContent(portal_type='Web Site')
......
......@@ -42,7 +42,7 @@ from ZPublisher.HTTPResponse import HTTPResponse
from zExceptions.ExceptionFormatter import format_exception
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.runUnitTest import log_directory
from Products.ERP5Type.Utils import stopProcess, PR_SET_PDEATHSIG
from Products.ERP5Type.Utils import stopProcess, PR_SET_PDEATHSIG, unicode2str
from lxml import etree
from lxml.html.builder import E
import certifi
......@@ -397,7 +397,7 @@ class FunctionalTestRunner:
for test_tr in test_table.xpath('.//tr[contains(@class, "status_failed")]'):
test_tr.set('style', 'background-color: red;')
details_attribute_dict = {}
if etree.tostring(test_table).find("expected failure") != -1:
if u"expected failure" in etree.tostring(test_table, encoding="unicode"):
expected_failure_amount += 1
else:
failure_amount += 1
......@@ -405,7 +405,7 @@ class FunctionalTestRunner:
details_attribute_dict['open'] = 'true'
detail_element = E.div()
detail_element.append(E.details(E.summary(test_name), test_table, **details_attribute_dict))
detail += etree.tostring(detail_element)
detail += unicode2str(etree.tostring(detail_element, encoding="unicode"))
tr_count += 1
success_amount = tr_count - 1 - failure_amount - expected_failure_amount
if detail:
......
......@@ -253,7 +253,7 @@ class ERP5TypeTestReLoader(ERP5TypeTestLoader):
def runLiveTest(test_list, verbosity=1, stream=None, request_server_url=None, **kw):
from Products.ERP5Type.tests.runUnitTest import DebugTestResult
from StringIO import StringIO
from six.moves import StringIO
# Add path of the TestTemplateItem folder of the instance
path = kw.get('path', None)
if path is not None and path not in sys.path:
......
......@@ -9,7 +9,7 @@ __version__ = '0.3.0'
import base64
import errno
import httplib
from six.moves import http_client
import os
import random
import re
......@@ -18,13 +18,12 @@ import string
import sys
import time
import traceback
import urllib
import ConfigParser
from six.moves import configparser
from contextlib import contextmanager
from io import BytesIO
from functools import partial
from six.moves.urllib.parse import unquote_to_bytes
from cPickle import dumps
from six.moves.cPickle import dumps
from glob import glob
from hashlib import md5
from warnings import warn
......@@ -32,10 +31,11 @@ from DateTime import DateTime
import mock
import Products.ZMySQLDA.DA
from Products.ZMySQLDA.DA import Connection as ZMySQLDA_Connection
from zope.globalrequest import clearRequest
from zope.globalrequest import getRequest
from zope.globalrequest import setRequest
import six
if six.PY3:
StandardError = Exception
from zope.component.hooks import setSite
......@@ -47,7 +47,7 @@ from Products.PythonScripts.PythonScript import PythonScript
from Products.ERP5Type.Accessor.Constant import PropertyGetter as ConstantGetter
from Products.ERP5Form.PreferenceTool import Priority
from zLOG import LOG, DEBUG
from Products.ERP5Type.Utils import convertToUpperCase, str2bytes
from Products.ERP5Type.Utils import convertToUpperCase, bytes2str, str2bytes
from Products.ERP5Type.tests.backportUnittest import SetupSiteError
from Products.ERP5Type.tests.utils import addUserToDeveloperRole
from Products.ERP5Type.tests.utils import parseListeningAddress
......@@ -154,7 +154,7 @@ def _createTestPromiseConfigurationFile(promise_path, bt5_repository_path_list=N
_getVolatileMemcachedServerDict()
cloudooo_url_list = _getConversionServerUrlList()
promise_config = ConfigParser.RawConfigParser()
promise_config = configparser.RawConfigParser()
promise_config.add_section('external_service')
promise_config.set('external_service', 'cloudooo_url_list', cloudooo_url_list)
promise_config.set('external_service', 'memcached_url',memcached_url)
......@@ -170,7 +170,8 @@ def _createTestPromiseConfigurationFile(promise_path, bt5_repository_path_list=N
promise_config.set('portal_certificate_authority', 'certificate_authority_path',
os.environ['TEST_CA_PATH'])
promise_config.write(open(promise_path, 'w'))
with open(promise_path, 'w') as f:
promise_config.write(f)
def profile_if_environ(environment_var_name):
if int(os.environ.get(environment_var_name, 0)):
......@@ -929,7 +930,7 @@ class ERP5TypeCommandLineTestCase(ERP5TypeTestCaseMixin):
forced_portal_id = os.environ.get('erp5_tests_portal_id')
if forced_portal_id:
return str(forced_portal_id)
m = md5(repr(self.getBusinessTemplateList()) + self.getTitle())
m = md5(str2bytes(repr(self.getBusinessTemplateList()) + self.getTitle()))
return portal_name + '_' + m.hexdigest()
def getPortal(self):
......@@ -1498,7 +1499,7 @@ class ZEOServerTestCase(ERP5TypeTestCase):
if e[0] != errno.EADDRINUSE:
raise
if zeo_client:
os.write(zeo_client, repr(host_port))
os.write(zeo_client, str2bytes(repr(host_port)))
os.close(zeo_client)
ZopeTestCase._print("\nZEO Storage started at %s:%s ... " % host_port)
......@@ -1571,7 +1572,7 @@ def optimize():
PythonScript._compile = _compile
PythonScript_exec = PythonScript._exec
def _exec(self, *args):
self.func_code # trigger compilation if needed
self.__code__ # trigger compilation if needed
return PythonScript_exec(self, *args)
PythonScript._exec = _exec
from Acquisition import aq_parent
......
......@@ -7,6 +7,7 @@ from asyncore import socket_map
from ZODB.DemoStorage import DemoStorage
from ZODB.FileStorage import FileStorage
from Products.ERP5Type.tests.utils import getMySQLArguments, instance_random
from six.moves import range
def _print(message):
sys.stderr.write(message + "\n")
......@@ -99,7 +100,7 @@ def fork():
def forkNodes():
global node_pid_list
for i in xrange(1, activity_node):
for i in range(1, activity_node):
pid = fork()
if not pid:
node_pid_list = None
......@@ -122,7 +123,7 @@ if neo_storage:
storage_count = 2
if load or save:
db_list = [os.path.join(instance_home, 'var', 'neo%u.sqlite' % i)
for i in xrange(1, storage_count+1)]
for i in range(1, storage_count+1)]
else:
db_list = [None] * storage_count
cwd = os.getcwd()
......
......@@ -154,7 +154,7 @@ def main():
assert revision == test_result.revision, (revision, test_result.revision)
while suite.acquire():
test = test_result.start(suite.running.keys())
test = test_result.start(list(suite.running.keys()))
if test is not None:
suite.start(test.name, lambda status_dict, __test=test:
__test.stop(**status_dict))
......
......@@ -300,7 +300,7 @@ class ERP5TypeTestLoader(unittest.TestLoader):
def _importZodbTestComponent(self, name):
import erp5.component.test
module = __import__('erp5.component.test.' + name,
fromlist=['erp5.component.test'],
fromlist=['erp5.component.test'] if six.PY2 else ['erp5'],
level=0)
try:
self._test_component_ref_list.append(module)
......
......@@ -1454,7 +1454,7 @@ class TestZodbModuleComponent(SecurityTestCase):
def afterSetUp(self):
self._component_tool = self.portal.portal_components
self._module = __import__(self._document_class._getDynamicModuleNamespace(),
fromlist=['erp5.component'])
fromlist=['erp5.component'] if six.PY2 else ['erp5'])
self._component_tool.reset(force=True,
reset_portal_type_at_transaction_boundary=True)
......@@ -1524,7 +1524,10 @@ class TestZodbModuleComponent(SecurityTestCase):
if expected_default_version is not None:
top_module_name = self._document_class._getDynamicModuleNamespace()
top_module = __import__(top_module_name, level=0, fromlist=[top_module_name])
top_module = __import__(
top_module_name,
level=0,
fromlist=[top_module_name] if six.PY2 else ['erp5'])
# The module must be available in its default version
self.assertHasAttribute(top_module, expected_default_version)
......@@ -1555,7 +1558,8 @@ class TestZodbModuleComponent(SecurityTestCase):
module_name = self._getComponentFullModuleName(module_name)
module = __import__(
module_name,
fromlist=[self._document_class._getDynamicModuleNamespace()],
fromlist=[self._document_class._getDynamicModuleNamespace()]
if six.PY2 else ['erp5'],
level=0)
self.assertIn(module_name, sys.modules)
return module
......@@ -2048,7 +2052,7 @@ def bar(*args, **kwargs):
self.assertModuleImportable('erp5_version.%s' % imported_reference)
top_module = __import__(top_module_name, level=0,
fromlist=[top_module_name])
fromlist=[top_module_name] if six.PY2 else ['erp5'])
self._importModule('erp5_version.%s' % imported_reference)
......@@ -2111,7 +2115,7 @@ def function_foo(*args, **kwargs):
top_module_name = self._document_class._getDynamicModuleNamespace()
top_module = __import__(top_module_name, level=0,
fromlist=[top_module_name])
fromlist=[top_module_name] if six.PY2 else ['erp5'])
self._importModule(reference)
module = getattr(top_module, reference)
......@@ -2742,7 +2746,7 @@ foobar = foobar().f
base = self.portal.getPath()
for query in 'x:int=-24&y:int=66', 'x:int=41':
path = '%s/TestExternalMethod?%s' % (base, query)
self.assertEqual(self.publish(path).getBody(), '42')
self.assertEqual(self.publish(path).getBody(), b'42')
# Test from a Python Script
createZODBPythonScript(self.portal.portal_skins.custom,
......@@ -2787,7 +2791,7 @@ def foobar(self, a, b="portal_type"):
cfg.extensions = tempfile.mkdtemp()
try:
with open(os.path.join(cfg.extensions, module + '.py'), "w") as f:
f.write("foobar = lambda **kw: sorted(kw.iteritems())")
f.write("foobar = lambda **kw: sorted(kw.items())")
self.assertEqual(external_method(z=1, a=0), [('a', 0), ('z', 1)])
finally:
shutil.rmtree(cfg.extensions)
......@@ -2878,8 +2882,8 @@ class TestWithImport(TestImported):
from ITestGC import ITestGC
import zope.interface
@zope.interface.implementer(ITestGC)
class TestGC(XMLObject):
zope.interface.implements(ITestGC)
def foo(self):
pass
""")
......@@ -2910,12 +2914,15 @@ class TestGC(XMLObject):
self.assertEqual(gc.garbage, [])
import erp5.component
gc.set_debug(
gc.DEBUG_STATS |
gc.DEBUG_UNCOLLECTABLE |
gc.DEBUG_COLLECTABLE |
gc.DEBUG_OBJECTS |
gc.DEBUG_INSTANCES)
debug_flags = (
gc.DEBUG_STATS
| gc.DEBUG_UNCOLLECTABLE
| gc.DEBUG_COLLECTABLE )
if six.PY2:
debug_flags |= (
gc.DEBUG_OBJECTS
| gc.DEBUG_INSTANCES)
gc.set_debug(debug_flags)
sys.stderr = stderr
# Still not garbage collectable as RefManager still keeps a reference
erp5.component.ref_manager.clear()
......@@ -2998,11 +3005,11 @@ from erp5.component.document.Person import Person
from ITestPortalType import ITestPortalType
import zope.interface
@zope.interface.implementer(ITestPortalType)
class TestPortalType(Person):
def test42(self):
return 42
zope.interface.implements(ITestPortalType)
def foo(self):
pass
""")
......@@ -3194,7 +3201,7 @@ InitializeClass(%(class_name)s)
'%s/manage_addProduct/ERP5/manage_addToolForm' % self.portal.getPath(),
'%s:%s' % (self.manager_username, self.manager_password))
self.assertEqual(response.getStatus(), 200)
self.assertNotIn('ERP5 Test Hook After Load Tool', response.getBody())
self.assertNotIn(b'ERP5 Test Hook After Load Tool', response.getBody())
component.validate()
self.tic()
......@@ -3206,7 +3213,7 @@ InitializeClass(%(class_name)s)
'%s/manage_addProduct/ERP5/manage_addToolForm' % self.portal.getPath(),
'%s:%s' % (self.manager_username, self.manager_password))
self.assertEqual(response.getStatus(), 200)
self.assertIn('ERP5 Test Hook After Load Tool', response.getBody())
self.assertIn(b'ERP5 Test Hook After Load Tool', response.getBody())
from Products.ERP5Type.Core.TestComponent import TestComponent
......@@ -3233,13 +3240,11 @@ class Test(ERP5TypeTestCase):
"""
Dummy mail host has already been set up when running tests
"""
pass
def _restoreMailHost(self):
"""
Dummy mail host has already been set up when running tests
"""
pass
def test_01_sampleTest(self):
self.assertEqual(0, 0)
......@@ -3335,7 +3340,7 @@ class Test(ERP5TypeTestCase):
reset_portal_type_at_transaction_boundary=True)
output = runLiveTest('testRunLiveTest')
expected_msg_re = re.compile('Ran 2 tests.*FAILED \(failures=1\)', re.DOTALL)
expected_msg_re = re.compile(r'Ran 2 tests.*FAILED \(failures=1\)', re.DOTALL)
self.assertRegex(output, expected_msg_re)
# Now try addCleanup
......@@ -3411,18 +3416,29 @@ break_at_import()
return self._component_tool.readTestOutput()
output = runLiveTest('testRunLiveTestImportError')
self.assertIn('''
if six.PY2:
expected_output = '''
File "<portal_components/test.erp5.testRunLiveTestImportError>", line 4, in <module>
break_at_import()
File "<portal_components/test.erp5.testRunLiveTestImportError>", line 3, in break_at_import
import non.existing.module # pylint:disable=import-error
ImportError: No module named non.existing.module
''', output)
'''
else:
expected_output = '''
File "<portal_components/test.erp5.testRunLiveTestImportError>", line 4, in <module>
break_at_import()
File "<portal_components/test.erp5.testRunLiveTestImportError>", line 3, in break_at_import
import non.existing.module # pylint:disable=import-error
ModuleNotFoundError: No module named 'non'
'''
self.assertIn(expected_output, output)
output = runLiveTest('testDoesNotExist_import_error_because_module_does_not_exist')
self.assertIn(
"ImportError: No module named testDoesNotExist_import_error_because_module_does_not_exist",
output)
if six.PY2:
expected_output = "ImportError: No module named testDoesNotExist_import_error_because_module_does_not_exist"
else:
expected_output = "ModuleNotFoundError: No module named 'testDoesNotExist_import_error_because_module_does_not_exist'"
self.assertIn(expected_output, output)
def testERP5Broken(self):
# Create a broken ghost object
......@@ -3430,9 +3446,9 @@ ImportError: No module named non.existing.module
name = self._testMethodName
types_tool = self.portal.portal_types
ptype = types_tool.newContent(name, type_class="File", portal_type='Base Type')
file = ptype.constructInstance(self.portal, name, data="foo")
file = ptype.constructInstance(self.portal, name, data=b"foo")
file_uid = file.getUid()
self.assertEqual(file.size, len("foo"))
self.assertEqual(file.size, len(b"foo"))
self.commit()
try:
self.portal._p_jar.cacheMinimize()
......@@ -3448,7 +3464,7 @@ ImportError: No module named non.existing.module
# Check that the class is unghosted before resolving __setattr__
self.assertRaises(BrokenModified, setattr, file, "size", 0)
self.assertIsInstance(file, ERP5BaseBroken)
self.assertEqual(file.size, len("foo"))
self.assertEqual(file.size, len(b"foo"))
# Now if we repair the portal type definition, instances will
# no longer be broken and be modifiable again.
......@@ -3458,9 +3474,9 @@ ImportError: No module named non.existing.module
file = self.portal[name]
self.assertNotIsInstance(file, ERP5BaseBroken)
self.assertEqual(file.getUid(), file_uid)
self.assertEqual(file.getData(), "foo")
file.setData("something else")
self.assertEqual(file.getData(), "something else")
self.assertEqual(file.getData(), b"foo")
file.setData(b"something else")
self.assertEqual(file.getData(), b"something else")
self.assertNotIn("__Broken_state__", file.__dict__)
finally:
self.portal._delObject(name)
......@@ -3660,6 +3676,8 @@ class TestZodbDocumentComponentReload(ERP5TypeTestCase):
component = self.portal.portal_components['document.erp5.BusinessProcess']
component.setTextContent(value)
self.tic()
self.assertEqual(component.checkConsistency(), [])
self.assertEqual(component.getValidationState(), 'validated')
def testAsComposedDocumentCacheIsCorrectlyFlushed(self):
component = self.portal.portal_components['document.erp5.BusinessProcess']
......
......@@ -34,6 +34,7 @@ of Portal Type as Classes and ZODB Components
import pickle
import unittest
import warnings
import six
from Acquisition import aq_base
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from AccessControl.ZopeGuards import guarded_import
......@@ -189,6 +190,9 @@ class TestERP5Type(ERP5TypeTestCase, LogInterceptor):
not_ok = NotOk().__of__(doc)
self.assertRaises(ValueError, getattr, not_ok, 'attr')
if six.PY3:
self.assertRaises(ValueError, hasattr, not_ok, 'attr')
else:
self.assertFalse(hasattr(not_ok, 'attr'))
def test_renameObjectsReindexSubobjects(self):
......
......@@ -27,10 +27,10 @@
##############################################################################
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type import WITH_LEGACY_WORKFLOW
import StringIO
from six.moves import cStringIO as StringIO
import unittest
import urllib
import httplib
from six.moves.urllib.parse import urlencode
import six.moves.http_client
class TestUpgradeInstanceWithOldDataFsWithLegacyWorkflow(ERP5TypeTestCase):
......@@ -120,7 +120,7 @@ class TestUpgradeInstanceWithOldDataFsWithLegacyWorkflow(ERP5TypeTestCase):
ret = self.publish(
'%s/portal_alarms/promise_check_upgrade' % self.portal.getPath(),
basic='%s:current' % self.id(),
stdin=StringIO.StringIO(urllib.urlencode({
stdin=StringIO(urlencode({
'Base_callDialogMethod:method': '',
'dialog_id': 'Alarm_viewSolveDialog',
'dialog_method': 'Alarm_solve',
......@@ -130,7 +130,7 @@ class TestUpgradeInstanceWithOldDataFsWithLegacyWorkflow(ERP5TypeTestCase):
request_method="POST",
handle_errors=False
)
self.assertEqual(httplib.FOUND, ret.getStatus())
self.assertEqual(six.moves.http_client.FOUND, ret.getStatus())
alarm.Alarm_solve()
......
......@@ -397,7 +397,7 @@ def parseListeningAddress(host_port=None, default_host='127.0.0.1'):
m = 499 # must be a prime number
x = instance_random.randrange(0, m)
c = instance_random.randrange(1, m)
for i in xrange(m):
for i in range(m):
yield default_host, 55000 + x
x = (x + c) % m
raise RuntimeError("Can't find free port (tried ports %u to %u)\n"
......@@ -591,7 +591,7 @@ def updateCellList(portal, line, cell_type, cell_range_method, cell_dict_list):
def getSortedCategoryList(line, base_id, category_list):
result = []
index_list = line.index[base_id].keys()
index_list = list(line.index[base_id].keys())
index_list.sort()
for category in category_list:
for index in index_list:
......@@ -668,7 +668,7 @@ def updateCellList(portal, line, cell_type, cell_range_method, cell_dict_list):
*category_list)
cell.edit(**mapped_value_dict)
cell.setMappedValuePropertyList(mapped_value_dict.keys())
cell.setMappedValuePropertyList(list(mapped_value_dict.keys()))
base_category_list = [category_path
for category_path in category_list
......
......@@ -56,11 +56,11 @@ class StringValidatorTestCase(ValidatorTestCase):
self.assertEqual('<html>', result)
def test_encoding(self):
utf8_string = 'M\303\274ller' # this is a M&uuml;ller
unicode_string = unicode(utf8_string, 'utf-8')
utf8_bytes = b'M\303\274ller' # this is a M&uuml;ller
unicode_string = utf8_bytes.decode('utf-8')
result = self.v.validate(
TestField('f', max_length=0, truncate=0, required=0, unicode=1),
'f', {'f' : utf8_string})
'f', {'f' : utf8_bytes})
self.assertEqual(unicode_string, result)
def test_strip_whitespace(self):
......
......@@ -36,8 +36,10 @@ class FakeRequest:
def clear(self):
self.dict.clear()
def __nonzero__(self):
return 0
def __bool__(self):
return False
if six.PY2:
__nonzero__ = __bool__
class SerializeTestCase(unittest.TestCase):
def test_simpleSerialize(self):
......
......@@ -26,6 +26,7 @@ from unittest import expectedFailure
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.PythonScripts.PythonScript import PythonScript
import six
from six.moves import range
class HBTreeFolder2Tests(ERP5TypeTestCase):
......
......@@ -102,7 +102,7 @@ class TestDeferredConnection(ERP5TypeTestCase):
Check that a basic query succeeds.
"""
connection = self.getDeferredConnection()
connection.query('REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
connection.query(b'REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
try:
self.commit()
except OperationalError:
......@@ -119,7 +119,7 @@ class TestDeferredConnection(ERP5TypeTestCase):
"""
connection = self.getDeferredConnection()
# Queue a query
connection.query('REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
connection.query(b'REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
# Replace dynamically the function used to send queries to mysql so it's
# dumber than the implemented one.
self.monkeypatchConnection(connection)
......@@ -128,7 +128,7 @@ class TestDeferredConnection(ERP5TypeTestCase):
try:
self.commit()
except OperationalError as m:
if m[0] not in hosed_connection:
if m.args[0] not in hosed_connection:
raise
else:
self.fail()
......@@ -144,7 +144,7 @@ class TestDeferredConnection(ERP5TypeTestCase):
"""
connection = self.getDeferredConnection()
# Queue a query
connection.query('REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
connection.query(b'REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
# Artificially cause a connection close.
self.monkeypatchConnection(connection)
try:
......@@ -160,10 +160,10 @@ class TestDeferredConnection(ERP5TypeTestCase):
"""
connection = self.getDeferredConnection()
# Queue a query
connection.query('REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
connection.query(b'REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
self.assertEqual(len(connection._sql_string_list), 1)
self.commit()
connection.query('REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
connection.query(b'REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
self.assertEqual(len(connection._sql_string_list), 1)
if __name__ == '__main__':
......
......@@ -40,6 +40,7 @@ from Products.ZSQLCatalog.Query.RelatedQuery import RelatedQuery
from DateTime import DateTime
from Products.ZSQLCatalog.SQLExpression import MergeConflictError
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.Utils import ensure_list
import six
from AccessControl.ZopeGuards import guarded_getattr
......@@ -101,7 +102,7 @@ class ReferenceQuery:
else:
self.args.append(arg)
if len(kw) == 1:
self.column, value = kw.items()[0]
self.column, value = ensure_list(kw.items())[0]
if not isinstance(value, MatchList):
value = MatchList([value])
self.value = value
......@@ -120,7 +121,7 @@ class ReferenceQuery:
return False
other_query_list = other.query_list[:]
for subquery in self.args:
for other_query_id in xrange(len(other_query_list)):
for other_query_id in range(len(other_query_list)):
other_query = other_query_list[other_query_id]
if subquery == other_query:
other_query_list.pop(other_query_id)
......@@ -265,12 +266,10 @@ class TestSQLCatalog(ERP5TypeTestCase):
self.assertRaises(exception, self._catalog, src__=1, query_table='foo', **kw)
def catalog(self, reference_tree, kw, check_search_text=True,
check_select_expression=True, expected_failure=False):
check_select_expression=True):
reference_param_dict = self._catalog.buildSQLQuery(query_table='foo', **kw)
query = self._catalog.buildEntireQuery(kw).query
assertEqual = self.assertEqual
if expected_failure:
assertEqual = unittest.expectedFailure(assertEqual)
assertEqual(reference_tree, query)
search_text = query.asSearchTextExpression(self._catalog)
......
......@@ -112,9 +112,9 @@ class ZuiteTests( unittest.TestCase ):
return old
def _verifyArchive( self, bits, contents ):
import StringIO
from io import BytesIO
import zipfile
stream = StringIO.StringIO( bits )
stream = BytesIO( bits )
archive = zipfile.ZipFile( stream, 'r' )
names = list( archive.namelist() )
......@@ -139,9 +139,9 @@ class ZuiteTests( unittest.TestCase ):
def _verifyManifest( self, bits, name, contents ):
import StringIO
from io import BytesIO
import zipfile
stream = StringIO.StringIO( bits )
stream = BytesIO( bits )
archive = zipfile.ZipFile( stream, 'r' )
manifest = filter( None, archive.read( name ).split( '\n' ) )
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment