Commit e7b9c61c authored by Jérome Perrin's avatar Jérome Perrin

oauth_google_login: reimplement with oauthlib/requests

Because the librairies used here were never ported to python3.

Notable changes:

  - ERP5Site_createGoogleUserToOAuth is dropped
  - internal API changed radically, so customizations made by overriding
    scripts are broken.
  - the core logic is now implemented in a connector class (still in
    portal_oauth for simplicity, but it would be simpler to move it to
    portal_web_services)

No changes required in the google console, the redirect uri is still
ERP5Site_receiveGoogleCallback
parent 31fcb818
##############################################################################
# Copyright (c) 2024 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from AccessControl import ClassSecurityInfo
import random
import string
import time
import oauthlib.oauth2
import requests
from zExceptions import Unauthorized
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type import Permissions
from Products.ERP5Type.Utils import unicode2str
from Products.ERP5Type.Timeout import getTimeLeft
AUTH_URL = 'https://accounts.google.com/o/oauth2/auth'
TOKEN_URL = 'https://accounts.google.com/o/oauth2/token'
USER_INFO_URL = 'https://www.googleapis.com/oauth2/v1/userinfo'
SCOPE_LIST = ['https://www.googleapis.com/auth/userinfo.profile',
'https://www.googleapis.com/auth/userinfo.email']
# Default timeout (in seconds) for the HTTP request made to google servers to
# exchange the authorization code for a token.
DEFAULT_HTTP_TIMEOUT = 10
class GoogleConnector(XMLObject):
meta_type = 'ERP5 Google Connector'
portal_type = 'Google Connector'
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
@security.public
def redirectToGoogleLoginPage(self, redirect_uri, RESPONSE):
"""Redirect to authorization page.
"""
authorization_url = self._getOAuthlibClient().prepare_request_uri(
uri=AUTH_URL,
redirect_uri=redirect_uri,
scope=SCOPE_LIST,
access_type="offline",
include_granted_scopes='true',
prompt="consent",
state=self._getAuthorizationState(),
)
return RESPONSE.redirect(authorization_url)
@security.public # XXX public but not publishable
def getTokenFromCode(self, state, code, redirect_uri):
self._verifyAuthorizationState(state)
body = self._getOAuthlibClient().prepare_request_body(
code=code,
client_secret=self.getSecretKey(),
redirect_uri=redirect_uri,
)
resp = requests.post(
TOKEN_URL,
data=body,
headers={'Content-Type': 'application/x-www-form-urlencoded'},
timeout=self._getTimeout(),
)
__traceback_info__ = (resp.content, resp.status_code)
resp.raise_for_status()
return self._getGoogleTokenFromJSONResponse(resp.json())
@security.private
def refreshToken(self, token):
"""Refresh auth token.
Used by Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin
"""
body = self._getOAuthlibClient().prepare_refresh_body(
client_id=self.getClientId(),
client_secret=self.getSecretKey(),
access_type="offline",
refresh_token=token['refresh_token'],
)
resp = requests.post(
TOKEN_URL,
data=body,
headers={'Content-Type': 'application/x-www-form-urlencoded'},
timeout=self._getTimeout(),
)
if not resp.ok:
return {}
return self._getGoogleTokenFromJSONResponse(resp.json())
@security.private
def getUserEntry(self, access_token):
resp = requests.get(
USER_INFO_URL,
headers={'Authorization': 'Bearer {}'.format(access_token)},
timeout=self._getTimeout(),
)
resp.raise_for_status()
google_entry = resp.json()
user_entry = {}
# remap user info
for erp5_key, google_key in (
('first_name', 'given_name'),
('last_name', 'family_name'),
('email', 'email'),
('reference', 'email'),
):
user_entry[erp5_key] = unicode2str(google_entry.get(google_key, ''))
return user_entry
def _getOAuthlibClient(self):
return oauthlib.oauth2.WebApplicationClient(
self.getClientId(),
access_type="offline",
)
def _getGoogleTokenFromJSONResponse(self, token):
return {
'access_token': unicode2str(token['access_token']),
'refresh_token': unicode2str(token['refresh_token']),
'expires_in': token['expires_in'],
'response_timestamp': time.time(),
'connector_relative_url': self.getRelativeUrl(),
}
def _getAuthorizationState(self):
alphabet = string.ascii_letters + string.digits
state = ''.join(random.SystemRandom().choice(alphabet) for _ in range(32))
self.getPortalObject().portal_sessions['google_login_auth_state'][state] = True
return state
def _verifyAuthorizationState(self, state):
if not self.getPortalObject().portal_sessions['google_login_auth_state'].pop(state, False):
raise Unauthorized
def _getTimeout(self):
"""Compute the time left according to publisher deadline.
"""
time_left = getTimeLeft()
if time_left is None:
time_left = DEFAULT_HTTP_TIMEOUT
return min(self.getTimeout() or DEFAULT_HTTP_TIMEOUT, time_left)
......@@ -2,13 +2,13 @@
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Extension Component" module="erp5.portal_type"/>
<global name="Document Component" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>default_reference</string> </key>
<value> <string>GoogleLoginUtility</string> </value>
<value> <string>GoogleConnector</string> </value>
</item>
<item>
<key> <string>description</string> </key>
......@@ -18,11 +18,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>extension.erp5.GoogleLoginUtility</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Extension Component</string> </value>
<value> <string>document.erp5.GoogleConnector</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
......
import json
import oauth2client.client
import oauth2client.transport
from Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin import getGoogleUserEntry
from zExceptions import Unauthorized
SCOPE_LIST = ['https://www.googleapis.com/auth/userinfo.profile',
'https://www.googleapis.com/auth/userinfo.email']
# Default timeout (in seconds) for the HTTP request made to google servers to
# exchange the authorization code for a token.
DEFAULT_HTTP_TIMEOUT = 10
def _getGoogleClientIdAndSecretKey(portal, reference="default"):
"""Returns google client id and secret key.
Internal function.
"""
result_list = unrestrictedSearchGoogleConnector(portal, reference=reference)
assert result_list, "Google Connector not found"
if len(result_list) == 2:
raise ValueError("Impossible to select one Google Connector")
google_connector = result_list[0].getObject()
return google_connector.getClientId(), google_connector.getSecretKey()
def redirectToGoogleLoginPage(self):
client_id, secret_key = _getGoogleClientIdAndSecretKey(self.getPortalObject())
flow = oauth2client.client.OAuth2WebServerFlow(
client_id=client_id,
client_secret=secret_key,
scope=SCOPE_LIST,
redirect_uri="{0}/ERP5Site_receiveGoogleCallback".format(self.absolute_url()),
access_type="offline",
prompt="consent",
include_granted_scopes="true")
self.REQUEST.RESPONSE.redirect(flow.step1_get_authorize_url())
def getAccessTokenFromCode(self, code, redirect_uri, timeout=DEFAULT_HTTP_TIMEOUT):
client_id, secret_key = _getGoogleClientIdAndSecretKey(self.getPortalObject())
flow = oauth2client.client.OAuth2WebServerFlow(
client_id=client_id,
client_secret=secret_key,
scope=SCOPE_LIST,
redirect_uri=redirect_uri,
access_type="offline",
include_granted_scopes="true")
credential = flow.step2_exchange(
code,
http=oauth2client.transport.get_http_object(timeout=timeout))
credential_data = json.loads(credential.to_json())
return credential_data
def unrestrictedSearchGoogleConnector(self, reference="default"):
return self.getPortalObject().portal_catalog.unrestrictedSearchResults(
portal_type="Google Connector",
reference=reference,
validation_state="validated",
limit=2)
def unrestrictedSearchGoogleLogin(self, login, REQUEST=None):
if REQUEST is not None:
raise Unauthorized
return self.getPortalObject().portal_catalog.unrestrictedSearchResults(
portal_type="Google Login",
reference=login,
validation_state="validated", limit=1)
def getUserEntry(access_token):
return getGoogleUserEntry(access_token)
\ No newline at end of file
<property_sheet_list>
<portal_type id="Google Connector">
<item>OAuthClient</item>
<item>SocketClient</item>
</portal_type>
<portal_type id="Template Tool">
<item>TemplateToolERP5GoogleExtractionPluginConstraint</item>
......
......@@ -40,7 +40,7 @@
</item>
<item>
<key> <string>type_class</string> </key>
<value> <string>XMLObject</string> </value>
<value> <string>GoogleConnector</string> </value>
</item>
<item>
<key> <string>type_interface</string> </key>
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>getAccessTokenFromCode</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>GoogleLoginUtility</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_getAccessTokenFromCode</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
portal = context.getPortalObject()
result_list = portal.portal_catalog(
portal_type="Google Connector",
reference=reference,
validation_state="validated",
limit=2)
if len(result_list) != 1:
raise ValueError("Impossible to select one Google Connector")
return result_list[0].getObject()
......@@ -50,19 +50,19 @@
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>login, REQUEST=None</string> </value>
<value> <string>reference="default"</string> </value>
</item>
<item>
<key> <string>_proxy_roles</string> </key>
<value>
<tuple>
<string>Manager</string>
<string>Auditor</string>
</tuple>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_getPersonFromGoogleLogin</string> </value>
<value> <string>ERP5Site_getDefaultGoogleConnector</string> </value>
</item>
</dictionary>
</pickle>
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>unrestrictedSearchGoogleConnector</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>GoogleLoginUtility</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_getGoogleConnector</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>unrestrictedSearchGoogleConnector</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>GoogleLoginUtility</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_getGoogleLogin</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>getUserEntry</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>GoogleLoginUtility</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_getGoogleUserEntry</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
from zExceptions import Unauthorized
if REQUEST is not None:
raise Unauthorized
login = context.ERP5Site_getGoogleLogin(login)
if login is None:
return login
if len(login) > 1:
raise ValueError("Duplicated User")
return login[0].getParentValue().getRelativeUrl()
import time
from Products.ERP5Type.Utils import str2bytes
portal = context.getPortalObject()
request = container.REQUEST
response = request.RESPONSE
......@@ -16,26 +17,21 @@ if error is not None:
return handleError(error)
elif code is not None:
response_dict = context.ERP5Site_getAccessTokenFromCode(
code,
"{0}/ERP5Site_receiveGoogleCallback".format(context.absolute_url()))
if response_dict is not None:
access_token = response_dict['access_token'].encode('utf-8')
hash_str = context.Base_getHMAC(access_token, access_token)
context.setAuthCookie(response, '__ac_google_hash', hash_str)
# store timestamp in second since the epoch in UTC is enough
response_dict["response_timestamp"] = time.time()
context.Base_setBearerToken(hash_str,
google_connector = portal.ERP5Site_getDefaultGoogleConnector()
response_dict = google_connector.getTokenFromCode(
state=state,
code=code,
redirect_uri="{}/ERP5Site_receiveGoogleCallback".format(portal.absolute_url()),
)
access_token = str2bytes(response_dict['access_token'])
hash_str = portal.Base_getHMAC(access_token, access_token)
portal.setAuthCookie(response, '__ac_google_hash', hash_str)
portal.Base_setBearerToken(
hash_str,
response_dict,
"google_server_auth_token_cache_factory")
user_dict = context.ERP5Site_getGoogleUserEntry(access_token)
user_reference = user_dict["email"]
context.Base_setBearerToken(access_token,
{"reference": user_reference},
"google_server_auth_token_cache_factory")
method = getattr(context, "ERP5Site_createGoogleUserToOAuth", None)
if method is not None:
method(user_reference, user_dict)
# XXX for ERP5JS web sites without a rewrite rule, we make sure there's a trailing /
return response.redirect(request.get("came_from") or context.absolute_url() + '/')
......
......@@ -50,7 +50,7 @@
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>code=None, error=None</string> </value>
<value> <string>code=None, state=None, error=None</string> </value>
</item>
<item>
<key> <string>id</string> </key>
......
portal = context.getPortalObject()
google_connector = portal.ERP5Site_getDefaultGoogleConnector()
return google_connector.redirectToGoogleLoginPage(
"{0}/ERP5Site_receiveGoogleCallback".format(portal.absolute_url()),
RESPONSE=RESPONSE,
)
......@@ -2,25 +2,67 @@
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>redirectToGoogleLoginPage</string> </value>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="_reconstructor" module="copy_reg"/>
</klass>
<tuple>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
<global name="object" module="__builtin__"/>
<none/>
</tuple>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>GoogleLoginUtility</string> </value>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_redirectToGoogleLoginPage</string> </value>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>RESPONSE</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
<key> <string>_proxy_roles</string> </key>
<value>
<tuple>
<string>Auditor</string>
</tuple>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_redirectToGoogleLoginPage</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -25,290 +25,373 @@
#
##############################################################################
import uuid
import contextlib
import mock
import lxml
import six.moves.urllib.parse
import json
import responses
import time
import six.moves.urllib as urllib
import six.moves.http_client
import six.moves.http_cookies
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript
CLIENT_ID = "a1b2c3"
SECRET_KEY = "3c2ba1"
ACCESS_TOKEN = "T1234"
CODE = "1234"
def getUserId(access_token):
return "dummy@example.com"
def getAccessTokenFromCode(code, redirect_uri):
assert code == CODE, "Invalid code"
# This is an example of a Google response
return {'_module': 'oauth2client.client',
'scopes': ['https://www.googleapis.com/auth/userinfo.email', 'https://www.googleapis.com/auth/userinfo.profile'],
'revoke_uri': 'https://accounts.google.com/o/oauth2/revoke',
'access_token': ACCESS_TOKEN,
'token_uri': 'https://www.googleapis.com/oauth2/v4/token',
'token_info_uri': 'https://www.googleapis.com/oauth2/v3/tokeninfo',
'invalid': False,
'token_response': {
'access_token': ACCESS_TOKEN,
'token_type': 'Bearer',
'expires_in': 3600,
'refresh_token': "111",
'id_token': '222'
},
'client_id': CLIENT_ID,
'id_token': {
'picture': '',
'sub': '',
'aud': '',
'family_name': 'D',
'iss': 'https://accounts.google.com',
'email_verified': True,
'at_hash': 'p3vPYQkVuqByBA',
'given_name': 'John',
'exp': 123,
'azp': '123.apps.googleusercontent.com',
'iat': 455,
'locale': 'pt',
'email': getUserId(None),
'name': 'John D'
},
'client_secret': 'secret',
'token_expiry': '2017-03-31T16:06:28Z',
'_class': 'OAuth2Credentials',
'refresh_token': '111',
'user_agent': None
}
def getUserEntry(access_token):
return {
"first_name": "John",
"last_name": "Doe",
"email": getUserId(None),
"reference": getUserId(None)
}
class GoogleLoginTestCase(ERP5TypeTestCase):
default_google_login_email_address = 'dummy@example.com'
dummy_connector_id = 'test_google_connector'
client_id = "a1b2c3"
secret_key = "3c2ba1"
def afterSetUp(self):
"""
This is ran before anything, used to set the environment
"""
self.login()
self.portal.TemplateTool_checkGoogleExtractionPluginExistenceConsistency(fixit=True)
self.dummy_connector_id = "test_google_connector"
# use random tokens because we can not clear the memcached cache
self.access_token = 'access-token' + self.id() + self.newPassword()
self.refresh_token = 'refresh-token' + self.id() + self.newPassword()
self.default_user_person = self.portal.person_module.newContent(
portal_type='Person',
first_name=self.id(),
)
self.default_user_person.newContent(
portal_type='Google Login',
reference=self.default_google_login_email_address,
).validate()
self.default_user_person.newContent(portal_type='Assignment').open()
if getattr(self.portal.portal_oauth, self.dummy_connector_id, None) is None:
connector = self.portal.portal_oauth.newContent(
id=self.dummy_connector_id,
portal_type="Google Connector",
reference="default",
client_id=self.client_id,
secret_key=self.secret_key)
connector.validate()
self.tic()
def beforeTearDown(self):
self.abort()
self.portal.portal_caches.getRamCacheRoot().get(
self.portal.acl_users.erp5_google_extraction.cache_factory_name
).clearCache()
self.portal.person_module.manage_delObjects([self.default_user_person.getId()])
portal_catalog = self.portal.portal_catalog
for obj in portal_catalog(portal_type=["Google Login", "Person"],
reference=getUserId(None),
validation_state="validated"):
obj.getObject().invalidate()
uuid_str = uuid.uuid4().hex
obj.setReference(uuid_str)
obj.setUserId(uuid_str)
for connector in portal_catalog(portal_type="Google Connector",
validation_state="validated",
id="NOT %s" % self.dummy_connector_id,
reference="default"):
connector.invalidate()
if getattr(self.portal.portal_oauth, self.dummy_connector_id, None) is None:
connector = self.portal.portal_oauth.newContent(id=self.dummy_connector_id,
portal_type="Google Connector",
reference="default",
client_id=CLIENT_ID,
secret_key=SECRET_KEY)
connector.validate()
self.tic()
self.logout()
@contextlib.contextmanager
def _default_login_responses(self):
with responses.RequestsMock() as rsps:
rsps.add(
method='POST',
url='https://accounts.google.com/o/oauth2/token',
json={
'access_token': self.access_token,
'refresh_token': self.refresh_token,
'expires_in': 3600,
},
)
rsps.add(
method='GET',
url='https://www.googleapis.com/oauth2/v1/userinfo',
json={
"first_name": "John",
"last_name": "Doe",
"email": self.default_google_login_email_address,
}
)
yield
class TestGoogleLogin(GoogleLoginTestCase):
def test_redirect(self):
"""
Check URL generate to redirect to Google
Check URL generated to redirect to Google
"""
self.logout()
self.portal.ERP5Site_redirectToGoogleLoginPage()
location = self.portal.REQUEST.RESPONSE.getHeader("Location")
response = self.portal.REQUEST.RESPONSE
self.portal.ERP5Site_redirectToGoogleLoginPage(RESPONSE=response)
location = response.getHeader("Location")
self.assertIn("https://accounts.google.com/o/oauth2/", location)
self.assertIn("response_type=code", location)
self.assertIn("client_id=%s" % CLIENT_ID, location)
self.assertIn("client_id=%s" % self.client_id, location)
self.assertNotIn("secret_key=", location)
self.assertIn("ERP5Site_receiveGoogleCallback", location)
def test_existing_user(self):
self.login()
person = self.portal.person_module.newContent(
portal_type='Person',
)
person.newContent(
portal_type='Google Login',
reference=getUserId(None)
).validate()
person.newContent(portal_type='Assignment').open()
self.tic()
self.logout()
request = self.portal.REQUEST
response = request.RESPONSE
with mock.patch(
'erp5.component.extension.GoogleLoginUtility.getAccessTokenFromCode',
side_effect=getAccessTokenFromCode,
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.GoogleLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.__code__ = getAccessTokenFromCode.__code__
getUserEntry_mock.__code__ = getUserEntry.__code__
self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
getAccessTokenFromCode_mock.assert_called_once()
getUserEntry_mock.assert_called_once()
request["__ac_google_hash"] = response.cookies["__ac_google_hash"]["value"]
redirect_url = urllib.parse.urlparse(
self.portal.ERP5Site_redirectToGoogleLoginPage(RESPONSE=response))
state = dict(urllib.parse.parse_qsl(redirect_url.query))['state']
self.assertTrue(state)
with mock.patch(
code = 'code-ABC'
def token_callback(request):
self.assertEqual(
request.headers['Content-Type'],
'application/x-www-form-urlencoded')
self.assertEqual(
dict(urllib.parse.parse_qsl(request.body)),
{
'client_id': self.client_id,
'code': code,
'grant_type': 'authorization_code',
'client_secret': self.secret_key,
'redirect_uri': self.portal.absolute_url() + '/ERP5Site_receiveGoogleCallback',
},
)
return 200, {}, json.dumps({
'access_token': self.access_token,
'refresh_token': self.refresh_token,
'expires_in': 3600,
})
with responses.RequestsMock() as rsps:
rsps.add_callback(
responses.POST,
'https://accounts.google.com/o/oauth2/token',
token_callback,
)
self.portal.ERP5Site_receiveGoogleCallback(code=code, state=state)
def userinfo_callback(request):
self.assertEqual(
request.headers['Authorization'],
'Bearer ' + self.access_token)
return 200, {}, json.dumps({
"first_name": "John",
"last_name": "Doe",
"email": self.default_google_login_email_address,
})
request['__ac_google_hash'] = response.cookies['__ac_google_hash']['value']
with responses.RequestsMock() as rsps, \
mock.patch(
'Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin._setUserNameForAccessLog'
) as _setUserNameForAccessLog:
credentials = self.portal.acl_users.erp5_google_extraction.extractCredentials(request)
self.assertEqual(
'Google Login',
credentials['login_portal_type'])
rsps.add_callback(
responses.GET,
'https://www.googleapis.com/oauth2/v1/userinfo',
userinfo_callback,
)
credentials = self.portal.acl_users.erp5_google_extraction.extractCredentials(
request)
self.assertEqual(credentials['login_portal_type'], 'Google Login')
self.assertEqual(
getUserId(None),
credentials['external_login'])
credentials['external_login'],
self.default_google_login_email_address)
# this is what will appear in Z2.log
_setUserNameForAccessLog.assert_called_once_with(
'erp5_google_extraction=%s' % getUserId(None),
'erp5_google_extraction=dummy@example.com',
request)
user_id, login = self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials)
self.assertEqual(person.getUserId(), user_id)
self.assertEqual(getUserId(None), login)
user_id, user_name = self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials)
self.assertEqual(user_id, self.default_user_person.getUserId())
self.assertEqual(user_name, self.default_google_login_email_address)
self.login(user_id)
self.assertEqual(self.portal.Base_getUserCaption(), login)
self.assertEqual(self.portal.Base_getUserCaption(), user_name)
def test_auth_cookie(self):
request = self.portal.REQUEST
response = request.RESPONSE
# (the secure flag is only set if we accessed through https)
request.setServerURL('https', 'example.com')
with mock.patch(
'erp5.component.extension.GoogleLoginUtility.getAccessTokenFromCode',
side_effect=getAccessTokenFromCode,
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.GoogleLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.__code__ = getAccessTokenFromCode.__code__
getUserEntry_mock.__code__ = getUserEntry.__code__
self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
getAccessTokenFromCode_mock.assert_called_once()
getUserEntry_mock.assert_called_once()
redirect_url = urllib.parse.urlparse(
self.portal.ERP5Site_redirectToGoogleLoginPage(RESPONSE=response))
state = dict(urllib.parse.parse_qsl(redirect_url.query))['state']
with self._default_login_responses():
self.portal.ERP5Site_receiveGoogleCallback(code='code', state=state)
ac_cookie, = [v for (k, v) in response.listHeaders() if k.lower() == 'set-cookie' and '__ac_google_hash=' in v]
self.assertIn('; secure', ac_cookie.lower())
self.assertIn('; httponly', ac_cookie.lower())
self.assertIn('; samesite=lax', ac_cookie.lower())
def test_create_user_in_ERP5Site_createGoogleUserToOAuth(self):
"""
Check if ERP5 set cookie properly after receive code from external service
"""
self.login()
id_list = []
for result in self.portal.portal_catalog(portal_type="Credential Request",
reference=getUserId(None)):
id_list.append(result.getObject().getId())
self.portal.credential_request_module.manage_delObjects(ids=id_list)
skin = self.portal.portal_skins.custom
createZODBPythonScript(skin, "CredentialRequest_createUser", "", """
person = context.getDestinationDecisionValue(portal_type="Person")
login_list = [x for x in person.objectValues(portal_type='Google Login') \
if x.getValidationState() == 'validated']
if len(login_list):
login = login_list[0]
else:
login = person.newContent(portal_type='Google Login')
reference = context.getReference()
if not login.hasReference():
if not reference:
raise ValueError("Impossible to create an account without login")
login.setReference(reference)
if not person.Person_getUserId():
person.setUserId(reference)
if login.getValidationState() == 'draft':
login.validate()
return reference, None
""")
createZODBPythonScript(skin, "ERP5Site_createGoogleUserToOAuth", "user_reference, user_dict", """
module = context.getPortalObject().getDefaultModule(portal_type='Credential Request')
credential_request = module.newContent(
portal_type="Credential Request",
first_name=user_dict["first_name"],
last_name=user_dict["last_name"],
reference=user_reference,
default_email_text=user_dict["email"],
)
credential_request.submit()
context.portal_alarms.accept_submitted_credentials.activeSense()
return credential_request
""")
self.logout()
# make sure user info URL is called for _default_login_responses
cookie = six.moves.http_cookies.SimpleCookie()
cookie.load(ac_cookie)
resp = self.publish(
self.portal.getPath(),
env={
'HTTP_COOKIE': '__ac_google_hash="%s"' % cookie.get('__ac_google_hash').value
}
)
self.assertEqual(resp.getStatus(), six.moves.http_client.OK)
def test_non_existing_user(self):
request = self.portal.REQUEST
response = request.RESPONSE
redirect_url = urllib.parse.urlparse(
self.portal.ERP5Site_redirectToGoogleLoginPage(RESPONSE=response))
state = dict(urllib.parse.parse_qsl(redirect_url.query))['state']
with responses.RequestsMock() as rsps:
rsps.add(
method='POST',
url='https://accounts.google.com/o/oauth2/token',
json={
'access_token': self.access_token,
'refresh_token': self.refresh_token,
'expires_in': 3600,
},
)
self.portal.ERP5Site_receiveGoogleCallback(code='code', state=state)
request['__ac_google_hash'] = response.cookies['__ac_google_hash']['value']
with responses.RequestsMock() as rsps:
rsps.add(
method='GET',
url='https://www.googleapis.com/oauth2/v1/userinfo',
json={
"first_name": "Bob",
"last_name": "Doe",
"email": "unknown@example.com",
}
)
credentials = self.portal.acl_users.erp5_google_extraction.extractCredentials(
request)
self.assertEqual(credentials['login_portal_type'], 'Google Login')
self.assertEqual(
credentials['external_login'],
"unknown@example.com")
self.assertIsNone(
self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials))
def test_invalid_cookie(self):
request = self.portal.REQUEST
request['__ac_google_hash'] = '???'
credentials = self.portal.acl_users.erp5_google_extraction.extractCredentials(
request)
self.assertEqual(credentials, {})
def test_refresh_token(self):
request = self.portal.REQUEST
response = request.RESPONSE
redirect_url = urllib.parse.urlparse(
self.portal.ERP5Site_redirectToGoogleLoginPage(RESPONSE=response))
state = dict(urllib.parse.parse_qsl(redirect_url.query))['state']
with self._default_login_responses():
resp = self.publish(
'%s/ERP5Site_receiveGoogleCallback?%s' % (
self.portal.getPath(),
urllib.parse.urlencode(
{
'code': 'code',
'state': state,
}
)
)
)
self.assertEqual(resp.getStatus(), six.moves.http_client.FOUND)
env = {
'HTTP_COOKIE': '__ac_google_hash="%s"' % resp.getCookie('__ac_google_hash')['value']
}
resp = self.publish(self.portal.getPath(), env=env)
self.assertEqual(resp.getStatus(), six.moves.http_client.OK)
def token_callback(request):
self.assertEqual(
request.headers['Content-Type'],
'application/x-www-form-urlencoded')
self.assertEqual(
dict(urllib.parse.parse_qsl(request.body)),
{
'access_type': 'offline',
'client_id': self.client_id,
'client_secret': self.secret_key,
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token,
}
)
return 200, {}, json.dumps({
'access_token': 'new' + self.access_token,
'refresh_token': 'new' + self.refresh_token,
'expires_in': 3600,
})
with mock.patch(
'erp5.component.extension.GoogleLoginUtility.getAccessTokenFromCode',
side_effect=getAccessTokenFromCode,
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.GoogleLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.__code__ = getAccessTokenFromCode.__code__
getUserEntry_mock.__code__ = getUserEntry.__code__
response = self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
getAccessTokenFromCode_mock.assert_called_once()
getUserEntry_mock.assert_called_once()
google_hash = self.portal.REQUEST.RESPONSE.cookies.get("__ac_google_hash")["value"]
self.assertEqual("b01533abb684a658dc71c81da4e67546", google_hash)
absolute_url = self.portal.absolute_url()
self.assertNotEqual(absolute_url[-1], '/')
self.assertEqual(absolute_url + '/', response)
cache_dict = self.portal.Base_getBearerToken(google_hash, "google_server_auth_token_cache_factory")
self.assertEqual(CLIENT_ID, cache_dict["client_id"])
self.assertEqual(ACCESS_TOKEN, cache_dict["access_token"])
self.assertEqual({'reference': getUserId(None)},
self.portal.Base_getBearerToken(ACCESS_TOKEN, "google_server_auth_token_cache_factory")
'Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin.time.time',
return_value=time.time() + 5000), \
responses.RequestsMock() as rsps:
rsps.add_callback(
responses.POST,
'https://accounts.google.com/o/oauth2/token',
token_callback,
)
self.portal.REQUEST["__ac_google_hash"] = google_hash
erp5_google_extractor = self.portal.acl_users.erp5_google_extraction
self.assertEqual({'external_login': getUserId(None),
'login_portal_type': 'Google Login',
'remote_host': '',
'remote_address': ''}, erp5_google_extractor.extractCredentials(self.portal.REQUEST))
self.tic()
self.login()
credential_request = self.portal.portal_catalog(portal_type="Credential Request",
reference=getUserId(None))[0].getObject()
credential_request.accept()
person = credential_request.getDestinationDecisionValue()
google_login = person.objectValues(portal_types="Google Login")[0]
self.assertEqual(getUserId(None), google_login.getReference())
# refreshing the token calls userinfo again
rsps.add(
method='GET',
url='https://www.googleapis.com/oauth2/v1/userinfo',
json={
"first_name": "John",
"last_name": "Doe",
"email": self.default_google_login_email_address,
}
)
resp = self.publish(self.portal.getPath(), env=env)
self.assertEqual(resp.getStatus(), six.moves.http_client.OK)
resp = self.publish(self.portal.getPath(), env=env)
self.assertEqual(resp.getStatus(), six.moves.http_client.OK)
def test_refresh_token_expired(self):
request = self.portal.REQUEST
response = request.RESPONSE
redirect_url = urllib.parse.urlparse(
self.portal.ERP5Site_redirectToGoogleLoginPage(RESPONSE=response))
state = dict(urllib.parse.parse_qsl(redirect_url.query))['state']
with self._default_login_responses():
resp = self.publish(
'%s/ERP5Site_receiveGoogleCallback?%s' % (
self.portal.getPath(),
urllib.parse.urlencode(
{
'code': 'code',
'state': state,
}
)
)
)
self.assertEqual(resp.getStatus(), six.moves.http_client.FOUND)
env = {
'HTTP_COOKIE': '__ac_google_hash="%s"' % resp.getCookie('__ac_google_hash')['value']
}
resp = self.publish(self.portal.getPath(), env=env)
self.assertEqual(resp.getStatus(), six.moves.http_client.OK)
with mock.patch(
'Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin.time.time',
return_value=time.time() + 5000), \
responses.RequestsMock() as rsps:
rsps.add(
method='POST',
url='https://accounts.google.com/o/oauth2/token',
status=six.moves.http_client.UNAUTHORIZED,
)
resp = self.publish(self.portal.getPath(), env=env)
self.assertEqual(resp.getStatus(), six.moves.http_client.FOUND)
self.assertIn('/login_form', resp.getHeader('Location'))
def test_logout(self):
resp = self.publish(self.portal.getId() + '/logout')
......@@ -328,7 +411,7 @@ class TestERP5JSGoogleLogin(GoogleLoginTestCase):
if img.attrib['alt'] == 'Sign in with Google'
]
self.assertIn('/ERP5Site_redirectToGoogleLoginPage', google_login_link)
resp = self.publish(six.moves.urllib.parse.urlparse(google_login_link).path)
resp = self.publish(urllib.parse.urlparse(google_login_link).path)
# this request redirects to google
self.assertEqual(resp.getStatus(), six.moves.http_client.FOUND)
self.assertIn('google.com', resp.getHeader('Location'))
......
document.erp5.GoogleConnector
\ No newline at end of file
extension.erp5.GoogleLoginUtility
\ No newline at end of file
Google Connector | OAuthClient
Google Connector | SocketClient
Template Tool | TemplateToolERP5GoogleExtractionPluginConstraint
\ No newline at end of file
......@@ -38,10 +38,10 @@ from Products.ERP5Security import _setUserNameForAccessLog
from AccessControl.SecurityManagement import getSecurityManager, \
setSecurityManager, newSecurityManager
from Products.ERP5Type.Cache import DEFAULT_CACHE_SCOPE
import time
import six
import time
from six.moves import urllib
import json
from zLOG import LOG, ERROR, INFO
try:
......@@ -49,41 +49,13 @@ try:
except ImportError:
facebook = None
try:
import apiclient.discovery
import httplib2
import oauth2client.client
except ImportError:
httplib2 = None
#Form for new plugin in ZMI
# Form for new plugin in ZMI
manage_addERP5FacebookExtractionPluginForm = PageTemplateFile(
'www/ERP5Security_addERP5FacebookExtractionPlugin', globals(),
__name__='manage_addERP5FacebookExtractionPluginForm')
def getGoogleUserEntry(token):
if httplib2 is None:
LOG('ERP5GoogleExtractionPlugin', INFO,
'No Google modules available, please install google-api-python-client '
'package. Authentication disabled..')
return None
http = oauth2client.client.AccessTokenCredentials(token,
'ERP5 Client'
).authorize(httplib2.Http(timeout=5))
service = apiclient.discovery.build("oauth2", "v1", http=http)
google_entry = service.userinfo().get().execute()
user_entry = {}
if google_entry is not None:
# sanitise value
for k in (('first_name', 'given_name'),
('last_name', 'family_name'),
('email', 'email'),
('reference', 'email'),):
value = google_entry.get(k[1], '').encode('utf-8')
user_entry[k[0]] = value
return user_entry
def addERP5FacebookExtractionPlugin(dispatcher, id, title=None, REQUEST=None):
""" Add a ERP5FacebookExtractionPlugin to a Pluggable Auth Service. """
......@@ -98,7 +70,7 @@ def addERP5FacebookExtractionPlugin(dispatcher, id, title=None, REQUEST=None):
'ERP5FacebookExtractionPlugin+added.'
% dispatcher.absolute_url())
#Form for new plugin in ZMI
# Form for new plugin in ZMI
manage_addERP5GoogleExtractionPluginForm = PageTemplateFile(
'www/ERP5Security_addERP5GoogleExtractionPlugin', globals(),
__name__='manage_addERP5GoogleExtractionPluginForm')
......@@ -157,12 +129,12 @@ class ERP5ExternalOauth2ExtractionPlugin:
for cache_plugin in cache_factory.getCachePluginList():
cache_entry = cache_plugin.get(key, DEFAULT_CACHE_SCOPE)
if cache_entry is not None:
# Avoid errors if the plugin don't have the funcionality of refresh token
refreshTokenIfExpired = getattr(self, "refreshTokenIfExpired", None)
cache_value = cache_entry.getValue()
if refreshTokenIfExpired is not None:
return refreshTokenIfExpired(key, cache_value)
else:
# getToken is called for the access_token_dict and for
# the user entry. We try to refresh only for the
# access_token_dict
if 'refresh_token' in cache_value:
return self.refreshTokenIfExpired(key, cache_value)
return cache_value
raise KeyError('Key %r not found' % key)
......@@ -171,19 +143,29 @@ class ERP5ExternalOauth2ExtractionPlugin:
####################################
security.declarePrivate('extractCredentials')
def extractCredentials(self, request):
""" Extract Oauth2 credentials from the request header. """
user_dict = {}
""" Extract Oauth2 credentials from cookie.
This plugins uses two level of cache storage:
- cookie_value => access_token_dict
- access_token => user_entry
access_token_dict depends on the concrete plugin classes,
but this is generally access_token and refresh_token.
user_entry must contain "reference", which is the reference
of the corresponding login document in ERP5.
"""
access_token_dict = {}
cookie_hash = request.get(self.cookie_name)
if cookie_hash is not None:
try:
user_dict = self.getToken(cookie_hash)
access_token_dict = self.getToken(cookie_hash)
except KeyError:
LOG(self.getId(), INFO, 'Hash %s not found' % cookie_hash)
return {}
token = None
if "access_token" in user_dict:
token = user_dict["access_token"]
if "access_token" in access_token_dict:
token = access_token_dict["access_token"]
if token is None:
# no token, then no credentials
......@@ -193,7 +175,7 @@ class ERP5ExternalOauth2ExtractionPlugin:
try:
user_entry = self.getToken(token)
except KeyError:
user_entry = self.getUserEntry(token)
user_entry = self.getUserEntry(access_token_dict)
if user_entry is not None:
# Reduce data size because, we don't need more than reference
user_entry = {"reference": user_entry["reference"]}
......@@ -263,40 +245,36 @@ class ERP5FacebookExtractionPlugin(ERP5ExternalOauth2ExtractionPlugin, BasePlugi
cookie_name = "__ac_facebook_hash"
cache_factory_name = "facebook_server_auth_token_cache_factory"
def refreshTokenIfExpired(self, key, cache_value):
return cache_value
def refreshTokenIfExpired(self, key, access_token_dict):
return access_token_dict
def getUserEntry(self, token):
return getFacebookUserDict(token)
class ERP5GoogleExtractionPlugin(ERP5ExternalOauth2ExtractionPlugin, BasePlugin):
"""
Plugin to authenicate as machines.
Plugin to authenticate using google OAuth2.
"""
meta_type = "ERP5 Google Extraction Plugin"
login_portal_type = "Google Login"
cookie_name = "__ac_google_hash"
cache_factory_name = "google_server_auth_token_cache_factory"
def refreshTokenIfExpired(self, key, cache_value):
expires_in = cache_value.get("token_response", {}).get("expires_in")
refresh_token = cache_value.get("refresh_token")
if expires_in and refresh_token:
if (time.time() - cache_value["response_timestamp"]) >= float(expires_in):
credential = oauth2client.client.OAuth2Credentials(
cache_value["access_token"], cache_value["client_id"],
cache_value["client_secret"], refresh_token,
cache_value["token_expiry"], cache_value["token_uri"],
cache_value["user_agent"])
credential.refresh(httplib2.Http(timeout=5))
cache_value = json.loads(credential.to_json())
cache_value["response_timestamp"] = time.time()
self.setToken(key, cache_value)
return cache_value
def refreshTokenIfExpired(self, key, access_token_dict):
if (time.time() - access_token_dict["response_timestamp"]) \
>= access_token_dict['expires_in']:
access_token_dict = self.getPortalObject().unrestrictedTraverse(
access_token_dict['connector_relative_url']
).refreshToken(access_token_dict)
self.setToken(key, access_token_dict)
return access_token_dict
def getUserEntry(self, access_token_dict):
return self.getPortalObject().unrestrictedTraverse(
access_token_dict['connector_relative_url'],
).getUserEntry(access_token_dict['access_token'])
def getUserEntry(self, token):
return getGoogleUserEntry(token)
#List implementation of class
classImplements( ERP5FacebookExtractionPlugin,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment