Commit c4d2dc47 authored by Kazuhiko Shiozaki's avatar Kazuhiko Shiozaki Committed by Jérome Perrin

py2/py3: import from six.moves.

parent a896c9b4
......@@ -31,8 +31,7 @@ from io import BytesIO
import json
from os import urandom
from time import time
import urllib
import urlparse
from six.moves.urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
import uuid
from cryptography.hazmat.backends import default_backend
from cryptography import fernet
......@@ -145,7 +144,7 @@ def substituteRequest(
environ = request.environ
inner_environ_dict = environ.copy()
inner_environ_dict['REQUEST_METHOD'] = method
inner_environ_dict['QUERY_STRING'] = urllib.urlencode(query_list)
inner_environ_dict['QUERY_STRING'] = urlencode(query_list)
if request._auth:
inner_environ_dict['HTTP_AUTHORIZATION'] = request._auth
......@@ -256,18 +255,18 @@ class _ERP5AuthorisationEndpoint(AuthorizationEndpoint):
if is_local_client and self.__login_retry_url:
# ...with a local resource server, redirect user agent to
# the provided login URL.
split_login_retry_url = urlparse.urlsplit(self.__login_retry_url)
split_login_retry_url = urlsplit(self.__login_retry_url)
return (
(
(
'Location',
urlparse.urlunsplit((
urlunsplit((
split_login_retry_url.scheme,
split_login_retry_url.netloc,
split_login_retry_url.path,
urllib.urlencode([
urlencode([
(x, y)
for x, y in urlparse.parse_qsl(split_login_retry_url.query)
for x, y in parse_qsl(split_login_retry_url.query)
if x != 'portal_status_message'
] + [(
'portal_status_message',
......@@ -299,7 +298,7 @@ class _ERP5AuthorisationEndpoint(AuthorizationEndpoint):
credentials=credentials,
)
if authorization_status == 302 and is_local_client:
split_location = urlparse.urlsplit(authorization_header_dict['Location'])
split_location = urlsplit(authorization_header_dict['Location'])
# XXX: to cut down on code complexity, this code has strong expectations on what location is.
_, client_connector_id, method_id = split_location.path.rsplit('/', 2)
if method_id != 'loggedIn':
......@@ -307,7 +306,7 @@ class _ERP5AuthorisationEndpoint(AuthorizationEndpoint):
client_connector_value = client_value.getParentValue().getParentValue()[client_connector_id]
if client_connector_value.getPortalType() != 'OAuth2 Authorisation Client Connector':
raise ValueError(split_location.path)
query_list = urlparse.parse_qsl(split_location.query)
query_list = parse_qsl(split_location.query)
# Note: query string generation should not have produce any duplicate
# entries, so convert into a dict for code simplicity.
query_dict = {
......@@ -361,7 +360,7 @@ class _ERP5AuthorisationEndpoint(AuthorizationEndpoint):
for key, value in six.iteritems(request_info_dict):
if value is None:
continue
if not isinstance(value, basestring):
if not isinstance(value, six.text_type):
raise TypeError((key, repr(value)))
new_request_info_dict[key] = value
inner_response = HTTPResponse(stdout=None, stderr=None)
......@@ -385,7 +384,7 @@ class _ERP5AuthorisationEndpoint(AuthorizationEndpoint):
# Use the internal path back to us so it can be traversed to while
# still in the just-authenticated request.
(
self.__server_connector_path + '?' + urlparse.urlsplit(uri).query
self.__server_connector_path + '?' + urlsplit(uri).query
) if is_local_client else
# Use the external URL back to us so user can be redirected to it,
# as they are then authenticated over multiple requests.
......@@ -407,8 +406,8 @@ class _ERP5AuthorisationEndpoint(AuthorizationEndpoint):
login_form = neutral_context_value.login_form
portal_status_message_list = [
value
for name, value in urlparse.parse_qsl(
urlparse.urlsplit(came_from).query,
for name, value in parse_qsl(
urlsplit(came_from).query,
)
if name == 'portal_status_message'
]
......@@ -763,8 +762,8 @@ class _ERP5RequestValidator(RequestValidator):
# redirect_uri path, but it may be under an extra layer of VirtualHost Monster
# magic.
# Client is declared local, accept any redirect URI on our scheme and netloc.
split_my_url = urlparse.urlsplit(client_value.absolute_url())
split_redirect_uri = urlparse.urlsplit(redirect_uri)
split_my_url = urlsplit(client_value.absolute_url())
split_redirect_uri = urlsplit(redirect_uri)
return (
split_my_url.scheme == split_redirect_uri.scheme and
split_my_url.netloc == split_redirect_uri.netloc
......@@ -854,10 +853,10 @@ def _callEndpoint(endpoint, self, REQUEST):
if request_body is None and content_type == 'application/x-www-form-urlencoded':
# XXX: very imperfect, but should be good enough for OAuth2 usage:
# no standard OAuth2 POST field should be marshalled by Zope.
request_body = urllib.urlencode([
request_body = urlencode([
(x, y)
for x, y in six.iteritems(REQUEST.form)
if isinstance(y, basestring)
if isinstance(y, six.text_type)
])
uri = other.get('URL', '')
query_string = environ.get('QUERY_STRING')
......
......@@ -6,14 +6,14 @@ Once the user is authenticated, the same value can be accessed with:
from AccessControl import getSecurityManager
getSecurityManager().getUser().getClientId()
"""
import urlparse
from six.moves.urllib.parse import parse_qsl, urlsplit
# The came_from for login_once_form is special: it has no scheme, no netloc, a path and a query.
# Verify this so caller knows if they are providing the wrong value.
if not context.ERP5Site_isOAuth2CameFrom(came_from=came_from):
raise ValueError
result, = [
value
for name, value in urlparse.parse_qsl(urlparse.urlsplit(came_from).query)
for name, value in parse_qsl(urlsplit(came_from).query)
if name == 'client_id'
]
return result
......@@ -2,8 +2,8 @@
OAuth2's /authorize endpoint produces a very specific format of came_from, with very specific meaning (not a real URL).
This script returns True value if given such came_from, and False otherwise.
"""
import urlparse
parsed_came_from = urlparse.urlsplit(came_from)
from six.moves.urllib.parse import urlsplit
parsed_came_from = urlsplit(came_from)
return bool(
not parsed_came_from.scheme and
not parsed_came_from.netloc and
......
......@@ -3,16 +3,16 @@
Retry calling /authorize using the values in came_from
(which a previous call to /authorize generated, and is not a traditional came_from).
"""
import urlparse
from six.moves.urllib.parse import parse_qsl, urlsplit
from erp5.component.document.OAuth2AuthorisationServerConnector import substituteRequest
if not context.ERP5Site_isOAuth2CameFrom(came_from):
# came_from is broken, there is no way to call /authorize , so escape to wherever.
context.Base_redirect()
return
parsed_came_from = urlparse.urlsplit(came_from)
parsed_came_from = urlsplit(came_from)
query_list = [
(key, value)
for key, value in urlparse.parse_qsl(parsed_came_from.query)
for key, value in parse_qsl(parsed_came_from.query)
if key != 'portal_status_message'
]
if portal_status_message is not None:
......
......@@ -3,7 +3,7 @@
Similar to logged_in, but user authentication will only last for current request if nothing else is done.
So came_from must be honoured within the current request, and not redirected to.
"""
import urlparse
from six.moves.urllib.parse import parse_qsl, urlsplit
from erp5.component.document.OAuth2AuthorisationServerConnector import substituteRequest
portal = context.getPortalObject()
if portal.portal_skins.updateSkinCookie():
......@@ -28,7 +28,7 @@ if not came_from or not context.ERP5Site_isOAuth2CameFrom(came_from):
# came_from is broken, there is no way to call authorize, so escape to wherever.
context.Base_redirect()
return
parsed_came_from = urlparse.urlsplit(came_from)
parsed_came_from = urlsplit(came_from)
# Turn the ZODB path from came_from into a relative URL and base it on context (and not portal) to
# work as expected from within Web Sites without Virtual Host Monster relocating them above portal.
connector_value = context.restrictedTraverse(parsed_came_from.path.lstrip('/'))
......@@ -40,7 +40,7 @@ if (
return
# Note: query string generation should not have produce any duplicate
# entries, so directly use to update form dict for code simplicity.
form = dict(urlparse.parse_qsl(parsed_came_from.query))
form = dict(parse_qsl(parsed_came_from.query))
login_retry_url = REQUEST.form.get('login_retry_url')
if login_retry_url is not None:
form['login_retry_url'] = login_retry_url
......
......@@ -29,15 +29,14 @@ import base64
from collections import defaultdict
from functools import partial, wraps
import hashlib
import HTMLParser
from six.moves.html_parser import HTMLParser
from io import BytesIO
import json
import random
import pprint
from time import time
import unittest
import urllib
import urlparse
from six.moves.urllib.parse import parse_qsl, quote, unquote, urlencode, urlsplit, urlunsplit
import six
from AccessControl.SecurityManagement import getSecurityManager, setSecurityManager
from DateTime import DateTime
......@@ -50,6 +49,7 @@ import Zope2
from ZPublisher.mapply import mapply
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
from six.moves import xrange
_TEST_ACCESS_COOKIE_NAME = '__Site-test_at'
_TEST_REFRESH_COOKIE_NAME = '__Site-test_rt'
......@@ -61,11 +61,11 @@ _HTML_FIELD_TAG_SET = {
'submit',
# Very incomplete, but enough for this tests' purpose: ignores "select"s...
}
class FormExtractor(HTMLParser.HTMLParser):
class FormExtractor(HTMLParser):
def reset(self):
self.__in_form = False
self.form_list = []
HTMLParser.HTMLParser.reset(self)
HTMLParser.reset(self)
def handle_starttag(self, tag, attribute_item_list):
attr_dict = dict(attribute_item_list)
......@@ -181,7 +181,7 @@ class TestOAuth2(ERP5TypeTestCase):
def afterSetUp(self):
super(TestOAuth2, self).afterSetUp()
parsed_site_url = urlparse.urlsplit(self.portal.absolute_url())
parsed_site_url = urlsplit(self.portal.absolute_url())
self.__scheme = parsed_site_url.scheme
context_netloc_list = parsed_site_url.netloc.rsplit(':', 1)
try:
......@@ -292,7 +292,7 @@ class TestOAuth2(ERP5TypeTestCase):
cleanup_list = self.__cleanup_list
# XXX: imperfect cleanup if indexation did not complete
cleanup_list.extend(
x.getObject() for x in self.__searchOAuth2Session(),
x.getObject() for x in self.__searchOAuth2Session()
)
parent_dict = defaultdict(list)
for document_value in cleanup_list:
......@@ -353,7 +353,7 @@ class TestOAuth2(ERP5TypeTestCase):
cookie_header = ';'.join(
'%s="%s"' % (
name,
urllib.quote(cookie_dict['value']),
quote(cookie_dict['value']),
) for name, cookie_dict in six.iteritems(dict(cookie_dict))
if cookie_dict
)
......@@ -482,7 +482,7 @@ class TestOAuth2(ERP5TypeTestCase):
Assert that given call redirects to given location with given status.
Only scheme, netloc and path are matched (ex: query is ignored).
"""
parsed_reference_location = urlparse.urlsplit(reference_location)
parsed_reference_location = urlsplit(reference_location)
status, header_dict, cookie_dict, body = query_result
self.assertIn(
body.strip(),
......@@ -493,7 +493,7 @@ class TestOAuth2(ERP5TypeTestCase):
header_dict.get('location', b''),
),
)
parsed_location = urlparse.urlsplit(header_dict.get('location', ''))
parsed_location = urlsplit(header_dict.get('location', ''))
self.assertEqual(
(
status,
......@@ -560,13 +560,13 @@ class TestOAuth2(ERP5TypeTestCase):
raise ValueError('No field name ending with ":method"')
# Call Base_callDialogMethod
status, inner_header_dict, inner_cookie_dict, body = self._query(
path=urlparse.urlsplit(action_url).path + '/' + script_id,
path=urlsplit(action_url).path + '/' + script_id,
method='POST',
client_ip=client_ip,
content_type='application/x-www-form-urlencoded',
header_dict=header_dict,
cookie_dict=cookie_dict,
body=urllib.urlencode(list(value_callback(
body=urlencode(list(value_callback(
field_item_list=tuple(
(key, value)
for key, value in field_list
......@@ -589,7 +589,7 @@ class TestOAuth2(ERP5TypeTestCase):
# portal, so if it is outside we know the redirection comes from the
# action script and we are done.
if location.startswith(portal.absolute_url()):
parsed_location = urlparse.urlsplit(location)
parsed_location = urlsplit(location)
dialog_method, = [
value
for key, value in field_list
......@@ -642,7 +642,7 @@ class TestOAuth2(ERP5TypeTestCase):
If the login form is displayed but this is None, test fails.
If the login for is not displayed and this is not None, test fails.
Called with:
parsed_location (urlparse.urlsplit)
parsed_location (urlsplit)
Parsed locator. Use this if you want, for example, to access the portal_status_message.
See _submitDialog for further signature definitions.
authentication_is_local (bool)
......@@ -658,7 +658,7 @@ class TestOAuth2(ERP5TypeTestCase):
throughout the course of this method.
Returns:
parsed_location (urlparse.urlsplit)
parsed_location (urlsplit)
Parsed version of the actual redirection location aimed at redirect_uri.
cookie_dict (dict)
Flattened view of all response set-cookie headers.
......@@ -676,7 +676,7 @@ class TestOAuth2(ERP5TypeTestCase):
else:
cookie_jar[key] = value
cookie_dict[key] = value
parsed_redirect_uri = urlparse.urlsplit(redirect_uri)
parsed_redirect_uri = urlsplit(redirect_uri)
def isRedirectURI(parsed_location):
return (
parsed_location.scheme == parsed_redirect_uri.scheme and
......@@ -686,7 +686,7 @@ class TestOAuth2(ERP5TypeTestCase):
assert not parsed_redirect_uri.query
assert not parsed_redirect_uri.fragment
# XXX: just to satisfy authentication_callback
parsed_location = urlparse.urlsplit(urlparse.urlunsplit((
parsed_location = urlsplit(urlunsplit((
'',
'',
path,
......@@ -713,7 +713,7 @@ class TestOAuth2(ERP5TypeTestCase):
updateCookieDictAndJar(inner_cookie_dict)
if status == 302:
# Being redirected...
parsed_location = urlparse.urlsplit(inner_header_dict.get('location', ''))
parsed_location = urlsplit(inner_header_dict.get('location', ''))
if isRedirectURI(parsed_location):
# ...to client: check if this is expected and leave
self.assertTrue(
......@@ -808,7 +808,7 @@ class TestOAuth2(ERP5TypeTestCase):
client_id = oauth2_client_declaration_value.getId()
parsed_location, cookie_dict, time_before, time_after = response = self._authorise(
path=oauth2_server_connector + '/authorize',
query=urllib.urlencode({
query=urlencode({
'response_type': 'code',
'client_id': client_id,
'state': reference_state,
......@@ -827,7 +827,7 @@ class TestOAuth2(ERP5TypeTestCase):
},
)
self.assertEqual(cookie_dict, {})
query_list = urlparse.parse_qsl(parsed_location.query)
query_list = parse_qsl(parsed_location.query)
query_dict = dict(query_list)
self.assertEqual(len(query_list), len(query_dict), (query_list, query_dict))
authorisation_code = query_dict['code']
......@@ -848,7 +848,7 @@ class TestOAuth2(ERP5TypeTestCase):
path=oauth2_server_connector + '/token',
method='POST',
content_type='application/x-www-form-urlencoded',
body=urllib.urlencode({
body=urlencode({
'grant_type': 'authorization_code',
'code': authorisation_code,
'client_id': client_id,
......@@ -876,7 +876,7 @@ class TestOAuth2(ERP5TypeTestCase):
path=oauth2_server_connector + '/token',
method='POST',
content_type='application/x-www-form-urlencoded',
body=urllib.urlencode({
body=urlencode({
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
}),
......@@ -893,7 +893,7 @@ class TestOAuth2(ERP5TypeTestCase):
path=oauth2_server_connector + '/revoke',
method='POST',
content_type='application/x-www-form-urlencoded',
body=urllib.urlencode({
body=urlencode({
'token_type_hint': 'refresh_token',
'token': refresh_token,
}),
......@@ -909,7 +909,7 @@ class TestOAuth2(ERP5TypeTestCase):
path=oauth2_server_connector + '/token',
method='POST',
content_type='application/x-www-form-urlencoded',
body=urllib.urlencode({
body=urlencode({
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
}),
......@@ -1018,8 +1018,8 @@ class TestOAuth2(ERP5TypeTestCase):
_, _, cookie_dict, _ = query_result = self._query(
path=parsed_location.path,
method='GET',
query=urllib.urlencode(
urlparse.parse_qsl(parsed_location.query) + [
query=urlencode(
parse_qsl(parsed_location.query) + [
('client_id', oauth2_client_connector_value.getReference()),
],
),
......@@ -1203,8 +1203,8 @@ class TestOAuth2(ERP5TypeTestCase):
),
reference_location=portal_url + 'login_form',
)
login_form_query = urllib.urlencode(
urlparse.parse_qsl(parsed_login_form_location.query) + [
login_form_query = urlencode(
parse_qsl(parsed_login_form_location.query) + [
# Pick the local client_id, for simplicity
('client_id', self.__oauth2_local_client_connector_value.getReference()),
],
......
......@@ -31,13 +31,12 @@ import email.utils
import functools
import hashlib
import hmac
import httplib
from six.moves.http_client import HTTPConnection, HTTPSConnection
import json
from os import urandom
import random
from time import time
import urllib
import urlparse
from six.moves.urllib.parse import urlencode, urljoin, urlparse
import ssl
from AccessControl import (
ClassSecurityInfo,
......@@ -191,7 +190,7 @@ class _OAuth2AuthorisationServerProxy(object):
ca_certificate_pem,
insecure,
):
scheme = urlparse.urlsplit(authorisation_server_url).scheme
scheme = urlsplit(authorisation_server_url).scheme
if scheme != 'https' and not insecure:
raise ValueError('Only https access to Authorisation Server is allowed')
self._scheme = scheme
......@@ -210,7 +209,7 @@ class _OAuth2AuthorisationServerProxy(object):
def _query(self, method_id, body, header_dict=()):
plain_url = self._authorisation_server_url + '/' + method_id
parsed_url = urlparse.urlparse(plain_url)
parsed_url = urlparse(plain_url)
if self._scheme == 'https':
ssl_context = ssl.create_default_context(
cadata=self._ca_certificate_pem,
......@@ -222,11 +221,11 @@ class _OAuth2AuthorisationServerProxy(object):
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.check_hostname = True
Connection = functools.partial(
httplib.HTTPSConnection,
HTTPSConnection,
context=ssl_context,
)
else:
Connection = httplib.HTTPConnection
Connection = HTTPConnection
timeout = getTimeLeft()
if timeout is None or timeout > self._timeout:
timeout = self._timeout
......@@ -256,7 +255,7 @@ class _OAuth2AuthorisationServerProxy(object):
def _queryERP5(self, method_id, kw=()):
header_dict, body, status = self._query(
method_id=method_id,
body=urllib.urlencode(kw),
body=urlencode(kw),
header_dict={
'Accept': 'application/json;charset=UTF-8',
'Content-Type': 'application/x-www-form-urlencoded',
......@@ -274,7 +273,7 @@ class _OAuth2AuthorisationServerProxy(object):
def _queryOAuth2(self, method, REQUEST, RESPONSE):
header_dict, body, status = self._query(
method,
body=urllib.urlencode(REQUEST.form.items()),
body=urlencode(REQUEST.form.items()),
header_dict={
'CONTENT_TYPE': REQUEST.environ['CONTENT_TYPE'],
},
......@@ -377,7 +376,7 @@ class OAuth2AuthorisationClientConnector(
if '/' in authorisation_server_url:
# Remote Authorisation Server
return _OAuth2AuthorisationServerProxy(
authorisation_server_url=urlparse.urljoin(
authorisation_server_url=urljoin(
# In case authorisation_server_url contains slashes but is still
# relative (to the scheme or to the netloc - path-relative is not
# supported by urljoin)
......@@ -474,7 +473,7 @@ class OAuth2AuthorisationClientConnector(
assert inner_response.status == 200
access_token = oauth2_response['access_token']
refresh_token = oauth2_response.get('refresh_token')
parsed_actual_url = urlparse.urlparse(request.other.get('ACTUAL_URL'))
parsed_actual_url = urlparse(request.other.get('ACTUAL_URL'))
same_site = self.ERP5Site_getAuthCookieSameSite(
scheme=parsed_actual_url.scheme,
hostname=parsed_actual_url.hostname,
......@@ -712,8 +711,8 @@ class OAuth2AuthorisationClientConnector(
# came_from is what the user was trying to do just before they ended up
# here, so we can redirect them there once they are authenticated.
if came_from:
parsed_came_from = urlparse.urlparse(came_from)
parsed_redirect_uri = urlparse.urlparse(redirect_uri)
parsed_came_from = urlparse(came_from)
parsed_redirect_uri = urlparse(redirect_uri)
if (
parsed_came_from.scheme != parsed_redirect_uri.scheme or
parsed_came_from.netloc != parsed_redirect_uri.netloc
......@@ -829,7 +828,7 @@ class OAuth2AuthorisationClientConnector(
'Location',
self._getAuthorisationServerValue(
REQUEST=REQUEST,
).absolute_url() + '/authorize?' + urllib.urlencode(query_list),
).absolute_url() + '/authorize?' + urlencode(query_list),
)
else:
# Provide the current URL to authorize, so that it can redirect the
......
......@@ -3,17 +3,16 @@ Modify given URL so that the resulting one prevents further login attempts when
Useful to break redirection loops.
"""
import urllib
import urlparse
from six.moves.urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
PARAMETER_NAME = 'disable_cookie_login__'
parsed_url = urlparse.urlsplit(url)
return urlparse.urlunsplit((
parsed_url = urlsplit(url)
return urlunsplit((
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
urllib.urlencode([
urlencode([
(x, y)
for x, y in urlparse.parse_qsl(parsed_url.query)
for x, y in parse_qsl(parsed_url.query)
if x != PARAMETER_NAME
] + [
(PARAMETER_NAME, '1'),
......
# Short-circuit old (pre-oauth2) web-mode "login_form"s
import urllib
from six.moves.urllib.parse import urlencode
web_section_value = context.getWebSectionValue()
client_id = context.getPortalObject().ERP5Site_getOAuth2ClientConnectorClientId(
connector_id=(
......@@ -13,7 +13,7 @@ if client_id is None:
return context.login_once_form(has_oauth2=False)
if came_from:
# Make the user go through WebSite_login after authentication, so it does its url de-templatification magic
came_from = context.absolute_url() + '/WebSite_login?' + urllib.urlencode((('came_from', came_from), ))
came_from = context.absolute_url() + '/WebSite_login?' + urlencode((('came_from', came_from), ))
return context.skinSuper('erp5_web_renderjs_ui', script.id)(
REQUEST=REQUEST,
RESPONSE=RESPONSE,
......
......@@ -26,10 +26,11 @@
#
##############################################################################
import time
import urlparse
import ssl
import httplib
import json
from six.moves.http_client import HTTPSConnection
from six.moves.urllib.parse import urlparse
from six import string_types as basestring
from Products.ERP5Type.Timeout import getTimeLeft
from contextlib import contextmanager
from Products.ERP5Type.XMLObject import XMLObject
......@@ -105,7 +106,7 @@ class RESTAPIClientConnectorMixin(XMLObject):
header_dict['content-type'] = 'application/json'
body = json.dumps(body)
plain_url = self.getBaseUrl().rstrip('/') + '/' + path.lstrip('/')
parsed_url = urlparse.urlparse(plain_url)
parsed_url = urlparse(plain_url)
ssl_context = ssl.create_default_context(
cadata=self.getCaCertificatePem(),
)
......@@ -115,7 +116,7 @@ class RESTAPIClientConnectorMixin(XMLObject):
if bind_address:
bind_address = (bind_address, 0)
time_left_before_timeout = getTimeLeft()
http_connection = httplib.HTTPSConnection(
http_connection = HTTPSConnection(
host=parsed_url.hostname,
port=parsed_url.port,
strict=True,
......@@ -184,7 +185,7 @@ class RESTAPIClientConnectorMixin(XMLObject):
with time_tracker('call'), Deadline(timeout):
# Limit numbers of retries, in case the authentication API succeeds
# but the token is not usable.
for _ in xrange(2):
for _ in range(2):
with time_tracker('token'):
access_token = self._getAccessToken()
if access_token is not None:
......
......@@ -2,7 +2,7 @@
# Copyright (c) 2002-2015 Nexedi SA and Contributors. All Rights Reserved.
from json import dumps
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from httplib import HTTPSConnection
from six.moves.http_client import HTTPSConnection
from erp5.component.mixin.RESTAPIClientConnectorMixin import RESTAPIClientConnectorMixin
from ssl import SSLError
from Products.ERP5Type.Timeout import TimeoutReachedError
......@@ -71,11 +71,11 @@ class TestRESTAPIClientConnector(ERP5TypeTestCase):
with mock.patch(
'ssl.create_default_context',
) as mock_ssl_create_default_context, mock.patch(
'httplib.HTTPSConnection.request',
'six.moves.http_client.HTTPSConnection.request',
) as mock_https_connection_request, mock.patch(
'httplib.HTTPSConnection.getresponse',
'six.moves.http_client.HTTPSConnection.getresponse',
return_value=HTTPResponse_getresponse()
), mock.patch('httplib.HTTPSConnection', return_value=HTTPSConnection) as mock_https_connection:
), mock.patch('six.moves.http_client.HTTPSConnection', return_value=HTTPSConnection) as mock_https_connection:
header_dict, body_dict, status = self.rest_api_client_connection.call(
archive_resource=None,
method='POST',
......@@ -145,9 +145,9 @@ class TestRESTAPIClientConnector(ERP5TypeTestCase):
with mock.patch(
'ssl.create_default_context',
), mock.patch(
'httplib.HTTPSConnection.request',
'six.moves.http_client.HTTPSConnection.request',
), mock.patch(
'httplib.HTTPSConnection.getresponse',
'six.moves.http_client.HTTPSConnection.getresponse',
return_value=HTTPResponse_getresponse(498)
):
with self.assertRaises(RESTAPIError) as error:
......@@ -175,9 +175,9 @@ class TestRESTAPIClientConnector(ERP5TypeTestCase):
with mock.patch(
'ssl.create_default_context',
), mock.patch(
'httplib.HTTPSConnection.request',
'six.moves.http_client.HTTPSConnection.request',
), mock.patch(
'httplib.HTTPSConnection.getresponse',
'six.moves.http_client.HTTPSConnection.getresponse',
) as mock_https_connection_getresponse:
mock_https_connection_getresponse.side_effect = SSLError('The read operation timed out')
self.assertRaises(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment