Commit a6c5a2ae authored by Rafael Monnerat's avatar Rafael Monnerat

erp5_certificate_authority: Use checkConsistency to validate url existence

  and implement other things nicer.
parent 54751169
...@@ -30,7 +30,8 @@ from AccessControl import ClassSecurityInfo ...@@ -30,7 +30,8 @@ from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions from Products.ERP5Type import Permissions
from Products.ERP5Type.XMLObject import XMLObject from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type.Globals import InitializeClass from Products.ERP5Type.Globals import InitializeClass
from caucase.client import CaucaseClient, CaucaseError from caucase.client import CaucaseClient, CaucaseHTTPError
from Products.ERP5Type.Core.Workflow import ValidationFailed
from six.moves import http_client from six.moves import http_client
...@@ -42,27 +43,25 @@ from cryptography.hazmat.primitives.asymmetric import rsa ...@@ -42,27 +43,25 @@ from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID from cryptography.x509.oid import NameOID
import tempfile import tempfile
class CaucaseConnector(XMLObject): class CaucaseConnector(XMLObject):
meta_type = 'Caucase Connector' meta_type = 'Caucase Connector'
security = ClassSecurityInfo() security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation) security.declareObjectProtected(Permissions.AccessContentsInformation)
def _getConnection(self, **kw):
message_list = self.checkConsistency()
if message_list:
raise ValidationFailed(message_list)
return CaucaseClient(**kw)
def _getServiceConnection(self, **kw): def _getServiceConnection(self, **kw):
# XXX Call checkConsistency return self._getConnection(ca_url="%s/cas" % self.getUrlString(""), **kw)
if self.getUrlString() is None:
raise ValueError("Caucase url must be defined")
return CaucaseClient(ca_url="%s/cas" % self.getUrlString(), **kw)
def _getUserConnection(self, **kw): def _getUserConnection(self, **kw):
# XXX Call checkConsistency return self._getConnection(ca_url="%s/cau" % self.getUrlString(""), **kw)
if self.getUrlString() is None:
raise ValueError("Caucase url must be defined")
return CaucaseClient(ca_url="%s/cau" % self.getUrlString(), **kw)
def _getAuthenticatedConnection(self): def _getAuthenticatedServiceConnection(self):
if self.getUserCertificate() is None: if self.getUserCertificate() is None:
if self.hasUserCertificateRequestReference(): if self.hasUserCertificateRequestReference():
self.bootstrapCaucaseConfiguration() self.bootstrapCaucaseConfiguration()
...@@ -103,11 +102,11 @@ class CaucaseConnector(XMLObject): ...@@ -103,11 +102,11 @@ class CaucaseConnector(XMLObject):
self.setUserCertificateRequestReference(csr_id) self.setUserCertificateRequestReference(csr_id)
self.setUserKey(key) self.setUserKey(key)
csr_id = int(self.getUserCertificateRequestReference()) csr_id = self.getUserCertificateRequestReference()
try: try:
crt_pem = caucase_connection.getCertificate( crt_pem = caucase_connection.getCertificate(
csr_id=csr_id) csr_id=csr_id)
except CaucaseError as e: except CaucaseHTTPError as e:
if e.args[0] != http_client.NOT_FOUND: if e.args[0] != http_client.NOT_FOUND:
raise raise
...@@ -155,6 +154,11 @@ class CaucaseConnector(XMLObject): ...@@ -155,6 +154,11 @@ class CaucaseConnector(XMLObject):
) )
name_attribute_list = self._getSubjectNameAttributeList() name_attribute_list = self._getSubjectNameAttributeList()
name_attribute_list.append(
x509.NameAttribute(NameOID.COMMON_NAME,
# The cryptography library only accept Unicode.
"erp5-user".decode('UTF-8')))
# Probably we should extend a bit more the attributes. # Probably we should extend a bit more the attributes.
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name( csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name(
name_attribute_list name_attribute_list
...@@ -170,16 +174,16 @@ class CaucaseConnector(XMLObject): ...@@ -170,16 +174,16 @@ class CaucaseConnector(XMLObject):
security.declareProtected(Permissions.ManageUsers, 'createCertificate') security.declareProtected(Permissions.ManageUsers, 'createCertificate')
def createCertificate(self, csr_id, template_csr=""): def createCertificate(self, csr_id, template_csr=""):
return self._getAuthenticatedConnection().createCertificate(csr_id, template_csr) return self._getAuthenticatedServiceConnection().createCertificate(csr_id, template_csr)
security.declareProtected(Permissions.ManageUsers, 'getCertificate') security.declareProtected(Permissions.ManageUsers, 'getCertificate')
def getCertificate(self, csr_id): def getCertificate(self, csr_id):
return self._getAuthenticatedConnection().getCertificate(csr_id) return self._getAuthenticatedServiceConnection().getCertificate(csr_id)
security.declareProtected(Permissions.ManageUsers, 'revokeCertificate') security.declareProtected(Permissions.ManageUsers, 'revokeCertificate')
def revokeCertificate(self, crt_pem, key_pem=None): def revokeCertificate(self, crt_pem, key_pem=None):
if key_pem is None: if key_pem is None:
return self._getAuthenticatedConnection().revokeCertificate(crt_pem) return self._getAuthenticatedServiceConnection().revokeCertificate(crt_pem)
return self._getServiceConnection().revokeCertificate(crt_pem, key_pem) return self._getServiceConnection().revokeCertificate(crt_pem, key_pem)
InitializeClass(CaucaseConnector) InitializeClass(CaucaseConnector)
\ No newline at end of file
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Property Existence Constraint" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_identity_criterion</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>_local_properties</string> </key>
<value>
<tuple>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>message_property_not_set</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
</tuple>
</value>
</item>
<item>
<key> <string>_range_criterion</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
<item>
<key> <string>constraint_property</string> </key>
<value>
<tuple>
<string>url_string</string>
</tuple>
</value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>url_string_existence_constraint</string> </value>
</item>
<item>
<key> <string>message_no_such_property</string> </key>
<value> <string>Url String must be set</string> </value>
</item>
<item>
<key> <string>message_property_not_set</string> </key>
<value>
<none/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
<key> <string>categories</string> </key> <key> <string>categories</string> </key>
<value> <value>
<tuple> <tuple>
<string>elementary_type/string</string> <string>elementary_type/int</string>
</tuple> </tuple>
</value> </value>
</item> </item>
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
############################################################################## ##############################################################################
from Products.ERP5Type.tests.ERP5TypeCaucaseTestCase import ERP5TypeCaucaseTestCase from Products.ERP5Type.tests.ERP5TypeCaucaseTestCase import ERP5TypeCaucaseTestCase
from Products.ERP5Type.Core.Workflow import ValidationFailed
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
...@@ -58,24 +59,24 @@ class TestCertificateAuthorityCaucaseConnector(ERP5TypeCaucaseTestCase): ...@@ -58,24 +59,24 @@ class TestCertificateAuthorityCaucaseConnector(ERP5TypeCaucaseTestCase):
connector_no_url_string = self.portal.portal_web_services.newContent( connector_no_url_string = self.portal.portal_web_services.newContent(
portal_type="Caucase Connector" portal_type="Caucase Connector"
) )
self.assertRaises(ValueError, connector_no_url_string._getServiceConnection) self.assertRaises(ValidationFailed, connector_no_url_string._getServiceConnection)
def test_getConnection(self): def test_getConnection(self):
self.assertNotEqual(None, self.caucase_connector._getServiceConnection()) self.assertNotEqual(None, self.caucase_connector._getServiceConnection())
self.assertNotEqual(None, self.caucase_connector._getUserConnection()) self.assertNotEqual(None, self.caucase_connector._getUserConnection())
def test_getAuthenticatedConnection_no_url(self): def test_getAuthenticatedServiceConnection_no_url(self):
connector_no_url_string = self.portal.portal_web_services.newContent( connector_no_url_string = self.portal.portal_web_services.newContent(
portal_type="Caucase Connector" portal_type="Caucase Connector"
) )
self.assertRaises(ValueError, connector_no_url_string._getAuthenticatedConnection) self.assertRaises(ValueError, connector_no_url_string._getAuthenticatedServiceConnection)
def test_getAuthenticatedConnection_with_url(self): def test_getAuthenticatedServiceConnection_with_url(self):
connector_no_url_string = self.portal.portal_web_services.newContent( connector_no_url_string = self.portal.portal_web_services.newContent(
portal_type="Caucase Connector", portal_type="Caucase Connector",
url_string="https://hasurl.but.no.user_certificate" url_string="https://hasurl.but.no.user_certificate"
) )
self.assertRaises(ValueError, connector_no_url_string._getAuthenticatedConnection) self.assertRaises(ValueError, connector_no_url_string._getAuthenticatedServiceConnection)
def test(self): def test(self):
# Simply test # Simply test
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment