Commit 74828dc4 authored by Vincent Pelletier's avatar Vincent Pelletier

WIP caucase: Fix CRL support.

Revocation lists must be signed by the authority which signed the
certificate being revoked.
TODO: Test. Coverage.
parent de9a8def
......@@ -23,6 +23,7 @@ Caucase - Certificate Authority for Users, Certificate Authority for SErvices
"""
from __future__ import absolute_import
from binascii import hexlify, unhexlify
from collections import defaultdict
import datetime
import json
import os
......@@ -321,6 +322,9 @@ class CertificateAuthority(object):
public_key = csr.public_key()
now = datetime.datetime.utcnow()
ca_key_identifier_ext = ca_crt.extensions.get_extension_for_class(
x509.AuthorityKeyIdentifier,
).value
builder = x509.CertificateBuilder(
subject_name=template_csr.subject,
issuer_name=ca_crt.subject,
......@@ -343,11 +347,7 @@ class CertificateAuthority(object):
critical=False, # "MUST mark this extension as non-critical"
),
Extension(
x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
ca_crt.extensions.get_extension_for_class(
x509.SubjectKeyIdentifier,
).value,
),
ca_key_identifier_ext,
critical=False, # "MUST mark this extension as non-critical"
),
],
......@@ -357,7 +357,10 @@ class CertificateAuthority(object):
x509.CRLDistributionPoints([
x509.DistributionPoint(
full_name=[
x509.UniformResourceIdentifier(self._crl_base_url),
x509.UniformResourceIdentifier(
self._crl_base_url + '/' +
hexlify(ca_key_identifier_ext.key_identifier).decode('ascii'),
),
],
relative_name=None,
crl_issuer=None,
......@@ -664,14 +667,20 @@ class CertificateAuthority(object):
crt = utils.load_certificate(
crt_pem,
self.getCACertificateList(),
x509.load_pem_x509_crl(
self.getCertificateRevocationList(),
_cryptography_backend,
),
crl_list=[
x509.load_pem_x509_crl(
x,
_cryptography_backend,
)
for x in self.getCertificateRevocationListDict().itervalues()
],
)
self._storage.revoke(
serial=crt.serial_number,
expiration_date=utils.datetime2timestamp(crt.not_valid_after),
authority_key_identifier=crt.extensions.get_extension_for_class(
x509.AuthorityKeyIdentifier,
).value.key_identifier,
)
def revokeSerial(self, serial):
......@@ -692,6 +701,7 @@ class CertificateAuthority(object):
expiration_date=utils.datetime2timestamp(max(
x.not_valid_after for x in self.getCACertificateList()
)),
authority_key_identifier=None,
)
def renew(self, crt_pem, csr_pem):
......@@ -706,10 +716,13 @@ class CertificateAuthority(object):
crt = utils.load_certificate(
crt_pem,
self.getCACertificateList(),
x509.load_pem_x509_crl(
self.getCertificateRevocationList(),
_cryptography_backend,
),
crl_list=[
x509.load_pem_x509_crl(
x,
_cryptography_backend,
)
for x in self.getCertificateRevocationListDict().itervalues()
],
)
return self._createCertificate(
csr_id=self.appendCertificateSigningRequest(
......@@ -730,55 +743,68 @@ class CertificateAuthority(object):
),
)
def getCertificateRevocationList(self):
def getCertificateRevocationListDict(self):
"""
Return PEM-encoded certificate revocation list.
"""
now = datetime.datetime.utcnow()
crl_pem, crl_expiration_date = self._current_crl
if crl_pem is None or crl_expiration_date > now:
ca_key_pair = self._getCurrentCAKeypair()
ca_crt = ca_key_pair['crt']
crl = x509.CertificateRevocationListBuilder(
issuer_name=ca_crt.issuer,
last_update=now,
next_update=now + self._crl_life_time,
extensions=[
Extension(
x509.CRLNumber(
self._storage.getNextCertificateRevocationListNumber(),
),
critical=False, # "MUST mark this extension as non-critical"
),
Extension(
x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
ca_crt.extensions.get_extension_for_class(
x509.SubjectKeyIdentifier,
).value,
crl_pem_dict, crl_expiration_date = self._current_crl
if crl_pem_dict is None or crl_expiration_date > now:
revocation_dict = defaultdict(list)
for revocation in self._storage.getRevocationList():
revocation_dict[
revocation['authority_key_identifier']
].append(revocation)
crl_pem_dict = {}
self._renewCAIfNeeded()
for ca_key_pair in self._ca_key_pairs_list:
ca_crt = ca_key_pair['crt']
ca_key_identifier_ext = ca_crt.extensions.get_extension_for_class(
x509.AuthorityKeyIdentifier,
).value
ca_key_identifier = ca_key_identifier_ext.key_identifier
crl = x509.CertificateRevocationListBuilder(
issuer_name=ca_crt.issuer,
last_update=now,
next_update=now + self._crl_life_time,
extensions=[
Extension(
x509.CRLNumber(
self._storage.getNextCertificateRevocationListNumber(),
),
critical=False, # "MUST mark this extension as non-critical"
),
critical=False, # No mention in RFC5280 5.2.1
),
],
revoked_certificates=[
x509.RevokedCertificateBuilder(
serial_number=x['serial'],
revocation_date=datetime.datetime.fromtimestamp(
x['revocation_date'],
Extension(
ca_key_identifier_ext,
critical=False, # No mention in RFC5280 5.2.1
),
).build(_cryptography_backend)
for x in self._storage.getRevocationList()
],
).sign(
private_key=ca_key_pair['key'],
algorithm=self._default_digest_class(),
backend=_cryptography_backend,
)
crl_pem = crl.public_bytes(serialization.Encoding.PEM)
],
revoked_certificates=[
x509.RevokedCertificateBuilder(
serial_number=x['serial'],
revocation_date=datetime.datetime.fromtimestamp(
x['revocation_date'],
),
).build(_cryptography_backend)
for x in (
revocation_dict.get(ca_key_identifier, []) +
# unidentified revocations get listed in all CRLs.
revocation_dict.get(None, [])
)
],
).sign(
private_key=ca_key_pair['key'],
algorithm=self._default_digest_class(),
backend=_cryptography_backend,
)
crl_pem_dict[ca_key_identifier] = crl.public_bytes(
serialization.Encoding.PEM,
)
self._current_crl = (
crl_pem,
crl_pem_dict,
now + self._crl_renew_time,
)
return crl_pem
return crl_pem_dict
class UserCertificateAuthority(CertificateAuthority):
"""
......@@ -799,10 +825,13 @@ class UserCertificateAuthority(CertificateAuthority):
certificates.
"""
ca_cert_list = self.getCACertificateList()
crl = x509.load_pem_x509_crl(
self.getCertificateRevocationList(),
_cryptography_backend,
)
crl_list = [
x509.load_pem_x509_crl(
x,
_cryptography_backend,
)
for x in self.getCertificateRevocationListDict().itervalues()
]
signing_key = os.urandom(32)
symetric_key = os.urandom(32)
iv = os.urandom(16)
......@@ -821,7 +850,7 @@ class UserCertificateAuthority(CertificateAuthority):
key_list = []
for crt_pem in self._storage.iterCertificates():
try:
crt = utils.load_certificate(crt_pem, ca_cert_list, crl)
crt = utils.load_certificate(crt_pem, ca_cert_list, crl_list)
except CertificateVerificationError:
continue
public_key = crt.public_key()
......
......@@ -403,13 +403,27 @@ def main(argv=None, stdout=sys.stdout, stderr=sys.stderr):
'--crl',
default='cas.crl.pem',
metavar='CRL_PATH',
help='Services certificate revocation list location. default: %(default)s',
help='Services certificate revocation list location. '
'May be an existing directory or file, or non-existing. '
'If non-existing and given path has an extension, a file will be created, '
'otherwise a directory will be. '
'When it is a file, it may contain multiple PEM-encoded concatenated '
'CRLs. When it is a directory, it may contain multiple files, each '
'containing a single PEM-encoded CRL. '
'default: %(default)s',
)
parser.add_argument(
'--user-crl',
default='cau.crl.pem',
metavar='CRL_PATH',
help='Users certificate revocation list location. default: %(default)s',
help='Users certificate revocation list location. '
'May be an existing directory or file, or non-existing. '
'If non-existing and given path has an extension, a file will be created, '
'otherwise a directory will be. '
'When it is a file, it may contain multiple PEM-encoded concatenated '
'CRLs. When it is a directory, it may contain multiple files, each '
'containing a single PEM-encoded CRL. '
'default: %(default)s',
)
parser.add_argument(
'--threshold',
......@@ -814,6 +828,12 @@ def updater(argv=None, until=utils.until):
required=True,
metavar='CRT_PATH',
help='Path of your certificate revocation list for MODE. '
'May be an existing directory or file, or non-existing. '
'If non-existing and given path has an extension, a file will be created, '
'otherwise a directory will be. '
'When it is a file, it may contain multiple PEM-encoded concatenated '
'CRLs. When it is a directory, it may contain multiple files, each '
'containing a single PEM-encoded CRL. '
'Will be maintained up-to-date.'
)
args = parser.parse_args(argv)
......@@ -889,11 +909,11 @@ def updater(argv=None, until=utils.until):
if RetryingCaucaseClient.updateCRLFile(ca_url, args.crl, ca_crt_list):
print('Got new CRL')
updated = True
with open(args.crl, 'rb') as crl_file:
for crl_pem in utils.getCRLList(args.crl):
next_deadline = min(
next_deadline,
utils.load_crl(
crl_file.read(),
crl_pem,
ca_crt_list,
).next_update - crl_threshold,
)
......
......@@ -22,10 +22,10 @@
Caucase - Certificate Authority for Users, Certificate Authority for SErvices
"""
from __future__ import absolute_import
from binascii import hexlify
import datetime
import httplib
import json
import os
import ssl
from urlparse import urlparse
from cryptography import x509
......@@ -111,16 +111,16 @@ class CaucaseClient(object):
Return whether an update happened.
"""
if os.path.exists(crl_path):
with open(crl_path, 'rb') as crl_file:
my_crl = utils.load_crl(crl_file.read(), ca_list)
else:
my_crl = None
latest_crl_pem = cls(ca_url=url).getCertificateRevocationList()
latest_crl = utils.load_crl(latest_crl_pem, ca_list)
if my_crl is None or latest_crl.signature != my_crl.signature:
with open(crl_path, 'wb') as crl_file:
crl_file.write(latest_crl_pem)
loaded_crl_pem_list = utils.getCRLList(crl_path)
now = datetime.datetime.utcnow()
crl_pem_set = {
x
for x in loaded_crl_pem_list
if utils.load_crl(x, ca_list).next_update > now
}
crl_pem_set.update(cls(ca_url=url).getCertificateRevocationListList())
if crl_pem_set.symmetric_difference(loaded_crl_pem_list):
utils.saveCRLList(crl_path, crl_pem_set)
return True
return False
......@@ -187,11 +187,30 @@ class CaucaseClient(object):
def _https(self, method, url, body=None, headers=None):
return self._request(self._https_connection, method, url, body, headers)
def getCertificateRevocationList(self):
def getCertificateRevocationList(self, authority_key_identifier=None):
"""
[ANONYMOUS] Retrieve latest CRL for given authority key identifier.
BBB: if authority_key_identifier is None, returns the CRL for the
currently-active CA certificate.
"""
if authority_key_identifier is None: # BBB
return self._http('GET', '/crl')
return self._http(
'GET',
'/crl/' + hexlify(authority_key_identifier),
)
def getCertificateRevocationListList(self):
"""
[ANONYMOUS] Retrieve latest CRL.
[ANONYMOUS] Retrieve the latest CRLs for each CA certificate.
"""
return self._http('GET', '/crl')
return [
x.encode('ascii')
for x in json.loads(self._http('GET', '/crl', headers={
# BBB: see getCertificateRevocationList
'Accept': 'application/json',
}).decode('utf-8'))
]
def getCertificateSigningRequest(self, csr_id):
"""
......
......@@ -1124,11 +1124,21 @@ def manage(argv=None, stdout=sys.stdout):
for import_crl in args.import_crl:
with open(import_crl, 'rb') as crl_file:
crl_data = crl_file.read()
for revoked in utils.load_crl(crl_data, trusted_ca_crt_set):
crl = utils.load_crl(crl_data, trusted_ca_crt_set)
try:
crl_authority_extension = crl.extensions.get_extension_for_class(
x509.AuthorityKeyIdentifier,
)
except x509.ExtensionNotFound:
authority_key_identifier = None
else:
authority_key_identifier = crl_authority_extension.value.key_identifier
for revoked in crl:
try:
db.revoke(
revoked.serial_number,
latest_ca_not_after,
serial=revoked.serial_number,
expiration_date=latest_ca_not_after,
authority_key_identifier=authority_key_identifier,
)
except exceptions.Found:
already_revoked_count += 1
......
......@@ -22,6 +22,7 @@
Caucase - Certificate Authority for Users, Certificate Authority for SErvices
"""
from __future__ import absolute_import
from binascii import a2b_base64, b2a_base64
from random import getrandbits
import os
import sqlite3
......@@ -113,7 +114,8 @@ class SQLite3Storage(local):
# sqlite can accept as integers, so store these as text. Use a trivial
# string serialisation: not very space efficient, but this should not be
# a limiting issue for our use-cases anyway.
db.cursor().executescript('''
c = db.cursor()
c.executescript('''
CREATE TABLE IF NOT EXISTS %(prefix)sca (
expiration_date INTEGER,
key TEXT,
......@@ -144,6 +146,18 @@ class SQLite3Storage(local):
'prefix': table_prefix,
'key_id_constraint': 'UNIQUE' if enforce_unique_key_id else '',
})
try:
c.execute(
'SELECT authority_key_identifier FROM %srevoked LIMIT 1' % (
table_prefix,
)
).fetchall()
except sqlite3.OperationalError:
c.execute(
'ALTER TABLE %srevoked ADD COLUMN authority_key_identifier TEXT' % (
table_prefix,
)
)
def _incrementCounter(self, name, increment=1, initial=0):
"""
......@@ -460,7 +474,7 @@ class SQLite3Storage(local):
break
yield toBytes(row['crt'])
def revoke(self, serial, expiration_date):
def revoke(self, serial, expiration_date, authority_key_identifier):
"""
Add given certificate serial to the list of revoked certificates.
Flushes any current CRL.
......@@ -470,20 +484,25 @@ class SQLite3Storage(local):
expiration_date (int)
Unix timestamp at which the certificate expires, allowing to remove this
entry from the CRL.
authority_key_identifier (str)
Value which identifies the authority key which signed the revoked
certificate, so the revocation entry ends up in the correct CRL.
"""
with self._db as db:
c = db.cursor()
try:
c.execute(
'INSERT INTO %srevoked '
'(serial, revocation_date, expiration_date) '
'VALUES (?, ?, ?)' % (
'(serial, revocation_date, expiration_date, '
'authority_key_identifier) '
'VALUES (?, ?, ?, ?)' % (
self._table_prefix,
),
(
str(serial),
int(time()),
expiration_date,
b2a_base64(authority_key_identifier),
)
)
except sqlite3.IntegrityError:
......@@ -504,6 +523,9 @@ class SQLite3Storage(local):
Unix timestamp of certificate revocation.
- serial (int)
Revoked certificate's serial.
- authority_key_identifier (str, None)
Identifier of the private key which signed the revoked certificate, or
None if is it not known.
"""
with self._db as db:
c = db.cursor()
......@@ -517,9 +539,15 @@ class SQLite3Storage(local):
{
'revocation_date': int(x['revocation_date']),
'serial': int(x['serial']),
'authority_key_identifier': (
None
if x['authority_key_identifier'] is None else
a2b_base64(x['authority_key_identifier'])
),
}
for x in c.execute(
'SELECT revocation_date, serial FROM %srevoked' % (
'SELECT revocation_date, serial, authority_key_identifier '
'FROM %srevoked' % (
self._table_prefix,
),
)
......
......@@ -542,7 +542,7 @@ class CaucaseTest(unittest.TestCase):
utils._verifyCertificateChain(
cert=crt,
trusted_cert_list=[ca_crt],
crl=None,
crl_list=None,
)
# pylint: enable=protected-access
except CertificateVerificationError: # pragma: no cover
......@@ -1563,7 +1563,7 @@ class CaucaseTest(unittest.TestCase):
self._stopServer()
crt_pem, key_pem, ca_crt_pem = utils.getCertKeyAndCACert(
self._server_key,
crl=None,
crl_list=None,
)
with open(self._server_key, 'wb') as server_key_file:
server_key_file.write(key_pem)
......@@ -1634,6 +1634,11 @@ class CaucaseTest(unittest.TestCase):
]
with open(self._client_user_crl, 'rb') as client_user_crl_file:
cau_crl = client_user_crl_file.read()
cau_crl_dict = {
utils.getAuthorityKeyIdentifier(
utils.load_crl(cau_crl, cau_list),
): cau_crl,
}
class DummyCAU(object):
"""
Mock CAU.
......@@ -1655,11 +1660,11 @@ class CaucaseTest(unittest.TestCase):
return b'notreallyPEM'
@staticmethod
def getCertificateRevocationList():
def getCertificateRevocationListDict():
"""
Return cau crl.
"""
return cau_crl
return cau_crl_dict
@staticmethod
def appendCertificateSigningRequest(_):
......@@ -1788,8 +1793,18 @@ class CaucaseTest(unittest.TestCase):
},
u"_links": {
u"getCertificateRevocationList": {
u"href": HATEOAS_HTTP_PREFIX + u"/cau/crl/{+authority_key_id}",
u"title": (
u"Retrieve latest certificate revocation list for given "
u"hex-encoded authority identifier."
),
},
u"getCertificateRevocationListList": {
u"href": HATEOAS_HTTP_PREFIX + u"/cau/crl",
u"title": u"Retrieve latest certificate revocation list.",
u"title": (
u"Retrieve latest certificate revocation list for all valid "
u"authorities."
),
},
u"getCACertificate": {
u"href": HATEOAS_HTTP_PREFIX + u"/cau/crt/ca.crt.pem",
......
......@@ -24,7 +24,7 @@ Caucase - Certificate Authority for Users, Certificate Authority for SErvices
Small-ish functions needed in many places.
"""
from __future__ import absolute_import, print_function
from binascii import a2b_base64, b2a_base64
from binascii import a2b_base64, b2a_base64, hexlify
import calendar
import codecs
from collections import defaultdict
......@@ -124,26 +124,31 @@ def _getPEMTypeDict(path, result=None):
def getCertList(crt_path):
"""
Return a list of certificates.
Raises if there is anything else than a certificate.
"""
if not os.path.exists(crt_path):
return _getPEMListFromPath(crt_path, pem.Certificate)
def getCRLList(crl_path):
"""
Return a list of Certificate Revocation Lists.
"""
return _getPEMListFromPath(crl_path, pem.CertificateRevocationList)
def _getPEMListFromPath(path, pem_type):
if not os.path.exists(path):
return []
if os.path.isdir(crt_path):
file_list = [os.path.join(crt_path, x) for x in os.listdir(crt_path)]
else:
file_list = [crt_path]
result = []
for file_name in file_list:
type_dict = _getPEMTypeDict(file_name)
crt_list = type_dict.pop(pem.Certificate)
if type_dict:
raise ValueError('%s contains more than just certificates' % (file_name, ))
result.extend(x.as_bytes() for x in crt_list)
return result
return [
pem_object.as_bytes()
for file_name in (
[os.path.join(path, x) for x in os.listdir(path)]
if os.path.isdir(path) else
[path]
)
for pem_object in _getPEMTypeDict(file_name).get(pem_type, ())
]
def saveCertList(crt_path, cert_pem_list):
"""
Store given list of PEm-encoded certificates in given path.
Store given list of PEM-encoded certificates in given path.
crt_path (str)
May point to a directory a file, or nothing.
......@@ -154,61 +159,93 @@ def saveCertList(crt_path, cert_pem_list):
cert_pem_list (list of bytes)
"""
if os.path.exists(crt_path):
if os.path.isfile(crt_path):
saveCertListTo = _saveCertListToFile
elif os.path.isdir(crt_path):
saveCertListTo = _saveCertListToDirectory
_savePEMList(crt_path, cert_pem_list, load_ca_certificate, '.cacrt.pem')
def saveCRLList(crl_path, crl_pem_list):
"""
Store given list of PEM-encoded Certificate Revocation Lists in given path.
crl_path (str)
May point to a directory a file, or nothing.
If it does not exist, and this value contains an extension, a file is
created, otherwise a directory is.
If it is a file, all CRLs are written in it.
If it is a folder, each CRL is stored in a separate file.
crl_pem_list (list of bytes)
"""
_savePEMList(
crl_path,
crl_pem_list,
lambda x: x509.load_pem_x509_crl(x, _cryptography_backend),
'.crl.pem',
)
def _savePEMList(path, pem_list, pem_loader, extension):
if os.path.exists(path):
if os.path.isfile(path):
savePEMList = _savePEMListToFile
elif os.path.isdir(path):
savePEMList = _savePEMListToDirectory
else:
raise TypeError('%s exist and is neither a directory nor a file' % (
crt_path,
path,
))
else:
saveCertListTo = (
_saveCertListToFile
if os.path.splitext(crt_path)[1] else
_saveCertListToDirectory
savePEMList = (
_savePEMListToFile
if os.path.splitext(path)[1] else
_savePEMListToDirectory
)
saveCertListTo(crt_path, cert_pem_list)
def _saveCertListToFile(ca_crt_path, cert_pem_list):
with open(ca_crt_path, 'wb') as ca_crt_file:
ca_crt_file.write(b''.join(cert_pem_list))
def _saveCertListToDirectory(crt_dir, cert_pem_list):
if not os.path.exists(crt_dir):
os.mkdir(crt_dir)
ca_cert_dict = {
'%x.pem' % (load_ca_certificate(x).serial_number, ): x
for x in cert_pem_list
savePEMList(path, pem_list, pem_loader, extension)
def _savePEMListToFile(file_path, pem_list, pem_loader, extension):
_ = pem_loader # Silence pylint
_ = extension # Silence pylint
with open(file_path, 'wb') as pem_file:
for pem_chunk in pem_list:
pem_file.write(pem_chunk)
def _savePEMListToDirectory(dir_path, pem_list, pem_loader, extension):
if not os.path.exists(dir_path):
os.mkdir(dir_path)
pem_dict = {
hexlify(
pem_loader(
x,
_cryptography_backend,
).extensions.get_extension_for_class(
x509.AuthorityKeyIdentifier,
).value.key_identifier,
) + extension: x
for x in pem_list
}
for cert_filename in os.listdir(crt_dir):
ca_crt_path = os.path.join(crt_dir, cert_filename)
if not os.path.isfile(ca_crt_path):
# Not a file and not a symlink to a file, ignore
for filename in os.listdir(dir_path):
filepath = os.path.join(dir_path, filename)
if not filepath.endswith(extension) or not os.path.isfile(filepath):
# Not a managed file name and not a symlink to a file, ignore
continue
if not os.path.islink(ca_crt_path) and cert_filename in ca_cert_dict:
if not os.path.islink(filepath) and filename in pem_dict:
try:
# pylint: disable=unbalanced-tuple-unpacking
cert, = getCertList(ca_crt_path)
file_pem_item, = _getPEMTypeDict(filepath).itervalues()
# pylint: enable=unbalanced-tuple-unpacking
# pylint: disable=broad-except
except Exception:
# pylint: enable=broad-except
# Inconsistent content (multiple certificates, not CA certificates,
# ...): overwrite file
# File contains multiple PEM items: overwrite
pass
else:
if cert == ca_cert_dict[cert_filename]:
if file_pem_item == pem_dict[filename]:
# Already consistent, do not edit.
del ca_cert_dict[cert_filename]
del pem_dict[filename]
else:
# Unknown file (ex: expired certificate), or a symlink to a file: delete
os.unlink(ca_crt_path)
for cert_filename, cert_pem in ca_cert_dict.items():
ca_crt_path = os.path.join(crt_dir, cert_filename)
with open(ca_crt_path, 'wb') as ca_crt_file:
ca_crt_file.write(cert_pem)
os.unlink(filepath)
for filename, pem_item in pem_dict.iteritems():
filepath = os.path.join(dir_path, filename)
with open(filepath, 'wb') as pem_file:
pem_file.write(pem_item)
def getCert(crt_path):
"""
......@@ -219,7 +256,7 @@ def getCert(crt_path):
crt, = type_dict.get(pem.Certificate)
return crt.as_bytes()
def getCertKeyAndCACert(crt_path, crl):
def getCertKeyAndCACert(crt_path, crl_list):
"""
Return a certificate with its private key and the certificate which signed
it.
......@@ -241,7 +278,7 @@ def getCertKeyAndCACert(crt_path, crl):
except ValueError:
continue
# key and crt match, check signatures
load_certificate(crt, [load_ca_certificate(ca_crt)], crl)
load_certificate(crt, [load_ca_certificate(ca_crt)], crl_list)
return crt, key, ca_crt
# Latest error comes from validateCertAndKey
raise # pylint: disable=misplaced-bare-raise
......@@ -337,7 +374,7 @@ def validateCertAndKey(cert_pem, key_pem):
).public_key().public_numbers():
raise ValueError('Mismatch between private key and certificate')
def _verifyCertificateChain(cert, trusted_cert_list, crl):
def _verifyCertificateChain(cert, trusted_cert_list, crl_list):
"""
Verifies whether certificate has been signed by any of the trusted
certificates, is not revoked and is whithin its validity period.
......@@ -358,8 +395,9 @@ def _verifyCertificateChain(cert, trusted_cert_list, crl):
assert trusted_cert_list
for trusted_cert in trusted_cert_list:
store.add_cert(crypto.X509.from_cryptography(trusted_cert))
if crl is not None:
store.add_crl(crypto.CRL.from_cryptography(crl))
if crl_list:
for crl in crl_list:
store.add_crl(crypto.CRL.from_cryptography(crl))
store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
try:
crypto.X509StoreContext(
......@@ -460,7 +498,7 @@ def load_ca_certificate(data):
_verifyCertificateChain(crt, [crt], None)
return crt
def load_certificate(data, trusted_cert_list, crl):
def load_certificate(data, trusted_cert_list, crl_list):
"""
Load a certificate from PEM-encoded data.
......@@ -468,7 +506,7 @@ def load_certificate(data, trusted_cert_list, crl):
any of trusted certificates, is revoked or is otherwise invalid.
"""
crt = x509.load_pem_x509_certificate(data, _cryptography_backend)
_verifyCertificateChain(crt, trusted_cert_list, crl)
_verifyCertificateChain(crt, trusted_cert_list, crl_list)
return crt
def dump_certificate(data):
......@@ -538,6 +576,14 @@ def load_crl(data, trusted_cert_list):
return crl
raise cryptography.exceptions.InvalidSignature
def getAuthorityKeyIdentifier(cert):
"""
Returns the authority key identifier of given certificate.
"""
return cert.extensions.get_extension_for_class(
x509.AuthorityKeyIdentifier,
).value.key_identifier
EPOCH = datetime.datetime(1970, 1, 1)
def datetime2timestamp(value):
"""
......
......@@ -22,6 +22,7 @@
Caucase - Certificate Authority for Users, Certificate Authority for SErvices
"""
from __future__ import absolute_import
from binascii import unhexlify
from Cookie import SimpleCookie, CookieError
import httplib
import json
......@@ -357,10 +358,24 @@ class Application(object):
'method': {
'GET': {
'do': self.getCertificateRevocationList,
'descriptor': [{
'name': 'getCertificateRevocationList',
'title': 'Retrieve latest certificate revocation list.',
}],
'subpath': SUBPATH_OPTIONAL,
'descriptor': [
{
'name': 'getCertificateRevocationListList',
'title': (
'Retrieve latest certificate revocation list for all valid '
'authorities.'
),
},
{
'name': 'getCertificateRevocationList',
'title': (
'Retrieve latest certificate revocation list for given '
'hex-encoded authority identifier.'
),
'subpath': '+{authority_key_id}',
},
],
},
},
},
......@@ -658,10 +673,10 @@ class Application(object):
utils.load_certificate(
environ.get('SSL_CLIENT_CERT', b''),
trusted_cert_list=ca_list,
crl=utils.load_crl(
self._cau.getCertificateRevocationList(),
ca_list,
),
crl_list=[
utils.load_crl(x, ca_list)
for x in self._cau.getCertificateRevocationListDict().itervalues()
],
)
except (exceptions.CertificateVerificationError, ValueError):
raise SSLUnauthorized
......@@ -812,6 +827,47 @@ class Application(object):
else:
raise Forbidden
@staticmethod
def _getAccept(environ, media_type='*', media_subtype='*'):
"""
Returns a dict with the properties of the given media type from request's
"Accept" header, or None if it is not listed.
Only the highest-quality match is returned (ex: given "text/plain",
"text/plain" will be returned rather than "text/*", itself returned rather
than "*/*").
"""
result_list = []
for accept in environ.get('HTTP_ACCEPT', '').split(','):
accept_list = [x.strip() for x in accept.split(';')]
try:
accept_type, accept_subtype = accept_list[0].split('/')
except ValueError: # Malformed
continue
if accept_type == '*':
match_quality = 0b00
elif accept_type == media_type:
if accept_subtype == '*':
match_quality = 0b10
elif accept_subtype == media_subtype:
match_quality = 0b11
else:
continue
else:
continue
property_dict = {}
for item in accept_list[1:]:
if '=' in item:
key, value = item.split('=', 1)
else:
key = item
value = ''
property_dict[key.strip()] = value.strip()
result_list.append((match_quality, property_dict))
if result_list:
result_list.sort(key=lambda x: x[0])
return result_list[-1][1]
return None
def getTopHAL(self, context, environ):
"""
Handle GET / .
......@@ -963,18 +1019,41 @@ class Application(object):
[],
)
def getCertificateRevocationList(
self,
context,
environ,
): # pylint: disable=unused-argument
def getCertificateRevocationList(self, context, environ, subpath):
"""
Handle GET /{context}/crl .
Handle GET /{context}/crl and GET /{context}/crl/{authority_key_id} .
"""
return self._returnFile(
context.getCertificateRevocationList(),
'application/pkix-crl',
)
crl_dict = context.getCertificateRevocationListDict()
if not subpath:
if self._getAccept(environ, 'application', 'json') is None:
# BBB: client does not specify which CRL it needs and does not accept
# json. It is an old client or an SSL library checking an old
# certificate, return the latest CRL (as was originally the case).
authority_key_id = utils.getAuthorityKeyIdentifier(
context.getCACertificate(),
)
else:
return self._returnFile(
json.dumps([
x.decode('ascii')
for x in crl_dict.itervalues()
]).encode('utf-8'),
'application/json',
)
else:
try:
authority_key_id, = subpath
except ValueError:
raise NotFound
try:
authority_key_id = unhexlify(authority_key_id)
except TypeError:
raise BadRequest(b'Invalid authority key id')
try:
crl = crl_dict[authority_key_id]
except KeyError:
raise NotFound
return self._returnFile(crl, 'application/pkix-crl')
def getCSR(self, context, environ, subpath):
"""
......
......@@ -149,8 +149,19 @@ paths:
description: OK - Renewed certificate retrieved
/crl:
get:
summary: Retrieve latest certificate revocation list
summary: Retrieve the list of latest certificate revocation list for all authority keys
operationId: getCertificateRevocationListList
produces:
- application/json
responses:
'200':
description: OK - CRL retrieved
/crl/{authority-key-id}:
get:
summary: Retrieve latest certificate revocation list for given authority key
operationId: getCertificateRevocationList
parameters:
- $ref: '#/parameters/authority-key-id'
produces:
- application/pkix-crl
responses:
......@@ -196,6 +207,12 @@ parameters:
description: An operation, signed with requester's private key
schema:
$ref: '#/definitions/signed-operation'
authority-key-id:
name: authority-key-id
in: path
description: hexadecimal representation of an authority key identifier
required: true
type: string
responses:
'400':
description: Bad Request - you probably provided wrong parameters
......
......@@ -50,7 +50,7 @@ setup(
install_requires=[
'cryptography>=2.2.1', # everything x509 except...
'pyOpenSSL>=18.0.0', # ...certificate chain validation
'pem>=17.1.0', # Parse PEM files
'pem>=18.2.0', # Parse PEM files
'PyJWT', # CORS token signature
],
zip_safe=True,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment