Commit fe6236be authored by Jérome Perrin's avatar Jérome Perrin

Fix AccessToken login with ERP5 Login

Backport nexedi/erp5!838

/reviewed-on https://lab.nexedi.com/nexedi/erp5-capago/merge_requests/37
parents 7765bbe7 91ea6e2d
......@@ -6,4 +6,7 @@
<item>Reference</item>
<item>Url</item>
</portal_type>
<portal_type id="Template Tool">
<item>TemplateToolERP5AccessTokenExtractionPluginConstraint</item>
</portal_type>
</property_sheet_list>
\ No newline at end of file
......@@ -2,122 +2,65 @@
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Test Component" module="erp5.portal_type"/>
<global name="Property Sheet" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<key> <string>_count</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>testERP5DumbHTTPExtractionPlugin</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test.erp5.testERP5DumbHTTPExtractionPlugin</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Test Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<key> <string>_mt_index</string> </key>
<value>
<none/>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<key> <string>_tree</string> </key>
<value>
<tuple/>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<key> <string>description</string> </key>
<value>
<tuple/>
<none/>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
<key> <string>id</string> </key>
<value> <string>TemplateToolERP5AccessTokenExtractionPluginConstraint</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
<key> <string>portal_type</string> </key>
<value> <string>Property Sheet</string> </value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
<global name="Length" module="BTrees.Length"/>
</pickle>
<pickle> <int>0</int> </pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
<global name="OOBTree" module="BTrees.OOBTree"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
<none/>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.patches.WorkflowTool"/>
<global name="OOBTree" module="BTrees.OOBTree"/>
</pickle>
<pickle>
<tuple>
<none/>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</tuple>
</pickle>
</record>
</ZopeData>
......@@ -2,61 +2,47 @@
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Test Component" module="erp5.portal_type"/>
<global name="Script Constraint" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<key> <string>_identity_criterion</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>testERP5AccessTokenAlarm</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<key> <string>_range_criterion</string> </key>
<value>
<none/>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test.erp5.testERP5AccessTokenAlarm</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Test Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<key> <string>categories</string> </key>
<value>
<none/>
<tuple>
<string>constraint_type/post_upgrade</string>
</tuple>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<key> <string>description</string> </key>
<value>
<tuple/>
<none/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple/>
</value>
<key> <string>id</string> </key>
<value> <string>ERP5AccessTokenExtractionPlugin_existence_constraint</string> </value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
<key> <string>portal_type</string> </key>
<value> <string>Script Constraint</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
<key> <string>script_id</string> </key>
<value> <string>TemplateTool_checkERP5AccessTokenExtractionPluginExistenceConsistency</string> </value>
</item>
</dictionary>
</pickle>
......@@ -85,39 +71,10 @@
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.patches.WorkflowTool"/>
</pickle>
<pickle>
<tuple>
<none/>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</tuple>
</pickle>
</record>
</ZopeData>
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>AccessToken_getExternalLogin</string> </value>
<value> <string>AccessToken_getUserId</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>RestrictedAccessToken_getExternalLogin</string> </value>
<value> <string>OneTimeRestrictedAccessToken_getUserId</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -22,6 +22,21 @@ if access_token_document.getValidationState() == 'validated':
agent_document = access_token_document.getAgentValue()
if agent_document is not None:
if agent_document.getPortalType() == 'Person':
# if this is a token for a person, only make accept if person has valid
# assignments (for compatibility with login/password authentication)
if agent_document.getValidationState() == 'deleted':
return None
now = DateTime()
for assignment in agent_document.contentValues(portal_type='Assignment'):
if assignment.getValidationState() == "open" and (
not assignment.hasStartDate() or assignment.getStartDate() <= now
) and (
not assignment.hasStopDate() or assignment.getStopDate() >= now
):
break
else:
return None
result = agent_document.Person_getUserId()
return result
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>OneTimeRestrictedAccessToken_getExternalLogin</string> </value>
<value> <string>RestrictedAccessToken_getUserId</string> </value>
</item>
</dictionary>
</pickle>
......
alpha = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
random_id = ''
for i in range(0,128):
for _ in range(0, 128):
random_id += random.choice(alpha)
# Define Reference from ID provided by portal_ids
......
acl_users = context.getPortalObject().acl_users
token_extraction_id = "erp5_access_token_plugin"
access_token_plugin_list = [
plugin for plugin in acl_users.objectValues()
if plugin.meta_type == 'ERP5 Access Token Extraction Plugin']
if len(access_token_plugin_list) > 1:
return ["More than one plugin found: %s" % access_token_plugin_list]
error_list = []
if not access_token_plugin_list:
# A dumb http extraction plugin is required as fallback if we use an access token
# since https://github.com/Nexedi/erp5/commit/0bee523da0075c6efe3c06296dddd01d9dd5045a
# we enable it automatically at site creation, but for compatibility with old instances
# make sure it is created if needed
if 'erp5_dumb_http_extraction' not in acl_users.objectIds():
error_list.append("erp5_dumb_http_extraction is missing")
if fixit:
dispacher = acl_users.manage_addProduct['ERP5Security']
dispacher.addERP5DumbHTTPExtractionPlugin('erp5_dumb_http_extraction')
acl_users.erp5_dumb_http_extraction.manage_activateInterfaces(('IExtractionPlugin', ))
error_list.append("erp5_access_token_plugin is missing")
if fixit:
dispacher = acl_users.manage_addProduct['ERP5Security']
dispacher.addERP5AccessTokenExtractionPlugin(token_extraction_id)
access_token_plugin_list = [getattr(acl_users, token_extraction_id)]
if access_token_plugin_list:
access_token_plugin, = access_token_plugin_list
# We only check that our plugin is enabled for IAuthenticationPlugin, this covers both
# cases where plugin was not enabled at all or was enabled only for IExtractionPlugin
IAuthenticationPlugin = [
# Products.PluggableAuthService.interfaces.plugins.IAuthenticationPlugin cannot
# be imported in restricted python but we can get it this way.
x for x in acl_users.plugins.listPluginTypeInfo()
if x['id'] == 'IAuthenticationPlugin'][0]['interface']
if (access_token_plugin.getId()
not in acl_users.plugins.listPluginIds(IAuthenticationPlugin)):
error_list.append("erp5_access_token_plugin is not activated")
if fixit:
access_token_plugin.manage_activateInterfaces((
'IExtractionPlugin',
'IAuthenticationPlugin',))
return error_list
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>fixit=False</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>TemplateTool_checkERP5AccessTokenExtractionPluginExistenceConsistency</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
# Copyright (c) 2002-2013 Nexedi SA and Contributors. All Rights Reserved.
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2002-2019 Nexedi SA and Contributors. All Rights Reserved.
# Tristan Cavelier <tristan.cavelier@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
from Products.PluggableAuthService.interfaces.plugins import IAuthenticationPlugin
from DateTime import DateTime
import base64
import StringIO
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
import transaction
class TestERP5AccessTokenSkins(ERP5TypeTestCase):
from Products.ERP5Security.ERP5DumbHTTPExtractionPlugin import ERP5DumbHTTPExtractionPlugin
test_token_extraction_id = 'test_erp5_access_token_extraction'
class AccessTokenTestCase(ERP5TypeTestCase):
def getBusinessTemplateList(self):
return ('erp5_base',
'erp5_access_token')
def _createPerson(self, new_id, password=None):
"""Creates a person in person module, and returns the object, after
indexing is done. """
person_module = self.getPersonModule()
person = person_module.newContent(portal_type='Person',
reference='TESTP-' + new_id)
if password:
person.setPassword(password)
person.newContent(portal_type = 'Assignment').open()
self.tic()
return person
class TestERP5AccessTokenSkins(AccessTokenTestCase):
def generateNewId(self):
return str(self.portal.portal_ids.generateNewId(
id_group=('erp5_access_token_test_id')))
......@@ -19,40 +65,15 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
"""
This is ran before anything, used to set the environment
"""
self.portal = self.getPortalObject()
self.new_id = self.generateNewId()
self._setupAccessTokenExtraction()
transaction.commit()
self.portal.portal_templates.TemplateTool_checkERP5AccessTokenExtractionPluginExistenceConsistency(
fixit=True)
self.tic()
def _setupAccessTokenExtraction(self):
pas = self.portal.acl_users
access_extraction_list = [q for q in pas.objectValues() \
if q.meta_type == 'ERP5 Access Token Extraction Plugin']
if len(access_extraction_list) == 0:
dispacher = pas.manage_addProduct['ERP5Security']
dispacher.addERP5AccessTokenExtractionPlugin(self.test_token_extraction_id)
getattr(pas, self.test_token_extraction_id).manage_activateInterfaces(
('IExtractionPlugin',))
elif len(access_extraction_list) == 1:
self.test_token_extraction_id = access_extraction_list[0].getId()
elif len(access_extraction_list) > 1:
raise ValueError
transaction.commit()
def _createPerson(self, new_id):
"""Creates a person in person module, and returns the object, after
indexing is done. """
person_module = self.getPersonModule()
person = person_module.newContent(portal_type='Person',
user_id='TESTP-' + new_id)
person.newContent(portal_type = 'Assignment').open()
transaction.commit()
return person
def _getTokenCredential(self, request):
plugin = getattr(self.portal.acl_users, self.test_token_extraction_id)
return plugin.extractCredentials(request)
"""Authenticate the request and return (user_id, login) or None if not authorized."""
plugin = self.portal.acl_users.erp5_access_token_plugin
return plugin.authenticateCredentials(plugin.extractCredentials(request))
def _createRestrictedAccessToken(self, new_id, person, method, url_string):
access_token = self.portal.access_token_module.newContent(
......@@ -75,7 +96,7 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
return access_token
def test_working_token(self):
person = self.person = self._createPerson(self.new_id)
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -91,10 +112,14 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = self._getTokenCredential(self.portal.REQUEST)
self.assertEqual(result.get('external_login'), person.Person_getUserId())
self.assertTrue(result)
user_id, login = result
self.assertEqual(user_id, person.Person_getUserId())
# tokens have a login value, for auditing purposes
self.assertIn('token', login)
def test_bad_token(self):
person = self.person = self._createPerson(self.new_id)
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -110,10 +135,32 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = self._getTokenCredential(self.portal.REQUEST)
self.assertEqual(result, {})
self.assertFalse(result)
def test_token_without_assignment(self):
# Token does not work when person has no open assignment
person = self._createPerson(self.new_id)
for assignment in person.contentValues(portal_type='Assignment'):
assignment.close()
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
person,
access_method,
access_url)
access_token.validate()
self.tic()
self.portal.REQUEST.form["access_token"] = access_token.getId()
self.portal.REQUEST["REQUEST_METHOD"] = access_method
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = self._getTokenCredential(self.portal.REQUEST)
self.assertFalse(result)
def test_RestrictedAccessToken_getExternalLogin(self):
person = self.person = self._createPerson(self.new_id)
def test_RestrictedAccessToken_getUserId(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -127,13 +174,13 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserId()
self.assertEqual(result, person.Person_getUserId())
self.assertEqual(access_token.getValidationState(), 'validated')
def test_RestrictedAccessToken_getExternalLogin_access_token_secret(self):
person = self.person = self._createPerson(self.new_id)
def test_RestrictedAccessToken_getUserId_access_token_secret(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -146,7 +193,7 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST["REQUEST_METHOD"] = access_method
self.portal.REQUEST["ACTUAL_URL"] = access_url
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserId()
self.assertEqual(result, None)
self.portal.REQUEST.form["access_token_secret"] = "XYXYXYXY"
......@@ -154,12 +201,12 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserId()
self.assertEqual(result, person.Person_getUserId())
self.assertEqual(access_token.getValidationState(), 'validated')
def test_RestrictedAccessToken_getExternalLogin_no_agent(self):
def test_RestrictedAccessToken_getUserId_no_agent(self):
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -173,11 +220,11 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserId()
self.assertEqual(result, None)
def test_RestrictedAccessToken_getExternalLogin_wrong_values(self):
person = self.person = self._createPerson(self.new_id)
def test_RestrictedAccessToken_getUserId_wrong_values(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -185,7 +232,7 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
access_method,
access_url)
self.tic()
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserId()
self.assertEqual(result, None)
access_token.validate()
......@@ -195,23 +242,23 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserId()
self.assertEqual(result, None)
self.portal.REQUEST["ACTUAL_URL"] = "http://exemple.com/foo.bar"
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserId()
self.assertEqual(result, None)
access_token.invalidate()
self.tic()
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserId()
self.assertEqual(result, None)
def test_OneTimeRestrictedAccessToken_getExternalLogin(self):
person = self.person = self._createPerson(self.new_id)
def test_OneTimeRestrictedAccessToken_getUserId(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createOneTimeRestrictedAccessToken(self.new_id,
......@@ -224,13 +271,13 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST["REQUEST_METHOD"] = access_method
self.portal.REQUEST["ACTUAL_URL"] = access_url
result = access_token.OneTimeRestrictedAccessToken_getExternalLogin()
result = access_token.OneTimeRestrictedAccessToken_getUserId()
self.assertEqual(result, person.Person_getUserId())
self.assertEqual(access_token.getValidationState(), 'invalidated')
def test_OneTimeRestrictedAccessToken_getExternalLogin_wrong_values(self):
person = self.person = self._createPerson(self.new_id)
def test_OneTimeRestrictedAccessToken_getUserId_wrong_values(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "POST"
access_token = self._createOneTimeRestrictedAccessToken(self.new_id,
......@@ -238,7 +285,7 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
access_method,
access_url)
self.tic()
result = access_token.OneTimeRestrictedAccessToken_getExternalLogin()
result = access_token.OneTimeRestrictedAccessToken_getUserId()
self.assertEqual(result, None)
access_token.validate()
......@@ -247,10 +294,153 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST["REQUEST_METHOD"] = "GET"
self.portal.REQUEST["ACTUAL_URL"] = access_url
result = access_token.OneTimeRestrictedAccessToken_getExternalLogin()
result = access_token.OneTimeRestrictedAccessToken_getUserId()
self.assertEqual(result, None)
self.portal.REQUEST["ACTUAL_URL"] = "http://exemple.com/foo.bar"
result = access_token.OneTimeRestrictedAccessToken_getExternalLogin()
result = access_token.OneTimeRestrictedAccessToken_getUserId()
self.assertEqual(result, None)
class TestERP5AccessTokenAlarm(AccessTokenTestCase):
def test_alarm_old_validated_restricted_access_token(self):
access_token = self.portal.access_token_module.newContent(
portal_type="One Time Restricted Access Token",
)
access_token.workflow_history['edit_workflow'] = [{
'comment':'Fake history',
'error_message': '',
'actor': 'ERP5TypeTestCase',
'state': 'current',
'time': DateTime('2012/11/15 11:11'),
'action': 'foo_action'
}]
self.portal.portal_workflow._jumpToStateFor(access_token, 'validated')
self.tic()
self.portal.portal_alarms.\
erp5_garbage_collect_one_time_restricted_access_token.activeSense()
self.tic()
self.assertEqual('invalidated', access_token.getValidationState())
self.assertEqual(
'Unused for 1 day.',
access_token.workflow_history['validation_workflow'][-1]['comment'])
def test_alarm_recent_validated_restricted_access_token(self):
access_token = self.portal.access_token_module.newContent(
portal_type="One Time Restricted Access Token",
)
self.portal.portal_workflow._jumpToStateFor(access_token, 'validated')
self.tic()
self.portal.portal_alarms.\
erp5_garbage_collect_one_time_restricted_access_token.activeSense()
self.tic()
self.assertEqual('validated', access_token.getValidationState())
def test_alarm_old_non_validated_restricted_access_token(self):
access_token = self.portal.access_token_module.newContent(
portal_type="One Time Restricted Access Token",
)
access_token.workflow_history['edit_workflow'] = [{
'comment':'Fake history',
'error_message': '',
'actor': 'ERP5TypeTestCase',
'state': 'current',
'time': DateTime('2012/11/15 11:11'),
'action': 'foo_action'
}]
self.tic()
self.portal.portal_alarms.\
erp5_garbage_collect_one_time_restricted_access_token.activeSense()
self.tic()
self.assertEqual('draft', access_token.getValidationState())
class TestERP5DumbHTTPExtractionPlugin(AccessTokenTestCase):
test_id = 'test_erp5_dumb_http_extraction'
def generateNewId(self):
return str(self.portal.portal_ids.generateNewId(
id_group=('erp5_dumb_http_test_id')))
def afterSetUp(self):
"""
This is ran before anything, used to set the environment
"""
self.new_id = self.generateNewId()
self._setupDumbHTTPExtraction()
self.tic()
def do_fake_request(self, request_method, headers=None):
if headers is None:
headers = {}
__version__ = "0.1"
env={}
env['SERVER_NAME']='bobo.server'
env['SERVER_PORT']='80'
env['REQUEST_METHOD']=request_method
env['REMOTE_ADDR']='204.183.226.81 '
env['REMOTE_HOST']='bobo.remote.host'
env['HTTP_USER_AGENT']='Bobo/%s' % __version__
env['HTTP_HOST']='127.0.0.1'
env['SERVER_SOFTWARE']='Bobo/%s' % __version__
env['SERVER_PROTOCOL']='HTTP/1.0 '
env['HTTP_ACCEPT']='image/gif, image/x-xbitmap, image/jpeg, */* '
env['SERVER_HOSTNAME']='bobo.server.host'
env['GATEWAY_INTERFACE']='CGI/1.1 '
env['SCRIPT_NAME']='Main'
env.update(headers)
return HTTPRequest(StringIO.StringIO(), env, HTTPResponse())
def _setupDumbHTTPExtraction(self):
pas = self.portal.acl_users
access_extraction_list = [q for q in pas.objectValues() \
if q.meta_type == 'ERP5 Dumb HTTP Extraction Plugin']
if len(access_extraction_list) == 0:
dispacher = pas.manage_addProduct['ERP5Security']
dispacher.addERP5DumbHTTPExtractionPlugin(self.test_id)
getattr(pas, self.test_id).manage_activateInterfaces(
('IExtractionPlugin',))
elif len(access_extraction_list) == 1:
self.test_id = access_extraction_list[0].getId()
elif len(access_extraction_list) > 1:
raise ValueError
self.commit()
def test_working_authentication(self):
self._createPerson(self.new_id, "test")
request = self.do_fake_request("GET", {"HTTP_AUTHORIZATION": "Basic " + base64.b64encode("%s:test" % self.new_id)})
ret = ERP5DumbHTTPExtractionPlugin("default_extraction").extractCredentials(request)
self.assertEqual(ret, {'login': self.new_id, 'password': 'test', 'remote_host': 'bobo.remote.host', 'remote_address': '204.183.226.81 '})
class TestERP5AccessTokenUpgraderEnablePlugin(AccessTokenTestCase):
def afterSetUp(self):
# disable plugin if it had been enabled by another test.
acl_users = self.portal.acl_users
acl_users.manage_delObjects(ids=[
x.getId() for x in
acl_users.objectValues(spec=('ERP5 Access Token Extraction Plugin',))])
self.commit()
def test_post_upgrade_constraint_enable_plugin(self):
consistency_list = self.portal.portal_templates.checkConsistency(
filter={"constraint_type": "post_upgrade"})
self.assertIn(
'erp5_access_token_plugin is missing',
[x.message for x in consistency_list])
self.portal.portal_templates.checkConsistency(
fixit=True,
filter={"constraint_type": "post_upgrade"})
self.commit()
self.assertIn(
'erp5_access_token_plugin',
self.portal.acl_users.plugins.listPluginIds(IAuthenticationPlugin))
\ No newline at end of file
......@@ -14,7 +14,7 @@
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>testERP5AccessTokenSkins</string> </value>
<value> <string>testERP5AccessToken</string> </value>
</item>
<item>
<key> <string>description</string> </key>
......@@ -24,7 +24,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test.erp5.testERP5AccessTokenSkins</string> </value>
<value> <string>test.erp5.testERP5AccessToken</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
......
# Copyright (c) 2002-2013 Nexedi SA and Contributors. All Rights Reserved.
from DateTime import DateTime
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
class TestERP5AccessTokenAlarm(ERP5TypeTestCase):
def getBusinessTemplateList(self):
return ('erp5_base',
'erp5_access_token')
def test_alarm_old_validated_restricted_access_token(self):
access_token = self.portal.access_token_module.newContent(
portal_type="One Time Restricted Access Token",
)
access_token.workflow_history['edit_workflow'] = [{
'comment':'Fake history',
'error_message': '',
'actor': 'ERP5TypeTestCase',
'state': 'current',
'time': DateTime('2012/11/15 11:11'),
'action': 'foo_action'
}]
self.portal.portal_workflow._jumpToStateFor(access_token, 'validated')
self.tic()
self.portal.portal_alarms.\
erp5_garbage_collect_one_time_restricted_access_token.activeSense()
self.tic()
self.assertEqual('invalidated', access_token.getValidationState())
self.assertEqual(
'Unused for 1 day.',
access_token.workflow_history['validation_workflow'][-1]['comment'])
def test_alarm_recent_validated_restricted_access_token(self):
access_token = self.portal.access_token_module.newContent(
portal_type="One Time Restricted Access Token",
)
self.portal.portal_workflow._jumpToStateFor(access_token, 'validated')
self.tic()
self.portal.portal_alarms.\
erp5_garbage_collect_one_time_restricted_access_token.activeSense()
self.tic()
self.assertEqual('validated', access_token.getValidationState())
def test_alarm_old_non_validated_restricted_access_token(self):
access_token = self.portal.access_token_module.newContent(
portal_type="One Time Restricted Access Token",
)
access_token.workflow_history['edit_workflow'] = [{
'comment':'Fake history',
'error_message': '',
'actor': 'ERP5TypeTestCase',
'state': 'current',
'time': DateTime('2012/11/15 11:11'),
'action': 'foo_action'
}]
self.tic()
self.portal.portal_alarms.\
erp5_garbage_collect_one_time_restricted_access_token.activeSense()
self.tic()
self.assertEqual('draft', access_token.getValidationState())
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2015 Nexedi SA and Contributors. All Rights Reserved.
# Tristan Cavelier <tristan.cavelier@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
from Products.ERP5Security.ERP5DumbHTTPExtractionPlugin import ERP5DumbHTTPExtractionPlugin
import base64
import transaction
import StringIO
class TestERP5DumbHTTPExtractionPlugin(ERP5TypeTestCase):
test_id = 'test_erp5_dumb_http_extraction'
def getBusinessTemplateList(self):
return ('erp5_base',)
def generateNewId(self):
return str(self.portal.portal_ids.generateNewId(
id_group=('erp5_dumb_http_test_id')))
def afterSetUp(self):
"""
This is ran before anything, used to set the environment
"""
self.portal = self.getPortalObject()
self.new_id = self.generateNewId()
self._setupDumbHTTPExtraction()
transaction.commit()
self.tic()
def do_fake_request(self, request_method, headers={}):
__version__ = "0.1"
env={}
env['SERVER_NAME']='bobo.server'
env['SERVER_PORT']='80'
env['REQUEST_METHOD']=request_method
env['REMOTE_ADDR']='204.183.226.81 '
env['REMOTE_HOST']='bobo.remote.host'
env['HTTP_USER_AGENT']='Bobo/%s' % __version__
env['HTTP_HOST']='127.0.0.1'
env['SERVER_SOFTWARE']='Bobo/%s' % __version__
env['SERVER_PROTOCOL']='HTTP/1.0 '
env['HTTP_ACCEPT']='image/gif, image/x-xbitmap, image/jpeg, */* '
env['SERVER_HOSTNAME']='bobo.server.host'
env['GATEWAY_INTERFACE']='CGI/1.1 '
env['SCRIPT_NAME']='Main'
env.update(headers)
return HTTPRequest(StringIO.StringIO(), env, HTTPResponse())
def _setupDumbHTTPExtraction(self):
pas = self.portal.acl_users
access_extraction_list = [q for q in pas.objectValues() \
if q.meta_type == 'ERP5 Dumb HTTP Extraction Plugin']
if len(access_extraction_list) == 0:
dispacher = pas.manage_addProduct['ERP5Security']
dispacher.addERP5DumbHTTPExtractionPlugin(self.test_id)
getattr(pas, self.test_id).manage_activateInterfaces(
('IExtractionPlugin',))
elif len(access_extraction_list) == 1:
self.test_id = access_extraction_list[0].getId()
elif len(access_extraction_list) > 1:
raise ValueError
transaction.commit()
def _createPerson(self, new_id, password=None):
"""Creates a person in person module, and returns the object, after
indexing is done. """
person_module = self.getPersonModule()
person = person_module.newContent(portal_type='Person',
reference='TESTP-' + new_id)
if password:
person.setPassword(password)
person.newContent(portal_type = 'Assignment').open()
transaction.commit()
return person
def test_working_authentication(self):
person = self.person = self._createPerson(self.new_id, "test")
request = self.do_fake_request("GET", {"HTTP_AUTHORIZATION": "Basic " + base64.b64encode("%s:test" % self.new_id)})
ret = ERP5DumbHTTPExtractionPlugin("default_extraction").extractCredentials(request)
self.assertEquals(ret, {'login': self.new_id, 'password': 'test', 'remote_host': 'bobo.remote.host', 'remote_address': '204.183.226.81 '})
One Time Restricted Access Token | Url
Restricted Access Token | Reference
Restricted Access Token | Url
Template Tool | TemplateToolERP5AccessTokenExtractionPluginConstraint
\ No newline at end of file
TemplateToolERP5AccessTokenExtractionPluginConstraint
\ No newline at end of file
test.erp5.testERP5AccessTokenAlarm
test.erp5.testERP5AccessTokenSkins
test.erp5.testERP5DumbHTTPExtractionPlugin
\ No newline at end of file
test.erp5.testERP5AccessToken
\ No newline at end of file
......@@ -59,35 +59,49 @@ class ERP5AccessTokenExtractionPlugin(BasePlugin):
#ILoginPasswordHostExtractionPlugin#
####################################
security.declarePrivate('extractCredentials')
@UnrestrictedMethod
def extractCredentials(self, request):
""" Extract CookieHash credentials from the request header. """
""" Extract credentials from the request header. """
creds = {}
# XXX Extract from HTTP Header, URL parameter are hardcoded.
# More flexible way would be to configure on the portal type level
token = request.getHeader("X-ACCESS-TOKEN", None)
if token is None:
token = request.form.get("access_token", None)
if token is not None:
# Extract token from HTTP Header
token = request.getHeader("X-ACCESS-TOKEN", request.form.get("access_token", None))
if token:
creds['erp5_access_token_id'] = token
creds['remote_host'] = request.get('REMOTE_HOST', '')
try:
creds['remote_address'] = request.getClientAddr()
except AttributeError:
creds['remote_address'] = request.get('REMOTE_ADDR', '')
return creds
#######################
#IAuthenticationPlugin#
#######################
security.declarePrivate('authenticateCredentials')
@UnrestrictedMethod
def authenticateCredentials(self, credentials):
""" Map credentials to a user ID. """
if 'erp5_access_token_id' in credentials:
erp5_access_token_id = credentials['erp5_access_token_id']
token_document = self.getPortalObject().access_token_module.\
_getOb(token, None)
_getOb(erp5_access_token_id, None)
# Access Token should be validated
# Check restricted access of URL
# Extract login information
if token_document is not None:
external_login = None
# Token API changed from returning a login to returning a user id.
# We detect if the old way of configuration is still in place and
# advise that configuration has to be updated in that case.
method = token_document._getTypeBasedMethod('getExternalLogin')
assert method is None, "Please update and remove obsolete method %r" % method
user_id = None
method = token_document._getTypeBasedMethod('getUserId')
if method is not None:
external_login = method()
user_id = method()
if user_id is not None:
return (user_id, 'token {erp5_access_token_id} for {user_id}'.format(**locals()))
if external_login is not None:
creds['external_login'] = external_login
creds['remote_host'] = request.get('REMOTE_HOST', '')
try:
creds['remote_address'] = request.getClientAddr()
except AttributeError:
creds['remote_address'] = request.get('REMOTE_ADDR', '')
return creds
#Form for new plugin in ZMI
manage_addERP5AccessTokenExtractionPluginForm = PageTemplateFile(
......@@ -109,6 +123,7 @@ def addERP5AccessTokenExtractionPlugin(dispatcher, id, title=None, REQUEST=None)
#List implementation of class
classImplements(ERP5AccessTokenExtractionPlugin,
plugins.ILoginPasswordHostExtractionPlugin
plugins.ILoginPasswordHostExtractionPlugin,
plugins.IAuthenticationPlugin,
)
InitializeClass(ERP5AccessTokenExtractionPlugin)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment