Commit f355bcbf authored by Jérome Perrin's avatar Jérome Perrin

ERP5Security: make plugins log a username

This works only for medusa, using the same approach as CMFCore's
CookieCrumbler

Also increase test coverage of google/facebook plugins.

/reviewed-on !901
parents d5cfa2cc cc03d4fa
......@@ -33,6 +33,7 @@ from Products.PluggableAuthService.interfaces.plugins import IAuthenticationPlug
from DateTime import DateTime
import base64
import StringIO
import mock
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Security.ERP5DumbHTTPExtractionPlugin import ERP5DumbHTTPExtractionPlugin
......@@ -110,12 +111,18 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = self._getTokenCredential(self.portal.REQUEST)
with mock.patch(
'Products.ERP5Security.ERP5AccessTokenExtractionPlugin._setUserNameForAccessLog'
) as _setUserNameForAccessLog:
result = self._getTokenCredential(self.portal.REQUEST)
self.assertTrue(result)
user_id, login = result
self.assertEqual(user_id, person.Person_getUserId())
# tokens have a login value, for auditing purposes
self.assertEqual(access_token.getRelativeUrl(), login)
# tokens have a login value, for auditing purposes. This is the ID of the plugin
# and the relative URL of the token.
self.assertEqual('erp5_access_token_plugin=%s' % access_token.getRelativeUrl(), login)
# this is also what will appear in Z2.log
_setUserNameForAccessLog.assert_called_once_with(login, self.portal.REQUEST)
def test_bad_token(self):
person = self._createPerson(self.new_id)
......
......@@ -26,8 +26,8 @@
##############################################################################
import uuid
import mock
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from erp5.component.extension import FacebookLoginUtility
from Products.ERP5Type.tests.utils import createZODBPythonScript
CLIENT_ID = "a1b2c3"
......@@ -50,28 +50,15 @@ def getUserEntry(access_token):
'reference': getUserId(None),
'email': "dummy@example.org"}
FacebookLoginUtility_getAccessTokenFromCode = FacebookLoginUtility.getAccessTokenFromCode
FacebookLoginUtility_getUserEntry = FacebookLoginUtility.getUserEntry
class TestFacebookLogin(ERP5TypeTestCase):
def getTitle(self):
return "Test Facebook Login"
def beforeTearDown(self):
FacebookLoginUtility.getAccessTokenFromCode = FacebookLoginUtility_getAccessTokenFromCode
FacebookLoginUtility.getUserEntry = FacebookLoginUtility_getUserEntry
def afterSetUp(self):
"""
This is ran before anything, used to set the environment
"""
self.login()
self.portal.TemplateTool_checkFacebookExtractionPluginExistenceConsistency(fixit=True)
# Patch extension to avoid external connection
FacebookLoginUtility.getUserId = getUserId
FacebookLoginUtility.getAccessTokenFromCode = getAccessTokenFromCode
FacebookLoginUtility.getUserEntry = getUserEntry
self.dummy_connector_id = "test_facebook_connector"
portal_catalog = self.portal.portal_catalog
......@@ -111,13 +98,73 @@ class TestFacebookLogin(ERP5TypeTestCase):
self.assertNotIn("secret_key=", location)
self.assertIn("ERP5Site_callbackFacebookLogin", location)
def test_existing_user(self):
self.login()
person = self.portal.person_module.newContent(
portal_type='Person',
)
person.newContent(
portal_type='Facebook Login',
reference=getUserId(None)
).validate()
person.newContent(portal_type='Assignment').open()
self.tic()
self.logout()
request = self.portal.REQUEST
response = request.RESPONSE
with mock.patch(
'erp5.component.extension.FacebookLoginUtility.getAccessTokenFromCode',
side_effect=getAccessTokenFromCode,
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.FacebookLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.func_code = getAccessTokenFromCode.func_code
getUserEntry_mock.func_code = getUserEntry.func_code
self.portal.ERP5Site_callbackFacebookLogin(code=CODE)
getAccessTokenFromCode_mock.assert_called_once()
getUserEntry_mock.assert_called_once()
request["__ac_facebook_hash"] = response.cookies["__ac_facebook_hash"]["value"]
with mock.patch(
'Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin._setUserNameForAccessLog'
) as _setUserNameForAccessLog:
credentials = self.portal.acl_users.erp5_facebook_extraction.extractCredentials(request)
self.assertEqual(
'Facebook Login',
credentials['login_portal_type'])
self.assertEqual(
getUserId(None),
credentials['external_login'])
# this is what will appear in Z2.log
_setUserNameForAccessLog.assert_called_once_with(
'erp5_facebook_extraction=%s' % getUserId(None),
request)
user_id, login = self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials)
self.assertEqual(person.getUserId(), user_id)
self.assertEqual(getUserId(None), login)
def test_auth_cookie(self):
request = self.portal.REQUEST
response = request.RESPONSE
# (the secure flag is only set if we accessed through https)
request.setServerURL('https', 'example.com')
with mock.patch(
'erp5.component.extension.FacebookLoginUtility.getAccessTokenFromCode',
side_effect=getAccessTokenFromCode,
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.FacebookLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.func_code = getAccessTokenFromCode.func_code
getUserEntry_mock.func_code = getUserEntry.func_code
self.portal.ERP5Site_callbackFacebookLogin(code=CODE)
self.portal.ERP5Site_callbackFacebookLogin(code=CODE)
ac_cookie, = [v for (k, v) in response.listHeaders() if k.lower() == 'set-cookie' and '__ac_facebook_hash=' in v]
self.assertIn('; Secure', ac_cookie)
self.assertIn('; HTTPOnly', ac_cookie)
......@@ -171,7 +218,17 @@ context.portal_alarms.accept_submitted_credentials.activeSense()
return credential_request
""")
self.logout()
response = self.portal.ERP5Site_callbackFacebookLogin(code=CODE)
with mock.patch(
'erp5.component.extension.FacebookLoginUtility.getAccessTokenFromCode',
side_effect=getAccessTokenFromCode,
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.FacebookLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.func_code = getAccessTokenFromCode.func_code
getUserEntry_mock.func_code = getUserEntry.func_code
response = self.portal.ERP5Site_callbackFacebookLogin(code=CODE)
facebook_hash = self.portal.REQUEST.RESPONSE.cookies.get("__ac_facebook_hash")["value"]
self.assertEqual("8cec04e21e927f1023f4f4980ec11a77", facebook_hash)
# The # is because we workaround facebook adding #_=_ in return URL
......
......@@ -26,10 +26,11 @@
##############################################################################
import uuid
import mock
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from erp5.component.extension import GoogleLoginUtility
from Products.ERP5Type.tests.utils import createZODBPythonScript
CLIENT_ID = "a1b2c3"
SECRET_KEY = "3c2ba1"
ACCESS_TOKEN = "T1234"
......@@ -87,28 +88,15 @@ def getUserEntry(access_token):
"reference": getUserId(None)
}
GoogleLoginUtility_getAccessTokenFromCode = GoogleLoginUtility.getAccessTokenFromCode
GoogleLoginUtility_getUserEntry = GoogleLoginUtility.getUserEntry
class TestGoogleLogin(ERP5TypeTestCase):
def getTitle(self):
return "Test Google Login"
def beforeTearDown(self):
GoogleLoginUtility.getAccessTokenFromCode = GoogleLoginUtility_getAccessTokenFromCode
GoogleLoginUtility.getUserEntry = GoogleLoginUtility_getUserEntry
def afterSetUp(self):
"""
This is ran before anything, used to set the environment
"""
self.login()
self.portal.TemplateTool_checkGoogleExtractionPluginExistenceConsistency(fixit=True)
# Patch extension to avoid external connection
GoogleLoginUtility.getUserId = getUserId
GoogleLoginUtility.getAccessTokenFromCode = getAccessTokenFromCode
GoogleLoginUtility.getUserEntry = getUserEntry
self.dummy_connector_id = "test_google_connector"
portal_catalog = self.portal.portal_catalog
......@@ -148,13 +136,77 @@ class TestGoogleLogin(ERP5TypeTestCase):
self.assertNotIn("secret_key=", location)
self.assertIn("ERP5Site_receiveGoogleCallback", location)
def test_existing_user(self):
self.login()
person = self.portal.person_module.newContent(
portal_type='Person',
)
person.newContent(
portal_type='Google Login',
reference=getUserId(None)
).validate()
person.newContent(portal_type='Assignment').open()
self.tic()
self.logout()
request = self.portal.REQUEST
response = request.RESPONSE
with mock.patch(
'erp5.component.extension.GoogleLoginUtility.getAccessTokenFromCode',
side_effect=getAccessTokenFromCode,
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.GoogleLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.func_code = getAccessTokenFromCode.func_code
getUserEntry_mock.func_code = getUserEntry.func_code
self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
getAccessTokenFromCode_mock.assert_called_once()
getUserEntry_mock.assert_called_once()
request["__ac_google_hash"] = response.cookies["__ac_google_hash"]["value"]
with mock.patch(
'Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin._setUserNameForAccessLog'
) as _setUserNameForAccessLog:
credentials = self.portal.acl_users.erp5_google_extraction.extractCredentials(request)
self.assertEqual(
'Google Login',
credentials['login_portal_type'])
self.assertEqual(
getUserId(None),
credentials['external_login'])
# this is what will appear in Z2.log
_setUserNameForAccessLog.assert_called_once_with(
'erp5_google_extraction=%s' % getUserId(None),
request)
user_id, login = self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials)
self.assertEqual(person.getUserId(), user_id)
self.assertEqual(getUserId(None), login)
def test_auth_cookie(self):
request = self.portal.REQUEST
response = request.RESPONSE
# (the secure flag is only set if we accessed through https)
request.setServerURL('https', 'example.com')
self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
with mock.patch(
'erp5.component.extension.GoogleLoginUtility.getAccessTokenFromCode',
side_effect=getAccessTokenFromCode,
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.GoogleLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.func_code = getAccessTokenFromCode.func_code
getUserEntry_mock.func_code = getUserEntry.func_code
self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
getAccessTokenFromCode_mock.assert_called_once()
getUserEntry_mock.assert_called_once()
ac_cookie, = [v for (k, v) in response.listHeaders() if k.lower() == 'set-cookie' and '__ac_google_hash=' in v]
self.assertIn('; Secure', ac_cookie)
self.assertIn('; HTTPOnly', ac_cookie)
......@@ -209,7 +261,21 @@ context.portal_alarms.accept_submitted_credentials.activeSense()
return credential_request
""")
self.logout()
response = self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
with mock.patch(
'erp5.component.extension.GoogleLoginUtility.getAccessTokenFromCode',
side_effect=getAccessTokenFromCode,
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.GoogleLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.func_code = getAccessTokenFromCode.func_code
getUserEntry_mock.func_code = getUserEntry.func_code
response = self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
getAccessTokenFromCode_mock.assert_called_once()
getUserEntry_mock.assert_called_once()
google_hash = self.portal.REQUEST.RESPONSE.cookies.get("__ac_google_hash")["value"]
self.assertEqual("b01533abb684a658dc71c81da4e67546", google_hash)
self.assertEqual(self.portal.absolute_url(), response)
......
......@@ -38,6 +38,8 @@ from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod
from Products.ERP5Security import _setUserNameForAccessLog
class ERP5AccessTokenExtractionPlugin(BasePlugin):
"""
......@@ -68,6 +70,7 @@ class ERP5AccessTokenExtractionPlugin(BasePlugin):
creds['erp5_access_token_id'] = token
creds['remote_host'] = request.get('REMOTE_HOST', '')
creds['remote_address'] = request.getClientAddr()
creds['request'] = request
return creds
#######################
......@@ -86,7 +89,9 @@ class ERP5AccessTokenExtractionPlugin(BasePlugin):
if method is not None:
user_value = method()
if user_value is not None:
return (user_value.getUserId(), token_document.getRelativeUrl())
username = '%s=%s' % (self.getId(), token_document.getRelativeUrl())
_setUserNameForAccessLog(username, credentials['request'])
return (user_value.getUserId(), username)
#Form for new plugin in ZMI
......
......@@ -33,7 +33,8 @@ from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.interfaces import plugins
from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products import ERP5Security
from Products.ERP5Security import _setUserNameForAccessLog
from AccessControl.SecurityManagement import getSecurityManager, \
setSecurityManager, newSecurityManager
from Products.ERP5Type.Cache import DEFAULT_CACHE_SCOPE
......@@ -224,6 +225,8 @@ class ERP5ExternalOauth2ExtractionPlugin:
creds['remote_address'] = request.getClientAddr()
except AttributeError:
creds['remote_address'] = request.get('REMOTE_ADDR', '')
_setUserNameForAccessLog('%s=%s' % (self.getId(), creds['external_login']) , request)
return creds
def getFacebookUserEntry(token):
......
......@@ -17,6 +17,7 @@
from copy import deepcopy
from collections import defaultdict
from base64 import encodestring
from Acquisition import aq_inner, aq_parent
from AccessControl.Permissions import manage_users as ManageUsers
......@@ -53,6 +54,23 @@ def mergedLocalRoles(object):
break
return deepcopy(merged)
def _setUserNameForAccessLog(username, REQUEST):
"""Make the current user look as `username` in Zope's Z2.log
Taken from Products.CMFCore.CookieCrumbler._setAuthHeader
"""
# Set the authorization header in the medusa http request
# so that the username can be logged to the Z2.log
try:
# Put the full-arm latex glove on now...
medusa_headers = REQUEST.RESPONSE.stdout._request._header_cache
except AttributeError:
pass
else:
medusa_headers['authorization'] = 'Basic %s' % encodestring('%s:' % username).rstrip()
def initialize(context):
import ERP5UserManager
import ERP5LoginUserManager
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment