Commit c3488a67 authored by Jérome Perrin's avatar Jérome Perrin

oauth_google_login: reimplement with oauthlib/requests

Because the librairies used here were never ported to python3.

Notable changes:

  - ERP5Site_createGoogleUserToOAuth is dropped
  - internal API changed radically, so customizations made by overriding
    scripts are broken.
  - the core logic is now implemented in a connector class (still in
    portal_oauth for simplicity, but it would be simpler to move it to
    portal_web_services)

No changes required in the google console, the redirect uri is still
ERP5Site_receiveGoogleCallback
parent 92184552
##############################################################################
# Copyright (c) 2024 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from AccessControl import ClassSecurityInfo
import random
import string
import time
import oauthlib.oauth2
import requests
from zExceptions import Unauthorized
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type import Permissions
from Products.ERP5Type.Utils import unicode2str
from Products.ERP5Type.Timeout import getTimeLeft
AUTH_URL = 'https://accounts.google.com/o/oauth2/auth'
TOKEN_URL = 'https://accounts.google.com/o/oauth2/token'
USER_INFO_URL = 'https://www.googleapis.com/oauth2/v1/userinfo'
SCOPE_LIST = ['https://www.googleapis.com/auth/userinfo.profile',
'https://www.googleapis.com/auth/userinfo.email']
# Default timeout (in seconds) for the HTTP request made to google servers to
# exchange the authorization code for a token.
DEFAULT_HTTP_TIMEOUT = 10
class GoogleConnector(XMLObject):
meta_type = 'ERP5 Google Connector'
portal_type = 'Google Connector'
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
@security.public
def redirectToGoogleLoginPage(self, redirect_uri, RESPONSE):
"""Redirect to authorization page.
"""
authorization_url = self._getOAuthlibClient().prepare_request_uri(
uri=AUTH_URL,
redirect_uri=redirect_uri,
scope=SCOPE_LIST,
access_type="offline",
include_granted_scopes='true',
prompt="consent",
state=self._getAuthorizationState(),
)
return RESPONSE.redirect(authorization_url)
@security.public # XXX public but not publishable
def getTokenFromCode(self, state, code, redirect_uri):
self._verifyAuthorizationState(state)
body = self._getOAuthlibClient().prepare_request_body(
code=code,
client_secret=self.getSecretKey(),
redirect_uri=redirect_uri,
)
resp = requests.post(
TOKEN_URL,
data=body,
headers={'Content-Type': 'application/x-www-form-urlencoded'},
timeout=self._getTimeout(),
)
__traceback_info__ = (resp.content, resp.status_code)
resp.raise_for_status()
return self._getGoogleTokenFromJSONResponse(resp.json())
@security.private
def refreshToken(self, token):
"""Refresh auth token.
Used by Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin
"""
body = self._getOAuthlibClient().prepare_refresh_body(
client_id=self.getClientId(),
client_secret=self.getSecretKey(),
access_type="offline",
refresh_token=token['refresh_token'],
)
resp = requests.post(
TOKEN_URL,
data=body,
headers={'Content-Type': 'application/x-www-form-urlencoded'},
timeout=self._getTimeout(),
)
if not resp.ok:
return {}
return self._getGoogleTokenFromJSONResponse(resp.json())
@security.private
def getUserEntry(self, access_token):
resp = requests.get(
USER_INFO_URL,
headers={'Authorization': 'Bearer {}'.format(access_token)},
timeout=self._getTimeout(),
)
resp.raise_for_status()
google_entry = resp.json()
user_entry = {}
# remap user info
for erp5_key, google_key in (
('first_name', 'given_name'),
('last_name', 'family_name'),
('email', 'email'),
('reference', 'email'),
):
user_entry[erp5_key] = unicode2str(google_entry.get(google_key, ''))
return user_entry
def _getOAuthlibClient(self):
return oauthlib.oauth2.WebApplicationClient(
self.getClientId(),
access_type="offline",
)
def _getGoogleTokenFromJSONResponse(self, token):
return {
'access_token': unicode2str(token['access_token']),
'refresh_token': unicode2str(token['refresh_token']),
'expires_in': token['expires_in'],
'response_timestamp': time.time(),
'connector_relative_url': self.getRelativeUrl(),
}
def _getAuthorizationState(self):
alphabet = string.ascii_letters + string.digits
state = ''.join(random.SystemRandom().choice(alphabet) for _ in range(32))
self.getPortalObject().portal_sessions['google_login_auth_state'][state] = True
return state
def _verifyAuthorizationState(self, state):
if not self.getPortalObject().portal_sessions['google_login_auth_state'].pop(state, False):
raise Unauthorized
def _getTimeout(self):
"""Compute the time left according to publisher deadline.
"""
time_left = getTimeLeft()
if time_left is None:
time_left = DEFAULT_HTTP_TIMEOUT
return min(self.getTimeout() or DEFAULT_HTTP_TIMEOUT, time_left)
...@@ -2,13 +2,13 @@ ...@@ -2,13 +2,13 @@
<ZopeData> <ZopeData>
<record id="1" aka="AAAAAAAAAAE="> <record id="1" aka="AAAAAAAAAAE=">
<pickle> <pickle>
<global name="Extension Component" module="erp5.portal_type"/> <global name="Document Component" module="erp5.portal_type"/>
</pickle> </pickle>
<pickle> <pickle>
<dictionary> <dictionary>
<item> <item>
<key> <string>default_reference</string> </key> <key> <string>default_reference</string> </key>
<value> <string>GoogleLoginUtility</string> </value> <value> <string>GoogleConnector</string> </value>
</item> </item>
<item> <item>
<key> <string>description</string> </key> <key> <string>description</string> </key>
...@@ -18,11 +18,7 @@ ...@@ -18,11 +18,7 @@
</item> </item>
<item> <item>
<key> <string>id</string> </key> <key> <string>id</string> </key>
<value> <string>extension.erp5.GoogleLoginUtility</string> </value> <value> <string>document.erp5.GoogleConnector</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Extension Component</string> </value>
</item> </item>
<item> <item>
<key> <string>sid</string> </key> <key> <string>sid</string> </key>
......
import json
import oauth2client.client
import oauth2client.transport
from Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin import getGoogleUserEntry
from zExceptions import Unauthorized
SCOPE_LIST = ['https://www.googleapis.com/auth/userinfo.profile',
'https://www.googleapis.com/auth/userinfo.email']
# Default timeout (in seconds) for the HTTP request made to google servers to
# exchange the authorization code for a token.
DEFAULT_HTTP_TIMEOUT = 10
def _getGoogleClientIdAndSecretKey(portal, reference="default"):
"""Returns google client id and secret key.
Internal function.
"""
result_list = unrestrictedSearchGoogleConnector(portal, reference=reference)
assert result_list, "Google Connector not found"
if len(result_list) == 2:
raise ValueError("Impossible to select one Google Connector")
google_connector = result_list[0].getObject()
return google_connector.getClientId(), google_connector.getSecretKey()
def redirectToGoogleLoginPage(self):
client_id, secret_key = _getGoogleClientIdAndSecretKey(self.getPortalObject())
flow = oauth2client.client.OAuth2WebServerFlow(
client_id=client_id,
client_secret=secret_key,
scope=SCOPE_LIST,
redirect_uri="{0}/ERP5Site_receiveGoogleCallback".format(self.absolute_url()),
access_type="offline",
prompt="consent",
include_granted_scopes="true")
self.REQUEST.RESPONSE.redirect(flow.step1_get_authorize_url())
def getAccessTokenFromCode(self, code, redirect_uri, timeout=DEFAULT_HTTP_TIMEOUT):
client_id, secret_key = _getGoogleClientIdAndSecretKey(self.getPortalObject())
flow = oauth2client.client.OAuth2WebServerFlow(
client_id=client_id,
client_secret=secret_key,
scope=SCOPE_LIST,
redirect_uri=redirect_uri,
access_type="offline",
include_granted_scopes="true")
credential = flow.step2_exchange(
code,
http=oauth2client.transport.get_http_object(timeout=timeout))
credential_data = json.loads(credential.to_json())
return credential_data
def unrestrictedSearchGoogleConnector(self, reference="default"):
return self.getPortalObject().portal_catalog.unrestrictedSearchResults(
portal_type="Google Connector",
reference=reference,
validation_state="validated",
limit=2)
def unrestrictedSearchGoogleLogin(self, login, REQUEST=None):
if REQUEST is not None:
raise Unauthorized
return self.getPortalObject().portal_catalog.unrestrictedSearchResults(
portal_type="Google Login",
reference=login,
validation_state="validated", limit=1)
def getUserEntry(access_token):
return getGoogleUserEntry(access_token)
\ No newline at end of file
<property_sheet_list> <property_sheet_list>
<portal_type id="Google Connector"> <portal_type id="Google Connector">
<item>OAuthClient</item> <item>OAuthClient</item>
<item>SocketClient</item>
</portal_type> </portal_type>
<portal_type id="Template Tool"> <portal_type id="Template Tool">
<item>TemplateToolERP5GoogleExtractionPluginConstraint</item> <item>TemplateToolERP5GoogleExtractionPluginConstraint</item>
......
...@@ -40,7 +40,7 @@ ...@@ -40,7 +40,7 @@
</item> </item>
<item> <item>
<key> <string>type_class</string> </key> <key> <string>type_class</string> </key>
<value> <string>XMLObject</string> </value> <value> <string>GoogleConnector</string> </value>
</item> </item>
<item> <item>
<key> <string>type_interface</string> </key> <key> <string>type_interface</string> </key>
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>getAccessTokenFromCode</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>GoogleLoginUtility</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_getAccessTokenFromCode</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
portal = context.getPortalObject()
result_list = portal.portal_catalog(
portal_type="Google Connector",
reference=reference,
validation_state="validated",
limit=2)
if len(result_list) != 1:
raise ValueError("Impossible to select one Google Connector")
return result_list[0].getObject()
...@@ -50,19 +50,19 @@ ...@@ -50,19 +50,19 @@
</item> </item>
<item> <item>
<key> <string>_params</string> </key> <key> <string>_params</string> </key>
<value> <string>login, REQUEST=None</string> </value> <value> <string>reference="default"</string> </value>
</item> </item>
<item> <item>
<key> <string>_proxy_roles</string> </key> <key> <string>_proxy_roles</string> </key>
<value> <value>
<tuple> <tuple>
<string>Manager</string> <string>Auditor</string>
</tuple> </tuple>
</value> </value>
</item> </item>
<item> <item>
<key> <string>id</string> </key> <key> <string>id</string> </key>
<value> <string>ERP5Site_getPersonFromGoogleLogin</string> </value> <value> <string>ERP5Site_getDefaultGoogleConnector</string> </value>
</item> </item>
</dictionary> </dictionary>
</pickle> </pickle>
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>unrestrictedSearchGoogleConnector</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>GoogleLoginUtility</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_getGoogleConnector</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>unrestrictedSearchGoogleConnector</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>GoogleLoginUtility</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_getGoogleLogin</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_function</string> </key>
<value> <string>getUserEntry</string> </value>
</item>
<item>
<key> <string>_module</string> </key>
<value> <string>GoogleLoginUtility</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_getGoogleUserEntry</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
from zExceptions import Unauthorized
if REQUEST is not None:
raise Unauthorized
login = context.ERP5Site_getGoogleLogin(login)
if login is None:
return login
if len(login) > 1:
raise ValueError("Duplicated User")
return login[0].getParentValue().getRelativeUrl()
import time from Products.ERP5Type.Utils import str2bytes
portal = context.getPortalObject()
request = container.REQUEST request = container.REQUEST
response = request.RESPONSE response = request.RESPONSE
...@@ -16,26 +17,21 @@ if error is not None: ...@@ -16,26 +17,21 @@ if error is not None:
return handleError(error) return handleError(error)
elif code is not None: elif code is not None:
response_dict = context.ERP5Site_getAccessTokenFromCode( google_connector = portal.ERP5Site_getDefaultGoogleConnector()
code, response_dict = google_connector.getTokenFromCode(
"{0}/ERP5Site_receiveGoogleCallback".format(context.absolute_url())) state=state,
if response_dict is not None: code=code,
access_token = response_dict['access_token'].encode('utf-8') redirect_uri="{}/ERP5Site_receiveGoogleCallback".format(portal.absolute_url()),
hash_str = context.Base_getHMAC(access_token, access_token) )
context.setAuthCookie(response, '__ac_google_hash', hash_str)
# store timestamp in second since the epoch in UTC is enough access_token = str2bytes(response_dict['access_token'])
response_dict["response_timestamp"] = time.time() hash_str = portal.Base_getHMAC(access_token, access_token)
context.Base_setBearerToken(hash_str, portal.setAuthCookie(response, '__ac_google_hash', hash_str)
portal.Base_setBearerToken(
hash_str,
response_dict, response_dict,
"google_server_auth_token_cache_factory") "google_server_auth_token_cache_factory")
user_dict = context.ERP5Site_getGoogleUserEntry(access_token)
user_reference = user_dict["email"]
context.Base_setBearerToken(access_token,
{"reference": user_reference},
"google_server_auth_token_cache_factory")
method = getattr(context, "ERP5Site_createGoogleUserToOAuth", None)
if method is not None:
method(user_reference, user_dict)
# XXX for ERP5JS web sites without a rewrite rule, we make sure there's a trailing / # XXX for ERP5JS web sites without a rewrite rule, we make sure there's a trailing /
return response.redirect(request.get("came_from") or context.absolute_url() + '/') return response.redirect(request.get("came_from") or context.absolute_url() + '/')
......
...@@ -50,7 +50,7 @@ ...@@ -50,7 +50,7 @@
</item> </item>
<item> <item>
<key> <string>_params</string> </key> <key> <string>_params</string> </key>
<value> <string>code=None, error=None</string> </value> <value> <string>code=None, state=None, error=None</string> </value>
</item> </item>
<item> <item>
<key> <string>id</string> </key> <key> <string>id</string> </key>
......
portal = context.getPortalObject()
google_connector = portal.ERP5Site_getDefaultGoogleConnector()
return google_connector.redirectToGoogleLoginPage(
"{0}/ERP5Site_receiveGoogleCallback".format(portal.absolute_url()),
RESPONSE=RESPONSE,
)
...@@ -2,25 +2,67 @@ ...@@ -2,25 +2,67 @@
<ZopeData> <ZopeData>
<record id="1" aka="AAAAAAAAAAE="> <record id="1" aka="AAAAAAAAAAE=">
<pickle> <pickle>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/> <global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle> </pickle>
<pickle> <pickle>
<dictionary> <dictionary>
<item> <item>
<key> <string>_function</string> </key> <key> <string>_bind_names</string> </key>
<value> <string>redirectToGoogleLoginPage</string> </value> <value>
<object>
<klass>
<global name="_reconstructor" module="copy_reg"/>
</klass>
<tuple>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
<global name="object" module="__builtin__"/>
<none/>
</tuple>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item> </item>
<item> <item>
<key> <string>_module</string> </key> <key> <string>name_context</string> </key>
<value> <string>GoogleLoginUtility</string> </value> <value> <string>context</string> </value>
</item> </item>
<item> <item>
<key> <string>id</string> </key> <key> <string>name_m_self</string> </key>
<value> <string>ERP5Site_redirectToGoogleLoginPage</string> </value> <value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>RESPONSE</string> </value>
</item> </item>
<item> <item>
<key> <string>title</string> </key> <key> <string>_proxy_roles</string> </key>
<value> <string></string> </value> <value>
<tuple>
<string>Auditor</string>
</tuple>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_redirectToGoogleLoginPage</string> </value>
</item> </item>
</dictionary> </dictionary>
</pickle> </pickle>
......
...@@ -25,290 +25,373 @@ ...@@ -25,290 +25,373 @@
# #
############################################################################## ##############################################################################
import uuid import contextlib
import mock import mock
import lxml import lxml
import urlparse import json
import httplib import responses
import time
import six.moves.urllib as urllib
import six.moves.http_client
import six.moves.http_cookies
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript
CLIENT_ID = "a1b2c3"
SECRET_KEY = "3c2ba1"
ACCESS_TOKEN = "T1234"
CODE = "1234"
def getUserId(access_token):
return "dummy@example.com"
def getAccessTokenFromCode(code, redirect_uri):
assert code == CODE, "Invalid code"
# This is an example of a Google response
return {'_module': 'oauth2client.client',
'scopes': ['https://www.googleapis.com/auth/userinfo.email', 'https://www.googleapis.com/auth/userinfo.profile'],
'revoke_uri': 'https://accounts.google.com/o/oauth2/revoke',
'access_token': ACCESS_TOKEN,
'token_uri': 'https://www.googleapis.com/oauth2/v4/token',
'token_info_uri': 'https://www.googleapis.com/oauth2/v3/tokeninfo',
'invalid': False,
'token_response': {
'access_token': ACCESS_TOKEN,
'token_type': 'Bearer',
'expires_in': 3600,
'refresh_token': "111",
'id_token': '222'
},
'client_id': CLIENT_ID,
'id_token': {
'picture': '',
'sub': '',
'aud': '',
'family_name': 'D',
'iss': 'https://accounts.google.com',
'email_verified': True,
'at_hash': 'p3vPYQkVuqByBA',
'given_name': 'John',
'exp': 123,
'azp': '123.apps.googleusercontent.com',
'iat': 455,
'locale': 'pt',
'email': getUserId(None),
'name': 'John D'
},
'client_secret': 'secret',
'token_expiry': '2017-03-31T16:06:28Z',
'_class': 'OAuth2Credentials',
'refresh_token': '111',
'user_agent': None
}
def getUserEntry(access_token):
return {
"first_name": "John",
"last_name": "Doe",
"email": getUserId(None),
"reference": getUserId(None)
}
class GoogleLoginTestCase(ERP5TypeTestCase): class GoogleLoginTestCase(ERP5TypeTestCase):
default_google_login_email_address = 'dummy@example.com'
dummy_connector_id = 'test_google_connector'
client_id = "a1b2c3"
secret_key = "3c2ba1"
def afterSetUp(self): def afterSetUp(self):
"""
This is ran before anything, used to set the environment
"""
self.login()
self.portal.TemplateTool_checkGoogleExtractionPluginExistenceConsistency(fixit=True) self.portal.TemplateTool_checkGoogleExtractionPluginExistenceConsistency(fixit=True)
self.dummy_connector_id = "test_google_connector" # use random tokens because we can not clear the memcached cache
self.access_token = 'access-token' + self.id() + self.newPassword()
self.refresh_token = 'refresh-token' + self.id() + self.newPassword()
self.default_user_person = self.portal.person_module.newContent(
portal_type='Person',
first_name=self.id(),
)
self.default_user_person.newContent(
portal_type='Google Login',
reference=self.default_google_login_email_address,
).validate()
self.default_user_person.newContent(portal_type='Assignment').open()
if getattr(self.portal.portal_oauth, self.dummy_connector_id, None) is None:
connector = self.portal.portal_oauth.newContent(
id=self.dummy_connector_id,
portal_type="Google Connector",
reference="default",
client_id=self.client_id,
secret_key=self.secret_key)
connector.validate()
self.tic()
def beforeTearDown(self):
self.abort()
self.portal.portal_caches.getRamCacheRoot().get(
self.portal.acl_users.erp5_google_extraction.cache_factory_name
).clearCache()
self.portal.person_module.manage_delObjects([self.default_user_person.getId()])
portal_catalog = self.portal.portal_catalog portal_catalog = self.portal.portal_catalog
for obj in portal_catalog(portal_type=["Google Login", "Person"],
reference=getUserId(None),
validation_state="validated"):
obj.getObject().invalidate()
uuid_str = uuid.uuid4().hex
obj.setReference(uuid_str)
obj.setUserId(uuid_str)
for connector in portal_catalog(portal_type="Google Connector", for connector in portal_catalog(portal_type="Google Connector",
validation_state="validated", validation_state="validated",
id="NOT %s" % self.dummy_connector_id, id="NOT %s" % self.dummy_connector_id,
reference="default"): reference="default"):
connector.invalidate() connector.invalidate()
if getattr(self.portal.portal_oauth, self.dummy_connector_id, None) is None:
connector = self.portal.portal_oauth.newContent(id=self.dummy_connector_id,
portal_type="Google Connector",
reference="default",
client_id=CLIENT_ID,
secret_key=SECRET_KEY)
connector.validate()
self.tic() self.tic()
self.logout()
@contextlib.contextmanager
def _default_login_responses(self):
with responses.RequestsMock() as rsps:
rsps.add(
method='POST',
url='https://accounts.google.com/o/oauth2/token',
json={
'access_token': self.access_token,
'refresh_token': self.refresh_token,
'expires_in': 3600,
},
)
rsps.add(
method='GET',
url='https://www.googleapis.com/oauth2/v1/userinfo',
json={
"first_name": "John",
"last_name": "Doe",
"email": self.default_google_login_email_address,
}
)
yield
class TestGoogleLogin(GoogleLoginTestCase): class TestGoogleLogin(GoogleLoginTestCase):
def test_redirect(self): def test_redirect(self):
""" """
Check URL generate to redirect to Google Check URL generated to redirect to Google
""" """
self.logout() self.logout()
self.portal.ERP5Site_redirectToGoogleLoginPage() response = self.portal.REQUEST.RESPONSE
location = self.portal.REQUEST.RESPONSE.getHeader("Location") self.portal.ERP5Site_redirectToGoogleLoginPage(RESPONSE=response)
location = response.getHeader("Location")
self.assertIn("https://accounts.google.com/o/oauth2/", location) self.assertIn("https://accounts.google.com/o/oauth2/", location)
self.assertIn("response_type=code", location) self.assertIn("response_type=code", location)
self.assertIn("client_id=%s" % CLIENT_ID, location) self.assertIn("client_id=%s" % self.client_id, location)
self.assertNotIn("secret_key=", location) self.assertNotIn("secret_key=", location)
self.assertIn("ERP5Site_receiveGoogleCallback", location) self.assertIn("ERP5Site_receiveGoogleCallback", location)
def test_existing_user(self): def test_existing_user(self):
self.login()
person = self.portal.person_module.newContent(
portal_type='Person',
)
person.newContent(
portal_type='Google Login',
reference=getUserId(None)
).validate()
person.newContent(portal_type='Assignment').open()
self.tic()
self.logout()
request = self.portal.REQUEST request = self.portal.REQUEST
response = request.RESPONSE response = request.RESPONSE
with mock.patch(
'erp5.component.extension.GoogleLoginUtility.getAccessTokenFromCode',
side_effect=getAccessTokenFromCode,
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.GoogleLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.__code__ = getAccessTokenFromCode.__code__
getUserEntry_mock.__code__ = getUserEntry.__code__
self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
getAccessTokenFromCode_mock.assert_called_once()
getUserEntry_mock.assert_called_once()
request["__ac_google_hash"] = response.cookies["__ac_google_hash"]["value"] redirect_url = urllib.parse.urlparse(
self.portal.ERP5Site_redirectToGoogleLoginPage(RESPONSE=response))
state = dict(urllib.parse.parse_qsl(redirect_url.query))['state']
self.assertTrue(state)
with mock.patch( code = 'code-ABC'
def token_callback(request):
self.assertEqual(
request.headers['Content-Type'],
'application/x-www-form-urlencoded')
self.assertEqual(
dict(urllib.parse.parse_qsl(request.body)),
{
'client_id': self.client_id,
'code': code,
'grant_type': 'authorization_code',
'client_secret': self.secret_key,
'redirect_uri': self.portal.absolute_url() + '/ERP5Site_receiveGoogleCallback',
},
)
return 200, {}, json.dumps({
'access_token': self.access_token,
'refresh_token': self.refresh_token,
'expires_in': 3600,
})
with responses.RequestsMock() as rsps:
rsps.add_callback(
responses.POST,
'https://accounts.google.com/o/oauth2/token',
token_callback,
)
self.portal.ERP5Site_receiveGoogleCallback(code=code, state=state)
def userinfo_callback(request):
self.assertEqual(
request.headers['Authorization'],
'Bearer ' + self.access_token)
return 200, {}, json.dumps({
"first_name": "John",
"last_name": "Doe",
"email": self.default_google_login_email_address,
})
request['__ac_google_hash'] = response.cookies['__ac_google_hash']['value']
with responses.RequestsMock() as rsps, \
mock.patch(
'Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin._setUserNameForAccessLog' 'Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin._setUserNameForAccessLog'
) as _setUserNameForAccessLog: ) as _setUserNameForAccessLog:
credentials = self.portal.acl_users.erp5_google_extraction.extractCredentials(request) rsps.add_callback(
self.assertEqual( responses.GET,
'Google Login', 'https://www.googleapis.com/oauth2/v1/userinfo',
credentials['login_portal_type']) userinfo_callback,
)
credentials = self.portal.acl_users.erp5_google_extraction.extractCredentials(
request)
self.assertEqual(credentials['login_portal_type'], 'Google Login')
self.assertEqual( self.assertEqual(
getUserId(None), credentials['external_login'],
credentials['external_login']) self.default_google_login_email_address)
# this is what will appear in Z2.log # this is what will appear in Z2.log
_setUserNameForAccessLog.assert_called_once_with( _setUserNameForAccessLog.assert_called_once_with(
'erp5_google_extraction=%s' % getUserId(None), 'erp5_google_extraction=dummy@example.com',
request) request)
user_id, login = self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials) user_id, user_name = self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials)
self.assertEqual(person.getUserId(), user_id) self.assertEqual(user_id, self.default_user_person.getUserId())
self.assertEqual(getUserId(None), login) self.assertEqual(user_name, self.default_google_login_email_address)
self.login(user_id) self.login(user_id)
self.assertEqual(self.portal.Base_getUserCaption(), login) self.assertEqual(self.portal.Base_getUserCaption(), user_name)
def test_auth_cookie(self): def test_auth_cookie(self):
request = self.portal.REQUEST request = self.portal.REQUEST
response = request.RESPONSE response = request.RESPONSE
# (the secure flag is only set if we accessed through https) # (the secure flag is only set if we accessed through https)
request.setServerURL('https', 'example.com') request.setServerURL('https', 'example.com')
with mock.patch( redirect_url = urllib.parse.urlparse(
'erp5.component.extension.GoogleLoginUtility.getAccessTokenFromCode', self.portal.ERP5Site_redirectToGoogleLoginPage(RESPONSE=response))
side_effect=getAccessTokenFromCode, state = dict(urllib.parse.parse_qsl(redirect_url.query))['state']
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.GoogleLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.__code__ = getAccessTokenFromCode.__code__
getUserEntry_mock.__code__ = getUserEntry.__code__
self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
getAccessTokenFromCode_mock.assert_called_once()
getUserEntry_mock.assert_called_once()
with self._default_login_responses():
self.portal.ERP5Site_receiveGoogleCallback(code='code', state=state)
ac_cookie, = [v for (k, v) in response.listHeaders() if k.lower() == 'set-cookie' and '__ac_google_hash=' in v] ac_cookie, = [v for (k, v) in response.listHeaders() if k.lower() == 'set-cookie' and '__ac_google_hash=' in v]
self.assertIn('; secure', ac_cookie.lower()) self.assertIn('; secure', ac_cookie.lower())
self.assertIn('; httponly', ac_cookie.lower()) self.assertIn('; httponly', ac_cookie.lower())
self.assertIn('; samesite=lax', ac_cookie.lower()) self.assertIn('; samesite=lax', ac_cookie.lower())
def test_create_user_in_ERP5Site_createGoogleUserToOAuth(self): # make sure user info URL is called for _default_login_responses
""" cookie = six.moves.http_cookies.SimpleCookie()
Check if ERP5 set cookie properly after receive code from external service cookie.load(ac_cookie)
""" resp = self.publish(
self.login() self.portal.getPath(),
id_list = [] env={
for result in self.portal.portal_catalog(portal_type="Credential Request", 'HTTP_COOKIE': '__ac_google_hash="%s"' % cookie.get('__ac_google_hash').value
reference=getUserId(None)): }
id_list.append(result.getObject().getId()) )
self.portal.credential_request_module.manage_delObjects(ids=id_list) self.assertEqual(resp.getStatus(), six.moves.http_client.OK)
skin = self.portal.portal_skins.custom
createZODBPythonScript(skin, "CredentialRequest_createUser", "", """ def test_non_existing_user(self):
person = context.getDestinationDecisionValue(portal_type="Person") request = self.portal.REQUEST
response = request.RESPONSE
login_list = [x for x in person.objectValues(portal_type='Google Login') \
if x.getValidationState() == 'validated'] redirect_url = urllib.parse.urlparse(
self.portal.ERP5Site_redirectToGoogleLoginPage(RESPONSE=response))
if len(login_list): state = dict(urllib.parse.parse_qsl(redirect_url.query))['state']
login = login_list[0]
else: with responses.RequestsMock() as rsps:
login = person.newContent(portal_type='Google Login') rsps.add(
method='POST',
reference = context.getReference() url='https://accounts.google.com/o/oauth2/token',
if not login.hasReference(): json={
if not reference: 'access_token': self.access_token,
raise ValueError("Impossible to create an account without login") 'refresh_token': self.refresh_token,
login.setReference(reference) 'expires_in': 3600,
if not person.Person_getUserId(): },
person.setUserId(reference) )
self.portal.ERP5Site_receiveGoogleCallback(code='code', state=state)
if login.getValidationState() == 'draft':
login.validate() request['__ac_google_hash'] = response.cookies['__ac_google_hash']['value']
with responses.RequestsMock() as rsps:
return reference, None rsps.add(
""") method='GET',
url='https://www.googleapis.com/oauth2/v1/userinfo',
createZODBPythonScript(skin, "ERP5Site_createGoogleUserToOAuth", "user_reference, user_dict", """ json={
module = context.getPortalObject().getDefaultModule(portal_type='Credential Request') "first_name": "Bob",
credential_request = module.newContent( "last_name": "Doe",
portal_type="Credential Request", "email": "unknown@example.com",
first_name=user_dict["first_name"], }
last_name=user_dict["last_name"], )
reference=user_reference, credentials = self.portal.acl_users.erp5_google_extraction.extractCredentials(
default_email_text=user_dict["email"], request)
)
credential_request.submit() self.assertEqual(credentials['login_portal_type'], 'Google Login')
context.portal_alarms.accept_submitted_credentials.activeSense() self.assertEqual(
return credential_request credentials['external_login'],
""") "unknown@example.com")
self.logout()
self.assertIsNone(
self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials))
def test_invalid_cookie(self):
request = self.portal.REQUEST
request['__ac_google_hash'] = '???'
credentials = self.portal.acl_users.erp5_google_extraction.extractCredentials(
request)
self.assertEqual(credentials, {})
def test_refresh_token(self):
request = self.portal.REQUEST
response = request.RESPONSE
redirect_url = urllib.parse.urlparse(
self.portal.ERP5Site_redirectToGoogleLoginPage(RESPONSE=response))
state = dict(urllib.parse.parse_qsl(redirect_url.query))['state']
with self._default_login_responses():
resp = self.publish(
'%s/ERP5Site_receiveGoogleCallback?%s' % (
self.portal.getPath(),
urllib.parse.urlencode(
{
'code': 'code',
'state': state,
}
)
)
)
self.assertEqual(resp.getStatus(), six.moves.http_client.FOUND)
env = {
'HTTP_COOKIE': '__ac_google_hash="%s"' % resp.getCookie('__ac_google_hash')['value']
}
resp = self.publish(self.portal.getPath(), env=env)
self.assertEqual(resp.getStatus(), six.moves.http_client.OK)
def token_callback(request):
self.assertEqual(
request.headers['Content-Type'],
'application/x-www-form-urlencoded')
self.assertEqual(
dict(urllib.parse.parse_qsl(request.body)),
{
'access_type': 'offline',
'client_id': self.client_id,
'client_secret': self.secret_key,
'grant_type': 'refresh_token',
'refresh_token': self.refresh_token,
}
)
return 200, {}, json.dumps({
'access_token': 'new' + self.access_token,
'refresh_token': 'new' + self.refresh_token,
'expires_in': 3600,
})
with mock.patch( with mock.patch(
'erp5.component.extension.GoogleLoginUtility.getAccessTokenFromCode', 'Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin.time.time',
side_effect=getAccessTokenFromCode, return_value=time.time() + 5000), \
) as getAccessTokenFromCode_mock, \ responses.RequestsMock() as rsps:
mock.patch( rsps.add_callback(
'erp5.component.extension.GoogleLoginUtility.getUserEntry', responses.POST,
side_effect=getUserEntry 'https://accounts.google.com/o/oauth2/token',
) as getUserEntry_mock: token_callback,
getAccessTokenFromCode_mock.__code__ = getAccessTokenFromCode.__code__
getUserEntry_mock.__code__ = getUserEntry.__code__
response = self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
getAccessTokenFromCode_mock.assert_called_once()
getUserEntry_mock.assert_called_once()
google_hash = self.portal.REQUEST.RESPONSE.cookies.get("__ac_google_hash")["value"]
self.assertEqual("b01533abb684a658dc71c81da4e67546", google_hash)
absolute_url = self.portal.absolute_url()
self.assertNotEqual(absolute_url[-1], '/')
self.assertEqual(absolute_url + '/', response)
cache_dict = self.portal.Base_getBearerToken(google_hash, "google_server_auth_token_cache_factory")
self.assertEqual(CLIENT_ID, cache_dict["client_id"])
self.assertEqual(ACCESS_TOKEN, cache_dict["access_token"])
self.assertEqual({'reference': getUserId(None)},
self.portal.Base_getBearerToken(ACCESS_TOKEN, "google_server_auth_token_cache_factory")
) )
self.portal.REQUEST["__ac_google_hash"] = google_hash # refreshing the token calls userinfo again
erp5_google_extractor = self.portal.acl_users.erp5_google_extraction rsps.add(
self.assertEqual({'external_login': getUserId(None), method='GET',
'login_portal_type': 'Google Login', url='https://www.googleapis.com/oauth2/v1/userinfo',
'remote_host': '', json={
'remote_address': ''}, erp5_google_extractor.extractCredentials(self.portal.REQUEST)) "first_name": "John",
self.tic() "last_name": "Doe",
self.login() "email": self.default_google_login_email_address,
credential_request = self.portal.portal_catalog(portal_type="Credential Request", }
reference=getUserId(None))[0].getObject() )
credential_request.accept() resp = self.publish(self.portal.getPath(), env=env)
person = credential_request.getDestinationDecisionValue() self.assertEqual(resp.getStatus(), six.moves.http_client.OK)
google_login = person.objectValues(portal_types="Google Login")[0]
self.assertEqual(getUserId(None), google_login.getReference()) resp = self.publish(self.portal.getPath(), env=env)
self.assertEqual(resp.getStatus(), six.moves.http_client.OK)
def test_refresh_token_expired(self):
request = self.portal.REQUEST
response = request.RESPONSE
redirect_url = urllib.parse.urlparse(
self.portal.ERP5Site_redirectToGoogleLoginPage(RESPONSE=response))
state = dict(urllib.parse.parse_qsl(redirect_url.query))['state']
with self._default_login_responses():
resp = self.publish(
'%s/ERP5Site_receiveGoogleCallback?%s' % (
self.portal.getPath(),
urllib.parse.urlencode(
{
'code': 'code',
'state': state,
}
)
)
)
self.assertEqual(resp.getStatus(), six.moves.http_client.FOUND)
env = {
'HTTP_COOKIE': '__ac_google_hash="%s"' % resp.getCookie('__ac_google_hash')['value']
}
resp = self.publish(self.portal.getPath(), env=env)
self.assertEqual(resp.getStatus(), six.moves.http_client.OK)
with mock.patch(
'Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin.time.time',
return_value=time.time() + 5000), \
responses.RequestsMock() as rsps:
rsps.add(
method='POST',
url='https://accounts.google.com/o/oauth2/token',
status=six.moves.http_client.UNAUTHORIZED,
)
resp = self.publish(self.portal.getPath(), env=env)
self.assertEqual(resp.getStatus(), six.moves.http_client.FOUND)
self.assertIn('/login_form', resp.getHeader('Location'))
def test_logout(self): def test_logout(self):
resp = self.publish(self.portal.getId() + '/logout') resp = self.publish(self.portal.getId() + '/logout')
...@@ -328,9 +411,9 @@ class TestERP5JSGoogleLogin(GoogleLoginTestCase): ...@@ -328,9 +411,9 @@ class TestERP5JSGoogleLogin(GoogleLoginTestCase):
if img.attrib['alt'] == 'Sign in with Google' if img.attrib['alt'] == 'Sign in with Google'
] ]
self.assertIn('/ERP5Site_redirectToGoogleLoginPage', google_login_link) self.assertIn('/ERP5Site_redirectToGoogleLoginPage', google_login_link)
resp = self.publish(urlparse.urlparse(google_login_link).path) resp = self.publish(urllib.parse.urlparse(google_login_link).path)
# this request redirects to google # this request redirects to google
self.assertEqual(resp.getStatus(), httplib.FOUND) self.assertEqual(resp.getStatus(), six.moves.http_client.FOUND)
self.assertIn('google.com', resp.getHeader('Location')) self.assertIn('google.com', resp.getHeader('Location'))
def test_logout(self): def test_logout(self):
......
document.erp5.GoogleConnector
\ No newline at end of file
extension.erp5.GoogleLoginUtility
\ No newline at end of file
Google Connector | OAuthClient Google Connector | OAuthClient
Google Connector | SocketClient
Template Tool | TemplateToolERP5GoogleExtractionPluginConstraint Template Tool | TemplateToolERP5GoogleExtractionPluginConstraint
\ No newline at end of file
...@@ -35,54 +35,22 @@ from Products.PluggableAuthService.utils import classImplements ...@@ -35,54 +35,22 @@ from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.ERP5Security import _setUserNameForAccessLog from Products.ERP5Security import _setUserNameForAccessLog
from AccessControl.SecurityManagement import getSecurityManager, \
setSecurityManager, newSecurityManager
from Products.ERP5Type.Cache import DEFAULT_CACHE_SCOPE from Products.ERP5Type.Cache import DEFAULT_CACHE_SCOPE
import time import time
from six.moves import urllib from zLOG import LOG, INFO
import json
from zLOG import LOG, ERROR, INFO
try: try:
import facebook import facebook
except ImportError: except ImportError:
facebook = None facebook = None
try:
import apiclient.discovery
import httplib2
import oauth2client.client
except ImportError:
httplib2 = None
#Form for new plugin in ZMI # Form for new plugin in ZMI
manage_addERP5FacebookExtractionPluginForm = PageTemplateFile( manage_addERP5FacebookExtractionPluginForm = PageTemplateFile(
'www/ERP5Security_addERP5FacebookExtractionPlugin', globals(), 'www/ERP5Security_addERP5FacebookExtractionPlugin', globals(),
__name__='manage_addERP5FacebookExtractionPluginForm') __name__='manage_addERP5FacebookExtractionPluginForm')
def getGoogleUserEntry(token):
if httplib2 is None:
LOG('ERP5GoogleExtractionPlugin', INFO,
'No Google modules available, please install google-api-python-client '
'package. Authentication disabled..')
return None
http = oauth2client.client.AccessTokenCredentials(token,
'ERP5 Client'
).authorize(httplib2.Http(timeout=5))
service = apiclient.discovery.build("oauth2", "v1", http=http)
google_entry = service.userinfo().get().execute()
user_entry = {}
if google_entry is not None:
# sanitise value
for k in (('first_name', 'given_name'),
('last_name', 'family_name'),
('email', 'email'),
('reference', 'email'),):
value = google_entry.get(k[1], '').encode('utf-8')
user_entry[k[0]] = value
return user_entry
def addERP5FacebookExtractionPlugin(dispatcher, id, title=None, REQUEST=None): def addERP5FacebookExtractionPlugin(dispatcher, id, title=None, REQUEST=None):
""" Add a ERP5FacebookExtractionPlugin to a Pluggable Auth Service. """ """ Add a ERP5FacebookExtractionPlugin to a Pluggable Auth Service. """
...@@ -97,7 +65,7 @@ def addERP5FacebookExtractionPlugin(dispatcher, id, title=None, REQUEST=None): ...@@ -97,7 +65,7 @@ def addERP5FacebookExtractionPlugin(dispatcher, id, title=None, REQUEST=None):
'ERP5FacebookExtractionPlugin+added.' 'ERP5FacebookExtractionPlugin+added.'
% dispatcher.absolute_url()) % dispatcher.absolute_url())
#Form for new plugin in ZMI # Form for new plugin in ZMI
manage_addERP5GoogleExtractionPluginForm = PageTemplateFile( manage_addERP5GoogleExtractionPluginForm = PageTemplateFile(
'www/ERP5Security_addERP5GoogleExtractionPlugin', globals(), 'www/ERP5Security_addERP5GoogleExtractionPlugin', globals(),
__name__='manage_addERP5GoogleExtractionPluginForm') __name__='manage_addERP5GoogleExtractionPluginForm')
...@@ -156,12 +124,12 @@ class ERP5ExternalOauth2ExtractionPlugin: ...@@ -156,12 +124,12 @@ class ERP5ExternalOauth2ExtractionPlugin:
for cache_plugin in cache_factory.getCachePluginList(): for cache_plugin in cache_factory.getCachePluginList():
cache_entry = cache_plugin.get(key, DEFAULT_CACHE_SCOPE) cache_entry = cache_plugin.get(key, DEFAULT_CACHE_SCOPE)
if cache_entry is not None: if cache_entry is not None:
# Avoid errors if the plugin don't have the funcionality of refresh token
refreshTokenIfExpired = getattr(self, "refreshTokenIfExpired", None)
cache_value = cache_entry.getValue() cache_value = cache_entry.getValue()
if refreshTokenIfExpired is not None: # getToken is called for the access_token_dict and for
return refreshTokenIfExpired(key, cache_value) # the user entry. We try to refresh only for the
else: # access_token_dict
if 'refresh_token' in cache_value:
return self.refreshTokenIfExpired(key, cache_value)
return cache_value return cache_value
raise KeyError('Key %r not found' % key) raise KeyError('Key %r not found' % key)
...@@ -170,19 +138,29 @@ class ERP5ExternalOauth2ExtractionPlugin: ...@@ -170,19 +138,29 @@ class ERP5ExternalOauth2ExtractionPlugin:
#################################### ####################################
security.declarePrivate('extractCredentials') security.declarePrivate('extractCredentials')
def extractCredentials(self, request): def extractCredentials(self, request):
""" Extract Oauth2 credentials from the request header. """ """ Extract Oauth2 credentials from cookie.
user_dict = {}
This plugins uses two level of cache storage:
- cookie_value => access_token_dict
- access_token => user_entry
access_token_dict depends on the concrete plugin classes,
but this is generally access_token and refresh_token.
user_entry must contain "reference", which is the reference
of the corresponding login document in ERP5.
"""
access_token_dict = {}
cookie_hash = request.get(self.cookie_name) cookie_hash = request.get(self.cookie_name)
if cookie_hash is not None: if cookie_hash is not None:
try: try:
user_dict = self.getToken(cookie_hash) access_token_dict = self.getToken(cookie_hash)
except KeyError: except KeyError:
LOG(self.getId(), INFO, 'Hash %s not found' % cookie_hash) LOG(self.getId(), INFO, 'Hash %s not found' % cookie_hash)
return {} return {}
token = None token = None
if "access_token" in user_dict: if "access_token" in access_token_dict:
token = user_dict["access_token"] token = access_token_dict["access_token"]
if token is None: if token is None:
# no token, then no credentials # no token, then no credentials
...@@ -192,7 +170,7 @@ class ERP5ExternalOauth2ExtractionPlugin: ...@@ -192,7 +170,7 @@ class ERP5ExternalOauth2ExtractionPlugin:
try: try:
user_entry = self.getToken(token) user_entry = self.getToken(token)
except KeyError: except KeyError:
user_entry = self.getUserEntry(token) user_entry = self.getUserEntry(access_token_dict)
if user_entry is not None: if user_entry is not None:
# Reduce data size because, we don't need more than reference # Reduce data size because, we don't need more than reference
user_entry = {"reference": user_entry["reference"]} user_entry = {"reference": user_entry["reference"]}
...@@ -259,40 +237,36 @@ class ERP5FacebookExtractionPlugin(ERP5ExternalOauth2ExtractionPlugin, BasePlugi ...@@ -259,40 +237,36 @@ class ERP5FacebookExtractionPlugin(ERP5ExternalOauth2ExtractionPlugin, BasePlugi
cookie_name = "__ac_facebook_hash" cookie_name = "__ac_facebook_hash"
cache_factory_name = "facebook_server_auth_token_cache_factory" cache_factory_name = "facebook_server_auth_token_cache_factory"
def refreshTokenIfExpired(self, key, cache_value): def refreshTokenIfExpired(self, key, access_token_dict):
return cache_value return access_token_dict
def getUserEntry(self, token): def getUserEntry(self, token):
return getFacebookUserDict(token) return getFacebookUserDict(token)
class ERP5GoogleExtractionPlugin(ERP5ExternalOauth2ExtractionPlugin, BasePlugin): class ERP5GoogleExtractionPlugin(ERP5ExternalOauth2ExtractionPlugin, BasePlugin):
""" """
Plugin to authenicate as machines. Plugin to authenticate using google OAuth2.
""" """
meta_type = "ERP5 Google Extraction Plugin" meta_type = "ERP5 Google Extraction Plugin"
login_portal_type = "Google Login" login_portal_type = "Google Login"
cookie_name = "__ac_google_hash" cookie_name = "__ac_google_hash"
cache_factory_name = "google_server_auth_token_cache_factory" cache_factory_name = "google_server_auth_token_cache_factory"
def refreshTokenIfExpired(self, key, cache_value): def refreshTokenIfExpired(self, key, access_token_dict):
expires_in = cache_value.get("token_response", {}).get("expires_in") if (time.time() - access_token_dict["response_timestamp"]) \
refresh_token = cache_value.get("refresh_token") >= access_token_dict['expires_in']:
if expires_in and refresh_token: access_token_dict = self.getPortalObject().unrestrictedTraverse(
if (time.time() - cache_value["response_timestamp"]) >= float(expires_in): access_token_dict['connector_relative_url']
credential = oauth2client.client.OAuth2Credentials( ).refreshToken(access_token_dict)
cache_value["access_token"], cache_value["client_id"], self.setToken(key, access_token_dict)
cache_value["client_secret"], refresh_token, return access_token_dict
cache_value["token_expiry"], cache_value["token_uri"],
cache_value["user_agent"]) def getUserEntry(self, access_token_dict):
credential.refresh(httplib2.Http(timeout=5)) return self.getPortalObject().unrestrictedTraverse(
cache_value = json.loads(credential.to_json()) access_token_dict['connector_relative_url'],
cache_value["response_timestamp"] = time.time() ).getUserEntry(access_token_dict['access_token'])
self.setToken(key, cache_value)
return cache_value
def getUserEntry(self, token):
return getGoogleUserEntry(token)
#List implementation of class #List implementation of class
classImplements( ERP5FacebookExtractionPlugin, classImplements( ERP5FacebookExtractionPlugin,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment