Commit 5338e8e7 authored by Jérome Perrin's avatar Jérome Perrin

XXX all changes to test (product)

Co-authored-by: Carlos Ramos Carreño's avatarCarlos Ramos Carreño <carlos.ramos@nexedi.com>
Co-authored-by: Emmy Vouriot's avatarEmmeline Vouriot <emmeline.vouriot@nexedi.com>
Co-authored-by: Kazuhiko Shiozaki's avatarKazuhiko SHIOZAKI <kazuhiko@nexedi.com>
parent f54ffa83
......@@ -62,6 +62,7 @@ import weakref
import transaction
from App.config import getConfiguration
import socket
from six.moves import range
class CommitFailed(Exception):
pass
......@@ -617,7 +618,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Monkey patch Queue to induce conflict errors artificially.
def query(self, query_string,*args, **kw):
# Not so nice, this is specific to zsql method
if "REPLACE INTO" in query_string:
if b"REPLACE INTO" in query_string:
raise OperationalError
return self.original_query(query_string,*args, **kw)
......@@ -731,7 +732,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
messages.
"""
activity_tool = self.portal.portal_activities
for _ in xrange(loop_size):
for _ in range(loop_size):
activity_tool.distribute(node_count=1)
activity_tool.tic(processing_node=1)
......@@ -924,7 +925,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
Organisation.updateDesc = updateDesc
# First check dequeue read same message only once
for i in xrange(10):
for i in range(10):
p.activate(activity="SQLDict").updateDesc()
self.commit()
......@@ -933,7 +934,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(p.getDescription(), "a")
# Check if there is pending activity after deleting an object
for i in xrange(10):
for i in range(10):
p.activate(activity="SQLDict").updateDesc()
self.commit()
......@@ -963,7 +964,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(0,organisation.getFoobar())
# Test group_method_id is working without group_id
for x in xrange(5):
for x in range(5):
organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar").reindexObject(number=1)
self.commit()
......@@ -975,7 +976,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Test group_method_id is working with one group_id defined
for x in xrange(5):
for x in range(5):
organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
self.commit()
......@@ -988,7 +989,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
del foobar_list[:]
# Test group_method_id is working with many group_id defined
for x in xrange(5):
for x in range(5):
organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="1").reindexObject(number=1)
self.commit()
organisation.activate(activity=activity, group_method_id="organisation_module/setFoobar", group_id="2").reindexObject(number=3)
......@@ -1024,7 +1025,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
"""
activity_tool = self.getActivityTool()
def delete_volatiles():
for property_id in activity_tool.__dict__.keys():
for property_id in list(six.iterkeys(activity_tool.__dict__)):
if property_id.startswith('_v_'):
delattr(activity_tool, property_id)
organisation_module = self.getOrganisationModule()
......@@ -1140,6 +1141,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.flushAllActivities(silent=1, loop_size=100)
# Check there is a traceback in the email notification
sender, recipients, mail = message_list.pop()
mail = mail.decode()
self.assertIn("Module %s, line %s, in failingMethod" % (
__name__, inspect.getsourcelines(failingMethod)[1]), mail)
self.assertIn("ValueError:", mail)
......@@ -1235,7 +1237,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Check that cmf_activity SQL connection still works
connection_da = self.portal.cmf_activity_sql_connection()
self.assertFalse(connection_da._registered)
connection_da.query('select 1')
connection_da.query(b'select 1')
self.assertTrue(connection_da._registered)
self.commit()
self.assertFalse(connection_da._registered)
......@@ -1691,7 +1693,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# This is a one-shot method, revert after execution
SQLDict.dequeueMessage = original_dequeue
result = self.dequeueMessage(activity_tool, processing_node, node_family_id_set)
queue_tic_test_dict['isAlive'] = process_shutdown_thread.isAlive()
queue_tic_test_dict['is_alive'] = process_shutdown_thread.is_alive()
return result
SQLDict.dequeueMessage = dequeueMessage
Organisation.waitingActivity = waitingActivity
......@@ -1715,7 +1717,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.tic()
activity_thread = ActivityThread()
# Do not try to outlive main thread.
activity_thread.setDaemon(True)
activity_thread.daemon = True
# Call process_shutdown in yet another thread because it will wait for
# running activity to complete before returning, and we need to unlock
# activity *after* calling process_shutdown to make sure the next
......@@ -1725,7 +1727,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.process_shutdown(3, 0)
process_shutdown_thread = ProcessShutdownThread()
# Do not try to outlive main thread.
process_shutdown_thread.setDaemon(True)
process_shutdown_thread.daemon = True
activity_thread.start()
# Wait at rendez-vous for activity to arrive.
......@@ -1744,7 +1746,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(len(message_list), 1)
self.assertEqual(message_list[0].method_id, 'getTitle')
# Check that process_shutdown_thread was still runing when Queue_tic returned.
self.assertTrue(queue_tic_test_dict.get('isAlive'), repr(queue_tic_test_dict))
self.assertTrue(queue_tic_test_dict.get('is_alive'), repr(queue_tic_test_dict))
# Call tic in foreground. This must not lead to activity execution.
activity_tool.tic()
self.assertEqual(len(activity_tool.getMessageList()), 1)
......@@ -1892,13 +1894,13 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
"""
original_query = six.get_unbound_function(DB.query)
def query(self, query_string, *args, **kw):
if query_string.startswith('INSERT'):
if query_string.startswith(b'INSERT'):
insert_list.append(len(query_string))
if not n:
raise Skip
return original_query(self, query_string, *args, **kw)
def check():
for i in xrange(1, N):
for i in range(1, N):
activity_tool.activate(activity=activity, group_id=str(i)
).doSomething(arg)
activity_tool.activate(activity=activity, group_id='~'
......@@ -2402,7 +2404,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# / \ |
# c3 c4 c5
c = [category_tool.newContent()]
for i in xrange(5):
for i in range(5):
c.append(c[i//2].newContent())
self.tic()
def activate(i, priority=1, **kw):
......@@ -2473,7 +2475,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
check(1, tag="foo")
check(0, tag="foo", method_id="getUid")
check(1, processing_node=-1)
check(3, processing_node=range(-5,5))
check(3, processing_node=list(range(-5,5)))
test()
self.commit()
test(check)
......@@ -2501,7 +2503,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(1, activity_tool.countMessage())
self.flushAllActivities()
sender, recipients, mail = message_list.pop()
self.assertIn('UID mismatch', mail)
self.assertIn(b'UID mismatch', mail)
m, = activity_tool.getMessageList()
self.assertEqual(m.processing_node, INVOKE_ERROR_STATE)
obj.flushActivity()
......
......@@ -785,31 +785,31 @@ class TestCMFCategory(ERP5TypeTestCase):
base_cat.getCategoryChildIndentedTitleItemList(),
[['', ''],
['The Title', 'the_id'],
['\xc2\xa0\xc2\xa0The Sub Title', 'the_id/the_sub_id']],
[NBSP_UTF8 * 2 + 'The Sub Title', 'the_id/the_sub_id']],
)
self.assertEqual(
base_cat.getCategoryChildTranslatedIndentedTitleItemList(),
[['', ''],
['The Title', 'the_id'],
['\xc2\xa0\xc2\xa0The S\xc3\xbcb T\xc3\xaftle', 'the_id/the_sub_id']],
[NBSP_UTF8 * 2 + 'The Süb Tïtle', 'the_id/the_sub_id']],
)
self.assertEqual(
base_cat.getCategoryChildTranslatedCompactTitleItemList(),
[['', ''],
['The Title', 'the_id'],
['The S\xc3\xbcb T\xc3\xaftle', 'the_id/the_sub_id']],
['The Süb Tïtle', 'the_id/the_sub_id']],
)
self.assertEqual(
base_cat.getCategoryChildTranslatedLogicalPathItemList(),
[['', ''],
['The Title', 'the_id'],
['The Title/The S\xc3\xbcb T\xc3\xaftle', 'the_id/the_sub_id']],
['The Title/The Süb Tïtle', 'the_id/the_sub_id']],
)
self.assertEqual(
base_cat.getCategoryChildTranslatedCompactLogicalPathItemList(),
[['', ''],
['The Title', 'the_id'],
['The Title/The S\xc3\xbcb T\xc3\xaftle', 'the_id/the_sub_id']],
['The Title/The Süb Tïtle', 'the_id/the_sub_id']],
)
def test_CategoryChildTitleItemListFilterNodeFilterLeave(self):
......@@ -1363,9 +1363,9 @@ class TestCMFCategory(ERP5TypeTestCase):
self.assertEqual(get(bc.id), list('aa'))
_set(bc.id, list('baa'))
self.assertEqual(get(bc.id), list('aba'))
_set(bc.id, map(base, 'bb'), 1)
_set(bc.id, [base(e) for e in 'bb'], 1)
self.assertEqual(get(bc.id), list('bb'))
_set(bc.id, map(base, 'abb'), 1)
_set(bc.id, [base(e) for e in 'abb'], 1)
self.assertEqual(get(bc.id), list('bab'))
_set(bc.id, ())
......
......@@ -16,14 +16,14 @@ from __future__ import print_function
from threading import Thread
from time import sleep
from urllib import addinfourl
from urllib import splithost
from urllib import splituser
from urllib import unquote
from urllib import splittype
from six.moves.urllib.parse import splithost
from six.moves.urllib.parse import splituser
from six.moves.urllib.parse import unquote
from six.moves.urllib.parse import splittype
import string
from urllib import FancyURLopener
from Cookie import SimpleCookie
from six.moves.urllib.request import FancyURLopener
from six.moves.http_cookies import SimpleCookie
def main():
max_thread = 7 # The number of thread we want by the same time
......@@ -55,7 +55,7 @@ def main():
print("thread: %i request: %i url: %s" % (i,request_number,url))
else:
for t in range(0,max_thread):
if threads[t].isAlive() == 0:
if threads[t].is_alive() == 0:
url = '//user%i:user%i@localhost:9673%s?__ac_name=user%s&__ac_password=user%s' % \
(t,t,list_url[i][:-1],t,t)
threads[t] = Thread(target=checker[t].CheckUrl,kwargs={'url':url})
......@@ -75,7 +75,7 @@ class URLOpener(FancyURLopener):
def open_http(self, url, data=None):
"""Use HTTP protocol."""
import httplib
import six.moves.http_client
user_passwd = None
if type(url) is type(""):
host, selector = splithost(url)
......@@ -99,10 +99,10 @@ class URLOpener(FancyURLopener):
if not host: raise IOError('http error', 'no host given')
if user_passwd:
import base64
auth = base64.encodestring(user_passwd).strip()
auth = base64.encodebytes(user_passwd).strip()
else:
auth = None
h = httplib.HTTP(host)
h = six.moves.http_client.HTTP(host)
if data is not None:
h.putrequest('POST', selector)
h.putheader('Content-type', 'application/x-www-form-urlencoded')
......@@ -142,7 +142,7 @@ class Checker(URLOpener):
try:
thread = Thread(target=self.SearchUrl,args=(url,))
thread.start()
while thread.isAlive():
while thread.is_alive():
sleep(0.5)
print("Connection to %s went fine" % url)
except IOError as err:
......
......@@ -73,8 +73,8 @@ class TestERP5PythonScript(ERP5TypeTestCase):
basic='%s:%s' % (self.manager_username, self.manager_password),
handle_errors=False,
)
self.assertIn('ERP5 Python Scripts', resp.getBody())
self.assertIn('addPythonScriptThroughZMI', resp.getBody())
self.assertIn(b'ERP5 Python Scripts', resp.getBody())
self.assertIn(b'addPythonScriptThroughZMI', resp.getBody())
def test_call(self):
self.script.setBody('return "Hello"')
......
......@@ -30,12 +30,13 @@
import threading
import unittest
import urllib
from six.moves.urllib.request import urlopen
import transaction
import pkg_resources
from DateTime import DateTime
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript
from six.moves import range
ZEO5 = pkg_resources.parse_version(
pkg_resources.get_distribution('ZEO').version
......@@ -67,13 +68,13 @@ class TestInvalidationBug(ERP5TypeTestCase):
query = connection.factory()('-' + connection.connection_string).query
sql = "rollback\0select * from %s where path='%s'" % (table, path)
test_list.append(lambda query=query, sql=sql: len(query(sql)[1]))
result_list = [map(apply, test_list)]
result_list = [[test() for test in test_list]]
Transaction_commitResources = transaction.Transaction._commitResources
connection = module._p_jar
def _commitResources(self):
def tpc_finish(rm, txn):
rm.__class__.tpc_finish(rm, txn)
result_list.append(None if rm is connection else map(apply, test_list))
result_list.append(None if rm is connection else [test() for test in test_list])
try:
for rm in self._resources:
rm.tpc_finish = lambda txn, rm=rm: tpc_finish(rm, txn)
......@@ -163,7 +164,7 @@ class TestInvalidationBug(ERP5TypeTestCase):
storage._server = None
# ... monkey-patch done
## create object
urllib.urlopen(new_content_url).read()
urlopen(new_content_url).read()
## validate reindex activity
activity_tool.distribute()
self.assertEqual(1, len(activity_tool.getMessageList()))
......@@ -218,7 +219,7 @@ if (count % 500) < 5:
log('creation speed: %s obj/s' % ((count - start[0]) /
(86400 * (DateTime() - start[1]))))
""")
for x in xrange(0,200):
for x in range(0,200):
module.activate(activity='SQLQueue', priority=2).create_script()
self.tic()
......
......@@ -31,7 +31,7 @@
TODO: test variation
test selection_report
"""
from __future__ import division
import os
import random
import unittest
......@@ -47,6 +47,7 @@ from Products.ERP5Type.tests.utils import reindex
from zExceptions import BadRequest
import six
from six.moves import range
class InventoryAPITestCase(ERP5TypeTestCase):
"""Base class for Inventory API Tests {{{
......@@ -893,7 +894,7 @@ class TestInventoryList(InventoryAPITestCase):
getInventoryList = self.getSimulationTool().getInventoryList
inventory_list = getInventoryList()
self.assertEqual(str(inventory_list.__class__),
'Shared.DC.ZRDB.Results.Results')
'Shared.DC.ZRDB.Results.Results' if six.PY2 else "<class 'Shared.DC.ZRDB.Results.Results'>")
# the brain is InventoryListBrain
self.assertIn('InventoryListBrain',
[c.__name__ for c in inventory_list._class.__bases__])
......@@ -1704,7 +1705,7 @@ class TestMovementHistoryList(InventoryAPITestCase):
getMovementHistoryList = self.getSimulationTool().getMovementHistoryList
mvt_history_list = getMovementHistoryList()
self.assertEqual(str(mvt_history_list.__class__),
'Shared.DC.ZRDB.Results.Results')
'Shared.DC.ZRDB.Results.Results' if six.PY2 else "<class 'Shared.DC.ZRDB.Results.Results'>")
# default is an empty list
self.assertEqual(0, len(mvt_history_list))
......@@ -2966,7 +2967,7 @@ class TestInventoryCacheTable(InventoryAPITestCase):
def afterSetUp(self):
InventoryAPITestCase.afterSetUp(self)
self.CACHE_LAG = cache_lag = self.getSimulationTool().getInventoryCacheLag()
min_lag = cache_lag / 2
min_lag = cache_lag // 2
self.NOW = now = DateTime(DateTime().strftime("%Y-%m-%d %H:%M:%S UTC"))
self.CACHE_DATE = cache_date = now - min_lag
from erp5.component.tool.SimulationTool import MYSQL_MIN_DATETIME_RESOLUTION
......@@ -3039,7 +3040,7 @@ class TestInventoryCacheTable(InventoryAPITestCase):
inventory_list = inventory_list[:] # That list is modified in this method
for criterion_dict in criterion_dict_list:
success = False
for inventory_position in xrange(len(inventory_list)):
for inventory_position in range(len(inventory_list)):
if self._doesInventoryLineMatch(criterion_dict,
inventory_list[inventory_position]):
del inventory_list[inventory_position]
......
......@@ -58,7 +58,7 @@ class Aspell(object):
command = 'echo %s | aspell -l %s -a' % (word, language)
subprocess = Popen(command, shell=True, stdin=PIPE,
stdout=PIPE, stderr=PIPE, close_fds=True)
return subprocess.communicate()[0].split('\n')[1:]
return subprocess.communicate()[0].decode('utf-8').split('\n')[1:]
class TestSpellChecking(ERP5TypeTestCase):
......@@ -115,7 +115,7 @@ class TestSpellChecking(ERP5TypeTestCase):
message = '"%s" is misspelled, suggestion are : "%s"'
result_dict = {}
for word, result_list in six.iteritems(self.spellChecker(sentence)):
filtered_result_list = filter(lambda x: x not in ('*', ''), result_list)
filtered_result_list = [x for x in result_list if x not in ('*', '')]
if filtered_result_list:
result_dict[word] = message % (word, \
filtered_result_list[0].split(':')[-1].strip())
......@@ -133,8 +133,8 @@ class TestSpellChecking(ERP5TypeTestCase):
# check some suggestion are given for a small mistake
self.assertNotEqual(self.validate_spell('canceled'), {})
self.assertIn('is misspelled', \
self.validate_spell('canceled').values()[0])
self.assertIn('is misspelled',
list(self.validate_spell('canceled').values())[0])
def test_business_template_list_with_workflow_template_item(self):
"""
......
......@@ -31,7 +31,7 @@ import unittest
import MethodObject
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.Utils import getMessageIdWithContext
from Products.ERP5Type.Utils import getMessageIdWithContext, str2unicode, unicode2str
# dependency order
target_business_templates = (
......@@ -100,7 +100,7 @@ class TestWorkflowStateTitleTranslation(ERP5TypeTestCase):
if (result == msgid) and (self.lang != 'en'):
#result = None
result = self.portal.Localizer.erp5_ui.gettext(msgid)
return result.encode('utf8')
return unicode2str(result)
def logMessage(self, message):
self.message += '%s\n' % message
......@@ -180,7 +180,7 @@ class TestWorkflowStateTitleTranslation(ERP5TypeTestCase):
if error:
for key, item_list in error_dict.items():
if len(item_list) != 0:
self.logMessage("\n'%s'" % key.encode('utf-8'))
self.logMessage("\n'%s'" % unicode2str(key))
self.logMessage('\t### Conflicting workflow with common states (ie, what user can see) ###')
for item in item_list:
# XXX Improve rendering
......@@ -250,7 +250,7 @@ class TestWorkflowStateTitleTranslation(ERP5TypeTestCase):
'item_workflow'),
add=1)
message_catalog.message_edit('Validated', self.lang,
'Validé'.decode('utf8'), '')
str2unicode('Validé'), '')
message_catalog.message_edit(getMessageIdWithContext('Validated',
'state',
'item_workflow'),
......@@ -433,7 +433,7 @@ class TestTranslation(ERP5TypeTestCase):
</tal:ommit>
""" % domain
self.myzpt.pt_edit(zpt_template, 'text/html')
return self.myzpt(words=words).encode('utf-8').split()
return unicode2str(self.myzpt(words=words)).split()
def test_ZPT_translation(self):
results = self.translate_by_zpt('erp5_ui', 'Person', 'Draft')
......
......@@ -29,7 +29,7 @@ import os
import tarfile
import xml.parsers.expat
import xml.dom.minidom
from urllib import url2pathname
from six.moves.urllib.request import url2pathname
from ZODB.DemoStorage import DemoStorage
from ZODB import DB
from Products.ERP5Type.XMLExportImport import importXML
......@@ -165,14 +165,14 @@ class BusinessTemplateInfoDir(BusinessTemplateInfoBase):
def findFileInfosByName(self, startswith='', endswith=''):
allfiles = []
def visit(arg, dirname, names):
if '.svn' in dirname:
return
for i in names:
path = os.path.join(self.target, dirname, i)
for root, dir_list, file_list in os.walk(self.target):
# We can drop this block as we no longer use Subversion.
if '.svn' in root.split(os.path.sep):
continue
for file_ in file_list:
path = os.path.join(self.target, root, file_)
if os.path.isfile(path):
allfiles.append(path)
os.path.walk(self.target, visit, None)
for i in allfiles:
if i.startswith(startswith) and i.endswith(endswith):
yield i
......@@ -186,4 +186,5 @@ class BusinessTemplateInfoDir(BusinessTemplateInfoBase):
return fileinfo
def readFileInfo(self, fileinfo):
return open(fileinfo).read()
with open(fileinfo) as f:
return f.read()
......@@ -32,7 +32,6 @@
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from AccessControl.SecurityManagement import newSecurityManager
from six import StringIO
class TestFormPrintoutMixin(ERP5TypeTestCase):
run_all_test = 1
......@@ -49,7 +48,7 @@ class TestFormPrintoutMixin(ERP5TypeTestCase):
'''return odf document from the printout
'''
document_file = getattr(self.portal, printout_form.template, None)
document_file = StringIO(document_file).read()
document_file = bytes(document_file)
if document_file is not None:
return document_file
raise ValueError ('%s template not found' % printout_form.template)
......
......@@ -36,7 +36,7 @@ from AccessControl.SecurityManagement import newSecurityManager
from Acquisition import aq_base
from Products.ERP5OOo.tests.utils import Validator
from lxml import html
import email, urlparse, httplib
import email, six.moves.urllib.parse, six.moves.http_client
from Products.Formulator.MethodField import Method
......@@ -133,7 +133,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
self.assertNotEqual(last_message, ())
mfrom, mto, message_text = last_message
self.assertEqual('"%s" <%s>' % (self.first_name, self.recipient_email_address), mto[0])
mail_message = email.message_from_string(message_text)
mail_message = email.message_from_string(message_text.decode())
for part in mail_message.walk():
content_type = part.get_content_type()
file_name = part.get_filename()
......@@ -158,9 +158,9 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
self.assertTrue("History%s" % extension or self.attachment_file_extension in content)
tree = html.fromstring(content)
link, = [href for href in tree.xpath('//a/@href') if href]
relative_url =urlparse.urlparse(link)
relative_url =six.moves.urllib.parse.urlparse(link)
report = self.publish(relative_url.path+"?"+relative_url.query, '%s:%s' % (self.username, self.password))
self.assertEqual(httplib.OK, report.getStatus())
self.assertEqual(six.moves.http_client.OK, report.getStatus())
self.assertEqual(report.getHeader('content-type'), content_type or self.content_type)
def _checkDocument(self):
......@@ -198,7 +198,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
self.assertNotEqual(last_message, ())
mfrom, mto, message_text = last_message
self.assertEqual('"%s" <%s>' % (self.first_name, self.recipient_email_address), mto[0])
mail_message = email.message_from_string(message_text)
mail_message = email.message_from_string(message_text.decode())
for part in mail_message.walk():
content_type = part.get_content_type()
if content_type == "text/html":
......@@ -236,7 +236,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
self.assertNotEqual(last_message, ())
mfrom, mto, message_text = last_message
self.assertEqual('"%s" <%s>' % (self.first_name, self.recipient_email_address), mto[0])
mail_message = email.message_from_string(message_text)
mail_message = email.message_from_string(message_text.decode())
for part in mail_message.walk():
content_type = part.get_content_type()
if content_type == "text/html":
......@@ -260,7 +260,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
self.assertNotEqual(last_message, ())
mfrom, mto, message_text = last_message
self.assertEqual('"%s" <%s>' % (self.first_name, self.recipient_email_address), mto[0])
mail_message = email.message_from_string(message_text)
mail_message = email.message_from_string(message_text.decode())
for part in mail_message.walk():
content_type = part.get_content_type()
if content_type == "text/html":
......@@ -285,7 +285,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
self.assertNotEqual(last_message, ())
mfrom, mto, message_text = last_message
self.assertEqual('"%s" <%s>' % (self.first_name, self.recipient_email_address), mto[0])
mail_message = email.message_from_string(message_text)
mail_message = email.message_from_string(message_text.decode())
for part in mail_message.walk():
content_type = part.get_content_type()
if content_type == self.content_type:
......@@ -315,13 +315,13 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
'HTTP_ACCEPT_LANGUAGE': 'fr;q=0.9,en;q=0.8',
})
self.tic()
mail_message = email.message_from_string(self.portal.MailHost._last_message[2])
mail_message = email.message_from_string(self.portal.MailHost._last_message[2].decode())
# mail subject is translated
self.assertEqual('Historique', mail_message['subject'])
# content is translated
part, = [x for x in mail_message.walk() if x.get_content_type() == self.content_type]
self.assertIn(
'Historique',
b'Historique',
self.portal.portal_transforms.convertTo(
'text/plain',
part.get_payload(decode=True),
......@@ -345,14 +345,14 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
'HTTP_COOKIE': 'LOCALIZER_LANGUAGE="fr"',
})
self.tic()
mail_message = email.message_from_string(self.portal.MailHost._last_message[2])
mail_message = email.message_from_string(self.portal.MailHost._last_message[2].decode())
# mail subject is translated
self.assertEqual('Historique', mail_message['subject'])
# content is translated
mail_message = email.message_from_string(self.portal.MailHost._last_message[2])
mail_message = email.message_from_string(self.portal.MailHost._last_message[2].decode())
part, = [x for x in mail_message.walk() if x.get_content_type() == self.content_type]
self.assertIn(
'Historique',
b'Historique',
self.portal.portal_transforms.convertTo(
'text/plain',
part.get_payload(decode=True),
......@@ -386,7 +386,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
# after they are saved to DB and automatically migrated. The getProperty
# above, which is also what ods_style does, only work after the report
# state is updated.
report.__setstate__(aq_base(getattr(skin_folder, report_form_name)).__getstate__())
aq_base(report).__setstate__(aq_base(getattr(skin_folder, report_form_name)).__getstate__())
self.assertEqual(report.getProperty('title'), self.id())
# Report section method
......@@ -450,7 +450,7 @@ class TestDeferredStyleBase(DeferredStyleTestCase):
# inspect the report as text and check the selection was initialized from
# request parameter.
mail_message = email.message_from_string(self.portal.MailHost._last_message[2])
mail_message = email.message_from_string(self.portal.MailHost._last_message[2].decode())
part, = [x for x in mail_message.walk() if x.get_content_type() == self.content_type]
doc = self.portal.document_module.newContent(
......@@ -487,7 +487,7 @@ class TestODSDeferredStyle(TestDeferredStyleBase):
self.assertNotEqual(last_message, ())
mfrom, mto, message_text = last_message
self.assertEqual('"%s" <%s>' % (self.first_name, self.recipient_email_address), mto[0])
mail_message = email.message_from_string(message_text)
mail_message = email.message_from_string(message_text.decode())
for part in mail_message.walk():
content_type = part.get_content_type()
file_name = part.get_filename()
......
......@@ -152,7 +152,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
# check the style is keept after the odg generation
self.assertEqual(final_document_style_dict, style_dict)
self.assertTrue(content_xml.find("Foo title!") > 0)
self.assertIn(b"Foo title!", content_xml)
self.assertEqual(request.RESPONSE.getHeader('content-type'),
'application/vnd.oasis.opendocument.graphics')
self.assertEqual(request.RESPONSE.getHeader('content-disposition'),
......@@ -161,12 +161,11 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
# 2. Normal case: change the field value and check again the ODF document
test1.setTitle("Changed Title!")
#foo_form.my_title.set_value('default', "Changed Title!")
odf_document = foo_printout.index_html(request)
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Changed Title!") > 0)
self.assertIn(b"Changed Title!", content_xml)
self._validate(odf_document)
# 3. False case: change the field name
......@@ -178,7 +177,8 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertFalse(content_xml.find("you cannot find") > 0)
self.assertNotIn(b"you cannot find!", content_xml)
self._validate(odf_document)
# put back
foo_form.manage_renameObject('xxx_title', 'my_title', REQUEST=request)
......@@ -199,7 +199,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("call!") > 0)
self.assertIn(b"call!", content_xml)
# when just call FormPrintout, it does not change content-type
# Zope4 add charset=utf-8
self.assertTrue('text/html' in request.RESPONSE.getHeader('content-type'))
......@@ -211,7 +211,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Français") > 0)
self.assertIn(u"Français".encode('utf-8'), content_xml)
self._validate(odf_document)
# 6. Normal case: unicode string
......@@ -220,7 +220,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Français test2") > 0)
self.assertIn(u"Français test2".encode('utf-8'), content_xml)
# leave _validate() here not to forget the validation failure
self._validate(odf_document)
......@@ -278,8 +278,8 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertEqual(final_document_style_dict, style_dict)
# check the two lines are prensent in the generated document
self.assertTrue(content_xml.find('A text a bit more longer') > 0)
self.assertTrue(content_xml.find('With a newline !') > 0)
self.assertIn(b'A text a bit more longer', content_xml)
self.assertIn(b'With a newline !', content_xml)
# check there is two line-break in the element my_description
text_xpath = '//draw:frame[@draw:name="my_description"]//text:line-break'
......@@ -403,7 +403,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
# check the style is keept after the odg generation
self.assertEqual(final_document_style_dict, style_dict)
self.assertTrue(content_xml.find("Foo title!") > 0)
self.assertIn(b"Foo title!", content_xml)
self.assertEqual(request.RESPONSE.getHeader('content-type'),
'application/vnd.oasis.opendocument.graphics')
self.assertEqual(request.RESPONSE.getHeader('content-disposition'),
......@@ -412,12 +412,11 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
# 2. Normal case: change the field value and check again the ODF document
test1.setTitle("Changed Title!")
#foo_form.my_title.set_value('default', "Changed Title!")
odf_document = foo_printout.index_html(request)
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Changed Title!") > 0)
self.assertIn(b"Changed Title!", content_xml)
self._validate(odf_document)
# 3. False case: change the field name
......@@ -429,7 +428,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertFalse(content_xml.find("you cannot find") > 0)
self.assertNotIn(b"you cannot find", content_xml)
self._validate(odf_document)
# put back
foo_form.manage_renameObject('xxx_title', 'my_title', REQUEST=request)
......@@ -449,7 +448,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("call!") > 0)
self.assertIn(b"call!", content_xml)
self.assertEqual(request.RESPONSE.getHeader('content-type'),
'application/vnd.oasis.opendocument.graphics')
self._validate(odf_document)
......@@ -460,7 +459,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Français") > 0)
self.assertIn(u"Français".encode('utf-8'), content_xml)
self._validate(odf_document)
# 6. Normal case: unicode string
......@@ -469,7 +468,7 @@ class TestFormPrintoutAsODG(TestFormPrintoutMixin):
self.assertTrue(odf_document is not None)
builder = OOoBuilder(odf_document)
content_xml = builder.extract("content.xml")
self.assertTrue(content_xml.find("Français test2") > 0)
self.assertIn(u"Français test2".encode('utf-8'), content_xml)
self._validate(odf_document)
def test_suite():
......
This diff is collapsed.
......@@ -29,11 +29,12 @@
import os
import unittest
from six.moves import cStringIO as StringIO
from io import BytesIO
from zipfile import ZipFile
from Products.ERP5Type.tests.utils import FileUpload
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import DummyLocalizer
from Products.ERP5Type.Utils import bytes2str
from Products.ERP5OOo.tests.utils import Validator
from Products.ERP5OOo.OOoUtils import OOoBuilder
......@@ -124,7 +125,7 @@ return getattr(context, "%s_%s" % (parameter, current_language))
self.assertEqual(200, response.getStatus())
ooo_builder = OOoBuilder(response.getBody())
styles_xml_body = ooo_builder.extract('styles.xml')
styles_xml_body = bytes2str(ooo_builder.extract('styles.xml'))
self.assertTrue(len(styles_xml_body) > 0)
# 'Style sheet ja' text is in the odt document header,
# and the header is in the 'styles.xml'.
......@@ -137,7 +138,7 @@ return getattr(context, "%s_%s" % (parameter, current_language))
response = self.publish('/' + self.getPortal().Dynamic_viewAsOdt.absolute_url(1))
self._validate(response.getBody())
ooo_builder = OOoBuilder(response.getBody())
styles_xml_body = ooo_builder.extract('styles.xml')
styles_xml_body = bytes2str(ooo_builder.extract('styles.xml'))
self.assertTrue(styles_xml_body.find('Style sheet en') > 0)
# 3. test a fail case, reset a not existed stylesheet
......@@ -150,7 +151,7 @@ return getattr(context, "%s_%s" % (parameter, current_language))
self.getPortal().Localizer.changeLanguage('en')
response = self.publish('/' + self.getPortal().Dynamic_viewAsOdt.absolute_url(1))
# then, it is not a zip stream
self.assertFalse(response.getBody().startswith('PK'))
self.assertFalse(response.getBody().startswith(b'PK'))
self.assertEqual(500, response.getStatus())
......@@ -177,7 +178,7 @@ return getattr(context, "%s_%s" % (parameter, current_language))
response.getHeader('content-disposition'))
self._validate(response.getBody())
ooo_builder = OOoBuilder(response.getBody())
styles_xml_body = ooo_builder.extract('styles.xml')
styles_xml_body = bytes2str(ooo_builder.extract('styles.xml'))
self.assertTrue(len(styles_xml_body) > 0)
self.assertTrue(styles_xml_body.find('Style sheet ja') > 0)
......@@ -189,7 +190,7 @@ return getattr(context, "%s_%s" % (parameter, current_language))
self.assertEqual(200, response.getStatus())
self._validate(response.getBody())
ooo_builder = OOoBuilder(response.getBody())
styles_xml_body = ooo_builder.extract('styles.xml')
styles_xml_body = bytes2str(ooo_builder.extract('styles.xml'))
self.assertTrue(len(styles_xml_body) > 0)
self.assertTrue(styles_xml_body.find('Style sheet en') > 0)
......@@ -198,7 +199,7 @@ return getattr(context, "%s_%s" % (parameter, current_language))
Static_viewAsOdt.doSettings(request, title='', xml_file_id='content.xml',
ooo_stylesheet='NotFound_getODTStyleSheet', script_name='')
response = self.publish('/' + self.getPortal().Static_viewAsOdt.absolute_url(1))
self.assertFalse(response.getBody().startswith('PK'))
self.assertFalse(response.getBody().startswith(b'PK'))
self.assertEqual(500, response.getStatus())
def test_include_img(self):
......@@ -235,14 +236,14 @@ return getattr(context, "%s_%s" % (parameter, current_language))
response.getHeader('content-type').split(';')[0])
self.assertEqual('attachment; filename="Base_viewIncludeImageAsOdt.odt"',
response.getHeader('content-disposition'))
cs = StringIO()
cs = BytesIO()
cs.write(body)
zip_document = ZipFile(cs)
picture_list = filter(lambda x: "Pictures" in x.filename,
zip_document.infolist())
self.assertNotEqual([], picture_list)
manifest = zip_document.read('META-INF/manifest.xml')
content = zip_document.read('content.xml')
manifest = bytes2str(zip_document.read('META-INF/manifest.xml'))
content = bytes2str(zip_document.read('content.xml'))
for picture in picture_list:
self.assertIn(picture.filename, manifest)
self.assertIn(picture.filename, content)
......
This diff is collapsed.
......@@ -43,7 +43,7 @@ class TestOOoParser(unittest.TestCase):
parser = OOoParser()
parser.openFile(open(makeFilePath('import_data_list.ods'), 'rb'))
mapping = parser.getSpreadsheetsMapping()
self.assertEqual(['Person'], mapping.keys())
self.assertEqual(['Person'], list(mapping.keys()))
person_mapping = mapping['Person']
self.assertTrue(isinstance(person_mapping, list))
self.assertTrue(102, len(person_mapping))
......@@ -57,14 +57,14 @@ class TestOOoParser(unittest.TestCase):
with open(makeFilePath('import_data_list.ods'), 'rb') as f:
parser.openFromBytes(f.read())
mapping = parser.getSpreadsheetsMapping()
self.assertEqual(['Person'], mapping.keys())
self.assertEqual(['Person'], list(mapping.keys()))
def test_getSpreadSheetMappingStyle(self):
parser = OOoParser()
with open(makeFilePath('import_data_list_with_style.ods'), 'rb') as f:
parser.openFile(f)
mapping = parser.getSpreadsheetsMapping()
self.assertEqual(['Feuille1'], mapping.keys())
self.assertEqual(['Feuille1'], list(mapping.keys()))
self.assertEqual(mapping['Feuille1'][1],
['a line with style'])
self.assertEqual(mapping['Feuille1'][2],
......@@ -79,7 +79,7 @@ class TestOOoParser(unittest.TestCase):
with open(makeFilePath('import_data_list_data_type.ods'), 'rb') as f:
parser.openFile(f)
mapping = parser.getSpreadsheetsMapping()
self.assertEqual(['Feuille1'], mapping.keys())
self.assertEqual(['Feuille1'], list(mapping.keys()))
self.assertEqual(mapping['Feuille1'][0],
['1234.5678'])
self.assertEqual(mapping['Feuille1'][1],
......@@ -112,7 +112,7 @@ class TestOOoParser(unittest.TestCase):
parser = OOoParser()
parser.openFile(open(makeFilePath('complex_text.ods'), 'rb'))
mapping = parser.getSpreadsheetsMapping()
self.assertEqual(['Feuille1'], mapping.keys())
self.assertEqual(['Feuille1'], list(mapping.keys()))
self.assertEqual(mapping['Feuille1'][0], [' leading space'])
self.assertEqual(mapping['Feuille1'][1], [' leading space'])
self.assertEqual(mapping['Feuille1'][2], ['tab\t'])
......@@ -122,7 +122,7 @@ class TestOOoParser(unittest.TestCase):
parser = OOoParser()
parser.openFile(open(makeFilePath('empty_cells.ods'), 'rb'))
mapping = parser.getSpreadsheetsMapping()
self.assertEqual(['Feuille1'], mapping.keys())
self.assertEqual(['Feuille1'], list(mapping.keys()))
self.assertEqual(mapping['Feuille1'],
[
['A1', None, 'C1'],
......
......@@ -34,12 +34,12 @@ from Products.ERP5Form.Selection import Selection
from Testing import ZopeTestCase
from DateTime import DateTime
from Products.ERP5OOo.tests.utils import Validator
import httplib
import six.moves.http_client
import lxml.html
import mock
import PyPDF2
HTTP_OK = httplib.OK
HTTP_OK = six.moves.http_client.OK
# setting this to True allows the .publish() calls to provide tracebacks
debug = False
......@@ -700,7 +700,7 @@ class TestOOoStyle(ERP5TypeTestCase, ZopeTestCase.Functional):
self.assertEqual('text/html;charset=utf-8', content_type.lower())
self.assertFalse(response.getHeader('content-disposition'))
# Simplistic assertion that we are viewing the ODF XML source
self.assertIn('office:document-content', response.getBody())
self.assertIn(b'office:document-content', response.getBody())
def test_form_list_ZMI(self):
"""We can edit form_list in the ZMI."""
......@@ -710,7 +710,7 @@ class TestOOoStyle(ERP5TypeTestCase, ZopeTestCase.Functional):
content_type = response.getHeader('content-type')
self.assertEqual('text/html;charset=utf-8', content_type.lower())
self.assertFalse(response.getHeader('content-disposition'))
self.assertIn('office:document-content', response.getBody())
self.assertIn(b'office:document-content', response.getBody())
def test_report_view_ZMI(self):
"""We can edit report_view in the ZMI."""
......@@ -720,7 +720,8 @@ class TestOOoStyle(ERP5TypeTestCase, ZopeTestCase.Functional):
content_type = response.getHeader('content-type')
self.assertEqual('text/html;charset=utf-8', content_type.lower())
self.assertFalse(response.getHeader('content-disposition'))
self.assertIn('office:document-content', response.getBody())
self.assertIn(b'office:document-content', response.getBody())
class TestODTStyle(TestOOoStyle):
skin = 'ODT'
......
......@@ -41,6 +41,7 @@ import zipfile
import subprocess
from six.moves import urllib
from six.moves import cStringIO as StringIO
from io import BytesIO
try:
import lxml
......@@ -65,14 +66,14 @@ if lxml:
def validate(self, odf_file_content):
error_list = []
odf_file = StringIO(odf_file_content)
odf_file = BytesIO(odf_file_content)
for f in ('content.xml', 'meta.xml', 'styles.xml', 'settings.xml'):
error_list.extend(self._validateXML(odf_file, f))
return error_list
def _validateXML(self, odf_file, content_file_name):
zfd = zipfile.ZipFile(odf_file)
doc = lxml.etree.parse(StringIO(zfd.read(content_file_name)))
doc = lxml.etree.parse(BytesIO(zfd.read(content_file_name)))
return []
# The following is the past implementation that validates with
# RelaxNG schema. But recent LibreOffice uses extended odf
......
......@@ -34,9 +34,11 @@ import mock
import itertools
import transaction
import unittest
import urlparse
import six
from six.moves.urllib.parse import urlparse, parse_qs
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript
from Products.ERP5Type.Utils import bytes2str
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
from AccessControl import SpecialUsers
......@@ -57,8 +59,10 @@ class UserManagementTestCase(ERP5TypeTestCase):
"""TestCase for user manement, with utilities to create users and helpers
assertion methods.
"""
if six.PY2:
_login_generator = itertools.count().next
else:
_login_generator = itertools.count().__next__
def getBusinessTemplateList(self):
"""List of BT to install. """
......@@ -742,12 +746,12 @@ class TestPreferences(UserManagementTestCase):
current_password='bad' + password,
new_password=new_password,
)
parsed_url = urlparse.urlparse(result)
parsed_url = urlparse(result)
self.assertEqual(
parsed_url.path.split('/')[-2:],
['portal_preferences', 'PreferenceTool_viewChangePasswordDialog'])
self.assertEqual(
urlparse.parse_qs(parsed_url.query),
parse_qs(parsed_url.query),
{'portal_status_message': ['Current password is wrong.'], 'portal_status_level': ['error']})
self.login()
......@@ -1202,8 +1206,8 @@ class TestUserManagementExternalAuthentication(TestUserManagement):
# view front page we should be logged in if we use authentication key
response = self.publish(base_url, env={self.user_id_key.replace('-', '_').upper(): login})
self.assertEqual(response.getStatus(), 200)
self.assertIn('Logged In', response.getBody())
self.assertIn(login, response.getBody())
self.assertIn('Logged In', bytes2str(response.getBody()))
self.assertIn(login, bytes2str(response.getBody()))
class _TestLocalRoleManagementMixIn(object):
......@@ -1543,7 +1547,7 @@ class _TestKeyAuthenticationMixIn(object):
# view front page we should be logged in if we use authentication key
response = self.publish('%s?__ac_key=%s' %(base_url, key))
self.assertEqual(response.getStatus(), 200)
self.assertIn(reference, response.getBody())
self.assertIn(reference, bytes2str(response.getBody()))
# check if key authentication works other page than front page
person_module = portal.person_module
......@@ -1554,7 +1558,7 @@ class _TestKeyAuthenticationMixIn(object):
self.assertTrue('%s/login_form?came_from=' % portal.getId(), response.headers['location'])
response = self.publish('%s?__ac_key=%s' %(base_url, key))
self.assertEqual(response.getStatus(), 200)
self.assertIn(reference, response.getBody())
self.assertIn(reference, bytes2str(response.getBody()))
# check if key authentication works with web_mode too
web_site = portal.web_site_module.newContent(portal_type='Web Site')
......
......@@ -42,7 +42,7 @@ from ZPublisher.HTTPResponse import HTTPResponse
from zExceptions.ExceptionFormatter import format_exception
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.runUnitTest import log_directory
from Products.ERP5Type.Utils import stopProcess, PR_SET_PDEATHSIG
from Products.ERP5Type.Utils import stopProcess, PR_SET_PDEATHSIG, unicode2str
from lxml import etree
from lxml.html.builder import E
import certifi
......@@ -397,7 +397,7 @@ class FunctionalTestRunner:
for test_tr in test_table.xpath('.//tr[contains(@class, "status_failed")]'):
test_tr.set('style', 'background-color: red;')
details_attribute_dict = {}
if etree.tostring(test_table).find("expected failure") != -1:
if u"expected failure" in etree.tostring(test_table, encoding="unicode"):
expected_failure_amount += 1
else:
failure_amount += 1
......@@ -405,7 +405,7 @@ class FunctionalTestRunner:
details_attribute_dict['open'] = 'true'
detail_element = E.div()
detail_element.append(E.details(E.summary(test_name), test_table, **details_attribute_dict))
detail += etree.tostring(detail_element)
detail += unicode2str(etree.tostring(detail_element, encoding="unicode"))
tr_count += 1
success_amount = tr_count - 1 - failure_amount - expected_failure_amount
if detail:
......
......@@ -253,7 +253,7 @@ class ERP5TypeTestReLoader(ERP5TypeTestLoader):
def runLiveTest(test_list, verbosity=1, stream=None, request_server_url=None, **kw):
from Products.ERP5Type.tests.runUnitTest import DebugTestResult
from StringIO import StringIO
from six.moves import StringIO
# Add path of the TestTemplateItem folder of the instance
path = kw.get('path', None)
if path is not None and path not in sys.path:
......
......@@ -9,7 +9,7 @@ __version__ = '0.3.0'
import base64
import errno
import httplib
from six.moves import http_client
import os
import random
import re
......@@ -18,13 +18,12 @@ import string
import sys
import time
import traceback
import urllib
import ConfigParser
from six.moves import configparser
from contextlib import contextmanager
from io import BytesIO
from functools import partial
from six.moves.urllib.parse import unquote_to_bytes
from cPickle import dumps
from six.moves.cPickle import dumps
from glob import glob
from hashlib import md5
from warnings import warn
......@@ -32,10 +31,11 @@ from DateTime import DateTime
import mock
import Products.ZMySQLDA.DA
from Products.ZMySQLDA.DA import Connection as ZMySQLDA_Connection
from zope.globalrequest import clearRequest
from zope.globalrequest import getRequest
from zope.globalrequest import setRequest
import six
if six.PY3:
StandardError = Exception
from zope.component.hooks import setSite
......@@ -47,7 +47,7 @@ from Products.PythonScripts.PythonScript import PythonScript
from Products.ERP5Type.Accessor.Constant import PropertyGetter as ConstantGetter
from Products.ERP5Form.PreferenceTool import Priority
from zLOG import LOG, DEBUG
from Products.ERP5Type.Utils import convertToUpperCase, str2bytes
from Products.ERP5Type.Utils import convertToUpperCase, bytes2str, str2bytes
from Products.ERP5Type.tests.backportUnittest import SetupSiteError
from Products.ERP5Type.tests.utils import addUserToDeveloperRole
from Products.ERP5Type.tests.utils import parseListeningAddress
......@@ -154,7 +154,7 @@ def _createTestPromiseConfigurationFile(promise_path, bt5_repository_path_list=N
_getVolatileMemcachedServerDict()
cloudooo_url_list = _getConversionServerUrlList()
promise_config = ConfigParser.RawConfigParser()
promise_config = configparser.RawConfigParser()
promise_config.add_section('external_service')
promise_config.set('external_service', 'cloudooo_url_list', cloudooo_url_list)
promise_config.set('external_service', 'memcached_url',memcached_url)
......@@ -170,7 +170,8 @@ def _createTestPromiseConfigurationFile(promise_path, bt5_repository_path_list=N
promise_config.set('portal_certificate_authority', 'certificate_authority_path',
os.environ['TEST_CA_PATH'])
promise_config.write(open(promise_path, 'w'))
with open(promise_path, 'w') as f:
promise_config.write(f)
def profile_if_environ(environment_var_name):
if int(os.environ.get(environment_var_name, 0)):
......@@ -929,7 +930,7 @@ class ERP5TypeCommandLineTestCase(ERP5TypeTestCaseMixin):
forced_portal_id = os.environ.get('erp5_tests_portal_id')
if forced_portal_id:
return str(forced_portal_id)
m = md5(repr(self.getBusinessTemplateList()) + self.getTitle())
m = md5(str2bytes(repr(self.getBusinessTemplateList()) + self.getTitle()))
return portal_name + '_' + m.hexdigest()
def getPortal(self):
......@@ -1498,7 +1499,7 @@ class ZEOServerTestCase(ERP5TypeTestCase):
if e[0] != errno.EADDRINUSE:
raise
if zeo_client:
os.write(zeo_client, repr(host_port))
os.write(zeo_client, str2bytes(repr(host_port)))
os.close(zeo_client)
ZopeTestCase._print("\nZEO Storage started at %s:%s ... " % host_port)
......@@ -1571,7 +1572,7 @@ def optimize():
PythonScript._compile = _compile
PythonScript_exec = PythonScript._exec
def _exec(self, *args):
self.func_code # trigger compilation if needed
self.__code__ # trigger compilation if needed
return PythonScript_exec(self, *args)
PythonScript._exec = _exec
from Acquisition import aq_parent
......
......@@ -7,6 +7,7 @@ from asyncore import socket_map
from ZODB.DemoStorage import DemoStorage
from ZODB.FileStorage import FileStorage
from Products.ERP5Type.tests.utils import getMySQLArguments, instance_random
from six.moves import range
def _print(message):
sys.stderr.write(message + "\n")
......@@ -99,7 +100,7 @@ def fork():
def forkNodes():
global node_pid_list
for i in xrange(1, activity_node):
for i in range(1, activity_node):
pid = fork()
if not pid:
node_pid_list = None
......@@ -122,7 +123,7 @@ if neo_storage:
storage_count = 2
if load or save:
db_list = [os.path.join(instance_home, 'var', 'neo%u.sqlite' % i)
for i in xrange(1, storage_count+1)]
for i in range(1, storage_count+1)]
else:
db_list = [None] * storage_count
cwd = os.getcwd()
......
......@@ -154,7 +154,7 @@ def main():
assert revision == test_result.revision, (revision, test_result.revision)
while suite.acquire():
test = test_result.start(suite.running.keys())
test = test_result.start(list(suite.running.keys()))
if test is not None:
suite.start(test.name, lambda status_dict, __test=test:
__test.stop(**status_dict))
......
......@@ -300,7 +300,7 @@ class ERP5TypeTestLoader(unittest.TestLoader):
def _importZodbTestComponent(self, name):
import erp5.component.test
module = __import__('erp5.component.test.' + name,
fromlist=['erp5.component.test'],
fromlist=['erp5.component.test'] if six.PY2 else ['erp5'],
level=0)
try:
self._test_component_ref_list.append(module)
......
......@@ -1454,7 +1454,7 @@ class TestZodbModuleComponent(SecurityTestCase):
def afterSetUp(self):
self._component_tool = self.portal.portal_components
self._module = __import__(self._document_class._getDynamicModuleNamespace(),
fromlist=['erp5.component'])
fromlist=['erp5.component'] if six.PY2 else ['erp5'])
self._component_tool.reset(force=True,
reset_portal_type_at_transaction_boundary=True)
......@@ -1524,7 +1524,10 @@ class TestZodbModuleComponent(SecurityTestCase):
if expected_default_version is not None:
top_module_name = self._document_class._getDynamicModuleNamespace()
top_module = __import__(top_module_name, level=0, fromlist=[top_module_name])
top_module = __import__(
top_module_name,
level=0,
fromlist=[top_module_name] if six.PY2 else ['erp5'])
# The module must be available in its default version
self.assertHasAttribute(top_module, expected_default_version)
......@@ -1555,7 +1558,8 @@ class TestZodbModuleComponent(SecurityTestCase):
module_name = self._getComponentFullModuleName(module_name)
module = __import__(
module_name,
fromlist=[self._document_class._getDynamicModuleNamespace()],
fromlist=[self._document_class._getDynamicModuleNamespace()]
if six.PY2 else ['erp5'],
level=0)
self.assertIn(module_name, sys.modules)
return module
......@@ -2048,7 +2052,7 @@ def bar(*args, **kwargs):
self.assertModuleImportable('erp5_version.%s' % imported_reference)
top_module = __import__(top_module_name, level=0,
fromlist=[top_module_name])
fromlist=[top_module_name] if six.PY2 else ['erp5'])
self._importModule('erp5_version.%s' % imported_reference)
......@@ -2111,7 +2115,7 @@ def function_foo(*args, **kwargs):
top_module_name = self._document_class._getDynamicModuleNamespace()
top_module = __import__(top_module_name, level=0,
fromlist=[top_module_name])
fromlist=[top_module_name] if six.PY2 else ['erp5'])
self._importModule(reference)
module = getattr(top_module, reference)
......@@ -2742,7 +2746,7 @@ foobar = foobar().f
base = self.portal.getPath()
for query in 'x:int=-24&y:int=66', 'x:int=41':
path = '%s/TestExternalMethod?%s' % (base, query)
self.assertEqual(self.publish(path).getBody(), '42')
self.assertEqual(self.publish(path).getBody(), b'42')
# Test from a Python Script
createZODBPythonScript(self.portal.portal_skins.custom,
......@@ -2787,7 +2791,7 @@ def foobar(self, a, b="portal_type"):
cfg.extensions = tempfile.mkdtemp()
try:
with open(os.path.join(cfg.extensions, module + '.py'), "w") as f:
f.write("foobar = lambda **kw: sorted(kw.iteritems())")
f.write("foobar = lambda **kw: sorted(kw.items())")
self.assertEqual(external_method(z=1, a=0), [('a', 0), ('z', 1)])
finally:
shutil.rmtree(cfg.extensions)
......@@ -2878,8 +2882,8 @@ class TestWithImport(TestImported):
from ITestGC import ITestGC
import zope.interface
@zope.interface.implementer(ITestGC)
class TestGC(XMLObject):
zope.interface.implements(ITestGC)
def foo(self):
pass
""")
......@@ -2910,12 +2914,15 @@ class TestGC(XMLObject):
self.assertEqual(gc.garbage, [])
import erp5.component
gc.set_debug(
gc.DEBUG_STATS |
gc.DEBUG_UNCOLLECTABLE |
gc.DEBUG_COLLECTABLE |
gc.DEBUG_OBJECTS |
gc.DEBUG_INSTANCES)
debug_flags = (
gc.DEBUG_STATS
| gc.DEBUG_UNCOLLECTABLE
| gc.DEBUG_COLLECTABLE )
if six.PY2:
debug_flags |= (
gc.DEBUG_OBJECTS
| gc.DEBUG_INSTANCES)
gc.set_debug(debug_flags)
sys.stderr = stderr
# Still not garbage collectable as RefManager still keeps a reference
erp5.component.ref_manager.clear()
......@@ -2998,11 +3005,11 @@ from erp5.component.document.Person import Person
from ITestPortalType import ITestPortalType
import zope.interface
@zope.interface.implementer(ITestPortalType)
class TestPortalType(Person):
def test42(self):
return 42
zope.interface.implements(ITestPortalType)
def foo(self):
pass
""")
......@@ -3194,7 +3201,7 @@ InitializeClass(%(class_name)s)
'%s/manage_addProduct/ERP5/manage_addToolForm' % self.portal.getPath(),
'%s:%s' % (self.manager_username, self.manager_password))
self.assertEqual(response.getStatus(), 200)
self.assertNotIn('ERP5 Test Hook After Load Tool', response.getBody())
self.assertNotIn(b'ERP5 Test Hook After Load Tool', response.getBody())
component.validate()
self.tic()
......@@ -3206,7 +3213,7 @@ InitializeClass(%(class_name)s)
'%s/manage_addProduct/ERP5/manage_addToolForm' % self.portal.getPath(),
'%s:%s' % (self.manager_username, self.manager_password))
self.assertEqual(response.getStatus(), 200)
self.assertIn('ERP5 Test Hook After Load Tool', response.getBody())
self.assertIn(b'ERP5 Test Hook After Load Tool', response.getBody())
from Products.ERP5Type.Core.TestComponent import TestComponent
......@@ -3233,13 +3240,11 @@ class Test(ERP5TypeTestCase):
"""
Dummy mail host has already been set up when running tests
"""
pass
def _restoreMailHost(self):
"""
Dummy mail host has already been set up when running tests
"""
pass
def test_01_sampleTest(self):
self.assertEqual(0, 0)
......@@ -3335,7 +3340,7 @@ class Test(ERP5TypeTestCase):
reset_portal_type_at_transaction_boundary=True)
output = runLiveTest('testRunLiveTest')
expected_msg_re = re.compile('Ran 2 tests.*FAILED \(failures=1\)', re.DOTALL)
expected_msg_re = re.compile(r'Ran 2 tests.*FAILED \(failures=1\)', re.DOTALL)
self.assertRegex(output, expected_msg_re)
# Now try addCleanup
......@@ -3411,18 +3416,29 @@ break_at_import()
return self._component_tool.readTestOutput()
output = runLiveTest('testRunLiveTestImportError')
self.assertIn('''
if six.PY2:
expected_output = '''
File "<portal_components/test.erp5.testRunLiveTestImportError>", line 4, in <module>
break_at_import()
File "<portal_components/test.erp5.testRunLiveTestImportError>", line 3, in break_at_import
import non.existing.module # pylint:disable=import-error
ImportError: No module named non.existing.module
''', output)
'''
else:
expected_output = '''
File "<portal_components/test.erp5.testRunLiveTestImportError>", line 4, in <module>
break_at_import()
File "<portal_components/test.erp5.testRunLiveTestImportError>", line 3, in break_at_import
import non.existing.module # pylint:disable=import-error
ModuleNotFoundError: No module named 'non'
'''
self.assertIn(expected_output, output)
output = runLiveTest('testDoesNotExist_import_error_because_module_does_not_exist')
self.assertIn(
"ImportError: No module named testDoesNotExist_import_error_because_module_does_not_exist",
output)
if six.PY2:
expected_output = "ImportError: No module named testDoesNotExist_import_error_because_module_does_not_exist"
else:
expected_output = "ModuleNotFoundError: No module named 'testDoesNotExist_import_error_because_module_does_not_exist'"
self.assertIn(expected_output, output)
def testERP5Broken(self):
# Create a broken ghost object
......@@ -3430,9 +3446,9 @@ ImportError: No module named non.existing.module
name = self._testMethodName
types_tool = self.portal.portal_types
ptype = types_tool.newContent(name, type_class="File", portal_type='Base Type')
file = ptype.constructInstance(self.portal, name, data="foo")
file = ptype.constructInstance(self.portal, name, data=b"foo")
file_uid = file.getUid()
self.assertEqual(file.size, len("foo"))
self.assertEqual(file.size, len(b"foo"))
self.commit()
try:
self.portal._p_jar.cacheMinimize()
......@@ -3448,7 +3464,7 @@ ImportError: No module named non.existing.module
# Check that the class is unghosted before resolving __setattr__
self.assertRaises(BrokenModified, setattr, file, "size", 0)
self.assertIsInstance(file, ERP5BaseBroken)
self.assertEqual(file.size, len("foo"))
self.assertEqual(file.size, len(b"foo"))
# Now if we repair the portal type definition, instances will
# no longer be broken and be modifiable again.
......@@ -3458,9 +3474,9 @@ ImportError: No module named non.existing.module
file = self.portal[name]
self.assertNotIsInstance(file, ERP5BaseBroken)
self.assertEqual(file.getUid(), file_uid)
self.assertEqual(file.getData(), "foo")
file.setData("something else")
self.assertEqual(file.getData(), "something else")
self.assertEqual(file.getData(), b"foo")
file.setData(b"something else")
self.assertEqual(file.getData(), b"something else")
self.assertNotIn("__Broken_state__", file.__dict__)
finally:
self.portal._delObject(name)
......@@ -3660,6 +3676,8 @@ class TestZodbDocumentComponentReload(ERP5TypeTestCase):
component = self.portal.portal_components['document.erp5.BusinessProcess']
component.setTextContent(value)
self.tic()
self.assertEqual(component.checkConsistency(), [])
self.assertEqual(component.getValidationState(), 'validated')
def testAsComposedDocumentCacheIsCorrectlyFlushed(self):
component = self.portal.portal_components['document.erp5.BusinessProcess']
......
......@@ -34,6 +34,7 @@ of Portal Type as Classes and ZODB Components
import pickle
import unittest
import warnings
import six
from Acquisition import aq_base
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from AccessControl.ZopeGuards import guarded_import
......@@ -189,6 +190,9 @@ class TestERP5Type(ERP5TypeTestCase, LogInterceptor):
not_ok = NotOk().__of__(doc)
self.assertRaises(ValueError, getattr, not_ok, 'attr')
if six.PY3:
self.assertRaises(ValueError, hasattr, not_ok, 'attr')
else:
self.assertFalse(hasattr(not_ok, 'attr'))
def test_renameObjectsReindexSubobjects(self):
......
......@@ -27,10 +27,10 @@
##############################################################################
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type import WITH_LEGACY_WORKFLOW
import StringIO
from six.moves import cStringIO as StringIO
import unittest
import urllib
import httplib
from six.moves.urllib.parse import urlencode
import six.moves.http_client
class TestUpgradeInstanceWithOldDataFsWithLegacyWorkflow(ERP5TypeTestCase):
......@@ -120,7 +120,7 @@ class TestUpgradeInstanceWithOldDataFsWithLegacyWorkflow(ERP5TypeTestCase):
ret = self.publish(
'%s/portal_alarms/promise_check_upgrade' % self.portal.getPath(),
basic='%s:current' % self.id(),
stdin=StringIO.StringIO(urllib.urlencode({
stdin=StringIO(urlencode({
'Base_callDialogMethod:method': '',
'dialog_id': 'Alarm_viewSolveDialog',
'dialog_method': 'Alarm_solve',
......@@ -130,7 +130,7 @@ class TestUpgradeInstanceWithOldDataFsWithLegacyWorkflow(ERP5TypeTestCase):
request_method="POST",
handle_errors=False
)
self.assertEqual(httplib.FOUND, ret.getStatus())
self.assertEqual(six.moves.http_client.FOUND, ret.getStatus())
alarm.Alarm_solve()
......
......@@ -397,7 +397,7 @@ def parseListeningAddress(host_port=None, default_host='127.0.0.1'):
m = 499 # must be a prime number
x = instance_random.randrange(0, m)
c = instance_random.randrange(1, m)
for i in xrange(m):
for i in range(m):
yield default_host, 55000 + x
x = (x + c) % m
raise RuntimeError("Can't find free port (tried ports %u to %u)\n"
......@@ -591,7 +591,7 @@ def updateCellList(portal, line, cell_type, cell_range_method, cell_dict_list):
def getSortedCategoryList(line, base_id, category_list):
result = []
index_list = line.index[base_id].keys()
index_list = list(line.index[base_id].keys())
index_list.sort()
for category in category_list:
for index in index_list:
......@@ -668,7 +668,7 @@ def updateCellList(portal, line, cell_type, cell_range_method, cell_dict_list):
*category_list)
cell.edit(**mapped_value_dict)
cell.setMappedValuePropertyList(mapped_value_dict.keys())
cell.setMappedValuePropertyList(list(mapped_value_dict.keys()))
base_category_list = [category_path
for category_path in category_list
......
......@@ -56,11 +56,11 @@ class StringValidatorTestCase(ValidatorTestCase):
self.assertEqual('<html>', result)
def test_encoding(self):
utf8_string = 'M\303\274ller' # this is a M&uuml;ller
unicode_string = unicode(utf8_string, 'utf-8')
utf8_bytes = b'M\303\274ller' # this is a M&uuml;ller
unicode_string = utf8_bytes.decode('utf-8')
result = self.v.validate(
TestField('f', max_length=0, truncate=0, required=0, unicode=1),
'f', {'f' : utf8_string})
'f', {'f' : utf8_bytes})
self.assertEqual(unicode_string, result)
def test_strip_whitespace(self):
......
......@@ -36,8 +36,10 @@ class FakeRequest:
def clear(self):
self.dict.clear()
def __nonzero__(self):
return 0
def __bool__(self):
return False
if six.PY2:
__nonzero__ = __bool__
class SerializeTestCase(unittest.TestCase):
def test_simpleSerialize(self):
......
......@@ -26,6 +26,7 @@ from unittest import expectedFailure
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.PythonScripts.PythonScript import PythonScript
import six
from six.moves import range
class HBTreeFolder2Tests(ERP5TypeTestCase):
......
......@@ -102,7 +102,7 @@ class TestDeferredConnection(ERP5TypeTestCase):
Check that a basic query succeeds.
"""
connection = self.getDeferredConnection()
connection.query('REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
connection.query(b'REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
try:
self.commit()
except OperationalError:
......@@ -119,7 +119,7 @@ class TestDeferredConnection(ERP5TypeTestCase):
"""
connection = self.getDeferredConnection()
# Queue a query
connection.query('REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
connection.query(b'REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
# Replace dynamically the function used to send queries to mysql so it's
# dumber than the implemented one.
self.monkeypatchConnection(connection)
......@@ -128,7 +128,7 @@ class TestDeferredConnection(ERP5TypeTestCase):
try:
self.commit()
except OperationalError as m:
if m[0] not in hosed_connection:
if m.args[0] not in hosed_connection:
raise
else:
self.fail()
......@@ -144,7 +144,7 @@ class TestDeferredConnection(ERP5TypeTestCase):
"""
connection = self.getDeferredConnection()
# Queue a query
connection.query('REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
connection.query(b'REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
# Artificially cause a connection close.
self.monkeypatchConnection(connection)
try:
......@@ -160,10 +160,10 @@ class TestDeferredConnection(ERP5TypeTestCase):
"""
connection = self.getDeferredConnection()
# Queue a query
connection.query('REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
connection.query(b'REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
self.assertEqual(len(connection._sql_string_list), 1)
self.commit()
connection.query('REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
connection.query(b'REPLACE INTO `full_text` SET `uid`=0, `SearchableText`="dummy test"')
self.assertEqual(len(connection._sql_string_list), 1)
if __name__ == '__main__':
......
......@@ -40,6 +40,7 @@ from Products.ZSQLCatalog.Query.RelatedQuery import RelatedQuery
from DateTime import DateTime
from Products.ZSQLCatalog.SQLExpression import MergeConflictError
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.Utils import ensure_list
import six
from AccessControl.ZopeGuards import guarded_getattr
......@@ -101,7 +102,7 @@ class ReferenceQuery:
else:
self.args.append(arg)
if len(kw) == 1:
self.column, value = kw.items()[0]
self.column, value = ensure_list(kw.items())[0]
if not isinstance(value, MatchList):
value = MatchList([value])
self.value = value
......@@ -120,7 +121,7 @@ class ReferenceQuery:
return False
other_query_list = other.query_list[:]
for subquery in self.args:
for other_query_id in xrange(len(other_query_list)):
for other_query_id in range(len(other_query_list)):
other_query = other_query_list[other_query_id]
if subquery == other_query:
other_query_list.pop(other_query_id)
......@@ -265,12 +266,10 @@ class TestSQLCatalog(ERP5TypeTestCase):
self.assertRaises(exception, self._catalog, src__=1, query_table='foo', **kw)
def catalog(self, reference_tree, kw, check_search_text=True,
check_select_expression=True, expected_failure=False):
check_select_expression=True):
reference_param_dict = self._catalog.buildSQLQuery(query_table='foo', **kw)
query = self._catalog.buildEntireQuery(kw).query
assertEqual = self.assertEqual
if expected_failure:
assertEqual = unittest.expectedFailure(assertEqual)
assertEqual(reference_tree, query)
search_text = query.asSearchTextExpression(self._catalog)
......
......@@ -112,9 +112,9 @@ class ZuiteTests( unittest.TestCase ):
return old
def _verifyArchive( self, bits, contents ):
import StringIO
from io import BytesIO
import zipfile
stream = StringIO.StringIO( bits )
stream = BytesIO( bits )
archive = zipfile.ZipFile( stream, 'r' )
names = list( archive.namelist() )
......@@ -139,9 +139,9 @@ class ZuiteTests( unittest.TestCase ):
def _verifyManifest( self, bits, name, contents ):
import StringIO
from io import BytesIO
import zipfile
stream = StringIO.StringIO( bits )
stream = BytesIO( bits )
archive = zipfile.ZipFile( stream, 'r' )
manifest = filter( None, archive.read( name ).split( '\n' ) )
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment