Commit 1634fa64 authored by Jérome Perrin's avatar Jérome Perrin

Fix AccessToken login with ERP5 Login

Backport final state of nexedi/erp5!838 API changed a bit after first backport in https://lab.nexedi.com/nexedi/erp5-capago/merge_requests/37

/reviewed-on https://lab.nexedi.com/nexedi/erp5-capago/merge_requests/38
parents 0229018e 3b0291d0
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>AccessToken_getUserId</string> </value>
<value> <string>AccessToken_getUserValue</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -14,7 +14,7 @@ if access_token_document.getValidationState() == 'validated':
agent_document = access_token_document.getAgentValue()
if agent_document is not None:
result = agent_document.Person_getUserId()
result = agent_document
comment = "Token usage accepted"
access_token_document.invalidate(comment=comment)
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>OneTimeRestrictedAccessToken_getUserId</string> </value>
<value> <string>OneTimeRestrictedAccessToken_getUserValue</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -24,7 +24,8 @@ if access_token_document.getValidationState() == 'validated':
if agent_document is not None:
if agent_document.getPortalType() == 'Person':
# if this is a token for a person, only make accept if person has valid
# assignments (for compatibility with login/password authentication)
# assignments and a validated login (for compatibility with login/password
# authentication)
if agent_document.getValidationState() == 'deleted':
return None
now = DateTime()
......@@ -37,6 +38,13 @@ if access_token_document.getValidationState() == 'validated':
break
else:
return None
result = agent_document.Person_getUserId()
user, = context.getPortalObject().acl_users.searchUsers(
exact_match=True,
id=agent_document.Person_getUserId())
if not user['login_list']:
return None
result = agent_document
return result
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>RestrictedAccessToken_getUserId</string> </value>
<value> <string>RestrictedAccessToken_getUserValue</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -42,15 +42,14 @@ class AccessTokenTestCase(ERP5TypeTestCase):
return ('erp5_base',
'erp5_access_token')
def _createPerson(self, new_id, password=None):
def _createPerson(self, new_id):
"""Creates a person in person module, and returns the object, after
indexing is done. """
person_module = self.getPersonModule()
person = person_module.newContent(portal_type='Person',
reference='TESTP-' + new_id)
if password:
person.setPassword(password)
person.newContent(portal_type = 'Assignment').open()
person.newContent(portal_type='Assignment').open()
person.newContent(portal_type='ERP5 Login', reference=new_id).validate()
self.tic()
return person
......@@ -116,7 +115,7 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
user_id, login = result
self.assertEqual(user_id, person.Person_getUserId())
# tokens have a login value, for auditing purposes
self.assertIn('token', login)
self.assertEqual(access_token.getRelativeUrl(), login)
def test_bad_token(self):
person = self._createPerson(self.new_id)
......@@ -159,8 +158,11 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
result = self._getTokenCredential(self.portal.REQUEST)
self.assertFalse(result)
def test_RestrictedAccessToken_getUserId(self):
def test_token_without_login(self):
# Token does not work when person has no validated login
person = self._createPerson(self.new_id)
for login in person.contentValues(portal_type='ERP5 Login'):
login.invalidate()
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -170,16 +172,35 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
access_token.validate()
self.tic()
self.portal.REQUEST.form["access_token"] = access_token.getId()
self.portal.REQUEST["REQUEST_METHOD"] = access_method
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getUserId()
result = self._getTokenCredential(self.portal.REQUEST)
self.assertFalse(result)
def test_RestrictedAccessToken_getUserValue(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
person,
access_method,
access_url)
access_token.validate()
self.tic()
self.assertEqual(result, person.Person_getUserId())
self.portal.REQUEST["REQUEST_METHOD"] = access_method
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, person)
self.assertEqual(access_token.getValidationState(), 'validated')
def test_RestrictedAccessToken_getUserId_access_token_secret(self):
def test_RestrictedAccessToken_getUserValue_access_token_secret(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
......@@ -193,7 +214,7 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
self.portal.REQUEST["REQUEST_METHOD"] = access_method
self.portal.REQUEST["ACTUAL_URL"] = access_url
result = access_token.RestrictedAccessToken_getUserId()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
self.portal.REQUEST.form["access_token_secret"] = "XYXYXYXY"
......@@ -201,12 +222,12 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getUserId()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, person.Person_getUserId())
self.assertEqual(result, person)
self.assertEqual(access_token.getValidationState(), 'validated')
def test_RestrictedAccessToken_getUserId_no_agent(self):
def test_RestrictedAccessToken_getUserValue_no_agent(self):
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -220,10 +241,10 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getUserId()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
def test_RestrictedAccessToken_getUserId_wrong_values(self):
def test_RestrictedAccessToken_getUserValue_wrong_values(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
......@@ -232,7 +253,7 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
access_method,
access_url)
self.tic()
result = access_token.RestrictedAccessToken_getUserId()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
access_token.validate()
......@@ -242,22 +263,22 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getUserId()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
self.portal.REQUEST["ACTUAL_URL"] = "http://exemple.com/foo.bar"
result = access_token.RestrictedAccessToken_getUserId()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
access_token.invalidate()
self.tic()
result = access_token.RestrictedAccessToken_getUserId()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
def test_OneTimeRestrictedAccessToken_getUserId(self):
def test_OneTimeRestrictedAccessToken_getUserValue(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
......@@ -271,12 +292,12 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
self.portal.REQUEST["REQUEST_METHOD"] = access_method
self.portal.REQUEST["ACTUAL_URL"] = access_url
result = access_token.OneTimeRestrictedAccessToken_getUserId()
result = access_token.OneTimeRestrictedAccessToken_getUserValue()
self.assertEqual(result, person.Person_getUserId())
self.assertEqual(result, person)
self.assertEqual(access_token.getValidationState(), 'invalidated')
def test_OneTimeRestrictedAccessToken_getUserId_wrong_values(self):
def test_OneTimeRestrictedAccessToken_getUserValue_wrong_values(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "POST"
......@@ -285,7 +306,7 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
access_method,
access_url)
self.tic()
result = access_token.OneTimeRestrictedAccessToken_getUserId()
result = access_token.OneTimeRestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
access_token.validate()
......@@ -294,12 +315,12 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
self.portal.REQUEST["REQUEST_METHOD"] = "GET"
self.portal.REQUEST["ACTUAL_URL"] = access_url
result = access_token.OneTimeRestrictedAccessToken_getUserId()
result = access_token.OneTimeRestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
self.portal.REQUEST["ACTUAL_URL"] = "http://exemple.com/foo.bar"
result = access_token.OneTimeRestrictedAccessToken_getUserId()
result = access_token.OneTimeRestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
......@@ -365,20 +386,6 @@ class TestERP5AccessTokenAlarm(AccessTokenTestCase):
class TestERP5DumbHTTPExtractionPlugin(AccessTokenTestCase):
test_id = 'test_erp5_dumb_http_extraction'
def generateNewId(self):
return str(self.portal.portal_ids.generateNewId(
id_group=('erp5_dumb_http_test_id')))
def afterSetUp(self):
"""
This is ran before anything, used to set the environment
"""
self.new_id = self.generateNewId()
self._setupDumbHTTPExtraction()
self.tic()
def do_fake_request(self, request_method, headers=None):
if headers is None:
headers = {}
......@@ -400,26 +407,10 @@ class TestERP5DumbHTTPExtractionPlugin(AccessTokenTestCase):
env.update(headers)
return HTTPRequest(StringIO.StringIO(), env, HTTPResponse())
def _setupDumbHTTPExtraction(self):
pas = self.portal.acl_users
access_extraction_list = [q for q in pas.objectValues() \
if q.meta_type == 'ERP5 Dumb HTTP Extraction Plugin']
if len(access_extraction_list) == 0:
dispacher = pas.manage_addProduct['ERP5Security']
dispacher.addERP5DumbHTTPExtractionPlugin(self.test_id)
getattr(pas, self.test_id).manage_activateInterfaces(
('IExtractionPlugin',))
elif len(access_extraction_list) == 1:
self.test_id = access_extraction_list[0].getId()
elif len(access_extraction_list) > 1:
raise ValueError
self.commit()
def test_working_authentication(self):
self._createPerson(self.new_id, "test")
request = self.do_fake_request("GET", {"HTTP_AUTHORIZATION": "Basic " + base64.b64encode("%s:test" % self.new_id)})
request = self.do_fake_request("GET", {"HTTP_AUTHORIZATION": "Basic " + base64.b64encode("login:password")})
ret = ERP5DumbHTTPExtractionPlugin("default_extraction").extractCredentials(request)
self.assertEqual(ret, {'login': self.new_id, 'password': 'test', 'remote_host': 'bobo.remote.host', 'remote_address': '204.183.226.81 '})
self.assertEqual(ret, {'login': 'login', 'password': 'password', 'remote_host': 'bobo.remote.host', 'remote_address': '204.183.226.81 '})
class TestERP5AccessTokenUpgraderEnablePlugin(AccessTokenTestCase):
......
......@@ -67,10 +67,7 @@ class ERP5AccessTokenExtractionPlugin(BasePlugin):
if token:
creds['erp5_access_token_id'] = token
creds['remote_host'] = request.get('REMOTE_HOST', '')
try:
creds['remote_address'] = request.getClientAddr()
except AttributeError:
creds['remote_address'] = request.get('REMOTE_ADDR', '')
creds['remote_address'] = request.getClientAddr()
return creds
#######################
......@@ -84,23 +81,12 @@ class ERP5AccessTokenExtractionPlugin(BasePlugin):
erp5_access_token_id = credentials['erp5_access_token_id']
token_document = self.getPortalObject().access_token_module.\
_getOb(erp5_access_token_id, None)
# Access Token should be validated
# Check restricted access of URL
# Extract login information
if token_document is not None:
# Token API changed from returning a login to returning a user id.
# We detect if the old way of configuration is still in place and
# advise that configuration has to be updated in that case.
method = token_document._getTypeBasedMethod('getExternalLogin')
assert method is None, "Please update and remove obsolete method %r" % method
user_id = None
method = token_document._getTypeBasedMethod('getUserId')
method = token_document._getTypeBasedMethod('getUserValue')
if method is not None:
user_id = method()
if user_id is not None:
return (user_id, 'token {erp5_access_token_id} for {user_id}'.format(**locals()))
user_value = method()
if user_value is not None:
return (user_value.getUserId(), token_document.getRelativeUrl())
#Form for new plugin in ZMI
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment