Commit c6fb21f0 authored by Rafael Monnerat's avatar Rafael Monnerat

erp5_certificate_authority: Ensure extra namedattributes are added when master sign certificates

parent 7476c6b8
...@@ -104,7 +104,7 @@ class CaucaseConnector(XMLObject): ...@@ -104,7 +104,7 @@ class CaucaseConnector(XMLObject):
self.setUserCertificate(crt_pem) self.setUserCertificate(crt_pem)
def _getSubjectNameAttributeList(self): def _getSubjectNameAttributeList(self):
crt_pem = None #self.getUserCertificate() crt_pem = self.getUserCertificate()
if crt_pem is None: if crt_pem is None:
name_attribute_list = [] name_attribute_list = []
for oid, value in [ for oid, value in [
......
...@@ -43,11 +43,16 @@ class CertificateLoginMixin: ...@@ -43,11 +43,16 @@ class CertificateLoginMixin:
key = rsa.generate_private_key( key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()) public_exponent=65537, key_size=2048, backend=default_backend())
# Probably we should extend a bit more the attributes. name_attribute_list = self._getCaucaseConnector()._getSubjectNameAttributeList()
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
name_attribute_list.append(
x509.NameAttribute(NameOID.COMMON_NAME,
# The cryptography library only accept Unicode. # The cryptography library only accept Unicode.
x509.NameAttribute(NameOID.COMMON_NAME, self.getReference().decode('UTF-8')), self.getReference().decode('UTF-8')))
])).sign(key, hashes.SHA256(), default_backend())
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name(
name_attribute_list
)).sign(key, hashes.SHA256(), default_backend())
return csr.public_bytes(serialization.Encoding.PEM).decode() return csr.public_bytes(serialization.Encoding.PEM).decode()
......
...@@ -33,10 +33,19 @@ from cryptography import x509 ...@@ -33,10 +33,19 @@ from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
from caucase.client import CaucaseHTTPError from caucase.client import CaucaseHTTPError
from cryptography.x509.oid import NameOID
class TestCertificateAuthorityCaucaseConnector(ERP5TypeCaucaseTestCase): class TestCertificateAuthorityCaucaseConnector(ERP5TypeCaucaseTestCase):
caucase_certificate_kw = {
"company_name": "ERP5 Company",
"country_name": "FR",
"email_address": "noreply@erp5.net",
"locality_name": "Lille",
"state_or_province_name": "Nord-Pas-de-Calais"
}
def afterSetUp(self): def afterSetUp(self):
self.setUpCaucase() self.setUpCaucase()
self.caucase_connector = self.portal.portal_web_services.test_caucase_connector self.caucase_connector = self.portal.portal_web_services.test_caucase_connector
...@@ -85,6 +94,21 @@ class TestCertificateAuthorityCaucaseConnector(ERP5TypeCaucaseTestCase): ...@@ -85,6 +94,21 @@ class TestCertificateAuthorityCaucaseConnector(ERP5TypeCaucaseTestCase):
cert = x509.load_pem_x509_certificate(cert_data, default_backend()) cert = x509.load_pem_x509_certificate(cert_data, default_backend())
privkey = serialization.load_pem_private_key(key.encode(), None, default_backend()) privkey = serialization.load_pem_private_key(key.encode(), None, default_backend())
self.assertEqual(["ERP5 Company"],
[i.value for i in cert.subject if i.oid == NameOID.ORGANIZATION_NAME])
self.assertEqual(["FR"],
[i.value for i in cert.subject if i.oid == NameOID.COUNTRY_NAME])
self.assertEqual(["noreply@erp5.net"],
[i.value for i in cert.subject if i.oid == NameOID.EMAIL_ADDRESS])
self.assertEqual(["Lille"],
[i.value for i in cert.subject if i.oid == NameOID.LOCALITY_NAME])
self.assertEqual(["Nord-Pas-de-Calais"],
[i.value for i in cert.subject if i.oid == NameOID.STATE_OR_PROVINCE_NAME])
cerfificate_pub = cert.public_key().public_bytes( cerfificate_pub = cert.public_key().public_bytes(
serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo) serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo)
private_key_pub = privkey.public_key().public_bytes( private_key_pub = privkey.public_key().public_bytes(
......
...@@ -39,6 +39,14 @@ from cryptography.x509.oid import NameOID ...@@ -39,6 +39,14 @@ from cryptography.x509.oid import NameOID
class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase): class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase):
caucase_certificate_kw = {
"company_name": "ERP5 Company",
"country_name": "FR",
"email_address": "noreply@erp5.net",
"locality_name": "Lille",
"state_or_province_name": "Nord-Pas-de-Calais"
}
def afterSetUp(self): def afterSetUp(self):
self.setUpCaucase() self.setUpCaucase()
if getattr(self.portal.portal_types.Person, if getattr(self.portal.portal_types.Person,
...@@ -80,10 +88,26 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase): ...@@ -80,10 +88,26 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase):
self.assertTrue(certificate_login.getReference().startswith("CERT")) self.assertTrue(certificate_login.getReference().startswith("CERT"))
ssl_certificate = x509.load_pem_x509_certificate(certificate['certificate']) ssl_certificate = x509.load_pem_x509_certificate(certificate['certificate'])
self.assertEqual(len(ssl_certificate.subject), 1) self.assertEqual(len(ssl_certificate.subject), 6)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0] cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn) self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
self.assertEqual(["ERP5 Company"],
[i.value for i in ssl_certificate.subject if i.oid == NameOID.ORGANIZATION_NAME])
self.assertEqual(["FR"],
[i.value for i in ssl_certificate.subject if i.oid == NameOID.COUNTRY_NAME])
self.assertEqual(["noreply@erp5.net"],
[i.value for i in ssl_certificate.subject if i.oid == NameOID.EMAIL_ADDRESS])
self.assertEqual(["Lille"],
[i.value for i in ssl_certificate.subject if i.oid == NameOID.LOCALITY_NAME])
self.assertEqual(["Nord-Pas-de-Calais"],
[i.value for i in ssl_certificate.subject if i.oid == NameOID.STATE_OR_PROVINCE_NAME])
def test_person_duplicated_login(self): def test_person_duplicated_login(self):
user_id, login = self._createPerson() user_id, login = self._createPerson()
self.loginByUserName(login) self.loginByUserName(login)
...@@ -103,7 +127,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase): ...@@ -103,7 +127,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase):
self.assertTrue(certificate_login.getReference().startswith("CERT")) self.assertTrue(certificate_login.getReference().startswith("CERT"))
ssl_certificate = x509.load_pem_x509_certificate(certificate['certificate']) ssl_certificate = x509.load_pem_x509_certificate(certificate['certificate'])
self.assertEqual(len(ssl_certificate.subject), 1) self.assertEqual(len(ssl_certificate.subject), 6)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0] cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn) self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
...@@ -127,7 +151,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase): ...@@ -127,7 +151,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase):
self.assertTrue(certificate_login.getReference().startswith("CERT")) self.assertTrue(certificate_login.getReference().startswith("CERT"))
ssl_certificate = x509.load_pem_x509_certificate(certificate['certificate']) ssl_certificate = x509.load_pem_x509_certificate(certificate['certificate'])
self.assertEqual(len(ssl_certificate.subject), 1) self.assertEqual(len(ssl_certificate.subject), 6)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0] cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn) self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
...@@ -151,7 +175,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase): ...@@ -151,7 +175,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase):
self.assertTrue(new_certificate_login.getReference().startswith("CERT")) self.assertTrue(new_certificate_login.getReference().startswith("CERT"))
ssl_certificate = x509.load_pem_x509_certificate(new_certificate['certificate']) ssl_certificate = x509.load_pem_x509_certificate(new_certificate['certificate'])
self.assertEqual(len(ssl_certificate.subject), 1) self.assertEqual(len(ssl_certificate.subject), 6)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0] cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(new_certificate_login.getReference().decode("UTF-8"), cn) self.assertEqual(new_certificate_login.getReference().decode("UTF-8"), cn)
...@@ -204,7 +228,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase): ...@@ -204,7 +228,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase):
self.assertTrue(certificate_login.getReference().startswith("CERT")) self.assertTrue(certificate_login.getReference().startswith("CERT"))
ssl_certificate = x509.load_pem_x509_certificate(certificate_dict['certificate']) ssl_certificate = x509.load_pem_x509_certificate(certificate_dict['certificate'])
self.assertEqual(len(ssl_certificate.subject), 1) self.assertEqual(len(ssl_certificate.subject), 6)
cn_list = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME] cn_list = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME]
self.assertEqual(len(cn_list), 1) self.assertEqual(len(cn_list), 1)
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn_list[0]) self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn_list[0])
...@@ -214,6 +238,21 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase): ...@@ -214,6 +238,21 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase):
certificate_login.validate() certificate_login.validate()
self.assertEqual(certificate_login.getValidationState(), "validated") self.assertEqual(certificate_login.getValidationState(), "validated")
self.assertEqual(["ERP5 Company"],
[i.value for i in ssl_certificate.subject if i.oid == NameOID.ORGANIZATION_NAME])
self.assertEqual(["FR"],
[i.value for i in ssl_certificate.subject if i.oid == NameOID.COUNTRY_NAME])
self.assertEqual(["noreply@erp5.net"],
[i.value for i in ssl_certificate.subject if i.oid == NameOID.EMAIL_ADDRESS])
self.assertEqual(["Lille"],
[i.value for i in ssl_certificate.subject if i.oid == NameOID.LOCALITY_NAME])
self.assertEqual(["Nord-Pas-de-Calais"],
[i.value for i in ssl_certificate.subject if i.oid == NameOID.STATE_OR_PROVINCE_NAME])
def test_certificate_login_get_certificate_set_reference(self): def test_certificate_login_get_certificate_set_reference(self):
person = self.portal.person_module.newContent(portal_type='Person') person = self.portal.person_module.newContent(portal_type='Person')
certificate_login = person.newContent(portal_type='Certificate Login', certificate_login = person.newContent(portal_type='Certificate Login',
...@@ -229,7 +268,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase): ...@@ -229,7 +268,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase):
self.assertTrue(certificate_login.getReference().startswith("CERT")) self.assertTrue(certificate_login.getReference().startswith("CERT"))
ssl_certificate = x509.load_pem_x509_certificate(certificate_dict['certificate']) ssl_certificate = x509.load_pem_x509_certificate(certificate_dict['certificate'])
self.assertEqual(len(ssl_certificate.subject), 1) self.assertEqual(len(ssl_certificate.subject), 6)
cn_list = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME] cn_list = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME]
self.assertEqual(len(cn_list), 1) self.assertEqual(len(cn_list), 1)
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn_list[0]) self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn_list[0])
...@@ -254,7 +293,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase): ...@@ -254,7 +293,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase):
self.assertIn("key", certificate_dict.keys()) self.assertIn("key", certificate_dict.keys())
ssl_certificate = x509.load_pem_x509_certificate(certificate_dict['certificate']) ssl_certificate = x509.load_pem_x509_certificate(certificate_dict['certificate'])
self.assertEqual(len(ssl_certificate.subject), 1) self.assertEqual(len(ssl_certificate.subject), 6)
cn_list = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME] cn_list = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME]
self.assertEqual(len(cn_list), 1) self.assertEqual(len(cn_list), 1)
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn_list[0]) self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn_list[0])
...@@ -280,7 +319,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase): ...@@ -280,7 +319,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase):
self.assertTrue(reference.startswith("CERT")) self.assertTrue(reference.startswith("CERT"))
ssl_certificate = x509.load_pem_x509_certificate(certificate_dict['certificate']) ssl_certificate = x509.load_pem_x509_certificate(certificate_dict['certificate'])
self.assertEqual(len(ssl_certificate.subject), 1) self.assertEqual(len(ssl_certificate.subject), 6)
cn_list = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME] cn_list = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME]
self.assertEqual(len(cn_list), 1) self.assertEqual(len(cn_list), 1)
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn_list[0]) self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn_list[0])
...@@ -306,7 +345,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase): ...@@ -306,7 +345,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase):
self.assertTrue(reference.startswith("CERT")) self.assertTrue(reference.startswith("CERT"))
ssl_certificate = x509.load_pem_x509_certificate(certificate_dict['certificate']) ssl_certificate = x509.load_pem_x509_certificate(certificate_dict['certificate'])
self.assertEqual(len(ssl_certificate.subject), 1) self.assertEqual(len(ssl_certificate.subject), 6)
cn_list = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME] cn_list = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME]
self.assertEqual(len(cn_list), 1) self.assertEqual(len(cn_list), 1)
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn_list[0]) self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn_list[0])
...@@ -332,7 +371,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase): ...@@ -332,7 +371,7 @@ class TestPersonCertificateLogin(ERP5TypeCaucaseTestCase):
self.assertTrue(reference.startswith("CERT")) self.assertTrue(reference.startswith("CERT"))
ssl_certificate = x509.load_pem_x509_certificate(certificate_dict['certificate']) ssl_certificate = x509.load_pem_x509_certificate(certificate_dict['certificate'])
self.assertEqual(len(ssl_certificate.subject), 1) self.assertEqual(len(ssl_certificate.subject), 6)
cn_list = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME] cn_list = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME]
self.assertEqual(len(cn_list), 1) self.assertEqual(len(cn_list), 1)
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn_list[0]) self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn_list[0])
......
...@@ -78,6 +78,7 @@ def retry(callback, try_count=10, try_delay=0.1): ...@@ -78,6 +78,7 @@ def retry(callback, try_count=10, try_delay=0.1):
class ERP5TypeCaucaseTestCase(ERP5TypeTestCase): class ERP5TypeCaucaseTestCase(ERP5TypeTestCase):
""" Helpfull code to start/stop/control a caucased service for the tests """ Helpfull code to start/stop/control a caucased service for the tests
""" """
caucase_certificate_kw = {}
def _startCaucaseServer(self, argv=(), timeout=10): def _startCaucaseServer(self, argv=(), timeout=10):
""" """
Start caucased server Start caucased server
...@@ -152,7 +153,8 @@ class ERP5TypeCaucaseTestCase(ERP5TypeTestCase): ...@@ -152,7 +153,8 @@ class ERP5TypeCaucaseTestCase(ERP5TypeTestCase):
portal_type="Caucase Connector", portal_type="Caucase Connector",
reference="erp5-certificate-login", reference="erp5-certificate-login",
user_key=None, user_key=None,
user_certificate=None user_certificate=None,
**self.caucase_certificate_kw
) )
test_caucase_connector.validate() test_caucase_connector.validate()
...@@ -177,3 +179,4 @@ class ERP5TypeCaucaseTestCase(ERP5TypeTestCase): ...@@ -177,3 +179,4 @@ class ERP5TypeCaucaseTestCase(ERP5TypeTestCase):
try_delay=1 try_delay=1
): ):
raise ValueError("Unable to configure") raise ValueError("Unable to configure")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment